""" Standalone database module for the API. All database operations are contained here, making the API independent. """ import os from typing import Optional, List, Dict, Any from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession from sqlalchemy.orm import sessionmaker from sqlalchemy import ( MetaData, Table, Column, Integer, String, Boolean, ForeignKey, Float, JSON, select, insert, update, delete, and_, or_, text, UniqueConstraint ) import time import logging from . import items # Configure logging logger = logging.getLogger(__name__) # Redis manager for caching (imported globally to avoid circular imports) # Will be None if Redis is not available redis_manager = None try: from .redis_manager import redis_manager as _redis_manager redis_manager = _redis_manager except ImportError: pass # Redis not available, caching disabled # Database connection DB_USER = os.getenv("POSTGRES_USER") DB_PASS = os.getenv("POSTGRES_PASSWORD") DB_NAME = os.getenv("POSTGRES_DB") DB_HOST = os.getenv("POSTGRES_HOST", "echoes_of_the_ashes_db") DB_PORT = os.getenv("POSTGRES_PORT", "5432") DATABASE_URL = f"postgresql+psycopg://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}" engine = create_async_engine( DATABASE_URL, echo=False, pool_size=20, # Increased from default 5 to support 8 workers max_overflow=30, # Allow bursts up to 50 total connections pool_timeout=30, # Wait up to 30s for connection pool_recycle=3600, # Recycle connections every hour pool_pre_ping=True # Verify connections before use ) async_session_maker = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) metadata = MetaData() # Define all tables # Authentication: Accounts table accounts = Table( "accounts", metadata, Column("id", Integer, primary_key=True, autoincrement=True), Column("email", String(255), unique=True, nullable=False), Column("password_hash", String(255), nullable=True), # NULL for Steam/OAuth Column("steam_id", String(255), unique=True, nullable=True), Column("account_type", 
String(20), default="web"), # 'web' or 'steam' Column("premium_expires_at", Float, nullable=True), # NULL = lifetime premium Column("email_verified", Boolean, default=False), Column("email_verification_token", String(255), nullable=True), Column("password_reset_token", String(255), nullable=True), Column("password_reset_expires", Float, nullable=True), Column("created_at", Float, default=lambda: time.time()), Column("last_login_at", Float, nullable=True), ) # Gameplay: Characters table characters = Table( "characters", metadata, Column("id", Integer, primary_key=True, autoincrement=True), Column("account_id", Integer, ForeignKey("accounts.id", ondelete="CASCADE"), nullable=False), Column("name", String(100), unique=True, nullable=False), # Character name (unique across all) Column("avatar_data", String, nullable=True), # JSON for avatar customization # RPG Stats Column("level", Integer, default=1), Column("xp", Integer, default=0), Column("hp", Integer, default=100), Column("max_hp", Integer, default=100), Column("stamina", Integer, default=100), Column("max_stamina", Integer, default=100), # Base Attributes Column("strength", Integer, default=0), Column("agility", Integer, default=0), Column("endurance", Integer, default=0), Column("intellect", Integer, default=0), Column("unspent_points", Integer, default=20), # Initial stat points to allocate # Game State Column("location_id", String, default="cabin"), Column("is_dead", Boolean, default=False), Column("last_movement_time", Float, default=0), # Timestamps Column("created_at", Float, default=lambda: time.time()), Column("last_played_at", Float, default=lambda: time.time()), ) # DEPRECATED: Old players table (kept temporarily for reference) players = Table( "players", metadata, Column("id", Integer, primary_key=True, autoincrement=True), Column("username", String(50), unique=True, nullable=True), # For web users Column("password_hash", String(255), nullable=True), # For web users Column("name", String, 
default="Survivor"), Column("hp", Integer, default=100), Column("max_hp", Integer, default=100), Column("stamina", Integer, default=20), Column("max_stamina", Integer, default=20), Column("strength", Integer, default=5), Column("agility", Integer, default=5), Column("endurance", Integer, default=5), Column("intellect", Integer, default=5), Column("location_id", String, default="start_point"), Column("is_dead", Boolean, default=False), Column("level", Integer, default=1), Column("xp", Integer, default=0), Column("unspent_points", Integer, default=0), Column("last_movement_time", Float, default=0), # Timestamp of last movement for cooldown ) # Unique items table - single source of truth for individual item instances unique_items = Table( "unique_items", metadata, Column("id", Integer, primary_key=True, autoincrement=True), Column("item_id", String, nullable=False), # References item template in items.json Column("durability", Integer, nullable=True), Column("max_durability", Integer, nullable=True), Column("tier", Integer, default=1), Column("unique_stats", JSON, nullable=True), Column("created_at", Float, default=lambda: time.time()), ) inventory = Table( "inventory", metadata, Column("id", Integer, primary_key=True, autoincrement=True), Column("character_id", Integer, ForeignKey("characters.id", ondelete="CASCADE")), Column("item_id", String), # For stackable items Column("quantity", Integer, default=1), Column("is_equipped", Boolean, default=False), Column("unique_item_id", Integer, ForeignKey("unique_items.id", ondelete="CASCADE"), nullable=True), # For unique items # Old columns kept for backward compatibility (can be removed in future) Column("durability", Integer, nullable=True), Column("max_durability", Integer, nullable=True), Column("tier", Integer, nullable=True), Column("unique_stats", JSON, nullable=True), ) dropped_items = Table( "dropped_items", metadata, Column("id", Integer, primary_key=True, autoincrement=True), Column("item_id", String), # For 
stackable items Column("quantity", Integer, default=1), Column("location_id", String), Column("drop_timestamp", Float), Column("unique_item_id", Integer, ForeignKey("unique_items.id", ondelete="CASCADE"), nullable=True), # For unique items # Old columns kept for backward compatibility (can be removed in future) Column("durability", Integer, nullable=True), Column("max_durability", Integer, nullable=True), Column("tier", Integer, default=1), Column("unique_stats", JSON, nullable=True), ) active_combats = Table( "active_combats", metadata, Column("id", Integer, primary_key=True, autoincrement=True), Column("character_id", Integer, ForeignKey("characters.id", ondelete="CASCADE"), unique=True), Column("npc_id", String, nullable=False), Column("npc_hp", Integer, nullable=False), Column("npc_max_hp", Integer, nullable=False), Column("turn", String, nullable=False), # "player" or "npc" Column("turn_started_at", Float, nullable=False), Column("player_status_effects", String, default=""), Column("npc_status_effects", String, default=""), Column("location_id", String, nullable=False), Column("from_wandering_enemy", Boolean, default=False), Column("npc_intent", String, default="attack"), ) pvp_combats = Table( "pvp_combats", metadata, Column("id", Integer, primary_key=True, autoincrement=True), Column("attacker_character_id", Integer, ForeignKey("characters.id", ondelete="CASCADE"), nullable=False), Column("defender_character_id", Integer, ForeignKey("characters.id", ondelete="CASCADE"), nullable=False), Column("turn", String, nullable=False), # "attacker" or "defender" Column("turn_started_at", Float, nullable=False), Column("turn_timeout_seconds", Integer, default=300), # 5 minutes default Column("location_id", String, nullable=False), Column("created_at", Float, nullable=False), Column("attacker_fled", Boolean, default=False), Column("defender_fled", Boolean, default=False), Column("last_action", String, nullable=True), # Last combat action message 
Column("attacker_acknowledged", Boolean, default=False), # Has attacker acknowledged combat end Column("defender_acknowledged", Boolean, default=False), # Has defender acknowledged combat end ) player_corpses = Table( "player_corpses", metadata, Column("id", Integer, primary_key=True, autoincrement=True), Column("player_name", String, nullable=False), Column("location_id", String, nullable=False), Column("items", String, nullable=False), # JSON string Column("death_timestamp", Float, nullable=False), ) npc_corpses = Table( "npc_corpses", metadata, Column("id", Integer, primary_key=True, autoincrement=True), Column("npc_id", String, nullable=False), Column("location_id", String, nullable=False), Column("loot_remaining", String, nullable=False), Column("death_timestamp", Float, nullable=False), ) interactable_cooldowns = Table( "interactable_cooldowns", metadata, Column("id", Integer, primary_key=True, autoincrement=True), Column("interactable_instance_id", String, nullable=False), Column("action_id", String, nullable=False), Column("expiry_timestamp", Float, nullable=False), # Composite unique constraint: same interactable + action can only have one cooldown UniqueConstraint("interactable_instance_id", "action_id", name="uix_interactable_action") ) wandering_enemies = Table( "wandering_enemies", metadata, Column("id", Integer, primary_key=True, autoincrement=True), Column("npc_id", String, nullable=False), Column("location_id", String, nullable=False), Column("spawn_timestamp", Float, nullable=False), Column("despawn_timestamp", Float, nullable=False), ) image_cache = Table( "image_cache", metadata, Column("id", Integer, primary_key=True, autoincrement=True), Column("image_path", String, nullable=False, unique=True), Column("telegram_file_id", String, nullable=False), Column("uploaded_at", Float, nullable=False), ) player_status_effects = Table( "player_status_effects", metadata, Column("id", Integer, primary_key=True, autoincrement=True), Column("character_id", 
Integer, ForeignKey("characters.id", ondelete="CASCADE"), nullable=False), Column("effect_name", String(50), nullable=False), Column("effect_icon", String(10), nullable=False), Column("damage_per_tick", Integer, nullable=False, default=0), Column("ticks_remaining", Integer, nullable=False), Column("applied_at", Float, nullable=False), ) player_statistics = Table( "player_statistics", metadata, Column("id", Integer, primary_key=True, autoincrement=True), Column("character_id", Integer, ForeignKey("characters.id", ondelete="CASCADE"), nullable=False, unique=True), Column("distance_walked", Integer, default=0), # Number of location moves Column("enemies_killed", Integer, default=0), Column("damage_dealt", Integer, default=0), Column("damage_taken", Integer, default=0), Column("hp_restored", Integer, default=0), Column("stamina_used", Integer, default=0), Column("stamina_restored", Integer, default=0), Column("items_collected", Integer, default=0), Column("items_dropped", Integer, default=0), Column("items_used", Integer, default=0), Column("deaths", Integer, default=0), Column("successful_flees", Integer, default=0), Column("failed_flees", Integer, default=0), Column("combats_initiated", Integer, default=0), # PvP Statistics Column("pvp_combats_initiated", Integer, default=0), Column("pvp_combats_won", Integer, default=0), Column("pvp_combats_lost", Integer, default=0), Column("pvp_damage_dealt", Integer, default=0), Column("pvp_damage_taken", Integer, default=0), Column("players_killed", Integer, default=0), Column("pvp_deaths", Integer, default=0), Column("pvp_successful_flees", Integer, default=0), Column("pvp_failed_flees", Integer, default=0), Column("pvp_attacks_landed", Integer, default=0), Column("pvp_attacks_received", Integer, default=0), Column("total_playtime", Integer, default=0), # Seconds Column("last_activity", Float, nullable=True), Column("created_at", Float, nullable=False), ) # Database session context manager class DatabaseSession: """Context 
# Database session context manager
class DatabaseSession:
    """Async context manager that hands out a fresh ``AsyncSession``.

    Leaving the ``async with`` body normally commits the session; leaving it
    through an exception rolls the transaction back. The session is closed in
    both cases, including when the commit or rollback itself fails.
    """

    def __init__(self):
        self.session: Optional[AsyncSession] = None

    async def __aenter__(self):
        self.session = async_session_maker()
        return self.session

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        session = self.session
        if not session:
            return
        try:
            if exc_type is not None:
                await session.rollback()
            else:
                await session.commit()
        finally:
            # Runs even when commit()/rollback() raises, so the connection is
            # always handed back to the pool instead of leaking.
            await session.close()


# Initialize database
async def init_db():
    """Create all tables and the performance indexes if they don't exist.

    Every index statement uses ``IF NOT EXISTS``, so calling this on an
    already-initialised database is safe.
    """
    index_statements = (
        # Players table (deprecated, still queried by username)
        "CREATE INDEX IF NOT EXISTS idx_players_username ON players(username);",
        "CREATE INDEX IF NOT EXISTS idx_players_location_id ON players(location_id);",
        # Characters - filtered by location and by owning account in this module
        "CREATE INDEX IF NOT EXISTS idx_characters_location_id ON characters(location_id);",
        "CREATE INDEX IF NOT EXISTS idx_characters_account_id ON characters(account_id);",
        # Dropped items - queried on every location view
        "CREATE INDEX IF NOT EXISTS idx_dropped_items_location ON dropped_items(location_id);",
        # Wandering enemies - checked frequently
        "CREATE INDEX IF NOT EXISTS idx_wandering_enemies_location ON wandering_enemies(location_id);",
        "CREATE INDEX IF NOT EXISTS idx_wandering_enemies_despawn ON wandering_enemies(despawn_timestamp);",
        # Inventory - queried on every inventory operation
        "CREATE INDEX IF NOT EXISTS idx_inventory_character_item ON inventory(character_id, item_id);",
        "CREATE INDEX IF NOT EXISTS idx_inventory_character ON inventory(character_id);",
        # Active combats - checked on most actions
        "CREATE INDEX IF NOT EXISTS idx_active_combats_character ON active_combats(character_id);",
        # Interactable cooldowns - checked on interact attempts
        "CREATE INDEX IF NOT EXISTS idx_interactable_cooldowns_instance ON interactable_cooldowns(interactable_instance_id);",
    )
    async with engine.begin() as conn:
        await conn.run_sync(metadata.create_all)
        for index_sql in index_statements:
            await conn.execute(text(index_sql))
async def get_player_by_username(username: str) -> Optional[Dict[str, Any]]:
    """Fetch a legacy ``players`` row by username (web users).

    Returns the row as a dict, or ``None`` when nothing matches.
    """
    query = select(players).where(players.c.username == username)
    async with DatabaseSession() as session:
        match = (await session.execute(query)).mappings().first()
        return dict(match) if match is not None else None


async def get_players_in_location(location_id: str) -> List[Dict[str, Any]]:
    """Return every ``characters`` row whose ``location_id`` equals *location_id*."""
    query = select(characters).where(characters.c.location_id == location_id)
    async with DatabaseSession() as session:
        found = (await session.execute(query)).mappings().all()
        return [dict(entry) for entry in found]


async def create_player(
    username: Optional[str] = None,
    password_hash: Optional[str] = None,
    name: str = "Survivor"
) -> Dict[str, Any]:
    """Insert a legacy ``players`` row with the fixed starting stats and return it as a dict."""
    starting_values = {
        "username": username,
        "password_hash": password_hash,
        "name": name,
        "hp": 100,
        "max_hp": 100,
        "stamina": 20,
        "max_stamina": 20,
        "strength": 5,
        "agility": 5,
        "endurance": 5,
        "intellect": 5,
        "location_id": "start_point",
        "is_dead": False,
        "level": 1,
        "xp": 0,
        "unspent_points": 0,
    }
    creation = insert(players).values(**starting_values).returning(players)
    async with DatabaseSession() as session:
        created = (await session.execute(creation)).mappings().first()
        await session.commit()
        return dict(created) if created is not None else None


async def update_player_location(player_id: int, location_id: str) -> bool:
    """Set the player's ``location_id`` via ``update_player``."""
    outcome = await update_player(player_id, location_id=location_id)
    return outcome


async def update_player_hp(player_id: int, hp: int) -> bool:
    """Set the player's ``hp`` via ``update_player``."""
    outcome = await update_player(player_id, hp=hp)
    return outcome


async def update_player_stamina(player_id: int, stamina: int) -> bool:
    """Set the player's ``stamina`` via ``update_player``."""
    outcome = await update_player(player_id, stamina=stamina)
    return outcome
# ========================================================================
# NEW ACCOUNT AND CHARACTER OPERATIONS
# ========================================================================

# Account operations

async def _fetch_account(criterion) -> Optional[Dict[str, Any]]:
    """Return the first ``accounts`` row matching *criterion* as a dict, else ``None``."""
    query = select(accounts).where(criterion)
    async with DatabaseSession() as session:
        hit = (await session.execute(query)).mappings().first()
        return dict(hit) if hit is not None else None


async def get_account_by_email(email: str) -> Optional[Dict[str, Any]]:
    """Look up an account by its email address."""
    return await _fetch_account(accounts.c.email == email)


async def get_account_by_id(account_id: int) -> Optional[Dict[str, Any]]:
    """Look up an account by primary key."""
    return await _fetch_account(accounts.c.id == account_id)


async def get_account_by_steam_id(steam_id: str) -> Optional[Dict[str, Any]]:
    """Look up an account by its Steam ID."""
    return await _fetch_account(accounts.c.steam_id == steam_id)


async def create_account(
    email: str,
    password_hash: Optional[str] = None,
    steam_id: Optional[str] = None,
    account_type: str = "web"
) -> Dict[str, Any]:
    """Insert a new, unverified account and return the stored row as a dict."""
    creation = insert(accounts).values(
        email=email,
        password_hash=password_hash,
        steam_id=steam_id,
        account_type=account_type,
        # Far-future expiry: new accounts are premium by default for testing
        # (is_premium_account treats NULL as free tier).
        premium_expires_at=999999999999,
        email_verified=False,
        created_at=time.time(),
        last_login_at=time.time(),
    ).returning(accounts)
    async with DatabaseSession() as session:
        stored = (await session.execute(creation)).mappings().first()
        await session.commit()
        return dict(stored) if stored is not None else None


async def update_account(account_id: int, **kwargs) -> bool:
    """Write the given column values to one account row; always returns ``True``."""
    change = update(accounts).where(accounts.c.id == account_id).values(**kwargs)
    async with DatabaseSession() as session:
        await session.execute(change)
        await session.commit()
        return True


async def update_account_last_login(account_id: int) -> bool:
    """Stamp ``last_login_at`` with the current time."""
    now = time.time()
    return await update_account(account_id, last_login_at=now)
async def update_account_email(account_id: int, new_email: str) -> bool:
    """Change an account's email address.

    Raises:
        ValueError: if *new_email* already belongs to a different account.
    """
    owner = await get_account_by_email(new_email)
    # Free to use when nobody has the address, or when it is already ours.
    if not owner or owner['id'] == account_id:
        return await update_account(account_id, email=new_email)
    raise ValueError("Email already in use by another account")


async def update_account_password(account_id: int, password_hash: str) -> bool:
    """Replace the stored password hash of an account."""
    outcome = await update_account(account_id, password_hash=password_hash)
    return outcome


# Character operations

async def get_character_by_id(character_id: int) -> Optional[Dict[str, Any]]:
    """Return the character with this primary key as a dict, or ``None``."""
    query = select(characters).where(characters.c.id == character_id)
    async with DatabaseSession() as session:
        hit = (await session.execute(query)).mappings().first()
        return dict(hit) if hit is not None else None


async def get_character_by_name(name: str) -> Optional[Dict[str, Any]]:
    """Return the character with this name as a dict, or ``None`` (uniqueness check)."""
    query = select(characters).where(characters.c.name == name)
    async with DatabaseSession() as session:
        hit = (await session.execute(query)).mappings().first()
        return dict(hit) if hit is not None else None


async def get_characters_by_account_id(account_id: int) -> List[Dict[str, Any]]:
    """List an account's characters, most recently played first."""
    query = (
        select(characters)
        .where(characters.c.account_id == account_id)
        .order_by(characters.c.last_played_at.desc())
    )
    async with DatabaseSession() as session:
        found = (await session.execute(query)).mappings().all()
        return [dict(entry) for entry in found]


async def get_characters_in_location(location_id: str) -> List[Dict[str, Any]]:
    """List every character whose ``location_id`` equals *location_id*."""
    query = select(characters).where(characters.c.location_id == location_id)
    async with DatabaseSession() as session:
        found = (await session.execute(query)).mappings().all()
        return [dict(entry) for entry in found]
async def create_character(
    account_id: int,
    name: str,
    strength: int = 0,
    agility: int = 0,
    endurance: int = 0,
    intellect: int = 0,
    avatar_data: Optional[str] = None
) -> Dict[str, Any]:
    """Insert a level-1 character for *account_id* and return the stored row as a dict.

    Starting HP is ``30 + 2 * strength`` and starting stamina is
    ``20 + endurance``; both also become the respective maximums.
    """
    # Derived stats based on attributes
    starting_hp = 30 + (strength * 2)
    starting_stamina = 20 + (endurance * 1)

    creation = insert(characters).values(
        account_id=account_id,
        name=name,
        avatar_data=avatar_data,
        level=1,
        xp=0,
        hp=starting_hp,
        max_hp=starting_hp,
        stamina=starting_stamina,
        max_stamina=starting_stamina,
        strength=strength,
        agility=agility,
        endurance=endurance,
        intellect=intellect,
        unspent_points=0,  # All points allocated at creation
        location_id="start_point",
        is_dead=False,
        last_movement_time=0,
        created_at=time.time(),
        last_played_at=time.time(),
    ).returning(characters)
    async with DatabaseSession() as session:
        stored = (await session.execute(creation)).mappings().first()
        await session.commit()
        return dict(stored) if stored is not None else None


async def update_character(character_id: int, **kwargs) -> bool:
    """Write the given column values to one character row; always returns ``True``."""
    change = update(characters).where(characters.c.id == character_id).values(**kwargs)
    async with DatabaseSession() as session:
        await session.execute(change)
        await session.commit()
        return True


async def update_character_last_played(character_id: int) -> bool:
    """Stamp ``last_played_at`` with the current time."""
    now = time.time()
    return await update_character(character_id, last_played_at=now)


# Backward compatibility alias
async def update_player(player_id: int, **kwargs) -> bool:
    """Legacy name for ``update_character``; *player_id* is a character id."""
    outcome = await update_character(player_id, **kwargs)
    return outcome


# Backward compatibility alias
async def get_player_by_id(player_id: int) -> dict:
    """Legacy name for ``get_character_by_id``; returns whatever that returns."""
    character = await get_character_by_id(player_id)
    return character


async def delete_character(character_id: int) -> bool:
    """Delete one character row (related rows rely on ``ON DELETE CASCADE``); always returns ``True``."""
    removal = delete(characters).where(characters.c.id == character_id)
    async with DatabaseSession() as session:
        await session.execute(removal)
        await session.commit()
        return True
async def count_account_characters(account_id: int) -> int:
    """Return how many characters belong to *account_id*.

    The count is computed in SQL (``SELECT count(*)``), so no character rows
    are transferred just to be counted.
    """
    # Function-scope import: `func` is not among the names this module
    # imports from sqlalchemy at the top of the file.
    from sqlalchemy import func

    query = (
        select(func.count())
        .select_from(characters)
        .where(characters.c.account_id == account_id)
    )
    async with DatabaseSession() as session:
        return (await session.execute(query)).scalar_one()


async def is_premium_account(account_id: int) -> bool:
    """Return ``True`` while the account's premium period is still running.

    ``premium_expires_at`` is interpreted as follows:

    * ``NULL``    -> free tier, never premium
    * timestamp   -> premium (or trial) until that Unix time

    An unknown *account_id* is reported as not premium.
    """
    account = await get_account_by_id(account_id)
    if not account:
        return False

    premium_expires = account.get('premium_expires_at')
    if premium_expires is None:
        # NULL = free tier (a timestamp marks the end of premium/trial)
        return False

    return premium_expires > time.time()


async def can_create_character(account_id: int) -> tuple[bool, str]:
    """Check whether the account still has a free character slot.

    Returns:
        ``(True, "")`` when another character may be created, otherwise
        ``(False, reason)`` with a user-facing message. Premium accounts get
        10 slots, free accounts 1.
    """
    char_count = await count_account_characters(account_id)
    is_premium = await is_premium_account(account_id)

    max_chars = 10 if is_premium else 1
    if char_count < max_chars:
        return True, ""
    if is_premium:
        return False, f"Maximum {max_chars} characters reached"
    return False, "Free accounts can only have 1 character. Upgrade to premium for 10 character slots!"


# ========================================================================
# END NEW ACCOUNT AND CHARACTER OPERATIONS
# ========================================================================

# Inventory operations
# NOTE: Functions below use 'player_id' parameter name for backward compatibility
# but internally map to 'character_id' in the new schema

async def get_inventory(player_id: int) -> List[Dict[str, Any]]:
    """Return the character's inventory rows as dicts (player_id maps to character_id).

    A Redis-cached copy is returned when one exists; otherwise the database
    is queried and the result is written back to the cache. Redis failures
    are logged and never propagate to the caller.
    """
    cache_enabled = bool(redis_manager and redis_manager.redis_client)

    # Try Redis cache first
    if cache_enabled:
        try:
            cached = await redis_manager.get_cached_inventory(player_id)
            if cached is not None:
                return cached
        except Exception as e:  # best-effort cache: fall through to the database
            logger.warning(f"Redis cache read failed for inventory {player_id}: {e}")

    # Cache miss or Redis unavailable - query database
    async with DatabaseSession() as session:
        result = await session.execute(
            select(inventory).where(inventory.c.character_id == player_id)
        )
        inventory_data = [dict(row._mapping) for row in result.fetchall()]

    # Cache the result
    if cache_enabled:
        try:
            await redis_manager.cache_inventory(player_id, inventory_data)
        except Exception as e:  # best-effort cache: the DB result is still valid
            logger.warning(f"Redis cache write failed for inventory {player_id}: {e}")

    return inventory_data


async def clear_inventory(player_id: int) -> bool:
    """Delete every inventory row of a character (used when creating a corpse).

    The cached inventory is invalidated afterwards; a failing invalidation is
    only logged. Always returns ``True``.
    """
    async with DatabaseSession() as session:
        await session.execute(
            delete(inventory).where(inventory.c.character_id == player_id)
        )
        await session.commit()

    # Invalidate cache
    if redis_manager and redis_manager.redis_client:
        try:
            await redis_manager.invalidate_inventory(player_id)
        except Exception as e:  # best-effort: the rows are already deleted
            logger.warning(f"Redis cache invalidation failed for inventory {player_id}: {e}")

    return True
async def add_item_to_inventory(
    player_id: int,
    item_id: str,
    quantity: int = 1,
    unique_item_id: Optional[int] = None,  # Reference to existing unique_item
    durability: Optional[int] = None,  # For creating new unique items
    max_durability: Optional[int] = None,
    tier: Optional[int] = None,
    unique_stats: Optional[Dict[str, Any]] = None
) -> bool:
    """Add an item to a character's inventory (player_id maps to character_id).

    Unique items: pass ``unique_item_id`` to reference an existing unique
    item, or any of ``durability`` / ``tier`` / ``unique_stats`` to create a
    new one. They always get their own row with quantity 1.

    Stackable items: pass only ``item_id`` and ``quantity``. An existing
    non-unique stack of the same item is incremented, otherwise a new row is
    inserted.

    The cached inventory is invalidated afterwards (failures are only
    logged). Always returns ``True``.
    """
    # An item is unique when it references a unique_item or carries per-instance data.
    is_unique = unique_item_id is not None or any(
        value is not None for value in (durability, tier, unique_stats)
    )

    async with DatabaseSession() as session:
        if is_unique:
            # Create unique_item if needed
            if unique_item_id is None:
                unique_item_id = await create_unique_item(
                    item_id=item_id,
                    durability=durability,
                    max_durability=max_durability,
                    tier=tier,
                    unique_stats=unique_stats
                )

            # Inventory row referencing the unique_item
            stmt = insert(inventory).values(
                character_id=player_id,
                item_id=item_id,
                quantity=1,  # Unique items are always quantity 1
                is_equipped=False,
                unique_item_id=unique_item_id
            )
        else:
            # Stackable items - look for an existing stack to grow
            result = await session.execute(
                select(inventory).where(
                    and_(
                        inventory.c.character_id == player_id,
                        inventory.c.item_id == item_id,
                        inventory.c.unique_item_id.is_(None)  # Only stack with other stackable items
                    )
                )
            )
            existing = result.first()

            if existing:
                # Increment in SQL rather than writing back a value read
                # earlier, so two concurrent adds cannot overwrite each other.
                stmt = update(inventory).where(
                    inventory.c.id == existing.id
                ).values(quantity=inventory.c.quantity + quantity)
            else:
                stmt = insert(inventory).values(
                    character_id=player_id,
                    item_id=item_id,
                    quantity=quantity,
                    is_equipped=False
                )

        await session.execute(stmt)
        await session.commit()

    # Invalidate cache
    if redis_manager and redis_manager.redis_client:
        try:
            await redis_manager.invalidate_inventory(player_id)
        except Exception as e:  # best-effort: the row is already stored
            logger.warning(f"Redis cache invalidation failed for inventory {player_id}: {e}")

    return True
# Combat operations

async def get_active_combat(player_id: int) -> Optional[Dict[str, Any]]:
    """Return the character's active NPC combat row as a dict, or ``None``."""
    query = select(active_combats).where(active_combats.c.character_id == player_id)
    async with DatabaseSession() as session:
        hit = (await session.execute(query)).mappings().first()
        return dict(hit) if hit is not None else None


async def create_combat(player_id: int, npc_id: str, npc_hp: int, npc_max_hp: int, location_id: str, from_wandering: bool = False, npc_intent: str = 'attack') -> Dict[str, Any]:
    """Start an NPC combat for the character and return the stored row as a dict.

    The player always gets the first turn and both status-effect strings
    start empty.
    """
    opening_state = {
        "character_id": player_id,
        "npc_id": npc_id,
        "npc_hp": npc_hp,
        "npc_max_hp": npc_max_hp,
        "turn": "player",
        "turn_started_at": time.time(),
        "player_status_effects": "",
        "npc_status_effects": "",
        "location_id": location_id,
        "from_wandering_enemy": from_wandering,
        "npc_intent": npc_intent,
    }
    creation = insert(active_combats).values(**opening_state).returning(active_combats)
    async with DatabaseSession() as session:
        stored = (await session.execute(creation)).mappings().first()
        await session.commit()
        return dict(stored) if stored is not None else None


async def update_combat(player_id: int, updates: dict) -> bool:
    """Apply *updates* (column -> value) to the character's combat row; always returns ``True``."""
    change = (
        update(active_combats)
        .where(active_combats.c.character_id == player_id)
        .values(**updates)
    )
    async with DatabaseSession() as session:
        await session.execute(change)
        await session.commit()
        return True


async def end_combat(player_id: int) -> bool:
    """Delete the character's combat row; always returns ``True``."""
    removal = delete(active_combats).where(active_combats.c.character_id == player_id)
    async with DatabaseSession() as session:
        await session.execute(removal)
        await session.commit()
        return True
# PvP Combat Functions

async def create_pvp_combat(attacker_id: int, defender_id: int, location_id: str, turn_timeout: int = 300) -> Dict[str, Any]:
    """Create a new PvP combat and return the stored row as a dict.

    The first turn goes to the defender. The row is read back through
    ``RETURNING`` (as ``create_combat`` does), so no second session or query
    is needed while the inserting session is still open.
    """
    async with DatabaseSession() as session:
        stmt = insert(pvp_combats).values(
            attacker_character_id=attacker_id,
            defender_character_id=defender_id,
            turn='defender',  # Defender goes first
            turn_started_at=time.time(),
            turn_timeout_seconds=turn_timeout,
            location_id=location_id,
            created_at=time.time(),
            attacker_fled=False,
            defender_fled=False
        ).returning(pvp_combats)

        result = await session.execute(stmt)
        row = result.first()
        await session.commit()
        return dict(row._mapping) if row else None


async def get_pvp_combat_by_player(player_id: int) -> Optional[Dict[str, Any]]:
    """Return a PvP combat involving the player (as attacker or defender), or ``None``."""
    async with DatabaseSession() as session:
        stmt = select(pvp_combats).where(
            or_(
                pvp_combats.c.attacker_character_id == player_id,
                pvp_combats.c.defender_character_id == player_id
            )
        )
        result = await session.execute(stmt)
        row = result.fetchone()
        return dict(row._mapping) if row else None


async def get_pvp_combat_by_id(combat_id: int) -> Optional[Dict[str, Any]]:
    """Return the PvP combat with this primary key as a dict, or ``None``."""
    async with DatabaseSession() as session:
        stmt = select(pvp_combats).where(pvp_combats.c.id == combat_id)
        result = await session.execute(stmt)
        row = result.fetchone()
        return dict(row._mapping) if row else None


async def update_pvp_combat(combat_id: int, updates: dict) -> bool:
    """Apply *updates* (column -> value) to one PvP combat row; always returns ``True``."""
    async with DatabaseSession() as session:
        stmt = update(pvp_combats).where(pvp_combats.c.id == combat_id).values(**updates)
        await session.execute(stmt)
        await session.commit()
        return True


async def end_pvp_combat(combat_id: int) -> bool:
    """Mark PvP combat as ended (intentionally a no-op that returns ``True``).

    A combat counts as ended through its ``attacker_fled`` / ``defender_fled``
    flags or by HP reaching 0. The row itself is kept until both players
    acknowledge the result (see ``acknowledge_pvp_combat``).
    """
    return True
def _pvp_ack_column(combat, player_id: int) -> Optional[str]:
    """Return the acknowledgment column for *player_id*, or ``None`` if not a participant."""
    if combat.attacker_character_id == player_id:
        return "attacker_acknowledged"
    if combat.defender_character_id == player_id:
        return "defender_acknowledged"
    return None


async def acknowledge_pvp_combat(combat_id: int, player_id: int) -> bool:
    """Record that a participant has acknowledged the end of a PvP combat.

    Once both the attacker and the defender have acknowledged, the combat row
    is deleted.

    Returns:
        ``False`` when the combat does not exist or *player_id* is neither
        its attacker nor its defender; ``True`` otherwise.
    """
    async with DatabaseSession() as session:
        # Get the combat
        result = await session.execute(
            select(pvp_combats).where(pvp_combats.c.id == combat_id)
        )
        combat = result.fetchone()
        if not combat:
            return False

        ack_column = _pvp_ack_column(combat, player_id)
        if ack_column is None:
            # A non-participant must not be able to acknowledge on behalf of
            # the defender.
            return False

        # Mark as acknowledged
        await session.execute(
            update(pvp_combats)
            .where(pvp_combats.c.id == combat_id)
            .values(**{ack_column: True})
        )
        await session.commit()

        # Check if both acknowledged - if so, delete the combat
        result = await session.execute(
            select(pvp_combats).where(pvp_combats.c.id == combat_id)
        )
        combat = result.fetchone()
        if combat and combat.attacker_acknowledged and combat.defender_acknowledged:
            await session.execute(
                delete(pvp_combats).where(pvp_combats.c.id == combat_id)
            )
            await session.commit()

        return True


async def get_all_pvp_combats() -> list:
    """Return every row of ``pvp_combats`` as a list of dicts."""
    async with DatabaseSession() as session:
        result = await session.execute(select(pvp_combats))
        return [dict(row._mapping) for row in result.fetchall()]
else: stmt = insert(interactable_cooldowns).values( interactable_instance_id=instance_id, action_id=action_id, expiry_timestamp=expiry ) await session.execute(stmt) await session.commit() return True async def get_interactable_cooldown(instance_id: str, action_id: str) -> Optional[float]: """Get cooldown expiry timestamp for a specific interactable action""" async with DatabaseSession() as session: result = await session.execute( select(interactable_cooldowns).where( and_( interactable_cooldowns.c.interactable_instance_id == instance_id, interactable_cooldowns.c.action_id == action_id ) ) ) row = result.first() if row and row.expiry_timestamp > time.time(): return row.expiry_timestamp return None async def get_expired_interactable_cooldowns() -> List[Dict[str, Any]]: """Get all interactable cooldowns that have expired""" async with DatabaseSession() as session: current_time = time.time() result = await session.execute( select(interactable_cooldowns).where( interactable_cooldowns.c.expiry_timestamp <= current_time ) ) rows = result.fetchall() return [ { "instance_id": row.interactable_instance_id, "action_id": row.action_id, "expiry_timestamp": row.expiry_timestamp } for row in rows ] async def remove_expired_interactable_cooldowns() -> int: """Remove all expired interactable cooldowns and return count removed""" async with DatabaseSession() as session: current_time = time.time() stmt = delete(interactable_cooldowns).where( interactable_cooldowns.c.expiry_timestamp <= current_time ) result = await session.execute(stmt) await session.commit() return result.rowcount # Dropped items async def get_dropped_items(location_id: str) -> List[Dict[str, Any]]: """Get all dropped items at a location""" async with DatabaseSession() as session: result = await session.execute( select(dropped_items).where(dropped_items.c.location_id == location_id) ) return [dict(row._mapping) for row in result.fetchall()] async def add_dropped_item( location_id: str, item_id: str, quantity: int = 
1, unique_item_id: Optional[int] = None ) -> bool: """Add a dropped item to a location (references unique_item if applicable)""" async with DatabaseSession() as session: # If this is a unique item, NEVER stack it - always create a new row if unique_item_id is not None: stmt = insert(dropped_items).values( item_id=item_id, quantity=1, # Unique items are always quantity 1 location_id=location_id, drop_timestamp=time.time(), unique_item_id=unique_item_id ) else: # For stackable items, try to stack with existing items in the same location result = await session.execute( select(dropped_items).where( and_( dropped_items.c.item_id == item_id, dropped_items.c.location_id == location_id, dropped_items.c.unique_item_id.is_(None) # Only stack with other stackable items ) ) ) existing = result.first() if existing: # Stack with existing item stmt = update(dropped_items).where( dropped_items.c.id == existing.id ).values( quantity=existing.quantity + quantity, drop_timestamp=time.time() ) else: # Create new stack stmt = insert(dropped_items).values( item_id=item_id, quantity=quantity, location_id=location_id, drop_timestamp=time.time(), unique_item_id=None ) await session.execute(stmt) await session.commit() return True async def remove_item_from_inventory(player_id: int, item_id: str, quantity: int = 1) -> bool: """Remove item from inventory (for stackable items only)""" async with DatabaseSession() as session: # Get current item (only stackable items - no unique_item_id) result = await session.execute( select(inventory).where( and_( inventory.c.character_id == player_id, inventory.c.item_id == item_id, inventory.c.unique_item_id.is_(None) # Only target stackable items ) ) ) existing = result.first() if not existing: return False if existing.quantity <= quantity: # Remove item completely stmt = delete(inventory).where(inventory.c.id == existing.id) else: # Decrease quantity stmt = update(inventory).where(inventory.c.id == existing.id).values( quantity=existing.quantity - 
quantity ) await session.execute(stmt) await session.commit() # Invalidate cache if redis_manager and redis_manager.redis_client: try: await redis_manager.invalidate_inventory(player_id) except Exception as e: logger.warning(f"Redis cache invalidation failed for inventory {player_id}: {e}") return True async def remove_inventory_row(inventory_id: int) -> bool: """Remove a specific inventory row by ID (for unique items)""" # Get player_id before deleting for cache invalidation async with DatabaseSession() as session: result = await session.execute( select(inventory.c.character_id).where(inventory.c.id == inventory_id) ) row = result.first() player_id = row[0] if row else None stmt = delete(inventory).where(inventory.c.id == inventory_id) await session.execute(stmt) await session.commit() # Invalidate cache if player_id and redis_manager and redis_manager.redis_client: try: await redis_manager.invalidate_inventory(player_id) except Exception as e: logger.warning(f"Redis cache invalidation failed for inventory {player_id}: {e}") return True async def update_item_equipped_status(player_id: int, item_id: str, is_equipped: bool) -> bool: """Update item equipped status""" async with DatabaseSession() as session: stmt = update(inventory).where( and_( inventory.c.character_id == player_id, inventory.c.item_id == item_id ) ).values(is_equipped=is_equipped) await session.execute(stmt) await session.commit() return True async def get_inventory_item(item_db_id: int) -> Optional[Dict[str, Any]]: """Get a specific inventory item by database ID""" async with DatabaseSession() as session: stmt = select(inventory).where(inventory.c.id == item_db_id) result = await session.execute(stmt) row = result.first() return dict(row._mapping) if row else None # ============= DROPPED ITEMS ============= async def drop_item_to_world( item_id: str, quantity: int, location_id: str, unique_item_id: Optional[int] = None ) -> bool: """Drop an item to the world at a location (references unique_item if 
applicable)""" async with DatabaseSession() as session: stmt = insert(dropped_items).values( item_id=item_id, quantity=quantity, location_id=location_id, drop_timestamp=time.time(), unique_item_id=unique_item_id ) await session.execute(stmt) await session.commit() return True async def get_dropped_item(dropped_item_id: int) -> Optional[Dict[str, Any]]: """Get a specific dropped item by ID""" async with DatabaseSession() as session: stmt = select(dropped_items).where(dropped_items.c.id == dropped_item_id) result = await session.execute(stmt) row = result.first() return dict(row._mapping) if row else None async def get_dropped_items_in_location(location_id: str) -> List[Dict[str, Any]]: """Get all dropped items in a specific location""" async with DatabaseSession() as session: stmt = select(dropped_items).where(dropped_items.c.location_id == location_id) result = await session.execute(stmt) return [dict(row._mapping) for row in result.all()] async def update_dropped_item(dropped_item_id: int, quantity: int) -> bool: """Update dropped item quantity""" async with DatabaseSession() as session: stmt = update(dropped_items).where( dropped_items.c.id == dropped_item_id ).values(quantity=quantity) await session.execute(stmt) await session.commit() return True async def remove_dropped_item(dropped_item_id: int) -> bool: """Remove a dropped item from the world""" async with DatabaseSession() as session: stmt = delete(dropped_items).where(dropped_items.c.id == dropped_item_id) await session.execute(stmt) await session.commit() return True async def update_dropped_item_quantity(dropped_item_id: int, new_quantity: int) -> bool: """Update the quantity of a dropped item""" async with DatabaseSession() as session: stmt = update(dropped_items).where( dropped_items.c.id == dropped_item_id ).values(quantity=new_quantity) await session.execute(stmt) await session.commit() return True # ============= CORPSES ============= async def create_player_corpse(player_name: str, location_id: 
str, items: str) -> int: """Create a player corpse with items""" logger.info(f"DB: Creating player corpse - player_name={player_name}, location_id={location_id}, items_length={len(items)}") async with DatabaseSession() as session: try: stmt = insert(player_corpses).values( player_name=player_name, location_id=location_id, items=items, death_timestamp=time.time() ).returning(player_corpses.c.id) result = await session.execute(stmt) corpse_id = result.scalar() await session.commit() logger.info(f"DB: Successfully created player corpse with ID={corpse_id}") return corpse_id except Exception as e: logger.error(f"DB: Failed to create player corpse - player_name={player_name}, error={e}", exc_info=True) raise async def get_player_corpse(corpse_id: int) -> Optional[Dict[str, Any]]: """Get a player corpse by ID""" async with DatabaseSession() as session: stmt = select(player_corpses).where(player_corpses.c.id == corpse_id) result = await session.execute(stmt) row = result.first() return dict(row._mapping) if row else None async def update_player_corpse(corpse_id: int, items: str) -> bool: """Update player corpse items""" async with DatabaseSession() as session: stmt = update(player_corpses).where( player_corpses.c.id == corpse_id ).values(items=items) await session.execute(stmt) await session.commit() return True async def remove_player_corpse(corpse_id: int) -> bool: """Remove a player corpse""" async with DatabaseSession() as session: stmt = delete(player_corpses).where(player_corpses.c.id == corpse_id) await session.execute(stmt) await session.commit() return True async def create_npc_corpse(npc_id: str, location_id: str, loot_remaining: str) -> int: """Create an NPC corpse with loot""" async with DatabaseSession() as session: stmt = insert(npc_corpses).values( npc_id=npc_id, location_id=location_id, loot_remaining=loot_remaining, death_timestamp=time.time() ).returning(npc_corpses.c.id) result = await session.execute(stmt) corpse_id = result.scalar() await 
session.commit() return corpse_id async def get_npc_corpse(corpse_id: int) -> Optional[Dict[str, Any]]: """Get an NPC corpse by ID""" async with DatabaseSession() as session: stmt = select(npc_corpses).where(npc_corpses.c.id == corpse_id) result = await session.execute(stmt) row = result.first() return dict(row._mapping) if row else None async def update_npc_corpse(corpse_id: int, loot_remaining: str) -> bool: """Update NPC corpse loot""" async with DatabaseSession() as session: stmt = update(npc_corpses).where( npc_corpses.c.id == corpse_id ).values(loot_remaining=loot_remaining) await session.execute(stmt) await session.commit() return True async def remove_npc_corpse(corpse_id: int) -> bool: """Remove an NPC corpse""" async with DatabaseSession() as session: stmt = delete(npc_corpses).where(npc_corpses.c.id == corpse_id) await session.execute(stmt) await session.commit() return True async def get_npc_corpses_in_location(location_id: str) -> list: """Get all NPC corpses at a location, sorted by death_timestamp (newest first)""" async with DatabaseSession() as session: stmt = select(npc_corpses).where(npc_corpses.c.location_id == location_id).order_by(npc_corpses.c.death_timestamp.desc()) result = await session.execute(stmt) rows = result.fetchall() return [dict(row._mapping) for row in rows] async def get_player_corpses_in_location(location_id: str) -> list: """Get all player corpses at a location, sorted by death_timestamp (newest first)""" async with DatabaseSession() as session: stmt = select(player_corpses).where(player_corpses.c.location_id == location_id).order_by(player_corpses.c.death_timestamp.desc()) result = await session.execute(stmt) rows = result.fetchall() return [dict(row._mapping) for row in rows] # ============= WANDERING ENEMIES ============= async def spawn_wandering_enemy(npc_id: str, location_id: str, current_hp: int, max_hp: int) -> int: """Spawn a wandering enemy at a location""" async with DatabaseSession() as session: stmt = 
insert(wandering_enemies).values( npc_id=npc_id, location_id=location_id, current_hp=current_hp, max_hp=max_hp, spawn_timestamp=time.time() ).returning(wandering_enemies.c.id) result = await session.execute(stmt) enemy_id = result.scalar() await session.commit() return enemy_id async def get_wandering_enemies_in_location(location_id: str) -> List[Dict[str, Any]]: """Get all wandering enemies in a location""" async with DatabaseSession() as session: stmt = select(wandering_enemies).where(wandering_enemies.c.location_id == location_id) result = await session.execute(stmt) return [dict(row._mapping) for row in result.all()] async def remove_wandering_enemy(enemy_id: int) -> bool: """Remove a wandering enemy""" async with DatabaseSession() as session: stmt = delete(wandering_enemies).where(wandering_enemies.c.id == enemy_id) await session.execute(stmt) await session.commit() return True # ============= COOLDOWNS ============= async def get_cooldown(cooldown_key: str) -> int: """Get remaining cooldown time in seconds (0 if expired or not found)""" async with DatabaseSession() as session: stmt = select(interactable_cooldowns).where( interactable_cooldowns.c.interactable_instance_id == cooldown_key ) result = await session.execute(stmt) row = result.first() if not row: return 0 expiry = row.expiry_timestamp current_time = time.time() if current_time >= expiry: # Expired, clean up await session.execute( delete(interactable_cooldowns).where( interactable_cooldowns.c.interactable_instance_id == cooldown_key ) ) await session.commit() return 0 return int(expiry - current_time) async def set_cooldown(cooldown_key: str, duration_seconds: int = 600) -> bool: """Set a cooldown (default 10 minutes)""" async with DatabaseSession() as session: expiry_time = time.time() + duration_seconds # Upsert - update if exists, insert if not stmt = insert(interactable_cooldowns).values( interactable_instance_id=cooldown_key, expiry_timestamp=expiry_time ) # PostgreSQL specific upsert syntax 
from sqlalchemy.dialects.postgresql import insert as pg_insert stmt = pg_insert(interactable_cooldowns).values( interactable_instance_id=cooldown_key, expiry_timestamp=expiry_time ).on_conflict_do_update( index_elements=['interactable_instance_id'], set_={'expiry_timestamp': expiry_time} ) await session.execute(stmt) await session.commit() return True # ============= CORPSE LISTS ============= async def get_player_corpses_in_location(location_id: str) -> List[Dict[str, Any]]: """Get all player corpses in a location, sorted by death_timestamp (oldest first)""" async with DatabaseSession() as session: stmt = select(player_corpses).where(player_corpses.c.location_id == location_id).order_by(player_corpses.c.death_timestamp.asc()) result = await session.execute(stmt) return [dict(row._mapping) for row in result.all()] async def get_npc_corpses_in_location(location_id: str) -> List[Dict[str, Any]]: """Get all NPC corpses in a location, sorted by death_timestamp (oldest first)""" async with DatabaseSession() as session: stmt = select(npc_corpses).where(npc_corpses.c.location_id == location_id).order_by(npc_corpses.c.death_timestamp.asc()) result = await session.execute(stmt) return [dict(row._mapping) for row in result.all()] # ============= IMAGE CACHE ============= async def get_cached_image(image_path: str) -> Optional[str]: """Get cached telegram file ID for an image path""" async with DatabaseSession() as session: stmt = select(image_cache).where(image_cache.c.image_path == image_path) result = await session.execute(stmt) row = result.first() return row.telegram_file_id if row else None async def cache_image(image_path: str, telegram_file_id: str) -> bool: """Cache a telegram file ID for an image path""" async with DatabaseSession() as session: stmt = insert(image_cache).values( image_path=image_path, telegram_file_id=telegram_file_id, uploaded_at=time.time() ) await session.execute(stmt) await session.commit() return True # ============= STATUS EFFECTS 
# =============

# NOTE: an unfiltered get_player_status_effects used to be defined here. It
# was shadowed by the definition in the STATUS EFFECTS FUNCTIONS section
# below (which only returns effects with ticks_remaining > 0), so the dead
# copy was removed; the name still resolves to the same function as before.


# ============= PLAYER STATISTICS =============

async def get_player_statistics(player_id: int) -> Optional[Dict[str, Any]]:
    """Get player statistics, creating the row on first access.

    When no row exists, one is inserted with ``created_at`` and
    ``last_activity`` set to the current time and then read back.
    """
    async with DatabaseSession() as session:
        stmt = select(player_statistics).where(player_statistics.c.character_id == player_id)
        result = await session.execute(stmt)
        row = result.first()

        if row:
            return dict(row._mapping)

        # Create initial statistics for player
        stmt = insert(player_statistics).values(
            character_id=player_id,
            created_at=time.time(),
            last_activity=time.time()
        )
        await session.execute(stmt)
        await session.commit()

        # Return the newly created stats
        stmt = select(player_statistics).where(player_statistics.c.character_id == player_id)
        result = await session.execute(stmt)
        row = result.first()
        return dict(row._mapping) if row else None


async def update_player_statistics(player_id: int, **kwargs) -> bool:
    """
    Update player statistics.
    Use increment=True in kwargs to add to existing value.
    Example: update_player_statistics(1, enemies_killed=1, increment=True)

    ``last_activity`` is always set to the current time and is never
    incremented. Increments are evaluated by the database
    (``column = column + value``) so concurrent updates do not overwrite each
    other.
    """
    increment = kwargs.pop('increment', False)

    # Ensure stats exist. Done before opening our own session so this call
    # does not hold two pooled connections at once.
    await get_player_statistics(player_id)

    # Build the SET values in a fresh dict instead of mutating kwargs.
    values: Dict[str, Any] = {}
    for key, value in kwargs.items():
        if increment and key in player_statistics.c:
            values[key] = player_statistics.c[key] + value
        else:
            values[key] = value
    values['last_activity'] = time.time()

    async with DatabaseSession() as session:
        stmt = update(player_statistics).where(
            player_statistics.c.character_id == player_id
        ).values(**values)

        await session.execute(stmt)
        await session.commit()
        return True


async def get_leaderboard(stat_name: str, limit: int = 100) -> List[Dict[str, Any]]:
    """Get leaderboard for a specific stat.

    Returns at most ``limit`` entries with a positive value for the stat,
    highest first. Each entry has the keys ``rank`` (1-based),
    ``character_id``, ``name``, ``level`` and ``value``.

    Raises:
        AttributeError: when ``stat_name`` is not an attribute of the
            statistics table's column collection.
    """
    # Resolve the column once; stat_name is looked up by attribute access.
    stat_column = getattr(player_statistics.c, stat_name)

    async with DatabaseSession() as session:
        # Join with characters table to get character info
        stmt = select(
            player_statistics,
            characters.c.name,
            characters.c.level
        ).join(
            characters,
            player_statistics.c.character_id == characters.c.id
        ).where(
            stat_column > 0
        ).order_by(
            stat_column.desc()
        ).limit(limit)

        result = await session.execute(stmt)
        rows = result.all()

        leaderboard = []
        for rank, row in enumerate(rows, 1):
            data = dict(row._mapping)
            leaderboard.append({
                "rank": rank,
                "character_id": data['character_id'],
                "name": data['name'],
                "level": data['level'],
                "value": data[stat_name]
            })

        return leaderboard


# ============================================================================
# EQUIPMENT SYSTEM
# ============================================================================

async def get_equipped_item_in_slot(player_id: int, slot: str) -> Optional[Dict[str, Any]]:
    """Get the equipment_slots row for a player's slot, or None."""
    async with DatabaseSession() as session:
        stmt = text("""
            SELECT * FROM equipment_slots
            WHERE character_id = :player_id AND slot_type = :slot
        """)
        result = await session.execute(stmt, {"player_id": player_id, "slot": slot})
        row = result.first()
        return dict(row._mapping) if row else None


