#!/usr/bin/env python3
"""
Load testing script for the Echoes of the Ashes API.

Populates the database with test data, then performs concurrent requests
to measure API performance.
"""
|
|
import asyncio
import json
import os
import random
import statistics
import time
from collections import Counter
from typing import Any, Dict, List, Optional

import aiohttp
|
|
|
|
# Configuration

# Base URL of the API under test. The default is the URL that was previously
# hard-coded; set LOADTEST_API_URL to aim the test at another deployment
# (e.g. staging or a local server) without editing this file. A trailing
# slash is stripped because endpoints are appended as f"{API_URL}/path".
API_URL = os.environ.get(
    "LOADTEST_API_URL",
    "https://echoesoftheashgame.patacuack.net/api",
).rstrip("/")

NUM_USERS = 100  # Number of concurrent users to simulate
NUM_LOCATIONS = 20  # Number of locations to use
NUM_ITEMS_PER_LOCATION = 10  # Items dropped per location
NUM_ENEMIES_PER_LOCATION = 5  # Wandering enemies per location
REQUESTS_PER_USER = 200  # Number of requests each user will make
TEST_USERNAME_PREFIX = "loadtest_user_"
TEST_PASSWORD = "TestPassword123!"
TEST_STAMINA = 100000  # High stamina for testing
|
|
|
|
class LoadTester:
    """Drive a concurrent load test against the game API.

    Intended call order (as used by ``main``): ``setup`` ->
    ``populate_database`` -> ``run_load_test`` -> ``teardown``.
    All HTTP traffic goes through one shared ``aiohttp.ClientSession``.
    """

    def __init__(self):
        # Created by setup(); stays None until then and is closed by teardown().
        self.session: Optional[aiohttp.ClientSession] = None
        # Initialised empty; no method of this class writes to it.
        self.tokens: Dict[str, str] = {}
        # Credential dicts ({"username", "token", "user_id"}) of usable test users.
        self.users: List[Dict[str, Any]] = []
        # Ids read from the gamedata JSON files by load_game_data().
        self.locations: List[str] = []
        self.items: List[str] = []
        self.npcs: List[str] = []
        # One dict per measured request, filled by run_load_test().
        self.results: List[Dict[str, Any]] = []

    async def setup(self):
        """Initialize the test environment: open the HTTP session and load game data."""
        self.session = aiohttp.ClientSession()
        await self.load_game_data()

    async def teardown(self):
        """Close the HTTP session if one was opened."""
        if self.session:
            await self.session.close()

    @staticmethod
    def _read_json(path: str) -> Any:
        """Parse one JSON file, decoding it explicitly as UTF-8."""
        # Explicit encoding: the platform default is not UTF-8 everywhere.
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)

    async def load_game_data(self):
        """Load location, item, and hostile-NPC ids from the gamedata JSON files.

        Paths are relative to the current working directory. Only the first
        ``NUM_LOCATIONS`` locations that carry an ``id`` are kept; items are
        filtered by ``type``; NPCs are kept only when ``type == 'hostile'``.

        Raises:
            OSError: a data file is missing or unreadable.
            json.JSONDecodeError: a data file is not valid JSON.
        """
        print("Loading game data...")

        # Load locations
        locations_data = self._read_json('gamedata/locations.json').get('locations', [])
        self.locations = [loc['id'] for loc in locations_data if 'id' in loc][:NUM_LOCATIONS]

        # Load items
        items_dict = self._read_json('gamedata/items.json').get('items', {})
        self.items = [item_id for item_id, item in items_dict.items()
                      if item.get('type') in ('weapon', 'consumable', 'resource', 'item')]

        # Load NPCs
        npcs_dict = self._read_json('gamedata/npcs.json').get('npcs', {})
        self.npcs = [npc_id for npc_id, npc in npcs_dict.items()
                     if npc.get('type') == 'hostile']

        print(f"Loaded {len(self.locations)} locations, {len(self.items)} items, {len(self.npcs)} NPCs")

    @staticmethod
    def _user_from_auth_payload(username: str, payload: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Build a credential dict from a register/login response body.

        Returns None when the body carries no token: such a user could only
        produce unauthenticated requests and would skew the results.
        """
        token = payload.get('access_token') or payload.get('token')
        if not token:
            print(f"Error creating user {username}: auth response contained no token")
            return None
        # 'player' may be absent or null; fall back to a top-level 'user_id'.
        player_id = (payload.get('player') or {}).get('id') or payload.get('user_id')
        return {"username": username, "token": token, "user_id": player_id}

    async def create_test_user(self, user_num: int) -> Optional[Dict[str, Any]]:
        """Register test user ``user_num``, falling back to login on HTTP 400.

        Args:
            user_num: Suffix appended to ``TEST_USERNAME_PREFIX``.

        Returns:
            ``{"username", "token", "user_id"}`` on success, otherwise None.
            Failures are printed, never raised.
        """
        username = f"{TEST_USERNAME_PREFIX}{user_num}"
        credentials = {"username": username, "password": TEST_PASSWORD}

        try:
            async with self.session.post(f"{API_URL}/auth/register", json=credentials) as resp:
                if resp.status == 200:
                    return self._user_from_auth_payload(username, await resp.json())
                register_status = resp.status

            if register_status != 400:
                print(f"Error creating user {username}: register returned HTTP {register_status}")
                return None

            # User might already exist, try login
            async with self.session.post(f"{API_URL}/auth/login", json=credentials) as login_resp:
                if login_resp.status == 200:
                    return self._user_from_auth_payload(username, await login_resp.json())
                print(f"Error creating user {username}: login returned HTTP {login_resp.status}")
        except Exception as e:
            # Best effort: one failed user must not abort the whole population step.
            print(f"Error creating user {username}: {e}")
        return None

    async def _post_admin(self, endpoint: str, payload: Dict[str, Any], headers: Dict[str, str]) -> bool:
        """POST to an admin endpoint; return True only for an HTTP 200 answer."""
        try:
            async with self.session.post(f"{API_URL}{endpoint}", json=payload, headers=headers) as resp:
                return resp.status == 200
        except Exception:
            # Continue even if the admin endpoint doesn't exist or is unreachable.
            return False

    async def populate_database(self):
        """Populate database with test data: users, dropped items, wandering enemies.

        Users are created concurrently. Items and enemies are spawned
        sequentially through the ``/admin/*`` endpoints using the first
        user's token; failures there are counted and reported, not raised.
        """
        print("\nPopulating database with test data...")

        # Create test users
        print(f"Creating {NUM_USERS} test users...")
        created = await asyncio.gather(*(self.create_test_user(i) for i in range(NUM_USERS)))
        self.users = [u for u in created if u is not None]
        print(f"Created {len(self.users)} test users")

        if self.users:
            # There is no endpoint for changing stamina, so nothing is sent here.
            print(f"NOTE: stamina override ({TEST_STAMINA}) is not implemented; "
                  "test users keep their default stamina")

        admin_headers = (
            {"Authorization": f"Bearer {self.users[0]['token']}"} if self.users else None
        )

        # Populate dropped items via admin endpoint
        print(f"Spawning {NUM_ITEMS_PER_LOCATION * len(self.locations)} dropped items...")
        # Guard self.items as well: random.choice() raises IndexError on an empty list.
        if admin_headers and self.items:
            attempted = failed = 0
            for location in self.locations:
                for _ in range(NUM_ITEMS_PER_LOCATION):
                    payload = {
                        "item_id": random.choice(self.items),
                        "quantity": random.randint(1, 10),
                        "location_id": location,
                    }
                    attempted += 1
                    if not await self._post_admin("/admin/drop-item", payload, admin_headers):
                        failed += 1
            if failed:
                print(f"  {failed} of {attempted} drop-item requests failed "
                      "(admin endpoint may be unavailable)")

        # Spawn wandering enemies via admin endpoint
        print(f"Spawning {NUM_ENEMIES_PER_LOCATION * len(self.locations)} wandering enemies...")
        if admin_headers and self.npcs:
            attempted = failed = 0
            for location in self.locations:
                for _ in range(NUM_ENEMIES_PER_LOCATION):
                    payload = {
                        "npc_id": random.choice(self.npcs),
                        "location_id": location,
                    }
                    attempted += 1
                    if not await self._post_admin("/admin/spawn-enemy", payload, admin_headers):
                        failed += 1
            if failed:
                print(f"  {failed} of {attempted} spawn-enemy requests failed "
                      "(admin endpoint may be unavailable)")

        print("Database population complete!\n")

    async def make_request(self, endpoint: str, method: str = "GET", token: str = None, json_data: dict = None) -> Dict[str, Any]:
        """Make a single API request and measure performance.

        Args:
            endpoint: Path appended to ``API_URL`` (e.g. ``"/game/move"``).
            method: HTTP method to send.
            token: Bearer token; no Authorization header is sent when falsy.
            json_data: JSON body, sent for every method except GET.

        Returns:
            Dict with ``endpoint``, ``method``, ``status`` (0 when the request
            itself raised), ``elapsed`` seconds, ``success`` (2xx), ``data``
            (decoded JSON body of a 2xx answer, else None) and, on failure,
            ``error``. Never raises.
        """
        headers = {}
        if token:
            headers["Authorization"] = f"Bearer {token}"

        request_kwargs: Dict[str, Any] = {"headers": headers}
        if method != "GET":
            request_kwargs["json"] = json_data

        # perf_counter is monotonic; wall-clock time can jump mid-measurement.
        start_time = time.perf_counter()

        try:
            async with self.session.request(method, f"{API_URL}{endpoint}", **request_kwargs) as resp:
                status = resp.status
                data = None
                if 200 <= status < 300:
                    try:
                        data = await resp.json()
                    except (aiohttp.ContentTypeError, ValueError):
                        # A non-JSON 2xx body is still a served request; keep
                        # the real status instead of recording a failure.
                        data = None

            elapsed = time.perf_counter() - start_time
            succeeded = 200 <= status < 300
            result = {
                "endpoint": endpoint,
                "method": method,
                "status": status,
                "elapsed": elapsed,
                "success": succeeded,
                "data": data,
            }
            if not succeeded:
                # Gives the per-endpoint report something better than "Unknown error".
                result["error"] = f"HTTP {status}"
            return result
        except Exception as e:
            # Measurement boundary: any failure becomes a recorded failed request.
            elapsed = time.perf_counter() - start_time
            return {
                "endpoint": endpoint,
                "method": method,
                "status": 0,
                "elapsed": elapsed,
                "success": False,
                "error": str(e),
                "data": None,
            }

    async def get_location_data(self, token: str) -> Optional[Dict[str, Any]]:
        """Get current location data including available exits and items.

        Returns the decoded ``/game/location`` body on HTTP 200, else None.
        This probe is not recorded in the measured results.
        """
        headers = {"Authorization": f"Bearer {token}"}
        try:
            async with self.session.get(f"{API_URL}/game/location", headers=headers) as resp:
                if resp.status == 200:
                    return await resp.json()
        except Exception:
            # Deliberately quiet: the caller falls back to a basic request.
            pass
        return None

    async def get_inventory(self, token: str) -> List[Dict[str, Any]]:
        """Get player's inventory; an empty list on any failure.

        This probe is not recorded in the measured results.
        """
        headers = {"Authorization": f"Bearer {token}"}
        try:
            async with self.session.get(f"{API_URL}/game/inventory", headers=headers) as resp:
                if resp.status == 200:
                    data = await resp.json()
                    # Inventory is returned as an array directly
                    if isinstance(data, list):
                        return data
                    return data.get('inventory', [])
        except Exception:
            # Deliberately quiet: the caller treats this as "nothing to drop".
            pass
        return []

    async def _plan_action(self, action_type: str, location_data: Dict[str, Any], token: str) -> tuple:
        """Turn a chosen action into ``(endpoint, method, json_payload)``.

        When the action is impossible in the current state (no exits, no
        items on the ground, empty inventory) a read-only request is
        substituted so every iteration still yields one measured request.
        """
        if action_type == 'move':
            # Move in a valid direction
            directions = location_data.get('directions', [])
            if directions:
                return "/game/move", "POST", {"direction": random.choice(directions)}
            # No directions, check location instead
            return "/game/location", "GET", None

        if action_type == 'pickup_item':
            # Try to pick up a dropped item if available
            items = location_data.get('items', [])
            if items:
                item = random.choice(items)
                quantity = min(item.get('quantity', 1), random.randint(1, 5))
                # 'id' is the database id of the dropped_item row
                return "/game/pickup", "POST", {"item_id": item.get('id'), "quantity": quantity}
            # No items to pick up, check inventory instead
            return "/game/inventory", "GET", None

        if action_type == 'drop_item':
            # Try to drop an item from inventory
            inventory = await self.get_inventory(token)
            if inventory:
                item = random.choice(inventory)
                quantity = min(item.get('quantity', 1), random.randint(1, 3))
                return "/game/item/drop", "POST", {"item_id": item.get('item_id'), "quantity": quantity}
            # No items to drop, check location instead
            return "/game/location", "GET", None

        if action_type == 'check_inventory':
            return "/game/inventory", "GET", None

        # check_location
        return "/game/location", "GET", None

    async def simulate_user(self, user: Dict[str, str]) -> List[Dict[str, Any]]:
        """Simulate a single user making intelligent requests.

        Each of the ``REQUESTS_PER_USER`` iterations probes the current
        location, picks a weighted random action that is valid there, and
        records exactly one measured request.

        Returns:
            The list of result dicts produced by ``make_request``.
        """
        results = []
        token = user['token']

        for _ in range(REQUESTS_PER_USER):
            # Get current location state
            location_data = await self.get_location_data(token)

            if not location_data:
                # If we can't get location, just make basic requests
                results.append(await self.make_request("/game/inventory", "GET", token))
                await asyncio.sleep(random.uniform(0.1, 0.3))
                continue

            # Choose an action weighted by probability
            action_type = random.choices(
                ['move', 'check_location', 'check_inventory', 'pickup_item', 'drop_item'],
                weights=[40, 20, 20, 10, 10],
                k=1,
            )[0]

            endpoint, method, payload = await self._plan_action(action_type, location_data, token)
            results.append(await self.make_request(endpoint, method, token, payload))

            # Tiny delay to prevent connection pool exhaustion
            if random.random() < 0.1:  # 10% of requests get a tiny pause
                await asyncio.sleep(0.001)

        return results

    async def run_load_test(self):
        """Run the actual load test: all users concurrently, then print the report."""
        print(f"\n{'='*60}")
        print(f"Starting load test with {len(self.users)} concurrent users")
        print(f"Each user will make {REQUESTS_PER_USER} requests")
        print(f"Total requests: {len(self.users) * REQUESTS_PER_USER}")
        print(f"{'='*60}\n")

        start_time = time.perf_counter()

        # Run all users concurrently. return_exceptions=True keeps one
        # crashing simulated user from discarding everyone else's results.
        outcomes = await asyncio.gather(
            *(self.simulate_user(user) for user in self.users),
            return_exceptions=True,
        )

        total_time = time.perf_counter() - start_time

        # Flatten results
        self.results = []
        for outcome in outcomes:
            if isinstance(outcome, BaseException):
                print(f"Simulated user crashed: {outcome!r}")
            else:
                self.results.extend(outcome)

        # Calculate statistics
        self.print_results(total_time)

    @staticmethod
    def _quantile_ms(samples: List[float], n: int, index: int) -> float:
        """Return cut point ``index`` of ``n`` quantiles of ``samples``, in ms.

        statistics.quantiles() needs at least two data points; with a single
        sample that sample is itself every percentile.
        """
        if len(samples) < 2:
            return samples[0] * 1000
        return statistics.quantiles(samples, n=n)[index] * 1000

    def print_results(self, total_time: float):
        """Print detailed test results.

        Args:
            total_time: Wall time of the whole run in seconds, used for the
                requests-per-second figure.
        """
        print(f"\n{'='*60}")
        print("LOAD TEST RESULTS")
        print(f"{'='*60}\n")

        total_requests = len(self.results)
        response_times = [r['elapsed'] for r in self.results if r['success']]
        successful = len(response_times)
        failed = total_requests - successful

        print(f"Total Requests: {total_requests}")
        if total_requests > 0:
            print(f"Successful: {successful} ({successful/total_requests*100:.1f}%)")
            print(f"Failed: {failed} ({failed/total_requests*100:.1f}%)")

            # Performance indicators
            failure_rate = failed / total_requests
            if failure_rate > 0.05:
                print(f"⚠️ WARNING: Failure rate above 5% - system may be under stress")
            if failure_rate > 0.20:
                print(f"🔴 CRITICAL: Failure rate above 20% - system is degrading")
        else:
            print("No requests were made. Check API connectivity and user creation.")

        print(f"Total Time: {total_time:.2f}s")
        if total_requests > 0 and total_time > 0:
            req_per_sec = total_requests / total_time
            print(f"Requests/Second: {req_per_sec:.2f}")
            if req_per_sec >= 1000:
                print(f"🚀 Target achieved: {req_per_sec:.2f} req/s!")
            elif req_per_sec >= 500:
                print(f"⚡ High throughput: {req_per_sec:.2f} req/s")

        if successful > 0:
            print(f"\nResponse Time Statistics (successful requests):")
            print(f" Min: {min(response_times)*1000:.2f}ms")
            print(f" Max: {max(response_times)*1000:.2f}ms")
            print(f" Mean: {statistics.mean(response_times)*1000:.2f}ms")
            print(f" Median: {statistics.median(response_times)*1000:.2f}ms")
            print(f" 95th percentile: {self._quantile_ms(response_times, 20, 18):.2f}ms")
            print(f" 99th percentile: {self._quantile_ms(response_times, 100, 98):.2f}ms")

        # Breakdown by endpoint
        print(f"\nBreakdown by endpoint:")
        endpoint_stats: Dict[str, Dict[str, Any]] = {}
        for result in self.results:
            key = f"{result.get('method', 'GET')} {result['endpoint']}"
            stats = endpoint_stats.setdefault(
                key, {'total': 0, 'success': 0, 'times': [], 'errors': []}
            )
            stats['total'] += 1
            if result['success']:
                stats['success'] += 1
                stats['times'].append(result['elapsed'])
            else:
                stats['errors'].append(result.get('error', 'Unknown error'))

        for endpoint, stats in sorted(endpoint_stats.items()):
            success_rate = stats['success'] / stats['total'] * 100
            avg_time = statistics.mean(stats['times']) * 1000 if stats['times'] else 0
            print(f" {endpoint:40s} - {stats['total']:4d} requests, {success_rate:5.1f}% success, {avg_time:6.2f}ms avg")
            # Show the most frequent failure reasons, capped at three so a
            # badly failing endpoint stays readable but is never silent.
            for error, count in Counter(stats['errors']).most_common(3):
                print(f" └─ Error: {error} (x{count})")

        print(f"\n{'='*60}\n")

    async def cleanup_test_data(self):
        """Clean up test users (optional).

        Currently a placeholder: it only prints; nothing is deleted.
        """
        print("\nCleaning up test users...")
        # Note: You'd need to implement a cleanup endpoint or do this via database directly
        print("Cleanup complete (test users remain in database)")
|
|
|
|
|
|
async def main():
    """Main entry point.

    Runs the load tester's phases in order and always closes its HTTP
    session afterwards, even when a phase raises.
    """
    load_tester = LoadTester()
    phases = (
        load_tester.setup,
        load_tester.populate_database,
        load_tester.run_load_test,
        # Optional: cleanup
        # load_tester.cleanup_test_data,
    )

    try:
        for phase in phases:
            await phase()
    finally:
        await load_tester.teardown()
|
|
|
|
|
|
if __name__ == "__main__":
    # Startup banner, printed once before the event loop starts.
    banner = """
╔═══════════════════════════════════════════════════════════════╗
║ ECHOES OF THE ASHES - API LOAD TESTING TOOL ║
╚═══════════════════════════════════════════════════════════════╝
"""
    print(banner)

    asyncio.run(main())
|