Files
echoes-of-the-ash/api/game_logic.py

652 lines
25 KiB
Python

"""
Standalone game logic for the API.
Contains all game mechanics without bot dependencies.
"""
import random
import time
from typing import Dict, Any, Tuple, Optional, List
from . import database as db
from .services.helpers import get_locale_string, translate_travel_message, create_combat_message
async def move_player(player_id: int, direction: str, locations: Dict, locale: str = 'en') -> Tuple[bool, str, Optional[str], int, int]:
    """
    Walk the player through one exit of their current location.

    The stamina price grows with the straight-line distance between the two
    locations and with carried weight, shrinks with agility, and gets an extra
    surcharge while the player is over their weight or volume limit. On
    success the new location and the reduced stamina are written to the DB.

    Returns: (success, message, new_location_id, stamina_cost, distance_meters)
    Failures return (False, reason, None, 0, 0) and change nothing.
    """
    player = await db.get_character_by_id(player_id)
    if not player:
        return False, "Player not found", None, 0, 0

    origin = locations.get(player['location_id'])
    if not origin:
        return False, "Current location not found", None, 0, 0

    # The requested direction must be one of the exits of the current location
    if direction not in origin.exits:
        return False, f"You cannot go {direction} from here.", None, 0, 0

    new_location_id = origin.exits[direction]
    destination = locations.get(new_location_id)
    if not destination:
        return False, "Destination not found", None, 0, 0

    # Carried load versus limits
    from api.items import items_manager as ITEMS_MANAGER
    from api.services.helpers import calculate_player_capacity
    inventory = await db.get_inventory(player_id)
    current_weight, max_weight, current_volume, max_volume = await calculate_player_capacity(inventory, ITEMS_MANAGER)

    # Straight-line distance; 1 coordinate unit = 100 meters, truncated to whole meters
    import math
    delta_x = destination.x - origin.x
    delta_y = destination.y - origin.y
    distance = int(math.sqrt(delta_x**2 + delta_y**2) * 100)

    # Cost components: 1 stamina per 50m (at least 1), plus weight, minus agility
    base_cost = max(1, round(distance / 50))
    weight_penalty = int(current_weight / 10)
    agility_reduction = int(player.get('agility', 5) / 3)

    # Over-capacity surcharge: 50% of (base + weight) plus the excess ratio,
    # with the ratio contribution capped at 1.5 (so 50%..200% in total)
    over_capacity_penalty = 0
    if current_weight > max_weight or current_volume > max_volume:
        excess_ratios = [
            max(0, (used - limit) / limit) if limit > 0 else 0
            for used, limit in (
                (current_weight, max_weight),
                (current_volume, max_volume),
            )
        ]
        excess_ratio = max(excess_ratios)
        over_capacity_penalty = int((base_cost + weight_penalty) * (0.5 + min(1.5, excess_ratio)))

    stamina_cost = max(1, base_cost + weight_penalty + over_capacity_penalty - agility_reduction)

    # Refuse the move when the player cannot pay for it
    if player['stamina'] < stamina_cost:
        return False, "You're too exhausted to move. Wait for your stamina to regenerate.", None, 0, 0

    # Persist the new position and the remaining stamina
    remaining_stamina = max(0, player['stamina'] - stamina_cost)
    await db.update_character(
        player_id,
        location_id=new_location_id,
        stamina=remaining_stamina
    )

    destination_name = get_locale_string(destination.name, locale)
    travel_message = translate_travel_message(direction, destination_name, locale)
    return True, travel_message, new_location_id, stamina_cost, distance
