echoes-of-the-ash/old/REDIS_INTEGRATION_PLAN.md
2025-11-27 16:27:01 +01:00


Redis Integration Plan: Scalable Multi-Worker Architecture

Executive Summary

This document outlines a comprehensive plan to integrate Redis into Echoes of the Ashes for:

  1. Horizontal Scaling: Support multiple FastAPI workers behind a load balancer
  2. Performance: Reduce database queries by caching frequently accessed data
  3. Real-time Updates: Improve WebSocket message delivery across workers using Pub/Sub

Current Architecture Analysis

Existing Limitations

  1. Single-Worker WebSocket Management

    • ConnectionManager stores WebSocket connections in memory
    • Each worker has its own isolated connection dictionary
    • Player on Worker A cannot receive broadcasts triggered by Worker B
    • No cross-worker communication mechanism
  2. Database Query Bottlenecks

    • Every location broadcast calls get_players_in_location(), which queries PostgreSQL
    • Profile/state endpoints fetch fresh data on every request
    • Location data, item definitions, NPC stats loaded repeatedly
    • High database load with many concurrent players
  3. Background Tasks

    • Single worker executes background tasks (status effects, spawns, etc.)
    • Uses file locking to prevent duplicate execution
    • Not suitable for true multi-worker horizontal scaling

Proposed Redis Architecture

1. Redis Data Structures

A. Player Session Data (Redis Hash)

Key Pattern: player:{character_id}:session

Fields:

{
  "worker_id": "worker-1",
  "websocket_connected": "true",
  "location_id": "overpass",
  "hp": "85",
  "max_hp": "100",
  "stamina": "42",
  "max_stamina": "50",
  "level": "12",
  "xp": "2450",
  "last_movement_time": "1762710676.592",
  "in_combat": "false",
  "last_heartbeat": "1762710676.592"
}

TTL: 30 minutes (refreshed on activity)

Purpose:

  • Track which worker manages each player's WebSocket
  • Cache player stats to avoid DB queries
  • Detect disconnected/stale sessions
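
As a rough illustration of the stale-session check, the sketch below compares last_heartbeat against the current time. It assumes last_heartbeat holds Unix epoch seconds (as the example value suggests); the function name is_session_stale and the 90-second threshold are hypothetical and not part of the existing code.

```python
import time
from typing import Mapping, Optional

# Assumed threshold: a session with no heartbeat for this long counts as stale.
STALE_AFTER_SECONDS = 90.0


def is_session_stale(
    session: Mapping[str, str],
    now: Optional[float] = None,
    max_age: float = STALE_AFTER_SECONDS,
) -> bool:
    """Return True when the last heartbeat is missing, unparseable, or too old."""
    now = time.time() if now is None else now
    try:
        last_heartbeat = float(session["last_heartbeat"])
    except (KeyError, ValueError):
        # No usable heartbeat value: treat the session as stale.
        return True
    return (now - last_heartbeat) > max_age


session = {"worker_id": "worker-1", "last_heartbeat": "1762710676.592"}
fresh = is_session_stale(session, now=1762710700.0)   # heartbeat about 23 s old
stale = is_session_stale(session, now=1762710800.0)   # heartbeat about 123 s old
print(fresh, stale)
```

A check like this only works if every worker writes wall-clock timestamps, since a per-process monotonic clock is not comparable across workers.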

B. Location Player Registry (Redis Set)

Key Pattern: location:{location_id}:players

Values: Character IDs (Redis stores set members as strings; they are converted back to integers on read)

Example:

location:overpass:players = {1, 2, 5, 12, 45}

Purpose:

  • Instantly query who's in a location (no DB query)
  • Used for targeted broadcasts
  • Updated on player movement

C. Worker Connection Registry (Redis Hash)

Key Pattern: worker:{worker_id}:connections

Fields: character_id -> websocket_info

Example:

{
  "1": "connected",
  "5": "connected",
  "12": "connected"
}

Purpose:

  • Each worker tracks its own connections
  • Used to route messages to correct worker
  • Cleaned up on worker shutdown

D. Cached Location Data (Redis Hash)

Key Pattern: location:{location_id}:cache

Fields:

{
  "name": "Overpass",
  "description": "A crumbling highway overpass...",
  "exits": "{\"north\": \"gas_station\", \"south\": \"ruins\"}",
  "danger_level": "3",
  "image_path": "/images/locations/overpass.png",
  "x": "2.5",
  "y": "3.0",
  "interactables": "[{...}]",
  "npcs": "[{...}]"
}

TTL: No expiration (static data, invalidated manually)

Purpose:

  • Eliminate DB queries for location data
  • Preloaded on server startup
  • Served directly from Redis
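
Because every hash field comes back from Redis as a string, a reader has to restore the original types. The following is a minimal sketch of that decoding step, using the field names from the example above; the helper name decode_location and the field groupings are assumptions for illustration.

```python
import json
from typing import Any, Dict, Mapping

# Field groupings are assumptions based on the example hash above.
JSON_FIELDS = ("exits", "interactables", "npcs")
INT_FIELDS = ("danger_level",)
FLOAT_FIELDS = ("x", "y")


def decode_location(raw: Mapping[str, str]) -> Dict[str, Any]:
    """Convert a flat string-only hash back into typed location data."""
    location: Dict[str, Any] = dict(raw)
    for field in JSON_FIELDS:
        if field in location:
            location[field] = json.loads(location[field])
    for field in INT_FIELDS:
        if field in location:
            location[field] = int(location[field])
    for field in FLOAT_FIELDS:
        if field in location:
            location[field] = float(location[field])
    return location


raw_hash = {
    "name": "Overpass",
    "exits": "{\"north\": \"gas_station\", \"south\": \"ruins\"}",
    "danger_level": "3",
    "x": "2.5",
    "y": "3.0",
}
decoded = decode_location(raw_hash)
print(decoded["exits"]["north"], decoded["danger_level"], decoded["x"])
```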

E. Cached Item Definitions (Redis Hash)

Key Pattern: item:{item_id}:def

Fields:

{
  "name": "Iron Sword",
  "type": "weapon",
  "weight": "3.5",
  "damage_min": "5",
  "damage_max": "12",
  "durability": "100",
  "tier": "2"
}

TTL: No expiration (static data)

Purpose:

  • Fast item lookups without parsing JSON files
  • Preloaded on server startup

F. Player Inventory Cache (Redis List)

Key Pattern: player:{character_id}:inventory

Values: JSON-encoded inventory items

TTL: 10 minutes

Purpose:

  • Cache inventory for quick profile/state responses
  • Invalidated on item add/remove/use
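
The read path implied here is the usual cache-aside pattern: try the cache, fall back to the database on a miss, then populate the cache. The sketch below shows that flow with injected callables so it runs without Redis or PostgreSQL; get_inventory_cached and the in-memory stand-ins are hypothetical names, not existing project code.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List, Optional

Inventory = List[dict]


async def get_inventory_cached(
    character_id: int,
    read_cache: Callable[[int], Awaitable[Optional[Inventory]]],
    read_db: Callable[[int], Awaitable[Inventory]],
    write_cache: Callable[[int, Inventory], Awaitable[None]],
) -> Inventory:
    """Cache-aside read: return the cached value, or load from DB and cache it."""
    cached = await read_cache(character_id)
    if cached is not None:
        return cached
    inventory = await read_db(character_id)
    await write_cache(character_id, inventory)
    return inventory


# In-memory stand-ins for Redis and the database (illustration only).
fake_cache: Dict[int, Inventory] = {}
db_calls = 0


async def fake_read_cache(character_id: int) -> Optional[Inventory]:
    return fake_cache.get(character_id)


async def fake_read_db(character_id: int) -> Inventory:
    global db_calls
    db_calls += 1
    return [{"item_id": "iron_ore", "quantity": 3}]


async def fake_write_cache(character_id: int, inventory: Inventory) -> None:
    fake_cache[character_id] = inventory


async def demo():
    first = await get_inventory_cached(5, fake_read_cache, fake_read_db, fake_write_cache)
    second = await get_inventory_cached(5, fake_read_cache, fake_read_db, fake_write_cache)
    return first, second


first, second = asyncio.run(demo())
print(first == second, db_calls)
```

The second read is served from the cache, so the database stand-in is called only once. Invalidation on item add/remove/use would simply delete the cached entry.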

2. Redis Pub/Sub Channels

A. Global Broadcast Channel

Channel: game:broadcast

Message Format:

{
  "type": "global_broadcast",
  "payload": {
    "type": "server_announcement",
    "data": {
      "message": "Server maintenance in 5 minutes"
    }
  }
}

Subscribers: All workers

Purpose: Server-wide announcements


B. Location-Specific Channels

Channel Pattern: location:{location_id}

Message Format:

{
  "type": "location_update",
  "location_id": "overpass",
  "exclude_player_id": 5,
  "payload": {
    "type": "location_update",
    "data": {
      "message": "Jocaru picked up 3x Iron Ore",
      "action": "item_picked_up"
    },
    "timestamp": "2025-11-09T17:52:00Z"
  }
}

Subscribers: All workers

Purpose:

  • Workers subscribe to all location channels
  • When receiving a message, check if they have connected players in that location
  • Send WebSocket messages to their local connections only
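
The per-worker filtering step described above can be sketched as a small pure function: intersect the players in the location with this worker's local connections and drop the excluded player. The name local_recipients is hypothetical.

```python
from typing import Iterable, List, Optional, Set


def local_recipients(
    players_in_location: Iterable[int],
    local_player_ids: Set[int],
    exclude_player_id: Optional[int] = None,
) -> List[int]:
    """Players in the location that this worker holds a WebSocket for."""
    return [
        player_id
        for player_id in players_in_location
        if player_id in local_player_ids and player_id != exclude_player_id
    ]


# Location holds players 1, 2, 5, 12, 45; this worker is connected to 2, 5, 99.
recipients = local_recipients([1, 2, 5, 12, 45], {2, 5, 99}, exclude_player_id=5)
print(recipients)  # [2]
```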

C. Player-Specific Channels

Channel Pattern: player:{character_id}

Message Format:

{
  "type": "personal_message",
  "player_id": 5,
  "payload": {
    "type": "combat_update",
    "data": {
      "message": "You dealt 12 damage!",
      "combat_over": false
    }
  }
}

Subscribers: Worker managing that player's WebSocket

Purpose:

  • Direct messages to specific players
  • Combat updates, inventory changes, etc.

D. Worker Coordination Channel

Channel: game:workers

Message Format:

{
  "type": "worker_join",
  "worker_id": "worker-3",
  "timestamp": "2025-11-09T17:52:00Z"
}

Purpose:

  • Worker lifecycle events
  • Graceful shutdown coordination
  • Load distribution awareness
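
A minimal sketch of building these lifecycle messages follows. Only worker_join appears in the format above; the worker_leave type and the helper name build_worker_event are assumptions added for illustration.

```python
import json
from datetime import datetime, timezone
from typing import Optional

# "worker_leave" is an assumed counterpart to the documented "worker_join".
WORKER_EVENT_TYPES = {"worker_join", "worker_leave"}


def build_worker_event(
    event_type: str,
    worker_id: str,
    now: Optional[datetime] = None,
) -> str:
    """Serialize a worker lifecycle event for the game:workers channel."""
    if event_type not in WORKER_EVENT_TYPES:
        raise ValueError(f"unknown worker event type: {event_type}")
    moment = now if now is not None else datetime.now(timezone.utc)
    timestamp = moment.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    return json.dumps(
        {"type": event_type, "worker_id": worker_id, "timestamp": timestamp}
    )


event = build_worker_event(
    "worker_join",
    "worker-3",
    now=datetime(2025, 11, 9, 17, 52, 0, tzinfo=timezone.utc),
)
print(event)
```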

Implementation Plan

Phase 1: Redis Infrastructure Setup

1.1 Add Redis to Docker Compose

File: docker-compose.yml

  echoes_redis:
    image: redis:7-alpine
    container_name: echoes_of_the_ashes_redis
    restart: unless-stopped
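    # volatile-lru evicts only keys that have a TTL, so the preloaded static data (no TTL) is never evicted
    command: redis-server --appendonly yes --maxmemory 512mb --maxmemory-policy volatile-lru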
    volumes:
      - redis_data:/data
    networks:
      - default_docker
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 3s
      retries: 5

volumes:
  redis_data:

Environment Variables (.env):

REDIS_HOST=echoes_redis
REDIS_PORT=6379
REDIS_DB=0
# Optional, leave empty for dev
REDIS_PASSWORD=

1.2 Install Python Dependencies

File: requirements.txt

redis==5.0.1
hiredis==2.3.2  # C parser for performance

Phase 2: Redis Client Module

2.1 Create Redis Manager

File: api/redis_manager.py

"""
Redis manager for caching and pub/sub functionality.
Handles connection pooling, pub/sub, and caching operations.
"""
import os
import json
import socket
import time
import redis.asyncio as redis
from typing import Optional, Dict, Any, List, Set

class RedisManager:
    def __init__(self):
        self.redis_client: Optional[redis.Redis] = None
        self.pubsub: Optional[redis.client.PubSub] = None
        # Hostname + PID keeps worker IDs unique even when workers run in several containers
        self.worker_id: str = f"worker-{socket.gethostname()}-{os.getpid()}"
        self.subscribed_channels: Set[str] = set()
        
    async def connect(self):
        """Initialize Redis connection pool"""
        host = os.getenv("REDIS_HOST", "localhost")
        port = int(os.getenv("REDIS_PORT", "6379"))
        db = int(os.getenv("REDIS_DB", "0"))
        # An empty REDIS_PASSWORD (as in the dev .env) means "no password"
        password = os.getenv("REDIS_PASSWORD") or None
        
        self.redis_client = redis.Redis(
            host=host,
            port=port,
            db=db,
            password=password,
            decode_responses=True,
            socket_keepalive=True,
            socket_connect_timeout=5,
            retry_on_timeout=True
        )
        
        # Test connection
        await self.redis_client.ping()
        print(f"✅ Redis connected: {host}:{port}")
        
        # Initialize pub/sub
        self.pubsub = self.redis_client.pubsub()
        
    async def disconnect(self):
        """Close Redis connections"""
        if self.pubsub:
            await self.pubsub.close()
        if self.redis_client:
            await self.redis_client.close()
    
    # ====== SESSION MANAGEMENT ======
    
    async def set_player_session(self, character_id: int, data: Dict[str, Any], ttl: int = 1800):
        """Store player session data with TTL"""
        key = f"player:{character_id}:session"
        # Copy so the caller's dict is not mutated. Hash values must be strings,
        # and None fields (e.g. nullable DB columns) are skipped.
        fields = {k: str(v) for k, v in data.items() if v is not None}
        # Note: worker_id is stamped by whichever worker makes this call
        fields['worker_id'] = self.worker_id
        # Wall-clock time, so heartbeats are comparable across workers
        fields['last_heartbeat'] = str(time.time())
        
        # Write the fields and refresh the TTL in a single transaction
        async with self.redis_client.pipeline(transaction=True) as pipe:
            pipe.hset(key, mapping=fields)
            pipe.expire(key, ttl)
            await pipe.execute()
        
    async def get_player_session(self, character_id: int) -> Optional[Dict[str, Any]]:
        """Retrieve player session data"""
        key = f"player:{character_id}:session"
        data = await self.redis_client.hgetall(key)
        return data if data else None
    
    async def delete_player_session(self, character_id: int):
        """Remove player session"""
        key = f"player:{character_id}:session"
        await self.redis_client.delete(key)
    
    # ====== LOCATION REGISTRY ======
    
    async def add_player_to_location(self, character_id: int, location_id: str):
        """Add player to location set"""
        key = f"location:{location_id}:players"
        await self.redis_client.sadd(key, character_id)
    
    async def remove_player_from_location(self, character_id: int, location_id: str):
        """Remove player from location set"""
        key = f"location:{location_id}:players"
        await self.redis_client.srem(key, character_id)
    
    async def get_players_in_location(self, location_id: str) -> List[int]:
        """Get all player IDs in a location"""
        key = f"location:{location_id}:players"
        members = await self.redis_client.smembers(key)
        return [int(m) for m in members] if members else []
    
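    async def move_player_location(self, character_id: int, from_location: str, to_location: str):
        """Atomically move player between locations"""
        # MULTI/EXEC pipeline: no reader sees the player in neither (or both) sets
        async with self.redis_client.pipeline(transaction=True) as pipe:
            pipe.srem(f"location:{from_location}:players", character_id)
            pipe.sadd(f"location:{to_location}:players", character_id)
            await pipe.execute()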
    
    # ====== CACHING ======
    
    async def cache_location(self, location_id: str, data: Dict[str, Any]):
        """Cache location data (no expiration - static data)"""
        key = f"location:{location_id}:cache"
        # Serialize complex objects to JSON strings
        cache_data = {}
        for k, v in data.items():
            if isinstance(v, (dict, list)):
                cache_data[k] = json.dumps(v)
            else:
                cache_data[k] = str(v)
        await self.redis_client.hset(key, mapping=cache_data)
    
    async def get_cached_location(self, location_id: str) -> Optional[Dict[str, Any]]:
        """Retrieve cached location data"""
        key = f"location:{location_id}:cache"
        data = await self.redis_client.hgetall(key)
        if not data:
            return None
        
        # Deserialize JSON fields
        for k in ['exits', 'interactables', 'npcs', 'tags']:
            if k in data:
                try:
                    data[k] = json.loads(data[k])
                except json.JSONDecodeError:
                    pass  # Leave the raw string if it is not valid JSON
        return data
    
    async def cache_inventory(self, character_id: int, inventory: List[Dict], ttl: int = 600):
        """Cache player inventory"""
        key = f"player:{character_id}:inventory"
        await self.redis_client.delete(key)  # Clear old data
        if inventory:
            inventory_json = [json.dumps(item) for item in inventory]
            await self.redis_client.rpush(key, *inventory_json)
            await self.redis_client.expire(key, ttl)
    
    async def get_cached_inventory(self, character_id: int) -> Optional[List[Dict]]:
        """Retrieve cached inventory"""
        key = f"player:{character_id}:inventory"
        items = await self.redis_client.lrange(key, 0, -1)
        if not items:
            return None
        return [json.loads(item) for item in items]
    
    async def invalidate_inventory(self, character_id: int):
        """Invalidate inventory cache"""
        key = f"player:{character_id}:inventory"
        await self.redis_client.delete(key)
    
    # ====== PUB/SUB ======
    
    async def publish_to_location(self, location_id: str, message: Dict[str, Any], exclude_player_id: Optional[int] = None):
        """Publish message to location channel"""
        channel = f"location:{location_id}"
        payload = {
            "type": "location_update",
            "location_id": location_id,
            "exclude_player_id": exclude_player_id,
            "payload": message
        }
        await self.redis_client.publish(channel, json.dumps(payload))
    
    async def publish_to_player(self, character_id: int, message: Dict[str, Any]):
        """Publish message to player-specific channel"""
        channel = f"player:{character_id}"
        payload = {
            "type": "personal_message",
            "player_id": character_id,
            "payload": message
        }
        await self.redis_client.publish(channel, json.dumps(payload))
    
    async def subscribe_to_channels(self, channels: List[str]):
        """Subscribe to multiple channels"""
        await self.pubsub.subscribe(*channels)
        self.subscribed_channels.update(channels)
        print(f"📡 Worker {self.worker_id} subscribed to {len(channels)} channels")
    
    async def subscribe_to_player(self, character_id: int):
        """Subscribe to player-specific channel"""
        channel = f"player:{character_id}"
        await self.pubsub.subscribe(channel)
        self.subscribed_channels.add(channel)
    
    async def unsubscribe_from_player(self, character_id: int):
        """Unsubscribe from player channel"""
        channel = f"player:{character_id}"
        if channel in self.subscribed_channels:
            await self.pubsub.unsubscribe(channel)
            self.subscribed_channels.remove(channel)
    
    async def listen_for_messages(self, message_handler):
        """Listen for pub/sub messages (blocking)"""
        async for message in self.pubsub.listen():
            if message['type'] == 'message':
                try:
                    data = json.loads(message['data'])
                    await message_handler(message['channel'], data)
                except Exception as e:
                    print(f"❌ Error handling pub/sub message: {e}")

# Global Redis instance
redis_manager = RedisManager()

Phase 3: Enhanced Connection Manager

3.1 Update ConnectionManager to Use Redis

File: api/main.py (ConnectionManager class)

Changes:

  1. On WebSocket Connect:

    • Store connection locally (as before)
    • Register player session in Redis
    • Add player to location registry
    • Subscribe to player-specific channel
    • Broadcast "player_joined" to location
  2. On WebSocket Disconnect:

    • Remove local connection
    • Delete player session from Redis
    • Remove from location registry
    • Unsubscribe from player channel
    • Broadcast "player_left" to location
  3. Broadcasting:

    • send_personal_message(): Publish to Redis player channel (not direct WebSocket)
    • send_to_location(): Publish to Redis location channel (not query DB)
    • Local message handler receives Redis pub/sub and sends to local WebSockets

Implementation:

class ConnectionManager:
    def __init__(self, redis_manager):
        self.active_connections: Dict[int, WebSocket] = {}
        self.player_usernames: Dict[int, str] = {}
        self.redis = redis_manager
        self.worker_id = redis_manager.worker_id
        
    async def connect(self, websocket: WebSocket, player_id: int, username: str, location_id: str):
        """Accept WebSocket and register in Redis"""
        await websocket.accept()
        self.active_connections[player_id] = websocket
        self.player_usernames[player_id] = username
        
        # Register in Redis
        await self.redis.set_player_session(player_id, {
            'websocket_connected': 'true',
            'location_id': location_id,
            'username': username
        })
        await self.redis.add_player_to_location(player_id, location_id)
        await self.redis.subscribe_to_player(player_id)
        
        print(f"🔌 WebSocket connected: {username} (player_id={player_id}) on {self.worker_id}")
    
    async def disconnect(self, player_id: int, location_id: str):
        """Remove WebSocket and clean up Redis (callers must await this)"""
        if player_id in self.active_connections:
            username = self.player_usernames.pop(player_id, "unknown")
            del self.active_connections[player_id]
            
            # Await the cleanup instead of spawning unreferenced tasks, which
            # asyncio may garbage-collect before they finish
            results = await asyncio.gather(
                self.redis.delete_player_session(player_id),
                self.redis.remove_player_from_location(player_id, location_id),
                self.redis.unsubscribe_from_player(player_id),
                return_exceptions=True
            )
            for result in results:
                if isinstance(result, Exception):
                    print(f"❌ Redis cleanup failed for player {player_id}: {result}")
            
            print(f"🔌 WebSocket disconnected: {username} (player_id={player_id})")
    
    async def send_personal_message(self, player_id: int, message: dict):
        """Publish to Redis player channel (cross-worker)"""
        await self.redis.publish_to_player(player_id, message)
    
    async def send_to_location(self, location_id: str, message: dict, exclude_player_id: Optional[int] = None):
        """Publish to Redis location channel (cross-worker)"""
        await self.redis.publish_to_location(location_id, message, exclude_player_id)
    
    async def handle_redis_message(self, channel: str, data: dict):
        """Handle incoming Redis pub/sub messages"""
        if channel == 'game:broadcast':
            # Global announcement - send to every local WebSocket
            for player_id, websocket in list(self.active_connections.items()):
                try:
                    await websocket.send_json(data['payload'])
                except Exception as e:
                    print(f"❌ Failed to send broadcast to player {player_id}: {e}")
        
        elif channel.startswith('player:'):
            # Personal message - send to local WebSocket if connected
            player_id = int(channel.split(':')[1])
            if player_id in self.active_connections:
                try:
                    await self.active_connections[player_id].send_json(data['payload'])
                except Exception as e:
                    print(f"❌ Failed to send to player {player_id}: {e}")
                    # Don't disconnect here - let WebSocket handler detect it
        
        elif channel.startswith('location:'):
            # Location broadcast - send to local WebSockets in that location
            location_id = data['location_id']
            exclude_player_id = data.get('exclude_player_id')
            payload = data['payload']
            
            # Get players in location from Redis (fast!)
            players_in_location = await self.redis.get_players_in_location(location_id)
            
            # Send to local connections only
            for player_id in players_in_location:
                if player_id != exclude_player_id and player_id in self.active_connections:
                    try:
                        await self.active_connections[player_id].send_json(payload)
                    except Exception:
                        pass  # Let WebSocket handler detect disconnects

Phase 4: Preload Static Data into Redis

4.1 Startup Data Loader

File: api/redis_loader.py

"""
Load static game data into Redis on server startup.
Reduces database queries for frequently accessed data.
"""
import json

from .redis_manager import redis_manager
from .world_loader import load_world
from .items import ItemsManager

async def preload_game_data():
    """Load all static game data into Redis"""
    print("🔄 Preloading game data into Redis...")
    
    # Load world data
    world = load_world()
    
    # Cache all locations
    for location_id, location in world.locations.items():
        location_data = {
            'id': location.id,
            'name': location.name,
            'description': location.description,
            'exits': location.exits,
            'danger_level': location.danger_level,
            'image_path': location.image_path,
            'x': getattr(location, 'x', 0.0),
            'y': getattr(location, 'y', 0.0),
            'tags': getattr(location, 'tags', []),
            'interactables': [interactable.to_dict() for interactable in location.interactables],
            'npcs': location.npcs
        }
        await redis_manager.cache_location(location_id, location_data)
    
    print(f"✅ Cached {len(world.locations)} locations in Redis")
    
    # Cache all items
    items_manager = ItemsManager()
    for item_id, item in items_manager.items.items():
        item_data = {
            'id': item.id,
            'name': item.name,
            'description': item.description,
            'type': item.type,
            'weight': item.weight,
            'volume': item.volume,
            'emoji': item.emoji,
            'image_path': item.image_path,
            'equippable': item.equippable,
            'consumable': item.consumable,
            'stats': item.stats or {},
            'effects': item.effects or {}
        }
        key = f"item:{item_id}:def"
        await redis_manager.redis_client.hset(key, mapping={
            k: json.dumps(v) if isinstance(v, (dict, list, bool)) else str(v) 
            for k, v in item_data.items()
        })
    
    print(f"✅ Cached {len(items_manager.items)} items in Redis")

Update lifespan() in main.py:

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    await db.init_db()
    print("✅ Database initialized")
    
    # Connect to Redis
    await redis_manager.connect()
    
    # Preload static data
    await preload_game_data()
    
    # Subscribe to all location channels before starting the listener:
    # pubsub.listen() only iterates while at least one subscription is active
    location_channels = [f"location:{loc_id}" for loc_id in LOCATIONS.keys()]
    await redis_manager.subscribe_to_channels(location_channels + ['game:broadcast'])
    
    # Start pub/sub listener in background (keep a reference so the task is not garbage-collected)
    listener_task = asyncio.create_task(redis_manager.listen_for_messages(manager.handle_redis_message))
    
    # Start background tasks
    tasks = await background_tasks.start_background_tasks(manager, LOCATIONS)
    
    yield
    
    # Shutdown
    await background_tasks.stop_background_tasks(tasks)
    listener_task.cancel()
    await redis_manager.disconnect()

Phase 5: Update Endpoints to Use Redis Cache

5.1 Example: /api/game/state Endpoint

Before (database queries):

@app.get("/api/game/state")
async def get_game_state(current_user: dict = Depends(get_current_user)):
    player = await db.get_player_by_id(current_user['id'])  # DB query
    location = LOCATIONS.get(player['location_id'])  # In-memory
    inventory = await db.get_inventory(current_user['id'])  # DB query
    equipment = await db.get_all_equipment(current_user['id'])  # DB query
    # ... more DB queries

After (Redis cache):

@app.get("/api/game/state")
async def get_game_state(current_user: dict = Depends(get_current_user)):
    # Get player session from Redis (cached)
    player_session = await redis_manager.get_player_session(current_user['id'])
    
    # If not in cache, fetch from DB and cache
    if not player_session:
        player = await db.get_player_by_id(current_user['id'])
        await redis_manager.set_player_session(current_user['id'], player)
        player_session = player
    
    # Get location from Redis cache
    location_data = await redis_manager.get_cached_location(player_session['location_id'])
    
    # Get inventory from Redis cache
    inventory = await redis_manager.get_cached_inventory(current_user['id'])
    if not inventory:
        inventory = await db.get_inventory(current_user['id'])
        await redis_manager.cache_inventory(current_user['id'], inventory)
    
    # ... rest of endpoint

5.2 Example: /api/game/move Endpoint

Update to invalidate caches and publish events:

@app.post("/api/game/move")
async def move(move_req: MoveRequest, current_user: dict = Depends(get_current_user)):
    # ... existing validation ...
    
    # Execute move
    success, message, new_location_id, stamina_cost, distance = await game_logic.move_player(
        current_user['id'],
        move_req.direction,
        LOCATIONS
    )
    
    if success:
        old_location = player['location_id']
        
        # Update Redis location registry
        await redis_manager.move_player_location(
            current_user['id'],
            old_location,
            new_location_id
        )
        
        # Update player session cache
        await redis_manager.set_player_session(current_user['id'], {
            'location_id': new_location_id,
            'stamina': player['stamina'] - stamina_cost,
            'last_movement_time': current_time
        })
        
        # Broadcast to OLD location via Redis pub/sub
        await redis_manager.publish_to_location(
            old_location,
            {
                "type": "location_update",
                "data": {
                    "message": f"{player['name']} left the area",
                    "action": "player_left",
                    "player_id": current_user['id']
                },
                "timestamp": datetime.utcnow().isoformat()
            },
            exclude_player_id=current_user['id']
        )
        
        # Broadcast to NEW location via Redis pub/sub
        await redis_manager.publish_to_location(
            new_location_id,
            {
                "type": "location_update",
                "data": {
                    "message": f"{player['name']} arrived",
                    "action": "player_arrived",
                    "player_id": current_user['id']
                },
                "timestamp": datetime.utcnow().isoformat()
            },
            exclude_player_id=current_user['id']
        )
        
        # Send direct update to moving player
        await redis_manager.publish_to_player(
            current_user['id'],
            {
                "type": "state_update",
                "data": {
                    "player": {
                        "stamina": player['stamina'] - stamina_cost,
                        "location_id": new_location_id
                    }
                },
                "timestamp": datetime.utcnow().isoformat()
            }
        )

Phase 6: Multi-Worker Setup

6.1 Update Docker Compose for Multiple Workers

File: docker-compose.yml

  echoes_of_the_ashes_api:
    build:
      context: .
      dockerfile: Dockerfile.api
    restart: unless-stopped
    env_file:
      - .env
    environment:
      - WORKERS=4  # Number of Uvicorn workers
    volumes:
      - ./gamedata:/app/gamedata:ro
      - ./images:/app/images:ro
    depends_on:
      - echoes_of_the_ashes_db
      - echoes_redis
    deploy:
      replicas: 1  # Single container, multiple workers inside
    networks:
      - default_docker
      - traefik
    labels:
      # ... Traefik labels

6.2 Update Dockerfile to Support Multi-Worker

File: Dockerfile.api

# ... existing build steps ...

# Run with multiple workers (exec replaces the shell so uvicorn receives stop signals directly)
CMD ["sh", "-c", "exec uvicorn api.main:app --host 0.0.0.0 --port 8000 --workers ${WORKERS:-4}"]

Phase 7: Background Tasks with Redis Coordination

7.1 Distributed Task Locking

File: api/background_tasks.py

Update to use Redis locks instead of file locks:

import asyncio
from .redis_manager import redis_manager

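async def acquire_distributed_lock(lock_name: str, ttl: int = 300) -> bool:
    """Acquire a distributed lock in Redis"""
    key = f"lock:{lock_name}"
    acquired = await redis_manager.redis_client.set(
        key,
        redis_manager.worker_id,
        nx=True,  # Only set if not exists
        ex=ttl    # TTL in seconds
    )
    # SET ... NX returns None when the key already exists
    return bool(acquired)

# Check ownership and delete in one Lua script. A separate GET followed by
# DELETE could remove a lock that expired and was re-acquired by another
# worker in between.
RELEASE_LOCK_SCRIPT = """
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
end
return 0
"""

async def release_distributed_lock(lock_name: str):
    """Release a distributed lock (only if this worker owns it)"""
    key = f"lock:{lock_name}"
    await redis_manager.redis_client.eval(RELEASE_LOCK_SCRIPT, 1, key, redis_manager.worker_id)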

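async def spawn_manager_task():
    """Background task: Spawn wandering enemies (one worker per interval)"""
    interval = 30
    while True:
        try:
            # The lock TTL is slightly shorter than the interval, and the lock is
            # left to expire after a successful run. Releasing it right away would
            # let every other worker acquire it on its own cycle, so spawns would
            # run once per worker instead of once per interval.
            if await acquire_distributed_lock('spawn_manager', ttl=interval - 5):
                print(f"🔒 {redis_manager.worker_id} acquired spawn_manager lock")
                try:
                    # Do work
                    await spawn_wandering_enemies()
                except Exception:
                    # Free the lock so another worker can retry this interval
                    await release_distributed_lock('spawn_manager')
                    raise
            else:
                print(f"⏭️  Another worker handling spawn_manager")
        except Exception as e:
            print(f"❌ Error in spawn_manager: {e}")
        
        await asyncio.sleep(interval)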

Performance Benefits

Expected Improvements

  1. Horizontal Scaling

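    • Support 4+ workers behind load balancer
    • Near-linear scaling with worker count, until Redis or PostgreSQL becomes the bottleneck
    • No single API worker is a point of failure (Redis becomes a shared dependency that needs its own monitoring)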
  2. Database Load Reduction

    • Location queries: 0 DB queries (100% Redis)
    • Player sessions: 90% cache hit rate (estimated)
    • Inventory queries: 80% cache hit rate (estimated)
    • Overall DB load: Reduced by 70-80%
  3. Latency Improvements

    • Location broadcasts: Redis pub/sub ~1-2ms vs DB query ~10-50ms
    • Player state lookups: Redis HGETALL ~0.5ms vs DB query ~5-20ms (estimated)
    • Inventory fetches: Redis LRANGE ~1ms vs DB query ~10-30ms (estimated)
  4. Concurrent Player Capacity

    • Current (single worker): ~100-200 concurrent players
    • With Redis (4 workers): ~800-1200 concurrent players

Migration Strategy

Step-by-Step Rollout

Week 1: Infrastructure

  • Add Redis container to docker-compose
  • Create redis_manager.py module
  • Test Redis connectivity

Week 2: Caching Layer

  • Implement session caching
  • Implement location caching
  • Implement inventory caching
  • A/B test: 10% traffic uses Redis cache

Week 3: Pub/Sub Integration

  • Update ConnectionManager for Redis pub/sub
  • Migrate location broadcasts to Redis
  • Migrate personal messages to Redis
  • Test with 2 workers

Week 4: Multi-Worker Rollout

  • Deploy 2 workers in production
  • Monitor for 3 days
  • If stable, scale to 4 workers
  • Monitor database load reduction

Week 5: Background Tasks

  • Migrate background tasks to Redis locks
  • Remove file-based locking
  • Test distributed task execution

Week 6: Optimization

  • Fine-tune TTLs
  • Add monitoring/metrics
  • Optimize cache invalidation
  • Performance profiling

Monitoring & Metrics

Redis Metrics to Track

  1. Connection Pool

    • Active connections per worker
    • Connection failures
    • Reconnection attempts
  2. Cache Performance

    • Hit rate (target: >80%)
    • Miss rate
    • Eviction rate
  3. Pub/Sub

    • Messages published/second
    • Subscriber count
    • Message delivery latency
  4. Memory Usage

    • Total Redis memory
    • Key count by pattern
    • Eviction policy effectiveness
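
For the cache hit rate, Redis exposes server-wide keyspace_hits and keyspace_misses counters in the stats section of INFO. The sketch below computes the ratio from such a dictionary; the helper name cache_hit_rate is hypothetical, and the sample numbers are made up for illustration. With redis-py, a dictionary of this shape would typically come from the client's info("stats") call.

```python
from typing import Mapping, Union

Number = Union[int, str]


def cache_hit_rate(stats: Mapping[str, Number]) -> float:
    """Hit rate in the range 0.0-1.0 from INFO stats counters."""
    hits = int(stats.get("keyspace_hits", 0))
    misses = int(stats.get("keyspace_misses", 0))
    total = hits + misses
    # Avoid division by zero on a freshly started server.
    return hits / total if total else 0.0


# Sample counters (illustrative values, not measurements).
rate = cache_hit_rate({"keyspace_hits": 850, "keyspace_misses": 150})
print(f"{rate:.0%}")
```

These counters cover every key lookup on the server, so they mix session, inventory, and static-data reads; per-cache hit rates would need application-level counters.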

Database Metrics to Track

  1. Query Reduction
    • Queries/second before vs after
    • Slow query log reduction
    • Connection pool utilization

Rollback Plan

If Redis integration causes issues:

  1. Immediate Rollback (< 5 minutes)

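    • Set the REDIS_ENABLED=false environment variable (a feature flag the code must check; it is not among the Phase 1 .env settings and would need to be added)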
    • Restart API containers
    • Falls back to original ConnectionManager
  2. Graceful Degradation

    • Keep DB queries as fallback for cache misses
    • Log Redis errors but don't fail requests
    • Monitor error rate and alert if > 5%
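
The flag and the graceful-degradation rule could be combined roughly as follows. This is a sketch under assumptions: REDIS_ENABLED is the flag named above, while redis_enabled, cached_or_db, and the stand-in callables are hypothetical names that do not exist in the codebase.

```python
import asyncio
import logging
import os
from typing import Awaitable, Callable, Mapping, Optional, TypeVar

T = TypeVar("T")
logger = logging.getLogger("redis_fallback")


def redis_enabled(env: Optional[Mapping[str, str]] = None) -> bool:
    """Read the REDIS_ENABLED flag; defaults to enabled when unset."""
    env = os.environ if env is None else env
    return env.get("REDIS_ENABLED", "true").strip().lower() in {"1", "true", "yes", "on"}


async def cached_or_db(
    read_cache: Callable[[], Awaitable[Optional[T]]],
    read_db: Callable[[], Awaitable[T]],
    enabled: bool,
) -> T:
    """Try the cache when enabled; log Redis errors and fall back to the DB."""
    if enabled:
        try:
            cached = await read_cache()
            if cached is not None:
                return cached
        except Exception as exc:
            # Log but do not fail the request.
            logger.warning("Redis read failed, falling back to DB: %s", exc)
    return await read_db()


# Stand-ins that simulate a Redis outage and a working database.
async def broken_cache():
    raise ConnectionError("redis unavailable")


async def read_player_from_db():
    return {"hp": 85}


result = asyncio.run(cached_or_db(broken_cache, read_player_from_db, enabled=True))
print(result)
```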

Cost Estimation

Redis Resource Requirements

  • Memory: 512MB (covers ~5000 concurrent players)
  • CPU: Minimal (< 5% of single core)
  • Network: ~1-5 Mbps for pub/sub

Infrastructure Costs (AWS example)

  • ElastiCache (Redis): ~$15-30/month (cache.t3.micro)
  • Additional EC2 capacity: ~$20-40/month (for extra workers)
  • Total increase: ~$35-70/month
  • Savings: ~$50-100/month (reduced RDS IOPS/queries)
  • Net change: roughly -$65 to +$20/month (the estimated savings may offset the added cost entirely)

Success Criteria

Key Performance Indicators (KPIs)

  1. Scalability

    • Support 4+ workers
    • Handle 1000+ concurrent players
    • Linear scaling with worker count
  2. Performance

    • 70% reduction in database queries
    • < 10ms p95 latency for cached operations
    • < 2ms p95 for Redis pub/sub delivery
  3. Reliability

    • 99.9% uptime
    • Graceful handling of Redis failures
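    • Minimal message loss during worker restarts (Pub/Sub does not replay messages to a worker that was offline, so clients should resync state on reconnect)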
  4. Developer Experience

    • Simple cache invalidation API
    • Clear pub/sub message patterns
    • Easy to add new cached data types

Questions for Review

  1. TTL Strategy: Are the proposed TTLs (30min for sessions, 10min for inventory) appropriate?

  2. Cache Invalidation: Should we implement more aggressive cache invalidation (e.g., on every DB write)?

  3. Worker Count: Start with 2 workers or go straight to 4?

  4. Redis Persistence: Use RDB, AOF, or both? (Affects recovery time vs write performance)

  5. Fallback Strategy: Should Redis cache misses always fall back to DB, or fail fast?

  6. Monitoring: What additional metrics do you want to track?


Next Steps

If approved, I will:

  1. Create detailed implementation tasks for each phase
  2. Set up feature flags for gradual rollout
  3. Add comprehensive logging for debugging
  4. Create automated tests for Redis functionality
  5. Document all new pub/sub message formats
  6. Create runbook for Redis operational issues

Please review and provide feedback on:

  • Architecture approach
  • Data structure choices
  • Migration strategy
  • Timeline/prioritization
  • Any concerns or alternative approaches

Appendix: Alternative Approaches Considered

A. Using Redis Streams Instead of Pub/Sub

  • Pros: Message persistence, consumer groups, replay capability
  • Cons: More complex, higher memory usage, not needed for ephemeral game events
  • Decision: Stick with Pub/Sub for simplicity

B. Using Kafka for Message Broker

  • Pros: Better for high-throughput, message persistence
  • Cons: Much heavier infrastructure, overkill for this use case
  • Decision: Redis Pub/Sub is sufficient

C. Caching in Application Memory Instead of Redis

  • Pros: Faster access (no network hop)
  • Cons: No cross-worker sharing, higher memory per worker
  • Decision: Redis for cross-worker coordination