146 lines
7.1 KiB
Python
146 lines
7.1 KiB
Python
import json
|
|
import os
|
|
import threading
|
|
import logging
|
|
import prettytable
|
|
import helpers
|
|
import walladb
|
|
|
|
from worker import Worker
|
|
from telegram import Update
|
|
from telegram.ext import Application, CommandHandler, ContextTypes
|
|
|
|
# Runtime configuration, read once from the process environment at import
# time. Each value is a string, or None when the variable is not set.
TELEGRAM_CHANNEL_ID = os.environ.get("TELEGRAM_CHANNEL_ID")
TELEGRAM_TOKEN = os.environ.get("TELEGRAM_TOKEN")
LATITUDE = os.environ.get("LATITUDE")
LONGITUDE = os.environ.get("LONGITUDE")
SLEEP_TIME = os.environ.get("SLEEP_TIME")

# Configure logging at INFO level with a timestamped record layout.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)

# Logger named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
def parse_json_file(path="/app/data/products.json"):
    """Load and return the JSON document stored at *path*.

    Args:
        path: Location of the JSON file. Defaults to the products file this
            function has always read, so existing no-argument calls behave
            as before.

    Returns:
        Whatever ``json.load`` decodes from the file. Callers in this module
        iterate over the result and index each item by ``'product_name'``.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    # The context manager closes the handle even when decoding raises; the
    # previous bare open() left that to the garbage collector.
    with open(path, encoding="utf-8") as infile:
        return json.load(infile)
|
|
|
|
def save_json_file(products, path='/app/data/products.json'):
    """Serialize *products* as indented JSON, overwriting the file at *path*.

    Args:
        products: JSON-serializable object to store.
        path: Destination file. Defaults to the products file this function
            has always written, so existing one-argument calls behave as
            before.

    Raises:
        OSError: If the file cannot be opened for writing.
        TypeError: If *products* contains values ``json.dump`` cannot encode.
    """
    # json.dump escapes non-ASCII characters by default, so the explicit
    # encoding only makes the on-disk format independent of the locale.
    with open(path, 'w', encoding='utf-8') as outfile:
        json.dump(products, outfile, indent=2)
|
|
|
|
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply with usage instructions for ``/add``, ``/list`` and ``/remove``.

    Nothing is sent unless ``walladb.is_user_valid`` accepts the sender's
    Telegram user id.

    Args:
        update: Incoming Telegram update carrying the command message.
        context: Handler context supplied by the framework (unused).
    """
    if not walladb.is_user_valid(helpers.get_telegram_user_id(update)):
        return

    # Every /add field is separated by ";" (add_command splits on it and
    # "Ejemplo 2" below shows it); the template previously showed "," after
    # precio_máximo and longitud, which contradicted the parser.
    message = """Añade un producto con `/add producto;precio_mínimo;precio_máximo;excluir_título(opcional, separado por comas);excluir_descripción_y_título(opcional, separado por comas);latitud(opcional);longitud(opcional);distancia(opcional)`\n
Ejemplo: `/add placa base itx;0;150`\n
Ejemplo 2: `/add cpu;10;30;;intel,core 2 duo,celeron;;;100`\n
Los campos opcionales que se dejen vacíos tomarán el valor configurado en el archivo `.env`\n
Lista los productos con `/list` o obtén la información de uno en concreto con `/list nombre del producto`\n
Borra un producto con `/remove nombre del producto`"""
    await update.message.reply_markdown_v2(helpers.telegram_escape_characters(message))
|
|
|
|
async def add_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle ``/add``: start tracking a product for the sender.

    The text after the command is split on ``;``. With at least three fields
    the product is stored through ``walladb.add_product`` (unless
    ``walladb.get_product`` already returns it) and ``Worker.run`` is started
    for it in a new thread. With fewer fields the usage text is sent instead.
    Nothing happens unless ``walladb.is_user_valid`` accepts the sender.

    Args:
        update: Incoming Telegram update carrying the command message.
        context: Handler context supplied by the framework (unused).
    """
    user_id = helpers.get_telegram_user_id(update)
    if not walladb.is_user_valid(user_id):
        return

    # Default reply: the usage text, replaced below once the input parses.
    # Fields are ";"-separated throughout, matching the split further down.
    message = """Tienes que pasar el número correcto de parámetros: `/add producto;precio_mínimo;precio_máximo;excluir_título(opcional, separado por comas);excluir_descripción_y_título(opcional, separado por comas);latitud(opcional);longitud(opcional);distancia(opcional)`\n
