Initial commit: Echoes of the Ashes - Telegram RPG Bot

This commit is contained in:
Joan
2025-10-18 19:21:19 +02:00
commit 3ab412bc09
65 changed files with 14484 additions and 0 deletions

0
bot/__init__.py Normal file
View File

495
bot/combat.py Normal file
View File

@@ -0,0 +1,495 @@
"""
Combat system logic for turn-based NPC encounters.
"""
import random
import json
import time
from typing import Dict, List, Tuple, Optional
from bot import database
from data.npcs import NPCS, STATUS_EFFECTS
from data.items import ITEMS
# XP curve for leveling
def xp_for_level(level: int) -> int:
"""Calculate XP needed to reach a level."""
if level <= 1:
return 0 # Level 1 starts at 0 XP
return int(100 * (level ** 1.5))
async def calculate_player_damage(player: dict) -> int:
"""Calculate player's damage output based on stats and equipped weapon."""
base_damage = 5
strength_bonus = player['strength'] // 2
level_bonus = player['level']
# Check for equipped weapon
inventory = await database.get_inventory(player['telegram_id'])
weapon_damage = 0
for item in inventory:
if item.get('is_equipped'):
item_def = ITEMS.get(item['item_id'], {})
if item_def.get('type') == 'weapon':
# Get weapon damage range
damage_min = item_def.get('damage_min', 0)
damage_max = item_def.get('damage_max', 0)
weapon_damage = random.randint(damage_min, damage_max)
break
# Random variance
variance = random.randint(-2, 2)
return max(1, base_damage + strength_bonus + level_bonus + weapon_damage + variance)
def calculate_npc_damage(npc_def, npc_hp: int, npc_max_hp: int) -> int:
"""Calculate NPC's damage output."""
base_damage = random.randint(npc_def.damage_min, npc_def.damage_max)
# Enraged bonus if low HP
hp_percent = npc_hp / npc_max_hp
if hp_percent < 0.3:
base_damage = int(base_damage * 1.5)
return max(1, base_damage)
async def initiate_combat(player_id: int, npc_id: str, location_id: str, from_wandering_enemy: bool = False) -> Dict:
"""
Start a new combat encounter.
Args:
player_id: Telegram user ID
npc_id: NPC definition ID
location_id: Where combat is happening
from_wandering_enemy: If True, enemy will respawn if player flees or dies
Returns combat state dict.
"""
npc_def = NPCS.get(npc_id)
if not npc_def:
return None
# Randomize NPC HP
npc_hp = random.randint(npc_def.hp_min, npc_def.hp_max)
# Create combat in database
combat_id = await database.create_combat(
player_id=player_id,
npc_id=npc_id,
npc_hp=npc_hp,
npc_max_hp=npc_hp,
location_id=location_id,
from_wandering_enemy=from_wandering_enemy
)
return await database.get_combat(player_id)
async def player_attack(player_id: int) -> Tuple[str, bool, bool]:
"""
Player attacks the NPC.
Returns: (message, npc_died, player_turn_ended)
"""
combat = await database.get_combat(player_id)
if not combat or combat['turn'] != 'player':
return ("It's not your turn!", False, False)
player = await database.get_player(player_id)
npc_def = NPCS.get(combat['npc_id'])
if not player or not npc_def:
return ("Combat error!", False, False)
# Check if player is stunned
player_effects = json.loads(combat['player_status_effects'])
is_stunned = any(effect['name'] == 'Stunned' for effect in player_effects)
if is_stunned:
# Update status effects
player_effects = update_status_effects(player_effects)
await database.update_combat(player_id, {
'turn': 'npc',
'turn_started_at': time.time(),
'player_status_effects': json.dumps(player_effects)
})
return ("⚠️ You're stunned and cannot attack! The enemy seizes the opportunity!", False, True)
# Calculate damage
raw_damage = await calculate_player_damage(player)
actual_damage = max(1, raw_damage - npc_def.defense)
new_npc_hp = max(0, combat['npc_hp'] - actual_damage)
# Check for critical hit (10% chance)
is_crit = random.random() < 0.1
if is_crit:
actual_damage = int(actual_damage * 1.5)
new_npc_hp = max(0, combat['npc_hp'] - actual_damage)
message = f"⚔️ You attack the {npc_def.name} for {actual_damage} damage!"
if is_crit:
message += " 💥 CRITICAL HIT!"
# Check for status effect infliction (5% chance to stun)
npc_effects = json.loads(combat['npc_status_effects'])
if random.random() < 0.05:
npc_effects.append({
'name': 'Stunned',
'turns_remaining': 1,
'damage_per_turn': 0
})
message += f"\n🌟 You stunned the {npc_def.name}!"
# Apply status effect damage to player
player_effects, status_damage, status_messages = apply_status_effects(player_effects)
if status_damage > 0:
new_player_hp = max(0, player['hp'] - status_damage)
await database.update_player(player_id, {'hp': new_player_hp})
message += f"\n{status_messages}"
if new_player_hp <= 0:
await handle_player_death(player_id)
return (message + "\n\n💀 You have died from your wounds...", True, True)
# Check if NPC died
if new_npc_hp <= 0:
await database.update_combat(player_id, {
'npc_hp': 0,
'npc_status_effects': json.dumps(npc_effects),
'player_status_effects': json.dumps(player_effects)
})
# Handle victory
victory_msg = await handle_npc_death(player_id, combat, npc_def)
return (message + "\n\n" + victory_msg, True, True)
# Update combat - switch to NPC turn
await database.update_combat(player_id, {
'npc_hp': new_npc_hp,
'turn': 'npc',
'turn_started_at': time.time(),
'npc_status_effects': json.dumps(npc_effects),
'player_status_effects': json.dumps(player_effects)
})
message += f"\n{npc_def.emoji} {npc_def.name}: {new_npc_hp}/{combat['npc_max_hp']} HP"
return (message, False, True)
async def npc_attack(player_id: int) -> Tuple[str, bool]:
"""
NPC attacks the player.
Returns: (message, player_died)
"""
combat = await database.get_combat(player_id)
if not combat or combat['turn'] != 'npc':
return ("", False)
player = await database.get_player(player_id)
npc_def = NPCS.get(combat['npc_id'])
if not player or not npc_def:
return ("Combat error!", False)
# Check if NPC is stunned
npc_effects = json.loads(combat['npc_status_effects'])
is_stunned = any(effect['name'] == 'Stunned' for effect in npc_effects)
if is_stunned:
# Update status effects
npc_effects = update_status_effects(npc_effects)
await database.update_combat(player_id, {
'turn': 'player',
'turn_started_at': time.time(),
'npc_status_effects': json.dumps(npc_effects)
})
return (f"⚠️ The {npc_def.name} is stunned and cannot attack!", False)
# Calculate damage
damage = calculate_npc_damage(npc_def, combat['npc_hp'], combat['npc_max_hp'])
# Apply damage to player
new_player_hp = max(0, player['hp'] - damage)
await database.update_player(player_id, {'hp': new_player_hp})
message = f"💥 The {npc_def.name} attacks you for {damage} damage!"
# Check for status effect infliction
player_effects = json.loads(combat['player_status_effects'])
if random.random() < npc_def.status_inflict_chance:
# Bleeding is most common
player_effects.append({
'name': 'Bleeding',
'turns_remaining': 3,
'damage_per_turn': 2
})
message += "\n🩸 You're bleeding!"
# Apply status effect damage to NPC
npc_effects, status_damage, status_messages = apply_status_effects(npc_effects)
if status_damage > 0:
new_npc_hp = max(0, combat['npc_hp'] - status_damage)
await database.update_combat(player_id, {'npc_hp': new_npc_hp})
message += f"\n{status_messages}"
if new_npc_hp <= 0:
victory_msg = await handle_npc_death(player_id, combat, npc_def)
return (message + "\n\n" + victory_msg, False)
# Check if player died
if new_player_hp <= 0:
await handle_player_death(player_id)
return (message + "\n\n💀 You have been slain...", True)
# Update combat - switch to player turn
await database.update_combat(player_id, {
'turn': 'player',
'turn_started_at': time.time(),
'player_status_effects': json.dumps(player_effects),
'npc_status_effects': json.dumps(npc_effects)
})
message += f"\n❤️ Your HP: {new_player_hp}/{player['max_hp']}"
message += f"\n{npc_def.emoji} {npc_def.name}: {combat['npc_hp']}/{combat['npc_max_hp']} HP"
return (message, False)
async def flee_attempt(player_id: int) -> Tuple[str, bool, bool]:
"""
Player attempts to flee from combat.
Returns: (message, fled_successfully, turn_ended)
"""
combat = await database.get_combat(player_id)
if not combat or combat['turn'] != 'player':
return ("It's not your turn!", False, False)
player = await database.get_player(player_id)
npc_def = NPCS.get(combat['npc_id'])
# Base flee chance is 50%, modified by agility
flee_chance = 0.5 + (player['agility'] / 100)
if random.random() < flee_chance:
# Success! Check if we need to respawn the wandering enemy
if combat.get('from_wandering_enemy', False):
# Respawn the enemy at the same location
await database.spawn_wandering_enemy(
npc_id=combat['npc_id'],
location_id=combat['location_id'],
lifetime_seconds=600 # 10 minutes
)
await database.end_combat(player_id)
return (f"🏃 You successfully flee from the {npc_def.name}!", True, True)
else:
# Failed - lose turn and NPC attacks
message = f"❌ You failed to escape! The {npc_def.name} takes advantage!"
# NPC gets a free attack
await database.update_combat(player_id, {
'turn': 'npc',
'turn_started_at': time.time()
})
return (message, False, True)
def update_status_effects(effects: List[Dict]) -> List[Dict]:
"""Decrease turn counters on status effects."""
new_effects = []
for effect in effects:
effect['turns_remaining'] -= 1
if effect['turns_remaining'] > 0:
new_effects.append(effect)
return new_effects
def apply_status_effects(effects: List[Dict]) -> Tuple[List[Dict], int, str]:
"""
Apply status effect damage.
Returns: (updated_effects, total_damage, message)
"""
total_damage = 0
messages = []
for effect in effects:
if effect['damage_per_turn'] > 0:
total_damage += effect['damage_per_turn']
if effect['name'] == 'Bleeding':
messages.append(f"🩸 Bleeding: -{effect['damage_per_turn']} HP")
elif effect['name'] == 'Infected':
messages.append(f"🦠 Infection: -{effect['damage_per_turn']} HP")
return effects, total_damage, "\n".join(messages)
async def handle_npc_death(player_id: int, combat: Dict, npc_def) -> str:
"""Handle NPC death - give XP, drop loot, create corpse."""
player = await database.get_player(player_id)
# Give XP
new_xp = player['xp'] + npc_def.xp_reward
level_up_msg = ""
# Check for level up
current_level = player['level']
xp_needed = xp_for_level(current_level + 1)
if new_xp >= xp_needed:
new_level = current_level + 1
# Give stat points instead of auto-allocating
# Players get 5 points per level to spend as they wish
points_gained = 5
new_unspent_points = player.get('unspent_points', 0) + points_gained
await database.update_player(player_id, {
'xp': new_xp,
'level': new_level,
'hp': player['max_hp'], # Heal on level up
'stamina': player['max_stamina'], # Restore stamina on level up
'unspent_points': new_unspent_points
})
level_up_msg = f"\n\n🎉 LEVEL UP! You are now level {new_level}!"
level_up_msg += f"\n⭐ You have {points_gained} stat points to spend!"
level_up_msg += f"\n❤️ Fully healed and stamina restored!"
level_up_msg += f"\n\n💡 Check your profile to spend your points!"
else:
await database.update_player(player_id, {'xp': new_xp})
# Drop loot
loot_msg = "\n\n💰 Loot dropped:"
loot_items = []
for loot_item in npc_def.loot_table:
if random.random() < loot_item.drop_chance:
quantity = random.randint(loot_item.quantity_min, loot_item.quantity_max)
await database.drop_item_to_world(
loot_item.item_id,
quantity,
combat['location_id']
)
item_def = ITEMS.get(loot_item.item_id, {})
loot_msg += f"\n{item_def.get('emoji', '')} {item_def.get('name', 'Unknown')} x{quantity}"
loot_items.append(loot_item.item_id)
if not loot_items:
loot_msg += "\nNothing..."
# Create corpse if it has corpse loot
if npc_def.corpse_loot:
corpse_loot_json = json.dumps([{
'item_id': cl.item_id,
'quantity_min': cl.quantity_min,
'quantity_max': cl.quantity_max,
'required_tool': cl.required_tool
} for cl in npc_def.corpse_loot])
await database.create_npc_corpse(
npc_id=combat['npc_id'],
location_id=combat['location_id'],
loot_remaining=corpse_loot_json
)
loot_msg += f"\n\n{npc_def.emoji} The corpse can be scavenged for more resources."
# End combat
await database.end_combat(player_id)
message = f"🏆 Victory! {npc_def.death_message}"
message += f"\n+{npc_def.xp_reward} XP"
message += level_up_msg
message += loot_msg
return message
async def handle_player_death(player_id: int):
"""Handle player death - create corpse bag with all items."""
player = await database.get_player(player_id)
inventory_items = await database.get_inventory(player_id)
# Check if combat was with a wandering enemy that should respawn
combat = await database.get_combat(player_id)
if combat and combat.get('from_wandering_enemy', False):
# Respawn the enemy at the same location
await database.spawn_wandering_enemy(
npc_id=combat['npc_id'],
location_id=combat['location_id'],
lifetime_seconds=600 # 10 minutes
)
# Create corpse bag if player has items
if inventory_items:
items_json = json.dumps([{
'item_id': item['item_id'],
'quantity': item['quantity']
} for item in inventory_items])
await database.create_player_corpse(
player_name=player['name'],
location_id=player['location_id'],
items=items_json
)
# Remove all items from player
for item in inventory_items:
await database.remove_item_from_inventory(item['id'], item['quantity'])
# Mark player as dead and end any combat
await database.update_player(player_id, {'is_dead': True, 'hp': 0})
await database.end_combat(player_id)
async def use_item_in_combat(player_id: int, item_db_id: int) -> Tuple[str, bool]:
"""
Use a consumable item during combat.
Returns: (message, turn_ended)
"""
combat = await database.get_combat(player_id)
if not combat or combat['turn'] != 'player':
return ("It's not your turn!", False)
item_data = await database.get_inventory_item(item_db_id)
if not item_data or item_data['player_id'] != player_id:
return ("You don't have that item!", False)
item_def = ITEMS.get(item_data['item_id'])
if not item_def or item_def.get('type') != 'consumable':
return ("That item cannot be used in combat!", False)
player = await database.get_player(player_id)
# Apply consumable effects
message = f"💊 Used {item_def['name']}!"
hp_restore = item_def.get('hp_restore', 0)
stamina_restore = item_def.get('stamina_restore', 0)
updates = {}
if hp_restore > 0:
new_hp = min(player['hp'] + hp_restore, player['max_hp'])
updates['hp'] = new_hp
message += f"\n❤️ +{hp_restore} HP"
if stamina_restore > 0:
new_stamina = min(player['stamina'] + stamina_restore, player['max_stamina'])
updates['stamina'] = new_stamina
message += f"\n⚡ +{stamina_restore} Stamina"
if updates:
await database.update_player(player_id, updates)
# Remove item from inventory
if item_data['quantity'] > 1:
await database.update_inventory_item(item_db_id, item_data['quantity'] - 1)
else:
await database.remove_item_from_inventory(item_db_id, 1)
# Using an item ends your turn
await database.update_combat(player_id, {
'turn': 'npc',
'turn_started_at': time.time()
})
return (message, True)

