Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring: check function return Status marked as [[nodiscard]] #1214

Merged
merged 2 commits into from
Jan 2, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -177,8 +177,7 @@ add_library(kvrocks_objs OBJECT ${KVROCKS_SRCS})

target_include_directories(kvrocks_objs PUBLIC src src/common ${PROJECT_BINARY_DIR})
target_compile_features(kvrocks_objs PUBLIC cxx_std_17)
# TODO: Add -Werror=unused-result to compile options
target_compile_options(kvrocks_objs PUBLIC -Wall -Wpedantic -Wsign-compare -Wreturn-type -fno-omit-frame-pointer)
target_compile_options(kvrocks_objs PUBLIC -Wall -Wpedantic -Wsign-compare -Wreturn-type -fno-omit-frame-pointer -Werror=unused-result)
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
target_compile_options(kvrocks_objs PUBLIC -Wno-pedantic)
elseif((CMAKE_CXX_COMPILER_ID STREQUAL "Clang") OR (CMAKE_CXX_COMPILER_ID STREQUAL "AppleClang"))
34 changes: 22 additions & 12 deletions src/cluster/cluster.cc
Original file line number Diff line number Diff line change
@@ -90,7 +90,7 @@ Status Cluster::SetNodeId(const std::string &node_id) {
}

// Set replication relationship
if (myself_ != nullptr) SetMasterSlaveRepl();
if (myself_) return SetMasterSlaveRepl();

return Status::OK();
}
@@ -217,7 +217,12 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b
}

// Set replication relationship
if (myself_ != nullptr) SetMasterSlaveRepl();
if (myself_) {
s = SetMasterSlaveRepl();
if (!s.IsOK()) {
return s.Prefixed("failed to set master-replica replication");
}
}

// Clear data of migrated slots
if (!migrated_slots_.empty()) {
@@ -235,26 +240,31 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b
}

// Set replication relationship by cluster topology setting
void Cluster::SetMasterSlaveRepl() {
if (!svr_) return;
Status Cluster::SetMasterSlaveRepl() {
if (!svr_) return Status::OK();

if (myself_ == nullptr) return;
if (!myself_) return Status::OK();

if (myself_->role_ == kClusterMaster) {
// Master mode
svr_->RemoveMaster();
auto s = svr_->RemoveMaster();
if (!s.IsOK()) {
return s.Prefixed("failed to remove master");
}
LOG(INFO) << "MASTER MODE enabled by cluster topology setting";
} else if (nodes_.find(myself_->master_id_) != nodes_.end()) {
// Slave mode and master node is existing
// Replica mode and master node is existing
std::shared_ptr<ClusterNode> master = nodes_[myself_->master_id_];
Status s = svr_->AddMaster(master->host_, master->port_, false);
if (s.IsOK()) {
LOG(INFO) << "SLAVE OF " << master->host_ << ":" << master->port_ << " enabled by cluster topology setting";
} else {
auto s = svr_->AddMaster(master->host_, master->port_, false);
if (!s.IsOK()) {
LOG(WARNING) << "SLAVE OF " << master->host_ << ":" << master->port_
<< " enabled by cluster topology setting, encounter error: " << s.Msg();
<< " wasn't enabled by cluster topology setting, encounter error: " << s.Msg();
return s.Prefixed("failed to add master");
}
LOG(INFO) << "SLAVE OF " << master->host_ << ":" << master->port_ << " enabled by cluster topology setting";
}

return Status::OK();
}

bool Cluster::IsNotMaster() { return myself_ == nullptr || myself_->role_ != kClusterMaster || svr_->IsSlave(); }
2 changes: 1 addition & 1 deletion src/cluster/cluster.h
Original file line number Diff line number Diff line change
@@ -90,7 +90,7 @@ class Cluster {
bool IsWriteForbiddenSlot(int slot);
Status CanExecByMySelf(const Redis::CommandAttributes *attributes, const std::vector<std::string> &cmd_tokens,
Redis::Connection *conn);
void SetMasterSlaveRepl();
Status SetMasterSlaveRepl();
Status MigrateSlot(int slot, const std::string &dst_node_id);
Status ImportSlot(Redis::Connection *conn, int slot, int state);
std::string GetMyId() const { return myid_; }
38 changes: 28 additions & 10 deletions src/cluster/replication.cc
Original file line number Diff line number Diff line change
@@ -55,12 +55,16 @@ Status FeedSlaveThread::Start() {
sigaddset(&mask, SIGHUP);
sigaddset(&mask, SIGPIPE);
pthread_sigmask(SIG_BLOCK, &mask, &omask);
Util::SockSend(conn_->GetFD(), "+OK\r\n");
auto s = Util::SockSend(conn_->GetFD(), "+OK\r\n");
if (!s.IsOK()) {
LOG(ERROR) << "failed to send OK response to the replica: " << s.Msg();
return;
}
this->loop();
});
} catch (const std::system_error &e) {
conn_ = nullptr; // prevent connection was freed when failed to start the thread
return Status(Status::NotOK, e.what());
return {Status::NotOK, e.what()};
}
return Status::OK();
}
@@ -544,7 +548,13 @@ ReplicationThread::CBState ReplicationThread::incrementBatchLoopCB(bufferevent *
<< Util::StringToHex(bulk_string);
return CBState::RESTART;
}
self->ParseWriteBatch(bulk_string);

