From daeb07e94624f027f11f960a269f098c88eb7702 Mon Sep 17 00:00:00 2001 From: Oren Leiman Date: Wed, 10 Apr 2024 23:32:13 -0700 Subject: [PATCH 1/8] sr/types: Introduce verbose flag and compatibility_result Signed-off-by: Oren Leiman (cherry picked from commit 3771501a369f98f0e9a67dd8e28ee7b7bd047dc5) --- src/v/pandaproxy/schema_registry/types.h | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/v/pandaproxy/schema_registry/types.h b/src/v/pandaproxy/schema_registry/types.h index 42426f29bdf7..24950e0a6310 100644 --- a/src/v/pandaproxy/schema_registry/types.h +++ b/src/v/pandaproxy/schema_registry/types.h @@ -36,6 +36,7 @@ using is_deleted = ss::bool_class; using default_to_global = ss::bool_class; using force = ss::bool_class; using normalize = ss::bool_class; +using verbose = ss::bool_class; template std::enable_if_t, std::optional> @@ -534,6 +535,11 @@ from_string_view(std::string_view sv) { .default_match(std::nullopt); } +struct compatibility_result { + bool is_compat; + std::vector messages; +}; + } // namespace pandaproxy::schema_registry template<> From ddc6d8e53969da155c5bf1929216c7bc0078aa49 Mon Sep 17 00:00:00 2001 From: Oren Leiman Date: Tue, 30 Apr 2024 16:59:48 -0700 Subject: [PATCH 2/8] sr: Introduce compatility module Signed-off-by: Oren Leiman (cherry picked from commit b27954caf6630fcae4b67b2fa3f2178069e88c81) Conflicts: src/v/pandaproxy/BUILD (doesn't exist) --- .../pandaproxy/schema_registry/CMakeLists.txt | 1 + .../schema_registry/compatibility.cc | 174 ++++++++++++++++++ .../schema_registry/compatibility.h | 174 ++++++++++++++++++ 3 files changed, 349 insertions(+) create mode 100644 src/v/pandaproxy/schema_registry/compatibility.cc create mode 100644 src/v/pandaproxy/schema_registry/compatibility.h diff --git a/src/v/pandaproxy/schema_registry/CMakeLists.txt b/src/v/pandaproxy/schema_registry/CMakeLists.txt index 631fab17db1d..3c6bf0580954 100644 --- a/src/v/pandaproxy/schema_registry/CMakeLists.txt +++ b/src/v/pandaproxy/schema_registry/CMakeLists.txt @@ -14,6 +14,7 @@ v_cc_library( SRCS api.cc configuration.cc + compatibility.cc handlers.cc error.cc service.cc diff --git a/src/v/pandaproxy/schema_registry/compatibility.cc b/src/v/pandaproxy/schema_registry/compatibility.cc new file mode 100644 index 000000000000..7b2c991e1681 --- /dev/null +++ b/src/v/pandaproxy/schema_registry/compatibility.cc @@ -0,0 +1,174 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#include "pandaproxy/schema_registry/compatibility.h" + +namespace pandaproxy::schema_registry { + +std::ostream& operator<<(std::ostream& os, const avro_incompatibility_type& t) { + switch (t) { + case avro_incompatibility_type::name_mismatch: + return os << "NAME_MISMATCH"; + case avro_incompatibility_type::fixed_size_mismatch: + return os << "FIXED_SIZE_MISMATCH"; + case avro_incompatibility_type::missing_enum_symbols: + return os << "MISSING_ENUM_SYMBOLS"; + case avro_incompatibility_type::reader_field_missing_default_value: + return os << "READER_FIELD_MISSING_DEFAULT_VALUE"; + case avro_incompatibility_type::type_mismatch: + return os << "TYPE_MISMATCH"; + case avro_incompatibility_type::missing_union_branch: + return os << "MISSING_UNION_BRANCH"; + case avro_incompatibility_type::unknown: + return os << "UNKNOWN"; + }; + __builtin_unreachable(); +} + +std::string_view description_for_type(avro_incompatibility_type t) { + switch (t) { + case avro_incompatibility_type::name_mismatch: + return "The name of the schema has changed (path '{path}')"; + case avro_incompatibility_type::fixed_size_mismatch: + return "The size of FIXED type field at path '{path}' in the " + "{{reader}} schema does not match with the {{writer}} schema"; + case avro_incompatibility_type::missing_enum_symbols: + return "The {{reader}} schema is missing enum symbols '{additional}' " + "at path '{path}' in the {{writer}} schema"; + case avro_incompatibility_type::reader_field_missing_default_value: + return "The field '{additional}' at path '{path}' in the {{reader}} " + "schema has " + "no default value and is missing in the {{writer}}"; + case avro_incompatibility_type::type_mismatch: + return "The type (path '{path}') of a field in the {{reader}} schema " + "does not match with the {{writer}} schema"; + case avro_incompatibility_type::missing_union_branch: + return "The {{reader}} schema is missing a type inside a union field " + "at path '{path}' in the {{writer}} schema"; + case avro_incompatibility_type::unknown: + return "{{reader}} schema is not compatible with {{writer}} schema: " + "check '{path}'"; + }; + __builtin_unreachable(); +} + +std::ostream& operator<<(std::ostream& os, const avro_incompatibility& v) { + fmt::print( + os, + "{{errorType:'{}', description:'{}', additionalInfo:'{}'}}", + v._type, + v.describe(), + v._additional_info); + return os; +} + +ss::sstring avro_incompatibility::describe() const { + return fmt::format( + fmt::runtime(description_for_type(_type)), + fmt::arg("path", _path.string()), + fmt::arg("additional", _additional_info)); +} + +std::ostream& +operator<<(std::ostream& os, const proto_incompatibility_type& t) { + switch (t) { + case proto_incompatibility_type::message_removed: + return os << "MESSAGE_REMOVED"; + case proto_incompatibility_type::field_kind_changed: + return os << "FIELD_KIND_CHANGED"; + case proto_incompatibility_type::field_scalar_kind_changed: + return os << "FIELD_SCALAR_KIND_CHANGED"; + case proto_incompatibility_type::field_named_type_changed: + return os << "FIELD_NAMED_TYPE_CHANGED"; + case proto_incompatibility_type::required_field_added: + return os << "REQUIRED_FIELD_ADDED"; + case proto_incompatibility_type::required_field_removed: + return os << "REQUIRED_FIELD_REMOVED"; + case proto_incompatibility_type::oneof_field_removed: + return os << "ONEOF_FIELD_REMOVED"; + case proto_incompatibility_type::multiple_fields_moved_to_oneof: + return os << "MULTIPLE_FIELDS_MOVED_TO_ONEOF"; + case proto_incompatibility_type::unknown: + return os << "UNKNOWN"; + } + __builtin_unreachable(); +} + +std::string_view description_for_type(proto_incompatibility_type t) { + switch (t) { + case proto_incompatibility_type::message_removed: + return "The {{reader}} schema is missing a field of type MESSAGE at " + "path '{path}' in the {{writer}} schema"; + case proto_incompatibility_type::field_kind_changed: + return "The type of a field at path '{path}' in the {{reader}} " + "schema does not match the {{writer}} schema"; + case proto_incompatibility_type::field_scalar_kind_changed: + return "The kind of a SCALAR field at path '{path}' in the {{reader}} " + "schema does not match its kind in the {{writer}} schema"; + case proto_incompatibility_type::field_named_type_changed: + return "The type of a MESSAGE field at path '{path}' in the {{reader}} " + "schema does not match its type in the {{writer}} schema "; + case proto_incompatibility_type::required_field_added: + return "A required field at path '{path}' in the {{reader}} schema " + "is missing in the {{writer}} schema"; + case proto_incompatibility_type::required_field_removed: + return "The {{reader}} schema is missing a required field at path: " + "'{path}' in the {{writer}} schema"; + case proto_incompatibility_type::oneof_field_removed: + return "The {{reader}} schema is missing a oneof field at path " + "'{path}' in the {{writer}} schema"; + case proto_incompatibility_type::multiple_fields_moved_to_oneof: + return "Multiple fields in the oneof at path '{path}' in the " + "{{reader}} schema are outside a oneof in the {{writer}} " + "schema "; + case proto_incompatibility_type::unknown: + return "{{reader}} schema is not compatible with {{writer}} schema: " + "check '{path}'"; + } + __builtin_unreachable(); +} + +std::ostream& operator<<(std::ostream& os, const proto_incompatibility& v) { + fmt::print( + os, "{{errorType:'{}', description:'{}'}}", v._type, v.describe()); + return os; +} + +ss::sstring proto_incompatibility::describe() const { + return fmt::format( + fmt::runtime(description_for_type(_type)), + fmt::arg("path", _path.string())); +} + +compatibility_result +raw_compatibility_result::operator()(verbose is_verbose) && { + compatibility_result result = {.is_compat = !has_error()}; + if (is_verbose) { + result.messages.reserve(_errors.size()); + std::transform( + std::make_move_iterator(_errors.begin()), + std::make_move_iterator(_errors.end()), + std::back_inserter(result.messages), + [](auto e) { + return std::visit( + [](auto&& e) { return fmt::format("{{{}}}", e); }, e); + }); + } + return result; +} + +void raw_compatibility_result::merge(raw_compatibility_result&& other) { + _errors.reserve(_errors.size() + other._errors.size()); + std::move( + other._errors.begin(), other._errors.end(), std::back_inserter(_errors)); +} + +} // namespace pandaproxy::schema_registry diff --git a/src/v/pandaproxy/schema_registry/compatibility.h b/src/v/pandaproxy/schema_registry/compatibility.h new file mode 100644 index 000000000000..ef991d7495a0 --- /dev/null +++ b/src/v/pandaproxy/schema_registry/compatibility.h @@ -0,0 +1,174 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#pragma once + +#include "base/vassert.h" +#include "pandaproxy/schema_registry/types.h" + +#include + +#include +#include +#include + +/** + * compatibility.h + * + * Support classes for tracking, accumulating, and emitting formatted error + * messages while checking compatibility of avro & protobuf schemas. + */ + +namespace pandaproxy::schema_registry { + +enum class avro_incompatibility_type { + name_mismatch = 0, + fixed_size_mismatch, + missing_enum_symbols, + reader_field_missing_default_value, + type_mismatch, + missing_union_branch, + unknown, +}; + +/** + * avro_incompatibility - A single incompatibility between Avro schemas. + * + * Encapsulates: + * - the path to the location of the incompatibility in the _writer_ schema + * - the type of incompatibility + * - any additional context for the incompatibility (e.g. a field name) + * + * Primary interface is `describe`, which combines the contained info into + * a format string which can then be interpolated with identifying info for + * the reader and writer schema in the request handler. + */ +class avro_incompatibility { +public: + using Type = avro_incompatibility_type; + avro_incompatibility( + std::filesystem::path path, Type type, std::string_view info) + : _path(std::move(path)) + , _type(type) + , _additional_info(info) {} + + avro_incompatibility(std::filesystem::path path, Type type) + : avro_incompatibility(std::move(path), type, "") {} + + ss::sstring describe() const; + +private: + friend std::ostream& + operator<<(std::ostream& os, const avro_incompatibility& v); + + friend bool + operator==(const avro_incompatibility&, const avro_incompatibility&) + = default; + + // Useful for unit testing + template + friend H AbslHashValue(H h, const avro_incompatibility& e) { + return H::combine( + std::move(h), e._path.string(), e._type, e._additional_info); + } + + std::filesystem::path _path; + Type _type; + ss::sstring _additional_info; +}; + +enum class proto_incompatibility_type { + message_removed = 0, + field_kind_changed, + field_scalar_kind_changed, + field_named_type_changed, + required_field_added, + required_field_removed, + oneof_field_removed, + multiple_fields_moved_to_oneof, + unknown, +}; + +/** + * proto_incompatibility - A single incompatibility between Protobuf schemas. + * + * Encapsulates: + * - the path to the location of the incompatibility in the _writer_ schema + * - the type of incompatibility + * + * Primary interface is `describe`, which combines the contained info into + * a format string which can then be interpolated with identifying info for + * the reader and writer schemas in the request handler. + */ +class proto_incompatibility { +public: + using Type = proto_incompatibility_type; + proto_incompatibility(std::filesystem::path path, Type type) + : _path(std::move(path)) + , _type(type) {} + + ss::sstring describe() const; + Type type() const { return _type; } + +private: + friend std::ostream& + operator<<(std::ostream& os, const proto_incompatibility& v); + + friend bool + operator==(const proto_incompatibility&, const proto_incompatibility&) + = default; + + // Helpful for unit testing + template + friend H AbslHashValue(H h, const proto_incompatibility& e) { + return H::combine(std::move(h), e._path.string(), e._type); + } + + std::filesystem::path _path; + Type _type; +}; + +/** + * raw_compatibility_result - A collection of unformatted proto or avro + * incompatibilities. Its purpose is twofold: + * - Provide an abstracted way to accumulate incompatibilities across + * a recursive call chain. The `merge` function makes this simple + * and seeks to avoid excessive copying. + * - Provide a (type-constrained) generic means to process raw + * incompatibilities into formatted error messages. + */ +class raw_compatibility_result { + using schema_incompatibility + = std::variant; + +public: + raw_compatibility_result() = default; + + template + requires std::constructible_from + && std::convertible_to + auto emplace(Args&&... args) { + return _errors.emplace_back( + std::in_place_type, std::forward(args)...); + } + + compatibility_result operator()(verbose is_verbose) &&; + + // Move the contents of other into the errors vec of this + void merge(raw_compatibility_result&& other); + + bool has_error() const { return !_errors.empty(); } + +private: + std::vector _errors{}; +}; + +} // namespace pandaproxy::schema_registry From e06364bf5d172ac63a96d5291994bae3dca328f8 Mon Sep 17 00:00:00 2001 From: Oren Leiman Date: Wed, 10 Apr 2024 23:34:36 -0700 Subject: [PATCH 3/8] sr/sharded_store: Wire up compatibility_result Signed-off-by: Oren Leiman (cherry picked from commit 25fcd5ef0c0807dcbb3b1ebc6043d7636358701c) --- src/v/pandaproxy/schema_registry/avro.cc | 11 ++- src/v/pandaproxy/schema_registry/avro.h | 6 +- src/v/pandaproxy/schema_registry/json.cc | 25 +++-- src/v/pandaproxy/schema_registry/json.h | 6 +- src/v/pandaproxy/schema_registry/protobuf.cc | 8 +- src/v/pandaproxy/schema_registry/protobuf.h | 5 +- .../schema_registry/sharded_store.cc | 94 ++++++++++++++++--- .../schema_registry/sharded_store.h | 12 +++ .../test/compatibility_avro.cc | 13 +-- .../schema_registry/test/test_json_schema.cc | 12 ++- 10 files changed, 147 insertions(+), 45 deletions(-) diff --git a/src/v/pandaproxy/schema_registry/avro.cc b/src/v/pandaproxy/schema_registry/avro.cc index 86658b44c54c..1c700d713b0c 100644 --- a/src/v/pandaproxy/schema_registry/avro.cc +++ b/src/v/pandaproxy/schema_registry/avro.cc @@ -17,6 +17,7 @@ #include "json/document.h" #include "json/json.h" #include "json/types.h" +#include "pandaproxy/schema_registry/compatibility.h" #include "pandaproxy/schema_registry/error.h" #include "pandaproxy/schema_registry/errors.h" #include "pandaproxy/schema_registry/sharded_store.h" @@ -550,9 +551,13 @@ sanitize_avro_schema_definition(unparsed_schema_definition def) { def.refs()}; } -bool check_compatible( - const avro_schema_definition& reader, const avro_schema_definition& writer) { - return check_compatible(*reader().root(), *writer().root()); +compatibility_result check_compatible( + const avro_schema_definition& reader, + const avro_schema_definition& writer, + verbose is_verbose [[maybe_unused]]) { + // TODO(gellert.nagy): start using the is_verbose flag in a follow up PR + return compatibility_result{ + .is_compat = check_compatible(*reader().root(), *writer().root())}; } } // namespace pandaproxy::schema_registry diff --git a/src/v/pandaproxy/schema_registry/avro.h b/src/v/pandaproxy/schema_registry/avro.h index 80c38d1a47a9..9d13a67c490e 100644 --- a/src/v/pandaproxy/schema_registry/avro.h +++ b/src/v/pandaproxy/schema_registry/avro.h @@ -23,7 +23,9 @@ make_avro_schema_definition(sharded_store& store, canonical_schema schema); result sanitize_avro_schema_definition(unparsed_schema_definition def); -bool check_compatible( - const avro_schema_definition& reader, const avro_schema_definition& writer); +compatibility_result check_compatible( + const avro_schema_definition& reader, + const avro_schema_definition& writer, + verbose is_verbose = verbose::no); } // namespace pandaproxy::schema_registry diff --git a/src/v/pandaproxy/schema_registry/json.cc b/src/v/pandaproxy/schema_registry/json.cc index 1b14f0159c3a..e1b69f3af58d 100644 --- a/src/v/pandaproxy/schema_registry/json.cc +++ b/src/v/pandaproxy/schema_registry/json.cc @@ -1621,15 +1621,22 @@ ss::future make_canonical_json_schema( co_return schema; } -bool check_compatible( - const json_schema_definition& reader, const json_schema_definition& writer) { - // schemas might be using incompatible dialects - if (!check_compatible_dialects(reader().doc, writer().doc)) { - return false; - } - // reader is a superset of writer iff every schema that is valid for writer - // is also valid for reader - return is_superset(reader().doc, writer().doc); +compatibility_result check_compatible( + const json_schema_definition& reader, + const json_schema_definition& writer, + verbose is_verbose [[maybe_unused]]) { + auto is_compatible = [&]() { + // schemas might be using incompatible dialects + if (!check_compatible_dialects(reader().doc, writer().doc)) { + return false; + } + // reader is a superset of writer iff every schema that is valid for + // writer is also valid for reader + return is_superset(reader().doc, writer().doc); + }(); + + // TODO(gellert.nagy): start using the is_verbose flag in a follow up PR + return compatibility_result{.is_compat = is_compatible}; } } // namespace pandaproxy::schema_registry diff --git a/src/v/pandaproxy/schema_registry/json.h b/src/v/pandaproxy/schema_registry/json.h index fa96a7ea9020..b288425209cf 100644 --- a/src/v/pandaproxy/schema_registry/json.h +++ b/src/v/pandaproxy/schema_registry/json.h @@ -22,7 +22,9 @@ make_json_schema_definition(sharded_store& store, canonical_schema schema); ss::future make_canonical_json_schema( sharded_store& store, unparsed_schema def, normalize norm = normalize::no); -bool check_compatible( - const json_schema_definition& reader, const json_schema_definition& writer); +compatibility_result check_compatible( + const json_schema_definition& reader, + const json_schema_definition& writer, + verbose is_verbose = verbose::no); } // namespace pandaproxy::schema_registry diff --git a/src/v/pandaproxy/schema_registry/protobuf.cc b/src/v/pandaproxy/schema_registry/protobuf.cc index 1446d8503187..935985d444a0 100644 --- a/src/v/pandaproxy/schema_registry/protobuf.cc +++ b/src/v/pandaproxy/schema_registry/protobuf.cc @@ -624,11 +624,13 @@ struct compatibility_checker { } // namespace -bool check_compatible( +compatibility_result check_compatible( const protobuf_schema_definition& reader, - const protobuf_schema_definition& writer) { + const protobuf_schema_definition& writer, + verbose is_verbose [[maybe_unused]]) { compatibility_checker checker{reader(), writer()}; - return checker.check_compatible(); + // TODO(gellert.nagy): start using the is_verbose flag in a follow up PR + return compatibility_result{.is_compat = checker.check_compatible()}; } } // namespace pandaproxy::schema_registry diff --git a/src/v/pandaproxy/schema_registry/protobuf.h b/src/v/pandaproxy/schema_registry/protobuf.h index edbc6aaf65c3..58c325064a47 100644 --- a/src/v/pandaproxy/schema_registry/protobuf.h +++ b/src/v/pandaproxy/schema_registry/protobuf.h @@ -25,8 +25,9 @@ validate_protobuf_schema(sharded_store& store, canonical_schema schema); ss::future make_canonical_protobuf_schema(sharded_store& store, unparsed_schema schema); -bool check_compatible( +compatibility_result check_compatible( const protobuf_schema_definition& reader, - const protobuf_schema_definition& writer); + const protobuf_schema_definition& writer, + verbose is_verbose = verbose::no); } // namespace pandaproxy::schema_registry diff --git a/src/v/pandaproxy/schema_registry/sharded_store.cc b/src/v/pandaproxy/schema_registry/sharded_store.cc index 4d24aafa29f5..1acaae781afb 100644 --- a/src/v/pandaproxy/schema_registry/sharded_store.cc +++ b/src/v/pandaproxy/schema_registry/sharded_store.cc @@ -24,6 +24,7 @@ #include "pandaproxy/schema_registry/protobuf.h" #include "pandaproxy/schema_registry/store.h" #include "pandaproxy/schema_registry/types.h" +#include "pandaproxy/schema_registry/util.h" #include #include @@ -49,13 +50,14 @@ ss::shard_id shard_for(schema_id id) { return jump_consistent_hash(id(), ss::smp::count); } -bool check_compatible(const valid_schema& reader, const valid_schema& writer) { - return reader.visit([&](const auto& reader) { - return writer.visit([&](const auto& writer) { +compatibility_result check_compatible( + const valid_schema& reader, const valid_schema& writer, verbose is_verbose) { + return reader.visit([&](const auto& reader) -> compatibility_result { + return writer.visit([&](const auto& writer) -> compatibility_result { if constexpr (std::is_same_v) { - return check_compatible(reader, writer); + return check_compatible(reader, writer, is_verbose); } - return false; + return {.is_compat = false}; }); }); } @@ -682,6 +684,18 @@ ss::future<> sharded_store::maybe_update_max_schema_id(schema_id id) { ss::future sharded_store::is_compatible( schema_version version, canonical_schema new_schema) { + auto rslt = co_await do_is_compatible( + version, std::move(new_schema), verbose::no); + co_return rslt.is_compat; +} + +ss::future sharded_store::is_compatible( + schema_version version, canonical_schema new_schema, verbose is_verbose) { + return do_is_compatible(version, std::move(new_schema), is_verbose); +} + +ss::future sharded_store::do_is_compatible( + schema_version version, canonical_schema new_schema, verbose is_verbose) { // Lookup the version_ids const auto sub = new_schema.sub(); const auto versions = co_await _store.invoke_on( @@ -707,16 +721,22 @@ ss::future sharded_store::is_compatible( auto old_schema = co_await get_subject_schema( sub, version, include_deleted::no); + // Lookup the compatibility level + auto compat = co_await get_compatibility(sub, default_to_global::yes); + // Types must always match if (old_schema.schema.type() != new_schema.type()) { - co_return false; + compatibility_result result{.is_compat = false}; + if (is_verbose) { + result.messages = { + "Incompatible because of different schema type", + fmt::format("{{compatibility: {}}}", compat)}; + } + co_return result; } - // Lookup the compatibility level - auto compat = co_await get_compatibility(sub, default_to_global::yes); - if (compat == compatibility_level::none) { - co_return true; + co_return compatibility_result{.is_compat = true}; } // Currently support JSON, PROTOBUF, AVRO @@ -742,8 +762,18 @@ ss::future sharded_store::is_compatible( auto new_valid = co_await make_valid_schema(std::move(new_schema)); - auto is_compat = true; - for (; is_compat && ver_it != versions.end(); ++ver_it) { + compatibility_result result{.is_compat = true}; + + auto formatter = [](std::string_view rdr, std::string_view wrtr) { + return [rdr, wrtr](std::string_view msg) { + return fmt::format( + fmt::runtime(msg), + fmt::arg("reader", rdr), + fmt::arg("writer", wrtr)); + }; + }; + + for (; result.is_compat && ver_it != versions.end(); ++ver_it) { if (ver_it->deleted) { continue; } @@ -753,22 +783,56 @@ ss::future sharded_store::is_compatible( auto old_valid = co_await make_valid_schema( std::move(old_schema.schema)); + std::vector version_messages; + if ( compat == compatibility_level::backward || compat == compatibility_level::backward_transitive || compat == compatibility_level::full || compat == compatibility_level::full_transitive) { - is_compat = is_compat && check_compatible(new_valid, old_valid); + auto r = check_compatible(new_valid, old_valid, is_verbose); + result.is_compat = result.is_compat && r.is_compat; + version_messages.reserve( + version_messages.size() + r.messages.size()); + std::transform( + std::make_move_iterator(r.messages.begin()), + std::make_move_iterator(r.messages.end()), + std::back_inserter(version_messages), + formatter("new", "old")); } if ( compat == compatibility_level::forward || compat == compatibility_level::forward_transitive || compat == compatibility_level::full || compat == compatibility_level::full_transitive) { - is_compat = is_compat && check_compatible(old_valid, new_valid); + auto r = check_compatible(old_valid, new_valid, is_verbose); + result.is_compat = result.is_compat && r.is_compat; + version_messages.reserve( + version_messages.size() + r.messages.size()); + std::transform( + std::make_move_iterator(r.messages.begin()), + std::make_move_iterator(r.messages.end()), + std::back_inserter(version_messages), + formatter("old", "new")); + } + + if (is_verbose && !result.is_compat) { + version_messages.emplace_back( + fmt::format("{{oldSchemaVersion: {}}}", old_schema.version)); + version_messages.emplace_back( + fmt::format("{{oldSchema: '{}'}}", to_string(old_valid.raw()))); + version_messages.emplace_back( + fmt::format("{{compatibility: {}}}", compat)); } + + result.messages.reserve( + result.messages.size() + version_messages.size()); + std::move( + version_messages.begin(), + version_messages.end(), + std::back_inserter(result.messages)); } - co_return is_compat; + co_return result; } void sharded_store::check_mode_mutability(force f) const { diff --git a/src/v/pandaproxy/schema_registry/sharded_store.h b/src/v/pandaproxy/schema_registry/sharded_store.h index bf9fc976f517..4ce9e5611d9a 100644 --- a/src/v/pandaproxy/schema_registry/sharded_store.h +++ b/src/v/pandaproxy/schema_registry/sharded_store.h @@ -178,12 +178,24 @@ class sharded_store { ss::future is_compatible(schema_version version, canonical_schema new_schema); + ///\brief Check if the provided schema is compatible with the subject and + /// version, according the the current compatibility, with the result + /// optionally accompanied by a vector of detailed error messages. + /// + /// If the compatibility level is transitive, then all versions are checked, + /// otherwise checks are against the version provided and newer. + ss::future is_compatible( + schema_version version, canonical_schema new_schema, verbose is_verbose); + ss::future has_version(const subject&, schema_id, include_deleted); //// \brief Throw if the store is not mutable void check_mode_mutability(force f) const; private: + ss::future do_is_compatible( + schema_version version, canonical_schema new_schema, verbose is_verbose); + ss::future upsert_schema(schema_id id, canonical_schema_definition def); ss::future<> delete_schema(schema_id id); diff --git a/src/v/pandaproxy/schema_registry/test/compatibility_avro.cc b/src/v/pandaproxy/schema_registry/test/compatibility_avro.cc index 78e2dc4e6ee6..596143c3f38c 100644 --- a/src/v/pandaproxy/schema_registry/test/compatibility_avro.cc +++ b/src/v/pandaproxy/schema_registry/test/compatibility_avro.cc @@ -23,12 +23,13 @@ bool check_compatible( const pps::canonical_schema_definition& w) { pps::sharded_store s; return check_compatible( - pps::make_avro_schema_definition( - s, {pps::subject("r"), {r.shared_raw(), pps::schema_type::avro}}) - .get(), - pps::make_avro_schema_definition( - s, {pps::subject("w"), {w.shared_raw(), pps::schema_type::avro}}) - .get()); + pps::make_avro_schema_definition( + s, {pps::subject("r"), {r.shared_raw(), pps::schema_type::avro}}) + .get(), + pps::make_avro_schema_definition( + s, {pps::subject("w"), {w.shared_raw(), pps::schema_type::avro}}) + .get()) + .is_compat; } SEASTAR_THREAD_TEST_CASE(test_avro_type_promotion) { diff --git a/src/v/pandaproxy/schema_registry/test/test_json_schema.cc b/src/v/pandaproxy/schema_registry/test/test_json_schema.cc index 619fcfd0c33c..463d5b8b547e 100644 --- a/src/v/pandaproxy/schema_registry/test/test_json_schema.cc +++ b/src/v/pandaproxy/schema_registry/test/test_json_schema.cc @@ -24,6 +24,12 @@ namespace pp = pandaproxy; namespace pps = pp::schema_registry; +bool check_compatible( + const pps::json_schema_definition& reader_schema, + const pps::json_schema_definition& writer_schema) { + return pps::check_compatible(reader_schema, writer_schema).is_compat; +} + struct store_fixture { store_fixture() { store.start(pps::is_mutable::yes, ss::default_smp_service_group()) @@ -1034,14 +1040,14 @@ SEASTAR_THREAD_TEST_CASE(test_compatibility_check) { try { // sanity check that each schema is compatible with itself BOOST_CHECK_MESSAGE( - pps::check_compatible( + ::check_compatible( make_json_schema(data.reader_schema), make_json_schema(data.reader_schema)), fmt::format( "reader '{}' should be compatible with itself", data.reader_schema)); BOOST_CHECK_MESSAGE( - pps::check_compatible( + ::check_compatible( make_json_schema(data.writer_schema), make_json_schema(data.writer_schema)), fmt::format( @@ -1051,7 +1057,7 @@ SEASTAR_THREAD_TEST_CASE(test_compatibility_check) { // check compatibility (or not) reader->writer BOOST_CHECK_EQUAL( data.reader_is_compatible_with_writer, - pps::check_compatible( + ::check_compatible( make_json_schema(data.reader_schema), make_json_schema(data.writer_schema))); } catch (...) { From 63454137838606ca2bd4be6c95d3fcdee5866f95 Mon Sep 17 00:00:00 2001 From: Oren Leiman Date: Wed, 10 Apr 2024 23:31:34 -0700 Subject: [PATCH 4/8] sr/swagger: Add messages and verbose flag to compatibility API spec Signed-off-by: Oren Leiman (cherry picked from commit d8bcac608a37c396298957cef557e2716aa4efa1) --- src/v/pandaproxy/api/api-doc/schema_registry.json | 6 ++++++ .../api/api-doc/schema_registry_definitions.def.json | 8 +++++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/src/v/pandaproxy/api/api-doc/schema_registry.json b/src/v/pandaproxy/api/api-doc/schema_registry.json index 4c56dda37e88..f76f5b6df8c2 100644 --- a/src/v/pandaproxy/api/api-doc/schema_registry.json +++ b/src/v/pandaproxy/api/api-doc/schema_registry.json @@ -1148,6 +1148,12 @@ "schema": { "$ref": "#/definitions/schema_def" } + }, + { + "name": "verbose", + "in": "query", + "required": false, + "type": "boolean" } ], "produces": [ diff --git a/src/v/pandaproxy/api/api-doc/schema_registry_definitions.def.json b/src/v/pandaproxy/api/api-doc/schema_registry_definitions.def.json index cfeecafcc149..58f8937f9210 100644 --- a/src/v/pandaproxy/api/api-doc/schema_registry_definitions.def.json +++ b/src/v/pandaproxy/api/api-doc/schema_registry_definitions.def.json @@ -108,6 +108,12 @@ "properties": { "is_compatible": { "type": "boolean" - } + }, + "messages": { + "type": "array", + "items": { + "type": "string" + } + }, } } From af05474a1a8137d9d09996e97b29bd295fcb1929 Mon Sep 17 00:00:00 2001 From: Oren Leiman Date: Wed, 10 Apr 2024 23:35:22 -0700 Subject: [PATCH 5/8] sr/handlers: Wire up compatibility_result Signed-off-by: Oren Leiman (cherry picked from commit 992a8e7abd1667924f86fbe3b56ccf9afa97e0a9) --- src/v/pandaproxy/schema_registry/handlers.cc | 39 ++++++++++++++++--- .../schema_registry/requests/compatibility.h | 9 +++++ 2 files changed, 43 insertions(+), 5 deletions(-) diff --git a/src/v/pandaproxy/schema_registry/handlers.cc b/src/v/pandaproxy/schema_registry/handlers.cc index 5366ee3d02ff..2ddd84365338 100644 --- a/src/v/pandaproxy/schema_registry/handlers.cc +++ b/src/v/pandaproxy/schema_registry/handlers.cc @@ -637,6 +637,9 @@ compatibility_subject_version(server::request_t rq, server::reply_t rp) { parse_accept_header(rq, rp); auto ver = parse::request_param(*rq.req, "version"); auto sub = parse::request_param(*rq.req, "subject"); + auto is_verbose{ + parse::query_param>(*rq.req, "verbose") + .value_or(verbose::no)}; auto unparsed = co_await ppj::rjson_parse( std::move(rq.req), post_subject_versions_request_handler<>{sub}); @@ -660,17 +663,43 @@ compatibility_subject_version(server::request_t rq, server::reply_t rp) { version = parse_numerical_schema_version(ver).value(); } - auto schema = co_await rq.service().schema_store().make_canonical_schema( - std::move(unparsed.def)); + canonical_schema schema; + try { + schema = co_await rq.service().schema_store().make_canonical_schema( + std::move(unparsed.def)); + } catch (exception& e) { + constexpr auto reportable = [](std::error_code ec) { + constexpr std::array errors{ + error_code::schema_invalid, error_code::schema_empty}; + return absl::c_any_of( + errors, [ec](error_code e) { return ec == e; }); + }; + if (is_verbose && reportable(e.code())) { + rp.rep->write_body( + "json", + json::rjson_serialize(post_compatibility_res{ + .is_compat = false, + .messages = {e.message()}, + .is_verbose = is_verbose, + })); + co_return rp; + } + throw; + } + auto get_res = co_await get_or_load( - rq, [&rq, schema{std::move(schema)}, version]() { + rq, [&rq, schema{std::move(schema)}, version, is_verbose]() { return rq.service().schema_store().is_compatible( - version, schema.share()); + version, schema.share(), is_verbose); }); rp.rep->write_body( "json", - ppj::rjson_serialize(post_compatibility_res{.is_compat = get_res})); + json::rjson_serialize(post_compatibility_res{ + .is_compat = get_res.is_compat, + .messages = std::move(get_res.messages), + .is_verbose = is_verbose, + })); co_return rp; } diff --git a/src/v/pandaproxy/schema_registry/requests/compatibility.h b/src/v/pandaproxy/schema_registry/requests/compatibility.h index a39608b689ee..8e81ded6b662 100644 --- a/src/v/pandaproxy/schema_registry/requests/compatibility.h +++ b/src/v/pandaproxy/schema_registry/requests/compatibility.h @@ -18,6 +18,11 @@ namespace pandaproxy::schema_registry { struct post_compatibility_res { bool is_compat{false}; + std::vector messages; + + // `is_verbose` is not rendered into the response but `messages` are + // conditionally rendered based on `is_verbose` + verbose is_verbose; }; template @@ -27,6 +32,10 @@ void rjson_serialize( w.StartObject(); w.Key("is_compatible"); ::json::rjson_serialize(w, res.is_compat); + if (res.is_verbose) { + w.Key("messages"); + ::json::rjson_serialize(w, res.messages); + } w.EndObject(); } From a2844da6e0c7f1b77472fa358dcea058e5f8d2f0 Mon Sep 17 00:00:00 2001 From: Oren Leiman Date: Tue, 30 Apr 2024 21:15:44 -0700 Subject: [PATCH 6/8] rpk: check_compatibility --verbose-compat Signed-off-by: Oren Leiman (cherry picked from commit 2825e146adc42045cf54273ef4c447d81809ff16) --- .../rpk/pkg/cli/registry/schema/check_compatibility.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/go/rpk/pkg/cli/registry/schema/check_compatibility.go b/src/go/rpk/pkg/cli/registry/schema/check_compatibility.go index 9d4ec6516e28..820d79be0882 100644 --- a/src/go/rpk/pkg/cli/registry/schema/check_compatibility.go +++ b/src/go/rpk/pkg/cli/registry/schema/check_compatibility.go @@ -23,7 +23,8 @@ import ( ) type compatCheckResponse struct { - Compatible bool `json:"compatible" yaml:"compatible"` + Compatible bool `json:"compatible" yaml:"compatible"` + Messages []string `json:"messages,omitempty" yaml:"messages,omitempty"` } func newCheckCompatibilityCommand(fs afero.Fs, p *config.Params) *cobra.Command { @@ -66,9 +67,10 @@ func newCheckCompatibilityCommand(fs afero.Fs, p *config.Params) *cobra.Command Type: t, References: references, } - compatible, err := cl.CheckCompatibility(cmd.Context(), subject, version, schema) + ctx := sr.WithParams(cmd.Context(), sr.Verbose) + compatible, err := cl.CheckCompatibility(ctx, subject, version, schema) out.MaybeDie(err, "unable to check compatibility: %v", err) - if isText, _, s, err := f.Format(compatCheckResponse{compatible.Is}); !isText { + if isText, _, s, err := f.Format(compatCheckResponse{compatible.Is, compatible.Messages}); !isText { out.MaybeDie(err, "unable to print in the required format %q: %v", f.Kind, err) out.Exit(s) } @@ -76,6 +78,8 @@ func newCheckCompatibilityCommand(fs afero.Fs, p *config.Params) *cobra.Command fmt.Println("Schema is compatible.") } else { fmt.Println("Schema is not compatible.") + messages := strings.Join(compatible.Messages, "\n") + fmt.Println(messages) } }, } From 975ee8c8e3015031a871c4eb70937568bca83427 Mon Sep 17 00:00:00 2001 From: Oren Leiman Date: Wed, 10 Apr 2024 23:35:34 -0700 Subject: [PATCH 7/8] dt/sr_test: Test for post_compat messages Signed-off-by: Oren Leiman (cherry picked from commit 1e31632169822c381d2276e4210e22af0262b1b4) --- tests/rptest/tests/schema_registry_test.py | 169 ++++++++++++++++++++- 1 file changed, 167 insertions(+), 2 deletions(-) diff --git a/tests/rptest/tests/schema_registry_test.py b/tests/rptest/tests/schema_registry_test.py index 09c2a5bf8bfb..a8fe6ff42dcb 100644 --- a/tests/rptest/tests/schema_registry_test.py +++ b/tests/rptest/tests/schema_registry_test.py @@ -13,6 +13,7 @@ from typing import Literal, NamedTuple, Optional import uuid import re +import urllib.parse import requests import time import random @@ -111,6 +112,95 @@ def get_subject_name(sns: str, topic: str, field: MessageField, json_number_schema_def = '{"type":"number"}' +validation_schemas = dict( + proto3=""" +syntax = "proto3"; + +message myrecord { + message Msg1 { + int32 f1 = 1; + } + Msg1 m1 = 1; + Msg1 m2 = 2; +} +""", + proto3_incompat=""" +syntax = "proto3"; + +message myrecord { + // MESSAGE_REMOVED + message Msg1d { + int32 f1 = 1; + } + // FIELD_NAMED_TYPE_CHANGED + Msg1d m1 = 1; +} +""", + proto2=""" +syntax = "proto2"; + +message myrecord { + message Msg1 { + required int32 f1 = 1; + } + required Msg1 m1 = 1; + required Msg1 m2 = 2; +} +""", + proto2_incompat=""" +syntax = "proto2"; + +message myrecord { + // MESSAGE_REMOVED + message Msg1d { + required int32 f1 = 1; + } + // FIELD_NAMED_TYPE_CHANGED + required Msg1d m1 = 1; +} +""", + avro=""" +{ + "type": "record", + "name": "myrecord", + "fields": [ + { + "name": "f1", + "type": "string" + }, + { + "name": "enumF", + "type": { + "name": "ABorC", + "type": "enum", + "symbols": ["a", "b", "c"] + } + } + ] +} +""", + avro_incompat=""" +{ + "type": "record", + "name": "myrecord", + "fields": [ + { + "name": "f1", + "type": "int" + }, + { + "name": "enumF", + "type": { + "name": "ABorC", + "type": "enum", + "symbols": ["a"] + } + } + ] +} +""", +) + log_config = LoggingConfig('info', logger_levels={ 'security': 'trace', @@ -681,10 +771,16 @@ def _post_compatibility_subject_version(self, version, data, headers=HTTP_POST_HEADERS, + verbose: bool | None = None, **kwargs): + params = {} + if verbose is not None: + params['verbose'] = verbose + return self._request( "POST", f"compatibility/subjects/{subject}/versions/{version}", + params=params, headers=headers, data=data, **kwargs) @@ -1314,6 +1410,7 @@ def test_post_compatibility_subject_version(self, subject=f"{topic}-key", version=1, data=schema_2_data) assert result_raw.status_code == requests.codes.ok assert result_raw.json()["is_compatible"] == True + assert result_raw.json().get("messages", None) == None self.logger.debug("Set subject config - BACKWARD") result_raw = self._set_config_subject( @@ -1326,14 +1423,31 @@ def test_post_compatibility_subject_version(self, subject=f"{topic}-key", version=1, data=schema_2_data) assert result_raw.status_code == requests.codes.ok assert result_raw.json()["is_compatible"] == True + assert result_raw.json().get("messages", None) == None + + self.logger.debug("Check compatibility backward, no default, verbose") + result_raw = self._post_compatibility_subject_version( + subject=f"{topic}-key", + version=1, + data=schema_3_data, + verbose=True) + assert result_raw.status_code == requests.codes.ok + assert result_raw.json()["is_compatible"] == False - self.logger.debug("Check compatibility backward, no default") + self.logger.debug( + "Check compatibility backward, no default, not verbose") result_raw = self._post_compatibility_subject_version( - subject=f"{topic}-key", version=1, data=schema_3_data) + subject=f"{topic}-key", + version=1, + data=schema_3_data, + verbose=False) assert result_raw.status_code == requests.codes.ok assert result_raw.json()["is_compatible"] == False + assert result_raw.json().get("messages", None) == None, \ + f"Expected no messages, got {result_raw.json()['messages']}" self.logger.debug("Posting incompatible schema 3 as a subject key") + result_raw = self._post_subjects_subject_versions( subject=f"{topic}-key", data=schema_3_data) assert result_raw.status_code == requests.codes.conflict @@ -1374,6 +1488,57 @@ def test_post_compatibility_subject_version(self, assert result_raw.status_code == requests.codes.ok assert result_raw.json()["id"] == v1_id + @cluster(num_nodes=3) + @parametrize(schemas=("avro", "avro_incompat", "AVRO")) + @parametrize(schemas=("proto3", "proto3_incompat", "PROTOBUF")) + @parametrize(schemas=("proto2", "proto2_incompat", "PROTOBUF")) + def test_compatibility_messages(self, schemas): + """ + Verify compatibility messages + """ + + topic = create_topic_names(1)[0] + + self.logger.debug(f"Register a schema against a subject") + schema_data = json.dumps({ + "schema": validation_schemas[schemas[0]], + "schemaType": schemas[2], + }) + incompatible_data = json.dumps({ + "schema": validation_schemas[schemas[1]], + "schemaType": schemas[2], + }) + + super_username, super_password, _ = self.redpanda.SUPERUSER_CREDENTIALS + + self.logger.debug("Posting schema as a subject key") + result_raw = self._post_subjects_subject_versions( + subject=f"{topic}-key", data=schema_data) + self.logger.debug(result_raw) + assert result_raw.status_code == requests.codes.ok + v1_id = result_raw.json()["id"] + + self.logger.debug("Set subject config - BACKWARD") + result_raw = self._set_config_subject( + subject=f"{topic}-key", + data=json.dumps({"compatibility": "BACKWARD"})) + assert result_raw.status_code == requests.codes.ok + + self.logger.debug("Check compatibility full") + result_raw = self._post_compatibility_subject_version( + subject=f"{topic}-key", + version=1, + data=incompatible_data, + verbose=True) + + assert result_raw.status_code == requests.codes.ok + assert result_raw.json()["is_compatible"] == False + msgs = result_raw.json()["messages"] + for message in ["oldSchemaVersion", "oldSchema", "compatibility"]: + assert any( + message in m for m in msgs + ), f"Expected to find an instance of '{message}', got {msgs}" + @cluster(num_nodes=3) def test_delete_subject(self): """ From 12db02d0c023c440936b4030e73fe361fefa1f33 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gell=C3=A9rt=20Peresztegi-Nagy?= Date: Fri, 16 Aug 2024 15:13:04 +0100 Subject: [PATCH 8/8] sr: search backwards for transitive compat checks This matches the reference implementation and provides a more useful output because the users likely prefer comparing the new schema against the most recent incompatible schema. (cherry picked from commit dcf281c964911a34058215505946ee32d4c98305) --- .../schema_registry/sharded_store.cc | 12 +++-- tests/rptest/tests/schema_registry_test.py | 49 +++++++++++++++++++ 2 files changed, 57 insertions(+), 4 deletions(-) diff --git a/src/v/pandaproxy/schema_registry/sharded_store.cc b/src/v/pandaproxy/schema_registry/sharded_store.cc index 1acaae781afb..b882f9e0039c 100644 --- a/src/v/pandaproxy/schema_registry/sharded_store.cc +++ b/src/v/pandaproxy/schema_registry/sharded_store.cc @@ -752,7 +752,8 @@ ss::future sharded_store::do_is_compatible( throw as_exception(invalid_schema_type(new_schema.type())); } - // if transitive, search all, otherwise seach forwards from version + // search backwards + // if transitive, search all, seach until version if ( compat == compatibility_level::backward_transitive || compat == compatibility_level::forward_transitive @@ -760,6 +761,9 @@ ss::future sharded_store::do_is_compatible( ver_it = versions.begin(); } + auto it = std::reverse_iterator(versions.end()); + auto it_end = std::reverse_iterator(ver_it); + auto new_valid = co_await make_valid_schema(std::move(new_schema)); compatibility_result result{.is_compat = true}; @@ -773,13 +777,13 @@ ss::future sharded_store::do_is_compatible( }; }; - for (; result.is_compat && ver_it != versions.end(); ++ver_it) { - if (ver_it->deleted) { + for (; result.is_compat && it != it_end; ++it) { + if (it->deleted) { continue; } auto old_schema = co_await get_subject_schema( - sub, ver_it->version, include_deleted::no); + sub, it->version, include_deleted::no); auto old_valid = co_await make_valid_schema( std::move(old_schema.schema)); diff --git a/tests/rptest/tests/schema_registry_test.py b/tests/rptest/tests/schema_registry_test.py index a8fe6ff42dcb..f98c95d3e4f0 100644 --- a/tests/rptest/tests/schema_registry_test.py +++ b/tests/rptest/tests/schema_registry_test.py @@ -1488,6 +1488,55 @@ def test_post_compatibility_subject_version(self, assert result_raw.status_code == requests.codes.ok assert result_raw.json()["id"] == v1_id + @cluster(num_nodes=3) + def test_post_compatibility_subject_version_transitive_order(self): + """ + Verify the compatibility message shows the latest failing schema + """ + + topic = create_topic_names(1)[0] + + schema_1_data = json.dumps({"schema": schema1_def}) + schema_2_data = json.dumps({"schema": schema2_def}) + schema_3_data = json.dumps({"schema": schema3_def}) + + self.logger.debug("Posting schema 1 as a subject key") + result_raw = self._post_subjects_subject_versions( + subject=f"{topic}-key", data=schema_1_data) + self.logger.debug(f"{result_raw=}") + assert result_raw.status_code == requests.codes.ok + + self.logger.debug("Set subject config - BACKWARD_TRANSITIVE") + result_raw = self._set_config_subject( + subject=f"{topic}-key", + data=json.dumps({"compatibility": "BACKWARD_TRANSITIVE"})) + self.logger.debug(f"{result_raw=}") + assert result_raw.status_code == requests.codes.ok + + self.logger.debug("Posting schema 2 (compatible with schema 1)") + result_raw = self._post_subjects_subject_versions( + subject=f"{topic}-key", data=schema_2_data) + self.logger.debug(result_raw, result_raw.json()) + assert result_raw.status_code == requests.codes.ok + + self.logger.debug( + "Check compatibility schema 3 (incompatible with both schema 1 and 2) with verbose=True" + ) + result_raw = self._post_compatibility_subject_version( + subject=f"{topic}-key", + version=1, + data=schema_3_data, + verbose=True) + self.logger.debug(result_raw, result_raw.json()) + assert result_raw.status_code == requests.codes.ok + assert result_raw.json()["is_compatible"] == False + + messages = result_raw.json().get("messages", []) + assert not any(schema1_def in m for m in messages), \ + f"Expected schema 3 to be compared against schema 2 only (not schema 1)" + assert any(schema2_def in m for m in messages), \ + f"Expected schema 3 to be compared against schema 2 only (not schema 1)" + @cluster(num_nodes=3) @parametrize(schemas=("avro", "avro_incompat", "AVRO")) @parametrize(schemas=("proto3", "proto3_incompat", "PROTOBUF"))