diff --git a/include/pika_client_conn.h b/include/pika_client_conn.h index c6a38a46d3..2d82f61854 100644 --- a/include/pika_client_conn.h +++ b/include/pika_client_conn.h @@ -110,7 +110,7 @@ class PikaClientConn : public net::RedisConn { void ProcessSlowlog(const PikaCmdArgsType& argv, uint64_t start_us, uint64_t do_duration); void ProcessMonitor(const PikaCmdArgsType& argv); - void ExecRedisCmd(const PikaCmdArgsType& argv, const std::shared_ptr& resp_ptr); + void ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr& resp_ptr); void TryWriteResp(); AuthStat auth_stat_; diff --git a/include/pika_command.h b/include/pika_command.h index 4f7fb18806..a5fd6f69ab 100644 --- a/include/pika_command.h +++ b/include/pika_command.h @@ -531,7 +531,7 @@ class Cmd : public std::enable_shared_from_this { std::string db_name_{}; std::weak_ptr conn_; - std::shared_ptr resp_; + std::weak_ptr resp_; CmdStage stage_ = kNone; uint64_t do_duration_ = 0; diff --git a/src/pika_bit.cc b/src/pika_bit.cc index a2a57f5255..8b187164d8 100644 --- a/src/pika_bit.cc +++ b/src/pika_bit.cc @@ -225,7 +225,7 @@ void BitOpCmd::DoBinlog(const std::shared_ptr& slot) { set_args.emplace_back(value_to_dest_); set_cmd_->Initial(set_args, db_name_); set_cmd_->SetConn(GetConn()); - set_cmd_->SetResp(resp_); + set_cmd_->SetResp(resp_.lock()); //value of this binlog might be strange if you print it out(eg. set bitkey_out1 «ѦFODoBinlog(slot); } diff --git a/src/pika_client_conn.cc b/src/pika_client_conn.cc index 01835e66fe..2c79afa4b0 100644 --- a/src/pika_client_conn.cc +++ b/src/pika_client_conn.cc @@ -280,7 +280,6 @@ void PikaClientConn::TryWriteResp() { } void PikaClientConn::PushCmdToQue(std::shared_ptr cmd) { txn_cmd_que_.push(cmd); - cmd->SetResp(std::make_shared()); } bool PikaClientConn::IsInTxn() { @@ -390,7 +389,7 @@ void PikaClientConn::ExitTxn() { } -void PikaClientConn::ExecRedisCmd(const PikaCmdArgsType& argv, const std::shared_ptr& resp_ptr) { +void PikaClientConn::ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr& resp_ptr) { // get opt std::string opt = argv[0]; pstd::StringToLower(opt); diff --git a/src/pika_command.cc b/src/pika_command.cc index 699cc52c95..6207db77d1 100644 --- a/src/pika_command.cc +++ b/src/pika_command.cc @@ -927,6 +927,6 @@ std::shared_ptr Cmd::GetConn() { return conn_.lock(); } void Cmd::SetResp(const std::shared_ptr& resp) { resp_ = resp; } -std::shared_ptr Cmd::GetResp() { return resp_; } +std::shared_ptr Cmd::GetResp() { return resp_.lock(); } void Cmd::SetStage(CmdStage stage) { stage_ = stage; } diff --git a/src/pika_hyperloglog.cc b/src/pika_hyperloglog.cc index 9fb6c263b7..0cb6615a73 100644 --- a/src/pika_hyperloglog.cc +++ b/src/pika_hyperloglog.cc @@ -79,7 +79,7 @@ void PfMergeCmd::DoBinlog(const std::shared_ptr& slot) { set_args.emplace_back(value_to_dest_); set_cmd_->Initial(set_args, db_name_); set_cmd_->SetConn(GetConn()); - set_cmd_->SetResp(resp_); + set_cmd_->SetResp(resp_.lock()); //value of this binlog might be strange, it's an string with size of 128KB set_cmd_->DoBinlog(slot); } diff --git a/src/pika_kv.cc b/src/pika_kv.cc index d2891476c2..7886a9cc73 100644 --- a/src/pika_kv.cc +++ b/src/pika_kv.cc @@ -692,7 +692,7 @@ void MsetCmd::DoBinlog(const std::shared_ptr& slot) { //used "set" instead of "SET" to distinguish the binlog of Set set_argv[0] = "set"; set_cmd_->SetConn(GetConn()); - set_cmd_->SetResp(resp_); + set_cmd_->SetResp(resp_.lock()); for(auto& kv: kvs_){ set_argv[1] = kv.key; set_argv[2] = kv.value; @@ -740,7 +740,7 @@ void MsetnxCmd::DoBinlog(const std::shared_ptr& slot) { //used "set" instead of "SET" to distinguish the binlog of SetCmd set_argv[0] = "set"; set_cmd_->SetConn(GetConn()); - set_cmd_->SetResp(resp_); + set_cmd_->SetResp(resp_.lock()); for(auto& kv: kvs_){ set_argv[1] = kv.key; set_argv[2] = kv.value; diff --git a/src/pika_list.cc b/src/pika_list.cc index e1ae13513e..867be2fec9 100644 --- a/src/pika_list.cc +++ b/src/pika_list.cc @@ -632,9 +632,9 @@ void RPopLPushCmd::DoBinlog(const std::shared_ptr& slot) { lpush_cmd_->Initial(lpush_args, db_name_); rpop_cmd_->SetConn(GetConn()); - rpop_cmd_->SetResp(resp_); + rpop_cmd_->SetResp(resp_.lock()); lpush_cmd_->SetConn(GetConn()); - lpush_cmd_->SetResp(resp_); + lpush_cmd_->SetResp(resp_.lock()); rpop_cmd_->DoBinlog(slot); lpush_cmd_->DoBinlog(slot); diff --git a/src/pika_set.cc b/src/pika_set.cc index 59f74d1621..2d6df4648e 100644 --- a/src/pika_set.cc +++ b/src/pika_set.cc @@ -232,7 +232,7 @@ void SetOperationCmd::DoBinlog(const std::shared_ptr& slot) { del_args.emplace_back(dest_key_); del_cmd_->Initial(del_args, db_name_); del_cmd_->SetConn(GetConn()); - del_cmd_->SetResp(resp_); + del_cmd_->SetResp(resp_.lock()); del_cmd_->DoBinlog(slot); if(value_to_dest_.size() == 0){ @@ -246,7 +246,7 @@ void SetOperationCmd::DoBinlog(const std::shared_ptr& slot) { initial_args.emplace_back(value_to_dest_[0]); sadd_cmd_->Initial(initial_args, db_name_); sadd_cmd_->SetConn(GetConn()); - sadd_cmd_->SetResp(resp_); + sadd_cmd_->SetResp(resp_.lock()); auto& sadd_argv = sadd_cmd_->argv(); size_t data_size = value_to_dest_[0].size(); @@ -405,9 +405,9 @@ void SMoveCmd::DoBinlog(const std::shared_ptr& slot) { sadd_cmd_->Initial(sadd_args, db_name_); srem_cmd_->SetConn(GetConn()); - srem_cmd_->SetResp(resp_); + srem_cmd_->SetResp(resp_.lock()); sadd_cmd_->SetConn(GetConn()); - sadd_cmd_->SetResp(resp_); + sadd_cmd_->SetResp(resp_.lock()); srem_cmd_->DoBinlog(slot); sadd_cmd_->DoBinlog(slot); diff --git a/src/pika_transaction.cc b/src/pika_transaction.cc index a8fe1b8499..b4a849fb40 100644 --- a/src/pika_transaction.cc +++ b/src/pika_transaction.cc @@ -42,7 +42,13 @@ void ExecCmd::Do(std::shared_ptr slot) { auto conn = GetConn(); auto client_conn = std::dynamic_pointer_cast(conn); std::vector res_vec = {}; - std::for_each(cmds_.begin(), cmds_.end(), [&client_conn, &res_vec](CmdInfo& each_cmd_info) { + std::vector> resp_strs; + for (int i = 0; i < cmds_.size(); ++i) { + resp_strs.emplace_back(std::make_shared()); + } + auto resp_strs_iter = resp_strs.begin(); + std::for_each(cmds_.begin(), cmds_.end(), [&client_conn, &res_vec, &resp_strs_iter](CmdInfo& each_cmd_info) { + each_cmd_info.cmd_->SetResp(*resp_strs_iter++); auto& cmd = each_cmd_info.cmd_; auto& slot = each_cmd_info.slot_; auto sync_slot = each_cmd_info.sync_slot_; diff --git a/src/pika_zset.cc b/src/pika_zset.cc index 6be99b9a6f..6d26552113 100644 --- a/src/pika_zset.cc +++ b/src/pika_zset.cc @@ -538,7 +538,7 @@ void ZUnionstoreCmd::DoBinlog(const std::shared_ptr& slot) { del_args.emplace_back(dest_key_); del_cmd_->Initial(del_args, db_name_); del_cmd_->SetConn(GetConn()); - del_cmd_->SetResp(resp_); + del_cmd_->SetResp(resp_.lock()); del_cmd_->DoBinlog(slot); if(value_to_dest_.empty()){ @@ -557,7 +557,7 @@ void ZUnionstoreCmd::DoBinlog(const std::shared_ptr& slot) { value_to_dest_.erase(value_to_dest_.begin()); zadd_cmd_->Initial(initial_args, db_name_); zadd_cmd_->SetConn(GetConn()); - zadd_cmd_->SetResp(resp_); + zadd_cmd_->SetResp(resp_.lock()); auto& zadd_argv = zadd_cmd_->argv(); size_t data_size = d_len + zadd_argv[3].size(); @@ -603,7 +603,7 @@ void ZInterstoreCmd::DoBinlog(const std::shared_ptr& slot) { del_args.emplace_back(dest_key_); del_cmd_->Initial(del_args, db_name_); del_cmd_->SetConn(GetConn()); - del_cmd_->SetResp(resp_); + del_cmd_->SetResp(resp_.lock()); del_cmd_->DoBinlog(slot); if(value_to_dest_.size() == 0){ @@ -620,7 +620,7 @@ void ZInterstoreCmd::DoBinlog(const std::shared_ptr& slot) { initial_args.emplace_back(value_to_dest_[0].member); zadd_cmd_->Initial(initial_args, db_name_); zadd_cmd_->SetConn(GetConn()); - zadd_cmd_->SetResp(resp_); + zadd_cmd_->SetResp(resp_.lock()); auto& zadd_argv = zadd_cmd_->argv(); size_t data_size = d_len + value_to_dest_[0].member.size();