#!/usr/bin/env python3
"""
Load testing script for Echoes of the Ashes API
Populates database and performs concurrent requests to test performance
"""

import asyncio
import json
import random
import statistics
import time
from collections import Counter, defaultdict
from typing import Any, Dict, List, Optional

import aiohttp

# Configuration
API_URL = "https://echoesoftheashgame.patacuack.net/api"
NUM_USERS = 100  # Number of concurrent users to simulate
NUM_LOCATIONS = 20  # Number of locations to use
NUM_ITEMS_PER_LOCATION = 10  # Items dropped per location
NUM_ENEMIES_PER_LOCATION = 5  # Wandering enemies per location
REQUESTS_PER_USER = 200  # Number of requests each user will make
TEST_USERNAME_PREFIX = "loadtest_user_"
TEST_PASSWORD = "TestPassword123!"
TEST_STAMINA = 100000  # Desired stamina for testing (not applied yet, see populate_database)

# Item types that may be dropped into locations while seeding.
_SPAWNABLE_ITEM_TYPES = ("weapon", "consumable", "resource", "item")

# Simulated user actions and their relative selection weights.
_ACTIONS = ['move', 'check_location', 'check_inventory', 'pickup_item', 'drop_item']
_ACTION_WEIGHTS = [40, 20, 20, 10, 10]

# Failures that best-effort helpers treat as "the request did not work":
# transport errors, timeouts, and response bodies that are not valid JSON.
_REQUEST_ERRORS = (aiohttp.ClientError, asyncio.TimeoutError, OSError, ValueError)


def _read_json(path: str) -> Any:
    """Parse the JSON file at *path* (read as UTF-8) and return its content."""
    with open(path, 'r', encoding='utf-8') as f:
        return json.load(f)


