Redis Integration: Questions & Answers
Q1: Why cache locations/items if they're already in memory?
Short Answer: You're absolutely right - we should NOT cache static data that's already loaded in memory!
Revised Approach:
What to Cache in Redis:
- ✅ Player sessions (dynamic, needs cross-worker sharing)
- ✅ Location player registry (who's where, changes constantly)
- ✅ Player inventory (reduce DB queries for frequently accessed data)
- ✅ Active combat states (for cross-worker coordination)
- ✅ Dropped items per location (dynamic world state)
What NOT to Cache:
- ❌ Locations - Already in the LOCATIONS dict from world_loader.py
- ❌ Items - Already in ITEMS_MANAGER.items from items.py
- ❌ NPCs - Already in the NPCS dict from npcs.py
- ❌ Interactables - Already in each Location.interactables list
Why This Matters:
- Each worker runs load_world() on startup → all static data is in memory
- No point duplicating it in Redis (wastes memory, adds latency)
- Redis should only store dynamic, cross-worker state
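To make the cache-worthy dynamic state concrete, here is a minimal sketch of how a player session might be laid out as a Redis hash. The helper names (`session_key`, `build_session_mapping`) and the field set are illustrative assumptions, not the actual `redis_manager` API:

```python
def session_key(character_id: int) -> str:
    """Key for a player's session hash, e.g. player:5:session."""
    return f"player:{character_id}:session"

def build_session_mapping(username: str, location_id: str, hp: int,
                          level: int, websocket_connected: bool) -> dict:
    """Redis hash fields are flat strings, so every value is stringified."""
    return {
        "username": username,
        "location_id": location_id,
        "hp": str(hp),
        "level": str(level),
        "websocket_connected": "true" if websocket_connected else "false",
    }

# Usage with an async Redis client (assumed):
#   await client.hset(session_key(5), mapping=build_session_mapping(
#       "Jocaru", "overpass", 78, 5, True))
#   await client.expire(session_key(5), 3600)  # sessions can safely expire
```

The stringify step matters because Redis hashes cannot store nested values; anything structured (like full inventories) would need JSON instead.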
Q2: How do unique items work?
Database Structure:
```python
# unique_items table (single source of truth)
# Note: SQLAlchemy's Table requires a MetaData object as its second argument
unique_items = Table(
    "unique_items", metadata,
    Column("id", Integer, primary_key=True),
    Column("item_id", String),     # Template reference (e.g., "iron_sword")
    Column("durability", Integer),
    Column("max_durability", Integer),
    Column("tier", Integer, default=1),
    Column("unique_stats", JSON),  # Custom stats
    Column("created_at", Float),
)

# inventory table (references unique_items)
inventory = Table(
    "inventory", metadata,
    Column("id", Integer, primary_key=True),
    Column("character_id", Integer),
    Column("item_id", String),     # Template ID
    Column("quantity", Integer),   # Always 1 for unique items
    Column("unique_item_id", Integer, ForeignKey("unique_items.id")),  # Link
    Column("is_equipped", Boolean),
)
```
Flow:
- Creation: NPC drops weapon → create_unique_item() → insert into unique_items
- Pickup: Player picks up → insert into inventory with a unique_item_id reference
- Equip: Player equips → query joins inventory ⋈ unique_items to get stats
- Drop: Player drops → move to dropped_items (keeping the unique_item_id link)
- Deletion: Item despawns → CASCADE delete removes it from inventory/dropped_items
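The equip step can be sketched as raw SQL (illustrative only; the table and column names follow the schema defined above):

```python
# Illustrative query for the equip step: join inventory to unique_items
# to pull durability/tier/unique_stats for an equipped item.
EQUIP_STATS_QUERY = """
SELECT inv.id, inv.item_id,
       u.durability, u.max_durability, u.tier, u.unique_stats
FROM inventory AS inv
JOIN unique_items AS u ON u.id = inv.unique_item_id
WHERE inv.character_id = :character_id
  AND inv.is_equipped = TRUE
"""
```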
Redis Caching Strategy:
```python
# Cache unique item data when equipped/viewed
key = f"unique_item:{unique_item_id}"
value = {
    "item_id": "iron_sword",
    "durability": 85,
    "max_durability": 100,
    "tier": 2,
    "unique_stats": {"damage_bonus": 5}
}
# TTL: 5 minutes (invalidate on durability change)
```
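A hedged sketch of that caching strategy. The key layout matches the note above; `serialize_unique_item` and the client calls in the comments are assumptions:

```python
import json

UNIQUE_ITEM_TTL = 300  # 5 minutes, per the note above

def unique_item_key(unique_item_id: int) -> str:
    """Key for one unique item's cached row, e.g. unique_item:42."""
    return f"unique_item:{unique_item_id}"

def serialize_unique_item(row: dict) -> str:
    # JSON keeps the nested unique_stats dict intact (a flat Redis hash would not)
    return json.dumps(row)

# Usage with an async Redis client (assumed):
#   await client.set(unique_item_key(42), serialize_unique_item(row),
#                    ex=UNIQUE_ITEM_TTL)
#   await client.delete(unique_item_key(42))  # invalidate on durability change
```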
Q3: How do enemies work with custom stats?
Combat Initialization:
When combat starts, NPC gets randomized HP:
```python
# NPCDefinition in npcs.py
@dataclass
class NPCDefinition:
    hp_min: int      # e.g., 80
    hp_max: int      # e.g., 120
    damage_min: int
    damage_max: int
    defense: int
    # ... other stats
```

```python
# When combat starts (in game_logic.py or main.py)
import random

npc_def = NPCS.get("raider")  # Load from memory
npc_hp = random.randint(npc_def.hp_min, npc_def.hp_max)  # Random HP

# Store in database
await db.create_combat(
    player_id=player_id,
    npc_id="raider",
    npc_hp=npc_hp,       # Randomized
    npc_max_hp=npc_hp,
    location_id=location_id,
)
```
Redis Caching for Active Combat:
```python
# Cache active combat state (avoid repeated DB queries)
key = f"player:{character_id}:combat"
value = {
    "npc_id": "raider",
    "npc_hp": 95,
    "npc_max_hp": 115,
    "turn": "player",
    "npc_damage_min": 8,
    "npc_damage_max": 15,
    "npc_defense": 3
}
# TTL: no expiration (deleted when combat ends)
```
Combat Flow:
- Player attacks → Check Redis cache for combat state
- If miss → Query DB → Cache in Redis
- Calculate damage, update NPC HP
- Update Redis cache + publish combat_update to the player channel
- NPC turn → repeat
- Combat ends → delete Redis cache + publish combat_over
Q4: How is everything loaded on server startup?
Current Flow (per worker):
```python
# api/main.py - Lifespan startup
@asynccontextmanager
async def lifespan(app: FastAPI):
    # 1. Database
    await db.init_db()  # Connect to PostgreSQL

    # 2. Load static data into memory (THIS PART)
    WORLD: World = load_world()     # Load locations from gamedata/locations.json
    LOCATIONS: Dict[str, Location] = WORLD.locations
    ITEMS_MANAGER = ItemsManager()  # Load items from gamedata/items.json
    # NPCs loaded in data/npcs.py module (imported on demand)

    # 3. Start background tasks (single worker via file lock)
    tasks = await background_tasks.start_background_tasks(manager, LOCATIONS)
    yield
```
With Redis Integration:
```python
@asynccontextmanager
async def lifespan(app: FastAPI):
    # 1. Database
    await db.init_db()

    # 2. Redis connection
    await redis_manager.connect()

    # 3. Load static data (STAYS IN MEMORY - NO REDIS CACHING)
    WORLD: World = load_world()
    LOCATIONS: Dict[str, Location] = WORLD.locations
    ITEMS_MANAGER = ItemsManager()

    # 4. Subscribe to Redis Pub/Sub channels
    location_channels = [f"location:{loc_id}" for loc_id in LOCATIONS.keys()]
    await redis_manager.subscribe_to_channels(location_channels + ['game:broadcast'])

    # 5. Start Redis message listener (background task)
    asyncio.create_task(redis_manager.listen_for_messages(manager.handle_redis_message))

    # 6. Register this worker in Redis
    await redis_manager.redis_client.sadd('active_workers', redis_manager.worker_id)

    # 7. Start background tasks (distributed via Redis locks)
    tasks = await background_tasks.start_background_tasks(manager, LOCATIONS)
    yield

    # Cleanup
    await redis_manager.redis_client.srem('active_workers', redis_manager.worker_id)
    await redis_manager.disconnect()
```
Q5: How many channels can exist?
Redis Pub/Sub Channels:
Fixed Channels (Always Active):
- game:broadcast - Global announcements (1 channel)
- game:workers - Worker coordination (1 channel)
Dynamic Channels (Created on Demand):
Location Channels (14 currently):
- location:start_point
- location:overpass
- location:gas_station
- ... (one per location in locations.json)

Player Channels (one per connected player):
- player:1 (character_id=1)
- player:2
- player:5
- ... (created on WebSocket connect, destroyed on disconnect)
Total Active Channels:
- Minimum: 16 (2 fixed + 14 locations)
- With 100 players: 116 (2 + 14 + 100)
- With 1000 players: 1016 (2 + 14 + 1000)
Redis Limits:
- Redis supports millions of channels simultaneously
- Each channel has minimal memory overhead (~100 bytes)
- 1000 channels = ~100 KB memory (negligible)
Subscription Strategy:
- All workers subscribe to: game:broadcast + all location channels
- Each worker subscribes to: only its connected players' channels
- When a player connects → the worker subscribes to player:{id}
- When a player disconnects → the worker unsubscribes from player:{id}
Q6: How does client update data in the UI?
Current Flow (without Redis):
1. User clicks "Attack" button
↓
2. Client: POST /api/game/combat/action {"action": "attack"}
↓
3. Server: Process attack, update DB
↓
4. Server: Send WebSocket message to player
↓
5. Server: Query DB for other players in location
↓
6. Server: Send WebSocket messages to location
↓
7. Client: Receives WebSocket "combat_update"
↓
8. Client: Updates UI (HP bar, combat log)
↓
9. Client: GET /api/game/state (refresh full state)
↓
10. Server: Query DB for player, inventory, combat, etc.
↓
11. Client: Re-render entire game UI
With Redis (optimized):
1. User clicks "Attack" button
↓
2. Client: POST /api/game/combat/action {"action": "attack"}
↓
3. Server: Process attack, update DB + Redis cache
↓
4. Server: Publish to Redis channel "player:{id}" (personal message)
↓
5. Worker handling that player: Receives Redis message
↓
6. Worker: Send WebSocket to local connection
↓
7. Client: Receives WebSocket "combat_update" with ALL needed data
↓
8. Client: Updates UI directly from WebSocket payload (NO API CALL)
↓
9. Server: Publish to Redis channel "location:{id}" (broadcast)
↓
10. All workers: Receive location broadcast
↓
11. Workers: Send WebSocket to their local connections in that location
↓
12. Other players: UI updates with "Jocaru is in combat"
Key Changes:
- ✅ No more GET /api/game/state after actions - the WebSocket payload contains everything
- ✅ Cross-worker broadcasts - Redis pub/sub ensures all workers relay messages
- ✅ Reduced DB queries - combat state is cached in Redis
- ✅ Faster UI updates - WebSocket messages relayed via Redis in under ~2 ms
WebSocket Message Format (enhanced):
```json
{
  "type": "combat_update",
  "data": {
    "message": "You dealt 12 damage!",
    "log_entry": "You dealt 12 damage!",
    "combat_over": false,
    "combat": {
      "npc_id": "raider",
      "npc_hp": 85,
      "npc_max_hp": 115,
      "turn": "npc"
    },
    "player": {
      "hp": 78,
      "stamina": 42,
      "xp": 1250,
      "level": 5
    }
  },
  "timestamp": "2025-11-09T18:00:00Z"
}
```
Client receives this → Updates HP bar, combat log, turn indicator WITHOUT calling /api/game/state.
Q7: Disconnected players staying in location?
Excellent Gameplay Mechanic! This adds risk/consequence to disconnecting in dangerous areas.
Implementation:
When Player Disconnects:
```python
# ConnectionManager.disconnect()
async def disconnect(self, player_id: int):
    # 1. Remove local WebSocket connection
    if player_id in self.active_connections:
        del self.active_connections[player_id]

    # 2. Update Redis session (mark as disconnected)
    session = await redis_manager.get_player_session(player_id)
    if session:
        session['websocket_connected'] = 'false'
        session['disconnect_time'] = str(time.time())
        await redis_manager.set_player_session(player_id, session, ttl=3600)  # Keep for 1 hour

        # 3. KEEP player in location registry (don't remove)
        # await redis_manager.remove_player_from_location(...)  # DON'T DO THIS

        # 4. Broadcast to location (guarded by the session check above,
        #    since it needs session['location_id'])
        await redis_manager.publish_to_location(
            session['location_id'],
            {
                "type": "player_status_change",
                "data": {
                    "player_id": player_id,
                    "username": session['username'],
                    "status": "disconnected",
                    "message": f"{session['username']} has disconnected (vulnerable)"
                }
            }
        )
```
When Other Players Query Location:
```python
# GET /api/game/location endpoint
@app.get("/api/game/location")
async def get_current_location(current_user: dict = Depends(get_current_user)):
    # Resolve the requester's location from their own session
    my_session = await redis_manager.get_player_session(current_user['id'])
    location_id = my_session['location_id']

    # Get players in location from Redis
    player_ids = await redis_manager.get_players_in_location(location_id)

    other_players = []
    for pid in player_ids:
        if pid == current_user['id']:
            continue
        # Get player session
        session = await redis_manager.get_player_session(pid)
        if session:
            other_players.append({
                "id": pid,
                "username": session['username'],
                "level": int(session['level']),
                "hp": int(session['hp']),
                "is_connected": session['websocket_connected'] == 'true',
                "can_attack": True  # Always true, even if disconnected!
            })

    return {
        "id": location_id,
        "other_players": other_players  # Includes disconnected players
    }
```
Combat with Disconnected Player:
```python
# POST /api/game/pvp/initiate
@app.post("/api/game/pvp/initiate")
async def initiate_pvp(target_id: int, current_user: dict = Depends(get_current_user)):
    # Check target session
    target_session = await redis_manager.get_player_session(target_id)
    if not target_session:
        raise HTTPException(400, detail="Target player not found")

    # Allow combat even if disconnected
    is_connected = target_session['websocket_connected'] == 'true'

    # Create PvP combat
    pvp_combat = await db.create_pvp_combat(
        attacker_id=current_user['id'],
        defender_id=target_id,
        location_id=current_user['location_id']
    )

    if is_connected:
        # Target is online → send WebSocket notification
        await redis_manager.publish_to_player(target_id, {
            "type": "pvp_challenge",
            "data": {
                "attacker": current_user['name'],
                "attacker_level": current_user['level']
            }
        })
        return {"message": "PvP challenge sent", "pvp_combat": pvp_combat}

    # Target is offline → auto-acknowledge, they can't respond
    await db.acknowledge_pvp_combat(pvp_combat['id'], target_id)

    # Attacker gets a free first-strike advantage
    return {
        "message": f"{target_session['username']} is disconnected - you get first strike!",
        "pvp_combat": pvp_combat,
        "target_vulnerable": True
    }
```
Cleanup Policy (optional):
```python
# Background task: remove disconnected players after 1 hour
async def cleanup_disconnected_players():
    while True:
        await asyncio.sleep(300)  # Every 5 minutes

        # Iterate sessions without blocking Redis (SCAN instead of KEYS)
        async for key in redis_manager.redis_client.scan_iter("player:*:session"):
            session = await redis_manager.redis_client.hgetall(key)
            if session.get('websocket_connected') == 'false':
                disconnect_time = float(session.get('disconnect_time', 0))
                # If disconnected for > 1 hour
                if time.time() - disconnect_time > 3600:
                    character_id = int(key.split(':')[1])
                    location_id = session['location_id']

                    # Remove from location registry
                    await redis_manager.remove_player_from_location(character_id, location_id)

                    # Delete the session
                    await redis_manager.delete_player_session(character_id)
                    print(f"🧹 Cleaned up disconnected player {character_id}")
```
UI Display:
```jsx
// Frontend: Show disconnected status
{otherPlayers.map(player => (
  <div key={player.id}
       className={`player-card ${!player.is_connected ? 'disconnected' : ''}`}>
    <span className="player-name">{player.username}</span>
    <span className="player-level">Lv. {player.level}</span>
    {!player.is_connected && (
      <span className="player-status">⚠️ Disconnected (Vulnerable)</span>
    )}
    {player.can_attack && (
      <button onClick={() => attackPlayer(player.id)}>
        Attack {!player.is_connected ? '(Easy Target)' : ''}
      </button>
    )}
  </div>
))}
```
Q8: RDB vs AOF - Code changes needed?
Short Answer: No code changes required, only Redis configuration.
Redis Persistence Options:
RDB (Snapshotting):
- Periodic snapshots to disk
- Fast restarts, smaller files
- May lose last few seconds of data
AOF (Append-Only File):
- Logs every write operation
- More durable, no data loss
- Slower restarts, larger files
Recommended Configuration (for your use case):
```yaml
# docker-compose.yml
# Inline "#" comments can't live inside the block scalar (they'd be passed
# to redis-server as arguments), so the flags are explained here:
#   --appendonly yes / --appendfsync everysec → AOF enabled, synced once per second
#   --save 900 1   → RDB snapshot every 15 min if 1+ key changed
#   --save 300 10  → RDB snapshot every 5 min if 10+ keys changed
#   --save 60 10000 → RDB snapshot every 1 min if 10k+ keys changed
#   --maxmemory 512mb + allkeys-lru → evict least recently used keys at the cap
echoes_redis:
  command: >
    redis-server
    --appendonly yes
    --appendfsync everysec
    --save 900 1
    --save 300 10
    --save 60 10000
    --maxmemory 512mb
    --maxmemory-policy allkeys-lru
```
What This Gives You:
- ✅ AOF for durability: Every write logged (max 1 second data loss)
- ✅ RDB for fast recovery: Snapshots for quick restarts
- ✅ Memory protection: Won't crash if memory full (evicts old caches)
Application Code: No changes needed! Redis handles persistence transparently.
Testing Persistence:
# 1. Add some data
docker exec echoes_of_the_ashes_redis redis-cli SET test:key "hello"
# 2. Restart Redis
docker restart echoes_of_the_ashes_redis
# 3. Check if data persisted
docker exec echoes_of_the_ashes_redis redis-cli GET test:key
# Should return: "hello"
Q9: What if cache invalidation isn't aggressive enough?
Potential Problems:
1. Stale Player Stats
Scenario: Player levels up, but Redis cache shows old level
1. Player gains XP → DB updated (level 6)
2. Redis cache still shows level 5
3. Other players see "Lv. 5" instead of "Lv. 6"
Solution: Invalidate on every stat change
```python
async def update_character_stats(character_id: int, **kwargs):
    # Update DB
    await db.update_character(character_id, **kwargs)

    # Invalidate Redis cache
    await redis_manager.delete_player_session(character_id)

    # Or update the cache directly instead of deleting it
    session = await redis_manager.get_player_session(character_id)
    if session:
        session.update(kwargs)
        await redis_manager.set_player_session(character_id, session)
```
2. Ghost Items in Inventory
Scenario: Player drops item, but cache shows they still have it
1. Player drops "Iron Sword"
2. DB updated (inventory row deleted)
3. Redis cache still shows sword in inventory
4. Player sees sword in UI, tries to equip → Error!
Solution: Invalidate inventory cache on add/remove/use
```python
async def remove_item_from_inventory(character_id: int, item_id: str):
    # Update DB
    await db.delete_inventory_item(character_id, item_id)

    # Invalidate cache (force reload next time)
    await redis_manager.invalidate_inventory(character_id)
```
3. Wrong Player Count in Location
Scenario: Player moves, but old location still shows them
1. Player moves overpass → gas_station
2. Redis location registry not updated
3. Other players in overpass still see them
4. Broadcasts sent to wrong location
Solution: Atomic location updates
```python
async def move_player(character_id: int, from_loc: str, to_loc: str):
    # Use a Redis transaction (atomic)
    async with redis_manager.redis_client.pipeline() as pipe:
        pipe.srem(f"location:{from_loc}:players", character_id)
        pipe.sadd(f"location:{to_loc}:players", character_id)
        await pipe.execute()
```
4. Combat State Desync
Scenario: Combat ends, but cache shows still in combat
1. Player defeats enemy
2. DB: active_combats row deleted
3. Redis: combat cache still exists
4. Player sees combat UI, can't move
Solution: Explicit cache deletion on combat end
```python
async def end_combat(character_id: int):
    # Delete from DB
    await db.end_combat(character_id)

    # Delete Redis cache
    await redis_manager.redis_client.delete(f"player:{character_id}:combat")

    # Update player session
    session = await redis_manager.get_player_session(character_id)
    if session:
        session['in_combat'] = 'false'
        await redis_manager.set_player_session(character_id, session)
```
General Strategy:
```python
# PATTERN 1: Write-through cache (recommended for critical data)
async def update_data(key, value):
    await db.update(key, value)            # Write to DB first
    await redis_manager.cache(key, value)  # Update cache immediately

# PATTERN 2: Cache invalidation (simpler, slight delay)
async def update_data(key, value):
    await db.update(key, value)            # Write to DB
    await redis_manager.delete_cache(key)  # Delete cache (reload on next access)

# PATTERN 3: TTL fallback (for non-critical data)
# Set short TTLs (e.g., 30 seconds) so the cache self-expires if not invalidated
await redis_manager.cache(key, value, ttl=30)
```
For Your Game:
- ✅ Aggressive invalidation for: inventory, combat state, player stats
- ✅ Write-through cache for: player sessions, location registry
- ✅ TTL fallback for: dropped items list, interactable cooldowns
Q10: No feature flags needed (dev only)
Agreed! Since you're the only tester, we can implement directly without feature flags.
Simplified Rollout:
Phase 1: Redis Infrastructure (Week 1)
- Add Redis to docker-compose
- Create redis_manager.py
- Test connection/pub-sub
Phase 2: Pub/Sub Only (Week 2)
- Update ConnectionManager to use Redis pub/sub
- Keep all other logic same (no caching yet)
- Test cross-worker broadcasts
Phase 3: Add Caching (Week 3)
- Add player session cache
- Add inventory cache
- Add combat state cache
- Test performance improvements
Phase 4: Multi-Worker (Week 4)
- Increase workers to 2
- Test load balancing
- Monitor for race conditions
Simplified Implementation (no toggles):
# Just implement Redis directly
async def lifespan(app: FastAPI):
await db.init_db()
await redis_manager.connect() # No if/else, just do it
# ... rest of startup
Updated Implementation Priority
Based on your feedback, here's what we'll actually implement:
Phase 1: Redis Pub/Sub (Core Multi-Worker Support)
Goal: Enable cross-worker broadcasts
Changes:
- Add Redis container
- Create redis_manager.py with pub/sub only
- Update ConnectionManager:
  - Keep local WebSocket storage
  - Change send_personal_message() → publish to Redis
  - Change send_to_location() → publish to Redis
  - Add handle_redis_message() → send to local WebSockets
- Subscribe to location channels on startup
What We DON'T Cache:
- ❌ Locations (already in memory)
- ❌ Items (already in memory)
- ❌ NPCs (already in memory)
Phase 2: Dynamic State Caching (Performance)
Goal: Reduce database queries for frequently accessed data
What We DO Cache:
- ✅ Player sessions (location, HP, level, stats)
- ✅ Location player registry (Set of character IDs per location)
- ✅ Player inventory (with aggressive invalidation)
- ✅ Active combat state (with explicit deletion)
- ✅ Dropped items per location (with TTL)
Phase 3: Multi-Worker Deployment
Goal: Horizontal scaling
Changes:
- Update docker-compose for 4 workers
- Test load distribution
- Implement distributed background task locks
- Monitor performance
Next Steps
Ready to implement? Here's what I'll do:
- Create redis_manager.py - Simplified version (no static data caching)
- Update docker-compose.yml - Add Redis container
- Update ConnectionManager - Integrate pub/sub
- Update endpoints - Add cache invalidation where needed
- Implement disconnected-player handling - Keep in location, mark as vulnerable
- Test suite - Verify cross-worker communication
Do you want me to proceed with implementation?