async def inspect_area(player_id: int, location, interactables_data: Dict) -> str:
    """
    Spend 1 stamina to look around and describe the given location.

    The report lists the location description, its interactables with their
    actions and stamina costs, the NPCs present, and any items lying on the
    ground. `interactables_data` is accepted but not read here.

    Returns the report as a single newline-joined string, or an error
    sentence when the player is missing or has less than 1 stamina.
    """
    player = await db.get_player_by_id(player_id)
    if not player:
        return "Player not found"

    # Inspecting costs exactly one stamina point
    stamina = player['stamina']
    if stamina < 1:
        return "You're too exhausted to inspect the area thoroughly. Wait for your stamina to regenerate."
    await db.update_player_stamina(player_id, stamina - 1)

    # Header and description
    report = [
        f"🔍 **Inspecting {location.name}**\n",
        location.description,
        "",
    ]

    # Interactables section, one bullet per object
    if location.interactables:
        report.append("**Interactables:**")
        for obj in location.interactables:
            report.append(f"• **{obj.name}**")
            if obj.actions:
                described = [f"{act.label} (⚡{act.stamina_cost})" for act in obj.actions]
                report.append(f" Actions: {', '.join(described)}")
        report.append("")

    # NPC section
    if location.npcs:
        report.append(f"**NPCs:** {', '.join(location.npcs)}")
        report.append("")

    # Items dropped at this location
    ground_items = await db.get_dropped_items(location.id)
    if ground_items:
        report.append("**Items on ground:**")
        report.extend(f"{entry['item_id']} x{entry['quantity']}" for entry in ground_items)

    return "\n".join(report)
