diff --git a/pybotx/bot/bot.py b/pybotx/bot/bot.py index 670a6cf3..0cd21e17 100644 --- a/pybotx/bot/bot.py +++ b/pybotx/bot/bot.py @@ -12,6 +12,7 @@ Mapping, Optional, Sequence, + Set, Tuple, Union, ) @@ -302,12 +303,13 @@ def async_execute_raw_bot_command( verify_request: bool = True, request_headers: Optional[Mapping[str, str]] = None, logging_command: bool = True, + trusted_issuers: Optional[Set[str]] = None, ) -> None: if logging_command: log_incoming_request(raw_bot_command, message="Got command: ") if verify_request: - self._verify_request(request_headers) + self._verify_request(request_headers, trusted_issuers=trusted_issuers) try: bot_api_command: BotAPICommand = parse_obj_as( @@ -336,6 +338,7 @@ async def sync_execute_raw_smartapp_event( verify_request: bool = True, request_headers: Optional[Mapping[str, str]] = None, logging_command: bool = True, + trusted_issuers: Optional[Set[str]] = None, ) -> BotAPISyncSmartAppEventResponse: if logging_command: log_incoming_request( @@ -344,7 +347,7 @@ async def sync_execute_raw_smartapp_event( ) if verify_request: - self._verify_request(request_headers) + self._verify_request(request_headers, trusted_issuers=trusted_issuers) try: bot_api_smartapp_event: BotAPISyncSmartAppEvent = parse_obj_as( @@ -374,6 +377,7 @@ async def raw_get_status( query_params: Dict[str, str], verify_request: bool = True, request_headers: Optional[Mapping[str, str]] = None, + trusted_issuers: Optional[Set[str]] = None, ) -> Dict[str, Any]: logger.opt(lazy=True).debug( "Got status: {status}", @@ -381,9 +385,7 @@ async def raw_get_status( ) if verify_request: - if request_headers is None: - raise RequestHeadersNotProvidedError - self._verify_request(request_headers) + self._verify_request(request_headers, trusted_issuers=trusted_issuers) try: bot_api_status_recipient = BotAPIStatusRecipient.parse_obj(query_params) @@ -406,13 +408,12 @@ async def set_raw_botx_method_result( raw_botx_method_result: Dict[str, Any], verify_request: bool = True, request_headers: Optional[Mapping[str, str]] = None, + trusted_issuers: Optional[Set[str]] = None, ) -> None: logger.debug("Got callback: {callback}", callback=raw_botx_method_result) if verify_request: - if request_headers is None: - raise RequestHeadersNotProvidedError - self._verify_request(request_headers) + self._verify_request(request_headers, trusted_issuers=trusted_issuers) callback: BotXMethodCallback = parse_obj_as( # Same ignore as in pydantic @@ -2068,6 +2069,8 @@ async def collect_metric( def _verify_request( # noqa: WPS231, WPS238 self, headers: Optional[Mapping[str, str]], + *, + trusted_issuers: Optional[Set[str]] = None, ) -> None: if headers is None: raise RequestHeadersNotProvidedError @@ -2108,11 +2111,20 @@ def _verify_request( # noqa: WPS231, WPS238 leeway=1, options={ "verify_aud": False, + "verify_iss": False, }, ) except jwt.InvalidTokenError as exc: raise UnverifiedRequestError(exc.args[0]) from exc + issuer = token_payload.get("iss") + if issuer is None: + raise UnverifiedRequestError('Token is missing the "iss" claim') + + if issuer != bot_account.host: + if not trusted_issuers or issuer not in trusted_issuers: + raise UnverifiedRequestError("Invalid issuer") + @staticmethod def _build_main_collector( collectors: Sequence[HandlerCollector], diff --git a/pyproject.toml b/pyproject.toml index c769b559..658117c2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "pybotx" -version = "0.72.0" +version = "0.73.0" description = "A python library for interacting with eXpress BotX API" authors = [ "Sidnev Nikolay ", diff --git a/tests/test_verify_request.py b/tests/test_verify_request.py index d2082271..9480c088 100644 --- a/tests/test_verify_request.py +++ b/tests/test_verify_request.py @@ -263,9 +263,85 @@ async def test__verify_request__invalid_issuer( assert "Invalid issuer" in str(exc.value) +async def test__verify_request__trusted_issuers_have_token_issuer( + bot_account: BotAccountWithSecret, + authorization_token_payload: Dict[str, Any], +) -> None: + # - Arrange - + collector = HandlerCollector() + built_bot = Bot(collectors=[collector], bot_accounts=[bot_account]) + token_issuer = "another.example.com" + authorization_token_payload["iss"] = token_issuer + token = jwt.encode( + payload=authorization_token_payload, + key=bot_account.secret_key, + ) + + # - Act - + async with lifespan_wrapper(built_bot) as bot: + bot._verify_request( + {"authorization": f"Bearer {token}"}, + trusted_issuers={token_issuer}, + ) + + +async def test__verify_request__trusted_issuers_have_not_token_issuer( + bot_account: BotAccountWithSecret, + authorization_token_payload: Dict[str, Any], +) -> None: + # - Arrange - + collector = HandlerCollector() + built_bot = Bot(collectors=[collector], bot_accounts=[bot_account]) + authorization_token_payload["iss"] = "another.example.com" + token = jwt.encode( + payload=authorization_token_payload, + key=bot_account.secret_key, + ) + + # - Act - + async with lifespan_wrapper(built_bot) as bot: + with pytest.raises(UnverifiedRequestError) as exc: + bot._verify_request( + {"authorization": f"Bearer {token}"}, + trusted_issuers={"another-another.example.com"}, + ) + + # - Assert - + assert "Invalid issuer" in str(exc.value) + + +async def test__verify_request__token_issuer_is_missed( + bot_account: BotAccountWithSecret, + authorization_token_payload: Dict[str, Any], +) -> None: + # - Arrange - + collector = HandlerCollector() + built_bot = Bot(collectors=[collector], bot_accounts=[bot_account]) + del authorization_token_payload["iss"] + token = jwt.encode( + payload=authorization_token_payload, + key=bot_account.secret_key, + ) + + # - Act - + async with lifespan_wrapper(built_bot) as bot: + with pytest.raises(UnverifiedRequestError) as exc: + bot._verify_request( + {"authorization": f"Bearer {token}"}, + ) + + # - Assert - + assert 'Token is missing the "iss" claim' in str(exc.value) + + @pytest.mark.parametrize( "target_func_name", - ("async_execute_raw_bot_command", "raw_get_status", "set_raw_botx_method_result"), + ( + "async_execute_raw_bot_command", + "sync_execute_raw_smartapp_event", + "raw_get_status", + "set_raw_botx_method_result", + ), ) async def test__verify_request__without_headers( api_incoming_message_factory: Callable[..., Dict[str, Any]], @@ -311,6 +387,31 @@ async def test__async_execute_raw_bot_command__verify_request__called( bot._verify_request.assert_called() +async def test__sync_execute_raw_smartapp_event__verify_request__called( + api_sync_smartapp_event_factory: Callable[..., Dict[str, Any]], + collector_with_sync_smartapp_event_handler: HandlerCollector, + bot_account: BotAccountWithSecret, +) -> None: + # - Arrange - + built_bot = Bot( + collectors=[collector_with_sync_smartapp_event_handler], + bot_accounts=[bot_account], + ) + payload = api_sync_smartapp_event_factory(bot_id=bot_account.id) + + # - Act - + async with lifespan_wrapper(built_bot) as bot: + bot._verify_request = Mock() # type: ignore + await bot.sync_execute_raw_smartapp_event( + payload, + verify_request=True, + request_headers={}, + ) + + # - Assert - + bot._verify_request.assert_called() + + async def test__raw_get_status__verify_request__called( api_incoming_message_factory: Callable[..., Dict[str, Any]], bot_account: BotAccountWithSecret, @@ -384,6 +485,26 @@ async def test__async_execute_raw_bot_command__verify_request__not_called( bot.async_execute_bot_command.assert_called() +async def test__sync_execute_raw_smartapp_event__verify_request__not_called( + api_incoming_message_factory: Callable[..., Dict[str, Any]], + bot_account: BotAccountWithSecret, +) -> None: + # - Arrange - + collector = HandlerCollector() + built_bot = Bot(collectors=[collector], bot_accounts=[bot_account]) + payload = api_incoming_message_factory() + + # - Act - + async with lifespan_wrapper(built_bot) as bot: + bot._verify_request = Mock() # type: ignore + bot.sync_execute_raw_smartapp_event = Mock() # type: ignore + bot.sync_execute_raw_smartapp_event(payload, verify_request=False) + + # - Assert - + bot._verify_request.assert_not_called() + bot.sync_execute_raw_smartapp_event.assert_called() + + async def test__raw_get_status__verify_request__not_called( api_incoming_message_factory: Callable[..., Dict[str, Any]], bot_account: BotAccountWithSecret,