539
bot/database.py Normal file
View File

@@ -0,0 +1,539 @@
import time
import os
from typing import Set
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy import (
MetaData, Table, Column, Integer, String, Boolean, ForeignKey, Float, UniqueConstraint,
)
DB_USER, DB_PASS, DB_NAME, DB_HOST, DB_PORT = os.getenv("POSTGRES_USER"), os.getenv("POSTGRES_PASSWORD"), os.getenv("POSTGRES_DB"), os.getenv("POSTGRES_HOST"), os.getenv("POSTGRES_PORT")
DATABASE_URL = f"postgresql+psycopg://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
engine = create_async_engine(DATABASE_URL)
metadata = MetaData()
# ... (players, inventory, dropped_items tables are unchanged) ...
players = Table("players", metadata, Column("telegram_id", Integer, primary_key=True), Column("name", String, default="Survivor"), Column("hp", Integer, default=100), Column("max_hp", Integer, default=100), Column("stamina", Integer, default=20), Column("max_stamina", Integer, default=20), Column("strength", Integer, default=5), Column("agility", Integer, default=5), Column("endurance", Integer, default=5), Column("intellect", Integer, default=5), Column("location_id", String, default="start_point"), Column("is_dead", Boolean, default=False), Column("level", Integer, default=1), Column("xp", Integer, default=0), Column("unspent_points", Integer, default=0))
inventory = Table("inventory", metadata, Column("id", Integer, primary_key=True, autoincrement=True), Column("player_id", Integer, ForeignKey("players.telegram_id", ondelete="CASCADE")), Column("item_id", String), Column("quantity", Integer, default=1), Column("is_equipped", Boolean, default=False))
dropped_items = Table("dropped_items", metadata, Column("id", Integer, primary_key=True, autoincrement=True), Column("item_id", String), Column("quantity", Integer, default=1), Column("location_id", String), Column("drop_timestamp", Float))
# Combat-related tables
active_combats = Table(
"active_combats",
metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("player_id", Integer, ForeignKey("players.telegram_id", ondelete="CASCADE"), unique=True),
Column("npc_id", String, nullable=False),
Column("npc_hp", Integer, nullable=False),
Column("npc_max_hp", Integer, nullable=False),
Column("turn", String, nullable=False), # "player" or "npc"
Column("turn_started_at", Float, nullable=False),
Column("player_status_effects", String, default=""), # JSON string
Column("npc_status_effects", String, default=""), # JSON string
Column("location_id", String, nullable=False),
Column("from_wandering_enemy", Boolean, default=False), # If True, respawn on flee/death
)
player_corpses = Table(
"player_corpses",
metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("player_name", String, nullable=False),
Column("location_id", String, nullable=False),
Column("items", String, nullable=False), # JSON string of items
Column("death_timestamp", Float, nullable=False),
)
npc_corpses = Table(
"npc_corpses",
metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("npc_id", String, nullable=False),
Column("location_id", String, nullable=False),
Column("loot_remaining", String, nullable=False), # JSON string
Column("death_timestamp", Float, nullable=False),
)
interactable_cooldowns = Table(
"interactable_cooldowns",
metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("interactable_instance_id", String, nullable=False, unique=True), # Renamed for clarity
Column("expiry_timestamp", Float, nullable=False),
)
# Table to cache Telegram file IDs for images
image_cache = Table(
"image_cache",
metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("image_path", String, nullable=False, unique=True), # Local file path
Column("telegram_file_id", String, nullable=False), # Telegram's file_id for reuse
Column("uploaded_at", Float, nullable=False),
)
# Wandering enemies table - managed by spawn system
wandering_enemies = Table(
"wandering_enemies",
metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("npc_id", String, nullable=False),
Column("location_id", String, nullable=False),
Column("spawn_timestamp", Float, nullable=False),
Column("despawn_timestamp", Float, nullable=False), # When this enemy should despawn
)
async def create_tables():
async with engine.begin() as conn:
await conn.run_sync(metadata.create_all)
# ... (All other database functions are unchanged except the cooldown ones) ...
async def get_player(telegram_id: int):
async with engine.connect() as conn:
result = await conn.execute(players.select().where(players.c.telegram_id == telegram_id))
row = result.first()
return row._asdict() if row else None
async def create_player(telegram_id: int, name: str):
async with engine.connect() as conn:
await conn.execute(players.insert().values(telegram_id=telegram_id, name=name))
await conn.execute(inventory.insert().values(player_id=telegram_id, item_id="tattered_rucksack", is_equipped=True))
await conn.commit()
return await get_player(telegram_id)
async def update_player(telegram_id: int, updates: dict):
async with engine.connect() as conn:
await conn.execute(players.update().where(players.c.telegram_id == telegram_id).values(**updates))
await conn.commit()
async def get_inventory(player_id: int):
async with engine.connect() as conn:
result = await conn.execute(inventory.select().where(inventory.c.player_id == player_id))
return [row._asdict() for row in result.fetchall()]
async def get_inventory_item(item_db_id: int):
async with engine.connect() as conn:
result = await conn.execute(inventory.select().where(inventory.c.id == item_db_id))
row = result.first()
return row._asdict() if row else None
async def add_item_to_inventory(player_id: int, item_id: str, quantity: int = 1):
async with engine.connect() as conn:
result = await conn.execute(inventory.select().where(inventory.c.player_id == player_id, inventory.c.item_id == item_id))
existing_item = result.first()
if existing_item: stmt = inventory.update().where(inventory.c.id == existing_item.id).values(quantity=inventory.c.quantity + quantity)
else: stmt = inventory.insert().values(player_id=player_id, item_id=item_id, quantity=quantity)
await conn.execute(stmt)
await conn.commit()
async def add_equipped_item_to_inventory(player_id: int, item_id: str) -> int:
"""Add a single equipped item to inventory and return its ID."""
async with engine.connect() as conn:
stmt = inventory.insert().values(
player_id=player_id,
item_id=item_id,
quantity=1,
is_equipped=True
)
result = await conn.execute(stmt)
await conn.commit()
return result.inserted_primary_key[0]
async def update_inventory_item(item_db_id: int, quantity: int = None, is_equipped: bool = None):
"""Update inventory item properties."""
async with engine.connect() as conn:
updates = {}
if quantity is not None:
updates['quantity'] = quantity
if is_equipped is not None:
updates['is_equipped'] = is_equipped
if updates:
stmt = inventory.update().where(inventory.c.id == item_db_id).values(**updates)
await conn.execute(stmt)
await conn.commit()
async def remove_item_from_inventory(item_db_id: int, quantity: int = 1):
async with engine.connect() as conn:
result = await conn.execute(inventory.select().where(inventory.c.id == item_db_id))
item_data = result.first()
if not item_data: return
if item_data.quantity > quantity: stmt = inventory.update().where(inventory.c.id == item_db_id).values(quantity=inventory.c.quantity - quantity)
else: stmt = inventory.delete().where(inventory.c.id == item_db_id)
await conn.execute(stmt)
await conn.commit()
async def drop_item_to_world(item_id: str, quantity: int, location_id: str):
"""Drop item to world. Combines with existing stacks of same item in same location."""
async with engine.connect() as conn:
# Check if this item already exists in this location
result = await conn.execute(
dropped_items.select().where(
(dropped_items.c.item_id == item_id) &
(dropped_items.c.location_id == location_id)
)
)
existing_item = result.first()
if existing_item:
# Stack exists, add to it
new_quantity = existing_item.quantity + quantity
stmt = dropped_items.update().where(dropped_items.c.id == existing_item.id).values(
quantity=new_quantity,
drop_timestamp=time.time() # Update timestamp
)
else:
# Create new stack
stmt = dropped_items.insert().values(
item_id=item_id,
quantity=quantity,
location_id=location_id,
drop_timestamp=time.time()
)
await conn.execute(stmt)
await conn.commit()
async def get_dropped_items_in_location(location_id: str):
async with engine.connect() as conn:
result = await conn.execute(dropped_items.select().where(dropped_items.c.location_id == location_id).limit(10))
return [row._asdict() for row in result.fetchall()]
async def get_dropped_item(dropped_item_id: int):
async with engine.connect() as conn:
result = await conn.execute(dropped_items.select().where(dropped_items.c.id == dropped_item_id))
row = result.first()
return row._asdict() if row else None
async def remove_dropped_item(dropped_item_id: int):
async with engine.connect() as conn:
await conn.execute(dropped_items.delete().where(dropped_items.c.id == dropped_item_id))
await conn.commit()
async def update_dropped_item(dropped_item_id: int, new_quantity: int):
"""Update the quantity of a dropped item."""
async with engine.connect() as conn:
stmt = dropped_items.update().where(dropped_items.c.id == dropped_item_id).values(quantity=new_quantity)
await conn.execute(stmt)
await conn.commit()
async def remove_expired_dropped_items(timestamp_limit: float) -> int:
async with engine.connect() as conn:
stmt = dropped_items.delete().where(dropped_items.c.drop_timestamp < timestamp_limit)
result = await conn.execute(stmt)
await conn.commit()
return result.rowcount
async def regenerate_all_players_stamina() -> int:
"""
Regenerate stamina for all active players.
Recovery formula:
- Base recovery: 1 stamina per cycle (5 minutes)
- Endurance bonus: +1 stamina per 10 endurance points
- Example: 5 endurance = 1 stamina, 15 endurance = 2 stamina, 25 endurance = 3 stamina
- Only regenerates up to max_stamina
- Only regenerates for living players
"""
async with engine.connect() as conn:
# Get all living players who are below max stamina
result = await conn.execute(
players.select().where(
(players.c.is_dead == False) &
(players.c.stamina < players.c.max_stamina)
)
)
players_to_update = result.fetchall()
updated_count = 0
for player in players_to_update:
# Calculate stamina recovery
base_recovery = 1
endurance_bonus = player.endurance // 10 # +1 per 10 endurance
total_recovery = base_recovery + endurance_bonus
# Calculate new stamina (capped at max)
new_stamina = min(player.stamina + total_recovery, player.max_stamina)
# Only update if there's actually a change
if new_stamina > player.stamina:
await conn.execute(
players.update()
.where(players.c.telegram_id == player.telegram_id)
.values(stamina=new_stamina)
)
updated_count += 1
await conn.commit()
return updated_count
COOLDOWN_DURATION = 300
async def set_cooldown(instance_id: str):
expiry_time = time.time() + COOLDOWN_DURATION
async with engine.connect() as conn:
update_stmt = interactable_cooldowns.update().where(interactable_cooldowns.c.interactable_instance_id == instance_id).values(expiry_timestamp=expiry_time)
result = await conn.execute(update_stmt)
if result.rowcount == 0:
insert_stmt = interactable_cooldowns.insert().values(interactable_instance_id=instance_id, expiry_timestamp=expiry_time)
await conn.execute(insert_stmt)
await conn.commit()
# --- Combat Functions ---
async def create_combat(player_id: int, npc_id: str, npc_hp: int, npc_max_hp: int, location_id: str, from_wandering_enemy: bool = False):
"""Start a new combat encounter."""
async with engine.connect() as conn:
stmt = active_combats.insert().values(
player_id=player_id,
npc_id=npc_id,
npc_hp=npc_hp,
npc_max_hp=npc_max_hp,
turn="player",
turn_started_at=time.time(),
location_id=location_id,
player_status_effects="[]",
npc_status_effects="[]",
from_wandering_enemy=from_wandering_enemy
)
result = await conn.execute(stmt)
await conn.commit()
return result.inserted_primary_key[0]
async def get_combat(player_id: int):
"""Get active combat for a player."""
async with engine.connect() as conn:
stmt = active_combats.select().where(active_combats.c.player_id == player_id)
result = await conn.execute(stmt)
row = result.first()
return row._asdict() if row else None
async def update_combat(player_id: int, updates: dict):
"""Update combat state."""
async with engine.connect() as conn:
stmt = active_combats.update().where(active_combats.c.player_id == player_id).values(**updates)
await conn.execute(stmt)
await conn.commit()
async def end_combat(player_id: int):
"""Remove active combat."""
async with engine.connect() as conn:
stmt = active_combats.delete().where(active_combats.c.player_id == player_id)
await conn.execute(stmt)
await conn.commit()
async def get_all_idle_combats(idle_threshold: float):
"""Get all combats where the turn has been idle too long."""
async with engine.connect() as conn:
stmt = active_combats.select().where(active_combats.c.turn_started_at < idle_threshold)
result = await conn.execute(stmt)
return [row._asdict() for row in result.fetchall()]
async def create_player_corpse(player_name: str, location_id: str, items: str):
"""Create a player corpse bag."""
async with engine.connect() as conn:
stmt = player_corpses.insert().values(
player_name=player_name,
location_id=location_id,
items=items,
death_timestamp=time.time()
)
await conn.execute(stmt)
await conn.commit()
async def get_player_corpses_in_location(location_id: str):
"""Get all player corpses in a location."""
async with engine.connect() as conn:
stmt = player_corpses.select().where(player_corpses.c.location_id == location_id)
result = await conn.execute(stmt)
return [row._asdict() for row in result.fetchall()]
async def get_player_corpse(corpse_id: int):
"""Get a specific player corpse."""
async with engine.connect() as conn:
stmt = player_corpses.select().where(player_corpses.c.id == corpse_id)
result = await conn.execute(stmt)
row = result.first()
return row._asdict() if row else None
async def update_player_corpse(corpse_id: int, items: str):
"""Update items in a player corpse."""
async with engine.connect() as conn:
stmt = player_corpses.update().where(player_corpses.c.id == corpse_id).values(items=items)
await conn.execute(stmt)
await conn.commit()
async def remove_player_corpse(corpse_id: int):
"""Remove a player corpse."""
async with engine.connect() as conn:
stmt = player_corpses.delete().where(player_corpses.c.id == corpse_id)
await conn.execute(stmt)
await conn.commit()
async def remove_expired_player_corpses(timestamp_limit: float) -> int:
"""Remove old player corpses."""
async with engine.connect() as conn:
stmt = player_corpses.delete().where(player_corpses.c.death_timestamp < timestamp_limit)
result = await conn.execute(stmt)
await conn.commit()
return result.rowcount
async def create_npc_corpse(npc_id: str, location_id: str, loot_remaining: str):
"""Create an NPC corpse for scavenging."""
async with engine.connect() as conn:
stmt = npc_corpses.insert().values(
npc_id=npc_id,
location_id=location_id,
loot_remaining=loot_remaining,
death_timestamp=time.time()
)
result = await conn.execute(stmt)
await conn.commit()
return result.inserted_primary_key[0]
async def get_npc_corpses_in_location(location_id: str):
"""Get all NPC corpses in a location."""
async with engine.connect() as conn:
stmt = npc_corpses.select().where(npc_corpses.c.location_id == location_id)
result = await conn.execute(stmt)
return [row._asdict() for row in result.fetchall()]
async def get_npc_corpse(corpse_id: int):
"""Get a specific NPC corpse."""
async with engine.connect() as conn:
stmt = npc_corpses.select().where(npc_corpses.c.id == corpse_id)
result = await conn.execute(stmt)
row = result.first()
return row._asdict() if row else None
async def update_npc_corpse(corpse_id: int, loot_remaining: str):
"""Update loot in an NPC corpse."""
async with engine.connect() as conn:
stmt = npc_corpses.update().where(npc_corpses.c.id == corpse_id).values(loot_remaining=loot_remaining)
await conn.execute(stmt)
await conn.commit()
async def remove_npc_corpse(corpse_id: int):
"""Remove an NPC corpse."""
async with engine.connect() as conn:
stmt = npc_corpses.delete().where(npc_corpses.c.id == corpse_id)
await conn.execute(stmt)
await conn.commit()
async def remove_expired_npc_corpses(timestamp_limit: float) -> int:
"""Remove old NPC corpses."""
async with engine.connect() as conn:
stmt = npc_corpses.delete().where(npc_corpses.c.death_timestamp < timestamp_limit)
result = await conn.execute(stmt)
await conn.commit()
return result.rowcount
async def get_cooldown(instance_id: str) -> int:
async with engine.connect() as conn:
stmt = interactable_cooldowns.select().where(interactable_cooldowns.c.interactable_instance_id == instance_id)
result = await conn.execute(stmt)
cooldown = result.first()
if cooldown and cooldown.expiry_timestamp > time.time():
return int(cooldown.expiry_timestamp - time.time())
return 0
async def get_cooldowns_for_location(location_id: str) -> Set[str]:
"""Get all active cooldown instance IDs for a location by checking the prefix."""
async with engine.connect() as conn:
stmt = interactable_cooldowns.select().where(
interactable_cooldowns.c.interactable_instance_id.startswith(location_id + "_"),
interactable_cooldowns.c.expiry_timestamp > time.time()
)
result = await conn.execute(stmt)
return {row.interactable_instance_id for row in result.fetchall()}
# --- Image Cache Functions ---
async def get_cached_image(image_path: str):
"""Get the Telegram file_id for a cached image."""
async with engine.connect() as conn:
stmt = image_cache.select().where(image_cache.c.image_path == image_path)
result = await conn.execute(stmt)
row = result.first()
return row.telegram_file_id if row else None
async def cache_image(image_path: str, telegram_file_id: str):
"""Store a Telegram file_id for an image path."""
async with engine.connect() as conn:
# Check if already exists
stmt = image_cache.select().where(image_cache.c.image_path == image_path)
result = await conn.execute(stmt)
existing = result.first()
if existing:
# Update existing entry
update_stmt = image_cache.update().where(
image_cache.c.image_path == image_path
).values(telegram_file_id=telegram_file_id, uploaded_at=time.time())
await conn.execute(update_stmt)
else:
# Insert new entry
insert_stmt = image_cache.insert().values(
image_path=image_path,
telegram_file_id=telegram_file_id,
uploaded_at=time.time()
)
await conn.execute(insert_stmt)
await conn.commit()
# --- Wandering Enemies Functions ---
async def spawn_wandering_enemy(npc_id: str, location_id: str, lifetime_seconds: int = 600):
"""Spawn a wandering enemy at a location. Lifetime defaults to 10 minutes."""
async with engine.connect() as conn:
current_time = time.time()
despawn_time = current_time + lifetime_seconds
await conn.execute(wandering_enemies.insert().values(
npc_id=npc_id,
location_id=location_id,
spawn_timestamp=current_time,
despawn_timestamp=despawn_time
))
await conn.commit()
async def get_wandering_enemies_in_location(location_id: str):
"""Get all active wandering enemies at a location."""
async with engine.connect() as conn:
current_time = time.time()
stmt = wandering_enemies.select().where(
wandering_enemies.c.location_id == location_id,
wandering_enemies.c.despawn_timestamp > current_time
)
result = await conn.execute(stmt)
return [row._asdict() for row in result.fetchall()]
async def remove_wandering_enemy(enemy_id: int):
"""Remove a wandering enemy (when engaged in combat or manually despawned)."""
async with engine.connect() as conn:
await conn.execute(wandering_enemies.delete().where(wandering_enemies.c.id == enemy_id))
await conn.commit()
async def cleanup_expired_wandering_enemies():
"""Remove all expired wandering enemies."""
async with engine.connect() as conn:
current_time = time.time()
result = await conn.execute(
wandering_enemies.delete().where(wandering_enemies.c.despawn_timestamp <= current_time)
)
await conn.commit()
return result.rowcount # Number of enemies despawned
async def get_wandering_enemy_count_in_location(location_id: str) -> int:
"""Count active wandering enemies at a location."""
async with engine.connect() as conn:
current_time = time.time()
from sqlalchemy import func
stmt = wandering_enemies.select().where(
wandering_enemies.c.location_id == location_id,
wandering_enemies.c.despawn_timestamp > current_time
)
result = await conn.execute(stmt)
return len(result.fetchall())
async def get_all_active_wandering_enemies():
"""Get all active wandering enemies across all locations."""
async with engine.connect() as conn:
current_time = time.time()
stmt = wandering_enemies.select().where(
wandering_enemies.c.despawn_timestamp > current_time
)
result = await conn.execute(stmt)
return [row._asdict() for row in result.fetchall()]

