feat(backend): Integrate Derived Stats into combat, loot, and crafting mechanics

This commit is contained in:
Joan
2026-02-25 10:05:14 +01:00
parent 185781d168
commit fd94387d54
10 changed files with 727 additions and 101 deletions

View File

@@ -308,6 +308,16 @@ player_statistics = Table(
)
character_perks = Table(
"character_perks",
metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("character_id", Integer, ForeignKey("characters.id", ondelete="CASCADE"), nullable=False),
Column("perk_id", String(50), nullable=False),
Column("acquired_at", Float, nullable=False),
UniqueConstraint("character_id", "perk_id", name="uix_character_perk")
)
# ========================================================================
# QUESTS AND TRADE TABLES
# ========================================================================
@@ -2937,3 +2947,47 @@ async def acknowledge_pvp_combat(combat_id: int, player_id: int) -> bool:
await session.commit()
return True
# ========================================================================
# CHARACTER PERKS
# ========================================================================
async def get_character_perks(character_id: int) -> list:
"""Get all perks owned by a character."""
async with DatabaseSession() as session:
stmt = select(character_perks).where(
character_perks.c.character_id == character_id
)
result = await session.execute(stmt)
return [dict(row._mapping) for row in result.fetchall()]
async def add_character_perk(character_id: int, perk_id: str) -> bool:
"""Add a perk to a character. Returns False if already owned."""
import time
async with DatabaseSession() as session:
try:
stmt = insert(character_perks).values(
character_id=character_id,
perk_id=perk_id,
acquired_at=time.time()
)
await session.execute(stmt)
await session.commit()
return True
except Exception:
return False
async def remove_character_perk(character_id: int, perk_id: str) -> bool:
"""Remove a perk from a character."""
async with DatabaseSession() as session:
stmt = delete(character_perks).where(
character_perks.c.character_id == character_id,
character_perks.c.perk_id == perk_id
)
result = await session.execute(stmt)
await session.commit()
return result.rowcount > 0

View File

