diff --git a/providers/snowflake/src/airflow/providers/snowflake/hooks/snowflake_sql_api.py b/providers/snowflake/src/airflow/providers/snowflake/hooks/snowflake_sql_api.py index 185a49ffaba06..d87e4cd04d37f 100644 --- a/providers/snowflake/src/airflow/providers/snowflake/hooks/snowflake_sql_api.py +++ b/providers/snowflake/src/airflow/providers/snowflake/hooks/snowflake_sql_api.py @@ -17,6 +17,7 @@ from __future__ import annotations import base64 +import time import uuid import warnings from datetime import timedelta @@ -313,6 +314,80 @@ def get_sql_api_query_status(self, query_id: str) -> dict[str, str | list[str]]: status_code, resp = self._make_api_call_with_retries("GET", url, header, params) return self._process_response(status_code, resp) + def wait_for_query( + self, query_id: str, raise_error: bool = False, poll_interval: int = 5, timeout: int = 60 + ) -> dict[str, str | list[str]]: + """ + Wait for query to finish either successfully or with error. + + :param query_id: statement handle id for the individual statement. + :param raise_error: whether to raise an error if the query failed. + :param poll_interval: time (in seconds) between checking the query status. + :param timeout: max time (in seconds) to wait for the query to finish before raising a TimeoutError. + + :raises RuntimeError: If the query status is 'error' and `raise_error` is True. + :raises TimeoutError: If the query doesn't finish within the specified timeout. + """ + start_time = time.time() + + while True: + response = self.get_sql_api_query_status(query_id=query_id) + self.log.debug("Query status `%s`", response["status"]) + + if time.time() - start_time > timeout: + raise TimeoutError( + f"Query `{query_id}` did not finish within the timeout period of {timeout} seconds." + ) + + if response["status"] != "running": + self.log.info("Query status `%s`", response["status"]) + break + + time.sleep(poll_interval) + + if response["status"] == "error" and raise_error: + raise RuntimeError(response["message"]) + + return response + + def get_result_from_successful_sql_api_query(self, query_id: str) -> list[dict[str, Any]]: + """ + Based on the query id HTTP requests are made to snowflake SQL API and return result data. + + :param query_id: statement handle id for the individual statement. + + :raises RuntimeError: If the query status is not 'success'. + """ + self.log.info("Retrieving data for query id %s", query_id) + header, params, url = self.get_request_url_header_params(query_id) + status_code, response = self._make_api_call_with_retries("GET", url, header, params) + + if (query_status := self._process_response(status_code, response)["status"]) != "success": + msg = f"Query must have status `success` to retrieve data; got `{query_status}`." + raise RuntimeError(msg) + + # Below fields should always be present in response, but added some safety checks + data = response.get("data", []) + if not data: + self.log.warning("No data found in the API response.") + return [] + metadata = response.get("resultSetMetaData", {}) + col_names = [row["name"] for row in metadata.get("rowType", [])] + if not col_names: + self.log.warning("No column metadata found in the API response.") + return [] + + num_partitions = len(metadata.get("partitionInfo", [])) + if num_partitions > 1: + self.log.debug("Result data is returned as multiple partitions. Will perform additional queries.") + url += "?partition=" + for partition_no in range(1, num_partitions): # First partition was already returned + self.log.debug("Querying for partition no. %s", partition_no) + _, response = self._make_api_call_with_retries("GET", url + str(partition_no), header, params) + data.extend(response.get("data", [])) + + return [dict(zip(col_names, row)) for row in data] # Merged column names with data + async def get_sql_api_query_status_async(self, query_id: str) -> dict[str, str | list[str]]: """ Based on the query id async HTTP request is made to snowflake SQL API and return response. diff --git a/providers/snowflake/tests/unit/snowflake/hooks/test_snowflake_sql_api.py b/providers/snowflake/tests/unit/snowflake/hooks/test_snowflake_sql_api.py index 8e610e2fd7546..2cbf156a2c94e 100644 --- a/providers/snowflake/tests/unit/snowflake/hooks/test_snowflake_sql_api.py +++ b/providers/snowflake/tests/unit/snowflake/hooks/test_snowflake_sql_api.py @@ -33,9 +33,7 @@ from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning from airflow.models import Connection -from airflow.providers.snowflake.hooks.snowflake_sql_api import ( - SnowflakeSqlApiHook, -) +from airflow.providers.snowflake.hooks.snowflake_sql_api import SnowflakeSqlApiHook if TYPE_CHECKING: from pathlib import Path @@ -152,12 +150,13 @@ API_URL = "https://test.snowflakecomputing.com/api/v2/statements/test" +MODULE_PATH = "airflow.providers.snowflake.hooks.snowflake_sql_api" +HOOK_PATH = f"{MODULE_PATH}.SnowflakeSqlApiHook" + @pytest.fixture def mock_requests(): - with mock.patch( - "airflow.providers.snowflake.hooks.snowflake_sql_api.requests.Session" - ) as mock_session_cls: + with mock.patch(f"{MODULE_PATH}.requests.Session") as mock_session_cls: mock_session = mock.MagicMock() mock_session_cls.return_value.__enter__.return_value = mock_session yield mock_session @@ -165,9 +164,7 @@ def mock_requests(): @pytest.fixture def mock_async_request(): - with mock.patch( - "airflow.providers.snowflake.hooks.snowflake_sql_api.aiohttp.ClientSession.request" - ) as mock_session_cls: + with mock.patch(f"{MODULE_PATH}.aiohttp.ClientSession.request") as mock_session_cls: mock_request = mock.MagicMock() mock_session_cls.return_value = mock_request yield mock_request @@ -227,11 +224,8 @@ class TestSnowflakeSqlApiHook: (SQL_MULTIPLE_STMTS, 4, {"statementHandles": ["uuid", "uuid1"]}, ["uuid", "uuid1"]), ], ) - @mock.patch( - "airflow.providers.snowflake.hooks.snowflake_sql_api.SnowflakeSqlApiHook._get_conn_params", - new_callable=PropertyMock, - ) - @mock.patch("airflow.providers.snowflake.hooks.snowflake_sql_api.SnowflakeSqlApiHook.get_headers") + @mock.patch(f"{HOOK_PATH}._get_conn_params", new_callable=PropertyMock) + @mock.patch(f"{HOOK_PATH}.get_headers") def test_execute_query( self, mock_get_header, @@ -254,11 +248,8 @@ def test_execute_query( query_ids = hook.execute_query(sql, statement_count) assert query_ids == expected_query_ids - @mock.patch( - "airflow.providers.snowflake.hooks.snowflake_sql_api.SnowflakeSqlApiHook._get_conn_params", - new_callable=PropertyMock, - ) - @mock.patch("airflow.providers.snowflake.hooks.snowflake_sql_api.SnowflakeSqlApiHook.get_headers") + @mock.patch(f"{HOOK_PATH}._get_conn_params", new_callable=PropertyMock) + @mock.patch(f"{HOOK_PATH}.get_headers") def test_execute_query_multiple_times_give_fresh_query_ids_each_time( self, mock_get_header, mock_conn_param, mock_requests ): @@ -297,11 +288,8 @@ def test_execute_query_multiple_times_give_fresh_query_ids_each_time( "sql,statement_count,expected_response, expected_query_ids", [(SINGLE_STMT, 1, {"statementHandle": "uuid"}, ["uuid"])], ) - @mock.patch( - "airflow.providers.snowflake.hooks.snowflake_sql_api.SnowflakeSqlApiHook._get_conn_params", - new_callable=PropertyMock, - ) - @mock.patch("airflow.providers.snowflake.hooks.snowflake_sql_api.SnowflakeSqlApiHook.get_headers") + @mock.patch(f"{HOOK_PATH}._get_conn_params", new_callable=PropertyMock) + @mock.patch(f"{HOOK_PATH}.get_headers") def test_execute_query_exception_without_statement_handle( self, mock_get_header, @@ -330,11 +318,8 @@ def test_execute_query_exception_without_statement_handle( (SQL_MULTIPLE_STMTS, 4, {"1": {"type": "FIXED", "value": "123"}}), ], ) - @mock.patch( - "airflow.providers.snowflake.hooks.snowflake_sql_api.SnowflakeSqlApiHook._get_conn_params", - new_callable=PropertyMock, - ) - @mock.patch("airflow.providers.snowflake.hooks.snowflake_sql_api.SnowflakeSqlApiHook.get_headers") + @mock.patch(f"{HOOK_PATH}._get_conn_params", new_callable=PropertyMock) + @mock.patch(f"{HOOK_PATH}.get_headers") def test_execute_query_bindings_warning( self, mock_get_headers, @@ -364,10 +349,7 @@ def test_execute_query_bindings_warning( (["uuid", "uuid1"]), ], ) - @mock.patch( - "airflow.providers.snowflake.hooks.snowflake_sql_api.SnowflakeSqlApiHook." - "get_request_url_header_params" - ) + @mock.patch(f"{HOOK_PATH}.get_request_url_header_params") def test_check_query_output(self, mock_geturl_header_params, query_ids, mock_requests): """Test check_query_output by passing query ids as params and mock get_request_url_header_params""" req_id = uuid.uuid4() @@ -380,10 +362,7 @@ def test_check_query_output(self, mock_geturl_header_params, query_ids, mock_req mock_log_info.assert_called_with(GET_RESPONSE) @pytest.mark.parametrize("query_ids", [["uuid", "uuid1"]]) - @mock.patch( - "airflow.providers.snowflake.hooks.snowflake_sql_api.SnowflakeSqlApiHook." - "get_request_url_header_params" - ) + @mock.patch(f"{HOOK_PATH}.get_request_url_header_params") def test_check_query_output_exception( self, mock_geturl_header_params, @@ -405,11 +384,8 @@ def test_check_query_output_exception( with pytest.raises(requests.exceptions.HTTPError): hook.check_query_output(query_ids) - @mock.patch( - "airflow.providers.snowflake.hooks.snowflake_sql_api.SnowflakeSqlApiHook._get_conn_params", - new_callable=PropertyMock, - ) - @mock.patch("airflow.providers.snowflake.hooks.snowflake_sql_api.SnowflakeSqlApiHook.get_headers") + @mock.patch(f"{HOOK_PATH}._get_conn_params", new_callable=PropertyMock) + @mock.patch(f"{HOOK_PATH}.get_headers") def test_get_request_url_header_params(self, mock_get_header, mock_conn_param): """Test get_request_url_header_params by mocking _get_conn_params and get_headers""" mock_conn_param.return_value = CONN_PARAMS @@ -419,11 +395,8 @@ def test_get_request_url_header_params(self, mock_get_header, mock_conn_param): assert header == HEADERS assert url == "https://airflow.af_region.snowflakecomputing.com/api/v2/statements/uuid" - @mock.patch("airflow.providers.snowflake.hooks.snowflake_sql_api.SnowflakeSqlApiHook.get_private_key") - @mock.patch( - "airflow.providers.snowflake.hooks.snowflake_sql_api.SnowflakeSqlApiHook._get_conn_params", - new_callable=PropertyMock, - ) + @mock.patch(f"{HOOK_PATH}.get_private_key") + @mock.patch(f"{HOOK_PATH}._get_conn_params", new_callable=PropertyMock) @mock.patch("airflow.providers.snowflake.utils.sql_api_generate_jwt.JWTGenerator.get_token") def test_get_headers_should_support_private_key(self, mock_get_token, mock_conn_param, mock_private_key): """Test get_headers method by mocking get_private_key and _get_conn_params method""" @@ -433,11 +406,8 @@ def test_get_headers_should_support_private_key(self, mock_get_token, mock_conn_ result = hook.get_headers() assert result == HEADERS - @mock.patch("airflow.providers.snowflake.hooks.snowflake_sql_api.SnowflakeSqlApiHook.get_oauth_token") - @mock.patch( - "airflow.providers.snowflake.hooks.snowflake_sql_api.SnowflakeSqlApiHook._get_conn_params", - new_callable=PropertyMock, - ) + @mock.patch(f"{HOOK_PATH}.get_oauth_token") + @mock.patch(f"{HOOK_PATH}._get_conn_params", new_callable=PropertyMock) def test_get_headers_should_support_oauth(self, mock_conn_param, mock_oauth_token): """Test get_headers method by mocking get_oauth_token and _get_conn_params method""" mock_conn_param.return_value = CONN_PARAMS_OAUTH @@ -448,10 +418,7 @@ def test_get_headers_should_support_oauth(self, mock_conn_param, mock_oauth_toke @mock.patch("airflow.providers.snowflake.hooks.snowflake.HTTPBasicAuth") @mock.patch("requests.post") - @mock.patch( - "airflow.providers.snowflake.hooks.snowflake_sql_api.SnowflakeSqlApiHook._get_conn_params", - new_callable=PropertyMock, - ) + @mock.patch(f"{HOOK_PATH}._get_conn_params", new_callable=PropertyMock) def test_get_oauth_token(self, mock_conn_param, requests_post, mock_auth): """Test get_oauth_token method makes the right http request""" basic_auth = {"Authorization": "Basic usernamepassword"} @@ -655,10 +622,7 @@ def test_get_private_key_should_support_private_auth_with_unencrypted_key( (404, {"status": "error", "message": "test"}, {"status": "error", "message": "test"}), ], ) - @mock.patch( - "airflow.providers.snowflake.hooks.snowflake_sql_api.SnowflakeSqlApiHook." - "get_request_url_header_params" - ) + @mock.patch(f"{HOOK_PATH}.get_request_url_header_params") def test_get_sql_api_query_status( self, mock_geturl_header_params, status_code, response, expected_response, mock_requests ): @@ -718,10 +682,7 @@ def raise_for_status(self): (404, {"status": "error", "message": "test"}, {"status": "error", "message": "test"}), ], ) - @mock.patch( - "airflow.providers.snowflake.hooks.snowflake_sql_api.SnowflakeSqlApiHook." - "get_request_url_header_params" - ) + @mock.patch(f"{HOOK_PATH}.get_request_url_header_params") async def test_get_sql_api_query_status_async( self, mock_geturl_header_params, status_code, response, expected_response, mock_async_request ): @@ -829,11 +790,8 @@ def test_hook_parameter_propagation(self, hook_params): ], ) @mock.patch("uuid.uuid4") - @mock.patch( - "airflow.providers.snowflake.hooks.snowflake_sql_api.SnowflakeSqlApiHook._get_conn_params", - new_callable=PropertyMock, - ) - @mock.patch("airflow.providers.snowflake.hooks.snowflake_sql_api.SnowflakeSqlApiHook.get_headers") + @mock.patch(f"{HOOK_PATH}._get_conn_params", new_callable=PropertyMock) + @mock.patch(f"{HOOK_PATH}.get_headers") def test_proper_parametrization_of_execute_query_api_request( self, mock_get_headers, @@ -1079,6 +1037,241 @@ def test_make_api_call_with_retries_custom_retry_config(self, mock_requests): # Should attempt only 2 times due to custom config assert mock_requests.request.call_count == 2 + @mock.patch(f"{MODULE_PATH}.time.sleep") + def test_wait_for_query_immediate_success(self, sleep_mock): + hook = SnowflakeSqlApiHook(snowflake_conn_id="test_conn") + hook.get_sql_api_query_status = mock.MagicMock( + return_value={"status": "success", "results": ["row1", "row2"]} + ) + + result = hook.wait_for_query(query_id="qid-123") + + assert result == {"status": "success", "results": ["row1", "row2"]} + sleep_mock.assert_not_called() + + @mock.patch(f"{MODULE_PATH}.time.sleep") + def test_wait_for_query_polls_until_done(self, sleep_mock): + hook = SnowflakeSqlApiHook(snowflake_conn_id="test_conn") + hook.get_sql_api_query_status = mock.MagicMock( + side_effect=[ + {"status": "running"}, + {"status": "running"}, + {"status": "success", "data": [1, 2, 3]}, + ] + ) + qid = "qid-456" + + result = hook.wait_for_query(query_id=qid, poll_interval=2) + + assert result == {"status": "success", "data": [1, 2, 3]} + assert sleep_mock.call_count == 2 + sleep_mock.assert_has_calls([mock.call(2), mock.call(2)]) + hook.get_sql_api_query_status.assert_has_calls( + [mock.call(query_id=qid), mock.call(query_id=qid), mock.call(query_id=qid)] + ) + + @mock.patch(f"{MODULE_PATH}.time.sleep") + def test_wait_for_query_error_raises_when_requested(self, sleep_mock): + hook = SnowflakeSqlApiHook(snowflake_conn_id="test_conn") + hook.get_sql_api_query_status = mock.MagicMock(return_value={"status": "error", "message": "oh no!"}) + + with pytest.raises(RuntimeError) as excinfo: + hook.wait_for_query("qid-789", raise_error=True) + assert str(excinfo.value) == "oh no!" + sleep_mock.assert_not_called() + + @mock.patch(f"{MODULE_PATH}.time.sleep") + def test_wait_for_query_error_returns_when_not_raising(self, sleep_mock): + hook = SnowflakeSqlApiHook(snowflake_conn_id="test_conn") + hook.get_sql_api_query_status = mock.MagicMock( + return_value={"status": "error", "message": "still bad"} + ) + + result = hook.wait_for_query(query_id="qid-000", raise_error=False) + assert result == {"status": "error", "message": "still bad"} + sleep_mock.assert_not_called() + + @mock.patch(f"{MODULE_PATH}.time.sleep") + def test_wait_for_query_handles_unknown_status(self, sleep_mock): + hook = SnowflakeSqlApiHook(snowflake_conn_id="test_conn") + hook.get_sql_api_query_status = mock.MagicMock(return_value={"status": "queued", "info": ["a", "b"]}) + + result = hook.wait_for_query(query_id="qid-111") + assert result == {"status": "queued", "info": ["a", "b"]} + sleep_mock.assert_not_called() + + @mock.patch(f"{MODULE_PATH}.time.time") + @mock.patch(f"{MODULE_PATH}.time.sleep") + def test_wait_for_query_timeout_error(self, sleep_mock, time_mock): + hook = SnowflakeSqlApiHook(snowflake_conn_id="test_conn") + + # Simulate a query that keeps running and never finishes + hook.get_sql_api_query_status = mock.MagicMock(return_value={"status": "running"}) + time_mock.side_effect = list(range(5)) + + qid = "qid-789" + timeout = 3 + + with pytest.raises(TimeoutError): + hook.wait_for_query(query_id=qid, timeout=timeout, poll_interval=1) + + # Ensure that we polled multiple times before the timeout error is raised + assert sleep_mock.call_count == 3 + sleep_mock.assert_has_calls([mock.call(1)] * 3) + assert hook.get_sql_api_query_status.call_count == 4 + hook.get_sql_api_query_status.assert_has_calls([mock.call(query_id=qid)] * 4) + assert time_mock.call_count == 5 + + @mock.patch(f"{HOOK_PATH}._make_api_call_with_retries") + @mock.patch(f"{HOOK_PATH}._process_response") + @mock.patch(f"{HOOK_PATH}.get_request_url_header_params") + def test_get_result_from_successful_sql_api_query_no_data( + self, mock_get_url, mock_process_response, mock_api_call + ): + hook = SnowflakeSqlApiHook(snowflake_conn_id="test_conn") + + mock_get_url.return_value = ("header", "params", "url") + mock_process_response.return_value = {"status": "success"} + mock_api_call.return_value = (200, {"data": []}) + + result = hook.get_result_from_successful_sql_api_query(query_id="qid-1") + + assert result == [] + mock_get_url.assert_called_once_with("qid-1") + mock_api_call.assert_called_once_with("GET", "url", "header", "params") + mock_process_response.assert_called_once_with(200, {"data": []}) + + @mock.patch(f"{HOOK_PATH}._make_api_call_with_retries") + @mock.patch(f"{HOOK_PATH}._process_response") + @mock.patch(f"{HOOK_PATH}.get_request_url_header_params") + def test_get_result_from_successful_sql_api_query_no_column_metadata( + self, mock_get_url, mock_process_response, mock_api_call + ): + hook = SnowflakeSqlApiHook(snowflake_conn_id="test_conn") + + mock_get_url.return_value = ("header", "params", "url") + mock_process_response.return_value = {"status": "success"} + mock_api_call.return_value = (200, {"data": [[1, 2]], "resultSetMetaData": {"rowType": []}}) + + result = hook.get_result_from_successful_sql_api_query(query_id="qid-2") + + assert result == [] + mock_get_url.assert_called_once_with("qid-2") + mock_api_call.assert_called_once_with("GET", "url", "header", "params") + mock_process_response.assert_called_once_with( + 200, {"data": [[1, 2]], "resultSetMetaData": {"rowType": []}} + ) + + @mock.patch(f"{HOOK_PATH}._make_api_call_with_retries") + @mock.patch(f"{HOOK_PATH}._process_response") + @mock.patch(f"{HOOK_PATH}.get_request_url_header_params") + def test_get_result_from_successful_sql_api_query_raises_error( + self, mock_get_url, mock_process_response, mock_api_call + ): + hook = SnowflakeSqlApiHook(snowflake_conn_id="test_conn") + + mock_get_url.return_value = ("header", "params", "url") + mock_process_response.return_value = {"status": "failed"} + mock_api_call.return_value = (400, {"some": "response"}) + + with pytest.raises( + RuntimeError, match="Query must have status `success` to retrieve data; got `failed`." + ): + hook.get_result_from_successful_sql_api_query(query_id="qid-3") + + mock_get_url.assert_called_once_with("qid-3") + mock_api_call.assert_called_once_with("GET", "url", "header", "params") + mock_process_response.assert_called_once_with(400, {"some": "response"}) + + @mock.patch(f"{HOOK_PATH}._make_api_call_with_retries") + @mock.patch(f"{HOOK_PATH}._process_response") + @mock.patch(f"{HOOK_PATH}.get_request_url_header_params") + def test_get_result_from_successful_sql_api_query_single_partition( + self, mock_get_url, mock_process_response, mock_api_call + ): + hook = SnowflakeSqlApiHook(snowflake_conn_id="test_conn") + + mock_get_url.return_value = ("header", "params", "url") + mock_process_response.return_value = {"status": "success"} + mock_api_call.return_value = ( + 200, + { + "data": [[1, "a"], [2, "b"]], + "resultSetMetaData": { + "rowType": [{"name": "id"}, {"name": "val"}], + "partitionInfo": [{"p0": "p0"}], # Single partition + }, + }, + ) + + result = hook.get_result_from_successful_sql_api_query(query_id="qid-4") + + expected = [{"id": 1, "val": "a"}, {"id": 2, "val": "b"}] + assert result == expected + + # Only one API call (no additional partitions) + mock_api_call.assert_called_once_with("GET", "url", "header", "params") + mock_get_url.assert_called_once_with("qid-4") + mock_process_response.assert_called_once_with( + 200, + { + "data": [[1, "a"], [2, "b"]], + "resultSetMetaData": { + "rowType": [{"name": "id"}, {"name": "val"}], + "partitionInfo": [{"p0": "p0"}], + }, + }, + ) + + @mock.patch(f"{HOOK_PATH}._make_api_call_with_retries") + @mock.patch(f"{HOOK_PATH}._process_response") + @mock.patch(f"{HOOK_PATH}.get_request_url_header_params") + def test_get_result_from_successful_sql_api_query_multiple_partitions( + self, mock_get_url, mock_process_response, mock_api_call + ): + hook = SnowflakeSqlApiHook(snowflake_conn_id="test_conn") + + # Initial response with metadata and 2 partitions + initial_response = { + "data": [[1, "a"], [2, "b"]], + "resultSetMetaData": { + "rowType": [{"name": "id"}, {"name": "val"}], + "partitionInfo": [{"p0": "p0"}, {"p1": "p1"}], + }, + } + # Second partition response + second_response = { + "data": [[3, "c"], [4, "d"]], + "resultSetMetaData": { + "rowType": [{"name": "id"}, {"name": "val"}], + "partitionInfo": [{"p0": "p0"}, {"p1": "p1"}], + }, + } + + mock_get_url.return_value = ("header", "params", "https://example.com/api/query") + mock_process_response.return_value = {"status": "success"} + mock_api_call.side_effect = [ + (200, initial_response), # Initial call + (200, second_response), # Partition 1 + ] + + result = hook.get_result_from_successful_sql_api_query(query_id="qid-5") + + expected = [ + {"id": 1, "val": "a"}, + {"id": 2, "val": "b"}, + {"id": 3, "val": "c"}, + {"id": 4, "val": "d"}, + ] + assert result == expected + + # Two API calls: first for the initial query, second for partition 1 + assert mock_api_call.call_count == 2 + mock_api_call.assert_any_call("GET", "https://example.com/api/query", "header", "params") + mock_api_call.assert_any_call("GET", "https://example.com/api/query?partition=1", "header", "params") + mock_get_url.assert_called_once_with("qid-5") + mock_process_response.assert_called_once() + @pytest.mark.asyncio async def test_make_api_call_with_retries_async_success(self, mock_async_request): """