Merge pull request #1414 from mmaslankaprv/fix-moving-not-fully-initialized-partition

Handle moving a not-fully-initialized partition across shards.

dotnwat authored May 22, 2021
2 parents d1d8cff + 7597945 commit caf7679
Showing 9 changed files with 408 additions and 20 deletions.
45 changes: 30 additions & 15 deletions src/v/cluster/controller_backend.cc
@@ -10,6 +10,8 @@
 #include "cluster/controller_backend.h"

 #include "cluster/cluster_utils.h"
+#include "cluster/errc.h"
+#include "cluster/fwd.h"
 #include "cluster/logger.h"
 #include "cluster/members_table.h"
 #include "cluster/partition.h"
@@ -422,18 +424,20 @@ controller_backend::execute_partitition_op(const topic_table::delta& delta) {
     __builtin_unreachable();
 }

-ss::future<std::optional<model::revision_id>>
+ss::future<std::optional<controller_backend::cross_shard_move_request>>
 controller_backend::ask_remote_shard_for_initail_rev(
   model::ntp ntp, ss::shard_id shard) {
-    return container().invoke_on(shard, [ntp](controller_backend& remote) {
-        if (auto it = remote._cross_shard_requests.find(ntp);
-            it != remote._cross_shard_requests.end()) {
-            std::optional<model::revision_id> ret{it->second};
-            remote._cross_shard_requests.erase(it);
-            return ret;
-        }
-        return std::optional<model::revision_id>{};
-    });
+    using ret_t = std::optional<controller_backend::cross_shard_move_request>;
+    return container().invoke_on(
+      shard, [ntp = std::move(ntp)](controller_backend& remote) {
+          if (auto it = remote._cross_shard_requests.find(ntp);
+              it != remote._cross_shard_requests.end()) {
+              ret_t ret{std::move(it->second)};
+              remote._cross_shard_requests.erase(it);
+              return ret;
+          }
+          return ret_t{};
+      });
 }

ss::future<std::error_code> controller_backend::process_partition_update(
@@ -549,17 +553,20 @@ ss::future<std::error_code> controller_backend::process_partition_update(
     if (contains_node(_self, previous.replicas)) {
         auto previous_shard = get_target_shard(_self, previous.replicas);
         // ask previous controller for partition initial revision
-        auto initial_revision = co_await ask_remote_shard_for_initail_rev(
+        auto x_core_move_req = co_await ask_remote_shard_for_initail_rev(
           ntp, *previous_shard);

-        if (initial_revision) {
+        if (x_core_move_req) {
            vlog(
              clusterlog.trace,
              "creating partition {} from shard {}",
              ntp,
              previous_shard);
            auto ec = co_await create_partition(
-             std::move(ntp), requested.group, *initial_revision, {});
+             std::move(ntp),
+             requested.group,
+             x_core_move_req->revision,
+             x_core_move_req->initial_configuration.brokers());
            if (ec) {
                co_return ec;
            }
@@ -772,6 +779,10 @@ ss::future<std::error_code> controller_backend::create_partition(
       })
       .then([] { return make_error_code(errc::success); });
 }
+controller_backend::cross_shard_move_request::cross_shard_move_request(
+  model::revision_id rev, raft::group_configuration cfg)
+  : revision(rev)
+  , initial_configuration(std::move(cfg)) {}

ss::future<std::error_code> controller_backend::shutdown_on_current_shard(
model::ntp ntp, model::revision_id rev) {
@@ -783,18 +794,22 @@ ss::future<std::error_code> controller_backend::shutdown_on_current_shard(
     }
     auto gr = partition->group();
     auto init_rev = partition->get_ntp_config().get_revision();
+
     try {
         // remove from shard table
         co_await remove_from_shard_table(ntp, gr, rev);
         // shutdown partition
         co_await _partition_manager.local().shutdown(ntp);
         // after partition is stopped emplace cross shard request.
-        auto [_, success] = _cross_shard_requests.emplace(ntp, init_rev);
+        auto [it, success] = _cross_shard_requests.emplace(
+          ntp,
+          cross_shard_move_request(init_rev, partition->group_configuration()));

         vassert(
           success,
           "only one cross shard request is allowed to be pending for single "
           "ntp, current request: {}, ntp: {}, revision: {}",
-          _cross_shard_requests[ntp],
+          it->second,
           ntp,
           rev);
         co_return errc::success;
30 changes: 26 additions & 4 deletions src/v/cluster/controller_backend.h
@@ -15,14 +15,18 @@
 #include "cluster/topic_table.h"
 #include "cluster/types.h"
 #include "model/fundamental.h"
+#include "model/metadata.h"
 #include "outcome.h"
+#include "raft/group_configuration.h"

 #include <seastar/core/abort_source.hh>
 #include <seastar/core/gate.hh>
 #include <seastar/core/sharded.hh>

 #include <absl/container/node_hash_map.h>
+
+#include <ostream>

namespace cluster {

/// on every core, sharded
@@ -46,6 +50,23 @@ class controller_backend
     std::vector<topic_table::delta> list_ntp_deltas(const model::ntp&) const;

 private:
+    struct cross_shard_move_request {
+        cross_shard_move_request(model::revision_id, raft::group_configuration);
+
+        model::revision_id revision;
+        raft::group_configuration initial_configuration;
+        friend std::ostream& operator<<(
+          std::ostream& o,
+          const controller_backend::cross_shard_move_request& r) {
+            fmt::print(
+              o,
+              "{{revision: {}, configuration: {}}}",
+              r.revision,
+              r.initial_configuration);
+            return o;
+        }
+    };
+
     using deltas_t = std::vector<topic_table::delta>;
     using underlying_t = absl::flat_hash_map<model::ntp, deltas_t>;

@@ -96,7 +117,7 @@ class controller_backend
     ss::future<std::error_code>
     shutdown_on_current_shard(model::ntp, model::revision_id);

-    ss::future<std::optional<model::revision_id>>
+    ss::future<std::optional<cross_shard_move_request>>
     ask_remote_shard_for_initail_rev(model::ntp, ss::shard_id);

     void housekeeping();
@@ -119,10 +140,11 @@ class controller_backend
      * This map is populated by backend instance on shard that given NTP is
      * moved from. Map is then queried by the controller instance on target
      * shard. Partition is created on target shard with the same initial
-     * revision as on originating shard, this way identity of node i.e. raft
-     * vnode doesn't change.
+     * revision and configuration as on originating shard, this way identity of
+     * node i.e. raft vnode doesn't change.
      */
-    absl::node_hash_map<model::ntp, model::revision_id> _cross_shard_requests;
+    absl::node_hash_map<model::ntp, cross_shard_move_request>
+      _cross_shard_requests;
};

std::vector<topic_table::delta> calculate_bootstrap_deltas(
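The header comment above describes a two-step handoff: the source shard parks the partition's identity in _cross_shard_requests after shutdown, and the target shard consumes it (erase-on-read) before recreating the partition. Below is a minimal, runnable model of that protocol; it deliberately substitutes plain std::map and std::string for absl::node_hash_map, model::ntp, and Seastar's invoke_on machinery, and the helpers park_request/take_request are illustrative names, not from this commit.

#include <cstdio>
#include <cstdlib>
#include <map>
#include <optional>
#include <string>

// Stand-in for controller_backend::cross_shard_move_request.
struct cross_shard_move_request {
    long revision;             // stands in for model::revision_id
    std::string configuration; // stands in for raft::group_configuration
};

// Per-shard pending requests, keyed by partition (ntp).
using request_map = std::map<std::string, cross_shard_move_request>;

// Source shard: after the partition is shut down, park its identity.
void park_request(
  request_map& requests, const std::string& ntp, cross_shard_move_request req) {
    auto [it, success] = requests.emplace(ntp, std::move(req));
    // Mirrors the vassert in shutdown_on_current_shard: at most one
    // pending request per ntp.
    if (!success) {
        std::abort();
    }
}

// Target shard: consume the parked request, mirroring the erase-on-read
// semantics of ask_remote_shard_for_initail_rev.
std::optional<cross_shard_move_request>
take_request(request_map& requests, const std::string& ntp) {
    if (auto it = requests.find(ntp); it != requests.end()) {
        auto ret = std::move(it->second);
        requests.erase(it);
        return ret;
    }
    return std::nullopt;
}

int main() {
    request_map shard0_requests;
    park_request(shard0_requests, "kafka/topic/0", {42, "{n1,n2,n3}"});
    if (auto req = take_request(shard0_requests, "kafka/topic/0")) {
        std::printf(
          "recreate at revision %ld with configuration %s\n",
          req->revision,
          req->configuration.c_str());
    }
}

Because the request is erased as it is read, a second take_request for the same ntp returns std::nullopt, which is the branch process_partition_update falls back to when no cross-shard request is pending.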
10 changes: 10 additions & 0 deletions src/v/finjector/hbadger.h
@@ -65,6 +65,16 @@ class honey_badger
     void register_probe(std::string_view, probe* p);
     void deregister_probe(std::string_view);

+    static constexpr bool is_enabled() {
+#ifndef NDEBUG
+        // debug
+        return true;
+#else
+        // production
+        return false;
+#endif
+    }
+
     void set_exception(std::string_view module, std::string_view point);
     void set_delay(std::string_view module, std::string_view point);
     void set_termination(std::string_view module, std::string_view point);
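The added is_enabled() makes failure injection a compile-time property: in release builds (where NDEBUG is defined) it folds to false, so any probe work guarded by it is compiled out. A standalone sketch of the same gating pattern (not the actual honey_badger class):

#include <cstdio>

// Same NDEBUG gating pattern as honey_badger::is_enabled(): a constexpr
// function the optimizer folds to a constant.
static constexpr bool is_enabled() {
#ifndef NDEBUG
    return true; // debug build: failure injection available
#else
    return false; // release build: zero-cost no-op
#endif
}

int main() {
    // if constexpr discards the untaken branch at compile time, so release
    // binaries contain no trace of the injection path.
    if constexpr (is_enabled()) {
        std::puts("failure injection active (debug build)");
    } else {
        std::puts("failure injection compiled out (release build)");
    }
}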
9 changes: 8 additions & 1 deletion src/v/redpanda/CMakeLists.txt
@@ -47,6 +47,13 @@ seastar_generate_swagger(
   OUT_FILE ${CMAKE_CURRENT_BINARY_DIR}/admin/api-doc/partition.json.h
 )

+seastar_generate_swagger(
+  TARGET hbadger_swagger
+  VAR hbadger_swagger_file
+  IN_FILE ${CMAKE_CURRENT_SOURCE_DIR}/admin/api-doc/hbadger.json
+  OUT_FILE ${CMAKE_CURRENT_BINARY_DIR}/admin/api-doc/hbadger.json.h
+)
+
 v_cc_library(
   NAME application
   SRCS
@@ -69,7 +76,7 @@ add_executable(redpanda
 target_link_libraries(redpanda PUBLIC v::application v::raft v::kafka)
 set_property(TARGET redpanda PROPERTY POSITION_INDEPENDENT_CODE ON)
 add_dependencies(v_application config_swagger raft_swagger kafka_swagger
-  security_swagger status_swagger broker_swagger partition_swagger)
+  security_swagger status_swagger broker_swagger partition_swagger hbadger_swagger)

 if(CMAKE_BUILD_TYPE MATCHES Release)
   include(CheckIPOSupported)
127 changes: 127 additions & 0 deletions src/v/redpanda/admin/api-doc/hbadger.json
@@ -0,0 +1,127 @@
{
"apiVersion": "0.0.1",
"swaggerVersion": "1.2",
"basePath": "/v1",
"resourcePath": "/failure-probes",
"produces": [
"application/json"
],
"apis": [
{
"path": "/v1/failure-probes",
"operations": [
{
"method": "GET",
"summary": "Get a list of available failure probes",
"type": "failure_injector_status",
"nickname": "get_failure_probes",
"produces": [
"application/json"
],
"parameters": []
}
]
},
{
"path": "/v1/failure-probes/{module}/{point}",
"operations": [
{
"method": "DELETE",
"summary": "Delete all failure injectors at given point",
"type": "void",
"nickname": "delete_failure_probe",
"produces": [
"application/json"
],
"parameters": [
{
"name": "module",
"in": "path",
"required": true,
"type": "string"
},
{
"name": "point",
"in": "path",
"required": true,
"type": "string"
}
]
}
]
},
{
"path": "/v1/failure-probes/{module}/{point}/{type}",
"operations": [
{
"method": "POST",
"summary": "Enable failure injection of given type",
"type": "void",
"nickname": "set_failure_probe",
"produces": [
"application/json"
],
"parameters": [
{
"name": "module",
"in": "path",
"required": true,
"type": "string"
},
{
"name": "point",
"in": "path",
"required": true,
"type": "string"
},
{
"name": "type",
"in": "path",
"required": true,
"type": "string",
"enum": [
"exception",
"delay",
"terminate"
]
}
]
}
]
}
],
"models": {
"failure_probes": {
"id": "failure_probes",
"description": "Group of failure probes related with single resource",
"properties": {
"module": {
"type": "string",
"description": "failure probes module name"
},
"points": {
"type": "array",
"description": "list of failure points",
"items": {
"type": "string"
}
}
}
},
"failure_injector_status": {
"id": "failure_injector_status",
"description": "Status of failure injector with list of available probes",
"properties": {
"probes": {
"type": "array",
"items": {
"type": "failure_probes"
}
},
"enabled": {
"type": "boolean"
}
}
}
}
}
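Once this spec is generated into the admin server (via the hbadger_swagger target above), it implies requests of the form GET /v1/failure-probes to list available probes, POST /v1/failure-probes/{module}/{point}/{type} with type one of exception, delay, or terminate to arm a probe, and DELETE /v1/failure-probes/{module}/{point} to clear one; concrete module and point names depend on what the registered probes expose.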
(The remaining four changed files were not rendered in this capture.)