async def equip_item(player_id: int, slot: str, inventory_item_id: int) -> bool:
    """Equip an item to a slot (upsert on character_id + slot_type)."""
    async with DatabaseSession() as session:
        stmt = text("""
            INSERT INTO equipment_slots (character_id, slot_type, item_id)
            VALUES (:player_id, :slot, :item_id)
            ON CONFLICT (character_id, slot_type)
            DO UPDATE SET item_id = :item_id
        """)
        await session.execute(stmt, {
            "player_id": player_id,
            "slot": slot,
            "item_id": inventory_item_id
        })
        await session.commit()

    # Invalidate cache (best effort)
    if redis_manager and redis_manager.redis_client:
        try:
            await redis_manager.invalidate_inventory(player_id)
        except Exception as e:
            logger.warning(f"Redis cache invalidation failed for inventory {player_id}: {e}")

    return True


async def unequip_item(player_id: int, slot: str) -> bool:
    """Unequip an item from a slot by setting its item_id to NULL."""
    async with DatabaseSession() as session:
        stmt = text("""
            UPDATE equipment_slots
            SET item_id = NULL
            WHERE character_id = :player_id AND slot_type = :slot
        """)
        await session.execute(stmt, {"player_id": player_id, "slot": slot})
        await session.commit()

    # Invalidate cache (best effort)
    if redis_manager and redis_manager.redis_client:
        try:
            await redis_manager.invalidate_inventory(player_id)
        except Exception as e:
            logger.warning(f"Redis cache invalidation failed for inventory {player_id}: {e}")

    return True


