# File: telerifas/app/helpers.py
# Last modified: 2025-09-17 17:57:44 +02:00 (485 lines, 20 KiB, Python)
#
# Note: the repository viewer reports that this file contains invisible
# Unicode characters, and characters that might be confused with other
# characters; they may be processed differently by a computer than they
# appear to a human reader.
import logging
import requests
from database import * # Import all DB functions
from config import * # Import constants if needed (like BOT_TOKEN for direct API calls, although better passed)
from PIL import Image, ImageDraw, ImageFont
import os
from requests.auth import HTTPBasicAuth
logger = logging.getLogger(__name__)
def format_raffle_details(raffle_id):
    """Build the HTML-formatted detail summary of a raffle.

    Gathers the raffle row, its participants, the amount of remaining
    numbers and the PayPal amounts of every invoice of the raffle, and
    renders them as a single message string.

    Args:
        raffle_id: Raffle identifier handed to the database helpers.

    Returns:
        The formatted message, or an error string when ``get_raffle``
        returns nothing for ``raffle_id``.
    """
    raffle = get_raffle(raffle_id)
    if not raffle:
        return "Error: No se encontró el sorteo."

    # Append the ellipsis only when the text was really cut, and tolerate a
    # missing (None) description instead of failing on the slice.
    max_description_len = 100
    description = raffle['description'] or ""
    if len(description) > max_description_len:
        description = description[:max_description_len] + "..."

    # The affirmative labels were empty strings, which rendered a blank
    # value whenever the flag was set.
    international_label = 'Sí' if raffle['international_shipping'] else 'No'
    active_label = 'Sí' if raffle['active'] else 'No (Terminado)'

    parts = [
        f" <b>Detalles del sorteo</b> \n\n",
        f"<b>ID:</b> <code>{raffle['id']}</code>\n",
        f"<b>Nombre:</b> {raffle['name']}\n",
        f"<b>Descripción:</b>\n{description}\n\n",
        f"<b>Envío internacional:</b> {international_label}\n",
        f"<b>Activo:</b> {active_label}\n",
        f"<b>Donación mínima (canal principal):</b> {raffle['price']}\n",
    ]

    # Image ID (optional display)
    if raffle['image_file_id']:
        parts.append(f"<b>ID Imagen:</b> {raffle['image_file_id']} (Presente)\n")
    else:
        parts.append("<b>ID Imagen:</b> (No asignada)\n")

    # Only participations whose step is 'completed' count as confirmed.
    participants = get_participants(raffle_id) or []
    completed_count = sum(1 for p in participants if p['step'] == 'completed')
    parts.append(f"\n<b>Participantes Confirmados:</b> {completed_count}\n")

    # A negative (or missing) amount is reported as a calculation error.
    remaining_count = get_remaining_numbers_amount(raffle_id)
    if remaining_count is None or remaining_count < 0:
        remaining_label = 'Error al calcular'
    else:
        remaining_label = remaining_count
    parts.append(f"<b>Números Disponibles:</b> {remaining_label}\n")

    # Gross, fee and net totals accumulated over every invoice of the raffle.
    total_gross = 0.0
    total_fees = 0.0
    total_net = 0.0
    for inv_id in get_all_invoice_ids(raffle_id) or []:
        gross, net, fees = get_paypal_amounts_for_invoice(inv_id)
        total_gross += gross
        total_fees += fees
        total_net += net

    parts.append(f"\n<b>Total Recaudado (bruto):</b> {total_gross:.2f}\n")
    parts.append(f"<b>Total Gastos (comisiones):</b> {total_fees:.2f}\n")
    parts.append(f"<b>Total Beneficio (neto):</b> {total_net:.2f}\n")
    return "".join(parts)
