Add visual progress bars and refactor handler modules
- Implement visual HP/Stamina/XP bars using Unicode characters (██░) - Refactor handlers.py (1308 → 377 lines) into specialized modules: * action_handlers.py - World interaction and status display * inventory_handlers.py - Inventory management * combat_handlers.py - Combat actions * profile_handlers.py - Character stats with visual bars * corpse_handlers.py - Looting system * pickup_handlers.py - Item collection - Add utility functions: create_progress_bar(), format_stat_bar() - Organize all documentation into docs/ structure - Create comprehensive documentation index with navigation - Add UI examples showing before/after visual improvements
This commit is contained in:
371
bot/action_handlers.py
Normal file
371
bot/action_handlers.py
Normal file
@@ -0,0 +1,371 @@
|
||||
"""
|
||||
Action handlers for button callbacks.
|
||||
This module contains organized handler functions for different types of player actions.
|
||||
"""
|
||||
import logging
|
||||
import json
|
||||
import random
|
||||
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
|
||||
from telegram.ext import ContextTypes
|
||||
from . import database, keyboards, logic
|
||||
from data.world_loader import game_world
|
||||
from data.items import ITEMS
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# UTILITY FUNCTIONS
|
||||
# ============================================================================
|
||||
|
||||
async def get_player_status_text(telegram_id: int) -> str:
|
||||
"""Generate player status text with location and stats."""
|
||||
from .utils import format_stat_bar
|
||||
|
||||
player = await database.get_player(telegram_id)
|
||||
if not player:
|
||||
return "Could not find player data."
|
||||
|
||||
location = game_world.get_location(player["location_id"])
|
||||
if not location:
|
||||
return "Error: Player is in an unknown location."
|
||||
|
||||
inventory = await database.get_inventory(telegram_id)
|
||||
weight, volume = logic.calculate_inventory_load(inventory)
|
||||
max_weight, max_volume = logic.get_player_capacity(inventory, player)
|
||||
|
||||
# Get equipped items
|
||||
equipped_items = []
|
||||
for item in inventory:
|
||||
if item.get('is_equipped'):
|
||||
item_def = ITEMS.get(item['item_id'], {})
|
||||
emoji = item_def.get('emoji', '❔')
|
||||
equipped_items.append(f"{emoji} {item_def.get('name', 'Unknown')}")
|
||||
|
||||
# Build status with visual bars
|
||||
status = f"<b>📍 Location:</b> {location.name}\n"
|
||||
status += f"{format_stat_bar('HP', '❤️', player['hp'], player['max_hp'])}\n"
|
||||
status += f"{format_stat_bar('Stamina', '⚡', player['stamina'], player['max_stamina'])}\n"
|
||||
status += f"🎒 <b>Load:</b> {weight}/{max_weight} kg | {volume}/{max_volume} vol\n"
|
||||
|
||||
if equipped_items:
|
||||
status += f"⚔️ <b>Equipped:</b> {', '.join(equipped_items)}\n"
|
||||
|
||||
status += f"━━━━━━━━━━━━━━━━━━━━\n<i>{location.description}</i>"
|
||||
return status
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# INSPECTION & WORLD INTERACTION HANDLERS
|
||||
# ============================================================================
|
||||
|
||||
async def handle_inspect_area(query, user_id: int, player: dict):
|
||||
"""Handle the inspect area action."""
|
||||
await query.answer()
|
||||
location_id = player['location_id']
|
||||
location = game_world.get_location(location_id)
|
||||
dropped_items = await database.get_dropped_items_in_location(location_id)
|
||||
wandering_enemies = await database.get_wandering_enemies_in_location(location_id)
|
||||
|
||||
keyboard = await keyboards.inspect_keyboard(location_id, dropped_items, wandering_enemies)
|
||||
image_path = location.image_path if location else None
|
||||
|
||||
from .handlers import send_or_edit_with_image
|
||||
await send_or_edit_with_image(
|
||||
query,
|
||||
text="You scan the area. You notice...",
|
||||
reply_markup=keyboard,
|
||||
image_path=image_path
|
||||
)
|
||||
|
||||
|
||||
async def handle_attack_wandering(query, user_id: int, player: dict, data: list):
|
||||
"""Handle attacking a wandering enemy."""
|
||||
enemy_db_id = int(data[1])
|
||||
await query.answer()
|
||||
|
||||
# Get the enemy from database
|
||||
wandering_enemies = await database.get_wandering_enemies_in_location(player['location_id'])
|
||||
enemy_data = next((e for e in wandering_enemies if e['id'] == enemy_db_id), None)
|
||||
|
||||
if not enemy_data:
|
||||
await query.answer("That enemy has already moved on!", show_alert=True)
|
||||
# Refresh inspect menu
|
||||
location_id = player['location_id']
|
||||
location = game_world.get_location(location_id)
|
||||
dropped_items = await database.get_dropped_items_in_location(location_id)
|
||||
wandering_enemies = await database.get_wandering_enemies_in_location(location_id)
|
||||
keyboard = await keyboards.inspect_keyboard(location_id, dropped_items, wandering_enemies)
|
||||
image_path = location.image_path if location else None
|
||||
|
||||
from .handlers import send_or_edit_with_image
|
||||
await send_or_edit_with_image(
|
||||
query,
|
||||
text="You scan the area. You notice...",
|
||||
reply_markup=keyboard,
|
||||
image_path=image_path
|
||||
)
|
||||
return
|
||||
|
||||
npc_id = enemy_data['npc_id']
|
||||
|
||||
# Remove enemy from wandering table (they're now in combat)
|
||||
await database.remove_wandering_enemy(enemy_db_id)
|
||||
|
||||
from data.npcs import NPCS
|
||||
from bot import combat
|
||||
|
||||
# Initiate combat
|
||||
combat_data = await combat.initiate_combat(
|
||||
user_id, npc_id, player['location_id'], from_wandering_enemy=True
|
||||
)
|
||||
|
||||
if combat_data:
|
||||
npc_def = NPCS.get(npc_id)
|
||||
message = f"⚔️ You engage the {npc_def.emoji} {npc_def.name}!\n\n"
|
||||
message += f"{npc_def.description}\n\n"
|
||||
message += f"{npc_def.emoji} Enemy HP: {combat_data['npc_hp']}/{combat_data['npc_max_hp']}\n"
|
||||
message += f"❤️ Your HP: {player['hp']}/{player['max_hp']}\n\n"
|
||||
message += "🎯 Your turn! What will you do?"
|
||||
|
||||
keyboard = await keyboards.combat_keyboard(user_id)
|
||||
|
||||
from .handlers import send_or_edit_with_image
|
||||
await send_or_edit_with_image(
|
||||
query,
|
||||
text=message,
|
||||
reply_markup=keyboard,
|
||||
image_path=npc_def.image_url if npc_def else None
|
||||
)
|
||||
else:
|
||||
await query.answer("Failed to initiate combat.", show_alert=True)
|
||||
|
||||
|
||||
async def handle_inspect_interactable(query, user_id: int, player: dict, data: list):
|
||||
"""Handle inspecting an interactable object."""
|
||||
location_id, instance_id = data[1], data[2]
|
||||
|
||||
location = game_world.get_location(location_id)
|
||||
if not location:
|
||||
await query.answer("Location not found.", show_alert=True)
|
||||
return
|
||||
|
||||
interactable = location.get_interactable(instance_id)
|
||||
if not interactable:
|
||||
await query.answer("Object not found.", show_alert=False)
|
||||
return
|
||||
|
||||
# Check if ALL actions are on cooldown
|
||||
all_on_cooldown = True
|
||||
for action_id in interactable.actions.keys():
|
||||
cooldown_key = f"{instance_id}:{action_id}"
|
||||
if await database.get_cooldown(cooldown_key) == 0:
|
||||
all_on_cooldown = False
|
||||
break
|
||||
|
||||
if all_on_cooldown and len(interactable.actions) > 0:
|
||||
await query.answer(
|
||||
f"The {interactable.name} has already been searched. Try again later.",
|
||||
show_alert=False
|
||||
)
|
||||
return
|
||||
|
||||
# Show action menu
|
||||
await query.answer()
|
||||
image_path = interactable.image_path if interactable else None
|
||||
|
||||
from .handlers import send_or_edit_with_image
|
||||
await send_or_edit_with_image(
|
||||
query,
|
||||
text=f"You focus on the {interactable.name}. What do you do?",
|
||||
reply_markup=await keyboards.actions_keyboard(location_id, instance_id),
|
||||
image_path=image_path
|
||||
)
|
||||
|
||||
|
||||
async def handle_action(query, user_id: int, player: dict, data: list):
|
||||
"""Handle performing an action on an interactable object."""
|
||||
location_id, instance_id, action_id = data[1], data[2], data[3]
|
||||
cooldown_key = f"{instance_id}:{action_id}"
|
||||
cooldown = await database.get_cooldown(cooldown_key)
|
||||
|
||||
if cooldown > 0:
|
||||
await query.answer("Someone got to it just before you!", show_alert=False)
|
||||
return
|
||||
|
||||
location = game_world.get_location(location_id)
|
||||
if not location:
|
||||
await query.answer("Location not found.", show_alert=True)
|
||||
return
|
||||
|
||||
action_obj = location.get_interactable(instance_id).get_action(action_id)
|
||||
|
||||
if player['stamina'] < action_obj.stamina_cost:
|
||||
await query.answer("You are too tired to do that!", show_alert=False)
|
||||
return
|
||||
|
||||
await query.answer()
|
||||
|
||||
# Set cooldown
|
||||
await database.set_cooldown(cooldown_key)
|
||||
|
||||
# Resolve action
|
||||
outcome = logic.resolve_action(player, action_obj)
|
||||
new_stamina = player['stamina'] - action_obj.stamina_cost
|
||||
new_hp = player['hp'] - outcome.damage_taken
|
||||
await database.update_player(user_id, {"stamina": new_stamina, "hp": new_hp})
|
||||
|
||||
# Build detailed action result
|
||||
result_details = [f"<i>{outcome.text}</i>"]
|
||||
|
||||
if action_obj.stamina_cost > 0:
|
||||
result_details.append(f"⚡️ <b>Stamina:</b> -{action_obj.stamina_cost}")
|
||||
|
||||
if outcome.damage_taken > 0:
|
||||
result_details.append(f"❤️ <b>HP:</b> -{outcome.damage_taken}")
|
||||
|
||||
# Add items gained
|
||||
if outcome.items_reward:
|
||||
items_text = []
|
||||
items_failed = []
|
||||
for item_id, quantity in outcome.items_reward.items():
|
||||
can_add, reason = await logic.can_add_item_to_inventory(user_id, item_id, quantity)
|
||||
|
||||
if can_add:
|
||||
await database.add_item_to_inventory(user_id, item_id, quantity)
|
||||
item_def = ITEMS.get(item_id, {})
|
||||
emoji = item_def.get('emoji', '❔')
|
||||
item_name = item_def.get('name', item_id)
|
||||
items_text.append(f"{emoji} {item_name} x{quantity}")
|
||||
else:
|
||||
item_def = ITEMS.get(item_id, {})
|
||||
item_name = item_def.get('name', item_id)
|
||||
items_failed.append(f"{item_name} ({reason})")
|
||||
|
||||
if items_text:
|
||||
result_details.append(f"🎁 <b>Gained:</b> {', '.join(items_text)}")
|
||||
if items_failed:
|
||||
result_details.append(f"⚠️ <b>Couldn't take:</b> {', '.join(items_failed)}")
|
||||
|
||||
final_text = await get_player_status_text(user_id)
|
||||
final_text += f"\n\n<b>━━━ Action Result ━━━</b>\n" + "\n".join(result_details)
|
||||
|
||||
# Get location image for the result screen
|
||||
current_location = game_world.get_location(player['location_id'])
|
||||
location_image = current_location.image_path if current_location else None
|
||||
|
||||
from .handlers import send_or_edit_with_image
|
||||
await send_or_edit_with_image(
|
||||
query,
|
||||
text=final_text,
|
||||
reply_markup=keyboards.main_menu_keyboard(),
|
||||
image_path=location_image
|
||||
)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# NAVIGATION & MOVEMENT HANDLERS
|
||||
# ============================================================================
|
||||
|
||||
async def handle_main_menu(query, user_id: int, player: dict):
|
||||
"""Return to main menu."""
|
||||
await query.answer()
|
||||
status_text = await get_player_status_text(user_id)
|
||||
location = game_world.get_location(player['location_id'])
|
||||
location_image = location.image_path if location else None
|
||||
|
||||
from .handlers import send_or_edit_with_image
|
||||
await send_or_edit_with_image(
|
||||
query,
|
||||
text=status_text,
|
||||
reply_markup=keyboards.main_menu_keyboard(),
|
||||
image_path=location_image
|
||||
)
|
||||
|
||||
|
||||
async def handle_move_menu(query, user_id: int, player: dict):
|
||||
"""Show movement options."""
|
||||
await query.answer()
|
||||
location = game_world.get_location(player['location_id'])
|
||||
location_image = location.image_path if location else None
|
||||
|
||||
from .handlers import send_or_edit_with_image
|
||||
await send_or_edit_with_image(
|
||||
query,
|
||||
text="Where do you want to go?",
|
||||
reply_markup=await keyboards.move_keyboard(player['location_id'], user_id),
|
||||
image_path=location_image
|
||||
)
|
||||
|
||||
|
||||
async def handle_move(query, user_id: int, player: dict, data: list):
|
||||
"""Handle player movement to a new location."""
|
||||
destination_id = data[1]
|
||||
|
||||
from_location = game_world.get_location(player['location_id'])
|
||||
to_location = game_world.get_location(destination_id)
|
||||
|
||||
if not from_location or not to_location:
|
||||
await query.answer("Invalid location!", show_alert=True)
|
||||
return
|
||||
|
||||
# Calculate stamina cost
|
||||
inventory = await database.get_inventory(user_id)
|
||||
stamina_cost = logic.calculate_travel_stamina_cost(player, inventory, from_location, to_location)
|
||||
|
||||
if player['stamina'] < stamina_cost:
|
||||
await query.answer(f"Too tired to travel! Need {stamina_cost} stamina.", show_alert=True)
|
||||
return
|
||||
|
||||
# Deduct stamina and update location
|
||||
new_stamina = player['stamina'] - stamina_cost
|
||||
await database.update_player(user_id, {"location_id": destination_id, "stamina": new_stamina})
|
||||
|
||||
await query.answer(f"⚡️ -{stamina_cost} stamina", show_alert=False)
|
||||
|
||||
# Refresh player data
|
||||
player = await database.get_player(user_id)
|
||||
|
||||
# Check for random NPC encounter
|
||||
from data.npcs import NPCS, get_random_npc_for_location, get_location_encounter_rate
|
||||
encounter_rate = get_location_encounter_rate(destination_id)
|
||||
|
||||
if random.random() < encounter_rate:
|
||||
from bot import combat
|
||||
logger.info(f"Encounter triggered at {destination_id} (rate: {encounter_rate})")
|
||||
|
||||
npc_id = get_random_npc_for_location(destination_id)
|
||||
|
||||
if npc_id:
|
||||
combat_data = await combat.initiate_combat(user_id, npc_id, destination_id)
|
||||
|
||||
if combat_data:
|
||||
npc_def = NPCS.get(npc_id)
|
||||
message = f"⚠️ A {npc_def.emoji} {npc_def.name} appears!\n\n"
|
||||
message += f"{npc_def.description}\n\n"
|
||||
message += f"{npc_def.emoji} Enemy HP: {combat_data['npc_hp']}/{combat_data['npc_max_hp']}\n"
|
||||
message += f"❤️ Your HP: {player['hp']}/{player['max_hp']}\n\n"
|
||||
message += "🎯 Your turn! What will you do?"
|
||||
|
||||
keyboard = await keyboards.combat_keyboard(user_id)
|
||||
|
||||
from .handlers import send_or_edit_with_image
|
||||
await send_or_edit_with_image(
|
||||
query,
|
||||
text=message,
|
||||
reply_markup=keyboard,
|
||||
image_path=npc_def.image_url if npc_def else None
|
||||
)
|
||||
return
|
||||
|
||||
status_text = await get_player_status_text(user_id)
|
||||
new_location = game_world.get_location(destination_id)
|
||||
location_image = new_location.image_path if new_location else None
|
||||
|
||||
from .handlers import send_or_edit_with_image
|
||||
await send_or_edit_with_image(
|
||||
query,
|
||||
text=status_text,
|
||||
reply_markup=keyboards.main_menu_keyboard(),
|
||||
image_path=location_image
|
||||
)
|
||||
171
bot/combat_handlers.py
Normal file
171
bot/combat_handlers.py
Normal file
@@ -0,0 +1,171 @@
|
||||
"""
|
||||
Combat-related action handlers.
|
||||
"""
|
||||
import logging
|
||||
from . import database, keyboards
|
||||
from data.world_loader import game_world
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def handle_combat_attack(query, user_id: int, player: dict):
|
||||
"""Handle player attack action in combat."""
|
||||
from bot import combat
|
||||
await query.answer()
|
||||
|
||||
message, npc_died, turn_ended = await combat.player_attack(user_id)
|
||||
|
||||
if npc_died:
|
||||
# Combat ended - return to main menu
|
||||
location = game_world.get_location(player['location_id'])
|
||||
location_image = location.image_path if location else None
|
||||
|
||||
from .handlers import send_or_edit_with_image
|
||||
await send_or_edit_with_image(
|
||||
query,
|
||||
text=message,
|
||||
reply_markup=keyboards.main_menu_keyboard(),
|
||||
image_path=location_image
|
||||
)
|
||||
elif turn_ended:
|
||||
# NPC's turn - auto-attack
|
||||
npc_message, player_died = await combat.npc_attack(user_id)
|
||||
message += "\n\n" + npc_message
|
||||
|
||||
if player_died:
|
||||
from .handlers import send_or_edit_with_image
|
||||
await send_or_edit_with_image(query, text=message, reply_markup=None)
|
||||
else:
|
||||
combat_data = await database.get_combat(user_id)
|
||||
if combat_data:
|
||||
from data.npcs import NPCS
|
||||
npc_def = NPCS.get(combat_data['npc_id'])
|
||||
keyboard = await keyboards.combat_keyboard(user_id)
|
||||
|
||||
from .handlers import send_or_edit_with_image
|
||||
await send_or_edit_with_image(
|
||||
query,
|
||||
text=message,
|
||||
reply_markup=keyboard,
|
||||
image_path=npc_def.image_url if npc_def else None
|
||||
)
|
||||
else:
|
||||
await query.answer(message, show_alert=False)
|
||||
|
||||
|
||||
async def handle_combat_flee(query, user_id: int, player: dict):
|
||||
"""Handle flee attempt in combat."""
|
||||
from bot import combat
|
||||
await query.answer()
|
||||
|
||||
message, fled, turn_ended = await combat.flee_attempt(user_id)
|
||||
|
||||
if fled:
|
||||
# Successfully fled - return to main menu
|
||||
location = game_world.get_location(player['location_id'])
|
||||
location_image = location.image_path if location else None
|
||||
|
||||
from .handlers import send_or_edit_with_image
|
||||
await send_or_edit_with_image(
|
||||
query,
|
||||
text=message,
|
||||
reply_markup=keyboards.main_menu_keyboard(),
|
||||
image_path=location_image
|
||||
)
|
||||
elif turn_ended:
|
||||
# Failed to flee - NPC attacks
|
||||
npc_message, player_died = await combat.npc_attack(user_id)
|
||||
message += "\n\n" + npc_message
|
||||
|
||||
if player_died:
|
||||
from .handlers import send_or_edit_with_image
|
||||
await send_or_edit_with_image(query, text=message, reply_markup=None)
|
||||
else:
|
||||
combat_data = await database.get_combat(user_id)
|
||||
if combat_data:
|
||||
from data.npcs import NPCS
|
||||
npc_def = NPCS.get(combat_data['npc_id'])
|
||||
keyboard = await keyboards.combat_keyboard(user_id)
|
||||
|
||||
from .handlers import send_or_edit_with_image
|
||||
await send_or_edit_with_image(
|
||||
query,
|
||||
text=message,
|
||||
reply_markup=keyboard,
|
||||
image_path=npc_def.image_url if npc_def else None
|
||||
)
|
||||
else:
|
||||
await query.answer(message, show_alert=False)
|
||||
|
||||
|
||||
async def handle_combat_use_item_menu(query, user_id: int, player: dict):
|
||||
"""Show menu of items that can be used in combat."""
|
||||
await query.answer()
|
||||
keyboard = await keyboards.combat_items_keyboard(user_id)
|
||||
|
||||
from .handlers import send_or_edit_with_image
|
||||
await send_or_edit_with_image(
|
||||
query,
|
||||
text="💊 Select an item to use:",
|
||||
reply_markup=keyboard
|
||||
)
|
||||
|
||||
|
||||
async def handle_combat_use_item(query, user_id: int, player: dict, data: list):
|
||||
"""Use an item during combat."""
|
||||
from bot import combat
|
||||
item_db_id = int(data[1])
|
||||
|
||||
message, turn_ended = await combat.use_item_in_combat(user_id, item_db_id)
|
||||
await query.answer(message, show_alert=False)
|
||||
|
||||
if turn_ended:
|
||||
# NPC's turn
|
||||
npc_message, player_died = await combat.npc_attack(user_id)
|
||||
|
||||
if player_died:
|
||||
from .handlers import send_or_edit_with_image
|
||||
await send_or_edit_with_image(
|
||||
query,
|
||||
text=message + "\n\n" + npc_message,
|
||||
reply_markup=None
|
||||
)
|
||||
else:
|
||||
combat_data = await database.get_combat(user_id)
|
||||
if combat_data:
|
||||
from data.npcs import NPCS
|
||||
npc_def = NPCS.get(combat_data['npc_id'])
|
||||
keyboard = await keyboards.combat_keyboard(user_id)
|
||||
full_message = message + "\n\n" + npc_message + "\n\n🎯 Your turn!"
|
||||
|
||||
from .handlers import send_or_edit_with_image
|
||||
await send_or_edit_with_image(
|
||||
query,
|
||||
text=full_message,
|
||||
reply_markup=keyboard,
|
||||
image_path=npc_def.image_url if npc_def else None
|
||||
)
|
||||
|
||||
|
||||
async def handle_combat_back(query, user_id: int, player: dict):
|
||||
"""Return to combat menu from item selection."""
|
||||
await query.answer()
|
||||
combat_data = await database.get_combat(user_id)
|
||||
|
||||
if combat_data:
|
||||
from data.npcs import NPCS
|
||||
npc_def = NPCS.get(combat_data['npc_id'])
|
||||
keyboard = await keyboards.combat_keyboard(user_id)
|
||||
|
||||
message = f"⚔️ Combat with {npc_def.emoji} {npc_def.name}!\n"
|
||||
message += f"{npc_def.emoji} Enemy HP: {combat_data['npc_hp']}/{combat_data['npc_max_hp']}\n"
|
||||
message += f"❤️ Your HP: {player['hp']}/{player['max_hp']}\n\n"
|
||||
message += "🎯 Your turn!" if combat_data['turn'] == 'player' else "⏳ Enemy's turn..."
|
||||
|
||||
from .handlers import send_or_edit_with_image
|
||||
await send_or_edit_with_image(
|
||||
query,
|
||||
text=message,
|
||||
reply_markup=keyboard,
|
||||
image_path=npc_def.image_url if npc_def else None
|
||||
)
|
||||
234
bot/corpse_handlers.py
Normal file
234
bot/corpse_handlers.py
Normal file
@@ -0,0 +1,234 @@
|
||||
"""
|
||||
Corpse looting handlers (player and NPC corpses).
|
||||
"""
|
||||
import logging
|
||||
import json
|
||||
import random
|
||||
from . import database, keyboards, logic
|
||||
from data.world_loader import game_world
|
||||
from data.items import ITEMS
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def handle_loot_player_corpse(query, user_id: int, player: dict, data: list):
|
||||
"""Show player corpse loot menu."""
|
||||
corpse_id = int(data[1])
|
||||
corpse = await database.get_player_corpse(corpse_id)
|
||||
|
||||
if not corpse:
|
||||
await query.answer("Corpse not found.", show_alert=False)
|
||||
return
|
||||
|
||||
items = json.loads(corpse['items'])
|
||||
keyboard = keyboards.player_corpse_loot_keyboard(corpse_id, items)
|
||||
|
||||
location = game_world.get_location(player['location_id'])
|
||||
image_path = location.image_path if location else None
|
||||
|
||||
await query.answer()
|
||||
text = f"🎒 {corpse['player_name']}'s bag\n\nYou find the remains of another survivor..."
|
||||
|
||||
from .handlers import send_or_edit_with_image
|
||||
await send_or_edit_with_image(
|
||||
query,
|
||||
text=text,
|
||||
reply_markup=keyboard,
|
||||
image_path=image_path
|
||||
)
|
||||
|
||||
|
||||
async def handle_take_corpse_item(query, user_id: int, player: dict, data: list):
|
||||
"""Take an item from a player corpse."""
|
||||
corpse_id = int(data[1])
|
||||
item_index = int(data[2])
|
||||
|
||||
corpse = await database.get_player_corpse(corpse_id)
|
||||
if not corpse:
|
||||
await query.answer("Corpse not found.", show_alert=False)
|
||||
return
|
||||
|
||||
items = json.loads(corpse['items'])
|
||||
if item_index >= len(items):
|
||||
await query.answer("Item not found.", show_alert=False)
|
||||
return
|
||||
|
||||
item_data = items[item_index]
|
||||
item_def = ITEMS.get(item_data['item_id'], {})
|
||||
|
||||
# Check inventory capacity
|
||||
can_add, reason = await logic.can_add_item_to_inventory(
|
||||
user_id, item_data['item_id'], item_data['quantity']
|
||||
)
|
||||
|
||||
if not can_add:
|
||||
await query.answer(reason, show_alert=False)
|
||||
return
|
||||
|
||||
# Add to inventory
|
||||
await database.add_item_to_inventory(user_id, item_data['item_id'], item_data['quantity'])
|
||||
|
||||
# Remove from corpse
|
||||
items.pop(item_index)
|
||||
|
||||
if items:
|
||||
await database.update_player_corpse(corpse_id, json.dumps(items))
|
||||
keyboard = keyboards.player_corpse_loot_keyboard(corpse_id, items)
|
||||
|
||||
location = game_world.get_location(player['location_id'])
|
||||
image_path = location.image_path if location else None
|
||||
|
||||
await query.answer(f"Took {item_def.get('name', 'Unknown')}.", show_alert=False)
|
||||
text = f"🎒 {corpse['player_name']}'s bag"
|
||||
|
||||
from .handlers import send_or_edit_with_image
|
||||
await send_or_edit_with_image(
|
||||
query,
|
||||
text=text,
|
||||
reply_markup=keyboard,
|
||||
image_path=image_path
|
||||
)
|
||||
else:
|
||||
# Bag is empty, remove it
|
||||
await database.remove_player_corpse(corpse_id)
|
||||
await query.answer(
|
||||
f"Took {item_def.get('name', 'Unknown')}. The bag is now empty.",
|
||||
show_alert=False
|
||||
)
|
||||
|
||||
location = game_world.get_location(player['location_id'])
|
||||
dropped_items = await database.get_dropped_items_in_location(player['location_id'])
|
||||
wandering_enemies = await database.get_wandering_enemies_in_location(player['location_id'])
|
||||
keyboard = await keyboards.inspect_keyboard(player['location_id'], dropped_items, wandering_enemies)
|
||||
|
||||
from .handlers import send_or_edit_with_image
|
||||
await send_or_edit_with_image(
|
||||
query,
|
||||
text="You scan the area. You notice...",
|
||||
reply_markup=keyboard,
|
||||
image_path=location.image_path if location else None
|
||||
)
|
||||
|
||||
|
||||
async def handle_scavenge_npc_corpse(query, user_id: int, player: dict, data: list):
|
||||
"""Show NPC corpse scavenging menu."""
|
||||
corpse_id = int(data[1])
|
||||
corpse = await database.get_npc_corpse(corpse_id)
|
||||
|
||||
if not corpse:
|
||||
await query.answer("Corpse not found.", show_alert=False)
|
||||
return
|
||||
|
||||
from data.npcs import NPCS
|
||||
npc_def = NPCS.get(corpse['npc_id'])
|
||||
loot_items = json.loads(corpse['loot_remaining'])
|
||||
keyboard = keyboards.npc_corpse_scavenge_keyboard(corpse_id, loot_items)
|
||||
|
||||
location = game_world.get_location(player['location_id'])
|
||||
image_path = location.image_path if location else None
|
||||
|
||||
await query.answer()
|
||||
text = f"🔪 {npc_def.emoji} {npc_def.name} Corpse\n\n{npc_def.description}"
|
||||
|
||||
from .handlers import send_or_edit_with_image
|
||||
await send_or_edit_with_image(
|
||||
query,
|
||||
text=text,
|
||||
reply_markup=keyboard,
|
||||
image_path=image_path
|
||||
)
|
||||
|
||||
|
||||
async def handle_scavenge_corpse_item(query, user_id: int, player: dict, data: list):
|
||||
"""Scavenge a specific item from an NPC corpse."""
|
||||
corpse_id = int(data[1])
|
||||
loot_index = int(data[2])
|
||||
|
||||
corpse = await database.get_npc_corpse(corpse_id)
|
||||
if not corpse:
|
||||
await query.answer("Corpse not found.", show_alert=False)
|
||||
return
|
||||
|
||||
loot_items = json.loads(corpse['loot_remaining'])
|
||||
if loot_index >= len(loot_items):
|
||||
await query.answer("Nothing to scavenge here.", show_alert=False)
|
||||
return
|
||||
|
||||
loot_data = loot_items[loot_index]
|
||||
required_tool = loot_data.get('required_tool')
|
||||
|
||||
# Check if player has required tool
|
||||
if required_tool:
|
||||
inventory_items = await database.get_inventory(user_id)
|
||||
has_tool = any(item['item_id'] == required_tool for item in inventory_items)
|
||||
|
||||
if not has_tool:
|
||||
tool_def = ITEMS.get(required_tool, {})
|
||||
await query.answer(
|
||||
f"You need a {tool_def.get('name', 'tool')} to scavenge this.",
|
||||
show_alert=False
|
||||
)
|
||||
return
|
||||
|
||||
# Determine quantity
|
||||
quantity = random.randint(loot_data['quantity_min'], loot_data['quantity_max'])
|
||||
item_def = ITEMS.get(loot_data['item_id'], {})
|
||||
|
||||
# Check inventory capacity
|
||||
can_add, reason = await logic.can_add_item_to_inventory(
|
||||
user_id, loot_data['item_id'], quantity
|
||||
)
|
||||
|
||||
if not can_add:
|
||||
await query.answer(reason, show_alert=False)
|
||||
return
|
||||
|
||||
# Add to inventory
|
||||
await database.add_item_to_inventory(user_id, loot_data['item_id'], quantity)
|
||||
|
||||
# Remove from corpse
|
||||
loot_items.pop(loot_index)
|
||||
|
||||
if loot_items:
|
||||
await database.update_npc_corpse(corpse_id, json.dumps(loot_items))
|
||||
keyboard = keyboards.npc_corpse_scavenge_keyboard(corpse_id, loot_items)
|
||||
|
||||
location = game_world.get_location(player['location_id'])
|
||||
image_path = location.image_path if location else None
|
||||
|
||||
await query.answer(
|
||||
f"Scavenged {quantity}x {item_def.get('name', 'Unknown')}.",
|
||||
show_alert=False
|
||||
)
|
||||
|
||||
from data.npcs import NPCS
|
||||
npc_def = NPCS.get(corpse['npc_id'])
|
||||
text = f"🔪 {npc_def.emoji} {npc_def.name} Corpse"
|
||||
|
||||
from .handlers import send_or_edit_with_image
|
||||
await send_or_edit_with_image(
|
||||
query,
|
||||
text=text,
|
||||
reply_markup=keyboard,
|
||||
image_path=image_path
|
||||
)
|
||||
else:
|
||||
# Nothing left, remove corpse
|
||||
await database.remove_npc_corpse(corpse_id)
|
||||
await query.answer(
|
||||
f"Scavenged {quantity}x {item_def.get('name', 'Unknown')}. Nothing left on the corpse.",
|
||||
show_alert=False
|
||||
)
|
||||
|
||||
location = game_world.get_location(player['location_id'])
|
||||
dropped_items = await database.get_dropped_items_in_location(player['location_id'])
|
||||
wandering_enemies = await database.get_wandering_enemies_in_location(player['location_id'])
|
||||
keyboard = await keyboards.inspect_keyboard(player['location_id'], dropped_items, wandering_enemies)
|
||||
|
||||
from .handlers import send_or_edit_with_image
|
||||
await send_or_edit_with_image(
|
||||
query,
|
||||
text="You scan the area. You notice...",
|
||||
reply_markup=keyboard,
|
||||
image_path=location.image_path if location else None
|
||||
)
|
||||
1233
bot/handlers.py
1233
bot/handlers.py
File diff suppressed because it is too large
Load Diff
355
bot/inventory_handlers.py
Normal file
355
bot/inventory_handlers.py
Normal file
@@ -0,0 +1,355 @@
|
||||
"""
|
||||
Inventory-related action handlers.
|
||||
"""
|
||||
import logging
|
||||
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
|
||||
from . import database, keyboards, logic
|
||||
from data.world_loader import game_world
|
||||
from data.items import ITEMS
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def handle_inventory_menu(query, user_id: int, player: dict):
|
||||
"""Show player inventory."""
|
||||
await query.answer()
|
||||
inventory_items = await database.get_inventory(user_id)
|
||||
|
||||
# Calculate inventory summary
|
||||
current_weight, current_volume = logic.calculate_inventory_load(inventory_items)
|
||||
max_weight, max_volume = logic.get_player_capacity(inventory_items, player)
|
||||
|
||||
text = "<b>🎒 Your Inventory:</b>\n"
|
||||
text += f"📊 Weight: {current_weight}/{max_weight} kg\n"
|
||||
text += f"📦 Volume: {current_volume}/{max_volume} vol\n\n"
|
||||
|
||||
if not inventory_items:
|
||||
text += "It's empty."
|
||||
|
||||
# Keep current location image for context
|
||||
location = game_world.get_location(player['location_id'])
|
||||
location_image = location.image_path if location else None
|
||||
|
||||
from .handlers import send_or_edit_with_image
|
||||
await send_or_edit_with_image(
|
||||
query,
|
||||
text=text,
|
||||
reply_markup=keyboards.inventory_keyboard(inventory_items),
|
||||
image_path=location_image
|
||||
)
|
||||
|
||||
|
||||
async def handle_inventory_item(query, user_id: int, player: dict, data: list):
|
||||
"""Show details for a specific inventory item."""
|
||||
await query.answer()
|
||||
item_db_id = int(data[1])
|
||||
item = await database.get_inventory_item(item_db_id)
|
||||
item_def = ITEMS.get(item['item_id'], {})
|
||||
emoji = item_def.get('emoji', '❔')
|
||||
|
||||
# Build item details text
|
||||
text = f"{emoji} <b>{item_def.get('name', 'Unknown')}</b>\n"
|
||||
|
||||
description = item_def.get('description')
|
||||
if description:
|
||||
text += f"<i>{description}</i>\n\n"
|
||||
else:
|
||||
text += "\n"
|
||||
|
||||
text += f"<b>Weight:</b> {item_def.get('weight', 0)} kg | <b>Volume:</b> {item_def.get('volume', 0)} vol\n"
|
||||
|
||||
# Add weapon stats if applicable
|
||||
if item_def.get('type') == 'weapon':
|
||||
text += f"<b>Damage:</b> {item_def.get('damage_min', 0)}-{item_def.get('damage_max', 0)}\n"
|
||||
|
||||
# Add consumable effects if applicable
|
||||
if item_def.get('type') == 'consumable':
|
||||
effects = []
|
||||
if item_def.get('hp_restore'):
|
||||
effects.append(f"❤️ +{item_def.get('hp_restore')} HP")
|
||||
if item_def.get('stamina_restore'):
|
||||
effects.append(f"⚡ +{item_def.get('stamina_restore')} Stamina")
|
||||
if effects:
|
||||
text += f"<b>Effects:</b> {', '.join(effects)}\n"
|
||||
|
||||
# Add equipped status
|
||||
if item.get('is_equipped'):
|
||||
text += "\n✅ <b>Currently Equipped</b>"
|
||||
|
||||
location = game_world.get_location(player['location_id'])
|
||||
location_image = location.image_path if location else None
|
||||
|
||||
from .handlers import send_or_edit_with_image
|
||||
await send_or_edit_with_image(
|
||||
query,
|
||||
text=text,
|
||||
reply_markup=keyboards.inventory_item_actions_keyboard(
|
||||
item_db_id, item_def, item.get('is_equipped', False), item['quantity']
|
||||
),
|
||||
image_path=location_image
|
||||
)
|
||||
|
||||
|
||||
async def handle_inventory_use(query, user_id: int, player: dict, data: list):
|
||||
"""Use a consumable item from inventory."""
|
||||
item_db_id = int(data[1])
|
||||
item = await database.get_inventory_item(item_db_id)
|
||||
|
||||
if not item:
|
||||
await query.answer("Item not found.", show_alert=False)
|
||||
return
|
||||
|
||||
item_def = ITEMS.get(item['item_id'], {})
|
||||
|
||||
if item_def.get('type') != 'consumable':
|
||||
await query.answer("This item cannot be used.", show_alert=False)
|
||||
return
|
||||
|
||||
await query.answer()
|
||||
|
||||
# Apply item effects
|
||||
result_parts = []
|
||||
updates = {}
|
||||
|
||||
if 'hp_restore' in item_def:
|
||||
hp_gain = item_def['hp_restore']
|
||||
new_hp = min(player['max_hp'], player['hp'] + hp_gain)
|
||||
actual_gain = new_hp - player['hp']
|
||||
updates['hp'] = new_hp
|
||||
if actual_gain > 0:
|
||||
result_parts.append(f"❤️ HP: +{actual_gain}")
|
||||
else:
|
||||
result_parts.append(f"❤️ HP: Already at maximum!")
|
||||
|
||||
if 'stamina_restore' in item_def:
|
||||
stamina_gain = item_def['stamina_restore']
|
||||
new_stamina = min(player['max_stamina'], player['stamina'] + stamina_gain)
|
||||
actual_gain = new_stamina - player['stamina']
|
||||
updates['stamina'] = new_stamina
|
||||
if actual_gain > 0:
|
||||
result_parts.append(f"⚡️ Stamina: +{actual_gain}")
|
||||
else:
|
||||
result_parts.append(f"⚡️ Stamina: Already at maximum!")
|
||||
|
||||
if updates:
|
||||
await database.update_player(user_id, updates)
|
||||
|
||||
# Remove one item from inventory
|
||||
if item['quantity'] > 1:
|
||||
await database.update_inventory_item(item['id'], quantity=item['quantity'] - 1)
|
||||
else:
|
||||
await database.remove_item_from_inventory(item['id'])
|
||||
|
||||
# Build result message
|
||||
emoji = item_def.get('emoji', '❔')
|
||||
result_text = f"<b>Used {emoji} {item_def.get('name')}</b>\n\n"
|
||||
if result_parts:
|
||||
result_text += "\n".join(result_parts)
|
||||
else:
|
||||
result_text += "No effect."
|
||||
|
||||
# Show updated inventory
|
||||
inventory_items = await database.get_inventory(user_id)
|
||||
current_weight, current_volume = logic.calculate_inventory_load(inventory_items)
|
||||
max_weight, max_volume = logic.get_player_capacity(inventory_items, player)
|
||||
|
||||
text = "<b>🎒 Your Inventory:</b>\n"
|
||||
text += f"📊 Weight: {current_weight}/{max_weight} kg\n"
|
||||
text += f"📦 Volume: {current_volume}/{max_volume} vol\n\n"
|
||||
|
||||
if not inventory_items:
|
||||
text += "It's empty."
|
||||
else:
|
||||
text += f"{result_text}"
|
||||
|
||||
location = game_world.get_location(player['location_id'])
|
||||
location_image = location.image_path if location else None
|
||||
|
||||
from .handlers import send_or_edit_with_image
|
||||
await send_or_edit_with_image(
|
||||
query,
|
||||
text=text,
|
||||
reply_markup=keyboards.inventory_keyboard(inventory_items),
|
||||
image_path=location_image
|
||||
)
|
||||
|
||||
|
||||
async def handle_inventory_drop(query, user_id: int, player: dict, data: list):
|
||||
"""Drop an item from inventory to the world."""
|
||||
item_db_id = int(data[1])
|
||||
drop_amount_str = data[2] if len(data) > 2 else None
|
||||
|
||||
item = await database.get_inventory_item(item_db_id)
|
||||
if not item:
|
||||
await query.answer("Item not found.", show_alert=False)
|
||||
return
|
||||
|
||||
item_def = ITEMS.get(item['item_id'], {})
|
||||
|
||||
# Determine how much to drop
|
||||
if drop_amount_str is None or drop_amount_str == "all":
|
||||
await database.drop_item_to_world(item['item_id'], item['quantity'], player['location_id'])
|
||||
await database.remove_item_from_inventory(item['id'], quantity=item['quantity'])
|
||||
await query.answer(f"You dropped all {item['quantity']}x {item_def.get('name')}.", show_alert=False)
|
||||
else:
|
||||
drop_amount = int(drop_amount_str)
|
||||
if drop_amount >= item['quantity']:
|
||||
await database.drop_item_to_world(item['item_id'], item['quantity'], player['location_id'])
|
||||
await database.remove_item_from_inventory(item['id'], quantity=item['quantity'])
|
||||
await query.answer(f"You dropped all {item['quantity']}x {item_def.get('name')}.", show_alert=False)
|
||||
else:
|
||||
await database.drop_item_to_world(item['item_id'], drop_amount, player['location_id'])
|
||||
await database.update_inventory_item(item['id'], quantity=item['quantity'] - drop_amount)
|
||||
await query.answer(f"You dropped {drop_amount}x {item_def.get('name')}.", show_alert=False)
|
||||
|
||||
inventory_items = await database.get_inventory(user_id)
|
||||
current_weight, current_volume = logic.calculate_inventory_load(inventory_items)
|
||||
max_weight, max_volume = logic.get_player_capacity(inventory_items, player)
|
||||
|
||||
text = "<b>🎒 Your Inventory:</b>\n"
|
||||
text += f"📊 Weight: {current_weight}/{max_weight} kg\n"
|
||||
text += f"📦 Volume: {current_volume}/{max_volume} vol\n\n"
|
||||
|
||||
if not inventory_items:
|
||||
text += "It's empty."
|
||||
|
||||
location = game_world.get_location(player['location_id'])
|
||||
location_image = location.image_path if location else None
|
||||
|
||||
from .handlers import send_or_edit_with_image
|
||||
await send_or_edit_with_image(
|
||||
query,
|
||||
text=text,
|
||||
reply_markup=keyboards.inventory_keyboard(inventory_items),
|
||||
image_path=location_image
|
||||
)
|
||||
|
||||
|
||||
async def handle_inventory_equip(query, user_id: int, player: dict, data: list):
|
||||
"""Equip an item from inventory."""
|
||||
item_db_id = int(data[1])
|
||||
item = await database.get_inventory_item(item_db_id)
|
||||
|
||||
if not item:
|
||||
await query.answer("Item not found.", show_alert=False)
|
||||
return
|
||||
|
||||
item_def = ITEMS.get(item['item_id'], {})
|
||||
item_slot = item_def.get('slot')
|
||||
|
||||
if not item_slot:
|
||||
await query.answer("This item cannot be equipped.", show_alert=False)
|
||||
return
|
||||
|
||||
# Unequip any item in the same slot
|
||||
inventory_items = await database.get_inventory(user_id)
|
||||
for inv_item in inventory_items:
|
||||
if inv_item.get('is_equipped'):
|
||||
inv_item_def = ITEMS.get(inv_item['item_id'], {})
|
||||
if inv_item_def.get('slot') == item_slot:
|
||||
await database.update_inventory_item(inv_item['id'], is_equipped=False)
|
||||
|
||||
# If equipping from a stack, split the stack
|
||||
if item['quantity'] > 1:
|
||||
await database.update_inventory_item(item_db_id, quantity=item['quantity'] - 1)
|
||||
new_item_id = await database.add_equipped_item_to_inventory(user_id, item['item_id'])
|
||||
await query.answer(f"Equipped {item_def.get('name')}!", show_alert=False)
|
||||
item = await database.get_inventory_item(new_item_id)
|
||||
item_db_id = new_item_id
|
||||
else:
|
||||
await database.update_inventory_item(item_db_id, is_equipped=True)
|
||||
await query.answer(f"Equipped {item_def.get('name')}!", show_alert=False)
|
||||
item = await database.get_inventory_item(item_db_id)
|
||||
|
||||
# Refresh the item view
|
||||
emoji = item_def.get('emoji', '❔')
|
||||
text = f"{emoji} <b>{item_def.get('name', 'Unknown')}</b>\n"
|
||||
|
||||
description = item_def.get('description')
|
||||
if description:
|
||||
text += f"<i>{description}</i>\n\n"
|
||||
else:
|
||||
text += "\n"
|
||||
|
||||
text += f"<b>Weight:</b> {item_def.get('weight', 0)} kg | <b>Volume:</b> {item_def.get('volume', 0)} vol\n"
|
||||
|
||||
if item_def.get('type') == 'weapon':
|
||||
text += f"<b>Damage:</b> {item_def.get('damage_min', 0)}-{item_def.get('damage_max', 0)}\n"
|
||||
|
||||
text += "\n✅ <b>Currently Equipped</b>"
|
||||
|
||||
location = game_world.get_location(player['location_id'])
|
||||
location_image = location.image_path if location else None
|
||||
|
||||
from .handlers import send_or_edit_with_image
|
||||
await send_or_edit_with_image(
|
||||
query,
|
||||
text=text,
|
||||
reply_markup=keyboards.inventory_item_actions_keyboard(
|
||||
item_db_id, item_def, True, item['quantity']
|
||||
),
|
||||
image_path=location_image
|
||||
)
|
||||
|
||||
|
||||
async def handle_inventory_unequip(query, user_id: int, player: dict, data: list):
|
||||
"""Unequip an item."""
|
||||
item_db_id = int(data[1])
|
||||
item = await database.get_inventory_item(item_db_id)
|
||||
|
||||
if not item:
|
||||
await query.answer("Item not found.", show_alert=False)
|
||||
return
|
||||
|
||||
item_def = ITEMS.get(item['item_id'], {})
|
||||
|
||||
# Check if there's an existing unequipped stack
|
||||
inventory_items = await database.get_inventory(user_id)
|
||||
existing_stack = None
|
||||
for inv_item in inventory_items:
|
||||
if (inv_item['item_id'] == item['item_id'] and
|
||||
not inv_item.get('is_equipped') and
|
||||
inv_item['id'] != item_db_id):
|
||||
existing_stack = inv_item
|
||||
break
|
||||
|
||||
if existing_stack:
|
||||
# Merge into existing stack
|
||||
await database.update_inventory_item(existing_stack['id'], quantity=existing_stack['quantity'] + 1)
|
||||
await database.remove_item_from_inventory(item_db_id)
|
||||
await query.answer(f"Unequipped {item_def.get('name')}.", show_alert=False)
|
||||
item = await database.get_inventory_item(existing_stack['id'])
|
||||
item_db_id = existing_stack['id']
|
||||
else:
|
||||
# Just unequip
|
||||
await database.update_inventory_item(item_db_id, is_equipped=False)
|
||||
await query.answer(f"Unequipped {item_def.get('name')}.", show_alert=False)
|
||||
item = await database.get_inventory_item(item_db_id)
|
||||
|
||||
# Refresh the item view
|
||||
emoji = item_def.get('emoji', '❔')
|
||||
text = f"{emoji} <b>{item_def.get('name', 'Unknown')}</b>\n"
|
||||
|
||||
description = item_def.get('description')
|
||||
if description:
|
||||
text += f"<i>{description}</i>\n\n"
|
||||
else:
|
||||
text += "\n"
|
||||
|
||||
text += f"<b>Weight:</b> {item_def.get('weight', 0)} kg | <b>Volume:</b> {item_def.get('volume', 0)} vol\n"
|
||||
|
||||
if item_def.get('type') == 'weapon':
|
||||
text += f"<b>Damage:</b> {item_def.get('damage_min', 0)}-{item_def.get('damage_max', 0)}\n"
|
||||
|
||||
location = game_world.get_location(player['location_id'])
|
||||
location_image = location.image_path if location else None
|
||||
|
||||
from .handlers import send_or_edit_with_image
|
||||
await send_or_edit_with_image(
|
||||
query,
|
||||
text=text,
|
||||
reply_markup=keyboards.inventory_item_actions_keyboard(
|
||||
item_db_id, item_def, False, item['quantity']
|
||||
),
|
||||
image_path=location_image
|
||||
)
|
||||
135
bot/pickup_handlers.py
Normal file
135
bot/pickup_handlers.py
Normal file
@@ -0,0 +1,135 @@
|
||||
"""
|
||||
Pickup and item collection handlers.
|
||||
"""
|
||||
import logging
|
||||
from . import database, keyboards, logic
|
||||
from data.world_loader import game_world
|
||||
from data.items import ITEMS
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def handle_pickup_menu(query, user_id: int, player: dict, data: list):
|
||||
"""Show pickup options for a dropped item."""
|
||||
dropped_item_id = int(data[1])
|
||||
item_to_pickup = await database.get_dropped_item(dropped_item_id)
|
||||
|
||||
if not item_to_pickup:
|
||||
await query.answer("Someone already picked that up!", show_alert=False)
|
||||
location_id = player['location_id']
|
||||
location = game_world.get_location(location_id)
|
||||
dropped_items = await database.get_dropped_items_in_location(location_id)
|
||||
wandering_enemies = await database.get_wandering_enemies_in_location(location_id)
|
||||
keyboard = await keyboards.inspect_keyboard(location_id, dropped_items, wandering_enemies)
|
||||
image_path = location.image_path if location else None
|
||||
|
||||
from .handlers import send_or_edit_with_image
|
||||
await send_or_edit_with_image(
|
||||
query,
|
||||
text="You scan the area. You notice...",
|
||||
reply_markup=keyboard,
|
||||
image_path=image_path
|
||||
)
|
||||
return
|
||||
|
||||
item_def = ITEMS.get(item_to_pickup['item_id'], {})
|
||||
emoji = item_def.get('emoji', '❔')
|
||||
text = f"{emoji} <b>{item_def.get('name', 'Unknown')}</b>\n\n"
|
||||
text += f"Available: {item_to_pickup['quantity']}\n"
|
||||
text += f"Weight: {item_def.get('weight', 0)} kg each\n"
|
||||
text += f"Volume: {item_def.get('volume', 0)} vol each\n\n"
|
||||
text += "How many do you want to pick up?"
|
||||
|
||||
await query.answer()
|
||||
keyboard = keyboards.pickup_options_keyboard(
|
||||
dropped_item_id,
|
||||
item_def.get('name', 'Unknown'),
|
||||
item_to_pickup['quantity']
|
||||
)
|
||||
|
||||
location = game_world.get_location(player['location_id'])
|
||||
image_path = location.image_path if location else None
|
||||
|
||||
from .handlers import send_or_edit_with_image
|
||||
await send_or_edit_with_image(
|
||||
query,
|
||||
text=text,
|
||||
reply_markup=keyboard,
|
||||
image_path=image_path
|
||||
)
|
||||
|
||||
|
||||
async def handle_pickup(query, user_id: int, player: dict, data: list):
|
||||
"""Pick up a dropped item from the world."""
|
||||
dropped_item_id = int(data[1])
|
||||
pickup_amount_str = data[2] if len(data) > 2 else "all"
|
||||
|
||||
item_to_pickup = await database.get_dropped_item(dropped_item_id)
|
||||
if not item_to_pickup:
|
||||
await query.answer("Someone already picked that up!", show_alert=False)
|
||||
location_id = player['location_id']
|
||||
location = game_world.get_location(location_id)
|
||||
dropped_items = await database.get_dropped_items_in_location(location_id)
|
||||
wandering_enemies = await database.get_wandering_enemies_in_location(location_id)
|
||||
keyboard = await keyboards.inspect_keyboard(location_id, dropped_items, wandering_enemies)
|
||||
image_path = location.image_path if location else None
|
||||
|
||||
from .handlers import send_or_edit_with_image
|
||||
await send_or_edit_with_image(
|
||||
query,
|
||||
text="You scan the area. You notice...",
|
||||
reply_markup=keyboard,
|
||||
image_path=image_path
|
||||
)
|
||||
return
|
||||
|
||||
# Determine how much to pick up
|
||||
if pickup_amount_str == "all":
|
||||
pickup_amount = item_to_pickup['quantity']
|
||||
else:
|
||||
pickup_amount = min(int(pickup_amount_str), item_to_pickup['quantity'])
|
||||
|
||||
# Check inventory capacity
|
||||
can_add, reason = await logic.can_add_item_to_inventory(
|
||||
user_id, item_to_pickup['item_id'], pickup_amount
|
||||
)
|
||||
|
||||
if not can_add:
|
||||
await query.answer(reason, show_alert=True)
|
||||
return
|
||||
|
||||
# Add to inventory
|
||||
await database.add_item_to_inventory(user_id, item_to_pickup['item_id'], pickup_amount)
|
||||
|
||||
# Update or remove dropped item
|
||||
remaining = item_to_pickup['quantity'] - pickup_amount
|
||||
item_def = ITEMS.get(item_to_pickup['item_id'], {})
|
||||
|
||||
if remaining > 0:
|
||||
await database.update_dropped_item(dropped_item_id, remaining)
|
||||
await query.answer(
|
||||
f"Picked up {pickup_amount}x {item_def.get('name', 'item')}. {remaining} remaining.",
|
||||
show_alert=False
|
||||
)
|
||||
else:
|
||||
await database.remove_dropped_item(dropped_item_id)
|
||||
await query.answer(
|
||||
f"Picked up {pickup_amount}x {item_def.get('name', 'item')}.",
|
||||
show_alert=False
|
||||
)
|
||||
|
||||
# Return to inspect area
|
||||
location_id = player['location_id']
|
||||
location = game_world.get_location(location_id)
|
||||
dropped_items = await database.get_dropped_items_in_location(location_id)
|
||||
wandering_enemies = await database.get_wandering_enemies_in_location(location_id)
|
||||
keyboard = await keyboards.inspect_keyboard(location_id, dropped_items, wandering_enemies)
|
||||
image_path = location.image_path if location else None
|
||||
|
||||
from .handlers import send_or_edit_with_image
|
||||
await send_or_edit_with_image(
|
||||
query,
|
||||
text="You scan the area. You notice...",
|
||||
reply_markup=keyboard,
|
||||
image_path=image_path
|
||||
)
|
||||
152
bot/profile_handlers.py
Normal file
152
bot/profile_handlers.py
Normal file
@@ -0,0 +1,152 @@
|
||||
"""
|
||||
Profile and character stat management handlers.
|
||||
"""
|
||||
import logging
|
||||
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
|
||||
from . import database, keyboards
|
||||
from data.world_loader import game_world
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def handle_profile(query, user_id: int, player: dict):
|
||||
"""Show player profile and stats."""
|
||||
await query.answer()
|
||||
from bot import combat
|
||||
from .utils import format_stat_bar, create_progress_bar
|
||||
|
||||
# Calculate stats
|
||||
xp_current = player['xp']
|
||||
xp_needed = combat.xp_for_level(player['level'] + 1)
|
||||
xp_for_current_level = combat.xp_for_level(player['level'])
|
||||
xp_progress = max(0, xp_current - xp_for_current_level)
|
||||
xp_level_requirement = xp_needed - xp_for_current_level
|
||||
progress_percent = int((xp_progress / xp_level_requirement) * 100) if xp_level_requirement > 0 else 0
|
||||
|
||||
unspent = player.get('unspent_points', 0)
|
||||
|
||||
# Build profile with visual bars
|
||||
profile_text = f"👤 <b>{player['name']}</b>\n"
|
||||
profile_text += f"━━━━━━━━━━━━━━━━━━━━\n\n"
|
||||
profile_text += f"<b>Level:</b> {player['level']}\n"
|
||||
|
||||
# XP bar
|
||||
xp_bar = create_progress_bar(xp_progress, xp_level_requirement, length=10)
|
||||
profile_text += f"⭐ XP: {xp_bar} {progress_percent}% ({xp_current}/{xp_needed})\n"
|
||||
|
||||
if unspent > 0:
|
||||
profile_text += f"💎 <b>Unspent Points:</b> {unspent}\n"
|
||||
|
||||
profile_text += f"\n{format_stat_bar('HP', '❤️', player['hp'], player['max_hp'])}\n"
|
||||
profile_text += f"{format_stat_bar('Stamina', '⚡', player['stamina'], player['max_stamina'])}\n\n"
|
||||
profile_text += f"<b>Stats:</b>\n"
|
||||
profile_text += f"💪 Strength: {player['strength']}\n"
|
||||
profile_text += f"🏃 Agility: {player['agility']}\n"
|
||||
profile_text += f"💚 Endurance: {player['endurance']}\n"
|
||||
profile_text += f"🧠 Intellect: {player['intellect']}\n\n"
|
||||
profile_text += f"<b>Combat:</b>\n"
|
||||
profile_text += f"⚔️ Base Damage: {5 + player['strength'] // 2 + player['level']}\n"
|
||||
profile_text += f"🛡️ Flee Chance: {int((0.5 + player['agility'] / 100) * 100)}%\n"
|
||||
profile_text += f"💚 Stamina Regen: {1 + player['endurance'] // 10}/cycle\n"
|
||||
|
||||
location = game_world.get_location(player['location_id'])
|
||||
location_image = location.image_path if location else None
|
||||
|
||||
# Add spend points button if player has unspent points
|
||||
keyboard_buttons = []
|
||||
if unspent > 0:
|
||||
keyboard_buttons.append([
|
||||
InlineKeyboardButton("⭐ Spend Stat Points", callback_data="spend_points_menu")
|
||||
])
|
||||
keyboard_buttons.append([InlineKeyboardButton("⬅️ Back", callback_data="main_menu")])
|
||||
back_keyboard = InlineKeyboardMarkup(keyboard_buttons)
|
||||
|
||||
from .handlers import send_or_edit_with_image
|
||||
await send_or_edit_with_image(
|
||||
query,
|
||||
text=profile_text,
|
||||
reply_markup=back_keyboard,
|
||||
image_path=location_image
|
||||
)
|
||||
|
||||
|
||||
async def handle_spend_points_menu(query, user_id: int, player: dict):
|
||||
"""Show stat point spending menu."""
|
||||
await query.answer()
|
||||
unspent = player.get('unspent_points', 0)
|
||||
|
||||
if unspent <= 0:
|
||||
await query.answer("You have no points to spend!", show_alert=False)
|
||||
return
|
||||
|
||||
text = f"⭐ <b>Spend Stat Points</b>\n\n"
|
||||
text += f"Available Points: <b>{unspent}</b>\n\n"
|
||||
text += f"Current Stats:\n"
|
||||
text += f"❤️ Max HP: {player['max_hp']} (+10 per point)\n"
|
||||
text += f"⚡ Max Stamina: {player['max_stamina']} (+5 per point)\n"
|
||||
text += f"💪 Strength: {player['strength']} (+1 per point)\n"
|
||||
text += f"🏃 Agility: {player['agility']} (+1 per point)\n"
|
||||
text += f"💚 Endurance: {player['endurance']} (+1 per point)\n"
|
||||
text += f"🧠 Intellect: {player['intellect']} (+1 per point)\n\n"
|
||||
text += f"💡 Choose wisely! Each point matters."
|
||||
|
||||
keyboard = keyboards.spend_points_keyboard()
|
||||
|
||||
from .handlers import send_or_edit_with_image
|
||||
await send_or_edit_with_image(query, text=text, reply_markup=keyboard)
|
||||
|
||||
|
||||
async def handle_spend_point(query, user_id: int, player: dict, data: list):
|
||||
"""Spend a stat point on a specific attribute."""
|
||||
stat_name = data[1]
|
||||
unspent = player.get('unspent_points', 0)
|
||||
|
||||
if unspent <= 0:
|
||||
await query.answer("You have no points to spend!", show_alert=False)
|
||||
return
|
||||
|
||||
# Map stat names to updates
|
||||
stat_mapping = {
|
||||
'max_hp': ('max_hp', 10, '❤️ Max HP'),
|
||||
'max_stamina': ('max_stamina', 5, '⚡ Max Stamina'),
|
||||
'strength': ('strength', 1, '💪 Strength'),
|
||||
'agility': ('agility', 1, '🏃 Agility'),
|
||||
'endurance': ('endurance', 1, '💚 Endurance'),
|
||||
'intellect': ('intellect', 1, '🧠 Intellect'),
|
||||
}
|
||||
|
||||
if stat_name not in stat_mapping:
|
||||
await query.answer("Invalid stat!", show_alert=False)
|
||||
return
|
||||
|
||||
db_field, increase, display_name = stat_mapping[stat_name]
|
||||
new_value = player[db_field] + increase
|
||||
new_unspent = unspent - 1
|
||||
|
||||
await database.update_player(user_id, {
|
||||
db_field: new_value,
|
||||
'unspent_points': new_unspent
|
||||
})
|
||||
|
||||
# Update local player data
|
||||
player[db_field] = new_value
|
||||
player['unspent_points'] = new_unspent
|
||||
|
||||
await query.answer(f"+{increase} {display_name}!", show_alert=False)
|
||||
|
||||
# Refresh the spend points menu
|
||||
text = f"⭐ <b>Spend Stat Points</b>\n\n"
|
||||
text += f"Available Points: <b>{new_unspent}</b>\n\n"
|
||||
text += f"Current Stats:\n"
|
||||
text += f"❤️ Max HP: {player['max_hp']} (+10 per point)\n"
|
||||
text += f"⚡ Max Stamina: {player['max_stamina']} (+5 per point)\n"
|
||||
text += f"💪 Strength: {player['strength']} (+1 per point)\n"
|
||||
text += f"🏃 Agility: {player['agility']} (+1 per point)\n"
|
||||
text += f"💚 Endurance: {player['endurance']} (+1 per point)\n"
|
||||
text += f"🧠 Intellect: {player['intellect']} (+1 per point)\n\n"
|
||||
text += f"💡 Choose wisely! Each point matters."
|
||||
|
||||
keyboard = keyboards.spend_points_keyboard()
|
||||
|
||||
from .handlers import send_or_edit_with_image
|
||||
await send_or_edit_with_image(query, text=text, reply_markup=keyboard)
|
||||
58
bot/utils.py
58
bot/utils.py
@@ -10,6 +10,64 @@ from telegram.ext import ContextTypes
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def create_progress_bar(current: int, maximum: int, length: int = 10, filled_char: str = "█", empty_char: str = "░") -> str:
|
||||
"""
|
||||
Create a visual progress bar.
|
||||
|
||||
Args:
|
||||
current: Current value
|
||||
maximum: Maximum value
|
||||
length: Length of the bar in characters (default 10)
|
||||
filled_char: Character for filled portion (default █)
|
||||
empty_char: Character for empty portion (default ░)
|
||||
|
||||
Returns:
|
||||
String representation of progress bar
|
||||
|
||||
Examples:
|
||||
>>> create_progress_bar(75, 100)
|
||||
"███████░░░"
|
||||
>>> create_progress_bar(0, 100)
|
||||
"░░░░░░░░░░"
|
||||
>>> create_progress_bar(100, 100)
|
||||
"██████████"
|
||||
"""
|
||||
if maximum <= 0:
|
||||
return empty_char * length
|
||||
|
||||
percentage = min(1.0, max(0.0, current / maximum))
|
||||
filled_length = int(length * percentage)
|
||||
empty_length = length - filled_length
|
||||
|
||||
return filled_char * filled_length + empty_char * empty_length
|
||||
|
||||
|
||||
def format_stat_bar(label: str, emoji: str, current: int, maximum: int, bar_length: int = 10) -> str:
|
||||
"""
|
||||
Format a stat (HP, Stamina, etc.) with visual progress bar.
|
||||
|
||||
Args:
|
||||
label: Stat label (e.g., "HP", "Stamina")
|
||||
emoji: Emoji to display (e.g., "❤️", "⚡")
|
||||
current: Current value
|
||||
maximum: Maximum value
|
||||
bar_length: Length of the progress bar
|
||||
|
||||
Returns:
|
||||
Formatted string with bar and percentage
|
||||
|
||||
Examples:
|
||||
>>> format_stat_bar("HP", "❤️", 75, 100)
|
||||
"❤️ HP: ███████░░░ 75% (75/100)"
|
||||
>>> format_stat_bar("Stamina", "⚡", 50, 100)
|
||||
"⚡ Stamina: █████░░░░░ 50% (50/100)"
|
||||
"""
|
||||
bar = create_progress_bar(current, maximum, bar_length)
|
||||
percentage = int((current / maximum * 100)) if maximum > 0 else 0
|
||||
|
||||
return f"{emoji} {label}: {bar} {percentage}% ({current}/{maximum})"
|
||||
|
||||
|
||||
def get_admin_ids():
|
||||
"""Get the list of admin user IDs from environment variable."""
|
||||
admin_ids_str = os.getenv("ADMIN_IDS", "")
|
||||
|
||||
Reference in New Issue
Block a user