Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions ansible_wisdom/main/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"""

import os
import sys
from pathlib import Path

# Build paths inside the project like this: BASE_DIR / 'subdir'.
Expand Down Expand Up @@ -170,6 +171,18 @@
'REFRESH_TOKEN_EXPIRE_SECONDS': 864_000, # = 10 days
}

#
# We need to run 'manage.py migrate' before adding our own OAuth2 application model.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we be seeing a migration file in this PR for this new application? Assuming yes, I'm not entirely clear on the order of operations for getting everything set up correctly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, we will not have a new migration file with this PR. The new "wildcard" OAuth app will swap the default app as it is defined as "swappable" and the same db table will be used as the default one,

# See https://django-oauth-toolkit.readthedocs.io/en/latest/advanced_topics.html
# #extending-the-application-model
#
# Also, if these lines are executed in testing, test fails with:
# django.db.utils.ProgrammingError: relation "users_user" does not exist
#
if sys.argv[1:2] not in [['migrate'], ['test']]:
INSTALLED_APPS.append('wildcard_oauth2')
OAUTH2_PROVIDER_APPLICATION_MODEL = 'wildcard_oauth2.Application'

# OAUTH: todo
# - remove ansible_wisdom/users/auth.py module
# - remove ansible_wisdom/users/views.py module
Expand Down
Empty file.
14 changes: 14 additions & 0 deletions ansible_wisdom/wildcard_oauth2/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
"""
Django App Configuration
"""

from django.apps import AppConfig


class WildcardOauth2ApplicationConfig(AppConfig):
"""
Configures wildcard_oauth2 as a Django app plugin
"""

name = 'wildcard_oauth2'
verbose_name = "Wildcard OAuth2 Application"
128 changes: 128 additions & 0 deletions ansible_wisdom/wildcard_oauth2/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
"""
A custom OAuth2 application that allows wildcard for redirect_uris

