diff --git a/airflow-core/docs/authoring-and-scheduling/deferring.rst b/airflow-core/docs/authoring-and-scheduling/deferring.rst index 68933af157d74..08a9058e4c623 100644 --- a/airflow-core/docs/authoring-and-scheduling/deferring.rst +++ b/airflow-core/docs/authoring-and-scheduling/deferring.rst @@ -36,14 +36,14 @@ You can either use pre-written deferrable operators as a DAG author or write you Using Deferrable Operators -------------------------- -If you want to use pre-written deferrable operators that come with Airflow, such as ``TimeSensorAsync``, then you only need to complete two steps: +If you want to use pre-written deferrable operators that come with Airflow, such as ``TimeSensor``, then you only need to complete two steps: * Ensure your Airflow installation runs at least one ``triggerer`` process, as well as the normal ``scheduler`` * Use deferrable operators/sensors in your dags Airflow automatically handles and implements the deferral processes for you. -If you're upgrading existing dags to use deferrable operators, Airflow contains API-compatible sensor variants, like ``TimeSensorAsync`` for ``TimeSensor``. Add these variants into your DAG to use deferrable operators with no other changes required. +If you're upgrading existing dags to use deferrable operators, Airflow contains API-compatible sensor variants. Add these variants into your dag to use deferrable operators with no other changes required. Note that you can't use the deferral ability from inside custom PythonOperator or TaskFlow Python functions. Deferral is only available to traditional, class-based operators. diff --git a/providers/standard/docs/sensors/datetime.rst b/providers/standard/docs/sensors/datetime.rst index aabff0f866dce..f1556993baf76 100644 --- a/providers/standard/docs/sensors/datetime.rst +++ b/providers/standard/docs/sensors/datetime.rst @@ -54,7 +54,7 @@ It is an async version of the operator and requires Triggerer to run. TimeSensor ========== -Use the :class:`~airflow.providers.standard.sensors.time_sensor.TimeSensor` to end sensing after time specified. +Use the :class:`~airflow.providers.standard.sensors.time_sensor.TimeSensor` to end sensing after time specified. ``TimeSensor`` can be run in deferrable mode, if a Triggerer is available. Time will be evaluated against ``data_interval_end`` if present for the dag run, otherwise ``run_after`` will be used. @@ -65,22 +65,6 @@ Time will be evaluated against ``data_interval_end`` if present for the dag run, :end-before: [END example_time_sensors] -.. _howto/operator:TimeSensorAsync: - -TimeSensorAsync -=============== - -Use the :class:`~airflow.providers.standard.sensors.time_sensor.TimeSensorAsync` to end sensing after time specified. -It is an async version of the operator and requires Triggerer to run. - -Time will be evaluated against ``data_interval_end`` if present for the dag run, otherwise ``run_after`` will be used. - -.. exampleinclude:: /../tests/system/standard/example_sensors.py - :language: python - :dedent: 4 - :start-after: [START example_time_sensors_async] - :end-before: [END example_time_sensors_async] - .. _howto/operator:DayOfWeekSensor: DayOfWeekSensor diff --git a/providers/standard/src/airflow/providers/standard/sensors/time.py b/providers/standard/src/airflow/providers/standard/sensors/time.py index bbe93dbc01a5e..7b5d87eb11b8a 100644 --- a/providers/standard/src/airflow/providers/standard/sensors/time.py +++ b/providers/standard/src/airflow/providers/standard/sensors/time.py @@ -18,9 +18,12 @@ from __future__ import annotations import datetime +import warnings from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, NoReturn +from typing import TYPE_CHECKING, Any +from airflow.configuration import conf +from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.providers.standard.triggers.temporal import DateTimeTrigger from airflow.sensors.base import BaseSensorOperator @@ -54,6 +57,7 @@ class TimeSensor(BaseSensorOperator): Waits until the specified time of the day. :param target_time: time after which the job succeeds + :param deferrable: whether to defer execution .. seealso:: For more information on how to use this sensor, take a look at the guide: @@ -61,32 +65,6 @@ class TimeSensor(BaseSensorOperator): """ - def __init__(self, *, target_time: datetime.time, **kwargs) -> None: - super().__init__(**kwargs) - self.target_time = target_time - - def poke(self, context: Context) -> bool: - self.log.info("Checking if the time (%s) has come", self.target_time) - return timezone.make_naive(timezone.utcnow(), self.dag.timezone).time() > self.target_time - - -class TimeSensorAsync(BaseSensorOperator): - """ - Waits until the specified time of the day. - - This frees up a worker slot while it is waiting. - - :param target_time: time after which the job succeeds - :param start_from_trigger: Start the task directly from the triggerer without going into the worker. - :param end_from_trigger: End the task directly from the triggerer without going into the worker. - :param trigger_kwargs: The keyword arguments passed to the trigger when start_from_trigger is set to True - during dynamic task mapping. This argument is not used in standard usage. - - .. seealso:: - For more information on how to use this sensor, take a look at the guide: - :ref:`howto/operator:TimeSensorAsync` - """ - start_trigger_args = StartTriggerArgs( trigger_cls="airflow.providers.standard.triggers.temporal.DateTimeTrigger", trigger_kwargs={"moment": "", "end_from_trigger": False}, @@ -100,32 +78,62 @@ def __init__( self, *, target_time: datetime.time, + deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), start_from_trigger: bool = False, - trigger_kwargs: dict[str, Any] | None = None, end_from_trigger: bool = False, + trigger_kwargs: dict[str, Any] | None = None, **kwargs, ) -> None: super().__init__(**kwargs) - self.start_from_trigger = start_from_trigger - self.end_from_trigger = end_from_trigger - self.target_time = target_time + # Create a "date-aware" timestamp that will be used as the "target_datetime". This is a requirement + # of the DateTimeTrigger aware_time = timezone.coerce_datetime( - datetime.datetime.combine(datetime.datetime.today(), self.target_time, self.dag.timezone) + datetime.datetime.combine(datetime.datetime.today(), target_time, self.dag.timezone) ) + # Now that the dag's timezone has made the datetime timezone aware, we need to convert to UTC self.target_datetime = timezone.convert_to_utc(aware_time) + self.deferrable = deferrable + self.start_from_trigger = start_from_trigger + self.end_from_trigger = end_from_trigger + if self.start_from_trigger: self.start_trigger_args.trigger_kwargs = dict( moment=self.target_datetime, end_from_trigger=self.end_from_trigger ) - def execute(self, context: Context) -> NoReturn: - self.defer( - trigger=DateTimeTrigger(moment=self.target_datetime, end_from_trigger=self.end_from_trigger), - method_name="execute_complete", - ) + def execute(self, context: Context) -> None: + if self.deferrable: + self.defer( + trigger=DateTimeTrigger( + moment=self.target_datetime, # This needs to be an aware timestamp + end_from_trigger=self.end_from_trigger, + ), + method_name="execute_complete", + ) - def execute_complete(self, context: Context, event: Any = None) -> None: - """Handle the event when the trigger fires and return immediately.""" - return None + def execute_complete(self, context: Context) -> None: + return + + def poke(self, context: Context) -> bool: + self.log.info("Checking if the time (%s) has come", self.target_datetime) + + # self.target_date has been converted to UTC, so we do not need to convert timezone + return timezone.utcnow() > self.target_datetime + + +class TimeSensorAsync(TimeSensor): + """ + Deprecated. Use TimeSensor with deferrable=True instead. + + :sphinx-autoapi-skip: + """ + + def __init__(self, **kwargs) -> None: + warnings.warn( + "TimeSensorAsync is deprecated and will be removed in a future version. Use `TimeSensor` with deferrable=True instead.", + AirflowProviderDeprecationWarning, + stacklevel=2, + ) + super().__init__(deferrable=True, **kwargs) diff --git a/providers/standard/tests/system/standard/example_sensors.py b/providers/standard/tests/system/standard/example_sensors.py index b5ea2f458fd94..bc151f140c695 100644 --- a/providers/standard/tests/system/standard/example_sensors.py +++ b/providers/standard/tests/system/standard/example_sensors.py @@ -25,7 +25,7 @@ from airflow.providers.standard.sensors.bash import BashSensor from airflow.providers.standard.sensors.filesystem import FileSensor from airflow.providers.standard.sensors.python import PythonSensor -from airflow.providers.standard.sensors.time import TimeSensor, TimeSensorAsync +from airflow.providers.standard.sensors.time import TimeSensor from airflow.providers.standard.sensors.time_delta import TimeDeltaSensor, TimeDeltaSensorAsync from airflow.providers.standard.sensors.weekday import DayOfWeekSensor from airflow.providers.standard.utils.weekday import WeekDay @@ -71,20 +71,21 @@ def failure_callable(): soft_fail=True, target_time=(datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(hours=1)).time(), ) - # [END example_time_sensors] - # [START example_time_sensors_async] - t1a = TimeSensorAsync( - task_id="fire_immediately_async", target_time=datetime.datetime.now(tz=datetime.timezone.utc).time() + t1a = TimeSensor( + task_id="fire_immediately_async", + target_time=datetime.datetime.now(tz=datetime.timezone.utc).time(), + deferrable=True, ) - t2a = TimeSensorAsync( + t2a = TimeSensor( task_id="timeout_after_second_date_in_the_future_async", timeout=1, soft_fail=True, target_time=(datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(hours=1)).time(), + deferrable=True, ) - # [END example_time_sensors_async] + # [END example_time_sensors] # [START example_bash_sensors] t3 = BashSensor(task_id="Sensor_succeeds", bash_command="exit 0") diff --git a/providers/standard/tests/unit/standard/sensors/test_time.py b/providers/standard/tests/unit/standard/sensors/test_time.py index 017b410eda4df..44fbbf14fc229 100644 --- a/providers/standard/tests/unit/standard/sensors/test_time.py +++ b/providers/standard/tests/unit/standard/sensors/test_time.py @@ -25,7 +25,7 @@ from airflow.exceptions import TaskDeferred from airflow.models.dag import DAG -from airflow.providers.standard.sensors.time import TimeSensor, TimeSensorAsync +from airflow.providers.standard.sensors.time import TimeSensor from airflow.providers.standard.triggers.temporal import DateTimeTrigger from airflow.utils import timezone @@ -36,55 +36,58 @@ class TestTimeSensor: @pytest.mark.parametrize( - "default_timezone, start_date, expected", + "default_timezone, start_date, target_time ,expected", [ - ("UTC", DEFAULT_DATE_WO_TZ, True), - ("UTC", DEFAULT_DATE_WITH_TZ, False), - (DEFAULT_TIMEZONE, DEFAULT_DATE_WO_TZ, False), + ("UTC", DEFAULT_DATE_WO_TZ, time(10, 0), True), + ("UTC", DEFAULT_DATE_WITH_TZ, time(16, 0), True), + ("UTC", DEFAULT_DATE_WITH_TZ, time(23, 0), False), + (DEFAULT_TIMEZONE, DEFAULT_DATE_WO_TZ, time(23, 0), False), ], ) - @time_machine.travel(timezone.datetime(2020, 1, 1, 23, 0).replace(tzinfo=timezone.utc)) - def test_timezone(self, default_timezone, start_date, expected, monkeypatch): + @time_machine.travel(timezone.datetime(2020, 1, 1, 13, 0).replace(tzinfo=timezone.utc)) + def test_timezone(self, default_timezone, start_date, target_time, expected, monkeypatch): monkeypatch.setattr("airflow.settings.TIMEZONE", timezone.parse_timezone(default_timezone)) - dag = DAG("test", schedule=None, default_args={"start_date": start_date}) - op = TimeSensor(task_id="test", target_time=time(10, 0), dag=dag) + dag = DAG("test_timezone", schedule=None, default_args={"start_date": start_date}) + op = TimeSensor(task_id="test", target_time=target_time, dag=dag) assert op.poke(None) == expected + def test_target_time_aware_dag_timezone(self): + # This behavior should be the same for both deferrable and non-deferrable + with DAG("test_target_time_aware", schedule=None, start_date=datetime(2020, 1, 1, 13, 0)): + aware_time = time(0, 1).replace(tzinfo=timezone.parse_timezone(DEFAULT_TIMEZONE)) + op = TimeSensor(task_id="test", target_time=aware_time) + assert op.target_datetime.tzinfo == timezone.utc + + def test_target_time_naive_dag_timezone(self): + # Again, this behavior should be the same for both deferrable and non-deferrable + with DAG( + dag_id="test_target_time_naive_dag_timezone", + schedule=None, + start_date=datetime(2020, 1, 1, 23, 0, tzinfo=timezone.parse_timezone(DEFAULT_TIMEZONE)), + ): + op = TimeSensor(task_id="test", target_time=time(9, 0)) + + # Since the DEFAULT_TIMEZONE is UTC+8:00, then hour 9 should be converted to hour 1 + assert op.target_datetime.time() == pendulum.time(1, 0) + assert op.target_datetime.tzinfo == timezone.utc -class TestTimeSensorAsync: @time_machine.travel("2020-07-07 00:00:00", tick=False) def test_task_is_deferred(self): with DAG( dag_id="test_task_is_deferred", schedule=None, - start_date=timezone.datetime(2020, 1, 1, 23, 0), + start_date=datetime(2020, 1, 1, 13, 0), ): - op = TimeSensorAsync(task_id="test", target_time=time(10, 0)) + op = TimeSensor(task_id="test", target_time=time(10, 0), deferrable=True) + + # This should be converted to UTC in the __init__. Since there is no default timezone, it will become + # aware, but note changed assert not timezone.is_naive(op.target_datetime) with pytest.raises(TaskDeferred) as exc_info: op.execute({}) assert isinstance(exc_info.value.trigger, DateTimeTrigger) - assert exc_info.value.trigger.moment == timezone.datetime(2020, 7, 7, 10) + assert exc_info.value.trigger.moment == pendulum.datetime(2020, 7, 7, 10) assert exc_info.value.kwargs is None assert exc_info.value.method_name == "execute_complete" - - def test_target_time_aware(self): - with DAG("test_target_time_aware", schedule=None, start_date=timezone.datetime(2020, 1, 1, 23, 0)): - aware_time = time(0, 1).replace(tzinfo=pendulum.local_timezone()) - op = TimeSensorAsync(task_id="test", target_time=aware_time) - assert op.target_datetime.tzinfo == timezone.utc - - def test_target_time_naive_dag_timezone(self): - """ - Tests that naive target_time gets converted correctly using the DAG's timezone. - """ - with DAG( - dag_id="test_target_time_naive_dag_timezone", - schedule=None, - start_date=pendulum.datetime(2020, 1, 1, 0, 0, tz=DEFAULT_TIMEZONE), - ): - op = TimeSensorAsync(task_id="test", target_time=pendulum.time(9, 0)) - assert op.target_datetime.time() == pendulum.time(1, 0) - assert op.target_datetime.tzinfo == timezone.utc