Files
wallamanta/wallamanta/helpers.py
2024-03-22 10:48:14 +01:00

379 lines
16 KiB
Python

import time
import random
import requests
import logging
import constants
import pytz
import walladb
import json
import string
from newrelic_telemetry_sdk import Event, EventClient, GaugeMetric, MetricClient
from PIL import Image, ImageDraw, ImageFont
from datetime import datetime, timedelta, timezone, date
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application
from telegram.constants import ParseMode
# Configure logging once at import time: INFO level, with timestamp, logger
# name and severity prepended to every record.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO
)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
def telegram_escape_characters(text):
    """Escape and strip characters so *text* is safe inside a Telegram caption.

    Every entry of ``constants.TELEGRAM_ESCAPE_CHARACTERS`` found in *text* is
    prefixed with a backslash, then every entry of
    ``constants.TELEGRAM_REMOVE_CHARACTERS`` is removed.

    Args:
        text: The string to sanitise. Non-string values are returned
            unchanged.

    Returns:
        The sanitised string, or *text* itself when it is not a ``str``.
    """
    # Previously a bare ``except: pass`` hid every failure. The only expected
    # one is a value without a usable ``str.replace``, so guard for it
    # explicitly and let genuine errors surface.
    if not isinstance(text, str):
        return text
    for character in constants.TELEGRAM_ESCAPE_CHARACTERS:
        text = text.replace(character, f'\\{character}')
    for character in constants.TELEGRAM_REMOVE_CHARACTERS:
        text = text.replace(character, '')
    return text
def is_user_admin(telegram_user_id):
    """Return whether *telegram_user_id* is listed in ``constants.ADMIN_IDS``."""
    admin_ids = constants.ADMIN_IDS
    return telegram_user_id in admin_ids
def get_telegram_user_id(update):
    """Return the id of the chat the *update* originated from.

    Reads ``update.effective_chat.id`` rather than ``update.message.chat_id``.
    """
    chat = update.effective_chat
    return chat.id
def get_telegram_user_name(update):
    """Return ``update.message.from_user`` (the sender object of the message)."""
    message = update.message
    return message.from_user
def get_date_ahead(add_days):
    """Return today's date shifted forward by *add_days* days."""
    return date.today() + timedelta(days=add_days)
def get_spanish_date(date):
    """Format *date* as ``DD/MM/YYYY``.

    Note that the parameter shadows the imported ``date`` class inside this
    function.
    """
    spanish_format = "%d/%m/%Y"
    return date.strftime(spanish_format)
def is_date_expired(until):
    """Return ``True`` when *until* lies strictly before today.

    *until* is subtracted from ``date.today()`` directly, so it must support
    that operation (a ``date`` does).
    """
    remaining = until - date.today()
    return remaining.days < 0
def random_wait():
    """Block for a random duration in the half-open interval [0, 1) seconds."""
    delay = random.random()
    time.sleep(delay)
def download_image(article, product_id):
    """Download the first image of *article* to the products image folder.

    The file is written to
    ``/app/data/images/products/{article['id']}_{product_id}.jpg`` only when
    the server answers with HTTP 200; any other status is logged and nothing
    is written.

    Args:
        article: Mapping providing ``id`` and ``images[0]['original']`` (the
            image URL).
        product_id: Identifier appended to the file name.

    Raises:
        requests.RequestException: On network failure or timeout.
    """
    image_url = article['images'][0]['original']
    # requests has no default timeout; without one a stalled connection would
    # block the caller forever.
    response = requests.get(image_url, timeout=30)
    if response.status_code != 200:
        logging.warning(f"Could not download image {image_url}: HTTP {response.status_code}")
        return
    with open(f"/app/data/images/products/{article['id']}_{product_id}.jpg", "wb") as image:
        image.write(response.content)
