From 3195bec4e31e059af67536a09ea8df689e3aadbe Mon Sep 17 00:00:00 2001 From: Ruwann Date: Fri, 16 Jul 2021 08:46:15 +0200 Subject: [PATCH] Add support for multiple security schemes in AND fashion (#1290) * Add support for multiple security schemes in AND fashion * Add test for operation with multiple security schemes combined using logical AND * Add test for multiple oauth combined using logical AND --- connexion/operations/secure.py | 134 ++++++++++-------- .../security/security_handler_factory.py | 22 +++ docs/security.rst | 13 ++ tests/decorators/test_security.py | 43 ++++++ tests/test_operation2.py | 106 ++++++++++++++ 5 files changed, 257 insertions(+), 61 deletions(-) diff --git a/connexion/operations/secure.py b/connexion/operations/secure.py index 55b2d06a8..bc16cb914 100644 --- a/connexion/operations/secure.py +++ b/connexion/operations/secure.py @@ -80,72 +80,84 @@ def security_decorator(self): if not security_req: auth_funcs.append(self._api.security_handler_factory.verify_none()) continue - elif len(security_req) > 1: - logger.warning("... More than one security scheme in security requirement defined. " - "**DENYING ALL REQUESTS**", extra=vars(self)) - return self._api.security_handler_factory.security_deny - - scheme_name, scopes = next(iter(security_req.items())) - security_scheme = self.security_schemes[scheme_name] - - if security_scheme['type'] == 'oauth2': - required_scopes = scopes - token_info_func = self._api.security_handler_factory.get_tokeninfo_func(security_scheme) - scope_validate_func = self._api.security_handler_factory.get_scope_validate_func(security_scheme) - if not token_info_func: - logger.warning("... x-tokenInfoFunc missing", extra=vars(self)) - continue - - auth_funcs.append(self._api.security_handler_factory.verify_oauth(token_info_func, scope_validate_func)) - - # Swagger 2.0 - elif security_scheme['type'] == 'basic': - basic_info_func = self._api.security_handler_factory.get_basicinfo_func(security_scheme) - if not basic_info_func: - logger.warning("... x-basicInfoFunc missing", extra=vars(self)) - continue - - auth_funcs.append(self._api.security_handler_factory.verify_basic(basic_info_func)) - - # OpenAPI 3.0.0 - elif security_scheme['type'] == 'http': - scheme = security_scheme['scheme'].lower() - if scheme == 'basic': + + sec_req_funcs = {} + oauth = False + for scheme_name, scopes in security_req.items(): + security_scheme = self.security_schemes[scheme_name] + + if security_scheme['type'] == 'oauth2': + if oauth: + logger.warning("... multiple OAuth2 security schemes in AND fashion not supported", extra=vars(self)) + break + oauth = True + required_scopes = scopes + token_info_func = self._api.security_handler_factory.get_tokeninfo_func(security_scheme) + scope_validate_func = self._api.security_handler_factory.get_scope_validate_func(security_scheme) + if not token_info_func: + logger.warning("... x-tokenInfoFunc missing", extra=vars(self)) + break + + sec_req_funcs[scheme_name] = self._api.security_handler_factory.verify_oauth( + token_info_func, scope_validate_func) + + # Swagger 2.0 + elif security_scheme['type'] == 'basic': basic_info_func = self._api.security_handler_factory.get_basicinfo_func(security_scheme) if not basic_info_func: logger.warning("... x-basicInfoFunc missing", extra=vars(self)) - continue - - auth_funcs.append(self._api.security_handler_factory.verify_basic(basic_info_func)) - elif scheme == 'bearer': - bearer_info_func = self._api.security_handler_factory.get_bearerinfo_func(security_scheme) - if not bearer_info_func: - logger.warning("... x-bearerInfoFunc missing", extra=vars(self)) - continue - auth_funcs.append(self._api.security_handler_factory.verify_bearer(bearer_info_func)) - else: - logger.warning("... Unsupported http authorization scheme %s" % scheme, extra=vars(self)) - - elif security_scheme['type'] == 'apiKey': - scheme = security_scheme.get('x-authentication-scheme', '').lower() - if scheme == 'bearer': - bearer_info_func = self._api.security_handler_factory.get_bearerinfo_func(security_scheme) - if not bearer_info_func: - logger.warning("... x-bearerInfoFunc missing", extra=vars(self)) - continue - auth_funcs.append(self._api.security_handler_factory.verify_bearer(bearer_info_func)) - else: - apikey_info_func = self._api.security_handler_factory.get_apikeyinfo_func(security_scheme) - if not apikey_info_func: - logger.warning("... x-apikeyInfoFunc missing", extra=vars(self)) - continue - - auth_funcs.append(self._api.security_handler_factory.verify_api_key( - apikey_info_func, security_scheme['in'], security_scheme['name'] - )) + break + + sec_req_funcs[scheme_name] = self._api.security_handler_factory.verify_basic(basic_info_func) + + # OpenAPI 3.0.0 + elif security_scheme['type'] == 'http': + scheme = security_scheme['scheme'].lower() + if scheme == 'basic': + basic_info_func = self._api.security_handler_factory.get_basicinfo_func(security_scheme) + if not basic_info_func: + logger.warning("... x-basicInfoFunc missing", extra=vars(self)) + break + + sec_req_funcs[scheme_name] = self._api.security_handler_factory.verify_basic(basic_info_func) + elif scheme == 'bearer': + bearer_info_func = self._api.security_handler_factory.get_bearerinfo_func(security_scheme) + if not bearer_info_func: + logger.warning("... x-bearerInfoFunc missing", extra=vars(self)) + break + sec_req_funcs[scheme_name] = self._api.security_handler_factory.verify_bearer(bearer_info_func) + else: + logger.warning("... Unsupported http authorization scheme %s" % scheme, extra=vars(self)) + break + + elif security_scheme['type'] == 'apiKey': + scheme = security_scheme.get('x-authentication-scheme', '').lower() + if scheme == 'bearer': + bearer_info_func = self._api.security_handler_factory.get_bearerinfo_func(security_scheme) + if not bearer_info_func: + logger.warning("... x-bearerInfoFunc missing", extra=vars(self)) + break + sec_req_funcs[scheme_name] = self._api.security_handler_factory.verify_bearer(bearer_info_func) + else: + apikey_info_func = self._api.security_handler_factory.get_apikeyinfo_func(security_scheme) + if not apikey_info_func: + logger.warning("... x-apikeyInfoFunc missing", extra=vars(self)) + break + + sec_req_funcs[scheme_name] = self._api.security_handler_factory.verify_api_key( + apikey_info_func, security_scheme['in'], security_scheme['name'] + ) + else: + logger.warning("... Unsupported security scheme type %s" % security_scheme['type'], extra=vars(self)) + break else: - logger.warning("... Unsupported security scheme type %s" % security_scheme['type'], extra=vars(self)) + # No break encountered: no missing funcs + if len(sec_req_funcs) == 1: + (func,) = sec_req_funcs.values() + auth_funcs.append(func) + else: + auth_funcs.append(self._api.security_handler_factory.verify_multiple_schemes(sec_req_funcs)) return functools.partial(self._api.security_handler_factory.verify_security, auth_funcs, required_scopes) diff --git a/connexion/security/security_handler_factory.py b/connexion/security/security_handler_factory.py index 7a7cf691b..a6504c3b3 100644 --- a/connexion/security/security_handler_factory.py +++ b/connexion/security/security_handler_factory.py @@ -271,6 +271,28 @@ def wrapper(request, required_scopes): return wrapper + def verify_multiple_schemes(self, schemes): + """ + Verifies multiple authentication schemes in AND fashion. + If any scheme fails, the entire authentication fails. + + :param schemes: mapping scheme_name to auth function + :type schemes: dict + :rtype: types.FunctionType + """ + + def wrapper(request, required_scopes): + token_info = {} + for scheme_name, func in schemes.items(): + result = func(request, required_scopes) + if result is self.no_value: + return self.no_value + token_info[scheme_name] = result + + return token_info + + return wrapper + @staticmethod def verify_none(): """ diff --git a/docs/security.rst b/docs/security.rst index f2033fba8..85315b25e 100644 --- a/docs/security.rst +++ b/docs/security.rst @@ -87,6 +87,19 @@ semantics as for ``x-tokenInfoFunc``, but the function accepts one parameter: to You can find a `minimal JWT example application`_ in Connexion's "examples/openapi3" folder. +Multiple Authentication Schemes +------------------------------- + +With Connexion, it is also possible to combine multiple authentication schemes +as described in the `OpenAPI specification`_. When multiple authentication +schemes are combined using logical AND, the ``token_info`` argument will +consist of a dictionary mapping the names of the security scheme to their +corresponding ``token_info``. + +Multiple OAuth2 security schemes in AND fashion are not supported. + +.. _OpenAPI specification: https://swagger.io/docs/specification/authentication/#multiple + Deploying Authentication ------------------------ diff --git a/tests/decorators/test_security.py b/tests/decorators/test_security.py index e59e5fff7..44a39b86c 100644 --- a/tests/decorators/test_security.py +++ b/tests/decorators/test_security.py @@ -167,6 +167,49 @@ def apikey_info(apikey, required_scopes=None): assert wrapped_func(request, ['admin']) is not None +def test_multiple_schemes(security_handler_factory): + def apikey1_info(apikey, required_scopes=None): + if apikey == 'foobar': + return {'sub': 'foo'} + return None + def apikey2_info(apikey, required_scopes=None): + if apikey == 'bar': + return {'sub': 'bar'} + return None + + wrapped_func_key1 = security_handler_factory.verify_api_key(apikey1_info, 'header', 'X-Auth-1') + wrapped_func_key2 = security_handler_factory.verify_api_key(apikey2_info, 'header', 'X-Auth-2') + schemes = { + 'key1': wrapped_func_key1, + 'key2': wrapped_func_key2, + } + wrapped_func = security_handler_factory.verify_multiple_schemes(schemes) + + # Single key does not succeed + request = MagicMock() + request.headers = {"X-Auth-1": 'foobar'} + + assert wrapped_func(request, ['admin']) is security_handler_factory.no_value + + request = MagicMock() + request.headers = {"X-Auth-2": 'bar'} + + assert wrapped_func(request, ['admin']) is security_handler_factory.no_value + + # Supplying both keys does succeed + request = MagicMock() + request.headers = { + "X-Auth-1": 'foobar', + "X-Auth-2": 'bar' + } + + expected_token_info = { + 'key1': {'sub': 'foo'}, + 'key2': {'sub': 'bar'}, + } + assert wrapped_func(request, ['admin']) == expected_token_info + + def test_verify_security_oauthproblem(security_handler_factory): """Tests whether verify_security raises an OAuthProblem if there are no auth_funcs.""" func_to_secure = MagicMock(return_value='func') diff --git a/tests/test_operation2.py b/tests/test_operation2.py index 16a1a45d6..6976c1410 100644 --- a/tests/test_operation2.py +++ b/tests/test_operation2.py @@ -1,4 +1,5 @@ import copy +import logging import math import pathlib import types @@ -204,6 +205,16 @@ 'security': [{'oauth': ['uid']}], 'summary': 'Create new stack'} +OPERATION9 = {'description': 'operation secured with 2 api keys', + 'operationId': 'fakeapi.hello.post_greeting', + 'responses': {'200': {'description': 'OK'}}, + 'security': [{'key1': [], 'key2': []}]} + +OPERATION10 = {'description': 'operation secured with 2 oauth schemes combined using logical AND', + 'operationId': 'fakeapi.hello.post_greeting', + 'responses': {'200': {'description': 'OK'}}, + 'security': [{'oauth_1': ['uid'], 'oauth_2': ['uid']}]} + SECURITY_DEFINITIONS_REMOTE = {'oauth': {'type': 'oauth2', 'flow': 'password', 'x-tokenInfoUrl': 'https://oauth.example/token_info', @@ -224,6 +235,24 @@ 'flow': 'password', 'scopes': {'myscope': 'can do stuff'}}} +SECURITY_DEFINITIONS_2_KEYS = {'key1': {'type': 'apiKey', + 'in': 'header', + 'name': 'X-Auth-1', + 'x-apikeyInfoFunc': 'math.ceil'}, + 'key2': {'type': 'apiKey', + 'in': 'header', + 'name': 'X-Auth-2', + 'x-apikeyInfoFunc': 'math.ceil'}} + +SECURITY_DEFINITIONS_2_OAUTH = {'oauth_1': {'type': 'oauth2', + 'flow': 'password', + 'x-tokenInfoFunc': 'math.ceil', + 'scopes': {'myscope': 'can do stuff'}}, + 'oauth_2': {'type': 'oauth2', + 'flow': 'password', + 'x-tokenInfoFunc': 'math.ceil', + 'scopes': {'myscope': 'can do stuff'}}} + @pytest.fixture def api(security_handler_factory): @@ -452,6 +481,83 @@ def test_no_token_info(api): assert operation.body_schema == expected_body_schema +def test_multiple_security_schemes_and(api): + """Tests an operation with multiple security schemes in AND fashion.""" + def return_api_key_name(func, in_, name): + return name + verify_api_key = mock.MagicMock(side_effect=return_api_key_name) + api.security_handler_factory.verify_api_key = verify_api_key + verify_multiple = mock.MagicMock(return_value='verify_multiple_result') + api.security_handler_factory.verify_multiple_schemes = verify_multiple + + op_spec = make_operation(OPERATION9) + operation = Swagger2Operation(api=api, + method='GET', + path='endpoint', + path_parameters=[], + operation=op_spec, + app_produces=['application/json'], + app_consumes=['application/json'], + app_security=SECURITY_DEFINITIONS_2_KEYS, + security_definitions=SECURITY_DEFINITIONS_2_KEYS, + definitions=DEFINITIONS, + parameter_definitions=PARAMETER_DEFINITIONS, + resolver=Resolver()) + assert isinstance(operation.function, types.FunctionType) + assert verify_api_key.call_count == 2 + verify_api_key.assert_any_call(math.ceil, 'header', 'X-Auth-1') + verify_api_key.assert_any_call(math.ceil, 'header', 'X-Auth-2') + # Assert verify_multiple_schemes is called with mapping from scheme name + # to result of security_handler_factory.verify_api_key() + verify_multiple.assert_called_with({'key1': 'X-Auth-1', 'key2': 'X-Auth-2'}) + + security_decorator = operation.security_decorator + assert len(security_decorator.args[0]) == 1 + assert security_decorator.args[0][0] == 'verify_multiple_result' + assert security_decorator.args[1] is None + + assert operation.method == 'GET' + assert operation.produces == ['application/json'] + assert operation.consumes == ['application/json'] + assert operation.security == [{'key1': [], 'key2': []}] + + +def test_multiple_oauth_in_and(api, caplog): + """Tests an operation with multiple oauth security schemes in AND fashion. + These should be ignored and raise a warning. + """ + caplog.set_level(logging.WARNING, logger="connexion.operations.secure") + verify_oauth = mock.MagicMock(return_value='verify_oauth_result') + api.security_handler_factory.verify_oauth = verify_oauth + + op_spec = make_operation(OPERATION10) + operation = Swagger2Operation(api=api, + method='GET', + path='endpoint', + path_parameters=[], + operation=op_spec, + app_produces=['application/json'], + app_consumes=['application/json'], + app_security=SECURITY_DEFINITIONS_2_OAUTH, + security_definitions=SECURITY_DEFINITIONS_2_OAUTH, + definitions=DEFINITIONS, + parameter_definitions=PARAMETER_DEFINITIONS, + resolver=Resolver()) + assert isinstance(operation.function, types.FunctionType) + + security_decorator = operation.security_decorator + assert len(security_decorator.args[0]) == 0 + assert security_decorator.args[0] == [] + assert security_decorator.args[1] == ['uid'] + + assert '... multiple OAuth2 security schemes in AND fashion not supported' in caplog.text + + assert operation.method == 'GET' + assert operation.produces == ['application/json'] + assert operation.consumes == ['application/json'] + assert operation.security == [{'oauth_1': ['uid'], 'oauth_2': ['uid']}] + + def test_parameter_reference(api): op_spec = make_operation(OPERATION3, definitions=False) operation = Swagger2Operation(api=api,