# Redis Integration Plan: Scalable Multi-Worker Architecture

## Executive Summary

This document outlines a comprehensive plan to integrate Redis into Echoes of the Ashes for:

1. **Horizontal Scaling**: Support multiple FastAPI workers behind a load balancer
2. **Performance**: Reduce database queries by caching frequently accessed data
3. **Real-time Updates**: Improve WebSocket message delivery across workers using Pub/Sub
## Current Architecture Analysis

### Existing Limitations

1. **Single-Worker WebSocket Management**
   - `ConnectionManager` stores WebSocket connections in memory
   - Each worker has its own isolated connection dictionary
   - A player on Worker A cannot receive broadcasts triggered by Worker B
   - No cross-worker communication mechanism

2. **Database Query Bottlenecks**
   - Every location broadcast queries `get_players_in_location()` from PostgreSQL
   - Profile/state endpoints fetch fresh data on every request
   - Location data, item definitions, and NPC stats are loaded repeatedly
   - High database load with many concurrent players

3. **Background Tasks**
   - A single worker executes background tasks (status effects, spawns, etc.)
   - Uses file locking to prevent duplicate execution
   - Not suitable for true multi-worker horizontal scaling
## Proposed Redis Architecture

### 1. Redis Data Structures

#### A. Player Session Data (Redis Hash)

**Key Pattern**: `player:{character_id}:session`

**Fields**:
```json
{
  "worker_id": "worker-1",
  "websocket_connected": "true",
  "location_id": "overpass",
  "hp": "85",
  "max_hp": "100",
  "stamina": "42",
  "max_stamina": "50",
  "level": "12",
  "xp": "2450",
  "last_movement_time": "1762710676.592",
  "in_combat": "false",
  "last_heartbeat": "1762710676.592"
}
```

**TTL**: 30 minutes (refreshed on activity)

**Purpose**:
- Track which worker manages each player's WebSocket
- Cache player stats to avoid DB queries
- Detect disconnected/stale sessions

---
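Since all hash fields come back from `HGETALL` as strings, staleness detection reduces to parsing `last_heartbeat` and comparing it against a cutoff. A minimal sketch of that check (the helper name and the 90-second threshold are illustrative assumptions, not part of the plan's API):

```python
import time
from typing import Optional

STALE_AFTER_SECONDS = 90  # assumed threshold; tune alongside the heartbeat interval

def is_session_stale(session: Optional[dict], now: Optional[float] = None) -> bool:
    """Return True when a session hash is missing or its heartbeat is too old.

    `session` is the field mapping returned by HGETALL (all values are strings).
    """
    if not session:
        return True  # key expired or was never written
    now = now if now is not None else time.time()
    try:
        last = float(session["last_heartbeat"])
    except (KeyError, ValueError):
        return True  # a malformed hash counts as stale
    return (now - last) > STALE_AFTER_SECONDS

# A heartbeat 30s ago is fresh; 5 minutes ago is stale
fresh = is_session_stale({"last_heartbeat": str(time.time() - 30)})
stale = is_session_stale({"last_heartbeat": str(time.time() - 300)})
```

The key TTL (30 minutes) handles fully dead sessions; a check like this catches sessions whose worker stopped heartbeating but whose key has not yet expired.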
#### B. Location Player Registry (Redis Set)

**Key Pattern**: `location:{location_id}:players`

**Values**: Character IDs (integers)

**Example**:
```
location:overpass:players = {1, 2, 5, 12, 45}
```

**Purpose**:
- Instantly query who's in a location (no DB query)
- Used for targeted broadcasts
- Updated on player movement

---
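The registry is plain Redis `SADD`/`SREM`/`SMEMBERS` on one key per location. The semantics and key layout can be sketched with an in-memory stand-in (a dict of Python sets, not the real client; function names mirror the manager methods proposed later):

```python
from collections import defaultdict

store = defaultdict(set)  # stand-in for Redis sets, keyed exactly like Redis

def add_player_to_location(character_id, location_id):
    store[f"location:{location_id}:players"].add(character_id)   # SADD

def remove_player_from_location(character_id, location_id):
    store[f"location:{location_id}:players"].discard(character_id)  # SREM

def get_players_in_location(location_id):
    return sorted(store[f"location:{location_id}:players"])      # SMEMBERS

# A move is a remove from the old location's set plus an add to the new one
add_player_to_location(5, "overpass")
add_player_to_location(12, "overpass")
remove_player_from_location(5, "overpass")
add_player_to_location(5, "gas_station")
```

Against real Redis the two halves of a move should go through one transaction so a player is never in zero or two sets at once (see `move_player_location` in Phase 2).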
#### C. Worker Connection Registry (Redis Hash)

**Key Pattern**: `worker:{worker_id}:connections`

**Fields**: `character_id -> websocket_info`

**Example**:
```json
{
  "1": "connected",
  "5": "connected",
  "12": "connected"
}
```

**Purpose**:
- Each worker tracks its own connections
- Used to route messages to the correct worker
- Cleaned up on worker shutdown

---

#### D. Cached Location Data (Redis Hash)

**Key Pattern**: `location:{location_id}:cache`

**Fields**:
```json
{
  "name": "Overpass",
  "description": "A crumbling highway overpass...",
  "exits": "{\"north\": \"gas_station\", \"south\": \"ruins\"}",
  "danger_level": "3",
  "image_path": "/images/locations/overpass.png",
  "x": "2.5",
  "y": "3.0",
  "interactables": "[{...}]",
  "npcs": "[{...}]"
}
```

**TTL**: No expiration (static data, invalidated manually)

**Purpose**:
- Eliminate DB queries for location data
- Preloaded on server startup
- Served directly from Redis

---
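Because Redis hash values are flat strings, the nested fields (`exits`, `interactables`, `npcs`) have to round-trip through JSON. The encode/decode step can be sketched as a pair of pure functions (field names match the example above; everything else is illustrative):

```python
import json

JSON_FIELDS = {"exits", "interactables", "npcs"}  # fields stored as JSON strings

def encode_location(data):
    """Flatten a location dict into the string-only mapping HSET expects."""
    return {
        k: json.dumps(v) if isinstance(v, (dict, list)) else str(v)
        for k, v in data.items()
    }

def decode_location(raw):
    """Reverse the encoding for fields known to hold JSON."""
    data = dict(raw)
    for k in JSON_FIELDS & data.keys():
        try:
            data[k] = json.loads(data[k])
        except (json.JSONDecodeError, TypeError):
            pass  # leave malformed fields as raw strings
    return data

location = {"name": "Overpass", "danger_level": 3, "exits": {"north": "gas_station"}}
round_tripped = decode_location(encode_location(location))
```

Note that scalar types are not restored: `danger_level` comes back as the string `"3"`, which is why the plan stores all numeric fields as strings throughout.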
#### E. Cached Item Definitions (Redis Hash)

**Key Pattern**: `item:{item_id}:def`

**Fields**:
```json
{
  "name": "Iron Sword",
  "type": "weapon",
  "weight": "3.5",
  "damage_min": "5",
  "damage_max": "12",
  "durability": "100",
  "tier": "2"
}
```

**TTL**: No expiration (static data)

**Purpose**:
- Fast item lookups without parsing JSON files
- Preloaded on server startup

---

#### F. Player Inventory Cache (Redis List)

**Key Pattern**: `player:{character_id}:inventory`

**Values**: JSON-encoded inventory items

**TTL**: 10 minutes

**Purpose**:
- Cache inventory for quick profile/state responses
- Invalidated on item add/remove/use

---
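The inventory cache is a Redis list of JSON strings: an `RPUSH` of `json.dumps(item)` values on write, an `LRANGE` plus `json.loads` on read. A sketch of the two halves in isolation (function names are illustrative), showing that `RPUSH`/`LRANGE` preserve item order, which matters for inventory display:

```python
import json

def encode_inventory(inventory):
    """Produce the JSON strings RPUSHed onto the list, in display order."""
    return [json.dumps(item, sort_keys=True) for item in inventory]

def decode_inventory(raw_items):
    """Rebuild the item dicts from an LRANGE 0 -1 result."""
    return [json.loads(item) for item in raw_items]

items = [{"id": "iron_ore", "qty": 3}, {"id": "rope", "qty": 1}]
restored = decode_inventory(encode_inventory(items))
```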
### 2. Redis Pub/Sub Channels

#### A. Global Broadcast Channel

**Channel**: `game:broadcast`

**Message Format**:
```json
{
  "type": "global_broadcast",
  "payload": {
    "type": "server_announcement",
    "data": {
      "message": "Server maintenance in 5 minutes"
    }
  }
}
```

**Subscribers**: All workers

**Purpose**: Server-wide announcements

---

#### B. Location-Specific Channels

**Channel Pattern**: `location:{location_id}`

**Message Format**:
```json
{
  "type": "location_update",
  "location_id": "overpass",
  "exclude_player_id": 5,
  "payload": {
    "type": "location_update",
    "data": {
      "message": "Jocaru picked up 3x Iron Ore",
      "action": "item_picked_up"
    },
    "timestamp": "2025-11-09T17:52:00Z"
  }
}
```

**Subscribers**: All workers

**Purpose**:
- All workers subscribe to every location channel
- On receiving a message, each worker checks whether it has connected players in that location
- It then sends WebSocket messages to its local connections only

---
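The fan-out decision each worker makes on receipt is purely local: intersect the location's player set with the worker's own connections and drop the excluded sender. That filter can be sketched as a pure function (the envelope shape matches the format above; the function name is illustrative):

```python
def local_recipients(envelope, players_in_location, local_connections):
    """Return the player IDs this worker should deliver the payload to.

    envelope            -- the decoded pub/sub message (see format above)
    players_in_location -- IDs from SMEMBERS location:{id}:players
    local_connections   -- IDs with a WebSocket on *this* worker
    """
    exclude = envelope.get("exclude_player_id")
    return sorted(
        pid for pid in players_in_location
        if pid != exclude and pid in local_connections
    )

envelope = {"type": "location_update", "location_id": "overpass", "exclude_player_id": 5}
# Players 1, 5, and 12 are in the location; this worker holds sockets for 5 and 12
recipients = local_recipients(envelope, [1, 5, 12], {5, 12})  # → [12]
```

Keeping this logic pure makes it trivially unit-testable, separate from the WebSocket send loop.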
#### C. Player-Specific Channels

**Channel Pattern**: `player:{character_id}`

**Message Format**:
```json
{
  "type": "personal_message",
  "player_id": 5,
  "payload": {
    "type": "combat_update",
    "data": {
      "message": "You dealt 12 damage!",
      "combat_over": false
    }
  }
}
```

**Subscribers**: The worker managing that player's WebSocket

**Purpose**:
- Direct messages to specific players
- Combat updates, inventory changes, etc.

---

#### D. Worker Coordination Channel

**Channel**: `game:workers`

**Message Format**:
```json
{
  "type": "worker_join",
  "worker_id": "worker-3",
  "timestamp": "2025-11-09T17:52:00Z"
}
```

**Purpose**:
- Worker lifecycle events
- Graceful shutdown coordination
- Load distribution awareness

---
## Implementation Plan

### Phase 1: Redis Infrastructure Setup

#### 1.1 Add Redis to Docker Compose

**File**: `docker-compose.yml`

```yaml
echoes_redis:
  image: redis:7-alpine
  container_name: echoes_of_the_ashes_redis
  restart: unless-stopped
  command: redis-server --appendonly yes --maxmemory 512mb --maxmemory-policy allkeys-lru
  volumes:
    - redis_data:/data
  networks:
    - default_docker
  healthcheck:
    test: ["CMD", "redis-cli", "ping"]
    interval: 5s
    timeout: 3s
    retries: 5

volumes:
  redis_data:
```

**Environment Variables** (`.env`):
```bash
REDIS_HOST=echoes_redis
REDIS_PORT=6379
REDIS_DB=0
REDIS_PASSWORD= # Optional, leave empty for dev
```

#### 1.2 Install Python Dependencies

**File**: `requirements.txt`

```txt
redis==5.0.1
hiredis==2.3.2  # C parser for performance
```

---
### Phase 2: Redis Client Module

#### 2.1 Create Redis Manager

**File**: `api/redis_manager.py`

```python
"""
Redis manager for caching and pub/sub functionality.
Handles connection pooling, pub/sub, and caching operations.
"""
import os
import json
import time
import redis.asyncio as redis
from typing import Optional, Dict, Any, List, Set


class RedisManager:
    def __init__(self):
        self.redis_client: Optional[redis.Redis] = None
        self.pubsub: Optional[redis.client.PubSub] = None
        self.worker_id: str = f"worker-{os.getpid()}"
        self.subscribed_channels: Set[str] = set()

    async def connect(self):
        """Initialize Redis connection pool"""
        host = os.getenv("REDIS_HOST", "localhost")
        port = int(os.getenv("REDIS_PORT", "6379"))
        db = int(os.getenv("REDIS_DB", "0"))
        password = os.getenv("REDIS_PASSWORD") or None  # empty string means no auth

        self.redis_client = redis.Redis(
            host=host,
            port=port,
            db=db,
            password=password,
            decode_responses=True,
            socket_keepalive=True,
            socket_connect_timeout=5,
            retry_on_timeout=True
        )

        # Test connection
        await self.redis_client.ping()
        print(f"✅ Redis connected: {host}:{port}")

        # Initialize pub/sub
        self.pubsub = self.redis_client.pubsub()

    async def disconnect(self):
        """Close Redis connections"""
        if self.pubsub:
            await self.pubsub.close()
        if self.redis_client:
            await self.redis_client.close()

    # ====== SESSION MANAGEMENT ======

    async def set_player_session(self, character_id: int, data: Dict[str, Any], ttl: int = 1800):
        """Store player session data with TTL"""
        key = f"player:{character_id}:session"
        data['worker_id'] = self.worker_id
        # Wall-clock epoch time: comparable across workers, unlike event-loop time
        data['last_heartbeat'] = str(time.time())

        await self.redis_client.hset(key, mapping=data)
        await self.redis_client.expire(key, ttl)

    async def get_player_session(self, character_id: int) -> Optional[Dict[str, Any]]:
        """Retrieve player session data"""
        key = f"player:{character_id}:session"
        data = await self.redis_client.hgetall(key)
        return data if data else None

    async def delete_player_session(self, character_id: int):
        """Remove player session"""
        key = f"player:{character_id}:session"
        await self.redis_client.delete(key)

    # ====== LOCATION REGISTRY ======

    async def add_player_to_location(self, character_id: int, location_id: str):
        """Add player to location set"""
        key = f"location:{location_id}:players"
        await self.redis_client.sadd(key, character_id)

    async def remove_player_from_location(self, character_id: int, location_id: str):
        """Remove player from location set"""
        key = f"location:{location_id}:players"
        await self.redis_client.srem(key, character_id)

    async def get_players_in_location(self, location_id: str) -> List[int]:
        """Get all player IDs in a location"""
        key = f"location:{location_id}:players"
        members = await self.redis_client.smembers(key)
        return [int(m) for m in members] if members else []

    async def move_player_location(self, character_id: int, from_location: str, to_location: str):
        """Atomically move player between locations (single MULTI/EXEC transaction)"""
        async with self.redis_client.pipeline(transaction=True) as pipe:
            pipe.srem(f"location:{from_location}:players", character_id)
            pipe.sadd(f"location:{to_location}:players", character_id)
            await pipe.execute()

    # ====== CACHING ======

    async def cache_location(self, location_id: str, data: Dict[str, Any]):
        """Cache location data (no expiration - static data)"""
        key = f"location:{location_id}:cache"
        # Serialize complex objects to JSON strings
        cache_data = {}
        for k, v in data.items():
            if isinstance(v, (dict, list)):
                cache_data[k] = json.dumps(v)
            else:
                cache_data[k] = str(v)
        await self.redis_client.hset(key, mapping=cache_data)

    async def get_cached_location(self, location_id: str) -> Optional[Dict[str, Any]]:
        """Retrieve cached location data"""
        key = f"location:{location_id}:cache"
        data = await self.redis_client.hgetall(key)
        if not data:
            return None

        # Deserialize JSON fields
        for k in ['exits', 'interactables', 'npcs']:
            if k in data:
                try:
                    data[k] = json.loads(data[k])
                except (json.JSONDecodeError, TypeError):
                    pass  # leave malformed fields as raw strings
        return data

    async def cache_inventory(self, character_id: int, inventory: List[Dict], ttl: int = 600):
        """Cache player inventory"""
        key = f"player:{character_id}:inventory"
        await self.redis_client.delete(key)  # Clear old data
        if inventory:
            inventory_json = [json.dumps(item) for item in inventory]
            await self.redis_client.rpush(key, *inventory_json)
            await self.redis_client.expire(key, ttl)

    async def get_cached_inventory(self, character_id: int) -> Optional[List[Dict]]:
        """Retrieve cached inventory"""
        key = f"player:{character_id}:inventory"
        items = await self.redis_client.lrange(key, 0, -1)
        if not items:
            return None
        return [json.loads(item) for item in items]

    async def invalidate_inventory(self, character_id: int):
        """Invalidate inventory cache"""
        key = f"player:{character_id}:inventory"
        await self.redis_client.delete(key)

    # ====== PUB/SUB ======

    async def publish_to_location(self, location_id: str, message: Dict[str, Any], exclude_player_id: Optional[int] = None):
        """Publish message to location channel"""
        channel = f"location:{location_id}"
        payload = {
            "type": "location_update",
            "location_id": location_id,
            "exclude_player_id": exclude_player_id,
            "payload": message
        }
        await self.redis_client.publish(channel, json.dumps(payload))

    async def publish_to_player(self, character_id: int, message: Dict[str, Any]):
        """Publish message to player-specific channel"""
        channel = f"player:{character_id}"
        payload = {
            "type": "personal_message",
            "player_id": character_id,
            "payload": message
        }
        await self.redis_client.publish(channel, json.dumps(payload))

    async def subscribe_to_channels(self, channels: List[str]):
        """Subscribe to multiple channels"""
        await self.pubsub.subscribe(*channels)
        self.subscribed_channels.update(channels)
        print(f"📡 Worker {self.worker_id} subscribed to {len(channels)} channels")

    async def subscribe_to_player(self, character_id: int):
        """Subscribe to player-specific channel"""
        channel = f"player:{character_id}"
        await self.pubsub.subscribe(channel)
        self.subscribed_channels.add(channel)

    async def unsubscribe_from_player(self, character_id: int):
        """Unsubscribe from player channel"""
        channel = f"player:{character_id}"
        if channel in self.subscribed_channels:
            await self.pubsub.unsubscribe(channel)
            self.subscribed_channels.remove(channel)

    async def listen_for_messages(self, message_handler):
        """Listen for pub/sub messages (blocking)"""
        async for message in self.pubsub.listen():
            if message['type'] == 'message':
                try:
                    data = json.loads(message['data'])
                    await message_handler(message['channel'], data)
                except Exception as e:
                    print(f"❌ Error handling pub/sub message: {e}")


# Global Redis instance
redis_manager = RedisManager()
```

---
### Phase 3: Enhanced Connection Manager

#### 3.1 Update ConnectionManager to Use Redis

**File**: `api/main.py` (ConnectionManager class)

**Changes**:

1. **On WebSocket Connect**:
   - Store connection locally (as before)
   - Register player session in Redis
   - Add player to location registry
   - Subscribe to player-specific channel
   - Broadcast "player_joined" to location

2. **On WebSocket Disconnect**:
   - Remove local connection
   - Delete player session from Redis
   - Remove from location registry
   - Unsubscribe from player channel
   - Broadcast "player_left" to location

3. **Broadcasting**:
   - `send_personal_message()`: Publish to Redis player channel (not direct WebSocket)
   - `send_to_location()`: Publish to Redis location channel (no DB query)
   - A local message handler receives Redis pub/sub messages and sends to local WebSockets

**Implementation**:

```python
class ConnectionManager:
    def __init__(self, redis_manager):
        self.active_connections: Dict[int, WebSocket] = {}
        self.player_usernames: Dict[int, str] = {}
        self.redis = redis_manager
        self.worker_id = redis_manager.worker_id

    async def connect(self, websocket: WebSocket, player_id: int, username: str, location_id: str):
        """Accept WebSocket and register in Redis"""
        await websocket.accept()
        self.active_connections[player_id] = websocket
        self.player_usernames[player_id] = username

        # Register in Redis
        await self.redis.set_player_session(player_id, {
            'websocket_connected': 'true',
            'location_id': location_id,
            'username': username
        })
        await self.redis.add_player_to_location(player_id, location_id)
        await self.redis.subscribe_to_player(player_id)

        print(f"🔌 WebSocket connected: {username} (player_id={player_id}) on {self.worker_id}")

    def disconnect(self, player_id: int, location_id: str):
        """Remove WebSocket and clean up Redis"""
        if player_id in self.active_connections:
            username = self.player_usernames.get(player_id, "unknown")
            del self.active_connections[player_id]
            if player_id in self.player_usernames:
                del self.player_usernames[player_id]

            # Clean up Redis (fire-and-forget; this method is sync)
            asyncio.create_task(self.redis.delete_player_session(player_id))
            asyncio.create_task(self.redis.remove_player_from_location(player_id, location_id))
            asyncio.create_task(self.redis.unsubscribe_from_player(player_id))

            print(f"🔌 WebSocket disconnected: {username} (player_id={player_id})")

    async def send_personal_message(self, player_id: int, message: dict):
        """Publish to Redis player channel (cross-worker)"""
        await self.redis.publish_to_player(player_id, message)

    async def send_to_location(self, location_id: str, message: dict, exclude_player_id: Optional[int] = None):
        """Publish to Redis location channel (cross-worker)"""
        await self.redis.publish_to_location(location_id, message, exclude_player_id)

    async def handle_redis_message(self, channel: str, data: dict):
        """Handle incoming Redis pub/sub messages"""
        if channel.startswith('player:'):
            # Personal message - send to local WebSocket if connected
            player_id = int(channel.split(':')[1])
            if player_id in self.active_connections:
                try:
                    await self.active_connections[player_id].send_json(data['payload'])
                except Exception as e:
                    print(f"❌ Failed to send to player {player_id}: {e}")
                    # Don't disconnect here - let the WebSocket handler detect it

        elif channel.startswith('location:'):
            # Location broadcast - send to local WebSockets in that location
            location_id = data['location_id']
            exclude_player_id = data.get('exclude_player_id')
            payload = data['payload']

            # Get players in location from Redis (fast!)
            players_in_location = await self.redis.get_players_in_location(location_id)

            # Send to local connections only
            for player_id in players_in_location:
                if player_id != exclude_player_id and player_id in self.active_connections:
                    try:
                        await self.active_connections[player_id].send_json(payload)
                    except Exception:
                        pass  # Let the WebSocket handler detect disconnects
```

---
### Phase 4: Preload Static Data into Redis

#### 4.1 Startup Data Loader

**File**: `api/redis_loader.py`

```python
"""
Load static game data into Redis on server startup.
Reduces database queries for frequently accessed data.
"""
import json

from .redis_manager import redis_manager
from .world_loader import load_world
from .items import ItemsManager


async def preload_game_data():
    """Load all static game data into Redis"""
    print("🔄 Preloading game data into Redis...")

    # Load world data
    world = load_world()

    # Cache all locations
    for location_id, location in world.locations.items():
        location_data = {
            'id': location.id,
            'name': location.name,
            'description': location.description,
            'exits': location.exits,
            'danger_level': location.danger_level,
            'image_path': location.image_path,
            'x': getattr(location, 'x', 0.0),
            'y': getattr(location, 'y', 0.0),
            'tags': getattr(location, 'tags', []),
            'interactables': [interactable.to_dict() for interactable in location.interactables],
            'npcs': location.npcs
        }
        await redis_manager.cache_location(location_id, location_data)

    print(f"✅ Cached {len(world.locations)} locations in Redis")

    # Cache all items
    items_manager = ItemsManager()
    for item_id, item in items_manager.items.items():
        item_data = {
            'id': item.id,
            'name': item.name,
            'description': item.description,
            'type': item.type,
            'weight': item.weight,
            'volume': item.volume,
            'emoji': item.emoji,
            'image_path': item.image_path,
            'equippable': item.equippable,
            'consumable': item.consumable,
            'stats': item.stats or {},
            'effects': item.effects or {}
        }
        key = f"item:{item_id}:def"
        await redis_manager.redis_client.hset(key, mapping={
            k: json.dumps(v) if isinstance(v, (dict, list, bool)) else str(v)
            for k, v in item_data.items()
        })

    print(f"✅ Cached {len(items_manager.items)} items in Redis")
```

**Update `lifespan()` in `main.py`**:

```python
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    await db.init_db()
    print("✅ Database initialized")

    # Connect to Redis
    await redis_manager.connect()

    # Preload static data
    await preload_game_data()

    # Subscribe to all location channels BEFORE starting the listener
    # (listening on a pubsub with no subscriptions raises an error)
    location_channels = [f"location:{loc_id}" for loc_id in LOCATIONS.keys()]
    await redis_manager.subscribe_to_channels(location_channels + ['game:broadcast'])

    # Start pub/sub listener in background
    listener = asyncio.create_task(redis_manager.listen_for_messages(manager.handle_redis_message))

    # Start background tasks
    tasks = await background_tasks.start_background_tasks(manager, LOCATIONS)

    yield

    # Shutdown
    await background_tasks.stop_background_tasks(tasks)
    listener.cancel()
    await redis_manager.disconnect()
```

---
### Phase 5: Update Endpoints to Use Redis Cache

#### 5.1 Example: `/api/game/state` Endpoint

**Before** (database queries):
```python
@app.get("/api/game/state")
async def get_game_state(current_user: dict = Depends(get_current_user)):
    player = await db.get_player_by_id(current_user['id'])      # DB query
    location = LOCATIONS.get(player['location_id'])             # In-memory
    inventory = await db.get_inventory(current_user['id'])      # DB query
    equipment = await db.get_all_equipment(current_user['id'])  # DB query
    # ... more DB queries
```

**After** (Redis cache):
```python
@app.get("/api/game/state")
async def get_game_state(current_user: dict = Depends(get_current_user)):
    # Get player session from Redis (cached)
    player_session = await redis_manager.get_player_session(current_user['id'])

    # If not in cache, fetch from DB and cache
    if not player_session:
        player = await db.get_player_by_id(current_user['id'])
        await redis_manager.set_player_session(current_user['id'], player)
        player_session = player

    # Get location from Redis cache
    location_data = await redis_manager.get_cached_location(player_session['location_id'])

    # Get inventory from Redis cache
    inventory = await redis_manager.get_cached_inventory(current_user['id'])
    if not inventory:
        inventory = await db.get_inventory(current_user['id'])
        await redis_manager.cache_inventory(current_user['id'], inventory)

    # ... rest of endpoint
```
#### 5.2 Example: `/api/game/move` Endpoint

**Update to invalidate caches and publish events**:

```python
@app.post("/api/game/move")
async def move(move_req: MoveRequest, current_user: dict = Depends(get_current_user)):
    # ... existing validation (loads `player`, sets `current_time`) ...

    # Execute move
    success, message, new_location_id, stamina_cost, distance = await game_logic.move_player(
        current_user['id'],
        move_req.direction,
        LOCATIONS
    )

    if success:
        old_location = player['location_id']

        # Update Redis location registry
        await redis_manager.move_player_location(
            current_user['id'],
            old_location,
            new_location_id
        )

        # Update player session cache
        await redis_manager.set_player_session(current_user['id'], {
            'location_id': new_location_id,
            'stamina': player['stamina'] - stamina_cost,
            'last_movement_time': current_time
        })

        # Broadcast to OLD location via Redis pub/sub
        await redis_manager.publish_to_location(
            old_location,
            {
                "type": "location_update",
                "data": {
                    "message": f"{player['name']} left the area",
                    "action": "player_left",
                    "player_id": current_user['id']
                },
                "timestamp": datetime.utcnow().isoformat()
            },
            exclude_player_id=current_user['id']
        )

        # Broadcast to NEW location via Redis pub/sub
        await redis_manager.publish_to_location(
            new_location_id,
            {
                "type": "location_update",
                "data": {
                    "message": f"{player['name']} arrived",
                    "action": "player_arrived",
                    "player_id": current_user['id']
                },
                "timestamp": datetime.utcnow().isoformat()
            },
            exclude_player_id=current_user['id']
        )

        # Send direct update to the moving player
        await redis_manager.publish_to_player(
            current_user['id'],
            {
                "type": "state_update",
                "data": {
                    "player": {
                        "stamina": player['stamina'] - stamina_cost,
                        "location_id": new_location_id
                    }
                },
                "timestamp": datetime.utcnow().isoformat()
            }
        )
```

---
### Phase 6: Multi-Worker Setup

#### 6.1 Update Docker Compose for Multiple Workers

**File**: `docker-compose.yml`

```yaml
echoes_of_the_ashes_api:
  build:
    context: .
    dockerfile: Dockerfile.api
  restart: unless-stopped
  env_file:
    - .env
  environment:
    - WORKERS=4  # Number of Uvicorn workers
  volumes:
    - ./gamedata:/app/gamedata:ro
    - ./images:/app/images:ro
  depends_on:
    - echoes_of_the_ashes_db
    - echoes_redis
  deploy:
    replicas: 1  # Single container, multiple workers inside
  networks:
    - default_docker
    - traefik
  labels:
    # ... Traefik labels
```

#### 6.2 Update Dockerfile to Support Multi-Worker

**File**: `Dockerfile.api`

```dockerfile
# ... existing build steps ...

# Run with multiple workers
CMD ["sh", "-c", "uvicorn api.main:app --host 0.0.0.0 --port 8000 --workers ${WORKERS:-4}"]
```

---
### Phase 7: Background Tasks with Redis Coordination

#### 7.1 Distributed Task Locking

**File**: `api/background_tasks.py`

**Update to use Redis locks instead of file locks**:

```python
import asyncio

from .redis_manager import redis_manager


async def acquire_distributed_lock(lock_name: str, ttl: int = 300) -> bool:
    """Acquire a distributed lock in Redis"""
    key = f"lock:{lock_name}"
    acquired = await redis_manager.redis_client.set(
        key,
        redis_manager.worker_id,
        nx=True,  # Only set if not exists
        ex=ttl    # TTL in seconds
    )
    return bool(acquired)  # SET NX returns True on success, None otherwise


async def release_distributed_lock(lock_name: str):
    """Release a distributed lock"""
    key = f"lock:{lock_name}"
    # Only delete if we own the lock
    lock_value = await redis_manager.redis_client.get(key)
    if lock_value == redis_manager.worker_id:
        await redis_manager.redis_client.delete(key)


async def spawn_manager_task():
    """Background task: Spawn wandering enemies (single worker)"""
    while True:
        try:
            # Try to acquire lock
            if await acquire_distributed_lock('spawn_manager', ttl=60):
                print(f"🔒 {redis_manager.worker_id} acquired spawn_manager lock")

                # Do work
                await spawn_wandering_enemies()

                # Release lock
                await release_distributed_lock('spawn_manager')
            else:
                print(f"⏭️ Another worker handling spawn_manager")
        except Exception as e:
            print(f"❌ Error in spawn_manager: {e}")
            await release_distributed_lock('spawn_manager')

        await asyncio.sleep(30)
```
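One caveat worth flagging: a GET-then-DEL release is not atomic. The lock can expire and be re-acquired by another worker between the two commands, in which case we would delete someone else's lock. The standard Redis fix is a short Lua script that checks and deletes in one server-side step. The script below is the canonical one; the pure-Python function next to it only mirrors its semantics so they can be illustrated without a live server:

```python
# Canonical check-and-delete script, run atomically server-side via EVAL
RELEASE_LOCK_LUA = """
if redis.call('get', KEYS[1]) == ARGV[1] then
    return redis.call('del', KEYS[1])
else
    return 0
end
"""

def release_semantics(store, key, owner):
    """Pure-Python mirror of the script: delete only if we still own the lock."""
    if store.get(key) == owner:
        del store[key]
        return 1
    return 0

# An owned lock is released; a lock taken over by another worker is left alone
locks = {"lock:spawn_manager": "worker-1"}
released = release_semantics(locks, "lock:spawn_manager", "worker-1")  # → 1

locks2 = {"lock:spawn_manager": "worker-2"}  # expired and re-acquired elsewhere
untouched = release_semantics(locks2, "lock:spawn_manager", "worker-1")  # → 0
```

With redis-py this would presumably be wired up via `redis_client.register_script(RELEASE_LOCK_LUA)` and invoked with `keys=[key], args=[worker_id]`; for the 30-second spawn loop here the simpler GET/DEL race window is arguably tolerable, but the Lua variant is cheap insurance.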

---

## Performance Benefits

### Expected Improvements

1. **Horizontal Scaling**
   - Support 4+ workers behind a load balancer
   - Near-linear capacity scaling with worker count
   - No single application-worker point of failure (Redis itself remains one; see the rollback plan)

2. **Database Load Reduction** (estimates)
   - **Location queries**: 0 DB queries (100% served from Redis)
   - **Player sessions**: ~90% cache hit rate (estimated)
   - **Inventory queries**: ~80% cache hit rate (estimated)
   - **Overall DB load**: reduced by an estimated 70-80%

3. **Latency Improvements**
   - Location broadcasts: Redis pub/sub ~1-2ms vs. DB query ~10-50ms
   - Player state lookups: Redis GET ~0.5ms vs. DB query ~5-20ms
   - Inventory fetches: Redis LIST ~1ms vs. DB query ~10-30ms

4. **Concurrent Player Capacity** (estimates)
   - Current (single worker): ~100-200 concurrent players
   - With Redis (4 workers): ~800-1,200 concurrent players

---
## Migration Strategy

### Step-by-Step Rollout

**Week 1: Infrastructure**
- Add Redis container to docker-compose
- Create redis_manager.py module
- Test Redis connectivity

**Week 2: Caching Layer**
- Implement session caching
- Implement location caching
- Implement inventory caching
- A/B test: 10% of traffic uses the Redis cache

**Week 3: Pub/Sub Integration**
- Update ConnectionManager for Redis pub/sub
- Migrate location broadcasts to Redis
- Migrate personal messages to Redis
- Test with 2 workers

**Week 4: Multi-Worker Rollout**
- Deploy 2 workers in production
- Monitor for 3 days
- If stable, scale to 4 workers
- Monitor database load reduction

**Week 5: Background Tasks**
- Migrate background tasks to Redis locks
- Remove file-based locking
- Test distributed task execution

**Week 6: Optimization**
- Fine-tune TTLs
- Add monitoring/metrics
- Optimize cache invalidation
- Performance profiling

---
## Monitoring & Metrics

### Redis Metrics to Track

1. **Connection Pool**
   - Active connections per worker
   - Connection failures
   - Reconnection attempts

2. **Cache Performance**
   - Hit rate (target: >80%)
   - Miss rate
   - Eviction rate

3. **Pub/Sub**
   - Messages published/second
   - Subscriber count
   - Message delivery latency

4. **Memory Usage**
   - Total Redis memory
   - Key count by pattern
   - Eviction policy effectiveness
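Hit and miss rates can be tracked application-side with a pair of counters around every cache lookup; a minimal sketch (class and method names are illustrative, and a real deployment would likely export these via Prometheus or similar):

```python
class CacheMetrics:
    """Counts cache lookups so hit rate can be exported to monitoring."""

    def __init__(self):
        self.hits = 0
        self.misses = 0

    def record(self, found):
        """Call once per cache lookup with whether the key was present."""
        if found:
            self.hits += 1
        else:
            self.misses += 1

    @property
    def hit_rate(self):
        total = self.hits + self.misses
        return self.hits / total if total else 0.0

metrics = CacheMetrics()
for found in (True, True, True, False):  # three hits, one miss
    metrics.record(found)
```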

### Database Metrics to Track

1. **Query Reduction**
   - Queries/second before vs. after
   - Slow query log reduction
   - Connection pool utilization

---
## Rollback Plan

If Redis integration causes issues:

1. **Immediate Rollback** (< 5 minutes)
   - Set `REDIS_ENABLED=false` env variable
   - Restart API containers
   - Falls back to the original ConnectionManager

2. **Graceful Degradation**
   - Keep DB queries as the fallback for cache misses
   - Log Redis errors but don't fail requests
   - Monitor the error rate and alert if > 5%
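The "log but don't fail" behavior is easy to centralize in one helper that wraps every cache read: on a miss *or* a Redis error, fall through to the authoritative loader. A sketch with stub callables standing in for Redis and the DB (all names are illustrative):

```python
import asyncio
import logging

logger = logging.getLogger("cache")

async def read_through(cache_get, db_get):
    """Try the cache; on miss or Redis error, fall back to the database."""
    try:
        value = await cache_get()
        if value is not None:
            return value
    except Exception as exc:  # Redis down, timeout, bad payload...
        logger.warning("cache read failed, falling back to DB: %s", exc)
    return await db_get()

# Stubs: a broken cache, an empty cache, and a healthy DB loader
async def broken_cache():
    raise ConnectionError("redis unavailable")

async def empty_cache():
    return None

async def db_loader():
    return {"hp": 85}

from_error = asyncio.run(read_through(broken_cache, db_loader))  # → {"hp": 85}
from_miss = asyncio.run(read_through(empty_cache, db_loader))    # → {"hp": 85}
```

Routing all endpoint cache reads through one such helper also gives a single place to hang the error-rate counter the alert threshold above depends on.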

---

## Cost Estimation

### Redis Resource Requirements

- **Memory**: 512MB (covers ~5,000 concurrent players)
- **CPU**: Minimal (< 5% of a single core)
- **Network**: ~1-5 Mbps for pub/sub

### Infrastructure Costs (AWS example)

- **ElastiCache (Redis)**: ~$15-30/month (cache.t3.micro)
- **Additional EC2 capacity**: ~$20-40/month (for extra workers)
- **Total increase**: ~$35-70/month
- **Savings**: ~$50-100/month (reduced RDS IOPS/queries)
- **Net cost**: $0-20/month (may actually save money)

---
## Success Criteria

### Key Performance Indicators (KPIs)

1. **Scalability**
   - ✅ Support 4+ workers
   - ✅ Handle 1000+ concurrent players
   - ✅ Near-linear scaling with worker count

2. **Performance**
   - ✅ 70% reduction in database queries
   - ✅ < 10ms p95 latency for cached operations
   - ✅ < 2ms p95 for Redis pub/sub delivery

3. **Reliability**
   - ✅ 99.9% uptime
   - ✅ Graceful handling of Redis failures
   - ✅ No message loss during worker restarts

4. **Developer Experience**
   - ✅ Simple cache invalidation API
   - ✅ Clear pub/sub message patterns
   - ✅ Easy to add new cached data types

---
## Questions for Review

1. **TTL Strategy**: Are the proposed TTLs (30 min for sessions, 10 min for inventory) appropriate?

2. **Cache Invalidation**: Should we implement more aggressive cache invalidation (e.g., on every DB write)?

3. **Worker Count**: Start with 2 workers or go straight to 4?

4. **Redis Persistence**: Use RDB, AOF, or both? (Affects recovery time vs. write performance)

5. **Fallback Strategy**: Should Redis cache misses always fall back to the DB, or fail fast?

6. **Monitoring**: What additional metrics do you want to track?

---
## Next Steps

**If approved, I will:**

1. Create detailed implementation tasks for each phase
2. Set up feature flags for gradual rollout
3. Add comprehensive logging for debugging
4. Create automated tests for Redis functionality
5. Document all new pub/sub message formats
6. Create a runbook for Redis operational issues

**Please review and provide feedback on:**
- Architecture approach
- Data structure choices
- Migration strategy
- Timeline/prioritization
- Any concerns or alternative approaches

---
## Appendix: Alternative Approaches Considered

### A. Using Redis Streams Instead of Pub/Sub

**Pros**: Message persistence, consumer groups, replay capability
**Cons**: More complex, higher memory usage, not needed for ephemeral game events
**Decision**: Stick with Pub/Sub for simplicity

### B. Using Kafka as the Message Broker

**Pros**: Better for high throughput, message persistence
**Cons**: Much heavier infrastructure, overkill for this use case
**Decision**: Redis Pub/Sub is sufficient

### C. Caching in Application Memory Instead of Redis

**Pros**: Faster access (no network hop)
**Cons**: No cross-worker sharing, higher memory per worker
**Decision**: Redis for cross-worker coordination
|