def _measure_text(draw, text, font):
    """Return the rendered ``(width, height)`` of *text* in pixels."""
    # ImageDraw.textsize was removed in Pillow 10; textbbox is the supported
    # replacement and is available in every Pillow release that also has
    # Image.Resampling (used below).
    left, top, right, bottom = draw.textbbox((0, 0), text, font=font)
    return right - left, bottom - top


def create_image(article, product_id):
    """Compose the 1280x800 notification picture for *article*.

    Downloads the product picture, then draws on a white canvas: the product
    picture on the left, the price and the ``@wallamanta_bot`` tag on the
    right, and a green frame around everything. The result is saved as
    ``/app/data/images/products/{article['id']}_{product_id}_composed.png``.

    Args:
        article: Mapping providing ``id``, ``price``, ``currency`` and the
            image URL consumed by ``download_image``.
        product_id: Identifier used in the input and output file names.
    """
    download_image(article, product_id)
    currency = '?'
    # NOTE(review): the EUR branch appends an empty string; confirm whether a
    # currency symbol was intended here.
    if article['currency'] == 'EUR':
        currency = ''
    price = str(article['price']) + currency
    wallamanta_text = "@wallamanta_bot"
    width = 1280
    height = 800
    baseheight = int(height * 0.85)
    # Maximum width allowed for the left-hand (product picture) area.
    wlimit = int(((width / 3) * 2) - 80)
    # Fonts and sizes.
    font = ImageFont.truetype("/app/data/fonts/Roboto-Bold.ttf", 90)
    wallamanta_text_font = ImageFont.truetype("/app/data/fonts/Roboto-Bold.ttf", 40)
    # Initialise the canvas.
    with Image.new('RGBA', (width, height), (255, 255, 255)) as image:
        draw = ImageDraw.Draw(image)
        # Write @wallamanta_bot, horizontally centred in the right-hand column.
        wtext, _ = _measure_text(draw, wallamanta_text, wallamanta_text_font)
        draw.text(((width / 6) * 5 - wtext / 2, int(height - height * 0.2)), wallamanta_text, "#13C1AC", font=wallamanta_text_font)
        # Write the price, centred in the right-hand column.
        wtext, htext = _measure_text(draw, price, font)
        draw.text(((width / 6) * 5 - wtext / 2, height / 2 - htext / 2), price, (0, 0, 0), font=font)
        # Draw the outer green frame with a fixed margin and stroke width.
        draw.rectangle([15, 15, width - 15, height - 15], width=15, outline="#13C1AC")
        # Place the product picture on the left, scaled to fit either the
        # target height or the width limit, whichever is reached first.
        with Image.open(f"/app/data/images/products/{article['id']}_{product_id}.jpg") as product_image:
            hpercent = baseheight / float(product_image.size[1])
            wsize = int(float(product_image.size[0]) * float(hpercent))
            if wsize < wlimit:
                resized_product_image = product_image.resize((wsize, baseheight), Image.Resampling.LANCZOS)
            else:
                wpercent = wlimit / float(product_image.size[0])
                hsize = int(float(product_image.size[1]) * float(wpercent))
                resized_product_image = product_image.resize((wlimit, hsize), Image.Resampling.LANCZOS)
            with resized_product_image:
                image.paste(
                    resized_product_image,
                    (
                        int((width / 3) - (resized_product_image.size[0] / 2)),
                        int((height / 2) - (resized_product_image.size[1] / 2)),
                    ),
                )
        # Save the composed picture under a different name.
        image.save(f"/app/data/images/products/{article['id']}_{product_id}_composed.png", quality=95)
def get_publish_date(article):
    """Return ``article['creation_date']`` as Europe/Madrid local time.

    The input is parsed with ``%Y-%m-%dT%H:%M:%S.%f%z`` and rendered as
    ``DD/MM/YYYY - HH:MM:SS``.
    """
    parsed = datetime.strptime(article['creation_date'], "%Y-%m-%dT%H:%M:%S.%f%z")
    madrid = pytz.timezone("Europe/Madrid")
    return parsed.astimezone(madrid).strftime("%d/%m/%Y - %H:%M:%S")