1268
bot/handlers.py Normal file

File diff suppressed because it is too large Load Diff

603
bot/keyboards.py Normal file
View File

@@ -0,0 +1,603 @@
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
from data.world_loader import game_world
from data.items import ITEMS
# ... (main_menu_keyboard, move_keyboard are unchanged) ...
def main_menu_keyboard() -> InlineKeyboardMarkup:
keyboard = [[InlineKeyboardButton("🗺️ Move", callback_data="move_menu"), InlineKeyboardButton("👀 Inspect Area", callback_data="inspect_area")], [InlineKeyboardButton("👤 Profile", callback_data="profile"), InlineKeyboardButton("🎒 Inventory", callback_data="inventory_menu")]]
return InlineKeyboardMarkup(keyboard)
async def move_keyboard(current_location_id: str, player_id: int) -> InlineKeyboardMarkup:
"""
Create a movement keyboard with stamina costs.
Layout:
[ North (⚡5) ]
[ West (⚡5) ] [ East (⚡5) ]
[ South (⚡5) ]
[ Other exits (inside, down, etc.) ]
[ Back ]
"""
from bot import database, logic
keyboard = []
location = game_world.get_location(current_location_id)
player = await database.get_player(player_id)
inventory = await database.get_inventory(player_id)
if location and player:
# Dictionary to hold direction buttons
compass_directions = {}
other_exits = []
for direction, destination_id in location.exits.items():
destination = game_world.get_location(destination_id)
if destination:
# Calculate stamina cost for this specific route
stamina_cost = logic.calculate_travel_stamina_cost(player, inventory, location, destination)
# Map direction to emoji and label
direction_lower = direction.lower()
if direction_lower == "north":
emoji = "⬆️"
compass_directions["north"] = InlineKeyboardButton(
f"{emoji} {destination.name} (⚡{stamina_cost})",
callback_data=f"move:{destination_id}"
)
elif direction_lower == "south":
emoji = "⬇️"
compass_directions["south"] = InlineKeyboardButton(
f"{emoji} {destination.name} (⚡{stamina_cost})",
callback_data=f"move:{destination_id}"
)
elif direction_lower == "east":
emoji = "➡️"
compass_directions["east"] = InlineKeyboardButton(
f"{emoji} {destination.name} (⚡{stamina_cost})",
callback_data=f"move:{destination_id}"
)
elif direction_lower == "west":
emoji = "⬅️"
compass_directions["west"] = InlineKeyboardButton(
f"{emoji} {destination.name} (⚡{stamina_cost})",
callback_data=f"move:{destination_id}"
)
elif direction_lower == "northeast":
emoji = "↗️"
compass_directions["northeast"] = InlineKeyboardButton(
f"{emoji} {destination.name} (⚡{stamina_cost})",
callback_data=f"move:{destination_id}"
)
elif direction_lower == "northwest":
emoji = "↖️"
compass_directions["northwest"] = InlineKeyboardButton(
f"{emoji} {destination.name} (⚡{stamina_cost})",
callback_data=f"move:{destination_id}"
)
elif direction_lower == "southeast":
emoji = "↘️"
compass_directions["southeast"] = InlineKeyboardButton(
f"{emoji} {destination.name} (⚡{stamina_cost})",
callback_data=f"move:{destination_id}"
)
elif direction_lower == "southwest":
emoji = "↙️"
compass_directions["southwest"] = InlineKeyboardButton(
f"{emoji} {destination.name} (⚡{stamina_cost})",
callback_data=f"move:{destination_id}"
)
elif direction_lower == "inside":
emoji = "🚪"
other_exits.append(InlineKeyboardButton(
f"{emoji} Enter {destination.name} (⚡{stamina_cost})",
callback_data=f"move:{destination_id}"
))
elif direction_lower == "outside":
emoji = "🚪"
other_exits.append(InlineKeyboardButton(
f"{emoji} Exit to {destination.name} (⚡{stamina_cost})",
callback_data=f"move:{destination_id}"
))
elif direction_lower == "down":
emoji = "⬇️"
other_exits.append(InlineKeyboardButton(
f"{emoji} Descend to {destination.name} (⚡{stamina_cost})",
callback_data=f"move:{destination_id}"
))
elif direction_lower == "up":
emoji = "⬆️"
other_exits.append(InlineKeyboardButton(
f"{emoji} Ascend to {destination.name} (⚡{stamina_cost})",
callback_data=f"move:{destination_id}"
))
else:
# Generic fallback for any other direction
emoji = "🔀"
other_exits.append(InlineKeyboardButton(
f"{emoji} {destination.name} (⚡{stamina_cost})",
callback_data=f"move:{destination_id}"
))
# Build compass layout
# Row 1: Northwest, North, Northeast
top_row = []
if "northwest" in compass_directions:
top_row.append(compass_directions["northwest"])
if "north" in compass_directions:
top_row.append(compass_directions["north"])
if "northeast" in compass_directions:
top_row.append(compass_directions["northeast"])
if top_row:
keyboard.append(top_row)
# Row 2: West and/or East
middle_row = []
if "west" in compass_directions:
middle_row.append(compass_directions["west"])
if "east" in compass_directions:
middle_row.append(compass_directions["east"])
if middle_row:
keyboard.append(middle_row)
# Row 3: Southwest, South, Southeast
bottom_row = []
if "southwest" in compass_directions:
bottom_row.append(compass_directions["southwest"])
if "south" in compass_directions:
bottom_row.append(compass_directions["south"])
if "southeast" in compass_directions:
bottom_row.append(compass_directions["southeast"])
if bottom_row:
keyboard.append(bottom_row)
# Add other exits (inside, outside, up, down, etc.)
for exit_button in other_exits:
keyboard.append([exit_button])
keyboard.append([InlineKeyboardButton("⬅️ Back", callback_data="main_menu")])
return InlineKeyboardMarkup(keyboard)
async def inspect_keyboard(location_id: str, dropped_items: list, wandering_enemies: list = None) -> InlineKeyboardMarkup:
from bot import database
from data.npcs import NPCS
keyboard = []
location = game_world.get_location(location_id)
# Show wandering enemies first if present (in pairs, emoji only)
if wandering_enemies:
row = []
for enemy in wandering_enemies:
npc_def = NPCS.get(enemy['npc_id'])
if npc_def:
button = InlineKeyboardButton(
f"⚠️ {npc_def.emoji} {npc_def.name}",
callback_data=f"attack_wandering:{enemy['id']}"
)
row.append(button)
if len(row) == 2:
keyboard.append(row)
row = []
if row: # Add remaining enemy if odd number
keyboard.append(row)
if wandering_enemies:
keyboard.append([InlineKeyboardButton("--- Environment ---", callback_data="no_op")])
# Show interactables in pairs when text is short enough
if location:
row = []
for instance_id, interactable in location.interactables.items():
label = interactable.name
# Check if ANY action is available (not on cooldown)
has_available_action = False
for action_id in interactable.actions.keys():
cooldown_key = f"{instance_id}:{action_id}"
if await database.get_cooldown(cooldown_key) == 0:
has_available_action = True
break
if not has_available_action and len(interactable.actions) > 0:
label += ""
# Include location_id in callback data for efficient lookup
button = InlineKeyboardButton(label, callback_data=f"inspect:{location_id}:{instance_id}")
# If text is short (< 20 chars), try to pair it
if len(label) < 20:
row.append(button)
if len(row) == 2:
keyboard.append(row)
row = []
else:
# Long text, add any pending row first, then add this one alone
if row:
keyboard.append(row)
row = []
keyboard.append([button])
# Add remaining button if odd number
if row:
keyboard.append(row)
# Show player corpse bags
player_corpses = await database.get_player_corpses_in_location(location_id)
if player_corpses:
keyboard.append([InlineKeyboardButton("--- Fallen survivors ---", callback_data="no_op")])
row = []
for corpse in player_corpses:
button = InlineKeyboardButton(
f"🎒 {corpse['player_name']}'s bag",
callback_data=f"loot_player_corpse:{corpse['id']}"
)
row.append(button)
if len(row) == 2:
keyboard.append(row)
row = []
if row:
keyboard.append(row)
# Show NPC corpses
npc_corpses = await database.get_npc_corpses_in_location(location_id)
if npc_corpses:
if not player_corpses: # Only add separator if not already added
keyboard.append([InlineKeyboardButton("--- Corpses ---", callback_data="no_op")])
row = []
for corpse in npc_corpses:
from data.npcs import NPCS
npc_def = NPCS.get(corpse['npc_id'])
if npc_def:
button = InlineKeyboardButton(
f"{npc_def.emoji} {npc_def.name}",
callback_data=f"scavenge_npc_corpse:{corpse['id']}"
)
row.append(button)
if len(row) == 2:
keyboard.append(row)
row = []
if row:
keyboard.append(row)
if dropped_items:
keyboard.append([InlineKeyboardButton("--- Items on the ground ---", callback_data="no_op")])
row = []
for item in dropped_items:
item_def = ITEMS.get(item['item_id'], {})
emoji = item_def.get('emoji', '')
quantity_text = f" x{item['quantity']}" if item['quantity'] > 1 else ""
button = InlineKeyboardButton(
f"{emoji} {item_def.get('name', 'Unknown')}{quantity_text}",
callback_data=f"pickup_menu:{item['id']}"
)
row.append(button)
if len(row) == 2:
keyboard.append(row)
row = []
if row:
keyboard.append(row)
keyboard.append([InlineKeyboardButton("⬅️ Back", callback_data="main_menu")])
return InlineKeyboardMarkup(keyboard)
def pickup_options_keyboard(item_id: int, item_name: str, quantity: int) -> InlineKeyboardMarkup:
"""Create pickup options keyboard with x1, x5, x10, and All options."""
keyboard = []
if quantity == 1:
# Just show a single "Pick" button for single items
keyboard.append([InlineKeyboardButton("📦 Pick", callback_data=f"pickup:{item_id}:1")])
else:
# Build pickup row with available options
pickup_row = [InlineKeyboardButton("📦 Pick x1", callback_data=f"pickup:{item_id}:1")]
if quantity >= 5:
pickup_row.append(InlineKeyboardButton("📦 Pick x5", callback_data=f"pickup:{item_id}:5"))
if quantity >= 10:
pickup_row.append(InlineKeyboardButton("📦 Pick x10", callback_data=f"pickup:{item_id}:10"))
# Split into rows if more than 2 buttons
if len(pickup_row) > 2:
keyboard.append(pickup_row[:2])
keyboard.append(pickup_row[2:])
else:
keyboard.append(pickup_row)
# Add "Pick All" option
keyboard.append([InlineKeyboardButton(f"📦 Pick All ({quantity})", callback_data=f"pickup:{item_id}:all")])
# Back button
keyboard.append([InlineKeyboardButton("⬅️ Back", callback_data="inspect_area")])
return InlineKeyboardMarkup(keyboard)
async def actions_keyboard(location_id: str, instance_id: str) -> InlineKeyboardMarkup:
from bot import database
keyboard = []
location = game_world.get_location(location_id)
if location:
interactable = location.get_interactable(instance_id)
if interactable:
for action_id, action in interactable.actions.items():
cooldown_key = f"{instance_id}:{action_id}"
cooldown = await database.get_cooldown(cooldown_key)
label = action.label
# Add stamina cost to the label
if action.stamina_cost > 0:
label += f" (⚡{action.stamina_cost})"
if cooldown > 0:
label += ""
keyboard.append([InlineKeyboardButton(label, callback_data=f"action:{location_id}:{instance_id}:{action_id}")])
keyboard.append([InlineKeyboardButton("⬅️ Back", callback_data=f"inspect_area_menu:{location_id}")])
return InlineKeyboardMarkup(keyboard)
# ... (inventory_keyboard, inventory_item_actions_keyboard are unchanged) ...
def inventory_keyboard(inventory_items: list) -> InlineKeyboardMarkup:
keyboard = []
if inventory_items:
# Categorize and sort items
# Group items by item_id and equipped status to handle stacking properly
item_groups = {}
for item in inventory_items:
item_def = ITEMS.get(item['item_id'], {})
item_type = item_def.get('type', 'resource')
item_name = item_def.get('name', 'Unknown')
is_equipped = item.get('is_equipped', False)
# Create a unique key for grouping: item_id + equipped status
group_key = (item['item_id'], is_equipped)
if group_key not in item_groups:
item_groups[group_key] = {
'name': item_name,
'def': item_def,
'type': item_type,
'is_equipped': is_equipped,
'items': []
}
item_groups[group_key]['items'].append(item)
# Categorize groups
equipped = []
consumables = []
weapons = []
equipment = []
resources = []
quest_items = []
for group_key, group_data in item_groups.items():
item_name = group_data['name']
item_def = group_data['def']
item_type = group_data['type']
is_equipped = group_data['is_equipped']
items_list = group_data['items']
# Calculate total quantity and weight/volume for this group
total_quantity = sum(itm['quantity'] for itm in items_list)
weight_per_item = item_def.get('weight', 0)
volume_per_item = item_def.get('volume', 0)
total_weight = weight_per_item * total_quantity
total_volume = volume_per_item * total_quantity
# Use the first item's ID for the callback (they're all the same item type)
first_item_id = items_list[0]['id']
# Create item data tuple: (name, item_def, first_item_id, quantity, weight, volume, is_equipped)
item_tuple = (item_name, item_def, first_item_id, total_quantity, total_weight, total_volume, is_equipped)
# Only equipped items go to equipped section
if is_equipped:
equipped.append(item_tuple)
elif item_type == 'consumable':
consumables.append(item_tuple)
elif item_type == 'weapon':
weapons.append(item_tuple)
elif item_type == 'equipment':
equipment.append(item_tuple)
elif item_type == 'quest':
quest_items.append(item_tuple)
else:
resources.append(item_tuple)
# Sort each category alphabetically by name
equipped.sort(key=lambda x: x[0])
consumables.sort(key=lambda x: x[0])
weapons.sort(key=lambda x: x[0])
equipment.sort(key=lambda x: x[0])
resources.sort(key=lambda x: x[0])
quest_items.sort(key=lambda x: x[0])
# Build keyboard sections
def add_section(section_name, items_list):
if items_list:
keyboard.append([InlineKeyboardButton(f"--- {section_name} ---", callback_data="no_op")])
row = []
for item_name, item_def, item_id, quantity, weight, volume, is_equipped in items_list:
emoji = item_def.get('emoji', '')
quantity_text = f" x{quantity}" if quantity > 1 else ""
equipped_marker = "" if is_equipped else ""
# Round to 2 decimals
weight_vol_text = f" ({weight:.2f}kg, {volume:.2f}vol)" if quantity > 0 else ""
button = InlineKeyboardButton(
f"{emoji} {item_name}{quantity_text}{equipped_marker}{weight_vol_text}",
callback_data=f"inventory_item:{item_id}"
)
row.append(button)
if len(row) == 2:
keyboard.append(row)
row = []
# Add remaining item if odd number
if row:
keyboard.append(row)
# Add sections in order
add_section("Equipped", equipped)
add_section("Consumables", consumables)
add_section("Weapons", weapons)
add_section("Equipment", equipment)
add_section("Resources", resources)
add_section("Quest Items", quest_items)
if not keyboard:
keyboard.append([InlineKeyboardButton("--- Inventory is empty ---", callback_data="no_op")])
else:
keyboard.append([InlineKeyboardButton("--- Inventory is empty ---", callback_data="no_op")])
keyboard.append([InlineKeyboardButton("⬅️ Back", callback_data="main_menu")])
return InlineKeyboardMarkup(keyboard)
def inventory_item_actions_keyboard(item_db_id: int, item_def: dict, is_equipped: bool = False, quantity: int = 1) -> InlineKeyboardMarkup:
keyboard = []
# Use button for consumables
if item_def.get('type') == 'consumable':
keyboard.append([InlineKeyboardButton("➡️ Use Item", callback_data=f"inventory_use:{item_db_id}")])
# Equip/Unequip button for weapons and equipment
if item_def.get('type') in ["weapon", "equipment"]:
if is_equipped:
keyboard.append([InlineKeyboardButton("❌ Unequip", callback_data=f"inventory_unequip:{item_db_id}")])
else:
keyboard.append([InlineKeyboardButton("✅ Equip", callback_data=f"inventory_equip:{item_db_id}")])
# Drop buttons - simplified for single items
if quantity == 1:
# Just show a single "Drop" button
keyboard.append([InlineKeyboardButton("🗑️ Drop", callback_data=f"inventory_drop:{item_db_id}:all")])
else:
# Show x1, x5, x10 options based on quantity
drop_row = [InlineKeyboardButton("🗑️ Drop x1", callback_data=f"inventory_drop:{item_db_id}:1")]
if quantity >= 5:
drop_row.append(InlineKeyboardButton("🗑️ Drop x5", callback_data=f"inventory_drop:{item_db_id}:5"))
if quantity >= 10:
drop_row.append(InlineKeyboardButton("🗑️ Drop x10", callback_data=f"inventory_drop:{item_db_id}:10"))
# Split into rows if more than 2 buttons
if len(drop_row) > 2:
keyboard.append(drop_row[:2])
keyboard.append(drop_row[2:])
else:
keyboard.append(drop_row)
# Add "Drop All" option
keyboard.append([InlineKeyboardButton(f"🗑️ Drop All ({quantity})", callback_data=f"inventory_drop:{item_db_id}:all")])
keyboard.append([InlineKeyboardButton("⬅️ Back to Inventory", callback_data="inventory_menu")])
return InlineKeyboardMarkup(keyboard)
async def combat_keyboard(player_id: int) -> InlineKeyboardMarkup:
"""Create combat action keyboard."""
from bot import database
keyboard = []
# Attack option
keyboard.append([InlineKeyboardButton("⚔️ Attack", callback_data="combat_attack")])
# Flee option
keyboard.append([InlineKeyboardButton("🏃 Try to Flee", callback_data="combat_flee")])
# Use item option (show consumables)
inventory_items = await database.get_inventory(player_id)
consumables = [item for item in inventory_items if ITEMS.get(item['item_id'], {}).get('type') == 'consumable']
if consumables:
keyboard.append([InlineKeyboardButton("💊 Use Item", callback_data="combat_use_item_menu")])
return InlineKeyboardMarkup(keyboard)
async def combat_items_keyboard(player_id: int) -> InlineKeyboardMarkup:
"""Show consumable items during combat."""
from bot import database
keyboard = []
inventory_items = await database.get_inventory(player_id)
consumables = [item for item in inventory_items if ITEMS.get(item['item_id'], {}).get('type') == 'consumable']
if consumables:
keyboard.append([InlineKeyboardButton("--- Select item to use ---", callback_data="no_op")])
for item in consumables:
item_def = ITEMS.get(item['item_id'], {})
emoji = item_def.get('emoji', '')
keyboard.append([InlineKeyboardButton(
f"{emoji} {item_def.get('name', 'Unknown')} x{item['quantity']}",
callback_data=f"combat_use_item:{item['id']}"
)])
keyboard.append([InlineKeyboardButton("⬅️ Back to Combat", callback_data="combat_back")])
return InlineKeyboardMarkup(keyboard)
def corpse_keyboard(corpse_id: int, corpse_type: str) -> InlineKeyboardMarkup:
"""Create keyboard for interacting with corpses."""
keyboard = []
if corpse_type == "player":
keyboard.append([InlineKeyboardButton("🎒 Loot Bag", callback_data=f"loot_player_corpse:{corpse_id}")])
else: # NPC corpse
keyboard.append([InlineKeyboardButton("🔪 Scavenge Corpse", callback_data=f"scavenge_npc_corpse:{corpse_id}")])
keyboard.append([InlineKeyboardButton("⬅️ Back", callback_data="inspect_area")])
return InlineKeyboardMarkup(keyboard)
def player_corpse_loot_keyboard(corpse_id: int, items: list) -> InlineKeyboardMarkup:
"""Show items in a player corpse bag."""
keyboard = []
if items:
keyboard.append([InlineKeyboardButton("--- Take items ---", callback_data="no_op")])
for i, item_data in enumerate(items):
item_def = ITEMS.get(item_data['item_id'], {})
emoji = item_def.get('emoji', '')
keyboard.append([InlineKeyboardButton(
f"{emoji} Take {item_def.get('name', 'Unknown')} x{item_data['quantity']}",
callback_data=f"take_corpse_item:{corpse_id}:{i}"
)])
else:
keyboard.append([InlineKeyboardButton("--- Bag is empty ---", callback_data="no_op")])
keyboard.append([InlineKeyboardButton("⬅️ Back", callback_data="inspect_area")])
return InlineKeyboardMarkup(keyboard)
def npc_corpse_scavenge_keyboard(corpse_id: int, loot_items: list) -> InlineKeyboardMarkup:
"""Show scavenging options for NPC corpse."""
keyboard = []
if loot_items:
keyboard.append([InlineKeyboardButton("--- Scavenge for materials ---", callback_data="no_op")])
for i, loot_data in enumerate(loot_items):
item_def = ITEMS.get(loot_data['item_id'], {})
emoji = item_def.get('emoji', '')
label = f"{emoji} {item_def.get('name', 'Unknown')}"
if loot_data.get('required_tool'):
tool_def = ITEMS.get(loot_data['required_tool'], {})
label += f" (needs {tool_def.get('name', 'tool')})"
keyboard.append([InlineKeyboardButton(
label,
callback_data=f"scavenge_corpse_item:{corpse_id}:{i}"
)])
else:
keyboard.append([InlineKeyboardButton("--- Nothing left to scavenge ---", callback_data="no_op")])
keyboard.append([InlineKeyboardButton("⬅️ Back", callback_data="inspect_area")])
return InlineKeyboardMarkup(keyboard)
def spend_points_keyboard() -> InlineKeyboardMarkup:
"""Create keyboard for spending stat points."""
keyboard = [
[
InlineKeyboardButton("❤️ Max HP (+10)", callback_data="spend_point:max_hp"),
InlineKeyboardButton("⚡ Stamina (+5)", callback_data="spend_point:max_stamina")
],
[
InlineKeyboardButton("💪 Strength (+1)", callback_data="spend_point:strength"),
InlineKeyboardButton("🏃 Agility (+1)", callback_data="spend_point:agility")
],
[
InlineKeyboardButton("💚 Endurance (+1)", callback_data="spend_point:endurance"),
InlineKeyboardButton("🧠 Intellect (+1)", callback_data="spend_point:intellect")
],
[InlineKeyboardButton("⬅️ Back to Profile", callback_data="profile")]
]
return InlineKeyboardMarkup(keyboard)

