Skip to content

Commit

Permalink
config: add tombstone_retention_ms to cluster properties
Browse files Browse the repository at this point in the history
Adding support for `tombstone_retention_ms`, which is a cluster property
that is equivalent to Kafka's `log.cleaner.delete.retention.ms`, but is more precisely
named, and represents the retention time for tombstone records in compacted topics
in `redpanda`.

This controls the default value for the topic level property `delete.retention.ms`.
  • Loading branch information
WillemKauf committed Nov 1, 2024
1 parent 44fe3a7 commit 442bea8
Show file tree
Hide file tree
Showing 13 changed files with 102 additions and 37 deletions.
9 changes: 2 additions & 7 deletions src/v/cluster/metadata_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,7 @@ metadata_cache::get_default_record_value_subject_name_strategy() const {

std::optional<std::chrono::milliseconds>
metadata_cache::get_default_delete_retention_ms() const {
// TODO: return config::shard_local_cfg().tombstone_retention_ms();
return std::nullopt;
return config::shard_local_cfg().tombstone_retention_ms();
}

topic_properties metadata_cache::get_default_properties() const {
Expand All @@ -342,12 +341,8 @@ topic_properties metadata_cache::get_default_properties() const {
get_default_retention_local_target_bytes()};
tp.retention_local_target_ms = tristate<std::chrono::milliseconds>{
get_default_retention_local_target_ms()};
/*TODO(willem):
tp.delete_retention_ms = tristate<std::chrono::milliseconds>{
get_default_delete_retention_ms()};
*/
tp.delete_retention_ms = tristate<std::chrono::milliseconds>{
disable_tristate};
get_default_delete_retention_ms()};

return tp;
}
Expand Down
8 changes: 7 additions & 1 deletion src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -874,7 +874,6 @@ configuration::configuration()
.visibility = visibility::user},
{},
validate_connection_rate)

, transactional_id_expiration_ms(
*this,
"transactional_id_expiration_ms",
Expand Down Expand Up @@ -945,6 +944,13 @@ configuration::configuration()
"How often to trigger background compaction.",
{.needs_restart = needs_restart::no, .visibility = visibility::user},
10s)
, tombstone_retention_ms(
*this,
"tombstone_retention_ms",
"The retention time for tombstone records in a compacted topic.",
{.needs_restart = needs_restart::no, .visibility = visibility::user},
std::nullopt,
validate_tombstone_retention_ms)
, log_disable_housekeeping_for_tests(
*this,
"log_disable_housekeeping_for_tests",
Expand Down
2 changes: 2 additions & 0 deletions src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,8 @@ struct configuration final : public config_store {
// same as log.retention.ms in kafka
retention_duration_property log_retention_ms;
property<std::chrono::milliseconds> log_compaction_interval_ms;
// same as delete.retention.ms in kafka
property<std::optional<std::chrono::milliseconds>> tombstone_retention_ms;
property<bool> log_disable_housekeeping_for_tests;
property<bool> log_compaction_use_sliding_window;
// same as retention.size in kafka - TODO: size not implemented
Expand Down
36 changes: 36 additions & 0 deletions src/v/config/validators.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "config/configuration.h"
#include "model/namespace.h"
#include "model/validation.h"
#include "serde/rw/chrono.h"
#include "ssx/sformat.h"
#include "utils/inet_address_wrapper.h"

Expand Down Expand Up @@ -247,4 +248,39 @@ validate_cloud_storage_api_endpoint(const std::optional<ss::sstring>& os) {
return std::nullopt;
}

std::optional<ss::sstring> validate_tombstone_retention_ms(
const std::optional<std::chrono::milliseconds>& ms) {
if (ms.has_value()) {
// For simplicity's sake, cloud storage enable/read/write permissions
// cannot be enabled at the same time as tombstone_retention_ms at the
// cluster level, to avoid the case in which redpanda refuses to create
// new, misconfigured topics due to cluster defaults
const auto& cloud_storage_enabled
= config::shard_local_cfg().cloud_storage_enabled;
const auto& cloud_storage_remote_write
= config::shard_local_cfg().cloud_storage_enable_remote_write;
const auto& cloud_storage_remote_read
= config::shard_local_cfg().cloud_storage_enable_remote_read;
if (
cloud_storage_enabled() || cloud_storage_remote_write()
|| cloud_storage_remote_read()) {
return fmt::format(
"cannot set {} if any of ({}, {}, {}) are enabled at the cluster "
"level",
config::shard_local_cfg().tombstone_retention_ms.name(),
cloud_storage_enabled.name(),
cloud_storage_remote_write.name(),
cloud_storage_remote_read.name());
}

if (ms.value() < 1ms || ms.value() > serde::max_serializable_ms) {
return fmt::format(
"tombstone_retention_ms should be in range: [1, {}]",
serde::max_serializable_ms);
}
}

return std::nullopt;
}

}; // namespace config
3 changes: 3 additions & 0 deletions src/v/config/validators.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,7 @@ validate_audit_excluded_topics(const std::vector<ss::sstring>&);
std::optional<ss::sstring>
validate_cloud_storage_api_endpoint(const std::optional<ss::sstring>& os);

std::optional<ss::sstring> validate_tombstone_retention_ms(
const std::optional<std::chrono::milliseconds>& ms);

}; // namespace config
5 changes: 1 addition & 4 deletions src/v/kafka/server/handlers/configs/config_response_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -846,12 +846,9 @@ config_response_container_t make_topic_configs(
topic_properties.delete_retention_ms,
metadata_cache.get_default_delete_retention_ms()),
include_synonyms,
/*TODO:
maybe_make_documentation(
include_documentation,
config::shard_local_cfg().tombstone_retention_ms.desc()),
*/
std::nullopt);
config::shard_local_cfg().tombstone_retention_ms.desc()));

