Skip to content

Commit

Permalink
sr/test: Unit tests for protobuf compat messages
Browse files Browse the repository at this point in the history
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
  • Loading branch information
oleiman authored and pgellert committed Aug 20, 2024
1 parent b6d9e9d commit 54e856f
Showing 1 changed file with 220 additions and 0 deletions.
220 changes: 220 additions & 0 deletions src/v/pandaproxy/schema_registry/test/compatibility_protobuf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,22 @@
#include "pandaproxy/schema_registry/exceptions.h"
#include "pandaproxy/schema_registry/protobuf.h"
#include "pandaproxy/schema_registry/sharded_store.h"
#include "pandaproxy/schema_registry/test/compatibility_common.h"
#include "pandaproxy/schema_registry/types.h"

#include <seastar/testing/thread_test_case.hh>

#include <absl/container/flat_hash_set.h>
#include <boost/test/unit_test.hpp>

#include <array>
#include <utility>

namespace pp = pandaproxy;
namespace pps = pp::schema_registry;

namespace {

struct simple_sharded_store {
simple_sharded_store() {
store.start(pps::is_mutable::yes, ss::default_smp_service_group())
Expand Down Expand Up @@ -77,6 +82,22 @@ bool check_compatible(
.get();
}

pps::compatibility_result check_compatible_verbose(
const pps::canonical_schema_definition& r,
const pps::canonical_schema_definition& w) {
pps::sharded_store s;
return check_compatible(
pps::make_protobuf_schema_definition(
s, {pps::subject("r"), {r.shared_raw(), pps::schema_type::protobuf}})
.get(),
pps::make_protobuf_schema_definition(
s, {pps::subject("w"), {w.shared_raw(), pps::schema_type::protobuf}})
.get(),
pps::verbose::yes);
}

} // namespace

SEASTAR_THREAD_TEST_CASE(test_protobuf_simple) {
simple_sharded_store store;

Expand Down Expand Up @@ -758,3 +779,202 @@ SEASTAR_THREAD_TEST_CASE(test_protobuf_compatibility_oneof_fully_removed) {
R"(syntax = "proto3"; message Simple { int32 other = 3; })",
R"(syntax = "proto3"; message Simple { oneof wrapper { int32 id = 1; int32 new_id = 2; } int32 other = 3; })"));
}

namespace {

const pps::canonical_schema_definition proto2_old{
R"(syntax = "proto2";
message someMessage {
required int32 a = 1;
}
message myrecord {
message Msg1 {
required int32 f1 = 1;
}
required Msg1 m1 = 1;
required Msg1 m2 = 2;
required int32 i1 = 3;
required int32 i2 = 4;
oneof union {
int32 u1 = 5;
string u2 = 6;
bool u3 = 23;
bool u4 = 40;
}
required int32 notu1 = 7;
required string notu2 = 8;
})",
pps::schema_type::protobuf};

const pps::canonical_schema_definition proto2_new{
R"(syntax = "proto2";
message myrecord {
message Msg1d {
required int32 f1 = 1;
}
required Msg1d m1 = 1;
required int32 m2 = 2;
required string i1 = 3;
// required int32 i2 = 4;
oneof union {
int32 u1 = 5;
string u2 = 16;
string u3 = 23;
}
required bool u4 = 40;
oneof union2 {
int32 notu1 = 7;
string notu2 = 8;
}
required string whoops = 12;
})",
pps::schema_type::protobuf};

const pps::canonical_schema_definition proto3_old{
R"(syntax = "proto3";
message someMessage {
int32 a = 1;
}
message myrecord {
message Msg1 {
int32 f1 = 1;
}
Msg1 m1 = 1;
Msg1 m2 = 2;
int32 i1 = 3;
int32 i2 = 4;
oneof union {
int32 u1 = 5;
string u2 = 6;
bool u3 = 23;
bool u4 = 40;
}
int32 notu1 = 7;
string notu2 = 8;
}
)",
pps::schema_type::protobuf};

