Skip to content

Commit

Permalink
feat: supported 'ptype' command (OpenAtomFoundation#1586)
Browse files Browse the repository at this point in the history
* feat: supported 'ptype' command

* Simplified command `type` and `ptype` code

* Remove the capacity judgment

* fix slavaof serialize response bug (OpenAtomFoundation#1583)

Co-authored-by: liuyuecai <liuyuecai@360.cn>

* fix (OpenAtomFoundation#1587) (OpenAtomFoundation#1588)

* fix unit test:type/set (OpenAtomFoundation#1577)

* Fix memory leaks in HandleBGWorkerWriteDB and HandleMetaSyncResponse (OpenAtomFoundation#1590)

* fix memory leaks in PikaReplBgWorker::HandleBGWorkerWriteDB(void* arg) and PikaReplClientConn::HandleMetaSyncResponse(void* arg).

* add address/thread sanitizer to CMakeLists

---------

Co-authored-by: cjh <1271435567@qq.com>

* refactor: replace pstd/env with std::filesystem (OpenAtomFoundation#1470)

* fix bug issue 1554: using unique_ptr.reset to fix the SIGABRT error (OpenAtomFoundation#1595)

using unique_ptr.reset to fix the SIGABRT error && using time-wait lock to notify quickly

* fix: incorrect manner of terminating the process (OpenAtomFoundation#1596)

* fix issue#1597: add rocksdb dependency to pstd (OpenAtomFoundation#1599)

* fix g++15 compile failure

* add rocksdb dependency to pstd

---------

Co-authored-by: J1senn <J1senn@outlook.com>

* fix_info_command

* fix: gcc13 compile failed (OpenAtomFoundation#1601)

* ci: add unit test to github action (OpenAtomFoundation#1604)

* ci: add unit test in github workflow

* chore:change `PLATFORM` field logic (OpenAtomFoundation#1615)

* fix issue 1517: scan 命令不支持 type 参数 (OpenAtomFoundation#1582)

* fix: the unit test of type/set (OpenAtomFoundation#1617)

* test: optimize the return results of srandmember to avoid approximate results

* fix: use last_seed for random engine

* [fix issue1621] fix deadlock (OpenAtomFoundation#1620)

* [fix] fix deadlock

* [fix] fix

* command `type` and `ptype` add unit test

* fix member variable initialization

* Update issue templates

* feat: supported 'ptype' command

* fix unit test:type/set (OpenAtomFoundation#1577)

* fix issue#1597: add rocksdb dependency to pstd (OpenAtomFoundation#1599)

* fix g++15 compile failure

* add rocksdb dependency to pstd

---------

Co-authored-by: J1senn <J1senn@outlook.com>

* fix: gcc13 compile failed (OpenAtomFoundation#1601)

* fix: the unit test of type/set (OpenAtomFoundation#1617)

* test: optimize the return results of srandmember to avoid approximate results

* fix: use last_seed for random engine

* Modify other modules to use the new `type` function

* fix function repeat

---------

Co-authored-by: Yuecai Liu <38887641+luky116@users.noreply.github.com>
Co-authored-by: liuyuecai <liuyuecai@360.cn>
Co-authored-by: Peter Chan <luckygoose@foxmail.com>
Co-authored-by: chenbt <34958405+chenbt-hz@users.noreply.github.com>
Co-authored-by: Junhua Chen <41671101+cheniujh@users.noreply.github.com>
Co-authored-by: cjh <1271435567@qq.com>
Co-authored-by: kang jinci <jincikang@gmail.com>
Co-authored-by: Xin.Zh <dragoncharlie@foxmail.com>
Co-authored-by: machinly <machinlyg@gmail.com>
Co-authored-by: A2ureStone <73770413+A2ureStone@users.noreply.github.com>
Co-authored-by: J1senn <J1senn@outlook.com>
Co-authored-by: chejinge <945997690@qq.com>
Co-authored-by: baerwang <52104949+baerwang@users.noreply.github.com>
Co-authored-by: ptbxzrt <89020404+ptbxzrt@users.noreply.github.com>
Co-authored-by: 你不要过来啊 <73388438+iiiuwioajdks@users.noreply.github.com>
  • Loading branch information
16 people authored Jul 2, 2023
1 parent 395e5ba commit 807713d
Show file tree
Hide file tree
Showing 10 changed files with 170 additions and 73 deletions.
1 change: 1 addition & 0 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ const std::string kCmdNameTtl = "ttl";
const std::string kCmdNamePttl = "pttl";
const std::string kCmdNamePersist = "persist";
const std::string kCmdNameType = "type";
const std::string kCmdNamePType = "ptype";
const std::string kCmdNameScan = "scan";
const std::string kCmdNameScanx = "scanx";
const std::string kCmdNamePKSetexAt = "pksetexat";
Expand Down
18 changes: 18 additions & 0 deletions include/pika_kv.h
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,24 @@ class TypeCmd : public Cmd {
void DoInitial() override;
};

class PTypeCmd : public Cmd {
public:
PTypeCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {}
std::vector<std::string> current_key() const override {
std::vector<std::string> res;
res.push_back(key_);
return res;
}
void Do(std::shared_ptr<Slot> slot = nullptr) override;
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override {};
void Merge() override {};
Cmd* Clone() override { return new PTypeCmd(*this); }

private:
std::string key_;
void DoInitial() override;
};

class ScanCmd : public Cmd {
public:
ScanCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag), pattern_("*") {}
Expand Down
3 changes: 3 additions & 0 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,9 @@ void InitCmdTable(CmdTable* cmd_table) {
std::unique_ptr<Cmd> typeptr =
std::make_unique<TypeCmd>(kCmdNameType, 2, kCmdFlagsRead | kCmdFlagsSingleSlot | kCmdFlagsKv);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameType, std::move(typeptr)));
////PTypeCmd
std::unique_ptr<Cmd> pTypeptr = std::make_unique<PTypeCmd>(kCmdNamePType, 2, kCmdFlagsRead | kCmdFlagsSingleSlot | kCmdFlagsKv);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNamePType, std::move(pTypeptr)));
////ScanCmd
std::unique_ptr<Cmd> scanptr =
std::make_unique<ScanCmd>(kCmdNameScan, -2, kCmdFlagsRead | kCmdFlagsMultiSlot | kCmdFlagsKv);
Expand Down
40 changes: 29 additions & 11 deletions src/pika_kv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,7 @@ void DelCmd::Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) {
}
}

void DelCmd::Merge() {
res_.AppendInteger(split_res_);
}
void DelCmd::Merge() { res_.AppendInteger(split_res_); }

void IncrCmd::DoInitial() {
if (!CheckArg(argv_.size())) {
Expand Down Expand Up @@ -804,9 +802,7 @@ void ExistsCmd::Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) {
}
}

void ExistsCmd::Merge() {
res_.AppendInteger(split_res_);
}
void ExistsCmd::Merge() { res_.AppendInteger(split_res_); }

void ExpireCmd::DoInitial() {
if (!CheckArg(argv_.size())) {
Expand Down Expand Up @@ -1088,10 +1084,33 @@ void TypeCmd::DoInitial() {
}

void TypeCmd::Do(std::shared_ptr<Slot> slot) {
std::string res;
rocksdb::Status s = slot->db()->Type(key_, &res);
std::vector<std::string> types(1);
rocksdb::Status s = slot->db()->GetType(key_, true, types);
if (s.ok()) {
res_.AppendContent("+" + res);
res_.AppendContent("+" + types[0]);
} else {
res_.SetRes(CmdRes::kErrOther, s.ToString());
}
}

void PTypeCmd::DoInitial() {
if (!CheckArg(argv_.size())) {
res_.SetRes(CmdRes::kWrongNum, kCmdNameType);
return;
}
key_ = argv_[1];
}

void PTypeCmd::Do(std::shared_ptr<Slot> slot) {
std::vector<std::string> types(5);
rocksdb::Status s = slot->db()->GetType(key_, false, types);

if (s.ok()) {
res_.AppendArrayLen(types.size());
for (const auto& vs : types) {
res_.AppendStringLen(vs.size());
res_.AppendContent(vs);
}
} else {
res_.SetRes(CmdRes::kErrOther, s.ToString());
}
Expand Down Expand Up @@ -1328,8 +1347,7 @@ void PKScanRangeCmd::Do(std::shared_ptr<Slot> slot) {
std::string next_key;
std::vector<std::string> keys;
std::vector<storage::KeyValue> kvs;
rocksdb::Status s =
slot->db()->PKScanRange(type_, key_start_, key_end_, pattern_, limit_, &keys, &kvs, &next_key);
rocksdb::Status s = slot->db()->PKScanRange(type_, key_start_, key_end_, pattern_, limit_, &keys, &kvs, &next_key);

if (s.ok()) {
res_.AppendArrayLen(2);
Expand Down
18 changes: 9 additions & 9 deletions src/pika_migrate_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -662,9 +662,9 @@ int PikaMigrateThread::ReqMigrateOne(const std::string &key, const std::shared_p
std::unique_lock lm(migrator_mutex_);

int slot_id = GetSlotID(key);
std::string type_str;
std::vector<std::string> type_str(1);
char key_type;
rocksdb::Status s = slot->db()->Type(key, &type_str);
rocksdb::Status s = slot->db()->GetType(key, true, type_str);
if (!s.ok()) {
if (s.IsNotFound()) {
LOG(INFO) << "PikaMigrateThread::ReqMigrateOne key: " << key << " not found";
Expand All @@ -675,20 +675,20 @@ int PikaMigrateThread::ReqMigrateOne(const std::string &key, const std::shared_p
}
}

if (type_str == "string") {
if (type_str[0] == "string") {
key_type = 'k';
} else if (type_str == "hash") {
} else if (type_str[0] == "hash") {
key_type = 'h';
} else if (type_str == "list") {
} else if (type_str[0] == "list") {
key_type = 'l';
} else if (type_str == "set") {
} else if (type_str[0] == "set") {
key_type = 's';
} else if (type_str == "zset") {
} else if (type_str[0] == "zset") {
key_type = 'z';
} else if (type_str == "none") {
} else if (type_str[0] == "none") {
return 0;
} else {
LOG(WARNING) << "PikaMigrateThread::ReqMigrateOne key: " << key << " type: " << type_str << " is illegal";
LOG(WARNING) << "PikaMigrateThread::ReqMigrateOne key: " << key << " type: " << type_str[0] << " is illegal";
return -1;
}

Expand Down
28 changes: 14 additions & 14 deletions src/pika_slot_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -891,22 +891,22 @@ void RemSlotKey(const std::string key, const std::shared_ptr<Slot>& slot) {
}

int GetKeyType(const std::string key, std::string &key_type, const std::shared_ptr<Slot>& slot) {
std::string type_str;
rocksdb::Status s = slot->db()->Type(key, &type_str);
std::vector<std::string> type_str(1);
rocksdb::Status s = slot->db()->GetType(key, true, type_str);
if (!s.ok()) {
LOG(WARNING) << "Get key type error: " << key << " " << s.ToString();
key_type = "";
return -1;
}
if (type_str == "string") {
if (type_str[0] == "string") {
key_type = "k";
} else if (type_str == "hash") {
} else if (type_str[0] == "hash") {
key_type = "h";
} else if (type_str == "list") {
} else if (type_str[0] == "list") {
key_type = "l";
} else if (type_str == "set") {
} else if (type_str[0] == "set") {
key_type = "s";
} else if (type_str == "zset") {
} else if (type_str[0] == "zset") {
key_type = "z";
} else {
LOG(WARNING) << "Get key type error: " << key;
Expand Down Expand Up @@ -1079,8 +1079,8 @@ void SlotsMgrtTagSlotCmd::Do(std::shared_ptr<Slot> slot) {

// check key type
int SlotsMgrtTagOneCmd::KeyTypeCheck(const std::shared_ptr<Slot>& slot) {
std::string type_str;
rocksdb::Status s = slot->db()->Type(key_, &type_str);
std::vector<std::string> type_str(1);
rocksdb::Status s = slot->db()->GetType(key_, true, type_str);
if (!s.ok()) {
if (s.IsNotFound()) {
LOG(INFO) << "Migrate slot key " << key_ << " not found";
Expand All @@ -1091,15 +1091,15 @@ int SlotsMgrtTagOneCmd::KeyTypeCheck(const std::shared_ptr<Slot>& slot) {
}
return -1;
}
if (type_str == "string") {
if (type_str[0] == "string") {
key_type_ = 'k';
} else if (type_str == "hash") {
} else if (type_str[0] == "hash") {
key_type_ = 'h';
} else if (type_str == "list") {
} else if (type_str[0] == "list") {
key_type_ = 'l';
} else if (type_str == "set") {
} else if (type_str[0] == "set") {
key_type_ = 's';
} else if (type_str == "zset") {
} else if (type_str[0] == "zset") {
key_type_ = 'z';
} else {
LOG(WARNING) << "Migrate slot key: " << key_ << " not found";
Expand Down
25 changes: 14 additions & 11 deletions src/storage/include/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,7 @@ struct BGTask {
Operation operation;
std::string argv;

BGTask(const DataType& _type = DataType::kAll, const Operation& _opeation = Operation::kNone,
std::string _argv = "")
BGTask(const DataType& _type = DataType::kAll, const Operation& _opeation = Operation::kNone, std::string _argv = "")
: type(_type), operation(_opeation), argv(std::move(_argv)) {}
};

Expand Down Expand Up @@ -424,7 +423,7 @@ class Storage {

// Removes and returns several random elements specified by count from the set value store at key.
Status SPop(const Slice& key, std::vector<std::string>* members, int64_t count);

// When called with just the key argument, return a random element from the
// set value stored at key.
// when called with the additional count argument, return an array of count
Expand Down Expand Up @@ -968,8 +967,12 @@ class Storage {
// return > 0 TTL in seconds
std::map<DataType, int64_t> TTL(const Slice& key, std::map<DataType, Status>* type_status);

// Reutrns the data type of the key
Status Type(const std::string& key, std::string* type);
// Reutrns the data all type of the key
// if single is true, the query will return the first one
Status GetType(const std::string& key, bool single, std::vector<std::string>& types);

// Reutrns the data all type of the key
Status Type(const std::string& key, std::vector<std::string>& types);

Status Keys(const DataType& data_type, const std::string& pattern, std::vector<std::string>* keys);

Expand Down Expand Up @@ -1019,29 +1022,29 @@ class Storage {

Status SetOptions(const OptionType& option_type, const std::string& db_type,
const std::unordered_map<std::string, std::string>& options);
void GetRocksDBInfo(std::string &info);
void GetRocksDBInfo(std::string& info);

private:
std::unique_ptr<RedisStrings> strings_db_;
std::unique_ptr<RedisHashes> hashes_db_;
std::unique_ptr<RedisSets> sets_db_;
std::unique_ptr<RedisZSets> zsets_db_;
std::unique_ptr<RedisLists> lists_db_;
std::atomic<bool> is_opened_;
std::atomic<bool> is_opened_ = false;

std::unique_ptr<LRUCache<std::string, std::string>> cursors_store_;

// Storage start the background thread for compaction task
pthread_t bg_tasks_thread_id_;
pthread_t bg_tasks_thread_id_ = 0;
pstd::Mutex bg_tasks_mutex_;
pstd::CondVar bg_tasks_cond_var_;
std::queue<BGTask> bg_tasks_queue_;

std::atomic<int> current_task_type_;
std::atomic<bool> bg_tasks_should_exit_;
std::atomic<int> current_task_type_ = kNone;
std::atomic<bool> bg_tasks_should_exit_ = false;

// For scan keys in data base
std::atomic<bool> scan_keynum_exit_;
std::atomic<bool> scan_keynum_exit_ = false;
};

} // namespace storage
Expand Down
Loading

0 comments on commit 807713d

Please sign in to comment.