async def interact_with_object(
    player_id: int,
    interactable_id: str,
    action_id: str,
    location,
    items_manager
) -> Dict[str, Any]:
    """
    Interact with an object using a specific action.

    Flow: validate player / object / action, check stamina and the per-action
    cooldown, deduct stamina, roll an outcome (10% critical failure, 20%
    failure, 70% success; a missing outcome falls back to 'success'), grant
    the outcome's item rewards (anything that does not fit by weight or volume
    is dropped at the player's location), apply damage, and start a 60 second
    cooldown for this action.

    Returns on failure: {success: False, message}
    Returns on a resolved action (any outcome): {success: True, message,
    items_found, items_dropped, damage_taken, stamina_cost, new_stamina,
    new_hp}. `new_hp` is the value stored for the player (never below 0).
    """
    player = await db.get_player_by_id(player_id)
    if not player:
        return {"success": False, "message": "Player not found"}

    # Find the interactable (match by id or instance_id)
    interactable = next(
        (
            obj for obj in location.interactables
            if obj.id == interactable_id
            or (hasattr(obj, 'instance_id') and obj.instance_id == interactable_id)
        ),
        None,
    )
    if not interactable:
        return {"success": False, "message": "Object not found"}

    # Find the action
    action = next((act for act in interactable.actions if act.id == action_id), None)
    if not action:
        return {"success": False, "message": "Action not found"}

    # Check stamina
    if player['stamina'] < action.stamina_cost:
        return {
            "success": False,
            "message": f"Not enough stamina. Need {action.stamina_cost}, have {player['stamina']}."
        }

    # Check cooldown for this specific action. The stored value is compared
    # against the clock so an expiry that already passed never blocks the
    # action (and never yields a negative "Wait N seconds" message).
    cooldown_expiry = await db.get_interactable_cooldown(interactable_id, action_id)
    if cooldown_expiry:
        seconds_left = cooldown_expiry - time.time()
        if seconds_left > 0:
            return {
                "success": False,
                "message": f"This action is still on cooldown. Wait {int(seconds_left)} seconds."
            }

    # Deduct stamina
    new_stamina = player['stamina'] - action.stamina_cost
    await db.update_player_stamina(player_id, new_stamina)

    # Determine outcome (simple success/failure for now)
    # TODO: Implement proper skill checks
    roll = random.randint(1, 100)
    if roll <= 10:  # 10% critical failure
        outcome_key = 'critical_failure'
    elif roll <= 30:  # 20% failure
        outcome_key = 'failure'
    else:  # 70% success
        outcome_key = 'success'

    # Fall back to the success outcome when the rolled one is not defined
    outcome = action.outcomes.get(outcome_key) or action.outcomes.get('success')
    if not outcome:
        return {
            "success": False,
            "message": "Action has no defined outcomes"
        }

    # Process outcome
    items_found = []
    items_dropped = []
    damage_taken = outcome.damage_taken

    # Calculate current capacity
    from api.services.helpers import calculate_player_capacity
    from api.items import items_manager as ITEMS_MANAGER
    inventory = await db.get_inventory(player_id)
    current_weight, max_weight, current_volume, max_volume = await calculate_player_capacity(inventory, ITEMS_MANAGER)

    # Add items to inventory (or drop if over capacity)
    for item_id, quantity in outcome.items_reward.items():
        item = items_manager.get_item(item_id)
        if not item:
            # Reward refers to an item with no definition; skip it
            continue

        item_name = get_locale_string(item.name)
        emoji = item.emoji if hasattr(item, 'emoji') else ''

        # Items with durability are unique and must be created one by one
        has_durability = hasattr(item, 'durability') and item.durability is not None

        if has_durability:
            label = f"{emoji} {item_name}"
            for _ in range(quantity):
                fits = (current_weight + item.weight <= max_weight and
                        current_volume + item.volume <= max_volume)
                if fits:
                    # Add to inventory with durability properties
                    await db.add_item_to_inventory(
                        player_id,
                        item_id,
                        quantity=1,
                        durability=item.durability,
                        max_durability=item.durability,
                        tier=getattr(item, 'tier', None)
                    )
                    items_found.append(label)
                    current_weight += item.weight
                    current_volume += item.volume
                else:
                    # Create a unique_item carrying the base stats and drop it
                    base_stats = {k: int(v) if isinstance(v, (int, float)) else v for k, v in item.stats.items()} if item.stats else {}
                    unique_item_id = await db.create_unique_item(
                        item_id=item_id,
                        durability=item.durability,
                        max_durability=item.durability,
                        tier=getattr(item, 'tier', None),
                        unique_stats=base_stats
                    )
                    await db.drop_item_to_world(item_id, 1, player['location_id'], unique_item_id=unique_item_id)
                    items_dropped.append(label)
        else:
            # Stackable items are granted (or dropped) as one whole stack
            label = f"{emoji} {item_name} x{quantity}"
            item_weight = item.weight * quantity
            item_volume = item.volume * quantity
            if (current_weight + item_weight <= max_weight and
                    current_volume + item_volume <= max_volume):
                await db.add_item_to_inventory(player_id, item_id, quantity)
                items_found.append(label)
                current_weight += item_weight
                current_volume += item_volume
            else:
                await db.drop_item_to_world(item_id, quantity, player['location_id'])
                items_dropped.append(label)

    # Apply damage; new_hp always mirrors what is stored for the player
    new_hp = player['hp']
    if damage_taken > 0:
        new_hp = max(0, player['hp'] - damage_taken)
        await db.update_player_hp(player_id, new_hp)
        # Check if player died
        if new_hp <= 0:
            await db.update_player(player_id, is_dead=True)

    # Set cooldown for this specific action (60 seconds default)
    await db.set_interactable_cooldown(interactable_id, action_id, 60)

    # Build message
    final_message = get_locale_string(outcome.text)
    if items_dropped:
        final_message += f"\n⚠️ Inventory full! Dropped to ground: {', '.join(items_dropped)}"

    return {
        "success": True,
        "message": final_message,
        "items_found": items_found,
        "items_dropped": items_dropped,
        "damage_taken": damage_taken,
        "stamina_cost": action.stamina_cost,
        "new_stamina": new_stamina,
        "new_hp": new_hp
    }
