Files
echoes-of-the-ash/bot/handlers.py
Joan c0783340b0 Unify all handler signatures and simplify router
- Standardize all handlers to signature: (query, user_id, player, data=None)
- Replace 125-line if/elif chain with HANDLER_MAP dictionary
- Reduce router code by 90 lines (72% reduction)
- Add HANDLER_MAP registry for cleaner organization
- Enable future auto-discovery and decorator patterns
- Maintain full backward compatibility
- Document changes in HANDLER_REFACTORING_V2.md

Benefits:
- O(1) handler lookup vs O(n) if/elif chain
- Add new handlers by just updating the map
- Consistent signature makes code easier to understand
- Opens doors for middleware and hooks
2025-10-20 12:22:07 +02:00

363 lines
13 KiB
Python

"""
Main handlers for the Telegram bot.
This module contains the core message routing and utility functions.
All specific action handlers are organized in separate modules.
"""
import logging
import os
import json
from telegram import Update, InlineKeyboardMarkup, InputMediaPhoto
from telegram.ext import ContextTypes
from telegram.error import BadRequest
from . import database, keyboards
from .utils import admin_only
from data.world_loader import game_world
# Import organized action handlers
from .action_handlers import (
get_player_status_text,
handle_inspect_area,
handle_attack_wandering,
handle_inspect_interactable,
handle_action,
handle_main_menu,
handle_move_menu,
handle_move
)
from .inventory_handlers import (
handle_inventory_menu,
handle_inventory_item,
handle_inventory_use,
handle_inventory_drop,
handle_inventory_equip,
handle_inventory_unequip
)
from .pickup_handlers import (
handle_pickup_menu,
handle_pickup
)
from .combat_handlers import (
handle_combat_attack,
handle_combat_flee,
handle_combat_use_item_menu,
handle_combat_use_item,
handle_combat_back
)
from .profile_handlers import (
handle_profile,
handle_spend_points_menu,
handle_spend_point
)
from .corpse_handlers import (
handle_loot_player_corpse,
handle_take_corpse_item,
handle_scavenge_npc_corpse,
handle_scavenge_corpse_item
)
logger = logging.getLogger(__name__)

# ============================================================================
# HANDLER REGISTRY
# ============================================================================
# Callback action name -> coroutine that serves it. The router looks the
# action up here, so wiring in a new action only needs a new entry.
# Every handler is called as: await handler(query, user_id, player, data)
HANDLER_MAP = {
    # --- World inspection and interaction ---
    'inspect_area': handle_inspect_area,
    'inspect_area_menu': handle_inspect_area,  # same handler as 'inspect_area'
    'attack_wandering': handle_attack_wandering,
    'inspect': handle_inspect_interactable,
    'action': handle_action,

    # --- Menus and movement ---
    'main_menu': handle_main_menu,
    'move_menu': handle_move_menu,
    'move': handle_move,

    # --- Profile and stat points ---
    'profile': handle_profile,
    'spend_points_menu': handle_spend_points_menu,
    'spend_point': handle_spend_point,

    # --- Inventory ---
    'inventory_menu': handle_inventory_menu,
    'inventory_item': handle_inventory_item,
    'inventory_use': handle_inventory_use,
    'inventory_drop': handle_inventory_drop,
    'inventory_equip': handle_inventory_equip,
    'inventory_unequip': handle_inventory_unequip,

    # --- Picking items up ---
    'pickup_menu': handle_pickup_menu,
    'pickup': handle_pickup,

    # --- Combat ---
    'combat_attack': handle_combat_attack,
    'combat_flee': handle_combat_flee,
    'combat_use_item_menu': handle_combat_use_item_menu,
    'combat_use_item': handle_combat_use_item,
    'combat_back': handle_combat_back,

    # --- Corpses ---
    'loot_player_corpse': handle_loot_player_corpse,
    'take_corpse_item': handle_take_corpse_item,
    'scavenge_npc_corpse': handle_scavenge_npc_corpse,
    'scavenge_corpse_item': handle_scavenge_corpse_item,
}
# ============================================================================
# UTILITY FUNCTIONS
# ============================================================================
async def _delete_quietly(message) -> None:
    """Best-effort delete of *message*; a failure is logged at debug level and ignored."""
    try:
        await message.delete()
    except Exception as exc:
        # Deliberately not a bare ``except:`` so that task cancellation
        # (asyncio.CancelledError) and KeyboardInterrupt still propagate.
        logger.debug("Could not delete message: %s", exc)


async def _show_text_only(query, text, reply_markup, parse_mode, has_photo) -> None:
    """Show *text* without an image, editing in place when the current message is text-only."""
    current_message = query.message
    if has_photo:
        # Current message has a photo: delete it and send a text-only message.
        await _delete_quietly(current_message)
        await current_message.reply_html(text=text, reply_markup=reply_markup)
        return

    # Both text-only, just edit.
    try:
        await query.edit_message_text(text=text, reply_markup=reply_markup, parse_mode=parse_mode)
    except BadRequest as e:
        # An unchanged message is not an error; anything else gets a fresh message.
        if "Message is not modified" not in str(e):
            await current_message.reply_html(text=text, reply_markup=reply_markup)