def get_winners(raffle_id, winner_numbers_int):
    """Report which completed participants hold any of the winning numbers.

    Args:
        raffle_id: Raffle identifier handed to ``get_participants``.
        winner_numbers_int: Iterable of winning numbers as integers.

    Returns:
        An empty string when the raffle has no participants, a fixed
        "no winners" message when nobody matched, or otherwise one
        ``- @name acertó: <b>..</b>`` line per winner, joined by newlines.
    """
    participants = get_participants(raffle_id)
    if not participants:
        return ""  # No participants, no winners

    winning_numbers = set(winner_numbers_int)
    hits_by_user = {}  # {user_name: set of winning ints that user holds}

    for participant in participants:
        # Only completed participations can win.
        if participant['step'] != 'completed' or not participant['numbers']:
            continue

        user_id = participant['user_id']
        user_name = participant['user_name'] or f"User_{user_id}"  # Fallback name
        numbers_str = participant['numbers']

        # Strip each token first: "05, 12" must not lose " 12" to isdigit().
        tokens = (raw.strip() for raw in numbers_str.split(','))
        try:
            owned_numbers = {int(token) for token in tokens if token.isdigit()}
        except ValueError:
            # isdigit() accepts some characters int() rejects (e.g. superscripts).
            logger.warning(
                "Invalid number format for participant %s in raffle %s: %s",
                user_id, raffle_id, numbers_str,
            )
            continue  # Skip participant with bad data

        matched = winning_numbers & owned_numbers
        if matched:
            # Several rows may share a user name; merge their hits.
            hits_by_user.setdefault(user_name, set()).update(matched)

    if not hits_by_user:
        return "No hubo ganadores con esos números."

    message_lines = []
    for user_name, numbers in hits_by_user.items():
        # Sort numerically before formatting so the order stays right even
        # when the numbers do not all have the same width.
        numbers_text = ", ".join(f"{n:02}" for n in sorted(numbers))
        message_lines.append(f"- @{user_name} acertó: <b>{numbers_text}</b>")
    return "\n".join(message_lines)
