Modified a lot of things. Now it works with ThreadPoolExecutor

This commit is contained in:
Joan Cano
2023-08-23 16:08:16 +02:00
parent 01dc0f2695
commit 454b1414b6
6 changed files with 327 additions and 406 deletions

View File

@@ -15,6 +15,7 @@ LONGITUDE = os.getenv("LONGITUDE")
SLEEP_TIME = int(os.getenv("SLEEP_TIME")) SLEEP_TIME = int(os.getenv("SLEEP_TIME"))
NEW_RELIC_INSERT_KEY = os.getenv("NEW_RELIC_INSERT_KEY") NEW_RELIC_INSERT_KEY = os.getenv("NEW_RELIC_INSERT_KEY")
NR_ENV = os.getenv("NR_ENV") NR_ENV = os.getenv("NR_ENV")
MAX_WORKERS = 12

View File

@@ -5,7 +5,6 @@ import logging
import constants import constants
import pytz import pytz
import walladb import walladb
import asyncio
import json import json
from newrelic_telemetry_sdk import Event, EventClient from newrelic_telemetry_sdk import Event, EventClient
@@ -60,9 +59,6 @@ def is_date_expired(until):
def random_wait(): def random_wait():
time.sleep(random.random()) time.sleep(random.random())
async def random_async_wait():
await asyncio.sleep(random.random())
def download_image(article): def download_image(article):
r = requests.get(article['images'][0]['original']) r = requests.get(article['images'][0]['original'])
if r.status_code == 200: if r.status_code == 200:
@@ -179,12 +175,17 @@ def send_article(article, product):
} }
response = requests.post(f'https://api.telegram.org/bot{constants.TELEGRAM_TOKEN}/sendPhoto', files=files) response = requests.post(f'https://api.telegram.org/bot{constants.TELEGRAM_TOKEN}/sendPhoto', files=files)
while response.status_code != 200:
logging.info(f"Error sending to Telegram, probably flood restricted. {product['product_name']} ({product['id']}) - ({walladb.get_user(product['telegram_user_id'])}")
random_wait()
response = requests.post(f'https://api.telegram.org/bot{constants.TELEGRAM_TOKEN}/sendPhoto', files=files)
image.close() image.close()
#response = await application.bot.send_photo(chat_id=product['telegram_user_id'], photo=open(f"/app/data/images/products/{article['id']}_composed.png", 'rb'), caption=text, parse_mode=ParseMode.MARKDOWN_V2, reply_markup=markup) #response = await application.bot.send_photo(chat_id=product['telegram_user_id'], photo=open(f"/app/data/images/products/{article['id']}_composed.png", 'rb'), caption=text, parse_mode=ParseMode.MARKDOWN_V2, reply_markup=markup)
#logging.info(requests.post(url, files=files).content) #logging.info(requests.post(url, files=files).content)
send_to_nr(article, product) send_to_nr(article, product)
logging.info(response.content) #logging.info(response.content)
logging.info(f"'{title}' (https://es.wallapop.com/item/{article['web_slug']}) sent to {walladb.get_user(product['telegram_user_id'])}")
def get_category_name(category): def get_category_name(category):
category = int(category) category = int(category)

View File

@@ -1,24 +1,52 @@
import threading
import time import time
import concurrent.futures import concurrent.futures
import logging
import worker import worker
import walladb import walladb
import helpers
import constants import constants
def work(): # Enable logging
searches = [] logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO
)
logger = logging.getLogger(__name__)
def work():
searches = {}
while True:
products = walladb.get_all_valid_products() products = walladb.get_all_valid_products()
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: first_run_products = []
future_to_product = {executor.submit(worker.Worker.first_run, product, 60): product for product in products}
for product in products:
try:
searches[product['id']]
except:
logging.info(f"New product added: {product['product_name']} ({product['id']}) - ({walladb.get_user(product['telegram_user_id'])}")
first_run_products.append(product)
if len(first_run_products) > 0:
with concurrent.futures.ThreadPoolExecutor(max_workers=constants.MAX_WORKERS) as executor:
future_to_product = {executor.submit(worker.first_run, product): product for product in first_run_products}
for future in concurrent.futures.as_completed(future_to_product): for future in concurrent.futures.as_completed(future_to_product):
product = future_to_product[future] product = future_to_product[future]
try: try:
searches[product['id']] = future.result() searches[product['id']] = future.result()
#first_run_list = future.result()
except Exception as e: except Exception as e:
print(f"Error occurred: {e}") logging.info(f"Error occurred in first run for {product['product_name']} ({product['id']}) - ({walladb.get_user(product['telegram_user_id'])}): {e}")
else: else:
print(f"First run for {product['name']} - {product['id']} got: {searches[product['id']]}") logging.info(f"First run for {product['product_name']} ({product['id']}) - ({walladb.get_user(product['telegram_user_id'])}) got: {len(searches[product['id']])}")
with concurrent.futures.ThreadPoolExecutor(max_workers=constants.MAX_WORKERS) as executor:
future_to_search = {executor.submit(worker.work, product, searches[product['id']]): product for product in products}
for future in concurrent.futures.as_completed(future_to_search):
product = future_to_search[future]
try:
searches[product['id']] = future.result()
except Exception as e:
logging.info(f"Error occurred in search for {product['product_name']} ({product['id']}) - ({walladb.get_user(product['telegram_user_id'])}): {e}")
else:
logging.info(f"Search for {product['product_name']} ({product['id']}) - ({walladb.get_user(product['telegram_user_id'])})")
time.sleep(constants.SLEEP_TIME)

