From 43d09ad862b995bdd5b0c5ab3c986fdbc03ba692 Mon Sep 17 00:00:00 2001 From: iscai-msft Date: Thu, 5 Jun 2025 15:04:33 -0400 Subject: [PATCH 1/8] add is_sdk_model check and tests --- .../azure-core/azure/core/serialization.py | 10 + .../tests/test_serialization_hybrid_model.py | 1170 +++++++++++++++++ 2 files changed, 1180 insertions(+) create mode 100644 sdk/core/azure-core/tests/test_serialization_hybrid_model.py diff --git a/sdk/core/azure-core/azure/core/serialization.py b/sdk/core/azure-core/azure/core/serialization.py index 7991aca913be..fe4f14b0371f 100644 --- a/sdk/core/azure-core/azure/core/serialization.py +++ b/sdk/core/azure-core/azure/core/serialization.py @@ -129,3 +129,13 @@ def default(self, o: Any) -> Any: except AttributeError: pass return super(AzureJSONEncoder, self).default(o) + +def is_sdk_model(obj: Any) -> bool: + """Check if the object is an SDK model. + + :param obj: The object to check. + :type obj: any + :return: True if the object is an SDK model, False otherwise. + :rtype: bool + """ + return bool(getattr(obj, "_is_model", False) or hasattr(obj, "_attribute_map")) diff --git a/sdk/core/azure-core/tests/test_serialization_hybrid_model.py b/sdk/core/azure-core/tests/test_serialization_hybrid_model.py new file mode 100644 index 000000000000..fdf6cbd3fd09 --- /dev/null +++ b/sdk/core/azure-core/tests/test_serialization_hybrid_model.py @@ -0,0 +1,1170 @@ +# ------------------------------------ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +# ------------------------------------ +from azure.core.serialization import is_sdk_model + +def test_is_sdk_model_with_hybrid_model(): + + class HybridModel(Model): + pass + + assert is_sdk_model(HybridModel()) + +def test_is_sdk_model_with_msrest_model(): + # Instead of importing msrest, we're just going to do a basic rendering of the msrest models + class MsrestModel(object): + _subtype_map = {} + _attribute_map = {} + _validation = {} + + def __init__(self, *args, **kwargs): + self.additional_properties = {} + + assert is_sdk_model(MsrestModel()) + class InstatiatedMsrestModel(MsrestModel): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.attr = "value" + + assert is_sdk_model(InstatiatedMsrestModel()) + +def test_is_sdk_model_with_non_models(): + assert not is_sdk_model({}) + assert not is_sdk_model([]) + assert not is_sdk_model("string") + assert not is_sdk_model(42) + assert not is_sdk_model(None) + assert not is_sdk_model(object) + + class Model: + def __init__(self): + self.attr = "value" + assert not is_sdk_model(Model()) + + +# ------------------------------------------------------- +# Definition for hybrid model base class + +import logging +import copy +import calendar +import decimal +import functools +import sys +import base64 +import re +import typing +import enum +import email.utils +from datetime import datetime, date, time, timedelta, timezone +import xml.etree.ElementTree as ET +from collections.abc import MutableMapping +from typing_extensions import Self +import isodate +from azure.core.exceptions import DeserializationError +from azure.core import CaseInsensitiveEnumMeta +from azure.core.pipeline import PipelineResponse +from azure.core.serialization import _Null, _timedelta_as_isostr + +_LOGGER = logging.getLogger(__name__) + + +TZ_UTC = timezone.utc +_T = typing.TypeVar("_T") + +_VALID_DATE = re.compile(r"\d{4}[-]\d{2}[-]\d{2}T\d{2}:\d{2}:\d{2}" + r"\.?\d*Z?[-+]?[\d{2}]?:?[\d{2}]?") +_VALID_RFC7231 = re.compile( + r"(Mon|Tue|Wed|Thu|Fri|Sat|Sun),\s\d{2}\s" + r"(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s\d{4}\s\d{2}:\d{2}:\d{2}\sGMT" +) + +def _serialize_bytes(o, format: typing.Optional[str] = None) -> str: + encoded = base64.b64encode(o).decode() + if format == "base64url": + return encoded.strip("=").replace("+", "-").replace("/", "_") + return encoded + + +def _serialize_datetime(o, format: typing.Optional[str] = None): + if hasattr(o, "year") and hasattr(o, "hour"): + if format == "rfc7231": + return email.utils.format_datetime(o, usegmt=True) + if format == "unix-timestamp": + return int(calendar.timegm(o.utctimetuple())) + + # astimezone() fails for naive times in Python 2.7, so make make sure o is aware (tzinfo is set) + if not o.tzinfo: + iso_formatted = o.replace(tzinfo=TZ_UTC).isoformat() + else: + iso_formatted = o.astimezone(TZ_UTC).isoformat() + # Replace the trailing "+00:00" UTC offset with "Z" (RFC 3339: https://www.ietf.org/rfc/rfc3339.txt) + return iso_formatted.replace("+00:00", "Z") + # Next try datetime.date or datetime.time + return o.isoformat() + +def _is_readonly(p): + try: + return p._visibility == ["read"] + except AttributeError: + return False + +def _deserialize_datetime(attr: typing.Union[str, datetime]) -> datetime: + """Deserialize ISO-8601 formatted string into Datetime object. + + :param str attr: response string to be deserialized. + :rtype: ~datetime.datetime + :returns: The datetime object from that input + """ + if isinstance(attr, datetime): + # i'm already deserialized + return attr + attr = attr.upper() + match = _VALID_DATE.match(attr) + if not match: + raise ValueError("Invalid datetime string: " + attr) + + check_decimal = attr.split(".") + if len(check_decimal) > 1: + decimal_str = "" + for digit in check_decimal[1]: + if digit.isdigit(): + decimal_str += digit + else: + break + if len(decimal_str) > 6: + attr = attr.replace(decimal_str, decimal_str[0:6]) + + date_obj = isodate.parse_datetime(attr) + test_utc = date_obj.utctimetuple() + if test_utc.tm_year > 9999 or test_utc.tm_year < 1: + raise OverflowError("Hit max or min date") + return date_obj + + +def _deserialize_datetime_rfc7231(attr: typing.Union[str, datetime]) -> datetime: + """Deserialize RFC7231 formatted string into Datetime object. + + :param str attr: response string to be deserialized. + :rtype: ~datetime.datetime + :returns: The datetime object from that input + """ + if isinstance(attr, datetime): + # i'm already deserialized + return attr + match = _VALID_RFC7231.match(attr) + if not match: + raise ValueError("Invalid datetime string: " + attr) + + return email.utils.parsedate_to_datetime(attr) + + +def _deserialize_datetime_unix_timestamp(attr: typing.Union[float, datetime]) -> datetime: + """Deserialize unix timestamp into Datetime object. + + :param str attr: response string to be deserialized. + :rtype: ~datetime.datetime + :returns: The datetime object from that input + """ + if isinstance(attr, datetime): + # i'm already deserialized + return attr + return datetime.fromtimestamp(attr, TZ_UTC) + + +def _deserialize_date(attr: typing.Union[str, date]) -> date: + """Deserialize ISO-8601 formatted string into Date object. + :param str attr: response string to be deserialized. + :rtype: date + :returns: The date object from that input + """ + # This must NOT use defaultmonth/defaultday. Using None ensure this raises an exception. + if isinstance(attr, date): + return attr + return isodate.parse_date(attr, defaultmonth=None, defaultday=None) # type: ignore + + +def _deserialize_time(attr: typing.Union[str, time]) -> time: + """Deserialize ISO-8601 formatted string into time object. + + :param str attr: response string to be deserialized. + :rtype: datetime.time + :returns: The time object from that input + """ + if isinstance(attr, time): + return attr + return isodate.parse_time(attr) + + +def _deserialize_bytes(attr): + if isinstance(attr, (bytes, bytearray)): + return attr + return bytes(base64.b64decode(attr)) + + +def _deserialize_bytes_base64(attr): + if isinstance(attr, (bytes, bytearray)): + return attr + padding = "=" * (3 - (len(attr) + 3) % 4) # type: ignore + attr = attr + padding # type: ignore + encoded = attr.replace("-", "+").replace("_", "/") + return bytes(base64.b64decode(encoded)) + + +def _deserialize_duration(attr): + if isinstance(attr, timedelta): + return attr + return isodate.parse_duration(attr) + + +def _deserialize_decimal(attr): + if isinstance(attr, decimal.Decimal): + return attr + return decimal.Decimal(str(attr)) + + +def _deserialize_int_as_str(attr): + if isinstance(attr, int): + return attr + return int(attr) + + +_DESERIALIZE_MAPPING = { + datetime: _deserialize_datetime, + date: _deserialize_date, + time: _deserialize_time, + bytes: _deserialize_bytes, + bytearray: _deserialize_bytes, + timedelta: _deserialize_duration, + typing.Any: lambda x: x, + decimal.Decimal: _deserialize_decimal, +} + +_DESERIALIZE_MAPPING_WITHFORMAT = { + "rfc3339": _deserialize_datetime, + "rfc7231": _deserialize_datetime_rfc7231, + "unix-timestamp": _deserialize_datetime_unix_timestamp, + "base64": _deserialize_bytes, + "base64url": _deserialize_bytes_base64, +} + + +def get_deserializer(annotation: typing.Any, rf: typing.Optional["_RestField"] = None): + if annotation is int and rf and rf._format == "str": + return _deserialize_int_as_str + if rf and rf._format: + return _DESERIALIZE_MAPPING_WITHFORMAT.get(rf._format) + return _DESERIALIZE_MAPPING.get(annotation) # pyright: ignore + + +def _get_type_alias_type(module_name: str, alias_name: str): + types = { + k: v + for k, v in sys.modules[module_name].__dict__.items() + if isinstance(v, typing._GenericAlias) # type: ignore + } + if alias_name not in types: + return alias_name + return types[alias_name] + + +def _get_model(module_name: str, model_name: str): + models = {k: v for k, v in sys.modules[module_name].__dict__.items() if isinstance(v, type)} + module_end = module_name.rsplit(".", 1)[0] + models.update({k: v for k, v in sys.modules[module_end].__dict__.items() if isinstance(v, type)}) + if isinstance(model_name, str): + model_name = model_name.split(".")[-1] + if model_name not in models: + return model_name + return models[model_name] + + +_UNSET = object() + + +class _MyMutableMapping(MutableMapping[str, typing.Any]): + def __init__(self, data: typing.Dict[str, typing.Any]) -> None: + self._data = data + + def __contains__(self, key: typing.Any) -> bool: + return key in self._data + + def __getitem__(self, key: str) -> typing.Any: + return self._data.__getitem__(key) + + def __setitem__(self, key: str, value: typing.Any) -> None: + self._data.__setitem__(key, value) + + def __delitem__(self, key: str) -> None: + self._data.__delitem__(key) + + def __iter__(self) -> typing.Iterator[typing.Any]: + return self._data.__iter__() + + def __len__(self) -> int: + return self._data.__len__() + + def __ne__(self, other: typing.Any) -> bool: + return not self.__eq__(other) + + def keys(self) -> typing.KeysView[str]: + """ + :returns: a set-like object providing a view on D's keys + :rtype: ~typing.KeysView + """ + return self._data.keys() + + def values(self) -> typing.ValuesView[typing.Any]: + """ + :returns: an object providing a view on D's values + :rtype: ~typing.ValuesView + """ + return self._data.values() + + def items(self) -> typing.ItemsView[str, typing.Any]: + """ + :returns: set-like object providing a view on D's items + :rtype: ~typing.ItemsView + """ + return self._data.items() + + def get(self, key: str, default: typing.Any = None) -> typing.Any: + """ + Get the value for key if key is in the dictionary, else default. + :param str key: The key to look up. + :param any default: The value to return if key is not in the dictionary. Defaults to None + :returns: D[k] if k in D, else d. + :rtype: any + """ + try: + return self[key] + except KeyError: + return default + + @typing.overload + def pop(self, key: str) -> typing.Any: ... # pylint: disable=arguments-differ + + @typing.overload + def pop(self, key: str, default: _T) -> _T: ... # pylint: disable=signature-differs + + @typing.overload + def pop(self, key: str, default: typing.Any) -> typing.Any: ... # pylint: disable=signature-differs + + def pop(self, key: str, default: typing.Any = _UNSET) -> typing.Any: + """ + Removes specified key and return the corresponding value. + :param str key: The key to pop. + :param any default: The value to return if key is not in the dictionary + :returns: The value corresponding to the key. + :rtype: any + :raises KeyError: If key is not found and default is not given. + """ + if default is _UNSET: + return self._data.pop(key) + return self._data.pop(key, default) + + def popitem(self) -> typing.Tuple[str, typing.Any]: + """ + Removes and returns some (key, value) pair + :returns: The (key, value) pair. + :rtype: tuple + :raises KeyError: if D is empty. + """ + return self._data.popitem() + + def clear(self) -> None: + """ + Remove all items from D. + """ + self._data.clear() + + def update(self, *args: typing.Any, **kwargs: typing.Any) -> None: # pylint: disable=arguments-differ + """ + Updates D from mapping/iterable E and F. + :param any args: Either a mapping object or an iterable of key-value pairs. + """ + self._data.update(*args, **kwargs) + + @typing.overload + def setdefault(self, key: str, default: None = None) -> None: ... + + @typing.overload + def setdefault(self, key: str, default: typing.Any) -> typing.Any: ... # pylint: disable=signature-differs + + def setdefault(self, key: str, default: typing.Any = _UNSET) -> typing.Any: + """ + Same as calling D.get(k, d), and setting D[k]=d if k not found + :param str key: The key to look up. + :param any default: The value to set if key is not in the dictionary + :returns: D[k] if k in D, else d. + :rtype: any + """ + if default is _UNSET: + return self._data.setdefault(key) + return self._data.setdefault(key, default) + + def __eq__(self, other: typing.Any) -> bool: + try: + other_model = self.__class__(other) + except Exception: + return False + return self._data == other_model._data + + def __repr__(self) -> str: + return str(self._data) + + +def _is_model(obj: typing.Any) -> bool: + return getattr(obj, "_is_model", False) + + +def _serialize(o, format: typing.Optional[str] = None): # pylint: disable=too-many-return-statements + if isinstance(o, list): + return [_serialize(x, format) for x in o] + if isinstance(o, dict): + return {k: _serialize(v, format) for k, v in o.items()} + if isinstance(o, set): + return {_serialize(x, format) for x in o} + if isinstance(o, tuple): + return tuple(_serialize(x, format) for x in o) + if isinstance(o, (bytes, bytearray)): + return _serialize_bytes(o, format) + if isinstance(o, decimal.Decimal): + return float(o) + if isinstance(o, enum.Enum): + return o.value + if isinstance(o, int): + if format == "str": + return str(o) + return o + try: + # First try datetime.datetime + return _serialize_datetime(o, format) + except AttributeError: + pass + # Last, try datetime.timedelta + try: + return _timedelta_as_isostr(o) + except AttributeError: + # This will be raised when it hits value.total_seconds in the method above + pass + return o + + +def _get_rest_field( + attr_to_rest_field: typing.Dict[str, "_RestField"], rest_name: str +) -> typing.Optional["_RestField"]: + try: + return next(rf for rf in attr_to_rest_field.values() if rf._rest_name == rest_name) + except StopIteration: + return None + + +def _create_value(rf: typing.Optional["_RestField"], value: typing.Any) -> typing.Any: + if not rf: + return _serialize(value, None) + if rf._is_multipart_file_input: + return value + if rf._is_model: + return _deserialize(rf._type, value) + if isinstance(value, ET.Element): + value = _deserialize(rf._type, value) + return _serialize(value, rf._format) + + +class Model(_MyMutableMapping): + _is_model = True + # label whether current class's _attr_to_rest_field has been calculated + # could not see _attr_to_rest_field directly because subclass inherits it from parent class + _calculated: typing.Set[str] = set() + + def __init__(self, *args: typing.Any, **kwargs: typing.Any) -> None: + class_name = self.__class__.__name__ + if len(args) > 1: + raise TypeError(f"{class_name}.__init__() takes 2 positional arguments but {len(args) + 1} were given") + dict_to_pass = { + rest_field._rest_name: rest_field._default + for rest_field in self._attr_to_rest_field.values() + if rest_field._default is not _UNSET + } + if args: # pylint: disable=too-many-nested-blocks + if isinstance(args[0], ET.Element): + existed_attr_keys = [] + model_meta = getattr(self, "_xml", {}) + + for rf in self._attr_to_rest_field.values(): + prop_meta = getattr(rf, "_xml", {}) + xml_name = prop_meta.get("name", rf._rest_name) + xml_ns = prop_meta.get("ns", model_meta.get("ns", None)) + if xml_ns: + xml_name = "{" + xml_ns + "}" + xml_name + + # attribute + if prop_meta.get("attribute", False) and args[0].get(xml_name) is not None: + existed_attr_keys.append(xml_name) + dict_to_pass[rf._rest_name] = _deserialize(rf._type, args[0].get(xml_name)) + continue + + # unwrapped element is array + if prop_meta.get("unwrapped", False): + # unwrapped array could either use prop items meta/prop meta + if prop_meta.get("itemsName"): + xml_name = prop_meta.get("itemsName") + xml_ns = prop_meta.get("itemNs") + if xml_ns: + xml_name = "{" + xml_ns + "}" + xml_name + items = args[0].findall(xml_name) # pyright: ignore + if len(items) > 0: + existed_attr_keys.append(xml_name) + dict_to_pass[rf._rest_name] = _deserialize(rf._type, items) + continue + + # text element is primitive type + if prop_meta.get("text", False): + if args[0].text is not None: + dict_to_pass[rf._rest_name] = _deserialize(rf._type, args[0].text) + continue + + # wrapped element could be normal property or array, it should only have one element + item = args[0].find(xml_name) + if item is not None: + existed_attr_keys.append(xml_name) + dict_to_pass[rf._rest_name] = _deserialize(rf._type, item) + + # rest thing is additional properties + for e in args[0]: + if e.tag not in existed_attr_keys: + dict_to_pass[e.tag] = _convert_element(e) + else: + dict_to_pass.update( + {k: _create_value(_get_rest_field(self._attr_to_rest_field, k), v) for k, v in args[0].items()} + ) + else: + non_attr_kwargs = [k for k in kwargs if k not in self._attr_to_rest_field] + if non_attr_kwargs: + # actual type errors only throw the first wrong keyword arg they see, so following that. + raise TypeError(f"{class_name}.__init__() got an unexpected keyword argument '{non_attr_kwargs[0]}'") + dict_to_pass.update( + { + self._attr_to_rest_field[k]._rest_name: _create_value(self._attr_to_rest_field[k], v) + for k, v in kwargs.items() + if v is not None + } + ) + super().__init__(dict_to_pass) + + def copy(self) -> "Model": + return Model(self.__dict__) + + def __new__(cls, *args: typing.Any, **kwargs: typing.Any) -> Self: + if f"{cls.__module__}.{cls.__qualname__}" not in cls._calculated: + # we know the last nine classes in mro are going to be 'Model', '_MyMutableMapping', 'MutableMapping', + # 'Mapping', 'Collection', 'Sized', 'Iterable', 'Container' and 'object' + mros = cls.__mro__[:-9][::-1] # ignore parents, and reverse the mro order + attr_to_rest_field: typing.Dict[str, _RestField] = { # map attribute name to rest_field property + k: v for mro_class in mros for k, v in mro_class.__dict__.items() if k[0] != "_" and hasattr(v, "_type") + } + annotations = { + k: v + for mro_class in mros + if hasattr(mro_class, "__annotations__") + for k, v in mro_class.__annotations__.items() + } + for attr, rf in attr_to_rest_field.items(): + rf._module = cls.__module__ + if not rf._type: + rf._type = rf._get_deserialize_callable_from_annotation(annotations.get(attr, None)) + if not rf._rest_name_input: + rf._rest_name_input = attr + cls._attr_to_rest_field: typing.Dict[str, _RestField] = dict(attr_to_rest_field.items()) + cls._calculated.add(f"{cls.__module__}.{cls.__qualname__}") + + return super().__new__(cls) + + def __init_subclass__(cls, discriminator: typing.Optional[str] = None) -> None: + for base in cls.__bases__: + if hasattr(base, "__mapping__"): + base.__mapping__[discriminator or cls.__name__] = cls # type: ignore + + @classmethod + def _get_discriminator(cls, exist_discriminators) -> typing.Optional["_RestField"]: + for v in cls.__dict__.values(): + if isinstance(v, _RestField) and v._is_discriminator and v._rest_name not in exist_discriminators: + return v + return None + + @classmethod + def _deserialize(cls, data, exist_discriminators): + if not hasattr(cls, "__mapping__"): + return cls(data) + discriminator = cls._get_discriminator(exist_discriminators) + if discriminator is None: + return cls(data) + exist_discriminators.append(discriminator._rest_name) + if isinstance(data, ET.Element): + model_meta = getattr(cls, "_xml", {}) + prop_meta = getattr(discriminator, "_xml", {}) + xml_name = prop_meta.get("name", discriminator._rest_name) + xml_ns = prop_meta.get("ns", model_meta.get("ns", None)) + if xml_ns: + xml_name = "{" + xml_ns + "}" + xml_name + + if data.get(xml_name) is not None: + discriminator_value = data.get(xml_name) + else: + discriminator_value = data.find(xml_name).text # pyright: ignore + else: + discriminator_value = data.get(discriminator._rest_name) + mapped_cls = cls.__mapping__.get(discriminator_value, cls) # pyright: ignore # pylint: disable=no-member + return mapped_cls._deserialize(data, exist_discriminators) + + def as_dict(self, *, exclude_readonly: bool = False) -> typing.Dict[str, typing.Any]: + """Return a dict that can be turned into json using json.dump. + + :keyword bool exclude_readonly: Whether to remove the readonly properties. + :returns: A dict JSON compatible object + :rtype: dict + """ + + result = {} + readonly_props = [] + if exclude_readonly: + readonly_props = [p._rest_name for p in self._attr_to_rest_field.values() if _is_readonly(p)] + for k, v in self.items(): + if exclude_readonly and k in readonly_props: # pyright: ignore + continue + is_multipart_file_input = False + try: + is_multipart_file_input = next( + rf for rf in self._attr_to_rest_field.values() if rf._rest_name == k + )._is_multipart_file_input + except StopIteration: + pass + result[k] = v if is_multipart_file_input else Model._as_dict_value(v, exclude_readonly=exclude_readonly) + return result + + @staticmethod + def _as_dict_value(v: typing.Any, exclude_readonly: bool = False) -> typing.Any: + if v is None or isinstance(v, _Null): + return None + if isinstance(v, (list, tuple, set)): + return type(v)(Model._as_dict_value(x, exclude_readonly=exclude_readonly) for x in v) + if isinstance(v, dict): + return {dk: Model._as_dict_value(dv, exclude_readonly=exclude_readonly) for dk, dv in v.items()} + return v.as_dict(exclude_readonly=exclude_readonly) if hasattr(v, "as_dict") else v + + +def _deserialize_model(model_deserializer: typing.Optional[typing.Callable], obj): + if _is_model(obj): + return obj + return _deserialize(model_deserializer, obj) + + +def _deserialize_with_optional(if_obj_deserializer: typing.Optional[typing.Callable], obj): + if obj is None: + return obj + return _deserialize_with_callable(if_obj_deserializer, obj) + + +def _deserialize_with_union(deserializers, obj): + for deserializer in deserializers: + try: + return _deserialize(deserializer, obj) + except DeserializationError: + pass + raise DeserializationError() + + +def _deserialize_dict( + value_deserializer: typing.Optional[typing.Callable], + module: typing.Optional[str], + obj: typing.Dict[typing.Any, typing.Any], +): + if obj is None: + return obj + if isinstance(obj, ET.Element): + obj = {child.tag: child for child in obj} + return {k: _deserialize(value_deserializer, v, module) for k, v in obj.items()} + + +def _deserialize_multiple_sequence( + entry_deserializers: typing.List[typing.Optional[typing.Callable]], + module: typing.Optional[str], + obj, +): + if obj is None: + return obj + return type(obj)(_deserialize(deserializer, entry, module) for entry, deserializer in zip(obj, entry_deserializers)) + + +def _deserialize_sequence( + deserializer: typing.Optional[typing.Callable], + module: typing.Optional[str], + obj, +): + if obj is None: + return obj + if isinstance(obj, ET.Element): + obj = list(obj) + return type(obj)(_deserialize(deserializer, entry, module) for entry in obj) + + +def _sorted_annotations(types: typing.List[typing.Any]) -> typing.List[typing.Any]: + return sorted( + types, + key=lambda x: hasattr(x, "__name__") and x.__name__.lower() in ("str", "float", "int", "bool"), + ) + + +def _get_deserialize_callable_from_annotation( # pylint: disable=too-many-return-statements, too-many-branches + annotation: typing.Any, + module: typing.Optional[str], + rf: typing.Optional["_RestField"] = None, +) -> typing.Optional[typing.Callable[[typing.Any], typing.Any]]: + if not annotation: + return None + + # is it a type alias? + if isinstance(annotation, str): + if module is not None: + annotation = _get_type_alias_type(module, annotation) + + # is it a forward ref / in quotes? + if isinstance(annotation, (str, typing.ForwardRef)): + try: + model_name = annotation.__forward_arg__ # type: ignore + except AttributeError: + model_name = annotation + if module is not None: + annotation = _get_model(module, model_name) # type: ignore + + try: + if module and _is_model(annotation): + if rf: + rf._is_model = True + + return functools.partial(_deserialize_model, annotation) # pyright: ignore + except Exception: + pass + + # is it a literal? + try: + if annotation.__origin__ is typing.Literal: # pyright: ignore + return None + except AttributeError: + pass + + # is it optional? + try: + if any(a for a in annotation.__args__ if a == type(None)): # pyright: ignore + if len(annotation.__args__) <= 2: # pyright: ignore + if_obj_deserializer = _get_deserialize_callable_from_annotation( + next(a for a in annotation.__args__ if a != type(None)), module, rf # pyright: ignore + ) + + return functools.partial(_deserialize_with_optional, if_obj_deserializer) + # the type is Optional[Union[...]], we need to remove the None type from the Union + annotation_copy = copy.copy(annotation) + annotation_copy.__args__ = [a for a in annotation_copy.__args__ if a != type(None)] # pyright: ignore + return _get_deserialize_callable_from_annotation(annotation_copy, module, rf) + except AttributeError: + pass + + # is it union? + if getattr(annotation, "__origin__", None) is typing.Union: + # initial ordering is we make `string` the last deserialization option, because it is often them most generic + deserializers = [ + _get_deserialize_callable_from_annotation(arg, module, rf) + for arg in _sorted_annotations(annotation.__args__) # pyright: ignore + ] + + return functools.partial(_deserialize_with_union, deserializers) + + try: + if annotation._name == "Dict": # pyright: ignore + value_deserializer = _get_deserialize_callable_from_annotation( + annotation.__args__[1], module, rf # pyright: ignore + ) + + return functools.partial( + _deserialize_dict, + value_deserializer, + module, + ) + except (AttributeError, IndexError): + pass + try: + if annotation._name in ["List", "Set", "Tuple", "Sequence"]: # pyright: ignore + if len(annotation.__args__) > 1: # pyright: ignore + entry_deserializers = [ + _get_deserialize_callable_from_annotation(dt, module, rf) + for dt in annotation.__args__ # pyright: ignore + ] + return functools.partial(_deserialize_multiple_sequence, entry_deserializers, module) + deserializer = _get_deserialize_callable_from_annotation( + annotation.__args__[0], module, rf # pyright: ignore + ) + + return functools.partial(_deserialize_sequence, deserializer, module) + except (TypeError, IndexError, AttributeError, SyntaxError): + pass + + def _deserialize_default( + deserializer, + obj, + ): + if obj is None: + return obj + try: + return _deserialize_with_callable(deserializer, obj) + except Exception: + pass + return obj + + if get_deserializer(annotation, rf): + return functools.partial(_deserialize_default, get_deserializer(annotation, rf)) + + return functools.partial(_deserialize_default, annotation) + + +def _deserialize_with_callable( + deserializer: typing.Optional[typing.Callable[[typing.Any], typing.Any]], + value: typing.Any, +): # pylint: disable=too-many-return-statements + try: + if value is None or isinstance(value, _Null): + return None + if isinstance(value, ET.Element): + if deserializer is str: + return value.text or "" + if deserializer is int: + return int(value.text) if value.text else None + if deserializer is float: + return float(value.text) if value.text else None + if deserializer is bool: + return value.text == "true" if value.text else None + if deserializer is None: + return value + if deserializer in [int, float, bool]: + return deserializer(value) + if isinstance(deserializer, CaseInsensitiveEnumMeta): + try: + return deserializer(value) + except ValueError: + # for unknown value, return raw value + return value + if isinstance(deserializer, type) and issubclass(deserializer, Model): + return deserializer._deserialize(value, []) + return typing.cast(typing.Callable[[typing.Any], typing.Any], deserializer)(value) + except Exception as e: + raise DeserializationError() from e + + +def _deserialize( + deserializer: typing.Any, + value: typing.Any, + module: typing.Optional[str] = None, + rf: typing.Optional["_RestField"] = None, + format: typing.Optional[str] = None, +) -> typing.Any: + if isinstance(value, PipelineResponse): + value = value.http_response.json() + if rf is None and format: + rf = _RestField(format=format) + if not isinstance(deserializer, functools.partial): + deserializer = _get_deserialize_callable_from_annotation(deserializer, module, rf) + return _deserialize_with_callable(deserializer, value) + + +def _failsafe_deserialize( + deserializer: typing.Any, + value: typing.Any, + module: typing.Optional[str] = None, + rf: typing.Optional["_RestField"] = None, + format: typing.Optional[str] = None, +) -> typing.Any: + try: + return _deserialize(deserializer, value, module, rf, format) + except DeserializationError: + _LOGGER.warning( + "Ran into a deserialization error. Ignoring since this is failsafe deserialization", exc_info=True + ) + return None + + +def _failsafe_deserialize_xml( + deserializer: typing.Any, + value: typing.Any, +) -> typing.Any: + try: + return _deserialize_xml(deserializer, value) + except DeserializationError: + _LOGGER.warning( + "Ran into a deserialization error. Ignoring since this is failsafe deserialization", exc_info=True + ) + return None + + +class _RestField: + def __init__( + self, + *, + name: typing.Optional[str] = None, + type: typing.Optional[typing.Callable] = None, # pylint: disable=redefined-builtin + is_discriminator: bool = False, + visibility: typing.Optional[typing.List[str]] = None, + default: typing.Any = _UNSET, + format: typing.Optional[str] = None, + is_multipart_file_input: bool = False, + xml: typing.Optional[typing.Dict[str, typing.Any]] = None, + ): + self._type = type + self._rest_name_input = name + self._module: typing.Optional[str] = None + self._is_discriminator = is_discriminator + self._visibility = visibility + self._is_model = False + self._default = default + self._format = format + self._is_multipart_file_input = is_multipart_file_input + self._xml = xml if xml is not None else {} + + @property + def _class_type(self) -> typing.Any: + return getattr(self._type, "args", [None])[0] + + @property + def _rest_name(self) -> str: + if self._rest_name_input is None: + raise ValueError("Rest name was never set") + return self._rest_name_input + + def __get__(self, obj: Model, type=None): # pylint: disable=redefined-builtin + # by this point, type and rest_name will have a value bc we default + # them in __new__ of the Model class + item = obj.get(self._rest_name) + if item is None: + return item + if self._is_model: + return item + return _deserialize(self._type, _serialize(item, self._format), rf=self) + + def __set__(self, obj: Model, value) -> None: + if value is None: + # we want to wipe out entries if users set attr to None + try: + obj.__delitem__(self._rest_name) + except KeyError: + pass + return + if self._is_model: + if not _is_model(value): + value = _deserialize(self._type, value) + obj.__setitem__(self._rest_name, value) + return + obj.__setitem__(self._rest_name, _serialize(value, self._format)) + + def _get_deserialize_callable_from_annotation( + self, annotation: typing.Any + ) -> typing.Optional[typing.Callable[[typing.Any], typing.Any]]: + return _get_deserialize_callable_from_annotation(annotation, self._module, self) + + +def rest_field( + *, + name: typing.Optional[str] = None, + type: typing.Optional[typing.Callable] = None, # pylint: disable=redefined-builtin + visibility: typing.Optional[typing.List[str]] = None, + default: typing.Any = _UNSET, + format: typing.Optional[str] = None, + is_multipart_file_input: bool = False, + xml: typing.Optional[typing.Dict[str, typing.Any]] = None, +) -> typing.Any: + return _RestField( + name=name, + type=type, + visibility=visibility, + default=default, + format=format, + is_multipart_file_input=is_multipart_file_input, + xml=xml, + ) + + +def rest_discriminator( + *, + name: typing.Optional[str] = None, + type: typing.Optional[typing.Callable] = None, # pylint: disable=redefined-builtin + visibility: typing.Optional[typing.List[str]] = None, + xml: typing.Optional[typing.Dict[str, typing.Any]] = None, +) -> typing.Any: + return _RestField(name=name, type=type, is_discriminator=True, visibility=visibility, xml=xml) + + +def serialize_xml(model: Model, exclude_readonly: bool = False) -> str: + """Serialize a model to XML. + + :param Model model: The model to serialize. + :param bool exclude_readonly: Whether to exclude readonly properties. + :returns: The XML representation of the model. + :rtype: str + """ + return ET.tostring(_get_element(model, exclude_readonly), encoding="unicode") # type: ignore + + +def _get_element( + o: typing.Any, + exclude_readonly: bool = False, + parent_meta: typing.Optional[typing.Dict[str, typing.Any]] = None, + wrapped_element: typing.Optional[ET.Element] = None, +) -> typing.Union[ET.Element, typing.List[ET.Element]]: + if _is_model(o): + model_meta = getattr(o, "_xml", {}) + + # if prop is a model, then use the prop element directly, else generate a wrapper of model + if wrapped_element is None: + wrapped_element = _create_xml_element( + model_meta.get("name", o.__class__.__name__), + model_meta.get("prefix"), + model_meta.get("ns"), + ) + + readonly_props = [] + if exclude_readonly: + readonly_props = [p._rest_name for p in o._attr_to_rest_field.values() if _is_readonly(p)] + + for k, v in o.items(): + # do not serialize readonly properties + if exclude_readonly and k in readonly_props: + continue + + prop_rest_field = _get_rest_field(o._attr_to_rest_field, k) + if prop_rest_field: + prop_meta = getattr(prop_rest_field, "_xml").copy() + # use the wire name as xml name if no specific name is set + if prop_meta.get("name") is None: + prop_meta["name"] = k + else: + # additional properties will not have rest field, use the wire name as xml name + prop_meta = {"name": k} + + # if no ns for prop, use model's + if prop_meta.get("ns") is None and model_meta.get("ns"): + prop_meta["ns"] = model_meta.get("ns") + prop_meta["prefix"] = model_meta.get("prefix") + + if prop_meta.get("unwrapped", False): + # unwrapped could only set on array + wrapped_element.extend(_get_element(v, exclude_readonly, prop_meta)) + elif prop_meta.get("text", False): + # text could only set on primitive type + wrapped_element.text = _get_primitive_type_value(v) + elif prop_meta.get("attribute", False): + xml_name = prop_meta.get("name", k) + if prop_meta.get("ns"): + ET.register_namespace(prop_meta.get("prefix"), prop_meta.get("ns")) # pyright: ignore + xml_name = "{" + prop_meta.get("ns") + "}" + xml_name # pyright: ignore + # attribute should be primitive type + wrapped_element.set(xml_name, _get_primitive_type_value(v)) + else: + # other wrapped prop element + wrapped_element.append(_get_wrapped_element(v, exclude_readonly, prop_meta)) + return wrapped_element + if isinstance(o, list): + return [_get_element(x, exclude_readonly, parent_meta) for x in o] # type: ignore + if isinstance(o, dict): + result = [] + for k, v in o.items(): + result.append( + _get_wrapped_element( + v, + exclude_readonly, + { + "name": k, + "ns": parent_meta.get("ns") if parent_meta else None, + "prefix": parent_meta.get("prefix") if parent_meta else None, + }, + ) + ) + return result + + # primitive case need to create element based on parent_meta + if parent_meta: + return _get_wrapped_element( + o, + exclude_readonly, + { + "name": parent_meta.get("itemsName", parent_meta.get("name")), + "prefix": parent_meta.get("itemsPrefix", parent_meta.get("prefix")), + "ns": parent_meta.get("itemsNs", parent_meta.get("ns")), + }, + ) + + raise ValueError("Could not serialize value into xml: " + o) + + +def _get_wrapped_element( + v: typing.Any, + exclude_readonly: bool, + meta: typing.Optional[typing.Dict[str, typing.Any]], +) -> ET.Element: + wrapped_element = _create_xml_element( + meta.get("name") if meta else None, meta.get("prefix") if meta else None, meta.get("ns") if meta else None + ) + if isinstance(v, (dict, list)): + wrapped_element.extend(_get_element(v, exclude_readonly, meta)) + elif _is_model(v): + _get_element(v, exclude_readonly, meta, wrapped_element) + else: + wrapped_element.text = _get_primitive_type_value(v) + return wrapped_element + + +def _get_primitive_type_value(v) -> str: + if v is True: + return "true" + if v is False: + return "false" + if isinstance(v, _Null): + return "" + return str(v) + + +def _create_xml_element(tag, prefix=None, ns=None): + if prefix and ns: + ET.register_namespace(prefix, ns) + if ns: + return ET.Element("{" + ns + "}" + tag) + return ET.Element(tag) + + +def _deserialize_xml( + deserializer: typing.Any, + value: str, +) -> typing.Any: + element = ET.fromstring(value) # nosec + return _deserialize(deserializer, element) + + +def _convert_element(e: ET.Element): + # dict case + if len(e.attrib) > 0 or len({child.tag for child in e}) > 1: + dict_result: typing.Dict[str, typing.Any] = {} + for child in e: + if dict_result.get(child.tag) is not None: + if isinstance(dict_result[child.tag], list): + dict_result[child.tag].append(_convert_element(child)) + else: + dict_result[child.tag] = [dict_result[child.tag], _convert_element(child)] + else: + dict_result[child.tag] = _convert_element(child) + dict_result.update(e.attrib) + return dict_result + # array case + if len(e) > 0: + array_result: typing.List[typing.Any] = [] + for child in e: + array_result.append(_convert_element(child)) + return array_result + # primitive case + return e.text From 9db5b349c73d5307769c05d6b45262f805244a72 Mon Sep 17 00:00:00 2001 From: iscai-msft Date: Thu, 5 Jun 2025 15:45:01 -0400 Subject: [PATCH 2/8] add changeset --- sdk/core/azure-core/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/core/azure-core/CHANGELOG.md b/sdk/core/azure-core/CHANGELOG.md index b8f7e4dbe1ff..830252128094 100644 --- a/sdk/core/azure-core/CHANGELOG.md +++ b/sdk/core/azure-core/CHANGELOG.md @@ -5,6 +5,7 @@ ### Features Added - Added a `start_time` keyword argument to the `start_span` and `start_as_current_span` methods in the `OpenTelemetryTracer` class. This allows users to specify a custom start time for created spans. #41106 +- Added `is_sdk_model` method to `azure.core.serialization`. Returns whether a given input is a model from one of our generated sdks. #41445 ### Breaking Changes From 9c13f1b07214ebdf823013b41e7d0a9c8d6127f5 Mon Sep 17 00:00:00 2001 From: iscai-msft Date: Thu, 5 Jun 2025 16:20:23 -0400 Subject: [PATCH 3/8] format --- sdk/core/azure-core/azure/core/serialization.py | 3 ++- .../tests/test_serialization_hybrid_model.py | 12 ++++++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/sdk/core/azure-core/azure/core/serialization.py b/sdk/core/azure-core/azure/core/serialization.py index fe4f14b0371f..77a36890a28d 100644 --- a/sdk/core/azure-core/azure/core/serialization.py +++ b/sdk/core/azure-core/azure/core/serialization.py @@ -129,7 +129,8 @@ def default(self, o: Any) -> Any: except AttributeError: pass return super(AzureJSONEncoder, self).default(o) - + + def is_sdk_model(obj: Any) -> bool: """Check if the object is an SDK model. diff --git a/sdk/core/azure-core/tests/test_serialization_hybrid_model.py b/sdk/core/azure-core/tests/test_serialization_hybrid_model.py index fdf6cbd3fd09..301e529e30a8 100644 --- a/sdk/core/azure-core/tests/test_serialization_hybrid_model.py +++ b/sdk/core/azure-core/tests/test_serialization_hybrid_model.py @@ -4,6 +4,7 @@ # ------------------------------------ from azure.core.serialization import is_sdk_model + def test_is_sdk_model_with_hybrid_model(): class HybridModel(Model): @@ -11,6 +12,7 @@ class HybridModel(Model): assert is_sdk_model(HybridModel()) + def test_is_sdk_model_with_msrest_model(): # Instead of importing msrest, we're just going to do a basic rendering of the msrest models class MsrestModel(object): @@ -22,12 +24,14 @@ def __init__(self, *args, **kwargs): self.additional_properties = {} assert is_sdk_model(MsrestModel()) - class InstatiatedMsrestModel(MsrestModel): + + class InstantiatedMsrestModel(MsrestModel): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.attr = "value" - assert is_sdk_model(InstatiatedMsrestModel()) + assert is_sdk_model(InstantiatedMsrestModel()) + def test_is_sdk_model_with_non_models(): assert not is_sdk_model({}) @@ -40,6 +44,7 @@ def test_is_sdk_model_with_non_models(): class Model: def __init__(self): self.attr = "value" + assert not is_sdk_model(Model()) @@ -79,6 +84,7 @@ def __init__(self): r"(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s\d{4}\s\d{2}:\d{2}:\d{2}\sGMT" ) + def _serialize_bytes(o, format: typing.Optional[str] = None) -> str: encoded = base64.b64encode(o).decode() if format == "base64url": @@ -103,12 +109,14 @@ def _serialize_datetime(o, format: typing.Optional[str] = None): # Next try datetime.date or datetime.time return o.isoformat() + def _is_readonly(p): try: return p._visibility == ["read"] except AttributeError: return False + def _deserialize_datetime(attr: typing.Union[str, datetime]) -> datetime: """Deserialize ISO-8601 formatted string into Datetime object. From 4b29a4f0363c6d8b59a4db9c4ed26e2d348bf842 Mon Sep 17 00:00:00 2001 From: iscai-msft Date: Thu, 5 Jun 2025 16:31:49 -0400 Subject: [PATCH 4/8] is_sdk_model -> is_generated_model --- sdk/core/azure-core/CHANGELOG.md | 2 +- .../azure-core/azure/core/serialization.py | 2 +- .../tests/test_serialization_hybrid_model.py | 28 +++++++++---------- 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/sdk/core/azure-core/CHANGELOG.md b/sdk/core/azure-core/CHANGELOG.md index 830252128094..063fa490ef4b 100644 --- a/sdk/core/azure-core/CHANGELOG.md +++ b/sdk/core/azure-core/CHANGELOG.md @@ -5,7 +5,7 @@ ### Features Added - Added a `start_time` keyword argument to the `start_span` and `start_as_current_span` methods in the `OpenTelemetryTracer` class. This allows users to specify a custom start time for created spans. #41106 -- Added `is_sdk_model` method to `azure.core.serialization`. Returns whether a given input is a model from one of our generated sdks. #41445 +- Added `is_generated_model` method to `azure.core.serialization`. Returns whether a given input is a model from one of our generated sdks. #41445 ### Breaking Changes diff --git a/sdk/core/azure-core/azure/core/serialization.py b/sdk/core/azure-core/azure/core/serialization.py index 77a36890a28d..556b04456afc 100644 --- a/sdk/core/azure-core/azure/core/serialization.py +++ b/sdk/core/azure-core/azure/core/serialization.py @@ -131,7 +131,7 @@ def default(self, o: Any) -> Any: return super(AzureJSONEncoder, self).default(o) -def is_sdk_model(obj: Any) -> bool: +def is_generated_model(obj: Any) -> bool: """Check if the object is an SDK model. :param obj: The object to check. diff --git a/sdk/core/azure-core/tests/test_serialization_hybrid_model.py b/sdk/core/azure-core/tests/test_serialization_hybrid_model.py index 301e529e30a8..3c7589c20186 100644 --- a/sdk/core/azure-core/tests/test_serialization_hybrid_model.py +++ b/sdk/core/azure-core/tests/test_serialization_hybrid_model.py @@ -2,18 +2,18 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. # ------------------------------------ -from azure.core.serialization import is_sdk_model +from azure.core.serialization import is_generated_model -def test_is_sdk_model_with_hybrid_model(): +def test_is_generated_model_with_hybrid_model(): class HybridModel(Model): pass - assert is_sdk_model(HybridModel()) + assert is_generated_model(HybridModel()) -def test_is_sdk_model_with_msrest_model(): +def test_is_generated_model_with_msrest_model(): # Instead of importing msrest, we're just going to do a basic rendering of the msrest models class MsrestModel(object): _subtype_map = {} @@ -23,29 +23,29 @@ class MsrestModel(object): def __init__(self, *args, **kwargs): self.additional_properties = {} - assert is_sdk_model(MsrestModel()) + assert is_generated_model(MsrestModel()) class InstantiatedMsrestModel(MsrestModel): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.attr = "value" - assert is_sdk_model(InstantiatedMsrestModel()) + assert is_generated_model(InstantiatedMsrestModel()) -def test_is_sdk_model_with_non_models(): - assert not is_sdk_model({}) - assert not is_sdk_model([]) - assert not is_sdk_model("string") - assert not is_sdk_model(42) - assert not is_sdk_model(None) - assert not is_sdk_model(object) +def test_is_generated_model_with_non_models(): + assert not is_generated_model({}) + assert not is_generated_model([]) + assert not is_generated_model("string") + assert not is_generated_model(42) + assert not is_generated_model(None) + assert not is_generated_model(object) class Model: def __init__(self): self.attr = "value" - assert not is_sdk_model(Model()) + assert not is_generated_model(Model()) # ------------------------------------------------------- From 495dadf922e079e869831c685364c686abf9c46a Mon Sep 17 00:00:00 2001 From: iscai-msft Date: Fri, 6 Jun 2025 15:32:35 -0400 Subject: [PATCH 5/8] update docstrings --- sdk/core/azure-core/azure/core/serialization.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/core/azure-core/azure/core/serialization.py b/sdk/core/azure-core/azure/core/serialization.py index 556b04456afc..59c0fb00f5c0 100644 --- a/sdk/core/azure-core/azure/core/serialization.py +++ b/sdk/core/azure-core/azure/core/serialization.py @@ -132,11 +132,11 @@ def default(self, o: Any) -> Any: def is_generated_model(obj: Any) -> bool: - """Check if the object is an SDK model. + """Check if the object is a generated SDK model. :param obj: The object to check. :type obj: any - :return: True if the object is an SDK model, False otherwise. + :return: True if the object is a generated SDK model, False otherwise. :rtype: bool """ return bool(getattr(obj, "_is_model", False) or hasattr(obj, "_attribute_map")) From f5f33709555ec3e61a721dd69ff9a97c23cd06a6 Mon Sep 17 00:00:00 2001 From: iscai-msft Date: Mon, 9 Jun 2025 13:10:25 -0400 Subject: [PATCH 6/8] build tests on top of scaffolding --- .../azure-core/tests/test_serialization.py | 46 +- .../tests/test_serialization_hybrid_model.py | 1178 ----------------- 2 files changed, 45 insertions(+), 1179 deletions(-) delete mode 100644 sdk/core/azure-core/tests/test_serialization_hybrid_model.py diff --git a/sdk/core/azure-core/tests/test_serialization.py b/sdk/core/azure-core/tests/test_serialization.py index e76d9e80a86b..b3c66aac2027 100644 --- a/sdk/core/azure-core/tests/test_serialization.py +++ b/sdk/core/azure-core/tests/test_serialization.py @@ -8,8 +8,10 @@ import json import sys -from azure.core.serialization import AzureJSONEncoder, NULL +from azure.core.serialization import AzureJSONEncoder, NULL, is_generated_model import pytest +from modeltest._utils.model_base import Model as HybridModel, rest_field +from modeltest import models def _expand_value(obj): @@ -462,3 +464,45 @@ def __init__(self): ], } assert json.loads(json_dumps_with_encoder(expected.to_dict())) == expected_dict + + +def test_is_generated_model_with_hybrid_model(): + assert is_generated_model(HybridModel()) + assert is_generated_model(models.FlattenModel({"name": "wall-e", "properties": {"description": "a dog", "age": 2}})) + assert is_generated_model(models.ClientNamedPropertyModel(prop_client_name="wall-e")) + assert is_generated_model(models.ReadonlyModel()) + + +def test_is_generated_model_with_msrest_model(): + # Instead of importing msrest, we're just going to do a basic rendering of the msrest models + class MsrestModel(object): + _subtype_map = {} + _attribute_map = {} + _validation = {} + + def __init__(self, *args, **kwargs): + self.additional_properties = {} + + assert is_generated_model(MsrestModel()) + + class InstantiatedMsrestModel(MsrestModel): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.attr = "value" + + assert is_generated_model(InstantiatedMsrestModel()) + + +def test_is_generated_model_with_non_models(): + assert not is_generated_model({}) + assert not is_generated_model([]) + assert not is_generated_model("string") + assert not is_generated_model(42) + assert not is_generated_model(None) + assert not is_generated_model(object) + + class Model: + def __init__(self): + self.attr = "value" + + assert not is_generated_model(Model()) diff --git a/sdk/core/azure-core/tests/test_serialization_hybrid_model.py b/sdk/core/azure-core/tests/test_serialization_hybrid_model.py deleted file mode 100644 index 3c7589c20186..000000000000 --- a/sdk/core/azure-core/tests/test_serialization_hybrid_model.py +++ /dev/null @@ -1,1178 +0,0 @@ -# ------------------------------------ -# Copyright (c) Microsoft Corporation. -# Licensed under the MIT License. -# ------------------------------------ -from azure.core.serialization import is_generated_model - - -def test_is_generated_model_with_hybrid_model(): - - class HybridModel(Model): - pass - - assert is_generated_model(HybridModel()) - - -def test_is_generated_model_with_msrest_model(): - # Instead of importing msrest, we're just going to do a basic rendering of the msrest models - class MsrestModel(object): - _subtype_map = {} - _attribute_map = {} - _validation = {} - - def __init__(self, *args, **kwargs): - self.additional_properties = {} - - assert is_generated_model(MsrestModel()) - - class InstantiatedMsrestModel(MsrestModel): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.attr = "value" - - assert is_generated_model(InstantiatedMsrestModel()) - - -def test_is_generated_model_with_non_models(): - assert not is_generated_model({}) - assert not is_generated_model([]) - assert not is_generated_model("string") - assert not is_generated_model(42) - assert not is_generated_model(None) - assert not is_generated_model(object) - - class Model: - def __init__(self): - self.attr = "value" - - assert not is_generated_model(Model()) - - -# ------------------------------------------------------- -# Definition for hybrid model base class - -import logging -import copy -import calendar -import decimal -import functools -import sys -import base64 -import re -import typing -import enum -import email.utils -from datetime import datetime, date, time, timedelta, timezone -import xml.etree.ElementTree as ET -from collections.abc import MutableMapping -from typing_extensions import Self -import isodate -from azure.core.exceptions import DeserializationError -from azure.core import CaseInsensitiveEnumMeta -from azure.core.pipeline import PipelineResponse -from azure.core.serialization import _Null, _timedelta_as_isostr - -_LOGGER = logging.getLogger(__name__) - - -TZ_UTC = timezone.utc -_T = typing.TypeVar("_T") - -_VALID_DATE = re.compile(r"\d{4}[-]\d{2}[-]\d{2}T\d{2}:\d{2}:\d{2}" + r"\.?\d*Z?[-+]?[\d{2}]?:?[\d{2}]?") -_VALID_RFC7231 = re.compile( - r"(Mon|Tue|Wed|Thu|Fri|Sat|Sun),\s\d{2}\s" - r"(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s\d{4}\s\d{2}:\d{2}:\d{2}\sGMT" -) - - -def _serialize_bytes(o, format: typing.Optional[str] = None) -> str: - encoded = base64.b64encode(o).decode() - if format == "base64url": - return encoded.strip("=").replace("+", "-").replace("/", "_") - return encoded - - -def _serialize_datetime(o, format: typing.Optional[str] = None): - if hasattr(o, "year") and hasattr(o, "hour"): - if format == "rfc7231": - return email.utils.format_datetime(o, usegmt=True) - if format == "unix-timestamp": - return int(calendar.timegm(o.utctimetuple())) - - # astimezone() fails for naive times in Python 2.7, so make make sure o is aware (tzinfo is set) - if not o.tzinfo: - iso_formatted = o.replace(tzinfo=TZ_UTC).isoformat() - else: - iso_formatted = o.astimezone(TZ_UTC).isoformat() - # Replace the trailing "+00:00" UTC offset with "Z" (RFC 3339: https://www.ietf.org/rfc/rfc3339.txt) - return iso_formatted.replace("+00:00", "Z") - # Next try datetime.date or datetime.time - return o.isoformat() - - -def _is_readonly(p): - try: - return p._visibility == ["read"] - except AttributeError: - return False - - -def _deserialize_datetime(attr: typing.Union[str, datetime]) -> datetime: - """Deserialize ISO-8601 formatted string into Datetime object. - - :param str attr: response string to be deserialized. - :rtype: ~datetime.datetime - :returns: The datetime object from that input - """ - if isinstance(attr, datetime): - # i'm already deserialized - return attr - attr = attr.upper() - match = _VALID_DATE.match(attr) - if not match: - raise ValueError("Invalid datetime string: " + attr) - - check_decimal = attr.split(".") - if len(check_decimal) > 1: - decimal_str = "" - for digit in check_decimal[1]: - if digit.isdigit(): - decimal_str += digit - else: - break - if len(decimal_str) > 6: - attr = attr.replace(decimal_str, decimal_str[0:6]) - - date_obj = isodate.parse_datetime(attr) - test_utc = date_obj.utctimetuple() - if test_utc.tm_year > 9999 or test_utc.tm_year < 1: - raise OverflowError("Hit max or min date") - return date_obj - - -def _deserialize_datetime_rfc7231(attr: typing.Union[str, datetime]) -> datetime: - """Deserialize RFC7231 formatted string into Datetime object. - - :param str attr: response string to be deserialized. - :rtype: ~datetime.datetime - :returns: The datetime object from that input - """ - if isinstance(attr, datetime): - # i'm already deserialized - return attr - match = _VALID_RFC7231.match(attr) - if not match: - raise ValueError("Invalid datetime string: " + attr) - - return email.utils.parsedate_to_datetime(attr) - - -def _deserialize_datetime_unix_timestamp(attr: typing.Union[float, datetime]) -> datetime: - """Deserialize unix timestamp into Datetime object. - - :param str attr: response string to be deserialized. - :rtype: ~datetime.datetime - :returns: The datetime object from that input - """ - if isinstance(attr, datetime): - # i'm already deserialized - return attr - return datetime.fromtimestamp(attr, TZ_UTC) - - -def _deserialize_date(attr: typing.Union[str, date]) -> date: - """Deserialize ISO-8601 formatted string into Date object. - :param str attr: response string to be deserialized. - :rtype: date - :returns: The date object from that input - """ - # This must NOT use defaultmonth/defaultday. Using None ensure this raises an exception. - if isinstance(attr, date): - return attr - return isodate.parse_date(attr, defaultmonth=None, defaultday=None) # type: ignore - - -def _deserialize_time(attr: typing.Union[str, time]) -> time: - """Deserialize ISO-8601 formatted string into time object. - - :param str attr: response string to be deserialized. - :rtype: datetime.time - :returns: The time object from that input - """ - if isinstance(attr, time): - return attr - return isodate.parse_time(attr) - - -def _deserialize_bytes(attr): - if isinstance(attr, (bytes, bytearray)): - return attr - return bytes(base64.b64decode(attr)) - - -def _deserialize_bytes_base64(attr): - if isinstance(attr, (bytes, bytearray)): - return attr - padding = "=" * (3 - (len(attr) + 3) % 4) # type: ignore - attr = attr + padding # type: ignore - encoded = attr.replace("-", "+").replace("_", "/") - return bytes(base64.b64decode(encoded)) - - -def _deserialize_duration(attr): - if isinstance(attr, timedelta): - return attr - return isodate.parse_duration(attr) - - -def _deserialize_decimal(attr): - if isinstance(attr, decimal.Decimal): - return attr - return decimal.Decimal(str(attr)) - - -def _deserialize_int_as_str(attr): - if isinstance(attr, int): - return attr - return int(attr) - - -_DESERIALIZE_MAPPING = { - datetime: _deserialize_datetime, - date: _deserialize_date, - time: _deserialize_time, - bytes: _deserialize_bytes, - bytearray: _deserialize_bytes, - timedelta: _deserialize_duration, - typing.Any: lambda x: x, - decimal.Decimal: _deserialize_decimal, -} - -_DESERIALIZE_MAPPING_WITHFORMAT = { - "rfc3339": _deserialize_datetime, - "rfc7231": _deserialize_datetime_rfc7231, - "unix-timestamp": _deserialize_datetime_unix_timestamp, - "base64": _deserialize_bytes, - "base64url": _deserialize_bytes_base64, -} - - -def get_deserializer(annotation: typing.Any, rf: typing.Optional["_RestField"] = None): - if annotation is int and rf and rf._format == "str": - return _deserialize_int_as_str - if rf and rf._format: - return _DESERIALIZE_MAPPING_WITHFORMAT.get(rf._format) - return _DESERIALIZE_MAPPING.get(annotation) # pyright: ignore - - -def _get_type_alias_type(module_name: str, alias_name: str): - types = { - k: v - for k, v in sys.modules[module_name].__dict__.items() - if isinstance(v, typing._GenericAlias) # type: ignore - } - if alias_name not in types: - return alias_name - return types[alias_name] - - -def _get_model(module_name: str, model_name: str): - models = {k: v for k, v in sys.modules[module_name].__dict__.items() if isinstance(v, type)} - module_end = module_name.rsplit(".", 1)[0] - models.update({k: v for k, v in sys.modules[module_end].__dict__.items() if isinstance(v, type)}) - if isinstance(model_name, str): - model_name = model_name.split(".")[-1] - if model_name not in models: - return model_name - return models[model_name] - - -_UNSET = object() - - -class _MyMutableMapping(MutableMapping[str, typing.Any]): - def __init__(self, data: typing.Dict[str, typing.Any]) -> None: - self._data = data - - def __contains__(self, key: typing.Any) -> bool: - return key in self._data - - def __getitem__(self, key: str) -> typing.Any: - return self._data.__getitem__(key) - - def __setitem__(self, key: str, value: typing.Any) -> None: - self._data.__setitem__(key, value) - - def __delitem__(self, key: str) -> None: - self._data.__delitem__(key) - - def __iter__(self) -> typing.Iterator[typing.Any]: - return self._data.__iter__() - - def __len__(self) -> int: - return self._data.__len__() - - def __ne__(self, other: typing.Any) -> bool: - return not self.__eq__(other) - - def keys(self) -> typing.KeysView[str]: - """ - :returns: a set-like object providing a view on D's keys - :rtype: ~typing.KeysView - """ - return self._data.keys() - - def values(self) -> typing.ValuesView[typing.Any]: - """ - :returns: an object providing a view on D's values - :rtype: ~typing.ValuesView - """ - return self._data.values() - - def items(self) -> typing.ItemsView[str, typing.Any]: - """ - :returns: set-like object providing a view on D's items - :rtype: ~typing.ItemsView - """ - return self._data.items() - - def get(self, key: str, default: typing.Any = None) -> typing.Any: - """ - Get the value for key if key is in the dictionary, else default. - :param str key: The key to look up. - :param any default: The value to return if key is not in the dictionary. Defaults to None - :returns: D[k] if k in D, else d. - :rtype: any - """ - try: - return self[key] - except KeyError: - return default - - @typing.overload - def pop(self, key: str) -> typing.Any: ... # pylint: disable=arguments-differ - - @typing.overload - def pop(self, key: str, default: _T) -> _T: ... # pylint: disable=signature-differs - - @typing.overload - def pop(self, key: str, default: typing.Any) -> typing.Any: ... # pylint: disable=signature-differs - - def pop(self, key: str, default: typing.Any = _UNSET) -> typing.Any: - """ - Removes specified key and return the corresponding value. - :param str key: The key to pop. - :param any default: The value to return if key is not in the dictionary - :returns: The value corresponding to the key. - :rtype: any - :raises KeyError: If key is not found and default is not given. - """ - if default is _UNSET: - return self._data.pop(key) - return self._data.pop(key, default) - - def popitem(self) -> typing.Tuple[str, typing.Any]: - """ - Removes and returns some (key, value) pair - :returns: The (key, value) pair. - :rtype: tuple - :raises KeyError: if D is empty. - """ - return self._data.popitem() - - def clear(self) -> None: - """ - Remove all items from D. - """ - self._data.clear() - - def update(self, *args: typing.Any, **kwargs: typing.Any) -> None: # pylint: disable=arguments-differ - """ - Updates D from mapping/iterable E and F. - :param any args: Either a mapping object or an iterable of key-value pairs. - """ - self._data.update(*args, **kwargs) - - @typing.overload - def setdefault(self, key: str, default: None = None) -> None: ... - - @typing.overload - def setdefault(self, key: str, default: typing.Any) -> typing.Any: ... # pylint: disable=signature-differs - - def setdefault(self, key: str, default: typing.Any = _UNSET) -> typing.Any: - """ - Same as calling D.get(k, d), and setting D[k]=d if k not found - :param str key: The key to look up. - :param any default: The value to set if key is not in the dictionary - :returns: D[k] if k in D, else d. - :rtype: any - """ - if default is _UNSET: - return self._data.setdefault(key) - return self._data.setdefault(key, default) - - def __eq__(self, other: typing.Any) -> bool: - try: - other_model = self.__class__(other) - except Exception: - return False - return self._data == other_model._data - - def __repr__(self) -> str: - return str(self._data) - - -def _is_model(obj: typing.Any) -> bool: - return getattr(obj, "_is_model", False) - - -def _serialize(o, format: typing.Optional[str] = None): # pylint: disable=too-many-return-statements - if isinstance(o, list): - return [_serialize(x, format) for x in o] - if isinstance(o, dict): - return {k: _serialize(v, format) for k, v in o.items()} - if isinstance(o, set): - return {_serialize(x, format) for x in o} - if isinstance(o, tuple): - return tuple(_serialize(x, format) for x in o) - if isinstance(o, (bytes, bytearray)): - return _serialize_bytes(o, format) - if isinstance(o, decimal.Decimal): - return float(o) - if isinstance(o, enum.Enum): - return o.value - if isinstance(o, int): - if format == "str": - return str(o) - return o - try: - # First try datetime.datetime - return _serialize_datetime(o, format) - except AttributeError: - pass - # Last, try datetime.timedelta - try: - return _timedelta_as_isostr(o) - except AttributeError: - # This will be raised when it hits value.total_seconds in the method above - pass - return o - - -def _get_rest_field( - attr_to_rest_field: typing.Dict[str, "_RestField"], rest_name: str -) -> typing.Optional["_RestField"]: - try: - return next(rf for rf in attr_to_rest_field.values() if rf._rest_name == rest_name) - except StopIteration: - return None - - -def _create_value(rf: typing.Optional["_RestField"], value: typing.Any) -> typing.Any: - if not rf: - return _serialize(value, None) - if rf._is_multipart_file_input: - return value - if rf._is_model: - return _deserialize(rf._type, value) - if isinstance(value, ET.Element): - value = _deserialize(rf._type, value) - return _serialize(value, rf._format) - - -class Model(_MyMutableMapping): - _is_model = True - # label whether current class's _attr_to_rest_field has been calculated - # could not see _attr_to_rest_field directly because subclass inherits it from parent class - _calculated: typing.Set[str] = set() - - def __init__(self, *args: typing.Any, **kwargs: typing.Any) -> None: - class_name = self.__class__.__name__ - if len(args) > 1: - raise TypeError(f"{class_name}.__init__() takes 2 positional arguments but {len(args) + 1} were given") - dict_to_pass = { - rest_field._rest_name: rest_field._default - for rest_field in self._attr_to_rest_field.values() - if rest_field._default is not _UNSET - } - if args: # pylint: disable=too-many-nested-blocks - if isinstance(args[0], ET.Element): - existed_attr_keys = [] - model_meta = getattr(self, "_xml", {}) - - for rf in self._attr_to_rest_field.values(): - prop_meta = getattr(rf, "_xml", {}) - xml_name = prop_meta.get("name", rf._rest_name) - xml_ns = prop_meta.get("ns", model_meta.get("ns", None)) - if xml_ns: - xml_name = "{" + xml_ns + "}" + xml_name - - # attribute - if prop_meta.get("attribute", False) and args[0].get(xml_name) is not None: - existed_attr_keys.append(xml_name) - dict_to_pass[rf._rest_name] = _deserialize(rf._type, args[0].get(xml_name)) - continue - - # unwrapped element is array - if prop_meta.get("unwrapped", False): - # unwrapped array could either use prop items meta/prop meta - if prop_meta.get("itemsName"): - xml_name = prop_meta.get("itemsName") - xml_ns = prop_meta.get("itemNs") - if xml_ns: - xml_name = "{" + xml_ns + "}" + xml_name - items = args[0].findall(xml_name) # pyright: ignore - if len(items) > 0: - existed_attr_keys.append(xml_name) - dict_to_pass[rf._rest_name] = _deserialize(rf._type, items) - continue - - # text element is primitive type - if prop_meta.get("text", False): - if args[0].text is not None: - dict_to_pass[rf._rest_name] = _deserialize(rf._type, args[0].text) - continue - - # wrapped element could be normal property or array, it should only have one element - item = args[0].find(xml_name) - if item is not None: - existed_attr_keys.append(xml_name) - dict_to_pass[rf._rest_name] = _deserialize(rf._type, item) - - # rest thing is additional properties - for e in args[0]: - if e.tag not in existed_attr_keys: - dict_to_pass[e.tag] = _convert_element(e) - else: - dict_to_pass.update( - {k: _create_value(_get_rest_field(self._attr_to_rest_field, k), v) for k, v in args[0].items()} - ) - else: - non_attr_kwargs = [k for k in kwargs if k not in self._attr_to_rest_field] - if non_attr_kwargs: - # actual type errors only throw the first wrong keyword arg they see, so following that. - raise TypeError(f"{class_name}.__init__() got an unexpected keyword argument '{non_attr_kwargs[0]}'") - dict_to_pass.update( - { - self._attr_to_rest_field[k]._rest_name: _create_value(self._attr_to_rest_field[k], v) - for k, v in kwargs.items() - if v is not None - } - ) - super().__init__(dict_to_pass) - - def copy(self) -> "Model": - return Model(self.__dict__) - - def __new__(cls, *args: typing.Any, **kwargs: typing.Any) -> Self: - if f"{cls.__module__}.{cls.__qualname__}" not in cls._calculated: - # we know the last nine classes in mro are going to be 'Model', '_MyMutableMapping', 'MutableMapping', - # 'Mapping', 'Collection', 'Sized', 'Iterable', 'Container' and 'object' - mros = cls.__mro__[:-9][::-1] # ignore parents, and reverse the mro order - attr_to_rest_field: typing.Dict[str, _RestField] = { # map attribute name to rest_field property - k: v for mro_class in mros for k, v in mro_class.__dict__.items() if k[0] != "_" and hasattr(v, "_type") - } - annotations = { - k: v - for mro_class in mros - if hasattr(mro_class, "__annotations__") - for k, v in mro_class.__annotations__.items() - } - for attr, rf in attr_to_rest_field.items(): - rf._module = cls.__module__ - if not rf._type: - rf._type = rf._get_deserialize_callable_from_annotation(annotations.get(attr, None)) - if not rf._rest_name_input: - rf._rest_name_input = attr - cls._attr_to_rest_field: typing.Dict[str, _RestField] = dict(attr_to_rest_field.items()) - cls._calculated.add(f"{cls.__module__}.{cls.__qualname__}") - - return super().__new__(cls) - - def __init_subclass__(cls, discriminator: typing.Optional[str] = None) -> None: - for base in cls.__bases__: - if hasattr(base, "__mapping__"): - base.__mapping__[discriminator or cls.__name__] = cls # type: ignore - - @classmethod - def _get_discriminator(cls, exist_discriminators) -> typing.Optional["_RestField"]: - for v in cls.__dict__.values(): - if isinstance(v, _RestField) and v._is_discriminator and v._rest_name not in exist_discriminators: - return v - return None - - @classmethod - def _deserialize(cls, data, exist_discriminators): - if not hasattr(cls, "__mapping__"): - return cls(data) - discriminator = cls._get_discriminator(exist_discriminators) - if discriminator is None: - return cls(data) - exist_discriminators.append(discriminator._rest_name) - if isinstance(data, ET.Element): - model_meta = getattr(cls, "_xml", {}) - prop_meta = getattr(discriminator, "_xml", {}) - xml_name = prop_meta.get("name", discriminator._rest_name) - xml_ns = prop_meta.get("ns", model_meta.get("ns", None)) - if xml_ns: - xml_name = "{" + xml_ns + "}" + xml_name - - if data.get(xml_name) is not None: - discriminator_value = data.get(xml_name) - else: - discriminator_value = data.find(xml_name).text # pyright: ignore - else: - discriminator_value = data.get(discriminator._rest_name) - mapped_cls = cls.__mapping__.get(discriminator_value, cls) # pyright: ignore # pylint: disable=no-member - return mapped_cls._deserialize(data, exist_discriminators) - - def as_dict(self, *, exclude_readonly: bool = False) -> typing.Dict[str, typing.Any]: - """Return a dict that can be turned into json using json.dump. - - :keyword bool exclude_readonly: Whether to remove the readonly properties. - :returns: A dict JSON compatible object - :rtype: dict - """ - - result = {} - readonly_props = [] - if exclude_readonly: - readonly_props = [p._rest_name for p in self._attr_to_rest_field.values() if _is_readonly(p)] - for k, v in self.items(): - if exclude_readonly and k in readonly_props: # pyright: ignore - continue - is_multipart_file_input = False - try: - is_multipart_file_input = next( - rf for rf in self._attr_to_rest_field.values() if rf._rest_name == k - )._is_multipart_file_input - except StopIteration: - pass - result[k] = v if is_multipart_file_input else Model._as_dict_value(v, exclude_readonly=exclude_readonly) - return result - - @staticmethod - def _as_dict_value(v: typing.Any, exclude_readonly: bool = False) -> typing.Any: - if v is None or isinstance(v, _Null): - return None - if isinstance(v, (list, tuple, set)): - return type(v)(Model._as_dict_value(x, exclude_readonly=exclude_readonly) for x in v) - if isinstance(v, dict): - return {dk: Model._as_dict_value(dv, exclude_readonly=exclude_readonly) for dk, dv in v.items()} - return v.as_dict(exclude_readonly=exclude_readonly) if hasattr(v, "as_dict") else v - - -def _deserialize_model(model_deserializer: typing.Optional[typing.Callable], obj): - if _is_model(obj): - return obj - return _deserialize(model_deserializer, obj) - - -def _deserialize_with_optional(if_obj_deserializer: typing.Optional[typing.Callable], obj): - if obj is None: - return obj - return _deserialize_with_callable(if_obj_deserializer, obj) - - -def _deserialize_with_union(deserializers, obj): - for deserializer in deserializers: - try: - return _deserialize(deserializer, obj) - except DeserializationError: - pass - raise DeserializationError() - - -def _deserialize_dict( - value_deserializer: typing.Optional[typing.Callable], - module: typing.Optional[str], - obj: typing.Dict[typing.Any, typing.Any], -): - if obj is None: - return obj - if isinstance(obj, ET.Element): - obj = {child.tag: child for child in obj} - return {k: _deserialize(value_deserializer, v, module) for k, v in obj.items()} - - -def _deserialize_multiple_sequence( - entry_deserializers: typing.List[typing.Optional[typing.Callable]], - module: typing.Optional[str], - obj, -): - if obj is None: - return obj - return type(obj)(_deserialize(deserializer, entry, module) for entry, deserializer in zip(obj, entry_deserializers)) - - -def _deserialize_sequence( - deserializer: typing.Optional[typing.Callable], - module: typing.Optional[str], - obj, -): - if obj is None: - return obj - if isinstance(obj, ET.Element): - obj = list(obj) - return type(obj)(_deserialize(deserializer, entry, module) for entry in obj) - - -def _sorted_annotations(types: typing.List[typing.Any]) -> typing.List[typing.Any]: - return sorted( - types, - key=lambda x: hasattr(x, "__name__") and x.__name__.lower() in ("str", "float", "int", "bool"), - ) - - -def _get_deserialize_callable_from_annotation( # pylint: disable=too-many-return-statements, too-many-branches - annotation: typing.Any, - module: typing.Optional[str], - rf: typing.Optional["_RestField"] = None, -) -> typing.Optional[typing.Callable[[typing.Any], typing.Any]]: - if not annotation: - return None - - # is it a type alias? - if isinstance(annotation, str): - if module is not None: - annotation = _get_type_alias_type(module, annotation) - - # is it a forward ref / in quotes? - if isinstance(annotation, (str, typing.ForwardRef)): - try: - model_name = annotation.__forward_arg__ # type: ignore - except AttributeError: - model_name = annotation - if module is not None: - annotation = _get_model(module, model_name) # type: ignore - - try: - if module and _is_model(annotation): - if rf: - rf._is_model = True - - return functools.partial(_deserialize_model, annotation) # pyright: ignore - except Exception: - pass - - # is it a literal? - try: - if annotation.__origin__ is typing.Literal: # pyright: ignore - return None - except AttributeError: - pass - - # is it optional? - try: - if any(a for a in annotation.__args__ if a == type(None)): # pyright: ignore - if len(annotation.__args__) <= 2: # pyright: ignore - if_obj_deserializer = _get_deserialize_callable_from_annotation( - next(a for a in annotation.__args__ if a != type(None)), module, rf # pyright: ignore - ) - - return functools.partial(_deserialize_with_optional, if_obj_deserializer) - # the type is Optional[Union[...]], we need to remove the None type from the Union - annotation_copy = copy.copy(annotation) - annotation_copy.__args__ = [a for a in annotation_copy.__args__ if a != type(None)] # pyright: ignore - return _get_deserialize_callable_from_annotation(annotation_copy, module, rf) - except AttributeError: - pass - - # is it union? - if getattr(annotation, "__origin__", None) is typing.Union: - # initial ordering is we make `string` the last deserialization option, because it is often them most generic - deserializers = [ - _get_deserialize_callable_from_annotation(arg, module, rf) - for arg in _sorted_annotations(annotation.__args__) # pyright: ignore - ] - - return functools.partial(_deserialize_with_union, deserializers) - - try: - if annotation._name == "Dict": # pyright: ignore - value_deserializer = _get_deserialize_callable_from_annotation( - annotation.__args__[1], module, rf # pyright: ignore - ) - - return functools.partial( - _deserialize_dict, - value_deserializer, - module, - ) - except (AttributeError, IndexError): - pass - try: - if annotation._name in ["List", "Set", "Tuple", "Sequence"]: # pyright: ignore - if len(annotation.__args__) > 1: # pyright: ignore - entry_deserializers = [ - _get_deserialize_callable_from_annotation(dt, module, rf) - for dt in annotation.__args__ # pyright: ignore - ] - return functools.partial(_deserialize_multiple_sequence, entry_deserializers, module) - deserializer = _get_deserialize_callable_from_annotation( - annotation.__args__[0], module, rf # pyright: ignore - ) - - return functools.partial(_deserialize_sequence, deserializer, module) - except (TypeError, IndexError, AttributeError, SyntaxError): - pass - - def _deserialize_default( - deserializer, - obj, - ): - if obj is None: - return obj - try: - return _deserialize_with_callable(deserializer, obj) - except Exception: - pass - return obj - - if get_deserializer(annotation, rf): - return functools.partial(_deserialize_default, get_deserializer(annotation, rf)) - - return functools.partial(_deserialize_default, annotation) - - -def _deserialize_with_callable( - deserializer: typing.Optional[typing.Callable[[typing.Any], typing.Any]], - value: typing.Any, -): # pylint: disable=too-many-return-statements - try: - if value is None or isinstance(value, _Null): - return None - if isinstance(value, ET.Element): - if deserializer is str: - return value.text or "" - if deserializer is int: - return int(value.text) if value.text else None - if deserializer is float: - return float(value.text) if value.text else None - if deserializer is bool: - return value.text == "true" if value.text else None - if deserializer is None: - return value - if deserializer in [int, float, bool]: - return deserializer(value) - if isinstance(deserializer, CaseInsensitiveEnumMeta): - try: - return deserializer(value) - except ValueError: - # for unknown value, return raw value - return value - if isinstance(deserializer, type) and issubclass(deserializer, Model): - return deserializer._deserialize(value, []) - return typing.cast(typing.Callable[[typing.Any], typing.Any], deserializer)(value) - except Exception as e: - raise DeserializationError() from e - - -def _deserialize( - deserializer: typing.Any, - value: typing.Any, - module: typing.Optional[str] = None, - rf: typing.Optional["_RestField"] = None, - format: typing.Optional[str] = None, -) -> typing.Any: - if isinstance(value, PipelineResponse): - value = value.http_response.json() - if rf is None and format: - rf = _RestField(format=format) - if not isinstance(deserializer, functools.partial): - deserializer = _get_deserialize_callable_from_annotation(deserializer, module, rf) - return _deserialize_with_callable(deserializer, value) - - -def _failsafe_deserialize( - deserializer: typing.Any, - value: typing.Any, - module: typing.Optional[str] = None, - rf: typing.Optional["_RestField"] = None, - format: typing.Optional[str] = None, -) -> typing.Any: - try: - return _deserialize(deserializer, value, module, rf, format) - except DeserializationError: - _LOGGER.warning( - "Ran into a deserialization error. Ignoring since this is failsafe deserialization", exc_info=True - ) - return None - - -def _failsafe_deserialize_xml( - deserializer: typing.Any, - value: typing.Any, -) -> typing.Any: - try: - return _deserialize_xml(deserializer, value) - except DeserializationError: - _LOGGER.warning( - "Ran into a deserialization error. Ignoring since this is failsafe deserialization", exc_info=True - ) - return None - - -class _RestField: - def __init__( - self, - *, - name: typing.Optional[str] = None, - type: typing.Optional[typing.Callable] = None, # pylint: disable=redefined-builtin - is_discriminator: bool = False, - visibility: typing.Optional[typing.List[str]] = None, - default: typing.Any = _UNSET, - format: typing.Optional[str] = None, - is_multipart_file_input: bool = False, - xml: typing.Optional[typing.Dict[str, typing.Any]] = None, - ): - self._type = type - self._rest_name_input = name - self._module: typing.Optional[str] = None - self._is_discriminator = is_discriminator - self._visibility = visibility - self._is_model = False - self._default = default - self._format = format - self._is_multipart_file_input = is_multipart_file_input - self._xml = xml if xml is not None else {} - - @property - def _class_type(self) -> typing.Any: - return getattr(self._type, "args", [None])[0] - - @property - def _rest_name(self) -> str: - if self._rest_name_input is None: - raise ValueError("Rest name was never set") - return self._rest_name_input - - def __get__(self, obj: Model, type=None): # pylint: disable=redefined-builtin - # by this point, type and rest_name will have a value bc we default - # them in __new__ of the Model class - item = obj.get(self._rest_name) - if item is None: - return item - if self._is_model: - return item - return _deserialize(self._type, _serialize(item, self._format), rf=self) - - def __set__(self, obj: Model, value) -> None: - if value is None: - # we want to wipe out entries if users set attr to None - try: - obj.__delitem__(self._rest_name) - except KeyError: - pass - return - if self._is_model: - if not _is_model(value): - value = _deserialize(self._type, value) - obj.__setitem__(self._rest_name, value) - return - obj.__setitem__(self._rest_name, _serialize(value, self._format)) - - def _get_deserialize_callable_from_annotation( - self, annotation: typing.Any - ) -> typing.Optional[typing.Callable[[typing.Any], typing.Any]]: - return _get_deserialize_callable_from_annotation(annotation, self._module, self) - - -def rest_field( - *, - name: typing.Optional[str] = None, - type: typing.Optional[typing.Callable] = None, # pylint: disable=redefined-builtin - visibility: typing.Optional[typing.List[str]] = None, - default: typing.Any = _UNSET, - format: typing.Optional[str] = None, - is_multipart_file_input: bool = False, - xml: typing.Optional[typing.Dict[str, typing.Any]] = None, -) -> typing.Any: - return _RestField( - name=name, - type=type, - visibility=visibility, - default=default, - format=format, - is_multipart_file_input=is_multipart_file_input, - xml=xml, - ) - - -def rest_discriminator( - *, - name: typing.Optional[str] = None, - type: typing.Optional[typing.Callable] = None, # pylint: disable=redefined-builtin - visibility: typing.Optional[typing.List[str]] = None, - xml: typing.Optional[typing.Dict[str, typing.Any]] = None, -) -> typing.Any: - return _RestField(name=name, type=type, is_discriminator=True, visibility=visibility, xml=xml) - - -def serialize_xml(model: Model, exclude_readonly: bool = False) -> str: - """Serialize a model to XML. - - :param Model model: The model to serialize. - :param bool exclude_readonly: Whether to exclude readonly properties. - :returns: The XML representation of the model. - :rtype: str - """ - return ET.tostring(_get_element(model, exclude_readonly), encoding="unicode") # type: ignore - - -def _get_element( - o: typing.Any, - exclude_readonly: bool = False, - parent_meta: typing.Optional[typing.Dict[str, typing.Any]] = None, - wrapped_element: typing.Optional[ET.Element] = None, -) -> typing.Union[ET.Element, typing.List[ET.Element]]: - if _is_model(o): - model_meta = getattr(o, "_xml", {}) - - # if prop is a model, then use the prop element directly, else generate a wrapper of model - if wrapped_element is None: - wrapped_element = _create_xml_element( - model_meta.get("name", o.__class__.__name__), - model_meta.get("prefix"), - model_meta.get("ns"), - ) - - readonly_props = [] - if exclude_readonly: - readonly_props = [p._rest_name for p in o._attr_to_rest_field.values() if _is_readonly(p)] - - for k, v in o.items(): - # do not serialize readonly properties - if exclude_readonly and k in readonly_props: - continue - - prop_rest_field = _get_rest_field(o._attr_to_rest_field, k) - if prop_rest_field: - prop_meta = getattr(prop_rest_field, "_xml").copy() - # use the wire name as xml name if no specific name is set - if prop_meta.get("name") is None: - prop_meta["name"] = k - else: - # additional properties will not have rest field, use the wire name as xml name - prop_meta = {"name": k} - - # if no ns for prop, use model's - if prop_meta.get("ns") is None and model_meta.get("ns"): - prop_meta["ns"] = model_meta.get("ns") - prop_meta["prefix"] = model_meta.get("prefix") - - if prop_meta.get("unwrapped", False): - # unwrapped could only set on array - wrapped_element.extend(_get_element(v, exclude_readonly, prop_meta)) - elif prop_meta.get("text", False): - # text could only set on primitive type - wrapped_element.text = _get_primitive_type_value(v) - elif prop_meta.get("attribute", False): - xml_name = prop_meta.get("name", k) - if prop_meta.get("ns"): - ET.register_namespace(prop_meta.get("prefix"), prop_meta.get("ns")) # pyright: ignore - xml_name = "{" + prop_meta.get("ns") + "}" + xml_name # pyright: ignore - # attribute should be primitive type - wrapped_element.set(xml_name, _get_primitive_type_value(v)) - else: - # other wrapped prop element - wrapped_element.append(_get_wrapped_element(v, exclude_readonly, prop_meta)) - return wrapped_element - if isinstance(o, list): - return [_get_element(x, exclude_readonly, parent_meta) for x in o] # type: ignore - if isinstance(o, dict): - result = [] - for k, v in o.items(): - result.append( - _get_wrapped_element( - v, - exclude_readonly, - { - "name": k, - "ns": parent_meta.get("ns") if parent_meta else None, - "prefix": parent_meta.get("prefix") if parent_meta else None, - }, - ) - ) - return result - - # primitive case need to create element based on parent_meta - if parent_meta: - return _get_wrapped_element( - o, - exclude_readonly, - { - "name": parent_meta.get("itemsName", parent_meta.get("name")), - "prefix": parent_meta.get("itemsPrefix", parent_meta.get("prefix")), - "ns": parent_meta.get("itemsNs", parent_meta.get("ns")), - }, - ) - - raise ValueError("Could not serialize value into xml: " + o) - - -def _get_wrapped_element( - v: typing.Any, - exclude_readonly: bool, - meta: typing.Optional[typing.Dict[str, typing.Any]], -) -> ET.Element: - wrapped_element = _create_xml_element( - meta.get("name") if meta else None, meta.get("prefix") if meta else None, meta.get("ns") if meta else None - ) - if isinstance(v, (dict, list)): - wrapped_element.extend(_get_element(v, exclude_readonly, meta)) - elif _is_model(v): - _get_element(v, exclude_readonly, meta, wrapped_element) - else: - wrapped_element.text = _get_primitive_type_value(v) - return wrapped_element - - -def _get_primitive_type_value(v) -> str: - if v is True: - return "true" - if v is False: - return "false" - if isinstance(v, _Null): - return "" - return str(v) - - -def _create_xml_element(tag, prefix=None, ns=None): - if prefix and ns: - ET.register_namespace(prefix, ns) - if ns: - return ET.Element("{" + ns + "}" + tag) - return ET.Element(tag) - - -def _deserialize_xml( - deserializer: typing.Any, - value: str, -) -> typing.Any: - element = ET.fromstring(value) # nosec - return _deserialize(deserializer, element) - - -def _convert_element(e: ET.Element): - # dict case - if len(e.attrib) > 0 or len({child.tag for child in e}) > 1: - dict_result: typing.Dict[str, typing.Any] = {} - for child in e: - if dict_result.get(child.tag) is not None: - if isinstance(dict_result[child.tag], list): - dict_result[child.tag].append(_convert_element(child)) - else: - dict_result[child.tag] = [dict_result[child.tag], _convert_element(child)] - else: - dict_result[child.tag] = _convert_element(child) - dict_result.update(e.attrib) - return dict_result - # array case - if len(e) > 0: - array_result: typing.List[typing.Any] = [] - for child in e: - array_result.append(_convert_element(child)) - return array_result - # primitive case - return e.text From a0667d70d11bccd88495b1671e71e0a3625e8fd0 Mon Sep 17 00:00:00 2001 From: iscai-msft Date: Mon, 9 Jun 2025 14:47:45 -0400 Subject: [PATCH 7/8] switch wd --- eng/pipelines/tsp-spec-sync.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eng/pipelines/tsp-spec-sync.yml b/eng/pipelines/tsp-spec-sync.yml index 6a02da1c02ec..a50bb1ee3dcf 100644 --- a/eng/pipelines/tsp-spec-sync.yml +++ b/eng/pipelines/tsp-spec-sync.yml @@ -32,7 +32,7 @@ extends: - script: | tsp-client update --local-spec-repo ../../specs/scratch/ displayName: 'Run TSP Client Update' - workingDirectory: ./sdk/core/azure-core/tests/specs_sdk/scratch + workingDirectory: /sdk/core/azure-core/tests/specs_sdk/scratch - script: | changes=$(git status --porcelain) From 1d509f4ecf98b1424b9168f11c23da86b2fd1446 Mon Sep 17 00:00:00 2001 From: iscai-msft Date: Mon, 9 Jun 2025 14:49:12 -0400 Subject: [PATCH 8/8] revert accidental commit --- eng/pipelines/tsp-spec-sync.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eng/pipelines/tsp-spec-sync.yml b/eng/pipelines/tsp-spec-sync.yml index a50bb1ee3dcf..6a02da1c02ec 100644 --- a/eng/pipelines/tsp-spec-sync.yml +++ b/eng/pipelines/tsp-spec-sync.yml @@ -32,7 +32,7 @@ extends: - script: | tsp-client update --local-spec-repo ../../specs/scratch/ displayName: 'Run TSP Client Update' - workingDirectory: /sdk/core/azure-core/tests/specs_sdk/scratch + workingDirectory: ./sdk/core/azure-core/tests/specs_sdk/scratch - script: | changes=$(git status --porcelain)