"""Telegram bot that reposts Amazon product links using a referer URL.

Every non-command text message is inspected for an Amazon link. When one is
found, the product page is loaded in a headless Chrome browser, the title,
price and image are scraped, the product is stored through ``dbhelper`` and
the original message is replaced by a photo message carrying the referer URL.
"""
import logging
import random
import re
import time
from typing import Optional, Tuple

from amazoncaptcha import AmazonCaptcha
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import ApplicationBuilder, MessageHandler, filters, CallbackContext

import constants
import dbhelper
import helpers
import product_parser
from amazon_product import AmazonProduct

# Any http(s) or www link; used to strip URLs out of the user's message.
_URL_PATTERN = re.compile(r'https?://\S+|www\.\S+')
# Product code path segment of an Amazon URL ("dp/<code>" or "gp/product/<code>").
_PRODUCT_CODE_PATTERN = re.compile(r'(?:dp\/[\w]*)|(?:gp\/product\/[\w]*)')
# "m" and "smid" query parameters (vendor / variant), kept in the final URL.
_SELLER_PARAMS_PATTERN = re.compile(r'(?:[&?]m=[\w]*)|(?:[&?]smid=[\w]*)')


def get_chrome_options():
    """Return the Chrome options used for every scraping session."""
    chrome_options = Options()
    for argument in ("--headless", "--no-sandbox", "--disable-dev-shm-usage"):
        chrome_options.add_argument(argument)
    # Content-setting value 2 for images: intended to stop Chrome from
    # downloading images while scraping.
    chrome_options.add_experimental_option(
        "prefs", {"profile.default_content_settings": {"images": 2}}
    )
    return chrome_options


def create_selenium_driver(options: Options):
    """Start a Chrome driver with *options* and a randomly sized window."""
    logging.info("Setting up new Chrome Browser")
    driver = webdriver.Chrome(options=options)
    driver.set_window_size(random.randint(1200, 1800), random.randint(600, 1000))
    return driver


def load_page(url: str, driver: webdriver.Chrome, screenshot_type=""):
    """Open *url* in *driver*, save a screenshot and return the page source.

    ``screenshot_type`` is appended to the screenshot file name so that
    several screenshots of the same iteration do not overwrite each other.
    """
    logging.info("Loading webpage %s", url)
    driver.get(url)
    driver.save_screenshot(f"/app/data/images/last_iteration/last_screenshot{screenshot_type}.png")
    return driver.page_source


def extract_amazon_url_with_referer(text: str) -> Tuple[Optional[str], str]:
    """Find an Amazon product link in *text* and build its referer URL.

    Short links (``amzn.to`` / ``amzn.eu``) are expanded through
    ``helpers.unshort_url`` first. A product code is only searched for once
    the configured Amazon domain (``constants.searchURL``) has been found.

    Returns:
        ``(referer_url, message_without_urls)`` when a product code is found,
        otherwise ``(None, text)`` with the text exactly as it was received.
    """
    received_text = text
    message_without_urls = _URL_PATTERN.sub('', text)

    # Expand short links; the expanded URL replaces the text being searched.
    for short_domain in ("amzn.to", "amzn.eu"):
        start = text.find(short_domain)
        if start != -1:
            text = helpers.unshort_url(text[start:].split()[0])

    start = text.find(constants.searchURL)
    if start == -1:
        return None, received_text

    # Keep only the link itself (up to the first space) and force https.
    text = f"https://{text[start:].split(' ')[0]}"
    product_code_match = _PRODUCT_CODE_PATTERN.search(text)
    if not product_code_match:
        return None, received_text

    # The first parameter may start with '?'; it is appended after other
    # parameters in the referer URL, so every separator becomes '&'.
    extra_params = ''.join(_SELLER_PARAMS_PATTERN.findall(text)).replace('?', '&')
    referer_url = helpers.new_refer_url(product_code_match.group(0), extra_params)
    return referer_url, message_without_urls


def resolve_captcha(driver: webdriver.Chrome):
    """Try to solve the captcha shown in *driver*; return None if unsolved."""
    logging.info("Trying to resolve captcha")
    solution = AmazonCaptcha.fromdriver(driver).solve()
    if solution == 'Not solved':
        return None
    return solution


def apply_captcha(captcha_solution: str, driver: webdriver.Chrome):
    """Type *captcha_solution* into the captcha field and submit it."""
    logging.info("Captcha solution is %s, redirecting", captcha_solution)
    logging.info("Waiting for 5 seconds, humans are not that fast :)")
    time.sleep(5)
    captcha_field = driver.find_element(By.ID, 'captchacharacters')
    captcha_field.send_keys(captcha_solution)
    captcha_field.send_keys(Keys.RETURN)


