Skip to content
Merged
23 changes: 23 additions & 0 deletions oauth2_provider/templates/oauth2_provider/authorized-oob.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{% extends "oauth2_provider/base.html" %}

{% load i18n %}

{% block title %}
Success code={{code}}
{% endblock %}

{% block content %}
<div class="block-center">
{% if not error %}
<h2>{% trans "Success" %}</h2>

<p>{% trans "Please return to your application and enter this code:" %}</p>

<p><code>{{ code }}</code></p>

{% else %}
<h2>Error: {{ error.error }}</h2>
<p>{{ error.description }}</p>
{% endif %}
</div>
{% endblock %}
38 changes: 35 additions & 3 deletions oauth2_provider/views/base.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import json
import logging
import urllib.parse

from django.contrib.auth.mixins import LoginRequiredMixin
from django.http import HttpResponse
from django.http import HttpResponse, JsonResponse
from django.utils import timezone
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.debug import sensitive_post_parameters
from django.views.generic import FormView, View
from django.shortcuts import render
from django.urls import reverse

from ..exceptions import OAuthToolkitError
from ..forms import AllowForm
Expand All @@ -18,7 +21,6 @@
from ..signals import app_authorized
from .mixins import OAuthLibMixin


log = logging.getLogger("oauth2_provider")


Expand Down Expand Up @@ -59,6 +61,7 @@ def redirect(self, redirect_to, application):
allowed_schemes = application.get_allowed_schemes()
return OAuth2ResponseRedirect(redirect_to, allowed_schemes)

RFC3339 = '%Y-%m-%dT%H:%M:%SZ'

class AuthorizationView(BaseAuthorizationView, FormView):
"""
Expand Down Expand Up @@ -204,13 +207,42 @@ def get(self, request, *args, **kwargs):
request=self.request, scopes=" ".join(scopes),
credentials=credentials, allow=True
)
return self.redirect(uri, application)
return self.redirect(uri, application, token)

except OAuthToolkitError as error:
return self.error_response(error, application)

return self.render_to_response(self.get_context_data(**kwargs))

def redirect(self, redirect_to, application,
token = None):

if not redirect_to.startswith("urn:ietf:wg:oauth:2.0:oob"):
return super().redirect(redirect_to, application)

parsed_redirect = urllib.parse.urlparse(redirect_to)
code = urllib.parse.parse_qs(parsed_redirect.query)['code'][0]

if redirect_to.startswith('urn:ietf:wg:oauth:2.0:oob:auto'):

response = {
'access_token': code,
'token_uri': redirect_to,
'client_id': application.client_id,
'client_secret': application.client_secret,
'revoke_uri': reverse('oauth2_provider:revoke-token'),
}

return JsonResponse(response)

else:
return render(
request=self.request,
template_name="oauth2_provider/authorized-oob.html",
context={
'code': code,
},
)

@method_decorator(csrf_exempt, name="dispatch")
class TokenView(OAuthLibMixin, View):
Expand Down
92 changes: 92 additions & 0 deletions tests/test_authorization_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import datetime
import hashlib
import json
import re
from urllib.parse import parse_qs, urlencode, urlparse

from django.contrib.auth import get_user_model
Expand All @@ -27,6 +28,8 @@
RefreshToken = get_refresh_token_model()
UserModel = get_user_model()

URI_OOB = "urn:ietf:wg:oauth:2.0:oob"
URI_OOB_AUTO = "urn:ietf:wg:oauth:2.0:oob:auto"

# mocking a protected resource view
class ResourceView(ProtectedResourceView):
Expand All @@ -46,6 +49,7 @@ def setUp(self):
name="Test Application",
redirect_uris=(
"http://localhost http://example.com http://example.org custom-scheme://example.com"
" " + URI_OOB + " " + URI_OOB_AUTO
),
user=self.dev_user,
client_type=Application.CLIENT_CONFIDENTIAL,
Expand Down Expand Up @@ -1456,6 +1460,94 @@ def test_code_exchange_succeed_when_redirect_uri_match_with_multiple_query_param
self.assertEqual(content["scope"], "read write")
self.assertEqual(content["expires_in"], oauth2_settings.ACCESS_TOKEN_EXPIRE_SECONDS)

def test_oob_as_html(self):
"""
Test out-of-band authentication.
"""
self.client.login(username="test_user", password="123456")

authcode_data = {
"client_id": self.application.client_id,
"state": "random_state_string",
"scope": "read write",
"redirect_uri": URI_OOB,
"response_type": "code",
"allow": True,
}

response = self.client.post(reverse("oauth2_provider:authorize"), data=authcode_data)
self.assertEqual(response.status_code, 200)
self.assertRegex(response['Content-Type'], r'^text/html')

content = response.content.decode("utf-8")

# "A lot of applications, for legacy reasons, use this and regex
# to extract the token, risking summoning zalgo in the process."
# -- https://github.com/jazzband/django-oauth-toolkit/issues/235

matches = re.search(r'.*<code>([^<>]*)</code>',
content)
self.assertIsNotNone(matches,
msg="OOB response contains code inside <code> tag")
self.assertEqual(len(matches.groups()), 1,
msg="OOB response contains multiple <code> tags")
authorization_code = matches.groups()[0]

token_request_data = {
"grant_type": "authorization_code",
"code": authorization_code,
"redirect_uri": URI_OOB,
"client_id": self.application.client_id,
"client_secret": self.application.client_secret,
}

response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data)
self.assertEqual(response.status_code, 200)

content = json.loads(response.content.decode("utf-8"))
self.assertEqual(content["token_type"], "Bearer")
self.assertEqual(content["scope"], "read write")
self.assertEqual(content["expires_in"], oauth2_settings.ACCESS_TOKEN_EXPIRE_SECONDS)

def test_oob_as_json(self):
"""
Test out-of-band authentication, with a JSON response.
"""
self.client.login(username="test_user", password="123456")

authcode_data = {
"client_id": self.application.client_id,
"state": "random_state_string",
"scope": "read write",
"redirect_uri": URI_OOB_AUTO,
"response_type": "code",
"allow": True,
}

response = self.client.post(reverse("oauth2_provider:authorize"), data=authcode_data)
self.assertEqual(response.status_code, 200)
self.assertRegex(response['Content-Type'], '^application/json')

parsed_response = json.loads(response.content.decode("utf-8"))

self.assertIn('access_token', parsed_response)
authorization_code = parsed_response['access_token']

token_request_data = {
"grant_type": "authorization_code",
"code": authorization_code,
"redirect_uri": URI_OOB_AUTO,
"client_id": self.application.client_id,
"client_secret": self.application.client_secret,
}

response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data)
self.assertEqual(response.status_code, 200)

content = json.loads(response.content.decode("utf-8"))
self.assertEqual(content["token_type"], "Bearer")
self.assertEqual(content["scope"], "read write")
self.assertEqual(content["expires_in"], oauth2_settings.ACCESS_TOKEN_EXPIRE_SECONDS)

class TestAuthorizationCodeProtectedResource(BaseTest):
def test_resource_access_allowed(self):
Expand Down