Skip to content

Commit

Permalink
refactor(cluster): #2652 initiate migration process from CONFIG cmd
Browse files Browse the repository at this point in the history
  • Loading branch information
BorysTheDev committed Feb 27, 2024
1 parent d54f220 commit e0c26fa
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 22 deletions.
37 changes: 37 additions & 0 deletions src/server/cluster/cluster_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,13 @@ shared_ptr<ClusterConfig> ClusterConfig::CreateFromConfig(string_view my_id,
[&](const Node& node) { return node.id == my_id; });
if (owned_by_me) {
result->my_slots_.Set(shard.slot_ranges, true);
result->my_migrations_ = shard.migrations;
} else {
for (const auto& m : shard.migrations) {
if (my_id == m.target_id) {
result->my_incoming_migrations_.push_back(m);
}
}
}
}

Expand Down Expand Up @@ -236,6 +243,32 @@ optional<ClusterConfig::Node> ParseClusterNode(const JsonType& json) {
return node;
}

// Parses the "migrations" value of a shard description.
//
// Every array element is expected to be an object that carries a string
// "target_id", a string "ip", a "port" readable as uint16_t and a "slot_ranges"
// value accepted by GetClusterSlotRanges().
//
// Returns the parsed migrations, or nullopt when `json` is not an array or when
// any element is malformed.
optional<std::vector<ClusterConfig::MigrationInfo>> ParseMigrations(const JsonType& json) {
  if (!json.is_array()) {
    LOG(INFO) << "no migrations found: " << json;
    return nullopt;
  }

  std::vector<ClusterConfig::MigrationInfo> res;
  for (const auto& element : json.array_range()) {
    // Key lookup is only defined for objects (jsoncons reports an error when
    // at_or_null() is called on e.g. a number or a string), so route such
    // elements through the regular "invalid config" path instead.
    if (!element.is_object()) {
      LOG(WARNING) << kInvalidConfigPrefix << "invalid migration json " << json;
      return nullopt;
    }

    // Bind by const reference: these values are only inspected, so there is no
    // need to deep-copy the JSON nodes.
    const auto& target_id = element.at_or_null("target_id");
    const auto& ip = element.at_or_null("ip");
    auto port = ReadNumeric<uint16_t>(element.at_or_null("port"));
    auto slots = GetClusterSlotRanges(element.at_or_null("slot_ranges"));

    if (!target_id.is_string() || !ip.is_string() || !port || !slots) {
      LOG(WARNING) << kInvalidConfigPrefix << "invalid migration json " << json;
      return nullopt;
    }

    res.push_back(ClusterConfig::MigrationInfo{.slot_ranges = std::move(*slots),
                                               .target_id = target_id.as_string(),
                                               .ip = ip.as_string(),
                                               .port = *port});
  }
  return res;
}

optional<ClusterConfig::ClusterShards> BuildClusterConfigFromJson(const JsonType& json) {
ClusterConfig::ClusterShards config;

Expand Down Expand Up @@ -278,6 +311,10 @@ optional<ClusterConfig::ClusterShards> BuildClusterConfigFromJson(const JsonType
shard.replicas.push_back(std::move(node).value());
}

if (auto migrations = ParseMigrations(element.at_or_null("migrations")); migrations) {
shard.migrations = std::move(*migrations);
}

config.push_back(std::move(shard));
}

Expand Down
18 changes: 18 additions & 0 deletions src/server/cluster/cluster_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,18 @@ class ClusterConfig {
uint16_t port = 0;
};

// One slot migration entry as described in the cluster config JSON
// (an element of a shard's "migrations" array).
struct MigrationInfo {
// Slot ranges covered by this migration.
std::vector<SlotRange> slot_ranges;
// Id of the node receiving the slots; compared with the local node id to
// detect incoming migrations.
std::string target_id;
// Address handed to AddMigration() when the migration is started.
// NOTE(review): in the test config this is the admin address of the shard that
// declares the migration, not of the target node - confirm the intended meaning.
std::string ip;
uint16_t port = 0;
};

// Description of a single shard: the slot ranges assigned to it, its master
// node, its replicas and the slot migrations configured for it.
struct ClusterShard {
SlotRanges slot_ranges;
Node master;
std::vector<Node> replicas;
// Taken from the shard's "migrations" JSON value; left empty when that value
// could not be parsed into a list of migrations.
std::vector<MigrationInfo> migrations;
};

