""" Combat Engine - Shared combat logic for PvE and PvP. All combat actions (attack, skill, use_item, flee) go through this engine. The engine returns structured results (messages, state changes) without sending WebSocket messages or HTTP responses โ€” that's the router's job. Target abstraction: PvE: {id, hp, max_hp, defense, name, type: "npc", npc_def} PvP: {id, hp, max_hp, defense: 0, name, type: "player"} """ import random import time import json import logging from typing import Dict, Any, List, Tuple, Optional from .. import database as db from ..items import ItemsManager from ..services.helpers import create_combat_message, get_locale_string, get_game_message from ..services.stats import calculate_derived_stats logger = logging.getLogger(__name__) # ============================================================================ # STATUS EFFECTS PROCESSING # ============================================================================ async def process_status_effects(player_id: int, player: dict, max_hp: int) -> Tuple[List[dict], int, bool]: """ Tick and process all status effects on a player before their action. Returns: (messages, new_hp, player_died) """ messages = [] current_hp = player['hp'] player_died = False active_effects = await db.tick_player_effects(player_id) if not active_effects: return messages, current_hp, False from ..game_logic import calculate_status_impact total_impact = calculate_status_impact(active_effects) if total_impact > 0: # DAMAGE from effects damage = total_impact current_hp = max(0, current_hp - damage) await db.update_player_hp(player_id, current_hp) messages.append(create_combat_message( "effect_damage", origin="player", damage=damage, effect_name="status effects" )) if current_hp <= 0: player_died = True elif total_impact < 0: # HEALING from effects heal = abs(total_impact) new_hp = min(max_hp, current_hp + heal) actual_heal = new_hp - current_hp if actual_heal > 0: current_hp = new_hp await db.update_player_hp(player_id, current_hp) messages.append(create_combat_message( "effect_heal", origin="player", heal=actual_heal, effect_name="status effects" )) return messages, current_hp, player_died # ============================================================================ # CHECK ACTIVE BUFFS # ============================================================================ async def get_active_buff_modifiers(player_id: int) -> Dict[str, Any]: """ Read active buff effects and return combat modifiers. Consumes one-shot buffs (evade, foresight tick) where appropriate. Returns dict with: - damage_reduction (float, 0-1): reduction on incoming damage - damage_bonus (float, 0+): bonus multiplier on outgoing damage - damage_taken_increase (float, 0+): increase on incoming damage - guaranteed_dodge (bool): auto-dodge next attack - enemy_miss (bool): enemy attacks miss - status_immunity (bool): immune to new status effects """ effects = await db.get_player_effects(player_id) modifiers = { 'damage_reduction': 0.0, 'damage_bonus': 0.0, 'damage_taken_increase': 0.0, 'guaranteed_dodge': False, 'enemy_miss': False, 'status_immunity': False, } for eff in effects: source = eff.get('source', '') effect_name = eff.get('effect_name', '') value = eff.get('value', 0) # Fortify: damage_reduction stored as value (e.g. 60 for 60%) if 'fortify' in source: modifiers['damage_reduction'] = max(modifiers['damage_reduction'], value / 100.0) # Berserker Rage: damage_bonus + damage_taken_increase elif effect_name == 'berserker_rage': modifiers['damage_bonus'] = 0.5 modifiers['damage_taken_increase'] = 0.25 # Evade: guaranteed dodge elif effect_name == 'evade': modifiers['guaranteed_dodge'] = True # Foresight: enemy misses elif effect_name == 'foresight': modifiers['enemy_miss'] = True # Iron Skin: status immunity elif effect_name == 'iron_skin': modifiers['status_immunity'] = True return modifiers # ============================================================================ # ATTACK ACTION # ============================================================================ async def execute_attack( attacker_id: int, attacker: dict, attacker_stats: dict, target: dict, is_pvp: bool, items_manager: ItemsManager, reduce_armor_func, ) -> Dict[str, Any]: """ Execute a basic attack action. Returns: { messages: list, damage_dealt: int, target_hp: int, target_defeated: bool, weapon_broke: bool } """ messages = [] # Get active buff modifiers buff_mods = await get_active_buff_modifiers(attacker_id) # Base damage from derived stats (includes weapon + str + level + perks) base_damage = attacker_stats.get('attack_power', 5) # Apply berserker rage damage bonus if buff_mods['damage_bonus'] > 0: base_damage = int(base_damage * (1 + buff_mods['damage_bonus'])) # Check encumbrance miss encumbrance = attacker.get('encumbrance', 0) if encumbrance > 0: miss_chance = min(0.3, encumbrance * 0.05) if random.random() < miss_chance: messages.append(create_combat_message( "player_miss", origin="player", reason="encumbrance" )) return { 'messages': messages, 'damage_dealt': 0, 'target_hp': target['hp'], 'target_defeated': False, 'weapon_broke': False } # Variance variance = random.randint(-2, 2) damage = max(1, base_damage + variance) # Critical hit is_critical = False crit_chance = attacker_stats.get('crit_chance', 0.05) if random.random() < crit_chance: is_critical = True damage = int(damage * attacker_stats.get('crit_damage', 1.5)) # Weapon effects and durability weapon_broke = False weapon_effects = {} equipment = await db.get_all_equipment(attacker_id) inv_item = None weapon_def = None if equipment.get('weapon') and equipment['weapon']: weapon_slot = equipment['weapon'] inv_item = await db.get_inventory_item_by_id(weapon_slot['item_id']) if inv_item: weapon_def = items_manager.get_item(inv_item['item_id']) if weapon_def: weapon_effects = weapon_def.weapon_effects if hasattr(weapon_def, 'weapon_effects') else {} # Apply defense / armor armor_absorbed = 0 broken_armor = [] if is_pvp: # PvP: use equipment-based armor reduction + durability armor_absorbed, broken_armor = await reduce_armor_func(target['id'], damage) actual_damage = max(1, damage - armor_absorbed) else: # PvE: use NPC's flat defense value npc_defense = target.get('defense', 0) armor_absorbed = npc_defense if npc_defense > 0 else 0 actual_damage = max(1, damage - npc_defense) if is_critical: messages.append(create_combat_message("combat_crit", origin="player")) messages.append(create_combat_message( "player_attack", origin="player", damage=actual_damage, armor_absorbed=armor_absorbed )) for broken in broken_armor: messages.append(create_combat_message( "item_broken", origin="enemy", item_name=broken['name'], emoji=broken['emoji'] )) # Weapon bleeding effects (PvE and PvP) bleed_damage = 0 if weapon_effects and 'bleeding' in weapon_effects: bleeding = weapon_effects['bleeding'] if random.random() < bleeding.get('chance', 0): bleed_damage = bleeding.get('damage', 0) if is_pvp: # Apply as a status effect on the target player await db.add_effect( player_id=target['id'], effect_name="Bleeding", effect_icon="๐Ÿฉธ", effect_type="damage", damage_per_tick=bleed_damage, ticks_remaining=3, persist_after_combat=False, source="weapon_bleeding" ) messages.append(create_combat_message( "effect_bleeding", origin="player", damage=bleed_damage )) # Apply damage to target new_target_hp = max(0, target['hp'] - actual_damage) if not is_pvp and bleed_damage > 0: new_target_hp = max(0, new_target_hp - bleed_damage) # Weapon durability if inv_item and inv_item.get('unique_item_id'): new_durability = await db.decrease_unique_item_durability(inv_item['unique_item_id'], 1) if new_durability is None: weapon_broke = True messages.append(create_combat_message( "weapon_broke", origin="player", item_name=weapon_def.name if weapon_def else "weapon" )) await db.unequip_item(attacker_id, 'weapon') return { 'messages': messages, 'damage_dealt': actual_damage, 'target_hp': new_target_hp, 'target_defeated': new_target_hp <= 0, 'weapon_broke': weapon_broke, } # ============================================================================ # SKILL ACTION # ============================================================================ async def execute_skill( player_id: int, player: dict, player_stats: dict, target: dict, skill_id: str, combat_state: dict, is_pvp: bool, items_manager: ItemsManager, reduce_armor_func, redis_manager=None, ) -> Dict[str, Any]: """ Execute a skill action. Validates requirements, deducts stamina, applies effects. combat_state: For PvE, the combat dict. For PvP, can be minimal. Returns: { messages, damage_dealt, target_hp, target_defeated, player_hp, player_stamina, error (str or None) } """ from ..services.skills import skills_manager skill = skills_manager.get_skill(skill_id) if not skill: return {'error': 'Skill not found', 'status_code': 404} # Check unlocked stat_val = player.get(skill.stat_requirement, 0) if stat_val < skill.stat_threshold or player['level'] < skill.level_requirement: return {'error': 'Skill not unlocked', 'status_code': 400} # Check cooldown active_effects = await db.get_player_effects(player_id) cd_source = f"cd:{skill.id}" for eff in active_effects: if eff.get('source') == cd_source and eff.get('ticks_remaining', 0) > 0: return {'error': f"Skill on cooldown ({eff['ticks_remaining']} turns)", 'status_code': 400} # Check stamina if player['stamina'] < skill.stamina_cost: return {'error': 'Not enough stamina', 'status_code': 400} # Deduct stamina new_stamina = player['stamina'] - skill.stamina_cost await db.update_player_stamina(player_id, new_stamina) # Add cooldown if skill.cooldown > 0: await db.add_effect( player_id=player_id, effect_name=f"{skill.id}_cooldown", effect_icon="โณ", effect_type="cooldown", value=0, ticks_remaining=skill.cooldown, persist_after_combat=False, source=cd_source ) # Get weapon info for damage skills equipment = await db.get_all_equipment(player_id) weapon_damage = 0 weapon_inv_id = None inv_item = None weapon_def = None if equipment.get('weapon') and equipment['weapon']: weapon_slot = equipment['weapon'] inv_item = await db.get_inventory_item_by_id(weapon_slot['item_id']) if inv_item: weapon_def = items_manager.get_item(inv_item['item_id']) if weapon_def and weapon_def.stats: weapon_damage = random.randint( weapon_def.stats.get('damage_min', 0), weapon_def.stats.get('damage_max', 0) ) weapon_inv_id = inv_item['id'] # Get buff modifiers buff_mods = await get_active_buff_modifiers(player_id) messages = [] effects = skill.effects target_hp = target['hp'] damage_dealt = 0 player_hp = player['hp'] # โ”€โ”€ Damage skills โ”€โ”€ if 'damage_multiplier' in effects: # Use derived attack_power for base (includes str, level, weapon, perks) base_damage = player_stats.get('attack_power', 5) variance = random.randint(-2, 2) raw_damage = max(1, base_damage + variance) # Apply berserker rage bonus if buff_mods['damage_bonus'] > 0: raw_damage = int(raw_damage * (1 + buff_mods['damage_bonus'])) multiplier = effects['damage_multiplier'] # Execute check if 'execute_threshold' in effects: hp_pct = target['hp'] / target['max_hp'] if target['max_hp'] > 0 else 1 if hp_pct <= effects['execute_threshold']: multiplier = effects.get('execute_multiplier', multiplier) # Exploit weakness (requires analyzed โ€” PvE only via npc_status_effects) if effects.get('requires_analyzed') and not is_pvp: analyzed = combat_state.get('npc_status_effects', '') or '' if 'analyzed' not in analyzed: multiplier = 1.0 damage = max(1, int(raw_damage * multiplier)) # Guaranteed crit if effects.get('guaranteed_crit'): damage = int(damage * player_stats.get('crit_damage', 1.5)) # Multi-hit num_hits = effects.get('hits', 1) total_damage = 0 total_armor_absorbed = 0 for hit in range(num_hits): hit_dmg = damage if hit == 0 else max(1, int(raw_damage * multiplier)) if is_pvp: # PvP: armor from equipment absorbed, broken_armor = await reduce_armor_func(target['id'], hit_dmg) total_armor_absorbed += absorbed for broken in broken_armor: messages.append(create_combat_message( "item_broken", origin="enemy", item_name=broken['name'], emoji=broken['emoji'] )) actual_hit = max(1, hit_dmg - absorbed) else: # PvE: NPC flat defense npc_defense = target.get('defense', 0) if 'armor_penetration' in effects: npc_defense = int(npc_defense * (1 - effects['armor_penetration'])) actual_hit = max(1, hit_dmg - npc_defense) total_armor_absorbed += npc_defense total_damage += actual_hit target_hp = max(0, target_hp - actual_hit) damage_dealt = total_damage messages.append(create_combat_message( "skill_attack", origin="player", damage=total_damage, skill_name=skill.name, skill_icon=skill.icon, hits=num_hits )) # Lifesteal if 'lifesteal' in effects: heal_amount = int(total_damage * effects['lifesteal']) new_hp = min(player.get('max_hp', player_stats.get('max_hp', 100)), player_hp + heal_amount) if new_hp > player_hp: await db.update_player_hp(player_id, new_hp) player_hp = new_hp messages.append(create_combat_message( "skill_heal", origin="player", heal=heal_amount, skill_icon="๐Ÿฉธ" )) # Poison DoT if 'poison_damage' in effects: if is_pvp: # PvP: add as player effect await db.add_effect( player_id=target['id'], effect_name="Poison", effect_icon="๐Ÿงช", effect_type="damage", damage_per_tick=effects['poison_damage'], ticks_remaining=effects['poison_duration'], persist_after_combat=True, source=f"skill_poison:{skill.id}" ) else: # PvE: add to npc_status_effects string poison_str = f"poison:{effects['poison_damage']}:{effects['poison_duration']}" existing = combat_state.get('npc_status_effects', '') or '' if existing: existing += '|' + poison_str else: existing = poison_str await db.update_combat(player_id, {'npc_status_effects': existing}) messages.append(create_combat_message( "skill_effect", origin="player", message=f"๐Ÿงช Poisoned! ({effects['poison_damage']} dmg/turn)" )) # Stun chance if 'stun_chance' in effects and random.random() < effects['stun_chance']: if is_pvp: await db.add_effect( player_id=target['id'], effect_name="Stunned", effect_icon="๐Ÿ’ซ", effect_type="debuff", ticks_remaining=1, persist_after_combat=False, source="skill_stun" ) else: # PvE: write stun to npc_status_effects string stun_str = "stun:1" existing = combat_state.get('npc_status_effects', '') or '' # Refresh combat state in case poison just updated it fresh_combat = await db.get_active_combat(player_id) if fresh_combat: existing = fresh_combat.get('npc_status_effects', '') or '' if existing: existing += '|' + stun_str else: existing = stun_str await db.update_combat(player_id, {'npc_status_effects': existing}) messages.append(create_combat_message( "skill_effect", origin="player", message="๐Ÿ’ซ Stunned!" )) # Weapon durability if weapon_inv_id and inv_item and inv_item.get('unique_item_id'): new_durability = await db.decrease_unique_item_durability(inv_item['unique_item_id'], 1) if new_durability is None: messages.append(create_combat_message( "weapon_broke", origin="player", item_name=weapon_def.name if weapon_def else "weapon" )) await db.unequip_item(player_id, 'weapon') # โ”€โ”€ Heal skills โ”€โ”€ if 'heal_percent' in effects: max_hp = player.get('max_hp', player_stats.get('max_hp', 100)) heal_amount = int(max_hp * effects['heal_percent']) new_hp = min(max_hp, player_hp + heal_amount) actual_heal = new_hp - player_hp if actual_heal > 0: await db.update_player_hp(player_id, new_hp) player_hp = new_hp messages.append(create_combat_message( "skill_heal", origin="player", heal=actual_heal, skill_name=skill.name, skill_icon=skill.icon )) # โ”€โ”€ Stamina restore โ”€โ”€ if 'stamina_restore_percent' in effects: max_stam = player.get('max_stamina', player_stats.get('max_stamina', 100)) restore = int(max_stam * effects['stamina_restore_percent']) new_stam = min(max_stam, new_stamina + restore) actual = new_stam - new_stamina if actual > 0: await db.update_player_stamina(player_id, new_stam) new_stamina = new_stam messages.append(create_combat_message( "skill_effect", origin="player", message=f"โšก +{actual} Stamina" )) # โ”€โ”€ Buff skills (fortify, berserker, evade, iron_skin, foresight) โ”€โ”€ if 'buff' in effects: buff_name = effects['buff'] duration = effects.get('buff_duration', 2) value = 0 if 'damage_reduction' in effects: value = int(effects['damage_reduction'] * 100) elif 'damage_bonus' in effects: value = int(effects['damage_bonus'] * 100) await db.add_effect( player_id=player_id, effect_name=buff_name, effect_icon=skill.icon, effect_type='buff', value=value, ticks_remaining=duration, persist_after_combat=False, source=f'skill:{skill.id}' ) messages.append(create_combat_message( "skill_buff", origin="player", skill_name=skill.name, skill_icon=skill.icon, duration=duration )) # โ”€โ”€ Analyze (PvE only) โ”€โ”€ if effects.get('mark_analyzed') and not is_pvp: existing = combat_state.get('npc_status_effects', '') or '' # Refresh in case previous effects updated it fresh_combat = await db.get_active_combat(player_id) if fresh_combat: existing = fresh_combat.get('npc_status_effects', '') or '' if 'analyzed' not in existing: if existing: existing += '|analyzed:0:99' else: existing = 'analyzed:0:99' await db.update_combat(player_id, {'npc_status_effects': existing}) npc_hp_pct = int((target['hp'] / target['max_hp']) * 100) if target['max_hp'] > 0 else 0 intent = combat_state.get('npc_intent', 'attack') messages.append(create_combat_message( "skill_analyze", origin="player", skill_icon=skill.icon, npc_name=target['name'], npc_hp_pct=npc_hp_pct, npc_intent=intent )) return { 'messages': messages, 'damage_dealt': damage_dealt, 'target_hp': target_hp, 'target_defeated': target_hp <= 0, 'player_hp': player_hp, 'player_stamina': new_stamina, 'error': None, } # ============================================================================ # USE ITEM ACTION # ============================================================================ async def execute_use_item( player_id: int, player: dict, player_stats: dict, item_id: str, combat_state: dict, target: dict, is_pvp: bool, items_manager: ItemsManager, locale: str = 'en', ) -> Dict[str, Any]: """ Use a combat item. Returns results without side effects on combat flow. Returns: { messages, effects_applied, target_hp, target_defeated, player_hp, combat_over, error (str or None) } """ # Get from inventory player_inventory = await db.get_inventory(player_id) inv_item = None for item in player_inventory: if item['item_id'] == item_id: inv_item = item break if not inv_item: return {'error': 'Item not found in inventory', 'status_code': 400} item_def = items_manager.get_item(item_id) if not item_def: return {'error': 'Unknown item', 'status_code': 400} if not item_def.combat_usable: return {'error': 'This item cannot be used in combat', 'status_code': 400} messages = [] effects_applied = [] item_name = get_locale_string(item_def.name, locale) player_hp = player['hp'] target_hp = target['hp'] if target else 0 target_defeated = False old_hp = player_hp old_stamina = player.get('stamina', 0) # 1. Status effects (e.g. Regeneration from Bandage) if item_def.effects.get('status_effect'): # Check status immunity buff_mods = await get_active_buff_modifiers(player_id) status_data = item_def.effects['status_effect'] await db.add_effect( player_id=player_id, effect_name=status_data['name'], effect_icon=status_data.get('icon', 'โœจ'), effect_type=status_data.get('type', 'buff'), damage_per_tick=status_data.get('damage_per_tick', 0), value=status_data.get('value', 0), ticks_remaining=status_data.get('ticks', 3), persist_after_combat=True, source=f"item:{item_def.id}" ) effects_applied.append(f"Applied {status_data['name']}") # 2. Cure status effects if item_def.effects.get('cures'): for cure_effect in item_def.effects['cures']: if await db.remove_effect(player_id, cure_effect): effects_applied.append(f"Cured {cure_effect}") # 3. HP restore if item_def.effects.get('hp_restore') and item_def.effects['hp_restore'] > 0: item_effectiveness = player_stats.get('item_effectiveness', 1.0) restore_amount = int(item_def.effects['hp_restore'] * item_effectiveness) max_hp = player.get('max_hp', player_stats.get('max_hp', 100)) new_hp = min(max_hp, player_hp + restore_amount) actual_heal = new_hp - player_hp if actual_heal > 0: await db.update_player_hp(player_id, new_hp) player_hp = new_hp effects_applied.append(f"+{actual_heal} HP") messages.append(create_combat_message("item_heal", origin="player", heal=actual_heal)) # 4. Stamina restore if item_def.effects.get('stamina_restore') and item_def.effects['stamina_restore'] > 0: item_effectiveness = player_stats.get('item_effectiveness', 1.0) restore_amount = int(item_def.effects['stamina_restore'] * item_effectiveness) max_stam = player.get('max_stamina', player_stats.get('max_stamina', 100)) new_stamina = min(max_stam, player.get('stamina', 0) + restore_amount) actual_restore = new_stamina - player.get('stamina', 0) if actual_restore > 0: await db.update_player_stamina(player_id, new_stamina) effects_applied.append(f"+{actual_restore} Stamina") messages.append(create_combat_message( "item_restore", origin="player", stat="stamina", amount=actual_restore )) # 5. Combat effects (throwables โ€” PvE only for now) combat_effects = getattr(item_def, 'combat_effects', None) or {} if combat_effects.get('damage_min') and combat_effects.get('damage_max') and not is_pvp: throwable_damage = random.randint(combat_effects['damage_min'], combat_effects['damage_max']) target_hp = max(0, target_hp - throwable_damage) effects_applied.append(f"{throwable_damage} damage") messages.append(create_combat_message( "item_damage", origin="player", damage=throwable_damage, item_name=item_name )) target_defeated = target_hp <= 0 # 6. Status effect on target (burn from molotov etc.) โ€” PvE only status_effect = combat_effects.get('status') if not is_pvp else None if status_effect and not target_defeated: npc_status = f"{status_effect['name']}:{status_effect.get('damage_per_tick', 0)}:{status_effect.get('ticks', 1)}" await db.update_combat(player_id, {'npc_status_effects': npc_status}) messages.append(create_combat_message( "effect_applied", origin="player", effect_name=status_effect['name'], effect_icon=status_effect.get('icon', '๐Ÿ”ฅ'), target="enemy" )) # Consume the item await db.remove_item_from_inventory(player_id, item_id, 1) await db.update_player_statistics(player_id, items_used=1, increment=True) # Item used message effects_str = f" ({', '.join(effects_applied)})" if effects_applied else "" hp_restored_val = player_hp - old_hp if player_hp > old_hp else 0 stamina_restored_val = 0 # Simplified messages.append(create_combat_message( "item_used", origin="player", item_name=item_name, effects=effects_str, hp_restore=hp_restored_val if hp_restored_val > 0 else None, stamina_restore=stamina_restored_val if stamina_restored_val > 0 else None )) return { 'messages': messages, 'effects_applied': effects_applied, 'target_hp': target_hp, 'target_defeated': target_defeated, 'player_hp': player_hp, 'error': None, } # ============================================================================ # FLEE ACTION # ============================================================================ async def execute_flee_pve( player_id: int, player: dict, player_stats: dict, combat: dict, npc_def, reduce_armor_func, locale: str = 'en', ) -> Dict[str, Any]: """ Attempt to flee from PvE combat. Returns: { messages, success, combat_over, player_defeated, player_hp, corpse_data (if died) } """ messages = [] flee_chance = player_stats.get('flee_chance_base', 0.5) if random.random() < flee_chance: # Success messages.append(create_combat_message( "flee_success", origin="player", message=get_game_message('flee_success_text', locale, name=player['name']) )) await db.update_player_statistics(player_id, successful_flees=1, increment=True) # Respawn wandering enemy if combat.get('from_wandering_enemy'): despawn_time = time.time() + 300 async with db.DatabaseSession() as session: from sqlalchemy import insert stmt = insert(db.wandering_enemies).values( npc_id=combat['npc_id'], location_id=combat['location_id'], spawn_timestamp=time.time(), despawn_timestamp=despawn_time ) await session.execute(stmt) await session.commit() await db.remove_non_persistent_effects(player_id) await db.end_combat(player_id) return { 'messages': messages, 'success': True, 'combat_over': True, 'player_defeated': False, 'player_hp': player['hp'], 'corpse_data': None } else: # Failed โ€” NPC gets a free attack npc_damage = random.randint(npc_def.damage_min, npc_def.damage_max) # Apply player's defensive buffs to flee penalty buff_mods = await get_active_buff_modifiers(player_id) if buff_mods['guaranteed_dodge'] or buff_mods['enemy_miss']: npc_damage = 0 messages.append(create_combat_message("combat_dodge", origin="player")) else: if buff_mods['damage_reduction'] > 0: npc_damage = max(1, int(npc_damage * (1 - buff_mods['damage_reduction']))) new_player_hp = max(0, player['hp'] - npc_damage) messages.append(create_combat_message( "flee_fail", origin="enemy", npc_name=npc_def.name, damage=npc_damage, message=get_game_message('flee_fail_text', locale, name=player['name']) )) if new_player_hp <= 0: # Died during flee attempt messages.append(create_combat_message( "player_defeated", origin="neutral", npc_name=npc_def.name )) await db.update_player(player_id, hp=0, is_dead=True) await db.update_player_statistics(player_id, deaths=1, failed_flees=1, damage_taken=npc_damage, increment=True) # Create corpse corpse_data = await _create_player_corpse(player_id, player, combat['location_id']) # Respawn enemy if combat.get('from_wandering_enemy'): despawn_time = time.time() + 300 async with db.DatabaseSession() as session: from sqlalchemy import insert stmt = insert(db.wandering_enemies).values( npc_id=combat['npc_id'], location_id=combat['location_id'], spawn_timestamp=time.time(), despawn_timestamp=despawn_time ) await session.execute(stmt) await session.commit() await db.remove_non_persistent_effects(player_id) await db.end_combat(player_id) return { 'messages': messages, 'success': False, 'combat_over': True, 'player_defeated': True, 'player_hp': 0, 'corpse_data': corpse_data } else: await db.update_player(player_id, hp=new_player_hp) await db.update_player_statistics(player_id, failed_flees=1, damage_taken=npc_damage, increment=True) await db.update_combat(player_id, {'turn': 'player', 'turn_started_at': time.time()}) return { 'messages': messages, 'success': False, 'combat_over': False, 'player_defeated': False, 'player_hp': new_player_hp, 'corpse_data': None } async def execute_flee_pvp( player_id: int, player: dict, player_stats: dict, locale: str = 'en', ) -> Dict[str, Any]: """ Attempt to flee from PvP combat. No penalty damage, just chance-based. Returns: { messages, success } """ messages = [] flee_chance = player_stats.get('flee_chance_base', 0.5) if random.random() < flee_chance: text = get_game_message('flee_success_text', locale, name=player['name']) messages.append(create_combat_message("flee_success", origin="player", message=text)) return {'messages': messages, 'success': True, 'last_action': text} else: text = get_game_message('flee_fail_text', locale, name=player['name']) messages.append(create_combat_message( "flee_fail", origin="player", reason="chance", message=text )) return {'messages': messages, 'success': False, 'last_action': text} # ============================================================================ # VICTORY / DEFEAT HELPERS # ============================================================================ async def handle_victory_pve( player: dict, combat: dict, npc_def, items_manager: ItemsManager, quests_data: dict, redis_manager, locale: str = 'en', ) -> Dict[str, Any]: """ Handle all post-victory PvE logic: XP, level up, corpse, quest progress. Returns: { messages, xp_gained, level_up_result, quest_updates } """ from .. import game_logic messages = [] quest_updates = [] # Award XP xp_gained = npc_def.xp_reward new_xp = player['xp'] + xp_gained messages.append(create_combat_message("xp_gain", origin="player", amount=xp_gained)) await db.update_player(player['id'], xp=new_xp) # Track kill await db.update_player_statistics(player['id'], enemies_killed=1, increment=True) # Level up check level_up_result = await game_logic.check_and_apply_level_up(player['id']) if level_up_result['leveled_up']: messages.append(create_combat_message( "level_up", origin="player", level=level_up_result['new_level'], stat_points=level_up_result['levels_gained'] )) # Create corpse corpse_loot = npc_def.corpse_loot if hasattr(npc_def, 'corpse_loot') else [] corpse_loot_dicts = [] for loot in corpse_loot: if hasattr(loot, '__dict__'): corpse_loot_dicts.append({ 'item_id': loot.item_id, 'quantity_min': loot.quantity_min, 'quantity_max': loot.quantity_max, 'required_tool': loot.required_tool }) else: corpse_loot_dicts.append(loot) await db.create_npc_corpse( npc_id=combat['npc_id'], location_id=player['location_id'], loot_remaining=json.dumps(corpse_loot_dicts) ) # Quest progress try: if quests_data: active_quests = await db.get_character_quests(player['id']) for q_record in active_quests: if q_record['status'] != 'active': continue q_def = quests_data.get(q_record['quest_id']) if not q_def: continue objectives = q_def.get('objectives', []) current_progress = q_record.get('progress') or {} new_progress = current_progress.copy() progress_changed = False for obj in objectives: if obj['type'] == 'kill_count' and obj['target'] == combat['npc_id']: current_count = current_progress.get(obj['target'], 0) if current_count < obj['count']: new_progress[obj['target']] = current_count + 1 progress_changed = True if progress_changed: progress_str = "" for obj in objectives: target = obj['target'] req_count = obj['count'] curr = new_progress.get(target, 0) if obj['type'] == 'kill_count' and target == combat['npc_id']: progress_str = f" ({curr}/{req_count})" await db.update_quest_progress(player['id'], q_record['quest_id'], new_progress, 'active') messages.append(create_combat_message( "quest_update", origin="system", message=f"{get_locale_string(q_def['title'], locale)}{progress_str}" )) updated_q_data = dict(q_record) updated_q_data['start_at'] = q_record['started_at'] updated_q_data.update(q_def) quest_updates.append(updated_q_data) except Exception as e: logger.error(f"Failed to update quest progress: {e}") # End combat await db.remove_non_persistent_effects(player['id']) await db.end_combat(player['id']) # Redis cache update if redis_manager: await redis_manager.delete_combat_state(player['id']) await redis_manager.update_player_session_field(player['id'], 'xp', new_xp) if level_up_result['leveled_up']: await redis_manager.update_player_session_field(player['id'], 'level', level_up_result['new_level']) return { 'messages': messages, 'xp_gained': xp_gained, 'new_xp': new_xp, 'level_up_result': level_up_result, 'quest_updates': quest_updates, } async def handle_victory_pvp( winner: dict, loser: dict, damage_dealt: int, items_manager: ItemsManager, locale: str = 'en', ) -> Dict[str, Any]: """ Handle PvP victory: corpse creation, stats, inventory clear. Returns: { messages, corpse_data } """ messages = [] messages.append(create_combat_message("victory", origin="neutral", npc_name=loser['name'])) await db.update_player(loser['id'], hp=0, is_dead=True) # Create corpse corpse_data = await _create_player_corpse(loser['id'], loser, loser['location_id'], items_manager) # Stats await db.update_player_statistics(loser['id'], pvp_deaths=1, pvp_combats_lost=1, pvp_damage_taken=damage_dealt, pvp_attacks_received=1, increment=True ) await db.update_player_statistics(winner['id'], players_killed=1, pvp_combats_won=1, pvp_damage_dealt=damage_dealt, pvp_attacks_landed=1, increment=True ) return { 'messages': messages, 'corpse_data': corpse_data, } async def handle_defeat_pve( player_id: int, player: dict, combat: dict, npc_def, items_manager: ItemsManager, locale: str = 'en', ) -> Dict[str, Any]: """ Handle PvE player death: corpse, stats, respawn enemy, end combat. Returns: { messages, corpse_data } """ messages = [] messages.append(create_combat_message( "player_defeated", origin="neutral", npc_name=npc_def.name )) await db.update_player(player_id, hp=0, is_dead=True) await db.update_player_statistics(player_id, deaths=1, increment=True) # Create corpse corpse_data = await _create_player_corpse(player_id, player, combat.get('location_id', player.get('location_id', '')), items_manager) # Respawn enemy if combat.get('from_wandering_enemy'): despawn_time = time.time() + 300 async with db.DatabaseSession() as session: from sqlalchemy import insert stmt = insert(db.wandering_enemies).values( npc_id=combat['npc_id'], location_id=combat['location_id'], spawn_timestamp=time.time(), despawn_timestamp=despawn_time ) await session.execute(stmt) await session.commit() await db.remove_non_persistent_effects(player_id) await db.end_combat(player_id) return { 'messages': messages, 'corpse_data': corpse_data, } # ============================================================================ # NPC TURN (wraps game_logic.npc_attack with buff checking) # ============================================================================ async def execute_npc_turn( player_id: int, combat: dict, npc_def, reduce_armor_func, redis_manager=None, ) -> Tuple[List[dict], bool]: """ Execute the NPC's turn with buff-aware damage reduction. Wraps game_logic.npc_attack but also checks for player buffs that alter incoming damage (fortify, berserker's damage taken increase). Returns: (messages, player_defeated) """ from ..services.stats import calculate_derived_stats stats = await calculate_derived_stats(player_id, redis_manager) # Check buff modifiers to pass along to npc_attack buff_mods = await get_active_buff_modifiers(player_id) # Inject buff-based modifiers into player_stats so npc_attack can use them stats['buff_damage_reduction'] = buff_mods['damage_reduction'] stats['buff_damage_taken_increase'] = buff_mods['damage_taken_increase'] stats['buff_guaranteed_dodge'] = buff_mods['guaranteed_dodge'] stats['buff_enemy_miss'] = buff_mods['enemy_miss'] from ..game_logic import npc_attack messages, player_defeated = await npc_attack( player_id, combat, npc_def, reduce_armor_func, player_stats=stats ) return messages, player_defeated # ============================================================================ # INTERNAL HELPERS # ============================================================================ async def _create_player_corpse( player_id: int, player: dict, location_id: str, items_manager: ItemsManager = None, ) -> Optional[Dict[str, Any]]: """ Create a corpse for a dead player with their inventory. Returns corpse_data dict or None if no items. """ import time as time_module inventory = await db.get_inventory(player_id) if not inventory: return None inventory_items = [] for inv_item in inventory: item_name = inv_item['item_id'] item_emoji = '๐Ÿ“ฆ' if items_manager: item_def = items_manager.get_item(inv_item['item_id']) if item_def: item_name = item_def.name if hasattr(item_def, 'name') else inv_item['item_id'] item_emoji = item_def.emoji if hasattr(item_def, 'emoji') else '๐Ÿ“ฆ' inventory_items.append({ 'item_id': inv_item['item_id'], 'name': item_name, 'emoji': item_emoji, 'quantity': inv_item['quantity'], 'durability': inv_item.get('durability'), 'max_durability': inv_item.get('max_durability'), 'tier': inv_item.get('tier') }) if not inventory_items: return None corpse_id = await db.create_player_corpse( player_name=player['name'], location_id=location_id, items=json.dumps([{'item_id': i['item_id'], 'quantity': i['quantity']} for i in inventory]) ) await db.clear_inventory(player_id) return { "id": f"player_{corpse_id}", "type": "player", "name": f"{player['name']}'s Corpse", "emoji": "โšฐ๏ธ", "player_name": player['name'], "loot_count": len(inventory_items), "items": inventory_items, "timestamp": time_module.time() }