500 lines
21 KiB
Python
500 lines
21 KiB
Python
import logging
|
||
import requests
|
||
from database import * # Import all DB functions
|
||
from config import * # Import constants if needed (like BOT_TOKEN for direct API calls, although better passed)
|
||
from PIL import Image, ImageDraw, ImageFont
|
||
import os
|
||
from requests.auth import HTTPBasicAuth
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
def format_raffle_details(raffle_id):
    """Build the HTML-formatted detail summary of a raffle.

    The message lists the raffle's basic fields, the number of confirmed
    participants, the amount of numbers still available and the PayPal
    totals (gross, fees, net) summed over every invoice of the raffle.

    Args:
        raffle_id: Identifier forwarded to the database helpers.

    Returns:
        The formatted message, or a Spanish error message when
        ``get_raffle`` returns nothing for ``raffle_id``.
    """
    raffle = get_raffle(raffle_id)
    if not raffle:
        return "Error: No se encontró el sorteo."

    # Append the ellipsis only when the text was really cut, and tolerate a
    # missing (None) description instead of failing on the slice.
    max_description_len = 100
    description = raffle['description'] or ""
    if len(description) > max_description_len:
        description = description[:max_description_len] + "..."

    parts = [
        "ℹ️ <b>Detalles del sorteo</b> ℹ️\n\n",
        f"<b>ID:</b> <code>{raffle['id']}</code>\n",
        f"<b>Nombre:</b> {raffle['name']}\n",
        f"<b>Descripción:</b>\n{description}\n\n",
        f"<b>Envío internacional:</b> {'Sí' if raffle['international_shipping'] else 'No'}\n",
        f"<b>Activo:</b> {'Sí' if raffle['active'] else 'No (Terminado)'}\n",
        f"<b>Donación mínima (canal principal):</b> {raffle['price']}€\n",
    ]

    # Image ID (optional display)
    if raffle['image_file_id']:
        parts.append(f"<b>ID Imagen:</b> {raffle['image_file_id']} (Presente)\n")
    else:
        parts.append("<b>ID Imagen:</b> (No asignada)\n")

    # Only participations whose step is 'completed' count as confirmed.
    participants = get_participants(raffle_id) or []
    completed_count = sum(1 for p in participants if p['step'] == 'completed')
    parts.append(f"\n<b>Participantes Confirmados:</b> {completed_count}\n")

    # A negative (or missing) amount is reported as a calculation error.
    remaining_count = get_remaining_numbers_amount(raffle_id)
    if remaining_count is None or remaining_count < 0:
        remaining_text = 'Error al calcular'
    else:
        remaining_text = remaining_count
    parts.append(f"<b>Números Disponibles:</b> {remaining_text}\n")

    # Gross, fee and net totals; each invoice is looked up individually.
    total_gross = 0.0
    total_fees = 0.0
    total_net = 0.0
    for inv_id in get_all_invoice_ids(raffle_id) or []:
        gross, net, fees = get_paypal_amounts_for_invoice(inv_id)
        total_gross += gross
        total_fees += fees
        total_net += net

    parts.append(f"\n<b>Total Recaudado (bruto):</b> {total_gross:.2f}€\n")
    parts.append(f"<b>Total Gastos (comisiones):</b> {total_fees:.2f}€\n")
    parts.append(f"<b>Total Beneficio (neto):</b> {total_net:.2f}€\n")

    return "".join(parts)
