From 639cd6b4cdd71e03a3b6d0aeadca2097d74666b0 Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Tue, 6 Feb 2024 20:42:11 -0800 Subject: [PATCH] compaction: coroutinze build_compaction_index no logic changes --- src/v/storage/segment_utils.cc | 65 ++++++++++++---------------------- 1 file changed, 23 insertions(+), 42 deletions(-) diff --git a/src/v/storage/segment_utils.cc b/src/v/storage/segment_utils.cc index b37a04aafc88..f1d571637819 100644 --- a/src/v/storage/segment_utils.cc +++ b/src/v/storage/segment_utils.cc @@ -49,6 +49,7 @@ #include #include #include +#include #include #include @@ -552,51 +553,31 @@ ss::future> do_self_compact_segment( ss::future<> build_compaction_index( model::record_batch_reader rdr, ss::lw_shared_ptr stm_manager, - fragmented_vector&& aborted_txs, + fragmented_vector aborted_txs, segment_full_path p, compaction_config cfg, storage_resources& resources) { - return make_compacted_index_writer( - p, cfg.iopc, resources, cfg.sanitizer_config) - .then([r = std::move(rdr), stm_manager, txs = std::move(aborted_txs)]( - compacted_index_writer w) mutable { - return ss::do_with( - std::move(w), - [stm_manager, r = std::move(r), txs = std::move(txs)]( - auto& writer) mutable { - return std::move(r) - .consume( - tx_reducer(stm_manager, std::move(txs), &writer), - model::no_timeout) - .then_wrapped( - [&writer](ss::future fut) mutable { - if (fut.failed()) { - vlog( - gclog.error, - "Error rebuilding index: {}, {}", - writer.filename(), - fut.get_exception()); - } else { - vlog( - gclog.info, - "tx reducer path: {} stats {}", - writer.filename(), - fut.get0()); - } - }) - .finally([&writer]() { - // writer needs to be closed in all cases, - // else can trigger a potential assert. - return writer.close().handle_exception( - [](std::exception_ptr e) { - vlog( - gclog.warn, - "error closing compacted index:{}", - e); - }); - }); - }); - }); + auto w = co_await make_compacted_index_writer( + p, cfg.iopc, resources, cfg.sanitizer_config); + auto reducer = tx_reducer(stm_manager, std::move(aborted_txs), &w); + auto index_builder = co_await ss::coroutine::as_future( + std::move(rdr) + .consume(std::move(reducer), model::no_timeout) + .finally([&w] { return w.close(); })); + if (index_builder.failed()) { + auto exception = index_builder.get_exception(); + vlog( + gclog.error, + "Error rebuilding index: {}, {}", + w.filename(), + exception); + std::rethrow_exception(exception); + } + vlog( + gclog.info, + "tx reducer path: {} stats {}", + w.filename(), + index_builder.get0()); } bool compacted_index_needs_rebuild(compacted_index::recovery_state state) {