# def generate_table_image(raffle_id):
# """Generates the 10x10 grid image showing number status."""
# # Define image parameters
# cols, rows = 10, 10
# cell_width, cell_height = 120, 50
# title_height_space = 70
# image_width = cols * cell_width
# image_height = rows * cell_height + title_height_space
# background_color = "white"
# line_color = "black"
# font_size = 16
# title_font_size = 24
# # Create image
# img = Image.new("RGB", (image_width, image_height), background_color)
# draw = ImageDraw.Draw(img)
# # Load fonts (handle potential errors)
# try:
# # Ensure the font file exists or provide a fallback path
# font_path = "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf" # Example Linux path
# if not os.path.exists(font_path):
# font_path = "arial.ttf" # Try common Windows font
# font = ImageFont.truetype(font_path, font_size)
# title_font = ImageFont.truetype(font_path, title_font_size)
# except IOError:
# logger.warning("Specific font not found, using default PIL font.")
# font = ImageFont.load_default()
# # Adjust size for default font if needed, default doesn't take size arg directly
# # title_font = ImageFont.truetype(font_path, title_font_size) # Need a default large font method
# title_font = ImageFont.load_default() # Revert to default for title too for simplicity
# # Draw Title
# raffle_details = get_raffle(raffle_id)
# if not raffle_details:
# logger.error(f"Cannot generate image: Raffle {raffle_id} not found.")
# # Draw error message on image
# draw.text((10, 10), f"Error: Sorteo {raffle_id} no encontrado", fill="red", font=title_font)
# img.save(f"/app/data/raffles/raffle_table_{raffle_id}_error.png")
# return False # Indicate failure
# raffle_name = raffle_details['name']
# title_text = f"Sorteo: {raffle_name}"
# # Calculate text bounding box for centering
# try:
# # Use textbbox for more accurate centering
# title_bbox = draw.textbbox((0, 0), title_text, font=title_font)
# title_width = title_bbox[2] - title_bbox[0]
# # title_height = title_bbox[3] - title_bbox[1] # Not needed for x centering
# title_x = (image_width - title_width) / 2
# title_y = 10 # Padding from top
# draw.text((title_x, title_y), title_text, fill=line_color, font=title_font)
# except AttributeError: # Handle older PIL versions that might not have textbbox
# # Fallback using textlength (less accurate)
# title_width = draw.textlength(title_text, font=title_font)
# title_x = (image_width - title_width) / 2
# title_y = 10
# draw.text((title_x, title_y), title_text, fill=line_color, font=title_font)
# # Get participant data
# participants = get_participants(raffle_id)
# number_status = {} # { num_int: (user_name, status_color) }
# if participants:
# for p in participants:
# if not p['numbers'] or p['step'] not in ['waiting_for_payment', 'completed']:
# continue
# user_name = p['user_name'] or f"User_{p['user_id']}" # Fallback name
# status_color = "red" if p['step'] == 'waiting_for_payment' else "black" # Red=Reserved, Black=Completed
# try:
# nums = {int(n) for n in p['numbers'].split(',') if n.isdigit()}
# for num in nums:
# if 0 <= num <= 99:
# number_status[num] = (user_name, status_color)
# except ValueError:
# logger.warning(f"Skipping invalid numbers '{p['numbers']}' for user {p['user_id']} in image generation.")
# continue
# # Draw Grid and Fill Numbers
# for i in range(rows):
# for j in range(cols):
# num = i * cols + j
# x1 = j * cell_width
# y1 = i * cell_height + title_height_space
# x2 = x1 + cell_width
# y2 = y1 + cell_height
# # Draw cell rectangle
# draw.rectangle([x1, y1, x2, y2], outline=line_color)
# # Prepare text and color
# number_text = f"{num:02}"
# text_fill = "blue" # Default color for free numbers
# owner_text = ""
# if num in number_status:
# owner, status_color = number_status[num]
# text_fill = status_color
# # Truncate long usernames
# max_name_len = 12
# owner_text = owner[:max_name_len] + ('…' if len(owner) > max_name_len else '')
# # Position text within the cell
# text_x = x1 + 10 # Padding from left
# text_y_num = y1 + 5 # Padding for number line
# text_y_owner = y1 + 5 + font_size + 2 # Padding for owner line (below number)
# draw.text((text_x, text_y_num), number_text, fill=text_fill, font=font)
# if owner_text:
# draw.text((text_x, text_y_owner), owner_text, fill=text_fill, font=font)
# # Ensure data directory exists
# os.makedirs("/app/data/raffles", exist_ok=True)
# # Save the image
# image_path = f"/app/data/raffles/raffle_table_{raffle_id}.png"
# try:
# img.save(image_path)
# logger.info(f"Generated raffle table image: {image_path}")
# return True # Indicate success
# except Exception as e:
# logger.error(f"Failed to save raffle table image {image_path}: {e}")
# return False # Indicate failure
def generate_table_image(raffle_id):
    """Render the 10x10 raffle grid as a PNG with owner names and a legend.

    Each cell shows its two-digit number and, when taken, the (truncated)
    name of its holder. Cells are coloured by state: free, reserved
    (``waiting_for_payment``) or assigned (``completed``).

    The image is written to ``/app/data/raffles/raffle_table_<id>.png``.
    When the raffle does not exist, an image carrying an error message is
    written to ``raffle_table_<id>_error.png`` instead.

    Args:
        raffle_id: Raffle identifier handed to the database helpers.

    Returns:
        True when the table image was saved; False when the raffle was not
        found or the image could not be written.
    """
    # Layout parameters
    cols, rows = 10, 10
    cell_width, cell_height = 120, 60
    title_height_space = 90
    title_bottom_padding = 30  # extra space between title and grid
    legend_height_space = 80
    margin_x = 40  # left/right margin
    image_width = cols * cell_width + margin_x * 2
    image_height = (rows * cell_height + title_height_space
                    + title_bottom_padding + legend_height_space)

    # Colours
    background_color = "#fdfdfd"
    grid_line_color = "#666666"
    free_bg_color = "#e8f0ff"
    reserved_bg_color = "#ffe8e8"
    taken_bg_color = "#e8ffe8"

    font_size = 16
    title_font_size = 28
    legend_font_size = 18

    output_dir = "/app/data/raffles"

    # Create base image
    img = Image.new("RGB", (image_width, image_height), background_color)
    draw = ImageDraw.Draw(img)

    # Load fonts, falling back to PIL's built-in font when none is found.
    try:
        font_path = "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf"
        if not os.path.exists(font_path):
            font_path = "arial.ttf"
        font = ImageFont.truetype(font_path, font_size)
        title_font = ImageFont.truetype(font_path, title_font_size)
        legend_font = ImageFont.truetype(font_path, legend_font_size)
    except OSError:
        font = ImageFont.load_default()
        title_font = ImageFont.load_default()
        legend_font = ImageFont.load_default()

    # --- Title Bar ---
    raffle_details = get_raffle(raffle_id)
    if not raffle_details:
        draw.text((10, 10), f"Error: Sorteo {raffle_id} no encontrado", fill="red", font=title_font)
        error_path = f"{output_dir}/raffle_table_{raffle_id}_error.png"
        try:
            # The directory must exist before saving on this path as well.
            os.makedirs(output_dir, exist_ok=True)
            img.save(error_path)
        except OSError as e:
            logger.error("Failed to save raffle error image %s: %s", error_path, e)
        return False

    title_text = f"Sorteo: {raffle_details['name']}"

    # Full-width title bar with the text centred inside it.
    title_bar_color = "#4a90e2"
    draw.rectangle([0, 0, image_width, title_height_space], fill=title_bar_color)
    title_bbox = draw.textbbox((0, 0), title_text, font=title_font)
    title_width = title_bbox[2] - title_bbox[0]
    title_height = title_bbox[3] - title_bbox[1]
    title_x = (image_width - title_width) / 2
    title_y = (title_height_space - title_height) / 2
    draw.text((title_x, title_y), title_text, fill="white", font=title_font)

    # --- Participants ---
    # number_status maps a number to (owner name, participation step).
    number_status = {}
    for p in get_participants(raffle_id) or []:
        if not p['numbers'] or p['step'] not in ['waiting_for_payment', 'completed']:
            continue
        user_name = p['user_name'] or f"User_{p['user_id']}"
        status = p['step']
        for raw_token in p['numbers'].split(','):
            token = raw_token.strip()  # tolerate "05, 12"
            if not token.isdigit():
                continue
            try:
                num = int(token)
            except ValueError:
                # isdigit() accepts some characters int() rejects.
                logger.warning(
                    "Skipping invalid number %r for user %s in image generation.",
                    token, p['user_id'],
                )
                continue
            if 0 <= num <= 99:
                number_status[num] = (user_name, status)

    # --- Grid ---
    grid_top = title_height_space + title_bottom_padding
    radius = 12
    max_name_len = 12
    for num in range(rows * cols):
        i, j = divmod(num, cols)
        x1 = margin_x + j * cell_width
        y1 = grid_top + i * cell_height
        x2 = x1 + cell_width
        y2 = y1 + cell_height

        # Background and text colour depend on the state of the number.
        if num in number_status:
            owner, status = number_status[num]
            bg_color = reserved_bg_color if status == "waiting_for_payment" else taken_bg_color
            text_color = "#000000"
        else:
            owner, bg_color, text_color = "", free_bg_color, "#1a4db3"

        # Rounded rectangle cell
        draw.rounded_rectangle([x1 + 1, y1 + 1, x2 - 1, y2 - 1], radius,
                               outline=grid_line_color, fill=bg_color)

        # Number on the first line of the cell
        draw.text((x1 + 10, y1 + 8), f"{num:02}", fill=text_color, font=font)

        # Owner on the second line; mark truncated names with an ellipsis.
        if owner:
            owner_text = owner[:max_name_len] + ('…' if len(owner) > max_name_len else '')
            draw.text((x1 + 10, y1 + 8 + font_size + 4), owner_text, fill=text_color, font=font)

    # --- Legend ---
    legend_y = image_height - legend_height_space + 20
    legend_items = [
        ("Libre", free_bg_color),
        ("Reservado", reserved_bg_color),
        ("Asignado", taken_bg_color),
    ]
    spacing = 280  # horizontal distance between legend items
    start_x = (image_width - (spacing * (len(legend_items) - 1) + 140)) / 2
    box_w, box_h = 34, 34
    for index, (label, color) in enumerate(legend_items):
        box_x = start_x + index * spacing
        box_y = legend_y
        # Colour box
        draw.rounded_rectangle([box_x, box_y, box_x + box_w, box_y + box_h], 6,
                               fill=color, outline=grid_line_color)
        # Label text
        draw.text((box_x + box_w + 14, box_y + 6), label, fill="black", font=legend_font)

    # --- Save ---
    image_path = f"{output_dir}/raffle_table_{raffle_id}.png"
    try:
        os.makedirs(output_dir, exist_ok=True)
        img.save(image_path)
    except OSError as e:
        # Report the failure through the return value, like the not-found case.
        logger.error("Failed to save raffle table image %s: %s", image_path, e)
        return False
    return True
