Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SchemaRegistry] add support for getting schema by version #26055

Merged
13 commits merged into from
Sep 14, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@

## 1.0.1 (Unreleased)

### Features Added

### Breaking Changes
This version and all future versions will require Python 3.7+, Python 3.6 is no longer supported.

### Bugs Fixed

### Other Changes
- Updated `avro` minimum dependency to 1.11.1, which fixed a bug that previously restriceted complex types from being used as names.

## 1.0.0 (2022-05-10)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pip install azure-schemaregistry-avroencoder
To use this package, you must have:
* Azure subscription - [Create a free account][azure_sub]
* [Azure Schema Registry][schemaregistry_service] - [Here is the quickstart guide][quickstart_guide] to create a Schema Registry group using the Azure portal.
* Python 3.6 or later - [Install Python][python]
* Python 3.7 or later - [Install Python][python]

### Authenticate the client
Interaction with the Schema Registry Avro Encoder starts with an instance of AvroEncoder class, which takes the schema group name and the [Schema Registry Client][schemaregistry_client] class. The client constructor takes the Event Hubs fully qualified namespace and and Azure Active Directory credential:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Several Schema Registry Avro Encoder Python SDK samples are available to you in
* Receive `EventData` from Event Hubs and decode the received bytes.

## Prerequisites
- Python 3.6 or later.
- Python 3.7 or later.
- **Microsoft Azure Subscription:** To use Azure services, including Azure Schema Registry, you'll need a subscription.
If you do not have an existing Azure account, you may sign up for a free trial or use your MSDN subscriber benefits when you [create an account](https://account.windowsazure.com/Home/Index).

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

install_packages = [
'azure-schemaregistry>=1.0.0,<2.0.0',
'avro>=1.11.0',
'avro>=1.11.1',
"typing-extensions>=4.0.1",
]

Expand All @@ -52,14 +52,13 @@
"Development Status :: 5 - Production/Stable",
'Programming Language :: Python',
'Programming Language :: Python :: 3 :: Only',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'License :: OSI Approved :: MIT License',
],
python_requires=">=3.6",
python_requires=">=3.7",
zip_safe=False,
packages=find_namespace_packages(
include=['azure.schemaregistry.encoder.*'] # Exclude packages that will be covered by PEP420 or nspkg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import functools
import pytest
import json
import pdb

from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
Expand All @@ -37,7 +36,7 @@
from avro.errors import AvroTypeException
from azure.schemaregistry.encoder.avroencoder._apache_avro_encoder import ApacheAvroObjectEncoder as AvroObjectEncoder

from devtools_testutils import AzureTestCase, PowerShellPreparer
from devtools_testutils import AzureTestCase

SchemaRegistryEnvironmentVariableLoader = functools.partial(EnvironmentVariableLoader, "schemaregistry", schemaregistry_fully_qualified_namespace="fake_resource.servicebus.windows.net/", schemaregistry_group="fakegroup")

Expand Down Expand Up @@ -434,7 +433,7 @@ def test_parse_record_name(self, **kwargs):
sr_avro_encoder.encode({"name": u"Ben"}, schema=schema_invalid_name_in_fullname)

schema_invalid_name_reserved_type = """{
"name":"record",
"name":"long",
"type":"record",
"fields":[{"name":"name","type":"string"}]
}"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,12 +384,12 @@ async def test_parse_record_name(self, schemaregistry_fully_qualified_namespace,
await sr_avro_encoder.encode({"name": u"Ben"}, schema=schema_invalid_name_in_fullname)

schema_invalid_name_reserved_type = """{
"name":"record",
"name":"long",
"type":"record",
"fields":[{"name":"name","type":"string"}]
}"""
with pytest.raises(InvalidSchemaError):
await sr_avro_encoder.encode({"name": u"Ben"}, schema=schema_invalid_name_reserved_type)
await sr_avro_encoder.encode({"name": u"Ben"}, schema=schema_invalid_name_reserved_type)

schema_wrong_type_name = """{
"name":1,
Expand Down
9 changes: 6 additions & 3 deletions sdk/schemaregistry/azure-schemaregistry/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@

## 1.1.1 (Unreleased)

### Features Added
This version and all future versions will require Python 3.7+, Python 3.6 is no longer supported.

### Breaking Changes
### Features Added

### Bugs Fixed
- `get_schema_by_version` method has been added to the sync and async `SchemaRegistryClient`.
- `version` has been added to `SchemaProperties`.

### Other Changes

- Updated azure-core minimum dependency to 1.24.0.

## 1.1.0 (2022-05-10)

This version and all future versions will require Python 3.6+. Python 2.7 is no longer supported.
Expand Down
26 changes: 25 additions & 1 deletion sdk/schemaregistry/azure-schemaregistry/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pip install azure-schemaregistry
To use this package, you must have:
* Azure subscription - [Create a free account][azure_sub]
* [Azure Schema Registry][schemaregistry_service] - [Here is the quickstart guide][quickstart_guide] to create a Schema Registry group using the Azure portal.
* Python 3.6 or later - [Install Python][python]
* Python 3.7 or later - [Install Python][python]

### Authenticate the client

Expand Down Expand Up @@ -71,6 +71,7 @@ The following sections provide several code snippets covering some of the most c

- [Register a schema](#register-a-schema)
- [Get the schema by id](#get-the-schema-by-id)
- [Get the schema by version](#get-the-schema-by-version)
- [Get the id of a schema](#get-the-id-of-a-schema)

### Register a schema
Expand Down Expand Up @@ -127,6 +128,29 @@ with schema_registry_client:
properties = schema.properties
```

### Get the schema by version

Get the schema definition and its properties by schema version.

```python
import os

from azure.identity import DefaultAzureCredential
from azure.schemaregistry import SchemaRegistryClient

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ["SCHEMAREGISTRY_GROUP"]
name = "your-schema-name"
version = int("<your schema version>")

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace=fully_qualified_namespace, credential=token_credential)
with schema_registry_client:
schema = schema_registry_client.get_schema_by_version(group_name, name, version)
definition = schema.definition
properties = schema.properties
```

### Get the id of a schema

Get the schema id of a schema by schema definition and its properties.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ def _parse_schema_properties_dict(response):
return {
"id": response.headers.get("schema-id"),
"group_name": response.headers.get("schema-group-name"),
"name": response.headers.get("schema-name")
"name": response.headers.get("schema-name"),
"version": int(response.headers.get("schema-version"))
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class SchemaProperties(object):
:vartype group_name: str
:ivar name: Name of schema.
:vartype name: str
:ivar version: Version of schema.
:vartype version: int
"""

def __init__(self, **kwargs):
Expand All @@ -46,11 +48,12 @@ def __init__(self, **kwargs):
self.format = kwargs.pop("format")
self.group_name = kwargs.pop("group_name")
self.name = kwargs.pop("name")
self.version = kwargs.pop("version")

def __repr__(self):
return (
f"SchemaProperties(id={self.id}, format={self.format}, "
f"group_name={self.group_name}, name={self.name})"[:1024]
f"group_name={self.group_name}, name={self.name}, version={self.version})"[:1024]
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
except ImportError:
_patch_all = []
from ._patch import patch_sdk as _patch_sdk
__all__ = ['AzureSchemaRegistry']

__all__ = ["AzureSchemaRegistry"]
__all__.extend([p for p in _patch_all if p not in __all__])

_patch_sdk()
Original file line number Diff line number Diff line change
Expand Up @@ -9,53 +9,50 @@
from copy import deepcopy
from typing import Any, TYPE_CHECKING

from msrest import Deserializer, Serializer

from azure.core import PipelineClient
from azure.core.rest import HttpRequest, HttpResponse

from ._configuration import AzureSchemaRegistryConfiguration
from ._serialization import Deserializer, Serializer
from .operations import SchemaGroupsOperations, SchemaOperations

if TYPE_CHECKING:
# pylint: disable=unused-import,ungrouped-imports
from typing import Dict

from azure.core.credentials import TokenCredential

class AzureSchemaRegistry:

class AzureSchemaRegistry: # pylint: disable=client-accepts-api-version-keyword
"""Azure Schema Registry is as a central schema repository, with support for versioning,
management, compatibility checking, and RBAC.

:ivar schema_groups: SchemaGroupsOperations operations
:vartype schema_groups: azure.schemaregistry._generated.operations.SchemaGroupsOperations
:ivar schema: SchemaOperations operations
:vartype schema: azure.schemaregistry._generated.operations.SchemaOperations
:param endpoint: The Schema Registry service endpoint, for example
my-namespace.servicebus.windows.net.
my-namespace.servicebus.windows.net. Required.
:type endpoint: str
:param credential: Credential needed for the client to connect to Azure.
:param credential: Credential needed for the client to connect to Azure. Required.
:type credential: ~azure.core.credentials.TokenCredential
:keyword api_version: Api Version. Default value is "2021-10". Note that overriding this
default value may result in unsupported behavior.
:paramtype api_version: str
"""

def __init__(
self,
endpoint: str,
credential: "TokenCredential",
**kwargs: Any
) -> None:
_endpoint = 'https://{endpoint}'
def __init__(self, endpoint: str, credential: "TokenCredential", **kwargs: Any) -> None:
_endpoint = "https://{endpoint}"
self._config = AzureSchemaRegistryConfiguration(endpoint=endpoint, credential=credential, **kwargs)
self._client = PipelineClient(base_url=_endpoint, config=self._config, **kwargs)

self._serialize = Serializer()
self._deserialize = Deserializer()
self._serialize.client_side_validation = False
self.schema_groups = SchemaGroupsOperations(self._client, self._config, self._serialize, self._deserialize)
self.schema = SchemaOperations(self._client, self._config, self._serialize, self._deserialize)


def send_request(
self,
request: HttpRequest,
**kwargs: Any
) -> HttpResponse:
def send_request(self, request: HttpRequest, **kwargs: Any) -> HttpResponse:
"""Runs the network request through the client's chained policies.

We have helper methods to create requests specific to this service in `azure.schemaregistry._generated.rest`.
Expand All @@ -67,7 +64,7 @@ def send_request(
>>> response = client.send_request(request)
<HttpResponse: 200 OK>

For more information on this code flow, see https://aka.ms/azsdk/python/protocol/quickstart
For more information on this code flow, see https://aka.ms/azsdk/dpcodegen/python/send_request

:param request: The network request you want to make. Required.
:type request: ~azure.core.rest.HttpRequest
Expand All @@ -78,7 +75,7 @@ def send_request(

request_copy = deepcopy(request)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True),
}

request_copy.url = self._client.format_url(request_copy.url, **path_format_arguments)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,18 @@ class AzureSchemaRegistryConfiguration(Configuration): # pylint: disable=too-ma
attributes.

:param endpoint: The Schema Registry service endpoint, for example
my-namespace.servicebus.windows.net.
my-namespace.servicebus.windows.net. Required.
:type endpoint: str
:param credential: Credential needed for the client to connect to Azure.
:param credential: Credential needed for the client to connect to Azure. Required.
:type credential: ~azure.core.credentials.TokenCredential
:keyword api_version: Api Version. Default value is "2021-10". Note that overriding this
default value may result in unsupported behavior.
:paramtype api_version: str
"""

def __init__(
self,
endpoint: str,
credential: "TokenCredential",
**kwargs: Any
) -> None:
def __init__(self, endpoint: str, credential: "TokenCredential", **kwargs: Any) -> None:
super(AzureSchemaRegistryConfiguration, self).__init__(**kwargs)
api_version = kwargs.pop('api_version', "2021-10") # type: str
api_version = kwargs.pop("api_version", "2021-10") # type: str

if endpoint is None:
raise ValueError("Parameter 'endpoint' must not be None.")
Expand All @@ -51,23 +46,24 @@ def __init__(
self.endpoint = endpoint
self.credential = credential
self.api_version = api_version
self.credential_scopes = kwargs.pop('credential_scopes', ['https://eventhubs.azure.net/.default'])
kwargs.setdefault('sdk_moniker', 'azureschemaregistry/{}'.format(VERSION))
self.credential_scopes = kwargs.pop("credential_scopes", ["https://eventhubs.azure.net/.default"])
kwargs.setdefault("sdk_moniker", "azureschemaregistry/{}".format(VERSION))
self._configure(**kwargs)

def _configure(
self,
**kwargs # type: Any
self, **kwargs # type: Any
):
# type: (...) -> None
self.user_agent_policy = kwargs.get('user_agent_policy') or policies.UserAgentPolicy(**kwargs)
self.headers_policy = kwargs.get('headers_policy') or policies.HeadersPolicy(**kwargs)
self.proxy_policy = kwargs.get('proxy_policy') or policies.ProxyPolicy(**kwargs)
self.logging_policy = kwargs.get('logging_policy') or policies.NetworkTraceLoggingPolicy(**kwargs)
self.http_logging_policy = kwargs.get('http_logging_policy') or policies.HttpLoggingPolicy(**kwargs)
self.retry_policy = kwargs.get('retry_policy') or policies.RetryPolicy(**kwargs)
self.custom_hook_policy = kwargs.get('custom_hook_policy') or policies.CustomHookPolicy(**kwargs)
self.redirect_policy = kwargs.get('redirect_policy') or policies.RedirectPolicy(**kwargs)
self.authentication_policy = kwargs.get('authentication_policy')
self.user_agent_policy = kwargs.get("user_agent_policy") or policies.UserAgentPolicy(**kwargs)
self.headers_policy = kwargs.get("headers_policy") or policies.HeadersPolicy(**kwargs)
self.proxy_policy = kwargs.get("proxy_policy") or policies.ProxyPolicy(**kwargs)
self.logging_policy = kwargs.get("logging_policy") or policies.NetworkTraceLoggingPolicy(**kwargs)
self.http_logging_policy = kwargs.get("http_logging_policy") or policies.HttpLoggingPolicy(**kwargs)
self.retry_policy = kwargs.get("retry_policy") or policies.RetryPolicy(**kwargs)
self.custom_hook_policy = kwargs.get("custom_hook_policy") or policies.CustomHookPolicy(**kwargs)
self.redirect_policy = kwargs.get("redirect_policy") or policies.RedirectPolicy(**kwargs)
self.authentication_policy = kwargs.get("authentication_policy")
if self.credential and not self.authentication_policy:
self.authentication_policy = policies.BearerTokenCredentialPolicy(self.credential, *self.credential_scopes, **kwargs)
self.authentication_policy = policies.BearerTokenCredentialPolicy(
self.credential, *self.credential_scopes, **kwargs
)
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

__all__: List[str] = [] # Add all objects you want publicly available to users at this package level


def patch_sdk():
"""Do not remove from this file.

Expand Down
Loading