storage: always schedule adjacent segment compaction
We previously fell back on adjacent segment compaction only if there was
no new data to compact. In some situations, we've seen the rate of
incoming data outpace the compaction interval, causing segments to pile
up without ever being merged.

This change tweaks the logic to always run adjacent segment compaction
after running sliding window compaction.

Along the way, a couple of tests needed to be updated to handle the fact
that housekeeping may now merge segments.
andrwng committed Jan 24, 2025
1 parent 69e4666 commit 08d0433
Showing 4 changed files with 27 additions and 39 deletions.
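In outline, the behavioral change in disk_log_impl::do_compact (the third file below) is the removal of the fallback condition around adjacent segment compaction. A condensed before/after, paraphrased from the diff:

// Before: adjacent segment compaction ran only when the sliding window
// pass had found no new data to compact, so steady ingest could starve it.
bool compacted = did_compact_fut.get();
if (!compacted) {
    co_await compact_adjacent_segments(compact_cfg);
}

// After: it runs unconditionally after every sliding window pass, so
// segments keep merging even when new data always arrives.
bool compacted = did_compact_fut.get();
co_await compact_adjacent_segments(compact_cfg);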
12 changes: 3 additions & 9 deletions src/v/cluster/archival/tests/ntp_archiver_reupload_test.cc
@@ -226,10 +226,11 @@ struct reupload_fixture : public archiver_fixture {
manifest_view);
}

// XXX: rename this! It doesn't just run self compaction -- it performs
// housekeeping which may also merge segments.
ss::lw_shared_ptr<storage::segment> self_compact_next_segment(
model::offset max_collectible = model::offset::max()) {
auto& seg_set = disk_log_impl()->segments();
auto size_before = seg_set.size();

disk_log_impl()
->housekeeping(storage::housekeeping_config{
@@ -241,13 +242,6 @@ struct reupload_fixture : public archiver_fixture {
abort_source})
.get();

auto size_after = seg_set.size();

// We are only looking to trigger self-compaction here.
// If the segment count reduced, adjacent segment compaction must
// have occurred.
BOOST_REQUIRE_EQUAL(size_before, size_after);

ss::lw_shared_ptr<storage::segment> last_compacted_segment;
for (auto& i : seg_set) {
if (i->finished_self_compaction()) {
@@ -448,11 +442,11 @@ FIXTURE_TEST(
std::stringstream st;
stm_manifest.serialize_json(st);
vlog(test_log.debug, "manifest: {}", st.str());
verify_segment_request("500-1-v1.log", stm_manifest);

BOOST_REQUIRE_EQUAL(
stm_manifest.get_last_uploaded_compacted_offset(), model::offset{999});
auto replaced = stm_manifest.replaced_segments();
BOOST_REQUIRE_EQUAL(replaced.size(), 1);
BOOST_REQUIRE_EQUAL(replaced[0].base_offset, model::offset{500});
}

14 changes: 8 additions & 6 deletions src/v/cluster/archival/tests/segment_reupload_test.cc
@@ -832,6 +832,7 @@ SEASTAR_THREAD_TEST_CASE(test_upload_aligned_to_non_existent_offset) {

auto first = spec.segment_starts.begin();
auto second = std::next(first);
// Segment boundaries: [5, 14][15, 24][25, 34][35, 50]...
for (; second != spec.segment_starts.end(); ++first, ++second) {
b | storage::add_segment(*first);
for (auto curr_offset = *first; curr_offset < *second; ++curr_offset) {
@@ -843,19 +844,18 @@ SEASTAR_THREAD_TEST_CASE(test_upload_aligned_to_non_existent_offset) {
.get();
}
auto seg = b.get_log_segments().back();
seg->appender().close().get();
seg->release_appender().get();
seg->release_appender(&b.get_disk_log_impl().readers()).get();
}

b | storage::add_segment(*first)
| storage::add_random_batch(*first, spec.last_segment_num_records);

// Compaction will rewrite each segment.
// Compaction will rewrite each segment, and merge the first two.
// Segment boundaries: [5, 24][25, 34][35, 50]...
b.gc(model::timestamp::max(), std::nullopt).get();

size_t max_size = b.get_segment(0).size_bytes()
+ b.get_segment(1).size_bytes()
+ b.get_segment(2).size_bytes();
+ b.get_segment(1).size_bytes();
archival::segment_collector collector{
model::offset{5}, m, b.get_disk_log_impl(), max_size};

@@ -868,6 +868,8 @@ SEASTAR_THREAD_TEST_CASE(test_upload_aligned_to_non_existent_offset) {
ss::default_priority_class(), segment_lock_timeout)
.get());

// The upload candidate should align with the manifest's segment
// boundaries.
auto upload_candidate = upload_with_locks.candidate;
BOOST_REQUIRE(!upload_candidate.sources.empty());
BOOST_REQUIRE_EQUAL(upload_candidate.starting_offset, model::offset{10});
@@ -885,7 +887,7 @@ SEASTAR_THREAD_TEST_CASE(test_upload_aligned_to_non_existent_offset) {
BOOST_REQUIRE_EQUAL(
expected_content_length, upload_candidate.content_length);

BOOST_REQUIRE_EQUAL(upload_with_locks.read_locks.size(), 3);
BOOST_REQUIRE_EQUAL(upload_with_locks.read_locks.size(), 2);
}

SEASTAR_THREAD_TEST_CASE(test_same_size_reupload_skipped) {
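The expectation changes in the file above all trace back to the first two segments merging during housekeeping before the collector runs. A condensed restatement of the arithmetic, using the boundary comments and code from the test (the lock count suggests one read lock per candidate source segment):

// Physical segments before housekeeping: [5, 14][15, 24][25, 34][35, 50]...
// Adjacent compaction merges the first pair: [5, 24][25, 34][35, 50]...
// A size budget that previously covered three segments now covers two, and
// the resulting upload candidate holds two read locks instead of three:
size_t max_size = b.get_segment(0).size_bytes()   // [5, 24]
                + b.get_segment(1).size_bytes();  // [25, 34]
BOOST_REQUIRE_EQUAL(upload_with_locks.read_locks.size(), 2);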
12 changes: 7 additions & 5 deletions src/v/storage/disk_log_impl.cc
@@ -1317,11 +1317,13 @@ ss::future<> disk_log_impl::do_compact(
std::rethrow_exception(eptr);
}
bool compacted = did_compact_fut.get();
if (!compacted) {
// If sliding window compaction did not occur, we fall back to adjacent
// segment compaction.
co_await compact_adjacent_segments(compact_cfg);
}
vlog(
gclog.debug,
"Sliding compaction of {} did {}compact data, proceeding to adjacent "
"segment compaction",
config().ntp(),
compacted ? "" : "not ");
co_await compact_adjacent_segments(compact_cfg);
}

ss::future<> disk_log_impl::gc(gc_config cfg) {
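The storage e2e test updates below share one cause: a housekeeping pass now performs self-compaction and then attempts adjacent merging in the same call, so the dedicated self-compaction-only pass disappears and each expected segment count is reached one call earlier. Judging by the assertions, each pass merges at most one eligible adjacent pair. A minimal sketch of the resulting test pattern (c_cfg and log as in the tests; the loop is illustrative, not from the commit):

// Each housekeeping pass self-compacts and may merge one adjacent pair,
// so repeated passes step the segment count down until only segments that
// cannot merge (e.g. the active appender segment) remain.
while (log->segment_count() > 2) {
    log->housekeeping(c_cfg).get();
}
BOOST_REQUIRE_EQUAL(log->segment_count(), 2);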
28 changes: 9 additions & 19 deletions src/v/storage/tests/storage_e2e_test.cc
@@ -1822,22 +1822,21 @@ FIXTURE_TEST(adjacent_segment_compaction, storage_test_fixture) {

// There are 4 segments, and the last is the active segment. The first two
// will merge, and the third will be compacted but not merged.

log->housekeeping(c_cfg).get();
BOOST_REQUIRE_EQUAL(log->segment_count(), 4);
BOOST_REQUIRE_EQUAL(log->segment_count(), 3);

// Check if it honors max_collectible offset by resetting it to the base
// offset of the first segment. Nothing should be compacted.
const auto first_segment_offsets = log->segments().front()->offsets();
c_cfg.compact.max_collectible_offset
= first_segment_offsets.get_base_offset();
log->housekeeping(c_cfg).get();
BOOST_REQUIRE_EQUAL(log->segment_count(), 4);

// The segment count will be reduced again.
c_cfg.compact.max_collectible_offset = model::offset::max();
log->housekeeping(c_cfg).get();
BOOST_REQUIRE_EQUAL(log->segment_count(), 3);

// Now compact without restricting collectible offset. The segment count
// will be reduced again.
c_cfg.compact.max_collectible_offset = model::offset::max();
log->housekeeping(c_cfg).get();
BOOST_REQUIRE_EQUAL(log->segment_count(), 2);

@@ -1890,9 +1889,6 @@ FIXTURE_TEST(adjacent_segment_compaction_terms, storage_test_fixture) {
as);

// compact all the individual segments
log->housekeeping(c_cfg).get();
BOOST_REQUIRE_EQUAL(disk_log->segment_count(), 6);

// the two segments with term 2 can be combined
log->housekeeping(c_cfg).get();
BOOST_REQUIRE_EQUAL(disk_log->segment_count(), 5);
@@ -1965,21 +1961,15 @@ FIXTURE_TEST(max_adjacent_segment_compaction, storage_test_fixture) {
as);

// self compaction steps
log->housekeeping(c_cfg).get();
BOOST_REQUIRE_EQUAL(disk_log->segment_count(), 6);

// the first two segments are combined 2+2=4 < 6 MB
log->housekeeping(c_cfg).get();
BOOST_REQUIRE_EQUAL(disk_log->segment_count(), 5);

// the new first and second are too big 4+5 > 6 MB but the second and third
// can be combined 5 + 15KB < 6 MB
log->housekeeping(c_cfg).get();
BOOST_REQUIRE_EQUAL(disk_log->segment_count(), 4);

// then the next 16 KB can be folded in
log->housekeeping(c_cfg).get();
BOOST_REQUIRE_EQUAL(disk_log->segment_count(), 3);
BOOST_REQUIRE_EQUAL(disk_log->segment_count(), 4);

// that's all that can be done. the next seg is an appender
log->housekeeping(c_cfg).get();
@@ -2180,16 +2170,16 @@ FIXTURE_TEST(compaction_backlog_calculation, storage_test_fixture) {
// self compaction steps
log->housekeeping(c_cfg).get();

BOOST_REQUIRE_EQUAL(disk_log->segment_count(), 5);
BOOST_REQUIRE_EQUAL(disk_log->segment_count(), 4);
auto new_backlog_size = log->compaction_backlog();
/**
* after segments are self-compacted they shouldn't be included in the
* backlog (only the last segment is, since it has an appender and isn't
* self-compacted)
*/
BOOST_REQUIRE_EQUAL(
BOOST_REQUIRE_LT(
new_backlog_size,
backlog_size - self_seg_compaction_sz + segments[4]->size_bytes());
backlog_size - self_seg_compaction_sz + segments[3]->size_bytes());
}

FIXTURE_TEST(not_compacted_log_backlog, storage_test_fixture) {