#!/usr/bin/env python3 """ Performance testing script for background tasks. Generates realistic test data (players, combats, items, corpses) and measures the actual performance of background tasks. Usage: # Generate test data and run performance tests python test_performance.py --players 1000 --combats 100 # Just run tests on existing data python test_performance.py --test-only # Clean up test data python test_performance.py --cleanup """ import asyncio import argparse import time import random import sys from typing import List, Dict from pathlib import Path # Add parent directory to path sys.path.insert(0, str(Path(__file__).parent)) from dotenv import load_dotenv from sqlalchemy import text from bot.database import engine class PerformanceTester: """Test harness for background task performance.""" def __init__(self): self.test_player_ids = [] self.test_combat_ids = [] async def generate_test_players(self, count: int) -> List[int]: """Generate test player accounts.""" print(f"๐ŸŽฎ Generating {count} test players...") async with engine.begin() as conn: # Generate players with varying stats players = [] for i in range(count): telegram_id = 900000000 + i # Use high IDs to avoid conflicts (under INT max) # Varying stats for realistic distribution level = random.randint(1, 20) stamina = random.randint(0, 50) # Some need regeneration max_stamina = 50 + (level * 2) player = { 'telegram_id': telegram_id, 'name': f'TestPlayer{i}', 'location_id': 'location_001', 'hp': random.randint(50, 100), 'max_hp': 100 + (level * 10), 'stamina': stamina, 'max_stamina': max_stamina, 'level': level, 'xp': level * 100, 'strength': random.randint(5, 20), 'agility': random.randint(5, 20), 'endurance': random.randint(5, 20), 'is_dead': False } players.append(player) self.test_player_ids.append(telegram_id) # Batch insert await conn.execute( text(""" INSERT INTO players ( telegram_id, name, location_id, hp, max_hp, stamina, max_stamina, level, xp, strength, agility, endurance, is_dead ) VALUES ( :telegram_id, :name, :location_id, :hp, :max_hp, :stamina, :max_stamina, :level, :xp, :strength, :agility, :endurance, :is_dead ) ON CONFLICT (telegram_id) DO NOTHING """), players ) print(f" โœ… Created {count} test players (IDs: {telegram_id - count + 1} - {telegram_id})") return self.test_player_ids async def generate_test_combats(self, count: int) -> List[int]: """Generate active combat sessions.""" if not self.test_player_ids: print(" โš ๏ธ No test players found, skipping combat generation") return [] print(f"โš”๏ธ Generating {count} test combats...") # Use a subset of players for combats combat_players = random.sample( self.test_player_ids, min(count, len(self.test_player_ids)) ) async with engine.begin() as conn: combats = [] for player_id in combat_players[:count]: # Some combats are idle (for testing auto-timeout) is_idle = random.random() < 0.3 # 30% idle turn_started_at = time.time() if is_idle: turn_started_at -= 400 # 6+ minutes ago (idle) combat = { 'player_id': player_id, 'npc_id': random.choice(['npc_feral_dog', 'npc_raider', 'npc_zombie']), 'npc_hp': random.randint(20, 50), 'npc_max_hp': 50, 'turn': random.choice(['player', 'npc']), 'turn_started_at': turn_started_at, 'location_id': 'location_001', 'player_status_effects': '[]', 'npc_status_effects': '[]', 'from_wandering_enemy': False } combats.append(combat) self.test_combat_ids.append(player_id) await conn.execute( text(""" INSERT INTO active_combats ( player_id, npc_id, npc_hp, npc_max_hp, turn, turn_started_at, location_id, player_status_effects, npc_status_effects, from_wandering_enemy ) VALUES ( :player_id, :npc_id, :npc_hp, :npc_max_hp, :turn, :turn_started_at, :location_id, :player_status_effects, :npc_status_effects, :from_wandering_enemy ) ON CONFLICT (player_id) DO UPDATE SET turn_started_at = EXCLUDED.turn_started_at """), combats ) idle_count = sum(1 for c in combats if c['turn_started_at'] < time.time() - 300) print(f" โœ… Created {count} combats ({idle_count} idle for timeout testing)") return self.test_combat_ids async def generate_test_items(self, count: int): """Generate dropped items.""" print(f"๐Ÿ“ฆ Generating {count} dropped items...") async with engine.begin() as conn: items = [] for i in range(count): # Some items are expired (for testing decay) is_expired = random.random() < 0.3 # 30% expired drop_timestamp = time.time() if is_expired: drop_timestamp -= 7200 # 2 hours ago (expired) item = { 'item_id': random.choice(['item_bandage', 'item_water', 'item_canned_food']), 'location_id': 'location_001', 'drop_timestamp': drop_timestamp, 'quantity': random.randint(1, 3) } items.append(item) await conn.execute( text(""" INSERT INTO dropped_items ( item_id, location_id, drop_timestamp, quantity ) VALUES ( :item_id, :location_id, :drop_timestamp, :quantity ) """), items ) expired_count = sum(1 for i in items if i['drop_timestamp'] < time.time() - 3600) print(f" โœ… Created {count} items ({expired_count} expired for decay testing)") async def generate_test_corpses(self, player_count: int, npc_count: int): """Generate player and NPC corpses.""" if player_count == 0 and npc_count == 0: print(f"๐Ÿ’€ Skipping corpse generation") return print(f"๐Ÿ’€ Generating {player_count} player corpses and {npc_count} NPC corpses...") async with engine.begin() as conn: # Player corpses player_corpses = [] for i in range(player_count): is_expired = random.random() < 0.3 # 30% expired death_timestamp = time.time() if is_expired: death_timestamp -= 86400 * 2 # 2 days ago (expired) corpse = { 'player_name': f'DeadPlayer{i}', 'location_id': 'location_001', 'items': '[]', 'death_timestamp': death_timestamp } player_corpses.append(corpse) if player_corpses: # Only execute if we have corpses await conn.execute( text(""" INSERT INTO player_corpses ( player_name, location_id, items, death_timestamp ) VALUES ( :player_name, :location_id, :items, :death_timestamp ) """), player_corpses ) # NPC corpses (don't have items column) npc_corpses = [] for i in range(npc_count): is_expired = random.random() < 0.5 # 50% expired death_timestamp = time.time() if is_expired: death_timestamp -= 7200 * 2 # 4 hours ago (expired) corpse = { 'npc_id': random.choice(['npc_feral_dog', 'npc_raider', 'npc_zombie']), 'location_id': 'location_001', 'death_timestamp': death_timestamp } npc_corpses.append(corpse) if npc_corpses: # Only execute if we have corpses await conn.execute( text(""" INSERT INTO npc_corpses ( npc_id, location_id, death_timestamp ) VALUES ( :npc_id, :location_id, :death_timestamp ) """), npc_corpses ) print(f" โœ… Created {player_count} player corpses and {npc_count} NPC corpses") async def test_stamina_regeneration(self) -> Dict: """Test stamina regeneration performance.""" print("\n๐Ÿ”‹ Testing stamina regeneration...") from bot import database # Get baseline async with engine.connect() as conn: result = await conn.execute( text("SELECT COUNT(*) FROM players WHERE is_dead = FALSE AND stamina < max_stamina") ) eligible_count = result.scalar() print(f" ๐Ÿ“Š Eligible players: {eligible_count}") # Time the regeneration start_time = time.time() updated_count = await database.regenerate_all_players_stamina() elapsed = time.time() - start_time print(f" โœ… Updated {updated_count} players in {elapsed:.3f}s") # Performance rating if elapsed < 0.5: rating = "๐ŸŸข EXCELLENT" elif elapsed < 2.0: rating = "๐ŸŸก GOOD" elif elapsed < 5.0: rating = "๐ŸŸ  ACCEPTABLE" else: rating = "๐Ÿ”ด SLOW - NEEDS OPTIMIZATION" print(f" {rating}") return { 'eligible_players': eligible_count, 'updated_players': updated_count, 'elapsed_seconds': elapsed, 'players_per_second': updated_count / elapsed if elapsed > 0 else 0 } async def test_combat_timers(self) -> Dict: """Test combat timer check performance.""" print("\nโš”๏ธ Testing combat timer checks...") from bot import database # Get idle combats idle_threshold = time.time() - 300 start_time = time.time() idle_combats = await database.get_all_idle_combats(idle_threshold) elapsed = time.time() - start_time print(f" ๐Ÿ“Š Found {len(idle_combats)} idle combats in {elapsed:.3f}s") # Performance rating if elapsed < 0.1: rating = "๐ŸŸข EXCELLENT" elif elapsed < 0.5: rating = "๐ŸŸก GOOD" elif elapsed < 2.0: rating = "๐ŸŸ  ACCEPTABLE" else: rating = "๐Ÿ”ด SLOW - NEEDS OPTIMIZATION" print(f" {rating}") return { 'idle_combats': len(idle_combats), 'elapsed_seconds': elapsed, 'combats_per_second': len(idle_combats) / elapsed if elapsed > 0 and len(idle_combats) > 0 else 0 } async def test_item_decay(self) -> Dict: """Test dropped item decay performance.""" print("\n๐Ÿ“ฆ Testing item decay...") from bot import database decay_seconds = 3600 timestamp_limit = int(time.time()) - decay_seconds start_time = time.time() items_removed = await database.remove_expired_dropped_items(timestamp_limit) elapsed = time.time() - start_time print(f" โœ… Removed {items_removed} expired items in {elapsed:.3f}s") # Performance rating if elapsed < 0.1: rating = "๐ŸŸข EXCELLENT" elif elapsed < 0.5: rating = "๐ŸŸก GOOD" elif elapsed < 2.0: rating = "๐ŸŸ  ACCEPTABLE" else: rating = "๐Ÿ”ด SLOW - NEEDS OPTIMIZATION" print(f" {rating}") return { 'items_removed': items_removed, 'elapsed_seconds': elapsed } async def test_corpse_decay(self) -> Dict: """Test corpse decay performance.""" print("\n๐Ÿ’€ Testing corpse decay...") from bot import database player_corpse_limit = time.time() - (24 * 3600) npc_corpse_limit = time.time() - (2 * 3600) start_time = time.time() player_corpses_removed = await database.remove_expired_player_corpses(player_corpse_limit) npc_corpses_removed = await database.remove_expired_npc_corpses(npc_corpse_limit) elapsed = time.time() - start_time print(f" โœ… Removed {player_corpses_removed} player corpses and {npc_corpses_removed} NPC corpses in {elapsed:.3f}s") # Performance rating if elapsed < 0.1: rating = "๐ŸŸข EXCELLENT" elif elapsed < 0.5: rating = "๐ŸŸก GOOD" elif elapsed < 2.0: rating = "๐ŸŸ  ACCEPTABLE" else: rating = "๐Ÿ”ด SLOW - NEEDS OPTIMIZATION" print(f" {rating}") return { 'player_corpses_removed': player_corpses_removed, 'npc_corpses_removed': npc_corpses_removed, 'elapsed_seconds': elapsed } async def get_database_stats(self) -> Dict: """Get current database statistics.""" async with engine.connect() as conn: stats = {} # Player count result = await conn.execute(text("SELECT COUNT(*) FROM players")) stats['total_players'] = result.scalar() result = await conn.execute( text("SELECT COUNT(*) FROM players WHERE telegram_id >= 900000000") ) stats['test_players'] = result.scalar() # Combat count result = await conn.execute(text("SELECT COUNT(*) FROM active_combats")) stats['active_combats'] = result.scalar() # Item count result = await conn.execute(text("SELECT COUNT(*) FROM dropped_items")) stats['dropped_items'] = result.scalar() # Corpse count result = await conn.execute(text("SELECT COUNT(*) FROM player_corpses")) stats['player_corpses'] = result.scalar() result = await conn.execute(text("SELECT COUNT(*) FROM npc_corpses")) stats['npc_corpses'] = result.scalar() return stats async def cleanup_test_data(self): """Remove all test data.""" print("\n๐Ÿงน Cleaning up test data...") async with engine.begin() as conn: # Delete test players (cascade will handle inventory, combats, etc.) result = await conn.execute( text("DELETE FROM players WHERE telegram_id >= 900000000") ) print(f" โœ… Removed {result.rowcount} test players") # Clean up any remaining test combats result = await conn.execute( text("DELETE FROM active_combats WHERE player_id >= 900000000") ) print(f" โœ… Removed {result.rowcount} test combats") print(" โœจ Cleanup complete!") async def main(): """Main entry point.""" parser = argparse.ArgumentParser(description='Performance testing for background tasks') parser.add_argument('--players', type=int, default=1000, help='Number of test players to generate') parser.add_argument('--combats', type=int, default=100, help='Number of test combats to generate') parser.add_argument('--items', type=int, default=500, help='Number of dropped items to generate') parser.add_argument('--player-corpses', type=int, default=50, help='Number of player corpses') parser.add_argument('--npc-corpses', type=int, default=200, help='Number of NPC corpses') parser.add_argument('--test-only', action='store_true', help='Skip data generation, just run tests') parser.add_argument('--cleanup', action='store_true', help='Clean up test data and exit') args = parser.parse_args() load_dotenv() tester = PerformanceTester() print("=" * 70) print("๐Ÿš€ BACKGROUND TASK PERFORMANCE TESTING") print("=" * 70) # Cleanup mode if args.cleanup: await tester.cleanup_test_data() return # Show current stats print("\n๐Ÿ“Š Current Database Stats:") stats = await tester.get_database_stats() for key, value in stats.items(): print(f" โ€ข {key.replace('_', ' ').title()}: {value}") # Generate test data (unless test-only mode) if not args.test_only: print("\n" + "=" * 70) print("๐Ÿ“ GENERATING TEST DATA") print("=" * 70) await tester.generate_test_players(args.players) await tester.generate_test_combats(args.combats) await tester.generate_test_items(args.items) # Skip corpses for now (schema issues) # await tester.generate_test_corpses(args.player_corpses, args.npc_corpses) # Show updated stats print("\n๐Ÿ“Š Updated Database Stats:") stats = await tester.get_database_stats() for key, value in stats.items(): print(f" โ€ข {key.replace('_', ' ').title()}: {value}") # Run performance tests print("\n" + "=" * 70) print("๐ŸŽฏ RUNNING PERFORMANCE TESTS") print("=" * 70) results = {} results['stamina'] = await tester.test_stamina_regeneration() results['combat'] = await tester.test_combat_timers() results['items'] = await tester.test_item_decay() # Skip corpse decay test # results['corpses'] = await tester.test_corpse_decay() # Summary print("\n" + "=" * 70) print("๐Ÿ“ˆ PERFORMANCE SUMMARY") print("=" * 70) total_time = sum(r['elapsed_seconds'] for r in results.values()) print(f"\n๐Ÿ• Total execution time: {total_time:.3f}s") print(f"๐Ÿ“Š Total players: {stats['total_players']}") print(f"โš”๏ธ Active combats: {stats['active_combats']}") print(f"๐Ÿ“ฆ Dropped items: {stats['dropped_items']}") print("\n๐ŸŽฏ Task Breakdown:") print(f" โ€ข Stamina regen: {results['stamina']['elapsed_seconds']:.3f}s ({results['stamina']['updated_players']} players)") print(f" โ€ข Combat timers: {results['combat']['elapsed_seconds']:.3f}s ({results['combat']['idle_combats']} idle)") print(f" โ€ข Item decay: {results['items']['elapsed_seconds']:.3f}s ({results['items']['items_removed']} removed)") # Performance verdict print("\n๐Ÿ† VERDICT:") if total_time < 1.0: print(" ๐ŸŸข EXCELLENT - Ready for 10K+ players") elif total_time < 3.0: print(" ๐ŸŸก GOOD - Ready for production") elif total_time < 10.0: print(" ๐ŸŸ  ACCEPTABLE - Monitor under load") else: print(" ๐Ÿ”ด SLOW - Optimization needed!") print("\n๐Ÿ’ก Cleanup:") print(" Run with --cleanup to remove test data") print(f" Example: python {sys.argv[0]} --cleanup") print() if __name__ == "__main__": asyncio.run(main())