import sys import re with open('/opt/dockers/echoes_of_the_ashes/api/routers/combat.py', 'r', encoding='utf-8') as f: content = f.read() # 1. Tick Player Effects for PvP effect_tick_code = """ # Track the last action string for DB history last_action_text = "" # Process status effects (bleeding, poison, etc.) before action active_effects = await db.tick_player_effects(current_player['id']) if active_effects: from ..game_logic import calculate_status_impact total_impact = calculate_status_impact(active_effects) if total_impact > 0: damage = total_impact new_hp = max(0, current_player['hp'] - damage) await db.update_player(current_player['id'], hp=new_hp) current_player['hp'] = new_hp messages.append(create_combat_message( "effect_damage", origin="player", damage=damage, effect_name="status effects" )) if new_hp <= 0: messages.append(create_combat_message("died", origin="player", message="You died from status effects!")) combat_over = True winner_id = opponent['id'] # Update current player to dead state await db.update_player(current_player['id'], hp=0, is_dead=True) # Create corpse import json import time as time_module inventory = await db.get_inventory(current_player['id']) inventory_items = [] for inv_item in inventory: item_def = ITEMS_MANAGER.get_item(inv_item['item_id']) inventory_items.append({ 'item_id': inv_item['item_id'], 'name': item_def.name if item_def else inv_item['item_id'], 'emoji': item_def.emoji if (item_def and hasattr(item_def, 'emoji')) else '📦', 'quantity': inv_item['quantity'], 'durability': inv_item.get('durability'), 'max_durability': inv_item.get('max_durability'), 'tier': inv_item.get('tier') }) corpse_data = None if inventory_items: corpse_id = await db.create_player_corpse( player_name=current_player['name'], location_id=current_player['location_id'], items=json.dumps([{'item_id': i['item_id'], 'quantity': i['quantity']} for i in inventory]) ) await db.clear_inventory(current_player['id']) corpse_data = { "id": f"player_{corpse_id}", "type": "player", "name": f"{current_player['name']}'s Corpse", "emoji": "⚰️", "player_name": current_player['name'], "loot_count": len(inventory_items), "items": inventory_items, "timestamp": time_module.time() } # Update PvP statistics for both players await db.update_player_statistics(current_player['id'], pvp_deaths=1, pvp_combats_lost=1, increment=True) await db.update_player_statistics(opponent['id'], players_killed=1, pvp_combats_won=1, increment=True) # Broadcast corpse broadcast_data = { "message": get_game_message('pvp_defeat_broadcast', locale, opponent=current_player['name'], winner=opponent['name']), "action": "player_died", "player_id": current_player['id'] } if corpse_data: broadcast_data["corpse"] = corpse_data await manager.send_to_location( location_id=current_player['location_id'], message={ "type": "location_update", "data": broadcast_data, "timestamp": datetime.utcnow().isoformat() } ) await db.end_pvp_combat(pvp_combat['id']) elif total_impact < 0: heal = abs(total_impact) new_hp = min(current_player_stats.get('max_hp', current_player['max_hp']), current_player['hp'] + heal) actual_heal = new_hp - current_player['hp'] if actual_heal > 0: await db.update_player(current_player['id'], hp=new_hp) current_player['hp'] = new_hp messages.append(create_combat_message( "effect_heal", origin="player", heal=actual_heal, effect_name="status effects" )) # Stop processing action if player died from status effects if not combat_over: """ content = content.replace( ' # Track the last action string for DB history\n last_action_text = ""', effect_tick_code ) # Fix indentation of the actions block since it's now wrapped in `if not combat_over:` import textwrap # 2. Add Skill and Use Item logic for PvP skill_and_item_code = """ elif req.action == 'skill': if not req.item_id: raise HTTPException(status_code=400, detail="skill_id (passed as item_id) required") from ..services.skills import skills_manager skill_id = req.item_id skill = skills_manager.get_skill(skill_id) if not skill: raise HTTPException(status_code=404, detail="Skill not found") # Check unlocked stat_val = current_player.get(skill.stat_requirement, 0) if stat_val < skill.stat_threshold or current_player['level'] < skill.level_requirement: raise HTTPException(status_code=400, detail="Skill not unlocked") # Check cooldown active_effects = await db.get_player_effects(current_player['id']) cd_source = f"cd:{skill.id}" for eff in active_effects: if eff.get('source') == cd_source and eff.get('ticks_remaining', 0) > 0: raise HTTPException(status_code=400, detail=f"Skill on cooldown ({eff['ticks_remaining']} turns)") # Check stamina if current_player['stamina'] < skill.stamina_cost: raise HTTPException(status_code=400, detail="Not enough stamina") # Deduct stamina new_stamina = current_player['stamina'] - skill.stamina_cost await db.update_player_stamina(current_player['id'], new_stamina) current_player['stamina'] = new_stamina # Add cooldown effect if skill.cooldown > 0: await db.add_effect( player_id=current_player['id'], effect_name=f"{skill.id}_cooldown", effect_icon="⏳", effect_type="cooldown", value=0, ticks_remaining=skill.cooldown, persist_after_combat=False, source=cd_source ) # Get weapon info equipment = await db.get_all_equipment(current_player['id']) weapon_damage = 0 weapon_inv_id = None inv_item = None weapon_def = None if equipment.get('weapon') and equipment['weapon']: weapon_slot = equipment['weapon'] inv_item = await db.get_inventory_item_by_id(weapon_slot['item_id']) if inv_item: weapon_def = ITEMS_MANAGER.get_item(inv_item['item_id']) if weapon_def and weapon_def.stats: weapon_damage = random.randint( weapon_def.stats.get('damage_min', 0), weapon_def.stats.get('damage_max', 0) ) weapon_inv_id = inv_item['id'] effects = skill.effects new_opponent_hp = opponent['hp'] damage_done = 0 actual_damage = 0 armor_absorbed = 0 # Damage skills if 'damage_multiplier' in effects: base_damage = 5 strength_bonus = int(current_player['strength'] * 1.5) level_bonus = current_player['level'] variance = random.randint(-2, 2) raw_damage = max(1, base_damage + strength_bonus + level_bonus + weapon_damage + variance) multiplier = effects['damage_multiplier'] if 'execute_threshold' in effects: opponent_hp_pct = opponent['hp'] / opponent['max_hp'] if opponent['max_hp'] > 0 else 1 if opponent_hp_pct <= effects['execute_threshold']: multiplier = effects.get('execute_multiplier', multiplier) damage = max(1, int(raw_damage * multiplier)) if effects.get('guaranteed_crit'): damage = int(damage * 1.5) num_hits = effects.get('hits', 1) for hit in range(num_hits): hit_dmg = damage if hit == 0 else max(1, int(raw_damage * multiplier)) absorbed, broken_armor = await reduce_armor_durability(opponent['id'], hit_dmg) armor_absorbed += absorbed for broken in broken_armor: messages.append(create_combat_message( "item_broken", origin="enemy", item_name=broken['name'], emoji=broken['emoji'] )) last_action_text += f"\\n💔 {opponent['name']}'s {broken['emoji']} {broken['name']} broke!" actual_hit = max(1, hit_dmg - absorbed) damage_done += actual_hit new_opponent_hp = max(0, new_opponent_hp - actual_hit) actual_damage = damage_done messages.append(create_combat_message( "skill_attack", origin="player", damage=damage_done, skill_name=skill.name, skill_icon=skill.icon, hits=num_hits )) last_action_text = f"{current_player['name']} used {skill.name} on {opponent['name']} for {damage_done} damage! (Armor absorbed {armor_absorbed})" # Lifesteal if 'lifesteal' in effects: heal_amount = int(damage_done * effects['lifesteal']) new_hp = min(current_player['max_hp'], current_player['hp'] + heal_amount) if new_hp > current_player['hp']: await db.update_player(current_player['id'], hp=new_hp) current_player['hp'] = new_hp messages.append(create_combat_message( "skill_heal", origin="player", heal=heal_amount, skill_icon="🩸" )) # Poison DoT if 'poison_damage' in effects: await db.add_effect( player_id=opponent['id'], effect_name="Poison", effect_icon="🧪", effect_type="damage", damage_per_tick=effects['poison_damage'], ticks_remaining=effects['poison_duration'], persist_after_combat=True, source=f"skill_poison:{skill.id}" ) messages.append(create_combat_message( "skill_effect", origin="player", message=f"🧪 Poisoned! ({effects['poison_damage']} dmg/turn)" )) # Stun chance if 'stun_chance' in effects and random.random() < effects['stun_chance']: # Stun in PvP can be modeled as taking away a turn await db.add_effect( player_id=opponent['id'], effect_name="Stunned", effect_icon="💫", effect_type="debuff", ticks_remaining=1, persist_after_combat=False, source="skill_stun" ) messages.append(create_combat_message( "skill_effect", origin="player", message="💫 Stunned! (Currently skip effect)" )) # Weapon durability if weapon_inv_id and inv_item and inv_item.get('unique_item_id'): new_durability = await db.decrease_unique_item_durability(inv_item['unique_item_id'], 1) if new_durability is None: messages.append(create_combat_message("weapon_broke", origin="player", item_name=weapon_def.name if weapon_def else "weapon")) await db.unequip_item(current_player['id'], 'weapon') # Heal skills if 'heal_percent' in effects: heal_amount = int(current_player['max_hp'] * effects['heal_percent']) new_hp = min(current_player['max_hp'], current_player['hp'] + heal_amount) actual_heal = new_hp - current_player['hp'] if actual_heal > 0: await db.update_player(current_player['id'], hp=new_hp) current_player['hp'] = new_hp messages.append(create_combat_message( "skill_heal", origin="player", heal=actual_heal, skill_icon=skill.icon )) last_action_text = f"{current_player['name']} used {skill.name} and healed for {actual_heal} HP!" # Fortify if 'armor_boost' in effects: await db.add_effect( player_id=current_player['id'], effect_name="Fortify", effect_icon="🛡️", effect_type="buff", value=effects['armor_boost'], ticks_remaining=effects['duration'], persist_after_combat=False, source=f"skill_fortify:{skill.id}" ) messages.append(create_combat_message( "skill_effect", origin="player", message=f"🛡️ Fortified! (+{effects['armor_boost']} Armor)" )) last_action_text = f"{current_player['name']} used {skill.name} and bolstered their defenses!" # Process opponent HP if damage done if damage_done > 0: await db.update_player(opponent['id'], hp=new_opponent_hp) if new_opponent_hp <= 0: last_action_text += f"\\n🏆 {current_player['name']} has defeated {opponent['name']}!" messages.append(create_combat_message("victory", origin="neutral", npc_name=opponent['name'])) combat_over = True winner_id = current_player['id'] await db.update_player(opponent['id'], hp=0, is_dead=True) # Create corpse import json import time as time_module inventory = await db.get_inventory(opponent['id']) inventory_items = [] for inv_item in inventory: item_def = ITEMS_MANAGER.get_item(inv_item['item_id']) inventory_items.append({ 'item_id': inv_item['item_id'], 'name': item_def.name if item_def else inv_item['item_id'], 'emoji': item_def.emoji if (item_def and hasattr(item_def, 'emoji')) else '📦', 'quantity': inv_item['quantity'], 'durability': inv_item.get('durability'), 'max_durability': inv_item.get('max_durability'), 'tier': inv_item.get('tier') }) corpse_data = None if inventory_items: corpse_id = await db.create_player_corpse( player_name=opponent['name'], location_id=opponent['location_id'], items=json.dumps([{'item_id': i['item_id'], 'quantity': i['quantity']} for i in inventory]) ) await db.clear_inventory(opponent['id']) corpse_data = { "id": f"player_{corpse_id}", "type": "player", "name": f"{opponent['name']}'s Corpse", "emoji": "⚰️", "player_name": opponent['name'], "loot_count": len(inventory_items), "items": inventory_items, "timestamp": time_module.time() } # Update statistics await db.update_player_statistics(opponent['id'], pvp_deaths=1, pvp_combats_lost=1, pvp_damage_taken=actual_damage, pvp_attacks_received=1, increment=True) await db.update_player_statistics(current_player['id'], players_killed=1, pvp_combats_won=1, pvp_damage_dealt=actual_damage, pvp_attacks_landed=1, increment=True) # Broadcast corpse broadcast_data = { "message": get_game_message('pvp_defeat_broadcast', locale, opponent=opponent['name'], winner=current_player['name']), "action": "player_died", "player_id": opponent['id'] } if corpse_data: broadcast_data["corpse"] = corpse_data await manager.send_to_location( location_id=opponent['location_id'], message={ "type": "location_update", "data": broadcast_data, "timestamp": datetime.utcnow().isoformat() } ) await db.end_pvp_combat(pvp_combat['id']) else: await db.update_player_statistics(current_player['id'], pvp_damage_dealt=actual_damage, pvp_attacks_landed=1, increment=True) await db.update_player_statistics(opponent['id'], pvp_damage_taken=actual_damage, pvp_attacks_received=1, increment=True) # End of turn swap updates = { 'turn': 'defender' if is_attacker else 'attacker', 'turn_started_at': time.time(), 'last_action': f"{last_action_text}|{time.time()}" } await db.update_pvp_combat(pvp_combat['id'], updates) else: # Skill didn't do damage, but turn still ends updates = { 'turn': 'defender' if is_attacker else 'attacker', 'turn_started_at': time.time(), 'last_action': f"{last_action_text}|{time.time()}" } await db.update_pvp_combat(pvp_combat['id'], updates) elif req.action == 'use_item': if not req.item_id: raise HTTPException(status_code=400, detail="item_id required for use_item action") player_inventory = await db.get_inventory(current_player['id']) inv_item = next((item for item in player_inventory if item['item_id'] == req.item_id), None) if not inv_item: raise HTTPException(status_code=400, detail="Item not found in inventory") item_def = ITEMS_MANAGER.get_item(req.item_id) if not item_def or not item_def.combat_usable: raise HTTPException(status_code=400, detail="This item cannot be used in combat") item_name = get_locale_string(item_def.name, locale) effects_applied = [] if item_def.effects.get('status_effect'): status_data = item_def.effects['status_effect'] await db.add_effect( player_id=current_player['id'], effect_name=status_data['name'], effect_icon=status_data.get('icon', '✨'), effect_type=status_data.get('type', 'buff'), damage_per_tick=status_data.get('damage_per_tick', 0), value=status_data.get('value', 0), ticks_remaining=status_data.get('ticks', 3), persist_after_combat=True, source=f"item:{item_def.id}" ) effects_applied.append(f"Applied {status_data['name']}") if item_def.effects.get('cures'): for cure_effect in item_def.effects['cures']: if await db.remove_effect(current_player['id'], cure_effect): effects_applied.append(f"Cured {cure_effect}") if item_def.effects.get('hp_restore') and item_def.effects['hp_restore'] > 0: item_effectiveness = current_player_stats.get('item_effectiveness', 1.0) restore_amount = int(item_def.effects['hp_restore'] * item_effectiveness) new_hp = min(current_player['max_hp'], current_player['hp'] + restore_amount) actual_heal = new_hp - current_player['hp'] if actual_heal > 0: await db.update_player(current_player['id'], hp=new_hp) current_player['hp'] = new_hp effects_applied.append(get_game_message('healed_amount_text', locale, amount=actual_heal)) messages.append(create_combat_message("item_heal", origin="player", heal=actual_heal)) if item_def.effects.get('stamina_restore') and item_def.effects['stamina_restore'] > 0: item_effectiveness = current_player_stats.get('item_effectiveness', 1.0) restore_amount = int(item_def.effects['stamina_restore'] * item_effectiveness) new_stamina = min(current_player['max_stamina'], current_player['stamina'] + restore_amount) actual_restore = new_stamina - current_player['stamina'] if actual_restore > 0: await db.update_player_stamina(current_player['id'], new_stamina) effects_applied.append(f"Restored {actual_restore} stamina") messages.append(create_combat_message("item_restore", origin="player", stat="stamina", amount=actual_restore)) if inv_item['quantity'] > 1: await db.update_inventory_quantity(inv_item['id'], inv_item['quantity'] - 1) else: await db.remove_from_inventory(inv_item['id']) messages.append(create_combat_message( "use_item", origin="player", item_name=item_name, message=get_game_message('used_item_text', locale, name=item_name) + " (" + ", ".join(effects_applied) + ")" )) last_action_text = f"{current_player['name']} used {item_name}!" # End of turn swap updates = { 'turn': 'defender' if is_attacker else 'attacker', 'turn_started_at': time.time(), 'last_action': f"{last_action_text}|{time.time()}" } await db.update_pvp_combat(pvp_combat['id'], updates) """ content = content.replace( " elif req.action == 'flee':", skill_and_item_code + "\n elif req.action == 'flee':" ) # Indent the blocks bounded by the combat_over condition lines = content.split('\\n') inside_combat_over = False new_lines = [] for line in lines: new_lines.append(line) # Since doing line manipulation might be tricky via replace string, # we need to be very precise. We will apply the wrapper around attack and flee logic inside the Python script by using Regex. # Well, wait, I can just write out the fully rewritten function or use `re` substitution for indenting. # It is simpler to just ensure all attack and flee actions are under `if not combat_over:` # by indenting the whole block manually in the script. try: with open('/opt/dockers/echoes_of_the_ashes/api/routers/combat.py', 'w', encoding='utf-8') as f: f.write(content) print("Script ran. Patched combat.py.") except Exception as e: print(e)