Skip to content

Commit

Permalink
Merge pull request redpanda-data#21291 from pgellert/kafka/alter-configs
Browse files Browse the repository at this point in the history
CORE-3091 Fix non-incremental AlterConfig resetting
  • Loading branch information
pgellert authored Jul 25, 2024
2 parents 24dcdc1 + 3771272 commit 56cd7c4
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 40 deletions.
11 changes: 11 additions & 0 deletions src/v/cluster/topics_frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,20 @@ ss::future<std::vector<topic_result>> topics_frontend::create_topics(
model::timeout_clock::time_point timeout) {
for (auto& tp : topics) {
/**
* The shadow_indexing properties
* ('redpanda.remote.(read|write|delete)') are special "sticky" topic
* properties that are always set as a topic-level override.
*
* See: https://github.com/redpanda-data/redpanda/issues/7451
*
* Note that a manually created topic will have this assigned already by
* kafka/server/handlers/topics/types.cc::to_cluster_type, dependent on
* client-provided topic properties.
*
* tp.cfg.properties.remote_delete is stored as a bool (not
* std::optional<bool>) defaulted to its default value
* (ntp_config::default_remote_delete) on the construction of
* topic_properties(), so there is no need to overwrite it here.
*/
if (!tp.cfg.properties.shadow_indexing.has_value()) {
tp.cfg.properties.shadow_indexing
Expand Down
65 changes: 31 additions & 34 deletions src/v/kafka/server/handlers/alter_configs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include "kafka/server/handlers/alter_configs.h"

#include "cluster/metadata_cache.h"
#include "cluster/types.h"
#include "config/configuration.h"
#include "features/feature_table.h"
Expand Down Expand Up @@ -42,6 +43,7 @@ static void parse_and_set_shadow_indexing_mode(
property_update,
const std::optional<ss::sstring>& value,
model::shadow_indexing_mode enabled_value) {
property_update.op = cluster::incremental_update_operation::set;
if (!value) {
property_update.value = model::shadow_indexing_mode::disabled;
}
Expand All @@ -56,6 +58,8 @@ static void parse_and_set_shadow_indexing_mode(

checked<cluster::topic_properties_update, alter_configs_resource_response>
create_topic_properties_update(alter_configs_resource& resource) {
using op_t = cluster::incremental_update_operation;

model::topic_namespace tp_ns(
model::kafka_namespace, model::topic(resource.resource_name));
cluster::topic_properties_update update(tp_ns);
Expand All @@ -67,41 +71,34 @@ create_topic_properties_update(alter_configs_resource& resource) {
* configuration in topic table, the only difference is the replication
* factor, if not set in the request explicitly it will not be overriden.
*/
update.properties.compaction_strategy.op
= cluster::incremental_update_operation::set;
update.properties.compression.op
= cluster::incremental_update_operation::set;
update.properties.segment_size.op
= cluster::incremental_update_operation::set;
update.properties.timestamp_type.op
= cluster::incremental_update_operation::set;
update.properties.retention_bytes.op
= cluster::incremental_update_operation::set;
update.properties.retention_duration.op
= cluster::incremental_update_operation::set;
update.properties.shadow_indexing.op
= cluster::incremental_update_operation::set;
update.custom_properties.replication_factor.op
= cluster::incremental_update_operation::none;
update.custom_properties.data_policy.op
= cluster::incremental_update_operation::none;
constexpr auto apply_op = [](op_t op) {
return [op](auto&&... prop) { ((prop.op = op), ...); };
};
std::apply(apply_op(op_t::remove), update.properties.serde_fields());
std::apply(apply_op(op_t::none), update.custom_properties.serde_fields());

static_assert(
std::tuple_size_v<decltype(update.properties.serde_fields())> == 26,
"If you added a property, please decide on it's default alter config "
"policy, and handle the update in the loop below");
static_assert(
std::tuple_size_v<decltype(update.custom_properties.serde_fields())> == 2,
"If you added a property, please decide on it's default alter config "
"policy, and handle the update in the loop below");

/**
* The shadow_indexing properties ('redpanda.remote.(read|write|delete)')
* are special "sticky" topic properties that are always set as a
* topic-level override. We should prevent changing them unless explicitly
* requested.
*
* See: https://github.com/redpanda-data/redpanda/issues/7451
*/
update.properties.shadow_indexing.op = op_t::none;
update.properties.remote_delete.op = op_t::none;

update.properties.record_key_schema_id_validation.op
= cluster::incremental_update_operation::set;
update.properties.record_key_schema_id_validation_compat.op
= cluster::incremental_update_operation::set;
update.properties.record_key_subject_name_strategy.op
= cluster::incremental_update_operation::set;
update.properties.record_key_subject_name_strategy_compat.op
= cluster::incremental_update_operation::set;
update.properties.record_value_schema_id_validation.op
= cluster::incremental_update_operation::set;
update.properties.record_value_schema_id_validation_compat.op
= cluster::incremental_update_operation::set;
update.properties.record_value_subject_name_strategy.op
= cluster::incremental_update_operation::set;
update.properties.record_value_subject_name_strategy_compat.op
= cluster::incremental_update_operation::set;
// Now that the defaults are set, continue to set properties from the
// request

schema_id_validation_config_parser schema_id_validation_config_parser{
update.properties};
Expand Down
3 changes: 2 additions & 1 deletion src/v/kafka/server/handlers/incremental_alter_configs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "kafka/types.h"
#include "model/fundamental.h"
#include "model/metadata.h"
#include "storage/ntp_config.h"
#include "strings/string_switch.h"

#include <seastar/core/do_with.hh>
Expand Down Expand Up @@ -223,7 +224,7 @@ create_topic_properties_update(
cfg.value,
op,
// Topic deletion is enabled by default
true);
storage::ntp_config::default_remote_delete);
continue;
}
if (cfg.name == topic_property_max_message_bytes) {
Expand Down
47 changes: 42 additions & 5 deletions tests/rptest/tests/topic_creation_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -622,10 +622,12 @@ def test_shadow_indexing_mode_persistence(self):
"false", "DYNAMIC_TOPIC_CONFIG")

@cluster(num_nodes=1)
def topic_alter_config_test(self):
@matrix(incremental=[True, False])
def topic_alter_config_test(self, incremental):
"""
Intentionally use the legacy (deprecated in Kafka 2.3.0) AlterConfig
admin RPC, to check it works with our custom topic properties
Intentionally use either the legacy (deprecated in Kafka 2.3.0) AlterConfig
admin RPC or the new IncrementalAlterConfig API, to check that both work
with our custom topic properties
"""
rpk = RpkTool(self.redpanda)
topic = topic_name()
Expand All @@ -644,8 +646,43 @@ def topic_alter_config_test(self):
'initial.retention.local.target.ms': '123456'
}

for k, v in examples.items():
kcl.alter_topic_config({k: v}, incremental=False, topic=topic)
kcl.alter_topic_config(examples, incremental=incremental, topic=topic)
topic_config = rpk.describe_topic_configs(topic)
value, src = topic_config["retention.local.target.bytes"]
assert value == "123456" and src == "DYNAMIC_TOPIC_CONFIG"

kcl.alter_topic_config({"retention.local.target.bytes": "999999"},
incremental=incremental,
topic=topic)
topic_config = rpk.describe_topic_configs(topic)
value, src = topic_config["retention.local.target.bytes"]
assert value == "999999" and src == "DYNAMIC_TOPIC_CONFIG"

# All non-specified configs should revert to their default with incremental=False
for k, _ in examples.items():
if k != "retention.local.target.bytes":
# With the old alter configs API (incremental=False), all the other configs should revert to their default
# With the new incremental alter configs API, all the other configs should be unchanged
expected_src = "DYNAMIC_TOPIC_CONFIG" if incremental else "DEFAULT_CONFIG"

# The shadow_indexing properties ('redpanda.remote.(read|write|delete)')
# are special "sticky" topic properties that are always set as a
# topic-level override. To co-operate with kafka terraform providers, these
# configs show up as "DEFAULT_CONFIG" when they are set to the same value
# as their cluster-level default.
#
# See: https://github.com/redpanda-data/redpanda/issues/7451
hiding_configs = [
'redpanda.remote.delete',
'redpanda.remote.write',
'redpanda.remote.read',
]
if k in hiding_configs:
expected_src = "DEFAULT_CONFIG"

value, src = topic_config[k]
assert src == expected_src, \
f"[incremental={incremental}] Unexpected describe result for {k}: value={value}, src={src}"

# As a control, confirm that if we did pass an invalid property, we would have got an error
with expect_exception(RuntimeError, lambda e: "invalid" in str(e)):
Expand Down

0 comments on commit 56cd7c4

Please sign in to comment.