Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support of the SORT command #2262

Merged
merged 28 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
6238f8d
feat: support for the sort command[draft]
PokIsemaine Apr 8, 2024
760bba8
fix: delete some comments
PokIsemaine Apr 8, 2024
04f549d
feat: support sort_ro
PokIsemaine Apr 12, 2024
2a77348
style: clang format
PokIsemaine Apr 12, 2024
bcf78d4
style: golangci-lint
PokIsemaine Apr 12, 2024
dd4c638
fix: sorting a set with no sort specified and TODO
PokIsemaine Apr 13, 2024
db94c8d
feat: sorting a set with no sort specified testcase
PokIsemaine Apr 13, 2024
d70aa4a
fix: TestZsetSort
PokIsemaine Apr 13, 2024
d63bab8
fix: TestZsetSort
PokIsemaine Apr 13, 2024
90d970d
refactor: move cmd_sort to cmd_key
PokIsemaine Apr 21, 2024
6ce9dfc
fix: wrong typo "unknown"
PokIsemaine Apr 21, 2024
4d51525
Merge branch 'apache:unstable' into unstable
PokIsemaine Apr 21, 2024
bbd2549
fix: SortResult
PokIsemaine Apr 21, 2024
84d4a02
docs: add doc strings for SortCompare
PokIsemaine Apr 22, 2024
cdba915
docs: fix typo
PokIsemaine Apr 22, 2024
5c8ce9e
Merge branch 'unstable' into unstable
jihuayu Apr 23, 2024
6b8880d
refactor: refactor the code based on review suggestions
PokIsemaine Apr 24, 2024
75b3e4c
fix: sort in case of get empty and add test
PokIsemaine Apr 26, 2024
b2e728b
fix: clang-tidy
PokIsemaine Apr 26, 2024
866fe85
fix: clang-tidy
PokIsemaine Apr 26, 2024
1b6d115
fix: distinguish between nil and empty string
PokIsemaine Apr 27, 2024
2c3ae39
refactor: use move_iterator
PokIsemaine May 4, 2024
85e572d
Merge branch 'unstable' into unstable
git-hulk May 4, 2024
7667559
refactor: remove move_iterator on vector<Slice>
PokIsemaine May 4, 2024
b7d7f68
fix: Return => Returns
PokIsemaine May 6, 2024
6cfa198
refactor: modify code according to reviewer suggestions
PokIsemaine May 6, 2024
a424037
Merge branch 'unstable' into unstable
mapleFU May 6, 2024
8785d45
Merge branch 'unstable' into unstable
mapleFU May 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 108 additions & 1 deletion src/commands/cmd_key.cc
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,111 @@ class CommandCopy : public Commander {
bool replace_ = false;
};