Ejemplo: `/add placa base itx;0;150`\n
Ejemplo 2: `/add cpu;10;30;;intel,core 2 duo,celeron;;;100`\n
Los campos opcionales que se dejen vacíos tomarán el valor configurado en el archivo `.env`"""

    # Split off the command token on the first whitespace run rather than on
    # the literal "/add ", so "/add@BotName ..." and payloads that themselves
    # contain "/add " are parsed correctly.
    parts = update.message.text.split(maxsplit=1)
    fields = parts[1].split(";") if len(parts) > 1 else []

    # At least producto;precio_mínimo;precio_máximo are required.
    if len(fields) > 2:
        logger.info("Adding: %s", fields)
        product = walladb.get_product(fields, user_id)
        if product:
            message = f"{product['product_name']} ya está en seguimiento!"
        else:
            walladb.add_product(fields, user_id)
            # Re-read so the worker and the reply use the stored record.
            product = walladb.get_product(fields, user_id)
            threading.Thread(target=Worker.run, args=(product,)).start()
            message = f"Añadido {product['product_name']} a seguimiento"

    await update.message.reply_markdown_v2(helpers.telegram_escape_characters(message))
|
|
|
|
async def remove_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle ``/remove <name>``: drop a product from the products JSON file.

    Every entry whose ``'product_name'`` equals the text after the command is
    removed and the file is rewritten; the reply says whether anything was
    removed. Nothing happens unless ``walladb.is_user_valid`` accepts the
    sender.

    Args:
        update: Incoming Telegram update carrying the command message.
        context: Handler context supplied by the framework (unused).
    """
    if not walladb.is_user_valid(helpers.get_telegram_user_id(update)):
        return

    # Take everything after the first whitespace run, so "/remove@BotName x"
    # yields "x" instead of a mis-sliced string.
    parts = update.message.text.split(maxsplit=1)
    product_to_remove = parts[1] if len(parts) > 1 else ""

    # NOTE(review): add_command stores products through walladb, while this
    # handler edits the JSON file -- confirm which store is authoritative.
    products = parse_json_file()

    # Build a filtered copy instead of calling list.remove() while iterating,
    # which skips the element following each removal.
    remaining = [
        product for product in products
        if product['product_name'] != product_to_remove
    ]

    if len(remaining) != len(products):
        save_json_file(remaining)
        message = f"{product_to_remove} borrado!"
    else:
        message = f"{product_to_remove} no está en seguimiento!"

    await update.message.reply_text(message)
|
|
|
|
async def list_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle ``/list [name]``: show tracked products as a monospace table.

    With a name that matches a product's ``'product_name'`` the reply lists
    every key/value pair of that product. Otherwise (no name given, or no
    match) the reply lists the name, minimum and maximum price of every
    product. Products are read from the products JSON file. Nothing happens
    unless ``walladb.is_user_valid`` accepts the sender.

    Args:
        update: Incoming Telegram update carrying the command message.
        context: Handler context supplied by the framework (unused).
    """
    if not walladb.is_user_valid(helpers.get_telegram_user_id(update)):
        return

    products = parse_json_file()

    # Take everything after the first whitespace run, so "/list@BotName x"
    # is parsed the same way as "/list x".
    parts = update.message.text.split(maxsplit=1)
    wanted = parts[1] if len(parts) > 1 else None

    table = None
    if wanted is not None:
        # The first product with a matching name wins.
        match = next(
            (product for product in products if product['product_name'] == wanted),
            None,
        )
        if match is not None:
            table = prettytable.PrettyTable(['Campo', 'Valor'])
            table.align['Campo'] = 'l'
            table.align['Valor'] = 'r'
            # NOTE(review): detail values are not passed through
            # telegram_escape_characters, unlike the summary rows below --
            # confirm whether that is intended.
            for key, value in match.items():
                table.add_row([key, value])

    if table is None:
        # No name given or nothing matched: fall back to the summary table.
        table = prettytable.PrettyTable(['Producto', 'Mín', 'Máx'])
        table.align['Producto'] = 'l'
        table.align['Mín'] = 'r'
        table.align['Máx'] = 'r'
        for product in products:
            table.add_row([
                helpers.telegram_escape_characters(product['product_name']),
                f"{helpers.telegram_escape_characters(product['min_price'])}€",
                f"{helpers.telegram_escape_characters(product['max_price'])}€",
            ])

    await update.message.reply_markdown_v2(f'```{(table)}```')
|
|
|
|
async def admin_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Confirm admin status to the chat when ``helpers.is_user_admin`` accepts its id.

    Args:
        update: Incoming Telegram update carrying the command message.
        context: Handler context supplied by the framework (unused).
    """
    chat_is_admin = helpers.is_user_admin(update.message.chat_id)
    if not chat_is_admin:
        return

    confirmation = helpers.telegram_escape_characters("¡Eres admin!")
    await update.message.reply_markdown_v2(confirmation)
