# Redis Integration Plan: Scalable Multi-Worker Architecture ## Executive Summary This document outlines a comprehensive plan to integrate Redis into Echoes of the Ashes for: 1. **Horizontal Scaling**: Support multiple FastAPI workers behind a load balancer 2. **Performance**: Reduce database queries by caching frequently accessed data 3. **Real-time Updates**: Improve WebSocket message delivery across workers using Pub/Sub ## Current Architecture Analysis ### Existing Limitations 1. **Single-Worker WebSocket Management** - `ConnectionManager` stores WebSocket connections in memory - Each worker has its own isolated connection dictionary - Player on Worker A cannot receive broadcasts triggered by Worker B - No cross-worker communication mechanism 2. **Database Query Bottlenecks** - Every location broadcast queries `get_players_in_location()` from PostgreSQL - Profile/state endpoints fetch fresh data on every request - Location data, item definitions, NPC stats loaded repeatedly - High database load with many concurrent players 3. **Background Tasks** - Single worker executes background tasks (status effects, spawns, etc.) - Uses file locking to prevent duplicate execution - Not suitable for true multi-worker horizontal scaling ## Proposed Redis Architecture ### 1. Redis Data Structures #### A. Player Session Data (Redis Hash) **Key Pattern**: `player:{character_id}:session` **Fields**: ```json { "worker_id": "worker-1", "websocket_connected": "true", "location_id": "overpass", "hp": "85", "max_hp": "100", "stamina": "42", "max_stamina": "50", "level": "12", "xp": "2450", "last_movement_time": "1762710676.592", "in_combat": "false", "last_heartbeat": "1762710676.592" } ``` **TTL**: 30 minutes (refreshed on activity) **Purpose**: - Track which worker manages each player's WebSocket - Cache player stats to avoid DB queries - Detect disconnected/stale sessions --- #### B. Location Player Registry (Redis Set) **Key Pattern**: `location:{location_id}:players` **Values**: Character IDs (integers) **Example**: ``` location:overpass:players = {1, 2, 5, 12, 45} ``` **Purpose**: - Instantly query who's in a location (no DB query) - Used for targeted broadcasts - Updated on player movement --- #### C. Worker Connection Registry (Redis Hash) **Key Pattern**: `worker:{worker_id}:connections` **Fields**: `character_id -> websocket_info` **Example**: ```json { "1": "connected", "5": "connected", "12": "connected" } ``` **Purpose**: - Each worker tracks its own connections - Used to route messages to correct worker - Cleaned up on worker shutdown --- #### D. Cached Location Data (Redis Hash) **Key Pattern**: `location:{location_id}:cache` **Fields**: ```json { "name": "Overpass", "description": "A crumbling highway overpass...", "exits": "{\"north\": \"gas_station\", \"south\": \"ruins\"}", "danger_level": "3", "image_path": "/images/locations/overpass.png", "x": "2.5", "y": "3.0", "interactables": "[{...}]", "npcs": "[{...}]" } ``` **TTL**: No expiration (static data, invalidated manually) **Purpose**: - Eliminate DB queries for location data - Preloaded on server startup - Served directly from Redis --- #### E. Cached Item Definitions (Redis Hash) **Key Pattern**: `item:{item_id}:def` **Fields**: ```json { "name": "Iron Sword", "type": "weapon", "weight": "3.5", "damage_min": "5", "damage_max": "12", "durability": "100", "tier": "2" } ``` **TTL**: No expiration (static data) **Purpose**: - Fast item lookups without parsing JSON files - Preloaded on server startup --- #### F. Player Inventory Cache (Redis List) **Key Pattern**: `player:{character_id}:inventory` **Values**: JSON-encoded inventory items **TTL**: 10 minutes **Purpose**: - Cache inventory for quick profile/state responses - Invalidated on item add/remove/use --- ### 2. Redis Pub/Sub Channels #### A. Global Broadcast Channel **Channel**: `game:broadcast` **Message Format**: ```json { "type": "global_broadcast", "payload": { "type": "server_announcement", "data": { "message": "Server maintenance in 5 minutes" } } } ``` **Subscribers**: All workers **Purpose**: Server-wide announcements --- #### B. Location-Specific Channels **Channel Pattern**: `location:{location_id}` **Message Format**: ```json { "type": "location_update", "location_id": "overpass", "exclude_player_id": 5, "payload": { "type": "location_update", "data": { "message": "Jocaru picked up 3x Iron Ore", "action": "item_picked_up" }, "timestamp": "2025-11-09T17:52:00Z" } } ``` **Subscribers**: All workers **Purpose**: - Workers subscribe to all location channels - When receiving a message, check if they have connected players in that location - Send WebSocket messages to their local connections only --- #### C. Player-Specific Channels **Channel Pattern**: `player:{character_id}` **Message Format**: ```json { "type": "personal_message", "player_id": 5, "payload": { "type": "combat_update", "data": { "message": "You dealt 12 damage!", "combat_over": false } } } ``` **Subscribers**: Worker managing that player's WebSocket **Purpose**: - Direct messages to specific players - Combat updates, inventory changes, etc. --- #### D. Worker Coordination Channel **Channel**: `game:workers` **Message Format**: ```json { "type": "worker_join", "worker_id": "worker-3", "timestamp": "2025-11-09T17:52:00Z" } ``` **Purpose**: - Worker lifecycle events - Graceful shutdown coordination - Load distribution awareness --- ## Implementation Plan ### Phase 1: Redis Infrastructure Setup #### 1.1 Add Redis to Docker Compose **File**: `docker-compose.yml` ```yaml echoes_redis: image: redis:7-alpine container_name: echoes_of_the_ashes_redis restart: unless-stopped command: redis-server --appendonly yes --maxmemory 512mb --maxmemory-policy allkeys-lru volumes: - redis_data:/data networks: - default_docker healthcheck: test: ["CMD", "redis-cli", "ping"] interval: 5s timeout: 3s retries: 5 volumes: redis_data: ``` **Environment Variables** (`.env`): ```bash REDIS_HOST=echoes_redis REDIS_PORT=6379 REDIS_DB=0 REDIS_PASSWORD= # Optional, leave empty for dev ``` #### 1.2 Install Python Dependencies **File**: `requirements.txt` ```txt redis==5.0.1 hiredis==2.3.2 # C parser for performance ``` --- ### Phase 2: Redis Client Module #### 2.1 Create Redis Manager **File**: `api/redis_manager.py` ```python """ Redis manager for caching and pub/sub functionality. Handles connection pooling, pub/sub, and caching operations. """ import os import json import asyncio import redis.asyncio as redis from typing import Optional, Dict, Any, List, Set from datetime import timedelta class RedisManager: def __init__(self): self.redis_client: Optional[redis.Redis] = None self.pubsub: Optional[redis.client.PubSub] = None self.worker_id: str = f"worker-{os.getpid()}" self.subscribed_channels: Set[str] = set() async def connect(self): """Initialize Redis connection pool""" host = os.getenv("REDIS_HOST", "localhost") port = int(os.getenv("REDIS_PORT", "6379")) db = int(os.getenv("REDIS_DB", "0")) password = os.getenv("REDIS_PASSWORD", None) self.redis_client = redis.Redis( host=host, port=port, db=db, password=password, decode_responses=True, socket_keepalive=True, socket_connect_timeout=5, retry_on_timeout=True ) # Test connection await self.redis_client.ping() print(f"✅ Redis connected: {host}:{port}") # Initialize pub/sub self.pubsub = self.redis_client.pubsub() async def disconnect(self): """Close Redis connections""" if self.pubsub: await self.pubsub.close() if self.redis_client: await self.redis_client.close() # ====== SESSION MANAGEMENT ====== async def set_player_session(self, character_id: int, data: Dict[str, Any], ttl: int = 1800): """Store player session data with TTL""" key = f"player:{character_id}:session" data['worker_id'] = self.worker_id data['last_heartbeat'] = str(asyncio.get_event_loop().time()) await self.redis_client.hset(key, mapping=data) await self.redis_client.expire(key, ttl) async def get_player_session(self, character_id: int) -> Optional[Dict[str, Any]]: """Retrieve player session data""" key = f"player:{character_id}:session" data = await self.redis_client.hgetall(key) return data if data else None async def delete_player_session(self, character_id: int): """Remove player session""" key = f"player:{character_id}:session" await self.redis_client.delete(key) # ====== LOCATION REGISTRY ====== async def add_player_to_location(self, character_id: int, location_id: str): """Add player to location set""" key = f"location:{location_id}:players" await self.redis_client.sadd(key, character_id) async def remove_player_from_location(self, character_id: int, location_id: str): """Remove player from location set""" key = f"location:{location_id}:players" await self.redis_client.srem(key, character_id) async def get_players_in_location(self, location_id: str) -> List[int]: """Get all player IDs in a location""" key = f"location:{location_id}:players" members = await self.redis_client.smembers(key) return [int(m) for m in members] if members else [] async def move_player_location(self, character_id: int, from_location: str, to_location: str): """Atomically move player between locations""" await self.remove_player_from_location(character_id, from_location) await self.add_player_to_location(character_id, to_location) # ====== CACHING ====== async def cache_location(self, location_id: str, data: Dict[str, Any]): """Cache location data (no expiration - static data)""" key = f"location:{location_id}:cache" # Serialize complex objects to JSON strings cache_data = {} for k, v in data.items(): if isinstance(v, (dict, list)): cache_data[k] = json.dumps(v) else: cache_data[k] = str(v) await self.redis_client.hset(key, mapping=cache_data) async def get_cached_location(self, location_id: str) -> Optional[Dict[str, Any]]: """Retrieve cached location data""" key = f"location:{location_id}:cache" data = await self.redis_client.hgetall(key) if not data: return None # Deserialize JSON fields for k in ['exits', 'interactables', 'npcs']: if k in data: try: data[k] = json.loads(data[k]) except: pass return data async def cache_inventory(self, character_id: int, inventory: List[Dict], ttl: int = 600): """Cache player inventory""" key = f"player:{character_id}:inventory" await self.redis_client.delete(key) # Clear old data if inventory: inventory_json = [json.dumps(item) for item in inventory] await self.redis_client.rpush(key, *inventory_json) await self.redis_client.expire(key, ttl) async def get_cached_inventory(self, character_id: int) -> Optional[List[Dict]]: """Retrieve cached inventory""" key = f"player:{character_id}:inventory" items = await self.redis_client.lrange(key, 0, -1) if not items: return None return [json.loads(item) for item in items] async def invalidate_inventory(self, character_id: int): """Invalidate inventory cache""" key = f"player:{character_id}:inventory" await self.redis_client.delete(key) # ====== PUB/SUB ====== async def publish_to_location(self, location_id: str, message: Dict[str, Any], exclude_player_id: Optional[int] = None): """Publish message to location channel""" channel = f"location:{location_id}" payload = { "type": "location_update", "location_id": location_id, "exclude_player_id": exclude_player_id, "payload": message } await self.redis_client.publish(channel, json.dumps(payload)) async def publish_to_player(self, character_id: int, message: Dict[str, Any]): """Publish message to player-specific channel""" channel = f"player:{character_id}" payload = { "type": "personal_message", "player_id": character_id, "payload": message } await self.redis_client.publish(channel, json.dumps(payload)) async def subscribe_to_channels(self, channels: List[str]): """Subscribe to multiple channels""" await self.pubsub.subscribe(*channels) self.subscribed_channels.update(channels) print(f"📡 Worker {self.worker_id} subscribed to {len(channels)} channels") async def subscribe_to_player(self, character_id: int): """Subscribe to player-specific channel""" channel = f"player:{character_id}" await self.pubsub.subscribe(channel) self.subscribed_channels.add(channel) async def unsubscribe_from_player(self, character_id: int): """Unsubscribe from player channel""" channel = f"player:{character_id}" if channel in self.subscribed_channels: await self.pubsub.unsubscribe(channel) self.subscribed_channels.remove(channel) async def listen_for_messages(self, message_handler): """Listen for pub/sub messages (blocking)""" async for message in self.pubsub.listen(): if message['type'] == 'message': try: data = json.loads(message['data']) await message_handler(message['channel'], data) except Exception as e: print(f"❌ Error handling pub/sub message: {e}") # Global Redis instance redis_manager = RedisManager() ``` --- ### Phase 3: Enhanced Connection Manager #### 3.1 Update ConnectionManager to Use Redis **File**: `api/main.py` (ConnectionManager class) **Changes**: 1. **On WebSocket Connect**: - Store connection locally (as before) - Register player session in Redis - Add player to location registry - Subscribe to player-specific channel - Broadcast "player_joined" to location 2. **On WebSocket Disconnect**: - Remove local connection - Delete player session from Redis - Remove from location registry - Unsubscribe from player channel - Broadcast "player_left" to location 3. **Broadcasting**: - `send_personal_message()`: Publish to Redis player channel (not direct WebSocket) - `send_to_location()`: Publish to Redis location channel (not query DB) - Local message handler receives Redis pub/sub and sends to local WebSockets **Implementation**: ```python class ConnectionManager: def __init__(self, redis_manager): self.active_connections: Dict[int, WebSocket] = {} self.player_usernames: Dict[int, str] = {} self.redis = redis_manager self.worker_id = redis_manager.worker_id async def connect(self, websocket: WebSocket, player_id: int, username: str, location_id: str): """Accept WebSocket and register in Redis""" await websocket.accept() self.active_connections[player_id] = websocket self.player_usernames[player_id] = username # Register in Redis await self.redis.set_player_session(player_id, { 'websocket_connected': 'true', 'location_id': location_id, 'username': username }) await self.redis.add_player_to_location(player_id, location_id) await self.redis.subscribe_to_player(player_id) print(f"🔌 WebSocket connected: {username} (player_id={player_id}) on {self.worker_id}") def disconnect(self, player_id: int, location_id: str): """Remove WebSocket and clean up Redis""" if player_id in self.active_connections: username = self.player_usernames.get(player_id, "unknown") del self.active_connections[player_id] if player_id in self.player_usernames: del self.player_usernames[player_id] # Clean up Redis (fire-and-forget) asyncio.create_task(self.redis.delete_player_session(player_id)) asyncio.create_task(self.redis.remove_player_from_location(player_id, location_id)) asyncio.create_task(self.redis.unsubscribe_from_player(player_id)) print(f"🔌 WebSocket disconnected: {username} (player_id={player_id})") async def send_personal_message(self, player_id: int, message: dict): """Publish to Redis player channel (cross-worker)""" await self.redis.publish_to_player(player_id, message) async def send_to_location(self, location_id: str, message: dict, exclude_player_id: Optional[int] = None): """Publish to Redis location channel (cross-worker)""" await self.redis.publish_to_location(location_id, message, exclude_player_id) async def handle_redis_message(self, channel: str, data: dict): """Handle incoming Redis pub/sub messages""" if channel.startswith('player:'): # Personal message - send to local WebSocket if connected player_id = int(channel.split(':')[1]) if player_id in self.active_connections: try: await self.active_connections[player_id].send_json(data['payload']) except Exception as e: print(f"❌ Failed to send to player {player_id}: {e}") # Don't disconnect here - let WebSocket handler detect it elif channel.startswith('location:'): # Location broadcast - send to local WebSockets in that location location_id = data['location_id'] exclude_player_id = data.get('exclude_player_id') payload = data['payload'] # Get players in location from Redis (fast!) players_in_location = await self.redis.get_players_in_location(location_id) # Send to local connections only for player_id in players_in_location: if player_id != exclude_player_id and player_id in self.active_connections: try: await self.active_connections[player_id].send_json(payload) except: pass # Let WebSocket handler detect disconnects ``` --- ### Phase 4: Preload Static Data into Redis #### 4.1 Startup Data Loader **File**: `api/redis_loader.py` ```python """ Load static game data into Redis on server startup. Reduces database queries for frequently accessed data. """ from .redis_manager import redis_manager from .world_loader import load_world from .items import ItemsManager async def preload_game_data(): """Load all static game data into Redis""" print("🔄 Preloading game data into Redis...") # Load world data world = load_world() # Cache all locations for location_id, location in world.locations.items(): location_data = { 'id': location.id, 'name': location.name, 'description': location.description, 'exits': location.exits, 'danger_level': location.danger_level, 'image_path': location.image_path, 'x': getattr(location, 'x', 0.0), 'y': getattr(location, 'y', 0.0), 'tags': getattr(location, 'tags', []), 'interactables': [interactable.to_dict() for interactable in location.interactables], 'npcs': location.npcs } await redis_manager.cache_location(location_id, location_data) print(f"✅ Cached {len(world.locations)} locations in Redis") # Cache all items items_manager = ItemsManager() for item_id, item in items_manager.items.items(): item_data = { 'id': item.id, 'name': item.name, 'description': item.description, 'type': item.type, 'weight': item.weight, 'volume': item.volume, 'emoji': item.emoji, 'image_path': item.image_path, 'equippable': item.equippable, 'consumable': item.consumable, 'stats': item.stats or {}, 'effects': item.effects or {} } key = f"item:{item_id}:def" await redis_manager.redis_client.hset(key, mapping={ k: json.dumps(v) if isinstance(v, (dict, list, bool)) else str(v) for k, v in item_data.items() }) print(f"✅ Cached {len(items_manager.items)} items in Redis") ``` **Update `lifespan()` in `main.py`**: ```python @asynccontextmanager async def lifespan(app: FastAPI): # Startup await db.init_db() print("✅ Database initialized") # Connect to Redis await redis_manager.connect() # Preload static data await preload_game_data() # Start pub/sub listener in background asyncio.create_task(redis_manager.listen_for_messages(manager.handle_redis_message)) # Subscribe to all location channels location_channels = [f"location:{loc_id}" for loc_id in LOCATIONS.keys()] await redis_manager.subscribe_to_channels(location_channels + ['game:broadcast']) # Start background tasks tasks = await background_tasks.start_background_tasks(manager, LOCATIONS) yield # Shutdown await background_tasks.stop_background_tasks(tasks) await redis_manager.disconnect() ``` --- ### Phase 5: Update Endpoints to Use Redis Cache #### 5.1 Example: `/api/game/state` Endpoint **Before** (database queries): ```python @app.get("/api/game/state") async def get_game_state(current_user: dict = Depends(get_current_user)): player = await db.get_player_by_id(current_user['id']) # DB query location = LOCATIONS.get(player['location_id']) # In-memory inventory = await db.get_inventory(current_user['id']) # DB query equipment = await db.get_all_equipment(current_user['id']) # DB query # ... more DB queries ``` **After** (Redis cache): ```python @app.get("/api/game/state") async def get_game_state(current_user: dict = Depends(get_current_user)): # Get player session from Redis (cached) player_session = await redis_manager.get_player_session(current_user['id']) # If not in cache, fetch from DB and cache if not player_session: player = await db.get_player_by_id(current_user['id']) await redis_manager.set_player_session(current_user['id'], player) player_session = player # Get location from Redis cache location_data = await redis_manager.get_cached_location(player_session['location_id']) # Get inventory from Redis cache inventory = await redis_manager.get_cached_inventory(current_user['id']) if not inventory: inventory = await db.get_inventory(current_user['id']) await redis_manager.cache_inventory(current_user['id'], inventory) # ... rest of endpoint ``` #### 5.2 Example: `/api/game/move` Endpoint **Update to invalidate caches and publish events**: ```python @app.post("/api/game/move") async def move(move_req: MoveRequest, current_user: dict = Depends(get_current_user)): # ... existing validation ... # Execute move success, message, new_location_id, stamina_cost, distance = await game_logic.move_player( current_user['id'], move_req.direction, LOCATIONS ) if success: old_location = player['location_id'] # Update Redis location registry await redis_manager.move_player_location( current_user['id'], old_location, new_location_id ) # Update player session cache await redis_manager.set_player_session(current_user['id'], { 'location_id': new_location_id, 'stamina': player['stamina'] - stamina_cost, 'last_movement_time': current_time }) # Broadcast to OLD location via Redis pub/sub await redis_manager.publish_to_location( old_location, { "type": "location_update", "data": { "message": f"{player['name']} left the area", "action": "player_left", "player_id": current_user['id'] }, "timestamp": datetime.utcnow().isoformat() }, exclude_player_id=current_user['id'] ) # Broadcast to NEW location via Redis pub/sub await redis_manager.publish_to_location( new_location_id, { "type": "location_update", "data": { "message": f"{player['name']} arrived", "action": "player_arrived", "player_id": current_user['id'] }, "timestamp": datetime.utcnow().isoformat() }, exclude_player_id=current_user['id'] ) # Send direct update to moving player await redis_manager.publish_to_player( current_user['id'], { "type": "state_update", "data": { "player": { "stamina": player['stamina'] - stamina_cost, "location_id": new_location_id } }, "timestamp": datetime.utcnow().isoformat() } ) ``` --- ### Phase 6: Multi-Worker Setup #### 6.1 Update Docker Compose for Multiple Workers **File**: `docker-compose.yml` ```yaml echoes_of_the_ashes_api: build: context: . dockerfile: Dockerfile.api restart: unless-stopped env_file: - .env environment: - WORKERS=4 # Number of Uvicorn workers volumes: - ./gamedata:/app/gamedata:ro - ./images:/app/images:ro depends_on: - echoes_of_the_ashes_db - echoes_redis deploy: replicas: 1 # Single container, multiple workers inside networks: - default_docker - traefik labels: # ... Traefik labels ``` #### 6.2 Update Dockerfile to Support Multi-Worker **File**: `Dockerfile.api` ```dockerfile # ... existing build steps ... # Run with multiple workers CMD ["sh", "-c", "uvicorn api.main:app --host 0.0.0.0 --port 8000 --workers ${WORKERS:-4}"] ``` --- ### Phase 7: Background Tasks with Redis Coordination #### 7.1 Distributed Task Locking **File**: `api/background_tasks.py` **Update to use Redis locks instead of file locks**: ```python import asyncio from .redis_manager import redis_manager async def acquire_distributed_lock(lock_name: str, ttl: int = 300) -> bool: """Acquire a distributed lock in Redis""" key = f"lock:{lock_name}" acquired = await redis_manager.redis_client.set( key, redis_manager.worker_id, nx=True, # Only set if not exists ex=ttl # TTL in seconds ) return acquired async def release_distributed_lock(lock_name: str): """Release a distributed lock""" key = f"lock:{lock_name}" # Only delete if we own the lock lock_value = await redis_manager.redis_client.get(key) if lock_value == redis_manager.worker_id: await redis_manager.redis_client.delete(key) async def spawn_manager_task(): """Background task: Spawn wandering enemies (single worker)""" while True: try: # Try to acquire lock if await acquire_distributed_lock('spawn_manager', ttl=60): print(f"🔒 {redis_manager.worker_id} acquired spawn_manager lock") # Do work await spawn_wandering_enemies() # Release lock await release_distributed_lock('spawn_manager') else: print(f"⏭️ Another worker handling spawn_manager") except Exception as e: print(f"❌ Error in spawn_manager: {e}") await release_distributed_lock('spawn_manager') await asyncio.sleep(30) ``` --- ## Performance Benefits ### Expected Improvements 1. **Horizontal Scaling** - Support 4+ workers behind load balancer - Linear scaling with player count - No single point of failure 2. **Database Load Reduction** - **Location queries**: 0 DB queries (100% Redis) - **Player sessions**: 90% cache hit rate (estimated) - **Inventory queries**: 80% cache hit rate (estimated) - **Overall DB load**: Reduced by 70-80% 3. **Latency Improvements** - Location broadcasts: Redis pub/sub ~1-2ms vs DB query ~10-50ms - Player state lookups: Redis GET ~0.5ms vs DB query ~5-20ms - Inventory fetches: Redis LIST ~1ms vs DB query ~10-30ms 4. **Concurrent Player Capacity** - Current (single worker): ~100-200 concurrent players - With Redis (4 workers): ~800-1200 concurrent players --- ## Migration Strategy ### Step-by-Step Rollout **Week 1: Infrastructure** - Add Redis container to docker-compose - Create redis_manager.py module - Test Redis connectivity **Week 2: Caching Layer** - Implement session caching - Implement location caching - Implement inventory caching - A/B test: 10% traffic uses Redis cache **Week 3: Pub/Sub Integration** - Update ConnectionManager for Redis pub/sub - Migrate location broadcasts to Redis - Migrate personal messages to Redis - Test with 2 workers **Week 4: Multi-Worker Rollout** - Deploy 2 workers in production - Monitor for 3 days - If stable, scale to 4 workers - Monitor database load reduction **Week 5: Background Tasks** - Migrate background tasks to Redis locks - Remove file-based locking - Test distributed task execution **Week 6: Optimization** - Fine-tune TTLs - Add monitoring/metrics - Optimize cache invalidation - Performance profiling --- ## Monitoring & Metrics ### Redis Metrics to Track 1. **Connection Pool** - Active connections per worker - Connection failures - Reconnection attempts 2. **Cache Performance** - Hit rate (target: >80%) - Miss rate - Eviction rate 3. **Pub/Sub** - Messages published/second - Subscriber count - Message delivery latency 4. **Memory Usage** - Total Redis memory - Key count by pattern - Eviction policy effectiveness ### Database Metrics to Track 1. **Query Reduction** - Queries/second before vs after - Slow query log reduction - Connection pool utilization --- ## Rollback Plan If Redis integration causes issues: 1. **Immediate Rollback** (< 5 minutes) - Set `REDIS_ENABLED=false` env variable - Restart API containers - Falls back to original ConnectionManager 2. **Graceful Degradation** - Keep DB queries as fallback for cache misses - Log Redis errors but don't fail requests - Monitor error rate and alert if > 5% --- ## Cost Estimation ### Redis Resource Requirements - **Memory**: 512MB (covers ~5000 concurrent players) - **CPU**: Minimal (< 5% of single core) - **Network**: ~1-5 Mbps for pub/sub ### Infrastructure Costs (AWS example) - **ElastiCache (Redis)**: ~$15-30/month (cache.t3.micro) - **Additional EC2 capacity**: ~$20-40/month (for extra workers) - **Total increase**: ~$35-70/month - **Savings**: ~$50-100/month (reduced RDS IOPS/queries) - **Net cost**: $0-20/month (may actually save money!) --- ## Success Criteria ### Key Performance Indicators (KPIs) 1. **Scalability** - ✅ Support 4+ workers - ✅ Handle 1000+ concurrent players - ✅ Linear scaling with worker count 2. **Performance** - ✅ 70% reduction in database queries - ✅ < 10ms p95 latency for cached operations - ✅ < 2ms p95 for Redis pub/sub delivery 3. **Reliability** - ✅ 99.9% uptime - ✅ Graceful handling of Redis failures - ✅ No message loss during worker restarts 4. **Developer Experience** - ✅ Simple cache invalidation API - ✅ Clear pub/sub message patterns - ✅ Easy to add new cached data types --- ## Questions for Review 1. **TTL Strategy**: Are the proposed TTLs (30min for sessions, 10min for inventory) appropriate? 2. **Cache Invalidation**: Should we implement more aggressive cache invalidation (e.g., on every DB write)? 3. **Worker Count**: Start with 2 workers or go straight to 4? 4. **Redis Persistence**: Use RDB, AOF, or both? (Affects recovery time vs write performance) 5. **Fallback Strategy**: Should Redis cache misses always fall back to DB, or fail fast? 6. **Monitoring**: What additional metrics do you want to track? --- ## Next Steps **If approved, I will:** 1. Create detailed implementation tasks for each phase 2. Set up feature flags for gradual rollout 3. Add comprehensive logging for debugging 4. Create automated tests for Redis functionality 5. Document all new pub/sub message formats 6. Create runbook for Redis operational issues **Please review and provide feedback on:** - Architecture approach - Data structure choices - Migration strategy - Timeline/prioritization - Any concerns or alternative approaches --- ## Appendix: Alternative Approaches Considered ### A. Using Redis Streams Instead of Pub/Sub **Pros**: Message persistence, consumer groups, replay capability **Cons**: More complex, higher memory usage, not needed for ephemeral game events **Decision**: Stick with Pub/Sub for simplicity ### B. Using Kafka for Message Broker **Pros**: Better for high-throughput, message persistence **Cons**: Much heavier infrastructure, overkill for this use case **Decision**: Redis Pub/Sub is sufficient ### C. Caching in Application Memory Instead of Redis **Pros**: Faster access (no network hop) **Cons**: No cross-worker sharing, higher memory per worker **Decision**: Redis for cross-worker coordination