From fe1ac2cedd4a102ff9e38ce914850a34f56d2bf5 Mon Sep 17 00:00:00 2001 From: Alejandro Perez Lopez Date: Tue, 5 Sep 2023 12:04:14 +0200 Subject: [PATCH 1/5] Extract into a separated method the logic for building the referer url --- bot/bot.py | 60 ++++++++++++++++++++++++++++++++---------------------- 1 file changed, 36 insertions(+), 24 deletions(-) diff --git a/bot/bot.py b/bot/bot.py index fc368a1..be99ed3 100644 --- a/bot/bot.py +++ b/bot/bot.py @@ -22,8 +22,8 @@ def get_chrome_options(): chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--disable-dev-shm-usage") chrome_prefs = {} - chrome_options.experimental_options["prefs"] = chrome_prefs chrome_prefs["profile.default_content_settings"] = {"images": 2} + chrome_options.experimental_options["prefs"] = chrome_prefs return chrome_options def find_amazon_link(update: Update, context: CallbackContext) -> None: @@ -34,33 +34,22 @@ def find_amazon_link(update: Update, context: CallbackContext) -> None: except AttributeError: logging.info(f"Received message has no text") return - start = msg.find("amzn.to") - if start != -1: - msg = helpers.unshort_url(msg[start:].split()[0]) - start = msg.find("amzn.eu") - if start != -1: - msg = helpers.unshort_url(msg[start:].split()[0]) - start = msg.find(constants.searchURL) - if start != -1: - msg = f"https://{msg[start:].split(' ')[0]}" - m = re.search(r'(?:dp\/[\w]*)|(?:gp\/product\/[\w]*)',msg) # Gets product code in amazon - m_e = re.search(r'(?:&m=[\w]*)',msg) # Gets vendor and keeps it - if m != None: - productCode = m.group(0) - if m_e != None: - extraparams = m_e.group(0) + + user, chat, message_id = update.message.from_user, update.message.chat, update.message.message_id + logging.info(f"Link sent by {user} - {msg}") - user, chat, message_id = update.message.from_user, update.message.chat, update.message.message_id - logging.info(f"Link sent by {user} - {msg}") + amazon_url_with_referer = extract_amazon_url_with_referer(msg) - referurl = helpers.new_refer_url(productCode, extraparams) + if amazon_url_with_referer: logging.info("Setting up new Chrome Browser") driver = webdriver.Chrome(options=get_chrome_options()) driver.set_window_size(random.randint(1200, 1800), random.randint(600, 1000)) logging.info("Loading Amazon webpage") - driver.get(referurl) + + driver.get(amazon_url_with_referer) driver.save_screenshot('/app/data/last_screenshot.png') logging.info("Scraping information and closing browser") + soup = BeautifulSoup(driver.page_source, "lxml") etree_soup = BeautifulSoup(driver.page_source, "html.parser") # DEBUG @@ -82,7 +71,7 @@ def find_amazon_link(update: Update, context: CallbackContext) -> None: fill_captcha_element.send_keys(solution) fill_captcha_element.send_keys(Keys.RETURN) logging.info("Re-loading Amazon webpage") - driver.get(msg) + driver.get(amazon_url_with_referer) logging.info("Scraping information") soup = BeautifulSoup(driver.page_source, "lxml") etree_soup = BeautifulSoup(driver.page_source, "html.parser") @@ -97,14 +86,37 @@ def find_amazon_link(update: Update, context: CallbackContext) -> None: driver.close() context.bot.deleteMessage(chat_id=chat['id'], message_id=message_id) - product_id = dbhelper.check_product(referurl, price) + product_id = dbhelper.check_product(amazon_url_with_referer, price) if not product_id: - product_id = dbhelper.add_product(user['username'], chat['title'], user['id'], chat['id'], msg, referurl, title, price, image) + product_id = dbhelper.add_product(user['username'], chat['title'], user['id'], chat['id'], msg, amazon_url_with_referer, title, price, image) helpers.create_image(product_id, price) - keyboard = [[InlineKeyboardButton("Ir a Amazon", url=f"{referurl}")]] + keyboard = [[InlineKeyboardButton("Ir a Amazon", url=f"{amazon_url_with_referer}")]] markup = InlineKeyboardMarkup(keyboard) context.bot.send_photo(chat_id=update.message.chat_id, photo=open(f"/app/data/images/products/{product_id}_composed.png", 'rb'), caption=f"URL enviada por @{user['username']}: \n\n{title}", reply_markup=markup) +def extract_amazon_url_with_referer(text: str): + + start = text.find("amzn.to") + if start != -1: + text = helpers.unshort_url(text[start:].split()[0]) + start = text.find("amzn.eu") + if start != -1: + text = helpers.unshort_url(text[start:].split()[0]) + start = text.find(constants.searchURL) + if start != -1: + text = f"https://{text[start:].split(' ')[0]}" + m = re.search(r'(?:dp\/[\w]*)|(?:gp\/product\/[\w]*)',text) # Gets product code in amazon + m_e = re.search(r'(?:&m=[\w]*)',text) # Gets vendor and keeps it + if m != None: + productCode = m.group(0) + if m_e != None: + extraparams = m_e.group(0) + + return helpers.new_refer_url(productCode, extraparams) + + return None + + def main() -> None: dbhelper.setup_db() From 1d6213ffb131bf9cf80ce8936a1f389dc509e217 Mon Sep 17 00:00:00 2001 From: Alejandro Perez Lopez Date: Sat, 9 Sep 2023 19:22:06 +0200 Subject: [PATCH 2/5] Modify .gitignore in order to add more routes for images --- .gitignore | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 56d57fd..7c2e09e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ .env* /data/amazon.db -/data/images/products \ No newline at end of file +/data/images/products +/data/images/last_iteration/*.png +venv +.DS_Store \ No newline at end of file From bf1b9309e125ce918b7d43c5d390e9321c9eae00 Mon Sep 17 00:00:00 2001 From: Alejandro Perez Lopez Date: Sat, 9 Sep 2023 19:22:40 +0200 Subject: [PATCH 3/5] Create a class for storing the scraped product information --- bot/amazon_product.py | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 bot/amazon_product.py diff --git a/bot/amazon_product.py b/bot/amazon_product.py new file mode 100644 index 0000000..939815f --- /dev/null +++ b/bot/amazon_product.py @@ -0,0 +1,9 @@ +class AmazonProduct: + + def __init__(self, title, price, image): + self.title = title + self.price = price + self.image = image + + def __str__(self): + return f"Title={self.title}, price={self.price}, image={self.image}" \ No newline at end of file From aec433fa9c3a7e8f915ce7eb378e703df92f92e2 Mon Sep 17 00:00:00 2001 From: Alejandro Perez Lopez Date: Sat, 9 Sep 2023 19:23:56 +0200 Subject: [PATCH 4/5] Remove duplicated code by extracting repeated code into methods and add feedback for invalid urls --- bot/bot.py | 155 +++++++++++++++------------ bot/{parser.py => product_parser.py} | 0 2 files changed, 89 insertions(+), 66 deletions(-) rename bot/{parser.py => product_parser.py} (100%) diff --git a/bot/bot.py b/bot/bot.py index be99ed3..e689ab3 100644 --- a/bot/bot.py +++ b/bot/bot.py @@ -4,7 +4,7 @@ import random import dbhelper import constants import helpers -import parser +import product_parser import time from amazoncaptcha import AmazonCaptcha @@ -15,6 +15,8 @@ from selenium.webdriver.chrome.options import Options from selenium import webdriver from telegram import Update, ForceReply, InlineKeyboardButton, InlineKeyboardMarkup from telegram.ext import Updater, CommandHandler, MessageHandler, Filters, CallbackContext +from amazon_product import AmazonProduct + def get_chrome_options(): chrome_options = Options() @@ -26,75 +28,24 @@ def get_chrome_options(): chrome_options.experimental_options["prefs"] = chrome_prefs return chrome_options -def find_amazon_link(update: Update, context: CallbackContext) -> None: - productCode = "" - extraparams = "" - try: - msg = update.message.text - except AttributeError: - logging.info(f"Received message has no text") - return - - user, chat, message_id = update.message.from_user, update.message.chat, update.message.message_id - logging.info(f"Link sent by {user} - {msg}") - amazon_url_with_referer = extract_amazon_url_with_referer(msg) +def create_selenium_driver(options: Options): + logging.info("Setting up new Chrome Browser") + driver = webdriver.Chrome(options=options) + driver.set_window_size(random.randint(1200, 1800), random.randint(600, 1000)) + return driver - if amazon_url_with_referer: - logging.info("Setting up new Chrome Browser") - driver = webdriver.Chrome(options=get_chrome_options()) - driver.set_window_size(random.randint(1200, 1800), random.randint(600, 1000)) - logging.info("Loading Amazon webpage") - - driver.get(amazon_url_with_referer) - driver.save_screenshot('/app/data/last_screenshot.png') - logging.info("Scraping information and closing browser") - soup = BeautifulSoup(driver.page_source, "lxml") - etree_soup = BeautifulSoup(driver.page_source, "html.parser") - # DEBUG - #logging.info(soup) +def load_page(url: str, driver: webdriver, screenshot_type=""): + logging.info(f"Loading webpage {url}") + driver.get(url) + driver.save_screenshot(f"/app/data/images/last_iteration/last_screenshot{screenshot_type}.png") + return driver.page_source - logging.info("Getting title...") - title, price, image = parser.get_title(soup), parser.get_price(soup), parser.get_image(soup, etree_soup) - if title == "": - logging.info(f"Title not found, not a valid product or captcha") - captcha = AmazonCaptcha.fromdriver(driver) - solution = captcha.solve() - if solution == 'Not solved': - logging.info("Couldn't solve the captcha, if there was any") - else: - logging.info(f"Captcha solution is {solution}, redirecting") - logging.info("Waiting for 5 seconds, humans are not that fast :)") - time.sleep(5) - fill_captcha_element = driver.find_element(By.ID, 'captchacharacters') - fill_captcha_element.send_keys(solution) - fill_captcha_element.send_keys(Keys.RETURN) - logging.info("Re-loading Amazon webpage") - driver.get(amazon_url_with_referer) - logging.info("Scraping information") - soup = BeautifulSoup(driver.page_source, "lxml") - etree_soup = BeautifulSoup(driver.page_source, "html.parser") - logging.info("Getting title...") - title, price, image = parser.get_title(soup), parser.get_price(soup), parser.get_image(soup, etree_soup) - if title == "": - logging.info(f"Title not found, not a valid product or failed captcha") - return - logging.info(f"Title found: {title}") - - logging.info("Closing browser") - driver.close() - - context.bot.deleteMessage(chat_id=chat['id'], message_id=message_id) - product_id = dbhelper.check_product(amazon_url_with_referer, price) - if not product_id: - product_id = dbhelper.add_product(user['username'], chat['title'], user['id'], chat['id'], msg, amazon_url_with_referer, title, price, image) - helpers.create_image(product_id, price) - keyboard = [[InlineKeyboardButton("Ir a Amazon", url=f"{amazon_url_with_referer}")]] - markup = InlineKeyboardMarkup(keyboard) - context.bot.send_photo(chat_id=update.message.chat_id, photo=open(f"/app/data/images/products/{product_id}_composed.png", 'rb'), caption=f"URL enviada por @{user['username']}: \n\n{title}", reply_markup=markup) def extract_amazon_url_with_referer(text: str): + productCode = "" + extraparams = "" start = text.find("amzn.to") if start != -1: @@ -111,10 +62,81 @@ def extract_amazon_url_with_referer(text: str): productCode = m.group(0) if m_e != None: extraparams = m_e.group(0) - return helpers.new_refer_url(productCode, extraparams) - return None + +def resolve_captcha(driver: webdriver): + logging.info("Trying to resolve captcha") + captcha = AmazonCaptcha.fromdriver(driver) + solution = captcha.solve() + return solution if solution != 'Not solved' else None + + +def apply_captcha(captcha_solution: str, driver: webdriver): + logging.info(f"Captcha solution is {captcha_solution}, redirecting") + logging.info("Waiting for 5 seconds, humans are not that fast :)") + time.sleep(5) + fill_captcha_element = driver.find_element(By.ID, 'captchacharacters') + fill_captcha_element.send_keys(captcha_solution) + fill_captcha_element.send_keys(Keys.RETURN) + + +def scrape_data(page_html: str) -> AmazonProduct: + logging.info("Scraping information") + soup = BeautifulSoup(page_html, "lxml") + etree_soup = BeautifulSoup(page_html, "html.parser") + + title, price, image = product_parser.get_title(soup), product_parser.get_price(soup), product_parser.get_image(soup, etree_soup) + return AmazonProduct(title=title, price=price, image=image) if title != "" else None + + +def find_amazon_link(update: Update, context: CallbackContext) -> None: + try: + msg = update.message.text + except AttributeError: + logging.info(f"Received message has no text") + return + + user, chat, chat_id, message_id = update.message.from_user, update.message.chat, update.message.chat_id, update.message.message_id + logging.info(f"Link sent by {user} - {msg}") + + amazon_url_with_referer = extract_amazon_url_with_referer(msg) + + if amazon_url_with_referer: + with create_selenium_driver(get_chrome_options()) as chrome_driver: + logging.info("Loading page for scraping information") + page_html = load_page(url=amazon_url_with_referer, driver=chrome_driver) + product_data = scrape_data(page_html) + + if not product_data: + logging.info(f"Title not found, not a valid product or captcha") + + captcha_solution = resolve_captcha(chrome_driver) + + if captcha_solution: + apply_captcha(captcha_solution, chrome_driver) + + logging.info("Re-loading Amazon webpage") + page_html = load_page(url=amazon_url_with_referer, driver=chrome_driver, screenshot_type="_aftercaptcha") + product_data = scrape_data(page_html) + + if not product_data: + logging.info("Unable to get the product information") + context.bot.send_message(chat_id=chat_id, text="Unable to get product attributes from the provided url", reply_to_message_id=message_id) + return + else: + logging.info("Couldn't solve the captcha, if there was any") + + logging.info(f"Product information found: {product_data}") + + context.bot.deleteMessage(chat_id=chat_id, message_id=message_id) + product_id = dbhelper.check_product(amazon_url_with_referer, product_data.price) + if not product_id: + product_id = dbhelper.add_product(user['username'], chat['title'], user['id'], chat_id, msg, amazon_url_with_referer, product_data.title, product_data.price, product_data.image) + helpers.create_image(product_id, product_data.price) + keyboard = [[InlineKeyboardButton("Ir a Amazon", url=f"{amazon_url_with_referer}")]] + markup = InlineKeyboardMarkup(keyboard) + context.bot.send_photo(chat_id=chat_id, photo=open(f"/app/data/images/products/{product_id}_composed.png", 'rb'), caption=f"URL enviada por @{user['username']}: \n\n{product_data.title}", reply_markup=markup) def main() -> None: @@ -126,5 +148,6 @@ def main() -> None: updater.start_polling() updater.idle() + if __name__ == '__main__': main() diff --git a/bot/parser.py b/bot/product_parser.py similarity index 100% rename from bot/parser.py rename to bot/product_parser.py From 38fdf3bb50983d8900f8958eb73c7b7091797acb Mon Sep 17 00:00:00 2001 From: Alejandro Perez Lopez Date: Fri, 22 Sep 2023 12:37:05 +0200 Subject: [PATCH 5/5] Move logic in case the captcha resolution fails or we can't extract the data after the captcha resolution to avoid errors --- bot/bot.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/bot/bot.py b/bot/bot.py index e689ab3..8969562 100644 --- a/bot/bot.py +++ b/bot/bot.py @@ -119,13 +119,13 @@ def find_amazon_link(update: Update, context: CallbackContext) -> None: logging.info("Re-loading Amazon webpage") page_html = load_page(url=amazon_url_with_referer, driver=chrome_driver, screenshot_type="_aftercaptcha") product_data = scrape_data(page_html) - - if not product_data: - logging.info("Unable to get the product information") - context.bot.send_message(chat_id=chat_id, text="Unable to get product attributes from the provided url", reply_to_message_id=message_id) - return else: logging.info("Couldn't solve the captcha, if there was any") + + if not product_data: #if after applying the captcha we don't have any data yet, stop the execution and reply to the user + logging.info("Unable to get the product information") + context.bot.send_message(chat_id=chat_id, text="Unable to get product attributes from the provided url", reply_to_message_id=message_id) + return logging.info(f"Product information found: {product_data}")