From 92ace070533cfadddad41030bacc3b9a09fe6d59 Mon Sep 17 00:00:00 2001 From: Ben Pope Date: Wed, 7 Jun 2023 17:12:19 +0100 Subject: [PATCH 1/9] schema_registry: Introduce schema_id_validation_mode Signed-off-by: Ben Pope --- .../schema_registry/schema_id_validation.h | 62 +++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 src/v/pandaproxy/schema_registry/schema_id_validation.h diff --git a/src/v/pandaproxy/schema_registry/schema_id_validation.h b/src/v/pandaproxy/schema_registry/schema_id_validation.h new file mode 100644 index 0000000000000..1a279c324685c --- /dev/null +++ b/src/v/pandaproxy/schema_registry/schema_id_validation.h @@ -0,0 +1,62 @@ +/* + * Copyright 2023 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#pragma once + +#include "utils/string_switch.h" + +#include + +#include + +namespace pandaproxy::schema_registry { + +enum class schema_id_validation_mode { + // Disabled + none = 0, + // Use Redpanda topic properties + redpanda, + // Use Redpanda and compatible topic properties + compat, +}; + +constexpr std::string_view to_string_view(schema_id_validation_mode m) { + switch (m) { + case schema_id_validation_mode::none: + return "none"; + case schema_id_validation_mode::redpanda: + return "redpanda"; + case schema_id_validation_mode::compat: + return "compat"; + } +} + +inline std::ostream& operator<<(std::ostream& o, schema_id_validation_mode m) { + return o << to_string_view(m); +} + +inline std::istream& operator>>(std::istream& i, schema_id_validation_mode& m) { + seastar::sstring s; + i >> s; + m = string_switch(s) + .match( + to_string_view(schema_id_validation_mode::none), + schema_id_validation_mode::none) + .match( + to_string_view(schema_id_validation_mode::redpanda), + schema_id_validation_mode::redpanda) + .match( + to_string_view(schema_id_validation_mode::compat), + schema_id_validation_mode::compat); + return i; +} + +} // namespace pandaproxy::schema_registry From ef8351ba0974391d59747114df1d340111873400 Mon Sep 17 00:00:00 2001 From: Ben Pope Date: Wed, 7 Jun 2023 19:45:18 +0100 Subject: [PATCH 2/9] config: Support schema_id_validation_mode Signed-off-by: Ben Pope --- src/v/config/convert.h | 30 +++++++++++++++++++++++++++++ src/v/config/property.h | 6 ++++++ src/v/config/rjson_serialization.cc | 6 ++++++ src/v/config/rjson_serialization.h | 5 +++++ 4 files changed, 47 insertions(+) diff --git a/src/v/config/convert.h b/src/v/config/convert.h index 17a074edbda3d..58ed7770769c2 100644 --- a/src/v/config/convert.h +++ b/src/v/config/convert.h @@ -15,6 +15,7 @@ #include "model/fundamental.h" #include "model/metadata.h" #include "model/timestamp.h" +#include "pandaproxy/schema_registry/schema_id_validation.h" #include "pandaproxy/schema_registry/subject_name_strategy.h" #include "utils/string_switch.h" @@ -470,4 +471,33 @@ struct convert { } }; +template<> +struct convert { + using type = pandaproxy::schema_registry::schema_id_validation_mode; + + static constexpr auto acceptable_values = std::to_array( + {to_string_view(type::none), + to_string_view(type::redpanda), + to_string_view(type::compat)}); + + static Node encode(const type& rhs) { return Node(fmt::format("{}", rhs)); } + + static bool decode(const Node& node, type& rhs) { + auto value = node.as(); + + if ( + std::find(acceptable_values.begin(), acceptable_values.end(), value) + == acceptable_values.end()) { + return false; + } + + rhs = string_switch(std::string_view{value}) + .match(to_string_view(type::none), type::none) + .match(to_string_view(type::redpanda), type::redpanda) + .match(to_string_view(type::compat), type::compat); + + return true; + } +}; + } // namespace YAML diff --git a/src/v/config/property.h b/src/v/config/property.h index 0de13fa4defcf..71e7611f75013 100644 --- a/src/v/config/property.h +++ b/src/v/config/property.h @@ -15,6 +15,7 @@ #include "json/stringbuffer.h" #include "json/writer.h" #include "oncore.h" +#include "pandaproxy/schema_registry/schema_id_validation.h" #include "pandaproxy/schema_registry/subject_name_strategy.h" #include "reflection/type_traits.h" #include "utils/intrusive_list_helpers.h" @@ -621,6 +622,11 @@ consteval std::string_view property_type_name() { pandaproxy::schema_registry:: subject_name_strategy>) { return "string"; + } else if constexpr (std::is_same_v< + type, + pandaproxy::schema_registry:: + schema_id_validation_mode>) { + return "string"; } else { static_assert(dependent_false::value, "Type name not defined"); } diff --git a/src/v/config/rjson_serialization.cc b/src/v/config/rjson_serialization.cc index 36b6369ba78e3..9ca9a141868a0 100644 --- a/src/v/config/rjson_serialization.cc +++ b/src/v/config/rjson_serialization.cc @@ -172,4 +172,10 @@ void rjson_serialize( stringize(w, v); } +void rjson_serialize( + json::Writer& w, + const pandaproxy::schema_registry::schema_id_validation_mode& v) { + stringize(w, v); +} + } // namespace json diff --git a/src/v/config/rjson_serialization.h b/src/v/config/rjson_serialization.h index 4e29df79e669b..501ba3a75bf42 100644 --- a/src/v/config/rjson_serialization.h +++ b/src/v/config/rjson_serialization.h @@ -19,6 +19,7 @@ #include "json/json.h" #include "json/stringbuffer.h" #include "json/writer.h" +#include "pandaproxy/schema_registry/schema_id_validation.h" #include "pandaproxy/schema_registry/subject_name_strategy.h" #include "seastarx.h" @@ -83,4 +84,8 @@ void rjson_serialize( json::Writer& w, const pandaproxy::schema_registry::subject_name_strategy& v); +void rjson_serialize( + json::Writer& w, + const pandaproxy::schema_registry::schema_id_validation_mode& v); + } // namespace json From 3d9da45a93f626ad7d37fe5fcfa4bc48cba4bd7e Mon Sep 17 00:00:00 2001 From: Ben Pope Date: Wed, 7 Jun 2023 14:34:33 +0100 Subject: [PATCH 3/9] config: Introduce enable_schema_id_validation The configuration globally enables or disables Schema ID validation. Signed-off-by: Ben Pope --- src/v/config/configuration.cc | 10 ++++++++++ src/v/config/configuration.h | 3 +++ 2 files changed, 13 insertions(+) diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index 1baaeca06b506..e8161f2063e1c 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -14,6 +14,7 @@ #include "config/node_config.h" #include "config/validators.h" #include "model/metadata.h" +#include "pandaproxy/schema_registry/schema_id_validation.h" #include "security/gssapi_principal_mapper.h" #include "security/mtls.h" #include "storage/chunk_cache.h" @@ -2100,6 +2101,15 @@ configuration::configuration() "that unsafe strings are permitted", {.needs_restart = needs_restart::no, .visibility = visibility::user}, 300s) + , enable_schema_id_validation( + *this, + "enable_schema_id_validation", + "Enable Server Side Schema ID Validation.", + {.needs_restart = needs_restart::no, .visibility = visibility::user}, + pandaproxy::schema_registry::schema_id_validation_mode::none, + {pandaproxy::schema_registry::schema_id_validation_mode::none, + pandaproxy::schema_registry::schema_id_validation_mode::redpanda, + pandaproxy::schema_registry::schema_id_validation_mode::compat}) , kafka_schema_id_validation_cache_capacity( *this, "kafka_schema_id_validation_cache_capacity", diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h index f791e263bb246..d1e51c6ed62e0 100644 --- a/src/v/config/configuration.h +++ b/src/v/config/configuration.h @@ -25,6 +25,7 @@ #include "model/metadata.h" #include "model/timestamp.h" #include "net/unresolved_address.h" +#include "pandaproxy/schema_registry/schema_id_validation.h" #include #include @@ -423,6 +424,8 @@ struct configuration final : public config_store { property legacy_unsafe_log_warning_interval_sec; // schema id validation + enum_property + enable_schema_id_validation; config::property kafka_schema_id_validation_cache_capacity; bounded_property kafka_memory_share_for_fetch; From c16384624e506a3293d10bc04c64dabe3e277db3 Mon Sep 17 00:00:00 2001 From: Ben Pope Date: Wed, 7 Jun 2023 15:57:20 +0100 Subject: [PATCH 4/9] schema_registry: Switch schema id validation to config Instead of using a feature for enabling and disabling Schema ID Validation, use the cluster configuration parameter `enable_schema_id_validation`. Disable some testing, since the feture is no longer on by default. Additional property testing will be added in a later commit. Signed-off-by: Ben Pope --- src/v/kafka/server/handlers/alter_configs.cc | 5 +- .../kafka/server/handlers/describe_configs.cc | 5 +- .../handlers/incremental_alter_configs.cc | 7 ++- src/v/kafka/server/tests/alter_config_test.cc | 10 ++++ .../pandaproxy/schema_registry/validation.cc | 6 +- tests/rptest/tests/cluster_config_test.py | 3 + tests/rptest/tests/schema_registry_test.py | 60 ++++++------------- tests/rptest/tests/topic_creation_test.py | 8 --- 8 files changed, 46 insertions(+), 58 deletions(-) diff --git a/src/v/kafka/server/handlers/alter_configs.cc b/src/v/kafka/server/handlers/alter_configs.cc index 676a35357d099..55423cf2fce04 100644 --- a/src/v/kafka/server/handlers/alter_configs.cc +++ b/src/v/kafka/server/handlers/alter_configs.cc @@ -232,8 +232,9 @@ create_topic_properties_update( kafka::config_resource_operation::set); continue; } - if (ctx.feature_table().local().is_active( - features::feature::schema_id_validation)) { + if ( + config::shard_local_cfg().enable_schema_id_validation() + != pandaproxy::schema_registry::schema_id_validation_mode::none) { if (schema_id_validation_config_parser( cfg, kafka::config_resource_operation::set)) { continue; diff --git a/src/v/kafka/server/handlers/describe_configs.cc b/src/v/kafka/server/handlers/describe_configs.cc index 70fae38a3a0ce..4f461e8061e0d 100644 --- a/src/v/kafka/server/handlers/describe_configs.cc +++ b/src/v/kafka/server/handlers/describe_configs.cc @@ -810,8 +810,9 @@ ss::future describe_configs_handler::handle( request.data.include_documentation, config::shard_local_cfg().log_segment_ms.desc())); - if (ctx.feature_table().local().is_active( - features::feature::schema_id_validation)) { + if ( + config::shard_local_cfg().enable_schema_id_validation() + != pandaproxy::schema_registry::schema_id_validation_mode::none) { constexpr std::string_view key_validation = "Enable validation of the schema id for keys on a record"; constexpr std::string_view val_validation diff --git a/src/v/kafka/server/handlers/incremental_alter_configs.cc b/src/v/kafka/server/handlers/incremental_alter_configs.cc index d85c1b4e4a913..94d6b9685381f 100644 --- a/src/v/kafka/server/handlers/incremental_alter_configs.cc +++ b/src/v/kafka/server/handlers/incremental_alter_configs.cc @@ -123,7 +123,7 @@ bool valid_config_resource_operation(uint8_t v) { checked create_topic_properties_update( - request_context& ctx, incremental_alter_configs_resource& resource) { + request_context&, incremental_alter_configs_resource& resource) { model::topic_namespace tp_ns( model::kafka_namespace, model::topic(resource.resource_name)); cluster::topic_properties_update update(tp_ns); @@ -241,8 +241,9 @@ create_topic_properties_update( update.properties.segment_ms, cfg.value, op); continue; } - if (ctx.feature_table().local().is_active( - features::feature::schema_id_validation)) { + if ( + config::shard_local_cfg().enable_schema_id_validation() + != pandaproxy::schema_registry::schema_id_validation_mode::none) { if (schema_id_validation_config_parser(cfg, op)) { continue; } diff --git a/src/v/kafka/server/tests/alter_config_test.cc b/src/v/kafka/server/tests/alter_config_test.cc index 26855522ff140..968515676c809 100644 --- a/src/v/kafka/server/tests/alter_config_test.cc +++ b/src/v/kafka/server/tests/alter_config_test.cc @@ -7,6 +7,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0 +#include "cluster/config_frontend.h" #include "config/configuration.h" #include "kafka/protocol/alter_configs.h" #include "kafka/protocol/create_topics.h" @@ -330,6 +331,15 @@ FIXTURE_TEST( FIXTURE_TEST( test_topic_describe_configs_requested_properties, alter_config_test_fixture) { wait_for_controller_leadership().get(); + app.controller->get_config_frontend() + .invoke_on_all([](cluster::config_frontend& cfg_frontend) { + cluster::config_update_request r{ + .upsert = {{"enable_schema_id_validation", "compat"}}}; + return cfg_frontend.patch(r, model::timeout_clock::now() + 1s) + .discard_result(); + }) + .get(); + model::topic test_tp{"topic-1"}; create_topic(test_tp, 6); diff --git a/src/v/pandaproxy/schema_registry/validation.cc b/src/v/pandaproxy/schema_registry/validation.cc index f361c5ec8bac2..eb3f40c49fb52 100644 --- a/src/v/pandaproxy/schema_registry/validation.cc +++ b/src/v/pandaproxy/schema_registry/validation.cc @@ -24,6 +24,7 @@ #include "pandaproxy/schema_registry/errors.h" #include "pandaproxy/schema_registry/protobuf.h" #include "pandaproxy/schema_registry/schema_id_cache.h" +#include "pandaproxy/schema_registry/schema_id_validation.h" #include "pandaproxy/schema_registry/seq_writer.h" #include "pandaproxy/schema_registry/sharded_store.h" #include "pandaproxy/schema_registry/subject_name_strategy.h" @@ -357,8 +358,9 @@ class schema_id_validator::impl { // If Schema Registry is not enabled, the safe default is to reject co_return kafka::error_code::invalid_record; } - if (!_api->_controller->get_feature_table().local().is_active( - features::feature::schema_id_validation)) { + if ( + config::shard_local_cfg().enable_schema_id_validation() + == pandaproxy::schema_registry::schema_id_validation_mode::none) { co_return std::move(rbr); } diff --git a/tests/rptest/tests/cluster_config_test.py b/tests/rptest/tests/cluster_config_test.py index 20148fd7de08b..847149a00225a 100644 --- a/tests/rptest/tests/cluster_config_test.py +++ b/tests/rptest/tests/cluster_config_test.py @@ -556,6 +556,9 @@ def test_valid_settings(self): # Don't enable coproc: it generates log errors if its companion service isn't running exclude_settings.add('enable_coproc') + # Don't enable schema id validation: the interdepedencies are too complex and are tested elsewhere. + exclude_settings.add('enable_schema_id_validation') + initial_config = self.admin.get_cluster_config() for name, p in schema_properties.items(): diff --git a/tests/rptest/tests/schema_registry_test.py b/tests/rptest/tests/schema_registry_test.py index a0be01dbdac78..ff29566637196 100644 --- a/tests/rptest/tests/schema_registry_test.py +++ b/tests/rptest/tests/schema_registry_test.py @@ -7,7 +7,7 @@ # the Business Source License, use of this software will be governed # by the Apache License, Version 2.0 -from enum import IntEnum +from enum import Enum import http.client import json from typing import Optional @@ -32,11 +32,18 @@ from rptest.services.cluster import cluster from rptest.services.redpanda import ResourceSettings, SecurityConfig, LoggingConfig, PandaproxyConfig, SchemaRegistryConfig from rptest.services.serde_client import SerdeClient +from rptest.tests.cluster_config_test import wait_for_version_status_sync from rptest.tests.pandaproxy_test import User, PandaProxyTLSProvider from rptest.tests.redpanda_test import RedpandaTest from rptest.util import inject_remote_script, search_logs_with_timeout +class SchemaIdValidationMode(str, Enum): + NONE = "none" + REDPANDA = "redpanda" + COMPAT = "compat" + + def create_topic_names(count): return list(f"pandaproxy-topic-{uuid.uuid4()}" for _ in range(count)) @@ -1264,11 +1271,8 @@ def test_schema_id_validation(self, validate_schema_id: Optional[bool] = None, subject_name_strategy: Optional[str] = None, payload_class: str = "com.redpanda.Payload"): - Admin(self.redpanda).put_feature("schema_id_validation", - {"state": "active"}) - self.redpanda.await_feature("schema_id_validation", - True, - timeout_sec=10) + self.redpanda.set_cluster_config( + {'enable_schema_id_validation': SchemaIdValidationMode.COMPAT}) def get_next_strategy(subject_name_strategy): all_strategies = list(TopicSpec.SubjectNameStrategyCompat) @@ -1305,12 +1309,6 @@ def bool_alpha(b: bool) -> str: f"Connecting to redpanda: {self.redpanda.brokers()} schema_Reg: {schema_reg}" ) - Admin(self.redpanda).put_feature("schema_id_validation", - {"state": "active"}) - self.redpanda.await_feature("schema_id_validation", - True, - timeout_sec=10) - # Test against misconfigered strategy client = self._get_serde_client( protocol, @@ -2161,7 +2159,12 @@ def test_mtls_and_basic_auth(self): class SchemaValidationTopicPropertiesTest(RedpandaTest): def __init__(self, *args, **kwargs): super(SchemaValidationTopicPropertiesTest, - self).__init__(*args, **kwargs) + self).__init__(*args, + extra_rp_conf={ + 'enable_schema_id_validation': + SchemaIdValidationMode.COMPAT.value + }, + **kwargs) self.rpk = RpkTool(self.redpanda) self.admin = Admin(self.redpanda) @@ -2171,10 +2174,9 @@ def test_schema_id_validation_disabled_config(self): When the feature is disabled, the configs should not appear ''' - self.admin.put_feature("schema_id_validation", {"state": "disabled"}) - self.redpanda.await_feature("schema_id_validation", - False, - timeout_sec=10) + self.redpanda.set_cluster_config( + {'enable_schema_id_validation': SchemaIdValidationMode.NONE}) + topic = "default-topic" self.rpk.create_topic(topic) desc = self.rpk.describe_topic_configs(topic) @@ -2190,10 +2192,6 @@ def test_schema_id_validation_active_config(self): When the feature is active, the configs should be default ''' - self.admin.put_feature("schema_id_validation", {"state": "active"}) - self.redpanda.await_feature("schema_id_validation", - True, - timeout_sec=10) topic = "default-topic" self.rpk.create_topic(topic) desc = self.rpk.describe_topic_configs(topic) @@ -2214,10 +2212,6 @@ def test_schema_id_validation_active_explicit_default_config(self): dynamic, so that tools with a reconcialiation loop aren't confused ''' - self.admin.put_feature("schema_id_validation", {"state": "active"}) - self.redpanda.await_feature("schema_id_validation", - True, - timeout_sec=10) topic = "default-topic" self.rpk.create_topic( topic, @@ -2272,10 +2266,6 @@ def test_schema_id_validation_active_nondefault_config(self): as dyamic ''' - self.admin.put_feature("schema_id_validation", {"state": "active"}) - self.redpanda.await_feature("schema_id_validation", - True, - timeout_sec=10) topic = "default-topic" self.rpk.create_topic( topic, @@ -2309,10 +2299,6 @@ def test_schema_id_validation_active_nondefault_compat_config(self): as dyamic ''' - self.admin.put_feature("schema_id_validation", {"state": "active"}) - self.redpanda.await_feature("schema_id_validation", - True, - timeout_sec=10) topic = "default-topic" self.rpk.create_topic( topic, @@ -2349,10 +2335,6 @@ def test_schema_id_validation_create_collision(self): Test creating a topic where Redpanda and compat modes are incompatible ''' - self.admin.put_feature("schema_id_validation", {"state": "active"}) - self.redpanda.await_feature("schema_id_validation", - True, - timeout_sec=10) topic = "default-topic" try: self.rpk.create_topic( @@ -2373,10 +2355,6 @@ def test_schema_id_validation_alter_collision(self): Test altering a topic where Redpanda and compat modes are incompatible ''' - self.admin.put_feature("schema_id_validation", {"state": "active"}) - self.redpanda.await_feature("schema_id_validation", - True, - timeout_sec=10) topic = "default-topic" self.rpk.create_topic( topic, diff --git a/tests/rptest/tests/topic_creation_test.py b/tests/rptest/tests/topic_creation_test.py index 7ad240ff84716..a3254351f26f1 100644 --- a/tests/rptest/tests/topic_creation_test.py +++ b/tests/rptest/tests/topic_creation_test.py @@ -193,14 +193,6 @@ class CreateTopicsTest(RedpandaTest): lambda: "true" if random.randint(0, 1) else "false", 'segment.ms': lambda: random.choice([-1, random.randint(10000, 10000000)]), - TopicSpec.PROPERTY_RECORD_KEY_SCHEMA_ID_VALIDATION: - lambda: "true" if random.randint(0, 1) else "false", - TopicSpec.PROPERTY_RECORD_KEY_SUBJECT_NAME_STRATEGY: - lambda: random.choice(list(TopicSpec.SubjectNameStrategy)).value, - TopicSpec.PROPERTY_RECORD_VALUE_SCHEMA_ID_VALIDATION: - lambda: "true" if random.randint(0, 1) else "false", - TopicSpec.PROPERTY_RECORD_VALUE_SUBJECT_NAME_STRATEGY: - lambda: random.choice(list(TopicSpec.SubjectNameStrategy)).value } def __init__(self, test_context): From 9a5ab71cb3aaa29cd9ab254b51f493cb0a592ee7 Mon Sep 17 00:00:00 2001 From: Ben Pope Date: Wed, 7 Jun 2023 15:59:52 +0100 Subject: [PATCH 5/9] features: Retire `schema_id_validation` Schema ID validation is now enabled with cluster config: `enable_schema_id_validation` Closes #11113 Signed-off-by: Ben Pope --- src/v/features/feature_table.cc | 2 -- src/v/features/feature_table.h | 7 ------- 2 files changed, 9 deletions(-) diff --git a/src/v/features/feature_table.cc b/src/v/features/feature_table.cc index 613b92b348d87..239235b4d3bba 100644 --- a/src/v/features/feature_table.cc +++ b/src/v/features/feature_table.cc @@ -69,8 +69,6 @@ std::string_view to_string_view(feature f) { return "transaction_partitioning"; case feature::force_partition_reconfiguration: return "force_partition_reconfiguration"; - case feature::schema_id_validation: - return "schema_id_validation"; case feature::raft_append_entries_serde: return "raft_append_entries_serde"; diff --git a/src/v/features/feature_table.h b/src/v/features/feature_table.h index 9978cacba2f3e..490835857a880 100644 --- a/src/v/features/feature_table.h +++ b/src/v/features/feature_table.h @@ -59,7 +59,6 @@ enum class feature : std::uint64_t { cloud_storage_manifest_format_v2 = 1ULL << 24U, transaction_partitioning = 1ULL << 25U, force_partition_reconfiguration = 1ULL << 26U, - schema_id_validation = 1ULL << 27U, raft_append_entries_serde = 1ULL << 28U, // Dummy features for testing only @@ -267,12 +266,6 @@ constexpr static std::array feature_schema{ feature::force_partition_reconfiguration, feature_spec::available_policy::always, feature_spec::prepare_policy::always}, - feature_spec{ - cluster::cluster_version{10}, - "schema_id_validation", - feature::schema_id_validation, - feature_spec::available_policy::always, - feature_spec::prepare_policy::always}, feature_spec{ cluster::cluster_version{10}, "raft_append_entries_serde", From ad5c5c542557305427bab42c558729ff14792c67 Mon Sep 17 00:00:00 2001 From: Ben Pope Date: Wed, 7 Jun 2023 20:58:46 +0100 Subject: [PATCH 6/9] kafka/server: Refactor schema ID validation topic properties Pure refactor: Move the `_compat` topic properties first to make the next refactor simpler. Signed-off-by: Ben Pope --- .../server/handlers/configs/config_utils.h | 26 ++--- .../kafka/server/handlers/describe_configs.cc | 107 +++++++++--------- 2 files changed, 67 insertions(+), 66 deletions(-) diff --git a/src/v/kafka/server/handlers/configs/config_utils.h b/src/v/kafka/server/handlers/configs/config_utils.h index 2867aba963403..920b74e7a481d 100644 --- a/src/v/kafka/server/handlers/configs/config_utils.h +++ b/src/v/kafka/server/handlers/configs/config_utils.h @@ -399,31 +399,31 @@ class schema_id_validation_config_parser { auto prop = string_switch>(cfg.name) - .match( - topic_property_record_key_schema_id_validation, - &props.record_key_schema_id_validation) .match( topic_property_record_key_schema_id_validation_compat, &props.record_key_schema_id_validation_compat) - .match( - topic_property_record_key_subject_name_strategy, - &props.record_key_subject_name_strategy) .match( topic_property_record_key_subject_name_strategy_compat, &props.record_key_subject_name_strategy_compat) - .match( - topic_property_record_value_schema_id_validation, - &props.record_value_schema_id_validation) .match( topic_property_record_value_schema_id_validation_compat, &props.record_value_schema_id_validation_compat) - .match( - topic_property_record_value_subject_name_strategy, - &props.record_value_subject_name_strategy) .match( topic_property_record_value_subject_name_strategy_compat, &props.record_value_subject_name_strategy_compat) - .default_match(std::nullopt); + .match( + topic_property_record_key_schema_id_validation, + &props.record_key_schema_id_validation) + .match( + topic_property_record_key_subject_name_strategy, + &props.record_key_subject_name_strategy) + .match( + topic_property_record_value_schema_id_validation, + &props.record_value_schema_id_validation) + .match( + topic_property_record_value_subject_name_strategy, + &props.record_value_subject_name_strategy) + .default_match(std::nullopt); if (prop.has_value()) { ss::visit( prop.value(), [&cfg, op](auto& p) { apply(*p, cfg.value, op); }); diff --git a/src/v/kafka/server/handlers/describe_configs.cc b/src/v/kafka/server/handlers/describe_configs.cc index 4f461e8061e0d..c6f4907f99095 100644 --- a/src/v/kafka/server/handlers/describe_configs.cc +++ b/src/v/kafka/server/handlers/describe_configs.cc @@ -818,19 +818,6 @@ ss::future describe_configs_handler::handle( constexpr std::string_view val_validation = "Enable validation of the schema id for values on a record"; const bool hide_default_override = true; - add_topic_config_if_requested( - resource, - result, - topic_property_record_key_schema_id_validation, - ctx.metadata_cache() - .get_default_record_key_schema_id_validation(), - topic_property_record_key_schema_id_validation, - topic_config->properties.record_key_schema_id_validation, - request.data.include_synonyms, - maybe_make_documentation( - request.data.include_documentation, key_validation), - &describe_as_string, - hide_default_override); add_topic_config_if_requested( resource, @@ -847,24 +834,6 @@ ss::future describe_configs_handler::handle( &describe_as_string, hide_default_override); - add_topic_config_if_requested( - resource, - result, - topic_property_record_key_subject_name_strategy, - ctx.metadata_cache() - .get_default_record_key_subject_name_strategy(), - topic_property_record_key_subject_name_strategy, - topic_config->properties.record_key_subject_name_strategy, - request.data.include_synonyms, - maybe_make_documentation( - request.data.include_documentation, - fmt::format( - "The subject name strategy for keys if {} is enabled", - topic_property_record_key_schema_id_validation)), - &describe_as_string< - pandaproxy::schema_registry::subject_name_strategy>, - hide_default_override); - add_topic_config_if_requested( resource, result, @@ -888,11 +857,12 @@ ss::future describe_configs_handler::handle( add_topic_config_if_requested( resource, result, - topic_property_record_value_schema_id_validation, + topic_property_record_value_schema_id_validation_compat, ctx.metadata_cache() .get_default_record_value_schema_id_validation(), - topic_property_record_value_schema_id_validation, - topic_config->properties.record_value_schema_id_validation, + topic_property_record_value_schema_id_validation_compat, + topic_config->properties + .record_value_schema_id_validation_compat, request.data.include_synonyms, maybe_make_documentation( request.data.include_documentation, val_validation), @@ -902,32 +872,51 @@ ss::future describe_configs_handler::handle( add_topic_config_if_requested( resource, result, - topic_property_record_value_schema_id_validation_compat, + topic_property_record_value_subject_name_strategy_compat, ctx.metadata_cache() - .get_default_record_value_schema_id_validation(), - topic_property_record_value_schema_id_validation_compat, + .get_default_record_value_subject_name_strategy(), + topic_property_record_value_subject_name_strategy_compat, topic_config->properties - .record_value_schema_id_validation_compat, + .record_value_subject_name_strategy_compat, request.data.include_synonyms, maybe_make_documentation( - request.data.include_documentation, val_validation), + request.data.include_documentation, + fmt::format( + "The subject name strategy for values if {} is enabled", + topic_property_record_value_schema_id_validation_compat)), + [](auto sns) { + return ss::sstring(to_string_view_compat(sns)); + }, + hide_default_override); + + add_topic_config_if_requested( + resource, + result, + topic_property_record_key_schema_id_validation, + ctx.metadata_cache() + .get_default_record_key_schema_id_validation(), + topic_property_record_key_schema_id_validation, + topic_config->properties.record_key_schema_id_validation, + request.data.include_synonyms, + maybe_make_documentation( + request.data.include_documentation, key_validation), &describe_as_string, hide_default_override); add_topic_config_if_requested( resource, result, - topic_property_record_value_subject_name_strategy, + topic_property_record_key_subject_name_strategy, ctx.metadata_cache() - .get_default_record_value_subject_name_strategy(), - topic_property_record_value_subject_name_strategy, - topic_config->properties.record_value_subject_name_strategy, + .get_default_record_key_subject_name_strategy(), + topic_property_record_key_subject_name_strategy, + topic_config->properties.record_key_subject_name_strategy, request.data.include_synonyms, maybe_make_documentation( request.data.include_documentation, fmt::format( - "The subject name strategy for values if {} is enabled", - topic_property_record_value_schema_id_validation)), + "The subject name strategy for keys if {} is enabled", + topic_property_record_key_schema_id_validation)), &describe_as_string< pandaproxy::schema_registry::subject_name_strategy>, hide_default_override); @@ -935,21 +924,33 @@ ss::future describe_configs_handler::handle( add_topic_config_if_requested( resource, result, - topic_property_record_value_subject_name_strategy_compat, + topic_property_record_value_schema_id_validation, + ctx.metadata_cache() + .get_default_record_value_schema_id_validation(), + topic_property_record_value_schema_id_validation, + topic_config->properties.record_value_schema_id_validation, + request.data.include_synonyms, + maybe_make_documentation( + request.data.include_documentation, val_validation), + &describe_as_string, + hide_default_override); + + add_topic_config_if_requested( + resource, + result, + topic_property_record_value_subject_name_strategy, ctx.metadata_cache() .get_default_record_value_subject_name_strategy(), - topic_property_record_value_subject_name_strategy_compat, - topic_config->properties - .record_value_subject_name_strategy_compat, + topic_property_record_value_subject_name_strategy, + topic_config->properties.record_value_subject_name_strategy, request.data.include_synonyms, maybe_make_documentation( request.data.include_documentation, fmt::format( "The subject name strategy for values if {} is enabled", - topic_property_record_value_schema_id_validation_compat)), - [](auto sns) { - return ss::sstring(to_string_view_compat(sns)); - }, + topic_property_record_value_schema_id_validation)), + &describe_as_string< + pandaproxy::schema_registry::subject_name_strategy>, hide_default_override); } From 3eaa014e1aabd5c72fc1d1211dd0ac74e3788144 Mon Sep 17 00:00:00 2001 From: Ben Pope Date: Wed, 7 Jun 2023 21:48:05 +0100 Subject: [PATCH 7/9] kafka: Support schema_id_validation_mode::redpanda `schema_id_validation_mode::redpanda` is a subset of `compat`, that does not allow the compat topic properties in topic `create`, `alter`, and `incremental_alter`, and doesn't show them during `describe`. Signed-off-by: Ben Pope --- .../server/handlers/configs/config_utils.h | 20 ++++++--- .../kafka/server/handlers/describe_configs.cc | 43 +++++++++++-------- 2 files changed, 40 insertions(+), 23 deletions(-) diff --git a/src/v/kafka/server/handlers/configs/config_utils.h b/src/v/kafka/server/handlers/configs/config_utils.h index 920b74e7a481d..1950a78e5a310 100644 --- a/src/v/kafka/server/handlers/configs/config_utils.h +++ b/src/v/kafka/server/handlers/configs/config_utils.h @@ -19,6 +19,7 @@ #include "kafka/server/handlers/topics/types.h" #include "kafka/server/request_context.h" #include "kafka/types.h" +#include "model/schema_id_validation.h" #include "outcome.h" #include "pandaproxy/schema_registry/subject_name_strategy.h" #include "security/acl.h" @@ -397,8 +398,10 @@ class schema_id_validation_config_parser { decltype(&props.record_key_schema_id_validation), decltype(&props.record_key_subject_name_strategy)>; - auto prop - = string_switch>(cfg.name) + auto matcher = string_switch>(cfg.name); + switch (config::shard_local_cfg().enable_schema_id_validation()) { + case model::schema_id_validation_mode::compat: + matcher .match( topic_property_record_key_schema_id_validation_compat, &props.record_key_schema_id_validation_compat) @@ -410,7 +413,10 @@ class schema_id_validation_config_parser { &props.record_value_schema_id_validation_compat) .match( topic_property_record_value_subject_name_strategy_compat, - &props.record_value_subject_name_strategy_compat) + &props.record_value_subject_name_strategy_compat); + [[fallthrough]]; + case model::schema_id_validation_mode::redpanda: + matcher .match( topic_property_record_key_schema_id_validation, &props.record_key_schema_id_validation) @@ -422,8 +428,12 @@ class schema_id_validation_config_parser { &props.record_value_schema_id_validation) .match( topic_property_record_value_subject_name_strategy, - &props.record_value_subject_name_strategy) - .default_match(std::nullopt); + &props.record_value_subject_name_strategy); + [[fallthrough]]; + case model::schema_id_validation_mode::none: + break; + } + auto prop = matcher.default_match(std::nullopt); if (prop.has_value()) { ss::visit( prop.value(), [&cfg, op](auto& p) { apply(*p, cfg.value, op); }); diff --git a/src/v/kafka/server/handlers/describe_configs.cc b/src/v/kafka/server/handlers/describe_configs.cc index c6f4907f99095..02001608b3594 100644 --- a/src/v/kafka/server/handlers/describe_configs.cc +++ b/src/v/kafka/server/handlers/describe_configs.cc @@ -810,15 +810,15 @@ ss::future describe_configs_handler::handle( request.data.include_documentation, config::shard_local_cfg().log_segment_ms.desc())); - if ( - config::shard_local_cfg().enable_schema_id_validation() - != pandaproxy::schema_registry::schema_id_validation_mode::none) { - constexpr std::string_view key_validation - = "Enable validation of the schema id for keys on a record"; - constexpr std::string_view val_validation - = "Enable validation of the schema id for values on a record"; - const bool hide_default_override = true; - + constexpr std::string_view key_validation + = "Enable validation of the schema id for keys on a record"; + constexpr std::string_view val_validation + = "Enable validation of the schema id for values on a record"; + constexpr bool validation_hide_default_override = true; + + switch (config::shard_local_cfg().enable_schema_id_validation()) { + case pandaproxy::schema_registry::schema_id_validation_mode:: + compat: { add_topic_config_if_requested( resource, result, @@ -832,7 +832,7 @@ ss::future describe_configs_handler::handle( maybe_make_documentation( request.data.include_documentation, key_validation), &describe_as_string, - hide_default_override); + validation_hide_default_override); add_topic_config_if_requested( resource, @@ -852,7 +852,7 @@ ss::future describe_configs_handler::handle( [](auto sns) { return ss::sstring(to_string_view_compat(sns)); }, - hide_default_override); + validation_hide_default_override); add_topic_config_if_requested( resource, @@ -867,7 +867,7 @@ ss::future describe_configs_handler::handle( maybe_make_documentation( request.data.include_documentation, val_validation), &describe_as_string, - hide_default_override); + validation_hide_default_override); add_topic_config_if_requested( resource, @@ -887,8 +887,11 @@ ss::future describe_configs_handler::handle( [](auto sns) { return ss::sstring(to_string_view_compat(sns)); }, - hide_default_override); - + validation_hide_default_override); + [[fallthrough]]; + } + case pandaproxy::schema_registry::schema_id_validation_mode:: + redpanda: { add_topic_config_if_requested( resource, result, @@ -901,7 +904,7 @@ ss::future describe_configs_handler::handle( maybe_make_documentation( request.data.include_documentation, key_validation), &describe_as_string, - hide_default_override); + validation_hide_default_override); add_topic_config_if_requested( resource, @@ -919,7 +922,7 @@ ss::future describe_configs_handler::handle( topic_property_record_key_schema_id_validation)), &describe_as_string< pandaproxy::schema_registry::subject_name_strategy>, - hide_default_override); + validation_hide_default_override); add_topic_config_if_requested( resource, @@ -933,7 +936,7 @@ ss::future describe_configs_handler::handle( maybe_make_documentation( request.data.include_documentation, val_validation), &describe_as_string, - hide_default_override); + validation_hide_default_override); add_topic_config_if_requested( resource, @@ -951,7 +954,11 @@ ss::future describe_configs_handler::handle( topic_property_record_value_schema_id_validation)), &describe_as_string< pandaproxy::schema_registry::subject_name_strategy>, - hide_default_override); + validation_hide_default_override); + } + case pandaproxy::schema_registry::schema_id_validation_mode::none: { + break; + } } break; From dcc685a928753b00604f4f17f105d1c826df5fca Mon Sep 17 00:00:00 2001 From: Ben Pope Date: Wed, 7 Jun 2023 23:10:10 +0100 Subject: [PATCH 8/9] schema_registry/validation: Support schema_id_validation_mode::redpanda Signed-off-by: Ben Pope --- .../server/handlers/configs/config_utils.h | 8 +- .../pandaproxy/schema_registry/validation.cc | 89 ++++++++++++------- src/v/pandaproxy/schema_registry/validation.h | 4 +- 3 files changed, 64 insertions(+), 37 deletions(-) diff --git a/src/v/kafka/server/handlers/configs/config_utils.h b/src/v/kafka/server/handlers/configs/config_utils.h index 1950a78e5a310..49bca18331b7b 100644 --- a/src/v/kafka/server/handlers/configs/config_utils.h +++ b/src/v/kafka/server/handlers/configs/config_utils.h @@ -19,8 +19,8 @@ #include "kafka/server/handlers/topics/types.h" #include "kafka/server/request_context.h" #include "kafka/types.h" -#include "model/schema_id_validation.h" #include "outcome.h" +#include "pandaproxy/schema_registry/schema_id_validation.h" #include "pandaproxy/schema_registry/subject_name_strategy.h" #include "security/acl.h" @@ -400,7 +400,7 @@ class schema_id_validation_config_parser { auto matcher = string_switch>(cfg.name); switch (config::shard_local_cfg().enable_schema_id_validation()) { - case model::schema_id_validation_mode::compat: + case pandaproxy::schema_registry::schema_id_validation_mode::compat: matcher .match( topic_property_record_key_schema_id_validation_compat, @@ -415,7 +415,7 @@ class schema_id_validation_config_parser { topic_property_record_value_subject_name_strategy_compat, &props.record_value_subject_name_strategy_compat); [[fallthrough]]; - case model::schema_id_validation_mode::redpanda: + case pandaproxy::schema_registry::schema_id_validation_mode::redpanda: matcher .match( topic_property_record_key_schema_id_validation, @@ -430,7 +430,7 @@ class schema_id_validation_config_parser { topic_property_record_value_subject_name_strategy, &props.record_value_subject_name_strategy); [[fallthrough]]; - case model::schema_id_validation_mode::none: + case pandaproxy::schema_registry::schema_id_validation_mode::none: break; } auto prop = matcher.default_match(std::nullopt); diff --git a/src/v/pandaproxy/schema_registry/validation.cc b/src/v/pandaproxy/schema_registry/validation.cc index eb3f40c49fb52..d5ef90881c653 100644 --- a/src/v/pandaproxy/schema_registry/validation.cc +++ b/src/v/pandaproxy/schema_registry/validation.cc @@ -139,6 +139,22 @@ ss::future> get_record_name( co_return std::nullopt; } +template +T combine( + pandaproxy::schema_registry::schema_id_validation_mode mode, + std::optional const& redpanda, + std::optional const& compat, + T dflt) { + switch (mode) { + case pandaproxy::schema_registry::schema_id_validation_mode::none: + return dflt; + case pandaproxy::schema_registry::schema_id_validation_mode::redpanda: + return redpanda.value_or(dflt); + case pandaproxy::schema_registry::schema_id_validation_mode::compat: + return redpanda.value_or(compat.value_or(dflt)); + } +} + } // namespace class schema_id_validator::impl { @@ -150,31 +166,30 @@ class schema_id_validator::impl { impl( const std::unique_ptr& api, model::topic topic, - const cluster::topic_properties& props) + const cluster::topic_properties& props, + pandaproxy::schema_registry::schema_id_validation_mode mode) : _api{api} , _topic{std::move(topic)} - , _record_key_schema_id_validation{props.record_key_schema_id_validation - .value_or( - props - .record_key_schema_id_validation_compat - .value_or(false))} - , _record_key_subject_name_strategy{props.record_key_subject_name_strategy - .value_or( - props - .record_key_subject_name_strategy_compat - .value_or( - subject_name_strategy:: - topic_name))} - , _record_value_schema_id_validation{props - .record_value_schema_id_validation - .value_or( - props - .record_value_schema_id_validation_compat - .value_or(false))} - , _record_value_subject_name_strategy{ - props.record_value_subject_name_strategy.value_or( - props.record_value_subject_name_strategy_compat.value_or( - subject_name_strategy::topic_name))} {} + , _record_key_schema_id_validation{combine( + mode, + props.record_key_schema_id_validation, + props.record_key_schema_id_validation_compat, + false)} + , _record_key_subject_name_strategy{combine( + mode, + props.record_key_subject_name_strategy, + props.record_key_subject_name_strategy_compat, + subject_name_strategy::topic_name)} + , _record_value_schema_id_validation{combine( + mode, + props.record_value_schema_id_validation, + props.record_value_schema_id_validation_compat, + false)} + , _record_value_subject_name_strategy{combine( + mode, + props.record_value_subject_name_strategy, + props.record_value_subject_name_strategy_compat, + subject_name_strategy::topic_name)} {} auto validate_field( field field, model::topic topic, subject_name_strategy sns, iobuf buf) @@ -414,26 +429,36 @@ class schema_id_validator::impl { schema_id_validator::schema_id_validator( const std::unique_ptr& api, const model::topic& topic, - const cluster::topic_properties& props) - : _impl{std::make_unique(api, topic, props)} {} + const cluster::topic_properties& props, + pandaproxy::schema_registry::schema_id_validation_mode mode) + : _impl{std::make_unique(api, topic, props, mode)} {} schema_id_validator::schema_id_validator(schema_id_validator&&) noexcept = default; schema_id_validator::~schema_id_validator() noexcept = default; -bool should_validate_schema_id(const cluster::topic_properties& props) { - return props.record_key_schema_id_validation.value_or( - props.record_key_schema_id_validation_compat.value_or(false)) - || props.record_value_schema_id_validation.value_or( - props.record_value_schema_id_validation_compat.value_or(false)); +bool should_validate_schema_id( + const cluster::topic_properties& props, + pandaproxy::schema_registry::schema_id_validation_mode mode) { + return combine( + mode, + props.record_key_schema_id_validation, + props.record_key_schema_id_validation_compat, + false) + || combine( + mode, + props.record_value_schema_id_validation, + props.record_value_schema_id_validation_compat, + false); } std::optional maybe_make_schema_id_validator( const std::unique_ptr& api, const model::topic& topic, const cluster::topic_properties& props) { - return api != nullptr && should_validate_schema_id(props) - ? std::make_optional(api, topic, props) + auto mode = config::shard_local_cfg().enable_schema_id_validation(); + return api != nullptr && should_validate_schema_id(props, mode) + ? std::make_optional(api, topic, props, mode) : std::nullopt; } diff --git a/src/v/pandaproxy/schema_registry/validation.h b/src/v/pandaproxy/schema_registry/validation.h index 201bcfe282d35..fc5dc985ecc14 100644 --- a/src/v/pandaproxy/schema_registry/validation.h +++ b/src/v/pandaproxy/schema_registry/validation.h @@ -16,6 +16,7 @@ #include "model/record_batch_reader.h" #include "outcome.h" #include "pandaproxy/schema_registry/api.h" +#include "pandaproxy/schema_registry/schema_id_validation.h" #include "seastarx.h" #include @@ -28,7 +29,8 @@ class schema_id_validator { schema_id_validator( const std::unique_ptr& api, const model::topic& topic, - const cluster::topic_properties& props); + const cluster::topic_properties& props, + pandaproxy::schema_registry::schema_id_validation_mode mode); schema_id_validator(schema_id_validator&&) noexcept; schema_id_validator(const schema_id_validator&) = delete; schema_id_validator& operator=(schema_id_validator&&) = delete; From 59a3f88569b1e013d00194ac7d7468161e4fc386 Mon Sep 17 00:00:00 2001 From: Ben Pope Date: Thu, 8 Jun 2023 12:48:00 +0100 Subject: [PATCH 9/9] schema_registry/dt: Refactor property tests Simplify the tests by extracting out _get_topic_properties. Signed-off-by: Ben Pope --- tests/rptest/tests/schema_registry_test.py | 206 +++++++++------------ 1 file changed, 88 insertions(+), 118 deletions(-) diff --git a/tests/rptest/tests/schema_registry_test.py b/tests/rptest/tests/schema_registry_test.py index ff29566637196..037c3e8ae2fac 100644 --- a/tests/rptest/tests/schema_registry_test.py +++ b/tests/rptest/tests/schema_registry_test.py @@ -2168,8 +2168,55 @@ def __init__(self, *args, **kwargs): self.rpk = RpkTool(self.redpanda) self.admin = Admin(self.redpanda) - @cluster(num_nodes=3) - def test_schema_id_validation_disabled_config(self): + def _get_topic_properties(self, mode: Optional[SchemaIdValidationMode], + enable: Optional[bool], + strategy: TopicSpec.SubjectNameStrategy): + enable_str = f"{enable}".lower() + config = {} + if mode == SchemaIdValidationMode.REDPANDA: + if enable is not None: + config.update({ + TopicSpec.PROPERTY_RECORD_KEY_SCHEMA_ID_VALIDATION: + enable_str, + TopicSpec.PROPERTY_RECORD_VALUE_SCHEMA_ID_VALIDATION: + enable_str, + }) + if strategy is not None: + config.update({ + TopicSpec.PROPERTY_RECORD_KEY_SUBJECT_NAME_STRATEGY: + strategy.value, + TopicSpec.PROPERTY_RECORD_VALUE_SUBJECT_NAME_STRATEGY: + strategy.value, + }) + + if mode == SchemaIdValidationMode.COMPAT: + if enable is not None: + config.update({ + TopicSpec.PROPERTY_RECORD_KEY_SCHEMA_ID_VALIDATION_COMPAT: + enable_str, + TopicSpec.PROPERTY_RECORD_VALUE_SCHEMA_ID_VALIDATION_COMPAT: + enable_str, + }) + if strategy is not None: + if strategy == TopicSpec.SubjectNameStrategy.TOPIC_NAME: + strategy_compat = TopicSpec.SubjectNameStrategyCompat.TOPIC_NAME + elif strategy == TopicSpec.SubjectNameStrategy.RECORD_NAME: + strategy_compat = TopicSpec.SubjectNameStrategyCompat.RECORD_NAME + elif strategy == TopicSpec.SubjectNameStrategy.TOPIC_RECORD_NAME: + strategy_compat = TopicSpec.SubjectNameStrategyCompat.TOPIC_RECORD_NAME + + config.update({ + TopicSpec.PROPERTY_RECORD_KEY_SUBJECT_NAME_STRATEGY_COMPAT: + strategy_compat.value, + TopicSpec.PROPERTY_RECORD_VALUE_SUBJECT_NAME_STRATEGY_COMPAT: + strategy_compat.value, + }) + return config + + @cluster(num_nodes=1) + @parametrize(mode=SchemaIdValidationMode.REDPANDA) + @parametrize(mode=SchemaIdValidationMode.COMPAT) + def test_schema_id_validation_disabled_config(self, mode): ''' When the feature is disabled, the configs should not appear ''' @@ -2181,153 +2228,76 @@ def test_schema_id_validation_disabled_config(self): self.rpk.create_topic(topic) desc = self.rpk.describe_topic_configs(topic) - assert TopicSpec.PROPERTY_RECORD_KEY_SCHEMA_ID_VALIDATION not in desc - assert TopicSpec.PROPERTY_RECORD_KEY_SUBJECT_NAME_STRATEGY not in desc - assert TopicSpec.PROPERTY_RECORD_VALUE_SCHEMA_ID_VALIDATION not in desc - assert TopicSpec.PROPERTY_RECORD_VALUE_SUBJECT_NAME_STRATEGY not in desc + all_config = self._get_topic_properties( + mode, False, TopicSpec.SubjectNameStrategy.TOPIC_NAME) - @cluster(num_nodes=3) - def test_schema_id_validation_active_config(self): + for k in all_config.items(): + assert k not in desc + + @cluster(num_nodes=1) + @parametrize(mode=SchemaIdValidationMode.REDPANDA) + @parametrize(mode=SchemaIdValidationMode.COMPAT) + def test_schema_id_validation_active_config(self, mode): ''' When the feature is active, the configs should be default ''' + self.redpanda.set_cluster_config({'enable_schema_id_validation': mode}) + topic = "default-topic" self.rpk.create_topic(topic) desc = self.rpk.describe_topic_configs(topic) - assert desc[TopicSpec.PROPERTY_RECORD_KEY_SCHEMA_ID_VALIDATION] == ( - 'false', 'DEFAULT_CONFIG') - assert desc[TopicSpec.PROPERTY_RECORD_KEY_SUBJECT_NAME_STRATEGY] == ( - TopicSpec.SubjectNameStrategy.TOPIC_NAME.value, 'DEFAULT_CONFIG') - assert desc[TopicSpec.PROPERTY_RECORD_VALUE_SCHEMA_ID_VALIDATION] == ( - 'false', 'DEFAULT_CONFIG') - assert desc[TopicSpec.PROPERTY_RECORD_VALUE_SUBJECT_NAME_STRATEGY] == ( - TopicSpec.SubjectNameStrategy.TOPIC_NAME.value, 'DEFAULT_CONFIG') + config = self._get_topic_properties( + mode, False, TopicSpec.SubjectNameStrategy.TOPIC_NAME) - @cluster(num_nodes=3) - def test_schema_id_validation_active_explicit_default_config(self): + for k, v in config.items(): + assert desc[k] == (v, 'DEFAULT_CONFIG') + + @cluster(num_nodes=1) + @parametrize(mode=SchemaIdValidationMode.REDPANDA) + @parametrize(mode=SchemaIdValidationMode.COMPAT) + def test_schema_id_validation_active_explicit_default_config(self, mode): ''' If the configuration is explicitly set to default, pretend it isn't dynamic, so that tools with a reconcialiation loop aren't confused ''' + self.redpanda.set_cluster_config({'enable_schema_id_validation': mode}) + topic = "default-topic" - self.rpk.create_topic( - topic, - config={ - TopicSpec.PROPERTY_RECORD_KEY_SCHEMA_ID_VALIDATION: - 'false', - TopicSpec.PROPERTY_RECORD_KEY_SUBJECT_NAME_STRATEGY: - TopicSpec.SubjectNameStrategy.TOPIC_NAME.value, - TopicSpec.PROPERTY_RECORD_VALUE_SCHEMA_ID_VALIDATION: - 'false', - TopicSpec.PROPERTY_RECORD_VALUE_SUBJECT_NAME_STRATEGY: - TopicSpec.SubjectNameStrategy.TOPIC_NAME.value, - TopicSpec.PROPERTY_RECORD_KEY_SCHEMA_ID_VALIDATION_COMPAT: - 'false', - TopicSpec.PROPERTY_RECORD_KEY_SUBJECT_NAME_STRATEGY_COMPAT: - TopicSpec.SubjectNameStrategyCompat.TOPIC_NAME.value, - TopicSpec.PROPERTY_RECORD_VALUE_SCHEMA_ID_VALIDATION_COMPAT: - 'false', - TopicSpec.PROPERTY_RECORD_VALUE_SUBJECT_NAME_STRATEGY_COMPAT: - TopicSpec.SubjectNameStrategyCompat.TOPIC_NAME.value, - }) + + config = self._get_topic_properties( + mode, False, TopicSpec.SubjectNameStrategy.TOPIC_NAME) + + self.rpk.create_topic(topic, config=config) desc = self.rpk.describe_topic_configs(topic) - assert desc[TopicSpec.PROPERTY_RECORD_KEY_SCHEMA_ID_VALIDATION] == ( - 'false', 'DEFAULT_CONFIG') - assert desc[TopicSpec.PROPERTY_RECORD_KEY_SUBJECT_NAME_STRATEGY] == ( - TopicSpec.SubjectNameStrategy.TOPIC_NAME.value, 'DEFAULT_CONFIG') - assert desc[TopicSpec.PROPERTY_RECORD_VALUE_SCHEMA_ID_VALIDATION] == ( - 'false', 'DEFAULT_CONFIG') - assert desc[TopicSpec.PROPERTY_RECORD_VALUE_SUBJECT_NAME_STRATEGY] == ( - TopicSpec.SubjectNameStrategy.TOPIC_NAME.value, 'DEFAULT_CONFIG') - - assert desc[ - TopicSpec.PROPERTY_RECORD_KEY_SCHEMA_ID_VALIDATION_COMPAT] == ( - 'false', 'DEFAULT_CONFIG') - assert desc[ - TopicSpec.PROPERTY_RECORD_KEY_SUBJECT_NAME_STRATEGY_COMPAT] == ( - TopicSpec.SubjectNameStrategyCompat.TOPIC_NAME.value, - 'DEFAULT_CONFIG') - assert desc[ - TopicSpec.PROPERTY_RECORD_VALUE_SCHEMA_ID_VALIDATION_COMPAT] == ( - 'false', 'DEFAULT_CONFIG') - assert desc[ - TopicSpec.PROPERTY_RECORD_VALUE_SUBJECT_NAME_STRATEGY_COMPAT] == ( - TopicSpec.SubjectNameStrategyCompat.TOPIC_NAME.value, - 'DEFAULT_CONFIG') + for k, v in config.items(): + assert desc[k] == (v, 'DEFAULT_CONFIG') @cluster(num_nodes=1) - def test_schema_id_validation_active_nondefault_config(self): + @parametrize(mode=SchemaIdValidationMode.REDPANDA) + @parametrize(mode=SchemaIdValidationMode.COMPAT) + def test_schema_id_validation_active_nondefault_config(self, mode): ''' If the configuration is explicitly set to non-default, it should show as dyamic ''' + self.redpanda.set_cluster_config({'enable_schema_id_validation': mode}) + topic = "default-topic" - self.rpk.create_topic( - topic, - config={ - TopicSpec.PROPERTY_RECORD_KEY_SCHEMA_ID_VALIDATION: - 'true', - TopicSpec.PROPERTY_RECORD_KEY_SUBJECT_NAME_STRATEGY: - TopicSpec.SubjectNameStrategy.TOPIC_RECORD_NAME.value, - TopicSpec.PROPERTY_RECORD_VALUE_SCHEMA_ID_VALIDATION: - 'true', - TopicSpec.PROPERTY_RECORD_VALUE_SUBJECT_NAME_STRATEGY: - TopicSpec.SubjectNameStrategy.RECORD_NAME.value, - }) - desc = self.rpk.describe_topic_configs(topic) - assert desc[TopicSpec.PROPERTY_RECORD_KEY_SCHEMA_ID_VALIDATION] == ( - 'true', 'DYNAMIC_TOPIC_CONFIG') - assert desc[TopicSpec.PROPERTY_RECORD_KEY_SUBJECT_NAME_STRATEGY] == ( - TopicSpec.SubjectNameStrategy.TOPIC_RECORD_NAME.value, - 'DYNAMIC_TOPIC_CONFIG') - assert desc[TopicSpec.PROPERTY_RECORD_VALUE_SCHEMA_ID_VALIDATION] == ( - 'true', 'DYNAMIC_TOPIC_CONFIG') - assert desc[TopicSpec.PROPERTY_RECORD_VALUE_SUBJECT_NAME_STRATEGY] == ( - TopicSpec.SubjectNameStrategy.RECORD_NAME.value, - 'DYNAMIC_TOPIC_CONFIG') + config = self._get_topic_properties( + mode, True, TopicSpec.SubjectNameStrategy.RECORD_NAME) - @cluster(num_nodes=1) - def test_schema_id_validation_active_nondefault_compat_config(self): - ''' - If the configuration is explicitly set to non-default, it should show - as dyamic - ''' + self.rpk.create_topic(topic, config=config) - topic = "default-topic" - self.rpk.create_topic( - topic, - config={ - TopicSpec.PROPERTY_RECORD_KEY_SCHEMA_ID_VALIDATION_COMPAT: - 'true', - TopicSpec.PROPERTY_RECORD_KEY_SUBJECT_NAME_STRATEGY_COMPAT: - TopicSpec.SubjectNameStrategyCompat.TOPIC_RECORD_NAME.value, - TopicSpec.PROPERTY_RECORD_VALUE_SCHEMA_ID_VALIDATION_COMPAT: - 'true', - TopicSpec.PROPERTY_RECORD_VALUE_SUBJECT_NAME_STRATEGY_COMPAT: - TopicSpec.SubjectNameStrategyCompat.RECORD_NAME.value, - }) desc = self.rpk.describe_topic_configs(topic) - assert desc[ - TopicSpec.PROPERTY_RECORD_KEY_SCHEMA_ID_VALIDATION_COMPAT] == ( - 'true', 'DYNAMIC_TOPIC_CONFIG') - assert desc[ - TopicSpec.PROPERTY_RECORD_KEY_SUBJECT_NAME_STRATEGY_COMPAT] == ( - TopicSpec.SubjectNameStrategyCompat.TOPIC_RECORD_NAME.value, - 'DYNAMIC_TOPIC_CONFIG') - assert desc[ - TopicSpec.PROPERTY_RECORD_VALUE_SCHEMA_ID_VALIDATION_COMPAT] == ( - 'true', 'DYNAMIC_TOPIC_CONFIG') - assert desc[ - TopicSpec.PROPERTY_RECORD_VALUE_SUBJECT_NAME_STRATEGY_COMPAT] == ( - TopicSpec.SubjectNameStrategyCompat.RECORD_NAME.value, - 'DYNAMIC_TOPIC_CONFIG') + for k, v in config.items(): + assert desc[k] == (v, 'DYNAMIC_TOPIC_CONFIG') @cluster(num_nodes=1) def test_schema_id_validation_create_collision(self):