Skip to content

Commit

Permalink
fixup! Synced implementation of token_endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
tpazderka committed Mar 4, 2019
1 parent 87965a0 commit 371923b
Show file tree
Hide file tree
Showing 4 changed files with 364 additions and 188 deletions.
5 changes: 3 additions & 2 deletions src/oic/oauth2/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -769,18 +769,19 @@ def token_scope_check(self, areq, info):
"""Not implemented here."""
return None

def token_endpoint(self, request='', authn='', dtype='urlencoded', **kwargs):
def token_endpoint(self, request='', authn='', dtype='urlencoded', atr_class=AccessTokenRequest, **kwargs):
"""
Provide clients with access tokens.
:param authn: Auhentication info, comes from HTTP header.
:param request: The request.
:param dtype: deserialization method for the request.
:param atr_class: Class used for AcessTokenRequest deserialization
"""
logger.debug("- token -")
logger.debug("token_request: %s" % sanitize(request))

areq = AccessTokenRequest().deserialize(request, dtype)
areq = atr_class().deserialize(request, dtype)

# Verify client authentication
try:
Expand Down
19 changes: 12 additions & 7 deletions src/oic/oic/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
from oic.oic import claims_match
from oic.oic import scope2claims
from oic.oic.message import SCOPE2CLAIMS
from oic.oic.message import AccessTokenRequest
from oic.oic.message import AccessTokenResponse
from oic.oic.message import AuthorizationRequest
from oic.oic.message import AuthorizationResponse
Expand Down Expand Up @@ -83,6 +84,7 @@
from oic.utils.sdb import AccessCodeUsed
from oic.utils.sdb import AuthnEvent
from oic.utils.sdb import ExpiredToken
from oic.utils.sdb import WrongTokenType
from oic.utils.template_render import render_template
from oic.utils.time_util import utc_time_sans_frac

