Skip to content

Commit

Permalink
Merge branch 'new_log_playback_to_latest' of https://github.com/panle…
Browse files Browse the repository at this point in the history
…i-coder/pikiwidb into new_log_playback_to_latest
  • Loading branch information
panlei-coder committed Jun 4, 2024
2 parents 3f8ae41 + 42d7d0a commit dcdf232
Show file tree
Hide file tree
Showing 24 changed files with 745 additions and 49 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ Run this command, compare with redis use pipeline commands, try it.
- sadd scard srem sismember smembers sdiff sdiffstore sinter sinterstore sunion sunionstore smove spop srandmember sscan

#### sorted set commands
- zadd zcard zrank zrevrank zrem zincrby zscore zrange zrevrange zrangebyscore zrevrangebyscore zremrangebyrank zremrangebyscore
- zadd zcard zrank zrevrank zrem zincrby zscore zrange zrevrange zrangebyscore zrevrangebyscore zremrangebyrank zremrangebyscore zpopmin zpopmax zunionstore zinterstore

#### pubsub commands
- subscribe unsubscribe publish psubscribe punsubscribe pubsub
Expand Down
2 changes: 1 addition & 1 deletion README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ RocksDB 可以配置为 PikiwiDB 的持久化存储引擎,可以存储更多

#### sorted set commands

- zadd zcard zrank zrevrank zrem zincrby zscore zrange zrevrange zrangebyscore zrevrangebyscore zremrangebyrank zremrangebyscore
- zadd zcard zrank zrevrank zrem zincrby zscore zrange zrevrange zrangebyscore zrevrangebyscore zremrangebyrank zremrangebyscore zpopmin zpopmax zunionstore zinterstore

#### pubsub commands

Expand Down
6 changes: 5 additions & 1 deletion cmake/braft.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@ ExternalProject_Add(
extern_braft
${EXTERNAL_PROJECT_LOG_ARGS}
DEPENDS brpc
# The pr on braft is not merged, so I am using my own warehouse to run the test for the time being
GIT_REPOSITORY https://github.com/panlei-coder/braft.git
GIT_TAG merge-master-playback
# GIT_REPOSITORY "https://github.com/pikiwidb/braft.git"
# GIT_TAG master
GIT_SHALLOW true
PREFIX ${BRAFT_SOURCES_DIR}
UPDATE_COMMAND ""
CMAKE_ARGS -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER}
Expand All @@ -38,7 +42,7 @@ ExternalProject_Add(
-DCMAKE_BUILD_TYPE:STRING=${THIRD_PARTY_BUILD_TYPE}
BUILD_IN_SOURCE 1
BUILD_COMMAND $(MAKE) -j ${CPU_CORE} braft-static
INSTALL_COMMAND mkdir -p ${BRAFT_INSTALL_DIR}/lib/ COMMAND cp ${BRAFT_SOURCES_DIR}/src/extern_braft/output/lib/libbraft.a ${BRAFT_LIBRARIES} COMMAND cp -r ${BRAFT_SOURCES_DIR}/src/extern_braft/output/include ${BRAFT_INCLUDE_DIR}/
INSTALL_COMMAND mkdir -p ${BRAFT_INSTALL_DIR}/lib/ COMMAND cp ${BRAFT_SOURCES_DIR}/src/extern_braft/output/lib/libbraft.a ${BRAFT_LIBRARIES} COMMAND rm -rf ${BRAFT_INCLUDE_DIR} COMMAND cp -r ${BRAFT_SOURCES_DIR}/src/extern_braft/output/include ${BRAFT_INCLUDE_DIR}
)
ADD_DEPENDENCIES(extern_braft brpc)
ADD_LIBRARY(braft STATIC IMPORTED GLOBAL)
Expand Down
1 change: 0 additions & 1 deletion cmake/brpc.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ EXTERNALPROJECT_ADD(
-DCMAKE_BUILD_TYPE:STRING=${THIRD_PARTY_BUILD_TYPE}
BUILD_IN_SOURCE 1
BUILD_COMMAND $(MAKE) -j ${CPU_CORE} brpc-static
INSTALL_COMMAND mkdir -p ${BRPC_INSTALL_DIR}/lib/ COMMAND cp ${BRPC_SOURCES_DIR}/src/extern_brpc/output/lib/libbrpc.a ${BRPC_LIBRARIES} COMMAND cp -r ${BRPC_SOURCES_DIR}/src/extern_brpc/output/include ${BRPC_INCLUDE_DIR}/
)
ADD_DEPENDENCIES(extern_brpc ssl crypto zlib protobuf leveldb gflags)
ADD_LIBRARY(brpc STATIC IMPORTED GLOBAL)
Expand Down
11 changes: 8 additions & 3 deletions cmake/gflags.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

