Skip to content

Commit

Permalink
add pika specialization
Browse files Browse the repository at this point in the history
Signed-off-by: lizhen6 <lizhen6@360.cn>
  • Loading branch information
tedli committed Jul 1, 2023
1 parent 60c7d4a commit 3196b3a
Show file tree
Hide file tree
Showing 7 changed files with 248 additions and 73 deletions.
35 changes: 29 additions & 6 deletions include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -463,22 +463,27 @@ class CommandCmd : public Cmd {
const static std::unordered_map<std::string, int> kFieldNameOrder;
};

class Encodable;
using EncodablePtr = std::shared_ptr<Encodable>;

class Encodable {
public:
friend CmdRes& operator<<(CmdRes& res, const Encodable& e) { return e.EncodeTo(res); };
friend CmdRes& operator<<(CmdRes& res, const Encodable& e) { return e.EncodeTo(res); }
EncodablePtr operator+(const EncodablePtr& other) { return MergeFrom(other); }

protected:
virtual CmdRes& EncodeTo(CmdRes&) const = 0;
virtual EncodablePtr MergeFrom(const EncodablePtr& other) const = 0;
};

using EncodablePtr = std::shared_ptr<Encodable>;

class EncodableInt : public Encodable {
public:
EncodableInt(int value) : value_(value) {}
EncodableInt(unsigned long long value) : value_(value) {}

protected:
CmdRes& EncodeTo(CmdRes& res) const override;
EncodablePtr MergeFrom(const EncodablePtr& other) const override;

private:
int value_;
Expand All @@ -490,6 +495,7 @@ class CommandCmd : public Cmd {

protected:
CmdRes& EncodeTo(CmdRes& res) const override;
EncodablePtr MergeFrom(const EncodablePtr& other) const override;

private:
std::string value_;
Expand All @@ -499,11 +505,12 @@ class CommandCmd : public Cmd {
public:
using RedisMap = std::map<std::string, EncodablePtr, CommandFieldCompare>;
EncodableMap(RedisMap values) : values_(std::move(values)) {}
template <class Map>
static CmdRes& EncodeTo(CmdRes& res, const Map& map);
template <typename Map>
static CmdRes& EncodeTo(CmdRes& res, const Map& map, const Map& specialization = Map());

protected:
CmdRes& EncodeTo(CmdRes& res) const override;
EncodablePtr MergeFrom(const EncodablePtr& other) const override;

private:
RedisMap values_;
Expand All @@ -517,6 +524,7 @@ class CommandCmd : public Cmd {

protected:
CmdRes& EncodeTo(CmdRes& res) const override;
EncodablePtr MergeFrom(const EncodablePtr& other) const override;

private:
std::vector<EncodablePtr> values_;
Expand All @@ -530,6 +538,7 @@ class CommandCmd : public Cmd {

protected:
CmdRes& EncodeTo(CmdRes& res) const override;
EncodablePtr MergeFrom(const EncodablePtr& other) const override;

private:
std::vector<EncodablePtr> values_;
Expand All @@ -541,6 +550,7 @@ class CommandCmd : public Cmd {

protected:
CmdRes& EncodeTo(CmdRes& res) const override;
EncodablePtr MergeFrom(const EncodablePtr& other) const override;

private:
std::string value_;
Expand All @@ -554,7 +564,20 @@ class CommandCmd : public Cmd {
std::string command_;
std::vector<std::string>::const_iterator cmds_begin_, cmds_end_;

const static std::string kNullReply;
const static std::string kPikaField;
const static EncodablePtr kNotSupportedLiteral;
const static EncodablePtr kCompatibleLiteral;
const static EncodablePtr kBitSpecLiteral;
const static EncodablePtr kHyperLogLiteral;
const static EncodablePtr kPubSubLiteral;

const static EncodablePtr kNotSupportedSpecialization;
const static EncodablePtr kCompatibleSpecialization;
const static EncodablePtr kBitSpecialization;
const static EncodablePtr kHyperLogSpecialization;
const static EncodablePtr kPubSubSpecialization;

const static std::unordered_map<std::string, EncodablePtr> kPikaSpecialization;
const static std::unordered_map<std::string, EncodablePtr> kCommandDocs;
};

Expand Down
3 changes: 2 additions & 1 deletion include/pika_cmd_table_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
class PikaCmdTableManager {
public:
PikaCmdTableManager();
virtual ~PikaCmdTableManager(){};
virtual ~PikaCmdTableManager() = default;
std::shared_ptr<Cmd> GetCmd(const std::string& opt);
uint32_t DistributeKey(const std::string& key, uint32_t slot_num);
bool CmdExist(const std::string& cmd) const;

private:
std::shared_ptr<Cmd> NewCommand(const std::string& opt);
Expand Down
15 changes: 7 additions & 8 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
#ifndef PIKA_COMMAND_H_
#define PIKA_COMMAND_H_

#include <memory>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <memory>

#include "net/include/net_conn.h"
#include "net/include/redis_conn.h"
Expand Down Expand Up @@ -49,7 +50,7 @@ const std::string kCmdNameQuit = "quit";
const std::string kCmdNameHello = "hello";
const std::string kCmdNameCommand = "command";

//Migrate slot
// Migrate slot
const std::string kCmdNameSlotsMgrtSlot = "slotsmgrtslot";
const std::string kCmdNameSlotsMgrtTagSlot = "slotsmgrttagslot";
const std::string kCmdNameSlotsMgrtOne = "slotsmgrtone";
Expand Down Expand Up @@ -411,15 +412,13 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
};
struct ProcessArg {
ProcessArg() = default;
ProcessArg(std::shared_ptr<Slot> _slot, std::shared_ptr<SyncMasterSlot> _sync_slot,
HintKeys _hint_keys)
ProcessArg(std::shared_ptr<Slot> _slot, std::shared_ptr<SyncMasterSlot> _sync_slot, HintKeys _hint_keys)
: slot(std::move(_slot)), sync_slot(std::move(_sync_slot)), hint_keys(std::move(_hint_keys)) {}
std::shared_ptr<Slot> slot;
std::shared_ptr<SyncMasterSlot> sync_slot;
HintKeys hint_keys;
};
Cmd(std::string name, int arity, uint16_t flag)
: name_(std::move(name)), arity_(arity), flag_(flag) {}
Cmd(std::string name, int arity, uint16_t flag) : name_(std::move(name)), arity_(arity), flag_(flag) {}
virtual ~Cmd() = default;

virtual std::vector<std::string> current_key() const;
Expand Down Expand Up @@ -464,6 +463,7 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
void SetStage(CmdStage stage);

virtual void DoBinlog(const std::shared_ptr<SyncMasterSlot>& slot);

protected:
// enable copy, used default copy
// Cmd(const Cmd&);
Expand All @@ -479,7 +479,6 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
int arity_ = -2;
uint16_t flag_ = 0;


protected:
CmdRes res_;
PikaCmdArgsType argv_;
Expand All @@ -497,7 +496,7 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
Cmd& operator=(const Cmd&);
};

using CmdTable = std::unordered_map<std::string, std::unique_ptr<Cmd>>;
using CmdTable = std::unordered_map<std::string, std::unique_ptr<Cmd>>;

// Method for Cmd Table
void InitCmdTable(CmdTable* cmd_table);
Expand Down
142 changes: 107 additions & 35 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <glog/logging.h>

#include "include/build_version.h"
#include "include/pika_cmd_table_manager.h"
#include "include/pika_conf.h"
#include "include/pika_rm.h"
#include "include/pika_server.h"
Expand Down Expand Up @@ -943,8 +944,8 @@ void InfoCmd::InfoReplication(std::string& info) {
for (const auto& db_item : g_pika_server->dbs_) {
std::shared_lock slot_rwl(db_item.second->slots_rw_);
for (const auto& slot_item : db_item.second->slots_) {
std::shared_ptr<SyncSlaveSlot> slave_slot = g_pika_rm->GetSyncSlaveSlotByName(
SlotInfo(db_item.second->GetDBName(), slot_item.second->GetSlotID()));
std::shared_ptr<SyncSlaveSlot> slave_slot =
g_pika_rm->GetSyncSlaveSlotByName(SlotInfo(db_item.second->GetDBName(), slot_item.second->GetSlotID()));
if (!slave_slot) {
out_of_sync << "(" << slot_item.second->GetSlotName() << ": InternalError)";
continue;
Expand Down Expand Up @@ -1177,20 +1178,20 @@ void InfoCmd::InfoRocksDB(std::string& info) {
tmp_stream << "# RocksDB"
<< "\r\n";

std::shared_lock table_rwl(g_pika_server->dbs_rw_);
for (const auto& table_item : g_pika_server->dbs_) {
if (!table_item.second) {
continue;
}
std::shared_lock slot_rwl(table_item.second->slots_rw_);
for (const auto& slot_item : table_item.second->slots_) {
std::string rocksdb_info;
slot_item.second->DbRWLockReader();
slot_item.second->db()->GetRocksDBInfo(rocksdb_info);
slot_item.second->DbRWUnLock();
tmp_stream << rocksdb_info;
}
std::shared_lock table_rwl(g_pika_server->dbs_rw_);
for (const auto& table_item : g_pika_server->dbs_) {
if (!table_item.second) {
continue;
}
std::shared_lock slot_rwl(table_item.second->slots_rw_);
for (const auto& slot_item : table_item.second->slots_) {
std::string rocksdb_info;
slot_item.second->DbRWLockReader();
slot_item.second->db()->GetRocksDBInfo(rocksdb_info);
slot_item.second->DbRWUnLock();
tmp_stream << rocksdb_info;
}
}

info.append(tmp_stream.str());
}
Expand Down Expand Up @@ -2409,25 +2410,60 @@ CmdRes& CommandCmd::EncodableInt::EncodeTo(CmdRes& res) const {
return res;
}

CommandCmd::EncodablePtr CommandCmd::EncodableInt::MergeFrom(const CommandCmd::EncodablePtr& other) const {
if (auto pe = std::dynamic_pointer_cast<CommandCmd::EncodableInt>(other)) {
return std::make_shared<CommandCmd::EncodableInt>(value_ + pe->value_);
}
return std::make_shared<CommandCmd::EncodableInt>(value_);
}

CmdRes& CommandCmd::EncodableString::EncodeTo(CmdRes& res) const {
res.AppendString(value_);
return res;
}

template <class Map>
CmdRes& CommandCmd::EncodableMap::EncodeTo(CmdRes& res, const Map& map) {
CommandCmd::EncodablePtr CommandCmd::EncodableString::MergeFrom(const CommandCmd::EncodablePtr& other) const {
if (auto pe = std::dynamic_pointer_cast<CommandCmd::EncodableString>(other)) {
return std::make_shared<CommandCmd::EncodableString>(value_ + pe->value_);
}
return std::make_shared<CommandCmd::EncodableString>(value_);
}

template <typename Map>
CmdRes& CommandCmd::EncodableMap::EncodeTo(CmdRes& res, const Map& map, const Map& specialization) {
std::string raw_string;
RedisAppendLen(raw_string, map.size() * 2, kPrefix);
res.AppendStringRaw(raw_string);
for (const auto& kv : map) {
res.AppendString(kv.first);
res << *kv.second;
if (auto iter = specialization.find(kv.first); iter != specialization.end()) {
res << *(*kv.second + iter->second);
} else {
res << *kv.second;
}
}
return res;
}

CmdRes& CommandCmd::EncodableMap::EncodeTo(CmdRes& res) const { return EncodeTo(res, values_); }

CommandCmd::EncodablePtr CommandCmd::EncodableMap::MergeFrom(const CommandCmd::EncodablePtr& other) const {
if (auto pe = std::dynamic_pointer_cast<CommandCmd::EncodableMap>(other)) {
auto values = CommandCmd::EncodableMap::RedisMap(values_.cbegin(), values_.cend());
for (const auto& pair : pe->values_) {
auto iter = values.find(pair.first);
if (iter == values.end()) {
values[pair.first] = pair.second;
} else {
iter->second = (*iter->second + pair.second);
}
}
return std::make_shared<CommandCmd::EncodableMap>(values);
}
return std::make_shared<CommandCmd::EncodableMap>(
CommandCmd::EncodableMap::RedisMap(values_.cbegin(), values_.cend()));
}

CmdRes& CommandCmd::EncodableSet::EncodeTo(CmdRes& res) const {
std::string raw_string;
RedisAppendLen(raw_string, values_.size(), kPrefix);
Expand All @@ -2438,6 +2474,16 @@ CmdRes& CommandCmd::EncodableSet::EncodeTo(CmdRes& res) const {
return res;
}

CommandCmd::EncodablePtr CommandCmd::EncodableSet::MergeFrom(const CommandCmd::EncodablePtr& other) const {
if (auto pe = std::dynamic_pointer_cast<CommandCmd::EncodableSet>(other)) {
auto values = std::vector<CommandCmd::EncodablePtr>(values_.cbegin(), values_.cend());
values.insert(values.end(), pe->values_.cbegin(), pe->values_.cend());
return std::make_shared<CommandCmd::EncodableSet>(values);
}
return std::make_shared<CommandCmd::EncodableSet>(
std::vector<CommandCmd::EncodablePtr>(values_.cbegin(), values_.cend()));
}

CmdRes& CommandCmd::EncodableArray::EncodeTo(CmdRes& res) const {
res.AppendArrayLen(values_.size());
for (const auto& item : values_) {
Expand All @@ -2446,28 +2492,40 @@ CmdRes& CommandCmd::EncodableArray::EncodeTo(CmdRes& res) const {
return res;
}

CommandCmd::EncodablePtr CommandCmd::EncodableArray::MergeFrom(const CommandCmd::EncodablePtr& other) const {
if (auto pe = std::dynamic_pointer_cast<CommandCmd::EncodableArray>(other)) {
auto values = std::vector<CommandCmd::EncodablePtr>(values_.cbegin(), values_.cend());
values.insert(values.end(), pe->values_.cbegin(), pe->values_.cend());
return std::make_shared<CommandCmd::EncodableArray>(values);
}
return std::make_shared<CommandCmd::EncodableArray>(
std::vector<CommandCmd::EncodablePtr>(values_.cbegin(), values_.cend()));
}

CmdRes& CommandCmd::EncodableStatus::EncodeTo(CmdRes& res) const {
res.AppendStringRaw(kPrefix + value_ + kNewLine);
return res;
}

const std::string CommandCmd::kNullReply = "_" + kNewLine;
CommandCmd::EncodablePtr CommandCmd::EncodableStatus::MergeFrom(const CommandCmd::EncodablePtr& other) const {
if (auto pe = std::dynamic_pointer_cast<CommandCmd::EncodableStatus>(other)) {
return std::make_shared<CommandCmd::EncodableStatus>(value_ + pe->value_);
}
return std::make_shared<CommandCmd::EncodableStatus>(value_);
}

const std::unordered_map<std::string, int> CommandCmd::CommandFieldCompare::kFieldNameOrder{
{"name", 0}, {"type", 1},
{"spec", 2}, {"index", 3},
{"display_text", 4}, {"key_spec_index", 5},
{"token", 6}, {"summary", 7},
{"since", 8}, {"group", 9},
{"complexity", 10}, {"module", 11},
{"doc_flags", 12}, {"deprecated_since", 13},
{"notes", 14}, {"flags", 15},
{"begin_search", 16}, {"replaced_by", 17},
{"history", 18}, {"arguments", 19},
{"subcommands", 20}, {"keyword", 21},
{"startfrom", 22}, {"find_keys", 23},
{"lastkey", 24}, {"keynum", 25},
{"keynumidx", 26}, {"firstkey", 27},
{"keystep", 28}, {"limit", 29},
{kPikaField, 0}, {"name", 100}, {"type", 101},
{"spec", 102}, {"index", 103}, {"display_text", 104},
{"key_spec_index", 105}, {"token", 106}, {"summary", 107},
{"since", 108}, {"group", 109}, {"complexity", 110},
{"module", 111}, {"doc_flags", 112}, {"deprecated_since", 113},
{"notes", 114}, {"flags", 15}, {"begin_search", 116},
{"replaced_by", 17}, {"history", 18}, {"arguments", 119},
{"subcommands", 120}, {"keyword", 121}, {"startfrom", 122},
{"find_keys", 123}, {"lastkey", 124}, {"keynum", 125},
{"keynumidx", 126}, {"firstkey", 127}, {"keystep", 128},
{"limit", 129},
};
const std::string CommandCmd::EncodableMap::kPrefix = "*";
const std::string CommandCmd::EncodableSet::kPrefix = "*";
Expand All @@ -2490,16 +2548,30 @@ void CommandCmd::DoInitial() {
cmds_end_ = argv_.cend();
}

extern std::unique_ptr<PikaCmdTableManager> g_pika_cmd_table_manager;

void CommandCmd::Do(std::shared_ptr<Slot> slots) {
std::unordered_map<std::string, CommandCmd::EncodablePtr> cmds;
std::unordered_map<std::string, CommandCmd::EncodablePtr> specializations;
if (cmds_begin_ == cmds_end_) {
cmds = kCommandDocs;
specializations.insert(kPikaSpecialization.cbegin(), kPikaSpecialization.cend());
} else {
for (auto iter = cmds_begin_; iter != cmds_end_; ++iter) {
if (auto cmd = kCommandDocs.find(*iter); cmd != kCommandDocs.end()) {
cmds.insert(*cmd);
}
if (auto specialization = kPikaSpecialization.find(*iter); specialization != kPikaSpecialization.end()) {
specializations.insert(*specialization);
}
}
}
for (const auto& cmd : cmds) {
if (!g_pika_cmd_table_manager->CmdExist(cmd.first)) {
specializations[cmd.first] = kNotSupportedSpecialization;
} else if (auto iter = specializations.find(cmd.first); iter == specializations.end()) {
specializations[cmd.first] = kCompatibleSpecialization;
}
}
EncodableMap::EncodeTo(res_, cmds);
EncodableMap::EncodeTo(res_, cmds, specializations);
}
Loading

0 comments on commit 3196b3a

Please sign in to comment.