Skip to content

Commit

Permalink
Add support for multiple security schemes in AND fashion (#1290)
Browse files Browse the repository at this point in the history
* Add support for multiple security schemes in AND fashion

* Add test for operation with multiple security schemes combined using logical AND

* Add test for multiple oauth combined using logical AND
  • Loading branch information
Ruwann authored Jul 16, 2021
1 parent a8375a1 commit 3195bec
Show file tree
Hide file tree
Showing 5 changed files with 257 additions and 61 deletions.
134 changes: 73 additions & 61 deletions connexion/operations/secure.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,72 +80,84 @@ def security_decorator(self):
if not security_req:
auth_funcs.append(self._api.security_handler_factory.verify_none())
continue
elif len(security_req) > 1:
logger.warning("... More than one security scheme in security requirement defined. "
"**DENYING ALL REQUESTS**", extra=vars(self))
return self._api.security_handler_factory.security_deny

scheme_name, scopes = next(iter(security_req.items()))
security_scheme = self.security_schemes[scheme_name]

if security_scheme['type'] == 'oauth2':
required_scopes = scopes
token_info_func = self._api.security_handler_factory.get_tokeninfo_func(security_scheme)
scope_validate_func = self._api.security_handler_factory.get_scope_validate_func(security_scheme)
if not token_info_func:
logger.warning("... x-tokenInfoFunc missing", extra=vars(self))
continue

auth_funcs.append(self._api.security_handler_factory.verify_oauth(token_info_func, scope_validate_func))

# Swagger 2.0
elif security_scheme['type'] == 'basic':
basic_info_func = self._api.security_handler_factory.get_basicinfo_func(security_scheme)
if not basic_info_func:
logger.warning("... x-basicInfoFunc missing", extra=vars(self))
continue

auth_funcs.append(self._api.security_handler_factory.verify_basic(basic_info_func))

# OpenAPI 3.0.0
elif security_scheme['type'] == 'http':
scheme = security_scheme['scheme'].lower()
if scheme == 'basic':

sec_req_funcs = {}
oauth = False
for scheme_name, scopes in security_req.items():
security_scheme = self.security_schemes[scheme_name]

if security_scheme['type'] == 'oauth2':
if oauth:
logger.warning("... multiple OAuth2 security schemes in AND fashion not supported", extra=vars(self))
break
oauth = True
required_scopes = scopes
token_info_func = self._api.security_handler_factory.get_tokeninfo_func(security_scheme)
scope_validate_func = self._api.security_handler_factory.get_scope_validate_func(security_scheme)
if not token_info_func:
logger.warning("... x-tokenInfoFunc missing", extra=vars(self))
break

sec_req_funcs[scheme_name] = self._api.security_handler_factory.verify_oauth(
token_info_func, scope_validate_func)

# Swagger 2.0
elif security_scheme['type'] == 'basic':
basic_info_func = self._api.security_handler_factory.get_basicinfo_func(security_scheme)
if not basic_info_func:
logger.warning("... x-basicInfoFunc missing", extra=vars(self))
continue

auth_funcs.append(self._api.security_handler_factory.verify_basic(basic_info_func))
elif scheme == 'bearer':
bearer_info_func = self._api.security_handler_factory.get_bearerinfo_func(security_scheme)
if not bearer_info_func:
logger.warning("... x-bearerInfoFunc missing", extra=vars(self))
continue
auth_funcs.append(self._api.security_handler_factory.verify_bearer(bearer_info_func))
else:
logger.warning("... Unsupported http authorization scheme %s" % scheme, extra=vars(self))

elif security_scheme['type'] == 'apiKey':
scheme = security_scheme.get('x-authentication-scheme', '').lower()
if scheme == 'bearer':
bearer_info_func = self._api.security_handler_factory.get_bearerinfo_func(security_scheme)
if not bearer_info_func:
logger.warning("... x-bearerInfoFunc missing", extra=vars(self))
continue
auth_funcs.append(self._api.security_handler_factory.verify_bearer(bearer_info_func))
else:
apikey_info_func = self._api.security_handler_factory.get_apikeyinfo_func(security_scheme)
if not apikey_info_func:
logger.warning("... x-apikeyInfoFunc missing", extra=vars(self))
continue

auth_funcs.append(self._api.security_handler_factory.verify_api_key(
apikey_info_func, security_scheme['in'], security_scheme['name']
))
break

sec_req_funcs[scheme_name] = self._api.security_handler_factory.verify_basic(basic_info_func)

# OpenAPI 3.0.0
elif security_scheme['type'] == 'http':
scheme = security_scheme['scheme'].lower()
if scheme == 'basic':
basic_info_func = self._api.security_handler_factory.get_basicinfo_func(security_scheme)
if not basic_info_func:
logger.warning("... x-basicInfoFunc missing", extra=vars(self))
break

sec_req_funcs[scheme_name] = self._api.security_handler_factory.verify_basic(basic_info_func)
elif scheme == 'bearer':
bearer_info_func = self._api.security_handler_factory.get_bearerinfo_func(security_scheme)
if not bearer_info_func:
logger.warning("... x-bearerInfoFunc missing", extra=vars(self))
break
sec_req_funcs[scheme_name] = self._api.security_handler_factory.verify_bearer(bearer_info_func)
else:
logger.warning("... Unsupported http authorization scheme %s" % scheme, extra=vars(self))
break

elif security_scheme['type'] == 'apiKey':
scheme = security_scheme.get('x-authentication-scheme', '').lower()
if scheme == 'bearer':
bearer_info_func = self._api.security_handler_factory.get_bearerinfo_func(security_scheme)
if not bearer_info_func:
logger.warning("... x-bearerInfoFunc missing", extra=vars(self))
break
sec_req_funcs[scheme_name] = self._api.security_handler_factory.verify_bearer(bearer_info_func)
else:
apikey_info_func = self._api.security_handler_factory.get_apikeyinfo_func(security_scheme)
if not apikey_info_func:
logger.warning("... x-apikeyInfoFunc missing", extra=vars(self))
break

sec_req_funcs[scheme_name] = self._api.security_handler_factory.verify_api_key(
apikey_info_func, security_scheme['in'], security_scheme['name']
)

else:
logger.warning("... Unsupported security scheme type %s" % security_scheme['type'], extra=vars(self))
break
else:
logger.warning("... Unsupported security scheme type %s" % security_scheme['type'], extra=vars(self))
# No break encountered: no missing funcs
if len(sec_req_funcs) == 1:
(func,) = sec_req_funcs.values()
auth_funcs.append(func)
else:
auth_funcs.append(self._api.security_handler_factory.verify_multiple_schemes(sec_req_funcs))

return functools.partial(self._api.security_handler_factory.verify_security, auth_funcs, required_scopes)

Expand Down
22 changes: 22 additions & 0 deletions connexion/security/security_handler_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,28 @@ def wrapper(request, required_scopes):

return wrapper

def verify_multiple_schemes(self, schemes):
"""
Verifies multiple authentication schemes in AND fashion.
If any scheme fails, the entire authentication fails.
:param schemes: mapping scheme_name to auth function
:type schemes: dict
:rtype: types.FunctionType
"""

def wrapper(request, required_scopes):
token_info = {}
for scheme_name, func in schemes.items():
result = func(request, required_scopes)
if result is self.no_value:
return self.no_value
token_info[scheme_name] = result

return token_info

return wrapper

@staticmethod
def verify_none():
"""
Expand Down
13 changes: 13 additions & 0 deletions docs/security.rst
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,19 @@ semantics as for ``x-tokenInfoFunc``, but the function accepts one parameter: to

You can find a `minimal JWT example application`_ in Connexion's "examples/openapi3" folder.

Multiple Authentication Schemes
-------------------------------

With Connexion, it is also possible to combine multiple authentication schemes
as described in the `OpenAPI specification`_. When multiple authentication
schemes are combined using logical AND, the ``token_info`` argument will
consist of a dictionary mapping the names of the security scheme to their
corresponding ``token_info``.

Multiple OAuth2 security schemes in AND fashion are not supported.

.. _OpenAPI specification: https://swagger.io/docs/specification/authentication/#multiple

Deploying Authentication
------------------------

Expand Down
43 changes: 43 additions & 0 deletions tests/decorators/test_security.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,49 @@ def apikey_info(apikey, required_scopes=None):
assert wrapped_func(request, ['admin']) is not None


def test_multiple_schemes(security_handler_factory):
def apikey1_info(apikey, required_scopes=None):
if apikey == 'foobar':
return {'sub': 'foo'}
return None
def apikey2_info(apikey, required_scopes=None):
if apikey == 'bar':
return {'sub': 'bar'}
return None

wrapped_func_key1 = security_handler_factory.verify_api_key(apikey1_info, 'header', 'X-Auth-1')
wrapped_func_key2 = security_handler_factory.verify_api_key(apikey2_info, 'header', 'X-Auth-2')
schemes = {
'key1': wrapped_func_key1,
'key2': wrapped_func_key2,
}
wrapped_func = security_handler_factory.verify_multiple_schemes(schemes)

# Single key does not succeed
request = MagicMock()
request.headers = {"X-Auth-1": 'foobar'}

assert wrapped_func(request, ['admin']) is security_handler_factory.no_value

request = MagicMock()
request.headers = {"X-Auth-2": 'bar'}

assert wrapped_func(request, ['admin']) is security_handler_factory.no_value

# Supplying both keys does succeed
request = MagicMock()
request.headers = {
"X-Auth-1": 'foobar',
"X-Auth-2": 'bar'
}

expected_token_info = {
'key1': {'sub': 'foo'},
'key2': {'sub': 'bar'},
}
assert wrapped_func(request, ['admin']) == expected_token_info


def test_verify_security_oauthproblem(security_handler_factory):
"""Tests whether verify_security raises an OAuthProblem if there are no auth_funcs."""
func_to_secure = MagicMock(return_value='func')
Expand Down
106 changes: 106 additions & 0 deletions tests/test_operation2.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import copy
import logging
import math
import pathlib
import types
Expand Down Expand Up @@ -204,6 +205,16 @@
'security': [{'oauth': ['uid']}],
'summary': 'Create new stack'}

OPERATION9 = {'description': 'operation secured with 2 api keys',
'operationId': 'fakeapi.hello.post_greeting',
'responses': {'200': {'description': 'OK'}},
'security': [{'key1': [], 'key2': []}]}

OPERATION10 = {'description': 'operation secured with 2 oauth schemes combined using logical AND',
'operationId': 'fakeapi.hello.post_greeting',
'responses': {'200': {'description': 'OK'}},
'security': [{'oauth_1': ['uid'], 'oauth_2': ['uid']}]}

SECURITY_DEFINITIONS_REMOTE = {'oauth': {'type': 'oauth2',
'flow': 'password',
'x-tokenInfoUrl': 'https://oauth.example/token_info',
Expand All @@ -224,6 +235,24 @@
'flow': 'password',
'scopes': {'myscope': 'can do stuff'}}}

SECURITY_DEFINITIONS_2_KEYS = {'key1': {'type': 'apiKey',
'in': 'header',
'name': 'X-Auth-1',
'x-apikeyInfoFunc': 'math.ceil'},
'key2': {'type': 'apiKey',
'in': 'header',
'name': 'X-Auth-2',
'x-apikeyInfoFunc': 'math.ceil'}}

SECURITY_DEFINITIONS_2_OAUTH = {'oauth_1': {'type': 'oauth2',
'flow': 'password',
'x-tokenInfoFunc': 'math.ceil',
'scopes': {'myscope': 'can do stuff'}},
'oauth_2': {'type': 'oauth2',
'flow': 'password',
'x-tokenInfoFunc': 'math.ceil',
'scopes': {'myscope': 'can do stuff'}}}


@pytest.fixture
def api(security_handler_factory):
Expand Down Expand Up @@ -452,6 +481,83 @@ def test_no_token_info(api):
assert operation.body_schema == expected_body_schema


def test_multiple_security_schemes_and(api):
"""Tests an operation with multiple security schemes in AND fashion."""
def return_api_key_name(func, in_, name):
return name
verify_api_key = mock.MagicMock(side_effect=return_api_key_name)
api.security_handler_factory.verify_api_key = verify_api_key
verify_multiple = mock.MagicMock(return_value='verify_multiple_result')
api.security_handler_factory.verify_multiple_schemes = verify_multiple

op_spec = make_operation(OPERATION9)
operation = Swagger2Operation(api=api,
method='GET',
path='endpoint',
path_parameters=[],
operation=op_spec,
app_produces=['application/json'],
app_consumes=['application/json'],
app_security=SECURITY_DEFINITIONS_2_KEYS,
security_definitions=SECURITY_DEFINITIONS_2_KEYS,
definitions=DEFINITIONS,
parameter_definitions=PARAMETER_DEFINITIONS,
resolver=Resolver())
assert isinstance(operation.function, types.FunctionType)
assert verify_api_key.call_count == 2
verify_api_key.assert_any_call(math.ceil, 'header', 'X-Auth-1')
verify_api_key.assert_any_call(math.ceil, 'header', 'X-Auth-2')
# Assert verify_multiple_schemes is called with mapping from scheme name
# to result of security_handler_factory.verify_api_key()
verify_multiple.assert_called_with({'key1': 'X-Auth-1', 'key2': 'X-Auth-2'})

security_decorator = operation.security_decorator
assert len(security_decorator.args[0]) == 1
assert security_decorator.args[0][0] == 'verify_multiple_result'
assert security_decorator.args[1] is None

assert operation.method == 'GET'
assert operation.produces == ['application/json']
assert operation.consumes == ['application/json']
assert operation.security == [{'key1': [], 'key2': []}]


def test_multiple_oauth_in_and(api, caplog):
"""Tests an operation with multiple oauth security schemes in AND fashion.
These should be ignored and raise a warning.
"""
caplog.set_level(logging.WARNING, logger="connexion.operations.secure")
verify_oauth = mock.MagicMock(return_value='verify_oauth_result')
api.security_handler_factory.verify_oauth = verify_oauth

op_spec = make_operation(OPERATION10)
operation = Swagger2Operation(api=api,
method='GET',
path='endpoint',
path_parameters=[],
operation=op_spec,
app_produces=['application/json'],
app_consumes=['application/json'],
app_security=SECURITY_DEFINITIONS_2_OAUTH,
security_definitions=SECURITY_DEFINITIONS_2_OAUTH,
definitions=DEFINITIONS,
parameter_definitions=PARAMETER_DEFINITIONS,
resolver=Resolver())
assert isinstance(operation.function, types.FunctionType)

security_decorator = operation.security_decorator
assert len(security_decorator.args[0]) == 0
assert security_decorator.args[0] == []
assert security_decorator.args[1] == ['uid']

assert '... multiple OAuth2 security schemes in AND fashion not supported' in caplog.text

assert operation.method == 'GET'
assert operation.produces == ['application/json']
assert operation.consumes == ['application/json']
assert operation.security == [{'oauth_1': ['uid'], 'oauth_2': ['uid']}]


def test_parameter_reference(api):
op_spec = make_operation(OPERATION3, definitions=False)
operation = Swagger2Operation(api=api,
Expand Down

0 comments on commit 3195bec

Please sign in to comment.