Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance serializers with use.latest.version config #1133

Merged
merged 4 commits into from
Jun 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 32 additions & 13 deletions src/confluent_kafka/schema_registry/avro.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,13 @@ class AvroSerializer(Serializer):
| ``auto.register.schemas`` | bool | previously associated with a particular subject. |
| | | Defaults to True. |
+---------------------------+----------+--------------------------------------------------+
| | | Whether to use the latest subject version for |
| ``use.latest.version`` | bool | serialization. |
| | | WARNING: There is no check that the latest |
| | | schema is backwards compatible with the object |
| | | being serialized. |
| | | Defaults to False. |
+-------------------------------------+----------+----------------------------------------+
| | | Callable(SerializationContext, str) -> str |
| | | |
| ``subject.name.strategy`` | callable | Instructs the AvroSerializer on how to construct |
Expand Down Expand Up @@ -131,12 +138,13 @@ class AvroSerializer(Serializer):
conf (dict): AvroSerializer configuration.

""" # noqa: E501
__slots__ = ['_hash', '_auto_register', '_known_subjects', '_parsed_schema',
__slots__ = ['_hash', '_auto_register', '_use_latest_version', '_known_subjects', '_parsed_schema',
'_registry', '_schema', '_schema_id', '_schema_name',
'_subject_name_func', '_to_dict']

# default configuration
_default_conf = {'auto.register.schemas': True,
'use.latest.version': False,
'subject.name.strategy': topic_subject_name_strategy}

def __init__(self, schema_registry_client, schema_str,
Expand All @@ -161,6 +169,12 @@ def __init__(self, schema_registry_client, schema_str,
if not isinstance(self._auto_register, bool):
raise ValueError("auto.register.schemas must be a boolean value")

self._use_latest_version = conf_copy.pop('use.latest.version')
if not isinstance(self._use_latest_version, bool):
raise ValueError("use.latest.version must be a boolean value")
if self._use_latest_version and self._auto_register:
raise ValueError("cannot enable both use.latest.version and auto.register.schemas")

self._subject_name_func = conf_copy.pop('subject.name.strategy')
if not callable(self._subject_name_func):
raise ValueError("subject.name.strategy must be callable")
Expand Down Expand Up @@ -208,18 +222,23 @@ def __call__(self, obj, ctx):

subject = self._subject_name_func(ctx, self._schema_name)

# Check to ensure this schema has been registered under subject_name.
if self._auto_register and subject not in self._known_subjects:
# The schema name will always be the same. We can't however register
# a schema without a subject so we set the schema_id here to handle
# the initial registration.
self._schema_id = self._registry.register_schema(subject,
self._schema)
self._known_subjects.add(subject)
elif not self._auto_register and subject not in self._known_subjects:
registered_schema = self._registry.lookup_schema(subject,
self._schema)
self._schema_id = registered_schema.schema_id
if subject not in self._known_subjects:
if self._use_latest_version:
latest_schema = self._registry.get_latest_version(subject)
self._schema_id = latest_schema.schema_id

else:
# Check to ensure this schema has been registered under subject_name.
if self._auto_register:
# The schema name will always be the same. We can't however register
# a schema without a subject so we set the schema_id here to handle
# the initial registration.
self._schema_id = self._registry.register_schema(subject,
self._schema)
else:
registered_schema = self._registry.lookup_schema(subject,
self._schema)
self._schema_id = registered_schema.schema_id
self._known_subjects.add(subject)

if self._to_dict is not None:
Expand Down
45 changes: 32 additions & 13 deletions src/confluent_kafka/schema_registry/json_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ class JSONSerializer(Serializer):
| ``auto.register.schemas`` | bool | previously associated with a particular subject. |
| | | Defaults to True. |
+---------------------------+----------+--------------------------------------------------+
| | | Whether to use the latest subject version for |
| ``use.latest.version`` | bool | serialization. |
| | | WARNING: There is no check that the latest |
| | | schema is backwards compatible with the object |
| | | being serialized. |
| | | Defaults to False. |
+-------------------------------------+----------+----------------------------------------+
| | | Callable(SerializationContext, str) -> str |
| | | |
| ``subject.name.strategy`` | callable | Instructs the JsonSerializer on how to construct |
Expand Down Expand Up @@ -109,12 +116,13 @@ class JSONSerializer(Serializer):
conf (dict): JsonSerializer configuration.

""" # noqa: E501
__slots__ = ['_hash', '_auto_register', '_known_subjects', '_parsed_schema',
__slots__ = ['_hash', '_auto_register', '_use_latest_version', '_known_subjects', '_parsed_schema',
'_registry', '_schema', '_schema_id', '_schema_name',
'_subject_name_func', '_to_dict']

# default configuration
_default_conf = {'auto.register.schemas': True,
'use.latest.version': False,
'subject.name.strategy': topic_subject_name_strategy}

def __init__(self, schema_str, schema_registry_client, to_dict=None,
Expand All @@ -139,6 +147,12 @@ def __init__(self, schema_str, schema_registry_client, to_dict=None,
if not isinstance(self._auto_register, bool):
raise ValueError("auto.register.schemas must be a boolean value")

self._use_latest_version = conf_copy.pop('use.latest.version')
if not isinstance(self._use_latest_version, bool):
raise ValueError("use.latest.version must be a boolean value")
if self._use_latest_version and self._auto_register:
raise ValueError("cannot enable both use.latest.version and auto.register.schemas")

self._subject_name_func = conf_copy.pop('subject.name.strategy')
if not callable(self._subject_name_func):
raise ValueError("subject.name.strategy must be callable")
Expand Down Expand Up @@ -182,18 +196,23 @@ def __call__(self, obj, ctx):

subject = self._subject_name_func(ctx, self._schema_name)

# Check to ensure this schema has been registered under subject_name.
if self._auto_register and subject not in self._known_subjects:
# The schema name will always be the same. We can't however register
# a schema without a subject so we set the schema_id here to handle
# the initial registration.
self._schema_id = self._registry.register_schema(subject,
self._schema)
self._known_subjects.add(subject)
elif not self._auto_register and subject not in self._known_subjects:
registered_schema = self._registry.lookup_schema(subject,
self._schema)
self._schema_id = registered_schema.schema_id
if subject not in self._known_subjects:
if self._use_latest_version:
latest_schema = self._registry.get_latest_version(subject)
self._schema_id = latest_schema.schema_id

else:
# Check to ensure this schema has been registered under subject_name.
if self._auto_register:
# The schema name will always be the same. We can't however register
# a schema without a subject so we set the schema_id here to handle
# the initial registration.
self._schema_id = self._registry.register_schema(subject,
self._schema)
else:
registered_schema = self._registry.lookup_schema(subject,
self._schema)
self._schema_id = registered_schema.schema_id
self._known_subjects.add(subject)

if self._to_dict is not None:
Expand Down
49 changes: 41 additions & 8 deletions src/confluent_kafka/schema_registry/protobuf.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,17 @@ class ProtobufSerializer(object):
| ``auto.register.schemas`` | bool | previously associated with a particular subject. |
| | | Defaults to True. |
+-------------------------------------+----------+------------------------------------------------------+
| | | Whether to use the latest subject version for |
| ``use.latest.version`` | bool | serialization. |
| | | WARNING: There is no check that the latest |
| | | schema is backwards compatible with the object |
| | | being serialized. |
| | | Defaults to False. |
+-------------------------------------+----------+------------------------------------------------------+
| | | Whether to skip known types when resolving schema |
| ``skip.known.types`` | bool | dependencies. |
| | | Defaults to False. |
+-------------------------------------+----------+------------------------------------------------------+
| | | Callable(SerializationContext, str) -> str |
| | | |
| ``subject.name.strategy`` | callable | Instructs the ProtobufSerializer on how to construct |
Expand Down Expand Up @@ -194,12 +205,15 @@ class ProtobufSerializer(object):
`Protobuf API reference <https://googleapis.dev/python/protobuf/latest/google/protobuf.html>`_

""" # noqa: E501
__slots__ = ['_auto_register', '_registry', '_known_subjects',
__slots__ = ['_auto_register', '_use_latest_version', '_skip_known_types',
'_registry', '_known_subjects',
'_msg_class', '_msg_index', '_schema', '_schema_id',
'_ref_reference_subject_func', '_subject_name_func']
# default configuration
_default_conf = {
'auto.register.schemas': True,
'use.latest.version': False,
'skip.known.types': False,
'subject.name.strategy': topic_subject_name_strategy,
'reference.subject.name.strategy': reference_subject_name_strategy
}
Expand All @@ -214,6 +228,16 @@ def __init__(self, msg_type, schema_registry_client, conf=None):
if not isinstance(self._auto_register, bool):
raise ValueError("auto.register.schemas must be a boolean value")

self._use_latest_version = conf_copy.pop('use.latest.version')
if not isinstance(self._use_latest_version, bool):
raise ValueError("use.latest.version must be a boolean value")
if self._use_latest_version and self._auto_register:
raise ValueError("cannot enable both use.latest.version and auto.register.schemas")

self._skip_known_types = conf_copy.pop('skip.known.types')
if not isinstance(self._skip_known_types, bool):
raise ValueError("skip.known.types must be a boolean value")

self._subject_name_func = conf_copy.pop('subject.name.strategy')
if not callable(self._subject_name_func):
raise ValueError("subject.name.strategy must be callable")
Expand Down Expand Up @@ -266,6 +290,8 @@ def _resolve_dependencies(self, ctx, file_desc):
"""
schema_refs = []
for dep in file_desc.dependencies:
if self._skip_known_types and dep.name.startswith("google/protobuf/"):
continue
dep_refs = self._resolve_dependencies(ctx, dep)
subject = self._ref_reference_subject_func(ctx, dep)
schema = Schema(_schema_to_str(dep),
Expand Down Expand Up @@ -313,15 +339,22 @@ def __call__(self, message_type, ctx):
message_type.DESCRIPTOR.full_name)

if subject not in self._known_subjects:
self._schema.references = self._resolve_dependencies(
ctx, message_type.DESCRIPTOR.file)
if self._use_latest_version:
latest_schema = self._registry.get_latest_version(subject)
self._schema_id = latest_schema.schema_id

if self._auto_register:
self._schema_id = self._registry.register_schema(subject,
self._schema)
else:
self._schema_id = self._registry.lookup_schema(
subject, self._schema).schema_id
self._schema.references = self._resolve_dependencies(
ctx, message_type.DESCRIPTOR.file)

if self._auto_register:
self._schema_id = self._registry.register_schema(subject,
self._schema)
else:
self._schema_id = self._registry.lookup_schema(
subject, self._schema).schema_id

self._known_subjects.add(subject)

with _ContextStringIO() as fo:
# Write the magic byte and schema ID in network byte order
Expand Down
14 changes: 8 additions & 6 deletions tests/schema_registry/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,12 +297,13 @@ def get_subject_version_callback(self, request, context):
path_match = re.match(self.subject_versions, request.path)
subject = path_match.group(1)
version = path_match.group(2)
version_num = -1 if version == 'latest' else int(version)

if int(version) == 404:
if version_num == 404:
context.status_code = 404
return {'error_code': 40402,
'message': "Version not found"}
if int(version) == 422:
if version_num == 422:
context.status_code = 422
return {'error_code': 42202,
'message': "Invalid version"}
Expand All @@ -313,7 +314,7 @@ def get_subject_version_callback(self, request, context):
context.status_code = 200
return {'subject': subject,
'id': self.SCHEMA_ID,
'version': int(version),
'version': version_num,
'schema': self._load_avsc(self.SCHEMA)}

def delete_subject_version_callback(self, request, context):
Expand All @@ -322,13 +323,14 @@ def delete_subject_version_callback(self, request, context):
path_match = re.match(self.subject_versions, request.path)
subject = path_match.group(1)
version = path_match.group(2)
version_num = -1 if version == 'latest' else int(version)

if int(version) == 404:
if version_num == 404:
context.status_code = 404
return {"error_code": 40402,
"message": "Version not found"}

if int(version) == 422:
if version_num == 422:
context.status_code = 422
return {"error_code": 42202,
"message": "Invalid version"}
Expand All @@ -339,7 +341,7 @@ def delete_subject_version_callback(self, request, context):
"message": "Subject not found"}

context.status_code = 200
return int(version)
return version_num

def post_subject_version_callback(self, request, context):
self.counter['POST'][request.path] += 1
Expand Down
23 changes: 23 additions & 0 deletions tests/schema_registry/test_avro_serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,29 @@ def test_avro_serializer_config_auto_register_schemas_false(mock_schema_registry
assert test_client.counter['POST'].get('/subjects/{}'.format(subject)) == 1


def test_avro_serializer_config_use_latest_version(mock_schema_registry):
"""
Ensures auto.register.schemas=False does not register schema
"""
conf = {'url': TEST_URL}
test_client = mock_schema_registry(conf)
topic = "test-use-latest-version"
subject = topic + '-key'

test_serializer = AvroSerializer(test_client, 'string',
conf={'auto.register.schemas': False, 'use.latest.version': True})

test_serializer("test",
SerializationContext("test-use-latest-version",
MessageField.KEY))

register_count = test_client.counter['POST'].get('/subjects/{}/versions'
.format(subject), 0)
assert register_count == 0
# Ensure latest was requested
assert test_client.counter['GET'].get('/subjects/{}/versions/latest'.format(subject)) == 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there should really be more test coverage of this stuff.
but i'm ok with the effort/benefit tradeoff.



def test_avro_serializer_config_subject_name_strategy():
"""
Ensures subject.name.strategy is applied
Expand Down