Closed
Labels
area:core, kind:bug, needs-triage
Description
Apache Airflow version
3.1.3
If "Other Airflow 2/3 version" selected, which one?
No response
What happened?
The get_first_reschedule_date method of RuntimeTaskInstanceProtocol has the following signature:
# airflow/sdk/types.py
def get_first_reschedule_date(self, first_try_number) -> AwareDatetime | None: ...
However, in dag.test() mode it actually returns a string.
This leads to at least one serious bug in the sensor operator:
# airflow/sdk/bases/sensor.py
def execute(self, context: Context) -> Any:
    started_at: datetime.datetime | float
    if self.reschedule:
        ti = context["ti"]
        # In dag.test() mode, a string is returned here instead of a datetime.
        first_reschedule_date = ti.get_first_reschedule_date(context)
        started_at = start_date = first_reschedule_date or timezone.utcnow()

        def run_duration() -> float:
            # If we are in reschedule mode, then we have to compute diff
            # based on the time in a DB, so can't use time.monotonic
            # The type mismatch (datetime - str) surfaces on the next line.
            return (timezone.utcnow() - start_date).total_seconds()
    ...
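The failure mode can be shown without Airflow. The sketch below uses only the standard library; `fake_get_first_reschedule_date` is a hypothetical stand-in for what `dag.test()` appears to return, and the timestamp value is made up for illustration.

```python
from __future__ import annotations

from datetime import datetime, timezone


def fake_get_first_reschedule_date() -> str:
    # Hypothetical stand-in: an ISO 8601 string rather than an aware datetime.
    return "2025-11-27T14:56:18+00:00"


def run_duration_error() -> str | None:
    # Mirrors the sensor logic: a non-empty string is truthy, so the
    # `or` fallback to the current time is never taken.
    start_date = fake_get_first_reschedule_date() or datetime.now(timezone.utc)
    try:
        (datetime.now(timezone.utc) - start_date).total_seconds()
    except TypeError as exc:
        return str(exc)
    return None


print(run_duration_error())
```

Because the string is truthy, the bad value passes through the `or` fallback unnoticed and only fails later, at the subtraction inside `run_duration`.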
Error message:
[2025-11-27T14:56:19.717072Z] {task_runner.py:994} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py", line 920, in run
    result = _execute_task(context=context, ti=ti, log=log)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py", line 1307, in _execute_task
    result = ctx.run(execute, context=context)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/bases/operator.py", line 416, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/bases/sensor.py", line 234, in execute
    if run_duration() > self.timeout:
       ^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/bases/sensor.py", line 194, in run_duration
    return (timezone.utcnow() - start_date).total_seconds()
            ~~~~~~~~~~~~~~~~~~^~~~~~~~~~~~
TypeError: unsupported operand type(s) for -: 'datetime.datetime' and 'str'
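One possible way to make the value match the declared `AwareDatetime | None` return type is to normalise it before use. The sketch below is an assumption, not necessarily how the project resolved the issue: `coerce_to_aware_datetime` is a hypothetical helper, the string is assumed to be ISO 8601, and naive values are assumed to be UTC.

```python
from __future__ import annotations

from datetime import datetime, timezone


def coerce_to_aware_datetime(value: datetime | str | None) -> datetime | None:
    """Hypothetical helper: normalise a reschedule date to an aware datetime."""
    if value is None:
        return None
    if isinstance(value, str):
        # datetime.fromisoformat accepts a trailing "Z" only on Python 3.11+,
        # so rewrite it as an explicit UTC offset first.
        value = datetime.fromisoformat(value.replace("Z", "+00:00"))
    if value.tzinfo is None:
        # Assumption: naive timestamps are in UTC.
        value = value.replace(tzinfo=timezone.utc)
    return value


start_date = coerce_to_aware_datetime("2025-11-27T14:56:19.717072Z")
elapsed = (datetime.now(timezone.utc) - start_date).total_seconds()
```

With the value coerced, the subtraction in `run_duration` operates on two aware datetimes and no longer raises.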
What you think should happen instead?
No response
How to reproduce
import pendulum

from airflow.sdk import dag, task, PokeReturnValue


@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
)
def my_dag():
    @task
    def start() -> int:
        return 1

    @task.sensor(
        poke_interval=1,
        mode="reschedule",
        soft_fail=True,
        timeout=3,
    )
    def sensor_task() -> PokeReturnValue:
        return PokeReturnValue(is_done=False)

    start() >> sensor_task()


if __name__ == '__main__':
    my_dag().test()
Operating System
Ubuntu 24.04.3 LTS
Versions of Apache Airflow Providers
No response
Deployment
Official Apache Airflow Helm Chart
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct