Skip to content

Commit

Permalink
Merge pull request #22977 from mmaslankaprv/fix-dissemination-large-s…
Browse files Browse the repository at this point in the history
…hard-count

Fix dissemination large shard count
  • Loading branch information
mmaslankaprv authored Aug 21, 2024
2 parents 8b2e1ed + 8e4f51f commit ec8493e
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 29 deletions.
45 changes: 23 additions & 22 deletions src/v/cluster/metadata_dissemination_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
#include "model/fundamental.h"
#include "model/metadata.h"
#include "model/timeout_clock.h"
#include "ssx/async_algorithm.h"

#include <seastar/core/chunked_fifo.hh>
#include <seastar/core/loop.hh>
#include <seastar/core/shard_id.hh>
#include <seastar/core/smp.hh>

#include <boost/range/irange.hpp>
Expand Down Expand Up @@ -49,29 +51,28 @@ metadata_dissemination_handler::update_leadership_v2(

ss::future<update_leadership_reply>
metadata_dissemination_handler::do_update_leadership(
fragmented_vector<ntp_leader_revision> leaders) {
chunked_vector<ntp_leader_revision> leaders) {
vlog(clusterlog.trace, "Received a metadata update");
co_await ss::parallel_for_each(
boost::irange<ss::shard_id>(0, ss::smp::count),
[this, leaders = std::move(leaders)](ss::shard_id shard) {
fragmented_vector<ntp_leader_revision> local_leaders;
local_leaders.reserve(leaders.size());
std::copy(
leaders.begin(), leaders.end(), std::back_inserter(local_leaders));

return ss::smp::submit_to(
shard, [this, leaders = std::move(local_leaders)] {
for (auto& leader : leaders) {
_leaders.local().update_partition_leader(
leader.ntp,
leader.revision,
leader.term,
leader.leader_id);
}
});
});

co_return update_leadership_reply{};
return ss::do_with(
std::move(leaders),
[this](const chunked_vector<ntp_leader_revision>& leaders) {
return ss::parallel_for_each(
boost::irange<ss::shard_id>(0, ss::smp::count),
[this, &leaders](ss::shard_id shard) {
return ss::smp::submit_to(shard, [this, &leaders] {
return ssx::async_for_each(
leaders,
[this](const ntp_leader_revision& leader) {
_leaders.local().update_partition_leader(
leader.ntp,
leader.revision,
leader.term,
leader.leader_id);
});
});
});
})
.then([] { return update_leadership_reply{}; });
}

namespace {
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/metadata_dissemination_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class metadata_dissemination_handler

private:
ss::future<update_leadership_reply>
do_update_leadership(fragmented_vector<ntp_leader_revision>);
do_update_leadership(chunked_vector<ntp_leader_revision>);

ss::sharded<partition_leaders_table>& _leaders;
}; // namespace cluster
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/metadata_dissemination_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ ss::future<> metadata_dissemination_service::update_leaders_with_health_report(
ss::future<> metadata_dissemination_service::dispatch_one_update(
model::node_id target_id, update_retry_meta& meta) {
// copy updates to make retries possible
fragmented_vector<ntp_leader_revision> updates;
chunked_vector<ntp_leader_revision> updates;
updates.reserve(meta.updates.size());
std::copy(
meta.updates.begin(), meta.updates.end(), std::back_inserter(updates));
Expand Down
6 changes: 3 additions & 3 deletions src/v/cluster/metadata_dissemination_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ struct update_leadership_request_v2
serde::compat_version<0>> {
using rpc_adl_exempt = std::true_type;
static constexpr int8_t version = 0;
fragmented_vector<ntp_leader_revision> leaders;
chunked_vector<ntp_leader_revision> leaders;

update_leadership_request_v2() noexcept = default;

Expand All @@ -119,7 +119,7 @@ struct update_leadership_request_v2
};

update_leadership_request_v2 copy() const {
fragmented_vector<ntp_leader_revision> leaders_cp;
chunked_vector<ntp_leader_revision> leaders_cp;
leaders_cp.reserve(leaders.size());
std::copy(
leaders.begin(), leaders.end(), std::back_inserter(leaders_cp));
Expand All @@ -134,7 +134,7 @@ struct update_leadership_request_v2
}

explicit update_leadership_request_v2(
fragmented_vector<ntp_leader_revision> leaders)
chunked_vector<ntp_leader_revision> leaders)
: leaders(std::move(leaders)) {}

auto serde_fields() { return std::tie(leaders); }
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/tests/serialization_rt_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -794,7 +794,7 @@ SEASTAR_THREAD_TEST_CASE(serde_reflection_roundtrip) {
tests::random_named_int<model::term_id>(),
tests::random_named_int<model::node_id>(),
tests::random_named_int<model::revision_id>()));
fragmented_vector<cluster::ntp_leader_revision> l_revs;
chunked_vector<cluster::ntp_leader_revision> l_revs;
l_revs.emplace_back(
model::random_ntp(),
tests::random_named_int<model::term_id>(),
Expand Down
2 changes: 1 addition & 1 deletion src/v/compat/metadata_dissemination_generator.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace compat {
template<>
struct instance_generator<cluster::update_leadership_request_v2> {
static cluster::update_leadership_request_v2 random() {
fragmented_vector<cluster::ntp_leader_revision> values;
chunked_vector<cluster::ntp_leader_revision> values;
values.emplace_back(
model::random_ntp(),
tests::random_named_int<model::term_id>(),
Expand Down

0 comments on commit ec8493e

Please sign in to comment.