Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion airflow-core/src/airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,15 +450,19 @@ def _fetch_callbacks(

callback_queue: list[CallbackRequest] = []
with prohibit_commit(session) as guard:
bundle_names = [bundle.name for bundle in self._dag_bundles]
query = select(DbCallbackRequest)
query = query.order_by(DbCallbackRequest.priority_weight.desc()).limit(
self.max_callbacks_per_loop
)
query = with_row_locks(query, of=DbCallbackRequest, session=session, skip_locked=True)
callbacks = session.scalars(query)
for callback in callbacks:
req = callback.get_callback_request()
if req.bundle_name not in bundle_names:
continue
try:
callback_queue.append(callback.get_callback_request())
callback_queue.append(req)
session.delete(callback)
except Exception as e:
self.log.warning("Error adding callback for execution: %s, %s", callback, e)
Expand Down
46 changes: 46 additions & 0 deletions airflow-core/tests/unit/dag_processing/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -767,6 +767,7 @@ def test_fetch_callbacks_from_database(self, configure_testing_dag_bundle):

with configure_testing_dag_bundle(dag_filepath):
manager = DagFileProcessorManager(max_runs=1)
manager._dag_bundles = list(DagBundlesManager().get_all_dag_bundles())

with create_session() as session:
callbacks = manager._fetch_callbacks(session=session)
Expand Down Expand Up @@ -810,6 +811,51 @@ def test_fetch_callbacks_from_database_max_per_loop(self, tmp_path, configure_te
manager.run()
assert session.query(DbCallbackRequest).count() == 1

@conf_vars({("core", "load_examples"): "False"})
def test_fetch_callbacks_ignores_other_bundles(self, configure_testing_dag_bundle):
"""Ensure callbacks for bundles not owned by current dag processor manager are ignored and not deleted."""

dag_filepath = TEST_DAG_FOLDER / "test_on_failure_callback_dag.py"

# Create two callbacks: one for the active 'testing' bundle and one for a different bundle
matching = DagCallbackRequest(
dag_id="test_start_date_scheduling",
bundle_name="testing",
bundle_version=None,
filepath="test_on_failure_callback_dag.py",
is_failure_callback=True,
run_id="match",
)
non_matching = DagCallbackRequest(
dag_id="test_start_date_scheduling",
bundle_name="other-bundle",
bundle_version=None,
filepath="test_on_failure_callback_dag.py",
is_failure_callback=True,
run_id="no-match",
)

with create_session() as session:
session.add(DbCallbackRequest(callback=matching, priority_weight=100))
session.add(DbCallbackRequest(callback=non_matching, priority_weight=200))

with configure_testing_dag_bundle(dag_filepath):
manager = DagFileProcessorManager(max_runs=1)
manager._dag_bundles = list(DagBundlesManager().get_all_dag_bundles())

with create_session() as session:
callbacks = manager._fetch_callbacks(session=session)

# Only the matching callback should be returned
assert [c.run_id for c in callbacks] == ["match"]

# The non-matching callback should remain in the DB
remaining = session.query(DbCallbackRequest).all()
assert len(remaining) == 1
# Decode remaining request and verify it's for the other bundle
remaining_req = remaining[0].get_callback_request()
assert remaining_req.bundle_name == "other-bundle"

@mock.patch.object(DagFileProcessorManager, "_get_logger_for_dag_file")
def test_callback_queue(self, mock_get_logger, configure_testing_dag_bundle):
mock_logger = MagicMock()
Expand Down