Skip to content

Commit

Permalink
Merge branch 'unstable' into change-stream-encoding
Browse files Browse the repository at this point in the history
  • Loading branch information
PragmaTwice authored Jul 15, 2024
2 parents 266feb6 + 79d53a1 commit dc3d2c5
Show file tree
Hide file tree
Showing 24 changed files with 535 additions and 242 deletions.
35 changes: 18 additions & 17 deletions .github/workflows/kvrocks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -129,28 +129,29 @@ jobs:
compiler: auto
without_luajit: -DENABLE_LUAJIT=OFF
- name: Ubuntu GCC
os: ubuntu-20.04
os: ubuntu-22.04
compiler: gcc
- name: SonarCloud with Coverage
os: ubuntu-22.04
compiler: gcc
sonarcloud: -DCMAKE_CXX_FLAGS=--coverage
- name: Ubuntu Clang
os: ubuntu-20.04
compiler: clang
- name: Ubuntu 22 GCC
os: ubuntu-22.04
compiler: gcc
- name: Ubuntu 22 Clang
os: ubuntu-22.04
compiler: clang
# FIXME: https://github.com/apache/kvrocks/issues/2411
# - name: Ubuntu 24 GCC
# os: ubuntu-24.04
# compiler: gcc
# - name: Ubuntu 24 Clang
# os: ubuntu-24.04
# compiler: clang
- name: Ubuntu GCC ASan
os: ubuntu-20.04
os: ubuntu-22.04
without_jemalloc: -DDISABLE_JEMALLOC=ON
with_sanitizer: -DENABLE_ASAN=ON
compiler: gcc
- name: Ubuntu Clang ASan
os: ubuntu-20.04
os: ubuntu-22.04
with_sanitizer: -DENABLE_ASAN=ON
without_jemalloc: -DDISABLE_JEMALLOC=ON
compiler: clang
Expand All @@ -161,46 +162,46 @@ jobs:
compiler: gcc
ignore_when_tsan: -tags="ignore_when_tsan"
- name: Ubuntu Clang TSan
os: ubuntu-20.04
os: ubuntu-22.04
with_sanitizer: -DENABLE_TSAN=ON
without_jemalloc: -DDISABLE_JEMALLOC=ON
compiler: clang
ignore_when_tsan: -tags="ignore_when_tsan"
- name: Ubuntu Clang UBSAN
os: ubuntu-20.04
os: ubuntu-22.04
with_sanitizer: -DENABLE_UBSAN=ON
without_jemalloc: -DDISABLE_JEMALLOC=ON
compiler: clang
- name: Ubuntu GCC Ninja
os: ubuntu-20.04
os: ubuntu-22.04
with_ninja: --ninja
compiler: gcc
- name: Ubuntu GCC with OpenSSL
os: ubuntu-20.04
os: ubuntu-22.04
compiler: gcc
with_openssl: -DENABLE_OPENSSL=ON
- name: Ubuntu Clang with OpenSSL
os: ubuntu-22.04
compiler: clang
with_openssl: -DENABLE_OPENSSL=ON
- name: Ubuntu GCC without luaJIT
os: ubuntu-20.04
os: ubuntu-22.04
without_luajit: -DENABLE_LUAJIT=OFF
compiler: gcc
- name: Ubuntu Clang without luaJIT
os: ubuntu-20.04
os: ubuntu-22.04
without_luajit: -DENABLE_LUAJIT=OFF
compiler: clang
- name: Ubuntu GCC with old encoding
os: ubuntu-20.04
os: ubuntu-22.04
compiler: gcc
new_encoding: -DENABLE_NEW_ENCODING=FALSE
- name: Ubuntu Clang with old encoding
os: ubuntu-22.04
compiler: clang
new_encoding: -DENABLE_NEW_ENCODING=FALSE
- name: Ubuntu GCC with speedb enabled
os: ubuntu-20.04
os: ubuntu-22.04
compiler: gcc
with_speedb: -DENABLE_SPEEDB=ON

