Files
echoes-of-the-ash/api/game_logic.py

951 lines
40 KiB
Python

"""
Standalone game logic for the API.
Contains all game mechanics without bot dependencies.
"""
import random
import time
from typing import Dict, Any, Tuple, Optional, List
from . import database as db
from .services.helpers import get_locale_string, translate_travel_message, create_combat_message, get_game_message
async def move_player(player_id: int, direction: str, locations: Dict, locale: str = 'en') -> Tuple[bool, str, Optional[str], int, int]:
    """
    Walk the player through one exit of their current location.

    The stamina price starts from the straight-line distance between the two
    locations (1 coordinate unit = 100 m, 50 m = 1 stamina), grows with the
    carried weight (and grows further when over weight/volume capacity) and
    shrinks with agility. It never drops below 1.

    Returns: (success, message, new_location_id, stamina_cost, distance_meters)
    On any failure the last three fields are (None, 0, 0).
    """
    def _refused(reason: str) -> Tuple[bool, str, Optional[str], int, int]:
        # Every failure path shares the same tuple shape
        return False, reason, None, 0, 0

    def _excess(carried, limit):
        # Fraction by which `carried` exceeds `limit`; 0 when within it or when there is no limit
        return max(0, (carried - limit) / limit) if limit > 0 else 0

    player = await db.get_character_by_id(player_id)
    if not player:
        return _refused("Player not found")

    origin = locations.get(player['location_id'])
    if not origin:
        return _refused("Current location not found")

    # The requested direction must be one of the origin's exits
    if direction not in origin.exits:
        return _refused(f"You cannot go {direction} from here.")

    new_location_id = origin.exits[direction]
    destination = locations.get(new_location_id)
    if not destination:
        return _refused("Destination not found")

    # Carried load versus carrying capacity
    from api.items import items_manager as ITEMS_MANAGER
    from api.services.helpers import calculate_player_capacity
    inventory = await db.get_inventory(player_id)
    current_weight, max_weight, current_volume, max_volume = await calculate_player_capacity(inventory, ITEMS_MANAGER)

    # Euclidean distance on the map grid, converted to whole meters
    import math
    delta_x = destination.x - origin.x
    delta_y = destination.y - origin.y
    distance = int(math.sqrt(delta_x**2 + delta_y**2) * 100)

    # Cost components
    base_cost = max(1, round(distance / 50))
    weight_penalty = int(current_weight / 10)
    agility_reduction = int(player.get('agility', 5) / 3)

    # Over-capacity surcharge: 50% of the loaded cost, scaling up to 200%
    over_capacity_penalty = 0
    overloaded = current_weight > max_weight or current_volume > max_volume
    if overloaded:
        excess_ratio = max(
            _excess(current_weight, max_weight),
            _excess(current_volume, max_volume),
        )
        loaded_cost = base_cost + weight_penalty
        over_capacity_penalty = int(loaded_cost * (0.5 + min(1.5, excess_ratio)))

    stamina_cost = max(1, base_cost + weight_penalty + over_capacity_penalty - agility_reduction)

    # Too tired to make the trip
    if player['stamina'] < stamina_cost:
        return _refused(get_game_message('exhausted_move', locale))

    # Persist the new position and the stamina spent
    remaining_stamina = max(0, player['stamina'] - stamina_cost)
    await db.update_character(
        player_id,
        location_id=new_location_id,
        stamina=remaining_stamina
    )

    destination_name = get_locale_string(destination.name, locale)
    return (
        True,
        translate_travel_message(direction, destination_name, locale),
        new_location_id,
        stamina_cost,
        distance,
    )
async def inspect_area(player_id: int, location, interactables_data: Dict, locale: str = 'en') -> str:
    """
    Spend 1 stamina to look around `location` and describe it as text.

    The report lists the location description, its interactables (with the
    label and stamina cost of each action), the NPCs present and any items
    lying on the ground. Returns an error/exhaustion message instead when the
    player is unknown or has less than 1 stamina.
    """
    player = await db.get_player_by_id(player_id)
    if not player:
        return "Player not found"

    # Inspecting costs one stamina point
    if player['stamina'] < 1:
        return get_game_message('exhausted_inspect', locale)
    await db.update_player_stamina(player_id, player['stamina'] - 1)

    # Header: title, description, blank separator
    report = [
        get_game_message('inspecting_title', locale, name=location.name),
        location.description,
        "",
    ]

    if location.interactables:
        report.append(get_game_message('interactables_title', locale))
        for thing in location.interactables:
            report.append(f"• **{thing.name}**")
            if thing.actions:
                described = [f"{act.label} (⚡{act.stamina_cost})" for act in thing.actions]
                actions_text = ", ".join(described)
                report.append(f" Actions: {actions_text}")
        report.append("")

    if location.npcs:
        report.append(f"{get_game_message('npcs_title', locale)} {', '.join(location.npcs)}")
        report.append("")

    # Items that were dropped at this location
    ground_items = await db.get_dropped_items(location.id)
    if ground_items:
        report.append(get_game_message('items_ground_title', locale))
        report.extend(f"{entry['item_id']} x{entry['quantity']}" for entry in ground_items)

    return "\n".join(report)
