Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SchemaRegistry] add lru cache to avro serializer #20813

Merged
26 commits merged into from
Sep 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
# IN THE SOFTWARE.
#
# --------------------------------------------------------------------------
try:
from functools import lru_cache
except ImportError:
from backports.functools_lru_cache import lru_cache
from io import BytesIO
from typing import Any, Dict, Mapping
import avro
Expand Down Expand Up @@ -59,8 +63,6 @@ def __init__(self, **kwargs):
if self._auto_register_schemas
else self._schema_registry_client.get_schema_id
)
self._id_to_schema = {}
self._schema_to_id = {}
self._user_input_schema_cache = {}

def __enter__(self):
Expand All @@ -79,8 +81,9 @@ def close(self):
"""
self._schema_registry_client.close()

def _get_schema_id(self, schema_name, schema, **kwargs):
# type: (str, avro.schema.Schema, Any) -> str
@lru_cache(maxsize=128)
swathipil marked this conversation as resolved.
Show resolved Hide resolved
def _get_schema_id(self, schema_name, schema_str, **kwargs):
# type: (str, str, Any) -> str
"""
Get schema id from local cache with the given schema.
If there is no item in the local cache, get schema id from the service and cache it.
Expand All @@ -92,17 +95,12 @@ def _get_schema_id(self, schema_name, schema, **kwargs):
:return: Schema Id
:rtype: str
"""
schema_str = str(schema)
try:
return self._schema_to_id[schema_str]
except KeyError:
schema_id = self._auto_register_schema_func(
self._schema_group, schema_name, "Avro", schema_str, **kwargs
).schema_id
self._schema_to_id[schema_str] = schema_id
self._id_to_schema[schema_id] = schema_str
return schema_id
schema_id = self._auto_register_schema_func(
self._schema_group, schema_name, "Avro", schema_str, **kwargs
).schema_id
return schema_id

@lru_cache(maxsize=128)
def _get_schema(self, schema_id, **kwargs):
# type: (str, Any) -> str
"""
Expand All @@ -112,15 +110,10 @@ def _get_schema(self, schema_id, **kwargs):
:param str schema_id: Schema id
:return: Schema content
"""
try:
return self._id_to_schema[schema_id]
except KeyError:
schema_str = self._schema_registry_client.get_schema(
schema_id, **kwargs
).schema_content
self._id_to_schema[schema_id] = schema_str
self._schema_to_id[schema_str] = schema_id
return schema_str
schema_str = self._schema_registry_client.get_schema(
schema_id, **kwargs
).schema_content
return schema_str

def serialize(self, value, **kwargs):
# type: (Mapping[str, Any], Any) -> bytes
Expand All @@ -147,7 +140,7 @@ def serialize(self, value, **kwargs):
cached_schema = parsed_schema

record_format_identifier = b"\0\0\0\0"
schema_id = self._get_schema_id(cached_schema.fullname, cached_schema, **kwargs)
schema_id = self._get_schema_id(cached_schema.fullname, str(cached_schema), **kwargs)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why cast cached_schema to be type of string here?


I'm thinking of the accepted type of input schema here based on the discussion we just had today -- hide apache avro from the surfaces areas.

maybe we could start by just supporting "str" first, because accepting bytes brings the problem of decoding bytes into string.

do we know what type fastavro is expecting for the schema, string, bytes or dict?

Copy link
Member Author

@swathipil swathipil Sep 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lru_cache does not work when the parameters are mutable (i.e., avro.schema.Schema), and we cast schema into a string in _get_schema-id anyway and don't every use the actual Schema object for anything.


I think removing Schema as an input type is a good idea (in the separate PR for removing avro leaks), but just wondering if this is part of the surface? since this serialize method only accepts str/bytes and it's only internally that we convert/pass around avro.schema.Schema.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only support str as this aligns with other languages. This will already be addressed in this PR: https://github.com/Azure/azure-sdk-for-python/pull/20763/files.
Merge this PR after.

Copy link
Member Author

@swathipil swathipil Sep 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keeping everything, except for bytes as a type for schema. that will be addressed in another PR which will be merged first. #20763

I can deal with removing the avro.schema.Schema in the other PR for addressing types being leaked.

data_bytes = self._avro_serializer.serialize(value, cached_schema)

stream = BytesIO()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import SchemaRegistryAvroSerializer

TENANT_ID=os.environ['SCHEMA_REGISTRY_AZURE_TENANT_ID']
CLIENT_ID=os.environ['SCHEMA_REGISTRY_AZURE_CLIENT_ID']
CLIENT_SECRET=os.environ['SCHEMA_REGISTRY_AZURE_CLIENT_SECRET']
TENANT_ID=os.environ['AZURE_TENANT_ID']
CLIENT_ID=os.environ['AZURE_CLIENT_ID']
CLIENT_SECRET=os.environ['AZURE_CLIENT_SECRET']

SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE=os.environ['SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE']
GROUP_NAME=os.environ['SCHEMA_REGISTRY_GROUP']
SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE=os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
GROUP_NAME=os.environ['SCHEMAREGISTRY_GROUP']
SCHEMA_STRING = """
{"namespace": "example.avro",
"type": "record",
Expand Down Expand Up @@ -79,7 +79,7 @@ def deserialize(serializer, bytes_payload):


if __name__ == '__main__':
schema_registry = SchemaRegistryClient(endpoint=SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE, credential=token_credential)
schema_registry = SchemaRegistryClient(endpoint=SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE, credential=token_credential)
serializer = SchemaRegistryAvroSerializer(client=schema_registry, group_name=GROUP_NAME, auto_register_schemas=True)
bytes_data_ben, bytes_data_alice = serialize(serializer)
dict_data_ben = deserialize(serializer, bytes_data_ben)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
EVENTHUB_CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR']
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']

SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE = os.environ['SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE']
GROUP_NAME = os.environ['SCHEMA_REGISTRY_GROUP']
SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
GROUP_NAME = os.environ['SCHEMAREGISTRY_GROUP']


def on_event(partition_context, event):
Expand Down Expand Up @@ -48,7 +48,7 @@ def on_event(partition_context, event):
# TODO: after 'azure-schemaregistry==1.0.0b3' is released, update 'endpoint' to 'fully_qualified_namespace'
avro_serializer = SchemaRegistryAvroSerializer(
client=SchemaRegistryClient(
endpoint=SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE,
endpoint=SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE,
credential=DefaultAzureCredential()
),
group_name=GROUP_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
EVENTHUB_CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR']
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']

SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE = os.environ['SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE']
GROUP_NAME = os.environ['SCHEMA_REGISTRY_GROUP']
SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
GROUP_NAME = os.environ['SCHEMAREGISTRY_GROUP']

SCHEMA_STRING = """
{"namespace": "example.avro",
Expand Down Expand Up @@ -62,7 +62,7 @@ def send_event_data_batch(producer, serializer):
# TODO: after 'azure-schemaregistry==1.0.0b3' is released, update 'endpoint' to 'fully_qualified_namespace'
avro_serializer = SchemaRegistryAvroSerializer(
client=SchemaRegistryClient(
endpoint=SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE,
endpoint=SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE,
credential=DefaultAzureCredential()
),
group_name=GROUP_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
# -------------------------------------------------------------------------

import re
import sys
import os.path
from io import open
from setuptools import find_packages, setup
Expand Down Expand Up @@ -39,6 +40,12 @@
'azure.schemaregistry',
'azure.schemaregistry.serializer'
]
install_packages = [
'azure-schemaregistry==1.0.0b2',
'avro<2.0.0,>=1.10.0'
]
if sys.version_info < (3,0):
install_packages.append('backports.functools-lru-cache>=1.6.4')

setup(
name=PACKAGE_NAME,
Expand All @@ -64,8 +71,5 @@
],
zip_safe=False,
packages=find_packages(exclude=exclude_packages),
install_requires=[
'azure-schemaregistry==1.0.0b2',
'avro<2.0.0,>=1.10.0'
]
install_requires=install_packages
)
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ interactions:
uri: https://fake_resource.servicebus.windows.net/$schemagroups/fakegroup/schemas/example.avro.User?api-version=2017-04
response:
body:
string: '{"id":"fc61e4d3e31b46f6a758fa1b67f35cc5"}'
string: '{"id":"f666e373299048fabaa4296f5dbfed46"}'
headers:
content-type:
- application/json
date:
- Fri, 24 Sep 2021 19:54:45 GMT
- Tue, 28 Sep 2021 22:27:25 GMT
location:
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2017-04
server:
Expand All @@ -38,9 +38,9 @@ interactions:
transfer-encoding:
- chunked
x-schema-id:
- fc61e4d3e31b46f6a758fa1b67f35cc5
- f666e373299048fabaa4296f5dbfed46
x-schema-id-location:
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/fc61e4d3e31b46f6a758fa1b67f35cc5?api-version=2017-04
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/f666e373299048fabaa4296f5dbfed46?api-version=2017-04
x-schema-type:
- Avro
x-schema-version:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ interactions:
uri: https://fake_resource.servicebus.windows.net/$schemagroups/fakegroup/schemas/example.avro.User?api-version=2017-04
response:
body:
string: '{"id":"fc61e4d3e31b46f6a758fa1b67f35cc5"}'
string: '{"id":"f666e373299048fabaa4296f5dbfed46"}'
headers:
content-type:
- application/json
date:
- Fri, 24 Sep 2021 19:54:47 GMT
- Tue, 28 Sep 2021 22:27:26 GMT
location:
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2017-04
server:
Expand All @@ -38,9 +38,9 @@ interactions:
transfer-encoding:
- chunked
x-schema-id:
- fc61e4d3e31b46f6a758fa1b67f35cc5
- f666e373299048fabaa4296f5dbfed46
x-schema-id-location:
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/fc61e4d3e31b46f6a758fa1b67f35cc5?api-version=2017-04
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/f666e373299048fabaa4296f5dbfed46?api-version=2017-04
x-schema-type:
- Avro
x-schema-version:
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,11 @@ def test_basic_sr_avro_serializer_with_auto_register_schemas(self, schemaregistr
encoded_data = sr_avro_serializer.serialize(dict_data, schema=schema_str)

assert schema_str in sr_avro_serializer._user_input_schema_cache
assert str(avro.schema.parse(schema_str)) in sr_avro_serializer._schema_to_id

assert encoded_data[0:4] == b'\0\0\0\0'
schema_id = sr_client.get_schema_id(schemaregistry_group, schema.fullname, "Avro", str(schema)).schema_id
assert encoded_data[4:36] == schema_id.encode("utf-8")

assert schema_id in sr_avro_serializer._id_to_schema

decoded_data = sr_avro_serializer.deserialize(encoded_data)
assert decoded_data["name"] == u"Ben"
assert decoded_data["favorite_number"] == 7
Expand All @@ -115,14 +112,11 @@ def test_basic_sr_avro_serializer_without_auto_register_schemas(self, schemaregi
encoded_data = sr_avro_serializer.serialize(dict_data, schema=schema_str)

assert schema_str in sr_avro_serializer._user_input_schema_cache
assert str(avro.schema.parse(schema_str)) in sr_avro_serializer._schema_to_id

assert encoded_data[0:4] == b'\0\0\0\0'
schema_id = sr_client.get_schema_id(schemaregistry_group, schema.fullname, "Avro", str(schema)).schema_id
assert encoded_data[4:36] == schema_id.encode("utf-8")

assert schema_id in sr_avro_serializer._id_to_schema

decoded_data = sr_avro_serializer.deserialize(encoded_data)
assert decoded_data["name"] == u"Ben"
assert decoded_data["favorite_number"] == 7
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,21 @@

def create_client():
# [START create_sr_client_async]
SCHEMA_REGISTRY_FQN = os.environ['SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE']
SCHEMAREGISTRY_FQN = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
token_credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace=SCHEMA_REGISTRY_FQN, credential=token_credential)
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace=SCHEMAREGISTRY_FQN, credential=token_credential)
# [END create_sr_client_async]
TENANT_ID = os.environ['SCHEMA_REGISTRY_AZURE_TENANT_ID']
CLIENT_ID = os.environ['SCHEMA_REGISTRY_AZURE_CLIENT_ID']
CLIENT_SECRET = os.environ['SCHEMA_REGISTRY_AZURE_CLIENT_SECRET']
TENANT_ID = os.environ['AZURE_TENANT_ID']
CLIENT_ID = os.environ['AZURE_CLIENT_ID']
CLIENT_SECRET = os.environ['AZURE_CLIENT_SECRET']
token_credential = ClientSecretCredential(TENANT_ID, CLIENT_ID, CLIENT_SECRET)
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace=SCHEMA_REGISTRY_FQN, credential=token_credential)
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace=SCHEMAREGISTRY_FQN, credential=token_credential)
return schema_registry_client, token_credential


async def register_schema(schema_registry_client):
# [START register_schema_async]
GROUP_NAME = os.environ['SCHEMA_REGISTRY_GROUP']
GROUP_NAME = os.environ['SCHEMAREGISTRY_GROUP']
NAME = 'your-schema-name'
FORMAT = SchemaFormat.AVRO
SCHEMA_DEFINITION = """{"namespace":"example.avro","type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}"""
Expand All @@ -66,7 +66,7 @@ async def get_schema(schema_registry_client, id):


async def get_schema_id(schema_registry_client):
group_name = os.environ['SCHEMA_REGISTRY_GROUP']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
name = 'your-schema-name'
format = SchemaFormat.AVRO
schema_definition = """{"namespace":"example.avro","type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}"""
Expand Down
Loading