Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions airflow-core/docs/authoring-and-scheduling/deferring.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ You can either use pre-written deferrable operators as a DAG author or write you
Using Deferrable Operators
--------------------------

If you want to use pre-written deferrable operators that come with Airflow, such as ``TimeSensorAsync``, then you only need to complete two steps:
If you want to use pre-written deferrable operators that come with Airflow, such as ``TimeSensor``, then you only need to complete two steps:

* Ensure your Airflow installation runs at least one ``triggerer`` process, as well as the normal ``scheduler``
* Use deferrable operators/sensors in your dags

Airflow automatically handles and implements the deferral processes for you.

If you're upgrading existing dags to use deferrable operators, Airflow contains API-compatible sensor variants, like ``TimeSensorAsync`` for ``TimeSensor``. Add these variants into your DAG to use deferrable operators with no other changes required.
If you're upgrading existing dags to use deferrable operators, Airflow contains API-compatible sensor variants. Add these variants into your dag to use deferrable operators with no other changes required.

Note that you can't use the deferral ability from inside custom PythonOperator or TaskFlow Python functions. Deferral is only available to traditional, class-based operators.

Expand Down
18 changes: 1 addition & 17 deletions providers/standard/docs/sensors/datetime.rst
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ It is an async version of the operator and requires Triggerer to run.
TimeSensor
==========

Use the :class:`~airflow.providers.standard.sensors.time_sensor.TimeSensor` to end sensing after time specified.
Use the :class:`~airflow.providers.standard.sensors.time_sensor.TimeSensor` to end sensing after time specified. ``TimeSensor`` can be run in deferrable mode, if a Triggerer is available.

Time will be evaluated against ``data_interval_end`` if present for the dag run, otherwise ``run_after`` will be used.

Expand All @@ -65,22 +65,6 @@ Time will be evaluated against ``data_interval_end`` if present for the dag run,
:end-before: [END example_time_sensors]


.. _howto/operator:TimeSensorAsync:

TimeSensorAsync
===============

Use the :class:`~airflow.providers.standard.sensors.time_sensor.TimeSensorAsync` to end sensing after time specified.
It is an async version of the operator and requires Triggerer to run.

Time will be evaluated against ``data_interval_end`` if present for the dag run, otherwise ``run_after`` will be used.

.. exampleinclude:: /../tests/system/standard/example_sensors.py
:language: python
:dedent: 4
:start-after: [START example_time_sensors_async]
:end-before: [END example_time_sensors_async]

.. _howto/operator:DayOfWeekSensor:

DayOfWeekSensor
Expand Down
88 changes: 48 additions & 40 deletions providers/standard/src/airflow/providers/standard/sensors/time.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
from __future__ import annotations

import datetime
import warnings
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, NoReturn
from typing import TYPE_CHECKING, Any

from airflow.configuration import conf
from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.providers.standard.triggers.temporal import DateTimeTrigger
from airflow.sensors.base import BaseSensorOperator

Expand Down Expand Up @@ -54,39 +57,14 @@ class TimeSensor(BaseSensorOperator):
Waits until the specified time of the day.

:param target_time: time after which the job succeeds
:param deferrable: whether to defer execution

.. seealso::
For more information on how to use this sensor, take a look at the guide:
:ref:`howto/operator:TimeSensor`

"""

def __init__(self, *, target_time: datetime.time, **kwargs) -> None:
super().__init__(**kwargs)
self.target_time = target_time

def poke(self, context: Context) -> bool:
self.log.info("Checking if the time (%s) has come", self.target_time)
return timezone.make_naive(timezone.utcnow(), self.dag.timezone).time() > self.target_time


class TimeSensorAsync(BaseSensorOperator):
"""
Waits until the specified time of the day.

This frees up a worker slot while it is waiting.

:param target_time: time after which the job succeeds
:param start_from_trigger: Start the task directly from the triggerer without going into the worker.
:param end_from_trigger: End the task directly from the triggerer without going into the worker.
:param trigger_kwargs: The keyword arguments passed to the trigger when start_from_trigger is set to True
during dynamic task mapping. This argument is not used in standard usage.