class LoadTester:
    """Seeds the API with test users/items/enemies and runs a concurrent load test.

    Typical usage is ``setup()`` -> ``populate_database()`` -> ``run_load_test()``
    -> ``teardown()`` (see ``main``).
    """

    def __init__(self):
        # Shared HTTP session; created in setup() and closed in teardown().
        self.session: Optional[aiohttp.ClientSession] = None
        self.tokens: Dict[str, str] = {}
        # One dict per test user: {"username", "token", "user_id"}.
        self.users: List[Dict[str, Any]] = []
        # IDs read from the gamedata/*.json files.
        self.locations: List[str] = []
        self.items: List[str] = []
        self.npcs: List[str] = []
        # One dict per measured request, as produced by make_request().
        self.results: List[Dict[str, Any]] = []

    async def setup(self):
        """Initialize the test environment: open the HTTP session and load game data."""
        self.session = aiohttp.ClientSession()
        await self.load_game_data()

    async def teardown(self):
        """Clean up by closing the HTTP session if one was opened."""
        if self.session:
            await self.session.close()

    async def load_game_data(self):
        """Load locations, items, and NPCs from the ``gamedata/`` JSON files.

        Paths are relative to the current working directory. Keeps at most
        ``NUM_LOCATIONS`` location IDs, item IDs whose ``type`` is one of
        ``_SPAWNABLE_ITEM_TYPES``, and NPC IDs whose ``type`` is ``'hostile'``.

        Raises:
            OSError: if a data file cannot be opened.
            json.JSONDecodeError: if a data file is not valid JSON.
        """
        print("Loading game data...")

        # Load locations
        locations_data = _read_json('gamedata/locations.json').get('locations', [])
        self.locations = [loc['id'] for loc in locations_data if 'id' in loc][:NUM_LOCATIONS]

        # Load items
        items_dict = _read_json('gamedata/items.json').get('items', {})
        self.items = [
            item_id
            for item_id, item in items_dict.items()
            if item.get('type') in _SPAWNABLE_ITEM_TYPES
        ]

        # Load NPCs
        npcs_dict = _read_json('gamedata/npcs.json').get('npcs', {})
        self.npcs = [
            npc_id
            for npc_id, npc in npcs_dict.items()
            if npc.get('type') == 'hostile'
        ]

        print(f"Loaded {len(self.locations)} locations, {len(self.items)} items, {len(self.npcs)} NPCs")

    @staticmethod
    def _user_from_auth_response(username: str, data: Any) -> Optional[Dict[str, Any]]:
        """Build a user record from a register/login JSON body.

        Returns None when the body is not an object or carries no token, so a
        user that could not actually authenticate is never used in the test.
        """
        if not isinstance(data, dict):
            return None
        token = data.get('access_token') or data.get('token')
        if not token:
            return None
        player = data.get('player')
        player_id = (player.get('id') if isinstance(player, dict) else None) or data.get('user_id')
        return {"username": username, "token": token, "user_id": player_id}

    async def create_test_user(self, user_num: int) -> Optional[Dict[str, Any]]:
        """Create a test user and return credentials.

        Registers ``TEST_USERNAME_PREFIX + user_num``; on HTTP 400 (the user
        might already exist) falls back to logging in with the same password.

        Args:
            user_num: numeric suffix used to build the username.

        Returns:
            ``{"username", "token", "user_id"}`` on success, otherwise None.
        """
        username = f"{TEST_USERNAME_PREFIX}{user_num}"
        credentials = {"username": username, "password": TEST_PASSWORD}

        try:
            async with self.session.post(f"{API_URL}/auth/register", json=credentials) as resp:
                if resp.status == 200:
                    return self._user_from_auth_response(username, await resp.json())
                if resp.status != 400:
                    return None

            # User might already exist, try login. The register response is
            # released first so its connection is not held during the login.
            async with self.session.post(f"{API_URL}/auth/login", json=credentials) as login_resp:
                if login_resp.status == 200:
                    return self._user_from_auth_response(username, await login_resp.json())
        except Exception as e:
            # Best effort: one failing user must not abort the whole gather().
            print(f"Error creating user {username}: {e}")

        return None

    async def _admin_post(self, endpoint: str, payload: Dict[str, Any],
                          headers: Dict[str, str]) -> bool:
        """POST *payload* to an admin endpoint; return True only on HTTP 200.

        Failures are reported through the return value rather than raised,
        because the admin endpoints may not exist on the target server.
        """
        try:
            async with self.session.post(
                f"{API_URL}{endpoint}", json=payload, headers=headers
            ) as resp:
                return resp.status == 200
        except _REQUEST_ERRORS:
            return False

    async def populate_database(self):
        """Populate database with test data.

        Creates ``NUM_USERS`` users concurrently, then uses the first user's
        token to call ``/admin/drop-item`` and ``/admin/spawn-enemy`` for every
        loaded location. Admin calls are best effort: failures are counted and
        reported, never raised.
        """
        print(f"\nPopulating database with test data...")

        # Create test users
        print(f"Creating {NUM_USERS} test users...")
        created = await asyncio.gather(*(self.create_test_user(i) for i in range(NUM_USERS)))
        self.users = [u for u in created if u is not None]
        print(f"Created {len(self.users)} test users")

        # No stamina endpoint is called by this script, so say so instead of
        # claiming the stamina was changed.
        if self.users:
            print(f"NOTE: stamina is not modified (wanted {TEST_STAMINA}); "
                  f"test users keep their default stamina")

        admin_headers = (
            {"Authorization": f"Bearer {self.users[0]['token']}"} if self.users else None
        )

        # Populate dropped items via admin endpoint
        print(f"Spawning {NUM_ITEMS_PER_LOCATION * len(self.locations)} dropped items...")
        if admin_headers and self.items:
            attempted = accepted = 0
            for location in self.locations:
                for _ in range(NUM_ITEMS_PER_LOCATION):
                    payload = {
                        "item_id": random.choice(self.items),
                        "quantity": random.randint(1, 10),
                        "location_id": location,
                    }
                    attempted += 1
                    accepted += await self._admin_post("/admin/drop-item", payload, admin_headers)
            print(f"  {accepted}/{attempted} item drops accepted")
        else:
            print("  Skipped: no test users or no items loaded")

        # Spawn wandering enemies via admin endpoint
        print(f"Spawning {NUM_ENEMIES_PER_LOCATION * len(self.locations)} wandering enemies...")
        if admin_headers and self.npcs:
            attempted = accepted = 0
            for location in self.locations:
                for _ in range(NUM_ENEMIES_PER_LOCATION):
                    payload = {
                        "npc_id": random.choice(self.npcs),
                        "location_id": location,
                    }
                    attempted += 1
                    accepted += await self._admin_post("/admin/spawn-enemy", payload, admin_headers)
            print(f"  {accepted}/{attempted} enemy spawns accepted")
        else:
            print("  Skipped: no test users or no hostile NPCs loaded")

        print("Database population complete!\n")

    async def make_request(self, endpoint: str, method: str = "GET",
                           token: Optional[str] = None,
                           json_data: Optional[dict] = None) -> Dict[str, Any]:
        """Make a single API request and measure performance.

        Args:
            endpoint: path appended to ``API_URL``.
            method: ``"GET"`` issues a GET; any other value issues a POST.
            token: bearer token; no Authorization header is sent when falsy.
            json_data: JSON body for POST requests (ignored for GET).

        Returns:
            Dict with ``endpoint``, ``method``, ``status``, ``elapsed``
            (seconds), ``success`` (2xx) and ``data`` (parsed JSON on HTTP 200,
            else None). When the request raises, ``status`` is 0, ``success``
            is False and an ``error`` string is included.
        """
        headers = {}
        if token:
            headers["Authorization"] = f"Bearer {token}"

        url = f"{API_URL}{endpoint}"
        # perf_counter is monotonic, so elapsed times cannot go negative or
        # jump if the wall clock is adjusted mid-test.
        started = time.perf_counter()
        try:
            if method == "GET":
                request = self.session.get(url, headers=headers)
            else:
                request = self.session.post(url, headers=headers, json=json_data)

            async with request as resp:
                status = resp.status
                data = await resp.json() if status == 200 else None

            return {
                "endpoint": endpoint,
                "method": method,
                "status": status,
                "elapsed": time.perf_counter() - started,
                "success": 200 <= status < 300,
                "data": data,
            }
        except Exception as e:
            # Boundary of the measurement: every failure becomes a recorded result.
            return {
                "endpoint": endpoint,
                "method": method,
                "status": 0,
                "elapsed": time.perf_counter() - started,
                "success": False,
                "error": str(e),
                "data": None,
            }

    async def get_location_data(self, token: str) -> Optional[Dict[str, Any]]:
        """Get current location data including available exits and items.

        Returns:
            The parsed JSON object from ``GET /game/location`` on HTTP 200, or
            None on any other status, a request failure, or a non-object body.
        """
        headers = {"Authorization": f"Bearer {token}"}
        try:
            async with self.session.get(f"{API_URL}/game/location", headers=headers) as resp:
                if resp.status == 200:
                    data = await resp.json()
                    # Callers use .get() on the result, so only a dict is usable.
                    return data if isinstance(data, dict) else None
        except _REQUEST_ERRORS:
            pass  # Caller falls back to a basic request when this returns None.
        return None

    async def get_inventory(self, token: str) -> List[Dict[str, Any]]:
        """Get player's inventory.

        Returns:
            The inventory entries from ``GET /game/inventory``; an empty list
            on a non-200 status, a request failure, or an unexpected body.
        """
        headers = {"Authorization": f"Bearer {token}"}
        try:
            async with self.session.get(f"{API_URL}/game/inventory", headers=headers) as resp:
                if resp.status == 200:
                    data = await resp.json()
                    # Inventory is returned as an array directly
                    if isinstance(data, list):
                        return data
                    if isinstance(data, dict):
                        return data.get('inventory', [])
        except _REQUEST_ERRORS:
            pass  # Treated the same as an empty inventory.
        return []

    async def _perform_action(self, action_type: str, token: str,
                              location_data: Dict[str, Any]) -> Dict[str, Any]:
        """Execute one simulated action and return its make_request() result.

        When an action has nothing to act on (no exits, no items on the ground,
        empty inventory) a read-only request is made instead, so every call
        yields exactly one measured request.
        """
        if action_type == 'move':
            # Move in a valid direction
            directions = location_data.get('directions', [])
            if directions:
                return await self.make_request(
                    "/game/move", "POST", token, {"direction": random.choice(directions)}
                )
            # No directions, check location instead
            return await self.make_request("/game/location", "GET", token)

        if action_type == 'pickup_item':
            # Try to pick up a dropped item if available
            items = location_data.get('items', [])
            if items:
                item = random.choice(items)
                quantity = min(item.get('quantity', 1), random.randint(1, 5))
                return await self.make_request("/game/pickup", "POST", token, {
                    "item_id": item.get('id'),  # This is the dropped_item DB ID
                    "quantity": quantity,
                })
            # No items to pick up, check inventory instead
            return await self.make_request("/game/inventory", "GET", token)

        if action_type == 'drop_item':
            # Try to drop an item from inventory
            inventory = await self.get_inventory(token)
            if inventory:
                item = random.choice(inventory)
                quantity = min(item.get('quantity', 1), random.randint(1, 3))
                return await self.make_request("/game/item/drop", "POST", token, {
                    "item_id": item.get('item_id'),
                    "quantity": quantity,
                })
            # No items to drop, check location instead
            return await self.make_request("/game/location", "GET", token)

        if action_type == 'check_inventory':
            return await self.make_request("/game/inventory", "GET", token)

        # check_location
        return await self.make_request("/game/location", "GET", token)

    async def simulate_user(self, user: Dict[str, str]) -> List[Dict[str, Any]]:
        """Simulate a single user making intelligent requests.

        Performs ``REQUESTS_PER_USER`` iterations. Each iteration first fetches
        the current location state, then performs one weighted-random action
        based on it.

        NOTE: the state-probing calls (get_location_data / get_inventory) hit
        the API too but are not recorded, so the server sees more requests than
        the results list contains.

        Args:
            user: record produced by create_test_user(); only ``token`` is read.

        Returns:
            One make_request() result per iteration.
        """
        results = []
        token = user['token']

        for _ in range(REQUESTS_PER_USER):
            # Get current location state
            location_data = await self.get_location_data(token)

            if not location_data:
                # If we can't get location, just make basic requests
                results.append(await self.make_request("/game/inventory", "GET", token))
                await asyncio.sleep(random.uniform(0.1, 0.3))
                continue

            # Choose an action weighted by probability
            action_type = random.choices(_ACTIONS, weights=_ACTION_WEIGHTS, k=1)[0]
            results.append(await self._perform_action(action_type, token, location_data))

            # Tiny delay to prevent connection pool exhaustion
            if random.random() < 0.1:  # 10% of requests get a tiny pause
                await asyncio.sleep(0.001)

        return results

    async def run_load_test(self):
        """Run the actual load test: all users concurrently, then print the report."""
        print(f"\n{'='*60}")
        print(f"Starting load test with {len(self.users)} concurrent users")
        print(f"Each user will make {REQUESTS_PER_USER} requests")
        print(f"Total requests: {len(self.users) * REQUESTS_PER_USER}")
        print(f"{'='*60}\n")

        started = time.perf_counter()

        # Run all users concurrently
        all_results = await asyncio.gather(*(self.simulate_user(user) for user in self.users))

        # Flatten results
        self.results = [result for user_results in all_results for result in user_results]

        total_time = time.perf_counter() - started

        # Calculate statistics
        self.print_results(total_time)

    @staticmethod
    def _percentile(samples: List[float], n: int, index: int) -> float:
        """Return cut point *index* of ``statistics.quantiles(samples, n=n)``.

        ``statistics.quantiles`` needs at least two data points on Python
        versions before 3.13; with a single sample that sample is returned.
        *samples* must not be empty.
        """
        if len(samples) < 2:
            return samples[0]
        return statistics.quantiles(samples, n=n)[index]

    def print_results(self, total_time: float):
        """Print detailed test results.

        Reports totals, failure-rate warnings, throughput, response-time
        statistics over successful requests, and a per-endpoint breakdown that
        includes the three most frequent error messages of each endpoint.

        Args:
            total_time: wall time of the whole test run, in seconds.
        """
        print(f"\n{'='*60}")
        print("LOAD TEST RESULTS")
        print(f"{'='*60}\n")

        total_requests = len(self.results)
        response_times = [r['elapsed'] for r in self.results if r['success']]
        successful = len(response_times)
        failed = total_requests - successful

        print(f"Total Requests: {total_requests}")
        if total_requests > 0:
            failure_rate = failed / total_requests
            print(f"Successful: {successful} ({successful/total_requests*100:.1f}%)")
            print(f"Failed: {failed} ({failure_rate*100:.1f}%)")

            # Performance indicators
            if failure_rate > 0.05:
                print("⚠️ WARNING: Failure rate above 5% - system may be under stress")
            if failure_rate > 0.20:
                print("🔴 CRITICAL: Failure rate above 20% - system is degrading")
        else:
            print("No requests were made. Check API connectivity and user creation.")

        print(f"Total Time: {total_time:.2f}s")
        if total_requests > 0 and total_time > 0:
            req_per_sec = total_requests / total_time
            print(f"Requests/Second: {req_per_sec:.2f}")
            if req_per_sec >= 1000:
                print(f"🚀 Target achieved: {req_per_sec:.2f} req/s!")
            elif req_per_sec >= 500:
                print(f"⚡ High throughput: {req_per_sec:.2f} req/s")

        if successful > 0:
            p95 = self._percentile(response_times, 20, 18)
            p99 = self._percentile(response_times, 100, 98)
            print(f"\nResponse Time Statistics (successful requests):")
            print(f"  Min: {min(response_times)*1000:.2f}ms")
            print(f"  Max: {max(response_times)*1000:.2f}ms")
            print(f"  Mean: {statistics.mean(response_times)*1000:.2f}ms")
            print(f"  Median: {statistics.median(response_times)*1000:.2f}ms")
            print(f"  95th percentile: {p95*1000:.2f}ms")
            print(f"  99th percentile: {p99*1000:.2f}ms")

        if self.results:
            # Breakdown by endpoint
            print(f"\nBreakdown by endpoint:")
            endpoint_stats = defaultdict(
                lambda: {'total': 0, 'success': 0, 'times': [], 'errors': []}
            )
            for result in self.results:
                stats = endpoint_stats[f"{result.get('method', 'GET')} {result['endpoint']}"]
                stats['total'] += 1
                if result['success']:
                    stats['success'] += 1
                    stats['times'].append(result['elapsed'])
                else:
                    # Non-2xx responses carry no 'error' text; show the status instead.
                    stats['errors'].append(
                        result.get('error') or f"HTTP {result.get('status')}"
                    )

            for endpoint, stats in sorted(endpoint_stats.items()):
                success_rate = stats['success'] / stats['total'] * 100
                avg_time = statistics.mean(stats['times']) * 1000 if stats['times'] else 0
                print(f"  {endpoint:40s} - {stats['total']:4d} requests, "
                      f"{success_rate:5.1f}% success, {avg_time:6.2f}ms avg")
                # Always show a sample: the three most frequent distinct errors.
                for error, count in Counter(stats['errors']).most_common(3):
                    print(f"      └─ Error: {error} (x{count})")

        print(f"\n{'='*60}\n")

    async def cleanup_test_data(self):
        """Clean up test users (optional).

        Currently a placeholder: it only prints messages and removes nothing.
        """
        print("\nCleaning up test users...")
        # Note: You'd need to implement a cleanup endpoint or do this via database directly
        print("Cleanup complete (test users remain in database)")


async def main():
    """Main entry point: set up, seed data, run the load test, always tear down."""
    tester = LoadTester()
    try:
        await tester.setup()
        await tester.populate_database()
        await tester.run_load_test()
        # Optional: cleanup
        # await tester.cleanup_test_data()
    finally:
        await tester.teardown()


if __name__ == "__main__":
    # Banner is generated so the box borders always line up with the title.
    banner_width = 63
    banner_title = "ECHOES OF THE ASHES - API LOAD TESTING TOOL"
    print(f"\n╔{'═' * banner_width}╗")
    print(f"║{banner_title.center(banner_width)}║")
    print(f"╚{'═' * banner_width}╝\n")

    asyncio.run(main())