async def get_all_equipment(player_id: int) -> Dict[str, Optional[Dict[str, Any]]]:
    """Get all equipped items for a player.

    Maps each slot type to ``{"item_id": ...}``, or to None when the slot
    row has no item.
    """
    async with DatabaseSession() as session:
        stmt = text("""
            SELECT slot_type, item_id FROM equipment_slots
            WHERE character_id = :player_id
        """)
        result = await session.execute(stmt, {"player_id": player_id})
        rows = result.fetchall()

        return {
            slot: ({"item_id": item_id} if item_id else None)
            for slot, item_id in rows
        }


async def update_encumbrance(player_id: int) -> int:
    """Calculate and update player encumbrance based on equipped items.

    Currently a placeholder: it stores 0 and returns 0.
    """
    # This will be called after equip/unequip
    # For now, just set to 0, we'll implement the calculation in game logic
    async with DatabaseSession() as session:
        stmt = text("""
            UPDATE characters
            SET encumbrance = 0
            WHERE id = :player_id
        """)
        await session.execute(stmt, {"player_id": player_id})
        await session.commit()
        return 0


async def get_inventory_item_by_id(inventory_id: int) -> Optional[Dict[str, Any]]:
    """Get a specific inventory item by its ID."""
    async with DatabaseSession() as session:
        stmt = text("""
            SELECT * FROM inventory WHERE id = :id
        """)
        result = await session.execute(stmt, {"id": inventory_id})
        row = result.first()
        return dict(row._mapping) if row else None


