From 9b89c741135334dea32304a55dee568223928ca7 Mon Sep 17 00:00:00 2001 From: karelrooted Date: Mon, 12 Apr 2021 18:49:49 +0800 Subject: [PATCH] Remove codis support (#217) Remove codis support, since no one uses codis anymore; a Redis-Cluster-like feature will be added in the next major version. The msetbit command was also removed because it is no longer needed (it was previously added for codis support). --- CMakeLists.txt | 6 - README.md | 1 - kvrocks.conf | 5 - src/Makefile | 2 +- src/compact_filter.cc | 68 --- src/compact_filter.h | 36 -- src/compaction_checker.cc | 6 +- src/compaction_checker.h | 2 +- src/config.cc | 7 - src/config.h | 1 - src/redis_bitmap.cc | 29 - src/redis_bitmap.h | 6 - src/redis_cmd.cc | 425 ------------- src/redis_db.cc | 26 - src/redis_db.h | 18 - src/redis_slot.cc | 1055 --------------------------------- src/redis_slot.h | 227 ------- src/server.cc | 13 +- src/server.h | 2 - src/storage.cc | 67 --- src/storage.h | 3 - tests/config_test.cc | 1 - tools/kvrocks2redis/parser.cc | 1 - 23 files changed, 4 insertions(+), 2003 deletions(-) delete mode 100644 src/redis_slot.cc delete mode 100644 src/redis_slot.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 56089c70f1a..10bc78681af 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -121,8 +121,6 @@ target_sources(kvrocks PRIVATE src/redis_pubsub.h src/redis_sortedint.cc src/redis_sortedint.h - src/redis_slot.cc - src/redis_slot.h src/lock_manager.cc src/rocksdb_crc32c.h src/config.cc @@ -213,8 +211,6 @@ target_sources(kvrocks2redis PRIVATE src/redis_pubsub.h src/redis_sortedint.cc src/redis_sortedint.h - src/redis_slot.cc - src/redis_slot.h src/replication.cc src/replication.h src/lock_manager.cc @@ -305,8 +301,6 @@ add_executable(unittest src/redis_geo.cc src/redis_sortedint.cc src/redis_sortedint.h - src/redis_slot.cc - src/redis_slot.h src/util.cc src/geohash.cc src/storage.cc diff --git a/README.md b/README.md index 6c09850946f..060803786de 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,6 @@ kvrocks has the 
following key features: - Namespace, similar to redis db but use token per namespace - Replication, async replication using binlog like MySQL - High Available, supports redis sentinel to failover when master or slave was failed -- Codis Protocol, the user can use the codis proxy and dashboard to manage the kvrocks > Thanks for @smartlee and the trip.com designer(@范世丽) contributes the kvrocks logo for us diff --git a/kvrocks.conf b/kvrocks.conf index bc72ef4d3bf..deafa5631f0 100644 --- a/kvrocks.conf +++ b/kvrocks.conf @@ -178,11 +178,6 @@ max-backup-to-keep 1 # default: 1 Week max-backup-keep-hours 168 -# Enable the kvrocks to support the codis protocol, if the db enabled the codis mode at first open, -# this option must not be disabled after restarted, and vice versa -# Defalut: no -codis-enabled no - # Ratio of the samples would be recorded when the profiling was enabled. # we simply use the rand to determine whether to record the sample or not. # diff --git a/src/Makefile b/src/Makefile index e71c69d70f3..7ffe2cba87b 100644 --- a/src/Makefile +++ b/src/Makefile @@ -34,7 +34,7 @@ FINAL_LIBS+= $(GLOG) $(LIBEVENT) $(LIBEVENT_PTHREADS) $(JEMALLOC) $(ROCKSDB) SHARED_OBJS= compact_filter.o config.o cron.o encoding.o event_listener.o lock_manager.o \ log_collector.o redis_bitmap.o redis_bitmap_string.o redis_cmd.o redis_connection.o redis_db.o \ redis_hash.o redis_list.o redis_metadata.o redis_pubsub.o redis_reply.o \ - redis_request.o redis_set.o redis_string.o redis_zset.o redis_geo.o redis_slot.o replication.o \ + redis_request.o redis_set.o redis_string.o redis_zset.o redis_geo.o replication.o \ server.o stats.o storage.o task_runner.o util.o geohash.o worker.o redis_sortedint.o \ compaction_checker.o table_properties_collector.o KVROCKS_OBJS= $(SHARED_OBJS) main.o diff --git a/src/compact_filter.cc b/src/compact_filter.cc index 89902a200f9..be0ccea09bf 100644 --- a/src/compact_filter.cc +++ b/src/compact_filter.cc @@ -3,7 +3,6 @@ #include #include #include 
"redis_bitmap.h" -#include "redis_slot.h" namespace Engine { using rocksdb::Slice; @@ -101,71 +100,4 @@ bool SubKeyFilter::Filter(int level, << ", result: " << (result ? "deleted" : "reserved"); return result; } - -bool SlotKeyFilter::IsKeyDeleted(const SlotInternalKey &ikey, const Slice &value) const { - std::string metadata_key; - - auto db = stor_->GetDB(); - const auto cf_handles = stor_->GetCFHandles(); - // storage close the would delete the column familiy handler and DB - if (!db || cf_handles->size() < 2) return false; - - auto slot_num = GetSlotNumFromKey(ikey.GetKey().ToString()); - PutFixed32(&metadata_key, slot_num); - if (cached_key_.empty() || metadata_key != cached_key_) { - std::string bytes; - if (!stor_->IncrDBRefs().IsOK()) { // the db is closing, don't use DB and cf_handles - return false; - } - rocksdb::Status s = db->Get(rocksdb::ReadOptions(), (*cf_handles)[4], metadata_key, &bytes); - stor_->DecrDBRefs(); - cached_key_ = std::move(metadata_key); - if (s.ok()) { - cached_metadata_ = std::move(bytes); - } else if (s.IsNotFound()) { - // metadata was deleted - // clear the metadata - cached_metadata_.clear(); - return true; - } else { - LOG(ERROR) << "[compact_filter/slotkey] Failed to fetch metadata" - << ", key: " << ikey.GetKey().ToString() - << ", err: " << s.ToString(); - cached_key_.clear(); - cached_metadata_.clear(); - return false; - } - } - // the metadata was not found, delete the subkey - if (cached_metadata_.empty()) return true; - - SlotMetadata metadata(false); - rocksdb::Status s = metadata.Decode(cached_metadata_); - if (!s.ok()) { - cached_key_.clear(); - LOG(ERROR) << "[compact_filter/slotkey] Failed to decode metadata" - << ", key: " << ikey.GetKey().ToString() - << ", err: " << s.ToString(); - return false; - } - // subkey's version wasn't equal to metadata's version means - // that key was deleted and created new one, so the old subkey - // should be deleted as well. 
- return ikey.GetVersion() != metadata.version; -} - -bool SlotKeyFilter::Filter(int level, - const Slice &key, - const Slice &value, - std::string *new_value, - bool *modified) const { - SlotInternalKey ikey(key); - bool result = IsKeyDeleted(ikey, value); - DLOG(INFO) << "[compact_filter/slotkey] " - << ", slot_num: " << ikey.GetSlotNum() - << ", metadata key: " << ikey.GetKey().ToString() - << ", verison: " << ikey.GetVersion() - << ", result: " << (result ? "deleted" : "reserved"); - return result; -} } // namespace Engine diff --git a/src/compact_filter.h b/src/compact_filter.h index 785d03d2083..151f30031b7 100644 --- a/src/compact_filter.h +++ b/src/compact_filter.h @@ -8,7 +8,6 @@ #include #include "redis_metadata.h" -#include "redis_slot.h" #include "storage.h" namespace Engine { @@ -80,39 +79,4 @@ class PubSubFilterFactory : public rocksdb::CompactionFilterFactory { return std::unique_ptr(new PubSubFilter()); } }; - -class SlotKeyFilter : public rocksdb::CompactionFilter { - public: - explicit SlotKeyFilter(Storage *storage) - : cached_key_(""), - cached_metadata_(""), - stor_(storage) {} - - const char *Name() const override { return "SlotKeyFilter"; } - bool IsKeyDeleted(const SlotInternalKey &ikey, const Slice &value) const; - bool Filter(int level, const Slice &key, const Slice &value, - std::string *new_value, bool *modified) const override; - - protected: - mutable std::string cached_key_; - mutable std::string cached_metadata_; - Engine::Storage *stor_; -}; - -class SlotKeyFilterFactory : public rocksdb::CompactionFilterFactory { - public: - explicit SlotKeyFilterFactory(Engine::Storage *storage) { - stor_ = storage; - } - - const char *Name() const override { return "SlotKeyFilterFactory"; } - std::unique_ptr CreateCompactionFilter( - const rocksdb::CompactionFilter::Context &context) override { - return std::unique_ptr( - new SlotKeyFilter(stor_)); - } - - private: - Engine::Storage *stor_ = nullptr; -}; } // namespace Engine diff --git 
a/src/compaction_checker.cc b/src/compaction_checker.cc index 1d0257e9561..8237c0a3d3f 100644 --- a/src/compaction_checker.cc +++ b/src/compaction_checker.cc @@ -2,14 +2,10 @@ #include #include "storage.h" -void CompactionChecker::CompactPubsubAndSlotFiles() { +void CompactionChecker::CompactPubsubFiles() { rocksdb::CompactRangeOptions compact_opts; compact_opts.change_level = true; std::vector cf_names = {Engine::kPubSubColumnFamilyName}; - if (storage_->CodisEnabled()) { - cf_names.emplace_back(Engine::kSlotColumnFamilyName); - cf_names.emplace_back(Engine::kSlotMetadataColumnFamilyName); - } for (const auto &cf_name : cf_names) { // the db is closing, don't use DB and cf_handles if (!storage_->IncrDBRefs().IsOK()) return; diff --git a/src/compaction_checker.h b/src/compaction_checker.h index f474229b769..6ccb86d1ce1 100644 --- a/src/compaction_checker.h +++ b/src/compaction_checker.h @@ -10,7 +10,7 @@ class CompactionChecker { explicit CompactionChecker(Engine::Storage *storage):storage_(storage) {} ~CompactionChecker() {} void PickCompactionFiles(const std::string &cf_name); - void CompactPubsubAndSlotFiles(); + void CompactPubsubFiles(); private: Engine::Storage *storage_ = nullptr; }; diff --git a/src/config.cc b/src/config.cc index fd38c91c8ef..c8b8248093a 100644 --- a/src/config.cc +++ b/src/config.cc @@ -71,7 +71,6 @@ Config::Config() { {"maxclients", false, new IntField(&maxclients, 10240, 0, INT_MAX)}, {"max-backup-to-keep", false, new IntField(&max_backup_to_keep, 1, 0, 64)}, {"max-backup-keep-hours", false, new IntField(&max_backup_keep_hours, 0, 0, INT_MAX)}, - {"codis-enabled", true, new YesNoField(&codis_enabled, false)}, {"master-use-repl-port", false, new YesNoField(&master_use_repl_port, false)}, {"requirepass", false, new StringField(&requirepass, "")}, {"masterauth", false, new StringField(&masterauth, "")}, @@ -354,9 +353,6 @@ Status Config::finish() { if (requirepass.empty() && !tokens.empty()) { return Status(Status::NotOK, "requirepass 
empty wasn't allowed while the namespace exists"); } - if (codis_enabled && !tokens.empty()) { - return Status(Status::NotOK, "enabled codis wasn't allowed while the namespace exists"); - } if (db_dir.empty()) db_dir = dir + "/db"; if (backup_dir.empty()) backup_dir = dir + "/backup"; if (log_dir.empty()) log_dir = dir; @@ -506,9 +502,6 @@ Status Config::AddNamespace(const std::string &ns, const std::string &token) { if (requirepass.empty()) { return Status(Status::NotOK, "forbidden to add namespace when requirepass was empty"); } - if (codis_enabled) { - return Status(Status::NotOK, "forbidden to add namespace when codis mode was enabled"); - } auto s = isNamespaceLegal(ns); if (!s.IsOK()) return s; if (tokens.find(token) != tokens.end()) { diff --git a/src/config.h b/src/config.h index 6357cccbe0e..9a3da945c71 100644 --- a/src/config.h +++ b/src/config.h @@ -62,7 +62,6 @@ struct Config{ int max_db_size = 0; int max_replication_mb = 0; int max_io_mb = 0; - bool codis_enabled = false; bool master_use_repl_port = false; std::vector binds; std::string dir; diff --git a/src/redis_bitmap.cc b/src/redis_bitmap.cc index e62cd97ac8c..9ba0b92ae3c 100644 --- a/src/redis_bitmap.cc +++ b/src/redis_bitmap.cc @@ -134,35 +134,6 @@ rocksdb::Status Bitmap::SetBit(const Slice &user_key, uint32_t offset, bool new_ return storage_->Write(rocksdb::WriteOptions(), &batch); } -rocksdb::Status Bitmap::MSetBit(const Slice &user_key, const std::vector &pairs) { - std::string ns_key, raw_value; - AppendNamespacePrefix(user_key, &ns_key); - - LockGuard guard(storage_->GetLockManager(), ns_key); - BitmapMetadata metadata; - rocksdb::Status s = GetMetadata(ns_key, &metadata, &raw_value); - if (!s.ok() && !s.IsNotFound()) return s; - - uint32_t cnt = 0; - rocksdb::WriteBatch batch; - WriteBatchLogData log_data(kRedisBitmap); - batch.PutLogData(log_data.Encode()); - for (const auto &pair : pairs) { - std::string sub_key; - InternalKey(ns_key, std::to_string(pair.index), 
metadata.version).Encode(&sub_key); - batch.Put(sub_key, pair.value); - - for (size_t j = 0; j < pair.value.size(); j++) { - cnt += kNum2Bits[static_cast(pair.value[j])]; - } - } - metadata.size = cnt; - std::string bytes; - metadata.Encode(&bytes); - batch.Put(metadata_cf_handle_, ns_key, bytes); - return storage_->Write(rocksdb::WriteOptions(), &batch); -} - rocksdb::Status Bitmap::BitCount(const Slice &user_key, int start, int stop, uint32_t *cnt) { *cnt = 0; std::string ns_key, raw_value; diff --git a/src/redis_bitmap.h b/src/redis_bitmap.h index 82eefb5384d..a38fa7c40c9 100644 --- a/src/redis_bitmap.h +++ b/src/redis_bitmap.h @@ -6,11 +6,6 @@ #include #include -typedef struct { - uint32_t index; - Slice value; -} BitmapPair; - namespace Redis { class Bitmap : public Database { @@ -18,7 +13,6 @@ class Bitmap : public Database { Bitmap(Engine::Storage *storage, const std::string &ns): Database(storage, ns) {} rocksdb::Status GetBit(const Slice &user_key, uint32_t offset, bool *bit); rocksdb::Status SetBit(const Slice &user_key, uint32_t offset, bool new_bit, bool *old_bit); - rocksdb::Status MSetBit(const Slice &user_key, const std::vector &pairs); rocksdb::Status BitCount(const Slice &user_key, int start, int stop, uint32_t *cnt); rocksdb::Status BitPos(const Slice &user_key, bool bit, int start, int stop, bool stop_given, int *pos); static bool GetBitFromValueAndOffset(const std::string &value, const uint32_t offset); diff --git a/src/redis_cmd.cc b/src/redis_cmd.cc index d244c16ffb3..5d7354d717f 100644 --- a/src/redis_cmd.cc +++ b/src/redis_cmd.cc @@ -23,7 +23,6 @@ #include "redis_geo.h" #include "redis_pubsub.h" #include "redis_sortedint.h" -#include "redis_slot.h" #include "replication.h" #include "util.h" #include "storage.h" @@ -744,26 +743,6 @@ class CommandSetBit : public Commander { bool bit_ = false; }; -class CommandMSetBit : public Commander { - public: - CommandMSetBit() : Commander("msetbit", -4, true) {} - - Status Execute(Server *svr, Connection 
*conn, std::string *output) override { - Redis::Bitmap bitmap_db(svr->storage_, conn->GetNamespace()); - std::vector kvs; - uint32_t index; - for (size_t i = 2; i < args_.size(); i += 2) { - Status s = getBitOffsetFromArgument(args_[i], &index); - if (!s.IsOK()) return s; - kvs.emplace_back(BitmapPair{index, args_[i + 1]}); - } - rocksdb::Status s = bitmap_db.MSetBit(args_[1], kvs); - if (!s.ok()) return Status(Status::RedisExecErr, s.ToString()); - *output = Redis::SimpleString("OK"); - return Status::OK(); - } -}; - class CommandBitCount : public Commander { public: CommandBitCount() : Commander("bitcount", -2, false) {} @@ -4182,392 +4161,6 @@ class CommandDBName : public Commander { } }; -class CommandSlotsInfo : public Commander { - public: - CommandSlotsInfo() : Commander("slotsinfo", -1, false) {} - Status Parse(const std::vector &args) override { - if (args.size() > 1) { - try { - start_ = std::stoi(args[1]); - } catch (std::exception &e) { - return Status(Status::RedisParseErr, errValueNotInterger); - } - } - if (args.size() > 2) { - try { - count_ = std::stoi(args[2]); - } catch (std::exception &e) { - return Status(Status::RedisParseErr, errValueNotInterger); - } - } - return Commander::Parse(args); - } - Status Execute(Server *svr, Connection *conn, std::string *output) override { - Redis::Slot slot_db(svr->storage_); - - std::vector slot_counts; - rocksdb::Status s = - slot_db.GetInfo(start_, count_, &slot_counts); - if (!s.ok()) { - return Status(Status::RedisExecErr, s.ToString()); - } - - output->append(Redis::MultiLen(slot_counts.size())); - for (const auto sc : slot_counts) { - output->append(Redis::MultiLen(2)); - output->append(Redis::Integer(sc.slot_num)); - output->append(Redis::Integer(sc.count)); - } - return Status::OK(); - } - - private: - int start_ = 0; - int count_ = HASH_SLOTS_SIZE - 1; -}; - -class CommandSlotsScan : public CommandScanBase { - public: - CommandSlotsScan() : CommandScanBase("slotsscan", -3, false) {} - Status 
Parse(const std::vector &args) override { - if (args.size() % 2 != 1) { - return Status(Status::RedisParseErr, errWrongNumOfArguments); - } - try { - slot_num_ = std::stoi(args[1]); - } catch (std::exception &e) { - return Status(Status::RedisParseErr, errValueNotInterger); - } - ParseCursor(args[2]); - if (args.size() == 5) { - Status s = ParseMatchAndCountParam(Util::ToLower(args[3]), args_[4]); - if (!s.IsOK()) { - return s; - } - } - return Commander::Parse(args); - } - Status Execute(Server *svr, Connection *conn, std::string *output) override { - Redis::Slot slot_db(svr->storage_); - std::vector keys; - auto s = slot_db.Scan(slot_num_, cursor, limit, &keys); - if (!s.ok() && !s.IsNotFound()) { - return Status(Status::RedisExecErr, s.ToString()); - } - - *output = GenerateOutput(keys); - return Status::OK(); - } - - private: - uint32_t slot_num_ = 0; -}; - -class CommandSlotsDel : public Commander { - public: - CommandSlotsDel() : Commander("slotsdel", -2, false) {} - Status Execute(Server *svr, Connection *conn, std::string *output) override { - Redis::Slot slot_db(svr->storage_); - std::vector slot_nums; - uint32_t slot_num; - for (size_t i = 1; i < args_.size(); i++) { - try { - slot_num = std::stoi(args_[i]); - } catch (std::exception &e) { - return Status(Status::RedisParseErr, errValueNotInterger); - } - slot_nums.emplace_back(slot_num); - slot_db.Del(slot_num); - } - - output->append(Redis::MultiLen(slot_nums.size())); - for (const auto sn : slot_nums) { - output->append(Redis::MultiLen(2)); - output->append(Redis::Integer(sn)); - output->append(Redis::Integer(0)); - } - return Status::OK(); - } -}; - -class CommandSlotsMgrtBase : public Commander { - public: - explicit CommandSlotsMgrtBase(const std::string &name, int arity, bool is_write = false) - : Commander(name, arity, is_write) {} - Status Parse(const std::vector &args) override { - if (args.size() != static_cast(arity_)) { - return Status(Status::RedisParseErr, errWrongNumOfArguments); - } - 
host = args[1]; - try { - port = std::stoi(args[2]); - timeout = std::stoull(args[3]); - } catch (std::exception &e) { - return Status(Status::RedisParseErr, errValueNotInterger); - } - return Commander::Parse(args); - } - - protected: - std::string host; - uint32_t port = 0; - uint64_t timeout = 0; -}; - -class CommandSlotsMgrtSlot : public CommandSlotsMgrtBase { - public: - CommandSlotsMgrtSlot() : CommandSlotsMgrtBase("slotsmgrtslot", 5, true) {} - Status Execute(Server *svr, Connection *conn, std::string *output) override { - Redis::Slot slot_db(svr->storage_); - - uint32_t slot_num; - try { - slot_num = std::stoi(args_[4]); - } catch (std::exception &e) { - return Status(Status::RedisParseErr, errValueNotInterger); - } - auto s = slot_db.MigrateSlotRandomOne(host, port, timeout, slot_num); - uint64_t size; - auto st = slot_db.Size(slot_num, &size); - - output->append(Redis::MultiLen(2)); - output->append(s.IsOK() ? Redis::Integer(1) : Redis::Integer(0)); - output->append(Redis::Integer(size)); - return Status::OK(); - } -}; - -class CommandSlotsMgrtOne : public CommandSlotsMgrtBase { - public: - CommandSlotsMgrtOne() : CommandSlotsMgrtBase("slotsmgrtone", 5, true) {} - Status Execute(Server *svr, Connection *conn, std::string *output) override { - Redis::Slot slot_db(svr->storage_); - - auto s = slot_db.MigrateOne(host, port, timeout, args_[4]); - *output = s.IsOK() ? 
Redis::Integer(1) : Redis::Integer(0); - return Status::OK(); - } -}; - -class CommandSlotsMgrtTagSlot : public CommandSlotsMgrtBase { - public: - CommandSlotsMgrtTagSlot() : CommandSlotsMgrtBase("slotsmgrttagslot", 5, true) {} - Status Execute(Server *svr, Connection *conn, std::string *output) override { - Redis::Slot slot_db(svr->storage_); - - uint32_t slot_num; - try { - slot_num = std::stoi(args_[4]); - } catch (std::exception &e) { - return Status(Status::RedisParseErr, errValueNotInterger); - } - int ret; - auto s = slot_db.MigrateTagSlot(host, port, timeout, slot_num, &ret); - uint64_t size; - auto st = slot_db.Size(slot_num, &size); - - output->append(Redis::MultiLen(2)); - output->append(s.IsOK() ? Redis::Integer(ret) : Redis::Integer(0)); - output->append(Redis::Integer(size)); - return Status::OK(); - } -}; - -class CommandSlotsMgrtTagOne : public CommandSlotsMgrtBase { - public: - CommandSlotsMgrtTagOne() : CommandSlotsMgrtBase("slotsmgrttagone", 5, true) {} - Status Execute(Server *svr, Connection *conn, std::string *output) override { - Redis::Slot slot_db(svr->storage_); - - int ret; - auto s = slot_db.MigrateTag(host, port, timeout, args_[4], &ret); - *output = s.IsOK() ? 
Redis::Integer(ret) : Redis::Integer(0); - return Status::OK(); - } -}; - -class CommandSlotsMgrtTagSlotAsync : public CommandSlotsMgrtBase { - public: - CommandSlotsMgrtTagSlotAsync() - : CommandSlotsMgrtBase("slotsmgrttagslot-async", 8, true) {} - Status Execute(Server *svr, Connection *conn, std::string *output) override { - uint32_t slot_num; - try { - slot_num = std::stoi(args_[6]); - } catch (std::exception &e) { - return Status(Status::RedisParseErr, errValueNotInterger); - } - uint32_t key_num; - try { - key_num = std::stoi(args_[7]); - } catch (std::exception &e) { - return Status(Status::RedisParseErr, errValueNotInterger); - } - auto s = svr->slotsmgrt_sender_thread_->SlotsMigrateBatch(host, port, timeout, slot_num, key_num); - if (!s.IsOK()) { - LOG(WARNING) << "Slot batch migrate keys error"; - return Status(Status::RedisExecErr, "Slot batch migrating keys error: " + s.Msg()); - } - - uint64_t moved = 0, remained = 0; - s = svr->slotsmgrt_sender_thread_->GetSlotsMigrateResult(&moved, &remained); - if (!s.IsOK()) { - LOG(WARNING) << "Slot batch migrate keys get result error"; - return Status(Status::RedisExecErr, "Slot batch migrating keys get result error"); - } - output->append(Redis::MultiLen(2)); - output->append(Redis::Integer(moved)); - output->append(Redis::Integer(remained)); - return Status::OK(); - } -}; - -class CommandSlotsMgrtSlotAsync : public CommandSlotsMgrtTagSlotAsync { - public: - CommandSlotsMgrtSlotAsync() : CommandSlotsMgrtTagSlotAsync() { name_ = "slotsmgrtslot-async"; } -}; - -class CommandSlotsMgrtExecWrapper : public Commander { - public: - CommandSlotsMgrtExecWrapper() : Commander("slotsmgrt-exec-wrapper", -3, true) {} - Status Execute(Server *svr, Connection *conn, std::string *output) override { - Redis::Slot slot_db(svr->storage_); - - int ret; - auto s = svr->slotsmgrt_sender_thread_->SlotsMigrateOne(args_[1], &ret); - output->append(Redis::MultiLen(2)); - output->append(Redis::Integer(ret)); - 
output->append(Redis::Integer(ret)); - return Status::OK(); - } -}; - -class CommandSlotsMgrtAsyncStatus : public Commander { - public: - CommandSlotsMgrtAsyncStatus() : Commander("slotsmgrt-async-status", 1, false) {} - Status Execute(Server *svr, Connection *conn, std::string *output) override { - if (!svr->GetConfig()->codis_enabled) { - return Status(Status::RedisExecErr, "codis is no enabled"); - } - std::string ip; - int port; - uint32_t slot_num; - bool migrating; - uint64_t moved_keys; - uint64_t remain_keys; - auto s = svr->slotsmgrt_sender_thread_->GetSlotsMgrtSenderStatus(&ip, - &port, - &slot_num, - &migrating, - &moved_keys, - &remain_keys); - - std::string migrate_status = migrating ? "yes" : "no"; - std::vector list; - list.emplace_back(Redis::BulkString("host")); - list.emplace_back(Redis::BulkString(ip)); - list.emplace_back(Redis::BulkString("port")); - list.emplace_back(Redis::Integer(port)); - list.emplace_back(Redis::BulkString("slot number")); - list.emplace_back(Redis::BulkString(std::to_string(slot_num))); - list.emplace_back(Redis::BulkString("migrating")); - list.emplace_back(Redis::BulkString(migrate_status)); - list.emplace_back(Redis::BulkString("moved keys")); - list.emplace_back(Redis::BulkString(std::to_string(moved_keys))); - list.emplace_back(Redis::BulkString("remain keys")); - list.emplace_back(Redis::BulkString(std::to_string(remain_keys))); - - *output = Redis::MultiBulkString(list); - return Status::OK(); - } -}; - -class CommandSlotsMgrtAsyncCancel : public Commander { - public: - CommandSlotsMgrtAsyncCancel() : Commander("slotsmgrt-async-cancel", 1, false) {} - Status Execute(Server *svr, Connection *conn, std::string *output) override { - if (!svr->GetConfig()->codis_enabled) { - return Status(Status::RedisExecErr, "codis is no enabled"); - } - - auto s = svr->slotsmgrt_sender_thread_->SlotsMigrateAsyncCancel(); - *output = s.IsOK() ? 
Redis::Integer(1) : Redis::Integer(0); - return Status::OK(); - } -}; - -class CommandSlotsRestore : public Commander { - public: - CommandSlotsRestore() : Commander("slotsrestore", -4, false) {} - - Status Parse(const std::vector &args) override { - if ((args_.size() - 4) % 3 != 0) { - return Status(Status::RedisParseErr, errWrongNumOfArguments); - } - for (unsigned int i = 1; i < args_.size(); i += 3) { - int ttl; - try { - ttl = std::stoi(args_[i + 1]); - } catch (std::exception &e) { - return Status(Status::RedisParseErr, errValueNotInterger); - } - key_values_.push_back(KeyValue{args_[i], ttl, args_[i + 2]}); - } - return Commander::Parse(args); - } - - Status Execute(Server *svr, Connection *conn, std::string *output) override { - Redis::Slot slot_db(svr->storage_); - auto s = slot_db.Restore(key_values_); - if (!s.ok()) { - *output = Redis::Error(s.ToString()); - } else { - *output = Redis::SimpleString("OK"); - } - return Status::OK(); - } - - private: - std::vector key_values_; -}; - -class CommandSlotsHashKey : public Commander { - public: - CommandSlotsHashKey() : Commander("slotshashkey", -2, false) {} - Status Execute(Server *svr, Connection *conn, std::string *output) override { - Redis::Slot slot_db(svr->storage_); - std::vector slot_nums; - for (size_t i = 1; i < args_.size(); i++) { - auto slot_num = GetSlotNumFromKey(args_[i]); - slot_nums.emplace_back(slot_num); - } - output->append(Redis::MultiLen(slot_nums.size())); - for (const auto slot_num : slot_nums) { - output->append(Redis::Integer(slot_num)); - } - return Status::OK(); - } -}; - -class CommandSlotsCheck : public Commander { - public: - CommandSlotsCheck() : Commander("slotscheck", 1, false) {} - Status Execute(Server *svr, Connection *conn, std::string *output) override { - if (!conn->IsAdmin()) { - *output = Redis::Error(errAdministorPermissionRequired); - return Status::OK(); - } - Redis::Slot slot_db(svr->storage_); - auto s = slot_db.Check(); - if (!s.ok()) { - *output = 
Redis::Error(s.ToString()); - } else { - *output = Redis::SimpleString("OK"); - } - return Status::OK(); - } -}; - #define ADD_CMD(name, fn) \ {name, []() -> std::unique_ptr { \ return std::unique_ptr(new fn()); \ @@ -4632,7 +4225,6 @@ std::map command_table = { // bit command ADD_CMD("getbit", CommandGetBit), ADD_CMD("setbit", CommandSetBit), - ADD_CMD("msetbit", CommandMSetBit), ADD_CMD("bitcount", CommandBitCount), ADD_CMD("bitpos", CommandBitPos), @@ -4741,23 +4333,6 @@ std::map command_table = { ADD_CMD("sirangebyvalue", CommandSortedintRangeByValue), ADD_CMD("sirevrangebyvalue", CommandSortedintRevRangeByValue), - // Codis Slot command - ADD_CMD("slotsinfo", CommandSlotsInfo), - ADD_CMD("slotsscan", CommandSlotsScan), - ADD_CMD("slotsdel", CommandSlotsDel), - ADD_CMD("slotsmgrtslot", CommandSlotsMgrtSlot), - ADD_CMD("slotsmgrtone", CommandSlotsMgrtOne), - ADD_CMD("slotsmgrttagslot", CommandSlotsMgrtTagSlot), - ADD_CMD("slotsmgrttagone", CommandSlotsMgrtTagOne), - ADD_CMD("slotsrestore", CommandSlotsRestore), - ADD_CMD("slotshashkey", CommandSlotsHashKey), - ADD_CMD("slotscheck", CommandSlotsCheck), - ADD_CMD("slotsmgrtslot-async", CommandSlotsMgrtSlotAsync), - ADD_CMD("slotsmgrttagslot-async", CommandSlotsMgrtTagSlotAsync), - ADD_CMD("slotsmgrt-exec-wrapper", CommandSlotsMgrtExecWrapper), - ADD_CMD("slotsmgrt-async-status", CommandSlotsMgrtAsyncStatus), - ADD_CMD("slotsmgrt-async-cancel", CommandSlotsMgrtAsyncCancel), - // internal management cmd ADD_CMD("compact", CommandCompact), ADD_CMD("bgsave", CommandBGSave), diff --git a/src/redis_db.cc b/src/redis_db.cc index 51039a0e0a5..e524f5daf1a 100644 --- a/src/redis_db.cc +++ b/src/redis_db.cc @@ -483,30 +483,4 @@ Status WriteBatchLogData::Decode(const rocksdb::Slice &blob) { return Status::OK(); } - -void WriteBatchExtractor::LogData(const rocksdb::Slice &blob) { - log_data_.Decode(blob); -} - -rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slice &key, - const Slice &value) { - 
std::string ns, user_key; - std::vector command_args; - if (column_family_id == kColumnFamilyIDMetadata) { - ExtractNamespaceKey(key, &ns, &user_key); - put_keys_.emplace_back(user_key); - } - return rocksdb::Status::OK(); -} - -rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t column_family_id, const Slice &key) { - std::string ns, user_key; - std::vector command_args; - if (column_family_id == kColumnFamilyIDMetadata) { - ExtractNamespaceKey(key, &ns, &user_key); - delete_keys_.emplace_back(user_key); - } - return rocksdb::Status::OK(); -} - } // namespace Redis diff --git a/src/redis_db.h b/src/redis_db.h index 8cc8d40088d..d7faa513350 100644 --- a/src/redis_db.h +++ b/src/redis_db.h @@ -87,23 +87,5 @@ class WriteBatchLogData { std::vector args_; }; -/* - * An extractor to extract update from raw writebatch - */ -class WriteBatchExtractor : public rocksdb::WriteBatch::Handler { - public: - void LogData(const rocksdb::Slice &blob) override; - rocksdb::Status PutCF(uint32_t column_family_id, const rocksdb::Slice &key, - const rocksdb::Slice &value) override; - - rocksdb::Status DeleteCF(uint32_t column_family_id, const rocksdb::Slice &key) override; - std::vector *GetPutKeys() { return &put_keys_; } - std::vector *GetDeleteKeys() { return &delete_keys_; } - private: - std::vector put_keys_; - std::vector delete_keys_; - Redis::WriteBatchLogData log_data_; -}; - } // namespace Redis diff --git a/src/redis_slot.cc b/src/redis_slot.cc deleted file mode 100644 index 47c447ad560..00000000000 --- a/src/redis_slot.cc +++ /dev/null @@ -1,1055 +0,0 @@ -#include "redis_slot.h" -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "util.h" -#include "redis_reply.h" -#include "encoding.h" - -uint32_t crc32tab[256]; -void CRC32TableInit(uint32_t poly) { - int i, j; - for (i = 0; i < 256; i++) { - uint32_t crc = i; - for (j = 0; j < 8; j++) { - if (crc & 1) { - crc = (crc >> 1) ^ poly; - } else { 
- crc = (crc >> 1); - } - } - crc32tab[i] = crc; - } -} - -void InitCRC32Table() { - CRC32TableInit(IEEE_POLY); -} - -uint32_t CRC32Update(uint32_t crc, const char *buf, int len) { - int i; - crc = ~crc; - for (i = 0; i < len; i++) { - crc = crc32tab[(uint8_t) (static_cast(crc) ^ buf[i])] ^ (crc >> 8); - } - return ~crc; -} - -uint32_t GetSlotNumFromKey(const std::string &key) { - auto tag = GetTagFromKey(key); - if (tag.empty()) { - tag = key; - } - auto crc = CRC32Update(0, tag.data(), static_cast(tag.size())); - return static_cast(crc & HASH_SLOTS_MASK); -} - -std::string GetTagFromKey(const std::string &key) { - auto left_pos = key.find("{"); - if (left_pos == std::string::npos) return std::string(); - auto right_pos = key.find("}", left_pos + 1); - // Note that we hash the whole key if there is nothing between {}. - if (right_pos == std::string::npos || right_pos <= left_pos + 1) { - return std::string(); - } - - return key.substr(left_pos + 1, right_pos - left_pos - 1); -} - -static void PthreadCall(const char *label, int result) { - if (result != 0) { - fprintf(stderr, "pthread %s: %s\n", label, strerror(result)); - abort(); - } -} - -// Return false if timeout -static bool PthreadTimeoutCall(const char *label, int result) { - if (result != 0) { - if (result == ETIMEDOUT) { - return false; - } - fprintf(stderr, "pthread %s: %s\n", label, strerror(result)); - abort(); - } - return true; -} - -SlotInternalKey::SlotInternalKey(rocksdb::Slice input) { - GetFixed32(&input, &slot_num_); - GetFixed64(&input, &version_); - key_ = Slice(input.data(), input.size()); - buf_ = nullptr; - memset(prealloc_, '\0', sizeof(prealloc_)); -} - -SlotInternalKey::SlotInternalKey(rocksdb::Slice key, uint64_t version) { - slot_num_ = GetSlotNumFromKey(key.ToString()); - key_ = key; - version_ = version; - buf_ = nullptr; - memset(prealloc_, '\0', sizeof(prealloc_)); -} - -SlotInternalKey::SlotInternalKey(uint32_t slot_num, uint64_t version) { - slot_num_ = slot_num; - version_ = 
version; - buf_ = nullptr; - memset(prealloc_, '\0', sizeof(prealloc_)); -} - -SlotInternalKey::~SlotInternalKey() { - if (buf_ != nullptr && buf_ != prealloc_) delete[]buf_; -} - -rocksdb::Slice SlotInternalKey::GetKey() const { - return key_; -} - -uint64_t SlotInternalKey::GetSlotNum() const { - return slot_num_; -} - -uint64_t SlotInternalKey::GetVersion() const { - return version_; -} - -void SlotInternalKey::Encode(std::string *out) { - out->clear(); - size_t pos = 0; - size_t total = 4 + 8 + key_.size(); - if (total < sizeof(prealloc_)) { - buf_ = prealloc_; - } else { - buf_ = new char[total]; - } - EncodeFixed32(buf_ + pos, slot_num_); - pos += 4; - EncodeFixed64(buf_ + pos, version_); - pos += 8; - memcpy(buf_ + pos, key_.data(), key_.size()); - pos += key_.size(); - out->assign(buf_, pos); -} - -bool SlotInternalKey::operator==(const SlotInternalKey &that) const { - if (key_ != that.key_) return false; - return version_ == that.version_; -} - -SlotMetadata::SlotMetadata(bool generate_version) { - if (generate_version) version = generateVersion(); - size = 0; -} - -rocksdb::Status SlotMetadata::Decode(const std::string &bytes) { - // version(8bytes) + size (8byte) - if (bytes.size() < 16) { - return rocksdb::Status::InvalidArgument("the metadata was too short"); - } - rocksdb::Slice input(bytes); - GetFixed64(&input, &version); - GetFixed64(&input, &size); - return rocksdb::Status::OK(); -} - -void SlotMetadata::Encode(std::string *dst) const { - PutFixed64(dst, version); - PutFixed64(dst, size); -} - -uint64_t SlotMetadata::generateVersion() { - struct timeval now; - gettimeofday(&now, nullptr); - uint64_t version = static_cast(now.tv_sec) * 1000000; - version += static_cast(now.tv_usec); - // use random position for initial counter to avoid conflicts, - // when the slave was promoted as master and the system clock may backoff - srand(static_cast(now.tv_sec)); - static std::atomic version_counter_{static_cast(std::rand())}; - uint64_t counter = 
version_counter_.fetch_add(1); - return (version << VersionCounterBits) + (counter % (1 << VersionCounterBits)); -} - -bool SlotMetadata::operator==(const SlotMetadata &that) const { - if (size != that.size) return false; - if (version != that.version) return false; - return true; -} - -Mutex::Mutex() { - PthreadCall("init mutex", pthread_mutex_init(&mu_, NULL)); -} - -Mutex::~Mutex() { - PthreadCall("destroy mutex", pthread_mutex_destroy(&mu_)); -} - -void Mutex::Lock() { - PthreadCall("lock", pthread_mutex_lock(&mu_)); -} - -void Mutex::Unlock() { - PthreadCall("unlock", pthread_mutex_unlock(&mu_)); -} - -CondVar::CondVar(Mutex *mu) - : mu_(mu) { - PthreadCall("init cv", pthread_cond_init(&cv_, NULL)); -} - -CondVar::~CondVar() { - PthreadCall("destroy cv", pthread_cond_destroy(&cv_)); -} - -void CondVar::Wait() { - PthreadCall("wait", pthread_cond_wait(&cv_, &mu_->mu_)); -} - -// return false if timeout -bool CondVar::TimedWait(uint32_t timeout) { - /* - * pthread_cond_timedwait api use absolute API - * so we need gettimeofday + timeout - */ - struct timeval now; - gettimeofday(&now, NULL); - struct timespec tsp; - - int64_t usec = now.tv_usec + timeout * 1000LL; - tsp.tv_sec = now.tv_sec + usec / 1000000; - tsp.tv_nsec = (usec % 1000000) * 1000; - - return PthreadTimeoutCall("timewait", - pthread_cond_timedwait(&cv_, &mu_->mu_, &tsp)); -} - -void CondVar::Signal() { - PthreadCall("signal", pthread_cond_signal(&cv_)); -} - -void CondVar::SignalAll() { - PthreadCall("broadcast", pthread_cond_broadcast(&cv_)); -} - -namespace Redis { - -rocksdb::Status Slot::GetMetadata(uint32_t slot_num, SlotMetadata *metadata) { - LatestSnapShot ss(db_); - rocksdb::ReadOptions read_options; - read_options.snapshot = ss.GetSnapShot(); - std::string bytes, metadata_key; - PutFixed32(&metadata_key, slot_num); - rocksdb::Status s = db_->Get(read_options, slot_metadata_cf_handle_, metadata_key, &bytes); - if (!s.ok()) return s; - metadata->Decode(bytes); - return s; -} - 
-rocksdb::Status Slot::IsKeyExist(const Slice &key) { - auto slot_num = GetSlotNumFromKey(key.ToString()); - SlotMetadata metadata(false); - rocksdb::Status s = GetMetadata(slot_num, &metadata); - if (!s.ok()) return s; - - rocksdb::ReadOptions read_options; - LatestSnapShot ss(db_); - read_options.snapshot = ss.GetSnapShot(); - - std::string slot_key, raw_bytes; - SlotInternalKey(key, metadata.version).Encode(&slot_key); - s = db_->Get(read_options, slot_key_cf_handle_, slot_key, &raw_bytes); - if (!s.ok()) return s; - return rocksdb::Status::OK(); -} - -Status Slot::MigrateOne(const std::string &host, - int port, - uint64_t timeout, - const rocksdb::Slice &key) { - int sock_fd; - auto s = Util::SockConnect(host, port, &sock_fd, timeout, timeout); - if (!s.IsOK()) { - return Status(Status::NotOK, "connect the server err: " + s.Msg()); - } - s = MigrateOneKey(sock_fd, key); - close(sock_fd); - return s; -} - -Status Slot::MigrateOneKey(int sock_fd, const rocksdb::Slice &key) { - std::string ns_key; - AppendNamespacePrefix(key, &ns_key); - std::string bytes; - auto st = Database::GetRawMetadata(ns_key, &bytes); - if (!st.ok()) return Status(Status::NotFound, st.ToString()); - Metadata metadata(kRedisNone, false); - metadata.Decode(bytes); - if (metadata.Expired()) { - return Status(Status::NotFound, "the key was Expired"); - } - if (metadata.Type() != kRedisString && metadata.size == 0) { - return Status(Status::NotFound, "no elements"); - } - - size_t line_len; - std::string restore_command; - switch (metadata.Type()) { - case kRedisString: { - std::vector commands = {"set", key.ToString(), - bytes.substr(5, bytes.size() - 5)}; - if (metadata.expire > 0) { - commands.emplace_back("EX"); - commands.emplace_back(std::to_string(metadata.expire)); - } - restore_command = Redis::MultiBulkString(commands); - break; - } - case kRedisList: - case kRedisZSet: - case kRedisBitmap: - case kRedisHash: - case kRedisSet: - case kRedisSortedint: { - auto s = 
generateMigrateCommandComplexKV(key, metadata, &restore_command); - if (!s.IsOK()) { - return s; - } - } - default:break; // should never get here - } - auto s = Util::SockSend(sock_fd, restore_command); - if (!s.IsOK()) { - return Status(Status::NotOK, "[slotsrestore] send command err:" + s.Msg()); - } - evbuffer *evbuf = evbuffer_new(); - while (true) { - if (evbuffer_read(evbuf, sock_fd, -1) <= 0) { - evbuffer_free(evbuf); - return Status(Status::NotOK, std::string("[slotsrestore] read response err: ") + strerror(errno)); - } - char *line = evbuffer_readln(evbuf, &line_len, EVBUFFER_EOL_CRLF_STRICT); - if (!line) continue; - if (line[0] == '-') { - auto error_msg = "[slotsrestore] got invalid response: " + std::string(line); - free(line); - evbuffer_free(evbuf); - return Status(Status::NotOK, error_msg); - } - free(line); - break; - } - if (metadata.Type() != kRedisString && metadata.expire != 0) { - auto ttl_command = - Redis::MultiBulkString({"EXPIREAT", key.ToString(), std::to_string(metadata.expire)}); - s = Util::SockSend(sock_fd, ttl_command); - if (!s.IsOK()) { - evbuffer_free(evbuf); - return Status(Status::NotOK, "[slotsrestore] send expire command err:" + s.Msg()); - } - while (true) { - if (evbuffer_read(evbuf, sock_fd, -1) <= 0) { - evbuffer_free(evbuf); - return Status(Status::NotOK, - std::string("[slotsrestore] expire command read response err: ") + strerror(errno)); - } - char *line = evbuffer_readln(evbuf, &line_len, EVBUFFER_EOL_CRLF_STRICT); - if (!line) continue; - if (line[0] == '-') { - free(line); - evbuffer_free(evbuf); - return Status(Status::NotOK, "[slotsrestore] expire command got invalid response"); - } - free(line); - break; - } - } - evbuffer_free(evbuf); - Database::Del(key); - return Status::OK(); -} - -Status Slot::MigrateSlotRandomOne(const std::string &host, - int port, - uint64_t timeout, - uint32_t slot_num) { - std::vector keys; - auto s = Scan(slot_num, std::string(), 1, &keys); - if (!s.ok()) { - return 
Status(Status::NotOK, s.ToString()); - } - if (keys.size() == 0) { - return Status(Status::NotOK, "slot is empty"); - } - return MigrateOne(host, port, timeout, keys.back()); -} - -Status Slot::MigrateTagSlot(const std::string &host, - int port, - uint64_t timeout, - uint32_t slot_num, - int *ret) { - *ret = 0; - - std::vector keys; - auto s = Scan(slot_num, std::string(), 1, &keys); - if (!s.ok()) { - return Status(Status::NotOK, s.ToString()); - } - if (keys.size() == 0) { - return Status(Status::NotOK, "slot is empty"); - } - return MigrateTag(host, port, timeout, keys.back(), ret); -} - -Status Slot::MigrateTag(const std::string &host, - int port, - uint64_t timeout, - const std::string &key, - int *ret) { - *ret = 0; - - auto tag = GetTagFromKey(key); - if (tag.empty()) { - return MigrateOne(host, port, timeout, key); - } - - auto slot_num = GetSlotNumFromKey(tag); - SlotMetadata metadata(false); - rocksdb::Status s = GetMetadata(slot_num, &metadata); - if (!s.ok()) return Status(Status::NotOK, s.ToString()); - - LatestSnapShot ss(db_); - rocksdb::ReadOptions read_options; - read_options.snapshot = ss.GetSnapShot(); - read_options.fill_cache = false; - auto iter = db_->NewIterator(read_options, slot_key_cf_handle_); - std::string prefix_key; - SlotInternalKey(slot_num, metadata.version).Encode(&prefix_key); - for (iter->Seek(prefix_key); iter->Valid(); iter->Next()) { - if (!iter->key().starts_with(prefix_key)) { - break; - } - SlotInternalKey ikey(iter->key()); - auto k = ikey.GetKey().ToString(); - auto t = GetTagFromKey(k); - if (t == tag) { - auto s = MigrateOne(host, port, timeout, k); - if (s.IsOK()) { - *ret += 1; - } - } - } - delete iter; - return Status::OK(); -} - -Status Slot::generateMigrateCommandComplexKV(const Slice &key, const Metadata &metadata, std::string *output) { - output->clear(); - - std::string cmd; - switch (metadata.Type()) { - case kRedisList:cmd = "rpush"; - break; - case kRedisZSet:cmd = "zadd"; - break; - case kRedisBitmap:cmd = 
"msetbit"; - break; - case kRedisHash:cmd = "hmset"; - break; - case kRedisSet:cmd = "sadd"; - break; - case kRedisSortedint:cmd = "siadd"; - break; - default:break; // should never get here - } - std::vector list = {cmd, key.ToString()}; - - LatestSnapShot ss(db_); - rocksdb::ReadOptions read_options; - read_options.snapshot = ss.GetSnapShot(); - read_options.fill_cache = false; - auto iter = db_->NewIterator(read_options); - std::string prefix_key, ns_key; - AppendNamespacePrefix(key, &ns_key); - InternalKey(ns_key, "", metadata.version).Encode(&prefix_key); - for (iter->Seek(prefix_key); iter->Valid(); iter->Next()) { - if (!iter->key().starts_with(prefix_key)) { - break; - } - InternalKey ikey(iter->key()); - switch (metadata.Type()) { - case kRedisSet: { - list.emplace_back(ikey.GetSubKey().ToString()); - break; - } - case kRedisSortedint: { - auto id = DecodeFixed64(ikey.GetSubKey().ToString().data()); - list.emplace_back(std::to_string(id)); - break; - } - case kRedisZSet: { - auto score = DecodeDouble(iter->value().ToString().data()); - list.emplace_back(Util::Float2String(score)); - list.emplace_back(ikey.GetSubKey().ToString()); - break; - } - case kRedisBitmap: - case kRedisHash: { - list.emplace_back(ikey.GetSubKey().ToString()); - list.emplace_back(iter->value().ToString()); - break; - } - case kRedisList: { - list.emplace_back(iter->value().ToString()); - break; - } - default:break; // should never get here - } - } - delete iter; - - *output = Redis::MultiBulkString(list); - return Status::OK(); -} - -rocksdb::Status Slot::Check() { - std::string ns, user_key, value; - - LatestSnapShot ss(db_); - rocksdb::ReadOptions read_options; - read_options.snapshot = ss.GetSnapShot(); - read_options.fill_cache = false; - // check cf_metadata against cf_slot - auto iter = db_->NewIterator(read_options, metadata_cf_handle_); - for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { - Metadata metadata(kRedisNone, false); - value = iter->value().ToString(); - 
metadata.Decode(value); - if (metadata.Expired()) continue; - ExtractNamespaceKey(iter->key(), &ns, &user_key); - - auto s = IsKeyExist(user_key); - if (!s.ok()) { - return rocksdb::Status::NotFound("cf_metadata key not in cf_slot: " + user_key); - } - } - delete iter; - // check cf_slot against cf_metadata - iter = db_->NewIterator(read_options, slot_key_cf_handle_); - int ret; - for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { - SlotInternalKey ikey(iter->key()); - auto key = ikey.GetKey().ToString(); - auto slot_num = GetSlotNumFromKey(key); - SlotMetadata metadata(false); - rocksdb::Status s = GetMetadata(slot_num, &metadata); - if (!s.ok()) { - continue; - } - if (ikey.GetVersion() != metadata.version) { - continue; - } - Exists({key}, &ret); - if (ret != 1) { - return rocksdb::Status::NotFound("cf_slot key not in cf_metadata: " + key); - } - } - delete iter; - return rocksdb::Status::OK(); -} - -rocksdb::Status Slot::GetInfo(uint32_t start, int count, std::vector *slot_counts) { - std::string value; - auto max_slot_num = start + count; - - LatestSnapShot ss(db_); - rocksdb::ReadOptions read_options; - read_options.snapshot = ss.GetSnapShot(); - read_options.fill_cache = false; - auto iter = db_->NewIterator(read_options, slot_metadata_cf_handle_); - - std::string start_key; - PutFixed32(&start_key, 0); - - for (iter->Seek(start_key); iter->Valid(); iter->Next()) { - auto key = iter->key().ToString(); - auto slot_num = DecodeFixed32(key.data()); - if (slot_num > max_slot_num) { - break; - } - SlotMetadata metadata(false); - metadata.Decode(iter->value().ToString()); - slot_counts->emplace_back(SlotCount{(uint32_t) slot_num, metadata.size}); - } - delete iter; - return rocksdb::Status::OK(); -} - -rocksdb::Status Slot::Del(uint32_t slot_num) { - LockGuard guard(storage_->GetLockManager(), std::to_string(slot_num)); - - std::string value, metadata_key; - PutFixed32(&metadata_key, slot_num); - auto s = db_->Get(rocksdb::ReadOptions(), 
slot_metadata_cf_handle_, metadata_key, &value); - if (!s.ok()) return s; - return storage_->Delete(rocksdb::WriteOptions(), slot_metadata_cf_handle_, metadata_key); -} - -rocksdb::Status Slot::AddKey(const Slice &key) { - LockGuard guard(storage_->GetLockManager(), key); - - auto slot_num = GetSlotNumFromKey(key.ToString()); - SlotMetadata metadata; - rocksdb::Status s = GetMetadata(slot_num, &metadata); - if (!s.ok() && !s.IsNotFound()) return s; - - rocksdb::ReadOptions read_options; - LatestSnapShot ss(db_); - read_options.snapshot = ss.GetSnapShot(); - - std::string slot_key, raw_bytes; - SlotInternalKey(key, metadata.version).Encode(&slot_key); - s = db_->Get(read_options, slot_key_cf_handle_, slot_key, &raw_bytes); - if (s.ok()) { - return rocksdb::Status::OK(); - } else if (!s.IsNotFound()) { - return s; - } - - rocksdb::WriteBatch batch; - batch.Put(slot_key_cf_handle_, slot_key, NULL); - - metadata.size++; - std::string bytes, metadata_key; - PutFixed32(&metadata_key, slot_num); - metadata.Encode(&bytes); - batch.Put(slot_metadata_cf_handle_, metadata_key, bytes); - - return storage_->Write(rocksdb::WriteOptions(), &batch); -} - -rocksdb::Status Slot::DeleteKey(const Slice &key) { - LockGuard guard(storage_->GetLockManager(), key); - - auto slot_num = GetSlotNumFromKey(key.ToString()); - SlotMetadata metadata(false); - rocksdb::Status s = GetMetadata(slot_num, &metadata); - if (!s.ok()) return s; - - rocksdb::ReadOptions read_options; - LatestSnapShot ss(db_); - read_options.snapshot = ss.GetSnapShot(); - - std::string slot_key, raw_bytes; - SlotInternalKey(key, metadata.version).Encode(&slot_key); - s = db_->Get(read_options, slot_key_cf_handle_, slot_key, &raw_bytes); - if (!s.ok()) return s; - - rocksdb::WriteBatch batch; - batch.Delete(slot_key_cf_handle_, slot_key); - - metadata.size--; - std::string bytes, metadata_key; - PutFixed32(&metadata_key, slot_num); - metadata.Encode(&bytes); - batch.Put(slot_metadata_cf_handle_, metadata_key, bytes); - - 
return storage_->Write(rocksdb::WriteOptions(), &batch); -} - -rocksdb::Status Slot::DeleteAll() { - LockGuard guard(storage_->GetLockManager(), "slots_all"); - - std::string first_key, last_key; - PutFixed32(&first_key, 0); - PutFixed32(&last_key, HASH_SLOTS_SIZE); - return db_->DeleteRange(rocksdb::WriteOptions(), slot_metadata_cf_handle_, first_key, last_key); -} - -rocksdb::Status Slot::Size(uint32_t slot_num, uint64_t *ret) { - *ret = 0; - - SlotMetadata metadata(false); - auto s = GetMetadata(slot_num, &metadata); - if (!s.ok()) return s; - *ret = metadata.size; - return rocksdb::Status::OK(); -} - -rocksdb::Status Slot::UpdateKeys(const std::vector &put_keys, - const std::vector &delete_keys, - rocksdb::WriteBatch *updates) { - std::map metadatas; - - for (const auto &key : put_keys) { - auto slot_num = GetSlotNumFromKey(key); - auto iter = metadatas.find(slot_num); - if (iter == metadatas.end()) { - SlotMetadata metadata; - auto s = GetMetadata(slot_num, &metadata); - if (!s.ok() && !s.IsNotFound()) return s; - metadatas[slot_num] = metadata; - } - auto s = IsKeyExist(key); - if (!s.ok()) { - std::string slot_key; - SlotInternalKey(key, metadatas[slot_num].version).Encode(&slot_key); - metadatas[slot_num].size++; - updates->Put(slot_key_cf_handle_, slot_key, NULL); - } - } - - for (const auto &key : delete_keys) { - auto slot_num = GetSlotNumFromKey(key); - auto iter = metadatas.find(slot_num); - if (iter == metadatas.end()) { - SlotMetadata metadata; - auto s = GetMetadata(slot_num, &metadata); - if (!s.ok() && !s.IsNotFound()) return s; - metadatas[slot_num] = metadata; - } - std::string slot_key; - SlotInternalKey(key, metadatas[slot_num].version).Encode(&slot_key); - auto s = IsKeyExist(key); - if (s.ok()) { - metadatas[slot_num].size--; - updates->Delete(slot_key_cf_handle_, slot_key); - } - } - - for (const auto &iter : metadatas) { - std::string bytes, metadata_key; - PutFixed32(&metadata_key, iter.first); - iter.second.Encode(&bytes); - 
updates->Put(slot_metadata_cf_handle_, metadata_key, bytes); - } - - return rocksdb::Status::OK(); -} - -rocksdb::Status Slot::Scan(uint32_t slot_num, - const std::string &cursor, - uint64_t limit, - std::vector *keys) { - uint64_t cnt = 0; - SlotMetadata metadata(false); - rocksdb::Status s = GetMetadata(slot_num, &metadata); - if (!s.ok()) return s; - - LatestSnapShot ss(db_); - rocksdb::ReadOptions read_options; - read_options.snapshot = ss.GetSnapShot(); - read_options.fill_cache = false; - auto iter = db_->NewIterator(read_options, slot_key_cf_handle_); - std::string prefix_key, start_key; - SlotInternalKey(slot_num, metadata.version).Encode(&prefix_key); - if (!cursor.empty()) { - SlotInternalKey(cursor, metadata.version).Encode(&start_key); - } else { - start_key = prefix_key; - } - for (iter->Seek(start_key); iter->Valid() && cnt < limit; iter->Next()) { - if (!cursor.empty() && iter->key() == start_key) { - // if cursor is not empty, then we need to skip start_key - // because we already return that key in the last scan - continue; - } - if (!iter->key().starts_with(prefix_key)) { - break; - } - SlotInternalKey ikey(iter->key()); - keys->emplace_back(ikey.GetKey().ToString()); - cnt++; - } - delete iter; - return rocksdb::Status::OK(); -} - -rocksdb::Status Slot::Restore(const std::vector &key_values) { - return rocksdb::Status::OK(); -} - -SlotsMgrtSenderThread::~SlotsMgrtSenderThread() { - if (is_migrating_) { - StopMigrateSlot(); - slotsmgrt_cond_.SignalAll(); - } -} - -Status SlotsMgrtSenderThread::Start() { - try { - t_ = std::thread([this]() { - Util::ThreadSetName("slots-mgrt-sender-thread"); - this->loop(); - }); - } catch (const std::system_error &e) { - return Status(Status::NotOK, e.what()); - } - return Status::OK(); -} - -void SlotsMgrtSenderThread::Stop() { - stop_ = true; - LOG(WARNING) << "[slots-mgrt-sender-thread] stopped"; -} - -void SlotsMgrtSenderThread::StopMigrateSlot() { - is_migrating_ = false; - LOG(WARNING) << 
"[slots-mgrt-sender-thread] migrate slot " + std::to_string(slot_num_) + " stopped"; -} - -void SlotsMgrtSenderThread::Join() { - if (t_.joinable()) t_.join(); -} - -Status SlotsMgrtSenderThread::SlotsMigrateOne(const std::string &key, int *ret) { - std::lock_guard guard(db_mu_); - std::lock_guard ones_guard(ones_mu_); - Redis::Slot slot_db(storage_); - - std::string bytes; - auto s = slot_db.GetRawMetadataByUserKey(key, &bytes); - if (!s.ok()) { - if (s.IsNotFound()) { - *ret = 0; - return Status(Status::NotOK, "Migrate key: " + key + " not found"); - } else { - *ret = -1; - return Status(Status::NotOK, "Migrate one key: " + key + " error: " + s.ToString()); - } - } - Metadata metadata(kRedisNone, false); - metadata.Decode(bytes); - if (metadata.Expired()) { - *ret = 0; - return Status(Status::NotOK, "the key was Expired"); - } - if (metadata.Type() != kRedisString && metadata.size == 0) { - *ret = 0; - return Status(Status::NotOK, "no elements"); - } - - // when this slot has been finished migrating, and the thread migrating status is reset, but the result - // has not been returned to proxy, during this time, some more request on this slot are sent to the server, - // so need to check if the key exists first, if not exists the proxy forwards the request to destination server - // if the key exists, it is an error which should not happen. 
- auto slot_num = GetSlotNumFromKey(key); - if (slot_num != slot_num_) { - *ret = -1; - return Status(Status::NotOK, - "Slot : " + std::to_string(slot_num) + " is not the migrating slot:" + std::to_string(slot_num_)); - } else if (!is_migrating_) { - *ret = -1; - return Status(Status::NotOK, "Slot : " + std::to_string(slot_num) + " is not migrating"); - } - - if (std::find(migrating_ones_.begin(), migrating_ones_.end(), key) != migrating_ones_.end()) { - *ret = 1; - return Status::OK(); - } - - migrating_ones_.push_back(key); - *ret = 1; - return Status::OK(); -} - -Status SlotsMgrtSenderThread::SlotsMigrateBatch(const std::string &ip, - int port, - uint64_t time_out, - uint32_t slot, - int keys_num) { - MutexLock guard(&slotsmgrt_cond_mu_); - if (is_migrating_) { - if (!(dest_ip_ == ip && dest_port_ == port && slot_num_ == slot)) { - return Status(Status::NotOK, "wrong dest_ip, dest_port or slot_num"); - } - timeout_ms_ = time_out; - keys_num_.fetch_add(keys_num, std::memory_order_relaxed); - return Status::OK(); - } else { - Redis::Slot slot_db(storage_); - auto s = slot_db.Size(slot_num_, &remained_keys_num_); - if (!s.ok() && !s.IsNotFound()) { - return Status(Status::NotOK, "get slot_size error"); - } - dest_ip_ = ip; - dest_port_ = port; - timeout_ms_ = time_out; - slot_num_ = slot; - keys_num_ = keys_num; - is_migrating_ = true; - error_ = false; - LOG(INFO) << "[slots-mgrt-sender-thread] Migrate batch slot: " << slot; - } - return Status::OK(); -} - -Status SlotsMgrtSenderThread::GetSlotsMigrateResult(uint64_t *moved, uint64_t *remained) { - MutexLock guard(&slotsmgrt_cond_mu_); - slotsmgrt_cond_.TimedWait(timeout_ms_); - *moved = moved_keys_num_; - *remained = remained_keys_num_; - return Status::OK(); -} - -Status SlotsMgrtSenderThread::GetSlotsMgrtSenderStatus(std::string *ip, - int *port, - uint32_t *slot_num, - bool *migrating, - uint64_t *moved, - uint64_t *remained) { - std::lock_guard guard(db_mu_); - std::lock_guard ones_guard(ones_mu_); - *ip = 
dest_ip_; - *port = dest_port_; - *slot_num = slot_num_; - *migrating = is_migrating_; - *moved = moved_keys_num_; - *remained = remained_keys_num_; - return Status::OK(); -} - -Status SlotsMgrtSenderThread::SlotsMigrateAsyncCancel() { - std::lock_guard guard(db_mu_); - dest_ip_ = "none"; - dest_port_ = 0; - timeout_ms_ = 3000; - slot_num_ = 0; - moved_keys_num_ = 0; - moved_keys_all_ = 0; - remained_keys_num_ = 0; - StopMigrateSlot(); - std::vector().swap(migrating_ones_); - return Status::OK(); -} - -Status SlotsMgrtSenderThread::ElectMigrateKeys(std::vector *keys) { - std::lock_guard guard(db_mu_); - Redis::Slot slot_db(storage_); - - SlotMetadata metadata(false); - auto s = slot_db.GetMetadata(slot_num_, &metadata); - if (!s.ok()) { - StopMigrateSlot(); - return Status(Status::NotOK, s.ToString()); - } - remained_keys_num_ = metadata.size; - if (remained_keys_num_ == 0) { - LOG(WARNING) << "[slots-mgrt-sender-thread] No keys in slot: " << slot_num_; - return Status::OK(); - } - auto ss = slot_db.Scan(slot_num_, std::string(), keys_num_, keys); - keys_num_ = 0; - if (!ss.ok()) { - return Status(Status::NotOK, s.ToString()); - } - if (keys->size() == 0) { - LOG(WARNING) << "No keys in slot: " << slot_num_; - StopMigrateSlot(); - return Status(Status::NotOK, "slot is empty"); - } - return Status::OK(); -} - -void SlotsMgrtSenderThread::loop() { - Redis::Slot slot_db(storage_); - - while (!IsStopped()) { - if (!is_migrating_) { - sleep(1); - continue; - } - LOG(INFO) << "[slots-mgrt-sender-thread] Start migrate slot:" << slot_num_; - int sock_fd = 0; - auto s = Util::SockConnect(dest_ip_, dest_port_, &sock_fd, timeout_ms_, timeout_ms_); - if (!s.IsOK()) { - LOG(WARNING) << "[slots-mgrt-sender-thread] Failed to connect the server (" - << dest_ip_ << ":" << dest_port_ << ") " << s.Msg(); - slotsmgrt_cond_.Signal(); - StopMigrateSlot(); - sleep(1); - continue; - } - LOG(INFO) << "[slots-mgrt-sender-thread] Succ Connect server (" << dest_ip_ << ":" << dest_port_ << ") 
"; - moved_keys_all_ = 0; - while (is_migrating_) { - if (keys_num_ <= 0) { - sleep(1); - continue; - } - std::vector migrate_batch_keys; - auto s = ElectMigrateKeys(&migrate_batch_keys); - if (!s.IsOK()) { - LOG(WARNING) << "[slots-mgrt-sender-thread] Failed to get batch keys: " + s.Msg(); - slotsmgrt_cond_.Signal(); - StopMigrateSlot(); - break; - } else if (remained_keys_num_ == 0) { - StopMigrateSlot(); - break; - } - - { - std::lock_guard ones_guard(ones_mu_); - // add ones to batch end; empty ones - std::copy(migrating_ones_.begin(), migrating_ones_.end(), std::back_inserter(migrate_batch_keys)); - if (migrate_batch_keys.size() != 0) { - moved_keys_num_ = 0; - } - std::vector().swap(migrating_ones_); - } - - for (auto const &key : migrate_batch_keys) { - auto s = slot_db.MigrateOneKey(sock_fd, key); - if (!s.IsOK()) { - LOG(WARNING) << "[slots-mgrt-sender-thread] Failed to Migrate batch key " << key << ": " << s.Msg(); - if (!s.IsNotFound()) { - slot_db.AddKey(key); - } - slotsmgrt_cond_.Signal(); - StopMigrateSlot(); - error_ = true; - break; - } - moved_keys_num_++; - moved_keys_all_++; - remained_keys_num_--; - } - if (error_) { - break; - } - - if (remained_keys_num_ == 0) { - LOG(INFO) << "[slots-mgrt-sender-thread] Migrate slot: " << slot_num_ << " finished"; - slotsmgrt_cond_.Signal(); - StopMigrateSlot(); - break; - } - } - if (sock_fd != 0) { - close(sock_fd); - } - if (error_) { - sleep(1); - } - } - LOG(INFO) << "[slots-mgrt-sender-thread] Stopped!"; - return; -} - -} // namespace Redis - - - - diff --git a/src/redis_slot.h b/src/redis_slot.h deleted file mode 100644 index 3cacc786ed1..00000000000 --- a/src/redis_slot.h +++ /dev/null @@ -1,227 +0,0 @@ -#pragma once -#include -#include -#include -#include -#include -#include - -#include "encoding.h" -#include "redis_db.h" - -// crc32 -#define HASH_SLOTS_MASK 0x000003ff -#define HASH_SLOTS_SIZE (HASH_SLOTS_MASK + 1) - -const uint32_t IEEE_POLY = 0xedb88320; -extern uint32_t crc32tab[256]; - -typedef 
struct { - uint32_t slot_num; - uint64_t count; -} SlotCount; - -typedef struct KeyValue { - std::string key; - int ttl; - std::string value; -} KeyValue; - -void CRC32TableInit(uint32_t poly); -void InitCRC32Table(); -uint32_t CRC32Update(uint32_t crc, const char *buf, int len); -uint32_t GetSlotNumFromKey(const std::string &key); -std::string GetTagFromKey(const std::string &key); - -class SlotInternalKey { - public: - explicit SlotInternalKey(rocksdb::Slice key, uint64_t version); - explicit SlotInternalKey(uint32_t slot_num, uint64_t version); - explicit SlotInternalKey(rocksdb::Slice input); - ~SlotInternalKey(); - - uint64_t GetSlotNum() const; - uint64_t GetVersion() const; - rocksdb::Slice GetKey() const; - void Encode(std::string *out); - bool operator==(const SlotInternalKey &that) const; - - private: - uint32_t slot_num_; - uint64_t version_; - rocksdb::Slice key_; - char *buf_; - char prealloc_[256]; -}; - -class SlotMetadata { - public: - uint64_t version; - uint64_t size; - - public: - explicit SlotMetadata(bool generate_version = true); - - void Encode(std::string *dst) const; - rocksdb::Status Decode(const std::string &bytes); - bool operator==(const SlotMetadata &that) const; - - private: - uint64_t generateVersion(); -}; - -class Mutex { - public: - Mutex(); - ~Mutex(); - - void Lock(); - void Unlock(); - void AssertHeld() {} - - private: - friend class CondVar; - pthread_mutex_t mu_; - - // No copying - Mutex(const Mutex &); - void operator=(const Mutex &); -}; - -class CondVar { - public: - explicit CondVar(Mutex *mu); - ~CondVar(); - void Wait(); - /* - * timeout is millisecond - * so if you want to wait for 1 s, you should call - * TimeWait(1000); - * return false if timeout - */ - bool TimedWait(uint32_t timeout); - void Signal(); - void SignalAll(); - - private: - pthread_cond_t cv_; - Mutex *mu_; -}; - -class MutexLock { - public: - explicit MutexLock(Mutex *mu) - : mu_(mu) { - this->mu_->Lock(); - } - ~MutexLock() { this->mu_->Unlock(); } 
- - private: - Mutex *const mu_; - // No copying allowed - MutexLock(const MutexLock &); - void operator=(const MutexLock &); -}; - -namespace Redis { -class Slot : public SubKeyScanner { - public: - explicit Slot(Engine::Storage *storage) : - SubKeyScanner(storage, kDefaultNamespace), - slot_metadata_cf_handle_(storage->GetCFHandle("slot_metadata")), - slot_key_cf_handle_(storage->GetCFHandle("slot")) {} - - rocksdb::Status GetMetadata(uint32_t slot_num, SlotMetadata *metadata); - Status MigrateOne(const std::string &host, int port, uint64_t timeout, const rocksdb::Slice &key); - Status MigrateOneKey(int sock_fd, const rocksdb::Slice &key); - Status MigrateSlotRandomOne(const std::string &host, - int port, - uint64_t timeout, - uint32_t slot_num); - Status MigrateTagSlot(const std::string &host, - int port, - uint64_t timeout, - uint32_t slot_num, - int *ret); - Status MigrateTag(const std::string &host, - int port, - uint64_t timeout, - const std::string &key, - int *ret); - rocksdb::Status Check(); - rocksdb::Status GetInfo(uint32_t start, int count, std::vector *slot_counts); - rocksdb::Status Del(uint32_t slot_num); - rocksdb::Status AddKey(const Slice &key); - rocksdb::Status DeleteKey(const Slice &key); - rocksdb::Status DeleteAll(); - rocksdb::Status Size(uint32_t slot_num, uint64_t *ret); - rocksdb::Status UpdateKeys(const std::vector &put_keys, - const std::vector &delete_keys, - rocksdb::WriteBatch *updates); - rocksdb::Status IsKeyExist(const Slice &key); - rocksdb::Status Scan(uint32_t slot_num, - const std::string &cursor, - uint64_t limit, - std::vector *keys); - rocksdb::Status Restore(const std::vector &key_values); - - private: - rocksdb::ColumnFamilyHandle *slot_metadata_cf_handle_; - rocksdb::ColumnFamilyHandle *slot_key_cf_handle_; - Status generateMigrateCommandComplexKV(const Slice &key, const Metadata &metadata, std::string *output); -}; - -class SlotsMgrtSenderThread { - public: - explicit SlotsMgrtSenderThread(Engine::Storage *storage) : - 
slotsmgrt_cond_(&slotsmgrt_cond_mu_), - storage_(storage), - is_migrating_(false) {} - virtual ~SlotsMgrtSenderThread(); - - Status Start(); - void Stop(); - void StopMigrateSlot(); - void Join(); - bool IsStopped() { return stop_; } - Status SlotsMigrateOne(const std::string &key, int *ret); - Status SlotsMigrateBatch(const std::string &ip, int port, uint64_t time_out, uint32_t slot, int keys_num); - Status GetSlotsMigrateResult(uint64_t *moved, uint64_t *remained); - Status GetSlotsMgrtSenderStatus(std::string *ip, - int *port, - uint32_t *slot_num, - bool *migrating, - uint64_t *moved, - uint64_t *remained); - Status SlotsMigrateAsyncCancel(); - - private: - std::thread t_; - std::mutex db_mu_; - std::mutex ones_mu_; - Mutex slotsmgrt_cond_mu_; - CondVar slotsmgrt_cond_; - bool stop_ = false; - Engine::Storage *storage_; - - std::string dest_ip_; - int dest_port_ = 0; - uint64_t timeout_ms_ = 0; - uint32_t slot_num_ = 0; - std::atomic keys_num_{0}; - uint64_t moved_keys_num_ = 0; // during one batch moved - uint64_t moved_keys_all_ = 0; // all keys moved in the slot - uint64_t remained_keys_num_ = 0; - bool error_ = false; - std::vector migrating_ones_; - std::atomic is_migrating_; - - Status ElectMigrateKeys(std::vector *keys); - void loop(); -}; - -} // namespace Redis - - - - diff --git a/src/server.cc b/src/server.cc index 50e9fd23cdf..fa464a5ffa8 100644 --- a/src/server.cc +++ b/src/server.cc @@ -47,7 +47,6 @@ Server::~Server() { for (const auto &iter : conn_ctxs_) { delete iter.first; } - delete slotsmgrt_sender_thread_; } Status Server::Start() { @@ -87,17 +86,13 @@ Status Server::Start() { // compact once per day if (now != 0 && last_compact_date != now/86400) { last_compact_date = now/86400; - compaction_checker.CompactPubsubAndSlotFiles(); + compaction_checker.CompactPubsubFiles(); } } usleep(100000); } }); - if (config_->codis_enabled) { - slotsmgrt_sender_thread_ = new Redis::SlotsMgrtSenderThread(storage_); - slotsmgrt_sender_thread_->Start(); - } 
return Status::OK(); } @@ -113,9 +108,6 @@ void Server::Stop() { cleanupExitedSlaves(); rocksdb::CancelAllBackgroundWork(storage_->GetDB()); task_runner_.Stop(); - if (slotsmgrt_sender_thread_ != nullptr) { - slotsmgrt_sender_thread_->Stop(); - } } void Server::Join() { @@ -124,9 +116,6 @@ void Server::Join() { } task_runner_.Join(); if (cron_thread_.joinable()) cron_thread_.join(); - if (slotsmgrt_sender_thread_ != nullptr) { - slotsmgrt_sender_thread_->Join(); - } if (compaction_checker_thread_.joinable()) compaction_checker_thread_.join(); } diff --git a/src/server.h b/src/server.h index a593dc6142d..5b649485e6e 100644 --- a/src/server.h +++ b/src/server.h @@ -14,7 +14,6 @@ #include "task_runner.h" #include "replication.h" #include "redis_metadata.h" -#include "redis_slot.h" #include "log_collector.h" #include "worker.h" @@ -118,7 +117,6 @@ class Server { Stats stats_; Engine::Storage *storage_; - Redis::SlotsMgrtSenderThread *slotsmgrt_sender_thread_ = nullptr; static std::atomic unix_time_; private: diff --git a/src/storage.cc b/src/storage.cc index e0f9b798874..674a7241640 100644 --- a/src/storage.cc +++ b/src/storage.cc @@ -19,7 +19,6 @@ #include "redis_db.h" #include "rocksdb_crc32c.h" #include "redis_metadata.h" -#include "redis_slot.h" #include "event_listener.h" #include "compact_filter.h" #include "table_properties_collector.h" @@ -30,19 +29,15 @@ const char *kPubSubColumnFamilyName = "pubsub"; const char *kZSetScoreColumnFamilyName = "zset_score"; const char *kMetadataColumnFamilyName = "metadata"; const char *kSubkeyColumnFamilyName = "default"; -const char *kSlotMetadataColumnFamilyName = "slot_metadata"; -const char *kSlotColumnFamilyName = "slot"; const uint64_t kIORateLimitMaxMb = 1024000; using rocksdb::Slice; -using Redis::WriteBatchExtractor; Storage::Storage(Config *config) : backup_env_(rocksdb::Env::Default()), config_(config), lock_mgr_(16) { - InitCRC32Table(); Metadata::InitVersionCounter(); SetCreatingCheckpoint(false); 
SetCheckpointCreateTime(0); @@ -137,10 +132,6 @@ Status Storage::CreateColumnFamilies(const rocksdb::Options &options) { std::vector cf_names = {kMetadataColumnFamilyName, kZSetScoreColumnFamilyName, kPubSubColumnFamilyName}; - if (config_->codis_enabled) { - cf_names.emplace_back(kSlotMetadataColumnFamilyName); - cf_names.emplace_back(kSlotColumnFamilyName); - } std::vector cf_handles; s = tmp_db->CreateColumnFamilies(cf_options, cf_names, &cf_handles); if (!s.ok()) { @@ -212,36 +203,6 @@ Status Storage::Open(bool read_only) { std::vector old_column_families; auto s = rocksdb::DB::ListColumnFamilies(options, config_->db_dir, &old_column_families); if (!s.ok()) return Status(Status::NotOK, s.ToString()); - if (!config_->codis_enabled && column_families.size() != old_column_families.size()) { - return Status(Status::NotOK, "the db enabled codis mode at first open, please set codis-enabled 'yes'"); - } - if (config_->codis_enabled && column_families.size() == old_column_families.size()) { - return Status(Status::NotOK, "the db disabled codis mode at first open, please set codis-enabled 'no'"); - } - if (config_->codis_enabled) { - rocksdb::BlockBasedTableOptions slot_metadata_table_opts; - slot_metadata_table_opts.filter_policy.reset(rocksdb::NewBloomFilterPolicy(10, true)); - slot_metadata_table_opts.block_cache = - rocksdb::NewLRUCache(metadata_block_cache_size, -1, false, 0.75); - slot_metadata_table_opts.cache_index_and_filter_blocks = true; - slot_metadata_table_opts.cache_index_and_filter_blocks_with_high_priority = true; - rocksdb::ColumnFamilyOptions slot_metadata_opts(options); - slot_metadata_opts.table_factory.reset(rocksdb::NewBlockBasedTableFactory(slot_metadata_table_opts)); - - rocksdb::BlockBasedTableOptions slotkey_table_opts; - slotkey_table_opts.filter_policy.reset(rocksdb::NewBloomFilterPolicy(10, true)); - slotkey_table_opts.block_cache = - rocksdb::NewLRUCache(subkey_block_cache_size, -1, false, 0.75); - 
slotkey_table_opts.cache_index_and_filter_blocks = true; - slotkey_table_opts.cache_index_and_filter_blocks_with_high_priority = true; - rocksdb::ColumnFamilyOptions slotkey_opts(options); - slotkey_opts.table_factory.reset(rocksdb::NewBlockBasedTableFactory(slotkey_table_opts)); - slotkey_opts.compaction_filter_factory = std::make_shared(this); - slotkey_opts.disable_auto_compactions = config_->RocksDB.disable_auto_compactions; - - column_families.emplace_back(rocksdb::ColumnFamilyDescriptor(kSlotMetadataColumnFamilyName, slot_metadata_opts)); - column_families.emplace_back(rocksdb::ColumnFamilyDescriptor(kSlotColumnFamilyName, slotkey_opts)); - } auto start = std::chrono::high_resolution_clock::now(); if (read_only) { @@ -425,14 +386,6 @@ rocksdb::Status Storage::Write(const rocksdb::WriteOptions &options, rocksdb::Wr if (reach_db_size_limit_) { return rocksdb::Status::SpaceLimit(); } - if (config_->codis_enabled) { - WriteBatchExtractor write_batch_extractor; - auto s = updates->Iterate(&write_batch_extractor); - if (!s.ok()) return s; - - Redis::Slot slot_db(this); - slot_db.UpdateKeys(*write_batch_extractor.GetPutKeys(), *write_batch_extractor.GetDeleteKeys(), updates); - } auto s = db_->Write(options, updates); if (!s.ok()) return s; @@ -445,15 +398,6 @@ rocksdb::Status Storage::Delete(const rocksdb::WriteOptions &options, const rocksdb::Slice &key) { rocksdb::WriteBatch batch; batch.Delete(cf_handle, key); - if (config_->codis_enabled && cf_handle == GetCFHandle("metadata")) { - std::vector delete_keys; - std::string ns, user_key; - ExtractNamespaceKey(key, &ns, &user_key); - delete_keys.emplace_back(user_key); - Redis::Slot slot_db(this); - auto s = slot_db.UpdateKeys({}, delete_keys, &batch); - if (!s.ok()) return s; - } return db_->Write(options, &batch); } @@ -466,13 +410,6 @@ rocksdb::Status Storage::DeleteRange(const std::string &first_key, const std::st if (!s.ok()) { return s; } - if (config_->codis_enabled) { - // currently only flushall and flushdb 
use deleterange , so we can delete all codis slot data - // please do not use Storage::DeleteRange in other scenario - Redis::Slot slot_db(this); - auto s = slot_db.DeleteAll(); - if (!s.ok()) return s; - } return rocksdb::Status::OK(); } @@ -495,10 +432,6 @@ rocksdb::ColumnFamilyHandle *Storage::GetCFHandle(const std::string &name) { return cf_handles_[2]; } else if (name == kPubSubColumnFamilyName) { return cf_handles_[3]; - } else if (name == kSlotMetadataColumnFamilyName) { - return cf_handles_[4]; - } else if (name == kSlotColumnFamilyName) { - return cf_handles_[5]; } return cf_handles_[0]; } diff --git a/src/storage.h b/src/storage.h index afcae392cfb..006306d914f 100644 --- a/src/storage.h +++ b/src/storage.h @@ -27,8 +27,6 @@ extern const char *kPubSubColumnFamilyName; extern const char *kZSetScoreColumnFamilyName; extern const char *kMetadataColumnFamilyName; extern const char *kSubkeyColumnFamilyName; -extern const char *kSlotMetadataColumnFamilyName; -extern const char *kSlotColumnFamilyName; class Storage { public: @@ -78,7 +76,6 @@ class Storage { void IncrFlushCount(uint64_t n) { flush_count_.fetch_add(n); } uint64_t GetCompactionCount() { return compaction_count_; } void IncrCompactionCount(uint64_t n) { compaction_count_.fetch_add(n); } - bool CodisEnabled() { return config_->codis_enabled; } Storage(const Storage &) = delete; Storage &operator=(const Storage &) = delete; diff --git a/tests/config_test.cc b/tests/config_test.cc index 0e1c7c311f5..5740aa01a64 100644 --- a/tests/config_test.cc +++ b/tests/config_test.cc @@ -71,7 +71,6 @@ TEST(Config, GetAndSet) { {"workers", "8"}, {"repl-workers", "8"}, {"tcp-backlog", "500"}, - {"codis-enabled", "yes"}, {"slaveof", "no one"}, {"db-name", "test_dbname"}, {"dir", "test_dir"}, diff --git a/tools/kvrocks2redis/parser.cc b/tools/kvrocks2redis/parser.cc index 4359da97482..43a70e7e2f6 100644 --- a/tools/kvrocks2redis/parser.cc +++ b/tools/kvrocks2redis/parser.cc @@ -230,7 +230,6 @@ rocksdb::Status 
WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic case kRedisBitmap: { auto args = log_data_.GetArguments(); if (args->size() < 1) { - // codis migrate special cmd msetbit will fall into this, will opti to slotsrestore, so ignore temporary LOG(ERROR) << "Fail to parse write_batch in putcf cmd setbit : args error ,should contain setbit offset"; return rocksdb::Status::OK(); }