diff --git a/src/v/cluster/tests/topic_table_test.cc b/src/v/cluster/tests/topic_table_test.cc index 6e1287220a69f..e668013b8a80c 100644 --- a/src/v/cluster/tests/topic_table_test.cc +++ b/src/v/cluster/tests/topic_table_test.cc @@ -513,6 +513,34 @@ FIXTURE_TEST(test_topic_with_schema_id_validation_ops, topic_table_fixture) { cfg = topics.get_topic_cfg(tp_ns); BOOST_REQUIRE(cfg.has_value()); BOOST_REQUIRE(!cfg->properties.record_key_schema_id_validation.has_value()); + + // Ensure that an invalid update cmd does not get persisted in the topic + // table. + // Sanity check before starting. + BOOST_REQUIRE(!cfg->properties.record_key_schema_id_validation.has_value()); + BOOST_REQUIRE( + !cfg->properties.record_key_schema_id_validation_compat.has_value()); + + update.record_key_schema_id_validation.op + = cluster::incremental_update_operation::set; + update.record_key_schema_id_validation.value.emplace(true); + + update.record_key_schema_id_validation_compat.op + = cluster::incremental_update_operation::set; + update.record_key_schema_id_validation_compat.value.emplace(false); + ec = topics + .apply( + cluster::update_topic_properties_cmd{tp_ns, update}, + model::offset{11}) + .get(); + BOOST_REQUIRE_EQUAL(ec, cluster::errc::topic_invalid_config); + cfg = topics.get_topic_cfg(tp_ns); + BOOST_REQUIRE(cfg.has_value()); + + // Properties from invalid configuration should not have been persisted. + BOOST_REQUIRE(!cfg->properties.record_key_schema_id_validation.has_value()); + BOOST_REQUIRE( + !cfg->properties.record_key_schema_id_validation_compat.has_value()); } FIXTURE_TEST(test_topic_table_iterator_basic, topic_table_fixture) { diff --git a/src/v/cluster/topic_table.cc b/src/v/cluster/topic_table.cc index 19ba42e3b270e..88859587dcdb9 100644 --- a/src/v/cluster/topic_table.cc +++ b/src/v/cluster/topic_table.cc @@ -874,79 +874,88 @@ topic_table::apply(update_topic_properties_cmd cmd, model::offset o) { != data_migrations::migrated_resource_state::non_restricted) { co_return errc::resource_is_being_migrated; } - auto& properties = tp->second.get_configuration().properties; - auto properties_snapshot = properties; + auto updated_properties = tp->second.get_configuration().properties; auto& overrides = cmd.value; /** * Update topic properties */ incremental_update( - properties.cleanup_policy_bitflags, overrides.cleanup_policy_bitflags); + updated_properties.cleanup_policy_bitflags, + overrides.cleanup_policy_bitflags); incremental_update( - properties.compaction_strategy, overrides.compaction_strategy); - incremental_update(properties.compression, overrides.compression); - incremental_update(properties.retention_bytes, overrides.retention_bytes); + updated_properties.compaction_strategy, overrides.compaction_strategy); + incremental_update(updated_properties.compression, overrides.compression); incremental_update( - properties.retention_duration, overrides.retention_duration); - incremental_update(properties.segment_size, overrides.segment_size); - incremental_update(properties.timestamp_type, overrides.timestamp_type); - - incremental_update(properties.shadow_indexing, overrides.shadow_indexing); - incremental_update(properties.batch_max_bytes, overrides.batch_max_bytes); - + updated_properties.retention_bytes, overrides.retention_bytes); + incremental_update( + updated_properties.retention_duration, overrides.retention_duration); + incremental_update(updated_properties.segment_size, overrides.segment_size); + incremental_update( + updated_properties.timestamp_type, overrides.timestamp_type); + incremental_update( + updated_properties.shadow_indexing, overrides.shadow_indexing); incremental_update( - properties.retention_local_target_bytes, + updated_properties.batch_max_bytes, overrides.batch_max_bytes); + incremental_update( + updated_properties.retention_local_target_bytes, overrides.retention_local_target_bytes); incremental_update( - properties.retention_local_target_ms, + updated_properties.retention_local_target_ms, overrides.retention_local_target_ms); incremental_update( - properties.remote_delete, + updated_properties.remote_delete, overrides.remote_delete, storage::ntp_config::default_remote_delete); - incremental_update(properties.segment_ms, overrides.segment_ms); + incremental_update(updated_properties.segment_ms, overrides.segment_ms); incremental_update( - properties.record_key_schema_id_validation, + updated_properties.record_key_schema_id_validation, overrides.record_key_schema_id_validation); incremental_update( - properties.record_key_schema_id_validation_compat, + updated_properties.record_key_schema_id_validation_compat, overrides.record_key_schema_id_validation_compat); incremental_update( - properties.record_key_subject_name_strategy, + updated_properties.record_key_subject_name_strategy, overrides.record_key_subject_name_strategy); incremental_update( - properties.record_key_subject_name_strategy_compat, + updated_properties.record_key_subject_name_strategy_compat, overrides.record_key_subject_name_strategy_compat); incremental_update( - properties.record_value_schema_id_validation, + updated_properties.record_value_schema_id_validation, overrides.record_value_schema_id_validation); incremental_update( - properties.record_value_schema_id_validation_compat, + updated_properties.record_value_schema_id_validation_compat, overrides.record_value_schema_id_validation_compat); incremental_update( - properties.record_value_subject_name_strategy, + updated_properties.record_value_subject_name_strategy, overrides.record_value_subject_name_strategy); incremental_update( - properties.record_value_subject_name_strategy_compat, + updated_properties.record_value_subject_name_strategy_compat, overrides.record_value_subject_name_strategy_compat); incremental_update( - properties.initial_retention_local_target_bytes, + updated_properties.initial_retention_local_target_bytes, overrides.initial_retention_local_target_bytes); incremental_update( - properties.initial_retention_local_target_ms, + updated_properties.initial_retention_local_target_ms, overrides.initial_retention_local_target_ms); - incremental_update(properties.write_caching, overrides.write_caching); - incremental_update(properties.flush_ms, overrides.flush_ms); - incremental_update(properties.flush_bytes, overrides.flush_bytes); + incremental_update( + updated_properties.write_caching, overrides.write_caching); + incremental_update(updated_properties.flush_ms, overrides.flush_ms); + incremental_update(updated_properties.flush_bytes, overrides.flush_bytes); + + auto& properties = tp->second.get_configuration().properties; + // no configuration change, no need to generate delta - if (properties == properties_snapshot) { + if (updated_properties == properties) { co_return errc::success; } - if (!schema_id_validation_validator::is_valid(properties)) { + if (!schema_id_validation_validator::is_valid(updated_properties)) { co_return schema_id_validation_validator::ec; } + // Apply the changes + properties = std::move(updated_properties); + // generate deltas for controller backend const auto& assignments = tp->second.get_assignments(); for (auto& [_, p_as] : assignments) {