-
Notifications
You must be signed in to change notification settings - Fork 16.3k
Fix client bypassed pydantic validation to enforce datetime & fix corresponding unittest #58791
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix client bypassed pydantic validation to enforce datetime & fix corresponding unittest #58791
Conversation
amoghrajesh
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't correct. I ran your test dag with airflow 3.1.3 and added few logs:
Index: task-sdk/src/airflow/sdk/bases/sensor.py
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/task-sdk/src/airflow/sdk/bases/sensor.py b/task-sdk/src/airflow/sdk/bases/sensor.py
--- a/task-sdk/src/airflow/sdk/bases/sensor.py (revision 08e871458153cbac24a9ef3dba31286934f603e1)
+++ b/task-sdk/src/airflow/sdk/bases/sensor.py (date 1764310309945)
@@ -184,7 +184,9 @@
if self.reschedule:
ti = context["ti"]
first_reschedule_date = ti.get_first_reschedule_date(context)
+ print("first reschedule date:", first_reschedule_date, type(first_reschedule_date))
started_at = start_date = first_reschedule_date or timezone.utcnow()
+ print("start date:", start_date, type(start_date))
def run_duration() -> float:
# If we are in reschedule mode, then we have to compute diff
And it doesn't return string anywhere. It's a datetime object:
[2025-11-28 11:43:12] INFO - __file__='/files/plugins/triggera.py' loaded source=task.stdout
[2025-11-28 11:43:12] INFO - __file__='/files/plugins/triggera_comprehensive.py' loaded source=task.stdout
[2025-11-28 11:43:12] INFO - DAG bundles loaded: dags-folder source=airflow.dag_processing.bundles.manager.DagBundlesManager loc=manager.py:209
[2025-11-28 11:43:12] INFO - Filling up the DagBag from /files/test_dags/poke-bug.py source=airflow.dag_processing.dagbag.DagBag loc=dagbag.py:627
[2025-11-28 11:43:12] INFO - first reschedule date: None <class 'NoneType'> source=task.stdout
[2025-11-28 11:43:12] INFO - start date: 2025-11-28 06:13:12.540845+00:00 <class 'datetime.datetime'> source=task.stdout
[2025-11-28 11:43:12] INFO - Poking callable: <function poke_dag.<locals>.sensor_task at 0xffff84709750> source=airflow.task.operators.airflow.providers.standard.decorators.sensor.DecoratedSensorOperator loc=python.py:75
[2025-11-28 11:43:12] INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE source=task loc=task_runner.py:1011
[2025-11-28 11:43:14] INFO - __file__='/files/plugins/triggera.py' loaded source=task.stdout
[2025-11-28 11:43:14] INFO - __file__='/files/plugins/triggera_comprehensive.py' loaded source=task.stdout
[2025-11-28 11:43:14] INFO - DAG bundles loaded: dags-folder source=airflow.dag_processing.bundles.manager.DagBundlesManager loc=manager.py:209
[2025-11-28 11:43:14] INFO - Filling up the DagBag from /files/test_dags/poke-bug.py source=airflow.dag_processing.dagbag.DagBag loc=dagbag.py:627
[2025-11-28 11:43:14] INFO - Poking callable: <function poke_dag.<locals>.sensor_task at 0xffff8470d990> source=airflow.task.operators.airflow.providers.standard.decorators.sensor.DecoratedSensorOperator loc=python.py:75
[2025-11-28 11:43:14] INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE source=task loc=task_runner.py:1011
[2025-11-28 11:43:14] INFO - first reschedule date: 2025-11-28 06:13:12.546843+00:00 <class 'datetime.datetime'> source=task.stdout
[2025-11-28 11:43:14] INFO - start date: 2025-11-28 06:13:12.546843+00:00 <class 'datetime.datetime'> source=task.stdout
[2025-11-28 11:43:16] INFO - __file__='/files/plugins/triggera.py' loaded source=task.stdout
[2025-11-28 11:43:16] INFO - __file__='/files/plugins/triggera_comprehensive.py' loaded source=task.stdout
[2025-11-28 11:43:16] INFO - DAG bundles loaded: dags-folder source=airflow.dag_processing.bundles.manager.DagBundlesManager loc=manager.py:209
[2025-11-28 11:43:16] INFO - Filling up the DagBag from /files/test_dags/poke-bug.py source=airflow.dag_processing.dagbag.DagBag loc=dagbag.py:627
[2025-11-28 11:43:16] INFO - Poking callable: <function poke_dag.<locals>.sensor_task at 0xffff843feef0> source=airflow.task.operators.airflow.providers.standard.decorators.sensor.DecoratedSensorOperator loc=python.py:75
[2025-11-28 11:43:16] INFO - Skipping task. reason=Sensor has timed out; run duration of 4.354058 seconds exceeds the specified timeout of 3.0. source=task loc=task_runner.py:1003
[2025-11-28 11:43:16] INFO - first reschedule date: 2025-11-28 06:13:12.546843+00:00 <class 'datetime.datetime'> source=task.stdout
[2025-11-28 11:43:16] INFO - start date: 2025-11-28 06:13:12.546843+00:00 <class 'datetime.datetime'> source=task.stdout|
@amoghrajesh You're right that at runtime it returns datetime. This is because when the response travels from supervisor to task runner, it gets deserialized where Pydantic validation converts the string to datetime. As for client.py, it was returning a string with model_construct() which bypasses validation, and the test itself was testing against a string and passing. This fix ensures the client returns the correct type rather than relying on downstream |
amoghrajesh
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, thanks for clarifying.
|
@prdai could you change the title of the PR to reflect the changes better? |
|
@amoghrajesh i didn't create this pr, so i can't really change the title... |
|
My bad. Wrong tag |
|
Title updated |
(cherry picked from commit 4360616) Co-authored-by: Steve Ahn <steveahnahn@g.ucla.edu>
Problem
ti.get_first_reschedule_date() returns a string instead of a datetime object when running due to TaskRescheduleStartDate.model_construct() bypasses Pydantic validation. The corresponding unit test also tested against a string.
Solution
Use the standard Pydantic constructor instead of model_construct() to ensure it parses the ISO string into a timezone-aware datetime object and fixes the associated test.
Related Issue
Closes #58777