Expand Down
1 change: 1 addition & 0 deletions .github/workflows/pr-lint.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ jobs:
revert
style
test
release
# Configure which scopes are allowed (newline-delimited).
# These are regex patterns auto-wrapped in `^ $`.
scopes: |
Expand Down
105 changes: 62 additions & 43 deletions src/cluster/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ Status Cluster::SetSlotRanges(const std::vector<SlotRange> &slot_ranges, const s
// 2. Add the slot into to-assign node
// 3. Update the map of slots to nodes.
// remember: The atomicity of the process is based on
// the transactionality of ClearKeysOfSlot().
// the transactionality of ClearKeysOfSlotRange().
for (auto [s_start, s_end] : slot_ranges) {
for (int slot = s_start; slot <= s_end; slot++) {
std::shared_ptr<ClusterNode> old_node = slots_nodes_[slot];
Expand All @@ -129,7 +129,7 @@ Status Cluster::SetSlotRanges(const std::vector<SlotRange> &slot_ranges, const s
if (old_node == myself_ && old_node != to_assign_node) {
// If slot is migrated from this node
if (migrated_slots_.count(slot) > 0) {
auto s = srv_->slot_migrator->ClearKeysOfSlot(kDefaultNamespace, slot);
auto s = srv_->slot_migrator->ClearKeysOfSlotRange(kDefaultNamespace, SlotRange::GetPoint(slot));
if (!s.ok()) {
LOG(ERROR) << "failed to clear data of migrated slot: " << s.ToString();
}
Expand Down Expand Up @@ -214,7 +214,7 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b
if (!migrated_slots_.empty()) {
for (const auto &[slot, _] : migrated_slots_) {
if (slots_nodes_[slot] != myself_) {
auto s = srv_->slot_migrator->ClearKeysOfSlot(kDefaultNamespace, slot);
auto s = srv_->slot_migrator->ClearKeysOfSlotRange(kDefaultNamespace, SlotRange::GetPoint(slot));
if (!s.ok()) {
LOG(ERROR) << "failed to clear data of migrated slots: " << s.ToString();
}
Expand Down Expand Up @@ -258,41 +258,53 @@ Status Cluster::SetMasterSlaveRepl() {

bool Cluster::IsNotMaster() { return myself_ == nullptr || myself_->role != kClusterMaster || srv_->IsSlave(); }

Status Cluster::SetSlotMigrated(int slot, const std::string &ip_port) {
if (!IsValidSlot(slot)) {
return {Status::NotOK, errSlotOutOfRange};
Status Cluster::SetSlotRangeMigrated(const SlotRange &slot_range, const std::string &ip_port) {
if (!slot_range.IsValid()) {
return {Status::NotOK, errSlotRangeInvalid};
}

// It is called by slot-migrating thread which is an asynchronous thread.
// Therefore, it should be locked when a record is added to 'migrated_slots_'
// which will be accessed when executing commands.
auto exclusivity = srv_->WorkExclusivityGuard();
migrated_slots_[slot] = ip_port;
for (auto slot = slot_range.start; slot <= slot_range.end; slot++) {
migrated_slots_[slot] = ip_port;
}
return Status::OK();
}

Status Cluster::SetSlotImported(int slot) {
if (!IsValidSlot(slot)) {
return {Status::NotOK, errSlotOutOfRange};
Status Cluster::SetSlotRangeImported(const SlotRange &slot_range) {
if (!slot_range.IsValid()) {
return {Status::NotOK, errSlotRangeInvalid};
}

// It is called by command 'cluster import'. When executing the command, the
// exclusive lock has been locked. Therefore, it can't be locked again.
imported_slots_.insert(slot);
for (auto slot = slot_range.start; slot <= slot_range.end; slot++) {
imported_slots_.insert(slot);
}
return Status::OK();
}

Status Cluster::MigrateSlot(int slot, const std::string &dst_node_id, SyncMigrateContext *blocking_ctx) {
Status Cluster::MigrateSlotRange(const SlotRange &slot_range, const std::string &dst_node_id,
SyncMigrateContext *blocking_ctx) {
if (nodes_.find(dst_node_id) == nodes_.end()) {
return {Status::NotOK, "Can't find the destination node id"};
}

if (!IsValidSlot(slot)) {
return {Status::NotOK, errSlotOutOfRange};
if (!slot_range.IsValid()) {
return {Status::NotOK, errSlotRangeInvalid};
}

if (!migrated_slots_.empty() &&
slot_range.HasOverlap({migrated_slots_.begin()->first, migrated_slots_.rbegin()->first})) {
return {Status::NotOK, "Can't migrate slot which has been migrated"};
}

if (slots_nodes_[slot] != myself_) {
return {Status::NotOK, "Can't migrate slot which doesn't belong to me"};
for (auto slot = slot_range.start; slot <= slot_range.end; slot++) {
if (slots_nodes_[slot] != myself_) {
return {Status::NotOK, "Can't migrate slot which doesn't belong to me"};
}
}

if (IsNotMaster()) {
Expand All @@ -308,51 +320,55 @@ Status Cluster::MigrateSlot(int slot, const std::string &dst_node_id, SyncMigrat
}

const auto &dst = nodes_[dst_node_id];
Status s = srv_->slot_migrator->PerformSlotMigration(dst_node_id, dst->host, dst->port, slot, blocking_ctx);
Status s =
srv_->slot_migrator->PerformSlotRangeMigration(dst_node_id, dst->host, dst->port, slot_range, blocking_ctx);
return s;
}

Status Cluster::ImportSlot(redis::Connection *conn, int slot, int state) {
Status Cluster::ImportSlotRange(redis::Connection *conn, const SlotRange &slot_range, int state) {
if (IsNotMaster()) {
return {Status::NotOK, "Slave can't import slot"};
}

if (!IsValidSlot(slot)) {
return {Status::NotOK, errSlotOutOfRange};
if (!slot_range.IsValid()) {
return {Status::NotOK, errSlotRangeInvalid};
}
auto source_node = srv_->cluster->slots_nodes_[slot];
if (source_node && source_node->id == myid_) {
return {Status::NotOK, "Can't import slot which belongs to me"};

for (auto slot = slot_range.start; slot <= slot_range.end; slot++) {
auto source_node = srv_->cluster->slots_nodes_[slot];
if (source_node && source_node->id == myid_) {
return {Status::NotOK, "Can't import slot which belongs to me"};
}
}

Status s;
switch (state) {
case kImportStart:
s = srv_->slot_import->Start(slot);
s = srv_->slot_import->Start(slot_range);
if (!s.IsOK()) return s;

// Set link importing
conn->SetImporting();
myself_->importing_slot = slot;
myself_->importing_slot_range = slot_range;
// Set link error callback
conn->close_cb = [object_ptr = srv_->slot_import.get(), slot](int fd) {
conn->close_cb = [object_ptr = srv_->slot_import.get(), slot_range](int fd) {
auto s = object_ptr->StopForLinkError();
if (!s.IsOK()) {
LOG(ERROR) << fmt::format("[import] Failed to stop importing slot {}: {}", slot, s.Msg());
LOG(ERROR) << fmt::format("[import] Failed to stop importing slot(s) {}: {}", slot_range.String(), s.Msg());
}
}; // Stop forbidding writing slot to accept write commands
if (slot == srv_->slot_migrator->GetForbiddenSlot()) srv_->slot_migrator->ReleaseForbiddenSlot();
LOG(INFO) << fmt::format("[import] Start importing slot {}", slot);
if (slot_range == srv_->slot_migrator->GetForbiddenSlotRange()) srv_->slot_migrator->ReleaseForbiddenSlotRange();
LOG(INFO) << fmt::format("[import] Start importing slot(s) {}", slot_range.String());
break;
case kImportSuccess:
s = srv_->slot_import->Success(slot);
s = srv_->slot_import->Success(slot_range);
if (!s.IsOK()) return s;
LOG(INFO) << fmt::format("[import] Mark the importing slot {} as succeed", slot);
LOG(INFO) << fmt::format("[import] Mark the importing slot(s) {} as succeed", slot_range.String());
break;
case kImportFailed:
s = srv_->slot_import->Fail(slot);
s = srv_->slot_import->Fail(slot_range);
if (!s.IsOK()) return s;
LOG(INFO) << fmt::format("[import] Mark the importing slot {} as failed", slot);
LOG(INFO) << fmt::format("[import] Mark the importing slot(s) {} as failed", slot_range.String());
break;
default:
return {Status::NotOK, errInvalidImportState};
Expand Down Expand Up @@ -550,15 +566,16 @@ std::string Cluster::genNodesDescription() {
// Just for MYSELF node to show the importing/migrating slot
if (node->id == myid_) {
if (srv_->slot_migrator) {
auto migrating_slot = srv_->slot_migrator->GetMigratingSlot();
if (migrating_slot != -1) {
node_str.append(fmt::format(" [{}->-{}]", migrating_slot, srv_->slot_migrator->GetDstNode()));
auto migrating_slot_range = srv_->slot_migrator->GetMigratingSlotRange();
if (migrating_slot_range.IsValid()) {
node_str.append(fmt::format(" [{}->-{}]", migrating_slot_range.String(), srv_->slot_migrator->GetDstNode()));
}
}
if (srv_->slot_import) {
auto importing_slot = srv_->slot_import->GetSlot();
if (importing_slot != -1) {
node_str.append(fmt::format(" [{}-<-{}]", importing_slot, getNodeIDBySlot(importing_slot)));
auto importing_slot_range = srv_->slot_import->GetSlotRange();
if (importing_slot_range.IsValid()) {
node_str.append(
fmt::format(" [{}-<-{}]", importing_slot_range.String(), getNodeIDBySlot(importing_slot_range.start)));
}
}
}
Expand Down Expand Up @@ -802,7 +819,9 @@ Status Cluster::parseClusterNodes(const std::string &nodes_str, ClusterNodes *no
return Status::OK();
}

bool Cluster::IsWriteForbiddenSlot(int slot) const { return srv_->slot_migrator->GetForbiddenSlot() == slot; }
bool Cluster::IsWriteForbiddenSlot(int slot) const {
return srv_->slot_migrator->GetForbiddenSlotRange().Contains(slot);
}

Status Cluster::CanExecByMySelf(const redis::CommandAttributes *attributes, const std::vector<std::string> &cmd_tokens,
redis::Connection *conn) {
Expand Down Expand Up @@ -846,7 +865,7 @@ Status Cluster::CanExecByMySelf(const redis::CommandAttributes *attributes, cons
return Status::OK(); // I'm serving this slot
}

if (myself_ && myself_->importing_slot == slot &&
if (myself_ && myself_->importing_slot_range.Contains(slot) &&
(conn->IsImporting() || conn->IsFlagEnabled(redis::Connection::kAsking))) {
// While data migrating, the topology of the destination node has not been changed.
// The destination node has to serve the requests from the migrating slot,
Expand Down Expand Up @@ -874,10 +893,10 @@ Status Cluster::CanExecByMySelf(const redis::CommandAttributes *attributes, cons
// Only HARD mode is meaningful to the Kvrocks cluster,
// so it will force clearing all information after resetting.
Status Cluster::Reset() {
if (srv_->slot_migrator && srv_->slot_migrator->GetMigratingSlot() != -1) {
if (srv_->slot_migrator && srv_->slot_migrator->GetMigratingSlotRange().IsValid()) {
return {Status::NotOK, "Can't reset cluster while migrating slot"};
}
if (srv_->slot_import && srv_->slot_import->GetSlot() != -1) {
if (srv_->slot_import && srv_->slot_import->GetSlotRange().IsValid()) {
return {Status::NotOK, "Can't reset cluster while importing slot"};
}
if (!srv_->storage->IsEmptyDB()) {
Expand Down
11 changes: 6 additions & 5 deletions src/cluster/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class ClusterNode {
std::string master_id;
std::bitset<kClusterSlots> slots;
std::vector<std::string> replicas;
int importing_slot = -1;
SlotRange importing_slot_range = {-1, -1};
};

struct SlotInfo {
Expand All @@ -74,8 +74,8 @@ class Cluster {
StatusOr<std::string> GetReplicas(const std::string &node_id);
Status SetNodeId(const std::string &node_id);
Status SetSlotRanges(const std::vector<SlotRange> &slot_ranges, const std::string &node_id, int64_t version);
Status SetSlotMigrated(int slot, const std::string &ip_port);
Status SetSlotImported(int slot);
Status SetSlotRangeMigrated(const SlotRange &slot_range, const std::string &ip_port);
Status SetSlotRangeImported(const SlotRange &slot_range);
Status GetSlotsInfo(std::vector<SlotInfo> *slot_infos);
Status GetClusterInfo(std::string *cluster_infos);
int64_t GetVersion() const { return version_; }
Expand All @@ -85,8 +85,9 @@ class Cluster {
Status CanExecByMySelf(const redis::CommandAttributes *attributes, const std::vector<std::string> &cmd_tokens,
redis::Connection *conn);
Status SetMasterSlaveRepl();
Status MigrateSlot(int slot, const std::string &dst_node_id, SyncMigrateContext *blocking_ctx = nullptr);
Status ImportSlot(redis::Connection *conn, int slot, int state);
Status MigrateSlotRange(const SlotRange &slot_range, const std::string &dst_node_id,
SyncMigrateContext *blocking_ctx = nullptr);
Status ImportSlotRange(redis::Connection *conn, const SlotRange &slot_range, int state);
std::string GetMyId() const { return myid_; }
Status DumpClusterNodes(const std::string &file);
Status LoadClusterNodes(const std::string &file_path);
Expand Down
Loading

0 comments on commit dc3d2c5

Please sign in to comment.