Migrated from sqlite to MySQL

This commit is contained in:
Joan Cano
2023-07-04 00:14:14 +02:00
parent 3c1fb2474b
commit 69b2034bbe
7 changed files with 266 additions and 110 deletions

View File

@@ -14,3 +14,26 @@ services:
- LATITUDE=${LATITUDE}
- LONGITUDE=${LONGITUDE}
- SLEEP_TIME=${SLEEP_TIME}
- DB_HOST=${DB_HOST}
- DB_NAME=${DB_NAME}
- DB_USER=${DB_USER}
- DB_PASSWORD=${DB_PASSWORD}
wallamanta_db:
image: mysql:8
container_name: wallamanta_db
volumes:
- ./db:/var/lib/mysql
environment:
- MYSQL_ROOT_PASSWORD=${MYSQL_ROOT_PASSWORD}
- MYSQL_DATABASE=${DB_NAME}
- MYSQL_USER=${DB_USER}
- MYSQL_PASSWORD=${DB_PASSWORD}
restart: unless-stopped
wallamanta_adminer:
image: adminer
container_name: wallamanta_adminer
restart: unless-stopped
ports:
- 8555:8080

View File

@@ -6,6 +6,10 @@ TELEGRAM_REMOVE_CHARACTERS = ['#']
ADMIN_IDS = [10101691]
TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN")
DB = "/app/data/wallamanta.db"
DB_HOST = os.getenv("DB_HOST")
DB_USER = os.getenv("DB_USER")
DB_PASSWORD = os.getenv("DB_PASSWORD")
DB_NAME = os.getenv("DB_NAME")
LATITUDE = os.getenv("LATITUDE")
LONGITUDE = os.getenv("LONGITUDE")
SLEEP_TIME = int(os.getenv("SLEEP_TIME"))

View File

@@ -7,7 +7,7 @@ import pytz
import walladb
from PIL import Image, ImageDraw, ImageFont
from datetime import datetime, timedelta, timezone
from datetime import datetime, timedelta, timezone, date
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application
from telegram.constants import ParseMode
@@ -43,12 +43,15 @@ def get_telegram_user_name(update):
return update.message.from_user
def get_date_ahead(add_days):
date_ahead = datetime.today() + timedelta(days=add_days)
return date_ahead.strftime("%d/%m/%Y")
date_ahead = date.today() + timedelta(days=add_days)
return date_ahead
def get_spanish_date(date):
return date.strftime("%d/%m/%Y")
def is_date_expired(until):
until_date = datetime.strptime(until, "%d/%m/%Y")
difference = until_date - datetime.today()
#until_date = datetime.strptime(until, "%d/%m/%Y")
difference = until - date.today()
return difference.days < 0
def random_wait():
@@ -123,10 +126,10 @@ async def send_article(article, product):
application = Application.builder().get_updates_http_version('1.1').http_version('1.1').token(constants.TELEGRAM_TOKEN).build()
create_image(article)
title = f"*{telegram_escape_characters(article['title'])}*"
description = f"*Descripción*: {telegram_escape_characters(article['description'])}"
found_by = f"*Encontrado por la búsqueda de:* {telegram_escape_characters(product['product_name'])}"
created_at = f"*Fecha de publicación:* {telegram_escape_characters(get_publish_date(article))}"
price = f"*Precio*: {telegram_escape_characters(str(article['price']))} {telegram_escape_characters(article['currency'])}"
description = f"*📝 Descripción*: {telegram_escape_characters(article['description'])}"
found_by = f"*🔍 Encontrado por la búsqueda de:* {telegram_escape_characters(product['product_name'])}"
created_at = f"*📅 Fecha de publicación:* {telegram_escape_characters(get_publish_date(article))}"
price = f"*💰 Precio*: {telegram_escape_characters(str(article['price']))} {telegram_escape_characters(article['currency'])}"
text = f"{title}\n\n{description}\n\n{found_by}\n\n{created_at}\n\n{price}"
#url = f"https://api.telegram.org/bot{constants.TELEGRAM_TOKEN}/sendPhoto?chat_id={product['telegram_user_id']}&caption={text}&parse_mode=MarkdownV2"
#files = {'photo':open(f"/app/data/images/products/{article['id']}_composed.png", 'rb')}
@@ -262,3 +265,10 @@ def generate_categories_string(categories, subcategories):
else:
categories_string = "todas"
return categories_string
def get_thread(product_name):
global SEARCH_THREADS_LIST
for product_thread in SEARCH_THREADS_LIST:
if product_name == product_thread[0]:
return product_thread[1]
return None