View File

@@ -58,8 +58,8 @@ def setup_db():
" `created_at` timestamp DEFAULT CURRENT_TIMESTAMP" " `created_at` timestamp DEFAULT CURRENT_TIMESTAMP"
") ENGINE=InnoDB") ") ENGINE=InnoDB")
con = connect_db() with connect_db() as con:
cur = con.cursor(prepared=True) with con.cursor(prepared=True) as cur:
for table_name in TABLES: for table_name in TABLES:
table_description = TABLES[table_name] table_description = TABLES[table_name]
@@ -73,27 +73,24 @@ def setup_db():
logging.info(err.msg) logging.info(err.msg)
else: else:
logging.info("OK") logging.info("OK")
cur.close()
con.close()
def is_user_valid(telegram_user_id): def is_user_valid(telegram_user_id):
if telegram_user_id < 0: if telegram_user_id < 0:
ret = False ret = False
else: else:
con = connect_db() with connect_db() as con:
cur = con.cursor(prepared=True) with con.cursor(prepared=True) as cur:
params = (telegram_user_id,) params = (telegram_user_id,)
cur.execute(f"SELECT * FROM users WHERE telegram_user_id=%s AND active=True", params) cur.execute(f"SELECT * FROM users WHERE telegram_user_id=%s AND active=True", params)
try: try:
ret = cur.fetchone() != None ret = cur.fetchone() != None
except: except:
ret = False ret = False
con.close()
return ret return ret
def is_user_expired(telegram_user_id): def is_user_expired(telegram_user_id):
con = connect_db() with connect_db() as con:
cur = con.cursor(prepared=True) with con.cursor(prepared=True) as cur:
params = (telegram_user_id,) params = (telegram_user_id,)
res = cur.execute(f"SELECT until FROM users WHERE telegram_user_id=%s", params) res = cur.execute(f"SELECT until FROM users WHERE telegram_user_id=%s", params)
try: try:
@@ -104,51 +101,47 @@ def is_user_expired(telegram_user_id):
if q_res != None: if q_res != None:
if not helpers.is_date_expired(q_res[0]): if not helpers.is_date_expired(q_res[0]):
ret = False ret = False
con.close()
return ret return ret
def is_user_premium(telegram_user_id): def is_user_premium(telegram_user_id):
con = connect_db() with connect_db() as con:
cur = con.cursor(prepared=True) with con.cursor(prepared=True) as cur:
params = (telegram_user_id,) params = (telegram_user_id,)
cur.execute(f"SELECT * FROM users WHERE telegram_user_id=%s AND active=True AND type='premium'", params) cur.execute(f"SELECT * FROM users WHERE telegram_user_id=%s AND active=True AND type='premium'", params)
try: try:
ret = cur.fetchone() != None ret = cur.fetchone() != None
except: except:
ret = False ret = False
con.close()
return ret return ret
def is_user_testing(telegram_user_id): def is_user_testing(telegram_user_id):
con = connect_db() with connect_db() as con:
cur = con.cursor(prepared=True) with con.cursor(prepared=True) as cur:
params = (telegram_user_id,) params = (telegram_user_id,)
cur.execute(f"SELECT * FROM users WHERE telegram_user_id=%s AND active=True AND type='testing'", params) cur.execute(f"SELECT * FROM users WHERE telegram_user_id=%s AND active=True AND type='testing'", params)
try: try:
ret = cur.fetchone() != None ret = cur.fetchone() != None
except: except:
ret = False ret = False
con.close()
return ret return ret
def add_premium_user(telegram_user_id, until): def add_premium_user(telegram_user_id, until):
found = False found = False
con = connect_db() with connect_db() as con:
cur = con.cursor(prepared=True) with con.cursor(prepared=True) as cur:
res = cur.execute(f"SELECT * FROM users WHERE telegram_user_id=%s", params) res = cur.execute(f"SELECT * FROM users WHERE telegram_user_id=%s", params)
if res.fetchone() != None: if res.fetchone() != None:
params = (until, telegram_user_id) params = (until, telegram_user_id)
cur.execute(f"UPDATE users SET active = True, type = 'premium', until = %s WHERE telegram_user_id=%s", params) cur.execute(f"UPDATE users SET active = True, type = 'premium', until = %s WHERE telegram_user_id=%s", params)
con.commit() con.commit()
found = True found = True
con.close()
logging.info(f"Added premium user {telegram_user_id} until {until}") logging.info(f"Added premium user {telegram_user_id} until {until}")
return found return found
def add_test_user(telegram_user_id, telegram_name, until): def add_test_user(telegram_user_id, telegram_name, until):
found = False found = False
con = connect_db() with connect_db() as con:
cur = con.cursor(prepared=True) with con.cursor(prepared=True) as cur:
params = (telegram_user_id,) params = (telegram_user_id,)
cur.execute(f"SELECT * FROM users WHERE telegram_user_id=%s", params) cur.execute(f"SELECT * FROM users WHERE telegram_user_id=%s", params)
try: try:
@@ -159,33 +152,29 @@ def add_test_user(telegram_user_id, telegram_name, until):
found = True found = True
except Exception as e: except Exception as e:
logging.error(f"Couldn't find username with id {telegram_user_id}: {e}") logging.error(f"Couldn't find username with id {telegram_user_id}: {e}")
con.close()
logging.info(f"Added test user {telegram_user_id} until {until}") logging.info(f"Added test user {telegram_user_id} until {until}")
return not found return not found
def remove_valid_user(telegram_user_id): def remove_valid_user(telegram_user_id):
con = connect_db() with connect_db() as con:
cur = con.cursor(prepared=True) with con.cursor(prepared=True) as cur:
params = (telegram_user_id,) params = (telegram_user_id,)
res = cur.execute(f"SELECT * FROM users WHERE telegram_user_id=%s", params) res = cur.execute(f"SELECT * FROM users WHERE telegram_user_id=%s", params)
if res.fetchone() != None: if res.fetchone() != None:
cur.execute(f"UPDATE users SET active = False WHERE telegram_user_id=%s", params) cur.execute(f"UPDATE users SET active = False WHERE telegram_user_id=%s", params)
con.commit() con.commit()
con.close()
logging.info(f"De-activated user {telegram_user_id}") logging.info(f"De-activated user {telegram_user_id}")
def get_user_list(): def get_user_list():
con = connect_db() with connect_db() as con:
cur = con.cursor(dictionary=True, prepared=True) with con.cursor(dictionary=True, prepared=True) as cur:
res = cur.execute(f"SELECT * FROM users") res = cur.execute(f"SELECT * FROM users")
ret = cur.fetchall() ret = cur.fetchall()
con.close()
return ret return ret
def get_user(telegram_user_id): def get_user(telegram_user_id):
con = connect_db() with connect_db() as con:
#con.row_factory = dict_factory with con.cursor(dictionary=True, prepared=True) as cur:
cur = con.cursor(dictionary=True, prepared=True)
params = (telegram_user_id,) params = (telegram_user_id,)
cur.execute(f"SELECT telegram_name FROM users WHERE telegram_user_id=%s", params) cur.execute(f"SELECT telegram_name FROM users WHERE telegram_user_id=%s", params)
try: try:
@@ -193,57 +182,51 @@ def get_user(telegram_user_id):
except Exception as e: except Exception as e:
logging.error(f"Couldn't find username with id {telegram_user_id}: {e}") logging.error(f"Couldn't find username with id {telegram_user_id}: {e}")
ret = 'NoName' ret = 'NoName'
con.close()
return ret return ret
def get_user_type(telegram_user_id): def get_user_type(telegram_user_id):
con = connect_db() with connect_db() as con:
cur = con.cursor(prepared=True) with con.cursor(prepared=True) as cur:
params = (telegram_user_id,) params = (telegram_user_id,)
cur.execute(f"SELECT type FROM users WHERE telegram_user_id=%s", params) cur.execute(f"SELECT type FROM users WHERE telegram_user_id=%s", params)
try: try:
ret = cur.fetchone()[0] ret = cur.fetchone()[0]
except: except:
ret = None ret = None
con.close()
return ret return ret
def get_user_until(telegram_user_id): def get_user_until(telegram_user_id):
con = connect_db() with connect_db() as con:
cur = con.cursor(prepared=True) with con.cursor(prepared=True) as cur:
params = (telegram_user_id,) params = (telegram_user_id,)
cur.execute(f"SELECT until FROM users WHERE telegram_user_id=%s", params) cur.execute(f"SELECT until FROM users WHERE telegram_user_id=%s", params)
try: try:
ret = cur.fetchone()[0] ret = cur.fetchone()[0]
except: except:
ret = None ret = None
con.close()
return ret return ret
def deactivate_user(telegram_user_id): def deactivate_user(telegram_user_id):
con = connect_db() with connect_db() as con:
cur = con.cursor(prepared=True) with con.cursor(prepared=True) as cur:
params = (telegram_user_id,) params = (telegram_user_id,)
cur.execute(f"UPDATE users SET active=False WHERE telegram_user_id=%s", params) cur.execute(f"UPDATE users SET active=False WHERE telegram_user_id=%s", params)
con.commit() con.commit()
con.close()
logging.info(f"De-activated user {get_user(telegram_user_id)} ({telegram_user_id})") logging.info(f"De-activated user {get_user(telegram_user_id)} ({telegram_user_id})")
def activate_user(telegram_user_id): def activate_user(telegram_user_id):
con = connect_db() with connect_db() as con:
cur = con.cursor(prepared=True) with con.cursor(prepared=True) as cur:
params = (telegram_user_id,) params = (telegram_user_id,)
cur.execute(f"UPDATE users SET active=True WHERE telegram_user_id=%s", params) cur.execute(f"UPDATE users SET active=True WHERE telegram_user_id=%s", params)
con.commit() con.commit()
con.close()
logging.info(f"Activated user {get_user(telegram_user_id)} ({telegram_user_id})") logging.info(f"Activated user {get_user(telegram_user_id)} ({telegram_user_id})")
def get_product(product): def get_product(product):
product_name = product.get('product_name').lower() product_name = product.get('product_name').lower()
telegram_user_id = product.get('telegram_user_id') telegram_user_id = product.get('telegram_user_id')
con = connect_db() with connect_db() as con:
#con.row_factory = dict_factory with con.cursor(dictionary=True, prepared=True) as cur:
cur = con.cursor(dictionary=True, prepared=True)
params = (telegram_user_id, product_name) params = (telegram_user_id, product_name)
cur.execute(f"SELECT * FROM products WHERE telegram_user_id=%s \ cur.execute(f"SELECT * FROM products WHERE telegram_user_id=%s \
AND product_name=%s", params) AND product_name=%s", params)
@@ -251,37 +234,31 @@ def get_product(product):
ret = cur.fetchone() ret = cur.fetchone()
except: except:
ret = None ret = None
con.close()
return ret return ret
def get_products_from_user(telegram_user_id): def get_products_from_user(telegram_user_id):
con = connect_db() with connect_db() as con:
#con.row_factory = dict_factory with con.cursor(dictionary=True, prepared=True) as cur:
cur = con.cursor(dictionary=True, prepared=True)
params = (telegram_user_id,) params = (telegram_user_id,)
cur.execute(f"SELECT * FROM products WHERE telegram_user_id=%s", params) cur.execute(f"SELECT * FROM products WHERE telegram_user_id=%s", params)
try: try:
ret = cur.fetchall() ret = cur.fetchall()
except: except:
ret = None ret = None
con.close()
return ret return ret
def get_all_products(): def get_all_products():
con = connect_db() with connect_db() as con:
#con.row_factory = dict_factory with con.cursor(dictionary=True, prepared=True) as cur:
cur = con.cursor(dictionary=True, prepared=True)
cur.execute(f"SELECT * FROM products") cur.execute(f"SELECT * FROM products")
try: try:
ret = cur.fetchall() ret = cur.fetchall()
except: except:
ret = None ret = None
con.close()
return ret return ret
def get_all_valid_products(): def get_all_valid_products():
with connect_db() as con: with connect_db() as con:
#con.row_factory = dict_factory
with con.cursor(dictionary=True, prepared=True) as cur: with con.cursor(dictionary=True, prepared=True) as cur:
cur.execute(f"SELECT products.* FROM products WHERE products.telegram_user_id IN (SELECT users.telegram_user_id FROM users WHERE active = 1)") cur.execute(f"SELECT products.* FROM products WHERE products.telegram_user_id IN (SELECT users.telegram_user_id FROM users WHERE active = 1)")
try: try:
@@ -308,40 +285,37 @@ def add_product(product):
category = '' category = ''
telegram_user_id = product.get('telegram_user_id') telegram_user_id = product.get('telegram_user_id')
logging.info(f"Trying to add: {product_name}, {telegram_user_id}") logging.info(f"Trying to add: {product_name}, {telegram_user_id}")
con = connect_db() with connect_db() as con:
cur = con.cursor(prepared=True) with con.cursor(prepared=True) as cur:
params = (product_name, \ params = (product_name, \
distance, latitude, longitude, condition, min_price, \ distance, latitude, longitude, condition, min_price, \
max_price, category, subcategory, title_exclude, title_description_exclude, telegram_user_id) max_price, category, subcategory, title_exclude, title_description_exclude, telegram_user_id)
cur.execute("INSERT INTO `products` (`product_name`, `distance`, `latitude`, `longitude`, `condition`, `min_price`, `max_price`, `category`, `subcategory`, `title_exclude`, `title_description_exclude`, `telegram_user_id`) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)", params) cur.execute("INSERT INTO `products` (`product_name`, `distance`, `latitude`, `longitude`, `condition`, `min_price`, `max_price`, `category`, `subcategory`, `title_exclude`, `title_description_exclude`, `telegram_user_id`) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)", params)
con.commit() con.commit()
logging.info(f"{product_name} added for {telegram_user_id}") logging.info(f"{product_name} added for {telegram_user_id}")
con.close()
def remove_product(product): def remove_product(product):
telegram_user_id = product.get('telegram_user_id') telegram_user_id = product.get('telegram_user_id')
product_name = product.get('product_name').lower() product_name = product.get('product_name').lower()
removed = False removed = False
if get_product(product): if get_product(product):
con = connect_db() with connect_db() as con:
cur = con.cursor(prepared=True) with con.cursor(prepared=True) as cur:
params = (telegram_user_id, product_name) params = (telegram_user_id, product_name)
cur.execute(f"DELETE FROM products WHERE telegram_user_id=%s \ cur.execute(f"DELETE FROM products WHERE telegram_user_id=%s \
AND product_name=%s", params) AND product_name=%s", params)
con.commit() con.commit()
con.close()
logging.info(f"Removed product {product['product_name']}") logging.info(f"Removed product {product['product_name']}")
removed = True removed = True
return removed return removed
def count_user_products(telegram_user_id): def count_user_products(telegram_user_id):
con = connect_db() with connect_db() as con:
cur = con.cursor(prepared=True) with con.cursor(prepared=True) as cur:
params = (telegram_user_id,) params = (telegram_user_id,)
cur.execute(f"SELECT Count(*) FROM products WHERE telegram_user_id=%s", params) cur.execute(f"SELECT Count(*) FROM products WHERE telegram_user_id=%s", params)
try: try:
ret = cur.fetchone()[0] ret = cur.fetchone()[0]
except: except:
ret = None ret = None
con.close()
return ret return ret