async def interact_with_object(
    player_id: int,
    interactable_id: str,
    action_id: str,
    location,
    items_manager,
    locale: str = 'en'
) -> Dict[str, Any]:
    """
    Interact with an object using a specific action.

    Flow: validate player / interactable / action, check stamina and the
    per-action cooldown, spend the stamina, roll an outcome (10% critical
    failure, 20% failure, 70% success; a missing outcome falls back to
    'success'), hand out the outcome's item rewards (anything that does not
    fit the player's weight/volume capacity is dropped at the player's
    location), apply the outcome's damage and start a 60 second cooldown.

    Returns on success:
        {success, message, items_found, items_dropped, damage_taken,
         stamina_cost, new_stamina, new_hp}
    `new_hp` is the value that was persisted, i.e. it is never below 0.
    Returns {success: False, message} on any validation failure.
    """
    player = await db.get_player_by_id(player_id)
    if not player:
        return {"success": False, "message": "Player not found"}

    # Find the interactable (match by id or instance_id)
    interactable = next(
        (
            obj for obj in location.interactables
            if obj.id == interactable_id
            or (hasattr(obj, 'instance_id') and obj.instance_id == interactable_id)
        ),
        None,
    )
    if not interactable:
        return {"success": False, "message": get_game_message('object_not_found', locale)}

    # Find the action
    action = next((act for act in interactable.actions if act.id == action_id), None)
    if not action:
        return {"success": False, "message": get_game_message('action_not_found', locale)}

    # Check stamina
    if player['stamina'] < action.stamina_cost:
        return {
            "success": False,
            "message": get_game_message('not_enough_stamina', locale, cost=action.stamina_cost, current=player['stamina'])
        }

    # Check cooldown for this specific action
    cooldown_expiry = await db.get_interactable_cooldown(interactable_id, action_id)
    if cooldown_expiry:
        # Clamp so the message never shows a negative wait time
        remaining = max(0, int(cooldown_expiry - time.time()))
        return {
            "success": False,
            "message": get_game_message('cooldown_wait', locale, seconds=remaining)
        }

    # Deduct stamina
    new_stamina = player['stamina'] - action.stamina_cost
    await db.update_player_stamina(player_id, new_stamina)

    # Determine outcome (simple success/failure for now)
    # TODO: Implement proper skill checks
    roll = random.randint(1, 100)
    if roll <= 10:  # 10% critical failure
        outcome_key = 'critical_failure'
    elif roll <= 30:  # 20% failure
        outcome_key = 'failure'
    else:  # 70% success
        outcome_key = 'success'

    # Fall back to the success outcome if the rolled one is not defined
    outcome = action.outcomes.get(outcome_key) or action.outcomes.get('success')
    if not outcome:
        return {
            "success": False,
            "message": get_game_message('action_no_outcomes', locale)
        }

    # Process outcome
    items_found = []
    items_dropped = []
    damage_taken = outcome.damage_taken

    # Calculate current capacity and fetch derived stats
    from api.services.helpers import calculate_player_capacity
    from api.services.stats import calculate_derived_stats
    from api.items import items_manager as ITEMS_MANAGER
    inventory = await db.get_inventory(player_id)
    current_weight, max_weight, current_volume, max_volume = await calculate_player_capacity(inventory, ITEMS_MANAGER)
    stats = await calculate_derived_stats(player_id)
    loot_quality = stats.get('loot_quality', 1.0)

    # Add items to inventory (or drop if over capacity)
    for item_id, quantity in outcome.items_reward.items():
        item = items_manager.get_item(item_id)
        if not item:
            continue

        item_name = get_locale_string(item.name, locale)
        emoji = item.emoji if hasattr(item, 'emoji') else ''

        # Items with durability are unique: each copy is created individually
        has_durability = hasattr(item, 'durability') and item.durability is not None
        if has_durability:
            for _ in range(quantity):
                fits = (current_weight + item.weight <= max_weight and
                        current_volume + item.volume <= max_volume)
                if fits:
                    # Add to inventory with durability properties
                    await db.add_item_to_inventory(
                        player_id,
                        item_id,
                        quantity=1,
                        durability=item.durability,
                        max_durability=item.durability,
                        tier=getattr(item, 'tier', None)
                    )
                    items_found.append(f"{emoji} {item_name}")
                    current_weight += item.weight
                    current_volume += item.volume
                else:
                    # Create unique_item and drop to ground,
                    # saving the base stats (numbers coerced to int) as unique_stats
                    base_stats = {k: int(v) if isinstance(v, (int, float)) else v for k, v in item.stats.items()} if item.stats else {}
                    unique_item_id = await db.create_unique_item(
                        item_id=item_id,
                        durability=item.durability,
                        max_durability=item.durability,
                        tier=getattr(item, 'tier', None),
                        unique_stats=base_stats
                    )
                    await db.drop_item_to_world(item_id, 1, player['location_id'], unique_item_id=unique_item_id)
                    items_dropped.append(f"{emoji} {item_name}")
        else:
            # Stackable items - loot quality above 1.0 gives resources and
            # consumables a (loot_quality - 1.0) chance of one extra unit
            if getattr(item, 'category', item.type) in ['resource', 'consumable'] and loot_quality > 1.0:
                bonus_chance = loot_quality - 1.0
                if random.random() < bonus_chance:
                    quantity += 1

            item_weight = item.weight * quantity
            item_volume = item.volume * quantity
            if (current_weight + item_weight <= max_weight and
                    current_volume + item_volume <= max_volume):
                # Add to inventory
                await db.add_item_to_inventory(player_id, item_id, quantity)
                items_found.append(f"{emoji} {item_name} x{quantity}")
                current_weight += item_weight
                current_volume += item_volume
            else:
                # Drop the whole stack to the ground
                await db.drop_item_to_world(item_id, quantity, player['location_id'])
                items_dropped.append(f"{emoji} {item_name} x{quantity}")

    # Apply damage; `new_hp` always mirrors what is stored for the player
    new_hp = player['hp']
    if damage_taken > 0:
        new_hp = max(0, player['hp'] - damage_taken)
        await db.update_player_hp(player_id, new_hp)
        # Check if player died
        if new_hp <= 0:
            await db.update_player(player_id, is_dead=True)

    # Set cooldown for this specific action (60 seconds default)
    await db.set_interactable_cooldown(interactable_id, action_id, 60)

    # Build message
    final_message = get_locale_string(outcome.text, locale)
    if items_dropped:
        final_message += f"\n⚠️ {get_game_message('inventory_full', locale)}! {get_game_message('dropped_to_ground', locale)}: {', '.join(items_dropped)}"

    return {
        "success": True,
        "message": final_message,
        "items_found": items_found,
        "items_dropped": items_dropped,
        "damage_taken": damage_taken,
        "stamina_cost": action.stamina_cost,
        "new_stamina": new_stamina,
        "new_hp": new_hp
    }
