Description
Apache Airflow version
Other Airflow 2/3 version (please specify below)
If "Other Airflow 2/3 version" selected, which one?
3.0.6
What happened?
When using TaskFlow-style asset event emission with yield Metadata(...), the task fails if extra contains a nested dict (e.g., {"nested": {"key": "value"}}).
Flat dicts work fine.
Operator-style mutation of outlet_events[...] does accept nested dicts, and JSON-encoding the nested portion also works.
EDIT: Operator-style mutation of outlet_events[...] does accept nested dicts (the tasks do not fail), but does not include the extra in the Asset Event.
The failure occurs after the function completes, likely during serialization or persistence of the asset event, so logs look normal until teardown.
EDIT_2: This seems to also happen with other types such as list.
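Until this is fixed, the JSON-encoding workaround can be generalized so that every non-scalar value in extra is encoded before it is passed to Metadata. The following is only a minimal sketch: encode_nested_extra is a hypothetical helper, not part of Airflow, and only the nested-dict case is confirmed to work in this report. Applying it to lists is an assumption.

```python
import json
from typing import Any

# Types treated as "flat" values; these are passed through unchanged.
_SCALARS = (str, int, float, bool, type(None))


def encode_nested_extra(extra: dict[str, Any]) -> dict[str, Any]:
    """Return a flat copy of ``extra`` with non-scalar values JSON-encoded.

    Hypothetical helper for illustration; consumers of the asset event
    would need to call json.loads on the encoded values themselves.
    """
    return {
        key: value if isinstance(value, _SCALARS) else json.dumps(value)
        for key, value in extra.items()
    }


flat_extra = encode_nested_extra(
    {'nested': {'key': 'value'}, 'items': [1, 2], 'flat': 'x'}
)
```

The result could then be passed as extra=flat_extra in the yield Metadata(...) call.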
What you think should happen instead?
Metadata.extra should accept any JSON-compatible structure, including nested dicts, as documented. Nested dicts are valid JSON and should not cause task failure.
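To make "JSON-compatible" concrete, here is a rough sketch of a round-trip check using only the standard library. is_json_compatible is a hypothetical name, and this is not how Airflow validates extra. It only illustrates the kind of values I would expect to be accepted.

```python
import json
from typing import Any


def is_json_compatible(value: Any) -> bool:
    """Return True if ``value`` survives a JSON encode/decode round trip."""
    try:
        return json.loads(json.dumps(value)) == value
    except (TypeError, ValueError):
        # json.dumps raises TypeError for unsupported types (e.g. set)
        # and ValueError for circular references.
        return False


nested_ok = is_json_compatible({'nested': {'key': 'value'}})
list_ok = is_json_compatible({'items': [1, 2, 3]})
set_ok = is_json_compatible({'items': {1, 2, 3}})
```

Under this check, both the nested dict and the list from this report count as JSON-compatible, while a set does not.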
How to reproduce
import json

from airflow.sdk import Asset, AssetAlias, dag, task
from airflow.sdk.definitions.asset.metadata import Metadata


@dag(schedule=None, tags=['test'])
def test_metadata_extra_nested():
    @task(outlets=[AssetAlias('dummy_asset_alias')])
    def working():
        asset_name = 'dummy_asset'
        yield Metadata(
            asset=Asset(asset_name),
            alias=AssetAlias('dummy_asset_alias'),
            extra={'key': 'value'},  # works (flat)
        )
        return asset_name

    @task(outlets=[AssetAlias('dummy_asset_alias')])
    def task_flow_not_working():
        asset_name = 'dummy_asset'
        yield Metadata(
            asset=Asset(asset_name),
            alias=AssetAlias('dummy_asset_alias'),
            extra={'nested': {'key': 'value'}},  # fails (nested)
        )
        return asset_name

    @task(outlets=[AssetAlias('dummy_asset_alias')])
    def working_with_flat_workaround():
        asset_name = 'dummy_asset'
        yield Metadata(
            asset=Asset(asset_name),
            alias=AssetAlias('dummy_asset_alias'),
            extra={'nested': json.dumps({'key': 'value'})},  # workaround
        )
        return asset_name

    @task(outlets=[AssetAlias('dummy_asset_alias')])
    def operator_style_working_with_nested(outlet_events=None):
        asset_name = 'dummy_asset'
        outlet_events[AssetAlias('dummy_asset_alias')].add(Asset(asset_name))
        outlet_events[AssetAlias('dummy_asset_alias')].extra = {
            'nested': {'key': 'value'}  # EDIT: the task does not fail, but the extra is not included in the event
        }
        return asset_name

    working()
    task_flow_not_working()
    working_with_flat_workaround()
    operator_style_working_with_nested()


test_metadata_extra_nested()
Operating System
Docker compose
Versions of Apache Airflow Providers
No response
Deployment
Docker-Compose
Deployment details
No response
Anything else?
This may be related to #53474
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct