Outlet Event Extra Data is Empty in Task Instance Success Listener #52574

@dygger

Apache Airflow version

3.0.2

If "Other Airflow 2 version" selected, which one?

No response

What happened?

In the on_task_instance_success listener hook, the extra field of the outlet event is an empty dict, even though the task function explicitly sets non-empty extra data. The same event reaches the downstream consumer DAG with the correct extra (see the logs below).

What you think should happen instead?

The extra field in outlet events should contain the data set by the task and be accessible in the listener hook.
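
To make the expectation concrete, here is a minimal sketch in plain Python. It is not Airflow code: StandInEvent and StandInEvents are hypothetical stand-ins for the outlet event accessor and the outlet_events mapping. The second half reproduces the symptom by handing the listener a different mapping than the one the task wrote to. That is only one way an empty dict could arise, not a confirmed diagnosis.

```python
from dataclasses import dataclass, field


@dataclass
class StandInEvent:
    """Hypothetical stand-in for an outlet event accessor (not an Airflow class)."""

    extra: dict = field(default_factory=dict)


class StandInEvents(dict):
    """Hypothetical stand-in for outlet_events: creates an event on first access."""

    def __missing__(self, key):
        self[key] = StandInEvent()
        return self[key]


def run_task(outlet_events, asset_key):
    # Mirrors the producer task: set non-empty extra on the outlet event.
    outlet_events[asset_key].extra = {"name1": "value1", "nested_obj": {"name2": "value2"}}


def listener_view(outlet_events, asset_key):
    # Mirrors the listener: read the extra back for the same asset.
    return outlet_events[asset_key].extra


# Expected: task and listener see the same mapping, so the extra is visible.
shared = StandInEvents()
run_task(shared, "test-asset")
expected = listener_view(shared, "test-asset")

# Observed symptom (assumed mechanism): the listener reads a fresh mapping.
run_task(StandInEvents(), "test-asset")
observed = listener_view(StandInEvents(), "test-asset")
```

In this sketch, expected holds the dict set by the task, while observed is empty, which matches the listener output in the logs below.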

How to reproduce

DAGs

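import logging
from datetime import datetime, timedelta

from airflow.sdk import DAG, Asset, task

logger = logging.getLogger(__name__)

# Matches the asset shown in the logs below (name and uri are both "test-asset").
asset = Asset("test-asset")

with DAG(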
    dag_id="asset-producer",
    dagrun_timeout=timedelta(minutes=10),
    start_date=datetime(2021, 1, 1),
    schedule=None,
    catchup=False,
    is_paused_upon_creation=False,
    tags=["test"],
):

    @task(outlets=[asset])
    def produce(
        *,
        params: dict | None = None,
        outlet_events=None,
    ):
        outlet_events[asset].extra = {"name1": "value1", "nested_obj": {"name2": "value2"}}
        logger.info(f"Outlet events: {outlet_events}")

    produce()


with DAG(
    dag_id="asset-consumer",
    start_date=datetime(2021, 1, 1),
    schedule=[asset],
    catchup=False,
    is_paused_upon_creation=False,
    tags=["test"],
):

    @task(inlets=[asset])
    def consume(*, triggering_asset_events=None):
        for k, v in triggering_asset_events.items():
            logger.info(f"[Consumer] triggering_asset_event: {k}: {v}")

    consume()

Listener

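import logging

from airflow.listeners import hookimpl
from airflow.models.taskinstance import TaskInstance
from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance
from airflow.utils.state import TaskInstanceState

logger = logging.getLogger(__name__)

@hookimpl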
def on_task_instance_success(
    previous_state: TaskInstanceState | None, task_instance: RuntimeTaskInstance | TaskInstance
):
    logger.info(f"on_task_instance_success: {task_instance.task_id}")
    if isinstance(task_instance, RuntimeTaskInstance):
        context = task_instance.get_template_context()
        outlets = context["outlets"]
        logger.info(f"on_task_instance_success outlets: {outlets}")
        outlet_events = context["outlet_events"]
        for outlet in outlets:
            event = outlet_events[outlet]
            logger.info(f"on_task_instance_success outlet event: {event}")

Logs

Producer (event extra is empty in the listener)

[2025-06-30, 16:56:00] INFO - Outlet events: OutletEventAccessors(_dict={AssetUniqueKey(name='test-asset', uri='test-asset'): OutletEventAccessor(key=AssetUniqueKey(name='test-asset', uri='test-asset'), extra={'name1': 'value1', 'nested_obj': {'name2': 'value2'}}, asset_alias_events=[])}): source="unusual_prefix_e7352dd1f756ead6f66dfc489eeb1b1519f4e074_outlet_test_dag"
[2025-06-30, 16:56:00] INFO - Done. Returned value was: None: source="airflow.task.operators.airflow.providers.standard.decorators.python._PythonDecoratedOperator"
[2025-06-30, 16:56:00] INFO - on_task_instance_success: produce: source="pipeline_run_listener"
[2025-06-30, 16:56:00] INFO - on_task_instance_success outlets: [Asset(name='test-asset', uri='test-asset', group='asset', extra={}, watchers=[])]: source="pipeline_run_listener"
[2025-06-30, 16:56:00] INFO - on_task_instance_success outlet event: OutletEventAccessor(key=AssetUniqueKey(name='test-asset', uri='test-asset'), extra={}, asset_alias_events=[]): source="pipeline_run_listener"

Consumer (event extra is correct)

[2025-06-30, 16:56:03] INFO - [Consumer] triggering_asset_event: Asset(name='test-asset', uri='test-asset', group='asset', extra={}, watchers=[]): [AssetEventDagRunReferenceResult(asset=AssetReferenceAssetEventDagRun(name='test-asset', uri='test-asset', extra={}), extra={'name1': 'value1', 'nested_obj': {'name2': 'value2'}}, source_task_id='produce', source_dag_id='asset-producer', source_run_id='manual__2025-06-30T13:55:55.913030+00:00', source_map_index=-1, source_aliases=[], timestamp=datetime.datetime(2025, 6, 30, 13, 56, 0, 859276, tzinfo=TzInfo(UTC)))]: source="unusual_prefix_e7352dd1f756ead6f66dfc489eeb1b1519f4e074_outlet_test_dag"
[2025-06-30, 16:56:03] INFO - Done. Returned value was: None: source="airflow.task.operators.airflow.providers.standard.decorators.python._PythonDecoratedOperator"

Operating System

Ubuntu 24.04.2 LTS

Versions of Apache Airflow Providers

No response

Deployment

Docker-Compose

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