/// Implements the SORT command (ReadOnly = false) and its SORT_RO variant (ReadOnly = true).
///
/// Accepted options after the key: BY pattern, LIMIT offset count, GET pattern (repeatable),
/// ASC, DESC, ALPHA and STORE destination. STORE is rejected as a syntax error when ReadOnly is true.
template <bool ReadOnly>
class CommandSort : public Commander {
 public:
  /// Parses the option list that follows the key into sort_argument_.
  /// Returns a syntax error for an unknown option or an option that lacks its operand(s).
  Status Parse(const std::vector<std::string> &args) override {
    CommandParser parser(args, 2);
    while (parser.Good()) {
      if (parser.EatEqICase("BY")) {
        if (parser.Remains() < 1) {
          return parser.InvalidSyntax();
        }
        sort_argument_.sortby = GET_OR_RET(parser.TakeStr());

        // A BY operand without '*' cannot be expanded per element, so it is treated as "do not sort".
        if (sort_argument_.sortby.find('*') == std::string::npos) {
          sort_argument_.dontsort = true;
        }
        /* TODO:
         * If BY is specified with a real pattern, we can't accept it in cluster mode,
         * unless we can make sure the keys formed by the pattern are in the same slot
         * as the key to sort.
         * If BY is specified with a real pattern, we can't accept
         * it if no full ACL key access is applied for this command. */
      } else if (parser.EatEqICase("LIMIT")) {
        if (parser.Remains() < 2) {
          return parser.InvalidSyntax();
        }
        sort_argument_.offset = GET_OR_RET(parser.template TakeInt<int>());
        sort_argument_.count = GET_OR_RET(parser.template TakeInt<int>());
      } else if (parser.EatEqICase("GET")) {
        // Same shape as the other operand-taking options: a trailing GET is a syntax error.
        if (parser.Remains() < 1) {
          return parser.InvalidSyntax();
        }
        /* TODO:
         * If GET is specified with a real pattern, we can't accept it in cluster mode,
         * unless we can make sure the keys formed by the pattern are in the same slot
         * as the key to sort. */
        sort_argument_.getpatterns.push_back(GET_OR_RET(parser.TakeStr()));
      } else if (parser.EatEqICase("ASC")) {
        sort_argument_.desc = false;
      } else if (parser.EatEqICase("DESC")) {
        sort_argument_.desc = true;
      } else if (parser.EatEqICase("ALPHA")) {
        sort_argument_.alpha = true;
      } else if (parser.EatEqICase("STORE")) {
        // SORT_RO never writes, so STORE is not part of its grammar.
        if constexpr (ReadOnly) {
          return parser.InvalidSyntax();
        }
        if (parser.Remains() < 1) {
          return parser.InvalidSyntax();
        }
        sort_argument_.storekey = GET_OR_RET(parser.TakeStr());
      } else {
        return parser.InvalidSyntax();
      }
    }

    return Status::OK();
  }

  /// Looks up the type of the key, delegates the work to Database::Sort and formats the reply:
  /// an array of bulk strings, or the number of produced elements when STORE was given.
  Status Execute(Server *srv, Connection *conn, std::string *output) override {
    redis::Database redis(srv->storage, conn->GetNamespace());
    RedisType type = kRedisNone;
    auto s = redis.Type(args_[1], &type);
    if (!s.ok()) {
      return {Status::RedisExecErr, s.ToString()};
    }

    if (type >= RedisTypeNames.size()) {
      return {Status::RedisExecErr, "Invalid type"};
    }
    // NOTE(review): if Type() reports kRedisNone for a missing key, that case also ends up here,
    // whereas Redis answers SORT on a missing key with an empty array - confirm this is intended.
    if (type != RedisType::kRedisList && type != RedisType::kRedisSet && type != RedisType::kRedisZSet) {
      *output = Error("WRONGTYPE Operation against a key holding the wrong kind of value");
      return Status::OK();
    }

    std::vector<std::string> output_vec;
    Database::SortResult res = Database::SortResult::DONE;
    s = redis.Sort(type, args_[1], sort_argument_, conn->GetProtocolVersion(), &output_vec, &res);
    if (!s.ok()) {
      return {Status::RedisExecErr, s.ToString()};
    }

    switch (res) {
      case Database::SortResult::UNKNOWN_TYPE:
        *output = redis::Error("Unknown Type");
        break;
      case Database::SortResult::DOUBLE_CONVERT_ERROR:
        *output = redis::Error("One or more scores can't be converted into double");
        break;
      case Database::SortResult::DONE:
        if (sort_argument_.storekey.empty()) {
          *output = ArrayOfBulkStrings(output_vec);
        } else {
          // With STORE the client only gets the number of elements that were produced.
          *output = Integer(output_vec.size());
        }
        break;
    }

    return Status::OK();
  }

 private:
  SortArgument sort_argument_;  // options collected by Parse(), consumed by Execute()
};

REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandTTL>("ttl", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandPTTL>("pttl", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandType>("type", 2, "read-only", 1, 1, 1),
Expand All @@ -442,6 +547,8 @@ REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandTTL>("ttl", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandDel>("unlink", -2, "write no-dbsize-check", 1, -1, 1),
MakeCmdAttr<CommandRename>("rename", 3, "write", 1, 2, 1),
MakeCmdAttr<CommandRenameNX>("renamenx", 3, "write", 1, 2, 1),
MakeCmdAttr<CommandCopy>("copy", -3, "write", 1, 2, 1), )
MakeCmdAttr<CommandCopy>("copy", -3, "write", 1, 2, 1),
MakeCmdAttr<CommandSort<false>>("sort", -2, "write", 1, 1, 1),
MakeCmdAttr<CommandSort<true>>("sort_ro", -2, "read-only", 1, 1, 1))

} // namespace redis
209 changes: 209 additions & 0 deletions src/storage/redis_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@
#include "storage/redis_metadata.h"
#include "storage/storage.h"
#include "time_util.h"
#include "types/redis_hash.h"
#include "types/redis_list.h"
#include "types/redis_set.h"
#include "types/redis_string.h"
#include "types/redis_zset.h"

namespace redis {

Expand Down Expand Up @@ -777,4 +782,208 @@ rocksdb::Status Database::Copy(const std::string &key, const std::string &new_ke
return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}

/// Resolves a BY/GET pattern for one element of the collection being sorted.
///
/// - "#" yields the element itself.
/// - Otherwise the first '*' in the pattern is replaced by `subst` to form a key name. Text that
///   follows the '*' is kept as part of the key ("w_*_x" with subst "1" gives "w_1_x").
/// - If a non-empty "->field" suffix follows the '*', the suffix is not part of the key and the value
///   is read from that hash field; otherwise the value is read as a string.
///
/// @param pattern the BY or GET pattern
/// @param subst   the element substituted for '*'
/// @return the value found, or an empty string when the pattern has no '*' or the lookup fails
///         (a missing value and a stored empty string are therefore indistinguishable to callers)
std::string Database::lookupKeyByPattern(const std::string &pattern, const std::string &subst) {
  if (pattern == "#") {
    return subst;
  }

  const auto match_pos = pattern.find('*');
  if (match_pos == std::string::npos) {
    return "";
  }

  // hash field: only a non-empty suffix after "->" counts; a trailing "->" stays part of the key
  std::string field;
  auto key_end = pattern.size();
  const auto arrow_pos = pattern.find("->", match_pos + 1);
  if (arrow_pos != std::string::npos && arrow_pos + 2 < pattern.size()) {
    field = pattern.substr(arrow_pos + 2);
    key_end = arrow_pos;
  }

  // key = <text before '*'> + subst + <text after '*' up to the field separator>
  std::string key;
  key.reserve(match_pos + subst.size() + (key_end - match_pos - 1));
  key.append(pattern, 0, match_pos);
  key.append(subst);
  key.append(pattern, match_pos + 1, key_end - match_pos - 1);

  std::string value;
  RedisType type = RedisType::kRedisNone;
  if (!field.empty()) {
    auto hash_db = redis::Hash(storage_, namespace_);
    if (auto s = hash_db.Type(key, &type); !s.ok() || type >= RedisTypeNames.size()) {
      return "";
    }
    // A failed read must not leak a partially written value.
    if (auto s = hash_db.Get(key, field, &value); !s.ok()) {
      return "";
    }
  } else {
    auto string_db = redis::String(storage_, namespace_);
    if (auto s = string_db.Type(key, &type); !s.ok() || type >= RedisTypeNames.size()) {
      return "";
    }
    if (auto s = string_db.Get(key, &value); !s.ok()) {
      return "";
    }
  }
  return value;
}

/// Sorts the elements of a list, set or sorted set according to `args` (the SORT command).
///
/// @param type       type of `key`; list, set and zset are handled, any other type yields UNKNOWN_TYPE
/// @param key        user key of the collection to sort
/// @param args       parsed SORT options; modified when a set is stored without an explicit order
///                   (ALPHA ordering is forced in that case)
/// @param version    RESP version used to encode the nil placeholder of an unresolved GET pattern
/// @param output_vec receives the produced elements (appended)
/// @param res        DONE on success, UNKNOWN_TYPE or DOUBLE_CONVERT_ERROR otherwise
/// @return the first failing storage status, OK otherwise (also OK when `res` reports a sort error)
rocksdb::Status Database::Sort(const RedisType &type, const std::string &key, SortArgument &args, const RESP &version,
                               std::vector<std::string> *output_vec, SortResult *res) {
  *res = SortResult::DONE;

  /* When sorting a set with no sort specified, we must sort the output
   * so the result is consistent across scripting and replication.
   *
   * The other types (list, sorted set) will retain their native order
   * even if no sort order is requested, so they remain stable across
   * scripting and replication.
   *
   * TODO: support CLIENT_SCRIPT flag, (!storekey_.empty() || c->flags & CLIENT_SCRIPT)) */
  if (args.dontsort && type == RedisType::kRedisSet && !args.storekey.empty()) {
    /* Force ALPHA sorting */
    args.dontsort = false;
    args.alpha = true;
    args.sortby = "";
  }

  // Obtain the length of the object to sort.
  const std::string ns_key = AppendNamespacePrefix(key);
  Metadata metadata(type, false);
  auto s = GetMetadata(GetOptions{}, {type}, ns_key, &metadata);
  if (!s.ok()) {
    return s;
  }
  const int vectorlen = static_cast<int>(metadata.size);

  // Adjust the offset and count of the limit. An offset at or past the end selects nothing, a
  // negative count selects everything from the offset on. The vectorlen guard keeps std::clamp
  // from being called with hi < lo on an empty collection.
  int offset = 0;
  int count = 0;
  if (vectorlen > 0 && args.offset < vectorlen) {
    offset = std::clamp(args.offset, 0, vectorlen - 1);
    count = std::clamp(args.count, -1, vectorlen - offset);
    if (count == -1) count = vectorlen - offset;
  }

  // Keeps only [offset, offset + count) of `vec`, clamped to the elements that are really there:
  // the collection may hold fewer elements than the metadata read above announced.
  auto keep_window = [offset, count](auto &vec) {
    const int available = static_cast<int>(vec.size());
    const int first = std::min(offset, available);
    const int last = std::min(offset + count, available);
    vec.erase(vec.begin() + last, vec.end());
    vec.erase(vec.begin(), vec.begin() + first);
  };

  // Get the elements that need to be sorted. Without sorting only the requested window is needed.
  std::vector<std::string> str_vec;
  if (count != 0) {
    if (type == RedisType::kRedisList) {
      auto list_db = redis::List(storage_, namespace_);

      if (!args.dontsort) {
        s = list_db.Range(key, 0, -1, &str_vec);
        if (!s.ok()) return s;
      } else if (args.desc) {
        // Take the window counted from the tail, then flip it.
        s = list_db.Range(key, -count - offset, -1 - offset, &str_vec);
        if (!s.ok()) return s;
        std::reverse(str_vec.begin(), str_vec.end());
      } else {
        s = list_db.Range(key, offset, offset + count - 1, &str_vec);
        if (!s.ok()) return s;
      }
    } else if (type == RedisType::kRedisSet) {
      auto set_db = redis::Set(storage_, namespace_);
      s = set_db.Members(key, &str_vec);
      if (!s.ok()) return s;

      if (args.dontsort) {
        keep_window(str_vec);
      }
    } else if (type == RedisType::kRedisZSet) {
      auto zset_db = redis::ZSet(storage_, namespace_);
      std::vector<MemberScore> member_scores;

      if (args.dontsort) {
        RangeRankSpec spec;
        spec.start = offset;
        spec.stop = offset + count - 1;
        spec.reversed = args.desc;
        s = zset_db.RangeByRank(key, spec, &member_scores, nullptr);
      } else {
        s = zset_db.GetAllMemberScores(key, &member_scores);
      }
      if (!s.ok()) return s;

      str_vec.reserve(member_scores.size());
      for (auto &member_score : member_scores) {
        str_vec.emplace_back(std::move(member_score.member));
      }
    } else {
      *res = SortResult::UNKNOWN_TYPE;
      return s;
    }
  }

  std::vector<RedisSortObject> sort_vec(str_vec.size());
  for (size_t i = 0; i < str_vec.size(); ++i) {
    sort_vec[i].obj = std::move(str_vec[i]);
  }

  // Sort by BY, ALPHA, ASC/DESC
  if (!args.dontsort) {
    const bool by_pattern = !args.sortby.empty();
    for (auto &item : sort_vec) {
      if (args.alpha) {
        // Without BY the comparator orders by `obj` and never reads `v`. With BY the weight is
        // always stored as a string, even when the lookup found nothing, so the comparator can
        // rely on the alternative held by `v`.
        if (by_pattern) {
          item.v = lookupKeyByPattern(args.sortby, item.obj);
        }
        continue;
      }

      const std::string weight = by_pattern ? lookupKeyByPattern(args.sortby, item.obj) : item.obj;
      // A missing or empty weight keeps the default score of 0.
      if (weight.empty()) continue;

      try {
        size_t parsed = 0;
        const double score = std::stod(weight, &parsed);
        // Reject trailing garbage ("12abc") and NaN (the only value that differs from itself);
        // NaN would violate the strict weak ordering std::sort requires.
        if (parsed != weight.size() || score != score) {
          *res = SortResult::DOUBLE_CONVERT_ERROR;
          return rocksdb::Status::OK();
        }
        item.v = score;
      } catch (const std::exception &) {
        *res = SortResult::DOUBLE_CONVERT_ERROR;
        return rocksdb::Status::OK();
      }
    }

    std::sort(sort_vec.begin(), sort_vec.end(),
              [&args](const RedisSortObject &a, const RedisSortObject &b) { return SortCompare(a, b, args); });

    // Gets the element specified by Limit
    keep_window(sort_vec);
  }

  // Get the output: either the elements themselves or, per element, one value for every GET pattern.
  const bool storing = !args.storekey.empty();
  for (auto &elem : sort_vec) {
    if (args.getpatterns.empty()) {
      output_vec->emplace_back(std::move(elem.obj));
      continue;
    }
    for (const std::string &pattern : args.getpatterns) {
      std::string val = lookupKeyByPattern(pattern, elem.obj);
      if (!val.empty()) {
        output_vec->emplace_back(std::move(val));
      } else if (storing) {
        // A stored list holds plain values: keep an empty string, not a wire-encoded nil.
        output_vec->emplace_back();
      } else {
        output_vec->emplace_back(redis::NilString(version));
      }
    }
  }

  // Perform storage: the destination is replaced by the result, not appended to.
  if (storing) {
    redis::List list_db(storage_, namespace_);
    // A range with start > stop is empty, so this trim drops whatever list is stored there.
    s = list_db.Trim(args.storekey, 1, 0);
    if (!s.ok()) return s;

    if (!output_vec->empty()) {
      std::vector<Slice> elems(output_vec->begin(), output_vec->end());
      uint64_t new_size = 0;
      s = list_db.Push(args.storekey, elems, false, &new_size);
      if (!s.ok()) return s;
    }
  }

  return rocksdb::Status::OK();
}

/// Strict "comes before" predicate used to order the elements of a SORT command.
///
/// - Without ALPHA the numeric weights held in `v` are compared.
/// - With ALPHA and a BY pattern the string weights held in `v` are compared.
/// - With ALPHA and no BY pattern the elements themselves (`obj`) are compared.
/// `args.desc` reverses the order in all three cases.
///
/// An object whose `v` does not hold the expected alternative is treated as weight 0 or as the
/// empty string, so a weight that was never assigned cannot raise std::bad_variant_access from
/// inside std::sort.
bool SortCompare(const RedisSortObject &a, const RedisSortObject &b, const SortArgument &args) {
  if (!args.alpha) {
    const double *num_a = std::get_if<double>(&a.v);
    const double *num_b = std::get_if<double>(&b.v);
    const double score_a = num_a != nullptr ? *num_a : 0.0;
    const double score_b = num_b != nullptr ? *num_b : 0.0;
    return args.desc ? score_a > score_b : score_a < score_b;
  }

  if (args.sortby.empty()) {
    return args.desc ? a.obj > b.obj : a.obj < b.obj;
  }

  // Compare through references: the comparator runs O(n log n) times and must not copy strings.
  static const std::string missing_weight;
  const std::string *str_a = std::get_if<std::string>(&a.v);
  const std::string *str_b = std::get_if<std::string>(&b.v);
  const std::string &cmp_a = str_a != nullptr ? *str_a : missing_weight;
  const std::string &cmp_b = str_b != nullptr ? *str_b : missing_weight;
  return args.desc ? cmp_a > cmp_b : cmp_a < cmp_b;
}

} // namespace redis
25 changes: 25 additions & 0 deletions src/storage/redis_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,33 @@
#include <map>
#include <string>
#include <utility>
#include <variant>
#include <vector>

