Skip to content

Commit

Permalink
feature: add txn for pika completely
Browse files Browse the repository at this point in the history
Signed-off-by: Hao Lee <1838249551@qq.com>
  • Loading branch information
ForestLH committed Jul 9, 2023
1 parent 0404fd1 commit 54b8825
Show file tree
Hide file tree
Showing 16 changed files with 109 additions and 154 deletions.
13 changes: 4 additions & 9 deletions include/pika_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,13 @@ class PikaClientConn : public net::RedisConn {

bool IsPubSub() { return is_pubsub_; }
void SetIsPubSub(bool is_pubsub) { is_pubsub_ = is_pubsub; }
void SetCurrentTable(const std::string& db_name) { current_db_ = db_name; }
void SetCurrentDb(const std::string& db_name) { current_db_ = db_name; }
std::string GetCurrentDb() { return current_db_; }
void SetWriteCompleteCallback(WriteCompleteCallback cb) { write_completed_cb_ = std::move(cb); }

// Txn
void PushCmdToQue(std::shared_ptr<Cmd> cmd);
std::vector<CmdRes> ExecTxnCmds();
std::shared_ptr<Cmd> PopCmdFromQue();
std::queue<std::shared_ptr<Cmd>> GetTxnCmdQue();
void ClearTxnCmdQue();
bool IsInTxn();
bool IsTxnFailed();
Expand All @@ -82,9 +82,7 @@ class PikaClientConn : public net::RedisConn {

void AddKeysToWatch(const std::vector<std::string> &db_keys);
void RemoveWatchedKeys();
void SetTxnFailedFromKeys(const std::vector<std::string> &table_keys = {});
std::vector<std::string> GetTxnInvolvedDbs() { return txn_exec_dbs_; }
std::mutex& GetTxnDbMutex() { return txn_db_mu_; }
void SetTxnFailedFromKeys(const std::vector<std::string> & db_keys = {});
void ExitTxn();

net::ServerThread* server_thread() { return server_thread_; }
Expand All @@ -102,10 +100,7 @@ class PikaClientConn : public net::RedisConn {
std::queue<std::shared_ptr<Cmd>> txn_cmd_que_; // redis事务的队列
std::bitset<16> txn_state_; // class TxnStateBitMask
std::unordered_set<std::string> watched_db_keys_;
std::vector<std::string> txn_exec_dbs_;
std::mutex txn_state_mu_; // 用于锁事务状态
std::mutex txn_db_mu_; // 在执行事务的时候,采用加db锁的方式加锁,那么就会有多个db被加锁,那么就会有加锁顺序的问题,所以加了这把大锁
// 在void Cmd::ProcessExecCmd();中使用这把锁

std::shared_ptr<Cmd> DoCmd(const PikaCmdArgsType& argv, const std::string& opt,
const std::shared_ptr<std::string>& resp_ptr);
Expand Down
9 changes: 3 additions & 6 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,6 @@ class CmdRes {

class Cmd : public std::enable_shared_from_this<Cmd> {
public:
//! TODO(lee) : 这里是不是加一个queued比较好?
enum CmdStage { kNone, kBinlogStage, kExecuteStage };
struct HintKeys {
HintKeys() = default;
Expand Down Expand Up @@ -461,7 +460,6 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
virtual void Execute();
virtual void ProcessFlushDBCmd();
virtual void ProcessFlushAllCmd();
virtual void ProcessExecCmd();
virtual void ProcessSingleSlotCmd();
virtual void ProcessMultiSlotCmd();
virtual void ProcessDoNotSpecifySlotCmd();
Expand All @@ -482,9 +480,8 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
bool is_multi_slot() const;
bool HashtagIsConsistent(const std::string& lhs, const std::string& rhs) const;
uint64_t GetDoDuration() const { return do_duration_; };
void SetDbName(const std::string& db_name) {
db_name_ = db_name;
}
void SetDbName(const std::string& db_name) { db_name_ = db_name; }
std::string GetDBName() { return db_name_; }

std::string name() const;
CmdRes& res();
Expand Down Expand Up @@ -522,7 +519,7 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
protected:
CmdRes res_;
PikaCmdArgsType argv_;
std::string db_name_;
std::string db_name_{};

std::weak_ptr<net::NetConn> conn_;
std::shared_ptr<std::string> resp_;
Expand Down
1 change: 1 addition & 0 deletions include/pika_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class DB : public std::enable_shared_from_this<DB>, public pstd::noncopyable {
friend class InfoCmd;
friend class PkClusterInfoCmd;
friend class PikaServer;
friend class ExecCmd;

std::string GetDBName();
void BgSaveDB();
Expand Down
3 changes: 0 additions & 3 deletions include/pika_kv.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@ class SetCmd : public Cmd {
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override{};
void Merge() override{};
Cmd* Clone() override { return new SetCmd(*this); }
~SetCmd() override {
LOG(INFO) << "SetCmd dector";
}
private:
std::string key_;
std::string value_;
Expand Down
4 changes: 3 additions & 1 deletion include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@
#include "include/pika_repl_client.h"
#include "include/pika_repl_server.h"
#include "include/pika_rsync_service.h"
#include "include/pika_migrate_thread.h"
#include "include/pika_statistic.h"
#include "include/pika_slot_command.h"
#include "include/pika_migrate_thread.h"
#include "include/pika_transaction.h"



Expand Down Expand Up @@ -459,6 +460,7 @@ class PikaServer : public pstd::noncopyable {
friend class InfoCmd;
friend class PikaReplClientConn;
friend class PkClusterInfoCmd;
friend class ExecCmd;

private:
/*
Expand Down
6 changes: 2 additions & 4 deletions include/pika_transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,10 @@ class ExecCmd : public Cmd {
ExecCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {}
void Do(std::shared_ptr<Slot> slot = nullptr) override;
Cmd* Clone() override { return new ExecCmd(*this); }
// void Execute() override;
void Execute() override;
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override {}
void Merge() override {}
//NOTE(leeHao): 这个命令的key无法确定,因为每个key的db可能不一样
std::vector<std::string> current_key() const override { return {}; }
std::vector<std::string> GetInvolvedSlots();

private:
void DoInitial() override;
std::vector<std::string> keys_;
Expand All @@ -56,6 +53,7 @@ class WatchCmd : public Cmd {
WatchCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {}

void Do(std::shared_ptr<Slot> slot = nullptr) override;
void Execute() override;
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override {}
Cmd* Clone() override { return new WatchCmd(*this); }
void Merge() override {}
Expand Down
2 changes: 1 addition & 1 deletion src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ void SelectCmd::Do(std::shared_ptr<Slot> slot) {
LOG(WARNING) << name_ << " weak ptr is empty";
return;
}
conn->SetCurrentTable(db_name_);
conn->SetCurrentDb(db_name_);
res_.SetRes(CmdRes::kOk);
}

Expand Down
2 changes: 1 addition & 1 deletion src/pika_bit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ void BitOpCmd::DoBinlog(const std::shared_ptr<SyncMasterSlot>& slot) {
set_args.emplace_back(value_to_dest_);
set_cmd_->Initial(std::move(set_args), db_name_);
set_cmd_->SetConn(GetConn());
set_cmd_->SetResp(resp_.lock());
set_cmd_->SetResp(resp_);
//value of this binlog might be strange if you print it out(eg. set bitkey_out1 «ѦFO<t·), but it's ok.
set_cmd_->DoBinlog(slot);
}
52 changes: 10 additions & 42 deletions src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st
c_ptr->res().SetRes(CmdRes::kTxnQueued);
return c_ptr;
}
//! 更新一下qps之类的
g_pika_server->UpdateQueryNumAndExecCountDB(current_db_, opt, c_ptr->is_write());

// PubSub connection
Expand Down Expand Up @@ -117,9 +116,6 @@ std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st
c_ptr->res().SetRes(CmdRes::kErrOther, "Writing binlog failed, maybe no space left on device");
return c_ptr;
}
//! 这里有个问题,就是exec这个命令应该算是write的,那么该如何得到他的key呢
// 所以我在下面的这里加了opt != exec,因为exec没法得到key,
//TODO(leeHao): 这里还是看看吧
std::vector<std::string> cur_key = c_ptr->current_key();
if (cur_key.empty() && opt != kCmdNameExec) {
c_ptr->res().SetRes(CmdRes::kErrOther, "Internal ERROR");
Expand All @@ -141,10 +137,9 @@ std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st
(*iter)[opt]->state.cmd_count.fetch_add(1);
(*iter)[opt]->state.cmd_time_consuming.fetch_add(duration);

//NOTE(leeHao): exec命令的话,会在ExecTxnCmds函数中去设置,因为可能部分成功,部分不成功,并且里面可能还会有select命令
if (c_ptr->res().ok() && c_ptr->is_write() && name() != kCmdNameExec) {
if (c_ptr->name() == kCmdNameFlushdb || c_ptr->name() == kCmdNameFlushall) {

SetTxnFailedFromKeys(); // 这里就将所有watch的事务给设置成为失败状态
} else {
auto table_keys = c_ptr->current_key();
for (auto& key : table_keys) {
Expand Down Expand Up @@ -290,15 +285,13 @@ void PikaClientConn::TryWriteResp() {
write_completed_cb_();
write_completed_cb_ = nullptr;
}
//! 这里会将resp的string指针给清掉,所以后面执行的时候就会error
resp_array.clear();
NotifyEpoll(true);
}
}
void PikaClientConn::PushCmdToQue(std::shared_ptr<Cmd> cmd) {
txn_cmd_que_.push(cmd);
cmd->SetResp(std::make_shared<std::string>());
txn_exec_dbs_.emplace_back(cmd->db_name());
}

bool PikaClientConn::IsInTxn() {
Expand Down Expand Up @@ -339,34 +332,6 @@ void PikaClientConn::SetTxnStartState(bool is_start) {
txn_state_[TxnStateBitMask::Start] = is_start;
}


std::vector<CmdRes> PikaClientConn::ExecTxnCmds() {
auto ret_res = std::vector<CmdRes>{};
while (!txn_cmd_que_.empty()) {
auto cmd = txn_cmd_que_.front();
txn_cmd_que_.pop();
cmd->res().SetRes(CmdRes::CmdRet::kNone);
cmd->SetDbName(current_db_);
cmd->ProcessSingleSlotCmd();
if (cmd->res().ok() && cmd->is_write()) {
auto db_keys = cmd->current_key();
for (auto& item : db_keys) {
item = cmd->db_name().append(item);
}
SetTxnFailedFromKeys(db_keys);
}
ret_res.emplace_back(cmd->res());
}
return ret_res;
}
std::shared_ptr<Cmd> PikaClientConn::PopCmdFromQue() {
if (txn_cmd_que_.empty()) {
return nullptr;
}
auto ret = txn_cmd_que_.front();
txn_cmd_que_.pop();
return ret;
}
void PikaClientConn::ClearTxnCmdQue() {
txn_cmd_que_ = std::queue<std::shared_ptr<Cmd>>{};
}
Expand All @@ -384,25 +349,25 @@ void PikaClientConn::AddKeysToWatch(const std::vector<std::string> &db_keys) {
}

void PikaClientConn::RemoveWatchedKeys() {
auto worker_thread = dynamic_cast<net::WorkerThread *>(server_thread());
if (worker_thread != nullptr) {
auto dispatcher = dynamic_cast<net::DispatchThread *>(worker_thread->GetServerThread());
auto dispatcher = dynamic_cast<net::DispatchThread *>(server_thread());
if (dispatcher != nullptr) {
watched_db_keys_.clear();
dispatcher->RemoveWatchKeys(shared_from_this());
}
}

/**
* @param db_keys 如果为空的话,代表将所有事务都设置成为失败,一般给flush此类命令使用
* @brief 去修改被watch的key的一些连接
*/
void PikaClientConn::SetTxnFailedFromKeys(const std::vector<std::string> &table_keys) {
void PikaClientConn::SetTxnFailedFromKeys(const std::vector<std::string> & db_keys) {
auto dispatcher = dynamic_cast<net::DispatchThread *>(server_thread());
if (dispatcher != nullptr) {
auto involved_conns = std::vector<std::shared_ptr<NetConn>>{};
if (table_keys.empty()) {
if (db_keys.empty()) {
involved_conns = dispatcher->GetAllTxns();
} else {
involved_conns = dispatcher->GetInvolvedTxn(table_keys);
involved_conns = dispatcher->GetInvolvedTxn(db_keys);
}
for (auto &conn : involved_conns) {
if (auto c = std::dynamic_pointer_cast<PikaClientConn>(conn); c != nullptr && c.get() != this) {
Expand Down Expand Up @@ -439,6 +404,9 @@ void PikaClientConn::ExecRedisCmd(const PikaCmdArgsType& argv, const std::shared
resp_num--;
}
}
std::queue<std::shared_ptr<Cmd>> PikaClientConn::GetTxnCmdQue() {
return txn_cmd_que_;
}

// Initial permission status
void PikaClientConn::AuthStat::Init() {
Expand Down
53 changes: 6 additions & 47 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -678,13 +678,15 @@ void InitCmdTable(CmdTable* cmd_table) {

// Transaction
////Multi
std::unique_ptr<Cmd> multiptr = std::make_unique<MultiCmd>(kCmdNameMulti, 1, kCmdFlagsRead | kCmdFlagsWrite);
std::unique_ptr<Cmd> multiptr =
std::make_unique<MultiCmd>(kCmdNameMulti, 1, kCmdFlagsRead | kCmdFlagsMultiSlot);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameMulti, std::move(multiptr)));
////Exec
std::unique_ptr<Cmd> execptr = std::make_unique<ExecCmd>(kCmdNameExec, 1, kCmdFlagsRead | kCmdFlagsWrite);
std::unique_ptr<Cmd> execptr =
std::make_unique<ExecCmd>(kCmdNameExec, 1, kCmdFlagsRead | kCmdFlagsWrite | kCmdFlagsMultiSlot | kCmdFlagsSuspend);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameExec, std::move(execptr)));
////Discard
std::unique_ptr<Cmd> discardptr = std::make_unique<DiscardCmd>(kCmdNameDiscard, 1, kCmdFlagsRead | kCmdFlagsWrite);
std::unique_ptr<Cmd> discardptr = std::make_unique<DiscardCmd>(kCmdNameDiscard, 1, kCmdFlagsRead);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameDiscard, std::move(discardptr)));
////Watch
std::unique_ptr<Cmd> watchptr = std::make_unique<WatchCmd>(kCmdNameWatch, -2, kCmdFlagsRead);
Expand Down Expand Up @@ -723,8 +725,6 @@ void Cmd::Execute() {
ProcessFlushAllCmd();
} else if (name_ == kCmdNameInfo || name_ == kCmdNameConfig) {
ProcessDoNotSpecifySlotCmd();
} else if (name_ == kCmdNameExec) {
ProcessExecCmd();
} else {
ProcessSingleSlotCmd();
}
Expand Down Expand Up @@ -778,38 +778,7 @@ void Cmd::ProcessFlushAllCmd() {
}
res_.SetRes(CmdRes::kOk);
}
/**
* TODO(leeHao): 这个exec命令本身的binlog我不确定如何处理,我感觉应该还是需要写的,因为如果执行队列中的命令,执行到了一半但是退出了
* 那么下次启动的时候,应该回滚部分操作?
* TODO(leeHao): 还有个得到执行时间的函数,这个统计命令时间,是统计事务队列中,每个命令单独统计还是统计一整个队列的执行时长
* 这里因为是多个db的锁,所以需要注意执行顺序,否则就会死锁,我的选择是,先拿取一把大锁
* 然后将所有的锁给拿齐了,再释放掉这把大锁
*/
void Cmd::ProcessExecCmd() {
auto conn_ptr = GetConn();
if (auto cli_conn = std::dynamic_pointer_cast<PikaClientConn>(conn_ptr); cli_conn != nullptr) {
std::lock_guard<std::mutex> lg(cli_conn->GetTxnDbMutex());
auto dbs = cli_conn->GetTxnInvolvedDbs();
auto slots = std::unordered_set<std::shared_ptr<Slot>>{};
for (const auto& db_name : dbs) {
slots.emplace(g_pika_server->GetSlotByDBName(db_name));
}

for (const auto& slot : slots) {
slot->DbRWLockWriter();
}
Do();
const auto cur_slot = g_pika_server->GetSlotByDBName(db_name());
std::shared_ptr<SyncMasterSlot> sync_slot =
g_pika_rm->GetSyncMasterSlotByName(SlotInfo(cur_slot->GetDBName(), cur_slot->GetSlotID()));
DoBinlog(sync_slot);
for (const auto& slot : slots) {
slot->DbRWUnLock();
}
} else {
res_.SetRes(CmdRes::kErrOther, "Client connection error");
}
}
//TODO(leeHao): 还有个得到执行时间的函数,这个统计命令时间,是统计事务队列中,每个命令单独统计还是统计一整个队列的执行时长

void Cmd::ProcessSingleSlotCmd() {
std::shared_ptr<Slot> slot;
Expand Down Expand Up @@ -865,17 +834,8 @@ void Cmd::InternalProcessCommand(const std::shared_ptr<Slot>& slot, const std::s
}
}

//NOTE 这里会加锁
// 这不是一个override的函数
void Cmd::DoCommand(const std::shared_ptr<Slot>& slot, const HintKeys& hint_keys) {
if (!is_suspend()) {
auto cli_conn = std::dynamic_pointer_cast<PikaClientConn>(GetConn());
if (cli_conn != nullptr) {
if (cli_conn->IsInTxn()) {
Do(slot);
return ;
}
}
slot->DbRWLockReader();
}

Expand Down Expand Up @@ -913,7 +873,6 @@ void Cmd::DoBinlog(const std::shared_ptr<SyncMasterSlot>& slot) {
}
}

//NOTE 虽然这里multiSlot,但是好像只能针对于一个db,所以这里无法在exec中使用
void Cmd::ProcessMultiSlotCmd() {
std::shared_ptr<Slot> slot;
std::vector<std::string> cur_key = current_key();
Expand Down
2 changes: 1 addition & 1 deletion src/pika_hyperloglog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ void PfMergeCmd::DoBinlog(const std::shared_ptr<SyncMasterSlot>& slot) {
set_args.emplace_back(value_to_dest_);
set_cmd_->Initial(std::move(set_args), db_name_);
set_cmd_->SetConn(GetConn());
set_cmd_->SetResp(resp_.lock());
set_cmd_->SetResp(resp_);
//value of this binlog might be strange, it's an string with size of 128KB
set_cmd_->DoBinlog(slot);
}
4 changes: 2 additions & 2 deletions src/pika_kv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ void MsetCmd::DoBinlog(const std::shared_ptr<SyncMasterSlot>& slot) {
//used "set" instead of "SET" to distinguish the binlog of Set
set_argv[0] = "set";
set_cmd_->SetConn(GetConn());
set_cmd_->SetResp(resp_.lock());
set_cmd_->SetResp(resp_);
for(auto& kv: kvs_){
set_argv[1] = kv.key;
set_argv[2] = kv.value;
Expand Down Expand Up @@ -740,7 +740,7 @@ void MsetnxCmd::DoBinlog(const std::shared_ptr<SyncMasterSlot>& slot) {
//used "set" instead of "SET" to distinguish the binlog of SetCmd
set_argv[0] = "set";
set_cmd_->SetConn(GetConn());
set_cmd_->SetResp(resp_.lock());
set_cmd_->SetResp(resp_);
for(auto& kv: kvs_){
set_argv[1] = kv.key;
set_argv[2] = kv.value;
Expand Down
Loading

0 comments on commit 54b8825

Please sign in to comment.