Skip to content

Commit

Permalink
Fix: DAGs are not marked as stale if the dags folder change (#41433) (#…
Browse files Browse the repository at this point in the history
…41829)

* Fix: DAGs are not marked as stale if the AIRFLOW__CORE__DAGS_FOLDER changes

* Update airflow/dag_processing/manager.py

* Add testcase

* Add code comment

* Update code comment

* Update the logic for checking the current dag_directory

* Update testcases

* Remove unwanted code

* Uncomment code

* Add processor_subdir when creating processor_subdir

* Fix test_retry_still_in_executor test

* Remove config from test

* Update airflow/dag_processing/manager.py

Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>

* Update if condition for readability

---------

Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>
(cherry picked from commit 9f30a41)
  • Loading branch information
utkarsharma2 authored Aug 28, 2024
1 parent 14cd75a commit 996af78
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 6 deletions.
10 changes: 8 additions & 2 deletions airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -526,14 +526,20 @@ def deactivate_stale_dags(
dags_parsed = session.execute(query)

for dag in dags_parsed:
# When the DAG processor runs as part of the scheduler, and the user changes the DAGs folder,
# DAGs from the previous DAGs folder will be marked as stale. Note that this change has no impact
# on standalone DAG processors.
dag_not_in_current_dag_folder = os.path.commonpath([dag.fileloc, dag_directory]) != dag_directory
# The largest valid difference between a DagFileStat's last_finished_time and a DAG's
# last_parsed_time is the processor_timeout. Longer than that indicates that the DAG is
# no longer present in the file. We have a stale_dag_threshold configured to prevent a
# significant delay in deactivation of stale dags when a large timeout is configured
if (
dag_removed_from_dag_folder_or_file = (
dag.fileloc in last_parsed
and (dag.last_parsed_time + timedelta(seconds=stale_dag_threshold)) < last_parsed[dag.fileloc]
):
)

if dag_not_in_current_dag_folder or dag_removed_from_dag_folder_or_file:
cls.logger().info("DAG %s is missing and will be deactivated.", dag.dag_id)
to_deactivate.add(dag.dag_id)

Expand Down
71 changes: 67 additions & 4 deletions tests/dag_processing/test_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import random
import socket
import sys
import tempfile
import textwrap
import threading
import time
Expand Down Expand Up @@ -638,7 +639,7 @@ def test_scan_stale_dags(self):
manager = DagProcessorJobRunner(
job=Job(),
processor=DagFileProcessorManager(
dag_directory="directory",
dag_directory=str(TEST_DAG_FOLDER),
max_runs=1,
processor_timeout=timedelta(minutes=10),
signal_conn=MagicMock(),
Expand Down Expand Up @@ -712,11 +713,11 @@ def test_scan_stale_dags_standalone_mode(self):
"""
Ensure only dags from current dag_directory are updated
"""
dag_directory = "directory"
dag_directory = str(TEST_DAG_FOLDER)
manager = DagProcessorJobRunner(
job=Job(),
processor=DagFileProcessorManager(
dag_directory=dag_directory,
dag_directory=TEST_DAG_FOLDER,
max_runs=1,
processor_timeout=timedelta(minutes=10),
signal_conn=MagicMock(),
Expand All @@ -740,7 +741,7 @@ def test_scan_stale_dags_standalone_mode(self):
# Add stale DAG to the DB
other_dag = other_dagbag.get_dag("test_start_date_scheduling")
other_dag.last_parsed_time = timezone.utcnow()
other_dag.sync_to_db(processor_subdir="other")
other_dag.sync_to_db(processor_subdir="/other")

# Add DAG to the file_parsing_stats
stat = DagFileStat(
Expand All @@ -762,6 +763,68 @@ def test_scan_stale_dags_standalone_mode(self):
active_dag_count = session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
assert active_dag_count == 1

def test_scan_stale_dags_when_dag_folder_change(self):
"""
Ensure dags from old dag_folder is marked as stale when dag processor
is running as part of scheduler.
"""

def get_dag_string(filename) -> str:
return open(TEST_DAG_FOLDER / filename).read()

with tempfile.TemporaryDirectory() as tmpdir:
old_dag_home = tempfile.mkdtemp(dir=tmpdir)
old_dag_file = tempfile.NamedTemporaryFile(dir=old_dag_home, suffix=".py")
old_dag_file.write(get_dag_string("test_example_bash_operator.py").encode())
old_dag_file.flush()
new_dag_home = tempfile.mkdtemp(dir=tmpdir)
new_dag_file = tempfile.NamedTemporaryFile(dir=new_dag_home, suffix=".py")
new_dag_file.write(get_dag_string("test_scheduler_dags.py").encode())
new_dag_file.flush()

manager = DagProcessorJobRunner(
job=Job(),
processor=DagFileProcessorManager(
dag_directory=new_dag_home,
max_runs=1,
processor_timeout=timedelta(minutes=10),
signal_conn=MagicMock(),
dag_ids=[],
pickle_dags=False,
async_mode=True,
),
)

dagbag = DagBag(old_dag_file.name, read_dags_from_db=False)
other_dagbag = DagBag(new_dag_file.name, read_dags_from_db=False)

with create_session() as session:
# Add DAG from old dah home to the DB
dag = dagbag.get_dag("test_example_bash_operator")
dag.fileloc = old_dag_file.name
dag.last_parsed_time = timezone.utcnow()
dag.sync_to_db(processor_subdir=old_dag_home)

# Add DAG from new DAG home to the DB
other_dag = other_dagbag.get_dag("test_start_date_scheduling")
other_dag.fileloc = new_dag_file.name
other_dag.last_parsed_time = timezone.utcnow()
other_dag.sync_to_db(processor_subdir=new_dag_home)

manager.processor._file_paths = [new_dag_file]

active_dag_count = (
session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
)
assert active_dag_count == 2

manager.processor._scan_stale_dags()

active_dag_count = (
session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
)
assert active_dag_count == 1

@mock.patch(
"airflow.dag_processing.processor.DagFileProcessorProcess.waitable_handle", new_callable=PropertyMock
)
Expand Down
1 change: 1 addition & 0 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3568,6 +3568,7 @@ def test_retry_still_in_executor(self, dag_maker):
dag_id="test_retry_still_in_executor",
schedule="@once",
session=session,
fileloc=os.devnull + "/test_retry_still_in_executor.py",
):
dag_task1 = BashOperator(
task_id="test_retry_handling_op",
Expand Down

0 comments on commit 996af78

Please sign in to comment.