Files
amazon-telegram/bot/bot.py

154 lines
6.2 KiB
Python

import logging
import re
import random
import dbhelper
import constants
import helpers
import product_parser
import time
from amazoncaptcha import AmazonCaptcha
from bs4 import BeautifulSoup
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium import webdriver
from telegram import Update, ForceReply, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Updater, CommandHandler, MessageHandler, Filters, CallbackContext
from amazon_product import AmazonProduct
def get_chrome_options():
    """Build the Chrome options shared by every scraping session.

    The browser is started with the ``--headless``, ``--no-sandbox`` and
    ``--disable-dev-shm-usage`` arguments, and a ``prefs`` experimental
    option that sets ``images`` to 2 under
    ``profile.default_content_settings``.

    Returns:
        The configured selenium ``Options`` instance.
    """
    options = Options()
    for flag in ("--headless", "--no-sandbox", "--disable-dev-shm-usage"):
        options.add_argument(flag)
    # NOTE(review): the value 2 presumably blocks image loading - confirm.
    options.experimental_options["prefs"] = {
        "profile.default_content_settings": {"images": 2},
    }
    return options
def create_selenium_driver(options: Options):
    """Start a Chrome WebDriver with a randomly sized window.

    Args:
        options: Chrome options handed to the new driver.

    Returns:
        The started driver, resized to a random width in 1200-1800 and a
        random height in 600-1000.
    """
    logging.info("Setting up new Chrome Browser")
    browser = webdriver.Chrome(options=options)
    # Width is drawn before height so the random sequence stays the same.
    width = random.randint(1200, 1800)
    height = random.randint(600, 1000)
    browser.set_window_size(width, height)
    return browser
def load_page(url: str, driver: webdriver, screenshot_type=""):
    """Open ``url`` in the browser, save a screenshot and return the HTML.

    Args:
        url: Address to navigate to.
        driver: Selenium driver used for the navigation.
        screenshot_type: Suffix appended to the screenshot file name.

    Returns:
        The page source reported by the driver after loading ``url``.
    """
    logging.info(f"Loading webpage {url}")
    driver.get(url)
    # The screenshot is overwritten on each call with the same suffix.
    screenshot_path = f"/app/data/images/last_iteration/last_screenshot{screenshot_type}.png"
    driver.save_screenshot(screenshot_path)
    html = driver.page_source
    return html
def extract_amazon_url_with_referer(text: str):
    """Find an Amazon product link in ``text`` and rebuild it as a referral URL.

    Shortened links (``amzn.to`` / ``amzn.eu``) are expanded through
    ``helpers.unshort_url`` first. If the resulting text contains
    ``constants.searchURL``, the product path (``dp/<code>`` or
    ``gp/product/<code>``) and the optional ``&m=<vendor>`` parameter are
    extracted and passed to ``helpers.new_refer_url``.

    Args:
        text: Raw message text that may contain an Amazon link.

    Returns:
        The value returned by ``helpers.new_refer_url`` for the extracted
        product code and extra parameters (empty strings when a part is not
        found), or ``None`` when ``text`` contains no Amazon URL.
    """
    # Expand short links in this order: the second lookup runs on the text
    # produced by the first expansion, exactly like the two original checks.
    for short_domain in ("amzn.to", "amzn.eu"):
        start = text.find(short_domain)
        if start != -1:
            text = helpers.unshort_url(text[start:].split()[0])

    start = text.find(constants.searchURL)
    if start == -1:
        # No Amazon URL in the message: callers test the result for truthiness.
        return None

    # Keep only the URL itself (up to the first space) and force https.
    text = f"https://{text[start:].split(' ')[0]}"
    product_match = re.search(r'(?:dp\/[\w]*)|(?:gp\/product\/[\w]*)', text)  # product code in amazon
    vendor_match = re.search(r'(?:&m=[\w]*)', text)  # vendor parameter, kept as-is
    product_code = product_match.group(0) if product_match is not None else ""
    extra_params = vendor_match.group(0) if vendor_match is not None else ""
    return helpers.new_refer_url(product_code, extra_params)
def resolve_captcha(driver: webdriver):
    """Try to solve the captcha shown on the driver's current page.

    Args:
        driver: Selenium driver handed to ``AmazonCaptcha.fromdriver``.

    Returns:
        The solver's answer, or ``None`` when the solver reports
        ``'Not solved'``.
    """
    logging.info("Trying to resolve captcha")
    answer = AmazonCaptcha.fromdriver(driver).solve()
    if answer == 'Not solved':
        return None
    return answer
def apply_captcha(captcha_solution: str, driver: webdriver):
    """Type the captcha solution into the page and submit it.

    Args:
        captcha_solution: Text to enter into the captcha field.
        driver: Selenium driver whose current page holds the element with
            id ``captchacharacters``.
    """
    logging.info(f"Captcha solution is {captcha_solution}, redirecting")
    logging.info("Waiting for 5 seconds, humans are not that fast :)")
    time.sleep(5)
    captcha_input = driver.find_element(By.ID, 'captchacharacters')
    # Two separate send_keys calls: first the answer, then RETURN to submit.
    for keystrokes in (captcha_solution, Keys.RETURN):
        captcha_input.send_keys(keystrokes)