View File

@@ -3,3 +3,4 @@ python-telegram-bot[job-queue]==20.1
requests==2.28.1
prettytable==3.6.0
Pillow==9.4.0
mysql-connector-python==8.0.32

View File

@@ -1,4 +1,5 @@
import sqlite3
import mysql.connector
from mysql.connector import errorcode
import logging
import constants
import helpers
@@ -16,30 +17,86 @@ def dict_factory(cursor, row):
d[col[0]] = row[idx]
return d
def connect_db():
db = mysql.connector.connect(
host = constants.DB_HOST,
user = constants.DB_USER,
password = constants.DB_PASSWORD,
database = constants.DB_NAME
)
return db
def setup_db():
con = sqlite3.connect(constants.DB)
cur = con.cursor()
cur.execute("CREATE TABLE IF NOT EXISTS users(telegram_user_id, active, type, until, telegram_name)")
cur.execute("CREATE TABLE IF NOT EXISTS products(product_name, distance, \
latitude, longitude, condition, min_price, max_price, category, subcategory, \
title_exclude, title_description_exclude, telegram_user_id)")
TABLES = {}
TABLES['users'] = (
"CREATE TABLE `users` ("
" `telegram_user_id` bigint NOT NULL,"
" `active` boolean NOT NULL,"
" `type` varchar(50) NOT NULL,"
" `until` date NOT NULL,"
" `telegram_name` varchar(255) NOT NULL,"
" `created_at` timestamp DEFAULT CURRENT_TIMESTAMP"
" PRIMARY KEY (`telegram_user_id`)"
") ENGINE=InnoDB")
TABLES['products'] = (
"CREATE TABLE `products` ("
" `id` int AUTO_INCREMENT PRIMARY KEY,"
" `product_name` varchar(255) NOT NULL,"
" `distance` int NOT NULL,"
" `latitude` varchar(20) NOT NULL,"
" `longitude` varchar(20) NOT NULL,"
" `condition` varchar(20) NULL,"
" `min_price` int NOT NULL,"
" `max_price` int NOT NULL,"
" `category` varchar(255) NULL,"
" `subcategory` varchar(255) NULL,"
" `title_exclude` text NULL,"
" `title_description_exclude` text NULL,"
" `telegram_user_id` bigint NOT NULL,"
" `modified_at` timestamp NOT NULL,"
" `created_at` timestamp DEFAULT CURRENT_TIMESTAMP"
") ENGINE=InnoDB")
con = connect_db()
cur = con.cursor(prepared=True)
for table_name in TABLES:
table_description = TABLES[table_name]
try:
logging.info(f"Creating table {table_name}: ")
cur.execute(table_description)
except mysql.connector.Error as err:
if err.errno == errorcode.ER_TABLE_EXISTS_ERROR:
logging.info("already exists.")
else:
logging.info(err.msg)
else:
logging.info("OK")
cur.close()
con.close()
def is_user_valid(telegram_user_id):
con = sqlite3.connect(constants.DB)
cur = con.cursor()
con = connect_db()
cur = con.cursor(prepared=True)
params = (telegram_user_id,)
res = cur.execute(f"SELECT * FROM users WHERE telegram_user_id=? AND active=True", params)
ret = res.fetchone() != None
cur.execute(f"SELECT * FROM users WHERE telegram_user_id=%s AND active=True", params)
try:
ret = cur.fetchone() != None
except:
ret = False
con.close()
return ret
def is_user_expired(telegram_user_id):
con = sqlite3.connect(constants.DB)
cur = con.cursor()
con = connect_db()
cur = con.cursor(prepared=True)
params = (telegram_user_id,)
res = cur.execute(f"SELECT until FROM users WHERE telegram_user_id=?", params)
q_res = res.fetchone()
res = cur.execute(f"SELECT until FROM users WHERE telegram_user_id=%s", params)
try:
q_res = cur.fetchone()
except:
q_res = None
ret = True
if q_res != None:
if not helpers.is_date_expired(q_res[0]):
@@ -48,31 +105,37 @@ def is_user_expired(telegram_user_id):
return ret
def is_user_premium(telegram_user_id):
con = sqlite3.connect(constants.DB)
cur = con.cursor()
con = connect_db()
cur = con.cursor(prepared=True)
params = (telegram_user_id,)
res = cur.execute(f"SELECT * FROM users WHERE telegram_user_id=? AND active=True AND type='premium'", params)
ret = res.fetchone() != None
cur.execute(f"SELECT * FROM users WHERE telegram_user_id=%s AND active=True AND type='premium'", params)
try:
ret = cur.fetchone() != None
except:
ret = False
con.close()
return ret
def is_user_testing(telegram_user_id):
con = sqlite3.connect(constants.DB)
cur = con.cursor()
con = connect_db()
cur = con.cursor(prepared=True)
params = (telegram_user_id,)
res = cur.execute(f"SELECT * FROM users WHERE telegram_user_id=? AND active=True AND type='testing'", params)
ret = res.fetchone() != None
cur.execute(f"SELECT * FROM users WHERE telegram_user_id=%s AND active=True AND type='testing'", params)
try:
ret = cur.fetchone() != None
except:
ret = False
con.close()
return ret
def add_premium_user(telegram_user_id, until):
found = False
con = sqlite3.connect(constants.DB)
cur = con.cursor()
res = cur.execute(f"SELECT * FROM users WHERE telegram_user_id=?", params)
con = connect_db()
cur = con.cursor(prepared=True)
res = cur.execute(f"SELECT * FROM users WHERE telegram_user_id=%s", params)
if res.fetchone() != None:
params = (until, telegram_user_id)
cur.execute(f"UPDATE users SET active = True, type = 'premium', until = ? WHERE telegram_user_id=?", params)
cur.execute(f"UPDATE users SET active = True, type = 'premium', until = %s WHERE telegram_user_id=%s", params)
con.commit()
found = True
con.close()
@@ -81,13 +144,13 @@ def add_premium_user(telegram_user_id, until):
def add_test_user(telegram_user_id, telegram_name, until):
found = False
con = sqlite3.connect(constants.DB)
cur = con.cursor()
con = connect_db()
cur = con.cursor(prepared=True)
params = (telegram_user_id,)
res = cur.execute(f"SELECT * FROM users WHERE telegram_user_id=?", params)
res = cur.execute(f"SELECT * FROM users WHERE telegram_user_id=%s", params)
if res.fetchone() is None:
params = (telegram_user_id, True, 'testing', until, telegram_name.first_name)
cur.execute("INSERT INTO users VALUES (?, ?, ?, ?, ?)", params)
cur.execute("INSERT INTO users VALUES (%s, %s, %s, %s, %s)", params)
con.commit()
else:
found = True
@@ -96,84 +159,100 @@ def add_test_user(telegram_user_id, telegram_name, until):
return not found
def remove_valid_user(telegram_user_id):
con = sqlite3.connect(constants.DB)
cur = con.cursor()
con = connect_db()
cur = con.cursor(prepared=True)
params = (telegram_user_id,)
res = cur.execute(f"SELECT * FROM users WHERE telegram_user_id=?", params)
res = cur.execute(f"SELECT * FROM users WHERE telegram_user_id=%s", params)
if res.fetchone() != None:
cur.execute(f"UPDATE users SET active = False WHERE telegram_user_id=?", params)
cur.execute(f"UPDATE users SET active = False WHERE telegram_user_id=%s", params)
con.commit()
con.close()
logging.info(f"De-activated user {telegram_user_id}")
def get_user_list():
con = sqlite3.connect(constants.DB)
cur = con.cursor()
con = connect_db()
cur = con.cursor(prepared=True)
res = cur.execute(f"SELECT * FROM users")
ret = res.fetchall()
con.close()
return ret
def get_user(telegram_user_id):
con = sqlite3.connect(constants.DB)
con.row_factory = dict_factory
cur = con.cursor()
con = connect_db()
#con.row_factory = dict_factory
cur = con.cursor(dictionary=True, prepared=True)
params = (telegram_user_id,)
res = cur.execute(f"SELECT telegram_name FROM users WHERE telegram_user_id=?", params)
if res != None:
ret = res.fetchone()['telegram_name']
else:
cur.execute(f"SELECT telegram_name FROM users WHERE telegram_user_id=%s", params)
try:
ret = cur.fetchone()['telegram_name']
except Exception as e:
logging.error(f"Couldn't find username with id {telegram_user_id}: {e}")
ret = 'NoName'
con.close()
return ret
def get_user_type(telegram_user_id):
con = sqlite3.connect(constants.DB)
cur = con.cursor()
con = connect_db()
cur = con.cursor(prepared=True)
params = (telegram_user_id,)
res = cur.execute(f"SELECT type FROM users WHERE telegram_user_id=?", params)
ret = res.fetchone()
cur.execute(f"SELECT type FROM users WHERE telegram_user_id=%s", params)
try:
ret = cur.fetchone()[0]
except:
ret = None
con.close()
return ret[0]
return ret
def get_user_until(telegram_user_id):
con = sqlite3.connect(constants.DB)
cur = con.cursor()
con = connect_db()
cur = con.cursor(prepared=True)
params = (telegram_user_id,)
res = cur.execute(f"SELECT until FROM users WHERE telegram_user_id=?", params)
ret = res.fetchone()
cur.execute(f"SELECT until FROM users WHERE telegram_user_id=%s", params)
try:
ret = cur.fetchone()[0]
except:
ret = None
con.close()
return ret[0]
return ret
def get_product(product):
product_name = product.get('product_name').lower()
telegram_user_id = product.get('telegram_user_id')
con = sqlite3.connect(constants.DB)
con.row_factory = dict_factory
cur = con.cursor()
con = connect_db()
#con.row_factory = dict_factory
cur = con.cursor(dictionary=True, prepared=True)
params = (telegram_user_id, product_name)
res = cur.execute(f"SELECT * FROM products WHERE telegram_user_id=? \
AND product_name=?", params)
ret = res.fetchone()
cur.execute(f"SELECT * FROM products WHERE telegram_user_id=%s \
AND product_name=%s", params)
try:
ret = cur.fetchone()
except:
ret = None
con.close()
return ret
def get_products_from_user(telegram_user_id):
con = sqlite3.connect(constants.DB)
con.row_factory = dict_factory
cur = con.cursor()
con = connect_db()
#con.row_factory = dict_factory
cur = con.cursor(dictionary=True, prepared=True)
params = (telegram_user_id,)
res = cur.execute(f"SELECT * FROM products WHERE telegram_user_id=?", params)
ret = res.fetchall()
cur.execute(f"SELECT * FROM products WHERE telegram_user_id=%s", params)
try:
ret = cur.fetchall()
except:
ret = None
con.close()
return ret
def get_all_products():
con = sqlite3.connect(constants.DB)
con.row_factory = dict_factory
cur = con.cursor()
res = cur.execute(f"SELECT * FROM products")
ret = res.fetchall()
con = connect_db()
#con.row_factory = dict_factory
cur = con.cursor(dictionary=True, prepared=True)
cur.execute(f"SELECT * FROM products")
try:
ret = cur.fetchall()
except:
ret = None
con.close()
return ret
@@ -181,6 +260,8 @@ def add_product(product):
condition = 'all'
product_name = product.get('product_name').lower()
distance = product.get('distance', 0)
if int(distance) > 1000000000:
distance = 1000000000
latitude = product.get('latitude', constants.LATITUDE)
longitude = product.get('longitude', constants.LONGITUDE)
min_price = product.get('min_price')
@@ -193,17 +274,14 @@ def add_product(product):
category = ''
telegram_user_id = product.get('telegram_user_id')
logging.info(f"Trying to add: {product_name}, {telegram_user_id}")
con = sqlite3.connect(constants.DB)
cur = con.cursor()
params = (telegram_user_id, product_name)
res = cur.execute(f"SELECT * FROM products WHERE telegram_user_id=? \
AND product_name=?", params)
if res.fetchone() is None:
params = (product_name, \
distance, latitude, longitude, condition, min_price, \
max_price, category, subcategory, title_exclude, title_description_exclude, telegram_user_id)
cur.execute("INSERT INTO products VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", params)
con.commit()
con = connect_db()
cur = con.cursor(prepared=True)
params = (product_name, \
distance, latitude, longitude, condition, min_price, \
max_price, category, subcategory, title_exclude, title_description_exclude, telegram_user_id)
cur.execute("INSERT INTO `products` (`product_name`, `distance`, `latitude`, `longitude`, `condition`, `min_price`, `max_price`, `category`, `subcategory`, `title_exclude`, `title_description_exclude`, `telegram_user_id`) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)", params)
con.commit()
logging.info(f"{product_name} added for {telegram_user_id}")
con.close()
def remove_product(product):
@@ -211,11 +289,11 @@ def remove_product(product):
product_name = product.get('product_name').lower()
removed = False
if get_product(product):
con = sqlite3.connect(constants.DB)
cur = con.cursor()
con = connect_db()
cur = con.cursor(prepared=True)
params = (telegram_user_id, product_name)
res = cur.execute(f"DELETE FROM products WHERE telegram_user_id=? \
AND product_name=?", params)
cur.execute(f"DELETE FROM products WHERE telegram_user_id=%s \
AND product_name=%s", params)
con.commit()
con.close()
logging.info(f"Removed product {product['product_name']}")
@@ -223,10 +301,13 @@ def remove_product(product):
return removed
def count_user_products(telegram_user_id):
con = sqlite3.connect(constants.DB)
cur = con.cursor()
con = connect_db()
cur = con.cursor(prepared=True)
params = (telegram_user_id,)
res = cur.execute(f"SELECT Count() FROM products WHERE telegram_user_id=?", params)
ret = res.fetchone()[0]
cur.execute(f"SELECT Count(*) FROM products WHERE telegram_user_id=%s", params)
try:
ret = cur.fetchone()[0]
except:
ret = None
con.close()
return ret

