Skip to content

Commit

Permalink
sr: Introduce compatility module
Browse files Browse the repository at this point in the history
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
  • Loading branch information
oleiman authored and pgellert committed Aug 19, 2024
1 parent 3771501 commit b27954c
Show file tree
Hide file tree
Showing 4 changed files with 351 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/v/pandaproxy/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ redpanda_cc_library(
"rest/proxy.cc",
"schema_registry/api.cc",
"schema_registry/avro.cc",
"schema_registry/compatibility.cc",
"schema_registry/configuration.cc",
"schema_registry/error.cc",
"schema_registry/handlers.cc",
Expand Down Expand Up @@ -189,6 +190,7 @@ redpanda_cc_library(
"rest/proxy.h",
"schema_registry/api.h",
"schema_registry/avro.h",
"schema_registry/compatibility.h",
"schema_registry/configuration.h",
"schema_registry/error.h",
"schema_registry/errors.h",
Expand Down
1 change: 1 addition & 0 deletions src/v/pandaproxy/schema_registry/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ v_cc_library(
SRCS
api.cc
configuration.cc
compatibility.cc
handlers.cc
error.cc
service.cc
Expand Down
174 changes: 174 additions & 0 deletions src/v/pandaproxy/schema_registry/compatibility.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

#include "pandaproxy/schema_registry/compatibility.h"

namespace pandaproxy::schema_registry {

std::ostream& operator<<(std::ostream& os, const avro_incompatibility_type& t) {
switch (t) {
case avro_incompatibility_type::name_mismatch:
return os << "NAME_MISMATCH";
case avro_incompatibility_type::fixed_size_mismatch:
return os << "FIXED_SIZE_MISMATCH";
case avro_incompatibility_type::missing_enum_symbols:
return os << "MISSING_ENUM_SYMBOLS";
case avro_incompatibility_type::reader_field_missing_default_value:
return os << "READER_FIELD_MISSING_DEFAULT_VALUE";
case avro_incompatibility_type::type_mismatch:
return os << "TYPE_MISMATCH";
case avro_incompatibility_type::missing_union_branch:
return os << "MISSING_UNION_BRANCH";
case avro_incompatibility_type::unknown:
return os << "UNKNOWN";
};
__builtin_unreachable();
}

std::string_view description_for_type(avro_incompatibility_type t) {
switch (t) {
case avro_incompatibility_type::name_mismatch:
return "The name of the schema has changed (path '{path}')";
case avro_incompatibility_type::fixed_size_mismatch:
return "The size of FIXED type field at path '{path}' in the "
"{{reader}} schema does not match with the {{writer}} schema";
case avro_incompatibility_type::missing_enum_symbols:
return "The {{reader}} schema is missing enum symbols '{additional}' "
"at path '{path}' in the {{writer}} schema";
case avro_incompatibility_type::reader_field_missing_default_value:
return "The field '{additional}' at path '{path}' in the {{reader}} "
"schema has "
"no default value and is missing in the {{writer}}";
case avro_incompatibility_type::type_mismatch:
return "The type (path '{path}') of a field in the {{reader}} schema "
"does not match with the {{writer}} schema";
case avro_incompatibility_type::missing_union_branch:
return "The {{reader}} schema is missing a type inside a union field "
"at path '{path}' in the {{writer}} schema";
case avro_incompatibility_type::unknown:
return "{{reader}} schema is not compatible with {{writer}} schema: "
"check '{path}'";
};
__builtin_unreachable();
}

std::ostream& operator<<(std::ostream& os, const avro_incompatibility& v) {
fmt::print(
os,
"{{errorType:'{}', description:'{}', additionalInfo:'{}'}}",
v._type,
v.describe(),
v._additional_info);
return os;
}

ss::sstring avro_incompatibility::describe() const {
return fmt::format(
fmt::runtime(description_for_type(_type)),
fmt::arg("path", _path.string()),
fmt::arg("additional", _additional_info));
}

std::ostream&
operator<<(std::ostream& os, const proto_incompatibility_type& t) {
switch (t) {
case proto_incompatibility_type::message_removed:
return os << "MESSAGE_REMOVED";
case proto_incompatibility_type::field_kind_changed:
return os << "FIELD_KIND_CHANGED";
case proto_incompatibility_type::field_scalar_kind_changed:
return os << "FIELD_SCALAR_KIND_CHANGED";
case proto_incompatibility_type::field_named_type_changed:
return os << "FIELD_NAMED_TYPE_CHANGED";
case proto_incompatibility_type::required_field_added:
return os << "REQUIRED_FIELD_ADDED";
case proto_incompatibility_type::required_field_removed:
return os << "REQUIRED_FIELD_REMOVED";
case proto_incompatibility_type::oneof_field_removed:
return os << "ONEOF_FIELD_REMOVED";
case proto_incompatibility_type::multiple_fields_moved_to_oneof:
return os << "MULTIPLE_FIELDS_MOVED_TO_ONEOF";
case proto_incompatibility_type::unknown:
return os << "UNKNOWN";
}
__builtin_unreachable();
}

std::string_view description_for_type(proto_incompatibility_type t) {
switch (t) {
case proto_incompatibility_type::message_removed:
return "The {{reader}} schema is missing a field of type MESSAGE at "
"path '{path}' in the {{writer}} schema";
case proto_incompatibility_type::field_kind_changed:
return "The type of a field at path '{path}' in the {{reader}} "
"schema does not match the {{writer}} schema";
case proto_incompatibility_type::field_scalar_kind_changed:
return "The kind of a SCALAR field at path '{path}' in the {{reader}} "
"schema does not match its kind in the {{writer}} schema";
case proto_incompatibility_type::field_named_type_changed:
return "The type of a MESSAGE field at path '{path}' in the {{reader}} "
"schema does not match its type in the {{writer}} schema ";
case proto_incompatibility_type::required_field_added:
return "A required field at path '{path}' in the {{reader}} schema "
"is missing in the {{writer}} schema";
case proto_incompatibility_type::required_field_removed:
return "The {{reader}} schema is missing a required field at path: "
"'{path}' in the {{writer}} schema";
case proto_incompatibility_type::oneof_field_removed:
return "The {{reader}} schema is missing a oneof field at path "
"'{path}' in the {{writer}} schema";
case proto_incompatibility_type::multiple_fields_moved_to_oneof:
return "Multiple fields in the oneof at path '{path}' in the "
"{{reader}} schema are outside a oneof in the {{writer}} "
"schema ";
case proto_incompatibility_type::unknown:
return "{{reader}} schema is not compatible with {{writer}} schema: "
"check '{path}'";
}
__builtin_unreachable();
}

std::ostream& operator<<(std::ostream& os, const proto_incompatibility& v) {
fmt::print(
os, "{{errorType:'{}', description:'{}'}}", v._type, v.describe());
return os;
}

ss::sstring proto_incompatibility::describe() const {
return fmt::format(
fmt::runtime(description_for_type(_type)),
fmt::arg("path", _path.string()));
}

compatibility_result
raw_compatibility_result::operator()(verbose is_verbose) && {
compatibility_result result = {.is_compat = !has_error()};
if (is_verbose) {
result.messages.reserve(_errors.size());
std::transform(
std::make_move_iterator(_errors.begin()),
std::make_move_iterator(_errors.end()),
std::back_inserter(result.messages),
[](auto e) {
return std::visit(
[](auto&& e) { return fmt::format("{{{}}}", e); }, e);
});
}
return result;
}

void raw_compatibility_result::merge(raw_compatibility_result&& other) {
_errors.reserve(_errors.size() + other._errors.size());
std::move(
other._errors.begin(), other._errors.end(), std::back_inserter(_errors));
}

} // namespace pandaproxy::schema_registry
174 changes: 174 additions & 0 deletions src/v/pandaproxy/schema_registry/compatibility.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

#pragma once

#include "base/vassert.h"
#include "pandaproxy/schema_registry/types.h"

#include <fmt/format.h>

#include <filesystem>
#include <string_view>
#include <vector>

/**
* compatibility.h
*
* Support classes for tracking, accumulating, and emitting formatted error
* messages while checking compatibility of avro & protobuf schemas.
*/

namespace pandaproxy::schema_registry {

enum class avro_incompatibility_type {
name_mismatch = 0,
fixed_size_mismatch,
missing_enum_symbols,
reader_field_missing_default_value,
type_mismatch,
missing_union_branch,
unknown,
};

/**
* avro_incompatibility - A single incompatibility between Avro schemas.
*
* Encapsulates:
* - the path to the location of the incompatibility in the _writer_ schema
* - the type of incompatibility
* - any additional context for the incompatibility (e.g. a field name)
*
* Primary interface is `describe`, which combines the contained info into
* a format string which can then be interpolated with identifying info for
* the reader and writer schema in the request handler.
*/
class avro_incompatibility {
public:
using Type = avro_incompatibility_type;
avro_incompatibility(
std::filesystem::path path, Type type, std::string_view info)
: _path(std::move(path))
, _type(type)
, _additional_info(info) {}

avro_incompatibility(std::filesystem::path path, Type type)
: avro_incompatibility(std::move(path), type, "") {}

ss::sstring describe() const;

private:
friend std::ostream&
operator<<(std::ostream& os, const avro_incompatibility& v);

friend bool
operator==(const avro_incompatibility&, const avro_incompatibility&)
= default;

// Useful for unit testing
template<typename H>
friend H AbslHashValue(H h, const avro_incompatibility& e) {
return H::combine(
std::move(h), e._path.string(), e._type, e._additional_info);
}

std::filesystem::path _path;
Type _type;
ss::sstring _additional_info;
};

enum class proto_incompatibility_type {
message_removed = 0,
field_kind_changed,
field_scalar_kind_changed,
field_named_type_changed,
required_field_added,
required_field_removed,
oneof_field_removed,
multiple_fields_moved_to_oneof,
unknown,
};

/**
* proto_incompatibility - A single incompatibility between Protobuf schemas.
*
* Encapsulates:
* - the path to the location of the incompatibility in the _writer_ schema
* - the type of incompatibility
*
* Primary interface is `describe`, which combines the contained info into
* a format string which can then be interpolated with identifying info for
* the reader and writer schemas in the request handler.
*/
class proto_incompatibility {
public:
using Type = proto_incompatibility_type;
proto_incompatibility(std::filesystem::path path, Type type)
: _path(std::move(path))
, _type(type) {}

ss::sstring describe() const;
Type type() const { return _type; }

private:
friend std::ostream&
operator<<(std::ostream& os, const proto_incompatibility& v);

friend bool
operator==(const proto_incompatibility&, const proto_incompatibility&)
= default;

// Helpful for unit testing
template<typename H>
friend H AbslHashValue(H h, const proto_incompatibility& e) {
return H::combine(std::move(h), e._path.string(), e._type);
}

std::filesystem::path _path;
Type _type;
};

/**
* raw_compatibility_result - A collection of unformatted proto or avro
* incompatibilities. Its purpose is twofold:
* - Provide an abstracted way to accumulate incompatibilities across
* a recursive call chain. The `merge` function makes this simple
* and seeks to avoid excessive copying.
* - Provide a (type-constrained) generic means to process raw
* incompatibilities into formatted error messages.
*/
class raw_compatibility_result {
using schema_incompatibility
= std::variant<avro_incompatibility, proto_incompatibility>;

public:
raw_compatibility_result() = default;

template<typename T, typename... Args>
requires std::constructible_from<T, Args&&...>
&& std::convertible_to<T, schema_incompatibility>
auto emplace(Args&&... args) {
return _errors.emplace_back(
std::in_place_type<T>, std::forward<Args>(args)...);
}

compatibility_result operator()(verbose is_verbose) &&;

// Move the contents of other into the errors vec of this
void merge(raw_compatibility_result&& other);

bool has_error() const { return !_errors.empty(); }

private:
std::vector<schema_incompatibility> _errors{};
};

} // namespace pandaproxy::schema_registry

0 comments on commit b27954c

Please sign in to comment.