|
||
|
||
def get_winners(raffle_id, winner_numbers_int):
    """Return a formatted list of the participants holding winning numbers.

    Only participations whose step is ``'completed'`` are considered.

    Args:
        raffle_id: Identifier forwarded to ``get_participants``.
        winner_numbers_int: Iterable of winning numbers as integers.

    Returns:
        One line per winner (``- @name acertó: **nn, nn**``) joined with
        newlines; an empty string when the raffle has no participants; a
        Spanish "no winners" message when nobody holds a winning number.
    """
    participants = get_participants(raffle_id)
    if not participants:
        return ""  # No participants, no winners

    winning_numbers = set(winner_numbers_int)
    winners = {}  # raw user name -> set of winning numbers that user holds

    for participant in participants:
        if participant['step'] != 'completed' or not participant['numbers']:
            continue

        user_id = participant['user_id']
        # Keep the raw name here and escape exactly once when rendering:
        # escaping at both points produced "\\_" and broke the @mention.
        # Resolving the fallback before escaping also makes it reachable
        # when the stored name is None.
        user_name = participant['user_name'] or f"User_{user_id}"
        numbers_str = participant['numbers']

        try:
            # Strip each token so "1, 2" is not silently reduced to {1}.
            held_numbers = {
                int(token)
                for token in (raw.strip() for raw in numbers_str.split(','))
                if token.isdigit()
            }
        except ValueError:
            # str.isdigit() accepts some characters int() rejects (e.g. "²").
            logger.warning(f"Invalid number format for participant {user_id} in raffle {raffle_id}: {numbers_str}")
            continue  # Skip participant with bad data

        matched = winning_numbers & held_numbers
        if matched:
            winners.setdefault(user_name, set()).update(matched)

    if not winners:
        return "No hubo ganadores con esos números."

    message_lines = []
    for user_name, numbers in winners.items():
        numbers_text = ", ".join(f"{n:02}" for n in sorted(numbers))
        escaped_name = escape_markdown_v2_chars_for_username(user_name)
        message_lines.append(f"- @{escaped_name} acertó: **{numbers_text}**")

    return "\n".join(message_lines)