View File

@@ -5,8 +5,8 @@ import walladb
import constants import constants
import account_checker import account_checker
import time import time
import search_manager
from worker import Worker
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, ForceReply from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, ForceReply
from telegram.constants import ParseMode from telegram.constants import ParseMode
from telegram.ext import ( from telegram.ext import (
@@ -33,6 +33,9 @@ logging.basicConfig(
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
httpx_logger = logging.getLogger('httpx')
httpx_logger.setLevel(logging.WARNING)
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if walladb.is_user_valid(helpers.get_telegram_user_id(update)): if walladb.is_user_valid(helpers.get_telegram_user_id(update)):
message = "Utiliza el menú de la conversación para añadir un producto y sigue los pasos indicados. Si tienes cualquier duda contacta con @jocarduck para más información." message = "Utiliza el menú de la conversación para añadir un producto y sigue los pasos indicados. Si tienes cualquier duda contacta con @jocarduck para más información."
@@ -315,9 +318,6 @@ async def remove_product(update: Update, context: ContextTypes.DEFAULT_TYPE) ->
if walladb.remove_product({'product_name' : product_name, \ if walladb.remove_product({'product_name' : product_name, \
'telegram_user_id' : telegram_user_id}): 'telegram_user_id' : telegram_user_id}):
message = f"¡{product_name} borrado de la lista de seguimiento!" message = f"¡{product_name} borrado de la lista de seguimiento!"
#product_thread = helpers.get_thread(product_name)
#if product_thread != None:
# product_thread.stop()
await context.bot.send_message(chat_id=update.effective_chat.id, text=message) await context.bot.send_message(chat_id=update.effective_chat.id, text=message)
return ConversationHandler.END return ConversationHandler.END
@@ -361,12 +361,6 @@ async def add_premium_user_command(update: Update, context: ContextTypes.DEFAULT
days = update.message.text.split('/add_premium_user ')[1].split(' ')[1] days = update.message.text.split('/add_premium_user ')[1].split(' ')[1]
until = helpers.get_date_ahead(int(days)) until = helpers.get_date_ahead(int(days))
if not walladb.add_premium_user(telegram_user_id, until): if not walladb.add_premium_user(telegram_user_id, until):
products = walladb.get_products_from_user(telegram_user_id)
for product in products:
logging.info(product)
p = threading.Thread(target=Worker.run, args=(product, ))
p.start()
await update.message.reply_markdown_v2(helpers.telegram_escape_characters(f"{telegram_user_id} re-activado hasta {until}. Re-activando productos.")) await update.message.reply_markdown_v2(helpers.telegram_escape_characters(f"{telegram_user_id} re-activado hasta {until}. Re-activando productos."))
else: else:
await update.message.reply_markdown_v2(helpers.telegram_escape_characters(f"{telegram_user_id} añadido hasta {until}.")) await update.message.reply_markdown_v2(helpers.telegram_escape_characters(f"{telegram_user_id} añadido hasta {until}."))
@@ -404,12 +398,8 @@ async def test_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> No
await update.message.reply_markdown_v2(helpers.telegram_escape_characters(message)) await update.message.reply_markdown_v2(helpers.telegram_escape_characters(message))
async def add_to_db_and_send(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: async def add_to_db_and_send(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
global SEARCH_THREADS_LIST
logging.info(f"Adding product with context: {context.user_data}") logging.info(f"Adding product with context: {context.user_data}")
walladb.add_product(context.user_data) walladb.add_product(context.user_data)
p = threading.Thread(target=Worker.run, args=(walladb.get_product(context.user_data), ))
p.start()
SEARCH_THREADS_LIST.append((context.user_data, p))
await context.bot.send_message(chat_id=update.effective_chat.id, text=f"¡*{helpers.telegram_escape_characters(context.user_data['product_name'])}* añadido correctamente\!", parse_mode=ParseMode.MARKDOWN_V2) await context.bot.send_message(chat_id=update.effective_chat.id, text=f"¡*{helpers.telegram_escape_characters(context.user_data['product_name'])}* añadido correctamente\!", parse_mode=ParseMode.MARKDOWN_V2)
def error(update, context): def error(update, context):
@@ -445,24 +435,14 @@ def count_threads():
time.sleep(60) time.sleep(60)
def main()->None: def main()->None:
global SEARCH_THREADS_LIST
walladb.setup_db() walladb.setup_db()
products = walladb.get_all_products()
"""Start the bot.""" """Start the bot."""
# Create the Application and pass it your bot's token. # Create the Application and pass it your bot's token.
application = Application.builder().get_updates_http_version('1.1').http_version('1.1').token(constants.TELEGRAM_TOKEN).build() application = Application.builder().get_updates_http_version('1.1').http_version('1.1').token(constants.TELEGRAM_TOKEN).build()
logging.info(f"Loading {len(products)} products...") p = threading.Thread(target=search_manager.work)
count = 0
for product in products:
if helpers.is_valid_request(product):
count = count + 1
logging.info(product)
p = threading.Thread(target=Worker.run, args=(product, ))
p.start() p.start()
SEARCH_THREADS_LIST.append((product, p))
logging.info(f"{count} products finally loaded")
p = threading.Thread(target=account_checker.work, args=(3600, )) p = threading.Thread(target=account_checker.work, args=(3600, ))
p.start() p.start()

View File

@@ -4,7 +4,6 @@ import logging
import helpers import helpers
import walladb import walladb
import constants import constants
import sys
# Enable logging # Enable logging
logging.basicConfig( logging.basicConfig(
@@ -13,11 +12,7 @@ logging.basicConfig(
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class Worker: def request(product_name, steps=15, latitude=constants.LATITUDE, longitude=constants.LONGITUDE, distance='0', condition='all', min_price=0, max_price=10000000, category="", subcategories=[]):
_stop = False
def request(self, product_name, steps=15, latitude=constants.LATITUDE, longitude=constants.LONGITUDE, distance='0', condition='all', min_price=0, max_price=10000000, category="", subcategories=[]):
distance = str(int(distance) * 1000) distance = str(int(distance) * 1000)
url = (f"https://api.wallapop.com/api/v3/general/search?keywords={product_name}" url = (f"https://api.wallapop.com/api/v3/general/search?keywords={product_name}"
f"&order_by=newest&latitude={latitude}" f"&order_by=newest&latitude={latitude}"
@@ -44,7 +39,6 @@ class Worker:
for step in range(steps): for step in range(steps):
tries = 5 tries = 5
for _ in range(tries): for _ in range(tries):
helpers.random_wait()
response = requests.get(url+f"&step={step}") response = requests.get(url+f"&step={step}")
try: try:
if response.status_code == 200: if response.status_code == 200:
@@ -58,92 +52,69 @@ class Worker:
return search_objects return search_objects
def first_run(self, product): def first_run(product):
for _ in range(10):
helpers.random_wait()
logging.info(f"First run for {product['product_name']} for {walladb.get_user(product['telegram_user_id'])} ({walladb.get_user(product['telegram_user_id'])})") logging.info(f"First run for {product['product_name']} for {walladb.get_user(product['telegram_user_id'])} ({walladb.get_user(product['telegram_user_id'])})")
list = [] list = []
if not helpers.is_valid_request(product): if not helpers.is_valid_request(product):
return list return list
if product['category'] == '': if product['category'] == '':
articles = self.request(product['product_name'], 15, product['latitude'], product['longitude'], product['distance'], product['condition'], product['min_price'], product['max_price'], product['category']) articles = request(product['product_name'], 15, product['latitude'], product['longitude'], product['distance'], product['condition'], product['min_price'], product['max_price'], product['category'])
for article in articles: for article in articles:
#list[article['id']] = 1
list.insert(0, article['id']) list.insert(0, article['id'])
else: else:
if '0' in product['category'].split(','): if '0' in product['category'].split(','):
articles = self.request(product['product_name'], 15, product['latitude'], product['longitude'], product['distance'], product['condition'], product['min_price'], product['max_price']) articles = request(product['product_name'], 15, product['latitude'], product['longitude'], product['distance'], product['condition'], product['min_price'], product['max_price'])
for article in articles: for article in articles:
#list[article['id']] = 1
list.insert(0, article['id']) list.insert(0, article['id'])
else: else:
for category in product['category'].split(','): for category in product['category'].split(','):
if product['subcategory'] == '' or not helpers.has_subcategory(category): if product['subcategory'] == '' or not helpers.has_subcategory(category):
articles = self.request(product['product_name'], 15, product['latitude'], product['longitude'], product['distance'], product['condition'], product['min_price'], product['max_price'], category) articles = request(product['product_name'], 15, product['latitude'], product['longitude'], product['distance'], product['condition'], product['min_price'], product['max_price'], category)
for article in articles: for article in articles:
#list[article['id']] = 1
list.insert(0, article['id']) list.insert(0, article['id'])
else: else:
subcategories = [] subcategories = []
for subcategory in product['subcategory'].split(','): for subcategory in product['subcategory'].split(','):
if helpers.is_subcategory(category, subcategory): if helpers.is_subcategory(category, subcategory):
subcategories.append(subcategory) subcategories.append(subcategory)
articles = self.request(product['product_name'], 15, product['latitude'], product['longitude'], product['distance'], product['condition'], product['min_price'], product['max_price'], category, subcategories) articles = request(product['product_name'], 15, product['latitude'], product['longitude'], product['distance'], product['condition'], product['min_price'], product['max_price'], category, subcategories)
for article in articles: for article in articles:
#list[article['id']] = 1
list.insert(0, article['id']) list.insert(0, article['id'])
return list return list
def work(self, product, list): def work(product, list):
helpers.random_wait() # Random wait to make requests separated in time in order to prevent API rate limit logging.info(f"Searching {product['product_name']}")
exec_times = []
while True:
#logging.info(f"List for {product['product_name']} length is: {len(list)}")
if not helpers.is_valid_request(product) or self._stop == True:
logging.info(f"{product['product_name']} not valid anymore, exiting worker")
break # Exits and ends worker thread
start_time = time.time()
articles_list = [] articles_list = []
if product['category'] == '': if product['category'] == '':
articles_list.append(self.request(product['product_name'], 1, product['latitude'], product['longitude'], product['distance'], product['condition'], product['min_price'], product['max_price'])) articles_list.append(request(product['product_name'], 1, product['latitude'], product['longitude'], product['distance'], product['condition'], product['min_price'], product['max_price']))
else: else:
if '0' in product['category'].split(','): if '0' in product['category'].split(','):
articles_list.append(self.request(product['product_name'], 1, product['latitude'], product['longitude'], product['distance'], product['condition'], product['min_price'], product['max_price'])) articles_list.append(request(product['product_name'], 1, product['latitude'], product['longitude'], product['distance'], product['condition'], product['min_price'], product['max_price']))
else: else:
for category in product['category'].split(','): for category in product['category'].split(','):
if product['subcategory'] == '' or not helpers.has_subcategory(category): if product['subcategory'] == '' or not helpers.has_subcategory(category):
articles_list.append(self.request(product['product_name'], 1, product['latitude'], product['longitude'], product['distance'], product['condition'], product['min_price'], product['max_price'], category)) articles_list.append(request(product['product_name'], 1, product['latitude'], product['longitude'], product['distance'], product['condition'], product['min_price'], product['max_price'], category))
else: else:
subcategories = [] subcategories = []
for subcategory in product['subcategory'].split(','): for subcategory in product['subcategory'].split(','):
if helpers.is_subcategory(category, subcategory): if helpers.is_subcategory(category, subcategory):
subcategories.append(subcategory) subcategories.append(subcategory)
articles_list.append(self.request(product['product_name'], 1, product['latitude'], product['longitude'], product['distance'], product['condition'], product['min_price'], product['max_price'], category, subcategories)) articles_list.append(request(product['product_name'], 1, product['latitude'], product['longitude'], product['distance'], product['condition'], product['min_price'], product['max_price'], category, subcategories))
for articles in articles_list: for articles in articles_list:
for article in articles: for article in articles:
if not article['id'] in list: if not article['id'] in list:
logging.info(f"Found article {article['title']} for {walladb.get_user(product['telegram_user_id'])} ({product['telegram_user_id']})") logging.info(f"Found article {article['title']} for {walladb.get_user(product['telegram_user_id'])} ({product['telegram_user_id']})")
try: try:
if not self.has_excluded_words(article['title'].lower(), article['description'].lower(), product['title_description_exclude']) and not self.is_title_key_word_excluded(article['title'].lower(), product['title_exclude']): if not has_excluded_words(article['title'].lower(), article['description'].lower(), product['title_description_exclude']) and not is_title_key_word_excluded(article['title'].lower(), product['title_exclude']):
try:
helpers.send_article(article, product) helpers.send_article(article, product)
except:
helpers.send_article(article, product)
time.sleep(1) #Avoid telegram flood restriction
list.insert(0, article['id']) list.insert(0, article['id'])
except Exception as e: except Exception as e:
logging.info("---------- EXCEPTION -----------") logging.info("---------- EXCEPTION -----------")
logging.info(f"{product['product_name']} worker crashed. {e}") logging.info(f"{product['product_name']} worker crashed. {e}")
logging.info(f"{product['product_name']}: Trying to parse {article['id']}: {article['title']} .\n") logging.info(f"{product['product_name']}: Trying to parse {article['id']}: {article['title']} .\n")
if len(list) > 600: return list[:600]
del list[600:]
if len(exec_times) > 50:
del exec_times[50:]
time.sleep(constants.SLEEP_TIME)
exec_times.insert(0, time.time() - start_time)
logging.info(f"\'{product['product_name']}\' for {walladb.get_user(product['telegram_user_id'])} ({product['telegram_user_id']}) node-> last: {exec_times[0]:.2f} max: {self.get_max_time(exec_times):.2f} avg: {self.get_average_time(exec_times):.2f} - Size of articles_list: {round(sys.getsizeof(articles_list)/1024, 2)}Kb. Size of list: {round(sys.getsizeof(list)/1024, 2)}Kb. Size of exec_times: {round(sys.getsizeof(exec_times)/1024, 2)}Kb")
def has_excluded_words(self, title, description, excluded_words): def has_excluded_words(title, description, excluded_words):
if len(excluded_words) > 0: if len(excluded_words) > 0:
for word in excluded_words.split(","): for word in excluded_words.split(","):
logging.info("EXCLUDER: Checking '" + word + "' for title: '" + title) logging.info("EXCLUDER: Checking '" + word + "' for title: '" + title)
@@ -152,44 +123,10 @@ class Worker:
return True return True
return False return False
def is_title_key_word_excluded(self, title, excluded_words): def is_title_key_word_excluded(title, excluded_words):
if len(excluded_words) > 0: if len(excluded_words) > 0:
for word in excluded_words.split(","): for word in excluded_words.split(","):
logging.info("Checking '" + word + "' for title: '" + title) logging.info("Checking '" + word + "' for title: '" + title)
if word.lower().lstrip().rstrip() in title.lower(): if word.lower().lstrip().rstrip() in title.lower():
return True return True
return False return False
def get_average_time(self, exec_times):
sum = 0
for i in exec_times:
sum = sum + i
return sum / len(exec_times)
def get_max_time(self, exec_times):
largest = 0
for i in exec_times:
if i > largest:
largest = i
return largest
def stop(self):
self._stop = True
def run(product):
worker = Worker()
try:
list = worker.first_run(product)
except:
logging.info(f"{product['product_name']} worker crashed.")
#time.sleep(constants.SLEEP_TIME)
while True:
try:
worker.work(product, list)
break
except Exception as e:
logging.info(f"Exception: {e}")
logging.info(f"{product['product_name']} worker crashed. Restarting worker...")
time.sleep(10)
logging.info(f"Wallapop monitor worker stopped for: \'{product['product_name']}\'")