# Source listing metadata: 162 lines, 7.5 KiB, Python.
import time
|
|
import requests
|
|
import logging
|
|
import helpers
|
|
import walladb
|
|
import constants
|
|
import asyncio
|
|
|
|
# Configure the root logger once, at import time: INFO level, and every
# record stamped with time, logger name and level.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)

# Logger named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
class Worker:
    """Poll the Wallapop search API for one saved product and forward new articles.

    A *product* is a dict; the keys read here are ``product_name``,
    ``telegram_user_id``, ``latitude``, ``longitude``, ``distance``,
    ``condition``, ``min_price``, ``max_price``, ``category``,
    ``title_description_exclude`` and ``title_exclude``. Articles returned by
    the API are dicts read via their ``id``, ``title`` and ``description`` keys.
    """

    SEARCH_URL = "http://api.wallapop.com/api/v3/general/search"
    # Seconds before an unanswered HTTP request is abandoned; without a
    # timeout ``requests`` can block the worker indefinitely.
    REQUEST_TIMEOUT = 30
    # Seconds to pause after a failed or non-200 request before retrying.
    RETRY_DELAY = 3

    def is_valid_request(self, product):
        """Return True if *product* should still be monitored.

        The product must still exist in ``walladb`` and its owner must be
        unexpired, valid, and either premium or testing. Checks run in that
        order and stop at the first one that fails.
        """
        if not walladb.get_product(product):
            return False
        user_id = product['telegram_user_id']
        if walladb.is_user_expired(user_id):
            return False
        if not walladb.is_user_valid(user_id):
            return False
        return bool(walladb.is_user_premium(user_id) or walladb.is_user_testing(user_id))

    def request(self, product_name, n_articles, latitude=constants.LATITUDE, longitude=constants.LONGITUDE, distance='0', condition='all', min_price=0, max_price=10000000, category=""):
        """Return the newest search results for *product_name*.

        Args:
            product_name: search keywords; URL-encoded by ``requests``.
            n_articles: unused; accepted for backward compatibility.
            latitude, longitude, distance: location filter sent to the API.
            condition: ``"all"`` for no filter, otherwise one of new,
                as_good_as_new, good, fair, has_given_it_all.
            min_price, max_price: sale price bounds.
            category: category id; ``""`` or ``None`` for no filter.

        Returns:
            The ``search_objects`` list from the JSON response.

        Retries until the API answers HTTP 200; never gives up.
        """
        params = {
            "keywords": product_name,
            "order_by": "newest",
            "latitude": latitude,
            "longitude": longitude,
            "distance": distance,
            "min_sale_price": min_price,
            "max_sale_price": max_price,
            "filters_source": "quick_filters",
            "language": "es_ES",
        }
        if condition != "all":
            params["condition"] = condition  # new, as_good_as_new, good, fair, has_given_it_all
        if category not in ("", None):
            params["category_ids"] = category
        # Results are ordered newest-first, so the first page is the only one
        # that can contain articles published since the previous poll.
        params["step"] = 1

        response = self._get_until_ok(product_name, params)
        return response.json()['search_objects']

    def _get_until_ok(self, product_name, params):
        """GET the search endpoint with *params*, retrying until HTTP 200."""
        while True:
            helpers.random_wait()
            try:
                response = requests.get(self.SEARCH_URL, params=params, timeout=self.REQUEST_TIMEOUT)
            except requests.RequestException as e:
                logging.info("'%s' -> Request to Wallapop failed: %s. Retrying...", product_name, e)
            else:
                if response.status_code == 200:
                    return response
                logging.info("'%s' -> Wallapop returned status %s. Illegal parameters or Wallapop service is down. Retrying...",
                             product_name, response.status_code)
            time.sleep(self.RETRY_DELAY)

    def first_run(self, product):
        """Return the IDs of the articles currently listed for *product*.

        The IDs are in reverse order of the API response. An empty list is
        returned if the product is no longer valid.
        """
        logging.info("First run for %s for %s", product['product_name'], walladb.get_user(product['telegram_user_id']))
        helpers.random_wait()
        if not self.is_valid_request(product):
            return []
        articles = self.request(product['product_name'], 0, product['latitude'], product['longitude'], product['distance'],
                                product['condition'], product['min_price'], product['max_price'], product['category'])
        return [article['id'] for article in reversed(articles)]

    async def work(self, product, list):
        """Poll for new articles for *product* until it is no longer valid.

        Args:
            product: the monitored product dict.
            list: IDs already seen. Mutated in place (new IDs are inserted at
                the front), so a restarted worker given the same list does not
                resend articles. The name shadows the builtin but is kept for
                callers passing it by keyword.

        Each new article that passes the exclusion filters is sent with
        ``helpers.send_article``. It is then recorded as seen, and so is an
        article that the filters reject. An article whose processing raises is
        left unseen, so it is retried on the next poll.
        """
        seen_ids = list
        seen_lookup = set(seen_ids)  # O(1) membership instead of scanning the list
        name = product['product_name']
        user_id = product['telegram_user_id']
        runs = 0
        total_time = 0.0
        max_time = 0.0

        helpers.random_wait()  # Random wait to make requests separated in time in order to prevent API rate limit
        while True:
            if not self.is_valid_request(product):
                logging.info("%s not valid anymore, exiting worker", name)
                break  # Exits and ends worker thread

            start_time = time.monotonic()
            articles = self.request(name, 0, product['latitude'], product['longitude'], product['distance'],
                                    product['condition'], product['min_price'], product['max_price'], product['category'])
            for article in articles:
                if article['id'] in seen_lookup:
                    continue
                logging.info("Found article %s", article['title'])
                try:
                    if self._passes_filters(article, product):
                        await self._send_article_with_retry(article, product)
                        await asyncio.sleep(1)  # Avoid Telegram flood restriction
                except Exception as e:
                    logging.info("---------- EXCEPTION -----------")
                    logging.info("%s worker crashed. %s", name, e)
                    logging.info("%s: Trying to parse %s: %s .\n", name, article['id'], article['title'])
                    continue
                seen_lookup.add(article['id'])
                seen_ids.insert(0, article['id'])

            await asyncio.sleep(constants.SLEEP_TIME)

            # Running statistics instead of an ever-growing list of samples.
            elapsed = time.monotonic() - start_time
            runs += 1
            total_time += elapsed
            max_time = max(max_time, elapsed)
            logging.info("'%s' for %s (%s) node-> last: %.2f max: %.2f avg: %.2f",
                         name, walladb.get_user(user_id), user_id, elapsed, max_time, total_time / runs)

    def _passes_filters(self, article, product):
        """Return True if *article* hits neither of *product*'s exclusion lists."""
        title = article['title'].lower()
        return not (
            self.has_excluded_words(title, article['description'].lower(), product['title_description_exclude'])
            or self.is_title_key_word_excluded(title, product['title_exclude'])
        )

    async def _send_article_with_retry(self, article, product):
        """Send *article* via ``helpers.send_article``, retrying once on failure."""
        try:
            await helpers.send_article(article, product)
        except Exception as e:  # not bare: cancellation must not trigger a resend
            logging.info("Sending '%s' failed (%s). Retrying once...", article['title'], e)
            await helpers.send_article(article, product)

    def has_excluded_words(self, title, description, excluded_words):
        """Return True if any comma-separated word in *excluded_words* occurs in
        *title* or *description* (case-insensitive substring match).

        Blank or ``None`` *excluded_words* excludes nothing.
        """
        title = title.lower()
        description = description.lower()
        for word in self._split_words(excluded_words):
            logging.debug("EXCLUDER: Checking '%s' for title: '%s'", word, title)
            if word in title or word in description:
                logging.info("EXCLUDE! '%s' found in '%s'", word, title)
                return True
        return False

    def is_title_key_word_excluded(self, title, excluded_words):
        """Return True if any comma-separated word in *excluded_words* occurs in
        *title* (case-insensitive substring match).

        Blank or ``None`` *excluded_words* excludes nothing.
        """
        title = title.lower()
        for word in self._split_words(excluded_words):
            logging.debug("Checking '%s' for title: '%s'", word, title)
            if word in title:
                return True
        return False

    @staticmethod
    def _split_words(excluded_words):
        """Split a comma-separated list into stripped, lowercase, non-empty words.

        Empty entries (from a trailing or doubled comma) are dropped because
        ``"" in title`` is always True and would exclude every article.
        """
        if not excluded_words:
            return []
        words = (word.strip().lower() for word in excluded_words.split(","))
        return [word for word in words if word]

    def get_average_time(self, exec_times):
        """Return the mean of *exec_times*, or 0.0 when it is empty."""
        if not exec_times:
            return 0.0
        return sum(exec_times) / len(exec_times)

    def get_max_time(self, exec_times):
        """Return the largest of *exec_times* (durations), or 0 when empty."""
        return max(exec_times, default=0)