def scrape_data(page_html: str) -> AmazonProduct:
    """Parse a product page and collect its title, price and image.

    Args:
        page_html: HTML source of the page to parse.

    Returns:
        An ``AmazonProduct`` built from the parsed values, or ``None`` when
        the parsed title is an empty string.
    """
    logging.info("Scraping information")
    lxml_soup = BeautifulSoup(page_html, "lxml")
    builtin_soup = BeautifulSoup(page_html, "html.parser")
    # All three fields are extracted before the title is checked.
    title = product_parser.get_title(lxml_soup)
    price = product_parser.get_price(lxml_soup)
    image = product_parser.get_image(lxml_soup, builtin_soup)
    if title == "":
        return None
    return AmazonProduct(title=title, price=price, image=image)
def find_amazon_link(update: Update, context: CallbackContext) -> None:
    """Replace a message containing an Amazon link with a product card.

    The message text is scanned for an Amazon URL. When one is found the
    product page is scraped, the original message is deleted, the product is
    looked up (and stored if missing) through ``dbhelper``, and a photo with
    the product title and an inline button pointing at the referral URL is
    sent to the chat. If no product data can be scraped, a reply is sent to
    the original message instead and nothing is deleted.

    Args:
        update: Incoming Telegram update.
        context: Callback context whose ``bot`` is used to reply.
    """
    try:
        msg = update.message.text
    except AttributeError:
        # update.message is missing, so there is nothing to inspect.
        logging.info("Received message has no text")
        return

    message = update.message
    user, chat = message.from_user, message.chat
    chat_id, message_id = message.chat_id, message.message_id

    amazon_url_with_referer = extract_amazon_url_with_referer(msg)
    if not amazon_url_with_referer:
        return

    logging.info(f"Link sent by {user} - {msg}")
    product_data = _fetch_product_data(amazon_url_with_referer)
    if not product_data:
        # Still nothing after the captcha attempt: tell the user and stop.
        logging.info("Unable to get the product information")
        context.bot.send_message(chat_id=chat_id, text="Unable to get product attributes from the provided url", reply_to_message_id=message_id)
        return

    logging.info(f"Product information found: {product_data}")
    context.bot.deleteMessage(chat_id=chat_id, message_id=message_id)

    product_id = dbhelper.check_product(amazon_url_with_referer, product_data.price)
    if not product_id:
        product_id = dbhelper.add_product(user['username'], chat['title'], user['id'], chat_id, msg, amazon_url_with_referer, product_data.title, product_data.price, product_data.image)
        # NOTE(review): the composed image is generated only for newly stored
        # products; assumes products returned by check_product already have
        # one on disk - confirm.
        helpers.create_image(product_id, product_data.price)

    keyboard = [[InlineKeyboardButton("Ir a Amazon", url=f"{amazon_url_with_referer}")]]
    markup = InlineKeyboardMarkup(keyboard)
    # Open the image in a with-block so the file handle is closed once the
    # photo has been sent instead of leaking one handle per processed link.
    with open(f"/app/data/images/products/{product_id}_composed.png", 'rb') as photo:
        context.bot.send_photo(chat_id=chat_id, photo=photo, caption=f"URL enviada por @{user['username']}: \n\n{product_data.title}", reply_markup=markup)


def _fetch_product_data(url: str):
    """Scrape the product behind ``url``, retrying once after a solved captcha.

    The browser lives only for the duration of this call, so it is released
    before any database or Telegram work happens.

    Returns:
        The scraped product, or ``None`` when no product data was found.
    """
    with create_selenium_driver(get_chrome_options()) as chrome_driver:
        logging.info("Loading page for scraping information")
        page_html = load_page(url=url, driver=chrome_driver)
        product_data = scrape_data(page_html)
        if product_data:
            return product_data

        logging.info("Title not found, not a valid product or captcha")
        captcha_solution = resolve_captcha(chrome_driver)
        if not captcha_solution:
            logging.info("Couldn't solve the captcha, if there was any")
            return None

        apply_captcha(captcha_solution, chrome_driver)
        logging.info("Re-loading Amazon webpage")
        page_html = load_page(url=url, driver=chrome_driver, screenshot_type="_aftercaptcha")
        return scrape_data(page_html)
def main() -> None:
    """Set up the database, register the link handler and start the bot.

    Polling is started and the call then waits in ``updater.idle()``.
    """
    dbhelper.setup_db()
    updater = Updater(constants.TELEGRAM_API_KEY)
    # Only text messages that are not commands are scanned for links.
    plain_text = Filters.text & ~Filters.command
    updater.dispatcher.add_handler(MessageHandler(plain_text, find_amazon_link))
    updater.start_polling()
    updater.idle()
# Start the bot only when this file is run as a script, not when imported.
if __name__ == '__main__':
    main()