diff --git a/bot/bot.py b/bot/bot.py index be99ed3..e689ab3 100644 --- a/bot/bot.py +++ b/bot/bot.py @@ -4,7 +4,7 @@ import random import dbhelper import constants import helpers -import parser +import product_parser import time from amazoncaptcha import AmazonCaptcha @@ -15,6 +15,8 @@ from selenium.webdriver.chrome.options import Options from selenium import webdriver from telegram import Update, ForceReply, InlineKeyboardButton, InlineKeyboardMarkup from telegram.ext import Updater, CommandHandler, MessageHandler, Filters, CallbackContext +from amazon_product import AmazonProduct + def get_chrome_options(): chrome_options = Options() @@ -26,75 +28,24 @@ def get_chrome_options(): chrome_options.experimental_options["prefs"] = chrome_prefs return chrome_options -def find_amazon_link(update: Update, context: CallbackContext) -> None: - productCode = "" - extraparams = "" - try: - msg = update.message.text - except AttributeError: - logging.info(f"Received message has no text") - return - - user, chat, message_id = update.message.from_user, update.message.chat, update.message.message_id - logging.info(f"Link sent by {user} - {msg}") - amazon_url_with_referer = extract_amazon_url_with_referer(msg) +def create_selenium_driver(options: Options): + logging.info("Setting up new Chrome Browser") + driver = webdriver.Chrome(options=options) + driver.set_window_size(random.randint(1200, 1800), random.randint(600, 1000)) + return driver - if amazon_url_with_referer: - logging.info("Setting up new Chrome Browser") - driver = webdriver.Chrome(options=get_chrome_options()) - driver.set_window_size(random.randint(1200, 1800), random.randint(600, 1000)) - logging.info("Loading Amazon webpage") - - driver.get(amazon_url_with_referer) - driver.save_screenshot('/app/data/last_screenshot.png') - logging.info("Scraping information and closing browser") - soup = BeautifulSoup(driver.page_source, "lxml") - etree_soup = BeautifulSoup(driver.page_source, "html.parser") - # DEBUG - #logging.info(soup) +def load_page(url: str, driver: webdriver, screenshot_type=""): + logging.info(f"Loading webpage {url}") + driver.get(url) + driver.save_screenshot(f"/app/data/images/last_iteration/last_screenshot{screenshot_type}.png") + return driver.page_source - logging.info("Getting title...") - title, price, image = parser.get_title(soup), parser.get_price(soup), parser.get_image(soup, etree_soup) - if title == "": - logging.info(f"Title not found, not a valid product or captcha") - captcha = AmazonCaptcha.fromdriver(driver) - solution = captcha.solve() - if solution == 'Not solved': - logging.info("Couldn't solve the captcha, if there was any") - else: - logging.info(f"Captcha solution is {solution}, redirecting") - logging.info("Waiting for 5 seconds, humans are not that fast :)") - time.sleep(5) - fill_captcha_element = driver.find_element(By.ID, 'captchacharacters') - fill_captcha_element.send_keys(solution) - fill_captcha_element.send_keys(Keys.RETURN) - logging.info("Re-loading Amazon webpage") - driver.get(amazon_url_with_referer) - logging.info("Scraping information") - soup = BeautifulSoup(driver.page_source, "lxml") - etree_soup = BeautifulSoup(driver.page_source, "html.parser") - logging.info("Getting title...") - title, price, image = parser.get_title(soup), parser.get_price(soup), parser.get_image(soup, etree_soup) - if title == "": - logging.info(f"Title not found, not a valid product or failed captcha") - return - logging.info(f"Title found: {title}") - - logging.info("Closing browser") - driver.close() - - context.bot.deleteMessage(chat_id=chat['id'], message_id=message_id) - product_id = dbhelper.check_product(amazon_url_with_referer, price) - if not product_id: - product_id = dbhelper.add_product(user['username'], chat['title'], user['id'], chat['id'], msg, amazon_url_with_referer, title, price, image) - helpers.create_image(product_id, price) - keyboard = [[InlineKeyboardButton("Ir a Amazon", url=f"{amazon_url_with_referer}")]] - markup = InlineKeyboardMarkup(keyboard) - context.bot.send_photo(chat_id=update.message.chat_id, photo=open(f"/app/data/images/products/{product_id}_composed.png", 'rb'), caption=f"URL enviada por @{user['username']}: \n\n{title}", reply_markup=markup) def extract_amazon_url_with_referer(text: str): + productCode = "" + extraparams = "" start = text.find("amzn.to") if start != -1: @@ -111,10 +62,81 @@ def extract_amazon_url_with_referer(text: str): productCode = m.group(0) if m_e != None: extraparams = m_e.group(0) - return helpers.new_refer_url(productCode, extraparams) - return None + +def resolve_captcha(driver: webdriver): + logging.info("Trying to resolve captcha") + captcha = AmazonCaptcha.fromdriver(driver) + solution = captcha.solve() + return solution if solution != 'Not solved' else None + + +def apply_captcha(captcha_solution: str, driver: webdriver): + logging.info(f"Captcha solution is {captcha_solution}, redirecting") + logging.info("Waiting for 5 seconds, humans are not that fast :)") + time.sleep(5) + fill_captcha_element = driver.find_element(By.ID, 'captchacharacters') + fill_captcha_element.send_keys(captcha_solution) + fill_captcha_element.send_keys(Keys.RETURN) + + +def scrape_data(page_html: str) -> AmazonProduct: + logging.info("Scraping information") + soup = BeautifulSoup(page_html, "lxml") + etree_soup = BeautifulSoup(page_html, "html.parser") + + title, price, image = product_parser.get_title(soup), product_parser.get_price(soup), product_parser.get_image(soup, etree_soup) + return AmazonProduct(title=title, price=price, image=image) if title != "" else None + + +def find_amazon_link(update: Update, context: CallbackContext) -> None: + try: + msg = update.message.text + except AttributeError: + logging.info(f"Received message has no text") + return + + user, chat, chat_id, message_id = update.message.from_user, update.message.chat, update.message.chat_id, update.message.message_id + logging.info(f"Link sent by {user} - {msg}") + + amazon_url_with_referer = extract_amazon_url_with_referer(msg) + + if amazon_url_with_referer: + with create_selenium_driver(get_chrome_options()) as chrome_driver: + logging.info("Loading page for scraping information") + page_html = load_page(url=amazon_url_with_referer, driver=chrome_driver) + product_data = scrape_data(page_html) + + if not product_data: + logging.info(f"Title not found, not a valid product or captcha") + + captcha_solution = resolve_captcha(chrome_driver) + + if captcha_solution: + apply_captcha(captcha_solution, chrome_driver) + + logging.info("Re-loading Amazon webpage") + page_html = load_page(url=amazon_url_with_referer, driver=chrome_driver, screenshot_type="_aftercaptcha") + product_data = scrape_data(page_html) + + if not product_data: + logging.info("Unable to get the product information") + context.bot.send_message(chat_id=chat_id, text="Unable to get product attributes from the provided url", reply_to_message_id=message_id) + return + else: + logging.info("Couldn't solve the captcha, if there was any") + + logging.info(f"Product information found: {product_data}") + + context.bot.deleteMessage(chat_id=chat_id, message_id=message_id) + product_id = dbhelper.check_product(amazon_url_with_referer, product_data.price) + if not product_id: + product_id = dbhelper.add_product(user['username'], chat['title'], user['id'], chat_id, msg, amazon_url_with_referer, product_data.title, product_data.price, product_data.image) + helpers.create_image(product_id, product_data.price) + keyboard = [[InlineKeyboardButton("Ir a Amazon", url=f"{amazon_url_with_referer}")]] + markup = InlineKeyboardMarkup(keyboard) + context.bot.send_photo(chat_id=chat_id, photo=open(f"/app/data/images/products/{product_id}_composed.png", 'rb'), caption=f"URL enviada por @{user['username']}: \n\n{product_data.title}", reply_markup=markup) def main() -> None: @@ -126,5 +148,6 @@ def main() -> None: updater.start_polling() updater.idle() + if __name__ == '__main__': main() diff --git a/bot/parser.py b/bot/product_parser.py similarity index 100% rename from bot/parser.py rename to bot/product_parser.py