Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New 'clear_number' attribute for dag_run to track the number of times it has been cleared #34126

Merged
merged 5 commits into from
Sep 12, 2023

Conversation

sungwy
Copy link
Contributor

@sungwy sungwy commented Sep 5, 2023

related: #33677

Currently, Airflow supports clearing Dag runs in one of three ways:

REST API endpoint clear
UI button clearExistingTasks
Task Clear cli command

All three of these methods ultimately invoke dag.clear method and put the DagRun back in a DagRunState.Queued state. Unfortunately there is currently no way for the schedulers to know if the QUEUED DagRun is its original scheduled run, or has been cleared from an already finished state.

The inability for Airflow or for users to distinguish between the original scheduled DagRun, and a cleared DagRun leads to difficulty in inferring meaningful information from reliability metrics. Some example reliability metrics that can be used as meaningful indicators of the Airflow cluster health, when monitoring periodic / scheduled Dags include:

dagrun.schedule_delay
dagrun.first_task_scheduling_delay

Each of these metrics help engineers understand if the Airflow is scheduling DagRuns and tasks when they are meant to be scheduled. And if these metrics are published in a reliable way, we empower engineers to be able to set up alarms when these metrics spike. Unfortunately, these metrics spike in false-alarms when the DagRuns are cleared.

This PR proposes that we persist the information that a DagRun was cleared as a boolean attribute in the DagRun table, when the DagRun is cleared to a 'QUEUED' state from a finished state ('SUCCESS' or 'FAILED').

Additional positive impact of introducing this boolean attribute, is that this also exposes this information to users who may be using their own custom operators to generate reliability metrics of their DagRuns/tasks. (e.g. Deferrable Operators that measure SLAs as suggested in https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-57+Refactor+SLA+Feature and other discussions).


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@boring-cyborg boring-cyborg bot added area:Scheduler including HA (high availability) scheduler kind:documentation labels Sep 5, 2023
@uranusjr
Copy link
Member

uranusjr commented Sep 5, 2023

Unfortunately there is currently no way for the schedulers to know if the QUEUED DagRun is its original scheduled run, or has been cleared from an already finished state.

Instead of a separate flag, would it be viable to have a CLEARED state instead?

@sungwy
Copy link
Contributor Author

sungwy commented Sep 6, 2023

Unfortunately there is currently no way for the schedulers to know if the QUEUED DagRun is its original scheduled run, or has been cleared from an already finished state.

Instead of a separate flag, would it be viable to have a CLEARED state instead?

@uranusjr thank you for the review, and yes interesting thought... Given that a new CLEARED state will only be a transient state, I think it'll be harder for us to provide same amount of information at different points throughout the lifecycle of the DagRun. Assuming that the new state will go through the following lifecycle: CLEARED -> (QUEUED) -> RUNNING -> finished STATE, I believe that using a transient state only allows us to differentiate between the original scheduled run, and a cleared dag run until the DagRun is scheduled.

For example, with a new state, we will still be able to know that the DagRun was cleared when we call _update_state, but not when we publish the first_task_scheduling_delay metric within _emit_true_scheduling_delay_stats_for_finished_state call (since the DagRun will already be in a RUNNING state at this point).

Similarly, I think using a transient state instead of a more permanent marker will reduce the amount of information the users can build on when they try to infer if the DagRun had been cleared for their own purposes - for instance if there's an importance in differentiating between the original scheduled run, or a cleared run in their custom operator.

@potiuk
Copy link
Member

potiuk commented Sep 7, 2023

Similarly, I think using a transient state instead of a more permanent marker will reduce the amount of information the users can build on when they try to infer if the DagRun had been cleared for their own purposes - for instance if there's an importance in differentiating between the original scheduled run, or a cleared run in their custom operator.

How about "cleared_count" instead of bool? I think that would be quie a bit more informative

@sungwy
Copy link
Contributor Author

sungwy commented Sep 7, 2023

Similarly, I think using a transient state instead of a more permanent marker will reduce the amount of information the users can build on when they try to infer if the DagRun had been cleared for their own purposes - for instance if there's an importance in differentiating between the original scheduled run, or a cleared run in their custom operator.

How about "cleared_count" instead of bool? I think that would be quie a bit more informative

I agree! I will implement the change shortly 👍

@uranusjr
Copy link
Member

uranusjr commented Sep 7, 2023

We already have try_number on TaskInstance for a similar purpose (you can clear and rerun one single task), I wonder if there’s some unifying can be done here.

@sungwy
Copy link
Contributor Author

sungwy commented Sep 7, 2023

We already have try_number on TaskInstance for a similar purpose (you can clear and rerun one single task), I wonder if there’s some unifying can be done here.

Yeah, that was my initial thought, and have noted that in the original Google Doc draft of the SLA AIP-57. I don't feel strongly about the naming convention, and will more than happily take either one :)

@potiuk
Copy link
Member

potiuk commented Sep 7, 2023

clear_number sounds good to me too.

@sungwy sungwy changed the title New 'cleared' boolean attribute for dag_run New 'clear_number' attribute for dag_run to track how many times it has been cleared Sep 8, 2023
@sungwy sungwy changed the title New 'clear_number' attribute for dag_run to track how many times it has been cleared New 'clear_number' attribute for dag_run to track the number of times it has been cleared Sep 8, 2023
Copy link
Member

@potiuk potiuk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. @uranusjr ?

airflow/models/dagrun.py Outdated Show resolved Hide resolved
Comment on lines +43 to +48
sa.Column(
"clear_number",
sa.Integer,
default=0,
nullable=False,
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also wonder if we want to add a constraint on this to ensure the value is not negative. Or if not, maybe we should change the == 0 check to < 1 instead. A bit paranoid probably, but it’s mostly free safety so why not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not paranoid, these are great suggestions. I took all of your suggestions @uranusjr thank you for your input :)

@@ -210,6 +211,7 @@ def __init__(
dag_hash: str | None = None,
creating_job_id: int | None = None,
data_interval: tuple[datetime, datetime] | None = None,
clear_number: int = 0,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this argument used anywhere? I wonder if we should just have self.clear_number = 0 always and rely on in-place modification instead.

sungwy and others added 2 commits September 11, 2023 08:37
Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>
@uranusjr uranusjr merged commit eed2901 into apache:main Sep 12, 2023
41 of 42 checks passed
@ephraimbuddy ephraimbuddy added this to the Airflow 2.8.0 milestone Oct 3, 2023
@ephraimbuddy ephraimbuddy added the type:new-feature Changelog: New Features label Oct 3, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:Scheduler including HA (high availability) scheduler kind:documentation type:new-feature Changelog: New Features
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants