Files
echoes-of-the-ash/api/main.py
2025-11-07 15:27:13 +01:00

4240 lines
167 KiB
Python

"""
Standalone FastAPI application for Echoes of the Ashes.
All dependencies are self-contained in the api/ directory.
"""
from fastapi import FastAPI, HTTPException, Depends, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from typing import Optional, List, Dict, Any
import jwt
import bcrypt
import asyncio
from datetime import datetime, timedelta
import os
import math
from contextlib import asynccontextmanager
from pathlib import Path
# Import our standalone modules
from . import database as db
from .world_loader import load_world, World, Location
from .items import ItemsManager
from . import game_logic
from . import background_tasks
# Helper function for distance calculation
def calculate_distance(x1: float, y1: float, x2: float, y2: float) -> float:
    """
    Return the straight-line (Euclidean) distance between two map points, in meters.

    Coordinates are in map units where 1 unit = 100 meters, so the distance
    from (0, 0) to (1, 1) is about 141.4 m.
    """
    dx = x2 - x1
    dy = y2 - y1
    # Length in coordinate units first, then scaled to meters.
    length_in_units = math.sqrt(dx**2 + dy**2)
    return length_in_units * 100
def calculate_stamina_cost(distance: float, weight: float, agility: int) -> int:
    """
    Work out how much stamina a move costs.

    - Base cost: distance / 50, rounded (50m = 1 stamina, 100m = 2 stamina), at least 1
    - Weight penalty: +1 stamina per 10kg (int(weight / 10))
    - Agility reduction: -1 stamina per 3 agility points (int(agility / 3))
    - The final cost is never below 1
    """
    distance_cost = round(distance / 50)
    if distance_cost < 1:
        distance_cost = 1
    adjusted = distance_cost + int(weight / 10) - int(agility / 3)
    return adjusted if adjusted > 1 else 1
async def calculate_player_capacity(player_id: int):
    """
    Compute how much the player is carrying and how much they can carry.

    Returns: (current_weight, max_weight, current_volume, max_volume)
    """
    # Base capacity before any equipped containers are counted
    max_weight = 10.0
    max_volume = 10.0
    current_weight = 0.0
    current_volume = 0.0

    for entry in await db.get_inventory(player_id):
        definition = ITEMS_MANAGER.get_item(entry['item_id'])
        if not definition:
            # Inventory rows without a known item definition are ignored
            continue
        quantity = entry['quantity']
        current_weight += definition.weight * quantity
        current_volume += definition.volume * quantity
        # Equipped items that carry stats may raise the capacity limits
        if entry['is_equipped'] and definition.stats:
            max_weight += definition.stats.get('weight_capacity', 0)
            max_volume += definition.stats.get('volume_capacity', 0)

    return current_weight, max_weight, current_volume, max_volume
# Lifespan context manager for startup/shutdown
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Application lifespan: initialize the database and start background tasks
    on startup, then stop those tasks on shutdown.

    The shutdown step runs in a ``finally`` block so the tasks are stopped
    even when an exception (or cancellation) is thrown in at the ``yield``.
    """
    # Startup
    await db.init_db()
    print("✅ Database initialized")
    # Start background tasks (only runs in one worker due to locking)
    tasks = await background_tasks.start_background_tasks()
    if tasks:
        print(f"✅ Started {len(tasks)} background tasks in this worker")
    else:
        print("⏭️ Background tasks running in another worker")
    try:
        yield
    finally:
        # Shutdown: Stop background tasks properly
        await background_tasks.stop_background_tasks(tasks)
# Application instance; startup and shutdown work is delegated to the
# `lifespan` context manager.
app = FastAPI(
    title="Echoes of the Ash API",
    version="2.0.0",
    description="Standalone game API with web and bot support",
    lifespan=lifespan
)
# CORS configuration
# Only the listed origins are allowed; credentials are permitted and every
# method and header is accepted from those origins.
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "https://echoesoftheashgame.patacuack.net",
        "http://localhost:3000",
        "http://localhost:5173"
    ],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Mount static files for images
# The directory is looked up as "images" in the parent of this file's
# directory; when it does not exist the mount is skipped with a warning.
images_dir = Path(__file__).parent.parent / "images"
if images_dir.exists():
    app.mount("/images", StaticFiles(directory=str(images_dir)), name="images")
    print(f"✅ Mounted images directory: {images_dir}")
else:
    print(f"⚠️ Images directory not found: {images_dir}")
# JWT Configuration
# NOTE(review): when JWT_SECRET_KEY is unset, the hard-coded placeholder below
# is used to sign and verify tokens — confirm deployments always set it.
SECRET_KEY = os.getenv("JWT_SECRET_KEY", "your-secret-key-change-in-production-please")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 * 7 # 7 days
# Internal API key for bot communication
# NOTE(review): also falls back to a placeholder value when the variable is unset.
API_INTERNAL_KEY = os.getenv("API_INTERNAL_KEY", "change-this-internal-key")
# Bearer-credentials dependency consumed by get_current_user.
security = HTTPBearer()
# Load game data
# These statements run at import time, so the world and item definitions are
# loaded when this module is imported.
print("🔄 Loading game world...")
WORLD: World = load_world()
LOCATIONS: Dict[str, Location] = WORLD.locations
ITEMS_MANAGER = ItemsManager()
print(f"✅ Game world ready: {len(LOCATIONS)} locations, {len(ITEMS_MANAGER.items)} items")
# ============================================================================
# Pydantic Models
# ============================================================================
class UserRegister(BaseModel):
    # Request body for POST /api/auth/register.
    username: str
    # Plain-text password; the register endpoint hashes it with bcrypt.
    password: str
class UserLogin(BaseModel):
    # Request body for POST /api/auth/login.
    username: str
    # Plain-text password; checked against the stored bcrypt hash.
    password: str
class MoveRequest(BaseModel):
    # Request body for POST /api/game/move.
    # Direction to travel; forwarded as-is to game_logic.move_player.
    direction: str
class InteractRequest(BaseModel):
    # Request body for POST /api/game/interact.
    # Both ids are forwarded as-is to game_logic.interact_with_object.
    interactable_id: str
    action_id: str
class UseItemRequest(BaseModel):
    # Request body for POST /api/game/use_item.
    # Forwarded as-is to game_logic.use_item.
    item_id: str
class PickupItemRequest(BaseModel):
    # Request body for POST /api/game/pickup.
    item_id: int # This is the dropped_item database ID, not the item type string
    # NOTE(review): no lower bound is enforced here; zero or negative values
    # reach game_logic.pickup_item unchanged — confirm they are rejected there.
    quantity: int = 1 # How many to pick up (default: 1)
class InitiateCombatRequest(BaseModel):
    # Request body naming the enemy to fight; presumably used by a combat
    # initiation endpoint defined later in the file — TODO confirm.
    enemy_id: int # wandering_enemies.id from database
class CombatActionRequest(BaseModel):
    # Request body carrying the chosen combat action; presumably used by a
    # combat action endpoint defined later in the file — TODO confirm.
    action: str # 'attack', 'defend', 'flee'
# ============================================================================
# JWT Helper Functions
# ============================================================================
def create_access_token(data: dict) -> str:
    """Create a JWT access token.

    Args:
        data: Claims to embed in the token. The dict is copied, not mutated.

    Returns:
        The value produced by ``jwt.encode`` for the claims plus an ``exp``
        claim set ACCESS_TOKEN_EXPIRE_MINUTES from now.
    """
    # Function-scope import: the file-level import only brings in
    # datetime and timedelta.
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12 and returns a naive
    # value; an aware UTC timestamp denotes the same expiry instant.
    expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode = data.copy()
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> Dict[str, Any]:
    """Verify JWT token and return current user.

    Returns:
        The player record returned by ``db.get_player_by_id`` for the
        ``player_id`` claim of the token.

    Raises:
        HTTPException: 401 when the token has expired, cannot be validated,
            has no ``player_id`` claim, or names a player that is not found.
    """
    # Keep the try body to the decode call only, so the HTTPExceptions raised
    # below never pass through the JWT exception handlers.
    try:
        payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM])
    except jwt.ExpiredSignatureError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token has expired"
        )
    except jwt.InvalidTokenError:
        # PyJWT (the package behind `import jwt`) has no `JWTError`; the
        # original `except jwt.JWTError` raised AttributeError while being
        # evaluated, turning invalid tokens into 500 responses.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials"
        )

    player_id = payload.get("player_id")
    if player_id is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials"
        )

    player = await db.get_player_by_id(player_id)
    if player is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Player not found"
        )
    return player
# ============================================================================
# Authentication Endpoints
# ============================================================================
@app.post("/api/auth/register")
async def register(user: UserRegister):
    """Create a new web account and return an access token for it.

    Responds with 400 when the username is taken and 500 when the player
    record could not be created.
    """
    if await db.get_player_by_username(user.username):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Username already exists"
        )

    # Only the bcrypt hash is stored, never the plain password.
    raw_password = user.password.encode('utf-8')
    hashed = bcrypt.hashpw(raw_password, bcrypt.gensalt())

    new_player = await db.create_player(
        username=user.username,
        password_hash=hashed.decode('utf-8'),
        name="Survivor"
    )
    if not new_player:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to create player"
        )

    token = create_access_token({"player_id": new_player["id"]})
    summary = {
        "id": new_player["id"],
        "username": new_player["username"],
        "name": new_player["name"]
    }
    return {
        "access_token": token,
        "token_type": "bearer",
        "player": summary
    }
@app.post("/api/auth/login")
async def login(user: UserLogin):
    """Authenticate a web user and return an access token.

    Every failure (unknown username, account without a password hash, wrong
    password) yields the same 401 response.
    """
    player = await db.get_player_by_username(user.username)

    # Accounts without a stored hash cannot log in with a password.
    stored_hash = player.get('password_hash') if player else None
    if not stored_hash or not bcrypt.checkpw(
        user.password.encode('utf-8'), stored_hash.encode('utf-8')
    ):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid username or password"
        )

    token = create_access_token({"player_id": player["id"]})
    summary = {
        "id": player["id"],
        "username": player["username"],
        "name": player["name"]
    }
    return {
        "access_token": token,
        "token_type": "bearer",
        "player": summary
    }
@app.get("/api/auth/me")
async def get_me(current_user: dict = Depends(get_current_user)):
    """Return the authenticated player's profile fields."""
    # username and telegram_id are read with .get() and may come back as None;
    # every other field is read with [] and must be present.
    profile = {
        "id": current_user["id"],
        "username": current_user.get("username"),
        "telegram_id": current_user.get("telegram_id"),
    }
    required_fields = (
        "name", "level", "xp",
        "hp", "max_hp",
        "stamina", "max_stamina",
        "strength", "agility", "endurance", "intellect",
        "location_id", "is_dead", "unspent_points",
    )
    for field in required_fields:
        profile[field] = current_user[field]
    return profile
# ============================================================================
# Game Endpoints
# ============================================================================
async def _game_state_unique_item_fields(row) -> tuple:
    """Return (durability, max_durability, tier) for a row that may reference a unique item.

    All three values are None when the row has no ``unique_item_id`` or when
    ``db.get_unique_item`` finds nothing for it.
    """
    unique_item_id = row.get('unique_item_id')
    if not unique_item_id:
        return None, None, None
    unique_item = await db.get_unique_item(unique_item_id)
    if not unique_item:
        return None, None, None
    return (
        unique_item.get('durability'),
        unique_item.get('max_durability'),
        unique_item.get('tier'),
    )


@app.get("/api/game/state")
async def get_game_state(current_user: dict = Depends(get_current_user)):
    """Get complete game state for the player.

    Returns a dict with:
        player: the player record plus current/max weight and volume and the
            remaining movement cooldown in whole seconds.
        location: a summary of the player's location, or None if it is unknown.
        inventory: enriched non-equipped inventory entries.
        equipment: one entry per equipment slot (None for an empty slot).
        combat: whatever ``db.get_active_combat`` returns for the player.
        dropped_items: enriched items lying at the player's location.

    Raises:
        HTTPException: 404 when the player record cannot be loaded.
    """
    import time  # function-scope import, as in the original implementation

    player_id = current_user['id']
    # Get player data
    player = await db.get_player_by_id(player_id)
    if not player:
        raise HTTPException(status_code=404, detail="Player not found")
    # Get location
    location = LOCATIONS.get(player['location_id'])

    # Get inventory and enrich with item data (exclude equipped items)
    inventory_raw = await db.get_inventory(player_id)
    inventory = []
    total_weight = 0.0
    total_volume = 0.0
    for inv_item in inventory_raw:
        item = ITEMS_MANAGER.get_item(inv_item['item_id'])
        if not item:
            continue
        # Equipped items count for weight but not volume
        total_weight += item.weight * inv_item['quantity']
        if inv_item['is_equipped']:
            # Equipped items are reported under "equipment", not "inventory"
            continue
        total_volume += item.volume * inv_item['quantity']
        durability, max_durability, tier = await _game_state_unique_item_fields(inv_item)
        inventory.append({
            "id": inv_item['id'],
            "item_id": item.id,
            "name": item.name,
            "description": item.description,
            "type": item.type,
            "category": getattr(item, 'category', item.type),
            "quantity": inv_item['quantity'],
            "is_equipped": inv_item['is_equipped'],
            "equippable": item.equippable,
            "consumable": item.consumable,
            "weight": item.weight,
            "volume": item.volume,
            "image_path": item.image_path,
            "emoji": item.emoji,
            "slot": item.slot,
            "durability": durability,
            "max_durability": max_durability,
            "tier": tier,
            "hp_restore": item.effects.get('hp_restore') if item.effects else None,
            "stamina_restore": item.effects.get('stamina_restore') if item.effects else None,
            "damage_min": item.stats.get('damage_min') if item.stats else None,
            "damage_max": item.stats.get('damage_max') if item.stats else None
        })

    # Get equipped items
    equipment_slots = await db.get_all_equipment(player_id)
    equipment = {}
    for slot, item_data in equipment_slots.items():
        # Every slot is present in the result; it stays None unless a fully
        # resolvable item is equipped in it.
        equipment[slot] = None
        if not (item_data and item_data['item_id']):
            continue
        inv_item = await db.get_inventory_item_by_id(item_data['item_id'])
        if not inv_item:
            continue
        item_def = ITEMS_MANAGER.get_item(inv_item['item_id'])
        if not item_def:
            continue
        durability, max_durability, tier = await _game_state_unique_item_fields(inv_item)
        equipment[slot] = {
            "inventory_id": item_data['item_id'],
            "item_id": item_def.id,
            "name": item_def.name,
            "description": item_def.description,
            "emoji": item_def.emoji,
            "image_path": item_def.image_path,
            "durability": durability,
            "max_durability": max_durability,
            "tier": tier,
            "stats": item_def.stats,
            "encumbrance": item_def.encumbrance,
            "weapon_effects": getattr(item_def, 'weapon_effects', {})
        }

    # Get combat state
    combat = await db.get_active_combat(player_id)

    # Get dropped items at location and enrich with item data
    dropped_items_raw = await db.get_dropped_items(player['location_id'])
    dropped_items = []
    for dropped_item in dropped_items_raw:
        item = ITEMS_MANAGER.get_item(dropped_item['item_id'])
        if not item:
            continue
        durability, max_durability, tier = await _game_state_unique_item_fields(dropped_item)
        dropped_items.append({
            "id": dropped_item['id'],
            "item_id": item.id,
            "name": item.name,
            "description": item.description,
            "type": item.type,
            "quantity": dropped_item['quantity'],
            "image_path": item.image_path,
            "emoji": item.emoji,
            "weight": item.weight,
            "volume": item.volume,
            "durability": durability,
            "max_durability": max_durability,
            "tier": tier,
            "hp_restore": item.effects.get('hp_restore') if item.effects else None,
            "stamina_restore": item.effects.get('stamina_restore') if item.effects else None,
            "damage_min": item.stats.get('damage_min') if item.stats else None,
            "damage_max": item.stats.get('damage_max') if item.stats else None
        })

    # Calculate max weight and volume based on equipment
    max_weight = 10.0  # Base carrying capacity
    max_volume = 10.0  # Base volume capacity
    # Check for equipped backpack that increases capacity
    if equipment.get('backpack'):
        # The "stats" key always exists but may hold None, so a .get() default
        # would not help here; fall back to an empty dict explicitly.
        backpack_stats = equipment['backpack'].get('stats') or {}
        max_weight += backpack_stats.get('weight_capacity', 0)
        max_volume += backpack_stats.get('volume_capacity', 0)

    # Convert location to dict
    location_dict = None
    if location:
        location_dict = {
            "id": location.id,
            "name": location.name,
            "description": location.description,
            "exits": location.exits,
            "image_path": location.image_path,
            "x": getattr(location, 'x', 0.0),
            "y": getattr(location, 'y', 0.0),
            "tags": getattr(location, 'tags', [])
        }

    # Add weight/volume to player data
    player_with_capacity = dict(player)
    player_with_capacity['current_weight'] = round(total_weight, 2)
    player_with_capacity['max_weight'] = round(max_weight, 2)
    player_with_capacity['current_volume'] = round(total_volume, 2)
    player_with_capacity['max_volume'] = round(max_volume, 2)

    # Calculate movement cooldown (5 seconds after the last move)
    current_time = time.time()
    # A stored None is treated like "never moved" instead of raising TypeError.
    last_movement = player.get('last_movement_time', 0) or 0
    movement_cooldown = max(0, 5 - (current_time - last_movement))
    player_with_capacity['movement_cooldown'] = int(movement_cooldown)

    return {
        "player": player_with_capacity,
        "location": location_dict,
        "inventory": inventory,
        "equipment": equipment,
        "combat": combat,
        "dropped_items": dropped_items
    }
@app.get("/api/game/profile")
async def get_player_profile(current_user: dict = Depends(get_current_user)):
    """Return the player's record with carry totals, plus the full enriched inventory.

    Unlike the game-state endpoint, equipped items are included in the
    inventory list here and count toward both weight and volume.
    """
    player_id = current_user['id']
    player = await db.get_player_by_id(player_id)
    if not player:
        raise HTTPException(status_code=404, detail="Player not found")

    carried_weight = 0.0
    carried_volume = 0.0
    # Base limits; equipped items with capacity stats raise them
    weight_limit = 10.0
    volume_limit = 10.0
    inventory = []

    for row in await db.get_inventory(player_id):
        item = ITEMS_MANAGER.get_item(row['item_id'])
        if not item:
            # Rows without a known item definition are left out entirely
            continue

        count = row['quantity']
        carried_weight += item.weight * count
        carried_volume += item.volume * count

        if row['is_equipped'] and item.stats:
            weight_limit += item.stats.get('weight_capacity', 0)
            volume_limit += item.stats.get('volume_capacity', 0)

        effects = item.effects
        stats = item.stats
        inventory.append({
            "id": row['id'],
            "item_id": item.id,
            "name": item.name,
            "description": item.description,
            "type": item.type,
            "category": getattr(item, 'category', item.type),
            "quantity": count,
            "is_equipped": row['is_equipped'],
            "equippable": item.equippable,
            "consumable": item.consumable,
            "weight": item.weight,
            "volume": item.volume,
            "image_path": item.image_path,
            "emoji": item.emoji,
            "hp_restore": effects.get('hp_restore') if effects else None,
            "stamina_restore": effects.get('stamina_restore') if effects else None,
            "damage_min": stats.get('damage_min') if stats else None,
            "damage_max": stats.get('damage_max') if stats else None
        })

    # Attach the rounded totals to a copy of the player record
    player_with_capacity = dict(player)
    player_with_capacity.update(
        current_weight=round(carried_weight, 2),
        max_weight=round(weight_limit, 2),
        current_volume=round(carried_volume, 2),
        max_volume=round(volume_limit, 2),
    )
    return {
        "player": player_with_capacity,
        "inventory": inventory
    }
