diff --git a/app/jobs/0_start_xray.py b/app/jobs/0_start_xray.py index 1634efbea..e4c459d02 100644 --- a/app/jobs/0_start_xray.py +++ b/app/jobs/0_start_xray.py @@ -1,37 +1,12 @@ -import time - -import sqlalchemy - from app import app, xray -from app.db import GetDB, User, engine, get_users -from app.models.user import UserStatus -from app.utils.xray import xray_add_user - - -@xray.core.on_start -def add_users_from_db(): - if sqlalchemy.inspect(engine).has_table(User.__tablename__): - with GetDB() as db: - - # to prevent ConnectionError while adding users - tries = 0 - while True: - if tries == 5: - return xray.core.restart() - try: - xray.api.get_sys_stats() - break - except xray.exc.ConnectionError: - time.sleep(2) - tries += 1 - - for user in get_users(db, status=UserStatus.active): - xray_add_user(user) +from app.utils.xray import xray_config_include_db_clients @app.on_event("startup") def app_startup(): - xray.core.start() + xray.core.start( + xray_config_include_db_clients(xray.config) + ) @app.on_event("shutdown") diff --git a/app/jobs/record_usages.py b/app/jobs/record_usages.py index 9068a0fc9..c7618b3ae 100644 --- a/app/jobs/record_usages.py +++ b/app/jobs/record_usages.py @@ -4,6 +4,7 @@ from app.db import engine from app.db.models import System, User from sqlalchemy import bindparam, update +from app.utils.xray import xray_config_include_db_clients def record_users_usage(): @@ -16,7 +17,9 @@ def record_users_usage(): except xray.exceptions.ConnectionError: try: - xray.core.restart() + xray.core.restart( + xray_config_include_db_clients(xray.config) + ) except ProcessLookupError: pass @@ -45,7 +48,9 @@ def record_outbounds_usage(): except xray.exceptions.ConnectionError: try: - xray.core.restart() + xray.core.restart( + xray_config_include_db_clients(xray.config) + ) except ProcessLookupError: pass diff --git a/app/jobs/review_users.py b/app/jobs/review_users.py index 422194704..9770c78a2 100644 --- a/app/jobs/review_users.py +++ b/app/jobs/review_users.py @@ -4,6 +4,7 @@ from app import logger, scheduler, telegram, xray from app.db import GetDB, get_users, update_user_status from app.models.user import UserStatus +from app.utils.xray import xray_config_include_db_clients def review(): @@ -29,7 +30,9 @@ def review(): except xray.exceptions.ConnectionError: try: - xray.core.restart() + xray.core.restart( + xray_config_include_db_clients(xray.config) + ) except ProcessLookupError: pass diff --git a/app/telegram/admin.py b/app/telegram/admin.py index eb52d84d0..f15ffd3c8 100644 --- a/app/telegram/admin.py +++ b/app/telegram/admin.py @@ -1,8 +1,9 @@ import math import re from datetime import datetime -from dateutil.relativedelta import relativedelta + import sqlalchemy +from dateutil.relativedelta import relativedelta from telebot import types from telebot.custom_filters import AdvancedCustomFilter from telebot.util import user_link @@ -14,10 +15,10 @@ from app.telegram.keyboard import BotKeyboard from app.utils.store import MemoryStorage from app.utils.system import cpu_usage, memory_usage, readable_size -from app.utils.xray import xray_add_user, xray_remove_user +from app.utils.xray import (xray_add_user, xray_config_include_db_clients, + xray_remove_user) from config import TELEGRAM_ADMIN_ID - mem_store = MemoryStorage() @@ -277,8 +278,7 @@ def add_user_data_limit_step(message: types.Message, username: str): bot.send_message( message.chat.id, '⬆️ Enter Expire Date (YYYY-MM-DD)\nOr You Can Use Regex Symbol: ^[0-9]{1,3}(M|Y) :\n⚠️ Send 0 for never expire.', - reply_markup=BotKeyboard.cancel_action() - ) + reply_markup=BotKeyboard.cancel_action()) bot.register_next_step_handler(message, add_user_expire_step, username=username, data_limit=data_limit) @@ -378,7 +378,9 @@ def confirm_user_command(call: types.CallbackQuery): elif data == 'restart': m = bot.edit_message_text('🔄 Restarting XRay core...', call.message.chat.id, call.message.message_id) - xray.core.restart() + xray.core.restart( + xray_config_include_db_clients(xray.config) + ) bot.edit_message_text( '✅ XRay core restarted successfully.', m.chat.id, m.message_id, diff --git a/app/utils/xray.py b/app/utils/xray.py index 605274302..27c09d6f9 100644 --- a/app/utils/xray.py +++ b/app/utils/xray.py @@ -1,5 +1,8 @@ -from app.models.user import User, UserResponse from app import xray +from app.db import GetDB, User, get_users +from app.models.proxy import ProxySettings +from app.models.user import User, UserResponse, UserStatus +from app.xray.config import XRayConfig def xray_add_user(user: User): @@ -20,3 +23,21 @@ def xray_remove_user(user: User): xray.api.remove_inbound_user(tag=inbound_tag, email=user.username) except xray.exc.EmailNotFoundError: pass + + +def xray_config_include_db_clients(config: XRayConfig): + config = config.copy() + + with GetDB() as db: + for user in get_users(db, status=UserStatus.active): + proxies_settings = { + p.type: ProxySettings.from_dict(p.type, p.settings).dict(no_obj=True) + for p in user.proxies + } + for proxy_type, inbound_tags in user.inbounds.items(): + for inbound_tag in inbound_tags: + config.add_inbound_client(inbound_tag, + user.username, + proxies_settings[proxy_type]) + + return config diff --git a/app/xray/__init__.py b/app/xray/__init__.py index fb7517bfc..17436075c 100644 --- a/app/xray/__init__.py +++ b/app/xray/__init__.py @@ -20,7 +20,7 @@ del api_port -core = XRayCore(config, XRAY_EXECUTABLE_PATH, XRAY_ASSETS_PATH) +core = XRayCore(XRAY_EXECUTABLE_PATH, XRAY_ASSETS_PATH) api = XRay(config.api_host, config.api_port) diff --git a/app/xray/config.py b/app/xray/config.py index 503e9f704..a5e074e5f 100644 --- a/app/xray/config.py +++ b/app/xray/config.py @@ -1,5 +1,6 @@ import json import re +from copy import deepcopy from pathlib import PosixPath from typing import Union @@ -37,6 +38,7 @@ def __init__(self, self.inbounds_by_protocol = {} self.inbounds_by_tag = {} self._fallbacks_inbound = self.get_inbound(XRAY_FALLBACKS_INBOUND_TAG) + self._addr_clients_by_tag = {} self._resolve_inbounds() self._apply_api() @@ -115,6 +117,12 @@ def _resolve_inbounds(self): if inbound['tag'] in XRAY_EXCLUDE_INBOUND_TAGS: continue + if not inbound.get('settings'): + inbound['settings'] = {} + if not inbound['settings'].get('clients'): + inbound['settings']['clients'] = [] + self._addr_clients_by_tag[inbound['tag']] = inbound['settings']['clients'] + settings = { "tag": inbound["tag"], "protocol": inbound["protocol"], @@ -216,6 +224,14 @@ def _resolve_inbounds(self): except KeyError: self.inbounds_by_protocol[inbound['protocol']] = [settings] + def add_inbound_client(self, inbound_tag: str, email: str, settings: dict): + client = {"email": email, **settings} + try: + self._addr_clients_by_tag[inbound_tag].append(client) + except KeyError: + return + return client + def get_inbound(self, tag) -> dict: for inbound in self['inbounds']: if inbound['tag'] == tag: @@ -228,3 +244,6 @@ def get_outbound(self, tag) -> dict: def to_json(self, **json_kwargs): return json.dumps(self, **json_kwargs) + + def copy(self): + return deepcopy(self) diff --git a/app/xray/core.py b/app/xray/core.py index cb5f74dd7..46cd03a73 100644 --- a/app/xray/core.py +++ b/app/xray/core.py @@ -8,13 +8,11 @@ class XRayCore: def __init__(self, - config: XRayConfig, executable_path: str = "/usr/bin/xray", assets_path: str = "/usr/share/xray"): self.executable_path = executable_path self.assets_path = assets_path self.started = False - self.config = config self._process = None self._on_start_funcs = [] self._on_stop_funcs = [] @@ -35,23 +33,22 @@ def reader(): while True: try: output = self._process.stdout.readline().strip('\n') + if output == '' and self._process.poll() is not None: + break except AttributeError: break - if output == '' and self._process.poll() is not None: - break - # if output: # logger.info(output) threading.Thread(target=reader).start() - def start(self): + def start(self, config: XRayConfig): if self.started is True: raise RuntimeError("Xray is started already") - if self.config.get('log', {}).get('logLevel') in ('none', 'error'): - self.config['log']['logLevel'] = 'warning' + if config.get('log', {}).get('logLevel') in ('none', 'error'): + config['log']['logLevel'] = 'warning' cmd = [ self.executable_path, @@ -66,7 +63,7 @@ def start(self): stdout=subprocess.PIPE, universal_newlines=True ) - self._process.stdin.write(self.config.to_json()) + self._process.stdin.write(config.to_json()) self._process.stdin.flush() self._process.stdin.close() @@ -101,9 +98,9 @@ def stop(self): for func in self._on_stop_funcs: threading.Thread(target=func).start() - def restart(self): + def restart(self, config: XRayConfig): self.stop() - self.start() + self.start(config) def on_start(self, func: callable): self._on_start_funcs.append(func)