Files
wallamanta/wallamanta/walladb.py
2024-10-07 14:08:01 +02:00

371 lines
14 KiB
Python

import mysql.connector
from mysql.connector import errorcode
import logging
import constants
import helpers
# Configure the root logger once at import time: INFO and above, each record
# rendered as "timestamp - logger name - level - message".
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
def dict_factory(cursor, row):
    """Convert a result row into a dict keyed by column name.

    Args:
        cursor: Object exposing ``description``, a sequence whose items
            carry the column name at index 0.
        row: Indexable sequence of values, positionally aligned with
            ``cursor.description``.

    Returns:
        A dict mapping each column name to the value at the same position.
    """
    return {
        column[0]: row[position]
        for position, column in enumerate(cursor.description)
    }
def connect_db():
    """Open and return a new MySQL connection.

    Host, user, password and database name are read from the ``constants``
    module on every call.

    Returns:
        The connection object produced by ``mysql.connector.connect``.
    """
    settings = {
        'host': constants.DB_HOST,
        'user': constants.DB_USER,
        'password': constants.DB_PASSWORD,
        'database': constants.DB_NAME,
    }
    return mysql.connector.connect(**settings)
def setup_db():
    """Create the ``users`` and ``products`` tables if they are missing.

    Each table is created with its own ``CREATE TABLE`` statement. A table
    that already exists is reported at INFO level; any other MySQL error is
    reported at ERROR level (with the table name) and the loop continues
    with the next table.

    ``users.type`` and ``users.until`` are nullable because ``enable_user``
    registers new users with NULL in both columns; a NOT NULL constraint
    would make that INSERT fail.
    """
    # NOTE(review): `products.modified_at` is NOT NULL without a default and
    # `add_product` never supplies it -- whether that INSERT succeeds depends
    # on the server's timestamp-default settings; confirm on the target server.
    tables = {
        'users': (
            "CREATE TABLE `users` ("
            " `telegram_user_id` bigint NOT NULL,"
            " `active` boolean NOT NULL,"
            " `type` varchar(50) NULL,"
            " `until` date NULL,"
            " `telegram_name` varchar(255) NOT NULL,"
            " `created_at` timestamp DEFAULT CURRENT_TIMESTAMP,"
            " PRIMARY KEY (`telegram_user_id`)"
            ") ENGINE=InnoDB"
        ),
        'products': (
            "CREATE TABLE `products` ("
            " `id` int AUTO_INCREMENT PRIMARY KEY,"
            " `product_name` varchar(255) NOT NULL,"
            " `distance` int NOT NULL,"
            " `latitude` varchar(20) NOT NULL,"
            " `longitude` varchar(20) NOT NULL,"
            " `condition` varchar(20) NULL,"
            " `min_price` int NOT NULL,"
            " `max_price` int NOT NULL,"
            " `category` varchar(255) NULL,"
            " `subcategory` varchar(255) NULL,"
            " `title_exclude` text NULL,"
            " `title_description_exclude` text NULL,"
            " `telegram_user_id` bigint NOT NULL,"
            " `modified_at` timestamp NOT NULL,"
            " `created_at` timestamp DEFAULT CURRENT_TIMESTAMP"
            ") ENGINE=InnoDB"
        ),
    }
    with connect_db() as con:
        with con.cursor(prepared=True) as cur:
            for table_name, ddl in tables.items():
                logging.info(f"Creating table {table_name}: ")
                try:
                    cur.execute(ddl)
                except mysql.connector.Error as err:
                    if err.errno == errorcode.ER_TABLE_EXISTS_ERROR:
                        logging.info("already exists.")
                    else:
                        # A real failure: surface it as an error and say
                        # which table it belongs to.
                        logging.error(f"Failed to create table {table_name}: {err.msg}")
                else:
                    logging.info("OK")
def is_user_active(telegram_user_id):
    """Tell whether ``telegram_user_id`` belongs to an active user.

    Negative ids are rejected up front without touching the database.

    Args:
        telegram_user_id: Numeric Telegram id to look up in ``users``.

    Returns:
        True when a row with that id and ``active=True`` exists; False
        otherwise, including when reading the result fails.
    """
    if telegram_user_id < 0:
        return False
    with connect_db() as con:
        with con.cursor(prepared=True) as cur:
            cur.execute(
                "SELECT * FROM users WHERE telegram_user_id=%s AND active=True",
                (telegram_user_id,),
            )
            try:
                return cur.fetchone() is not None
            except Exception as e:
                # Fail closed, but leave a trace instead of hiding the error.
                logging.error(f"Couldn't check whether user {telegram_user_id} is active: {e}")
                return False
