storage: compaction enhancements #23380

Merged
merged 5 commits into from
Sep 24, 2024
100 changes: 71 additions & 29 deletions src/v/storage/disk_log_impl.cc
@@ -500,48 +500,56 @@ ss::future<> disk_log_impl::adjacent_merge_compact(

segment_set disk_log_impl::find_sliding_range(
const compaction_config& cfg, std::optional<model::offset> new_start_offset) {
if (
_last_compaction_window_start_offset.has_value()
&& (_last_compaction_window_start_offset.value()
<= _segs.front()->offsets().get_base_offset())) {
// If this evaluates to true, it is likely because local retention has
// removed segments. e.g., segments ([0],[1],[2]) have been garbage
// collected to segments ([2]), while _last_compaction_window_start_offset
// == 1. To avoid compaction getting stuck in this situation, we reset
// the compaction window offset here.
vlog(
gclog.debug,
"[{}] start offset ({}) <= base offset of front segment ({}), "
"resetting compaction window start offset",
config().ntp(),
_last_compaction_window_start_offset.value(),
_segs.front()->offsets().get_base_offset());

_last_compaction_window_start_offset.reset();
}

// Collect all segments that have stable data.
segment_set::underlying_t buf;
for (const auto& seg : _segs) {
if (seg->has_appender() || !seg->has_compactible_offsets(cfg)) {
// Stop once we get to an unstable segment.
break;
}
if (
_last_compaction_window_start_offset.has_value()
&& (seg->offsets().get_base_offset()
>= _last_compaction_window_start_offset.value())) {
// Force clean segment production by compacting down to the
// start of the log before considering new segments in the
// compaction window.
break;
Comment on lines +530 to +537
Contributor

It seems possible that segments could have been removed such that the first segment falls above _last_compaction_window_start_offset. In that case, does this always return empty and we get stuck never compacting?

Contributor

Maybe we should reset _last_compaction_window_start_offset at the top of this method. Then we wouldn't need to worry about resetting it at compaction time.

Contributor Author

@WillemKauf Sep 20, 2024

Good callout. Added a new sanity check to the top of find_sliding_range() to reset the _last_compaction_window_start_offset if it is <= _segs.front()->offsets().get_base_offset().

However, the code which resets it in sliding_window_compact is still required: we want to compare idx_start_offset to the base offset of the first segment in our sliding window range (which is not necessarily the front segment in the log) as an indicator of whether or not we can reset the start offset and allow new segments into the range.
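
The two reset points discussed in this thread can be sketched in isolation. This is a minimal sketch under stated assumptions, not the code in this PR: `segment_stub`, `reset_stale_window_start`, and `next_window_start` are hypothetical names, offsets are plain integers, and the empty-log guard is an addition for the sketch.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical stand-in for a segment; only the base offset matters here.
struct segment_stub {
    int64_t base_offset;
};

// Check at the top of find_sliding_range(): a saved window start at or below
// the base offset of the front segment of the log is stale, so clear it.
// The empty-log guard is an addition for this sketch.
inline void reset_stale_window_start(
  const std::vector<segment_stub>& log_segs,
  std::optional<int64_t>& window_start) {
    if (log_segs.empty() || !window_start.has_value()) {
        return;
    }
    if (*window_start <= log_segs.front().base_offset) {
        window_start.reset();
    }
}

// Check at the end of a sliding window pass: if the offset map reached the
// first segment of the sliding range, clear the saved start so that new
// segments are admitted next time; otherwise resume from idx_start_offset.
inline std::optional<int64_t> next_window_start(
  int64_t idx_start_offset, const std::vector<segment_stub>& sliding_range) {
    assert(!sliding_range.empty());
    if (idx_start_offset == sliding_range.front().base_offset) {
        return std::nullopt;
    }
    return idx_start_offset;
}
```

The first function compares against the front of the whole log, the second against the front of the sliding range, which is why both checks are needed.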

}
if (
new_start_offset
&& seg->offsets().get_base_offset() < new_start_offset.value()) {
// Skip over segments that are being truncated.
continue;
}

buf.emplace_back(seg);
}
segment_set segs(std::move(buf));
if (segs.empty()) {
return segs;
}

// If a previous sliding window compaction ran, and there are no new
// segments, segments at the start of that window and above have been
// fully deduplicated and don't need to be compacted further.
//
// If there are new segments that have not been compacted, we can't make
// this claim, and compact everything again.
if (
Contributor Author

@WillemKauf Sep 20, 2024

It may look a bit scary to remove this chunk of code, but this is functionally performing the same "clean-compacted" check we now perform here (since, logically, is_clean_compacted is defined this way here.)

We want to leave these segments in the compaction range so that they can be self-compacted in the case that they contain tombstones, and then filter them out afterwards before proceeding to sliding window compaction.
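
The ordering described here (self-compact everything in the range first, then filter out the clean segments) might look roughly like the following. It is a sketch only: `segment_stub` and `self_compact_then_trim` are hypothetical, and self-compaction is reduced to flipping a flag.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>

// Hypothetical stand-in for a segment and the two flags this sketch needs.
struct segment_stub {
    int64_t base_offset;
    bool cleanly_compacted;
    bool self_compacted = false;
};

// Self-compacts every segment in the range (clean ones included, so that
// their tombstones can still be removed), then drops the clean segments from
// the back of the range. Returns how many segments this call self-compacted.
inline std::size_t self_compact_then_trim(std::deque<segment_stub>& range) {
    std::size_t num_self_compacted = 0;
    for (auto& seg : range) {
        if (!seg.self_compacted) {
            seg.self_compacted = true; // placeholder for the real work
            ++num_self_compacted;
        }
    }
    while (!range.empty() && range.back().cleanly_compacted) {
        range.pop_back();
    }
    // Whatever remains ends in a segment that still needs deduplication.
    assert(range.empty() || !range.back().cleanly_compacted);
    return num_self_compacted;
}
```

Only trailing clean segments are trimmed, which keeps the remaining range contiguous.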

segs.back()->finished_windowed_compaction()
&& _last_compaction_window_start_offset.has_value()) {
while (!segs.empty()) {
if (
segs.back()->offsets().get_base_offset()
>= _last_compaction_window_start_offset.value()) {
// A previous compaction deduplicated the keys above this
// offset. As such, segments above this point would not benefit
// from being included in the compaction window.
segs.pop_back();
} else {
break;
}
}
}
return segs;
}

@@ -559,9 +567,7 @@ ss::future<bool> disk_log_impl::sliding_window_compact(
if (cfg.asrc) {
cfg.asrc->check();
}
if (seg->finished_self_compaction()) {
continue;
}

Contributor Author

The removed code here was a superfluous check that we already perform in self_compact_segment(). The continue logic now runs after the call, by checking for result.did_compact() == false.
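
As a rough illustration of moving the skip after the call: in this sketch the callee owns the "already self-compacted" check and the caller only inspects the result. `compaction_result_stub`, `self_compact_segment_stub`, and `self_compact_all` are hypothetical simplifications, not the real signatures.

```cpp
#include <cassert>
#include <vector>

// Hypothetical result type exposing only did_compact().
struct compaction_result_stub {
    bool compacted;
    bool did_compact() const { return compacted; }
};

// Hypothetical segment carrying only the flag this sketch needs.
struct segment_stub {
    bool finished_self_compaction;
};

// The callee decides whether there is anything to do.
inline compaction_result_stub self_compact_segment_stub(segment_stub& seg) {
    if (seg.finished_self_compaction) {
        return {false};
    }
    seg.finished_self_compaction = true;
    return {true};
}

// The caller skips logging and bookkeeping when nothing was compacted.
inline bool self_compact_all(std::vector<segment_stub>& segs) {
    bool has_self_compacted = false;
    for (auto& seg : segs) {
        auto result = self_compact_segment_stub(seg);
        // Either way, the segment is self-compacted at this point.
        assert(seg.finished_self_compaction);
        if (!result.did_compact()) {
            continue;
        }
        has_self_compacted = true;
    }
    return has_self_compacted;
}
```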

auto result = co_await storage::internal::self_compact_segment(
seg,
_stm_manager,
@@ -571,6 +577,10 @@ ss::future<bool> disk_log_impl::sliding_window_compact(
_manager.resources(),
_feature_table);

if (result.did_compact() == false) {
continue;
}

vlog(
gclog.debug,
"[{}] segment {} self compaction result: {}",
@@ -579,6 +589,17 @@ ss::future<bool> disk_log_impl::sliding_window_compact(
result);
has_self_compacted = true;
}

// Remove any of the segments that have already been cleanly compacted. They
// would be no-ops to compact.
while (!segs.empty()) {
if (segs.back()->index().has_clean_compact_timestamp()) {
segs.pop_back();
} else {
break;
}
}

// Remove any of the beginning segments that don't have any
// compactible records. They would be no-ops to compact.
while (!segs.empty()) {
@@ -592,7 +613,12 @@ ss::future<bool> disk_log_impl::sliding_window_compact(
segs.pop_front();
}
if (segs.empty()) {
vlog(gclog.debug, "[{}] no more segments to compact", config().ntp());
vlog(
gclog.debug,
"[{}] no segments left in sliding window to compact (all segments "
"were already cleanly compacted, or did not have any compactible "
"records)",
config().ntp());
co_return has_self_compacted;
}
vlog(
@@ -641,6 +667,23 @@ ss::future<bool> disk_log_impl::sliding_window_compact(
idx_start_offset,
map.max_offset());

std::optional<model::offset> next_window_start_offset = idx_start_offset;
if (idx_start_offset == segs.front()->offsets().get_base_offset()) {
// We have cleanly compacted up to the first segment in the sliding
// range (not necessarily equivalent to the first segment in the log-
// segments may have been removed from the sliding range if they were
// already cleanly compacted or had no compactible offsets). Reset the
// start offset to allow new segments into the sliding window range.
vlog(
gclog.debug,
"[{}] fully de-duplicated up to start of sliding range with offset "
"{}, resetting sliding window start offset",
config().ntp(),
idx_start_offset);

next_window_start_offset.reset();
}

auto segment_modify_lock = co_await _segment_rewrite_lock.get_units();
for (auto& seg : segs) {
if (cfg.asrc) {
@@ -792,7 +835,7 @@ ss::future<bool> disk_log_impl::sliding_window_compact(
gclog.debug, "[{}] Final compacted segment {}", config().ntp(), seg);
}

_last_compaction_window_start_offset = idx_start_offset;
_last_compaction_window_start_offset = next_window_start_offset;

co_return true;
}
@@ -1269,8 +1312,7 @@ ss::future<> disk_log_impl::do_compact(
bool compacted = did_compact_fut.get();
if (!compacted) {
// If sliding window compaction did not occur, we fall back to adjacent
// segment compaction (as self compaction of segments occurred in
// sliding_window_compact()).
// segment compaction.
co_await compact_adjacent_segments(compact_cfg);
}
}
17 changes: 15 additions & 2 deletions src/v/storage/disk_log_impl.h
@@ -187,15 +187,28 @@ class disk_log_impl final : public log {
std::optional<model::offset> retention_offset(gc_config) const final;

// Collects an iterable list of segments over which to perform sliding
// window compaction.
// window compaction. This can include segments which have already had their
// keys de-duplicated in every segment between the start of the log and
// themselves (these are referred to as "clean" segments). These segments
// would be no-ops to include in sliding window compaction, but they are
// included in the range anyways in order to allow for timely tombstone
// record removal via self-compaction, and to ensure that this function
// returns a contiguous range of segments. It is up to the caller to filter
// out these already cleanly-compacted segments.
segment_set find_sliding_range(
const compaction_config& cfg,
std::optional<model::offset> new_start_offset = std::nullopt);

void set_last_compaction_window_start_offset(model::offset o) {
void
set_last_compaction_window_start_offset(std::optional<model::offset> o) {
_last_compaction_window_start_offset = o;
}

const std::optional<model::offset>&
get_last_compaction_window_start_offset() const {
Contributor Author

For testing.

return _last_compaction_window_start_offset;
}

readers_cache& readers() { return *_readers_cache; }

storage_resources& resources();
10 changes: 9 additions & 1 deletion src/v/storage/index_state.cc
@@ -161,6 +161,7 @@ std::ostream& operator<<(std::ostream& o, const index_state& s) {
<< ", num_compactible_records_appended:"
<< s.num_compactible_records_appended
<< ", clean_compact_timestamp:" << s.clean_compact_timestamp
<< ", may_have_tombstone_records:" << s.may_have_tombstone_records
<< ", index(" << s.relative_offset_index.size() << ","
<< s.relative_time_index.size() << "," << s.position_index.size()
<< ")}";
@@ -184,6 +185,7 @@ void index_state::serde_write(iobuf& out) const {
write(tmp, broker_timestamp);
write(tmp, num_compactible_records_appended);
write(tmp, clean_compact_timestamp);
write(tmp, may_have_tombstone_records);

crc::crc32c crc;
crc_extend_iobuf(crc, tmp);
@@ -285,6 +287,11 @@ void read_nested(
} else {
st.clean_compact_timestamp = std::nullopt;
}
if (hdr._version >= index_state::may_have_tombstone_records_version) {
read_nested(p, st.may_have_tombstone_records, 0U);
} else {
st.may_have_tombstone_records = true;
}
}

index_state index_state::copy() const { return *this; }
@@ -365,7 +372,8 @@ index_state::index_state(const index_state& o) noexcept
, non_data_timestamps(o.non_data_timestamps)
, broker_timestamp(o.broker_timestamp)
, num_compactible_records_appended(o.num_compactible_records_appended)
, clean_compact_timestamp(o.clean_compact_timestamp) {}
, clean_compact_timestamp(o.clean_compact_timestamp)
, may_have_tombstone_records(o.may_have_tombstone_records) {}

namespace serde_compat {
uint64_t index_state_serde::checksum(const index_state& r) {
8 changes: 7 additions & 1 deletion src/v/storage/index_state.h
@@ -74,11 +74,12 @@ class offset_time_index {
1 byte - non_data_timestamps
*/
struct index_state
: serde::envelope<index_state, serde::version<8>, serde::compat_version<4>> {
: serde::envelope<index_state, serde::version<9>, serde::compat_version<4>> {
static constexpr auto monotonic_timestamps_version = 5;
static constexpr auto broker_timestamp_version = 6;
static constexpr auto num_compactible_records_version = 7;
static constexpr auto clean_compact_timestamp_version = 8;
static constexpr auto may_have_tombstone_records_version = 9;

static index_state make_empty_index(offset_delta_time with_offset);

@@ -138,6 +139,11 @@ struct index_state
// every previous record in the log.
std::optional<model::timestamp> clean_compact_timestamp{std::nullopt};

// may_have_tombstone_records is `true` by default, until compaction
// deduplication/segment data copying is performed and it is proven that
// the segment does not contain any tombstone records.
bool may_have_tombstone_records{true};

size_t size() const;

bool empty() const;
47 changes: 40 additions & 7 deletions src/v/storage/segment_deduplication_utils.cc
@@ -110,6 +110,24 @@ ss::future<model::offset> build_offset_map(
cfg.asrc->check();
}
auto seg = *iter;
if (seg->index().has_clean_compact_timestamp()) {
Contributor

nit: just making sure, is there anywhere in code comments that describes what it means to be clean compacted?

Contributor Author

1, 2
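
For context on how the clean-compacted flag is used in this hunk, the backward walk can be sketched as follows. A "clean" segment is one whose keys are already deduplicated against every earlier segment, so it is skipped when building the map but still counts as covered. `segment_stub` and `index_backwards` are hypothetical, and the sketch assumes the offset map never fills up, so the walk always reaches the start of the range.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for a segment.
struct segment_stub {
    int64_t base_offset;
    bool cleanly_compacted;
};

// Walks the range from newest to oldest. Clean segments are skipped but still
// count as fully indexed; every other segment is recorded in `indexed`.
// Returns the base offset of the lowest segment covered by the walk.
// Assumption for this sketch: the map has unlimited capacity.
inline int64_t index_backwards(
  const std::vector<segment_stub>& range, std::vector<int64_t>& indexed) {
    assert(!range.empty());
    int64_t min_fully_indexed = range.back().base_offset;
    for (auto it = range.rbegin(); it != range.rend(); ++it) {
        min_fully_indexed = it->base_offset;
        if (it->cleanly_compacted) {
            continue; // nothing new to learn from this segment
        }
        indexed.push_back(it->base_offset);
    }
    return min_fully_indexed;
}
```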

// This segment has already been fully deduplicated, so building the
// offset map for it would be pointless.
vlog(
gclog.trace,
"segment is already cleanly compacted, no need to add it to the "
"offset_map: {}",
seg->filename());

min_segment_fully_indexed = seg->offsets().get_base_offset();

if (iter == segs.begin()) {
break;
} else {
--iter;
continue;
}
}
vlog(gclog.trace, "Adding segment to offset map: {}", seg->filename());

try {
@@ -166,12 +184,14 @@ ss::future<index_state> deduplicate_segment(
auto compaction_placeholder_enabled = feature_table.local().is_active(
features::feature::compaction_placeholder_batch);

const std::optional<model::timestamp> tombstone_delete_horizon
= internal::get_tombstone_delete_horizon(seg, cfg);
const bool past_tombstone_delete_horizon
= internal::is_past_tombstone_delete_horizon(seg, cfg);
bool may_have_tombstone_records = false;
auto copy_reducer = internal::copy_data_segment_reducer(
[&map,
&may_have_tombstone_records,
segment_last_offset = seg->offsets().get_committed_offset(),
tombstone_delete_horizon = tombstone_delete_horizon,
past_tombstone_delete_horizon,
compaction_placeholder_enabled](
const model::record_batch& b,
const model::record& r,
@@ -190,13 +210,20 @@ ss::future<index_state> deduplicate_segment(
b.header());
return ss::make_ready_future<bool>(true);
}
// Potentially deal with tombstone record removal
if (internal::should_remove_tombstone_record(
r, tombstone_delete_horizon)) {

// Deal with tombstone record removal
if (r.is_tombstone() && past_tombstone_delete_horizon) {
return ss::make_ready_future<bool>(false);
}

return should_keep(map, b, r);
return should_keep(map, b, r).then(
[&may_have_tombstone_records,
is_tombstone = r.is_tombstone()](bool keep) {
if (is_tombstone && keep) {
may_have_tombstone_records = true;
}
return keep;
});
},
&appender,
seg->path().is_internal_topic(),
@@ -208,8 +235,14 @@ ss::future<index_state> deduplicate_segment(

auto new_idx = co_await std::move(rdr).consume(
std::move(copy_reducer), model::no_timeout);

// restore broker timestamp and clean compact timestamp
new_idx.broker_timestamp = seg->index().broker_timestamp();
new_idx.clean_compact_timestamp = seg->index().clean_compact_timestamp();

// Set may_have_tombstone_records
new_idx.may_have_tombstone_records = may_have_tombstone_records;

co_return new_idx;
}

9 changes: 7 additions & 2 deletions src/v/storage/segment_index.cc
@@ -47,7 +47,8 @@ segment_index::segment_index(
ss::sharded<features::feature_table>& feature_table,
std::optional<ntp_sanitizer_config> sanitizer_config,
std::optional<model::timestamp> broker_timestamp,
std::optional<model::timestamp> clean_compact_timestamp)
std::optional<model::timestamp> clean_compact_timestamp,
bool may_have_tombstone_records)
: _path(std::move(path))
, _step(step)
, _feature_table(std::ref(feature_table))
@@ -57,6 +58,7 @@ segment_index::segment_index(
_state.base_offset = base;
_state.broker_timestamp = broker_timestamp;
_state.clean_compact_timestamp = clean_compact_timestamp;
_state.may_have_tombstone_records = may_have_tombstone_records;
}

segment_index::segment_index(
@@ -88,15 +90,18 @@ ss::future<ss::file> segment_index::open() {
}

void segment_index::reset() {
// Persist the base offset and clean compaction timestamp through a reset.
// Persist the base offset, clean compaction timestamp, and tombstones
// identifier through a reset.
auto base = _state.base_offset;
auto clean_compact_timestamp = _state.clean_compact_timestamp;
auto may_have_tombstone_records = _state.may_have_tombstone_records;

_state = index_state::make_empty_index(
storage::internal::should_apply_delta_time_offset(_feature_table));

_state.base_offset = base;
_state.clean_compact_timestamp = clean_compact_timestamp;
_state.may_have_tombstone_records = may_have_tombstone_records;

_acc = 0;
}