Initial commit
This commit is contained in:
293
app/helpers.py
Normal file
293
app/helpers.py
Normal file
@@ -0,0 +1,293 @@
|
||||
import logging
|
||||
import requests
|
||||
from database import * # Import all DB functions
|
||||
from config import * # Import constants if needed (like BOT_TOKEN for direct API calls, although better passed)
|
||||
from PIL import Image, ImageDraw, ImageFont
|
||||
import os
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def format_raffle_details(raffle_id):
|
||||
"""Fetches and formats raffle details for display, including multi-channel prices."""
|
||||
raffle = get_raffle(raffle_id) # Fetches basic info from 'raffles' table
|
||||
if not raffle:
|
||||
return "Error: No se encontró el sorteo."
|
||||
|
||||
details = (
|
||||
f"ℹ️ **Detalles del Sorteo** ℹ️\n\n"
|
||||
f"**ID:** `{raffle['id']}`\n"
|
||||
f"**Nombre:** {raffle['name']}\n"
|
||||
f"**Descripción:**\n{raffle['description']}\n\n"
|
||||
f"**Activo:** {'Sí' if raffle['active'] else 'No (Terminado)'}\n"
|
||||
)
|
||||
|
||||
# Get and Format Channels and their Prices
|
||||
channels_and_prices = get_raffle_channels_and_prices(raffle_id) # List of Row objects {'channel_id', 'price'}
|
||||
|
||||
if channels_and_prices:
|
||||
details += "**Canales y Precios:**\n"
|
||||
for item in channels_and_prices:
|
||||
channel_id_str = item['channel_id']
|
||||
price = item['price']
|
||||
channel_alias = REVERSE_CHANNELS.get(str(channel_id_str), f"ID:{channel_id_str}") # Ensure lookup with string ID
|
||||
details += f"- {channel_alias}: {price}€\n"
|
||||
details += "\n" # Add a newline after the list
|
||||
else:
|
||||
details += "**Canales y Precios:** Ninguno asignado\n\n"
|
||||
|
||||
|
||||
# Image ID (optional display)
|
||||
if raffle['image_file_id']:
|
||||
details += f"**ID Imagen:** (Presente)\n"
|
||||
else:
|
||||
details += f"**ID Imagen:** (No asignada)\n"
|
||||
|
||||
# Add participant count and remaining numbers
|
||||
participants = get_participants(raffle_id) # Fetches list of Rows
|
||||
completed_participants_count = 0
|
||||
# pending_participants_count = 0 # If you want to show pending
|
||||
if participants: # Check if participants list is not None or empty
|
||||
completed_participants_count = sum(1 for p in participants if p['step'] == 'completed')
|
||||
# pending_participants_count = sum(1 for p in participants if p['step'] == 'waiting_for_payment')
|
||||
|
||||
|
||||
details += f"\n**Participantes Confirmados:** {completed_participants_count}\n"
|
||||
# details += f"**Reservas Pendientes:** {pending_participants_count}\n"
|
||||
|
||||
remaining_count = get_remaining_numbers_amount(raffle_id)
|
||||
details += f"**Números Disponibles:** {remaining_count if remaining_count >= 0 else 'Error al calcular'}\n"
|
||||
|
||||
return details
|
||||
|
||||
def build_raffle_announcement_caption(raffle_id):
|
||||
"""Builds the standard announcement caption text."""
|
||||
raffle = get_raffle(raffle_id)
|
||||
if not raffle:
|
||||
return None
|
||||
|
||||
remaining_count = get_remaining_numbers_amount(raffle_id)
|
||||
|
||||
caption = (
|
||||
f"🎉 **¡Sorteo Activo!** 🎉\n\n"
|
||||
f"✨ **{raffle['name']}** ✨\n\n"
|
||||
f"{raffle['description']}\n\n"
|
||||
f"💰 **Donación mínima:** {raffle['price']}€\n"
|
||||
f"🔢 **Números disponibles:** {remaining_count if remaining_count >= 0 else 'N/A'}\n\n"
|
||||
f"👇 ¡Pulsa /sorteo en este chat para participar! 👇"
|
||||
)
|
||||
return caption
|
||||
|
||||
def get_winners(raffle_id, winner_numbers_int):
|
||||
"""Finds winners based on chosen numbers."""
|
||||
participants = get_participants(raffle_id) # Gets all participants for the raffle
|
||||
winners = {} # { user_name: [list_of_winning_numbers_they_had] }
|
||||
|
||||
if not participants:
|
||||
return "" # No participants, no winners
|
||||
|
||||
winner_numbers_set = set(winner_numbers_int)
|
||||
|
||||
for participant in participants:
|
||||
# Only consider completed participations as potential winners
|
||||
if participant['step'] != 'completed' or not participant['numbers']:
|
||||
continue
|
||||
|
||||
user_id = participant['user_id']
|
||||
user_name = escape_markdown_v2_chars_for_username(participant['user_name']) or f"User_{user_id}" # Fallback name
|
||||
numbers_str = participant['numbers']
|
||||
|
||||
try:
|
||||
participant_numbers_set = {int(n) for n in numbers_str.split(',') if n.isdigit()}
|
||||
except ValueError:
|
||||
logger.warning(f"Invalid number format for participant {user_id} in raffle {raffle_id}: {numbers_str}")
|
||||
continue # Skip participant with bad data
|
||||
|
||||
# Find the intersection
|
||||
won_numbers = winner_numbers_set.intersection(participant_numbers_set)
|
||||
|
||||
if won_numbers:
|
||||
# Store the winning numbers (as strings, sorted) for this user
|
||||
won_numbers_str_sorted = sorted([f"{n:02}" for n in won_numbers])
|
||||
if user_name not in winners:
|
||||
winners[user_name] = []
|
||||
winners[user_name].extend(won_numbers_str_sorted) # Add potentially multiple matches
|
||||
|
||||
if not winners:
|
||||
return "No hubo ganadores con esos números."
|
||||
|
||||
# Format the output string
|
||||
winners_message_parts = []
|
||||
for user_name, numbers in winners.items():
|
||||
# Ensure numbers are unique in the final output per user
|
||||
unique_numbers_str = ", ".join(sorted(list(set(numbers))))
|
||||
winners_message_parts.append(f"- @{escape_markdown_v2_chars_for_username(user_name)} acertó: **{unique_numbers_str}**")
|
||||
|
||||
return "\n".join(winners_message_parts)
|
||||
|
||||
|
||||
def generate_table_image(raffle_id):
|
||||
"""Generates the 10x10 grid image showing number status."""
|
||||
# Define image parameters
|
||||
cols, rows = 10, 10
|
||||
cell_width, cell_height = 120, 50
|
||||
title_height_space = 70
|
||||
image_width = cols * cell_width
|
||||
image_height = rows * cell_height + title_height_space
|
||||
background_color = "white"
|
||||
line_color = "black"
|
||||
font_size = 16
|
||||
title_font_size = 24
|
||||
|
||||
# Create image
|
||||
img = Image.new("RGB", (image_width, image_height), background_color)
|
||||
draw = ImageDraw.Draw(img)
|
||||
|
||||
# Load fonts (handle potential errors)
|
||||
try:
|
||||
# Ensure the font file exists or provide a fallback path
|
||||
font_path = "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf" # Example Linux path
|
||||
if not os.path.exists(font_path):
|
||||
font_path = "arial.ttf" # Try common Windows font
|
||||
font = ImageFont.truetype(font_path, font_size)
|
||||
title_font = ImageFont.truetype(font_path, title_font_size)
|
||||
except IOError:
|
||||
logger.warning("Specific font not found, using default PIL font.")
|
||||
font = ImageFont.load_default()
|
||||
# Adjust size for default font if needed, default doesn't take size arg directly
|
||||
# title_font = ImageFont.truetype(font_path, title_font_size) # Need a default large font method
|
||||
title_font = ImageFont.load_default() # Revert to default for title too for simplicity
|
||||
|
||||
# Draw Title
|
||||
raffle_details = get_raffle(raffle_id)
|
||||
if not raffle_details:
|
||||
logger.error(f"Cannot generate image: Raffle {raffle_id} not found.")
|
||||
# Draw error message on image
|
||||
draw.text((10, 10), f"Error: Sorteo {raffle_id} no encontrado", fill="red", font=title_font)
|
||||
img.save(f"/app/data/raffles/raffle_table_{raffle_id}_error.png")
|
||||
return False # Indicate failure
|
||||
|
||||
raffle_name = raffle_details['name']
|
||||
title_text = f"Sorteo: {raffle_name}"
|
||||
# Calculate text bounding box for centering
|
||||
try:
|
||||
# Use textbbox for more accurate centering
|
||||
title_bbox = draw.textbbox((0, 0), title_text, font=title_font)
|
||||
title_width = title_bbox[2] - title_bbox[0]
|
||||
# title_height = title_bbox[3] - title_bbox[1] # Not needed for x centering
|
||||
title_x = (image_width - title_width) / 2
|
||||
title_y = 10 # Padding from top
|
||||
draw.text((title_x, title_y), title_text, fill=line_color, font=title_font)
|
||||
except AttributeError: # Handle older PIL versions that might not have textbbox
|
||||
# Fallback using textlength (less accurate)
|
||||
title_width = draw.textlength(title_text, font=title_font)
|
||||
title_x = (image_width - title_width) / 2
|
||||
title_y = 10
|
||||
draw.text((title_x, title_y), title_text, fill=line_color, font=title_font)
|
||||
|
||||
|
||||
# Get participant data
|
||||
participants = get_participants(raffle_id)
|
||||
number_status = {} # { num_int: (user_name, status_color) }
|
||||
|
||||
if participants:
|
||||
for p in participants:
|
||||
if not p['numbers'] or p['step'] not in ['waiting_for_payment', 'completed']:
|
||||
continue
|
||||
user_name = p['user_name'] or f"User_{p['user_id']}" # Fallback name
|
||||
status_color = "red" if p['step'] == 'waiting_for_payment' else "black" # Red=Reserved, Black=Completed
|
||||
|
||||
try:
|
||||
nums = {int(n) for n in p['numbers'].split(',') if n.isdigit()}
|
||||
for num in nums:
|
||||
if 0 <= num <= 99:
|
||||
number_status[num] = (user_name, status_color)
|
||||
except ValueError:
|
||||
logger.warning(f"Skipping invalid numbers '{p['numbers']}' for user {p['user_id']} in image generation.")
|
||||
continue
|
||||
|
||||
# Draw Grid and Fill Numbers
|
||||
for i in range(rows):
|
||||
for j in range(cols):
|
||||
num = i * cols + j
|
||||
x1 = j * cell_width
|
||||
y1 = i * cell_height + title_height_space
|
||||
x2 = x1 + cell_width
|
||||
y2 = y1 + cell_height
|
||||
|
||||
# Draw cell rectangle
|
||||
draw.rectangle([x1, y1, x2, y2], outline=line_color)
|
||||
|
||||
# Prepare text and color
|
||||
number_text = f"{num:02}"
|
||||
text_fill = "blue" # Default color for free numbers
|
||||
owner_text = ""
|
||||
|
||||
if num in number_status:
|
||||
owner, status_color = number_status[num]
|
||||
text_fill = status_color
|
||||
# Truncate long usernames
|
||||
max_name_len = 12
|
||||
owner_text = owner[:max_name_len] + ('…' if len(owner) > max_name_len else '')
|
||||
|
||||
|
||||
# Position text within the cell
|
||||
text_x = x1 + 10 # Padding from left
|
||||
text_y_num = y1 + 5 # Padding for number line
|
||||
text_y_owner = y1 + 5 + font_size + 2 # Padding for owner line (below number)
|
||||
|
||||
draw.text((text_x, text_y_num), number_text, fill=text_fill, font=font)
|
||||
if owner_text:
|
||||
draw.text((text_x, text_y_owner), owner_text, fill=text_fill, font=font)
|
||||
|
||||
# Ensure data directory exists
|
||||
os.makedirs("/app/data/raffles", exist_ok=True)
|
||||
|
||||
# Save the image
|
||||
image_path = f"/app/data/raffles/raffle_table_{raffle_id}.png"
|
||||
try:
|
||||
img.save(image_path)
|
||||
logger.info(f"Generated raffle table image: {image_path}")
|
||||
return True # Indicate success
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to save raffle table image {image_path}: {e}")
|
||||
return False # Indicate failure
|
||||
|
||||
def escape_markdown_v2_chars_for_username(text: str) -> str:
|
||||
"""Escapes characters for MarkdownV2, specifically for usernames."""
|
||||
# For usernames, usually only _ and * are problematic if not part of actual formatting
|
||||
# Other MarkdownV2 special characters: `[` `]` `(` `)` `~` `>` `#` `+` `-` `=` `|` `{` `}` `.` `!`
|
||||
# We are most concerned with _ in @user_name context.
|
||||
# A more comprehensive list of characters to escape for general text:
|
||||
# escape_chars = r'_*[]()~`>#+-=|{}.!'
|
||||
# For just usernames in this context, focus on what breaks @user_name
|
||||
escape_chars = r'_*`[' # Adding ` and [ just in case they appear in odd usernames
|
||||
|
||||
# Python's re.escape escapes all non-alphanumerics.
|
||||
# We only want to escape specific markdown control characters within the username.
|
||||
# For usernames, simply escaping '_' is often enough for the @mention issue.
|
||||
return "".join(['\\' + char if char in escape_chars else char for char in text])
|
||||
|
||||
def format_last_participants_list(participants_list: list) -> str:
|
||||
"""
|
||||
Formats the list of last participants for the announcement message.
|
||||
participants_list is a list of dicts: [{'user_name', 'numbers'}]
|
||||
"""
|
||||
if not participants_list:
|
||||
return "" # Return empty string if no other recent participants
|
||||
|
||||
# Reverse the list so the oldest of the "last N" appears first in the formatted string
|
||||
# as per the example "nick1, nick2, nick3" implies chronological order of joining.
|
||||
# The DB query already returns newest first, so we reverse it for display.
|
||||
formatted_lines = ["Los últimos participantes en unirse (además del más reciente) han sido:"]
|
||||
for p_info in reversed(participants_list): # Display oldest of this batch first
|
||||
user_name = p_info.get('user_name', 'Usuario Anónimo')
|
||||
numbers_str = p_info.get('numbers', '')
|
||||
if numbers_str:
|
||||
num_list = numbers_str.split(',')
|
||||
if len(num_list) == 1:
|
||||
line = f" - {escape_markdown_v2_chars_for_username(user_name)}, con el número: {num_list[0]}"
|
||||
else:
|
||||
line = f" - {escape_markdown_v2_chars_for_username(user_name)}, con los números: {', '.join(num_list)}"
|
||||
formatted_lines.append(line)
|
||||
|
||||
return "\n".join(formatted_lines) # Add a trailing newline
|
||||
Reference in New Issue
Block a user