def get_modified_date(article):
    """Return ``article['modification_date']`` as Europe/Madrid local time.

    The input is parsed with ``%Y-%m-%dT%H:%M:%S.%f%z`` and rendered as
    ``DD/MM/YYYY - HH:MM:SS``.
    """
    parsed = datetime.strptime(article['modification_date'], "%Y-%m-%dT%H:%M:%S.%f%z")
    madrid = pytz.timezone("Europe/Madrid")
    return parsed.astimezone(madrid).strftime("%d/%m/%Y - %H:%M:%S")
def get_random_string(length):
    """Return *length* random ASCII letters (upper and lower case) as a string.

    Uses the ``random`` module, so the result is not suitable as a secret.
    """
    letters = [random.choice(string.ascii_letters) for _ in range(length)]
    return ''.join(letters)
def send_message(telegram_user_id, message):
    """Send a plain-text *message* to a chat through the Telegram HTTP API.

    The request is attempted once and, if it fails at the transport level,
    retried a single time after one second. The HTTP status of the response
    is not inspected.

    Args:
        telegram_user_id: Target chat id.
        message: Text to send.

    Raises:
        requests.RequestException: If the retry fails as well.
    """
    url = f'https://api.telegram.org/bot{constants.TELEGRAM_TOKEN}/sendMessage'
    # ``files`` with a ``None`` filename makes requests send the fields as
    # multipart/form-data.
    files = {
        'chat_id': (None, telegram_user_id),
        'text': (None, message),
    }
    try:
        # requests has no default timeout; set one so a stalled connection
        # cannot block forever.
        requests.post(url, files=files, timeout=30)
    except requests.RequestException:
        # Only transport errors are worth retrying; a bare ``except`` would
        # also have swallowed KeyboardInterrupt and programming errors.
        time.sleep(1)
        requests.post(url, files=files, timeout=30)
def send_message_to_all(message):
    """Send *message* to every user known to ``walladb``.

    A failure for one user is logged and does not stop delivery to the rest.

    Args:
        message: Text to broadcast.
    """
    # The original iterated ``walladb.get_user_list`` without calling it,
    # which raises TypeError when it is a function. Call it when callable,
    # and keep working if it is a plain collection.
    users = walladb.get_user_list
    if callable(users):
        users = users()
    for user in users:
        try:
            send_message(user['telegram_user_id'], message)
        except Exception as e:
            logging.info(f"Error sending message to {user['telegram_user_id']}: {e}")
def check_code(code):
    """Look *code* up through ``walladb.get_code`` and return the result.

    The original discarded the lookup result, so callers always received
    ``None`` and could not tell whether the code exists.

    Args:
        code: The code to look up.

    Returns:
        Whatever ``walladb.get_code(code)`` returns.
    """
    return walladb.get_code(code)
