Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix:delete_consensus_level #1852

Merged
merged 2 commits into from
Aug 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions include/pika_consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ class SyncProgress {
int SlaveSize();

private:
LogOffset InternalCalCommittedIndex(const std::unordered_map<std::string, LogOffset>& match_index);

std::shared_mutex rwlock_;
std::unordered_map<std::string, std::shared_ptr<SlaveNode>> slaves_;
Expand Down Expand Up @@ -128,7 +127,6 @@ class ConsensusCoordinator {

// invoked by follower
pstd::Status ProcessLeaderLog(const std::shared_ptr<Cmd>& cmd_ptr, const BinlogItem& attribute);
pstd::Status ProcessLocalUpdate(const LogOffset& leader_commit);

// Negotiate
pstd::Status LeaderNegotiate(const LogOffset& f_last_offset, bool* reject, std::vector<LogOffset>* hints);
Expand Down Expand Up @@ -191,7 +189,6 @@ class ConsensusCoordinator {
pstd::Status InternalAppendBinlog(const BinlogItem& item, const std::shared_ptr<Cmd>& cmd_ptr, LogOffset* log_offset);
void InternalApply(const MemLog::LogItem& log);
void InternalApplyFollower(const MemLog::LogItem& log);
bool InternalUpdateCommittedIndex(const LogOffset& slave_committed_index, LogOffset* updated_committed_index);

pstd::Status GetBinlogOffset(const BinlogOffset& start_offset, LogOffset* log_offset);
pstd::Status GetBinlogOffset(const BinlogOffset& start_offset, const BinlogOffset& end_offset,
Expand Down
1 change: 0 additions & 1 deletion include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ class SyncMasterSlot : public SyncSlot {
pstd::Status ConsensusProposeLog(const std::shared_ptr<Cmd>& cmd_ptr);
pstd::Status ConsensusSanityCheck();
pstd::Status ConsensusProcessLeaderLog(const std::shared_ptr<Cmd>& cmd_ptr, const BinlogItem& attribute);
pstd::Status ConsensusProcessLocalUpdate(const LogOffset& leader_commit);
LogOffset ConsensusCommittedIndex();
LogOffset ConsensusLastIndex();
uint32_t ConsensusTerm();
Expand Down
2 changes: 0 additions & 2 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ class PikaServer : public pstd::noncopyable {
bool leader_protected_mode();
void CheckLeaderProtectedMode();
bool readonly(const std::string& table, const std::string& key);
bool ConsensusCheck(const std::string& db_name, const std::string& key);
int repl_state();
std::string repl_state_str();
bool force_full_sync();
Expand All @@ -179,7 +178,6 @@ class PikaServer : public pstd::noncopyable {
bool IsCompacting();
bool IsDBExist(const std::string& db_name);
bool IsDBSlotExist(const std::string& db_name, uint32_t slot_id);
bool IsCommandSupport(const std::string& command);
bool IsDBBinlogIoError(const std::string& db_name);
pstd::Status DoSameThingSpecificDB(const TaskType& type, const std::set<std::string>& dbs = {});

Expand Down
13 changes: 0 additions & 13 deletions src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,6 @@ std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st
return c_ptr;
}
}
if (g_pika_conf->consensus_level() != 0 && c_ptr->is_write()) {
c_ptr->SetStage(Cmd::kBinlogStage);
}
if (!g_pika_server->IsCommandSupport(opt)) {
c_ptr->res().SetRes(CmdRes::kErrOther, "This command is not supported in current configuration");
return c_ptr;
}

// reject all the request before new master sync finished
if (g_pika_server->leader_protected_mode()) {
Expand All @@ -110,9 +103,6 @@ std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st
c_ptr->res().SetRes(CmdRes::kErrOther, "Server in read-only");
return c_ptr;
}
if (!g_pika_server->ConsensusCheck(current_db_, cur_key.front())) {
c_ptr->res().SetRes(CmdRes::kErrOther, "Consensus level not match");
}
}

// Process Command
Expand All @@ -125,9 +115,6 @@ std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st
if (g_pika_conf->slowlog_slower_than() >= 0) {
ProcessSlowlog(argv, start_us, c_ptr->GetDoDuration());
}
if (g_pika_conf->consensus_level() != 0 && c_ptr->is_write()) {
c_ptr->SetStage(Cmd::kExecuteStage);
}

return c_ptr;
}
Expand Down
69 changes: 1 addition & 68 deletions src/pika_consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,6 @@ Status SyncProgress::Update(const std::string& ip, int port, const LogOffset& st
match_index_[ip + std::to_string(port)] = acked_offset;
}

// if consensus_level == 0 LogOffset() return
*committed_index = InternalCalCommittedIndex(GetAllMatchIndex());
return Status::OK();
}

Expand All @@ -168,24 +166,6 @@ int SyncProgress::SlaveSize() {
return static_cast<int32_t>(slaves_.size());
}

LogOffset SyncProgress::InternalCalCommittedIndex(const std::unordered_map<std::string, LogOffset>& match_index) {
int consensus_level = g_pika_conf->consensus_level();
if (consensus_level == 0) {
return {};
}
if (static_cast<int>(match_index.size()) < consensus_level) {
return {};
}
std::vector<LogOffset> offsets;
offsets.reserve(match_index.size());
for (const auto& index : match_index) {
offsets.push_back(index.second);
}
std::sort(offsets.begin(), offsets.end());
LogOffset offset = offsets[offsets.size() - consensus_level];
return offset;
}

/* MemLog */

MemLog::MemLog() = default;
Expand Down Expand Up @@ -432,15 +412,7 @@ Status ConsensusCoordinator::ProcessLeaderLog(const std::shared_ptr<Cmd>& cmd_pt
Status s = InternalAppendLog(attribute, cmd_ptr, nullptr, nullptr);
stable_logger_->Logger()->Unlock();

if (g_pika_conf->consensus_level() == 0) {
InternalApplyFollower(MemLog::LogItem(LogOffset(), cmd_ptr, nullptr, nullptr));
return Status::OK();
}

return Status::OK();
}

Status ConsensusCoordinator::ProcessLocalUpdate(const LogOffset& leader_commit) {
InternalApplyFollower(MemLog::LogItem(LogOffset(), cmd_ptr, nullptr, nullptr));
return Status::OK();
}

Expand All @@ -452,48 +424,9 @@ Status ConsensusCoordinator::UpdateSlave(const std::string& ip, int port, const
return s;
}

if (g_pika_conf->consensus_level() == 0) {
return Status::OK();
}

// do not commit log which is not current term log
if (committed_index.l_offset.term != term()) {
LOG_EVERY_N(INFO, 1000) << "Will not commit log term which is not equals to current term" // NOLINT
<< " To updated committed_index" << committed_index.ToString() << " current term " << term()
<< " from " << ip << " " << port << " start " << start.ToString() << " end "
<< end.ToString();
return Status::OK();
}

LogOffset updated_committed_index;
bool need_update = false;
{
std::lock_guard l(index_mu_);
need_update = InternalUpdateCommittedIndex(committed_index, &updated_committed_index);
}
if (need_update) {
s = ScheduleApplyLog(updated_committed_index);
// updateslave could be invoked by many thread
// not found means a late offset pass in ScheduleApplyLog
// an early offset is not found
if (!s.ok() && !s.IsNotFound()) {
return s;
}
}

return Status::OK();
}

bool ConsensusCoordinator::InternalUpdateCommittedIndex(const LogOffset& slave_committed_index,
LogOffset* updated_committed_index) {
if (slave_committed_index <= committed_index_) {
return false;
}
committed_index_ = slave_committed_index;
*updated_committed_index = slave_committed_index;
return true;
}

Status ConsensusCoordinator::InternalAppendBinlog(const BinlogItem& item, const std::shared_ptr<Cmd>& cmd_ptr,
LogOffset* log_offset) {
std::string binlog =
Expand Down
2 changes: 1 addition & 1 deletion src/pika_repl_bgworker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) {
LogOffset leader_commit;
ParseBinlogOffset(res->consensus_meta().commit(), &leader_commit);
// Update follower commit && apply
slot->ConsensusProcessLocalUpdate(leader_commit);
return;
}

LogOffset ack_end;
Expand Down
5 changes: 0 additions & 5 deletions src/pika_rm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -486,16 +486,11 @@ Status SyncMasterSlot::ConsensusProposeLog(const std::shared_ptr<Cmd>& cmd_ptr)
return coordinator_.ProposeLog(cmd_ptr);
}

