Deactivating DAGs which have been removed from files (#17121)

Closes: #11901
Closes: #17516

This change ensures that the active DAGs in the DB are all actually present in their corresponding Python files, by reconciling the DB state with the contents of a given DAG file on every parse operation.

This _should_ prevent many of the issues encountered when defining multiple DAGs per file, renaming DAGs, or dynamically generating DAGs from a config file read at parse time.
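
At its core, the change adds a single reconciliation query to the file processor. A simplified sketch of the idea, assuming a SQLAlchemy `session` and the set of DAG IDs produced by the latest parse (the real implementation is `_deactivate_missing_dags` in the `processor.py` diff below; the standalone function here is just for illustration):

```python
# Sketch of the reconciliation step, mirroring _deactivate_missing_dags below.
from airflow.models import DagModel

def deactivate_missing_dags(session, parsed_dag_ids, file_path):
    # Any active DAG row pointing at this file whose dag_id was not
    # produced by the latest parse gets flipped to inactive in one UPDATE.
    return (
        session.query(DagModel)
        .filter(
            DagModel.fileloc == file_path,
            DagModel.is_active,
            ~DagModel.dag_id.in_(parsed_dag_ids),
        )
        .update({DagModel.is_active: False}, synchronize_session="fetch")
    )
```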

This will cause a few other functional changes:

1) DAGs will disappear from the UI if an ImportError is introduced in the underlying file.
2) DAGs will no longer be re-marked as active if they are inactive / missing but the file is present.
3) DAGs could be incorrectly marked inactive if there's an unexpected parsing error (a tradeoff between false positives and false negatives)

#### Validation:

I have validated these changes in a local breeze environment with the following DAG:

```python
from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

NUM_DAGS = 1


def message():
    print('Hello, world.')


# Generate NUM_DAGS identical DAGs in a loop, registering each one in
# globals() so that Airflow's DagBag discovers it when parsing this file.
for i in range(NUM_DAGS):
    with DAG(f'dag-{i}', schedule_interval=None, start_date=days_ago(1)) as dag:
        task = PythonOperator(
            task_id='task',
            python_callable=message,
        )
        globals()[f"dag_{i}"] = dag
```

By changing the value of `NUM_DAGS` I can quickly change the number of DAG objects present in this file. 

Before this change, decreasing the value of `NUM_DAGS` would leave stale DAGs behind in the UI. These could still be triggered, but the runs would then fail because the executor was unable to load the specified task from the file.

After implementing this change, stale DAGs disappear from the UI shortly after decreasing the value of `NUM_DAGS`.
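
A quick way to confirm the flags directly is to query the DAG table in the metadata DB. A sketch, assuming a shell inside the breeze environment and a test DAG file named `my_dynamic_dags.py` (both names are illustrative):

```python
# Sketch: print the is_active flag for every DAG registered from one file.
from airflow.models import DagModel
from airflow.utils.session import create_session

with create_session() as session:
    for dag in session.query(DagModel).filter(DagModel.fileloc.like('%my_dynamic_dags.py')):
        print(dag.dag_id, dag.is_active)
```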

#### Questions:

1. Is there a good reason for Airflow to mark inactive DAGs as active if the file still exists? I looked through the [original PR which introduced this](https://github.com/apache/airflow/pull/5743/files) but couldn't find an explanation.

2. How significant is the performance hit incurred by updating the DAG table on every parse operation? (A rough measurement sketch follows this list.)
 
3. Will this change introduce any other functional changes that I haven't mentioned above?
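
For question 2, a rough way to gauge the overhead is to time the new UPDATE in isolation against a local metadata DB. This is only an illustrative sketch (the file path and dag_id list are made up, and it ignores the surrounding parse work):

```python
# Sketch: time the deactivation UPDATE that process_file now issues per parse.
import timeit

from airflow.models import DagModel
from airflow.utils.session import create_session

FILE_PATH = '/files/dags/my_dynamic_dags.py'  # illustrative path

def run_once():
    with create_session() as session:
        (
            session.query(DagModel)
            .filter(
                DagModel.fileloc == FILE_PATH,
                DagModel.is_active,
                ~DagModel.dag_id.in_(['dag-0']),
            )
            .update({DagModel.is_active: False}, synchronize_session="fetch")
        )
        session.rollback()  # measure the query without persisting changes

print(timeit.timeit(run_once, number=100) / 100, "seconds per reconciliation")
```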

#### Follow up:

If this PR is merged, we should also add some documentation which discusses dynamically generated DAGs, and explains how to generate multiple DAGs in a loop as shown in the example above.
SamWheating authored Sep 18, 2021
1 parent dc94ee2 commit e81f14b
Showing 4 changed files with 57 additions and 25 deletions.
14 changes: 13 additions & 1 deletion airflow/dag_processing/processor.py
@@ -604,7 +604,8 @@ def process_file(
         2. Execute any Callbacks if passed to this method.
         3. Serialize the DAGs and save it to DB (or update existing record in the DB).
         4. Pickle the DAG and save it to the DB (if necessary).
-        5. Record any errors importing the file into ORM
+        5. Mark any DAGs which are no longer present as inactive
+        6. Record any errors importing the file into ORM

         :param file_path: the path to the Python file that should be executed
         :type file_path: str
@@ -627,6 +628,8 @@ def process_file(
             Stats.incr('dag_file_refresh_error', 1, 1)
             return 0, 0

+        self._deactivate_missing_dags(session, dagbag, file_path)
+
         if len(dagbag.dags) > 0:
             self.log.info("DAG(s) %s retrieved from %s", dagbag.dags.keys(), file_path)
         else:
@@ -656,3 +659,12 @@ def process_file(
             self.log.exception("Error logging import errors!")

         return len(dagbag.dags), len(dagbag.import_errors)
+
+    def _deactivate_missing_dags(self, session: Session, dagbag: DagBag, file_path: str) -> None:
+        deactivated = (
+            session.query(DagModel)
+            .filter(DagModel.fileloc == file_path, DagModel.is_active, ~DagModel.dag_id.in_(dagbag.dag_ids))
+            .update({DagModel.is_active: False}, synchronize_session="fetch")
+        )
+        if deactivated:
+            self.log.info("Deactivated %i DAGs which are no longer present in %s", deactivated, file_path)
6 changes: 0 additions & 6 deletions airflow/models/dag.py
@@ -2809,7 +2809,6 @@ def set_is_paused(self, is_paused: bool, including_subdags: bool = True, session
     def deactivate_deleted_dags(cls, alive_dag_filelocs: List[str], session=None):
         """
         Set ``is_active=False`` on the DAGs for which the DAG files have been removed.
-        Additionally change ``is_active=False`` to ``True`` if the DAG file exists.

         :param alive_dag_filelocs: file paths of alive DAGs
         :param session: ORM Session
@@ -2821,11 +2820,6 @@ def deactivate_deleted_dags(cls, alive_dag_filelocs: List[str], session=None):
             if dag_model.fileloc is not None:
                 if correct_maybe_zipped(dag_model.fileloc) not in alive_dag_filelocs:
                     dag_model.is_active = False
-                else:
-                    # If is_active is set as False and the DAG File still exists
-                    # Change is_active=True
-                    if not dag_model.is_active:
-                        dag_model.is_active = True
             else:
                 continue

57 changes: 41 additions & 16 deletions tests/dag_processing/test_processor.py
@@ -28,7 +28,7 @@
 from airflow import settings
 from airflow.configuration import conf
 from airflow.dag_processing.processor import DagFileProcessor
-from airflow.models import DagBag, SlaMiss, TaskInstance, errors
+from airflow.models import DagBag, DagModel, SlaMiss, TaskInstance, errors
 from airflow.models.taskinstance import SimpleTaskInstance
 from airflow.operators.dummy import DummyOperator
 from airflow.utils import timezone
@@ -411,7 +411,7 @@ def test_process_file_should_failure_callback(self, monkeypatch, tmp_path):

     @conf_vars({("core", "dagbag_import_error_tracebacks"): "False"})
     def test_add_unparseable_file_before_sched_start_creates_import_error(self, tmpdir):
-        unparseable_filename = tmpdir / TEMP_DAG_FILENAME
+        unparseable_filename = os.path.join(tmpdir, TEMP_DAG_FILENAME)
         with open(unparseable_filename, 'w') as unparseable_file:
             unparseable_file.writelines(UNPARSEABLE_DAG_FILE_CONTENTS)

@@ -427,8 +427,8 @@ def test_add_unparseable_file_before_sched_start_creates_import_error(self, tmpdir):

     @conf_vars({("core", "dagbag_import_error_tracebacks"): "False"})
     def test_add_unparseable_zip_file_creates_import_error(self, tmpdir):
-        zip_filename = tmpdir / "test_zip.zip"
-        invalid_dag_filename = zip_filename / TEMP_DAG_FILENAME
+        zip_filename = os.path.join(tmpdir, "test_zip.zip")
+        invalid_dag_filename = os.path.join(zip_filename, TEMP_DAG_FILENAME)
         with ZipFile(zip_filename, "w") as zip_file:
             zip_file.writestr(TEMP_DAG_FILENAME, UNPARSEABLE_DAG_FILE_CONTENTS)

@@ -443,21 +443,21 @@ def test_add_unparseable_zip_file_creates_import_error(self, tmpdir):
         session.rollback()

     def test_no_import_errors_with_parseable_dag(self, tmpdir):
-        parseable_filename = tmpdir / TEMP_DAG_FILENAME
+        parseable_filename = os.path.join(tmpdir, TEMP_DAG_FILENAME)

         with open(parseable_filename, 'w') as parseable_file:
             parseable_file.writelines(PARSEABLE_DAG_FILE_CONTENTS)

         with create_session() as session:
-            self._process_file(parseable_file, session)
+            self._process_file(parseable_filename, session)
             import_errors = session.query(errors.ImportError).all()

             assert len(import_errors) == 0

             session.rollback()

     def test_no_import_errors_with_parseable_dag_in_zip(self, tmpdir):
-        zip_filename = tmpdir / "test_zip.zip"
+        zip_filename = os.path.join(tmpdir, "test_zip.zip")
         with ZipFile(zip_filename, "w") as zip_file:
             zip_file.writestr(TEMP_DAG_FILENAME, PARSEABLE_DAG_FILE_CONTENTS)

@@ -471,7 +471,7 @@ def test_no_import_errors_with_parseable_dag_in_zip(self, tmpdir):

     @conf_vars({("core", "dagbag_import_error_tracebacks"): "False"})
     def test_new_import_error_replaces_old(self, tmpdir):
-        unparseable_filename = tmpdir / TEMP_DAG_FILENAME
+        unparseable_filename = os.path.join(tmpdir, TEMP_DAG_FILENAME)

         # Generate original import error
         with open(unparseable_filename, 'w') as unparseable_file:
@@ -496,7 +496,7 @@ def test_new_import_error_replaces_old(self, tmpdir):
         session.rollback()

     def test_remove_error_clears_import_error(self, tmpdir):
-        filename_to_parse = tmpdir / TEMP_DAG_FILENAME
+        filename_to_parse = os.path.join(tmpdir, TEMP_DAG_FILENAME)

         # Generate original import error
         with open(filename_to_parse, 'w') as file_to_parse:
@@ -519,7 +519,7 @@ def test_remove_error_clears_import_error_zip(self, tmpdir):
         session = settings.Session()

         # Generate original import error
-        zip_filename = tmpdir / "test_zip.zip"
+        zip_filename = os.path.join(tmpdir, "test_zip.zip")
         with ZipFile(zip_filename, "w") as zip_file:
             zip_file.writestr(TEMP_DAG_FILENAME, UNPARSEABLE_DAG_FILE_CONTENTS)
         self._process_file(zip_filename, session)
@@ -538,7 +538,7 @@ def test_remove_error_clears_import_error_zip(self, tmpdir):
         session.rollback()

     def test_import_error_tracebacks(self, tmpdir):
-        unparseable_filename = tmpdir / TEMP_DAG_FILENAME
+        unparseable_filename = os.path.join(tmpdir, TEMP_DAG_FILENAME)
         with open(unparseable_filename, "w") as unparseable_file:
             unparseable_file.writelines(INVALID_DAG_WITH_DEPTH_FILE_CONTENTS)

@@ -564,7 +564,7 @@ def test_import_error_tracebacks(self, tmpdir):

     @conf_vars({("core", "dagbag_import_error_traceback_depth"): "1"})
     def test_import_error_traceback_depth(self, tmpdir):
-        unparseable_filename = tmpdir / TEMP_DAG_FILENAME
+        unparseable_filename = os.path.join(tmpdir, TEMP_DAG_FILENAME)
         with open(unparseable_filename, "w") as unparseable_file:
             unparseable_file.writelines(INVALID_DAG_WITH_DEPTH_FILE_CONTENTS)

@@ -586,8 +586,8 @@ def test_import_error_traceback_depth(self, tmpdir):
         session.rollback()

     def test_import_error_tracebacks_zip(self, tmpdir):
-        invalid_zip_filename = tmpdir / "test_zip_invalid.zip"
-        invalid_dag_filename = invalid_zip_filename / TEMP_DAG_FILENAME
+        invalid_zip_filename = os.path.join(tmpdir, "test_zip_invalid.zip")
+        invalid_dag_filename = os.path.join(invalid_zip_filename, TEMP_DAG_FILENAME)
         with ZipFile(invalid_zip_filename, "w") as invalid_zip_file:
             invalid_zip_file.writestr(TEMP_DAG_FILENAME, INVALID_DAG_WITH_DEPTH_FILE_CONTENTS)

@@ -613,8 +613,8 @@ def test_import_error_tracebacks_zip(self, tmpdir):

     @conf_vars({("core", "dagbag_import_error_traceback_depth"): "1"})
     def test_import_error_tracebacks_zip_depth(self, tmpdir):
-        invalid_zip_filename = tmpdir / "test_zip_invalid.zip"
-        invalid_dag_filename = invalid_zip_filename / TEMP_DAG_FILENAME
+        invalid_zip_filename = os.path.join(tmpdir, "test_zip_invalid.zip")
+        invalid_dag_filename = os.path.join(invalid_zip_filename, TEMP_DAG_FILENAME)
         with ZipFile(invalid_zip_filename, "w") as invalid_zip_file:
             invalid_zip_file.writestr(TEMP_DAG_FILENAME, INVALID_DAG_WITH_DEPTH_FILE_CONTENTS)

@@ -633,3 +633,28 @@ def test_import_error_tracebacks_zip_depth(self, tmpdir):
         )
         assert import_error.stacktrace == expected_stacktrace.format(invalid_dag_filename)
         session.rollback()
+
+    def test_process_file_should_deactivate_missing_dags(self):
+
+        dag_file = os.path.join(
+            os.path.dirname(os.path.realpath(__file__)), '../dags/test_only_dummy_tasks.py'
+        )
+
+        # write a DAG into the DB which is not present in its specified file
+        with create_session() as session:
+            orm_dag = DagModel(dag_id='missing_dag', is_active=True, fileloc=dag_file)
+            session.merge(orm_dag)
+            session.commit()
+
+        session = settings.Session()
+
+        dags = session.query(DagModel).all()
+        assert [dag.dag_id for dag in dags if dag.is_active] == ['missing_dag']
+
+        # re-parse the file and see that the DAG is no longer there
+        dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
+        dag_file_processor.process_file(dag_file, [])
+
+        dags = session.query(DagModel).all()
+        assert [dag.dag_id for dag in dags if dag.is_active] == ['test_only_dummy_tasks']
+        assert [dag.dag_id for dag in dags if not dag.is_active] == ['missing_dag']
5 changes: 3 additions & 2 deletions tests/www/views/test_views_home.py
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.

+import os
 from unittest import mock

 import flask
@@ -128,7 +129,7 @@ def working_dags(tmpdir):

     with create_session() as session:
         for dag_id in TEST_FILTER_DAG_IDS:
-            filename = tmpdir / f"{dag_id}.py"
+            filename = os.path.join(tmpdir, f"{dag_id}.py")
             with open(filename, "w") as f:
                 f.writelines(dag_contents_template.format(dag_id))
             _process_file(filename, session)
@@ -138,7 +139,7 @@ def broken_dags(tmpdir, working_dags):
 def broken_dags(tmpdir, working_dags):
     with create_session() as session:
         for dag_id in TEST_FILTER_DAG_IDS:
-            filename = tmpdir / f"{dag_id}.py"
+            filename = os.path.join(tmpdir, f"{dag_id}.py")
             with open(filename, "w") as f:
                 f.writelines('airflow DAG')
             _process_file(filename, session)
