What a mess

This commit is contained in:
Joan
2025-11-07 15:27:13 +01:00
parent 0b79b3ae59
commit 33cc9586c2
130 changed files with 29819 additions and 1175 deletions

View File

@@ -7,7 +7,8 @@ import json
import random
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import ContextTypes
from . import database, keyboards, logic
from . import keyboards, logic
from .api_client import api_client
from .utils import format_stat_bar
from data.world_loader import game_world
from data.items import ITEMS
@@ -19,9 +20,43 @@ logger = logging.getLogger(__name__)
# UTILITY FUNCTIONS
# ============================================================================
async def get_player_status_text(telegram_id: int) -> str:
"""Generate player status text with location and stats."""
player = await database.get_player(telegram_id)
async def check_and_redirect_if_in_combat(query, user_id: int, player: dict) -> bool:
"""
Check if player is in combat and redirect to combat view if so.
Returns True if player is in combat (and was redirected), False otherwise.
"""
combat_data = await api_client.get_combat(user_id)
if combat_data:
from data.npcs import NPCS
npc_def = NPCS.get(combat_data['npc_id'])
message = f"⚔️ You're in combat with {npc_def.emoji} {npc_def.name}!\n"
message += format_stat_bar("Your HP", "❤️", player['hp'], player['max_hp']) + "\n"
message += format_stat_bar("Enemy HP", npc_def.emoji, combat_data['npc_hp'], combat_data['npc_max_hp']) + "\n\n"
message += "🎯 Your turn!" if combat_data['turn'] == 'player' else "⏳ Enemy's turn..."
keyboard = await keyboards.combat_keyboard(user_id)
from .handlers import send_or_edit_with_image
await send_or_edit_with_image(
query,
text=message,
reply_markup=keyboard,
image_path=npc_def.image_url if npc_def else None
)
await query.answer("⚔️ You're in combat! Finish or flee first.", show_alert=True)
return True
return False
async def get_player_status_text(player_id: int) -> str:
"""Generate player status text with location and stats.
Args:
player_id: The unique database ID of the player (not telegram_id)
"""
from .api_client import api_client
player = await api_client.get_player_by_id(player_id)
if not player:
return "Could not find player data."
@@ -29,7 +64,9 @@ async def get_player_status_text(telegram_id: int) -> str:
if not location:
return "Error: Player is in an unknown location."
inventory = await database.get_inventory(telegram_id)
# Get inventory from API
inv_result = await api_client.get_inventory(player_id)
inventory = inv_result.get('inventory', [])
weight, volume = logic.calculate_inventory_load(inventory)
max_weight, max_volume = logic.get_player_capacity(inventory, player)
@@ -61,11 +98,15 @@ async def get_player_status_text(telegram_id: int) -> str:
async def handle_inspect_area(query, user_id: int, player: dict, data: list = None):
"""Handle inspect area action - show NPCs and interactables in current location."""
# Check if player is in combat and redirect if so
if await check_and_redirect_if_in_combat(query, user_id, player):
return
await query.answer()
location_id = player['location_id']
location = game_world.get_location(location_id)
dropped_items = await database.get_dropped_items_in_location(location_id)
wandering_enemies = await database.get_wandering_enemies_in_location(location_id)
dropped_items = await api_client.get_dropped_items_in_location(location_id)
wandering_enemies = await api_client.get_wandering_enemies_in_location(location_id)
keyboard = await keyboards.inspect_keyboard(location_id, dropped_items, wandering_enemies)
image_path = location.image_path if location else None
@@ -85,7 +126,7 @@ async def handle_attack_wandering(query, user_id: int, player: dict, data: list)
await query.answer()
# Get the enemy from database
wandering_enemies = await database.get_wandering_enemies_in_location(player['location_id'])
wandering_enemies = await api_client.get_wandering_enemies_in_location(player['location_id'])
enemy_data = next((e for e in wandering_enemies if e['id'] == enemy_db_id), None)
if not enemy_data:
@@ -93,8 +134,8 @@ async def handle_attack_wandering(query, user_id: int, player: dict, data: list)
# Refresh inspect menu
location_id = player['location_id']
location = game_world.get_location(location_id)
dropped_items = await database.get_dropped_items_in_location(location_id)
wandering_enemies = await database.get_wandering_enemies_in_location(location_id)
dropped_items = await api_client.get_dropped_items_in_location(location_id)
wandering_enemies = await api_client.get_wandering_enemies_in_location(location_id)
keyboard = await keyboards.inspect_keyboard(location_id, dropped_items, wandering_enemies)
image_path = location.image_path if location else None
@@ -110,7 +151,7 @@ async def handle_attack_wandering(query, user_id: int, player: dict, data: list)
npc_id = enemy_data['npc_id']
# Remove enemy from wandering table (they're now in combat)
await database.remove_wandering_enemy(enemy_db_id)
await api_client.remove_wandering_enemy(enemy_db_id)
from data.npcs import NPCS
from bot import combat
@@ -143,6 +184,10 @@ async def handle_attack_wandering(query, user_id: int, player: dict, data: list)
async def handle_inspect_interactable(query, user_id: int, player: dict, data: list):
"""Handle inspecting an interactable object."""
# Check if player is in combat and redirect if so
if await check_and_redirect_if_in_combat(query, user_id, player):
return
location_id, instance_id = data[1], data[2]
location = game_world.get_location(location_id)
@@ -159,7 +204,7 @@ async def handle_inspect_interactable(query, user_id: int, player: dict, data: l
all_on_cooldown = True
for action_id in interactable.actions.keys():
cooldown_key = f"{instance_id}:{action_id}"
if await database.get_cooldown(cooldown_key) == 0:
if await api_client.get_cooldown(cooldown_key) == 0:
all_on_cooldown = False
break
@@ -185,9 +230,13 @@ async def handle_inspect_interactable(query, user_id: int, player: dict, data: l
async def handle_action(query, user_id: int, player: dict, data: list):
"""Handle performing an action on an interactable object."""
# Check if player is in combat and redirect if so
if await check_and_redirect_if_in_combat(query, user_id, player):
return
location_id, instance_id, action_id = data[1], data[2], data[3]
cooldown_key = f"{instance_id}:{action_id}"
cooldown = await database.get_cooldown(cooldown_key)
cooldown = await api_client.get_cooldown(cooldown_key)
if cooldown > 0:
await query.answer("Someone got to it just before you!", show_alert=False)
@@ -207,13 +256,13 @@ async def handle_action(query, user_id: int, player: dict, data: list):
await query.answer()
# Set cooldown
await database.set_cooldown(cooldown_key)
await api_client.set_cooldown(cooldown_key)
# Resolve action
outcome = logic.resolve_action(player, action_obj)
new_stamina = player['stamina'] - action_obj.stamina_cost
new_hp = player['hp'] - outcome.damage_taken
await database.update_player(user_id, {"stamina": new_stamina, "hp": new_hp})
await api_client.update_player(user_id, {"stamina": new_stamina, "hp": new_hp})
# Build detailed action result
result_details = [f"<i>{outcome.text}</i>"]
@@ -232,7 +281,7 @@ async def handle_action(query, user_id: int, player: dict, data: list):
can_add, reason = await logic.can_add_item_to_inventory(user_id, item_id, quantity)
if can_add:
await database.add_item_to_inventory(user_id, item_id, quantity)
await api_client.add_item_to_inventory(user_id, item_id, quantity)
item_def = ITEMS.get(item_id, {})
emoji = item_def.get('emoji', '')
item_name = item_def.get('name', item_id)
@@ -285,6 +334,10 @@ async def handle_main_menu(query, user_id: int, player: dict, data: list = None)
async def handle_move_menu(query, user_id: int, player: dict, data: list = None):
"""Show movement options menu."""
# Check if player is in combat and redirect if so
if await check_and_redirect_if_in_combat(query, user_id, player):
return
await query.answer()
location = game_world.get_location(player['location_id'])
location_image = location.image_path if location else None
@@ -300,31 +353,24 @@ async def handle_move_menu(query, user_id: int, player: dict, data: list = None)
async def handle_move(query, user_id: int, player: dict, data: list):
"""Handle player movement to a new location."""
# Check if player is in combat and redirect if so
if await check_and_redirect_if_in_combat(query, user_id, player):
return
destination_id = data[1]
from_location = game_world.get_location(player['location_id'])
to_location = game_world.get_location(destination_id)
# Use API to move player
from .api_client import api_client
result = await api_client.move_player(player['id'], destination_id)
if not from_location or not to_location:
await query.answer("Invalid location!", show_alert=True)
if not result.get('success'):
await query.answer(result.get('message', 'Cannot move there!'), show_alert=True)
return
# Calculate stamina cost
inventory = await database.get_inventory(user_id)
stamina_cost = logic.calculate_travel_stamina_cost(player, inventory, from_location, to_location)
await query.answer(result.get('message', 'Moving...'), show_alert=False)
if player['stamina'] < stamina_cost:
await query.answer(f"Too tired to travel! Need {stamina_cost} stamina.", show_alert=True)
return
# Deduct stamina and update location
new_stamina = player['stamina'] - stamina_cost
await database.update_player(user_id, {"location_id": destination_id, "stamina": new_stamina})
await query.answer(f"⚡️ -{stamina_cost} stamina", show_alert=False)
# Refresh player data
player = await database.get_player(user_id)
# Refresh player data from API using unique id
player = await api_client.get_player_by_id(user_id)
# Check for random NPC encounter
from data.npcs import NPCS, get_random_npc_for_location, get_location_encounter_rate

198
bot/api_client.old.py Normal file
View File

@@ -0,0 +1,198 @@
"""
API Client for Telegram Bot
Connects bot to FastAPI game server instead of using direct database access
"""
import os
import httpx
from typing import Optional, Dict, Any
API_BASE_URL = os.getenv("API_BASE_URL", "http://echoes_of_the_ashes_api:8000")
API_INTERNAL_KEY = os.getenv("API_INTERNAL_KEY", "internal-bot-access-key-change-me")
class GameAPIClient:
"""Client for interacting with the FastAPI game server"""
def __init__(self):
self.base_url = API_BASE_URL
self.headers = {
"X-Internal-Key": API_INTERNAL_KEY,
"Content-Type": "application/json"
}
self.client = httpx.AsyncClient(timeout=30.0)
async def close(self):
"""Close the HTTP client"""
await self.client.aclose()
# ==================== Player Management ====================
async def get_player(self, telegram_id: int) -> Optional[Dict[str, Any]]:
"""Get player by telegram ID"""
try:
response = await self.client.get(
f"{self.base_url}/api/internal/player/telegram/{telegram_id}",
headers=self.headers
)
if response.status_code == 404:
return None
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error getting player: {e}")
return None
async def create_player(self, telegram_id: int, name: str) -> Optional[Dict[str, Any]]:
"""Create a new player"""
try:
response = await self.client.post(
f"{self.base_url}/api/internal/player",
headers=self.headers,
json={"telegram_id": telegram_id, "name": name}
)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error creating player: {e}")
return None
async def update_player(self, telegram_id: int, updates: Dict[str, Any]) -> bool:
"""Update player data"""
try:
response = await self.client.patch(
f"{self.base_url}/api/internal/player/telegram/{telegram_id}",
headers=self.headers,
json=updates
)
response.raise_for_status()
return True
except Exception as e:
print(f"Error updating player: {e}")
return False
# ==================== Location & Movement ====================
async def get_location(self, location_id: str) -> Optional[Dict[str, Any]]:
"""Get location details"""
try:
response = await self.client.get(
f"{self.base_url}/api/internal/location/{location_id}",
headers=self.headers
)
if response.status_code == 404:
return None
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error getting location: {e}")
return None
async def move_player(self, telegram_id: int, direction: str) -> Optional[Dict[str, Any]]:
"""Move player in a direction"""
try:
response = await self.client.post(
f"{self.base_url}/api/internal/player/telegram/{telegram_id}/move",
headers=self.headers,
json={"direction": direction}
)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
# Return error details
return {"success": False, "error": e.response.json().get("detail", str(e))}
except Exception as e:
print(f"Error moving player: {e}")
return {"success": False, "error": str(e)}
# ==================== Combat ====================
async def start_combat(self, telegram_id: int, npc_id: str) -> Optional[Dict[str, Any]]:
"""Start combat with an NPC"""
try:
response = await self.client.post(
f"{self.base_url}/api/internal/combat/start",
headers=self.headers,
json={"telegram_id": telegram_id, "npc_id": npc_id}
)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error starting combat: {e}")
return None
async def get_combat(self, telegram_id: int) -> Optional[Dict[str, Any]]:
"""Get active combat state"""
try:
response = await self.client.get(
f"{self.base_url}/api/internal/combat/telegram/{telegram_id}",
headers=self.headers
)
if response.status_code == 404:
return None
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error getting combat: {e}")
return None
async def combat_action(self, telegram_id: int, action: str) -> Optional[Dict[str, Any]]:
"""Perform a combat action (attack, defend, flee)"""
try:
response = await self.client.post(
f"{self.base_url}/api/internal/combat/telegram/{telegram_id}/action",
headers=self.headers,
json={"action": action}
)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error performing combat action: {e}")
return None
# ==================== Inventory ====================
async def get_inventory(self, telegram_id: int) -> Optional[Dict[str, Any]]:
"""Get player's inventory"""
try:
response = await self.client.get(
f"{self.base_url}/api/internal/player/telegram/{telegram_id}/inventory",
headers=self.headers
)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error getting inventory: {e}")
return None
async def use_item(self, telegram_id: int, item_db_id: int) -> Optional[Dict[str, Any]]:
"""Use an item from inventory"""
try:
response = await self.client.post(
f"{self.base_url}/api/internal/player/telegram/{telegram_id}/use_item",
headers=self.headers,
json={"item_db_id": item_db_id}
)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error using item: {e}")
return None
async def equip_item(self, telegram_id: int, item_db_id: int) -> Optional[Dict[str, Any]]:
"""Equip/unequip an item"""
try:
response = await self.client.post(
f"{self.base_url}/api/internal/player/telegram/{telegram_id}/equip",
headers=self.headers,
json={"item_db_id": item_db_id}
)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error equipping item: {e}")
return None
# Global API client instance
api_client = GameAPIClient()

623
bot/api_client.py Normal file
View File

@@ -0,0 +1,623 @@
"""
API client for the bot to communicate with the standalone API.
All database operations now go through the API.
"""
import httpx
import os
from typing import Optional, Dict, Any, List
class APIClient:
"""Client for bot-to-API communication"""
def __init__(self):
self.api_url = os.getenv("API_BASE_URL", os.getenv("API_URL", "http://echoes_of_the_ashes_api:8000"))
self.internal_key = os.getenv("API_INTERNAL_KEY", "change-this-internal-key")
self.client = httpx.AsyncClient(timeout=30.0)
self.headers = {
"Authorization": f"Bearer {self.internal_key}",
"Content-Type": "application/json"
}
async def close(self):
"""Close the HTTP client"""
await self.client.aclose()
# Player operations
async def get_player(self, telegram_id: int) -> Optional[Dict[str, Any]]:
"""Get player by Telegram ID"""
try:
response = await self.client.get(
f"{self.api_url}/api/internal/player/{telegram_id}",
headers=self.headers
)
if response.status_code == 404:
return None
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error getting player: {e}")
return None
async def get_player_by_id(self, player_id: int) -> Optional[Dict[str, Any]]:
"""Get player by unique database ID"""
try:
response = await self.client.get(
f"{self.api_url}/api/internal/player/by_id/{player_id}",
headers=self.headers
)
if response.status_code == 404:
return None
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error getting player by id: {e}")
return None
async def create_player(self, telegram_id: int, name: str = "Survivor") -> Optional[Dict[str, Any]]:
"""Create a new player"""
try:
response = await self.client.post(
f"{self.api_url}/api/internal/player",
headers=self.headers,
params={"telegram_id": telegram_id, "name": name}
)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error creating player: {e}")
return None
# Movement operations
async def move_player(self, player_id: int, direction: str) -> Dict[str, Any]:
"""Move player in a direction"""
try:
response = await self.client.post(
f"{self.api_url}/api/internal/player/{player_id}/move",
headers=self.headers,
params={"direction": direction}
)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error moving player: {e}")
return {"success": False, "message": str(e)}
# Inspection operations
async def inspect_area(self, player_id: int) -> Dict[str, Any]:
"""Inspect current area"""
try:
response = await self.client.get(
f"{self.api_url}/api/internal/player/{player_id}/inspect",
headers=self.headers
)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error inspecting area: {e}")
return {"success": False, "message": str(e)}
# Interaction operations
async def interact(self, player_id: int, interactable_id: str, action_id: str) -> Dict[str, Any]:
"""Interact with an object"""
try:
response = await self.client.post(
f"{self.api_url}/api/internal/player/{player_id}/interact",
headers=self.headers,
params={"interactable_id": interactable_id, "action_id": action_id}
)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error interacting: {e}")
return {"success": False, "message": str(e)}
# Inventory operations
async def get_inventory(self, player_id: int) -> Dict[str, Any]:
"""Get player inventory"""
try:
response = await self.client.get(
f"{self.api_url}/api/internal/player/{player_id}/inventory",
headers=self.headers
)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error getting inventory: {e}")
return {"success": False, "inventory": []}
async def use_item(self, player_id: int, item_id: str) -> Dict[str, Any]:
"""Use an item"""
try:
response = await self.client.post(
f"{self.api_url}/api/internal/player/{player_id}/use_item",
headers=self.headers,
params={"item_id": item_id}
)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error using item: {e}")
return {"success": False, "message": str(e)}
async def pickup_item(self, player_id: int, item_id: str) -> Dict[str, Any]:
"""Pick up an item"""
try:
response = await self.client.post(
f"{self.api_url}/api/internal/player/{player_id}/pickup",
headers=self.headers,
params={"item_id": item_id}
)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error picking up item: {e}")
return {"success": False, "message": str(e)}
async def drop_item(self, player_id: int, item_id: str, quantity: int = 1) -> Dict[str, Any]:
"""Drop an item"""
try:
response = await self.client.post(
f"{self.api_url}/api/internal/player/{player_id}/drop_item",
headers=self.headers,
params={"item_id": item_id, "quantity": quantity}
)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error dropping item: {e}")
return {"success": False, "message": str(e)}
async def equip_item(self, player_id: int, item_id: str) -> Dict[str, Any]:
"""Equip an item"""
try:
response = await self.client.post(
f"{self.api_url}/api/internal/player/{player_id}/equip",
headers=self.headers,
params={"item_id": item_id}
)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error equipping item: {e}")
return {"success": False, "message": str(e)}
async def unequip_item(self, player_id: int, item_id: str) -> Dict[str, Any]:
"""Unequip an item"""
try:
response = await self.client.post(
f"{self.api_url}/api/internal/player/{player_id}/unequip",
headers=self.headers,
params={"item_id": item_id}
)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error unequipping item: {e}")
return {"success": False, "message": str(e)}
# Combat operations
async def get_combat(self, player_id: int) -> Optional[Dict[str, Any]]:
"""Get active combat for player"""
try:
response = await self.client.get(
f"{self.api_url}/api/internal/player/{player_id}/combat",
headers=self.headers
)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error getting combat: {e}")
return None
async def create_combat(self, player_id: int, npc_id: str, npc_hp: int, npc_max_hp: int, location_id: str, from_wandering: bool = False) -> Optional[Dict[str, Any]]:
"""Create new combat"""
try:
response = await self.client.post(
f"{self.api_url}/api/internal/combat/create",
headers=self.headers,
params={
"player_id": player_id,
"npc_id": npc_id,
"npc_hp": npc_hp,
"npc_max_hp": npc_max_hp,
"location_id": location_id,
"from_wandering": from_wandering
}
)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error creating combat: {e}")
return None
async def update_combat(self, player_id: int, updates: Dict[str, Any]) -> bool:
"""Update combat state"""
try:
response = await self.client.patch(
f"{self.api_url}/api/internal/combat/{player_id}",
headers=self.headers,
json=updates
)
response.raise_for_status()
result = response.json()
return result.get('success', False)
except Exception as e:
print(f"Error updating combat: {e}")
return False
async def end_combat(self, player_id: int) -> bool:
"""End combat"""
try:
response = await self.client.delete(
f"{self.api_url}/api/internal/combat/{player_id}",
headers=self.headers
)
response.raise_for_status()
result = response.json()
return result.get('success', False)
except Exception as e:
print(f"Error ending combat: {e}")
return False
# Player update operations
async def update_player(self, player_id: int, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Update player fields"""
try:
response = await self.client.patch(
f"{self.api_url}/api/internal/player/{player_id}",
headers=self.headers,
json=updates
)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error updating player: {e}")
return None
# Dropped items operations
async def drop_item_to_world(self, item_id: str, quantity: int, location_id: str) -> bool:
"""Drop an item to the world"""
try:
response = await self.client.post(
f"{self.api_url}/api/internal/dropped-items",
headers=self.headers,
params={"item_id": item_id, "quantity": quantity, "location_id": location_id}
)
response.raise_for_status()
result = response.json()
return result.get('success', False)
except Exception as e:
print(f"Error dropping item: {e}")
return False
async def get_dropped_item(self, dropped_item_id: int) -> Optional[Dict[str, Any]]:
"""Get a specific dropped item"""
try:
response = await self.client.get(
f"{self.api_url}/api/internal/dropped-items/{dropped_item_id}",
headers=self.headers
)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error getting dropped item: {e}")
return None
async def get_dropped_items_in_location(self, location_id: str) -> List[Dict[str, Any]]:
"""Get all dropped items in a location"""
try:
response = await self.client.get(
f"{self.api_url}/api/internal/location/{location_id}/dropped-items",
headers=self.headers
)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error getting dropped items: {e}")
return []
async def update_dropped_item(self, dropped_item_id: int, quantity: int) -> bool:
"""Update dropped item quantity"""
try:
response = await self.client.patch(
f"{self.api_url}/api/internal/dropped-items/{dropped_item_id}",
headers=self.headers,
params={"quantity": quantity}
)
response.raise_for_status()
result = response.json()
return result.get('success', False)
except Exception as e:
print(f"Error updating dropped item: {e}")
return False
async def remove_dropped_item(self, dropped_item_id: int) -> bool:
"""Remove a dropped item"""
try:
response = await self.client.delete(
f"{self.api_url}/api/internal/dropped-items/{dropped_item_id}",
headers=self.headers
)
response.raise_for_status()
result = response.json()
return result.get('success', False)
except Exception as e:
print(f"Error removing dropped item: {e}")
return False
# Corpse operations
async def create_player_corpse(self, player_name: str, location_id: str, items: str) -> Optional[int]:
"""Create a player corpse"""
try:
response = await self.client.post(
f"{self.api_url}/api/internal/corpses/player",
headers=self.headers,
params={"player_name": player_name, "location_id": location_id, "items": items}
)
response.raise_for_status()
result = response.json()
return result.get('corpse_id')
except Exception as e:
print(f"Error creating player corpse: {e}")
return None
async def get_player_corpse(self, corpse_id: int) -> Optional[Dict[str, Any]]:
"""Get a player corpse"""
try:
response = await self.client.get(
f"{self.api_url}/api/internal/corpses/player/{corpse_id}",
headers=self.headers
)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error getting player corpse: {e}")
return None
async def update_player_corpse(self, corpse_id: int, items: str) -> bool:
"""Update player corpse items"""
try:
response = await self.client.patch(
f"{self.api_url}/api/internal/corpses/player/{corpse_id}",
headers=self.headers,
params={"items": items}
)
response.raise_for_status()
result = response.json()
return result.get('success', False)
except Exception as e:
print(f"Error updating player corpse: {e}")
return False
async def remove_player_corpse(self, corpse_id: int) -> bool:
"""Remove a player corpse"""
try:
response = await self.client.delete(
f"{self.api_url}/api/internal/corpses/player/{corpse_id}",
headers=self.headers
)
response.raise_for_status()
result = response.json()
return result.get('success', False)
except Exception as e:
print(f"Error removing player corpse: {e}")
return False
async def create_npc_corpse(self, npc_id: str, location_id: str, loot_remaining: str) -> Optional[int]:
"""Create an NPC corpse"""
try:
response = await self.client.post(
f"{self.api_url}/api/internal/corpses/npc",
headers=self.headers,
params={"npc_id": npc_id, "location_id": location_id, "loot_remaining": loot_remaining}
)
response.raise_for_status()
result = response.json()
return result.get('corpse_id')
except Exception as e:
print(f"Error creating NPC corpse: {e}")
return None
async def get_npc_corpse(self, corpse_id: int) -> Optional[Dict[str, Any]]:
"""Get an NPC corpse"""
try:
response = await self.client.get(
f"{self.api_url}/api/internal/corpses/npc/{corpse_id}",
headers=self.headers
)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error getting NPC corpse: {e}")
return None
async def update_npc_corpse(self, corpse_id: int, loot_remaining: str) -> bool:
"""Update NPC corpse loot"""
try:
response = await self.client.patch(
f"{self.api_url}/api/internal/corpses/npc/{corpse_id}",
headers=self.headers,
params={"loot_remaining": loot_remaining}
)
response.raise_for_status()
result = response.json()
return result.get('success', False)
except Exception as e:
print(f"Error updating NPC corpse: {e}")
return False
async def remove_npc_corpse(self, corpse_id: int) -> bool:
"""Remove an NPC corpse"""
try:
response = await self.client.delete(
f"{self.api_url}/api/internal/corpses/npc/{corpse_id}",
headers=self.headers
)
response.raise_for_status()
result = response.json()
return result.get('success', False)
except Exception as e:
print(f"Error removing NPC corpse: {e}")
return False
# Wandering enemies operations
async def spawn_wandering_enemy(self, npc_id: str, location_id: str, current_hp: int, max_hp: int) -> Optional[int]:
"""Spawn a wandering enemy"""
try:
response = await self.client.post(
f"{self.api_url}/api/internal/wandering-enemies",
headers=self.headers,
params={"npc_id": npc_id, "location_id": location_id, "current_hp": current_hp, "max_hp": max_hp}
)
response.raise_for_status()
result = response.json()
return result.get('enemy_id')
except Exception as e:
print(f"Error spawning wandering enemy: {e}")
return None
async def get_wandering_enemies_in_location(self, location_id: str) -> List[Dict[str, Any]]:
"""Get all wandering enemies in a location"""
try:
response = await self.client.get(
f"{self.api_url}/api/internal/location/{location_id}/wandering-enemies",
headers=self.headers
)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error getting wandering enemies: {e}")
return []
async def remove_wandering_enemy(self, enemy_id: int) -> bool:
"""Remove a wandering enemy"""
try:
response = await self.client.delete(
f"{self.api_url}/api/internal/wandering-enemies/{enemy_id}",
headers=self.headers
)
response.raise_for_status()
result = response.json()
return result.get('success', False)
except Exception as e:
print(f"Error removing wandering enemy: {e}")
return False
async def get_inventory_item(self, item_db_id: int) -> Optional[Dict[str, Any]]:
"""Get a specific inventory item by database ID"""
try:
response = await self.client.get(
f"{self.api_url}/api/internal/inventory/item/{item_db_id}",
headers=self.headers
)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error getting inventory item: {e}")
return None
# Cooldown operations
async def get_cooldown(self, cooldown_key: str) -> int:
"""Get remaining cooldown time in seconds"""
try:
response = await self.client.get(
f"{self.api_url}/api/internal/cooldown/{cooldown_key}",
headers=self.headers
)
response.raise_for_status()
result = response.json()
return result.get('remaining_seconds', 0)
except Exception as e:
print(f"Error getting cooldown: {e}")
return 0
async def set_cooldown(self, cooldown_key: str, duration_seconds: int = 600) -> bool:
"""Set a cooldown"""
try:
response = await self.client.post(
f"{self.api_url}/api/internal/cooldown/{cooldown_key}",
headers=self.headers,
params={"duration_seconds": duration_seconds}
)
response.raise_for_status()
result = response.json()
return result.get('success', False)
except Exception as e:
print(f"Error setting cooldown: {e}")
return False
# Corpse list operations
async def get_player_corpses_in_location(self, location_id: str) -> List[Dict[str, Any]]:
"""Get all player corpses in a location"""
try:
response = await self.client.get(
f"{self.api_url}/api/internal/location/{location_id}/corpses/player",
headers=self.headers
)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error getting player corpses: {e}")
return []
async def get_npc_corpses_in_location(self, location_id: str) -> List[Dict[str, Any]]:
"""Get all NPC corpses in a location"""
try:
response = await self.client.get(
f"{self.api_url}/api/internal/location/{location_id}/corpses/npc",
headers=self.headers
)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error getting NPC corpses: {e}")
return []
# Image cache operations
async def get_cached_image(self, image_path: str) -> Optional[str]:
"""Get cached telegram file ID for an image"""
try:
response = await self.client.get(
f"{self.api_url}/api/internal/image-cache/{image_path}",
headers=self.headers
)
response.raise_for_status()
result = response.json()
return result.get('telegram_file_id')
except Exception as e:
# Not found is expected, not an error
return None
async def cache_image(self, image_path: str, telegram_file_id: str) -> bool:
"""Cache a telegram file ID for an image"""
try:
response = await self.client.post(
f"{self.api_url}/api/internal/image-cache",
headers=self.headers,
params={"image_path": image_path, "telegram_file_id": telegram_file_id}
)
response.raise_for_status()
result = response.json()
return result.get('success', False)
except Exception as e:
print(f"Error caching image: {e}")
return False
# Status effects operations
async def get_player_status_effects(self, player_id: int) -> List[Dict[str, Any]]:
"""Get player status effects"""
try:
response = await self.client.get(
f"{self.api_url}/api/internal/player/{player_id}/status-effects",
headers=self.headers
)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error getting status effects: {e}")
return []
# Global API client instance
api_client = APIClient()

201
bot/background_tasks.py Normal file
View File

@@ -0,0 +1,201 @@
"""
Background tasks for the bot.
Handles periodic maintenance, regeneration, and processing.
"""
import asyncio
import logging
import time
from bot import database
logger = logging.getLogger(__name__)
async def decay_dropped_items(shutdown_event):
"""A background task that periodically cleans up old dropped items."""
while not shutdown_event.is_set():
try:
# Wait for 5 minutes before the next cleanup
await asyncio.wait_for(shutdown_event.wait(), timeout=300)
except asyncio.TimeoutError:
start_time = time.time()
logger.info("Running item decay task...")
# Set decay time to 1 hour (3600 seconds)
decay_seconds = 3600
timestamp_limit = int(time.time()) - decay_seconds
items_removed = await database.remove_expired_dropped_items(timestamp_limit)
elapsed = time.time() - start_time
if items_removed > 0:
logger.info(f"Decayed and removed {items_removed} old items in {elapsed:.2f}s")
async def regenerate_stamina(shutdown_event):
"""A background task that periodically regenerates stamina for all players."""
while not shutdown_event.is_set():
try:
# Wait for 5 minutes before the next regeneration cycle
await asyncio.wait_for(shutdown_event.wait(), timeout=300)
except asyncio.TimeoutError:
start_time = time.time()
logger.info("Running stamina regeneration...")
players_updated = await database.regenerate_all_players_stamina()
elapsed = time.time() - start_time
if players_updated > 0:
logger.info(f"Regenerated stamina for {players_updated} players in {elapsed:.2f}s")
# Alert if regeneration is taking too long (potential scaling issue)
if elapsed > 5.0:
logger.warning(f"⚠️ Stamina regeneration took {elapsed:.2f}s (threshold: 5s) - check database load!")
async def check_combat_timers(shutdown_event):
"""A background task that checks for idle combat turns and auto-attacks."""
while not shutdown_event.is_set():
try:
# Wait for 30 seconds before next check
await asyncio.wait_for(shutdown_event.wait(), timeout=30)
except asyncio.TimeoutError:
start_time = time.time()
# Check for combats idle for more than 5 minutes (300 seconds)
idle_threshold = time.time() - 300
idle_combats = await database.get_all_idle_combats(idle_threshold)
if idle_combats:
logger.info(f"Processing {len(idle_combats)} idle combats...")
for combat in idle_combats:
try:
from bot import combat as combat_logic
# Force end player's turn and let NPC attack
if combat['turn'] == 'player':
await database.update_combat(combat['player_id'], {
'turn': 'npc',
'turn_started_at': time.time()
})
# NPC attacks
await combat_logic.npc_attack(combat['player_id'])
except Exception as e:
logger.error(f"Error processing idle combat: {e}")
# Log performance for monitoring
if idle_combats:
elapsed = time.time() - start_time
logger.info(f"Processed {len(idle_combats)} idle combats in {elapsed:.2f}s")
# Warn if taking too long (potential scaling issue)
if elapsed > 10.0:
logger.warning(f"⚠️ Combat timer check took {elapsed:.2f}s (threshold: 10s) - consider batching!")
async def decay_corpses(shutdown_event):
"""A background task that removes old corpses."""
while not shutdown_event.is_set():
try:
# Wait for 10 minutes before next cleanup
await asyncio.wait_for(shutdown_event.wait(), timeout=600)
except asyncio.TimeoutError:
start_time = time.time()
logger.info("Running corpse decay...")
# Player corpses decay after 24 hours
player_corpse_limit = time.time() - (24 * 3600)
player_corpses_removed = await database.remove_expired_player_corpses(player_corpse_limit)
# NPC corpses decay after 2 hours
npc_corpse_limit = time.time() - (2 * 3600)
npc_corpses_removed = await database.remove_expired_npc_corpses(npc_corpse_limit)
elapsed = time.time() - start_time
if player_corpses_removed > 0 or npc_corpses_removed > 0:
logger.info(f"Decayed {player_corpses_removed} player corpses and {npc_corpses_removed} NPC corpses in {elapsed:.2f}s")
async def process_status_effects(shutdown_event):
"""
A background task that applies damage from persistent status effects.
Runs every 5 minutes to process status effect ticks.
"""
while not shutdown_event.is_set():
try:
# Wait for 5 minutes before next processing cycle
await asyncio.wait_for(shutdown_event.wait(), timeout=300)
except asyncio.TimeoutError:
start_time = time.time()
logger.info("Running status effects processor...")
try:
# Decrement all status effect ticks and get affected players
affected_players = await database.decrement_all_status_effect_ticks()
if not affected_players:
elapsed = time.time() - start_time
logger.info(f"No active status effects to process ({elapsed:.3f}s)")
continue
# Process each affected player
deaths = 0
damage_dealt = 0
for player_id in affected_players:
try:
# Get current status effects (after decrement)
effects = await database.get_player_status_effects(player_id)
if not effects:
continue
# Calculate total damage
from bot.status_utils import calculate_status_damage
total_damage = calculate_status_damage(effects)
if total_damage > 0:
damage_dealt += total_damage
player = await database.get_player(player_id)
if not player or player['is_dead']:
continue
new_hp = max(0, player['hp'] - total_damage)
# Check if player died from status effects
if new_hp <= 0:
await database.update_player(player_id, {'hp': 0, 'is_dead': True})
deaths += 1
# Create player corpse
inventory = await database.get_inventory(player_id)
await database.create_player_corpse(
player_name=player['name'],
location_id=player['location_id'],
items=inventory
)
# Remove status effects from dead player
await database.remove_all_status_effects(player_id)
logger.info(f"Player {player['name']} (ID: {player_id}) died from status effects")
else:
# Apply damage
await database.update_player(player_id, {'hp': new_hp})
except Exception as e:
logger.error(f"Error processing status effects for player {player_id}: {e}")
elapsed = time.time() - start_time
logger.info(
f"Processed status effects for {len(affected_players)} players "
f"({damage_dealt} total damage, {deaths} deaths) in {elapsed:.3f}s"
)
# Warn if taking too long (potential scaling issue)
if elapsed > 5.0:
logger.warning(
f"⚠️ Status effects processing took {elapsed:.3f}s (threshold: 5s) "
f"- {len(affected_players)} players affected"
)
except Exception as e:
logger.error(f"Error in status effects processor: {e}")

View File

@@ -6,7 +6,7 @@ import random
import json
import time
from typing import Dict, List, Tuple, Optional
from bot import database
from bot.api_client import api_client
from bot.utils import format_stat_bar
from data.npcs import NPCS, STATUS_EFFECTS
from data.items import ITEMS
@@ -27,7 +27,7 @@ async def calculate_player_damage(player: dict) -> int:
level_bonus = player['level']
# Check for equipped weapon
inventory = await database.get_inventory(player['telegram_id'])
inventory = await api_client.get_inventory(player['telegram_id'])
weapon_damage = 0
for item in inventory:
@@ -76,7 +76,7 @@ async def initiate_combat(player_id: int, npc_id: str, location_id: str, from_wa
npc_hp = random.randint(npc_def.hp_min, npc_def.hp_max)
# Create combat in database
combat_id = await database.create_combat(
combat_id = await api_client.create_combat(
player_id=player_id,
npc_id=npc_id,
npc_hp=npc_hp,
@@ -85,7 +85,7 @@ async def initiate_combat(player_id: int, npc_id: str, location_id: str, from_wa
from_wandering_enemy=from_wandering_enemy
)
return await database.get_combat(player_id)
return await api_client.get_combat(player_id)
async def player_attack(player_id: int) -> Tuple[str, bool, bool]:
@@ -93,11 +93,11 @@ async def player_attack(player_id: int) -> Tuple[str, bool, bool]:
Player attacks the NPC.
Returns: (message, npc_died, player_turn_ended)
"""
combat = await database.get_combat(player_id)
combat = await api_client.get_combat(player_id)
if not combat or combat['turn'] != 'player':
return ("It's not your turn!", False, False)
player = await database.get_player(player_id)
player = await api_client.get_player(player_id)
npc_def = NPCS.get(combat['npc_id'])
if not player or not npc_def:
@@ -109,7 +109,7 @@ async def player_attack(player_id: int) -> Tuple[str, bool, bool]:
if is_stunned:
# Update status effects
player_effects = update_status_effects(player_effects)
await database.update_combat(player_id, {
await api_client.update_combat(player_id, {
'turn': 'npc',
'turn_started_at': time.time(),
'player_status_effects': json.dumps(player_effects)
@@ -147,7 +147,7 @@ async def player_attack(player_id: int) -> Tuple[str, bool, bool]:
player_effects, status_damage, status_messages = apply_status_effects(player_effects)
if status_damage > 0:
new_player_hp = max(0, player['hp'] - status_damage)
await database.update_player(player_id, {'hp': new_player_hp})
await api_client.update_player(player_id, {'hp': new_player_hp})
message += f"\n{status_messages}"
if new_player_hp <= 0:
@@ -156,7 +156,7 @@ async def player_attack(player_id: int) -> Tuple[str, bool, bool]:
# Check if NPC died
if new_npc_hp <= 0:
await database.update_combat(player_id, {
await api_client.update_combat(player_id, {
'npc_hp': 0,
'npc_status_effects': json.dumps(npc_effects),
'player_status_effects': json.dumps(player_effects)
@@ -167,7 +167,7 @@ async def player_attack(player_id: int) -> Tuple[str, bool, bool]:
return (message + "\n\n" + victory_msg, True, True)
# Update combat - switch to NPC turn
await database.update_combat(player_id, {
await api_client.update_combat(player_id, {
'npc_hp': new_npc_hp,
'turn': 'npc',
'turn_started_at': time.time(),
@@ -189,11 +189,11 @@ async def npc_attack(player_id: int) -> Tuple[str, bool]:
NPC attacks the player.
Returns: (message, player_died)
"""
combat = await database.get_combat(player_id)
combat = await api_client.get_combat(player_id)
if not combat or combat['turn'] != 'npc':
return ("", False)
player = await database.get_player(player_id)
player = await api_client.get_player(player_id)
npc_def = NPCS.get(combat['npc_id'])
if not player or not npc_def:
@@ -205,7 +205,7 @@ async def npc_attack(player_id: int) -> Tuple[str, bool]:
if is_stunned:
# Update status effects
npc_effects = update_status_effects(npc_effects)
await database.update_combat(player_id, {
await api_client.update_combat(player_id, {
'turn': 'player',
'turn_started_at': time.time(),
'npc_status_effects': json.dumps(npc_effects)
@@ -217,7 +217,7 @@ async def npc_attack(player_id: int) -> Tuple[str, bool]:
# Apply damage to player
new_player_hp = max(0, player['hp'] - damage)
await database.update_player(player_id, {'hp': new_player_hp})
await api_client.update_player(player_id, {'hp': new_player_hp})
message = "━━━ ENEMY TURN ━━━\n"
message += f"💥 The {npc_def.name} attacks you for {damage} damage!"
@@ -237,7 +237,7 @@ async def npc_attack(player_id: int) -> Tuple[str, bool]:
npc_effects, status_damage, status_messages = apply_status_effects(npc_effects)
if status_damage > 0:
new_npc_hp = max(0, combat['npc_hp'] - status_damage)
await database.update_combat(player_id, {'npc_hp': new_npc_hp})
await api_client.update_combat(player_id, {'npc_hp': new_npc_hp})
message += f"\n{status_messages}"
if new_npc_hp <= 0:
@@ -250,7 +250,7 @@ async def npc_attack(player_id: int) -> Tuple[str, bool]:
return (message + "\n\n💀 You have been slain...", True)
# Update combat - switch to player turn
await database.update_combat(player_id, {
await api_client.update_combat(player_id, {
'turn': 'player',
'turn_started_at': time.time(),
'player_status_effects': json.dumps(player_effects),
@@ -270,11 +270,11 @@ async def flee_attempt(player_id: int) -> Tuple[str, bool, bool]:
Player attempts to flee from combat.
Returns: (message, fled_successfully, turn_ended)
"""
combat = await database.get_combat(player_id)
combat = await api_client.get_combat(player_id)
if not combat or combat['turn'] != 'player':
return ("It's not your turn!", False, False)
player = await database.get_player(player_id)
player = await api_client.get_player(player_id)
npc_def = NPCS.get(combat['npc_id'])
# Base flee chance is 50%, modified by agility
@@ -283,21 +283,22 @@ async def flee_attempt(player_id: int) -> Tuple[str, bool, bool]:
if random.random() < flee_chance:
# Success! Check if we need to respawn the wandering enemy
if combat.get('from_wandering_enemy', False):
# Respawn the enemy at the same location
await database.spawn_wandering_enemy(
# Respawn the enemy at the same location with full HP
await api_client.spawn_wandering_enemy(
npc_id=combat['npc_id'],
location_id=combat['location_id'],
lifetime_seconds=600 # 10 minutes
current_hp=npc_def.hp,
max_hp=npc_def.hp
)
await database.end_combat(player_id)
await api_client.end_combat(player_id)
return (f"🏃 You successfully flee from the {npc_def.name}!", True, True)
else:
# Failed - lose turn and NPC attacks
message = f"❌ You failed to escape! The {npc_def.name} takes advantage!"
# NPC gets a free attack
await database.update_combat(player_id, {
await api_client.update_combat(player_id, {
'turn': 'npc',
'turn_started_at': time.time()
})
@@ -317,26 +318,46 @@ def update_status_effects(effects: List[Dict]) -> List[Dict]:
def apply_status_effects(effects: List[Dict]) -> Tuple[List[Dict], int, str]:
"""
Apply status effect damage.
Apply status effect damage with stacking.
Returns: (updated_effects, total_damage, message)
"""
from bot.status_utils import stack_status_effects
if not effects:
return effects, 0, ""
# Convert effect format if needed (name -> effect_name, damage_per_turn -> damage_per_tick)
normalized_effects = []
for effect in effects:
normalized = {
'effect_name': effect.get('name', effect.get('effect_name', 'Unknown')),
'effect_icon': effect.get('icon', effect.get('effect_icon', '')),
'damage_per_tick': effect.get('damage_per_turn', effect.get('damage_per_tick', 0)),
'ticks_remaining': effect.get('turns_remaining', effect.get('ticks_remaining', 0))
}
normalized_effects.append(normalized)
# Stack effects
stacked = stack_status_effects(normalized_effects)
total_damage = 0
messages = []
for effect in effects:
if effect['damage_per_turn'] > 0:
total_damage += effect['damage_per_turn']
if effect['name'] == 'Bleeding':
messages.append(f"🩸 Bleeding: -{effect['damage_per_turn']} HP")
elif effect['name'] == 'Infected':
messages.append(f"🦠 Infection: -{effect['damage_per_turn']} HP")
for name, data in stacked.items():
if data['total_damage'] > 0:
total_damage += data['total_damage']
# Show stacked damage
if data['stacks'] > 1:
messages.append(f"{data['icon']} {name.replace('_', ' ').title()}: -{data['total_damage']} HP (×{data['stacks']})")
else:
messages.append(f"{data['icon']} {name.replace('_', ' ').title()}: -{data['total_damage']} HP")
return effects, total_damage, "\n".join(messages)
async def handle_npc_death(player_id: int, combat: Dict, npc_def) -> str:
"""Handle NPC death - give XP, drop loot, create corpse."""
player = await database.get_player(player_id)
player = await api_client.get_player(player_id)
# Give XP
new_xp = player['xp'] + npc_def.xp_reward
@@ -353,7 +374,7 @@ async def handle_npc_death(player_id: int, combat: Dict, npc_def) -> str:
points_gained = 5
new_unspent_points = player.get('unspent_points', 0) + points_gained
await database.update_player(player_id, {
await api_client.update_player(player_id, {
'xp': new_xp,
'level': new_level,
'hp': player['max_hp'], # Heal on level up
@@ -366,7 +387,7 @@ async def handle_npc_death(player_id: int, combat: Dict, npc_def) -> str:
level_up_msg += f"\n❤️ Fully healed and stamina restored!"
level_up_msg += f"\n\n💡 Check your profile to spend your points!"
else:
await database.update_player(player_id, {'xp': new_xp})
await api_client.update_player(player_id, {'xp': new_xp})
# Drop loot
loot_msg = "\n\n💰 Loot dropped:"
@@ -374,7 +395,7 @@ async def handle_npc_death(player_id: int, combat: Dict, npc_def) -> str:
for loot_item in npc_def.loot_table:
if random.random() < loot_item.drop_chance:
quantity = random.randint(loot_item.quantity_min, loot_item.quantity_max)
await database.drop_item_to_world(
await api_client.drop_item_to_world(
loot_item.item_id,
quantity,
combat['location_id']
@@ -395,7 +416,7 @@ async def handle_npc_death(player_id: int, combat: Dict, npc_def) -> str:
'required_tool': cl.required_tool
} for cl in npc_def.corpse_loot])
await database.create_npc_corpse(
await api_client.create_npc_corpse(
npc_id=combat['npc_id'],
location_id=combat['location_id'],
loot_remaining=corpse_loot_json
@@ -403,7 +424,7 @@ async def handle_npc_death(player_id: int, combat: Dict, npc_def) -> str:
loot_msg += f"\n\n{npc_def.emoji} The corpse can be scavenged for more resources."
# End combat
await database.end_combat(player_id)
await api_client.end_combat(player_id)
message = f"🏆 Victory! {npc_def.death_message}"
message += f"\n+{npc_def.xp_reward} XP"
@@ -415,17 +436,19 @@ async def handle_npc_death(player_id: int, combat: Dict, npc_def) -> str:
async def handle_player_death(player_id: int):
"""Handle player death - create corpse bag with all items."""
player = await database.get_player(player_id)
inventory_items = await database.get_inventory(player_id)
player = await api_client.get_player(player_id)
inventory_items = await api_client.get_inventory(player_id)
# Check if combat was with a wandering enemy that should respawn
combat = await database.get_combat(player_id)
combat = await api_client.get_combat(player_id)
if combat and combat.get('from_wandering_enemy', False):
# Respawn the enemy at the same location
await database.spawn_wandering_enemy(
# Respawn the enemy at the same location with full HP
npc_def = NPCS.get(combat['npc_id'])
await api_client.spawn_wandering_enemy(
npc_id=combat['npc_id'],
location_id=combat['location_id'],
lifetime_seconds=600 # 10 minutes
current_hp=npc_def.hp,
max_hp=npc_def.hp
)
# Create corpse bag if player has items
@@ -435,7 +458,7 @@ async def handle_player_death(player_id: int):
'quantity': item['quantity']
} for item in inventory_items])
await database.create_player_corpse(
await api_client.create_player_corpse(
player_name=player['name'],
location_id=player['location_id'],
items=items_json
@@ -443,11 +466,11 @@ async def handle_player_death(player_id: int):
# Remove all items from player
for item in inventory_items:
await database.remove_item_from_inventory(item['id'], item['quantity'])
await api_client.remove_item_from_inventory(item['id'], item['quantity'])
# Mark player as dead and end any combat
await database.update_player(player_id, {'is_dead': True, 'hp': 0})
await database.end_combat(player_id)
await api_client.update_player(player_id, {'is_dead': True, 'hp': 0})
await api_client.end_combat(player_id)
async def use_item_in_combat(player_id: int, item_db_id: int) -> Tuple[str, bool]:
@@ -455,11 +478,11 @@ async def use_item_in_combat(player_id: int, item_db_id: int) -> Tuple[str, bool
Use a consumable item during combat.
Returns: (message, turn_ended)
"""
combat = await database.get_combat(player_id)
combat = await api_client.get_combat(player_id)
if not combat or combat['turn'] != 'player':
return ("It's not your turn!", False)
item_data = await database.get_inventory_item(item_db_id)
item_data = await api_client.get_inventory_item(item_db_id)
if not item_data or item_data['player_id'] != player_id:
return ("You don't have that item!", False)
@@ -467,7 +490,7 @@ async def use_item_in_combat(player_id: int, item_db_id: int) -> Tuple[str, bool
if not item_def or item_def.get('type') != 'consumable':
return ("That item cannot be used in combat!", False)
player = await database.get_player(player_id)
player = await api_client.get_player(player_id)
# Apply consumable effects
message = f"💊 Used {item_def['name']}!"
@@ -487,16 +510,16 @@ async def use_item_in_combat(player_id: int, item_db_id: int) -> Tuple[str, bool
message += f"\n⚡ +{stamina_restore} Stamina"
if updates:
await database.update_player(player_id, updates)
await api_client.update_player(player_id, updates)
# Remove item from inventory
if item_data['quantity'] > 1:
await database.update_inventory_item(item_db_id, item_data['quantity'] - 1)
await api_client.update_inventory_item(item_db_id, item_data['quantity'] - 1)
else:
await database.remove_item_from_inventory(item_db_id, 1)
await api_client.remove_item_from_inventory(item_db_id, 1)
# Using an item ends your turn
await database.update_combat(player_id, {
await api_client.update_combat(player_id, {
'turn': 'npc',
'turn_started_at': time.time()
})

View File

@@ -2,7 +2,8 @@
Combat-related action handlers.
"""
import logging
from . import database, keyboards
from . import keyboards
from .api_client import api_client
from .utils import format_stat_bar
from data.world_loader import game_world
@@ -37,7 +38,7 @@ async def handle_combat_attack(query, user_id: int, player: dict, data: list = N
from .handlers import send_or_edit_with_image
await send_or_edit_with_image(query, text=message, reply_markup=None)
else:
combat_data = await database.get_combat(user_id)
combat_data = await api_client.get_combat(user_id)
if combat_data:
from data.npcs import NPCS
npc_def = NPCS.get(combat_data['npc_id'])
@@ -82,7 +83,7 @@ async def handle_combat_flee(query, user_id: int, player: dict, data: list = Non
from .handlers import send_or_edit_with_image
await send_or_edit_with_image(query, text=message, reply_markup=None)
else:
combat_data = await database.get_combat(user_id)
combat_data = await api_client.get_combat(user_id)
if combat_data:
from data.npcs import NPCS
npc_def = NPCS.get(combat_data['npc_id'])
@@ -124,7 +125,7 @@ async def handle_combat_use_item(query, user_id: int, player: dict, data: list):
reply_markup=None
)
else:
combat_data = await database.get_combat(user_id)
combat_data = await api_client.get_combat(user_id)
if combat_data:
from data.npcs import NPCS
npc_def = NPCS.get(combat_data['npc_id'])
@@ -143,7 +144,7 @@ async def handle_combat_use_item(query, user_id: int, player: dict, data: list):
async def handle_combat_back(query, user_id: int, player: dict, data: list = None):
"""Return to combat menu from item selection."""
await query.answer()
combat_data = await database.get_combat(user_id)
combat_data = await api_client.get_combat(user_id)
if combat_data:
from data.npcs import NPCS

View File

@@ -9,7 +9,8 @@ import json
from io import BytesIO
from telegram import Update
from telegram.ext import ContextTypes
from . import database, keyboards
from . import keyboards
from .api_client import api_client
from .utils import admin_only
from .action_handlers import get_player_status_text
from data.world_loader import game_world
@@ -19,23 +20,25 @@ logger = logging.getLogger(__name__)
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle /start command - initialize or show player status."""
from .api_client import api_client
user = update.effective_user
player = await database.get_player(user.id)
player = await api_client.get_player(user.id)
if not player:
await database.create_player(user.id, user.first_name)
player = await api_client.create_player(user.id, user.first_name)
await update.message.reply_html(
f"Welcome, {user.mention_html()}! Your story is just beginning."
)
# Get player status and location image
player = await database.get_player(user.id)
player = await api_client.get_player(user.id)
status_text = await get_player_status_text(user.id)
location = game_world.get_location(player['location_id'])
# Send with image if available
if location and location.image_path:
cached_file_id = await database.get_cached_image(location.image_path)
cached_file_id = await api_client.get_cached_image(location.image_path)
if cached_file_id:
await update.message.reply_photo(
photo=cached_file_id,
@@ -52,7 +55,7 @@ async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
parse_mode='HTML'
)
if msg.photo:
await database.cache_image(location.image_path, msg.photo[-1].file_id)
await api_client.cache_image(location.image_path, msg.photo[-1].file_id)
else:
await update.message.reply_html(
status_text,

View File

@@ -4,7 +4,8 @@ Corpse looting handlers (player and NPC corpses).
import logging
import json
import random
from . import database, keyboards, logic
from . import keyboards, logic
from .api_client import api_client
from data.world_loader import game_world
from data.items import ITEMS
@@ -14,7 +15,7 @@ logger = logging.getLogger(__name__)
async def handle_loot_player_corpse(query, user_id: int, player: dict, data: list):
"""Show player corpse loot menu."""
corpse_id = int(data[1])
corpse = await database.get_player_corpse(corpse_id)
corpse = await api_client.get_player_corpse(corpse_id)
if not corpse:
await query.answer("Corpse not found.", show_alert=False)
@@ -43,7 +44,7 @@ async def handle_take_corpse_item(query, user_id: int, player: dict, data: list)
corpse_id = int(data[1])
item_index = int(data[2])
corpse = await database.get_player_corpse(corpse_id)
corpse = await api_client.get_player_corpse(corpse_id)
if not corpse:
await query.answer("Corpse not found.", show_alert=False)
return
@@ -66,13 +67,13 @@ async def handle_take_corpse_item(query, user_id: int, player: dict, data: list)
return
# Add to inventory
await database.add_item_to_inventory(user_id, item_data['item_id'], item_data['quantity'])
await api_client.add_item_to_inventory(user_id, item_data['item_id'], item_data['quantity'])
# Remove from corpse
items.pop(item_index)
if items:
await database.update_player_corpse(corpse_id, json.dumps(items))
await api_client.update_player_corpse(corpse_id, json.dumps(items))
keyboard = keyboards.player_corpse_loot_keyboard(corpse_id, items)
location = game_world.get_location(player['location_id'])
@@ -90,15 +91,15 @@ async def handle_take_corpse_item(query, user_id: int, player: dict, data: list)
)
else:
# Bag is empty, remove it
await database.remove_player_corpse(corpse_id)
await api_client.remove_player_corpse(corpse_id)
await query.answer(
f"Took {item_def.get('name', 'Unknown')}. The bag is now empty.",
show_alert=False
)
location = game_world.get_location(player['location_id'])
dropped_items = await database.get_dropped_items_in_location(player['location_id'])
wandering_enemies = await database.get_wandering_enemies_in_location(player['location_id'])
dropped_items = await api_client.get_dropped_items_in_location(player['location_id'])
wandering_enemies = await api_client.get_wandering_enemies_in_location(player['location_id'])
keyboard = await keyboards.inspect_keyboard(player['location_id'], dropped_items, wandering_enemies)
from .handlers import send_or_edit_with_image
@@ -113,7 +114,7 @@ async def handle_take_corpse_item(query, user_id: int, player: dict, data: list)
async def handle_scavenge_npc_corpse(query, user_id: int, player: dict, data: list):
"""Show NPC corpse scavenging menu."""
corpse_id = int(data[1])
corpse = await database.get_npc_corpse(corpse_id)
corpse = await api_client.get_npc_corpse(corpse_id)
if not corpse:
await query.answer("Corpse not found.", show_alert=False)
@@ -144,7 +145,7 @@ async def handle_scavenge_corpse_item(query, user_id: int, player: dict, data: l
corpse_id = int(data[1])
loot_index = int(data[2])
corpse = await database.get_npc_corpse(corpse_id)
corpse = await api_client.get_npc_corpse(corpse_id)
if not corpse:
await query.answer("Corpse not found.", show_alert=False)
return
@@ -159,7 +160,7 @@ async def handle_scavenge_corpse_item(query, user_id: int, player: dict, data: l
# Check if player has required tool
if required_tool:
inventory_items = await database.get_inventory(user_id)
inventory_items = await api_client.get_inventory(user_id)
has_tool = any(item['item_id'] == required_tool for item in inventory_items)
if not has_tool:
@@ -184,13 +185,13 @@ async def handle_scavenge_corpse_item(query, user_id: int, player: dict, data: l
return
# Add to inventory
await database.add_item_to_inventory(user_id, loot_data['item_id'], quantity)
await api_client.add_item_to_inventory(user_id, loot_data['item_id'], quantity)
# Remove from corpse
loot_items.pop(loot_index)
if loot_items:
await database.update_npc_corpse(corpse_id, json.dumps(loot_items))
await api_client.update_npc_corpse(corpse_id, json.dumps(loot_items))
keyboard = keyboards.npc_corpse_scavenge_keyboard(corpse_id, loot_items)
location = game_world.get_location(player['location_id'])
@@ -214,15 +215,15 @@ async def handle_scavenge_corpse_item(query, user_id: int, player: dict, data: l
)
else:
# Nothing left, remove corpse
await database.remove_npc_corpse(corpse_id)
await api_client.remove_npc_corpse(corpse_id)
await query.answer(
f"Scavenged {quantity}x {item_def.get('name', 'Unknown')}. Nothing left on the corpse.",
show_alert=False
)
location = game_world.get_location(player['location_id'])
dropped_items = await database.get_dropped_items_in_location(player['location_id'])
wandering_enemies = await database.get_wandering_enemies_in_location(player['location_id'])
dropped_items = await api_client.get_dropped_items_in_location(player['location_id'])
wandering_enemies = await api_client.get_wandering_enemies_in_location(player['location_id'])
keyboard = await keyboards.inspect_keyboard(player['location_id'], dropped_items, wandering_enemies)
from .handlers import send_or_edit_with_image

View File

@@ -12,7 +12,28 @@ engine = create_async_engine(DATABASE_URL)
metadata = MetaData()
# ... (players, inventory, dropped_items tables are unchanged) ...
players = Table("players", metadata, Column("telegram_id", Integer, primary_key=True), Column("name", String, default="Survivor"), Column("hp", Integer, default=100), Column("max_hp", Integer, default=100), Column("stamina", Integer, default=20), Column("max_stamina", Integer, default=20), Column("strength", Integer, default=5), Column("agility", Integer, default=5), Column("endurance", Integer, default=5), Column("intellect", Integer, default=5), Column("location_id", String, default="start_point"), Column("is_dead", Boolean, default=False), Column("level", Integer, default=1), Column("xp", Integer, default=0), Column("unspent_points", Integer, default=0))
players = Table(
"players",
metadata,
Column("telegram_id", Integer, primary_key=True),
Column("id", Integer, unique=True, autoincrement=True), # Web users ID
Column("username", String(50), unique=True, nullable=True), # Web users username
Column("password_hash", String(255), nullable=True), # Web users password hash
Column("name", String, default="Survivor"),
Column("hp", Integer, default=100),
Column("max_hp", Integer, default=100),
Column("stamina", Integer, default=20),
Column("max_stamina", Integer, default=20),
Column("strength", Integer, default=5),
Column("agility", Integer, default=5),
Column("endurance", Integer, default=5),
Column("intellect", Integer, default=5),
Column("location_id", String, default="start_point"),
Column("is_dead", Boolean, default=False),
Column("level", Integer, default=1),
Column("xp", Integer, default=0),
Column("unspent_points", Integer, default=0)
)
inventory = Table("inventory", metadata, Column("id", Integer, primary_key=True, autoincrement=True), Column("player_id", Integer, ForeignKey("players.telegram_id", ondelete="CASCADE")), Column("item_id", String), Column("quantity", Integer, default=1), Column("is_equipped", Boolean, default=False))
dropped_items = Table("dropped_items", metadata, Column("id", Integer, primary_key=True, autoincrement=True), Column("item_id", String), Column("quantity", Integer, default=1), Column("location_id", String), Column("drop_timestamp", Float))
@@ -82,25 +103,74 @@ wandering_enemies = Table(
Column("despawn_timestamp", Float, nullable=False), # When this enemy should despawn
)
# Persistent status effects table
player_status_effects = Table(
"player_status_effects",
metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("player_id", Integer, ForeignKey("players.telegram_id", ondelete="CASCADE"), nullable=False),
Column("effect_name", String(50), nullable=False),
Column("effect_icon", String(10), nullable=False),
Column("damage_per_tick", Integer, nullable=False, default=0),
Column("ticks_remaining", Integer, nullable=False),
Column("applied_at", Float, nullable=False),
)
async def create_tables():
async with engine.begin() as conn:
await conn.run_sync(metadata.create_all)
# ... (All other database functions are unchanged except the cooldown ones) ...
async def get_player(telegram_id: int):
async def get_player(telegram_id: int = None, player_id: int = None, username: str = None):
"""Get player by telegram_id, player_id (web users), or username."""
async with engine.connect() as conn:
result = await conn.execute(players.select().where(players.c.telegram_id == telegram_id))
if telegram_id is not None:
result = await conn.execute(players.select().where(players.c.telegram_id == telegram_id))
elif player_id is not None:
result = await conn.execute(players.select().where(players.c.id == player_id))
elif username is not None:
result = await conn.execute(players.select().where(players.c.username == username))
else:
return None
row = result.first()
return row._asdict() if row else None
async def create_player(telegram_id: int, name: str):
async def create_player(telegram_id: int = None, name: str = "Survivor", username: str = None, password_hash: str = None):
"""Create a player (Telegram or web user)."""
async with engine.connect() as conn:
await conn.execute(players.insert().values(telegram_id=telegram_id, name=name))
await conn.execute(inventory.insert().values(player_id=telegram_id, item_id="tattered_rucksack", is_equipped=True))
values = {
"name": name,
"telegram_id": telegram_id,
"username": username,
"password_hash": password_hash,
}
result = await conn.execute(players.insert().values(**values))
await conn.commit()
return await get_player(telegram_id)
async def update_player(telegram_id: int, updates: dict):
# For telegram users, the primary key is telegram_id
# For web users, we need to get the auto-generated id
if telegram_id:
# Add starting inventory for Telegram users
await conn.execute(inventory.insert().values(player_id=telegram_id, item_id="tattered_rucksack", is_equipped=True))
await conn.commit()
# Return the created player
if telegram_id:
return await get_player(telegram_id=telegram_id)
elif username:
return await get_player(username=username)
async def update_player(telegram_id: int = None, player_id: int = None, updates: dict = None):
"""Update player by telegram_id (Telegram users) or player_id (web users)."""
if updates is None:
updates = {}
async with engine.connect() as conn:
await conn.execute(players.update().where(players.c.telegram_id == telegram_id).values(**updates))
if telegram_id is not None:
await conn.execute(players.update().where(players.c.telegram_id == telegram_id).values(**updates))
elif player_id is not None:
await conn.execute(players.update().where(players.c.id == player_id).values(**updates))
else:
raise ValueError("Must provide either telegram_id or player_id")
await conn.commit()
async def get_inventory(player_id: int):
async with engine.connect() as conn:
@@ -526,3 +596,134 @@ async def get_all_active_wandering_enemies():
)
result = await conn.execute(stmt)
return [row._asdict() for row in result.fetchall()]
# ============================================================================
# STATUS EFFECTS
# ============================================================================
async def get_player_status_effects(player_id: int):
"""Get all active status effects for a player."""
async with engine.connect() as conn:
stmt = player_status_effects.select().where(
player_status_effects.c.player_id == player_id,
player_status_effects.c.ticks_remaining > 0
)
result = await conn.execute(stmt)
return [row._asdict() for row in result.fetchall()]
async def add_status_effect(player_id: int, effect_name: str, effect_icon: str,
damage_per_tick: int, ticks_remaining: int):
"""Add a new status effect to a player."""
async with engine.connect() as conn:
await conn.execute(
player_status_effects.insert().values(
player_id=player_id,
effect_name=effect_name,
effect_icon=effect_icon,
damage_per_tick=damage_per_tick,
ticks_remaining=ticks_remaining,
applied_at=time.time()
)
)
await conn.commit()
async def update_status_effect_ticks(effect_id: int, ticks_remaining: int):
"""Update the remaining ticks for a status effect."""
async with engine.connect() as conn:
await conn.execute(
player_status_effects.update().where(
player_status_effects.c.id == effect_id
).values(ticks_remaining=ticks_remaining)
)
await conn.commit()
async def remove_status_effect(effect_id: int):
"""Remove a specific status effect."""
async with engine.connect() as conn:
await conn.execute(
player_status_effects.delete().where(player_status_effects.c.id == effect_id)
)
await conn.commit()
async def remove_all_status_effects(player_id: int):
"""Remove all status effects from a player."""
async with engine.connect() as conn:
await conn.execute(
player_status_effects.delete().where(player_status_effects.c.player_id == player_id)
)
await conn.commit()
async def remove_status_effects_by_name(player_id: int, effect_name: str, count: int = 1):
"""
Remove a specific number of status effects by name for a player.
Used for treatment items that cure specific effects.
Returns the number of effects actually removed.
"""
async with engine.connect() as conn:
# Get the effects to remove
stmt = player_status_effects.select().where(
player_status_effects.c.player_id == player_id,
player_status_effects.c.effect_name == effect_name,
player_status_effects.c.ticks_remaining > 0
).limit(count)
result = await conn.execute(stmt)
effects_to_remove = result.fetchall()
# Remove them
effect_ids = [row.id for row in effects_to_remove]
if effect_ids:
await conn.execute(
player_status_effects.delete().where(
player_status_effects.c.id.in_(effect_ids)
)
)
await conn.commit()
return len(effect_ids)
async def get_all_players_with_status_effects():
"""Get all player IDs that have active status effects (for background processing)."""
async with engine.connect() as conn:
from sqlalchemy import distinct
stmt = player_status_effects.select().with_only_columns(
distinct(player_status_effects.c.player_id)
).where(player_status_effects.c.ticks_remaining > 0)
result = await conn.execute(stmt)
return [row[0] for row in result.fetchall()]
async def decrement_all_status_effect_ticks():
"""
Decrement ticks for all active status effects and return affected player IDs.
Used by background processor.
"""
async with engine.connect() as conn:
# Get player IDs with effects before updating
from sqlalchemy import distinct
stmt = player_status_effects.select().with_only_columns(
distinct(player_status_effects.c.player_id)
).where(player_status_effects.c.ticks_remaining > 0)
result = await conn.execute(stmt)
affected_players = [row[0] for row in result.fetchall()]
# Decrement ticks
await conn.execute(
player_status_effects.update().where(
player_status_effects.c.ticks_remaining > 0
).values(ticks_remaining=player_status_effects.c.ticks_remaining - 1)
)
# Remove expired effects
await conn.execute(
player_status_effects.delete().where(player_status_effects.c.ticks_remaining <= 0)
)
await conn.commit()
return affected_players

View File

@@ -14,7 +14,6 @@ All other functionality is organized in separate modules:
import logging
from telegram import Update
from telegram.ext import ContextTypes
from . import database
from .message_utils import send_or_edit_with_image
# Import organized action handlers
@@ -124,14 +123,18 @@ async def button_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) ->
Main router for button callbacks.
Delegates to specific handler functions based on action type.
All handlers have a unified signature: (query, user_id, player, data=None)
Note: user_id passed to handlers is actually the player's unique DB id (not telegram_id)
"""
from .api_client import api_client
query = update.callback_query
user_id = query.from_user.id
telegram_id = query.from_user.id
data = query.data.split(':')
action_type = data[0]
# Check if player exists and is alive
player = await database.get_player(user_id)
# Get player by telegram_id and translate to unique id
player = await api_client.get_player(telegram_id)
if not player or player['is_dead']:
await query.answer()
await send_or_edit_with_image(
@@ -141,8 +144,11 @@ async def button_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) ->
)
return
# From now on, use player's unique database id
user_id = player['id']
# Check if player is in combat - restrict most actions
combat = await database.get_combat(user_id)
combat = await api_client.get_combat(user_id)
allowed_in_combat = {
'combat_attack', 'combat_flee', 'combat_use_item_menu',
'combat_use_item', 'combat_back', 'no_op'

View File

@@ -3,7 +3,7 @@ Inventory-related action handlers.
"""
import logging
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
from . import database, keyboards, logic
from . import keyboards, logic
from data.world_loader import game_world
from data.items import ITEMS
@@ -13,9 +13,12 @@ logger = logging.getLogger(__name__)
async def handle_inventory_menu(query, user_id: int, player: dict, data: list = None):
"""Display player inventory with item management options."""
from .utils import format_stat_bar
from .api_client import api_client
await query.answer()
inventory_items = await database.get_inventory(user_id)
# Get inventory from API
inv_result = await api_client.get_inventory(player['id'])
inventory_items = inv_result.get('inventory', [])
current_weight, current_volume = logic.calculate_inventory_load(inventory_items)
max_weight, max_volume = logic.get_player_capacity(inventory_items, player)
@@ -41,35 +44,50 @@ async def handle_inventory_menu(query, user_id: int, player: dict, data: list =
async def handle_inventory_item(query, user_id: int, player: dict, data: list):
"""Show details for a specific inventory item."""
"""Show details for a specific inventory item.
Note: item_db_id is the inventory row id from the API response.
We need to get the full inventory and find the item by id.
"""
from .api_client import api_client
await query.answer()
item_db_id = int(data[1])
item = await database.get_inventory_item(item_db_id)
item_def = ITEMS.get(item['item_id'], {})
emoji = item_def.get('emoji', '')
# Get inventory from API
inv_result = await api_client.get_inventory(user_id)
inventory_items = inv_result.get('inventory', [])
# Find the specific item
item = next((i for i in inventory_items if i['id'] == item_db_id), None)
if not item:
await query.answer("Item not found in inventory", show_alert=True)
return
emoji = item.get('emoji', '')
# Build item details text
text = f"{emoji} <b>{item_def.get('name', 'Unknown')}</b>\n"
text = f"{emoji} <b>{item.get('name', 'Unknown')}</b>\n"
description = item_def.get('description')
description = item.get('description')
if description:
text += f"<i>{description}</i>\n\n"
else:
text += "\n"
text += f"<b>Weight:</b> {item_def.get('weight', 0)} kg | <b>Volume:</b> {item_def.get('volume', 0)} vol\n"
text += f"<b>Weight:</b> {item.get('weight', 0)} kg | <b>Volume:</b> {item.get('volume', 0)} vol\n"
# Add weapon stats if applicable
if item_def.get('type') == 'weapon':
text += f"<b>Damage:</b> {item_def.get('damage_min', 0)}-{item_def.get('damage_max', 0)}\n"
if item.get('type') == 'weapon':
text += f"<b>Damage:</b> {item.get('damage_min', 0)}-{item.get('damage_max', 0)}\n"
# Add consumable effects if applicable
if item_def.get('type') == 'consumable':
if item.get('type') == 'consumable':
effects = []
if item_def.get('hp_restore'):
effects.append(f"❤️ +{item_def.get('hp_restore')} HP")
if item_def.get('stamina_restore'):
effects.append(f"⚡ +{item_def.get('stamina_restore')} Stamina")
if item.get('hp_restore'):
effects.append(f"❤️ +{item.get('hp_restore')} HP")
if item.get('stamina_restore'):
effects.append(f"⚡ +{item.get('stamina_restore')} Stamina")
if effects:
text += f"<b>Effects:</b> {', '.join(effects)}\n"
@@ -85,7 +103,7 @@ async def handle_inventory_item(query, user_id: int, player: dict, data: list):
query,
text=text,
reply_markup=keyboards.inventory_item_actions_keyboard(
item_db_id, item_def, item.get('is_equipped', False), item['quantity']
item_db_id, item, item.get('is_equipped', False), item['quantity']
),
image_path=location_image
)
@@ -94,60 +112,38 @@ async def handle_inventory_item(query, user_id: int, player: dict, data: list):
async def handle_inventory_use(query, user_id: int, player: dict, data: list):
"""Use a consumable item from inventory."""
from .utils import format_stat_bar
from .api_client import api_client
item_db_id = int(data[1])
item = await database.get_inventory_item(item_db_id)
# Get inventory from API to find the item
inv_result = await api_client.get_inventory(user_id)
inventory_items = inv_result.get('inventory', [])
item = next((i for i in inventory_items if i['id'] == item_db_id), None)
if not item:
await query.answer("Item not found.", show_alert=False)
return
item_def = ITEMS.get(item['item_id'], {})
if item_def.get('type') != 'consumable':
if item.get('type') != 'consumable':
await query.answer("This item cannot be used.", show_alert=False)
return
await query.answer()
# Apply item effects
result_parts = []
updates = {}
# Use the API to use the item
result = await api_client.use_item(user_id, item['item_id'])
if 'hp_restore' in item_def:
hp_gain = item_def['hp_restore']
new_hp = min(player['max_hp'], player['hp'] + hp_gain)
actual_gain = new_hp - player['hp']
updates['hp'] = new_hp
if actual_gain > 0:
result_parts.append(f"❤️ HP: +{actual_gain}")
else:
result_parts.append(f"❤️ HP: Already at maximum!")
if 'stamina_restore' in item_def:
stamina_gain = item_def['stamina_restore']
new_stamina = min(player['max_stamina'], player['stamina'] + stamina_gain)
actual_gain = new_stamina - player['stamina']
updates['stamina'] = new_stamina
if actual_gain > 0:
result_parts.append(f"⚡ Stamina: +{actual_gain}")
else:
result_parts.append(f"⚡ Stamina: Already at maximum!")
if updates:
await database.update_player(user_id, updates)
if not result.get('success'):
await query.answer(result.get('message', 'Failed to use item'), show_alert=True)
return
# Refresh player data to get updated stats
player = await database.get_player(user_id)
player = await api_client.get_player_by_id(user_id)
# Remove one item from inventory
if item['quantity'] > 1:
await database.update_inventory_item(item['id'], quantity=item['quantity'] - 1)
else:
await database.remove_item_from_inventory(item['id'])
# Show updated inventory
inventory_items = await database.get_inventory(user_id)
# Get updated inventory
inv_result = await api_client.get_inventory(user_id)
inventory_items = inv_result.get('inventory', [])
current_weight, current_volume = logic.calculate_inventory_load(inventory_items)
max_weight, max_volume = logic.get_player_capacity(inventory_items, player)
@@ -159,13 +155,8 @@ async def handle_inventory_use(query, user_id: int, player: dict, data: list):
text += f"📦 Volume: {current_volume}/{max_volume} vol\n"
text += "━━━━━━━━━━━━━━━━━━━━\n"
# Build result message
emoji = item_def.get('emoji', '')
text += f"<b>✨ Used {emoji} {item_def.get('name')}</b>\n"
if result_parts:
text += "\n".join(result_parts)
else:
text += "No effect."
# Build result message from API response
text += result.get('message', 'Item used.')
location = game_world.get_location(player['location_id'])
location_image = location.image_path if location else None
@@ -181,33 +172,38 @@ async def handle_inventory_use(query, user_id: int, player: dict, data: list):
async def handle_inventory_drop(query, user_id: int, player: dict, data: list):
"""Drop an item from inventory to the world."""
from .api_client import api_client
item_db_id = int(data[1])
drop_amount_str = data[2] if len(data) > 2 else None
item = await database.get_inventory_item(item_db_id)
# Get inventory to find the item
inv_result = await api_client.get_inventory(user_id)
inventory_items = inv_result.get('inventory', [])
item = next((i for i in inventory_items if i['id'] == item_db_id), None)
if not item:
await query.answer("Item not found.", show_alert=False)
return
item_def = ITEMS.get(item['item_id'], {})
# Determine how much to drop
if drop_amount_str is None or drop_amount_str == "all":
await database.drop_item_to_world(item['item_id'], item['quantity'], player['location_id'])
await database.remove_item_from_inventory(item['id'], quantity=item['quantity'])
await query.answer(f"You dropped all {item['quantity']}x {item_def.get('name')}.", show_alert=False)
drop_amount = item['quantity']
else:
drop_amount = int(drop_amount_str)
if drop_amount >= item['quantity']:
await database.drop_item_to_world(item['item_id'], item['quantity'], player['location_id'])
await database.remove_item_from_inventory(item['id'], quantity=item['quantity'])
await query.answer(f"You dropped all {item['quantity']}x {item_def.get('name')}.", show_alert=False)
else:
await database.drop_item_to_world(item['item_id'], drop_amount, player['location_id'])
await database.update_inventory_item(item['id'], quantity=item['quantity'] - drop_amount)
await query.answer(f"You dropped {drop_amount}x {item_def.get('name')}.", show_alert=False)
drop_amount = min(int(drop_amount_str), item['quantity'])
inventory_items = await database.get_inventory(user_id)
# Use API to drop item
result = await api_client.drop_item(user_id, item['item_id'], drop_amount)
if result.get('success'):
await query.answer(result.get('message', f"Dropped {drop_amount}x {item['name']}"), show_alert=False)
else:
await query.answer(result.get('message', 'Failed to drop item'), show_alert=True)
return
# Get updated inventory
inv_result = await api_client.get_inventory(user_id)
inventory_items = inv_result.get('inventory', [])
current_weight, current_volume = logic.calculate_inventory_load(inventory_items)
max_weight, max_volume = logic.get_player_capacity(inventory_items, player)
@@ -232,54 +228,46 @@ async def handle_inventory_drop(query, user_id: int, player: dict, data: list):
async def handle_inventory_equip(query, user_id: int, player: dict, data: list):
"""Equip an item from inventory."""
from .api_client import api_client
item_db_id = int(data[1])
item = await database.get_inventory_item(item_db_id)
# Get inventory to find the item
inv_result = await api_client.get_inventory(user_id)
inventory_items = inv_result.get('inventory', [])
item = next((i for i in inventory_items if i['id'] == item_db_id), None)
if not item:
await query.answer("Item not found.", show_alert=False)
return
item_def = ITEMS.get(item['item_id'], {})
item_slot = item_def.get('slot')
if not item_slot:
if not item.get('equippable'):
await query.answer("This item cannot be equipped.", show_alert=False)
return
# Unequip any item in the same slot
inventory_items = await database.get_inventory(user_id)
for inv_item in inventory_items:
if inv_item.get('is_equipped'):
inv_item_def = ITEMS.get(inv_item['item_id'], {})
if inv_item_def.get('slot') == item_slot:
await database.update_inventory_item(inv_item['id'], is_equipped=False)
# Use API to equip item
result = await api_client.equip_item(user_id, item['item_id'])
# If equipping from a stack, split the stack
if item['quantity'] > 1:
await database.update_inventory_item(item_db_id, quantity=item['quantity'] - 1)
new_item_id = await database.add_equipped_item_to_inventory(user_id, item['item_id'])
await query.answer(f"Equipped {item_def.get('name')}!", show_alert=False)
item = await database.get_inventory_item(new_item_id)
item_db_id = new_item_id
else:
await database.update_inventory_item(item_db_id, is_equipped=True)
await query.answer(f"Equipped {item_def.get('name')}!", show_alert=False)
item = await database.get_inventory_item(item_db_id)
if not result.get('success'):
await query.answer(result.get('message', 'Failed to equip item'), show_alert=True)
return
await query.answer(result.get('message', f"Equipped {item['name']}!"), show_alert=False)
# Refresh the item view
emoji = item_def.get('emoji', '')
text = f"{emoji} <b>{item_def.get('name', 'Unknown')}</b>\n"
emoji = item.get('emoji', '')
text = f"{emoji} <b>{item.get('name', 'Unknown')}</b>\n"
description = item_def.get('description')
description = item.get('description')
if description:
text += f"<i>{description}</i>\n\n"
else:
text += "\n"
text += f"<b>Weight:</b> {item_def.get('weight', 0)} kg | <b>Volume:</b> {item_def.get('volume', 0)} vol\n"
text += f"<b>Weight:</b> {item.get('weight', 0)} kg | <b>Volume:</b> {item.get('volume', 0)} vol\n"
if item_def.get('type') == 'weapon':
text += f"<b>Damage:</b> {item_def.get('damage_min', 0)}-{item_def.get('damage_max', 0)}\n"
if item.get('type') == 'weapon':
text += f"<b>Damage:</b> {item.get('damage_min', 0)}-{item.get('damage_max', 0)}\n"
text += "\n✅ <b>Currently Equipped</b>"
@@ -291,7 +279,7 @@ async def handle_inventory_equip(query, user_id: int, player: dict, data: list):
query,
text=text,
reply_markup=keyboards.inventory_item_actions_keyboard(
item_db_id, item_def, True, item['quantity']
item_db_id, item, True, item['quantity']
),
image_path=location_image
)
@@ -299,52 +287,42 @@ async def handle_inventory_equip(query, user_id: int, player: dict, data: list):
async def handle_inventory_unequip(query, user_id: int, player: dict, data: list):
"""Unequip an item."""
from .api_client import api_client
item_db_id = int(data[1])
item = await database.get_inventory_item(item_db_id)
# Get inventory to find the item
inv_result = await api_client.get_inventory(user_id)
inventory_items = inv_result.get('inventory', [])
item = next((i for i in inventory_items if i['id'] == item_db_id), None)
if not item:
await query.answer("Item not found.", show_alert=False)
return
item_def = ITEMS.get(item['item_id'], {})
# Use API to unequip item
result = await api_client.unequip_item(user_id, item['item_id'])
# Check if there's an existing unequipped stack
inventory_items = await database.get_inventory(user_id)
existing_stack = None
for inv_item in inventory_items:
if (inv_item['item_id'] == item['item_id'] and
not inv_item.get('is_equipped') and
inv_item['id'] != item_db_id):
existing_stack = inv_item
break
if not result.get('success'):
await query.answer(result.get('message', 'Failed to unequip item'), show_alert=True)
return
if existing_stack:
# Merge into existing stack
await database.update_inventory_item(existing_stack['id'], quantity=existing_stack['quantity'] + 1)
await database.remove_item_from_inventory(item_db_id)
await query.answer(f"Unequipped {item_def.get('name')}.", show_alert=False)
item = await database.get_inventory_item(existing_stack['id'])
item_db_id = existing_stack['id']
else:
# Just unequip
await database.update_inventory_item(item_db_id, is_equipped=False)
await query.answer(f"Unequipped {item_def.get('name')}.", show_alert=False)
item = await database.get_inventory_item(item_db_id)
await query.answer(result.get('message', f"Unequipped {item['name']}."), show_alert=False)
# Refresh the item view
emoji = item_def.get('emoji', '')
text = f"{emoji} <b>{item_def.get('name', 'Unknown')}</b>\n"
emoji = item.get('emoji', '')
text = f"{emoji} <b>{item.get('name', 'Unknown')}</b>\n"
description = item_def.get('description')
description = item.get('description')
if description:
text += f"<i>{description}</i>\n\n"
else:
text += "\n"
text += f"<b>Weight:</b> {item_def.get('weight', 0)} kg | <b>Volume:</b> {item_def.get('volume', 0)} vol\n"
text += f"<b>Weight:</b> {item.get('weight', 0)} kg | <b>Volume:</b> {item.get('volume', 0)} vol\n"
if item_def.get('type') == 'weapon':
text += f"<b>Damage:</b> {item_def.get('damage_min', 0)}-{item_def.get('damage_max', 0)}\n"
if item.get('type') == 'weapon':
text += f"<b>Damage:</b> {item.get('damage_min', 0)}-{item.get('damage_max', 0)}\n"
location = game_world.get_location(player['location_id'])
location_image = location.image_path if location else None
@@ -354,7 +332,7 @@ async def handle_inventory_unequip(query, user_id: int, player: dict, data: list
query,
text=text,
reply_markup=keyboards.inventory_item_actions_keyboard(
item_db_id, item_def, False, item['quantity']
item_db_id, item, False, item['quantity']
),
image_path=location_image
)

View File

@@ -17,12 +17,13 @@ async def move_keyboard(current_location_id: str, player_id: int) -> InlineKeybo
[ Other exits (inside, down, etc.) ]
[ Back ]
"""
from bot import database, logic
from bot import logic
from bot.api_client import api_client
keyboard = []
location = game_world.get_location(current_location_id)
player = await database.get_player(player_id)
inventory = await database.get_inventory(player_id)
player = await api_client.get_player(player_id)
inventory = await api_client.get_inventory(player_id)
if location and player:
# Dictionary to hold direction buttons
@@ -157,7 +158,7 @@ async def move_keyboard(current_location_id: str, player_id: int) -> InlineKeybo
return InlineKeyboardMarkup(keyboard)
async def inspect_keyboard(location_id: str, dropped_items: list, wandering_enemies: list = None) -> InlineKeyboardMarkup:
from bot import database
from bot.api_client import api_client
from data.npcs import NPCS
keyboard = []
@@ -191,7 +192,7 @@ async def inspect_keyboard(location_id: str, dropped_items: list, wandering_enem
has_available_action = False
for action_id in interactable.actions.keys():
cooldown_key = f"{instance_id}:{action_id}"
if await database.get_cooldown(cooldown_key) == 0:
if await api_client.get_cooldown(cooldown_key) == 0:
has_available_action = True
break
if not has_available_action and len(interactable.actions) > 0:
@@ -218,7 +219,7 @@ async def inspect_keyboard(location_id: str, dropped_items: list, wandering_enem
keyboard.append(row)
# Show player corpse bags
player_corpses = await database.get_player_corpses_in_location(location_id)
player_corpses = await api_client.get_player_corpses_in_location(location_id)
if player_corpses:
keyboard.append([InlineKeyboardButton("--- Fallen survivors ---", callback_data="no_op")])
row = []
@@ -235,7 +236,7 @@ async def inspect_keyboard(location_id: str, dropped_items: list, wandering_enem
keyboard.append(row)
# Show NPC corpses
npc_corpses = await database.get_npc_corpses_in_location(location_id)
npc_corpses = await api_client.get_npc_corpses_in_location(location_id)
if npc_corpses:
if not player_corpses: # Only add separator if not already added
keyboard.append([InlineKeyboardButton("--- Corpses ---", callback_data="no_op")])
@@ -308,7 +309,7 @@ def pickup_options_keyboard(item_id: int, item_name: str, quantity: int) -> Inli
return InlineKeyboardMarkup(keyboard)
async def actions_keyboard(location_id: str, instance_id: str) -> InlineKeyboardMarkup:
from bot import database
from bot.api_client import api_client
keyboard = []
location = game_world.get_location(location_id)
@@ -318,7 +319,7 @@ async def actions_keyboard(location_id: str, instance_id: str) -> InlineKeyboard
if interactable:
for action_id, action in interactable.actions.items():
cooldown_key = f"{instance_id}:{action_id}"
cooldown = await database.get_cooldown(cooldown_key)
cooldown = await api_client.get_cooldown(cooldown_key)
label = action.label
# Add stamina cost to the label
if action.stamina_cost > 0:
@@ -487,7 +488,7 @@ def inventory_item_actions_keyboard(item_db_id: int, item_def: dict, is_equipped
async def combat_keyboard(player_id: int) -> InlineKeyboardMarkup:
"""Create combat action keyboard."""
from bot import database
from bot.api_client import api_client
keyboard = []
# Attack option
@@ -497,20 +498,23 @@ async def combat_keyboard(player_id: int) -> InlineKeyboardMarkup:
keyboard.append([InlineKeyboardButton("🏃 Try to Flee", callback_data="combat_flee")])
# Use item option (show consumables)
inventory_items = await database.get_inventory(player_id)
inventory_items = await api_client.get_inventory(player_id)
consumables = [item for item in inventory_items if ITEMS.get(item['item_id'], {}).get('type') == 'consumable']
if consumables:
keyboard.append([InlineKeyboardButton("💊 Use Item", callback_data="combat_use_item_menu")])
# Profile button (no effect on turn, just info)
keyboard.append([InlineKeyboardButton("👤 Profile", callback_data="profile")])
return InlineKeyboardMarkup(keyboard)
async def combat_items_keyboard(player_id: int) -> InlineKeyboardMarkup:
"""Show consumable items during combat."""
from bot import database
from bot.api_client import api_client
keyboard = []
inventory_items = await database.get_inventory(player_id)
inventory_items = await api_client.get_inventory(player_id)
consumables = [item for item in inventory_items if ITEMS.get(item['item_id'], {}).get('type') == 'consumable']
if consumables:

View File

@@ -52,13 +52,13 @@ async def can_add_item_to_inventory(user_id: int, item_id: str, quantity: int) -
Check if an item can be added to the player's inventory.
Returns (can_add, reason_if_not)
"""
from . import database
from .api_client import api_client
player = await database.get_player(user_id)
player = await api_client.get_player(user_id)
if not player:
return False, "Player not found."
inventory = await database.get_inventory(user_id)
inventory = await api_client.get_inventory(user_id)
item_def = ITEMS.get(item_id)
if not item_def:

View File

@@ -7,7 +7,7 @@ import logging
import os
from telegram import InlineKeyboardMarkup, InputMediaPhoto
from telegram.error import BadRequest
from . import database
from .api_client import api_client
logger = logging.getLogger(__name__)
@@ -30,7 +30,7 @@ async def send_or_edit_with_image(query, text: str, reply_markup: InlineKeyboard
if image_path:
# Get or upload image
cached_file_id = await database.get_cached_image(image_path)
cached_file_id = await api_client.get_cached_image(image_path)
if not cached_file_id and os.path.exists(image_path):
# Upload new image
@@ -44,7 +44,7 @@ async def send_or_edit_with_image(query, text: str, reply_markup: InlineKeyboard
)
if temp_msg.photo:
cached_file_id = temp_msg.photo[-1].file_id
await database.cache_image(image_path, cached_file_id)
await api_client.cache_image(image_path, cached_file_id)
# Delete old message to keep chat clean
try:
await current_message.delete()

View File

@@ -2,7 +2,8 @@
Pickup and item collection handlers.
"""
import logging
from . import database, keyboards, logic
from . import keyboards, logic
from .api_client import api_client
from data.world_loader import game_world
from data.items import ITEMS
@@ -12,14 +13,14 @@ logger = logging.getLogger(__name__)
async def handle_pickup_menu(query, user_id: int, player: dict, data: list):
"""Show pickup options for a dropped item."""
dropped_item_id = int(data[1])
item_to_pickup = await database.get_dropped_item(dropped_item_id)
item_to_pickup = await api_client.get_dropped_item(dropped_item_id)
if not item_to_pickup:
await query.answer("Someone already picked that up!", show_alert=False)
location_id = player['location_id']
location = game_world.get_location(location_id)
dropped_items = await database.get_dropped_items_in_location(location_id)
wandering_enemies = await database.get_wandering_enemies_in_location(location_id)
dropped_items = await api_client.get_dropped_items_in_location(location_id)
wandering_enemies = await api_client.get_wandering_enemies_in_location(location_id)
keyboard = await keyboards.inspect_keyboard(location_id, dropped_items, wandering_enemies)
image_path = location.image_path if location else None
@@ -64,13 +65,13 @@ async def handle_pickup(query, user_id: int, player: dict, data: list):
dropped_item_id = int(data[1])
pickup_amount_str = data[2] if len(data) > 2 else "all"
item_to_pickup = await database.get_dropped_item(dropped_item_id)
item_to_pickup = await api_client.get_dropped_item(dropped_item_id)
if not item_to_pickup:
await query.answer("Someone already picked that up!", show_alert=False)
location_id = player['location_id']
location = game_world.get_location(location_id)
dropped_items = await database.get_dropped_items_in_location(location_id)
wandering_enemies = await database.get_wandering_enemies_in_location(location_id)
dropped_items = await api_client.get_dropped_items_in_location(location_id)
wandering_enemies = await api_client.get_wandering_enemies_in_location(location_id)
keyboard = await keyboards.inspect_keyboard(location_id, dropped_items, wandering_enemies)
image_path = location.image_path if location else None
@@ -99,20 +100,20 @@ async def handle_pickup(query, user_id: int, player: dict, data: list):
return
# Add to inventory
await database.add_item_to_inventory(user_id, item_to_pickup['item_id'], pickup_amount)
await api_client.add_item_to_inventory(user_id, item_to_pickup['item_id'], pickup_amount)
# Update or remove dropped item
remaining = item_to_pickup['quantity'] - pickup_amount
item_def = ITEMS.get(item_to_pickup['item_id'], {})
if remaining > 0:
await database.update_dropped_item(dropped_item_id, remaining)
await api_client.update_dropped_item(dropped_item_id, remaining)
await query.answer(
f"Picked up {pickup_amount}x {item_def.get('name', 'item')}. {remaining} remaining.",
show_alert=False
)
else:
await database.remove_dropped_item(dropped_item_id)
await api_client.remove_dropped_item(dropped_item_id)
await query.answer(
f"Picked up {pickup_amount}x {item_def.get('name', 'item')}.",
show_alert=False
@@ -121,8 +122,8 @@ async def handle_pickup(query, user_id: int, player: dict, data: list):
# Return to inspect area
location_id = player['location_id']
location = game_world.get_location(location_id)
dropped_items = await database.get_dropped_items_in_location(location_id)
wandering_enemies = await database.get_wandering_enemies_in_location(location_id)
dropped_items = await api_client.get_dropped_items_in_location(location_id)
wandering_enemies = await api_client.get_wandering_enemies_in_location(location_id)
keyboard = await keyboards.inspect_keyboard(location_id, dropped_items, wandering_enemies)
image_path = location.image_path if location else None

View File

@@ -3,7 +3,7 @@ Profile and character stat management handlers.
"""
import logging
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
from . import database, keyboards
from . import keyboards
from data.world_loader import game_world
logger = logging.getLogger(__name__)
@@ -48,7 +48,22 @@ async def handle_profile(query, user_id: int, player: dict, data: list = None):
profile_text += f"<b>Combat:</b>\n"
profile_text += f"⚔️ Base Damage: {5 + player['strength'] // 2 + player['level']}\n"
profile_text += f"🛡️ Flee Chance: {int((0.5 + player['agility'] / 100) * 100)}%\n"
profile_text += f"💚 Stamina Regen: {1 + player['endurance'] // 10}/cycle\n"
profile_text += f"💚 Stamina Regen: {1 + player['endurance'] // 10}/cycle\n\n"
# Show status effects if any
try:
from .api_client import api_client
status_effects = await api_client.get_player_status_effects(user_id)
if status_effects:
from bot.status_utils import get_status_details
from .api_client import api_client
# Check if player is in combat
combat_state = await api_client.get_combat(user_id)
in_combat = combat_state is not None
profile_text += f"<b>Status Effects:</b>\n"
profile_text += get_status_details(status_effects, in_combat=in_combat) + "\n\n"
except:
pass # Status effects not critical, skip if error
location = game_world.get_location(player['location_id'])
location_image = location.image_path if location else None
@@ -124,7 +139,8 @@ async def handle_spend_point(query, user_id: int, player: dict, data: list):
new_value = player[db_field] + increase
new_unspent = unspent - 1
await database.update_player(user_id, {
from .api_client import api_client
await api_client.update_player(user_id, {
db_field: new_value,
'unspent_points': new_unspent
})

119
bot/status_utils.py Normal file
View File

@@ -0,0 +1,119 @@
"""
Status effect utilities for display and management.
"""
from collections import defaultdict
def stack_status_effects(effects: list) -> dict:
"""
Stack status effects by name, summing damage and counting stacks.
Args:
effects: List of dicts with keys: effect_name, effect_icon, damage_per_tick, ticks_remaining
Returns:
Dict with keys: effect_name -> {icon, total_damage, stacks, min_ticks, effects: [list of effect dicts]}
"""
stacked = defaultdict(lambda: {
'icon': '',
'total_damage': 0,
'stacks': 0,
'min_ticks': float('inf'),
'max_ticks': 0,
'effects': []
})
for effect in effects:
name = effect['effect_name']
stacked[name]['icon'] = effect['effect_icon']
stacked[name]['total_damage'] += effect.get('damage_per_tick', 0)
stacked[name]['stacks'] += 1
stacked[name]['min_ticks'] = min(stacked[name]['min_ticks'], effect['ticks_remaining'])
stacked[name]['max_ticks'] = max(stacked[name]['max_ticks'], effect['ticks_remaining'])
stacked[name]['effects'].append(effect)
return dict(stacked)
def get_status_summary(effects: list, in_combat: bool = False) -> str:
"""
Generate compact status summary for display in menus.
Args:
effects: List of status effect dicts
in_combat: If True, show "turns" instead of "cycles"
Returns:
String like "Statuses: 🩸 (-4), ☣️ (-3)" or empty string if no effects
"""
if not effects:
return ""
stacked = stack_status_effects(effects)
if not stacked:
return ""
parts = []
for name, data in stacked.items():
if data['total_damage'] > 0:
parts.append(f"{data['icon']} (-{data['total_damage']})")
else:
parts.append(f"{data['icon']}")
return "Statuses: " + ", ".join(parts)
def get_status_details(effects: list, in_combat: bool = False) -> str:
"""
Generate detailed status display for profile menu.
Args:
effects: List of status effect dicts
in_combat: If True, show "turns" instead of "cycles"
Returns:
Multi-line string with detailed effect info
"""
if not effects:
return "No active status effects."
stacked = stack_status_effects(effects)
lines = []
for name, data in stacked.items():
# Build effect line
effect_line = f"{data['icon']} {name.replace('_', ' ').title()}"
# Add damage info
if data['total_damage'] > 0:
effect_line += f": -{data['total_damage']} HP/{'turn' if in_combat else 'cycle'}"
# Add tick info
if data['stacks'] == 1:
tick_unit = 'turn' if in_combat else 'cycle'
tick_count = data['min_ticks']
effect_line += f" ({tick_count} {tick_unit}{'s' if tick_count != 1 else ''} left)"
else:
tick_unit = 'turns' if in_combat else 'cycles'
if data['min_ticks'] == data['max_ticks']:
effect_line += f" (×{data['stacks']}, {data['min_ticks']} {tick_unit} left)"
else:
effect_line += f" (×{data['stacks']}, {data['min_ticks']}-{data['max_ticks']} {tick_unit} left)"
lines.append(effect_line)
return "\n".join(lines)
def calculate_status_damage(effects: list) -> int:
"""
Calculate total damage from all status effects.
Args:
effects: List of status effect dicts
Returns:
Total damage per tick
"""
return sum(effect.get('damage_per_tick', 0) for effect in effects)