139 lines
5.1 KiB
Python
139 lines
5.1 KiB
Python
import time
|
|
import pickle
|
|
import os
|
|
import logging
|
|
from datetime import datetime
|
|
import requests
|
|
import json
|
|
import base64
|
|
|
|
from selenium.webdriver.chrome.options import Options
|
|
from selenium.webdriver.common.by import By
|
|
from selenium.webdriver.common.keys import Keys
|
|
from selenium.webdriver.common.action_chains import ActionChains
|
|
from selenium import webdriver
|
|
|
|
logging.basicConfig(
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
SLEEP_QRCODE = int(os.environ.get("SLEEP_QRCODE")) #30
|
|
SLEEP_LONG = int(os.environ.get("SLEEP_LONG")) #15
|
|
SLEEP_SHORT = int(os.environ.get("SLEEP_SHORT")) #1
|
|
REQUEST_URL = os.environ.get("REQUEST_URL")
|
|
MAIL = os.environ.get("MAIL")
|
|
PHONE = os.environ.get("PHONE")
|
|
|
|
def get_chrome_options():
|
|
chrome_options = Options()
|
|
chrome_options.add_argument("--headless")
|
|
chrome_options.add_argument("--no-sandbox")
|
|
chrome_options.add_argument("--disable-dev-shm-usage")
|
|
chrome_options.add_argument("user-agent=User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36")
|
|
chrome_options.add_argument("--user-data-dir=/app/data/chrome-data")
|
|
chrome_prefs = {}
|
|
chrome_options.experimental_options["prefs"] = chrome_prefs
|
|
chrome_prefs["profile.default_content_settings"] = {"images": 2}
|
|
return chrome_options
|
|
|
|
def get_time():
|
|
return datetime.today().strftime('%Y-%m-%d-%H-%M-%S')
|
|
|
|
def whatsapp_login(driver):
|
|
driver.get("https://web.whatsapp.com/")
|
|
time.sleep(SLEEP_LONG)
|
|
|
|
logged_in = False
|
|
location = "/app/data/login.png"
|
|
driver.save_screenshot(location)
|
|
|
|
if not("QR code" in driver.page_source or "código QR" in driver.page_source):
|
|
logged_in = True
|
|
|
|
while "QR code" in driver.page_source or "código QR" in driver.page_source:
|
|
logging.info("Not logged in! Comencing loop")
|
|
requests.get(f"{REQUEST_URL}/solicitasesion.php?mail={MAIL}&phone={PHONE}")
|
|
logging.info(f"Requested session, waiting for {SLEEP_LONG} seconds...")
|
|
time.sleep(SLEEP_LONG)
|
|
while True:
|
|
response = requests.get(f"{REQUEST_URL}/abresesion.php")
|
|
logging.info("Checking if someones there...")
|
|
if "SI" in response.content:
|
|
logging.info("Someones there! Getting QR code...")
|
|
driver.get("https://web.whatsapp.com/")
|
|
time.sleep(SLEEP_LONG)
|
|
driver.save_screenshot(location)
|
|
with open(location, "rb") as qrcode:
|
|
files = {'qr_image': qrcode}
|
|
#encoded_string = base64.b64encode(qrcode.read())
|
|
requests.post(f"{REQUEST_URL}/qrsesion.php", files=files)
|
|
logging.info("QR code sent through POST request! Waiting for user to scan it")
|
|
time.sleep(SLEEP_QRCODE)
|
|
driver.get("https://web.whatsapp.com/")
|
|
logging.info("Loading Whatsapp page again...")
|
|
time.sleep(SLEEP_LONG)
|
|
if "QR code" in driver.page_source or "código QR" in driver.page_source:
|
|
logging.info("Not logged in yet, doing another loop")
|
|
else:
|
|
logging.info("Logged in succesfully, continuing!")
|
|
logged_in = True
|
|
break
|
|
time.sleep(5)
|
|
pickle.dump(driver.get_cookies(), open("/app/data/cookies.pkl", "wb"))
|
|
|
|
return logged_in
|
|
|
|
def setup_driver():
|
|
driver = webdriver.Chrome(options=get_chrome_options())
|
|
driver.set_window_size(1280, 720)
|
|
|
|
if os.path.isfile("/app/data/cookies.pkl"):
|
|
cookies = pickle.load(open("/app/data/cookies.pkl", "rb"))
|
|
for cookie in cookies:
|
|
driver.add_cookie(cookie)
|
|
|
|
return driver
|
|
|
|
def message_check(message, status):
|
|
try:
|
|
requests.get(f"{REQUEST_URL}/message_status.php?id={message['id']}&status={status}")
|
|
except Exception as e:
|
|
logging.error(f"Error sending message check: {e}")
|
|
|
|
def send_messages(messages):
|
|
driver = setup_driver()
|
|
if not whatsapp_login(driver):
|
|
return False
|
|
|
|
for message in messages:
|
|
driver.get(f"https://web.whatsapp.com/send?phone={message['number']}&text={message['message']}")
|
|
time.sleep(SLEEP_LONG)
|
|
actions = ActionChains(driver)
|
|
actions.send_keys(Keys.ENTER)
|
|
actions.perform()
|
|
message_check(message, 'ok')
|
|
|
|
driver.close()
|
|
return True
|
|
|
|
def main() -> None:
|
|
#messages_json = '[{"id": "1", "number": "+34666666666", "message": "Mensaje de prueba 1"}]'
|
|
while True:
|
|
time.sleep(5)
|
|
try:
|
|
logging.info("Checking for new messages...")
|
|
response = requests.get(f"{REQUEST_URL}/getmessages.php")
|
|
if response.content != '':
|
|
messages = json.loads(response.content)
|
|
if send_messages(messages):
|
|
logging.info("Messages sent successful")
|
|
else:
|
|
logging.error("There was an error sending the messages")
|
|
except Exception as e:
|
|
logging.info(f"Error: {e}")
|
|
|
|
if __name__ == '__main__':
|
|
main()
|