async def use_item(player_id: int, item_id: str, items_manager, locale: str = 'en') -> Dict[str, Any]:
    """
    Consume one `item_id` from the player's inventory and apply its effects.

    Handled effect keys: 'status_effect' (refused if an effect with the same
    name is already active), 'cures', 'hp_restore' and 'stamina_restore'
    (both scaled by the derived 'item_effectiveness' stat and capped at the
    player's maximum).

    Returns: {success, message, effects} on success, {success: False, message} otherwise.
    """
    player = await db.get_player_by_id(player_id)
    if not player:
        return {"success": False, "message": "Player not found"}

    # Derived stats drive item effectiveness. The redis manager is taken from
    # the websockets module when present; otherwise None is passed through.
    from api.services.stats import calculate_derived_stats
    import api.core.websockets as ws
    redis_mgr = getattr(ws.manager, 'redis_manager', None)
    stats = await calculate_derived_stats(player['id'], redis_mgr)
    item_effectiveness = stats.get('item_effectiveness', 1.0)

    # The player must actually own the item
    inventory = await db.get_inventory(player_id)
    owned_entry = next((entry for entry in inventory if entry['item_id'] == item_id), None)
    if not owned_entry:
        return {"success": False, "message": get_game_message('no_item', locale)}

    # Static item definition
    item = items_manager.get_item(item_id)
    if not item:
        return {"success": False, "message": "Item not found in game data"}
    if not item.consumable:
        return {"success": False, "message": get_game_message('cannot_use', locale)}

    effects = {}
    effects_msg = []

    # 1. Status effect granted by the item (e.g. regeneration from a bandage)
    if 'status_effect' in item.effects:
        status_data = item.effects['status_effect']
        current_effects = await db.get_player_effects(player_id)
        effect_name = status_data['name']

        # Active effects are matched on their stored 'effect_name'
        already_active = any(active['effect_name'] == effect_name for active in current_effects)
        if already_active:
            return {"success": False, "message": get_game_message('effect_already_active', locale)}

        await db.add_effect(
            player_id=player['id'],
            effect_name=status_data['name'],
            effect_icon=status_data.get('icon', ''),
            effect_type=status_data.get('type', 'buff'),
            damage_per_tick=status_data.get('damage_per_tick', 0),
            value=status_data.get('value', 0),
            ticks_remaining=status_data.get('ticks', 3),
            persist_after_combat=True,  # Consumable effects usually persist
            source=f"item:{item.id}"
        )
        effects['status_applied'] = status_data['name']
        effects_msg.append(f"Applied {get_locale_string(status_data['name'], locale) if isinstance(status_data['name'], dict) else status_data['name']}")

    # 2. Status effects removed by the item
    if 'cures' in item.effects:
        cured_list = [
            ailment for ailment in item.effects['cures']
            if await db.remove_effect(player['id'], ailment)
        ]
        if cured_list:
            effects['cured'] = cured_list
            effects_msg.append(f"{get_game_message('cured', locale)}: {', '.join(cured_list)}")

    # 3. Instant healing (legacy), capped at max HP
    if 'hp_restore' in item.effects:
        hp_restore = int(item.effects['hp_restore'] * item_effectiveness)
        hp_before = player['hp']
        hp_after = min(player['max_hp'], hp_before + hp_restore)
        hp_gained = hp_after - hp_before
        if hp_gained > 0:
            await db.update_player_hp(player_id, hp_after)
            effects['hp_restored'] = hp_gained
            effects_msg.append(f"+{hp_gained} HP")

    # Instant stamina, capped at max stamina
    if 'stamina_restore' in item.effects:
        stamina_restore = int(item.effects['stamina_restore'] * item_effectiveness)
        stamina_before = player['stamina']
        stamina_after = min(player['max_stamina'], stamina_before + stamina_restore)
        stamina_gained = stamina_after - stamina_before
        if stamina_gained > 0:
            await db.update_player_stamina(player_id, stamina_after)
            effects['stamina_restored'] = stamina_gained
            effects_msg.append(f"+{stamina_gained} Stamina")

    # Consume the item (remove 1 from inventory)
    await db.remove_item_from_inventory(player_id, item_id, 1)

    # Track statistics, copying over whichever restore amounts were recorded
    stat_updates = {"items_used": 1, "increment": True}
    for stat_key in ('hp_restored', 'stamina_restored'):
        if stat_key in effects:
            stat_updates[stat_key] = effects[stat_key]
    await db.update_player_statistics(player_id, **stat_updates)

    # Build message
    msg = f"{get_game_message('item_used', locale, name=get_locale_string(item.name, locale))}"
    if effects_msg:
        msg += f" ({', '.join(effects_msg)})"

    return {"success": True, "message": msg, "effects": effects}
