Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unify server's shorten form, change svr to srv #1861

Merged
merged 1 commit into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 21 additions & 21 deletions src/cluster/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ ClusterNode::ClusterNode(std::string id, std::string host, int port, int role, s
std::bitset<kClusterSlots> slots)
: id(std::move(id)), host(std::move(host)), port(port), role(role), master_id(std::move(master_id)), slots(slots) {}

Cluster::Cluster(Server *svr, std::vector<std::string> binds, int port)
: svr_(svr), binds_(std::move(binds)), port_(port), size_(0), version_(-1), myself_(nullptr) {
Cluster::Cluster(Server *srv, std::vector<std::string> binds, int port)
: srv_(srv), binds_(std::move(binds)), port_(port), size_(0), version_(-1), myself_(nullptr) {
for (auto &slots_node : slots_nodes_) {
slots_node = nullptr;
}
Expand Down Expand Up @@ -127,7 +127,7 @@ Status Cluster::SetSlotRanges(const std::vector<SlotRange> &slot_ranges, const s
if (old_node == myself_ && old_node != to_assign_node) {
// If slot is migrated from this node
if (migrated_slots_.count(slot) > 0) {
auto s = svr_->slot_migrator->ClearKeysOfSlot(kDefaultNamespace, slot);
auto s = srv_->slot_migrator->ClearKeysOfSlot(kDefaultNamespace, slot);
if (!s.ok()) {
LOG(ERROR) << "failed to clear data of migrated slot: " << s.ToString();
}
Expand Down Expand Up @@ -212,7 +212,7 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b
if (!migrated_slots_.empty()) {
for (auto &it : migrated_slots_) {
if (slots_nodes_[it.first] != myself_) {
auto s = svr_->slot_migrator->ClearKeysOfSlot(kDefaultNamespace, it.first);
auto s = srv_->slot_migrator->ClearKeysOfSlot(kDefaultNamespace, it.first);
if (!s.ok()) {
LOG(ERROR) << "failed to clear data of migrated slots: " << s.ToString();
}
Expand All @@ -228,21 +228,21 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b

// Set replication relationship by cluster topology setting
Status Cluster::SetMasterSlaveRepl() {
if (!svr_) return Status::OK();
if (!srv_) return Status::OK();

if (!myself_) return Status::OK();

if (myself_->role == kClusterMaster) {
// Master mode
auto s = svr_->RemoveMaster();
auto s = srv_->RemoveMaster();
if (!s.IsOK()) {
return s.Prefixed("failed to remove master");
}
LOG(INFO) << "MASTER MODE enabled by cluster topology setting";
} else if (nodes_.find(myself_->master_id) != nodes_.end()) {
// Replica mode and master node is existing
std::shared_ptr<ClusterNode> master = nodes_[myself_->master_id];
auto s = svr_->AddMaster(master->host, master->port, false);
auto s = srv_->AddMaster(master->host, master->port, false);
if (!s.IsOK()) {
LOG(WARNING) << "SLAVE OF " << master->host << ":" << master->port
<< " wasn't enabled by cluster topology setting, encounter error: " << s.Msg();
Expand All @@ -254,7 +254,7 @@ Status Cluster::SetMasterSlaveRepl() {
return Status::OK();
}

bool Cluster::IsNotMaster() { return myself_ == nullptr || myself_->role != kClusterMaster || svr_->IsSlave(); }
bool Cluster::IsNotMaster() { return myself_ == nullptr || myself_->role != kClusterMaster || srv_->IsSlave(); }

Status Cluster::SetSlotMigrated(int slot, const std::string &ip_port) {
if (!IsValidSlot(slot)) {
Expand All @@ -264,7 +264,7 @@ Status Cluster::SetSlotMigrated(int slot, const std::string &ip_port) {
// It is called by slot-migrating thread which is an asynchronous thread.
// Therefore, it should be locked when a record is added to 'migrated_slots_'
// which will be accessed when executing commands.
auto exclusivity = svr_->WorkExclusivityGuard();
auto exclusivity = srv_->WorkExclusivityGuard();
migrated_slots_[slot] = ip_port;
return Status::OK();
}
Expand Down Expand Up @@ -306,7 +306,7 @@ Status Cluster::MigrateSlot(int slot, const std::string &dst_node_id, SyncMigrat
}

const auto &dst = nodes_[dst_node_id];
Status s = svr_->slot_migrator->PerformSlotMigration(dst_node_id, dst->host, dst->port, slot, blocking_ctx);
Status s = srv_->slot_migrator->PerformSlotMigration(dst_node_id, dst->host, dst->port, slot, blocking_ctx);
return s;
}

Expand All @@ -321,34 +321,34 @@ Status Cluster::ImportSlot(redis::Connection *conn, int slot, int state) {

switch (state) {
case kImportStart:
if (!svr_->slot_import->Start(conn->GetFD(), slot)) {
if (!srv_->slot_import->Start(conn->GetFD(), slot)) {
return {Status::NotOK, fmt::format("Can't start importing slot {}", slot)};
}

// Set link importing
conn->SetImporting();
myself_->importing_slot = slot;
// Set link error callback
conn->close_cb = [object_ptr = svr_->slot_import.get(), capture_fd = conn->GetFD()](int fd) {
conn->close_cb = [object_ptr = srv_->slot_import.get(), capture_fd = conn->GetFD()](int fd) {
object_ptr->StopForLinkError(capture_fd);
};
// Stop forbidding writing slot to accept write commands
if (slot == svr_->slot_migrator->GetForbiddenSlot()) svr_->slot_migrator->ReleaseForbiddenSlot();
if (slot == srv_->slot_migrator->GetForbiddenSlot()) srv_->slot_migrator->ReleaseForbiddenSlot();
LOG(INFO) << "[import] Start importing slot " << slot;
break;
case kImportSuccess:
if (!svr_->slot_import->Success(slot)) {
if (!srv_->slot_import->Success(slot)) {
LOG(ERROR) << "[import] Failed to set slot importing success, maybe slot is wrong"
<< ", received slot: " << slot << ", current slot: " << svr_->slot_import->GetSlot();
<< ", received slot: " << slot << ", current slot: " << srv_->slot_import->GetSlot();
return {Status::NotOK, fmt::format("Failed to set slot {} importing success", slot)};
}

LOG(INFO) << "[import] Succeed to import slot " << slot;
break;
case kImportFailed:
if (!svr_->slot_import->Fail(slot)) {
if (!srv_->slot_import->Fail(slot)) {
LOG(ERROR) << "[import] Failed to set slot importing error, maybe slot is wrong"
<< ", received slot: " << slot << ", current slot: " << svr_->slot_import->GetSlot();
<< ", received slot: " << slot << ", current slot: " << srv_->slot_import->GetSlot();
return {Status::NotOK, fmt::format("Failed to set slot {} importing error", slot)};
}

Expand Down Expand Up @@ -395,15 +395,15 @@ Status Cluster::GetClusterInfo(std::string *cluster_infos) {
"cluster_my_epoch:" +
std::to_string(version_) + "\r\n";

if (myself_ != nullptr && myself_->role == kClusterMaster && !svr_->IsSlave()) {
if (myself_ != nullptr && myself_->role == kClusterMaster && !srv_->IsSlave()) {
// Get migrating status
std::string migrate_infos;
svr_->slot_migrator->GetMigrationInfo(&migrate_infos);
srv_->slot_migrator->GetMigrationInfo(&migrate_infos);
*cluster_infos += migrate_infos;

// Get importing status
std::string import_infos;
svr_->slot_import->GetImportInfo(&import_infos);
srv_->slot_import->GetImportInfo(&import_infos);
*cluster_infos += import_infos;
}

Expand Down Expand Up @@ -743,7 +743,7 @@ Status Cluster::parseClusterNodes(const std::string &nodes_str, ClusterNodes *no
return Status::OK();
}

bool Cluster::IsWriteForbiddenSlot(int slot) { return svr_->slot_migrator->GetForbiddenSlot() == slot; }
bool Cluster::IsWriteForbiddenSlot(int slot) { return srv_->slot_migrator->GetForbiddenSlot() == slot; }

Status Cluster::CanExecByMySelf(const redis::CommandAttributes *attributes, const std::vector<std::string> &cmd_tokens,
redis::Connection *conn) {
Expand Down
4 changes: 2 additions & 2 deletions src/cluster/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class SyncMigrateContext;

class Cluster {
public:
explicit Cluster(Server *svr, std::vector<std::string> binds, int port);
explicit Cluster(Server *srv, std::vector<std::string> binds, int port);
Status SetClusterNodes(const std::string &nodes_str, int64_t version, bool force);
Status GetClusterNodes(std::string *nodes_str);
Status SetNodeId(const std::string &node_id);
Expand Down Expand Up @@ -99,7 +99,7 @@ class Cluster {
SlotInfo genSlotNodeInfo(int start, int end, const std::shared_ptr<ClusterNode> &n);
static Status parseClusterNodes(const std::string &nodes_str, ClusterNodes *nodes,
std::unordered_map<int, std::string> *slots_nodes);
Server *svr_;
Server *srv_;
std::vector<std::string> binds_;
int port_;
int size_;
Expand Down
10 changes: 5 additions & 5 deletions src/cluster/slot_import.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@

#include "slot_import.h"

SlotImport::SlotImport(Server *svr)
: Database(svr->storage, kDefaultNamespace),
svr_(svr),
SlotImport::SlotImport(Server *srv)
: Database(srv->storage, kDefaultNamespace),
srv_(srv),
import_slot_(-1),
import_status_(kImportNone),
import_fd_(-1) {
Expand Down Expand Up @@ -62,7 +62,7 @@ bool SlotImport::Success(int slot) {
return false;
}

Status s = svr_->cluster->SetSlotImported(import_slot_);
Status s = srv_->cluster->SetSlotImported(import_slot_);
if (!s.IsOK()) {
LOG(ERROR) << "[import] Failed to set slot, Err: " << s.Msg();
return false;
Expand Down Expand Up @@ -107,7 +107,7 @@ void SlotImport::StopForLinkError(int fd) {
// 4. ClearKeysOfSlot can clear data although server is a slave, because ClearKeysOfSlot
// deletes data in rocksdb directly. Therefore, it is necessary to avoid clearing data gotten
// from new master.
if (!svr_->IsSlave()) {
if (!srv_->IsSlave()) {
// Clean imported slot data
auto s = ClearKeysOfSlot(namespace_, import_slot_);
if (!s.ok()) {
Expand Down
4 changes: 2 additions & 2 deletions src/cluster/slot_import.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ enum ImportStatus {

class SlotImport : public redis::Database {
public:
explicit SlotImport(Server *svr);
explicit SlotImport(Server *srv);
~SlotImport() = default;

bool Start(int fd, int slot);
Expand All @@ -51,7 +51,7 @@ class SlotImport : public redis::Database {
void GetImportInfo(std::string *info);

private:
Server *svr_ = nullptr;
Server *srv_ = nullptr;
std::mutex mutex_;
int import_slot_;
int import_status_;
Expand Down
18 changes: 9 additions & 9 deletions src/cluster/slot_migrate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ static std::map<RedisType, std::string> type_to_cmd = {
{kRedisZSet, "zadd"}, {kRedisBitmap, "setbit"}, {kRedisSortedint, "siadd"}, {kRedisStream, "xadd"},
};

SlotMigrator::SlotMigrator(Server *svr, int max_migration_speed, int max_pipeline_size, int seq_gap_limit)
: Database(svr->storage, kDefaultNamespace), svr_(svr) {
SlotMigrator::SlotMigrator(Server *srv, int max_migration_speed, int max_pipeline_size, int seq_gap_limit)
: Database(srv->storage, kDefaultNamespace), srv_(srv) {
// Let metadata_cf_handle_ be nullptr, and get them in real time to avoid accessing invalid pointer,
// because metadata_cf_handle_ and db_ will be destroyed if DB is reopened.
// [Situation]:
Expand Down Expand Up @@ -71,7 +71,7 @@ SlotMigrator::SlotMigrator(Server *svr, int max_migration_speed, int max_pipelin
seq_gap_limit_ = seq_gap_limit;
}

if (svr->IsSlave()) {
if (srv->IsSlave()) {
SetStopMigrationFlag(true);
}
}
Expand All @@ -92,9 +92,9 @@ Status SlotMigrator::PerformSlotMigration(const std::string &node_id, std::strin

migration_state_ = MigrationState::kStarted;

auto speed = svr_->GetConfig()->migrate_speed;
auto seq_gap = svr_->GetConfig()->sequence_gap;
auto pipeline_size = svr_->GetConfig()->pipeline_size;
auto speed = srv_->GetConfig()->migrate_speed;
auto seq_gap = srv_->GetConfig()->sequence_gap;
auto pipeline_size = srv_->GetConfig()->pipeline_size;

if (speed <= 0) {
speed = 0;
Expand Down Expand Up @@ -279,7 +279,7 @@ Status SlotMigrator::startMigration() {
dst_fd_.Reset(*result);

// Auth first
std::string pass = svr_->GetConfig()->requirepass;
std::string pass = srv_->GetConfig()->requirepass;
if (!pass.empty()) {
auto s = authOnDstNode(*dst_fd_, pass);
if (!s.IsOK()) {
Expand Down Expand Up @@ -396,7 +396,7 @@ Status SlotMigrator::finishSuccessfulMigration() {
}

std::string dst_ip_port = dst_ip_ + ":" + std::to_string(dst_port_);
s = svr_->cluster->SetSlotMigrated(migrating_slot_, dst_ip_port);
s = srv_->cluster->SetSlotMigrated(migrating_slot_, dst_ip_port);
if (!s.IsOK()) {
return s.Prefixed(fmt::format("failed to set slot {} as migrated to {}", migrating_slot_.load(), dst_ip_port));
}
Expand Down Expand Up @@ -896,7 +896,7 @@ void SlotMigrator::setForbiddenSlot(int16_t slot) {
// Block server to set forbidden slot
uint64_t during = util::GetTimeStampUS();
{
auto exclusivity = svr_->WorkExclusivityGuard();
auto exclusivity = srv_->WorkExclusivityGuard();
forbidden_slot_ = slot;
}
during = util::GetTimeStampUS() - during;
Expand Down
4 changes: 2 additions & 2 deletions src/cluster/slot_migrate.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class SyncMigrateContext;

class SlotMigrator : public redis::Database {
public:
explicit SlotMigrator(Server *svr, int max_migration_speed = kDefaultMaxMigrationSpeed,
explicit SlotMigrator(Server *srv, int max_migration_speed = kDefaultMaxMigrationSpeed,
int max_pipeline_size = kDefaultMaxPipelineSize, int seq_gap_limit = kDefaultSequenceGapLimit);
SlotMigrator(const SlotMigrator &other) = delete;
SlotMigrator &operator=(const SlotMigrator &other) = delete;
Expand Down Expand Up @@ -147,7 +147,7 @@ class SlotMigrator : public redis::Database {
static const int kMaxItemsInCommand = 16; // number of items in every write command of complex keys
static const int kMaxLoopTimes = 10;

Server *svr_;
Server *srv_;
int max_migration_speed_;
int max_pipeline_size_;
int seq_gap_limit_;
Expand Down
4 changes: 2 additions & 2 deletions src/cluster/sync_migrate_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ void SyncMigrateContext::Resume(const Status &migrate_result) {
}

void SyncMigrateContext::OnEvent(bufferevent *bev, int16_t events) {
auto &&slot_migrator = svr_->slot_migrator;
auto &&slot_migrator = srv_->slot_migrator;

if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
timer_.reset();
Expand All @@ -52,7 +52,7 @@ void SyncMigrateContext::OnEvent(bufferevent *bev, int16_t events) {
}

void SyncMigrateContext::TimerCB(int, int16_t events) {
auto &&slot_migrator = svr_->slot_migrator;
auto &&slot_migrator = srv_->slot_migrator;

conn_->Reply(redis::NilString());
timer_.reset();
Expand Down
4 changes: 2 additions & 2 deletions src/cluster/sync_migrate_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
class SyncMigrateContext : private EvbufCallbackBase<SyncMigrateContext, false>,
private EventCallbackBase<SyncMigrateContext> {
public:
SyncMigrateContext(Server *svr, redis::Connection *conn, int timeout) : svr_(svr), conn_(conn), timeout_(timeout){};
SyncMigrateContext(Server *srv, redis::Connection *conn, int timeout) : srv_(srv), conn_(conn), timeout_(timeout){};

void Suspend();
void Resume(const Status &migrate_result);
Expand All @@ -34,7 +34,7 @@ class SyncMigrateContext : private EvbufCallbackBase<SyncMigrateContext, false>,
void TimerCB(int, int16_t events);

private:
Server *svr_;
Server *srv_;
redis::Connection *conn_;
int timeout_ = 0;
UniqueEvent timer_;
Expand Down
20 changes: 10 additions & 10 deletions src/commands/cmd_bit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ class CommandGetBit : public Commander {
return Commander::Parse(args);
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
bool bit = false;
redis::Bitmap bitmap_db(svr->storage, conn->GetNamespace());
redis::Bitmap bitmap_db(srv->storage, conn->GetNamespace());
auto s = bitmap_db.GetBit(args_[1], offset_, &bit);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

Expand Down Expand Up @@ -75,9 +75,9 @@ class CommandSetBit : public Commander {
return Commander::Parse(args);
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
bool old_bit = false;
redis::Bitmap bitmap_db(svr->storage, conn->GetNamespace());
redis::Bitmap bitmap_db(srv->storage, conn->GetNamespace());
auto s = bitmap_db.SetBit(args_[1], offset_, bit_, &old_bit);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

Expand Down Expand Up @@ -115,9 +115,9 @@ class CommandBitCount : public Commander {
return Commander::Parse(args);
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
uint32_t cnt = 0;
redis::Bitmap bitmap_db(svr->storage, conn->GetNamespace());
redis::Bitmap bitmap_db(srv->storage, conn->GetNamespace());
auto s = bitmap_db.BitCount(args_[1], start_, stop_, &cnt);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

Expand Down Expand Up @@ -167,9 +167,9 @@ class CommandBitPos : public Commander {
return Commander::Parse(args);
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
int64_t pos = 0;
redis::Bitmap bitmap_db(svr->storage, conn->GetNamespace());
redis::Bitmap bitmap_db(srv->storage, conn->GetNamespace());
auto s = bitmap_db.BitPos(args_[1], bit_, start_, stop_, stop_given_, &pos);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

Expand Down Expand Up @@ -205,15 +205,15 @@ class CommandBitOp : public Commander {
return Commander::Parse(args);
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
std::vector<Slice> op_keys;
op_keys.reserve(args_.size() - 2);
for (uint64_t i = 3; i < args_.size(); i++) {
op_keys.emplace_back(args_[i]);
}

int64_t dest_key_len = 0;
redis::Bitmap bitmap_db(svr->storage, conn->GetNamespace());
redis::Bitmap bitmap_db(srv->storage, conn->GetNamespace());
auto s = bitmap_db.BitOp(op_flag_, args_[1], args_[2], op_keys, &dest_key_len);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

Expand Down
Loading