diff --git a/docker-compose.yml b/docker-compose.yml index f9cb0cf..fcf286e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -14,3 +14,26 @@ services: - LATITUDE=${LATITUDE} - LONGITUDE=${LONGITUDE} - SLEEP_TIME=${SLEEP_TIME} + - DB_HOST=${DB_HOST} + - DB_NAME=${DB_NAME} + - DB_USER=${DB_USER} + - DB_PASSWORD=${DB_PASSWORD} + + wallamanta_db: + image: mysql:8 + container_name: wallamanta_db + volumes: + - ./db:/var/lib/mysql + environment: + - MYSQL_ROOT_PASSWORD=${MYSQL_ROOT_PASSWORD} + - MYSQL_DATABASE=${DB_NAME} + - MYSQL_USER=${DB_USER} + - MYSQL_PASSWORD=${DB_PASSWORD} + restart: unless-stopped + + wallamanta_adminer: + image: adminer + container_name: wallamanta_adminer + restart: unless-stopped + ports: + - 8555:8080 \ No newline at end of file diff --git a/wallamanta/constants.py b/wallamanta/constants.py index fe89dbc..f833856 100644 --- a/wallamanta/constants.py +++ b/wallamanta/constants.py @@ -6,6 +6,10 @@ TELEGRAM_REMOVE_CHARACTERS = ['#'] ADMIN_IDS = [10101691] TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN") DB = "/app/data/wallamanta.db" +DB_HOST = os.getenv("DB_HOST") +DB_USER = os.getenv("DB_USER") +DB_PASSWORD = os.getenv("DB_PASSWORD") +DB_NAME = os.getenv("DB_NAME") LATITUDE = os.getenv("LATITUDE") LONGITUDE = os.getenv("LONGITUDE") SLEEP_TIME = int(os.getenv("SLEEP_TIME")) diff --git a/wallamanta/helpers.py b/wallamanta/helpers.py index dc75f5c..9111cbc 100644 --- a/wallamanta/helpers.py +++ b/wallamanta/helpers.py @@ -7,7 +7,7 @@ import pytz import walladb from PIL import Image, ImageDraw, ImageFont -from datetime import datetime, timedelta, timezone +from datetime import datetime, timedelta, timezone, date from telegram import InlineKeyboardButton, InlineKeyboardMarkup from telegram.ext import Application from telegram.constants import ParseMode @@ -43,12 +43,15 @@ def get_telegram_user_name(update): return update.message.from_user def get_date_ahead(add_days): - date_ahead = datetime.today() + timedelta(days=add_days) - return date_ahead.strftime("%d/%m/%Y") + date_ahead = date.today() + timedelta(days=add_days) + return date_ahead + +def get_spanish_date(date): + return date.strftime("%d/%m/%Y") def is_date_expired(until): - until_date = datetime.strptime(until, "%d/%m/%Y") - difference = until_date - datetime.today() + #until_date = datetime.strptime(until, "%d/%m/%Y") + difference = until - date.today() return difference.days < 0 def random_wait(): @@ -123,10 +126,10 @@ async def send_article(article, product): application = Application.builder().get_updates_http_version('1.1').http_version('1.1').token(constants.TELEGRAM_TOKEN).build() create_image(article) title = f"*{telegram_escape_characters(article['title'])}*" - description = f"*Descripción*: {telegram_escape_characters(article['description'])}" - found_by = f"*Encontrado por la búsqueda de:* {telegram_escape_characters(product['product_name'])}" - created_at = f"*Fecha de publicación:* {telegram_escape_characters(get_publish_date(article))}" - price = f"*Precio*: {telegram_escape_characters(str(article['price']))} {telegram_escape_characters(article['currency'])}" + description = f"*📝 Descripción*: {telegram_escape_characters(article['description'])}" + found_by = f"*🔍 Encontrado por la búsqueda de:* {telegram_escape_characters(product['product_name'])}" + created_at = f"*📅 Fecha de publicación:* {telegram_escape_characters(get_publish_date(article))}" + price = f"*💰 Precio*: {telegram_escape_characters(str(article['price']))} {telegram_escape_characters(article['currency'])}" text = f"{title}\n\n{description}\n\n{found_by}\n\n{created_at}\n\n{price}" #url = f"https://api.telegram.org/bot{constants.TELEGRAM_TOKEN}/sendPhoto?chat_id={product['telegram_user_id']}&caption={text}&parse_mode=MarkdownV2" #files = {'photo':open(f"/app/data/images/products/{article['id']}_composed.png", 'rb')} @@ -261,4 +264,11 @@ def generate_categories_string(categories, subcategories): categories_string = categories_string[:-2] else: categories_string = "todas" - return categories_string \ No newline at end of file + return categories_string + +def get_thread(product_name): + global SEARCH_THREADS_LIST + for product_thread in SEARCH_THREADS_LIST: + if product_name == product_thread[0]: + return product_thread[1] + return None \ No newline at end of file diff --git a/wallamanta/requirements.txt b/wallamanta/requirements.txt index 200a22e..3e51ca2 100644 --- a/wallamanta/requirements.txt +++ b/wallamanta/requirements.txt @@ -2,4 +2,5 @@ python-telegram-bot==20.1 python-telegram-bot[job-queue]==20.1 requests==2.28.1 prettytable==3.6.0 -Pillow==9.4.0 \ No newline at end of file +Pillow==9.4.0 +mysql-connector-python==8.0.32 \ No newline at end of file diff --git a/wallamanta/walladb.py b/wallamanta/walladb.py index a2091d8..fd4e63f 100644 --- a/wallamanta/walladb.py +++ b/wallamanta/walladb.py @@ -1,4 +1,5 @@ -import sqlite3 +import mysql.connector +from mysql.connector import errorcode import logging import constants import helpers @@ -16,30 +17,86 @@ def dict_factory(cursor, row): d[col[0]] = row[idx] return d +def connect_db(): + db = mysql.connector.connect( + host = constants.DB_HOST, + user = constants.DB_USER, + password = constants.DB_PASSWORD, + database = constants.DB_NAME + ) + return db + def setup_db(): - con = sqlite3.connect(constants.DB) - cur = con.cursor() - cur.execute("CREATE TABLE IF NOT EXISTS users(telegram_user_id, active, type, until, telegram_name)") - cur.execute("CREATE TABLE IF NOT EXISTS products(product_name, distance, \ - latitude, longitude, condition, min_price, max_price, category, subcategory, \ - title_exclude, title_description_exclude, telegram_user_id)") + TABLES = {} + TABLES['users'] = ( + "CREATE TABLE `users` (" + " `telegram_user_id` bigint NOT NULL," + " `active` boolean NOT NULL," + " `type` varchar(50) NOT NULL," + " `until` date NOT NULL," + " `telegram_name` varchar(255) NOT NULL," + " `created_at` timestamp DEFAULT CURRENT_TIMESTAMP" + " PRIMARY KEY (`telegram_user_id`)" + ") ENGINE=InnoDB") + + TABLES['products'] = ( + "CREATE TABLE `products` (" + " `id` int AUTO_INCREMENT PRIMARY KEY," + " `product_name` varchar(255) NOT NULL," + " `distance` int NOT NULL," + " `latitude` varchar(20) NOT NULL," + " `longitude` varchar(20) NOT NULL," + " `condition` varchar(20) NULL," + " `min_price` int NOT NULL," + " `max_price` int NOT NULL," + " `category` varchar(255) NULL," + " `subcategory` varchar(255) NULL," + " `title_exclude` text NULL," + " `title_description_exclude` text NULL," + " `telegram_user_id` bigint NOT NULL," + " `modified_at` timestamp NOT NULL," + " `created_at` timestamp DEFAULT CURRENT_TIMESTAMP" + ") ENGINE=InnoDB") + + con = connect_db() + cur = con.cursor(prepared=True) + + for table_name in TABLES: + table_description = TABLES[table_name] + try: + logging.info(f"Creating table {table_name}: ") + cur.execute(table_description) + except mysql.connector.Error as err: + if err.errno == errorcode.ER_TABLE_EXISTS_ERROR: + logging.info("already exists.") + else: + logging.info(err.msg) + else: + logging.info("OK") + cur.close() con.close() def is_user_valid(telegram_user_id): - con = sqlite3.connect(constants.DB) - cur = con.cursor() + con = connect_db() + cur = con.cursor(prepared=True) params = (telegram_user_id,) - res = cur.execute(f"SELECT * FROM users WHERE telegram_user_id=? AND active=True", params) - ret = res.fetchone() != None + cur.execute(f"SELECT * FROM users WHERE telegram_user_id=%s AND active=True", params) + try: + ret = cur.fetchone() != None + except: + ret = False con.close() return ret def is_user_expired(telegram_user_id): - con = sqlite3.connect(constants.DB) - cur = con.cursor() + con = connect_db() + cur = con.cursor(prepared=True) params = (telegram_user_id,) - res = cur.execute(f"SELECT until FROM users WHERE telegram_user_id=?", params) - q_res = res.fetchone() + res = cur.execute(f"SELECT until FROM users WHERE telegram_user_id=%s", params) + try: + q_res = cur.fetchone() + except: + q_res = None ret = True if q_res != None: if not helpers.is_date_expired(q_res[0]): @@ -48,31 +105,37 @@ def is_user_expired(telegram_user_id): return ret def is_user_premium(telegram_user_id): - con = sqlite3.connect(constants.DB) - cur = con.cursor() + con = connect_db() + cur = con.cursor(prepared=True) params = (telegram_user_id,) - res = cur.execute(f"SELECT * FROM users WHERE telegram_user_id=? AND active=True AND type='premium'", params) - ret = res.fetchone() != None + cur.execute(f"SELECT * FROM users WHERE telegram_user_id=%s AND active=True AND type='premium'", params) + try: + ret = cur.fetchone() != None + except: + ret = False con.close() return ret def is_user_testing(telegram_user_id): - con = sqlite3.connect(constants.DB) - cur = con.cursor() + con = connect_db() + cur = con.cursor(prepared=True) params = (telegram_user_id,) - res = cur.execute(f"SELECT * FROM users WHERE telegram_user_id=? AND active=True AND type='testing'", params) - ret = res.fetchone() != None + cur.execute(f"SELECT * FROM users WHERE telegram_user_id=%s AND active=True AND type='testing'", params) + try: + ret = cur.fetchone() != None + except: + ret = False con.close() return ret def add_premium_user(telegram_user_id, until): found = False - con = sqlite3.connect(constants.DB) - cur = con.cursor() - res = cur.execute(f"SELECT * FROM users WHERE telegram_user_id=?", params) + con = connect_db() + cur = con.cursor(prepared=True) + res = cur.execute(f"SELECT * FROM users WHERE telegram_user_id=%s", params) if res.fetchone() != None: params = (until, telegram_user_id) - cur.execute(f"UPDATE users SET active = True, type = 'premium', until = ? WHERE telegram_user_id=?", params) + cur.execute(f"UPDATE users SET active = True, type = 'premium', until = %s WHERE telegram_user_id=%s", params) con.commit() found = True con.close() @@ -81,13 +144,13 @@ def add_premium_user(telegram_user_id, until): def add_test_user(telegram_user_id, telegram_name, until): found = False - con = sqlite3.connect(constants.DB) - cur = con.cursor() + con = connect_db() + cur = con.cursor(prepared=True) params = (telegram_user_id,) - res = cur.execute(f"SELECT * FROM users WHERE telegram_user_id=?", params) + res = cur.execute(f"SELECT * FROM users WHERE telegram_user_id=%s", params) if res.fetchone() is None: params = (telegram_user_id, True, 'testing', until, telegram_name.first_name) - cur.execute("INSERT INTO users VALUES (?, ?, ?, ?, ?)", params) + cur.execute("INSERT INTO users VALUES (%s, %s, %s, %s, %s)", params) con.commit() else: found = True @@ -96,84 +159,100 @@ def add_test_user(telegram_user_id, telegram_name, until): return not found def remove_valid_user(telegram_user_id): - con = sqlite3.connect(constants.DB) - cur = con.cursor() + con = connect_db() + cur = con.cursor(prepared=True) params = (telegram_user_id,) - res = cur.execute(f"SELECT * FROM users WHERE telegram_user_id=?", params) + res = cur.execute(f"SELECT * FROM users WHERE telegram_user_id=%s", params) if res.fetchone() != None: - cur.execute(f"UPDATE users SET active = False WHERE telegram_user_id=?", params) + cur.execute(f"UPDATE users SET active = False WHERE telegram_user_id=%s", params) con.commit() con.close() logging.info(f"De-activated user {telegram_user_id}") def get_user_list(): - con = sqlite3.connect(constants.DB) - cur = con.cursor() + con = connect_db() + cur = con.cursor(prepared=True) res = cur.execute(f"SELECT * FROM users") ret = res.fetchall() con.close() return ret def get_user(telegram_user_id): - con = sqlite3.connect(constants.DB) - con.row_factory = dict_factory - cur = con.cursor() + con = connect_db() + #con.row_factory = dict_factory + cur = con.cursor(dictionary=True, prepared=True) params = (telegram_user_id,) - res = cur.execute(f"SELECT telegram_name FROM users WHERE telegram_user_id=?", params) - if res != None: - ret = res.fetchone()['telegram_name'] - else: + cur.execute(f"SELECT telegram_name FROM users WHERE telegram_user_id=%s", params) + try: + ret = cur.fetchone()['telegram_name'] + except Exception as e: + logging.error(f"Couldn't find username with id {telegram_user_id}: {e}") ret = 'NoName' con.close() return ret def get_user_type(telegram_user_id): - con = sqlite3.connect(constants.DB) - cur = con.cursor() + con = connect_db() + cur = con.cursor(prepared=True) params = (telegram_user_id,) - res = cur.execute(f"SELECT type FROM users WHERE telegram_user_id=?", params) - ret = res.fetchone() + cur.execute(f"SELECT type FROM users WHERE telegram_user_id=%s", params) + try: + ret = cur.fetchone()[0] + except: + ret = None con.close() - return ret[0] + return ret def get_user_until(telegram_user_id): - con = sqlite3.connect(constants.DB) - cur = con.cursor() + con = connect_db() + cur = con.cursor(prepared=True) params = (telegram_user_id,) - res = cur.execute(f"SELECT until FROM users WHERE telegram_user_id=?", params) - ret = res.fetchone() + cur.execute(f"SELECT until FROM users WHERE telegram_user_id=%s", params) + try: + ret = cur.fetchone()[0] + except: + ret = None con.close() - return ret[0] + return ret def get_product(product): product_name = product.get('product_name').lower() telegram_user_id = product.get('telegram_user_id') - con = sqlite3.connect(constants.DB) - con.row_factory = dict_factory - cur = con.cursor() + con = connect_db() + #con.row_factory = dict_factory + cur = con.cursor(dictionary=True, prepared=True) params = (telegram_user_id, product_name) - res = cur.execute(f"SELECT * FROM products WHERE telegram_user_id=? \ - AND product_name=?", params) - ret = res.fetchone() + cur.execute(f"SELECT * FROM products WHERE telegram_user_id=%s \ + AND product_name=%s", params) + try: + ret = cur.fetchone() + except: + ret = None con.close() return ret def get_products_from_user(telegram_user_id): - con = sqlite3.connect(constants.DB) - con.row_factory = dict_factory - cur = con.cursor() + con = connect_db() + #con.row_factory = dict_factory + cur = con.cursor(dictionary=True, prepared=True) params = (telegram_user_id,) - res = cur.execute(f"SELECT * FROM products WHERE telegram_user_id=?", params) - ret = res.fetchall() + cur.execute(f"SELECT * FROM products WHERE telegram_user_id=%s", params) + try: + ret = cur.fetchall() + except: + ret = None con.close() return ret def get_all_products(): - con = sqlite3.connect(constants.DB) - con.row_factory = dict_factory - cur = con.cursor() - res = cur.execute(f"SELECT * FROM products") - ret = res.fetchall() + con = connect_db() + #con.row_factory = dict_factory + cur = con.cursor(dictionary=True, prepared=True) + cur.execute(f"SELECT * FROM products") + try: + ret = cur.fetchall() + except: + ret = None con.close() return ret @@ -181,6 +260,8 @@ def add_product(product): condition = 'all' product_name = product.get('product_name').lower() distance = product.get('distance', 0) + if int(distance) > 1000000000: + distance = 1000000000 latitude = product.get('latitude', constants.LATITUDE) longitude = product.get('longitude', constants.LONGITUDE) min_price = product.get('min_price') @@ -193,17 +274,14 @@ def add_product(product): category = '' telegram_user_id = product.get('telegram_user_id') logging.info(f"Trying to add: {product_name}, {telegram_user_id}") - con = sqlite3.connect(constants.DB) - cur = con.cursor() - params = (telegram_user_id, product_name) - res = cur.execute(f"SELECT * FROM products WHERE telegram_user_id=? \ - AND product_name=?", params) - if res.fetchone() is None: - params = (product_name, \ - distance, latitude, longitude, condition, min_price, \ - max_price, category, subcategory, title_exclude, title_description_exclude, telegram_user_id) - cur.execute("INSERT INTO products VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", params) - con.commit() + con = connect_db() + cur = con.cursor(prepared=True) + params = (product_name, \ + distance, latitude, longitude, condition, min_price, \ + max_price, category, subcategory, title_exclude, title_description_exclude, telegram_user_id) + cur.execute("INSERT INTO `products` (`product_name`, `distance`, `latitude`, `longitude`, `condition`, `min_price`, `max_price`, `category`, `subcategory`, `title_exclude`, `title_description_exclude`, `telegram_user_id`) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)", params) + con.commit() + logging.info(f"{product_name} added for {telegram_user_id}") con.close() def remove_product(product): @@ -211,11 +289,11 @@ def remove_product(product): product_name = product.get('product_name').lower() removed = False if get_product(product): - con = sqlite3.connect(constants.DB) - cur = con.cursor() + con = connect_db() + cur = con.cursor(prepared=True) params = (telegram_user_id, product_name) - res = cur.execute(f"DELETE FROM products WHERE telegram_user_id=? \ - AND product_name=?", params) + cur.execute(f"DELETE FROM products WHERE telegram_user_id=%s \ + AND product_name=%s", params) con.commit() con.close() logging.info(f"Removed product {product['product_name']}") @@ -223,10 +301,13 @@ def remove_product(product): return removed def count_user_products(telegram_user_id): - con = sqlite3.connect(constants.DB) - cur = con.cursor() + con = connect_db() + cur = con.cursor(prepared=True) params = (telegram_user_id,) - res = cur.execute(f"SELECT Count() FROM products WHERE telegram_user_id=?", params) - ret = res.fetchone()[0] + cur.execute(f"SELECT Count(*) FROM products WHERE telegram_user_id=%s", params) + try: + ret = cur.fetchone()[0] + except: + ret = None con.close() return ret \ No newline at end of file diff --git a/wallamanta/wallamanta.py b/wallamanta/wallamanta.py index b8755ee..01abc41 100644 --- a/wallamanta/wallamanta.py +++ b/wallamanta/wallamanta.py @@ -1,7 +1,5 @@ -import json import threading import logging -import prettytable import helpers import walladb import constants @@ -20,6 +18,8 @@ from telegram.ext import ( filters ) +SEARCH_THREADS_LIST = [] + ACTION, ADD_PRODUCT_NAME, ADD_PRODUCT_MIN_PRICE, ADD_PRODUCT_MAX_PRICE, \ ADD_PRODUCT_CATEGORY, ADD_PRODUCT_TITLE_EXCLUDE, ADD_PRODUCT_DESCRIPTION_EXCLUDE, \ ADD_PRODUCT_COORDS, ADD_PRODUCT_DISTANCE, REMOVE_PRODUCT, LIST, FINISH, CONTINUE_OR_FINISH = range(13) @@ -304,6 +304,7 @@ async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: return ConversationHandler.END async def remove_product(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + global SEARCH_THREADS_LIST await update.callback_query.edit_message_reply_markup(None) query = update.callback_query product_name = query.data @@ -312,6 +313,9 @@ async def remove_product(update: Update, context: ContextTypes.DEFAULT_TYPE) -> if walladb.remove_product({'product_name' : product_name, \ 'telegram_user_id' : telegram_user_id}): message = f"¡{product_name} borrado de la lista de seguimiento!" + product_thread = helpers.get_thread(product_name) + if product_thread != None: + product_thread.stop() await context.bot.send_message(chat_id=update.effective_chat.id, text=message) return ConversationHandler.END @@ -324,8 +328,8 @@ async def product_details(update: Update, context: ContextTypes.DEFAULT_TYPE) -> if product: categories = helpers.generate_categories_string(product['category'], product['subcategory']) text = f"*Nombre del producto:* {helpers.telegram_escape_characters(product['product_name'])}\n\ -*Precio desde *{product['min_price']}€ *hasta *{product['max_price']}€\n\ -*En las coordenadas *{helpers.telegram_escape_characters(product['latitude'])}, {helpers.telegram_escape_characters(product['longitude'])} *y a *{product['distance']}km *de estas*\n\ +*Precio desde *{helpers.telegram_escape_characters(str(product['min_price']))}€ *hasta *{helpers.telegram_escape_characters(str(product['max_price']))}€\n\ +*En las coordenadas *{helpers.telegram_escape_characters(str(product['latitude']))}, {helpers.telegram_escape_characters(str(product['longitude']))} *y a *{product['distance']}km *de estas*\n\ *En las categorías: *{helpers.telegram_escape_characters(categories)}\n\ *Palabras excluídas del título: *`{helpers.telegram_escape_characters(product['title_exclude'])}`\n\ *Palabras excluídas del título y la descripción: *`{helpers.telegram_escape_characters(product['title_description_exclude'])}`" @@ -376,7 +380,7 @@ async def status_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> if walladb.is_user_valid(telegram_user_id): type = walladb.get_user_type(telegram_user_id) until = walladb.get_user_until(telegram_user_id) - message = f"Tu cuenta es tipo: {type} y caduca el {until}." + message = f"Tu cuenta es tipo: {type} y caduca el {helpers.get_spanish_date(until)}." await update.message.reply_markdown_v2(helpers.telegram_escape_characters(message)) async def test_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: @@ -385,7 +389,7 @@ async def test_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> No if not walladb.is_user_valid(telegram_user_id): until = helpers.get_date_ahead(7) walladb.add_test_user(telegram_user_id, telegram_user_name, until) - message = f"Periodo de prueba activado hasta el {until}." + message = f"Periodo de prueba activado hasta el {helpers.get_spanish_date(until)}." else: message = "Ya has utilizado el periodo de prueba." if walladb.is_user_testing(telegram_user_id): @@ -395,10 +399,12 @@ async def test_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> No await update.message.reply_markdown_v2(helpers.telegram_escape_characters(message)) async def add_to_db_and_send(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + global SEARCH_THREADS_LIST logging.info(f"Adding product with context: {context.user_data}") walladb.add_product(context.user_data) p = threading.Thread(target=Worker.run, args=(walladb.get_product(context.user_data), )) p.start() + SEARCH_THREADS_LIST.append((context.user_data, p)) await context.bot.send_message(chat_id=update.effective_chat.id, text=f"¡*{helpers.telegram_escape_characters(context.user_data['product_name'])}* añadido correctamente\!", parse_mode=ParseMode.MARKDOWN_V2) def error(update, context): @@ -410,7 +416,25 @@ async def conv_timeout(update: Update, context: ContextTypes.DEFAULT_TYPE) -> No async def conv_finish(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: await context.bot.send_message(chat_id=update.effective_chat.id, text="Vuelve a usar el menú si quieres añadir otro producto.") +async def list_threads(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + if helpers.is_user_admin(update.message.chat_id): + global SEARCH_THREADS_LIST + tmp_search_threads_list = [] + threads_string = "" + for thread in SEARCH_THREADS_LIST: + if thread[1].is_alive(): + tmp_search_threads_list.append((thread[0], thread[1])) + threads_string = threads_string + f"{thread[0]['product_name']} - {thread[0]['telegram_user_id']}\n" + SEARCH_THREADS_LIST = tmp_search_threads_list + if len(threads_string) > 2000: + my_strings = [(threads_string[i:i+2000]) for i in range(0, len(threads_string), 2000)] + for my_string in my_strings: + await update.message.reply_markdown_v2(helpers.telegram_escape_characters(f"{my_string}")) + else: + await update.message.reply_markdown_v2(helpers.telegram_escape_characters(f"{threads_string}")) + def main()->None: + global SEARCH_THREADS_LIST walladb.setup_db() products = walladb.get_all_products() @@ -418,6 +442,7 @@ def main()->None: logging.info(product) p = threading.Thread(target=Worker.run, args=(product, )) p.start() + SEARCH_THREADS_LIST.append((product, p)) """Start the bot.""" # Create the Application and pass it your bot's token. @@ -430,6 +455,7 @@ def main()->None: application.add_handler(CommandHandler("remove_user", remove_user_command)) application.add_handler(CommandHandler("status", status_command)) application.add_handler(CommandHandler("test", test_command)) + application.add_handler(CommandHandler("list_threads", list_threads)) #application.add_handler(CallbackQueryHandler("list", send_list())) #application.add_handler(CallbackQueryHandler(pattern="list", callback=send_list())) diff --git a/wallamanta/worker.py b/wallamanta/worker.py index 1c3b32b..ebacdf7 100644 --- a/wallamanta/worker.py +++ b/wallamanta/worker.py @@ -15,6 +15,8 @@ logger = logging.getLogger(__name__) class Worker: + _stop = False + def is_valid_request(self, product): is_valid = False if walladb.get_product(product): @@ -26,6 +28,7 @@ class Worker: return is_valid def request(self, product_name, n_articles, latitude=constants.LATITUDE, longitude=constants.LONGITUDE, distance='0', condition='all', min_price=0, max_price=10000000, category="", subcategories=[]): + distance = str(int(distance) * 1000) url = (f"http://api.wallapop.com/api/v3/general/search?keywords={product_name}" f"&order_by=newest&latitude={latitude}" f"&longitude={longitude}" @@ -46,10 +49,13 @@ class Worker: url_subcategories = url_subcategories + f"{subcategory}," url = url + f"&object_type_ids={url_subcategories[:-1]}" + search_objects = list() + for step in range(15): while True: helpers.random_wait() response = requests.get(url+f"&step={step+1}") + search_objects = search_objects + response.json()['search_objects'] try: if response.status_code == 200: break @@ -59,8 +65,7 @@ class Worker: logging.info("Exception: " + e) time.sleep(3) - json_data = response.json() - return json_data['search_objects'] + return search_objects def first_run(self, product): logging.info(f"First run for {product['product_name']} for {walladb.get_user(product['telegram_user_id'])}") @@ -98,7 +103,7 @@ class Worker: helpers.random_wait() # Random wait to make requests separated in time in order to prevent API rate limit exec_times = [] while True: - if not self.is_valid_request(product): + if not self.is_valid_request(product) or self._stop == True: logging.info(f"{product['product_name']} not valid anymore, exiting worker") break # Exits and ends worker thread start_time = time.time() @@ -188,9 +193,15 @@ class Worker: #time.sleep(10) logging.info(f"Wallapop monitor worker stopped for: \'{product['product_name']}\'") + def stop(self): + self._stop = True + def run(product): worker = Worker() - list = worker.first_run(product) + try: + list = worker.first_run(product) + except: + logging.info(f"{product['product_name']} worker crashed.") #time.sleep(constants.SLEEP_TIME) while True: try: