
Deactivating DAGs which have been removed from files #17121

Merged
merged 3 commits into from
Sep 18, 2021

Conversation

SamWheating
Contributor

@SamWheating SamWheating commented Jul 21, 2021

Closes: #11901
Closes: #17516

Ensures that the active DAGs in the DB are all actually present in their corresponding Python files, by reconciling the DB state with the contents of a given DAG file on every parse operation.

This should prevent a lot of issues encountered when writing multiple DAGs per file, renaming DAGs, or dynamically generating DAGs from a config file read at parse time.

This will cause a few other functional changes:

  1. DAGs will disappear from the UI if an ImportError is introduced in the underlying file.
  2. DAGs will no longer be re-marked as active if they are inactive / missing but the file is present.
  3. DAGs could be incorrectly marked inactive if there's an unexpected parsing error (I guess this is a trade-off between false positives and false negatives)

Validation:

I have validated these changes in a local breeze environment with the following DAG:

from airflow.models import DAG
from airflow import utils
from airflow.operators.python import PythonOperator

NUM_DAGS=1

def message():
    print('Hello, world.')

for i in range(NUM_DAGS):
    with DAG(f'dag-{i}', schedule_interval=None, start_date=utils.dates.days_ago(1)) as dag:
        task = PythonOperator(
            task_id='task',
            python_callable=message
        )
        globals()[f"dag_{i}"] = dag

By changing the value of NUM_DAGS I can quickly change the number of DAG objects present in this file.

Before this change, decreasing the value of NUM_DAGS would leave a bunch of stale DAGs in the UI. These could be triggered but would then fail as the executor was not able to load the specified task from the file.

After implementing this change, stale DAGs disappear from the UI shortly after decreasing the value of NUM_DAGS.

Questions:

  1. Is there a good reason for Airflow to mark inactive DAGs as active if the file still exists? I looked through the original PR which introduced this but couldn't find an explanation.

  2. How significant is the performance hit incurred by updating the DAG table on every parse operation?

  3. Will this change introduce any other functional changes that I haven't mentioned above?

Follow up:

If this PR is merged, we should also add some documentation which discusses dynamically generated DAGs, and explains how to generate multiple DAGs in a loop as shown in the example above.

@boring-cyborg boring-cyborg bot added the area:Scheduler including HA (high availability) scheduler label Jul 21, 2021
@SamWheating SamWheating force-pushed the sw-remove-stale-dags branch 2 times, most recently from 269698d to 0d8a0fd Compare July 26, 2021 16:35
@kaxil
Member

kaxil commented Jul 30, 2021

Can you rebase on latest main please

@kaxil kaxil added this to the Airflow 2.1.3 milestone Jul 30, 2021
@kaxil kaxil modified the milestones: Airflow 2.1.3, Airflow 2.2 Aug 9, 2021
@kaxil
Member

kaxil commented Aug 9, 2021

@SamWheating -- I just returned from holiday, can you resolve the conflicts, please

@kaxil
Member

kaxil commented Aug 20, 2021

One of the tests is failing, can you take a look please: https://github.com/apache/airflow/pull/17121/checks?check_run_id=3284172968#step:6:10746

@SamWheating
Contributor Author

One of the tests is failing, can you take a look please: #17121 (checks)

I rebased my branch and have been unable to recreate this failure locally - If this failure persists in CI after a rebase then I can investigate further 👀

@kaxil
Member

kaxil commented Aug 21, 2021

Tests are still failing

@kaxil
Member

kaxil commented Aug 24, 2021

@SamWheating Can you take a look at failing tests please

@SamWheating
Contributor Author

I'm away on vacation for the next week - I can get these tests fixed afterwards.

Is there a deadline for the next release?

@uranusjr
Member

No hard deadline yet but features probably need to finish in ~2 weeks or so to be included.

@@ -404,7 +404,7 @@ def test_process_file_should_failure_callback(self):

 @conf_vars({("core", "dagbag_import_error_tracebacks"): "False"})
 def test_add_unparseable_file_before_sched_start_creates_import_error(self, tmpdir):
-    unparseable_filename = tmpdir / TEMP_DAG_FILENAME
+    unparseable_filename = os.path.join(tmpdir, TEMP_DAG_FILENAME)
Contributor Author

@SamWheating SamWheating Aug 30, 2021

These changes are required because the previous tmpdir / "child_path" syntax would return a PathLib object rather than a string, and the _process_file function expects a string.

Providing the wrong type was causing some of the tests to fail due to SQLAlchemy errors.

os.path.join will always return a string and thus fixes this issue.
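To illustrate the distinction (a minimal sketch using pathlib as a stand-in for pytest's tmpdir object, which has a slightly different type):

```python
import os
from pathlib import Path

# pathlib.Path used here as a stand-in for pytest's tmpdir object;
# the "/" operator returns a path object rather than a plain string.
tmpdir = Path("/tmp/test-dags")

slash_result = tmpdir / "temp_dag.py"              # a Path object, not a str
join_result = os.path.join(tmpdir, "temp_dag.py")  # os.path.join always returns a str

print(type(slash_result).__name__)
print(type(join_result).__name__)
```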

Member


tmpdir is not a pathlib.Path but a py._path.LocalPath, a path object implementation from the py package (which backs much of pytest) that pre-dates the stdlib pathlib.

But your fix is correct: py._path.LocalPath implements the fspath protocol introduced in Python 3.6 (PEP 519), so it can be consumed by os.path.join and coerced into a str.
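A minimal sketch of that protocol (LocalPathLike is a hypothetical stand-in, not the real py._path.LocalPath):

```python
import os

class LocalPathLike:
    """Hypothetical stand-in for py._path.LocalPath: any object that
    implements __fspath__ (PEP 519) can be passed to os.path.join,
    which coerces it into a plain str."""

    def __init__(self, path):
        self._path = path

    def __fspath__(self):
        return self._path

joined = os.path.join(LocalPathLike("/tmp/dags"), "temp_dag.py")
print(type(joined).__name__)
```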

@uranusjr
Member

uranusjr commented Aug 30, 2021

This seems to break test_home_importerrors_filtered_singledag_user, I’m guessing because that DAG is now disabled and thus not triggering the ImportError? You’ll need to investigate this and maybe fix the test (to use another DAG or something).

@SamWheating
Contributor Author

SamWheating commented Aug 30, 2021

Yeah exactly. This might require some more changes; here's a shortened version of the relevant code from the rendering of the home page:

        dags_query = session.query(DagModel).filter(~DagModel.is_subdag, DagModel.is_active)  # deactivated DAGs filtered out here
        import_errors = session.query(errors.ImportError).order_by(errors.ImportError.id).all()
        all_dags = dags_query
        current_dags = all_dags

        dags = (
            current_dags.order_by(DagModel.dag_id)
            .options(joinedload(DagModel.tags))
            .offset(start)
            .limit(dags_per_page)
            .all()
        )

        if import_errors:
            dag_filenames = {dag.fileloc for dag in dags}
            all_dags_readable = (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG) in user_permissions

            for import_error in import_errors:
                if all_dags_readable or import_error.filename in dag_filenames:  # dag_filenames only includes active DAGs
                    flash(
                        "Broken DAG: [{ie.filename}] {ie.stacktrace}".format(ie=import_error),
                        "dag_import_error",
                    )

source

So if a user does not have access to read all DAGs, they won't see ImportErrors for any DAGs, since a DAG which begins throwing ImportErrors will be marked inactive 🤔 This definitely isn't the desired behaviour.

I guess the fix here would be to populate dag_filenames from a separate query which includes deactivated DAGs? Let me know if you have any suggestions.
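A rough sketch of that idea, with plain sqlite3 standing in for the SQLAlchemy models (table and column names here are simplified stand-ins for Airflow's DagModel, not the real schema):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY, fileloc TEXT, is_active INTEGER)")
conn.executemany(
    "INSERT INTO dag VALUES (?, ?, ?)",
    [("dag_a", "/dags/a.py", 1), ("dag_b", "/dags/b.py", 0)],  # dag_b has been deactivated
)

# Current behaviour: dag_filenames is built from active DAGs only,
# so an ImportError in /dags/b.py would be hidden from restricted users.
active_filenames = {row[0] for row in conn.execute("SELECT fileloc FROM dag WHERE is_active = 1")}

# Proposed fix: a separate query that also includes deactivated DAGs.
all_filenames = {row[0] for row in conn.execute("SELECT fileloc FROM dag")}

print(sorted(active_filenames))
print(sorted(all_filenames))
```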

@SamWheating
Contributor Author

SamWheating commented Aug 31, 2021

Fixing the remaining broken test case will have to wait until #17924 is merged.

@SamWheating
Contributor Author

I don't think that the remaining failure is related to my changes 🤔

Comment on lines +658 to +670
if deactivated:
    self.log.info("Deactivated %i DAGs which are no longer present in %s", deactivated, file_path)
Member

I wonder if it’d be worthwhile to log the DAG IDs here. (Is it easy though? I know in SQL you can do UPDATE ... RETURNING but I don’t know what dbs can handle that nor how it’s done in SQLAlchemy.)

Contributor Author

That's a great idea - it looks like it's supported:
https://docs.sqlalchemy.org/en/14/core/dml.html#sqlalchemy.sql.expression.Update.returning

But I'm not sure if this also means that all dialects are supported.


From https://docs.sqlalchemy.org/en/14/glossary.html#term-RETURNING:

The backends that currently support RETURNING or a similar construct are PostgreSQL, SQL Server, Oracle, and Firebird.
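To see the portability limit in practice, here's an illustrative sketch in plain SQL via sqlite3 (not Airflow code; SQLite itself only gained RETURNING in 3.35, which is exactly the concern). The fallback branch shows the select-then-update alternative:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY, is_active INTEGER)")
conn.executemany("INSERT INTO dag VALUES (?, ?)", [("dag_a", 1), ("dag_b", 1)])

if sqlite3.sqlite_version_info >= (3, 35, 0):
    # UPDATE ... RETURNING yields the affected IDs in a single statement
    deactivated = [row[0] for row in conn.execute(
        "UPDATE dag SET is_active = 0 WHERE dag_id = ? RETURNING dag_id", ("dag_b",)
    )]
else:
    # Fallback for backends without RETURNING: select first, then update
    # (two statements, so not atomic)
    deactivated = [row[0] for row in conn.execute(
        "SELECT dag_id FROM dag WHERE dag_id = ? AND is_active = 1", ("dag_b",)
    )]
    conn.execute("UPDATE dag SET is_active = 0 WHERE dag_id = ?", ("dag_b",))

print("Deactivated:", deactivated)
```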

Contributor Author

@SamWheating SamWheating Sep 2, 2021

Ah, thanks for confirming. If it's not supported in SQLite and MySQL then I don't think we can rely on this feature.

We could split this into a separated query + update like:

        deactivated = session.query(DagModel).filter(
            DagModel.fileloc == file_path, DagModel.is_active, ~DagModel.dag_id.in_(dagbag.dag_ids)
        )
        deactivated_dags = [dag.dag_id for dag in deactivated]  # fetch the IDs before updating
        if deactivated_dags:
            deactivated.update({DagModel.is_active: False}, synchronize_session="fetch")
            self.log.info("Deactivated missing DAGs: %s", ", ".join(deactivated_dags))

But I think that will introduce potential race conditions and consistency issues, so I'd rather keep things as-is.

Member

Or we can only show the IDs on Postgres and MSSQL. Or do that some other time when someone complains 😛

Contributor Author

We already do things like that :)

Neat! Thanks for the examples - I can refactor this PR tomorrow to use different queries and logging depending on the dialect

Contributor Author

@SamWheating SamWheating Sep 10, 2021

I am touching the same code to do something similar (though not the same) at #18120

Interesting, I'll follow along with the conversation there as I think that there's some functional overlap with this PR, so a lot of the discussion is probably relevant here also.

There's also another recently opened PR which takes a different approach to the same problem as this PR (#18087). We should definitely compare the different approaches and agree upon the best way to fix this issue.

Member

This one comment is still pending; otherwise this looks good and ready to be merged.

Contributor Author

Ah thanks for the reminder - had a busy week. I'll try to get this changed and update the tests ASAP 😄

Contributor Author

So I just went to make the requested changes, and it doesn't look like update().returning() is supported in the SQLAlchemy query API, only in direct table operations.

https://docs.sqlalchemy.org/en/14/core/dml.html#sqlalchemy.sql.expression.Update.returning

I'm not an expert in SQLAlchemy, but it looks like we use the ORM-friendly session.query().update() methods everywhere, in which case we can't use .returning().

Does my understanding sound correct? If so then we'll have to just mark this one as resolved.

@github-actions

github-actions bot commented Sep 2, 2021

The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.

@github-actions github-actions bot added the full tests needed We need to run full set of tests for this PR to merge label Sep 2, 2021
@SamWheating SamWheating changed the title [WIP] Deactivating DAGs which have been removed from files Deactivating DAGs which have been removed from files Sep 8, 2021
Member

@kaxil kaxil left a comment

There is just one unresolved comment - gentle ping @SamWheating - https://github.com/apache/airflow/pull/17121/files#r700963529

Happy to approve and merge once that is addressed

@kaxil
Member

kaxil commented Sep 18, 2021

Kubernetes test failures are unrelated