async def update_inventory_item(inventory_id: int, **kwargs) -> bool:
    """Update an inventory item's properties.

    Each keyword is a column name and its value the new column value.
    Returns False when no keywords are given, True otherwise.

    Raises:
        ValueError: when a keyword is not a plain identifier. Column names
            are interpolated into the SQL text, so anything else is rejected
            to rule out SQL injection through the keys.
    """
    if not kwargs:
        return False

    invalid = [key for key in kwargs if not key.isidentifier()]
    if invalid:
        raise ValueError(f"Invalid inventory column name(s): {invalid}")

    # Get player_id before updating for cache invalidation
    async with DatabaseSession() as session:
        result = await session.execute(
            select(inventory.c.character_id).where(inventory.c.id == inventory_id)
        )
        row = result.first()
        player_id = row[0] if row else None

        # Build UPDATE statement dynamically; values go through bind params.
        set_clauses = [f"{key} = :{key}" for key in kwargs]
        stmt_str = f"""
            UPDATE inventory
            SET {', '.join(set_clauses)}
            WHERE id = :inventory_id
        """

        params = {"inventory_id": inventory_id, **kwargs}
        await session.execute(text(stmt_str), params)
        await session.commit()

    # Invalidate cache (best effort)
    if player_id and redis_manager and redis_manager.redis_client:
        try:
            await redis_manager.invalidate_inventory(player_id)
        except Exception as e:
            logger.warning(f"Redis cache invalidation failed for inventory {player_id}: {e}")

    return True