https://github.com/jazzband/django-oauth-toolkit/issues/443#issuecomment-420255286
"""
import ipaddress
import re
from urllib.parse import parse_qsl, unquote, urlparse

from django.core.exceptions import ValidationError
from oauth2_provider.models import AbstractApplication
from oauth2_provider.settings import oauth2_settings


def validate_uris(value):
"""Ensure that `value` contains valid blank-separated URIs."""
urls = value.split()
for url in urls:
obj = urlparse(url)
if obj.fragment:
raise ValidationError('Redirect URIs must not contain fragments')
if obj.scheme.lower() not in oauth2_settings.ALLOWED_REDIRECT_URI_SCHEMES:
raise ValidationError('Redirect URI scheme is not allowed.')
if not obj.netloc:
raise ValidationError('Redirect URI must contain a domain.')
if not is_acceptable_netloc(obj.netloc):
raise ValidationError('Redirect URI is not acceptable.')


def wildcard_string_to_regex(value):
return re.escape(value).replace('\\*', '[^\\/]*')


def is_acceptable_netloc(value):
if '*' in value:
return re.fullmatch(r'.*\.[^\.\*]+\.[^\.\*]+', value) is not None
else:
return True


def is_ip_address(address):
try:
ipaddress.ip_address(address)
return True
except ValueError:
return False


class Application(AbstractApplication):
"""Subclass of application to allow for regular expressions for the redirect uri."""

@staticmethod
def _uri_is_allowed(allowed_uri, uri):
"""Check that the URI conforms to these rules."""
schemes_match = allowed_uri.scheme == uri.scheme

# Wildcards ('*') in netloc is supposed to be matched to a hostname,
# not an ip address.
if '*' in allowed_uri.netloc and is_ip_address(uri.hostname):
netloc_matches_pattern = None
else:
regex = wildcard_string_to_regex(allowed_uri.netloc)
netloc_matches_pattern = re.fullmatch(regex, uri.netloc)

# The original code allowed only fixed paths only with:
# paths_match = allowed_uri.path == uri.path
# However, since paths can contain variable portions (e.g. code-server),
# code was modified to support regex patterns in paths as well.
regex = wildcard_string_to_regex(allowed_uri.path)
paths_match = re.fullmatch(regex, uri.path)

return all([schemes_match, netloc_matches_pattern, paths_match])

def __init__(self, *args, **kwargs):
"""Relax the validator to allow for uris with regular expressions."""
self._meta.get_field('redirect_uris').validators = [
validate_uris,
]
super().__init__(*args, **kwargs)

def redirect_uri_allowed(self, uri):
"""
Check if given url is one of the items in :attr:`redirect_uris` string.
A Redirect uri domain may be a regular expression e.g. `^(.*).example.com$` will
match all subdomains of example.com.
A Redirect uri may be `https://(.*).example.com/some/path/?q=x`
:param uri: Url to check
"""
for allowed_uri in self.redirect_uris.split():
parsed_allowed_uri = urlparse(allowed_uri)
parsed_uri = urlparse(uri)

if self._uri_is_allowed(parsed_allowed_uri, parsed_uri):
aqs_set = set(parse_qsl(parsed_allowed_uri.query))
uqs_set = set(parse_qsl(parsed_uri.query))

if aqs_set.issubset(uqs_set):
return True

return False

def clean(self):
uris_with_wildcard = [uri for uri in self.redirect_uris.split(' ') if '*' in uri]
if uris_with_wildcard:
self.redirect_uris = ' '.join(
[uri for uri in self.redirect_uris.split(' ') if '*' not in uri]
)
super().clean()
if uris_with_wildcard:
self.redirect_uris += ' ' + ' '.join(uris_with_wildcard)

def is_usable(self, request):
# This is a hacky way to decode redirect_uri stored in an oauthlib.Request instance.
# Once the oauthlib.Request class started decoding redirect_uri correctly, this will
# be removed.
if getattr(request, '_params'):
redirect_uri = request._params.get('redirect_uri')
if redirect_uri:
request._params['redirect_uri'] = unquote(redirect_uri)

return True

class Meta:
db_table = 'oauth2_provider_application'
# Without the following line, tests fail with:
# RuntimeError: Model class wildcard_oauth2.models.Application doesn't declare
# an explicit app_label and isn't in an application in INSTALLED_APPS.
app_label = 'wildcard_oauth2'
Empty file.
118 changes: 118 additions & 0 deletions ansible_wisdom/wildcard_oauth2/tests/test_wildcard_oauth2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import re
from urllib.parse import urlparse

from django.core.exceptions import ValidationError
from django.test import TestCase

from ..models import (
Application,
is_acceptable_netloc,
validate_uris,
wildcard_string_to_regex,
)

redirect_uris = [
'vscode://redhat.ansible',
'https://*.github.dev/extension-auth-callback?*',
'http://127.0.0.1:8000/*/callback?*',
]


class WildcardOAuth2Test(TestCase):
def setUp(self):
self.app = Application(redirect_uris=' '.join(redirect_uris))

def test_standalone_vscode_callback_uri(self):
rc = self.app.redirect_uri_allowed('vscode://redhat.ansible')
self.assertTrue(rc)
self.app.clean()

def test_invalid_callback_uri(self):
rc = self.app.redirect_uri_allowed('vscode://othercompany.ansible')
self.assertFalse(rc)

def test_valid_codespases_callback_uri(self):
rc = self.app.redirect_uri_allowed(
'https://jubilant-engine-wv4w5xw9vq9f9gg9.github.dev/'
'extension-auth-callback?state=6766a56164972ebe9ab0350c00d9041c'
)
self.assertTrue(rc)

def test_invalid_codespases_callback_uri(self):
rc = self.app.redirect_uri_allowed(
'https://jubilant-engine-wv4w5xw9vq9f9gg9.github.com/'
'extension-auth-callback?state=6766a56164972ebe9ab0350c00d9041c'
)
self.assertFalse(rc)

def test_valid_code_server_callback_uri(self):
rc = self.app.redirect_uri_allowed(
'http://127.0.0.1:8000/stable-9658969084238651b6dde258e04f4abd9b14bfd1/callback'
'?vscode-reqid=2&vscode-scheme=code-oss&vscode-authority=redhat.ansible'
)
self.assertTrue(rc)


class ValidateUrisTest(TestCase):
def test_uri_no_error(self):
validate_uris(
'https://example.com/callback '
'https://*.github.dev/extension-auth-callback?.* '
'http://127.0.0.1:8000/.*?.*'
)

def test_uri_containing_fragment(self):
try:
validate_uris('https://example.com/callback#fragment')
except ValidationError as e:
self.assertEqual(e.message, 'Redirect URIs must not contain fragments')

def test_uri_containing_invalid_scheme(self):
try:
validate_uris('myapp://example.com/callback')
except ValidationError as e:
self.assertEqual(e.message, 'Redirect URI scheme is not allowed.')

def test_uri_containing_no_domain(self):
try:
validate_uris('vscode:redhat.ansible')
except ValidationError as e:
self.assertEqual(e.message, 'Redirect URI must contain a domain.')

def test_uri_containing_wildcard_in_root_domain(self):
try:
validate_uris('https://*.github.*/extension-auth-callback?.*')
except ValidationError as e:
self.assertEqual(e.message, 'Redirect URI is not acceptable.')

def test_ip_address(self):
allowed_uri = urlparse('http://*.0.1/callback')
uri = urlparse('http://123.123.0.1/callback')
self.assertFalse(Application._uri_is_allowed(allowed_uri, uri))


class AcceptableNetlocTest(TestCase):
def test_valid_netlocs(self):
self.assertTrue(is_acceptable_netloc('subdomain.example.com'))
self.assertTrue(is_acceptable_netloc('*.example.com'))
self.assertTrue(is_acceptable_netloc('sub*.example.com'))
self.assertTrue(is_acceptable_netloc('*sub.example.com'))
self.assertTrue(is_acceptable_netloc('*sub*.sub.example.com'))
self.assertTrue(is_acceptable_netloc('*.*.example.com'))

def test_invalid_netlocs(self):
self.assertFalse(is_acceptable_netloc('subdomain.*.com'))
self.assertFalse(is_acceptable_netloc('*.example.*'))
self.assertFalse(is_acceptable_netloc('sub*.example*.com'))
self.assertFalse(is_acceptable_netloc('*.com'))


class WildcardStringToRegExTest(TestCase):
def test_wildcard_string_to_regex(self):
self.assertEqual(wildcard_string_to_regex('*'), '[^\\/]*')
p = wildcard_string_to_regex('*sub*.sub.example.com')
self.assertEqual(p, '[^\\/]*sub[^\\/]*\\.sub\\.example\\.com')
self.assertTrue(re.match(p, 'sub.sub.example.com'))
self.assertTrue(re.match(p, 'abcsubxyz.sub.example.com'))
self.assertTrue(re.match(p, 'abc.sub.sub.example.com'))
self.assertFalse(re.match(p, 'sub/xyz.sub.example.com'))