Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

schema_registry: Switch Schema ID Validation enablement to cluster config #11284

10 changes: 10 additions & 0 deletions src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "config/node_config.h"
#include "config/validators.h"
#include "model/metadata.h"
#include "pandaproxy/schema_registry/schema_id_validation.h"
#include "security/gssapi_principal_mapper.h"
#include "security/mtls.h"
#include "storage/chunk_cache.h"
Expand Down Expand Up @@ -2100,6 +2101,15 @@ configuration::configuration()
"that unsafe strings are permitted",
{.needs_restart = needs_restart::no, .visibility = visibility::user},
300s)
, enable_schema_id_validation(
*this,
"enable_schema_id_validation",
"Enable Server Side Schema ID Validation.",
{.needs_restart = needs_restart::no, .visibility = visibility::user},
pandaproxy::schema_registry::schema_id_validation_mode::none,
{pandaproxy::schema_registry::schema_id_validation_mode::none,
pandaproxy::schema_registry::schema_id_validation_mode::redpanda,
pandaproxy::schema_registry::schema_id_validation_mode::compat})
, kafka_schema_id_validation_cache_capacity(
*this,
"kafka_schema_id_validation_cache_capacity",
Expand Down
3 changes: 3 additions & 0 deletions src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "model/metadata.h"
#include "model/timestamp.h"
#include "net/unresolved_address.h"
#include "pandaproxy/schema_registry/schema_id_validation.h"

#include <seastar/core/sstring.hh>
#include <seastar/net/inet_address.hh>
Expand Down Expand Up @@ -423,6 +424,8 @@ struct configuration final : public config_store {
property<std::chrono::seconds> legacy_unsafe_log_warning_interval_sec;

// schema id validation
enum_property<pandaproxy::schema_registry::schema_id_validation_mode>
enable_schema_id_validation;
config::property<size_t> kafka_schema_id_validation_cache_capacity;

bounded_property<double, numeric_bounds> kafka_memory_share_for_fetch;
Expand Down
30 changes: 30 additions & 0 deletions src/v/config/convert.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "model/fundamental.h"
#include "model/metadata.h"
#include "model/timestamp.h"
#include "pandaproxy/schema_registry/schema_id_validation.h"
#include "pandaproxy/schema_registry/subject_name_strategy.h"
#include "utils/string_switch.h"

Expand Down Expand Up @@ -470,4 +471,33 @@ struct convert<pandaproxy::schema_registry::subject_name_strategy> {
}
};

template<>
struct convert<pandaproxy::schema_registry::schema_id_validation_mode> {
using type = pandaproxy::schema_registry::schema_id_validation_mode;

static constexpr auto acceptable_values = std::to_array(
{to_string_view(type::none),
to_string_view(type::redpanda),
to_string_view(type::compat)});

static Node encode(const type& rhs) { return Node(fmt::format("{}", rhs)); }

static bool decode(const Node& node, type& rhs) {
auto value = node.as<std::string>();

if (
std::find(acceptable_values.begin(), acceptable_values.end(), value)
== acceptable_values.end()) {
return false;
}

rhs = string_switch<type>(std::string_view{value})
.match(to_string_view(type::none), type::none)
.match(to_string_view(type::redpanda), type::redpanda)
.match(to_string_view(type::compat), type::compat);

return true;
}
};

} // namespace YAML
6 changes: 6 additions & 0 deletions src/v/config/property.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "json/stringbuffer.h"
#include "json/writer.h"
#include "oncore.h"
#include "pandaproxy/schema_registry/schema_id_validation.h"
#include "pandaproxy/schema_registry/subject_name_strategy.h"
#include "reflection/type_traits.h"
#include "utils/intrusive_list_helpers.h"
Expand Down Expand Up @@ -621,6 +622,11 @@ consteval std::string_view property_type_name() {
pandaproxy::schema_registry::
subject_name_strategy>) {
return "string";
} else if constexpr (std::is_same_v<
type,
pandaproxy::schema_registry::
schema_id_validation_mode>) {
return "string";
} else {
static_assert(dependent_false<T>::value, "Type name not defined");
}
Expand Down
6 changes: 6 additions & 0 deletions src/v/config/rjson_serialization.cc
Original file line number Diff line number Diff line change
Expand Up @@ -172,4 +172,10 @@ void rjson_serialize(
stringize(w, v);
}

void rjson_serialize(
json::Writer<json::StringBuffer>& w,
const pandaproxy::schema_registry::schema_id_validation_mode& v) {
stringize(w, v);
}

} // namespace json
5 changes: 5 additions & 0 deletions src/v/config/rjson_serialization.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "json/json.h"
#include "json/stringbuffer.h"
#include "json/writer.h"
#include "pandaproxy/schema_registry/schema_id_validation.h"
#include "pandaproxy/schema_registry/subject_name_strategy.h"
#include "seastarx.h"

