diff --git a/.gitignore b/.gitignore index 56d57fd..7c2e09e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ .env* /data/amazon.db -/data/images/products \ No newline at end of file +/data/images/products +/data/images/last_iteration/*.png +venv +.DS_Store \ No newline at end of file diff --git a/bot/amazon_product.py b/bot/amazon_product.py new file mode 100644 index 0000000..939815f --- /dev/null +++ b/bot/amazon_product.py @@ -0,0 +1,9 @@ +class AmazonProduct: + + def __init__(self, title, price, image): + self.title = title + self.price = price + self.image = image + + def __str__(self): + return f"Title={self.title}, price={self.price}, image={self.image}" \ No newline at end of file diff --git a/bot/bot.py b/bot/bot.py index fc368a1..8969562 100644 --- a/bot/bot.py +++ b/bot/bot.py @@ -4,7 +4,7 @@ import random import dbhelper import constants import helpers -import parser +import product_parser import time from amazoncaptcha import AmazonCaptcha @@ -15,6 +15,8 @@ from selenium.webdriver.chrome.options import Options from selenium import webdriver from telegram import Update, ForceReply, InlineKeyboardButton, InlineKeyboardMarkup from telegram.ext import Updater, CommandHandler, MessageHandler, Filters, CallbackContext +from amazon_product import AmazonProduct + def get_chrome_options(): chrome_options = Options() @@ -22,88 +24,120 @@ def get_chrome_options(): chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--disable-dev-shm-usage") chrome_prefs = {} - chrome_options.experimental_options["prefs"] = chrome_prefs chrome_prefs["profile.default_content_settings"] = {"images": 2} + chrome_options.experimental_options["prefs"] = chrome_prefs return chrome_options -def find_amazon_link(update: Update, context: CallbackContext) -> None: + +def create_selenium_driver(options: Options): + logging.info("Setting up new Chrome Browser") + driver = webdriver.Chrome(options=options) + driver.set_window_size(random.randint(1200, 1800), random.randint(600, 1000)) + return driver + + +def load_page(url: str, driver: webdriver, screenshot_type=""): + logging.info(f"Loading webpage {url}") + driver.get(url) + driver.save_screenshot(f"/app/data/images/last_iteration/last_screenshot{screenshot_type}.png") + return driver.page_source + + +def extract_amazon_url_with_referer(text: str): productCode = "" extraparams = "" + + start = text.find("amzn.to") + if start != -1: + text = helpers.unshort_url(text[start:].split()[0]) + start = text.find("amzn.eu") + if start != -1: + text = helpers.unshort_url(text[start:].split()[0]) + start = text.find(constants.searchURL) + if start != -1: + text = f"https://{text[start:].split(' ')[0]}" + m = re.search(r'(?:dp\/[\w]*)|(?:gp\/product\/[\w]*)',text) # Gets product code in amazon + m_e = re.search(r'(?:&m=[\w]*)',text) # Gets vendor and keeps it + if m != None: + productCode = m.group(0) + if m_e != None: + extraparams = m_e.group(0) + return helpers.new_refer_url(productCode, extraparams) + + +def resolve_captcha(driver: webdriver): + logging.info("Trying to resolve captcha") + captcha = AmazonCaptcha.fromdriver(driver) + solution = captcha.solve() + return solution if solution != 'Not solved' else None + + +def apply_captcha(captcha_solution: str, driver: webdriver): + logging.info(f"Captcha solution is {captcha_solution}, redirecting") + logging.info("Waiting for 5 seconds, humans are not that fast :)") + time.sleep(5) + fill_captcha_element = driver.find_element(By.ID, 'captchacharacters') + fill_captcha_element.send_keys(captcha_solution) + fill_captcha_element.send_keys(Keys.RETURN) + + +def scrape_data(page_html: str) -> AmazonProduct: + logging.info("Scraping information") + soup = BeautifulSoup(page_html, "lxml") + etree_soup = BeautifulSoup(page_html, "html.parser") + + title, price, image = product_parser.get_title(soup), product_parser.get_price(soup), product_parser.get_image(soup, etree_soup) + return AmazonProduct(title=title, price=price, image=image) if title != "" else None + + +def find_amazon_link(update: Update, context: CallbackContext) -> None: try: msg = update.message.text except AttributeError: logging.info(f"Received message has no text") return - start = msg.find("amzn.to") - if start != -1: - msg = helpers.unshort_url(msg[start:].split()[0]) - start = msg.find("amzn.eu") - if start != -1: - msg = helpers.unshort_url(msg[start:].split()[0]) - start = msg.find(constants.searchURL) - if start != -1: - msg = f"https://{msg[start:].split(' ')[0]}" - m = re.search(r'(?:dp\/[\w]*)|(?:gp\/product\/[\w]*)',msg) # Gets product code in amazon - m_e = re.search(r'(?:&m=[\w]*)',msg) # Gets vendor and keeps it - if m != None: - productCode = m.group(0) - if m_e != None: - extraparams = m_e.group(0) + + user, chat, chat_id, message_id = update.message.from_user, update.message.chat, update.message.chat_id, update.message.message_id + logging.info(f"Link sent by {user} - {msg}") - user, chat, message_id = update.message.from_user, update.message.chat, update.message.message_id - logging.info(f"Link sent by {user} - {msg}") + amazon_url_with_referer = extract_amazon_url_with_referer(msg) - referurl = helpers.new_refer_url(productCode, extraparams) - logging.info("Setting up new Chrome Browser") - driver = webdriver.Chrome(options=get_chrome_options()) - driver.set_window_size(random.randint(1200, 1800), random.randint(600, 1000)) - logging.info("Loading Amazon webpage") - driver.get(referurl) - driver.save_screenshot('/app/data/last_screenshot.png') - logging.info("Scraping information and closing browser") - soup = BeautifulSoup(driver.page_source, "lxml") - etree_soup = BeautifulSoup(driver.page_source, "html.parser") - # DEBUG - #logging.info(soup) + if amazon_url_with_referer: + with create_selenium_driver(get_chrome_options()) as chrome_driver: + logging.info("Loading page for scraping information") + page_html = load_page(url=amazon_url_with_referer, driver=chrome_driver) + product_data = scrape_data(page_html) + + if not product_data: + logging.info(f"Title not found, not a valid product or captcha") - logging.info("Getting title...") - title, price, image = parser.get_title(soup), parser.get_price(soup), parser.get_image(soup, etree_soup) - if title == "": - logging.info(f"Title not found, not a valid product or captcha") - captcha = AmazonCaptcha.fromdriver(driver) - solution = captcha.solve() - if solution == 'Not solved': - logging.info("Couldn't solve the captcha, if there was any") - else: - logging.info(f"Captcha solution is {solution}, redirecting") - logging.info("Waiting for 5 seconds, humans are not that fast :)") - time.sleep(5) - fill_captcha_element = driver.find_element(By.ID, 'captchacharacters') - fill_captcha_element.send_keys(solution) - fill_captcha_element.send_keys(Keys.RETURN) - logging.info("Re-loading Amazon webpage") - driver.get(msg) - logging.info("Scraping information") - soup = BeautifulSoup(driver.page_source, "lxml") - etree_soup = BeautifulSoup(driver.page_source, "html.parser") - logging.info("Getting title...") - title, price, image = parser.get_title(soup), parser.get_price(soup), parser.get_image(soup, etree_soup) - if title == "": - logging.info(f"Title not found, not a valid product or failed captcha") + captcha_solution = resolve_captcha(chrome_driver) + + if captcha_solution: + apply_captcha(captcha_solution, chrome_driver) + + logging.info("Re-loading Amazon webpage") + page_html = load_page(url=amazon_url_with_referer, driver=chrome_driver, screenshot_type="_aftercaptcha") + product_data = scrape_data(page_html) + else: + logging.info("Couldn't solve the captcha, if there was any") + + if not product_data: #if after applying the captcha we don't have any data yet, stop the execution and reply to the user + logging.info("Unable to get the product information") + context.bot.send_message(chat_id=chat_id, text="Unable to get product attributes from the provided url", reply_to_message_id=message_id) return - logging.info(f"Title found: {title}") + + logging.info(f"Product information found: {product_data}") - logging.info("Closing browser") - driver.close() - - context.bot.deleteMessage(chat_id=chat['id'], message_id=message_id) - product_id = dbhelper.check_product(referurl, price) + context.bot.deleteMessage(chat_id=chat_id, message_id=message_id) + product_id = dbhelper.check_product(amazon_url_with_referer, product_data.price) if not product_id: - product_id = dbhelper.add_product(user['username'], chat['title'], user['id'], chat['id'], msg, referurl, title, price, image) - helpers.create_image(product_id, price) - keyboard = [[InlineKeyboardButton("Ir a Amazon", url=f"{referurl}")]] + product_id = dbhelper.add_product(user['username'], chat['title'], user['id'], chat_id, msg, amazon_url_with_referer, product_data.title, product_data.price, product_data.image) + helpers.create_image(product_id, product_data.price) + keyboard = [[InlineKeyboardButton("Ir a Amazon", url=f"{amazon_url_with_referer}")]] markup = InlineKeyboardMarkup(keyboard) - context.bot.send_photo(chat_id=update.message.chat_id, photo=open(f"/app/data/images/products/{product_id}_composed.png", 'rb'), caption=f"URL enviada por @{user['username']}: \n\n{title}", reply_markup=markup) + context.bot.send_photo(chat_id=chat_id, photo=open(f"/app/data/images/products/{product_id}_composed.png", 'rb'), caption=f"URL enviada por @{user['username']}: \n\n{product_data.title}", reply_markup=markup) + def main() -> None: dbhelper.setup_db() @@ -114,5 +148,6 @@ def main() -> None: updater.start_polling() updater.idle() + if __name__ == '__main__': main() diff --git a/bot/parser.py b/bot/product_parser.py similarity index 100% rename from bot/parser.py rename to bot/product_parser.py