Files
wallamanta/wallamanta/helpers.py
2023-07-04 22:10:00 +02:00

325 lines
14 KiB
Python

import time
import random
import requests
import logging
import constants
import pytz
import walladb
from newrelic_telemetry_sdk import Event, EventClient
from PIL import Image, ImageDraw, ImageFont
from datetime import datetime, timedelta, timezone, date
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application
from telegram.constants import ParseMode
# Enable logging
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO
)
logger = logging.getLogger(__name__)
def telegram_escape_characters(text):
for character in constants.TELEGRAM_ESCAPE_CHARACTERS:
try:
text = text.replace(character, f'\\{character}')
except:
pass
for character in constants.TELEGRAM_REMOVE_CHARACTERS:
try:
text = text.replace(character, '')
except:
pass
return text
def is_user_admin(telegram_user_id):
return telegram_user_id in constants.ADMIN_IDS
def get_telegram_user_id(update):
#return update.message.chat_id
return update.effective_chat.id
def get_telegram_user_name(update):
return update.message.from_user
def get_date_ahead(add_days):
date_ahead = date.today() + timedelta(days=add_days)
return date_ahead
def get_spanish_date(date):
return date.strftime("%d/%m/%Y")
def is_date_expired(until):
#until_date = datetime.strptime(until, "%d/%m/%Y")
difference = until - date.today()
return difference.days < 0
def random_wait():
time.sleep(random.random())
def download_image(article):
r = requests.get(article['images'][0]['original'])
if r.status_code == 200:
image = open(f"/app/data/images/products/{article['id']}.jpg", "wb")
image.write(r.content)
image.close()
def create_image(article):
download_image(article)
currency = '?'
if article['currency'] == 'EUR':
currency = ''
price = str(article['price']) + currency
wallamanta_text = "@wallamanta_bot"
width = 1280
height = 800
baseheight = int(height * 0.85)
# límite de ancho para la parte izquierda (producto)
wlimit = int(((width / 3) * 2) - 80)
# límite de ancho para los logos (homelabers y amazon)
wlogo = int(width * 0.2)
# fuente y tamaño
font = ImageFont.truetype("/app/data/fonts/Roboto-Bold.ttf", 90)
wallamanta_text_font = ImageFont.truetype("/app/data/fonts/Roboto-Bold.ttf", 40)
# inicializamos canvas
image = Image.new('RGBA', (width, height), (255, 255, 255))
# logo homelabers, redimensionamos y ponemos en la parte derecha arriba
#logo_image = Image.open("/app/data/images/logo.png").convert("RGBA")
#hlogo = int((float(logo_image.size[1]) * float(lpercent)))
#lpercent = wlogo / float(logo_image.size[0])
#logo_image = logo_image.resize((wlogo, hlogo), Image.Resampling.LANCZOS)
#image.paste(logo_image, (int((width / 6) * 5 - logo_image.size[0] / 2), int(height * 0.1)), logo_image)
# logo wallamanta, redimensionamos y ponemos en la parte derecha abajo
#wallamanta_logo = Image.open("/app/data/images/wallamanta_logo.png").convert("RGBA")
#lpercent = wlogo / float(wallamanta_logo.size[0])
#hlogo = int((float(wallamanta_logo.size[1]) * float(lpercent)))
#wallamanta_logo = wallamanta_logo.resize((wlogo, hlogo), Image.Resampling.LANCZOS)
#image.paste(wallamanta_logo, (int((width / 6) * 5 - wallamanta_logo.size[0] / 2), int(height - height * 0.2)), wallamanta_logo)
draw = ImageDraw.Draw(image)
# escribimos @wallamanta_bot
wtext, htext = draw.textsize(wallamanta_text, font=wallamanta_text_font)
draw.text(((width / 6) * 5 - wtext / 2, int(height - height * 0.2)), wallamanta_text, "#13C1AC", font=wallamanta_text_font)
# escribimos el precio
wtext, htext = draw.textsize(price, font=font)
draw.text(((width / 6) * 5 - wtext / 2, height / 2 - htext / 2), price, (0, 0, 0), font=font)
# dibujamos rectángulo verde externo, con un margen externo y ancho determinado
draw.rectangle([15, 15, width - 15, height - 15], width = 15, outline="#13C1AC")
# ponemos la imagen del producto en la parte izquierda y se redimensiona dependiendo de lo ancho
product_image = Image.open(f"/app/data/images/products/{article['id']}.jpg")
hpercent = (baseheight / float(product_image.size[1]))
wsize = int((float(product_image.size[0]) * float(hpercent)))
if wsize < wlimit:
product_image = product_image.resize((wsize, baseheight), Image.Resampling.LANCZOS)
else:
wpercent = wlimit / float(product_image.size[0])
hsize = int((float(product_image.size[1]) * float(wpercent)))
product_image = product_image.resize((wlimit, hsize), Image.Resampling.LANCZOS)
image.paste(product_image, (int((width/3)-(product_image.size[0]/2)), int((height/2) - (product_image.size[1]/2))))
# guardamos la imagen con otro nombre
image.save(f"/app/data/images/products/{article['id']}_composed.png", quality=95)
def get_publish_date(article):
article_date = article['creation_date']
return datetime.fromtimestamp(int(int(article_date)/1000)).astimezone(pytz.timezone("Europe/Madrid")).strftime("%d/%m/%Y - %H:%M:%S")
def get_modified_date(article):
article_date = article['modification_date']
return datetime.fromtimestamp(int(int(article_date)/1000)).astimezone(pytz.timezone("Europe/Madrid")).strftime("%d/%m/%Y - %H:%M:%S")
async def send_message(telegram_user_id, message):
application = Application.builder().get_updates_http_version('1.1').http_version('1.1').token(constants.TELEGRAM_TOKEN).build()
await application.bot.send_message(chat_id=telegram_user_id, text=message)
async def send_article(article, product):
application = Application.builder().get_updates_http_version('1.1').http_version('1.1').token(constants.TELEGRAM_TOKEN).build()
create_image(article)
title = f"*{telegram_escape_characters(article['title'])}*"
description = f"*📝 Descripción*: {telegram_escape_characters(article['description'])}"
found_by = f"*🔍 Encontrado por la búsqueda de:* {telegram_escape_characters(product['product_name'])}"
created_at = f"*📅 Fecha de publicación:* {telegram_escape_characters(get_publish_date(article))}"
modified_at = f"*📅 Fecha de modificación:* {telegram_escape_characters(get_modified_date(article))}"
location = f"📍 *Lugar:* {telegram_escape_characters(article['location']['city'])} {telegram_escape_characters('(' + article['location']['postal_code'] + ')')} {telegram_escape_characters('- (' + article['location']['country_code'] + ')')}"
if article['shipping']['user_allows_shipping']:
user_ships = f"📦 *Envío:* ✅"
else:
user_ships = f"📦 *Envío:* ❌"
if article['currency'] == 'EUR':
currency = ''
else:
currency = '?'
price = f"*💰 Precio*: {telegram_escape_characters(str(article['price']))} {telegram_escape_characters(currency)}"
text = f"{title}\n\n{description}\n\n{found_by}\n\n{created_at}\n{modified_at}\n\n{location}\n\n{user_ships}\n\n{price}"
#url = f"https://api.telegram.org/bot{constants.TELEGRAM_TOKEN}/sendPhoto?chat_id={product['telegram_user_id']}&caption={text}&parse_mode=MarkdownV2"
#files = {'photo':open(f"/app/data/images/products/{article['id']}_composed.png", 'rb')}
keyboard = [[InlineKeyboardButton("Ir al anuncio", url=f"https://es.wallapop.com/item/{article['web_slug']}")]]
#InlineKeyboardButton("Listar productos", callback_data="list")]
markup = InlineKeyboardMarkup(keyboard)
response = await application.bot.send_photo(chat_id=product['telegram_user_id'], photo=open(f"/app/data/images/products/{article['id']}_composed.png", 'rb'), caption=text, parse_mode=ParseMode.MARKDOWN_V2, reply_markup=markup)
#logging.info(requests.post(url, files=files).content)
send_to_nr(article, product)
logging.info(response)
def get_category_name(category):
category = int(category)
return constants.CATEGORIES[category][0]
def get_subcategory_name(subcategory):
subcategory = int(subcategory)
subcategory_name = ''
for category in constants.CATEGORIES:
if has_subcategory(category):
if constants.CATEGORIES[category][1].get(subcategory):
subcategory_name = constants.CATEGORIES[category][1].get(subcategory)
return subcategory_name
def has_subcategory(category):
category = int(category)
return len(constants.CATEGORIES[category]) > 1
def is_subcategory(category, subcategory):
category = int(category)
subcategory = int(subcategory)
if has_subcategory(category):
return subcategory in constants.CATEGORIES[category][1]
else:
return False
def create_categories_keyboard(categories):
categories = categories.split(',')
count = 0
category_line = []
keyboard = []
for category in categories:
if has_subcategory(category):
category_line.append(InlineKeyboardButton(constants.CATEGORIES[int(category)][0], callback_data=f"{int(category)}"))
count = count + 1
if count % 3 == 0:
keyboard.append(category_line)
category_line = []
if count % 3 != 0:
keyboard.append(category_line)
keyboard.append([InlineKeyboardButton("Volver", callback_data="finish")])
return keyboard
def create_subcategory_keyboard(category):
category = int(category)
count = 0
subcategory_line = []
keyboard = []
for subcategory in constants.CATEGORIES[category][1]:
subcategory_line.append(InlineKeyboardButton(constants.CATEGORIES[category][1][subcategory], callback_data=f"{subcategory}"))
count = count + 1
if count % 3 == 0:
keyboard.append(subcategory_line)
subcategory_line = []
if count % 3 != 0:
keyboard.append(subcategory_line)
keyboard.append([InlineKeyboardButton("Volver", callback_data="finish")])
return keyboard
def create_category_keyboard():
count = 0
category_line = []
keyboard = []
for category in constants.CATEGORIES:
category_line.append(InlineKeyboardButton(constants.CATEGORIES[category][0], callback_data=f"{category}"))
count = count + 1
if count % 3 == 0:
keyboard.append(category_line)
category_line = []
if count % 3 != 0:
keyboard.append(category_line)
return keyboard
def create_products_keyboard(telegram_user_id):
count = 0
product_line = []
keyboard = []
for product in walladb.get_products_from_user(telegram_user_id):
product_line.append(InlineKeyboardButton(product['product_name'], callback_data=f"{product['product_name']}"))
count = count + 1
if count % 3 == 0:
keyboard.append(product_line)
product_line = []
if count % 3 != 0:
keyboard.append(product_line)
return keyboard
def create_continue_keyboard():
keyboard = [
[
InlineKeyboardButton("Finalizar", callback_data='add'),
InlineKeyboardButton("Descartar", callback_data='remove')
],
[
InlineKeyboardButton("Añadir categoría", callback_data='add_category'),
InlineKeyboardButton("Añadir subcategoría", callback_data='add_subcategory')
],[
InlineKeyboardButton("Exclusión de título", callback_data='title_exclude'),
InlineKeyboardButton("Exclusión de descripción", callback_data='title_description_exclude')
],[
InlineKeyboardButton("Coordenadas", callback_data='coords'),
InlineKeyboardButton("Distancia", callback_data='distance')
],
]
return keyboard
def generate_categories_string(categories, subcategories):
categories_string = ""
if '0' in categories.split(','):
categories_string = "todas"
elif len(categories) > 1:
for category in categories.split(','):
categories_string = f"{categories_string}{get_category_name(category)}"
if has_subcategory(category):
if len(subcategories) > 1:
subcategories_temp = ""
for subcategory in subcategories.split(','):
if is_subcategory(category, subcategory):
subcategories_temp = f"{subcategories_temp}{get_subcategory_name(subcategory)}, "
if subcategories_temp != "":
categories_string = f"{categories_string} ({subcategories_temp[:-2]})"
categories_string = f"{categories_string}, "
categories_string = categories_string[:-2]
else:
categories_string = "todas"
return categories_string
def get_thread(product_name):
global SEARCH_THREADS_LIST
for product_thread in SEARCH_THREADS_LIST:
if product_name == product_thread[0]:
return product_thread[1]
return None
def send_to_nr(article, product):
event = Event(
"ProductFound", {
"article_name": article['title'],
"article_price": article['price'],
"article_web_slug": article['web_slug'],
"product_name": product['product_name'],
"telegram_user_id": product['telegram_user_id'],
"telegram_name": walladb.get_user(product['telegram_user_id']),
"category_id": article['category_id'],
"seller_id": article['seller_id'],
"environment": constants.NR_ENV
}
)
event_client = EventClient(insert_key=constants.NEW_RELIC_INSERT_KEY, host="insights-collector.eu01.nr-data.net")
try:
response = event_client.send(event)
except Exception as e:
logging.error(f"Error sending to NR: {e}")
response.raise_for_status()
def is_valid_request(product):
is_valid = False
if walladb.get_product(product):
if not walladb.is_user_expired(product['telegram_user_id']):
if walladb.is_user_valid(product['telegram_user_id']):
if walladb.is_user_premium(product['telegram_user_id']) or \
walladb.is_user_testing(product['telegram_user_id']):
is_valid = True
return is_valid