119
bot/logic.py Normal file
View File

@@ -0,0 +1,119 @@
import random
from typing import Tuple, Dict, Any
from data.items import ITEMS
from data.models import Action, Outcome
def calculate_inventory_load(player_inventory: list) -> Tuple[float, float]:
"""Calculates the total weight and volume of a player's inventory."""
total_weight = 0.0
total_volume = 0.0
for item in player_inventory:
item_def = ITEMS.get(item["item_id"])
if item_def:
total_weight += item_def["weight"] * item["quantity"]
total_volume += item_def["volume"] * item["quantity"]
return round(total_weight, 2), round(total_volume, 2)
def get_player_capacity(player_inventory: list, player_stats: dict) -> Tuple[float, float]:
"""Calculates the total carrying capacity of a player."""
base_weight_cap = player_stats['strength'] * 5 # Example formula
base_volume_cap = player_stats['strength'] * 2 # Example formula
for item in player_inventory:
if item["is_equipped"]:
item_def = ITEMS.get(item["item_id"])
if item_def and item_def.get("type") == "equipment":
effects = item_def.get("effects", {})
base_weight_cap += effects.get("capacity_weight", 0)
base_volume_cap += effects.get("capacity_volume", 0)
return base_weight_cap, base_volume_cap
def resolve_action(player_stats: dict, action_obj: Action) -> Outcome:
"""
Resolves a player action, like searching, based on stats and luck.
Returns the resulting Outcome object.
"""
# A simple success chance calculation
base_chance = 50 + (player_stats.get('intellect', 5) * 2)
roll = random.randint(1, 100)
outcome_key = "failure"
if roll <= 5 and "critical_failure" in action_obj.outcomes:
outcome_key = "critical_failure"
elif roll <= base_chance and "success" in action_obj.outcomes:
outcome_key = "success"
return action_obj.outcomes.get(outcome_key, action_obj.outcomes["failure"])
async def can_add_item_to_inventory(user_id: int, item_id: str, quantity: int) -> Tuple[bool, str]:
"""
Check if an item can be added to the player's inventory.
Returns (can_add, reason_if_not)
"""
from . import database
player = await database.get_player(user_id)
if not player:
return False, "Player not found."
inventory = await database.get_inventory(user_id)
item_def = ITEMS.get(item_id)
if not item_def:
return False, "Invalid item."
# Calculate current and projected weight/volume
current_weight, current_volume = calculate_inventory_load(inventory)
max_weight, max_volume = get_player_capacity(inventory, player)
item_weight = item_def["weight"] * quantity
item_volume = item_def["volume"] * quantity
new_weight = current_weight + item_weight
new_volume = current_volume + item_volume
if new_weight > max_weight:
return False, f"Too heavy! ({new_weight:.1f}/{max_weight:.1f} kg)"
if new_volume > max_volume:
return False, f"Not enough space! ({new_volume:.1f}/{max_volume:.1f} vol)"
return True, ""
def calculate_travel_stamina_cost(player: dict, inventory: list, from_location, to_location) -> int:
"""
Calculate stamina cost for traveling between locations.
Based on distance, endurance (reduces cost), and carried weight (increases cost).
Args:
player: Player stats dictionary
inventory: Player's inventory list
from_location: Location object being traveled from
to_location: Location object being traveled to
"""
from data.travel_helpers import calculate_base_stamina_cost
# Get base cost from shared helper (used by map and game)
distance_cost = calculate_base_stamina_cost(from_location, to_location)
# Endurance reduces cost (each point reduces by 0.5)
endurance_reduction = player['endurance'] * 0.5
# Calculate weight burden
current_weight, _ = calculate_inventory_load(inventory)
max_weight, _ = get_player_capacity(inventory, player)
# Weight penalty: if carrying more than 50% capacity, add extra cost
weight_ratio = current_weight / max_weight if max_weight > 0 else 0
weight_penalty = 0
if weight_ratio > 0.5:
# Each 10% over 50% adds 1 stamina
weight_penalty = int((weight_ratio - 0.5) * 10)
# Calculate final cost (minimum 3)
final_cost = max(3, int(distance_cost - endurance_reduction + weight_penalty))
return final_cost

