Skip to content

Commit

Permalink
chore: ignore applying the same cluster config twice (#3932)
Browse files Browse the repository at this point in the history
  • Loading branch information
BorysTheDev authored Oct 16, 2024
1 parent 98bb5da commit ef814f6
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 23 deletions.
4 changes: 2 additions & 2 deletions src/server/cluster/cluster_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ optional<std::vector<MigrationInfo>> ParseMigrations(const JsonType& json) {
}

optional<ClusterShardInfos> BuildClusterConfigFromJson(const JsonType& json) {
ClusterShardInfos config;
std::vector<ClusterShardInfo> config;

if (!json.is_array()) {
LOG(WARNING) << kInvalidConfigPrefix << "not an array " << json;
Expand Down Expand Up @@ -271,7 +271,7 @@ optional<ClusterShardInfos> BuildClusterConfigFromJson(const JsonType& json) {
config.push_back(std::move(shard));
}

return config;
return ClusterShardInfos(config);
}
} // namespace

Expand Down
136 changes: 116 additions & 20 deletions src/server/cluster/cluster_config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,15 @@ TEST_F(ClusterConfigTest, ConfigSetInvalidMissingSlots) {

TEST_F(ClusterConfigTest, ConfigSetInvalidDoubleBookedSlot) {
EXPECT_EQ(ClusterConfig::CreateFromConfig(
kMyId, {{.slot_ranges = SlotRanges({{.start = 0, .end = 0x3FFF}}),
.master = {.id = "other", .ip = "192.168.0.100", .port = 7000},
.replicas = {},
.migrations = {}},
{.slot_ranges = SlotRanges({{.start = 0, .end = 0}}),
.master = {.id = "other2", .ip = "192.168.0.101", .port = 7001},
.replicas = {},
.migrations = {}}}),
kMyId,
ClusterShardInfos({{.slot_ranges = SlotRanges({{.start = 0, .end = 0x3FFF}}),
.master = {.id = "other", .ip = "192.168.0.100", .port = 7000},
.replicas = {},
.migrations = {}},
{.slot_ranges = SlotRanges({{.start = 0, .end = 0}}),
.master = {.id = "other2", .ip = "192.168.0.101", .port = 7001},
.replicas = {},
.migrations = {}}})),
nullptr);
}

Expand Down Expand Up @@ -150,18 +151,19 @@ TEST_F(ClusterConfigTest, ConfigSetOkWithReplica) {

TEST_F(ClusterConfigTest, ConfigSetMultipleInstances) {
auto config = ClusterConfig::CreateFromConfig(
kMyId, {{.slot_ranges = SlotRanges({{.start = 0, .end = 5'000}}),
.master = {.id = "other-master", .ip = "192.168.0.100", .port = 7000},
.replicas = {{.id = "other-replica", .ip = "192.168.0.101", .port = 7001}},
.migrations = {}},
{.slot_ranges = SlotRanges({{.start = 5'001, .end = 10'000}}),
.master = {.id = kMyId, .ip = "192.168.0.102", .port = 7002},
.replicas = {{.id = "other-replica2", .ip = "192.168.0.103", .port = 7003}},
.migrations = {}},
{.slot_ranges = SlotRanges({{.start = 10'001, .end = 0x3FFF}}),
.master = {.id = "other-master3", .ip = "192.168.0.104", .port = 7004},
.replicas = {{.id = "other-replica3", .ip = "192.168.0.105", .port = 7005}},
.migrations = {}}});
kMyId, ClusterShardInfos(
{{.slot_ranges = SlotRanges({{.start = 0, .end = 5'000}}),
.master = {.id = "other-master", .ip = "192.168.0.100", .port = 7000},
.replicas = {{.id = "other-replica", .ip = "192.168.0.101", .port = 7001}},
.migrations = {}},
{.slot_ranges = SlotRanges({{.start = 5'001, .end = 10'000}}),
.master = {.id = kMyId, .ip = "192.168.0.102", .port = 7002},
.replicas = {{.id = "other-replica2", .ip = "192.168.0.103", .port = 7003}},
.migrations = {}},
{.slot_ranges = SlotRanges({{.start = 10'001, .end = 0x3FFF}}),
.master = {.id = "other-master3", .ip = "192.168.0.104", .port = 7004},
.replicas = {{.id = "other-replica3", .ip = "192.168.0.105", .port = 7005}},
.migrations = {}}}));
EXPECT_NE(config, nullptr);
SlotSet owned_slots = config->GetOwnedSlots();
EXPECT_EQ(owned_slots.ToSlotRanges().Size(), 1);
Expand Down Expand Up @@ -609,4 +611,98 @@ TEST_F(ClusterConfigTest, SlotSetAPI) {
}
}

TEST_F(ClusterConfigTest, ConfigComparison) {
auto config1 = ClusterConfig::CreateFromConfig("id0", R"json(
[
{
"slot_ranges": [ { "start": 0, "end": 8000 } ],
"master": { "id": "id0", "ip": "localhost", "port": 3000 },
"replicas": [],
"migrations": [{ "slot_ranges": [ { "start": 7000, "end": 8000 } ]
, "ip": "127.0.0.1", "port" : 9001, "node_id": "id1" }]
},
{
"slot_ranges": [ { "start": 8001, "end": 16383 } ],
"master": { "id": "id1", "ip": "localhost", "port": 3001 },
"replicas": []
}
])json");

EXPECT_EQ(config1->GetConfig(), config1->GetConfig());

auto config2 = ClusterConfig::CreateFromConfig("id0", R"json(
[
{
"slot_ranges": [ { "start": 0, "end": 16383 } ],
"master": { "id": "id0", "ip": "localhost", "port": 3000 },
"replicas": [],
"migrations": [{ "slot_ranges": [ { "start": 7000, "end": 8000 } ]
, "ip": "127.0.0.1", "port" : 9001, "node_id": "id1" }]
}
])json");
EXPECT_NE(config1->GetConfig(), config2->GetConfig());
EXPECT_EQ(config2->GetConfig(), config2->GetConfig());

auto config3 = ClusterConfig::CreateFromConfig("id0", R"json(
[
{
"slot_ranges": [ { "start": 0, "end": 8000 } ],
"master": { "id": "id0", "ip": "localhost", "port": 3000 },
"replicas": [],
"migrations": [{ "slot_ranges": [ { "start": 7000, "end": 8000 } ]
, "ip": "127.0.0.1", "port" : 9002, "node_id": "id1" }]
},
{
"slot_ranges": [ { "start": 8001, "end": 16383 } ],
"master": { "id": "id1", "ip": "localhost", "port": 3001 },
"replicas": []
}
])json");
EXPECT_NE(config1->GetConfig(), config3->GetConfig());
EXPECT_NE(config2->GetConfig(), config3->GetConfig());
EXPECT_EQ(config3->GetConfig(), config3->GetConfig());

auto config4 = ClusterConfig::CreateFromConfig("id0", R"json(
[
{
"slot_ranges": [ { "start": 0, "end": 8000 } ],
"master": { "id": "id0", "ip": "localhost", "port": 3000 },
"replicas": [],
"migrations": [{ "slot_ranges": [ { "start": 7000, "end": 8000 } ]
, "ip": "127.0.0.1", "port" : 9001, "node_id": "id2" }]
},
{
"slot_ranges": [ { "start": 8001, "end": 16383 } ],
"master": { "id": "id1", "ip": "localhost", "port": 3001 },
"replicas": []
}
])json");

EXPECT_NE(config1->GetConfig(), config4->GetConfig());
EXPECT_NE(config2->GetConfig(), config4->GetConfig());
EXPECT_NE(config3->GetConfig(), config4->GetConfig());
EXPECT_EQ(config4->GetConfig(), config4->GetConfig());

auto config5 = ClusterConfig::CreateFromConfig("id0", R"json(
[
{
"slot_ranges": [ { "start": 0, "end": 8000 } ],
"master": { "id": "id2", "ip": "localhost", "port": 3000 },
"replicas": [],
"migrations": [{ "slot_ranges": [ { "start": 7000, "end": 8000 } ]
, "ip": "127.0.0.1", "port" : 9001, "node_id": "id1" }]
},
{
"slot_ranges": [ { "start": 8001, "end": 16383 } ],
"master": { "id": "id1", "ip": "localhost", "port": 3001 },
"replicas": []
}
])json");
EXPECT_NE(config1->GetConfig(), config5->GetConfig());
EXPECT_NE(config2->GetConfig(), config5->GetConfig());
EXPECT_NE(config3->GetConfig(), config5->GetConfig());
EXPECT_NE(config4->GetConfig(), config5->GetConfig());
EXPECT_EQ(config5->GetConfig(), config5->GetConfig());
}

} // namespace dfly::cluster
20 changes: 20 additions & 0 deletions src/server/cluster/cluster_defs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,26 @@ std::string MigrationInfo::ToString() const {
slot_ranges.ToString(), ")");
}

bool ClusterShardInfo::operator==(const ClusterShardInfo& r) const {
if (slot_ranges == r.slot_ranges && master == r.master) {
auto lreplicas = replicas;
auto lmigrations = migrations;
auto rreplicas = r.replicas;
auto rmigrations = r.migrations;
std::sort(lreplicas.begin(), lreplicas.end());
std::sort(lmigrations.begin(), lmigrations.end());
std::sort(rreplicas.begin(), rreplicas.end());
std::sort(rmigrations.begin(), rmigrations.end());
return lreplicas == rreplicas && lmigrations == rmigrations;
}
return false;
}

ClusterShardInfos::ClusterShardInfos(std::vector<ClusterShardInfo> infos)
: infos_(std::move(infos)) {
std::sort(infos_.begin(), infos_.end());
}

namespace {
enum class ClusterMode {
kUninitialized,
Expand Down
49 changes: 48 additions & 1 deletion src/server/cluster/cluster_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ struct ClusterNodeInfo {
bool operator==(const ClusterNodeInfo& r) const noexcept {
return port == r.port && ip == r.ip && id == r.id;
}

bool operator<(const ClusterNodeInfo& r) const noexcept {
return id < r.id;
}
};

struct MigrationInfo {
Expand All @@ -100,6 +104,10 @@ struct MigrationInfo {
return node_info == r.node_info && slot_ranges == r.slot_ranges;
}

bool operator<(const MigrationInfo& r) const noexcept {
return node_info < r.node_info;
}

std::string ToString() const;
};

Expand All @@ -108,9 +116,48 @@ struct ClusterShardInfo {
ClusterNodeInfo master;
std::vector<ClusterNodeInfo> replicas;
std::vector<MigrationInfo> migrations;

bool operator==(const ClusterShardInfo& r) const;

bool operator<(const ClusterShardInfo& r) const noexcept {
return master < r.master;
}
};

using ClusterShardInfos = std::vector<ClusterShardInfo>;
class ClusterShardInfos {
public:
ClusterShardInfos() = default;
ClusterShardInfos(std::vector<ClusterShardInfo> infos);
ClusterShardInfos(ClusterShardInfo info) : infos_({info}) {
}

auto begin() const noexcept {
return infos_.cbegin();
}

auto end() const noexcept {
return infos_.cend();
}

auto size() const noexcept {
return infos_.size();
}

bool empty() const noexcept {
return infos_.empty();
}

bool operator==(const ClusterShardInfos& r) const noexcept {
return infos_ == r.infos_;
}

bool operator!=(const ClusterShardInfos& r) const noexcept {
return infos_ != r.infos_;
}

private:
std::vector<ClusterShardInfo> infos_;
};

// MigrationState constants are ordered in state changing order
enum class MigrationState : uint8_t {
Expand Down
2 changes: 2 additions & 0 deletions src/server/cluster/cluster_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,8 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx)
if (new_config == nullptr) {
LOG(WARNING) << "Can't set cluster config";
return cntx->SendError("Invalid cluster configuration.");
} else if (tl_cluster_config && tl_cluster_config->GetConfig() == new_config->GetConfig()) {
return cntx->SendOk();
}

PreparedToRemoveOutgoingMigrations outgoing_migrations; // should be removed without mutex lock
Expand Down

0 comments on commit ef814f6

Please sign in to comment.