def send_article(article, product):
    """Send a found *article* to the owner of *product* as a Telegram photo.

    Builds the composed picture, formats a MarkdownV2 caption, posts it to the
    ``sendPhoto`` endpoint (retrying up to five times on a non-200 answer),
    reports the hit to New Relic and logs it.

    Args:
        article: Mapping with the article fields read below (``id``,
            ``title``, ``description``, ``location``, ``shipping``,
            ``currency``, ``price``, ``web_slug`` and the date fields).
        product: Mapping with ``id``, ``product_name`` and
            ``telegram_user_id`` of the search that matched.

    Raises:
        requests.RequestException: On network failure or timeout.
    """
    create_image(article, product['id'])
    title = f"*{telegram_escape_characters(article['title'])}*"
    description = f"*📝 Descripción*: {telegram_escape_characters(article['description'])}"
    found_by = f"*🔍 Encontrado por la búsqueda de:* {telegram_escape_characters(product['product_name'])}"
    created_at = f"*📅 Fecha de publicación:* {telegram_escape_characters(get_publish_date(article))}"
    modified_at = f"*📅 Fecha de modificación:* {telegram_escape_characters(get_modified_date(article))}"
    location = f"📍 *Lugar:* {telegram_escape_characters(article['location']['city'])} {telegram_escape_characters('(' + article['location']['postal_code'] + ')')} {telegram_escape_characters('- (' + article['location']['country_code'] + ')')}"
    if article['shipping']['user_allows_shipping']:
        user_ships = "📦 *Envío:* ✅"
    else:
        user_ships = "📦 *Envío:* ❌"
    if article['currency'] == 'EUR':
        currency = ''
    else:
        currency = '?'
    price = f"*💰 Precio*: {telegram_escape_characters(str(article['price']))} {telegram_escape_characters(currency)}"
    text = f"{title}\n\n{description}\n\n{found_by}\n\n{created_at}\n{modified_at}\n\n{location}\n\n{user_ships}\n\n{price}"
    keyboard = {'inline_keyboard': [[{'text': 'Ir al anuncio', 'url': f'https://es.wallapop.com/item/{article["web_slug"]}'}]]}

    # Read the picture into memory once. Re-posting an open file object on a
    # retry would send an empty photo, because the first attempt already
    # consumed the stream.
    photo_name = f"{article['id']}_{product['id']}_composed.png"
    with open(f"/app/data/images/products/{photo_name}", 'rb') as image:
        photo = image.read()

    url = f'https://api.telegram.org/bot{constants.TELEGRAM_TOKEN}/sendPhoto'
    files = {
        'chat_id': (None, product['telegram_user_id']),
        'photo': (photo_name, photo),
        'caption': (None, text),
        'parse_mode': (None, ParseMode.MARKDOWN_V2),
        'reply_markup': (None, json.dumps(keyboard)),
    }

    max_retries = 5
    # requests has no default timeout; set one so a stalled upload cannot
    # block the caller forever.
    response = requests.post(url, files=files, timeout=30)
    tries = 0
    while response.status_code != 200 and tries < max_retries:
        logging.info(f"Error sending to Telegram, probably flood restricted. {product['product_name']} ({product['id']}) - ({walladb.get_user(product['telegram_user_id'])}) - {response.status_code}")
        random_wait()
        response = requests.post(url, files=files, timeout=30)
        tries = tries + 1
    if response.status_code != 200:
        logging.info(f"Gave up after 5 retries. Error sending to Telegram, probably flood restricted. {product['product_name']} ({product['id']}) - ({walladb.get_user(product['telegram_user_id'])}) - {response.status_code}")

    # The hit is reported and logged even when delivery failed, as before.
    send_to_nr(article, product)
    logging.info(f"'{title}' (https://es.wallapop.com/item/{article['web_slug']}) found by {product['product_name']} ({product['id']}) - ({walladb.get_user(product['telegram_user_id'])})")
def get_category_name(category):
    """Return the display name (first element) of the given category entry.

    *category* is converted with ``int`` before the lookup in
    ``constants.CATEGORIES``.
    """
    entry = constants.CATEGORIES[int(category)]
    return entry[0]
def get_subcategory_name(subcategory):
    """Return the name of *subcategory*, or ``''`` when no category has it.

    Every category with subcategories is inspected; if several contain the
    id, the last one iterated wins.
    """
    wanted = int(subcategory)
    found = ''
    for category_id in constants.CATEGORIES:
        if not has_subcategory(category_id):
            continue
        candidate = constants.CATEGORIES[category_id][1].get(wanted)
        if candidate:
            found = candidate
    return found
def has_subcategory(category):
    """Return whether the category entry holds more than one element.

    The second element, when present, is the subcategory mapping.
    """
    entry = constants.CATEGORIES[int(category)]
    return len(entry) > 1
