Added database and deleted json. Renamed main file. Modified functions to work with database. And more things...

This commit is contained in:
Joan Cano
2023-03-06 00:22:07 +01:00
parent 20778a9e96
commit 17e9cd05ec
6 changed files with 163 additions and 79 deletions

169
wallamanta/wallamanta.py Normal file
View File

@@ -0,0 +1,169 @@
import json
import os
import threading
import logging
import prettytable
import helpers
import walladb
from worker import Worker
from telegram import Update
from telegram.ext import Application, CommandHandler, ContextTypes
TELEGRAM_CHANNEL_ID = os.getenv("TELEGRAM_CHANNEL_ID")
TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN")
LATITUDE = os.getenv("LATITUDE")
LONGITUDE = os.getenv("LONGITUDE")
SLEEP_TIME = os.getenv("SLEEP_TIME")
# Enable logging
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO
)
logger = logging.getLogger(__name__)
def parse_json_file():
f = open("/app/data/products.json")
return json.load(f)
def save_json_file(products):
with open('/app/data/products.json', 'w') as outfile:
json.dump(products, outfile, indent=2)
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if walladb.is_user_valid(helpers.get_telegram_user_id(update)):
message = """Añade un producto con `/add producto;precio_mínimo;precio_máximo,excluir_título(opcional, separado por comas);excluir_descripción_y_título(opciona, separado por comas);latitud(opcional);longitud(opcional),distancia(opcional)`\n
Ejemplo: `/add placa base itx;0;150`\n
Ejemplo 2: `/add cpu;10;30;;intel,core 2 duo,celeron;;;100`\n
Los campos opcionales que se dejen vacíos tomarán el valor configurado en el archivo `.env`\n
Lista los productos con `/list` o obtén la información de uno en concreto con `/list nombre del producto`\n
Borra un producto con `/remove nombre del producto`"""
await update.message.reply_markdown_v2(helpers.telegram_escape_characters(message))
async def add_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
telegram_user_id = helpers.get_telegram_user_id(update)
if walladb.is_user_valid(telegram_user_id):
message = """Tienes que pasar el número correcto de parámetros: `/add producto;precio_mínimo;precio_máximo,excluir_título(opcional, separado por comas);excluir_descripción_y_título(opciona, separado por comas);latitud(opcional);longitud(opcional),distancia(opcional)`\n
Ejemplo: `/add placa base itx;0;150`\n
Ejemplo 2: `/add cpu;10;30;;intel,core 2 duo,celeron;;;100`\n
Los campos opcionales que se dejen vacíos tomarán el valor configurado en el archivo `.env`"""
args = update.message.text.split("/add ")
if len(args) == 1:
pass
elif len(args[1].split(";")) > 2:
product = dict()
product['telegram_user_id'] = telegram_user_id
args = args[1].split(";")
product['product_name'], product['min_price'], product['max_price'] = args[0:3]
if len(args) > 3 and args[3]:
product['title_exclude'] = args[3]
if len(args) > 4 and args[4]:
product['title_description_exclude'] = args[4]
if len(args) > 5 and args[5]:
product['latitude'] = args[5]
if len(args) > 6 and args[6]:
product['longitude'] = args[6]
if len(args) > 7 and args[7]:
product['distance'] = args[7]
logging.info(f'Adding: {product}')
if not walladb.get_product(product):
walladb.add_product(product)
p = threading.Thread(target=Worker.run, args=(walladb.get_product(product), ))
p.start()
message = f"Añadido {walladb.get_product(product)['product_name']} a seguimiento"
else:
message = f"{walladb.get_product(product)['product_name']} ya está en seguimiento!"
await update.message.reply_markdown_v2(helpers.telegram_escape_characters(message))
async def remove_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
telegram_user_id = helpers.get_telegram_user_id(update)
if walladb.is_user_valid(telegram_user_id):
product_to_remove = update.message.text[len('/remove '):]
message = f"{product_to_remove} no está en seguimiento!"
if walladb.remove_product({'product_name' : product_to_remove, \
'telegram_user_id' : telegram_user_id}):
message = f"{product_to_remove} borrado!"
await update.message.reply_text(message)
async def list_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
telegram_user_id = helpers.get_telegram_user_id(update)
if walladb.is_user_valid(telegram_user_id):
products = walladb.get_products_from_user(telegram_user_id)
args = update.message.text.split("/list ")
found = False
if len(args) > 1:
product = walladb.get_product({'product_name':args[1],'telegram_user_id':telegram_user_id})
if product:
table = prettytable.PrettyTable(['Campo', 'Valor'])
table.align['Campo'] = 'l'
table.align['Valor'] = 'r'
for key in product:
table.add_row([key, product[key]])
found = True
if not found:
table = prettytable.PrettyTable(['Producto', 'Mín', 'Máx'])
table.align['Producto'] = 'l'
table.align['Mín'] = 'r'
table.align['Máx'] = 'r'
for product in products:
table.add_row([helpers.telegram_escape_characters(product['product_name']), f"{helpers.telegram_escape_characters(product['min_price'])}", f"{helpers.telegram_escape_characters(product['max_price'])}"])
await update.message.reply_markdown_v2(f'```{(table)}```')
async def admin_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if helpers.is_user_admin(update.message.chat_id):
await update.message.reply_markdown_v2(helpers.telegram_escape_characters("¡Eres admin!"))
async def add_user_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if helpers.is_user_admin(update.message.chat_id):
telegram_user_id = update.message.text.split('/add_user ')[1]
if walladb.add_valid_user(telegram_user_id):
products = walladb.get_products_from_user(telegram_user_id)
for product in products:
logging.info(product)
p = threading.Thread(target=Worker.run, args=(product, ))
p.start()
await update.message.reply_markdown_v2(helpers.telegram_escape_characters(f"{telegram_user_id} re-activado. Re-activando productos."))
else:
await update.message.reply_markdown_v2(helpers.telegram_escape_characters(f"{telegram_user_id} añadido."))
async def remove_user_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if helpers.is_user_admin(update.message.chat_id):
telegram_user_id = update.message.text.split('/remove_user ')[1]
walladb.remove_valid_user(telegram_user_id)
await update.message.reply_markdown_v2(helpers.telegram_escape_characters(f"{telegram_user_id} desactivado."))
def main()->None:
walladb.setup_db()
products = walladb.get_all_products()
for product in products:
logging.info(product)
p = threading.Thread(target=Worker.run, args=(product, ))
p.start()
"""Start the bot."""
# Create the Application and pass it your bot's token.
application = Application.builder().token(TELEGRAM_TOKEN).build()
# on different commands - answer in Telegram
application.add_handler(CommandHandler("help", help_command))
application.add_handler(CommandHandler("add", add_command))
application.add_handler(CommandHandler("remove", remove_command))
application.add_handler(CommandHandler("list", list_command))
application.add_handler(CommandHandler("admin", admin_command))
application.add_handler(CommandHandler("add_user", add_user_command))
application.add_handler(CommandHandler("remove_user", remove_user_command))
# on non command i.e message - echo the message on Telegram
#application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, echo))
# Run the bot until the user presses Ctrl-C
application.run_polling()
if __name__ == "__main__":
main()