Files
wallamanta/wallamanta/wallamanta.py

218 lines
12 KiB
Python

import json
import os
import threading
import logging
import prettytable
import helpers
import walladb
import constants
from worker import Worker
from telegram import Update
from telegram.ext import Application, CommandHandler, ContextTypes
# Enable logging
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO
)
logger = logging.getLogger(__name__)
def parse_json_file():
f = open("/app/data/products.json")
return json.load(f)
def save_json_file(products):
with open('/app/data/products.json', 'w') as outfile:
json.dump(products, outfile, indent=2)
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if walladb.is_user_valid(helpers.get_telegram_user_id(update)):
message = """Añade un producto con `/add producto;precio_mínimo;precio_máximo,categoría,excluir_título(opcional, separado por comas);
excluir_descripción_y_título(opciona, separado por comas);latitud(opcional);longitud(opcional),distancia(opcional)`\n
Ejemplo: `/add placa base itx;0;150`\n
Ejemplo 2: `/add cpu;10;30;;intel,core 2 duo,celeron;;;100`\n
Los campos opcionales que se dejen vacíos tomarán el valor configurado en el archivo `.env`\n
Lista los productos con `/list` o obtén la información de uno en concreto con `/list nombre del producto`\n
Borra un producto con `/remove nombre del producto`\n
`/status` muestra tu tipo de membresía y fecha de caducidad"""
else:
message = """Activa tu periodo de prueba de 7 días con `/test` o contacta con @jocarduck para más información."""
await update.message.reply_markdown_v2(helpers.telegram_escape_characters(message))
async def categories_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if walladb.is_user_valid(helpers.get_telegram_user_id(update)):
message = ''
for category in constants.CATEGORIES:
message = f"{message}, {category}"
await update.message.reply_markdown_v2(f"```{helpers.telegram_escape_characters(message)}```")
async def add_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
telegram_user_id = helpers.get_telegram_user_id(update)
if walladb.is_user_valid(telegram_user_id):
number_of_products = walladb.count_user_products(telegram_user_id)
message = """Tienes que pasar el número correcto de parámetros: `/add producto;precio_mínimo;precio_máximo,categoría,excluir_título(opcional, separado por comas);excluir_descripción_y_título(opciona, separado por comas);latitud(opcional);longitud(opcional),distancia(opcional)`\n
Ejemplo: `/add placa base itx;0;150`\n
Ejemplo 2: `/add cpu;10;30;;intel,core 2 duo,celeron;;;100`\n
Los campos opcionales que se dejen vacíos tomarán el valor configurado en el archivo `.env`"""
valid = False
if walladb.is_user_testing(telegram_user_id):
valid = True
if number_of_products >= 5:
message = "Ya tienes 5 productos en seguimiento. Con premium puedes tener hasta 20."
valid = False
elif walladb.is_user_premium(telegram_user_id):
valid = True
if number_of_products >= 20:
message = "Ya tienes 20 productos en seguimiento. Borra algunos para añadir más."
valid = False
if valid:
args = update.message.text.split("/add ")
if len(args) == 1:
pass
elif len(args[1].split(";")) > 2:
product = dict()
product['telegram_user_id'] = telegram_user_id
args = args[1].split(";")
product['product_name'], product['min_price'], product['max_price'] = args[0:3]
if len(args) > 3 and args[3]:
product['category'] = helpers.get_category_id(args[3])
if len(args) > 4 and args[4]:
product['title_exclude'] = args[4]
if len(args) > 5 and args[5]:
product['title_description_exclude'] = args[5]
if len(args) > 6 and args[6]:
product['latitude'] = args[6]
if len(args) > 7 and args[7]:
product['longitude'] = args[7]
if len(args) > 8 and args[8]:
product['distance'] = args[8]
logging.info(f'Adding: {product}')
if not walladb.get_product(product):
walladb.add_product(product)
p = threading.Thread(target=Worker.run, args=(walladb.get_product(product), ))
p.start()
message = f"Añadido {walladb.get_product(product)['product_name']} a seguimiento."
else:
message = f"¡{walladb.get_product(product)['product_name']} ya está en seguimiento!"
await update.message.reply_markdown_v2(helpers.telegram_escape_characters(message))
async def remove_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
telegram_user_id = helpers.get_telegram_user_id(update)
if walladb.is_user_valid(telegram_user_id):
if walladb.is_user_testing(telegram_user_id) or walladb.is_user_premium(telegram_user_id):
product_to_remove = update.message.text[len('/remove '):]
message = f"¡{product_to_remove} no está en seguimiento!"
if walladb.remove_product({'product_name' : product_to_remove, \
'telegram_user_id' : telegram_user_id}):
message = f"¡{product_to_remove} borrado de la lista de seguimiento!"
await update.message.reply_text(message)
async def list_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
telegram_user_id = helpers.get_telegram_user_id(update)
if walladb.is_user_valid(telegram_user_id):
if walladb.is_user_testing(telegram_user_id) or walladb.is_user_premium(telegram_user_id):
products = walladb.get_products_from_user(telegram_user_id)
args = update.message.text.split("/list ")
found = False
if len(args) > 1:
product = walladb.get_product({'product_name':args[1],'telegram_user_id':telegram_user_id})
if product:
table = prettytable.PrettyTable(['Campo', 'Valor'])
table.align['Campo'] = 'l'
table.align['Valor'] = 'r'
for key in product:
table.add_row([key, product[key]])
found = True
if not found:
table = prettytable.PrettyTable(['Producto', 'Mín', 'Máx'])
table.align['Producto'] = 'l'
table.align['Mín'] = 'r'
table.align['Máx'] = 'r'
for product in products:
table.add_row([helpers.telegram_escape_characters(product['product_name']), f"{helpers.telegram_escape_characters(product['min_price'])}", f"{helpers.telegram_escape_characters(product['max_price'])}"])
await update.message.reply_markdown_v2(f'```{(table)}```')
async def admin_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if helpers.is_user_admin(update.message.chat_id):
await update.message.reply_markdown_v2(helpers.telegram_escape_characters("¡Eres admin!"))
async def add_premium_user_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if helpers.is_user_admin(update.message.chat_id):
telegram_user_id = update.message.text.split('/add_premium_user ')[1].split(' ')[0]
days = update.message.text.split('/add_premium_user ')[1].split(' ')[1]
until = helpers.get_date_ahead(int(days))
if not walladb.add_premium_user(telegram_user_id, until):
products = walladb.get_products_from_user(telegram_user_id)
for product in products:
logging.info(product)
p = threading.Thread(target=Worker.run, args=(product, ))
p.start()
await update.message.reply_markdown_v2(helpers.telegram_escape_characters(f"{telegram_user_id} re-activado hasta {until}. Re-activando productos."))
else:
await update.message.reply_markdown_v2(helpers.telegram_escape_characters(f"{telegram_user_id} añadido hasta {until}."))
async def remove_user_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if helpers.is_user_admin(update.message.chat_id):
telegram_user_id = update.message.text.split('/remove_user ')[1]
walladb.remove_valid_user(telegram_user_id)
await update.message.reply_markdown_v2(helpers.telegram_escape_characters(f"{telegram_user_id} desactivado."))
async def status_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
telegram_user_id = helpers.get_telegram_user_id(update)
if walladb.is_user_valid(telegram_user_id):
type = walladb.get_user_type(telegram_user_id)
until = walladb.get_user_until(telegram_user_id)
message = f"Tu cuenta es tipo: {type} y caduca el {until}."
await update.message.reply_markdown_v2(helpers.telegram_escape_characters(message))
async def test_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
telegram_user_id = helpers.get_telegram_user_id(update)
if not walladb.is_user_valid(telegram_user_id):
until = helpers.get_date_ahead(7)
walladb.add_test_user(telegram_user_id, until)
message = f"Periodo de prueba activado hasta el {until}."
else:
message = "Ya has utilizado el periodo de prueba."
if walladb.is_user_testing(telegram_user_id):
message = "Ya estás en el periodo de prueba."
elif walladb.is_user_premium(telegram_user_id):
message = "Ya eres premium. No puedes volver al periodo de prueba."
await update.message.reply_markdown_v2(helpers.telegram_escape_characters(message))
def main()->None:
walladb.setup_db()
products = walladb.get_all_products()
for product in products:
logging.info(product)
p = threading.Thread(target=Worker.run, args=(product, ))
p.start()
"""Start the bot."""
# Create the Application and pass it your bot's token.
application = Application.builder().token(constants.TELEGRAM_TOKEN).build()
# on different commands - answer in Telegram
application.add_handler(CommandHandler("help", help_command))
application.add_handler(CommandHandler("categories", categories_command))
application.add_handler(CommandHandler("add", add_command))
application.add_handler(CommandHandler("remove", remove_command))
application.add_handler(CommandHandler("list", list_command))
application.add_handler(CommandHandler("admin", admin_command))
application.add_handler(CommandHandler("add_premium_user", add_premium_user_command))
application.add_handler(CommandHandler("remove_user", remove_user_command))
application.add_handler(CommandHandler("status", status_command))
application.add_handler(CommandHandler("test", test_command))
# on non command i.e message - echo the message on Telegram
#application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, echo))
# Run the bot until the user presses Ctrl-C
application.run_polling()
if __name__ == "__main__":
main()