def is_subcategory(category, subcategory):
    """Return whether *subcategory* belongs to *category*.

    Both arguments are converted with ``int``. Categories without
    subcategories always yield ``False``.
    """
    category_id = int(category)
    subcategory_id = int(subcategory)
    if not has_subcategory(category_id):
        return False
    return subcategory_id in constants.CATEGORIES[category_id][1]
def create_categories_keyboard(categories):
    """Build an inline keyboard for the listed categories that have subcategories.

    Args:
        categories: Comma-separated category ids.

    Returns:
        Rows of at most three buttons each, followed by a final row holding
        the "Volver" button.
    """
    buttons = [
        InlineKeyboardButton(constants.CATEGORIES[int(category_id)][0], callback_data=f"{int(category_id)}")
        for category_id in categories.split(',')
        if has_subcategory(category_id)
    ]
    # Chunk the flat button list into rows of three.
    rows = [buttons[start:start + 3] for start in range(0, len(buttons), 3)]
    rows.append([InlineKeyboardButton("Volver", callback_data="finish")])
    return rows
def create_subcategory_keyboard(category):
    """Build an inline keyboard with every subcategory of *category*.

    Returns:
        Rows of at most three buttons each, followed by a final row holding
        the "Volver" button.
    """
    subcategories = constants.CATEGORIES[int(category)][1]
    buttons = [
        InlineKeyboardButton(subcategories[subcategory_id], callback_data=f"{subcategory_id}")
        for subcategory_id in subcategories
    ]
    # Chunk the flat button list into rows of three.
    rows = [buttons[start:start + 3] for start in range(0, len(buttons), 3)]
    rows.append([InlineKeyboardButton("Volver", callback_data="finish")])
    return rows
def create_category_keyboard():
    """Build an inline keyboard with one button per entry of ``constants.CATEGORIES``.

    Returns:
        Rows of at most three buttons each.
    """
    buttons = [
        InlineKeyboardButton(constants.CATEGORIES[category_id][0], callback_data=f"{category_id}")
        for category_id in constants.CATEGORIES
    ]
    # Chunk the flat button list into rows of three.
    return [buttons[start:start + 3] for start in range(0, len(buttons), 3)]
def create_products_keyboard(telegram_user_id):
    """Build an inline keyboard with one button per product of the user.

    Each button shows the product name and also uses it as callback data.

    Returns:
        Rows of at most three buttons each.
    """
    buttons = [
        InlineKeyboardButton(entry['product_name'], callback_data=f"{entry['product_name']}")
        for entry in walladb.get_products_from_user(telegram_user_id)
    ]
    # Chunk the flat button list into rows of three.
    return [buttons[start:start + 3] for start in range(0, len(buttons), 3)]
def create_continue_keyboard():
    """Build the fixed four-row keyboard shown while a search is being configured.

    Returns:
        A list of rows, each holding two ``InlineKeyboardButton`` objects.
    """
    # (label, callback_data) pairs, one tuple of pairs per keyboard row.
    layout = (
        (("Finalizar", 'add'), ("Descartar", 'remove')),
        (("Añadir categoría", 'add_category'), ("Añadir subcategoría", 'add_subcategory')),
        (("Exclusión de título", 'title_exclude'), ("Exclusión de descripción", 'title_description_exclude')),
        (("Coordenadas", 'coords'), ("Distancia", 'distance')),
    )
    return [
        [InlineKeyboardButton(label, callback_data=data) for label, data in row]
        for row in layout
    ]
