Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
5fd0b17
Extract airflow listeners as a shared library
amoghrajesh Dec 29, 2025
c9f51bb
bump pluggy
amoghrajesh Dec 29, 2025
eb89147
remove registration factory from shared
amoghrajesh Dec 29, 2025
c870f64
bump pluggy
amoghrajesh Dec 29, 2025
f2d3407
bump pluggy
amoghrajesh Dec 29, 2025
eace825
use relative paths
amoghrajesh Dec 29, 2025
c75eae9
use relative paths
amoghrajesh Dec 29, 2025
9008b10
adding basic tests for listeners
amoghrajesh Dec 29, 2025
0fe673a
introducing a fixture for listener manager
amoghrajesh Dec 30, 2025
e82fbb0
migrating tests in core to use listener manager fixture
amoghrajesh Dec 30, 2025
55006d7
migrating tests in task sdk to use listener manager fixture
amoghrajesh Dec 30, 2025
73a96fc
adding listeners to compat module for providers
amoghrajesh Dec 30, 2025
40ac755
updating usage of listeners in OL
amoghrajesh Dec 30, 2025
51e88c3
improving the fixture
amoghrajesh Dec 30, 2025
dfab87f
updating OL tests to use fixture for listeners
amoghrajesh Dec 30, 2025
8a096c7
Merge branch 'main' into move-listeners-to-shared
amoghrajesh Dec 30, 2025
cf72e69
Merge branch 'main' into move-listeners-to-shared
amoghrajesh Jan 6, 2026
ab3e249
remove todo
amoghrajesh Jan 6, 2026
eac00cf
switch to structlog
amoghrajesh Jan 6, 2026
7df5e9b
import structure changes
amoghrajesh Jan 6, 2026
3a24db9
import structure changes
amoghrajesh Jan 6, 2026
6a21b5f
fix compat shim
amoghrajesh Jan 6, 2026
842c32f
fix compat shim
amoghrajesh Jan 6, 2026
eca2451
fix compat shim
amoghrajesh Jan 6, 2026
ad24930
fixing failing tests
amoghrajesh Jan 6, 2026
e0b5148
Merge branch 'main' into move-listeners-to-shared
amoghrajesh Jan 6, 2026
6877446
fixing failing tests
amoghrajesh Jan 6, 2026
c463664
inject spec as necessary
amoghrajesh Jan 7, 2026
4a63f94
Merge branch 'main' into move-listeners-to-shared
amoghrajesh Jan 8, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions airflow-core/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ dependencies = [
# Start of shared configuration dependencies
"pyyaml>=6.0.3",
# End of shared configuration dependencies
# Start of shared listeners dependencies
"pluggy>=1.5.0",
# End of shared listeners dependencies
]


