Aip 86: add deadlines to DagRunResponse #50957
end_date: datetime | None
data_interval_start: datetime | None
data_interval_end: datetime | None
deadlines: list[DeadlineResponse] | None
Oh I missed this. A dag run can have multiple deadlines?
Yes, here's an example that @ferruzzi provided:
with DAG(
    dag_id="dag_with_deadlines",
    deadline=[
        DeadlineAlert(
            reference=DeadlineReference.DAGRUN_QUEUED_AT,
            interval=timedelta(hours=1),
            callback=warn_me_it_may_be_late,
        ),
        DeadlineAlert(
            reference=DeadlineReference.DAGRUN_QUEUED_AT,
            interval=timedelta(hours=2),
            callback=definitely_late,
        ),
        DeadlineAlert(
            reference=DeadlineReference.DAGRUN_QUEUED_AT,
            interval=timedelta(hours=3),
            callback=email_my_boss_that_it_wasnt_my_fault,
        ),
    ],
):
    ...
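To make the "multiple deadlines per run" point concrete, here is a minimal sketch of how three alerts sharing one reference time turn into three distinct deadline times. It is not the Airflow implementation: `AlertSketch` and `deadline_times` are hypothetical names, and the callbacks are represented only by their names.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass(frozen=True)
class AlertSketch:
    """Hypothetical stand-in for DeadlineAlert: an interval plus a callback name."""

    interval: timedelta
    callback_name: str


def deadline_times(
    queued_at: datetime, alerts: list[AlertSketch]
) -> list[tuple[datetime, str]]:
    """Return (deadline_time, callback_name) pairs, earliest deadline first."""
    return sorted((queued_at + alert.interval, alert.callback_name) for alert in alerts)


# Assumed reference time for one dag run (the DAGRUN_QUEUED_AT value).
queued_at = datetime(2025, 6, 1, 12, 0, tzinfo=timezone.utc)
alerts = [
    AlertSketch(timedelta(hours=1), "warn_me_it_may_be_late"),
    AlertSketch(timedelta(hours=2), "definitely_late"),
    AlertSketch(timedelta(hours=3), "email_my_boss_that_it_wasnt_my_fault"),
]

for when, name in deadline_times(queued_at, alerts):
    print(when.isoformat(), name)
```

Each alert yields its own row, which is why the response field is a list rather than a single value.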
Ok and then on the DagResponse we will have the full DeadlineAlert with the reference, interval and callback?
How about making this a separate endpoint and calling it for the retrieved DagRun, simply passing the dag_run_id? This way, we don't grow DagRunResponse too much; it already carries a lot of information. We can add the deadline routes separately if that is okay from the UI perspective.
I think keeping some deadline information on DagRunResponse is valuable, because "show me recent dag runs with passed deadlines" would be a key feature.
From the API point of view, we can nest the deadline API inside the dagrun one, because a deadline can be considered a sub-resource of the dagrun: deadlines only exist for their corresponding dagrun, and only while the dagrun is in a specific state; otherwise they are removed. (But that's debatable: since all the API resources are flat at root level for now, we could do the same here.)
deadlines: list[DeadlineResponse] should be enough; the relationship should return an empty list when there are no related items.
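A rough sketch of that suggestion, using stdlib dataclasses as a stand-in for the real response models: the field is always a list and defaults to empty, so consumers never need a `None` check. The class names and the `deadline_time` field are assumptions for illustration, not the actual schema.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass
class DeadlineResponseSketch:
    """Hypothetical shape; the real DeadlineResponse fields may differ."""

    deadline_time: datetime


@dataclass
class DagRunResponseSketch:
    """Hypothetical, trimmed-down DagRunResponse with a non-optional list field."""

    dag_run_id: str
    # default_factory gives every instance its own empty list.
    deadlines: list[DeadlineResponseSketch] = field(default_factory=list)


def deadline_count(run: DagRunResponseSketch) -> int:
    """Works for every run without checking for None first."""
    return len(run.deadlines)


run_without_deadlines = DagRunResponseSketch(dag_run_id="manual__1")
run_with_deadline = DagRunResponseSketch(
    dag_run_id="manual__2",
    deadlines=[DeadlineResponseSketch(datetime(2025, 6, 1, 13, 0, tzinfo=timezone.utc))],
)

print(deadline_count(run_without_deadlines), deadline_count(run_with_deadline))
```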
I brought it up on the dev call last week and we're going to make (a variation on) the change you suggested. Here's what the lifespan of a Deadline will be, using Dagrun as an example:
- When a new Dagrun is created:
  - If the dag has a deadline, calculate the value and store it along with the dag_id, run_id, etc in the deadline table
- When a dagrun finishes:
  - If (and only if) the current time is before the calculated deadline, then remove the deadline from the deadline table
- The scheduler loop will query the deadline table to see if any deadlines have expired
  - If yes then the callback is sent to the Triggerer to process
  - once the callback is run, the Triggerer will move the failed deadline to a new table (working name is just missed_deadlines?)
So the deadlines table will have all upcoming and unprocessed deadlines, and the missed_deadlines table will have all expired deadlines which have been resolved. If the serialized DAG has a deadline and it is in neither table, then it will be assumed that it completed successfully.
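The lifecycle described above can be sketched with two in-memory dicts standing in for the two tables. This is only an illustration under stated assumptions: every function name is hypothetical, there is one deadline per run, and the scheduler/Triggerer hand-off is collapsed into a single function call.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical in-memory stand-ins for the two tables, keyed by (dag_id, run_id).
deadline_table: dict[tuple[str, str], datetime] = {}
missed_deadlines_table: dict[tuple[str, str], datetime] = {}


def on_dagrun_created(dag_id, run_id, queued_at, interval):
    """Store the calculated deadline if the dag defines one (interval is not None)."""
    if interval is not None:
        deadline_table[(dag_id, run_id)] = queued_at + interval


def on_dagrun_finished(dag_id, run_id, now):
    """Remove the deadline only if the run finished before it expired."""
    key = (dag_id, run_id)
    deadline = deadline_table.get(key)
    if deadline is not None and now < deadline:
        del deadline_table[key]


def scheduler_loop_tick(now, run_callback):
    """Run the callback for each expired deadline, then move it to the missed table."""
    expired = [key for key, deadline in deadline_table.items() if deadline <= now]
    for key in expired:
        run_callback(key)  # stands in for sending the callback to the Triggerer
        missed_deadlines_table[key] = deadline_table.pop(key)
    return expired


def deadline_status(dag_has_deadline, dag_id, run_id):
    """Derive a run's deadline state from which table, if any, holds its row."""
    key = (dag_id, run_id)
    if not dag_has_deadline:
        return "no deadline"
    if key in deadline_table:
        return "pending"
    if key in missed_deadlines_table:
        return "missed"
    return "met"  # in neither table: assumed to have completed in time


t0 = datetime(2025, 6, 1, 12, 0, tzinfo=timezone.utc)
on_dagrun_created("example_dag", "run_1", t0, timedelta(hours=1))
on_dagrun_finished("example_dag", "run_1", t0 + timedelta(minutes=30))
print(deadline_status(True, "example_dag", "run_1"))  # prints "met"
```

Note that "met" is inferred from absence, which is the property that makes a "runs with passed deadlines" query depend on the missed table rather than on the deadline table.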
I should also clarify that the code as currently written still handles a single deadline, and I need to change that before launch: it WILL accept a list, but the current code in the repo doesn't show that. Sorry if I misled or confused anyone on that point. It came up in a Meetup discussion after I did the initial work, but it's a great idea and I'm going to be adding it in.
@ferruzzi, if that's the case, should I keep this PR on hold until we add support for multiple deadlines?
pierrejeambrun
left a comment
Nice, looks good to me.
Just a few suggestions.
end_date: datetime | None
data_interval_start: datetime | None
data_interval_end: datetime | None
deadlines: list[DeadlineResponse] | None
Maybe I misunderstood something. As I understand it, there's no record in the db at all for deadlines unless the run is in progress?
I assume we can always improve later and add that if people request it. (soft delete etc...)
def convert_instrumented_list_to_dict(lst):
    result = [obj.__dict__ for obj in lst]
    for obj in result:
        obj.pop("_sa_instance_state", None)
    return result
Is that needed?
Yes. _sa_instance_state is an additional item in each dict, and the tests were failing because it caused a mismatch in the comparison.
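One thing worth noting about the helper in the diff: `obj.__dict__` is the object's live attribute dict, so popping a key from it also removes that attribute from the object itself. A possible non-mutating variant is sketched below. `rows_to_dicts` and `FakeRow` are hypothetical names, and `FakeRow` is a plain object standing in for an ORM row, not a real SQLAlchemy model.

```python
def rows_to_dicts(rows):
    """Copy each object's attributes into a new dict, skipping _sa_instance_state."""
    return [
        {key: value for key, value in vars(row).items() if key != "_sa_instance_state"}
        for row in rows
    ]


class FakeRow:
    """Plain object imitating an ORM row that carries a bookkeeping attribute."""

    def __init__(self, deadline_id):
        self.deadline_id = deadline_id
        self._sa_instance_state = object()


rows = [FakeRow(1), FakeRow(2)]
print(rows_to_dicts(rows))
```

Because the dicts are built fresh, the source objects keep all of their attributes after the conversion, which should be safer if the same objects are used again later in a test.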
bugraoz93
left a comment
Changes look good. Some tests still need updates to include the new field in dag_run.
pierrejeambrun
left a comment
FYI, #51698 was merged recently and will impact this PR.
@pierrejeambrun , what kind of impact? Did you mean merge conflicts or do I need to be updating something?
pierrejeambrun
left a comment
@pierrejeambrun , what kind of impact? Did you mean merge conflicts or do I need to be updating something?
Yes, merge conflicts and also some unused code. It basically introduces DeadlineAlertResponse, which is exactly your DeadlineResponse, so we need to keep only one.
I haven't been able to find time to finish this, so I'll let someone else pick it up and finish it off. At least the PR can act as a starting point for anyone who picks it up.
closes #50913