From 9e7474148cfdaf28441fd1b875792caf2e343d33 Mon Sep 17 00:00:00 2001 From: Willem Kaufmann Date: Wed, 18 Sep 2024 11:48:15 -0400 Subject: [PATCH 1/5] `storage`: replace `get_tombstone_delete_horizon()` Replace `internal::get_tombstone_delete_horizon()` with `internal::is_past_tombstone_delete_horizon()`. This change makes the code a little bit cleaner, and potentially reduces the number of operations in `should_keep()` related functions, as the current timestamp only needs to be evaluated once for every tombstone record in a segment with a clean compaction timestamp. Also remove `should_remove_tombstone_record()`, since its logic is now trivial. --- src/v/storage/segment_deduplication_utils.cc | 12 +++---- src/v/storage/segment_utils.cc | 35 ++++++-------------- src/v/storage/segment_utils.h | 22 +++++------- 3 files changed, 25 insertions(+), 44 deletions(-) diff --git a/src/v/storage/segment_deduplication_utils.cc b/src/v/storage/segment_deduplication_utils.cc index 66e4cc60ed485..16e628d4bd3fa 100644 --- a/src/v/storage/segment_deduplication_utils.cc +++ b/src/v/storage/segment_deduplication_utils.cc @@ -166,12 +166,12 @@ ss::future deduplicate_segment( auto compaction_placeholder_enabled = feature_table.local().is_active( features::feature::compaction_placeholder_batch); - const std::optional tombstone_delete_horizon - = internal::get_tombstone_delete_horizon(seg, cfg); + const bool past_tombstone_delete_horizon + = internal::is_past_tombstone_delete_horizon(seg, cfg); auto copy_reducer = internal::copy_data_segment_reducer( [&map, segment_last_offset = seg->offsets().get_committed_offset(), - tombstone_delete_horizon = tombstone_delete_horizon, + past_tombstone_delete_horizon, compaction_placeholder_enabled]( const model::record_batch& b, const model::record& r, @@ -190,9 +190,9 @@ ss::future deduplicate_segment( b.header()); return ss::make_ready_future(true); } - // Potentially deal with tombstone record removal - if (internal::should_remove_tombstone_record( - r, tombstone_delete_horizon)) { + + // Deal with tombstone record removal + if (r.is_tombstone() && past_tombstone_delete_horizon) { return ss::make_ready_future(false); } diff --git a/src/v/storage/segment_utils.cc b/src/v/storage/segment_utils.cc index 8e16c44f32393..18282cb9de134 100644 --- a/src/v/storage/segment_utils.cc +++ b/src/v/storage/segment_utils.cc @@ -381,8 +381,8 @@ ss::future do_copy_segment_data( auto old_broker_timestamp = seg->index().broker_timestamp(); auto old_clean_compact_timestamp = seg->index().clean_compact_timestamp(); - const std::optional tombstone_delete_horizon - = get_tombstone_delete_horizon(seg, cfg); + const bool past_tombstone_delete_horizon + = internal::is_past_tombstone_delete_horizon(seg, cfg); // find out which offsets will survive compaction auto idx_path = seg->reader().path().to_compacted_index(); @@ -415,14 +415,13 @@ ss::future do_copy_segment_data( "copying compacted segment data from {} to {}", seg->reader().filename(), tmpname); - auto should_keep = [compacted_list = std::move(compacted_offsets), - tombstone_delete_horizon = tombstone_delete_horizon]( + past_tombstone_delete_horizon]( const model::record_batch& b, const model::record& r, bool) { - // Potentially deal with tombstone record removal - if (should_remove_tombstone_record(r, tombstone_delete_horizon)) { + // Deal with tombstone record removal + if (r.is_tombstone() && past_tombstone_delete_horizon) { return ss::make_ready_future(false); } @@ -1138,32 +1137,18 @@ void mark_segment_as_finished_window_compaction( } 
} -std::optional get_tombstone_delete_horizon( +bool is_past_tombstone_delete_horizon( ss::lw_shared_ptr seg, const compaction_config& cfg) { - std::optional tombstone_delete_horizon = {}; - const auto& tombstone_retention_ms_opt = cfg.tombstone_retention_ms; if ( seg->index().has_clean_compact_timestamp() - && tombstone_retention_ms_opt.has_value()) { - tombstone_delete_horizon = model::timestamp( + && cfg.tombstone_retention_ms.has_value()) { + auto tombstone_delete_horizon = model::timestamp( seg->index().clean_compact_timestamp()->value() + cfg.tombstone_retention_ms->count()); - } - return tombstone_delete_horizon; -} - -bool should_remove_tombstone_record( - const model::record& r, - std::optional tombstone_delete_horizon) { - if (!r.is_tombstone()) { - return false; - } - - if (!tombstone_delete_horizon.has_value()) { - return false; + return (model::timestamp::now() > tombstone_delete_horizon); } - return (model::timestamp::now() > tombstone_delete_horizon.value()); + return false; } } // namespace storage::internal diff --git a/src/v/storage/segment_utils.h b/src/v/storage/segment_utils.h index 6718952e5fd17..219950a808012 100644 --- a/src/v/storage/segment_utils.h +++ b/src/v/storage/segment_utils.h @@ -258,21 +258,17 @@ inline bool is_compactible(const model::record_batch& b) { offset_delta_time should_apply_delta_time_offset( ss::sharded& feature_table); -// Optionally returns the timestamp past which a tombstone may be removed. -// This returns a value iff the segment `s` has been marked as cleanly -// compacted, and the compaction_config has a value assigned for -// `tombstone_retention_ms`. In all other cases, `std::nullopt` is returned, -// indicating that tombstone records will not be removed if encountered. -std::optional get_tombstone_delete_horizon( +// Checks if a segment is past the tombstone deletion horizon. +// +// Returns true iff the segment `s` has been marked as cleanly +// compacted, the `compaction_config` has a value assigned for +// `tombstone_retention_ms`, and the current timestamp is greater than +// `clean_compact_timestamp + tombstone_retention_ms`. In all other cases, +// the returned value is false, indicating that tombstone records in the segment +// are not yet eligible for removal. +bool is_past_tombstone_delete_horizon( ss::lw_shared_ptr seg, const compaction_config& cfg); -// Returns true iff the record `r` is a tombstone, and the timestamp returned by -// `now()` is past the `tombstone_delete_horizon` timestamp (in the case it has -// a value). Returns false in all other cases. -bool should_remove_tombstone_record( - const model::record& r, - std::optional tombstone_delete_horizon); - // Mark a segment as completed window compaction, and whether it is "clean" (in // which case the `clean_compact_timestamp` is set in the segment's index). void mark_segment_as_finished_window_compaction( From f48db49bdbf376dc0a2b3a384103b015d95939cb Mon Sep 17 00:00:00 2001 From: Willem Kaufmann Date: Thu, 19 Sep 2024 14:33:54 -0400 Subject: [PATCH 2/5] `storage`: add `segment_index::may_have_tombstone_records()` functions For future changes for sliding window compaction scheduling, it is helpful to have a flag that indicates whether a segment may contain tombstone records or not. Add a new `bool` field, `may_have_tombstone_records` to the `index_state`, and mark/unmark its value during segment deduplication and data copying. This field is `true` by default, which can lead to false-positives. 
A segment is only considered to not have tombstone records after proven by de-duplication/ segment data copying in the compaction process. --- src/v/storage/index_state.cc | 10 +++++++- src/v/storage/index_state.h | 8 ++++++- src/v/storage/segment_deduplication_utils.cc | 17 +++++++++++++- src/v/storage/segment_index.cc | 9 ++++++-- src/v/storage/segment_index.h | 14 +++++++++++- src/v/storage/segment_utils.cc | 24 +++++++++++++++++--- 6 files changed, 73 insertions(+), 9 deletions(-) diff --git a/src/v/storage/index_state.cc b/src/v/storage/index_state.cc index 055e35845d287..8823caa627684 100644 --- a/src/v/storage/index_state.cc +++ b/src/v/storage/index_state.cc @@ -161,6 +161,7 @@ std::ostream& operator<<(std::ostream& o, const index_state& s) { << ", num_compactible_records_appended:" << s.num_compactible_records_appended << ", clean_compact_timestamp:" << s.clean_compact_timestamp + << ", may_have_tombstone_records:" << s.may_have_tombstone_records << ", index(" << s.relative_offset_index.size() << "," << s.relative_time_index.size() << "," << s.position_index.size() << ")}"; @@ -184,6 +185,7 @@ void index_state::serde_write(iobuf& out) const { write(tmp, broker_timestamp); write(tmp, num_compactible_records_appended); write(tmp, clean_compact_timestamp); + write(tmp, may_have_tombstone_records); crc::crc32c crc; crc_extend_iobuf(crc, tmp); @@ -285,6 +287,11 @@ void read_nested( } else { st.clean_compact_timestamp = std::nullopt; } + if (hdr._version >= index_state::may_have_tombstone_records_version) { + read_nested(p, st.may_have_tombstone_records, 0U); + } else { + st.may_have_tombstone_records = true; + } } index_state index_state::copy() const { return *this; } @@ -365,7 +372,8 @@ index_state::index_state(const index_state& o) noexcept , non_data_timestamps(o.non_data_timestamps) , broker_timestamp(o.broker_timestamp) , num_compactible_records_appended(o.num_compactible_records_appended) - , clean_compact_timestamp(o.clean_compact_timestamp) {} + , clean_compact_timestamp(o.clean_compact_timestamp) + , may_have_tombstone_records(o.may_have_tombstone_records) {} namespace serde_compat { uint64_t index_state_serde::checksum(const index_state& r) { diff --git a/src/v/storage/index_state.h b/src/v/storage/index_state.h index 728bda60db878..d71d356f15418 100644 --- a/src/v/storage/index_state.h +++ b/src/v/storage/index_state.h @@ -74,11 +74,12 @@ class offset_time_index { 1 byte - non_data_timestamps */ struct index_state - : serde::envelope, serde::compat_version<4>> { + : serde::envelope, serde::compat_version<4>> { static constexpr auto monotonic_timestamps_version = 5; static constexpr auto broker_timestamp_version = 6; static constexpr auto num_compactible_records_version = 7; static constexpr auto clean_compact_timestamp_version = 8; + static constexpr auto may_have_tombstone_records_version = 9; static index_state make_empty_index(offset_delta_time with_offset); @@ -138,6 +139,11 @@ struct index_state // every previous record in the log. std::optional clean_compact_timestamp{std::nullopt}; + // may_have_tombstone_records is `true` by default, until compaction + // deduplication/segment data copying is performed and it is proven that + // the segment does not contain any tombstone records. 
+ bool may_have_tombstone_records{true}; + size_t size() const; bool empty() const; diff --git a/src/v/storage/segment_deduplication_utils.cc b/src/v/storage/segment_deduplication_utils.cc index 16e628d4bd3fa..cef7ac7ab6af4 100644 --- a/src/v/storage/segment_deduplication_utils.cc +++ b/src/v/storage/segment_deduplication_utils.cc @@ -168,8 +168,10 @@ ss::future deduplicate_segment( const bool past_tombstone_delete_horizon = internal::is_past_tombstone_delete_horizon(seg, cfg); + bool may_have_tombstone_records = false; auto copy_reducer = internal::copy_data_segment_reducer( [&map, + &may_have_tombstone_records, segment_last_offset = seg->offsets().get_committed_offset(), past_tombstone_delete_horizon, compaction_placeholder_enabled]( @@ -196,7 +198,14 @@ ss::future deduplicate_segment( return ss::make_ready_future(false); } - return should_keep(map, b, r); + return should_keep(map, b, r).then( + [&may_have_tombstone_records, + is_tombstone = r.is_tombstone()](bool keep) { + if (is_tombstone && keep) { + may_have_tombstone_records = true; + } + return keep; + }); }, &appender, seg->path().is_internal_topic(), @@ -208,8 +217,14 @@ ss::future deduplicate_segment( auto new_idx = co_await std::move(rdr).consume( std::move(copy_reducer), model::no_timeout); + + // restore broker timestamp and clean compact timestamp new_idx.broker_timestamp = seg->index().broker_timestamp(); new_idx.clean_compact_timestamp = seg->index().clean_compact_timestamp(); + + // Set may_have_tombstone_records + new_idx.may_have_tombstone_records = may_have_tombstone_records; + co_return new_idx; } diff --git a/src/v/storage/segment_index.cc b/src/v/storage/segment_index.cc index a0bbf14522729..158bca13aaba3 100644 --- a/src/v/storage/segment_index.cc +++ b/src/v/storage/segment_index.cc @@ -47,7 +47,8 @@ segment_index::segment_index( ss::sharded& feature_table, std::optional sanitizer_config, std::optional broker_timestamp, - std::optional clean_compact_timestamp) + std::optional clean_compact_timestamp, + bool may_have_tombstone_records) : _path(std::move(path)) , _step(step) , _feature_table(std::ref(feature_table)) @@ -57,6 +58,7 @@ segment_index::segment_index( _state.base_offset = base; _state.broker_timestamp = broker_timestamp; _state.clean_compact_timestamp = clean_compact_timestamp; + _state.may_have_tombstone_records = may_have_tombstone_records; } segment_index::segment_index( @@ -88,15 +90,18 @@ ss::future segment_index::open() { } void segment_index::reset() { - // Persist the base offset and clean compaction timestamp through a reset. + // Persist the base offset, clean compaction timestamp, and tombstones + // identifier through a reset. 
auto base = _state.base_offset; auto clean_compact_timestamp = _state.clean_compact_timestamp; + auto may_have_tombstone_records = _state.may_have_tombstone_records; _state = index_state::make_empty_index( storage::internal::should_apply_delta_time_offset(_feature_table)); _state.base_offset = base; _state.clean_compact_timestamp = clean_compact_timestamp; + _state.may_have_tombstone_records = may_have_tombstone_records; _acc = 0; } diff --git a/src/v/storage/segment_index.h b/src/v/storage/segment_index.h index b9645473c99df..bd26c0cc6eca7 100644 --- a/src/v/storage/segment_index.h +++ b/src/v/storage/segment_index.h @@ -130,7 +130,8 @@ class segment_index { ss::sharded& feature_table, std::optional sanitizer_config, std::optional broker_timestamp = std::nullopt, - std::optional clean_compact_timestamp = std::nullopt); + std::optional clean_compact_timestamp = std::nullopt, + bool may_have_tombstone_records = true); ~segment_index() noexcept = default; segment_index(segment_index&&) noexcept = default; @@ -215,6 +216,17 @@ class segment_index { return _state.clean_compact_timestamp; } + void set_may_have_tombstone_records(bool b) { + if (_state.may_have_tombstone_records != b) { + _needs_persistence = true; + } + _state.may_have_tombstone_records = b; + } + + bool may_have_tombstone_records() const { + return _state.may_have_tombstone_records; + } + ss::future materialize_index(); ss::future<> flush(); ss::future<> truncate(model::offset, model::timestamp); diff --git a/src/v/storage/segment_utils.cc b/src/v/storage/segment_utils.cc index 18282cb9de134..c6314e8e3eaef 100644 --- a/src/v/storage/segment_utils.cc +++ b/src/v/storage/segment_utils.cc @@ -415,8 +415,10 @@ ss::future do_copy_segment_data( "copying compacted segment data from {} to {}", seg->reader().filename(), tmpname); + bool may_have_tombstone_records = false; auto should_keep = [compacted_list = std::move(compacted_offsets), - past_tombstone_delete_horizon]( + past_tombstone_delete_horizon, + &may_have_tombstone_records]( const model::record_batch& b, const model::record& r, bool) { @@ -426,7 +428,13 @@ ss::future do_copy_segment_data( } const auto o = b.base_offset() + model::offset_delta(r.offset_delta()); - return ss::make_ready_future(compacted_list.contains(o)); + const auto keep = compacted_list.contains(o); + + if (r.is_tombstone() && keep) { + may_have_tombstone_records = true; + } + + return ss::make_ready_future(keep); }; model::offset segment_last_offset{}; @@ -462,6 +470,9 @@ ss::future do_copy_segment_data( new_index.broker_timestamp = old_broker_timestamp; new_index.clean_compact_timestamp = old_clean_compact_timestamp; + // Set may_have_tombstone_records + new_index.may_have_tombstone_records = may_have_tombstone_records; + co_return new_index; } @@ -895,6 +906,12 @@ make_concatenated_segment( return new_ts; }(); + // If any of the segments contain a tombstone record, then the new index + // should reflect that. 
+ auto new_may_have_tombstone_records = std::ranges::any_of( + segments, + [](const auto& s) { return s->index().may_have_tombstone_records(); }); + segment_index index( index_name, offsets.get_base_offset(), @@ -902,7 +919,8 @@ make_concatenated_segment( feature_table, cfg.sanitizer_config, new_broker_timestamp, - new_clean_compact_timestamp); + new_clean_compact_timestamp, + new_may_have_tombstone_records); co_return std::make_tuple( ss::make_lw_shared( From 06d65d7ff993ed2c8961052a81e98e7ce1d1dcec Mon Sep 17 00:00:00 2001 From: Willem Kaufmann Date: Wed, 18 Sep 2024 17:10:44 -0400 Subject: [PATCH 3/5] `storage`: add `may_have_removable_tombstones()` Adds a helper function that indicates whether or not a segment may have tombstones eligible for deletion. This can return false-positives, since any segment that has not yet been through the compaction process is assumed to potentially have tombstones until proven otherwise. --- src/v/storage/segment_utils.cc | 6 ++++++ src/v/storage/segment_utils.h | 10 ++++++++++ 2 files changed, 16 insertions(+) diff --git a/src/v/storage/segment_utils.cc b/src/v/storage/segment_utils.cc index c6314e8e3eaef..96c07ab9d27f5 100644 --- a/src/v/storage/segment_utils.cc +++ b/src/v/storage/segment_utils.cc @@ -1169,4 +1169,10 @@ bool is_past_tombstone_delete_horizon( return false; } +bool may_have_removable_tombstones( + ss::lw_shared_ptr seg, const compaction_config& cfg) { + return seg->index().may_have_tombstone_records() + && is_past_tombstone_delete_horizon(seg, cfg); +} + } // namespace storage::internal diff --git a/src/v/storage/segment_utils.h b/src/v/storage/segment_utils.h index 219950a808012..aaf7a6552a9e1 100644 --- a/src/v/storage/segment_utils.h +++ b/src/v/storage/segment_utils.h @@ -269,6 +269,16 @@ offset_delta_time should_apply_delta_time_offset( bool is_past_tombstone_delete_horizon( ss::lw_shared_ptr seg, const compaction_config& cfg); +// Checks if a segment may have any tombstones currently eligible for deletion. +// +// Returns true if the segment is marked as potentially having tombstone +// records, and if the result of evaluating +// `is_past_tombstone_delete_horizon(seg, cfg)` is also true. This can return +// false-positives, since segments that have not yet gone through the compaction +// process are assumed to potentially contain tombstones until proven otherwise. +bool may_have_removable_tombstones( + ss::lw_shared_ptr seg, const compaction_config& cfg); + // Mark a segment as completed window compaction, and whether it is "clean" (in // which case the `clean_compact_timestamp` is set in the segment's index). void mark_segment_as_finished_window_compaction( From 356b76dac58f90b8f3c649b001dfee787467cb73 Mon Sep 17 00:00:00 2001 From: Willem Kaufmann Date: Wed, 18 Sep 2024 17:15:23 -0400 Subject: [PATCH 4/5] `storage`: alter `disk_log_impl::find_sliding_range()` This commit does two things, which seemed easier to combine into one commit: 1. The behavior of sliding window compaction is changed, such that newly added/closed segments are ignored until the current "round" of sliding window compaction cleanly compacts all segments down to the start of the range. This allows for sliding window compaction to avoid a situation where clean segments are not being produced due to a high ingress rate or key cardinality (this situation would prevent timely tombstone removal). `_last_compaction_window_offset` must reach the base offset of the first segment in the currently active window before new segments will be considered for compaction. 2. 
Segments that are cleanly compacted or have been through a round of window compaction are considered in the sliding window range. However, it would be a no-op to actually perform window compaction over these segments. Self-compaction is performed to remove tombstones on segments that may contain them, and all cleanly compacted segments are removed before sliding window compaction occurs. This allows for timely tombstone removal by ensuring that a partition which is no longer being produced to can still trigger tombstone removal. Both of these changes improve the rate at which tombstone removal occurs, and help prevent clean segment/tombstone removal "starvation". --- src/v/storage/disk_log_impl.cc | 100 +++++++++----- src/v/storage/disk_log_impl.h | 12 +- src/v/storage/segment_utils.cc | 4 +- src/v/storage/tests/compaction_e2e_test.cc | 59 ++++----- .../tests/segment_deduplication_test.cc | 123 +++++++++++++++++- 5 files changed, 228 insertions(+), 70 deletions(-) diff --git a/src/v/storage/disk_log_impl.cc b/src/v/storage/disk_log_impl.cc index 00ae6208c9248..2fc041e38e94a 100644 --- a/src/v/storage/disk_log_impl.cc +++ b/src/v/storage/disk_log_impl.cc @@ -500,6 +500,26 @@ ss::future<> disk_log_impl::adjacent_merge_compact( segment_set disk_log_impl::find_sliding_range( const compaction_config& cfg, std::optional new_start_offset) { + if ( + _last_compaction_window_start_offset.has_value() + && (_last_compaction_window_start_offset.value() + <= _segs.front()->offsets().get_base_offset())) { + // If this evaluates to true, it is likely because local retention has + // removed segments. e.g., segments ([0],[1],[2]) have been garbage + // collected to segments ([2]), while _last_window_compaction_offset + // == 1. To avoid compaction getting stuck in this situation, we reset + // the compaction window offset here. + vlog( + gclog.debug, + "[{}] start offset ({}) <= base offset of front segment ({}), " + "resetting compaction window start offset", + config().ntp(), + _last_compaction_window_start_offset.value(), + _segs.front()->offsets().get_base_offset()); + + _last_compaction_window_start_offset.reset(); + } + // Collect all segments that have stable data. segment_set::underlying_t buf; for (const auto& seg : _segs) { @@ -507,12 +527,22 @@ segment_set disk_log_impl::find_sliding_range( // Stop once we get to an unstable segment. break; } + if ( + _last_compaction_window_start_offset.has_value() + && (seg->offsets().get_base_offset() + >= _last_compaction_window_start_offset.value())) { + // Force clean segment production by compacting down to the + // start of the log before considering new segments in the + // compaction window. + break; + } if ( new_start_offset && seg->offsets().get_base_offset() < new_start_offset.value()) { // Skip over segments that are being truncated. continue; } + buf.emplace_back(seg); } segment_set segs(std::move(buf)); @@ -520,28 +550,6 @@ segment_set disk_log_impl::find_sliding_range( return segs; } - // If a previous sliding window compaction ran, and there are no new - // segments, segments at the start of that window and above have been - // fully deduplicated and don't need to be compacted further. - // - // If there are new segments that have not been compacted, we can't make - // this claim, and compact everything again.
- if ( - segs.back()->finished_windowed_compaction() - && _last_compaction_window_start_offset.has_value()) { - while (!segs.empty()) { - if ( - segs.back()->offsets().get_base_offset() - >= _last_compaction_window_start_offset.value()) { - // A previous compaction deduplicated the keys above this - // offset. As such, segments above this point would not benefit - // from being included in the compaction window. - segs.pop_back(); - } else { - break; - } - } - } return segs; } @@ -559,9 +567,7 @@ ss::future disk_log_impl::sliding_window_compact( if (cfg.asrc) { cfg.asrc->check(); } - if (seg->finished_self_compaction()) { - continue; - } + auto result = co_await storage::internal::self_compact_segment( seg, _stm_manager, @@ -571,6 +577,10 @@ ss::future disk_log_impl::sliding_window_compact( _manager.resources(), _feature_table); + if (result.did_compact() == false) { + continue; + } + vlog( gclog.debug, "[{}] segment {} self compaction result: {}", @@ -579,6 +589,17 @@ ss::future disk_log_impl::sliding_window_compact( result); has_self_compacted = true; } + + // Remove any of the segments that have already been cleanly compacted. They + // would be no-ops to compact. + while (!segs.empty()) { + if (segs.back()->index().has_clean_compact_timestamp()) { + segs.pop_back(); + } else { + break; + } + } + // Remove any of the beginning segments that don't have any // compactible records. They would be no-ops to compact. while (!segs.empty()) { @@ -592,7 +613,12 @@ ss::future disk_log_impl::sliding_window_compact( segs.pop_front(); } if (segs.empty()) { - vlog(gclog.debug, "[{}] no more segments to compact", config().ntp()); + vlog( + gclog.debug, + "[{}] no segments left in sliding window to compact (all segments " + "were already cleanly compacted, or did not have any compactible " + "records)", + config().ntp()); co_return has_self_compacted; } vlog( @@ -641,6 +667,23 @@ ss::future disk_log_impl::sliding_window_compact( idx_start_offset, map.max_offset()); + std::optional next_window_start_offset = idx_start_offset; + if (idx_start_offset == segs.front()->offsets().get_base_offset()) { + // We have cleanly compacted up to the first segment in the sliding + // range (not necessarily equivalent to the first segment in the log- + // segments may have been removed from the sliding range if they were + // already cleanly compacted or had no compactible offsets). Reset the + // start offset to allow new segments into the sliding window range. + vlog( + gclog.debug, + "[{}] fully de-duplicated up to start of sliding range with offset " + "{}, resetting sliding window start offset", + config().ntp(), + idx_start_offset); + + next_window_start_offset.reset(); + } + auto segment_modify_lock = co_await _segment_rewrite_lock.get_units(); for (auto& seg : segs) { if (cfg.asrc) { @@ -792,7 +835,7 @@ ss::future disk_log_impl::sliding_window_compact( gclog.debug, "[{}] Final compacted segment {}", config().ntp(), seg); } - _last_compaction_window_start_offset = idx_start_offset; + _last_compaction_window_start_offset = next_window_start_offset; co_return true; } @@ -1269,8 +1312,7 @@ ss::future<> disk_log_impl::do_compact( bool compacted = did_compact_fut.get(); if (!compacted) { // If sliding window compaction did not occur, we fall back to adjacent - // segment compaction (as self compaction of segments occured in - // sliding_window_compact()). + // segment compaction. 
co_await compact_adjacent_segments(compact_cfg); } } diff --git a/src/v/storage/disk_log_impl.h b/src/v/storage/disk_log_impl.h index b709c76e5b2f1..740fcc6a4656a 100644 --- a/src/v/storage/disk_log_impl.h +++ b/src/v/storage/disk_log_impl.h @@ -187,12 +187,20 @@ class disk_log_impl final : public log { std::optional retention_offset(gc_config) const final; // Collects an iterable list of segments over which to perform sliding - // window compaction. + // window compaction. This can include segments which have already had their + // keys de-duplicated in every segment between the start of the log and + // themselves (these are referred to as "clean" segments). These segments + // would be no-ops to include in sliding window compaction, but they are + // included in the range anyways in order to allow for timely tombstone + // record removal via self-compaction, and to ensure that this function + // returns a contiguous range of segments. It is up to the caller to filter + // out these already cleanly-compacted segments. segment_set find_sliding_range( const compaction_config& cfg, std::optional new_start_offset = std::nullopt); - void set_last_compaction_window_start_offset(model::offset o) { + void + set_last_compaction_window_start_offset(std::optional o) { _last_compaction_window_start_offset = o; } diff --git a/src/v/storage/segment_utils.cc b/src/v/storage/segment_utils.cc index 96c07ab9d27f5..7308b58e3fa76 100644 --- a/src/v/storage/segment_utils.cc +++ b/src/v/storage/segment_utils.cc @@ -738,7 +738,9 @@ ss::future self_compact_segment( "Cannot compact an active segment. cfg:{} - segment:{}", cfg, s)); } - if (s->finished_self_compaction() || !s->has_compactible_offsets(cfg)) { + if ( + !s->has_compactible_offsets(cfg) + || (s->finished_self_compaction() && !may_have_removable_tombstones(s, cfg))) { co_return compaction_result{s->size_bytes()}; } diff --git a/src/v/storage/tests/compaction_e2e_test.cc b/src/v/storage/tests/compaction_e2e_test.cc index 1449c1993d22a..498ce2c8928b1 100644 --- a/src/v/storage/tests/compaction_e2e_test.cc +++ b/src/v/storage/tests/compaction_e2e_test.cc @@ -506,7 +506,7 @@ TEST_P(CompactionFilledReaderTest, ReadFilledGaps) { model::offset end_offset = consume_to_end ? model::offset::max() : model::offset{random_generators::get_int( - start_offset(), log_end_offset())}; + start_offset(), log_end_offset())}; storage::log_reader_config reader_cfg{ start_offset, @@ -729,10 +729,6 @@ TEST_F(CompactionFixtureTest, TestTombstones) { // to compact the tombstone record will be eligible for deletion. ss::sleep(tombstone_retention_ms).get(); - // Flush and force roll the log, so that sliding window compaction can - // occur. - log->flush().get(); - log->force_roll(ss::default_priority_class()).get(); did_compact = do_sliding_window_compact( log->segments().back()->offsets().get_base_offset(), tombstone_retention_ms) @@ -801,6 +797,9 @@ TEST_P(CompactionFixtureTombstonesParamTest, TestTombstonesCompletelyEmptyLog) { .get(); ASSERT_TRUE(did_compact); + for (int i = 0; i < num_segments; ++i) { + ASSERT_TRUE(log->segments()[i]->index().has_clean_compact_timestamp()); + } { tests::kafka_consume_transport consumer(make_kafka_client().get()); @@ -828,11 +827,6 @@ TEST_P(CompactionFixtureTombstonesParamTest, TestTombstonesCompletelyEmptyLog) { // to compact the tombstone records will be eligible for deletion. 
ss::sleep(tombstone_retention_ms).get(); - // Restart to allow sliding window compaction to occur - restart(should_wipe::no); - wait_for_leader(ntp).get(); - partition = app.partition_manager.local().get(ntp).get(); - log = partition->log().get(); did_compact = do_sliding_window_compact( log->segments().back()->offsets().get_base_offset(), tombstone_retention_ms) @@ -848,7 +842,6 @@ TEST_P(CompactionFixtureTombstonesParamTest, TestTombstonesCompletelyEmptyLog) { model::partition_id(0), model::offset(0)) .get(); - // The tombstones should have been removed after second round of // compaction post tombstone.retention.ms. ASSERT_TRUE(consumed_kvs.empty()); @@ -917,14 +910,11 @@ TEST_P( auto num_records_produced = latest_kv_map.size() - num_tombstones_produced; - std::optional tombstone_retention_ms - = wait_for_retention_ms ? 1000ms - : std::optional{}; - // Perform first round of sliding window compaction. + // Don't allow for tombstone clean-up to occur. bool did_compact = do_sliding_window_compact( log->segments().back()->offsets().get_base_offset(), - tombstone_retention_ms) + std::nullopt) .get(); ASSERT_TRUE(did_compact); @@ -952,23 +942,25 @@ TEST_P( } } + std::optional tombstone_retention_ms + = wait_for_retention_ms ? 1000ms + : std::optional{}; + // Maybe sleep for tombstone.retention.ms time, so that the next time we // attempt to compact the tombstone records will be eligible for deletion. if (wait_for_retention_ms) { ss::sleep(tombstone_retention_ms.value()).get(); } - // Restart to allow sliding window compaction to occur - restart(should_wipe::no); - wait_for_leader(ntp).get(); - partition = app.partition_manager.local().get(ntp).get(); - log = partition->log().get(); did_compact = do_sliding_window_compact( log->segments().back()->offsets().get_base_offset(), tombstone_retention_ms) .get(); - ASSERT_TRUE(did_compact); + // Compaction will only have occurred if tombstones were eligible for + // deletion. + ASSERT_EQ(did_compact, wait_for_retention_ms); + { tests::kafka_consume_transport consumer(make_kafka_client().get()); consumer.start().get(); @@ -1058,17 +1050,14 @@ TEST_P( auto num_records_produced = latest_kv_map.size() - num_tombstones_produced; - std::optional tombstone_retention_ms - = wait_for_retention_ms ? 1000ms - : std::optional{}; - int prev_num_clean_compacted = 0; bool did_compact = true; // Perform as many rounds of sliding window compaction as required. + // Don't allow for tombstone clean-up to occur. while (did_compact) { did_compact = do_sliding_window_compact( log->segments().back()->offsets().get_base_offset(), - tombstone_retention_ms, + std::nullopt, max_keys) .get(); @@ -1085,8 +1074,6 @@ TEST_P( // All segments should be clean, minus the active segment. ASSERT_EQ(prev_num_clean_compacted, log->segment_count() - 1); - // Assume above rounds of compaction finished before tombstone.retention.ms - // time passed. { tests::kafka_consume_transport consumer(make_kafka_client().get()); consumer.start().get(); @@ -1109,23 +1096,25 @@ TEST_P( ASSERT_EQ(consumed_kvs.size(), latest_kv_map.size()); } + std::optional tombstone_retention_ms + = wait_for_retention_ms ? 1000ms + : std::optional{}; + // Maybe sleep for tombstone.retention.ms time, so that the next time we // attempt to compact the tombstone records will be eligible for deletion. 
if (wait_for_retention_ms) { ss::sleep(tombstone_retention_ms.value()).get(); } - // Restart to allow sliding window compaction to occur - restart(should_wipe::no); - wait_for_leader(ntp).get(); - partition = app.partition_manager.local().get(ntp).get(); - log = partition->log().get(); did_compact = do_sliding_window_compact( log->segments().back()->offsets().get_base_offset(), tombstone_retention_ms) .get(); - ASSERT_TRUE(did_compact); + // Compaction will only have occurred if tombstones were eligible for + // deletion. + ASSERT_EQ(did_compact, wait_for_retention_ms); + { tests::kafka_consume_transport consumer(make_kafka_client().get()); consumer.start().get(); diff --git a/src/v/storage/tests/segment_deduplication_test.cc b/src/v/storage/tests/segment_deduplication_test.cc index 44c03cf97047c..c0f2f7f1253d8 100644 --- a/src/v/storage/tests/segment_deduplication_test.cc +++ b/src/v/storage/tests/segment_deduplication_test.cc @@ -10,6 +10,7 @@ #include "gmock/gmock.h" #include "model/record_batch_types.h" #include "model/tests/random_batch.h" +#include "model/timestamp.h" #include "random/generators.h" #include "storage/chunk_cache.h" #include "storage/disk_log_impl.h" @@ -18,15 +19,18 @@ #include "storage/segment_utils.h" #include "storage/tests/disk_log_builder_fixture.h" #include "storage/tests/utils/disk_log_builder.h" +#include "storage/types.h" #include "test_utils/test.h" #include #include #include +#include #include using namespace storage; +using namespace std::chrono_literals; namespace { ss::abort_source never_abort; @@ -40,7 +44,9 @@ void add_segments( int num_segs, int records_per_seg = 10, int start_offset = 0, - bool mark_compacted = true) { + bool mark_compacted = true, + bool may_have_tombstones = true, + std::optional clean_compacted_ts = std::nullopt) { auto& disk_log = b.get_disk_log_impl(); for (int i = 0; i < num_segs; i++) { auto offset = start_offset + i * records_per_seg; @@ -53,6 +59,13 @@ void add_segments( seg->mark_as_finished_self_compaction(); seg->mark_as_finished_windowed_compaction(); } + + seg->index().set_may_have_tombstone_records(may_have_tombstones); + + if (clean_compacted_ts.has_value()) { + seg->index().maybe_set_clean_compact_timestamp( + clean_compacted_ts.value()); + } if (seg->has_appender()) { seg->appender().close().get(); seg->release_appender(); @@ -65,9 +78,18 @@ void build_segments( int num_segs, int records_per_seg = 10, int start_offset = 0, - bool mark_compacted = true) { + bool mark_compacted = true, + bool may_have_tombstones = true, + std::optional clean_compacted_ts = std::nullopt) { b | start(); - add_segments(b, num_segs, records_per_seg, start_offset, mark_compacted); + add_segments( + b, + num_segs, + records_per_seg, + start_offset, + mark_compacted, + may_have_tombstones, + clean_compacted_ts); } TEST(FindSlidingRangeTest, TestCollectSegments) { @@ -219,6 +241,101 @@ TEST(FindSlidingRangeTest, TestEmptySegmentNoCompactibleRecords) { } } +TEST(FindSlidingRangeTest, TestAllCleanlyCompactedSegments) { + storage::disk_log_builder b; + const auto num_segs = 3; + // Mark as compacted, do not have tombstones, and cleanly compacted at a + // previous timestamp. 
+ build_segments(b, num_segs, 10, 0, true, false, model::timestamp{0}); + auto cleanup = ss::defer([&] { b.stop().get(); }); + auto& disk_log = b.get_disk_log_impl(); + compaction_config cfg( + model::offset{30}, 1ms, ss::default_priority_class(), never_abort); + auto segs = disk_log.find_sliding_range(cfg, model::offset{0}); + // All cleanly compacted segments are still considered in the range. + ASSERT_EQ(segs.size(), num_segs); +} + +TEST(FindSlidingRangeTest, TestCompactionLastSegmentNotCompacted) { + storage::disk_log_builder b; + const auto num_segs = 3; + // Mark as not compacted. + build_segments(b, num_segs, 10, 0, false); + auto cleanup = ss::defer([&] { b.stop().get(); }); + auto& disk_log = b.get_disk_log_impl(); + compaction_config cfg( + model::offset{30}, + std::nullopt, + ss::default_priority_class(), + never_abort); + auto segs = disk_log.find_sliding_range(cfg); + ASSERT_EQ(3, segs.size()); + ASSERT_EQ(segs.front()->offsets().get_base_offset(), model::offset{0}); + + // Set the last window start offset to 20. Now, even though the last segment + // in the group is marked as not compacted, it still will not be considered + // in the window. + disk_log.set_last_compaction_window_start_offset(model::offset(20)); + segs = disk_log.find_sliding_range(cfg); + ASSERT_EQ(2, segs.size()); + + // Reset the last window start offset, and now all segments are once again + // considered in the window. + disk_log.set_last_compaction_window_start_offset(std::nullopt); + segs = disk_log.find_sliding_range(cfg); + ASSERT_EQ(3, segs.size()); +} + +TEST(FindSlidingRangeTest, TestWindowWithRemovedSegments) { + storage::disk_log_builder b; + const auto num_segs = 3; + // Mark as not compacted + build_segments(b, num_segs, 10, 0, false); + auto cleanup = ss::defer([&] { b.stop().get(); }); + auto& disk_log = b.get_disk_log_impl(); + + // Set the last compaction window start offset, then remove a segment from + // the log such that start offset < the log's base offset. + disk_log.set_last_compaction_window_start_offset(model::offset(5)); + disk_log.segments().pop_front(); + + compaction_config cfg( + model::offset{30}, 1ms, ss::default_priority_class(), never_abort); + auto segs = disk_log.find_sliding_range(cfg, model::offset{0}); + + // We should have reset the compaction window start offset, and had the + // remaining two segments in the sliding range. + ASSERT_EQ(segs.size(), 2); + ASSERT_FALSE( + disk_log.get_last_compaction_window_start_offset().has_value()); +} + +TEST(FindSlidingRangeTest, TestWindowWithTruncatedSegments) { + storage::disk_log_builder b; + const auto num_segs = 3; + // Mark as not compacted + build_segments(b, num_segs, 10, 0, false); + auto cleanup = ss::defer([&] { b.stop().get(); }); + auto& disk_log = b.get_disk_log_impl(); + + // Set the last compaction window start offset, then prefix truncate the log + // such that start offset < the log's base offset. + disk_log.set_last_compaction_window_start_offset(model::offset(5)); + truncate_prefix_config trunc_cfg( + model::offset{10}, ss::default_priority_class()); + disk_log.truncate_prefix(trunc_cfg).get(); + + compaction_config cfg( + model::offset{30}, 1ms, ss::default_priority_class(), never_abort); + auto segs = disk_log.find_sliding_range(cfg, model::offset{0}); + + // We should have reset the compaction window start offset, and had the + // remaining two segments in the sliding range. 
+ ASSERT_EQ(segs.size(), 2); + ASSERT_FALSE( + disk_log.get_last_compaction_window_start_offset().has_value()); +} + TEST(BuildOffsetMap, TestBuildSimpleMap) { storage::disk_log_builder b; build_segments(b, 3); From 262600310ec7dde3f8735f9055a33e2a8280a390 Mon Sep 17 00:00:00 2001 From: Willem Kaufmann Date: Wed, 18 Sep 2024 22:08:51 -0400 Subject: [PATCH 5/5] `storage`: don't index cleanly compacted segments Cleanly compacted segments do not need to have their keys added to the compaction offset map, since the deduplication process considers unindexed keys to be valid records to keep. By not indexing these segments, the compaction process can use less memory and cleanly compact down to the start of the log faster. --- src/v/storage/disk_log_impl.h | 5 ++ src/v/storage/segment_deduplication_utils.cc | 18 ++++ src/v/storage/tests/compaction_e2e_test.cc | 88 +++++++++++++++++++- 3 files changed, 110 insertions(+), 1 deletion(-) diff --git a/src/v/storage/disk_log_impl.h b/src/v/storage/disk_log_impl.h index 740fcc6a4656a..05101a394d27a 100644 --- a/src/v/storage/disk_log_impl.h +++ b/src/v/storage/disk_log_impl.h @@ -204,6 +204,11 @@ class disk_log_impl final : public log { _last_compaction_window_start_offset = o; } + const std::optional& + get_last_compaction_window_start_offset() const { + return _last_compaction_window_start_offset; + } + readers_cache& readers() { return *_readers_cache; } storage_resources& resources(); diff --git a/src/v/storage/segment_deduplication_utils.cc b/src/v/storage/segment_deduplication_utils.cc index cef7ac7ab6af4..1ae96a5411448 100644 --- a/src/v/storage/segment_deduplication_utils.cc +++ b/src/v/storage/segment_deduplication_utils.cc @@ -110,6 +110,24 @@ ss::future build_offset_map( cfg.asrc->check(); } auto seg = *iter; + if (seg->index().has_clean_compact_timestamp()) { + // This segment has already been fully deduplicated, so building the + // offset map for it would be pointless. + vlog( + gclog.trace, + "segment is already cleanly compacted, no need to add it to the " + "offset_map: {}", + seg->filename()); + + min_segment_fully_indexed = seg->offsets().get_base_offset(); + + if (iter == segs.begin()) { + break; + } else { + --iter; + continue; + } + } vlog(gclog.trace, "Adding segment to offset map: {}", seg->filename()); try { diff --git a/src/v/storage/tests/compaction_e2e_test.cc b/src/v/storage/tests/compaction_e2e_test.cc index 498ce2c8928b1..ff5d93648ec9d 100644 --- a/src/v/storage/tests/compaction_e2e_test.cc +++ b/src/v/storage/tests/compaction_e2e_test.cc @@ -363,6 +363,92 @@ TEST_F(CompactionFixtureTest, TestDedupeMultiPass) { ASSERT_NO_FATAL_FAILURE(check_records(cardinality, num_segments - 1).get()); } +TEST_F(CompactionFixtureTest, TestDedupeMultiPassAddedSegment) { + constexpr auto duplicates_per_key = 10; + constexpr auto num_segments = 25; + constexpr auto total_records = 100; + constexpr auto cardinality = total_records / duplicates_per_key; // 10 + size_t records_per_segment = total_records / num_segments; // 4 + generate_data(num_segments, cardinality, records_per_segment).get(); + + // Compact, but with a map size that requires us to compact multiple times + // to compact everything. 
+ ss::abort_source never_abort; + auto& disk_log = dynamic_cast(*log); + storage::compaction_config cfg( + disk_log.segments().back()->offsets().get_base_offset(), + std::nullopt, + ss::default_priority_class(), + never_abort, + std::nullopt, + cardinality - 1); + disk_log.sliding_window_compact(cfg).get(); + const auto& segs = disk_log.segments(); + + auto segments_compacted = disk_log.get_probe().get_segments_compacted(); + + // After first round of compaction, we should have a value for the window + // start offset. + ASSERT_TRUE(disk_log.get_last_compaction_window_start_offset().has_value()); + + // Add an additional segment. This won't be considered for sliding window + // compaction until the first window of segments is fully compacted. + generate_data(1, cardinality, 1, 1, total_records).get(); + + // Another attempt to compact will actually rewrite segments, but not the + // last one. + disk_log.sliding_window_compact(cfg).get(); + auto segments_compacted_2 = disk_log.get_probe().get_segments_compacted(); + ASSERT_LT(segments_compacted, segments_compacted_2); + + // segs.size() - 2 to account for active segment. + for (int i = 0; i < segs.size() - 2; ++i) { + auto& seg = segs[i]; + ASSERT_TRUE(seg->finished_windowed_compaction()); + ASSERT_TRUE(seg->finished_self_compaction()); + ASSERT_TRUE(seg->index().has_clean_compact_timestamp()); + } + + // The last added segment should not have had any compaction operations + // performed. + ASSERT_FALSE(segs[segs.size() - 2]->finished_windowed_compaction()); + ASSERT_FALSE(segs[segs.size() - 2]->finished_self_compaction()); + ASSERT_FALSE(segs[segs.size() - 2]->index().has_clean_compact_timestamp()); + + // We should have compacted all the way down to the start of the log, and + // reset the start offset. + ASSERT_FALSE( + disk_log.get_last_compaction_window_start_offset().has_value()); + + // Another round of compaction to cleanly compact the newly added segment. + disk_log.sliding_window_compact(cfg).get(); + + // Now, these values should be set. + ASSERT_TRUE(segs[segs.size() - 2]->finished_windowed_compaction()); + ASSERT_TRUE(segs[segs.size() - 2]->finished_self_compaction()); + ASSERT_TRUE(segs[segs.size() - 2]->index().has_clean_compact_timestamp()); + + auto segments_compacted_3 = disk_log.get_probe().get_segments_compacted(); + ASSERT_LT(segments_compacted_2, segments_compacted_3); + + // We would have fully indexed the new segment, and since the rest of the + // segments are already cleanly compacted, our start window should once + // again have been reset. + ASSERT_FALSE( + disk_log.get_last_compaction_window_start_offset().has_value()); + + // But the above compaction should deduplicate any remaining keys. + // Subsequent compactions will be no-ops. + disk_log.sliding_window_compact(cfg).get(); + auto segments_compacted_4 = disk_log.get_probe().get_segments_compacted(); + ASSERT_EQ(segments_compacted_3, segments_compacted_4); + + ASSERT_FALSE( + disk_log.get_last_compaction_window_start_offset().has_value()); + + ASSERT_NO_FATAL_FAILURE(check_records(cardinality, num_segments - 1).get()); +} + class CompactionFixtureBatchSizeParamTest : public CompactionFixtureTest , public ::testing::WithParamInterface {}; @@ -506,7 +592,7 @@ TEST_P(CompactionFilledReaderTest, ReadFilledGaps) { model::offset end_offset = consume_to_end ? model::offset::max() : model::offset{random_generators::get_int( - start_offset(), log_end_offset())}; + start_offset(), log_end_offset())}; storage::log_reader_config reader_cfg{ start_offset,