async def pickup_item(player_id: int, item_id: int, location_id: str, quantity: Optional[int] = None, items_manager=None, locale: str = 'en') -> Dict[str, Any]:
    """
    Pick up an item from the ground.

    item_id is the dropped_item id, not the item_id field.
    location_id: the location the player is picking up from; a dropped item
        recorded at a different location is reported as not found.
    quantity: how many to pick up (None = all; values >= the available
        amount take everything; values < 1 are rejected)
    items_manager: ItemsManager instance to get item definitions

    The pickup is all-or-nothing: if the requested amount would exceed the
    player's weight or volume capacity nothing is picked up.

    Returns: {success, message}
    """
    # Get the dropped item by its ID
    dropped_item = await db.get_dropped_item(item_id)
    if not dropped_item:
        return {"success": False, "message": get_game_message('item_not_found_ground', locale)}

    # Only allow picking up items that lie where the player stands.
    # NOTE(review): assumes dropped-item rows expose their location under
    # 'location_id'; when the key is absent the check is skipped — confirm
    # against db.get_dropped_item.
    dropped_at = dropped_item.get('location_id')
    if dropped_at is not None and location_id is not None and str(dropped_at) != str(location_id):
        return {"success": False, "message": get_game_message('item_not_found_ground', locale)}

    # Get item definition
    item_def = items_manager.get_item(dropped_item['item_id']) if items_manager else None
    if not item_def:
        return {"success": False, "message": "Item data not found"}

    # Determine how many to pick up
    available_qty = dropped_item['quantity']
    if quantity is None or quantity >= available_qty:
        pickup_qty = available_qty
    elif quantity < 1:
        return {"success": False, "message": get_game_message('invalid_quantity', locale)}
    else:
        pickup_qty = quantity

    # Get player and calculate capacity
    from api.services.helpers import calculate_player_capacity
    player = await db.get_player_by_id(player_id)
    if not player:
        # Same guard as the other entry points in this module
        return {"success": False, "message": "Player not found"}
    inventory = await db.get_inventory(player_id)

    # Calculate current weight and volume (including equipped bag capacity)
    current_weight, max_weight, current_volume, max_volume = await calculate_player_capacity(inventory, items_manager)

    # Calculate weight and volume for items to pick up
    item_weight = item_def.weight * pickup_qty
    item_volume = item_def.volume * pickup_qty

    # Check limits
    if current_weight + item_weight > max_weight:
        return {
            "success": False,
            "message": get_game_message('item_too_heavy', locale, emoji=item_def.emoji, name=get_locale_string(item_def.name, locale), qty=pickup_qty, weight=item_weight, current=current_weight, max=max_weight)
        }
    if current_volume + item_volume > max_volume:
        return {
            "success": False,
            "message": get_game_message('item_too_large', locale, emoji=item_def.emoji, name=get_locale_string(item_def.name, locale), qty=pickup_qty, volume=item_volume, current=current_volume, max=max_volume)
        }

    # Items fit - update dropped item quantity or remove it
    if pickup_qty >= available_qty:
        await db.remove_dropped_item(item_id)
    else:
        await db.update_dropped_item_quantity(item_id, available_qty - pickup_qty)

    # Add to inventory (pass unique_item_id if it's a unique item)
    await db.add_item_to_inventory(
        player_id,
        dropped_item['item_id'],
        pickup_qty,
        unique_item_id=dropped_item.get('unique_item_id')
    )

    return {
        "success": True,
        "message": f"{get_game_message('picked_up', locale)} {item_def.emoji} {get_locale_string(item_def.name, locale)} x{pickup_qty}"
    }
