Files
echoes-of-the-ash/update_pvp.py

522 lines
24 KiB
Python

import sys
import re
# Load the current router source into memory; the replacements further down
# operate on this string and the result is written back at the end.
with open('/opt/dockers/echoes_of_the_ashes/api/routers/combat.py', encoding='utf-8') as src:
    content = src.read()
# 1. Tick Player Effects for PvP
# `effect_tick_code` is source text destined for combat.py; none of it is
# executed by this script. It is used as the replacement in the
# `content.replace(...)` call that follows, whose anchor is the
# `last_action_text = ""` initialisation, so the text starts by re-emitting
# that initialisation. It then:
#   * ticks the current player's status effects (`db.tick_player_effects`) and
#     folds them into one number with `calculate_status_impact`;
#   * impact > 0: subtracts it from HP, and at 0 HP marks the player dead,
#     builds a corpse from the inventory, updates both players' PvP
#     statistics, broadcasts a `location_update` and ends the PvP combat;
#   * impact < 0: heals, capped at max HP.
# The text ends on a bare `if not combat_over:` line.
# NOTE(review): that trailing `if` needs an indented body, so whatever follows
# the anchor in combat.py has to end up nested under it - confirm after
# patching.
# NOTE(review): as written here the lines of the literal carry no leading
# indentation; confirm that matches the nesting at the insertion point.
effect_tick_code = """
# Track the last action string for DB history
last_action_text = ""
# Process status effects (bleeding, poison, etc.) before action
active_effects = await db.tick_player_effects(current_player['id'])
if active_effects:
from ..game_logic import calculate_status_impact
total_impact = calculate_status_impact(active_effects)
if total_impact > 0:
damage = total_impact
new_hp = max(0, current_player['hp'] - damage)
await db.update_player(current_player['id'], hp=new_hp)
current_player['hp'] = new_hp
messages.append(create_combat_message(
"effect_damage",
origin="player",
damage=damage,
effect_name="status effects"
))
if new_hp <= 0:
messages.append(create_combat_message("died", origin="player", message="You died from status effects!"))
combat_over = True
winner_id = opponent['id']
# Update current player to dead state
await db.update_player(current_player['id'], hp=0, is_dead=True)
# Create corpse
import json
import time as time_module
inventory = await db.get_inventory(current_player['id'])
inventory_items = []
for inv_item in inventory:
item_def = ITEMS_MANAGER.get_item(inv_item['item_id'])
inventory_items.append({
'item_id': inv_item['item_id'],
'name': item_def.name if item_def else inv_item['item_id'],
'emoji': item_def.emoji if (item_def and hasattr(item_def, 'emoji')) else '📦',
'quantity': inv_item['quantity'],
'durability': inv_item.get('durability'),
'max_durability': inv_item.get('max_durability'),
'tier': inv_item.get('tier')
})
corpse_data = None
if inventory_items:
corpse_id = await db.create_player_corpse(
player_name=current_player['name'],
location_id=current_player['location_id'],
items=json.dumps([{'item_id': i['item_id'], 'quantity': i['quantity']} for i in inventory])
)
await db.clear_inventory(current_player['id'])
corpse_data = {
"id": f"player_{corpse_id}",
"type": "player",
"name": f"{current_player['name']}'s Corpse",
"emoji": "⚰️",
"player_name": current_player['name'],
"loot_count": len(inventory_items),
"items": inventory_items,
"timestamp": time_module.time()
}
# Update PvP statistics for both players
await db.update_player_statistics(current_player['id'], pvp_deaths=1, pvp_combats_lost=1, increment=True)
await db.update_player_statistics(opponent['id'], players_killed=1, pvp_combats_won=1, increment=True)
# Broadcast corpse
broadcast_data = {
"message": get_game_message('pvp_defeat_broadcast', locale, opponent=current_player['name'], winner=opponent['name']),
"action": "player_died",
"player_id": current_player['id']
}
if corpse_data:
broadcast_data["corpse"] = corpse_data
await manager.send_to_location(
location_id=current_player['location_id'],
message={
"type": "location_update",
"data": broadcast_data,
"timestamp": datetime.utcnow().isoformat()
}
)
await db.end_pvp_combat(pvp_combat['id'])
elif total_impact < 0:
heal = abs(total_impact)
new_hp = min(current_player_stats.get('max_hp', current_player['max_hp']), current_player['hp'] + heal)
actual_heal = new_hp - current_player['hp']
if actual_heal > 0:
await db.update_player(current_player['id'], hp=new_hp)
current_player['hp'] = new_hp
messages.append(create_combat_message(
"effect_heal",
origin="player",
heal=actual_heal,
effect_name="status effects"
))
# Stop processing action if player died from status effects
if not combat_over:
"""
# Splice the status-effect tick into combat.py in place of the
# `last_action_text` initialisation.
# str.replace() hands back the text unchanged when the anchor is absent, which
# would let the script go on to report a patch that never happened - so check
# first and stop before anything is written back.
# NOTE(review): str.replace() rewrites every occurrence of the anchor; confirm
# it appears exactly once in combat.py.
# NOTE(review): the inserted text itself begins with these two anchor lines,
# so a second run would presumably match again and insert a second copy -
# confirm before re-running against an already patched file.
effect_tick_anchor = ' # Track the last action string for DB history\n last_action_text = ""'
if effect_tick_anchor not in content:
    raise SystemExit(
        "update_pvp: anchor for the PvP status-effect tick not found in combat.py; "
        "aborting without writing"
    )
