Skip to content

Commit

Permalink
Merge pull request #17544 from mmaslankaprv/break-redirection
Browse files Browse the repository at this point in the history
c/topics_frontend: break the loop when dispatching to current leader
  • Loading branch information
mmaslankaprv authored Apr 3, 2024
2 parents 3e29ac1 + fd93ff1 commit 90483d0
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 8 deletions.
4 changes: 3 additions & 1 deletion src/v/cluster/service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,9 @@ service::do_finish_partition_update(finish_partition_update_request req) {
req.ntp,
req.new_replica_set,
config::shard_local_cfg().replicate_append_timeout_ms()
+ model::timeout_clock::now());
+ model::timeout_clock::now(),
topics_frontend::dispatch_to_leader::no);

finish_partition_update_reply reply{.result = errc::success};
if (ec) {
if (ec.category() == cluster::error_category()) {
Expand Down
20 changes: 14 additions & 6 deletions src/v/cluster/topics_frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1127,27 +1127,35 @@ ss::future<std::error_code> topics_frontend::abort_moving_partition_replicas(
ss::future<std::error_code> topics_frontend::finish_moving_partition_replicas(
model::ntp ntp,
std::vector<model::broker_shard> new_replica_set,
model::timeout_clock::time_point tout) {
model::timeout_clock::time_point tout,
dispatch_to_leader dispatch) {
auto leader = _leaders.local().get_leader(model::controller_ntp);

// no leader available
if (!leader) {
return ss::make_ready_future<std::error_code>(
errc::no_leader_controller);
}
// optimization: if update is not in progress return early
if (!_topics.local().is_update_in_progress(ntp)) {
return ss::make_ready_future<std::error_code>(
errc::no_update_in_progress);
}

// current node is a leader, just replicate
if (leader == _self) {
// optimization: if update is not in progress return early
if (!_topics.local().is_update_in_progress(ntp)) {
return ss::make_ready_future<std::error_code>(
errc::no_update_in_progress);
}

finish_moving_partition_replicas_cmd cmd(
std::move(ntp), std::move(new_replica_set));

return replicate_and_wait(_stm, _as, std::move(cmd), tout);
}

if (!dispatch) {
return ss::make_ready_future<std::error_code>(
errc::not_leader_controller);
}

return _connections.local()
.with_node_client<controller_client_protocol>(
_self,
Expand Down
11 changes: 10 additions & 1 deletion src/v/cluster/topics_frontend.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <seastar/core/abort_source.hh>
#include <seastar/core/chunked_fifo.hh>
#include <seastar/core/sharded.hh>
#include <seastar/util/bool_class.hh>

#include <absl/container/flat_hash_map.h>

Expand All @@ -40,6 +41,13 @@ namespace cluster {
// on every core
class topics_frontend {
public:
/**
* A boolean that may be used by the caller to request redirecting a request
* to the leader. This is useful as topic operations must be executed on
* `redpanda/controller` partition leader.
*/
using dispatch_to_leader = ss::bool_class<struct dispatch_to_leader_tag>;

struct capacity_info {
absl::flat_hash_map<model::node_id, node_disk_space> node_disk_reports;
absl::flat_hash_map<model::partition_id, int64_t> ntp_sizes;
Expand Down Expand Up @@ -129,7 +137,8 @@ class topics_frontend {
ss::future<std::error_code> finish_moving_partition_replicas(
model::ntp,
std::vector<model::broker_shard>,
model::timeout_clock::time_point);
model::timeout_clock::time_point,
dispatch_to_leader = dispatch_to_leader::yes);

ss::future<std::error_code> revert_cancel_partition_move(
model::ntp, model::timeout_clock::time_point);
Expand Down

0 comments on commit 90483d0

Please sign in to comment.