Files
echoes-of-the-ash/api/routers/auth.py
2026-02-23 15:42:21 +01:00

357 lines
11 KiB
Python

"""
Authentication router.
Handles user registration, login, and profile retrieval.
"""
from fastapi import APIRouter, HTTPException, Depends, status, Request
from typing import Dict, Any
from ..services.helpers import get_game_message
from ..core.security import create_access_token, hash_password, verify_password, get_current_user
from ..services.models import UserRegister, UserLogin
from .. import database as db
from ..items import items_manager
from ..services.helpers import calculate_player_capacity, enrich_character_data
router = APIRouter(prefix="/api/auth", tags=["authentication"])
@router.post("/register")
async def register(user: UserRegister):
"""Register a new account"""
# Check if email already exists
existing = await db.get_account_by_email(user.email)
if existing:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already registered"
)
# Hash password
password_hash = hash_password(user.password)
# Create account
account = await db.create_account(
email=user.email,
password_hash=password_hash,
account_type="web"
)
if not account:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to create account"
)
# Get characters for this account (should be empty for new account)
characters = await db.get_characters_by_account_id(account["id"])
# Create access token with account_id (no character selected yet)
access_token = create_access_token({
"account_id": account["id"],
"character_id": None
})
return {
"access_token": access_token,
"token_type": "bearer",
"account": {
"id": account["id"],
"email": account["email"],
"account_type": account["account_type"],
"is_premium": account.get("premium_expires_at") is not None,
},
"characters": characters,
"needs_character_creation": len(characters) == 0
}
@router.post("/login")
async def login(user: UserLogin):
"""Login with email and password"""
# Get account by email
account = await db.get_account_by_email(user.email)
if not account:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid email or password"
)
# Verify password
if not account.get('password_hash'):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid email or password"
)
if not verify_password(user.password, account['password_hash']):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid email or password"
)
# Update last login
await db.update_account_last_login(account["id"])
# Get characters for this account
characters = await db.get_characters_by_account_id(account["id"])
# Create access token with account_id (no character selected yet)
access_token = create_access_token({
"account_id": account["id"],
"character_id": None
})
return {
"access_token": access_token,
"token_type": "bearer",
"account": {
"id": account["id"],
"email": account["email"],
"account_type": account["account_type"],
"is_premium": account.get("premium_expires_at") is not None,
},
"characters": [
await enrich_character_data(char, items_manager)
for char in characters
],
"needs_character_creation": len(characters) == 0
}
@router.get("/me")
async def get_me(current_user: Dict[str, Any] = Depends(get_current_user)):
"""Get current user profile"""
return {
"id": current_user["id"],
"username": current_user.get("username"),
"name": current_user["name"],
"level": current_user["level"],
"xp": current_user["xp"],
"hp": current_user["hp"],
"max_hp": current_user["max_hp"],
"stamina": current_user["stamina"],
"max_stamina": current_user["max_stamina"],
"strength": current_user["strength"],
"agility": current_user["agility"],
"endurance": current_user["endurance"],
"intellect": current_user["intellect"],
"location_id": current_user["location_id"],
"is_dead": current_user["is_dead"],
"unspent_points": current_user["unspent_points"]
}
@router.get("/account")
async def get_account(current_user: Dict[str, Any] = Depends(get_current_user)):
"""Get current account details including characters"""
# Get account from current user's account_id
account_id = current_user.get("account_id")
if not account_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="No account associated with this user"
)
account = await db.get_account_by_id(account_id)
if not account:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Account not found"
)
# Get characters for this account
characters = await db.get_characters_by_account_id(account_id)
return {
"account": {
"id": account["id"],
"email": account["email"],
"account_type": account["account_type"],
"is_premium": account.get("premium_expires_at") is not None and account.get("premium_expires_at") > 0,
"premium_expires_at": account.get("premium_expires_at"),
"created_at": account.get("created_at"),
"last_login_at": account.get("last_login_at"),
},
"characters": [
await enrich_character_data(char, items_manager)
for char in characters
]
}
@router.post("/change-email")
async def change_email(
request: "ChangeEmailRequest",
req: Request,
current_user: Dict[str, Any] = Depends(get_current_user)
):
"""Change account email address"""
from ..services.models import ChangeEmailRequest
locale = req.headers.get('Accept-Language', 'en')
# Get account
account_id = current_user.get("account_id")
if not account_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="No account associated with this user"
)
account = await db.get_account_by_id(account_id)
if not account:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Account not found"
)
# Verify current password
if not account.get('password_hash'):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="This account does not have a password set"
)
if not verify_password(request.current_password, account['password_hash']):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Current password is incorrect"
)
# Validate new email format
import re
email_regex = r'^[^\s@]+@[^\s@]+\.[^\s@]+$'
if not re.match(email_regex, request.new_email):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid email format"
)
# Update email
try:
await db.update_account_email(account_id, request.new_email)
return {"message": get_game_message('email_updated', locale), "new_email": request.new_email}
except ValueError as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e)
)
@router.post("/change-password")
async def change_password(
request: "ChangePasswordRequest",
req: Request,
current_user: Dict[str, Any] = Depends(get_current_user)
):
"""Change account password"""
from ..services.models import ChangePasswordRequest
locale = req.headers.get('Accept-Language', 'en')
# Get account
account_id = current_user.get("account_id")
if not account_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="No account associated with this user"
)
account = await db.get_account_by_id(account_id)
if not account:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Account not found"
)
# Verify current password
if not account.get('password_hash'):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="This account does not have a password set"
)
if not verify_password(request.current_password, account['password_hash']):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Current password is incorrect"
)
# Validate new password
if len(request.new_password) < 6:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="New password must be at least 6 characters"
)
# Hash and update password
new_password_hash = hash_password(request.new_password)
await db.update_account_password(account_id, new_password_hash)
return {"message": get_game_message('password_updated', locale)}
@router.post("/steam-login")
async def steam_login(steam_data: Dict[str, Any]):
"""
Login or register with Steam account.
Creates account if it doesn't exist.
"""
steam_id = steam_data.get("steam_id")
steam_name = steam_data.get("steam_name", "Steam User")
if not steam_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Steam ID is required"
)
# Try to find existing account by steam_id
account = await db.get_account_by_steam_id(steam_id)
if not account:
# Create new Steam account
# Use steam_id as email (unique identifier)
email = f"steam_{steam_id}@steamuser.local"
account = await db.create_account(
email=email,
password_hash=None, # Steam accounts don't have passwords
account_type="steam",
steam_id=steam_id
)
if not account:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to create Steam account"
)
# Get characters for this account
characters = await db.get_characters_by_account_id(account["id"])
# Create access token with account_id (no character selected yet)
access_token = create_access_token({
"account_id": account["id"],
"character_id": None
})
return {
"access_token": access_token,
"token_type": "bearer",
"account": {
"id": account["id"],
"email": account["email"],
"account_type": account["account_type"],
"steam_id": steam_id,
"steam_name": steam_name,
"premium_expires_at": account.get("premium_expires_at"),
"created_at": account.get("created_at"),
"last_login_at": account.get("last_login_at")
},
"characters": [
await enrich_character_data(char, items_manager)
for char in characters
],
"needs_character_creation": len(characters) == 0
}