Skip to content

Commit

Permalink
Refactor network_from_str function to handle IPv6 addresses
Browse files Browse the repository at this point in the history
Added sample set proxies list from cloudflare list
  • Loading branch information
dkmstr committed Sep 30, 2024
1 parent 093c7b3 commit 81c5429
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 44 deletions.
126 changes: 126 additions & 0 deletions server/samples/set-proxies-from-cloudflare.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# -*- coding: utf-8 -*-

#
# Copyright (c) 2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

'''
Author: Adolfo Gómez, dkmaster at dkmon dot com
'''
import typing
import asyncio
import aiohttp

AUTH_NAME: typing.Final[str] = 'interna'
AUTH_USER: typing.Final[str] = 'admin'
AUTH_PASS: typing.Final[str] = 'temporal'

REST_URL: typing.Final[str] = 'http://172.27.0.1:8000/uds/rest/'


class RESTException(Exception):
pass


class AuthException(RESTException):
pass


class LogoutException(RESTException):
pass


# Hace login con el root, puede usarse cualquier autenticador y cualquier usuario, pero en la 1.5 solo está implementado poder hacer
# este tipo de login con el usuario "root"
async def login(session: aiohttp.ClientSession) -> None:
parameters = {
'auth': AUTH_NAME,
'username': AUTH_USER,
'password': AUTH_PASS,
}

response = await session.post(REST_URL + 'auth/login', json=parameters)

if not response.ok:
raise AuthException('Error logging in')

# resp contiene las cabeceras, content el contenido de la respuesta (que es json), pero aún está en formato texto
res = await response.json()
print(res)

if res['result'] != 'ok': # Authentication error
raise AuthException('Authentication error')

session.headers.update({'X-Auth-Token': res['token']})
session.headers.update({'Scrambler': res['scrambler']})

# Fix user agent, so we indicate we are on Linux
session.headers.update({'User-Agent': 'SampleClient/1.0 (Linux)'})


async def logout(session: aiohttp.ClientSession) -> None:
response = await session.get(REST_URL + 'auth/logout')

if not response.ok:
raise LogoutException('Error logging out')


async def set_config_var(section: str, name: str, value: str, session: aiohttp.ClientSession) -> None:
response = await session.put(
REST_URL + 'config',
json={
section: {
name: {
'value': value,
}
}
},
)

if not response.ok:
raise RESTException('Error setting config var')


async def main():
async with aiohttp.ClientSession() as session:
await login(session) # Will raise an exception if error

# Get ipv4 and ipv6 from cloudflare
ips: typing.List[str] = []
for url in ['https://www.cloudflare.com/ips-v4', 'https://www.cloudflare.com/ips-v6']:
response = await session.get(url)
if not response.ok:
raise RESTException('Error getting cloudflare ips')
ips += (await response.text()).strip().split('\n')

await set_config_var('Security', 'Allowed IP Forwarders', ','.join(ips), session)

await logout(session)


if __name__ == "__main__":
loop = asyncio.new_event_loop()
loop.run_until_complete(main())
26 changes: 13 additions & 13 deletions server/src/uds/REST/methods/login_logout.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,18 +127,18 @@ def post(self) -> typing.Any:
):
raise exceptions.rest.RequestError('Invalid parameters (no auth)')

