Skip to content

Commit

Permalink
compaction: coroutinze build_compaction_index
Browse files Browse the repository at this point in the history
no logic changes
  • Loading branch information
bharathv committed Feb 19, 2024
1 parent a5e4544 commit 639cd6b
Showing 1 changed file with 23 additions and 42 deletions.
65 changes: 23 additions & 42 deletions src/v/storage/segment_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
#include <seastar/core/seastar.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/when_all.hh>
#include <seastar/coroutine/as_future.hh>
#include <seastar/util/defer.hh>

#include <absl/container/btree_map.h>
Expand Down Expand Up @@ -552,51 +553,31 @@ ss::future<std::optional<size_t>> do_self_compact_segment(
ss::future<> build_compaction_index(
model::record_batch_reader rdr,
ss::lw_shared_ptr<storage::stm_manager> stm_manager,
fragmented_vector<model::tx_range>&& aborted_txs,
fragmented_vector<model::tx_range> aborted_txs,
segment_full_path p,
compaction_config cfg,
storage_resources& resources) {
return make_compacted_index_writer(
p, cfg.iopc, resources, cfg.sanitizer_config)
.then([r = std::move(rdr), stm_manager, txs = std::move(aborted_txs)](
compacted_index_writer w) mutable {
return ss::do_with(
std::move(w),
[stm_manager, r = std::move(r), txs = std::move(txs)](
auto& writer) mutable {
return std::move(r)
.consume(
tx_reducer(stm_manager, std::move(txs), &writer),
model::no_timeout)
.then_wrapped(
[&writer](ss::future<tx_reducer::stats> fut) mutable {
if (fut.failed()) {
vlog(
gclog.error,
"Error rebuilding index: {}, {}",
writer.filename(),
fut.get_exception());
} else {
vlog(
gclog.info,
"tx reducer path: {} stats {}",
writer.filename(),
fut.get0());
}
})
.finally([&writer]() {
// writer needs to be closed in all cases,
// else can trigger a potential assert.
return writer.close().handle_exception(
[](std::exception_ptr e) {
vlog(
gclog.warn,
"error closing compacted index:{}",
e);
});
});
});
});
auto w = co_await make_compacted_index_writer(
p, cfg.iopc, resources, cfg.sanitizer_config);
auto reducer = tx_reducer(stm_manager, std::move(aborted_txs), &w);
auto index_builder = co_await ss::coroutine::as_future<tx_reducer::stats>(
std::move(rdr)
.consume(std::move(reducer), model::no_timeout)
.finally([&w] { return w.close(); }));
if (index_builder.failed()) {
auto exception = index_builder.get_exception();
vlog(
gclog.error,
"Error rebuilding index: {}, {}",
w.filename(),
exception);
std::rethrow_exception(exception);
}
vlog(
gclog.info,
"tx reducer path: {} stats {}",
w.filename(),
index_builder.get0());
}

bool compacted_index_needs_rebuild(compacted_index::recovery_state state) {
Expand Down

0 comments on commit 639cd6b

Please sign in to comment.