500 lines
17 KiB
Python
500 lines
17 KiB
Python
from fastapi import FastAPI, Depends, HTTPException, status
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
|
from pydantic import BaseModel
|
|
from typing import Optional, List
|
|
import jwt
|
|
import bcrypt
|
|
from datetime import datetime, timedelta
|
|
import os
|
|
import sys
|
|
|
|
# Add parent directory to path to import bot modules
|
|
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
from bot.database import get_player, create_player
|
|
from data.world_loader import load_world
|
|
from api.internal import router as internal_router
|
|
|
|
# ASGI application object; every route below is registered on it.
app = FastAPI(
    title="Echoes of the Ashes API",
    version="1.0.0",
)

# Mount the routes exported by api.internal.
app.include_router(internal_router)

# CORS: credentialed cross-origin requests are accepted only from the two
# origins listed below; any HTTP method and any request header is allowed.
_cors_settings = {
    "allow_origins": [
        "https://echoesoftheashgame.patacuack.net",
        "http://localhost:3000",
    ],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_settings)
|
|
|
|
# JWT Configuration
|
|
# Signing key for access tokens, read from the JWT_SECRET_KEY environment
# variable.
# NOTE(review): when the variable is unset a hard-coded placeholder is used,
# which would make tokens forgeable - confirm it is always set in production.
SECRET_KEY = os.environ.get(
    "JWT_SECRET_KEY",
    "your-secret-key-change-in-production",
)

# Algorithm used both to sign and to verify tokens.
ALGORITHM = "HS256"

# Token lifetime in minutes: 7 days x 24 hours x 60 minutes.
ACCESS_TOKEN_EXPIRE_MINUTES = 7 * 24 * 60

# Bearer-token extractor used as a dependency by verify_token.
security = HTTPBearer()
|
|
|
|
# Load world data
|
|
# World data is loaded once at import time. If loading fails the service
# still starts, with WORLD left as None (or as loaded) and LOCATIONS empty.
WORLD, LOCATIONS = None, {}
try:
    WORLD = load_world()
    # WORLD.locations is already a dict {location_id: Location}
    LOCATIONS = WORLD.locations
    print(f"✅ Loaded {len(LOCATIONS)} locations")
except Exception as load_error:
    # Best-effort startup: report the failure and carry on.
    print(f"⚠️ Warning: Could not load world data: {load_error}")
    import traceback
    traceback.print_exc()
|
|
|
|
# Pydantic Models
|
|
class UserRegister(BaseModel):
    """Request body accepted by POST /api/auth/register."""

    # Desired account name; register() rejects it if a player already has it.
    username: str
    # Plain-text password; register() hashes it with bcrypt before storing.
    password: str
|
|
|
|
class UserLogin(BaseModel):
    """Request body accepted by POST /api/auth/login."""

    # Account name used to look the player up.
    username: str
    # Plain-text password; login() checks it against the stored bcrypt hash.
    password: str
|
|
|
|
class Token(BaseModel):
    """Response model of the register and login endpoints."""

    # Encoded JWT produced by create_access_token.
    access_token: str
    # Defaults to "bearer" when the endpoint does not supply a value.
    token_type: str = "bearer"
|
|
|
|
class User(BaseModel):
    """Response model of GET /api/auth/me."""

    # Player record id (the value carried in the token's "sub" claim).
    id: int
    username: str
    # Optional; register() creates web accounts with telegram_id=None.
    telegram_id: Optional[str] = None
|
|
|
|
class PlayerState(BaseModel):
    """Response model of GET /api/game/state."""

    # Where the player is; the name is "Unknown" when the id is not in LOCATIONS.
    location_id: str
    location_name: str

    # Vital stats, copied from the player's hp / max_hp / stamina / max_stamina.
    health: int
    max_health: int
    stamina: int
    max_stamina: int

    # Both lists are currently always empty (see the TODO in get_game_state).
    inventory: List[dict]
    status_effects: List[dict]
|
|
|
|
class MoveRequest(BaseModel):
    """Request body accepted by POST /api/game/move."""

    # Exit to take; move_player lower-cases it before looking it up.
    direction: str