async def use_item(player_id: int, item_id: str, items_manager) -> Dict[str, Any]:
    """
    Use an item from inventory.

    Only consumable items the player actually holds can be used. The
    'hp_restore' and 'stamina_restore' effects are applied up to the player's
    maximums; one unit of the item is removed even when nothing was restored,
    and usage statistics are incremented.

    Returns: {success, message, effects}
    `effects` may contain 'hp_restored' and/or 'stamina_restored' with the
    amounts actually gained. Failures return {success: False, message}.
    """
    player = await db.get_player_by_id(player_id)
    if not player:
        return {"success": False, "message": "Player not found"}

    # Check if player has the item
    inventory = await db.get_inventory(player_id)
    if not any(entry['item_id'] == item_id for entry in inventory):
        return {"success": False, "message": "You don't have this item"}

    # Get item data
    item = items_manager.get_item(item_id)
    if not item:
        return {"success": False, "message": "Item not found in game data"}
    if not item.consumable:
        return {"success": False, "message": "This item cannot be used"}

    # Apply item effects (an item without any effects is still consumed)
    item_effects = item.effects or {}
    effects = {}
    effects_msg = []

    if 'hp_restore' in item_effects:
        old_hp = player['hp']
        new_hp = min(player['max_hp'], old_hp + item_effects['hp_restore'])
        actual_restored = new_hp - old_hp
        if actual_restored > 0:
            await db.update_player_hp(player_id, new_hp)
            effects['hp_restored'] = actual_restored
            effects_msg.append(f"+{actual_restored} HP")

    if 'stamina_restore' in item_effects:
        old_stamina = player['stamina']
        new_stamina = min(player['max_stamina'], old_stamina + item_effects['stamina_restore'])
        actual_restored = new_stamina - old_stamina
        if actual_restored > 0:
            await db.update_player_stamina(player_id, new_stamina)
            effects['stamina_restored'] = actual_restored
            effects_msg.append(f"+{actual_restored} Stamina")

    # Consume the item (remove 1 from inventory)
    await db.remove_item_from_inventory(player_id, item_id, 1)

    # Track statistics
    stat_updates = {"items_used": 1, "increment": True}
    for stat_key in ('hp_restored', 'stamina_restored'):
        if stat_key in effects:
            stat_updates[stat_key] = effects[stat_key]
    await db.update_player_statistics(player_id, **stat_updates)

    # Build message; item names go through get_locale_string like the other
    # item messages in this module so a localized name renders as text
    msg = f"Used {get_locale_string(item.name)}"
    if effects_msg:
        msg += f" ({', '.join(effects_msg)})"

    return {
        "success": True,
        "message": msg,
        "effects": effects
    }
async def pickup_item(player_id: int, item_id: int, location_id: str, quantity: Optional[int] = None, items_manager=None) -> Dict[str, Any]:
    """
    Pick up an item from the ground.

    item_id is the dropped_item id, not the item_id field.
    quantity: how many to pick up (None, or anything >= the stack size = all)
    items_manager: ItemsManager instance to get item definitions

    The pickup is refused when the picked-up amount would push the player
    over their weight or volume capacity. Otherwise the ground stack is
    reduced (or removed) and the items are added to the inventory, keeping
    the unique_item_id of unique items.

    Returns: {success, message}
    """
    # NOTE(review): location_id is accepted but never compared with the
    # dropped item's location; confirm the caller validates proximity.

    # Get the dropped item by its ID
    dropped_item = await db.get_dropped_item(item_id)
    if not dropped_item:
        return {"success": False, "message": "Item not found on ground"}

    # Get item definition
    item_def = items_manager.get_item(dropped_item['item_id']) if items_manager else None
    if not item_def:
        return {"success": False, "message": "Item data not found"}

    # Determine how many to pick up
    available_qty = dropped_item['quantity']
    if quantity is None or quantity >= available_qty:
        pickup_qty = available_qty
    elif quantity < 1:
        return {"success": False, "message": "Invalid quantity"}
    else:
        pickup_qty = quantity

    # Get player and calculate capacity
    from api.services.helpers import calculate_player_capacity
    player = await db.get_player_by_id(player_id)
    if not player:
        # Same guard the other player-facing actions in this module use
        return {"success": False, "message": "Player not found"}
    inventory = await db.get_inventory(player_id)

    # Calculate current weight and volume (including equipped bag capacity)
    current_weight, max_weight, current_volume, max_volume = await calculate_player_capacity(inventory, items_manager)

    # Calculate weight and volume for items to pick up
    item_weight = item_def.weight * pickup_qty
    item_volume = item_def.volume * pickup_qty

    # Item names go through get_locale_string like the other item messages
    # in this module so a localized name renders as text
    item_label = f"{item_def.emoji} {get_locale_string(item_def.name)} x{pickup_qty}"

    # Check limits
    if current_weight + item_weight > max_weight:
        return {
            "success": False,
            "message": f"⚠️ Item too heavy! {item_label} ({item_weight:.1f}kg) would exceed capacity. Current: {current_weight:.1f}/{max_weight:.1f}kg"
        }
    if current_volume + item_volume > max_volume:
        return {
            "success": False,
            "message": f"⚠️ Item too large! {item_label} ({item_volume:.1f}L) would exceed capacity. Current: {current_volume:.1f}/{max_volume:.1f}L"
        }

    # Items fit - update dropped item quantity or remove it
    if pickup_qty >= available_qty:
        await db.remove_dropped_item(item_id)
    else:
        await db.update_dropped_item_quantity(item_id, available_qty - pickup_qty)

    # Add to inventory (pass unique_item_id if it's a unique item)
    await db.add_item_to_inventory(
        player_id,
        dropped_item['item_id'],
        pickup_qty,
        unique_item_id=dropped_item.get('unique_item_id')
    )

    return {
        "success": True,
        "message": f"Picked up {item_label}"
    }