using ClusterShards = std::vector<ClusterShard>;
Expand Down Expand Up @@ -84,6 +92,14 @@ class ClusterConfig {

const SlotSet& GetOwnedSlots() const;

// Returns the migrations declared on the shard that CreateFromConfig()
// identified as owned by this node. Empty when no such migrations exist.
const std::vector<MigrationInfo>& GetMigrations() const {
return my_migrations_;
}

// Returns the migrations declared on shards not owned by this node whose
// target_id equals this node's id, as collected by CreateFromConfig().
const std::vector<MigrationInfo>& GetIncomingMigrations() const {
return my_incoming_migrations_;
}

private:
struct SlotEntry {
const ClusterShard* shard = nullptr;
Expand All @@ -95,6 +111,8 @@ class ClusterConfig {
ClusterShards config_;

SlotSet my_slots_;
std::vector<MigrationInfo> my_migrations_;
std::vector<MigrationInfo> my_incoming_migrations_;
};

} // namespace dfly
53 changes: 36 additions & 17 deletions src/server/cluster/cluster_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,10 @@ ClusterConfig* ClusterFamily::cluster_config() {
}

ClusterShard ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) const {
ClusterShard info{
.slot_ranges = {{.start = 0, .end = ClusterConfig::kMaxSlotNum}},
.master = {},
.replicas = {},
};
ClusterShard info{.slot_ranges = {{.start = 0, .end = ClusterConfig::kMaxSlotNum}},
.master = {},
.replicas = {},
.migrations = {}};

optional<Replica::Info> replication_info = server_family_->GetReplicaInfo();
ServerState& etl = *ServerState::tlocal();
Expand Down Expand Up @@ -478,24 +477,22 @@ void WriteFlushSlotsToJournal(const SlotSet& slots) {
} // namespace

void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx) {
SinkReplyBuilder* rb = cntx->reply_builder();

if (args.size() != 1) {
return rb->SendError(WrongNumArgsError("DFLYCLUSTER CONFIG"));
return cntx->SendError(WrongNumArgsError("DFLYCLUSTER CONFIG"));
}

string_view json_str = ArgS(args, 0);
optional<JsonType> json = JsonFromString(json_str, PMR_NS::get_default_resource());
if (!json.has_value()) {
LOG(WARNING) << "Can't parse JSON for ClusterConfig " << json_str;
return rb->SendError("Invalid JSON cluster config", kSyntaxErrType);
return cntx->SendError("Invalid JSON cluster config", kSyntaxErrType);
}

shared_ptr<ClusterConfig> new_config =
ClusterConfig::CreateFromConfig(server_family_->master_id(), json.value());
if (new_config == nullptr) {
LOG(WARNING) << "Can't set cluster config";
return rb->SendError("Invalid cluster configuration.");
return cntx->SendError("Invalid cluster configuration.");
}

lock_guard gu(set_config_mu);
Expand All @@ -517,12 +514,15 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx)
tracker.TrackOnThread();
};

// TODO think about another place for it
RemoveFinishedIncomingMigrations();

server_family_->service().proactor_pool().AwaitFiberOnAll(std::move(cb));
DCHECK(tl_cluster_config != nullptr);

// TODO rewrite with outgoing migrations
if (!StartSlotMigrations(new_config->GetIncomingMigrations(), cntx)) {
return cntx->SendError("Can't start the migration");
}
RemoveFinishedMigrations();

if (!tracker.Wait(absl::Seconds(1))) {
LOG(WARNING) << "Cluster config change timed out";
}
Expand All @@ -534,7 +534,7 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx)
WriteFlushSlotsToJournal(deleted_slots);
}

return rb->SendOk();
return cntx->SendOk();
}