.. seealso::
For more information on how to use this sensor, take a look at the guide:
:ref:`howto/operator:TimeSensorAsync`
"""

start_trigger_args = StartTriggerArgs(
trigger_cls="airflow.providers.standard.triggers.temporal.DateTimeTrigger",
trigger_kwargs={"moment": "", "end_from_trigger": False},
Expand All @@ -100,32 +78,62 @@ def __init__(
self,
*,
target_time: datetime.time,
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
start_from_trigger: bool = False,
trigger_kwargs: dict[str, Any] | None = None,
end_from_trigger: bool = False,
trigger_kwargs: dict[str, Any] | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.start_from_trigger = start_from_trigger
self.end_from_trigger = end_from_trigger
self.target_time = target_time

# Create a "date-aware" timestamp that will be used as the "target_datetime". This is a requirement
# of the DateTimeTrigger
aware_time = timezone.coerce_datetime(
datetime.datetime.combine(datetime.datetime.today(), self.target_time, self.dag.timezone)
datetime.datetime.combine(datetime.datetime.today(), target_time, self.dag.timezone)
)

# Now that the dag's timezone has made the datetime timezone aware, we need to convert to UTC
self.target_datetime = timezone.convert_to_utc(aware_time)
self.deferrable = deferrable
self.start_from_trigger = start_from_trigger
self.end_from_trigger = end_from_trigger

if self.start_from_trigger:
self.start_trigger_args.trigger_kwargs = dict(
moment=self.target_datetime, end_from_trigger=self.end_from_trigger
)

def execute(self, context: Context) -> NoReturn:
self.defer(
trigger=DateTimeTrigger(moment=self.target_datetime, end_from_trigger=self.end_from_trigger),
method_name="execute_complete",
)
def execute(self, context: Context) -> None:
if self.deferrable:
self.defer(
trigger=DateTimeTrigger(
moment=self.target_datetime, # This needs to be an aware timestamp
end_from_trigger=self.end_from_trigger,
),
method_name="execute_complete",
)

def execute_complete(self, context: Context, event: Any = None) -> None:
"""Handle the event when the trigger fires and return immediately."""
return None
def execute_complete(self, context: Context) -> None:
return

def poke(self, context: Context) -> bool:
self.log.info("Checking if the time (%s) has come", self.target_datetime)

# self.target_date has been converted to UTC, so we do not need to convert timezone
return timezone.utcnow() > self.target_datetime


class TimeSensorAsync(TimeSensor):
"""
Deprecated. Use TimeSensor with deferrable=True instead.