def format_last_participants_list(participants_list: list) -> str:
    """Format the list of last participants for the announcement message.

    The input is expected newest-first, so it is reversed here to list the
    oldest participant of the batch first.

    Args:
        participants_list: List of dicts with the keys ``'user_name'`` and
            ``'numbers'`` (a comma-separated string).

    Returns:
        A header line followed by one line per participant that has
        numbers, joined by newlines (no trailing newline). An empty string
        is returned when there is nothing to list.
    """
    if not participants_list:
        return ""  # Return empty string if no other recent participants

    entry_lines = []
    for p_info in reversed(participants_list):  # Display oldest of this batch first
        numbers_str = p_info.get('numbers') or ''
        # Strip each token and drop empty ones so "07, 12" renders cleanly.
        num_list = [token.strip() for token in numbers_str.split(',') if token.strip()]
        if not num_list:
            continue

        # `or` also covers a key that is present but holds None/"".
        user_name = p_info.get('user_name') or 'Usuario Anónimo'
        if len(num_list) == 1:
            entry_lines.append(f" - {user_name}, con la papeleta: {num_list[0]}")
        else:
            entry_lines.append(f" - {user_name}, con las papeletas: {', '.join(num_list)}")

    if not entry_lines:
        # Do not emit a header that announces participants and lists none.
        return ""

    header = "Los últimos participantes en unirse (además del más reciente) han sido:"
    return "\n".join([header, *entry_lines])
