Files
echoes-of-the-ash/api/routers/loot.py
2026-02-08 20:18:42 +01:00

516 lines
22 KiB
Python

"""
Loot router.
Auto-generated from main.py migration.
"""
from fastapi import APIRouter, HTTPException, Depends, status, Request
from fastapi.security import HTTPAuthorizationCredentials
from typing import Optional, Dict, Any
from datetime import datetime
import random
import json
import logging
from ..core.security import get_current_user, security, verify_internal_key
from ..services.models import *
from ..services.helpers import calculate_distance, calculate_stamina_cost, calculate_player_capacity, get_locale_string, get_game_message
from .. import database as db
from ..items import ItemsManager
from .. import game_logic
from ..core.websockets import manager
from .equipment import consume_tool_durability
# Module-level logger named after this module's import path.
logger = logging.getLogger(__name__)
# These will be injected by main.py
# They stay None until init_router_dependencies() assigns them.
LOCATIONS = None
ITEMS_MANAGER = None
WORLD = None
def init_router_dependencies(locations, items_manager, world):
    """Store the shared game data objects in this module's globals.

    Rebinds the module-level ``LOCATIONS``, ``ITEMS_MANAGER`` and ``WORLD``
    names to the given objects so the endpoints in this router can read them.
    Returns ``None``.
    """
    global LOCATIONS, ITEMS_MANAGER, WORLD
    LOCATIONS, ITEMS_MANAGER, WORLD = locations, items_manager, world
# Router object the endpoints below register on; every route is tagged "loot".
router = APIRouter(tags=["loot"])
# Endpoints
@router.get("/api/game/corpse/{corpse_id}")
async def get_corpse_details(
    corpse_id: str,
    request: Request,
    current_user: dict = Depends(get_current_user)
):
    """Get detailed information about a corpse's lootable items.

    Args:
        corpse_id: Identifier of the form ``"<type>_<db id>"`` where ``<type>``
            is ``npc`` or ``player`` and ``<db id>`` is an integer.
        request: Incoming request; only the ``Accept-Language`` header is read.
        current_user: The authenticated character dict.

    Returns:
        A dict with ``corpse_id``, ``type``, ``name``, ``loot_items`` and
        ``total_items``. Each loot entry carries its index in the corpse's
        loot list, display data, the quantity range and tool requirements.

    Raises:
        HTTPException: 400 for a malformed corpse ID, an unknown corpse type or
            a corpse at another location; 404 when the corpse does not exist.
    """
    import sys
    # Only insert when '/app' is not already first: an unconditional insert on
    # every request would grow sys.path without bound.
    if not sys.path or sys.path[0] != '/app':
        sys.path.insert(0, '/app')
    from data.npcs import NPCS

    # Extract locale
    locale = request.headers.get('Accept-Language', 'en')

    # Parse corpse ID; a missing separator or a non-numeric id is a client
    # error, not a server error.
    try:
        corpse_type, raw_db_id = corpse_id.split('_', 1)
        corpse_db_id = int(raw_db_id)
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid corpse ID") from None

    # Reject unknown types before touching the database.
    if corpse_type not in ('npc', 'player'):
        raise HTTPException(status_code=400, detail="Invalid corpse type")

    player = current_user  # current_user is already the character dict

    # Get player's inventory to check available tools
    inventory = await db.get_inventory(player['id'])
    available_tools = {item['item_id'] for item in inventory}

    if corpse_type == 'npc':
        corpse = await db.get_npc_corpse(corpse_db_id)
        if not corpse:
            raise HTTPException(status_code=404, detail="Corpse not found")
        if corpse['location_id'] != player['location_id']:
            raise HTTPException(status_code=400, detail="Corpse not at this location")

        # Remaining loot is stored as a JSON list; empty/NULL means no loot.
        loot_remaining = json.loads(corpse['loot_remaining']) if corpse['loot_remaining'] else []

        # Format loot items with tool requirements
        loot_items = []
        for idx, loot_item in enumerate(loot_remaining):
            required_tool = loot_item.get('required_tool')
            item_def = ITEMS_MANAGER.get_item(loot_item['item_id'])
            has_tool = required_tool is None or required_tool in available_tools
            tool_def = ITEMS_MANAGER.get_item(required_tool) if required_tool else None
            loot_items.append({
                'index': idx,
                'item_id': loot_item['item_id'],
                # Fall back to the raw ids when a definition is missing.
                'item_name': item_def.name if item_def else loot_item['item_id'],
                'description': item_def.description if item_def else None,
                'image_path': item_def.image_path if item_def else None,
                'emoji': item_def.emoji if item_def else '📦',
                'quantity_min': loot_item['quantity_min'],
                'quantity_max': loot_item['quantity_max'],
                'required_tool': required_tool,
                'required_tool_name': tool_def.name if tool_def else required_tool,
                'has_tool': has_tool,
                'can_loot': has_tool
            })

        npc_def = NPCS.get(corpse['npc_id'])
        return {
            'corpse_id': corpse_id,
            'type': 'npc',
            'name': get_game_message('corpse_name_npc', locale, name=get_locale_string(npc_def.name, locale) if npc_def else corpse['npc_id']),
            'loot_items': loot_items,
            'total_items': len(loot_items)
        }

    # corpse_type == 'player'
    corpse = await db.get_player_corpse(corpse_db_id)
    if not corpse:
        raise HTTPException(status_code=404, detail="Corpse not found")
    if corpse['location_id'] != player['location_id']:
        raise HTTPException(status_code=400, detail="Corpse not at this location")

    # Items are stored as a JSON list; empty/NULL means no items.
    items = json.loads(corpse['items']) if corpse['items'] else []

    # Format items (player corpses don't require tools)
    loot_items = []
    for idx, item in enumerate(items):
        item_def = ITEMS_MANAGER.get_item(item['item_id'])
        loot_items.append({
            'index': idx,
            'item_id': item['item_id'],
            'item_name': item_def.name if item_def else item['item_id'],
            'description': item_def.description if item_def else None,
            'image_path': item_def.image_path if item_def else None,
            'emoji': item_def.emoji if item_def else '📦',
            # Player corpses hold exact quantities, so min and max coincide.
            'quantity_min': item['quantity'],
            'quantity_max': item['quantity'],
            'required_tool': None,
            'required_tool_name': None,
            'has_tool': True,
            'can_loot': True
        })

    return {
        'corpse_id': corpse_id,
        'type': 'player',
        'name': get_game_message('corpse_name_player', locale, name=corpse['player_name']),
        'loot_items': loot_items,
        'total_items': len(loot_items)
    }