def scrape_data(page_html: str) -> Optional[AmazonProduct]:
    """Parse *page_html* into an AmazonProduct, or None if no title is found."""
    logging.info("Scraping information")
    soup = BeautifulSoup(page_html, "lxml")
    etree_soup = BeautifulSoup(page_html, "html.parser")
    title = product_parser.get_title(soup)
    price = product_parser.get_price(soup)
    image = product_parser.get_image(soup, etree_soup)
    if title == "":
        return None
    return AmazonProduct(title=title, price=price, image=image)


def _fetch_product_data(url: str) -> Optional[AmazonProduct]:
    """Load *url* in a fresh browser and scrape it, retrying once after a captcha."""
    # The driver is used as a context manager so the browser is shut down
    # as soon as scraping finishes, whatever the outcome.
    with create_selenium_driver(get_chrome_options()) as chrome_driver:
        logging.info("Loading page for scraping information")
        product_data = scrape_data(load_page(url=url, driver=chrome_driver))
        if product_data:
            return product_data

        logging.info("Title not found, not a valid product or captcha")
        captcha_solution = resolve_captcha(chrome_driver)
        if not captcha_solution:
            logging.info("Couldn't solve the captcha, if there was any")
            return None

        apply_captcha(captcha_solution, chrome_driver)
        logging.info("Re-loading Amazon webpage")
        page_html = load_page(url=url, driver=chrome_driver, screenshot_type="_aftercaptcha")
        return scrape_data(page_html)


async def find_amazon_link(update: Update, context: CallbackContext) -> None:
    """Handle a text message: replace an Amazon link by a product card.

    When the message contains a valid Amazon product link, the product is
    scraped, the user's message is deleted and a photo with the product
    title, the remaining message text and a button to the referer URL is
    sent to the same chat (and topic thread, if any). When the product
    cannot be scraped, the user gets a reply saying so.

    Note: the Selenium calls and the fixed wait in ``apply_captcha`` are
    synchronous and run inside this coroutine.
    """
    message = update.message
    if message is None or message.text is None:
        logging.info("Received message has no text")
        return

    msg = message.text
    user, chat = message.from_user, message.chat
    chat_id, message_id = message.chat_id, message.message_id
    thread_id = message.message_thread_id if message.is_topic_message else None

    amazon_url_with_referer, original_message = extract_amazon_url_with_referer(msg)
    if not amazon_url_with_referer:
        return
    original_message = f"\n\nMensaje original: {original_message}" if original_message != '' else ''

    logging.info("Link sent by %s - %s", user, msg)
    logging.info("Info from message: chat_id: %s, message_id: %s, thread_id: %s",
                 chat_id, message_id, thread_id)

    product_data = _fetch_product_data(amazon_url_with_referer)
    if not product_data:
        # Still no data after the captcha attempt: tell the user and stop.
        logging.info("Unable to get the product information")
        await context.bot.send_message(chat_id=chat_id,
                                       text="Unable to get product attributes from the provided url",
                                       reply_to_message_id=message_id,
                                       message_thread_id=thread_id)
        return

    logging.info("Product information found: %s", product_data)
    await context.bot.delete_message(chat_id=chat_id, message_id=message_id)

    product_id = dbhelper.check_product(amazon_url_with_referer, product_data.price)
    if not product_id:
        # Unknown product/price: store it and generate its composed image.
        product_id = dbhelper.add_product(user['username'], chat['title'], user['id'], chat_id, msg,
                                          amazon_url_with_referer, product_data.title,
                                          product_data.price, product_data.image)
        helpers.create_image(product_id, product_data.price)

    keyboard = [[InlineKeyboardButton("Ir a Amazon", url=f"{amazon_url_with_referer}")]]
    markup = InlineKeyboardMarkup(keyboard)
    # Open the image in a ``with`` block so the file handle is always closed.
    with open(f"/app/data/images/products/{product_id}_composed.png", 'rb') as photo:
        await context.bot.send_photo(
            chat_id=chat_id,
            photo=photo,
            caption=f"URL enviada por @{user['username']}: \n\n{product_data.title}{original_message}",
            reply_markup=markup,
            message_thread_id=thread_id)


def main() -> None:
    """Set up the database, build the Telegram application and start polling."""
    dbhelper.setup_db()
    builder = (
        ApplicationBuilder()
        .get_updates_http_version('1.1')
        .http_version('1.1')
        .token(constants.TELEGRAM_API_KEY)
    )
    if constants.telegram_proxy:
        logging.info("Creating application with socks5 proxy")
        builder = builder.proxy(constants.proxy_url).get_updates_proxy(constants.proxy_url)
    else:
        logging.info("Creating application without socks5 proxy")
    application = builder.build()
    application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, find_amazon_link))
    application.run_polling()


if __name__ == '__main__':
    main()