From 326c7f97735367b9083302cc89f66233553e1711 Mon Sep 17 00:00:00 2001
From: Kaxil Naik
Date: Mon, 15 Sep 2025 19:48:39 +0100
Subject: [PATCH] Fix DAG disappearing after callback execution in stale detection

When `min_file_process_interval` exceeds `stale_dag_threshold`, DAGs
would incorrectly disappear after callback processing. This occurred
because callback-only processing updated file processing timestamps but
not DAG parsing timestamps, causing stale DAG detection to trigger false
positives.

The fix prevents callback-only processing from updating
`last_finish_time`, ensuring DAGs remain active when only callbacks are
executed.

fixes #55315
---
 .../src/airflow/dag_processing/manager.py   | 33 +++++++++--
 .../src/airflow/dag_processing/processor.py |  2 +
 .../unit/dag_processing/test_processor.py   | 58 +++++++++++++++++++
 3 files changed, 87 insertions(+), 6 deletions(-)

diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py
index 725e354610efd..73f74c415abc1 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -817,6 +817,12 @@ def _collect_results(self, session: Session = NEW_SESSION):
                 continue
             finished.append(file)
 
+            # Detect if this was callback-only processing
+            # For such-cases, we don't serialize the dags and hence send parsing_result as None.
+            is_callback_only = proc.had_callbacks and proc.parsing_result is None
+            if is_callback_only:
+                self.log.debug("Detected callback-only processing for %s", file)
+
             # Collect the DAGS and import errors into the DB, emit metrics etc.
             self._file_stats[file] = process_parse_results(
                 run_duration=time.monotonic() - proc.start_time,
@@ -826,6 +832,7 @@ def _collect_results(self, session: Session = NEW_SESSION):
                 bundle_version=self._bundle_versions[file.bundle_name],
                 parsing_result=proc.parsing_result,
                 session=session,
+                is_callback_only=is_callback_only,
             )
 
         for file in finished:
@@ -1109,13 +1116,24 @@ def process_parse_results(
     bundle_version: str | None,
     parsing_result: DagFileParsingResult | None,
     session: Session,
+    *,
+    is_callback_only: bool = False,
 ) -> DagFileStat:
     """Take the parsing result and stats about the parser process and convert it into a DagFileStat."""
-    stat = DagFileStat(
-        last_finish_time=finish_time,
-        last_duration=run_duration,
-        run_count=run_count + 1,
-    )
+    if is_callback_only:
+        # Callback-only processing - don't update timestamps to avoid stale DAG detection issues
+        stat = DagFileStat(
+            last_duration=run_duration,
+            run_count=run_count,  # Don't increment for callback-only processing
+        )
+        Stats.incr("dag_processing.callback_only_count")
+    else:
+        # Actual DAG parsing or import error
+        stat = DagFileStat(
+            last_finish_time=finish_time,
+            last_duration=run_duration,
+            run_count=run_count + 1,
+        )
 
     # TODO: AIP-66 emit metrics
     # file_name = Path(dag_file.path).stem
@@ -1123,7 +1141,10 @@ def process_parse_results(
     # Stats.timing("dag_processing.last_duration", stat.last_duration, tags={"file_name": file_name})
 
     if parsing_result is None:
-        stat.import_errors = 1
+        # No DAGs were parsed - this happens for callback-only processing
+        # Don't treat this as an import error when it's callback-only
+        if not is_callback_only:
+            stat.import_errors = 1
     else:
         # record DAGs and import errors to database
         import_errors = {}
diff --git a/airflow-core/src/airflow/dag_processing/processor.py b/airflow-core/src/airflow/dag_processing/processor.py
index 616bd1abbe950..a86d308c8c59e 100644
--- a/airflow-core/src/airflow/dag_processing/processor.py
+++ b/airflow-core/src/airflow/dag_processing/processor.py
@@ -438,6 +438,7 @@ class DagFileProcessorProcess(WatchedSubprocess):
     logger_filehandle: BinaryIO
     parsing_result: DagFileParsingResult | None = None
     decoder: ClassVar[TypeAdapter[ToManager]] = TypeAdapter[ToManager](ToManager)
+    had_callbacks: bool = False  # Track if this process was started with callbacks to prevent stale DAG detection false positives
 
     client: Client
     """The HTTP client to use for communication with the API server."""
@@ -458,6 +459,7 @@ def start(  # type: ignore[override]
         _pre_import_airflow_modules(os.fspath(path), logger)
 
         proc: Self = super().start(target=target, client=client, **kwargs)
+        proc.had_callbacks = bool(callbacks)  # Track if this process had callbacks
         proc._on_child_started(callbacks, path, bundle_path)
         return proc
 
diff --git a/airflow-core/tests/unit/dag_processing/test_processor.py b/airflow-core/tests/unit/dag_processing/test_processor.py
index 37d9236face60..8acba729f0589 100644
--- a/airflow-core/tests/unit/dag_processing/test_processor.py
+++ b/airflow-core/tests/unit/dag_processing/test_processor.py
@@ -47,6 +47,7 @@
     EmailNotificationRequest,
     TaskCallbackRequest,
 )
+from airflow.dag_processing.manager import process_parse_results
 from airflow.dag_processing.processor import (
     DagFileParseRequest,
     DagFileParsingResult,
@@ -581,6 +582,63 @@ def fake_collect_dags(self, *args, **kwargs):
     assert called is True
 
 
+def test_callback_processing_does_not_update_timestamps(session):
+    """Callback processing should not update last_finish_time to prevent stale DAG detection."""
+    stat = process_parse_results(
+        run_duration=1.0,
+        finish_time=timezone.utcnow(),
+        run_count=5,
+        bundle_name="test",
+        bundle_version=None,
+        parsing_result=None,
+        session=session,
+        is_callback_only=True,
+    )
+
+    assert stat.last_finish_time is None
+    assert stat.run_count == 5
+
+
+def test_normal_parsing_updates_timestamps(session):
+    """last_finish_time should be updated when parsing a dag file."""
+    finish_time = timezone.utcnow()
+
+    stat = process_parse_results(
+        run_duration=2.0,
+        finish_time=finish_time,
+        run_count=3,
+        bundle_name="test-bundle",
+        bundle_version="v1",
+        parsing_result=DagFileParsingResult(fileloc="test.py", serialized_dags=[]),
+        session=session,
+        is_callback_only=False,
+    )
+
+    assert stat.last_finish_time == finish_time
+    assert stat.run_count == 4
+    assert stat.import_errors == 0
+
+
+def test_import_error_updates_timestamps(session):
+    """last_finish_time should be updated when parsing a dag file results in import errors."""
+    finish_time = timezone.utcnow()
+
+    stat = process_parse_results(
+        run_duration=1.5,
+        finish_time=finish_time,
+        run_count=2,
+        bundle_name="test-bundle",
+        bundle_version="v1",
+        parsing_result=None,
+        session=session,
+        is_callback_only=False,
+    )
+
+    assert stat.last_finish_time == finish_time
+    assert stat.run_count == 3
+    assert stat.import_errors == 1
+
+
 class TestExecuteDagCallbacks:
     """Test the _execute_dag_callbacks function with context_from_server"""
 