async def send_or_edit_with_image(query, text: str, reply_markup: InlineKeyboardMarkup,
                                  image_path: str = None, parse_mode='HTML'):
    """
    Send a message with an image (as caption) or edit the existing message.

    Uses edit_message_media for smooth transitions when changing images.

    Args:
        query: Callback query whose ``message`` is edited, replaced or replied to.
        text: Message text, used as the photo caption when an image is shown.
        reply_markup: Inline keyboard to attach (may be None).
        image_path: Path of the image to show; falsy means "text only".
        parse_mode: Parse mode passed to the caption/text edit calls.

    Behaviour:
        * No ``image_path``: text-only message (edit in place, or replace a
          photo message with a text one).
        * Image not cached but present on disk: upload it as a new photo
          message, cache the resulting file_id, delete the old message.
        * Image cached: edit the caption if the current message already shows
          that photo, swap the media if it shows a different photo, otherwise
          delete the current message and send the cached photo.
        * Image requested but unavailable (not cached and missing on disk, or
          the upload failed): fall back to the text-only path so the user
          still gets a response.
    """
    current_message = query.message
    has_photo = bool(current_message.photo)

    if not image_path:
        # No image requested
        await _show_text_only(query, text, reply_markup, parse_mode, has_photo)
        return

    # Get or upload image
    cached_file_id = await database.get_cached_image(image_path)

    if not cached_file_id and os.path.exists(image_path):
        # Upload new image
        try:
            with open(image_path, 'rb') as img_file:
                uploaded = await current_message.reply_photo(
                    photo=img_file,
                    caption=text,
                    reply_markup=reply_markup,
                    parse_mode=parse_mode
                )
        except Exception as e:
            logger.error(f"Error uploading image: {e}")
        else:
            if uploaded.photo:
                try:
                    await database.cache_image(image_path, uploaded.photo[-1].file_id)
                except Exception as e:
                    # The photo is already delivered; a cache failure only
                    # means the next call uploads the file again.
                    logger.error(f"Error caching image: {e}")
            # Delete old message to keep chat clean
            await _delete_quietly(current_message)
            return

    if not cached_file_id:
        # Nothing has been sent or deleted yet, so degrade to text instead of
        # silently doing nothing.
        await _show_text_only(query, text, reply_markup, parse_mode, has_photo)
        return

    if has_photo:
        current_file_id = current_message.photo[-1].file_id
        if current_file_id == cached_file_id:
            # Same image, just edit caption
            try:
                await query.edit_message_caption(
                    caption=text,
                    reply_markup=reply_markup,
                    parse_mode=parse_mode
                )
                return
            except BadRequest as e:
                if "Message is not modified" in str(e):
                    return
                # Any other BadRequest: fall through and send a new photo.
        else:
            # Different image - use edit_message_media for smooth transition
            try:
                media = InputMediaPhoto(
                    media=cached_file_id,
                    caption=text,
                    parse_mode=parse_mode
                )
                await query.edit_message_media(
                    media=media,
                    reply_markup=reply_markup
                )
                return
            except Exception as e:
                logger.error(f"Error editing message media: {e}")
                # Fall through and send a new photo.
    else:
        # Current message has no photo - need to delete and send new
        await _delete_quietly(current_message)

    try:
        await current_message.reply_photo(
            photo=cached_file_id,
            caption=text,
            reply_markup=reply_markup,
            parse_mode=parse_mode
        )
    except Exception as e:
        logger.error(f"Error sending cached image: {e}")