@app.post("/api/game/spend_point")
async def spend_stat_point(
    stat: str,
    current_user: dict = Depends(get_current_user)
):
    """Spend one unspent point to raise the given attribute by 1.

    Responds with 404 when the player cannot be loaded and 400 when no points
    are left or the stat name is not recognized.
    """
    player_id = current_user['id']
    player = await db.get_player_by_id(player_id)
    if not player:
        raise HTTPException(status_code=404, detail="Player not found")

    points_available = player['unspent_points']
    if points_available < 1:
        raise HTTPException(status_code=400, detail="No unspent points available")

    valid_stats = ['strength', 'agility', 'endurance', 'intellect']
    if stat not in valid_stats:
        raise HTTPException(status_code=400, detail=f"Invalid stat. Must be one of: {', '.join(valid_stats)}")

    raised_value = player[stat] + 1
    points_after = points_available - 1
    changes = {stat: raised_value, 'unspent_points': points_after}

    if stat == 'endurance':
        # Endurance also raises max HP by 5 and heals by 5, capped at the new max
        boosted_max_hp = player['max_hp'] + 5
        changes['max_hp'] = boosted_max_hp
        changes['hp'] = min(player['hp'] + 5, boosted_max_hp)

    await db.update_player(player_id, **changes)
    return {
        "success": True,
        "message": f"Increased {stat} by 1!",
        "new_value": raised_value,
        "remaining_points": points_after
    }
@app.get("/api/game/location")
async def get_current_location(current_user: dict = Depends(get_current_user)):
    """Get current location information.

    Returns a dict describing the player's location: basic fields, exits
    (plain list plus a detailed list with distance and stamina cost),
    interactables with cooldown status, NPCs, dropped items, other players
    who are not in combat, and corpses.

    Raises:
        HTTPException: 404 when the player's location id is not in LOCATIONS.
    """
    # Function-scope imports, hoisted out of the loops that used them.
    import json
    import sys
    import time

    location_id = current_user['location_id']
    location = LOCATIONS.get(location_id)
    if not location:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Location {location_id} not found"
        )

    # Get dropped items at location
    dropped_items = await db.get_dropped_items(location_id)
    # Get wandering enemies at location
    wandering_enemies = await db.get_wandering_enemies_in_location(location_id)

    # Format interactables for response with cooldown info
    interactables_data = []
    for interactable in location.interactables:
        # Check cooldown status
        cooldown_expiry = await db.get_interactable_cooldown(interactable.id)
        is_on_cooldown = False
        remaining_cooldown = 0
        if cooldown_expiry:
            current_time = time.time()
            if cooldown_expiry > current_time:
                is_on_cooldown = True
                remaining_cooldown = int(cooldown_expiry - current_time)
        actions_data = [
            {
                "id": action.id,
                "name": action.label,
                "stamina_cost": action.stamina_cost,
                "description": f"Costs {action.stamina_cost} stamina"
            }
            for action in interactable.actions
        ]
        interactables_data.append({
            "instance_id": interactable.id,
            "name": interactable.name,
            "image_path": interactable.image_path,
            "actions": actions_data,
            "on_cooldown": is_on_cooldown,
            "cooldown_remaining": remaining_cooldown
        })

    # Fix image URL - image_path already contains the full path from images/
    image_url = f"/{location.image_path}" if location.image_path else "/images/locations/default.png"

    # Calculate player's current weight for stamina cost adjustment
    player = await db.get_player_by_id(current_user['id'])
    inventory_raw = await db.get_inventory(current_user['id'])
    total_weight = 0.0
    for inv_item in inventory_raw:
        item = ITEMS_MANAGER.get_item(inv_item['item_id'])
        if item:
            total_weight += item.weight * inv_item['quantity']

    # Format directions with stamina costs (calculated from distance, weight, agility)
    directions_with_stamina = []
    player_agility = player.get('agility', 5)
    for direction, destination_id in location.exits.items():
        destination_loc = LOCATIONS.get(destination_id)
        if destination_loc:
            # Calculate real distance using coordinates
            distance = calculate_distance(
                location.x, location.y,
                destination_loc.x, destination_loc.y
            )
            destination_name = destination_loc.name
        else:
            # Fallback if destination not found
            distance = 500  # Default 500m
            destination_name = destination_id
        # Calculate stamina cost based on distance, weight, and agility
        stamina_cost = calculate_stamina_cost(distance, total_weight, player_agility)
        directions_with_stamina.append({
            "direction": direction,
            "stamina_cost": stamina_cost,
            "distance": int(distance),  # Truncate to whole meters
            "destination": destination_id,
            "destination_name": destination_name
        })

    # Format NPCs (wandering enemies + static NPCs from JSON)
    npcs_data = []
    # Add wandering enemies from database
    for enemy in wandering_enemies:
        npcs_data.append({
            "id": enemy['id'],
            "name": enemy['npc_id'].replace('_', ' ').title(),
            "type": "enemy",
            "level": enemy.get('level', 1),
            "is_wandering": True
        })
    # Add static NPCs from location JSON (if any)
    for npc in location.npcs:
        if isinstance(npc, dict):
            npcs_data.append({
                "id": npc.get('id', npc.get('name', 'unknown')),
                "name": npc.get('name', 'Unknown NPC'),
                "type": npc.get('type', 'npc'),
                "level": npc.get('level'),
                "is_wandering": False
            })
        else:
            npcs_data.append({
                "id": npc,
                "name": npc,
                "type": "npc",
                "is_wandering": False
            })

    # Enrich dropped items with metadata - DON'T consolidate unique items!
    items_dict = {}
    for item in dropped_items:
        item_def = ITEMS_MANAGER.get_item(item['item_id'])
        if not item_def:
            continue
        # Get unique item data if this is a unique item
        durability = None
        max_durability = None
        tier = None
        unique_item_id = item.get('unique_item_id')
        if unique_item_id:
            unique_item = await db.get_unique_item(unique_item_id)
            if unique_item:
                durability = unique_item.get('durability')
                max_durability = unique_item.get('max_durability')
                tier = unique_item.get('tier')
            # Create a unique key for unique items to prevent stacking
            dict_key = f"{item['item_id']}_{unique_item_id}"
        else:
            dict_key = item['item_id']

        if dict_key not in items_dict:
            items_dict[dict_key] = {
                "id": item['id'],  # Use first ID for pickup
                "item_id": item['item_id'],
                "name": item_def.name,
                "description": item_def.description,
                "quantity": item['quantity'],
                "emoji": item_def.emoji,
                "image_path": item_def.image_path,
                "weight": item_def.weight,
                "volume": item_def.volume,
                "durability": durability,
                "max_durability": max_durability,
                "tier": tier,
                "hp_restore": item_def.effects.get('hp_restore') if item_def.effects else None,
                "stamina_restore": item_def.effects.get('stamina_restore') if item_def.effects else None,
                "damage_min": item_def.stats.get('damage_min') if item_def.stats else None,
                "damage_max": item_def.stats.get('damage_max') if item_def.stats else None
            }
        elif not unique_item_id:
            # Only stack if it's not a unique item (stackable items only)
            items_dict[dict_key]['quantity'] += item['quantity']
    items_data = list(items_dict.values())

    # Get other players in the same location (both Telegram and web users)
    other_players = []
    try:
        async with db.engine.begin() as conn:
            stmt = db.select(db.players).where(
                db.and_(
                    db.players.c.location_id == location_id,
                    db.players.c.id != current_user['id']  # Exclude current player by database ID
                )
            )
            result = await conn.execute(stmt)
            players_rows = result.fetchall()
            for player_row in players_rows:
                # Check if player is in any combat (PvE or PvP)
                in_pve_combat = await db.get_active_combat(player_row.id)
                in_pvp_combat = await db.get_pvp_combat_by_player(player_row.id)
                # Don't show players who are in combat
                if in_pve_combat or in_pvp_combat:
                    continue
                # For web users, use username. For Telegram users, use name or telegram_id
                if player_row.username:
                    display_name = player_row.username
                elif player_row.name != "Survivor":
                    display_name = player_row.name
                else:
                    display_name = f"Player_{player_row.id}"
                # Check if PvP is possible with this player
                level_diff = abs(player['level'] - player_row.level)
                can_pvp = location.danger_level >= 3 and level_diff <= 3
                other_players.append({
                    "id": player_row.id,
                    "name": player_row.name,
                    "level": player_row.level,
                    "username": display_name,
                    "can_pvp": can_pvp,
                    "level_diff": level_diff
                })
    except Exception as e:
        # Deliberate best-effort: a failure here is logged and the response
        # simply carries whatever players were collected so far.
        print(f"Error fetching other players: {e}")

    # Get corpses at location
    npc_corpses = await db.get_npc_corpses_in_location(location_id)
    player_corpses = await db.get_player_corpses_in_location(location_id)

    # Make the data package importable. Guarded so that sys.path does not
    # grow by one entry on every request, as the unconditional insert did.
    if '/app' not in sys.path:
        sys.path.insert(0, '/app')
    from data.npcs import NPCS

    # Format corpses for response
    corpses_data = []
    for corpse in npc_corpses:
        loot = json.loads(corpse['loot_remaining']) if corpse['loot_remaining'] else []
        npc_def = NPCS.get(corpse['npc_id'])
        corpses_data.append({
            "id": f"npc_{corpse['id']}",
            "type": "npc",
            "name": f"{npc_def.name if npc_def else corpse['npc_id']} Corpse",
            "emoji": "💀",
            "loot_count": len(loot),
            "timestamp": corpse['death_timestamp']
        })
    for corpse in player_corpses:
        items = json.loads(corpse['items']) if corpse['items'] else []
        corpses_data.append({
            "id": f"player_{corpse['id']}",
            "type": "player",
            "name": f"{corpse['player_name']}'s Corpse",
            "emoji": "⚰️",
            "loot_count": len(items),
            "timestamp": corpse['death_timestamp']
        })

    return {
        "id": location.id,
        "name": location.name,
        "description": location.description,
        "image_url": image_url,
        "directions": list(location.exits.keys()),  # Keep for backwards compatibility
        "directions_detailed": directions_with_stamina,  # New detailed format
        "danger_level": location.danger_level,
        "tags": location.tags if hasattr(location, 'tags') else [],  # Include location tags
        "npcs": npcs_data,
        "items": items_data,
        "interactables": interactables_data,
        "other_players": other_players,
        "corpses": corpses_data
    }
async def _roll_arrival_encounter(player_id, new_location_id):
    """Roll for an ambush at the destination and start combat when one happens.

    Returns ``(enemy_id, combat_data)`` when a combat was created, otherwise
    None (location not dangerous, no danger config, roll failed, no NPC
    available, player already in combat, or unknown NPC definition).
    """
    import random
    import sys
    # Guarded so that sys.path does not grow by one entry on every move.
    if '/app' not in sys.path:
        sys.path.insert(0, '/app')
    from data.npcs import get_random_npc_for_location, LOCATION_DANGER, NPCS

    new_location = LOCATIONS.get(new_location_id)
    # Encounters only happen where the danger level is above 1
    if not new_location or not (new_location.danger_level > 1):
        return None

    # Get encounter rate from danger config
    danger_data = LOCATION_DANGER.get(new_location_id)
    if not danger_data:
        return None
    _, encounter_rate, _ = danger_data

    # Roll for encounter
    if not (random.random() < encounter_rate):
        return None

    # Get a random enemy for this location
    enemy_id = get_random_npc_for_location(new_location_id)
    if not enemy_id:
        return None

    # Never start a second combat for a player who is already fighting
    if await db.get_active_combat(player_id):
        return None

    npc_def = NPCS.get(enemy_id)
    if not npc_def:
        return None

    # Randomize HP
    npc_hp = random.randint(npc_def.hp_min, npc_def.hp_max)
    # Create combat directly
    await db.create_combat(
        player_id=player_id,
        npc_id=enemy_id,
        npc_hp=npc_hp,
        npc_max_hp=npc_hp,
        location_id=new_location_id,
        from_wandering=False  # This is an encounter, not wandering
    )
    # Track combat initiation
    await db.update_player_statistics(player_id, combats_initiated=1, increment=True)

    combat_data = {
        "npc_id": enemy_id,
        "npc_name": npc_def.name,
        "npc_hp": npc_hp,
        "npc_max_hp": npc_hp,
        "npc_image": f"/images/npcs/{enemy_id}.png",
        "turn": "player",
        "round": 1
    }
    return enemy_id, combat_data


@app.post("/api/game/move")
async def move(
    move_req: MoveRequest,
    current_user: dict = Depends(get_current_user)
):
    """Move player in a direction.

    On success the response holds ``success``, ``message`` and
    ``new_location_id``, plus an ``encounter`` entry when an enemy ambush
    started a combat on arrival.

    Raises:
        HTTPException: 400 when the player is in PvP combat, is still within
            the 5 second movement cooldown, or ``game_logic.move_player``
            reports failure.
    """
    import time

    player_id = current_user['id']

    # Check if player is in PvP combat
    if await db.get_pvp_combat_by_player(player_id):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot move while in PvP combat!"
        )

    # Check movement cooldown (5 seconds)
    player = await db.get_player_by_id(player_id)
    current_time = time.time()
    # A stored None is treated like "never moved" instead of raising TypeError.
    last_movement = player.get('last_movement_time', 0) or 0
    cooldown_remaining = max(0, 5 - (current_time - last_movement))
    if cooldown_remaining > 0:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"You must wait {int(cooldown_remaining)} seconds before moving again."
        )

    success, message, new_location_id, stamina_cost, distance = await game_logic.move_player(
        player_id,
        move_req.direction,
        LOCATIONS
    )
    if not success:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=message
        )

    # Update last movement time
    await db.update_player(player_id, last_movement_time=current_time)
    # Track movement statistics - use actual distance in meters
    await db.update_player_statistics(player_id, distance_walked=distance, increment=True)

    response = {
        "success": True,
        "message": message,
        "new_location_id": new_location_id
    }

    # Check for encounter upon arrival (if danger level > 1)
    encounter = await _roll_arrival_encounter(player_id, new_location_id)
    if encounter is not None:
        enemy_id, combat_data = encounter
        response["encounter"] = {
            "triggered": True,
            "enemy_id": enemy_id,
            "message": "⚠️ An enemy ambushes you upon arrival!",
            "combat": combat_data
        }
    return response
@app.post("/api/game/inspect")
async def inspect(current_user: dict = Depends(get_current_user)):
    """Describe the player's current area.

    Responds with 404 when the player's location id is not in LOCATIONS.
    """
    current_location_id = current_user['location_id']
    area = LOCATIONS.get(current_location_id)
    if not area:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Location not found"
        )

    # NOTE(review): the dropped-item lookup is kept to preserve existing
    # behavior, but its result is not used by the response.
    await db.get_dropped_items(current_location_id)

    description = await game_logic.inspect_area(
        current_user['id'],
        area,
        {}  # interactables_data - not needed with new structure
    )
    return {"success": True, "message": description}
@app.post("/api/game/interact")
async def interact(
    interact_req: InteractRequest,
    current_user: dict = Depends(get_current_user)
):
    """Run an action on an interactable object at the player's location.

    Responds with 400 while the player is in combat or when the interaction
    fails, and 404 when the player's location is unknown.
    """
    player_id = current_user['id']

    # Objects cannot be used during an active combat
    if await db.get_active_combat(player_id):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot interact with objects while in combat"
        )

    location = LOCATIONS.get(current_user['location_id'])
    if not location:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Location not found"
        )

    outcome = await game_logic.interact_with_object(
        player_id,
        interact_req.interactable_id,
        interact_req.action_id,
        location,
        ITEMS_MANAGER
    )
    if outcome['success']:
        return outcome
    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail=outcome['message']
    )
@app.post("/api/game/use_item")
async def use_item(
    use_req: UseItemRequest,
    current_user: dict = Depends(get_current_user)
):
    """Use an item from inventory.

    When the player is in combat and it is the player's turn, using an item
    is followed by an enemy attack; the result then also carries
    ``in_combat`` and ``combat_over`` (and ``player_won`` on defeat).

    Raises:
        HTTPException: 400 when ``game_logic.use_item`` reports failure.
    """
    import random
    import sys
    # Guarded so that sys.path does not grow by one entry on every request.
    if '/app' not in sys.path:
        sys.path.insert(0, '/app')
    from data.npcs import NPCS

    player_id = current_user['id']

    # Check if in combat
    combat = await db.get_active_combat(player_id)
    in_combat = combat is not None

    result = await game_logic.use_item(
        player_id,
        use_req.item_id,
        ITEMS_MANAGER
    )
    if not result['success']:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=result['message']
        )

    # If in combat, enemy gets a turn
    if in_combat and combat['turn'] == 'player':
        npc_def = NPCS.get(combat['npc_id'])
        if npc_def is not None:
            player = await db.get_player_by_id(player_id)
            # Enemy attacks; below 30% HP it hits 1.5x as hard
            npc_damage = random.randint(npc_def.damage_min, npc_def.damage_max)
            if combat['npc_hp'] / combat['npc_max_hp'] < 0.3:
                npc_damage = int(npc_damage * 1.5)
            new_player_hp = max(0, player['hp'] - npc_damage)
            combat_message = f"\n{npc_def.name} attacks for {npc_damage} damage!"
            if new_player_hp <= 0:
                combat_message += "\nYou have been defeated!"
                await db.update_player(player_id, hp=0, is_dead=True)
                await db.end_combat(player_id)
                result['combat_over'] = True
                result['player_won'] = False
            else:
                await db.update_player(player_id, hp=new_player_hp)
            result['message'] += combat_message
        # When the NPC definition is unknown the enemy turn is skipped: the
        # item has already been consumed, so failing here would turn a
        # successful use into a server error.
        result['in_combat'] = True
        result['combat_over'] = result.get('combat_over', False)
    return result
@app.post("/api/game/pickup")
async def pickup(
    pickup_req: PickupItemRequest,
    current_user: dict = Depends(get_current_user)
):
    """
    Pick up an item lying on the ground at the player's location.

    Responds with 400 carrying the game-logic message when the pickup
    fails. On success the 'items_collected' statistic is incremented by the
    requested quantity (1 when none was given) and the game-logic result is
    returned unchanged.
    """
    player_id = current_user['id']

    outcome = await game_logic.pickup_item(
        player_id,
        pickup_req.item_id,
        current_user['location_id'],
        pickup_req.quantity,
        ITEMS_MANAGER
    )
    if not outcome['success']:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=outcome['message']
        )

    # A missing or zero quantity is counted as a single item
    collected = pickup_req.quantity or 1
    await db.update_player_statistics(player_id, items_collected=collected, increment=True)

    return outcome
# ============================================================================
# EQUIPMENT SYSTEM
# ============================================================================
class EquipItemRequest(BaseModel):
    """Request body for the equip endpoint."""

    # Inventory row id (not the item definition id) of the item to equip
    inventory_id: int
class UnequipItemRequest(BaseModel):
    """Request body for the unequip endpoint."""

    # Name of the equipment slot to clear
    slot: str
class RepairItemRequest(BaseModel):
    """Request body for the repair endpoint."""

    # Inventory row id of the item to repair
    inventory_id: int