@@ -208,12 +208,17 @@ async def interact_with_object(
items_dropped = []
damage_taken = outcome.damage_taken
# Calculate current capacity
# Calculate current capacity and fetch derived stats
from api.services.helpers import calculate_player_capacity
from api.services.stats import calculate_derived_stats
from api.items import items_manager as ITEMS_MANAGER
inventory = await db.get_inventory(player_id)
current_weight, max_weight, current_volume, max_volume = await calculate_player_capacity(inventory, ITEMS_MANAGER)
stats = await calculate_derived_stats(player_id)
loot_quality = stats.get('loot_quality', 1.0)
# Add items to inventory (or drop if over capacity)
for item_id, quantity in outcome.items_reward.items():
item = items_manager.get_item(item_id)
@@ -258,7 +263,12 @@ async def interact_with_object(
await db.drop_item_to_world(item_id, 1, player['location_id'], unique_item_id=unique_item_id)
items_dropped.append(f"{emoji} {item_name}")
else:
# Stackable items - process as before
# Stackable items - apply loot quality bonus for resources and consumables
if getattr(item, 'category', item.type) in ['resource', 'consumable'] and loot_quality > 1.0:
bonus_chance = loot_quality - 1.0
if random.random() < bonus_chance:
quantity += 1
item_weight = item.weight * quantity
item_volume = item.volume * quantity
@@ -312,6 +322,15 @@ async def use_item(player_id: int, item_id: str, items_manager, locale: str = 'e
if not player:
return {"success": False, "message": "Player not found"}
# Get derived stats for item effectiveness
# In some paths redis_manager might not be injected, so we attempt to fetch it from websockets module if needed,
# or let stats service fetch without cache
from api.services.stats import calculate_derived_stats
import api.core.websockets as ws
redis_mgr = getattr(ws.manager, 'redis_manager', None)
stats = await calculate_derived_stats(player['id'], redis_mgr)
item_effectiveness = stats.get('item_effectiveness', 1.0)
# Check if player has the item
inventory = await db.get_inventory(player_id)
item_entry = None
@@ -385,7 +404,8 @@ async def use_item(player_id: int, item_id: str, items_manager, locale: str = 'e
# 3. Direct Healing (Legacy/Instant)
if 'hp_restore' in item.effects:
hp_restore = item.effects['hp_restore']
base_hp_restore = item.effects['hp_restore']
hp_restore = int(base_hp_restore * item_effectiveness)
old_hp = player['hp']
new_hp = min(player['max_hp'], old_hp + hp_restore)
actual_restored = new_hp - old_hp
@@ -395,7 +415,8 @@ async def use_item(player_id: int, item_id: str, items_manager, locale: str = 'e
effects_msg.append(f"+{actual_restored} HP")
if 'stamina_restore' in item.effects:
stamina_restore = item.effects['stamina_restore']
base_stamina_restore = item.effects['stamina_restore']
stamina_restore = int(base_stamina_restore * item_effectiveness)
old_stamina = player['stamina']
new_stamina = min(player['max_stamina'], old_stamina + stamina_restore)
actual_restored = new_stamina - old_stamina
@@ -532,6 +553,14 @@ async def check_and_apply_level_up(player_id: int) -> Dict[str, Any]:
unspent_points=new_unspent_points
)
# Invalidate cached derived stats (level affects max_hp, max_stamina, attack_power, etc.)
from api.services.stats import invalidate_stats_cache
try:
from api.core.websockets import manager as ws_manager
await invalidate_stats_cache(player_id, getattr(ws_manager, 'redis_manager', None))
except Exception:
pass
return {
"leveled_up": True,
"new_level": current_level,
@@ -588,7 +617,7 @@ def generate_npc_intent(npc_def, combat_state: dict) -> dict:
return intent
async def npc_attack(player_id: int, combat: dict, npc_def, reduce_armor_func) -> Tuple[List[dict], bool]:
async def npc_attack(player_id: int, combat: dict, npc_def, reduce_armor_func, player_stats: dict = None) -> Tuple[List[dict], bool]:
"""
Execute NPC turn based on PREVIOUS intent, then generate NEXT intent.
Returns: (messages_list, player_defeated)
@@ -603,21 +632,38 @@ async def npc_attack(player_id: int, combat: dict, npc_def, reduce_armor_func) -
npc_hp = combat['npc_hp']
npc_max_hp = combat['npc_max_hp']
npc_status_str = combat.get('npc_status_effects', '')
is_stunned = False
if npc_status_str:
# Parse status: "bleeding:5:3" (name:dmg:ticks) or multiple "bleeding:5:3|burning:2:2"
# Parse status: "bleeding:5:3" (name:dmg:ticks) or "stun:1"
# Handling multiple effects separated by |
effects_list = npc_status_str.split('|')
active_effects = []
npc_damage_taken = 0
npc_healing_received = 0
is_stunned = False
for effect_str in effects_list:
if not effect_str: continue
try:
parts = effect_str.split(':')
name = parts[0]
if name == 'stun' and len(parts) >= 2:
ticks = int(parts[1])
if ticks > 0:
is_stunned = True
messages.append(create_combat_message(
"skill_effect",
origin="enemy",
message=f"💫 {npc_def.name} is stunned and cannot act!"
))
ticks -= 1
if ticks > 0:
active_effects.append(f"stun:{ticks}")
continue
if len(parts) >= 3:
name = parts[0]
dmg = int(parts[1])
ticks = int(parts[2])
@@ -698,7 +744,7 @@ async def npc_attack(player_id: int, combat: dict, npc_def, reduce_armor_func) -
actual_damage = 0
# EXECUTE INTENT
if npc_hp > 0: # Only attack if alive
if npc_hp > 0 and not is_stunned: # Only attack if alive and not stunned
if intent_type == 'defend':
# NPC defends - heals 5% HP
heal_amount = int(combat['npc_max_hp'] * 0.05)
@@ -765,28 +811,65 @@ async def npc_attack(player_id: int, combat: dict, npc_def, reduce_armor_func) -
# Remove defending effect after use
await db.remove_effect(player_id, 'defending')
armor_absorbed, broken_armor = await reduce_armor_func(player_id, npc_damage)
actual_damage = max(1, npc_damage - armor_absorbed)
new_player_hp = max(0, player['hp'] - actual_damage)
messages.append(create_combat_message(
"enemy_attack",
origin="enemy",
npc_name=npc_def.name,
damage=npc_damage,
armor_absorbed=armor_absorbed
))
if broken_armor:
for armor in broken_armor:
# Check for dodge
dodged = False
if player_stats and 'dodge_chance' in player_stats:
if random.random() < player_stats['dodge_chance']:
dodged = True
messages.append(create_combat_message(
"item_broken",
origin="player",
item_name=armor['name'],
emoji=armor['emoji']
"combat_dodge",
origin="player"
))
# Prevent damage calculation
actual_damage = 0
new_player_hp = player['hp']
# Check for block (if shield is equipped)
blocked = False
if not dodged and player_stats and player_stats.get('has_shield', False):
if random.random() < player_stats.get('block_chance', 0):
blocked = True
messages.append(create_combat_message(
"combat_block",
origin="player"
))
# Apply blocked effect (damage reduced significantly or nullified)
npc_damage = max(1, int(npc_damage * 0.2)) # Block mitigates 80% damage
await db.update_player(player_id, hp=new_player_hp)
if not dodged:
# Calculate armor durability loss based on PRE-reduction damage
armor_absorbed, broken_armor = await reduce_armor_func(player_id, npc_damage)
# If player_stats provides a percentage reduction, apply it instead of raw absorption
if player_stats and player_stats.get('armor_reduction', 0) > 0:
pct_reduction = player_stats['armor_reduction']
actual_damage = max(1, int(npc_damage * (1 - pct_reduction)))
# Still show "armor_absorbed" conceptually for UI logs, though it's % based now
armor_absorbed_visual = npc_damage - actual_damage
else:
actual_damage = max(1, npc_damage - armor_absorbed)
armor_absorbed_visual = armor_absorbed
new_player_hp = max(0, player['hp'] - actual_damage)
messages.append(create_combat_message(
"enemy_attack",
origin="enemy",
npc_name=npc_def.name,
damage=actual_damage,
armor_absorbed=armor_absorbed_visual
))
if broken_armor and not dodged:
for armor in broken_armor:
messages.append(create_combat_message(
"item_broken",
origin="player",
item_name=armor['name'],
emoji=armor['emoji']
))
await db.update_player(player_id, hp=new_player_hp)
# GENERATE NEXT INTENT

View File

@@ -31,6 +31,8 @@ class Item:
tier: int = 1 # Item tier (1-5)
encumbrance: int = 0 # Encumbrance penalty when equipped
weapon_effects: Dict[str, Any] = None # Weapon effects: bleeding, stun, etc.
weapon_type: str = None # e.g. 'two_handed', 'one_handed', 'dagger', 'bow'
equip_requirements: Dict[str, int] = None # e.g. {'level': 15, 'strength': 20}
# Repair system
repairable: bool = False # Can this item be repaired?
repair_materials: list = None # Materials needed for repair
@@ -72,6 +74,8 @@ class Item:
self.uncraft_tools = []
if self.combat_effects is None:
self.combat_effects = {}
if self.equip_requirements is None:
self.equip_requirements = {}
class ItemsManager:
@@ -139,7 +143,9 @@ class ItemsManager:
uncraft_tools=item_data.get('uncraft_tools', []),
combat_usable=item_data.get('combat_usable', is_consumable), # Default: consumables are combat usable
combat_only=item_data.get('combat_only', False),
combat_effects=item_data.get('combat_effects', {})
combat_effects=item_data.get('combat_effects', {}),
weapon_type=item_data.get('weapon_type'),
equip_requirements=item_data.get('equip_requirements', {})
)
self.items[item_id] = item

View File

@@ -186,7 +186,7 @@ except Exception as e:
# Initialize routers with game data dependencies
game_routes.init_router_dependencies(LOCATIONS, ITEMS_MANAGER, WORLD, redis_manager)
combat.init_router_dependencies(LOCATIONS, ITEMS_MANAGER, WORLD, redis_manager, QUESTS_DATA)
equipment.init_router_dependencies(LOCATIONS, ITEMS_MANAGER, WORLD)
equipment.init_router_dependencies(LOCATIONS, ITEMS_MANAGER, WORLD, redis_manager)
crafting.init_router_dependencies(LOCATIONS, ITEMS_MANAGER, WORLD)
loot.init_router_dependencies(LOCATIONS, ITEMS_MANAGER, WORLD)
statistics.init_router_dependencies(LOCATIONS, ITEMS_MANAGER, WORLD)

View File

@@ -250,6 +250,10 @@ async def combat_action(
player = current_user # current_user is already the character dict
npc_def = NPCS.get(combat['npc_id'])
# Get player derived stats
from ..services.stats import calculate_derived_stats
stats = await calculate_derived_stats(player['id'], redis_manager)
messages = []
combat_over = False
@@ -290,7 +294,7 @@ async def combat_action(
elif total_impact < 0:
# HEALING
heal = abs(total_impact)
new_hp = min(player['max_hp'], player['hp'] + heal)
new_hp = min(stats['max_hp'], player['hp'] + heal)
actual_heal = new_hp - player['hp']
if actual_heal > 0:
@@ -307,26 +311,20 @@ async def combat_action(
if req.action == 'attack':
# Calculate player damage
base_damage = 5
strength_bonus = player['strength'] // 2
level_bonus = player['level']
weapon_damage = 0
# Calculate player damage using derived stats
base_damage = stats.get('attack_power', 5)
weapon_effects = {}
weapon_inv_id = None
# Check for equipped weapon
# Check for equipped weapon to apply durability loss and effects
# (Attack power from the weapon is already included in stats['attack_power'])
equipment = await db.get_all_equipment(player['id'])
if equipment.get('weapon') and equipment['weapon']:
weapon_slot = equipment['weapon']
inv_item = await db.get_inventory_item_by_id(weapon_slot['item_id'])
if inv_item:
weapon_def = ITEMS_MANAGER.get_item(inv_item['item_id'])
if weapon_def and weapon_def.stats:
weapon_damage = random.randint(
weapon_def.stats.get('damage_min', 0),
weapon_def.stats.get('damage_max', 0)
)
if weapon_def:
weapon_effects = weapon_def.weapon_effects if hasattr(weapon_def, 'weapon_effects') else {}
weapon_inv_id = weapon_slot['item_id']
@@ -339,7 +337,7 @@ async def combat_action(
attack_failed = True
variance = random.randint(-2, 2)
damage = max(1, base_damage + strength_bonus + level_bonus + weapon_damage + variance)
damage = max(1, base_damage + variance)
if attack_failed:
messages.append(create_combat_message(
@@ -349,12 +347,31 @@ async def combat_action(
))
new_npc_hp = combat['npc_hp']
else:
# Check for critical hit
is_critical = False
crit_chance = stats.get('crit_chance', 0.05)
if random.random() < crit_chance:
is_critical = True
damage = int(damage * stats.get('crit_damage', 1.5))
# Apply NPC defense reduction
npc_defense = getattr(npc_def, 'defense', 0)
actual_damage = max(1, damage - npc_defense)
# Apply damage to NPC
new_npc_hp = max(0, combat['npc_hp'] - damage)
new_npc_hp = max(0, combat['npc_hp'] - actual_damage)
if is_critical:
messages.append(create_combat_message(
"combat_crit",
origin="player"
))
messages.append(create_combat_message(
"player_attack",
origin="player",
damage=damage
damage=actual_damage,
armor_absorbed=npc_defense if npc_defense > 0 else 0
))
# Apply weapon effects
@@ -715,36 +732,278 @@ async def combat_action(
await db.update_player_statistics(player['id'], failed_flees=1, damage_taken=npc_damage, increment=True)
await db.update_combat(player['id'], {'turn': 'player', 'turn_started_at': time.time()})
elif req.action == 'defend':
# Apply "defending" status effect - reduces incoming damage by 50% for 1 turn
await db.add_effect(
player_id=player['id'],
effect_name='defending',
effect_icon='🛡️',
effect_type='buff',
value=50, # 50% damage reduction
ticks_remaining=1,
persist_after_combat=False,
source='action:defend'
)
elif req.action == 'skill':
# ── SKILL ACTION ──
if not req.skill_id:
raise HTTPException(status_code=400, detail="skill_id required for skill action")
messages.append(create_combat_message(
"defend",
origin="player",
message=get_game_message('defend_text', locale, name=player['name'])
))
from ..services.skills import skills_manager
skill = skills_manager.get_skill(req.skill_id)
if not skill:
raise HTTPException(status_code=404, detail="Skill not found")
# NPC's turn after defend
npc_attack_messages, player_defeated = await game_logic.npc_attack(
player['id'],
{'npc_hp': combat['npc_hp'], 'npc_max_hp': combat['npc_max_hp']},
npc_def,
reduce_armor_durability
)
messages.extend(npc_attack_messages)
if player_defeated:
await db.remove_non_persistent_effects(player['id'])
# Check unlocked
stat_val = player.get(skill.stat_requirement, 0)
if stat_val < skill.stat_threshold or player['level'] < skill.level_requirement:
raise HTTPException(status_code=400, detail="Skill not unlocked")
# Check cooldown
active_effects = await db.get_player_effects(player['id'])
cd_source = f"cd:{skill.id}"
for eff in active_effects:
if eff.get('source') == cd_source and eff.get('ticks_remaining', 0) > 0:
raise HTTPException(status_code=400, detail=f"Skill on cooldown ({eff['ticks_remaining']} turns)")
# Check stamina
if player['stamina'] < skill.stamina_cost:
raise HTTPException(status_code=400, detail="Not enough stamina")
# Deduct stamina
new_stamina = player['stamina'] - skill.stamina_cost
await db.update_player_stamina(player['id'], new_stamina)
player['stamina'] = new_stamina
# Add cooldown effect
if skill.cooldown > 0:
await db.add_effect(
player_id=player['id'],
effect_name=f"{skill.id}_cooldown",
effect_icon="",
effect_type="cooldown",
value=0,
ticks_remaining=skill.cooldown,
persist_after_combat=False,
source=cd_source
)
# Get weapon info
equipment = await db.get_all_equipment(player['id'])
weapon_damage = 0
inv_item = None
weapon_inv_id = None
weapon_def = None
if equipment.get('weapon') and equipment['weapon']:
weapon_slot = equipment['weapon']
inv_item = await db.get_inventory_item_by_id(weapon_slot['item_id'])
if inv_item:
weapon_def = ITEMS_MANAGER.get_item(inv_item['item_id'])
if weapon_def and weapon_def.stats:
weapon_damage = random.randint(
weapon_def.stats.get('damage_min', 0),
weapon_def.stats.get('damage_max', 0)
)
weapon_inv_id = inv_item['id']
effects = skill.effects
new_npc_hp = combat['npc_hp']
combat_over = False
player_won = False
# ── Damage skills ──
if 'damage_multiplier' in effects:
base_damage = 5
strength_bonus = int(player['strength'] * 1.5)
level_bonus = player['level']
variance = random.randint(-2, 2)
raw_damage = max(1, base_damage + strength_bonus + level_bonus + weapon_damage + variance)
multiplier = effects['damage_multiplier']
# Execute check
if 'execute_threshold' in effects:
npc_hp_pct = combat['npc_hp'] / combat['npc_max_hp'] if combat['npc_max_hp'] > 0 else 1
if npc_hp_pct <= effects['execute_threshold']:
multiplier = effects.get('execute_multiplier', multiplier)
# Exploit Weakness check
if effects.get('requires_analyzed'):
# Check if NPC has been analyzed this combat
analyzed = combat.get('npc_status_effects', '') or ''
if 'analyzed' not in analyzed:
multiplier = 1.0 # No bonus if not analyzed
damage = max(1, int(raw_damage * multiplier))
# Guaranteed crit
if effects.get('guaranteed_crit'):
damage = int(damage * 1.5)
# Multi-hit
num_hits = effects.get('hits', 1)
total_damage = 0
for hit in range(num_hits):
hit_dmg = damage if hit == 0 else max(1, int(raw_damage * multiplier))
# Armor penetration
npc_defense = getattr(npc_def, 'defense', 0)
if 'armor_penetration' in effects:
npc_defense = int(npc_defense * (1 - effects['armor_penetration']))
actual_hit = max(1, hit_dmg - npc_defense)
total_damage += actual_hit
new_npc_hp = max(0, new_npc_hp - actual_hit)
messages.append(create_combat_message(
"skill_attack",
origin="player",
damage=total_damage,
skill_name=skill.name,
skill_icon=skill.icon,
hits=num_hits
))
# Lifesteal
if 'lifesteal' in effects:
heal_amount = int(total_damage * effects['lifesteal'])
new_hp = min(player['max_hp'], player['hp'] + heal_amount)
if new_hp > player['hp']:
await db.update_player_hp(player['id'], new_hp)
player['hp'] = new_hp
messages.append(create_combat_message(
"skill_heal", origin="player", heal=heal_amount, skill_icon="🩸"
))
# Poison DoT
if 'poison_damage' in effects:
poison_str = f"poison:{effects['poison_damage']}:{effects['poison_duration']}"
existing = combat.get('npc_status_effects', '') or ''
if existing:
existing += '|' + poison_str
else:
existing = poison_str
await db.update_combat(player['id'], {'npc_status_effects': existing})
messages.append(create_combat_message(
"skill_effect", origin="player", message=f"🧪 Poisoned! ({effects['poison_damage']} dmg/turn)"
))
# Stun chance
if 'stun_chance' in effects and random.random() < effects['stun_chance']:
messages.append(create_combat_message(
"skill_effect", origin="player", message="💫 Stunned!"
))
# Weapon durability
if weapon_inv_id and inv_item and inv_item.get('unique_item_id'):
new_durability = await db.decrease_unique_item_durability(inv_item['unique_item_id'], 1)
if new_durability is None:
messages.append(create_combat_message("weapon_broke", origin="player", item_name=weapon_def.name if weapon_def else "weapon"))
await db.unequip_item(player['id'], 'weapon')
# ── Heal skills ──
if 'heal_percent' in effects:
heal_amount = int(player['max_hp'] * effects['heal_percent'])
new_hp = min(player['max_hp'], player['hp'] + heal_amount)
actual = new_hp - player['hp']
if actual > 0:
await db.update_player_hp(player['id'], new_hp)
player['hp'] = new_hp
messages.append(create_combat_message(
"skill_heal", origin="player", heal=actual, skill_name=skill.name, skill_icon=skill.icon
))
# ── Stamina restore skills ──
if 'stamina_restore_percent' in effects:
restore = int(player['max_stamina'] * effects['stamina_restore_percent'])
new_stam = min(player['max_stamina'], player['stamina'] + restore)
actual = new_stam - player['stamina']
if actual > 0:
await db.update_player_stamina(player['id'], new_stam)
player['stamina'] = new_stam
messages.append(create_combat_message(
"skill_effect", origin="player", message=f"⚡ +{actual} Stamina"
))
# ── Buff skills ──
if 'buff' in effects:
buff_name_raw = effects['buff']
duration = effects.get('buff_duration', 2)
value = 0
if 'damage_reduction' in effects:
value = int(effects['damage_reduction'] * 100)
elif 'damage_bonus' in effects:
value = int(effects['damage_bonus'] * 100)
await db.add_effect(
player_id=player['id'],
effect_name=buff_name_raw,
effect_icon=skill.icon,
effect_type='buff',
value=value,
ticks_remaining=duration,
persist_after_combat=False,
source=f'skill:{skill.id}'
)
messages.append(create_combat_message(
"skill_buff", origin="player",
skill_name=skill.name, skill_icon=skill.icon, duration=duration
))
# ── Analyze skill ──
if effects.get('mark_analyzed'):
existing = combat.get('npc_status_effects', '') or ''
if 'analyzed' not in existing:
if existing:
existing += '|analyzed:0:99'
else:
existing = 'analyzed:0:99'
await db.update_combat(player['id'], {'npc_status_effects': existing})
npc_hp_pct = int((combat['npc_hp'] / combat['npc_max_hp']) * 100) if combat['npc_max_hp'] > 0 else 0
intent = combat.get('npc_intent', 'attack')
messages.append(create_combat_message(
"skill_analyze", origin="player",
skill_icon=skill.icon,
npc_name=npc_def.name,
npc_hp_pct=npc_hp_pct,
npc_intent=intent
))
# Check NPC death
if new_npc_hp <= 0:
messages.append(create_combat_message("victory", origin="neutral", npc_name=npc_def.name))
combat_over = True
player_won = True
# Award XP
xp_reward = npc_def.xp_reward
current_xp = player['xp'] + xp_reward
await db.update_player(player['id'], xp=current_xp)
player['xp'] = current_xp
messages.append(create_combat_message("xp_gained", origin="neutral", xp=xp_reward))
await db.update_player_statistics(player['id'], enemies_killed=1, increment=True)
# Level up check
level_result = await game_logic.check_and_apply_level_up(player['id'])
if level_result['leveled_up']:
messages.append(create_combat_message("level_up", origin="neutral", new_level=level_result['new_level']))
# Loot
loot_items = npc_def.loot if hasattr(npc_def, 'loot') else []
generated_loot = []
if loot_items:
for loot in loot_items:
if random.random() < loot.get('chance', 1.0):
qty = random.randint(loot.get('min', 1), loot.get('max', 1))
# Only append message in combat log, actual items are in corpse
messages.append(create_combat_message("loot", origin="neutral", item_id=loot['item_id'], quantity=qty))
generated_loot.append({"item_id": loot['item_id'], "quantity": qty})
# Create corpse
import json
await db.create_npc_corpse(
combat['npc_id'],
combat.get('location_id', player.get('location_id', '')),
json.dumps(generated_loot)
)
await db.remove_non_persistent_effects(player['id'])
messages.extend(npc_attack_messages)
if player_defeated:
await db.remove_non_persistent_effects(player['id'])
combat_over = True
elif req.action == 'use_item':
combat_over = False
@@ -791,9 +1050,10 @@ async def combat_action(
# 1. Apply Status Effects (e.g. Regeneration from Bandage)
if item_def.effects.get('status_effect'):
status_data = item_def.effects['status_effect']
status_name = status_data['name']
await db.add_effect(
player_id=player['id'],
effect_name=status_data['name'],
effect_name=status_name,
effect_icon=status_data.get('icon', ''),
effect_type=status_data.get('type', 'buff'),
damage_per_tick=status_data.get('damage_per_tick', 0),
@@ -813,7 +1073,10 @@ async def combat_action(
# 3. Handle Direct healing (legacy/instant)
if item_def.effects.get('hp_restore') and item_def.effects['hp_restore'] > 0:
hp_restore = item_def.effects['hp_restore']
item_effectiveness = stats.get('item_effectiveness', 1.0)
base_hp_restore = item_def.effects['hp_restore']
hp_restore = int(base_hp_restore * item_effectiveness)
old_hp = player['hp']
new_hp = min(player.get('max_hp', 100), old_hp + hp_restore)
actual_restored = new_hp - old_hp
@@ -821,8 +1084,11 @@ async def combat_action(
await db.update_player_hp(player['id'], new_hp)
effects_applied.append(f"+{actual_restored} HP")
if item_def.effects.get('stamina_restore'):
stamina_restore = item_def.effects['stamina_restore']
if item_def.effects.get('stamina_restore') and item_def.effects['stamina_restore'] > 0:
item_effectiveness = stats.get('item_effectiveness', 1.0)
base_stamina_restore = item_def.effects['stamina_restore']
stamina_restore = int(base_stamina_restore * item_effectiveness)
old_stamina = player['stamina']
new_stamina = min(player.get('max_stamina', 100), old_stamina + stamina_restore)
actual_restored = new_stamina - old_stamina
@@ -1259,6 +1525,12 @@ async def pvp_combat_action(
current_player = attacker if is_attacker else defender
opponent = defender if is_attacker else attacker
# Get derived stats for both players
from ..services.stats import calculate_derived_stats
current_player_stats = await calculate_derived_stats(current_player['id'], redis_manager)
# Opponent stats won't be used for attack calculation but could be used for defense logic
# opponent_stats = await calculate_derived_stats(opponent['id'], redis_manager)
messages = []
combat_over = False
winner_id = None
@@ -1267,24 +1539,18 @@ async def pvp_combat_action(
last_action_text = ""
if req.action == 'attack':
# Calculate damage (similar to PvE)
base_damage = 5
strength_bonus = current_player['strength'] * 2
level_bonus = current_player['level']
# Calculate damage (unified formula with derived stats)
base_damage = current_player_stats.get('attack_power', 5)
# Check for equipped weapon
weapon_damage = 0
# Check for equipped weapon to apply durability loss
# (Attack power from the weapon is already included in stats['attack_power'])
equipment = await db.get_all_equipment(current_player['id'])
if equipment.get('weapon') and equipment['weapon']:
weapon_slot = equipment['weapon']
inv_item = await db.get_inventory_item_by_id(weapon_slot['item_id'])
if inv_item:
weapon_def = ITEMS_MANAGER.get_item(inv_item['item_id'])
if weapon_def and weapon_def.stats:
weapon_damage = random.randint(
weapon_def.stats.get('damage_min', 0),
weapon_def.stats.get('damage_max', 0)
)
if weapon_def:
# Decrease weapon durability
if inv_item.get('unique_item_id'):
new_durability = await db.decrease_unique_item_durability(inv_item['unique_item_id'], 1)
@@ -1297,17 +1563,31 @@ async def pvp_combat_action(
await db.unequip_item(current_player['id'], 'weapon')
variance = random.randint(-2, 2)
damage = max(1, base_damage + strength_bonus + level_bonus + weapon_damage + variance)
damage = max(1, base_damage + variance)
# Check for critical hit
is_critical = False
crit_chance = current_player_stats.get('crit_chance', 0.05)
if random.random() < crit_chance:
is_critical = True
damage = int(damage * current_player_stats.get('crit_damage', 1.5))
# Apply armor reduction and durability loss to opponent
armor_absorbed, broken_armor = await reduce_armor_durability(opponent['id'], damage)
actual_damage = max(1, damage - armor_absorbed)
if is_critical:
messages.append(create_combat_message(
"combat_crit",
origin="player"
))
last_action_text += f"\nCritical Hit! "
# Structure the attack message
messages.append(create_combat_message(
"player_attack",
origin="player",
damage=damage,
damage=actual_damage,
armor_absorbed=armor_absorbed
))

View File

@@ -179,6 +179,11 @@ async def craft_item(request: CraftItemRequest, current_user: dict = Depends(get
if not player:
raise HTTPException(status_code=404, detail="Player not found")
# Get derived stats for crafting bonus
from ..services.stats import calculate_derived_stats
stats = await calculate_derived_stats(player['id'], redis_manager)
crafting_bonus = stats.get('crafting_bonus', 0.0)
location_id = player['location_id']
location = LOCATIONS.get(location_id)
@@ -287,11 +292,13 @@ async def craft_item(request: CraftItemRequest, current_user: dict = Depends(get
if hasattr(item_def, 'durability') and item_def.durability:
# This is a unique item - generate random stats
base_durability = item_def.durability
# Random durability: 90-110% of base
random_durability = int(base_durability * random.uniform(0.9, 1.1))
# Generate tier based on durability roll
durability_percent = (random_durability / base_durability)
# Random durability: 90-110% of base, plus crafting_bonus (e.g. +0.05 from Intellect)
base_roll = random.uniform(0.9, 1.1)
durability_percent = base_roll + crafting_bonus
random_durability = int(base_durability * durability_percent)
# Generate tier based on the final durability percentage
if durability_percent >= 1.08:
tier = 5 # Gold
elif durability_percent >= 1.04:
@@ -308,8 +315,9 @@ async def craft_item(request: CraftItemRequest, current_user: dict = Depends(get
if hasattr(item_def, 'stats') and item_def.stats:
for stat_key, stat_value in item_def.stats.items():
if isinstance(stat_value, (int, float)):
# Random stat: 90-110% of base
random_stats[stat_key] = int(stat_value * random.uniform(0.9, 1.1))
# Random stat: same multiplier logic applied to base stats
stat_percent = random.uniform(0.9, 1.1) + crafting_bonus
random_stats[stat_key] = int(stat_value * stat_percent)
else:
random_stats[stat_key] = stat_value

View File

@@ -24,13 +24,15 @@ logger = logging.getLogger(__name__)
LOCATIONS = None
ITEMS_MANAGER = None
WORLD = None
redis_manager = None
def init_router_dependencies(locations, items_manager, world):
def init_router_dependencies(locations, items_manager, world, redis_mgr=None):
"""Initialize router with game data dependencies"""
global LOCATIONS, ITEMS_MANAGER, WORLD
global LOCATIONS, ITEMS_MANAGER, WORLD, redis_manager
LOCATIONS = locations
ITEMS_MANAGER = items_manager
WORLD = world
redis_manager = redis_mgr
router = APIRouter(tags=["equipment"])
@@ -62,6 +64,25 @@ async def equip_item(
if not item_def.equippable or not item_def.slot:
raise HTTPException(status_code=400, detail="This item cannot be equipped")
# Check equipment requirements (level + stat gates)
if item_def.equip_requirements:
player = await db.get_player_by_id(player_id) if 'level' not in current_user else current_user
req_level = item_def.equip_requirements.get('level', 0)
if player.get('level', 1) < req_level:
raise HTTPException(
status_code=400,
detail=get_game_message('equip_level_required', locale, level=req_level)
)
for stat_name in ['strength', 'agility', 'endurance', 'intellect']:
req_value = item_def.equip_requirements.get(stat_name, 0)
if req_value > 0 and player.get(stat_name, 0) < req_value:
raise HTTPException(
status_code=400,
detail=get_game_message('equip_stat_required', locale, stat=stat_name.capitalize(), value=req_value)
)
# Check if slot is valid
valid_slots = ['head', 'torso', 'legs', 'feet', 'weapon', 'offhand', 'backpack']
if item_def.slot not in valid_slots:
@@ -113,6 +134,10 @@ async def equip_item(
else:
message = get_game_message('equipped', locale, item=get_locale_string(item_def.name, locale))
# Invalidate cached derived stats (equipment changed)
from ..services.stats import invalidate_stats_cache
await invalidate_stats_cache(player_id, redis_manager)
return {
"success": True,
"message": message,
@@ -192,6 +217,10 @@ async def unequip_item(
await db.drop_item(player_id, inv_item['item_id'], 1, current_user['location_id'])
await db.remove_from_inventory(player_id, inv_item['item_id'], 1)
# Invalidate cached derived stats
from ..services.stats import invalidate_stats_cache
await invalidate_stats_cache(player_id, redis_manager)
return {
"success": True,
"message": get_game_message('unequip_dropped', locale, item=get_locale_string(item_def.name, locale)),
@@ -202,6 +231,10 @@ async def unequip_item(
await db.unequip_item(player_id, unequip_req.slot)
await db.update_inventory_item(equipped['item_id'], is_equipped=False)
# Invalidate cached derived stats
from ..services.stats import invalidate_stats_cache
await invalidate_stats_cache(player_id, redis_manager)
return {
"success": True,
"message": get_game_message('unequipped', locale, item=get_locale_string(item_def.name, locale)),

View File

@@ -17,6 +17,7 @@ from .. import database as db
from ..items import ItemsManager
from .. import game_logic
from ..core.websockets import manager
from ..services.stats import STAT_CAP, invalidate_stats_cache
logger = logging.getLogger(__name__)
@@ -457,19 +458,28 @@ async def spend_stat_point(
if stat not in valid_stats:
raise HTTPException(status_code=400, detail=f"Invalid stat. Must be one of: {', '.join(valid_stats)}")
# Check stat cap
if player[stat] >= STAT_CAP:
raise HTTPException(status_code=400, detail=f"{stat.capitalize()} is already at maximum ({STAT_CAP})")
# Update the stat and decrease unspent points
update_data = {
stat: player[stat] + 1,
'unspent_points': player['unspent_points'] - 1
}
# Endurance increases max HP
# Endurance increases max HP and max stamina
if stat == 'endurance':
update_data['max_hp'] = player['max_hp'] + 5
update_data['hp'] = min(player['hp'] + 5, update_data['max_hp']) # Also heal by 5
update_data['max_stamina'] = player['max_stamina'] + 2
update_data['stamina'] = min(player['stamina'] + 2, update_data['max_stamina']) # Also restore by 2
await db.update_character(current_user['id'], **update_data)
# Invalidate cached derived stats
await invalidate_stats_cache(current_user['id'], redis_manager)
return {
"success": True,
"message": f"Increased {stat} by 1!",
@@ -1550,4 +1560,139 @@ async def drop_item(
return {
"success": True,
"message": get_game_message('dropped_item_success', locale, emoji=item_def.emoji, name=get_locale_string(item_def.name, locale), qty=quantity)
}
}
@router.get("/api/game/character-sheet")
async def get_character_sheet(current_user: dict = Depends(get_current_user)):
"""Get the full character sheet with base stats, derived stats, skills, and perks."""
from ..services.stats import calculate_derived_stats
from ..services.skills import skills_manager, perks_manager, get_total_perk_points
player = current_user
character_id = player['id']
# Get derived stats
derived = await calculate_derived_stats(character_id, redis_manager)
# Get available skills
available_skills = skills_manager.get_available_skills(player)
# Get owned perks
owned_perks_rows = await db.get_character_perks(character_id)
owned_perk_ids = [row['perk_id'] for row in owned_perks_rows]
# Get all perks with availability
all_perks = perks_manager.get_available_perks(player, owned_perk_ids)
# Calculate perk points
total_perk_points = get_total_perk_points(player['level'])
used_perk_points = len(owned_perk_ids)
available_perk_points = total_perk_points - used_perk_points
return {
"base_stats": {
"strength": player['strength'],
"agility": player['agility'],
"endurance": player['endurance'],
"intellect": player['intellect'],
"unspent_points": player['unspent_points'],
"stat_cap": STAT_CAP,
},
"derived_stats": derived,
"skills": available_skills,
"perks": {
"available_points": available_perk_points,
"total_points": total_perk_points,
"used_points": used_perk_points,
"all_perks": all_perks,
},
"character": {
"name": player['name'],
"level": player['level'],
"xp": player['xp'],
"hp": player['hp'],
"max_hp": player['max_hp'],
"stamina": player['stamina'],
"max_stamina": player['max_stamina'],
"avatar_data": player.get('avatar_data'),
}
}
@router.post("/api/game/select_perk")
async def select_perk(
perk_id: str,
current_user: dict = Depends(get_current_user)
):
"""Select a perk for the character."""
from ..services.skills import perks_manager, get_total_perk_points
player = current_user
character_id = player['id']
# Check perk exists
perk = perks_manager.get_perk(perk_id)
if not perk:
raise HTTPException(status_code=404, detail="Perk not found")
# Check perk points available
owned_perks = await db.get_character_perks(character_id)
owned_perk_ids = [row['perk_id'] for row in owned_perks]
total_points = get_total_perk_points(player['level'])
used_points = len(owned_perk_ids)
if used_points >= total_points:
raise HTTPException(status_code=400, detail="No perk points available")
# Check if already owned
if perk_id in owned_perk_ids:
raise HTTPException(status_code=400, detail="Perk already selected")
# Check requirements
if not perks_manager.check_requirements(perk, player):
raise HTTPException(status_code=400, detail="Requirements not met for this perk")
# Add perk
success = await db.add_character_perk(character_id, perk_id)
if not success:
raise HTTPException(status_code=400, detail="Failed to select perk")
# Invalidate stats cache (perks affect derived stats)
await invalidate_stats_cache(character_id, redis_manager)
return {
"success": True,
"message": f"Perk '{perk_id}' selected!",
"perk": {
"id": perk.id,
"name": perk.name,
"description": perk.description,
"icon": perk.icon,
}
}
@router.get("/api/game/available-skills")
async def get_available_skills(current_user: dict = Depends(get_current_user)):
"""Get available skills for the combat UI abilities dropdown."""
from ..services.skills import skills_manager
from .. import database as db
player = current_user
all_skills = skills_manager.get_available_skills(player)
# Check cooldowns
effects = await db.get_player_effects(player['id'])
cooldowns = {eff['source']: eff['ticks_remaining'] for eff in effects if eff.get('effect_type') == 'cooldown'}
# Only return unlocked skills for the combat dropdown
unlocked = []
for s in all_skills:
if s['unlocked']:
cd_source = f"cd:{s['id']}"
s['current_cooldown'] = cooldowns.get(cd_source, 0)
unlocked.append(s)
return {"skills": unlocked}

View File

@@ -195,9 +195,13 @@ async def loot_corpse(
# Parse corpse ID
corpse_type, corpse_db_id = req.corpse_id.split('_', 1)
corpse_db_id = int(corpse_db_id)
player = current_user # current_user is already the character dict
# Get player derived stats for loot quality
from ..services.stats import calculate_derived_stats
stats = await calculate_derived_stats(player['id'], redis_manager)
loot_quality = stats.get('loot_quality', 1.0)
# Get player's current capacity
inventory = await db.get_inventory(player['id'])
current_weight, max_weight, current_volume, max_volume = await calculate_player_capacity(inventory, ITEMS_MANAGER)
@@ -246,7 +250,6 @@ async def loot_corpse(
success, error_msg, tools_consumed = await consume_tool_durability(player['id'], tool_req, inventory)
if not success:
raise HTTPException(status_code=400, detail=error_msg)
# Determine quantity
quantity = random.randint(loot_item['quantity_min'], loot_item['quantity_max'])
@@ -254,6 +257,13 @@ async def loot_corpse(
# Check if item fits in inventory
item_def = ITEMS_MANAGER.get_item(loot_item['item_id'])
if item_def:
# Apply loot quality bonus for resources and consumables
if getattr(item_def, 'category', item_def.type) in ['resource', 'consumable'] and loot_quality > 1.0:
# e.g., loot_quality 1.15 = 15% chance for +1 extra
bonus_chance = loot_quality - 1.0
if random.random() < bonus_chance:
quantity += 1
item_weight = item_def.weight * quantity
item_volume = item_def.volume * quantity
@@ -305,11 +315,16 @@ async def loot_corpse(
if can_loot:
# Can loot this item
quantity = random.randint(loot_item['quantity_min'], loot_item['quantity_max'])
if quantity > 0:
# Check if item fits in inventory
item_def = ITEMS_MANAGER.get_item(loot_item['item_id'])
if item_def:
# Apply loot quality bonus for resources and consumables
if getattr(item_def, 'category', item_def.type) in ['resource', 'consumable'] and loot_quality > 1.0:
bonus_chance = loot_quality - 1.0
if random.random() < bonus_chance:
quantity += 1
item_weight = item_def.weight * quantity
item_volume = item_def.volume * quantity

View File

@@ -72,6 +72,8 @@ GAME_MESSAGES = {
'unequipped': {'en': "Unequipped {item}", 'es': "Desequipado {item}"},
'unequip_dropped': {'en': "Unequipped {item} (dropped to ground - inventory full)", 'es': "Desequipado {item} (tirado al suelo - inventario lleno)"},
'repaired_success': {'en': "Repaired {item}! Restored {amount} durability.", 'es': "¡Reparado {item}! Restaurados {amount} puntos de durabilidad."},
'equip_level_required': {'en': "Requires level {level} to equip", 'es': "Requiere nivel {level} para equipar"},
'equip_stat_required': {'en': "Requires {stat} {value} to equip", 'es': "Requiere {stat} {value} para equipar"},
# Characters/Auth
'character_created': {'en': "Character created successfully", 'es': "Personaje creado con éxito"},