Status SyncMasterSlot::ConsensusSanityCheck() { return coordinator_.CheckEnoughFollower(); }

Status SyncMasterSlot::ConsensusProcessLeaderLog(const std::shared_ptr<Cmd>& cmd_ptr, const BinlogItem& attribute) {
return coordinator_.ProcessLeaderLog(cmd_ptr, attribute);
}

Status SyncMasterSlot::ConsensusProcessLocalUpdate(const LogOffset& leader_commit) {
return coordinator_.ProcessLocalUpdate(leader_commit);
}

LogOffset SyncMasterSlot::ConsensusCommittedIndex() { return coordinator_.committed_index(); }

LogOffset SyncMasterSlot::ConsensusLastIndex() { return coordinator_.MemLogger()->last_offset(); }
Expand Down
8 changes: 0 additions & 8 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -269,10 +269,6 @@ bool PikaServer::readonly(const std::string& db_name, const std::string& key) {
return ((role_ & PIKA_ROLE_SLAVE) != 0) && g_pika_conf->slave_read_only();
}

bool PikaServer::ConsensusCheck(const std::string& db_name, const std::string& key) {
return true;
}

int PikaServer::repl_state() {
std::shared_lock l(state_protector_);
return repl_state_;
Expand Down Expand Up @@ -430,10 +426,6 @@ bool PikaServer::IsDBSlotExist(const std::string& db_name, uint32_t slot_id) {
}
}

bool PikaServer::IsCommandSupport(const std::string& command) {
return true;
}

bool PikaServer::IsDBBinlogIoError(const std::string& db_name) {
std::shared_ptr<DB> db = GetDB(db_name);
return db ? db->IsBinlogIoError() : true;
Expand Down