def get_paypal_access_token():
    """Return a PayPal OAuth2 access token, reusing the stored one if any.

    When ``get_paypal_access_token_db`` returns a token it is used as is.
    Otherwise a new token is requested with the client-credentials grant,
    stored together with its ``expires_in`` value, and returned.

    Returns:
        The access token string.

    Raises:
        requests.RequestException: If the token request fails, times out or
            returns an HTTP error status.
    """
    cached_token = get_paypal_access_token_db()
    if cached_token:
        logger.info("Using cached PayPal access token")
        return cached_token

    logger.info("Fetching new PayPal access token")
    response = requests.post(
        f"{PAYPAL_URL}/v1/oauth2/token",
        headers={"Accept": "application/json", "Accept-Language": "en_US"},
        data={"grant_type": "client_credentials"},
        auth=HTTPBasicAuth(PAYPAL_CLIENT_ID, PAYPAL_SECRET),
        # Without a timeout requests may wait forever on a stalled connection.
        timeout=15,
    )
    response.raise_for_status()

    # Parse the body once instead of once per field.
    token_data = response.json()
    access_token = token_data["access_token"]
    store_paypal_access_token(access_token, token_data["expires_in"])
    return access_token
def create_paypal_order(access_token, value, raffle_id, numbers):
    """Create a PayPal checkout order and return where the payer approves it.

    Args:
        access_token: Bearer token used to authorize the request.
        value: Amount in EUR; formatted with two decimals in the payload.
        raffle_id: Raffle identifier, included in the order description.
        numbers: Chosen numbers, included in the order description.

    Returns:
        A tuple ``(approval_url, order_id)``.

    Raises:
        requests.RequestException: If the request fails, times out or
            returns an HTTP error status.
        RuntimeError: If the response carries no link with rel "approve".
    """
    url = f"{PAYPAL_URL}/v2/checkout/orders"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {access_token}",
    }
    payload = {
        "intent": "CAPTURE",
        "purchase_units": [
            {
                "amount": {"currency_code": "EUR", "value": f"{value:.2f}"},
                "description": f"Donación para participar en el sorteo de HomeLabs Club (ID: {raffle_id}, Números: {numbers})",
            }
        ],
        "application_context": {
            "locale": "es-ES",
            "return_url": f"https://t.me/{BOT_NAME}",
            "cancel_url": f"https://t.me/{BOT_NAME}",
        },
    }

    # Without a timeout requests may wait forever on a stalled connection.
    response = requests.post(url, headers=headers, json=payload, timeout=15)
    response.raise_for_status()
    order = response.json()

    # Extract the approval link; a bare next() would leak StopIteration
    # when the link is missing.
    approval_url = next(
        (link["href"] for link in order.get("links", []) if link.get("rel") == "approve"),
        None,
    )
    if approval_url is None:
        raise RuntimeError(
            f"PayPal order {order.get('id')} has no approval link in its response"
        )
    return approval_url, order["id"]
