Skip to content

Commit

Permalink
kafka: add delete.retention.ms to topic config handler
Browse files Browse the repository at this point in the history
Adds configuration for the topic property `delete.retention.ms`
through the `CreateTopic`, `AlterConfig`, and `IncrementalAlterConfig`
Kafka APIs.
  • Loading branch information
WillemKauf committed Nov 1, 2024
1 parent 62d5199 commit 44fe3a7
Show file tree
Hide file tree
Showing 17 changed files with 218 additions and 13 deletions.
13 changes: 13 additions & 0 deletions src/v/cluster/metadata_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "model/namespace.h"
#include "model/timestamp.h"
#include "storage/types.h"
#include "utils/tristate.h"

#include <seastar/core/coroutine.hh>
#include <seastar/core/sharded.hh>
Expand Down Expand Up @@ -318,6 +319,12 @@ metadata_cache::get_default_record_value_subject_name_strategy() const {
return pandaproxy::schema_registry::subject_name_strategy::topic_name;
}

std::optional<std::chrono::milliseconds>
metadata_cache::get_default_delete_retention_ms() const {
// TODO: return config::shard_local_cfg().tombstone_retention_ms();
return std::nullopt;
}

topic_properties metadata_cache::get_default_properties() const {
topic_properties tp;
tp.compression = {get_default_compression()};
Expand All @@ -335,6 +342,12 @@ topic_properties metadata_cache::get_default_properties() const {
get_default_retention_local_target_bytes()};
tp.retention_local_target_ms = tristate<std::chrono::milliseconds>{
get_default_retention_local_target_ms()};
/*TODO(willem):
tp.delete_retention_ms = tristate<std::chrono::milliseconds>{
get_default_delete_retention_ms()};
*/
tp.delete_retention_ms = tristate<std::chrono::milliseconds>{
disable_tristate};

return tp;
}
Expand Down
2 changes: 2 additions & 0 deletions src/v/cluster/metadata_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ class metadata_cache {
bool get_default_record_value_schema_id_validation() const;
pandaproxy::schema_registry::subject_name_strategy
get_default_record_value_subject_name_strategy() const;
std::optional<std::chrono::milliseconds>
get_default_delete_retention_ms() const;

topic_properties get_default_properties() const;
std::optional<partition_assignment>
Expand Down
2 changes: 2 additions & 0 deletions src/v/cluster/topic_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1049,6 +1049,8 @@ topic_table::apply(update_topic_properties_cmd cmd, model::offset o) {
incremental_update(
updated_properties.iceberg_translation_interval_ms,
overrides.iceberg_translation_interval_ms);
incremental_update(
updated_properties.delete_retention_ms, overrides.delete_retention_ms);

auto& properties = tp->second.get_configuration().properties;
// no configuration change, no need to generate delta
Expand Down
4 changes: 3 additions & 1 deletion src/v/cluster/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,7 @@ struct incremental_topic_updates
leaders_preference;
property_update<std::optional<std::chrono::milliseconds>>
iceberg_translation_interval_ms;
property_update<tristate<std::chrono::milliseconds>> delete_retention_ms;

// To allow us to better control use of the deprecated shadow_indexing
// field, use getters and setters instead.
Expand Down Expand Up @@ -674,7 +675,8 @@ struct incremental_topic_updates
leaders_preference,
remote_read,
remote_write,
iceberg_translation_interval_ms);
iceberg_translation_interval_ms,
delete_retention_ms);
}

