Remove duplicated code by extracting the repeated logic into functions, and add user feedback for invalid URLs

This commit is contained in:
Alejandro Perez Lopez
2023-09-09 19:23:56 +02:00
parent bf1b9309e1
commit aec433fa9c
2 changed files with 89 additions and 66 deletions

View File

@@ -4,7 +4,7 @@ import random
import dbhelper import dbhelper
import constants import constants
import helpers import helpers
import parser import product_parser
import time import time
from amazoncaptcha import AmazonCaptcha from amazoncaptcha import AmazonCaptcha
@@ -15,6 +15,8 @@ from selenium.webdriver.chrome.options import Options
from selenium import webdriver from selenium import webdriver
from telegram import Update, ForceReply, InlineKeyboardButton, InlineKeyboardMarkup from telegram import Update, ForceReply, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Updater, CommandHandler, MessageHandler, Filters, CallbackContext from telegram.ext import Updater, CommandHandler, MessageHandler, Filters, CallbackContext
from amazon_product import AmazonProduct
def get_chrome_options(): def get_chrome_options():
chrome_options = Options() chrome_options = Options()
@@ -26,75 +28,24 @@ def get_chrome_options():
chrome_options.experimental_options["prefs"] = chrome_prefs chrome_options.experimental_options["prefs"] = chrome_prefs
return chrome_options return chrome_options
def find_amazon_link(update: Update, context: CallbackContext) -> None:
productCode = ""
extraparams = ""
try:
msg = update.message.text
except AttributeError:
logging.info(f"Received message has no text")
return
user, chat, message_id = update.message.from_user, update.message.chat, update.message.message_id def create_selenium_driver(options: Options):
logging.info(f"Link sent by {user} - {msg}") logging.info("Setting up new Chrome Browser")
driver = webdriver.Chrome(options=options)
driver.set_window_size(random.randint(1200, 1800), random.randint(600, 1000))
return driver
amazon_url_with_referer = extract_amazon_url_with_referer(msg)
if amazon_url_with_referer: def load_page(url: str, driver: webdriver, screenshot_type=""):
logging.info("Setting up new Chrome Browser") logging.info(f"Loading webpage {url}")
driver = webdriver.Chrome(options=get_chrome_options()) driver.get(url)
driver.set_window_size(random.randint(1200, 1800), random.randint(600, 1000)) driver.save_screenshot(f"/app/data/images/last_iteration/last_screenshot{screenshot_type}.png")
logging.info("Loading Amazon webpage") return driver.page_source
driver.get(amazon_url_with_referer)
driver.save_screenshot('/app/data/last_screenshot.png')
logging.info("Scraping information and closing browser")
soup = BeautifulSoup(driver.page_source, "lxml")
etree_soup = BeautifulSoup(driver.page_source, "html.parser")
# DEBUG
#logging.info(soup)
logging.info("Getting title...")
title, price, image = parser.get_title(soup), parser.get_price(soup), parser.get_image(soup, etree_soup)
if title == "":
logging.info(f"Title not found, not a valid product or captcha")
captcha = AmazonCaptcha.fromdriver(driver)
solution = captcha.solve()
if solution == 'Not solved':
logging.info("Couldn't solve the captcha, if there was any")
else:
logging.info(f"Captcha solution is {solution}, redirecting")
logging.info("Waiting for 5 seconds, humans are not that fast :)")
time.sleep(5)
fill_captcha_element = driver.find_element(By.ID, 'captchacharacters')
fill_captcha_element.send_keys(solution)
fill_captcha_element.send_keys(Keys.RETURN)
logging.info("Re-loading Amazon webpage")
driver.get(amazon_url_with_referer)
logging.info("Scraping information")
soup = BeautifulSoup(driver.page_source, "lxml")
etree_soup = BeautifulSoup(driver.page_source, "html.parser")
logging.info("Getting title...")
title, price, image = parser.get_title(soup), parser.get_price(soup), parser.get_image(soup, etree_soup)
if title == "":
logging.info(f"Title not found, not a valid product or failed captcha")
return
logging.info(f"Title found: {title}")
logging.info("Closing browser")
driver.close()
context.bot.deleteMessage(chat_id=chat['id'], message_id=message_id)
product_id = dbhelper.check_product(amazon_url_with_referer, price)
if not product_id:
product_id = dbhelper.add_product(user['username'], chat['title'], user['id'], chat['id'], msg, amazon_url_with_referer, title, price, image)
helpers.create_image(product_id, price)
keyboard = [[InlineKeyboardButton("Ir a Amazon", url=f"{amazon_url_with_referer}")]]
markup = InlineKeyboardMarkup(keyboard)
context.bot.send_photo(chat_id=update.message.chat_id, photo=open(f"/app/data/images/products/{product_id}_composed.png", 'rb'), caption=f"URL enviada por @{user['username']}: \n\n{title}", reply_markup=markup)
def extract_amazon_url_with_referer(text: str): def extract_amazon_url_with_referer(text: str):
productCode = ""
extraparams = ""
start = text.find("amzn.to") start = text.find("amzn.to")
if start != -1: if start != -1:
@@ -111,10 +62,81 @@ def extract_amazon_url_with_referer(text: str):
productCode = m.group(0) productCode = m.group(0)
if m_e != None: if m_e != None:
extraparams = m_e.group(0) extraparams = m_e.group(0)
return helpers.new_refer_url(productCode, extraparams) return helpers.new_refer_url(productCode, extraparams)
return None
def resolve_captcha(driver: webdriver):
    """Try to solve the captcha shown on the driver's current page.

    Args:
        driver: Selenium driver whose current page is handed to
            ``AmazonCaptcha.fromdriver``.

    Returns:
        The solution string, or ``None`` when the solver reports
        ``'Not solved'``.
    """
    logging.info("Trying to resolve captcha")
    answer = AmazonCaptcha.fromdriver(driver).solve()
    if answer == 'Not solved':
        return None
    return answer
