Initial commit, refactored some things from original code and removed WP functionality
This commit is contained in:
36
bot/Dockerfile
Normal file
36
bot/Dockerfile
Normal file
@@ -0,0 +1,36 @@
|
||||
FROM python:3.7

# SECURITY: the Telegram bot token must not be baked into the image (it ends
# up in every layer, in `docker history` and in version control). Supply it
# at runtime instead, together with the other variables constants.py reads:
#   docker run -e TELEGRAM_API_KEY=... -e baseURL=... -e affiliate_tag=... <image>
# Any token that was previously committed in this file should be revoked.

# Adding trusting keys to apt for repositories
RUN wget -q -O - https://dl-ssl.google.com/linux/linux_signing_key.pub | apt-key add -

# Adding Google Chrome to the repositories
RUN sh -c 'echo "deb [arch=amd64] http://dl.google.com/linux/chrome/deb/ stable main" >> /etc/apt/sources.list.d/google-chrome.list'

# Refresh the package index and install Google Chrome and unzip in a single
# layer, so a cached (stale) index is never reused for a later install.
RUN apt-get -y update \
    && apt-get install -y google-chrome-stable unzip \
    && rm -rf /var/lib/apt/lists/*

# Download the Chrome Driver (over HTTPS) and unzip it into /usr/local/bin
RUN wget -O /tmp/chromedriver.zip https://chromedriver.storage.googleapis.com/`curl -sS https://chromedriver.storage.googleapis.com/LATEST_RELEASE`/chromedriver_linux64.zip \
    && unzip /tmp/chromedriver.zip chromedriver -d /usr/local/bin/ \
    && rm /tmp/chromedriver.zip

# Set display port as an environment variable
ENV DISPLAY=:99

RUN mkdir /app
WORKDIR /app

# Install dependencies before copying the sources so that code-only changes
# do not invalidate the pip layer.
COPY requirements.txt /app/
RUN pip install --no-cache-dir -r /app/requirements.txt

# bot.py imports constants, dbhelper, helpers and parser, so they must be
# present in the image as well.
COPY bot.py constants.py dbhelper.py helpers.py parser.py /app/

CMD [ "python", "/app/bot.py" ]
|
||||
86
bot/bot.py
Normal file
86
bot/bot.py
Normal file
@@ -0,0 +1,86 @@
|
||||
import logging
|
||||
import re
|
||||
import random
|
||||
import dbhelper
|
||||
import constants
|
||||
import helpers
|
||||
import parser
|
||||
|
||||
from bs4 import BeautifulSoup
|
||||
from selenium.webdriver.chrome.options import Options
|
||||
from selenium import webdriver
|
||||
from telegram import Update, ForceReply
|
||||
from telegram.ext import Updater, CommandHandler, MessageHandler, Filters, CallbackContext
|
||||
|
||||
def get_chrome_options():
    """Build the Chrome options used for every scraping session.

    Returns:
        A selenium ``Options`` instance configured with the headless,
        no-sandbox and disable-dev-shm-usage switches and with a "prefs"
        experimental option for the image content setting.
    """
    options = Options()
    for switch in ("--headless", "--no-sandbox", "--disable-dev-shm-usage"):
        options.add_argument(switch)
    # NOTE(review): value 2 presumably tells Chrome not to load images --
    # confirm against the Chrome preferences documentation.
    options.experimental_options["prefs"] = {
        "profile.default_content_settings": {"images": 2},
    }
    return options
|
||||
|
||||
def find_amazon_link(update: Update, context: CallbackContext) -> None:
    """Replace an Amazon product link posted in a chat with an affiliate post.

    The handler expands ``amzn.to`` / ``amzn.eu`` short links, extracts the
    product code (``dp/...`` or ``gp/product/...``) and the optional ``&m=``
    vendor parameter, scrapes title, price and image from the product page
    with headless Chrome, deletes the original message and sends a composed
    image whose caption contains the affiliate URL.

    Messages without text, without a recognisable product code, or whose
    page has no product title are ignored.

    Args:
        update: Incoming Telegram update.
        context: Handler context; its ``bot`` is used to delete and send
            messages.
    """
    extraparams = ""
    # update.message is None for e.g. edited messages; text may be None too.
    msg = getattr(update.message, "text", None)
    if not msg:
        logging.info("Received message has no text")
        return

    # Expand short links first so the product code can be read from the
    # final URL.
    for short_domain in ("amzn.to", "amzn.eu"):
        start = msg.find(short_domain)
        if start != -1:
            msg = helpers.unshort_url(msg[start:].split()[0])

    # Cut the message down to the URL itself, starting at the store domain.
    start = msg.find(constants.searchURL)
    if start != -1:
        msg = f"https://{msg[start:].split(' ')[0]}"

    m = re.search(r'(?:dp\/[\w]*)|(?:gp\/product\/[\w]*)', msg)  # Gets product code in amazon
    m_e = re.search(r'(?:&m=[\w]*)', msg)  # Gets vendor and keeps it
    if m is None:
        # Not a product link: do not start a browser for ordinary chat
        # messages.
        logging.info("No Amazon product code found, ignoring message")
        return
    productCode = m.group(0)
    if m_e is not None:
        extraparams = m_e.group(0)

    user, chat, message_id = update.message.from_user, update.message.chat, update.message.message_id
    logging.info(f"Link sent by {user} - {msg}")

    logging.info("Setting up new Chrome Browser")
    driver = webdriver.Chrome(options=get_chrome_options())
    try:
        # Randomised window size, as in the original implementation.
        driver.set_window_size(random.randint(1200, 1800), random.randint(600, 1000))
        logging.info("Loading Amazon webpage")
        driver.get(msg)
        logging.info("Scraping information and closing browser")
        soup = BeautifulSoup(driver.page_source, "lxml")
    finally:
        # quit() ends the whole session (browser and chromedriver), also
        # when loading the page failed; close() only closes the window.
        driver.quit()

    logging.info("Getting title...")
    title, price, image = parser.get_title(soup), parser.get_price(soup), parser.get_image(soup)
    if title == "":
        logging.info("Title not found, not a valid product")
        return
    logging.info(f"Title found: {title}")

    context.bot.deleteMessage(chat_id=chat['id'], message_id=message_id)
    referurl = helpers.new_refer_url(productCode, extraparams)
    # Reuse the stored product when the same URL was already posted at the
    # same price; otherwise store a new entry and download its image.
    product_id = dbhelper.check_product(referurl, price)
    if not product_id:
        product_id = dbhelper.add_product(user['username'], chat['title'], user['id'], chat['id'], msg, referurl, title, price, image)
    helpers.create_image(product_id, price)
    with open(f"/app/data/images/products/{product_id}_composed.png", 'rb') as photo:
        context.bot.send_photo(chat_id=update.message.chat_id, photo=photo, caption=f"URL enviada por @{user['username']}: \n\n{title} \n\n{referurl}")
|
||||
|
||||
def main() -> None:
    """Create the database table if needed and run the bot until stopped.

    Registers ``find_amazon_link`` for every text message that is not a
    command, starts polling and then blocks in ``idle()``.
    """
    dbhelper.setup_db()

    updater = Updater(constants.TELEGRAM_API_KEY)
    plain_text = Filters.text & ~Filters.command
    updater.dispatcher.add_handler(MessageHandler(plain_text, find_amazon_link))

    updater.start_polling()
    updater.idle()


if __name__ == '__main__':
    main()
|
||||
18
bot/constants.py
Normal file
18
bot/constants.py
Normal file
@@ -0,0 +1,18 @@
|
||||
import os
|
||||
|
||||
# Bot token; None when the variable is not set.
TELEGRAM_API_KEY = os.getenv('TELEGRAM_API_KEY')
# Store address and affiliate tag are mandatory: a missing variable raises
# KeyError at import time.
baseURL = os.environ['baseURL']
affiliate_tag = os.environ['affiliate_tag']
# Headers sent when downloading product images.
HEADERS = {
    'User-Agent':
        'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/44.0.2403.157 Safari/537.36',
    'Accept-Language': 'en-US, en;q=0.5',
}
# SQLite database file.
# NOTE(review): other modules read and write under /app/data/ -- confirm
# that /data/ is the intended location.
DB = '/data/amazon.db'

# Normalise the configured address:
#   searchURL -- address without scheme and without "www.", used to locate
#                store links inside chat messages;
#   baseURL   -- always "https://www." + searchURL.
# Scheme and "www." are stripped independently so that inputs such as
# "https://example.com/" or "www.example.com/" no longer produce
# "https://www.https://..." / "https://www.www....".
searchURL = baseURL
for _scheme in ("https://", "http://"):
    if searchURL.startswith(_scheme):
        searchURL = searchURL[len(_scheme):]
        break
if searchURL.startswith("www."):
    searchURL = searchURL[len("www."):]
baseURL = "https://www." + searchURL
|
||||
47
bot/dbhelper.py
Normal file
47
bot/dbhelper.py
Normal file
@@ -0,0 +1,47 @@
|
||||
import sqlite3
|
||||
import requests
|
||||
import constants
|
||||
import helpers
|
||||
|
||||
def setup_db():
    """Create the ``amazon`` table in the SQLite database if it is missing.

    The database file is ``constants.DB``. The connection is closed even
    when creating the table fails.
    """
    con = sqlite3.connect(constants.DB)
    try:
        con.execute(
            "CREATE TABLE IF NOT EXISTS amazon("
            "id INTEGER PRIMARY KEY AUTOINCREMENT, "
            "timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, "
            "tg_user TEXT, tg_group TEXT, "
            "tg_user_id INTEGER, tg_group_id INTEGER, "
            "url TEXT, referurl TEXT, price TEXT, "
            "title TEXT, image TEXT)"
        )
        # Explicit commit so the result does not depend on the sqlite3
        # module's implicit transaction handling for DDL statements.
        con.commit()
    finally:
        con.close()
|
||||
|
||||
def add_product(tg_user, tg_group, tg_user_id, tg_group_id, url, referurl, title, price, image):
    """Store a product in the database and download its image.

    The row is inserted with bound parameters, so titles, user names or
    URLs containing quotes can neither break the statement nor inject SQL.
    A ``None`` value is therefore stored as NULL rather than as the text
    'None'.

    Args:
        tg_user: Telegram user name of the sender.
        tg_group: Title of the chat the link was posted in.
        tg_user_id: Telegram id of the sender.
        tg_group_id: Telegram id of the chat.
        url: Product URL that was scraped.
        referurl: Affiliate URL built for the product.
        title: Scraped product title.
        price: Scraped price text.
        image: URL of the product image; it is downloaded to
            ``/app/data/images/products/<id>.jpg``.

    Returns:
        The row id of the newly inserted product.
    """
    con = sqlite3.connect(constants.DB)
    try:
        cur = con.cursor()
        cur.execute(
            "INSERT INTO amazon (tg_user, tg_group, tg_user_id, tg_group_id, url, referurl, price, title, image) "
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
            (tg_user, tg_group, tg_user_id, tg_group_id, url, referurl, price, title, image),
        )
        product_id = cur.lastrowid
        con.commit()
    finally:
        # Release the database also when the insert fails.
        con.close()

    response = requests.get(image, headers=constants.HEADERS)
    with open(f"/app/data/images/products/{product_id}.jpg", "wb") as file:
        file.write(response.content)

    return product_id
|
||||
|
||||
def check_product(referurl, price):
    """Look up the most recent stored product for an affiliate URL.

    Args:
        referurl: Affiliate URL to search for.
        price: Price text scraped just now.

    Returns:
        The id of the newest row with this ``referurl`` when its stored
        price equals ``price``; ``False`` when there is no such row or the
        price differs (the caller then creates a new entry).
    """
    con = sqlite3.connect(constants.DB)
    try:
        cur = con.cursor()
        # Bound parameter instead of string formatting.
        cur.execute("SELECT * FROM amazon WHERE referurl=? ORDER BY id DESC", (referurl,))
        result = cur.fetchone()
    finally:
        # The connection was previously never closed.
        con.close()

    if result is None:
        helpers.logging.info("New entry, creating in DB")
        return False

    helpers.logging.info("Already exists, checking price")
    # Column 8 is `price` and column 0 is `id` in the table created by
    # setup_db.
    if price == result[8]:
        helpers.logging.info("Price is the same, retrieving link")
        return result[0]

    helpers.logging.info("Price is different, creating new entry in DB")
    return False
|
||||
64
bot/helpers.py
Normal file
64
bot/helpers.py
Normal file
@@ -0,0 +1,64 @@
|
||||
import logging
|
||||
import constants
|
||||
import requests
|
||||
from PIL import Image, ImageDraw, ImageFont
|
||||
|
||||
# Configure root logging at import time: INFO level, with timestamp, logger
# name and level in front of every message.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
|
||||
|
||||
def create_image(product_id, price):
    """Compose the picture posted for a product and save it as a PNG.

    On a 1280x800 white canvas the function places the two logos and the
    price text in the right-hand column, draws a green frame and pastes the
    product photo, scaled to fit, in the left-hand area.

    Args:
        product_id: Id of the product; the photo is read from
            ``/app/data/images/products/<product_id>.jpg`` and the result
            is written to
            ``/app/data/images/products/<product_id>_composed.png``.
        price: Text drawn in the middle of the right-hand column.
    """
    canvas_w, canvas_h = 1280, 800
    product_max_h = int(canvas_h * 0.85)
    # width limit for the left-hand (product) area
    product_max_w = int(((canvas_w / 3) * 2) - 80)
    # width both logos (homelabers and amazon) are scaled to
    logo_w = int(canvas_w * 0.2)
    # font face and size for the price
    price_font = ImageFont.truetype("/app/data/fonts/Roboto-Bold.ttf", 90)
    # start from a blank canvas
    canvas = Image.new('RGBA', (canvas_w, canvas_h), (255, 255, 255))
    # horizontal centre of the right-hand column
    right_centre_x = (canvas_w / 6) * 5

    # homelabers logo at the top right, amazon logo at the bottom right;
    # each is resized to logo_w keeping its aspect ratio and pasted using
    # itself as the mask.
    for logo_path, top in (
        ("/app/data/images/logo.png", int(canvas_h * 0.1)),
        ("/app/data/images/Amazon_logo.png", int(canvas_h - canvas_h * 0.2)),
    ):
        logo = Image.open(logo_path).convert("RGBA")
        scale = logo_w / float(logo.size[0])
        logo_h = int((float(logo.size[1]) * float(scale)))
        logo = logo.resize((logo_w, logo_h), Image.Resampling.LANCZOS)
        canvas.paste(logo, (int(right_centre_x - logo.size[0] / 2), top), logo)

    # price text centred in the right-hand column, then the outer green
    # frame with a fixed margin and line width
    pen = ImageDraw.Draw(canvas)
    text_w, text_h = pen.textsize(price, font=price_font)
    text_origin = (right_centre_x - text_w / 2, canvas_h / 2 - text_h / 2)
    pen.text(text_origin, price, (0, 0, 0), font=price_font)
    pen.rectangle([15, 15, canvas_w - 15, canvas_h - 15], width=15, outline="#20e163")

    # product photo on the left: scale to the maximum height unless that
    # makes it too wide, in which case scale to the maximum width instead
    photo = Image.open(f"/app/data/images/products/{product_id}.jpg")
    src_w, src_h = photo.size[0], photo.size[1]
    height_ratio = (product_max_h / float(src_h))
    fitted_w = int((float(src_w) * float(height_ratio)))
    if fitted_w < product_max_w:
        target_size = (fitted_w, product_max_h)
    else:
        width_ratio = product_max_w / float(src_w)
        target_size = (product_max_w, int((float(src_h) * float(width_ratio))))
    photo = photo.resize(target_size, Image.Resampling.LANCZOS)
    paste_at = (
        int((canvas_w / 3) - (photo.size[0] / 2)),
        int((canvas_h / 2) - (photo.size[1] / 2)),
    )
    canvas.paste(photo, paste_at)

    # save the composition under a different name
    canvas.save(f"/app/data/images/products/{product_id}_composed.png", quality=95)
|
||||
|
||||
def new_refer_url(pcode, extraparams=None):
    """Build the affiliate URL for a product.

    Args:
        pcode: Product path fragment appended to ``constants.baseURL``.
        extraparams: Optional query-string suffix appended after the tag;
            ``None`` (the default) or an empty string adds nothing.

    Returns:
        ``baseURL + pcode + "?tag=" + affiliate_tag + extraparams``.
    """
    # The default None previously raised TypeError on concatenation.
    return constants.baseURL + pcode + "?tag=" + constants.affiliate_tag + (extraparams or "")
|
||||
|
||||
def unshort_url(url):
    """Follow the redirects of a short link and return the final URL.

    Args:
        url: Short link without scheme; ``https://`` is prepended.

    Returns:
        The URL of the final response after all redirects.

    Raises:
        requests.exceptions.RequestException: If the request fails or
            times out.
    """
    # The context manager releases the session's pooled connections.
    with requests.Session() as session:
        # Without a timeout a stalled server would block the message
        # handler indefinitely.
        resp = session.get("https://" + url, allow_redirects=True, timeout=10)
    logging.info(f"Unshorted URL: {resp.url}")
    return resp.url
|
||||
32
bot/parser.py
Normal file
32
bot/parser.py
Normal file
@@ -0,0 +1,32 @@
|
||||
import helpers
|
||||
|
||||
def get_title(soup):
    """Return the stripped text of the ``span#productTitle`` element.

    Args:
        soup: Parsed page offering a BeautifulSoup-style ``find``.

    Returns:
        The title text, or ``""`` when the lookup raises AttributeError
        (for instance because the element is missing); the failure is
        logged.
    """
    try:
        node = soup.find("span", attrs={"id": 'productTitle'})
        return node.string.strip()
    except AttributeError as err:
        helpers.logging.info(f"Couldn't get title: {err}")
        return ""
|
||||
|
||||
def get_price(soup):
    """Return the price text of the first ``span.a-offscreen`` element.

    Args:
        soup: Parsed page offering a BeautifulSoup-style ``find``.

    Returns:
        The stripped text when it contains "€"; otherwise ``"N/A"``, also
        when the lookup raises AttributeError.
    """
    try:
        text = soup.find("span", attrs={'class': 'a-offscreen'}).string.strip()
    except AttributeError:
        return "N/A"
    # Sometimes the element holds the title instead of a price.
    return text if "€" in text else "N/A"
|
||||
|
||||
def get_image(soup):
    """Return the ``src`` of the ``img#landingImage`` element.

    Args:
        soup: Parsed page offering a BeautifulSoup-style ``find``.

    Returns:
        Whatever ``get('src')`` yields for the element, or ``"N/A"`` when
        the lookup raises AttributeError (for instance because the element
        is missing).
    """
    try:
        return soup.find("img", attrs={'id': 'landingImage'}).get('src')
    except AttributeError:
        return "N/A"
|
||||
6
bot/requirements.txt
Normal file
6
bot/requirements.txt
Normal file
@@ -0,0 +1,6 @@
|
||||
python-telegram-bot==13.13
|
||||
requests==2.28.1
|
||||
beautifulsoup4==4.11.1
|
||||
lxml==4.9.1
|
||||
selenium==4.4.0
|
||||
Pillow==9.2.0
|
||||
Reference in New Issue
Block a user