119
bot/spawn_manager.py Normal file
View File

@@ -0,0 +1,119 @@
"""
Global Wandering Enemy Spawn Manager
Runs periodically to spawn/despawn enemies based on location danger levels.
"""
import asyncio
import logging
import random
from typing import Dict, List
from bot import database
from data.npcs import (
LOCATION_SPAWNS,
LOCATION_DANGER,
get_random_npc_for_location,
get_wandering_enemy_chance
)
logger = logging.getLogger(__name__)
# Configuration
SPAWN_CHECK_INTERVAL = 120 # Check every 2 minutes
ENEMY_LIFETIME = 600 # Enemies live for 10 minutes
MAX_ENEMIES_PER_LOCATION = {
0: 0, # Safe zones - no wandering enemies
1: 1, # Low danger - max 1 enemy
2: 2, # Medium danger - max 2 enemies
3: 3, # High danger - max 3 enemies
4: 4, # Extreme danger - max 4 enemies
}
def get_danger_level(location_id: str) -> int:
"""Get danger level for a location."""
danger_data = LOCATION_DANGER.get(location_id, (0, 0.0, 0.0))
return danger_data[0]
async def spawn_manager_loop():
"""
Main spawn manager loop.
Runs continuously, checking spawn conditions every SPAWN_CHECK_INTERVAL seconds.
"""
logger.info("🎲 Spawn Manager started")
while True:
try:
await asyncio.sleep(SPAWN_CHECK_INTERVAL)
# Clean up expired enemies first
despawned_count = await database.cleanup_expired_wandering_enemies()
if despawned_count > 0:
logger.info(f"🧹 Cleaned up {despawned_count} expired wandering enemies")
# Process each location
spawned_count = 0
for location_id, spawn_table in LOCATION_SPAWNS.items():
if not spawn_table:
continue # Skip locations with no spawns
# Get danger level and max enemies for this location
danger_level = get_danger_level(location_id)
max_enemies = MAX_ENEMIES_PER_LOCATION.get(danger_level, 0)
if max_enemies == 0:
continue # Skip safe zones
# Check current enemy count
current_count = await database.get_wandering_enemy_count_in_location(location_id)
if current_count >= max_enemies:
continue # Location is at capacity
# Calculate spawn chance based on wandering_enemy_chance
spawn_chance = get_wandering_enemy_chance(location_id)
# Attempt to spawn enemies up to max capacity
for _ in range(max_enemies - current_count):
if random.random() < spawn_chance:
# Spawn an enemy
npc_id = get_random_npc_for_location(location_id)
if npc_id:
await database.spawn_wandering_enemy(
npc_id=npc_id,
location_id=location_id,
lifetime_seconds=ENEMY_LIFETIME
)
spawned_count += 1
logger.info(f"👹 Spawned {npc_id} at {location_id} (current: {current_count + 1}/{max_enemies})")
if spawned_count > 0:
logger.info(f"✨ Spawn cycle complete: {spawned_count} enemies spawned")
except Exception as e:
logger.error(f"❌ Error in spawn manager loop: {e}", exc_info=True)
# Continue running even if there's an error
await asyncio.sleep(10)
async def start_spawn_manager():
"""Start the spawn manager as a background task."""
asyncio.create_task(spawn_manager_loop())
logger.info("🎮 Spawn Manager initialized")
async def get_spawn_stats() -> Dict:
"""Get statistics about current spawns (for debugging/monitoring)."""
all_enemies = await database.get_all_active_wandering_enemies()
# Count by location
location_counts = {}
for enemy in all_enemies:
loc = enemy['location_id']
location_counts[loc] = location_counts.get(loc, 0) + 1
return {
"total_active": len(all_enemies),
"by_location": location_counts,
"enemies": all_enemies
}