Expand Down Expand Up @@ -235,6 +238,7 @@ exclude = [
"../shared/secrets_backend/src/airflow_shared/secrets_backend" = "src/airflow/_shared/secrets_backend"
"../shared/secrets_masker/src/airflow_shared/secrets_masker" = "src/airflow/_shared/secrets_masker"
"../shared/timezones/src/airflow_shared/timezones" = "src/airflow/_shared/timezones"
"../shared/listeners/src/airflow_shared/listeners" = "src/airflow/_shared/listeners"
"../shared/plugins_manager/src/airflow_shared/plugins_manager" = "src/airflow/_shared/plugins_manager"

[tool.hatch.build.targets.custom]
Expand Down Expand Up @@ -305,6 +309,7 @@ apache-airflow-devel-common = { workspace = true }
shared_distributions = [
"apache-airflow-shared-configuration",
"apache-airflow-shared-dagnode",
"apache-airflow-shared-listeners",
"apache-airflow-shared-logging",
"apache-airflow-shared-module-loading",
"apache-airflow-shared-observability",
Expand Down
1 change: 1 addition & 0 deletions airflow-core/src/airflow/_shared/listeners
4 changes: 2 additions & 2 deletions airflow-core/src/airflow/listeners/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@
# under the License.
from __future__ import annotations

from pluggy import HookimplMarker
from airflow._shared.listeners import hookimpl

hookimpl = HookimplMarker("airflow")
__all__ = ["hookimpl"]
82 changes: 23 additions & 59 deletions airflow-core/src/airflow/listeners/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,72 +17,36 @@
# under the License.
from __future__ import annotations

import logging
from functools import cache
from typing import TYPE_CHECKING

import pluggy

from airflow._shared.listeners.listener import ListenerManager
from airflow._shared.listeners.spec import lifecycle, taskinstance
from airflow.listeners.spec import asset, dagrun, importerrors
from airflow.plugins_manager import integrate_listener_plugins

if TYPE_CHECKING:
from pluggy._hooks import _HookRelay

log = logging.getLogger(__name__)


def _before_hookcall(hook_name, hook_impls, kwargs):
log.debug("Calling %r with %r", hook_name, kwargs)
log.debug("Hook impls: %s", hook_impls)


def _after_hookcall(outcome, hook_name, hook_impls, kwargs):
log.debug("Result from %r: %s", hook_name, outcome.get_result())


class ListenerManager:
"""Manage listener registration and provides hook property for calling them."""

def __init__(self):
from airflow.listeners.spec import (
asset,
dagrun,
importerrors,
lifecycle,
taskinstance,
)

self.pm = pluggy.PluginManager("airflow")
self.pm.add_hookcall_monitoring(_before_hookcall, _after_hookcall)
self.pm.add_hookspecs(lifecycle)
self.pm.add_hookspecs(dagrun)
self.pm.add_hookspecs(asset)
self.pm.add_hookspecs(taskinstance)
self.pm.add_hookspecs(importerrors)

@property
def has_listeners(self) -> bool:
return bool(self.pm.get_plugins())

@property
def hook(self) -> _HookRelay:
"""Return hook, on which plugin methods specified in spec can be called."""
return self.pm.hook

def add_listener(self, listener):
if self.pm.is_registered(listener):
return
self.pm.register(listener)

def clear(self):
"""Remove registered plugins."""
for plugin in self.pm.get_plugins():
self.pm.unregister(plugin)


@cache
def get_listener_manager() -> ListenerManager:
"""Get singleton listener manager."""
"""
Get a listener manager for Airflow core.

Registers the following listeners:
- lifecycle: on_starting, before_stopping
- dagrun: on_dag_run_running, on_dag_run_success, on_dag_run_failed
- taskinstance: on_task_instance_running, on_task_instance_success, etc.
- asset: on_asset_created, on_asset_changed, etc.
- importerrors: on_new_dag_import_error, on_existing_dag_import_error
"""
_listener_manager = ListenerManager()

_listener_manager.add_hookspecs(lifecycle)
_listener_manager.add_hookspecs(dagrun)
_listener_manager.add_hookspecs(taskinstance)
_listener_manager.add_hookspecs(asset)
_listener_manager.add_hookspecs(importerrors)

integrate_listener_plugins(_listener_manager)
return _listener_manager


__all__ = ["get_listener_manager", "ListenerManager"]
1 change: 1 addition & 0 deletions airflow-core/src/airflow/listeners/spec/asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from typing import TYPE_CHECKING
Expand Down
1 change: 1 addition & 0 deletions airflow-core/src/airflow/listeners/spec/importerrors.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from pluggy import HookspecMarker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

from airflow._shared.timezones import timezone
from airflow.api_fastapi.core_api.datamodels.dag_versions import DagVersionResponse
from airflow.listeners.listener import get_listener_manager
from airflow.models import DagModel, DagRun, Log
from airflow.models.asset import AssetEvent, AssetModel
from airflow.providers.standard.operators.empty import EmptyOperator
Expand Down Expand Up @@ -1285,12 +1284,6 @@ def test_patch_dag_run_bad_request(self, test_client):
body = response.json()
assert body["detail"][0]["msg"] == "Input should be 'queued', 'success' or 'failed'"

@pytest.fixture(autouse=True)
def clean_listener_manager(self):
get_listener_manager().clear()
yield
get_listener_manager().clear()

@pytest.mark.parametrize(
("state", "listener_state"),
[
Expand All @@ -1300,11 +1293,11 @@ def clean_listener_manager(self):
],
)
@pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
def test_patch_dag_run_notifies_listeners(self, test_client, state, listener_state):
def test_patch_dag_run_notifies_listeners(self, test_client, state, listener_state, listener_manager):
from unit.listeners.class_listener import ClassBasedListener

listener = ClassBasedListener()
get_listener_manager().add_listener(listener)
listener_manager(listener)
response = test_client.patch(f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}", json={"state": state})
assert response.status_code == 200
assert listener.state == listener_state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
from airflow.dag_processing.dagbag import DagBag, sync_bag_to_db
from airflow.jobs.job import Job
from airflow.jobs.triggerer_job_runner import TriggererJobRunner
from airflow.listeners.listener import get_listener_manager
from airflow.models import DagRun, Log, TaskInstance
from airflow.models.dag_version import DagVersion
from airflow.models.hitl import HITLDetail
Expand Down Expand Up @@ -4084,12 +4083,6 @@ class TestPatchTaskInstance(TestTaskInstanceEndpoint):
TASK_ID = "print_the_context"
RUN_ID = "TEST_DAG_RUN_ID"

@pytest.fixture(autouse=True)
def clean_listener_manager(self):
get_listener_manager().clear()
yield
get_listener_manager().clear()

@pytest.mark.parametrize(
("state", "listener_state"),
[
Expand All @@ -4098,13 +4091,15 @@ def clean_listener_manager(self):
("skipped", []),
],
)
def test_patch_task_instance_notifies_listeners(self, test_client, session, state, listener_state):
def test_patch_task_instance_notifies_listeners(
self, test_client, session, state, listener_state, listener_manager
):
from unit.listeners.class_listener import ClassBasedListener

self.create_task_instances(session)

listener = ClassBasedListener()
get_listener_manager().add_listener(listener)
listener_manager(listener)
test_client.patch(
self.ENDPOINT_URL,
json={
Expand Down
9 changes: 4 additions & 5 deletions airflow-core/tests/unit/assets/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

from airflow import settings
from airflow.assets.manager import AssetManager
from airflow.listeners.listener import get_listener_manager
from airflow.models.asset import (
AssetAliasModel,
AssetDagRunQueue,
Expand Down Expand Up @@ -183,11 +182,11 @@ def test_register_asset_change_no_downstreams(self, session, mock_task_instance)
assert session.scalar(select(func.count()).select_from(AssetDagRunQueue)) == 0

def test_register_asset_change_notifies_asset_listener(
self, session, mock_task_instance, testing_dag_bundle
self, session, mock_task_instance, testing_dag_bundle, listener_manager
):
asset_manager = AssetManager()
asset_listener.clear()
get_listener_manager().add_listener(asset_listener)
listener_manager(asset_listener)

bundle_name = "testing"

Expand All @@ -207,10 +206,10 @@ def test_register_asset_change_notifies_asset_listener(
assert len(asset_listener.changed) == 1
assert asset_listener.changed[0].uri == asset.uri

def test_create_assets_notifies_asset_listener(self, session):
def test_create_assets_notifies_asset_listener(self, session, listener_manager):
asset_manager = AssetManager()
asset_listener.clear()
get_listener_manager().add_listener(asset_listener)
listener_manager(asset_listener)

asset = Asset(uri="test://asset1", name="test_asset_1")

Expand Down
6 changes: 2 additions & 4 deletions airflow-core/tests/unit/dag_processing/test_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
update_dag_parsing_results_in_db,
)
from airflow.exceptions import SerializationError
from airflow.listeners.listener import get_listener_manager
from airflow.models import DagModel, DagRun
from airflow.models.asset import (
AssetActive,
Expand Down Expand Up @@ -321,12 +320,11 @@ def clean_db(self, session):
clear_db_import_errors()

@pytest.fixture(name="dag_import_error_listener")
def _dag_import_error_listener(self):
def _dag_import_error_listener(self, listener_manager):
from unit.listeners import dag_import_error_listener

get_listener_manager().add_listener(dag_import_error_listener)
listener_manager(dag_import_error_listener)
yield dag_import_error_listener
get_listener_manager().clear()
dag_import_error_listener.clear()

@mark_fab_auth_manager_test
Expand Down
5 changes: 2 additions & 3 deletions airflow-core/tests/unit/jobs/test_base_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
from airflow._shared.timezones import timezone
from airflow.executors.local_executor import LocalExecutor
from airflow.jobs.job import Job, health_check_threshold, most_recent_job, perform_heartbeat, run_job
from airflow.listeners.listener import get_listener_manager
from airflow.utils.session import create_session
from airflow.utils.state import State

Expand Down Expand Up @@ -68,11 +67,11 @@ def test_base_job_respects_plugin_hooks(self):
assert job.state == State.SUCCESS
assert job.end_date is not None

def test_base_job_respects_plugin_lifecycle(self, dag_maker):
def test_base_job_respects_plugin_lifecycle(self, dag_maker, listener_manager):
"""
Test if DagRun is successful, and if Success callbacks is defined, it is sent to DagFileProcessor.
"""
get_listener_manager().add_listener(lifecycle_listener)
listener_manager(lifecycle_listener)

job = Job()
job_runner = MockJobRunner(job=job, func=lambda: sys.exit(0))
Expand Down
11 changes: 6 additions & 5 deletions airflow-core/tests/unit/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@
from tests_common.test_utils.mock_operators import CustomOperator
from tests_common.test_utils.taskinstance import create_task_instance, run_task_instance
from unit.listeners import dag_listener
from unit.listeners.test_listeners import get_listener_manager
from unit.models import TEST_DAGS_FOLDER

if TYPE_CHECKING:
Expand Down Expand Up @@ -3190,7 +3189,9 @@ def test_dagrun_callbacks_are_called(self, state, expected_callback_msg, dag_mak
("state", "expected_callback_msg"), [(State.SUCCESS, "success"), (State.FAILED, "task_failure")]
)
@conf_vars({("scheduler", "use_job_schedule"): "False"})
def test_dagrun_plugins_are_notified(self, state, expected_callback_msg, dag_maker, session):
def test_dagrun_plugins_are_notified(
self, state, expected_callback_msg, dag_maker, session, listener_manager
):
"""
Test if DagRun is successful, and if Success callbacks is defined, it is sent to DagFileProcessor.
"""
Expand All @@ -3203,7 +3204,7 @@ def test_dagrun_plugins_are_notified(self, state, expected_callback_msg, dag_mak
EmptyOperator(task_id="dummy")

dag_listener.clear()
get_listener_manager().add_listener(dag_listener)
listener_manager(dag_listener)

scheduler_job = Job(executor=self.null_exec)
self.job_runner = SchedulerJobRunner(job=scheduler_job)
Expand Down Expand Up @@ -3374,7 +3375,7 @@ def test_dagrun_callbacks_are_added_when_callbacks_are_defined(self, state, msg,
session.close()

@conf_vars({("scheduler", "use_job_schedule"): "False"})
def test_dagrun_notify_called_success(self, dag_maker):
def test_dagrun_notify_called_success(self, dag_maker, listener_manager):
with dag_maker(
dag_id="test_dagrun_notify_called",
on_success_callback=lambda x: print("success"),
Expand All @@ -3383,7 +3384,7 @@ def test_dagrun_notify_called_success(self, dag_maker):
EmptyOperator(task_id="dummy")

dag_listener.clear()
get_listener_manager().add_listener(dag_listener)
listener_manager(dag_listener)

executor = MockExecutor(do_update=False)

Expand Down
14 changes: 6 additions & 8 deletions airflow-core/tests/unit/listeners/test_asset_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import pytest

from airflow.listeners.listener import get_listener_manager
from airflow.models.asset import AssetModel
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk.definitions.asset import Asset
Expand All @@ -28,19 +27,18 @@


@pytest.fixture(autouse=True)
def clean_listener_manager():
lm = get_listener_manager()
lm.clear()
lm.add_listener(asset_listener)
def clean_listener_state():
"""Clear listener state after each test."""
yield
lm = get_listener_manager()
lm.clear()
asset_listener.clear()


@pytest.mark.db_test
@provide_session
def test_asset_listener_on_asset_changed_gets_calls(create_task_instance_of_operator, session):
def test_asset_listener_on_asset_changed_gets_calls(
create_task_instance_of_operator, session, listener_manager
):
listener_manager(asset_listener)
asset_uri = "test://asset/"
asset_name = "test_asset_uri"
asset_group = "test-group"
Expand Down
Loading
Loading