|
||
|
||
|
||
# def generate_table_image(raffle_id):
|
||
# """Generates the 10x10 grid image showing number status."""
|
||
# # Define image parameters
|
||
# cols, rows = 10, 10
|
||
# cell_width, cell_height = 120, 50
|
||
# title_height_space = 70
|
||
# image_width = cols * cell_width
|
||
# image_height = rows * cell_height + title_height_space
|
||
# background_color = "white"
|
||
# line_color = "black"
|
||
# font_size = 16
|
||
# title_font_size = 24
|
||
|
||
# # Create image
|
||
# img = Image.new("RGB", (image_width, image_height), background_color)
|
||
# draw = ImageDraw.Draw(img)
|
||
|
||
# # Load fonts (handle potential errors)
|
||
# try:
|
||
# # Ensure the font file exists or provide a fallback path
|
||
# font_path = "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf" # Example Linux path
|
||
# if not os.path.exists(font_path):
|
||
# font_path = "arial.ttf" # Try common Windows font
|
||
# font = ImageFont.truetype(font_path, font_size)
|
||
# title_font = ImageFont.truetype(font_path, title_font_size)
|
||
# except IOError:
|
||
# logger.warning("Specific font not found, using default PIL font.")
|
||
# font = ImageFont.load_default()
|
||
# # Adjust size for default font if needed, default doesn't take size arg directly
|
||
# # title_font = ImageFont.truetype(font_path, title_font_size) # Need a default large font method
|
||
# title_font = ImageFont.load_default() # Revert to default for title too for simplicity
|
||
|
||
# # Draw Title
|
||
# raffle_details = get_raffle(raffle_id)
|
||
# if not raffle_details:
|
||
# logger.error(f"Cannot generate image: Raffle {raffle_id} not found.")
|
||
# # Draw error message on image
|
||
# draw.text((10, 10), f"Error: Sorteo {raffle_id} no encontrado", fill="red", font=title_font)
|
||
# img.save(f"/app/data/raffles/raffle_table_{raffle_id}_error.png")
|
||
# return False # Indicate failure
|
||
|
||
# raffle_name = raffle_details['name']
|
||
# title_text = f"Sorteo: {raffle_name}"
|
||
# # Calculate text bounding box for centering
|
||
# try:
|
||
# # Use textbbox for more accurate centering
|
||
# title_bbox = draw.textbbox((0, 0), title_text, font=title_font)
|
||
# title_width = title_bbox[2] - title_bbox[0]
|
||
# # title_height = title_bbox[3] - title_bbox[1] # Not needed for x centering
|
||
# title_x = (image_width - title_width) / 2
|
||
# title_y = 10 # Padding from top
|
||
# draw.text((title_x, title_y), title_text, fill=line_color, font=title_font)
|
||
# except AttributeError: # Handle older PIL versions that might not have textbbox
|
||
# # Fallback using textlength (less accurate)
|
||
# title_width = draw.textlength(title_text, font=title_font)
|
||
# title_x = (image_width - title_width) / 2
|
||
# title_y = 10
|
||
# draw.text((title_x, title_y), title_text, fill=line_color, font=title_font)
|
||
|
||
|
||
# # Get participant data
|
||
# participants = get_participants(raffle_id)
|
||
# number_status = {} # { num_int: (user_name, status_color) }
|
||
|
||
# if participants:
|
||
# for p in participants:
|
||
# if not p['numbers'] or p['step'] not in ['waiting_for_payment', 'completed']:
|
||
# continue
|
||
# user_name = p['user_name'] or f"User_{p['user_id']}" # Fallback name
|
||
# status_color = "red" if p['step'] == 'waiting_for_payment' else "black" # Red=Reserved, Black=Completed
|
||
|
||
# try:
|
||
# nums = {int(n) for n in p['numbers'].split(',') if n.isdigit()}
|
||
# for num in nums:
|
||
# if 0 <= num <= 99:
|
||
# number_status[num] = (user_name, status_color)
|
||
# except ValueError:
|
||
# logger.warning(f"Skipping invalid numbers '{p['numbers']}' for user {p['user_id']} in image generation.")
|
||
# continue
|
||
|
||
# # Draw Grid and Fill Numbers
|
||
# for i in range(rows):
|
||
# for j in range(cols):
|
||
# num = i * cols + j
|
||
# x1 = j * cell_width
|
||
# y1 = i * cell_height + title_height_space
|
||
# x2 = x1 + cell_width
|
||
# y2 = y1 + cell_height
|
||
|
||
# # Draw cell rectangle
|
||
# draw.rectangle([x1, y1, x2, y2], outline=line_color)
|
||
|
||
# # Prepare text and color
|
||
# number_text = f"{num:02}"
|
||
# text_fill = "blue" # Default color for free numbers
|
||
# owner_text = ""
|
||
|
||
# if num in number_status:
|
||
# owner, status_color = number_status[num]
|
||
# text_fill = status_color
|
||
# # Truncate long usernames
|
||
# max_name_len = 12
|
||
# owner_text = owner[:max_name_len] + ('…' if len(owner) > max_name_len else '')
|
||
|
||
|
||
# # Position text within the cell
|
||
# text_x = x1 + 10 # Padding from left
|
||
# text_y_num = y1 + 5 # Padding for number line
|
||
# text_y_owner = y1 + 5 + font_size + 2 # Padding for owner line (below number)
|
||
|
||
# draw.text((text_x, text_y_num), number_text, fill=text_fill, font=font)
|
||
# if owner_text:
|
||
# draw.text((text_x, text_y_owner), owner_text, fill=text_fill, font=font)
|
||
|
||
# # Ensure data directory exists
|
||
# os.makedirs("/app/data/raffles", exist_ok=True)
|
||
|
||
# # Save the image
|
||
# image_path = f"/app/data/raffles/raffle_table_{raffle_id}.png"
|
||
# try:
|
||
# img.save(image_path)
|
||
# logger.info(f"Generated raffle table image: {image_path}")
|
||
# return True # Indicate success
|
||
# except Exception as e:
|
||
# logger.error(f"Failed to save raffle table image {image_path}: {e}")
|
||
# return False # Indicate failure
|
||
|
||
def generate_table_image(raffle_id):
    """Render the 10x10 raffle grid as a PNG with owner names and a legend.

    Each cell shows its two-digit number and, when taken, the (truncated)
    name of its holder. Cells are coloured by state: free, reserved
    (``waiting_for_payment``) or paid (``completed``).

    The image is written to
    ``/app/data/raffles/raffle_table_{raffle_id}.png``. When the raffle does
    not exist, an image containing an error message is written to
    ``/app/data/raffles/raffle_table_{raffle_id}_error.png`` instead.

    Args:
        raffle_id: Identifier forwarded to the database helpers and used in
            the output file name.

    Returns:
        True when the table image was saved; False when the raffle was not
        found or the image could not be written.
    """
    output_dir = "/app/data/raffles"

    # Layout parameters
    cols, rows = 10, 10
    cell_width, cell_height = 120, 60
    title_height_space = 90
    title_bottom_padding = 30  # extra space between title and grid
    legend_height_space = 80
    margin_x = 40  # left/right margin
    image_width = cols * cell_width + margin_x * 2
    image_height = (rows * cell_height + title_height_space
                    + title_bottom_padding + legend_height_space)
    background_color = "#fdfdfd"
    grid_line_color = "#666666"
    free_bg_color = "#e8f0ff"
    reserved_bg_color = "#ffe8e8"
    taken_bg_color = "#e8ffe8"
    font_size = 16
    title_font_size = 28
    legend_font_size = 18

    # Create the directory up front: the "raffle not found" branch below
    # also writes a file and previously ran before the directory existed.
    try:
        os.makedirs(output_dir, exist_ok=True)
    except OSError as e:
        logger.error("Cannot create image directory %s: %s", output_dir, e)
        return False

    # Create base image
    img = Image.new("RGB", (image_width, image_height), background_color)
    draw = ImageDraw.Draw(img)

    # Load fonts, falling back to PIL's built-in font when no TTF is found
    try:
        font_path = "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf"
        if not os.path.exists(font_path):
            font_path = "arial.ttf"
        font = ImageFont.truetype(font_path, font_size)
        title_font = ImageFont.truetype(font_path, title_font_size)
        legend_font = ImageFont.truetype(font_path, legend_font_size)
    except OSError:
        font = ImageFont.load_default()
        title_font = ImageFont.load_default()
        legend_font = ImageFont.load_default()

    # --- Title Bar ---
    raffle_details = get_raffle(raffle_id)
    if not raffle_details:
        draw.text((10, 10), f"Error: Sorteo {raffle_id} no encontrado", fill="red", font=title_font)
        error_path = f"{output_dir}/raffle_table_{raffle_id}_error.png"
        try:
            img.save(error_path)
        except OSError as e:
            logger.error("Failed to save raffle error image %s: %s", error_path, e)
        return False

    title_text = f"Sorteo: {raffle_details['name']}"

    # Full-width title bar with the text centred on both axes
    title_bar_color = "#4a90e2"
    draw.rectangle([0, 0, image_width, title_height_space], fill=title_bar_color)
    title_bbox = draw.textbbox((0, 0), title_text, font=title_font)
    title_width = title_bbox[2] - title_bbox[0]
    title_height = title_bbox[3] - title_bbox[1]
    title_x = (image_width - title_width) / 2
    title_y = (title_height_space - title_height) / 2
    draw.text((title_x, title_y), title_text, fill="white", font=title_font)

    # --- Participants ---
    number_status = {}  # number -> (owner name, participation step)
    for p in get_participants(raffle_id) or []:
        if not p['numbers'] or p['step'] not in ['waiting_for_payment', 'completed']:
            continue
        user_name = p['user_name'] or f"User_{p['user_id']}"
        status = p['step']
        for token in p['numbers'].split(','):
            token = token.strip()
            if not token.isdigit():
                continue
            try:
                num = int(token)
            except ValueError:
                # str.isdigit() accepts some characters int() rejects.
                continue
            if 0 <= num < rows * cols:
                number_status[num] = (user_name, status)

    # --- Grid ---
    grid_top = title_height_space + title_bottom_padding
    radius = 12
    max_name_len = 12
    for i in range(rows):
        for j in range(cols):
            num = i * cols + j
            x1 = margin_x + j * cell_width
            y1 = grid_top + i * cell_height
            x2 = x1 + cell_width
            y2 = y1 + cell_height

            # Cell colours depend on whether the number is free/reserved/paid
            if num in number_status:
                owner, status = number_status[num]
                bg_color = reserved_bg_color if status == "waiting_for_payment" else taken_bg_color
                text_color = "#000000"
            else:
                owner, bg_color, text_color = "", free_bg_color, "#1a4db3"

            # Rounded rectangle cell
            draw.rounded_rectangle([x1 + 1, y1 + 1, x2 - 1, y2 - 1], radius,
                                   outline=grid_line_color, fill=bg_color)

            # Number on the first text line
            draw.text((x1 + 10, y1 + 8), f"{num:02}", fill=text_color, font=font)

            # Owner (truncated to fit the cell) on the second text line
            if owner:
                owner_text = owner[:max_name_len] + ('…' if len(owner) > max_name_len else '')
                draw.text((x1 + 10, y1 + 8 + font_size + 4), owner_text, fill=text_color, font=font)

    # --- Legend ---
    legend_y = image_height - legend_height_space + 20
    legend_items = [
        ("Libre", free_bg_color),
        ("Reservado", reserved_bg_color),
        ("Pagado", taken_bg_color),
    ]
    spacing = 280  # horizontal distance between legend items
    start_x = (image_width - (spacing * (len(legend_items) - 1) + 140)) / 2
    box_w, box_h = 34, 34

    for i, (label, color) in enumerate(legend_items):
        box_x = start_x + i * spacing
        box_y = legend_y

        # Colour swatch followed by its label
        draw.rounded_rectangle([box_x, box_y, box_x + box_w, box_y + box_h], 6,
                               fill=color, outline=grid_line_color)
        draw.text((box_x + box_w + 14, box_y + 6), label, fill="black", font=legend_font)

    # --- Save ---
    image_path = f"{output_dir}/raffle_table_{raffle_id}.png"
    try:
        img.save(image_path)
    except OSError as e:
        # Report the failure through the boolean result, like the
        # "raffle not found" case, instead of raising.
        logger.error("Failed to save raffle table image %s: %s", image_path, e)
        return False
    return True
