Files
echoes-of-the-ash/web-map/server.py
2025-11-27 16:27:01 +01:00

1848 lines
67 KiB
Python

"""
Enhanced Map Server with Editor and Authentication
"""
from flask import Flask, jsonify, request, send_from_directory, session, redirect, url_for
from flask_cors import CORS
from pathlib import Path
import json
import os
import sys
import secrets
from werkzeug.utils import secure_filename
from functools import wraps
# Get the directory of this script
SCRIPT_DIR = Path(__file__).parent.resolve()
PARENT_DIR = SCRIPT_DIR.parent
# Add parent directory to path for imports
sys.path.insert(0, str(PARENT_DIR))
app = Flask(__name__, static_folder=str(SCRIPT_DIR))
app.secret_key = os.environ.get('EDITOR_SECRET_KEY', secrets.token_hex(32))
CORS(app)
# Configuration
ADMIN_PASSWORD = os.environ.get('EDITOR_PASSWORD', 'admin123') # Change this!
UPLOAD_FOLDER = PARENT_DIR / 'images' / 'locations'
UPLOAD_FOLDER.mkdir(parents=True, exist_ok=True)
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'webp'}
# JSON data files
GAMEDATA_DIR = PARENT_DIR / 'gamedata'
LOCATIONS_FILE = GAMEDATA_DIR / 'locations.json'
NPCS_FILE = GAMEDATA_DIR / 'npcs.json'
ITEMS_FILE = GAMEDATA_DIR / 'items.json'
INTERACTABLES_FILE = GAMEDATA_DIR / 'interactables.json'
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
def load_json_file(filepath):
"""Helper to load JSON file"""
try:
with open(filepath, 'r') as f:
return json.load(f)
except FileNotFoundError:
return None
except Exception as e:
print(f"Error loading {filepath}: {e}")
return None
def save_json_file(filepath, data):
"""Helper to save JSON file"""
try:
with open(filepath, 'w') as f:
json.dump(data, f, indent=2)
return True
except Exception as e:
print(f"Error saving {filepath}: {e}")
return False
def require_auth(f):
"""Decorator to require authentication for editor routes"""
@wraps(f)
def decorated_function(*args, **kwargs):
if not session.get('authenticated'):
return jsonify({'error': 'Unauthorized'}), 401
return f(*args, **kwargs)
return decorated_function
# ==================== AUTHENTICATION ROUTES ====================
@app.route('/api/login', methods=['POST'])
def login():
"""Authenticate user with password"""
data = request.get_json()
password = data.get('password', '')
if password == ADMIN_PASSWORD:
session['authenticated'] = True
session.permanent = True
return jsonify({'success': True, 'message': 'Authenticated successfully'})
else:
return jsonify({'success': False, 'message': 'Invalid password'}), 401
@app.route('/api/logout', methods=['POST'])
def logout():
"""Logout user"""
session.pop('authenticated', None)
return jsonify({'success': True, 'message': 'Logged out'})
@app.route('/api/editor/restart-bot', methods=['POST'])
@require_auth
def restart_bot():
"""Restart the Telegram bot container"""
try:
import subprocess
# Try to restart the bot container
result = subprocess.run(
['docker', 'restart', 'echoes_of_the_ashes_api'],
capture_output=True,
text=True,
timeout=30
)
if result.returncode == 0:
return jsonify({
'success': True,
'message': 'Bot restarted successfully'
})
else:
return jsonify({
'success': False,
'error': f'Docker restart failed: {result.stderr}'
}), 500
except subprocess.TimeoutExpired:
return jsonify({
'success': False,
'error': 'Restart command timed out'
}), 500
except FileNotFoundError:
return jsonify({
'success': False,
'error': 'Docker command not found. Is Docker installed?'
}), 500
except Exception as e:
return jsonify({
'success': False,
'error': f'Unexpected error: {str(e)}'
}), 500
@app.route('/api/editor/bot-logs', methods=['GET'])
@require_auth
def get_bot_logs():
"""Get logs from the Telegram bot container"""
try:
import subprocess
# Get number of lines from query parameter (default 100)
lines = request.args.get('lines', '100')
try:
lines = int(lines)
if lines < 1 or lines > 1000:
lines = 100
except ValueError:
lines = 100
# Get logs from the bot container
result = subprocess.run(
['docker', 'logs', 'echoes_of_the_ashes_api', '--tail', str(lines)],
capture_output=True,
text=True,
timeout=10
)
# Combine stdout and stderr (Docker logs can use both)
logs = result.stdout + result.stderr
return jsonify({
'success': True,
'logs': logs,
'lines': lines
})
except subprocess.TimeoutExpired:
return jsonify({
'success': False,
'error': 'Logs command timed out'
}), 500
except FileNotFoundError:
return jsonify({
'success': False,
'error': 'Docker command not found. Is Docker installed?'
}), 500
except Exception as e:
return jsonify({
'success': False,
'error': f'Unexpected error: {str(e)}'
}), 500
@app.route('/api/check-auth', methods=['GET'])
def check_auth():
"""Check if user is authenticated"""
return jsonify({'authenticated': session.get('authenticated', False)})
# ==================== PUBLIC ROUTES ====================
@app.route('/')
def index():
"""Serve the main map viewer"""
return send_from_directory(SCRIPT_DIR, 'index.html')
@app.route('/editor')
def editor():
"""Serve the map editor (requires auth)"""
return send_from_directory(SCRIPT_DIR, 'editor.html')
@app.route('/<path:path>')
def serve_static(path):
"""Serve static files"""
return send_from_directory(SCRIPT_DIR, path)
@app.route('/images/<path:path>')
def serve_images(path):
"""Serve images from parent directory"""
return send_from_directory(PARENT_DIR / 'images', path)
@app.route('/api/map-data', methods=['GET'])
@app.route('/map_data.json', methods=['GET'])
def get_map_data():
"""Get current map data (exported from world_loader which now loads from JSON)"""
try:
from data.world_loader import export_map_data
map_data = export_map_data()
return jsonify(map_data)
except Exception as e:
return jsonify({'error': str(e)}), 500
# ==================== EDITOR API ROUTES (PROTECTED) ====================
@app.route('/api/editor/locations', methods=['GET'])
@require_auth
def get_locations():
"""Get all locations with full details"""
try:
# Load directly from JSON files to get latest data
locations_data = load_json_file(LOCATIONS_FILE)
npcs_data = load_json_file(NPCS_FILE)
if not locations_data:
return jsonify({'locations': []})
locations = []
for loc_data in locations_data.get('locations', []):
location_id = loc_data.get('id')
# Get danger config
danger_config = locations_data.get('danger_config', {}).get(location_id, {})
# Get spawn config
spawn_config = locations_data.get('spawn_config', {}).get(location_id, [])
spawn_npcs = []
for spawn_entry in spawn_config:
npc_id = spawn_entry.get('npc_id')
weight = spawn_entry.get('weight', 50)
# Get NPC details
npc_def = npcs_data.get('npcs', {}).get(npc_id, {})
if npc_def:
spawn_npcs.append({
'npc_id': npc_id,
'name': npc_def.get('name', npc_id),
'emoji': npc_def.get('emoji', '👹'),
'weight': weight
})
# Get connections from locations.json
exits = []
for conn in locations_data.get('connections', []):
if conn.get('from') == location_id:
exits.append(conn.get('to'))
locations.append({
'id': location_id,
'name': loc_data.get('name', ''),
'description': loc_data.get('description', ''),
'image_path': loc_data.get('image_path', ''),
'x': loc_data.get('x', 0.0),
'y': loc_data.get('y', 0.0),
'danger_level': danger_config.get('danger_level', 0),
'encounter_rate': danger_config.get('encounter_rate', 0.0),
'wandering_chance': danger_config.get('wandering_chance', 0.0),
'spawn_npcs': spawn_npcs,
'exits': exits,
'interactables_count': len(loc_data.get('interactables', {}))
})
return jsonify({'locations': locations})
except Exception as e:
import traceback
print(f"Error loading locations: {e}\n{traceback.format_exc()}")
return jsonify({'error': str(e)}), 500
@app.route('/api/editor/location/<location_id>', methods=['GET'])
@require_auth
def get_location_detail(location_id):
"""Get detailed information about a specific location"""
try:
# Load directly from JSON to get latest data
locations_data = load_json_file(LOCATIONS_FILE)
npcs_data = load_json_file(NPCS_FILE)
if not locations_data:
return jsonify({'error': 'Locations file not found'}), 404
# Find the location in the list
location = None
for loc in locations_data.get('locations', []):
if loc.get('id') == location_id:
location = loc
break
if not location:
return jsonify({'error': 'Location not found'}), 404
# Get danger config
danger_config = locations_data.get('danger_config', {}).get(location_id, {})
# Get spawn config
spawn_config = locations_data.get('spawn_config', {}).get(location_id, [])
spawn_npcs = []
for spawn_entry in spawn_config:
npc_id = spawn_entry.get('npc_id')
weight = spawn_entry.get('weight', 50)
# Get NPC details
npc_def = npcs_data.get('npcs', {}).get(npc_id, {})
if npc_def:
spawn_npcs.append({
'npc_id': npc_id,
'name': npc_def.get('name', npc_id),
'emoji': npc_def.get('emoji', '👹'),
'weight': weight
})
# Get exits from connections
exits = {}
for conn in locations_data.get('connections', []):
if conn.get('from') == location_id:
to_id = conn.get('to')
exits[to_id] = {
'stamina_cost': conn.get('stamina_cost', 1),
'distance': conn.get('distance', 1.0)
}
return jsonify({
'id': location_id,
'name': location.get('name', ''),
'description': location.get('description', ''),
'image_path': location.get('image_path', ''),
'x': location.get('x', 0.0),
'y': location.get('y', 0.0),
'danger_level': danger_config.get('danger_level', 0),
'encounter_rate': danger_config.get('encounter_rate', 0.0),
'wandering_chance': danger_config.get('wandering_chance', 0.0),
'spawn_npcs': spawn_npcs,
'exits': exits,
'interactables': location.get('interactables', {})
})
except Exception as e:
import traceback
print(f"Error loading location detail: {e}\n{traceback.format_exc()}")
return jsonify({'error': str(e)}), 500
@app.route('/api/editor/location', methods=['POST'])
@require_auth
def update_location():
"""Update or create a location"""
try:
data = request.get_json()
location_id = data.get('id')
print(f"[DEBUG] Received location data: {json.dumps(data, indent=2)}")
# Load or create config
config = {}
if LOCATIONS_FILE.exists():
with open(LOCATIONS_FILE, 'r') as f:
config = json.load(f)
# Ensure proper structure - locations is a list, others are dicts
if 'locations' not in config or not isinstance(config['locations'], list):
config['locations'] = []
if 'danger_config' not in config or not isinstance(config['danger_config'], dict):
config['danger_config'] = {}
if 'spawn_config' not in config or not isinstance(config['spawn_config'], dict):
config['spawn_config'] = {}
# Find or create location in the list
location_index = None
for i, loc in enumerate(config['locations']):
if loc.get('id') == location_id:
location_index = i
break
location_data = {
'id': location_id,
'name': data.get('name'),
'description': data.get('description'),
'image_path': data.get('image_path'),
'x': data.get('x'),
'y': data.get('y'),
'interactables': data.get('interactables', {})
}
if location_index is not None:
# Update existing location
config['locations'][location_index] = location_data
else:
# Add new location
config['locations'].append(location_data)
# Update danger config
config['danger_config'][location_id] = {
'danger_level': data.get('danger_level', 0),
'encounter_rate': data.get('encounter_rate', 0.0),
'wandering_chance': data.get('wandering_chance', 0.0)
}
# Update spawn config
spawn_npcs = data.get('spawn_npcs', [])
print(f"[DEBUG] spawn_npcs type: {type(spawn_npcs)}, value: {spawn_npcs}")
if spawn_npcs and isinstance(spawn_npcs, list):
config['spawn_config'][location_id] = [
{'npc_id': npc.get('npc_id', npc) if isinstance(npc, dict) else npc,
'weight': npc.get('weight', 50) if isinstance(npc, dict) else 50}
for npc in spawn_npcs
]
else:
config['spawn_config'][location_id] = []
# Save config
with open(LOCATIONS_FILE, 'w') as f:
json.dump(config, f, indent=2)
return jsonify({'success': True, 'message': 'Location updated successfully'})
except Exception as e:
import traceback
error_msg = f"[ERROR] Failed to save location: {e}\n{traceback.format_exc()}"
print(error_msg, flush=True)
# Also write to a file for debugging
try:
with open('/tmp/location_save_error.txt', 'w') as f:
f.write(error_msg)
f.write(f"\n\nReceived data: {json.dumps(data, indent=2)}")
except:
pass
return jsonify({'error': str(e)}), 500
@app.route('/api/editor/location/<location_id>', methods=['DELETE'])
@require_auth
def delete_location(location_id):
"""Delete a location from config"""
try:
if not LOCATIONS_FILE.exists():
return jsonify({'error': 'No config file found'}), 404
with open(LOCATIONS_FILE, 'r') as f:
config = json.load(f)
# Remove location from locations array
if 'locations' in config:
config['locations'] = [loc for loc in config['locations'] if loc.get('id') != location_id]
# Remove from connections array (both from and to)
if 'connections' in config:
config['connections'] = [
conn for conn in config['connections']
if conn.get('from') != location_id and conn.get('to') != location_id
]
# Remove from danger_config
if 'danger_config' in config and location_id in config['danger_config']:
del config['danger_config'][location_id]
# Remove from spawn_config
if 'spawn_config' in config and location_id in config['spawn_config']:
del config['spawn_config'][location_id]
with open(LOCATIONS_FILE, 'w') as f:
json.dump(config, f, indent=2)
return jsonify({'success': True, 'message': 'Location deleted successfully'})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/editor/available-npcs', methods=['GET'])
@require_auth
def get_available_npcs():
"""Get list of all available NPCs for spawning"""
try:
# Load directly from JSON to get latest data
npcs_data = load_json_file(NPCS_FILE)
if not npcs_data:
return jsonify({'npcs': []})
npcs = []
for npc_id, npc_def in npcs_data.get('npcs', {}).items():
npcs.append({
'id': npc_id,
'name': npc_def.get('name', npc_id),
'emoji': npc_def.get('emoji', '👹'),
'hp_range': [npc_def.get('hp_min', 10), npc_def.get('hp_max', 20)],
'damage_range': [npc_def.get('damage_min', 1), npc_def.get('damage_max', 5)],
'xp_reward': npc_def.get('xp_reward', 10)
})
return jsonify({'npcs': npcs})
except Exception as e:
import traceback
print(f"Error loading available NPCs: {e}\n{traceback.format_exc()}")
return jsonify({'error': str(e)}), 500
@app.route('/api/editor/upload-image', methods=['POST'])
@require_auth
def upload_image():
"""Upload a location image"""
try:
if 'image' not in request.files:
return jsonify({'error': 'No image file provided'}), 400
file = request.files['image']
if file.filename == '':
return jsonify({'error': 'No selected file'}), 400
if file and allowed_file(file.filename):
filename = secure_filename(file.filename)
filepath = UPLOAD_FOLDER / filename
file.save(str(filepath))
# Return relative path
relative_path = f'images/locations/{filename}'
return jsonify({
'success': True,
'image_path': relative_path,
'message': f'Image uploaded successfully: {filename}'
})
else:
return jsonify({'error': 'Invalid file type'}), 400
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/editor/export-config', methods=['GET'])
@require_auth
def export_config():
"""Export current config as downloadable JSON"""
try:
if LOCATIONS_FILE.exists():
with open(LOCATIONS_FILE, 'r') as f:
config = json.load(f)
return jsonify(config)
else:
return jsonify({'locations': {}, 'danger_config': {}, 'spawn_config': {}})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/editor/import-config', methods=['POST'])
@require_auth
def import_config():
"""Import config from JSON"""
try:
config = request.get_json()
with open(LOCATIONS_FILE, 'w') as f:
json.dump(config, f, indent=2)
return jsonify({'success': True, 'message': 'Configuration imported successfully'})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/editor/connections', methods=['GET'])
@require_auth
def get_connections():
"""Get all connections between locations"""
try:
import math
# Load directly from JSON file to get latest data
locations_data = load_json_file(LOCATIONS_FILE)
if not locations_data:
return jsonify({'connections': []})
# Build location lookup for calculating distances
locations_by_id = {}
for loc in locations_data.get('locations', []):
locations_by_id[loc['id']] = loc
connections = []
for conn in locations_data.get('connections', []):
from_id = conn.get('from')
to_id = conn.get('to')
direction = conn.get('direction')
from_loc = locations_by_id.get(from_id)
to_loc = locations_by_id.get(to_id)
if from_loc and to_loc:
distance = math.sqrt((to_loc.get('x', 0) - from_loc.get('x', 0))**2 +
(to_loc.get('y', 0) - from_loc.get('y', 0))**2)
connections.append({
'from': from_id,
'to': to_id,
'direction': direction,
'distance': round(distance, 2)
})
return jsonify({'connections': connections})
except Exception as e:
import traceback
print(f"Error loading connections: {e}\n{traceback.format_exc()}")
return jsonify({'error': str(e)}), 500
@app.route('/api/editor/connection', methods=['POST'])
@require_auth
def add_connection():
"""Add a connection between two locations"""
try:
data = request.get_json()
from_id = data.get('from')
to_id = data.get('to')
direction = data.get('direction')
# Load or create config
config = {}
if LOCATIONS_FILE.exists():
with open(LOCATIONS_FILE, 'r') as f:
config = json.load(f)
if 'connections' not in config:
config['connections'] = []
# Add connection
config['connections'].append({
'from': from_id,
'to': to_id,
'direction': direction
})
with open(LOCATIONS_FILE, 'w') as f:
json.dump(config, f, indent=2)
return jsonify({'success': True, 'message': 'Connection added successfully'})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/editor/connection', methods=['DELETE'])
@require_auth
def delete_connection():
"""Delete a connection between two locations"""
try:
data = request.get_json()
from_id = data.get('from')
to_id = data.get('to')
if not LOCATIONS_FILE.exists():
return jsonify({'error': 'No config file found'}), 404
with open(LOCATIONS_FILE, 'r') as f:
config = json.load(f)
if 'connections' in config:
config['connections'] = [
conn for conn in config['connections']
if not (conn['from'] == from_id and conn['to'] == to_id)
]
with open(LOCATIONS_FILE, 'w') as f:
json.dump(config, f, indent=2)
return jsonify({'success': True, 'message': 'Connection deleted successfully'})
except Exception as e:
return jsonify({'error': str(e)}), 500
# ==================== NPC MANAGEMENT ====================
@app.route('/api/editor/npcs', methods=['GET'])
@require_auth
def get_npcs():
"""Get all NPCs"""
try:
npcs_data = load_json_file(NPCS_FILE)
if not npcs_data:
return jsonify({'npcs': []})
npcs = []
for npc_id, npc_def in npcs_data.get('npcs', {}).items():
npcs.append({
'id': npc_id,
'name': npc_def.get('name', npc_id),
'emoji': npc_def.get('emoji', '👹'),
'hp_min': npc_def.get('hp_min', 10),
'hp_max': npc_def.get('hp_max', 20),
'damage_min': npc_def.get('damage_min', 1),
'damage_max': npc_def.get('damage_max', 5),
'xp_reward': npc_def.get('xp_reward', 10),
'defense': npc_def.get('defense', 0),
'description': npc_def.get('description', ''),
'image_path': npc_def.get('image_path', ''),
'loot_table': npc_def.get('loot_table', []),
'corpse_loot': npc_def.get('corpse_loot', [])
})
return jsonify({'npcs': npcs})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/editor/npc', methods=['POST'])
@require_auth
def save_npc():
"""Save or update an NPC"""
try:
npc_data = request.get_json()
npc_id = npc_data.get('id')
# Load current NPCs
npcs_data = load_json_file(NPCS_FILE) or {'npcs': {}, 'danger_levels': {}, 'spawn_tables': {}}
if 'npcs' not in npcs_data:
npcs_data['npcs'] = {}
# Update NPC
npcs_data['npcs'][npc_id] = {
'name': npc_data.get('name'),
'emoji': npc_data.get('emoji'),
'hp_min': npc_data.get('hp_min'),
'hp_max': npc_data.get('hp_max'),
'damage_min': npc_data.get('damage_min'),
'damage_max': npc_data.get('damage_max'),
'xp_reward': npc_data.get('xp_reward'),
'defense': npc_data.get('defense', 0),
'description': npc_data.get('description', ''),
'image_path': npc_data.get('image_path', ''),
'loot_table': npc_data.get('loot_table', []),
'corpse_loot': npc_data.get('corpse_loot', [])
}
# Save back
if save_json_file(NPCS_FILE, npcs_data):
return jsonify({'success': True, 'message': 'NPC saved successfully'})
else:
return jsonify({'error': 'Failed to save NPC'}), 500
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/editor/npc/<npc_id>', methods=['DELETE'])
@require_auth
def delete_npc(npc_id):
"""Delete an NPC"""
try:
npcs_data = load_json_file(NPCS_FILE)
if not npcs_data or 'npcs' not in npcs_data:
return jsonify({'error': 'NPCs file not found'}), 404
if npc_id in npcs_data['npcs']:
del npcs_data['npcs'][npc_id]
if save_json_file(NPCS_FILE, npcs_data):
return jsonify({'success': True, 'message': 'NPC deleted successfully'})
else:
return jsonify({'error': 'Failed to delete NPC'}), 500
except Exception as e:
return jsonify({'error': str(e)}), 500
# ==================== ITEM MANAGEMENT ====================
@app.route('/api/editor/items', methods=['GET'])
@require_auth
def get_items():
"""Get all items with complete field set"""
try:
items_data = load_json_file(ITEMS_FILE)
if not items_data:
return jsonify({'items': []})
items = []
for item_id, item_def in items_data.get('items', {}).items():
# Build complete item object with all possible fields
item = {
'id': item_id,
'name': item_def.get('name', item_id),
'emoji': item_def.get('emoji', ''),
'type': item_def.get('type', 'resource'),
'weight': item_def.get('weight', 0.1),
'volume': item_def.get('volume', 0.1),
'description': item_def.get('description', ''),
'image_path': item_def.get('image_path', ''),
# Consumable properties
'stackable': item_def.get('stackable', True),
'hp_restore': item_def.get('hp_restore', 0),
'stamina_restore': item_def.get('stamina_restore', 0),
'treats': item_def.get('treats', ''),
# Equipment properties
'equippable': item_def.get('equippable', False),
'slot': item_def.get('slot', ''),
'durability': item_def.get('durability', 0),
'tier': item_def.get('tier', 0),
'encumbrance': item_def.get('encumbrance', 0),
# Stats object
'stats': item_def.get('stats', {}),
# Weapon effects
'weapon_effects': item_def.get('weapon_effects', {}),
# Crafting system
'craftable': item_def.get('craftable', False),
'craft_level': item_def.get('craft_level', 0),
'craft_materials': item_def.get('craft_materials', []),
'craft_tools': item_def.get('craft_tools', []),
# Repair system
'repairable': item_def.get('repairable', False),
'repair_materials': item_def.get('repair_materials', []),
'repair_tools': item_def.get('repair_tools', []),
'repair_percentage': item_def.get('repair_percentage', 0),
# Uncrafting system
'uncraftable': item_def.get('uncraftable', False),
'uncraft_yield': item_def.get('uncraft_yield', []),
'uncraft_loss_chance': item_def.get('uncraft_loss_chance', 0),
'uncraft_tools': item_def.get('uncraft_tools', []),
}
items.append(item)
return jsonify({'items': items})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/editor/item', methods=['POST'])
@require_auth
def save_item():
"""Save or update an item with complete field support"""
try:
item_data = request.get_json()
item_id = item_data.get('id')
# Load current items
items_data = load_json_file(ITEMS_FILE) or {'items': {}}
if 'items' not in items_data:
items_data['items'] = {}
# Build item dict with required fields
item_entry = {
'name': item_data.get('name'),
'type': item_data.get('type'),
'weight': item_data.get('weight'),
'volume': item_data.get('volume'),
}
# Add optional string fields
if item_data.get('emoji'):
item_entry['emoji'] = item_data.get('emoji')
if item_data.get('description'):
item_entry['description'] = item_data.get('description')
if item_data.get('image_path'):
item_entry['image_path'] = item_data.get('image_path')
if item_data.get('treats'):
item_entry['treats'] = item_data.get('treats')
# Stackable defaults to True, only include if False
if not item_data.get('stackable', True):
item_entry['stackable'] = False
# Consumable properties
if item_data.get('hp_restore', 0) != 0:
item_entry['hp_restore'] = item_data.get('hp_restore')
if item_data.get('stamina_restore', 0) != 0:
item_entry['stamina_restore'] = item_data.get('stamina_restore')
# Equipment properties
if item_data.get('equippable'):
item_entry['equippable'] = True
if item_data.get('slot'):
item_entry['slot'] = item_data.get('slot')
if item_data.get('durability', 0) != 0:
item_entry['durability'] = item_data.get('durability')
if item_data.get('tier', 0) != 0:
item_entry['tier'] = item_data.get('tier')
if item_data.get('encumbrance', 0) != 0:
item_entry['encumbrance'] = item_data.get('encumbrance')
# Stats object (only include if not empty)
stats = item_data.get('stats', {})
if stats and isinstance(stats, dict) and len(stats) > 0:
item_entry['stats'] = stats
# Weapon effects object (only include if not empty)
weapon_effects = item_data.get('weapon_effects', {})
if weapon_effects and isinstance(weapon_effects, dict) and len(weapon_effects) > 0:
item_entry['weapon_effects'] = weapon_effects
# Crafting system
if item_data.get('craftable'):
item_entry['craftable'] = True
if item_data.get('craft_level', 0) != 0:
item_entry['craft_level'] = item_data.get('craft_level')
craft_materials = item_data.get('craft_materials', [])
if craft_materials and isinstance(craft_materials, list) and len(craft_materials) > 0:
item_entry['craft_materials'] = craft_materials
craft_tools = item_data.get('craft_tools', [])
if craft_tools and isinstance(craft_tools, list) and len(craft_tools) > 0:
item_entry['craft_tools'] = craft_tools
# Repair system
if item_data.get('repairable'):
item_entry['repairable'] = True
repair_materials = item_data.get('repair_materials', [])
if repair_materials and isinstance(repair_materials, list) and len(repair_materials) > 0:
item_entry['repair_materials'] = repair_materials
repair_tools = item_data.get('repair_tools', [])
if repair_tools and isinstance(repair_tools, list) and len(repair_tools) > 0:
item_entry['repair_tools'] = repair_tools
if item_data.get('repair_percentage', 0) != 0:
item_entry['repair_percentage'] = item_data.get('repair_percentage')
# Uncrafting system
if item_data.get('uncraftable'):
item_entry['uncraftable'] = True
uncraft_yield = item_data.get('uncraft_yield', [])
if uncraft_yield and isinstance(uncraft_yield, list) and len(uncraft_yield) > 0:
item_entry['uncraft_yield'] = uncraft_yield
if item_data.get('uncraft_loss_chance', 0) != 0:
item_entry['uncraft_loss_chance'] = item_data.get('uncraft_loss_chance')
uncraft_tools = item_data.get('uncraft_tools', [])
if uncraft_tools and isinstance(uncraft_tools, list) and len(uncraft_tools) > 0:
item_entry['uncraft_tools'] = uncraft_tools
# Update item
items_data['items'][item_id] = item_entry
# Save back
if save_json_file(ITEMS_FILE, items_data):
return jsonify({'success': True, 'message': 'Item saved successfully'})
else:
return jsonify({'error': 'Failed to save item'}), 500
except Exception as e:
import traceback
print(f"Error saving item: {e}\n{traceback.format_exc()}", flush=True)
return jsonify({'error': str(e)}), 500
@app.route('/api/editor/item/<item_id>', methods=['DELETE'])
@require_auth
def delete_item(item_id):
"""Delete an item"""
try:
items_data = load_json_file(ITEMS_FILE)
if not items_data or 'items' not in items_data:
return jsonify({'error': 'Items file not found'}), 404
if item_id in items_data['items']:
del items_data['items'][item_id]
if save_json_file(ITEMS_FILE, items_data):
return jsonify({'success': True, 'message': 'Item deleted successfully'})
else:
return jsonify({'error': 'Failed to delete item'}), 500
except Exception as e:
return jsonify({'error': str(e)}), 500
# ==================== INTERACTABLES ENDPOINTS ====================
@app.route('/api/editor/interactables', methods=['GET'])
@require_auth
def get_interactables():
"""Get all interactable templates"""
try:
interactables_data = load_json_file(INTERACTABLES_FILE)
if not interactables_data:
return jsonify({'interactables': []})
interactables = []
for inter_id, inter_def in interactables_data.get('interactables', {}).items():
interactables.append({
'id': inter_id,
'name': inter_def.get('name', inter_id),
'description': inter_def.get('description', ''),
'image_path': inter_def.get('image_path', ''),
'actions': inter_def.get('actions', {})
})
return jsonify({'interactables': interactables})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/editor/interactable', methods=['POST'])
@require_auth
def save_interactable():
"""Save or update an interactable template"""
try:
inter_data = request.get_json()
inter_id = inter_data.get('id')
# Load current interactables
interactables_data = load_json_file(INTERACTABLES_FILE) or {'interactables': {}}
if 'interactables' not in interactables_data:
interactables_data['interactables'] = {}
# Update interactable
interactables_data['interactables'][inter_id] = {
'id': inter_id,
'name': inter_data.get('name'),
'description': inter_data.get('description', ''),
'image_path': inter_data.get('image_path', ''),
'actions': inter_data.get('actions', {})
}
# Save back
if save_json_file(INTERACTABLES_FILE, interactables_data):
return jsonify({'success': True, 'message': 'Interactable saved successfully'})
else:
return jsonify({'error': 'Failed to save interactable'}), 500
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/editor/interactable/<inter_id>', methods=['DELETE'])
@require_auth
def delete_interactable(inter_id):
"""Delete an interactable template"""
try:
interactables_data = load_json_file(INTERACTABLES_FILE)
if not interactables_data or 'interactables' not in interactables_data:
return jsonify({'error': 'Interactables file not found'}), 404
if inter_id in interactables_data['interactables']:
del interactables_data['interactables'][inter_id]
if save_json_file(INTERACTABLES_FILE, interactables_data):
return jsonify({'success': True, 'message': 'Interactable deleted successfully'})
else:
return jsonify({'error': 'Failed to delete interactable'}), 500
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/editor/live-stats', methods=['GET'])
@require_auth
def get_live_stats():
"""Get real-time player locations and enemy counts"""
# Try to import database module - may not be available in map container
sys.path.insert(0, str(PARENT_DIR))
try:
from bot import database
except ImportError:
# Bot module not available in this container
# Return empty stats - this is expected in map-only container
return jsonify({
'players_by_location': {},
'enemies_by_location': {}
})
import asyncio
from sqlalchemy import text
# Run async queries
async def get_stats():
players_by_location = {}
enemies_by_location = {}
# Get all players and their locations
try:
async with database.engine.connect() as conn:
# Get player counts per location
result = await conn.execute(text(
"SELECT location_id, COUNT(*) as count FROM characters GROUP BY location_id"
))
rows = result.fetchall()
for row in rows:
players_by_location[row[0]] = row[1]
# Get wandering enemy counts per location
result = await conn.execute(text(
"SELECT location_id, COUNT(*) as count FROM wandering_enemies WHERE despawn_timestamp > EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) GROUP BY location_id"
))
rows = result.fetchall()
for row in rows:
enemies_by_location[row[0]] = row[1]
except Exception as e:
print(f"[LIVE-STATS] Database query error: {e}", flush=True)
import traceback
traceback.print_exc()
return players_by_location, enemies_by_location
# Run the async function
try:
players, enemies = asyncio.run(get_stats())
#print(f"[LIVE-STATS] Success! Players: {players}, Enemies: {enemies}", flush=True)
except Exception as e:
#print(f"[LIVE-STATS] asyncio.run error: {e}", flush=True)
import traceback
traceback.print_exc()
return jsonify({'error': str(e), 'players_by_location': {}, 'enemies_by_location': {}}), 200
return jsonify({
'players_by_location': players,
'enemies_by_location': enemies
})
# Export to JSON endpoint removed to prevent accidental data loss
# The editor now works directly with JSON files
@app.route('/api/editor/export-python', methods=['GET'])
@require_auth
def export_python():
"""Export configuration as Python code to update world_loader.py"""
try:
if not LOCATIONS_FILE.exists():
return jsonify({'error': 'No configuration to export'}), 404
with open(LOCATIONS_FILE, 'r') as f:
config = json.load(f)
# Generate Python code
python_code = generate_world_loader_code(config)
return jsonify({
'success': True,
'python_code': python_code,
'message': 'Python code generated. Review and manually update world_loader.py'
})
except Exception as e:
return jsonify({'error': str(e)}), 500
def generate_world_loader_code(config):
"""Generate Python code from config"""
code_lines = ["# Generated configuration updates\n", "# REVIEW CAREFULLY before applying!\n\n"]
# Generate location updates
if 'locations' in config:
code_lines.append("# ===== LOCATION UPDATES =====\n")
for loc_id, loc_data in config['locations'].items():
code_lines.append(f"\n# Update {loc_id}\n")
code_lines.append(f"{loc_id}.name = {repr(loc_data['name'])}\n")
code_lines.append(f"{loc_id}.description = {repr(loc_data['description'])}\n")
code_lines.append(f"{loc_id}.x = {loc_data['x']}\n")
code_lines.append(f"{loc_id}.y = {loc_data['y']}\n")
if loc_data.get('image_path'):
code_lines.append(f"{loc_id}.image_path = {repr(loc_data['image_path'])}\n")
# Generate danger config
if 'danger_config' in config:
code_lines.append("\n# ===== DANGER CONFIG (for npcs.py) =====\n")
code_lines.append("LOCATION_DANGER = {\n")
for loc_id, danger_data in config['danger_config'].items():
code_lines.append(f" {repr(loc_id)}: ({danger_data['danger_level']}, {danger_data['encounter_rate']}, {danger_data['wandering_chance']}),\n")
code_lines.append("}\n")
# Generate spawn config
if 'spawn_config' in config:
code_lines.append("\n# ===== SPAWN CONFIG (for npcs.py) =====\n")
code_lines.append("LOCATION_SPAWNS = {\n")
for loc_id, spawns in config['spawn_config'].items():
code_lines.append(f" {repr(loc_id)}: [\n")
for spawn in spawns:
code_lines.append(f" ({repr(spawn['npc_id'])}, {spawn['weight']}),\n")
code_lines.append(" ],\n")
code_lines.append("}\n")
# Generate connections
if 'connections' in config:
code_lines.append("\n# ===== CONNECTIONS =====\n")
for conn in config['connections']:
code_lines.append(f"{conn['from']}.add_exit({repr(conn['direction'])}, {repr(conn['to'])})\n")
return ''.join(code_lines)
# ==================== ACCOUNT MANAGEMENT ====================
@app.route('/api/editor/accounts', methods=['GET'])
@require_auth
def get_accounts():
"""Get list of all accounts with their characters"""
sys.path.insert(0, str(PARENT_DIR))
try:
from api import database
except ImportError:
return jsonify({'error': 'Database module not available'}), 500
import asyncio
import time
from sqlalchemy import text
async def fetch_accounts():
accounts_list = []
try:
async with database.engine.connect() as conn:
# Get all accounts
result = await conn.execute(text("""
SELECT id, email, premium_expires_at, created_at
FROM accounts
ORDER BY created_at DESC
"""))
accounts_data = result.fetchall()
for acc_row in accounts_data:
account = {
'id': acc_row[0],
'email': acc_row[1],
'premium_expires_at': acc_row[2],
'created_at': acc_row[3],
'is_premium': acc_row[2] and acc_row[2] > time.time() if acc_row[2] else False,
'characters': []
}
# Get characters for this account
char_result = await conn.execute(text("""
SELECT id, name, level, location_id, hp, max_hp
FROM characters
WHERE account_id = :aid
ORDER BY last_played_at DESC
"""), {'aid': acc_row[0]})
for char_row in char_result.fetchall():
account['characters'].append({
'id': char_row[0],
'name': char_row[1],
'level': char_row[2],
'location_id': char_row[3],
'hp': char_row[4],
'max_hp': char_row[5]
})
account['character_count'] = len(account['characters'])
accounts_list.append(account)
except Exception as e:
print(f"[ACCOUNTS] Error fetching accounts: {e}", flush=True)
import traceback
traceback.print_exc()
return accounts_list
accounts = asyncio.run(fetch_accounts())
return jsonify({'accounts': accounts})
@app.route('/api/editor/account/<int:account_id>', methods=['GET'])
@require_auth
def get_account_details(account_id):
"""Get detailed account information with all characters"""
sys.path.insert(0, str(PARENT_DIR))
try:
from api import database
except ImportError:
return jsonify({'error': 'Database module not available'}), 500
import asyncio
from sqlalchemy import text
async def fetch_account_details():
try:
async with database.engine.connect() as conn:
# Get account info
result = await conn.execute(text("""
SELECT id, email, premium_expires_at, created_at, last_login_at
FROM accounts
WHERE id = :aid
"""), {'aid': account_id})
row = result.fetchone()
if not row:
return None
account = {
'id': row[0],
'email': row[1],
'premium_expires_at': row[2],
'created_at': row[3],
'last_login_at': row[4],
'characters': []
}
# Get all characters with full details
char_result = await conn.execute(text("""
SELECT id, name, level, xp, hp, max_hp, stamina, max_stamina,
strength, agility, endurance, intellect, location_id,
is_dead, created_at, last_played_at
FROM characters
WHERE account_id = :aid
ORDER BY last_played_at DESC
"""), {'aid': account_id})
for char_row in char_result.fetchall():
account['characters'].append({
'id': char_row[0],
'name': char_row[1],
'level': char_row[2],
'xp': char_row[3],
'hp': char_row[4],
'max_hp': char_row[5],
'stamina': char_row[6],
'max_stamina': char_row[7],
'strength': char_row[8],
'agility': char_row[9],
'endurance': char_row[10],
'intellect': char_row[11],
'location_id': char_row[12],
'is_dead': char_row[13],
'created_at': char_row[14],
'last_played_at': char_row[15]
})
return account
except Exception as e:
print(f"[ACCOUNT-DETAILS] Error: {e}", flush=True)
import traceback
traceback.print_exc()
return None
account = asyncio.run(fetch_account_details())
if not account:
return jsonify({'error': 'Account not found'}), 404
return jsonify(account)
@app.route('/api/editor/account/<int:account_id>', methods=['POST'])
@require_auth
def update_account_details(account_id):
"""Update account information"""
sys.path.insert(0, str(PARENT_DIR))
try:
from api import database
except ImportError:
return jsonify({'error': 'Database module not available'}), 500
import asyncio
from sqlalchemy import text
data = request.get_json()
async def update_account_data():
try:
async with database.engine.begin() as conn:
await conn.execute(text("""
UPDATE accounts SET
email = :email,
premium_expires_at = :premium_exp
WHERE id = :aid
"""), {
'aid': account_id,
'email': data.get('email'),
'premium_exp': data.get('premium_expires_at')
})
return True
except Exception as e:
print(f"[UPDATE-ACCOUNT] Error: {e}", flush=True)
import traceback
traceback.print_exc()
return False
success = asyncio.run(update_account_data())
if success:
return jsonify({'success': True, 'message': 'Account updated successfully'})
else:
return jsonify({'error': 'Failed to update account'}), 500
# ==================== PLAYER MANAGEMENT ====================
@app.route('/api/editor/players', methods=['GET'])
@require_auth
def get_players():
"""Get list of all characters with their accounts"""
sys.path.insert(0, str(PARENT_DIR))
try:
from api import database
except ImportError:
return jsonify({'error': 'Database module not available'}), 500
import asyncio
import time
from sqlalchemy import text
async def fetch_players():
players_list = []
try:
async with database.engine.connect() as conn:
# Get all characters with account info
result = await conn.execute(text("""
SELECT c.id, c.account_id, c.name, c.location_id, c.hp, c.max_hp,
c.stamina, c.max_stamina, c.level, c.xp, c.strength, c.agility,
c.endurance, c.intellect, c.unspent_points,
a.email, a.premium_expires_at, a.created_at, c.created_at as character_created_at
FROM characters c
LEFT JOIN accounts a ON c.account_id = a.id
ORDER BY c.name
"""))
rows = result.fetchall()
for row in rows:
players_list.append({
'id': row[0],
'account_id': row[1],
'character_name': row[2],
'location_id': row[3],
'hp': row[4],
'max_hp': row[5],
'stamina': row[6],
'max_stamina': row[7],
'level': row[8],
'xp': row[9],
'strength': row[10],
'agility': row[11],
'endurance': row[12],
'intellect': row[13],
'unspent_points': row[14],
'email': row[15],
'premium_expires_at': row[16],
'account_created_at': row[17],
'character_created_at': row[18],
'is_premium': row[16] and row[16] > time.time() if row[16] else False
})
except Exception as e:
print(f"[PLAYERS] Error fetching players: {e}", flush=True)
import traceback
traceback.print_exc()
return players_list
players = asyncio.run(fetch_players())
return jsonify({'players': players})
@app.route('/api/editor/player/<int:character_id>', methods=['GET'])
@require_auth
def get_player_details(character_id):
"""Get detailed character information including inventory"""
sys.path.insert(0, str(PARENT_DIR))
try:
from api import database
except ImportError:
return jsonify({'error': 'Database module not available'}), 500
import asyncio
from sqlalchemy import text
async def fetch_player_details():
player_data = {}
try:
async with database.engine.connect() as conn:
# Get character basic info with account
result = await conn.execute(text("""
SELECT c.*, a.email, a.premium_expires_at, a.created_at as account_created_at
FROM characters c
LEFT JOIN accounts a ON c.account_id = a.id
WHERE c.id = :cid
"""), {'cid': character_id})
row = result.fetchone()
if not row:
return None
player_data = dict(row._mapping)
# Get inventory
result = await conn.execute(text("""
SELECT i.item_id, i.quantity, i.is_equipped, i.unique_item_id,
u.durability, u.max_durability, u.tier, u.unique_stats
FROM inventory i
LEFT JOIN unique_items u ON i.unique_item_id = u.id
WHERE i.character_id = :cid
"""), {'cid': character_id})
inventory = []
equipped = {}
for row in result.fetchall():
item_data = {
'item_id': row[0],
'quantity': row[1]
}
if row[3]: # Has unique_item_id
item_data['unique_item_data'] = {
'durability': row[4],
'max_durability': row[5],
'tier': row[6],
'unique_stats': row[7]
}
if row[2]: # is_equipped
equipped[row[0]] = item_data
else:
inventory.append(item_data)
player_data['inventory'] = inventory
player_data['equipped'] = equipped
except Exception as e:
print(f"[PLAYER-DETAILS] Error: {e}", flush=True)
import traceback
traceback.print_exc()
return None
return player_data
player = asyncio.run(fetch_player_details())
if not player:
return jsonify({'error': 'Player not found'}), 404
# Timestamps are already Unix floats, no conversion needed
return jsonify(player)
@app.route('/api/editor/player/<int:character_id>', methods=['POST'])
@require_auth
def update_player(character_id):
"""Update character stats and properties"""
sys.path.insert(0, str(PARENT_DIR))
try:
from api import database
except ImportError:
return jsonify({'error': 'Database module not available'}), 500
import asyncio
from sqlalchemy import text
data = request.get_json()
async def update_player_data():
try:
async with database.engine.begin() as conn:
# Update character stats
await conn.execute(text("""
UPDATE characters SET
name = :name,
location_id = :location,
hp = :hp,
max_hp = :max_hp,
stamina = :stamina,
max_stamina = :max_stamina,
level = :level,
xp = :xp,
strength = :str,
agility = :agi,
endurance = :end,
intellect = :int,
unspent_points = :unspent
WHERE id = :cid
"""), {
'cid': character_id,
'name': data.get('character_name'),
'location': data.get('location_id'),
'hp': data.get('hp'),
'max_hp': data.get('max_hp'),
'stamina': data.get('stamina'),
'max_stamina': data.get('max_stamina'),
'level': data.get('level'),
'xp': data.get('xp'),
'str': data.get('strength'),
'agi': data.get('agility'),
'end': data.get('endurance'),
'int': data.get('intellect'),
'unspent': data.get('unspent_points', 0)
})
return True
except Exception as e:
print(f"[UPDATE-PLAYER] Error: {e}", flush=True)
import traceback
traceback.print_exc()
return False
success = asyncio.run(update_player_data())
if success:
return jsonify({'success': True, 'message': 'Player updated successfully'})
else:
return jsonify({'error': 'Failed to update player'}), 500
@app.route('/api/editor/player/<int:character_id>/inventory', methods=['POST'])
@require_auth
def update_player_inventory(character_id):
"""Update player inventory"""
sys.path.insert(0, str(PARENT_DIR))
try:
from bot import database
except ImportError:
return jsonify({'error': 'Database module not available'}), 500
import asyncio
from sqlalchemy import text
data = request.get_json()
inventory_items = data.get('inventory', [])
async def update_inventory():
try:
async with database.engine.begin() as conn:
# Clear existing inventory
await conn.execute(text("""
DELETE FROM inventory WHERE character_id = :cid
"""), {'cid': character_id})
# Insert new inventory (simplified - full unique item support would need more work)
for item in inventory_items:
await conn.execute(text("""
INSERT INTO inventory (character_id, item_id, quantity, is_equipped)
VALUES (:cid, :item_id, :qty, :equipped)
"""), {
'cid': character_id,
'item_id': item['item_id'],
'qty': item['quantity'],
'equipped': item.get('is_equipped', False)
})
return True
except Exception as e:
print(f"[UPDATE-INVENTORY] Error: {e}", flush=True)
import traceback
traceback.print_exc()
return False
success = asyncio.run(update_inventory())
if success:
return jsonify({'success': True, 'message': 'Inventory updated successfully'})
else:
return jsonify({'error': 'Failed to update inventory'}), 500
@app.route('/api/editor/player/<int:character_id>/equipment', methods=['POST'])
@require_auth
def update_player_equipment(character_id):
"""Update player equipped items"""
sys.path.insert(0, str(PARENT_DIR))
try:
from bot import database
except ImportError:
return jsonify({'error': 'Database module not available'}), 500
import asyncio
from sqlalchemy import text
data = request.get_json()
equipped_items = data.get('equipped', {})
async def update_equipment():
try:
async with database.engine.begin() as conn:
# Clear existing equipment
await conn.execute(text("""
DELETE FROM equipped_items WHERE telegram_id = :tid
"""), {'tid': telegram_id})
# Insert new equipment
for slot, item_data in equipped_items.items():
if item_data and item_data.get('item_id'):
unique_data = json.dumps(item_data['unique_item_data']) if item_data.get('unique_item_data') else None
await conn.execute(text("""
INSERT INTO equipped_items (telegram_id, slot, item_id, unique_item_data)
VALUES (:tid, :slot, :item_id, :unique_data)
"""), {
'tid': telegram_id,
'slot': slot,
'item_id': item_data['item_id'],
'unique_data': unique_data
})
return True
except Exception as e:
print(f"[UPDATE-EQUIPMENT] Error: {e}", flush=True)
import traceback
traceback.print_exc()
return False
success = asyncio.run(update_equipment())
if success:
return jsonify({'success': True, 'message': 'Equipment updated successfully'})
else:
return jsonify({'error': 'Failed to update equipment'}), 500
@app.route('/api/editor/account/<int:telegram_id>/ban', methods=['POST'])
@require_auth
def ban_account(telegram_id):
"""Ban or unban an account"""
sys.path.insert(0, str(PARENT_DIR))
try:
from bot import database
except ImportError:
return jsonify({'error': 'Database module not available'}), 500
import asyncio
from sqlalchemy import text
data = request.get_json()
is_banned = data.get('is_banned', True)
ban_reason = data.get('ban_reason', '')
async def update_ban_status():
try:
async with database.engine.begin() as conn:
await conn.execute(text("""
UPDATE accounts SET
is_banned = :banned,
ban_reason = :reason
WHERE telegram_id = :tid
"""), {
'tid': telegram_id,
'banned': is_banned,
'reason': ban_reason if is_banned else None
})
return True
except Exception as e:
print(f"[BAN-ACCOUNT] Error: {e}", flush=True)
return False
success = asyncio.run(update_ban_status())
if success:
action = 'banned' if is_banned else 'unbanned'
return jsonify({'success': True, 'message': f'Account {action} successfully'})
else:
return jsonify({'error': 'Failed to update ban status'}), 500
@app.route('/api/editor/account/<int:account_id>/delete', methods=['DELETE'])
@require_auth
def delete_account(account_id):
"""Delete an account and all associated characters"""
sys.path.insert(0, str(PARENT_DIR))
try:
from api import database
except ImportError:
return jsonify({'error': 'Database module not available'}), 500
import asyncio
from sqlalchemy import text
async def delete_account_data():
try:
async with database.engine.begin() as conn:
# CASCADE will handle characters and their inventory
await conn.execute(text("DELETE FROM accounts WHERE id = :aid"), {'aid': account_id})
return True
except Exception as e:
print(f"[DELETE-ACCOUNT] Error: {e}", flush=True)
import traceback
traceback.print_exc()
return False
success = asyncio.run(delete_account_data())
if success:
return jsonify({'success': True, 'message': 'Account deleted successfully'})
else:
return jsonify({'error': 'Failed to delete account'}), 500
@app.route('/api/editor/player/<int:character_id>/reset', methods=['POST'])
@require_auth
def reset_player(character_id):
"""Reset character to starting state"""
sys.path.insert(0, str(PARENT_DIR))
try:
from api import database
except ImportError:
return jsonify({'error': 'Database module not available'}), 500
import asyncio
from sqlalchemy import text
async def reset_player_data():
try:
async with database.engine.begin() as conn:
# Clear inventory
await conn.execute(text("DELETE FROM inventory WHERE character_id = :cid"), {'cid': character_id})
# Reset character stats to defaults
await conn.execute(text("""
UPDATE characters SET
location_id = 'cabin',
hp = 100,
max_hp = 100,
stamina = 100,
max_stamina = 100,
level = 1,
xp = 0,
strength = 0,
agility = 0,
endurance = 0,
intellect = 0,
unspent_points = 20,
is_dead = false
WHERE id = :cid
"""), {'cid': character_id})
return True
except Exception as e:
print(f"[RESET-PLAYER] Error: {e}", flush=True)
import traceback
traceback.print_exc()
return False
success = asyncio.run(reset_player_data())
if success:
return jsonify({'success': True, 'message': 'Player reset successfully'})
else:
return jsonify({'error': 'Failed to reset player'}), 500
if __name__ == '__main__':
port = int(os.environ.get('PORT', 8080))
print(f"""
╔════════════════════════════════════════════╗
║ Echoes of the Ashes - Map Server ║
║ WITH EDITOR ║
╚════════════════════════════════════════════╝
🗺️ Map Viewer: http://localhost:{port}
✏️ Map Editor: http://localhost:{port}/editor
🔐 Password: {ADMIN_PASSWORD}
⚠️ IMPORTANT: Set EDITOR_PASSWORD environment variable in production!
""")
app.run(host='0.0.0.0', port=port, debug=False)