Skip to content

Commit

Permalink
c/m_dissemination: avoid copying updates vector
Browse files Browse the repository at this point in the history
Previously each time the leadership update request was received by the
`metadata_dissemination_handler` it created a copy for each of the shard
on the shard handling the request. This is inefficient and may lead to
OOM on the handler shard (especially for very large machines).

Instead of creating a copy for each shard we can simply use the const
reference as the updates vector doesn't change and it is safe to access
it from other cores.

Signed-off-by: Michał Maślanka <michal@redpanda.com>
  • Loading branch information
mmaslankaprv committed Aug 21, 2024
1 parent 21d0685 commit 7dbf46e
Showing 1 changed file with 22 additions and 21 deletions.
43 changes: 22 additions & 21 deletions src/v/cluster/metadata_dissemination_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
#include "model/fundamental.h"
#include "model/metadata.h"
#include "model/timeout_clock.h"
#include "ssx/async_algorithm.h"

#include <seastar/core/chunked_fifo.hh>
#include <seastar/core/loop.hh>
#include <seastar/core/shard_id.hh>
#include <seastar/core/smp.hh>

#include <boost/range/irange.hpp>
Expand Down Expand Up @@ -51,27 +53,26 @@ ss::future<update_leadership_reply>
metadata_dissemination_handler::do_update_leadership(
chunked_vector<ntp_leader_revision> leaders) {
vlog(clusterlog.trace, "Received a metadata update");
co_await ss::parallel_for_each(
boost::irange<ss::shard_id>(0, ss::smp::count),
[this, leaders = std::move(leaders)](ss::shard_id shard) {
chunked_vector<ntp_leader_revision> local_leaders;
local_leaders.reserve(leaders.size());
std::copy(
leaders.begin(), leaders.end(), std::back_inserter(local_leaders));

return ss::smp::submit_to(
shard, [this, leaders = std::move(local_leaders)] {
for (auto& leader : leaders) {
_leaders.local().update_partition_leader(
leader.ntp,
leader.revision,
leader.term,
leader.leader_id);
}
});
});

co_return update_leadership_reply{};
return ss::do_with(
std::move(leaders),
[this](const chunked_vector<ntp_leader_revision>& leaders) {
return ss::parallel_for_each(
boost::irange<ss::shard_id>(0, ss::smp::count),
[this, &leaders](ss::shard_id shard) {
return ss::smp::submit_to(shard, [this, &leaders] {
return ssx::async_for_each(
leaders,
[this](const ntp_leader_revision& leader) {
_leaders.local().update_partition_leader(
leader.ntp,
leader.revision,
leader.term,
leader.leader_id);
});
});
});
})
.then([] { return update_leadership_reply{}; });
}

namespace {
Expand Down

0 comments on commit 7dbf46e

Please sign in to comment.