async def decrease_item_durability(inventory_id: int, amount: int = 1) -> Optional[int]:
    """Decrease an item's durability (floored at 0) and return the new value.

    Returns None when the row does not exist or its durability is NULL.
    The decrement is a single UPDATE ... RETURNING so concurrent hits on the
    same item cannot lose an update.
    """
    async with DatabaseSession() as session:
        stmt = text("""
            UPDATE inventory
            SET durability = GREATEST(0, durability - :amount)
            WHERE id = :id AND durability IS NOT NULL
            RETURNING durability
        """)
        result = await session.execute(stmt, {"amount": amount, "id": inventory_id})
        row = result.first()
        await session.commit()
        return row[0] if row else None


# ============================================================================
# UNIQUE ITEMS MANAGEMENT
# ============================================================================

async def create_unique_item(
    item_id: str,
    durability: Optional[int] = None,
    max_durability: Optional[int] = None,
    tier: Optional[int] = None,
    unique_stats: Optional[Dict[str, Any]] = None
) -> int:
    """Create a new unique item instance and return its ID.

    Any of ``durability``, ``max_durability``, ``tier`` and ``unique_stats``
    left as None is filled from the item definition returned by
    ``items.get_item(item_id)`` when such a definition exists
    (``max_durability`` is taken from the definition's ``durability``).
    """
    # Auto-populate from item definition if missing
    if None in (unique_stats, durability, max_durability, tier):
        item_def = items.get_item(item_id)
        if item_def:
            if unique_stats is None:
                # Copy so the stored stats never alias the shared definition.
                unique_stats = item_def.stats.copy() if item_def.stats else {}
            if durability is None:
                durability = item_def.durability
            if max_durability is None:
                max_durability = item_def.durability
            if tier is None:
                tier = item_def.tier

    async with DatabaseSession() as session:
        stmt = insert(unique_items).values(
            item_id=item_id,
            durability=durability,
            max_durability=max_durability,
            tier=tier,
            unique_stats=unique_stats
        )
        result = await session.execute(stmt)
        await session.commit()
        return result.inserted_primary_key[0]


