Files
echoes-of-the-ash/api/routers/combat.py

2820 lines
123 KiB
Python

"""
Combat router.
Auto-generated from main.py migration.
"""
from fastapi import APIRouter, HTTPException, Depends, status, Request
from fastapi.security import HTTPAuthorizationCredentials
from typing import Optional, Dict, Any
from datetime import datetime
import random
import json
import logging
from ..services.constants import PVP_TURN_TIMEOUT
from ..core.security import get_current_user, security, verify_internal_key
from ..services.models import *
from ..services.helpers import calculate_distance, calculate_stamina_cost, calculate_player_capacity, get_locale_string, create_combat_message, get_game_message
from .. import database as db
from ..items import ItemsManager
from .. import game_logic
from ..core.websockets import manager
from .equipment import reduce_armor_durability
logger = logging.getLogger(__name__)
# These will be injected by main.py
LOCATIONS = None
ITEMS_MANAGER = None
WORLD = None
redis_manager = None
QUESTS_DATA = {}
def init_router_dependencies(locations, items_manager, world, redis_mgr=None, quests_data=None):
"""Initialize router with game data dependencies"""
global LOCATIONS, ITEMS_MANAGER, WORLD, redis_manager, QUESTS_DATA
LOCATIONS = locations
ITEMS_MANAGER = items_manager
WORLD = world
redis_manager = redis_mgr
if quests_data:
QUESTS_DATA = quests_data
router = APIRouter(tags=["combat"])
# Endpoints
@router.get("/api/game/combat")
async def get_combat_status(current_user: dict = Depends(get_current_user)):
"""Get current combat status"""
combat = await db.get_active_combat(current_user['id'])
if not combat:
return {"in_combat": False}
# Load NPC data from npcs.json
import sys
sys.path.insert(0, '/app')
from data.npcs import NPCS
npc_def = NPCS.get(combat['npc_id'])
# Calculate time remaining for turn (server-side to avoid clock offset)
import time
turn_time_remaining = None
if combat['turn'] == 'player':
turn_started_at = combat.get('turn_started_at', 0)
time_elapsed = time.time() - turn_started_at
turn_time_remaining = max(0, 300 - time_elapsed) # 5 minutes = 300 seconds
return {
"in_combat": True,
"combat": {
"npc_id": combat['npc_id'],
"npc_name": npc_def.name if npc_def else combat['npc_id'].replace('_', ' ').title(),
"npc_hp": combat['npc_hp'],
"npc_max_hp": combat['npc_max_hp'],
"npc_image": f"{npc_def.image_path}" if npc_def else None,
"turn": combat['turn'],
"round": combat.get('round', 1),
"turn_time_remaining": turn_time_remaining
}
}
@router.post("/api/game/combat/initiate")
async def initiate_combat(
req: InitiateCombatRequest,
request: Request,
current_user: dict = Depends(get_current_user)
):
"""Start combat with a wandering enemy"""
import random
import sys
sys.path.insert(0, '/app')
from data.npcs import NPCS
# Extract locale from Accept-Language header
locale = request.headers.get('Accept-Language', 'en')
# Check if already in combat
existing_combat = await db.get_active_combat(current_user['id'])
if existing_combat:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Already in combat"
)
# Get enemy from wandering_enemies table
async with db.DatabaseSession() as session:
from sqlalchemy import select
stmt = select(db.wandering_enemies).where(db.wandering_enemies.c.id == req.enemy_id)
result = await session.execute(stmt)
enemy = result.fetchone()
if not enemy:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Enemy not found"
)
# Get NPC definition
npc_def = NPCS.get(enemy.npc_id)
if not npc_def:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="NPC definition not found"
)
# Randomize HP
npc_hp = random.randint(npc_def.hp_min, npc_def.hp_max)
# Create combat
combat = await db.create_combat(
player_id=current_user['id'],
npc_id=enemy.npc_id,
npc_hp=npc_hp,
npc_max_hp=npc_hp,
location_id=current_user['location_id'],
from_wandering=True
)
# Remove the wandering enemy from the location
async with db.DatabaseSession() as session:
from sqlalchemy import delete
stmt = delete(db.wandering_enemies).where(db.wandering_enemies.c.id == req.enemy_id)
await session.execute(stmt)
await session.commit()
# Track combat initiation
await db.update_player_statistics(current_user['id'], combats_initiated=1, increment=True)
# Get player info for broadcasts
player = current_user # current_user is already the character dict
# Send WebSocket update to the player
await manager.send_personal_message(current_user['id'], {
"type": "combat_started",
"data": {
"messages": [create_combat_message("combat_start", origin="neutral", npc_name=npc_def.name)],
"combat": {
"npc_id": enemy.npc_id,
"npc_name": npc_def.name,
"npc_hp": npc_hp,
"npc_max_hp": npc_hp,
"npc_image": f"{npc_def.image_path}",
"turn": "player",
"round": 1
}
},
"timestamp": datetime.utcnow().isoformat()
})
# Broadcast to location that player entered combat
await manager.send_to_location(
location_id=current_user['location_id'],
message={
"type": "location_update",
"data": {
"message": get_game_message('player_entered_combat', locale, player_name=player['name'], npc_name=get_locale_string(npc_def.name, locale)),
"action": "combat_started",
"player_id": player['id']
},
"timestamp": datetime.utcnow().isoformat()
},
exclude_player_id=current_user['id']
)
return {
"success": True,
"messages": [create_combat_message("combat_start", origin="neutral", npc_name=npc_def.name)],
"combat": {
"npc_id": enemy.npc_id,
"npc_name": npc_def.name,
"npc_hp": npc_hp,
"npc_max_hp": npc_hp,
"npc_image": f"{npc_def.image_path}",
"turn": "player",
"round": 1
}
}
@router.post("/api/game/combat/action")
async def combat_action(
req: CombatActionRequest,
request: Request,
current_user: dict = Depends(get_current_user)
):
"""Perform a combat action"""
import random
import sys
sys.path.insert(0, '/app')
from data.npcs import NPCS
# Extract locale from Accept-Language header
locale = request.headers.get('Accept-Language', 'en')
# Get active combat
combat = await db.get_active_combat(current_user['id'])
if not combat:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Not in combat"
)
if combat['turn'] != 'player':
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Not your turn"
)
# Prevent rapid-fire attacks: Check if enough time has passed since last action
# This prevents abuse by reloading the page to bypass the 2-second enemy turn delay
# Skip this check on the first turn (round 1) since player always starts
import time
current_round = combat.get('round', 1)
if current_round > 1: # Only check after first turn
turn_started_at = combat.get('turn_started_at', 0)
time_since_turn_start = time.time() - turn_started_at
# If the turn just started (less than 2 seconds ago), it means the enemy just attacked
# and we should wait for the animation to complete
if time_since_turn_start < 2.0:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Please wait for the enemy's turn to complete"
)
# Get player and NPC data
player = current_user # current_user is already the character dict
npc_def = NPCS.get(combat['npc_id'])
# Get player derived stats
from ..services.stats import calculate_derived_stats
stats = await calculate_derived_stats(player['id'], redis_manager)
messages = []
combat_over = False
# Process status effects (bleeding, etc.) before action
active_effects = await db.tick_player_effects(player['id'])
# Process status effects before action
if active_effects:
from ..game_logic import calculate_status_impact
total_impact = calculate_status_impact(active_effects)
if total_impact > 0:
# DAMAGE
damage = total_impact
new_hp = max(0, player['hp'] - damage)
await db.update_player_hp(player['id'], new_hp)
player['hp'] = new_hp # Update local reference
messages.append(create_combat_message(
"effect_damage",
origin="player",
damage=damage,
effect_name="status effects"
))
if new_hp <= 0:
# Player died from effects
await db.remove_non_persistent_effects(player['id'])
await db.end_combat(player['id'])
return {
"player": player,
"combat": None,
"messages": messages + [create_combat_message("died", origin="player", message="You died from status effects!")],
"active_effects": [],
"round": combat['round']
}
elif total_impact < 0:
# HEALING
heal = abs(total_impact)
new_hp = min(stats['max_hp'], player['hp'] + heal)
actual_heal = new_hp - player['hp']
if actual_heal > 0:
await db.update_player_hp(player['id'], new_hp)
player['hp'] = new_hp
messages.append(create_combat_message(
"effect_heal",
origin="player",
heal=actual_heal,
effect_name="status effects"
))
if req.action == 'attack':
# Calculate player damage using derived stats
base_damage = stats.get('attack_power', 5)
weapon_effects = {}
weapon_inv_id = None
# Check for equipped weapon to apply durability loss and effects
# (Attack power from the weapon is already included in stats['attack_power'])
equipment = await db.get_all_equipment(player['id'])
if equipment.get('weapon') and equipment['weapon']:
weapon_slot = equipment['weapon']
inv_item = await db.get_inventory_item_by_id(weapon_slot['item_id'])
if inv_item:
weapon_def = ITEMS_MANAGER.get_item(inv_item['item_id'])
if weapon_def:
weapon_effects = weapon_def.weapon_effects if hasattr(weapon_def, 'weapon_effects') else {}
weapon_inv_id = weapon_slot['item_id']
# Check encumbrance penalty (higher encumbrance = chance to miss)
encumbrance = player.get('encumbrance', 0)
attack_failed = False
if encumbrance > 0:
miss_chance = min(0.3, encumbrance * 0.05) # Max 30% miss chance
if random.random() < miss_chance:
attack_failed = True
variance = random.randint(-2, 2)
damage = max(1, base_damage + variance)
if attack_failed:
messages.append(create_combat_message(
"player_miss",
origin="player",
reason="encumbrance"
))
new_npc_hp = combat['npc_hp']
else:
# Check for critical hit
is_critical = False
crit_chance = stats.get('crit_chance', 0.05)
if random.random() < crit_chance:
is_critical = True
damage = int(damage * stats.get('crit_damage', 1.5))
# Apply NPC defense reduction
npc_defense = getattr(npc_def, 'defense', 0)
actual_damage = max(1, damage - npc_defense)
# Apply damage to NPC
new_npc_hp = max(0, combat['npc_hp'] - actual_damage)
if is_critical:
messages.append(create_combat_message(
"combat_crit",
origin="player"
))
messages.append(create_combat_message(
"player_attack",
origin="player",
damage=actual_damage,
armor_absorbed=npc_defense if npc_defense > 0 else 0
))
# Apply weapon effects
if weapon_effects and 'bleeding' in weapon_effects:
bleeding = weapon_effects['bleeding']
if random.random() < bleeding.get('chance', 0):
# Apply bleeding effect (would need combat effects table, for now just bonus damage)
bleed_damage = bleeding.get('damage', 0)
new_npc_hp = max(0, new_npc_hp - bleed_damage)
messages.append(create_combat_message(
"effect_bleeding",
origin="player",
damage=bleed_damage
))
# Decrease weapon durability (from unique_item)
if weapon_inv_id and inv_item.get('unique_item_id'):
new_durability = await db.decrease_unique_item_durability(inv_item['unique_item_id'], 1)
if new_durability is None:
# Weapon broke (unique_item was deleted, cascades to inventory)
messages.append(create_combat_message(
"weapon_broke",
origin="player",
item_name=weapon_def.name
))
await db.unequip_item(player['id'], 'weapon')
if new_npc_hp <= 0:
# NPC defeated
messages.append(create_combat_message(
"victory",
origin="neutral",
npc_name=npc_def.name
))
combat_over = True
player_won = True
# Award XP
xp_gained = npc_def.xp_reward
new_xp = player['xp'] + xp_gained
messages.append(create_combat_message(
"xp_gain",
origin="player",
amount=xp_gained
))
await db.update_player(player['id'], xp=new_xp)
# Track kill statistics
await db.update_player_statistics(player['id'], enemies_killed=1, damage_dealt=damage, increment=True)
# Check for level up
level_up_result = await game_logic.check_and_apply_level_up(player['id'])
if level_up_result['leveled_up']:
messages.append(create_combat_message(
"level_up",
origin="player",
level=level_up_result['new_level'],
stat_points=level_up_result['levels_gained']
))
# Create corpse with loot
import json
corpse_loot = npc_def.corpse_loot if hasattr(npc_def, 'corpse_loot') else []
# Convert CorpseLoot objects to dicts
corpse_loot_dicts = []
for loot in corpse_loot:
if hasattr(loot, '__dict__'):
corpse_loot_dicts.append({
'item_id': loot.item_id,
'quantity_min': loot.quantity_min,
'quantity_max': loot.quantity_max,
'required_tool': loot.required_tool
})
else:
corpse_loot_dicts.append(loot)
await db.create_npc_corpse(
npc_id=combat['npc_id'],
location_id=player['location_id'],
loot_remaining=json.dumps(corpse_loot_dicts)
)
# --- UPDATE QUEST PROGRESS ---
# --- UPDATE QUEST PROGRESS ---
try:
# Use global QUESTS_DATA injected dependency
if QUESTS_DATA:
active_quests = await db.get_character_quests(player['id'])
quest_updated = False
for q_record in active_quests:
if q_record['status'] != 'active':
continue
q_def = QUESTS_DATA.get(q_record['quest_id'])
if not q_def: continue
objectives = q_def.get('objectives', [])
current_progress = q_record.get('progress') or {}
new_progress = current_progress.copy()
progres_changed = False
for obj in objectives:
if obj['type'] == 'kill_count' and obj['target'] == combat['npc_id']:
current_count = current_progress.get(obj['target'], 0)
if current_count < obj['count']:
new_progress[obj['target']] = current_count + 1
progres_changed = True
if progres_changed:
# Check completion
all_done = True
progress_str = ""
for obj in objectives:
target = obj['target']
req_count = obj['count']
curr = new_progress.get(target, 0)
# Simple check (ignoring items for kill quests for now)
if obj['type'] == 'kill_count':
if curr < req_count:
all_done = False
# Capture progress string for the notification (if this was the target updated)
if target == combat['npc_id']:
progress_str = f" ({curr}/{req_count})"
elif obj['type'] == 'item_delivery':
pass
await db.update_quest_progress(player['id'], q_record['quest_id'], new_progress, 'active')
# Notify user
messages.append(create_combat_message(
"quest_update",
origin="system",
message=f"{get_locale_string(q_def['title'], locale)}{progress_str}"
))
quest_updated = True
# Add to quest updates list to return to client
# Filter/Enrich for frontend
updated_q_data = dict(q_record)
updated_q_data['start_at'] = q_record['started_at']
updated_q_data.update(q_def)
if 'quest_updates' not in locals(): quest_updates = []
quest_updates.append(updated_q_data)
except Exception as e:
logger.error(f"Failed to update quest progress: {e}")
# -----------------------------
# -----------------------------
await db.remove_non_persistent_effects(player['id'])
await db.end_combat(player['id'])
# Update Redis: Delete combat state cache
if redis_manager:
await redis_manager.delete_combat_state(player['id'])
# Update player session
await redis_manager.update_player_session_field(player['id'], 'xp', new_xp)
if level_up_result['leveled_up']:
await redis_manager.update_player_session_field(player['id'], 'level', level_up_result['new_level'])
# Broadcast to location that combat ended and corpse appeared
await manager.send_to_location(
location_id=player['location_id'],
message={
"type": "location_update",
"data": {
"message": get_game_message('player_defeated_enemy_broadcast', locale, player_name=player['name'], npc_name=get_locale_string(npc_def.name, locale)),
"action": "combat_ended",
"player_id": player['id'],
"corpse_created": True
},
"timestamp": datetime.utcnow().isoformat()
},
exclude_player_id=player['id']
)
else:
# NPC's turn - use shared logic
npc_attack_messages, player_defeated = await game_logic.npc_attack(
player['id'],
{'npc_hp': new_npc_hp, 'npc_max_hp': combat['npc_max_hp']},
npc_def,
reduce_armor_durability
)
messages.extend(npc_attack_messages)
if player_defeated:
combat_over = True
else:
# Update NPC HP (combat turn already updated by npc_attack)
await db.update_combat(player['id'], {
'npc_hp': new_npc_hp
})
elif req.action == 'skill':
if not req.item_id:
raise HTTPException(status_code=400, detail="skill_id (passed as item_id) required")
from ..services.skills import skills_manager
skill_id = req.item_id
skill = skills_manager.get_skill(skill_id)
if not skill:
raise HTTPException(status_code=404, detail="Skill not found")
# Check unlocked
stat_val = current_player.get(skill.stat_requirement, 0)
if stat_val < skill.stat_threshold or current_player['level'] < skill.level_requirement:
raise HTTPException(status_code=400, detail="Skill not unlocked")
# Check cooldown
active_effects = await db.get_player_effects(current_player['id'])
cd_source = f"cd:{skill.id}"
for eff in active_effects:
if eff.get('source') == cd_source and eff.get('ticks_remaining', 0) > 0:
raise HTTPException(status_code=400, detail=f"Skill on cooldown ({eff['ticks_remaining']} turns)")
# Check stamina
if current_player['stamina'] < skill.stamina_cost:
raise HTTPException(status_code=400, detail="Not enough stamina")
# Deduct stamina
new_stamina = current_player['stamina'] - skill.stamina_cost
await db.update_player_stamina(current_player['id'], new_stamina)
current_player['stamina'] = new_stamina
# Add cooldown effect
if skill.cooldown > 0:
await db.add_effect(
player_id=current_player['id'],
effect_name=f"{skill.id}_cooldown",
effect_icon="",
effect_type="cooldown",
value=0,
ticks_remaining=skill.cooldown,
persist_after_combat=False,
source=cd_source
)
# Get weapon info
equipment = await db.get_all_equipment(current_player['id'])
weapon_damage = 0
weapon_inv_id = None
inv_item = None
weapon_def = None
if equipment.get('weapon') and equipment['weapon']:
weapon_slot = equipment['weapon']
inv_item = await db.get_inventory_item_by_id(weapon_slot['item_id'])
if inv_item:
weapon_def = ITEMS_MANAGER.get_item(inv_item['item_id'])
if weapon_def and weapon_def.stats:
weapon_damage = random.randint(
weapon_def.stats.get('damage_min', 0),
weapon_def.stats.get('damage_max', 0)
)
weapon_inv_id = inv_item['id']
effects = skill.effects
new_opponent_hp = opponent['hp']
damage_done = 0
actual_damage = 0
armor_absorbed = 0
# Damage skills
if 'damage_multiplier' in effects:
base_damage = 5
strength_bonus = int(current_player['strength'] * 1.5)
level_bonus = current_player['level']
variance = random.randint(-2, 2)
raw_damage = max(1, base_damage + strength_bonus + level_bonus + weapon_damage + variance)
multiplier = effects['damage_multiplier']
if 'execute_threshold' in effects:
opponent_hp_pct = opponent['hp'] / opponent['max_hp'] if opponent['max_hp'] > 0 else 1
if opponent_hp_pct <= effects['execute_threshold']:
multiplier = effects.get('execute_multiplier', multiplier)
damage = max(1, int(raw_damage * multiplier))
if effects.get('guaranteed_crit'):
damage = int(damage * 1.5)
num_hits = effects.get('hits', 1)
for hit in range(num_hits):
hit_dmg = damage if hit == 0 else max(1, int(raw_damage * multiplier))
absorbed, broken_armor = await reduce_armor_durability(opponent['id'], hit_dmg)
armor_absorbed += absorbed
for broken in broken_armor:
messages.append(create_combat_message(
"item_broken",
origin="enemy",
item_name=broken['name'],
emoji=broken['emoji']
))
last_action_text += f"\n💔 {opponent['name']}'s {broken['emoji']} {broken['name']} broke!"
actual_hit = max(1, hit_dmg - absorbed)
damage_done += actual_hit
new_opponent_hp = max(0, new_opponent_hp - actual_hit)
actual_damage = damage_done
messages.append(create_combat_message(
"skill_attack",
origin="player",
damage=damage_done,
skill_name=skill.name,
skill_icon=skill.icon,
hits=num_hits
))
last_action_text = f"{current_player['name']} used {skill.name} on {opponent['name']} for {damage_done} damage! (Armor absorbed {armor_absorbed})"
# Lifesteal
if 'lifesteal' in effects:
heal_amount = int(damage_done * effects['lifesteal'])
new_hp = min(current_player['max_hp'], current_player['hp'] + heal_amount)
if new_hp > current_player['hp']:
await db.update_player(current_player['id'], hp=new_hp)
current_player['hp'] = new_hp
messages.append(create_combat_message(
"skill_heal", origin="player", heal=heal_amount, skill_icon="🩸"
))
# Poison DoT
if 'poison_damage' in effects:
await db.add_effect(
player_id=opponent['id'],
effect_name="Poison",
effect_icon="🧪",
effect_type="damage",
damage_per_tick=effects['poison_damage'],
ticks_remaining=effects['poison_duration'],
persist_after_combat=True,
source=f"skill_poison:{skill.id}"
)
messages.append(create_combat_message(
"skill_effect", origin="player", message=f"🧪 Poisoned! ({effects['poison_damage']} dmg/turn)"
))
# Stun chance
if 'stun_chance' in effects and random.random() < effects['stun_chance']:
# Stun in PvP can be modeled as taking away a turn
await db.add_effect(
player_id=opponent['id'],
effect_name="Stunned",
effect_icon="💫",
effect_type="debuff",
ticks_remaining=1,
persist_after_combat=False,
source="skill_stun"
)
messages.append(create_combat_message(
"skill_effect", origin="player", message="💫 Stunned! (Currently skip effect)"
))
# Weapon durability
if weapon_inv_id and inv_item and inv_item.get('unique_item_id'):
new_durability = await db.decrease_unique_item_durability(inv_item['unique_item_id'], 1)
if new_durability is None:
messages.append(create_combat_message("weapon_broke", origin="player", item_name=weapon_def.name if weapon_def else "weapon"))
await db.unequip_item(current_player['id'], 'weapon')
# Heal skills
if 'heal_percent' in effects:
heal_amount = int(current_player['max_hp'] * effects['heal_percent'])
new_hp = min(current_player['max_hp'], current_player['hp'] + heal_amount)
actual_heal = new_hp - current_player['hp']
if actual_heal > 0:
await db.update_player(current_player['id'], hp=new_hp)
current_player['hp'] = new_hp
messages.append(create_combat_message(
"skill_heal", origin="player", heal=actual_heal, skill_icon=skill.icon
))
last_action_text = f"{current_player['name']} used {skill.name} and healed for {actual_heal} HP!"
# Fortify
if 'armor_boost' in effects:
await db.add_effect(
player_id=current_player['id'],
effect_name="Fortify",
effect_icon="🛡️",
effect_type="buff",
value=effects['armor_boost'],
ticks_remaining=effects['duration'],
persist_after_combat=False,
source=f"skill_fortify:{skill.id}"
)
messages.append(create_combat_message(
"skill_effect", origin="player", message=f"🛡️ Fortified! (+{effects['armor_boost']} Armor)"
))
last_action_text = f"{current_player['name']} used {skill.name} and bolstered their defenses!"
# Process opponent HP if damage done
if damage_done > 0:
await db.update_player(opponent['id'], hp=new_opponent_hp)
if new_opponent_hp <= 0:
last_action_text += f"\n🏆 {current_player['name']} has defeated {opponent['name']}!"
messages.append(create_combat_message("victory", origin="neutral", npc_name=opponent['name']))
combat_over = True
winner_id = current_player['id']
await db.update_player(opponent['id'], hp=0, is_dead=True)
# Create corpse
import json
import time as time_module
inventory = await db.get_inventory(opponent['id'])
inventory_items = []
for inv_item in inventory:
item_def = ITEMS_MANAGER.get_item(inv_item['item_id'])
inventory_items.append({
'item_id': inv_item['item_id'],
'name': item_def.name if item_def else inv_item['item_id'],
'emoji': item_def.emoji if (item_def and hasattr(item_def, 'emoji')) else '📦',
'quantity': inv_item['quantity'],
'durability': inv_item.get('durability'),
'max_durability': inv_item.get('max_durability'),
'tier': inv_item.get('tier')
})
corpse_data = None
if inventory_items:
corpse_id = await db.create_player_corpse(
player_name=opponent['name'],
location_id=opponent['location_id'],
items=json.dumps([{'item_id': i['item_id'], 'quantity': i['quantity']} for i in inventory])
)
await db.clear_inventory(opponent['id'])
corpse_data = {
"id": f"player_{corpse_id}",
"type": "player",
"name": f"{opponent['name']}'s Corpse",
"emoji": "⚰️",
"player_name": opponent['name'],
"loot_count": len(inventory_items),
"items": inventory_items,
"timestamp": time_module.time()
}
# Update statistics
await db.update_player_statistics(opponent['id'], pvp_deaths=1, pvp_combats_lost=1, pvp_damage_taken=actual_damage, pvp_attacks_received=1, increment=True)
await db.update_player_statistics(current_player['id'], players_killed=1, pvp_combats_won=1, pvp_damage_dealt=actual_damage, pvp_attacks_landed=1, increment=True)
# Broadcast corpse
broadcast_data = {
"message": get_game_message('pvp_defeat_broadcast', locale, opponent=opponent['name'], winner=current_player['name']),
"action": "player_died",
"player_id": opponent['id']
}
if corpse_data:
broadcast_data["corpse"] = corpse_data
await manager.send_to_location(
location_id=opponent['location_id'],
message={
"type": "location_update",
"data": broadcast_data,
"timestamp": datetime.utcnow().isoformat()
}
)
await db.end_pvp_combat(pvp_combat['id'])
else:
await db.update_player_statistics(current_player['id'], pvp_damage_dealt=actual_damage, pvp_attacks_landed=1, increment=True)
await db.update_player_statistics(opponent['id'], pvp_damage_taken=actual_damage, pvp_attacks_received=1, increment=True)
# End of turn swap
updates = {
'turn': 'defender' if is_attacker else 'attacker',
'turn_started_at': time.time(),
'last_action': f"{last_action_text}|{time.time()}"
}
await db.update_pvp_combat(pvp_combat['id'], updates)
else:
# Skill didn't do damage, but turn still ends
updates = {
'turn': 'defender' if is_attacker else 'attacker',
'turn_started_at': time.time(),
'last_action': f"{last_action_text}|{time.time()}"
}
await db.update_pvp_combat(pvp_combat['id'], updates)
elif req.action == 'use_item':
if not req.item_id:
raise HTTPException(status_code=400, detail="item_id required for use_item action")
player_inventory = await db.get_inventory(current_player['id'])
inv_item = next((item for item in player_inventory if item['item_id'] == req.item_id), None)
if not inv_item:
raise HTTPException(status_code=400, detail="Item not found in inventory")
item_def = ITEMS_MANAGER.get_item(req.item_id)
if not item_def or not item_def.combat_usable:
raise HTTPException(status_code=400, detail="This item cannot be used in combat")
item_name = get_locale_string(item_def.name, locale)
effects_applied = []
if item_def.effects.get('status_effect'):
status_data = item_def.effects['status_effect']
await db.add_effect(
player_id=current_player['id'],
effect_name=status_data['name'],
effect_icon=status_data.get('icon', ''),
effect_type=status_data.get('type', 'buff'),
damage_per_tick=status_data.get('damage_per_tick', 0),
value=status_data.get('value', 0),
ticks_remaining=status_data.get('ticks', 3),
persist_after_combat=True,
source=f"item:{item_def.id}"
)
effects_applied.append(f"Applied {status_data['name']}")
if item_def.effects.get('cures'):
for cure_effect in item_def.effects['cures']:
if await db.remove_effect(current_player['id'], cure_effect):
effects_applied.append(f"Cured {cure_effect}")
if item_def.effects.get('hp_restore') and item_def.effects['hp_restore'] > 0:
item_effectiveness = current_player_stats.get('item_effectiveness', 1.0)
restore_amount = int(item_def.effects['hp_restore'] * item_effectiveness)
new_hp = min(current_player['max_hp'], current_player['hp'] + restore_amount)
actual_heal = new_hp - current_player['hp']
if actual_heal > 0:
await db.update_player(current_player['id'], hp=new_hp)
current_player['hp'] = new_hp
effects_applied.append(get_game_message('healed_amount_text', locale, amount=actual_heal))
messages.append(create_combat_message("item_heal", origin="player", heal=actual_heal))
if item_def.effects.get('stamina_restore') and item_def.effects['stamina_restore'] > 0:
item_effectiveness = current_player_stats.get('item_effectiveness', 1.0)
restore_amount = int(item_def.effects['stamina_restore'] * item_effectiveness)
new_stamina = min(current_player['max_stamina'], current_player['stamina'] + restore_amount)
actual_restore = new_stamina - current_player['stamina']
if actual_restore > 0:
await db.update_player_stamina(current_player['id'], new_stamina)
effects_applied.append(f"Restored {actual_restore} stamina")
messages.append(create_combat_message("item_restore", origin="player", stat="stamina", amount=actual_restore))
if inv_item['quantity'] > 1:
await db.update_inventory_quantity(inv_item['id'], inv_item['quantity'] - 1)
else:
await db.remove_from_inventory(inv_item['id'])
messages.append(create_combat_message(
"use_item", origin="player", item_name=item_name,
message=get_game_message('used_item_text', locale, name=item_name) + " (" + ", ".join(effects_applied) + ")"
))
last_action_text = f"{current_player['name']} used {item_name}!"
# End of turn swap
updates = {
'turn': 'defender' if is_attacker else 'attacker',
'turn_started_at': time.time(),
'last_action': f"{last_action_text}|{time.time()}"
}
await db.update_pvp_combat(pvp_combat['id'], updates)
elif req.action == 'flee':
# 50% chance to flee
if random.random() < 0.5:
messages.append(create_combat_message(
"flee_success",
origin="player",
message=get_game_message('flee_success_text', locale, name=player['name'])
))
combat_over = True
player_won = False # Fled, not won
# Track successful flee
await db.update_player_statistics(player['id'], successful_flees=1, increment=True)
# Respawn the enemy back to the location if it came from wandering
if combat.get('from_wandering_enemy'):
# Respawn enemy with current HP at the combat location
import time
despawn_time = time.time() + 300 # 5 minutes
async with db.DatabaseSession() as session:
from sqlalchemy import insert
stmt = insert(db.wandering_enemies).values(
npc_id=combat['npc_id'],
location_id=combat['location_id'],
spawn_timestamp=time.time(),
despawn_timestamp=despawn_time
)
await session.execute(stmt)
await session.commit()
await db.remove_non_persistent_effects(player['id'])
await db.end_combat(player['id'])
# Broadcast to location that player fled from combat
await manager.send_to_location(
location_id=combat['location_id'],
message={
"type": "location_update",
"data": {
"message": get_game_message('player_fled_broadcast', locale, player_name=player['name']),
"action": "combat_fled",
"player_id": player['id']
},
"timestamp": datetime.utcnow().isoformat()
},
exclude_player_id=player['id']
)
else:
# Failed to flee, NPC attacks
npc_damage = random.randint(npc_def.damage_min, npc_def.damage_max)
new_player_hp = max(0, player['hp'] - npc_damage)
messages.append(create_combat_message(
"flee_fail",
origin="enemy",
npc_name=npc_def.name,
damage=npc_damage,
message=get_game_message('flee_fail_text', locale, name=player['name'])
))
if new_player_hp <= 0:
messages.append(create_combat_message(
"player_defeated",
origin="neutral",
npc_name=npc_def.name
))
combat_over = True
await db.update_player(player['id'], hp=0, is_dead=True)
await db.update_player_statistics(player['id'], deaths=1, failed_flees=1, damage_taken=npc_damage, increment=True)
# Create corpse with player's inventory
import json
import time as time_module
inventory = await db.get_inventory(player['id'])
inventory_items = []
for inv_item in inventory:
item_def = ITEMS_MANAGER.get_item(inv_item['item_id'])
inventory_items.append({
'item_id': inv_item['item_id'],
'name': item_def.name if item_def else inv_item['item_id'],
'emoji': item_def.emoji if (item_def and hasattr(item_def, 'emoji')) else '📦',
'quantity': inv_item['quantity'],
'durability': inv_item.get('durability'),
'max_durability': inv_item.get('max_durability'),
'tier': inv_item.get('tier')
})
# Only create corpse if player has items
corpse_data = None
if inventory_items:
logger.info(f"Creating player corpse (failed flee) for {player['name']} at {combat['location_id']} with {len(inventory_items)} items")
corpse_id = await db.create_player_corpse(
player_name=player['name'],
location_id=combat['location_id'],
items=json.dumps([{'item_id': i['item_id'], 'quantity': i['quantity']} for i in inventory])
)
logger.info(f"Successfully created player corpse (failed flee): ID={corpse_id}, player={player['name']}, location={combat['location_id']}, items_count={len(inventory_items)}")
# Clear player's inventory (items are now in corpse)
await db.clear_inventory(player['id'])
# Build corpse data for broadcast
corpse_data = {
"id": f"player_{corpse_id}",
"type": "player",
"name": f"{player['name']}'s Corpse",
"emoji": "⚰️",
"player_name": player['name'],
"loot_count": len(inventory_items),
"items": inventory_items,
"timestamp": time_module.time()
}
else:
logger.info(f"Player {player['name']} died (failed flee) with no items, skipping corpse creation")
# Respawn enemy if from wandering
if combat.get('from_wandering_enemy'):
import time
despawn_time = time.time() + 300
async with db.DatabaseSession() as session:
from sqlalchemy import insert
stmt = insert(db.wandering_enemies).values(
npc_id=combat['npc_id'],
location_id=combat['location_id'],
spawn_timestamp=time.time(),
despawn_timestamp=despawn_time
)
await session.execute(stmt)
await session.commit()
await db.remove_non_persistent_effects(player['id'])
await db.end_combat(player['id'])
# Broadcast to location that player died (and corpse if created)
logger.info(f"Broadcasting player_died (failed flee) to location {combat['location_id']} for player {player['name']}")
broadcast_data = {
"message": get_game_message('player_defeated_broadcast', locale, player_name=player['name']),
"action": "player_died",
"player_id": player['id']
}
if corpse_data:
broadcast_data["corpse"] = corpse_data
await manager.send_to_location(
location_id=combat['location_id'],
message={
"type": "location_update",
"data": broadcast_data,
"timestamp": datetime.utcnow().isoformat()
},
exclude_player_id=player['id']
)
else:
# Player survived, update HP and turn back to player
await db.update_player(player['id'], hp=new_player_hp)
await db.update_player_statistics(player['id'], failed_flees=1, damage_taken=npc_damage, increment=True)
await db.update_combat(player['id'], {'turn': 'player', 'turn_started_at': time.time()})
elif req.action == 'skill':
# ── SKILL ACTION ──
if not req.skill_id:
raise HTTPException(status_code=400, detail="skill_id required for skill action")
from ..services.skills import skills_manager
skill = skills_manager.get_skill(req.skill_id)
if not skill:
raise HTTPException(status_code=404, detail="Skill not found")
# Check unlocked
stat_val = player.get(skill.stat_requirement, 0)
if stat_val < skill.stat_threshold or player['level'] < skill.level_requirement:
raise HTTPException(status_code=400, detail="Skill not unlocked")
# Check cooldown
active_effects = await db.get_player_effects(player['id'])
cd_source = f"cd:{skill.id}"
for eff in active_effects:
if eff.get('source') == cd_source and eff.get('ticks_remaining', 0) > 0:
raise HTTPException(status_code=400, detail=f"Skill on cooldown ({eff['ticks_remaining']} turns)")
# Check stamina
if player['stamina'] < skill.stamina_cost:
raise HTTPException(status_code=400, detail="Not enough stamina")
# Deduct stamina
new_stamina = player['stamina'] - skill.stamina_cost
await db.update_player_stamina(player['id'], new_stamina)
player['stamina'] = new_stamina
# Add cooldown effect
if skill.cooldown > 0:
await db.add_effect(
player_id=player['id'],
effect_name=f"{skill.id}_cooldown",
effect_icon="",
effect_type="cooldown",
value=0,
ticks_remaining=skill.cooldown,
persist_after_combat=False,
source=cd_source
)
# Get weapon info
equipment = await db.get_all_equipment(player['id'])
weapon_damage = 0
inv_item = None
weapon_inv_id = None
weapon_def = None
if equipment.get('weapon') and equipment['weapon']:
weapon_slot = equipment['weapon']
inv_item = await db.get_inventory_item_by_id(weapon_slot['item_id'])
if inv_item:
weapon_def = ITEMS_MANAGER.get_item(inv_item['item_id'])
if weapon_def and weapon_def.stats:
weapon_damage = random.randint(
weapon_def.stats.get('damage_min', 0),
weapon_def.stats.get('damage_max', 0)
)
weapon_inv_id = inv_item['id']
effects = skill.effects
new_npc_hp = combat['npc_hp']
combat_over = False
player_won = False
# ── Damage skills ──
if 'damage_multiplier' in effects:
base_damage = 5
strength_bonus = int(player['strength'] * 1.5)
level_bonus = player['level']
variance = random.randint(-2, 2)
raw_damage = max(1, base_damage + strength_bonus + level_bonus + weapon_damage + variance)
multiplier = effects['damage_multiplier']
# Execute check
if 'execute_threshold' in effects:
npc_hp_pct = combat['npc_hp'] / combat['npc_max_hp'] if combat['npc_max_hp'] > 0 else 1
if npc_hp_pct <= effects['execute_threshold']:
multiplier = effects.get('execute_multiplier', multiplier)
# Exploit Weakness check
if effects.get('requires_analyzed'):
# Check if NPC has been analyzed this combat
analyzed = combat.get('npc_status_effects', '') or ''
if 'analyzed' not in analyzed:
multiplier = 1.0 # No bonus if not analyzed
damage = max(1, int(raw_damage * multiplier))
# Guaranteed crit
if effects.get('guaranteed_crit'):
damage = int(damage * 1.5)
# Multi-hit
num_hits = effects.get('hits', 1)
total_damage = 0
for hit in range(num_hits):
hit_dmg = damage if hit == 0 else max(1, int(raw_damage * multiplier))
# Armor penetration
npc_defense = getattr(npc_def, 'defense', 0)
if 'armor_penetration' in effects:
npc_defense = int(npc_defense * (1 - effects['armor_penetration']))
actual_hit = max(1, hit_dmg - npc_defense)
total_damage += actual_hit
new_npc_hp = max(0, new_npc_hp - actual_hit)
messages.append(create_combat_message(
"skill_attack",
origin="player",
damage=total_damage,
skill_name=skill.name,
skill_icon=skill.icon,
hits=num_hits
))
# Lifesteal
if 'lifesteal' in effects:
heal_amount = int(total_damage * effects['lifesteal'])
new_hp = min(player['max_hp'], player['hp'] + heal_amount)
if new_hp > player['hp']:
await db.update_player_hp(player['id'], new_hp)
player['hp'] = new_hp
messages.append(create_combat_message(
"skill_heal", origin="player", heal=heal_amount, skill_icon="🩸"
))
# Poison DoT
if 'poison_damage' in effects:
poison_str = f"poison:{effects['poison_damage']}:{effects['poison_duration']}"
existing = combat.get('npc_status_effects', '') or ''
if existing:
existing += '|' + poison_str
else:
existing = poison_str
await db.update_combat(player['id'], {'npc_status_effects': existing})
messages.append(create_combat_message(
"skill_effect", origin="player", message=f"🧪 Poisoned! ({effects['poison_damage']} dmg/turn)"
))
# Stun chance
if 'stun_chance' in effects and random.random() < effects['stun_chance']:
messages.append(create_combat_message(
"skill_effect", origin="player", message="💫 Stunned!"
))
# Weapon durability
if weapon_inv_id and inv_item and inv_item.get('unique_item_id'):
new_durability = await db.decrease_unique_item_durability(inv_item['unique_item_id'], 1)
if new_durability is None:
messages.append(create_combat_message("weapon_broke", origin="player", item_name=weapon_def.name if weapon_def else "weapon"))
await db.unequip_item(player['id'], 'weapon')
# ── Heal skills ──
if 'heal_percent' in effects:
heal_amount = int(player['max_hp'] * effects['heal_percent'])
new_hp = min(player['max_hp'], player['hp'] + heal_amount)
actual = new_hp - player['hp']
if actual > 0:
await db.update_player_hp(player['id'], new_hp)
player['hp'] = new_hp
messages.append(create_combat_message(
"skill_heal", origin="player", heal=actual, skill_name=skill.name, skill_icon=skill.icon
))
# ── Stamina restore skills ──
if 'stamina_restore_percent' in effects:
restore = int(player['max_stamina'] * effects['stamina_restore_percent'])
new_stam = min(player['max_stamina'], player['stamina'] + restore)
actual = new_stam - player['stamina']
if actual > 0:
await db.update_player_stamina(player['id'], new_stam)
player['stamina'] = new_stam
messages.append(create_combat_message(
"skill_effect", origin="player", message=f"⚡ +{actual} Stamina"
))
# ── Buff skills ──
if 'buff' in effects:
buff_name_raw = effects['buff']
duration = effects.get('buff_duration', 2)
value = 0
if 'damage_reduction' in effects:
value = int(effects['damage_reduction'] * 100)
elif 'damage_bonus' in effects:
value = int(effects['damage_bonus'] * 100)
await db.add_effect(
player_id=player['id'],
effect_name=buff_name_raw,
effect_icon=skill.icon,
effect_type='buff',
value=value,
ticks_remaining=duration,
persist_after_combat=False,
source=f'skill:{skill.id}'
)
messages.append(create_combat_message(
"skill_buff", origin="player",
skill_name=skill.name, skill_icon=skill.icon, duration=duration
))
# ── Analyze skill ──
if effects.get('mark_analyzed'):
existing = combat.get('npc_status_effects', '') or ''
if 'analyzed' not in existing:
if existing:
existing += '|analyzed:0:99'
else:
existing = 'analyzed:0:99'
await db.update_combat(player['id'], {'npc_status_effects': existing})
npc_hp_pct = int((combat['npc_hp'] / combat['npc_max_hp']) * 100) if combat['npc_max_hp'] > 0 else 0
intent = combat.get('npc_intent', 'attack')
messages.append(create_combat_message(
"skill_analyze", origin="player",
skill_icon=skill.icon,
npc_name=npc_def.name,
npc_hp_pct=npc_hp_pct,
npc_intent=intent
))
# Check NPC death
if new_npc_hp <= 0:
messages.append(create_combat_message("victory", origin="neutral", npc_name=npc_def.name))
combat_over = True
player_won = True
# Award XP
xp_reward = npc_def.xp_reward
current_xp = player['xp'] + xp_reward
await db.update_player(player['id'], xp=current_xp)
player['xp'] = current_xp
messages.append(create_combat_message("xp_gained", origin="neutral", xp=xp_reward))
await db.update_player_statistics(player['id'], enemies_killed=1, increment=True)
# Level up check
level_result = await game_logic.check_and_apply_level_up(player['id'])
if level_result['leveled_up']:
messages.append(create_combat_message("level_up", origin="neutral", new_level=level_result['new_level']))
# Loot
loot_items = npc_def.loot if hasattr(npc_def, 'loot') else []
generated_loot = []
if loot_items:
for loot in loot_items:
if random.random() < loot.get('chance', 1.0):
qty = random.randint(loot.get('min', 1), loot.get('max', 1))
# Only append message in combat log, actual items are in corpse
messages.append(create_combat_message("loot", origin="neutral", item_id=loot['item_id'], quantity=qty))
generated_loot.append({"item_id": loot['item_id'], "quantity": qty})
# Create corpse
import json
# Convert CorpseLoot objects to dicts
corpse_loot = npc_def.corpse_loot if hasattr(npc_def, 'corpse_loot') else []
corpse_loot_dicts = []
for loot in corpse_loot:
if hasattr(loot, '__dict__'):
corpse_loot_dicts.append({
'item_id': loot.item_id,
'quantity_min': loot.quantity_min,
'quantity_max': loot.quantity_max,
'required_tool': loot.required_tool
})
else:
corpse_loot_dicts.append(loot)
await db.create_npc_corpse(
combat['npc_id'],
combat.get('location_id', player.get('location_id', '')),
json.dumps(corpse_loot_dicts)
)
# Update quests
try:
if QUESTS_DATA:
active_quests = await db.get_character_quests(player['id'])
for q_record in active_quests:
if q_record['status'] != 'active':
continue
q_def = QUESTS_DATA.get(q_record['quest_id'])
if not q_def: continue
objectives = q_def.get('objectives', [])
current_progress = q_record.get('progress') or {}
new_progress = current_progress.copy()
progres_changed = False
for obj in objectives:
if obj['type'] == 'kill_count' and obj['target'] == combat['npc_id']:
current_count = current_progress.get(obj['target'], 0)
if current_count < obj['count']:
new_progress[obj['target']] = current_count + 1
progres_changed = True
if progres_changed:
progress_str = ""
for obj in objectives:
target = obj['target']
req_count = obj['count']
curr = new_progress.get(target, 0)
if obj['type'] == 'kill_count':
if target == combat['npc_id']:
progress_str = f" ({curr}/{req_count})"
await db.update_quest_progress(player['id'], q_record['quest_id'], new_progress, 'active')
messages.append(create_combat_message(
"quest_update",
origin="system",
message=f"{get_locale_string(q_def['title'], locale)}{progress_str}"
))
except Exception as e:
logger.error(f"Failed to update quest progress in skill execution: {e}")
await db.remove_non_persistent_effects(player['id'])
await db.end_combat(player['id'])
# Update Redis cache
if redis_manager:
await redis_manager.delete_combat_state(player['id'])
await redis_manager.update_player_session_field(player['id'], 'xp', current_xp)
else:
# NPC turn for skill usage
from ..services.stats import calculate_derived_stats
stats = await calculate_derived_stats(player['id'], redis_manager)
npc_attack_messages, player_defeated = await game_logic.npc_attack(
player['id'],
{'npc_hp': new_npc_hp, 'npc_max_hp': combat['npc_max_hp'],
'npc_intent': combat.get('npc_intent', 'attack'),
'npc_status_effects': combat.get('npc_status_effects', '')},
npc_def,
reduce_armor_durability,
player_stats=stats
)
messages.extend(npc_attack_messages)
if player_defeated:
await db.remove_non_persistent_effects(player['id'])
combat_over = True
elif req.action == 'use_item':
combat_over = False
# Validate item_id provided
if not req.item_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="item_id required for use_item action"
)
# Get the item from inventory
player_inventory = await db.get_inventory(player['id'])
inv_item = None
for item in player_inventory:
if item['item_id'] == req.item_id:
inv_item = item
break
if not inv_item:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Item not found in inventory"
)
# Get item definition
item_def = ITEMS_MANAGER.get_item(req.item_id)
if not item_def:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Unknown item"
)
# Check if item is combat usable
if not item_def.combat_usable:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="This item cannot be used in combat"
)
# Apply item effects
item_name = get_locale_string(item_def.name, locale)
effects_applied = []
# 1. Apply Status Effects (e.g. Regeneration from Bandage)
if item_def.effects.get('status_effect'):
status_data = item_def.effects['status_effect']
status_name = status_data['name']
await db.add_effect(
player_id=player['id'],
effect_name=status_name,
effect_icon=status_data.get('icon', ''),
effect_type=status_data.get('type', 'buff'),
damage_per_tick=status_data.get('damage_per_tick', 0),
value=status_data.get('value', 0),
ticks_remaining=status_data.get('ticks', 3),
persist_after_combat=True, # Consumable effects usually persist
source=f"item:{item_def.id}"
)
effects_applied.append(f"Applied {status_data['name']}")
# 2. Cure Status Effects
if item_def.effects.get('cures'):
cures = item_def.effects['cures']
for cure_effect in cures:
if await db.remove_effect(player['id'], cure_effect):
effects_applied.append(f"Cured {cure_effect}")
# 3. Handle Direct healing (legacy/instant)
if item_def.effects.get('hp_restore') and item_def.effects['hp_restore'] > 0:
item_effectiveness = stats.get('item_effectiveness', 1.0)
base_hp_restore = item_def.effects['hp_restore']
hp_restore = int(base_hp_restore * item_effectiveness)
old_hp = player['hp']
new_hp = min(player.get('max_hp', 100), old_hp + hp_restore)
actual_restored = new_hp - old_hp
if actual_restored > 0:
await db.update_player_hp(player['id'], new_hp)
effects_applied.append(f"+{actual_restored} HP")
if item_def.effects.get('stamina_restore') and item_def.effects['stamina_restore'] > 0:
item_effectiveness = stats.get('item_effectiveness', 1.0)
base_stamina_restore = item_def.effects['stamina_restore']
stamina_restore = int(base_stamina_restore * item_effectiveness)
old_stamina = player['stamina']
new_stamina = min(player.get('max_stamina', 100), old_stamina + stamina_restore)
actual_restored = new_stamina - old_stamina
if actual_restored > 0:
await db.update_player_stamina(player['id'], new_stamina)
effects_applied.append(f"+{actual_restored} Stamina")
# Handle combat effects (throwables)
combat_effects = item_def.combat_effects or {}
# Direct damage from throwable
if combat_effects.get('damage_min') and combat_effects.get('damage_max'):
damage = random.randint(combat_effects['damage_min'], combat_effects['damage_max'])
new_npc_hp = max(0, combat['npc_hp'] - damage)
effects_applied.append(f"{damage} damage")
messages.append(create_combat_message(
"item_damage",
origin="player",
damage=damage,
item_name=item_name
))
# Check if NPC is defeated
if new_npc_hp <= 0:
messages.append(create_combat_message(
"victory",
origin="neutral",
npc_name=npc_def.name
))
combat_over = True
player_won = True
# Award XP
xp_gained = npc_def.xp_reward
new_xp = player['xp'] + xp_gained
messages.append(create_combat_message(
"xp_gain",
origin="player",
amount=xp_gained
))
await db.update_player(player['id'], xp=new_xp)
await db.update_player_statistics(player['id'], enemies_killed=1, damage_dealt=damage, increment=True)
# Check for level up
level_up_result = await game_logic.check_and_apply_level_up(player['id'])
if level_up_result['leveled_up']:
messages.append(create_combat_message(
"level_up",
origin="player",
level=level_up_result['new_level'],
stat_points=level_up_result['levels_gained']
))
# Create corpse with loot
import json as json_module
corpse_loot = npc_def.corpse_loot if hasattr(npc_def, 'corpse_loot') else []
corpse_loot_dicts = []
for loot in corpse_loot:
if hasattr(loot, '__dict__'):
corpse_loot_dicts.append({
'item_id': loot.item_id,
'quantity_min': loot.quantity_min,
'quantity_max': loot.quantity_max,
'required_tool': loot.required_tool
})
else:
corpse_loot_dicts.append(loot)
await db.create_npc_corpse(
npc_id=combat['npc_id'],
location_id=player['location_id'],
loot_remaining=json_module.dumps(corpse_loot_dicts)
)
await db.remove_non_persistent_effects(player['id'])
await db.end_combat(player['id'])
else:
# Update NPC HP
await db.update_combat(player['id'], {'npc_hp': new_npc_hp})
# Apply status effect from item (e.g., burning from molotov)
status_effect = combat_effects.get('status')
if status_effect and not combat_over:
# Apply to NPC via combat status (simplified - NPC status stored in combat record)
npc_status = f"{status_effect['name']}:{status_effect.get('damage_per_tick', 0)}:{status_effect.get('ticks', 1)}"
await db.update_combat(player['id'], {'npc_status_effects': npc_status})
messages.append(create_combat_message(
"effect_applied",
origin="player",
effect_name=status_effect['name'],
effect_icon=status_effect.get('icon', '🔥'),
target="enemy"
))
# Consume the item
await db.remove_item_from_inventory(player['id'], req.item_id, 1)
await db.update_player_statistics(player['id'], items_used=1, increment=True)
# Add item used message
effects_str = f" ({', '.join(effects_applied)})" if effects_applied else ""
# Calculate total restored amounts for frontend floating text
hp_restored_val = 0
stamina_restored_val = 0
if item_def.effects.get('hp_restore'):
hp_restored_val = min(player.get('max_hp', 100), old_hp + item_def.effects['hp_restore']) - old_hp
if item_def.effects.get('stamina_restore'):
stamina_restored_val = min(player.get('max_stamina', 100), old_stamina + item_def.effects['stamina_restore']) - old_stamina
messages.append(create_combat_message(
"item_used",
origin="player",
item_name=item_name,
effects=effects_str,
hp_restore=hp_restored_val if hp_restored_val > 0 else None,
stamina_restore=stamina_restored_val if stamina_restored_val > 0 else None
))
# NPC's turn after using item (if combat not over)
if not combat_over:
npc_attack_messages, player_defeated = await game_logic.npc_attack(
player['id'],
{'npc_hp': combat['npc_hp'], 'npc_max_hp': combat['npc_max_hp']},
npc_def,
reduce_armor_durability
)
messages.extend(npc_attack_messages)
if player_defeated:
await db.remove_non_persistent_effects(player['id'])
combat_over = True
# Get updated combat state if not over
updated_combat = None
if not combat_over:
raw_combat = await db.get_active_combat(current_user['id'])
if raw_combat:
# Calculate time remaining for turn
import time
turn_time_remaining = None
if raw_combat['turn'] == 'player':
turn_started_at = raw_combat.get('turn_started_at', 0)
time_elapsed = time.time() - turn_started_at
turn_time_remaining = max(0, 300 - time_elapsed)
updated_combat = {
"npc_id": raw_combat['npc_id'],
"npc_name": npc_def.name,
"npc_hp": raw_combat['npc_hp'],
"npc_max_hp": raw_combat['npc_max_hp'],
"npc_image": f"{npc_def.image_path}",
"turn": raw_combat['turn'],
"round": raw_combat.get('round', 1),
"turn_time_remaining": turn_time_remaining
}
# Get fresh player data with updated HP after NPC attack
updated_player = await db.get_player_by_id(current_user['id'])
if not updated_player:
updated_player = current_user # Fallback to current_user if something went wrong
# Note: PvE combat_update WebSocket removed - frontend uses client-side timer
# and polling for AFK timeout handling. WebSocket still used for PvP.
return {
"success": True,
"messages": messages,
"combat_over": combat_over,
"player_won": player_won if combat_over else None,
"combat": updated_combat if updated_combat else None,
"player": {
"hp": updated_player['hp'],
"max_hp": updated_player.get('max_hp', updated_player.get('max_health')),
"xp": updated_player['xp'],
"level": updated_player['level']
},
"quest_updates": quest_updates if 'quest_updates' in locals() else []
}
@router.post("/api/game/pvp/initiate")
async def initiate_pvp_combat(
req: PvPCombatInitiateRequest,
request: Request,
current_user: dict = Depends(get_current_user)
):
"""Initiate PvP combat with another player"""
# Get attacker (current user)
attacker = await db.get_player_by_id(current_user['id'])
if not attacker:
raise HTTPException(status_code=404, detail="Player not found")
# Extract locale
locale = request.headers.get('Accept-Language', 'en')
# Check if attacker is already in combat
existing_combat = await db.get_active_combat(attacker['id'])
if existing_combat:
raise HTTPException(status_code=400, detail="You are already in PvE combat")
existing_pvp = await db.get_pvp_combat_by_player(attacker['id'])
if existing_pvp:
raise HTTPException(status_code=400, detail="You are already in PvP combat")
# Get defender (target player)
defender = await db.get_player_by_id(req.target_player_id)
if not defender:
raise HTTPException(status_code=404, detail="Target player not found")
# Check if defender is in combat
defender_pve = await db.get_active_combat(defender['id'])
if defender_pve:
raise HTTPException(status_code=400, detail="Target player is in PvE combat")
defender_pvp = await db.get_pvp_combat_by_player(defender['id'])
if defender_pvp:
raise HTTPException(status_code=400, detail="Target player is in PvP combat")
# Check same location
if attacker['location_id'] != defender['location_id']:
raise HTTPException(status_code=400, detail="Target player is not in your location")
# Check danger level (>= 3 required for PvP)
location = LOCATIONS.get(attacker['location_id'])
if not location or location.danger_level < 3:
raise HTTPException(status_code=400, detail="PvP combat is only allowed in dangerous zones (danger level >= 3)")
level_diff = abs(attacker['level'] - defender['level'])
if level_diff > 3:
raise HTTPException(
status_code=400,
detail=f"Level difference too large! You can only fight players within 3 levels (target is level {defender['level']})"
)
# Create PvP combat
pvp_combat = await db.create_pvp_combat(
attacker_id=attacker['id'],
defender_id=req.target_player_id,
location_id=attacker['location_id'],
turn_timeout=PVP_TURN_TIMEOUT
)
# Track PvP combat initiation
await db.update_player_statistics(attacker['id'], pvp_combats_initiated=1, increment=True)
# Send WebSocket notifications to both players
await manager.send_personal_message(attacker['id'], {
"type": "combat_started",
"data": {
"message": get_game_message('pvp_initiated_attacker', locale, defender=defender['name']),
"pvp_combat": pvp_combat
},
"timestamp": datetime.utcnow().isoformat()
})
await manager.send_personal_message(defender['id'], {
"type": "combat_started",
"data": {
"message": get_game_message('pvp_challenged_defender', locale, attacker=attacker['name']),
"pvp_combat": pvp_combat
},
"timestamp": datetime.utcnow().isoformat()
})
# Broadcast to location that PvP combat started - both players should be removed from view
await manager.send_to_location(
attacker['location_id'],
{
"type": "location_update",
"data": {
"message": get_game_message('pvp_combat_started_broadcast', locale, attacker=attacker['name'], defender=defender['name']),
"action": "pvp_combat_started",
"players_in_combat": [attacker['id'], defender['id']],
"player_left_ids": [attacker['id'], defender['id']] # Remove both from location view
},
"timestamp": datetime.utcnow().isoformat()
},
exclude_player_id=None # Send to everyone including combatants
)
return {
"success": True,
"message": get_game_message('pvp_initiated_attacker', locale, defender=defender['name']),
"pvp_combat": pvp_combat
}
@router.get("/api/game/pvp/status")
async def get_pvp_combat_status(current_user: dict = Depends(get_current_user)):
"""Get current PvP combat status"""
pvp_combat = await db.get_pvp_combat_by_player(current_user['id'])
if not pvp_combat:
return {"in_pvp_combat": False, "pvp_combat": None}
# Check if current player has already acknowledged - if so, don't show combat anymore
is_attacker = pvp_combat['attacker_character_id'] == current_user['id']
if (is_attacker and pvp_combat.get('attacker_acknowledged', False)) or \
(not is_attacker and pvp_combat.get('defender_acknowledged', False)):
return {"in_pvp_combat": False, "pvp_combat": None}
# Get both players' data
attacker = await db.get_player_by_id(pvp_combat['attacker_character_id'])
defender = await db.get_player_by_id(pvp_combat['defender_character_id'])
# Determine if current user is attacker or defender
is_attacker = pvp_combat['attacker_character_id'] == current_user['id']
your_turn = (is_attacker and pvp_combat['turn'] == 'attacker') or \
(not is_attacker and pvp_combat['turn'] == 'defender')
# Calculate time remaining for turn
import time
time_elapsed = time.time() - pvp_combat['turn_started_at']
time_remaining = max(0, pvp_combat['turn_timeout_seconds'] - time_elapsed)
# Auto-advance if time expired
if time_remaining == 0 and your_turn:
# Skip turn
new_turn = 'defender' if is_attacker else 'attacker'
await db.update_pvp_combat(pvp_combat['id'], {
'turn': new_turn,
'turn_started_at': time.time()
})
pvp_combat = await db.get_pvp_combat_by_id(pvp_combat['id'])
your_turn = False
time_remaining = pvp_combat['turn_timeout_seconds']
return {
"in_pvp_combat": True,
"pvp_combat": {
"id": pvp_combat['id'],
"attacker": {
"id": attacker['id'],
"username": attacker['name'],
"level": attacker['level'],
"hp": attacker['hp'], # Use actual player HP
"max_hp": attacker['max_hp'],
"image": "/images/characters/default.webp"
},
"defender": {
"id": defender['id'],
"username": defender['name'],
"level": defender['level'],
"hp": defender['hp'], # Use actual player HP
"max_hp": defender['max_hp'],
"image": "/images/characters/default.webp"
},
"is_attacker": is_attacker,
"your_turn": your_turn,
"current_turn": pvp_combat['turn'],
"time_remaining": int(time_remaining),
"location_id": pvp_combat['location_id'],
"last_action": pvp_combat.get('last_action'),
"combat_over": pvp_combat.get('attacker_fled', False) or pvp_combat.get('defender_fled', False) or \
attacker['hp'] <= 0 or defender['hp'] <= 0,
"attacker_fled": pvp_combat.get('attacker_fled', False),
"defender_fled": pvp_combat.get('defender_fled', False)
}
}
class PvPAcknowledgeRequest(BaseModel):
combat_id: int
@router.post("/api/game/pvp/acknowledge")
async def acknowledge_pvp_combat(
req: PvPAcknowledgeRequest,
request: Request,
current_user: dict = Depends(get_current_user)
):
"""Acknowledge PvP combat end"""
await db.acknowledge_pvp_combat(req.combat_id, current_user['id'])
# Extract locale from Accept-Language header
locale = request.headers.get('Accept-Language', 'en')
# Broadcast to location that player has returned
player = current_user # current_user is already the character dict
if player:
await manager.send_to_location(
location_id=player['location_id'],
message={
"type": "player_arrived",
"data": {
"player_id": player['id'],
"username": player['name'],
"message": get_game_message('player_returned_pvp', locale, player_name=player['name'])
},
"timestamp": datetime.utcnow().isoformat()
},
exclude_player_id=player['id']
)
return {"success": True}
class PvPCombatActionRequest(BaseModel):
action: str # 'attack', 'flee', 'use_item'
item_id: Optional[str] = None # For use_item action
@router.post("/api/game/pvp/action")
async def pvp_combat_action(
req: PvPCombatActionRequest,
request: Request,
current_user: dict = Depends(get_current_user)
):
"""Perform a PvP combat action"""
import random
import time
# Extract locale
locale = request.headers.get('Accept-Language', 'en')
# Get PvP combat
pvp_combat = await db.get_pvp_combat_by_player(current_user['id'])
if not pvp_combat:
raise HTTPException(status_code=400, detail="Not in PvP combat")
# Determine roles
is_attacker = pvp_combat['attacker_character_id'] == current_user['id']
your_turn = (is_attacker and pvp_combat['turn'] == 'attacker') or \
(not is_attacker and pvp_combat['turn'] == 'defender')
if not your_turn:
raise HTTPException(status_code=400, detail="It's not your turn")
# Get both players
attacker = await db.get_player_by_id(pvp_combat['attacker_character_id'])
defender = await db.get_player_by_id(pvp_combat['defender_character_id'])
current_player = attacker if is_attacker else defender
opponent = defender if is_attacker else attacker
# Get derived stats for both players
from ..services.stats import calculate_derived_stats
current_player_stats = await calculate_derived_stats(current_player['id'], redis_manager)
# Opponent stats won't be used for attack calculation but could be used for defense logic
# opponent_stats = await calculate_derived_stats(opponent['id'], redis_manager)
messages = []
combat_over = False
winner_id = None
# Track the last action string for DB history
last_action_text = ""
# Process status effects (bleeding, poison, etc.) before action
active_effects = await db.tick_player_effects(current_player['id'])
if active_effects:
from ..game_logic import calculate_status_impact
total_impact = calculate_status_impact(active_effects)
if total_impact > 0:
damage = total_impact
new_hp = max(0, current_player['hp'] - damage)
await db.update_player(current_player['id'], hp=new_hp)
current_player['hp'] = new_hp
messages.append(create_combat_message(
"effect_damage",
origin="player",
damage=damage,
effect_name="status effects"
))
if new_hp <= 0:
messages.append(create_combat_message("died", origin="player", message="You died from status effects!"))
combat_over = True
winner_id = opponent['id']
# Update current player to dead state
await db.update_player(current_player['id'], hp=0, is_dead=True)
# Create corpse
import json
import time as time_module
inventory = await db.get_inventory(current_player['id'])
inventory_items = []
for inv_item in inventory:
item_def = ITEMS_MANAGER.get_item(inv_item['item_id'])
inventory_items.append({
'item_id': inv_item['item_id'],
'name': item_def.name if item_def else inv_item['item_id'],
'emoji': item_def.emoji if (item_def and hasattr(item_def, 'emoji')) else '📦',
'quantity': inv_item['quantity'],
'durability': inv_item.get('durability'),
'max_durability': inv_item.get('max_durability'),
'tier': inv_item.get('tier')
})
corpse_data = None
if inventory_items:
corpse_id = await db.create_player_corpse(
player_name=current_player['name'],
location_id=current_player['location_id'],
items=json.dumps([{'item_id': i['item_id'], 'quantity': i['quantity']} for i in inventory])
)
await db.clear_inventory(current_player['id'])
corpse_data = {
"id": f"player_{corpse_id}",
"type": "player",
"name": f"{current_player['name']}'s Corpse",
"emoji": "⚰️",
"player_name": current_player['name'],
"loot_count": len(inventory_items),
"items": inventory_items,
"timestamp": time_module.time()
}
# Update PvP statistics for both players
await db.update_player_statistics(current_player['id'], pvp_deaths=1, pvp_combats_lost=1, increment=True)
await db.update_player_statistics(opponent['id'], players_killed=1, pvp_combats_won=1, increment=True)
# Broadcast corpse
broadcast_data = {
"message": get_game_message('pvp_defeat_broadcast', locale, opponent=current_player['name'], winner=opponent['name']),
"action": "player_died",
"player_id": current_player['id']
}
if corpse_data:
broadcast_data["corpse"] = corpse_data
await manager.send_to_location(
location_id=current_player['location_id'],
message={
"type": "location_update",
"data": broadcast_data,
"timestamp": datetime.utcnow().isoformat()
}
)
await db.end_pvp_combat(pvp_combat['id'])
elif total_impact < 0:
heal = abs(total_impact)
new_hp = min(current_player_stats.get('max_hp', current_player['max_hp']), current_player['hp'] + heal)
actual_heal = new_hp - current_player['hp']
if actual_heal > 0:
await db.update_player(current_player['id'], hp=new_hp)
current_player['hp'] = new_hp
messages.append(create_combat_message(
"effect_heal",
origin="player",
heal=actual_heal,
effect_name="status effects"
))
# Stop processing action if player died from status effects
if combat_over:
pass
elif req.action == 'attack':
# Calculate damage (unified formula with derived stats)
base_damage = current_player_stats.get('attack_power', 5)
# Check for equipped weapon to apply durability loss
# (Attack power from the weapon is already included in stats['attack_power'])
equipment = await db.get_all_equipment(current_player['id'])
if equipment.get('weapon') and equipment['weapon']:
weapon_slot = equipment['weapon']
inv_item = await db.get_inventory_item_by_id(weapon_slot['item_id'])
if inv_item:
weapon_def = ITEMS_MANAGER.get_item(inv_item['item_id'])
if weapon_def:
# Decrease weapon durability
if inv_item.get('unique_item_id'):
new_durability = await db.decrease_unique_item_durability(inv_item['unique_item_id'], 1)
if new_durability is None:
messages.append(create_combat_message(
"weapon_broke",
origin="player",
item_name=weapon_def.name
))
await db.unequip_item(current_player['id'], 'weapon')
variance = random.randint(-2, 2)
damage = max(1, base_damage + variance)
# Check for critical hit
is_critical = False
crit_chance = current_player_stats.get('crit_chance', 0.05)
if random.random() < crit_chance:
is_critical = True
damage = int(damage * current_player_stats.get('crit_damage', 1.5))
# Apply armor reduction and durability loss to opponent
armor_absorbed, broken_armor = await reduce_armor_durability(opponent['id'], damage)
actual_damage = max(1, damage - armor_absorbed)
if is_critical:
messages.append(create_combat_message(
"combat_crit",
origin="player"
))
last_action_text += f"\nCritical Hit! "
# Structure the attack message
messages.append(create_combat_message(
"player_attack",
origin="player",
damage=actual_damage,
armor_absorbed=armor_absorbed
))
# Update opponent HP (use actual player HP, not pvp_combat fields)
new_opponent_hp = max(0, opponent['hp'] - actual_damage)
# Update opponent's HP in database
await db.update_player(opponent['id'], hp=new_opponent_hp)
# Construct summary string for DB history/passive player
last_action_text = f"{current_player['name']} attacks {opponent['name']} for {damage} damage!"
if armor_absorbed > 0:
last_action_text += f" (Armor absorbed {armor_absorbed})"
for broken in broken_armor:
messages.append(create_combat_message(
"item_broken",
origin="enemy", # Belongs to opponent
item_name=broken['name'],
emoji=broken['emoji']
))
last_action_text += f"\n💔 {opponent['name']}'s {broken['emoji']} {broken['name']} broke!"
# Check if opponent defeated
if new_opponent_hp <= 0:
last_action_text += f"\n🏆 {current_player['name']} has defeated {opponent['name']}!"
messages.append(create_combat_message(
"victory",
origin="neutral",
npc_name=opponent['name']
))
combat_over = True
winner_id = current_player['id']
# Update opponent to dead state
await db.update_player(opponent['id'], hp=0, is_dead=True)
# Create corpse with opponent's inventory
import json
import time as time_module
inventory = await db.get_inventory(opponent['id'])
inventory_items = []
for inv_item in inventory:
item_def = ITEMS_MANAGER.get_item(inv_item['item_id'])
inventory_items.append({
'item_id': inv_item['item_id'],
'name': item_def.name if item_def else inv_item['item_id'],
'emoji': item_def.emoji if (item_def and hasattr(item_def, 'emoji')) else '📦',
'quantity': inv_item['quantity'],
'durability': inv_item.get('durability'),
'max_durability': inv_item.get('max_durability'),
'tier': inv_item.get('tier')
})
# Only create corpse if opponent has items
corpse_data = None
if inventory_items:
logger.info(f"Creating player corpse (PvP death) for {opponent['name']} at {opponent['location_id']} with {len(inventory_items)} items")
corpse_id = await db.create_player_corpse(
player_name=opponent['name'],
location_id=opponent['location_id'],
items=json.dumps([{'item_id': i['item_id'], 'quantity': i['quantity']} for i in inventory])
)
logger.info(f"Successfully created player corpse (PvP death): ID={corpse_id}, player={opponent['name']}, location={opponent['location_id']}, items_count={len(inventory_items)}")
# Clear opponent's inventory (items are now in corpse)
await db.clear_inventory(opponent['id'])
# Build corpse data for broadcast
corpse_data = {
"id": f"player_{corpse_id}",
"type": "player",
"name": f"{opponent['name']}'s Corpse",
"emoji": "⚰️",
"player_name": opponent['name'],
"loot_count": len(inventory_items),
"items": inventory_items,
"timestamp": time_module.time()
}
else:
logger.info(f"Player {opponent['name']} died (PvP death) with no items, skipping corpse creation")
# Update PvP statistics for both players
await db.update_player_statistics(opponent['id'],
pvp_deaths=1,
pvp_combats_lost=1,
pvp_damage_taken=actual_damage,
pvp_attacks_received=1,
increment=True
)
await db.update_player_statistics(current_player['id'],
players_killed=1,
pvp_combats_won=1,
pvp_damage_dealt=damage,
pvp_attacks_landed=1,
increment=True
)
# Broadcast to location that player died (and corpse if created)
logger.info(f"Broadcasting player_died (PvP death) to location {opponent['location_id']} for player {opponent['name']}")
broadcast_data = {
"message": get_game_message('pvp_defeat_broadcast', locale, opponent=opponent['name'], winner=current_player['name']),
"action": "player_died",
"player_id": opponent['id']
}
if corpse_data:
broadcast_data["corpse"] = corpse_data
await manager.send_to_location(
location_id=opponent['location_id'],
message={
"type": "location_update",
"data": broadcast_data,
"timestamp": datetime.utcnow().isoformat()
}
)
# End PvP combat
await db.end_pvp_combat(pvp_combat['id'])
else:
# Combat continues
# Update PvP statistics for attack
await db.update_player_statistics(current_player['id'],
pvp_damage_dealt=damage,
pvp_attacks_landed=1,
increment=True
)
await db.update_player_statistics(opponent['id'],
pvp_damage_taken=actual_damage,
pvp_attacks_received=1,
increment=True
)
# Update combat state and switch turns
# Add timestamp to make each action unique for duplicate detection
updates = {
'turn': 'defender' if is_attacker else 'attacker',
'turn_started_at': time.time(),
'last_action': f"{last_action_text}|{time.time()}" # Add timestamp for uniqueness
}
# No need to update HP in pvp_combat - we use player HP directly
await db.update_pvp_combat(pvp_combat['id'], updates)
await db.update_player_statistics(current_player['id'], damage_dealt=damage, increment=True)
elif req.action == 'skill':
if not req.item_id:
raise HTTPException(status_code=400, detail="skill_id (passed as item_id) required")
from ..services.skills import skills_manager
skill_id = req.item_id
skill = skills_manager.get_skill(skill_id)
if not skill:
raise HTTPException(status_code=404, detail="Skill not found")
# Check unlocked
stat_val = current_player.get(skill.stat_requirement, 0)
if stat_val < skill.stat_threshold or current_player['level'] < skill.level_requirement:
raise HTTPException(status_code=400, detail="Skill not unlocked")
# Check cooldown
active_effects = await db.get_player_effects(current_player['id'])
cd_source = f"cd:{skill.id}"
for eff in active_effects:
if eff.get('source') == cd_source and eff.get('ticks_remaining', 0) > 0:
raise HTTPException(status_code=400, detail=f"Skill on cooldown ({eff['ticks_remaining']} turns)")
# Check stamina
if current_player['stamina'] < skill.stamina_cost:
raise HTTPException(status_code=400, detail="Not enough stamina")
# Deduct stamina
new_stamina = current_player['stamina'] - skill.stamina_cost
await db.update_player_stamina(current_player['id'], new_stamina)
current_player['stamina'] = new_stamina
# Add cooldown effect
if skill.cooldown > 0:
await db.add_effect(
player_id=current_player['id'],
effect_name=f"{skill.id}_cooldown",
effect_icon="",
effect_type="cooldown",
value=0,
ticks_remaining=skill.cooldown,
persist_after_combat=False,
source=cd_source
)
# Get weapon info
equipment = await db.get_all_equipment(current_player['id'])
weapon_damage = 0
weapon_inv_id = None
inv_item = None
weapon_def = None
if equipment.get('weapon') and equipment['weapon']:
weapon_slot = equipment['weapon']
inv_item = await db.get_inventory_item_by_id(weapon_slot['item_id'])
if inv_item:
weapon_def = ITEMS_MANAGER.get_item(inv_item['item_id'])
if weapon_def and weapon_def.stats:
weapon_damage = random.randint(
weapon_def.stats.get('damage_min', 0),
weapon_def.stats.get('damage_max', 0)
)
weapon_inv_id = inv_item['id']
effects = skill.effects
new_opponent_hp = opponent['hp']
damage_done = 0
actual_damage = 0
armor_absorbed = 0
# Damage skills
if 'damage_multiplier' in effects:
base_damage = 5
strength_bonus = int(current_player['strength'] * 1.5)
level_bonus = current_player['level']
variance = random.randint(-2, 2)
raw_damage = max(1, base_damage + strength_bonus + level_bonus + weapon_damage + variance)
multiplier = effects['damage_multiplier']
if 'execute_threshold' in effects:
opponent_hp_pct = opponent['hp'] / opponent['max_hp'] if opponent['max_hp'] > 0 else 1
if opponent_hp_pct <= effects['execute_threshold']:
multiplier = effects.get('execute_multiplier', multiplier)
damage = max(1, int(raw_damage * multiplier))
if effects.get('guaranteed_crit'):
damage = int(damage * 1.5)
num_hits = effects.get('hits', 1)
for hit in range(num_hits):
hit_dmg = damage if hit == 0 else max(1, int(raw_damage * multiplier))
absorbed, broken_armor = await reduce_armor_durability(opponent['id'], hit_dmg)
armor_absorbed += absorbed
for broken in broken_armor:
messages.append(create_combat_message(
"item_broken",
origin="enemy",
item_name=broken['name'],
emoji=broken['emoji']
))
last_action_text += f"\n💔 {opponent['name']}'s {broken['emoji']} {broken['name']} broke!"
actual_hit = max(1, hit_dmg - absorbed)
damage_done += actual_hit
new_opponent_hp = max(0, new_opponent_hp - actual_hit)
actual_damage = damage_done
messages.append(create_combat_message(
"skill_attack",
origin="player",
damage=damage_done,
skill_name=skill.name,
skill_icon=skill.icon,
hits=num_hits
))
last_action_text = f"{current_player['name']} used {skill.name} on {opponent['name']} for {damage_done} damage! (Armor absorbed {armor_absorbed})"
# Lifesteal
if 'lifesteal' in effects:
heal_amount = int(damage_done * effects['lifesteal'])
new_hp = min(current_player['max_hp'], current_player['hp'] + heal_amount)
if new_hp > current_player['hp']:
await db.update_player(current_player['id'], hp=new_hp)
current_player['hp'] = new_hp
messages.append(create_combat_message(
"skill_heal", origin="player", heal=heal_amount, skill_icon="🩸"
))
# Poison DoT
if 'poison_damage' in effects:
await db.add_effect(
player_id=opponent['id'],
effect_name="Poison",
effect_icon="🧪",
effect_type="damage",
damage_per_tick=effects['poison_damage'],
ticks_remaining=effects['poison_duration'],
persist_after_combat=True,
source=f"skill_poison:{skill.id}"
)
messages.append(create_combat_message(
"skill_effect", origin="player", message=f"🧪 Poisoned! ({effects['poison_damage']} dmg/turn)"
))
# Stun chance
if 'stun_chance' in effects and random.random() < effects['stun_chance']:
# Stun in PvP can be modeled as taking away a turn
await db.add_effect(
player_id=opponent['id'],
effect_name="Stunned",
effect_icon="💫",
effect_type="debuff",
ticks_remaining=1,
persist_after_combat=False,
source="skill_stun"
)
messages.append(create_combat_message(
"skill_effect", origin="player", message="💫 Stunned! (Currently skip effect)"
))
# Weapon durability
if weapon_inv_id and inv_item and inv_item.get('unique_item_id'):
new_durability = await db.decrease_unique_item_durability(inv_item['unique_item_id'], 1)
if new_durability is None:
messages.append(create_combat_message("weapon_broke", origin="player", item_name=weapon_def.name if weapon_def else "weapon"))
await db.unequip_item(current_player['id'], 'weapon')
# Heal skills
if 'heal_percent' in effects:
heal_amount = int(current_player['max_hp'] * effects['heal_percent'])
new_hp = min(current_player['max_hp'], current_player['hp'] + heal_amount)
actual_heal = new_hp - current_player['hp']
if actual_heal > 0:
await db.update_player(current_player['id'], hp=new_hp)
current_player['hp'] = new_hp
messages.append(create_combat_message(
"skill_heal", origin="player", heal=actual_heal, skill_icon=skill.icon
))
last_action_text = f"{current_player['name']} used {skill.name} and healed for {actual_heal} HP!"
# Fortify
if 'armor_boost' in effects:
await db.add_effect(
player_id=current_player['id'],
effect_name="Fortify",
effect_icon="🛡️",
effect_type="buff",
value=effects['armor_boost'],
ticks_remaining=effects['duration'],
persist_after_combat=False,
source=f"skill_fortify:{skill.id}"
)
messages.append(create_combat_message(
"skill_effect", origin="player", message=f"🛡️ Fortified! (+{effects['armor_boost']} Armor)"
))
last_action_text = f"{current_player['name']} used {skill.name} and bolstered their defenses!"
# Process opponent HP if damage done
if damage_done > 0:
await db.update_player(opponent['id'], hp=new_opponent_hp)
if new_opponent_hp <= 0:
last_action_text += f"\n🏆 {current_player['name']} has defeated {opponent['name']}!"
messages.append(create_combat_message("victory", origin="neutral", npc_name=opponent['name']))
combat_over = True
winner_id = current_player['id']
await db.update_player(opponent['id'], hp=0, is_dead=True)
# Create corpse
import json
import time as time_module
inventory = await db.get_inventory(opponent['id'])
inventory_items = []
for inv_item in inventory:
item_def = ITEMS_MANAGER.get_item(inv_item['item_id'])
inventory_items.append({
'item_id': inv_item['item_id'],
'name': item_def.name if item_def else inv_item['item_id'],
'emoji': item_def.emoji if (item_def and hasattr(item_def, 'emoji')) else '📦',
'quantity': inv_item['quantity'],
'durability': inv_item.get('durability'),
'max_durability': inv_item.get('max_durability'),
'tier': inv_item.get('tier')
})
corpse_data = None
if inventory_items:
corpse_id = await db.create_player_corpse(
player_name=opponent['name'],
location_id=opponent['location_id'],
items=json.dumps([{'item_id': i['item_id'], 'quantity': i['quantity']} for i in inventory])
)
await db.clear_inventory(opponent['id'])
corpse_data = {
"id": f"player_{corpse_id}",
"type": "player",
"name": f"{opponent['name']}'s Corpse",
"emoji": "⚰️",
"player_name": opponent['name'],
"loot_count": len(inventory_items),
"items": inventory_items,
"timestamp": time_module.time()
}
# Update statistics
await db.update_player_statistics(opponent['id'], pvp_deaths=1, pvp_combats_lost=1, pvp_damage_taken=actual_damage, pvp_attacks_received=1, increment=True)
await db.update_player_statistics(current_player['id'], players_killed=1, pvp_combats_won=1, pvp_damage_dealt=actual_damage, pvp_attacks_landed=1, increment=True)
# Broadcast corpse
broadcast_data = {
"message": get_game_message('pvp_defeat_broadcast', locale, opponent=opponent['name'], winner=current_player['name']),
"action": "player_died",
"player_id": opponent['id']
}
if corpse_data:
broadcast_data["corpse"] = corpse_data
await manager.send_to_location(
location_id=opponent['location_id'],
message={
"type": "location_update",
"data": broadcast_data,
"timestamp": datetime.utcnow().isoformat()
}
)
await db.end_pvp_combat(pvp_combat['id'])
else:
await db.update_player_statistics(current_player['id'], pvp_damage_dealt=actual_damage, pvp_attacks_landed=1, increment=True)
await db.update_player_statistics(opponent['id'], pvp_damage_taken=actual_damage, pvp_attacks_received=1, increment=True)
# End of turn swap
updates = {
'turn': 'defender' if is_attacker else 'attacker',
'turn_started_at': time.time(),
'last_action': f"{last_action_text}|{time.time()}"
}
await db.update_pvp_combat(pvp_combat['id'], updates)
else:
# Skill didn't do damage, but turn still ends
updates = {
'turn': 'defender' if is_attacker else 'attacker',
'turn_started_at': time.time(),
'last_action': f"{last_action_text}|{time.time()}"
}
await db.update_pvp_combat(pvp_combat['id'], updates)
elif req.action == 'use_item':
if not req.item_id:
raise HTTPException(status_code=400, detail="item_id required for use_item action")
player_inventory = await db.get_inventory(current_player['id'])
inv_item = next((item for item in player_inventory if item['item_id'] == req.item_id), None)
if not inv_item:
raise HTTPException(status_code=400, detail="Item not found in inventory")
item_def = ITEMS_MANAGER.get_item(req.item_id)
if not item_def or not item_def.combat_usable:
raise HTTPException(status_code=400, detail="This item cannot be used in combat")
item_name = get_locale_string(item_def.name, locale)
effects_applied = []
if item_def.effects.get('status_effect'):
status_data = item_def.effects['status_effect']
await db.add_effect(
player_id=current_player['id'],
effect_name=status_data['name'],
effect_icon=status_data.get('icon', ''),
effect_type=status_data.get('type', 'buff'),
damage_per_tick=status_data.get('damage_per_tick', 0),
value=status_data.get('value', 0),
ticks_remaining=status_data.get('ticks', 3),
persist_after_combat=True,
source=f"item:{item_def.id}"
)
effects_applied.append(f"Applied {status_data['name']}")
if item_def.effects.get('cures'):
for cure_effect in item_def.effects['cures']:
if await db.remove_effect(current_player['id'], cure_effect):
effects_applied.append(f"Cured {cure_effect}")
if item_def.effects.get('hp_restore') and item_def.effects['hp_restore'] > 0:
item_effectiveness = current_player_stats.get('item_effectiveness', 1.0)
restore_amount = int(item_def.effects['hp_restore'] * item_effectiveness)
new_hp = min(current_player['max_hp'], current_player['hp'] + restore_amount)
actual_heal = new_hp - current_player['hp']
if actual_heal > 0:
await db.update_player(current_player['id'], hp=new_hp)
current_player['hp'] = new_hp
effects_applied.append(get_game_message('healed_amount_text', locale, amount=actual_heal))
messages.append(create_combat_message("item_heal", origin="player", heal=actual_heal))
if item_def.effects.get('stamina_restore') and item_def.effects['stamina_restore'] > 0:
item_effectiveness = current_player_stats.get('item_effectiveness', 1.0)
restore_amount = int(item_def.effects['stamina_restore'] * item_effectiveness)
new_stamina = min(current_player['max_stamina'], current_player['stamina'] + restore_amount)
actual_restore = new_stamina - current_player['stamina']
if actual_restore > 0:
await db.update_player_stamina(current_player['id'], new_stamina)
effects_applied.append(f"Restored {actual_restore} stamina")
messages.append(create_combat_message("item_restore", origin="player", stat="stamina", amount=actual_restore))
if inv_item['quantity'] > 1:
await db.update_inventory_quantity(inv_item['id'], inv_item['quantity'] - 1)
else:
await db.remove_from_inventory(inv_item['id'])
messages.append(create_combat_message(
"use_item", origin="player", item_name=item_name,
message=get_game_message('used_item_text', locale, name=item_name) + " (" + ", ".join(effects_applied) + ")"
))
last_action_text = f"{current_player['name']} used {item_name}!"
# End of turn swap
updates = {
'turn': 'defender' if is_attacker else 'attacker',
'turn_started_at': time.time(),
'last_action': f"{last_action_text}|{time.time()}"
}
await db.update_pvp_combat(pvp_combat['id'], updates)
elif req.action == 'flee':
# 50% chance to flee from PvP
if random.random() < 0.5:
last_action_text = get_game_message('flee_success_text', locale, name=current_player['name'])
messages.append(create_combat_message(
"flee_success",
origin="player",
message=last_action_text
))
combat_over = True
# Mark as fled, store last action with timestamp, and end combat
flee_field = 'attacker_fled' if is_attacker else 'defender_fled'
await db.update_pvp_combat(pvp_combat['id'], {
flee_field: True,
'last_action': f"{last_action_text}|{time.time()}"
})
await db.end_pvp_combat(pvp_combat['id'])
await db.update_player_statistics(current_player['id'],
pvp_successful_flees=1,
increment=True
)
else:
# Failed to flee, skip turn
last_action_text = get_game_message('flee_fail_text', locale, name=current_player['name'])
messages.append(create_combat_message(
"flee_fail",
origin="player",
reason="chance",
message=last_action_text
))
await db.update_pvp_combat(pvp_combat['id'], {
'turn': 'defender' if is_attacker else 'attacker',
'turn_started_at': time.time(),
'last_action': f"{last_action_text}|{time.time()}"
})
await db.update_player_statistics(current_player['id'],
pvp_failed_flees=1,
increment=True
)
# Send WebSocket combat updates to both players
# Get fresh PvP combat data
updated_pvp = await db.get_pvp_combat_by_id(pvp_combat['id'])
# Get fresh player data for HP updates
fresh_attacker = await db.get_player_by_id(pvp_combat['attacker_character_id'])
fresh_defender = await db.get_player_by_id(pvp_combat['defender_character_id'])
# Send to both players with enriched data (like the API endpoint does)
for player_id in [pvp_combat['attacker_character_id'], pvp_combat['defender_character_id']]:
is_attacker = player_id == pvp_combat['attacker_character_id']
your_turn = (is_attacker and updated_pvp['turn'] == 'attacker') or \
(not is_attacker and updated_pvp['turn'] == 'defender')
# Calculate time remaining
import time
time_elapsed = time.time() - updated_pvp['turn_started_at']
time_remaining = max(0, updated_pvp['turn_timeout_seconds'] - time_elapsed)
# Build enriched pvp_combat object like the API does
enriched_pvp = {
"id": updated_pvp['id'],
"attacker": {
"id": fresh_attacker['id'],
"username": fresh_attacker['name'],
"level": fresh_attacker['level'],
"hp": fresh_attacker['hp'],
"max_hp": fresh_attacker['max_hp'],
"image": "/images/characters/default.webp"
},
"defender": {
"id": fresh_defender['id'],
"username": fresh_defender['name'],
"level": fresh_defender['level'],
"hp": fresh_defender['hp'],
"max_hp": fresh_defender['max_hp'],
"image": "/images/characters/default.webp"
},
"is_attacker": is_attacker,
"your_turn": your_turn,
"current_turn": updated_pvp['turn'],
"time_remaining": int(time_remaining),
"location_id": updated_pvp['location_id'],
"last_action": updated_pvp.get('last_action'),
"combat_over": updated_pvp.get('attacker_fled', False) or updated_pvp.get('defender_fled', False) or \
fresh_attacker['hp'] <= 0 or fresh_defender['hp'] <= 0,
"attacker_fled": updated_pvp.get('attacker_fled', False),
"defender_fled": updated_pvp.get('defender_fled', False)
}
# Determine which player object to send as "player" data for global state updates
player_data = {
"id": fresh_attacker['id'],
"username": fresh_attacker['name'],
"level": fresh_attacker['level'],
"hp": fresh_attacker['hp'],
"max_hp": fresh_attacker['max_hp'],
"xp": fresh_attacker['xp'],
"max_xp": fresh_attacker['level'] * 1000
} if is_attacker else {
"id": fresh_defender['id'],
"username": fresh_defender['name'],
"level": fresh_defender['level'],
"hp": fresh_defender['hp'],
"max_hp": fresh_defender['max_hp'],
"xp": fresh_defender['xp'],
"max_xp": fresh_defender['level'] * 1000
}
# Process messages for this player
# Use actor_id (current_player['id']) to identify who performed the action
# If I am NOT the actor, then the action was done BY an enemy against me.
# So I swap 'player' origin (Actor) to 'enemy' origin (Attacker from my perspective).
actor_id = current_player['id']
import copy
player_messages = []
is_actor = (player_id == actor_id)
# For the victim (non-actor), we strip the pre-generated text messages so frontend can generate
# "Enemy hit you" instead of "Alice hit Bob"
if not is_actor:
msgs_copy = copy.deepcopy(messages)
for m in msgs_copy:
if m.get('origin') == 'player':
m['origin'] = 'enemy'
elif m.get('origin') == 'enemy':
m['origin'] = 'player'
player_messages.append(m)
else:
player_messages = messages
# Send separate payloads
# For actor: keep full text
# For victim: strip main message text so frontend uses data to render "Enemy hit you"
payload_data = {
"message": last_action_text if is_actor else None, # key refactor: hide text for victim
"log_entry": last_action_text if is_actor else None,
"pvp_combat": enriched_pvp,
"combat_over": combat_over,
"winner_id": winner_id,
"player": player_data,
"attacker_hp": fresh_attacker['hp'],
"defender_hp": fresh_defender['hp'],
"messages": player_messages
}
await manager.send_personal_message(player_id, {
"type": "combat_update",
"data": payload_data,
"timestamp": datetime.utcnow().isoformat()
})
return {
"success": True,
"messages": messages,
"combat_over": combat_over,
"winner_id": winner_id,
"pvp_combat": updated_pvp
}