From af4a05d9a7b808684216f26aabf6e14eab76762e Mon Sep 17 00:00:00 2001 From: jroachgolf84 Date: Tue, 20 May 2025 14:23:51 -0400 Subject: [PATCH 1/5] Stashing initial changes --- .../providers/standard/sensors/time.py | 52 +++++++++++++++++-- 1 file changed, 48 insertions(+), 4 deletions(-) diff --git a/providers/standard/src/airflow/providers/standard/sensors/time.py b/providers/standard/src/airflow/providers/standard/sensors/time.py index bbe93dbc01a5e..0df057520eda5 100644 --- a/providers/standard/src/airflow/providers/standard/sensors/time.py +++ b/providers/standard/src/airflow/providers/standard/sensors/time.py @@ -61,13 +61,57 @@ class TimeSensor(BaseSensorOperator): """ - def __init__(self, *, target_time: datetime.time, **kwargs) -> None: + start_trigger_args = StartTriggerArgs( + trigger_cls="airflow.providers.standard.triggers.temporal.DateTimeTrigger", + trigger_kwargs={"moment": "", "end_from_trigger": False}, + next_method="execute_complete", + next_kwargs=None, + timeout=None, + ) + start_from_trigger = False + + def __init__( + self, + *, + target_time: datetime.time, + deferrable: bool = False, + **kwargs + ) -> None: + super().__init__(**kwargs) - self.target_time = target_time + + # Create a "date-aware" timestamp that will be used as the "target_datetime" + aware_time = timezone.coerce_datetime( + datetime.datetime.combine(datetime.datetime.today(), target_time, self.dag.timezone) + ) + + self.target_datetime = timezone.convert_to_utc(aware_time) + self.deferrable = deferrable + + self.end_from_trigger = kwargs.get("end_from_trigger", False) + if self.start_from_trigger: + self.start_trigger_args.trigger_kwargs = dict( + moment=self.target_datetime, end_from_trigger=self.end_from_trigger + ) + + + def execute(self, context: Context) -> NoReturn: + if self.deferrable: + self.defer( + trigger=DateTimeTrigger( + moment=self.target_datetime, + end_from_trigger=self.end_from_trigger + ), + method_name="execute_complete", + ) + + def execute_complete(self, context: Context) -> NoReturn: + # Return immediately + return None def poke(self, context: Context) -> bool: - self.log.info("Checking if the time (%s) has come", self.target_time) - return timezone.make_naive(timezone.utcnow(), self.dag.timezone).time() > self.target_time + self.log.info("Checking if the time (%s) has come", self.target_datetime) + return timezone.make_naive(timezone.utcnow(), self.dag.timezone).datetime() > self.target_datetime class TimeSensorAsync(BaseSensorOperator): From 7ac555bbf17eb184674ef38e2febfc42b13d0ed3 Mon Sep 17 00:00:00 2001 From: jroachgolf84 Date: Tue, 20 May 2025 17:08:02 -0400 Subject: [PATCH 2/5] Merging TimeSensorAsync to TimeSensor with deferrable mode Per #50812, merging the `TimeSensorAsync` Sensor with `TimeSensor`, and adding deferrable mode. --- .../authoring-and-scheduling/deferring.rst | 4 +- airflow-core/src/airflow/sensors/__init__.py | 1 - providers/standard/docs/sensors/datetime.rst | 18 +--- .../providers/standard/sensors/time.py | 90 ++++--------------- .../tests/system/standard/example_sensors.py | 15 ++-- .../tests/unit/standard/sensors/test_time.py | 67 +++++++------- 6 files changed, 63 insertions(+), 132 deletions(-) diff --git a/airflow-core/docs/authoring-and-scheduling/deferring.rst b/airflow-core/docs/authoring-and-scheduling/deferring.rst index 68933af157d74..08a9058e4c623 100644 --- a/airflow-core/docs/authoring-and-scheduling/deferring.rst +++ b/airflow-core/docs/authoring-and-scheduling/deferring.rst @@ -36,14 +36,14 @@ You can either use pre-written deferrable operators as a DAG author or write you Using Deferrable Operators -------------------------- -If you want to use pre-written deferrable operators that come with Airflow, such as ``TimeSensorAsync``, then you only need to complete two steps: +If you want to use pre-written deferrable operators that come with Airflow, such as ``TimeSensor``, then you only need to complete two steps: * Ensure your Airflow installation runs at least one ``triggerer`` process, as well as the normal ``scheduler`` * Use deferrable operators/sensors in your dags Airflow automatically handles and implements the deferral processes for you. -If you're upgrading existing dags to use deferrable operators, Airflow contains API-compatible sensor variants, like ``TimeSensorAsync`` for ``TimeSensor``. Add these variants into your DAG to use deferrable operators with no other changes required. +If you're upgrading existing dags to use deferrable operators, Airflow contains API-compatible sensor variants. Add these variants into your dag to use deferrable operators with no other changes required. Note that you can't use the deferral ability from inside custom PythonOperator or TaskFlow Python functions. Deferral is only available to traditional, class-based operators. diff --git a/airflow-core/src/airflow/sensors/__init__.py b/airflow-core/src/airflow/sensors/__init__.py index 62a4037df2f66..d25938cba57ea 100644 --- a/airflow-core/src/airflow/sensors/__init__.py +++ b/airflow-core/src/airflow/sensors/__init__.py @@ -40,7 +40,6 @@ }, "time_sensor": { "TimeSensor": "airflow.providers.standard.sensors.time.TimeSensor", - "TimeSensorAsync": "airflow.providers.standard.sensors.time.TimeSensorAsync", }, "weekday": { "DayOfWeekSensor": "airflow.providers.standard.sensors.weekday.DayOfWeekSensor", diff --git a/providers/standard/docs/sensors/datetime.rst b/providers/standard/docs/sensors/datetime.rst index aabff0f866dce..f1556993baf76 100644 --- a/providers/standard/docs/sensors/datetime.rst +++ b/providers/standard/docs/sensors/datetime.rst @@ -54,7 +54,7 @@ It is an async version of the operator and requires Triggerer to run. TimeSensor ========== -Use the :class:`~airflow.providers.standard.sensors.time_sensor.TimeSensor` to end sensing after time specified. +Use the :class:`~airflow.providers.standard.sensors.time_sensor.TimeSensor` to end sensing after time specified. ``TimeSensor`` can be run in deferrable mode, if a Triggerer is available. Time will be evaluated against ``data_interval_end`` if present for the dag run, otherwise ``run_after`` will be used. @@ -65,22 +65,6 @@ Time will be evaluated against ``data_interval_end`` if present for the dag run, :end-before: [END example_time_sensors] -.. _howto/operator:TimeSensorAsync: - -TimeSensorAsync -=============== - -Use the :class:`~airflow.providers.standard.sensors.time_sensor.TimeSensorAsync` to end sensing after time specified. -It is an async version of the operator and requires Triggerer to run. - -Time will be evaluated against ``data_interval_end`` if present for the dag run, otherwise ``run_after`` will be used. - -.. exampleinclude:: /../tests/system/standard/example_sensors.py - :language: python - :dedent: 4 - :start-after: [START example_time_sensors_async] - :end-before: [END example_time_sensors_async] - .. _howto/operator:DayOfWeekSensor: DayOfWeekSensor diff --git a/providers/standard/src/airflow/providers/standard/sensors/time.py b/providers/standard/src/airflow/providers/standard/sensors/time.py index 0df057520eda5..16a8610a2a164 100644 --- a/providers/standard/src/airflow/providers/standard/sensors/time.py +++ b/providers/standard/src/airflow/providers/standard/sensors/time.py @@ -19,7 +19,7 @@ import datetime from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, NoReturn +from typing import TYPE_CHECKING, Any from airflow.providers.standard.triggers.temporal import DateTimeTrigger from airflow.sensors.base import BaseSensorOperator @@ -54,6 +54,7 @@ class TimeSensor(BaseSensorOperator): Waits until the specified time of the day. :param target_time: time after which the job succeeds + :param deferrable: whether to defer execution .. seealso:: For more information on how to use this sensor, take a look at the guide: @@ -75,101 +76,44 @@ def __init__( *, target_time: datetime.time, deferrable: bool = False, - **kwargs + start_from_trigger: bool = False, + end_from_trigger: bool = False, + **kwargs, ) -> None: - super().__init__(**kwargs) - # Create a "date-aware" timestamp that will be used as the "target_datetime" + # Create a "date-aware" timestamp that will be used as the "target_datetime". This is a requirement + # of the DateTimeTrigger aware_time = timezone.coerce_datetime( datetime.datetime.combine(datetime.datetime.today(), target_time, self.dag.timezone) ) + # Now that the dag's timezone has made the datetime timezone aware, we need to convert to UTC self.target_datetime = timezone.convert_to_utc(aware_time) self.deferrable = deferrable + self.start_from_trigger = start_from_trigger + self.end_from_trigger = end_from_trigger - self.end_from_trigger = kwargs.get("end_from_trigger", False) if self.start_from_trigger: self.start_trigger_args.trigger_kwargs = dict( moment=self.target_datetime, end_from_trigger=self.end_from_trigger ) - - def execute(self, context: Context) -> NoReturn: + def execute(self, context: Context) -> None: if self.deferrable: self.defer( trigger=DateTimeTrigger( - moment=self.target_datetime, - end_from_trigger=self.end_from_trigger + moment=self.target_datetime, # This needs to be an aware timestamp + end_from_trigger=self.end_from_trigger, ), method_name="execute_complete", ) - def execute_complete(self, context: Context) -> NoReturn: - # Return immediately - return None + def execute_complete(self, context: Context) -> None: + return def poke(self, context: Context) -> bool: self.log.info("Checking if the time (%s) has come", self.target_datetime) - return timezone.make_naive(timezone.utcnow(), self.dag.timezone).datetime() > self.target_datetime - - -class TimeSensorAsync(BaseSensorOperator): - """ - Waits until the specified time of the day. - - This frees up a worker slot while it is waiting. - - :param target_time: time after which the job succeeds - :param start_from_trigger: Start the task directly from the triggerer without going into the worker. - :param end_from_trigger: End the task directly from the triggerer without going into the worker. - :param trigger_kwargs: The keyword arguments passed to the trigger when start_from_trigger is set to True - during dynamic task mapping. This argument is not used in standard usage. - - .. seealso:: - For more information on how to use this sensor, take a look at the guide: - :ref:`howto/operator:TimeSensorAsync` - """ - - start_trigger_args = StartTriggerArgs( - trigger_cls="airflow.providers.standard.triggers.temporal.DateTimeTrigger", - trigger_kwargs={"moment": "", "end_from_trigger": False}, - next_method="execute_complete", - next_kwargs=None, - timeout=None, - ) - start_from_trigger = False - - def __init__( - self, - *, - target_time: datetime.time, - start_from_trigger: bool = False, - trigger_kwargs: dict[str, Any] | None = None, - end_from_trigger: bool = False, - **kwargs, - ) -> None: - super().__init__(**kwargs) - self.start_from_trigger = start_from_trigger - self.end_from_trigger = end_from_trigger - self.target_time = target_time - - aware_time = timezone.coerce_datetime( - datetime.datetime.combine(datetime.datetime.today(), self.target_time, self.dag.timezone) - ) - - self.target_datetime = timezone.convert_to_utc(aware_time) - if self.start_from_trigger: - self.start_trigger_args.trigger_kwargs = dict( - moment=self.target_datetime, end_from_trigger=self.end_from_trigger - ) - - def execute(self, context: Context) -> NoReturn: - self.defer( - trigger=DateTimeTrigger(moment=self.target_datetime, end_from_trigger=self.end_from_trigger), - method_name="execute_complete", - ) - def execute_complete(self, context: Context, event: Any = None) -> None: - """Handle the event when the trigger fires and return immediately.""" - return None + # self.target_date has been converted to UTC, so we do not need to convert timezone + return timezone.utcnow() > self.target_datetime diff --git a/providers/standard/tests/system/standard/example_sensors.py b/providers/standard/tests/system/standard/example_sensors.py index b5ea2f458fd94..bc151f140c695 100644 --- a/providers/standard/tests/system/standard/example_sensors.py +++ b/providers/standard/tests/system/standard/example_sensors.py @@ -25,7 +25,7 @@ from airflow.providers.standard.sensors.bash import BashSensor from airflow.providers.standard.sensors.filesystem import FileSensor from airflow.providers.standard.sensors.python import PythonSensor -from airflow.providers.standard.sensors.time import TimeSensor, TimeSensorAsync +from airflow.providers.standard.sensors.time import TimeSensor from airflow.providers.standard.sensors.time_delta import TimeDeltaSensor, TimeDeltaSensorAsync from airflow.providers.standard.sensors.weekday import DayOfWeekSensor from airflow.providers.standard.utils.weekday import WeekDay @@ -71,20 +71,21 @@ def failure_callable(): soft_fail=True, target_time=(datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(hours=1)).time(), ) - # [END example_time_sensors] - # [START example_time_sensors_async] - t1a = TimeSensorAsync( - task_id="fire_immediately_async", target_time=datetime.datetime.now(tz=datetime.timezone.utc).time() + t1a = TimeSensor( + task_id="fire_immediately_async", + target_time=datetime.datetime.now(tz=datetime.timezone.utc).time(), + deferrable=True, ) - t2a = TimeSensorAsync( + t2a = TimeSensor( task_id="timeout_after_second_date_in_the_future_async", timeout=1, soft_fail=True, target_time=(datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(hours=1)).time(), + deferrable=True, ) - # [END example_time_sensors_async] + # [END example_time_sensors] # [START example_bash_sensors] t3 = BashSensor(task_id="Sensor_succeeds", bash_command="exit 0") diff --git a/providers/standard/tests/unit/standard/sensors/test_time.py b/providers/standard/tests/unit/standard/sensors/test_time.py index 017b410eda4df..44fbbf14fc229 100644 --- a/providers/standard/tests/unit/standard/sensors/test_time.py +++ b/providers/standard/tests/unit/standard/sensors/test_time.py @@ -25,7 +25,7 @@ from airflow.exceptions import TaskDeferred from airflow.models.dag import DAG -from airflow.providers.standard.sensors.time import TimeSensor, TimeSensorAsync +from airflow.providers.standard.sensors.time import TimeSensor from airflow.providers.standard.triggers.temporal import DateTimeTrigger from airflow.utils import timezone @@ -36,55 +36,58 @@ class TestTimeSensor: @pytest.mark.parametrize( - "default_timezone, start_date, expected", + "default_timezone, start_date, target_time ,expected", [ - ("UTC", DEFAULT_DATE_WO_TZ, True), - ("UTC", DEFAULT_DATE_WITH_TZ, False), - (DEFAULT_TIMEZONE, DEFAULT_DATE_WO_TZ, False), + ("UTC", DEFAULT_DATE_WO_TZ, time(10, 0), True), + ("UTC", DEFAULT_DATE_WITH_TZ, time(16, 0), True), + ("UTC", DEFAULT_DATE_WITH_TZ, time(23, 0), False), + (DEFAULT_TIMEZONE, DEFAULT_DATE_WO_TZ, time(23, 0), False), ], ) - @time_machine.travel(timezone.datetime(2020, 1, 1, 23, 0).replace(tzinfo=timezone.utc)) - def test_timezone(self, default_timezone, start_date, expected, monkeypatch): + @time_machine.travel(timezone.datetime(2020, 1, 1, 13, 0).replace(tzinfo=timezone.utc)) + def test_timezone(self, default_timezone, start_date, target_time, expected, monkeypatch): monkeypatch.setattr("airflow.settings.TIMEZONE", timezone.parse_timezone(default_timezone)) - dag = DAG("test", schedule=None, default_args={"start_date": start_date}) - op = TimeSensor(task_id="test", target_time=time(10, 0), dag=dag) + dag = DAG("test_timezone", schedule=None, default_args={"start_date": start_date}) + op = TimeSensor(task_id="test", target_time=target_time, dag=dag) assert op.poke(None) == expected + def test_target_time_aware_dag_timezone(self): + # This behavior should be the same for both deferrable and non-deferrable + with DAG("test_target_time_aware", schedule=None, start_date=datetime(2020, 1, 1, 13, 0)): + aware_time = time(0, 1).replace(tzinfo=timezone.parse_timezone(DEFAULT_TIMEZONE)) + op = TimeSensor(task_id="test", target_time=aware_time) + assert op.target_datetime.tzinfo == timezone.utc + + def test_target_time_naive_dag_timezone(self): + # Again, this behavior should be the same for both deferrable and non-deferrable + with DAG( + dag_id="test_target_time_naive_dag_timezone", + schedule=None, + start_date=datetime(2020, 1, 1, 23, 0, tzinfo=timezone.parse_timezone(DEFAULT_TIMEZONE)), + ): + op = TimeSensor(task_id="test", target_time=time(9, 0)) + + # Since the DEFAULT_TIMEZONE is UTC+8:00, then hour 9 should be converted to hour 1 + assert op.target_datetime.time() == pendulum.time(1, 0) + assert op.target_datetime.tzinfo == timezone.utc -class TestTimeSensorAsync: @time_machine.travel("2020-07-07 00:00:00", tick=False) def test_task_is_deferred(self): with DAG( dag_id="test_task_is_deferred", schedule=None, - start_date=timezone.datetime(2020, 1, 1, 23, 0), + start_date=datetime(2020, 1, 1, 13, 0), ): - op = TimeSensorAsync(task_id="test", target_time=time(10, 0)) + op = TimeSensor(task_id="test", target_time=time(10, 0), deferrable=True) + + # This should be converted to UTC in the __init__. Since there is no default timezone, it will become + # aware, but note changed assert not timezone.is_naive(op.target_datetime) with pytest.raises(TaskDeferred) as exc_info: op.execute({}) assert isinstance(exc_info.value.trigger, DateTimeTrigger) - assert exc_info.value.trigger.moment == timezone.datetime(2020, 7, 7, 10) + assert exc_info.value.trigger.moment == pendulum.datetime(2020, 7, 7, 10) assert exc_info.value.kwargs is None assert exc_info.value.method_name == "execute_complete" - - def test_target_time_aware(self): - with DAG("test_target_time_aware", schedule=None, start_date=timezone.datetime(2020, 1, 1, 23, 0)): - aware_time = time(0, 1).replace(tzinfo=pendulum.local_timezone()) - op = TimeSensorAsync(task_id="test", target_time=aware_time) - assert op.target_datetime.tzinfo == timezone.utc - - def test_target_time_naive_dag_timezone(self): - """ - Tests that naive target_time gets converted correctly using the DAG's timezone. - """ - with DAG( - dag_id="test_target_time_naive_dag_timezone", - schedule=None, - start_date=pendulum.datetime(2020, 1, 1, 0, 0, tz=DEFAULT_TIMEZONE), - ): - op = TimeSensorAsync(task_id="test", target_time=pendulum.time(9, 0)) - assert op.target_datetime.time() == pendulum.time(1, 0) - assert op.target_datetime.tzinfo == timezone.utc From 2303d7d4adcaf4077ec191475a2b8aa996d7a191 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Wed, 21 May 2025 20:57:37 +0530 Subject: [PATCH 3/5] fixup! Merging TimeSensorAsync to TimeSensor with deferrable mode --- airflow-core/src/airflow/sensors/__init__.py | 1 + .../airflow/providers/standard/sensors/time.py | 18 ++++++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/airflow-core/src/airflow/sensors/__init__.py b/airflow-core/src/airflow/sensors/__init__.py index d25938cba57ea..62a4037df2f66 100644 --- a/airflow-core/src/airflow/sensors/__init__.py +++ b/airflow-core/src/airflow/sensors/__init__.py @@ -40,6 +40,7 @@ }, "time_sensor": { "TimeSensor": "airflow.providers.standard.sensors.time.TimeSensor", + "TimeSensorAsync": "airflow.providers.standard.sensors.time.TimeSensorAsync", }, "weekday": { "DayOfWeekSensor": "airflow.providers.standard.sensors.weekday.DayOfWeekSensor", diff --git a/providers/standard/src/airflow/providers/standard/sensors/time.py b/providers/standard/src/airflow/providers/standard/sensors/time.py index 16a8610a2a164..6a24a1956f811 100644 --- a/providers/standard/src/airflow/providers/standard/sensors/time.py +++ b/providers/standard/src/airflow/providers/standard/sensors/time.py @@ -18,9 +18,11 @@ from __future__ import annotations import datetime +import warnings from dataclasses import dataclass from typing import TYPE_CHECKING, Any +from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.providers.standard.triggers.temporal import DateTimeTrigger from airflow.sensors.base import BaseSensorOperator @@ -117,3 +119,19 @@ def poke(self, context: Context) -> bool: # self.target_date has been converted to UTC, so we do not need to convert timezone return timezone.utcnow() > self.target_datetime + + +class TimeSensorAsync(TimeSensor): + """ + Deprecated. Use TimeSensor with deferrable=True instead. + + :sphinx-autoapi-skip: + """ + + def __init__(self, **kwargs) -> None: + warnings.warn( + "TimeSensorAsync is deprecated and will be removed in a future version. Use `TimeSensor` with deferrable=True instead.", + AirflowProviderDeprecationWarning, + stacklevel=2, + ) + super().__init__(deferrable=True, **kwargs) From ff7cc9af64ba64f6a21f4298f4d4bb1f431234a1 Mon Sep 17 00:00:00 2001 From: Jake Roach <116606359+jroachgolf84@users.noreply.github.com> Date: Thu, 22 May 2025 08:02:32 -0400 Subject: [PATCH 4/5] Update providers/standard/src/airflow/providers/standard/sensors/time.py Co-authored-by: Wei Lee --- .../standard/src/airflow/providers/standard/sensors/time.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/standard/src/airflow/providers/standard/sensors/time.py b/providers/standard/src/airflow/providers/standard/sensors/time.py index 6a24a1956f811..15635164041d9 100644 --- a/providers/standard/src/airflow/providers/standard/sensors/time.py +++ b/providers/standard/src/airflow/providers/standard/sensors/time.py @@ -77,7 +77,7 @@ def __init__( self, *, target_time: datetime.time, - deferrable: bool = False, + deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False) start_from_trigger: bool = False, end_from_trigger: bool = False, **kwargs, From e67dbead0b1a853a47a4177b33f15ab6634384f9 Mon Sep 17 00:00:00 2001 From: jroachgolf84 Date: Thu, 22 May 2025 08:06:09 -0400 Subject: [PATCH 5/5] Adding back trigger_kwargs, integrating feedback from team --- .../standard/src/airflow/providers/standard/sensors/time.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/providers/standard/src/airflow/providers/standard/sensors/time.py b/providers/standard/src/airflow/providers/standard/sensors/time.py index 15635164041d9..7b5d87eb11b8a 100644 --- a/providers/standard/src/airflow/providers/standard/sensors/time.py +++ b/providers/standard/src/airflow/providers/standard/sensors/time.py @@ -22,6 +22,7 @@ from dataclasses import dataclass from typing import TYPE_CHECKING, Any +from airflow.configuration import conf from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.providers.standard.triggers.temporal import DateTimeTrigger from airflow.sensors.base import BaseSensorOperator @@ -77,9 +78,10 @@ def __init__( self, *, target_time: datetime.time, - deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False) + deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), start_from_trigger: bool = False, end_from_trigger: bool = False, + trigger_kwargs: dict[str, Any] | None = None, **kwargs, ) -> None: super().__init__(**kwargs)