From a8cb2af5c24d5c66aad065287573a032fa8ae5fa Mon Sep 17 00:00:00 2001 From: Phani Kumar Date: Fri, 24 Mar 2023 15:25:42 +0530 Subject: [PATCH 01/15] Implement BatchAsyncSensor --- .../amazon/aws/hooks/batch_client.py | 78 ++++++++++++++++++- airflow/providers/amazon/aws/sensors/batch.py | 37 ++++++++- .../providers/amazon/aws/triggers/batch.py | 65 +++++++++++++++- .../operators/batch.rst | 9 +++ .../amazon/aws/sensors/test_batch.py | 40 +++++++++- 5 files changed, 223 insertions(+), 6 deletions(-) diff --git a/airflow/providers/amazon/aws/hooks/batch_client.py b/airflow/providers/amazon/aws/hooks/batch_client.py index 4f6e217341a0c..437b9d792f26b 100644 --- a/airflow/providers/amazon/aws/hooks/batch_client.py +++ b/airflow/providers/amazon/aws/hooks/batch_client.py @@ -26,15 +26,17 @@ """ from __future__ import annotations +import asyncio from random import uniform from time import sleep +from typing import Any import botocore.client import botocore.exceptions import botocore.waiter from airflow.exceptions import AirflowException -from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook +from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseAsyncHook, AwsBaseHook from airflow.typing_compat import Protocol, runtime_checkable @@ -577,3 +579,77 @@ def exp(tries): delay = 1 + pow(tries * 0.6, 2) delay = min(max_interval, delay) return uniform(delay / 3, delay) + + +class BatchClientAsyncHook(BatchClientHook, AwsBaseAsyncHook): + """ + Async client for AWS Batch services. + + :param max_retries: exponential back-off retries, 4200 = 48 hours; + polling is only used when waiters is None + :param status_retries: number of HTTP retries to get job status, 10; + polling is only used when waiters is None + """ + + def __init__(self, job_id: str | None, waiters: Any = None, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + self.job_id = job_id + self.waiters = waiters + + @staticmethod + async def delay(delay: int | float | None = None) -> None: # type: ignore[override] + """ + Pause execution for ``delay`` seconds. + + :param delay: a delay to pause execution using ``time.sleep(delay)``; + a small 1 second jitter is applied to the delay. + + .. note:: + This method uses a default random delay, i.e. + ``random.sample()``; + using a random interval helps to avoid AWS API throttle limits + when many concurrent tasks request job-descriptions. + """ + if delay is None: + delay = uniform(BatchClientAsyncHook.DEFAULT_DELAY_MIN, BatchClientAsyncHook.DEFAULT_DELAY_MAX) + else: + delay = BatchClientAsyncHook.add_jitter(delay) + await asyncio.sleep(delay) + + async def get_job_description(self, job_id: str) -> dict[str, str]: # type: ignore[override] + """ + Get job description (using status_retries). 
+ + :param job_id: a Batch job ID + :raises: AirflowException + """ + retries = 0 + async with await self.get_client_async() as client: + while True: + try: + response = await client.describe_jobs(jobs=[job_id]) + return self.parse_job_description(job_id, response) + + except botocore.exceptions.ClientError as err: + error = err.response.get("Error", {}) + if error.get("Code") == "TooManyRequestsException": + pass # allow it to retry, if possible + else: + raise AirflowException(f"AWS Batch job ({job_id}) description error: {err}") + + retries += 1 + if retries >= self.status_retries: + raise AirflowException( + f"AWS Batch job ({job_id}) description error: exceeded status_retries " + f"({self.status_retries})" + ) + + pause = self.exponential_delay(retries) + self.log.info( + "AWS Batch job (%s) description retry (%d of %d) in the next %.2f seconds", + job_id, + retries, + self.status_retries, + pause, + ) + await self.delay(pause) diff --git a/airflow/providers/amazon/aws/sensors/batch.py b/airflow/providers/amazon/aws/sensors/batch.py index c93fc3d8b3140..86b3bf67c36eb 100644 --- a/airflow/providers/amazon/aws/sensors/batch.py +++ b/airflow/providers/amazon/aws/sensors/batch.py @@ -16,13 +16,16 @@ # under the License. from __future__ import annotations -from functools import cached_property -from typing import TYPE_CHECKING, Sequence +from datetime import timedelta +from typing import TYPE_CHECKING, Any, Sequence +from typing import TYPE_CHECKING, Any, Sequence from deprecated import deprecated +from functools import cached_property from airflow.exceptions import AirflowException from airflow.providers.amazon.aws.hooks.batch_client import BatchClientHook +from airflow.providers.amazon.aws.triggers.batch import BatchSensorTrigger from airflow.sensors.base import BaseSensorOperator if TYPE_CHECKING: @@ -41,6 +44,7 @@ class BatchSensor(BaseSensorOperator): :param job_id: Batch job_id to check the state for :param aws_conn_id: aws connection to use, defaults to 'aws_default' :param region_name: aws region name associated with the client + :param deferrable: Run sensor in the deferrable mode. """ template_fields: Sequence[str] = ("job_id",) @@ -53,12 +57,16 @@ def __init__( job_id: str, aws_conn_id: str = "aws_default", region_name: str | None = None, + deferrable: bool = False, + poke_interval: float = 5, **kwargs, ): super().__init__(**kwargs) self.job_id = job_id self.aws_conn_id = aws_conn_id self.region_name = region_name + self.deferrable = deferrable + self.poke_interval = poke_interval def poke(self, context: Context) -> bool: job_description = self.hook.get_job_description(self.job_id) @@ -75,6 +83,31 @@ def poke(self, context: Context) -> bool: raise AirflowException(f"Batch sensor failed. Unknown AWS Batch job status: {state}") + def execute(self, context: Context) -> None: + if not self.deferrable: + super().execute(context=context) + else: + self.defer( + timeout=timedelta(seconds=self.timeout), + trigger=BatchSensorTrigger( + job_id=self.job_id, + aws_conn_id=self.aws_conn_id, + region_name=self.region_name, + poke_interval=self.poke_interval, + ), + method_name="execute_complete", + ) + + def execute_complete(self, context: Context, event: dict[str, Any]) -> None: + """ + Callback for when the trigger fires - returns immediately. + Relies on trigger to throw an exception, otherwise it assumes execution was + successful. 
+ """ + if "status" in event and event["status"] == "error": + raise AirflowException(event["message"]) + self.log.info(event["message"]) + @deprecated(reason="use `hook` property instead.") def get_hook(self) -> BatchClientHook: """Create and return a BatchClientHook.""" diff --git a/airflow/providers/amazon/aws/triggers/batch.py b/airflow/providers/amazon/aws/triggers/batch.py index dc858a80fd710..5128c48d52872 100644 --- a/airflow/providers/amazon/aws/triggers/batch.py +++ b/airflow/providers/amazon/aws/triggers/batch.py @@ -18,11 +18,11 @@ import asyncio from functools import cached_property -from typing import Any +from typing import Any, AsyncIterator from botocore.exceptions import WaiterError -from airflow.providers.amazon.aws.hooks.batch_client import BatchClientHook +from airflow.providers.amazon.aws.hooks.batch_client import BatchClientAsyncHook, BatchClientHook from airflow.triggers.base import BaseTrigger, TriggerEvent @@ -105,3 +105,64 @@ async def run(self): yield TriggerEvent({"status": "failure", "message": "Job Failed - max attempts reached."}) else: yield TriggerEvent({"status": "success", "job_id": self.job_id}) + + +class BatchSensorTrigger(BaseTrigger): + """ + Checks for the status of a submitted job_id to AWS Batch until it reaches a failure or a success state. + BatchSensorTrigger is fired as deferred class with params to poll the job state in Triggerer + + :param job_id: the job ID, to poll for job completion or not + :param aws_conn_id: connection id of AWS credentials / region name. If None, + credential boto3 strategy will be used + :param region_name: AWS region name to use + Override the region_name in connection (if provided) + :param poke_interval: polling period in seconds to check for the status of the job + """ + + def __init__( + self, + job_id: str, + region_name: str | None, + aws_conn_id: str | None = "aws_default", + poke_interval: float = 5, + ): + super().__init__() + self.job_id = job_id + self.aws_conn_id = aws_conn_id + self.region_name = region_name + self.poke_interval = poke_interval + + def serialize(self) -> tuple[str, dict[str, Any]]: + """Serializes BatchSensorTrigger arguments and classpath.""" + return ( + "airflow.providers.amazon.aws.triggers.batch.BatchSensorTrigger", + { + "job_id": self.job_id, + "aws_conn_id": self.aws_conn_id, + "region_name": self.region_name, + "poke_interval": self.poke_interval, + }, + ) + + async def run(self) -> AsyncIterator[TriggerEvent]: + """ + Make async connection using aiobotocore library to AWS Batch, + periodically poll for the Batch job status + + The status that indicates job completion are: 'SUCCEEDED'|'FAILED'. 
+ """ + hook = BatchClientAsyncHook(job_id=self.job_id, aws_conn_id=self.aws_conn_id) + try: + while True: + response = await hook.get_job_description(self.job_id) + state = response["status"] + if state == BatchClientAsyncHook.SUCCESS_STATE: + success_message = f"{self.job_id} was completed successfully" + yield TriggerEvent({"status": "success", "message": success_message}) + if state == BatchClientAsyncHook.FAILURE_STATE: + error_message = f"{self.job_id} failed" + yield TriggerEvent({"status": "error", "message": error_message}) + await asyncio.sleep(self.poke_interval) + except Exception as e: + yield TriggerEvent({"status": "error", "message": str(e)}) diff --git a/docs/apache-airflow-providers-amazon/operators/batch.rst b/docs/apache-airflow-providers-amazon/operators/batch.rst index bcfb86dbf73f6..4cc2a2b0cced4 100644 --- a/docs/apache-airflow-providers-amazon/operators/batch.rst +++ b/docs/apache-airflow-providers-amazon/operators/batch.rst @@ -77,6 +77,15 @@ use :class:`~airflow.providers.amazon.aws.sensors.batch.BatchSensor`. :start-after: [START howto_sensor_batch] :end-before: [END howto_sensor_batch] +In order to monitor the state of the AWS Batch Job asynchronously, use +:class:`~airflow.providers.amazon.aws.sensors.batch.BatchSensor` with the +parameter ``deferrable`` set to True. + +Since this will release the Airflow worker slot , it will lead to efficient +utilization of available resources on your Airflow deployment. +This will also need the triggerer component to be available in your +Airflow deployment. + .. _howto/sensor:BatchComputeEnvironmentSensor: Wait on an AWS Batch compute environment status diff --git a/tests/providers/amazon/aws/sensors/test_batch.py b/tests/providers/amazon/aws/sensors/test_batch.py index 835b99ad0a5c2..d532379e2705f 100644 --- a/tests/providers/amazon/aws/sensors/test_batch.py +++ b/tests/providers/amazon/aws/sensors/test_batch.py @@ -20,16 +20,18 @@ import pytest -from airflow.exceptions import AirflowException +from airflow.exceptions import AirflowException, TaskDeferred from airflow.providers.amazon.aws.hooks.batch_client import BatchClientHook from airflow.providers.amazon.aws.sensors.batch import ( BatchComputeEnvironmentSensor, BatchJobQueueSensor, BatchSensor, ) +from airflow.providers.amazon.aws.triggers.batch import BatchSensorTrigger TASK_ID = "batch_job_sensor" JOB_ID = "8222a1c2-b246-4e19-b1b8-0039bb4407c0" +AWS_REGION = "eu-west-1" class TestBatchSensor: @@ -195,3 +197,39 @@ def test_poke_invalid(self, mock_batch_client): jobQueues=[self.job_queue], ) assert "AWS Batch job queue failed" in str(ctx.value) + + +class TestBatchAsyncSensor: + TASK = BatchSensor(task_id="task", job_id=JOB_ID, region_name=AWS_REGION, deferrable=True) + + def test_batch_sensor_async(self): + """ + Asserts that a task is deferred and a BatchSensorTrigger will be fired + when the BatchSensorAsync is executed. 
+ """ + + with pytest.raises(TaskDeferred) as exc: + self.TASK.execute({}) + assert isinstance(exc.value.trigger, BatchSensorTrigger), "Trigger is not a BatchSensorTrigger" + + def test_batch_sensor_async_execute_failure(self): + """Tests that an AirflowException is raised in case of error event""" + + with pytest.raises(AirflowException) as exc_info: + self.TASK.execute_complete( + context={}, event={"status": "error", "message": "test failure message"} + ) + + assert str(exc_info.value) == "test failure message" + + @pytest.mark.parametrize( + "event", + [{"status": "success", "message": f"AWS Batch job ({JOB_ID}) succeeded"}], + ) + def test_batch_sensor_async_execute_complete(self, caplog, event): + """Tests that execute_complete method returns None and that it prints expected log""" + + with mock.patch.object(self.TASK.log, "info") as mock_log_info: + assert self.TASK.execute_complete(context={}, event=event) is None + + mock_log_info.assert_called_with(event["message"]) From fc84fb1a2eb87725c72b7165af8a293b1d470056 Mon Sep 17 00:00:00 2001 From: Phani Kumar Date: Tue, 30 May 2023 09:25:54 +0530 Subject: [PATCH 02/15] Use boto waiter in the trigger --- airflow/providers/amazon/aws/sensors/batch.py | 6 +- .../providers/amazon/aws/triggers/batch.py | 56 +++++++++++++------ 2 files changed, 43 insertions(+), 19 deletions(-) diff --git a/airflow/providers/amazon/aws/sensors/batch.py b/airflow/providers/amazon/aws/sensors/batch.py index 86b3bf67c36eb..86bed2a895008 100644 --- a/airflow/providers/amazon/aws/sensors/batch.py +++ b/airflow/providers/amazon/aws/sensors/batch.py @@ -17,12 +17,11 @@ from __future__ import annotations from datetime import timedelta -from typing import TYPE_CHECKING, Any, Sequence +from functools import cached_property from typing import TYPE_CHECKING, Any, Sequence from deprecated import deprecated -from functools import cached_property from airflow.exceptions import AirflowException from airflow.providers.amazon.aws.hooks.batch_client import BatchClientHook from airflow.providers.amazon.aws.triggers.batch import BatchSensorTrigger @@ -59,6 +58,7 @@ def __init__( region_name: str | None = None, deferrable: bool = False, poke_interval: float = 5, + max_retries: int | None = None, **kwargs, ): super().__init__(**kwargs) @@ -67,6 +67,7 @@ def __init__( self.region_name = region_name self.deferrable = deferrable self.poke_interval = poke_interval + self.max_retries = max_retries def poke(self, context: Context) -> bool: job_description = self.hook.get_job_description(self.job_id) @@ -94,6 +95,7 @@ def execute(self, context: Context) -> None: aws_conn_id=self.aws_conn_id, region_name=self.region_name, poke_interval=self.poke_interval, + max_retries=self.max_retries, ), method_name="execute_complete", ) diff --git a/airflow/providers/amazon/aws/triggers/batch.py b/airflow/providers/amazon/aws/triggers/batch.py index 5128c48d52872..3565699c0c8be 100644 --- a/airflow/providers/amazon/aws/triggers/batch.py +++ b/airflow/providers/amazon/aws/triggers/batch.py @@ -18,11 +18,11 @@ import asyncio from functools import cached_property -from typing import Any, AsyncIterator +from typing import Any from botocore.exceptions import WaiterError -from airflow.providers.amazon.aws.hooks.batch_client import BatchClientAsyncHook, BatchClientHook +from airflow.providers.amazon.aws.hooks.batch_client import BatchClientHook from airflow.triggers.base import BaseTrigger, TriggerEvent @@ -117,6 +117,8 @@ class BatchSensorTrigger(BaseTrigger): credential boto3 strategy will be used :param 
region_name: AWS region name to use Override the region_name in connection (if provided) + :param max_retries: Number of times to poll for job state before + returning the current state, defaults to None :param poke_interval: polling period in seconds to check for the status of the job """ @@ -126,12 +128,14 @@ def __init__( region_name: str | None, aws_conn_id: str | None = "aws_default", poke_interval: float = 5, + max_retries: int | None = None, ): super().__init__() self.job_id = job_id self.aws_conn_id = aws_conn_id self.region_name = region_name self.poke_interval = poke_interval + self.max_retries = max_retries def serialize(self) -> tuple[str, dict[str, Any]]: """Serializes BatchSensorTrigger arguments and classpath.""" @@ -142,27 +146,45 @@ def serialize(self) -> tuple[str, dict[str, Any]]: "aws_conn_id": self.aws_conn_id, "region_name": self.region_name, "poke_interval": self.poke_interval, + "max_retries": self.max_retries, }, ) - async def run(self) -> AsyncIterator[TriggerEvent]: + @cached_property + def hook(self) -> BatchClientHook: + return BatchClientHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name) + + async def run(self): """ Make async connection using aiobotocore library to AWS Batch, periodically poll for the Batch job status The status that indicates job completion are: 'SUCCEEDED'|'FAILED'. """ - hook = BatchClientAsyncHook(job_id=self.job_id, aws_conn_id=self.aws_conn_id) - try: - while True: - response = await hook.get_job_description(self.job_id) - state = response["status"] - if state == BatchClientAsyncHook.SUCCESS_STATE: - success_message = f"{self.job_id} was completed successfully" - yield TriggerEvent({"status": "success", "message": success_message}) - if state == BatchClientAsyncHook.FAILURE_STATE: - error_message = f"{self.job_id} failed" - yield TriggerEvent({"status": "error", "message": error_message}) - await asyncio.sleep(self.poke_interval) - except Exception as e: - yield TriggerEvent({"status": "error", "message": str(e)}) + async with self.hook.async_conn as client: + waiter = self.hook.get_waiter("batch_job_complete", deferrable=True, client=client) + attempt = 0 + while attempt < self.max_retries: + attempt = attempt + 1 + try: + await waiter.wait( + jobs=[self.job_id], + WaiterConfig={ + "Delay": self.poke_interval, + "MaxAttempts": 1, + }, + ) + break + except WaiterError as error: + self.log.info( + "Job status is %s. 
Retrying attempt %s/%s", + error.last_response["jobs"][0]["status"], + attempt, + self.max_retries, + ) + await asyncio.sleep(int(self.poke_interval)) + + if attempt >= self.max_retries: + yield TriggerEvent({"status": "failure", "message": "Job Failed - max attempts reached."}) + else: + yield TriggerEvent({"status": "success", "job_id": self.job_id}) From ec950dc5f45b2da17d4fdd9355ac1077bb2ea0bc Mon Sep 17 00:00:00 2001 From: Phani Kumar Date: Tue, 30 May 2023 16:43:23 +0530 Subject: [PATCH 03/15] Add max_retries --- airflow/providers/amazon/aws/sensors/batch.py | 2 +- airflow/providers/amazon/aws/triggers/batch.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/providers/amazon/aws/sensors/batch.py b/airflow/providers/amazon/aws/sensors/batch.py index 86bed2a895008..cebf80b8a81c0 100644 --- a/airflow/providers/amazon/aws/sensors/batch.py +++ b/airflow/providers/amazon/aws/sensors/batch.py @@ -58,7 +58,7 @@ def __init__( region_name: str | None = None, deferrable: bool = False, poke_interval: float = 5, - max_retries: int | None = None, + max_retries: int = 5, **kwargs, ): super().__init__(**kwargs) diff --git a/airflow/providers/amazon/aws/triggers/batch.py b/airflow/providers/amazon/aws/triggers/batch.py index 3565699c0c8be..89f0c0c8d20d7 100644 --- a/airflow/providers/amazon/aws/triggers/batch.py +++ b/airflow/providers/amazon/aws/triggers/batch.py @@ -128,7 +128,7 @@ def __init__( region_name: str | None, aws_conn_id: str | None = "aws_default", poke_interval: float = 5, - max_retries: int | None = None, + max_retries: int = 5, ): super().__init__() self.job_id = job_id From bbf6da67e325076ea714209d3591ff9f29c882f0 Mon Sep 17 00:00:00 2001 From: Phani Kumar Date: Tue, 30 May 2023 21:12:03 +0530 Subject: [PATCH 04/15] Modify event to include job id --- airflow/providers/amazon/aws/triggers/batch.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/airflow/providers/amazon/aws/triggers/batch.py b/airflow/providers/amazon/aws/triggers/batch.py index 89f0c0c8d20d7..bb42e2005786d 100644 --- a/airflow/providers/amazon/aws/triggers/batch.py +++ b/airflow/providers/amazon/aws/triggers/batch.py @@ -187,4 +187,10 @@ async def run(self): if attempt >= self.max_retries: yield TriggerEvent({"status": "failure", "message": "Job Failed - max attempts reached."}) else: - yield TriggerEvent({"status": "success", "job_id": self.job_id}) + yield TriggerEvent( + { + "status": "success", + "job_id": self.job_id, + "message": f"Job {self.job_id} Succeeded", + } + ) From 05ff484f7af11e0f1facd45648ea66025fec2a44 Mon Sep 17 00:00:00 2001 From: Phani Kumar Date: Mon, 5 Jun 2023 16:26:13 +0530 Subject: [PATCH 05/15] Remove BatchClientAsyncHook --- .../amazon/aws/hooks/batch_client.py | 78 +------------------ 1 file changed, 1 insertion(+), 77 deletions(-) diff --git a/airflow/providers/amazon/aws/hooks/batch_client.py b/airflow/providers/amazon/aws/hooks/batch_client.py index 437b9d792f26b..4f6e217341a0c 100644 --- a/airflow/providers/amazon/aws/hooks/batch_client.py +++ b/airflow/providers/amazon/aws/hooks/batch_client.py @@ -26,17 +26,15 @@ """ from __future__ import annotations -import asyncio from random import uniform from time import sleep -from typing import Any import botocore.client import botocore.exceptions import botocore.waiter from airflow.exceptions import AirflowException -from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseAsyncHook, AwsBaseHook +from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook from 
airflow.typing_compat import Protocol, runtime_checkable @@ -579,77 +577,3 @@ def exp(tries): delay = 1 + pow(tries * 0.6, 2) delay = min(max_interval, delay) return uniform(delay / 3, delay) - - -class BatchClientAsyncHook(BatchClientHook, AwsBaseAsyncHook): - """ - Async client for AWS Batch services. - - :param max_retries: exponential back-off retries, 4200 = 48 hours; - polling is only used when waiters is None - :param status_retries: number of HTTP retries to get job status, 10; - polling is only used when waiters is None - """ - - def __init__(self, job_id: str | None, waiters: Any = None, *args: Any, **kwargs: Any) -> None: - super().__init__(*args, **kwargs) - self.job_id = job_id - self.waiters = waiters - - @staticmethod - async def delay(delay: int | float | None = None) -> None: # type: ignore[override] - """ - Pause execution for ``delay`` seconds. - - :param delay: a delay to pause execution using ``time.sleep(delay)``; - a small 1 second jitter is applied to the delay. - - .. note:: - This method uses a default random delay, i.e. - ``random.sample()``; - using a random interval helps to avoid AWS API throttle limits - when many concurrent tasks request job-descriptions. - """ - if delay is None: - delay = uniform(BatchClientAsyncHook.DEFAULT_DELAY_MIN, BatchClientAsyncHook.DEFAULT_DELAY_MAX) - else: - delay = BatchClientAsyncHook.add_jitter(delay) - await asyncio.sleep(delay) - - async def get_job_description(self, job_id: str) -> dict[str, str]: # type: ignore[override] - """ - Get job description (using status_retries). - - :param job_id: a Batch job ID - :raises: AirflowException - """ - retries = 0 - async with await self.get_client_async() as client: - while True: - try: - response = await client.describe_jobs(jobs=[job_id]) - return self.parse_job_description(job_id, response) - - except botocore.exceptions.ClientError as err: - error = err.response.get("Error", {}) - if error.get("Code") == "TooManyRequestsException": - pass # allow it to retry, if possible - else: - raise AirflowException(f"AWS Batch job ({job_id}) description error: {err}") - - retries += 1 - if retries >= self.status_retries: - raise AirflowException( - f"AWS Batch job ({job_id}) description error: exceeded status_retries " - f"({self.status_retries})" - ) - - pause = self.exponential_delay(retries) - self.log.info( - "AWS Batch job (%s) description retry (%d of %d) in the next %.2f seconds", - job_id, - retries, - self.status_retries, - pause, - ) - await self.delay(pause) From 6abd68d28ee403bed651056349feb85118639845 Mon Sep 17 00:00:00 2001 From: Phani Kumar Date: Mon, 5 Jun 2023 19:06:15 +0530 Subject: [PATCH 06/15] Apply review suggestions --- airflow/providers/amazon/aws/sensors/batch.py | 4 ++-- airflow/providers/amazon/aws/triggers/batch.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/airflow/providers/amazon/aws/sensors/batch.py b/airflow/providers/amazon/aws/sensors/batch.py index cebf80b8a81c0..35f7658b08ba6 100644 --- a/airflow/providers/amazon/aws/sensors/batch.py +++ b/airflow/providers/amazon/aws/sensors/batch.py @@ -57,7 +57,7 @@ def __init__( aws_conn_id: str = "aws_default", region_name: str | None = None, deferrable: bool = False, - poke_interval: float = 5, + poke_interval: int = 5, max_retries: int = 5, **kwargs, ): @@ -106,7 +106,7 @@ def execute_complete(self, context: Context, event: dict[str, Any]) -> None: Relies on trigger to throw an exception, otherwise it assumes execution was successful. 
""" - if "status" in event and event["status"] == "error": + if "status" in event and event["status"] == "failure": raise AirflowException(event["message"]) self.log.info(event["message"]) diff --git a/airflow/providers/amazon/aws/triggers/batch.py b/airflow/providers/amazon/aws/triggers/batch.py index bb42e2005786d..9c47884f8ea05 100644 --- a/airflow/providers/amazon/aws/triggers/batch.py +++ b/airflow/providers/amazon/aws/triggers/batch.py @@ -127,7 +127,7 @@ def __init__( job_id: str, region_name: str | None, aws_conn_id: str | None = "aws_default", - poke_interval: float = 5, + poke_interval: int = 5, max_retries: int = 5, ): super().__init__() From f4fb98441b504afc4c5f3eef0de02357aad699f9 Mon Sep 17 00:00:00 2001 From: Phani Kumar Date: Mon, 5 Jun 2023 20:33:50 +0530 Subject: [PATCH 07/15] Fix static check failure --- airflow/providers/amazon/aws/sensors/batch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/amazon/aws/sensors/batch.py b/airflow/providers/amazon/aws/sensors/batch.py index 35f7658b08ba6..b618d9e3c0677 100644 --- a/airflow/providers/amazon/aws/sensors/batch.py +++ b/airflow/providers/amazon/aws/sensors/batch.py @@ -57,7 +57,7 @@ def __init__( aws_conn_id: str = "aws_default", region_name: str | None = None, deferrable: bool = False, - poke_interval: int = 5, + poke_interval: int = 5, # type: ignore[arg-type] max_retries: int = 5, **kwargs, ): From f1b327305227e45fd34dd037158064f153f01102 Mon Sep 17 00:00:00 2001 From: Phani Kumar Date: Mon, 5 Jun 2023 21:00:29 +0530 Subject: [PATCH 08/15] Fix static check failure --- airflow/providers/amazon/aws/sensors/batch.py | 2 +- airflow/providers/amazon/aws/triggers/batch.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/airflow/providers/amazon/aws/sensors/batch.py b/airflow/providers/amazon/aws/sensors/batch.py index b618d9e3c0677..32418d23d53bf 100644 --- a/airflow/providers/amazon/aws/sensors/batch.py +++ b/airflow/providers/amazon/aws/sensors/batch.py @@ -57,7 +57,7 @@ def __init__( aws_conn_id: str = "aws_default", region_name: str | None = None, deferrable: bool = False, - poke_interval: int = 5, # type: ignore[arg-type] + poke_interval: float = 5, max_retries: int = 5, **kwargs, ): diff --git a/airflow/providers/amazon/aws/triggers/batch.py b/airflow/providers/amazon/aws/triggers/batch.py index 9c47884f8ea05..01e3a939e271c 100644 --- a/airflow/providers/amazon/aws/triggers/batch.py +++ b/airflow/providers/amazon/aws/triggers/batch.py @@ -127,7 +127,7 @@ def __init__( job_id: str, region_name: str | None, aws_conn_id: str | None = "aws_default", - poke_interval: int = 5, + poke_interval: float = 5, max_retries: int = 5, ): super().__init__() @@ -170,7 +170,7 @@ async def run(self): await waiter.wait( jobs=[self.job_id], WaiterConfig={ - "Delay": self.poke_interval, + "Delay": int(self.poke_interval), "MaxAttempts": 1, }, ) From bfa5223f820270a8adba1bd37e3137a563230d93 Mon Sep 17 00:00:00 2001 From: Phani Kumar Date: Mon, 5 Jun 2023 21:36:30 +0530 Subject: [PATCH 09/15] Fix test failure --- tests/providers/amazon/aws/sensors/test_batch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/providers/amazon/aws/sensors/test_batch.py b/tests/providers/amazon/aws/sensors/test_batch.py index d532379e2705f..42e9bffb5b688 100644 --- a/tests/providers/amazon/aws/sensors/test_batch.py +++ b/tests/providers/amazon/aws/sensors/test_batch.py @@ -217,7 +217,7 @@ def test_batch_sensor_async_execute_failure(self): with pytest.raises(AirflowException) as 
exc_info: self.TASK.execute_complete( - context={}, event={"status": "error", "message": "test failure message"} + context={}, event={"status": "failure", "message": "test failure message"} ) assert str(exc_info.value) == "test failure message" From 85a3fa81a13be4b585920102a25df2c6427fa411 Mon Sep 17 00:00:00 2001 From: Phani Kumar Date: Thu, 8 Jun 2023 18:31:44 +0530 Subject: [PATCH 10/15] Fix static check failure --- airflow/providers/amazon/aws/triggers/batch.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/providers/amazon/aws/triggers/batch.py b/airflow/providers/amazon/aws/triggers/batch.py index 01e3a939e271c..a599c26105cbf 100644 --- a/airflow/providers/amazon/aws/triggers/batch.py +++ b/airflow/providers/amazon/aws/triggers/batch.py @@ -110,7 +110,7 @@ async def run(self): class BatchSensorTrigger(BaseTrigger): """ Checks for the status of a submitted job_id to AWS Batch until it reaches a failure or a success state. - BatchSensorTrigger is fired as deferred class with params to poll the job state in Triggerer + BatchSensorTrigger is fired as deferred class with params to poll the job state in Triggerer. :param job_id: the job ID, to poll for job completion or not :param aws_conn_id: connection id of AWS credentials / region name. If None, @@ -157,7 +157,7 @@ def hook(self) -> BatchClientHook: async def run(self): """ Make async connection using aiobotocore library to AWS Batch, - periodically poll for the Batch job status + periodically poll for the Batch job status. The status that indicates job completion are: 'SUCCEEDED'|'FAILED'. """ From bcfcfb3f708d441af911ad0291de35ca25acc88a Mon Sep 17 00:00:00 2001 From: Phani Kumar Date: Mon, 12 Jun 2023 17:34:23 +0530 Subject: [PATCH 11/15] Apply review suggestions --- airflow/providers/amazon/aws/sensors/batch.py | 3 + .../providers/amazon/aws/triggers/batch.py | 10 +- .../amazon/aws/triggers/test_batch.py | 104 +++++++++++++++++- 3 files changed, 111 insertions(+), 6 deletions(-) diff --git a/airflow/providers/amazon/aws/sensors/batch.py b/airflow/providers/amazon/aws/sensors/batch.py index 32418d23d53bf..52b4c880b6c5f 100644 --- a/airflow/providers/amazon/aws/sensors/batch.py +++ b/airflow/providers/amazon/aws/sensors/batch.py @@ -44,6 +44,9 @@ class BatchSensor(BaseSensorOperator): :param aws_conn_id: aws connection to use, defaults to 'aws_default' :param region_name: aws region name associated with the client :param deferrable: Run sensor in the deferrable mode. + :param poke_interval: polling period in seconds to check for the status of the job. + :param max_retries: Number of times to poll for job state before + returning the current state. """ template_fields: Sequence[str] = ("job_id",) diff --git a/airflow/providers/amazon/aws/triggers/batch.py b/airflow/providers/amazon/aws/triggers/batch.py index a599c26105cbf..1b9664799840e 100644 --- a/airflow/providers/amazon/aws/triggers/batch.py +++ b/airflow/providers/amazon/aws/triggers/batch.py @@ -113,13 +113,13 @@ class BatchSensorTrigger(BaseTrigger): BatchSensorTrigger is fired as deferred class with params to poll the job state in Triggerer. :param job_id: the job ID, to poll for job completion or not - :param aws_conn_id: connection id of AWS credentials / region name. If None, - credential boto3 strategy will be used :param region_name: AWS region name to use Override the region_name in connection (if provided) + :param aws_conn_id: connection id of AWS credentials / region name. 
If None, + credential boto3 strategy will be used + :param poke_interval: polling period in seconds to check for the status of the job :param max_retries: Number of times to poll for job state before returning the current state, defaults to None - :param poke_interval: polling period in seconds to check for the status of the job """ def __init__( @@ -177,8 +177,8 @@ async def run(self): break except WaiterError as error: self.log.info( - "Job status is %s. Retrying attempt %s/%s", - error.last_response["jobs"][0]["status"], + "Job response is %s. Retrying attempt %s/%s", + error.last_response["Error"]["Message"], attempt, self.max_retries, ) diff --git a/tests/providers/amazon/aws/triggers/test_batch.py b/tests/providers/amazon/aws/triggers/test_batch.py index 6f87d92a2da6a..7f244c358068b 100644 --- a/tests/providers/amazon/aws/triggers/test_batch.py +++ b/tests/providers/amazon/aws/triggers/test_batch.py @@ -16,12 +16,13 @@ # under the License. from __future__ import annotations +import asyncio from unittest import mock from unittest.mock import AsyncMock import pytest -from airflow.providers.amazon.aws.triggers.batch import BatchOperatorTrigger +from airflow.providers.amazon.aws.triggers.batch import BatchOperatorTrigger, BatchSensorTrigger from airflow.triggers.base import TriggerEvent BATCH_JOB_ID = "job_id" @@ -69,3 +70,104 @@ async def test_batch_job_trigger_run(self, mock_async_conn, mock_get_waiter): response = await generator.asend(None) assert response == TriggerEvent({"status": "success", "job_id": BATCH_JOB_ID}) + + +class TestBatchSensorTrigger: + TRIGGER = BatchSensorTrigger( + job_id=BATCH_JOB_ID, + region_name=AWS_REGION, + aws_conn_id=AWS_CONN_ID, + poke_interval=POLL_INTERVAL, + ) + + def test_batch_sensor_trigger_serialization(self): + """ + Asserts that the BatchSensorTrigger correctly serializes its arguments + and classpath. 
+ """ + + classpath, kwargs = self.TRIGGER.serialize() + assert classpath == "airflow.providers.amazon.aws.triggers.batch.BatchSensorTrigger" + assert kwargs == { + "job_id": BATCH_JOB_ID, + "max_retries": 5, + "region_name": AWS_REGION, + "aws_conn_id": AWS_CONN_ID, + "poke_interval": POLL_INTERVAL, + } + + @pytest.mark.asyncio + @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.BatchClientHook.get_job_description") + async def test_batch_sensor_trigger_run(self, mock_response): + """Trigger the BatchSensorTrigger and check if the task is in running state.""" + mock_response.return_value = {"status": "RUNNABLE"} + + task = asyncio.create_task(self.TRIGGER.run().__anext__()) + await asyncio.sleep(0.5) + # TriggerEvent was not returned + assert task.done() is False + asyncio.get_event_loop().stop() + + @pytest.mark.asyncio + @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.BatchClientHook.get_waiter") + @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.BatchClientHook.async_conn") + async def test_batch_job_trigger_run(self, mock_async_conn, mock_get_waiter): + the_mock = mock.MagicMock() + mock_async_conn.__aenter__.return_value = the_mock + + mock_get_waiter().wait = AsyncMock() + + batch_trigger = BatchOperatorTrigger( + job_id=BATCH_JOB_ID, + poll_interval=POLL_INTERVAL, + max_retries=MAX_ATTEMPT, + aws_conn_id=AWS_CONN_ID, + region_name=AWS_REGION, + ) + + generator = batch_trigger.run() + response = await generator.asend(None) + + assert response == TriggerEvent({"status": "success", "job_id": BATCH_JOB_ID}) + + @pytest.mark.asyncio + @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.BatchClientHook.get_waiter") + @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.BatchClientHook.async_conn") + @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.BatchClientHook.get_job_description") + async def test_batch_sensor_trigger_completed(self, mock_response, mock_async_conn, mock_get_waiter): + """Test if the success event is returned from trigger.""" + mock_response.return_value = {"status": "SUCCEEDED"} + + the_mock = mock.MagicMock() + mock_async_conn.__aenter__.return_value = the_mock + + mock_get_waiter().wait = AsyncMock() + + trigger = BatchSensorTrigger( + job_id=BATCH_JOB_ID, + region_name=AWS_REGION, + aws_conn_id=AWS_CONN_ID, + ) + generator = trigger.run() + actual_response = await generator.asend(None) + assert ( + TriggerEvent( + {"status": "success", "job_id": BATCH_JOB_ID, "message": f"Job {BATCH_JOB_ID} Succeeded"} + ) + == actual_response + ) + + @pytest.mark.asyncio + @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.BatchClientHook.get_waiter") + @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.BatchClientHook.get_job_description") + async def test_batch_sensor_trigger_failure(self, mock_response, mock_get_waiter): + """Test if the failure event is returned from trigger.""" + trigger = BatchSensorTrigger( + job_id=BATCH_JOB_ID, region_name=AWS_REGION, aws_conn_id=AWS_CONN_ID, max_retries=0 + ) + generator = trigger.run() + actual_response = await generator.asend(None) + assert ( + TriggerEvent({"status": "failure", "message": "Job Failed - max attempts reached."}) + == actual_response + ) From 76185c40b1c55f0d282037f300f8fb8e32f0ffaa Mon Sep 17 00:00:00 2001 From: Phani Kumar Date: Mon, 12 Jun 2023 19:00:34 +0530 Subject: [PATCH 12/15] skip aiobotore import error --- tests/providers/amazon/aws/triggers/test_batch.py | 1 + 1 file changed, 1 insertion(+) diff --git 
a/tests/providers/amazon/aws/triggers/test_batch.py b/tests/providers/amazon/aws/triggers/test_batch.py index 7f244c358068b..6ca23bb03edac 100644 --- a/tests/providers/amazon/aws/triggers/test_batch.py +++ b/tests/providers/amazon/aws/triggers/test_batch.py @@ -30,6 +30,7 @@ MAX_ATTEMPT = 5 AWS_CONN_ID = "aws_batch_job_conn" AWS_REGION = "us-east-2" +pytest.importorskip("aiobotocore") class TestBatchOperatorTrigger: From 9dc72f5056a6ba17df939fb469d05ac03c72d91c Mon Sep 17 00:00:00 2001 From: Phani Kumar Date: Tue, 13 Jun 2023 09:56:42 +0530 Subject: [PATCH 13/15] Apply review suggestions --- airflow/providers/amazon/aws/sensors/batch.py | 8 +++-- .../providers/amazon/aws/triggers/batch.py | 30 ++++++++----------- 2 files changed, 18 insertions(+), 20 deletions(-) diff --git a/airflow/providers/amazon/aws/sensors/batch.py b/airflow/providers/amazon/aws/sensors/batch.py index 52b4c880b6c5f..475b0ecb71bb6 100644 --- a/airflow/providers/amazon/aws/sensors/batch.py +++ b/airflow/providers/amazon/aws/sensors/batch.py @@ -91,14 +91,18 @@ def execute(self, context: Context) -> None: if not self.deferrable: super().execute(context=context) else: + timeout = ( + timedelta(seconds=self.max_retries * self.poke_interval + 60) + if self.max_retries + else self.execution_timeout + ) self.defer( - timeout=timedelta(seconds=self.timeout), + timeout=timeout, trigger=BatchSensorTrigger( job_id=self.job_id, aws_conn_id=self.aws_conn_id, region_name=self.region_name, poke_interval=self.poke_interval, - max_retries=self.max_retries, ), method_name="execute_complete", ) diff --git a/airflow/providers/amazon/aws/triggers/batch.py b/airflow/providers/amazon/aws/triggers/batch.py index 1b9664799840e..e0eb965d8f95e 100644 --- a/airflow/providers/amazon/aws/triggers/batch.py +++ b/airflow/providers/amazon/aws/triggers/batch.py @@ -118,8 +118,6 @@ class BatchSensorTrigger(BaseTrigger): :param aws_conn_id: connection id of AWS credentials / region name. If None, credential boto3 strategy will be used :param poke_interval: polling period in seconds to check for the status of the job - :param max_retries: Number of times to poll for job state before - returning the current state, defaults to None """ def __init__( @@ -128,14 +126,12 @@ def __init__( region_name: str | None, aws_conn_id: str | None = "aws_default", poke_interval: float = 5, - max_retries: int = 5, ): super().__init__() self.job_id = job_id self.aws_conn_id = aws_conn_id self.region_name = region_name self.poke_interval = poke_interval - self.max_retries = max_retries def serialize(self) -> tuple[str, dict[str, Any]]: """Serializes BatchSensorTrigger arguments and classpath.""" @@ -146,7 +142,6 @@ def serialize(self) -> tuple[str, dict[str, Any]]: "aws_conn_id": self.aws_conn_id, "region_name": self.region_name, "poke_interval": self.poke_interval, - "max_retries": self.max_retries, }, ) @@ -164,7 +159,7 @@ async def run(self): async with self.hook.async_conn as client: waiter = self.hook.get_waiter("batch_job_complete", deferrable=True, client=client) attempt = 0 - while attempt < self.max_retries: + while True: attempt = attempt + 1 try: await waiter.wait( @@ -176,21 +171,20 @@ async def run(self): ) break except WaiterError as error: + if "Error" in str(error): + yield TriggerEvent({"status": "failure", "message": f"Job Failed: {error}"}) + break self.log.info( - "Job response is %s. Retrying attempt %s/%s", + "Job response is %s. 
Retrying attempt %s", error.last_response["Error"]["Message"], attempt, - self.max_retries, ) await asyncio.sleep(int(self.poke_interval)) - if attempt >= self.max_retries: - yield TriggerEvent({"status": "failure", "message": "Job Failed - max attempts reached."}) - else: - yield TriggerEvent( - { - "status": "success", - "job_id": self.job_id, - "message": f"Job {self.job_id} Succeeded", - } - ) + yield TriggerEvent( + { + "status": "success", + "job_id": self.job_id, + "message": f"Job {self.job_id} Succeeded", + } + ) From 450aae849305ee63068a4a5e78c6f53e576f8900 Mon Sep 17 00:00:00 2001 From: Phani Kumar Date: Tue, 13 Jun 2023 12:47:24 +0530 Subject: [PATCH 14/15] Fix test failures --- .../providers/amazon/aws/triggers/batch.py | 2 +- .../amazon/aws/triggers/test_batch.py | 36 +++++++++++++++---- 2 files changed, 30 insertions(+), 8 deletions(-) diff --git a/airflow/providers/amazon/aws/triggers/batch.py b/airflow/providers/amazon/aws/triggers/batch.py index e0eb965d8f95e..f4a5de15254fa 100644 --- a/airflow/providers/amazon/aws/triggers/batch.py +++ b/airflow/providers/amazon/aws/triggers/batch.py @@ -171,7 +171,7 @@ async def run(self): ) break except WaiterError as error: - if "Error" in str(error): + if "error" in str(error): yield TriggerEvent({"status": "failure", "message": f"Job Failed: {error}"}) break self.log.info( diff --git a/tests/providers/amazon/aws/triggers/test_batch.py b/tests/providers/amazon/aws/triggers/test_batch.py index 6ca23bb03edac..430d28147db7b 100644 --- a/tests/providers/amazon/aws/triggers/test_batch.py +++ b/tests/providers/amazon/aws/triggers/test_batch.py @@ -21,6 +21,7 @@ from unittest.mock import AsyncMock import pytest +from botocore.exceptions import WaiterError from airflow.providers.amazon.aws.triggers.batch import BatchOperatorTrigger, BatchSensorTrigger from airflow.triggers.base import TriggerEvent @@ -91,7 +92,6 @@ def test_batch_sensor_trigger_serialization(self): assert classpath == "airflow.providers.amazon.aws.triggers.batch.BatchSensorTrigger" assert kwargs == { "job_id": BATCH_JOB_ID, - "max_retries": 5, "region_name": AWS_REGION, "aws_conn_id": AWS_CONN_ID, "poke_interval": POLL_INTERVAL, @@ -159,16 +159,38 @@ async def test_batch_sensor_trigger_completed(self, mock_response, mock_async_co ) @pytest.mark.asyncio + @mock.patch("asyncio.sleep") @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.BatchClientHook.get_waiter") @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.BatchClientHook.get_job_description") - async def test_batch_sensor_trigger_failure(self, mock_response, mock_get_waiter): + @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.BatchClientHook.async_conn") + async def test_batch_sensor_trigger_failure( + self, mock_async_conn, mock_response, mock_get_waiter, mock_sleep + ): """Test if the failure event is returned from trigger.""" - trigger = BatchSensorTrigger( - job_id=BATCH_JOB_ID, region_name=AWS_REGION, aws_conn_id=AWS_CONN_ID, max_retries=0 + a_mock = mock.MagicMock() + mock_async_conn.__aenter__.return_value = a_mock + + mock_response.return_value = {"status": "failed"} + + name = "batch_job_complete" + reason = ( + "An error occurred (UnrecognizedClientException): The security token included in the " + "request is invalid. 
" + ) + last_response = ({"Error": {"Message": "The security token included in the request is invalid."}},) + + error_failed = WaiterError( + name=name, + reason=reason, + last_response=last_response, ) + + mock_get_waiter().wait.side_effect = AsyncMock(side_effect=[error_failed]) + mock_sleep.return_value = True + + trigger = BatchSensorTrigger(job_id=BATCH_JOB_ID, region_name=AWS_REGION, aws_conn_id=AWS_CONN_ID) generator = trigger.run() actual_response = await generator.asend(None) - assert ( - TriggerEvent({"status": "failure", "message": "Job Failed - max attempts reached."}) - == actual_response + assert actual_response == TriggerEvent( + {"status": "failure", "message": f"Job Failed: Waiter {name} failed: {reason}"} ) From 0ae0e3a425be6ba492c96d1e62be76d067f063bd Mon Sep 17 00:00:00 2001 From: Phani Kumar Date: Tue, 13 Jun 2023 14:05:20 +0530 Subject: [PATCH 15/15] Fix tests --- tests/providers/amazon/aws/triggers/test_batch.py | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/tests/providers/amazon/aws/triggers/test_batch.py b/tests/providers/amazon/aws/triggers/test_batch.py index 430d28147db7b..5cf125f8280a5 100644 --- a/tests/providers/amazon/aws/triggers/test_batch.py +++ b/tests/providers/amazon/aws/triggers/test_batch.py @@ -16,7 +16,6 @@ # under the License. from __future__ import annotations -import asyncio from unittest import mock from unittest.mock import AsyncMock @@ -97,18 +96,6 @@ def test_batch_sensor_trigger_serialization(self): "poke_interval": POLL_INTERVAL, } - @pytest.mark.asyncio - @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.BatchClientHook.get_job_description") - async def test_batch_sensor_trigger_run(self, mock_response): - """Trigger the BatchSensorTrigger and check if the task is in running state.""" - mock_response.return_value = {"status": "RUNNABLE"} - - task = asyncio.create_task(self.TRIGGER.run().__anext__()) - await asyncio.sleep(0.5) - # TriggerEvent was not returned - assert task.done() is False - asyncio.get_event_loop().stop() - @pytest.mark.asyncio @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.BatchClientHook.get_waiter") @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.BatchClientHook.async_conn")