def apply_captcha(captcha_solution: str, driver: webdriver):
    """Type a captcha solution into the page and submit it.

    Args:
        captcha_solution: Text to enter into the captcha input.
        driver: Selenium driver currently showing the captcha form.
    """
    logging.info(f"Captcha solution is {captcha_solution}, redirecting")
    logging.info("Waiting for 5 seconds, humans are not that fast :)")
    # Deliberate pause before typing so the submission is not instantaneous.
    time.sleep(5)
    captcha_input = driver.find_element(By.ID, 'captchacharacters')
    captcha_input.send_keys(captcha_solution)
    # RETURN submits the form the input belongs to.
    captcha_input.send_keys(Keys.RETURN)
def scrape_data(page_html: str) -> AmazonProduct:
    """Extract title, price and image from a product page.

    Args:
        page_html: Raw HTML of the loaded page.

    Returns:
        An ``AmazonProduct`` built from the parsed values, or ``None`` when
        the parsed title is the empty string.
    """
    logging.info("Scraping information")
    # The same markup is parsed with two different parsers because
    # ``get_image`` takes both trees.
    lxml_tree = BeautifulSoup(page_html, "lxml")
    builtin_tree = BeautifulSoup(page_html, "html.parser")

    title = product_parser.get_title(lxml_tree)
    price = product_parser.get_price(lxml_tree)
    image = product_parser.get_image(lxml_tree, builtin_tree)

    if title == "":
        return None
    return AmazonProduct(title=title, price=price, image=image)
def find_amazon_link(update: Update, context: CallbackContext) -> None:
    """Handle a chat message that may contain an Amazon link.

    When a link is found, the product page is loaded in a fresh browser,
    scraped (retrying once after solving a captcha if the first scrape
    yields nothing), stored if it is not already known, and re-posted to the
    chat as a photo with a button pointing at the referral URL. The original
    message is deleted on success. When no product data can be obtained the
    sender gets a reply saying so.

    Args:
        update: Incoming Telegram update.
        context: Callback context giving access to the bot.
    """
    try:
        msg = update.message.text
    except AttributeError:
        logging.info(f"Received message has no text")
        return
    if not msg:
        # The URL extraction below calls string methods on the text, so a
        # missing or empty text must stop here.
        logging.info("Received message has no text")
        return

    message = update.message
    user, chat = message.from_user, message.chat
    chat_id, message_id = message.chat_id, message.message_id
    logging.info(f"Link sent by {user} - {msg}")

    amazon_url_with_referer = extract_amazon_url_with_referer(msg)
    if not amazon_url_with_referer:
        return

    # The driver is used as a context manager so the browser is released on
    # every exit path, including exceptions raised while scraping.
    with create_selenium_driver(get_chrome_options()) as chrome_driver:
        logging.info("Loading page for scraping information")
        page_html = load_page(url=amazon_url_with_referer, driver=chrome_driver)
        product_data = scrape_data(page_html)
        if not product_data:
            logging.info(f"Title not found, not a valid product or captcha")
            captcha_solution = resolve_captcha(chrome_driver)
            if captcha_solution:
                apply_captcha(captcha_solution, chrome_driver)
                logging.info("Re-loading Amazon webpage")
                page_html = load_page(
                    url=amazon_url_with_referer,
                    driver=chrome_driver,
                    screenshot_type="_aftercaptcha",
                )
                product_data = scrape_data(page_html)
            else:
                logging.info("Couldn't solve the captcha, if there was any")

    if not product_data:
        # Reached both when the retry after the captcha failed and when the
        # captcha could not be solved at all; without this guard the code
        # below would dereference None.
        logging.info("Unable to get the product information")
        context.bot.send_message(
            chat_id=chat_id,
            text="Unable to get product attributes from the provided url",
            reply_to_message_id=message_id,
        )
        return

    logging.info(f"Product information found: {product_data}")
    context.bot.deleteMessage(chat_id=chat_id, message_id=message_id)

    product_id = dbhelper.check_product(amazon_url_with_referer, product_data.price)
    if not product_id:
        product_id = dbhelper.add_product(
            user['username'], chat['title'], user['id'], chat_id, msg,
            amazon_url_with_referer, product_data.title, product_data.price,
            product_data.image,
        )
    # NOTE(review): the image is generated unconditionally so the file opened
    # below exists - confirm this should not be limited to new products.
    helpers.create_image(product_id, product_data.price)

    keyboard = [[InlineKeyboardButton("Ir a Amazon", url=f"{amazon_url_with_referer}")]]
    markup = InlineKeyboardMarkup(keyboard)
    # Open the composed image in a with-block so the handle is closed once
    # the upload call returns.
    with open(f"/app/data/images/products/{product_id}_composed.png", 'rb') as photo:
        context.bot.send_photo(
            chat_id=chat_id,
            photo=photo,
            caption=f"URL enviada por @{user['username']}: \n\n{product_data.title}",
            reply_markup=markup,
        )
def main() -> None: def main() -> None:
@@ -126,5 +148,6 @@ def main() -> None:
updater.start_polling() updater.start_polling()
updater.idle() updater.idle()
if __name__ == '__main__': if __name__ == '__main__':
main() main()