Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 1 addition & 12 deletions airflow-core/src/airflow/serialization/serde.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@

import airflow.serialization.serializers
from airflow.configuration import conf
from airflow.serialization.typing import is_pydantic_model
from airflow.stats import Stats
from airflow.utils.module_loading import import_string, iter_namespace, qualname

Expand All @@ -53,7 +52,6 @@
OLD_SOURCE = "__source"
OLD_DATA = "__var"
OLD_DICT = "dict"
PYDANTIC_MODEL_QUALNAME = "pydantic.main.BaseModel"

DEFAULT_VERSION = 0

Expand Down Expand Up @@ -147,12 +145,6 @@ def serialize(o: object, depth: int = 0) -> U | None:
qn = "builtins.tuple"
classname = qn

if is_pydantic_model(o):
# to match the generic Pydantic serializer and deserializer in _serializers and _deserializers
qn = PYDANTIC_MODEL_QUALNAME
# the actual Pydantic model class to encode
classname = qualname(o)

# if there is a builtin serializer available use that
if qn in _serializers:
data, serialized_classname, version, is_serialized = _serializers[qn].serialize(o)
Expand Down Expand Up @@ -264,10 +256,7 @@ def deserialize(o: T | None, full=True, type_hint: Any = None) -> object:

# registered deserializer
if classname in _deserializers:
return _deserializers[classname].deserialize(cls, version, deserialize(value))
if is_pydantic_model(cls):
if PYDANTIC_MODEL_QUALNAME in _deserializers:
return _deserializers[PYDANTIC_MODEL_QUALNAME].deserialize(cls, version, deserialize(value))
return _deserializers[classname].deserialize(classname, version, deserialize(value))

# class has deserialization function
if hasattr(cls, "deserialize"):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ def serialize(o: object) -> tuple[U, str, int, bool]:
return float(o), name, __version__, True


def deserialize(cls: type, version: int, data: object) -> decimal.Decimal:
def deserialize(classname: str, version: int, data: object) -> decimal.Decimal:
from decimal import Decimal

if version > __version__:
raise TypeError(f"serialized {version} of {qualname(cls)} > {__version__}")
raise TypeError(f"serialized {version} of {classname} > {__version__}")

if cls is not Decimal:
raise TypeError(f"do not know how to deserialize {qualname(cls)}")
if classname != qualname(Decimal):
raise TypeError(f"{classname} != {qualname(Decimal)}")

return Decimal(str(data))
12 changes: 6 additions & 6 deletions airflow-core/src/airflow/serialization/serializers/builtin.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,20 @@ def serialize(o: object) -> tuple[U, str, int, bool]:
return list(cast("list", o)), qualname(o), __version__, True


def deserialize(cls: type, version: int, data: list) -> tuple | set | frozenset:
def deserialize(classname: str, version: int, data: list) -> tuple | set | frozenset:
if version > __version__:
raise TypeError(f"serialized version {version} is newer than class version {__version__}")
raise TypeError("serialized version is newer than class version")

if cls is tuple:
if classname == qualname(tuple):
return tuple(data)

if cls is set:
if classname == qualname(set):
return set(data)

if cls is frozenset:
if classname == qualname(frozenset):
return frozenset(data)

raise TypeError(f"do not know how to deserialize {qualname(cls)}")
raise TypeError(f"do not know how to deserialize {classname}")


def stringify(classname: str, version: int, data: list) -> str:
Expand Down
12 changes: 6 additions & 6 deletions airflow-core/src/airflow/serialization/serializers/datetime.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def serialize(o: object) -> tuple[U, str, int, bool]:
return "", "", 0, False


def deserialize(cls: type, version: int, data: dict | str) -> datetime.date | datetime.timedelta:
def deserialize(classname: str, version: int, data: dict | str) -> datetime.date | datetime.timedelta:
import datetime

from pendulum import DateTime
Expand All @@ -86,16 +86,16 @@ def deserialize(cls: type, version: int, data: dict | str) -> datetime.date | da
else None
)

if cls is datetime.datetime and isinstance(data, dict):
if classname == qualname(datetime.datetime) and isinstance(data, dict):
return datetime.datetime.fromtimestamp(float(data[TIMESTAMP]), tz=tz)

if cls is DateTime and isinstance(data, dict):
if classname == qualname(DateTime) and isinstance(data, dict):
return DateTime.fromtimestamp(float(data[TIMESTAMP]), tz=tz)

if cls is datetime.timedelta and isinstance(data, (str, float)):
if classname == qualname(datetime.timedelta) and isinstance(data, (str, float)):
return datetime.timedelta(seconds=float(data))

if cls is datetime.date and isinstance(data, str):
if classname == qualname(datetime.date) and isinstance(data, str):
return datetime.date.fromisoformat(data)

