Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DGS-10484 Improve SR caching on Python client #1744

Merged
merged 3 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 116 additions & 26 deletions src/confluent_kafka/schema_registry/schema_registry_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,6 @@ def set(self, schema_id, schema, subject_name=None):
schema (Schema): Schema instance

subject_name(str): Optional, subject schema is registered under

Returns:
int: The schema_id
"""

with self.lock:
Expand All @@ -229,7 +226,8 @@ def get_schema(self, schema_id):
Schema: The schema if known; else None
"""

return self.schema_id_index.get(schema_id, None)
with self.lock:
return self.schema_id_index.get(schema_id, None)

def get_schema_id_by_subject(self, subject, schema):
"""
Expand All @@ -249,6 +247,73 @@ def get_schema_id_by_subject(self, subject, schema):
return self.schema_index.get(schema, None)


class _RegisteredSchemaCache(object):
"""
Thread-safe cache for use with the Schema Registry Client.

This cache may be used to retrieve registered schemas based on subject_name/version/schema
- Get registered schema based on subject name + version
- Get registered schema based on subject name + schema
"""

def __init__(self):
self.lock = Lock()
self.schema_version_index = defaultdict(dict)
self.schema_index = defaultdict(dict)

def set(self, subject_name, schema, version, registered_schema):
"""
Add a Schema identified by schema_id to the cache.

Args:
subject_name (str): The subject name this registered schema is associated with

schema (Schema): The schema this registered schema is associated with

version (int): The version this registered schema is associated with

registered_schema (RegisteredSchema): The registered schema instance
"""

with self.lock:
if schema is not None:
self.schema_index[subject_name][schema] = registered_schema
elif version is not None:
self.schema_version_index[subject_name][version] = registered_schema

def get_registered_schema_by_version(self, subject_name, version):
"""
Get the registered schema instance associated with version from the cache.

Args:
subject_name (str): The subject name this registered schema is associated with

version (int): The version this registered schema is associated with

Returns:
RegisteredSchema: The registered schema if known; else None
"""

with self.lock:
return self.schema_version_index.get(subject_name, {}).get(version, None)

def get_registered_schema_by_schema(self, subject_name, schema):
"""
Get the registered schema instance associated with schema from the cache.

Args:
subject_name (str): The subject name this registered schema is associated with

schema (Schema): The schema this registered schema is associated with

Returns:
RegisteredSchema: The registered schema if known; else None
"""

with self.lock:
return self.schema_index.get(subject_name, {}).get(schema, None)


class SchemaRegistryClient(object):
"""
A Confluent Schema Registry client.
Expand Down Expand Up @@ -292,6 +357,7 @@ class SchemaRegistryClient(object):
def __init__(self, conf):
self._rest_client = _RestClient(conf)
self._cache = _SchemaCache()
self._metadata_cache = _RegisteredSchemaCache()

def __enter__(self):
return self
Expand Down Expand Up @@ -398,6 +464,10 @@ def lookup_schema(self, subject_name, schema, normalize_schemas=False):
`POST Subject API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#post--subjects-(string-%20subject)-versions>`_
""" # noqa: E501

registered_schema = self._metadata_cache.get_registered_schema_by_schema(subject_name, schema)
if registered_schema is not None:
return registered_schema

request = {'schema': schema.schema_str}

# CP 5.5 adds new fields (for JSON and Protobuf).
Expand All @@ -414,17 +484,25 @@ def lookup_schema(self, subject_name, schema, normalize_schemas=False):

schema_type = response.get('schemaType', 'AVRO')

return RegisteredSchema(schema_id=response['id'],
schema=Schema(response['schema'],
schema_type,
[
SchemaReference(name=ref['name'],
subject=ref['subject'],
version=ref['version'])
for ref in response.get('references', [])
]),
subject=response['subject'],
version=response['version'])
registered_schema = RegisteredSchema(
schema_id=response['id'],
schema=Schema(
response['schema'],
schema_type,
[
SchemaReference(
name=ref['name'],
subject=ref['subject'],
version=ref['version']
) for ref in response.get('references', [])
]
),
subject=response['subject'],
version=response['version']
)
self._metadata_cache.set(subject_name, schema, None, registered_schema)

return registered_schema

def get_subjects(self):
"""
Expand Down Expand Up @@ -524,22 +602,34 @@ def get_version(self, subject_name, version):
`GET Subject Version API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--subjects-(string-%20subject)-versions-(versionId-%20version)>`_
""" # noqa: E501

registered_schema = self._metadata_cache.get_registered_schema_by_version(subject_name, version)
if registered_schema is not None:
return registered_schema

response = self._rest_client.get('subjects/{}/versions/{}'
.format(_urlencode(subject_name),
version))

schema_type = response.get('schemaType', 'AVRO')
return RegisteredSchema(schema_id=response['id'],
schema=Schema(response['schema'],
schema_type,
[
SchemaReference(name=ref['name'],
subject=ref['subject'],
version=ref['version'])
for ref in response.get('references', [])
]),
subject=response['subject'],
version=response['version'])
registered_schema = RegisteredSchema(
schema_id=response['id'],
schema=Schema(
response['schema'],
schema_type,
[
SchemaReference(
name=ref['name'],
subject=ref['subject'],
version=ref['version']
) for ref in response.get('references', [])
]
),
subject=response['subject'],
version=response['version']
)
self._metadata_cache.set(subject_name, None, version, registered_schema)

return registered_schema

def get_versions(self, subject_name):
"""
Expand Down
Empty file modified tools/source-package-verification.sh
100644 → 100755
Empty file.