async def check_and_apply_level_up(player_id: int) -> Dict[str, Any]:
    """
    Convert accumulated XP into levels and persist the result.

    Reaching the next level costs (current level * 100) XP. Several levels
    can be gained in one call; each one grants one unspent point. Nothing is
    written when no level is gained.

    Returns: {leveled_up: bool, new_level: int, levels_gained: int}
    A missing player yields {leveled_up: False, new_level: 1, levels_gained: 0}.
    """
    player = await db.get_player_by_id(player_id)
    if not player:
        return {"leveled_up": False, "new_level": 1, "levels_gained": 0}

    level = player['level']
    xp = player['xp']
    gained = 0

    # Keep paying the per-level threshold for as long as the XP covers it
    while True:
        threshold = level * 100
        if xp < threshold:
            break
        xp -= threshold
        level += 1
        gained += 1

    if not gained:
        return {"leveled_up": False, "new_level": level, "levels_gained": 0}

    # Store the new level, the leftover XP and one unspent point per level
    await db.update_player(
        player_id,
        level=level,
        xp=xp,
        unspent_points=player['unspent_points'] + gained
    )
    return {
        "leveled_up": True,
        "new_level": level,
        "levels_gained": gained
    }
# ============================================================================
# STATUS EFFECTS UTILITIES
# ============================================================================
def calculate_status_damage(effects: list) -> int:
    """
    Calculate total damage from all status effects.

    Args:
        effects: List of status effect dicts. Each dict may carry a
            'damage_per_tick' entry; effects without one (or with a None
            value) contribute 0. None or an empty list means no effects.

    Returns:
        Total damage per tick
    """
    if not effects:
        return 0
    return sum(effect.get('damage_per_tick', 0) or 0 for effect in effects)


# ============================================================================
# COMBAT UTILITIES
# ============================================================================
def generate_npc_intent(npc_def, combat_state: dict) -> dict:
    """
    Generate the NEXT intent for an NPC.

    One random roll in [0, 1) decides the intent:
      - 'defend'  when the NPC is below 50% HP and the roll is under 0.2
      - 'special' otherwise when the roll is under 0.35
      - 'attack'  in every other case
    So 'special' has a 35% chance while the NPC is healthy and 15% while it
    is below half HP. An NPC whose max HP is not positive is never treated as
    low on HP (this also avoids dividing by zero).

    `npc_def` is currently not consulted.

    Returns a dict with intent type and details: {"type": str, "value": 0}
    """
    roll = random.random()

    npc_max_hp = combat_state['npc_max_hp']
    low_hp = npc_max_hp > 0 and combat_state['npc_hp'] / npc_max_hp < 0.5

    if low_hp and roll < 0.2:
        intent_type = "defend"
    elif roll < 0.35:
        intent_type = "special"
    else:
        intent_type = "attack"

    return {"type": intent_type, "value": 0}