async def _store_or_drop_loot(player, item_id, quantity, load, dropped_items):
    """Put loot into the player's inventory, or drop it on the ground if it does not fit.

    ``load`` is a mutable dict with ``weight``, ``max_weight``, ``volume`` and
    ``max_volume``; it is updated in place when the item is stored. Dropped
    items are recorded in ``dropped_items``.

    Returns True only when the item was added to the inventory.
    """
    item_def = ITEMS_MANAGER.get_item(item_id)
    if not item_def:
        # Loot without a definition is discarded (neither stored nor dropped);
        # log it so the data problem is visible.
        logger.warning("Discarding loot with unknown item definition: %s", item_id)
        return False

    item_weight = item_def.weight * quantity
    item_volume = item_def.volume * quantity
    too_heavy = load['weight'] + item_weight > load['max_weight']
    too_bulky = load['volume'] + item_volume > load['max_volume']
    if too_heavy or too_bulky:
        # Item doesn't fit - drop it on ground
        await db.add_dropped_item(player['location_id'], item_id, quantity)
        dropped_items.append({
            'item_id': item_id,
            'quantity': quantity,
            'emoji': item_def.emoji
        })
        return False

    # Item fits - add to inventory
    await db.add_item_to_inventory(player['id'], item_id, quantity)
    load['weight'] += item_weight
    load['volume'] += item_volume
    return True


def _compose_loot_message(looted_items, dropped_items, locale):
    """Build the localized summary of looted and dropped items."""
    message_parts = []
    for item in looted_items:
        item_def = ITEMS_MANAGER.get_item(item['item_id'])
        item_name = get_locale_string(item_def.name, locale) if item_def else item['item_id']
        message_parts.append(f"{item_def.emoji if item_def else ''} {item_name} x{item['quantity']}")

    dropped_parts = []
    for item in dropped_items:
        item_def = ITEMS_MANAGER.get_item(item['item_id'])
        item_name = get_locale_string(item_def.name, locale) if item_def else item['item_id']
        dropped_parts.append(f"{item.get('emoji', '📦')} {item_name} x{item['quantity']}")

    message = ""
    if message_parts:
        message = get_game_message('looted_items_start', locale) + ", ".join(message_parts)
    if dropped_parts:
        if message:
            message += "\n"
        message += get_game_message('backpack_full_drop', locale) + ", ".join(dropped_parts)
    if not message_parts and not dropped_parts:
        message = get_game_message('nothing_looted', locale)
    return message