|
|
|
|
|
|
# Helper Functions
|
|
def create_access_token(data: dict):
    """Build a signed JWT carrying ``data`` plus an expiry claim.

    Args:
        data: Claims to embed in the token (callers pass ``{"sub": <player id>}``).
            The dict is not modified.

    Returns:
        The encoded token as returned by ``jwt.encode``.
    """
    # Imported locally because the file-level import only brings in
    # ``datetime`` and ``timedelta``.
    from datetime import timezone

    # Use an aware UTC timestamp: datetime.utcnow() is deprecated and yields a
    # naive value that is easy to misinterpret as local time.
    expires_at = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    claims = {**data, "exp": expires_at}
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
|
|
|
|
def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Validate the bearer token and return the user id stored in its ``sub`` claim.

    Args:
        credentials: Bearer credentials extracted by the ``security`` dependency.

    Returns:
        The value of the token's ``sub`` claim.

    Raises:
        HTTPException: 401 if the token is expired, cannot be validated, or
            carries no ``sub`` claim.
    """
    try:
        payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM])
    except jwt.ExpiredSignatureError as exc:
        # Must come first: ExpiredSignatureError is a subclass of InvalidTokenError.
        raise HTTPException(status_code=401, detail="Token has expired") from exc
    except jwt.InvalidTokenError as exc:
        # PyJWT has no ``jwt.JWTError``; naming it here raised AttributeError
        # (HTTP 500) for every malformed or tampered token.
        raise HTTPException(status_code=401, detail="Could not validate credentials") from exc

    # Checked outside the try block so this 401 is not routed through the
    # JWT exception handlers above.
    user_id = payload.get("sub")
    if user_id is None:
        raise HTTPException(status_code=401, detail="Invalid authentication credentials")
    return user_id
|
|
|
|
|
|
# Routes
|
|
@app.get("/")
async def root():
    """Report the API name and that the service is online."""
    banner = {
        "message": "Echoes of the Ashes API",
        "status": "online",
    }
    return banner
|
|
|
|
@app.post("/api/auth/register", response_model=Token)
async def register(user_data: UserRegister):
    """Register a new user account and return an access token for it.

    Args:
        user_data: Requested username and plain-text password.

    Returns:
        A dict with the ``access_token`` for the newly created player.

    Raises:
        HTTPException: 400 if the username is taken; 500 if the player could
            not be created or an unexpected error occurred.
    """
    try:
        # Check if username already exists
        if await get_player(username=user_data.username):
            raise HTTPException(status_code=400, detail="Username already exists")

        # Hash password; stored as text, hence the decode.
        password_hash = bcrypt.hashpw(
            user_data.password.encode('utf-8'),
            bcrypt.gensalt(),
        ).decode('utf-8')

        # Create player with web auth (no Telegram account attached)
        player = await create_player(
            telegram_id=None,
            username=user_data.username,
            password_hash=password_hash,
        )

        if not player or 'id' not in player:
            print(f"ERROR: create_player returned: {player}")
            raise HTTPException(status_code=500, detail="Failed to create player - no ID returned")

        # Create token
        return {"access_token": create_access_token(data={"sub": player['id']})}
    except HTTPException:
        # Deliberate HTTP errors raised above pass through untouched.
        raise
    except Exception as e:
        import traceback
        print(f"ERROR in register: {str(e)}")
        print(traceback.format_exc())
        # Details are logged above; the client gets a generic message so
        # internal exception text is not exposed.
        raise HTTPException(status_code=500, detail="Internal server error") from e
|
|
|
|
@app.post("/api/auth/login", response_model=Token)
async def login(user_data: UserLogin):
    """Login with username and password and return an access token.

    Args:
        user_data: Username and plain-text password to verify.

    Returns:
        A dict with the ``access_token`` for the authenticated player.

    Raises:
        HTTPException: 401 if the user is unknown, has no password hash, or
            the password does not match; 500 on an unexpected error.
    """
    try:
        # Get player; accounts without a password hash cannot log in here.
        player = await get_player(username=user_data.username)
        if not player or not player.get('password_hash'):
            raise HTTPException(status_code=401, detail="Invalid username or password")

        # Verify password (same message as above so the response does not
        # reveal whether the username exists).
        password_ok = bcrypt.checkpw(
            user_data.password.encode('utf-8'),
            player['password_hash'].encode('utf-8'),
        )
        if not password_ok:
            raise HTTPException(status_code=401, detail="Invalid username or password")

        # Create token
        return {"access_token": create_access_token(data={"sub": player['id']})}
    except HTTPException:
        # Deliberate HTTP errors raised above pass through untouched.
        raise
    except Exception as e:
        import traceback
        print(f"ERROR in login: {str(e)}")
        print(traceback.format_exc())
        # Details are logged above; the client gets a generic message so
        # internal exception text is not exposed.
        raise HTTPException(status_code=500, detail="Internal server error") from e
|
|
|
|
@app.get("/api/auth/me", response_model=User)
async def get_current_user(user_id: int = Depends(verify_token)):
    """Return id, username and telegram_id of the authenticated player.

    Raises:
        HTTPException: 404 if no player exists for the token's user id.
    """
    record = await get_player(player_id=user_id)
    if record:
        return {
            "id": record['id'],
            "username": record.get('username'),
            "telegram_id": record.get('telegram_id'),
        }
    raise HTTPException(status_code=404, detail="User not found")
|
|
|
|
@app.get("/api/game/state", response_model=PlayerState)
async def get_game_state(user_id: int = Depends(verify_token)):
    """Return the authenticated player's location and vital stats.

    Raises:
        HTTPException: 404 if no player exists for the token's user id.
    """
    player = await get_player(player_id=user_id)
    if not player:
        raise HTTPException(status_code=404, detail="Player not found")

    current_id = player['location_id']
    place = LOCATIONS.get(current_id)
    # A location id missing from the loaded world is reported as "Unknown"
    # rather than treated as an error.
    place_name = place.name if place else "Unknown"

    # TODO: Get actual inventory and status effects from database
    return {
        "location_id": current_id,
        "location_name": place_name,
        "health": player['hp'],
        "max_health": player['max_hp'],
        "stamina": player['stamina'],
        "max_stamina": player['max_stamina'],
        "inventory": [],
        "status_effects": [],
    }
|
|
|
|
@app.post("/api/game/move")
async def move_player(move_data: MoveRequest, user_id: int = Depends(verify_token)):
    """Move the player through an exit of the current location.

    A successful move costs one stamina point. The response carries a travel
    message and the player's state re-read after the update.

    Raises:
        HTTPException: 404 if the player does not exist; 400 if the current
            location is unknown, stamina is below 1, there is no exit in the
            requested direction, or the exit leads to an unknown location.
    """
    from bot.database import update_player

    player = await get_player(player_id=user_id)
    if not player:
        raise HTTPException(status_code=404, detail="Player not found")

    origin = LOCATIONS.get(player['location_id'])
    if not origin:
        raise HTTPException(status_code=400, detail="Invalid current location")

    # Moving requires at least one stamina point.
    stamina_now = player['stamina']
    if stamina_now < 1:
        raise HTTPException(status_code=400, detail="Not enough stamina to move")

    # exits is a dict {direction: destination_id}; the lookup uses the
    # lower-cased direction.
    requested = move_data.direction
    target_id = origin.exits.get(requested.lower())
    if not target_id:
        raise HTTPException(status_code=400, detail=f"Cannot move {requested} from here")

    destination = LOCATIONS.get(target_id)
    if not destination:
        raise HTTPException(status_code=400, detail="Invalid destination")

    # Persist the new position and the stamina cost (use player_id for web users).
    changes = {
        'location_id': destination.id,
        'stamina': max(0, stamina_now - 1),
    }
    await update_player(player_id=player['id'], updates=changes)

    # Re-read the player so the response reflects what was stored.
    refreshed = await get_player(player_id=user_id)
    state = {
        "location_id": refreshed['location_id'],
        "location_name": destination.name,
        "health": refreshed['hp'],
        "max_health": refreshed['max_hp'],
        "stamina": refreshed['stamina'],
        "max_stamina": refreshed['max_stamina'],
        "inventory": [],
        "status_effects": [],
    }
    return {
        "success": True,
        "message": f"You travel {requested} to {destination.name}. {destination.description}",
        "player_state": state,
    }
|
|
|
|
@app.get("/api/game/location")
async def get_current_location(user_id: int = Depends(verify_token)):
    """Return details of the location the player is currently in.

    The response lists the exit directions, an image URL and the
    interactables; ``npcs`` and ``items`` are always empty for now.

    Raises:
        HTTPException: 404 if the player or the player's location is not found.
    """
    player = await get_player(player_id=user_id)
    if not player:
        raise HTTPException(status_code=404, detail="Player not found")

    location_id = player['location_id']
    location = LOCATIONS.get(location_id)
    if not location:
        raise HTTPException(status_code=404, detail=f"Location '{location_id}' not found")

    # Exit directions are the keys of the exits dict.
    directions = list(location.exits.keys())

    # Prefer the path stored on the location; otherwise point at a .png named
    # after the location id (no .jpg fallback is attempted here).
    image_url = (
        f"/{location.image_path}"
        if location.image_path
        else f"/images/locations/{location.id}.png"
    )

    return {
        "id": location.id,
        "name": location.name,
        "description": location.description,
        "directions": directions,
        "npcs": [],   # TODO: implement NPC spawning
        "items": [],  # TODO: implement dropped items
        "image_url": image_url,
        "interactables": [
            {"id": key, "name": thing.name}
            for key, thing in location.interactables.items()
        ],
    }
|
|
|
|
@app.get("/api/game/inventory")
async def get_inventory(user_id: int = Depends(verify_token)):
    """Get player's inventory.

    Currently a placeholder: once the player is confirmed to exist, the
    response always contains an empty item list and a fixed capacity of 20.

    Raises:
        HTTPException: 404 if no player exists for the token's user id.
    """
    # The previous local ``from bot.database import get_inventory`` was never
    # used and shadowed this endpoint's own name, so it has been dropped.
    player = await get_player(player_id=user_id)
    if not player:
        raise HTTPException(status_code=404, detail="Player not found")

    # For web users without telegram_id, inventory might be empty.
    # This is a limitation of the current schema.
    inventory = []

    return {
        "items": inventory,
        "capacity": 20,  # TODO: Calculate based on equipped bag
    }
|
|
|
|
@app.get("/api/game/profile")
async def get_profile(user_id: int = Depends(verify_token)):
    """Return the player's profile: name, progression, vitals and attributes.

    Raises:
        HTTPException: 404 if no player exists for the token's user id.
    """
    player = await get_player(player_id=user_id)
    if not player:
        raise HTTPException(status_code=404, detail="Player not found")

    # Each response key is copied from the player field of the same name,
    # in this order.
    profile_fields = (
        "name", "level", "xp",
        "hp", "max_hp",
        "stamina", "max_stamina",
        "strength", "agility", "endurance", "intellect",
        "unspent_points", "is_dead",
    )
    return {field: player[field] for field in profile_fields}
|
|
|
|
@app.get("/api/game/map")
async def get_map(user_id: int = Depends(verify_token)):
    """Return every loaded location with its exits, plus the player's position.

    Raises:
        HTTPException: 404 if no player exists for the token's user id.
    """
    player = await get_player(player_id=user_id)
    if not player:
        raise HTTPException(status_code=404, detail="Player not found")

    # LOCATIONS is a dict {id: Location}; only the Location objects are needed.
    world_nodes = [
        {
            "id": place.id,
            "name": place.name,
            "description": place.description,
            "exits": place.exits,  # Dict of {direction: destination_id}
        }
        for place in LOCATIONS.values()
    ]

    return {
        "current_location": player['location_id'],
        "locations": world_nodes,
    }
|
|
|
|
@app.post("/api/game/inspect")
async def inspect_area(user_id: int = Depends(verify_token)):
    """Describe the current location, its interactables with their actions, and its exits.

    Raises:
        HTTPException: 404 if the player or the player's location is not found.
    """
    player = await get_player(player_id=user_id)
    if not player:
        raise HTTPException(status_code=404, detail="Player not found")

    location = LOCATIONS.get(player['location_id'])
    if not location:
        raise HTTPException(status_code=404, detail="Location not found")

    def summarize(instance_id, interactable):
        # One entry per interactable: its available actions, then its identity.
        action_rows = [
            {"id": action.id, "label": action.label, "stamina_cost": action.stamina_cost}
            for action in interactable.actions.values()
        ]
        return {
            "instance_id": instance_id,
            "name": interactable.name,
            "actions": action_rows,
        }

    details = [
        summarize(instance_id, interactable)
        for instance_id, interactable in location.interactables.items()
    ]

    return {
        "location": location.name,
        "description": location.description,
        "interactables": details,
        "exits": location.exits,
    }
|
|
|
|
class InteractRequest(BaseModel):
    """Request body accepted by POST /api/game/interact."""

    # Key into the current location's ``interactables`` mapping.
    interactable_id: str
    # Key into the chosen interactable's ``actions`` mapping.
    action_id: str
|
|
|
|
@app.post("/api/game/interact")
async def interact_with_object(interact_data: InteractRequest, user_id: int = Depends(verify_token)):
    """Interact with an object in the world.

    Picks one of the action's outcomes uniformly at random, charges the
    action's stamina cost, applies any damage from the outcome and stores the
    new hp and stamina.

    Args:
        interact_data: Ids of the interactable and of the action to perform.
        user_id: Id of the authenticated player (from the bearer token).

    Returns:
        A dict with the chosen outcome key, its text, the reported items and
        the hp/stamina changes and resulting values.

    Raises:
        HTTPException: 404 if the player, location, interactable or action is
            not found; 400 if the player lacks the stamina for the action;
            500 if the action defines no outcomes.
    """
    from bot.database import update_player
    import random

    player = await get_player(player_id=user_id)
    if not player:
        raise HTTPException(status_code=404, detail="Player not found")

    location = LOCATIONS.get(player['location_id'])
    if not location:
        raise HTTPException(status_code=404, detail="Location not found")

    interactable = location.interactables.get(interact_data.interactable_id)
    if not interactable:
        raise HTTPException(status_code=404, detail="Interactable not found")

    action = interactable.actions.get(interact_data.action_id)
    if not action:
        raise HTTPException(status_code=404, detail="Action not found")

    # Check stamina
    if player['stamina'] < action.stamina_cost:
        raise HTTPException(status_code=400, detail="Not enough stamina")

    # random.choice raises IndexError on an empty sequence, so a misconfigured
    # action is reported explicitly before any stamina is charged.
    outcome_keys = list(action.outcomes.keys())
    if not outcome_keys:
        raise HTTPException(status_code=500, detail="Action has no outcomes")

    # Perform action - randomly choose outcome
    outcome_key = random.choice(outcome_keys)
    outcome = action.outcomes[outcome_key]

    # Work out the effect of the outcome.
    stamina_change = -action.stamina_cost
    hp_change = -outcome.damage_taken if outcome.damage_taken else 0
    items_found = outcome.items_reward if outcome.items_reward else {}

    # HP is floored at 1 and stamina at 0.
    new_hp = max(1, player['hp'] + hp_change)
    new_stamina = max(0, player['stamina'] + stamina_change)

    await update_player(player_id=player['id'], updates={
        'hp': new_hp,
        'stamina': new_stamina,
    })

    # Items are only reported for players that have a telegram_id.
    # TODO: Fix inventory schema - the items are listed in the response but
    # nothing is written to the inventory here.
    items_added = []
    if player.get('telegram_id') and items_found:
        items_added = [
            {"id": item_id, "quantity": quantity}
            for item_id, quantity in items_found.items()
        ]

    return {
        "success": True,
        "outcome": outcome_key,
        "message": outcome.text,
        "items_found": items_added,
        "hp_change": hp_change,
        "stamina_change": stamina_change,
        "new_hp": new_hp,
        "new_stamina": new_stamina,
    }
|
|
|
|
class UseItemRequest(BaseModel):
    """Request body accepted by POST /api/game/use_item."""

    # Identifier of the item, passed on to use_item_logic.
    item_db_id: int
|
|
|
|
@app.post("/api/game/use_item")
async def use_item_endpoint(item_data: UseItemRequest, user_id: int = Depends(verify_token)):
    """Use an inventory item and return whatever use_item_logic returns.

    Raises:
        HTTPException: 404 if the player does not exist; 400 if the player
            has no telegram_id (inventory is unavailable for web users).
    """
    from bot.logic import use_item_logic

    player = await get_player(player_id=user_id)
    if not player:
        raise HTTPException(status_code=404, detail="Player not found")

    # Only players linked to a Telegram account may use items.
    if player.get('telegram_id'):
        return await use_item_logic(player, item_data.item_db_id)

    raise HTTPException(status_code=400, detail="Inventory not available for web users yet")
|
|
|
|
class EquipItemRequest(BaseModel):
    """Request body accepted by POST /api/game/equip_item."""

    # Identifier of the item, passed on to toggle_equip.
    item_db_id: int
|
|
|
|
@app.post("/api/game/equip_item")
async def equip_item_endpoint(item_data: EquipItemRequest, user_id: int = Depends(verify_token)):
    """Toggle an item's equipped state and return toggle_equip's result as the message.

    Raises:
        HTTPException: 404 if the player does not exist; 400 if the player
            has no telegram_id (inventory is unavailable for web users).
    """
    from bot.logic import toggle_equip

    player = await get_player(player_id=user_id)
    if not player:
        raise HTTPException(status_code=404, detail="Player not found")

    # Only players linked to a Telegram account may equip items.
    telegram_id = player.get('telegram_id')
    if not telegram_id:
        raise HTTPException(status_code=400, detail="Inventory not available for web users yet")

    outcome_text = await toggle_equip(telegram_id, item_data.item_db_id)
    return {"success": True, "message": outcome_text}
|
|
|
|
|
|
if __name__ == "__main__":
    # Direct execution: serve the app with uvicorn on all interfaces, port 8000.
    import uvicorn

    bind = {"host": "0.0.0.0", "port": 8000}
    uvicorn.run(app, **bind)
|