From a06f2304ec825d4b63bb051337b4ce83f39f9b1c Mon Sep 17 00:00:00 2001 From: Yaroslav Stepanchuk Date: Thu, 29 Dec 2022 10:17:50 +0200 Subject: [PATCH 1/2] Refactoring: if the function returns Status marked as [[nodiscard]], check it --- src/cluster/cluster.cc | 34 +++++++---- src/cluster/cluster.h | 2 +- src/cluster/replication.cc | 38 ++++++++---- src/cluster/replication.h | 2 +- src/commands/redis_cmd.cc | 93 ++++++++++++++++++----------- src/config/config.cc | 10 +++- src/main.cc | 5 +- src/server/server.cc | 46 ++++++++------ src/server/server.h | 6 +- src/server/worker.cc | 22 ++++--- src/storage/batch_extractor.cc | 4 +- src/storage/scripting.cc | 3 +- src/storage/storage.cc | 40 ++++++------- src/storage/storage.h | 9 +-- tests/cppunit/config_test.cc | 56 ++++++++++------- tests/cppunit/cron_test.cc | 5 +- utils/kvrocks2redis/main.cc | 5 +- utils/kvrocks2redis/redis_writer.cc | 27 ++++++--- utils/kvrocks2redis/sync.cc | 14 +++-- 19 files changed, 267 insertions(+), 154 deletions(-) diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc index 515123b6aae..f89906bc183 100644 --- a/src/cluster/cluster.cc +++ b/src/cluster/cluster.cc @@ -90,7 +90,7 @@ Status Cluster::SetNodeId(const std::string &node_id) { } // Set replication relationship - if (myself_ != nullptr) SetMasterSlaveRepl(); + if (myself_) return SetMasterSlaveRepl(); return Status::OK(); } @@ -217,7 +217,12 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b } // Set replication relationship - if (myself_ != nullptr) SetMasterSlaveRepl(); + if (myself_) { + s = SetMasterSlaveRepl(); + if (!s.IsOK()) { + return s.Prefixed("failed to set master-replica replication"); + } + } // Clear data of migrated slots if (!migrated_slots_.empty()) { @@ -235,26 +240,31 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b } // Set replication relationship by cluster topology setting -void Cluster::SetMasterSlaveRepl() { - if (!svr_) return; +Status Cluster::SetMasterSlaveRepl() { + if (!svr_) return Status::OK(); - if (myself_ == nullptr) return; + if (!myself_) return Status::OK(); if (myself_->role_ == kClusterMaster) { // Master mode - svr_->RemoveMaster(); + auto s = svr_->RemoveMaster(); + if (!s.IsOK()) { + return s.Prefixed("failed to remove master"); + } LOG(INFO) << "MASTER MODE enabled by cluster topology setting"; } else if (nodes_.find(myself_->master_id_) != nodes_.end()) { - // Slave mode and master node is existing + // Replica mode and master node is existing std::shared_ptr master = nodes_[myself_->master_id_]; - Status s = svr_->AddMaster(master->host_, master->port_, false); - if (s.IsOK()) { - LOG(INFO) << "SLAVE OF " << master->host_ << ":" << master->port_ << " enabled by cluster topology setting"; - } else { + auto s = svr_->AddMaster(master->host_, master->port_, false); + if (!s.IsOK()) { LOG(WARNING) << "SLAVE OF " << master->host_ << ":" << master->port_ - << " enabled by cluster topology setting, encounter error: " << s.Msg(); + << " wasn't enabled by cluster topology setting, encounter error: " << s.Msg(); + return s.Prefixed("failed to add master"); } + LOG(INFO) << "SLAVE OF " << master->host_ << ":" << master->port_ << " enabled by cluster topology setting"; } + + return Status::OK(); } bool Cluster::IsNotMaster() { return myself_ == nullptr || myself_->role_ != kClusterMaster || svr_->IsSlave(); } diff --git a/src/cluster/cluster.h b/src/cluster/cluster.h index 7651a9a0dd9..d947d8c36dd 100644 --- a/src/cluster/cluster.h +++ b/src/cluster/cluster.h @@ -90,7 +90,7 @@ class Cluster { bool IsWriteForbiddenSlot(int slot); Status CanExecByMySelf(const Redis::CommandAttributes *attributes, const std::vector &cmd_tokens, Redis::Connection *conn); - void SetMasterSlaveRepl(); + Status SetMasterSlaveRepl(); Status MigrateSlot(int slot, const std::string &dst_node_id); Status ImportSlot(Redis::Connection *conn, int slot, int state); std::string GetMyId() const { return myid_; } diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc index 01dca57f850..415cd3988f9 100644 --- a/src/cluster/replication.cc +++ b/src/cluster/replication.cc @@ -55,12 +55,16 @@ Status FeedSlaveThread::Start() { sigaddset(&mask, SIGHUP); sigaddset(&mask, SIGPIPE); pthread_sigmask(SIG_BLOCK, &mask, &omask); - Util::SockSend(conn_->GetFD(), "+OK\r\n"); + auto s = Util::SockSend(conn_->GetFD(), "+OK\r\n"); + if (!s.IsOK()) { + LOG(ERROR) << "failed to send OK response to the replica: " << s.Msg(); + return; + } this->loop(); }); } catch (const std::system_error &e) { conn_ = nullptr; // prevent connection was freed when failed to start the thread - return Status(Status::NotOK, e.what()); + return {Status::NotOK, e.what()}; } return Status::OK(); } @@ -544,7 +548,13 @@ ReplicationThread::CBState ReplicationThread::incrementBatchLoopCB(bufferevent * << Util::StringToHex(bulk_string); return CBState::RESTART; } - self->ParseWriteBatch(bulk_string); + + s = self->ParseWriteBatch(bulk_string); + if (!s.IsOK()) { + LOG(ERROR) << "[replication] CRITICAL - failed to parse write batch 0x" << Util::StringToHex(bulk_string) + << ": " << s.Msg(); + return CBState::RESTART; + } } evbuffer_drain(input, self->incr_bulk_len_ + 2); self->incr_state_ = Incr_batch_size; @@ -612,7 +622,12 @@ ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev, v if (evbuffer_get_length(input) < self->fullsync_filesize_) { return CBState::AGAIN; } - meta = Engine::Storage::ReplDataManager::ParseMetaAndSave(self->storage_, self->fullsync_meta_id_, input); + auto s = + Engine::Storage::ReplDataManager::ParseMetaAndSave(self->storage_, self->fullsync_meta_id_, input, &meta); + if (!s.IsOK()) { + LOG(ERROR) << "[replication] Failed to parse meta and save: " << s.Msg(); + return CBState::AGAIN; + } target_dir = self->srv_->GetConfig()->backup_sync_dir; } else { // Master using new version @@ -895,13 +910,13 @@ void ReplicationThread::EventTimerCB(int, int16_t, void *ctx) { } } -rocksdb::Status ReplicationThread::ParseWriteBatch(const std::string &batch_string) { +Status ReplicationThread::ParseWriteBatch(const std::string &batch_string) { rocksdb::WriteBatch write_batch(batch_string); WriteBatchHandler write_batch_handler; - rocksdb::Status status; - status = write_batch.Iterate(&write_batch_handler); - if (!status.ok()) return status; + auto db_status = write_batch.Iterate(&write_batch_handler); + if (!db_status.ok()) return {Status::NotOK, "failed to iterate over write batch: " + db_status.ToString()}; + switch (write_batch_handler.Type()) { case kBatchTypePublish: srv_->PublishMessage(write_batch_handler.Key(), write_batch_handler.Value()); @@ -910,7 +925,10 @@ rocksdb::Status ReplicationThread::ParseWriteBatch(const std::string &batch_stri if (write_batch_handler.Key() == Engine::kPropagateScriptCommand) { std::vector tokens = Util::TokenizeRedisProtocol(write_batch_handler.Value()); if (!tokens.empty()) { - srv_->ExecPropagatedCommand(tokens); + auto s = srv_->ExecPropagatedCommand(tokens); + if (!s.IsOK()) { + return s.Prefixed("failed to execute propagate command"); + } } } break; @@ -927,7 +945,7 @@ rocksdb::Status ReplicationThread::ParseWriteBatch(const std::string &batch_stri case kBatchTypeNone: break; } - return rocksdb::Status::OK(); + return Status::OK(); } bool ReplicationThread::isRestoringError(const char *err) { diff --git a/src/cluster/replication.h b/src/cluster/replication.h index a96cf4c1bdc..c02d6806430 100644 --- a/src/cluster/replication.h +++ b/src/cluster/replication.h @@ -196,7 +196,7 @@ class ReplicationThread { static void EventTimerCB(int, int16_t, void *ctx); - rocksdb::Status ParseWriteBatch(const std::string &batch_string); + Status ParseWriteBatch(const std::string &batch_string); }; /* diff --git a/src/commands/redis_cmd.cc b/src/commands/redis_cmd.cc index 5fc20a7b33f..4f72c8592a1 100644 --- a/src/commands/redis_cmd.cc +++ b/src/commands/redis_cmd.cc @@ -4143,32 +4143,36 @@ class CommandSlaveOf : public Commander { return Status::OK(); } - Status s; if (host_.empty()) { - s = svr->RemoveMaster(); - if (s.IsOK()) { - *output = Redis::SimpleString("OK"); - LOG(WARNING) << "MASTER MODE enabled (user request from '" << conn->GetAddr() << "')"; - if (svr->GetConfig()->cluster_enabled) { - svr->slot_migrate_->SetMigrateStopFlag(false); - LOG(INFO) << "Change server role to master, restart migration task"; - } + auto s = svr->RemoveMaster(); + if (!s.IsOK()) { + return s.Prefixed("failed to remove master"); } - } else { - s = svr->AddMaster(host_, port_, false); - if (s.IsOK()) { - *output = Redis::SimpleString("OK"); - LOG(WARNING) << "SLAVE OF " << host_ << ":" << port_ << " enabled (user request from '" << conn->GetAddr() - << "')"; - if (svr->GetConfig()->cluster_enabled) { - svr->slot_migrate_->SetMigrateStopFlag(true); - LOG(INFO) << "Change server role to slave, stop migration task"; - } - } else { - LOG(ERROR) << "SLAVE OF " << host_ << ":" << port_ << " (user request from '" << conn->GetAddr() - << "') encounter error: " << s.Msg(); + + *output = Redis::SimpleString("OK"); + LOG(WARNING) << "MASTER MODE enabled (user request from '" << conn->GetAddr() << "')"; + if (svr->GetConfig()->cluster_enabled) { + svr->slot_migrate_->SetMigrateStopFlag(false); + LOG(INFO) << "Change server role to master, restart migration task"; + } + + return Status::OK(); + } + + auto s = svr->AddMaster(host_, port_, false); + if (s.IsOK()) { + *output = Redis::SimpleString("OK"); + LOG(WARNING) << "SLAVE OF " << host_ << ":" << port_ << " enabled (user request from '" << conn->GetAddr() + << "')"; + if (svr->GetConfig()->cluster_enabled) { + svr->slot_migrate_->SetMigrateStopFlag(true); + LOG(INFO) << "Change server role to slave, stop migration task"; } + } else { + LOG(ERROR) << "SLAVE OF " << host_ << ":" << port_ << " (user request from '" << conn->GetAddr() + << "') encounter error: " << s.Msg(); } + return s; } @@ -4249,19 +4253,26 @@ class CommandPSync : public Commander { // be taken over, so should never trigger any event in worker thread. conn->Detach(); conn->EnableFlag(Redis::Connection::kSlave); - Util::SockSetBlocking(conn->GetFD(), 1); + auto s = Util::SockSetBlocking(conn->GetFD(), 1); + if (!s.IsOK()) { + conn->EnableFlag(Redis::Connection::kCloseAsync); + return s.Prefixed("failed to set blocking mode on socket"); + } svr->stats_.IncrPSyncOKCounter(); - Status s = svr->AddSlave(conn, next_repl_seq); + s = svr->AddSlave(conn, next_repl_seq); if (!s.IsOK()) { std::string err = "-ERR " + s.Msg() + "\r\n"; - Util::SockSend(conn->GetFD(), err); + s = Util::SockSend(conn->GetFD(), err); + if (!s.IsOK()) { + LOG(WARNING) << "failed to send error message to the replica: " << s.Msg(); + } conn->EnableFlag(Redis::Connection::kCloseAsync); - LOG(WARNING) << "Failed to add salve: " << conn->GetAddr() << " to start increment syncing"; + LOG(WARNING) << "Failed to add replica: " << conn->GetAddr() << " to start incremental syncing"; } else { - LOG(INFO) << "New slave: " << conn->GetAddr() << " was added, start increment syncing"; + LOG(INFO) << "New replica: " << conn->GetAddr() << " was added, start incremental syncing"; } - return Status::OK(); + return s; } private: @@ -4962,7 +4973,11 @@ class CommandFetchMeta : public Commander { int repl_fd = conn->GetFD(); std::string ip = conn->GetIP(); - Util::SockSetBlocking(repl_fd, 1); + auto s = Util::SockSetBlocking(repl_fd, 1); + if (!s.IsOK()) { + return s.Prefixed("failed to set blocking mode on socket"); + } + conn->NeedNotClose(); conn->EnableFlag(Redis::Connection::kCloseAsync); svr->stats_.IncrFullSyncCounter(); @@ -4975,9 +4990,11 @@ class CommandFetchMeta : public Commander { std::string files; auto s = Engine::Storage::ReplDataManager::GetFullReplDataInfo(svr->storage_, &files); if (!s.IsOK()) { - Util::SockSend(repl_fd, "-ERR can't create db checkpoint"); - LOG(WARNING) << "[replication] Failed to get full data file info," - << " error: " << s.Msg(); + s = Util::SockSend(repl_fd, "-ERR can't create db checkpoint"); + if (!s.IsOK()) { + LOG(WARNING) << "[replication] Failed to send error response: " << s.Msg(); + } + LOG(WARNING) << "[replication] Failed to get full data file info: " << s.Msg(); return; } // Send full data file info @@ -5008,7 +5025,11 @@ class CommandFetchFile : public Commander { int repl_fd = conn->GetFD(); std::string ip = conn->GetIP(); - Util::SockSetBlocking(repl_fd, 1); + auto s = Util::SockSetBlocking(repl_fd, 1); + if (!s.IsOK()) { + return s.Prefixed("failed to set blocking mode on socket"); + } + conn->NeedNotClose(); // Feed-replica-file thread will close the replica fd conn->EnableFlag(Redis::Connection::kCloseAsync); @@ -5351,7 +5372,11 @@ class CommandScript : public Commander { if (args_.size() == 2 && subcommand_ == "flush") { svr->ScriptFlush(); - svr->Propagate(Engine::kPropagateScriptCommand, args_); + auto s = svr->Propagate(Engine::kPropagateScriptCommand, args_); + if (!s.IsOK()) { + LOG(ERROR) << "Failed to propagate script command: " << s.Msg(); + return s; + } *output = Redis::SimpleString("OK"); } else if (args_.size() >= 2 && subcommand_ == "exists") { *output = Redis::MultiLen(args_.size() - 2); diff --git a/src/config/config.cc b/src/config/config.cc index 8551bddae6a..8a8cb810506 100644 --- a/src/config/config.cc +++ b/src/config/config.cc @@ -605,7 +605,10 @@ void Config::SetMaster(const std::string &host, uint32_t port) { master_port = port; auto iter = fields_.find("slaveof"); if (iter != fields_.end()) { - iter->second->Set(master_host + " " + std::to_string(master_port)); + auto s = iter->second->Set(master_host + " " + std::to_string(master_port)); + if (!s.IsOK()) { + LOG(ERROR) << "Failed to set the value of 'slaveof' setting: " << s.Msg(); + } } } @@ -614,7 +617,10 @@ void Config::ClearMaster() { master_port = 0; auto iter = fields_.find("slaveof"); if (iter != fields_.end()) { - iter->second->Set("no one"); + auto s = iter->second->Set("no one"); + if (!s.IsOK()) { + LOG(ERROR) << "Failed to clear the value of 'slaveof' setting: " << s.Msg(); + } } } diff --git a/src/main.cc b/src/main.cc index 86d8e33c89a..8efd6a9999c 100644 --- a/src/main.cc +++ b/src/main.cc @@ -265,7 +265,10 @@ static Status createPidFile(const std::string &path) { return Status(Status::NotOK, strerror(errno)); } std::string pid_str = std::to_string(getpid()); - Util::Write(*fd, pid_str); + auto s = Util::Write(*fd, pid_str); + if (!s.IsOK()) { + return s.Prefixed("failed to write to PID-file"); + } return Status::OK(); } diff --git a/src/server/server.cc b/src/server/server.cc index 7b4a545eae1..2a622111ae1 100644 --- a/src/server/server.cc +++ b/src/server/server.cc @@ -129,7 +129,10 @@ Status Server::Start() { if (!s.IsOK()) return s; } else { // Generate new replication id if not a replica - storage_->ShiftReplId(); + auto s = storage_->ShiftReplId(); + if (!s.IsOK()) { + return s.Prefixed("failed to shift replication id"); + } } if (config_->cluster_enabled) { @@ -255,9 +258,11 @@ Status Server::RemoveMaster() { master_host_.clear(); master_port_ = 0; config_->ClearMaster(); - if (replication_thread_) replication_thread_->Stop(); - replication_thread_ = nullptr; - storage_->ShiftReplId(); + if (replication_thread_) { + replication_thread_->Stop(); + replication_thread_ = nullptr; + } + return storage_->ShiftReplId(); } return Status::OK(); } @@ -524,40 +529,43 @@ void Server::UnblockOnStreams(const std::vector &keys, Redis::Conne } } -Status Server::WakeupBlockingConns(const std::string &key, size_t n_conns) { +void Server::WakeupBlockingConns(const std::string &key, size_t n_conns) { std::lock_guard guard(blocking_keys_mu_); auto iter = blocking_keys_.find(key); if (iter == blocking_keys_.end() || iter->second.empty()) { - return Status(Status::NotOK); + return; } + while (n_conns-- && !iter->second.empty()) { auto conn_ctx = iter->second.front(); - conn_ctx->owner->EnableWriteEvent(conn_ctx->fd); + auto s = conn_ctx->owner->EnableWriteEvent(conn_ctx->fd); + if (!s.IsOK()) { + LOG(ERROR) << "failed to enable write event on blocked client " << conn_ctx->fd << ": " << s.Msg(); + } delConnContext(conn_ctx); iter->second.pop_front(); } - return Status::OK(); } -Status Server::OnEntryAddedToStream(const std::string &ns, const std::string &key, - const Redis::StreamEntryID &entry_id) { +void Server::OnEntryAddedToStream(const std::string &ns, const std::string &key, const Redis::StreamEntryID &entry_id) { std::lock_guard guard(blocking_keys_mu_); auto iter = blocked_stream_consumers_.find(key); if (iter == blocked_stream_consumers_.end() || iter->second.empty()) { - return Status(Status::NotOK); + return; } for (auto it = iter->second.begin(); it != iter->second.end();) { auto consumer = *it; if (consumer->ns == ns && entry_id > consumer->last_consumed_id) { - consumer->owner->EnableWriteEvent(consumer->fd); + auto s = consumer->owner->EnableWriteEvent(consumer->fd); + if (!s.IsOK()) { + LOG(ERROR) << "failed to enable write event on blocked stream consumer " << consumer->fd << ": " << s.Msg(); + } it = iter->second.erase(it); } else { ++it; } } - - return Status::OK(); } void Server::delConnContext(ConnContext *c) { @@ -659,7 +667,7 @@ void Server::cron() { // Purge backup if needed, it will cost much disk space if we keep backup and full sync // checkpoints at the same time if (config_->purge_backup_on_fullsync && (storage_->ExistCheckpoint() || storage_->ExistSyncCheckpoint())) { - AsyncPurgeOldBackups(0, 0); + s = AsyncPurgeOldBackups(0, 0); } } @@ -1361,7 +1369,9 @@ void Server::KillClient(int64_t *killed, const std::string &addr, uint64_t id, u if (IsSlave() && (type & kTypeMaster || (!addr.empty() && addr == master_host_ + ":" + std::to_string(master_port_)))) { // Stop replication thread and start a new one to replicate - AddMaster(master_host_, master_port_, true); + if (auto s = AddMaster(master_host_, master_port_, true); !s.IsOK()) { + LOG(ERROR) << "Failed to add master " << master_host_ << ":" << master_port_ << "with error: " << s.Msg(); + } (*killed)++; } } @@ -1402,9 +1412,9 @@ Status Server::ScriptGet(const std::string &sha, std::string *body) { return Status::OK(); } -void Server::ScriptSet(const std::string &sha, const std::string &body) { +Status Server::ScriptSet(const std::string &sha, const std::string &body) { std::string funcname = Engine::kLuaFunctionPrefix + sha; - storage_->WriteToPropagateCF(funcname, body); + return storage_->WriteToPropagateCF(funcname, body); } void Server::ScriptReset() { diff --git a/src/server/server.h b/src/server/server.h index b3a6ab5a0fb..b888b15c1f2 100644 --- a/src/server/server.h +++ b/src/server/server.h @@ -152,8 +152,8 @@ class Server { void BlockOnStreams(const std::vector &keys, const std::vector &entry_ids, Redis::Connection *conn); void UnblockOnStreams(const std::vector &keys, Redis::Connection *conn); - Status WakeupBlockingConns(const std::string &key, size_t n_conns); - Status OnEntryAddedToStream(const std::string &ns, const std::string &key, const Redis::StreamEntryID &entry_id); + void WakeupBlockingConns(const std::string &key, size_t n_conns); + void OnEntryAddedToStream(const std::string &ns, const std::string &key, const Redis::StreamEntryID &entry_id); std::string GetLastRandomKeyCursor(); void SetLastRandomKeyCursor(const std::string &cursor); @@ -194,7 +194,7 @@ class Server { lua_State *Lua() { return lua_; } Status ScriptExists(const std::string &sha); Status ScriptGet(const std::string &sha, std::string *body); - void ScriptSet(const std::string &sha, const std::string &body); + Status ScriptSet(const std::string &sha, const std::string &body); void ScriptReset(); void ScriptFlush(); diff --git a/src/server/worker.cc b/src/server/worker.cc index c7f82ebc88f..8121a187c02 100644 --- a/src/server/worker.cc +++ b/src/server/worker.cc @@ -159,10 +159,13 @@ void Worker::newTCPConnection(evconnlistener *listener, evutil_socket_t fd, sock auto conn = new Redis::Connection(bev, worker); bufferevent_setcb(bev, Redis::Connection::OnRead, Redis::Connection::OnWrite, Redis::Connection::OnEvent, conn); bufferevent_enable(bev, EV_READ); - Status status = worker->AddConnection(conn); - if (!status.IsOK()) { - std::string err_msg = Redis::Error("ERR " + status.Msg()); - Util::SockSend(fd, err_msg); + s = worker->AddConnection(conn); + if (!s.IsOK()) { + std::string err_msg = Redis::Error("ERR " + s.Msg()); + s = Util::SockSend(fd, err_msg); + if (!s.IsOK()) { + LOG(WARNING) << "Failed to send error response to socket: " << s.Msg(); + } conn->Close(); return; } @@ -187,10 +190,13 @@ void Worker::newUnixSocketConnection(evconnlistener *listener, evutil_socket_t f auto conn = new Redis::Connection(bev, worker); bufferevent_setcb(bev, Redis::Connection::OnRead, Redis::Connection::OnWrite, Redis::Connection::OnEvent, conn); bufferevent_enable(bev, EV_READ); - Status status = worker->AddConnection(conn); - if (!status.IsOK()) { - std::string err_msg = Redis::Error("ERR " + status.Msg()); - Util::SockSend(fd, err_msg); + auto s = worker->AddConnection(conn); + if (!s.IsOK()) { + std::string err_msg = Redis::Error("ERR " + s.Msg()); + s = Util::SockSend(fd, err_msg); + if (!s.IsOK()) { + LOG(WARNING) << "Failed to send error response to socket: " << s.Msg(); + } conn->Close(); return; } diff --git a/src/storage/batch_extractor.cc b/src/storage/batch_extractor.cc index 8d893db9c10..ac57ffdc5a3 100644 --- a/src/storage/batch_extractor.cc +++ b/src/storage/batch_extractor.cc @@ -38,7 +38,9 @@ void WriteBatchExtractor::LogData(const rocksdb::Slice &blob) { } } else { // Redis type log data - log_data_.Decode(blob); + if (auto s = log_data_.Decode(blob); !s.IsOK()) { + LOG(WARNING) << "Failed to decode Redis type log: " << s.Msg(); + } } } diff --git a/src/storage/scripting.cc b/src/storage/scripting.cc index 1c3b3a71c9c..8269485510a 100644 --- a/src/storage/scripting.cc +++ b/src/storage/scripting.cc @@ -926,8 +926,7 @@ Status createFunction(Server *srv, const std::string &body, std::string *sha, lu return Status(Status::NotOK, "Error running script (new function): " + errMsg + "\n"); } // would store lua function into propagate column family and propagate those scripts to slaves - srv->ScriptSet(*sha, body); - return Status::OK(); + return srv->ScriptSet(*sha, body); } } // namespace Lua diff --git a/src/storage/storage.cc b/src/storage/storage.cc index efb7e46942a..ef020bfd50a 100644 --- a/src/storage/storage.cc +++ b/src/storage/storage.cc @@ -77,7 +77,7 @@ Storage::Storage(Config *config) : env_(rocksdb::Env::Default()), config_(config } Storage::~Storage() { - if (backup_ != nullptr) { + if (backup_) { DestroyBackup(); } CloseDB(); @@ -248,7 +248,9 @@ Status Storage::Open(bool read_only) { size_t subkey_block_cache_size = config_->RocksDB.subkey_block_cache_size * MiB; rocksdb::Options options = InitOptions(); - CreateColumnFamilies(options); + if (auto s = CreateColumnFamilies(options); !s.IsOK()) { + return s.Prefixed("failed to create column families"); + } std::shared_ptr shared_block_cache; if (config_->RocksDB.share_metadata_and_subkey_block_cache) { @@ -372,10 +374,9 @@ Status Storage::CreateBackup() { return Status::OK(); } -Status Storage::DestroyBackup() { +void Storage::DestroyBackup() { backup_->StopBackup(); delete backup_; - return Status(); } Status Storage::RestoreFromBackup() { @@ -604,7 +605,7 @@ uint64_t Storage::GetTotalSize(const std::string &ns) { return total_size; } -Status Storage::CheckDBSizeLimit() { +void Storage::CheckDBSizeLimit() { bool reach_db_size_limit = false; if (config_->max_db_size == 0) { reach_db_size_limit = false; @@ -612,8 +613,9 @@ Status Storage::CheckDBSizeLimit() { reach_db_size_limit = GetTotalSize() >= config_->max_db_size * GiB; } if (reach_db_size_limit_ == reach_db_size_limit) { - return Status::OK(); + return; } + reach_db_size_limit_ = reach_db_size_limit; if (reach_db_size_limit_) { LOG(WARNING) << "[storage] ENABLE db_size limit " << config_->max_db_size << " GB" @@ -621,7 +623,6 @@ Status Storage::CheckDBSizeLimit() { } else { LOG(WARNING) << "[storage] DISABLE db_size limit, set kvrocks to read-write mode "; } - return Status::OK(); } void Storage::SetIORateLimit(int64_t max_io_mb) { @@ -645,12 +646,12 @@ Status Storage::WriteToPropagateCF(const std::string &key, const std::string &va return Status::OK(); } -bool Storage::ShiftReplId() { +Status Storage::ShiftReplId() { const char *charset = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"; const int charset_len = static_cast(strlen(charset)); // Do nothing if don't enable rsid psync - if (!config_->use_rsid_psync) return true; + if (!config_->use_rsid_psync) return Status::OK(); std::random_device rd; std::mt19937 gen(rd() + getpid()); @@ -663,8 +664,7 @@ bool Storage::ShiftReplId() { LOG(INFO) << "[replication] New replication id: " << replid_; // Write new replication id into db engine - WriteToPropagateCF(kReplicationIdKey, replid_); - return true; + return WriteToPropagateCF(kReplicationIdKey, replid_); } std::string Storage::GetReplIdFromWalBySeq(rocksdb::SequenceNumber seq) { @@ -834,10 +834,8 @@ int Storage::ReplDataManager::OpenDataFile(Storage *storage, const std::string & return rv; } -Storage::ReplDataManager::MetaInfo Storage::ReplDataManager::ParseMetaAndSave(Storage *storage, - rocksdb::BackupID meta_id, - evbuffer *evbuf) { - Storage::ReplDataManager::MetaInfo meta; +Status Storage::ReplDataManager::ParseMetaAndSave(Storage *storage, rocksdb::BackupID meta_id, evbuffer *evbuf, + Storage::ReplDataManager::MetaInfo *meta) { auto meta_file = "meta/" + std::to_string(meta_id); DLOG(INFO) << "[meta] id: " << meta_id; @@ -850,16 +848,16 @@ Storage::ReplDataManager::MetaInfo Storage::ReplDataManager::ParseMetaAndSave(St // timestamp; UniqueEvbufReadln line(evbuf, EVBUFFER_EOL_LF); DLOG(INFO) << "[meta] timestamp: " << line.get(); - meta.timestamp = std::strtoll(line.get(), nullptr, 10); + meta->timestamp = std::strtoll(line.get(), nullptr, 10); // sequence line = UniqueEvbufReadln(evbuf, EVBUFFER_EOL_LF); DLOG(INFO) << "[meta] seq:" << line.get(); - meta.seq = std::strtoull(line.get(), nullptr, 10); + meta->seq = std::strtoull(line.get(), nullptr, 10); // optional metadata line = UniqueEvbufReadln(evbuf, EVBUFFER_EOL_LF); if (strncmp(line.get(), "metadata", 8) == 0) { DLOG(INFO) << "[meta] meta: " << line.get(); - meta.meta_data = std::string(line.get(), line.length); + meta->meta_data = std::string(line.get(), line.length); line = UniqueEvbufReadln(evbuf, EVBUFFER_EOL_LF); } DLOG(INFO) << "[meta] file count: " << line.get(); @@ -877,10 +875,10 @@ Storage::ReplDataManager::MetaInfo Storage::ReplDataManager::ParseMetaAndSave(St while (*(cptr++) != ' ') { } auto crc32 = std::strtoul(cptr, nullptr, 10); - meta.files.emplace_back(filename, crc32); + meta->files.emplace_back(filename, crc32); } - SwapTmpFile(storage, storage->config_->backup_sync_dir, meta_file); - return meta; + + return SwapTmpFile(storage, storage->config_->backup_sync_dir, meta_file); } Status MkdirRecursively(rocksdb::Env *env, const std::string &dir) { diff --git a/src/storage/storage.h b/src/storage/storage.h index 271d1a60660..ee28e6b60e7 100644 --- a/src/storage/storage.h +++ b/src/storage/storage.h @@ -79,7 +79,7 @@ class Storage { Status SetDBOption(const std::string &key, const std::string &value); Status CreateColumnFamilies(const rocksdb::Options &options); Status CreateBackup(); - Status DestroyBackup(); + void DestroyBackup(); Status RestoreFromBackup(); Status RestoreFromCheckpoint(); Status GetWALIter(rocksdb::SequenceNumber seq, std::unique_ptr *iter); @@ -103,7 +103,7 @@ class Storage { LockManager *GetLockManager() { return &lock_mgr_; } void PurgeOldBackups(uint32_t num_backups_to_keep, uint32_t backup_max_keep_hours); uint64_t GetTotalSize(const std::string &ns = kDefaultNamespace); - Status CheckDBSizeLimit(); + void CheckDBSizeLimit(); void SetIORateLimit(int64_t max_io_mb); std::unique_ptr ReadLockGuard(); @@ -139,7 +139,8 @@ class Storage { // [[filename, checksum]...] std::vector> files; }; - static MetaInfo ParseMetaAndSave(Storage *storage, rocksdb::BackupID meta_id, evbuffer *evbuf); + static Status ParseMetaAndSave(Storage *storage, rocksdb::BackupID meta_id, evbuffer *evbuf, + Storage::ReplDataManager::MetaInfo *meta); static std::unique_ptr NewTmpFile(Storage *storage, const std::string &dir, const std::string &repl_file); static Status SwapTmpFile(Storage *storage, const std::string &dir, const std::string &repl_file); @@ -155,7 +156,7 @@ class Storage { void SetDBInRetryableIOError(bool yes_or_no) { db_in_retryable_io_error_ = yes_or_no; } bool IsDBInRetryableIOError() { return db_in_retryable_io_error_; } - bool ShiftReplId(); + Status ShiftReplId(); std::string GetReplIdFromWalBySeq(rocksdb::SequenceNumber seq); std::string GetReplIdFromDbEngine(); diff --git a/tests/cppunit/config_test.cc b/tests/cppunit/config_test.cc index f58b6c6eae3..7c7b934aa59 100644 --- a/tests/cppunit/config_test.cc +++ b/tests/cppunit/config_test.cc @@ -18,8 +18,6 @@ * */ -#include "config.h" - #include #include @@ -35,7 +33,8 @@ TEST(Config, GetAndSet) { const char *path = "test.conf"; Config config; - config.Load(CLIOptions(path)); + auto s = config.Load(CLIOptions(path)); + EXPECT_FALSE(s.IsOK()); std::map mutable_cases = { {"timeout", "1000"}, {"maxclients", "2000"}, @@ -82,7 +81,7 @@ TEST(Config, GetAndSet) { }; std::vector values; for (const auto &iter : mutable_cases) { - auto s = config.Set(nullptr, iter.first, iter.second); + s = config.Set(nullptr, iter.first, iter.second); ASSERT_TRUE(s.IsOK()); config.Get(iter.first, &values); ASSERT_TRUE(s.IsOK()); @@ -91,9 +90,10 @@ TEST(Config, GetAndSet) { EXPECT_EQ(values[1], iter.second); } ASSERT_TRUE(config.Rewrite().IsOK()); - config.Load(CLIOptions(path)); + s = config.Load(CLIOptions(path)); + EXPECT_TRUE(s.IsOK()); for (const auto &iter : mutable_cases) { - auto s = config.Set(nullptr, iter.first, iter.second); + s = config.Set(nullptr, iter.first, iter.second); ASSERT_TRUE(s.IsOK()); config.Get(iter.first, &values); ASSERT_EQ(values.size(), 2); @@ -125,7 +125,7 @@ TEST(Config, GetAndSet) { {"rocksdb.row_cache_size", "100"}, }; for (const auto &iter : immutable_cases) { - auto s = config.Set(nullptr, iter.first, iter.second); + s = config.Set(nullptr, iter.first, iter.second); ASSERT_FALSE(s.IsOK()); } } @@ -184,7 +184,8 @@ TEST(Namespace, Add) { unlink(path); Config config; - config.Load(CLIOptions(path)); + auto s = config.Load(CLIOptions(path)); + EXPECT_FALSE(s.IsOK()); config.slot_id_encoded = false; EXPECT_TRUE(!config.AddNamespace("ns", "t0").IsOK()); config.requirepass = "foobared"; @@ -196,15 +197,16 @@ TEST(Namespace, Add) { } for (size_t i = 0; i < namespaces.size(); i++) { std::string token; - config.GetNamespace(namespaces[i], &token); + s = config.GetNamespace(namespaces[i], &token); + EXPECT_TRUE(s.IsOK()); EXPECT_EQ(token, tokens[i]); } for (size_t i = 0; i < namespaces.size(); i++) { - auto s = config.AddNamespace(namespaces[i], tokens[i]); + s = config.AddNamespace(namespaces[i], tokens[i]); EXPECT_FALSE(s.IsOK()); EXPECT_EQ(s.Msg(), "the token has already exists"); } - auto s = config.AddNamespace("n1", "t0"); + s = config.AddNamespace("n1", "t0"); EXPECT_FALSE(s.IsOK()); EXPECT_EQ(s.Msg(), "the namespace has already exists"); @@ -219,14 +221,15 @@ TEST(Namespace, Set) { unlink(path); Config config; - config.Load(CLIOptions(path)); + auto s = config.Load(CLIOptions(path)); + EXPECT_FALSE(s.IsOK()); config.slot_id_encoded = false; config.requirepass = "foobared"; std::vector namespaces = {"n1", "n2", "n3", "n4"}; std::vector tokens = {"t1", "t2", "t3", "t4"}; std::vector new_tokens = {"nt1", "nt2'", "nt3", "nt4"}; for (size_t i = 0; i < namespaces.size(); i++) { - auto s = config.SetNamespace(namespaces[i], tokens[i]); + s = config.SetNamespace(namespaces[i], tokens[i]); EXPECT_FALSE(s.IsOK()); EXPECT_EQ(s.Msg(), "the namespace was not found"); } @@ -235,7 +238,8 @@ TEST(Namespace, Set) { } for (size_t i = 0; i < namespaces.size(); i++) { std::string token; - config.GetNamespace(namespaces[i], &token); + s = config.GetNamespace(namespaces[i], &token); + EXPECT_TRUE(s.IsOK()); EXPECT_EQ(token, tokens[i]); } for (size_t i = 0; i < namespaces.size(); i++) { @@ -243,7 +247,8 @@ TEST(Namespace, Set) { } for (size_t i = 0; i < namespaces.size(); i++) { std::string token; - config.GetNamespace(namespaces[i], &token); + s = config.GetNamespace(namespaces[i], &token); + EXPECT_TRUE(s.IsOK()); EXPECT_EQ(token, new_tokens[i]); } unlink(path); @@ -254,7 +259,8 @@ TEST(Namespace, Delete) { unlink(path); Config config; - config.Load(CLIOptions(path)); + auto s = config.Load(CLIOptions(path)); + EXPECT_FALSE(s.IsOK()); config.slot_id_encoded = false; config.requirepass = "foobared"; std::vector namespaces = {"n1", "n2", "n3", "n4"}; @@ -264,13 +270,16 @@ TEST(Namespace, Delete) { } for (size_t i = 0; i < namespaces.size(); i++) { std::string token; - config.GetNamespace(namespaces[i], &token); + s = config.GetNamespace(namespaces[i], &token); + EXPECT_TRUE(s.IsOK()); EXPECT_EQ(token, tokens[i]); } for (const auto &ns : namespaces) { - config.DelNamespace(ns); + s = config.DelNamespace(ns); + EXPECT_TRUE(s.IsOK()); std::string token; - config.GetNamespace(ns, &token); + s = config.GetNamespace(ns, &token); + EXPECT_FALSE(s.IsOK()); EXPECT_TRUE(token.empty()); } unlink(path); @@ -280,7 +289,8 @@ TEST(Namespace, RewriteNamespaces) { const char *path = "test.conf"; unlink(path); Config config; - config.Load(CLIOptions(path)); + auto s = config.Load(CLIOptions(path)); + EXPECT_FALSE(s.IsOK()); config.requirepass = "test"; config.backup_dir = "test"; config.slot_id_encoded = false; @@ -293,10 +303,12 @@ TEST(Namespace, RewriteNamespaces) { EXPECT_TRUE(config.DelNamespace("to-be-deleted-ns").IsOK()); Config new_config; - auto s = new_config.Load(CLIOptions(path)); + s = new_config.Load(CLIOptions(path)); + EXPECT_TRUE(s.IsOK()); for (size_t i = 0; i < namespaces.size(); i++) { std::string token; - new_config.GetNamespace(namespaces[i], &token); + s = new_config.GetNamespace(namespaces[i], &token); + EXPECT_TRUE(s.IsOK()); EXPECT_EQ(token, tokens[i]); } diff --git a/tests/cppunit/cron_test.cc b/tests/cppunit/cron_test.cc index 98c5d4228b0..69d017b8856 100644 --- a/tests/cppunit/cron_test.cc +++ b/tests/cppunit/cron_test.cc @@ -29,9 +29,10 @@ class CronTest : public testing::Test { explicit CronTest() { cron = std::make_unique(); std::vector schedule{"*", "3", "*", "*", "*"}; - cron->SetScheduleTime(schedule); + auto s = cron->SetScheduleTime(schedule); + EXPECT_TRUE(s.IsOK()); } - ~CronTest() = default; + ~CronTest() override = default; protected: std::unique_ptr cron; diff --git a/utils/kvrocks2redis/main.cc b/utils/kvrocks2redis/main.cc index 03dcd3575d6..415901082bd 100644 --- a/utils/kvrocks2redis/main.cc +++ b/utils/kvrocks2redis/main.cc @@ -88,7 +88,10 @@ static Status createPidFile(const std::string &path) { return Status(Status::NotOK, strerror(errno)); } std::string pid_str = std::to_string(getpid()); - Util::Write(fd, pid_str); + auto s = Util::Write(fd, pid_str); + if (!s.IsOK()) { + return s.Prefixed("failed to write to PID-file"); + } close(fd); return Status::OK(); } diff --git a/utils/kvrocks2redis/redis_writer.cc b/utils/kvrocks2redis/redis_writer.cc index 7b38545dfd0..6d3c845bdcd 100644 --- a/utils/kvrocks2redis/redis_writer.cc +++ b/utils/kvrocks2redis/redis_writer.cc @@ -67,7 +67,10 @@ Status RedisWriter::FlushDB(const std::string &ns) { return s; } - updateNextOffset(ns, 0); + s = updateNextOffset(ns, 0); + if (!s.IsOK()) { + return s; + } s = Write(ns, {Redis::Command2RESP({"FLUSHDB"})}); if (!s.IsOK()) return s; @@ -136,7 +139,11 @@ void RedisWriter::sync() { Stop(); return; } - updateNextOffset(iter.first, next_offsets_[iter.first] + getted_line_leng); + s = updateNextOffset(iter.first, next_offsets_[iter.first] + getted_line_leng); + if (!s.IsOK()) { + LOG(ERROR) << "ERR updating next offset: " << s.Msg(); + break; + } } std::this_thread::sleep_for(std::chrono::milliseconds(1)); } @@ -173,7 +180,11 @@ Status RedisWriter::getRedisConn(const std::string &ns, const std::string &host, Status RedisWriter::authRedis(const std::string &ns, const std::string &auth) { const auto auth_len_str = std::to_string(auth.length()); - Util::SockSend(redis_fds_[ns], "*2" CRLF "$4" CRLF "auth" CRLF "$" + auth_len_str + CRLF + auth + CRLF); + auto s = Util::SockSend(redis_fds_[ns], "*2" CRLF "$4" CRLF "auth" CRLF "$" + auth_len_str + CRLF + auth + CRLF); + if (!s.IsOK()) { + return s.Prefixed("[kvrocks2redis] failed to send AUTH command"); + } + std::string line = GET_OR_RET(Util::SockReadLine(redis_fds_[ns]).Prefixed("read redis auth response err")); if (line.compare(0, 3, "+OK") != 0) { return {Status::NotOK, "[kvrocks2redis] redis Auth failed: " + line}; @@ -184,8 +195,11 @@ Status RedisWriter::authRedis(const std::string &ns, const std::string &auth) { Status RedisWriter::selectDB(const std::string &ns, int db_number) { const auto db_number_str = std::to_string(db_number); const auto db_number_str_len = std::to_string(db_number_str.length()); - Util::SockSend(redis_fds_[ns], - "*2" CRLF "$6" CRLF "select" CRLF "$" + db_number_str_len + CRLF + db_number_str + CRLF); + auto s = Util::SockSend(redis_fds_[ns], + "*2" CRLF "$6" CRLF "select" CRLF "$" + db_number_str_len + CRLF + db_number_str + CRLF); + if (!s.IsOK()) { + return s.Prefixed("failed to send SELECT command to socket"); + } LOG(INFO) << "[kvrocks2redis] select db request was sent, waiting for response"; std::string line = GET_OR_RET(Util::SockReadLine(redis_fds_[ns]).Prefixed("read select db response err")); if (line.compare(0, 3, "+OK") != 0) { @@ -224,8 +238,7 @@ Status RedisWriter::writeNextOffsetToFile(const std::string &ns, std::istream::o offset_string += " "; } offset_string += '\0'; - Util::Pwrite(next_offset_fds_[ns], offset_string, 0); - return Status::OK(); + return Util::Pwrite(next_offset_fds_[ns], offset_string, 0); } std::string RedisWriter::getNextOffsetFilePath(const std::string &ns) { diff --git a/utils/kvrocks2redis/sync.cc b/utils/kvrocks2redis/sync.cc index 97352551546..bda9d89978e 100644 --- a/utils/kvrocks2redis/sync.cc +++ b/utils/kvrocks2redis/sync.cc @@ -180,7 +180,10 @@ Status Sync::incrementBatchLoop() { auto bat = rocksdb::WriteBatch(bulk_data_str); int count = static_cast(bat.Count()); parser_->ParseWriteBatch(bulk_data_str); - updateNextSeq(next_seq_ + count); + auto s = updateNextSeq(next_seq_ + count); + if (!s.IsOK()) { + return s.Prefixed("failed to update next sequence"); + } } evbuffer_drain(evbuf, incr_bulk_len_ + 2); incr_state_ = Incr_batch_size; @@ -209,7 +212,11 @@ void Sync::parseKVFromLocalStorage() { LOG(ERROR) << "[kvrocks2redis] Failed to parse full db, encounter error: " << s.Msg(); return; } - updateNextSeq(storage_->LatestSeq() + 1); + + s = updateNextSeq(storage_->LatestSeq() + 1); + if (!s.IsOK()) { + LOG(ERROR) << "[kvrocks2redis] Failed to update next sequence: " << s.Msg(); + } } Status Sync::updateNextSeq(rocksdb::SequenceNumber seq) { @@ -242,6 +249,5 @@ Status Sync::writeNextSeqToFile(rocksdb::SequenceNumber seq) { seq_string += " "; } seq_string += '\0'; - Util::Pwrite(next_seq_fd_, seq_string, 0); - return Status::OK(); + return Util::Pwrite(next_seq_fd_, seq_string, 0); } From cef7943a8cb2ed06152a1111cc816549d2651edc Mon Sep 17 00:00:00 2001 From: Yaroslav Stepanchuk Date: Sun, 1 Jan 2023 10:01:08 +0200 Subject: [PATCH 2/2] Add compiler option: treat unused-result warning as errors --- CMakeLists.txt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index d4a89c26ee4..a63f5ddcc6c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -177,8 +177,7 @@ add_library(kvrocks_objs OBJECT ${KVROCKS_SRCS}) target_include_directories(kvrocks_objs PUBLIC src src/common ${PROJECT_BINARY_DIR}) target_compile_features(kvrocks_objs PUBLIC cxx_std_17) -# TODO: Add -Werror=unused-result to compile options -target_compile_options(kvrocks_objs PUBLIC -Wall -Wpedantic -Wsign-compare -Wreturn-type -fno-omit-frame-pointer) +target_compile_options(kvrocks_objs PUBLIC -Wall -Wpedantic -Wsign-compare -Wreturn-type -fno-omit-frame-pointer -Werror=unused-result) if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU") target_compile_options(kvrocks_objs PUBLIC -Wno-pedantic) elseif((CMAKE_CXX_COMPILER_ID STREQUAL "Clang") OR (CMAKE_CXX_COMPILER_ID STREQUAL "AppleClang"))