import time import pickle import os import logging from datetime import datetime import requests import json from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.by import By from selenium.webdriver.common.keys import Keys from selenium import webdriver logging.basicConfig( format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO ) logger = logging.getLogger(__name__) SLEEP_QRCODE = int(os.environ.get("SLEEP_QRCODE")) #30 SLEEP_LONG = int(os.environ.get("SLEEP_LONG")) #15 SLEEP_SHORT = int(os.environ.get("SLEEP_SHORT")) #1 REQUEST_URL = os.environ.get("REQUEST_URL") def get_chrome_options(): chrome_options = Options() chrome_options.add_argument("--headless") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--disable-dev-shm-usage") chrome_options.add_argument("user-agent=User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36") chrome_options.add_argument("--user-data-dir=/app/data/chrome-data") chrome_prefs = {} chrome_options.experimental_options["prefs"] = chrome_prefs chrome_prefs["profile.default_content_settings"] = {"images": 2} return chrome_options def get_time(): return datetime.today().strftime('%Y-%m-%d-%H-%M-%S') def find_number(number, driver): logging.info(f"Searching for {number} in self conversation") messages = driver.find_elements(By.CLASS_NAME, "selectable-text") for message in messages: if number in message.text: message.click() time.sleep(SLEEP_SHORT) ltr = driver.find_elements(By.XPATH, '//span[@dir = "ltr"]') for chat_with in ltr: if number == chat_with.text.replace(" ", ""): logging.info(f"{number} found") return chat_with time.sleep(SLEEP_SHORT) break def send_qr_code(location): #requests.post(f"{REQUEST_URL}/send_qr", ) pass def whatsapp_login(driver, tri=1): driver.get("https://web.whatsapp.com/") time.sleep(SLEEP_LONG) if tri > 5: logging.error(f"Couldn't login after {tri} tries") return False try: driver.find_element(By.XPATH, '//span[@data-testid = "qrcode"]') logging.info("Not logged in! Saving QR code and waiting 30 seconds") location = f"/app/data/login-{get_time()}.png" driver.save_screenshot(location) send_qr_code(location) time.sleep(SLEEP_QRCODE) pickle.dump(driver.get_cookies(), open("/app/data/cookies.pkl", "wb")) return whatsapp_login(driver, tri) except: logging.info("Already logged in") return True def setup_driver(): driver = webdriver.Chrome(options=get_chrome_options()) driver.set_window_size(1280, 720) if os.path.isfile("/app/data/cookies.pkl"): cookies = pickle.load(open("/app/data/cookies.pkl", "rb")) for cookie in cookies: driver.add_cookie(cookie) return driver def go_to_self(driver): logging.info("Going to self conversation") new_chats = driver.find_element(By.XPATH, '//span[@data-testid = "chat"]') new_chats.click() time.sleep(SLEEP_SHORT) user = driver.find_element(By.XPATH, '//span[@data-testid = "you-label"]') user.click() time.sleep(SLEEP_SHORT) def send_message(message, driver): logging.info(f"Sending message: {message}") input_box = driver.find_element(By.XPATH, '//div[@data-testid = "conversation-compose-box-input"]') input_box.send_keys(message + Keys.ENTER) time.sleep(SLEEP_SHORT) def message_check(message, status): try: requests.get(f"{REQUEST_URL}/message_status?id={message['id']}&status={status}") except Exception as e: logging.error(f"Error sending message check: {e}") def send_messages(messages): driver = setup_driver() if not whatsapp_login(driver): return False for message in messages: go_to_self(driver) conversation_link = find_number(message['number'], driver) if conversation_link == None: logging.info("Number not found, sending message to self") send_message(message['number'], driver) conversation_link = find_number(message['number'], driver) if conversation_link == None: logging.error(f"Number {message['number']} may not be valid, couldn't click on link to open chat") message_check(message, 'number_not_valid') continue conversation_link.click() time.sleep(SLEEP_SHORT) logging.info(f"Sending message to {message['number']}") send_message(message['message'], driver) message_check(message, 'ok') #driver.save_screenshot(f"/app/data/logged-{get_time()}.png") driver.close() return True def main() -> None: messages_json = '[{"id": "1", "number": "+34666666666", "message": "Mensaje de prueba 1"},{"id": "2", "number": "+34666666666", "message": "Mensaje de prueba 2"},{"id": "3", "number": "+34666666666", "message": "Mensaje de prueba 3"}]' while True: time.sleep(5) try: logging.info("Checking for new messages...") #response = requests.get(REQUEST_URL) #if response.content != '': if True: #messages = json.loads(response.content) messages = json.loads(messages_json) if send_messages(messages): logging.info("Messages sent successful") else: logging.error("There was an error sending the messages") except Exception as e: logging.info(f"Error: {e}") if __name__ == '__main__': main()