Skip to content

Commit

Permalink
Merge branch 'apache:unstable' into unstable
Browse files Browse the repository at this point in the history
  • Loading branch information
p1u3o authored Nov 20, 2023
2 parents 754044e + 358cc41 commit 8e6ea24
Show file tree
Hide file tree
Showing 29 changed files with 1,294 additions and 77 deletions.
14 changes: 13 additions & 1 deletion .github/workflows/kvrocks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,19 @@ jobs:
tags: kvrocks:ci
outputs: type=docker
- name: Test built image
run: docker run --rm kvrocks:ci -v
run: |
docker run --rm kvrocks:ci -v
ID="$(docker run --rm -d -p 6666:6666 kvrocks:ci)"
sleep 1m
if [ "$(docker inspect --format='{{.State.Health.Status}}' $ID)" != "healthy" ]; then
echo "The container is not healthy."
exit 1
fi
if [ "$(ss --listening --no-header --tcp '( sport = :6666 )')" == "" ]; then
echo "The container listening port can not be accessed from outside."
exit 1
fi
docker stop $ID
build-and-test-in-container:
name: Build and test in container
Expand Down
5 changes: 3 additions & 2 deletions cmake/tbb.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ include_guard()
include(cmake/utils.cmake)

FetchContent_DeclareGitHubWithMirror(tbb
oneapi-src/oneTBB v2021.10.0
MD5=ed595f9b088e8ffbd612e183dd6811d3
oneapi-src/oneTBB v2021.11.0
MD5=eea2bdc5ae0a51389da27480617ccff9
)

FetchContent_MakeAvailableWithArgs(tbb
TBB_STRICT=OFF
TBB_TEST=OFF
TBB_EXAMPLES=OFF
TBBMALLOC_BUILD=OFF
Expand Down
13 changes: 5 additions & 8 deletions src/cli/daemon_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,23 +56,20 @@ inline bool SupervisedSystemd() {
return false;
}

sockaddr_un su;
memset(&su, 0, sizeof(su));
sockaddr_un su = {};
su.sun_family = AF_UNIX;
strncpy(su.sun_path, notify_socket, sizeof(su.sun_path) - 1);
su.sun_path[sizeof(su.sun_path) - 1] = '\0';
if (notify_socket[0] == '@') su.sun_path[0] = '\0';

iovec iov;
memset(&iov, 0, sizeof(iov));
iovec iov = {};
std::string ready = "READY=1";
iov.iov_base = &ready[0];
iov.iov_base = ready.data();
iov.iov_len = ready.size();

msghdr hdr;
memset(&hdr, 0, sizeof(hdr));
msghdr hdr = {};
hdr.msg_name = &su;
hdr.msg_namelen = offsetof(struct sockaddr_un, sun_path) + strlen(notify_socket);
hdr.msg_namelen = offsetof(sockaddr_un, sun_path) + strlen(su.sun_path);
hdr.msg_iov = &iov;
hdr.msg_iovlen = 1;

Expand Down
1 change: 0 additions & 1 deletion src/commands/blocking_commander.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ class BlockingCommander : public Commander,
// in other words, returning true indicates ending the blocking
virtual bool OnBlockingWrite() = 0;

bool IsBlocking() const override { return true; }
// to start the blocking process
// usually put to the end of the Execute method
Status StartBlocking(int64_t timeout, std::string *output) {
Expand Down
194 changes: 193 additions & 1 deletion src/commands/cmd_json.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@

#include "commander.h"
#include "commands/command_parser.h"
#include "commands/error_constants.h"
#include "error_constants.h"
#include "parse_util.h"
#include "server/redis_reply.h"
#include "server/server.h"
#include "storage/redis_metadata.h"
Expand Down Expand Up @@ -142,6 +144,47 @@ class CommandJsonArrAppend : public Commander {
}
};

class CommandJsonArrInsert : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
auto parse_result = ParseInt<int>(args[3], 10);
if (!parse_result) {
return {Status::RedisParseErr, errValueNotInteger};
}

index_ = *parse_result;
return Commander::Parse(args);
}

Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::Json json(srv->storage, conn->GetNamespace());

std::vector<std::optional<uint64_t>> result_count;
auto parse_result = ParseInt<int>(args_[3], 10);

auto s = json.ArrInsert(args_[1], args_[2], index_, {args_.begin() + 4, args_.end()}, &result_count);
if (s.IsNotFound()) {
*output = redis::NilString();
return Status::OK();
}
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::MultiLen(result_count.size());
for (auto c : result_count) {
if (c.has_value()) {
*output += redis::Integer(c.value());
} else {
*output += redis::NilString();
}
}

return Status::OK();
}

private:
int index_;
};