@app.post("/api/game/equip")
async def equip_item(
    equip_req: EquipItemRequest,
    current_user: dict = Depends(get_current_user)
):
    """
    Equip an inventory item into the slot named by its item definition.

    Any item already occupying that slot is unequipped first. An item whose
    definition has durability but whose inventory row has no unique item yet
    gets a unique-item record created at full durability and linked to the
    row. Encumbrance is recalculated at the end.

    Responds with 404 when the inventory row is missing, belongs to another
    player, or has no item definition, and with 400 when the item is not
    equippable or names an unknown slot.
    """
    player_id = current_user['id']
    target_inventory_id = equip_req.inventory_id

    # The row must exist and belong to the caller
    inv_item = await db.get_inventory_item_by_id(target_inventory_id)
    if not (inv_item and inv_item['player_id'] == player_id):
        raise HTTPException(status_code=404, detail="Item not found in inventory")

    item_def = ITEMS_MANAGER.get_item(inv_item['item_id'])
    if not item_def:
        raise HTTPException(status_code=404, detail="Item definition not found")

    if not (item_def.equippable and item_def.slot):
        raise HTTPException(status_code=400, detail="This item cannot be equipped")

    slot = item_def.slot
    if slot not in ('head', 'torso', 'legs', 'feet', 'weapon', 'offhand', 'backpack'):
        raise HTTPException(status_code=400, detail=f"Invalid equipment slot: {slot}")

    # Swap out whatever currently sits in the slot
    unequipped_item_name = None
    occupant = await db.get_equipped_item_in_slot(player_id, slot)
    if occupant and occupant.get('item_id'):
        previous_inventory_id = occupant['item_id']

        # Resolve the old item's display name for the response message
        previous_row = await db.get_inventory_item_by_id(previous_inventory_id)
        if previous_row:
            previous_def = ITEMS_MANAGER.get_item(previous_row['item_id'])
            unequipped_item_name = previous_def.name if previous_def else "previous item"

        await db.unequip_item(player_id, slot)
        await db.update_inventory_item(previous_inventory_id, is_equipped=False)

    # Put the new item in the slot and flag its inventory row
    await db.equip_item(player_id, slot, target_inventory_id)
    await db.update_inventory_item(target_inventory_id, is_equipped=True)

    # First equip of a durable item: create and link its unique-item record
    if inv_item.get('unique_item_id') is None and item_def.durability:
        new_unique_id = await db.create_unique_item(
            item_id=item_def.id,
            durability=item_def.durability,
            max_durability=item_def.durability,
            tier=getattr(item_def, 'tier', 1),
            unique_stats=None
        )
        await db.update_inventory_item(
            target_inventory_id,
            unique_item_id=new_unique_id
        )

    await db.update_encumbrance(player_id)

    message = (
        f"Unequipped {unequipped_item_name}, equipped {item_def.name}"
        if unequipped_item_name
        else f"Equipped {item_def.name}"
    )
    return {
        "success": True,
        "message": message,
        "slot": slot,
        "unequipped_item": unequipped_item_name
    }
@app.post("/api/game/unequip")
async def unequip_item(
    unequip_req: UnequipItemRequest,
    current_user: dict = Depends(get_current_user)
):
    """
    Unequip the item in an equipment slot.

    The item goes back into the inventory when it fits by volume; otherwise
    it is dropped on the ground at the player's location. A backpack cannot
    be unequipped while the carried items would no longer fit without its
    capacity bonus.

    Raises:
        HTTPException 400: unknown slot, empty slot, or backpack removal
            would overflow the inventory.
        HTTPException 404: the equipped inventory row or its item
            definition no longer exists.

    Returns:
        {"success", "message", "dropped"} where 'dropped' tells whether the
        item ended up on the ground.
    """
    player_id = current_user['id']
    slot = unequip_req.slot

    # Check if slot is valid
    valid_slots = ['head', 'torso', 'legs', 'feet', 'weapon', 'offhand', 'backpack']
    if slot not in valid_slots:
        raise HTTPException(status_code=400, detail=f"Invalid equipment slot: {slot}")

    # Get currently equipped item. A slot row without an item id counts as
    # empty, matching the check done when equipping.
    equipped = await db.get_equipped_item_in_slot(player_id, slot)
    if not equipped or not equipped.get('item_id'):
        raise HTTPException(status_code=400, detail=f"No item equipped in {slot} slot")

    # Get inventory item and item definition; fail cleanly instead of
    # crashing on a dangling reference.
    inv_item = await db.get_inventory_item_by_id(equipped['item_id'])
    if not inv_item:
        raise HTTPException(status_code=404, detail="Item not found in inventory")
    item_def = ITEMS_MANAGER.get_item(inv_item['item_id'])
    if not item_def:
        raise HTTPException(status_code=404, detail="Item definition not found")

    # Walk the inventory once: carried (non-equipped) items add to the used
    # volume, equipped items may add a capacity bonus (base capacity is 10).
    inventory = await db.get_inventory(player_id)
    total_volume = 0
    max_volume = 10.0
    for row in inventory:
        row_def = ITEMS_MANAGER.get_item(row['item_id'])
        if not row_def:
            continue
        if row['is_equipped']:
            if row_def.stats:
                max_volume += row_def.stats.get('volume_capacity', 0)
        else:
            total_volume += row_def.volume * row['quantity']

    # If unequipping backpack, check if items will fit
    if slot == 'backpack' and item_def.stats:
        backpack_volume = item_def.stats.get('volume_capacity', 0)
        if total_volume > (max_volume - backpack_volume):
            raise HTTPException(
                status_code=400,
                detail="Cannot unequip backpack: inventory would exceed volume capacity"
            )

    # NOTE(review): max_volume here still includes the capacity bonus of the
    # item being unequipped, so the fit check below is more lenient than the
    # capacity the player will have afterwards - confirm this is intended.
    if total_volume + item_def.volume > max_volume:
        # Drop to ground instead
        await db.unequip_item(player_id, slot)
        await db.update_inventory_item(equipped['item_id'], is_equipped=False)
        await db.drop_item(player_id, inv_item['item_id'], 1, current_user['location_id'])
        # NOTE(review): other handlers in this file call
        # db.remove_item_from_inventory - confirm db.remove_from_inventory
        # is the intended helper.
        await db.remove_from_inventory(player_id, inv_item['item_id'], 1)
        await db.update_encumbrance(player_id)
        return {
            "success": True,
            "message": f"Unequipped {item_def.name} (dropped to ground - inventory full)",
            "dropped": True
        }

    # Unequip the item
    await db.unequip_item(player_id, slot)
    await db.update_inventory_item(equipped['item_id'], is_equipped=False)
    await db.update_encumbrance(player_id)

    return {
        "success": True,
        "message": f"Unequipped {item_def.name}",
        "dropped": False
    }
@app.get("/api/game/equipment")
async def get_equipment(current_user: dict = Depends(get_current_user)):
    """
    Get all equipped items, enriched with their item-definition data.

    Returns:
        {"equipment": {slot: info_or_None}}. Every slot reported by the
        database gets a key; the value is None when the slot is empty or
        when the equipped inventory row / item definition cannot be
        resolved.
    """
    player_id = current_user['id']
    equipment = await db.get_all_equipment(player_id)

    # Enrich with item data
    enriched = {}
    for slot, item_data in equipment.items():
        # Default to "nothing here" so unresolved slots are still reported
        enriched[slot] = None
        if not item_data:
            continue

        inv_item = await db.get_inventory_item_by_id(item_data['item_id'])
        if not inv_item:
            # Dangling reference: the slot points at a row that is gone
            continue

        item_def = ITEMS_MANAGER.get_item(inv_item['item_id'])
        if not item_def:
            continue

        enriched[slot] = {
            "inventory_id": item_data['item_id'],
            "item_id": item_def.id,
            "name": item_def.name,
            "description": item_def.description,
            "emoji": item_def.emoji,
            "image_path": item_def.image_path,
            "durability": inv_item.get('durability'),
            "max_durability": inv_item.get('max_durability'),
            "tier": inv_item.get('tier', 1),
            "stats": item_def.stats,
            "encumbrance": item_def.encumbrance
        }

    return {"equipment": enriched}
@app.post("/api/game/repair_item")
async def repair_item(
    repair_req: RepairItemRequest,
    current_user: dict = Depends(get_current_user)
):
    """
    Repair an item using materials at a workbench location.

    The location must carry the 'workbench' or 'repair_station' tag. Each
    repair restores `repair_percentage` (default 25) percent of the item's
    max durability, capped at the maximum. Required tools lose durability
    and required materials are removed from the inventory.

    Raises:
        HTTPException 400: not at a workbench, item not repairable, item
            without durability tracking, item already at full durability,
            missing materials, or missing/insufficient tools.
        HTTPException 404: unknown location, inventory row or definition.
        HTTPException 500: unique-item data or repair recipe missing.

    Returns:
        Dict with the old/new/max durability, the durability actually
        restored ('repair_amount'), and the materials and tools consumed.
    """
    player_id = current_user['id']

    # Get player's location
    player = await db.get_player_by_id(player_id)
    location = LOCATIONS.get(player['location_id'])
    if not location:
        raise HTTPException(status_code=404, detail="Location not found")

    # Check if location has workbench
    location_tags = getattr(location, 'tags', [])
    if 'workbench' not in location_tags and 'repair_station' not in location_tags:
        raise HTTPException(
            status_code=400,
            detail="You need to be at a location with a workbench to repair items. Try the Gas Station!"
        )

    # Get inventory item
    # NOTE(review): other handlers in this file use
    # db.get_inventory_item_by_id for the same lookup - confirm
    # db.get_inventory_item is the intended helper.
    inv_item = await db.get_inventory_item(repair_req.inventory_id)
    if not inv_item or inv_item['player_id'] != player_id:
        raise HTTPException(status_code=404, detail="Item not found in inventory")

    # Get item definition
    item_def = ITEMS_MANAGER.get_item(inv_item['item_id'])
    if not item_def:
        raise HTTPException(status_code=404, detail="Item definition not found")

    # Check if item is repairable
    if not getattr(item_def, 'repairable', False):
        raise HTTPException(status_code=400, detail=f"{item_def.name} cannot be repaired")

    # Check if item has durability (unique item)
    if not inv_item.get('unique_item_id'):
        raise HTTPException(status_code=400, detail="This item doesn't have durability tracking")

    # Get unique item data
    unique_item = await db.get_unique_item(inv_item['unique_item_id'])
    if not unique_item:
        raise HTTPException(status_code=500, detail="Unique item data not found")

    current_durability = unique_item.get('durability', 0)
    max_durability = unique_item.get('max_durability', 100)

    # Check if item needs repair
    if current_durability >= max_durability:
        raise HTTPException(status_code=400, detail=f"{item_def.name} is already at full durability")

    # Get repair materials
    repair_materials = getattr(item_def, 'repair_materials', [])
    if not repair_materials:
        raise HTTPException(status_code=500, detail="Item repair configuration missing")

    # Get repair tools
    repair_tools = getattr(item_def, 'repair_tools', [])

    # Total owned quantity per item id. Quantities are summed because the
    # same item id can occupy several inventory rows; keeping only the last
    # row would under-count what the player owns.
    player_inventory = await db.get_inventory(player_id)
    inventory_counts = {}
    for row in player_inventory:
        inventory_counts[row['item_id']] = inventory_counts.get(row['item_id'], 0) + row['quantity']

    # Normalise the recipe once so validation, consumption and reporting all
    # agree on the quantity (a missing 'quantity' means 1).
    requirements = [
        (material['item_id'], material.get('quantity', 1))
        for material in repair_materials
    ]

    missing_materials = []
    for material_id, required_qty in requirements:
        available_qty = inventory_counts.get(material_id, 0)
        if available_qty < required_qty:
            material_def = ITEMS_MANAGER.get_item(material_id)
            material_name = material_def.name if material_def else material_id
            missing_materials.append(f"{material_name} ({available_qty}/{required_qty})")

    if missing_materials:
        raise HTTPException(
            status_code=400,
            detail=f"Missing materials: {', '.join(missing_materials)}"
        )

    # Check and consume tools if required
    tools_consumed = []
    if repair_tools:
        success, error_msg, tools_consumed = await consume_tool_durability(player_id, repair_tools, player_inventory)
        if not success:
            raise HTTPException(status_code=400, detail=error_msg)

    # Consume materials
    for material_id, required_qty in requirements:
        await db.remove_item_from_inventory(player_id, material_id, required_qty)

    # Calculate repair amount
    repair_percentage = getattr(item_def, 'repair_percentage', 25)
    nominal_repair = int((max_durability * repair_percentage) / 100)
    new_durability = min(current_durability + nominal_repair, max_durability)
    # Report what was really restored; the nominal amount overstates the
    # repair whenever the result is capped at max durability.
    repair_amount = new_durability - current_durability

    # Update unique item durability
    await db.update_unique_item(inv_item['unique_item_id'], durability=new_durability)

    # Build materials consumed message
    materials_used = []
    for material_id, required_qty in requirements:
        material_def = ITEMS_MANAGER.get_item(material_id)
        emoji = getattr(material_def, 'emoji', '📦') if material_def else '📦'
        name = material_def.name if material_def else material_id
        materials_used.append(f"{emoji} {name} x{required_qty}")

    return {
        "success": True,
        "message": f"Repaired {item_def.name}! Restored {repair_amount} durability.",
        "item_name": item_def.name,
        "old_durability": current_durability,
        "new_durability": new_durability,
        "max_durability": max_durability,
        "materials_consumed": materials_used,
        "tools_consumed": tools_consumed,
        "repair_amount": repair_amount
    }
async def reduce_armor_durability(player_id: int, damage_taken: int) -> tuple:
    """
    Reduce durability of equipped armor pieces when taking damage.

    Only equipped pieces that have a unique-item record and an 'armor' stat
    take part. Armor absorbs up to half of the incoming damage, capped at
    the summed armor value. Every participating piece loses
        max(1, int(damage_taken * proportion / armor_value * rate * 10))
    durability, where `proportion` is the piece's share of the total armor
    and `rate` is 0.1. A piece that reaches 0 durability is unequipped and
    its inventory row is removed.

    Args:
        player_id: Player whose equipment is damaged.
        damage_taken: Incoming damage before armor absorption.

    Returns: (armor_damage_absorbed, broken_armor_pieces)
    """
    equipment = await db.get_all_equipment(player_id)

    # The equip/unequip endpoints name the body slot 'torso'; 'chest' is
    # kept as well so data stored under the older name still counts.
    armor_pieces = ['head', 'torso', 'chest', 'legs', 'feet']

    total_armor = 0
    equipped_armor = []

    # Collect all equipped armor
    for slot in armor_pieces:
        armor_slot = equipment.get(slot)
        if not armor_slot:
            continue
        inv_item = await db.get_inventory_item_by_id(armor_slot['item_id'])
        if not inv_item or not inv_item.get('unique_item_id'):
            continue
        item_def = ITEMS_MANAGER.get_item(inv_item['item_id'])
        if not item_def or not item_def.stats or 'armor' not in item_def.stats:
            continue

        armor_value = item_def.stats['armor']
        total_armor += armor_value
        equipped_armor.append({
            'slot': slot,
            'inv_item_id': armor_slot['item_id'],
            'unique_item_id': inv_item['unique_item_id'],
            'item_id': inv_item['item_id'],
            'item_def': item_def,
            'armor_value': armor_value
        })

    if not equipped_armor:
        return 0, []

    # Calculate damage absorbed by armor (total armor reduces damage)
    armor_absorbed = min(damage_taken // 2, total_armor)  # Armor absorbs up to half the damage

    # Calculate durability loss for each armor piece
    base_reduction_rate = 0.1
    broken_armor = []

    for armor in equipped_armor:
        # Each piece takes durability loss proportional to its armor value
        proportion = armor['armor_value'] / total_armor if total_armor > 0 else 0

        # Higher armor value = less durability loss per hit; the max(1, ...)
        # floor means every hit costs at least one durability point.
        durability_loss = max(1, int((damage_taken * proportion / max(armor['armor_value'], 1)) * base_reduction_rate * 10))

        # Get current durability
        unique_item = await db.get_unique_item(armor['unique_item_id'])
        if not unique_item:
            continue

        current_durability = unique_item.get('durability', 0)
        new_durability = max(0, current_durability - durability_loss)
        await db.update_unique_item(armor['unique_item_id'], durability=new_durability)

        # If armor broke, unequip and remove from inventory
        if new_durability <= 0:
            await db.unequip_item(player_id, armor['slot'])
            await db.remove_inventory_row(armor['inv_item_id'])
            broken_armor.append({
                'name': armor['item_def'].name,
                'emoji': armor['item_def'].emoji,
                'slot': armor['slot']
            })

    # Equipment changed when something broke, so refresh encumbrance the
    # same way the equip/unequip endpoints do.
    if broken_armor:
        await db.update_encumbrance(player_id)

    return armor_absorbed, broken_armor
async def consume_tool_durability(user_id: int, tools: list, inventory: list) -> tuple:
    """
    Consume durability from required tools.

    All requirements are checked against the inventory before anything is
    written, so a failed requirement never leaves earlier tools already
    worn down. A tool whose durability reaches 0 is removed from the
    inventory.

    Args:
        user_id: Owner of the inventory (not used by this function; kept in
            the signature for callers).
        tools: Requirement dicts with 'item_id' and 'durability_cost'.
        inventory: Inventory rows; only rows with a 'unique_item_id' whose
            unique-item record exists are considered usable tools.

    Returns: (success, error_message, consumed_tools_info)
    """
    # Build map of available tools with durability, keyed by item id
    tools_map = {}
    for inv_item in inventory:
        unique_item_id = inv_item.get('unique_item_id')
        if not unique_item_id:
            continue
        unique_item = await db.get_unique_item(unique_item_id)
        if not unique_item:
            continue
        tools_map.setdefault(inv_item['item_id'], []).append({
            'inventory_id': inv_item['id'],
            'unique_item_id': unique_item_id,
            'durability': unique_item.get('durability', 0),
            'max_durability': unique_item.get('max_durability', 100)
        })

    # Phase 1: pick a tool for every requirement, reserving durability in
    # memory only. Reserving keeps the numbers right when two requirements
    # resolve to the same tool instance.
    consumed_tools = []
    touched = {}  # unique_item_id -> tool entry with its final durability
    for tool_req in tools:
        tool_id = tool_req['item_id']
        durability_cost = tool_req['durability_cost']
        tool_def = ITEMS_MANAGER.items.get(tool_id)
        tool_name = tool_def.name if tool_def else tool_id

        candidates = tools_map.get(tool_id)
        if not candidates:
            return False, f"Missing required tool: {tool_name}", []

        # Find tool with enough durability
        tool_found = next(
            (tool for tool in candidates if tool['durability'] >= durability_cost),
            None
        )
        if tool_found is None:
            return False, f"Tool {tool_name} doesn't have enough durability (need {durability_cost})", []

        tool_found['durability'] -= durability_cost
        touched[tool_found['unique_item_id']] = tool_found
        consumed_tools.append({
            'item_id': tool_id,
            'name': tool_name,
            'durability_cost': durability_cost,
            'broke': tool_found['durability'] <= 0
        })

    # Phase 2: every requirement is satisfiable, so persist the wear
    for unique_item_id, tool in touched.items():
        await db.update_unique_item(unique_item_id, durability=tool['durability'])
        # If tool breaks, remove from inventory
        if tool['durability'] <= 0:
            await db.remove_inventory_row(tool['inventory_id'])

    return True, "", consumed_tools
@app.get("/api/game/craftable")
async def get_craftable_items(current_user: dict = Depends(get_current_user)):
    """
    Get all craftable items with material requirements and availability.

    Recipes whose 'craft_level' is above the player's level are left out
    entirely. Each listed recipe reports, per material and per tool,
    whether the player currently has enough.

    Raises:
        HTTPException 404: the player row does not exist.
        HTTPException 500: any unexpected error while building the list.

    Returns:
        {'craftable_items': [...]} sorted with craftable recipes first,
        then by tier, then by name.
    """
    try:
        player = await db.get_player_by_id(current_user['id'])
        if not player:
            raise HTTPException(status_code=404, detail="Player not found")

        player_level = player.get('level', 1)

        # Get player's inventory with quantities
        inventory = await db.get_inventory(current_user['id'])
        inventory_counts = {}
        for inv_item in inventory:
            item_id = inv_item['item_id']
            quantity = inv_item.get('quantity', 1)
            inventory_counts[item_id] = inventory_counts.get(item_id, 0) + quantity

        # Unique-item rows looked up so far; the same tool is usually
        # required by many recipes, so each row is fetched at most once.
        unique_cache = {}

        craftable_items = []
        for item_id, item_def in ITEMS_MANAGER.items.items():
            if not getattr(item_def, 'craftable', False):
                continue

            craft_materials = getattr(item_def, 'craft_materials', [])
            if not craft_materials:
                continue

            # Don't show recipes above player level. Checked before the
            # tool lookups so hidden recipes cost no database queries.
            craft_level = getattr(item_def, 'craft_level', 1)
            if player_level < craft_level:
                continue

            # Check material availability
            materials_info = []
            can_craft = True
            for material in craft_materials:
                mat_item_id = material['item_id']
                required = material['quantity']
                available = inventory_counts.get(mat_item_id, 0)

                mat_item_def = ITEMS_MANAGER.items.get(mat_item_id)
                materials_info.append({
                    'item_id': mat_item_id,
                    'name': mat_item_def.name if mat_item_def else mat_item_id,
                    'emoji': mat_item_def.emoji if mat_item_def else '📦',
                    'required': required,
                    'available': available,
                    'has_enough': available >= required
                })
                if available < required:
                    can_craft = False

            # Check tool requirements
            craft_tools = getattr(item_def, 'craft_tools', [])
            tools_info = []
            for tool_req in craft_tools:
                tool_id = tool_req['item_id']
                durability_cost = tool_req['durability_cost']
                tool_def = ITEMS_MANAGER.items.get(tool_id)

                # Check if player has this tool with enough durability left
                has_tool = False
                tool_durability = 0
                for inv_item in inventory:
                    unique_id = inv_item.get('unique_item_id')
                    if inv_item['item_id'] != tool_id or not unique_id:
                        continue
                    if unique_id not in unique_cache:
                        unique_cache[unique_id] = await db.get_unique_item(unique_id)
                    unique = unique_cache[unique_id]
                    if unique and unique.get('durability', 0) >= durability_cost:
                        has_tool = True
                        tool_durability = unique.get('durability', 0)
                        break

                tools_info.append({
                    'item_id': tool_id,
                    'name': tool_def.name if tool_def else tool_id,
                    'emoji': tool_def.emoji if tool_def else '🔧',
                    'durability_cost': durability_cost,
                    'has_tool': has_tool,
                    'tool_durability': tool_durability
                })
                if not has_tool:
                    can_craft = False

            craftable_items.append({
                'item_id': item_id,
                'name': item_def.name,
                'emoji': item_def.emoji,
                'description': item_def.description,
                'tier': getattr(item_def, 'tier', 1),
                'type': item_def.type,
                'category': item_def.type,  # Add category for filtering
                'slot': getattr(item_def, 'slot', None),
                'materials': materials_info,
                'tools': tools_info,
                'craft_level': craft_level,
                # Always True here: recipes above the player's level were
                # skipped above. The key is kept for API compatibility.
                'meets_level': True,
                'uncraftable': getattr(item_def, 'uncraftable', False),
                'can_craft': can_craft
            })

        # Sort: craftable items first, then by tier, then by name
        craftable_items.sort(key=lambda x: (not x['can_craft'], x['tier'], x['name']))

        return {'craftable_items': craftable_items}

    except HTTPException:
        # Deliberate HTTP errors (e.g. the 404 above) must keep their status
        # code instead of being rewrapped as a 500 below.
        raise
    except Exception as e:
        print(f"Error getting craftable items: {e}")
        import traceback
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))
class CraftItemRequest(BaseModel):
    """Request body for the craft endpoint."""

    # Item definition id of the recipe to craft
    item_id: str
