Skip to content

Commit

Permalink
fix: add metric is_eligible_for_master_election to support reelection…
Browse files Browse the repository at this point in the history
… decision in codis-sentinel (OpenAtomFoundation#2766)

* add replication

* add metric 'is_eligible_for_master_election' to indicate whether the instance has corrupted full sync, which can be used in codis-pika cluster reelection scenario

* add IsEligibleForMasterElection

* IsEligibleForMasterElection slaveof force
---------

Co-authored-by: liuyuecai <liuyuecai@360.cn>
Co-authored-by: cjh <1271435567@qq.com>
  • Loading branch information
3 people authored Jul 12, 2024
1 parent 807cff1 commit 057812f
Show file tree
Hide file tree
Showing 10 changed files with 100 additions and 13 deletions.
7 changes: 4 additions & 3 deletions codis/pkg/models/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (g *Group) SelectNewMaster() (string, int) {
var newMasterIndex = -1

for index, server := range g.Servers {
if index == 0 || server.State != GroupServerStateNormal {
if index == 0 || server.State != GroupServerStateNormal || !server.IsEligibleForMasterElection {
continue
}

Expand Down Expand Up @@ -84,8 +84,9 @@ type GroupServer struct {
// master or slave
Role GroupServerRole `json:"role"`
// If it is a master node, take the master_repl_offset field, otherwise take the slave_repl_offset field
DbBinlogFileNum uint64 `json:"binlog_file_num"` // db0
DbBinlogOffset uint64 `json:"binlog_offset"` // db0
DbBinlogFileNum uint64 `json:"binlog_file_num"` // db0
DbBinlogOffset uint64 `json:"binlog_offset"` // db0
IsEligibleForMasterElection bool `json:"is_eligible_for_master_election"`

// Monitoring status, 0 normal, 1 subjective offline, 2 actual offline
// If marked as 2 , no service is provided
Expand Down
11 changes: 10 additions & 1 deletion codis/pkg/topom/topom_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ func (s *Topom) tryFixReplicationRelationship(group *models.Group, groupServer *
groupServer.Role = models.GroupServerRole(state.Replication.Role)
groupServer.DbBinlogFileNum = state.Replication.DbBinlogFileNum
groupServer.DbBinlogOffset = state.Replication.DbBinlogOffset
groupServer.IsEligibleForMasterElection = state.Replication.IsEligibleForMasterElection
groupServer.Action.State = models.ActionSynced
err = s.storeUpdateGroup(group)
// clean cache whether err is nil or not
Expand Down Expand Up @@ -531,7 +532,12 @@ func (s *Topom) doSwitchGroupMaster(g *models.Group, newMasterAddr string, newMa
continue
}

err = updateMasterToNewOne(server.Addr, newMasterAddr, s.config.ProductAuth)
if server.IsEligibleForMasterElection {
err = updateMasterToNewOne(server.Addr, newMasterAddr, s.config.ProductAuth)
} else {
err = updateMasterToNewOneForcefully(server.Addr, newMasterAddr, s.config.ProductAuth)
}

if err != nil {
// skip err, and retry to update master-slave replication relationship through next heartbeat check
err = nil
Expand All @@ -548,14 +554,17 @@ func (s *Topom) doSwitchGroupMaster(g *models.Group, newMasterAddr string, newMa
}

func updateMasterToNewOne(serverAddr, masterAddr string, auth string) (err error) {
log.Infof("[%s] switch master to server [%s]", serverAddr, masterAddr)
return setNewRedisMaster(serverAddr, masterAddr, auth, false)
}

func promoteServerToNewMaster(serverAddr, auth string) (err error) {
log.Infof("[%s] switch master to NO:ONE", serverAddr)
return setNewRedisMaster(serverAddr, "NO:ONE", auth, false)
}

func updateMasterToNewOneForcefully(serverAddr, masterAddr string, auth string) (err error) {
log.Infof("[%s] switch master to server [%s] forcefully", serverAddr, masterAddr)
return setNewRedisMaster(serverAddr, masterAddr, auth, true)
}

Expand Down
3 changes: 2 additions & 1 deletion codis/pkg/topom/topom_sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (s *Topom) CheckStateAndSwitchSlavesAndMasters(filter func(index int, g *mo

if len(recoveredGroupServersState) > 0 {
// offline GroupServer's service has recovered, check and fix it's master-slave replication relationship
s.tryFixReplicationRelationships(ctx, recoveredGroupServersState,len(masterOfflineGroups))
s.tryFixReplicationRelationships(ctx, recoveredGroupServersState, len(masterOfflineGroups))
}

return nil
Expand Down Expand Up @@ -92,6 +92,7 @@ func (s *Topom) checkAndUpdateGroupServerState(conf *Config, group *models.Group
groupServer.Role = models.GroupServerRole(state.Replication.Role)
groupServer.DbBinlogFileNum = state.Replication.DbBinlogFileNum
groupServer.DbBinlogOffset = state.Replication.DbBinlogOffset
groupServer.IsEligibleForMasterElection = state.Replication.IsEligibleForMasterElection
groupServer.Action.State = models.ActionSynced
}
}
Expand Down
18 changes: 10 additions & 8 deletions codis/pkg/utils/redis/sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,15 @@ func (i *InfoSlave) UnmarshalJSON(b []byte) error {
}

type InfoReplication struct {
Role string `json:"role"`
ConnectedSlaves int `json:"connected_slaves"`
MasterHost string `json:"master_host"`
MasterPort string `json:"master_port"`
MasterLinkStatus string `json:"master_link_status"` // down; up
DbBinlogFileNum uint64 `json:"binlog_file_num"` // db0
DbBinlogOffset uint64 `json:"binlog_offset"` // db0
Slaves []InfoSlave `json:"-"`
Role string `json:"role"`
ConnectedSlaves int `json:"connected_slaves"`
MasterHost string `json:"master_host"`
MasterPort string `json:"master_port"`
MasterLinkStatus string `json:"master_link_status"` // down; up
DbBinlogFileNum uint64 `json:"binlog_file_num"` // db0
DbBinlogOffset uint64 `json:"binlog_offset"` // db0
IsEligibleForMasterElection bool `json:"is_eligible_for_master_election"`
Slaves []InfoSlave `json:"-"`
}

type ReplicationState struct {
Expand Down Expand Up @@ -108,6 +109,7 @@ func (i *InfoReplication) UnmarshalJSON(b []byte) error {
i.MasterPort = kvmap["master_host"]
i.MasterHost = kvmap["master_port"]
i.MasterLinkStatus = kvmap["master_link_status"]
i.IsEligibleForMasterElection = kvmap["is_eligible_for_master_election"] == "true"

if val, ok := kvmap["binlog_file_num"]; ok {
if intval, err := strconv.ParseUint(val, 10, 64); err == nil {
Expand Down
7 changes: 7 additions & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -630,3 +630,10 @@ cache-lfu-decay-time: 1
#
# Example:
# rename-command : FLUSHDB 360flushdb

# [You can ignore this item]
# This is NOT a regular conf item, it is a internal used metric that relies on pika.conf for persistent storage.
# 'internal-used-unfinished-full-sync' is used to generate a metric 'is_eligible_for_master_election'
# which serves for the scenario of codis-pika cluster reelection
# You'd better [DO NOT MODIFY IT UNLESS YOU KNOW WHAT YOU ARE DOING]
internal-used-unfinished-full-sync :
39 changes: 39 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ class PikaConf : public pstd::BaseConf {
int64_t rsync_timeout_ms() {
return rsync_timeout_ms_.load(std::memory_order::memory_order_relaxed);
}

// Slow Commands configuration
const std::string GetSlowCmd() {
std::shared_lock l(rwlock_);
Expand Down Expand Up @@ -825,6 +826,7 @@ class PikaConf : public pstd::BaseConf {
}

int64_t cache_maxmemory() { return cache_maxmemory_; }

void SetSlowCmd(const std::string& value) {
std::lock_guard l(rwlock_);
std::string lower_value = value;
Expand All @@ -841,6 +843,40 @@ class PikaConf : public pstd::BaseConf {
pstd::StringSplit2Set(lower_value, ',', admin_cmd_set_);
}

void SetInternalUsedUnFinishedFullSync(const std::string& value) {
std::lock_guard l(rwlock_);
std::string lower_value = value;
pstd::StringToLower(lower_value);
TryPushDiffCommands("internal-used-unfinished-full-sync", lower_value);
pstd::StringSplit2Set(lower_value, ',', internal_used_unfinished_full_sync_);
}

void AddInternalUsedUnfinishedFullSync(const std::string& db_name) {
{
std::lock_guard l(rwlock_);
internal_used_unfinished_full_sync_.insert(db_name);
std::string lower_value = pstd::Set2String(internal_used_unfinished_full_sync_, ',');
pstd::StringToLower(lower_value);
TryPushDiffCommands("internal-used-unfinished-full-sync", lower_value);
}
ConfigRewrite();
}

void RemoveInternalUsedUnfinishedFullSync(const std::string& db_name) {
{
std::lock_guard l(rwlock_);
internal_used_unfinished_full_sync_.erase(db_name);
std::string lower_value = pstd::Set2String(internal_used_unfinished_full_sync_, ',');
pstd::StringToLower(lower_value);
TryPushDiffCommands("internal-used-unfinished-full-sync", lower_value);
}
ConfigRewrite();
}

size_t GetUnfinishedFullSyncCount() {
std::shared_lock l(rwlock_);
return internal_used_unfinished_full_sync_.size();
}
void SetCacheType(const std::string &value);
void SetCacheDisableFlag() { tmp_cache_disable_flag_ = true; }
int zset_cache_start_direction() { return zset_cache_start_direction_; }
Expand Down Expand Up @@ -1014,6 +1050,9 @@ class PikaConf : public pstd::BaseConf {
int throttle_bytes_per_second_ = 200 << 20; // 200MB/s
int max_rsync_parallel_num_ = kMaxRsyncParallelNum;
std::atomic_int64_t rsync_timeout_ms_ = 1000;

//Internal used metrics Persisted by pika.conf
std::unordered_set<std::string> internal_used_unfinished_full_sync_;
};

#endif
10 changes: 10 additions & 0 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1058,6 +1058,7 @@ void InfoCmd::InfoReplication(std::string& info) {
std::stringstream tmp_stream;
std::stringstream out_of_sync;
std::stringstream repl_connect_status;
int32_t syncing_full_count = 0;
bool all_db_sync = true;
std::shared_lock db_rwl(g_pika_server->dbs_rw_);
for (const auto& db_item : g_pika_server->GetDB()) {
Expand All @@ -1077,6 +1078,7 @@ void InfoCmd::InfoReplication(std::string& info) {
} else if (slave_db->State() == ReplState::kWaitDBSync) {
out_of_sync << "WaitDBSync)";
repl_connect_status << "syncing_full";
++syncing_full_count;
} else if (slave_db->State() == ReplState::kError) {
out_of_sync << "Error)";
repl_connect_status << "error";
Expand Down Expand Up @@ -1151,6 +1153,13 @@ void InfoCmd::InfoReplication(std::string& info) {
<< slaves_list_str;
}

//if current instance is syncing full or has full sync corrupted, it's not qualified to be a new master
if (syncing_full_count == 0 && g_pika_conf->GetUnfinishedFullSyncCount() == 0) {
tmp_stream << "is_eligible_for_master_election:true" << "\r\n";
} else {
tmp_stream << "is_eligible_for_master_election:false" << "\r\n";
}

Status s;
uint32_t filenum = 0;
uint64_t offset = 0;
Expand Down Expand Up @@ -3321,6 +3330,7 @@ void ClearReplicationIDCmd::DoInitial() {

void ClearReplicationIDCmd::Do() {
g_pika_conf->SetReplicationID("");
g_pika_conf->SetInternalUsedUnFinishedFullSync("");
g_pika_conf->ConfigRewriteReplicationID();
res_.SetRes(CmdRes::kOk, "ReplicationID is cleared");
}
Expand Down
11 changes: 11 additions & 0 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,14 @@ int PikaConf::Load() {
SetAdminCmd(admin_cmd_list);
}

std::string unfinished_full_sync;
GetConfStr("internal-used-unfinished-full-sync", &unfinished_full_sync);
if (replication_id_.empty()) {
unfinished_full_sync.clear();
}
SetInternalUsedUnFinishedFullSync(unfinished_full_sync);


GetConfInt("sync-thread-num", &sync_thread_num_);
if (sync_thread_num_ <= 0) {
sync_thread_num_ = 3;
Expand Down Expand Up @@ -701,6 +709,7 @@ int PikaConf::Load() {
} else {
rsync_timeout_ms_.store(tmp_rsync_timeout_ms);
}

return ret;
}

Expand Down Expand Up @@ -774,6 +783,7 @@ int PikaConf::ConfigRewrite() {
SetConfDouble("min-check-resume-ratio", min_check_resume_ratio_);
SetConfInt("slave-priority", slave_priority_);
SetConfInt("throttle-bytes-per-second", throttle_bytes_per_second_);
SetConfStr("internal-used-unfinished-full-sync", pstd::Set2String(internal_used_unfinished_full_sync_, ','));
SetConfInt("max-rsync-parallel-num", max_rsync_parallel_num_);
SetConfInt("sync-window-size", sync_window_size_.load());
SetConfInt("consensus-level", consensus_level_.load());
Expand Down Expand Up @@ -828,6 +838,7 @@ int PikaConf::ConfigRewrite() {
int PikaConf::ConfigRewriteReplicationID() {
std::lock_guard l(rwlock_);
SetConfStr("replication-id", replication_id_);
SetConfStr("internal-used-unfinished-full-sync", pstd::Set2String(internal_used_unfinished_full_sync_, ','));
if (!diff_commands_.empty()) {
std::vector<pstd::BaseConf::Rep::ConfItem> filtered_items;
for (const auto& diff_command : diff_commands_) {
Expand Down
4 changes: 4 additions & 0 deletions src/pika_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,10 @@ bool DB::TryUpdateMasterOffset() {
}
master_db->Logger()->SetProducerStatus(filenum, offset);
slave_db->SetReplState(ReplState::kTryConnect);

//now full sync is finished, remove unfinished full sync count
g_pika_conf->RemoveInternalUsedUnfinishedFullSync(slave_db->DBName());

return true;
}

Expand Down
3 changes: 3 additions & 0 deletions src/pika_repl_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ void PikaReplClientConn::HandleDBSyncResponse(void* arg) {
slave_db->StopRsync();
slave_db->SetReplState(ReplState::kWaitDBSync);
LOG(INFO) << "DB: " << db_name << " Need Wait To Sync";

//now full sync is starting, add an unfinished full sync count
g_pika_conf->AddInternalUsedUnfinishedFullSync(slave_db->DBName());
}

void PikaReplClientConn::HandleTrySyncResponse(void* arg) {
Expand Down

0 comments on commit 057812f

Please sign in to comment.