View File

@@ -1,7 +1,5 @@
import json
import threading
import logging
import prettytable
import helpers
import walladb
import constants
@@ -20,6 +18,8 @@ from telegram.ext import (
filters
)
SEARCH_THREADS_LIST = []
ACTION, ADD_PRODUCT_NAME, ADD_PRODUCT_MIN_PRICE, ADD_PRODUCT_MAX_PRICE, \
ADD_PRODUCT_CATEGORY, ADD_PRODUCT_TITLE_EXCLUDE, ADD_PRODUCT_DESCRIPTION_EXCLUDE, \
ADD_PRODUCT_COORDS, ADD_PRODUCT_DISTANCE, REMOVE_PRODUCT, LIST, FINISH, CONTINUE_OR_FINISH = range(13)
@@ -304,6 +304,7 @@ async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
return ConversationHandler.END
async def remove_product(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
global SEARCH_THREADS_LIST
await update.callback_query.edit_message_reply_markup(None)
query = update.callback_query
product_name = query.data
@@ -312,6 +313,9 @@ async def remove_product(update: Update, context: ContextTypes.DEFAULT_TYPE) ->
if walladb.remove_product({'product_name' : product_name, \
'telegram_user_id' : telegram_user_id}):
message = f"¡{product_name} borrado de la lista de seguimiento!"
product_thread = helpers.get_thread(product_name)
if product_thread != None:
product_thread.stop()
await context.bot.send_message(chat_id=update.effective_chat.id, text=message)
return ConversationHandler.END
@@ -324,8 +328,8 @@ async def product_details(update: Update, context: ContextTypes.DEFAULT_TYPE) ->
if product:
categories = helpers.generate_categories_string(product['category'], product['subcategory'])
text = f"*Nombre del producto:* {helpers.telegram_escape_characters(product['product_name'])}\n\
*Precio desde *{product['min_price']}€ *hasta *{product['max_price']}\n\
*En las coordenadas *{helpers.telegram_escape_characters(product['latitude'])}, {helpers.telegram_escape_characters(product['longitude'])} *y a *{product['distance']}km *de estas*\n\
*Precio desde *{helpers.telegram_escape_characters(str(product['min_price']))}€ *hasta *{helpers.telegram_escape_characters(str(product['max_price']))}\n\
*En las coordenadas *{helpers.telegram_escape_characters(str(product['latitude']))}, {helpers.telegram_escape_characters(str(product['longitude']))} *y a *{product['distance']}km *de estas*\n\
*En las categorías: *{helpers.telegram_escape_characters(categories)}\n\
*Palabras excluídas del título: *`{helpers.telegram_escape_characters(product['title_exclude'])}`\n\
*Palabras excluídas del título y la descripción: *`{helpers.telegram_escape_characters(product['title_description_exclude'])}`"
@@ -376,7 +380,7 @@ async def status_command(update: Update, context: ContextTypes.DEFAULT_TYPE) ->
if walladb.is_user_valid(telegram_user_id):
type = walladb.get_user_type(telegram_user_id)
until = walladb.get_user_until(telegram_user_id)
message = f"Tu cuenta es tipo: {type} y caduca el {until}."
message = f"Tu cuenta es tipo: {type} y caduca el {helpers.get_spanish_date(until)}."
await update.message.reply_markdown_v2(helpers.telegram_escape_characters(message))
async def test_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
@@ -385,7 +389,7 @@ async def test_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> No
if not walladb.is_user_valid(telegram_user_id):
until = helpers.get_date_ahead(7)
walladb.add_test_user(telegram_user_id, telegram_user_name, until)
message = f"Periodo de prueba activado hasta el {until}."
message = f"Periodo de prueba activado hasta el {helpers.get_spanish_date(until)}."
else:
message = "Ya has utilizado el periodo de prueba."
if walladb.is_user_testing(telegram_user_id):
@@ -395,10 +399,12 @@ async def test_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> No
await update.message.reply_markdown_v2(helpers.telegram_escape_characters(message))
async def add_to_db_and_send(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
global SEARCH_THREADS_LIST
logging.info(f"Adding product with context: {context.user_data}")
walladb.add_product(context.user_data)
p = threading.Thread(target=Worker.run, args=(walladb.get_product(context.user_data), ))
p.start()
SEARCH_THREADS_LIST.append((context.user_data, p))
await context.bot.send_message(chat_id=update.effective_chat.id, text=f"¡*{helpers.telegram_escape_characters(context.user_data['product_name'])}* añadido correctamente\!", parse_mode=ParseMode.MARKDOWN_V2)
def error(update, context):
@@ -410,7 +416,25 @@ async def conv_timeout(update: Update, context: ContextTypes.DEFAULT_TYPE) -> No
async def conv_finish(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
await context.bot.send_message(chat_id=update.effective_chat.id, text="Vuelve a usar el menú si quieres añadir otro producto.")
async def list_threads(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if helpers.is_user_admin(update.message.chat_id):
global SEARCH_THREADS_LIST
tmp_search_threads_list = []
threads_string = ""
for thread in SEARCH_THREADS_LIST:
if thread[1].is_alive():
tmp_search_threads_list.append((thread[0], thread[1]))
threads_string = threads_string + f"{thread[0]['product_name']} - {thread[0]['telegram_user_id']}\n"
SEARCH_THREADS_LIST = tmp_search_threads_list
if len(threads_string) > 2000:
my_strings = [(threads_string[i:i+2000]) for i in range(0, len(threads_string), 2000)]
for my_string in my_strings:
await update.message.reply_markdown_v2(helpers.telegram_escape_characters(f"{my_string}"))
else:
await update.message.reply_markdown_v2(helpers.telegram_escape_characters(f"{threads_string}"))
def main()->None:
global SEARCH_THREADS_LIST
walladb.setup_db()
products = walladb.get_all_products()
@@ -418,6 +442,7 @@ def main()->None:
logging.info(product)
p = threading.Thread(target=Worker.run, args=(product, ))
p.start()
SEARCH_THREADS_LIST.append((product, p))
"""Start the bot."""
# Create the Application and pass it your bot's token.
@@ -430,6 +455,7 @@ def main()->None:
application.add_handler(CommandHandler("remove_user", remove_user_command))
application.add_handler(CommandHandler("status", status_command))
application.add_handler(CommandHandler("test", test_command))
application.add_handler(CommandHandler("list_threads", list_threads))
#application.add_handler(CallbackQueryHandler("list", send_list()))
#application.add_handler(CallbackQueryHandler(pattern="list", callback=send_list()))

View File

@@ -15,6 +15,8 @@ logger = logging.getLogger(__name__)
class Worker:
_stop = False
def is_valid_request(self, product):
is_valid = False
if walladb.get_product(product):
@@ -26,6 +28,7 @@ class Worker:
return is_valid
def request(self, product_name, n_articles, latitude=constants.LATITUDE, longitude=constants.LONGITUDE, distance='0', condition='all', min_price=0, max_price=10000000, category="", subcategories=[]):
distance = str(int(distance) * 1000)
url = (f"http://api.wallapop.com/api/v3/general/search?keywords={product_name}"
f"&order_by=newest&latitude={latitude}"
f"&longitude={longitude}"
@@ -46,10 +49,13 @@ class Worker:
url_subcategories = url_subcategories + f"{subcategory},"
url = url + f"&object_type_ids={url_subcategories[:-1]}"
search_objects = list()
for step in range(15):
while True:
helpers.random_wait()
response = requests.get(url+f"&step={step+1}")
search_objects = search_objects + response.json()['search_objects']
try:
if response.status_code == 200:
break
@@ -59,8 +65,7 @@ class Worker:
logging.info("Exception: " + e)
time.sleep(3)
json_data = response.json()
return json_data['search_objects']
return search_objects
def first_run(self, product):
logging.info(f"First run for {product['product_name']} for {walladb.get_user(product['telegram_user_id'])}")
@@ -98,7 +103,7 @@ class Worker:
helpers.random_wait() # Random wait to make requests separated in time in order to prevent API rate limit
exec_times = []
while True:
if not self.is_valid_request(product):
if not self.is_valid_request(product) or self._stop == True:
logging.info(f"{product['product_name']} not valid anymore, exiting worker")
break # Exits and ends worker thread
start_time = time.time()
@@ -188,9 +193,15 @@ class Worker:
#time.sleep(10)
logging.info(f"Wallapop monitor worker stopped for: \'{product['product_name']}\'")
def stop(self):
self._stop = True
def run(product):
worker = Worker()
list = worker.first_run(product)
try:
list = worker.first_run(product)
except:
logging.info(f"{product['product_name']} worker crashed.")
#time.sleep(constants.SLEEP_TIME)
while True:
try: