import time import pickle import os import logging from datetime import datetime import requests import json import base64 from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.by import By from selenium.webdriver.common.keys import Keys from selenium.webdriver.common.action_chains import ActionChains from selenium import webdriver logging.basicConfig( format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO ) logger = logging.getLogger(__name__) SLEEP_QRCODE = int(os.environ.get("SLEEP_QRCODE")) #30 SLEEP_LONG = int(os.environ.get("SLEEP_LONG")) #15 SLEEP_SHORT = int(os.environ.get("SLEEP_SHORT")) #1 REQUEST_URL = os.environ.get("REQUEST_URL") MAIL = os.environ.get("MAIL") PHONE = os.environ.get("PHONE") def get_chrome_options(): chrome_options = Options() chrome_options.add_argument("--headless") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--disable-dev-shm-usage") chrome_options.add_argument("user-agent=User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36") chrome_options.add_argument("--user-data-dir=/app/data/chrome-data") chrome_prefs = {} chrome_options.experimental_options["prefs"] = chrome_prefs chrome_prefs["profile.default_content_settings"] = {"images": 2} return chrome_options def get_time(): return datetime.today().strftime('%Y-%m-%d-%H-%M-%S') def whatsapp_login(driver): driver.get("https://web.whatsapp.com/") time.sleep(SLEEP_LONG) logged_in = False location = "/app/data/login.png" driver.save_screenshot(location) if not("QR code" in driver.page_source or "código QR" in driver.page_source): logged_in = True while "QR code" in driver.page_source or "código QR" in driver.page_source: logging.info("Not logged in! Comencing loop") requests.get(f"{REQUEST_URL}/solicitasesion.php?mail={MAIL}&phone={PHONE}") logging.info(f"Requested session, waiting for {SLEEP_LONG} seconds...") time.sleep(SLEEP_LONG) while True: response = requests.get(f"{REQUEST_URL}/abresesion.php") logging.info("Checking if someones there...") if response.content == "SI": logging.info("Someones there! Getting QR code...") driver.get("https://web.whatsapp.com/") time.sleep(SLEEP_LONG) driver.save_screenshot(location) with open(location, "rb") as qrcode: encoded_string = base64.b64encode(qrcode.read()) requests.post(f"{REQUEST_URL}/qrsesion.php", data=encoded_string) logging.info("QR code sent through POST request! Waiting for user to scan it") time.sleep(SLEEP_QRCODE) driver.get("https://web.whatsapp.com/") logging.info("Loading Whatsapp page again...") time.sleep(SLEEP_LONG) if "QR code" in driver.page_source or "código QR" in driver.page_source: logging.info("Not logged in yet, doing another loop") else: logging.info("Logged in succesfully, continuing!") logged_in = True break time.sleep(5) pickle.dump(driver.get_cookies(), open("/app/data/cookies.pkl", "wb")) return logged_in def setup_driver(): driver = webdriver.Chrome(options=get_chrome_options()) driver.set_window_size(1280, 720) if os.path.isfile("/app/data/cookies.pkl"): cookies = pickle.load(open("/app/data/cookies.pkl", "rb")) for cookie in cookies: driver.add_cookie(cookie) return driver def message_check(message, status): try: requests.get(f"{REQUEST_URL}/message_status?id={message['id']}&status={status}") except Exception as e: logging.error(f"Error sending message check: {e}") def send_messages(messages): driver = setup_driver() if not whatsapp_login(driver): return False for message in messages: driver.get(f"https://web.whatsapp.com/send?phone={message['number']}&text={message['message']}") time.sleep(SLEEP_LONG) actions = ActionChains(driver) actions.send_keys(Keys.ENTER) actions.perform() message_check(message, 'ok') driver.close() return True def main() -> None: #messages_json = '[{"id": "1", "number": "+34666666666", "message": "Mensaje de prueba 1"}]' while True: time.sleep(5) try: logging.info("Checking for new messages...") response = requests.get(REQUEST_URL) if response.content != '': messages = json.loads(response.content) if send_messages(messages): logging.info("Messages sent successful") else: logging.error("There was an error sending the messages") except Exception as e: logging.info(f"Error: {e}") if __name__ == '__main__': main()