From 371923b40054d05ba42486be22ed23839161d97f Mon Sep 17 00:00:00 2001 From: Tomas Pazderka Date: Sun, 3 Mar 2019 22:33:41 +0100 Subject: [PATCH] fixup! Synced implementation of token_endpoint --- src/oic/oauth2/provider.py | 5 +- src/oic/oic/provider.py | 19 ++- tests/test_oauth2_provider.py | 256 ++++++++++++++++++++++++++++++++ tests/test_oic_provider.py | 272 ++++++++++++---------------------- 4 files changed, 364 insertions(+), 188 deletions(-) diff --git a/src/oic/oauth2/provider.py b/src/oic/oauth2/provider.py index 3c26ffdec..ba11f5577 100644 --- a/src/oic/oauth2/provider.py +++ b/src/oic/oauth2/provider.py @@ -769,18 +769,19 @@ def token_scope_check(self, areq, info): """Not implemented here.""" return None - def token_endpoint(self, request='', authn='', dtype='urlencoded', **kwargs): + def token_endpoint(self, request='', authn='', dtype='urlencoded', atr_class=AccessTokenRequest, **kwargs): """ Provide clients with access tokens. :param authn: Auhentication info, comes from HTTP header. :param request: The request. :param dtype: deserialization method for the request. + :param atr_class: Class used for AcessTokenRequest deserialization """ logger.debug("- token -") logger.debug("token_request: %s" % sanitize(request)) - areq = AccessTokenRequest().deserialize(request, dtype) + areq = atr_class().deserialize(request, dtype) # Verify client authentication try: diff --git a/src/oic/oic/provider.py b/src/oic/oic/provider.py index 7bdde0c25..fb6aa32f2 100644 --- a/src/oic/oic/provider.py +++ b/src/oic/oic/provider.py @@ -54,6 +54,7 @@ from oic.oic import claims_match from oic.oic import scope2claims from oic.oic.message import SCOPE2CLAIMS +from oic.oic.message import AccessTokenRequest from oic.oic.message import AccessTokenResponse from oic.oic.message import AuthorizationRequest from oic.oic.message import AuthorizationResponse @@ -83,6 +84,7 @@ from oic.utils.sdb import AccessCodeUsed from oic.utils.sdb import AuthnEvent from oic.utils.sdb import ExpiredToken +from oic.utils.sdb import WrongTokenType from oic.utils.template_render import render_template from oic.utils.time_util import utc_time_sans_frac @@ -958,6 +960,10 @@ def sign_encrypt_id_token(self, sinfo, client_info, areq, code=None, return id_token + def token_endpoint(self, request='', authn='', dtype='urlencoded', atr_class=AccessTokenRequest, **kwargs): + """Overloaded to use the oic.message.""" + super().token_endpoint(request=request, authn=authn, dtype=dtype, atr_class=atr_class, **kwargs) + def code_grant_type(self, areq): """ Token authorization using Code Grant. @@ -984,13 +990,10 @@ def code_grant_type(self, areq): except KeyError: return error_response("invalid_request", descr="Code is invalid") - # If redirect_uri was in the initial authorization request - # verify that the one given here is the correct one. - if "redirect_uri" in _info: - if 'redirect_uri' not in areq: - return error_response('invalid_request', descr='Missing redirect_uri') - if areq["redirect_uri"] != _info["redirect_uri"]: - return error_response("invalid_request", descr="redirect_uri mismatch") + # If redirect_uri was in the initial authorization request verify that it is here as well + # Mismatch would raise in oic.oauth2.provider.Provider.token_endpoint + if "redirect_uri" in _info and 'redirect_uri' not in areq: + return error_response('invalid_request', descr='Missing redirect_uri') _log_debug("All checks OK") @@ -1045,6 +1048,8 @@ def refresh_token_grant_type(self, areq): _info = _sdb.refresh_token(rtoken, client_id=client_id) except ExpiredToken: return error_response("invalid_request", descr="Refresh token is expired") + except WrongTokenType: + return error_response("invalid_request", descr="Not a refresh token") if "openid" in _info["scope"] and "authn_event" in _info: userinfo = self.userinfo_in_id_token_claims(_info) diff --git a/tests/test_oauth2_provider.py b/tests/test_oauth2_provider.py index 1d2b9cfb9..a31167f31 100644 --- a/tests/test_oauth2_provider.py +++ b/tests/test_oauth2_provider.py @@ -7,11 +7,15 @@ import pytest from testfixtures import LogCapture +from oic.exception import UnSupported from oic.oauth2.consumer import Consumer from oic.oauth2.message import AccessTokenRequest from oic.oauth2.message import AccessTokenResponse from oic.oauth2.message import AuthorizationRequest from oic.oauth2.message import AuthorizationResponse +from oic.oauth2.message import CCAccessTokenRequest +from oic.oauth2.message import Message +from oic.oauth2.message import ROPCAccessTokenRequest from oic.oauth2.message import TokenErrorResponse from oic.oauth2.provider import Provider from oic.utils.authn.authn_context import AuthnBroker @@ -19,6 +23,7 @@ from oic.utils.authn.user import UserAuthnMethod from oic.utils.authz import Implicit from oic.utils.http_util import Response +from oic.utils.sdb import AuthnEvent CLIENT_CONFIG = { "client_id": "client1", @@ -53,6 +58,12 @@ "redirect_uris": [("http://localhost:8087/authz", None)], 'token_endpoint_auth_method': 'client_secret_post', 'response_types': ['code', 'token'] + }, + "client2": { + "client_secret": "verysecret", + "redirect_uris": [("http://localhost:8087/authz", None)], + 'token_endpoint_auth_method': 'client_secret_basic', + 'response_types': ['code', 'token'] } } @@ -340,6 +351,251 @@ def test_token_endpoint_unauth(self): atr = TokenErrorResponse().deserialize(resp.message, "json") assert _eq(atr.keys(), ['error_description', 'error']) + def test_token_endpoint_malformed_code(self): + authreq = AuthorizationRequest(state="state", + redirect_uri="http://example.com/authz", + client_id='client1', + response_type="code", + scope=["openid"]) + + _sdb = self.provider.sdb + sid = _sdb.access_token.key(user="sub", areq=authreq) + access_grant = _sdb.access_token(sid=sid) + _sdb[sid] = { + "oauth_state": "authz", + "authn_event": '', + "authzreq": '', + "client_id": 'client1', + "code": access_grant, + "code_used": False, + "scope": ["openid"], + "redirect_uri": "http://example.com/authz", + } + + # Construct Access token request + areq = AccessTokenRequest(code=access_grant[0:len(access_grant) - 1], + client_id='client1', + redirect_uri="http://example.com/authz", + client_secret='hemlighet', + grant_type='authorization_code') + + txt = areq.to_urlencoded() + + resp = self.provider.token_endpoint(request=txt) + atr = TokenErrorResponse().deserialize(resp.message, "json") + assert atr['error'] == "unauthorized_client" + + def test_token_endpoint_bad_redirect_uri(self): + authreq = AuthorizationRequest(state="state", + redirect_uri="http://example.com/authz", + client_id='client1', + response_type="code", + scope=["openid"]) + + _sdb = self.provider.sdb + sid = _sdb.access_token.key(user="sub", areq=authreq) + access_grant = _sdb.access_token(sid=sid) + _sdb[sid] = { + "oauth_state": "authz", + "authn_event": '', + "authzreq": '', + "client_id": 'client1', + "code": access_grant, + "code_used": False, + "scope": ["openid"], + "redirect_uri": "http://example.com/authz", + } + + # Construct Access token request + areq = AccessTokenRequest(code=access_grant, + client_id='client1', + redirect_uri="http://example.com/authz2", + client_secret='hemlighet', + grant_type='authorization_code') + + txt = areq.to_urlencoded() + + resp = self.provider.token_endpoint(request=txt) + atr = TokenErrorResponse().deserialize(resp.message, "json") + assert atr['error'] == "unauthorized_client" + + def test_token_endpoint_ok_state(self): + authreq = AuthorizationRequest(state="state", + redirect_uri="http://example.com/authz", + client_id='client1', + response_type="code", + scope=["openid"]) + + _sdb = self.provider.sdb + sid = _sdb.access_token.key(user="sub", areq=authreq) + access_grant = _sdb.access_token(sid=sid) + ae = AuthnEvent("user", "salt") + _sdb[sid] = { + "oauth_state": "authz", + "authn_event": ae.to_json(), + "authzreq": '', + "client_id": 'client1', + "code": access_grant, + 'state': 'state', + "code_used": False, + "scope": ["openid"], + "redirect_uri": "http://example.com/authz", + } + _sdb.do_sub(sid, "client_salt") + + # Construct Access token request + areq = AccessTokenRequest(code=access_grant, + client_id='client1', + redirect_uri="http://example.com/authz", + client_secret='hemlighet', + grant_type='authorization_code', + state='state') + + txt = areq.to_urlencoded() + + resp = self.provider.token_endpoint(request=txt) + atr = AccessTokenResponse().deserialize(resp.message, "json") + assert atr['token_type'] == "Bearer" + + def test_token_endpoint_bad_state(self): + authreq = AuthorizationRequest(state="state", + redirect_uri="http://example.com/authz", + client_id='client1', + response_type="code", + scope=["openid"]) + + _sdb = self.provider.sdb + sid = _sdb.access_token.key(user="sub", areq=authreq) + access_grant = _sdb.access_token(sid=sid) + _sdb[sid] = { + "oauth_state": "authz", + "authn_event": '', + "authzreq": '', + "client_id": 'client1', + "code": access_grant, + 'state': 'state', + "code_used": False, + "scope": ["openid"], + "redirect_uri": "http://example.com/authz", + } + + # Construct Access token request + areq = AccessTokenRequest(code=access_grant, + client_id='client1', + redirect_uri="http://example.com/authz", + client_secret='hemlighet', + grant_type='authorization_code', + state='other_state') + + txt = areq.to_urlencoded() + + resp = self.provider.token_endpoint(request=txt) + atr = TokenErrorResponse().deserialize(resp.message, "json") + assert atr['error'] == "unauthorized_client" + + def test_token_endpoint_client_credentials(self): + authreq = AuthorizationRequest(state="state", + redirect_uri="http://example.com/authz", + client_id="client1") + + _sdb = self.provider.sdb + sid = _sdb.access_token.key(user="sub", areq=authreq) + access_grant = _sdb.access_token(sid=sid) + _sdb[sid] = { + "oauth_state": "authz", + "sub": "sub", + "authzreq": "", + "client_id": "client1", + "code": access_grant, + "code_used": False, + "redirect_uri": "http://example.com/authz", + 'token_endpoint_auth_method': 'client_secret_basic', + } + areq = CCAccessTokenRequest(grant_type='client_credentials') + authn = 'Basic Y2xpZW50Mjp2ZXJ5c2VjcmV0=' + with pytest.raises(NotImplementedError): + self.provider.token_endpoint(request=areq.to_urlencoded(), authn=authn) + + def test_token_endpoint_password(self): + authreq = AuthorizationRequest(state="state", + redirect_uri="http://example.com/authz", + client_id="client1") + + _sdb = self.provider.sdb + sid = _sdb.access_token.key(user="sub", areq=authreq) + access_grant = _sdb.access_token(sid=sid) + _sdb[sid] = { + "oauth_state": "authz", + "sub": "sub", + "authzreq": "", + "client_id": "client1", + "code": access_grant, + "code_used": False, + "redirect_uri": "http://example.com/authz", + 'token_endpoint_auth_method': 'client_secret_basic', + } + areq = ROPCAccessTokenRequest(grant_type='password', username='client1', password='password') + authn = 'Basic Y2xpZW50Mjp2ZXJ5c2VjcmV0=' + with pytest.raises(NotImplementedError): + self.provider.token_endpoint(request=areq.to_urlencoded(), authn=authn) + + def test_token_endpoint_other(self): + authreq = AuthorizationRequest(state="state", + redirect_uri="http://example.com/authz", + client_id="client1") + + _sdb = self.provider.sdb + sid = _sdb.access_token.key(user="sub", areq=authreq) + access_grant = _sdb.access_token(sid=sid) + _sdb[sid] = { + "oauth_state": "authz", + "sub": "sub", + "authzreq": "", + "client_id": "client1", + "code": access_grant, + "code_used": False, + "redirect_uri": "http://example.com/authz", + 'token_endpoint_auth_method': 'client_secret_basic', + } + areq = Message(grant_type='some_other') + authn = 'Basic Y2xpZW50Mjp2ZXJ5c2VjcmV0=' + with pytest.raises(UnSupported): + self.provider.token_endpoint(request=areq.to_urlencoded(), authn=authn) + + def test_code_grant_type_used(self): + authreq = AuthorizationRequest(state="state", + redirect_uri="http://example.com/authz", + client_id='client1', + response_type="code", + scope=["openid"]) + + _sdb = self.provider.sdb + sid = _sdb.access_token.key(user="sub", areq=authreq) + access_grant = _sdb.access_token(sid=sid) + _sdb[sid] = { + "oauth_state": "authz", + "authn_event": '', + "authzreq": '', + "client_id": 'client1', + "code": access_grant, + "code_used": True, + "scope": ["openid"], + "redirect_uri": "http://example.com/authz", + } + + # Construct Access token request + areq = AccessTokenRequest(code=access_grant, + client_id='client1', + redirect_uri="http://example.com/authz", + client_secret='hemlighet', + grant_type='authorization_code') + + txt = areq.to_urlencoded() + + resp = self.provider.token_endpoint(request=txt) + atr = TokenErrorResponse().deserialize(resp.message, "json") + assert atr['error'] == "invalid_grant" + @pytest.mark.parametrize("response_types", [ ['token id_token', 'id_token'], ['id_token token'] diff --git a/tests/test_oic_provider.py b/tests/test_oic_provider.py index 4219d68ed..4d996e2aa 100644 --- a/tests/test_oic_provider.py +++ b/tests/test_oic_provider.py @@ -33,6 +33,7 @@ from oic.oic.message import AuthorizationResponse from oic.oic.message import CheckSessionRequest from oic.oic.message import IdToken +from oic.oic.message import Message from oic.oic.message import OpenIDSchema from oic.oic.message import RefreshAccessTokenRequest from oic.oic.message import RegistrationRequest @@ -43,7 +44,6 @@ from oic.oic.provider import InvalidSectorIdentifier from oic.oic.provider import Provider from oic.utils.authn.authn_context import AuthnBroker -from oic.utils.authn.client import ClientSecretBasic from oic.utils.authn.client import verify_client from oic.utils.authn.user import UserAuthnMethod from oic.utils.authz import AuthzHandling @@ -418,7 +418,7 @@ def test_authenticated_none(self): parsed.path) == "http://localhost:8087/authz" assert "state" in parse_qs(parsed.query) - def test_token_endpoint(self): + def test_code_grant_type_ok(self): authreq = AuthorizationRequest(state="state", redirect_uri="http://example.com/authz", client_id=CLIENT_ID, @@ -439,22 +439,29 @@ def test_token_endpoint(self): "scope": ["openid"], "redirect_uri": "http://example.com/authz", } - _sdb.do_sub(sid, "client_salt") + _sdb.do_sub(sid, 'client_salt') # Construct Access token request areq = AccessTokenRequest(code=access_grant, client_id=CLIENT_ID, - redirect_uri="http://example.com/authz", + redirect_uri='http://example.com/authz', client_secret=CLIENT_SECRET, grant_type='authorization_code') + resp = self.provider.code_grant_type(areq) + atr = AccessTokenResponse().deserialize(resp.message, 'json') + assert _eq(atr.keys(), ['token_type', 'id_token', 'access_token', 'scope']) - txt = areq.to_urlencoded() - - resp = self.provider.token_endpoint(request=txt) - atr = AccessTokenResponse().deserialize(resp.message, "json") - assert _eq(atr.keys(), - ['token_type', 'id_token', 'access_token', 'scope']) + def test_code_grant_type_missing_code(self): + # Construct Access token request + areq = AccessTokenRequest(client_id=CLIENT_ID, + redirect_uri='http://example.com/authz', + client_secret=CLIENT_SECRET, + grant_type='authorization_code') + resp = self.provider.code_grant_type(areq) + atr = TokenErrorResponse().deserialize(resp.message, 'json') + assert atr['error'] == 'invalid_request' + assert atr['error_description'] == 'Missing code' - def test_token_endpoint_no_cache(self): + def test_code_grant_type_revoked(self): authreq = AuthorizationRequest(state="state", redirect_uri="http://example.com/authz", client_id=CLIENT_ID, @@ -471,31 +478,39 @@ def test_token_endpoint_no_cache(self): "authzreq": authreq.to_json(), "client_id": CLIENT_ID, "code": access_grant, - "code_used": False, + "revoked": True, "scope": ["openid"], "redirect_uri": "http://example.com/authz", } - _sdb.do_sub(sid, "client_salt") + _sdb.do_sub(sid, 'client_salt') # Construct Access token request areq = AccessTokenRequest(code=access_grant, client_id=CLIENT_ID, - redirect_uri="http://example.com/authz", + redirect_uri='http://example.com/authz', client_secret=CLIENT_SECRET, grant_type='authorization_code') + resp = self.provider.code_grant_type(areq) + atr = TokenErrorResponse().deserialize(resp.message, 'json') + assert atr['error'] == 'invalid_request' + assert atr['error_description'] == 'Token is revoked' - txt = areq.to_urlencoded() - - resp = self.provider.token_endpoint(request=txt) - assert resp.headers == [('Pragma', 'no-cache'), ('Cache-Control', 'no-store'), - ('Content-type', 'application/json')] + def test_code_grant_type_no_session(self): + # Construct Access token request + areq = AccessTokenRequest(code='some grant', client_id=CLIENT_ID, + redirect_uri='http://example.com/authz', + client_secret=CLIENT_SECRET, + grant_type='authorization_code') + resp = self.provider.code_grant_type(areq) + atr = TokenErrorResponse().deserialize(resp.message, 'json') + assert atr['error'] == 'invalid_request' + assert atr['error_description'] == 'Code is invalid' - def test_token_endpoint_refresh(self): + def test_code_grant_type_missing_redirect_uri(self): authreq = AuthorizationRequest(state="state", redirect_uri="http://example.com/authz", client_id=CLIENT_ID, response_type="code", - scope=["openid offline_access"], - prompt="consent") + scope=["openid"]) _sdb = self.provider.sdb sid = _sdb.access_token.key(user="sub", areq=authreq) @@ -508,26 +523,21 @@ def test_token_endpoint_refresh(self): "client_id": CLIENT_ID, "code": access_grant, "code_used": False, - "scope": ["openid", "offline_access"], + "scope": ["openid"], "redirect_uri": "http://example.com/authz", } - _sdb.do_sub(sid, "client_salt") + _sdb.do_sub(sid, 'client_salt') # Construct Access token request areq = AccessTokenRequest(code=access_grant, client_id=CLIENT_ID, - redirect_uri="http://example.com/authz", client_secret=CLIENT_SECRET, grant_type='authorization_code') + resp = self.provider.code_grant_type(areq) + atr = TokenErrorResponse().deserialize(resp.message, 'json') + assert atr['error'] == 'invalid_request' + assert atr['error_description'] == 'Missing redirect_uri' - txt = areq.to_urlencoded() - - resp = self.provider.token_endpoint(request=txt) - atr = AccessTokenResponse().deserialize(resp.message, "json") - assert _eq(atr.keys(), - ['token_type', 'id_token', 'access_token', 'scope', - 'refresh_token']) - - def test_token_endpoint_malformed(self): + def test_code_grant_type_used(self): authreq = AuthorizationRequest(state="state", redirect_uri="http://example.com/authz", client_id=CLIENT_ID, @@ -544,31 +554,29 @@ def test_token_endpoint_malformed(self): "authzreq": authreq.to_json(), "client_id": CLIENT_ID, "code": access_grant, - "code_used": False, + "code_used": True, "scope": ["openid"], "redirect_uri": "http://example.com/authz", } - _sdb.do_sub(sid, "client_salt") + _sdb.do_sub(sid, 'client_salt') # Construct Access token request - areq = AccessTokenRequest(code=access_grant[0:len(access_grant) - 1], - client_id=CLIENT_ID, - redirect_uri="http://example.com/authz", + areq = AccessTokenRequest(code=access_grant, client_id=CLIENT_ID, + redirect_uri='http://example.com/authz', client_secret=CLIENT_SECRET, grant_type='authorization_code') + resp = self.provider.code_grant_type(areq) + atr = TokenErrorResponse().deserialize(resp.message, 'json') + assert atr['error'] == 'access_denied' + assert atr['error_description'] == 'Access Code already used' - txt = areq.to_urlencoded() - - resp = self.provider.token_endpoint(request=txt) - atr = TokenErrorResponse().deserialize(resp.message, "json") - assert atr['error'] == "unauthorized_client" - - def test_token_endpoint_bad_code(self): + def test_code_grant_type_refresh(self): authreq = AuthorizationRequest(state="state", redirect_uri="http://example.com/authz", client_id=CLIENT_ID, response_type="code", - scope=["openid"]) + scope=["openid offline_access"], + prompt="consent") _sdb = self.provider.sdb sid = _sdb.access_token.key(user="sub", areq=authreq) @@ -581,94 +589,31 @@ def test_token_endpoint_bad_code(self): "client_id": CLIENT_ID, "code": access_grant, "code_used": False, - "scope": ["openid"], - "state": "state", + "scope": ["openid", "offline_access"], "redirect_uri": "http://example.com/authz", } _sdb.do_sub(sid, "client_salt") # Construct Access token request - areq = AccessTokenRequest(code='bad_code', - client_id=CLIENT_ID, + areq = AccessTokenRequest(code=access_grant, client_id=CLIENT_ID, redirect_uri="http://example.com/authz", client_secret=CLIENT_SECRET, - grant_type='authorization_code', - state="state") - - txt = areq.to_urlencoded() - - resp = self.provider.token_endpoint(request=txt) - atr = TokenErrorResponse().deserialize(resp.message, "json") - assert atr['error'] == "unauthorized_client" - - def test_token_endpoint_unauth(self): - state = 'state' - authreq = AuthorizationRequest(state=state, - redirect_uri="http://example.com/authz", - client_id="client_1") - - _sdb = self.provider.sdb - sid = _sdb.access_token.key(user="sub", areq=authreq) - access_grant = _sdb.access_token(sid=sid) - ae = AuthnEvent("user", "salt") - _sdb[sid] = { - "authn_event": ae.to_json(), - "oauth_state": "authz", - "authzreq": "", - "client_id": "client_1", - "code": access_grant, - "code_used": False, - "scope": ["openid"], - "redirect_uri": "http://example.com/authz", - 'state': state - } - _sdb.do_sub(sid, "client_salt") - - # Construct Access token request - areq = AccessTokenRequest(code=access_grant, - redirect_uri="http://example.com/authz", - client_id="client_1", - client_secret="secret", - state=state, grant_type='authorization_code') + resp = self.provider.code_grant_type(areq) + atr = AccessTokenResponse().deserialize(resp.message, "json") + assert _eq(atr.keys(), ['token_type', 'id_token', 'access_token', 'scope', 'refresh_token']) - txt = areq.to_urlencoded() - - resp = self.provider.token_endpoint(request=txt, remote_user="client2", - request_method="POST") - atr = TokenErrorResponse().deserialize(resp.message, "json") - assert atr["error"] == "unauthorized_client" - - def test_token_endpoint_auth(self): - state, location = self.cons.begin("openid", "code", - path="http://localhost:8087") - - resp = self.provider.authorization_endpoint( - request=urlparse(location).query) - - self.cons.parse_response(AuthorizationResponse, resp.message, - sformat="urlencoded") - - # Construct Access token request - areq = self.cons.construct_AccessTokenRequest( - redirect_uri="http://example.com/authz", - client_id="client_1", - client_secret='abcdefghijklmnop', - state=state) - - txt = areq.to_urlencoded() - self.cons.client_secret = 'drickyoughurt' - - csb = ClientSecretBasic(self.cons) - http_args = csb.construct(areq) - - resp = self.provider.token_endpoint(request=txt, remote_user="client2", - request_method="POST", - authn=http_args['headers'][ - 'Authorization']) + def test_client_credentials_grant_type(self): + resp = self.provider.client_credentials_grant_type(Message()) + parsed = ErrorResponse().from_json(resp.message) + assert parsed['error'] == 'invalid_request' + assert parsed['error_description'] == 'Unsupported grant_type' - atr = TokenErrorResponse().deserialize(resp.message, "json") - assert atr["token_type"] == 'Bearer' + def test_password_grant_type(self): + resp = self.provider.password_grant_type(Message()) + parsed = ErrorResponse().from_json(resp.message) + assert parsed['error'] == 'invalid_request' + assert parsed['error_description'] == 'Unsupported grant_type' def test_authz_endpoint(self): _state, location = self.cons.begin("openid", @@ -1662,7 +1607,7 @@ def test_id_token_RS512_sign(self): id_token = self._auth_with_id_token() assert id_token.jws_header['alg'] == "RS512" - def test_refresh_access_token_request(self): + def test_refresh_token_grant_type_ok(self): authreq = AuthorizationRequest(state="state", redirect_uri="http://example.com/authz", client_id=CLIENT_ID, @@ -1685,74 +1630,43 @@ def test_refresh_access_token_request(self): "redirect_uri": "http://example.com/authz", } _sdb.do_sub(sid, "client_salt") + info = _sdb.upgrade_to_token(access_grant, issue_refresh=True) - # Construct Access token request - areq = AccessTokenRequest(code=access_grant, client_id=CLIENT_ID, - redirect_uri="http://example.com/authz", - client_secret=CLIENT_SECRET, - grant_type='authorization_code') - - txt = areq.to_urlencoded() + rareq = RefreshAccessTokenRequest(grant_type="refresh_token", + refresh_token=info['refresh_token'], + client_id=CLIENT_ID, + client_secret=CLIENT_SECRET, + scope=['openid']) - resp = self.provider.token_endpoint(request=txt) + resp = self.provider.refresh_token_grant_type(rareq) atr = AccessTokenResponse().deserialize(resp.message, "json") + assert atr['refresh_token'] is not None + assert atr['token_type'] == 'Bearer' + def test_refresh_token_grant_type_wrong_token(self): rareq = RefreshAccessTokenRequest(grant_type="refresh_token", - refresh_token=atr['refresh_token'], + refresh_token='some_other_refresh_token', client_id=CLIENT_ID, client_secret=CLIENT_SECRET, scope=['openid']) - resp = self.provider.token_endpoint(request=rareq.to_urlencoded()) - atr2 = AccessTokenResponse().deserialize(resp.message, "json") - assert atr2['access_token'] != atr['access_token'] - assert atr2['refresh_token'] == atr['refresh_token'] - assert atr2['token_type'] == 'Bearer' - - def test_refresh_access_token_no_cache(self): - authreq = AuthorizationRequest(state="state", - redirect_uri="http://example.com/authz", - client_id=CLIENT_ID, - response_type="code", - scope=["openid", 'offline_access'], - prompt='consent') - - _sdb = self.provider.sdb - sid = _sdb.access_token.key(user="sub", areq=authreq) - access_grant = _sdb.access_token(sid=sid) - ae = AuthnEvent("user", "salt") - _sdb[sid] = { - "oauth_state": "authz", - "authn_event": ae.to_json(), - "authzreq": authreq.to_json(), - "client_id": CLIENT_ID, - "code": access_grant, - "code_used": False, - "scope": ["openid", 'offline_access'], - "redirect_uri": "http://example.com/authz", - } - _sdb.do_sub(sid, "client_salt") - - # Construct Access token request - areq = AccessTokenRequest(code=access_grant, client_id=CLIENT_ID, - redirect_uri="http://example.com/authz", - client_secret=CLIENT_SECRET, - grant_type='authorization_code') - - txt = areq.to_urlencoded() - - resp = self.provider.token_endpoint(request=txt) - atr = AccessTokenResponse().deserialize(resp.message, "json") + resp = self.provider.refresh_token_grant_type(rareq) + atr = TokenErrorResponse().deserialize(resp.message, "json") + assert atr['error'] == 'invalid_request' + assert atr['error_description'] == 'Not a refresh token' + def test_refresh_token_grant_type_expired(self): + # Missing refresh_token also raises Expired rareq = RefreshAccessTokenRequest(grant_type="refresh_token", - refresh_token=atr['refresh_token'], + refresh_token='Refresh_some_other_refresh_token', client_id=CLIENT_ID, client_secret=CLIENT_SECRET, scope=['openid']) - resp = self.provider.token_endpoint(request=rareq.to_urlencoded()) - assert resp.headers == [('Pragma', 'no-cache'), ('Cache-Control', 'no-store'), - ('Content-type', 'application/json')] + resp = self.provider.refresh_token_grant_type(rareq) + atr = TokenErrorResponse().deserialize(resp.message, "json") + assert atr['error'] == 'invalid_request' + assert atr['error_description'] == 'Refresh token is expired' def test_authorization_endpoint_faulty_request_uri(self): bib = {"scope": ["openid"],