async def check_and_apply_level_up(player_id: int) -> Dict[str, Any]:
    """
    Check if player has enough XP to level up and apply it.

    Each level costs `level * 100` XP; the cost is subtracted from the
    player's XP as the level is gained, so several levels can be gained in
    one call. Every level gained grants one unspent point.

    After a level-up the cached derived stats are invalidated on a
    best-effort basis: a failure there is logged and does not undo or fail
    the level-up.

    Returns: {leveled_up: bool, new_level: int, levels_gained: int}
    (new_level is 1 when the player does not exist).
    """
    player = await db.get_player_by_id(player_id)
    if not player:
        return {"leveled_up": False, "new_level": 1, "levels_gained": 0}

    current_level = player['level']
    current_xp = player['xp']
    levels_gained = 0

    # Check for level ups (can level up multiple times if enough XP)
    while current_xp >= (current_level * 100):
        current_xp -= (current_level * 100)
        current_level += 1
        levels_gained += 1

    if levels_gained == 0:
        return {"leveled_up": False, "new_level": current_level, "levels_gained": 0}

    # Update player with new level, remaining XP, and unspent points
    new_unspent_points = player['unspent_points'] + levels_gained
    await db.update_player(
        player_id,
        level=current_level,
        xp=current_xp,
        unspent_points=new_unspent_points
    )

    # Invalidate cached derived stats (level affects max_hp, max_stamina, attack_power, etc.)
    import logging
    from api.services.stats import invalidate_stats_cache
    try:
        from api.core.websockets import manager as ws_manager
        await invalidate_stats_cache(player_id, getattr(ws_manager, 'redis_manager', None))
    except Exception:
        # Best-effort: the level-up is already persisted, so record the
        # failure instead of hiding it or failing the request.
        logging.getLogger(__name__).exception(
            "Failed to invalidate cached stats for player %s after level-up", player_id
        )

    return {
        "leveled_up": True,
        "new_level": current_level,
        "levels_gained": levels_gained
    }
# ============================================================================
# STATUS EFFECTS UTILITIES
# ============================================================================
def calculate_status_impact(effects: list) -> int:
    """
    Add up the per-tick impact of every status effect.

    A positive total means net damage, a negative total means net healing.

    Args:
        effects: List of status effect dicts; an entry without a
            'damage_per_tick' key contributes 0.

    Returns:
        Total impact per tick
    """
    total_impact = 0
    for status in effects:
        total_impact += status.get('damage_per_tick', 0)
    return total_impact
# ============================================================================
# COMBAT UTILITIES
# ============================================================================
def generate_npc_intent(npc_def, combat_state: dict) -> dict:
    """
    Generate the NEXT intent for an NPC.

    Reads 'npc_hp', 'npc_max_hp' and the '|'-separated 'npc_status_effects'
    string from `combat_state`, and the optional `skills` list from `npc_def`.
    Skills that are on cooldown are not considered.

    Cooldown entries in the status string have the form
    `cd_<skill_id>:<unused>:<ticks_remaining>`; the remaining ticks are the
    LAST field. A two-field `cd_<skill_id>:<ticks>` entry is also accepted.

    Priority order (each step is probabilistic):
      1. heal skill when HP is below 30% (80%)
      2. buff skill when that buff is not already active (60%)
      3. telegraphed charge when HP is above 30% (15%)
      4. a random damage skill (40%)
      5. defend when HP is below 50% (10%), otherwise a basic attack

    Returns a dict {"type": ..., "value": ...} where type is one of
    'skill', 'charge', 'defend' or 'attack'.
    """
    import random
    from api.services.skills import skills_manager

    npc_hp_pct = combat_state['npc_hp'] / combat_state['npc_max_hp'] if combat_state['npc_max_hp'] > 0 else 0
    skills = getattr(npc_def, 'skills', [])
    active_effects = combat_state.get('npc_status_effects', '')
    status_entries = active_effects.split('|') if active_effects else []

    # Collect remaining cooldown ticks per skill id
    cooldowns = {}
    for eff in status_entries:
        if not eff.startswith('cd_'):
            continue
        parts = eff.split(':')
        if len(parts) < 2:
            continue
        # The tick counter is the last field; the middle field of the
        # three-field form is a constant placeholder, not the cooldown.
        raw_ticks = parts[2] if len(parts) >= 3 else parts[1]
        try:
            cooldowns[parts[0][3:]] = int(raw_ticks)
        except ValueError:
            # Malformed entry: treat the skill as not on cooldown
            continue

    # Sort the usable skills into heal / buff / damage
    has_heal = None
    has_buff = None
    damage_skills = []
    for skill_id in skills:
        if cooldowns.get(skill_id, 0) > 0:
            continue
        skill = skills_manager.get_skill(skill_id)
        if not skill:
            continue
        if 'heal_percent' in skill.effects:
            has_heal = skill
        elif 'buff' in skill.effects:
            has_buff = skill
        else:
            damage_skills.append(skill)

    # 1. Survival First
    if has_heal and npc_hp_pct < 0.3:
        if random.random() < 0.8:
            return {"type": "skill", "value": has_heal.id}

    # 2. Buffs (skipped while the same buff is still running)
    if has_buff:
        buff_prefix = has_buff.effects['buff'] + ':'
        is_buff_active = any(eff.startswith(buff_prefix) for eff in status_entries)
        if not is_buff_active and random.random() < 0.6:
            return {"type": "skill", "value": has_buff.id}

    # 3. Telegraphed Attack Check (15% chance if health > 30%)
    if npc_hp_pct > 0.3 and random.random() < 0.15:
        return {"type": "charge", "value": "charging_attack"}

    # 4. Damage Skills
    if damage_skills and random.random() < 0.4:
        chosen = random.choice(damage_skills)
        return {"type": "skill", "value": chosen.id}

    # Default to attack or defend (legacy logic)
    roll = random.random()
    if npc_hp_pct < 0.5 and roll < 0.1:
        return {"type": "defend", "value": 0}
    return {"type": "attack", "value": 0}
