Files
amazon-telegram/bot/bot.py

116 lines
5.0 KiB
Python

import logging
import re
import random
import dbhelper
import constants
import helpers
import parser
import time
from amazoncaptcha import AmazonCaptcha
from bs4 import BeautifulSoup
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium import webdriver
from telegram import Update, ForceReply, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Updater, CommandHandler, MessageHandler, Filters, CallbackContext
def get_chrome_options():
    """Build the Chrome options used for each scraping browser session.

    Returns:
        A selenium ``Options`` object carrying the ``--headless``,
        ``--no-sandbox`` and ``--disable-dev-shm-usage`` arguments plus a
        ``prefs`` experimental option.
    """
    options = Options()
    for flag in ("--headless", "--no-sandbox", "--disable-dev-shm-usage"):
        options.add_argument(flag)
    # NOTE(review): {"images": 2} presumably tells Chrome not to load images
    # (less traffic per scrape) -- confirm against the Chrome prefs reference.
    prefs = {"profile.default_content_settings": {"images": 2}}
    options.experimental_options["prefs"] = prefs
    return options
def _scrape_product(driver, url: str) -> tuple:
    """Load *url* in *driver* and return ``(title, price, image)`` from the page."""
    driver.get(url)
    logging.info("Scraping information")
    # Read the page source once so both parsers see the same snapshot.
    page_source = driver.page_source
    soup = BeautifulSoup(page_source, "lxml")
    etree_soup = BeautifulSoup(page_source, "html.parser")
    logging.info("Getting title...")
    return (
        parser.get_title(soup),
        parser.get_price(soup),
        parser.get_image(soup, etree_soup),
    )


def _submit_captcha(driver) -> bool:
    """Try to solve and submit the captcha on the current page.

    Returns True when a solution was typed into the captcha form, False when
    the solver reported ``'Not solved'``.
    """
    captcha = AmazonCaptcha.fromdriver(driver)
    solution = captcha.solve()
    if solution == 'Not solved':
        logging.info("Couldn't solve the captcha, if there was any")
        return False
    logging.info(f"Captcha solution is {solution}, redirecting")
    logging.info("Waiting for 5 seconds, humans are not that fast :)")
    time.sleep(5)
    captcha_field = driver.find_element(By.ID, 'captchacharacters')
    captcha_field.send_keys(solution)
    captcha_field.send_keys(Keys.RETURN)
    return True


def find_amazon_link(update: Update, context: CallbackContext) -> None:
    """Replace a chat message containing an Amazon link with a product card.

    Steps:
      1. Extract the message text; expand ``amzn.to`` / ``amzn.eu`` short
         links through ``helpers.unshort_url``.
      2. Locate ``constants.searchURL`` in the text and rebuild an
         ``https://`` URL from it. Messages without such a link are ignored.
      3. Scrape title, price and image with a headless Chrome session,
         attempting one captcha solve + reload if no title is found.
      4. Delete the original message, look up or store the product through
         ``dbhelper``, compose the image and send it with an inline button
         pointing at the referral URL.

    Args:
        update: Incoming Telegram update.
        context: Handler context; its ``bot`` is used to delete and send.
    """
    msg = getattr(update.message, "text", None)
    if not msg:
        # Covers both a missing message object and a message without text.
        logging.info("Received message has no text")
        return

    # Expand short links first so the product URL can be searched afterwards.
    for short_domain in ("amzn.to", "amzn.eu"):
        start = msg.find(short_domain)
        if start != -1:
            msg = helpers.unshort_url(msg[start:].split()[0])

    start = msg.find(constants.searchURL)
    if start == -1:
        # This handler receives every non-command text message; without an
        # Amazon URL there is nothing to scrape, so do not start a browser.
        return
    # Cut at any whitespace (not only a space) so that a newline following
    # the link does not end up inside the URL.
    msg = f"https://{msg[start:].split()[0]}"

    product_code = ""
    extra_params = ""
    product_match = re.search(r'(?:dp\/[\w]*)|(?:gp\/product\/[\w]*)', msg)  # product code in the URL
    vendor_match = re.search(r'(?:&m=[\w]*)', msg)  # vendor parameter, kept as-is
    if product_match is not None:
        product_code = product_match.group(0)
    if vendor_match is not None:
        extra_params = vendor_match.group(0)

    user = update.message.from_user
    chat = update.message.chat
    message_id = update.message.message_id
    logging.info(f"Link sent by {user} - {msg}")

    logging.info("Setting up new Chrome Browser")
    driver = webdriver.Chrome(options=get_chrome_options())
    try:
        driver.set_window_size(random.randint(1200, 1800), random.randint(600, 1000))
        logging.info("Loading Amazon webpage")
        title, price, image = _scrape_product(driver, msg)
        if title == "":
            logging.info("Title not found, not a valid product or captcha")
            if _submit_captcha(driver):
                logging.info("Re-loading Amazon webpage")
                title, price, image = _scrape_product(driver, msg)
        if title == "":
            logging.info("Title not found, not a valid product or failed captcha")
            return
        logging.info(f"Title found: {title}")
    finally:
        # Always release the browser, including on the early return above and
        # on any scraping error. quit() ends the whole driver session, whereas
        # close() only closes the current window.
        logging.info("Closing browser")
        driver.quit()

    context.bot.deleteMessage(chat_id=chat['id'], message_id=message_id)
    referurl = helpers.new_refer_url(product_code, extra_params)
    product_id = dbhelper.check_product(referurl, price)
    if not product_id:
        product_id = dbhelper.add_product(
            user['username'], chat['title'], user['id'], chat['id'],
            msg, referurl, title, price, image,
        )
    helpers.create_image(product_id, price)

    keyboard = [[InlineKeyboardButton("Ir a Amazon", url=f"{referurl}")]]
    markup = InlineKeyboardMarkup(keyboard)
    # Context manager so the image file handle is closed after the upload.
    with open(f"/app/data/images/products/{product_id}_composed.png", 'rb') as photo:
        context.bot.send_photo(
            chat_id=update.message.chat_id,
            photo=photo,
            caption=f"URL enviada por @{user['username']}: \n\n{title}",
            reply_markup=markup,
        )
def main() -> None:
    """Set up the database, register the message handler and run the bot.

    Blocks in ``idle()`` after polling has been started.
    """
    dbhelper.setup_db()
    bot_updater = Updater(constants.TELEGRAM_API_KEY)
    # Every text message that is not a command is passed to find_amazon_link.
    link_handler = MessageHandler(Filters.text & ~Filters.command, find_amazon_link)
    bot_updater.dispatcher.add_handler(link_handler)
    bot_updater.start_polling()
    bot_updater.idle()
# Run the bot only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()