async def npc_attack(player_id: int, combat: dict, npc_def, reduce_armor_func) -> Tuple[str, bool]:
    """
    Play the NPC's turn using the intent stored in `combat`, then queue the next one.

    Stored intents (plain strings in combat['npc_intent']; empty/missing
    counts as 'attack'):
      - 'defend':  the NPC heals 5% of its max HP (capped at max HP)
      - 'special': a hit for 1.5x the rolled damage
      - anything else: a normal hit, boosted 1.5x while the NPC is below 30% HP
    Hits go through `reduce_armor_func(player_id, damage)`, which yields
    (armor_absorbed, broken_armor); at least 1 damage always gets through.

    If the player drops to 0 HP they are marked dead, statistics are updated
    and the combat is ended. Otherwise damage statistics are recorded, the
    next intent is generated and the turn is handed back to the player.

    Returns: (message, player_defeated). A missing player yields
    ("Player not found", True).
    """
    player = await db.get_player_by_id(player_id)
    if not player:
        return "Player not found", True

    # The intent column holds a simple string; legacy/null rows mean 'attack'
    intent_type = combat.get('npc_intent', 'attack') or 'attack'
    npc_defends = intent_type == 'defend'

    message = ""
    actual_damage = 0

    # EXECUTE INTENT
    if npc_defends:
        heal_amount = int(combat['npc_max_hp'] * 0.05)
        healed_npc_hp = min(combat['npc_max_hp'], combat['npc_hp'] + heal_amount)
        await db.update_combat(player_id, {'npc_hp': healed_npc_hp})
        message = f"{get_locale_string(npc_def.name)} defends and recovers {heal_amount} HP!"
    else:
        is_special = intent_type == 'special'
        npc_damage = random.randint(npc_def.damage_min, npc_def.damage_max)
        if is_special:
            # Strong attack (1.5x damage)
            npc_damage = int(npc_damage * 1.5)
        elif combat['npc_hp'] / combat['npc_max_hp'] < 0.3:
            # Enrage bonus for a normal attack while the NPC is below 30% HP
            npc_damage = int(npc_damage * 1.5)
            message = f"{get_locale_string(npc_def.name)} is ENRAGED! "

        armor_absorbed, broken_armor = await reduce_armor_func(player_id, npc_damage)
        actual_damage = max(1, npc_damage - armor_absorbed)
        new_player_hp = max(0, player['hp'] - actual_damage)

        if is_special:
            message = f"{get_locale_string(npc_def.name)} uses a SPECIAL ATTACK for {npc_damage} damage!"
        else:
            message += create_combat_message("enemy_attack", npc_name=npc_def.name, damage=npc_damage, armor_absorbed=armor_absorbed)

        if armor_absorbed > 0:
            message += f" (Armor absorbed {armor_absorbed})"
        if broken_armor:
            message += "".join(
                f"\n💔 Your {armor['emoji']} {armor['name']} broke!"
                for armor in broken_armor
            )
        await db.update_player(player_id, hp=new_player_hp)

    # Defeat: only a damaging turn can bring the player down
    if player['hp'] - actual_damage <= 0 and not npc_defends:
        message += "\nYou have been defeated!"
        await db.update_player(player_id, hp=0, is_dead=True)
        await db.update_player_statistics(player_id, deaths=1, damage_taken=actual_damage, increment=True)
        await db.end_combat(player_id)
        return message, True

    if actual_damage > 0:
        await db.update_player_statistics(player_id, damage_taken=actual_damage, increment=True)

    # GENERATE NEXT INTENT from a copy of the state that reflects this turn's heal
    projected_state = combat.copy()
    projected_state['npc_hp'] = healed_npc_hp if npc_defends else combat['npc_hp']
    next_intent = generate_npc_intent(npc_def, projected_state)

    # Store the new intent and give the turn back to the player
    await db.update_combat(player_id, {
        'turn': 'player',
        'turn_started_at': time.time(),
        'npc_intent': next_intent['type']
    })
    return message, False