Skip to content

Commit

Permalink
txn:use weak ptr instead of shared ptr in Cmd
Browse files Browse the repository at this point in the history
Signed-off-by: Hao Lee <1838249551@qq.com>
  • Loading branch information
ForestLH committed Sep 12, 2023
1 parent d497d08 commit e1da3da
Show file tree
Hide file tree
Showing 11 changed files with 25 additions and 20 deletions.
2 changes: 1 addition & 1 deletion include/pika_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class PikaClientConn : public net::RedisConn {
void ProcessSlowlog(const PikaCmdArgsType& argv, uint64_t start_us, uint64_t do_duration);
void ProcessMonitor(const PikaCmdArgsType& argv);

void ExecRedisCmd(const PikaCmdArgsType& argv, const std::shared_ptr<std::string>& resp_ptr);
void ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr<std::string>& resp_ptr);
void TryWriteResp();

AuthStat auth_stat_;
Expand Down
2 changes: 1 addition & 1 deletion include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
std::string db_name_{};

std::weak_ptr<net::NetConn> conn_;
std::shared_ptr<std::string> resp_;
std::weak_ptr<std::string> resp_;
CmdStage stage_ = kNone;
uint64_t do_duration_ = 0;

Expand Down
2 changes: 1 addition & 1 deletion src/pika_bit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ void BitOpCmd::DoBinlog(const std::shared_ptr<SyncMasterSlot>& slot) {
set_args.emplace_back(value_to_dest_);
set_cmd_->Initial(set_args, db_name_);
set_cmd_->SetConn(GetConn());
set_cmd_->SetResp(resp_);
set_cmd_->SetResp(resp_.lock());
//value of this binlog might be strange if you print it out(eg. set bitkey_out1 «ѦFO<t·), but it's ok.
set_cmd_->DoBinlog(slot);
}
3 changes: 1 addition & 2 deletions src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,6 @@ void PikaClientConn::TryWriteResp() {
}
void PikaClientConn::PushCmdToQue(std::shared_ptr<Cmd> cmd) {
txn_cmd_que_.push(cmd);
cmd->SetResp(std::make_shared<std::string>());
}