constexpr std::string_view key_validation
= "Enable validation of the schema id for keys on a record";
Expand Down
1 change: 1 addition & 0 deletions src/v/kafka/server/handlers/incremental_alter_configs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include "cluster/config_frontend.h"
#include "cluster/types.h"
#include "config/configuration.h"
#include "config/node_config.h"
#include "kafka/protocol/errors.h"
#include "kafka/protocol/incremental_alter_configs.h"
Expand Down
28 changes: 21 additions & 7 deletions src/v/kafka/server/handlers/topics/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,25 @@ get_leaders_preference(const config_map_t& config) {
return std::nullopt;
}

static tristate<std::chrono::milliseconds>
get_delete_retention_ms(const config_map_t& config) {
auto delete_retention_ms = get_tristate_value<std::chrono::milliseconds>(
config, topic_property_delete_retention_ms);

// If the config entry for delete.retention.ms is in the "empty" state, and
// the cluster default is also std::nullopt, ensure the option is disabled
// by default. DescribeConfigs calls should still return DEFAULT_CONFIG when
// describing this state, thanks to override_if_not_default in
// config_response_utils.cc.
if (
delete_retention_ms.is_empty()
&& !config::shard_local_cfg().tombstone_retention_ms().has_value()) {
return tristate<std::chrono::milliseconds>{disable_tristate};
}

return delete_retention_ms;
}

