""" Background tasks for the API. Handles periodic maintenance, regeneration, spawning, and processing. """ import asyncio import logging import random import time from .services.constants import PVP_TURN_TIMEOUT import os import fcntl from typing import Dict, Optional # Import from API modules (not bot modules) from api import database as db from data.npcs import ( LOCATION_SPAWNS, LOCATION_DANGER, NPCS, get_random_npc_for_location, get_wandering_enemy_chance ) logger = logging.getLogger(__name__) # Lock file to ensure only one worker runs background tasks LOCK_FILE_PATH = "/tmp/echoes_background_tasks.lock" _lock_file_handle: Optional[int] = None # ============================================================================ # SPAWN MANAGER CONFIGURATION # ============================================================================ SPAWN_CHECK_INTERVAL = 120 # Check every 2 minutes ENEMY_LIFETIME = 600 # Enemies live for 10 minutes MAX_ENEMIES_PER_LOCATION = { 0: 0, # Safe zones - no wandering enemies 1: 1, # Low danger - max 1 enemy 2: 2, # Medium danger - max 2 enemies 3: 3, # High danger - max 3 enemies 4: 4, # Extreme danger - max 4 enemies } def get_danger_level(location_id: str) -> int: """Get danger level for a location.""" danger_data = LOCATION_DANGER.get(location_id, (0, 0.0, 0.0)) return danger_data[0] # ============================================================================ # BACKGROUND TASK: WANDERING ENEMY SPAWNER # ============================================================================ async def spawn_manager_loop(manager=None): """ Main spawn manager loop. Runs continuously, checking spawn conditions every SPAWN_CHECK_INTERVAL seconds. Args: manager: WebSocket ConnectionManager for broadcasting spawn events """ logger.info("๐ŸŽฒ Spawn Manager started") while True: try: await asyncio.sleep(SPAWN_CHECK_INTERVAL) # Clean up expired enemies first expired_enemies = await db.get_expired_wandering_enemies() despawned_count = await db.cleanup_expired_wandering_enemies() # Notify players in locations where enemies despawned if manager and expired_enemies: from datetime import datetime for enemy in expired_enemies: await manager.send_to_location( location_id=enemy['location_id'], message={ "type": "location_update", "data": { "message": f"A wandering enemy left the area", "action": "enemy_despawned", "enemy_id": enemy['id'] }, "timestamp": datetime.utcnow().isoformat() } ) if despawned_count > 0: logger.info(f"๐Ÿงน Cleaned up {despawned_count} expired wandering enemies") # Process each location spawned_count = 0 for location_id, spawn_table in LOCATION_SPAWNS.items(): if not spawn_table: continue # Skip locations with no spawns # Get danger level and max enemies for this location danger_level = get_danger_level(location_id) max_enemies = MAX_ENEMIES_PER_LOCATION.get(danger_level, 0) if max_enemies == 0: continue # Skip safe zones # Check current enemy count current_count = await db.get_wandering_enemy_count_in_location(location_id) if current_count >= max_enemies: continue # Location is at capacity # Calculate spawn chance based on wandering_enemy_chance spawn_chance = get_wandering_enemy_chance(location_id) # Attempt to spawn enemies up to max capacity for _ in range(max_enemies - current_count): if random.random() < spawn_chance: # Spawn an enemy npc_id = get_random_npc_for_location(location_id) if npc_id: enemy_data = await db.spawn_wandering_enemy( npc_id=npc_id, location_id=location_id, lifetime_seconds=ENEMY_LIFETIME ) if not enemy_data: logger.error(f"Failed to spawn {npc_id} at {location_id}") continue spawned_count += 1 logger.info(f"๐Ÿ‘น Spawned {npc_id} at {location_id} (current: {current_count + 1}/{max_enemies})") # Notify players in this location if manager: from datetime import datetime npc_def = NPCS.get(npc_id) npc_name_obj = npc_def.name if npc_def else npc_id.replace('_', ' ').title() # Handle localized name for the fallback message if isinstance(npc_name_obj, dict): npc_name_en = npc_name_obj.get('en', str(npc_name_obj)) else: npc_name_en = str(npc_name_obj) await manager.send_to_location( location_id=location_id, message={ "type": "location_update", "data": { "message": f"A {npc_name_en} appeared!", "action": "enemy_spawned", "npc_data": { "id": enemy_data['id'], "npc_id": npc_id, "name": npc_name_obj, "type": "enemy", "is_wandering": True, "image_path": npc_def.image_path if npc_def else None } }, "timestamp": datetime.utcnow().isoformat() } ) if spawned_count > 0: logger.info(f"โœจ Spawn cycle complete: {spawned_count} enemies spawned") except Exception as e: logger.error(f"โŒ Error in spawn manager loop: {e}", exc_info=True) # Continue running even if there's an error await asyncio.sleep(10) # ============================================================================ # BACKGROUND TASK: DROPPED ITEM DECAY # ============================================================================ async def decay_dropped_items(manager=None): """Periodically cleans up old dropped items. Args: manager: WebSocket ConnectionManager for broadcasting decay events """ logger.info("๐Ÿ—‘๏ธ Item Decay task started") while True: try: await asyncio.sleep(300) # Wait 5 minutes start_time = time.time() logger.info("Running item decay task...") # Set decay time to 1 hour (3600 seconds) decay_seconds = 3600 timestamp_limit = int(time.time()) - decay_seconds # Get expired items before removal to notify locations expired_items = await db.get_expired_dropped_items(timestamp_limit) items_removed = await db.remove_expired_dropped_items(timestamp_limit) # Group expired items by location if manager and expired_items: from datetime import datetime from collections import defaultdict items_by_location = defaultdict(int) for item in expired_items: items_by_location[item['location_id']] += 1 # Notify each location for location_id, count in items_by_location.items(): await manager.send_to_location( location_id=location_id, message={ "type": "location_update", "data": { "message": f"{count} dropped item(s) decayed", "action": "items_decayed", "count": count }, "timestamp": datetime.utcnow().isoformat() } ) elapsed = time.time() - start_time if items_removed > 0: logger.info(f"Decayed and removed {items_removed} old items in {elapsed:.2f}s") except Exception as e: logger.error(f"โŒ Error in item decay task: {e}", exc_info=True) await asyncio.sleep(10) # ============================================================================ # BACKGROUND TASK: STAMINA REGENERATION # ============================================================================ async def regenerate_stamina(manager=None): """Periodically regenerates stamina for all players. Args: manager: WebSocket ConnectionManager for notifying players """ logger.info("๐Ÿ’ช Stamina Regeneration task started") while True: try: await asyncio.sleep(300) # Wait 5 minutes start_time = time.time() logger.info("Running stamina regeneration...") updated_players = await db.regenerate_all_players_stamina() # Notify each player of their stamina regeneration if manager and updated_players: from datetime import datetime for player in updated_players: await manager.send_personal_message( player['id'], { "type": "stamina_update", "data": { "stamina": int(player['new_stamina']), "max_stamina": player['max_stamina'], "message": "Stamina regenerated" }, "timestamp": datetime.utcnow().isoformat() } ) elapsed = time.time() - start_time if updated_players: logger.info(f"Regenerated stamina for {len(updated_players)} players in {elapsed:.2f}s") # Alert if regeneration is taking too long (potential scaling issue) if elapsed > 5.0: logger.warning(f"โš ๏ธ Stamina regeneration took {elapsed:.2f}s (threshold: 5s) - check database load!") except Exception as e: logger.error(f"โŒ Error in stamina regeneration: {e}", exc_info=True) await asyncio.sleep(10) # ============================================================================ # BACKGROUND TASK: COMBAT TIMERS # ============================================================================ async def check_combat_timers(): """Checks for idle combat turns and auto-attacks.""" logger.info("โš”๏ธ Combat Timer task started") while True: try: await asyncio.sleep(30) # Wait 30 seconds start_time = time.time() # Check for combats idle for more than 5 minutes (300 seconds) idle_threshold = time.time() - 300 idle_combats = await db.get_all_idle_combats(idle_threshold) if idle_combats: logger.info(f"Processing {len(idle_combats)} idle combats...") for combat in idle_combats: try: # Only process if it's player's turn (don't double-process) if combat['turn'] != 'player': continue # Import required modules from api import game_logic from data.npcs import NPCS # Get NPC definition npc_def = NPCS.get(combat['npc_id']) if not npc_def: logger.warning(f"NPC definition not found: {combat['npc_id']}") continue # Import reduce_armor_durability from equipment router from .routers.equipment import reduce_armor_durability # NPC attacks due to timeout logger.info(f"Player {combat['character_id']} combat timed out, NPC attacking...") await game_logic.npc_attack( combat['character_id'], combat, npc_def, reduce_armor_durability ) except Exception as e: logger.error(f"Error processing idle combat: {e}") # Log performance for monitoring if idle_combats: elapsed = time.time() - start_time logger.info(f"Processed {len(idle_combats)} idle combats in {elapsed:.2f}s") # Warn if taking too long (potential scaling issue) if elapsed > 10.0: logger.warning(f"โš ๏ธ Combat timer check took {elapsed:.2f}s (threshold: 10s) - consider batching!") except Exception as e: logger.error(f"โŒ Error in combat timer check: {e}", exc_info=True) await asyncio.sleep(10) # ============================================================================ # BACKGROUND TASK: PVP COMBAT TIMERS # ============================================================================ async def check_pvp_combat_timers(manager=None): """Checks for expired PvP combat turns and auto-advances them.""" logger.info("โš”๏ธ PvP Combat Timer task started") while True: try: await asyncio.sleep(30) # Check every 30 seconds start_time = time.time() all_pvp_combats = await db.get_all_pvp_combats() processed = 0 for combat in all_pvp_combats: try: # Check if combat has already ended (fled or player dead) if combat.get('attacker_fled') or combat.get('defender_fled'): continue # Get both players to check HP attacker = await db.get_player_by_id(combat['attacker_character_id']) defender = await db.get_player_by_id(combat['defender_character_id']) if not attacker or not defender: # Player doesn't exist, clean up combat await db.end_pvp_combat(combat['id']) continue # Check if combat ended (someone died) if attacker['hp'] <= 0 or defender['hp'] <= 0: continue # Check if turn has timed out turn_timeout = combat.get('turn_timeout_seconds', PVP_TURN_TIMEOUT) # Use imported constant instead of hardcoded 300 turn_started = combat.get('turn_started_at', time.time()) time_elapsed = time.time() - turn_started if time_elapsed < turn_timeout: continue # Turn hasn't timed out yet # Turn has timed out - advance to other player current_turn = combat.get('turn', 'attacker') new_turn = 'defender' if current_turn == 'attacker' else 'attacker' logger.info(f"PvP turn timeout: combat {combat['id']} advancing from {current_turn} to {new_turn}") # Update combat with new turn await db.update_pvp_combat(combat['id'], { 'turn': new_turn, 'turn_started_at': time.time(), 'last_action': f"Turn timeout - {current_turn}'s turn skipped|{time.time()}" }) processed += 1 # Send WebSocket notifications to both players if manager: # Get updated combat data updated_combat = await db.get_pvp_combat_by_id(combat['id']) if updated_combat: # Calculate time remaining for new turn time_remaining = turn_timeout # Build combat update payload combat_update = { "type": "combat_update", "data": { "pvp_combat": { "id": updated_combat['id'], "turn": new_turn, "time_remaining": time_remaining, "turn_timeout": "skipped", "last_action": f"Turn timeout - {current_turn}'s turn skipped" }, "is_pvp": True, "message": f"โฑ๏ธ Turn skipped due to timeout!" }, "timestamp": time.time() } # Notify both players await manager.send_personal_message( combat['attacker_character_id'], combat_update ) await manager.send_personal_message( combat['defender_character_id'], combat_update ) except Exception as e: logger.error(f"Error processing PvP combat {combat.get('id')}: {e}") if processed > 0: elapsed = time.time() - start_time logger.info(f"Processed {processed} PvP combat timeouts in {elapsed:.2f}s") except Exception as e: logger.error(f"โŒ Error in PvP combat timer check: {e}", exc_info=True) await asyncio.sleep(10) # ============================================================================ # BACKGROUND TASK: INTERACTABLE COOLDOWN CLEANUP # ============================================================================ async def cleanup_interactable_cooldowns(manager=None, world_locations=None): """ Cleans up expired interactable cooldowns and notifies players. Args: manager: WebSocket ConnectionManager for broadcasting cooldown expiry world_locations: Dict of Location objects to map instance_id to location_id """ logger.info("โณ Interactable Cooldown Cleanup task started") while True: try: await asyncio.sleep(30) # Check every 30 seconds # Get expired cooldowns before removal expired_cooldowns = await db.get_expired_interactable_cooldowns() removed_count = await db.remove_expired_interactable_cooldowns() # Notify players in locations where cooldowns expired if manager and expired_cooldowns and world_locations: from datetime import datetime from collections import defaultdict # Map instance_id:action_id to location_id cooldowns_by_location = defaultdict(list) for cooldown in expired_cooldowns: instance_id = cooldown['instance_id'] action_id = cooldown['action_id'] # Find which location has this interactable for loc_id, location in world_locations.items(): for interactable in location.interactables: if interactable.id == instance_id: # Find action name action_name = action_id for action in interactable.actions: if action.id == action_id: action_name = action.label break cooldowns_by_location[loc_id].append({ 'instance_id': instance_id, 'action_id': action_id, 'name': interactable.name, 'action_name': action_name }) break # Notify each location (only if players are there) for location_id, cooldowns in cooldowns_by_location.items(): if not manager.has_players_in_location(location_id): continue # Skip if no active players for cooldown_info in cooldowns: await manager.send_to_location( location_id=location_id, message={ "type": "interactable_ready", "data": { "instance_id": cooldown_info['instance_id'], "action_id": cooldown_info['action_id'], "message": f"{cooldown_info['action_name']} is ready on {cooldown_info['name']}" }, "timestamp": datetime.utcnow().isoformat() } ) if removed_count > 0: logger.info(f"๐Ÿงน Cleaned up {removed_count} expired interactable cooldowns") except Exception as e: logger.error(f"โŒ Error in interactable cooldown cleanup: {e}", exc_info=True) await asyncio.sleep(10) # ============================================================================ # BACKGROUND TASK: CORPSE DECAY # ============================================================================ async def decay_corpses(manager=None): """Removes old corpses and empty corpses. Args: manager: WebSocket ConnectionManager for broadcasting decay events """ logger.info("๐Ÿ’€ Corpse Decay task started") while True: try: await asyncio.sleep(600) # Wait 10 minutes start_time = time.time() logger.info("Running corpse decay...") # ===== TIME-BASED DECAY ===== # Player corpses decay after 24 hours player_corpse_limit = time.time() - (24 * 3600) expired_player_corpses = await db.get_expired_player_corpses(player_corpse_limit) player_corpses_removed = await db.remove_expired_player_corpses(player_corpse_limit) # NPC corpses decay after 2 hours npc_corpse_limit = time.time() - (2 * 3600) expired_npc_corpses = await db.get_expired_npc_corpses(npc_corpse_limit) npc_corpses_removed = await db.remove_expired_npc_corpses(npc_corpse_limit) # ===== EMPTY CORPSE DECAY ===== # Empty corpses (no loot remaining) decay immediately empty_player_corpses = await db.get_empty_player_corpses() empty_player_removed = await db.remove_empty_player_corpses() empty_npc_corpses = await db.get_empty_npc_corpses() empty_npc_removed = await db.remove_empty_npc_corpses() # Combine all decayed corpses for notification all_decayed_player_corpses = expired_player_corpses + empty_player_corpses all_decayed_npc_corpses = expired_npc_corpses + empty_npc_corpses total_player_removed = player_corpses_removed + empty_player_removed total_npc_removed = npc_corpses_removed + empty_npc_removed # Notify players in locations where corpses decayed if manager: from datetime import datetime from collections import defaultdict # Group corpses by location corpses_by_location = defaultdict(lambda: {"player": 0, "npc": 0}) for corpse in all_decayed_player_corpses: corpses_by_location[corpse['location_id']]["player"] += 1 for corpse in all_decayed_npc_corpses: corpses_by_location[corpse['location_id']]["npc"] += 1 # Notify each location for location_id, counts in corpses_by_location.items(): total = counts["player"] + counts["npc"] corpse_type = "corpse" if total == 1 else "corpses" await manager.send_to_location( location_id=location_id, message={ "type": "location_update", "data": { "message": f"{total} {corpse_type} decayed", "action": "corpses_decayed", "count": total }, "timestamp": datetime.utcnow().isoformat() } ) elapsed = time.time() - start_time if total_player_removed > 0 or total_npc_removed > 0: logger.info( f"Decayed {total_player_removed} player corpses " f"({player_corpses_removed} expired, {empty_player_removed} empty) and " f"{total_npc_removed} NPC corpses " f"({npc_corpses_removed} expired, {empty_npc_removed} empty) in {elapsed:.2f}s" ) except Exception as e: logger.error(f"โŒ Error in corpse decay: {e}", exc_info=True) await asyncio.sleep(10) # ============================================================================ # BACKGROUND TASK: STATUS EFFECTS PROCESSOR # ============================================================================ async def process_status_effects(manager=None): """ Applies damage from persistent status effects. Runs every 5 minutes to process status effect ticks. Args: manager: WebSocket ConnectionManager for notifying players """ logger.info("๐Ÿฉธ Status Effects Processor started") while True: try: await asyncio.sleep(300) # Wait 5 minutes start_time = time.time() logger.info("Running status effects processor...") try: # Decrement all status effect ticks and get affected players affected_players = await db.decrement_all_status_effect_ticks() if not affected_players: elapsed = time.time() - start_time logger.info(f"No active status effects to process ({elapsed:.3f}s)") continue # Process each affected player deaths = 0 damage_dealt = 0 for player_id in affected_players: try: # Get current status effects (after decrement) effects = await db.get_player_status_effects(player_id) if not effects: continue # Calculate total damage from api.game_logic import calculate_status_damage total_damage = calculate_status_damage(effects) if total_damage > 0: damage_dealt += total_damage player = await db.get_player_by_id(player_id) if not player or player['is_dead']: continue new_hp = max(0, player['hp'] - total_damage) # Check if player died from status effects if new_hp <= 0: await db.update_player(player_id, {'hp': 0, 'is_dead': True}) deaths += 1 # Only create corpse if player has items inventory = await db.get_inventory(player_id) if inventory: import json await db.create_player_corpse( player_name=player['name'], location_id=player['location_id'], items=json.dumps([{'item_id': i['item_id'], 'quantity': i['quantity']} for i in inventory]) ) logger.info(f"Created corpse for player {player['name']} with {len(inventory)} items") else: logger.info(f"Player {player['name']} died (status effects) with no items, skipping corpse creation") # Remove status effects from dead player await db.remove_all_status_effects(player_id) # Notify player of death if manager: from datetime import datetime await manager.send_personal_message( player_id, { "type": "player_died", "data": { "hp": 0, "is_dead": True, "message": "You died from status effects" }, "timestamp": datetime.utcnow().isoformat() } ) logger.info(f"Player {player['name']} (ID: {player_id}) died from status effects") else: # Apply damage and notify player await db.update_player(player_id, {'hp': new_hp}) if manager: from datetime import datetime await manager.send_personal_message( player_id, { "type": "status_effect_damage", "data": { "hp": new_hp, "max_hp": player['max_hp'], "damage": total_damage, "message": f"You took {total_damage} damage from status effects" }, "timestamp": datetime.utcnow().isoformat() } ) except Exception as e: logger.error(f"Error processing status effects for player {player_id}: {e}") elapsed = time.time() - start_time logger.info( f"Processed status effects for {len(affected_players)} players " f"({damage_dealt} total damage, {deaths} deaths) in {elapsed:.3f}s" ) # Warn if taking too long (potential scaling issue) if elapsed > 5.0: logger.warning( f"โš ๏ธ Status effects processing took {elapsed:.3f}s (threshold: 5s) " f"- {len(affected_players)} players affected" ) except Exception as e: logger.error(f"Error in status effects processor: {e}") except Exception as e: logger.error(f"โŒ Error in status effects task: {e}", exc_info=True) await asyncio.sleep(10) # ============================================================================ # TASK STARTUP FUNCTION # ============================================================================ def acquire_background_tasks_lock() -> bool: """ Try to acquire an exclusive lock for running background tasks. Only one worker across all Gunicorn processes should succeed. Returns True if lock acquired, False otherwise. """ global _lock_file_handle try: # Open lock file (create if doesn't exist) _lock_file_handle = os.open(LOCK_FILE_PATH, os.O_CREAT | os.O_RDWR) # Try to acquire exclusive, non-blocking lock fcntl.flock(_lock_file_handle, fcntl.LOCK_EX | fcntl.LOCK_NB) logger.info("๐Ÿ”’ Successfully acquired background tasks lock") return True except (IOError, OSError) as e: # Lock already held by another worker if _lock_file_handle is not None: try: os.close(_lock_file_handle) except: pass _lock_file_handle = None return False def release_background_tasks_lock(): """Release the background tasks lock.""" global _lock_file_handle if _lock_file_handle is not None: try: fcntl.flock(_lock_file_handle, fcntl.LOCK_UN) os.close(_lock_file_handle) logger.info("๐Ÿ”“ Released background tasks lock") except Exception as e: logger.error(f"Error releasing lock: {e}") finally: _lock_file_handle = None async def start_background_tasks(manager=None, world_locations=None): """ Start all background tasks. Called when the API starts up. Only runs in ONE worker (the first one to acquire the lock). Args: manager: WebSocket ConnectionManager for broadcasting events world_locations: Dict of Location objects for interactable mapping """ # Try to acquire lock - only one worker will succeed if not acquire_background_tasks_lock(): logger.info("โญ๏ธ Background tasks already running in another worker, skipping...") return [] logger.info("๐Ÿš€ Starting background tasks in this worker...") # Create tasks for all background jobs tasks = [ asyncio.create_task(spawn_manager_loop(manager)), asyncio.create_task(decay_dropped_items(manager)), asyncio.create_task(regenerate_stamina(manager)), asyncio.create_task(check_combat_timers()), asyncio.create_task(check_pvp_combat_timers(manager)), asyncio.create_task(decay_corpses(manager)), asyncio.create_task(process_status_effects(manager)), # Note: Interactable cooldowns are handled client-side with server validation ] logger.info(f"โœ… Started {len(tasks)} background tasks") return tasks async def stop_background_tasks(tasks): """Stop all background tasks and release the lock.""" if not tasks: return logger.info("๐Ÿ›‘ Shutting down background tasks...") for task in tasks: task.cancel() # Wait for tasks to finish canceling await asyncio.gather(*tasks, return_exceptions=True) # Release the lock release_background_tasks_lock() logger.info("โœ… Background tasks stopped") # ============================================================================ # MONITORING / DEBUG FUNCTIONS # ============================================================================ async def get_spawn_stats() -> Dict: """Get statistics about current spawns (for debugging/monitoring).""" all_enemies = await db.get_all_active_wandering_enemies() # Count by location location_counts = {} for enemy in all_enemies: loc = enemy['location_id'] location_counts[loc] = location_counts.get(loc, 0) + 1 return { "total_active": len(all_enemies), "by_location": location_counts, "enemies": all_enemies }