@app.post("/api/game/craft_item")
async def craft_item(request: CraftItemRequest, current_user: dict = Depends(get_current_user)):
    """
    Craft an item, consuming materials and creating item with random stats for unique items.

    The player must stand at a location tagged 'workbench' and meet the
    recipe's 'craft_level'. Materials are verified before any tool
    durability is spent, so a failed craft costs nothing. Items whose
    definition has durability are created as unique items with durability
    and numeric stats rolled between 90% and 110% of the base values; the
    tier (1-5) is derived from the durability roll.

    Raises:
        HTTPException 400: not at a workbench, item not craftable, level
            too low, no recipe, missing materials, or missing/insufficient
            tools.
        HTTPException 404: unknown player or item.
        HTTPException 500: any unexpected error.

    Returns:
        Dict describing the created item plus the materials and tools
        consumed.
    """
    import random

    try:
        player = await db.get_player_by_id(current_user['id'])
        if not player:
            raise HTTPException(status_code=404, detail="Player not found")

        location_id = player['location_id']
        location = LOCATIONS.get(location_id)

        # Check if player is at a workbench
        if not location or 'workbench' not in getattr(location, 'tags', []):
            raise HTTPException(status_code=400, detail="You must be at a workbench to craft items")

        # Get item definition
        item_def = ITEMS_MANAGER.items.get(request.item_id)
        if not item_def:
            raise HTTPException(status_code=404, detail="Item not found")

        if not getattr(item_def, 'craftable', False):
            raise HTTPException(status_code=400, detail="This item cannot be crafted")

        # Check level requirement
        craft_level = getattr(item_def, 'craft_level', 1)
        player_level = player.get('level', 1)
        if player_level < craft_level:
            raise HTTPException(status_code=400, detail=f"You need to be level {craft_level} to craft this item (you are level {player_level})")

        craft_materials = getattr(item_def, 'craft_materials', [])
        if not craft_materials:
            raise HTTPException(status_code=400, detail="No crafting recipe found")

        # Index the inventory: total quantity and rows per item id
        inventory = await db.get_inventory(current_user['id'])
        inventory_counts = {}
        inventory_items_map = {}
        for inv_item in inventory:
            item_id = inv_item['item_id']
            quantity = inv_item.get('quantity', 1)
            inventory_counts[item_id] = inventory_counts.get(item_id, 0) + quantity
            inventory_items_map.setdefault(item_id, []).append(inv_item)

        # Verify all materials are available BEFORE touching tools, so a
        # craft that fails for lack of materials does not wear tools down.
        for material in craft_materials:
            required = material['quantity']
            available = inventory_counts.get(material['item_id'], 0)
            if available < required:
                raise HTTPException(
                    status_code=400,
                    detail=f"Not enough {material['item_id']}. Need {required}, have {available}"
                )

        # Check tools requirement (this also spends their durability)
        craft_tools = getattr(item_def, 'craft_tools', [])
        tools_consumed = []
        if craft_tools:
            success, error_msg, tools_consumed = await consume_tool_durability(current_user['id'], craft_tools, inventory)
            if not success:
                raise HTTPException(status_code=400, detail=error_msg)

        # Consume materials, draining inventory rows one after another
        materials_used = []
        for material in craft_materials:
            item_id = material['item_id']
            quantity_needed = material['quantity']

            for inv_item in inventory_items_map[item_id]:
                if quantity_needed <= 0:
                    break

                inv_quantity = inv_item.get('quantity', 1)
                to_remove = min(quantity_needed, inv_quantity)

                if inv_quantity > to_remove:
                    # Update quantity
                    await db.update_inventory_item(
                        inv_item['id'],
                        quantity=inv_quantity - to_remove
                    )
                else:
                    # Remove entire stack - use item_id string, not inventory row id
                    await db.remove_item_from_inventory(current_user['id'], item_id, to_remove)

                quantity_needed -= to_remove

            mat_item_def = ITEMS_MANAGER.items.get(item_id)
            materials_used.append({
                'item_id': item_id,
                'name': mat_item_def.name if mat_item_def else item_id,
                'quantity': material['quantity']
            })

        # Create the item
        if getattr(item_def, 'durability', None):
            # This is a unique item - generate random stats
            base_durability = item_def.durability
            # Random durability: 90-110% of base
            random_durability = int(base_durability * random.uniform(0.9, 1.1))

            # Generate tier based on durability roll
            durability_percent = random_durability / base_durability
            if durability_percent >= 1.08:
                tier = 5  # Gold
            elif durability_percent >= 1.04:
                tier = 4  # Purple
            elif durability_percent >= 1.0:
                tier = 3  # Blue
            elif durability_percent >= 0.96:
                tier = 2  # Green
            else:
                tier = 1  # White

            # Generate random stats if item has stats
            random_stats = {}
            if getattr(item_def, 'stats', None):
                for stat_key, stat_value in item_def.stats.items():
                    if isinstance(stat_value, (int, float)):
                        # Random stat: 90-110% of base
                        random_stats[stat_key] = int(stat_value * random.uniform(0.9, 1.1))
                    else:
                        random_stats[stat_key] = stat_value

            # Create unique item in database
            unique_item_id = await db.create_unique_item(
                item_id=request.item_id,
                durability=random_durability,
                max_durability=random_durability,
                tier=tier,
                unique_stats=random_stats
            )

            # Add to inventory
            await db.add_item_to_inventory(
                player_id=current_user['id'],
                item_id=request.item_id,
                quantity=1,
                unique_item_id=unique_item_id
            )

            created_item = {
                'item_id': request.item_id,
                'name': item_def.name,
                'emoji': item_def.emoji,
                'tier': tier,
                'durability': random_durability,
                'max_durability': random_durability,
                'stats': random_stats,
                'unique': True
            }
        else:
            # Stackable item - just add to inventory
            await db.add_item_to_inventory(
                player_id=current_user['id'],
                item_id=request.item_id,
                quantity=1
            )

            created_item = {
                'item_id': request.item_id,
                'name': item_def.name,
                'emoji': item_def.emoji,
                'tier': getattr(item_def, 'tier', 1),
                'unique': False
            }

        return {
            'success': True,
            'message': f"Successfully crafted {item_def.name}!",
            'item': created_item,
            'materials_consumed': materials_used,
            'tools_consumed': tools_consumed
        }

    except HTTPException:
        # Validation errors raised above must reach the client with their
        # own status code instead of being rewrapped as a 500 below.
        raise
    except Exception as e:
        print(f"Error crafting item: {e}")
        import traceback
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))
class UncraftItemRequest(BaseModel):
    """Request body for the uncraft endpoint."""

    # Inventory row id of the item to take apart
    inventory_id: int
@app.post("/api/game/uncraft_item")
async def uncraft_item(request: UncraftItemRequest, current_user: dict = Depends(get_current_user)):
    """
    Uncraft an item, returning materials with a chance of loss.

    The player must stand at a location tagged 'workbench'. One unit of the
    item is consumed. Each yielded material is first scaled by the item's
    remaining durability ratio (below 10% nothing is yielded) and then
    rolled individually against 'uncraft_loss_chance' (default 0.3).

    Raises:
        HTTPException 400: not at a workbench, item not uncraftable, no
            uncraft recipe, or missing/insufficient tools.
        HTTPException 404: unknown player, inventory row or definition.
        HTTPException 500: any unexpected error.

    Returns:
        Dict listing the materials yielded and lost, the tools consumed,
        the loss chance and the durability ratio applied.
    """
    import random

    try:
        player = await db.get_player_by_id(current_user['id'])
        if not player:
            raise HTTPException(status_code=404, detail="Player not found")

        location_id = player['location_id']
        location = LOCATIONS.get(location_id)

        # Check if player is at a workbench
        if not location or 'workbench' not in getattr(location, 'tags', []):
            raise HTTPException(status_code=400, detail="You must be at a workbench to uncraft items")

        # Get inventory item (searching the player's own inventory also
        # guarantees the row belongs to the caller)
        inventory = await db.get_inventory(current_user['id'])
        inv_item = next(
            (row for row in inventory if row['id'] == request.inventory_id),
            None
        )
        if not inv_item:
            raise HTTPException(status_code=404, detail="Item not found in inventory")

        # Get item definition
        item_def = ITEMS_MANAGER.items.get(inv_item['item_id'])
        if not item_def:
            raise HTTPException(status_code=404, detail="Item definition not found")

        if not getattr(item_def, 'uncraftable', False):
            raise HTTPException(status_code=400, detail="This item cannot be uncrafted")

        uncraft_yield = getattr(item_def, 'uncraft_yield', [])
        if not uncraft_yield:
            raise HTTPException(status_code=400, detail="No uncraft recipe found")

        # Read the durability ratio while the inventory row still exists, so
        # the lookup cannot be affected by the removal below.
        durability_ratio = 1.0  # Default: full yield
        if inv_item.get('unique_item_id'):
            unique_item = await db.get_unique_item(inv_item['unique_item_id'])
            if unique_item:
                current_durability = unique_item.get('durability', 0)
                max_durability = unique_item.get('max_durability', 1)
                if max_durability > 0:
                    durability_ratio = current_durability / max_durability

        # Check tools requirement (this also spends their durability)
        uncraft_tools = getattr(item_def, 'uncraft_tools', [])
        tools_consumed = []
        if uncraft_tools:
            success, error_msg, tools_consumed = await consume_tool_durability(current_user['id'], uncraft_tools, inventory)
            if not success:
                raise HTTPException(status_code=400, detail=error_msg)

        # Consume exactly one unit: shrink a stack, or drop the row when it
        # holds a single item. Deleting the whole row for a stack would
        # destroy the remaining units while yielding materials for one.
        stack_size = inv_item.get('quantity', 1) or 1
        if stack_size > 1:
            await db.update_inventory_item(inv_item['id'], quantity=stack_size - 1)
        else:
            await db.remove_inventory_row(inv_item['id'])

        # Calculate materials with loss chance and durability reduction
        loss_chance = getattr(item_def, 'uncraft_loss_chance', 0.3)
        materials_yielded = []
        materials_lost = []

        for material in uncraft_yield:
            mat_def = ITEMS_MANAGER.items.get(material['item_id'])
            mat_name = mat_def.name if mat_def else material['item_id']

            # Apply durability reduction first
            base_quantity = material['quantity']
            adjusted_quantity = int(base_quantity * durability_ratio)

            # If durability is too low (< 10%), yield nothing for this material
            if durability_ratio < 0.1 or adjusted_quantity <= 0:
                materials_lost.append({
                    'item_id': material['item_id'],
                    'name': mat_name,
                    'quantity': base_quantity,
                    'reason': 'durability_too_low'
                })
                continue

            # Roll for each material separately with loss chance
            if random.random() < loss_chance:
                # Lost this material
                materials_lost.append({
                    'item_id': material['item_id'],
                    'name': mat_name,
                    'quantity': adjusted_quantity,
                    'reason': 'random_loss'
                })
            else:
                # Yield this material
                await db.add_item_to_inventory(
                    player_id=current_user['id'],
                    item_id=material['item_id'],
                    quantity=adjusted_quantity
                )
                materials_yielded.append({
                    'item_id': material['item_id'],
                    'name': mat_name,
                    'emoji': mat_def.emoji if mat_def else '📦',
                    'quantity': adjusted_quantity
                })

        message = f"Uncrafted {item_def.name}!"
        if durability_ratio < 1.0:
            message += f" (Item condition reduced yield by {int((1 - durability_ratio) * 100)}%)"
        if materials_lost:
            message += f" Lost {len(materials_lost)} material type(s) in the process."

        return {
            'success': True,
            'message': message,
            'item_name': item_def.name,
            'materials_yielded': materials_yielded,
            'materials_lost': materials_lost,
            'tools_consumed': tools_consumed,
            'loss_chance': loss_chance,
            'durability_ratio': round(durability_ratio, 2)
        }

    except HTTPException:
        # Validation errors raised above must reach the client with their
        # own status code instead of being rewrapped as a 500 below.
        raise
    except Exception as e:
        print(f"Error uncrafting item: {e}")
        import traceback
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))
def _repair_durability_percent(current_durability, max_durability):
    """Return durability as a whole percentage, or 0 when max durability is not positive."""
    if max_durability <= 0:
        return 0
    return int((current_durability / max_durability) * 100)


def _repair_materials_status(item_def, inventory_counts):
    """Compare the repair materials of ``item_def`` with the player's stock.

    Returns ``(materials_info, has_materials)`` where ``has_materials`` is
    False as soon as one material is short.
    """
    materials_info = []
    has_materials = True
    for material in getattr(item_def, 'repair_materials', []) or []:
        mat_item_def = ITEMS_MANAGER.items.get(material['item_id'])
        available = inventory_counts.get(material['item_id'], 0)
        required = material['quantity']
        materials_info.append({
            'item_id': material['item_id'],
            'name': mat_item_def.name if mat_item_def else material['item_id'],
            'emoji': mat_item_def.emoji if mat_item_def else '📦',
            'quantity': required,
            'available': available,
            'has_enough': available >= required
        })
        if available < required:
            has_materials = False
    return materials_info, has_materials


async def _repair_tools_status(item_def, inventory):
    """Check which repair tools of ``item_def`` the player holds with enough durability.

    Returns ``(tools_info, has_tools)``. A tool only counts when it is a
    unique inventory item whose durability covers the required cost.
    """
    tools_info = []
    has_tools = True
    for tool_req in getattr(item_def, 'repair_tools', []) or []:
        tool_id = tool_req['item_id']
        durability_cost = tool_req['durability_cost']
        tool_def = ITEMS_MANAGER.items.get(tool_id)

        tool_found = False
        tool_durability = 0
        for candidate in inventory:
            if candidate['item_id'] != tool_id or not candidate.get('unique_item_id'):
                continue
            unique = await db.get_unique_item(candidate['unique_item_id'])
            if unique and unique.get('durability', 0) >= durability_cost:
                tool_found = True
                tool_durability = unique.get('durability', 0)
                break

        tools_info.append({
            'item_id': tool_id,
            'name': tool_def.name if tool_def else tool_id,
            'emoji': tool_def.emoji if tool_def else '🔧',
            'durability_cost': durability_cost,
            'has_tool': tool_found,
            'tool_durability': tool_durability
        })
        if not tool_found:
            has_tools = False
    return tools_info, has_tools


async def _repair_status(item_def, unique_item, inventory, inventory_counts):
    """Build the fields shared by inventory and equipped entries of the repair list."""
    current_durability = unique_item.get('durability', 0)
    max_durability = unique_item.get('max_durability', 100)
    needs_repair = current_durability < max_durability

    materials_info, has_materials = _repair_materials_status(item_def, inventory_counts)
    tools_info, has_tools = await _repair_tools_status(item_def, inventory)

    return {
        'name': item_def.name,
        'emoji': item_def.emoji,
        'tier': unique_item.get('tier', 1),
        'current_durability': current_durability,
        'max_durability': max_durability,
        'durability_percent': _repair_durability_percent(current_durability, max_durability),
        'repair_percentage': getattr(item_def, 'repair_percentage', 25),
        'needs_repair': needs_repair,
        'materials': materials_info,
        'tools': tools_info,
        'can_repair': needs_repair and has_materials and has_tools
    }