async def _loot_npc_corpse(req, corpse_db_id, player, inventory, load, locale):
    """Loot one indexed item, or every lootable item, from an NPC corpse."""
    corpse = await db.get_npc_corpse(corpse_db_id)
    if not corpse:
        raise HTTPException(status_code=404, detail="Corpse not found")
    # Check if player is at the same location
    if corpse['location_id'] != player['location_id']:
        raise HTTPException(status_code=400, detail="Corpse not at this location")

    # Parse remaining loot
    loot_remaining = json.loads(corpse['loot_remaining']) if corpse['loot_remaining'] else []
    if not loot_remaining:
        raise HTTPException(status_code=400, detail="Corpse has already been looted")

    looted_items = []
    remaining_loot = []
    dropped_items = []  # Items that couldn't fit in inventory
    tools_consumed = []  # Track tool durability consumed

    if req.item_index is not None:
        # Loot only the requested item
        if req.item_index < 0 or req.item_index >= len(loot_remaining):
            raise HTTPException(status_code=400, detail="Invalid item index")
        loot_item = loot_remaining[req.item_index]
        required_tool = loot_item.get('required_tool')
        if required_tool:
            # Build tool requirement format for consume_tool_durability
            tool_req = [{
                'item_id': required_tool,
                'durability_cost': loot_item.get('tool_durability_cost', 5)  # Default 5 durability per loot
            }]
            success, error_msg, tools_consumed = await consume_tool_durability(player['id'], tool_req, inventory)
            if not success:
                raise HTTPException(status_code=400, detail=error_msg)

        quantity = random.randint(loot_item['quantity_min'], loot_item['quantity_max'])
        if quantity > 0 and await _store_or_drop_loot(player, loot_item['item_id'], quantity, load, dropped_items):
            looted_items.append({
                'item_id': loot_item['item_id'],
                'quantity': quantity
            })

        # Remove this item from loot, keep others
        remaining_loot = [entry for i, entry in enumerate(loot_remaining) if i != req.item_index]
    else:
        # Loot all items that don't require tools or player has tools for
        for loot_item in loot_remaining:
            required_tool = loot_item.get('required_tool')
            if required_tool:
                tool_req = [{
                    'item_id': required_tool,
                    'durability_cost': loot_item.get('tool_durability_cost', 5)
                }]
                success, _error_msg, consumed_info = await consume_tool_durability(player['id'], tool_req, inventory)
                if not success:
                    # Can't loot this item - keep in corpse
                    remaining_loot.append(loot_item)
                    continue
                tools_consumed.extend(consumed_info)
                # Refresh inventory after tool consumption
                inventory = await db.get_inventory(player['id'])

            quantity = random.randint(loot_item['quantity_min'], loot_item['quantity_max'])
            if quantity > 0 and await _store_or_drop_loot(player, loot_item['item_id'], quantity, load, dropped_items):
                looted_items.append({
                    'item_id': loot_item['item_id'],
                    'quantity': quantity
                })

    # Update or remove corpse
    if remaining_loot:
        await db.update_npc_corpse(corpse_db_id, json.dumps(remaining_loot))
    else:
        await db.remove_npc_corpse(corpse_db_id)

    message = _compose_loot_message(looted_items, dropped_items, locale)
    if remaining_loot and req.item_index is None:
        # In loot-all mode, whatever is left could not be taken for lack of tools.
        message += "\n" + get_game_message('items_require_tools', locale, count=len(remaining_loot))

    # Broadcast to location about corpse looting
    if len(remaining_loot) == 0:
        # Corpse fully looted
        await manager.send_to_location(
            location_id=player['location_id'],
            message={
                "type": "location_update",
                "data": {
                    "message": get_game_message('full_loot_broadcast', locale, player_name=player['name']),
                    "action": "corpse_looted"
                },
                "timestamp": datetime.utcnow().isoformat()
            },
            exclude_player_id=player['id']
        )

    return {
        "success": True,
        "message": message,
        "looted_items": looted_items,
        "dropped_items": dropped_items,
        "tools_consumed": tools_consumed,
        "corpse_empty": len(remaining_loot) == 0,
        "remaining_count": len(remaining_loot)
    }


