Skip to content

Commit

Permalink
Merge pull request #23380 from WillemKauf/compaction_enhancements
Browse files Browse the repository at this point in the history
`storage`: compaction enhancements
  • Loading branch information
WillemKauf authored Sep 24, 2024
2 parents d8ea521 + 2626003 commit d57dc7c
Show file tree
Hide file tree
Showing 11 changed files with 447 additions and 119 deletions.
100 changes: 71 additions & 29 deletions src/v/storage/disk_log_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -500,48 +500,56 @@ ss::future<> disk_log_impl::adjacent_merge_compact(

segment_set disk_log_impl::find_sliding_range(
const compaction_config& cfg, std::optional<model::offset> new_start_offset) {
if (
_last_compaction_window_start_offset.has_value()
&& (_last_compaction_window_start_offset.value()
<= _segs.front()->offsets().get_base_offset())) {
// If this evaluates to true, it is likely because local retention has
// removed segments. e.g, segments ([0],[1],[2]), have been garbage
// collected to segments ([2]), while _last_window_compaction_offset
// == 1. To avoid compaction getting stuck in this situation, we reset
// the compaction window offset here.
vlog(
gclog.debug,
"[{}] start offset ({}) <= base offset of front segment ({}), "
"resetting compaction window start offset",
config().ntp(),
_last_compaction_window_start_offset.value(),
_segs.front()->offsets().get_base_offset());

_last_compaction_window_start_offset.reset();
}

// Collect all segments that have stable data.
segment_set::underlying_t buf;
for (const auto& seg : _segs) {
if (seg->has_appender() || !seg->has_compactible_offsets(cfg)) {
// Stop once we get to an unstable segment.
break;
}
if (
_last_compaction_window_start_offset.has_value()
&& (seg->offsets().get_base_offset()
>= _last_compaction_window_start_offset.value())) {
// Force clean segment production by compacting down to the
// start of the log before considering new segments in the
// compaction window.
break;
}
if (
new_start_offset
&& seg->offsets().get_base_offset() < new_start_offset.value()) {
// Skip over segments that are being truncated.
continue;
}

buf.emplace_back(seg);
}
segment_set segs(std::move(buf));
if (segs.empty()) {
return segs;
}

// If a previous sliding window compaction ran, and there are no new
// segments, segments at the start of that window and above have been
// fully deduplicated and don't need to be compacted further.
//
// If there are new segments that have not been compacted, we can't make
// this claim, and compact everything again.
if (
segs.back()->finished_windowed_compaction()
&& _last_compaction_window_start_offset.has_value()) {
while (!segs.empty()) {
if (
segs.back()->offsets().get_base_offset()
>= _last_compaction_window_start_offset.value()) {
// A previous compaction deduplicated the keys above this
// offset. As such, segments above this point would not benefit
// from being included in the compaction window.
segs.pop_back();
} else {
break;
}
}
}
return segs;
}

Expand All @@ -559,9 +567,7 @@ ss::future<bool> disk_log_impl::sliding_window_compact(
if (cfg.asrc) {
cfg.asrc->check();
}
if (seg->finished_self_compaction()) {
continue;
}

auto result = co_await storage::internal::self_compact_segment(
seg,
_stm_manager,
Expand All @@ -571,6 +577,10 @@ ss::future<bool> disk_log_impl::sliding_window_compact(
_manager.resources(),
_feature_table);

if (result.did_compact() == false) {
continue;
}

vlog(
gclog.debug,
"[{}] segment {} self compaction result: {}",
Expand All @@ -579,6 +589,17 @@ ss::future<bool> disk_log_impl::sliding_window_compact(
result);
has_self_compacted = true;
}

// Remove any of the segments that have already been cleanly compacted. They
// would be no-ops to compact.
while (!segs.empty()) {
if (segs.back()->index().has_clean_compact_timestamp()) {
segs.pop_back();
} else {
break;
}
}

// Remove any of the beginning segments that don't have any
// compactible records. They would be no-ops to compact.
while (!segs.empty()) {
Expand All @@ -592,7 +613,12 @@ ss::future<bool> disk_log_impl::sliding_window_compact(
segs.pop_front();
}
if (segs.empty()) {
vlog(gclog.debug, "[{}] no more segments to compact", config().ntp());
vlog(
gclog.debug,
"[{}] no segments left in sliding window to compact (all segments "
"were already cleanly compacted, or did not have any compactible "
"records)",
config().ntp());
co_return has_self_compacted;
}
vlog(
Expand Down Expand Up @@ -641,6 +667,23 @@ ss::future<bool> disk_log_impl::sliding_window_compact(
idx_start_offset,
map.max_offset());

std::optional<model::offset> next_window_start_offset = idx_start_offset;
if (idx_start_offset == segs.front()->offsets().get_base_offset()) {
// We have cleanly compacted up to the first segment in the sliding
// range (not necessarily equivalent to the first segment in the log-
// segments may have been removed from the sliding range if they were
// already cleanly compacted or had no compactible offsets). Reset the
// start offset to allow new segments into the sliding window range.
vlog(
gclog.debug,
"[{}] fully de-duplicated up to start of sliding range with offset "
"{}, resetting sliding window start offset",
config().ntp(),
idx_start_offset);

next_window_start_offset.reset();
}

auto segment_modify_lock = co_await _segment_rewrite_lock.get_units();
for (auto& seg : segs) {
if (cfg.asrc) {
Expand Down Expand Up @@ -792,7 +835,7 @@ ss::future<bool> disk_log_impl::sliding_window_compact(
gclog.debug, "[{}] Final compacted segment {}", config().ntp(), seg);
}

_last_compaction_window_start_offset = idx_start_offset;
_last_compaction_window_start_offset = next_window_start_offset;

co_return true;
}
Expand Down Expand Up @@ -1269,8 +1312,7 @@ ss::future<> disk_log_impl::do_compact(
bool compacted = did_compact_fut.get();
if (!compacted) {
// If sliding window compaction did not occur, we fall back to adjacent
// segment compaction (as self compaction of segments occured in
// sliding_window_compact()).
// segment compaction.
co_await compact_adjacent_segments(compact_cfg);
}
}
Expand Down
17 changes: 15 additions & 2 deletions src/v/storage/disk_log_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,15 +187,28 @@ class disk_log_impl final : public log {
std::optional<model::offset> retention_offset(gc_config) const final;

// Collects an iterable list of segments over which to perform sliding
// window compaction.
// window compaction. This can include segments which have already had their
// keys de-duplicated in every segment between the start of the log and
// themselves (these are referred to as "clean" segments). These segments
// would be no-ops to include in sliding window compaction, but they are
// included in the range anyways in order to allow for timely tombstone
// record removal via self-compaction, and to ensure that this function
// returns a contiguous range of segments. It is up to the caller to filter
// out these already cleanly-compacted segments.
segment_set find_sliding_range(
const compaction_config& cfg,
std::optional<model::offset> new_start_offset = std::nullopt);

void set_last_compaction_window_start_offset(model::offset o) {
void
set_last_compaction_window_start_offset(std::optional<model::offset> o) {
_last_compaction_window_start_offset = o;
}

const std::optional<model::offset>&
get_last_compaction_window_start_offset() const {
return _last_compaction_window_start_offset;
}

readers_cache& readers() { return *_readers_cache; }

storage_resources& resources();
Expand Down
10 changes: 9 additions & 1 deletion src/v/storage/index_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ std::ostream& operator<<(std::ostream& o, const index_state& s) {
<< ", num_compactible_records_appended:"
<< s.num_compactible_records_appended
<< ", clean_compact_timestamp:" << s.clean_compact_timestamp
<< ", may_have_tombstone_records:" << s.may_have_tombstone_records
<< ", index(" << s.relative_offset_index.size() << ","
<< s.relative_time_index.size() << "," << s.position_index.size()
<< ")}";
Expand All @@ -184,6 +185,7 @@ void index_state::serde_write(iobuf& out) const {
write(tmp, broker_timestamp);
write(tmp, num_compactible_records_appended);
write(tmp, clean_compact_timestamp);
write(tmp, may_have_tombstone_records);

crc::crc32c crc;
crc_extend_iobuf(crc, tmp);
Expand Down Expand Up @@ -285,6 +287,11 @@ void read_nested(
} else {
st.clean_compact_timestamp = std::nullopt;
}
if (hdr._version >= index_state::may_have_tombstone_records_version) {
read_nested(p, st.may_have_tombstone_records, 0U);
} else {
st.may_have_tombstone_records = true;
}
}

index_state index_state::copy() const { return *this; }
Expand Down Expand Up @@ -365,7 +372,8 @@ index_state::index_state(const index_state& o) noexcept
, non_data_timestamps(o.non_data_timestamps)
, broker_timestamp(o.broker_timestamp)
, num_compactible_records_appended(o.num_compactible_records_appended)
, clean_compact_timestamp(o.clean_compact_timestamp) {}
, clean_compact_timestamp(o.clean_compact_timestamp)
, may_have_tombstone_records(o.may_have_tombstone_records) {}

namespace serde_compat {
uint64_t index_state_serde::checksum(const index_state& r) {
Expand Down
8 changes: 7 additions & 1 deletion src/v/storage/index_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,12 @@ class offset_time_index {
1 byte - non_data_timestamps
*/
struct index_state
: serde::envelope<index_state, serde::version<8>, serde::compat_version<4>> {
: serde::envelope<index_state, serde::version<9>, serde::compat_version<4>> {
static constexpr auto monotonic_timestamps_version = 5;
static constexpr auto broker_timestamp_version = 6;
static constexpr auto num_compactible_records_version = 7;
static constexpr auto clean_compact_timestamp_version = 8;
static constexpr auto may_have_tombstone_records_version = 9;

static index_state make_empty_index(offset_delta_time with_offset);

Expand Down Expand Up @@ -138,6 +139,11 @@ struct index_state
// every previous record in the log.
std::optional<model::timestamp> clean_compact_timestamp{std::nullopt};

// may_have_tombstone_records is `true` by default, until compaction
// deduplication/segment data copying is performed and it is proven that
// the segment does not contain any tombstone records.
bool may_have_tombstone_records{true};

size_t size() const;

bool empty() const;
Expand Down
47 changes: 40 additions & 7 deletions src/v/storage/segment_deduplication_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,24 @@ ss::future<model::offset> build_offset_map(
cfg.asrc->check();
}
auto seg = *iter;
if (seg->index().has_clean_compact_timestamp()) {
// This segment has already been fully deduplicated, so building the
// offset map for it would be pointless.
vlog(
gclog.trace,
"segment is already cleanly compacted, no need to add it to the "
"offset_map: {}",
seg->filename());

min_segment_fully_indexed = seg->offsets().get_base_offset();

if (iter == segs.begin()) {
break;
} else {
--iter;
continue;
}
}
vlog(gclog.trace, "Adding segment to offset map: {}", seg->filename());

try {
Expand Down Expand Up @@ -166,12 +184,14 @@ ss::future<index_state> deduplicate_segment(
auto compaction_placeholder_enabled = feature_table.local().is_active(
features::feature::compaction_placeholder_batch);

const std::optional<model::timestamp> tombstone_delete_horizon
= internal::get_tombstone_delete_horizon(seg, cfg);
const bool past_tombstone_delete_horizon
= internal::is_past_tombstone_delete_horizon(seg, cfg);
bool may_have_tombstone_records = false;
auto copy_reducer = internal::copy_data_segment_reducer(
[&map,
&may_have_tombstone_records,
segment_last_offset = seg->offsets().get_committed_offset(),
tombstone_delete_horizon = tombstone_delete_horizon,
past_tombstone_delete_horizon,
compaction_placeholder_enabled](
const model::record_batch& b,
const model::record& r,
Expand All @@ -190,13 +210,20 @@ ss::future<index_state> deduplicate_segment(
b.header());
return ss::make_ready_future<bool>(true);
}
// Potentially deal with tombstone record removal
if (internal::should_remove_tombstone_record(
r, tombstone_delete_horizon)) {

// Deal with tombstone record removal
if (r.is_tombstone() && past_tombstone_delete_horizon) {
return ss::make_ready_future<bool>(false);
}

return should_keep(map, b, r);
return should_keep(map, b, r).then(
[&may_have_tombstone_records,
is_tombstone = r.is_tombstone()](bool keep) {
if (is_tombstone && keep) {
may_have_tombstone_records = true;
}
return keep;
});
},
&appender,
seg->path().is_internal_topic(),
Expand All @@ -208,8 +235,14 @@ ss::future<index_state> deduplicate_segment(

auto new_idx = co_await std::move(rdr).consume(
std::move(copy_reducer), model::no_timeout);

// restore broker timestamp and clean compact timestamp
new_idx.broker_timestamp = seg->index().broker_timestamp();
new_idx.clean_compact_timestamp = seg->index().clean_compact_timestamp();

// Set may_have_tombstone_records
new_idx.may_have_tombstone_records = may_have_tombstone_records;

co_return new_idx;
}

Expand Down
9 changes: 7 additions & 2 deletions src/v/storage/segment_index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ segment_index::segment_index(
ss::sharded<features::feature_table>& feature_table,
std::optional<ntp_sanitizer_config> sanitizer_config,
std::optional<model::timestamp> broker_timestamp,
std::optional<model::timestamp> clean_compact_timestamp)
std::optional<model::timestamp> clean_compact_timestamp,
bool may_have_tombstone_records)
: _path(std::move(path))
, _step(step)
, _feature_table(std::ref(feature_table))
Expand All @@ -57,6 +58,7 @@ segment_index::segment_index(
_state.base_offset = base;
_state.broker_timestamp = broker_timestamp;
_state.clean_compact_timestamp = clean_compact_timestamp;
_state.may_have_tombstone_records = may_have_tombstone_records;
}

segment_index::segment_index(
Expand Down Expand Up @@ -88,15 +90,18 @@ ss::future<ss::file> segment_index::open() {
}

void segment_index::reset() {
// Persist the base offset and clean compaction timestamp through a reset.
// Persist the base offset, clean compaction timestamp, and tombstones
// identifier through a reset.
auto base = _state.base_offset;
auto clean_compact_timestamp = _state.clean_compact_timestamp;
auto may_have_tombstone_records = _state.may_have_tombstone_records;

_state = index_state::make_empty_index(
storage::internal::should_apply_delta_time_offset(_feature_table));

_state.base_offset = base;
_state.clean_compact_timestamp = clean_compact_timestamp;
_state.may_have_tombstone_records = may_have_tombstone_records;

_acc = 0;
}
Expand Down
Loading

0 comments on commit d57dc7c

Please sign in to comment.