Merge pull request #24874 from andrwng/storage-compaction-always-merge
storage: always schedule adjacent segment compaction
andrwng authored Jan 28, 2025
2 parents 36c57ba + 08d0433 commit 8fafb35
Showing 4 changed files with 27 additions and 39 deletions.
12 changes: 3 additions & 9 deletions src/v/cluster/archival/tests/ntp_archiver_reupload_test.cc
@@ -226,10 +226,11 @@ struct reupload_fixture : public archiver_fixture {
manifest_view);
}

// XXX: rename this! It doesn't just run self compaction -- it performs
// housekeeping which may also merge segments.
ss::lw_shared_ptr<storage::segment> self_compact_next_segment(
model::offset max_collectible = model::offset::max()) {
auto& seg_set = disk_log_impl()->segments();
auto size_before = seg_set.size();

disk_log_impl()
->housekeeping(storage::housekeeping_config{
@@ -241,13 +242,6 @@ struct reupload_fixture : public archiver_fixture {
abort_source})
.get();

auto size_after = seg_set.size();

// We are only looking to trigger self-compaction here.
// If the segment count reduced, adjacent segment compaction must
// have occurred.
BOOST_REQUIRE_EQUAL(size_before, size_after);

ss::lw_shared_ptr<storage::segment> last_compacted_segment;
for (auto& i : seg_set) {
if (i->finished_self_compaction()) {
@@ -448,11 +442,11 @@ FIXTURE_TEST(
std::stringstream st;
stm_manifest.serialize_json(st);
vlog(test_log.debug, "manifest: {}", st.str());
verify_segment_request("500-1-v1.log", stm_manifest);

BOOST_REQUIRE_EQUAL(
stm_manifest.get_last_uploaded_compacted_offset(), model::offset{999});
auto replaced = stm_manifest.replaced_segments();
BOOST_REQUIRE_EQUAL(replaced.size(), 1);
BOOST_REQUIRE_EQUAL(replaced[0].base_offset, model::offset{500});
}

14 changes: 8 additions & 6 deletions src/v/cluster/archival/tests/segment_reupload_test.cc
@@ -832,6 +832,7 @@ SEASTAR_THREAD_TEST_CASE(test_upload_aligned_to_non_existent_offset) {

auto first = spec.segment_starts.begin();
auto second = std::next(first);
// Segment boundaries: [5, 14][15, 24][25, 34][35, 50]...
for (; second != spec.segment_starts.end(); ++first, ++second) {
b | storage::add_segment(*first);
for (auto curr_offset = *first; curr_offset < *second; ++curr_offset) {
@@ -843,19 +844,18 @@ SEASTAR_THREAD_TEST_CASE(test_upload_aligned_to_non_existent_offset) {
.get();
}
auto seg = b.get_log_segments().back();
seg->appender().close().get();
seg->release_appender().get();
seg->release_appender(&b.get_disk_log_impl().readers()).get();
}

b | storage::add_segment(*first)
| storage::add_random_batch(*first, spec.last_segment_num_records);

// Compaction will rewrite each segment.
// Compaction will rewrite each segment, and merge the first two.
// Segment boundaries: [5, 24][25, 34][35, 50]...
b.gc(model::timestamp::max(), std::nullopt).get();

size_t max_size = b.get_segment(0).size_bytes()
+ b.get_segment(1).size_bytes()
+ b.get_segment(2).size_bytes();
+ b.get_segment(1).size_bytes();
archival::segment_collector collector{
model::offset{5}, m, b.get_disk_log_impl(), max_size};

@@ -868,6 +868,8 @@ SEASTAR_THREAD_TEST_CASE(test_upload_aligned_to_non_existent_offset) {
ss::default_priority_class(), segment_lock_timeout)
.get());

// The upload candidate should align with the manifest's segment
// boundaries.
auto upload_candidate = upload_with_locks.candidate;
BOOST_REQUIRE(!upload_candidate.sources.empty());
BOOST_REQUIRE_EQUAL(upload_candidate.starting_offset, model::offset{10});
@@ -885,7 +887,7 @@ SEASTAR_THREAD_TEST_CASE(test_upload_aligned_to_non_existent_offset) {
BOOST_REQUIRE_EQUAL(
expected_content_length, upload_candidate.content_length);

BOOST_REQUIRE_EQUAL(upload_with_locks.read_locks.size(), 3);
BOOST_REQUIRE_EQUAL(upload_with_locks.read_locks.size(), 2);
}

SEASTAR_THREAD_TEST_CASE(test_same_size_reupload_skipped) {
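For context on the updated expectations in this test, here is a minimal interval sketch (plain integers instead of model::offset, illustration only, not the test's actual code): once housekeeping also merges adjacent segments, the first two segments in the layout collapse into one, so an upload candidate sized to the first two remaining segments spans two on-disk segments and holds two read locks instead of three.

```cpp
#include <cstdio>
#include <utility>
#include <vector>

int main() {
    // Segment offset ranges before housekeeping, mirroring the test layout:
    // [5, 14][15, 24][25, 34][35, 50]
    std::vector<std::pair<int, int>> segs{{5, 14}, {15, 24}, {25, 34}, {35, 50}};

    // Adjacent segment compaction merges the first two segments into [5, 24].
    segs.front().second = segs[1].second;
    segs.erase(segs.begin() + 1);

    for (auto [base, last] : segs) {
        std::printf("[%d, %d] ", base, last); // prints [5, 24] [25, 34] [35, 50]
    }
    std::printf("\n");

    // A collector capped at the combined size of the first two remaining
    // segments now covers two on-disk segments, hence two read locks.
    return 0;
}
```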
12 changes: 7 additions & 5 deletions src/v/storage/disk_log_impl.cc
@@ -1208,11 +1208,13 @@ ss::future<> disk_log_impl::do_compact(
std::rethrow_exception(eptr);
}
bool compacted = did_compact_fut.get();
if (!compacted) {
// If sliding window compaction did not occur, we fall back to adjacent
// segment compaction.
co_await compact_adjacent_segments(compact_cfg);
}
vlog(
gclog.debug,
"Sliding compaction of {} did {}compact data, proceeding to adjacent "
"segment compaction",
config().ntp(),
compacted ? "" : "not ");
co_await compact_adjacent_segments(compact_cfg);
}

ss::future<bool> disk_log_impl::chunked_sliding_window_compact(
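The hunk above is the core of the change: adjacent segment compaction is no longer gated on whether the sliding-window pass compacted anything. A minimal standalone sketch of the new control flow, using simplified stand-in functions rather than the real disk_log_impl API:

```cpp
#include <iostream>

// Simplified stand-ins for the real compaction routines; names and behavior
// here are illustrative assumptions, not the actual disk_log_impl API.
bool sliding_window_compact() {
    return true; // pretend the sliding-window pass deduplicated some keys
}

void compact_adjacent_segments() {
    std::cout << "merging adjacent compacted segments\n";
}

int main() {
    bool compacted = sliding_window_compact();

    // Old flow: adjacent segment compaction ran only as a fallback.
    //   if (!compacted) { compact_adjacent_segments(); }

    // New flow: it is always scheduled after the sliding-window pass, so
    // small compacted segments keep being merged even when the window pass
    // did compact data.
    std::cout << "sliding pass did " << (compacted ? "" : "not ")
              << "compact data\n";
    compact_adjacent_segments();
    return 0;
}
```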
28 changes: 9 additions & 19 deletions src/v/storage/tests/storage_e2e_test.cc
@@ -1822,22 +1822,21 @@ FIXTURE_TEST(adjacent_segment_compaction, storage_test_fixture) {

// There are 4 segments, and the last is the active segment. The first two
// will merge, and the third will be compacted but not merged.

log->housekeeping(c_cfg).get();
BOOST_REQUIRE_EQUAL(log->segment_count(), 4);
BOOST_REQUIRE_EQUAL(log->segment_count(), 3);

// Check that it honors max_collectible offset by resetting it to the base
// offset of the first segment. Nothing should be compacted.
const auto first_segment_offsets = log->segments().front()->offsets();
c_cfg.compact.max_collectible_offset
= first_segment_offsets.get_base_offset();
log->housekeeping(c_cfg).get();
BOOST_REQUIRE_EQUAL(log->segment_count(), 4);

// The segment count will be reduced again.
c_cfg.compact.max_collectible_offset = model::offset::max();
log->housekeeping(c_cfg).get();
BOOST_REQUIRE_EQUAL(log->segment_count(), 3);

// Now compact without restricting collectible offset. The segment count
// will be reduced again.
c_cfg.compact.max_collectible_offset = model::offset::max();
log->housekeeping(c_cfg).get();
BOOST_REQUIRE_EQUAL(log->segment_count(), 2);

@@ -1890,9 +1889,6 @@ FIXTURE_TEST(adjacent_segment_compaction_terms, storage_test_fixture) {
as);

// compact all the individual segments
log->housekeeping(c_cfg).get();
BOOST_REQUIRE_EQUAL(disk_log->segment_count(), 6);

// the two segments with term 2 can be combined
log->housekeeping(c_cfg).get();
BOOST_REQUIRE_EQUAL(disk_log->segment_count(), 5);
@@ -1965,21 +1961,15 @@ FIXTURE_TEST(max_adjacent_segment_compaction, storage_test_fixture) {
as);

// self compaction steps
log->housekeeping(c_cfg).get();
BOOST_REQUIRE_EQUAL(disk_log->segment_count(), 6);

// the first two segments are combined 2+2=4 < 6 MB
log->housekeeping(c_cfg).get();
BOOST_REQUIRE_EQUAL(disk_log->segment_count(), 5);

// the new first and second are too big 4+5 > 6 MB but the second and third
// can be combined 5 + 15KB < 6 MB
log->housekeeping(c_cfg).get();
BOOST_REQUIRE_EQUAL(disk_log->segment_count(), 4);

// then the next 16 KB can be folded in
log->housekeeping(c_cfg).get();
BOOST_REQUIRE_EQUAL(disk_log->segment_count(), 3);
BOOST_REQUIRE_EQUAL(disk_log->segment_count(), 4);

// that's all that can be done. the next seg is an appender
log->housekeeping(c_cfg).get();
@@ -2180,16 +2170,16 @@ FIXTURE_TEST(compaction_backlog_calculation, storage_test_fixture) {
// self compaction steps
log->housekeeping(c_cfg).get();

BOOST_REQUIRE_EQUAL(disk_log->segment_count(), 5);
BOOST_REQUIRE_EQUAL(disk_log->segment_count(), 4);
auto new_backlog_size = log->compaction_backlog();
/**
* after all segments are self-compacted they shouldn't be included in the
* backlog (only the last segment is, since it has an appender and isn't
* self-compacted)
*/
BOOST_REQUIRE_EQUAL(
BOOST_REQUIRE_LT(
new_backlog_size,
backlog_size - self_seg_compaction_sz + segments[4]->size_bytes());
backlog_size - self_seg_compaction_sz + segments[3]->size_bytes());
}

FIXTURE_TEST(not_compacted_log_backlog, storage_test_fixture) {