From b4757cc272820437c19b538203c4bffbf8bc39a4 Mon Sep 17 00:00:00 2001 From: Dzhem Aptula Date: Tue, 27 May 2025 21:36:24 +0200 Subject: [PATCH 1/8] feat: merge TimeDeltaSensorAsync to TimeDeltaSensor --- .gitignore | 1 + .../providers/standard/sensors/time_delta.py | 54 ++++++++++++++++++- 2 files changed, 54 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 026a52c50a6cf..3f88e2c787bda 100644 --- a/.gitignore +++ b/.gitignore @@ -116,6 +116,7 @@ celerybeat-schedule # virtualenv .venv* venv* +airflow_venv/ ENV/ # Spyder project settings diff --git a/providers/standard/src/airflow/providers/standard/sensors/time_delta.py b/providers/standard/src/airflow/providers/standard/sensors/time_delta.py index 35d5c83e5759b..132fe30b9bf16 100644 --- a/providers/standard/src/airflow/providers/standard/sensors/time_delta.py +++ b/providers/standard/src/airflow/providers/standard/sensors/time_delta.py @@ -21,6 +21,7 @@ from time import sleep from typing import TYPE_CHECKING, Any, NoReturn +from deprecated.classic import deprecated from packaging.version import Version from airflow.configuration import conf @@ -52,6 +53,7 @@ class TimeDeltaSensor(BaseSensorOperator): otherwise run_after will be used. :param delta: time to wait before succeeding. + :param deferrable: Run sensor in deferrable mode. If set to True, task will defer itself to avoid taking up a worker slot while it is waiting. .. seealso:: For more information on how to use this sensor, take a look at the guide: @@ -59,9 +61,10 @@ class TimeDeltaSensor(BaseSensorOperator): """ - def __init__(self, *, delta, **kwargs): + def __init__(self, *, delta: timedelta, deferrable: bool = False, **kwargs): super().__init__(**kwargs) self.delta = delta + self.deferrable = deferrable def _derive_base_time(self, context: Context) -> datetime: """ @@ -90,7 +93,56 @@ def poke(self, context: Context) -> bool: self.log.info("Checking if the delta has elapsed base_time=%s, delta=%s", base_time, self.delta) return timezone.utcnow() > target_dttm + """ + Asynchronous execution + """ + + def execute(self, context: Context) -> bool | NoReturn: + """ + Depending on the deferrable flag, either execute the sensor in a blocking way or defer it. + + - Sync path → use BaseSensorOperator.execute() which loops over ``poke``. + - Async path → defer to DateTimeTrigger and free the worker slot. + """ + if not self.deferrable: + return super().execute(context=context) + + # Deferrable path + base_time = self._derive_base_time(context=context) + target_dttm: datetime = base_time + self.delta + + if timezone.utcnow() > target_dttm: + # If the target datetime is in the past, return immediately + return True + try: + if AIRFLOW_V_3_0_PLUS: + trigger = DateTimeTrigger(moment=target_dttm, end_from_trigger=self.end_from_trigger) + else: + trigger = DateTimeTrigger(moment=target_dttm) + except (TypeError, ValueError) as e: + if self.soft_fail: + raise AirflowSkipException("Skipping due to soft_fail is set to True.") from e + raise + + # todo: remove backcompat when min airflow version greater than 2.11 + timeout: int | float | timedelta + if AIRFLOW_V_3_0_PLUS: + timeout = self.timeout + else: + # <=2.11 requires timedelta + timeout = timedelta(seconds=self.timeout) + + self.defer( + trigger=trigger, + method_name="execute_complete", + timeout=timeout, + ) + +# TODO: Remember to remove +@deprecated( + "use `TimeDeltaSensor` with flag `deferrable=True` instead", category="AirflowProvidersDeprecationWarning" +) class TimeDeltaSensorAsync(TimeDeltaSensor): """ A deferrable drop-in replacement for TimeDeltaSensor. From dc2276d2b4504f13d6774582b84e93e6d02bb329 Mon Sep 17 00:00:00 2001 From: Dzhem Aptula Date: Tue, 27 May 2025 21:46:32 +0200 Subject: [PATCH 2/8] feat: add a test for deferrable TimeDeltaSensor --- .../unit/standard/sensors/test_time_delta.py | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/providers/standard/tests/unit/standard/sensors/test_time_delta.py b/providers/standard/tests/unit/standard/sensors/test_time_delta.py index 12838f4364d55..061220fe639e3 100644 --- a/providers/standard/tests/unit/standard/sensors/test_time_delta.py +++ b/providers/standard/tests/unit/standard/sensors/test_time_delta.py @@ -18,9 +18,11 @@ from __future__ import annotations from datetime import timedelta +from typing import Any from unittest import mock import pendulum +from providers.standard.src.airflow.providers.standard.sensors.time_delta import DateTimeTrigger import pytest import time_machine @@ -105,6 +107,53 @@ def test_timedelta_sensor_run_after_vs_interval(run_after, interval_end, dag_mak assert actual == expected +@pytest.mark.parametrize( + "run_after, interval_end", + [ + (timezone.utcnow() + timedelta(days=1), timezone.utcnow() + timedelta(days=2)), + (timezone.utcnow() + timedelta(days=1), None), + ], +) +def test_timedelta_sensor_deferrable_run_after_vs_interval(run_after, interval_end, dag_maker): + """Test that TimeDeltaSensor defers correctly when flag is enabled.""" + if not AIRFLOW_V_3_0_PLUS and not interval_end: + pytest.skip("not applicable") + + context: dict[str, Any] = {} + if interval_end: + context["data_interval_end"] = interval_end + + with dag_maker() as dag: + # DagRun kwargs differ slightly between 2.x and 3.0+ + dagrun_kwargs = {"run_id": "test_deferrable"} + if AIRFLOW_V_3_0_PLUS: + from airflow.utils.types import DagRunTriggeredByType + + dagrun_kwargs.update(triggered_by=DagRunTriggeredByType.TEST, run_after=run_after) + + delta = timedelta(minutes=5) + sensor = TimeDeltaSensor( + task_id="timedelta_sensor_deferrable", + delta=delta, + dag=dag, + deferrable=True, # <-- the feature under test + ) + + dr = dag.create_dagrun(state=None, run_type=DagRunType.MANUAL, **dagrun_kwargs) + context.update(dag_run=dr) + + expected_base = interval_end or run_after + expected_fire_time = expected_base + delta + + with pytest.raises(TaskDeferred) as td: + sensor.execute(context) + + # The sensor should defer once with a DateTimeTrigger + trigger = td.value.trigger + assert isinstance(trigger, DateTimeTrigger) + assert trigger.moment == expected_fire_time + + class TestTimeDeltaSensorAsync: def setup_method(self): self.dagbag = DagBag(dag_folder=DEV_NULL, include_examples=True) From 326eb87328684c70f6c5f1ad429948cdcc1b2ec9 Mon Sep 17 00:00:00 2001 From: Dzhem Aptula Date: Tue, 27 May 2025 22:15:58 +0200 Subject: [PATCH 3/8] feat: add example, simplify code in TimeDeltaSensor (as per TimeSensor) --- .../providers/standard/sensors/time_delta.py | 67 +++++-------------- .../tests/system/standard/example_sensors.py | 6 +- .../unit/standard/sensors/test_time_delta.py | 3 +- 3 files changed, 23 insertions(+), 53 deletions(-) diff --git a/providers/standard/src/airflow/providers/standard/sensors/time_delta.py b/providers/standard/src/airflow/providers/standard/sensors/time_delta.py index 132fe30b9bf16..bdbb9c69b8ea6 100644 --- a/providers/standard/src/airflow/providers/standard/sensors/time_delta.py +++ b/providers/standard/src/airflow/providers/standard/sensors/time_delta.py @@ -17,6 +17,7 @@ # under the License. from __future__ import annotations +import warnings from datetime import datetime, timedelta from time import sleep from typing import TYPE_CHECKING, Any, NoReturn @@ -25,7 +26,7 @@ from packaging.version import Version from airflow.configuration import conf -from airflow.exceptions import AirflowSkipException +from airflow.exceptions import AirflowProviderDeprecationWarning, AirflowSkipException from airflow.providers.standard.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS from airflow.sensors.base import BaseSensorOperator @@ -61,10 +62,13 @@ class TimeDeltaSensor(BaseSensorOperator): """ - def __init__(self, *, delta: timedelta, deferrable: bool = False, **kwargs): + def __init__( + self, *, delta: timedelta, deferrable: bool = False, end_from_trigger: bool = False, **kwargs + ): super().__init__(**kwargs) self.delta = delta self.deferrable = deferrable + self.end_from_trigger = end_from_trigger def _derive_base_time(self, context: Context) -> datetime: """ @@ -138,64 +142,29 @@ def execute(self, context: Context) -> bool | NoReturn: timeout=timeout, ) + def execute_complete(self, context: Context, event: Any = None) -> None: + """Handle the event when the trigger fires and return immediately.""" + return None + # TODO: Remember to remove @deprecated( - "use `TimeDeltaSensor` with flag `deferrable=True` instead", category="AirflowProvidersDeprecationWarning" + "Use `TimeDeltaSensor` with `deferrable=True` instead", category="AirflowProvidersDeprecationWarning" ) class TimeDeltaSensorAsync(TimeDeltaSensor): """ - A deferrable drop-in replacement for TimeDeltaSensor. - - Will defers itself to avoid taking up a worker slot while it is waiting. - - :param delta: time length to wait after the data interval before succeeding. - :param end_from_trigger: End the task directly from the triggerer without going into the worker. - - .. seealso:: - For more information on how to use this sensor, take a look at the guide: - :ref:`howto/operator:TimeDeltaSensorAsync` + Deprecated. Use TimeDeltaSensor with deferrable=True instead. + :sphinx-autoapi-skip: """ def __init__(self, *, end_from_trigger: bool = False, delta, **kwargs) -> None: - super().__init__(delta=delta, **kwargs) - self.end_from_trigger = end_from_trigger - - def execute(self, context: Context) -> bool | NoReturn: - base_time = self._derive_base_time(context=context) - target_dttm: datetime = base_time + self.delta - - if timezone.utcnow() > target_dttm: - # If the target datetime is in the past, return immediately - return True - try: - if AIRFLOW_V_3_0_PLUS: - trigger = DateTimeTrigger(moment=target_dttm, end_from_trigger=self.end_from_trigger) - else: - trigger = DateTimeTrigger(moment=target_dttm) - except (TypeError, ValueError) as e: - if self.soft_fail: - raise AirflowSkipException("Skipping due to soft_fail is set to True.") from e - raise - - # todo: remove backcompat when min airflow version greater than 2.11 - timeout: int | float | timedelta - if AIRFLOW_V_3_0_PLUS: - timeout = self.timeout - else: - # <=2.11 requires timedelta - timeout = timedelta(seconds=self.timeout) - - self.defer( - trigger=trigger, - method_name="execute_complete", - timeout=timeout, + warnings.warn( + "TimeDeltaSensorAsync is deprecated and will be removed in a future version. Use `TimeDeltaSensor` with `deferrable=True` instead.", + AirflowProviderDeprecationWarning, + stacklevel=2, ) - - def execute_complete(self, context: Context, event: Any = None) -> None: - """Handle the event when the trigger fires and return immediately.""" - return None + super().__init__(delta=delta, deferrable=True, end_from_trigger=end_from_trigger, **kwargs) class WaitSensor(BaseSensorOperator): diff --git a/providers/standard/tests/system/standard/example_sensors.py b/providers/standard/tests/system/standard/example_sensors.py index bc151f140c695..79d8e6d628859 100644 --- a/providers/standard/tests/system/standard/example_sensors.py +++ b/providers/standard/tests/system/standard/example_sensors.py @@ -26,7 +26,7 @@ from airflow.providers.standard.sensors.filesystem import FileSensor from airflow.providers.standard.sensors.python import PythonSensor from airflow.providers.standard.sensors.time import TimeSensor -from airflow.providers.standard.sensors.time_delta import TimeDeltaSensor, TimeDeltaSensorAsync +from airflow.providers.standard.sensors.time_delta import TimeDeltaSensor from airflow.providers.standard.sensors.weekday import DayOfWeekSensor from airflow.providers.standard.utils.weekday import WeekDay from airflow.sdk import DAG @@ -57,7 +57,9 @@ def failure_callable(): # [END example_time_delta_sensor] # [START example_time_delta_sensor_async] - t0a = TimeDeltaSensorAsync(task_id="wait_some_seconds_async", delta=datetime.timedelta(seconds=2)) + t0a = TimeDeltaSensor( + task_id="wait_some_seconds_async", delta=datetime.timedelta(seconds=2), deferrable=True + ) # [END example_time_delta_sensor_async] # [START example_time_sensors] diff --git a/providers/standard/tests/unit/standard/sensors/test_time_delta.py b/providers/standard/tests/unit/standard/sensors/test_time_delta.py index 061220fe639e3..4b2d6c1c0df6b 100644 --- a/providers/standard/tests/unit/standard/sensors/test_time_delta.py +++ b/providers/standard/tests/unit/standard/sensors/test_time_delta.py @@ -124,8 +124,7 @@ def test_timedelta_sensor_deferrable_run_after_vs_interval(run_after, interval_e context["data_interval_end"] = interval_end with dag_maker() as dag: - # DagRun kwargs differ slightly between 2.x and 3.0+ - dagrun_kwargs = {"run_id": "test_deferrable"} + dagrun_kwargs = {} if AIRFLOW_V_3_0_PLUS: from airflow.utils.types import DagRunTriggeredByType From b81dc354ea5471a00605e32d338ef481e325360f Mon Sep 17 00:00:00 2001 From: Dzhem Aptula Date: Wed, 28 May 2025 22:14:05 +0200 Subject: [PATCH 4/8] feat: PR comments & adapt failing tests due to deprecation --- .../providers/standard/sensors/time_delta.py | 9 +- .../unit/standard/sensors/test_time_delta.py | 97 ++++++++++--------- 2 files changed, 60 insertions(+), 46 deletions(-) diff --git a/providers/standard/src/airflow/providers/standard/sensors/time_delta.py b/providers/standard/src/airflow/providers/standard/sensors/time_delta.py index bdbb9c69b8ea6..84efe6e92a4fc 100644 --- a/providers/standard/src/airflow/providers/standard/sensors/time_delta.py +++ b/providers/standard/src/airflow/providers/standard/sensors/time_delta.py @@ -63,7 +63,12 @@ class TimeDeltaSensor(BaseSensorOperator): """ def __init__( - self, *, delta: timedelta, deferrable: bool = False, end_from_trigger: bool = False, **kwargs + self, + *, + delta: timedelta, + deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), + end_from_trigger: bool = False, + **kwargs, ): super().__init__(**kwargs) self.delta = delta @@ -149,7 +154,7 @@ def execute_complete(self, context: Context, event: Any = None) -> None: # TODO: Remember to remove @deprecated( - "Use `TimeDeltaSensor` with `deferrable=True` instead", category="AirflowProvidersDeprecationWarning" + "Use `TimeDeltaSensor` with `deferrable=True` instead", category=AirflowProviderDeprecationWarning ) class TimeDeltaSensorAsync(TimeDeltaSensor): """ diff --git a/providers/standard/tests/unit/standard/sensors/test_time_delta.py b/providers/standard/tests/unit/standard/sensors/test_time_delta.py index 4b2d6c1c0df6b..8aed583e7bab5 100644 --- a/providers/standard/tests/unit/standard/sensors/test_time_delta.py +++ b/providers/standard/tests/unit/standard/sensors/test_time_delta.py @@ -22,11 +22,10 @@ from unittest import mock import pendulum -from providers.standard.src.airflow.providers.standard.sensors.time_delta import DateTimeTrigger import pytest import time_machine -from airflow.exceptions import TaskDeferred +from airflow.exceptions import AirflowProviderDeprecationWarning, TaskDeferred from airflow.models import DagBag from airflow.models.dag import DAG from airflow.providers.standard.sensors.time_delta import ( @@ -34,6 +33,7 @@ TimeDeltaSensorAsync, WaitSensor, ) +from airflow.providers.standard.triggers.temporal import DateTimeTrigger from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS from airflow.utils import timezone from airflow.utils.timezone import datetime @@ -124,11 +124,11 @@ def test_timedelta_sensor_deferrable_run_after_vs_interval(run_after, interval_e context["data_interval_end"] = interval_end with dag_maker() as dag: - dagrun_kwargs = {} + kwargs = {} if AIRFLOW_V_3_0_PLUS: from airflow.utils.types import DagRunTriggeredByType - dagrun_kwargs.update(triggered_by=DagRunTriggeredByType.TEST, run_after=run_after) + kwargs.update(triggered_by=DagRunTriggeredByType.TEST, run_after=run_after) delta = timedelta(minutes=5) sensor = TimeDeltaSensor( @@ -138,7 +138,12 @@ def test_timedelta_sensor_deferrable_run_after_vs_interval(run_after, interval_e deferrable=True, # <-- the feature under test ) - dr = dag.create_dagrun(state=None, run_type=DagRunType.MANUAL, **dagrun_kwargs) + dr = dag.create_dagrun( + run_id="abcrhroceuh", + run_type=DagRunType.MANUAL, + state=None, + **kwargs, + ) context.update(dag_run=dr) expected_base = interval_end or run_after @@ -165,17 +170,20 @@ def setup_method(self): ) @mock.patch(DEFER_PATH) def test_timedelta_sensor(self, defer_mock, should_defer): - delta = timedelta(hours=1) - op = TimeDeltaSensorAsync(task_id="timedelta_sensor_check", delta=delta, dag=self.dag) - if should_defer: - data_interval_end = pendulum.now("UTC").add(hours=1) - else: - data_interval_end = pendulum.now("UTC").replace(microsecond=0, second=0, minute=0).add(hours=-1) - op.execute({"data_interval_end": data_interval_end}) - if should_defer: - defer_mock.assert_called_once() - else: - defer_mock.assert_not_called() + with pytest.warns(AirflowProviderDeprecationWarning) as m: + delta = timedelta(hours=1) + op = TimeDeltaSensorAsync(task_id="timedelta_sensor_check", delta=delta, dag=self.dag) + if should_defer: + data_interval_end = pendulum.now("UTC").add(hours=1) + else: + data_interval_end = ( + pendulum.now("UTC").replace(microsecond=0, second=0, minute=0).add(hours=-1) + ) + op.execute({"data_interval_end": data_interval_end}) + if should_defer: + defer_mock.assert_called_once() + else: + defer_mock.assert_not_called() @pytest.mark.parametrize( "should_defer", @@ -205,31 +213,32 @@ def test_wait_sensor(self, sleep_mock, defer_mock, should_defer): ) def test_timedelta_sensor_async_run_after_vs_interval(self, run_after, interval_end, dag_maker): """Interval end should be used as base time when present else run_after""" - if not AIRFLOW_V_3_0_PLUS and not interval_end: - pytest.skip("not applicable") - - context = {} - if interval_end: - context["data_interval_end"] = interval_end - with dag_maker() as dag: - kwargs = {} - if AIRFLOW_V_3_0_PLUS: - from airflow.utils.types import DagRunTriggeredByType - - kwargs.update(triggered_by=DagRunTriggeredByType.TEST, run_after=run_after) - - dr = dag.create_dagrun( - run_id="abcrhroceuh", - run_type=DagRunType.MANUAL, - state=None, - **kwargs, - ) - context.update(dag_run=dr) - delta = timedelta(seconds=1) - op = TimeDeltaSensorAsync(task_id="wait_sensor_check", delta=delta, dag=dag) - base_time = interval_end or run_after - expected_time = base_time + delta - with pytest.raises(TaskDeferred) as caught: - op.execute(context) - - assert caught.value.trigger.moment == expected_time + with pytest.warns(AirflowProviderDeprecationWarning) as m: + if not AIRFLOW_V_3_0_PLUS and not interval_end: + pytest.skip("not applicable") + + context = {} + if interval_end: + context["data_interval_end"] = interval_end + with dag_maker() as dag: + kwargs = {} + if AIRFLOW_V_3_0_PLUS: + from airflow.utils.types import DagRunTriggeredByType + + kwargs.update(triggered_by=DagRunTriggeredByType.TEST, run_after=run_after) + + dr = dag.create_dagrun( + run_id="abcrhroceuh", + run_type=DagRunType.MANUAL, + state=None, + **kwargs, + ) + context.update(dag_run=dr) + delta = timedelta(seconds=1) + op = TimeDeltaSensorAsync(task_id="wait_sensor_check", delta=delta, dag=dag) + base_time = interval_end or run_after + expected_time = base_time + delta + with pytest.raises(TaskDeferred) as caught: + op.execute(context) + + assert caught.value.trigger.moment == expected_time From cf8119654c7b3ab19fffe43be8516199137fef8a Mon Sep 17 00:00:00 2001 From: Dzhem Aptula Date: Wed, 28 May 2025 22:17:02 +0200 Subject: [PATCH 5/8] feat: get rid of airflow_venv/ from gitignore --- .gitignore | 1 - 1 file changed, 1 deletion(-) diff --git a/.gitignore b/.gitignore index 3f88e2c787bda..026a52c50a6cf 100644 --- a/.gitignore +++ b/.gitignore @@ -116,7 +116,6 @@ celerybeat-schedule # virtualenv .venv* venv* -airflow_venv/ ENV/ # Spyder project settings From 4b6fb66364efc5ba1828e490b7c9bd8fe25f089b Mon Sep 17 00:00:00 2001 From: Dzhem Aptula Date: Wed, 28 May 2025 22:22:43 +0200 Subject: [PATCH 6/8] feat: remove unused variables --- .../standard/tests/unit/standard/sensors/test_time_delta.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/providers/standard/tests/unit/standard/sensors/test_time_delta.py b/providers/standard/tests/unit/standard/sensors/test_time_delta.py index 8aed583e7bab5..a42c5ad72cdcf 100644 --- a/providers/standard/tests/unit/standard/sensors/test_time_delta.py +++ b/providers/standard/tests/unit/standard/sensors/test_time_delta.py @@ -170,7 +170,7 @@ def setup_method(self): ) @mock.patch(DEFER_PATH) def test_timedelta_sensor(self, defer_mock, should_defer): - with pytest.warns(AirflowProviderDeprecationWarning) as m: + with pytest.warns(AirflowProviderDeprecationWarning): delta = timedelta(hours=1) op = TimeDeltaSensorAsync(task_id="timedelta_sensor_check", delta=delta, dag=self.dag) if should_defer: @@ -213,7 +213,7 @@ def test_wait_sensor(self, sleep_mock, defer_mock, should_defer): ) def test_timedelta_sensor_async_run_after_vs_interval(self, run_after, interval_end, dag_maker): """Interval end should be used as base time when present else run_after""" - with pytest.warns(AirflowProviderDeprecationWarning) as m: + with pytest.warns(AirflowProviderDeprecationWarning): if not AIRFLOW_V_3_0_PLUS and not interval_end: pytest.skip("not applicable") From 6e0094d301a794105de8777d337775747240bb12 Mon Sep 17 00:00:00 2001 From: James Hyphen Date: Thu, 29 May 2025 03:34:46 +0200 Subject: [PATCH 7/8] Update providers/standard/src/airflow/providers/standard/sensors/time_delta.py Co-authored-by: Wei Lee --- .../src/airflow/providers/standard/sensors/time_delta.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/standard/src/airflow/providers/standard/sensors/time_delta.py b/providers/standard/src/airflow/providers/standard/sensors/time_delta.py index 84efe6e92a4fc..40ad809343bb7 100644 --- a/providers/standard/src/airflow/providers/standard/sensors/time_delta.py +++ b/providers/standard/src/airflow/providers/standard/sensors/time_delta.py @@ -152,7 +152,7 @@ def execute_complete(self, context: Context, event: Any = None) -> None: return None -# TODO: Remember to remove +# TODO: Remove in the next major release @deprecated( "Use `TimeDeltaSensor` with `deferrable=True` instead", category=AirflowProviderDeprecationWarning ) From 39ac8ac9573d86e2fc9403f7f7f3d5c28660e643 Mon Sep 17 00:00:00 2001 From: Dzhem Aptula Date: Thu, 29 May 2025 10:30:47 +0200 Subject: [PATCH 8/8] feat: resolve test errors --- .../openlineage/example_openlineage_defer.py | 27 ++++++++++--------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/providers/openlineage/tests/system/openlineage/example_openlineage_defer.py b/providers/openlineage/tests/system/openlineage/example_openlineage_defer.py index f8589ad631002..69b021cf43fc8 100644 --- a/providers/openlineage/tests/system/openlineage/example_openlineage_defer.py +++ b/providers/openlineage/tests/system/openlineage/example_openlineage_defer.py @@ -21,9 +21,11 @@ from pathlib import Path from airflow import DAG +from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.models import Variable from airflow.providers.standard.operators.python import PythonOperator from airflow.providers.standard.sensors.time_delta import TimeDeltaSensorAsync +import pytest from system.openlineage.operator import OpenLineageTestOperator @@ -45,21 +47,22 @@ def check_start_amount_func(): schedule=None, catchup=False, ) as dag: - # Timedelta is compared to the DAGRun start timestamp, which can occur long before a worker picks up the - # task. We need to ensure the sensor gets deferred at least once, so setting 180s. - wait = TimeDeltaSensorAsync(task_id="wait", delta=timedelta(seconds=180)) + with pytest.warns(AirflowProviderDeprecationWarning): + # Timedelta is compared to the DAGRun start timestamp, which can occur long before a worker picks up the + # task. We need to ensure the sensor gets deferred at least once, so setting 180s. + wait = TimeDeltaSensorAsync(task_id="wait", delta=timedelta(seconds=180)) - check_start_events_amount = PythonOperator( - task_id="check_start_events_amount", python_callable=check_start_amount_func - ) + check_start_events_amount = PythonOperator( + task_id="check_start_events_amount", python_callable=check_start_amount_func + ) - check_events = OpenLineageTestOperator( - task_id="check_events", - file_path=str(Path(__file__).parent / "example_openlineage_defer.json"), - allow_duplicate_events=True, - ) + check_events = OpenLineageTestOperator( + task_id="check_events", + file_path=str(Path(__file__).parent / "example_openlineage_defer.json"), + allow_duplicate_events=True, + ) - wait >> check_start_events_amount >> check_events + wait >> check_start_events_amount >> check_events from tests_common.test_utils.system_tests import get_test_run # noqa: E402