Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v24.1.x] storage: plumb abort source to compaction copy reducer #21422

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/v/storage/compaction_reducers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,9 @@ copy_data_segment_reducer::operator()(model::record_batch b) {
if (_inject_failure) {
throw std::runtime_error("injected error");
}
if (_as) {
_as->check();
}
const auto comp = b.header().attrs.compression();
if (!b.compressed()) {
co_return co_await filter_and_append(comp, std::move(b));
Expand Down
10 changes: 8 additions & 2 deletions src/v/storage/compaction_reducers.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,16 @@ class copy_data_segment_reducer : public compaction_reducer {
offset_delta_time apply_offset,
model::offset segment_last_offset,
compacted_index_writer* cidx = nullptr,
bool inject_failure = false)
bool inject_failure = false,
ss::abort_source* as = nullptr)
: _should_keep_fn(std::move(f))
, _segment_last_offset(segment_last_offset)
, _appender(a)
, _compacted_idx(cidx)
, _idx(index_state::make_empty_index(apply_offset))
, _internal_topic(internal_topic)
, _inject_failure(inject_failure) {}
, _inject_failure(inject_failure)
, _as(as) {}

ss::future<ss::stop_iteration> operator()(model::record_batch);
storage::index_state end_of_stream() { return std::move(_idx); }
Expand Down Expand Up @@ -174,6 +176,10 @@ class copy_data_segment_reducer : public compaction_reducer {

/// If set to true, will throw an exception on operator().
bool _inject_failure;

/// Allows the reducer to stop early, e.g. in case the partition is being
/// shut down.
ss::abort_source* _as;
};

class index_rebuilder_reducer : public compaction_reducer {
Expand Down
3 changes: 2 additions & 1 deletion src/v/storage/segment_deduplication_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,8 @@ ss::future<index_state> deduplicate_segment(
should_offset_delta_times,
seg->offsets().get_committed_offset(),
&cmp_idx_writer,
inject_reader_failure);
inject_reader_failure,
cfg.asrc);

auto new_idx = co_await std::move(rdr).consume(
std::move(copy_reducer), model::no_timeout);
Expand Down
7 changes: 6 additions & 1 deletion src/v/storage/segment_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,10 @@ ss::future<storage::index_state> do_copy_segment_data(
appender.get(),
seg->path().is_internal_topic(),
apply_offset,
segment_last_offset);
segment_last_offset,
/*cidx=*/nullptr,
/*inject_failure=*/false,
cfg.asrc);

// create the segment, get the in-memory index for the new segment
auto new_index = co_await create_segment_full_reader(
Expand Down Expand Up @@ -534,6 +537,8 @@ ss::future<std::optional<size_t>> do_self_compact_segment(
resources,
apply_offset,
feature_table);
vlog(
gclog.trace, "finished copying segment data for {}", s->reader().path());

auto rdr_holder = co_await readers_cache.evict_segment_readers(s);

Expand Down
Loading