[CORE-7229] storage: add tombstone deletion implementation to local storage compaction #23231

Merged 13 commits on Sep 18, 2024
Changes from 1 commit
2 changes: 1 addition & 1 deletion src/v/storage/segment.cc
@@ -897,7 +897,7 @@ bool segment::may_have_compactible_records() const {
// that there were no data records, so err on the side of caution.
return true;
}
return num_compactible_records.value() > 1;
Review comment (Member):
> This condition is mostly due to historical reasons: we didn't want to have completely empty segments post-compaction. This issue is solved by the placeholder feature.

What is the "placeholder" feature?

return num_compactible_records.value() > 0;
}

} // namespace storage
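
For context on the condition change above, here is a minimal, self-contained sketch of the old `> 1` versus new `> 0` semantics. This is not Redpanda's actual `segment` class; the struct, counter field, and `main` harness are illustrative assumptions. The point it shows: a segment holding a single data record is now considered compactible, while a segment whose only batches are compaction placeholders (zero compactible records) remains excluded, which is what the new tests below assert.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>

// Illustrative stand-in for storage::segment; the real class tracks this
// counter from the segment index rather than taking it directly.
struct fake_segment {
    std::optional<std::size_t> num_compactible_records;

    // Old condition: a lone data record was not treated as compactible, to
    // avoid producing completely empty segments after compaction.
    bool may_have_compactible_records_old() const {
        if (!num_compactible_records.has_value()) {
            return true; // unknown count: err on the side of caution
        }
        return num_compactible_records.value() > 1;
    }

    // New condition: any data record counts; placeholder-only segments
    // (count == 0) are still excluded.
    bool may_have_compactible_records_new() const {
        if (!num_compactible_records.has_value()) {
            return true;
        }
        return num_compactible_records.value() > 0;
    }
};

int main() {
    fake_segment one_record{.num_compactible_records = 1};
    assert(!one_record.may_have_compactible_records_old()); // previously skipped
    assert(one_record.may_have_compactible_records_new());  // now compactible

    fake_segment placeholder_only{.num_compactible_records = 0};
    assert(!placeholder_only.may_have_compactible_records_new()); // still excluded
    return 0;
}
```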
73 changes: 63 additions & 10 deletions src/v/storage/tests/segment_deduplication_test.cc
@@ -8,6 +8,8 @@
// by the Apache License, Version 2.0

#include "gmock/gmock.h"
#include "model/record_batch_types.h"
#include "model/tests/random_batch.h"
#include "random/generators.h"
#include "storage/chunk_cache.h"
#include "storage/disk_log_impl.h"
@@ -122,8 +124,6 @@ TEST(FindSlidingRangeTest, TestCollectExcludesPrevious) {
ASSERT_EQ(segs.front()->offsets().get_base_offset(), model::offset{0});
}

// Even though segments with one record would be skipped over during
// compaction, that shouldn't be reflected by the sliding range.
TEST(FindSlidingRangeTest, TestCollectOneRecordSegments) {
storage::disk_log_builder b;
build_segments(
@@ -140,12 +140,11 @@ TEST(FindSlidingRangeTest, TestCollectOneRecordSegments) {
ss::default_priority_class(),
never_abort);
auto segs = disk_log.find_sliding_range(cfg);
// Even though these segments don't have compactible records, they should
// be collected. E.g., they should still be self compacted to rebuild
// indexes if necessary, etc.
ASSERT_EQ(5, segs.size());

// These segments are considered to have compactible records.
for (const auto& seg : segs) {
ASSERT_FALSE(seg->may_have_compactible_records());
ASSERT_TRUE(seg->may_have_compactible_records());
}

// Add some segments with multiple records. They should be eligible for
@@ -158,11 +157,65 @@
/*mark_compacted=*/false);
segs = disk_log.find_sliding_range(cfg);
ASSERT_EQ(8, segs.size());
int i = 0;
for (const auto& seg : segs) {
bool should_have_records = i >= 5;
ASSERT_EQ(should_have_records, seg->may_have_compactible_records());
i++;
ASSERT_TRUE(seg->may_have_compactible_records());
}
}

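// Segments that contain only compaction placeholder batches should report no
// compactible records and be excluded from the compaction sliding range.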
TEST(FindSlidingRangeTest, TestPlaceholderBatchesNoCompactibleRecords) {
storage::disk_log_builder b;
b | start();
const int num_placeholder_batches = 3;
std::vector<int> offsets = {10, 17, 25};
ASSERT_EQ(offsets.size(), num_placeholder_batches);
for (int i = 0; i < num_placeholder_batches; ++i) {
auto placeholder_batch = model::test::make_random_batch(
model::offset{offsets[i]},
2,
false,
model::record_batch_type::compaction_placeholder);
b | add_segment(offsets[i]) | add_batch(std::move(placeholder_batch));
}
auto& disk_log = b.get_disk_log_impl();
auto cleanup = ss::defer([&] { b.stop().get(); });
compaction_config cfg(
model::offset{30},
std::nullopt,
ss::default_priority_class(),
never_abort);

ASSERT_EQ(disk_log.segment_count(), num_placeholder_batches);

// None of the segments should be included in the sliding range.
auto segs = disk_log.find_sliding_range(cfg);
ASSERT_EQ(0, segs.size());

for (const auto& seg : disk_log.segments()) {
ASSERT_FALSE(seg->may_have_compactible_records());
}
}

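// A closed segment with no batches at all should likewise report no
// compactible records and be excluded from the sliding range.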
TEST(FindSlidingRangeTest, TestEmptySegmentNoCompactibleRecords) {
storage::disk_log_builder b;
b | start();
b | add_segment(0);
auto& disk_log = b.get_disk_log_impl();
auto cleanup = ss::defer([&] { b.stop().get(); });
compaction_config cfg(
model::offset{30},
std::nullopt,
ss::default_priority_class(),
never_abort);

ASSERT_EQ(disk_log.segment_count(), 1);

// The single closed, empty segment shouldn't be included in the sliding
// range.
auto segs = disk_log.find_sliding_range(cfg);
ASSERT_EQ(0, segs.size());

for (const auto& seg : disk_log.segments()) {
ASSERT_FALSE(seg->may_have_compactible_records());
}
}

84 changes: 57 additions & 27 deletions src/v/storage/tests/storage_e2e_test.cc
@@ -3217,6 +3217,8 @@ struct compact_test_args {
long msg_per_segment;
long segments;
long expected_compacted_segments;
long num_expected_gaps;
bool keys_use_segment_id;
};

static void
@@ -3239,36 +3241,42 @@ do_compact_test(const compact_test_args args, storage_test_fixture& f) {
auto log = mgr.manage(std::move(ntp_cfg)).get0();
auto disk_log = log;

auto append_batch =
[](ss::shared_ptr<storage::log> log, model::term_id term) {
iobuf key = bytes_to_iobuf(bytes("key"));
iobuf value = random_generators::make_iobuf(100);
auto append_batch = [](
ss::shared_ptr<storage::log> log,
model::term_id term,
std::optional<int> segment_id = std::nullopt) {
const auto key_str = ssx::sformat("key{}", segment_id.value_or(-1));
iobuf key = bytes_to_iobuf(bytes(key_str.data()));
iobuf value = random_generators::make_iobuf(100);

storage::record_batch_builder builder(
model::record_batch_type::raft_data, model::offset(0));
storage::record_batch_builder builder(
model::record_batch_type::raft_data, model::offset(0));

builder.add_raw_kv(key.copy(), value.copy());
builder.add_raw_kv(key.copy(), value.copy());

auto batch = std::move(builder).build();
auto batch = std::move(builder).build();

batch.set_term(term);
batch.header().first_timestamp = model::timestamp::now();
auto reader = model::make_memory_record_batch_reader(
{std::move(batch)});
storage::log_append_config cfg{
.should_fsync = storage::log_append_config::fsync::no,
.io_priority = ss::default_priority_class(),
.timeout = model::no_timeout,
};
batch.set_term(term);
batch.header().first_timestamp = model::timestamp::now();
auto reader = model::make_memory_record_batch_reader(
{std::move(batch)});
storage::log_append_config cfg{
.should_fsync = storage::log_append_config::fsync::no,
.io_priority = ss::default_priority_class(),
.timeout = model::no_timeout,
};

std::move(reader)
.for_each_ref(log->make_appender(cfg), cfg.timeout)
.get();
};
std::move(reader)
.for_each_ref(log->make_appender(cfg), cfg.timeout)
.get();
};

for (int s = 0; s < args.segments; s++) {
std::optional<int> key = args.keys_use_segment_id
? s
: std::optional<int>{};
for (int i = 0; i < args.msg_per_segment; i++) {
append_batch(log, model::term_id(0));
append_batch(log, model::term_id(0), key);
}
disk_log->force_roll(ss::default_priority_class()).get0();
}
@@ -3297,9 +3305,11 @@ do_compact_test(const compact_test_args args, storage_test_fixture& f) {
BOOST_REQUIRE_EQUAL(
final_stats.committed_offset, args.segments * args.msg_per_segment);

// we used the same key for all messages, so we should have one huge gap at
// the beginning of each compacted segment
BOOST_REQUIRE_EQUAL(final_gaps.num_gaps, args.expected_compacted_segments);
// If we used keys with segment IDs for records in each segment, we
// should have one huge gap at the beginning of each compacted segment.
// If we used the same key for each record, we should only expect one gap
// after compaction runs across the entire window of segments.
BOOST_REQUIRE_EQUAL(final_gaps.num_gaps, args.num_expected_gaps);
BOOST_REQUIRE_EQUAL(final_gaps.first_gap_start, model::offset(0));

// If adjacent segment compaction worked in order from oldest to newest, we
@@ -3325,7 +3335,9 @@ FIXTURE_TEST(test_max_compact_offset_mid_segment, storage_test_fixture) {
.num_compactable_msg = 100,
.msg_per_segment = 100,
.segments = 3,
.expected_compacted_segments = 1},
.expected_compacted_segments = 1,
.num_expected_gaps = 1,
.keys_use_segment_id = false},
*this);
}

@@ -3338,7 +3350,25 @@ FIXTURE_TEST(test_max_compact_offset_unset, storage_test_fixture) {
.num_compactable_msg = 200,
.msg_per_segment = 100,
.segments = 3,
.expected_compacted_segments = 3},
.expected_compacted_segments = 3,
.num_expected_gaps = 1,
.keys_use_segment_id = false},
*this);
}

FIXTURE_TEST(
test_max_compact_offset_unset_use_segment_ids, storage_test_fixture) {
// Use segment IDs for keys, thereby preventing compaction from reducing
// down to just one record in the last segment (each segment will have one
// unique record).
do_compact_test(
{.max_compact_offs = model::offset::max(),
.num_compactable_msg = 200,
.msg_per_segment = 100,
.segments = 3,
.expected_compacted_segments = 3,
.num_expected_gaps = 3,
.keys_use_segment_id = true},
*this);
}