async def npc_attack(player_id: int, combat: dict, npc_def, reduce_armor_func, player_stats: dict = None, locale: str = 'en') -> Tuple[List[dict], bool]:
    """
    Execute NPC turn based on PREVIOUS intent, then generate NEXT intent.

    Steps:
      1. Tick the NPC's status effects stored in combat['npc_status_effects']
         ('|'-separated `name:value:ticks` entries, `stun:ticks`, and
         `cd_<skill>:<unused>:ticks` cooldowns), applying damage/healing over
         time. If that kills the NPC a "victory" message is appended and the
         function returns (messages, False) immediately.
      2. Unless stunned, act on combat['npc_intent'] (`type` or `type:value`):
         'defend' heals 5% of max HP, 'charge' only announces the charge,
         'charging_attack' / 'special' / 'attack' / 'skill' deal damage
         and/or apply the skill's effects.
      3. Handle player defeat (marks the player dead, records statistics and
         ends the combat) or record the damage taken.
      4. Store the next intent and hand the turn back to the player. A
         'charge' is always followed by 'charging_attack'.

    The NPC HP adjusted in step 1 is the value used by every later step
    (defend heal, enrage check, next-intent generation), so damage over time
    is never overwritten by a later write based on the HP the turn began with.

    Args:
        player_id: Id of the player in this combat.
        combat: Combat row; 'npc_hp', 'npc_max_hp', 'npc_status_effects' and
            'npc_intent' are read. It is not mutated.
        npc_def: NPC definition; `name`, `damage_min`, `damage_max` are read
            here (and `skills` by generate_npc_intent).
        reduce_armor_func: Awaitable called as
            (player_id, npc_damage, is_defending), returning
            (armor_absorbed, broken_armor).
        player_stats: Optional derived stats; the keys read are
            'buff_damage_reduction', 'buff_damage_taken_increase',
            'buff_guaranteed_dodge', 'buff_enemy_miss', 'dodge_chance',
            'has_shield', 'block_chance' and 'armor_reduction'.
        locale: Locale used for localized message text.

    Returns: (messages_list, player_defeated)
    """
    import random
    import time
    from api import database as db
    from api.services.helpers import create_combat_message, get_game_message, get_locale_string
    from api.services.skills import skills_manager

    player = await db.get_player_by_id(player_id)
    if not player:
        return [], True

    async def _stored_npc_status() -> str:
        # Current status string as persisted ('' when missing or NULL)
        row = await db.get_active_combat(player_id)
        return (row.get('npc_status_effects') or '') if row else ''

    messages = []

    # 1. PROCESS NPC STATUS EFFECTS
    npc_hp = combat['npc_hp']
    npc_max_hp = combat['npc_max_hp']
    npc_status_str = combat.get('npc_status_effects', '')
    # Freshest known status string; handed to intent generation at the end
    latest_status = npc_status_str or ''
    is_stunned = False

    if npc_status_str:
        effects_list = npc_status_str.split('|')
        active_effects = []
        npc_damage_taken = 0
        npc_healing_received = 0

        for effect_str in effects_list:
            if not effect_str:
                continue
            try:
                parts = effect_str.split(':')
                name = parts[0]

                # Stun: skip this turn and count the stun down
                if name == 'stun' and len(parts) >= 2:
                    ticks = int(parts[1])
                    if ticks > 0:
                        is_stunned = True
                        messages.append(create_combat_message(
                            "skill_effect",
                            origin="enemy",
                            message=get_game_message('npc_stunned_cannot_act', locale, npc_name=get_locale_string(npc_def.name, locale))
                        ))
                        ticks -= 1
                        if ticks > 0:
                            active_effects.append(f"stun:{ticks}")
                    continue

                # Skill cooldown: only counts down
                if name.startswith('cd_') and len(parts) >= 3:
                    ticks = int(parts[2]) - 1
                    if ticks > 0:
                        active_effects.append(f"{name}:{parts[1]}:{ticks}")
                    continue

                # Generic `name:value:ticks` entry: positive value is damage
                # per tick, negative is healing, zero (buffs) only counts down
                if len(parts) >= 3:
                    dmg = int(parts[1])
                    ticks = int(parts[2])
                    if ticks > 0:
                        if dmg > 0:
                            npc_damage_taken += dmg
                            messages.append(create_combat_message(
                                "effect_damage",
                                origin="enemy",
                                damage=dmg,
                                effect_name=name,
                                npc_name=npc_def.name
                            ))
                        elif dmg < 0:
                            heal = abs(dmg)
                            npc_healing_received += heal
                            messages.append(create_combat_message(
                                "effect_heal",
                                origin="enemy",
                                heal=heal,
                                effect_name=name,
                                npc_name=npc_def.name
                            ))
                        ticks -= 1
                        if ticks > 0:
                            active_effects.append(f"{name}:{dmg}:{ticks}")
            except Exception as e:
                # A malformed entry is dropped rather than aborting the turn
                print(f"Error parsing NPC status: {e}")

        new_status_str = "|".join(active_effects)
        if new_status_str != npc_status_str:
            await db.update_combat(player_id, {'npc_status_effects': new_status_str})
        latest_status = new_status_str

        if npc_damage_taken > 0:
            npc_hp = max(0, npc_hp - npc_damage_taken)
        if npc_healing_received > 0:
            npc_hp = min(npc_max_hp, npc_hp + npc_healing_received)
        await db.update_combat(player_id, {'npc_hp': npc_hp})

        if npc_hp <= 0:
            messages.append(create_combat_message("victory", origin="neutral", npc_name=npc_def.name))
            return messages, False

    # 2. ACT ON THE STORED INTENT
    current_intent_str = combat.get('npc_intent', 'attack')
    if not current_intent_str:
        current_intent_str = 'attack'
    intent_parts = current_intent_str.split(':')
    intent_type = intent_parts[0]
    intent_value = intent_parts[1] if len(intent_parts) > 1 else None

    actual_damage = 0
    new_player_hp = player['hp']

    if npc_hp > 0 and not is_stunned:
        if intent_type == 'defend':
            # Heal on top of the status-adjusted HP, not the HP the turn began with
            heal_amount = int(npc_max_hp * 0.05)
            npc_hp = min(npc_max_hp, npc_hp + heal_amount)
            await db.update_combat(player_id, {'npc_hp': npc_hp})
            messages.append(create_combat_message("enemy_defend", origin="enemy", npc_name=npc_def.name, heal=heal_amount))

        elif intent_type == 'charge':
            messages.append(create_combat_message(
                "skill_effect", origin="enemy", message=get_game_message('enemy_charging', locale, enemy=get_locale_string(npc_def.name, locale))
            ))

        elif intent_type in ('charging_attack', 'special', 'attack', 'skill'):
            npc_damage = random.randint(npc_def.damage_min, npc_def.damage_max)
            skill = None
            is_charging = intent_type == 'charging_attack'

            if intent_type == 'charging_attack':
                npc_damage = int(npc_damage * 2.5)
            elif intent_type == 'special':
                npc_damage = int(npc_damage * 1.5)
            elif intent_type == 'skill' and intent_value:
                skill = skills_manager.get_skill(intent_value)
                if skill:
                    # Put the skill on cooldown
                    if skill.cooldown > 0:
                        cd_str = f"cd_{skill.id}:0:{skill.cooldown}"
                        curr_status = await _stored_npc_status()
                        latest_status = f"{curr_status}|{cd_str}" if curr_status else cd_str
                        await db.update_combat(player_id, {'npc_status_effects': latest_status})

                    effects = skill.effects

                    if 'heal_percent' in effects:
                        heal_amount = int(npc_max_hp * effects['heal_percent'])
                        npc_hp = min(npc_max_hp, npc_hp + heal_amount)
                        await db.update_combat(player_id, {'npc_hp': npc_hp})
                        messages.append(create_combat_message("skill_heal", origin="enemy", heal=heal_amount, skill_icon=skill.icon, skill_name=get_locale_string(skill.name, locale), npc_name=npc_def.name))
                        npc_damage = 0

                    if 'buff' in effects:
                        buff_str = f"{effects['buff']}:0:{effects['buff_duration']}"
                        curr_status = await _stored_npc_status()
                        latest_status = f"{curr_status}|{buff_str}" if curr_status else buff_str
                        await db.update_combat(player_id, {'npc_status_effects': latest_status})
                        messages.append(create_combat_message("skill_buff", origin="enemy", skill_name=get_locale_string(skill.name, locale), skill_icon=skill.icon, duration=effects['buff_duration'], npc_name=npc_def.name))
                        # A pure buff deals no direct damage
                        if 'damage_multiplier' not in effects and 'poison_damage' not in effects:
                            npc_damage = 0

                    if 'damage_multiplier' in effects:
                        npc_damage = max(1, int(npc_damage * effects['damage_multiplier']))

                    # Damage-over-time effects the skill applies to the player
                    from api.services.helpers import calculate_dynamic_status_damage
                    poison_dmg = calculate_dynamic_status_damage(effects, 'poison', player)
                    if poison_dmg is not None:
                        await db.add_effect(player_id=player_id, effect_name="Poison", effect_icon="🧪", effect_type="damage", damage_per_tick=poison_dmg, ticks_remaining=effects.get('poison_duration', 3), persist_after_combat=True, source=f"enemy_skill:{skill.id}")
                    burn_dmg = calculate_dynamic_status_damage(effects, 'burn', player)
                    if burn_dmg is not None:
                        await db.add_effect(player_id=player_id, effect_name="Burning", effect_icon="🔥", effect_type="damage", damage_per_tick=burn_dmg, ticks_remaining=effects.get('burn_duration', 3), persist_after_combat=True, source=f"enemy_skill:{skill.id}")

            # Enrage below 30% of max HP (guarded against a zero max HP)
            is_enraged = npc_max_hp > 0 and npc_hp / npc_max_hp < 0.3
            if is_enraged and npc_damage > 0:
                npc_damage = int(npc_damage * 1.5)
                messages.append(create_combat_message("enemy_enraged", origin="enemy", npc_name=npc_def.name))

            # Berserker rage is looked up in the persisted status string
            latest_status = await _stored_npc_status()
            if 'berserker_rage' in latest_status and npc_damage > 0:
                npc_damage = int(npc_damage * 1.5)

            if npc_damage > 0:
                dodged = False
                is_defending = False

                # A 'defending' effect reduces this hit and is then consumed
                player_effects = await db.get_player_effects(player_id)
                defending_effect = next((e for e in player_effects if e['effect_name'] == 'defending'), None)
                if defending_effect:
                    is_defending = True
                    reduction = defending_effect.get('value', 50) / 100
                    npc_damage = max(1, int(npc_damage * (1 - reduction)))
                    messages.append(create_combat_message("damage_reduced", origin="player", reduction=int(reduction * 100)))
                    await db.remove_effect(player_id, 'defending')

                buff_dmg_reduction = player_stats.get('buff_damage_reduction', 0.0) if player_stats else 0.0
                if buff_dmg_reduction > 0:
                    npc_damage = max(1, int(npc_damage * (1 - buff_dmg_reduction)))
                    messages.append(create_combat_message("damage_reduced", origin="player", reduction=int(buff_dmg_reduction * 100)))

                buff_dmg_taken_increase = player_stats.get('buff_damage_taken_increase', 0.0) if player_stats else 0.0
                if buff_dmg_taken_increase > 0:
                    npc_damage = int(npc_damage * (1 + buff_dmg_taken_increase))

                # Dodge: guaranteed (consumes 'evade'), forced miss, or a dodge roll
                if player_stats and player_stats.get('buff_guaranteed_dodge', False):
                    dodged = True
                    messages.append(create_combat_message("combat_dodge", origin="player"))
                    await db.remove_effect(player_id, 'evade')
                elif player_stats and player_stats.get('buff_enemy_miss', False):
                    dodged = True
                    messages.append(create_combat_message("combat_dodge", origin="player"))
                elif player_stats and 'dodge_chance' in player_stats and random.random() < player_stats['dodge_chance']:
                    dodged = True
                    messages.append(create_combat_message("combat_dodge", origin="player"))

                # Shield block leaves 20% of the damage
                if not dodged and player_stats and player_stats.get('has_shield', False) and random.random() < player_stats.get('block_chance', 0):
                    messages.append(create_combat_message("combat_block", origin="player"))
                    npc_damage = max(1, int(npc_damage * 0.2))

                if not dodged:
                    armor_absorbed, broken_armor = await reduce_armor_func(player_id, npc_damage, is_defending)

                    # Percentage armor reduction takes precedence over the flat absorbed amount
                    if player_stats and player_stats.get('armor_reduction', 0) > 0:
                        pct_reduction = player_stats['armor_reduction']
                        actual_damage = max(1, int(npc_damage * (1 - pct_reduction)))
                        armor_absorbed_visual = npc_damage - actual_damage
                    else:
                        actual_damage = max(1, npc_damage - armor_absorbed)
                        armor_absorbed_visual = armor_absorbed

                    new_player_hp = max(0, player['hp'] - actual_damage)

                    if skill and 'damage_multiplier' in skill.effects:
                        messages.append(create_combat_message("skill_attack", origin="enemy", damage=actual_damage, skill_name=get_locale_string(skill.name, locale), skill_icon=skill.icon, hits=1))
                    elif is_charging:
                        messages.append(create_combat_message("enemy_special", origin="enemy", npc_name=npc_def.name, damage=actual_damage, armor_absorbed=armor_absorbed_visual))
                    else:
                        messages.append(create_combat_message("enemy_attack", origin="enemy", npc_name=npc_def.name, damage=actual_damage, armor_absorbed=armor_absorbed_visual))

                    if broken_armor:
                        for armor in broken_armor:
                            messages.append(create_combat_message("item_broken", origin="player", item_name=armor['name'], emoji=armor['emoji']))

                    await db.update_player(player_id, hp=new_player_hp)

    # 3. PLAYER DEFEAT / STATISTICS
    player_defeated = False
    if new_player_hp <= 0 and intent_type != 'defend' and intent_type != 'charge':
        messages.append(create_combat_message("player_defeated", origin="neutral", npc_name=npc_def.name))
        player_defeated = True
        await db.update_player(player_id, hp=0, is_dead=True)
        await db.update_player_statistics(player_id, deaths=1, damage_taken=actual_damage, increment=True)
        await db.end_combat(player_id)
        return messages, player_defeated

    if actual_damage > 0:
        await db.update_player_statistics(player_id, damage_taken=actual_damage, increment=True)

    # 4. NEXT INTENT
    if intent_type == 'charge':
        next_intent_str = 'charging_attack'
    else:
        # Decide on a copy that reflects the HP and status effects as they
        # stand after this turn
        temp_combat_state = combat.copy()
        temp_combat_state['npc_hp'] = npc_hp
        temp_combat_state['npc_status_effects'] = latest_status
        next_intent = generate_npc_intent(npc_def, temp_combat_state)
        next_intent_str = f"{next_intent['type']}:{next_intent['value']}" if next_intent['type'] == 'skill' else next_intent['type']

    await db.update_combat(player_id, {
        'turn': 'player',
        'turn_started_at': time.time(),
        'npc_intent': next_intent_str
    })

    return messages, player_defeated