Merge branch 'refactor-bot-class-for-doing-it-more-readable' into 'main'

Refactor bot class in order to make it more readable

See merge request jocaru/amazon-telegram!1
This commit is contained in:
2023-09-22 10:43:10 +00:00
4 changed files with 114 additions and 67 deletions

3
.gitignore vendored
View File

@@ -1,3 +1,6 @@
.env* .env*
/data/amazon.db /data/amazon.db
/data/images/products /data/images/products
/data/images/last_iteration/*.png
venv
.DS_Store

9
bot/amazon_product.py Normal file
View File

@@ -0,0 +1,9 @@
class AmazonProduct:
    """Plain holder for the title, price and image of one product."""

    def __init__(self, title, price, image):
        # Values are stored exactly as given; nothing is validated or converted.
        self.title = title
        self.price = price
        self.image = image

    def __str__(self):
        """Return the ``Title=..., price=..., image=...`` form used in log lines."""
        return f"Title={self.title}, price={self.price}, image={self.image}"

    def __repr__(self):
        """Return an unambiguous, constructor-like form for debugging."""
        return (
            f"{type(self).__name__}(title={self.title!r}, "
            f"price={self.price!r}, image={self.image!r})"
        )

View File

@@ -4,7 +4,7 @@ import random
import dbhelper import dbhelper
import constants import constants
import helpers import helpers
import parser import product_parser
import time import time
from amazoncaptcha import AmazonCaptcha from amazoncaptcha import AmazonCaptcha
@@ -15,6 +15,8 @@ from selenium.webdriver.chrome.options import Options
from selenium import webdriver from selenium import webdriver
from telegram import Update, ForceReply, InlineKeyboardButton, InlineKeyboardMarkup from telegram import Update, ForceReply, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Updater, CommandHandler, MessageHandler, Filters, CallbackContext from telegram.ext import Updater, CommandHandler, MessageHandler, Filters, CallbackContext
from amazon_product import AmazonProduct
def get_chrome_options(): def get_chrome_options():
chrome_options = Options() chrome_options = Options()
@@ -22,88 +24,120 @@ def get_chrome_options():
chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage") chrome_options.add_argument("--disable-dev-shm-usage")
chrome_prefs = {} chrome_prefs = {}
chrome_options.experimental_options["prefs"] = chrome_prefs
chrome_prefs["profile.default_content_settings"] = {"images": 2} chrome_prefs["profile.default_content_settings"] = {"images": 2}
chrome_options.experimental_options["prefs"] = chrome_prefs
return chrome_options return chrome_options
def find_amazon_link(update: Update, context: CallbackContext) -> None:
def create_selenium_driver(options: Options):
    """Start a Chrome browser session and give its window a random size.

    Args:
        options: Chrome options passed straight to ``webdriver.Chrome``.

    Returns:
        The newly created Chrome driver.
    """
    logging.info("Setting up new Chrome Browser")
    browser = webdriver.Chrome(options=options)
    # Width is drawn first, then height, each from its own fixed range.
    width = random.randint(1200, 1800)
    height = random.randint(600, 1000)
    browser.set_window_size(width, height)
    return browser
def load_page(url: str, driver: "webdriver.Chrome", screenshot_type="",
              screenshot_dir="/app/data/images/last_iteration"):
    """Open ``url`` in the browser, save a screenshot and return the page HTML.

    Args:
        url: Address to navigate to.
        driver: Browser session used for the navigation.
        screenshot_type: Suffix appended to the screenshot file name, so that
            different stages of one run do not overwrite each other.
        screenshot_dir: Directory the screenshot is written to. Defaults to
            the path this function has always used.

    Returns:
        The value of ``driver.page_source`` after the navigation.
    """
    logging.info("Loading webpage %s", url)
    driver.get(url)
    screenshot_path = f"{screenshot_dir}/last_screenshot{screenshot_type}.png"
    # The screenshot is a debugging aid only: a failed write is reported but
    # must not stop the page source from being returned.
    if driver.save_screenshot(screenshot_path) is False:
        logging.warning("Could not save screenshot to %s", screenshot_path)
    return driver.page_source
def extract_amazon_url_with_referer(text: str):
    """Build the referral URL for the Amazon product mentioned in ``text``.

    Short links (``amzn.to`` / ``amzn.eu``) are expanded first, then the
    product code and the optional ``&m=`` vendor parameter are pulled out of
    the resulting URL and handed to ``helpers.new_refer_url``.

    Args:
        text: Raw message text that may contain an Amazon link.

    Returns:
        Whatever ``helpers.new_refer_url`` returns for the extracted product
        code and extra parameters. Both are empty strings when nothing
        matched.
    """
    # NOTE(review): the helper is still called when no product code was
    # found; confirm what new_refer_url("", "") returns, since the caller
    # tests the result for truthiness.

    # Expand short links one kind after the other; each expansion replaces
    # `text`, so the second lookup runs on the already expanded value.
    for short_domain in ("amzn.to", "amzn.eu"):
        start = text.find(short_domain)
        if start != -1:
            text = helpers.unshort_url(text[start:].split()[0])

    # Keep only the URL itself (up to the first space) and force https.
    start = text.find(constants.searchURL)
    if start != -1:
        text = f"https://{text[start:].split(' ')[0]}"

    product_match = re.search(r'(?:dp\/[\w]*)|(?:gp\/product\/[\w]*)', text)  # product code
    vendor_match = re.search(r'(?:&m=[\w]*)', text)  # vendor, kept as-is

    product_code = product_match.group(0) if product_match is not None else ""
    extra_params = vendor_match.group(0) if vendor_match is not None else ""
    return helpers.new_refer_url(product_code, extra_params)
def resolve_captcha(driver: "webdriver.Chrome") -> "str | None":
    """Try to solve the captcha shown on the driver's current page.

    Args:
        driver: Browser session whose current page is handed to
            ``AmazonCaptcha.fromdriver``.

    Returns:
        The solution text, or ``None`` when the solver reports
        ``'Not solved'``.
    """
    logging.info("Trying to resolve captcha")
    solution = AmazonCaptcha.fromdriver(driver).solve()
    # 'Not solved' is the solver's failure marker; map it to None so callers
    # can use a plain truthiness check.
    if solution == 'Not solved':
        return None
    return solution
def apply_captcha(captcha_solution: str, driver: "webdriver.Chrome", delay_seconds=5):
    """Type the captcha solution into the page and submit it.

    Args:
        captcha_solution: Text to enter into the captcha field.
        driver: Browser session currently showing the captcha page.
        delay_seconds: Pause before typing. Defaults to the 5 seconds this
            function has always waited.
    """
    logging.info(f"Captcha solution is {captcha_solution}, redirecting")
    # Pause first so the form is not submitted instantly.
    logging.info(f"Waiting for {delay_seconds} seconds, humans are not that fast :)")
    time.sleep(delay_seconds)
    captcha_input = driver.find_element(By.ID, 'captchacharacters')
    captcha_input.send_keys(captcha_solution)
    # RETURN submits the form.
    captcha_input.send_keys(Keys.RETURN)
def scrape_data(page_html: str) -> "AmazonProduct | None":
    """Extract title, price and image from a product page's HTML.

    Args:
        page_html: Full HTML source of the loaded page.

    Returns:
        An ``AmazonProduct`` with the parsed values, or ``None`` when the
        parsed title is the empty string.
    """
    logging.info("Scraping information")
    # The page is parsed twice because get_image takes both an lxml-built and
    # an html.parser-built tree.
    soup = BeautifulSoup(page_html, "lxml")
    etree_soup = BeautifulSoup(page_html, "html.parser")

    title = product_parser.get_title(soup)
    price = product_parser.get_price(soup)
    image = product_parser.get_image(soup, etree_soup)

    # An empty title is the only signal used here for "no product on page".
    if title == "":
        return None
    return AmazonProduct(title=title, price=price, image=image)
def find_amazon_link(update: Update, context: CallbackContext) -> None:
# Telegram message handler: reads the message text, derives the referral URL,
# scrapes the product page through Selenium (retrying once after a captcha
# attempt), deletes the user's message and posts a product photo with an
# "Ir a Amazon" button instead.
# NOTE(review): this span is a rendered side-by-side diff. Removed (old) and
# added (new) lines are interleaved, unchanged lines appear twice on one line,
# and indentation is lost, so the real nesting must be confirmed in the repo.
try: try:
msg = update.message.text msg = update.message.text
except AttributeError: except AttributeError:
logging.info(f"Received message has no text") logging.info(f"Received message has no text")
return return
# Old (removed) inline URL extraction; the same steps now live in
# extract_amazon_url_with_referer, operating on `text` instead of `msg`.
start = msg.find("amzn.to")
if start != -1:
msg = helpers.unshort_url(msg[start:].split()[0])
start = msg.find("amzn.eu")
if start != -1:
msg = helpers.unshort_url(msg[start:].split()[0])
start = msg.find(constants.searchURL)
if start != -1:
msg = f"https://{msg[start:].split(' ')[0]}"
m = re.search(r'(?:dp\/[\w]*)|(?:gp\/product\/[\w]*)',msg) # Gets product code in amazon
m_e = re.search(r'(?:&m=[\w]*)',msg) # Gets vendor and keeps it
if m != None:
productCode = m.group(0)
if m_e != None:
extraparams = m_e.group(0)
user, chat, message_id = update.message.from_user, update.message.chat, update.message.message_id user, chat, chat_id, message_id = update.message.from_user, update.message.chat, update.message.chat_id, update.message.message_id
logging.info(f"Link sent by {user} - {msg}") logging.info(f"Link sent by {user} - {msg}")
referurl = helpers.new_refer_url(productCode, extraparams) amazon_url_with_referer = extract_amazon_url_with_referer(msg)
# Old (removed) inline browser setup, page load and parsing; the new side
# calls create_selenium_driver, load_page and scrape_data instead.
logging.info("Setting up new Chrome Browser")
driver = webdriver.Chrome(options=get_chrome_options())
driver.set_window_size(random.randint(1200, 1800), random.randint(600, 1000))
logging.info("Loading Amazon webpage")
driver.get(referurl)
driver.save_screenshot('/app/data/last_screenshot.png')
logging.info("Scraping information and closing browser")
soup = BeautifulSoup(driver.page_source, "lxml")
etree_soup = BeautifulSoup(driver.page_source, "html.parser")
# DEBUG
#logging.info(soup)
logging.info("Getting title...") if amazon_url_with_referer:
title, price, image = parser.get_title(soup), parser.get_price(soup), parser.get_image(soup, etree_soup) with create_selenium_driver(get_chrome_options()) as chrome_driver:
if title == "": logging.info("Loading page for scraping information")
page_html = load_page(url=amazon_url_with_referer, driver=chrome_driver)
product_data = scrape_data(page_html)
# New side: scrape_data returning None triggers one captcha attempt and,
# if a solution was found, a second load with the "_aftercaptcha" screenshot.
if not product_data:
logging.info(f"Title not found, not a valid product or captcha") logging.info(f"Title not found, not a valid product or captcha")
captcha = AmazonCaptcha.fromdriver(driver)
solution = captcha.solve() captcha_solution = resolve_captcha(chrome_driver)
if solution == 'Not solved':
logging.info("Couldn't solve the captcha, if there was any") if captcha_solution:
else: apply_captcha(captcha_solution, chrome_driver)
logging.info(f"Captcha solution is {solution}, redirecting")
logging.info("Waiting for 5 seconds, humans are not that fast :)")
time.sleep(5)
fill_captcha_element = driver.find_element(By.ID, 'captchacharacters')
fill_captcha_element.send_keys(solution)
fill_captcha_element.send_keys(Keys.RETURN)
logging.info("Re-loading Amazon webpage") logging.info("Re-loading Amazon webpage")
driver.get(msg) page_html = load_page(url=amazon_url_with_referer, driver=chrome_driver, screenshot_type="_aftercaptcha")
logging.info("Scraping information") product_data = scrape_data(page_html)
soup = BeautifulSoup(driver.page_source, "lxml") else:
etree_soup = BeautifulSoup(driver.page_source, "html.parser") logging.info("Couldn't solve the captcha, if there was any")
logging.info("Getting title...")
title, price, image = parser.get_title(soup), parser.get_price(soup), parser.get_image(soup, etree_soup) if not product_data: #if after applying the captcha we don't have any data yet, stop the execution and reply to the user
if title == "": logging.info("Unable to get the product information")
logging.info(f"Title not found, not a valid product or failed captcha") context.bot.send_message(chat_id=chat_id, text="Unable to get product attributes from the provided url", reply_to_message_id=message_id)
return return
logging.info(f"Title found: {title}")
logging.info("Closing browser") logging.info(f"Product information found: {product_data}")
driver.close()
# The user's original message is deleted before the bot posts its own.
context.bot.deleteMessage(chat_id=chat['id'], message_id=message_id) context.bot.deleteMessage(chat_id=chat_id, message_id=message_id)
product_id = dbhelper.check_product(referurl, price) product_id = dbhelper.check_product(amazon_url_with_referer, product_data.price)
if not product_id: if not product_id:
product_id = dbhelper.add_product(user['username'], chat['title'], user['id'], chat['id'], msg, referurl, title, price, image) product_id = dbhelper.add_product(user['username'], chat['title'], user['id'], chat_id, msg, amazon_url_with_referer, product_data.title, product_data.price, product_data.image)
helpers.create_image(product_id, price) helpers.create_image(product_id, product_data.price)
keyboard = [[InlineKeyboardButton("Ir a Amazon", url=f"{referurl}")]] keyboard = [[InlineKeyboardButton("Ir a Amazon", url=f"{amazon_url_with_referer}")]]
markup = InlineKeyboardMarkup(keyboard) markup = InlineKeyboardMarkup(keyboard)
# NOTE(review): the file object passed as photo= is opened inline and never
# explicitly closed in this function.
context.bot.send_photo(chat_id=update.message.chat_id, photo=open(f"/app/data/images/products/{product_id}_composed.png", 'rb'), caption=f"URL enviada por @{user['username']}: \n\n{title}", reply_markup=markup) context.bot.send_photo(chat_id=chat_id, photo=open(f"/app/data/images/products/{product_id}_composed.png", 'rb'), caption=f"URL enviada por @{user['username']}: \n\n{product_data.title}", reply_markup=markup)
def main() -> None: def main() -> None:
dbhelper.setup_db() dbhelper.setup_db()
@@ -114,5 +148,6 @@ def main() -> None:
updater.start_polling() updater.start_polling()
updater.idle() updater.idle()
if __name__ == '__main__': if __name__ == '__main__':
main() main()