diff --git a/oauth2_provider/templates/oauth2_provider/authorized-oob.html b/oauth2_provider/templates/oauth2_provider/authorized-oob.html new file mode 100644 index 000000000..78399da7c --- /dev/null +++ b/oauth2_provider/templates/oauth2_provider/authorized-oob.html @@ -0,0 +1,23 @@ +{% extends "oauth2_provider/base.html" %} + +{% load i18n %} + +{% block title %} +Success code={{code}} +{% endblock %} + +{% block content %} +
+ {% if not error %} +

{% trans "Success" %}

+ +

{% trans "Please return to your application and enter this code:" %}

+ +

{{ code }}

+ + {% else %} +

Error: {{ error.error }}

+

{{ error.description }}

+ {% endif %} +
+{% endblock %} diff --git a/oauth2_provider/views/base.py b/oauth2_provider/views/base.py index e236f9064..41c2a6c67 100644 --- a/oauth2_provider/views/base.py +++ b/oauth2_provider/views/base.py @@ -1,13 +1,16 @@ import json import logging +import urllib.parse from django.contrib.auth.mixins import LoginRequiredMixin -from django.http import HttpResponse +from django.http import HttpResponse, JsonResponse from django.utils import timezone from django.utils.decorators import method_decorator from django.views.decorators.csrf import csrf_exempt from django.views.decorators.debug import sensitive_post_parameters from django.views.generic import FormView, View +from django.shortcuts import render +from django.urls import reverse from ..exceptions import OAuthToolkitError from ..forms import AllowForm @@ -18,7 +21,6 @@ from ..signals import app_authorized from .mixins import OAuthLibMixin - log = logging.getLogger("oauth2_provider") @@ -59,6 +61,7 @@ def redirect(self, redirect_to, application): allowed_schemes = application.get_allowed_schemes() return OAuth2ResponseRedirect(redirect_to, allowed_schemes) +RFC3339 = '%Y-%m-%dT%H:%M:%SZ' class AuthorizationView(BaseAuthorizationView, FormView): """ @@ -204,13 +207,42 @@ def get(self, request, *args, **kwargs): request=self.request, scopes=" ".join(scopes), credentials=credentials, allow=True ) - return self.redirect(uri, application) + return self.redirect(uri, application, token) except OAuthToolkitError as error: return self.error_response(error, application) return self.render_to_response(self.get_context_data(**kwargs)) + def redirect(self, redirect_to, application, + token = None): + + if not redirect_to.startswith("urn:ietf:wg:oauth:2.0:oob"): + return super().redirect(redirect_to, application) + + parsed_redirect = urllib.parse.urlparse(redirect_to) + code = urllib.parse.parse_qs(parsed_redirect.query)['code'][0] + + if redirect_to.startswith('urn:ietf:wg:oauth:2.0:oob:auto'): + + response = { + 'access_token': code, + 'token_uri': redirect_to, + 'client_id': application.client_id, + 'client_secret': application.client_secret, + 'revoke_uri': reverse('oauth2_provider:revoke-token'), + } + + return JsonResponse(response) + + else: + return render( + request=self.request, + template_name="oauth2_provider/authorized-oob.html", + context={ + 'code': code, + }, + ) @method_decorator(csrf_exempt, name="dispatch") class TokenView(OAuthLibMixin, View): diff --git a/tests/test_authorization_code.py b/tests/test_authorization_code.py index 45116dad6..69dcfd93a 100644 --- a/tests/test_authorization_code.py +++ b/tests/test_authorization_code.py @@ -2,6 +2,7 @@ import datetime import hashlib import json +import re from urllib.parse import parse_qs, urlencode, urlparse from django.contrib.auth import get_user_model @@ -27,6 +28,8 @@ RefreshToken = get_refresh_token_model() UserModel = get_user_model() +URI_OOB = "urn:ietf:wg:oauth:2.0:oob" +URI_OOB_AUTO = "urn:ietf:wg:oauth:2.0:oob:auto" # mocking a protected resource view class ResourceView(ProtectedResourceView): @@ -46,6 +49,7 @@ def setUp(self): name="Test Application", redirect_uris=( "http://localhost http://example.com http://example.org custom-scheme://example.com" + " " + URI_OOB + " " + URI_OOB_AUTO ), user=self.dev_user, client_type=Application.CLIENT_CONFIDENTIAL, @@ -1456,6 +1460,94 @@ def test_code_exchange_succeed_when_redirect_uri_match_with_multiple_query_param self.assertEqual(content["scope"], "read write") self.assertEqual(content["expires_in"], oauth2_settings.ACCESS_TOKEN_EXPIRE_SECONDS) + def test_oob_as_html(self): + """ + Test out-of-band authentication. + """ + self.client.login(username="test_user", password="123456") + + authcode_data = { + "client_id": self.application.client_id, + "state": "random_state_string", + "scope": "read write", + "redirect_uri": URI_OOB, + "response_type": "code", + "allow": True, + } + + response = self.client.post(reverse("oauth2_provider:authorize"), data=authcode_data) + self.assertEqual(response.status_code, 200) + self.assertRegex(response['Content-Type'], r'^text/html') + + content = response.content.decode("utf-8") + + # "A lot of applications, for legacy reasons, use this and regex + # to extract the token, risking summoning zalgo in the process." + # -- https://github.com/jazzband/django-oauth-toolkit/issues/235 + + matches = re.search(r'.*([^<>]*)', + content) + self.assertIsNotNone(matches, + msg="OOB response contains code inside tag") + self.assertEqual(len(matches.groups()), 1, + msg="OOB response contains multiple tags") + authorization_code = matches.groups()[0] + + token_request_data = { + "grant_type": "authorization_code", + "code": authorization_code, + "redirect_uri": URI_OOB, + "client_id": self.application.client_id, + "client_secret": self.application.client_secret, + } + + response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data) + self.assertEqual(response.status_code, 200) + + content = json.loads(response.content.decode("utf-8")) + self.assertEqual(content["token_type"], "Bearer") + self.assertEqual(content["scope"], "read write") + self.assertEqual(content["expires_in"], oauth2_settings.ACCESS_TOKEN_EXPIRE_SECONDS) + + def test_oob_as_json(self): + """ + Test out-of-band authentication, with a JSON response. + """ + self.client.login(username="test_user", password="123456") + + authcode_data = { + "client_id": self.application.client_id, + "state": "random_state_string", + "scope": "read write", + "redirect_uri": URI_OOB_AUTO, + "response_type": "code", + "allow": True, + } + + response = self.client.post(reverse("oauth2_provider:authorize"), data=authcode_data) + self.assertEqual(response.status_code, 200) + self.assertRegex(response['Content-Type'], '^application/json') + + parsed_response = json.loads(response.content.decode("utf-8")) + + self.assertIn('access_token', parsed_response) + authorization_code = parsed_response['access_token'] + + token_request_data = { + "grant_type": "authorization_code", + "code": authorization_code, + "redirect_uri": URI_OOB_AUTO, + "client_id": self.application.client_id, + "client_secret": self.application.client_secret, + } + + response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data) + self.assertEqual(response.status_code, 200) + + content = json.loads(response.content.decode("utf-8")) + self.assertEqual(content["token_type"], "Bearer") + self.assertEqual(content["scope"], "read write") + self.assertEqual(content["expires_in"], oauth2_settings.ACCESS_TOKEN_EXPIRE_SECONDS) class TestAuthorizationCodeProtectedResource(BaseTest): def test_resource_access_allowed(self):