From 0215be8d7200cbf7c25a405305644d68b2856b1b Mon Sep 17 00:00:00 2001 From: Willem Kaufmann Date: Wed, 4 Sep 2024 13:09:21 -0400 Subject: [PATCH 1/2] `cluster`: don't apply invalid property change in `topic_table` We check that the updated topic properties post `topic_table::apply(update_topic_properties_cmd)` is valid with the `schema_id_validation_validator`. However, this early `co_return` with an error code does not actually prevent the invalid configuration from being persisted in the `topic_table`. Rework the logic in `topic_table::apply()` to prevent persisting of an invalid configuration. (cherry picked from commit 3a36f7a0fb7f30364d0c0bd5060e324de0718c73) --- src/v/cluster/topic_table.cc | 73 ++++++++++++++++++++---------------- 1 file changed, 41 insertions(+), 32 deletions(-) diff --git a/src/v/cluster/topic_table.cc b/src/v/cluster/topic_table.cc index 878eef885225..14642064dc44 100644 --- a/src/v/cluster/topic_table.cc +++ b/src/v/cluster/topic_table.cc @@ -844,79 +844,88 @@ topic_table::apply(update_topic_properties_cmd cmd, model::offset o) { if (tp == _topics.end()) { co_return make_error_code(errc::topic_not_exists); } - auto& properties = tp->second.get_configuration().properties; - auto properties_snapshot = properties; + auto updated_properties = tp->second.get_configuration().properties; auto& overrides = cmd.value; /** * Update topic properties */ incremental_update( - properties.cleanup_policy_bitflags, overrides.cleanup_policy_bitflags); + updated_properties.cleanup_policy_bitflags, + overrides.cleanup_policy_bitflags); incremental_update( - properties.compaction_strategy, overrides.compaction_strategy); - incremental_update(properties.compression, overrides.compression); - incremental_update(properties.retention_bytes, overrides.retention_bytes); + updated_properties.compaction_strategy, overrides.compaction_strategy); + incremental_update(updated_properties.compression, overrides.compression); incremental_update( - properties.retention_duration, overrides.retention_duration); - incremental_update(properties.segment_size, overrides.segment_size); - incremental_update(properties.timestamp_type, overrides.timestamp_type); - - incremental_update(properties.shadow_indexing, overrides.shadow_indexing); - incremental_update(properties.batch_max_bytes, overrides.batch_max_bytes); - + updated_properties.retention_bytes, overrides.retention_bytes); + incremental_update( + updated_properties.retention_duration, overrides.retention_duration); + incremental_update(updated_properties.segment_size, overrides.segment_size); + incremental_update( + updated_properties.timestamp_type, overrides.timestamp_type); + incremental_update( + updated_properties.shadow_indexing, overrides.shadow_indexing); incremental_update( - properties.retention_local_target_bytes, + updated_properties.batch_max_bytes, overrides.batch_max_bytes); + incremental_update( + updated_properties.retention_local_target_bytes, overrides.retention_local_target_bytes); incremental_update( - properties.retention_local_target_ms, + updated_properties.retention_local_target_ms, overrides.retention_local_target_ms); incremental_update( - properties.remote_delete, + updated_properties.remote_delete, overrides.remote_delete, storage::ntp_config::default_remote_delete); - incremental_update(properties.segment_ms, overrides.segment_ms); + incremental_update(updated_properties.segment_ms, overrides.segment_ms); incremental_update( - properties.record_key_schema_id_validation, + updated_properties.record_key_schema_id_validation, overrides.record_key_schema_id_validation); incremental_update( - properties.record_key_schema_id_validation_compat, + updated_properties.record_key_schema_id_validation_compat, overrides.record_key_schema_id_validation_compat); incremental_update( - properties.record_key_subject_name_strategy, + updated_properties.record_key_subject_name_strategy, overrides.record_key_subject_name_strategy); incremental_update( - properties.record_key_subject_name_strategy_compat, + updated_properties.record_key_subject_name_strategy_compat, overrides.record_key_subject_name_strategy_compat); incremental_update( - properties.record_value_schema_id_validation, + updated_properties.record_value_schema_id_validation, overrides.record_value_schema_id_validation); incremental_update( - properties.record_value_schema_id_validation_compat, + updated_properties.record_value_schema_id_validation_compat, overrides.record_value_schema_id_validation_compat); incremental_update( - properties.record_value_subject_name_strategy, + updated_properties.record_value_subject_name_strategy, overrides.record_value_subject_name_strategy); incremental_update( - properties.record_value_subject_name_strategy_compat, + updated_properties.record_value_subject_name_strategy_compat, overrides.record_value_subject_name_strategy_compat); incremental_update( - properties.initial_retention_local_target_bytes, + updated_properties.initial_retention_local_target_bytes, overrides.initial_retention_local_target_bytes); incremental_update( - properties.initial_retention_local_target_ms, + updated_properties.initial_retention_local_target_ms, overrides.initial_retention_local_target_ms); - incremental_update(properties.write_caching, overrides.write_caching); - incremental_update(properties.flush_ms, overrides.flush_ms); - incremental_update(properties.flush_bytes, overrides.flush_bytes); + incremental_update( + updated_properties.write_caching, overrides.write_caching); + incremental_update(updated_properties.flush_ms, overrides.flush_ms); + incremental_update(updated_properties.flush_bytes, overrides.flush_bytes); + + auto& properties = tp->second.get_configuration().properties; + // no configuration change, no need to generate delta - if (properties == properties_snapshot) { + if (updated_properties == properties) { co_return errc::success; } - if (!schema_id_validation_validator::is_valid(properties)) { + if (!schema_id_validation_validator::is_valid(updated_properties)) { co_return schema_id_validation_validator::ec; } + // Apply the changes + properties = std::move(updated_properties); + // generate deltas for controller backend const auto& assignments = tp->second.get_assignments(); for (auto& p_as : assignments) { From 1203b065de2c16bfb9d3b2a02e10982c51b845cb Mon Sep 17 00:00:00 2001 From: Willem Kaufmann Date: Wed, 4 Sep 2024 14:11:33 -0400 Subject: [PATCH 2/2] `cluster`: add check to `topic_table_test` To ensure updates made from invalid `update_topic_properties_cmd` are not persisted in `topic_properties`. (cherry picked from commit 6da3157c2892f5d79c88f1823606018e747579da) --- src/v/cluster/tests/topic_table_test.cc | 28 +++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/src/v/cluster/tests/topic_table_test.cc b/src/v/cluster/tests/topic_table_test.cc index 768055a3be8c..97ead40b15bd 100644 --- a/src/v/cluster/tests/topic_table_test.cc +++ b/src/v/cluster/tests/topic_table_test.cc @@ -519,6 +519,34 @@ FIXTURE_TEST(test_topic_with_schema_id_validation_ops, topic_table_fixture) { cfg = topics.get_topic_cfg(tp_ns); BOOST_REQUIRE(cfg.has_value()); BOOST_REQUIRE(!cfg->properties.record_key_schema_id_validation.has_value()); + + // Ensure that an invalid update cmd does not get persisted in the topic + // table. + // Sanity check before starting. + BOOST_REQUIRE(!cfg->properties.record_key_schema_id_validation.has_value()); + BOOST_REQUIRE( + !cfg->properties.record_key_schema_id_validation_compat.has_value()); + + update.record_key_schema_id_validation.op + = cluster::incremental_update_operation::set; + update.record_key_schema_id_validation.value.emplace(true); + + update.record_key_schema_id_validation_compat.op + = cluster::incremental_update_operation::set; + update.record_key_schema_id_validation_compat.value.emplace(false); + ec = topics + .apply( + cluster::update_topic_properties_cmd{tp_ns, update}, + model::offset{11}) + .get(); + BOOST_REQUIRE_EQUAL(ec, cluster::errc::topic_invalid_config); + cfg = topics.get_topic_cfg(tp_ns); + BOOST_REQUIRE(cfg.has_value()); + + // Properties from invalid configuration should not have been persisted. + BOOST_REQUIRE(!cfg->properties.record_key_schema_id_validation.has_value()); + BOOST_REQUIRE( + !cfg->properties.record_key_schema_id_validation_compat.has_value()); } FIXTURE_TEST(test_topic_table_iterator_basic, topic_table_fixture) {