#include "redis_metadata.h"
#include "server/redis_reply.h"
#include "storage.h"

namespace redis {

/// Options of a SORT / SORT_RO invocation, one member per command option.
struct SortArgument {
  std::string sortby{};                    // BY: pattern used to look up the weight of each element
  bool dontsort{false};                    // set when the BY operand contains no '*': skip sorting
  int offset{0};                           // LIMIT offset: index of the first element to return
  int count{-1};                           // LIMIT count: number of elements, -1 means "all"
  std::vector<std::string> getpatterns{};  // GET: patterns resolved for every returned element
  bool desc{false};                        // DESC (true) or ASC (false)
  bool alpha{false};                       // ALPHA: compare as strings instead of as numbers
  std::string storekey{};                  // STORE: destination key, empty when the result is returned
};

/// One element taking part in a SORT together with the weight it is ordered by.
struct RedisSortObject {
  std::string obj{};                      // the element as stored in the list, set or sorted set
  std::variant<double, std::string> v{};  // sort weight: a number, or a string for ALPHA with BY; starts as 0.0
};

bool SortCompare(const RedisSortObject &a, const RedisSortObject &b, const SortArgument &args);
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
PokIsemaine marked this conversation as resolved.
Show resolved Hide resolved

/// Database is a wrapper of underlying storage engine, it provides
/// some common operations for redis commands.
class Database {
Expand Down Expand Up @@ -107,6 +127,9 @@ class Database {
enum class CopyResult { KEY_NOT_EXIST, KEY_ALREADY_EXIST, DONE };
[[nodiscard]] rocksdb::Status Copy(const std::string &key, const std::string &new_key, bool nx, bool delete_old,
CopyResult *res);
enum class SortResult { UNKNOWN_TYPE, DOUBLE_CONVERT_ERROR, DONE };
[[nodiscard]] rocksdb::Status Sort(const RedisType &type, const std::string &key, SortArgument &args,
const RESP &version, std::vector<std::string> *output_vec, SortResult *res);

protected:
engine::Storage *storage_;
Expand All @@ -119,6 +142,8 @@ class Database {
// Already internal keys
[[nodiscard]] rocksdb::Status existsInternal(const std::vector<std::string> &keys, int *ret);
[[nodiscard]] rocksdb::Status typeInternal(const Slice &key, RedisType *type);
// Sort helper
std::string lookupKeyByPattern(const std::string &pattern, const std::string &subst);
};
class LatestSnapShot {
public:
Expand Down
Loading
Loading