|
||
|
||
|
||
def escape_markdown_v2_chars_for_username(text: str) -> str:
    """Backslash-escape the Markdown control characters relevant to usernames.

    Only ``_``, ``*``, `````` ` `````` and ``[`` are escaped; every other
    character is returned unchanged. This is deliberately narrower than the
    full MarkdownV2 special-character set (``_*[]()~`>#+-=|{}.!``), because
    the goal is only to keep ``@user_name`` mentions from being parsed as
    formatting.

    Args:
        text: The username to escape. ``None`` or an empty string is
            accepted and yields ``""``, so callers can write
            ``escape_markdown_v2_chars_for_username(name) or fallback``.

    Returns:
        The escaped username.
    """
    if not text:
        # Iterating None would raise TypeError before a caller's ``or``
        # fallback could ever apply.
        return ""

    escape_chars = '_*`['
    return "".join('\\' + char if char in escape_chars else char for char in text)
|
||
|
||
def format_last_participants_list(participants_list: list) -> str:
    """Format the list of last participants for the announcement message.

    Args:
        participants_list: List of dicts with the keys ``'user_name'`` and
            ``'numbers'`` (a comma-separated string). The list is iterated
            in reverse, so the last element is rendered first.

    Returns:
        A header line followed by one line per participant that has at
        least one number, joined with newlines (no trailing newline).
        An empty string when ``participants_list`` is empty.
    """
    if not participants_list:
        return ""  # Return empty string if no other recent participants

    formatted_lines = ["Los últimos participantes en unirse (además del más reciente) han sido:"]

    # NOTE(review): the original comments state the DB query returns newest
    # first, so reversing shows the oldest of this batch first; confirm
    # against the caller.
    for p_info in reversed(participants_list):
        numbers_str = p_info.get('numbers') or ''
        num_list = [token.strip() for token in numbers_str.split(',') if token.strip()]
        if not num_list:
            continue

        # ``dict.get(key, default)`` does not cover a key that is present
        # with value None, so apply the fallback with ``or``.
        user_name = escape_markdown_v2_chars_for_username(
            p_info.get('user_name') or 'Usuario Anónimo'
        )
        if len(num_list) == 1:
            line = f" - {user_name}, con la papeleta: {num_list[0]}"
        else:
            line = f" - {user_name}, con las papeletas: {', '.join(num_list)}"
        formatted_lines.append(line)

    return "\n".join(formatted_lines)