INCLUDE_GUARD()

SET(GFLAGS_BUILD_TYPE "Release")
SET(CMAKE_BUILD_TYPE ${GFLAGS_BUILD_TYPE})

FetchContent_Declare(gflags
URL https://github.com/gflags/gflags/archive/v2.2.2.zip
URL_HASH SHA256=19713a36c9f32b33df59d1c79b4958434cb005b5b47dc5400a7a4b078111d9b5
Expand All @@ -21,6 +24,8 @@ FIND_PACKAGE(Threads REQUIRED)

TARGET_LINK_LIBRARIES(gflags_static Threads::Threads)

SET(GFLAGS_INCLUDE_PATH ${CMAKE_CURRENT_BINARY_DIR}/_deps/gflags-build/include)
SET(GFLAGS_LIBRARY ${CMAKE_CURRENT_BINARY_DIR}/_deps/gflags-build/libgflags.a)
SET(GFLAGS_LIB ${CMAKE_CURRENT_BINARY_DIR}/_deps/gflags-build/libgflags.a)
SET(GFLAGS_INCLUDE_PATH ${CMAKE_CURRENT_BINARY_DIR}/_deps/gflags-build/include CACHE PATH "" FORCE)
SET(GFLAGS_LIBRARY ${CMAKE_CURRENT_BINARY_DIR}/_deps/gflags-build/libgflags.a CACHE PATH "" FORCE)
SET(GFLAGS_LIB ${CMAKE_CURRENT_BINARY_DIR}/_deps/gflags-build/libgflags.a CACHE PATH "" FORCE)

SET(CMAKE_BUILD_TYPE ${THIRD_PARTY_BUILD_TYPE})
1 change: 1 addition & 0 deletions cmake/zlib.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ ExternalProject_Add(
${EXTERNAL_PROJECT_LOG_ARGS}
GIT_REPOSITORY "https://github.com/madler/zlib.git"
GIT_TAG "v1.2.8"
GIT_SHALLOW true
PREFIX ${ZLIB_SOURCES_DIR}
UPDATE_COMMAND ""
CMAKE_ARGS -DCMAKE_C_COMPILER=${CMAKE_C_COMPILER}
Expand Down
4 changes: 4 additions & 0 deletions src/base_cmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ const std::string kCmdNameRPoplpush = "rpoplpush";

// zset cmd
const std::string kCmdNameZAdd = "zadd";
const std::string kCmdNameZPopMin = "zpopmin";
const std::string kCmdNameZPopMax = "zpopmax";
const std::string kCmdNameZInterstore = "zinterstore";
const std::string kCmdNameZUnionstore = "zunionstore";
const std::string kCmdNameZRevrange = "zrevrange";
const std::string kCmdNameZRangebyscore = "zrangebyscore";
const std::string kCmdNameZRemrangebyscore = "zremrangebyscore";
Expand Down
63 changes: 60 additions & 3 deletions src/cmd_kv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,72 @@ void GetCmd::DoCmd(PClient* client) {
SetCmd::SetCmd(const std::string& name, int16_t arity)
: BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategoryString) {}

// SET key value [NX | XX] [EX seconds | PX milliseconds]
bool SetCmd::DoInitial(PClient* client) {
client->SetKey(client->argv_[1]);

auto argv_ = client->argv_;
value_ = argv_[2];
condition_ = SetCmd::kNONE;
sec_ = 0;
size_t index = 3;

while (index != argv_.size()) {
std::string opt = argv_[index];
if (strcasecmp(opt.data(), "xx") == 0) {
condition_ = SetCmd::kXX;
} else if (strcasecmp(opt.data(), "nx") == 0) {
condition_ = SetCmd::kNX;
} else if ((strcasecmp(opt.data(), "ex") == 0) || (strcasecmp(opt.data(), "px") == 0)) {
condition_ = (condition_ == SetCmd::kNONE) ? SetCmd::kEXORPX : condition_;
index++;
if (index == argv_.size()) {
client->SetRes(CmdRes::kSyntaxErr);
return false;
}
if (pstd::String2int(argv_[index].data(), argv_[index].size(), &sec_) == 0) {
client->SetRes(CmdRes::kInvalidInt);
return false;
}

if (strcasecmp(opt.data(), "px") == 0) {
sec_ /= 1000;
}
} else {
client->SetRes(CmdRes::kSyntaxErr);
return false;
}
index++;
}

return true;
}

void SetCmd::DoCmd(PClient* client) {
storage::Status s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->Set(client->Key(), client->argv_[2]);
if (s.ok()) {
client->SetRes(CmdRes::kOK);
int32_t res = 1;
storage::Status s;
auto key_ = client->Key();
switch (condition_) {
case SetCmd::kXX:
s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->Setxx(key_, value_, &res, sec_);
break;
case SetCmd::kNX:
s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->Setnx(key_, value_, &res, sec_);
break;
case SetCmd::kEXORPX:
s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->Setex(key_, value_, sec_);
break;
default:
s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->Set(key_, value_);
break;
}

if (s.ok() || s.IsNotFound()) {
if (res == 1) {
client->SetRes(CmdRes::kOK);
} else {
client->AppendStringLen(-1);
}
} else {
client->SetRes(CmdRes::kErrOther, s.ToString());
}
Expand Down
6 changes: 6 additions & 0 deletions src/cmd_kv.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,19 @@ class GetCmd : public BaseCmd {

class SetCmd : public BaseCmd {
public:
enum SetCondition { kNONE, kNX, kXX, kEXORPX };
SetCmd(const std::string &name, int16_t arity);

protected:
bool DoInitial(PClient *client) override;

private:
void DoCmd(PClient *client) override;

std::string value_;
std::string target_;
int64_t sec_ = 0;
SetCmd::SetCondition condition_{kNONE};
};

class BitOpCmd : public BaseCmd {
Expand Down
4 changes: 4 additions & 0 deletions src/cmd_table_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ void CmdTableManager::InitCmdTable() {

// zset
ADD_COMMAND(ZAdd, -4);
ADD_COMMAND(ZPopMin, -2);
ADD_COMMAND(ZPopMax, -2);
ADD_COMMAND(ZInterstore, -4);
ADD_COMMAND(ZUnionstore, -4);
ADD_COMMAND(ZRevrange, -4);
ADD_COMMAND(ZRangebyscore, -4);
ADD_COMMAND(ZRemrangebyscore, 4);
Expand Down
181 changes: 181 additions & 0 deletions src/cmd_zset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,187 @@ void ZAddCmd::DoCmd(PClient* client) {
}
}

ZPopMinCmd::ZPopMinCmd(const std::string& name, int16_t arity)
: BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategorySortedSet) {}

bool ZPopMinCmd::DoInitial(PClient* client) {
if (client->argv_.size() > 3) {
client->SetRes(CmdRes::kWrongNum, client->CmdName());
return false;
}

client->SetKey(client->argv_[1]);
return true;
}

void ZPopMinCmd::DoCmd(PClient* client) {
int32_t count = 1;
if (client->argv_.size() == 3) {
if (pstd::String2int(client->argv_[2].data(), client->argv_[2].size(), &count) == 0) {
client->SetRes(CmdRes::kInvalidInt);
return;
}
}

std::vector<storage::ScoreMember> score_members;
storage::Status s =
PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->ZPopMin(client->Key(), count, &score_members);
if (s.ok()) {
char buf[32];
int64_t len = 0;
client->AppendArrayLen(static_cast<int64_t>(score_members.size()) * 2);
for (auto& score_member : score_members) {
client->AppendStringLenUint64(score_member.member.size());
client->AppendContent(score_member.member);
len = pstd::D2string(buf, sizeof(buf), score_member.score);
client->AppendStringLen(len);
client->AppendContent(buf);
}
} else {
client->SetRes(CmdRes::kErrOther, s.ToString());
}
}

ZPopMaxCmd::ZPopMaxCmd(const std::string& name, int16_t arity)
: BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategorySortedSet) {}

bool ZPopMaxCmd::DoInitial(PClient* client) {
if (client->argv_.size() > 3) {
client->SetRes(CmdRes::kWrongNum, client->CmdName());
return false;
}

client->SetKey(client->argv_[1]);
return true;
}

void ZPopMaxCmd::DoCmd(PClient* client) {
int32_t count = 1;
if (client->argv_.size() == 3) {
if (pstd::String2int(client->argv_[2].data(), client->argv_[2].size(), &count) == 0) {
client->SetRes(CmdRes::kInvalidInt);
return;
}
}

std::vector<storage::ScoreMember> score_members;
storage::Status s =
PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->ZPopMax(client->Key(), count, &score_members);
if (s.ok()) {
char buf[32];
int64_t len = 0;
client->AppendArrayLen(static_cast<int64_t>(score_members.size()) * 2);
for (auto& score_member : score_members) {
client->AppendStringLenUint64(score_member.member.size());
client->AppendContent(score_member.member);
len = pstd::D2string(buf, sizeof(buf), score_member.score);
client->AppendStringLen(len);
client->AppendContent(buf);
}
} else {
client->SetRes(CmdRes::kErrOther, s.ToString());
}
}

ZsetUIstoreParentCmd::ZsetUIstoreParentCmd(const std::string& name, int16_t arity)
: BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategorySortedSet) {}

// ZINTERSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE <SUM | MIN | MAX>]
// ZUNIONSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE <SUM | MIN | MAX>]
bool ZsetUIstoreParentCmd::DoInitial(PClient* client) {
auto argv_ = client->argv_;
dest_key_ = argv_[1];
if (pstd::String2int(argv_[2].data(), argv_[2].size(), &num_keys_) == 0) {
client->SetRes(CmdRes::kInvalidInt);
return false;
}
if (num_keys_ < 1) {
client->SetRes(CmdRes::kErrOther, "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
return false;
}
auto argc = argv_.size();
if (argc < num_keys_ + 3) {
client->SetRes(CmdRes::kSyntaxErr);
return false;
}
keys_.assign(argv_.begin() + 3, argv_.begin() + 3 + num_keys_);
weights_.assign(num_keys_, 1);
auto index = num_keys_ + 3;
while (index < argc) {
if (strcasecmp(argv_[index].data(), "weights") == 0) {
index++;
if (argc < index + num_keys_) {
client->SetRes(CmdRes::kSyntaxErr);
return false;
}
double weight;
auto base = index;
for (; index < base + num_keys_; index++) {
if (pstd::String2d(argv_[index].data(), argv_[index].size(), &weight) == 0) {
client->SetRes(CmdRes::kErrOther, "weight value is not a float");
return false;
}
weights_[index - base] = weight;
}
} else if (strcasecmp(argv_[index].data(), "aggregate") == 0) {
index++;
if (argc < index + 1) {
client->SetRes(CmdRes::kSyntaxErr);
return false;
}
if (strcasecmp(argv_[index].data(), "sum") == 0) {
aggregate_ = storage::SUM;
} else if (strcasecmp(argv_[index].data(), "min") == 0) {
aggregate_ = storage::MIN;
} else if (strcasecmp(argv_[index].data(), "max") == 0) {
aggregate_ = storage::MAX;
} else {
client->SetRes(CmdRes::kSyntaxErr);
return false;
}
index++;
} else {
client->SetRes(CmdRes::kSyntaxErr);
return false;
}
}
return true;
}

ZInterstoreCmd::ZInterstoreCmd(const std::string& name, int16_t arity) : ZsetUIstoreParentCmd(name, arity) {}

bool ZInterstoreCmd::DoInitial(PClient* client) { return ZsetUIstoreParentCmd::DoInitial(client); }

void ZInterstoreCmd::DoCmd(PClient* client) {
int32_t count = 0;
std::vector<storage::ScoreMember> value_to_dest_;
storage::Status s = PSTORE.GetBackend(client->GetCurrentDB())
->GetStorage()
->ZInterstore(dest_key_, keys_, weights_, aggregate_, value_to_dest_, &count);
if (s.ok()) {
client->AppendInteger(count);
} else {
client->SetRes(CmdRes::kErrOther, s.ToString());
}
}

ZUnionstoreCmd::ZUnionstoreCmd(const std::string& name, int16_t arity) : ZsetUIstoreParentCmd(name, arity) {}

bool ZUnionstoreCmd::DoInitial(PClient* client) { return ZsetUIstoreParentCmd::DoInitial(client); }

void ZUnionstoreCmd::DoCmd(PClient* client) {
int32_t count = 0;
std::map<std::string, double> value_to_dest;
storage::Status s = PSTORE.GetBackend(client->GetCurrentDB())
->GetStorage()
->ZUnionstore(dest_key_, keys_, weights_, aggregate_, value_to_dest, &count);
if (s.ok()) {
client->AppendInteger(count);
} else {
client->SetRes(CmdRes::kErrOther, s.ToString());
}
}

ZRevrangeCmd::ZRevrangeCmd(const std::string& name, int16_t arity)
: BaseCmd(name, arity, kCmdFlagsReadonly, kAclCategoryRead | kAclCategorySortedSet) {}

Expand Down
Loading

0 comments on commit dcdf232

Please sign in to comment.