60
bot/utils.py Normal file
View File

@@ -0,0 +1,60 @@
"""
Utility functions and decorators for the bot.
"""
import os
import functools
import logging
from telegram import Update
from telegram.ext import ContextTypes
logger = logging.getLogger(__name__)
def get_admin_ids():
"""Get the list of admin user IDs from environment variable."""
admin_ids_str = os.getenv("ADMIN_IDS", "")
if not admin_ids_str:
logger.warning("ADMIN_IDS not set in .env file. No admins configured.")
return set()
try:
# Parse comma-separated list of IDs
admin_ids = set(int(id.strip()) for id in admin_ids_str.split(",") if id.strip())
return admin_ids
except ValueError as e:
logger.error(f"Error parsing ADMIN_IDS: {e}")
return set()
def admin_only(func):
"""
Decorator that restricts command to admin users only.
Usage:
@admin_only
async def my_admin_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
...
"""
@functools.wraps(func)
async def wrapper(update: Update, context: ContextTypes.DEFAULT_TYPE, *args, **kwargs):
user_id = update.effective_user.id
admin_ids = get_admin_ids()
if user_id not in admin_ids:
await update.message.reply_html(
"🚫 <b>Access Denied</b>\n\n"
"This command is restricted to administrators only."
)
logger.warning(f"User {user_id} attempted to use admin command: {func.__name__}")
return
# User is admin, execute the command
return await func(update, context, *args, **kwargs)
return wrapper
def is_admin(user_id: int) -> bool:
"""Check if a user ID is an admin."""
admin_ids = get_admin_ids()
return user_id in admin_ids