content = content.replace(effect_tick_anchor, effect_tick_code)
# TODO: the actions block that follows the inserted `if not combat_over:` still has to be re-indented; nothing in this script does that yet.
import textwrap
# 2. Add Skill and Use Item logic for PvP
# `skill_and_item_code` is source text destined for combat.py; none of it is
# executed by this script. It holds two `elif` branches on `req.action`:
#   'skill'    - looks the skill up by `req.item_id`, rejects it when locked,
#                on cooldown or when stamina is short, deducts stamina,
#                records the cooldown as an effect, then applies whatever
#                `skill.effects` contains (damage with optional execute /
#                crit / multi-hit, lifesteal, poison, stun, heal, fortify).
#                A kill ends the combat (corpse, statistics, broadcast);
#                otherwise the turn is handed to the other side.
#   'use_item' - applies a combat-usable item's status effect, cures, HP and
#                stamina restore, consumes one unit and hands the turn over.
# The literal is not a raw string, so `\\n` below is an escaped backslash and
# the generated f-strings end up containing `\n`.
# NOTE(review): in the damage branch the per-hit loop appends the "broke!"
# lines to `last_action_text`, but the assignment right after the loop
# replaces the whole string, so those lines are lost - confirm intended.
# NOTE(review): the "Stunned" effect is only recorded; nothing in this text
# makes the opponent skip a turn (the message says "Currently skip effect").
# NOTE(review): the stamina restore in 'use_item' updates the database but not
# `current_player['stamina']`, unlike the HP restore next to it.
skill_and_item_code = """
elif req.action == 'skill':
if not req.item_id:
raise HTTPException(status_code=400, detail="skill_id (passed as item_id) required")
from ..services.skills import skills_manager
skill_id = req.item_id
skill = skills_manager.get_skill(skill_id)
if not skill:
raise HTTPException(status_code=404, detail="Skill not found")
# Check unlocked
stat_val = current_player.get(skill.stat_requirement, 0)
if stat_val < skill.stat_threshold or current_player['level'] < skill.level_requirement:
raise HTTPException(status_code=400, detail="Skill not unlocked")
# Check cooldown
active_effects = await db.get_player_effects(current_player['id'])
cd_source = f"cd:{skill.id}"
for eff in active_effects:
if eff.get('source') == cd_source and eff.get('ticks_remaining', 0) > 0:
raise HTTPException(status_code=400, detail=f"Skill on cooldown ({eff['ticks_remaining']} turns)")
# Check stamina
if current_player['stamina'] < skill.stamina_cost:
raise HTTPException(status_code=400, detail="Not enough stamina")
# Deduct stamina
new_stamina = current_player['stamina'] - skill.stamina_cost
await db.update_player_stamina(current_player['id'], new_stamina)
current_player['stamina'] = new_stamina
# Add cooldown effect
if skill.cooldown > 0:
await db.add_effect(
player_id=current_player['id'],
effect_name=f"{skill.id}_cooldown",
effect_icon="",
effect_type="cooldown",
value=0,
ticks_remaining=skill.cooldown,
persist_after_combat=False,
source=cd_source
)
# Get weapon info
equipment = await db.get_all_equipment(current_player['id'])
weapon_damage = 0
weapon_inv_id = None
inv_item = None
weapon_def = None
if equipment.get('weapon') and equipment['weapon']:
weapon_slot = equipment['weapon']
inv_item = await db.get_inventory_item_by_id(weapon_slot['item_id'])
if inv_item:
weapon_def = ITEMS_MANAGER.get_item(inv_item['item_id'])
if weapon_def and weapon_def.stats:
weapon_damage = random.randint(
weapon_def.stats.get('damage_min', 0),
weapon_def.stats.get('damage_max', 0)
)
weapon_inv_id = inv_item['id']
effects = skill.effects
new_opponent_hp = opponent['hp']
damage_done = 0
actual_damage = 0
armor_absorbed = 0
# Damage skills
if 'damage_multiplier' in effects:
base_damage = 5
strength_bonus = int(current_player['strength'] * 1.5)
level_bonus = current_player['level']
variance = random.randint(-2, 2)
raw_damage = max(1, base_damage + strength_bonus + level_bonus + weapon_damage + variance)
multiplier = effects['damage_multiplier']
if 'execute_threshold' in effects:
opponent_hp_pct = opponent['hp'] / opponent['max_hp'] if opponent['max_hp'] > 0 else 1
if opponent_hp_pct <= effects['execute_threshold']:
multiplier = effects.get('execute_multiplier', multiplier)
damage = max(1, int(raw_damage * multiplier))
if effects.get('guaranteed_crit'):
damage = int(damage * 1.5)
num_hits = effects.get('hits', 1)
for hit in range(num_hits):
hit_dmg = damage if hit == 0 else max(1, int(raw_damage * multiplier))
absorbed, broken_armor = await reduce_armor_durability(opponent['id'], hit_dmg)
armor_absorbed += absorbed
for broken in broken_armor:
messages.append(create_combat_message(
"item_broken",
origin="enemy",
item_name=broken['name'],
emoji=broken['emoji']
))
last_action_text += f"\\n💔 {opponent['name']}'s {broken['emoji']} {broken['name']} broke!"
actual_hit = max(1, hit_dmg - absorbed)
damage_done += actual_hit
new_opponent_hp = max(0, new_opponent_hp - actual_hit)
actual_damage = damage_done
messages.append(create_combat_message(
"skill_attack",
origin="player",
damage=damage_done,
skill_name=skill.name,
skill_icon=skill.icon,
hits=num_hits
))
last_action_text = f"{current_player['name']} used {skill.name} on {opponent['name']} for {damage_done} damage! (Armor absorbed {armor_absorbed})"
# Lifesteal
if 'lifesteal' in effects:
heal_amount = int(damage_done * effects['lifesteal'])
new_hp = min(current_player['max_hp'], current_player['hp'] + heal_amount)
if new_hp > current_player['hp']:
await db.update_player(current_player['id'], hp=new_hp)
current_player['hp'] = new_hp
messages.append(create_combat_message(
"skill_heal", origin="player", heal=heal_amount, skill_icon="🩸"
))
# Poison DoT
if 'poison_damage' in effects:
await db.add_effect(
player_id=opponent['id'],
effect_name="Poison",
effect_icon="🧪",
effect_type="damage",
damage_per_tick=effects['poison_damage'],
ticks_remaining=effects['poison_duration'],
persist_after_combat=True,
source=f"skill_poison:{skill.id}"
)
messages.append(create_combat_message(
"skill_effect", origin="player", message=f"🧪 Poisoned! ({effects['poison_damage']} dmg/turn)"
))
# Stun chance
if 'stun_chance' in effects and random.random() < effects['stun_chance']:
# Stun in PvP can be modeled as taking away a turn
await db.add_effect(
player_id=opponent['id'],
effect_name="Stunned",
effect_icon="💫",
effect_type="debuff",
ticks_remaining=1,
persist_after_combat=False,
source="skill_stun"
)
messages.append(create_combat_message(
"skill_effect", origin="player", message="💫 Stunned! (Currently skip effect)"
))
# Weapon durability
if weapon_inv_id and inv_item and inv_item.get('unique_item_id'):
new_durability = await db.decrease_unique_item_durability(inv_item['unique_item_id'], 1)
if new_durability is None:
messages.append(create_combat_message("weapon_broke", origin="player", item_name=weapon_def.name if weapon_def else "weapon"))
await db.unequip_item(current_player['id'], 'weapon')
# Heal skills
if 'heal_percent' in effects:
heal_amount = int(current_player['max_hp'] * effects['heal_percent'])
new_hp = min(current_player['max_hp'], current_player['hp'] + heal_amount)
actual_heal = new_hp - current_player['hp']
if actual_heal > 0:
await db.update_player(current_player['id'], hp=new_hp)
current_player['hp'] = new_hp
messages.append(create_combat_message(
"skill_heal", origin="player", heal=actual_heal, skill_icon=skill.icon
))
last_action_text = f"{current_player['name']} used {skill.name} and healed for {actual_heal} HP!"
# Fortify
if 'armor_boost' in effects:
await db.add_effect(
player_id=current_player['id'],
effect_name="Fortify",
effect_icon="🛡️",
effect_type="buff",
value=effects['armor_boost'],
ticks_remaining=effects['duration'],
persist_after_combat=False,
source=f"skill_fortify:{skill.id}"
)
messages.append(create_combat_message(
"skill_effect", origin="player", message=f"🛡️ Fortified! (+{effects['armor_boost']} Armor)"
))
last_action_text = f"{current_player['name']} used {skill.name} and bolstered their defenses!"
# Process opponent HP if damage done
if damage_done > 0:
await db.update_player(opponent['id'], hp=new_opponent_hp)
if new_opponent_hp <= 0:
last_action_text += f"\\n🏆 {current_player['name']} has defeated {opponent['name']}!"
messages.append(create_combat_message("victory", origin="neutral", npc_name=opponent['name']))
combat_over = True
winner_id = current_player['id']
await db.update_player(opponent['id'], hp=0, is_dead=True)
# Create corpse
import json
import time as time_module
inventory = await db.get_inventory(opponent['id'])
inventory_items = []
for inv_item in inventory:
item_def = ITEMS_MANAGER.get_item(inv_item['item_id'])
inventory_items.append({
'item_id': inv_item['item_id'],
'name': item_def.name if item_def else inv_item['item_id'],
'emoji': item_def.emoji if (item_def and hasattr(item_def, 'emoji')) else '📦',
'quantity': inv_item['quantity'],
'durability': inv_item.get('durability'),
'max_durability': inv_item.get('max_durability'),
'tier': inv_item.get('tier')
})
corpse_data = None
if inventory_items:
corpse_id = await db.create_player_corpse(
player_name=opponent['name'],
location_id=opponent['location_id'],
items=json.dumps([{'item_id': i['item_id'], 'quantity': i['quantity']} for i in inventory])
)
await db.clear_inventory(opponent['id'])
corpse_data = {
"id": f"player_{corpse_id}",
"type": "player",
"name": f"{opponent['name']}'s Corpse",
"emoji": "⚰️",
"player_name": opponent['name'],
"loot_count": len(inventory_items),
"items": inventory_items,
"timestamp": time_module.time()
}
# Update statistics
await db.update_player_statistics(opponent['id'], pvp_deaths=1, pvp_combats_lost=1, pvp_damage_taken=actual_damage, pvp_attacks_received=1, increment=True)
await db.update_player_statistics(current_player['id'], players_killed=1, pvp_combats_won=1, pvp_damage_dealt=actual_damage, pvp_attacks_landed=1, increment=True)
# Broadcast corpse
broadcast_data = {
"message": get_game_message('pvp_defeat_broadcast', locale, opponent=opponent['name'], winner=current_player['name']),
"action": "player_died",
"player_id": opponent['id']
}
if corpse_data:
broadcast_data["corpse"] = corpse_data
await manager.send_to_location(
location_id=opponent['location_id'],
message={
"type": "location_update",
"data": broadcast_data,
"timestamp": datetime.utcnow().isoformat()
}
)
await db.end_pvp_combat(pvp_combat['id'])
else:
await db.update_player_statistics(current_player['id'], pvp_damage_dealt=actual_damage, pvp_attacks_landed=1, increment=True)
await db.update_player_statistics(opponent['id'], pvp_damage_taken=actual_damage, pvp_attacks_received=1, increment=True)
# End of turn swap
updates = {
'turn': 'defender' if is_attacker else 'attacker',
'turn_started_at': time.time(),
'last_action': f"{last_action_text}|{time.time()}"
}
await db.update_pvp_combat(pvp_combat['id'], updates)
else:
# Skill didn't do damage, but turn still ends
updates = {
'turn': 'defender' if is_attacker else 'attacker',
'turn_started_at': time.time(),
'last_action': f"{last_action_text}|{time.time()}"
}
await db.update_pvp_combat(pvp_combat['id'], updates)
elif req.action == 'use_item':
if not req.item_id:
raise HTTPException(status_code=400, detail="item_id required for use_item action")
player_inventory = await db.get_inventory(current_player['id'])
inv_item = next((item for item in player_inventory if item['item_id'] == req.item_id), None)
if not inv_item:
raise HTTPException(status_code=400, detail="Item not found in inventory")
item_def = ITEMS_MANAGER.get_item(req.item_id)
if not item_def or not item_def.combat_usable:
raise HTTPException(status_code=400, detail="This item cannot be used in combat")
item_name = get_locale_string(item_def.name, locale)
effects_applied = []
if item_def.effects.get('status_effect'):
status_data = item_def.effects['status_effect']
await db.add_effect(
player_id=current_player['id'],
effect_name=status_data['name'],
effect_icon=status_data.get('icon', ''),
effect_type=status_data.get('type', 'buff'),
damage_per_tick=status_data.get('damage_per_tick', 0),
value=status_data.get('value', 0),
ticks_remaining=status_data.get('ticks', 3),
persist_after_combat=True,
source=f"item:{item_def.id}"
)
effects_applied.append(f"Applied {status_data['name']}")
if item_def.effects.get('cures'):
for cure_effect in item_def.effects['cures']:
if await db.remove_effect(current_player['id'], cure_effect):
effects_applied.append(f"Cured {cure_effect}")
if item_def.effects.get('hp_restore') and item_def.effects['hp_restore'] > 0:
item_effectiveness = current_player_stats.get('item_effectiveness', 1.0)
restore_amount = int(item_def.effects['hp_restore'] * item_effectiveness)
new_hp = min(current_player['max_hp'], current_player['hp'] + restore_amount)
actual_heal = new_hp - current_player['hp']
if actual_heal > 0:
await db.update_player(current_player['id'], hp=new_hp)
current_player['hp'] = new_hp
effects_applied.append(get_game_message('healed_amount_text', locale, amount=actual_heal))
messages.append(create_combat_message("item_heal", origin="player", heal=actual_heal))
if item_def.effects.get('stamina_restore') and item_def.effects['stamina_restore'] > 0:
item_effectiveness = current_player_stats.get('item_effectiveness', 1.0)
restore_amount = int(item_def.effects['stamina_restore'] * item_effectiveness)
new_stamina = min(current_player['max_stamina'], current_player['stamina'] + restore_amount)
actual_restore = new_stamina - current_player['stamina']
if actual_restore > 0:
await db.update_player_stamina(current_player['id'], new_stamina)
effects_applied.append(f"Restored {actual_restore} stamina")
messages.append(create_combat_message("item_restore", origin="player", stat="stamina", amount=actual_restore))
if inv_item['quantity'] > 1:
await db.update_inventory_quantity(inv_item['id'], inv_item['quantity'] - 1)
else:
await db.remove_from_inventory(inv_item['id'])
messages.append(create_combat_message(
"use_item", origin="player", item_name=item_name,
message=get_game_message('used_item_text', locale, name=item_name) + " (" + ", ".join(effects_applied) + ")"
))
last_action_text = f"{current_player['name']} used {item_name}!"
# End of turn swap
updates = {
'turn': 'defender' if is_attacker else 'attacker',
'turn_started_at': time.time(),
'last_action': f"{last_action_text}|{time.time()}"
}
await db.update_pvp_combat(pvp_combat['id'], updates)
"""
# Insert the 'skill' and 'use_item' branches directly in front of the 'flee'
# branch; the anchor is re-emitted after the inserted text so 'flee' itself
# is kept.
# str.replace() hands back the text unchanged when the anchor is absent, which
# would let the script go on to report a patch that never happened - so check
# first and stop before anything is written back.
# NOTE(review): str.replace() rewrites every occurrence of the anchor; confirm
# combat.py has exactly one such 'flee' branch, otherwise the PvP branches are
# inserted at each of them.
flee_anchor = " elif req.action == 'flee':"
if flee_anchor not in content:
    raise SystemExit(
        "update_pvp: anchor for the PvP skill/use_item branches not found in combat.py; "
        "aborting without writing"
    )
content = content.replace(flee_anchor, skill_and_item_code + "\n" + flee_anchor)
# TODO: re-indent the attack / flee logic so it sits under the
# `if not combat_over:` line that the status-effect patch inserts. This script
# does not do it; until it does, the patched combat.py has to be fixed up by
# hand (or the affected function rewritten in full).
# A line-by-line pass that used to live here was removed: it split `content`
# on the two-character sequence backslash + "n" rather than on newlines, only
# copied the pieces into a list, and that list was never used.
# Write the patched source back over combat.py. A failed write must not look
# like success: the error is reported through SystemExit (message on stderr,
# non-zero exit status) and the success line is printed only after the file
# has been written and closed.
try:
    with open('/opt/dockers/echoes_of_the_ashes/api/routers/combat.py', 'w', encoding='utf-8') as f:
        f.write(content)
except OSError as e:
    raise SystemExit(f"update_pvp: could not write combat.py: {e}") from e
else:
    print("Script ran. Patched combat.py.")