raise TypeError(f"unknown date/time format {qualname(cls)}")
raise TypeError(f"unknown date/time format {classname}")
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,15 @@ def serialize(o: object) -> tuple[U, str, int, bool]:
return data, qualname(o), __version__, True


def deserialize(cls: type, version: int, data: dict):
def deserialize(classname: str, version: int, data: dict):
from deltalake.table import DeltaTable

from airflow.models.crypto import get_fernet

if version > __version__:
raise TypeError("serialized version is newer than class version")

if cls is DeltaTable:
if classname == qualname(DeltaTable):
fernet = get_fernet()
properties = {}
for k, v in data["storage_options"].items():
Expand All @@ -76,4 +76,4 @@ def deserialize(cls: type, version: int, data: dict):

return DeltaTable(data["table_uri"], version=data["version"], storage_options=storage_options)

raise TypeError(f"do not know how to deserialize {qualname(cls)}")
raise TypeError(f"do not know how to deserialize {classname}")
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def serialize(o: object) -> tuple[U, str, int, bool]:
return data, qualname(o), __version__, True


def deserialize(cls: type, version: int, data: dict):
def deserialize(classname: str, version: int, data: dict):
from pyiceberg.catalog import load_catalog
from pyiceberg.table import Table

Expand All @@ -64,7 +64,7 @@ def deserialize(cls: type, version: int, data: dict):
if version > __version__:
raise TypeError("serialized version is newer than class version")

if cls is Table:
if classname == qualname(Table):
fernet = get_fernet()
properties = {}
for k, v in data["catalog_properties"].items():
Expand All @@ -73,4 +73,4 @@ def deserialize(cls: type, version: int, data: dict):
catalog = load_catalog(data["identifier"][0], **properties)
return catalog.load_table((data["identifier"][1], data["identifier"][2]))

raise TypeError(f"do not know how to deserialize {qualname(cls)}")
raise TypeError(f"do not know how to deserialize {classname}")
10 changes: 4 additions & 6 deletions airflow-core/src/airflow/serialization/serializers/numpy.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,11 @@ def serialize(o: object) -> tuple[U, str, int, bool]:
return "", "", 0, False


def deserialize(cls: type, version: int, data: str) -> Any:
def deserialize(classname: str, version: int, data: str) -> Any:
if version > __version__:
raise TypeError("serialized version is newer than class version")

allowed_deserialize_classes = [import_string(classname) for classname in deserializers]
if classname not in deserializers:
raise TypeError(f"unsupported {classname} found for numpy deserialization")

if cls not in allowed_deserialize_classes:
raise TypeError(f"unsupported {qualname(cls)} found for numpy deserialization")

return cls(data)
return import_string(classname)(data)
13 changes: 4 additions & 9 deletions airflow-core/src/airflow/serialization/serializers/pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,22 +53,17 @@ def serialize(o: object) -> tuple[U, str, int, bool]:
return buf.getvalue().hex().decode("utf-8"), qualname(o), __version__, True


def deserialize(cls: type, version: int, data: object) -> pd.DataFrame:
def deserialize(classname: str, version: int, data: object) -> pd.DataFrame:
if version > __version__:
raise TypeError(f"serialized {version} of {qualname(cls)} > {__version__}")
raise TypeError(f"serialized {version} of {classname} > {__version__}")

import pandas as pd

if cls is not pd.DataFrame:
raise TypeError(f"do not know how to deserialize {qualname(cls)}")
from pyarrow import parquet as pq

if not isinstance(data, str):
raise TypeError(f"serialized {qualname(cls)} has wrong data type {type(data)}")
raise TypeError(f"serialized {classname} has wrong data type {type(data)}")

from io import BytesIO

from pyarrow import parquet as pq

with BytesIO(bytes.fromhex(data)) as buf:
df = pq.read_table(buf).to_pandas()

Expand Down
79 changes: 0 additions & 79 deletions airflow-core/src/airflow/serialization/serializers/pydantic.py

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,16 @@ def serialize(o: object) -> tuple[U, str, int, bool]:
return "", "", 0, False


def deserialize(cls: type, version: int, data: object) -> Any:
def deserialize(classname: str, version: int, data: object) -> Any:
from airflow.utils.timezone import parse_timezone

if not isinstance(data, (str, int)):
raise TypeError(f"{data} is not of type int or str but of {type(data)}")

if version > __version__:
raise TypeError(f"serialized {version} of {qualname(cls)} > {__version__}")
raise TypeError(f"serialized {version} of {classname} > {__version__}")

if qualname(cls) == "backports.zoneinfo.ZoneInfo" and isinstance(data, str):
if classname == "backports.zoneinfo.ZoneInfo" and isinstance(data, str):
from zoneinfo import ZoneInfo

return ZoneInfo(data)
Expand Down
32 changes: 0 additions & 32 deletions airflow-core/src/airflow/serialization/typing.py

This file was deleted.

Loading
Loading