@app.get("/api/game/repairable")
async def get_repairable_items(current_user: dict = Depends(get_current_user)):
    """Get all repairable items from inventory and equipped slots.

    Returns ``{'repairable_items': [...]}``; each entry carries durability,
    required materials/tools and a ``can_repair`` flag.

    Raises:
        HTTPException 404: the player record does not exist.
        HTTPException 400: the player's location has no 'repair_station' tag.
        HTTPException 500: any unexpected error.
    """
    try:
        player = await db.get_player_by_id(current_user['id'])
        if not player:
            raise HTTPException(status_code=404, detail="Player not found")

        location = LOCATIONS.get(player['location_id'])

        # Check if player is at a repair station
        if not location or 'repair_station' not in getattr(location, 'tags', []):
            raise HTTPException(status_code=400, detail="You must be at a repair station to repair items")

        inventory = await db.get_inventory(current_user['id'])

        # Total owned quantity per item_id, used for the material checks
        inventory_counts = {}
        for inv_item in inventory:
            item_id = inv_item['item_id']
            inventory_counts[item_id] = inventory_counts.get(item_id, 0) + inv_item.get('quantity', 1)

        repairable_items = []

        # Inventory items: only unique items carry durability
        for inv_item in inventory:
            if not inv_item.get('unique_item_id'):
                continue
            unique_item = await db.get_unique_item(inv_item['unique_item_id'])
            if not unique_item:
                continue
            item_def = ITEMS_MANAGER.items.get(inv_item['item_id'])
            if not item_def or not getattr(item_def, 'repairable', False):
                continue

            entry = {
                'inventory_id': inv_item['id'],
                'unique_item_id': inv_item['unique_item_id'],
                'item_id': inv_item['item_id']
            }
            entry.update(await _repair_status(item_def, unique_item, inventory, inventory_counts))
            entry['location'] = 'inventory'
            repairable_items.append(entry)

        # Equipped items
        for slot in ('head', 'weapon', 'torso', 'backpack', 'legs', 'feet'):
            equipped_item_id = player.get(f'equipped_{slot}')
            if not equipped_item_id:
                continue
            unique_item = await db.get_unique_item(equipped_item_id)
            if not unique_item:
                continue
            item_id = unique_item['item_id']
            item_def = ITEMS_MANAGER.items.get(item_id)
            if not item_def or not getattr(item_def, 'repairable', False):
                continue

            entry = {
                'unique_item_id': equipped_item_id,
                'item_id': item_id
            }
            entry.update(await _repair_status(item_def, unique_item, inventory, inventory_counts))
            entry['location'] = 'equipped'
            entry['slot'] = slot
            repairable_items.append(entry)

        # Sort: repairable items first (can_repair=True), then by durability
        # percent (lowest first), then by name
        repairable_items.sort(key=lambda x: (not x['can_repair'], x['durability_percent'], x['name']))

        return {'repairable_items': repairable_items}

    except HTTPException:
        # Deliberate 4xx responses must reach the client unchanged
        raise
    except Exception as e:
        print(f"Error getting repairable items: {e}")
        import traceback
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/api/game/salvageable")
async def get_salvageable_items(current_user: dict = Depends(get_current_user)):
    """Get list of salvageable (uncraftable) items from inventory with their unique stats.

    Returns ``{'salvageable_items': [...], 'at_workbench': bool}``. When the
    player's location has no 'workbench' tag the list is empty and
    ``at_workbench`` is False.

    Raises:
        HTTPException 404: the player record does not exist.
        HTTPException 500: any unexpected error.
    """
    try:
        player = await db.get_player_by_id(current_user['id'])
        if not player:
            raise HTTPException(status_code=404, detail="Player not found")

        location = LOCATIONS.get(player['location_id'])

        # Check if player is at a workbench
        if not location or 'workbench' not in getattr(location, 'tags', []):
            return {'salvageable_items': [], 'at_workbench': False}

        inventory = await db.get_inventory(current_user['id'])

        salvageable_items = []
        for inv_item in inventory:
            item_id = inv_item['item_id']
            item_def = ITEMS_MANAGER.items.get(item_id)
            if not item_def or not getattr(item_def, 'uncraftable', False):
                continue

            # Durability/tier/stats exist only for unique items
            unique_item_data = None
            if inv_item.get('unique_item_id'):
                unique_item = await db.get_unique_item(inv_item['unique_item_id'])
                if unique_item:
                    current_durability = unique_item.get('durability', 0)
                    max_durability = unique_item.get('max_durability', 1)
                    if max_durability > 0:
                        durability_percent = int((current_durability / max_durability) * 100)
                    else:
                        durability_percent = 0

                    # Definition stats first, then per-item overrides on top
                    item_stats = dict(item_def.stats) if item_def.stats else {}
                    if unique_item.get('unique_stats'):
                        item_stats.update(unique_item.get('unique_stats'))

                    unique_item_data = {
                        'current_durability': current_durability,
                        'max_durability': max_durability,
                        'durability_percent': durability_percent,
                        'tier': unique_item.get('tier', 1),
                        'unique_stats': item_stats  # Includes both base stats and unique overrides
                    }

            # Base yield as declared on the item definition (before any loss roll)
            yield_info = []
            for material in getattr(item_def, 'uncraft_yield', []):
                mat_def = ITEMS_MANAGER.items.get(material['item_id'])
                yield_info.append({
                    'item_id': material['item_id'],
                    'name': mat_def.name if mat_def else material['item_id'],
                    'emoji': mat_def.emoji if mat_def else '📦',
                    'quantity': material['quantity']
                })

            salvageable_items.append({
                'inventory_id': inv_item['id'],
                'unique_item_id': inv_item.get('unique_item_id'),
                'item_id': item_id,
                'name': item_def.name,
                'emoji': item_def.emoji,
                'tier': getattr(item_def, 'tier', 1),
                'quantity': inv_item['quantity'],
                'unique_item_data': unique_item_data,
                'base_yield': yield_info,
                'loss_chance': getattr(item_def, 'uncraft_loss_chance', 0.3)
            })

        return {
            'salvageable_items': salvageable_items,
            'at_workbench': True
        }

    except HTTPException:
        # Deliberate 4xx responses must reach the client unchanged
        raise
    except Exception as e:
        print(f"Error getting salvageable items: {e}")
        import traceback
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e)) from e
class LootCorpseRequest(BaseModel):
    """Request body for POST /api/game/loot_corpse."""
    # Corpse identifier of the form "<type>_<db id>"; the handler accepts the types 'npc' and 'player'
    corpse_id: str
    item_index: Optional[int] = None  # Index of specific item to loot (None = all)
@app.get("/api/game/corpse/{corpse_id}")
async def get_corpse_details(
    corpse_id: str,
    current_user: dict = Depends(get_current_user)
):
    """Get detailed information about a corpse's lootable items.

    ``corpse_id`` has the form ``"npc_<id>"`` or ``"player_<id>"``.

    Returns a dict with ``corpse_id``, ``type``, ``name``, ``loot_items`` and
    ``total_items``. NPC loot entries report whether the player holds the
    required tool; player-corpse entries never require a tool.

    Raises:
        HTTPException 400: malformed corpse ID, unknown corpse type, or the
            corpse is at another location.
        HTTPException 404: player or corpse not found.
    """
    import json
    import sys
    # Add the path once; inserting on every request grows sys.path without bound
    if '/app' not in sys.path:
        sys.path.insert(0, '/app')
    from data.npcs import NPCS

    # Parse corpse ID ("<type>_<db id>"); reject malformed input with a 400
    corpse_type, _, raw_db_id = corpse_id.partition('_')
    try:
        corpse_db_id = int(raw_db_id)
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid corpse ID") from None

    player = await db.get_player_by_id(current_user['id'])
    if not player:
        raise HTTPException(status_code=404, detail="Player not found")

    # Item ids the player holds, used to check tool requirements
    inventory = await db.get_inventory(player['id'])
    available_tools = {item['item_id'] for item in inventory}

    if corpse_type == 'npc':
        corpse = await db.get_npc_corpse(corpse_db_id)
        if not corpse:
            raise HTTPException(status_code=404, detail="Corpse not found")

        if corpse['location_id'] != player['location_id']:
            raise HTTPException(status_code=400, detail="Corpse not at this location")

        # Remaining loot is stored as a JSON list
        loot_remaining = json.loads(corpse['loot_remaining']) if corpse['loot_remaining'] else []

        loot_items = []
        for idx, loot_item in enumerate(loot_remaining):
            required_tool = loot_item.get('required_tool')
            item_def = ITEMS_MANAGER.get_item(loot_item['item_id'])
            tool_def = ITEMS_MANAGER.get_item(required_tool) if required_tool else None
            has_tool = required_tool is None or required_tool in available_tools

            loot_items.append({
                'index': idx,
                'item_id': loot_item['item_id'],
                'item_name': item_def.name if item_def else loot_item['item_id'],
                'emoji': item_def.emoji if item_def else '📦',
                'quantity_min': loot_item['quantity_min'],
                'quantity_max': loot_item['quantity_max'],
                'required_tool': required_tool,
                'required_tool_name': tool_def.name if tool_def else required_tool,
                'has_tool': has_tool,
                'can_loot': has_tool
            })

        npc_def = NPCS.get(corpse['npc_id'])

        return {
            'corpse_id': corpse_id,
            'type': 'npc',
            'name': f"{npc_def.name if npc_def else corpse['npc_id']} Corpse",
            'loot_items': loot_items,
            'total_items': len(loot_items)
        }

    elif corpse_type == 'player':
        corpse = await db.get_player_corpse(corpse_db_id)
        if not corpse:
            raise HTTPException(status_code=404, detail="Corpse not found")

        if corpse['location_id'] != player['location_id']:
            raise HTTPException(status_code=400, detail="Corpse not at this location")

        items = json.loads(corpse['items']) if corpse['items'] else []

        # Player corpses don't require tools; quantities are fixed
        loot_items = []
        for idx, item in enumerate(items):
            item_def = ITEMS_MANAGER.get_item(item['item_id'])
            loot_items.append({
                'index': idx,
                'item_id': item['item_id'],
                'item_name': item_def.name if item_def else item['item_id'],
                'emoji': item_def.emoji if item_def else '📦',
                'quantity_min': item['quantity'],
                'quantity_max': item['quantity'],
                'required_tool': None,
                'required_tool_name': None,
                'has_tool': True,
                'can_loot': True
            })

        return {
            'corpse_id': corpse_id,
            'type': 'player',
            'name': f"{corpse['player_name']}'s Corpse",
            'loot_items': loot_items,
            'total_items': len(loot_items)
        }

    else:
        raise HTTPException(status_code=400, detail="Invalid corpse type")
async def _stash_or_drop_loot(player, item_id, quantity, load, dropped_items):
    """Add loot to the player's inventory, or drop it at their location if it does not fit.

    ``load`` holds the running 'weight'/'volume' and the 'max_weight'/
    'max_volume' limits; it is updated when the item is stored. Dropped items
    are appended to ``dropped_items``.

    Returns True only when the item went into the inventory. An item without
    a definition is neither stored nor dropped.
    """
    item_def = ITEMS_MANAGER.get_item(item_id)
    if not item_def:
        return False

    extra_weight = item_def.weight * quantity
    extra_volume = item_def.volume * quantity
    too_heavy = load['weight'] + extra_weight > load['max_weight']
    too_bulky = load['volume'] + extra_volume > load['max_volume']

    if too_heavy or too_bulky:
        # Item doesn't fit - drop it on ground
        await db.add_dropped_item(player['location_id'], item_id, quantity)
        dropped_items.append({
            'item_id': item_id,
            'quantity': quantity,
            'emoji': item_def.emoji
        })
        return False

    # Item fits - add to inventory
    await db.add_item_to_inventory(player['id'], item_id, quantity)
    load['weight'] += extra_weight
    load['volume'] += extra_volume
    return True


def _corpse_loot_message(looted_items, dropped_items):
    """Build the user-facing summary of what was looted and what was dropped."""
    looted_parts = []
    for item in looted_items:
        item_def = ITEMS_MANAGER.get_item(item['item_id'])
        item_name = item_def.name if item_def else item['item_id']
        looted_parts.append(f"{item_def.emoji if item_def else ''} {item_name} x{item['quantity']}")

    dropped_parts = []
    for item in dropped_items:
        item_def = ITEMS_MANAGER.get_item(item['item_id'])
        item_name = item_def.name if item_def else item['item_id']
        dropped_parts.append(f"{item.get('emoji', '📦')} {item_name} x{item['quantity']}")

    message = ""
    if looted_parts:
        message = "Looted: " + ", ".join(looted_parts)
    if dropped_parts:
        if message:
            message += "\n"
        message += "⚠️ Backpack full! Dropped on ground: " + ", ".join(dropped_parts)
    if not looted_parts and not dropped_parts:
        message = "Nothing could be looted"
    return message


@app.post("/api/game/loot_corpse")
async def loot_corpse(
    req: LootCorpseRequest,
    current_user: dict = Depends(get_current_user)
):
    """Loot a corpse (NPC or player) - can loot specific item by index or all items.

    NPC loot may require a tool; the tool's durability is consumed per looted
    entry and entries whose tool is missing stay on the corpse when looting
    everything. Items that exceed the player's weight/volume capacity are
    dropped at the player's location instead of being stored.

    Raises:
        HTTPException 400: malformed corpse ID, unknown corpse type, corpse at
            another location, empty corpse, invalid item index, or a tool
            requirement that cannot be met for a single-item loot.
        HTTPException 404: player or corpse not found.
    """
    import json
    import random

    # Parse corpse ID ("<type>_<db id>"); reject malformed input with a 400
    corpse_type, _, raw_db_id = req.corpse_id.partition('_')
    try:
        corpse_db_id = int(raw_db_id)
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid corpse ID") from None

    player = await db.get_player_by_id(current_user['id'])
    if not player:
        raise HTTPException(status_code=404, detail="Player not found")

    # Running capacity, updated as items are stored
    current_weight, max_weight, current_volume, max_volume = await calculate_player_capacity(player['id'])
    load = {
        'weight': current_weight,
        'max_weight': max_weight,
        'volume': current_volume,
        'max_volume': max_volume
    }

    if corpse_type == 'npc':
        corpse = await db.get_npc_corpse(corpse_db_id)
        if not corpse:
            raise HTTPException(status_code=404, detail="Corpse not found")

        # Check if player is at the same location
        if corpse['location_id'] != player['location_id']:
            raise HTTPException(status_code=400, detail="Corpse not at this location")

        loot_remaining = json.loads(corpse['loot_remaining']) if corpse['loot_remaining'] else []
        if not loot_remaining:
            raise HTTPException(status_code=400, detail="Corpse has already been looted")

        # Inventory snapshot handed to consume_tool_durability
        inventory = await db.get_inventory(player['id'])

        looted_items = []
        dropped_items = []  # Items that couldn't fit in inventory
        tools_consumed = []  # Track tool durability consumed

        if req.item_index is not None:
            # Loot only the requested entry
            if not 0 <= req.item_index < len(loot_remaining):
                raise HTTPException(status_code=400, detail="Invalid item index")

            loot_item = loot_remaining[req.item_index]
            required_tool = loot_item.get('required_tool')

            if required_tool:
                tool_req = [{
                    'item_id': required_tool,
                    'durability_cost': loot_item.get('tool_durability_cost', 5)  # Default 5 durability per loot
                }]
                success, error_msg, tools_consumed = await consume_tool_durability(player['id'], tool_req, inventory)
                if not success:
                    raise HTTPException(status_code=400, detail=error_msg)

            quantity = random.randint(loot_item['quantity_min'], loot_item['quantity_max'])
            if quantity > 0 and await _stash_or_drop_loot(player, loot_item['item_id'], quantity, load, dropped_items):
                looted_items.append({
                    'item_id': loot_item['item_id'],
                    'quantity': quantity
                })

            # Remove this item from loot, keep others
            remaining_loot = [entry for i, entry in enumerate(loot_remaining) if i != req.item_index]
        else:
            # Loot every entry that needs no tool or whose tool the player can use
            remaining_loot = []
            for loot_item in loot_remaining:
                required_tool = loot_item.get('required_tool')

                if required_tool:
                    tool_req = [{
                        'item_id': required_tool,
                        'durability_cost': loot_item.get('tool_durability_cost', 5)
                    }]
                    success, _error_msg, consumed_info = await consume_tool_durability(player['id'], tool_req, inventory)
                    if not success:
                        # Keep in corpse
                        remaining_loot.append(loot_item)
                        continue
                    tools_consumed.extend(consumed_info)
                    # Refresh inventory after tool consumption
                    inventory = await db.get_inventory(player['id'])

                quantity = random.randint(loot_item['quantity_min'], loot_item['quantity_max'])
                if quantity > 0 and await _stash_or_drop_loot(player, loot_item['item_id'], quantity, load, dropped_items):
                    looted_items.append({
                        'item_id': loot_item['item_id'],
                        'quantity': quantity
                    })

        # Update or remove corpse
        if remaining_loot:
            await db.update_npc_corpse(corpse_db_id, json.dumps(remaining_loot))
        else:
            await db.remove_npc_corpse(corpse_db_id)

        message = _corpse_loot_message(looted_items, dropped_items)
        if remaining_loot and req.item_index is None:
            message += f"\n{len(remaining_loot)} item(s) require tools to extract"

        return {
            "success": True,
            "message": message,
            "looted_items": looted_items,
            "dropped_items": dropped_items,
            "tools_consumed": tools_consumed,
            "corpse_empty": len(remaining_loot) == 0,
            "remaining_count": len(remaining_loot)
        }

    elif corpse_type == 'player':
        corpse = await db.get_player_corpse(corpse_db_id)
        if not corpse:
            raise HTTPException(status_code=404, detail="Corpse not found")

        if corpse['location_id'] != player['location_id']:
            raise HTTPException(status_code=400, detail="Corpse not at this location")

        items = json.loads(corpse['items']) if corpse['items'] else []
        if not items:
            raise HTTPException(status_code=400, detail="Corpse has no items")

        looted_items = []
        remaining_items = []
        dropped_items = []  # Items that couldn't fit in inventory

        if req.item_index is not None:
            # Loot only the requested entry
            if not 0 <= req.item_index < len(items):
                raise HTTPException(status_code=400, detail="Invalid item index")

            item = items[req.item_index]
            if await _stash_or_drop_loot(player, item['item_id'], item['quantity'], load, dropped_items):
                looted_items.append(item)

            # Remove this item, keep others
            remaining_items = [it for i, it in enumerate(items) if i != req.item_index]
        else:
            # Loot all items; nothing stays on the corpse in this mode
            for item in items:
                if await _stash_or_drop_loot(player, item['item_id'], item['quantity'], load, dropped_items):
                    looted_items.append(item)

        # Update or remove corpse
        if remaining_items:
            await db.update_player_corpse(corpse_db_id, json.dumps(remaining_items))
        else:
            await db.remove_player_corpse(corpse_db_id)

        return {
            "success": True,
            "message": _corpse_loot_message(looted_items, dropped_items),
            "looted_items": looted_items,
            "dropped_items": dropped_items,
            "corpse_empty": len(remaining_items) == 0,
            "remaining_count": len(remaining_items)
        }

    else:
        raise HTTPException(status_code=400, detail="Invalid corpse type")
# ============================================================================
# Combat Endpoints
# ============================================================================
@app.get("/api/game/combat")
async def get_combat_status(current_user: dict = Depends(get_current_user)):
    """Get current combat status.

    Returns ``{"in_combat": False}`` when the user has no active combat,
    otherwise ``{"in_combat": True, "combat": {...}}`` with the NPC's id,
    display name, HP, image path, whose turn it is and the round number.
    """
    combat = await db.get_active_combat(current_user['id'])
    if not combat:
        return {"in_combat": False}

    # Load NPC definitions from the data.npcs module.
    # Add the path once; inserting on every request grows sys.path without bound.
    import sys
    if '/app' not in sys.path:
        sys.path.insert(0, '/app')
    from data.npcs import NPCS

    npc_id = combat['npc_id']
    npc_def = NPCS.get(npc_id)

    return {
        "in_combat": True,
        "combat": {
            "npc_id": npc_id,
            # Fall back to a prettified id when the definition is missing
            "npc_name": npc_def.name if npc_def else npc_id.replace('_', ' ').title(),
            "npc_hp": combat['npc_hp'],
            "npc_max_hp": combat['npc_max_hp'],
            "npc_image": f"/images/npcs/{npc_id}.png" if npc_def else None,
            "turn": combat['turn'],
            "round": combat.get('round', 1)
        }
    }
