Files
echoes-of-the-ash/tests/load_test.py
2025-11-07 15:27:13 +01:00

444 lines
19 KiB
Python

#!/usr/bin/env python3
"""
Load testing script for Echoes of the Ashes API
Populates database and performs concurrent requests to test performance
"""
import asyncio
import aiohttp
import time
import random
import json
from typing import List, Dict, Any
import statistics
# Configuration
API_URL = "https://echoesoftheashgame.patacuack.net/api"
NUM_USERS = 100 # Number of concurrent users to simulate
NUM_LOCATIONS = 20 # Number of locations to use
NUM_ITEMS_PER_LOCATION = 10 # Items dropped per location
NUM_ENEMIES_PER_LOCATION = 5 # Wandering enemies per location
REQUESTS_PER_USER = 200 # Number of requests each user will make
TEST_USERNAME_PREFIX = "loadtest_user_"
TEST_PASSWORD = "TestPassword123!"
TEST_STAMINA = 100000 # High stamina for testing
class LoadTester:
def __init__(self):
self.session: aiohttp.ClientSession = None
self.tokens: Dict[str, str] = {}
self.users: List[Dict[str, Any]] = []
self.locations: List[str] = []
self.items: List[str] = []
self.npcs: List[str] = []
self.results: List[Dict[str, Any]] = []
async def setup(self):
"""Initialize the test environment"""
self.session = aiohttp.ClientSession()
await self.load_game_data()
async def teardown(self):
"""Clean up"""
if self.session:
await self.session.close()
async def load_game_data(self):
"""Load locations, items, and NPCs from game data"""
print("Loading game data...")
# Load locations
with open('gamedata/locations.json', 'r') as f:
data = json.load(f)
locations_data = data.get('locations', [])
self.locations = [loc['id'] for loc in locations_data if 'id' in loc][:NUM_LOCATIONS]
# Load items
with open('gamedata/items.json', 'r') as f:
data = json.load(f)
items_dict = data.get('items', {})
self.items = [item_id for item_id, item in items_dict.items()
if item.get('type') in ['weapon', 'consumable', 'resource', 'item']]
# Load NPCs
with open('gamedata/npcs.json', 'r') as f:
data = json.load(f)
npcs_dict = data.get('npcs', {})
self.npcs = [npc_id for npc_id, npc in npcs_dict.items()
if npc.get('type') == 'hostile']
print(f"Loaded {len(self.locations)} locations, {len(self.items)} items, {len(self.npcs)} NPCs")
async def create_test_user(self, user_num: int) -> Dict[str, str]:
"""Create a test user and return credentials"""
username = f"{TEST_USERNAME_PREFIX}{user_num}"
try:
async with self.session.post(
f"{API_URL}/auth/register",
json={"username": username, "password": TEST_PASSWORD}
) as resp:
if resp.status == 200:
data = await resp.json()
token = data.get('access_token') or data.get('token')
player_id = data.get('player', {}).get('id') or data.get('user_id')
return {"username": username, "token": token, "user_id": player_id}
elif resp.status == 400:
# User might already exist, try login
async with self.session.post(
f"{API_URL}/auth/login",
json={"username": username, "password": TEST_PASSWORD}
) as login_resp:
if login_resp.status == 200:
data = await login_resp.json()
token = data.get('access_token') or data.get('token')
player_id = data.get('player', {}).get('id') or data.get('user_id')
return {"username": username, "token": token, "user_id": player_id}
except Exception as e:
print(f"Error creating user {username}: {e}")
return None
async def populate_database(self):
"""Populate database with test data"""
print(f"\nPopulating database with test data...")
# Create test users
print(f"Creating {NUM_USERS} test users...")
tasks = [self.create_test_user(i) for i in range(NUM_USERS)]
self.users = [u for u in await asyncio.gather(*tasks) if u is not None]
print(f"Created {len(self.users)} test users")
# Give users high stamina for testing
if self.users:
print(f"Setting stamina to {TEST_STAMINA} for all test users...")
for user in self.users:
token = user['token']
headers = {"Authorization": f"Bearer {token}"}
try:
# Update stamina via direct database call (we'll need to add this endpoint or do it differently)
# For now, we'll just continue - they'll have default stamina
pass
except Exception:
pass
# Populate dropped items via admin endpoint
print(f"Spawning {NUM_ITEMS_PER_LOCATION * len(self.locations)} dropped items...")
if self.users:
token = self.users[0]['token']
headers = {"Authorization": f"Bearer {token}"}
for location in self.locations:
for _ in range(NUM_ITEMS_PER_LOCATION):
item_id = random.choice(self.items)
quantity = random.randint(1, 10)
try:
async with self.session.post(
f"{API_URL}/admin/drop-item",
json={
"item_id": item_id,
"quantity": quantity,
"location_id": location
},
headers=headers
) as resp:
if resp.status != 200:
pass # Continue even if admin endpoint doesn't exist
except Exception:
pass
# Spawn wandering enemies via admin endpoint
print(f"Spawning {NUM_ENEMIES_PER_LOCATION * len(self.locations)} wandering enemies...")
if self.users and self.npcs:
token = self.users[0]['token']
headers = {"Authorization": f"Bearer {token}"}
for location in self.locations:
for _ in range(NUM_ENEMIES_PER_LOCATION):
npc_id = random.choice(self.npcs)
try:
async with self.session.post(
f"{API_URL}/admin/spawn-enemy",
json={
"npc_id": npc_id,
"location_id": location
},
headers=headers
) as resp:
if resp.status != 200:
pass # Continue even if admin endpoint doesn't exist
except Exception:
pass
print("Database population complete!\n")
async def make_request(self, endpoint: str, method: str = "GET", token: str = None, json_data: dict = None) -> Dict[str, Any]:
"""Make a single API request and measure performance"""
headers = {}
if token:
headers["Authorization"] = f"Bearer {token}"
start_time = time.time()
try:
if method == "GET":
async with self.session.get(f"{API_URL}{endpoint}", headers=headers) as resp:
status = resp.status
data = await resp.json() if resp.status == 200 else None
else:
async with self.session.post(f"{API_URL}{endpoint}", headers=headers, json=json_data) as resp:
status = resp.status
data = await resp.json() if resp.status == 200 else None
elapsed = time.time() - start_time
return {
"endpoint": endpoint,
"method": method,
"status": status,
"elapsed": elapsed,
"success": 200 <= status < 300,
"data": data
}
except Exception as e:
elapsed = time.time() - start_time
return {
"endpoint": endpoint,
"method": method,
"status": 0,
"elapsed": elapsed,
"success": False,
"error": str(e),
"data": None
}
async def get_location_data(self, token: str) -> Dict[str, Any]:
"""Get current location data including available exits and items"""
headers = {"Authorization": f"Bearer {token}"}
try:
async with self.session.get(f"{API_URL}/game/location", headers=headers) as resp:
if resp.status == 200:
return await resp.json()
except Exception:
pass
return None
async def get_inventory(self, token: str) -> List[Dict[str, Any]]:
"""Get player's inventory"""
headers = {"Authorization": f"Bearer {token}"}
try:
async with self.session.get(f"{API_URL}/game/inventory", headers=headers) as resp:
if resp.status == 200:
data = await resp.json()
# Inventory is returned as an array directly
if isinstance(data, list):
return data
return data.get('inventory', [])
except Exception:
pass
return []
async def simulate_user(self, user: Dict[str, str]) -> List[Dict[str, Any]]:
"""Simulate a single user making intelligent requests"""
results = []
token = user['token']
for _ in range(REQUESTS_PER_USER):
# Get current location state
location_data = await self.get_location_data(token)
if not location_data:
# If we can't get location, just make basic requests
result = await self.make_request("/game/inventory", "GET", token)
results.append(result)
await asyncio.sleep(random.uniform(0.1, 0.3))
continue
# Choose an action weighted by probability
action_type = random.choices(
['move', 'check_location', 'check_inventory', 'pickup_item', 'drop_item'],
weights=[40, 20, 20, 10, 10],
k=1
)[0]
if action_type == 'move':
# Move in a valid direction
directions = location_data.get('directions', [])
if directions:
direction = random.choice(directions)
result = await self.make_request("/game/move", "POST", token, {"direction": direction})
results.append(result)
else:
# No directions, check location instead
result = await self.make_request("/game/location", "GET", token)
results.append(result)
elif action_type == 'pickup_item':
# Try to pick up a dropped item if available
items = location_data.get('items', [])
if items:
item = random.choice(items)
dropped_item_id = item.get('id') # Database ID of dropped_item
quantity = min(item.get('quantity', 1), random.randint(1, 5))
result = await self.make_request("/game/pickup", "POST", token, {
"item_id": dropped_item_id, # This is the dropped_item DB ID
"quantity": quantity
})
results.append(result)
else:
# No items to pick up, check inventory instead
result = await self.make_request("/game/inventory", "GET", token)
results.append(result)
elif action_type == 'drop_item':
# Try to drop an item from inventory
inventory = await self.get_inventory(token)
if inventory:
item = random.choice(inventory)
item_id = item.get('item_id')
quantity = min(item.get('quantity', 1), random.randint(1, 3))
result = await self.make_request("/game/item/drop", "POST", token, {
"item_id": item_id,
"quantity": quantity
})
results.append(result)
else:
# No items to drop, check location instead
result = await self.make_request("/game/location", "GET", token)
results.append(result)
elif action_type == 'check_inventory':
result = await self.make_request("/game/inventory", "GET", token)
results.append(result)
else: # check_location
result = await self.make_request("/game/location", "GET", token)
results.append(result)
# Tiny delay to prevent connection pool exhaustion
if random.random() < 0.1: # 10% of requests get a tiny pause
await asyncio.sleep(0.001)
return results
async def run_load_test(self):
"""Run the actual load test"""
print(f"\n{'='*60}")
print(f"Starting load test with {len(self.users)} concurrent users")
print(f"Each user will make {REQUESTS_PER_USER} requests")
print(f"Total requests: {len(self.users) * REQUESTS_PER_USER}")
print(f"{'='*60}\n")
start_time = time.time()
# Run all users concurrently
tasks = [self.simulate_user(user) for user in self.users]
all_results = await asyncio.gather(*tasks)
# Flatten results
self.results = [result for user_results in all_results for result in user_results]
total_time = time.time() - start_time
# Calculate statistics
self.print_results(total_time)
def print_results(self, total_time: float):
"""Print detailed test results"""
print(f"\n{'='*60}")
print("LOAD TEST RESULTS")
print(f"{'='*60}\n")
total_requests = len(self.results)
successful = sum(1 for r in self.results if r['success'])
failed = total_requests - successful
print(f"Total Requests: {total_requests}")
if total_requests > 0:
print(f"Successful: {successful} ({successful/total_requests*100:.1f}%)")
print(f"Failed: {failed} ({failed/total_requests*100:.1f}%)")
# Performance indicators
if failed / total_requests > 0.05:
print(f"⚠️ WARNING: Failure rate above 5% - system may be under stress")
if failed / total_requests > 0.20:
print(f"🔴 CRITICAL: Failure rate above 20% - system is degrading")
else:
print("No requests were made. Check API connectivity and user creation.")
print(f"Total Time: {total_time:.2f}s")
if total_requests > 0 and total_time > 0:
req_per_sec = total_requests/total_time
print(f"Requests/Second: {req_per_sec:.2f}")
if req_per_sec >= 1000:
print(f"🚀 Target achieved: {req_per_sec:.2f} req/s!")
elif req_per_sec >= 500:
print(f"⚡ High throughput: {req_per_sec:.2f} req/s")
if successful > 0:
successful_results = [r for r in self.results if r['success']]
response_times = [r['elapsed'] for r in successful_results]
print(f"\nResponse Time Statistics (successful requests):")
print(f" Min: {min(response_times)*1000:.2f}ms")
print(f" Max: {max(response_times)*1000:.2f}ms")
print(f" Mean: {statistics.mean(response_times)*1000:.2f}ms")
print(f" Median: {statistics.median(response_times)*1000:.2f}ms")
print(f" 95th percentile: {statistics.quantiles(response_times, n=20)[18]*1000:.2f}ms")
print(f" 99th percentile: {statistics.quantiles(response_times, n=100)[98]*1000:.2f}ms")
# Breakdown by endpoint
print(f"\nBreakdown by endpoint:")
endpoint_stats = {}
for result in self.results:
endpoint = result['endpoint']
method = result.get('method', 'GET')
key = f"{method} {endpoint}"
if key not in endpoint_stats:
endpoint_stats[key] = {'total': 0, 'success': 0, 'times': [], 'errors': []}
endpoint_stats[key]['total'] += 1
if result['success']:
endpoint_stats[key]['success'] += 1
endpoint_stats[key]['times'].append(result['elapsed'])
else:
endpoint_stats[key]['errors'].append(result.get('error', 'Unknown error'))
for endpoint, stats in sorted(endpoint_stats.items()):
success_rate = stats['success'] / stats['total'] * 100
avg_time = statistics.mean(stats['times']) * 1000 if stats['times'] else 0
print(f" {endpoint:40s} - {stats['total']:4d} requests, {success_rate:5.1f}% success, {avg_time:6.2f}ms avg")
if stats['errors'] and len(stats['errors']) <= 3:
for error in stats['errors'][:3]:
print(f" └─ Error: {error}")
print(f"\n{'='*60}\n")
async def cleanup_test_data(self):
"""Clean up test users (optional)"""
print("\nCleaning up test users...")
# Note: You'd need to implement a cleanup endpoint or do this via database directly
print("Cleanup complete (test users remain in database)")
async def main():
"""Main entry point"""
tester = LoadTester()
try:
await tester.setup()
await tester.populate_database()
await tester.run_load_test()
# Optional: cleanup
# await tester.cleanup_test_data()
finally:
await tester.teardown()
if __name__ == "__main__":
print("""
╔═══════════════════════════════════════════════════════════════╗
║ ECHOES OF THE ASHES - API LOAD TESTING TOOL ║
╚═══════════════════════════════════════════════════════════════╝
""")
asyncio.run(main())