|
||
|
||
def get_paypal_access_token():
    """Return a PayPal OAuth2 access token, reusing the stored one if present.

    When ``get_paypal_access_token_db`` returns a token it is used as-is.
    Otherwise a new token is requested with the client-credentials grant,
    stored together with its ``expires_in`` value and returned.

    Returns:
        The access token string.

    Raises:
        requests.RequestException: If the token request fails, times out or
            returns an HTTP error status.
    """
    cached_token = get_paypal_access_token_db()
    if cached_token:
        logger.info("Using cached PayPal access token")
        return cached_token

    logger.info("Fetching new PayPal access token")
    response = requests.post(
        f"{PAYPAL_URL}/v1/oauth2/token",
        headers={"Accept": "application/json", "Accept-Language": "en_US"},
        data={"grant_type": "client_credentials"},
        auth=HTTPBasicAuth(PAYPAL_CLIENT_ID, PAYPAL_SECRET),
        # requests has no default timeout; without one a stalled connection
        # would block the caller indefinitely.
        timeout=30,
    )
    response.raise_for_status()

    # Parse the body once instead of once per field.
    token_data = response.json()
    access_token = token_data["access_token"]
    store_paypal_access_token(access_token, token_data["expires_in"])
    return access_token
|
||
|
||
def create_paypal_order(access_token, value, raffle_id, numbers):
    """Create a PayPal checkout order and return its approval link.

    Args:
        access_token: Bearer token used for the ``Authorization`` header.
        value: Amount in EUR; formatted with two decimals in the request.
        raffle_id: Raffle identifier, included in the order description.
        numbers: Chosen numbers, included in the order description.

    Returns:
        A tuple ``(approval_url, order_id)``.

    Raises:
        requests.RequestException: If the request fails, times out or
            returns an HTTP error status.
        ValueError: If the response contains no link with rel ``approve``.
    """
    url = f"{PAYPAL_URL}/v2/checkout/orders"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {access_token}",
    }
    payload = {
        "intent": "CAPTURE",
        "purchase_units": [
            {
                "amount": {"currency_code": "EUR", "value": f"{value:.2f}"},
                "description": f"Donación para participar en el sorteo de HomeLabs Club (ID: {raffle_id}, Números: {numbers})",
            }
        ],
        "application_context": {
            "locale": "es-ES",
            "return_url": f"https://t.me/{BOT_NAME}",
            "cancel_url": f"https://t.me/{BOT_NAME}",
        },
    }

    # requests has no default timeout; set one so a stalled connection
    # cannot block the caller indefinitely.
    response = requests.post(url, headers=headers, json=payload, timeout=30)
    response.raise_for_status()
    order = response.json()

    # Extract the approval link. A bare next() would leak StopIteration
    # when the link is missing, which is easy to mishandle upstream.
    approval_url = next(
        (link["href"] for link in order.get("links", []) if link.get("rel") == "approve"),
        None,
    )
    if approval_url is None:
        raise ValueError(f"PayPal order {order.get('id')} has no 'approve' link")
    return approval_url, order["id"]
