Files
echoes-of-the-ash/api/main.old.py
2025-11-07 15:27:13 +01:00

500 lines
17 KiB
Python

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
from typing import Optional, List
import jwt
import bcrypt
from datetime import datetime, timedelta
import os
import sys
# Add parent directory to path to import bot modules
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from bot.database import get_player, create_player
from data.world_loader import load_world
from api.internal import router as internal_router
app = FastAPI(title="Echoes of the Ashes API", version="1.0.0")
# Include internal API router
app.include_router(internal_router)
# CORS configuration
app.add_middleware(
CORSMiddleware,
allow_origins=["https://echoesoftheashgame.patacuack.net", "http://localhost:3000"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# JWT Configuration
SECRET_KEY = os.getenv("JWT_SECRET_KEY", "your-secret-key-change-in-production")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 * 7 # 7 days
security = HTTPBearer()
# Load world data
WORLD = None
LOCATIONS = {}
try:
WORLD = load_world()
# WORLD.locations is already a dict {location_id: Location}
LOCATIONS = WORLD.locations
print(f"✅ Loaded {len(LOCATIONS)} locations")
except Exception as e:
print(f"⚠️ Warning: Could not load world data: {e}")
import traceback
traceback.print_exc()
# Pydantic Models
class UserRegister(BaseModel):
username: str
password: str
class UserLogin(BaseModel):
username: str
password: str
class Token(BaseModel):
access_token: str
token_type: str = "bearer"
class User(BaseModel):
id: int
username: str
telegram_id: Optional[str] = None
class PlayerState(BaseModel):
location_id: str
location_name: str
health: int
max_health: int
stamina: int
max_stamina: int
inventory: List[dict]
status_effects: List[dict]
class MoveRequest(BaseModel):
direction: str
# Helper Functions
def create_access_token(data: dict):
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
try:
token = credentials.credentials
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
user_id: int = payload.get("sub")
if user_id is None:
raise HTTPException(status_code=401, detail="Invalid authentication credentials")
return user_id
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token has expired")
except jwt.JWTError:
raise HTTPException(status_code=401, detail="Could not validate credentials")
# Routes
@app.get("/")
async def root():
return {"message": "Echoes of the Ashes API", "status": "online"}
@app.post("/api/auth/register", response_model=Token)
async def register(user_data: UserRegister):
"""Register a new user account"""
try:
# Check if username already exists
existing_player = await get_player(username=user_data.username)
if existing_player:
raise HTTPException(status_code=400, detail="Username already exists")
# Hash password
password_hash = bcrypt.hashpw(user_data.password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
# Create player with web auth
player = await create_player(
telegram_id=None,
username=user_data.username,
password_hash=password_hash
)
if not player or 'id' not in player:
print(f"ERROR: create_player returned: {player}")
raise HTTPException(status_code=500, detail="Failed to create player - no ID returned")
# Create token
access_token = create_access_token(data={"sub": player['id']})
return {"access_token": access_token}
except HTTPException:
raise
except Exception as e:
import traceback
print(f"ERROR in register: {str(e)}")
print(traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/auth/login", response_model=Token)
async def login(user_data: UserLogin):
"""Login with username and password"""
try:
# Get player
player = await get_player(username=user_data.username)
if not player or not player.get('password_hash'):
raise HTTPException(status_code=401, detail="Invalid username or password")
# Verify password
if not bcrypt.checkpw(user_data.password.encode('utf-8'), player['password_hash'].encode('utf-8')):
raise HTTPException(status_code=401, detail="Invalid username or password")
# Create token
access_token = create_access_token(data={"sub": player['id']})
return {"access_token": access_token}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/auth/me", response_model=User)
async def get_current_user(user_id: int = Depends(verify_token)):
"""Get current authenticated user"""
player = await get_player(player_id=user_id)
if not player:
raise HTTPException(status_code=404, detail="User not found")
return {
"id": player['id'],
"username": player.get('username'),
"telegram_id": player.get('telegram_id')
}
@app.get("/api/game/state", response_model=PlayerState)
async def get_game_state(user_id: int = Depends(verify_token)):
"""Get current player game state"""
player = await get_player(player_id=user_id)
if not player:
raise HTTPException(status_code=404, detail="Player not found")
location = LOCATIONS.get(player['location_id'])
# TODO: Get actual inventory and status effects from database
inventory = []
status_effects = []
return {
"location_id": player['location_id'],
"location_name": location.name if location else "Unknown",
"health": player['hp'],
"max_health": player['max_hp'],
"stamina": player['stamina'],
"max_stamina": player['max_stamina'],
"inventory": inventory,
"status_effects": status_effects
}
@app.post("/api/game/move")
async def move_player(move_data: MoveRequest, user_id: int = Depends(verify_token)):
"""Move player in a direction"""
from bot.database import update_player
player = await get_player(player_id=user_id)
if not player:
raise HTTPException(status_code=404, detail="Player not found")
current_location = LOCATIONS.get(player['location_id'])
if not current_location:
raise HTTPException(status_code=400, detail="Invalid current location")
# Check if player has enough stamina
if player['stamina'] < 1:
raise HTTPException(status_code=400, detail="Not enough stamina to move")
# Find exit in the specified direction (exits is dict {direction: destination_id})
destination_id = current_location.exits.get(move_data.direction.lower())
if not destination_id:
raise HTTPException(status_code=400, detail=f"Cannot move {move_data.direction} from here")
# Move player
new_location = LOCATIONS.get(destination_id)
if not new_location:
raise HTTPException(status_code=400, detail="Invalid destination")
# Update player location and stamina (use player_id for web users)
await update_player(player_id=player['id'], updates={
'location_id': new_location.id,
'stamina': max(0, player['stamina'] - 1)
})
# Get updated player state
updated_player = await get_player(player_id=user_id)
return {
"success": True,
"message": f"You travel {move_data.direction} to {new_location.name}. {new_location.description}",
"player_state": {
"location_id": updated_player['location_id'],
"location_name": new_location.name,
"health": updated_player['hp'],
"max_health": updated_player['max_hp'],
"stamina": updated_player['stamina'],
"max_stamina": updated_player['max_stamina'],
"inventory": [],
"status_effects": []
}
}
@app.get("/api/game/location")
async def get_current_location(user_id: int = Depends(verify_token)):
"""Get detailed information about current location"""
player = await get_player(player_id=user_id)
if not player:
raise HTTPException(status_code=404, detail="Player not found")
location = LOCATIONS.get(player['location_id'])
if not location:
raise HTTPException(status_code=404, detail=f"Location '{player['location_id']}' not found")
# Get available directions from exits dict
directions = list(location.exits.keys())
# Get NPCs at location (TODO: implement NPC spawning)
npcs = []
# Get items at location (TODO: implement dropped items)
items = []
# Determine image extension (png or jpg)
image_url = None
if location.image_path:
# Use the path from location data
image_url = f"/{location.image_path}"
else:
# Default to png with fallback to jpg
image_url = f"/images/locations/{location.id}.png"
return {
"id": location.id,
"name": location.name,
"description": location.description,
"directions": directions,
"npcs": npcs,
"items": items,
"image_url": image_url,
"interactables": [{"id": k, "name": v.name} for k, v in location.interactables.items()]
}
@app.get("/api/game/inventory")
async def get_inventory(user_id: int = Depends(verify_token)):
"""Get player's inventory"""
from bot.database import get_inventory
player = await get_player(player_id=user_id)
if not player:
raise HTTPException(status_code=404, detail="Player not found")
# For web users without telegram_id, inventory might be empty
# This is a limitation of the current schema
inventory = []
return {
"items": inventory,
"capacity": 20 # TODO: Calculate based on equipped bag
}
@app.get("/api/game/profile")
async def get_profile(user_id: int = Depends(verify_token)):
"""Get player profile and stats"""
player = await get_player(player_id=user_id)
if not player:
raise HTTPException(status_code=404, detail="Player not found")
return {
"name": player['name'],
"level": player['level'],
"xp": player['xp'],
"hp": player['hp'],
"max_hp": player['max_hp'],
"stamina": player['stamina'],
"max_stamina": player['max_stamina'],
"strength": player['strength'],
"agility": player['agility'],
"endurance": player['endurance'],
"intellect": player['intellect'],
"unspent_points": player['unspent_points'],
"is_dead": player['is_dead']
}
@app.get("/api/game/map")
async def get_map(user_id: int = Depends(verify_token)):
"""Get world map data"""
player = await get_player(player_id=user_id)
if not player:
raise HTTPException(status_code=404, detail="Player not found")
# Return all locations and connections (LOCATIONS is dict {id: Location})
locations_data = []
for loc_id, loc in LOCATIONS.items():
locations_data.append({
"id": loc.id,
"name": loc.name,
"description": loc.description,
"exits": loc.exits # Dict of {direction: destination_id}
})
return {
"current_location": player['location_id'],
"locations": locations_data
}
@app.post("/api/game/inspect")
async def inspect_area(user_id: int = Depends(verify_token)):
"""Inspect the current area for details"""
player = await get_player(player_id=user_id)
if not player:
raise HTTPException(status_code=404, detail="Player not found")
location = LOCATIONS.get(player['location_id'])
if not location:
raise HTTPException(status_code=404, detail="Location not found")
# Get detailed information
interactables_detail = []
for inst_id, inter in location.interactables.items():
actions = [{"id": act.id, "label": act.label, "stamina_cost": act.stamina_cost}
for act in inter.actions.values()]
interactables_detail.append({
"instance_id": inst_id,
"name": inter.name,
"actions": actions
})
return {
"location": location.name,
"description": location.description,
"interactables": interactables_detail,
"exits": location.exits
}
class InteractRequest(BaseModel):
interactable_id: str
action_id: str
@app.post("/api/game/interact")
async def interact_with_object(interact_data: InteractRequest, user_id: int = Depends(verify_token)):
"""Interact with an object in the world"""
from bot.database import update_player, add_inventory_item
import random
player = await get_player(player_id=user_id)
if not player:
raise HTTPException(status_code=404, detail="Player not found")
location = LOCATIONS.get(player['location_id'])
if not location:
raise HTTPException(status_code=404, detail="Location not found")
interactable = location.interactables.get(interact_data.interactable_id)
if not interactable:
raise HTTPException(status_code=404, detail="Interactable not found")
action = interactable.actions.get(interact_data.action_id)
if not action:
raise HTTPException(status_code=404, detail="Action not found")
# Check stamina
if player['stamina'] < action.stamina_cost:
raise HTTPException(status_code=400, detail="Not enough stamina")
# Perform action - randomly choose outcome
outcome_key = random.choice(list(action.outcomes.keys()))
outcome = action.outcomes[outcome_key]
# Apply outcome
stamina_change = -action.stamina_cost
hp_change = -outcome.damage_taken if outcome.damage_taken else 0
items_found = outcome.items_reward if outcome.items_reward else {}
# Update player
new_hp = max(1, player['hp'] + hp_change)
new_stamina = max(0, player['stamina'] + stamina_change)
await update_player(player_id=player['id'], updates={
'hp': new_hp,
'stamina': new_stamina
})
# Add items to inventory (if player has telegram_id for FK)
items_added = []
if player.get('telegram_id') and items_found:
for item_id, quantity in items_found.items():
# This will fail for web users without telegram_id
# TODO: Fix inventory schema
try:
items_added.append({"id": item_id, "quantity": quantity})
except:
pass
return {
"success": True,
"outcome": outcome_key,
"message": outcome.text,
"items_found": items_added,
"hp_change": hp_change,
"stamina_change": stamina_change,
"new_hp": new_hp,
"new_stamina": new_stamina
}
class UseItemRequest(BaseModel):
item_db_id: int
@app.post("/api/game/use_item")
async def use_item_endpoint(item_data: UseItemRequest, user_id: int = Depends(verify_token)):
"""Use an item from inventory"""
from bot.logic import use_item_logic
player = await get_player(player_id=user_id)
if not player:
raise HTTPException(status_code=404, detail="Player not found")
if not player.get('telegram_id'):
raise HTTPException(status_code=400, detail="Inventory not available for web users yet")
result = await use_item_logic(player, item_data.item_db_id)
return result
class EquipItemRequest(BaseModel):
item_db_id: int
@app.post("/api/game/equip_item")
async def equip_item_endpoint(item_data: EquipItemRequest, user_id: int = Depends(verify_token)):
"""Equip or unequip an item"""
from bot.logic import toggle_equip
player = await get_player(player_id=user_id)
if not player:
raise HTTPException(status_code=404, detail="Player not found")
if not player.get('telegram_id'):
raise HTTPException(status_code=400, detail="Inventory not available for web users yet")
result = await toggle_equip(player['telegram_id'], item_data.item_db_id)
return {"success": True, "message": result}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)