void ClusterFamily::DflyClusterGetSlotInfo(CmdArgList args, ConnectionContext* cntx) {
Expand Down Expand Up @@ -620,6 +620,19 @@ void ClusterFamily::DflyClusterStartSlotMigration(CmdArgList args, ConnectionCon
return cntx->SendLong(node->GetSyncId());
}

// Creates a migration job for every entry of `migrations` and starts it.
//
// Returns false as soon as AddMigration() yields no job for an entry; jobs that
// were already started for earlier entries are left untouched. Returns true when
// a job was started for every entry (trivially so for an empty list).
bool ClusterFamily::StartSlotMigrations(const std::vector<ClusterConfig::MigrationInfo>& migrations,
                                        ConnectionContext* cntx) {
  // TODO: add validation and error processing.
  // Iterate by const reference: each MigrationInfo owns a vector and two strings,
  // and AddMigration() already takes its own copies of what it needs.
  for (const auto& m : migrations) {
    auto* node = AddMigration(m.ip, m.port, m.slot_ranges);
    if (!node) {
      return false;
    }
    node->Start(cntx);
  }
  return true;
}

static std::string_view state_to_str(MigrationState state) {
switch (state) {
case MigrationState::C_NO_STATE:
Expand Down Expand Up @@ -757,12 +770,20 @@ ClusterSlotMigration* ClusterFamily::AddMigration(std::string host_ip, uint16_t
.get();
}

void ClusterFamily::RemoveFinishedIncomingMigrations() {
// Discards every migration job, incoming and outgoing, whose state is
// C_FINISHED. Both containers are modified while migration_mu_ is held.
void ClusterFamily::RemoveFinishedMigrations() {
  lock_guard lk(migration_mu_);

  const auto is_finished = [](const auto& job) {
    return job->GetState() == MigrationState::C_FINISHED;
  };

  // Incoming jobs: shift the unfinished ones to the front, then cut off the tail.
  auto& incoming = incoming_migrations_jobs_;
  incoming.erase(std::remove_if(incoming.begin(), incoming.end(), is_finished), incoming.end());

  // Outgoing jobs are keyed entries, so walk them and erase in place; erase()
  // hands back the iterator to continue from.
  auto& outgoing = outgoing_migration_jobs_;
  auto pos = outgoing.begin();
  while (pos != outgoing.end()) {
    if (is_finished(pos->second)) {
      pos = outgoing.erase(pos);
    } else {
      ++pos;
    }
  }
}

void ClusterFamily::MigrationConf(CmdArgList args, ConnectionContext* cntx) {
Expand Down Expand Up @@ -931,8 +952,6 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, ConnectionContext* cntx) {
migration->Cancel(shard->shard_id());
});

outgoing_migration_jobs_.erase(sync_id);

cntx->SendOk();
}

Expand Down
4 changes: 3 additions & 1 deletion src/server/cluster/cluster_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ class ClusterFamily {
// create a ClusterSlotMigration entity which will execute migration
ClusterSlotMigration* AddMigration(std::string host_ip, uint16_t port, SlotRanges slots);

void RemoveFinishedIncomingMigrations();
bool StartSlotMigrations(const std::vector<ClusterConfig::MigrationInfo>& migrations,
ConnectionContext* cntx);
void RemoveFinishedMigrations();

// store info about migration and create unique session id
uint32_t CreateOutgoingMigration(ConnectionContext* cntx, uint16_t port, SlotRanges slots);
Expand Down
31 changes: 27 additions & 4 deletions tests/dragonfly/cluster_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -794,10 +794,27 @@ async def test_cluster_slot_migration(df_local_factory: DflyInstanceFactory):
)
assert "NO_STATE" == status

res = await c_nodes_admin[1].execute_command(
"DFLYCLUSTER", "START-SLOT-MIGRATION", "127.0.0.1", str(nodes[0].admin_port), "5200", "5259"
migation_config = f"""
[
{{
"slot_ranges": [ {{ "start": 0, "end": LAST_SLOT_CUTOFF }} ],
"master": {{ "id": "{node_ids[0]}", "ip": "localhost", "port": {nodes[0].port} }},
"replicas": [],
"migrations": [{{ "slot_ranges": [ {{ "start": 5200, "end": 5259 }} ]
, "ip": "127.0.0.1", "port" : {nodes[0].admin_port}, "target_id": "{node_ids[1]}" }}]
}},
{{
"slot_ranges": [ {{ "start": NEXT_SLOT_CUTOFF, "end": 16383 }} ],
"master": {{ "id": "{node_ids[1]}", "ip": "localhost", "port": {nodes[1].port} }},
"replicas": []
}}
]
"""

await push_config(
migation_config.replace("LAST_SLOT_CUTOFF", "5259").replace("NEXT_SLOT_CUTOFF", "5260"),
c_nodes_admin,
)
assert 1 == res

while (
await c_nodes_admin[1].execute_command(
Expand Down Expand Up @@ -829,10 +846,16 @@ async def test_cluster_slot_migration(df_local_factory: DflyInstanceFactory):
assert e.args[0] == "Can't start the migration, another one is in progress"

await push_config(
config.replace("LAST_SLOT_CUTOFF", "5259").replace("NEXT_SLOT_CUTOFF", "5260"),
config.replace("LAST_SLOT_CUTOFF", "5199").replace("NEXT_SLOT_CUTOFF", "5200"),
c_nodes_admin,
)

status = await c_nodes_admin[0].execute_command("DFLYCLUSTER SLOT-MIGRATION-STATUS")
assert ["out 127.0.0.1:30002 STABLE_SYNC"] == status

status = await c_nodes_admin[1].execute_command("DFLYCLUSTER SLOT-MIGRATION-STATUS")
assert ["in 127.0.0.1:31001 STABLE_SYNC"] == status

await close_clients(*c_nodes, *c_nodes_admin)


Expand Down

0 comments on commit e0c26fa

Please sign in to comment.