def generate_categories_string(categories, subcategories):
    """Render the selected categories and subcategories as readable text.

    Args:
        categories: Comma-separated category ids.
        subcategories: Comma-separated subcategory ids.

    Returns:
        ``"todas"`` when ``'0'`` is among the categories or when *categories*
        is at most one character long. Otherwise the category names joined by
        ``", "``, each followed by its matching subcategory names in
        parentheses when there are any.
    """
    category_ids = categories.split(',')
    # Note: the length checks below apply to the raw strings, not to the
    # number of ids they contain.
    if '0' in category_ids or not len(categories) > 1:
        return "todas"

    parts = []
    for category_id in category_ids:
        part = f"{get_category_name(category_id)}"
        if has_subcategory(category_id) and len(subcategories) > 1:
            names = [
                get_subcategory_name(subcategory_id)
                for subcategory_id in subcategories.split(',')
                if is_subcategory(category_id, subcategory_id)
            ]
            if names:
                part = f"{part} ({', '.join(names)})"
        parts.append(part)
    return ", ".join(parts)
def get_thread(product_name):
    """Return the thread registered for *product_name*, or ``None``.

    Scans the module-level ``SEARCH_THREADS_LIST``, whose entries are indexed
    as ``entry[0]`` (product name) and ``entry[1]`` (returned value). The
    first match wins.
    """
    # NOTE(review): SEARCH_THREADS_LIST is not assigned anywhere in the
    # visible part of this module - confirm where it is defined.
    matches = (entry[1] for entry in SEARCH_THREADS_LIST if product_name == entry[0])
    return next(matches, None)
def send_to_nr(article, product):
    """Report a found article to New Relic as a ``ProductFound`` event.

    Any exception raised while sending is logged and swallowed.

    Args:
        article: Mapping with ``title``, ``price``, ``web_slug``,
            ``category_id`` and ``seller_id``.
        product: Mapping with ``product_name`` and ``telegram_user_id``.
    """
    user_id = product['telegram_user_id']
    attributes = {
        "article_name": article['title'],
        "article_price": article['price'],
        "article_web_slug": article['web_slug'],
        "product_name": product['product_name'],
        "telegram_user_id": user_id,
        "telegram_name": walladb.get_user(user_id),
        "category_id": article['category_id'],
        "seller_id": article['seller_id'],
        "environment": constants.NR_ENV
    }
    found_event = Event("ProductFound", attributes)
    client = EventClient(insert_key=constants.NEW_RELIC_INSERT_KEY, host=constants.NR_HOST_INSIGHTS)
    try:
        logging.info(f"=== Sending product {article['title']} info to NR ===")
        reply = client.send(found_event, timeout=30)
        reply.raise_for_status()
        logging.info(f"=== Sent product {article['title']} info to NR ===")
    except Exception as error:
        logging.error(f"Error sending to NR: {error}")
def send_statistics_to_nr(query_time, products_found, number_of_searches):
    """Send three gauge metrics describing a search cycle to New Relic.

    Any exception raised while sending is logged and swallowed.

    Args:
        query_time: Value for the ``query_time`` gauge (tagged with units
            "Seconds").
        products_found: Value for the ``products_found`` gauge.
        number_of_searches: Value for the ``active_searches`` gauge.
    """
    client = MetricClient(insert_key=constants.NEW_RELIC_INSERT_KEY, host=constants.NR_HOST_METRICS)
    metrics = [
        GaugeMetric("query_time", query_time, {"units": "Seconds"}),
        GaugeMetric("products_found", products_found),
        GaugeMetric("active_searches", number_of_searches),
    ]
    try:
        logging.info("=== Sending stats to NR ===")
        reply = client.send_batch(metrics, timeout=30)
        reply.raise_for_status()
        logging.info("=== Stats sent to NR ===")
    except Exception as error:
        logging.error(f"Error sending to NR: {error}")
def is_valid_request(product):
    """Return whether searches for *product* should still be served.

    The product must exist, and its owner must be unexpired, active, and
    either premium or testing. Checks run in that order and stop at the first
    failure.
    """
    if not walladb.get_product(product):
        return False
    owner_id = product['telegram_user_id']
    if walladb.is_user_expired(owner_id):
        return False
    if not walladb.is_user_active(owner_id):
        return False
    return bool(walladb.is_user_premium(owner_id) or walladb.is_user_testing(owner_id))