friend std::ostream&
Expand Down
17 changes: 16 additions & 1 deletion src/v/kafka/server/handlers/alter_configs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ create_topic_properties_update(
std::apply(apply_op(op_t::none), update.custom_properties.serde_fields());

static_assert(
std::tuple_size_v<decltype(update.properties.serde_fields())> == 31,
std::tuple_size_v<decltype(update.properties.serde_fields())> == 32,
"If you added a property, please decide on it's default alter config "
"policy, and handle the update in the loop below");
static_assert(
Expand Down Expand Up @@ -115,6 +115,13 @@ create_topic_properties_update(
= update.properties.get_shadow_indexing();
update_properties_shadow_indexing.op = op_t::none;

/*
Likewise, delete.retention.ms should be prevented from being changed
unless explicitly requested, due to tight coupling with shadow indexing
properties.
*/
update.properties.delete_retention_ms.op = op_t::none;

// Now that the defaults are set, continue to set properties from the
// request

Expand Down Expand Up @@ -349,6 +356,14 @@ create_topic_properties_update(
}
throw validation_error("Cloud topics is not enabled");
}
if (cfg.name == topic_property_delete_retention_ms) {
parse_and_set_tristate(
update.properties.delete_retention_ms,
cfg.value,
kafka::config_resource_operation::set,
delete_retention_ms_validator{});
continue;
}

if (cfg.name == topic_property_iceberg_translation_interval_ms) {
parse_and_set_optional_duration(
Expand Down
17 changes: 17 additions & 0 deletions src/v/kafka/server/handlers/configs/config_response_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,23 @@ config_response_container_t make_topic_configs(
}
}

add_topic_config_if_requested(
config_keys,
result,
topic_property_delete_retention_ms,
metadata_cache.get_default_delete_retention_ms(),
topic_property_delete_retention_ms,
override_if_not_default(
topic_properties.delete_retention_ms,
metadata_cache.get_default_delete_retention_ms()),
include_synonyms,
/*TODO:
maybe_make_documentation(
include_documentation,
config::shard_local_cfg().tombstone_retention_ms.desc()),
*/
std::nullopt);

constexpr std::string_view key_validation
= "Enable validation of the schema id for keys on a record";
constexpr std::string_view val_validation
Expand Down
33 changes: 31 additions & 2 deletions src/v/kafka/server/handlers/configs/config_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,23 @@ struct iceberg_config_validator {
}
};

struct delete_retention_ms_validator {
std::optional<ss::sstring> operator()(
const ss::sstring&,
const tristate<std::chrono::milliseconds>& maybe_value) {
if (maybe_value.has_optional_value()) {
const auto& value = maybe_value.value();
if (value < 1ms || value > serde::max_serializable_ms) {
return fmt::format(
"delete.retention.ms value invalid, expected to be in range "
"[1, {}]",
serde::max_serializable_ms);
}
}
return std::nullopt;
}
};

template<typename T, typename... ValidatorTypes>
requires requires(
model::topic_namespace_view tns,
Expand Down Expand Up @@ -666,11 +683,18 @@ inline void parse_and_set_bool(
}
}

template<typename T>
template<typename T, typename Validator = noop_validator<tristate<T>>>
requires requires(
const tristate<T>& value, const ss::sstring& str, Validator validator) {
{
validator(str, value)
} -> std::convertible_to<std::optional<ss::sstring>>;
}
void parse_and_set_tristate(
cluster::property_update<tristate<T>>& property,
const std::optional<ss::sstring>& value,
config_resource_operation op) {
config_resource_operation op,
Validator validator = noop_validator<tristate<T>>{}) {
// remove property value
if (op == config_resource_operation::remove) {
property.op = cluster::incremental_update_operation::remove;
Expand All @@ -685,6 +709,11 @@ void parse_and_set_tristate(
property.value = tristate<T>(std::make_optional<T>(parsed));
}

auto v_error = validator(*value, property.value);
if (v_error) {
throw validation_error(*v_error);
}

property.op = cluster::incremental_update_operation::set;
return;
}
Expand Down
7 changes: 5 additions & 2 deletions src/v/kafka/server/handlers/create_topics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
namespace kafka {

namespace {

bool is_supported(std::string_view name) {
static constexpr auto supported_configs = std::to_array(
{topic_property_compression,
Expand Down Expand Up @@ -74,7 +75,8 @@ bool is_supported(std::string_view name) {
topic_property_flush_bytes,
topic_property_iceberg_enabled,
topic_property_leaders_preference,
topic_property_iceberg_translation_interval_ms});
topic_property_iceberg_translation_interval_ms,
topic_property_delete_retention_ms});

if (std::any_of(
supported_configs.begin(),
Expand Down Expand Up @@ -116,7 +118,8 @@ using validators = make_validator_types<
vcluster_id_validator,
write_caching_configs_validator,
iceberg_config_validator,
cloud_topic_config_validator>;
cloud_topic_config_validator,
delete_retention_ms_validator>;

static void
append_topic_configs(request_context& ctx, create_topics_response& response) {
Expand Down
9 changes: 9 additions & 0 deletions src/v/kafka/server/handlers/incremental_alter_configs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,14 @@ create_topic_properties_update(
op);
continue;
}
if (cfg.name == topic_property_delete_retention_ms) {
parse_and_set_tristate(
update.properties.delete_retention_ms,
cfg.value,
op,
delete_retention_ms_validator{});
continue;
}

} catch (const validation_error& e) {
vlog(
Expand Down Expand Up @@ -423,6 +431,7 @@ inline std::string_view map_config_name(std::string_view input) {
.match("log.message.timestamp.type", "log_message_timestamp_type")
.match("log.compression.type", "log_compression_type")
.match("log.roll.ms", "log_segment_ms")
.match("log.cleaner.delete.retention.ms", "tombstone_retention_ms")
.default_match(input);
}

Expand Down
13 changes: 11 additions & 2 deletions src/v/kafka/server/handlers/topics/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "model/timestamp.h"
#include "pandaproxy/schema_registry/subject_name_strategy.h"
#include "strings/string_switch.h"
#include "utils/tristate.h"

#include <seastar/core/sstring.hh>

Expand Down Expand Up @@ -107,7 +108,7 @@ get_string_value(const config_map_t& config, std::string_view key) {
}

// Either parse configuration or return nullopt
static std::optional<bool>
std::optional<bool>
get_bool_value(const config_map_t& config, std::string_view key) {
if (auto it = config.find(key); it != config.end()) {
try {
Expand All @@ -128,7 +129,7 @@ get_bool_value(const config_map_t& config, std::string_view key) {
return std::nullopt;
}

static model::shadow_indexing_mode
model::shadow_indexing_mode
get_shadow_indexing_mode(const config_map_t& config) {
auto arch_enabled = get_bool_value(config, topic_property_remote_write);
auto si_enabled = get_bool_value(config, topic_property_remote_read);
Expand Down Expand Up @@ -281,6 +282,14 @@ to_cluster_type(const creatable_topic& t) {
= get_duration_value<std::chrono::milliseconds>(
config_entries, topic_property_iceberg_translation_interval_ms, true);

/*TODO:
Disable tombstone.retention.ms if it, along with the cluster default, is
unset.
*/
cfg.properties.delete_retention_ms
= get_tristate_value<std::chrono::milliseconds>(
config_entries, topic_property_delete_retention_ms);

schema_id_validation_config_parser schema_id_validation_config_parser{
cfg.properties};

Expand Down
10 changes: 9 additions & 1 deletion src/v/kafka/server/handlers/topics/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ inline constexpr std::string_view topic_property_write_caching
inline constexpr std::string_view topic_property_flush_ms = "flush.ms";
inline constexpr std::string_view topic_property_flush_bytes = "flush.bytes";

inline constexpr std::string_view topic_property_delete_retention_ms
= "delete.retention.ms";

// Server side schema id validation
inline constexpr std::string_view topic_property_record_key_schema_id_validation
= "redpanda.key.schema.id.validation";
Expand Down Expand Up @@ -125,7 +128,6 @@ inline constexpr std::array<std::string_view, 20> allowlist_topic_noop_confs = {
"follower.replication.throttled.replicas",
"flush.messages",
"file.delete.delay.ms",
"delete.retention.ms",
"preallocate",
};

Expand Down Expand Up @@ -156,4 +158,10 @@ std::vector<kafka::creatable_topic_configs> report_topic_configs(
const cluster::metadata_cache& metadata_cache,
const cluster::topic_properties& topic_properties);

std::optional<bool>
get_bool_value(const config_map_t& config, std::string_view key);

model::shadow_indexing_mode
get_shadow_indexing_mode(const config_map_t& config);

} // namespace kafka
36 changes: 36 additions & 0 deletions src/v/kafka/server/handlers/topics/validators.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "kafka/protocol/schemata/create_topics_request.h"
#include "kafka/protocol/schemata/create_topics_response.h"
#include "kafka/server/handlers/topics/types.h"
#include "model/fundamental.h"
#include "model/metadata.h"
#include "model/namespace.h"

Expand Down Expand Up @@ -411,6 +412,41 @@ struct write_caching_configs_validator {
}
};

struct delete_retention_ms_validator {
static constexpr const char* error_message
= "Unsupported delete.retention.ms configuration, cannot be enabled "
"at the same time as redpanda.remote.read or redpanda.remote.write.";
static constexpr const auto config_name
= topic_property_delete_retention_ms;
static constexpr error_code ec = error_code::invalid_config;

static bool is_valid(const creatable_topic& c) {
const auto config_entries = config_map(c.configs);
auto end = config_entries.end();
bool delete_retention_ms
= (config_entries.find(topic_property_delete_retention_ms) != end);

try {
auto shadow_indexing_mode = get_shadow_indexing_mode(
config_entries);
// Cannot set delete_retention_ms at the same time as any tiered
// storage properties.
if (
delete_retention_ms
&& shadow_indexing_mode
!= model::shadow_indexing_mode::disabled) {
return false;
}
} catch (const boost::bad_lexical_cast&) {
// Caught a bad configuration exception.
// Return true for now- this will error out in a later stage.
return true;
}

return true;
}
};

template<typename T>
struct configuration_value_validator {
static constexpr const char* error_message = T::error_message;
Expand Down
Loading

0 comments on commit 44fe3a7

Please sign in to comment.