def is_user_expired(telegram_user_id):
    """Tell whether the user's ``until`` date has passed.

    Args:
        telegram_user_id: Telegram id of the user to check.

    Returns:
        True when the user is unknown, when the row could not be read, or
        when ``helpers.is_date_expired`` reports the stored date as expired;
        False otherwise.
    """
    with connect_db() as con:
        with con.cursor(prepared=True) as cur:
            cur.execute(
                "SELECT until FROM users WHERE telegram_user_id=%s",
                (telegram_user_id,),
            )
            try:
                row = cur.fetchone()
            except Exception as e:
                logging.error(f"Couldn't read expiry date of user {telegram_user_id}: {e}")
                row = None
    if row is None:
        # Unknown users are treated as expired.
        return True
    # NOTE(review): row[0] is passed through unchanged, so a NULL `until`
    # reaches the helper as None -- confirm the helper accepts that.
    return bool(helpers.is_date_expired(row[0]))
def is_user_premium(telegram_user_id):
    """Tell whether the user is active and of type ``'premium'``.

    Args:
        telegram_user_id: Telegram id of the user to check.

    Returns:
        True when a matching active row exists; False otherwise, including
        when reading the result fails.
    """
    with connect_db() as con:
        with con.cursor(prepared=True) as cur:
            cur.execute(
                "SELECT * FROM users WHERE telegram_user_id=%s AND active=True AND type='premium'",
                (telegram_user_id,),
            )
            try:
                return cur.fetchone() is not None
            except Exception as e:
                # Fail closed, but log the cause instead of swallowing it.
                logging.error(f"Couldn't check whether user {telegram_user_id} is premium: {e}")
                return False
def is_user_homelabs(telegram_user_id):
    """Tell whether the user is active and of type ``'homelabs'``.

    Args:
        telegram_user_id: Telegram id of the user to check.

    Returns:
        True when a matching active row exists; False otherwise, including
        when reading the result fails.
    """
    with connect_db() as con:
        with con.cursor(prepared=True) as cur:
            cur.execute(
                "SELECT * FROM users WHERE telegram_user_id=%s AND active=True AND type='homelabs'",
                (telegram_user_id,),
            )
            try:
                return cur.fetchone() is not None
            except Exception as e:
                # Fail closed, but log the cause instead of swallowing it.
                logging.error(f"Couldn't check whether user {telegram_user_id} is homelabs: {e}")
                return False
def is_user_testing(telegram_user_id):
    """Tell whether the user is active and of type ``'testing'``.

    Args:
        telegram_user_id: Telegram id of the user to check.

    Returns:
        True when a matching active row exists; False otherwise, including
        when reading the result fails.
    """
    with connect_db() as con:
        with con.cursor(prepared=True) as cur:
            cur.execute(
                "SELECT * FROM users WHERE telegram_user_id=%s AND active=True AND type='testing'",
                (telegram_user_id,),
            )
            try:
                return cur.fetchone() is not None
            except Exception as e:
                # Fail closed, but log the cause instead of swallowing it.
                logging.error(f"Couldn't check whether user {telegram_user_id} is testing: {e}")
                return False
def enable_user(telegram_user_id, telegram_name):
    """Register a user in ``users`` unless a row for the id already exists.

    A new user is stored inactive, with NULL ``type`` and ``until``.

    Args:
        telegram_user_id: Telegram id used as the primary key.
        telegram_name: Object whose ``first_name`` attribute is stored as
            the user's ``telegram_name``.

    Returns:
        True if the user was already present; False if the user was newly
        inserted or if the operation failed (the failure is logged).
    """
    found = False
    with connect_db() as con:
        with con.cursor(prepared=True) as cur:
            cur.execute(
                "SELECT * FROM users WHERE telegram_user_id=%s",
                (telegram_user_id,),
            )
            try:
                # execute() returns None, so existence has to be decided by
                # fetching the row, not by inspecting execute()'s result.
                if cur.fetchone() is None:
                    params = (telegram_user_id, False, None, None, telegram_name.first_name)
                    # Name the columns: the table also has `created_at`, so a
                    # bare VALUES list with five items does not match it.
                    # NOTE(review): this stores NULL in `type` and `until`;
                    # the table must allow NULL there -- confirm the schema.
                    cur.execute(
                        "INSERT INTO users "
                        "(`telegram_user_id`, `active`, `type`, `until`, `telegram_name`) "
                        "VALUES (%s, %s, %s, %s, %s)",
                        params,
                    )
                    con.commit()
                    logging.info(f"Enabled user {telegram_user_id} - {telegram_name.first_name}")
                else:
                    found = True
                    logging.info(f"User {telegram_user_id} - {telegram_name.first_name} was already enabled")
            except Exception as e:
                logging.error(f"Couldn't enable user {telegram_user_id}: {e}")
    return found