# ============================================================================
# COMMAND HANDLERS
# ============================================================================
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /start: create the player on first contact, then show their status.

    A welcome message is sent only when no player record existed yet. The
    status text is then delivered as a photo caption when the player's
    location has an image that is either already cached or present on disk
    (a freshly uploaded image gets its file_id cached), and as a plain HTML
    message in every other case.
    """
    tg_user = update.effective_user
    message = update.message

    async def reply_text_only() -> None:
        # Status screen without a picture.
        await message.reply_html(
            status_text,
            reply_markup=keyboards.main_menu_keyboard()
        )

    existing = await database.get_player(tg_user.id)
    if not existing:
        await database.create_player(tg_user.id, tg_user.first_name)
        await message.reply_html(
            f"Welcome, {tg_user.mention_html()}! Your story is just beginning."
        )

    # Read the record again so a player created just above is picked up too.
    player = await database.get_player(tg_user.id)
    status_text = await get_player_status_text(tg_user.id)
    location = game_world.get_location(player['location_id'])

    image_path = location.image_path if location else None
    if not image_path:
        await reply_text_only()
        return

    known_file_id = await database.get_cached_image(image_path)
    if known_file_id:
        # Reuse the cached file_id instead of reading the file from disk.
        await message.reply_photo(
            photo=known_file_id,
            caption=status_text,
            reply_markup=keyboards.main_menu_keyboard(),
            parse_mode='HTML'
        )
        return

    if not os.path.exists(image_path):
        await reply_text_only()
        return

    with open(image_path, 'rb') as img_file:
        sent = await message.reply_photo(
            photo=img_file,
            caption=status_text,
            reply_markup=keyboards.main_menu_keyboard(),
            parse_mode='HTML'
        )
        if sent.photo:
            # Cache the file_id of the last entry in the returned photo list.
            await database.cache_image(image_path, sent.photo[-1].file_id)
@admin_only
async def export_map(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply with the map data serialised as an attached ``map_data.json`` file."""
    from data.world_loader import export_map_data
    from io import BytesIO

    # Serialise with 2-space indentation and wrap the bytes in an in-memory file.
    encoded = json.dumps(export_map_data(), indent=2).encode('utf-8')
    attachment = BytesIO(encoded)
    attachment.name = "map_data.json"

    caption = (
        "🗺️ Game Map Data\n\n"
        "This JSON file contains all locations, coordinates, and connections.\n"
        "You can use it to visualize the game map in external tools."
    )
    await update.message.reply_document(
        document=attachment,
        filename="map_data.json",
        caption=caption
    )
@admin_only
async def spawn_stats(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply with wandering-enemy spawn statistics (debug command).

    Shows the total number of active enemies followed by one line per
    location, or a placeholder line when the per-location mapping is empty.
    """
    from bot.spawn_manager import get_spawn_stats

    stats = await get_spawn_stats()

    # Collect the pieces and join once at the end.
    pieces = [
        "📊 <b>Wandering Enemy Statistics</b>\n\n",
        f"<b>Total Active Enemies:</b> {stats['total_active']}\n\n",
    ]

    per_location = stats['by_location']
    if per_location:
        pieces.append("<b>Enemies by Location:</b>\n")
        for loc_id, count in per_location.items():
            location = game_world.get_location(loc_id)
            # Fall back to the raw id when the location lookup returns nothing.
            loc_name = location.name if location else loc_id
            pieces.append(f"{loc_name}: {count}\n")
    else:
        pieces.append("<i>No wandering enemies currently active.</i>")

    await update.message.reply_html("".join(pieces))
# ============================================================================
# BUTTON CALLBACK ROUTER
# ============================================================================
async def button_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """
    Main router for button callbacks.

    Splits the callback data on ':' and uses the first field as the action
    type, then delegates to the matching entry in HANDLER_MAP.
    All handlers have a unified signature: (query, user_id, player, data=None)

    Checks, in order:
        1. Missing or dead player: show the death message and stop.
        2. Player in combat: only the combat actions and 'no_op' are allowed.
        3. 'no_op': acknowledge the callback and do nothing else.
        4. Known action: run its handler; a failure is logged and reported
           to the user as an alert.
        5. Anything else: log a warning and answer "Unknown action".
    """
    query = update.callback_query
    user_id = query.from_user.id
    # Callback data is optional; without it the action type becomes '' and is
    # reported as an unknown action instead of raising AttributeError.
    data = (query.data or '').split(':')
    action_type = data[0]

    # Check if player exists and is alive
    player = await database.get_player(user_id)
    if not player or player['is_dead']:
        await query.answer()
        await send_or_edit_with_image(
            query,
            text="💀 Your journey has ended. You died in the wasteland. Create a new character with /start to begin again.",
            reply_markup=None
        )
        return

    # Check if player is in combat - restrict most actions
    combat = await database.get_combat(user_id)
    allowed_in_combat = {
        'combat_attack', 'combat_flee', 'combat_use_item_menu',
        'combat_use_item', 'combat_back', 'no_op'
    }
    if combat and action_type not in allowed_in_combat:
        await query.answer("You're in combat! Focus on the fight!", show_alert=False)
        return

    # Route to appropriate handler
    if action_type == 'no_op':
        await query.answer()
        return

    handler = HANDLER_MAP.get(action_type)
    if not handler:
        logger.warning(f"Unknown action type: {action_type}")
        await query.answer("Unknown action", show_alert=False)
        return

    try:
        await handler(query, user_id, player, data)
    except Exception as e:
        logger.error(f"Error handling button action {action_type}: {e}", exc_info=True)
        # Answering can itself fail; keep that failure from escaping this
        # error handler.
        try:
            await query.answer("An error occurred. Please try again.", show_alert=True)
        except Exception as answer_error:
            logger.warning(
                "Could not notify user about failed action %s: %s",
                action_type, answer_error
            )