Expand Down Expand Up @@ -958,6 +960,10 @@ def sign_encrypt_id_token(self, sinfo, client_info, areq, code=None,

return id_token

def token_endpoint(self, request='', authn='', dtype='urlencoded', atr_class=AccessTokenRequest, **kwargs):
"""Overloaded to use the oic.message."""
super().token_endpoint(request=request, authn=authn, dtype=dtype, atr_class=atr_class, **kwargs)

def code_grant_type(self, areq):
"""
Token authorization using Code Grant.
Expand All @@ -984,13 +990,10 @@ def code_grant_type(self, areq):
except KeyError:
return error_response("invalid_request", descr="Code is invalid")

# If redirect_uri was in the initial authorization request
# verify that the one given here is the correct one.
if "redirect_uri" in _info:
if 'redirect_uri' not in areq:
return error_response('invalid_request', descr='Missing redirect_uri')
if areq["redirect_uri"] != _info["redirect_uri"]:
return error_response("invalid_request", descr="redirect_uri mismatch")
# If redirect_uri was in the initial authorization request verify that it is here as well
# Mismatch would raise in oic.oauth2.provider.Provider.token_endpoint
if "redirect_uri" in _info and 'redirect_uri' not in areq:
return error_response('invalid_request', descr='Missing redirect_uri')

_log_debug("All checks OK")

Expand Down Expand Up @@ -1045,6 +1048,8 @@ def refresh_token_grant_type(self, areq):
_info = _sdb.refresh_token(rtoken, client_id=client_id)
except ExpiredToken:
return error_response("invalid_request", descr="Refresh token is expired")
except WrongTokenType:
return error_response("invalid_request", descr="Not a refresh token")

if "openid" in _info["scope"] and "authn_event" in _info:
userinfo = self.userinfo_in_id_token_claims(_info)
Expand Down
256 changes: 256 additions & 0 deletions tests/test_oauth2_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,23 @@
import pytest
from testfixtures import LogCapture

from oic.exception import UnSupported
from oic.oauth2.consumer import Consumer
from oic.oauth2.message import AccessTokenRequest
from oic.oauth2.message import AccessTokenResponse
from oic.oauth2.message import AuthorizationRequest
from oic.oauth2.message import AuthorizationResponse
from oic.oauth2.message import CCAccessTokenRequest
from oic.oauth2.message import Message
from oic.oauth2.message import ROPCAccessTokenRequest
from oic.oauth2.message import TokenErrorResponse
from oic.oauth2.provider import Provider
from oic.utils.authn.authn_context import AuthnBroker
from oic.utils.authn.client import verify_client
from oic.utils.authn.user import UserAuthnMethod
from oic.utils.authz import Implicit
from oic.utils.http_util import Response
from oic.utils.sdb import AuthnEvent

CLIENT_CONFIG = {
"client_id": "client1",
Expand Down Expand Up @@ -53,6 +58,12 @@
"redirect_uris": [("http://localhost:8087/authz", None)],
'token_endpoint_auth_method': 'client_secret_post',
'response_types': ['code', 'token']
},
"client2": {
"client_secret": "verysecret",
"redirect_uris": [("http://localhost:8087/authz", None)],
'token_endpoint_auth_method': 'client_secret_basic',
'response_types': ['code', 'token']
}
}

Expand Down Expand Up @@ -340,6 +351,251 @@ def test_token_endpoint_unauth(self):
atr = TokenErrorResponse().deserialize(resp.message, "json")
assert _eq(atr.keys(), ['error_description', 'error'])

def test_token_endpoint_malformed_code(self):
authreq = AuthorizationRequest(state="state",
redirect_uri="http://example.com/authz",
client_id='client1',
response_type="code",
scope=["openid"])

_sdb = self.provider.sdb
sid = _sdb.access_token.key(user="sub", areq=authreq)
access_grant = _sdb.access_token(sid=sid)
_sdb[sid] = {
"oauth_state": "authz",
"authn_event": '',
"authzreq": '',
"client_id": 'client1',
"code": access_grant,
"code_used": False,
"scope": ["openid"],
"redirect_uri": "http://example.com/authz",
}

# Construct Access token request
areq = AccessTokenRequest(code=access_grant[0:len(access_grant) - 1],
client_id='client1',
redirect_uri="http://example.com/authz",
client_secret='hemlighet',
grant_type='authorization_code')

txt = areq.to_urlencoded()

resp = self.provider.token_endpoint(request=txt)
atr = TokenErrorResponse().deserialize(resp.message, "json")
assert atr['error'] == "unauthorized_client"

def test_token_endpoint_bad_redirect_uri(self):
authreq = AuthorizationRequest(state="state",
redirect_uri="http://example.com/authz",
client_id='client1',
response_type="code",
scope=["openid"])

_sdb = self.provider.sdb
sid = _sdb.access_token.key(user="sub", areq=authreq)
access_grant = _sdb.access_token(sid=sid)
_sdb[sid] = {
"oauth_state": "authz",
"authn_event": '',
"authzreq": '',
"client_id": 'client1',
"code": access_grant,
"code_used": False,
"scope": ["openid"],
"redirect_uri": "http://example.com/authz",
}

# Construct Access token request
areq = AccessTokenRequest(code=access_grant,
client_id='client1',
redirect_uri="http://example.com/authz2",
client_secret='hemlighet',
grant_type='authorization_code')

txt = areq.to_urlencoded()

resp = self.provider.token_endpoint(request=txt)
atr = TokenErrorResponse().deserialize(resp.message, "json")
assert atr['error'] == "unauthorized_client"

def test_token_endpoint_ok_state(self):
authreq = AuthorizationRequest(state="state",
redirect_uri="http://example.com/authz",
client_id='client1',
response_type="code",
scope=["openid"])

_sdb = self.provider.sdb
sid = _sdb.access_token.key(user="sub", areq=authreq)
access_grant = _sdb.access_token(sid=sid)
ae = AuthnEvent("user", "salt")
_sdb[sid] = {
"oauth_state": "authz",
"authn_event": ae.to_json(),
"authzreq": '',
"client_id": 'client1',
"code": access_grant,
'state': 'state',
"code_used": False,
"scope": ["openid"],
"redirect_uri": "http://example.com/authz",
}
_sdb.do_sub(sid, "client_salt")

# Construct Access token request
areq = AccessTokenRequest(code=access_grant,
client_id='client1',
redirect_uri="http://example.com/authz",
client_secret='hemlighet',
grant_type='authorization_code',
state='state')

txt = areq.to_urlencoded()

resp = self.provider.token_endpoint(request=txt)
atr = AccessTokenResponse().deserialize(resp.message, "json")
assert atr['token_type'] == "Bearer"

def test_token_endpoint_bad_state(self):
authreq = AuthorizationRequest(state="state",
redirect_uri="http://example.com/authz",
client_id='client1',
response_type="code",
scope=["openid"])

_sdb = self.provider.sdb
sid = _sdb.access_token.key(user="sub", areq=authreq)
access_grant = _sdb.access_token(sid=sid)
_sdb[sid] = {
"oauth_state": "authz",
"authn_event": '',
"authzreq": '',
"client_id": 'client1',
"code": access_grant,
'state': 'state',
"code_used": False,
"scope": ["openid"],
"redirect_uri": "http://example.com/authz",
}

# Construct Access token request
areq = AccessTokenRequest(code=access_grant,
client_id='client1',
redirect_uri="http://example.com/authz",
client_secret='hemlighet',
grant_type='authorization_code',
state='other_state')

txt = areq.to_urlencoded()

resp = self.provider.token_endpoint(request=txt)
atr = TokenErrorResponse().deserialize(resp.message, "json")
assert atr['error'] == "unauthorized_client"

def test_token_endpoint_client_credentials(self):
authreq = AuthorizationRequest(state="state",
redirect_uri="http://example.com/authz",
client_id="client1")

_sdb = self.provider.sdb
sid = _sdb.access_token.key(user="sub", areq=authreq)
access_grant = _sdb.access_token(sid=sid)
_sdb[sid] = {
"oauth_state": "authz",
"sub": "sub",
"authzreq": "",
"client_id": "client1",
"code": access_grant,
"code_used": False,
"redirect_uri": "http://example.com/authz",
'token_endpoint_auth_method': 'client_secret_basic',
}
areq = CCAccessTokenRequest(grant_type='client_credentials')
authn = 'Basic Y2xpZW50Mjp2ZXJ5c2VjcmV0='
with pytest.raises(NotImplementedError):
self.provider.token_endpoint(request=areq.to_urlencoded(), authn=authn)

def test_token_endpoint_password(self):
authreq = AuthorizationRequest(state="state",
redirect_uri="http://example.com/authz",
client_id="client1")

_sdb = self.provider.sdb
sid = _sdb.access_token.key(user="sub", areq=authreq)
access_grant = _sdb.access_token(sid=sid)
_sdb[sid] = {
"oauth_state": "authz",
"sub": "sub",
"authzreq": "",
"client_id": "client1",
"code": access_grant,
"code_used": False,
"redirect_uri": "http://example.com/authz",
'token_endpoint_auth_method': 'client_secret_basic',
}
areq = ROPCAccessTokenRequest(grant_type='password', username='client1', password='password')
authn = 'Basic Y2xpZW50Mjp2ZXJ5c2VjcmV0='
with pytest.raises(NotImplementedError):
self.provider.token_endpoint(request=areq.to_urlencoded(), authn=authn)

def test_token_endpoint_other(self):
authreq = AuthorizationRequest(state="state",
redirect_uri="http://example.com/authz",
client_id="client1")

_sdb = self.provider.sdb
sid = _sdb.access_token.key(user="sub", areq=authreq)
access_grant = _sdb.access_token(sid=sid)
_sdb[sid] = {
"oauth_state": "authz",
"sub": "sub",
"authzreq": "",
"client_id": "client1",
"code": access_grant,
"code_used": False,
"redirect_uri": "http://example.com/authz",
'token_endpoint_auth_method': 'client_secret_basic',
}
areq = Message(grant_type='some_other')
authn = 'Basic Y2xpZW50Mjp2ZXJ5c2VjcmV0='
with pytest.raises(UnSupported):
self.provider.token_endpoint(request=areq.to_urlencoded(), authn=authn)

def test_code_grant_type_used(self):
authreq = AuthorizationRequest(state="state",
redirect_uri="http://example.com/authz",
client_id='client1',
response_type="code",
scope=["openid"])

_sdb = self.provider.sdb
sid = _sdb.access_token.key(user="sub", areq=authreq)
access_grant = _sdb.access_token(sid=sid)
_sdb[sid] = {
"oauth_state": "authz",
"authn_event": '',
"authzreq": '',
"client_id": 'client1',
"code": access_grant,
"code_used": True,
"scope": ["openid"],
"redirect_uri": "http://example.com/authz",
}

# Construct Access token request
areq = AccessTokenRequest(code=access_grant,
client_id='client1',
redirect_uri="http://example.com/authz",
client_secret='hemlighet',
grant_type='authorization_code')

txt = areq.to_urlencoded()

resp = self.provider.token_endpoint(request=txt)
atr = TokenErrorResponse().deserialize(resp.message, "json")
assert atr['error'] == "invalid_grant"

@pytest.mark.parametrize("response_types", [
['token id_token', 'id_token'],
['id_token token']
Expand Down
Loading

0 comments on commit 371923b

Please sign in to comment.