@app.post("/api/game/combat/initiate")
async def initiate_combat(
    req: InitiateCombatRequest,
    current_user: dict = Depends(get_current_user)
):
    """Start combat with a wandering enemy.

    Looks up the wandering enemy by ``req.enemy_id``, rolls its HP between the
    NPC definition's ``hp_min`` and ``hp_max``, creates the combat record,
    removes the enemy from the wandering table and records the initiation in
    the player's statistics.

    Raises:
        HTTPException 400: the user is already in combat.
        HTTPException 404: the enemy row or its NPC definition is missing.
    """
    import random
    import sys
    # Add the path once; inserting on every request grows sys.path without bound
    if '/app' not in sys.path:
        sys.path.insert(0, '/app')
    from data.npcs import NPCS
    from sqlalchemy import delete, select

    # Check if already in combat
    existing_combat = await db.get_active_combat(current_user['id'])
    if existing_combat:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Already in combat"
        )

    # Get enemy from wandering_enemies table
    async with db.DatabaseSession() as session:
        stmt = select(db.wandering_enemies).where(db.wandering_enemies.c.id == req.enemy_id)
        result = await session.execute(stmt)
        enemy = result.fetchone()

    if not enemy:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Enemy not found"
        )

    # Get NPC definition
    npc_def = NPCS.get(enemy.npc_id)
    if not npc_def:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="NPC definition not found"
        )

    # Randomize HP; the rolled value is also the NPC's max HP for this fight
    npc_hp = random.randint(npc_def.hp_min, npc_def.hp_max)

    # NOTE(review): the location comes from the auth dict, and the enemy's own
    # location is not compared with it here - confirm this is intended.
    await db.create_combat(
        player_id=current_user['id'],
        npc_id=enemy.npc_id,
        npc_hp=npc_hp,
        npc_max_hp=npc_hp,
        location_id=current_user['location_id'],
        from_wandering=True
    )

    # Remove the wandering enemy from the location
    async with db.DatabaseSession() as session:
        stmt = delete(db.wandering_enemies).where(db.wandering_enemies.c.id == req.enemy_id)
        await session.execute(stmt)
        await session.commit()

    # Track combat initiation
    await db.update_player_statistics(current_user['id'], combats_initiated=1, increment=True)

    return {
        "success": True,
        "message": f"Combat started with {npc_def.name}!",
        "combat": {
            "npc_id": enemy.npc_id,
            "npc_name": npc_def.name,
            "npc_hp": npc_hp,
            "npc_max_hp": npc_hp,
            "npc_image": f"/images/npcs/{enemy.npc_id}.png",
            "turn": "player",
            "round": 1
        }
    }
async def _respawn_wandering_enemy(combat):
    """Re-insert the combat's NPC into wandering_enemies at the combat location.

    The new row despawns 300 seconds (5 minutes) after it is created. Only the
    NPC id and location are stored; the NPC's current HP is not carried over.
    """
    import time
    from sqlalchemy import insert

    now = time.time()
    async with db.DatabaseSession() as session:
        stmt = insert(db.wandering_enemies).values(
            npc_id=combat['npc_id'],
            location_id=combat['location_id'],
            spawn_timestamp=now,
            despawn_timestamp=now + 300
        )
        await session.execute(stmt)
        await session.commit()


@app.post("/api/game/combat/action")
async def combat_action(
    req: CombatActionRequest,
    current_user: dict = Depends(get_current_user)
):
    """Perform a combat action.

    Handles ``req.action`` values 'attack' and 'flee'. Any other value changes
    nothing and returns the current combat state with an empty message.

    Returns a dict with ``success``, ``message``, ``combat_over``,
    ``player_won`` (None while combat continues) and ``combat`` (the refreshed
    combat state, or None when combat ended).

    Raises:
        HTTPException 400: the user is not in combat or it is not their turn.
        HTTPException 404: the player record or the NPC definition is missing.
    """
    import random
    import sys
    # Add the path once; inserting on every request grows sys.path without bound
    if '/app' not in sys.path:
        sys.path.insert(0, '/app')
    from data.npcs import NPCS

    # Get active combat
    combat = await db.get_active_combat(current_user['id'])
    if not combat:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Not in combat"
        )

    if combat['turn'] != 'player':
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Not your turn"
        )

    # Get player and NPC data
    player = await db.get_player_by_id(current_user['id'])
    if not player:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Player not found")

    npc_def = NPCS.get(combat['npc_id'])
    if not npc_def:
        # Every branch below reads npc_def attributes; fail cleanly instead of crashing
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="NPC definition not found"
        )

    result_message = ""
    combat_over = False
    player_won = False

    if req.action == 'attack':
        # Calculate player damage
        base_damage = 5
        strength_bonus = player['strength'] // 2
        level_bonus = player['level']
        weapon_damage = 0
        weapon_effects = {}
        weapon_inv_id = None
        inv_item = None

        # Check for equipped weapon
        equipment = await db.get_all_equipment(player['id'])
        weapon_slot = equipment.get('weapon')
        if weapon_slot:
            inv_item = await db.get_inventory_item_by_id(weapon_slot['item_id'])
            if inv_item:
                weapon_def = ITEMS_MANAGER.get_item(inv_item['item_id'])
                if weapon_def and weapon_def.stats:
                    weapon_damage = random.randint(
                        weapon_def.stats.get('damage_min', 0),
                        weapon_def.stats.get('damage_max', 0)
                    )
                    weapon_effects = getattr(weapon_def, 'weapon_effects', {})
                    weapon_inv_id = weapon_slot['item_id']

        # Check encumbrance penalty (higher encumbrance = chance to miss)
        encumbrance = player.get('encumbrance', 0)
        attack_failed = False
        if encumbrance > 0:
            miss_chance = min(0.3, encumbrance * 0.05)  # Max 30% miss chance
            if random.random() < miss_chance:
                attack_failed = True

        variance = random.randint(-2, 2)
        damage = max(1, base_damage + strength_bonus + level_bonus + weapon_damage + variance)
        # A missed attack deals nothing, so it must not count towards statistics
        damage_dealt = 0 if attack_failed else damage

        if attack_failed:
            result_message = "Your attack misses due to heavy encumbrance! "
            new_npc_hp = combat['npc_hp']
        else:
            # Apply damage to NPC
            new_npc_hp = max(0, combat['npc_hp'] - damage)
            result_message = f"You attack for {damage} damage! "

            # Apply weapon effects
            if weapon_effects and 'bleeding' in weapon_effects:
                bleeding = weapon_effects['bleeding']
                if random.random() < bleeding.get('chance', 0):
                    # Bleeding is applied as immediate bonus damage (no lasting effect is stored)
                    bleed_damage = bleeding.get('damage', 0)
                    new_npc_hp = max(0, new_npc_hp - bleed_damage)
                    result_message += f"💉 Bleeding effect! +{bleed_damage} damage! "

            # Decrease weapon durability (from unique_item).
            # NOTE(review): placed in the hit branch, so a miss does not wear the
            # weapon - confirm against the original indentation.
            if weapon_inv_id and inv_item.get('unique_item_id'):
                new_durability = await db.decrease_unique_item_durability(inv_item['unique_item_id'], 1)
                if new_durability is None:
                    # Weapon broke (unique_item was deleted, cascades to inventory)
                    result_message += "\n⚠️ Your weapon broke! "
                    await db.unequip_item(player['id'], 'weapon')

        if new_npc_hp <= 0:
            # NPC defeated
            result_message += f"{npc_def.name} has been defeated!"
            combat_over = True
            player_won = True

            # Award XP
            xp_gained = npc_def.xp_reward
            new_xp = player['xp'] + xp_gained
            result_message += f"\n+{xp_gained} XP"

            await db.update_player(player['id'], xp=new_xp)

            # Track kill statistics
            await db.update_player_statistics(player['id'], enemies_killed=1, damage_dealt=damage_dealt, increment=True)

            # Check for level up
            level_up_result = await game_logic.check_and_apply_level_up(player['id'])
            if level_up_result['leveled_up']:
                result_message += f"\n🎉 Level Up! You are now level {level_up_result['new_level']}!"
                result_message += f"\n+{level_up_result['levels_gained']} stat point(s) to spend!"

            # Create corpse with loot
            import json
            corpse_loot = getattr(npc_def, 'corpse_loot', [])
            # Convert CorpseLoot objects to dicts; entries that are already dicts pass through
            corpse_loot_dicts = []
            for loot in corpse_loot:
                if hasattr(loot, '__dict__'):
                    corpse_loot_dicts.append({
                        'item_id': loot.item_id,
                        'quantity_min': loot.quantity_min,
                        'quantity_max': loot.quantity_max,
                        'required_tool': loot.required_tool
                    })
                else:
                    corpse_loot_dicts.append(loot)

            await db.create_npc_corpse(
                npc_id=combat['npc_id'],
                location_id=player['location_id'],
                loot_remaining=json.dumps(corpse_loot_dicts)
            )

            await db.end_combat(player['id'])

        else:
            # NPC's turn; it hits 50% harder once below 30% of its max HP
            npc_damage = random.randint(npc_def.damage_min, npc_def.damage_max)
            if new_npc_hp / combat['npc_max_hp'] < 0.3:
                npc_damage = int(npc_damage * 1.5)

            # Reduce armor durability and calculate absorbed damage
            armor_absorbed, broken_armor = await reduce_armor_durability(player['id'], npc_damage)
            actual_damage = max(1, npc_damage - armor_absorbed)  # Always at least 1 damage

            new_player_hp = max(0, player['hp'] - actual_damage)
            result_message += f"\n{npc_def.name} attacks for {npc_damage} damage!"
            if armor_absorbed > 0:
                result_message += f" (Armor absorbed {armor_absorbed} damage)"

            # Report broken armor
            if broken_armor:
                for armor in broken_armor:
                    result_message += f"\n💔 Your {armor['emoji']} {armor['name']} broke!"

            if new_player_hp <= 0:
                result_message += "\nYou have been defeated!"
                combat_over = True
                await db.update_player(player['id'], hp=0, is_dead=True)
                await db.update_player_statistics(player['id'], deaths=1, damage_taken=actual_damage, increment=True)
                await db.end_combat(player['id'])
            else:
                await db.update_player(player['id'], hp=new_player_hp)
                await db.update_player_statistics(
                    player['id'], damage_taken=actual_damage, damage_dealt=damage_dealt, increment=True
                )
                await db.update_combat(player['id'], {
                    'npc_hp': new_npc_hp,
                    'turn': 'player'
                })

    elif req.action == 'flee':
        # 50% chance to flee
        if random.random() < 0.5:
            result_message = "You successfully fled from combat!"
            combat_over = True
            player_won = False  # Fled, not won

            # Track successful flee
            await db.update_player_statistics(player['id'], successful_flees=1, increment=True)

            # Put the enemy back at the combat location if it came from wandering
            if combat.get('from_wandering_enemy'):
                await _respawn_wandering_enemy(combat)

            await db.end_combat(player['id'])
        else:
            # Failed to flee, NPC attacks (no armor absorption on this path)
            npc_damage = random.randint(npc_def.damage_min, npc_def.damage_max)
            new_player_hp = max(0, player['hp'] - npc_damage)
            result_message = f"Failed to flee! {npc_def.name} attacks for {npc_damage} damage!"

            if new_player_hp <= 0:
                result_message += "\nYou have been defeated!"
                combat_over = True
                await db.update_player(player['id'], hp=0, is_dead=True)
                await db.update_player_statistics(
                    player['id'], deaths=1, failed_flees=1, damage_taken=npc_damage, increment=True
                )

                # Respawn enemy if from wandering
                if combat.get('from_wandering_enemy'):
                    await _respawn_wandering_enemy(combat)

                await db.end_combat(player['id'])
            else:
                # Player survived, update HP and turn back to player
                await db.update_player(player['id'], hp=new_player_hp)
                await db.update_player_statistics(player['id'], failed_flees=1, damage_taken=npc_damage, increment=True)
                await db.update_combat(player['id'], {'turn': 'player'})

    # Get updated combat state if not over
    updated_combat = None
    if not combat_over:
        raw_combat = await db.get_active_combat(current_user['id'])
        if raw_combat:
            updated_combat = {
                "npc_id": raw_combat['npc_id'],
                "npc_name": npc_def.name,
                "npc_hp": raw_combat['npc_hp'],
                "npc_max_hp": raw_combat['npc_max_hp'],
                "npc_image": f"/images/npcs/{raw_combat['npc_id']}.png",
                "turn": raw_combat['turn']
            }

    return {
        "success": True,
        "message": result_message,
        "combat_over": combat_over,
        "player_won": player_won if combat_over else None,
        "combat": updated_combat if updated_combat else None
    }
# ============================================================================
# PvP Combat Endpoints
# ============================================================================
class PvPCombatInitiateRequest(BaseModel):
    """Request body for POST /api/game/pvp/initiate."""
    # ID of the player to challenge; the handler looks it up with db.get_player_by_id.
    target_player_id: int
@app.post("/api/game/pvp/initiate")
async def initiate_pvp_combat(
    req: PvPCombatInitiateRequest,
    current_user: dict = Depends(get_current_user)
):
    """Initiate PvP combat with another player.

    The challenge is accepted only when all of the following hold:
    - the target is a different player than the caller,
    - neither player is already in PvE or PvP combat,
    - both players are in the same location,
    - that location has a danger level of at least 3,
    - the players are within 3 levels of each other.

    Returns:
        A dict with "success", a "message" for the attacker and the
        "pvp_combat" value returned by ``db.create_pvp_combat``.

    Raises:
        HTTPException: 404 if either player does not exist, 400 if any of
            the conditions above is not met.
    """
    # Get attacker (current user)
    attacker = await db.get_player_by_id(current_user['id'])
    if not attacker:
        raise HTTPException(status_code=404, detail="Player not found")

    # Without this guard a player targeting their own ID passes every check
    # below (same location, level difference 0) and ends up fighting themselves.
    if req.target_player_id == attacker['id']:
        raise HTTPException(status_code=400, detail="You cannot attack yourself")

    # Check if attacker is already in combat
    existing_combat = await db.get_active_combat(attacker['id'])
    if existing_combat:
        raise HTTPException(status_code=400, detail="You are already in PvE combat")
    existing_pvp = await db.get_pvp_combat_by_player(attacker['id'])
    if existing_pvp:
        raise HTTPException(status_code=400, detail="You are already in PvP combat")

    # Get defender (target player)
    defender = await db.get_player_by_id(req.target_player_id)
    if not defender:
        raise HTTPException(status_code=404, detail="Target player not found")

    # Check if defender is in combat
    defender_pve = await db.get_active_combat(defender['id'])
    if defender_pve:
        raise HTTPException(status_code=400, detail="Target player is in PvE combat")
    defender_pvp = await db.get_pvp_combat_by_player(defender['id'])
    if defender_pvp:
        raise HTTPException(status_code=400, detail="Target player is in PvP combat")

    # Check same location
    if attacker['location_id'] != defender['location_id']:
        raise HTTPException(status_code=400, detail="Target player is not in your location")

    # Check danger level (>= 3 required for PvP)
    location = LOCATIONS.get(attacker['location_id'])
    if not location or location.danger_level < 3:
        raise HTTPException(status_code=400, detail="PvP combat is only allowed in dangerous zones (danger level >= 3)")

    # Check level difference (+/- 3 levels)
    level_diff = abs(attacker['level'] - defender['level'])
    if level_diff > 3:
        raise HTTPException(
            status_code=400,
            detail=f"Level difference too large! You can only fight players within 3 levels (target is level {defender['level']})"
        )

    # Create PvP combat
    pvp_combat = await db.create_pvp_combat(
        attacker_id=attacker['id'],
        defender_id=defender['id'],
        location_id=attacker['location_id'],
        turn_timeout=300  # 5 minutes
    )

    # Track PvP combat initiation
    await db.update_player_statistics(attacker['id'], pvp_combats_initiated=1, increment=True)

    return {
        "success": True,
        "message": f"You have initiated combat with {defender['username']}! They get the first turn.",
        "pvp_combat": pvp_combat
    }
@app.get("/api/game/pvp/status")
async def get_pvp_combat_status(current_user: dict = Depends(get_current_user)):
    """Get the current PvP combat status for the calling player.

    Returns ``{"in_pvp_combat": False, "pvp_combat": None}`` when the player
    has no PvP combat or has already acknowledged it. Otherwise returns both
    players' summaries, whose turn it is, the seconds left in the turn and
    the end-of-combat flags.

    If the caller's own turn has timed out and the combat is still running,
    the turn is passed to the opponent before the status is reported.

    Raises:
        HTTPException: 404 if either participant's player record is missing.
    """
    import time

    def _is_over(combat):
        # Same expression the response has always used for "combat_over".
        return combat.get('attacker_fled', False) or combat.get('defender_fled', False) or \
            combat['attacker_hp'] <= 0 or combat['defender_hp'] <= 0

    pvp_combat = await db.get_pvp_combat_by_player(current_user['id'])
    if not pvp_combat:
        return {"in_pvp_combat": False, "pvp_combat": None}

    # Once the caller has acknowledged the combat it is no longer shown to them.
    is_attacker = pvp_combat['attacker_id'] == current_user['id']
    ack_field = 'attacker_acknowledged' if is_attacker else 'defender_acknowledged'
    if pvp_combat.get(ack_field, False):
        return {"in_pvp_combat": False, "pvp_combat": None}

    # Get both players' data
    attacker = await db.get_player_by_id(pvp_combat['attacker_id'])
    defender = await db.get_player_by_id(pvp_combat['defender_id'])
    if not attacker or not defender:
        # Previously this surfaced as a TypeError when building the response.
        raise HTTPException(status_code=404, detail="Player not found")

    own_role = 'attacker' if is_attacker else 'defender'
    your_turn = pvp_combat['turn'] == own_role

    # Calculate time remaining for turn
    time_elapsed = time.time() - pvp_combat['turn_started_at']
    time_remaining = max(0, pvp_combat['turn_timeout_seconds'] - time_elapsed)

    # Auto-advance if time expired. A combat that is already over is left
    # untouched so a finished fight's turn state is not rewritten by polling.
    if time_remaining == 0 and your_turn and not _is_over(pvp_combat):
        new_turn = 'defender' if is_attacker else 'attacker'
        await db.update_pvp_combat(pvp_combat['id'], {
            'turn': new_turn,
            'turn_started_at': time.time()
        })
        pvp_combat = await db.get_pvp_combat_by_id(pvp_combat['id'])
        your_turn = False
        time_remaining = pvp_combat['turn_timeout_seconds']

    return {
        "in_pvp_combat": True,
        "pvp_combat": {
            "id": pvp_combat['id'],
            "attacker": {
                "id": attacker['id'],
                "username": attacker['username'],
                "level": attacker['level'],
                "hp": pvp_combat['attacker_hp'],
                "max_hp": attacker['max_hp']
            },
            "defender": {
                "id": defender['id'],
                "username": defender['username'],
                "level": defender['level'],
                "hp": pvp_combat['defender_hp'],
                "max_hp": defender['max_hp']
            },
            "is_attacker": is_attacker,
            "your_turn": your_turn,
            "current_turn": pvp_combat['turn'],
            "time_remaining": int(time_remaining),
            "location_id": pvp_combat['location_id'],
            "last_action": pvp_combat.get('last_action'),
            "combat_over": _is_over(pvp_combat),
            "attacker_fled": pvp_combat.get('attacker_fled', False),
            "defender_fled": pvp_combat.get('defender_fled', False)
        }
    }
class PvPAcknowledgeRequest(BaseModel):
    """Request body for POST /api/game/pvp/acknowledge."""
    # ID of the PvP combat being acknowledged; passed to db.acknowledge_pvp_combat.
    combat_id: int
@app.post("/api/game/pvp/acknowledge")
async def acknowledge_pvp_combat(
    req: PvPAcknowledgeRequest,
    current_user: dict = Depends(get_current_user)
):
    """Record that the calling player has acknowledged the end of a PvP combat.

    Forwards the combat ID from the request and the caller's player ID to
    ``db.acknowledge_pvp_combat`` and always reports success.
    """
    acknowledging_player_id = current_user['id']
    await db.acknowledge_pvp_combat(req.combat_id, acknowledging_player_id)
    return {"success": True}
class PvPCombatActionRequest(BaseModel):
    """Request body for POST /api/game/pvp/action."""
    # NOTE(review): the action handler only branches on 'attack' and 'flee';
    # 'use_item' is listed here but has no branch in pvp_combat_action.
    action: str  # 'attack', 'flee', 'use_item'
    item_id: Optional[str] = None  # For use_item action
