From 45ab19db2750f78faff7656c638346eaf99a171f Mon Sep 17 00:00:00 2001 From: nailo2c Date: Sun, 1 Jun 2025 11:26:53 -0700 Subject: [PATCH 01/11] improve numpy serializer test coverage --- .../serialization/serializers/test_serializers.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/airflow-core/tests/unit/serialization/serializers/test_serializers.py b/airflow-core/tests/unit/serialization/serializers/test_serializers.py index 5b8d95f35388e..aff4a4be0d9e7 100644 --- a/airflow-core/tests/unit/serialization/serializers/test_serializers.py +++ b/airflow-core/tests/unit/serialization/serializers/test_serializers.py @@ -167,11 +167,25 @@ def test_encode_k8s_v1pod(self): } def test_numpy(self): + from airflow.serialization.serializers.numpy import deserialize, serialize + i = np.int16(10) e = serialize(i) d = deserialize(e) assert i == d + assert serialize(np.bool_(False)) == (True, "numpy.bool_", 1, True) + assert serialize(np.float32(3.14)) == (float(np.float32(3.14)), "numpy.float32", 1, True) + assert serialize(np.array([1, 2, 3])) == ("", "", 0, False) + + with pytest.raises(TypeError) as ctx: + deserialize("numpy.int32", 999, 123) + assert "serialized version is newer" in str(ctx.value) + + with pytest.raises(TypeError) as ctx: + deserialize("numpy.float32", 1, 123) + assert "unsupported numpy.float32" in str(ctx.value) + def test_params(self): i = ParamsDict({"x": Param(default="value", description="there is a value", key="test")}) e = serialize(i) From c56efe2df980cf36d9e3c43e5170b80a2d991113 Mon Sep 17 00:00:00 2001 From: nailo2c Date: Sun, 1 Jun 2025 14:20:45 -0700 Subject: [PATCH 02/11] improve pandas, timezone serializer test coverage --- .../serializers/test_serializers.py | 51 ++++++++++++++++++- 1 file changed, 49 insertions(+), 2 deletions(-) diff --git a/airflow-core/tests/unit/serialization/serializers/test_serializers.py b/airflow-core/tests/unit/serialization/serializers/test_serializers.py index aff4a4be0d9e7..8c57eb3c51286 
100644 --- a/airflow-core/tests/unit/serialization/serializers/test_serializers.py +++ b/airflow-core/tests/unit/serialization/serializers/test_serializers.py @@ -39,6 +39,15 @@ PENDULUM3 = version.parse(metadata.version("pendulum")).major == 3 +class CustomTZ(datetime.tzinfo): + name = "My/Custom" + + +class NoNameTZ(datetime.tzinfo): + def utcoffset(self, dt): + return datetime.timedelta(hours=2) + + @skip_if_force_lowest_dependencies_marker class TestSerializers: def test_datetime(self): @@ -167,13 +176,14 @@ def test_encode_k8s_v1pod(self): } def test_numpy(self): - from airflow.serialization.serializers.numpy import deserialize, serialize - i = np.int16(10) e = serialize(i) d = deserialize(e) assert i == d + def test_numpy_serializers(self): + from airflow.serialization.serializers.numpy import deserialize, serialize + assert serialize(np.bool_(False)) == (True, "numpy.bool_", 1, True) assert serialize(np.float32(3.14)) == (float(np.float32(3.14)), "numpy.float32", 1, True) assert serialize(np.array([1, 2, 3])) == ("", "", 0, False) @@ -200,6 +210,19 @@ def test_pandas(self): d = deserialize(e) assert i.equals(d) + def test_pandas_serializers(self): + from airflow.serialization.serializers.pandas import deserialize, serialize + + assert serialize(123) == ("", "", 0, False) + + with pytest.raises(TypeError) as ctx: + deserialize("pandas.core.frame.DataFrame", 999, "") + assert "serialized 999 of pandas.core.frame.DataFrame > 1" in str(ctx.value) + + with pytest.raises(TypeError) as ctx: + deserialize("pandas.core.frame.DataFrame", 1, 123) + assert "serialized pandas.core.frame.DataFrame has wrong data type " in str(ctx.value) + def test_iceberg(self): pytest.importorskip("pyiceberg", minversion="2.0.0") from pyiceberg.catalog import Catalog @@ -400,3 +423,27 @@ def test_pendulum_2_to_3(self, ser_value, expected): def test_pendulum_3_to_2(self, ser_value, expected): """Test deserialize objects in pendulum 2 which serialised in pendulum 3.""" assert 
deserialize(ser_value) == expected + + def test_timezone(self): + import pytz + + from airflow.serialization.serializers.timezone import _get_tzinfo_name, deserialize, serialize + + assert serialize(FixedTimezone(0)) == ("UTC", "pendulum.tz.timezone.FixedTimezone", 1, True) + assert serialize(NoNameTZ()) == ("", "", 0, False) + + with pytest.raises(TypeError) as ctx: + deserialize("pendulum.tz.timezone.FixedTimezone", 1, 1.23) + assert "is not of type int or str" in str(ctx.value) + + with pytest.raises(TypeError) as ctx: + deserialize("pendulum.tz.timezone.FixedTimezone", 999, "UTC") + assert "serialized 999 of pendulum.tz.timezone.FixedTimezone > 1" in str(ctx.value) + + zi = deserialize("backports.zoneinfo.ZoneInfo", 1, "Asia/Taipei") + assert isinstance(zi, ZoneInfo) + assert zi.key == "Asia/Taipei" + + assert _get_tzinfo_name(None) is None + assert _get_tzinfo_name(CustomTZ()) == "My/Custom" + assert _get_tzinfo_name(pytz.timezone("Asia/Taipei")) == "Asia/Taipei" From aa7b28bf326b4bccc7c6a0a0bb81d98a6edadae5 Mon Sep 17 00:00:00 2001 From: nailo2c Date: Sun, 1 Jun 2025 15:20:33 -0700 Subject: [PATCH 03/11] improve k8s serializer test coverage --- .../serialization/serializers/test_serializers.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/airflow-core/tests/unit/serialization/serializers/test_serializers.py b/airflow-core/tests/unit/serialization/serializers/test_serializers.py index 8c57eb3c51286..8663fcadb98bc 100644 --- a/airflow-core/tests/unit/serialization/serializers/test_serializers.py +++ b/airflow-core/tests/unit/serialization/serializers/test_serializers.py @@ -27,6 +27,7 @@ import pendulum.tz import pytest from dateutil.tz import tzutc +from kubernetes.client import models as k8s from packaging import version from pendulum import DateTime from pendulum.tz.timezone import FixedTimezone, Timezone @@ -154,8 +155,6 @@ def test_encode_decimal(self, expr, expected): assert deserialize(serialize(decimal.Decimal(expr))) 
== decimal.Decimal(expected) def test_encode_k8s_v1pod(self): - from kubernetes.client import models as k8s - pod = k8s.V1Pod( metadata=k8s.V1ObjectMeta( name="foo", @@ -276,6 +275,15 @@ def test_deltalake(selfa): assert i._storage_options == d._storage_options assert d._storage_options is None + def test_kubernetes(self, monkeypatch): + from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator + from airflow.serialization.serializers.kubernetes import serialize + + pod = k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="foo")) + monkeypatch.setattr(PodGenerator, "serialize_pod", lambda o: (_ for _ in ()).throw(Exception("fail"))) + assert serialize(pod) == ("", "", 0, False) + assert serialize(123) == ("", "", 0, False) + @pytest.mark.skipif(not PENDULUM3, reason="Test case for pendulum~=3") @pytest.mark.parametrize( "ser_value, expected", From 93cf8500b90cf777b249b9dd48be35610d6a5a43 Mon Sep 17 00:00:00 2001 From: nailo2c Date: Mon, 2 Jun 2025 10:53:17 -0700 Subject: [PATCH 04/11] improve serialized_objects test coverage --- .../serialization/test_serialized_objects.py | 54 +++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py b/airflow-core/tests/unit/serialization/test_serialized_objects.py index 6f287289673d0..232968c2f39e8 100644 --- a/airflow-core/tests/unit/serialization/test_serialized_objects.py +++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py @@ -49,6 +49,7 @@ from airflow.sdk.execution_time.context import OutletEventAccessor, OutletEventAccessors from airflow.serialization.enums import DagAttributeTypes as DAT, Encoding from airflow.serialization.serialized_objects import BaseSerialization, LazyDeserializedDAG, SerializedDAG +from airflow.timetables.base import DataInterval from airflow.triggers.base import BaseTrigger from airflow.utils import timezone from airflow.utils.db import LazySelectSequence @@ -510,3 +511,56 @@ def test_get_task_assets(): 
("c", asset1), ("d", asset1), ] + + +def test_get_run_data_interval(): + lazy_serialized_dag1 = LazyDeserializedDAG(data={"dag": {"dag_id": "dag1"}}) + with pytest.raises(ValueError, match="different DAGs"): + lazy_serialized_dag1.get_run_data_interval(DAG_RUN) + + lazy_serialized_dag2 = LazyDeserializedDAG(data={"dag": {"dag_id": "test_dag_id"}}) + with pytest.raises(ValueError, match="Cannot calculate data interval"): + lazy_serialized_dag2.get_run_data_interval(DAG_RUN) + + DAG_RUN.data_interval_start = datetime(2025, 1, 1, 0, 0) + DAG_RUN.data_interval_end = datetime(2025, 1, 2, 0, 0) + interval = lazy_serialized_dag2.get_run_data_interval(DAG_RUN) + assert isinstance(interval, DataInterval) + + +def test_hash_property(): + from airflow.models.serialized_dag import SerializedDagModel + + data = {"dag": {"dag_id": "dag1"}} + lazy_serialized_dag = LazyDeserializedDAG(data=data) + assert lazy_serialized_dag.hash == SerializedDagModel.hash(data) + + +def test_decode_asset_condition(): + from airflow.sdk.definitions.asset import AssetAlias, AssetAll, AssetAny, AssetRef + from airflow.serialization.serialized_objects import decode_asset_condition + + asset_dict = { + "__type": DAT.ASSET, + "name": "test_asset", + "uri": "test://asset-uri", + "group": "test-group", + "extra": {}, + } + assert isinstance(decode_asset_condition(asset_dict), Asset) + + asset_all_dict = {"__type": DAT.ASSET_ALL, "objects": [asset_dict, asset_dict]} + assert isinstance(decode_asset_condition(asset_all_dict), AssetAll) + + asset_any_dict = {"__type": DAT.ASSET_ANY, "objects": [asset_dict]} + assert isinstance(decode_asset_condition(asset_any_dict), AssetAny) + + asset_alias_dict = {"__type": DAT.ASSET_ALIAS, "name": "test_alias", "group": "test-alias-group"} + assert isinstance(decode_asset_condition(asset_alias_dict), AssetAlias) + + asset_ref_dict = {"__type": DAT.ASSET_REF, "name": "test_ref"} + assert isinstance(decode_asset_condition(asset_ref_dict), AssetRef) + + bad = {"__type": 
"UNKNOWN_TYPE"} + with pytest.raises(ValueError, match="deserialization not implemented for DAT 'UNKNOWN_TYPE'"): + decode_asset_condition(bad) From 868633b0af5426c4f6c8f9f23e3a29fe7f80c3c4 Mon Sep 17 00:00:00 2001 From: nailo2c Date: Wed, 4 Jun 2025 19:52:47 -0700 Subject: [PATCH 05/11] improve serialized_objects test coverage #2 --- .../serializers/test_serializers.py | 81 ++++++++++++++++++- .../serialization/test_serialized_objects.py | 63 ++++++++++++++- 2 files changed, 142 insertions(+), 2 deletions(-) diff --git a/airflow-core/tests/unit/serialization/serializers/test_serializers.py b/airflow-core/tests/unit/serialization/serializers/test_serializers.py index 8663fcadb98bc..b8e0caabac5ab 100644 --- a/airflow-core/tests/unit/serialization/serializers/test_serializers.py +++ b/airflow-core/tests/unit/serialization/serializers/test_serializers.py @@ -18,6 +18,8 @@ import datetime import decimal +import sys +from decimal import Decimal from importlib import metadata from unittest.mock import patch from zoneinfo import ZoneInfo @@ -34,6 +36,7 @@ from airflow.sdk.definitions.param import Param, ParamsDict from airflow.serialization.serde import DATA, deserialize, serialize +from airflow.utils.module_loading import qualname from tests_common.test_utils.markers import skip_if_force_lowest_dependencies_marker @@ -174,6 +177,19 @@ def test_encode_k8s_v1pod(self): "spec": {"containers": [{"image": "bar", "name": "foo"}]}, } + def test_bignum(self): + from airflow.serialization.serializers.bignum import deserialize, serialize + + assert serialize(12345) == ("", "", 0, False) + + with pytest.raises(TypeError) as ctx: + deserialize(qualname(Decimal), 999, "0") + assert f"serialized 999 of {qualname(Decimal)}" in str(ctx.value) + + with pytest.raises(TypeError) as ctx: + deserialize("wrong.ClassName", 1, "0") + assert f"wrong.ClassName != {qualname(Decimal)}" in str(ctx.value) + def test_numpy(self): i = np.int16(10) e = serialize(i) @@ -248,7 +264,7 @@ def 
test_iceberg(self): mock_load_catalog.assert_called_with("catalog", uri=uri) mock_load_table.assert_called_with((identifier[1], identifier[2])) - def test_deltalake(selfa): + def test_deltalake(self): deltalake = pytest.importorskip("deltalake") with ( @@ -275,6 +291,19 @@ def test_deltalake(selfa): assert i._storage_options == d._storage_options assert d._storage_options is None + def test_deltalake_serialize_deserialize(self): + from airflow.serialization.serializers.deltalake import deserialize, serialize + + assert serialize(object()) == ("", "", 0, False) + + with pytest.raises(TypeError) as e: + deserialize("deltalake.table.DeltaTable", 999, {}) + assert "serialized version is newer than class version" in str(e.value) + + with pytest.raises(TypeError) as e2: + deserialize("not_a_real_class", 1, {}) + assert "do not know how to deserialize" in str(e2.value) + def test_kubernetes(self, monkeypatch): from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator from airflow.serialization.serializers.kubernetes import serialize @@ -455,3 +484,53 @@ def test_timezone(self): assert _get_tzinfo_name(None) is None assert _get_tzinfo_name(CustomTZ()) == "My/Custom" assert _get_tzinfo_name(pytz.timezone("Asia/Taipei")) == "Asia/Taipei" + + def test_json_schema(self, monkeypatch): + from airflow.exceptions import AirflowException + from airflow.serialization.json_schema import load_dag_schema_dict + + monkeypatch.setattr( + "airflow.serialization.json_schema.pkgutil.get_data", lambda __name__, fname: None + ) + + with pytest.raises(AirflowException) as ctx: + load_dag_schema_dict() + assert "Schema file schema.json does not exists" in str(ctx.value) + + def test_builtin(self): + from airflow.serialization.serializers import builtin + + result = builtin.deserialize(qualname(frozenset), 1, [13, 14]) + assert isinstance(result, frozenset) + assert result == frozenset([13, 14]) + + with pytest.raises(TypeError) as ctx: + builtin.deserialize(qualname(tuple), 999, 
[1, 2]) + assert "serialized version is newer than class version" in str(ctx.value) + + with pytest.raises(TypeError) as ctx: + builtin.deserialize("builtins.list", 1, [1, 2]) + assert "do not know how to deserialize" in str(ctx.value) + + with pytest.raises(TypeError) as ctx: + builtin.stringify("builtins.list", 1, [1, 2]) + assert "do not know how to stringify" in str(ctx.value) + + def test_serde(self): + from airflow.serialization.serde import CLASSNAME, DATA, VERSION, _stringify, decode, deserialize + + with pytest.raises(ValueError, match="cannot decode"): + decode({"__classname__": 123, "__version__": 1, "__data__": {}}) + + with pytest.raises(RecursionError, match="maximum recursion depth reached for serialization"): + serialize(object(), depth=sys.getrecursionlimit() - 1) + + fake = {"a": 1, "b": 2, "__version__": 1} + result = deserialize(fake, type_hint=dict, full=False) + assert result == "builtins.dict@version=0(a=1,b=2,__version__=1)" + + with pytest.raises(TypeError, match="classname cannot be empty"): + deserialize({CLASSNAME: "", VERSION: 1, DATA: {}}) + + assert _stringify("dummy", 1, 123) == "dummy@version=1(123)" + assert _stringify("dummy", 1, [1]) == "dummy@version=1([,1,])" diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py b/airflow-core/tests/unit/serialization/test_serialized_objects.py index 232968c2f39e8..4a5cac28500ee 100644 --- a/airflow-core/tests/unit/serialization/test_serialized_objects.py +++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py @@ -25,8 +25,9 @@ import pytest from dateutil import relativedelta from kubernetes.client import models as k8s -from pendulum.tz.timezone import Timezone +from pendulum.tz.timezone import FixedTimezone, Timezone +from airflow.callbacks.callback_requests import DagCallbackRequest, TaskCallbackRequest from airflow.exceptions import ( AirflowException, AirflowFailException, @@ -37,6 +38,7 @@ from airflow.models.connection import Connection from 
airflow.models.dag import DAG from airflow.models.dagrun import DagRun +from airflow.models.deadline import DeadlineAlert, DeadlineReference from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance from airflow.models.xcom_arg import XComArg from airflow.providers.standard.operators.bash import BashOperator @@ -117,6 +119,26 @@ class Test: BaseSerialization.serialize(obj, strict=True) # now raises +def test_validate_schema(): + from airflow.serialization.serialized_objects import BaseSerialization + + with pytest.raises(AirflowException, match="BaseSerialization is not set"): + BaseSerialization.validate_schema({"any": "thing"}) + + BaseSerialization._json_schema = object() + with pytest.raises(TypeError, match="Invalid type: Only dict and str are supported"): + BaseSerialization.validate_schema(123) + + class Test: + def validate(self, obj): + self.obj = obj + + t = Test() + BaseSerialization._json_schema = t + BaseSerialization.validate_schema('{"foo": "bar"}') + assert t.obj == {"foo": "bar"} + + TI = TaskInstance( task=EmptyOperator(task_id="test-task"), run_id="fake_run", @@ -279,6 +301,37 @@ def __len__(self) -> int: DAT.CONNECTION, lambda a, b: a.get_uri() == b.get_uri(), ), + ( + TaskCallbackRequest( + filepath="filepath", + ti=TI, + bundle_name="testing", + bundle_version=None, + ), + DAT.TASK_CALLBACK_REQUEST, + lambda a, b: a.ti == b.ti, + ), + ( + DagCallbackRequest( + filepath="filepath", + dag_id="fake_dag", + run_id="fake_run", + bundle_name="testing", + bundle_version=None, + ), + DAT.DAG_CALLBACK_REQUEST, + lambda a, b: a.dag_id == b.dag_id, + ), + (Asset.ref(name="test"), DAT.ASSET_REF, lambda a, b: a.name == b.name), + ( + DeadlineAlert( + reference=DeadlineReference.DAGRUN_LOGICAL_DATE, + interval=timedelta(), + callback="fake_callable", + ), + None, + None, + ), ( create_outlet_event_accessors( Asset(uri="test", name="test", group="test-group"), {"key": "value"}, [] @@ -564,3 +617,11 @@ def test_decode_asset_condition(): bad = 
{"__type": "UNKNOWN_TYPE"} with pytest.raises(ValueError, match="deserialization not implemented for DAT 'UNKNOWN_TYPE'"): decode_asset_condition(bad) + + +def test_encode_timezone(): + from airflow.serialization.serialized_objects import encode_timezone + + assert encode_timezone(FixedTimezone(0)) == "UTC" + with pytest.raises(ValueError): + encode_timezone(object()) From 4118e3f6f4ce48cec84b641a235a855dffd23616 Mon Sep 17 00:00:00 2001 From: nailo2c Date: Wed, 4 Jun 2025 21:54:32 -0700 Subject: [PATCH 06/11] fix DeadlineReference import path issue --- .../tests/unit/serialization/test_serialized_objects.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py b/airflow-core/tests/unit/serialization/test_serialized_objects.py index 4a5cac28500ee..ad4e24eb466d1 100644 --- a/airflow-core/tests/unit/serialization/test_serialized_objects.py +++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py @@ -20,6 +20,7 @@ import json from collections.abc import Iterator from datetime import datetime, timedelta +from typing import cast import pendulum import pytest @@ -38,7 +39,7 @@ from airflow.models.connection import Connection from airflow.models.dag import DAG from airflow.models.dagrun import DagRun -from airflow.models.deadline import DeadlineAlert, DeadlineReference +from airflow.models.deadline import DeadlineAlert from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance from airflow.models.xcom_arg import XComArg from airflow.providers.standard.operators.bash import BashOperator @@ -46,6 +47,7 @@ from airflow.providers.standard.operators.python import PythonOperator from airflow.providers.standard.triggers.file import FileDeleteTrigger from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetAliasEvent, AssetUniqueKey, AssetWatcher +from airflow.sdk.definitions.deadline import DeadlineReference from airflow.sdk.definitions.decorators import task 
from airflow.sdk.definitions.param import Param from airflow.sdk.execution_time.context import OutletEventAccessor, OutletEventAccessors @@ -325,7 +327,7 @@ def __len__(self) -> int: (Asset.ref(name="test"), DAT.ASSET_REF, lambda a, b: a.name == b.name), ( DeadlineAlert( - reference=DeadlineReference.DAGRUN_LOGICAL_DATE, + reference=cast("DeadlineReference", DeadlineReference.DAGRUN_LOGICAL_DATE), interval=timedelta(), callback="fake_callable", ), From 257c62bffd0b6aeba3d33cfc99c53eed374588ff Mon Sep 17 00:00:00 2001 From: nailo2c Date: Sun, 8 Jun 2025 15:35:31 -0700 Subject: [PATCH 07/11] import DeadlineAlert from airflow.sdk.definitions.deadline --- .../tests/unit/serialization/test_serialized_objects.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py b/airflow-core/tests/unit/serialization/test_serialized_objects.py index ad4e24eb466d1..c7578a50dca27 100644 --- a/airflow-core/tests/unit/serialization/test_serialized_objects.py +++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py @@ -20,7 +20,6 @@ import json from collections.abc import Iterator from datetime import datetime, timedelta -from typing import cast import pendulum import pytest @@ -39,7 +38,6 @@ from airflow.models.connection import Connection from airflow.models.dag import DAG from airflow.models.dagrun import DagRun -from airflow.models.deadline import DeadlineAlert from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance from airflow.models.xcom_arg import XComArg from airflow.providers.standard.operators.bash import BashOperator @@ -47,7 +45,7 @@ from airflow.providers.standard.operators.python import PythonOperator from airflow.providers.standard.triggers.file import FileDeleteTrigger from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetAliasEvent, AssetUniqueKey, AssetWatcher -from airflow.sdk.definitions.deadline import DeadlineReference +from 
airflow.sdk.definitions.deadline import DeadlineAlert, DeadlineReference from airflow.sdk.definitions.decorators import task from airflow.sdk.definitions.param import Param from airflow.sdk.execution_time.context import OutletEventAccessor, OutletEventAccessors @@ -327,7 +325,7 @@ def __len__(self) -> int: (Asset.ref(name="test"), DAT.ASSET_REF, lambda a, b: a.name == b.name), ( DeadlineAlert( - reference=cast("DeadlineReference", DeadlineReference.DAGRUN_LOGICAL_DATE), + reference=DeadlineReference.DAGRUN_LOGICAL_DATE, interval=timedelta(), callback="fake_callable", ), From a947ec8865cc3717db26facb76ae7e7e1c5a9b0b Mon Sep 17 00:00:00 2001 From: nailo2c Date: Sun, 8 Jun 2025 21:44:05 -0700 Subject: [PATCH 08/11] refactor test_serializers.py --- .../serializers/test_serializers.py | 211 ++++++++++++------ 1 file changed, 144 insertions(+), 67 deletions(-) diff --git a/airflow-core/tests/unit/serialization/serializers/test_serializers.py b/airflow-core/tests/unit/serialization/serializers/test_serializers.py index b8e0caabac5ab..8fb7e256940dc 100644 --- a/airflow-core/tests/unit/serialization/serializers/test_serializers.py +++ b/airflow-core/tests/unit/serialization/serializers/test_serializers.py @@ -35,7 +35,8 @@ from pendulum.tz.timezone import FixedTimezone, Timezone from airflow.sdk.definitions.param import Param, ParamsDict -from airflow.serialization.serde import DATA, deserialize, serialize +from airflow.serialization.serde import CLASSNAME, DATA, VERSION, _stringify, decode, deserialize, serialize +from airflow.serialization.serializers import builtin from airflow.utils.module_loading import qualname from tests_common.test_utils.markers import skip_if_force_lowest_dependencies_marker @@ -46,6 +47,15 @@ class CustomTZ(datetime.tzinfo): name = "My/Custom" + def utcoffset(self, dt: datetime.datetime | None) -> datetime.timedelta: + return datetime.timedelta(hours=2) + + def dst(self, dt: datetime.datetime | None) -> datetime.timedelta | None: + return 
datetime.timedelta(0) + + def tzname(self, dt: datetime.datetime | None) -> str | None: + return self.name + class NoNameTZ(datetime.tzinfo): def utcoffset(self, dt): @@ -178,17 +188,32 @@ def test_encode_k8s_v1pod(self): } def test_bignum(self): - from airflow.serialization.serializers.bignum import deserialize, serialize + from airflow.serialization.serializers.bignum import serialize assert serialize(12345) == ("", "", 0, False) - with pytest.raises(TypeError) as ctx: - deserialize(qualname(Decimal), 999, "0") - assert f"serialized 999 of {qualname(Decimal)}" in str(ctx.value) + @pytest.mark.parametrize( + ("klass", "version", "payload", "msg"), + [ + ( + qualname(Decimal), + 999, + "0", + rf"serialized 999 of {qualname(Decimal)}", # newer version + ), + ( + "wrong.ClassName", + 1, + "0", + r"wrong\.ClassName != .*Decimal", # wrong classname + ), + ], + ) + def test_bignum_deserialize_errors(self, klass, version, payload, msg): + from airflow.serialization.serializers.bignum import deserialize - with pytest.raises(TypeError) as ctx: - deserialize("wrong.ClassName", 1, "0") - assert f"wrong.ClassName != {qualname(Decimal)}" in str(ctx.value) + with pytest.raises(TypeError, match=msg): + deserialize(klass, version, payload) def test_numpy(self): i = np.int16(10) @@ -197,19 +222,24 @@ def test_numpy(self): assert i == d def test_numpy_serializers(self): - from airflow.serialization.serializers.numpy import deserialize, serialize + from airflow.serialization.serializers.numpy import serialize assert serialize(np.bool_(False)) == (True, "numpy.bool_", 1, True) assert serialize(np.float32(3.14)) == (float(np.float32(3.14)), "numpy.float32", 1, True) assert serialize(np.array([1, 2, 3])) == ("", "", 0, False) - with pytest.raises(TypeError) as ctx: - deserialize("numpy.int32", 999, 123) - assert "serialized version is newer" in str(ctx.value) + @pytest.mark.parametrize( + ("klass", "ver", "value", "msg"), + [ + ("numpy.int32", 999, 123, r"serialized version is newer"), 
+ ("numpy.float32", 1, 123, r"unsupported numpy\.float32"), + ], + ) + def test_numpy_deserialize_errors(self, klass, ver, value, msg): + from airflow.serialization.serializers.numpy import deserialize - with pytest.raises(TypeError) as ctx: - deserialize("numpy.float32", 1, 123) - assert "unsupported numpy.float32" in str(ctx.value) + with pytest.raises(TypeError, match=msg): + deserialize(klass, ver, value) def test_params(self): i = ParamsDict({"x": Param(default="value", description="there is a value", key="test")}) @@ -226,17 +256,22 @@ def test_pandas(self): assert i.equals(d) def test_pandas_serializers(self): - from airflow.serialization.serializers.pandas import deserialize, serialize + from airflow.serialization.serializers.pandas import serialize assert serialize(123) == ("", "", 0, False) - with pytest.raises(TypeError) as ctx: - deserialize("pandas.core.frame.DataFrame", 999, "") - assert "serialized 999 of pandas.core.frame.DataFrame > 1" in str(ctx.value) + @pytest.mark.parametrize( + ("version", "data", "msg"), + [ + (999, "", r"serialized 999 .* > 1"), # version too new + (1, 123, r"wrong data type .*"), # bad payload type + ], + ) + def test_pandas_deserialize_errors(self, version, data, msg): + from airflow.serialization.serializers.pandas import deserialize - with pytest.raises(TypeError) as ctx: - deserialize("pandas.core.frame.DataFrame", 1, 123) - assert "serialized pandas.core.frame.DataFrame has wrong data type " in str(ctx.value) + with pytest.raises(TypeError, match=msg): + deserialize("pandas.core.frame.DataFrame", version, data) def test_iceberg(self): pytest.importorskip("pyiceberg", minversion="2.0.0") @@ -292,17 +327,32 @@ def test_deltalake(self): assert d._storage_options is None def test_deltalake_serialize_deserialize(self): - from airflow.serialization.serializers.deltalake import deserialize, serialize + from airflow.serialization.serializers.deltalake import serialize assert serialize(object()) == ("", "", 0, False) - with 
pytest.raises(TypeError) as e: - deserialize("deltalake.table.DeltaTable", 999, {}) - assert "serialized version is newer than class version" in str(e.value) + @pytest.mark.parametrize( + ("klass", "version", "payload", "msg"), + [ + ( + "deltalake.table.DeltaTable", + 999, + {}, + r"serialized version is newer than class version", + ), + ( + "not_a_real_class", + 1, + {}, + r"do not know how to deserialize", + ), + ], + ) + def test_deltalake_deserialize_errors(self, klass, version, payload, msg): + from airflow.serialization.serializers.deltalake import deserialize - with pytest.raises(TypeError) as e2: - deserialize("not_a_real_class", 1, {}) - assert "do not know how to deserialize" in str(e2.value) + with pytest.raises(TypeError, match=msg): + deserialize(klass, version, payload) def test_kubernetes(self, monkeypatch): from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator @@ -462,28 +512,43 @@ def test_pendulum_3_to_2(self, ser_value, expected): assert deserialize(ser_value) == expected def test_timezone(self): - import pytz - - from airflow.serialization.serializers.timezone import _get_tzinfo_name, deserialize, serialize + from airflow.serialization.serializers.timezone import serialize assert serialize(FixedTimezone(0)) == ("UTC", "pendulum.tz.timezone.FixedTimezone", 1, True) assert serialize(NoNameTZ()) == ("", "", 0, False) - with pytest.raises(TypeError) as ctx: - deserialize("pendulum.tz.timezone.FixedTimezone", 1, 1.23) - assert "is not of type int or str" in str(ctx.value) - - with pytest.raises(TypeError) as ctx: - deserialize("pendulum.tz.timezone.FixedTimezone", 999, "UTC") - assert "serialized 999 of pendulum.tz.timezone.FixedTimezone > 1" in str(ctx.value) + def test_timezone_deserialize_zoneinfo(self): + from airflow.serialization.serializers.timezone import deserialize zi = deserialize("backports.zoneinfo.ZoneInfo", 1, "Asia/Taipei") assert isinstance(zi, ZoneInfo) assert zi.key == "Asia/Taipei" - assert 
_get_tzinfo_name(None) is None - assert _get_tzinfo_name(CustomTZ()) == "My/Custom" - assert _get_tzinfo_name(pytz.timezone("Asia/Taipei")) == "Asia/Taipei" + @pytest.mark.parametrize( + "klass, version, data, msg", + [ + ("pendulum.tz.timezone.FixedTimezone", 1, 1.23, "is not of type int or str"), + ("pendulum.tz.timezone.FixedTimezone", 999, "UTC", "serialized 999 .* > 1"), + ], + ) + def test_timezone_deserialize_errors(self, klass, version, data, msg): + from airflow.serialization.serializers.timezone import deserialize + + with pytest.raises(TypeError, match=msg): + deserialize(klass, version, data) + + @pytest.mark.parametrize( + "tz_obj, expected", + [ + (None, None), + (CustomTZ(), "My/Custom"), + (ZoneInfo("Asia/Taipei"), "Asia/Taipei"), + ], + ) + def test_timezone_get_tzinfo_name(self, tz_obj, expected): + from airflow.serialization.serializers.timezone import _get_tzinfo_name + + assert _get_tzinfo_name(tz_obj) == expected def test_json_schema(self, monkeypatch): from airflow.exceptions import AirflowException @@ -497,40 +562,52 @@ def test_json_schema(self, monkeypatch): load_dag_schema_dict() assert "Schema file schema.json does not exists" in str(ctx.value) - def test_builtin(self): - from airflow.serialization.serializers import builtin - - result = builtin.deserialize(qualname(frozenset), 1, [13, 14]) - assert isinstance(result, frozenset) - assert result == frozenset([13, 14]) + def test_builtin_deserialize_frozenset(self): + res = builtin.deserialize(qualname(frozenset), 1, [13, 14]) + assert isinstance(res, frozenset) + assert res == frozenset({13, 14}) - with pytest.raises(TypeError) as ctx: + def test_builtin_deserialize_version_too_new(self): + with pytest.raises(TypeError, match="serialized version is newer than class version"): builtin.deserialize(qualname(tuple), 999, [1, 2]) - assert "serialized version is newer than class version" in str(ctx.value) - - with pytest.raises(TypeError) as ctx: - builtin.deserialize("builtins.list", 1, [1, 
2]) - assert "do not know how to deserialize" in str(ctx.value) - with pytest.raises(TypeError) as ctx: - builtin.stringify("builtins.list", 1, [1, 2]) - assert "do not know how to stringify" in str(ctx.value) - - def test_serde(self): - from airflow.serialization.serde import CLASSNAME, DATA, VERSION, _stringify, decode, deserialize + @pytest.mark.parametrize( + "func, msg", + [ + (builtin.deserialize, r"do not know how to deserialize"), + (builtin.stringify, r"do not know how to stringify"), + ], + ) + def test_builtin_unknown_type_errors(self, func, msg): + with pytest.raises(TypeError, match=msg): + func("builtins.list", 1, [1, 2]) + def test_serde_decode_type_error(self): + bad = {CLASSNAME: 123, VERSION: 1, DATA: {}} with pytest.raises(ValueError, match="cannot decode"): - decode({"__classname__": 123, "__version__": 1, "__data__": {}}) + decode(bad) + def test_serde_serialize_recursion_limit(self): + depth = sys.getrecursionlimit() - 1 with pytest.raises(RecursionError, match="maximum recursion depth reached for serialization"): - serialize(object(), depth=sys.getrecursionlimit() - 1) + serialize(object(), depth=depth) + def test_serde_deserialize_with_type_hint_stringified(self): fake = {"a": 1, "b": 2, "__version__": 1} - result = deserialize(fake, type_hint=dict, full=False) - assert result == "builtins.dict@version=0(a=1,b=2,__version__=1)" + got = deserialize(fake, type_hint=dict, full=False) + assert got == "builtins.dict@version=0(a=1,b=2,__version__=1)" + def test_serde_deserialize_empty_classname(self): + bad = {CLASSNAME: "", VERSION: 1, DATA: {}} with pytest.raises(TypeError, match="classname cannot be empty"): - deserialize({CLASSNAME: "", VERSION: 1, DATA: {}}) + deserialize(bad) - assert _stringify("dummy", 1, 123) == "dummy@version=1(123)" - assert _stringify("dummy", 1, [1]) == "dummy@version=1([,1,])" + @pytest.mark.parametrize( + "value, expected", + [ + (123, "dummy@version=1(123)"), + ([1], "dummy@version=1([,1,])"), + ], + ) + def 
test_serde_stringify_primitives(self, value, expected): + assert _stringify("dummy", 1, value) == expected From 0030387658fbd777b15ea18dce13795fd97c240d Mon Sep 17 00:00:00 2001 From: nailo2c Date: Mon, 9 Jun 2025 21:51:46 -0700 Subject: [PATCH 09/11] refactor test_serialized_objects.py --- .../serialization/test_serialized_objects.py | 138 ++++++++++++++---- 1 file changed, 106 insertions(+), 32 deletions(-) diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py b/airflow-core/tests/unit/serialization/test_serialized_objects.py index c7578a50dca27..90f7dc1ec11ab 100644 --- a/airflow-core/tests/unit/serialization/test_serialized_objects.py +++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py @@ -44,7 +44,16 @@ from airflow.providers.standard.operators.empty import EmptyOperator from airflow.providers.standard.operators.python import PythonOperator from airflow.providers.standard.triggers.file import FileDeleteTrigger -from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetAliasEvent, AssetUniqueKey, AssetWatcher +from airflow.sdk.definitions.asset import ( + Asset, + AssetAlias, + AssetAliasEvent, + AssetAll, + AssetAny, + AssetRef, + AssetUniqueKey, + AssetWatcher, +) from airflow.sdk.definitions.deadline import DeadlineAlert, DeadlineReference from airflow.sdk.definitions.decorators import task from airflow.sdk.definitions.param import Param @@ -129,6 +138,10 @@ def test_validate_schema(): with pytest.raises(TypeError, match="Invalid type: Only dict and str are supported"): BaseSerialization.validate_schema(123) + +def test_serde_validate_schema_valid_json(): + from airflow.serialization.serialized_objects import BaseSerialization + class Test: def validate(self, obj): self.obj = obj @@ -566,18 +579,28 @@ def test_get_task_assets(): ] -def test_get_run_data_interval(): - lazy_serialized_dag1 = LazyDeserializedDAG(data={"dag": {"dag_id": "dag1"}}) +def test_lazy_dag_run_interval_wrong_dag(self): + lazy =
LazyDeserializedDAG(data={"dag": {"dag_id": "dag1"}}) + with pytest.raises(ValueError, match="different DAGs"): - lazy_serialized_dag1.get_run_data_interval(DAG_RUN) + lazy.get_run_data_interval(DAG_RUN) + + +def test_lazy_dag_run_interval_missing_interval(self): + lazy = LazyDeserializedDAG(data={"dag": {"dag_id": "test_dag_id"}}) - lazy_serialized_dag2 = LazyDeserializedDAG(data={"dag": {"dag_id": "test_dag_id"}}) with pytest.raises(ValueError, match="Cannot calculate data interval"): - lazy_serialized_dag2.get_run_data_interval(DAG_RUN) + lazy.get_run_data_interval(DAG_RUN) + + +def test_lazy_dag_run_interval_success(self): + run = DAG_RUN + run.data_interval_start = datetime(2025, 1, 1) + run.data_interval_end = datetime(2025, 1, 2) + + lazy = LazyDeserializedDAG(data={"dag": {"dag_id": "test_dag_id"}}) + interval = lazy.get_run_data_interval(run) - DAG_RUN.data_interval_start = datetime(2025, 1, 1, 0, 0) - DAG_RUN.data_interval_end = datetime(2025, 1, 2, 0, 0) - interval = lazy_serialized_dag2.get_run_data_interval(DAG_RUN) assert isinstance(interval, DataInterval) @@ -589,34 +612,85 @@ def test_hash_property(): assert lazy_serialized_dag.hash == SerializedDagModel.hash(data) -def test_decode_asset_condition(): - from airflow.sdk.definitions.asset import AssetAlias, AssetAll, AssetAny, AssetRef +@pytest.mark.parametrize( + "payload, expected_cls", + [ + pytest.param( + { + "__type": DAT.ASSET, + "name": "test_asset", + "uri": "test://asset-uri", + "group": "test-group", + "extra": {}, + }, + Asset, + id="asset", + ), + pytest.param( + { + "__type": DAT.ASSET_ALL, + "objects": [ + { + "__type": DAT.ASSET, + "name": "x", + "uri": "test://x", + "group": "g", + "extra": {}, + }, + { + "__type": DAT.ASSET, + "name": "x", + "uri": "test://x", + "group": "g", + "extra": {}, + }, + ], + }, + AssetAll, + id="asset_all", + ), + pytest.param( + { + "__type": DAT.ASSET_ANY, + "objects": [ + { + "__type": DAT.ASSET, + "name": "y", + "uri": "test://y", + "group": "g", + 
"extra": {}, + } + ], + }, + AssetAny, + id="asset_any", + ), + pytest.param( + {"__type": DAT.ASSET_ALIAS, "name": "alias", "group": "g"}, + AssetAlias, + id="asset_alias", + ), + pytest.param( + {"__type": DAT.ASSET_REF, "name": "ref"}, + AssetRef, + id="asset_ref", + ), + ], +) +def test_serde_decode_asset_condition_success(self, payload, expected_cls): from airflow.serialization.serialized_objects import decode_asset_condition - asset_dict = { - "__type": DAT.ASSET, - "name": "test_asset", - "uri": "test://asset-uri", - "group": "test-group", - "extra": {}, - } - assert isinstance(decode_asset_condition(asset_dict), Asset) + assert isinstance(decode_asset_condition(payload), expected_cls) - asset_all_dict = {"__type": DAT.ASSET_ALL, "objects": [asset_dict, asset_dict]} - assert isinstance(decode_asset_condition(asset_all_dict), AssetAll) - asset_any_dict = {"__type": DAT.ASSET_ANY, "objects": [asset_dict]} - assert isinstance(decode_asset_condition(asset_any_dict), AssetAny) - - asset_alias_dict = {"__type": DAT.ASSET_ALIAS, "name": "test_alias", "group": "test-alias-group"} - assert isinstance(decode_asset_condition(asset_alias_dict), AssetAlias) - - asset_ref_dict = {"__type": DAT.ASSET_REF, "name": "test_ref"} - assert isinstance(decode_asset_condition(asset_ref_dict), AssetRef) +def test_serde_decode_asset_condition_unknown_type(self): + from airflow.serialization.serialized_objects import decode_asset_condition - bad = {"__type": "UNKNOWN_TYPE"} - with pytest.raises(ValueError, match="deserialization not implemented for DAT 'UNKNOWN_TYPE'"): - decode_asset_condition(bad) + with pytest.raises( + ValueError, + match="deserialization not implemented for DAT 'UNKNOWN_TYPE'", + ): + decode_asset_condition({"__type": "UNKNOWN_TYPE"}) def test_encode_timezone(): From 8baaf2ff5702f723621de9674765a9f29b3ad067 Mon Sep 17 00:00:00 2001 From: nailo2c Date: Mon, 9 Jun 2025 22:26:40 -0700 Subject: [PATCH 10/11] fix the 'self' not found issue --- 
.../unit/serialization/test_serialized_objects.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py b/airflow-core/tests/unit/serialization/test_serialized_objects.py index 90f7dc1ec11ab..bf525a46f5222 100644 --- a/airflow-core/tests/unit/serialization/test_serialized_objects.py +++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py @@ -579,21 +579,21 @@ def test_get_task_assets(): ] -def test_lazy_dag_run_interval_wrong_dag(self): +def test_lazy_dag_run_interval_wrong_dag(): lazy = LazyDeserializedDAG(data={"dag": {"dag_id": "dag1"}}) with pytest.raises(ValueError, match="different DAGs"): lazy.get_run_data_interval(DAG_RUN) -def test_lazy_dag_run_interval_missing_interval(self): +def test_lazy_dag_run_interval_missing_interval(): lazy = LazyDeserializedDAG(data={"dag": {"dag_id": "test_dag_id"}}) with pytest.raises(ValueError, match="Cannot calculate data interval"): lazy.get_run_data_interval(DAG_RUN) -def test_lazy_dag_run_interval_success(self): +def test_lazy_dag_run_interval_success(): run = DAG_RUN run.data_interval_start = datetime(2025, 1, 1) run.data_interval_end = datetime(2025, 1, 2) @@ -677,13 +677,13 @@ def test_hash_property(): ), ], ) -def test_serde_decode_asset_condition_success(self, payload, expected_cls): +def test_serde_decode_asset_condition_success(payload, expected_cls): from airflow.serialization.serialized_objects import decode_asset_condition assert isinstance(decode_asset_condition(payload), expected_cls) -def test_serde_decode_asset_condition_unknown_type(self): +def test_serde_decode_asset_condition_unknown_type(): from airflow.serialization.serialized_objects import decode_asset_condition with pytest.raises( From 106325aa78e3700234311bf4be35af90577293a5 Mon Sep 17 00:00:00 2001 From: nailo2c Date: Thu, 12 Jun 2025 10:26:15 -0700 Subject: [PATCH 11/11] rename test functions for clearer scope --- 
.../serializers/test_serializers.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/airflow-core/tests/unit/serialization/serializers/test_serializers.py b/airflow-core/tests/unit/serialization/serializers/test_serializers.py index 8fb7e256940dc..58cf0ba3cd4ee 100644 --- a/airflow-core/tests/unit/serialization/serializers/test_serializers.py +++ b/airflow-core/tests/unit/serialization/serializers/test_serializers.py @@ -19,7 +19,6 @@ import datetime import decimal import sys -from decimal import Decimal from importlib import metadata from unittest.mock import patch from zoneinfo import ZoneInfo @@ -187,7 +186,7 @@ def test_encode_k8s_v1pod(self): "spec": {"containers": [{"image": "bar", "name": "foo"}]}, } - def test_bignum(self): + def test_bignum_serialize_non_decimal(self): from airflow.serialization.serializers.bignum import serialize assert serialize(12345) == ("", "", 0, False) @@ -196,10 +195,10 @@ def test_bignum(self): ("klass", "version", "payload", "msg"), [ ( - qualname(Decimal), + "decimal.Decimal", 999, "0", - rf"serialized 999 of {qualname(Decimal)}", # newer version + r"serialized 999 of decimal\.Decimal", # newer version ), ( "wrong.ClassName", @@ -354,7 +353,7 @@ def test_deltalake_deserialize_errors(self, klass, version, payload, msg): with pytest.raises(TypeError, match=msg): deserialize(klass, version, payload) - def test_kubernetes(self, monkeypatch): + def test_kubernetes_serializer(self, monkeypatch): from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator from airflow.serialization.serializers.kubernetes import serialize @@ -511,10 +510,14 @@ def test_pendulum_3_to_2(self, ser_value, expected): """Test deserialize objects in pendulum 2 which serialised in pendulum 3.""" assert deserialize(ser_value) == expected - def test_timezone(self): + def test_timezone_serialize_fixed(self): from airflow.serialization.serializers.timezone import serialize assert serialize(FixedTimezone(0)) == 
("UTC", "pendulum.tz.timezone.FixedTimezone", 1, True) + + def test_timezone_serialize_no_name(self): + from airflow.serialization.serializers.timezone import serialize + assert serialize(NoNameTZ()) == ("", "", 0, False) def test_timezone_deserialize_zoneinfo(self): @@ -550,7 +553,7 @@ def test_timezone_get_tzinfo_name(self, tz_obj, expected): assert _get_tzinfo_name(tz_obj) == expected - def test_json_schema(self, monkeypatch): + def test_json_schema_load_dag_schema_dict(self, monkeypatch): from airflow.exceptions import AirflowException from airflow.serialization.json_schema import load_dag_schema_dict