Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/txn #1585

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
3541c5a
fix: fix select cmd return inconsistent with redis
ForestLH Jul 16, 2023
da524e0
refactor:modified lock style while involve db level
ForestLH Jul 13, 2023
821434f
feature:txn basic
ForestLH May 24, 2023
0823cb6
fix:merge upstream
ForestLH May 27, 2023
b9e92ee
feature:txn udpate
ForestLH Jun 3, 2023
a24b9ec
feature:add txn for pika(#1446)
ForestLH Jun 4, 2023
3439146
update unwatch cmd
ForestLH Jun 4, 2023
c8ca075
clear watched key when connection closed
ForestLH Jun 4, 2023
867dbfb
merge upstream code
ForestLH Jul 4, 2023
bc24a33
update
ForestLH Jul 6, 2023
d3c63a4
feature: add txn for pika completely
ForestLH Jul 9, 2023
20f7148
add set txn failed for modified watch key
ForestLH Jul 15, 2023
2a0f9a5
update:reduce the particle size of the lock in txn
ForestLH Jul 16, 2023
7b6e279
merge upsteam code
ForestLH Jul 25, 2023
3e4cae9
chore:remove redundant comment
ForestLH Jul 26, 2023
5a3aacf
test:add go ci test for txn
ForestLH Jul 31, 2023
fcf3015
Merge remote-tracking branch 'upstream/unstable' into feature/txn
ForestLH Jul 31, 2023
6f97281
fix compile error for linux
ForestLH Aug 1, 2023
8ed5dc1
Merge remote-tracking branch 'upstream/unstable' into feature/txn
ForestLH Aug 5, 2023
ebc3abf
update txn go ci test
ForestLH Aug 6, 2023
684bcca
update txn for block list pop command
ForestLH Aug 6, 2023
d553036
Improve blpop-related in Redis transactions
ForestLH Aug 7, 2023
84651a4
blpop_txn_fix
cheniujh Aug 8, 2023
dca87ee
add some test for go test txn
ForestLH Aug 8, 2023
bd78516
Merge remote-tracking branch 'upstream/unstable' into feature/txn
ForestLH Sep 5, 2023
f664260
update txn integration test
ForestLH Sep 6, 2023
d497d08
txn change class to struct
ForestLH Sep 12, 2023
e1da3da
txn:use weak ptr instead of shared ptr in Cmd
ForestLH Sep 12, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "storage/storage.h"

#include "include/pika_command.h"
#include "pika_db.h"

/*
* Admin
Expand Down Expand Up @@ -149,6 +150,7 @@ class SelectCmd : public Cmd {
void DoInitial() override;
void Clear() override { db_name_.clear(); }
std::string db_name_;
std::shared_ptr<DB> select_db_;
};

class FlushallCmd : public Cmd {
Expand All @@ -158,11 +160,13 @@ class FlushallCmd : public Cmd {
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override{};
void Merge() override{};
Cmd* Clone() override { return new FlushallCmd(*this); }

void Execute() override;
void FlushAllWithoutLock();
private:
void DoInitial() override;
std::string ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_t logic_id, uint32_t filenum,
uint64_t offset) override;
void DoWithoutLock(std::shared_ptr<Slot> slot);
};

class FlushdbCmd : public Cmd {
Expand All @@ -172,11 +176,15 @@ class FlushdbCmd : public Cmd {
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override{};
void Merge() override{};
Cmd* Clone() override { return new FlushdbCmd(*this); }
void FlushAllSlotsWithoutLock(std::shared_ptr<DB> db);
void Execute() override;
std::string GetFlushDname() { return db_name_; }

private:
std::string db_name_;
void DoInitial() override;
void Clear() override { db_name_.clear(); }
void DoWithoutLock(std::shared_ptr<Slot> slot);
};

class ClientCmd : public Cmd {
Expand Down Expand Up @@ -219,6 +227,7 @@ class InfoCmd : public Cmd {
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override{};
void Merge() override{};
Cmd* Clone() override { return new InfoCmd(*this); }
void Execute() override;

private:
InfoSection info_section_;
Expand Down Expand Up @@ -280,6 +289,7 @@ class ConfigCmd : public Cmd {
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override{};
void Merge() override{};
Cmd* Clone() override { return new ConfigCmd(*this); }
void Execute() override;

private:
std::vector<std::string> config_args_v_;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here are some suggestions for code review and improvements:

  1. Make sure the necessary header files are included, such as "pika_db.h". Check if all required dependencies are correctly included to avoid compilation errors.

  2. Review the changes made to the SelectCmd class. Check if the addition of the select_db_ member variable and related methods (Execute, FlushAllWithoutLock, and DoWithoutLock) are implemented correctly. Verify that these changes do not introduce any new bugs or affect the overall functionality of the class.

  3. For the FlushallCmd class, check the implementation of the new methods Execute, FlushAllWithoutLock, and DoWithoutLock. Ensure that they are correctly implemented and handle flushing behavior appropriately. Review the existing methods (Split, Merge, Clone, DoInitial, and ToBinlog) for any related modifications required due to the new changes.

  4. Similarly, review the changes made to the FlushdbCmd class, including the implementation of the new methods FlushAllSlotsWithoutLock, Execute, GetFlushDname, and DoWithoutLock. Verify that these changes are consistent with the expected behavior of the class and do not introduce any bugs. Review the existing methods (Split, Merge, Clone, DoInitial, and Clear) for any necessary adjustments.

  5. Check the implementation of the new methods Execute in the InfoCmd and ConfigCmd classes. Ensure that they perform the intended actions correctly and consider any impact they may have on other parts of the code.

  6. Confirm that all added methods in the classes follow a consistent naming convention and adhere to the coding style guidelines of the project.

  7. If possible, conduct thorough testing after incorporating these changes to ensure the desired functionality is achieved and there are no regression issues.

Remember to consult project-specific coding guidelines, conventions, and best practices while reviewing the code.

Expand Down
41 changes: 37 additions & 4 deletions include/pika_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#ifndef PIKA_CLIENT_CONN_H_
#define PIKA_CLIENT_CONN_H_

#include <bitset>
#include <utility>

#include "include/pika_command.h"
Expand All @@ -24,6 +25,14 @@ class PikaClientConn : public net::RedisConn {
uint32_t slot_id;
};

struct TxnStateBitMask {
public:
static constexpr uint8_t Start = 0;
static constexpr uint8_t InitCmdFailed = 1;
static constexpr uint8_t WatchFailed = 2;
static constexpr uint8_t Execing = 3;
};

// Auth related
class AuthStat {
public:
Expand All @@ -42,7 +51,7 @@ class PikaClientConn : public net::RedisConn {

PikaClientConn(int fd, const std::string& ip_port, net::Thread* server_thread, net::NetMultiplexer* mpx,
const net::HandleType& handle_type, int max_conn_rbuf_size);
~PikaClientConn() override = default;
~PikaClientConn() = default;

void ProcessRedisCmds(const std::vector<net::RedisCmdArgsType>& argvs, bool async,
std::string* response) override;
Expand All @@ -54,10 +63,30 @@ class PikaClientConn : public net::RedisConn {

bool IsPubSub() { return is_pubsub_; }
void SetIsPubSub(bool is_pubsub) { is_pubsub_ = is_pubsub; }
void SetCurrentTable(const std::string& db_name) { current_db_ = db_name; }
const std::string& GetCurrentTable() override{ return current_db_; }
void SetCurrentDb(const std::string& db_name) { current_db_ = db_name; }
const std::string& GetCurrentTable() override { return current_db_; }
void SetWriteCompleteCallback(WriteCompleteCallback cb) { write_completed_cb_ = std::move(cb); }

// Txn
void PushCmdToQue(std::shared_ptr<Cmd> cmd);
std::queue<std::shared_ptr<Cmd>> GetTxnCmdQue();
void ClearTxnCmdQue();
bool IsInTxn();
bool IsTxnFailed();
bool IsTxnInitFailed();
bool IsTxnWatchFailed();
bool IsTxnExecing(void);
void SetTxnWatchFailState(bool is_failed);
void SetTxnInitFailState(bool is_failed);
void SetTxnStartState(bool is_start);

void AddKeysToWatch(const std::vector<std::string> &db_keys);
void RemoveWatchedKeys();
void SetTxnFailedFromKeys(const std::vector<std::string> &db_keys);
void SetAllTxnFailed();
void SetTxnFailedFromDBs(std::string db_name);
void ExitTxn();

net::ServerThread* server_thread() { return server_thread_; }

AuthStat& auth_stat() { return auth_stat_; }
Expand All @@ -70,14 +99,18 @@ class PikaClientConn : public net::RedisConn {
std::string current_db_;
WriteCompleteCallback write_completed_cb_;
bool is_pubsub_ = false;
std::queue<std::shared_ptr<Cmd>> txn_cmd_que_;
std::bitset<16> txn_state_;
std::unordered_set<std::string> watched_db_keys_;
std::mutex txn_state_mu_;

std::shared_ptr<Cmd> DoCmd(const PikaCmdArgsType& argv, const std::string& opt,
const std::shared_ptr<std::string>& resp_ptr);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code patch seems to add Redis transactions support for a PikaClientConn class. Here are some suggestions and improvements:

  • It seems that the destructor is overridden but not doing anything. It could be removed altogether as it does not seem to provide any additional functionality.
  • The TxnStateBitMask class containing static constants is related to transaction states, but the use of bit shifting is unclear. Enum values could be used instead.
  • There are utility methods for managing transactions (e.g., IsInTxn(), AddKeysToWatch()) in the PikaClientConn class, but there is no documentation explaining how they should be used or what their side effects might be. Adding docstrings would help improve code readability and maintainability.
  • The comment about the large txn_db_mu_ lock mentions that "there will be an issue with the order in which locks are acquired." This statement is not explained further, so users of the class may not understand the rationale behind the design decision or the potential for deadlock if the locking scheme is not followed properly.
  • It's difficult to determine whether there are any bugs without more context or code coverage information.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code review consists of the following points:

  • The code adds a new class TxnStateBitMask to manage transaction states, which seems like a reasonable and clean approach.
  • The default destructor is modified to be non-default in PikaClientConn, but no reason is given for this change, so it's hard to tell if it was necessary or not.
  • The use of std::queue<std::shared_ptr<Cmd>> for storing transaction commands seems appropriate for managing transactions.
  • The use of locks for txn_state_ and txn_db_mu_ also appears appropriate since these variables are shared between threads.
  • There are some incomplete comments, such as //TODO(leeHao): 将一个超时关闭的连接所watch的key给从全局的那个变量里面给清除掉 that need to be addressed.

Some possible improvements include providing more detailed comments, catching potential exceptional scenarios, adding unit tests, and implementing standard C++ exception handling to improve code robustness.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall, the code does not seem to contain any syntax or semantic errors. However, some suggestions for improvement are:

  1. The destructor of PikaClientConn is overriden but doesn't have any implementation. Since the parent class has a virtual destructor, it should be explicitly called in the child class destructor or its only declaration can be left as default.

  2. The use of raw locks and mutexes might lead to deadlocks if proper care is not taken. Consider using RAII wrappers like std::lock_guard or std::unique_lock to mitigate this risk.

  3. Some comments mention "db" but it's unclear what it refers to. Adding more descriptive comments and naming conventions would improve its readability and maintainability.

  4. The current design uses a queue and a bitset to implement a transaction. This may suffice, but there are more efficient and flexible ways to structure transactions, such as using a directed acyclic graph(DAG) to represent dependencies among operations within a transaction.

  5. The name PikaClientConn could be improved by following existing conventions in the project or industry standards.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here are some suggestions and improvements for the code:

  1. In the constructor PikaClientConn, initialize is_pubsub_ explicitly along with other member variables to improve readability.

  2. In the destructor ~PikaClientConn, use curly braces {} instead of default. Even though the default destructor is fine, using curly braces makes it explicit.

  3. Consider making the member function GetCurrentDb a const member function since it only retrieves the value and doesn't modify the object.

  4. Provide descriptive comments for the PushCmdToQue function and other functions in the Txn section to explain their purpose and usage.

  5. Consider using more descriptive variable names rather than abbreviations. For example, replace db_keys with databaseKeys or something similar. This will improve code understandability.

  6. Review the error handling and exception handling mechanisms in the code to ensure that all potential error scenarios are properly handled.

  7. Consider adding unit tests for the code, especially for the functions related to transaction processing, to ensure their correctness.

  8. Review the usage of locks and mutexes in the code to prevent any potential race conditions when accessing shared data structures, such as txn_cmd_que_ and txn_state_. Ensure that the locking mechanism is correctly implemented to avoid deadlocks.

These suggestions aim to improve the overall clarity, maintainability, and reliability of the code. It's important to thoroughly test and validate the code after implementing any changes to ensure its correctness.


void ProcessSlowlog(const PikaCmdArgsType& argv, uint64_t start_us, uint64_t do_duration);
void ProcessMonitor(const PikaCmdArgsType& argv);

void ExecRedisCmd(const PikaCmdArgsType& argv, const std::shared_ptr<std::string>& resp_ptr);
void ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr<std::string>& resp_ptr);
void TryWriteResp();

AuthStat auth_stat_;
Expand Down
33 changes: 28 additions & 5 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#ifndef PIKA_COMMAND_H_
#define PIKA_COMMAND_H_

#include <string>
#include <memory>
#include <unordered_map>
#include <unordered_set>
Expand Down Expand Up @@ -196,6 +197,13 @@ const std::string kCmdNameSDiffstore = "sdiffstore";
const std::string kCmdNameSMove = "smove";
const std::string kCmdNameSRandmember = "srandmember";

// transation
const std::string kCmdNameMulti = "multi";
const std::string kCmdNameExec = "exec";
const std::string kCmdNameDiscard = "discard";
const std::string kCmdNameWatch = "watch";
const std::string kCmdNameUnWatch = "unwatch";

// HyperLogLog
const std::string kCmdNamePfAdd = "pfadd";
const std::string kCmdNamePfCount = "pfcount";
Expand Down Expand Up @@ -296,6 +304,9 @@ class CmdRes {
kInconsistentHashTag,
kErrOther,
KIncrByOverFlow,
kInvalidTransaction,
kTxnQueued,
kTxnAbort,
};

CmdRes() = default;
Expand Down Expand Up @@ -367,6 +378,17 @@ class CmdRes {
result.append(message_);
result.append("'\r\n");
break;
case kInvalidTransaction:
return "-ERR WATCH inside MULTI is not allowed\r\n";
case kTxnQueued:
result = "+QUEUED";
result.append("\r\n");
break;
case kTxnAbort:
result = "-EXECABORT ";
result.append(message_);
result.append(kNewLine);
break;
case kErrOther:
result = "-ERR ";
result.append(message_);
Expand Down Expand Up @@ -401,7 +423,9 @@ class CmdRes {
message_ = content;
}
}

CmdRet GetCmdRet() const {
return ret_;
}
private:
std::string message_;
CmdRet ret_ = kNone;
Expand Down Expand Up @@ -445,11 +469,8 @@ class Cmd : public std::enable_shared_from_this<Cmd> {

virtual std::vector<std::string> current_key() const;
virtual void Execute();
virtual void ProcessFlushDBCmd();
virtual void ProcessFlushAllCmd();
virtual void ProcessSingleSlotCmd();
virtual void ProcessMultiSlotCmd();
virtual void ProcessDoNotSpecifySlotCmd();
virtual void Do(std::shared_ptr<Slot> slot = nullptr) = 0;
virtual Cmd* Clone() = 0;
// used for execute multikey command into different slots
Expand All @@ -468,6 +489,8 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
bool is_multi_slot() const;
bool HashtagIsConsistent(const std::string& lhs, const std::string& rhs) const;
uint64_t GetDoDuration() const { return do_duration_; };
void SetDbName(const std::string& db_name) { db_name_ = db_name; }
std::string GetDBName() { return db_name_; }

std::string name() const;
CmdRes& res();
Expand Down Expand Up @@ -505,7 +528,7 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
protected:
CmdRes res_;
PikaCmdArgsType argv_;
std::string db_name_;
std::string db_name_{};

std::weak_ptr<net::NetConn> conn_;
std::weak_ptr<std::string> resp_;
Expand Down
3 changes: 3 additions & 0 deletions include/pika_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ class DB : public std::enable_shared_from_this<DB>, public pstd::noncopyable {
friend class InfoCmd;
friend class PkClusterInfoCmd;
friend class PikaServer;
friend class ExecCmd;
friend class FlushdbCmd;
friend class FlushallCmd;

std::string GetDBName();
void BgSaveDB();
Expand Down
1 change: 0 additions & 1 deletion include/pika_kv.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ class SetCmd : public Cmd {
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override{};
void Merge() override{};
Cmd* Clone() override { return new SetCmd(*this); }

private:
std::string key_;
std::string value_;
Expand Down
2 changes: 1 addition & 1 deletion include/pika_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,8 @@ class RPopLPushCmd : public BlockingBaseCmd {
}
std::vector<std::string> current_key() const override {
std::vector<std::string> res;
res.push_back(source_);
res.push_back(receiver_);
res.push_back(source_);
return res;
}
void Do(std::shared_ptr<Slot> slot = nullptr) override;
Expand Down
3 changes: 3 additions & 0 deletions include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ class PikaReplicaManager {
~PikaReplicaManager() = default;

friend Cmd;
friend class FlushdbCmd;
friend class FlushallCmd;
friend class ExecCmd;

void Start();
void Stop();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on the provided code patch, it appears that you have added several friend classes to the PikaReplicaManager class. This grants those classes access to the private and protected members of PikaReplicaManager. This change seems reasonable assuming that you have a valid reason for doing so.

Without more context or the complete codebase, it is challenging to provide an extensive code review. However, based on the provided information, here are a few general suggestions:

  1. Documentation: Ensure that the purpose and behavior of the PikaReplicaManager class and its associated functions (Start(), Stop(), etc.) are adequately documented. Clear documentation will help improve code maintainability.
  2. Error Handling: Review the error handling mechanism in place. It's essential to handle potential errors gracefully and provide appropriate feedback or recovery mechanisms where needed.
  3. Testing: It's good practice to have unit tests covering the functionality of PikaReplicaManager and any associated classes. Automated tests can help catch bugs and ensure that changes made to the codebase do not introduce new issues.
  4. Code Organization: Check if there is an opportunity to refactor the code for better organization and readability. Aim for clear and concise functions that follow established coding standards and best practices.

Remember, a thorough code review often requires a comprehensive understanding of the entire codebase. Consider involving other developers familiar with the project to perform a more detailed and contextualized review.

Expand Down
5 changes: 4 additions & 1 deletion include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@
#include "include/pika_repl_client.h"
#include "include/pika_repl_server.h"
#include "include/pika_rsync_service.h"
#include "include/pika_migrate_thread.h"
#include "include/rsync_server.h"
#include "include/pika_statistic.h"
#include "include/pika_slot_command.h"
#include "include/pika_migrate_thread.h"
#include "include/pika_transaction.h"
#include "include/pika_cmd_table_manager.h"


Expand Down Expand Up @@ -506,6 +507,8 @@ class PikaServer : public pstd::noncopyable {
friend class InfoCmd;
friend class PikaReplClientConn;
friend class PkClusterInfoCmd;
friend class FlushallCmd;
friend class ExecCmd;

private:
/*
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on the code patch you provided, here are a few observations:

  1. In line 38, there seems to be a redundant inclusion of "pika_migrate_thread.h" since it's already included in line 11.

  2. In line 39, there is an additional inclusion of "pika_transaction.h". Make sure it is necessary and doesn't conflict with existing code.

  3. It seems like classes named "FlushallCmd" and "ExecCmd" are being declared. Double-check if they are properly defined elsewhere.

  4. Since this code snippet only provides the inclusion part and not the definition or implementation, it's challenging to identify specific bugs or improvements. A more thorough review would require examining the logic and functionality of the classes and functions involved.

Please provide more details or specific sections of the code if you would like a more comprehensive review.

Expand Down
2 changes: 2 additions & 0 deletions include/pika_slot.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ class Slot : public std::enable_shared_from_this<Slot>,public pstd::noncopyable
// FlushDB & FlushSubDB use
bool FlushDB();
bool FlushSubDB(const std::string& db_name);
bool FlushDBWithoutLock();
bool FlushSubDBWithoutLock(const std::string& db_name);

// key scan info use
pstd::Status GetKeyNum(std::vector<storage::KeyInfo>* key_info);
Expand Down
Loading