Skip to content

Commit

Permalink
[SchemaRegistry] add lru cache to avro serializer (#20813)
Browse files Browse the repository at this point in the history
fixes: #20712
performance comparison results: #20712 (comment)
  • Loading branch information
swathipil authored Sep 30, 2021
1 parent 1bc3354 commit 6e10afe
Show file tree
Hide file tree
Showing 24 changed files with 174 additions and 255 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
# IN THE SOFTWARE.
#
# --------------------------------------------------------------------------
try:
from functools import lru_cache
except ImportError:
from backports.functools_lru_cache import lru_cache
from io import BytesIO
from typing import Any, Dict, Mapping
import avro
Expand Down Expand Up @@ -59,8 +63,6 @@ def __init__(self, **kwargs):
if self._auto_register_schemas
else self._schema_registry_client.get_schema_id
)
self._id_to_schema = {}
self._schema_to_id = {}
self._user_input_schema_cache = {}

def __enter__(self):
Expand All @@ -79,8 +81,9 @@ def close(self):
"""
self._schema_registry_client.close()

def _get_schema_id(self, schema_name, schema, **kwargs):
# type: (str, avro.schema.Schema, Any) -> str
@lru_cache(maxsize=128)
def _get_schema_id(self, schema_name, schema_str, **kwargs):
# type: (str, str, Any) -> str
"""
Get schema id from local cache with the given schema.
If there is no item in the local cache, get schema id from the service and cache it.
Expand All @@ -92,17 +95,12 @@ def _get_schema_id(self, schema_name, schema, **kwargs):
:return: Schema Id
:rtype: str
"""
schema_str = str(schema)
try:
return self._schema_to_id[schema_str]
except KeyError:
schema_id = self._auto_register_schema_func(
self._schema_group, schema_name, "Avro", schema_str, **kwargs
).schema_id
self._schema_to_id[schema_str] = schema_id
self._id_to_schema[schema_id] = schema_str
return schema_id
schema_id = self._auto_register_schema_func(
self._schema_group, schema_name, "Avro", schema_str, **kwargs
).schema_id
return schema_id

@lru_cache(maxsize=128)
def _get_schema(self, schema_id, **kwargs):
# type: (str, Any) -> str
"""
Expand All @@ -112,15 +110,10 @@ def _get_schema(self, schema_id, **kwargs):
:param str schema_id: Schema id
:return: Schema content
"""
try:
return self._id_to_schema[schema_id]
except KeyError:
schema_str = self._schema_registry_client.get_schema(
schema_id, **kwargs
).schema_content
self._id_to_schema[schema_id] = schema_str
self._schema_to_id[schema_str] = schema_id
return schema_str
schema_str = self._schema_registry_client.get_schema(
schema_id, **kwargs
).schema_content
return schema_str

def serialize(self, value, **kwargs):
# type: (Mapping[str, Any], Any) -> bytes
Expand All @@ -147,7 +140,7 @@ def serialize(self, value, **kwargs):
cached_schema = parsed_schema

record_format_identifier = b"\0\0\0\0"
schema_id = self._get_schema_id(cached_schema.fullname, cached_schema, **kwargs)
schema_id = self._get_schema_id(cached_schema.fullname, str(cached_schema), **kwargs)
data_bytes = self._avro_serializer.serialize(value, cached_schema)

stream = BytesIO()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import SchemaRegistryAvroSerializer

TENANT_ID=os.environ['SCHEMA_REGISTRY_AZURE_TENANT_ID']
CLIENT_ID=os.environ['SCHEMA_REGISTRY_AZURE_CLIENT_ID']
CLIENT_SECRET=os.environ['SCHEMA_REGISTRY_AZURE_CLIENT_SECRET']
TENANT_ID=os.environ['AZURE_TENANT_ID']
CLIENT_ID=os.environ['AZURE_CLIENT_ID']
CLIENT_SECRET=os.environ['AZURE_CLIENT_SECRET']

SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE=os.environ['SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE']
GROUP_NAME=os.environ['SCHEMA_REGISTRY_GROUP']
SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE=os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
GROUP_NAME=os.environ['SCHEMAREGISTRY_GROUP']
SCHEMA_STRING = """
{"namespace": "example.avro",
"type": "record",
Expand Down Expand Up @@ -79,7 +79,7 @@ def deserialize(serializer, bytes_payload):


if __name__ == '__main__':
schema_registry = SchemaRegistryClient(endpoint=SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE, credential=token_credential)
schema_registry = SchemaRegistryClient(endpoint=SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE, credential=token_credential)
serializer = SchemaRegistryAvroSerializer(client=schema_registry, group_name=GROUP_NAME, auto_register_schemas=True)
bytes_data_ben, bytes_data_alice = serialize(serializer)
dict_data_ben = deserialize(serializer, bytes_data_ben)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
EVENTHUB_CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR']
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']

SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE = os.environ['SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE']
GROUP_NAME = os.environ['SCHEMA_REGISTRY_GROUP']
SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
GROUP_NAME = os.environ['SCHEMAREGISTRY_GROUP']


def on_event(partition_context, event):
Expand Down Expand Up @@ -48,7 +48,7 @@ def on_event(partition_context, event):
# TODO: after 'azure-schemaregistry==1.0.0b3' is released, update 'endpoint' to 'fully_qualified_namespace'
avro_serializer = SchemaRegistryAvroSerializer(
client=SchemaRegistryClient(
endpoint=SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE,
endpoint=SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE,
credential=DefaultAzureCredential()
),
group_name=GROUP_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
EVENTHUB_CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR']
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']

SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE = os.environ['SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE']
GROUP_NAME = os.environ['SCHEMA_REGISTRY_GROUP']
SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
GROUP_NAME = os.environ['SCHEMAREGISTRY_GROUP']

SCHEMA_STRING = """
{"namespace": "example.avro",
Expand Down Expand Up @@ -62,7 +62,7 @@ def send_event_data_batch(producer, serializer):
# TODO: after 'azure-schemaregistry==1.0.0b3' is released, update 'endpoint' to 'fully_qualified_namespace'
avro_serializer = SchemaRegistryAvroSerializer(
client=SchemaRegistryClient(
endpoint=SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE,
endpoint=SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE,
credential=DefaultAzureCredential()
),
group_name=GROUP_NAME,
Expand Down
12 changes: 8 additions & 4 deletions sdk/schemaregistry/azure-schemaregistry-avroserializer/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
# -------------------------------------------------------------------------

import re
import sys
import os.path
from io import open
from setuptools import find_packages, setup
Expand Down Expand Up @@ -39,6 +40,12 @@
'azure.schemaregistry',
'azure.schemaregistry.serializer'
]
install_packages = [
'azure-schemaregistry==1.0.0b2',
'avro<2.0.0,>=1.10.0'
]
if sys.version_info < (3,0):
install_packages.append('backports.functools-lru-cache>=1.6.4')

setup(
name=PACKAGE_NAME,
Expand All @@ -64,8 +71,5 @@
],
zip_safe=False,
packages=find_packages(exclude=exclude_packages),
install_requires=[
'azure-schemaregistry==1.0.0b2',
'avro<2.0.0,>=1.10.0'
]
install_requires=install_packages
)
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ interactions:
uri: https://fake_resource.servicebus.windows.net/$schemagroups/fakegroup/schemas/example.avro.User?api-version=2017-04
response:
body:
string: '{"id":"fc61e4d3e31b46f6a758fa1b67f35cc5"}'
string: '{"id":"f666e373299048fabaa4296f5dbfed46"}'
headers:
content-type:
- application/json
date:
- Fri, 24 Sep 2021 19:54:45 GMT
- Tue, 28 Sep 2021 22:27:25 GMT
location:
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2017-04
server:
Expand All @@ -38,9 +38,9 @@ interactions:
transfer-encoding:
- chunked
x-schema-id:
- fc61e4d3e31b46f6a758fa1b67f35cc5
- f666e373299048fabaa4296f5dbfed46
x-schema-id-location:
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/fc61e4d3e31b46f6a758fa1b67f35cc5?api-version=2017-04
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/f666e373299048fabaa4296f5dbfed46?api-version=2017-04
x-schema-type:
- Avro
x-schema-version:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ interactions:
uri: https://fake_resource.servicebus.windows.net/$schemagroups/fakegroup/schemas/example.avro.User?api-version=2017-04
response:
body:
string: '{"id":"fc61e4d3e31b46f6a758fa1b67f35cc5"}'
string: '{"id":"f666e373299048fabaa4296f5dbfed46"}'
headers:
content-type:
- application/json
date:
- Fri, 24 Sep 2021 19:54:47 GMT
- Tue, 28 Sep 2021 22:27:26 GMT
location:
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2017-04
server:
Expand All @@ -38,9 +38,9 @@ interactions:
transfer-encoding:
- chunked
x-schema-id:
- fc61e4d3e31b46f6a758fa1b67f35cc5
- f666e373299048fabaa4296f5dbfed46
x-schema-id-location:
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/fc61e4d3e31b46f6a758fa1b67f35cc5?api-version=2017-04
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/f666e373299048fabaa4296f5dbfed46?api-version=2017-04
x-schema-type:
- Avro
x-schema-version:
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,11 @@ def test_basic_sr_avro_serializer_with_auto_register_schemas(self, schemaregistr
encoded_data = sr_avro_serializer.serialize(dict_data, schema=schema_str)

assert schema_str in sr_avro_serializer._user_input_schema_cache
assert str(avro.schema.parse(schema_str)) in sr_avro_serializer._schema_to_id

assert encoded_data[0:4] == b'\0\0\0\0'
schema_id = sr_client.get_schema_id(schemaregistry_group, schema.fullname, "Avro", str(schema)).schema_id
assert encoded_data[4:36] == schema_id.encode("utf-8")

assert schema_id in sr_avro_serializer._id_to_schema

decoded_data = sr_avro_serializer.deserialize(encoded_data)
assert decoded_data["name"] == u"Ben"
assert decoded_data["favorite_number"] == 7
Expand All @@ -115,14 +112,11 @@ def test_basic_sr_avro_serializer_without_auto_register_schemas(self, schemaregi
encoded_data = sr_avro_serializer.serialize(dict_data, schema=schema_str)

assert schema_str in sr_avro_serializer._user_input_schema_cache
assert str(avro.schema.parse(schema_str)) in sr_avro_serializer._schema_to_id

assert encoded_data[0:4] == b'\0\0\0\0'
schema_id = sr_client.get_schema_id(schemaregistry_group, schema.fullname, "Avro", str(schema)).schema_id
assert encoded_data[4:36] == schema_id.encode("utf-8")

assert schema_id in sr_avro_serializer._id_to_schema

decoded_data = sr_avro_serializer.deserialize(encoded_data)
assert decoded_data["name"] == u"Ben"
assert decoded_data["favorite_number"] == 7
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,21 @@

def create_client():
# [START create_sr_client_async]
SCHEMA_REGISTRY_FQN = os.environ['SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE']
SCHEMAREGISTRY_FQN = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
token_credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace=SCHEMA_REGISTRY_FQN, credential=token_credential)
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace=SCHEMAREGISTRY_FQN, credential=token_credential)
# [END create_sr_client_async]
TENANT_ID = os.environ['SCHEMA_REGISTRY_AZURE_TENANT_ID']
CLIENT_ID = os.environ['SCHEMA_REGISTRY_AZURE_CLIENT_ID']
CLIENT_SECRET = os.environ['SCHEMA_REGISTRY_AZURE_CLIENT_SECRET']
TENANT_ID = os.environ['AZURE_TENANT_ID']
CLIENT_ID = os.environ['AZURE_CLIENT_ID']
CLIENT_SECRET = os.environ['AZURE_CLIENT_SECRET']
token_credential = ClientSecretCredential(TENANT_ID, CLIENT_ID, CLIENT_SECRET)
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace=SCHEMA_REGISTRY_FQN, credential=token_credential)
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace=SCHEMAREGISTRY_FQN, credential=token_credential)
return schema_registry_client, token_credential


async def register_schema(schema_registry_client):
# [START register_schema_async]
GROUP_NAME = os.environ['SCHEMA_REGISTRY_GROUP']
GROUP_NAME = os.environ['SCHEMAREGISTRY_GROUP']
NAME = 'your-schema-name'
FORMAT = SchemaFormat.AVRO
SCHEMA_DEFINITION = """{"namespace":"example.avro","type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}"""
Expand All @@ -66,7 +66,7 @@ async def get_schema(schema_registry_client, id):


async def get_schema_id(schema_registry_client):
group_name = os.environ['SCHEMA_REGISTRY_GROUP']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
name = 'your-schema-name'
format = SchemaFormat.AVRO
schema_definition = """{"namespace":"example.avro","type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}"""
Expand Down
Loading

0 comments on commit 6e10afe

Please sign in to comment.