diff --git a/src/confluent_kafka/schema_registry/avro.py b/src/confluent_kafka/schema_registry/avro.py index 0a480e94c..b0ad16278 100644 --- a/src/confluent_kafka/schema_registry/avro.py +++ b/src/confluent_kafka/schema_registry/avro.py @@ -83,6 +83,13 @@ class AvroSerializer(Serializer): | ``auto.register.schemas`` | bool | previously associated with a particular subject. | | | | Defaults to True. | +---------------------------+----------+--------------------------------------------------+ + | | | Whether to use the latest subject version for | + | ``use.latest.version`` | bool | serialization. | + | | | WARNING: There is no check that the latest | + | | | schema is backwards compatible with the object | + | | | being serialized. | + | | | Defaults to False. | + +-------------------------------------+----------+----------------------------------------+ | | | Callable(SerializationContext, str) -> str | | | | | | ``subject.name.strategy`` | callable | Instructs the AvroSerializer on how to construct | @@ -131,12 +138,13 @@ class AvroSerializer(Serializer): conf (dict): AvroSerializer configuration. """ # noqa: E501 - __slots__ = ['_hash', '_auto_register', '_known_subjects', '_parsed_schema', + __slots__ = ['_hash', '_auto_register', '_use_latest_version', '_known_subjects', '_parsed_schema', '_registry', '_schema', '_schema_id', '_schema_name', '_subject_name_func', '_to_dict'] # default configuration _default_conf = {'auto.register.schemas': True, + 'use.latest.version': False, 'subject.name.strategy': topic_subject_name_strategy} def __init__(self, schema_registry_client, schema_str, @@ -161,6 +169,12 @@ def __init__(self, schema_registry_client, schema_str, if not isinstance(self._auto_register, bool): raise ValueError("auto.register.schemas must be a boolean value") + self._use_latest_version = conf_copy.pop('use.latest.version') + if not isinstance(self._use_latest_version, bool): + raise ValueError("use.latest.version must be a boolean value") + if self._use_latest_version and self._auto_register: + raise ValueError("cannot enable both use.latest.version and auto.register.schemas") + self._subject_name_func = conf_copy.pop('subject.name.strategy') if not callable(self._subject_name_func): raise ValueError("subject.name.strategy must be callable") @@ -208,18 +222,23 @@ def __call__(self, obj, ctx): subject = self._subject_name_func(ctx, self._schema_name) - # Check to ensure this schema has been registered under subject_name. - if self._auto_register and subject not in self._known_subjects: - # The schema name will always be the same. We can't however register - # a schema without a subject so we set the schema_id here to handle - # the initial registration. - self._schema_id = self._registry.register_schema(subject, - self._schema) - self._known_subjects.add(subject) - elif not self._auto_register and subject not in self._known_subjects: - registered_schema = self._registry.lookup_schema(subject, - self._schema) - self._schema_id = registered_schema.schema_id + if subject not in self._known_subjects: + if self._use_latest_version: + latest_schema = self._registry.get_latest_version(subject) + self._schema_id = latest_schema.schema_id + + else: + # Check to ensure this schema has been registered under subject_name. + if self._auto_register: + # The schema name will always be the same. We can't however register + # a schema without a subject so we set the schema_id here to handle + # the initial registration. + self._schema_id = self._registry.register_schema(subject, + self._schema) + else: + registered_schema = self._registry.lookup_schema(subject, + self._schema) + self._schema_id = registered_schema.schema_id self._known_subjects.add(subject) if self._to_dict is not None: diff --git a/src/confluent_kafka/schema_registry/json_schema.py b/src/confluent_kafka/schema_registry/json_schema.py index 1e702df04..f5e31820b 100644 --- a/src/confluent_kafka/schema_registry/json_schema.py +++ b/src/confluent_kafka/schema_registry/json_schema.py @@ -59,6 +59,13 @@ class JSONSerializer(Serializer): | ``auto.register.schemas`` | bool | previously associated with a particular subject. | | | | Defaults to True. | +---------------------------+----------+--------------------------------------------------+ + | | | Whether to use the latest subject version for | + | ``use.latest.version`` | bool | serialization. | + | | | WARNING: There is no check that the latest | + | | | schema is backwards compatible with the object | + | | | being serialized. | + | | | Defaults to False. | + +-------------------------------------+----------+----------------------------------------+ | | | Callable(SerializationContext, str) -> str | | | | | | ``subject.name.strategy`` | callable | Instructs the JsonSerializer on how to construct | @@ -109,12 +116,13 @@ class JSONSerializer(Serializer): conf (dict): JsonSerializer configuration. """ # noqa: E501 - __slots__ = ['_hash', '_auto_register', '_known_subjects', '_parsed_schema', + __slots__ = ['_hash', '_auto_register', '_use_latest_version', '_known_subjects', '_parsed_schema', '_registry', '_schema', '_schema_id', '_schema_name', '_subject_name_func', '_to_dict'] # default configuration _default_conf = {'auto.register.schemas': True, + 'use.latest.version': False, 'subject.name.strategy': topic_subject_name_strategy} def __init__(self, schema_str, schema_registry_client, to_dict=None, @@ -139,6 +147,12 @@ def __init__(self, schema_str, schema_registry_client, to_dict=None, if not isinstance(self._auto_register, bool): raise ValueError("auto.register.schemas must be a boolean value") + self._use_latest_version = conf_copy.pop('use.latest.version') + if not isinstance(self._use_latest_version, bool): + raise ValueError("use.latest.version must be a boolean value") + if self._use_latest_version and self._auto_register: + raise ValueError("cannot enable both use.latest.version and auto.register.schemas") + self._subject_name_func = conf_copy.pop('subject.name.strategy') if not callable(self._subject_name_func): raise ValueError("subject.name.strategy must be callable") @@ -182,18 +196,23 @@ def __call__(self, obj, ctx): subject = self._subject_name_func(ctx, self._schema_name) - # Check to ensure this schema has been registered under subject_name. - if self._auto_register and subject not in self._known_subjects: - # The schema name will always be the same. We can't however register - # a schema without a subject so we set the schema_id here to handle - # the initial registration. - self._schema_id = self._registry.register_schema(subject, - self._schema) - self._known_subjects.add(subject) - elif not self._auto_register and subject not in self._known_subjects: - registered_schema = self._registry.lookup_schema(subject, - self._schema) - self._schema_id = registered_schema.schema_id + if subject not in self._known_subjects: + if self._use_latest_version: + latest_schema = self._registry.get_latest_version(subject) + self._schema_id = latest_schema.schema_id + + else: + # Check to ensure this schema has been registered under subject_name. + if self._auto_register: + # The schema name will always be the same. We can't however register + # a schema without a subject so we set the schema_id here to handle + # the initial registration. + self._schema_id = self._registry.register_schema(subject, + self._schema) + else: + registered_schema = self._registry.lookup_schema(subject, + self._schema) + self._schema_id = registered_schema.schema_id self._known_subjects.add(subject) if self._to_dict is not None: diff --git a/src/confluent_kafka/schema_registry/protobuf.py b/src/confluent_kafka/schema_registry/protobuf.py index d0b8d7d3d..1eaa71b3b 100644 --- a/src/confluent_kafka/schema_registry/protobuf.py +++ b/src/confluent_kafka/schema_registry/protobuf.py @@ -146,6 +146,17 @@ class ProtobufSerializer(object): | ``auto.register.schemas`` | bool | previously associated with a particular subject. | | | | Defaults to True. | +-------------------------------------+----------+------------------------------------------------------+ + | | | Whether to use the latest subject version for | + | ``use.latest.version`` | bool | serialization. | + | | | WARNING: There is no check that the latest | + | | | schema is backwards compatible with the object | + | | | being serialized. | + | | | Defaults to False. | + +-------------------------------------+----------+------------------------------------------------------+ + | | | Whether to skip known types when resolving schema | + | ``skip.known.types`` | bool | dependencies. | + | | | Defaults to False. | + +-------------------------------------+----------+------------------------------------------------------+ | | | Callable(SerializationContext, str) -> str | | | | | | ``subject.name.strategy`` | callable | Instructs the ProtobufSerializer on how to construct | @@ -194,12 +205,15 @@ class ProtobufSerializer(object): `Protobuf API reference `_ """ # noqa: E501 - __slots__ = ['_auto_register', '_registry', '_known_subjects', + __slots__ = ['_auto_register', '_use_latest_version', '_skip_known_types', + '_registry', '_known_subjects', '_msg_class', '_msg_index', '_schema', '_schema_id', '_ref_reference_subject_func', '_subject_name_func'] # default configuration _default_conf = { 'auto.register.schemas': True, + 'use.latest.version': False, + 'skip.known.types': False, 'subject.name.strategy': topic_subject_name_strategy, 'reference.subject.name.strategy': reference_subject_name_strategy } @@ -214,6 +228,16 @@ def __init__(self, msg_type, schema_registry_client, conf=None): if not isinstance(self._auto_register, bool): raise ValueError("auto.register.schemas must be a boolean value") + self._use_latest_version = conf_copy.pop('use.latest.version') + if not isinstance(self._use_latest_version, bool): + raise ValueError("use.latest.version must be a boolean value") + if self._use_latest_version and self._auto_register: + raise ValueError("cannot enable both use.latest.version and auto.register.schemas") + + self._skip_known_types = conf_copy.pop('skip.known.types') + if not isinstance(self._skip_known_types, bool): + raise ValueError("skip.known.types must be a boolean value") + self._subject_name_func = conf_copy.pop('subject.name.strategy') if not callable(self._subject_name_func): raise ValueError("subject.name.strategy must be callable") @@ -266,6 +290,8 @@ def _resolve_dependencies(self, ctx, file_desc): """ schema_refs = [] for dep in file_desc.dependencies: + if self._skip_known_types and dep.name.startswith("google/protobuf/"): + continue dep_refs = self._resolve_dependencies(ctx, dep) subject = self._ref_reference_subject_func(ctx, dep) schema = Schema(_schema_to_str(dep), @@ -313,15 +339,22 @@ def __call__(self, message_type, ctx): message_type.DESCRIPTOR.full_name) if subject not in self._known_subjects: - self._schema.references = self._resolve_dependencies( - ctx, message_type.DESCRIPTOR.file) + if self._use_latest_version: + latest_schema = self._registry.get_latest_version(subject) + self._schema_id = latest_schema.schema_id - if self._auto_register: - self._schema_id = self._registry.register_schema(subject, - self._schema) else: - self._schema_id = self._registry.lookup_schema( - subject, self._schema).schema_id + self._schema.references = self._resolve_dependencies( + ctx, message_type.DESCRIPTOR.file) + + if self._auto_register: + self._schema_id = self._registry.register_schema(subject, + self._schema) + else: + self._schema_id = self._registry.lookup_schema( + subject, self._schema).schema_id + + self._known_subjects.add(subject) with _ContextStringIO() as fo: # Write the magic byte and schema ID in network byte order diff --git a/tests/schema_registry/conftest.py b/tests/schema_registry/conftest.py index b6f5eae6a..047dca0e5 100644 --- a/tests/schema_registry/conftest.py +++ b/tests/schema_registry/conftest.py @@ -297,12 +297,13 @@ def get_subject_version_callback(self, request, context): path_match = re.match(self.subject_versions, request.path) subject = path_match.group(1) version = path_match.group(2) + version_num = -1 if version == 'latest' else int(version) - if int(version) == 404: + if version_num == 404: context.status_code = 404 return {'error_code': 40402, 'message': "Version not found"} - if int(version) == 422: + if version_num == 422: context.status_code = 422 return {'error_code': 42202, 'message': "Invalid version"} @@ -313,7 +314,7 @@ def get_subject_version_callback(self, request, context): context.status_code = 200 return {'subject': subject, 'id': self.SCHEMA_ID, - 'version': int(version), + 'version': version_num, 'schema': self._load_avsc(self.SCHEMA)} def delete_subject_version_callback(self, request, context): @@ -322,13 +323,14 @@ def delete_subject_version_callback(self, request, context): path_match = re.match(self.subject_versions, request.path) subject = path_match.group(1) version = path_match.group(2) + version_num = -1 if version == 'latest' else int(version) - if int(version) == 404: + if version_num == 404: context.status_code = 404 return {"error_code": 40402, "message": "Version not found"} - if int(version) == 422: + if version_num == 422: context.status_code = 422 return {"error_code": 42202, "message": "Invalid version"} @@ -339,7 +341,7 @@ def delete_subject_version_callback(self, request, context): "message": "Subject not found"} context.status_code = 200 - return int(version) + return version_num def post_subject_version_callback(self, request, context): self.counter['POST'][request.path] += 1 diff --git a/tests/schema_registry/test_avro_serializer.py b/tests/schema_registry/test_avro_serializer.py index 1941fdd5b..73c6bb45d 100644 --- a/tests/schema_registry/test_avro_serializer.py +++ b/tests/schema_registry/test_avro_serializer.py @@ -74,6 +74,29 @@ def test_avro_serializer_config_auto_register_schemas_false(mock_schema_registry assert test_client.counter['POST'].get('/subjects/{}'.format(subject)) == 1 +def test_avro_serializer_config_use_latest_version(mock_schema_registry): + """ + Ensures auto.register.schemas=False does not register schema + """ + conf = {'url': TEST_URL} + test_client = mock_schema_registry(conf) + topic = "test-use-latest-version" + subject = topic + '-key' + + test_serializer = AvroSerializer(test_client, 'string', + conf={'auto.register.schemas': False, 'use.latest.version': True}) + + test_serializer("test", + SerializationContext("test-use-latest-version", + MessageField.KEY)) + + register_count = test_client.counter['POST'].get('/subjects/{}/versions' + .format(subject), 0) + assert register_count == 0 + # Ensure latest was requested + assert test_client.counter['GET'].get('/subjects/{}/versions/latest'.format(subject)) == 1 + + def test_avro_serializer_config_subject_name_strategy(): """ Ensures subject.name.strategy is applied