Files
wallamanta/wallamanta/worker.py

196 lines
10 KiB
Python

import time
import requests
import logging
import helpers
import walladb
import constants
import sys
# Configure root logging once at import time (INFO level, timestamped records)
# and expose a module-scoped logger.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)
logger = logging.getLogger(__name__)
class Worker:
    """Poll the Wallapop search API for one product and forward unseen articles.

    ``first_run`` collects the ids of the articles that already exist so they
    are not reported, and ``work`` then loops: it searches, sends every article
    whose id has not been seen (unless an exclusion word matches) through
    ``helpers.send_article`` and sleeps ``constants.SLEEP_TIME`` between cycles.
    """

    # Class-level default; stop() shadows it on the instance to end work().
    _stop = False

    _REQUEST_TRIES = 5       # attempts per result page before giving up on it
    _REQUEST_TIMEOUT = 30    # seconds; without it a stalled connection blocks forever
    _RETRY_DELAY = 3         # seconds slept after a failed attempt
    _MAX_SEEN_IDS = 600      # cap on remembered article ids
    _MAX_EXEC_TIMES = 50     # cap on remembered cycle durations (applied before insert)

    @staticmethod
    def _build_search_url(product_name, latitude, longitude, distance, condition,
                          min_price, max_price, category, subcategories):
        """Return the search URL (without the ``step`` parameter) for one query."""
        from urllib.parse import quote

        # The caller's distance is multiplied by 1000 before being sent
        # (presumably kilometres -> metres; confirm against the API).
        scaled_distance = str(int(distance) * 1000)
        # Percent-encode the keywords so characters such as '&' or '#' cannot
        # terminate the parameter or the whole query string.
        keywords = quote(str(product_name), safe='')
        url = (f"https://api.wallapop.com/api/v3/general/search?keywords={keywords}"
               f"&order_by=newest&latitude={latitude}"
               f"&longitude={longitude}"
               f"&distance={scaled_distance}"
               f"&min_sale_price={min_price}"
               f"&max_sale_price={max_price}"
               f"&filters_source=quick_filters&language=es_ES")
        if condition != "all":
            # new, as_good_as_new, good, fair, has_given_it_all
            url += f"&condition={condition}"
        if category != "":
            url += f"&category_ids={category}"
        if subcategories:
            url += "&object_type_ids=" + ",".join(str(sub) for sub in subcategories)
        return url

    def request(self, product_name, steps=15, latitude=None, longitude=None, distance='0', condition='all', min_price=0, max_price=10000000, category="", subcategories=None):
        """Query the search API and return the concatenated ``search_objects``.

        Args:
            product_name: Search keywords.
            steps: Number of result pages requested (``step`` = 0 .. steps-1).
            latitude: Search latitude; ``None`` means ``constants.LATITUDE``.
            longitude: Search longitude; ``None`` means ``constants.LONGITUDE``.
            distance: Search radius; converted with ``int()`` and scaled by 1000.
            condition: ``'all'`` or a single condition value to filter by.
            min_price: Lower price bound.
            max_price: Upper price bound.
            category: Category id, or ``""`` for no category filter.
            subcategories: Iterable of object-type ids, or ``None``/empty for none.

        Returns:
            A list with the ``search_objects`` entries of every page that
            answered with status 200. A page that still fails after
            ``_REQUEST_TRIES`` attempts is skipped.
        """
        if latitude is None:
            latitude = constants.LATITUDE
        if longitude is None:
            longitude = constants.LONGITUDE
        url = self._build_search_url(product_name, latitude, longitude, distance,
                                     condition, min_price, max_price, category,
                                     subcategories)
        search_objects = []
        for step in range(steps):
            for attempt in range(1, self._REQUEST_TRIES + 1):
                helpers.random_wait()
                try:
                    # The HTTP call is inside the try so that connection errors
                    # and timeouts are retried like any other failure.
                    response = requests.get(f"{url}&step={step}", timeout=self._REQUEST_TIMEOUT)
                    if response.status_code == 200:
                        search_objects.extend(response.json()['search_objects'])
                        break
                    logging.info(f"\'{product_name}\' -> Wallapop returned status {response.status_code}. Illegal parameters or Wallapop service is down. Retrying...")
                except Exception as e:
                    # Broad on purpose: this is a best-effort retry boundary.
                    logging.info(f"Error while querying Wallapop, try #{attempt}: {e}")
                time.sleep(self._RETRY_DELAY)
        return search_objects

    def _search_batches(self, product, steps):
        """Run the request(s) implied by the product's category settings.

        Returns a list of article lists: a single unfiltered search when the
        product has no category or its category list contains ``'0'``,
        otherwise one search per category (restricted to the subcategories
        that ``helpers.is_subcategory`` accepts for that category).
        """
        common = (product['product_name'], steps, product['latitude'], product['longitude'],
                  product['distance'], product['condition'], product['min_price'], product['max_price'])
        categories = product['category'].split(',') if product['category'] != '' else []
        if not categories or '0' in categories:
            return [self.request(*common)]
        batches = []
        for category in categories:
            if product['subcategory'] == '' or not helpers.has_subcategory(category):
                batches.append(self.request(*common, category))
            else:
                subcategories = [sub for sub in product['subcategory'].split(',')
                                 if helpers.is_subcategory(category, sub)]
                batches.append(self.request(*common, category, subcategories))
        return batches

    def first_run(self, product):
        """Return the ids of the articles that already exist for ``product``.

        The result is the baseline handed to ``work`` so that pre-existing
        articles are not reported. An empty list is returned when
        ``helpers.is_valid_request`` rejects the product.
        """
        for _ in range(10):
            helpers.random_wait()
        user_id = product['telegram_user_id']
        logging.info(f"First run for {product['product_name']} for {walladb.get_user(user_id)} ({user_id})")
        seen_ids = []
        if not helpers.is_valid_request(product):
            return seen_ids
        for articles in self._search_batches(product, 15):
            # Keep the API order (the query asks for order_by=newest, so
            # presumably newest first). work() trims the tail of this list,
            # so the newest ids must not end up there.
            seen_ids.extend(article['id'] for article in articles)
        return seen_ids

    def _process_new_article(self, article, product, seen_ids):
        """Send one unseen article unless it is excluded, then remember its id."""
        logging.info(f"Found article {article['title']} for {walladb.get_user(product['telegram_user_id'])} ({product['telegram_user_id']})")
        try:
            title = article['title'].lower()
            description = article['description'].lower()
            if not self.has_excluded_words(title, description, product['title_description_exclude']) and not self.is_title_key_word_excluded(title, product['title_exclude']):
                try:
                    helpers.send_article(article, product)
                except Exception:
                    # One immediate retry; a second failure is handled below.
                    helpers.send_article(article, product)
                time.sleep(1)  # Avoid telegram flood restriction
            # Excluded articles are remembered too, so they are not re-checked.
            seen_ids.insert(0, article['id'])
        except Exception as e:
            # The id is not stored, so the article is retried on the next cycle.
            logging.info("---------- EXCEPTION -----------")
            logging.info(f"{product['product_name']} worker crashed. {e}")
            logging.info(f"{product['product_name']}: Trying to parse {article['id']}: {article['title']} .\n")

    def work(self, product, list):
        """Poll for new articles until the product is invalid or stop() is called.

        Args:
            product: Mapping with the search settings and exclusion lists.
            list: Already-seen article ids, newest first; mutated in place and
                capped at ``_MAX_SEEN_IDS`` entries. (The name shadows the
                builtin but is kept because it is part of the interface.)
        """
        seen_ids = list
        # Random wait to make requests separated in time in order to prevent API rate limit
        helpers.random_wait()
        exec_times = []
        while True:
            if not helpers.is_valid_request(product) or self._stop:
                logging.info(f"{product['product_name']} not valid anymore, exiting worker")
                break  # Exits and ends worker thread
            start_time = time.time()
            articles_list = self._search_batches(product, 1)
            for articles in articles_list:
                for article in articles:
                    if article['id'] not in seen_ids:
                        self._process_new_article(article, product, seen_ids)
            # Bound memory use: drop the oldest ids and timings.
            del seen_ids[self._MAX_SEEN_IDS:]
            del exec_times[self._MAX_EXEC_TIMES:]
            time.sleep(constants.SLEEP_TIME)
            # The recorded duration includes the sleep above.
            exec_times.insert(0, time.time() - start_time)
            logging.info(f"\'{product['product_name']}\' for {walladb.get_user(product['telegram_user_id'])} ({product['telegram_user_id']}) node-> last: {exec_times[0]:.2f} max: {self.get_max_time(exec_times):.2f} avg: {self.get_average_time(exec_times):.2f} - Size of articles_list: {round(sys.getsizeof(articles_list)/1024, 2)}Kb. Size of list: {round(sys.getsizeof(seen_ids)/1024, 2)}Kb. Size of exec_times: {round(sys.getsizeof(exec_times)/1024, 2)}Kb")

    def has_excluded_words(self, title, description, excluded_words):
        """Return True if any comma-separated word occurs in title or description.

        Matching is case-insensitive and ignores whitespace around each word.
        Blank entries (for example from a trailing comma) are skipped, because
        an empty string is a substring of every text.
        """
        if not excluded_words:
            return False
        title_lc = title.lower()
        description_lc = description.lower()
        for word in excluded_words.split(","):
            needle = word.strip().lower()
            if not needle:
                continue
            logging.info("EXCLUDER: Checking '" + word + "' for title: '" + title)
            if needle in title_lc or needle in description_lc:
                logging.info("EXCLUDE!")
                return True
        return False

    def is_title_key_word_excluded(self, title, excluded_words):
        """Return True if any comma-separated word occurs in the title.

        Matching is case-insensitive and ignores whitespace around each word;
        blank entries are skipped.
        """
        if not excluded_words:
            return False
        title_lc = title.lower()
        for word in excluded_words.split(","):
            needle = word.strip().lower()
            if not needle:
                continue
            logging.info("Checking '" + word + "' for title: '" + title)
            if needle in title_lc:
                return True
        return False

    def get_average_time(self, exec_times):
        """Return the arithmetic mean of ``exec_times`` (0.0 when empty)."""
        if not exec_times:
            return 0.0
        return sum(exec_times) / len(exec_times)

    def get_max_time(self, exec_times):
        """Return the largest value in ``exec_times``, never less than 0."""
        return max(0, max(exec_times, default=0))

    def stop(self):
        """Ask ``work`` to exit at the start of its next cycle."""
        self._stop = True
def run(product):
    """Run a Worker for ``product`` until its work loop ends normally.

    The baseline of already-seen article ids comes from ``Worker.first_run``.
    ``Worker.work`` is then restarted 10 seconds after every exception it
    raises, and the function returns once ``work`` returns.
    """
    worker = Worker()
    try:
        seen_ids = worker.first_run(product)
    except Exception as e:
        logging.info(f"{product['product_name']} worker crashed.")
        logging.info(f"Exception: {e}")
        # Continue with an empty baseline. Leaving the name unbound would make
        # every work() call below fail and the restart loop spin forever.
        # Consequence: articles that already exist are treated as new.
        seen_ids = []
    while True:
        try:
            worker.work(product, seen_ids)
            break
        except Exception as e:
            logging.info(f"Exception: {e}")
            logging.info(f"{product['product_name']} worker crashed. Restarting worker...")
            time.sleep(10)
    logging.info(f"Wallapop monitor worker stopped for: \'{product['product_name']}\'")