async def get_unique_item(unique_item_id: int) -> Optional[Dict[str, Any]]:
    """Get a unique item by ID."""
    async with DatabaseSession() as session:
        result = await session.execute(
            select(unique_items).where(unique_items.c.id == unique_item_id)
        )
        row = result.first()
        return dict(row._mapping) if row else None


async def get_unique_items_batch(unique_item_ids: List[int]) -> Dict[int, Dict[str, Any]]:
    """
    Batch fetch multiple unique items by IDs in a single query.
    Returns: Dict mapping unique_item_id -> unique_item data

    An empty ID list returns an empty dict without touching the database.
    """
    if not unique_item_ids:
        return {}

    async with DatabaseSession() as session:
        result = await session.execute(
            select(unique_items).where(unique_items.c.id.in_(unique_item_ids))
        )
        rows = result.fetchall()
        return {row.id: dict(row._mapping) for row in rows}


async def update_unique_item(unique_item_id: int, **kwargs) -> bool:
    """Update a unique item's properties.

    Returns False when no keywords are given (nothing to update), matching
    ``update_inventory_item``; True otherwise.
    """
    if not kwargs:
        return False

    async with DatabaseSession() as session:
        stmt = update(unique_items).where(
            unique_items.c.id == unique_item_id
        ).values(**kwargs)
        await session.execute(stmt)
        await session.commit()
        return True


