Extract functionality from handlers.py into focused modules: New Modules: - bot/message_utils.py (120 lines) - Telegram message handling * send_or_edit_with_image() - Smart message/image transitions * Image caching and upload logic - bot/commands.py (110 lines) - Slash command handlers * start() - Player initialization * export_map() - Admin map export * spawn_stats() - Admin spawn statistics Refactored: - bot/handlers.py: 365 → 177 lines (51% reduction) * Now contains only routing logic * Re-exports commands for backward compatibility * Cleaner, more focused responsibility Benefits: - Single Responsibility Principle achieved - Better testability (can test modules independently) - Improved organization and discoverability - Easier maintenance (changes isolated to specific files) - Full backward compatibility maintained Documented in MODULE_SEPARATION.md
107 lines
3.7 KiB
Python
107 lines
3.7 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
Command handlers for the Telegram bot.
|
|
Handles slash commands like /start, /export_map, /spawn_stats.
|
|
"""
|
|
import logging
|
|
import os
|
|
import json
|
|
from io import BytesIO
|
|
from telegram import Update
|
|
from telegram.ext import ContextTypes
|
|
from . import database, keyboards
|
|
from .utils import admin_only
|
|
from .action_handlers import get_player_status_text
|
|
from data.world_loader import game_world
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /start: register the sender if needed, then show their status.

    When ``database.get_player`` returns nothing for the sender, a player is
    created and a welcome message is sent first. The status text is then sent
    with the main menu keyboard: as a photo caption when the player's current
    location has an image that is either cached or present on disk, and as a
    plain HTML message otherwise.

    Args:
        update: Incoming Telegram update carrying the /start command.
        context: Handler context supplied by python-telegram-bot (unused).
    """
    # effective_message also covers edited messages, for which update.message
    # is None; CommandHandler dispatches edited commands by default.
    message = update.effective_message
    user = update.effective_user
    if message is None or user is None:
        return

    player = await database.get_player(user.id)
    if not player:
        await database.create_player(user.id, user.first_name)
        await message.reply_html(
            f"Welcome, {user.mention_html()}! Your story is just beginning."
        )
        # Re-read only after creation; an existing player is already loaded.
        player = await database.get_player(user.id)

    status_text = await get_player_status_text(user.id)
    location = game_world.get_location(player['location_id'])
    image_path = location.image_path if location else None

    if image_path:
        cached_file_id = await database.get_cached_image(image_path)
        if cached_file_id:
            # Reuse the stored Telegram file id instead of uploading again.
            await message.reply_photo(
                photo=cached_file_id,
                caption=status_text,
                reply_markup=keyboards.main_menu_keyboard(),
                parse_mode='HTML'
            )
            return

        if os.path.exists(image_path):
            with open(image_path, 'rb') as img_file:
                sent = await message.reply_photo(
                    photo=img_file,
                    caption=status_text,
                    reply_markup=keyboards.main_menu_keyboard(),
                    parse_mode='HTML'
                )
            if sent.photo:
                # Store the last photo size's file id for later reuse.
                await database.cache_image(image_path, sent.photo[-1].file_id)
            return

    # No location image configured, or the file is neither cached nor on disk.
    await message.reply_html(
        status_text,
        reply_markup=keyboards.main_menu_keyboard()
    )
|
|
|
|
|
|
@admin_only
async def export_map(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Send the game map as a JSON document for external visualization.

    The data returned by ``export_map_data()`` is serialized with a two-space
    indent, encoded as UTF-8 and sent back as ``map_data.json``.

    Args:
        update: Incoming Telegram update carrying the command.
        context: Handler context supplied by python-telegram-bot (unused).
    """
    from data.world_loader import export_map_data

    # effective_message also covers edited messages, for which update.message
    # is None; CommandHandler dispatches edited commands by default.
    message = update.effective_message
    if message is None:
        return

    payload = json.dumps(export_map_data(), indent=2).encode('utf-8')

    # Wrap the bytes in an in-memory file so nothing is written to disk.
    document = BytesIO(payload)
    document.name = "map_data.json"

    await message.reply_document(
        document=document,
        filename="map_data.json",
        caption=(
            "🗺️ Game Map Data\n\n"
            "This JSON file contains all locations, coordinates, and connections.\n"
            "You can use it to visualize the game map in external tools."
        )
    )
|
|
|
|
|
|
@admin_only
async def spawn_stats(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply with wandering enemy spawn statistics (debug command).

    Reads ``total_active`` and ``by_location`` from the mapping returned by
    ``get_spawn_stats()`` and formats them as an HTML message. Each location id
    is shown by its location name when ``game_world`` knows it, otherwise the
    raw id is shown.

    Args:
        update: Incoming Telegram update carrying the command.
        context: Handler context supplied by python-telegram-bot (unused).
    """
    from bot.spawn_manager import get_spawn_stats

    # effective_message also covers edited messages, for which update.message
    # is None; CommandHandler dispatches edited commands by default.
    message = update.effective_message
    if message is None:
        return

    stats = await get_spawn_stats()

    # Collect fragments and join once instead of repeated string +=.
    parts = [
        "📊 <b>Wandering Enemy Statistics</b>\n\n",
        f"<b>Total Active Enemies:</b> {stats['total_active']}\n\n",
    ]

    by_location = stats['by_location']
    if by_location:
        parts.append("<b>Enemies by Location:</b>\n")
        for loc_id, count in by_location.items():
            location = game_world.get_location(loc_id)
            loc_name = location.name if location else loc_id
            parts.append(f"• {loc_name}: {count}\n")
    else:
        parts.append("<i>No wandering enemies currently active.</i>")

    await message.reply_html("".join(parts))
|