Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CORE-3181] Schema Registry: Normalization #22519

Merged
merged 9 commits into from
Jul 26, 2024
6 changes: 6 additions & 0 deletions src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3160,6 +3160,12 @@ configuration::configuration()
"Per-shard capacity of the cache for validating schema IDs.",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
128)
, schema_registry_normalize_on_startup(
*this,
"schema_registry_normalize_on_startup",
"Normalize schemas as they are read from the topic on startup.",
{.needs_restart = needs_restart::yes, .visibility = visibility::user},
false)
, pp_sr_smp_max_non_local_requests(
*this,
"pp_sr_smp_max_non_local_requests",
Expand Down
1 change: 1 addition & 0 deletions src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,7 @@ struct configuration final : public config_store {
enable_schema_id_validation;
config::property<size_t> kafka_schema_id_validation_cache_capacity;

property<bool> schema_registry_normalize_on_startup;
property<std::optional<uint32_t>> pp_sr_smp_max_non_local_requests;
bounded_property<size_t> max_in_flight_schema_registry_requests_per_shard;
bounded_property<size_t> max_in_flight_pandaproxy_requests_per_shard;
Expand Down
12 changes: 12 additions & 0 deletions src/v/pandaproxy/api/api-doc/schema_registry.json
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,12 @@
"required": true,
"type": "string"
},
{
"name": "normalize",
"in": "query",
"required": false,
"type": "boolean"
},
{
"name": "deleted",
"in": "query",
Expand Down Expand Up @@ -783,6 +789,12 @@
"required": true,
"type": "string"
},
{
"name": "normalize",
"in": "query",
"required": false,
"type": "boolean"
},
Comment on lines +796 to +797
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: I think we're not in the habit of this generally, but can we include the default value here?

{
"name": "schema_def",
"in": "body",
Expand Down
4 changes: 3 additions & 1 deletion src/v/pandaproxy/parsing/from_chars.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

#include <seastar/core/sstring.hh>

#include <boost/algorithm/string.hpp>

#include <cctype>
#include <charconv>
#include <chrono>
Expand Down Expand Up @@ -79,7 +81,7 @@ class from_chars {
using value_type = typename type::rep;
return wrap(from_chars<value_type>{}(in));
} else if constexpr (is_ss_bool) {
return type(in == "true" || in == "TRUE" || in == "1");
return type(boost::iequals(in, "true") || in == "1");
} else if constexpr (is_arithmetic) {
return do_from_chars(in);
}
Expand Down
5 changes: 5 additions & 0 deletions src/v/pandaproxy/schema_registry/avro.cc
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,11 @@ result<void> sanitize_avro_type(
case avro::AVRO_FIXED:
case avro::AVRO_MAP:
std::sort(o.begin(), o.end(), member_sorter<object_type::complex>{});
for (auto& i : o) {
if (auto res = sanitize(i.value, ctx); !res.has_value()) {
return res;
}
}
break;
case avro::AVRO_RECORD: {
auto res = sanitize_record(o, ctx);
Expand Down
21 changes: 17 additions & 4 deletions src/v/pandaproxy/schema_registry/handlers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,14 @@ post_subject(server::request_t rq, server::reply_t rp) {
auto inc_del{
parse::query_param<std::optional<include_deleted>>(*rq.req, "deleted")
.value_or(include_deleted::no)};
vlog(plog.debug, "post_subject subject='{}', deleted='{}'", sub, inc_del);
BenPope marked this conversation as resolved.
Show resolved Hide resolved
auto norm{parse::query_param<std::optional<normalize>>(*rq.req, "normalize")
.value_or(normalize::no)};
vlog(
plog.debug,
"post_subject subject='{}', normalize='{}', deleted='{}'",
sub,
norm,
inc_del);
// We must sync
co_await rq.service().writer().read_sync();

Expand All @@ -419,7 +426,7 @@ post_subject(server::request_t rq, server::reply_t rp) {
auto unparsed = co_await ppj::rjson_parse(
std::move(rq.req), post_subject_versions_request_handler<>{sub});
schema = co_await rq.service().schema_store().make_canonical_schema(
std::move(unparsed.def));
std::move(unparsed.def), norm);
} catch (const exception& e) {
if (e.code() == error_code::schema_empty) {
throw as_exception(invalid_subject_schema(sub));
Expand All @@ -446,7 +453,13 @@ post_subject_versions(server::request_t rq, server::reply_t rp) {
parse_content_type_header(rq);
parse_accept_header(rq, rp);
auto sub = parse::request_param<subject>(*rq.req, "subject");
vlog(plog.debug, "post_subject_versions subject='{}'", sub);
auto norm{parse::query_param<std::optional<normalize>>(*rq.req, "normalize")
.value_or(normalize::no)};
vlog(
plog.debug,
"post_subject_versions subject='{}', normalize='{}'",
sub,
norm);

co_await rq.service().writer().read_sync();

Expand All @@ -455,7 +468,7 @@ post_subject_versions(server::request_t rq, server::reply_t rp) {

subject_schema schema{
co_await rq.service().schema_store().make_canonical_schema(
std::move(unparsed.def)),
std::move(unparsed.def), norm),
unparsed.version.value_or(invalid_schema_version),
unparsed.id.value_or(invalid_schema_id),
is_deleted::no};
Expand Down
42 changes: 38 additions & 4 deletions src/v/pandaproxy/schema_registry/json.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1469,6 +1469,32 @@ bool check_compatible_dialects(
return true;
}

void sort(json::Value& val) {
switch (val.GetType()) {
case rapidjson::Type::kFalseType:
case rapidjson::Type::kNullType:
case rapidjson::Type::kNumberType:
case rapidjson::Type::kStringType:
case rapidjson::Type::kTrueType:
break;
case rapidjson::Type::kArrayType: {
for (auto& v : val.GetArray()) {
sort(v);
}
break;
}
Comment on lines +1480 to +1485
Copy link
Contributor

@andijcr andijcr Jul 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some arrays should be sorted i think. looking at draft7

        "allOf": { "$ref": "#/definitions/schemaArray" },
        "anyOf": { "$ref": "#/definitions/schemaArray" },
        "oneOf": { "$ref": "#/definitions/schemaArray" },
        "required": { "$ref": "#/definitions/stringArray" },
        "enum": {
            "type": "array",
            "items": true,
            "minItems": 1,
            "uniqueItems": true
        },
        "type": {
            "anyOf": [
                { "$ref": "#/definitions/simpleTypes" },
                {
                    "type": "array",
                    "items": { "$ref": "#/definitions/simpleTypes" },
                    "minItems": 1,
                    "uniqueItems": true
                }
            ]
        },

(note that "items" should not be sorted)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This'll have to wait.

case rapidjson::Type::kObjectType: {
auto v = val.GetObject();
std::sort(v.begin(), v.end(), [](auto& lhs, auto& rhs) {
return std::string_view{
lhs.name.GetString(), lhs.name.GetStringLength()}
< std::string_view{
rhs.name.GetString(), rhs.name.GetStringLength()};
});
}
}
}

} // namespace

ss::future<json_schema_definition>
Expand All @@ -1483,16 +1509,24 @@ make_json_schema_definition(sharded_store&, canonical_schema schema) {
}

ss::future<canonical_schema> make_canonical_json_schema(
sharded_store& store, unparsed_schema unparsed_schema) {
// TODO BP: More validation and normalisation
parse_json(unparsed_schema.def().shared_raw()).value(); // throws on error
sharded_store& store, unparsed_schema unparsed_schema, normalize norm) {
auto [sub, unparsed] = std::move(unparsed_schema).destructure();
auto [def, type, refs] = std::move(unparsed).destructure();

auto doc = parse_json(std::move(def)).value(); // throws on error
if (norm) {
sort(doc);
std::sort(refs.begin(), refs.end());
refs.erase(std::unique(refs.begin(), refs.end()), refs.end());
}
json::chunked_buffer out;
json::Writer<json::chunked_buffer> w{out};
doc.Accept(w);

canonical_schema schema{
std::move(sub),
canonical_schema_definition{
canonical_schema_definition::raw_string{std::move(def)()},
canonical_schema_definition::raw_string{std::move(out).as_iobuf()},
type,
std::move(refs)}};

Expand Down
4 changes: 2 additions & 2 deletions src/v/pandaproxy/schema_registry/json.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ namespace pandaproxy::schema_registry {
ss::future<json_schema_definition>
make_json_schema_definition(sharded_store& store, canonical_schema schema);

ss::future<canonical_schema>
make_canonical_json_schema(sharded_store& store, unparsed_schema def);
ss::future<canonical_schema> make_canonical_json_schema(
sharded_store& store, unparsed_schema def, normalize norm = normalize::no);

bool check_compatible(
const json_schema_definition& reader, const json_schema_definition& writer);
Expand Down
10 changes: 7 additions & 3 deletions src/v/pandaproxy/schema_registry/sharded_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "pandaproxy/schema_registry/sharded_store.h"

#include "base/vlog.h"
#include "config/configuration.h"
#include "hashing/jump_consistent_hash.h"
#include "hashing/xx.h"
#include "pandaproxy/logger.h"
Expand Down Expand Up @@ -75,7 +76,7 @@ ss::future<> sharded_store::start(is_mutable mut, ss::smp_service_group sg) {
ss::future<> sharded_store::stop() { return _store.stop(); }

ss::future<canonical_schema>
sharded_store::make_canonical_schema(unparsed_schema schema) {
sharded_store::make_canonical_schema(unparsed_schema schema, normalize norm) {
switch (schema.type()) {
case schema_type::avro: {
auto [sub, unparsed] = std::move(schema).destructure();
Expand All @@ -87,7 +88,8 @@ sharded_store::make_canonical_schema(unparsed_schema schema) {
co_return co_await make_canonical_protobuf_schema(
*this, std::move(schema));
case schema_type::json:
co_return co_await make_canonical_json_schema(*this, std::move(schema));
co_return co_await make_canonical_json_schema(
*this, std::move(schema), norm);
}
__builtin_unreachable();
}
Expand Down Expand Up @@ -241,9 +243,11 @@ ss::future<bool> sharded_store::upsert(
schema_id id,
schema_version version,
is_deleted deleted) {
auto norm = normalize{
config::shard_local_cfg().schema_registry_normalize_on_startup()};
co_return co_await upsert(
marker,
co_await make_canonical_schema(std::move(schema)),
co_await make_canonical_schema(std::move(schema), norm),
BenPope marked this conversation as resolved.
Show resolved Hide resolved
id,
version,
deleted);
Expand Down
3 changes: 2 additions & 1 deletion src/v/pandaproxy/schema_registry/sharded_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ class sharded_store {
ss::future<> stop();

///\brief Make the canonical form of the schema
ss::future<canonical_schema> make_canonical_schema(unparsed_schema schema);
ss::future<canonical_schema> make_canonical_schema(
unparsed_schema schema, normalize norm = normalize::no);

///\brief Check the schema parses with the native format
ss::future<void> validate_schema(canonical_schema schema);
Expand Down
5 changes: 5 additions & 0 deletions src/v/pandaproxy/schema_registry/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ std::ostream& operator<<(std::ostream& os, const schema_reference& ref) {
return os;
}

bool operator<(const schema_reference& lhs, const schema_reference& rhs) {
return std::tie(lhs.name, lhs.sub, lhs.version)
< std::tie(rhs.name, rhs.sub, rhs.version);
}

std::ostream& operator<<(std::ostream& os, const unparsed_schema& ref) {
fmt::print(os, "subject: {}, {}", ref.sub(), ref.def());
return os;
Expand Down
4 changes: 4 additions & 0 deletions src/v/pandaproxy/schema_registry/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ using include_deleted = ss::bool_class<struct include_deleted_tag>;
using is_deleted = ss::bool_class<struct is_deleted_tag>;
using default_to_global = ss::bool_class<struct default_to_global_tag>;
using force = ss::bool_class<struct force_tag>;
using normalize = ss::bool_class<struct normalize_tag>;

template<typename E>
std::enable_if_t<std::is_enum_v<E>, std::optional<E>>
Expand Down Expand Up @@ -108,6 +109,9 @@ struct schema_reference {
friend std::ostream&
operator<<(std::ostream& os, const schema_reference& ref);

friend bool
operator<(const schema_reference& lhs, const schema_reference& rhs);

BenPope marked this conversation as resolved.
Show resolved Hide resolved
ss::sstring name;
subject sub{invalid_subject};
schema_version version{invalid_schema_version};
Expand Down
Loading