33 changes: 27 additions & 6 deletions airflow-core/src/airflow/dag_processing/manager.py
@@ -817,6 +817,12 @@ def _collect_results(self, session: Session = NEW_SESSION):
                 continue
             finished.append(file)
 
+            # Detect if this was callback-only processing
+            # In such cases we don't serialize the DAGs, so parsing_result is sent as None.
+            is_callback_only = proc.had_callbacks and proc.parsing_result is None
+            if is_callback_only:
+                self.log.debug("Detected callback-only processing for %s", file)
+
             # Collect the DAGS and import errors into the DB, emit metrics etc.
             self._file_stats[file] = process_parse_results(
                 run_duration=time.monotonic() - proc.start_time,
@@ -826,6 +832,7 @@ def _collect_results(self, session: Session = NEW_SESSION):
                 bundle_version=self._bundle_versions[file.bundle_name],
                 parsing_result=proc.parsing_result,
                 session=session,
+                is_callback_only=is_callback_only,
             )
 
         for file in finished:
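To make the classification rule concrete, here is a minimal, self-contained sketch of the predicate used above (FakeProc is a hypothetical stand-in for DagFileProcessorProcess, not the real class): a run is callback-only exactly when it was started with callbacks and produced no parsing result, so a run that parses the file and also executes callbacks still counts as a normal parse.

# Hedged sketch of the callback-only predicate; FakeProc is hypothetical.
from dataclasses import dataclass
from typing import Any

@dataclass
class FakeProc:
    had_callbacks: bool
    parsing_result: Any = None

def is_callback_only(proc: FakeProc) -> bool:
    # Callbacks were requested and no DAGs came back from the parse.
    return proc.had_callbacks and proc.parsing_result is None

assert is_callback_only(FakeProc(had_callbacks=True))
# Parsing plus callbacks is still a normal parse:
assert not is_callback_only(FakeProc(had_callbacks=True, parsing_result=object()))
# No callbacks and no result is an import-error case, not callback-only:
assert not is_callback_only(FakeProc(had_callbacks=False))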
@@ -1109,21 +1116,35 @@ def process_parse_results(
     bundle_version: str | None,
     parsing_result: DagFileParsingResult | None,
     session: Session,
+    *,
+    is_callback_only: bool = False,
 ) -> DagFileStat:
     """Take the parsing result and stats about the parser process and convert it into a DagFileStat."""
-    stat = DagFileStat(
-        last_finish_time=finish_time,
-        last_duration=run_duration,
-        run_count=run_count + 1,
-    )
+    if is_callback_only:
+        # Callback-only processing - don't update timestamps to avoid stale DAG detection issues
+        stat = DagFileStat(
+            last_duration=run_duration,
+            run_count=run_count,  # Don't increment for callback-only processing
+        )
+        Stats.incr("dag_processing.callback_only_count")
+    else:
+        # Actual DAG parsing or import error
+        stat = DagFileStat(
+            last_finish_time=finish_time,
+            last_duration=run_duration,
+            run_count=run_count + 1,
+        )
 
     # TODO: AIP-66 emit metrics
     # file_name = Path(dag_file.path).stem
     # Stats.timing(f"dag_processing.last_duration.{file_name}", stat.last_duration)
     # Stats.timing("dag_processing.last_duration", stat.last_duration, tags={"file_name": file_name})
 
     if parsing_result is None:
-        stat.import_errors = 1
+        # No DAGs were parsed - this happens for callback-only processing
+        # Don't treat this as an import error when it's callback-only
+        if not is_callback_only:
+            stat.import_errors = 1
     else:
         # record DAGs and import errors to database
         import_errors = {}
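For context on the motivation, a hedged sketch of the failure mode the timestamp rule guards against; the function name and TTL check below are illustrative assumptions, not the manager's actual stale-DAG code:

# Illustrative only: a recent last_finish_time is treated as evidence that
# the file was really re-parsed. If a callback-only run bumped it, a file
# whose DAGs were never re-serialized would keep looking fresh, and stale
# entries might never be flagged.
from datetime import datetime, timedelta, timezone

def looks_recently_parsed(last_finish_time: datetime | None, ttl: timedelta) -> bool:
    if last_finish_time is None:
        return False  # never finished a real parse
    return datetime.now(timezone.utc) - last_finish_time < ttl

assert not looks_recently_parsed(None, timedelta(minutes=5))
assert looks_recently_parsed(datetime.now(timezone.utc), timedelta(minutes=5))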
2 changes: 2 additions & 0 deletions airflow-core/src/airflow/dag_processing/processor.py
@@ -438,6 +438,7 @@ class DagFileProcessorProcess(WatchedSubprocess):
     logger_filehandle: BinaryIO
     parsing_result: DagFileParsingResult | None = None
     decoder: ClassVar[TypeAdapter[ToManager]] = TypeAdapter[ToManager](ToManager)
+    had_callbacks: bool = False  # Track if this process was started with callbacks to prevent stale DAG detection false positives
 
     client: Client
     """The HTTP client to use for communication with the API server."""
@@ -458,6 +459,7 @@ def start(  # type: ignore[override]
             _pre_import_airflow_modules(os.fspath(path), logger)
 
         proc: Self = super().start(target=target, client=client, **kwargs)
+        proc.had_callbacks = bool(callbacks)  # Track if this process had callbacks
         proc._on_child_started(callbacks, path, bundle_path)
         return proc
 
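A minimal sketch of how the new flag flows end to end (FakeProcessor is hypothetical and far simpler than the real DagFileProcessorProcess): start() records whether any callback requests were passed, and the manager later combines that flag with a missing parsing_result to classify the run.

# FakeProcessor is a hypothetical stand-in; only the flag lifecycle is shown.
class FakeProcessor:
    def __init__(self) -> None:
        self.had_callbacks = False
        self.parsing_result = None  # would be set when DAGs are serialized back

    @classmethod
    def start(cls, callbacks: list | None = None) -> "FakeProcessor":
        proc = cls()
        proc.had_callbacks = bool(callbacks)  # None or [] both mean "no callbacks"
        return proc

proc = FakeProcessor.start(callbacks=["dag_failure_callback"])
# With callbacks but no parsing result, the manager would treat this
# run as callback-only and skip the last_finish_time update.
assert proc.had_callbacks and proc.parsing_result is None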
58 changes: 58 additions & 0 deletions airflow-core/tests/unit/dag_processing/test_processor.py
@@ -47,6 +47,7 @@
     EmailNotificationRequest,
     TaskCallbackRequest,
 )
+from airflow.dag_processing.manager import process_parse_results
 from airflow.dag_processing.processor import (
     DagFileParseRequest,
     DagFileParsingResult,
@@ -581,6 +582,63 @@ def fake_collect_dags(self, *args, **kwargs):
     assert called is True
 
 
+def test_callback_processing_does_not_update_timestamps(session):
+    """Callback processing should not update last_finish_time to prevent stale DAG detection."""
+    stat = process_parse_results(
+        run_duration=1.0,
+        finish_time=timezone.utcnow(),
+        run_count=5,
+        bundle_name="test",
+        bundle_version=None,
+        parsing_result=None,
+        session=session,
+        is_callback_only=True,
+    )
+
+    assert stat.last_finish_time is None
+    assert stat.run_count == 5
+
+
+def test_normal_parsing_updates_timestamps(session):
+    """last_finish_time should be updated when parsing a dag file."""
+    finish_time = timezone.utcnow()
+
+    stat = process_parse_results(
+        run_duration=2.0,
+        finish_time=finish_time,
+        run_count=3,
+        bundle_name="test-bundle",
+        bundle_version="v1",
+        parsing_result=DagFileParsingResult(fileloc="test.py", serialized_dags=[]),
+        session=session,
+        is_callback_only=False,
+    )
+
+    assert stat.last_finish_time == finish_time
+    assert stat.run_count == 4
+    assert stat.import_errors == 0
+
+
+def test_import_error_updates_timestamps(session):
+    """last_finish_time should be updated when parsing a dag file results in import errors."""
+    finish_time = timezone.utcnow()
+
+    stat = process_parse_results(
+        run_duration=1.5,
+        finish_time=finish_time,
+        run_count=2,
+        bundle_name="test-bundle",
+        bundle_version="v1",
+        parsing_result=None,
+        session=session,
+        is_callback_only=False,
+    )
+
+    assert stat.last_finish_time == finish_time
+    assert stat.run_count == 3
+    assert stat.import_errors == 1
+
+
 class TestExecuteDagCallbacks:
     """Test the _execute_dag_callbacks function with context_from_server"""
 