Skip to content

Commit

Permalink
compaction: plumb feature table into self_compact_segment
Browse files Browse the repository at this point in the history
to be used by the next commit.
  • Loading branch information
bharathv committed Feb 21, 2024
1 parent 639cd6b commit 5ad9e14
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 14 deletions.
10 changes: 5 additions & 5 deletions src/v/storage/disk_log_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ ss::future<> disk_log_impl::adjacent_merge_compact(
*_probe,
*_readers_cache,
_manager.resources(),
storage::internal::should_apply_delta_time_offset(_feature_table));
_feature_table);

vlog(
gclog.debug,
Expand Down Expand Up @@ -549,7 +549,7 @@ ss::future<bool> disk_log_impl::sliding_window_compact(
*_probe,
*_readers_cache,
_manager.resources(),
storage::internal::should_apply_delta_time_offset(_feature_table));
_feature_table);

vlog(
gclog.debug,
Expand Down Expand Up @@ -692,8 +692,8 @@ ss::future<bool> disk_log_impl::sliding_window_compact(
*appender,
compacted_idx_writer,
*_probe,
storage::internal::should_apply_delta_time_offset(
_feature_table));
storage::internal::should_apply_delta_time_offset(_feature_table),
_feature_table);

} catch (...) {
eptr = std::current_exception();
Expand Down Expand Up @@ -910,7 +910,7 @@ ss::future<compaction_result> disk_log_impl::compact_adjacent_segments(
*_probe,
*_readers_cache,
_manager.resources(),
storage::internal::should_apply_delta_time_offset(_feature_table));
_feature_table);
_probe->delete_segment(*replacement.get());
vlog(gclog.debug, "Final compacted segment {}", replacement);

Expand Down
3 changes: 2 additions & 1 deletion src/v/storage/segment_deduplication_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,8 @@ ss::future<index_state> deduplicate_segment(
segment_appender& appender,
compacted_index_writer& cmp_idx_writer,
probe& probe,
offset_delta_time should_offset_delta_times) {
offset_delta_time should_offset_delta_times,
ss::sharded<features::feature_table>&) {
auto read_holder = co_await seg->read_lock();
if (seg->is_closed()) {
throw segment_closed_exception();
Expand Down
3 changes: 2 additions & 1 deletion src/v/storage/segment_deduplication_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ ss::future<index_state> deduplicate_segment(
segment_appender& appender,
compacted_index_writer& cmp_idx_writer,
storage::probe& probe,
offset_delta_time should_offset_delta_times);
offset_delta_time should_offset_delta_times,
ss::sharded<features::feature_table>&);

} // namespace storage
20 changes: 15 additions & 5 deletions src/v/storage/segment_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,8 @@ ss::future<storage::index_state> do_copy_segment_data(
storage::probe& pb,
ss::rwlock::holder rw_lock_holder,
storage_resources& resources,
offset_delta_time apply_offset) {
offset_delta_time apply_offset,
ss::sharded<features::feature_table>&) {
// preserve broker_timestamp from the segment's index
auto old_broker_timestamp = seg->index().broker_timestamp();

Expand Down Expand Up @@ -500,7 +501,8 @@ ss::future<std::optional<size_t>> do_self_compact_segment(
storage::readers_cache& readers_cache,
storage_resources& resources,
offset_delta_time apply_offset,
ss::rwlock::holder read_holder) {
ss::rwlock::holder read_holder,
ss::sharded<features::feature_table>& feature_table) {
vlog(gclog.trace, "self compacting segment {}", s->reader().path());
auto segment_generation = s->get_generation_id();

Expand All @@ -517,7 +519,13 @@ ss::future<std::optional<size_t>> do_self_compact_segment(
auto staging_to_clean = scoped_file_tracker{
cfg.files_to_cleanup, {staging_file}};
auto idx = co_await do_copy_segment_data(
s, cfg, pb, std::move(read_holder), resources, apply_offset);
s,
cfg,
pb,
std::move(read_holder),
resources,
apply_offset,
feature_table);

auto rdr_holder = co_await readers_cache.evict_segment_readers(s);

Expand Down Expand Up @@ -626,7 +634,7 @@ ss::future<compaction_result> self_compact_segment(
storage::probe& pb,
storage::readers_cache& readers_cache,
storage_resources& resources,
offset_delta_time apply_offset) {
ss::sharded<features::feature_table>& feature_table) {
if (s->has_appender()) {
throw std::runtime_error(fmt::format(
"Cannot compact an active segment. cfg:{} - segment:{}", cfg, s));
Expand Down Expand Up @@ -683,14 +691,16 @@ ss::future<compaction_result> self_compact_segment(
"Unexpected state {}",
int(state));
auto sz_before = s->size_bytes();
auto apply_offset = should_apply_delta_time_offset(feature_table);
auto sz_after = co_await do_self_compact_segment(
s,
cfg,
pb,
readers_cache,
resources,
apply_offset,
std::move(read_holder));
std::move(read_holder),
feature_table);
// compaction wasn't executed, return
if (!sz_after) {
co_return compaction_result(sz_before);
Expand Down
2 changes: 1 addition & 1 deletion src/v/storage/segment_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ ss::future<compaction_result> self_compact_segment(
storage::probe&,
storage::readers_cache&,
storage::storage_resources&,
offset_delta_time apply_offset);
ss::sharded<features::feature_table>& feature_table);

/// \brief, rebuilds a given segment's compacted index. This method acquires
/// locks on the segment.
Expand Down
10 changes: 9 additions & 1 deletion src/v/storage/tests/segment_deduplication_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ using namespace storage;

namespace {
ss::abort_source never_abort;
ss::sharded<features::feature_table> feature_table;
} // anonymous namespace

// Builds a segment layout:
Expand Down Expand Up @@ -165,6 +166,13 @@ TEST(BuildOffsetMap, TestBuildSimpleMap) {
model::offset{30}, ss::default_priority_class(), never_abort);
probe pb;

feature_table.start().get();
feature_table
.invoke_on_all(
[](features::feature_table& f) { f.testing_activate_all(); })
.get();
auto defer = ss::defer([] { feature_table.stop().get(); });

// Self-compact each segment so we're left with compaction indices. This is
// a requirement to build the offset map.
for (auto& seg : segs) {
Expand All @@ -175,7 +183,7 @@ TEST(BuildOffsetMap, TestBuildSimpleMap) {
pb,
disk_log.readers(),
disk_log.resources(),
offset_delta_time::yes)
feature_table)
.get();
}

Expand Down

0 comments on commit 5ad9e14

Please sign in to comment.