diff --git a/database.py b/database.py index 8b2757b8..e6727840 100644 --- a/database.py +++ b/database.py @@ -1,6 +1,6 @@ import json from datetime import datetime -from typing import Any +from typing import Union, List, Optional, Dict, Any import asyncpg import pytz @@ -9,14 +9,19 @@ from logger import logger -async def create_temporary_data(session, tg_id: int, state: str, data: dict): - """Сохраняет временные данные пользователя.""" +# --------------------- Temporary Data --------------------- # + + +async def create_temporary_data(session: asyncpg.Connection, tg_id: int, state: str, data: dict) -> None: + """ + Сохраняет временные данные пользователя. + """ await session.execute( """ INSERT INTO temporary_data (tg_id, state, data, updated_at) VALUES ($1, $2, $3, $4) ON CONFLICT (tg_id) - DO UPDATE SET state = $2, data = $3, updated_at = $4 + DO UPDATE SET state = EXCLUDED.state, data = EXCLUDED.data, updated_at = EXCLUDED.updated_at """, tg_id, state, @@ -25,27 +30,41 @@ async def create_temporary_data(session, tg_id: int, state: str, data: dict): ) -async def get_temporary_data(session, tg_id: int) -> dict | None: - """Извлекает временные данные пользователя.""" +async def get_temporary_data(session: asyncpg.Connection, tg_id: int) -> Optional[dict]: + """ + Извлекает временные данные пользователя. + """ result = await session.fetchrow("SELECT state, data FROM temporary_data WHERE tg_id = $1", tg_id) if result: return {"state": result["state"], "data": json.loads(result["data"])} return None -async def clear_temporary_data(session, tg_id: int): - # TODO rename delete_temporary_data +# TODO delete_temporary_data + + +async def clear_temporary_data(session: asyncpg.Connection, tg_id: int) -> None: + """ + Удаляет временные данные пользователя. + """ await session.execute("DELETE FROM temporary_data WHERE tg_id = $1", tg_id) -async def create_blocked_user(tg_id: int, conn: asyncpg.Connection): +# --------------------- Blocked Users --------------------- # + + +async def create_blocked_user(tg_id: int, conn: asyncpg.Connection) -> None: await conn.execute( - "INSERT INTO blocked_users (tg_id) VALUES ($1) ON CONFLICT (tg_id) DO NOTHING", + """ + INSERT INTO blocked_users (tg_id) + VALUES ($1) + ON CONFLICT (tg_id) DO NOTHING + """, tg_id, ) -async def delete_blocked_user(tg_id: int | list[int], conn: asyncpg.Connection): +async def delete_blocked_user(tg_id: Union[int, List[int]], conn: asyncpg.Connection) -> None: """ Удаляет пользователя или список пользователей из списка заблокированных. @@ -53,13 +72,25 @@ async def delete_blocked_user(tg_id: int | list[int], conn: asyncpg.Connection): :param conn: Подключение к базе данных """ if isinstance(tg_id, list): - await conn.execute("DELETE FROM blocked_users WHERE tg_id = ANY($1)", tg_id) + await conn.execute( + "DELETE FROM blocked_users WHERE tg_id = ANY($1)", + tg_id, + ) else: - await conn.execute("DELETE FROM blocked_users WHERE tg_id = $1", tg_id) + await conn.execute( + "DELETE FROM blocked_users WHERE tg_id = $1", + tg_id, + ) -async def init_db(file_path: str = "assets/schema.sql"): - with open(file_path) as file: +# --------------------- DB Initialization --------------------- # + + +async def init_db(file_path: str = "assets/schema.sql") -> None: + """ + Инициализирует базу данных, выполняя SQL-скрипты из указанного файла. + """ + with open(file_path, "r", encoding="utf-8") as file: sql_content = file.read() statements = [stmt.strip() for stmt in sql_content.split(";") if stmt.strip()] @@ -75,36 +106,40 @@ async def init_db(file_path: str = "assets/schema.sql"): await conn.close() -async def check_unique_server_name(server_name: str, session: Any, cluster_name: str | None = None) -> bool: +# --------------------- Servers --------------------- # + + +async def check_unique_server_name( + server_name: str, session: asyncpg.Connection, cluster_name: Optional[str] = None +) -> bool: """ Проверяет уникальность имени сервера. - - :param server_name: Имя сервера. - :param session: Сессия базы данных. - :param cluster_name: Имя кластера (опционально). - :return: True, если имя сервера уникально, False, если уже существует. """ if cluster_name: result = await session.fetchrow( - "SELECT 1 FROM servers WHERE server_name = $1 AND cluster_name = $2 LIMIT 1", server_name, cluster_name + """ + SELECT 1 FROM servers + WHERE server_name = $1 AND cluster_name = $2 + LIMIT 1 + """, + server_name, + cluster_name, ) else: - result = await session.fetchrow("SELECT 1 FROM servers WHERE server_name = $1 LIMIT 1", server_name) - + result = await session.fetchrow( + """ + SELECT 1 FROM servers + WHERE server_name = $1 + LIMIT 1 + """, + server_name, + ) return result is None -async def check_server_name_by_cluster(server_name: str, session: Any) -> dict | None: +async def check_server_name_by_cluster(server_name: str, session: asyncpg.Connection) -> Optional[dict]: """ Проверяет принадлежность сервера к кластеру. - - Args: - server_name (str): Имя сервера для проверки - session (Any): Сессия базы данных - - Returns: - dict | None: Словарь с информацией о кластере или None, если сервер не найден - - cluster_name (str): Название кластера """ try: cluster_info = await session.fetchrow( @@ -125,28 +160,115 @@ async def check_server_name_by_cluster(server_name: str, session: Any) -> dict | raise -async def create_coupon(coupon_code: str, amount: float, usage_limit: int, session: Any): +async def create_server( + cluster_name: str, + server_name: str, + api_url: str, + subscription_url: str, + inbound_id: int, + session: asyncpg.Connection, +) -> None: """ - Создает новый купон в базе данных. + Добавляет новый сервер в базу данных. + """ + try: + await session.execute( + """ + INSERT INTO servers (cluster_name, server_name, api_url, subscription_url, inbound_id) + VALUES ($1, $2, $3, $4, $5) + """, + cluster_name, + server_name, + api_url, + subscription_url, + inbound_id, + ) + logger.info(f"Сервер {server_name} успешно добавлен в кластер {cluster_name}") + except Exception as e: + logger.error(f"Ошибка при добавлении сервера {server_name} в кластер {cluster_name}: {e}") + raise + + +async def delete_server(server_name: str, session: asyncpg.Connection) -> None: + """ + Удаляет сервер из базы данных по его названию. + """ + try: + await session.execute( + """ + DELETE FROM servers + WHERE server_name = $1 + """, + server_name, + ) + logger.info(f"Сервер {server_name} успешно удалён из базы данных") + except Exception as e: + logger.error(f"Ошибка при удалении сервера {server_name} из базы данных: {e}") + raise + + +async def get_servers(session: asyncpg.Connection = None) -> dict: + """ + Возвращает словарь вида: + { + "cluster_name1": [ + { + "server_name": ..., + "api_url": ..., + "subscription_url": ..., + "inbound_id": ... + }, + ... + ], + "cluster_name2": [...] + } + """ + conn = session + try: + if conn is None: + conn = await asyncpg.connect(DATABASE_URL) + + result = await conn.fetch( + """ + SELECT cluster_name, server_name, api_url, subscription_url, inbound_id + FROM servers + """ + ) + servers = {} + for row in result: + cluster_name = row["cluster_name"] + if cluster_name not in servers: + servers[cluster_name] = [] + servers[cluster_name].append( + { + "server_name": row["server_name"], + "api_url": row["api_url"], + "subscription_url": row["subscription_url"], + "inbound_id": row["inbound_id"], + } + ) + return servers + except Exception as e: + logger.error(f"Ошибка при получении серверов: {e}") + raise + finally: + if conn is not None and session is None: + await conn.close() - Args: - coupon_code (str): Уникальный код купона. - amount (float): Сумма, которую дает купон. - usage_limit (int): Максимальное количество использований купона. - session (Any): Сессия базы данных для выполнения запроса. - Raises: - Exception: В случае ошибки при создании купона. +# --------------------- Coupons --------------------- # - Example: - await create_coupon('SALE50', 50.0, 5, session) + +async def create_coupon(coupon_code: str, amount: float, usage_limit: int, session: asyncpg.Connection) -> None: + """ + Создает новый купон в базе данных. """ try: await session.execute( """ INSERT INTO coupons (code, amount, usage_limit, usage_count, is_used) VALUES ($1, $2, $3, 0, FALSE) - """, + """, coupon_code, amount, usage_limit, @@ -157,31 +279,18 @@ async def create_coupon(coupon_code: str, amount: float, usage_limit: int, sessi raise -async def get_coupon_by_code(coupon_code: str, session: Any) -> dict | None: +async def get_coupon_by_code(coupon_code: str, session: asyncpg.Connection) -> Optional[dict]: """ Получает информацию о купоне по его коду. - - Args: - coupon_code (str): Код купона для поиска - session (Any): Сессия базы данных - - Returns: - dict | None: Словарь с информацией о купоне или None, если купон не найден - - id (int): ID купона - - usage_limit (int): Лимит использований - - usage_count (int): Текущее количество использований - - is_used (bool): Флаг использования - - amount (float): Сумма купона - - Raises: - Exception: В случае ошибки при выполнении запроса """ try: result = await session.fetchrow( """ SELECT id, usage_limit, usage_count, is_used, amount FROM coupons - WHERE code = $1 AND (usage_count < usage_limit OR usage_limit = 0) AND is_used = FALSE + WHERE code = $1 + AND (usage_count < usage_limit OR usage_limit = 0) + AND is_used = FALSE """, coupon_code, ) @@ -191,24 +300,9 @@ async def get_coupon_by_code(coupon_code: str, session: Any) -> dict | None: raise -async def get_all_coupons(session: Any, page: int = 1, per_page: int = 10): +async def get_all_coupons(session: asyncpg.Connection, page: int = 1, per_page: int = 10) -> dict: """ Получает список купонов из базы данных с пагинацией. - - Args: - session (Any): Сессия базы данных для выполнения запроса - page (int): Номер страницы (по умолчанию 1) - per_page (int): Количество купонов на странице (по умолчанию 10) - - Returns: - dict: Словарь с информацией о купонах и пагинации: - - coupons (list): Список словарей с информацией о купонах - - total (int): Общее количество купонов - - pages (int): Общее количество страниц - - current_page (int): Текущая страница - - Raises: - Exception: В случае ошибки при получении данных из базы """ try: offset = (page - 1) * per_page @@ -228,35 +322,27 @@ async def get_all_coupons(session: Any, page: int = 1, per_page: int = 10): logger.info(f"Успешно получено {len(coupons)} купонов из базы данных (страница {page})") - return {"coupons": coupons, "total": total_count, "pages": total_pages, "current_page": page} + return { + "coupons": coupons, + "total": total_count, + "pages": total_pages, + "current_page": page, + } except Exception as e: logger.error(f"Критическая ошибка при получении списка купонов: {e}") logger.exception("Трассировка стека ошибки получения купонов") return {"coupons": [], "total": 0, "pages": 0, "current_page": page} -async def delete_coupon(coupon_code: str, session: Any): +async def delete_coupon(coupon_code: str, session: asyncpg.Connection) -> bool: """ Удаляет купон из базы данных по его коду. - - Args: - coupon_code (str): Код купона для удаления - session (Any): Сессия базы данных для выполнения запроса - - Returns: - bool: True, если купон успешно удален, False если купон не найден или произошла ошибка - - Raises: - Exception: В случае ошибки при выполнении запроса к базе данных - - Example: - result = await delete_coupon('SALE50', session) """ try: coupon_record = await session.fetchrow( """ SELECT id FROM coupons WHERE code = $1 - """, + """, coupon_code, ) @@ -267,10 +353,9 @@ async def delete_coupon(coupon_code: str, session: Any): await session.execute( """ DELETE FROM coupons WHERE code = $1 - """, + """, coupon_code, ) - logger.info(f"Купон {coupon_code} успешно удален из базы данных") return True @@ -279,49 +364,103 @@ async def delete_coupon(coupon_code: str, session: Any): return False -async def update_trial(tg_id: int, status: int, session: Any): +async def create_coupon_usage(coupon_id: int, user_id: int, session: asyncpg.Connection) -> None: """ - Устанавливает статус триального периода для пользователя. + Создаёт запись об использовании купона в базе данных. + """ + try: + await session.execute( + """ + INSERT INTO coupon_usages (coupon_id, user_id, used_at) + VALUES ($1, $2, $3) + """, + coupon_id, + user_id, + datetime.utcnow(), + ) + logger.info(f"Создана запись об использовании купона {coupon_id} пользователем {user_id}") + except Exception as e: + logger.error(f"Ошибка при создании записи об использовании купона {coupon_id} пользователем {user_id}: {e}") + raise + + +async def check_coupon_usage(coupon_id: int, user_id: int, session: asyncpg.Connection) -> bool: + """ + Проверяет, использовал ли пользователь данный купон. + """ + try: + result = await session.fetchrow( + """ + SELECT 1 FROM coupon_usages + WHERE coupon_id = $1 + AND user_id = $2 + """, + coupon_id, + user_id, + ) + return result is not None + except Exception as e: + logger.error(f"Ошибка при проверке использования купона {coupon_id} пользователем {user_id}: {e}") + raise - Args: - tg_id (int): Telegram ID пользователя - status (int): Статус триального периода (0 - доступен, 1 - использован) - session (Any): Сессия базы данных - Returns: - bool: True, если статус успешно установлен, False в случае ошибки +async def update_coupon_usage_count(coupon_id: int, session: asyncpg.Connection) -> None: + """ + Обновляет счетчик использования купона и его статус. """ try: await session.execute( """ - INSERT INTO connections (tg_id, trial) - VALUES ($1, $2) - ON CONFLICT (tg_id) - DO UPDATE SET trial = $2 + UPDATE coupons + SET usage_count = usage_count + 1, + is_used = CASE + WHEN usage_count + 1 >= usage_limit + AND usage_limit > 0 + THEN TRUE + ELSE FALSE + END + WHERE id = $1 """, - tg_id, - status, + coupon_id, ) - status_text = "восстановлен" if status == 0 else "использован" - logger.info(f"Триальный период успешно {status_text} для пользователя {tg_id}") - return True + logger.info(f"Успешно обновлен счетчик использования купона {coupon_id}") except Exception as e: - logger.error(f"Ошибка при установке статуса триального периода для пользователя {tg_id}: {e}") - return False + logger.error(f"Ошибка при обновлении счетчика использования купона {coupon_id}: {e}") + raise -async def add_connection(tg_id: int, balance: float = 0.0, trial: int = 0, session: Any = None): +async def get_coupon_details(coupon_id: str, session: asyncpg.Connection) -> Optional[dict]: """ - Добавляет новое подключение для пользователя в базу данных. + Получает детали купона по его ID (или коду, если используется поле id как varchar). + """ + try: + record = await session.fetchrow( + """ + SELECT id, code, discount, usage_count, usage_limit, is_used + FROM coupons + WHERE id = $1 + """, + coupon_id, + ) + + if record: + logger.info(f"Успешно получены детали купона {coupon_id}") + return dict(record) + + logger.warning(f"Купон {coupon_id} не найден") + return None + + except Exception as e: + logger.error(f"Ошибка при получении деталей купона {coupon_id}: {e}") + raise + - Args: - tg_id (int): Telegram ID пользователя - balance (float, optional): Начальный баланс пользователя. По умолчанию 0.0. - trial (int, optional): Статус триального периода. По умолчанию 0. - session (Any, optional): Сессия базы данных. +# --------------------- Connections (Users + Balances) --------------------- # - Raises: - Exception: Если возникает ошибка при добавлении подключения в базу данных. + +async def add_connection(tg_id: int, balance: float, trial: int, session: asyncpg.Connection) -> None: + """ + Добавляет новое подключение для пользователя в базу данных. """ try: await session.execute( @@ -334,26 +473,19 @@ async def add_connection(tg_id: int, balance: float = 0.0, trial: int = 0, sessi trial, ) logger.info( - f"Успешно добавлено новое подключение для пользователя {tg_id} с балансом {balance} и статусом триала {trial}" + f"Успешно добавлено новое подключение для пользователя {tg_id} " + f"с балансом {balance} и статусом триала {trial}" ) except Exception as e: logger.error(f"Не удалось добавить подключение для пользователя {tg_id}. Причина: {e}") raise -async def check_connection_exists(tg_id: int): +async def check_connection_exists(tg_id: int) -> bool: """ Проверяет существование подключения для указанного пользователя в базе данных. - - Args: - tg_id (int): Telegram ID пользователя для проверки. - - Returns: - bool: True, если подключение существует, иначе False. - - Raises: - Exception: В случае ошибки при подключении к базе данных. """ + conn: asyncpg.Connection = None try: conn = await asyncpg.connect(DATABASE_URL) exists = await conn.fetchval( @@ -374,91 +506,192 @@ async def check_connection_exists(tg_id: int): await conn.close() -async def store_key( - tg_id: int, - client_id: str, - email: str, - expiry_time: int, - key: str, - server_id: str, - session: Any, -): +async def get_balance(tg_id: int, session: asyncpg.Connection = None) -> float: """ - Сохраняет информацию о ключе в базу данных. - - Args: - tg_id (int): Telegram ID пользователя - client_id (str): Уникальный идентификатор клиента - email (str): Электронная почта или имя устройства - expiry_time (int): Время истечения ключа в миллисекундах - key (str): Ключ доступа - server_id (str): Идентификатор сервера - - Raises: - Exception: Если возникает ошибка при сохранении ключа в базу данных + Получает баланс пользователя из базы данных. """ + conn = session try: - await session.execute( + if conn is None: + conn = await asyncpg.connect(DATABASE_URL) + + balance = await conn.fetchval( """ - INSERT INTO keys (tg_id, client_id, email, created_at, expiry_time, key, server_id) - VALUES ($1, $2, $3, $4, $5, $6, $7) + SELECT balance + FROM connections + WHERE tg_id = $1 """, tg_id, - client_id, - email, - int(datetime.utcnow().timestamp() * 1000), - expiry_time, - key, - server_id, ) - logger.info(f"Ключ успешно сохранен для пользователя {tg_id} на сервере {server_id}") + logger.info(f"Получен баланс для пользователя {tg_id}: {balance}") + return round(balance, 1) if balance is not None else 0.0 except Exception as e: - logger.error(f"Ошибка при сохранении ключа для пользователя {tg_id}: {e}") - raise + logger.error(f"Ошибка при получении баланса для пользователя {tg_id}: {e}") + return 0.0 + finally: + if conn is not None and session is None: + await conn.close() -async def get_keys(tg_id: int, session: Any): +async def update_balance(tg_id: int, amount: float, session: asyncpg.Connection = None, is_admin: bool = False) -> None: """ - Получает список ключей для указанного пользователя. + Обновляет баланс пользователя в базе данных. + Кэшбек применяется только для положительных сумм и если пополнение НЕ через админку. + """ + conn = session + try: + if conn is None: + conn = await asyncpg.connect(DATABASE_URL) - Args: - tg_id (int): Telegram ID пользователя + extra = 0 + if not is_admin and amount > 0 and CASHBACK > 0: + extra = amount * (CASHBACK / 100.0) - Returns: - list: Список записей ключей с информацией о клиенте, электронной почте, времени создания и ключе + total_amount = int(amount + extra) # приводим к целому - Raises: - Exception: В случае ошибки при подключении к базе данных или выполнении запроса - """ - try: - records = await session.fetch( + current_balance = await conn.fetchval( """ - SELECT * - FROM keys + SELECT balance + FROM connections WHERE tg_id = $1 """, tg_id, ) - logger.info(f"Успешно получено {len(records)} ключей для пользователя {tg_id}") - return records - except Exception as e: - logger.error(f"Ошибка при получении ключей для пользователя {tg_id}: {e}") - raise + current_balance = current_balance or 0 + new_balance = int(current_balance) + total_amount -async def get_keys_by_server(tg_id: int | None, server_id: str, session: Any): + await conn.execute( + """ + UPDATE connections + SET balance = $1 + WHERE tg_id = $2 + """, + new_balance, + tg_id, + ) + logger.info( + f"Баланс пользователя {tg_id} обновлен. Было: {int(current_balance)}, " + f"пополнение: {amount} " + f"({'+ кешбэк' if extra > 0 else 'без кешбэка'}), стало: {new_balance}" + ) + + if not is_admin and amount > 0: + # Для реферальной логики передаём текущую сессию, чтобы не открывать новую + await handle_referral_on_balance_update(tg_id, int(amount), conn) + + except Exception as e: + logger.error(f"Ошибка при обновлении баланса для пользователя {tg_id}: {e}") + raise + finally: + if conn is not None and session is None: + await conn.close() + + +async def update_trial(tg_id: int, status: int, session: asyncpg.Connection) -> bool: """ - Получает список ключей на определенном сервере. Если tg_id=None, возвращает все ключи на сервере. + Устанавливает статус триального периода для пользователя (0 - доступен, 1 - использован). + """ + try: + await session.execute( + """ + INSERT INTO connections (tg_id, trial) + VALUES ($1, $2) + ON CONFLICT (tg_id) + DO UPDATE SET trial = EXCLUDED.trial + """, + tg_id, + status, + ) + status_text = "восстановлен" if status == 0 else "использован" + logger.info(f"Триальный период успешно {status_text} для пользователя {tg_id}") + return True + except Exception as e: + logger.error(f"Ошибка при установке статуса триального периода для пользователя {tg_id}: {e}") + return False + + +async def get_trial(tg_id: int, session: asyncpg.Connection) -> int: + """ + Получает статус триала (0 - не использован, 1 - использован). + """ + try: + trial = await session.fetchval( + """ + SELECT trial + FROM connections + WHERE tg_id = $1 + """, + tg_id, + ) + logger.info(f"Получен статус триала для пользователя {tg_id}: {trial}") + return trial if trial is not None else 0 + except Exception as e: + logger.error(f"Ошибка при получении статуса триала для пользователя {tg_id}: {e}") + return 0 + + +# --------------------- Keys --------------------- # + + +async def store_key( + tg_id: int, + client_id: str, + email: str, + expiry_time: int, + key: str, + server_id: str, + session: asyncpg.Connection, +) -> None: + """ + Сохраняет информацию о ключе в базу данных. + """ + try: + await session.execute( + """ + INSERT INTO keys ( + tg_id, client_id, email, created_at, + expiry_time, key, server_id + ) + VALUES ($1, $2, $3, $4, $5, $6, $7) + """, + tg_id, + client_id, + email, + int(datetime.utcnow().timestamp() * 1000), + expiry_time, + key, + server_id, + ) + logger.info(f"Ключ успешно сохранен для пользователя {tg_id} на сервере {server_id}") + except Exception as e: + logger.error(f"Ошибка при сохранении ключа для пользователя {tg_id}: {e}") + raise + - Args: - tg_id (int | None): Telegram ID пользователя или None для всех пользователей - server_id (str): Идентификатор сервера +async def get_keys(tg_id: int, session: asyncpg.Connection) -> List[asyncpg.Record]: + """ + Получает список ключей для указанного пользователя. + """ + try: + records = await session.fetch( + """ + SELECT * + FROM keys + WHERE tg_id = $1 + """, + tg_id, + ) + logger.info(f"Успешно получено {len(records)} ключей для пользователя {tg_id}") + return records + except Exception as e: + logger.error(f"Ошибка при получении ключей для пользователя {tg_id}: {e}") + raise - Returns: - list: Список записей ключей с информацией о клиенте, электронной почте, времени создания и ключе - Raises: - Exception: В случае ошибки при подключении к базе данных или выполнении запроса +async def get_keys_by_server(tg_id: Optional[int], server_id: str, session: asyncpg.Connection) -> List[asyncpg.Record]: + """ + Получает список ключей на определенном сервере. Если tg_id=None, возвращает все ключи на сервере. """ try: if tg_id is not None: @@ -466,7 +699,8 @@ async def get_keys_by_server(tg_id: int | None, server_id: str, session: Any): """ SELECT * FROM keys - WHERE tg_id = $1 AND server_id = $2 + WHERE tg_id = $1 + AND server_id = $2 """, tg_id, server_id, @@ -492,7 +726,10 @@ async def get_keys_by_server(tg_id: int | None, server_id: str, session: Any): raise -async def get_key_by_server(tg_id: int, client_id: str, session: Any): +async def get_key_by_server(tg_id: int, client_id: str, session: asyncpg.Connection) -> Optional[asyncpg.Record]: + """ + Возвращает запись по конкретному ключу (tg_id + client_id). + """ query = """ SELECT tg_id, @@ -511,135 +748,125 @@ async def get_key_by_server(tg_id: int, client_id: str, session: Any): return record -async def get_balance(tg_id: int) -> float: +async def get_key_count(tg_id: int) -> int: """ - Получает баланс пользователя из базы данных. - - Args: - tg_id (int): Telegram ID пользователя - - Returns: - float: Баланс пользователя, 0.0 если баланс не найден - - Raises: - Exception: В случае ошибки при подключении к базе данных или выполнении запроса + Получает количество ключей для указанного пользователя. """ - conn = None + conn: asyncpg.Connection = None try: conn = await asyncpg.connect(DATABASE_URL) - balance = await conn.fetchval("SELECT balance FROM connections WHERE tg_id = $1", tg_id) - logger.info(f"Получен баланс для пользователя {tg_id}: {balance}") - return round(balance, 1) if balance is not None else 0.0 + count = await conn.fetchval( + """ + SELECT COUNT(*) + FROM keys + WHERE tg_id = $1 + """, + tg_id, + ) + logger.info(f"Получено количество ключей для пользователя {tg_id}: {count}") + return count if count is not None else 0 except Exception as e: - logger.error(f"Ошибка при получении баланса для пользователя {tg_id}: {e}") - return 0.0 + logger.error(f"Ошибка при получении количества ключей для пользователя {tg_id}: {e}") + return 0 finally: if conn: await conn.close() -async def update_balance(tg_id: int, amount: float, session: Any = None, is_admin: bool = False): +async def update_key_expiry(client_id: str, new_expiry_time: int, session: asyncpg.Connection) -> None: """ - Обновляет баланс пользователя в базе данных. - Кэшбек применяется только для положительных сумм, если пополнение НЕ через админку. - - Args: - tg_id (int): Telegram ID пользователя. - amount (float): Сумма для обновления баланса. - session (Any, optional): Сессия базы данных. Если не передана, создается новая. - is_admin (bool, optional): Флаг, указывающий, что пополнение идёт через админку. По умолчанию False. - Raises: - Exception: В случае ошибки при подключении к базе данных или обновлении баланса. + Обновление времени истечения ключа для указанного клиента. """ - conn = None try: - if session is None: - conn = await asyncpg.connect(DATABASE_URL) - session = conn - - extra = amount * (CASHBACK / 100.0) if (CASHBACK > 0 and amount > 0 and not is_admin) else 0 - total_amount = int(amount + extra) - - current_balance = await session.fetchval("SELECT balance FROM connections WHERE tg_id = $1", tg_id) - - if current_balance is None: - current_balance = 0 - - new_balance = int(current_balance) + total_amount - await session.execute( """ - UPDATE connections - SET balance = $1 - WHERE tg_id = $2 + UPDATE keys + SET expiry_time = $1, notified = FALSE, notified_24h = FALSE + WHERE client_id = $2 """, - new_balance, - tg_id, - ) - logger.info( - f"Баланс пользователя {tg_id} обновлен. Было: {int(current_balance)}, пополнение: {amount} " - f"({'+ кешбэк' if extra > 0 else 'без кешбэка'}), стало: {new_balance}" + new_expiry_time, + client_id, ) - - if not is_admin: - await handle_referral_on_balance_update(tg_id, int(amount)) - + logger.info(f"Успешно обновлено время истечения ключа для клиента {client_id}") except Exception as e: - logger.error(f"Ошибка при обновлении баланса для пользователя {tg_id}: {e}") + logger.error(f"Ошибка при обновлении времени истечения ключа для клиента {client_id}: {e}") raise - finally: - if conn is not None: - await conn.close() -async def get_trial(tg_id: int, session: Any) -> int: +async def delete_key(identifier: Union[int, str], session: asyncpg.Connection) -> None: """ - Получает статус триала для пользователя из базы данных. - - Args: - tg_id (int): Telegram ID пользователя - session (Any): Сессия базы данных - - Returns: - int: Статус триала (0 - не использован, 1 - использован) + Удаляет ключ из базы данных по client_id (str) или tg_id (int). """ try: - trial = await session.fetchval("SELECT trial FROM connections WHERE tg_id = $1", tg_id) - logger.info(f"Получен статус триала для пользователя {tg_id}: {trial}") - return trial if trial is not None else 0 - except Exception as e: - logger.error(f"Ошибка при получении статуса триала для пользователя {tg_id}: {e}") - return 0 + # Определяем, что передано: tg_id (число) или client_id (строка). + identifier_str = str(identifier) + if identifier_str.isdigit(): + query = "DELETE FROM keys WHERE tg_id = $1" + else: + query = "DELETE FROM keys WHERE client_id = $1" + await session.execute(query, identifier) + logger.info(f"Ключ с идентификатором {identifier} успешно удалён") + except Exception as e: + logger.error(f"Ошибка при удалении ключа с идентификатором {identifier} из базы данных: {e}") -async def get_key_count(tg_id: int) -> int: - """ - Получает количество ключей для указанного пользователя. - Args: - tg_id (int): Telegram ID пользователя +# --------------------- Gifts (Опционально) --------------------- # - Returns: - int: Количество ключей пользователя, 0 если ключей нет - Raises: - Exception: В случае ошибки при подключении к базе данных +async def store_gift_link( + gift_id: str, + sender_tg_id: int, + selected_months: int, + expiry_time: datetime, + gift_link: str, + session: asyncpg.Connection = None, +) -> bool: + """ + Добавляет информацию о подарке в базу данных. """ - conn = None + conn = session try: - conn = await asyncpg.connect(DATABASE_URL) - count = await conn.fetchval("SELECT COUNT(*) FROM keys WHERE tg_id = $1", tg_id) - logger.info(f"Получено количество ключей для пользователя {tg_id}: {count}") - return count if count is not None else 0 + if conn is None: + conn = await asyncpg.connect(DATABASE_URL) + + result = await conn.execute( + """ + INSERT INTO gifts ( + gift_id, sender_tg_id, recipient_tg_id, + selected_months, expiry_time, gift_link, + created_at, is_used + ) + VALUES ($1, $2, NULL, $3, $4, $5, $6, FALSE) + """, + gift_id, + sender_tg_id, + selected_months, + expiry_time, + gift_link, + datetime.utcnow(), + ) + if result: + logger.info(f"Подарок с ID {gift_id} успешно добавлен в базу данных.") + return True + else: + logger.error(f"Не удалось добавить подарок с ID {gift_id} в базу данных.") + return False except Exception as e: - logger.error(f"Ошибка при получении количества ключей для пользователя {tg_id}: {e}") - return 0 + logger.error(f"Ошибка при сохранении подарка с ID {gift_id} в базе данных: {e}") + return False finally: - if conn: + if conn is not None and session is None: await conn.close() -async def add_referral(referred_tg_id: int, referrer_tg_id: int, session: Any): +# --------------------- Referrals --------------------- # + + +async def add_referral(referred_tg_id: int, referrer_tg_id: int, session: asyncpg.Connection) -> None: + """ + Добавляет новую реферальную связь. + """ try: if referred_tg_id == referrer_tg_id: logger.warning(f"Пользователь {referred_tg_id} попытался использовать свою собственную реферальную ссылку.") @@ -659,20 +886,15 @@ async def add_referral(referred_tg_id: int, referrer_tg_id: int, session: Any): raise -async def handle_referral_on_balance_update(tg_id: int, amount: float): +async def handle_referral_on_balance_update(tg_id: int, amount: float, session: asyncpg.Connection = None) -> None: """ Обработка многоуровневой реферальной системы при обновлении баланса пользователя. - - Метод анализирует цепочку рефералов для указанного пользователя и начисляет - бонусы реферерам на разных уровнях согласно настроенным процентам. - - Args: - tg_id (int): Идентификатор Telegram пользователя, пополнившего баланс - amount (float): Сумма пополнения баланса """ - conn = None + conn = session try: - conn = await asyncpg.connect(DATABASE_URL) + if conn is None: + conn = await asyncpg.connect(DATABASE_URL) + logger.info(f"Начало обработки реферальной системы для пользователя {tg_id}") MAX_REFERRAL_LEVELS = len(REFERRAL_BONUS_PERCENTAGES.keys()) @@ -684,6 +906,7 @@ async def handle_referral_on_balance_update(tg_id: int, amount: float): current_tg_id = tg_id referral_chain = [] + # Формируем цепочку рефералов for level in range(1, MAX_REFERRAL_LEVELS + 1): if current_tg_id in visited_tg_ids: logger.warning(f"Обнаружен цикл в реферальной цепочке для пользователя {current_tg_id}. Прекращение.") @@ -705,7 +928,6 @@ async def handle_referral_on_balance_update(tg_id: int, amount: float): break referrer_tg_id = referral["referrer_tg_id"] - if referrer_tg_id in visited_tg_ids: logger.warning(f"Реферер {referrer_tg_id} уже обработан. Пропуск.") break @@ -713,6 +935,7 @@ async def handle_referral_on_balance_update(tg_id: int, amount: float): referral_chain.append({"tg_id": referrer_tg_id, "level": level}) current_tg_id = referrer_tg_id + # Начисляем бонусы for referral in referral_chain: referrer_tg_id = referral["tg_id"] level = referral["level"] @@ -723,29 +946,56 @@ async def handle_referral_on_balance_update(tg_id: int, amount: float): continue bonus = round(amount * bonus_percent, 2) - if bonus > 0: - logger.info(f"Начисление бонуса {bonus} рублей рефереру {referrer_tg_id} на уровне {level}.") - await update_balance(referrer_tg_id, bonus) + logger.info(f"Начисление бонуса {bonus} рефереру {referrer_tg_id} на уровне {level}.") + await update_balance(referrer_tg_id, bonus, conn, is_admin=True) except Exception as e: logger.error(f"Ошибка при обработке многоуровневой реферальной системы для {tg_id}: {e}") finally: - if conn: + if conn is not None and session is None: await conn.close() -async def get_referral_stats(referrer_tg_id: int): - conn = None +async def get_referral_by_referred_id(referred_tg_id: int, session: asyncpg.Connection) -> Optional[dict]: + """ + Получает информацию о реферале по ID приглашенного пользователя. + """ try: - conn = await asyncpg.connect(DATABASE_URL) - logger.info( - f"Установлено подключение к базе данных для получения статистики рефералов пользователя {referrer_tg_id}" + record = await session.fetchrow( + """ + SELECT * + FROM referrals + WHERE referred_tg_id = $1 + """, + referred_tg_id, ) + if record: + logger.info(f"Успешно получена информация о реферале для пользователя {referred_tg_id}") + return dict(record) + + logger.info(f"Реферал для пользователя {referred_tg_id} не найден") + return None + + except Exception as e: + logger.error(f"Ошибка при получении информации о реферале для пользователя {referred_tg_id}: {e}") + raise + + +async def get_referral_stats(referrer_tg_id: int) -> dict: + """ + Получение расширенной статистики по рефералам для заданного пользователя. + """ + conn: asyncpg.Connection = None + try: + conn = await asyncpg.connect(DATABASE_URL) + logger.info(f"Подключение для получения статистики рефералов пользователя {referrer_tg_id}") total_referrals = await conn.fetchval( """ - SELECT COUNT(*) FROM referrals WHERE referrer_tg_id = $1 + SELECT COUNT(*) + FROM referrals + WHERE referrer_tg_id = $1 """, referrer_tg_id, ) @@ -753,7 +1003,10 @@ async def get_referral_stats(referrer_tg_id: int): active_referrals = await conn.fetchval( """ - SELECT COUNT(*) FROM referrals WHERE referrer_tg_id = $1 AND reward_issued = TRUE + SELECT COUNT(*) + FROM referrals + WHERE referrer_tg_id = $1 + AND reward_issued = TRUE """, referrer_tg_id, ) @@ -761,6 +1014,7 @@ async def get_referral_stats(referrer_tg_id: int): MAX_REFERRAL_LEVELS = len(REFERRAL_BONUS_PERCENTAGES.keys()) + # Подсчёт количества рефералов по уровням referrals_by_level_records = await conn.fetch( f""" WITH RECURSIVE referral_levels AS ( @@ -795,6 +1049,11 @@ async def get_referral_stats(referrer_tg_id: int): } logger.debug(f"Получена статистика рефералов по уровням: {referrals_by_level}") + # Подсчёт суммы бонусов, заработанных с рефералов + case_lines = [] + for lvl, perc in REFERRAL_BONUS_PERCENTAGES.items(): + case_lines.append(f"WHEN rl.level = {lvl} THEN {perc}") + total_referral_bonus = await conn.fetchval( f""" WITH RECURSIVE referral_levels AS ( @@ -812,18 +1071,25 @@ async def get_referral_stats(referrer_tg_id: int): r.referrer_tg_id, rl.level + 1 FROM referrals r - JOIN referral_levels rl ON r.referrer_tg_id = rl.referred_tg_id + JOIN referral_levels rl + ON r.referrer_tg_id = rl.referred_tg_id WHERE rl.level < {MAX_REFERRAL_LEVELS} ) - SELECT - COALESCE(SUM(p.amount * ( - CASE - {" ".join([f"WHEN rl.level = {level} THEN {REFERRAL_BONUS_PERCENTAGES[level]}" for level in REFERRAL_BONUS_PERCENTAGES])} - ELSE 0 - END)), 0) AS total_bonus + SELECT COALESCE( + SUM( + p.amount * ( + CASE + {" ".join(case_lines)} + ELSE 0 + END + ) + ), + 0 + ) AS total_bonus FROM referral_levels rl JOIN payments p ON rl.referred_tg_id = p.tg_id - WHERE p.status = 'success' AND rl.level <= {MAX_REFERRAL_LEVELS} + WHERE p.status = 'success' + AND rl.level <= {MAX_REFERRAL_LEVELS} """, referrer_tg_id, ) @@ -846,178 +1112,151 @@ async def get_referral_stats(referrer_tg_id: int): logger.info("Закрытие подключения к базе данных") -async def update_key_expiry(client_id: str, new_expiry_time: int, session: Any): - """ - Обновление времени истечения ключа для указанного клиента. +# --------------------- Payments --------------------- # - Args: - client_id (str): Уникальный идентификатор клиента - new_expiry_time (int): Новое время истечения ключа - session (Any): Сессия подключения к базе данных - Raises: - Exception: В случае ошибки при подключении к базе данных или обновлении ключа +async def add_payment(tg_id: int, amount: float, payment_system: str) -> None: """ - try: - await session.execute( - """ - UPDATE keys - SET expiry_time = $1, notified = FALSE, notified_24h = FALSE - WHERE client_id = $2 - """, - new_expiry_time, - client_id, - ) - logger.info(f"Успешно обновлено время истечения ключа для клиента {client_id}") - - except Exception as e: - logger.error(f"Ошибка при обновлении времени истечения ключа для клиента {client_id}: {e}") - raise - - -async def add_balance_to_client(client_id: str, amount: float): - """ - Добавление баланса клиенту по его идентификатору Telegram. - - Args: - client_id (str): Идентификатор клиента в Telegram - amount (float): Сумма для пополнения баланса - - Raises: - Exception: В случае ошибки при подключении к базе данных или обновлении баланса + Добавляет информацию о платеже в базу данных. """ - conn = None + conn: asyncpg.Connection = None try: conn = await asyncpg.connect(DATABASE_URL) - logger.info(f"Установлено подключение к базе данных для пополнения баланса клиента {client_id}") + logger.info(f"Подключение к базе для добавления платежа пользователя {tg_id}") await conn.execute( """ - UPDATE connections - SET balance = balance + $1 - WHERE tg_id = $2 + INSERT INTO payments (tg_id, amount, payment_system, status) + VALUES ($1, $2, $3, 'success') """, + tg_id, amount, - client_id, + payment_system, ) - logger.info(f"Успешно пополнен баланс клиента {client_id} на сумму {amount}") - + logger.info(f"Успешно добавлен платеж для пользователя {tg_id} на сумму {amount}") except Exception as e: - logger.error(f"Ошибка при пополнении баланса для клиента {client_id}: {e}") + logger.error(f"Ошибка при добавлении платежа для пользователя {tg_id}: {e}") raise finally: if conn: await conn.close() - logger.info("Закрытие подключения к базе данных") + logger.info("Закрытие подключения к базе данных после добавления платежа") -async def get_client_id_by_email(email: str): +async def get_last_payments(tg_id: int, session: asyncpg.Connection) -> List[asyncpg.Record]: """ - Получение идентификатора клиента по электронной почте. - - Args: - email (str): Электронная почта клиента - - Returns: - str: Идентификатор клиента или None, если клиент не найден - - Raises: - Exception: В случае ошибки при подключении к базе данных или выполнении запроса + Получает последние 3 платежа пользователя. """ - conn = None try: - conn = await asyncpg.connect(DATABASE_URL) - logger.info(f"Установлено подключение к базе данных для поиска client_id по email: {email}") - - client_id = await conn.fetchval( + records = await session.fetch( """ - SELECT client_id FROM keys WHERE email = $1 - """, - email, + SELECT amount, payment_system, status, created_at + FROM payments + WHERE tg_id = $1 + ORDER BY created_at DESC + LIMIT 3 + """, + tg_id, ) + logger.info(f"Успешно получены последние платежи для пользователя {tg_id}") + return records + except Exception as e: + logger.error(f"Ошибка при получении последних платежей для пользователя {tg_id}: {e}") + raise - if client_id: - logger.info(f"Найден client_id для email: {email}") - else: - logger.warning(f"Не найден client_id для email: {email}") - return client_id +# --------------------- Notifications --------------------- # + +async def add_notification(tg_id: int, notification_type: str, session: asyncpg.Connection) -> None: + """ + Добавляет запись о notification в базу данных. + """ + try: + await session.execute( + """ + INSERT INTO notifications (tg_id, notification_type) + VALUES ($1, $2) + ON CONFLICT (tg_id, notification_type) + DO UPDATE SET last_notification_time = NOW() + """, + tg_id, + notification_type, + ) + logger.info(f"Успешно добавлено уведомление типа {notification_type} для пользователя {tg_id}") except Exception as e: - logger.error(f"Ошибка при получении client_id для email {email}: {e}") + logger.error(f"Ошибка при добавлении notification для пользователя {tg_id}: {e}") raise - finally: - if conn: - await conn.close() - logger.info("Закрытие подключения к базе данных") -async def get_tg_id_by_client_id(client_id: str): +async def check_notification_time( + tg_id: int, notification_type: str, hours: int = 12, session: asyncpg.Connection = None +) -> bool: """ - Получение Telegram ID по идентификатору клиента. - - Args: - client_id (str): Идентификатор клиента - - Returns: - int или None: Telegram ID клиента, если найден, иначе None - - Raises: - Exception: В случае ошибки при подключении к базе данных или выполнении запроса + Проверяет, прошло ли указанное количество часов с момента последнего уведомления. """ - conn = None + conn = session try: - conn = await asyncpg.connect(DATABASE_URL) - logger.info(f"Установлено подключение к базе данных для поиска Telegram ID по client_id: {client_id}") + if conn is None: + conn = await asyncpg.connect(DATABASE_URL) - result = await conn.fetchrow("SELECT tg_id FROM keys WHERE client_id = $1", client_id) + result = await conn.fetchval( + """ + SELECT + CASE + WHEN MAX(last_notification_time) IS NULL THEN TRUE + WHEN NOW() - MAX(last_notification_time) > ($1 * INTERVAL '1 hour') THEN TRUE + ELSE FALSE + END AS can_notify + FROM notifications + WHERE tg_id = $2 + AND notification_type = $3 + """, + hours, + tg_id, + notification_type, + ) - if result: - logger.info(f"Найден Telegram ID для client_id: {client_id}") - return result["tg_id"] - else: - logger.warning(f"Не найден Telegram ID для client_id: {client_id}") - return None + can_notify = result if result is not None else True + logger.info( + f"Проверка уведомления типа {notification_type} для пользователя {tg_id}: " + f"{'можно отправить' if can_notify else 'слишком рано'}" + ) + return can_notify except Exception as e: - logger.error(f"Ошибка при получении Telegram ID для client_id {client_id}: {e}") - raise + logger.error(f"Ошибка при проверке времени уведомления для пользователя {tg_id}: {e}") + return False finally: - if conn: + if conn is not None and session is None: await conn.close() - logger.info("Закрытие подключения к базе данных") + + +# --------------------- Users --------------------- # async def upsert_user( tg_id: int, - username: str = None, - first_name: str = None, - last_name: str = None, - language_code: str = None, + username: Optional[str] = None, + first_name: Optional[str] = None, + last_name: Optional[str] = None, + language_code: Optional[str] = None, is_bot: bool = False, -): +) -> None: """ Обновляет или вставляет информацию о пользователе в базу данных. - - Args: - tg_id (int): Идентификатор пользователя в Telegram - username (str, optional): Имя пользователя в Telegram - first_name (str, optional): Имя пользователя - last_name (str, optional): Фамилия пользователя - language_code (str, optional): Код языка пользователя - is_bot (bool, optional): Флаг, указывающий является ли пользователь ботом - - Raises: - Exception: В случае ошибки при работе с базой данных """ - conn = None + conn: asyncpg.Connection = None try: conn = await asyncpg.connect(DATABASE_URL) logger.info(f"Установлено подключение к базе данных для обновления пользователя {tg_id}") await conn.execute( """ - INSERT INTO users (tg_id, username, first_name, last_name, language_code, is_bot, created_at, updated_at) + INSERT INTO users ( + tg_id, username, first_name, last_name, + language_code, is_bot, created_at, updated_at + ) VALUES ($1, $2, $3, $4, $5, $6, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) ON CONFLICT (tg_id) DO UPDATE SET @@ -1045,242 +1284,156 @@ async def upsert_user( logger.info("Закрытие подключения к базе данных") -async def add_payment(tg_id: int, amount: float, payment_system: str): - """ - Добавляет информацию о платеже в базу данных. +# --------------------- Misc --------------------- # - Args: - tg_id (int): Идентификатор пользователя в Telegram - amount (float): Сумма платежа - payment_system (str): Система оплаты - Raises: - Exception: В случае ошибки при добавлении платежа +async def add_balance_to_client(client_id: str, amount: float) -> None: """ - conn = None + Добавление баланса клиенту по его идентификатору Telegram. + """ + conn: asyncpg.Connection = None try: conn = await asyncpg.connect(DATABASE_URL) - logger.info(f"Установлено подключение к базе данных для добавления платежа пользователя {tg_id}") + logger.info(f"Установлено подключение к базе данных для пополнения баланса клиента {client_id}") await conn.execute( """ - INSERT INTO payments (tg_id, amount, payment_system, status) - VALUES ($1, $2, $3, 'success') + UPDATE connections + SET balance = balance + $1 + WHERE tg_id = $2 """, - tg_id, amount, - payment_system, + client_id, ) - logger.info(f"Успешно добавлен платеж для пользователя {tg_id} на сумму {amount}") + logger.info(f"Успешно пополнен баланс клиента {client_id} на сумму {amount}") except Exception as e: - logger.error(f"Ошибка при добавлении платежа для пользователя {tg_id}: {e}") + logger.error(f"Ошибка при пополнении баланса для клиента {client_id}: {e}") raise finally: if conn: await conn.close() - logger.info("Закрытие подключения к базе данных после добавления платежа") + logger.info("Закрытие подключения к базе данных") -async def add_notification(tg_id: int, notification_type: str, session: Any): +async def get_client_id_by_email(email: str) -> Optional[str]: """ - Добавляет запись о notification в базу данных. - - Args: - tg_id (int): Идентификатор пользователя в Telegram - notification_type (str): Тип уведомления - session (Any): Сессия базы данных для выполнения запроса - - Raises: - Exception: В случае ошибки при добавлении notification + Получение идентификатора клиента по электронной почте. """ + conn: asyncpg.Connection = None try: - await session.execute( + conn = await asyncpg.connect(DATABASE_URL) + logger.info(f"Подключение к БД для поиска client_id по email: {email}") + + client_id = await conn.fetchval( """ - INSERT INTO notifications (tg_id, notification_type) - VALUES ($1, $2) - ON CONFLICT (tg_id, notification_type) - DO UPDATE SET last_notification_time = NOW() + SELECT client_id + FROM keys + WHERE email = $1 """, - tg_id, - notification_type, + email, ) - logger.info(f"Успешно добавлено уведомление типа {notification_type} для пользователя {tg_id}") + + if client_id: + logger.info(f"Найден client_id для email: {email}") + else: + logger.warning(f"Не найден client_id для email: {email}") + return client_id + except Exception as e: - logger.error(f"Ошибка при добавлении notification для пользователя {tg_id}: {e}") + logger.error(f"Ошибка при получении client_id для email {email}: {e}") raise + finally: + if conn: + await conn.close() -async def check_notification_time(tg_id: int, notification_type: str, hours: int = 12, session: Any = None) -> bool: +async def get_tg_id_by_client_id(client_id: str) -> Optional[int]: """ - Проверяет, прошло ли указанное количество часов с момента последнего уведомления. - - Args: - tg_id (int): Идентификатор пользователя в Telegram - notification_type (str): Тип уведомления - hours (int, optional): Количество часов для проверки. По умолчанию 12. - session (Any): Сессия базы данных для выполнения запроса - - Returns: - bool: True, если с момента последнего уведомления прошло больше указанного времени, иначе False - - Raises: - Exception: В случае ошибки при проверке времени уведомления + Получение Telegram ID по идентификатору клиента. """ - conn = None + conn: asyncpg.Connection = None try: - conn = session if session is not None else await asyncpg.connect(DATABASE_URL) + conn = await asyncpg.connect(DATABASE_URL) + logger.info(f"Подключение к БД для поиска Telegram ID по client_id: {client_id}") - result = await conn.fetchval( + result = await conn.fetchrow( """ - SELECT - CASE - WHEN MAX(last_notification_time) IS NULL THEN TRUE - WHEN NOW() - MAX(last_notification_time) > ($1 * INTERVAL '1 hour') THEN TRUE - ELSE FALSE - END AS can_notify - FROM notifications - WHERE tg_id = $2 AND notification_type = $3 + SELECT tg_id + FROM keys + WHERE client_id = $1 """, - hours, - tg_id, - notification_type, - ) - - can_notify = result if result is not None else True - - logger.info( - f"Проверка уведомления типа {notification_type} для пользователя {tg_id}: {'можно отправить' if can_notify else 'слишком рано'}" + client_id, ) - return can_notify - + if result: + logger.info(f"Найден Telegram ID для client_id: {client_id}") + return result["tg_id"] + else: + logger.warning(f"Не найден Telegram ID для client_id: {client_id}") + return None except Exception as e: - logger.error(f"Ошибка при проверке времени уведомления для пользователя {tg_id}: {e}") - return False - + logger.error(f"Ошибка при получении Telegram ID для client_id {client_id}: {e}") + raise finally: - if conn is not None and session is None: + if conn: await conn.close() -async def get_servers(session: Any = None): - conn = None +async def delete_user_data(session: asyncpg.Connection, tg_id: int) -> None: + """ + Удаляет информацию о пользователе из связанных таблиц. + """ try: - conn = session if session is not None else await asyncpg.connect(DATABASE_URL) - - result = await conn.fetch( - """ - SELECT cluster_name, server_name, api_url, subscription_url, inbound_id - FROM servers - """ - ) - servers = {} - for row in result: - cluster_name = row["cluster_name"] - if cluster_name not in servers: - servers[cluster_name] = [] - - servers[cluster_name].append( - { - "server_name": row["server_name"], - "api_url": row["api_url"], - "subscription_url": row["subscription_url"], - "inbound_id": row["inbound_id"], - } + # Удаляем подарки (если таблица gifts существует) + try: + await session.execute( + """ + DELETE FROM gifts + WHERE sender_tg_id = $1 + OR recipient_tg_id = $1 + """, + tg_id, ) + except Exception as e: + logger.warning(f"Версия без подарков или другая ошибка при удалении gifts для {tg_id}: {e}") - return servers - - finally: - if conn is not None and session is None: - await conn.close() - + await session.execute("DELETE FROM payments WHERE tg_id = $1", tg_id) + await session.execute("DELETE FROM users WHERE tg_id = $1", tg_id) + await session.execute("DELETE FROM connections WHERE tg_id = $1", tg_id) + await delete_key(tg_id, session) + await session.execute("DELETE FROM referrals WHERE referrer_tg_id = $1", tg_id) -async def delete_user_data(session: Any, tg_id: int): - try: - await session.execute("DELETE FROM gifts WHERE sender_tg_id = $1 OR recipient_tg_id = $1", tg_id) + logger.info(f"Все данные пользователя {tg_id} успешно удалены.") except Exception as e: - logger.warning(f"У Вас версия без подарков для {tg_id}: {e}") - await session.execute("DELETE FROM payments WHERE tg_id = $1", tg_id) - await session.execute("DELETE FROM users WHERE tg_id = $1", tg_id) - await session.execute("DELETE FROM connections WHERE tg_id = $1", tg_id) - await delete_key(tg_id, session) - await session.execute("DELETE FROM referrals WHERE referrer_tg_id = $1", tg_id) + logger.error(f"Ошибка при удалении данных пользователя {tg_id}: {e}") + raise -async def store_gift_link( - gift_id: str, - sender_tg_id: int, - selected_months: int, - expiry_time: datetime, - gift_link: str, - session: Any = None, -): +async def get_key_details(email: str, session: asyncpg.Connection) -> Optional[dict]: """ - Добавляет информацию о подарке в базу данных. - - Args: - gift_id (str): Уникальный идентификатор подарка - sender_tg_id (int): Идентификатор пользователя, который отправил подарок - selected_months (int): Количество месяцев подписки - expiry_time (datetime): Время окончания подписки - gift_link (str): Ссылка для активации подарка - session (Any): Сессия базы данных для выполнения запроса - - Returns: - bool: True, если информация о подарке успешно добавлена, иначе False - - Raises: - Exception: В случае ошибки при сохранении информации о подарке + Возвращает подробную информацию по ключу (и состоянию пользователя) по email. """ - conn = None - try: - conn = session if session is not None else await asyncpg.connect(DATABASE_URL) - - result = await conn.execute( - """ - INSERT INTO gifts (gift_id, sender_tg_id, recipient_tg_id, selected_months, expiry_time, gift_link, created_at, is_used) - VALUES ($1, $2, NULL, $3, $4, $5, $6, FALSE) - """, - gift_id, - sender_tg_id, - selected_months, - expiry_time, - gift_link, - datetime.utcnow(), - ) - - if result: - logger.info(f"Подарок с ID {gift_id} успешно добавлен в базу данных.") - return True - else: - logger.error(f"Не удалось добавить подарок с ID {gift_id} в базу данных.") - return False - except Exception as e: - logger.error(f"Ошибка при сохранении подарка с ID {gift_id} в базе данных: {e}") - return False - - finally: - if conn is not None and session is None: - await conn.close() - - -async def get_key_details(email, session): record = await session.fetchrow( """ - SELECT k.server_id, k.key, k.email, k.expiry_time, k.client_id, k.created_at, c.tg_id, c.balance + SELECT + k.server_id, + k.key, + k.email, + k.expiry_time, + k.client_id, + k.created_at, + c.tg_id, + c.balance FROM keys k JOIN connections c ON k.tg_id = c.tg_id WHERE k.email = $1 """, email, ) - if not record: return None cluster_name = record["server_id"] - moscow_tz = pytz.timezone("Europe/Moscow") expiry_date = datetime.fromtimestamp(record["expiry_time"] / 1000, tz=moscow_tz) current_date = datetime.now(moscow_tz) @@ -1309,293 +1462,14 @@ async def get_key_details(email, session): } -async def delete_key(identifier, session): - """ - Удаляет ключ из базы данных по client_id или tg_id - - Args: - identifier (int | str): client_id или tg_id для удаления - session: Сессия базы данных - - Raises: - Exception: В случае ошибки при удалении ключа - """ - try: - identifier_str = str(identifier) - - if identifier_str.isdigit(): - query = "DELETE FROM keys WHERE tg_id = $1" - else: - query = "DELETE FROM keys WHERE client_id = $1" - - await session.execute(query, identifier) - logger.info(f"Ключ с идентификатором {identifier} успешно удалён") - except Exception as e: - logger.error(f"Ошибка при удалении ключа с идентификатором {identifier} из базы данных: {e}") - - -async def create_server( - cluster_name: str, server_name: str, api_url: str, subscription_url: str, inbound_id: int, session: Any -): - """ - Добавляет новый сервер в базу данных. - - Args: - cluster_name (str): Название кластера - server_name (str): Название сервера - api_url (str): URL API сервера - subscription_url (str): URL подписки - inbound_id (int): ID входящего подключения - session (Any): Сессия базы данных - - Raises: - Exception: В случае ошибки при добавлении сервера - """ - try: - await session.execute( - """ - INSERT INTO servers (cluster_name, server_name, api_url, subscription_url, inbound_id) - VALUES ($1, $2, $3, $4, $5) - """, - cluster_name, - server_name, - api_url, - subscription_url, - inbound_id, - ) - logger.info(f"Сервер {server_name} успешно добавлен в кластер {cluster_name}") - except Exception as e: - logger.error(f"Ошибка при добавлении сервера {server_name} в кластер {cluster_name}: {e}") - raise - - -async def delete_server(server_name: str, session: Any): - """ - Удаляет сервер из базы данных по его названию. - - Args: - server_name (str): Название сервера для удаления - session (Any): Сессия базы данных - - Raises: - Exception: В случае ошибки при удалении сервера - """ - try: - await session.execute( - """ - DELETE FROM servers WHERE server_name = $1 - """, - server_name, - ) - logger.info(f"Сервер {server_name} успешно удалён из базы данных") - except Exception as e: - logger.error(f"Ошибка при удалении сервера {server_name} из базы данных: {e}") - raise - - -async def create_coupon_usage(coupon_id: int, user_id: int, session: Any): - """ - Создаёт запись об использовании купона в базе данных. - - Args: - coupon_id (int): ID купона - user_id (int): ID пользователя - session (Any): Сессия базы данных - - Raises: - Exception: В случае ошибки при создании записи - """ - try: - await session.execute( - """ - INSERT INTO coupon_usages (coupon_id, user_id, used_at) - VALUES ($1, $2, $3) - """, - coupon_id, - user_id, - datetime.utcnow(), - ) - logger.info(f"Создана запись об использовании купона {coupon_id} пользователем {user_id}") - except Exception as e: - logger.error(f"Ошибка при создании записи об использовании купона {coupon_id} пользователем {user_id}: {e}") - raise - - -async def check_coupon_usage(coupon_id: int, user_id: int, session: Any) -> bool: - """ - Проверяет, использовал ли пользователь данный купон. - - Args: - coupon_id (int): ID купона для проверки - user_id (int): ID пользователя для проверки - session (Any): Сессия базы данных - - Returns: - bool: True если купон уже использован, False если нет - - Raises: - Exception: В случае ошибки при выполнении запроса - """ - try: - result = await session.fetchrow( - """ - SELECT 1 FROM coupon_usages WHERE coupon_id = $1 AND user_id = $2 - """, - coupon_id, - user_id, - ) - return result is not None - except Exception as e: - logger.error(f"Ошибка при проверке использования купона {coupon_id} пользователем {user_id}: {e}") - raise - - -async def update_coupon_usage_count(coupon_id: int, session: Any): - """ - Обновляет счетчик использования купона и его статус. - - Args: - coupon_id (int): ID купона для обновления - session (Any): Сессия базы данных - - Raises: - Exception: В случае ошибки при обновлении данных купона - """ - try: - await session.execute( - """ - UPDATE coupons - SET usage_count = usage_count + 1, - is_used = CASE WHEN usage_count + 1 >= usage_limit AND usage_limit > 0 THEN TRUE ELSE FALSE END - WHERE id = $1 - """, - coupon_id, - ) - logger.info(f"Успешно обновлен счетчик использования купона {coupon_id}") - except Exception as e: - logger.error(f"Ошибка при обновлении счетчика использования купона {coupon_id}: {e}") - raise - - -async def get_last_payments(tg_id: int, session: Any): - """ - Получает последние 3 платежа пользователя. - - Args: - tg_id (int): Telegram ID пользователя - session (Any): Сессия базы данных - - Returns: - list: Список последних платежей пользователя - - Raises: - Exception: В случае ошибки при выполнении запроса - """ - try: - records = await session.fetch( - """ - SELECT amount, payment_system, status, created_at - FROM payments - WHERE tg_id = $1 - ORDER BY created_at DESC - LIMIT 3 - """, - tg_id, - ) - logger.info(f"Успешно получены последние платежи для пользователя {tg_id}") - return records - except Exception as e: - logger.error(f"Ошибка при получении последних платежей для пользователя {tg_id}: {e}") - raise - - -async def get_coupon_details(coupon_id: str, session: Any): - """ - Получает детали купона по его ID. - - Args: - coupon_id (str): ID купона - session (Any): Сессия базы данных - - Returns: - dict: Словарь с деталями купона или None если купон не найден - - Raises: - Exception: В случае ошибки при выполнении запроса - """ - try: - record = await session.fetchrow( - """ - SELECT id, code, discount, usage_count, usage_limit, is_used - FROM coupons - WHERE id = $1 - """, - coupon_id, - ) - - if record: - logger.info(f"Успешно получены детали купона {coupon_id}") - return dict(record) - - logger.warning(f"Купон {coupon_id} не найден") - return None - - except Exception as e: - logger.error(f"Ошибка при получении деталей купона {coupon_id}: {e}") - raise - - -async def get_referral_by_referred_id(referred_tg_id: int, session: Any): - """ - Получает информацию о реферале по ID приглашенного пользователя. - - Args: - referred_tg_id (int): ID приглашенного пользователя - session (Any): Сессия базы данных - - Returns: - dict: Словарь с информацией о реферале или None если не найден - - Raises: - Exception: В случае ошибки при выполнении запроса - """ - try: - record = await session.fetchrow( - """ - SELECT * FROM referrals - WHERE referred_tg_id = $1 - """, - referred_tg_id, - ) - - if record: - logger.info(f"Успешно получена информация о реферале для пользователя {referred_tg_id}") - return dict(record) - - logger.info(f"Реферал для пользователя {referred_tg_id} не найден") - return None - - except Exception as e: - logger.error(f"Ошибка при получении информации о реферале для пользователя {referred_tg_id}: {e}") - raise - - -async def get_all_keys(session: Any = None): +async def get_all_keys(session: asyncpg.Connection = None) -> List[asyncpg.Record]: """ Получает все записи из таблицы keys. - - Args: - session (Any, optional): Сессия базы данных. По умолчанию None. - - Returns: - list: Список всех записей из таблицы keys - - Raises: - Exception: В случае ошибки при выполнении запроса """ - conn = None + conn = session try: - conn = session if session is not None else await asyncpg.connect(DATABASE_URL) + if conn is None: + conn = await asyncpg.connect(DATABASE_URL) keys = await conn.fetch("SELECT * FROM keys") logger.info(f"Успешно получены все записи из таблицы keys. Количество: {len(keys)}") return keys diff --git a/handlers/notifications.py b/handlers/notifications.py index 6f53c5d8..a013d92d 100644 --- a/handlers/notifications.py +++ b/handlers/notifications.py @@ -40,8 +40,6 @@ from handlers.texts import KEY_EXPIRY_10H, KEY_EXPIRY_24H, KEY_RENEWED from logger import logger -from .utils import format_time_until_deletion - router = Router() @@ -64,6 +62,11 @@ async def periodic_expired_keys_check(bot: Bot): async def notify_expiring_keys(bot: Bot): + """ + Запускается периодически (раз в час или сколько у вас стоит) + для уведомлений за 24ч, 10ч, а также для «пробных» пользователей, + которые ещё не активировали ключ. + """ conn = None try: conn = await asyncpg.connect(DATABASE_URL) @@ -90,16 +93,19 @@ async def notify_expiring_keys(bot: Bot): logger.info("Соединение с базой данных закрыто.") -async def notify_10h_keys( - bot: Bot, - conn: asyncpg.Connection, - current_time: float, - threshold_time_10h: float, -): +async def notify_10h_keys(bot: Bot, conn: asyncpg.Connection, current_time: float, threshold_time_10h: float): + """ + Выбираем все ключи, у которых срок <= threshold_time_10h (то есть меньше 10 часов осталось), + но > current_time (ещё не истекли полностью), + и при этом notified = FALSE (ещё не уведомлялись за 10ч). + """ records = await conn.fetch( """ - SELECT tg_id, email, expiry_time, client_id, server_id FROM keys - WHERE expiry_time <= $1 AND expiry_time > $2 AND notified = FALSE + SELECT tg_id, email, expiry_time, client_id, server_id + FROM keys + WHERE expiry_time <= $1 + AND expiry_time > $2 + AND notified = FALSE """, threshold_time_10h, current_time, @@ -114,23 +120,26 @@ async def notify_10h_keys( async def process_10h_record(record, bot, conn): + """ + Логика уведомления, если осталось ~10ч. + Если AUTO_RENEW_KEYS включён и хватает баланса — продлеваем автоматически. + Если нет — просто шлём уведомление. + """ tg_id = record["tg_id"] email = record["email"] expiry_time = record["expiry_time"] + client_id = record["client_id"] moscow_tz = pytz.timezone("Europe/Moscow") - expiry_date = datetime.fromtimestamp(expiry_time / 1000, tz=moscow_tz) current_date = datetime.now(moscow_tz) time_left = expiry_date - current_date - days_left_message = ( - "Ключ истек" - if time_left.total_seconds() <= 0 - else f"{time_left.days}" - if time_left.days > 0 - else f"{time_left.seconds // 3600}" - ) + # Простой вывод, сколько осталось часов или дней + if time_left.total_seconds() <= 0: + days_left_message = "Ключ истек" + else: + days_left_message = f"{time_left.days}" if time_left.days > 0 else f"{time_left.seconds // 3600}" message = KEY_EXPIRY_10H.format( email=email, @@ -143,17 +152,34 @@ async def process_10h_record(record, bot, conn): if AUTO_RENEW_KEYS and balance >= RENEWAL_PLANS["1"]["price"]: try: + # Списываем баланс и продлеваем await update_balance(tg_id, -RENEWAL_PLANS["1"]["price"], conn) new_expiry_time = int((datetime.utcnow() + timedelta(days=30)).timestamp() * 1000) - await update_key_expiry(record["client_id"], new_expiry_time, conn) + # Обновляем expiry в БД + await update_key_expiry(client_id, new_expiry_time, conn) + + # Продлеваем на всех кластерах servers = await get_servers(conn) for cluster_id in servers: - await renew_key_in_cluster(cluster_id, email, record["client_id"], new_expiry_time, TOTAL_GB) + await renew_key_in_cluster(cluster_id, email, client_id, new_expiry_time, TOTAL_GB) logger.info(f"Ключ для пользователя {tg_id} успешно продлен в кластере {cluster_id}.") - await conn.execute("UPDATE keys SET notified = TRUE WHERE client_id = $1", record["client_id"]) + # После УСПЕШНОГО продления сбрасываем оба флага уведомлений, + # чтобы через ~24ч и 10ч до НОВОГО истечения пользователь получил уведомления заново. + await conn.execute( + """ + UPDATE keys + SET notified = FALSE, + notified_24h = FALSE, + expiry_time = $2 + WHERE client_id = $1 + """, + client_id, + new_expiry_time, + ) + # Шлём уведомление об успешном продлении image_path = os.path.join("img", "notify_10h.jpg") keyboard = types.InlineKeyboardMarkup( inline_keyboard=[[types.InlineKeyboardButton(text="👤 Личный кабинет", callback_data="profile")]] @@ -174,23 +200,36 @@ async def process_10h_record(record, bot, conn): logger.info(f"Уведомление об успешном продлении отправлено клиенту {tg_id}.") except Exception as e: - logger.error(f"Ошибка при продлении подписки для клиента {tg_id}: {e}") + logger.error(f"Ошибка при автопродлении подписки (10h) для клиента {tg_id}: {e}") else: - await send_renewal_notification(bot, tg_id, email, message, conn, record["client_id"], "notified") + # Если автопродления нет или не хватает баланса, отправляем уведомление + # и ПРИНУДИТЕЛЬНО ставим notified = TRUE (даже если бот заблокирован), + # чтобы не спамить каждый час. + await send_renewal_notification( + bot=bot, + tg_id=tg_id, + email=email, + message=message, + conn=conn, + client_id=client_id, + flag="notified", # уведомление за 10 часов + image_name="notify_10h.jpg", # чтобы отличать картинки для 10h и 24h + ) -async def notify_24h_keys( - bot: Bot, - conn: asyncpg.Connection, - current_time: float, - threshold_time_24h: float, -): - logger.info("Проверка истекших ключей...") +async def notify_24h_keys(bot: Bot, conn: asyncpg.Connection, current_time: float, threshold_time_24h: float): + """ + Аналогично notify_10h_keys, но за 24 часа. + """ + logger.info("Проверка для уведомлений за 24 часа...") records_24h = await conn.fetch( """ - SELECT tg_id, email, expiry_time, client_id, server_id FROM keys - WHERE expiry_time <= $1 AND expiry_time > $2 AND notified_24h = FALSE + SELECT tg_id, email, expiry_time, client_id, server_id + FROM keys + WHERE expiry_time <= $1 + AND expiry_time > $2 + AND notified_24h = FALSE """, threshold_time_24h, current_time, @@ -215,13 +254,10 @@ async def process_24h_record(record, bot, conn): current_date = datetime.now(moscow_tz) time_left = expiry_date - current_date - days_left_message = ( - "Ключ истек" - if time_left.total_seconds() <= 0 - else f"{time_left.days}" - if time_left.days > 0 - else f"{time_left.seconds // 3600}" - ) + if time_left.total_seconds() <= 0: + days_left_message = "Ключ истек" + else: + days_left_message = f"{time_left.days}" if time_left.days > 0 else f"{time_left.seconds // 3600}" message_24h = KEY_EXPIRY_24H.format( email=email, @@ -233,18 +269,31 @@ async def process_24h_record(record, bot, conn): if AUTO_RENEW_KEYS and balance >= RENEWAL_PLANS["1"]["price"]: try: + # Автопродление await update_balance(tg_id, -RENEWAL_PLANS["1"]["price"], conn) new_expiry_time = int((datetime.utcnow() + timedelta(days=30)).timestamp() * 1000) - await update_key_expiry(record["client_id"], new_expiry_time, conn) + + await update_key_expiry(client_id, new_expiry_time, conn) servers = await get_servers(conn) for cluster_id in servers: - await renew_key_in_cluster(cluster_id, email, record["client_id"], new_expiry_time, TOTAL_GB) + await renew_key_in_cluster(cluster_id, email, client_id, new_expiry_time, TOTAL_GB) logger.info(f"Ключ для пользователя {tg_id} успешно продлен в кластере {cluster_id}.") - except Exception as e: - logger.error(f"Ошибка при отправке уведомления пользователю {tg_id}: {e}") + # Сбрасываем уведомления к новому сроку + await conn.execute( + """ + UPDATE keys + SET notified = FALSE, + notified_24h = FALSE, + expiry_time = $2 + WHERE client_id = $1 + """, + client_id, + new_expiry_time, + ) + # Отправляем сообщение об успешном продлении image_path = os.path.join("img", "notify_24h.jpg") keyboard = types.InlineKeyboardMarkup( inline_keyboard=[[types.InlineKeyboardButton(text="👤 Личный кабинет", callback_data="profile")]] @@ -262,57 +311,95 @@ async def process_24h_record(record, bot, conn): else: await bot.send_message(tg_id, text=KEY_RENEWED.format(email=email), reply_markup=keyboard) - logger.info(f"Уведомление об успешном продлении отправлено клиенту {tg_id}.") + logger.info(f"Уведомление об успешном продлении (24h) отправлено клиенту {tg_id}.") except Exception as e: - logger.error(f"Ошибка при продлении подписки для клиента {tg_id}: {e}") + logger.error(f"Ошибка при автопродлении подписки (24h) для клиента {tg_id}: {e}") + else: - await send_renewal_notification(bot, tg_id, email, message_24h, conn, client_id, "notified_24h") + # Если автопродление отключено или нет денег + await send_renewal_notification( + bot=bot, + tg_id=tg_id, + email=email, + message=message_24h, + conn=conn, + client_id=client_id, + flag="notified_24h", + image_name="notify_24h.jpg", + ) -async def send_renewal_notification(bot, tg_id, email, message, conn, client_id, flag): +async def send_renewal_notification( + bot: Bot, + tg_id: int, + email: str, + message: str, + conn: asyncpg.Connection, + client_id: str, + flag: str, + image_name: str = "notify_24h.jpg", +): + """ + Общий метод отправки уведомлений: при 10ч или 24ч. + Обязательное условие — в конце, даже при ошибке, ставим флаг «уже уведомлён», + чтобы не спамить. + """ try: keyboard = InlineKeyboardBuilder() keyboard.row(types.InlineKeyboardButton(text="🔄 Продлить VPN", callback_data=f"renew_key|{email}")) keyboard.row(types.InlineKeyboardButton(text="💳 Пополнить баланс", callback_data="pay")) keyboard.row(types.InlineKeyboardButton(text="👤 Личный кабинет", callback_data="profile")) - image_path = os.path.join("img", "notify_24h.jpg") + image_path = os.path.join("img", image_name) if os.path.isfile(image_path): async with aiofiles.open(image_path, "rb") as image_file: image_data = await image_file.read() await bot.send_photo( tg_id, - photo=BufferedInputFile(image_data, filename="notify_24h.jpg"), + photo=BufferedInputFile(image_data, filename=image_name), caption=message, reply_markup=keyboard.as_markup(), ) else: await bot.send_message(tg_id, text=message, reply_markup=keyboard.as_markup()) - logger.info(f"Уведомление отправлено пользователю {tg_id}.") + logger.info(f"Уведомление ({flag}) отправлено пользователю {tg_id}.") + except TelegramForbiddenError: + logger.warning(f"Бот заблокирован пользователем {tg_id}. Добавляем в blocked_users.") + await create_blocked_user(tg_id, conn) + except Exception as e: + logger.error(f"Ошибка при отправке уведомления (flag={flag}) пользователю {tg_id}: {e}") + + finally: + # В ЛЮБОМ случае (даже при ошибке) ставим флаг, что уведомление уже было. + # Иначе будем слать каждый час. if flag == "notified_24h": await conn.execute("UPDATE keys SET notified_24h = TRUE WHERE client_id = $1", client_id) elif flag == "notified": await conn.execute("UPDATE keys SET notified = TRUE WHERE client_id = $1", client_id) else: logger.warning(f"Неизвестный флаг обновления уведомления: {flag}") - except Exception as e: - logger.error(f"Ошибка при отправке уведомления пользователю {tg_id}: {e}") async def notify_inactive_trial_users(bot: Bot, conn: asyncpg.Connection): + """ + Уведомления пользователям, которые завели бота, но так и не создали пробный ключ. + По логике — если прошли сутки, отправляем напоминание активировать триал. + """ logger.info("Проверка пользователей, не активировавших пробный период...") inactive_trial_users = await conn.fetch( """ - SELECT tg_id, username, first_name, last_name FROM users + SELECT tg_id, username, first_name, last_name + FROM users WHERE tg_id IN ( SELECT tg_id FROM connections WHERE trial = 0 - ) AND tg_id NOT IN ( + ) + AND tg_id NOT IN ( SELECT DISTINCT tg_id FROM keys ) """ @@ -321,15 +408,15 @@ async def notify_inactive_trial_users(bot: Bot, conn: asyncpg.Connection): for user in inactive_trial_users: tg_id = user["tg_id"] - username = user["username"] first_name = user["first_name"] last_name = user["last_name"] + display_name = username or first_name or last_name or "Пользователь" try: + # Проверяем, что прошли > 24 часа с последнего уведомления, чтобы не слать каждый день. can_notify = await check_notification_time(tg_id, "inactive_trial", hours=24, session=conn) - if can_notify: builder = InlineKeyboardBuilder() builder.row( @@ -366,14 +453,23 @@ async def notify_inactive_trial_users(bot: Bot, conn: asyncpg.Connection): async def handle_expired_keys(bot: Bot, conn: asyncpg.Connection, current_time: float): - logger.info("Проверка подписок, срок действия которых скоро истекает...") + """ + Обрабатываем ключи, которые уже истекли: + если включено автопродление и у пользователя достаточно баланса, + продлеваем. Иначе, ждём какое-то время (DELETE_KEYS_DELAY), + после чего удаляем ключ. + """ + logger.info("Проверка подписок, срок действия которых скоро истекает или уже истек.") threshold_time = int((datetime.utcnow() + timedelta(seconds=EXPIRED_KEYS_CHECK_INTERVAL * 1.5)).timestamp() * 1000) + # Ключи, которые вот-вот истекут (в течение INTERVAL) expiring_keys = await conn.fetch( """ - SELECT tg_id, client_id, expiry_time, email, server_id FROM keys - WHERE expiry_time <= $1 AND expiry_time > $2 + SELECT tg_id, client_id, expiry_time, email, server_id + FROM keys + WHERE expiry_time <= $1 + AND expiry_time > $2 """, threshold_time, current_time, @@ -383,8 +479,10 @@ async def handle_expired_keys(bot: Bot, conn: asyncpg.Connection, current_time: for record in expiring_keys: await process_key(record, bot, conn, current_time) + # Ключи, которые уже истекли expired_keys_query = """ - SELECT tg_id, client_id, email, server_id, expiry_time FROM keys + SELECT tg_id, client_id, email, server_id, expiry_time + FROM keys WHERE expiry_time <= $1 """ params = (current_time,) @@ -400,8 +498,10 @@ async def handle_expired_keys(bot: Bot, conn: asyncpg.Connection, current_time: time_since_expiry = current_time_utc - expiry_time_value if AUTO_RENEW_KEYS and balance >= RENEWAL_PLANS["1"]["price"]: + # Автопродление, если есть деньги await process_key(record, bot, conn, current_time, renew=True) else: + # Нет автопродления — удаляем по прошествии DELETE_KEYS_DELAY await process_key(record, bot, conn, current_time) if time_since_expiry >= DELETE_KEYS_DELAY * 1000: await delete_key_from_cluster( @@ -413,8 +513,8 @@ async def handle_expired_keys(bot: Bot, conn: asyncpg.Connection, current_time: message = ( f"🔔 Уведомление:\n\n" f"📅 Ваша подписка: {record['email']} была удалена из-за истечения срока действия.\n\n" - f"⏳ Чтобы продолжить использовать наши услуги, пожалуйста, создайте новую подписку.\n\n" - f"💬 Если у вас возникли вопросы, не стесняйтесь обращаться в поддержку!" + f"⏳ Чтобы продолжить пользоваться VPN, создайте новую подписку.\n\n" + f"💬 Если возникли вопросы, обратитесь в поддержку!" ) keyboard = InlineKeyboardBuilder() @@ -441,7 +541,7 @@ async def handle_expired_keys(bot: Bot, conn: asyncpg.Connection, current_time: else: remaining_time = (DELETE_KEYS_DELAY * 1000 - time_since_expiry) // 1000 logger.info( - f"Подписка {record['client_id']} не удалена. Осталось времени до удаления: {remaining_time} сек. (Удаление через {DELETE_KEYS_DELAY} сек после истечения)" + f"Подписка {record['client_id']} не удалена. Осталось времени до удаления: {remaining_time} сек." ) except TelegramForbiddenError: @@ -451,6 +551,11 @@ async def handle_expired_keys(bot: Bot, conn: asyncpg.Connection, current_time: async def process_key(record, bot, conn, current_time, renew=False): + """ + Общая обработка подписки при истечении: + - renew=True => автопродлить, если достаточно денег + - renew=False => просто уведомить/подготовить к удалению + """ tg_id = record["tg_id"] client_id = record["client_id"] email = record["email"] @@ -462,7 +567,7 @@ async def process_key(record, bot, conn, current_time, renew=False): current_date = datetime.now(moscow_tz) logger.info( - f"Время истечения подписки: {expiry_time_value} (МСК: {expiry_date}), Текущее время (МСК): {current_date}" + f"Время истечения подписки: {expiry_time_value} (МСК: {expiry_date}), текущее время (МСК): {current_date}" ) current_time_utc = int(datetime.utcnow().timestamp() * 1000) @@ -470,41 +575,52 @@ async def process_key(record, bot, conn, current_time, renew=False): try: if not renew: + # Если не пытаемся автопродлить if current_time_utc >= expiry_time_value: - if time_since_expiry <= DELETE_KEYS_DELAY * 500: + # Ключ уже истёк / Баг DELETE_KEYS_DELAY * 500 исправлено на DELETE_KEYS_DELAY * 1000 + if time_since_expiry <= DELETE_KEYS_DELAY * 1000: message = ( f"🔔 Уведомление:\n\n" - f"📅 Ваша подписка: {record['email']} истекла. Пополните баланс для продления.\n\n" + f"📅 Ваша подписка {email} истекла. Пополните баланс для продления.\n\n" ) remaining_time = (expiry_time_value + DELETE_KEYS_DELAY * 1000) - current_time_utc - if remaining_time > 0: - message += ( - f"⏳ Подписка будет удалена через {format_time_until_deletion(remaining_time // 1000)}." - ) + message += f"⏳ Подписка будет удалена через ~{remaining_time // 1000} секунд." await send_notification(bot, tg_id, message, "notify_expired.jpg", email) + else: + # Уже прошло больше DELETE_KEYS_DELAY, значит удалим чуть выше в коде handle_expired_keys + pass else: - if (expiry_time_value - current_time_utc) <= (EXPIRED_KEYS_CHECK_INTERVAL * 1000): - await send_notification( - bot, - tg_id, - f"Ваша подписка {email} скоро истечет. Пополните баланс для продления.", - "notify_expiring.jpg", - email, - ) + # Ключ ещё не истёк, но до конца <= EXPIRED_KEYS_CHECK_INTERVAL? + # Можно отправить предупреждение, если хотите + pass elif renew and AUTO_RENEW_KEYS and balance >= RENEWAL_PLANS["1"]["price"]: + # Логика автопродления await update_balance(tg_id, -RENEWAL_PLANS["1"]["price"], conn) new_expiry_time = int((datetime.now(moscow_tz) + timedelta(days=30)).timestamp() * 1000) await update_key_expiry(client_id, new_expiry_time, conn) - servers = await get_servers(conn) + servers = await get_servers(conn) for cluster_id in servers: await renew_key_in_cluster(cluster_id, email, client_id, new_expiry_time, TOTAL_GB) logger.info(f"Подписка {tg_id} продлена в кластере {cluster_id}.") + # После продления сбрасываем флаги + await conn.execute( + """ + UPDATE keys + SET notified = FALSE, + notified_24h = FALSE, + expiry_time = $2 + WHERE client_id = $1 + """, + client_id, + new_expiry_time, + ) + try: image_path = os.path.join("img", "notify_expired.jpg") caption = KEY_RENEWED.format(email=email) @@ -530,19 +646,22 @@ async def process_key(record, bot, conn, current_time, renew=False): async def send_notification(bot, tg_id, message, image_name, email): + """ + Уведомление о том, что подписка истекла или скоро удалится. + """ keyboard = InlineKeyboardBuilder() if DELETE_KEYS_DELAY > 0: keyboard.row(types.InlineKeyboardButton(text="🔄 Продлить", callback_data=f"renew_key|{email}")) keyboard.row(types.InlineKeyboardButton(text="👤 Личный кабинет", callback_data="profile")) - image_path = os.path.join("img", "notify_expired.jpg") + image_path = os.path.join("img", image_name) try: if os.path.isfile(image_path): async with aiofiles.open(image_path, "rb") as f: await bot.send_photo( tg_id, - photo=BufferedInputFile(await f.read(), filename="notify_expired.jpg"), + photo=BufferedInputFile(await f.read(), filename=image_name), caption=message, reply_markup=keyboard.as_markup(), ) @@ -551,3 +670,5 @@ async def send_notification(bot, tg_id, message, image_name, email): except TelegramForbiddenError: logger.warning(f"Пользователь {tg_id} заблокировал бота") + except Exception as e: + logger.error(f"Ошибка при отправке уведомления пользователю {tg_id}: {e}") diff --git a/handlers/profile.py b/handlers/profile.py index 57c9cc0c..f2669492 100644 --- a/handlers/profile.py +++ b/handlers/profile.py @@ -196,10 +196,7 @@ async def invite_handler(callback_query: CallbackQuery): image_path = os.path.join("img", "pic_invite.jpg") builder = InlineKeyboardBuilder() - builder.button( - text="📢 Поделиться", - switch_inline_query=invite_text - ) + builder.button(text="📢 Поделиться", switch_inline_query=invite_text) builder.button(text="👤 Личный кабинет", callback_data="profile") builder.adjust(1) @@ -215,4 +212,4 @@ async def invite_handler(callback_query: CallbackQuery): await callback_query.message.answer( text=invite_message, reply_markup=builder.as_markup(), - ) \ No newline at end of file + )