Skip to content

Commit

Permalink
[SchemaRegistry] update positional args to req kwargs (Azure#20763)
Browse files Browse the repository at this point in the history
* make params required kwargs

* update changelog

* adams comments

* update avro tests + samples

* fix samples

* pylint error

* adams comments
  • Loading branch information
swathipil authored Sep 27, 2021
1 parent 55526df commit d32de70
Show file tree
Hide file tree
Showing 22 changed files with 189 additions and 173 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
### Features Added

- `auto_register_schemas` keyword argument has been added to `SchemaRegistryAvroSerializer`, which will allow for automatically registering schemas passed in to the `serialize`.
- `value` parameter in `serialize` on `SchemaRegistryAvroSerializer` takes type `Mapping` rather than `Dict`.

### Breaking Changes

Expand All @@ -13,6 +14,9 @@
- `data` parameter in the `serialize` and `deserialize` methods on `SchemaRegistryAvroSerializer` has been renamed `value`.
- `schema` parameter in the `serialize` method on `SchemaRegistryAvroSerializer` no longer accepts argument of type `bytes`.
- `SchemaRegistryAvroSerializer` constructor no longer takes in the `codec` keyword argument.
- The following positional arguments are now required keyword arguments:
- `client` and `group_name` in `SchemaRegistryAvroSerializer` constructor
- `schema` in `serialize` on `SchemaRegistryAvroSerializer`

### Bugs Fixed

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
#
# --------------------------------------------------------------------------
from io import BytesIO
from typing import Any, Dict, Union
from typing import Any, Dict, Mapping
import avro

from ._constants import SCHEMA_ID_START_INDEX, SCHEMA_ID_LENGTH, DATA_START_INDEX
Expand All @@ -36,20 +36,23 @@ class SchemaRegistryAvroSerializer(object):
SchemaRegistryAvroSerializer provides the ability to serialize and deserialize data according
to the given avro schema. It would automatically register, get and cache the schema.
:param client: The schema registry client
:keyword client: Required. The schema registry client
which is used to register schema and retrieve schema from the service.
:type client: ~azure.schemaregistry.SchemaRegistryClient
:param str group_name: Schema group under which schema should be registered.
:paramtype client: ~azure.schemaregistry.SchemaRegistryClient
:keyword str group_name: Required. Schema group under which schema should be registered.
:keyword bool auto_register_schemas: When true, register new schemas passed to serialize.
Otherwise, and by default, fail if it has not been pre-registered in the registry.
"""

def __init__(self, client, group_name, **kwargs):
# type: ("SchemaRegistryClient", str, Any) -> None
self._schema_group = group_name
def __init__(self, **kwargs):
# type: (Any) -> None
try:
self._schema_group = kwargs.pop("group_name")
self._schema_registry_client = kwargs.pop("client") # type: "SchemaRegistryClient"
except KeyError as e:
raise TypeError("'{}' is a required keyword.".format(e.args[0]))
self._avro_serializer = AvroObjectSerializer(codec=kwargs.get("codec"))
self._schema_registry_client = client # type: "SchemaRegistryClient"
self._auto_register_schemas = kwargs.get("auto_register_schemas", False)
self._auto_register_schema_func = (
self._schema_registry_client.register_schema
Expand Down Expand Up @@ -119,20 +122,23 @@ def _get_schema(self, schema_id, **kwargs):
self._schema_to_id[schema_str] = schema_id
return schema_str

def serialize(self, value, schema, **kwargs):
# type: (Dict[str, Any], Union[str, bytes], Any) -> bytes
def serialize(self, value, **kwargs):
# type: (Mapping[str, Any], Any) -> bytes
"""
Encode data with the given schema. The returns bytes are consisted of: The first 4 bytes
denoting record format identifier. The following 32 bytes denoting schema id returned by schema registry
service. The remaining bytes are the real data payload.
:param value: The data to be encoded.
:type value: Dict[str, Any]
:param schema: The schema used to encode the data.
:type schema: str
:type value: Mapping[str, Any]
:keyword schema: Required. The schema used to encode the data.
:paramtype schema: str
:rtype: bytes
"""
raw_input_schema = schema
try:
raw_input_schema = kwargs.pop("schema")
except KeyError as e:
raise TypeError("'{}' is a required keyword.".format(e.args[0]))
try:
cached_schema = self._user_input_schema_cache[raw_input_schema]
except KeyError:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
CLIENT_ID=os.environ['SCHEMA_REGISTRY_AZURE_CLIENT_ID']
CLIENT_SECRET=os.environ['SCHEMA_REGISTRY_AZURE_CLIENT_SECRET']

SCHEMA_REGISTRY_ENDPOINT=os.environ['SCHEMA_REGISTRY_ENDPOINT']
SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE=os.environ['SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE']
GROUP_NAME=os.environ['SCHEMA_REGISTRY_GROUP']
SCHEMA_STRING = """
{"namespace": "example.avro",
Expand All @@ -59,9 +59,9 @@ def serialize(serializer):
dict_data_alice = {"name": u"Alice", "favorite_number": 15, "favorite_color": u"green"}

# Schema would be automatically registered into Schema Registry and cached locally.
payload_ben = serializer.serialize(dict_data_ben, SCHEMA_STRING)
payload_ben = serializer.serialize(dict_data_ben, schema=SCHEMA_STRING)
# The second call won't trigger a service call.
payload_alice = serializer.serialize(dict_data_alice, SCHEMA_STRING)
payload_alice = serializer.serialize(dict_data_alice, schema=SCHEMA_STRING)

print('Encoded bytes are: ', payload_ben)
print('Encoded bytes are: ', payload_alice)
Expand All @@ -79,8 +79,8 @@ def deserialize(serializer, bytes_payload):


if __name__ == '__main__':
schema_registry = SchemaRegistryClient(endpoint=SCHEMA_REGISTRY_ENDPOINT, credential=token_credential)
serializer = SchemaRegistryAvroSerializer(schema_registry, GROUP_NAME)
schema_registry = SchemaRegistryClient(endpoint=SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE, credential=token_credential)
serializer = SchemaRegistryAvroSerializer(client=schema_registry, group_name=GROUP_NAME, auto_register_schemas=True)
bytes_data_ben, bytes_data_alice = serialize(serializer)
dict_data_ben = deserialize(serializer, bytes_data_ben)
dict_data_alice = deserialize(serializer, bytes_data_alice)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
EVENTHUB_CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR']
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']

SCHEMA_REGISTRY_ENDPOINT = os.environ['SCHEMA_REGISTRY_ENDPOINT']
SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE = os.environ['SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE']
GROUP_NAME = os.environ['SCHEMA_REGISTRY_GROUP']


Expand All @@ -45,12 +45,14 @@ def on_event(partition_context, event):


# create a SchemaRegistryAvroSerializer instance
# TODO: after 'azure-schemaregistry==1.0.0b3' is released, update 'endpoint' to 'fully_qualified_namespace'
avro_serializer = SchemaRegistryAvroSerializer(
client=SchemaRegistryClient(
endpoint=SCHEMA_REGISTRY_ENDPOINT,
endpoint=SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE,
credential=DefaultAzureCredential()
),
group_name=GROUP_NAME
group_name=GROUP_NAME,
auto_register_schemas=True
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
EVENTHUB_CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR']
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']

SCHEMA_REGISTRY_ENDPOINT = os.environ['SCHEMA_REGISTRY_ENDPOINT']
SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE = os.environ['SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE']
GROUP_NAME = os.environ['SCHEMA_REGISTRY_GROUP']

SCHEMA_STRING = """
Expand Down Expand Up @@ -59,12 +59,14 @@ def send_event_data_batch(producer, serializer):


# create a SchemaRegistryAvroSerializer instance
# TODO: after 'azure-schemaregistry==1.0.0b3' is released, update 'endpoint' to 'fully_qualified_namespace'
avro_serializer = SchemaRegistryAvroSerializer(
client=SchemaRegistryClient(
endpoint=SCHEMA_REGISTRY_ENDPOINT,
endpoint=SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE,
credential=DefaultAzureCredential()
),
group_name=GROUP_NAME
group_name=GROUP_NAME,
auto_register_schemas=True
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ interactions:
uri: https://fake_resource.servicebus.windows.net/$schemagroups/fakegroup/schemas/example.avro.User?api-version=2017-04
response:
body:
string: '{"id":"041afcdb34a546faa3aa26a991567e32"}'
string: '{"id":"fc61e4d3e31b46f6a758fa1b67f35cc5"}'
headers:
content-type:
- application/json
date:
- Wed, 08 Sep 2021 22:17:05 GMT
- Fri, 24 Sep 2021 19:54:45 GMT
location:
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2017-04
server:
Expand All @@ -38,9 +38,9 @@ interactions:
transfer-encoding:
- chunked
x-schema-id:
- 041afcdb34a546faa3aa26a991567e32
- fc61e4d3e31b46f6a758fa1b67f35cc5
x-schema-id-location:
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/041afcdb34a546faa3aa26a991567e32?api-version=2017-04
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/fc61e4d3e31b46f6a758fa1b67f35cc5?api-version=2017-04
x-schema-type:
- Avro
x-schema-version:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ interactions:
uri: https://fake_resource.servicebus.windows.net/$schemagroups/fakegroup/schemas/example.avro.User?api-version=2017-04
response:
body:
string: '{"id":"041afcdb34a546faa3aa26a991567e32"}'
string: '{"id":"fc61e4d3e31b46f6a758fa1b67f35cc5"}'
headers:
content-type:
- application/json
date:
- Wed, 08 Sep 2021 22:17:06 GMT
- Fri, 24 Sep 2021 19:54:47 GMT
location:
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2017-04
server:
Expand All @@ -38,9 +38,9 @@ interactions:
transfer-encoding:
- chunked
x-schema-id:
- 041afcdb34a546faa3aa26a991567e32
- fc61e4d3e31b46f6a758fa1b67f35cc5
x-schema-id-location:
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/041afcdb34a546faa3aa26a991567e32?api-version=2017-04
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/fc61e4d3e31b46f6a758fa1b67f35cc5?api-version=2017-04
x-schema-type:
- Avro
x-schema-version:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
)


SCHEMA_REGISTRY_ENDPOINT_PARAM = "schemaregistry_endpoint"
SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE_PARAM = "schemaregistry_fully_qualified_namespace"
SCHEMA_REGISTRY_GROUP_PARAM = "schemaregistry_group"
SCHEMA_REGISTRY_ENDPOINT_ENV_KEY_NAME = 'SCHEMA_REGISTRY_ENDPOINT'
SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE_ENV_KEY_NAME = 'SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE'
SCHEMA_REGISTRY_GROUP_ENV_KEY_NAME = 'SCHEMA_REGISTRY_GROUP'


Expand All @@ -59,13 +59,13 @@ def create_resource(self, name, **kwargs):
# TODO: right now the endpoint/group is fixed, as there is no way to create/delete resources using api, in the future we should be able to dynamically create and remove resources
if self.is_live:
return {
SCHEMA_REGISTRY_ENDPOINT_PARAM: os.environ[SCHEMA_REGISTRY_ENDPOINT_ENV_KEY_NAME],
SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE_PARAM: os.environ[SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE_ENV_KEY_NAME],
SCHEMA_REGISTRY_GROUP_PARAM: os.environ[SCHEMA_REGISTRY_GROUP_ENV_KEY_NAME]
}

else:
return {
SCHEMA_REGISTRY_ENDPOINT_PARAM: "sr-playground.servicebus.windows.net",
SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE_PARAM: "sr-playground.servicebus.windows.net",
SCHEMA_REGISTRY_GROUP_PARAM: "azsdk_python_test_group"
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

from devtools_testutils import AzureTestCase, PowerShellPreparer

SchemaRegistryPowerShellPreparer = functools.partial(PowerShellPreparer, "schemaregistry", schemaregistry_endpoint="fake_resource.servicebus.windows.net/", schemaregistry_group="fakegroup")
SchemaRegistryPowerShellPreparer = functools.partial(PowerShellPreparer, "schemaregistry", schemaregistry_fully_qualified_namespace="fake_resource.servicebus.windows.net/", schemaregistry_group="fakegroup")

class SchemaRegistryAvroSerializerTests(AzureTestCase):

Expand Down Expand Up @@ -75,15 +75,16 @@ def test_raw_avro_serializer_negative(self):
raw_avro_object_serializer.serialize(dict_data_missing_required_field, schema)

@SchemaRegistryPowerShellPreparer()
def test_basic_sr_avro_serializer_with_auto_register_schemas(self, schemaregistry_endpoint, schemaregistry_group, **kwargs):
sr_client = self.create_basic_client(SchemaRegistryClient, endpoint=schemaregistry_endpoint)
sr_avro_serializer = SchemaRegistryAvroSerializer(sr_client, schemaregistry_group, auto_register_schemas=True)
def test_basic_sr_avro_serializer_with_auto_register_schemas(self, schemaregistry_fully_qualified_namespace, schemaregistry_group, **kwargs):
# TODO: AFTER RELEASING azure-schemaregistry=1.0.0b3, UPDATE 'endpoint' to 'fully_qualified_namespace'
sr_client = self.create_basic_client(SchemaRegistryClient, endpoint=schemaregistry_fully_qualified_namespace)
sr_avro_serializer = SchemaRegistryAvroSerializer(client=sr_client, group_name=schemaregistry_group, auto_register_schemas=True)

schema_str = """{"namespace":"example.avro","type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}"""
schema = avro.schema.parse(schema_str)

dict_data = {"name": u"Ben", "favorite_number": 7, "favorite_color": u"red"}
encoded_data = sr_avro_serializer.serialize(dict_data, schema_str)
encoded_data = sr_avro_serializer.serialize(dict_data, schema=schema_str)

assert schema_str in sr_avro_serializer._user_input_schema_cache
assert str(avro.schema.parse(schema_str)) in sr_avro_serializer._schema_to_id
Expand All @@ -102,15 +103,16 @@ def test_basic_sr_avro_serializer_with_auto_register_schemas(self, schemaregistr
sr_avro_serializer.close()

@SchemaRegistryPowerShellPreparer()
def test_basic_sr_avro_serializer_without_auto_register_schemas(self, schemaregistry_endpoint, schemaregistry_group, **kwargs):
sr_client = self.create_basic_client(SchemaRegistryClient, endpoint=schemaregistry_endpoint)
sr_avro_serializer = SchemaRegistryAvroSerializer(sr_client, schemaregistry_group)
def test_basic_sr_avro_serializer_without_auto_register_schemas(self, schemaregistry_fully_qualified_namespace, schemaregistry_group, **kwargs):
# TODO: AFTER RELEASING azure-schemaregistry=1.0.0b3, UPDATE 'endpoint' to 'fully_qualified_namespace'
sr_client = self.create_basic_client(SchemaRegistryClient, endpoint=schemaregistry_fully_qualified_namespace)
sr_avro_serializer = SchemaRegistryAvroSerializer(client=sr_client, group_name=schemaregistry_group)

schema_str = """{"namespace":"example.avro","type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}"""
schema = avro.schema.parse(schema_str)

dict_data = {"name": u"Ben", "favorite_number": 7, "favorite_color": u"red"}
encoded_data = sr_avro_serializer.serialize(dict_data, schema_str)
encoded_data = sr_avro_serializer.serialize(dict_data, schema=schema_str)

assert schema_str in sr_avro_serializer._user_input_schema_cache
assert str(avro.schema.parse(schema_str)) in sr_avro_serializer._schema_to_id
Expand Down
2 changes: 2 additions & 0 deletions sdk/schemaregistry/azure-schemaregistry/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
- `schema_definition`, which has been renamed from `schema_content`
- `format`, which has been renamed from `serialization_type`
- `endpoint` parameter in `SchemaRegistryClient` constructor has been renamed `fully_qualified_namespace`
- `location` instance variable in `SchemaProperties` has been removed.
- `Schema` and `SchemaProperties` no longer have positional parameters, as they will not be constructed by the user.

### Bugs Fixed

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
# IN THE SOFTWARE.
#
# --------------------------------------------------------------------------
from typing import Any, Optional
from typing import Any


class SchemaProperties(object):
Expand All @@ -50,13 +50,12 @@ class SchemaProperties(object):

def __init__(
self,
id=None, # pylint:disable=redefined-builtin
**kwargs
):
# type: (Optional[str], Any) -> None
self.id = id
self.format = kwargs.get('format')
self.version = kwargs.get('version')
# type: (Any) -> None
self.id = kwargs.pop('id')
self.format = kwargs.pop('format')
self.version = kwargs.pop('version')


class Schema(object):
Expand All @@ -81,9 +80,8 @@ class Schema(object):

def __init__(
self,
schema_definition,
properties,
**kwargs
):
# type: (str, SchemaProperties) -> None
self.schema_definition = schema_definition
self.properties = properties
# type: (Any) -> None
self.schema_definition = kwargs.pop("schema_definition")
self.properties = kwargs.pop("properties")
Loading

0 comments on commit d32de70

Please sign in to comment.