def add_premium_user(telegram_user_id, until):
    """Activate an existing user as ``'premium'`` until the given date.

    Args:
        telegram_user_id: Telegram id of the user to upgrade.
        until: Value stored in the user's ``until`` column.

    Returns:
        True if the user existed and was updated, False if no such user
        exists.
    """
    found = False
    with connect_db() as con:
        with con.cursor(prepared=True) as cur:
            # Bind the id as a parameter instead of formatting it into SQL.
            cur.execute(
                "SELECT * FROM users WHERE telegram_user_id=%s",
                (telegram_user_id,),
            )
            # execute() returns None; the row has to be fetched to learn
            # whether the user exists.
            if cur.fetchone() is not None:
                cur.execute(
                    "UPDATE users SET active = True, type = 'premium', until = %s WHERE telegram_user_id=%s",
                    (until, telegram_user_id),
                )
                con.commit()
                found = True
                logging.info(f"Added premium user {telegram_user_id} until {until}")
    return found
def add_test_user(telegram_user_id, until):
    """Activate an existing user as ``'testing'`` until the given date.

    Args:
        telegram_user_id: Telegram id of the user to update.
        until: Value stored in the user's ``until`` column.

    Returns:
        True if the user existed and was updated, False if no such user
        exists.
    """
    found = False
    with connect_db() as con:
        with con.cursor(prepared=True) as cur:
            # Bind the id as a parameter instead of formatting it into SQL.
            cur.execute(
                "SELECT * FROM users WHERE telegram_user_id=%s",
                (telegram_user_id,),
            )
            # execute() returns None; the row has to be fetched to learn
            # whether the user exists.
            if cur.fetchone() is not None:
                cur.execute(
                    "UPDATE users SET active = True, type = 'testing', until = %s WHERE telegram_user_id=%s",
                    (until, telegram_user_id),
                )
                con.commit()
                found = True
                logging.info(f"Added testing user {telegram_user_id} until {until}")
    return found
def remove_valid_user(telegram_user_id):
    """Mark an existing user as inactive.

    Does nothing when no row exists for the id.

    Args:
        telegram_user_id: Telegram id of the user to de-activate.
    """
    params = (telegram_user_id,)
    with connect_db() as con:
        with con.cursor(prepared=True) as cur:
            # Bind the id as a parameter instead of formatting it into SQL.
            cur.execute("SELECT * FROM users WHERE telegram_user_id=%s", params)
            # execute() returns None; the row has to be fetched to learn
            # whether the user exists.
            if cur.fetchone() is not None:
                cur.execute("UPDATE users SET active = False WHERE telegram_user_id=%s", params)
                con.commit()
                logging.info(f"De-activated user {telegram_user_id}")
def get_code(code):
    """Fetch the ``codes`` row whose ``code_id`` equals ``code``.

    Args:
        code: Value matched against the ``code_id`` column.

    Returns:
        The first matching row as produced by the dictionary cursor's
        ``fetchone()``.
    """
    with connect_db() as con, con.cursor(dictionary=True, prepared=True) as cur:
        cur.execute("SELECT * FROM codes WHERE code_id=%s", (code,))
        return cur.fetchone()
def use_code(code):
    """Mark a code as used if it exists in the ``codes`` table.

    Does nothing when no row matches.

    Args:
        code: Value matched against the ``code_id`` column.
    """
    params = (code,)
    with connect_db() as con:
        with con.cursor(prepared=True) as cur:
            cur.execute("SELECT * FROM codes WHERE code_id=%s", params)
            # execute() returns None, so rows must be read from the cursor.
            # fetchall() drains the result set so the cursor is free for the
            # UPDATE that follows.
            if cur.fetchall():
                cur.execute("UPDATE codes SET used = True WHERE code_id=%s", params)
                con.commit()
def get_user_list():
    """Return every row of the ``users`` table.

    Returns:
        The result of ``fetchall()`` on a dictionary cursor.
    """
    with connect_db() as con, con.cursor(dictionary=True, prepared=True) as cur:
        cur.execute("SELECT * FROM users")
        return cur.fetchall()