async def delete_unique_item(unique_item_id: int) -> bool:
    """Delete a unique item (will cascade to inventory/dropped_items references)."""
    async with DatabaseSession() as session:
        stmt = delete(unique_items).where(unique_items.c.id == unique_item_id)
        await session.execute(stmt)
        await session.commit()
        return True


async def decrease_unique_item_durability(unique_item_id: int, amount: int = 1) -> Optional[int]:
    """
    Decrease durability of a unique item. If it reaches 0, delete the item.
    Returns new durability, or None if item was deleted.

    None is also returned when the item does not exist or its durability is
    NULL. The decrement happens in one UPDATE ... RETURNING and the delete
    runs in the same session, so no second connection is needed.
    """
    async with DatabaseSession() as session:
        result = await session.execute(
            update(unique_items)
            .where(
                and_(
                    unique_items.c.id == unique_item_id,
                    unique_items.c.durability.is_not(None)
                )
            )
            .values(durability=unique_items.c.durability - amount)
            .returning(unique_items.c.durability)
        )
        row = result.first()
        if row is None:
            return None

        new_durability = row[0]
        if new_durability <= 0:
            # Item broken - delete it (cascades to inventory/dropped_items)
            await session.execute(
                delete(unique_items).where(unique_items.c.id == unique_item_id)
            )
            await session.commit()
            return None

        await session.commit()
        return new_durability


# ============================================================================
# COMBAT TIMER FUNCTIONS
# ============================================================================

async def get_all_idle_combats(idle_threshold: float):
    """Get all combats whose ``turn_started_at`` is earlier than ``idle_threshold``."""
    async with DatabaseSession() as session:
        result = await session.execute(
            select(active_combats).where(active_combats.c.turn_started_at < idle_threshold)
        )
        return [row._asdict() for row in result.fetchall()]


# ============================================================================
# CORPSE MANAGEMENT FUNCTIONS
# ============================================================================

# create_player_corpse is defined earlier in this file - duplicate removed here

async def get_expired_player_corpses(timestamp_limit: float):
    """Get list of expired player corpses before cleanup."""
    async with DatabaseSession() as session:
        result = await session.execute(
            select(player_corpses).where(player_corpses.c.death_timestamp < timestamp_limit)
        )
        return [dict(row._mapping) for row in result.fetchall()]


# Duplicate removed - get_expired_player_corpses is already defined above

async def remove_expired_player_corpses(timestamp_limit: float) -> int:
    """Remove player corpses older than ``timestamp_limit``; return rows deleted."""
    async with DatabaseSession() as session:
        stmt = delete(player_corpses).where(player_corpses.c.death_timestamp < timestamp_limit)
        result = await session.execute(stmt)
        await session.commit()
        return result.rowcount


async def get_expired_npc_corpses(timestamp_limit: float):
    """Get list of expired NPC corpses before cleanup."""
    async with DatabaseSession() as session:
        result = await session.execute(
            select(npc_corpses).where(npc_corpses.c.death_timestamp < timestamp_limit)
        )
        return [dict(row._mapping) for row in result.fetchall()]


async def remove_expired_npc_corpses(timestamp_limit: float) -> int:
    """Remove NPC corpses older than ``timestamp_limit``; return rows deleted."""
    async with DatabaseSession() as session:
        stmt = delete(npc_corpses).where(npc_corpses.c.death_timestamp < timestamp_limit)
        result = await session.execute(stmt)
        await session.commit()
        return result.rowcount


# ============================================================================
# STATUS EFFECTS FUNCTIONS
# ============================================================================

async def get_player_status_effects(player_id: int):
    """Get all active status effects (ticks_remaining > 0) for a player."""
    async with DatabaseSession() as session:
        result = await session.execute(
            select(player_status_effects).where(
                and_(
                    player_status_effects.c.character_id == player_id,
                    player_status_effects.c.ticks_remaining > 0
                )
            )
        )
        return [row._asdict() for row in result.fetchall()]


async def remove_all_status_effects(player_id: int):
    """Remove all status effects from a player."""
    async with DatabaseSession() as session:
        await session.execute(
            delete(player_status_effects).where(player_status_effects.c.character_id == player_id)
        )
        await session.commit()


async def decrement_all_status_effect_ticks():
    """
    Decrement ticks for all active status effects and return affected player IDs.
    Used by background processor.

    Effects whose tick count drops to 0 or below are deleted in the same
    transaction.
    """
    from sqlalchemy import distinct

    async with DatabaseSession() as session:
        # Get player IDs with effects before updating
        result = await session.execute(
            select(distinct(player_status_effects.c.character_id)).where(
                player_status_effects.c.ticks_remaining > 0
            )
        )
        affected_players = [row[0] for row in result.fetchall()]

        # Decrement ticks
        await session.execute(
            update(player_status_effects).where(
                player_status_effects.c.ticks_remaining > 0
            ).values(ticks_remaining=player_status_effects.c.ticks_remaining - 1)
        )

        # Remove expired effects
        await session.execute(
            delete(player_status_effects).where(player_status_effects.c.ticks_remaining <= 0)
        )

        await session.commit()
        return affected_players


# ============================================================================
# WANDERING ENEMY SPAWN FUNCTIONS
# ============================================================================

async def spawn_wandering_enemy(npc_id: str, location_id: str, lifetime_seconds: int = 600):
    """Spawn a wandering enemy at a location. Lifetime defaults to 10 minutes.
    Returns the spawned enemy data.

    NOTE(review): an earlier definition of ``spawn_wandering_enemy`` in this
    file takes ``current_hp``/``max_hp``; this later one is the definition in
    effect after import - confirm callers use this signature.
    """
    async with DatabaseSession() as session:
        current_time = time.time()
        despawn_time = current_time + lifetime_seconds

        stmt = wandering_enemies.insert().values(
            npc_id=npc_id,
            location_id=location_id,
            spawn_timestamp=current_time,
            despawn_timestamp=despawn_time
        ).returning(wandering_enemies)

        result = await session.execute(stmt)
        # Read the RETURNING row before committing.
        row = result.fetchone()
        await session.commit()
        return dict(row._mapping) if row else None