bool PikaClientConn::IsInTxn() {
Expand Down Expand Up @@ -390,7 +389,7 @@ void PikaClientConn::ExitTxn() {
}


void PikaClientConn::ExecRedisCmd(const PikaCmdArgsType& argv, const std::shared_ptr<std::string>& resp_ptr) {
void PikaClientConn::ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr<std::string>& resp_ptr) {
// get opt
std::string opt = argv[0];
pstd::StringToLower(opt);
Expand Down
2 changes: 1 addition & 1 deletion src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -927,6 +927,6 @@ std::shared_ptr<net::NetConn> Cmd::GetConn() { return conn_.lock(); }

void Cmd::SetResp(const std::shared_ptr<std::string>& resp) { resp_ = resp; }

std::shared_ptr<std::string> Cmd::GetResp() { return resp_; }
std::shared_ptr<std::string> Cmd::GetResp() { return resp_.lock(); }

void Cmd::SetStage(CmdStage stage) { stage_ = stage; }
2 changes: 1 addition & 1 deletion src/pika_hyperloglog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ void PfMergeCmd::DoBinlog(const std::shared_ptr<SyncMasterSlot>& slot) {
set_args.emplace_back(value_to_dest_);
set_cmd_->Initial(set_args, db_name_);
set_cmd_->SetConn(GetConn());
set_cmd_->SetResp(resp_);
set_cmd_->SetResp(resp_.lock());
//value of this binlog might be strange, it's an string with size of 128KB
set_cmd_->DoBinlog(slot);
}
4 changes: 2 additions & 2 deletions src/pika_kv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ void MsetCmd::DoBinlog(const std::shared_ptr<SyncMasterSlot>& slot) {
//used "set" instead of "SET" to distinguish the binlog of Set
set_argv[0] = "set";
set_cmd_->SetConn(GetConn());
set_cmd_->SetResp(resp_);
set_cmd_->SetResp(resp_.lock());
for(auto& kv: kvs_){
set_argv[1] = kv.key;
set_argv[2] = kv.value;
Expand Down Expand Up @@ -740,7 +740,7 @@ void MsetnxCmd::DoBinlog(const std::shared_ptr<SyncMasterSlot>& slot) {
//used "set" instead of "SET" to distinguish the binlog of SetCmd
set_argv[0] = "set";
set_cmd_->SetConn(GetConn());
set_cmd_->SetResp(resp_);
set_cmd_->SetResp(resp_.lock());
for(auto& kv: kvs_){
set_argv[1] = kv.key;
set_argv[2] = kv.value;
Expand Down
4 changes: 2 additions & 2 deletions src/pika_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -632,9 +632,9 @@ void RPopLPushCmd::DoBinlog(const std::shared_ptr<SyncMasterSlot>& slot) {
lpush_cmd_->Initial(lpush_args, db_name_);

rpop_cmd_->SetConn(GetConn());
rpop_cmd_->SetResp(resp_);
rpop_cmd_->SetResp(resp_.lock());
lpush_cmd_->SetConn(GetConn());
lpush_cmd_->SetResp(resp_);
lpush_cmd_->SetResp(resp_.lock());

rpop_cmd_->DoBinlog(slot);
lpush_cmd_->DoBinlog(slot);
Expand Down
8 changes: 4 additions & 4 deletions src/pika_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ void SetOperationCmd::DoBinlog(const std::shared_ptr<SyncMasterSlot>& slot) {
del_args.emplace_back(dest_key_);
del_cmd_->Initial(del_args, db_name_);
del_cmd_->SetConn(GetConn());
del_cmd_->SetResp(resp_);
del_cmd_->SetResp(resp_.lock());
del_cmd_->DoBinlog(slot);

if(value_to_dest_.size() == 0){
Expand All @@ -246,7 +246,7 @@ void SetOperationCmd::DoBinlog(const std::shared_ptr<SyncMasterSlot>& slot) {
initial_args.emplace_back(value_to_dest_[0]);
sadd_cmd_->Initial(initial_args, db_name_);
sadd_cmd_->SetConn(GetConn());
sadd_cmd_->SetResp(resp_);
sadd_cmd_->SetResp(resp_.lock());

auto& sadd_argv = sadd_cmd_->argv();
size_t data_size = value_to_dest_[0].size();
Expand Down Expand Up @@ -405,9 +405,9 @@ void SMoveCmd::DoBinlog(const std::shared_ptr<SyncMasterSlot>& slot) {
sadd_cmd_->Initial(sadd_args, db_name_);

srem_cmd_->SetConn(GetConn());
srem_cmd_->SetResp(resp_);
srem_cmd_->SetResp(resp_.lock());
sadd_cmd_->SetConn(GetConn());
sadd_cmd_->SetResp(resp_);
sadd_cmd_->SetResp(resp_.lock());

srem_cmd_->DoBinlog(slot);
sadd_cmd_->DoBinlog(slot);
Expand Down
8 changes: 7 additions & 1 deletion src/pika_transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,13 @@ void ExecCmd::Do(std::shared_ptr<Slot> slot) {
auto conn = GetConn();
auto client_conn = std::dynamic_pointer_cast<PikaClientConn>(conn);
std::vector<CmdRes> res_vec = {};
std::for_each(cmds_.begin(), cmds_.end(), [&client_conn, &res_vec](CmdInfo& each_cmd_info) {
std::vector<std::shared_ptr<std::string>> resp_strs;
for (int i = 0; i < cmds_.size(); ++i) {
resp_strs.emplace_back(std::make_shared<std::string>());
}
auto resp_strs_iter = resp_strs.begin();
std::for_each(cmds_.begin(), cmds_.end(), [&client_conn, &res_vec, &resp_strs_iter](CmdInfo& each_cmd_info) {
each_cmd_info.cmd_->SetResp(*resp_strs_iter++);
auto& cmd = each_cmd_info.cmd_;
auto& slot = each_cmd_info.slot_;
auto sync_slot = each_cmd_info.sync_slot_;
Expand Down
8 changes: 4 additions & 4 deletions src/pika_zset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ void ZUnionstoreCmd::DoBinlog(const std::shared_ptr<SyncMasterSlot>& slot) {
del_args.emplace_back(dest_key_);
del_cmd_->Initial(del_args, db_name_);
del_cmd_->SetConn(GetConn());
del_cmd_->SetResp(resp_);
del_cmd_->SetResp(resp_.lock());
del_cmd_->DoBinlog(slot);

if(value_to_dest_.empty()){
Expand All @@ -557,7 +557,7 @@ void ZUnionstoreCmd::DoBinlog(const std::shared_ptr<SyncMasterSlot>& slot) {
value_to_dest_.erase(value_to_dest_.begin());
zadd_cmd_->Initial(initial_args, db_name_);
zadd_cmd_->SetConn(GetConn());
zadd_cmd_->SetResp(resp_);
zadd_cmd_->SetResp(resp_.lock());

auto& zadd_argv = zadd_cmd_->argv();
size_t data_size = d_len + zadd_argv[3].size();
Expand Down Expand Up @@ -603,7 +603,7 @@ void ZInterstoreCmd::DoBinlog(const std::shared_ptr<SyncMasterSlot>& slot) {
del_args.emplace_back(dest_key_);
del_cmd_->Initial(del_args, db_name_);
del_cmd_->SetConn(GetConn());
del_cmd_->SetResp(resp_);
del_cmd_->SetResp(resp_.lock());
del_cmd_->DoBinlog(slot);

if(value_to_dest_.size() == 0){
Expand All @@ -620,7 +620,7 @@ void ZInterstoreCmd::DoBinlog(const std::shared_ptr<SyncMasterSlot>& slot) {
initial_args.emplace_back(value_to_dest_[0].member);
zadd_cmd_->Initial(initial_args, db_name_);
zadd_cmd_->SetConn(GetConn());
zadd_cmd_->SetResp(resp_);
zadd_cmd_->SetResp(resp_.lock());

auto& zadd_argv = zadd_cmd_->argv();
size_t data_size = d_len + value_to_dest_[0].member.size();
Expand Down

0 comments on commit e1da3da

Please sign in to comment.