s = self->ParseWriteBatch(bulk_string);
if (!s.IsOK()) {
LOG(ERROR) << "[replication] CRITICAL - failed to parse write batch 0x" << Util::StringToHex(bulk_string)
<< ": " << s.Msg();
return CBState::RESTART;
}
}
evbuffer_drain(input, self->incr_bulk_len_ + 2);
self->incr_state_ = Incr_batch_size;
@@ -612,7 +622,12 @@ ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev, v
if (evbuffer_get_length(input) < self->fullsync_filesize_) {
return CBState::AGAIN;
}
meta = Engine::Storage::ReplDataManager::ParseMetaAndSave(self->storage_, self->fullsync_meta_id_, input);
auto s =
Engine::Storage::ReplDataManager::ParseMetaAndSave(self->storage_, self->fullsync_meta_id_, input, &meta);
if (!s.IsOK()) {
LOG(ERROR) << "[replication] Failed to parse meta and save: " << s.Msg();
return CBState::AGAIN;
}
target_dir = self->srv_->GetConfig()->backup_sync_dir;
} else {
// Master using new version
@@ -895,13 +910,13 @@ void ReplicationThread::EventTimerCB(int, int16_t, void *ctx) {
}
}

rocksdb::Status ReplicationThread::ParseWriteBatch(const std::string &batch_string) {
Status ReplicationThread::ParseWriteBatch(const std::string &batch_string) {
rocksdb::WriteBatch write_batch(batch_string);
WriteBatchHandler write_batch_handler;
rocksdb::Status status;

status = write_batch.Iterate(&write_batch_handler);
if (!status.ok()) return status;
auto db_status = write_batch.Iterate(&write_batch_handler);
if (!db_status.ok()) return {Status::NotOK, "failed to iterate over write batch: " + db_status.ToString()};

switch (write_batch_handler.Type()) {
case kBatchTypePublish:
srv_->PublishMessage(write_batch_handler.Key(), write_batch_handler.Value());
@@ -910,7 +925,10 @@ rocksdb::Status ReplicationThread::ParseWriteBatch(const std::string &batch_stri
if (write_batch_handler.Key() == Engine::kPropagateScriptCommand) {
std::vector<std::string> tokens = Util::TokenizeRedisProtocol(write_batch_handler.Value());
if (!tokens.empty()) {
srv_->ExecPropagatedCommand(tokens);
auto s = srv_->ExecPropagatedCommand(tokens);
if (!s.IsOK()) {
return s.Prefixed("failed to execute propagate command");
}
}
}
break;
@@ -927,7 +945,7 @@ rocksdb::Status ReplicationThread::ParseWriteBatch(const std::string &batch_stri
case kBatchTypeNone:
break;
}
return rocksdb::Status::OK();
return Status::OK();
}

bool ReplicationThread::isRestoringError(const char *err) {
2 changes: 1 addition & 1 deletion src/cluster/replication.h
Original file line number Diff line number Diff line change
@@ -196,7 +196,7 @@ class ReplicationThread {

static void EventTimerCB(int, int16_t, void *ctx);

rocksdb::Status ParseWriteBatch(const std::string &batch_string);
Status ParseWriteBatch(const std::string &batch_string);
};

/*
93 changes: 59 additions & 34 deletions src/commands/redis_cmd.cc
Original file line number Diff line number Diff line change
@@ -4143,32 +4143,36 @@ class CommandSlaveOf : public Commander {
return Status::OK();
}

Status s;
if (host_.empty()) {
s = svr->RemoveMaster();
if (s.IsOK()) {
*output = Redis::SimpleString("OK");
LOG(WARNING) << "MASTER MODE enabled (user request from '" << conn->GetAddr() << "')";
if (svr->GetConfig()->cluster_enabled) {
svr->slot_migrate_->SetMigrateStopFlag(false);
LOG(INFO) << "Change server role to master, restart migration task";
}
auto s = svr->RemoveMaster();
if (!s.IsOK()) {
return s.Prefixed("failed to remove master");
}
} else {
s = svr->AddMaster(host_, port_, false);
if (s.IsOK()) {
*output = Redis::SimpleString("OK");
LOG(WARNING) << "SLAVE OF " << host_ << ":" << port_ << " enabled (user request from '" << conn->GetAddr()
<< "')";
if (svr->GetConfig()->cluster_enabled) {
svr->slot_migrate_->SetMigrateStopFlag(true);
LOG(INFO) << "Change server role to slave, stop migration task";
}
} else {
LOG(ERROR) << "SLAVE OF " << host_ << ":" << port_ << " (user request from '" << conn->GetAddr()
<< "') encounter error: " << s.Msg();

*output = Redis::SimpleString("OK");
LOG(WARNING) << "MASTER MODE enabled (user request from '" << conn->GetAddr() << "')";
if (svr->GetConfig()->cluster_enabled) {
svr->slot_migrate_->SetMigrateStopFlag(false);
LOG(INFO) << "Change server role to master, restart migration task";
}

return Status::OK();
}

auto s = svr->AddMaster(host_, port_, false);
if (s.IsOK()) {
*output = Redis::SimpleString("OK");
LOG(WARNING) << "SLAVE OF " << host_ << ":" << port_ << " enabled (user request from '" << conn->GetAddr()
<< "')";
if (svr->GetConfig()->cluster_enabled) {
svr->slot_migrate_->SetMigrateStopFlag(true);
LOG(INFO) << "Change server role to slave, stop migration task";
}
} else {
LOG(ERROR) << "SLAVE OF " << host_ << ":" << port_ << " (user request from '" << conn->GetAddr()
<< "') encounter error: " << s.Msg();
}

return s;
}

@@ -4249,19 +4253,26 @@ class CommandPSync : public Commander {
// be taken over, so should never trigger any event in worker thread.
conn->Detach();
conn->EnableFlag(Redis::Connection::kSlave);
Util::SockSetBlocking(conn->GetFD(), 1);
auto s = Util::SockSetBlocking(conn->GetFD(), 1);
if (!s.IsOK()) {
conn->EnableFlag(Redis::Connection::kCloseAsync);
return s.Prefixed("failed to set blocking mode on socket");
}

svr->stats_.IncrPSyncOKCounter();
Status s = svr->AddSlave(conn, next_repl_seq);
s = svr->AddSlave(conn, next_repl_seq);
if (!s.IsOK()) {
std::string err = "-ERR " + s.Msg() + "\r\n";
Util::SockSend(conn->GetFD(), err);
s = Util::SockSend(conn->GetFD(), err);
if (!s.IsOK()) {
LOG(WARNING) << "failed to send error message to the replica: " << s.Msg();
}
conn->EnableFlag(Redis::Connection::kCloseAsync);
LOG(WARNING) << "Failed to add salve: " << conn->GetAddr() << " to start increment syncing";
LOG(WARNING) << "Failed to add replica: " << conn->GetAddr() << " to start incremental syncing";
} else {
LOG(INFO) << "New slave: " << conn->GetAddr() << " was added, start increment syncing";
LOG(INFO) << "New replica: " << conn->GetAddr() << " was added, start incremental syncing";
}
return Status::OK();
return s;
}

private:
@@ -4962,7 +4973,11 @@ class CommandFetchMeta : public Commander {
int repl_fd = conn->GetFD();
std::string ip = conn->GetIP();

Util::SockSetBlocking(repl_fd, 1);
auto s = Util::SockSetBlocking(repl_fd, 1);
if (!s.IsOK()) {
return s.Prefixed("failed to set blocking mode on socket");
}

conn->NeedNotClose();
conn->EnableFlag(Redis::Connection::kCloseAsync);
svr->stats_.IncrFullSyncCounter();
@@ -4975,9 +4990,11 @@ class CommandFetchMeta : public Commander {
std::string files;
auto s = Engine::Storage::ReplDataManager::GetFullReplDataInfo(svr->storage_, &files);
if (!s.IsOK()) {
Util::SockSend(repl_fd, "-ERR can't create db checkpoint");
LOG(WARNING) << "[replication] Failed to get full data file info,"
<< " error: " << s.Msg();
s = Util::SockSend(repl_fd, "-ERR can't create db checkpoint");
if (!s.IsOK()) {
LOG(WARNING) << "[replication] Failed to send error response: " << s.Msg();
}
LOG(WARNING) << "[replication] Failed to get full data file info: " << s.Msg();
return;
}
// Send full data file info
@@ -5008,7 +5025,11 @@ class CommandFetchFile : public Commander {
int repl_fd = conn->GetFD();
std::string ip = conn->GetIP();

Util::SockSetBlocking(repl_fd, 1);
auto s = Util::SockSetBlocking(repl_fd, 1);
if (!s.IsOK()) {
return s.Prefixed("failed to set blocking mode on socket");
}

conn->NeedNotClose(); // Feed-replica-file thread will close the replica fd
conn->EnableFlag(Redis::Connection::kCloseAsync);

@@ -5351,7 +5372,11 @@ class CommandScript : public Commander {

if (args_.size() == 2 && subcommand_ == "flush") {
svr->ScriptFlush();
svr->Propagate(Engine::kPropagateScriptCommand, args_);
auto s = svr->Propagate(Engine::kPropagateScriptCommand, args_);
if (!s.IsOK()) {
LOG(ERROR) << "Failed to propagate script command: " << s.Msg();
return s;
}
*output = Redis::SimpleString("OK");
} else if (args_.size() >= 2 && subcommand_ == "exists") {
*output = Redis::MultiLen(args_.size() - 2);
10 changes: 8 additions & 2 deletions src/config/config.cc
Original file line number Diff line number Diff line change
@@ -605,7 +605,10 @@ void Config::SetMaster(const std::string &host, uint32_t port) {
master_port = port;
auto iter = fields_.find("slaveof");
if (iter != fields_.end()) {
iter->second->Set(master_host + " " + std::to_string(master_port));
auto s = iter->second->Set(master_host + " " + std::to_string(master_port));
if (!s.IsOK()) {
LOG(ERROR) << "Failed to set the value of 'slaveof' setting: " << s.Msg();
}
}
}

@@ -614,7 +617,10 @@ void Config::ClearMaster() {
master_port = 0;
auto iter = fields_.find("slaveof");
if (iter != fields_.end()) {
iter->second->Set("no one");
auto s = iter->second->Set("no one");
if (!s.IsOK()) {
LOG(ERROR) << "Failed to clear the value of 'slaveof' setting: " << s.Msg();
}
}
}

5 changes: 4 additions & 1 deletion src/main.cc
Original file line number Diff line number Diff line change
@@ -265,7 +265,10 @@ static Status createPidFile(const std::string &path) {
return Status(Status::NotOK, strerror(errno));
}
std::string pid_str = std::to_string(getpid());
Util::Write(*fd, pid_str);
auto s = Util::Write(*fd, pid_str);
if (!s.IsOK()) {
return s.Prefixed("failed to write to PID-file");
}
return Status::OK();
}

Loading