async def get_expired_wandering_enemies():
    """Get list of expired wandering enemies (despawn time reached) before cleanup."""
    # Two identical definitions of this function were merged into this one.
    async with DatabaseSession() as session:
        current_time = time.time()
        result = await session.execute(
            select(wandering_enemies).where(wandering_enemies.c.despawn_timestamp <= current_time)
        )
        return [dict(row._mapping) for row in result.fetchall()]


async def cleanup_expired_wandering_enemies():
    """Remove all expired wandering enemies and return how many were removed."""
    async with DatabaseSession() as session:
        current_time = time.time()
        stmt = delete(wandering_enemies).where(wandering_enemies.c.despawn_timestamp <= current_time)
        result = await session.execute(stmt)
        await session.commit()
        return result.rowcount  # Number of enemies despawned


async def get_wandering_enemy_count_in_location(location_id: str) -> int:
    """Count active (not yet despawned) wandering enemies at a location."""
    from sqlalchemy import func

    async with DatabaseSession() as session:
        current_time = time.time()
        # Let the database count instead of fetching every row to len() it.
        result = await session.execute(
            select(func.count()).select_from(wandering_enemies).where(
                and_(
                    wandering_enemies.c.location_id == location_id,
                    wandering_enemies.c.despawn_timestamp > current_time
                )
            )
        )
        return result.scalar_one()


async def get_all_active_wandering_enemies():
    """Get all active wandering enemies across all locations."""
    async with DatabaseSession() as session:
        current_time = time.time()
        result = await session.execute(
            select(wandering_enemies).where(wandering_enemies.c.despawn_timestamp > current_time)
        )
        return [row._asdict() for row in result.fetchall()]


# ============================================================================
# STAMINA REGENERATION FUNCTIONS
# ============================================================================

async def regenerate_all_players_stamina() -> List[Dict[str, Any]]:
    """
    Regenerate stamina for all living players below their maximum.

    Recovery formula:
    - Base recovery: 1 stamina per cycle (5 minutes)
    - Endurance bonus: +1 stamina per 10 endurance points
    - Example: 5 endurance = 1 stamina, 15 endurance = 2 stamina, 25 endurance = 3 stamina
    - Only regenerates up to max_stamina
    - Only regenerates for living players

    Returns list of updated players with their new stamina values for notifications
    (keys: id, stamina, max_stamina, endurance, new_stamina).

    PERFORMANCE: set-based SQL (one SELECT for the notification data, one
    UPDATE for the change); no per-player round trips.
    """
    # NOTE(review): the examples above assume ``endurance / 10`` is integer
    # division in SQL, i.e. that ``endurance`` is an integer column - confirm.
    async with DatabaseSession() as session:
        # First get the players that will be updated (for notifications)
        select_stmt = text("""
            SELECT
                id,
                stamina,
                max_stamina,
                endurance,
                LEAST(stamina + 1 + (endurance / 10), max_stamina) as new_stamina
            FROM characters
            WHERE is_dead = FALSE
              AND stamina < max_stamina
        """)

        result = await session.execute(select_stmt)
        updated_players = [dict(row._mapping) for row in result.fetchall()]

        # Now perform the update
        update_stmt = text("""
            UPDATE characters
            SET stamina = LEAST(
                stamina + 1 + (endurance / 10),
                max_stamina
            )
            WHERE is_dead = FALSE
              AND stamina < max_stamina
        """)

        await session.execute(update_stmt)
        await session.commit()

        return updated_players


# ============================================================================
# DROPPED ITEMS CLEANUP FUNCTIONS
# ============================================================================

async def get_expired_dropped_items(timestamp_limit: float):
    """Get list of expired dropped items before cleanup."""
    # Two identical definitions of this function were merged into this one.
    async with DatabaseSession() as session:
        result = await session.execute(
            select(dropped_items).where(dropped_items.c.drop_timestamp < timestamp_limit)
        )
        return [dict(row._mapping) for row in result.fetchall()]


async def remove_expired_dropped_items(timestamp_limit: float) -> int:
    """Remove dropped items older than ``timestamp_limit``; return rows deleted."""
    async with DatabaseSession() as session:
        stmt = delete(dropped_items).where(dropped_items.c.drop_timestamp < timestamp_limit)
        result = await session.execute(stmt)
        await session.commit()
        return result.rowcount


# ============================================================================
# PVP COMBAT FUNCTIONS
#
# ============================================================================

async def get_pvp_combat_by_id(combat_id: int) -> Optional[Dict[str, Any]]:
    """Get a PVP combat by its ID.

    Args:
        combat_id: Primary key of the pvp_combats row.

    Returns:
        The row as a dict, or None if no such combat exists.
    """
    async with DatabaseSession() as session:
        stmt = select(pvp_combats).where(pvp_combats.c.id == combat_id)
        result = await session.execute(stmt)
        row = result.first()
        return dict(row._mapping) if row else None


async def get_pvp_combat_by_player(character_id: int) -> Optional[Dict[str, Any]]:
    """Get a PVP combat involving a player, as attacker or defender.

    No filtering on the acknowledgement flags is done here: the record is
    returned as-is and the caller decides whether it still counts as active
    for this player.

    Args:
        character_id: Character to look up on either side of a combat.

    Returns:
        The first matching row as a dict, or None if the player is in no
        combat.
    """
    async with DatabaseSession() as session:
        stmt = select(pvp_combats).where(
            or_(
                pvp_combats.c.attacker_character_id == character_id,
                pvp_combats.c.defender_character_id == character_id
            )
        )
        result = await session.execute(stmt)
        # There should only be one active combat at a time per player
        row = result.first()
        return dict(row._mapping) if row else None


async def create_pvp_combat(
    attacker_id: int,
    defender_id: int,
    location_id: str,
    turn_timeout: int = 300
) -> Dict[str, Any]:
    """Create a new PVP combat encounter.

    Args:
        attacker_id: Character ID of the player who initiated combat.
        defender_id: Character ID of the player being attacked.
        location_id: Location where the combat takes place.
        turn_timeout: Seconds allowed per turn (defaults to 300).

    Returns:
        The newly inserted pvp_combats row as a dict.
    """
    async with DatabaseSession() as session:
        current_time = time.time()

        # Get names for denormalization
        attacker_res = await session.execute(
            select(characters.c.name).where(characters.c.id == attacker_id)
        )
        defender_res = await session.execute(
            select(characters.c.name).where(characters.c.id == defender_id)
        )
        attacker_name = attacker_res.scalar() or "Unknown"
        defender_name = defender_res.scalar() or "Unknown"

        stmt = insert(pvp_combats).values(
            attacker_character_id=attacker_id,
            defender_character_id=defender_id,
            attacker_name=attacker_name,
            defender_name=defender_name,
            location_id=location_id,
            started_at=current_time,
            updated_at=current_time,
            # Requirement says: "You have initiated combat... They get the
            # first turn." -- so the defender moves first.
            turn='defender',
            turn_started_at=current_time,
            turn_timeout_seconds=turn_timeout,
            attacker_acknowledged=False,
            defender_acknowledged=False
        ).returning(pvp_combats)

        result = await session.execute(stmt)
        # Read the RETURNING row while the transaction is still open.
        row = result.fetchone()
        await session.commit()
        return dict(row._mapping)


async def update_pvp_combat(combat_id: int, updates: Dict[str, Any]) -> bool:
    """Update PVP combat state.

    updated_at is always set to the current time, overriding any value the
    caller supplied. The caller's dict is left untouched.

    Args:
        combat_id: Primary key of the combat to update.
        updates: Column name -> new value.

    Returns:
        Always True; whether a row actually matched is not reported.
    """
    # Build a new dict so the caller's argument is not mutated.
    values = {**updates, 'updated_at': time.time()}

    async with DatabaseSession() as session:
        stmt = update(pvp_combats).where(
            pvp_combats.c.id == combat_id
        ).values(**values)
        await session.execute(stmt)
        await session.commit()
        return True


async def acknowledge_pvp_combat(combat_id: int, player_id: int) -> bool:
    """Record that a player has acknowledged the end of a combat.

    Once both the attacker and the defender have acknowledged, the combat
    row is deleted.

    Args:
        combat_id: Primary key of the combat.
        player_id: Character ID of the acknowledging player.

    Returns:
        False if the combat does not exist or the player is not a
        participant; True otherwise.
    """
    async with DatabaseSession() as session:
        # First check who this player is
        lookup = await session.execute(
            select(pvp_combats).where(pvp_combats.c.id == combat_id)
        )
        combat = lookup.first()
        if not combat:
            return False

        if combat.attacker_character_id == player_id:
            flag = {'attacker_acknowledged': True}
        elif combat.defender_character_id == player_id:
            flag = {'defender_acknowledged': True}
        else:
            return False

        # Take both flags from the UPDATE's RETURNING rather than from the
        # earlier read: if the other player acknowledged in the meantime, the
        # earlier snapshot would miss it and the row would never be cleaned up.
        updated = await session.execute(
            update(pvp_combats)
            .where(pvp_combats.c.id == combat_id)
            .values(**flag)
            .returning(
                pvp_combats.c.attacker_acknowledged,
                pvp_combats.c.defender_acknowledged,
            )
        )
        state = updated.first()

        # Both sides are done with the combat: remove the record.
        if state is not None and state.attacker_acknowledged and state.defender_acknowledged:
            await session.execute(
                delete(pvp_combats).where(pvp_combats.c.id == combat_id)
            )

        await session.commit()
        return True