cluster::custom_assignable_topic_configuration
to_cluster_type(const creatable_topic& t) {
auto cfg = cluster::topic_configuration(
Expand Down Expand Up @@ -282,13 +301,8 @@ to_cluster_type(const creatable_topic& t) {
= get_duration_value<std::chrono::milliseconds>(
config_entries, topic_property_iceberg_translation_interval_ms, true);

/*TODO:
Disable tombstone.retention.ms if it, along with the cluster default, is
unset.
*/
cfg.properties.delete_retention_ms
= get_tristate_value<std::chrono::milliseconds>(
config_entries, topic_property_delete_retention_ms);
cfg.properties.delete_retention_ms = get_delete_retention_ms(
config_entries);

schema_id_validation_config_parser schema_id_validation_config_parser{
cfg.properties};
Expand Down
3 changes: 0 additions & 3 deletions src/v/kafka/server/tests/alter_config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -902,12 +902,9 @@ FIXTURE_TEST(test_incremental_alter_config_remove, alter_config_test_fixture) {
assert_property_value(
test_tp,
"delete.retention.ms",
/* TODO:
fmt::format(
"{}",
config::shard_local_cfg().tombstone_retention_ms().value_or(-1ms)),
*/
fmt::format("{}", -1ms),
new_describe_resp);
}

Expand Down
17 changes: 17 additions & 0 deletions src/v/redpanda/admin/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1724,6 +1724,23 @@ void config_multi_property_validation(
auto name = ss::sstring(updated_config.cloud_storage_cache_size.name());
errors[name] = invalid_cache.value();
}

// For simplicity's sake, cloud storage read/write permissions cannot be
// enabled at the same time as tombstone_retention_ms at the cluster level,
// to avoid the case in which topics are created with TS read/write
// permissions and bugs are encountered later with tombstone removal.
if (updated_config.tombstone_retention_ms().has_value() &&
(updated_config.cloud_storage_enabled()
|| updated_config.cloud_storage_enable_remote_read()
|| updated_config.cloud_storage_enable_remote_write())) {
errors["cloud_storage_enabled"] = ssx::sformat(
"cannot set {} if any of ({}, {}, {}) are enabled at the cluster "
"level",
updated_config.tombstone_retention_ms.name(),
updated_config.cloud_storage_enabled.name(),
updated_config.cloud_storage_enable_remote_read.name(),
updated_config.cloud_storage_enable_remote_write.name());
}
}
} // namespace

Expand Down
15 changes: 4 additions & 11 deletions src/v/storage/ntp_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -303,10 +303,8 @@ class ntp_config {
// RRR sanity check.
return std::nullopt;
}
/*TODO(willem):
auto& cluster_default
= config::shard_local_cfg().tombstone_retention_ms();
*/
auto& cluster_default
= config::shard_local_cfg().tombstone_retention_ms();
if (_overrides) {
// Tombstone deletion should not be enabled at the same time as
// tiered storage.
Expand All @@ -327,16 +325,11 @@ class ntp_config {

// If the tristate holds an empty optional, fall back to cluster
// default.
/*TODO(willem):
return cluster_default;
*/
return cluster_default;
}
// Fall back to cluster default, since _overrides being nullptr signals
// that remote.read and remote.write is disabled for this topic.
/*TODO(willem):
return cluster_default;
*/
return std::nullopt;
return cluster_default;
}

std::optional<model::cleanup_policy_bitflags>
Expand Down
5 changes: 5 additions & 0 deletions tests/rptest/tests/cluster_config_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,11 @@ def test_valid_settings(self):
# to RP's data dir for it to start
continue

if name == 'tombstone_retention_ms':
# Don't modify tombstone_retention_ms, leave it as nullopt in case of
# cloud storage read/write properties.
continue

if name == 'record_key_subject_name_strategy' or name == 'record_value_subject_name_strategy':
valid_value = random.choice(
[e for e in p['enum_values'] if e != initial_value])
Expand Down
7 changes: 3 additions & 4 deletions tests/rptest/tests/describe_topics_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,10 +299,9 @@ def test_describe_topics_with_documentation_and_types(self):
ConfigProperty(
config_type="LONG",
value="-1",
#TODO(willem): doc_string="The retention time for tombstone records in a compacted topic"
doc_string="",
#TODO(willem): remove DYNAMIC_TOPIC_CONFIG once cluster default is added
source_type="DYNAMIC_TOPIC_CONFIG")
doc_string=
"The retention time for tombstone records in a compacted topic."
)
}

tp_spec = TopicSpec()
Expand Down

0 comments on commit 442bea8

Please sign in to comment.