diff --git a/src/v/cluster/metadata_dissemination_handler.cc b/src/v/cluster/metadata_dissemination_handler.cc index 61d0319984a0..4486e6f783c8 100644 --- a/src/v/cluster/metadata_dissemination_handler.cc +++ b/src/v/cluster/metadata_dissemination_handler.cc @@ -19,9 +19,11 @@ #include "model/fundamental.h" #include "model/metadata.h" #include "model/timeout_clock.h" +#include "ssx/async_algorithm.h" #include #include +#include #include #include @@ -51,27 +53,26 @@ ss::future metadata_dissemination_handler::do_update_leadership( chunked_vector leaders) { vlog(clusterlog.trace, "Received a metadata update"); - co_await ss::parallel_for_each( - boost::irange(0, ss::smp::count), - [this, leaders = std::move(leaders)](ss::shard_id shard) { - chunked_vector local_leaders; - local_leaders.reserve(leaders.size()); - std::copy( - leaders.begin(), leaders.end(), std::back_inserter(local_leaders)); - - return ss::smp::submit_to( - shard, [this, leaders = std::move(local_leaders)] { - for (auto& leader : leaders) { - _leaders.local().update_partition_leader( - leader.ntp, - leader.revision, - leader.term, - leader.leader_id); - } - }); - }); - - co_return update_leadership_reply{}; + return ss::do_with( + std::move(leaders), + [this](const chunked_vector& leaders) { + return ss::parallel_for_each( + boost::irange(0, ss::smp::count), + [this, &leaders](ss::shard_id shard) { + return ss::smp::submit_to(shard, [this, &leaders] { + return ssx::async_for_each( + leaders, + [this](const ntp_leader_revision& leader) { + _leaders.local().update_partition_leader( + leader.ntp, + leader.revision, + leader.term, + leader.leader_id); + }); + }); + }); + }) + .then([] { return update_leadership_reply{}; }); } namespace {