Expand Down Expand Up @@ -83,4 +84,8 @@ void rjson_serialize(
json::Writer<json::StringBuffer>& w,
const pandaproxy::schema_registry::subject_name_strategy& v);

void rjson_serialize(
json::Writer<json::StringBuffer>& w,
const pandaproxy::schema_registry::schema_id_validation_mode& v);

} // namespace json
2 changes: 0 additions & 2 deletions src/v/features/feature_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ std::string_view to_string_view(feature f) {
return "transaction_partitioning";
case feature::force_partition_reconfiguration:
return "force_partition_reconfiguration";
case feature::schema_id_validation:
return "schema_id_validation";
NyaliaLui marked this conversation as resolved.
Show resolved Hide resolved
case feature::raft_append_entries_serde:
return "raft_append_entries_serde";

Expand Down
7 changes: 0 additions & 7 deletions src/v/features/feature_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ enum class feature : std::uint64_t {
cloud_storage_manifest_format_v2 = 1ULL << 24U,
transaction_partitioning = 1ULL << 25U,
force_partition_reconfiguration = 1ULL << 26U,
schema_id_validation = 1ULL << 27U,
raft_append_entries_serde = 1ULL << 28U,

// Dummy features for testing only
Expand Down Expand Up @@ -267,12 +266,6 @@ constexpr static std::array feature_schema{
feature::force_partition_reconfiguration,
feature_spec::available_policy::always,
feature_spec::prepare_policy::always},
feature_spec{
cluster::cluster_version{10},
"schema_id_validation",
feature::schema_id_validation,
feature_spec::available_policy::always,
feature_spec::prepare_policy::always},
feature_spec{
cluster::cluster_version{10},
"raft_append_entries_serde",
Expand Down
5 changes: 3 additions & 2 deletions src/v/kafka/server/handlers/alter_configs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,9 @@ create_topic_properties_update(
kafka::config_resource_operation::set);
continue;
}
if (ctx.feature_table().local().is_active(
features::feature::schema_id_validation)) {
if (
config::shard_local_cfg().enable_schema_id_validation()
!= pandaproxy::schema_registry::schema_id_validation_mode::none) {
if (schema_id_validation_config_parser(
cfg, kafka::config_resource_operation::set)) {
continue;
Expand Down
42 changes: 26 additions & 16 deletions src/v/kafka/server/handlers/configs/config_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "kafka/server/request_context.h"
#include "kafka/types.h"
#include "outcome.h"
#include "pandaproxy/schema_registry/schema_id_validation.h"
#include "pandaproxy/schema_registry/subject_name_strategy.h"
#include "security/acl.h"

Expand Down Expand Up @@ -397,33 +398,42 @@ class schema_id_validation_config_parser {
decltype(&props.record_key_schema_id_validation),
decltype(&props.record_key_subject_name_strategy)>;

auto prop
= string_switch<std::optional<property_t>>(cfg.name)
.match(
topic_property_record_key_schema_id_validation,
&props.record_key_schema_id_validation)
auto matcher = string_switch<std::optional<property_t>>(cfg.name);
switch (config::shard_local_cfg().enable_schema_id_validation()) {
case pandaproxy::schema_registry::schema_id_validation_mode::compat:
matcher
.match(
topic_property_record_key_schema_id_validation_compat,
&props.record_key_schema_id_validation_compat)
.match(
topic_property_record_key_subject_name_strategy,
&props.record_key_subject_name_strategy)
.match(
topic_property_record_key_subject_name_strategy_compat,
&props.record_key_subject_name_strategy_compat)
.match(
topic_property_record_value_schema_id_validation,
&props.record_value_schema_id_validation)
.match(
topic_property_record_value_schema_id_validation_compat,
&props.record_value_schema_id_validation_compat)
.match(
topic_property_record_value_subject_name_strategy,
&props.record_value_subject_name_strategy)
.match(
topic_property_record_value_subject_name_strategy_compat,
&props.record_value_subject_name_strategy_compat)
.default_match(std::nullopt);
&props.record_value_subject_name_strategy_compat);
[[fallthrough]];
case pandaproxy::schema_registry::schema_id_validation_mode::redpanda:
matcher
.match(
topic_property_record_key_schema_id_validation,
&props.record_key_schema_id_validation)
.match(
topic_property_record_key_subject_name_strategy,
&props.record_key_subject_name_strategy)
.match(
topic_property_record_value_schema_id_validation,
&props.record_value_schema_id_validation)
.match(
topic_property_record_value_subject_name_strategy,
&props.record_value_subject_name_strategy);
[[fallthrough]];
case pandaproxy::schema_registry::schema_id_validation_mode::none:
break;
}
auto prop = matcher.default_match(std::nullopt);
if (prop.has_value()) {
ss::visit(
prop.value(), [&cfg, op](auto& p) { apply(*p, cfg.value, op); });
Expand Down
Loading