async def _loot_player_corpse(req, corpse_db_id, player, load, locale):
    """Loot one indexed item, or every item, from a player corpse."""
    corpse = await db.get_player_corpse(corpse_db_id)
    if not corpse:
        raise HTTPException(status_code=404, detail="Corpse not found")
    if corpse['location_id'] != player['location_id']:
        raise HTTPException(status_code=400, detail="Corpse not at this location")

    # Parse items
    items = json.loads(corpse['items']) if corpse['items'] else []
    if not items:
        raise HTTPException(status_code=400, detail="Corpse has no items")

    looted_items = []
    remaining_items = []
    dropped_items = []  # Items that couldn't fit in inventory

    if req.item_index is not None:
        # Loot only the requested item
        if req.item_index < 0 or req.item_index >= len(items):
            raise HTTPException(status_code=400, detail="Invalid item index")
        item = items[req.item_index]
        if await _store_or_drop_loot(player, item['item_id'], item['quantity'], load, dropped_items):
            looted_items.append(item)
        # Remove this item, keep others
        remaining_items = [entry for i, entry in enumerate(items) if i != req.item_index]
    else:
        # Loot all items; each one is either stored or dropped, so nothing
        # stays on the corpse.
        for item in items:
            if await _store_or_drop_loot(player, item['item_id'], item['quantity'], load, dropped_items):
                looted_items.append(item)

    # Update or remove corpse
    if remaining_items:
        await db.update_player_corpse(corpse_db_id, json.dumps(remaining_items))
    else:
        await db.remove_player_corpse(corpse_db_id)

    message = _compose_loot_message(looted_items, dropped_items, locale)

    # Broadcast to location about corpse looting
    if len(remaining_items) == 0:
        # Corpse fully looted - broadcast removal
        await manager.send_to_location(
            location_id=player['location_id'],
            message={
                "type": "location_update",
                "data": {
                    "message": get_game_message('player_corpse_emptied_broadcast', locale, player_name=player['name'], corpse_name=corpse['player_name']),
                    "action": "player_corpse_emptied",
                    "corpse_id": req.corpse_id
                },
                "timestamp": datetime.utcnow().isoformat()
            },
            exclude_player_id=player['id']
        )
    else:
        # Corpse partially looted - broadcast item updates
        await manager.send_to_location(
            location_id=player['location_id'],
            message={
                "type": "location_update",
                "data": {
                    "message": get_game_message('player_corpse_looted_broadcast', locale, player_name=player['name'], corpse_name=corpse['player_name']),
                    "action": "player_corpse_looted",
                    "corpse_id": req.corpse_id,
                    "remaining_items": remaining_items,
                    "looted_item_ids": [item['item_id'] for item in looted_items] if req.item_index is not None else None
                },
                "timestamp": datetime.utcnow().isoformat()
            },
            exclude_player_id=player['id']
        )

    return {
        "success": True,
        "message": message,
        "looted_items": looted_items,
        "dropped_items": dropped_items,
        "corpse_empty": len(remaining_items) == 0,
        "remaining_count": len(remaining_items)
    }


@router.post("/api/game/loot_corpse")
async def loot_corpse(
    req: LootCorpseRequest,
    request: Request,
    current_user: dict = Depends(get_current_user)
):
    """Loot a corpse (NPC or player) - can loot specific item by index or all items.

    Args:
        req: Request body; ``corpse_id`` has the form ``"<type>_<db id>"`` and
            ``item_index`` selects one item, or is ``None`` to loot everything.
        request: Incoming request; only the ``Accept-Language`` header is read.
        current_user: The authenticated character dict.

    Returns:
        A dict with ``success``, ``message``, ``looted_items``,
        ``dropped_items``, ``corpse_empty`` and ``remaining_count``; NPC
        corpses additionally report ``tools_consumed``.

    Raises:
        HTTPException: 400 for a malformed corpse ID, unknown corpse type,
            wrong location, empty corpse, invalid item index or a failed tool
            requirement; 404 when the corpse does not exist.
    """
    # Extract locale
    locale = request.headers.get('Accept-Language', 'en')

    # Parse corpse ID; a missing separator or a non-numeric id is a client
    # error, not a server error.
    try:
        corpse_type, raw_db_id = req.corpse_id.split('_', 1)
        corpse_db_id = int(raw_db_id)
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid corpse ID") from None

    # Reject unknown types before touching the database.
    if corpse_type not in ('npc', 'player'):
        raise HTTPException(status_code=400, detail="Invalid corpse type")

    player = current_user  # current_user is already the character dict

    # Get player's current capacity
    inventory = await db.get_inventory(player['id'])
    current_weight, max_weight, current_volume, max_volume = await calculate_player_capacity(inventory, ITEMS_MANAGER)
    load = {
        'weight': current_weight,
        'max_weight': max_weight,
        'volume': current_volume,
        'max_volume': max_volume
    }

    if corpse_type == 'npc':
        return await _loot_npc_corpse(req, corpse_db_id, player, inventory, load, locale)
    return await _loot_player_corpse(req, corpse_db_id, player, load, locale)