@app.post("/api/game/pvp/action")
async def pvp_combat_action(
req: PvPCombatActionRequest,
current_user: dict = Depends(get_current_user)
):
"""Perform a PvP combat action for the calling player.

Handles the 'attack' and 'flee' actions. Any other action value matches
neither branch and returns success with an empty message.

Returns a dict with "success", "message", "combat_over" and "winner_id"
(the acting player's ID after a killing blow, otherwise None).

Raises:
HTTPException: 400 if the caller is not in PvP combat or it is not
their turn.
"""
import random
import time
# Get PvP combat
pvp_combat = await db.get_pvp_combat_by_player(current_user['id'])
if not pvp_combat:
raise HTTPException(status_code=400, detail="Not in PvP combat")
# NOTE(review): nothing here rejects an action on a combat that is already
# over (fled flags set / HP at 0) - confirm get_pvp_combat_by_player filters those.
# Determine roles
is_attacker = pvp_combat['attacker_id'] == current_user['id']
your_turn = (is_attacker and pvp_combat['turn'] == 'attacker') or \
(not is_attacker and pvp_combat['turn'] == 'defender')
if not your_turn:
raise HTTPException(status_code=400, detail="It's not your turn")
# Get both players
# "attacker"/"defender" are the fixed combat roles; current_player/opponent
# are relative to whoever is acting this turn.
attacker = await db.get_player_by_id(pvp_combat['attacker_id'])
defender = await db.get_player_by_id(pvp_combat['defender_id'])
current_player = attacker if is_attacker else defender
opponent = defender if is_attacker else attacker
result_message = ""
combat_over = False
winner_id = None
if req.action == 'attack':
# Calculate damage (similar to PvE)
base_damage = 5
strength_bonus = current_player['strength'] * 2
level_bonus = current_player['level']
# Check for equipped weapon
weapon_damage = 0
equipment = await db.get_all_equipment(current_player['id'])
# The second operand repeats the truthiness check of the first.
if equipment.get('weapon') and equipment['weapon']:
weapon_slot = equipment['weapon']
inv_item = await db.get_inventory_item_by_id(weapon_slot['item_id'])
if inv_item:
weapon_def = ITEMS_MANAGER.get_item(inv_item['item_id'])
if weapon_def and weapon_def.stats:
weapon_damage = random.randint(
weapon_def.stats.get('damage_min', 0),
weapon_def.stats.get('damage_max', 0)
)
# Decrease weapon durability
if inv_item.get('unique_item_id'):
new_durability = await db.decrease_unique_item_durability(inv_item['unique_item_id'], 1)
# A None result is treated as "the weapon broke".
if new_durability is None:
result_message += "⚠️ Your weapon broke! "
await db.unequip_item(current_player['id'], 'weapon')
variance = random.randint(-2, 2)
damage = max(1, base_damage + strength_bonus + level_bonus + weapon_damage + variance)
# Apply armor reduction and durability loss to opponent
armor_absorbed, broken_armor = await reduce_armor_durability(opponent['id'], damage)
# At least 1 point always gets through, whatever the armor absorbed.
actual_damage = max(1, damage - armor_absorbed)
# Update opponent HP
# HP is read from the combat row: the combat attacker hits defender_hp and vice versa.
new_opponent_hp = max(0, (pvp_combat['defender_hp'] if not is_attacker else pvp_combat['attacker_hp']) - actual_damage)
# Store message with attacker's username so both players can see it correctly
# Both messages quote the pre-armor `damage`, with the absorbed amount appended.
stored_message = f"{current_player['username']} attacks {opponent['username']} for {damage} damage!"
if armor_absorbed > 0:
stored_message += f" (Armor absorbed {armor_absorbed})"
for broken in broken_armor:
stored_message += f"\n💔 {opponent['username']}'s {broken['emoji']} {broken['name']} broke!"
# Return message with "You" for the attacker's UI
# NOTE(review): this assignment replaces result_message, discarding any
# "weapon broke" text appended above - confirm whether that is intended.
result_message = f"You attack {opponent['username']} for {damage} damage!"
if armor_absorbed > 0:
result_message += f" (Armor absorbed {armor_absorbed})"
for broken in broken_armor:
result_message += f"\n💔 {opponent['username']}'s {broken['emoji']} {broken['name']} broke!"
# Check if opponent defeated
if new_opponent_hp <= 0:
stored_message += f"\n🏆 {current_player['username']} has defeated {opponent['username']}!"
result_message += f"\n🏆 You have defeated {opponent['username']}!"
combat_over = True
winner_id = current_player['id']
# Update opponent to dead state
await db.update_player(opponent['id'], hp=0, is_dead=True)
# Update PvP statistics for both players
# Damage dealt is recorded pre-armor (`damage`); damage taken is post-armor (`actual_damage`).
await db.update_player_statistics(opponent['id'],
pvp_deaths=1,
pvp_combats_lost=1,
pvp_damage_taken=actual_damage,
pvp_attacks_received=1,
increment=True
)
await db.update_player_statistics(current_player['id'],
players_killed=1,
pvp_combats_won=1,
pvp_damage_dealt=damage,
pvp_attacks_landed=1,
increment=True
)
# End PvP combat
await db.end_pvp_combat(pvp_combat['id'])
else:
# Update PvP statistics for attack
await db.update_player_statistics(current_player['id'],
pvp_damage_dealt=damage,
pvp_attacks_landed=1,
increment=True
)
await db.update_player_statistics(opponent['id'],
pvp_damage_taken=actual_damage,
pvp_attacks_received=1,
increment=True
)
# NOTE(review): confirm whether the state update below is also meant to run
# after a killing blow; stored_message gains a victory line above, and this
# is the only place in the attack branch where it and the new HP are persisted.
# Update combat state and switch turns
# Add timestamp to make each action unique for duplicate detection
updates = {
'turn': 'defender' if is_attacker else 'attacker',
'turn_started_at': time.time(),
'last_action': f"{stored_message}|{time.time()}" # Add timestamp for uniqueness
}
if is_attacker:
updates['defender_hp'] = new_opponent_hp
else:
updates['attacker_hp'] = new_opponent_hp
await db.update_pvp_combat(pvp_combat['id'], updates)
await db.update_player_statistics(current_player['id'], damage_dealt=damage, increment=True)
elif req.action == 'flee':
# 50% chance to flee from PvP
if random.random() < 0.5:
result_message = f"You successfully fled from {opponent['username']}!"
combat_over = True
# Mark as fled, store last action with timestamp, and end combat
# The turn is not switched on a successful flee; winner_id stays None.
flee_field = 'attacker_fled' if is_attacker else 'defender_fled'
await db.update_pvp_combat(pvp_combat['id'], {
flee_field: True,
'last_action': f"{current_player['username']} fled from combat!|{time.time()}"
})
await db.end_pvp_combat(pvp_combat['id'])
await db.update_player_statistics(current_player['id'],
pvp_successful_flees=1,
increment=True
)
else:
# Failed to flee, skip turn
result_message = f"Failed to flee from {opponent['username']}!"
await db.update_pvp_combat(pvp_combat['id'], {
'turn': 'defender' if is_attacker else 'attacker',
'turn_started_at': time.time(),
'last_action': f"{current_player['username']} tried to flee but failed!|{time.time()}"
})
await db.update_player_statistics(current_player['id'],
pvp_failed_flees=1,
increment=True
)
return {
"success": True,
"message": result_message,
"combat_over": combat_over,
"winner_id": winner_id
}
@app.get("/api/game/inventory")
async def get_inventory(current_user: dict = Depends(get_current_user)):
    """Get the calling player's inventory, enriched with item definitions.

    Inventory rows whose item ID has no definition in ``ITEMS_MANAGER`` are
    skipped. Rows with a unique item also get tier and durability; items
    marked uncraftable also get their yield, required tools and whether the
    player currently holds every tool with enough durability.

    Returns:
        ``{"items": [...]}`` with one dict per known inventory row.
    """
    inventory = await db.get_inventory(current_user['id'])

    # Unique-item rows are needed both for the tier/durability fields and for
    # every tool check, which previously re-queried the database inside nested
    # loops. Memoise them for the duration of this request.
    unique_cache = {}

    async def fetch_unique(unique_item_id):
        if unique_item_id not in unique_cache:
            unique_cache[unique_item_id] = await db.get_unique_item(unique_item_id)
        return unique_cache[unique_item_id]

    # Enrich with item data
    inventory_items = []
    for inv_item in inventory:
        item = ITEMS_MANAGER.get_item(inv_item['item_id'])
        if not item:
            continue

        item_data = {
            "id": inv_item['id'],
            "item_id": item.id,
            "name": item.name,
            "description": item.description,
            "type": item.type,
            "quantity": inv_item['quantity'],
            "is_equipped": inv_item['is_equipped'],
            "equippable": item.equippable,
            "consumable": item.consumable,
            "image_path": item.image_path,
            "emoji": getattr(item, 'emoji', None),
            "weight": getattr(item, 'weight', 0),
            "volume": getattr(item, 'volume', 0),
            "uncraftable": getattr(item, 'uncraftable', False),
            "inventory_id": inv_item['id'],
            "unique_item_id": inv_item.get('unique_item_id')
        }

        # Add combat/consumable stats if they exist
        for stat_name in ('hp_restore', 'stamina_restore', 'damage_min', 'damage_max'):
            if hasattr(item, stat_name):
                item_data[stat_name] = getattr(item, stat_name)

        # Add tier if unique item
        if inv_item.get('unique_item_id'):
            unique_item = await fetch_unique(inv_item['unique_item_id'])
            if unique_item:
                item_data["tier"] = unique_item.get('tier', 1)
                item_data["durability"] = unique_item.get('durability', 0)
                item_data["max_durability"] = unique_item.get('max_durability', 100)

        # Add uncraft data if uncraftable
        if getattr(item, 'uncraftable', False):
            uncraft_yield = getattr(item, 'uncraft_yield', [])
            uncraft_tools = getattr(item, 'uncraft_tools', [])

            # Format materials
            yield_materials = []
            for mat in uncraft_yield:
                mat_def = ITEMS_MANAGER.get_item(mat['item_id'])
                yield_materials.append({
                    'item_id': mat['item_id'],
                    'name': mat_def.name if mat_def else mat['item_id'],
                    # getattr keeps this consistent with the guarded emoji
                    # lookup used for the inventory item itself.
                    'emoji': getattr(mat_def, 'emoji', '📦') if mat_def else '📦',
                    'quantity': mat['quantity']
                })

            # Check tools availability
            tools_info = []
            can_uncraft = True
            for tool_req in uncraft_tools:
                tool_id = tool_req['item_id']
                durability_cost = tool_req['durability_cost']
                tool_def = ITEMS_MANAGER.get_item(tool_id)

                # The first matching tool with enough durability wins.
                tool_found = False
                tool_durability = 0
                for check_item in inventory:
                    if check_item['item_id'] == tool_id and check_item.get('unique_item_id'):
                        unique = await fetch_unique(check_item['unique_item_id'])
                        if unique and unique.get('durability', 0) >= durability_cost:
                            tool_found = True
                            tool_durability = unique.get('durability', 0)
                            break

                tools_info.append({
                    'item_id': tool_id,
                    'name': tool_def.name if tool_def else tool_id,
                    'emoji': getattr(tool_def, 'emoji', '🔧') if tool_def else '🔧',
                    'durability_cost': durability_cost,
                    'has_tool': tool_found,
                    'tool_durability': tool_durability
                })
                if not tool_found:
                    can_uncraft = False

            item_data["uncraft_yield"] = yield_materials
            item_data["uncraft_loss_chance"] = getattr(item, 'uncraft_loss_chance', 0.3)
            item_data["uncraft_tools"] = tools_info
            item_data["can_uncraft"] = can_uncraft

        inventory_items.append(item_data)

    return {"items": inventory_items}
@app.post("/api/game/item/drop")
async def drop_item(
    drop_req: dict,
    current_user: dict = Depends(get_current_user)
):
    """Drop an item from the calling player's inventory at their location.

    ``drop_req`` carries ``item_id`` (the item's string ID, e.g.
    "energy_bar") and an optional ``quantity`` (default 1). The first
    inventory row with that item ID is used. A unique item is dropped as a
    single row keeping its ``unique_item_id``; a stackable item is dropped
    in the requested quantity.

    Raises:
        HTTPException: 400 if the quantity is not a positive integer or
            exceeds what the player holds, 404 if the player or the item is
            not found.
    """
    player_id = current_user['id']
    item_id = drop_req.get('item_id')  # This is the item_id string like "energy_bar"
    quantity = drop_req.get('quantity', 1)

    # The body is an untyped dict, so quantity is attacker-controlled. Zero,
    # negative or non-integer values would pass the "not enough items" check
    # below and reach the inventory removal and statistics unvalidated.
    # bool is excluded explicitly because it is a subclass of int.
    if isinstance(quantity, bool) or not isinstance(quantity, int) or quantity < 1:
        raise HTTPException(status_code=400, detail="Quantity must be a positive integer")

    # Get player to know their location
    player = await db.get_player_by_id(player_id)
    if not player:
        raise HTTPException(status_code=404, detail="Player not found")

    # Get inventory item by item_id (string), not database id
    inventory = await db.get_inventory(player_id)
    inv_item = next((row for row in inventory if row['item_id'] == item_id), None)
    if not inv_item:
        raise HTTPException(status_code=404, detail="Item not found in inventory")
    if inv_item['quantity'] < quantity:
        raise HTTPException(status_code=400, detail="Not enough items to drop")

    # For unique items, we need to handle each one individually
    if inv_item.get('unique_item_id'):
        # Exactly one unique item leaves the inventory, so report and count 1.
        dropped_count = 1
        await db.add_dropped_item(
            player['location_id'],
            inv_item['item_id'],
            1,
            unique_item_id=inv_item['unique_item_id']
        )
        # Remove this specific inventory row (not by item_id, by row id)
        await db.remove_inventory_row(inv_item['id'])
    else:
        # Stackable item - drop the quantity requested
        dropped_count = quantity
        await db.add_dropped_item(
            player['location_id'],
            inv_item['item_id'],
            quantity,
            unique_item_id=None
        )
        # Remove from inventory (handles quantity reduction automatically)
        await db.remove_item_from_inventory(player_id, inv_item['item_id'], quantity)

    # Track drop statistics
    await db.update_player_statistics(player_id, items_dropped=dropped_count, increment=True)

    return {
        "success": True,
        "message": f"Dropped {dropped_count} {inv_item['item_id']}"
    }
# ============================================================================
# Internal API Endpoints (for bot communication)
# ============================================================================
async def verify_internal_key(authorization: HTTPAuthorizationCredentials = Depends(security)):
    """Verify the internal API key sent as bearer credentials.

    Returns:
        True when the supplied credentials match ``API_INTERNAL_KEY``.

    Raises:
        HTTPException: 403 if the credentials are missing or do not match,
            or if no internal key is configured.
    """
    # Local import: hmac is not imported at the top of this module.
    import hmac

    supplied = getattr(authorization, "credentials", None)
    expected = API_INTERNAL_KEY

    # compare_digest takes time independent of where the values differ, so the
    # key cannot be recovered byte by byte from response timing. Encoding to
    # bytes lets it accept non-ASCII input instead of raising TypeError.
    # An unset or empty configured key never authorises anyone.
    key_ok = (
        isinstance(supplied, str)
        and isinstance(expected, str)
        and bool(expected)
        and hmac.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8"))
    )
    if not key_ok:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid internal API key"
        )
    return True
@app.get("/api/internal/player/{telegram_id}", dependencies=[Depends(verify_internal_key)])
async def get_player_by_telegram(telegram_id: int):
    """Look up a player by Telegram ID (for bot).

    Raises:
        HTTPException: 404 when no player is found for ``telegram_id``.
    """
    record = await db.get_player_by_telegram_id(telegram_id)
    if record:
        return record
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Player not found"
    )
@app.get("/api/internal/player/by_id/{player_id}", dependencies=[Depends(verify_internal_key)])
async def get_player_by_id(player_id: int):
    """Look up a player by unique database ID (for bot).

    Raises:
        HTTPException: 404 when no player is found for ``player_id``.
    """
    record = await db.get_player_by_id(player_id)
    if record:
        return record
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Player not found"
    )
@app.get("/api/internal/player/{player_id}/combat", dependencies=[Depends(verify_internal_key)])
async def get_player_combat(player_id: int):
    """Return the player's active combat, or None when there is none (for bot)."""
    active = await db.get_active_combat(player_id)
    # Any falsy result is normalised to None.
    return active or None
@app.post("/api/internal/combat/create", dependencies=[Depends(verify_internal_key)])
async def create_combat(player_id: int, npc_id: str, npc_hp: int, npc_max_hp: int, location_id: str, from_wandering: bool = False):
    """Create a new combat and return what ``db.create_combat`` produced (for bot)."""
    return await db.create_combat(
        player_id,
        npc_id,
        npc_hp,
        npc_max_hp,
        location_id,
        from_wandering,
    )
@app.patch("/api/internal/combat/{player_id}", dependencies=[Depends(verify_internal_key)])
async def update_combat(player_id: int, updates: dict):
    """Apply ``updates`` to the player's combat and report the outcome (for bot)."""
    outcome = await db.update_combat(player_id, updates)
    return {"success": outcome}
@app.delete("/api/internal/combat/{player_id}", dependencies=[Depends(verify_internal_key)])
async def end_combat(player_id: int):
    """End the player's combat and report the outcome (for bot)."""
    outcome = await db.end_combat(player_id)
    return {"success": outcome}
@app.patch("/api/internal/player/{player_id}", dependencies=[Depends(verify_internal_key)])
async def update_player(player_id: int, updates: dict):
    """Update player fields and return the refreshed player (for bot).

    Args:
        player_id: Database ID of the player to update.
        updates: Mapping of field name to new value.

    Raises:
        HTTPException: 404 if ``db.update_player`` reports failure.
    """
    # The combat handlers call db.update_player with the changed fields as
    # keyword arguments (e.g. hp=..., is_dead=True). Expanding the dict matches
    # that calling convention instead of passing it as a second positional argument.
    success = await db.update_player(player_id, **updates)
    if not success:
        raise HTTPException(status_code=404, detail="Player not found")

    # Return updated player
    player = await db.get_player_by_id(player_id)
    return player
@app.post("/api/internal/player", dependencies=[Depends(verify_internal_key)])
async def create_telegram_player(telegram_id: int, name: str = "Survivor"):
    """Create a player for the Telegram bot and return the created record."""
    return await db.create_player(telegram_id=telegram_id, name=name)
@app.post("/api/internal/player/{player_id}/move", dependencies=[Depends(verify_internal_key)])
async def bot_move_player(player_id: int, direction: str):
    """Move a player in ``direction`` (for bot).

    Delegates to ``game_logic.move_player`` and, on success, adds the
    travelled distance to the player's statistics. The stamina cost reported
    by the move is not included in the response.
    """
    move_result = await game_logic.move_player(player_id, direction, LOCATIONS)
    moved, text, destination_id, _stamina_cost, travelled = move_result

    # Track distance for bot players too
    if moved:
        await db.update_player_statistics(player_id, distance_walked=travelled, increment=True)

    return {
        "success": moved,
        "message": text,
        "new_location_id": destination_id
    }
@app.get("/api/internal/player/{player_id}/inspect", dependencies=[Depends(verify_internal_key)])
async def bot_inspect_area(player_id: int):
    """Inspect the player's current location (for bot).

    Raises:
        HTTPException: 404 if the player or their location is unknown.
    """
    player = await db.get_player_by_id(player_id)
    if not player:
        raise HTTPException(status_code=404, detail="Player not found")

    here = LOCATIONS.get(player['location_id'])
    if not here:
        raise HTTPException(status_code=404, detail="Location not found")

    description = await game_logic.inspect_area(player_id, here, {})
    return {"success": True, "message": description}