|
|
|
|
async def add_user_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle ``/add_user <id>``: register a Telegram user id via ``walladb``.

    Only acts when ``helpers.is_user_admin`` accepts the chat id. When no id
    follows the command a short usage hint is sent instead of failing.

    Args:
        update: Incoming Telegram update carrying the command message.
        context: Handler context supplied by the framework (unused).
    """
    if not helpers.is_user_admin(update.message.chat_id):
        return

    # Splitting on the first whitespace run avoids the IndexError the former
    # split('/add_user ')[1] raised when the command came without an argument.
    parts = update.message.text.split(maxsplit=1)
    if len(parts) < 2:
        # Plain text reply: no Markdown escaping needed for the hint.
        await update.message.reply_text("Uso: /add_user id_de_usuario")
        return

    telegram_user_id = parts[1].strip()
    walladb.add_valid_user(telegram_user_id)
    await update.message.reply_markdown_v2(helpers.telegram_escape_characters(f"{telegram_user_id} añadido."))
|
|
|
|
async def remove_user_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle ``/remove_user <id>``: deactivate a Telegram user id via ``walladb``.

    Only acts when ``helpers.is_user_admin`` accepts the chat id. When no id
    follows the command a short usage hint is sent instead of failing.

    Args:
        update: Incoming Telegram update carrying the command message.
        context: Handler context supplied by the framework (unused).
    """
    if not helpers.is_user_admin(update.message.chat_id):
        return

    # Splitting on the first whitespace run avoids the IndexError the former
    # split('/remove_user ')[1] raised when the command came without an
    # argument.
    parts = update.message.text.split(maxsplit=1)
    if len(parts) < 2:
        # Plain text reply: no Markdown escaping needed for the hint.
        await update.message.reply_text("Uso: /remove_user id_de_usuario")
        return

    telegram_user_id = parts[1].strip()
    walladb.remove_valid_user(telegram_user_id)
    await update.message.reply_markdown_v2(helpers.telegram_escape_characters(f"{telegram_user_id} desactivado."))
|
|
|
|
def main() -> None:
    """Start the bot.

    Sets up the database, starts one ``Worker.run`` thread per product found
    in the products JSON file, registers the command handlers and then polls
    Telegram until the process is interrupted.
    """
    walladb.setup_db()

    # Resume tracking for every product already stored on disk.
    for product in parse_json_file():
        logger.info("%s", product)
        threading.Thread(target=Worker.run, args=(product,)).start()

    # Create the Application and pass it the bot's token.
    application = Application.builder().token(TELEGRAM_TOKEN).build()

    # Command name -> handler. "/admin" previously pointed at list_command,
    # and the two user-management handlers were defined but never registered.
    command_handlers = (
        ("help", help_command),
        ("add", add_command),
        ("remove", remove_command),
        ("list", list_command),
        ("admin", admin_command),
        ("add_user", add_user_command),
        ("remove_user", remove_user_command),
    )
    for command, callback in command_handlers:
        application.add_handler(CommandHandler(command, callback))

    # Run the bot until the user presses Ctrl-C.
    application.run_polling()
|
|
|
|
# Start the bot only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()