def get_active_user_list():
    """Return every row of ``users`` whose ``active`` flag is set.

    Returns:
        The result of ``fetchall()`` on a dictionary cursor.
    """
    with connect_db() as con, con.cursor(dictionary=True, prepared=True) as cur:
        cur.execute("SELECT * FROM users WHERE active=True")
        return cur.fetchall()
def get_user(telegram_user_id):
    """Return the stored ``telegram_name`` for a user.

    Args:
        telegram_user_id: Telegram id of the user to look up.

    Returns:
        The user's ``telegram_name``, or the string ``'NoName'`` when the
        lookup raises (the failure is logged as an error).
    """
    lookup = "SELECT telegram_name FROM users WHERE telegram_user_id=%s"
    with connect_db() as con, con.cursor(dictionary=True, prepared=True) as cur:
        cur.execute(lookup, (telegram_user_id,))
        try:
            record = cur.fetchone()
            return record['telegram_name']
        except Exception as e:
            logging.error(f"Couldn't find username with id {telegram_user_id}: {e}")
            return 'NoName'
def get_user_type(telegram_user_id):
    """Return the ``type`` column of a user.

    Args:
        telegram_user_id: Telegram id of the user to look up.

    Returns:
        The stored type, or None when the user does not exist or the row
        could not be read.
    """
    with connect_db() as con:
        with con.cursor(prepared=True) as cur:
            cur.execute(
                "SELECT type FROM users WHERE telegram_user_id=%s",
                (telegram_user_id,),
            )
            try:
                row = cur.fetchone()
            except Exception as e:
                logging.error(f"Couldn't read type of user {telegram_user_id}: {e}")
                row = None
    # "No such user" is a normal outcome, handled explicitly rather than
    # through an exception.
    return row[0] if row is not None else None
def get_user_until(telegram_user_id):
    """Return the ``until`` column of a user.

    Args:
        telegram_user_id: Telegram id of the user to look up.

    Returns:
        The stored value, or None when the user does not exist or the row
        could not be read.
    """
    with connect_db() as con:
        with con.cursor(prepared=True) as cur:
            cur.execute(
                "SELECT until FROM users WHERE telegram_user_id=%s",
                (telegram_user_id,),
            )
            try:
                row = cur.fetchone()
            except Exception as e:
                logging.error(f"Couldn't read expiry date of user {telegram_user_id}: {e}")
                row = None
    # "No such user" is a normal outcome, handled explicitly rather than
    # through an exception.
    return row[0] if row is not None else None
def deactivate_user(telegram_user_id):
    """Clear the ``active`` flag of a user and log the change.

    Args:
        telegram_user_id: Telegram id of the user to de-activate.
    """
    statement = "UPDATE users SET active=False WHERE telegram_user_id=%s"
    with connect_db() as con, con.cursor(prepared=True) as cur:
        cur.execute(statement, (telegram_user_id,))
        con.commit()
        user_name = get_user(telegram_user_id)
        logging.info(f"De-activated user {user_name} ({telegram_user_id})")
def activate_user(telegram_user_id):
    """Set the ``active`` flag of a user and log the change.

    Args:
        telegram_user_id: Telegram id of the user to activate.
    """
    statement = "UPDATE users SET active=True WHERE telegram_user_id=%s"
    with connect_db() as con, con.cursor(prepared=True) as cur:
        cur.execute(statement, (telegram_user_id,))
        con.commit()
        user_name = get_user(telegram_user_id)
        logging.info(f"Activated user {user_name} ({telegram_user_id})")
def get_product(product):
    """Fetch the stored product matching a user and product name.

    Args:
        product: Mapping with ``'product_name'`` (lower-cased before the
            lookup) and ``'telegram_user_id'``.

    Returns:
        The first matching row from the dictionary cursor, or None when
        nothing matches or the row could not be read.
    """
    product_name = product.get('product_name').lower()
    telegram_user_id = product.get('telegram_user_id')
    with connect_db() as con:
        with con.cursor(dictionary=True, prepared=True) as cur:
            cur.execute(
                "SELECT * FROM products WHERE telegram_user_id=%s AND product_name=%s",
                (telegram_user_id, product_name),
            )
            try:
                return cur.fetchone()
            except Exception as e:
                logging.error(f"Couldn't read product {product_name} of user {telegram_user_id}: {e}")
                return None