|
|
|
|
async def old_run(product):
    """Legacy coroutine entry point: monitor *product* in the running event loop.

    Seeds the seen-ID list with ``Worker.first_run``, then awaits
    ``Worker.work``. If that raises, the error is logged and the worker is
    restarted after a 10-second pause. This repeats until ``work`` returns.
    """
    monitor = Worker()
    seen_ids = monitor.first_run(product)

    finished = False
    while not finished:
        try:
            logging.info(f"Wallapop monitor worker started. Checking for new items containing: \'{product['product_name']}\' for {walladb.get_user(product['telegram_user_id'])} ({product['telegram_user_id']}) with given parameters periodically")
            await monitor.work(product, seen_ids)
        except Exception as err:
            logging.info(f"Exception: {err}")
            logging.info(f"{product['product_name']} worker crashed. Restarting worker...")
            await asyncio.sleep(10)
        else:
            finished = True

    logging.info(f"Wallapop monitor worker stopped for: \'{product['product_name']}\'")
|
|
|
|
def run(product):
    """Blocking entry point: monitor *product* on a private asyncio event loop.

    Seeds the seen-ID list with ``Worker.first_run``. Each attempt then runs
    ``Worker.work`` on a fresh event loop, installed as the current thread's
    loop. If ``work`` raises, the error is logged and the worker is restarted
    after a 10-second pause. This repeats until ``work`` returns.
    """
    worker = Worker()
    seen_ids = worker.first_run(product)

    while True:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(worker.work(product, seen_ids))
            break
        except Exception as e:
            logging.info(f"Exception: {e}")
            logging.info(f"{product['product_name']} worker crashed. Restarting worker...")
            time.sleep(10)
        finally:
            # Close the loop on every path. Closing only after a clean run
            # leaked one open loop (and its selector) per crash/restart.
            loop.close()

    logging.info(f"Wallapop monitor worker stopped for: \'{product['product_name']}\'")
|