:sphinx-autoapi-skip:
"""

def __init__(self, **kwargs) -> None:
warnings.warn(
"TimeSensorAsync is deprecated and will be removed in a future version. Use `TimeSensor` with deferrable=True instead.",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
super().__init__(deferrable=True, **kwargs)
15 changes: 8 additions & 7 deletions providers/standard/tests/system/standard/example_sensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from airflow.providers.standard.sensors.bash import BashSensor
from airflow.providers.standard.sensors.filesystem import FileSensor
from airflow.providers.standard.sensors.python import PythonSensor
from airflow.providers.standard.sensors.time import TimeSensor, TimeSensorAsync
from airflow.providers.standard.sensors.time import TimeSensor
from airflow.providers.standard.sensors.time_delta import TimeDeltaSensor, TimeDeltaSensorAsync
from airflow.providers.standard.sensors.weekday import DayOfWeekSensor
from airflow.providers.standard.utils.weekday import WeekDay
Expand Down Expand Up @@ -71,20 +71,21 @@ def failure_callable():
soft_fail=True,
target_time=(datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(hours=1)).time(),
)
# [END example_time_sensors]

# [START example_time_sensors_async]
t1a = TimeSensorAsync(
task_id="fire_immediately_async", target_time=datetime.datetime.now(tz=datetime.timezone.utc).time()
t1a = TimeSensor(
task_id="fire_immediately_async",
target_time=datetime.datetime.now(tz=datetime.timezone.utc).time(),
deferrable=True,
)

t2a = TimeSensorAsync(
t2a = TimeSensor(
task_id="timeout_after_second_date_in_the_future_async",
timeout=1,
soft_fail=True,
target_time=(datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(hours=1)).time(),
deferrable=True,
)
# [END example_time_sensors_async]
# [END example_time_sensors]

# [START example_bash_sensors]
t3 = BashSensor(task_id="Sensor_succeeds", bash_command="exit 0")
Expand Down
67 changes: 35 additions & 32 deletions providers/standard/tests/unit/standard/sensors/test_time.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

from airflow.exceptions import TaskDeferred
from airflow.models.dag import DAG
from airflow.providers.standard.sensors.time import TimeSensor, TimeSensorAsync
from airflow.providers.standard.sensors.time import TimeSensor
from airflow.providers.standard.triggers.temporal import DateTimeTrigger
from airflow.utils import timezone

Expand All @@ -36,55 +36,58 @@

class TestTimeSensor:
@pytest.mark.parametrize(
"default_timezone, start_date, expected",
"default_timezone, start_date, target_time ,expected",
[
("UTC", DEFAULT_DATE_WO_TZ, True),
("UTC", DEFAULT_DATE_WITH_TZ, False),
(DEFAULT_TIMEZONE, DEFAULT_DATE_WO_TZ, False),
("UTC", DEFAULT_DATE_WO_TZ, time(10, 0), True),
("UTC", DEFAULT_DATE_WITH_TZ, time(16, 0), True),
("UTC", DEFAULT_DATE_WITH_TZ, time(23, 0), False),
(DEFAULT_TIMEZONE, DEFAULT_DATE_WO_TZ, time(23, 0), False),
],
)
@time_machine.travel(timezone.datetime(2020, 1, 1, 23, 0).replace(tzinfo=timezone.utc))
def test_timezone(self, default_timezone, start_date, expected, monkeypatch):
@time_machine.travel(timezone.datetime(2020, 1, 1, 13, 0).replace(tzinfo=timezone.utc))
def test_timezone(self, default_timezone, start_date, target_time, expected, monkeypatch):
monkeypatch.setattr("airflow.settings.TIMEZONE", timezone.parse_timezone(default_timezone))
dag = DAG("test", schedule=None, default_args={"start_date": start_date})
op = TimeSensor(task_id="test", target_time=time(10, 0), dag=dag)
dag = DAG("test_timezone", schedule=None, default_args={"start_date": start_date})
op = TimeSensor(task_id="test", target_time=target_time, dag=dag)
assert op.poke(None) == expected

def test_target_time_aware_dag_timezone(self):
# This behavior should be the same for both deferrable and non-deferrable
with DAG("test_target_time_aware", schedule=None, start_date=datetime(2020, 1, 1, 13, 0)):
aware_time = time(0, 1).replace(tzinfo=timezone.parse_timezone(DEFAULT_TIMEZONE))
op = TimeSensor(task_id="test", target_time=aware_time)
assert op.target_datetime.tzinfo == timezone.utc

def test_target_time_naive_dag_timezone(self):
# Again, this behavior should be the same for both deferrable and non-deferrable
with DAG(
dag_id="test_target_time_naive_dag_timezone",
schedule=None,
start_date=datetime(2020, 1, 1, 23, 0, tzinfo=timezone.parse_timezone(DEFAULT_TIMEZONE)),
):
op = TimeSensor(task_id="test", target_time=time(9, 0))

# Since the DEFAULT_TIMEZONE is UTC+8:00, then hour 9 should be converted to hour 1
assert op.target_datetime.time() == pendulum.time(1, 0)
assert op.target_datetime.tzinfo == timezone.utc

class TestTimeSensorAsync:
@time_machine.travel("2020-07-07 00:00:00", tick=False)
def test_task_is_deferred(self):
with DAG(
dag_id="test_task_is_deferred",
schedule=None,
start_date=timezone.datetime(2020, 1, 1, 23, 0),
start_date=datetime(2020, 1, 1, 13, 0),
):
op = TimeSensorAsync(task_id="test", target_time=time(10, 0))
op = TimeSensor(task_id="test", target_time=time(10, 0), deferrable=True)

# This should be converted to UTC in the __init__. Since there is no default timezone, it will become
# aware, but note changed
assert not timezone.is_naive(op.target_datetime)

with pytest.raises(TaskDeferred) as exc_info:
op.execute({})

assert isinstance(exc_info.value.trigger, DateTimeTrigger)
assert exc_info.value.trigger.moment == timezone.datetime(2020, 7, 7, 10)
assert exc_info.value.trigger.moment == pendulum.datetime(2020, 7, 7, 10)
assert exc_info.value.kwargs is None
assert exc_info.value.method_name == "execute_complete"

def test_target_time_aware(self):
with DAG("test_target_time_aware", schedule=None, start_date=timezone.datetime(2020, 1, 1, 23, 0)):
aware_time = time(0, 1).replace(tzinfo=pendulum.local_timezone())
op = TimeSensorAsync(task_id="test", target_time=aware_time)
assert op.target_datetime.tzinfo == timezone.utc

def test_target_time_naive_dag_timezone(self):
"""
Tests that naive target_time gets converted correctly using the DAG's timezone.
"""
with DAG(
dag_id="test_target_time_naive_dag_timezone",
schedule=None,
start_date=pendulum.datetime(2020, 1, 1, 0, 0, tz=DEFAULT_TIMEZONE),
):
op = TimeSensorAsync(task_id="test", target_time=pendulum.time(9, 0))
assert op.target_datetime.time() == pendulum.time(1, 0)
assert op.target_datetime.tzinfo == timezone.utc