@app.post("/api/internal/player/{player_id}/interact", dependencies=[Depends(verify_internal_key)])
async def bot_interact(player_id: int, interactable_id: str, action_id: str):
    """Interact with an object in the player's current location (for bot).

    Returns whatever ``game_logic.interact_with_object`` returns.

    Raises:
        HTTPException: 404 if the player or their location is unknown.
    """
    player = await db.get_player_by_id(player_id)
    if not player:
        raise HTTPException(status_code=404, detail="Player not found")

    here = LOCATIONS.get(player['location_id'])
    if not here:
        raise HTTPException(status_code=404, detail="Location not found")

    return await game_logic.interact_with_object(
        player_id, interactable_id, action_id, here, ITEMS_MANAGER
    )
@app.get("/api/internal/player/{player_id}/inventory", dependencies=[Depends(verify_internal_key)])
async def bot_get_inventory(player_id: int):
    """Get a player's inventory with item definitions merged in (for bot).

    Rows whose item ID has no definition in ``ITEMS_MANAGER`` are left out.
    Optional item attributes fall back to 0, '' or None when absent.
    """
    rows = await db.get_inventory(player_id)

    # Enrich with item data (include all properties for bot compatibility)
    enriched = []
    for row in rows:
        definition = ITEMS_MANAGER.get_item(row['item_id'])
        if not definition:
            continue
        entry = {
            "id": row['id'],
            "item_id": definition.id,
            "name": definition.name,
            "description": definition.description,
            "type": definition.type,
            "quantity": row['quantity'],
            "is_equipped": row['is_equipped'],
            "equippable": definition.equippable,
            "consumable": definition.consumable,
            "weight": getattr(definition, 'weight', 0),
            "volume": getattr(definition, 'volume', 0),
            "emoji": getattr(definition, 'emoji', ''),
            "damage_min": getattr(definition, 'damage_min', 0),
            "damage_max": getattr(definition, 'damage_max', 0),
            "hp_restore": getattr(definition, 'hp_restore', 0),
            "stamina_restore": getattr(definition, 'stamina_restore', 0),
            "treats": getattr(definition, 'treats', None)
        }
        enriched.append(entry)

    return {"success": True, "inventory": enriched}
@app.post("/api/internal/player/{player_id}/use_item", dependencies=[Depends(verify_internal_key)])
async def bot_use_item(player_id: int, item_id: str):
    """Use an item and return the result of ``game_logic.use_item`` (for bot)."""
    return await game_logic.use_item(player_id, item_id, ITEMS_MANAGER)
@app.post("/api/internal/player/{player_id}/pickup", dependencies=[Depends(verify_internal_key)])
async def bot_pickup_item(player_id: int, item_id: str):
    """Pick up an item at the player's current location (for bot).

    Raises:
        HTTPException: 404 if the player does not exist.
    """
    player = await db.get_player_by_id(player_id)
    if player:
        return await game_logic.pickup_item(player_id, item_id, player['location_id'])
    raise HTTPException(status_code=404, detail="Player not found")
@app.post("/api/internal/player/{player_id}/drop_item", dependencies=[Depends(verify_internal_key)])
async def bot_drop_item(player_id: int, item_id: str, quantity: int = 1):
    """Drop an item from a player's inventory at their location (for bot).

    The first inventory row matching ``item_id`` is used. A unique item is
    dropped as a single row that keeps its ``unique_item_id``, mirroring the
    /api/game/item/drop endpoint; a stackable item is dropped in the
    requested quantity.

    Returns:
        ``{"success": bool, "message": str}``; failure is reported in the
        body rather than as an HTTP error when the quantity is invalid or
        the player does not hold enough of the item.

    Raises:
        HTTPException: 404 if the player does not exist.
    """
    player = await db.get_player_by_id(player_id)
    if not player:
        raise HTTPException(status_code=404, detail="Player not found")

    # A zero or negative quantity would pass the "has enough" check below and
    # be handed to the inventory removal as-is.
    if quantity < 1:
        return {"success": False, "message": "Invalid quantity"}

    # Get the item from inventory
    inventory = await db.get_inventory(player_id)
    inv_item = next((i for i in inventory if i['item_id'] == item_id), None)
    if not inv_item or inv_item['quantity'] < quantity:
        return {"success": False, "message": "You don't have that item"}

    unique_item_id = inv_item.get('unique_item_id')
    if unique_item_id:
        # Remove this exact row and carry the unique ID onto the dropped item.
        dropped_count = 1
        await db.remove_inventory_row(inv_item['id'])
        await db.add_dropped_item(
            player['location_id'],
            item_id,
            1,
            unique_item_id=unique_item_id
        )
    else:
        dropped_count = quantity
        # Remove from inventory
        await db.remove_item_from_inventory(player_id, item_id, quantity)
        # Add to dropped items
        await db.add_dropped_item(player['location_id'], item_id, quantity)

    item = ITEMS_MANAGER.get_item(item_id)
    item_name = item.name if item else item_id
    return {
        "success": True,
        "message": f"You dropped {dropped_count}x {item_name}"
    }
@app.post("/api/internal/player/{player_id}/equip", dependencies=[Depends(verify_internal_key)])
async def bot_equip_item(player_id: int, item_id: str):
    """Equip an item from the player's inventory (for bot).

    Any equipped item whose definition has the same ``type`` is unequipped
    first. Failures are reported in the response body, not as HTTP errors.
    """
    # Get item info
    item = ITEMS_MANAGER.get_item(item_id)
    if not (item and item.equippable):
        return {"success": False, "message": "This item cannot be equipped"}

    # Check inventory
    inventory = await db.get_inventory(player_id)
    owned = None
    for row in inventory:
        if row['item_id'] == item_id:
            owned = row
            break
    if not owned:
        return {"success": False, "message": "You don't have this item"}
    if owned['is_equipped']:
        return {"success": False, "message": "This item is already equipped"}

    # Unequip any item of the same type
    for row in inventory:
        if not row['is_equipped']:
            continue
        equipped_def = ITEMS_MANAGER.get_item(row['item_id'])
        if equipped_def and equipped_def.type == item.type:
            await db.update_item_equipped_status(player_id, row['item_id'], False)

    # Equip the new item
    await db.update_item_equipped_status(player_id, item_id, True)
    return {"success": True, "message": f"You equipped {item.name}"}
@app.post("/api/internal/player/{player_id}/unequip", dependencies=[Depends(verify_internal_key)])
async def bot_unequip_item(player_id: int, item_id: str):
    """Unequip an item the player currently has equipped (for bot).

    Failures are reported in the response body, not as HTTP errors.
    """
    # Check inventory
    inventory = await db.get_inventory(player_id)
    owned = None
    for row in inventory:
        if row['item_id'] == item_id:
            owned = row
            break
    if not owned:
        return {"success": False, "message": "You don't have this item"}
    if not owned['is_equipped']:
        return {"success": False, "message": "This item is not equipped"}

    # Unequip the item
    await db.update_item_equipped_status(player_id, item_id, False)

    # Fall back to the raw item ID when the definition is unknown.
    definition = ITEMS_MANAGER.get_item(item_id)
    shown_name = definition.name if definition else item_id
    return {"success": True, "message": f"You unequipped {shown_name}"}
# ============================================================================
# Dropped Items (Internal Bot API)
# ============================================================================
@app.post("/api/internal/dropped-items", dependencies=[Depends(verify_internal_key)])
async def drop_item(item_id: str, quantity: int, location_id: str):
    """Place an item in the world at a location and report the outcome (for bot)."""
    outcome = await db.drop_item_to_world(item_id, quantity, location_id)
    return {"success": outcome}
@app.get("/api/internal/dropped-items/{dropped_item_id}", dependencies=[Depends(verify_internal_key)])
async def get_dropped_item(dropped_item_id: int):
    """Fetch one dropped item by ID (for bot).

    Raises:
        HTTPException: 404 when the dropped item is not found.
    """
    found = await db.get_dropped_item(dropped_item_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Dropped item not found")
@app.get("/api/internal/location/{location_id}/dropped-items", dependencies=[Depends(verify_internal_key)])
async def get_dropped_items_in_location(location_id: str):
    """Return every dropped item in a location (for bot)."""
    return await db.get_dropped_items_in_location(location_id)
@app.patch("/api/internal/dropped-items/{dropped_item_id}", dependencies=[Depends(verify_internal_key)])
async def update_dropped_item(dropped_item_id: int, quantity: int):
    """Set the quantity of a dropped item (for bot).

    Raises:
        HTTPException: 404 when the update reports failure.
    """
    updated = await db.update_dropped_item(dropped_item_id, quantity)
    if updated:
        return {"success": updated}
    raise HTTPException(status_code=404, detail="Dropped item not found")
@app.delete("/api/internal/dropped-items/{dropped_item_id}", dependencies=[Depends(verify_internal_key)])
async def remove_dropped_item(dropped_item_id: int):
    """Delete a dropped item and report the outcome (for bot)."""
    outcome = await db.remove_dropped_item(dropped_item_id)
    return {"success": outcome}
# ============================================================================
# Corpses (Internal Bot API)
# ============================================================================
@app.post("/api/internal/corpses/player", dependencies=[Depends(verify_internal_key)])
async def create_player_corpse(player_name: str, location_id: str, items: str):
    """Create a player corpse and return its ID (for bot)."""
    new_corpse_id = await db.create_player_corpse(player_name, location_id, items)
    return {"corpse_id": new_corpse_id}
@app.get("/api/internal/corpses/player/{corpse_id}", dependencies=[Depends(verify_internal_key)])
async def get_player_corpse(corpse_id: int):
    """Fetch one player corpse by ID (for bot).

    Raises:
        HTTPException: 404 when the corpse is not found.
    """
    found = await db.get_player_corpse(corpse_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Player corpse not found")
@app.patch("/api/internal/corpses/player/{corpse_id}", dependencies=[Depends(verify_internal_key)])
async def update_player_corpse(corpse_id: int, items: str):
    """Replace the items stored on a player corpse (for bot).

    Raises:
        HTTPException: 404 when the update reports failure.
    """
    updated = await db.update_player_corpse(corpse_id, items)
    if updated:
        return {"success": updated}
    raise HTTPException(status_code=404, detail="Player corpse not found")
@app.delete("/api/internal/corpses/player/{corpse_id}", dependencies=[Depends(verify_internal_key)])
async def remove_player_corpse(corpse_id: int):
    """Delete a player corpse and report the outcome (for bot)."""
    outcome = await db.remove_player_corpse(corpse_id)
    return {"success": outcome}
@app.post("/api/internal/corpses/npc", dependencies=[Depends(verify_internal_key)])
async def create_npc_corpse(npc_id: str, location_id: str, loot_remaining: str):
    """Create an NPC corpse and return its ID (for bot)."""
    new_corpse_id = await db.create_npc_corpse(npc_id, location_id, loot_remaining)
    return {"corpse_id": new_corpse_id}
@app.get("/api/internal/corpses/npc/{corpse_id}", dependencies=[Depends(verify_internal_key)])
async def get_npc_corpse(corpse_id: int):
    """Fetch one NPC corpse by ID (for bot).

    Raises:
        HTTPException: 404 when the corpse is not found.
    """
    found = await db.get_npc_corpse(corpse_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="NPC corpse not found")
@app.patch("/api/internal/corpses/npc/{corpse_id}", dependencies=[Depends(verify_internal_key)])
async def update_npc_corpse(corpse_id: int, loot_remaining: str):
    """Replace the remaining loot stored on an NPC corpse (for bot).

    Raises:
        HTTPException: 404 when the update reports failure.
    """
    updated = await db.update_npc_corpse(corpse_id, loot_remaining)
    if updated:
        return {"success": updated}
    raise HTTPException(status_code=404, detail="NPC corpse not found")
@app.delete("/api/internal/corpses/npc/{corpse_id}", dependencies=[Depends(verify_internal_key)])
async def remove_npc_corpse(corpse_id: int):
    """Delete an NPC corpse and report the outcome (for bot)."""
    outcome = await db.remove_npc_corpse(corpse_id)
    return {"success": outcome}
# ============================================================================
# Wandering Enemies (Internal Bot API)
# ============================================================================
@app.post("/api/internal/wandering-enemies", dependencies=[Depends(verify_internal_key)])
async def spawn_wandering_enemy(npc_id: str, location_id: str, current_hp: int, max_hp: int):
    """Spawn a wandering enemy and return its ID (for bot)."""
    spawned_id = await db.spawn_wandering_enemy(npc_id, location_id, current_hp, max_hp)
    return {"enemy_id": spawned_id}
@app.get("/api/internal/location/{location_id}/wandering-enemies", dependencies=[Depends(verify_internal_key)])
async def get_wandering_enemies_in_location(location_id: str):
    """Return every wandering enemy in a location (for bot)."""
    return await db.get_wandering_enemies_in_location(location_id)
@app.delete("/api/internal/wandering-enemies/{enemy_id}", dependencies=[Depends(verify_internal_key)])
async def remove_wandering_enemy(enemy_id: int):
    """Delete a wandering enemy and report the outcome (for bot)."""
    outcome = await db.remove_wandering_enemy(enemy_id)
    return {"success": outcome}
@app.get("/api/internal/inventory/item/{item_db_id}", dependencies=[Depends(verify_internal_key)])
async def get_inventory_item(item_db_id: int):
    """
    Fetch one inventory row by its database ID (internal bot API).

    Responds with 404 when the database layer returns nothing for the ID.
    """
    row = await db.get_inventory_item(item_db_id)
    if row:
        return row
    raise HTTPException(status_code=404, detail="Inventory item not found")
# ============================================================================
# Cooldowns (Internal Bot API)
# ============================================================================
@app.get("/api/internal/cooldown/{cooldown_key}", dependencies=[Depends(verify_internal_key)])
async def get_cooldown(cooldown_key: str):
    """
    Report the time left on a cooldown (internal bot API).

    The value from the database layer is returned under "remaining_seconds".
    """
    seconds_left = await db.get_cooldown(cooldown_key)
    return {"remaining_seconds": seconds_left}
@app.post("/api/internal/cooldown/{cooldown_key}", dependencies=[Depends(verify_internal_key)])
async def set_cooldown(cooldown_key: str, duration_seconds: int = 600):
    """
    Start (or overwrite) a cooldown for the given key (internal bot API).

    duration_seconds defaults to 600 when the caller does not supply it.
    The database layer's result is returned as-is under "success".
    """
    return {"success": await db.set_cooldown(cooldown_key, duration_seconds)}
# ============================================================================
# Corpse Lists (Internal Bot API)
# ============================================================================
@app.get("/api/internal/location/{location_id}/corpses/player", dependencies=[Depends(verify_internal_key)])
async def get_player_corpses_in_location(location_id: str):
    """
    List the player corpses lying at a location (internal bot API).

    Returns whatever the database layer yields for the location, unmodified.
    """
    return await db.get_player_corpses_in_location(location_id)
@app.get("/api/internal/location/{location_id}/corpses/npc", dependencies=[Depends(verify_internal_key)])
async def get_npc_corpses_in_location(location_id: str):
    """
    List the NPC corpses lying at a location (internal bot API).

    Returns whatever the database layer yields for the location, unmodified.
    """
    return await db.get_npc_corpses_in_location(location_id)
# ============================================================================
# Image Cache (Internal Bot API)
# ============================================================================
@app.get("/api/internal/image-cache/{image_path:path}", dependencies=[Depends(verify_internal_key)])
async def get_cached_image(image_path: str):
    """
    Look up the cached Telegram file ID for an image path (internal bot API).

    The ":path" converter lets image_path contain slashes.
    Responds with 404 when no file ID is cached for the path.
    """
    cached_id = await db.get_cached_image(image_path)
    if cached_id:
        return {"telegram_file_id": cached_id}
    raise HTTPException(status_code=404, detail="Image not cached")
@app.post("/api/internal/image-cache", dependencies=[Depends(verify_internal_key)])
async def cache_image(image_path: str, telegram_file_id: str):
    """
    Store the Telegram file ID associated with an image path (internal bot API).

    The database layer's result is returned as-is under "success".
    """
    return {"success": await db.cache_image(image_path, telegram_file_id)}
# ============================================================================
# Status Effects (Internal Bot API)
# ============================================================================
@app.get("/api/internal/player/{player_id}/status-effects", dependencies=[Depends(verify_internal_key)])
async def get_player_status_effects(player_id: int):
    """
    List a player's status effects (internal bot API).

    Returns whatever the database layer yields for the player, unmodified.
    """
    return await db.get_player_status_effects(player_id)
# ============================================================================
# Statistics & Leaderboard Endpoints
# ============================================================================
# NOTE: the fixed "/api/statistics/me" route MUST be declared before the
# parameterized "/api/statistics/{player_id}" route. FastAPI matches path
# operations in declaration order; with the opposite order a request for
# ".../me" is captured by the {player_id} route, fails integer validation
# and is answered with 422, making this endpoint unreachable.
@app.get("/api/statistics/me")
async def get_my_stats(current_user: dict = Depends(get_current_user)):
    """
    Get the authenticated user's own statistics.

    The user is resolved by the get_current_user dependency and the
    statistics are looked up by that user's 'id'. The lookup result is
    returned under "statistics" without further checks.
    """
    stats = await db.get_player_statistics(current_user['id'])
    return {"statistics": stats}


@app.get("/api/statistics/{player_id}")
async def get_player_stats(player_id: int):
    """
    Get a player's statistics by player ID (public, no auth dependency).

    Returns a small public profile (id, username, name, level) together
    with the statistics record.

    Raises:
        HTTPException: 404 if no statistics exist for the ID, or if the
            player record itself cannot be found.
    """
    stats = await db.get_player_statistics(player_id)
    if not stats:
        raise HTTPException(status_code=404, detail="Player statistics not found")

    player = await db.get_player_by_id(player_id)
    if not player:
        raise HTTPException(status_code=404, detail="Player not found")

    return {
        "player": {
            "id": player['id'],
            "username": player['username'],
            "name": player['name'],
            "level": player['level']
        },
        "statistics": stats
    }
@app.get("/api/leaderboard/{stat_name}")
async def get_leaderboard_by_stat(stat_name: str, limit: int = 100):
    """
    Get the leaderboard for a specific statistic.

    Args:
        stat_name: Statistic to rank by; must be one of the names in
            ``valid_stats`` below.
        limit: Number of entries requested (default 100). Must not be
            negative.

    Returns:
        A dict with the requested "stat_name" and the "leaderboard" data
        produced by the database layer.

    Raises:
        HTTPException: 400 if stat_name is not a known statistic or if
            limit is negative.
    """
    # Whitelist of rankable statistics; anything else is rejected before
    # it reaches the database layer.
    valid_stats = (
        "distance_walked", "enemies_killed", "damage_dealt", "damage_taken",
        "hp_restored", "stamina_used", "stamina_restored", "items_collected",
        "items_dropped", "items_used", "deaths", "successful_flees", "failed_flees",
        "combats_initiated", "total_playtime",
    )

    if stat_name not in valid_stats:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid stat name. Valid stats: {', '.join(valid_stats)}"
        )

    # This endpoint is public, so never hand a negative row limit to the
    # database: depending on the backend that either errors out or is
    # treated as "no limit" (NOTE(review): confirm against db.get_leaderboard).
    if limit < 0:
        raise HTTPException(
            status_code=400,
            detail="Invalid limit. Limit must be a non-negative integer"
        )

    leaderboard = await db.get_leaderboard(stat_name, limit)
    return {
        "stat_name": stat_name,
        "leaderboard": leaderboard
    }
# ============================================================================
# Health Check
# ============================================================================
@app.get("/health")
async def health_check():
    """
    Liveness probe.

    Reports a fixed status and version string together with the number of
    entries currently held in LOCATIONS and ITEMS_MANAGER.items.
    """
    location_count = len(LOCATIONS)
    item_count = len(ITEMS_MANAGER.items)
    payload = {"status": "healthy", "version": "2.0.0"}
    payload["locations_loaded"] = location_count
    payload["items_loaded"] = item_count
    return payload
if __name__ == "__main__":
    # Only reached when this module is executed directly rather than being
    # imported by an ASGI server: serve the app on all interfaces, port 8000.
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=8000)