Skip to content

Commit

Permalink
Fix part of the clang-tidy warning in cluster and common (#1075)
Browse files Browse the repository at this point in the history
  • Loading branch information
tanruixiang authored and PragmaTwice committed Nov 9, 2022
1 parent e902221 commit 2ba60cf
Show file tree
Hide file tree
Showing 9 changed files with 144 additions and 109 deletions.
39 changes: 22 additions & 17 deletions src/cluster/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,17 @@ const char *errInvalidImportState = "Invalid import state";

ClusterNode::ClusterNode(std::string id, std::string host, int port, int role, std::string master_id,
std::bitset<kClusterSlots> slots)
: id_(id), host_(host), port_(port), role_(role), master_id_(master_id), slots_(slots) {}
: id_(std::move(id)),
host_(std::move(host)),
port_(port),
role_(role),
master_id_(std::move(master_id)),
slots_(slots) {}

Cluster::Cluster(Server *svr, std::vector<std::string> binds, int port)
: svr_(svr), binds_(binds), port_(port), size_(0), version_(-1), myself_(nullptr) {
for (unsigned i = 0; i < kClusterSlots; i++) {
slots_nodes_[i] = nullptr;
: svr_(svr), binds_(std::move(binds)), port_(port), size_(0), version_(-1), myself_(nullptr) {
for (auto &slots_node : slots_nodes_) {
slots_node = nullptr;
}
}

Expand All @@ -70,7 +75,7 @@ bool Cluster::SubCommandIsExecExclusive(const std::string &subcommand) {
return false;
}

Status Cluster::SetNodeId(std::string node_id) {
Status Cluster::SetNodeId(const std::string &node_id) {
if (node_id.size() != kClusterNodeIdLen) {
return Status(Status::ClusterInvalidInfo, errInvalidNodeID);
}
Expand Down Expand Up @@ -100,7 +105,7 @@ Status Cluster::SetNodeId(std::string node_id) {
// This is different with CLUSTERX SETNODES commands because it uses new version
// topology to cover current version, it allows kvrocks nodes lost some topology
// updates since of network failure, it is state instead of operation.
Status Cluster::SetSlot(int slot, std::string node_id, int64_t new_version) {
Status Cluster::SetSlot(int slot, const std::string &node_id, int64_t new_version) {
// Parameters check
if (new_version <= 0 || new_version != version_ + 1) {
return Status(Status::NotOK, errInvalidClusterVersion);
Expand Down Expand Up @@ -130,9 +135,9 @@ Status Cluster::SetSlot(int slot, std::string node_id, int64_t new_version) {
// 3. Update the map of slots to nodes.
std::shared_ptr<ClusterNode> old_node = slots_nodes_[slot];
if (old_node != nullptr) {
old_node->slots_[slot] = 0;
old_node->slots_[slot] = false;
}
to_assign_node->slots_[slot] = 1;
to_assign_node->slots_[slot] = true;
slots_nodes_[slot] = to_assign_node;

// Clear data of migrated slot or record of imported slot
Expand Down Expand Up @@ -236,7 +241,7 @@ void Cluster::SetMasterSlaveRepl() {
} else if (nodes_.find(myself_->master_id_) != nodes_.end()) {
// Slave mode and master node is existing
std::shared_ptr<ClusterNode> master = nodes_[myself_->master_id_];
Status s = svr_->AddMaster(master->host_, master->port_, 0);
Status s = svr_->AddMaster(master->host_, master->port_, false);
if (s.IsOK()) {
LOG(INFO) << "SLAVE OF " << master->host_ << ":" << master->port_ << " enabled by cluster topology setting";
} else {
Expand Down Expand Up @@ -348,8 +353,8 @@ Status Cluster::GetClusterInfo(std::string *cluster_infos) {
cluster_infos->clear();

int ok_slot = 0;
for (int i = 0; i < kClusterSlots; i++) {
if (slots_nodes_[i] != nullptr) ok_slot++;
for (auto &slots_node : slots_nodes_) {
if (slots_node != nullptr) ok_slot++;
}

*cluster_infos =
Expand Down Expand Up @@ -425,7 +430,7 @@ Status Cluster::GetSlotsInfo(std::vector<SlotInfo> *slots_infos) {
return Status::OK();
}

SlotInfo Cluster::GenSlotNodeInfo(int start, int end, std::shared_ptr<ClusterNode> n) {
SlotInfo Cluster::GenSlotNodeInfo(int start, int end, const std::shared_ptr<ClusterNode> &n) {
std::vector<SlotInfo::NodeInfo> vn;
vn.push_back({n->host_, n->port_, n->id_}); // itself

Expand Down Expand Up @@ -539,7 +544,7 @@ Status Cluster::ParseClusterNodes(const std::string &nodes_str, ClusterNodes *no
int port = *parse_result;

// 4) role
int role;
int role = 0;
if (strcasecmp(fields[3].c_str(), "master") == 0) {
role = kClusterMaster;
} else if (strcasecmp(fields[3].c_str(), "slave") == 0 || strcasecmp(fields[3].c_str(), "replica") == 0) {
Expand Down Expand Up @@ -569,15 +574,15 @@ Status Cluster::ParseClusterNodes(const std::string &nodes_str, ClusterNodes *no
// 6) slot info
auto valid_range = NumericRange<int>{0, kClusterSlots - 1};
for (unsigned i = 5; i < fields.size(); i++) {
int start, stop;
int start = 0, stop = 0;
std::vector<std::string> ranges = Util::Split(fields[i], "-");
if (ranges.size() == 1) {
auto parse_result = ParseInt<int>(ranges[0], valid_range, 10);
if (!parse_result) {
return Status(Status::ClusterInvalidInfo, errSlotOutOfRange);
}
start = *parse_result;
slots.set(start, 1);
slots.set(start, true);
if (role == kClusterMaster) {
if (slots_nodes->find(start) != slots_nodes->end()) {
return Status(Status::ClusterInvalidInfo, errSlotOverlapped);
Expand All @@ -594,7 +599,7 @@ Status Cluster::ParseClusterNodes(const std::string &nodes_str, ClusterNodes *no
start = *parse_start;
stop = *parse_stop;
for (int j = start; j <= stop; j++) {
slots.set(j, 1);
slots.set(j, true);
if (role == kClusterMaster) {
if (slots_nodes->find(j) != slots_nodes->end()) {
return Status(Status::ClusterInvalidInfo, errSlotOverlapped);
Expand Down Expand Up @@ -624,7 +629,7 @@ bool Cluster::IsWriteForbiddenSlot(int slot) {
Status Cluster::CanExecByMySelf(const Redis::CommandAttributes *attributes, const std::vector<std::string> &cmd_tokens,
Redis::Connection *conn) {
std::vector<int> keys_indexes;
auto s = Redis::GetKeysFromCommand(attributes->name, cmd_tokens.size(), &keys_indexes);
auto s = Redis::GetKeysFromCommand(attributes->name, static_cast<int>(cmd_tokens.size()), &keys_indexes);
// No keys
if (!s.IsOK()) return Status::OK();
if (keys_indexes.size() == 0) return Status::OK();
Expand Down
6 changes: 3 additions & 3 deletions src/cluster/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ class Cluster {
explicit Cluster(Server *svr, std::vector<std::string> binds, int port);
Status SetClusterNodes(const std::string &nodes_str, int64_t version, bool force);
Status GetClusterNodes(std::string *nodes_str);
Status SetNodeId(std::string node_id);
Status SetSlot(int slot, std::string node_id, int64_t version);
Status SetNodeId(const std::string &node_id);
Status SetSlot(int slot, const std::string &node_id, int64_t version);
Status SetSlotMigrated(int slot, const std::string &ip_port);
Status SetSlotImported(int slot);
Status GetSlotsInfo(std::vector<SlotInfo> *slot_infos);
Expand All @@ -99,7 +99,7 @@ class Cluster {

private:
std::string GenNodesDescription();
SlotInfo GenSlotNodeInfo(int start, int end, std::shared_ptr<ClusterNode> n);
SlotInfo GenSlotNodeInfo(int start, int end, const std::shared_ptr<ClusterNode> &n);
Status ParseClusterNodes(const std::string &nodes_str, ClusterNodes *nodes,
std::unordered_map<int, std::string> *slots_nodes);
Server *svr_;
Expand Down
6 changes: 3 additions & 3 deletions src/cluster/redis_slot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ static const uint16_t crc16tab[256] = {
0x2e93, 0x3eb2, 0x0ed1, 0x1ef0};

uint16_t crc16(const char *buf, int len) {
int i;
int i = 0;
uint16_t crc = 0;
for (i = 0; i < len; i++) crc = (crc << 8) ^ crc16tab[((crc >> 8) ^ *buf++) & 0x00FF];
return crc;
Expand All @@ -67,9 +67,9 @@ uint16_t GetSlotNumFromKey(const std::string &key) {
}

std::string GetTagFromKey(const std::string &key) {
auto left_pos = key.find("{");
auto left_pos = key.find('{');
if (left_pos == std::string::npos) return std::string();
auto right_pos = key.find("}", left_pos + 1);
auto right_pos = key.find('}', left_pos + 1);
// Note that we hash the whole key if there is nothing between {}.
if (right_pos == std::string::npos || right_pos <= left_pos + 1) {
return std::string();
Expand Down
22 changes: 11 additions & 11 deletions src/cluster/replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ void send_string(bufferevent *bev, const std::string &data) {
void ReplicationThread::CallbacksStateMachine::ConnEventCB(bufferevent *bev, int16_t events, void *state_machine_ptr) {
if (events & BEV_EVENT_CONNECTED) {
// call write_cb when connected
bufferevent_data_cb write_cb;
bufferevent_data_cb write_cb = nullptr;
bufferevent_getcb(bev, nullptr, &write_cb, nullptr, nullptr);
if (write_cb) write_cb(bev, state_machine_ptr);
return;
Expand Down Expand Up @@ -219,7 +219,7 @@ void ReplicationThread::CallbacksStateMachine::EvCallback(bufferevent *bev, void
}

void ReplicationThread::CallbacksStateMachine::Start() {
int cfd;
int cfd = 0;
struct bufferevent *bev = nullptr;

if (handlers_.empty()) {
Expand Down Expand Up @@ -524,7 +524,7 @@ ReplicationThread::CBState ReplicationThread::incrementBatchLoopCB(bufferevent *
case Incr_batch_data:
// Read bulk data (batch data)
if (self->incr_bulk_len_ + 2 <= evbuffer_get_length(input)) { // We got enough data
bulk_data = reinterpret_cast<char *>(evbuffer_pullup(input, self->incr_bulk_len_ + 2));
bulk_data = reinterpret_cast<char *>(evbuffer_pullup(input, static_cast<ssize_t>(self->incr_bulk_len_ + 2)));
std::string bulk_string = std::string(bulk_data, self->incr_bulk_len_);
// master would send the ping heartbeat packet to check whether the slave was alive or not,
// don't write ping to db here.
Expand Down Expand Up @@ -614,7 +614,7 @@ ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev, v
return CBState::RESTART;
}
std::vector<std::string> need_files = Util::Split(std::string(line.get()), ",");
for (auto f : need_files) {
for (const auto &f : need_files) {
meta.files.emplace_back(f, 0);
}

Expand Down Expand Up @@ -698,7 +698,7 @@ Status ReplicationThread::parallelFetchFile(const std::string &dir,
if (this->stop_flag_) {
return Status(Status::NotOK, "replication thread was stopped");
}
int sock_fd;
int sock_fd = 0;
Status s = Util::SockConnect(this->host_, this->port_, &sock_fd);
if (!s.IsOK()) {
return Status(Status::NotOK, "connect the server err: " + s.Msg());
Expand Down Expand Up @@ -730,7 +730,7 @@ Status ReplicationThread::parallelFetchFile(const std::string &dir,
crcs.push_back(f_crc);
}
unsigned files_count = files.size();
fetch_file_callback fn = [&fetch_cnt, &skip_cnt, files_count](const std::string fetch_file,
fetch_file_callback fn = [&fetch_cnt, &skip_cnt, files_count](const std::string &fetch_file,
const uint32_t fetch_crc) {
fetch_cnt.fetch_add(1);
uint32_t cur_skip_cnt = skip_cnt.load();
Expand Down Expand Up @@ -787,9 +787,9 @@ Status ReplicationThread::sendAuth(int sock_fd) {
return Status::OK();
}

Status ReplicationThread::fetchFile(int sock_fd, evbuffer *evbuf, const std::string &dir, std::string file,
uint32_t crc, fetch_file_callback fn) {
size_t file_size;
Status ReplicationThread::fetchFile(int sock_fd, evbuffer *evbuf, const std::string &dir, const std::string &file,
uint32_t crc, const fetch_file_callback &fn) {
size_t file_size = 0;

// Read file size line
while (true) {
Expand Down Expand Up @@ -849,9 +849,9 @@ Status ReplicationThread::fetchFile(int sock_fd, evbuffer *evbuf, const std::str
}

Status ReplicationThread::fetchFiles(int sock_fd, const std::string &dir, const std::vector<std::string> &files,
const std::vector<uint32_t> &crcs, fetch_file_callback fn) {
const std::vector<uint32_t> &crcs, const fetch_file_callback &fn) {
std::string files_str;
for (auto file : files) {
for (const auto &file : files) {
files_str += file;
files_str.push_back(',');
}
Expand Down
6 changes: 3 additions & 3 deletions src/cluster/replication.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,10 +186,10 @@ class ReplicationThread {

// Synchronized-Blocking ops
Status sendAuth(int sock_fd);
Status fetchFile(int sock_fd, evbuffer *evbuf, const std::string &dir, const std::string file, uint32_t crc,
fetch_file_callback fn);
Status fetchFile(int sock_fd, evbuffer *evbuf, const std::string &dir, const std::string &file, uint32_t crc,
const fetch_file_callback &fn);
Status fetchFiles(int sock_fd, const std::string &dir, const std::vector<std::string> &files,
const std::vector<uint32_t> &crcs, fetch_file_callback fn);
const std::vector<uint32_t> &crcs, const fetch_file_callback &fn);
Status parallelFetchFile(const std::string &dir, const std::vector<std::pair<std::string, uint32_t>> &files);
static bool isRestoringError(const char *err);
static bool isWrongPsyncNum(const char *err);
Expand Down
26 changes: 13 additions & 13 deletions src/cluster/slot_migrate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ SlotMigrate::SlotMigrate(Server *svr, int speed, int pipeline_size, int seq_gap)
}
}

Status SlotMigrate::MigrateStart(Server *svr, const std::string &node_id, const std::string dst_ip, int dst_port,
Status SlotMigrate::MigrateStart(Server *svr, const std::string &node_id, const std::string &dst_ip, int dst_port,
int slot, int speed, int pipeline_size, int seq_gap) {
// Only one slot migration job at the same time
int16_t no_slot = -1;
Expand Down Expand Up @@ -127,7 +127,7 @@ SlotMigrate::~SlotMigrate() {
}
}

Status SlotMigrate::CreateMigrateHandleThread(void) {
Status SlotMigrate::CreateMigrateHandleThread() {
try {
t_ = std::thread([this]() {
Util::ThreadSetName("slot-migrate");
Expand Down Expand Up @@ -166,7 +166,7 @@ void SlotMigrate::Loop() {
}
}

void SlotMigrate::StateMachine(void) {
void SlotMigrate::StateMachine() {
state_machine_ = kSlotMigrateStart;
while (true) {
if (IsTerminated()) {
Expand Down Expand Up @@ -236,7 +236,7 @@ void SlotMigrate::StateMachine(void) {
}
}

Status SlotMigrate::Start(void) {
Status SlotMigrate::Start() {
// Get snapshot and sequence
slot_snapshot_ = storage_->GetDB()->GetSnapshot();
if (slot_snapshot_ == nullptr) {
Expand Down Expand Up @@ -275,7 +275,7 @@ Status SlotMigrate::Start(void) {
return Status::OK();
}

Status SlotMigrate::SendSnapshot(void) {
Status SlotMigrate::SendSnapshot() {
// Create DB iter of snapshot
uint64_t migratedkey_cnt = 0, expiredkey_cnt = 0, emptykey_cnt = 0;
std::string restore_cmds;
Expand Down Expand Up @@ -338,7 +338,7 @@ Status SlotMigrate::SendSnapshot(void) {
return Status::OK();
}

Status SlotMigrate::SyncWal(void) {
Status SlotMigrate::SyncWal() {
// Send incremental data in wal circularly until new increment less than a certain amount
auto s = SyncWalBeforeForbidSlot();
if (!s.IsOK()) {
Expand All @@ -355,7 +355,7 @@ Status SlotMigrate::SyncWal(void) {
return Status::OK();
}

Status SlotMigrate::Success(void) {
Status SlotMigrate::Success() {
if (stop_migrate_) {
LOG(ERROR) << "[migrate] Stop migrating slot " << migrate_slot_;
return Status(Status::NotOK);
Expand All @@ -375,7 +375,7 @@ Status SlotMigrate::Success(void) {
return Status::OK();
}

Status SlotMigrate::Fail(void) {
Status SlotMigrate::Fail() {
// Set destination status
if (!SetDstImportStatus(slot_job_->slot_fd_, kImportFailed)) {
LOG(INFO) << "[migrate] Failed to notify the destination that data migration failed";
Expand All @@ -386,7 +386,7 @@ Status SlotMigrate::Fail(void) {
return Status::OK();
}

Status SlotMigrate::Clean(void) {
Status SlotMigrate::Clean() {
LOG(INFO) << "[migrate] Clean resources of migrating slot " << migrate_slot_;
if (slot_snapshot_) {
storage_->GetDB()->ReleaseSnapshot(slot_snapshot_);
Expand All @@ -404,7 +404,7 @@ Status SlotMigrate::Clean(void) {
return Status::OK();
}

bool SlotMigrate::AuthDstServer(int sock_fd, std::string password) {
bool SlotMigrate::AuthDstServer(int sock_fd, const std::string &password) {
std::string cmd = Redis::MultiBulkString({"auth", password}, false);
auto s = Util::SockSend(sock_fd, cmd);
if (!s.IsOK()) {
Expand Down Expand Up @@ -718,7 +718,7 @@ bool SlotMigrate::MigrateComplexKey(const rocksdb::Slice &key, const Metadata &m

bool SlotMigrate::MigrateBitmapKey(const InternalKey &inkey, std::unique_ptr<rocksdb::Iterator> *iter,
std::vector<std::string> *user_cmd, std::string *restore_cmds) {
uint32_t index, offset;
uint32_t index = 0, offset = 0;
std::string index_str = inkey.GetSubKey().ToString();
std::string fragment = (*iter)->value().ToString();
auto parse_result = ParseInt<int>(index_str, 10);
Expand Down Expand Up @@ -803,7 +803,7 @@ void SlotMigrate::ReleaseForbiddenSlot() {
forbidden_slot_ = -1;
}

void SlotMigrate::MigrateSpeedLimit(void) {
void SlotMigrate::MigrateSpeedLimit() {
if (migrate_speed_ > 0) {
uint64_t current_time = Util::GetTimeStampUS();
uint64_t per_request_time = 1000000 * pipeline_size_limit_ / migrate_speed_;
Expand Down Expand Up @@ -892,7 +892,7 @@ Status SlotMigrate::MigrateIncrementData(std::unique_ptr<rocksdb::TransactionLog
return Status::OK();
}

Status SlotMigrate::SyncWalBeforeForbidSlot(void) {
Status SlotMigrate::SyncWalBeforeForbidSlot() {
uint32_t count = 0;
while (count < kMaxLoopTimes) {
wal_increment_seq_ = storage_->GetDB()->GetLatestSequenceNumber();
Expand Down
4 changes: 2 additions & 2 deletions src/cluster/slot_migrate.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class SlotMigrate : public Redis::Database {

Status CreateMigrateHandleThread(void);
void Loop();
Status MigrateStart(Server *svr, const std::string &node_id, const std::string dst_ip, int dst_port, int slot,
Status MigrateStart(Server *svr, const std::string &node_id, const std::string &dst_ip, int dst_port, int slot,
int speed, int pipeline_size, int seq_gap);
void ReleaseForbiddenSlot();
void SetMigrateSpeedLimit(int speed) {
Expand Down Expand Up @@ -113,7 +113,7 @@ class SlotMigrate : public Redis::Database {
Status Fail(void);
Status Clean(void);

bool AuthDstServer(int sock_fd, std::string password);
bool AuthDstServer(int sock_fd, const std::string &password);
bool SetDstImportStatus(int sock_fd, int status);
bool CheckResponseOnce(int sock_fd);
bool CheckResponseWithCounts(int sock_fd, int total);
Expand Down
Loading

0 comments on commit 2ba60cf

Please sign in to comment.