Removed asyncio + threading mix. Used requests to send telegram messages
This commit is contained in:
@@ -2,14 +2,15 @@ import helpers
|
|||||||
import walladb
|
import walladb
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
|
import time
|
||||||
|
|
||||||
# Enable logging
|
# Enable logging
|
||||||
logging.basicConfig(
|
logging.basicConfig(
|
||||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO
|
||||||
)
|
)
|
||||||
|
|
||||||
async def work(sleep_time):
|
def work(sleep_time):
|
||||||
await asyncio.sleep(30)
|
time.sleep(30)
|
||||||
while True:
|
while True:
|
||||||
user_list = walladb.get_user_list()
|
user_list = walladb.get_user_list()
|
||||||
for user in user_list:
|
for user in user_list:
|
||||||
@@ -17,12 +18,12 @@ async def work(sleep_time):
|
|||||||
if walladb.is_user_expired(user['telegram_user_id']):
|
if walladb.is_user_expired(user['telegram_user_id']):
|
||||||
message = "¡Hola!\n\nTu cuenta ha caducado. Si quieres seguir usando @wallamanta_bot ponte en contacto con @jocarduck\n\n¡Gracias!"
|
message = "¡Hola!\n\nTu cuenta ha caducado. Si quieres seguir usando @wallamanta_bot ponte en contacto con @jocarduck\n\n¡Gracias!"
|
||||||
try:
|
try:
|
||||||
await asyncio.sleep(1)
|
time.sleep(1)
|
||||||
await helpers.send_message(user['telegram_user_id'], message)
|
helpers.send_message(user['telegram_user_id'], message)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error(f"Couldn't sent message to {user['telegram_user_id']}. Reason: {e}")
|
logging.error(f"Couldn't sent message to {user['telegram_user_id']}. Reason: {e}")
|
||||||
walladb.deactivate_user(user['telegram_user_id'])
|
walladb.deactivate_user(user['telegram_user_id'])
|
||||||
await asyncio.sleep(sleep_time)
|
time.sleep(sleep_time)
|
||||||
|
|
||||||
def account_checker(sleep_time):
|
def account_checker(sleep_time):
|
||||||
logging.info(f"Account checker starting... Checking every {sleep_time} seconds")
|
logging.info(f"Account checker starting... Checking every {sleep_time} seconds")
|
||||||
|
|||||||
@@ -131,12 +131,17 @@ def get_modified_date(article):
|
|||||||
article_date = article['modification_date']
|
article_date = article['modification_date']
|
||||||
return datetime.fromtimestamp(int(int(article_date)/1000)).astimezone(pytz.timezone("Europe/Madrid")).strftime("%d/%m/%Y - %H:%M:%S")
|
return datetime.fromtimestamp(int(int(article_date)/1000)).astimezone(pytz.timezone("Europe/Madrid")).strftime("%d/%m/%Y - %H:%M:%S")
|
||||||
|
|
||||||
async def send_message(telegram_user_id, message):
|
def send_message(telegram_user_id, message):
|
||||||
application = Application.builder().get_updates_http_version('1.1').http_version('1.1').token(constants.TELEGRAM_TOKEN).build()
|
files = {
|
||||||
await application.bot.send_message(chat_id=telegram_user_id, text=message)
|
'chat_id': (None, telegram_user_id),
|
||||||
|
'text': (None, message),
|
||||||
|
}
|
||||||
|
requests.post(f'https://api.telegram.org/bot{constants.TELEGRAM_TOKEN}/sendMessage', files=files)
|
||||||
|
#application = Application.builder().get_updates_http_version('1.1').http_version('1.1').token(constants.TELEGRAM_TOKEN).build()
|
||||||
|
#await application.bot.send_message(chat_id=telegram_user_id, text=message)
|
||||||
|
|
||||||
async def send_article(article, product):
|
def send_article(article, product):
|
||||||
application = Application.builder().get_updates_http_version('1.1').http_version('1.1').token(constants.TELEGRAM_TOKEN).build()
|
#application = Application.builder().get_updates_http_version('1.1').http_version('1.1').token(constants.TELEGRAM_TOKEN).build()
|
||||||
create_image(article)
|
create_image(article)
|
||||||
title = f"*{telegram_escape_characters(article['title'])}*"
|
title = f"*{telegram_escape_characters(article['title'])}*"
|
||||||
description = f"*📝 Descripción*: {telegram_escape_characters(article['description'])}"
|
description = f"*📝 Descripción*: {telegram_escape_characters(article['description'])}"
|
||||||
@@ -156,13 +161,24 @@ async def send_article(article, product):
|
|||||||
text = f"{title}\n\n{description}\n\n{found_by}\n\n{created_at}\n{modified_at}\n\n{location}\n\n{user_ships}\n\n{price}"
|
text = f"{title}\n\n{description}\n\n{found_by}\n\n{created_at}\n{modified_at}\n\n{location}\n\n{user_ships}\n\n{price}"
|
||||||
#url = f"https://api.telegram.org/bot{constants.TELEGRAM_TOKEN}/sendPhoto?chat_id={product['telegram_user_id']}&caption={text}&parse_mode=MarkdownV2"
|
#url = f"https://api.telegram.org/bot{constants.TELEGRAM_TOKEN}/sendPhoto?chat_id={product['telegram_user_id']}&caption={text}&parse_mode=MarkdownV2"
|
||||||
#files = {'photo':open(f"/app/data/images/products/{article['id']}_composed.png", 'rb')}
|
#files = {'photo':open(f"/app/data/images/products/{article['id']}_composed.png", 'rb')}
|
||||||
keyboard = [[InlineKeyboardButton("Ir al anuncio", url=f"https://es.wallapop.com/item/{article['web_slug']}")]]
|
#keyboard = [[InlineKeyboardButton("Ir al anuncio", url=f"https://es.wallapop.com/item/{article['web_slug']}")]]
|
||||||
#InlineKeyboardButton("Listar productos", callback_data="list")]
|
#InlineKeyboardButton("Listar productos", callback_data="list")]
|
||||||
markup = InlineKeyboardMarkup(keyboard)
|
#markup = InlineKeyboardMarkup(keyboard)
|
||||||
response = await application.bot.send_photo(chat_id=product['telegram_user_id'], photo=open(f"/app/data/images/products/{article['id']}_composed.png", 'rb'), caption=text, parse_mode=ParseMode.MARKDOWN_V2, reply_markup=markup)
|
|
||||||
|
files = {
|
||||||
|
'chat_id': (None, product['telegram_user_id']),
|
||||||
|
'photo': open(f"/app/data/images/products/{article['id']}_composed.png", 'rb'),
|
||||||
|
'caption': (None, text),
|
||||||
|
'parse_mode': (None, ParseMode.MARKDOWN_V2),
|
||||||
|
'reply_markup': (None, '{"inline_keyboard":[[{"text":"","url":"https://google.com"}]]}'),
|
||||||
|
}
|
||||||
|
|
||||||
|
response = requests.post(f'https://api.telegram.org/bot{constants.TELEGRAM_TOKEN}/sendPhoto', files=files)
|
||||||
|
|
||||||
|
#response = await application.bot.send_photo(chat_id=product['telegram_user_id'], photo=open(f"/app/data/images/products/{article['id']}_composed.png", 'rb'), caption=text, parse_mode=ParseMode.MARKDOWN_V2, reply_markup=markup)
|
||||||
#logging.info(requests.post(url, files=files).content)
|
#logging.info(requests.post(url, files=files).content)
|
||||||
send_to_nr(article, product)
|
send_to_nr(article, product)
|
||||||
logging.info(response)
|
logging.info(response.content)
|
||||||
|
|
||||||
def get_category_name(category):
|
def get_category_name(category):
|
||||||
category = int(category)
|
category = int(category)
|
||||||
|
|||||||
@@ -464,7 +464,7 @@ def main()->None:
|
|||||||
SEARCH_THREADS_LIST.append((product, p))
|
SEARCH_THREADS_LIST.append((product, p))
|
||||||
logging.info(f"{count} products finally loaded")
|
logging.info(f"{count} products finally loaded")
|
||||||
|
|
||||||
p = threading.Thread(target=account_checker.account_checker, args=(3600, ))
|
p = threading.Thread(target=account_checker.work, args=(3600, ))
|
||||||
p.start()
|
p.start()
|
||||||
|
|
||||||
p = threading.Thread(target=count_threads)
|
p = threading.Thread(target=count_threads)
|
||||||
|
|||||||
@@ -94,8 +94,8 @@ class Worker:
|
|||||||
list.insert(0, article['id'])
|
list.insert(0, article['id'])
|
||||||
return list
|
return list
|
||||||
|
|
||||||
async def work(self, product, list):
|
def work(self, product, list):
|
||||||
await helpers.random_async_wait() # Random wait to make requests separated in time in order to prevent API rate limit
|
helpers.random_wait() # Random wait to make requests separated in time in order to prevent API rate limit
|
||||||
exec_times = []
|
exec_times = []
|
||||||
while True:
|
while True:
|
||||||
#logging.info(f"List for {product['product_name']} length is: {len(list)}")
|
#logging.info(f"List for {product['product_name']} length is: {len(list)}")
|
||||||
@@ -126,19 +126,16 @@ class Worker:
|
|||||||
try:
|
try:
|
||||||
if not self.has_excluded_words(article['title'].lower(), article['description'].lower(), product['title_description_exclude']) and not self.is_title_key_word_excluded(article['title'].lower(), product['title_exclude']):
|
if not self.has_excluded_words(article['title'].lower(), article['description'].lower(), product['title_description_exclude']) and not self.is_title_key_word_excluded(article['title'].lower(), product['title_exclude']):
|
||||||
try:
|
try:
|
||||||
await helpers.send_article(article, product)
|
helpers.send_article(article, product)
|
||||||
except:
|
except:
|
||||||
await helpers.send_article(article, product)
|
helpers.send_article(article, product)
|
||||||
await asyncio.sleep(1)
|
time.sleep(1) #Avoid telegram flood restriction
|
||||||
#time.sleep(1) # Avoid Telegram flood restriction
|
|
||||||
#list[article['id']] = 1
|
|
||||||
list.insert(0, article['id'])
|
list.insert(0, article['id'])
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.info("---------- EXCEPTION -----------")
|
logging.info("---------- EXCEPTION -----------")
|
||||||
logging.info(f"{product['product_name']} worker crashed. {e}")
|
logging.info(f"{product['product_name']} worker crashed. {e}")
|
||||||
logging.info(f"{product['product_name']}: Trying to parse {article['id']}: {article['title']} .\n")
|
logging.info(f"{product['product_name']}: Trying to parse {article['id']}: {article['title']} .\n")
|
||||||
#time.sleep(constants.SLEEP_TIME)
|
time.sleep(constants.SLEEP_TIME)
|
||||||
await asyncio.sleep(constants.SLEEP_TIME)
|
|
||||||
exec_times.append(time.time() - start_time)
|
exec_times.append(time.time() - start_time)
|
||||||
logging.info(f"\'{product['product_name']}\' for {walladb.get_user(product['telegram_user_id'])} ({product['telegram_user_id']}) node-> last: {exec_times[-1]:.2f} max: {self.get_max_time(exec_times):.2f} avg: {self.get_average_time(exec_times):.2f}")
|
logging.info(f"\'{product['product_name']}\' for {walladb.get_user(product['telegram_user_id'])} ({product['telegram_user_id']}) node-> last: {exec_times[-1]:.2f} max: {self.get_max_time(exec_times):.2f} avg: {self.get_average_time(exec_times):.2f}")
|
||||||
|
|
||||||
@@ -172,21 +169,6 @@ class Worker:
|
|||||||
if i > largest:
|
if i > largest:
|
||||||
largest = i
|
largest = i
|
||||||
return largest
|
return largest
|
||||||
|
|
||||||
async def old_run(product):
|
|
||||||
worker = Worker()
|
|
||||||
list = worker.first_run(product)
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
logging.info(f"Wallapop monitor worker started. Checking for new items containing: \'{product['product_name']}\' for {walladb.get_user(product['telegram_user_id'])} ({product['telegram_user_id']}) with given parameters periodically")
|
|
||||||
await worker.work(product, list)
|
|
||||||
break
|
|
||||||
except Exception as e:
|
|
||||||
logging.info(f"Exception: {e}")
|
|
||||||
logging.info(f"{product['product_name']} worker crashed. Restarting worker...")
|
|
||||||
await asyncio.sleep(10)
|
|
||||||
#time.sleep(10)
|
|
||||||
logging.info(f"Wallapop monitor worker stopped for: \'{product['product_name']}\'")
|
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
self._stop = True
|
self._stop = True
|
||||||
@@ -200,10 +182,7 @@ class Worker:
|
|||||||
#time.sleep(constants.SLEEP_TIME)
|
#time.sleep(constants.SLEEP_TIME)
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
loop = asyncio.new_event_loop()
|
worker.work(product, list)
|
||||||
asyncio.set_event_loop(loop)
|
|
||||||
loop.run_until_complete(worker.work(product, list))
|
|
||||||
loop.close()
|
|
||||||
break
|
break
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.info(f"Exception: {e}")
|
logging.info(f"Exception: {e}")
|
||||||
|
|||||||
Reference in New Issue
Block a user