Files
echoes-of-the-ash/api/game_logic.py
2025-11-07 15:27:13 +01:00

507 lines
18 KiB
Python

"""
Standalone game logic for the API.
Contains all game mechanics without bot dependencies.
"""
import random
import time
from typing import Dict, Any, Tuple, Optional, List
from . import database as db
async def move_player(player_id: int, direction: str, locations: Dict) -> Tuple[bool, str, Optional[str], int, int]:
"""
Move player in a direction.
Returns: (success, message, new_location_id, stamina_cost, distance_meters)
"""
player = await db.get_player_by_id(player_id)
if not player:
return False, "Player not found", None, 0, 0
current_location_id = player['location_id']
current_location = locations.get(current_location_id)
if not current_location:
return False, "Current location not found", None, 0, 0
# Check if direction is valid
if direction not in current_location.exits:
return False, f"You cannot go {direction} from here.", None, 0, 0
new_location_id = current_location.exits[direction]
new_location = locations.get(new_location_id)
if not new_location:
return False, "Destination not found", None, 0, 0
# Calculate total weight
from api.items import items_manager as ITEMS_MANAGER
inventory = await db.get_inventory(player_id)
total_weight = 0.0
for inv_item in inventory:
item = ITEMS_MANAGER.get_item(inv_item['item_id'])
if item:
total_weight += item.weight * inv_item['quantity']
# Calculate distance between locations (1 coordinate unit = 100 meters)
import math
coord_distance = math.sqrt(
(new_location.x - current_location.x)**2 +
(new_location.y - current_location.y)**2
)
distance = int(coord_distance * 100) # Convert to meters, round to integer
# Calculate stamina cost: base from distance, adjusted by weight and agility
base_cost = max(1, round(distance / 50)) # 50m = 1 stamina
weight_penalty = int(total_weight / 10)
agility_reduction = int(player.get('agility', 5) / 3)
stamina_cost = max(1, base_cost + weight_penalty - agility_reduction)
# Check stamina
if player['stamina'] < stamina_cost:
return False, "You're too exhausted to move. Wait for your stamina to regenerate.", None, 0, 0
# Update player location and stamina
await db.update_player(
player_id,
location_id=new_location_id,
stamina=max(0, player['stamina'] - stamina_cost)
)
return True, f"You travel {direction} to {new_location.name}.", new_location_id, stamina_cost, distance
async def inspect_area(player_id: int, location, interactables_data: Dict) -> str:
"""
Inspect the current area and return detailed information.
Returns formatted text with interactables and their actions.
"""
player = await db.get_player_by_id(player_id)
if not player:
return "Player not found"
# Check if player has enough stamina
if player['stamina'] < 1:
return "You're too exhausted to inspect the area thoroughly. Wait for your stamina to regenerate."
# Deduct stamina
await db.update_player_stamina(player_id, player['stamina'] - 1)
# Build inspection message
lines = [f"🔍 **Inspecting {location.name}**\n"]
lines.append(location.description)
lines.append("")
if location.interactables:
lines.append("**Interactables:**")
for interactable in location.interactables:
lines.append(f"• **{interactable.name}**")
if interactable.actions:
actions_text = ", ".join([f"{action.label} (⚡{action.stamina_cost})" for action in interactable.actions])
lines.append(f" Actions: {actions_text}")
lines.append("")
if location.npcs:
lines.append(f"**NPCs:** {', '.join(location.npcs)}")
lines.append("")
# Check for dropped items
dropped_items = await db.get_dropped_items(location.id)
if dropped_items:
lines.append("**Items on ground:**")
for item in dropped_items:
lines.append(f"{item['item_id']} x{item['quantity']}")
return "\n".join(lines)
async def interact_with_object(
player_id: int,
interactable_id: str,
action_id: str,
location,
items_manager
) -> Dict[str, Any]:
"""
Interact with an object using a specific action.
Returns: {success, message, items_found, damage_taken, stamina_cost}
"""
player = await db.get_player_by_id(player_id)
if not player:
return {"success": False, "message": "Player not found"}
# Find the interactable
interactable = None
for obj in location.interactables:
if obj.id == interactable_id:
interactable = obj
break
if not interactable:
return {"success": False, "message": "Object not found"}
# Find the action
action = None
for act in interactable.actions:
if act.id == action_id:
action = act
break
if not action:
return {"success": False, "message": "Action not found"}
# Check stamina
if player['stamina'] < action.stamina_cost:
return {
"success": False,
"message": f"Not enough stamina. Need {action.stamina_cost}, have {player['stamina']}."
}
# Check cooldown
cooldown_expiry = await db.get_interactable_cooldown(interactable_id)
if cooldown_expiry:
remaining = int(cooldown_expiry - time.time())
return {
"success": False,
"message": f"This object is still recovering. Wait {remaining} seconds."
}
# Deduct stamina
new_stamina = player['stamina'] - action.stamina_cost
await db.update_player_stamina(player_id, new_stamina)
# Determine outcome (simple success/failure for now)
# TODO: Implement proper skill checks
roll = random.randint(1, 100)
if roll <= 10: # 10% critical failure
outcome_key = 'critical_failure'
elif roll <= 30: # 20% failure
outcome_key = 'failure'
else: # 70% success
outcome_key = 'success'
outcome = action.outcomes.get(outcome_key)
if not outcome:
# Fallback to success if outcome not defined
outcome = action.outcomes.get('success')
if not outcome:
return {
"success": False,
"message": "Action has no defined outcomes"
}
# Process outcome
items_found = []
items_dropped = []
damage_taken = outcome.damage_taken
# Calculate current capacity
from api.main import calculate_player_capacity
current_weight, max_weight, current_volume, max_volume = await calculate_player_capacity(player_id)
# Add items to inventory (or drop if over capacity)
for item_id, quantity in outcome.items_reward.items():
item = items_manager.get_item(item_id)
if not item:
continue
item_name = item.name if item else item_id
emoji = item.emoji if item and hasattr(item, 'emoji') else ''
# Check if item has durability (unique item)
has_durability = hasattr(item, 'durability') and item.durability is not None
# For items with durability, we need to create each one individually
if has_durability:
for _ in range(quantity):
# Check if item fits in inventory
if (current_weight + item.weight <= max_weight and
current_volume + item.volume <= max_volume):
# Add to inventory with durability properties
await db.add_item_to_inventory(
player_id,
item_id,
quantity=1,
durability=item.durability,
max_durability=item.durability,
tier=getattr(item, 'tier', None)
)
items_found.append(f"{emoji} {item_name}")
current_weight += item.weight
current_volume += item.volume
else:
# Create unique_item and drop to ground
unique_item_id = await db.create_unique_item(
item_id=item_id,
durability=item.durability,
max_durability=item.durability,
tier=getattr(item, 'tier', None)
)
await db.drop_item_to_world(item_id, 1, player['location_id'], unique_item_id=unique_item_id)
items_dropped.append(f"{emoji} {item_name}")
else:
# Stackable items - process as before
item_weight = item.weight * quantity
item_volume = item.volume * quantity
if (current_weight + item_weight <= max_weight and
current_volume + item_volume <= max_volume):
# Add to inventory
await db.add_item_to_inventory(player_id, item_id, quantity)
items_found.append(f"{emoji} {item_name} x{quantity}")
current_weight += item_weight
current_volume += item_volume
else:
# Drop to ground
await db.drop_item_to_world(item_id, quantity, player['location_id'])
items_dropped.append(f"{emoji} {item_name} x{quantity}")
# Apply damage
if damage_taken > 0:
new_hp = max(0, player['hp'] - damage_taken)
await db.update_player_hp(player_id, new_hp)
# Check if player died
if new_hp <= 0:
await db.update_player(player_id, is_dead=True)
# Set cooldown (60 seconds default)
await db.set_interactable_cooldown(interactable_id, 60)
# Build message
final_message = outcome.text
if items_dropped:
final_message += f"\n⚠️ Inventory full! Dropped to ground: {', '.join(items_dropped)}"
return {
"success": True,
"message": final_message,
"items_found": items_found,
"items_dropped": items_dropped,
"damage_taken": damage_taken,
"stamina_cost": action.stamina_cost,
"new_stamina": new_stamina,
"new_hp": player['hp'] - damage_taken if damage_taken > 0 else player['hp']
}
async def use_item(player_id: int, item_id: str, items_manager) -> Dict[str, Any]:
"""
Use an item from inventory.
Returns: {success, message, effects}
"""
player = await db.get_player_by_id(player_id)
if not player:
return {"success": False, "message": "Player not found"}
# Check if player has the item
inventory = await db.get_inventory(player_id)
item_entry = None
for inv_item in inventory:
if inv_item['item_id'] == item_id:
item_entry = inv_item
break
if not item_entry:
return {"success": False, "message": "You don't have this item"}
# Get item data
item = items_manager.get_item(item_id)
if not item:
return {"success": False, "message": "Item not found in game data"}
if not item.consumable:
return {"success": False, "message": "This item cannot be used"}
# Apply item effects
effects = {}
effects_msg = []
if 'hp_restore' in item.effects:
hp_restore = item.effects['hp_restore']
old_hp = player['hp']
new_hp = min(player['max_hp'], old_hp + hp_restore)
actual_restored = new_hp - old_hp
if actual_restored > 0:
await db.update_player_hp(player_id, new_hp)
effects['hp_restored'] = actual_restored
effects_msg.append(f"+{actual_restored} HP")
if 'stamina_restore' in item.effects:
stamina_restore = item.effects['stamina_restore']
old_stamina = player['stamina']
new_stamina = min(player['max_stamina'], old_stamina + stamina_restore)
actual_restored = new_stamina - old_stamina
if actual_restored > 0:
await db.update_player_stamina(player_id, new_stamina)
effects['stamina_restored'] = actual_restored
effects_msg.append(f"+{actual_restored} Stamina")
# Consume the item (remove 1 from inventory)
await db.remove_item_from_inventory(player_id, item_id, 1)
# Track statistics
stat_updates = {"items_used": 1, "increment": True}
if 'hp_restored' in effects:
stat_updates['hp_restored'] = effects['hp_restored']
if 'stamina_restored' in effects:
stat_updates['stamina_restored'] = effects['stamina_restored']
await db.update_player_statistics(player_id, **stat_updates)
# Build message
msg = f"Used {item.name}"
if effects_msg:
msg += f" ({', '.join(effects_msg)})"
return {
"success": True,
"message": msg,
"effects": effects
}
async def pickup_item(player_id: int, item_id: int, location_id: str, quantity: int = None, items_manager=None) -> Dict[str, Any]:
"""
Pick up an item from the ground.
item_id is the dropped_item id, not the item_id field.
quantity: how many to pick up (None = all)
items_manager: ItemsManager instance to get item definitions
Returns: {success, message}
"""
# Get the dropped item by its ID
dropped_item = await db.get_dropped_item(item_id)
if not dropped_item:
return {"success": False, "message": "Item not found on ground"}
# Get item definition
item_def = items_manager.get_item(dropped_item['item_id']) if items_manager else None
if not item_def:
return {"success": False, "message": "Item data not found"}
# Determine how many to pick up
available_qty = dropped_item['quantity']
if quantity is None or quantity >= available_qty:
pickup_qty = available_qty
else:
if quantity < 1:
return {"success": False, "message": "Invalid quantity"}
pickup_qty = quantity
# Get player and calculate capacity
player = await db.get_player_by_id(player_id)
inventory = await db.get_inventory(player_id)
# Calculate current weight and volume (including equipped bag capacity)
current_weight = 0.0
current_volume = 0.0
max_weight = 10.0 # Base capacity
max_volume = 10.0 # Base capacity
for inv_item in inventory:
inv_item_def = items_manager.get_item(inv_item['item_id']) if items_manager else None
if inv_item_def:
current_weight += inv_item_def.weight * inv_item['quantity']
current_volume += inv_item_def.volume * inv_item['quantity']
# Check for equipped bags/containers that increase capacity
if inv_item['is_equipped'] and inv_item_def.stats:
max_weight += inv_item_def.stats.get('weight_capacity', 0)
max_volume += inv_item_def.stats.get('volume_capacity', 0)
# Calculate weight and volume for items to pick up
item_weight = item_def.weight * pickup_qty
item_volume = item_def.volume * pickup_qty
new_weight = current_weight + item_weight
new_volume = current_volume + item_volume
# Check limits
if new_weight > max_weight:
return {
"success": False,
"message": f"⚠️ Item too heavy! {item_def.emoji} {item_def.name} x{pickup_qty} ({item_weight:.1f}kg) would exceed capacity. Current: {current_weight:.1f}/{max_weight:.1f}kg"
}
if new_volume > max_volume:
return {
"success": False,
"message": f"⚠️ Item too large! {item_def.emoji} {item_def.name} x{pickup_qty} ({item_volume:.1f}L) would exceed capacity. Current: {current_volume:.1f}/{max_volume:.1f}L"
}
# Items fit - update dropped item quantity or remove it
if pickup_qty >= available_qty:
await db.remove_dropped_item(item_id)
else:
new_qty = available_qty - pickup_qty
await db.update_dropped_item_quantity(item_id, new_qty)
# Add to inventory (pass unique_item_id if it's a unique item)
await db.add_item_to_inventory(
player_id,
dropped_item['item_id'],
pickup_qty,
unique_item_id=dropped_item.get('unique_item_id')
)
return {
"success": True,
"message": f"Picked up {item_def.emoji} {item_def.name} x{pickup_qty}"
}
async def check_and_apply_level_up(player_id: int) -> Dict[str, Any]:
"""
Check if player has enough XP to level up and apply it.
Returns: {leveled_up: bool, new_level: int, levels_gained: int}
"""
player = await db.get_player_by_id(player_id)
if not player:
return {"leveled_up": False, "new_level": 1, "levels_gained": 0}
current_level = player['level']
current_xp = player['xp']
levels_gained = 0
# Check for level ups (can level up multiple times if enough XP)
while current_xp >= (current_level * 100):
current_xp -= (current_level * 100)
current_level += 1
levels_gained += 1
if levels_gained > 0:
# Update player with new level, remaining XP, and unspent points
new_unspent_points = player['unspent_points'] + levels_gained
await db.update_player(
player_id,
level=current_level,
xp=current_xp,
unspent_points=new_unspent_points
)
return {
"leveled_up": True,
"new_level": current_level,
"levels_gained": levels_gained
}
return {"leveled_up": False, "new_level": current_level, "levels_gained": 0}
# ============================================================================
# STATUS EFFECTS UTILITIES
# ============================================================================
def calculate_status_damage(effects: list) -> int:
"""
Calculate total damage from all status effects.
Args:
effects: List of status effect dicts
Returns:
Total damage per tick
"""
return sum(effect.get('damage_per_tick', 0) for effect in effects)