From 81c5429f40140ac922e36dfbc59fc92cf7f9542b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adolfo=20G=C3=B3mez=20Garc=C3=ADa?= Date: Mon, 30 Sep 2024 20:53:39 +0200 Subject: [PATCH] Refactor network_from_str function to handle IPv6 addresses Added sample set proxies list from cloudflare list --- server/samples/set-proxies-from-cloudflare.py | 126 ++++++++++++++++++ server/src/uds/REST/methods/login_logout.py | 26 ++-- server/src/uds/core/auths/auth.py | 44 +++--- server/src/uds/core/util/net.py | 26 ++-- 4 files changed, 178 insertions(+), 44 deletions(-) create mode 100644 server/samples/set-proxies-from-cloudflare.py diff --git a/server/samples/set-proxies-from-cloudflare.py b/server/samples/set-proxies-from-cloudflare.py new file mode 100644 index 000000000..accfdc56c --- /dev/null +++ b/server/samples/set-proxies-from-cloudflare.py @@ -0,0 +1,126 @@ +# -*- coding: utf-8 -*- + +# +# Copyright (c) 2024 Virtual Cable S.L.U. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without modification, +# are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors +# may be used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +''' +Author: Adolfo Gómez, dkmaster at dkmon dot com +''' +import typing +import asyncio +import aiohttp + +AUTH_NAME: typing.Final[str] = 'interna' +AUTH_USER: typing.Final[str] = 'admin' +AUTH_PASS: typing.Final[str] = 'temporal' + +REST_URL: typing.Final[str] = 'http://172.27.0.1:8000/uds/rest/' + + +class RESTException(Exception): + pass + + +class AuthException(RESTException): + pass + + +class LogoutException(RESTException): + pass + + +# Hace login con el root, puede usarse cualquier autenticador y cualquier usuario, pero en la 1.5 solo está implementado poder hacer +# este tipo de login con el usuario "root" +async def login(session: aiohttp.ClientSession) -> None: + parameters = { + 'auth': AUTH_NAME, + 'username': AUTH_USER, + 'password': AUTH_PASS, + } + + response = await session.post(REST_URL + 'auth/login', json=parameters) + + if not response.ok: + raise AuthException('Error logging in') + + # resp contiene las cabeceras, content el contenido de la respuesta (que es json), pero aún está en formato texto + res = await response.json() + print(res) + + if res['result'] != 'ok': # Authentication error + raise AuthException('Authentication error') + + session.headers.update({'X-Auth-Token': res['token']}) + session.headers.update({'Scrambler': res['scrambler']}) + + # Fix user agent, so we indicate we are on Linux + session.headers.update({'User-Agent': 'SampleClient/1.0 (Linux)'}) + + +async def logout(session: aiohttp.ClientSession) -> None: + response = await session.get(REST_URL + 'auth/logout') + + if not response.ok: + raise LogoutException('Error logging out') + + +async def set_config_var(section: str, name: str, value: str, session: aiohttp.ClientSession) -> None: + response = await session.put( + REST_URL + 'config', + json={ + section: { + name: { + 'value': value, + } + } + }, + ) + + if not response.ok: + raise RESTException('Error setting config var') + + +async def main(): + async with aiohttp.ClientSession() as session: + await login(session) # Will raise an exception if error + + # Get ipv4 and ipv6 from cloudflare + ips: typing.List[str] = [] + for url in ['https://www.cloudflare.com/ips-v4', 'https://www.cloudflare.com/ips-v6']: + response = await session.get(url) + if not response.ok: + raise RESTException('Error getting cloudflare ips') + ips += (await response.text()).strip().split('\n') + + await set_config_var('Security', 'Allowed IP Forwarders', ','.join(ips), session) + + await logout(session) + + +if __name__ == "__main__": + loop = asyncio.new_event_loop() + loop.run_until_complete(main()) diff --git a/server/src/uds/REST/methods/login_logout.py b/server/src/uds/REST/methods/login_logout.py index 3ac563aad..eab5a6133 100644 --- a/server/src/uds/REST/methods/login_logout.py +++ b/server/src/uds/REST/methods/login_logout.py @@ -127,18 +127,18 @@ def post(self) -> typing.Any: ): raise exceptions.rest.RequestError('Invalid parameters (no auth)') - authId: typing.Optional[str] = self._params.get( + auth_id: typing.Optional[str] = self._params.get( 'auth_id', self._params.get('authId', None), # Old compat, alias ) - authLabel: typing.Optional[str] = self._params.get( + auth_label: typing.Optional[str] = self._params.get( 'auth_label', self._params.get( 'authSmallName', # Old compat name self._params.get('authLabel', None), # Old compat name ), ) - authName: typing.Optional[str] = self._params.get('auth', None) + auth_name: typing.Optional[str] = self._params.get('auth', None) platform: str = self._params.get('platform', self._request.os.os.value[0]) username: str = self._params['username'] @@ -148,12 +148,12 @@ def post(self) -> typing.Any: # Generate a random scrambler scrambler: str = CryptoManager.manager().random_string(32) if ( - authName == 'admin' - or authLabel == 'admin' - or authId == '00000000-0000-0000-0000-000000000000' - or (not authId and not authName and not authLabel) + auth_name == 'admin' + or auth_label == 'admin' + or auth_id == '00000000-0000-0000-0000-000000000000' + or (not auth_id and not auth_name and not auth_label) ): - if GlobalConfig.SUPER_USER_LOGIN.get(True) == username and CryptoManager().check_hash( + if GlobalConfig.SUPER_USER_LOGIN.get(True) == username and CryptoManager.manager().check_hash( password, GlobalConfig.SUPER_USER_PASS.get(True) ): self.gen_auth_token(-1, username, password, locale, platform, True, True, scrambler) @@ -161,12 +161,12 @@ def post(self) -> typing.Any: return Login.result(error='Invalid credentials') # Will raise an exception if no auth found - if authId: - auth = Authenticator.objects.get(uuid=process_uuid(authId)) - elif authName: - auth = Authenticator.objects.get(name=authName) + if auth_id: + auth = Authenticator.objects.get(uuid=process_uuid(auth_id)) + elif auth_name: + auth = Authenticator.objects.get(name__iexact=auth_name) else: - auth = Authenticator.objects.get(small_name=authLabel) + auth = Authenticator.objects.get(small_name__iexact=auth_label) # No matter in fact the password, just not empty (so it can be encrypted, but will be invalid anyway) password = password or CryptoManager().random_string(32) diff --git a/server/src/uds/core/auths/auth.py b/server/src/uds/core/auths/auth.py index 5ca547c5d..b8e27a0bc 100644 --- a/server/src/uds/core/auths/auth.py +++ b/server/src/uds/core/auths/auth.py @@ -180,7 +180,9 @@ def needs_trusted_source( """ @wraps(view_func) - def _wrapped_view(request: 'types.requests.ExtendedHttpRequest', *args: typing.Any, **kwargs: typing.Any) -> HttpResponse: + def _wrapped_view( + request: 'types.requests.ExtendedHttpRequest', *args: typing.Any, **kwargs: typing.Any + ) -> HttpResponse: """ Wrapped function for decorator """ @@ -202,7 +204,9 @@ def _wrapped_view(request: 'types.requests.ExtendedHttpRequest', *args: typing.A # it's designed to be used in ajax calls mainly def deny_non_authenticated(view_func: collections.abc.Callable[..., RT]) -> collections.abc.Callable[..., RT]: @wraps(view_func) - def _wrapped_view(request: 'types.requests.ExtendedHttpRequest', *args: typing.Any, **kwargs: typing.Any) -> RT: + def _wrapped_view( + request: 'types.requests.ExtendedHttpRequest', *args: typing.Any, **kwargs: typing.Any + ) -> RT: if not request.user or not request.authorized: return HttpResponseForbidden() # type: ignore return view_func(request, *args, **kwargs) @@ -215,6 +219,7 @@ def register_user( auth_instance: AuthenticatorInstance, username: str, request: 'types.requests.ExtendedHttpRequest', + skip_callbacks: bool = False, ) -> types.auth.LoginResult: """ Check if this user already exists on database with this authenticator, if don't, create it with defaults @@ -239,9 +244,9 @@ def register_user( browser=request.os.browser, version=request.os.version, ) - # Try to notify callback if needed - callbacks.weblogin(usr) - + if not skip_callbacks: + callbacks.weblogin(usr) + return types.auth.LoginResult(user=usr) return types.auth.LoginResult() @@ -252,19 +257,17 @@ def authenticate( password: str, authenticator: models.Authenticator, request: 'types.requests.ExtendedHttpRequest', + skip_callbacks: bool = False, ) -> types.auth.LoginResult: """ - Given an username, password and authenticator, try to authenticate user - @param username: username to authenticate - @param password: password to authenticate this user - @param authenticator: Authenticator (database object) used to authenticate with provided credentials - @param request: Request object - - @return: - An types.auth.LoginResult indicating: - user if success in logging in field user or None if not - url if not success in logging in field url so instead of error UDS will redirect to this url + Authenticate user with provided credentials + Args: + username (str): username to authenticate + password (str): password to authenticate this user + authenticator (models.Authenticator): Authenticator (database object) used to authenticate with provided credentials + request (ExtendedHttpRequestWithUser): Request object + skip_callbacks (bool, optional): Skip callbacks. Defaults to False. """ logger.debug('Authenticating user %s with authenticator %s', username, authenticator) @@ -354,12 +357,11 @@ def authenticate_via_callback( if result.success == types.auth.AuthenticationState.REDIRECT: return types.auth.LoginResult(url=result.url) - if result.username: - return register_user(authenticator, auth_instance, result.username or '', request) - else: + if not result.username: logger.warning('Authenticator %s returned empty username', authenticator.name) + raise exceptions.auth.InvalidUserException('User doesn\'t has access to UDS') - raise exceptions.auth.InvalidUserException('User doesn\'t has access to UDS') + return register_user(authenticator, auth_instance, result.username, request) def authenticate_callback_url(authenticator: models.Authenticator) -> str: @@ -447,7 +449,9 @@ def web_password(request: HttpRequest) -> str: return CryptoManager().symmetric_decrypt(passkey, uds_cookie(request)) # recover as original unicode string -def web_logout(request: 'types.requests.ExtendedHttpRequest', exit_url: typing.Optional[str] = None) -> HttpResponse: +def web_logout( + request: 'types.requests.ExtendedHttpRequest', exit_url: typing.Optional[str] = None +) -> HttpResponse: """ Helper function to clear user related data from session. If this method is not used, the session we be cleaned anyway by django in regular basis. diff --git a/server/src/uds/core/util/net.py b/server/src/uds/core/util/net.py index 123ed6776..9f42ab6f6 100644 --- a/server/src/uds/core/util/net.py +++ b/server/src/uds/core/util/net.py @@ -30,6 +30,7 @@ """ Author: Adolfo Gómez, dkmaster at dkmon dot com """ +import functools import ipaddress import logging import re @@ -210,29 +211,32 @@ def network_from_str_ipv6(strNets: str) -> NetworkType: def network_from_str( - strNets: str, + network_str: str, version: typing.Literal[0, 4, 6] = 0, ) -> NetworkType: - if not ':' in strNets and version != 6: - return network_from_str_ipv4(strNets) - # ':' in strNets or version == 6: - # If is in fact an IPv4 address, return None network, this will not be used - if '.' in strNets: + try: + if not ':' in network_str and version != 6: + return network_from_str_ipv4(network_str) + # ':' in strNets or version == 6: + # If is in fact an IPv4 address, return None network, this will not be used + if '.' in network_str: + return NetworkType(0, 0, 0) + return network_from_str_ipv6(network_str) + except ValueError: return NetworkType(0, 0, 0) - return network_from_str_ipv6(strNets) - +@functools.lru_cache(maxsize=32) def networks_from_str( - nets: str, + networks_str: str, version: typing.Literal[0, 4, 6] = 0, ) -> list[NetworkType]: """ If allowMultipleNetworks is True, it allows ',' and ';' separators (and, ofc, more than 1 network) Returns a list of networks tuples in the form [(start1, end1), (start2, end2) ...] """ - return [network_from_str(str_net, version) for str_net in re.split('[;,]', nets) if str_net] - + return [network_from_str(str_net, version) for str_net in re.split('[;,]', networks_str) if str_net] +@functools.lru_cache(maxsize=32) def contains( networks: typing.Union[str, NetworkType, list[NetworkType]], ip: typing.Union[str, int],