authId: typing.Optional[str] = self._params.get(
auth_id: typing.Optional[str] = self._params.get(
'auth_id',
self._params.get('authId', None), # Old compat, alias
)
authLabel: typing.Optional[str] = self._params.get(
auth_label: typing.Optional[str] = self._params.get(
'auth_label',
self._params.get(
'authSmallName', # Old compat name
self._params.get('authLabel', None), # Old compat name
),
)
authName: typing.Optional[str] = self._params.get('auth', None)
auth_name: typing.Optional[str] = self._params.get('auth', None)
platform: str = self._params.get('platform', self._request.os.os.value[0])

username: str = self._params['username']
Expand All @@ -148,25 +148,25 @@ def post(self) -> typing.Any:
# Generate a random scrambler
scrambler: str = CryptoManager.manager().random_string(32)
if (
authName == 'admin'
or authLabel == 'admin'
or authId == '00000000-0000-0000-0000-000000000000'
or (not authId and not authName and not authLabel)
auth_name == 'admin'
or auth_label == 'admin'
or auth_id == '00000000-0000-0000-0000-000000000000'
or (not auth_id and not auth_name and not auth_label)
):
if GlobalConfig.SUPER_USER_LOGIN.get(True) == username and CryptoManager().check_hash(
if GlobalConfig.SUPER_USER_LOGIN.get(True) == username and CryptoManager.manager().check_hash(
password, GlobalConfig.SUPER_USER_PASS.get(True)
):
self.gen_auth_token(-1, username, password, locale, platform, True, True, scrambler)
return Login.result(result='ok', token=self.get_auth_token())
return Login.result(error='Invalid credentials')

# Will raise an exception if no auth found
if authId:
auth = Authenticator.objects.get(uuid=process_uuid(authId))
elif authName:
auth = Authenticator.objects.get(name=authName)
if auth_id:
auth = Authenticator.objects.get(uuid=process_uuid(auth_id))
elif auth_name:
auth = Authenticator.objects.get(name__iexact=auth_name)
else:
auth = Authenticator.objects.get(small_name=authLabel)
auth = Authenticator.objects.get(small_name__iexact=auth_label)

# No matter in fact the password, just not empty (so it can be encrypted, but will be invalid anyway)
password = password or CryptoManager().random_string(32)
Expand Down
44 changes: 24 additions & 20 deletions server/src/uds/core/auths/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,9 @@ def needs_trusted_source(
"""

@wraps(view_func)
def _wrapped_view(request: 'types.requests.ExtendedHttpRequest', *args: typing.Any, **kwargs: typing.Any) -> HttpResponse:
def _wrapped_view(
request: 'types.requests.ExtendedHttpRequest', *args: typing.Any, **kwargs: typing.Any
) -> HttpResponse:
"""
Wrapped function for decorator
"""
Expand All @@ -202,7 +204,9 @@ def _wrapped_view(request: 'types.requests.ExtendedHttpRequest', *args: typing.A
# it's designed to be used in ajax calls mainly
def deny_non_authenticated(view_func: collections.abc.Callable[..., RT]) -> collections.abc.Callable[..., RT]:
@wraps(view_func)
def _wrapped_view(request: 'types.requests.ExtendedHttpRequest', *args: typing.Any, **kwargs: typing.Any) -> RT:
def _wrapped_view(
request: 'types.requests.ExtendedHttpRequest', *args: typing.Any, **kwargs: typing.Any
) -> RT:
if not request.user or not request.authorized:
return HttpResponseForbidden() # type: ignore
return view_func(request, *args, **kwargs)
Expand All @@ -215,6 +219,7 @@ def register_user(
auth_instance: AuthenticatorInstance,
username: str,
request: 'types.requests.ExtendedHttpRequest',
skip_callbacks: bool = False,
) -> types.auth.LoginResult:
"""
Check if this user already exists on database with this authenticator, if don't, create it with defaults
Expand All @@ -239,9 +244,9 @@ def register_user(
browser=request.os.browser,
version=request.os.version,
)
# Try to notify callback if needed
callbacks.weblogin(usr)
if not skip_callbacks:
callbacks.weblogin(usr)

return types.auth.LoginResult(user=usr)

return types.auth.LoginResult()
Expand All @@ -252,19 +257,17 @@ def authenticate(
password: str,
authenticator: models.Authenticator,
request: 'types.requests.ExtendedHttpRequest',
skip_callbacks: bool = False,
) -> types.auth.LoginResult:
"""
Given an username, password and authenticator, try to authenticate user
@param username: username to authenticate
@param password: password to authenticate this user
@param authenticator: Authenticator (database object) used to authenticate with provided credentials
@param request: Request object
@return:
An types.auth.LoginResult indicating:
user if success in logging in field user or None if not
url if not success in logging in field url so instead of error UDS will redirect to this url
Authenticate user with provided credentials
Args:
username (str): username to authenticate
password (str): password to authenticate this user
authenticator (models.Authenticator): Authenticator (database object) used to authenticate with provided credentials
request (ExtendedHttpRequestWithUser): Request object
skip_callbacks (bool, optional): Skip callbacks. Defaults to False.
"""
logger.debug('Authenticating user %s with authenticator %s', username, authenticator)
Expand Down Expand Up @@ -354,12 +357,11 @@ def authenticate_via_callback(
if result.success == types.auth.AuthenticationState.REDIRECT:
return types.auth.LoginResult(url=result.url)

if result.username:
return register_user(authenticator, auth_instance, result.username or '', request)
else:
if not result.username:
logger.warning('Authenticator %s returned empty username', authenticator.name)
raise exceptions.auth.InvalidUserException('User doesn\'t has access to UDS')

raise exceptions.auth.InvalidUserException('User doesn\'t has access to UDS')
return register_user(authenticator, auth_instance, result.username, request)


def authenticate_callback_url(authenticator: models.Authenticator) -> str:
Expand Down Expand Up @@ -447,7 +449,9 @@ def web_password(request: HttpRequest) -> str:
return CryptoManager().symmetric_decrypt(passkey, uds_cookie(request)) # recover as original unicode string


def web_logout(request: 'types.requests.ExtendedHttpRequest', exit_url: typing.Optional[str] = None) -> HttpResponse:
def web_logout(
request: 'types.requests.ExtendedHttpRequest', exit_url: typing.Optional[str] = None
) -> HttpResponse:
"""
Helper function to clear user related data from session. If this method is not used, the session we be cleaned anyway
by django in regular basis.
Expand Down
26 changes: 15 additions & 11 deletions server/src/uds/core/util/net.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import functools
import ipaddress
import logging
import re
Expand Down Expand Up @@ -210,29 +211,32 @@ def network_from_str_ipv6(strNets: str) -> NetworkType:


def network_from_str(
strNets: str,
network_str: str,
version: typing.Literal[0, 4, 6] = 0,
) -> NetworkType:
if not ':' in strNets and version != 6:
return network_from_str_ipv4(strNets)
# ':' in strNets or version == 6:
# If is in fact an IPv4 address, return None network, this will not be used
if '.' in strNets:
try:
if not ':' in network_str and version != 6:
return network_from_str_ipv4(network_str)
# ':' in strNets or version == 6:
# If is in fact an IPv4 address, return None network, this will not be used
if '.' in network_str:
return NetworkType(0, 0, 0)
return network_from_str_ipv6(network_str)
except ValueError:
return NetworkType(0, 0, 0)
return network_from_str_ipv6(strNets)


@functools.lru_cache(maxsize=32)
def networks_from_str(
nets: str,
networks_str: str,
version: typing.Literal[0, 4, 6] = 0,
) -> list[NetworkType]:
"""
If allowMultipleNetworks is True, it allows ',' and ';' separators (and, ofc, more than 1 network)
Returns a list of networks tuples in the form [(start1, end1), (start2, end2) ...]
"""
return [network_from_str(str_net, version) for str_net in re.split('[;,]', nets) if str_net]

return [network_from_str(str_net, version) for str_net in re.split('[;,]', networks_str) if str_net]

@functools.lru_cache(maxsize=32)
def contains(
networks: typing.Union[str, NetworkType, list[NetworkType]],
ip: typing.Union[str, int],
Expand Down

0 comments on commit 81c5429

Please sign in to comment.