Skip to content

Conversation

@astro-anand
Copy link
Contributor

@astro-anand astro-anand commented Jul 30, 2025

Updated _msgpack_enc_hook to support sending subclasses of builtin types

closes: #53474


@astro-anand
Copy link
Contributor Author

@ashb let me know if this is what you had in mind

@RNHTTR
Copy link
Contributor

RNHTTR commented Jul 30, 2025

I wonder if the user should be responsible for casting to a supported built-in. In the case of the issue (i.e. using np.float64), what if they expect and want to use float64 downstream when accessing the extra? I guess that's not a big deal since extra here should just be metadata.

If we proceed with this, I think we need to update docs to highlight that types that can be forced into a builtin will be forced into a builtin

@amoghrajesh
Copy link
Contributor

It actually seems that the asset metadata never reaches BaseSerialization.serialize() - it fails much earlier during pydantic's default JSON serialization when sending the HTTP API request. We probably should send it through the serialization layer similar to xcoms.

@astro-anand
Copy link
Contributor Author

@RNHTTR @amoghrajesh You're both totally right. I think we should ensure that the serializer is called and ensure that on the other side of things, the deserialize method is called correctly too. Will update and test today

@astro-anand
Copy link
Contributor Author

astro-anand commented Jul 31, 2025

I've tried updating the _serialize_outlet_events method to use the serialize function defined in serde.py, but I'm still getting the same error.

def _serialize_outlet_events(events: OutletEventAccessorsProtocol) -> Iterator[dict[str, Any]]:
    from airflow.serialization.serde import serialize

    if TYPE_CHECKING:
        assert isinstance(events, OutletEventAccessors)
    # We just collect everything the user recorded in the accessors.
    # Further filtering will be done in the API server.
    for key, accessor in events._dict.items():
        if isinstance(key, AssetUniqueKey):
            yield {"dest_asset_key": serialize(key), "extra": serialize(accessor.extra)}
        for alias_event in accessor.asset_alias_events:
            yield attrs.asdict(alias_event)

@amoghrajesh
Copy link
Contributor

You have found a good bug.

import numpy as np
npf = np.float64(1.23412415)
isinstance(npf, _primitives)
Out[5]: True
npi = np.int64(290)
isinstance(npi, _primitives)
Out[7]: False

It never reaches the np serializer. Feel free to create an issue for that.

@astro-anand
Copy link
Contributor Author

Issue created and fixed here. Once merged, will update the outlet events with serialization & deserialization. Food for thought, do you think that asset metadata should respect the configured Xcom backend? Or is it fine that this will all be persisted in the metadata db?

ToSupervisor,
)
from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
from airflow.serialization.serde import deserialize
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are not allowed to use non-sdk modules in sdk code. Why does this need to use deserialize (and serialize below)?

Copy link
Contributor Author

@astro-anand astro-anand Sep 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes you're right. This PR is not ready yet. I was trying to test if we could pass the Asset metadata through the serialization layer (like XComs) per @amoghrajesh's recommendation and this implementation was just meant to be a quick check if it would work. What do you think the best way to reuse existing serializers would be?

@astro-anand astro-anand marked this pull request as draft September 29, 2025 14:23
@astro-anand astro-anand force-pushed the asset-metadata-serialization branch from aa355a3 to 9fea8e0 Compare September 29, 2025 14:47
@astro-anand
Copy link
Contributor Author

astro-anand commented Sep 29, 2025

@uranusjr - I was roughly trying to replicate the xcom serialize_value functionality here. Won't this also be problematic since non-sdk code can't be used in the task sdk?

from airflow.serialization.serde import serialize

@uranusjr
Copy link
Member

There was a decision some time ago to move serde to SDK so it should be fine to use it in execution_time now. cc @amoghrajesh to confirm.

@amoghrajesh
Copy link
Contributor

@astro-anand serde has now been moved into task sdk: https://github.com/apache/airflow/tree/main/task-sdk/src/airflow/sdk/serde. You can rebase and import from there.

@kaxil kaxil added stale Stale PRs per the .github/workflows/stale.yml policy file pending-response labels Jan 15, 2026
@github-actions
Copy link

This pull request has been closed because the author has not responded to a request for more information.

@github-actions github-actions bot closed this Jan 23, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:task-sdk pending-response stale Stale PRs per the .github/workflows/stale.yml policy file

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Error when emitting Asset Metadata if value in extra is of type np.float64

5 participants