"""Telegram bot that replaces Amazon product links with referral links.

For each text message containing an Amazon link, the bot loads the product
page in headless Chrome, scrapes title/price/image, stores the product via
``dbhelper``, deletes the original message and posts a composed product image
with the referral URL.
"""
import logging
import random
import re
import time

from amazoncaptcha import AmazonCaptcha
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from telegram import Update, ForceReply
from telegram.ext import Updater, CommandHandler, MessageHandler, Filters, CallbackContext

import constants
import dbhelper
import helpers
import parser  # used below for get_title / get_price / get_image

# Product code as it appears in an Amazon URL path ("dp/<code>" or
# "gp/product/<code>").
_PRODUCT_CODE_RE = re.compile(r'(?:dp\/[\w]*)|(?:gp\/product\/[\w]*)')
# Optional "&m=<vendor>" query parameter, kept so the vendor is preserved.
_VENDOR_PARAM_RE = re.compile(r'(?:&m=[\w]*)')
# Seconds to wait before typing a captcha solution.
_CAPTCHA_SUBMIT_DELAY = 5


def get_chrome_options():
    """Build the Chrome options used for every scraping session.

    Returns:
        An ``Options`` instance configured for headless use, with the sandbox
        and ``/dev/shm`` usage disabled and the
        ``profile.default_content_settings`` preference set to
        ``{"images": 2}``.
    """
    chrome_options = Options()
    for flag in ("--headless", "--no-sandbox", "--disable-dev-shm-usage"):
        chrome_options.add_argument(flag)
    # NOTE(review): value 2 presumably blocks image loading - confirm.
    chrome_options.experimental_options["prefs"] = {
        "profile.default_content_settings": {"images": 2},
    }
    return chrome_options


def _resolve_amazon_url(text):
    """Return the Amazon URL contained in *text*, or ``None`` if there is none.

    Short links (``amzn.to`` / ``amzn.eu``) are expanded with
    ``helpers.unshort_url`` first; the result must contain
    ``constants.searchURL`` to be accepted.
    """
    for short_domain in ("amzn.to", "amzn.eu"):
        start = text.find(short_domain)
        if start != -1:
            text = helpers.unshort_url(text[start:].split()[0])
    start = text.find(constants.searchURL)
    if start == -1:
        return None
    return f"https://{text[start:].split(' ')[0]}"


def _scrape_current_page(driver):
    """Parse the page currently loaded in *driver* into (title, price, image)."""
    soup = BeautifulSoup(driver.page_source, "lxml")
    logging.info("Getting title...")
    return parser.get_title(soup), parser.get_price(soup), parser.get_image(soup)


def _try_solve_captcha(driver):
    """Attempt to solve and submit an Amazon captcha shown in *driver*."""
    captcha = AmazonCaptcha.fromdriver(driver)
    solution = captcha.solve()
    if solution == 'Not solved':
        logging.info("Couldn't solve the captcha, if there was any")
        return
    logging.info(f"Captcha solution is {solution}, redirecting")
    logging.info("Waiting for 5 seconds, humans are not that fast :)")
    time.sleep(_CAPTCHA_SUBMIT_DELAY)
    fill_captcha_element = driver.find_element(By.ID, 'captchacharacters')
    fill_captcha_element.send_keys(solution)
    fill_captcha_element.send_keys(Keys.RETURN)


def _fetch_product(url):
    """Load *url* in a fresh headless Chrome and scrape the product data.

    If no title is found on the first load, a captcha solve is attempted and
    the page is loaded and scraped once more.

    Returns:
        A ``(title, price, image)`` tuple; ``title`` is ``""`` when the page
        could not be read as a product.
    """
    logging.info("Setting up new Chrome Browser")
    driver = webdriver.Chrome(options=get_chrome_options())
    try:
        driver.set_window_size(random.randint(1200, 1800), random.randint(600, 1000))
        logging.info("Loading Amazon webpage")
        driver.get(url)
        logging.info("Scraping information and closing browser")
        title, price, image = _scrape_current_page(driver)
        if title == "":
            logging.info("Title not found, not a valid product or captcha")
            _try_solve_captcha(driver)
            logging.info("Re-loading Amazon webpage")
            driver.get(url)
            logging.info("Scraping information")
            title, price, image = _scrape_current_page(driver)
        return title, price, image
    finally:
        # Always release the browser, including on failures and when no title
        # was found; quit() ends the whole WebDriver session.
        logging.info("Closing browser")
        driver.quit()


def find_amazon_link(update: Update, context: CallbackContext) -> None:
    """Handle a text message: swap an Amazon link for a referral post.

    Messages without text, or in which no URL containing
    ``constants.searchURL`` can be found, are ignored. Otherwise the product
    page is scraped, the product is looked up (or added) through ``dbhelper``,
    the original message is deleted and a photo with the title and referral
    URL is sent to the same chat.

    Args:
        update: Incoming Telegram update.
        context: Callback context giving access to the bot.
    """
    message = update.message
    if message is None or message.text is None:
        logging.info("Received message has no text")
        return

    msg = _resolve_amazon_url(message.text)
    if msg is None:
        # Not an Amazon link: do not start a browser for ordinary chat text.
        return

    code_match = _PRODUCT_CODE_RE.search(msg)
    vendor_match = _VENDOR_PARAM_RE.search(msg)
    productCode = code_match.group(0) if code_match is not None else ""
    extraparams = vendor_match.group(0) if vendor_match is not None else ""

    user, chat, message_id = message.from_user, message.chat, message.message_id
    logging.info(f"Link sent by {user} - {msg}")

    title, price, image = _fetch_product(msg)
    if title == "":
        logging.info("Title not found, not a valid product or failed captcha")
        return
    logging.info(f"Title found: {title}")

    context.bot.deleteMessage(chat_id=chat['id'], message_id=message_id)
    referurl = helpers.new_refer_url(productCode, extraparams)
    product_id = dbhelper.check_product(referurl, price)
    if not product_id:
        product_id = dbhelper.add_product(
            user['username'], chat['title'], user['id'], chat['id'],
            msg, referurl, title, price, image,
        )
    # Called on every request so the composed image is in place before it is
    # opened below (assumes create_image writes that file - confirm).
    helpers.create_image(product_id, price)

    image_path = f"/app/data/images/products/{product_id}_composed.png"
    # Context manager so the file handle is closed once the upload finishes.
    with open(image_path, 'rb') as photo:
        context.bot.send_photo(
            chat_id=message.chat_id,
            photo=photo,
            caption=f"URL enviada por @{user['username']}: \n\n{title} \n\n{referurl}",
        )


def main() -> None:
    """Set up the database, register the message handler and start polling."""
    dbhelper.setup_db()
    updater = Updater(constants.TELEGRAM_API_KEY)
    dispatcher = updater.dispatcher
    # Every non-command text message is inspected for Amazon links.
    dispatcher.add_handler(MessageHandler(Filters.text & ~Filters.command, find_amazon_link))
    updater.start_polling()
    updater.idle()


if __name__ == '__main__':
    main()