|
||
|
||
|
||
def get_paypal_amounts_for_invoice(invoice_id):
    """Fetch the gross, net and fee amounts of a PayPal order.

    The order is read from ``/v2/checkout/orders/{invoice_id}``; the fee and
    net amounts come from the ``seller_receivable_breakdown`` of the order's
    first capture.

    Args:
        invoice_id: The PayPal order ID to look up.

    Returns:
        A tuple ``(gross, net, fee)`` of floats:

        * ``(0.0, 0.0, 0.0)`` when the order cannot be fetched, is not in
          status ``COMPLETED``, or carries no readable amount.
        * ``(gross, gross, 0.0)`` when the order is readable but its capture
          details (and therefore the fee) cannot be obtained.
    """
    request_timeout = 30  # requests has no default timeout

    try:
        access_token = get_paypal_access_token()
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {access_token}",
        }
        response = requests.get(
            f"{PAYPAL_URL}/v2/checkout/orders/{invoice_id}",
            headers=headers,
            timeout=request_timeout,
        )
        response.raise_for_status()
        order = response.json()
    except (requests.RequestException, ValueError) as e:
        logger.error(f"Error fetching PayPal invoice {invoice_id}: {e}")
        return 0.0, 0.0, 0.0

    status = order.get("status")
    if status != "COMPLETED":
        logger.warning(f"Invoice {invoice_id} is not completed. Status: {status}")
        return 0.0, 0.0, 0.0

    try:
        purchase_unit = order["purchase_units"][0]
        gross_amount = float(purchase_unit["amount"]["value"])
    except (KeyError, IndexError, TypeError, ValueError) as e:
        logger.error(f"Unexpected PayPal order payload for invoice {invoice_id}: {e}")
        return 0.0, 0.0, 0.0

    # Without fee information the net amount equals the gross amount.
    fee_amount = 0.0
    net_amount = gross_amount

    # Fetch capture details to get fee information. A failure here must not
    # propagate: callers sum many invoices and one bad capture would
    # otherwise abort the whole summary.
    try:
        capture_id = purchase_unit["payments"]["captures"][0]["id"]
        capture_response = requests.get(
            f"{PAYPAL_URL}/v2/payments/captures/{capture_id}",
            headers=headers,
            timeout=request_timeout,
        )
        capture_response.raise_for_status()
        capture_details = capture_response.json()
    except (KeyError, IndexError, TypeError, ValueError, requests.RequestException) as e:
        logger.error(f"Error fetching PayPal capture for invoice {invoice_id}: {e}")
        return gross_amount, net_amount, fee_amount

    breakdown = capture_details.get("seller_receivable_breakdown")
    if breakdown:
        fee_amount = float(breakdown.get("paypal_fee", {}).get("value", 0.0))
        net_amount = float(breakdown.get("net_amount", {}).get("value", gross_amount))

    return gross_amount, net_amount, fee_amount