def get_paypal_amounts_for_invoice(invoice_id):
    """Fetch the gross, net and fee amounts for a given PayPal invoice ID.

    The order is looked up first; when it is COMPLETED, its first capture is
    fetched to read the fee and net amount from the
    ``seller_receivable_breakdown`` section.

    Args:
        invoice_id: Identifier used in the ``/v2/checkout/orders/<id>`` URL.

    Returns:
        A tuple ``(gross, net, fees)`` of floats.
        ``(0.0, 0.0, 0.0)`` is returned when the order cannot be fetched, is
        not COMPLETED, or has no readable amount. When the order is fine but
        the capture details are unavailable, the gross amount is returned
        with ``net == gross`` and ``fees == 0.0``.

    Raises:
        requests.RequestException: Only from ``get_paypal_access_token``;
            failures of the order and capture requests are logged instead.
    """
    no_amounts = (0.0, 0.0, 0.0)
    request_timeout = 15  # seconds; requests waits forever without one

    access_token = get_paypal_access_token()
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {access_token}",
    }

    url = f"{PAYPAL_URL}/v2/checkout/orders/{invoice_id}"
    try:
        response = requests.get(url, headers=headers, timeout=request_timeout)
        response.raise_for_status()
        order = response.json()
    except (requests.RequestException, ValueError) as e:
        logger.error(f"Error fetching PayPal invoice {invoice_id}: {e}")
        return no_amounts

    status = order.get("status")
    if status != "COMPLETED":
        logger.warning(f"Invoice {invoice_id} is not completed. Status: {status}")
        return no_amounts

    try:
        purchase_unit = order["purchase_units"][0]
        gross_amount = float(purchase_unit["amount"]["value"])
    except (KeyError, IndexError, TypeError, ValueError) as e:
        logger.error(f"Invoice {invoice_id} has no readable amount: {e!r}")
        return no_amounts

    # Defaults used whenever the fee breakdown cannot be obtained.
    fee_amount = 0.0
    net_amount = gross_amount

    # Fetch capture details to get fee information
    try:
        capture_id = purchase_unit["payments"]["captures"][0]["id"]
    except (KeyError, IndexError, TypeError):
        logger.warning(f"Invoice {invoice_id} has no capture; fees are unknown.")
        return gross_amount, net_amount, fee_amount

    capture_url = f"{PAYPAL_URL}/v2/payments/captures/{capture_id}"
    try:
        capture_response = requests.get(capture_url, headers=headers, timeout=request_timeout)
        capture_response.raise_for_status()
        capture_details = capture_response.json()
    except (requests.RequestException, ValueError) as e:
        # Handle this request like the order request above instead of
        # letting a transient failure propagate to the caller.
        logger.error(f"Error fetching PayPal capture {capture_id} for invoice {invoice_id}: {e}")
        return gross_amount, net_amount, fee_amount

    breakdown = capture_details.get("seller_receivable_breakdown")
    if breakdown:
        fee_amount = float(breakdown.get("paypal_fee", {}).get("value", 0.0))
        net_amount = float(breakdown.get("net_amount", {}).get("value", gross_amount))

    return gross_amount, net_amount, fee_amount