From 66216133c351c2cb32955a7c2a93824249d2ec19 Mon Sep 17 00:00:00 2001 From: Ben Pope Date: Thu, 25 Jul 2024 14:07:16 +0100 Subject: [PATCH 1/9] pandaproxy/parsing: More lenient bool parsing Specifically, accept `True` by using case-insensitive comparison. Signed-off-by: Ben Pope --- src/v/pandaproxy/parsing/from_chars.h | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/v/pandaproxy/parsing/from_chars.h b/src/v/pandaproxy/parsing/from_chars.h index 5b7706e4147cc..2c4001bb78dec 100644 --- a/src/v/pandaproxy/parsing/from_chars.h +++ b/src/v/pandaproxy/parsing/from_chars.h @@ -18,6 +18,8 @@ #include +#include + #include #include #include @@ -79,7 +81,7 @@ class from_chars { using value_type = typename type::rep; return wrap(from_chars{}(in)); } else if constexpr (is_ss_bool) { - return type(in == "true" || in == "TRUE" || in == "1"); + return type(boost::iequals(in, "true") || in == "1"); } else if constexpr (is_arithmetic) { return do_from_chars(in); } From 7a9fcfa8b2260d4b6542bb3d8e0553a2bf97c02f Mon Sep 17 00:00:00 2001 From: Ben Pope Date: Thu, 25 Jul 2024 14:25:45 +0100 Subject: [PATCH 2/9] schema_registry: Allow sorting references Signed-off-by: Ben Pope --- src/v/pandaproxy/schema_registry/types.cc | 5 +++++ src/v/pandaproxy/schema_registry/types.h | 3 +++ 2 files changed, 8 insertions(+) diff --git a/src/v/pandaproxy/schema_registry/types.cc b/src/v/pandaproxy/schema_registry/types.cc index e8c4cdf01909e..4187d6374efc9 100644 --- a/src/v/pandaproxy/schema_registry/types.cc +++ b/src/v/pandaproxy/schema_registry/types.cc @@ -73,6 +73,11 @@ std::ostream& operator<<(std::ostream& os, const schema_reference& ref) { return os; } +bool operator<(const schema_reference& lhs, const schema_reference& rhs) { + return std::tie(lhs.name, lhs.sub, lhs.version) + < std::tie(rhs.name, rhs.sub, rhs.version); +} + std::ostream& operator<<(std::ostream& os, const unparsed_schema& ref) { fmt::print(os, "subject: {}, {}", ref.sub(), ref.def()); return os; diff --git a/src/v/pandaproxy/schema_registry/types.h b/src/v/pandaproxy/schema_registry/types.h index 0e20be93ef953..caf4e9c725258 100644 --- a/src/v/pandaproxy/schema_registry/types.h +++ b/src/v/pandaproxy/schema_registry/types.h @@ -108,6 +108,9 @@ struct schema_reference { friend std::ostream& operator<<(std::ostream& os, const schema_reference& ref); + friend bool + operator<(const schema_reference& lhs, const schema_reference& rhs); + ss::sstring name; subject sub{invalid_subject}; schema_version version{invalid_schema_version}; From 26dfa333d7057cc088968bde314b6293e32db3b1 Mon Sep 17 00:00:00 2001 From: Ben Pope Date: Thu, 25 Jul 2024 14:27:14 +0100 Subject: [PATCH 3/9] schema_registry/store: Allow normalization Signed-off-by: Ben Pope --- src/v/pandaproxy/schema_registry/sharded_store.cc | 2 +- src/v/pandaproxy/schema_registry/sharded_store.h | 3 ++- src/v/pandaproxy/schema_registry/types.h | 1 + 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/v/pandaproxy/schema_registry/sharded_store.cc b/src/v/pandaproxy/schema_registry/sharded_store.cc index f1eb7d729998c..5cea8c678c7ed 100644 --- a/src/v/pandaproxy/schema_registry/sharded_store.cc +++ b/src/v/pandaproxy/schema_registry/sharded_store.cc @@ -75,7 +75,7 @@ ss::future<> sharded_store::start(is_mutable mut, ss::smp_service_group sg) { ss::future<> sharded_store::stop() { return _store.stop(); } ss::future -sharded_store::make_canonical_schema(unparsed_schema schema) { +sharded_store::make_canonical_schema(unparsed_schema schema, normalize norm) { switch (schema.type()) { case schema_type::avro: { auto [sub, unparsed] = std::move(schema).destructure(); diff --git a/src/v/pandaproxy/schema_registry/sharded_store.h b/src/v/pandaproxy/schema_registry/sharded_store.h index d84befa543e21..bf9fc976f5170 100644 --- a/src/v/pandaproxy/schema_registry/sharded_store.h +++ b/src/v/pandaproxy/schema_registry/sharded_store.h @@ -28,7 +28,8 @@ class sharded_store { ss::future<> stop(); ///\brief Make the canonical form of the schema - ss::future make_canonical_schema(unparsed_schema schema); + ss::future make_canonical_schema( + unparsed_schema schema, normalize norm = normalize::no); ///\brief Check the schema parses with the native format ss::future validate_schema(canonical_schema schema); diff --git a/src/v/pandaproxy/schema_registry/types.h b/src/v/pandaproxy/schema_registry/types.h index caf4e9c725258..42426f29bdf7e 100644 --- a/src/v/pandaproxy/schema_registry/types.h +++ b/src/v/pandaproxy/schema_registry/types.h @@ -35,6 +35,7 @@ using include_deleted = ss::bool_class; using is_deleted = ss::bool_class; using default_to_global = ss::bool_class; using force = ss::bool_class; +using normalize = ss::bool_class; template std::enable_if_t, std::optional> From a2a24d7d243a08c76f5e8c128802aefe7185ee12 Mon Sep 17 00:00:00 2001 From: Ben Pope Date: Thu, 25 Jul 2024 14:28:43 +0100 Subject: [PATCH 4/9] schema_registry: Allow normalization at startup Signed-off-by: Ben Pope --- src/v/config/configuration.cc | 6 ++++++ src/v/config/configuration.h | 1 + src/v/pandaproxy/schema_registry/sharded_store.cc | 5 ++++- 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index 60b3ef8edbac7..11efcdf6c9f42 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -3160,6 +3160,12 @@ configuration::configuration() "Per-shard capacity of the cache for validating schema IDs.", {.needs_restart = needs_restart::no, .visibility = visibility::tunable}, 128) + , schema_registry_normalize_on_startup( + *this, + "schema_registry_normalize_on_startup", + "Normalize schemas as they are read from the topic on startup.", + {.needs_restart = needs_restart::yes, .visibility = visibility::user}, + false) , pp_sr_smp_max_non_local_requests( *this, "pp_sr_smp_max_non_local_requests", diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h index 31d9e7bb68a19..93f253071bdc3 100644 --- a/src/v/config/configuration.h +++ b/src/v/config/configuration.h @@ -583,6 +583,7 @@ struct configuration final : public config_store { enable_schema_id_validation; config::property kafka_schema_id_validation_cache_capacity; + property schema_registry_normalize_on_startup; property> pp_sr_smp_max_non_local_requests; bounded_property max_in_flight_schema_registry_requests_per_shard; bounded_property max_in_flight_pandaproxy_requests_per_shard; diff --git a/src/v/pandaproxy/schema_registry/sharded_store.cc b/src/v/pandaproxy/schema_registry/sharded_store.cc index 5cea8c678c7ed..b94ce0d6c7813 100644 --- a/src/v/pandaproxy/schema_registry/sharded_store.cc +++ b/src/v/pandaproxy/schema_registry/sharded_store.cc @@ -12,6 +12,7 @@ #include "pandaproxy/schema_registry/sharded_store.h" #include "base/vlog.h" +#include "config/configuration.h" #include "hashing/jump_consistent_hash.h" #include "hashing/xx.h" #include "pandaproxy/logger.h" @@ -241,9 +242,11 @@ ss::future sharded_store::upsert( schema_id id, schema_version version, is_deleted deleted) { + auto norm = normalize{ + config::shard_local_cfg().schema_registry_normalize_on_startup()}; co_return co_await upsert( marker, - co_await make_canonical_schema(std::move(schema)), + co_await make_canonical_schema(std::move(schema), norm), id, version, deleted); From fabfbc5ae69ebbf90ebd354d5f262ec8ad71a429 Mon Sep 17 00:00:00 2001 From: Ben Pope Date: Thu, 25 Jul 2024 14:29:21 +0100 Subject: [PATCH 5/9] schema_registry: Allow normalization over REST Signed-off-by: Ben Pope --- .../api/api-doc/schema_registry.json | 12 +++++++++++ src/v/pandaproxy/schema_registry/handlers.cc | 21 +++++++++++++++---- 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/src/v/pandaproxy/api/api-doc/schema_registry.json b/src/v/pandaproxy/api/api-doc/schema_registry.json index e999015929b7d..c2f3ed02449b7 100644 --- a/src/v/pandaproxy/api/api-doc/schema_registry.json +++ b/src/v/pandaproxy/api/api-doc/schema_registry.json @@ -622,6 +622,12 @@ "required": true, "type": "string" }, + { + "name": "normalize", + "in": "query", + "required": false, + "type": "boolean" + }, { "name": "deleted", "in": "query", @@ -783,6 +789,12 @@ "required": true, "type": "string" }, + { + "name": "normalize", + "in": "query", + "required": false, + "type": "boolean" + }, { "name": "schema_def", "in": "body", diff --git a/src/v/pandaproxy/schema_registry/handlers.cc b/src/v/pandaproxy/schema_registry/handlers.cc index 2edfd6d65ca05..5366ee3d02ff8 100644 --- a/src/v/pandaproxy/schema_registry/handlers.cc +++ b/src/v/pandaproxy/schema_registry/handlers.cc @@ -407,7 +407,14 @@ post_subject(server::request_t rq, server::reply_t rp) { auto inc_del{ parse::query_param>(*rq.req, "deleted") .value_or(include_deleted::no)}; - vlog(plog.debug, "post_subject subject='{}', deleted='{}'", sub, inc_del); + auto norm{parse::query_param>(*rq.req, "normalize") + .value_or(normalize::no)}; + vlog( + plog.debug, + "post_subject subject='{}', normalize='{}', deleted='{}'", + sub, + norm, + inc_del); // We must sync co_await rq.service().writer().read_sync(); @@ -419,7 +426,7 @@ post_subject(server::request_t rq, server::reply_t rp) { auto unparsed = co_await ppj::rjson_parse( std::move(rq.req), post_subject_versions_request_handler<>{sub}); schema = co_await rq.service().schema_store().make_canonical_schema( - std::move(unparsed.def)); + std::move(unparsed.def), norm); } catch (const exception& e) { if (e.code() == error_code::schema_empty) { throw as_exception(invalid_subject_schema(sub)); @@ -446,7 +453,13 @@ post_subject_versions(server::request_t rq, server::reply_t rp) { parse_content_type_header(rq); parse_accept_header(rq, rp); auto sub = parse::request_param(*rq.req, "subject"); - vlog(plog.debug, "post_subject_versions subject='{}'", sub); + auto norm{parse::query_param>(*rq.req, "normalize") + .value_or(normalize::no)}; + vlog( + plog.debug, + "post_subject_versions subject='{}', normalize='{}'", + sub, + norm); co_await rq.service().writer().read_sync(); @@ -455,7 +468,7 @@ post_subject_versions(server::request_t rq, server::reply_t rp) { subject_schema schema{ co_await rq.service().schema_store().make_canonical_schema( - std::move(unparsed.def)), + std::move(unparsed.def), norm), unparsed.version.value_or(invalid_schema_version), unparsed.id.value_or(invalid_schema_id), is_deleted::no}; From 0772f65c063fe0a541ff2bdb73859c574371b9c3 Mon Sep 17 00:00:00 2001 From: Ben Pope Date: Fri, 26 Jul 2024 11:04:15 +0100 Subject: [PATCH 6/9] schema_registry/avro: Fix recursion for normalization Fixes CORE-5704 TODO: Add tests Signed-off-by: Ben Pope --- src/v/pandaproxy/schema_registry/avro.cc | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/v/pandaproxy/schema_registry/avro.cc b/src/v/pandaproxy/schema_registry/avro.cc index 6e9193187a260..86658b44c54c7 100644 --- a/src/v/pandaproxy/schema_registry/avro.cc +++ b/src/v/pandaproxy/schema_registry/avro.cc @@ -245,6 +245,11 @@ result sanitize_avro_type( case avro::AVRO_FIXED: case avro::AVRO_MAP: std::sort(o.begin(), o.end(), member_sorter{}); + for (auto& i : o) { + if (auto res = sanitize(i.value, ctx); !res.has_value()) { + return res; + } + } break; case avro::AVRO_RECORD: { auto res = sanitize_record(o, ctx); From f06cbaf63d501a2a3d42b7ea3797812f6188edd7 Mon Sep 17 00:00:00 2001 From: Ben Pope Date: Thu, 25 Jul 2024 15:11:33 +0100 Subject: [PATCH 7/9] schema_registry/dt: Refactor No functional changes Signed-off-by: Ben Pope --- tests/rptest/tests/schema_registry_test.py | 31 ++++++++++++---------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/tests/rptest/tests/schema_registry_test.py b/tests/rptest/tests/schema_registry_test.py index 2b6d340fd34ee..14979a73f2e80 100644 --- a/tests/rptest/tests/schema_registry_test.py +++ b/tests/rptest/tests/schema_registry_test.py @@ -119,21 +119,21 @@ def get_subject_name(sns: str, topic: str, field: MessageField, }) -class TestDataset(NamedTuple): +class TestCompatDataset(NamedTuple): type: SchemaType schema_base: str schema_backward_compatible: str schema_not_backward_compatible: str -def get_dataset(type: SchemaType) -> TestDataset: +def get_compat_dataset(type: SchemaType) -> TestCompatDataset: if type == SchemaType.AVRO: - return TestDataset(type=SchemaType.AVRO, - schema_base=schema1_def, - schema_backward_compatible=schema2_def, - schema_not_backward_compatible=schema3_def) + return TestCompatDataset(type=SchemaType.AVRO, + schema_base=schema1_def, + schema_backward_compatible=schema2_def, + schema_not_backward_compatible=schema3_def) if type == SchemaType.JSON: - return TestDataset( + return TestCompatDataset( type=SchemaType.JSON, schema_base=""" { @@ -476,12 +476,15 @@ def _post_subjects_subject(self, deleted=False, headers=HTTP_POST_HEADERS, **kwargs): - return self._request( - "POST", - f"subjects/{subject}{'?deleted=true' if deleted else ''}", - headers=headers, - data=data, - **kwargs) + params = {} + if (deleted): + params['deleted'] = 'true' + return self._request("POST", + f"subjects/{subject}", + headers=headers, + data=data, + params=params, + **kwargs) def _post_subjects_subject_versions(self, subject, @@ -1111,7 +1114,7 @@ def test_post_compatibility_subject_version(self, """ Verify compatibility """ - dataset = get_dataset(dataset_type) + dataset = get_compat_dataset(dataset_type) self.logger.debug(f"testing with {dataset=}") topic = create_topic_names(1)[0] From 86feb1392a1a99a138f7f2bb7fd6d72d3c8e33e9 Mon Sep 17 00:00:00 2001 From: Ben Pope Date: Thu, 25 Jul 2024 19:09:08 +0100 Subject: [PATCH 8/9] schema_registry/dt: Add tests for normalization Signed-off-by: Ben Pope --- tests/rptest/tests/schema_registry_test.py | 131 +++++++++++++++++++++ 1 file changed, 131 insertions(+) diff --git a/tests/rptest/tests/schema_registry_test.py b/tests/rptest/tests/schema_registry_test.py index 14979a73f2e80..5e0e321ae1daf 100644 --- a/tests/rptest/tests/schema_registry_test.py +++ b/tests/rptest/tests/schema_registry_test.py @@ -170,6 +170,87 @@ def get_compat_dataset(type: SchemaType) -> TestCompatDataset: assert False, f"Unsupported schema {type=}" +class TestNormalizeDataset(NamedTuple): + type: SchemaType + schema_base: str + schema_canonical: str + schema_normalized: str + + +def get_normalize_dataset(type: SchemaType) -> TestNormalizeDataset: + if type == SchemaType.AVRO: + return TestNormalizeDataset(type=SchemaType.AVRO, + schema_base="""{ + "name": "myrecord", + "type": "record", + "fields": [ + { + "name": "nested", + "type": { + "type": "array", + "items": { + "fields": [ + { + "type": "string", + "name": "f1" + } + ], + "name": "nested_item", + "type": "record" + } + } + } + ] +}""", + schema_canonical=re.sub( + r"[\n\t\s]*", "", """{ + "type": "record", + "name": "myrecord", + "fields": [ + { + "name": "nested", + "type": { + "type": "array", + "items": { + "type": "record", + "name": "nested_item", + "fields": [ + { + "name": "f1", + "type": "string" + } + ] + } + } + } + ] +}"""), + schema_normalized=re.sub( + r"[\n\t\s]*", "", """{ + "type": "record", + "name": "myrecord", + "fields": [ + { + "name": "nested", + "type": { + "type": "array", + "items": { + "type": "record", + "name": "nested_item", + "fields": [ + { + "name": "f1", + "type": "string" + } + ] + } + } + } + ] +}""")) + assert False, f"Unsupported schema {type=}" + + class SchemaRegistryEndpoints(RedpandaTest): """ Test schema registry against a redpanda cluster. @@ -474,11 +555,14 @@ def _post_subjects_subject(self, subject, data, deleted=False, + normalize=False, headers=HTTP_POST_HEADERS, **kwargs): params = {} if (deleted): params['deleted'] = 'true' + if (normalize): + params['normalize'] = 'true' return self._request("POST", f"subjects/{subject}", headers=headers, @@ -489,12 +573,17 @@ def _post_subjects_subject(self, def _post_subjects_subject_versions(self, subject, data, + normalize=False, headers=HTTP_POST_HEADERS, **kwargs): + params = {} + if (normalize): + params['normalize'] = 'true' return self._request("POST", f"subjects/{subject}/versions", headers=headers, data=data, + params=params, **kwargs) def _get_subjects_subject_versions_version(self, @@ -1106,6 +1195,48 @@ def test_config(self): assert result_raw.json( )["message"] == f"Subject 'foo-key' not found.", f"{json.dumps(result_raw.json(), indent=1)}" + @cluster(num_nodes=3) + @parametrize(dataset_type=SchemaType.AVRO) + def test_normalize(self, dataset_type: SchemaType): + dataset = get_normalize_dataset(dataset_type) + self.logger.debug(f"testing with {dataset=}") + + topics = create_topic_names(2)[0] + canonical_topic = topics[0] + normalize_topic = topics[1] + + base_schema = json.dumps({ + "schema": dataset.schema_base, + "schemaType": str(dataset.type) + }) + + self.logger.debug( + f"Register a schema against a subject - not normalized") + result_raw = self._post_subjects_subject_versions( + subject=f"{canonical_topic}-key", + data=base_schema, + normalize=False) + self.logger.debug(result_raw) + assert result_raw.status_code == requests.codes.ok + v1_id = result_raw.json()["id"] + + self.logger.debug(f"Checking that the returned schema is canonical") + result_raw = self._get_schemas_ids_id(id=v1_id) + assert result_raw.status_code == requests.codes.ok + assert result_raw.json()['schema'] == dataset.schema_canonical + + self.logger.debug(f"Register a schema against a subject - normalized") + result_raw = self._post_subjects_subject_versions( + subject=f"{normalize_topic}-key", data=base_schema, normalize=True) + self.logger.debug(result_raw) + assert result_raw.status_code == requests.codes.ok + v1_id = result_raw.json()["id"] + + self.logger.debug(f"Checking that the returned schema is normalized") + result_raw = self._get_schemas_ids_id(id=v1_id) + assert result_raw.status_code == requests.codes.ok + assert result_raw.json()['schema'] == dataset.schema_normalized + @cluster(num_nodes=3) @parametrize(dataset_type=SchemaType.AVRO) @parametrize(dataset_type=SchemaType.JSON) From a4533901edc18a8ade573403500fb984be6ea34b Mon Sep 17 00:00:00 2001 From: Ben Pope Date: Thu, 25 Jul 2024 14:29:54 +0100 Subject: [PATCH 9/9] schema_registry: Allow normalization of JSON Schema Signed-off-by: Ben Pope --- src/v/pandaproxy/schema_registry/json.cc | 42 +++++++++++++++++-- src/v/pandaproxy/schema_registry/json.h | 4 +- .../schema_registry/sharded_store.cc | 3 +- tests/rptest/tests/schema_registry_test.py | 34 ++++++++++++++- 4 files changed, 75 insertions(+), 8 deletions(-) diff --git a/src/v/pandaproxy/schema_registry/json.cc b/src/v/pandaproxy/schema_registry/json.cc index 22963f7adb10d..c658278f17647 100644 --- a/src/v/pandaproxy/schema_registry/json.cc +++ b/src/v/pandaproxy/schema_registry/json.cc @@ -1469,6 +1469,32 @@ bool check_compatible_dialects( return true; } +void sort(json::Value& val) { + switch (val.GetType()) { + case rapidjson::Type::kFalseType: + case rapidjson::Type::kNullType: + case rapidjson::Type::kNumberType: + case rapidjson::Type::kStringType: + case rapidjson::Type::kTrueType: + break; + case rapidjson::Type::kArrayType: { + for (auto& v : val.GetArray()) { + sort(v); + } + break; + } + case rapidjson::Type::kObjectType: { + auto v = val.GetObject(); + std::sort(v.begin(), v.end(), [](auto& lhs, auto& rhs) { + return std::string_view{ + lhs.name.GetString(), lhs.name.GetStringLength()} + < std::string_view{ + rhs.name.GetString(), rhs.name.GetStringLength()}; + }); + } + } +} + } // namespace ss::future @@ -1483,16 +1509,24 @@ make_json_schema_definition(sharded_store&, canonical_schema schema) { } ss::future make_canonical_json_schema( - sharded_store& store, unparsed_schema unparsed_schema) { - // TODO BP: More validation and normalisation - parse_json(unparsed_schema.def().shared_raw()).value(); // throws on error + sharded_store& store, unparsed_schema unparsed_schema, normalize norm) { auto [sub, unparsed] = std::move(unparsed_schema).destructure(); auto [def, type, refs] = std::move(unparsed).destructure(); + auto doc = parse_json(std::move(def)).value(); // throws on error + if (norm) { + sort(doc); + std::sort(refs.begin(), refs.end()); + refs.erase(std::unique(refs.begin(), refs.end()), refs.end()); + } + json::chunked_buffer out; + json::Writer w{out}; + doc.Accept(w); + canonical_schema schema{ std::move(sub), canonical_schema_definition{ - canonical_schema_definition::raw_string{std::move(def)()}, + canonical_schema_definition::raw_string{std::move(out).as_iobuf()}, type, std::move(refs)}}; diff --git a/src/v/pandaproxy/schema_registry/json.h b/src/v/pandaproxy/schema_registry/json.h index c251fa0719fd7..fa96a7ea90203 100644 --- a/src/v/pandaproxy/schema_registry/json.h +++ b/src/v/pandaproxy/schema_registry/json.h @@ -19,8 +19,8 @@ namespace pandaproxy::schema_registry { ss::future make_json_schema_definition(sharded_store& store, canonical_schema schema); -ss::future -make_canonical_json_schema(sharded_store& store, unparsed_schema def); +ss::future make_canonical_json_schema( + sharded_store& store, unparsed_schema def, normalize norm = normalize::no); bool check_compatible( const json_schema_definition& reader, const json_schema_definition& writer); diff --git a/src/v/pandaproxy/schema_registry/sharded_store.cc b/src/v/pandaproxy/schema_registry/sharded_store.cc index b94ce0d6c7813..4d24aafa29f54 100644 --- a/src/v/pandaproxy/schema_registry/sharded_store.cc +++ b/src/v/pandaproxy/schema_registry/sharded_store.cc @@ -88,7 +88,8 @@ sharded_store::make_canonical_schema(unparsed_schema schema, normalize norm) { co_return co_await make_canonical_protobuf_schema( *this, std::move(schema)); case schema_type::json: - co_return co_await make_canonical_json_schema(*this, std::move(schema)); + co_return co_await make_canonical_json_schema( + *this, std::move(schema), norm); } __builtin_unreachable(); } diff --git a/tests/rptest/tests/schema_registry_test.py b/tests/rptest/tests/schema_registry_test.py index 5e0e321ae1daf..09c2a5bf8bfb2 100644 --- a/tests/rptest/tests/schema_registry_test.py +++ b/tests/rptest/tests/schema_registry_test.py @@ -109,7 +109,7 @@ def get_subject_name(sns: str, topic: str, field: MessageField, google.protobuf.Timestamp timestamp = 1; }""" -json_number_schema_def = '{"type": "number"}' +json_number_schema_def = '{"type":"number"}' log_config = LoggingConfig('info', logger_levels={ @@ -248,6 +248,37 @@ def get_normalize_dataset(type: SchemaType) -> TestNormalizeDataset: } ] }""")) + if type == SchemaType.JSON: + return TestNormalizeDataset( + type=SchemaType.JSON, + schema_base="""{ + "type": "object", + "properties": { + "aaaa": {"type": "number"} + }, + "additionalProperties": {"type": "boolean"}, + "required": ["aaaa"] +} +""", + schema_canonical=re.sub( + r"[\n\t\s]*", "", """{ + "type": "object", + "properties": { + "aaaa": {"type": "number"} + }, + "additionalProperties": {"type": "boolean"}, + "required": ["aaaa"] +}"""), + schema_normalized=re.sub( + r"[\n\t\s]*", "", """{ + "additionalProperties": {"type": "boolean"}, + "properties": { + "aaaa": {"type": "number"} + }, + "required": ["aaaa"], + "type": "object" +}"""), + ) assert False, f"Unsupported schema {type=}" @@ -1197,6 +1228,7 @@ def test_config(self): @cluster(num_nodes=3) @parametrize(dataset_type=SchemaType.AVRO) + @parametrize(dataset_type=SchemaType.JSON) def test_normalize(self, dataset_type: SchemaType): dataset = get_normalize_dataset(dataset_type) self.logger.debug(f"testing with {dataset=}")