"""
Security module for authentication and authorization.

Handles JWT tokens, password hashing, and auth dependencies.
"""
import hmac
from datetime import datetime, timedelta, timezone
from typing import Any, Dict

import bcrypt
import jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

from .config import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES, API_INTERNAL_KEY
from .. import database as db

# Bearer-token extractor shared by every auth dependency in this module.
security = HTTPBearer()


def create_access_token(data: dict) -> str:
    """Create a signed JWT access token.

    Args:
        data: Claims to embed in the token. The dict is copied, so the
            caller's object is not mutated.

    Returns:
        The encoded JWT, signed with ``SECRET_KEY`` using ``ALGORITHM`` and
        carrying an ``exp`` claim ``ACCESS_TOKEN_EXPIRE_MINUTES`` from now.
    """
    to_encode = data.copy()
    # Use a timezone-aware "now": datetime.utcnow() is deprecated since
    # Python 3.12 and returns a naive value that is easy to misinterpret.
    expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode["exp"] = expire
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


def decode_token(token: str) -> dict:
    """Decode a JWT and return its payload.

    Args:
        token: The encoded JWT string.

    Returns:
        The decoded claims.

    Raises:
        HTTPException: 401 "Token has expired" when the signature has
            expired; 401 "Could not validate credentials" for any other
            failure while decoding.
    """
    try:
        return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except jwt.ExpiredSignatureError as exc:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token has expired",
        ) from exc
    except Exception as exc:
        # Deliberately broad: every decoding failure (invalid token, bad
        # signature, malformed input, ...) is reported as the same 401.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
        ) from exc


def hash_password(password: str) -> str:
    """Hash a password using bcrypt with a freshly generated salt.

    Returns:
        The bcrypt hash decoded to a UTF-8 string.
    """
    hashed = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())
    return hashed.decode('utf-8')


def verify_password(password: str, password_hash: str) -> bool:
    """Verify a plaintext password against its bcrypt hash.

    Returns:
        True when the password matches the hash, False otherwise.
    """
    return bcrypt.checkpw(password.encode('utf-8'), password_hash.encode('utf-8'))


async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> Dict[str, Any]:
    """
    Verify JWT token and return current character (requires character selection).

    This is the main auth dependency for protected endpoints.

    Two token layouts are accepted:

    * New system: ``account_id`` + ``character_id`` claims. The character
      is loaded and must belong to the account.
    * Old system (backward compatibility): a single ``player_id`` claim.

    Returns:
        The player record returned by ``db.get_player_by_id``.

    Raises:
        HTTPException: 400 when no character/player id is present in the
            token, 404 when the character/player does not exist, 403 when
            the character belongs to another account, and 401 when the
            token is expired/invalid or any other error occurs.
    """
    try:
        payload = decode_token(credentials.credentials)

        # New system: account_id + character_id
        account_id = payload.get("account_id")
        if account_id is not None:
            character_id = payload.get("character_id")
            if character_id is None:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail="No character selected. Please select a character first.",
                )

            player = await db.get_player_by_id(character_id)
            if player is None:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail="Character not found",
                )

            # Verify character belongs to account
            if player.get('account_id') != account_id:
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail="Character does not belong to this account",
                )
            return player

        # Old system fallback: player_id (for backward compatibility)
        player_id = payload.get("player_id")
        if player_id is None:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Invalid token: no player or character ID",
            )

        player = await db.get_player_by_id(player_id)
        if player is None:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Player not found",
            )
        return player
    except HTTPException:
        # Deliberate HTTP errors raised above (or by decode_token) pass
        # through untouched so their status code and detail are preserved.
        raise
    except jwt.ExpiredSignatureError as exc:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token has expired",
        ) from exc
    except Exception as exc:
        # NOTE(review): this also turns unexpected failures (e.g. errors
        # raised by the database call) into a 401. Kept as-is to preserve
        # the existing contract; confirm whether a 5xx would be preferable.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
        ) from exc


async def verify_internal_key(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Verify internal API key for bot endpoints.

    Returns:
        True when the bearer credential equals ``API_INTERNAL_KEY``.

    Raises:
        HTTPException: 401 "Invalid internal API key" on any mismatch,
            including when the configured key is not a string.
    """
    supplied = credentials.credentials
    expected = API_INTERNAL_KEY
    # Constant-time comparison so response timing does not leak how much of
    # the key matched. Comparing UTF-8 bytes avoids compare_digest's
    # ASCII-only restriction on str arguments; the isinstance guards keep
    # a non-string configured key a plain mismatch instead of an error.
    key_matches = (
        isinstance(supplied, str)
        and isinstance(expected, str)
        and hmac.compare_digest(supplied.encode('utf-8'), expected.encode('utf-8'))
    )
    if not key_matches:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid internal API key",
        )
    return True