def get_products_from_user(telegram_user_id):
    """Return all products stored for one user.

    Args:
        telegram_user_id: Telegram id whose products are requested.

    Returns:
        The rows from the dictionary cursor's ``fetchall()``, or None when
        the rows could not be read (the failure is logged).
    """
    with connect_db() as con:
        with con.cursor(dictionary=True, prepared=True) as cur:
            cur.execute(
                "SELECT * FROM products WHERE telegram_user_id=%s",
                (telegram_user_id,),
            )
            try:
                return cur.fetchall()
            except Exception as e:
                logging.error(f"Couldn't read products of user {telegram_user_id}: {e}")
                return None
def get_all_products():
    """Return every row of the ``products`` table.

    Returns:
        The rows from the dictionary cursor's ``fetchall()``, or None when
        the rows could not be read (the failure is logged).
    """
    with connect_db() as con:
        with con.cursor(dictionary=True, prepared=True) as cur:
            cur.execute("SELECT * FROM products")
            try:
                return cur.fetchall()
            except Exception as e:
                logging.error(f"Couldn't read products: {e}")
                return None
def get_all_valid_products():
    """Return the products that belong to active users.

    Returns:
        The rows from the dictionary cursor's ``fetchall()``, or None when
        the rows could not be read (the failure is logged).
    """
    with connect_db() as con:
        with con.cursor(dictionary=True, prepared=True) as cur:
            cur.execute("SELECT products.* FROM products WHERE products.telegram_user_id IN (SELECT users.telegram_user_id FROM users WHERE active = 1)")
            try:
                return cur.fetchall()
            except Exception as e:
                logging.error(f"Couldn't read products of active users: {e}")
                return None
def add_product(product):
    """Insert a product search into the ``products`` table.

    The product name is lower-cased, a distance above 4000 is replaced by
    4000, a category of ``'0'`` is stored as an empty string, and the
    condition is always stored as ``'all'``. Latitude and longitude fall
    back to ``constants.LATITUDE`` / ``constants.LONGITUDE``.

    Args:
        product: Mapping describing the search; ``'product_name'`` must be
            present, the remaining keys are read with ``.get``.
    """
    name = product.get('product_name').lower()

    radius = product.get('distance', 0)
    radius = 4000 if int(radius) > 4000 else radius

    lat = product.get('latitude', constants.LATITUDE)
    lon = product.get('longitude', constants.LONGITUDE)
    lowest = product.get('min_price')
    highest = product.get('max_price')
    excluded_title = product.get('title_exclude', '')
    excluded_title_description = product.get('title_description_exclude', '')
    sub = product.get('subcategory', '')
    cat = product.get('category', '')
    if cat == '0':
        cat = ''
    owner = product.get('telegram_user_id')

    logging.info(f"Trying to add: {name}, {owner}")

    statement = (
        "INSERT INTO `products` (`product_name`, `distance`, `latitude`, `longitude`, "
        "`condition`, `min_price`, `max_price`, `category`, `subcategory`, "
        "`title_exclude`, `title_description_exclude`, `telegram_user_id`) "
        "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
    )
    values = (
        name, radius, lat, lon, 'all', lowest, highest, cat, sub,
        excluded_title, excluded_title_description, owner,
    )
    with connect_db() as con, con.cursor(prepared=True) as cur:
        cur.execute(statement, values)
        con.commit()
        logging.info(f"{name} added for {owner}")
def remove_product(product):
    """Delete a user's product if it is currently stored.

    Args:
        product: Mapping with ``'telegram_user_id'`` and ``'product_name'``
            (lower-cased before matching).

    Returns:
        True when ``get_product`` found the product and the DELETE was
        committed, False when the product was not found.
    """
    owner = product.get('telegram_user_id')
    name = product.get('product_name').lower()
    if not get_product(product):
        return False
    with connect_db() as con, con.cursor(prepared=True) as cur:
        cur.execute(
            "DELETE FROM products WHERE telegram_user_id=%s AND product_name=%s",
            (owner, name),
        )
        con.commit()
        logging.info(f"Removed product {product['product_name']}")
    return True
def count_user_products(telegram_user_id):
    """Count the products stored for one user.

    Args:
        telegram_user_id: Telegram id whose products are counted.

    Returns:
        The value of ``Count(*)``, or None when the result could not be
        read (the failure is logged).
    """
    with connect_db() as con:
        with con.cursor(prepared=True) as cur:
            cur.execute(
                "SELECT Count(*) FROM products WHERE telegram_user_id=%s",
                (telegram_user_id,),
            )
            try:
                row = cur.fetchone()
            except Exception as e:
                logging.error(f"Couldn't count products of user {telegram_user_id}: {e}")
                row = None
    return row[0] if row is not None else None