const pps::canonical_schema_definition proto3_new{
R"(syntax = "proto3";
message myrecord {
message Msg1d {
int32 f1 = 1;
}
Msg1d m1 = 1;
int32 m2 = 2;
string i1 = 3;
// int32 i2 = 4;
oneof union {
int32 u1 = 5;
string u2 = 16;
string u3 = 23;
}
bool u4 = 40;
oneof union2 {
int32 notu1 = 7;
string notu2 = 8;
}
string whoops = 12;
})",
pps::schema_type::protobuf};

using incompatibility = pps::proto_incompatibility;

const absl::flat_hash_set<incompatibility> forward_expected{
{"#/myrecord/union/16", incompatibility::Type::oneof_field_removed},
{"#/myrecord/union/23", incompatibility::Type::field_scalar_kind_changed},
{"#/myrecord/1", incompatibility::Type::field_named_type_changed},
{"#/myrecord/2", incompatibility::Type::field_kind_changed},
{"#/myrecord/3", incompatibility::Type::field_scalar_kind_changed},
{"#/myrecord/Msg1d", incompatibility::Type::message_removed},
// These are ignored for proto3 schemas
{"#/myrecord/4", incompatibility::Type::required_field_added},
{"#/myrecord/7", incompatibility::Type::required_field_added},
{"#/myrecord/8", incompatibility::Type::required_field_added},
{"#/myrecord/12", incompatibility::Type::required_field_removed},
};

const absl::flat_hash_set<incompatibility> backward_expected{
{"#/someMessage", incompatibility::Type::message_removed},
{"#/myrecord/union2", incompatibility::Type::multiple_fields_moved_to_oneof},
{"#/myrecord/union/6", incompatibility::Type::oneof_field_removed},
{"#/myrecord/union/23", incompatibility::Type::field_scalar_kind_changed},
{"#/myrecord/union/40", incompatibility::Type::oneof_field_removed},
{"#/myrecord/1", incompatibility::Type::field_named_type_changed},
{"#/myrecord/2", incompatibility::Type::field_kind_changed},
{"#/myrecord/3", incompatibility::Type::field_scalar_kind_changed},
{"#/myrecord/Msg1", incompatibility::Type::message_removed},
// These are ignored for proto3 schemas
{"#/myrecord/4", incompatibility::Type::required_field_removed},
{"#/myrecord/40", incompatibility::Type::required_field_added},
{"#/myrecord/12", incompatibility::Type::required_field_added},
};

absl::flat_hash_set<incompatibility>
remove_proto2_incompatibilites(absl::flat_hash_set<incompatibility> exp) {
absl::erase_if(exp, [](const auto& e) {
return (
e.type() == incompatibility::Type::required_field_removed
|| e.type() == incompatibility::Type::required_field_added);
});
return exp;
}

const auto compat_data = std::to_array<compat_test_data<incompatibility>>({
{
proto2_old.copy(),
proto2_new.copy(),
forward_expected,
},
{
proto2_new.copy(),
proto2_old.copy(),
backward_expected,
},
{
proto3_old.copy(),
proto3_new.copy(),
remove_proto2_incompatibilites(forward_expected),
},
{
proto3_new.copy(),
proto3_old.copy(),
remove_proto2_incompatibilites(backward_expected),
},
});
} // namespace

SEASTAR_THREAD_TEST_CASE(test_protobuf_compat_messages) {
for (const auto& cd : compat_data) {
auto compat = check_compatible_verbose(cd.reader, cd.writer);
absl::flat_hash_set<ss::sstring> errs{
compat.messages.begin(), compat.messages.end()};
absl::flat_hash_set<ss::sstring> expected{
cd.expected.messages.begin(), cd.expected.messages.end()};
BOOST_CHECK(!compat.is_compat);
BOOST_CHECK_EQUAL(errs.size(), expected.size());
BOOST_REQUIRE_EQUAL(errs, expected);
}
}

0 comments on commit 54e856f

Please sign in to comment.