class CommandJsonType : public Commander {
public:
Status Execute(Server *srv, Connection *conn, std::string *output) override {
Expand Down Expand Up @@ -219,6 +262,35 @@ class CommandJsonClear : public Commander {
}
};

class CommandJsonToggle : public Commander {
public:
Status Execute(Server *svr, Connection *conn, std::string *output) override {
redis::Json json(svr->storage, conn->GetNamespace());

std::string path = (args_.size() > 2) ? args_[2] : "$";
std::vector<std::optional<bool>> results;
auto s = json.Toggle(args_[1], path, results);

if (s.IsNotFound()) {
*output = redis::NilString();
return Status::OK();
}

*output = redis::MultiLen(results.size());
for (auto it = results.rbegin(); it != results.rend(); ++it) {
if (it->has_value()) {
*output += redis::Integer(it->value());
} else {
*output += redis::NilString();
}
}

if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

return Status::OK();
}
};

class CommandJsonArrLen : public Commander {
public:
Status Execute(Server *svr, Connection *conn, std::string *output) override {
Expand Down Expand Up @@ -252,6 +324,32 @@ class CommandJsonArrLen : public Commander {
}
};

class CommandJsonMerge : public Commander {
public:
Status Execute(Server *svr, Connection *conn, std::string *output) override {
redis::Json json(svr->storage, conn->GetNamespace());

std::string key = args_[1];
std::string path = args_[2];
std::string value = args_[3];
bool result = false;

auto s = json.Merge(key, path, value, result);

if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

if (!result) {
*output = redis::NilString();
} else {
*output = redis::SimpleString("OK");
}

return Status::OK();
}
};

class CommandJsonArrPop : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
Expand Down Expand Up @@ -291,14 +389,108 @@ class CommandJsonArrPop : public Commander {
int64_t index_ = -1;
};

class CommandJsonArrTrim : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
path_ = args_[2];
start_ = GET_OR_RET(ParseInt<int64_t>(args_[3], 10));
stop_ = GET_OR_RET(ParseInt<int64_t>(args_[4], 10));

return Status::OK();
}

Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::Json json(srv->storage, conn->GetNamespace());

std::vector<std::optional<uint64_t>> results;

auto s = json.ArrTrim(args_[1], path_, start_, stop_, results);

if (s.IsNotFound()) {
*output = redis::NilString();
return Status::OK();
}
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::MultiLen(results.size());
for (const auto &len : results) {
if (len.has_value()) {
*output += redis::Integer(len.value());
} else {
*output += redis::NilString();
}
}

return Status::OK();
}

private:
std::string path_;
int64_t start_ = 0;
int64_t stop_ = 0;
};

class CommanderJsonArrIndex : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
if (args.size() > 6) {
return {Status::RedisExecErr, errWrongNumOfArguments};
}
start_ = 0;
end_ = std::numeric_limits<ssize_t>::max();

if (args.size() > 4) {
start_ = GET_OR_RET(ParseInt<ssize_t>(args[4], 10));
}
if (args.size() > 5) {
end_ = GET_OR_RET(ParseInt<ssize_t>(args[5], 10));
}
return Status::OK();
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
redis::Json json(svr->storage, conn->GetNamespace());

std::vector<ssize_t> result;

auto s = json.ArrIndex(args_[1], args_[2], args_[3], start_, end_, &result);

if (s.IsNotFound()) {
*output = redis::NilString();
return Status::OK();
}

if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::MultiLen(result.size());
for (const auto &found_index : result) {
if (found_index == NOT_ARRAY) {
*output += redis::NilString();
continue;
}
*output += redis::Integer(found_index);
}
return Status::OK();
}

private:
ssize_t start_;
ssize_t end_;
};

REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandJsonSet>("json.set", 4, "write", 1, 1, 1),
MakeCmdAttr<CommandJsonGet>("json.get", -2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandJsonInfo>("json.info", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandJsonType>("json.type", -2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandJsonArrAppend>("json.arrappend", -4, "write", 1, 1, 1),
MakeCmdAttr<CommandJsonArrInsert>("json.arrinsert", -5, "write", 1, 1, 1),
MakeCmdAttr<CommandJsonArrTrim>("json.arrtrim", 5, "write", 1, 1, 1),
MakeCmdAttr<CommandJsonClear>("json.clear", -2, "write", 1, 1, 1),
MakeCmdAttr<CommandJsonToggle>("json.toggle", -2, "write", 1, 1, 1),
MakeCmdAttr<CommandJsonArrLen>("json.arrlen", -2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandJsonMerge>("json.merge", 4, "write", 1, 1, 1),
MakeCmdAttr<CommandJsonObjkeys>("json.objkeys", -2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandJsonArrPop>("json.arrpop", -2, "write", 1, 1, 1), );
MakeCmdAttr<CommandJsonArrPop>("json.arrpop", -2, "write", 1, 1, 1),
MakeCmdAttr<CommanderJsonArrIndex>("json.arrindex", -4, "read-only", 1, 1, 1), );

} // namespace redis
2 changes: 0 additions & 2 deletions src/commands/cmd_pubsub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ void SubscribeCommandReply(std::string *output, const std::string &name, const s

class CommandSubscribe : public Commander {
public:
bool IsBlocking() const override { return true; }
Status Execute(Server *srv, Connection *conn, std::string *output) override {
for (unsigned i = 1; i < args_.size(); i++) {
conn->SubscribeChannel(args_[i]);
Expand Down Expand Up @@ -112,7 +111,6 @@ class CommandUnSubscribe : public Commander {

class CommandPSubscribe : public Commander {
public:
bool IsBlocking() const override { return true; }
Status Execute(Server *srv, Connection *conn, std::string *output) override {
for (size_t i = 1; i < args_.size(); i++) {
conn->PSubscribeChannel(args_[i]);
Expand Down
Loading

0 comments on commit 8e6ea24

Please sign in to comment.