Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support of the SORT command #2262

Merged
merged 28 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
6238f8d
feat: support for the sort command[draft]
PokIsemaine Apr 8, 2024
760bba8
fix: delete some comments
PokIsemaine Apr 8, 2024
04f549d
feat: support sort_ro
PokIsemaine Apr 12, 2024
2a77348
style: clang format
PokIsemaine Apr 12, 2024
bcf78d4
style: golangci-lint
PokIsemaine Apr 12, 2024
dd4c638
fix: sorting a set with no sort specified and TODO
PokIsemaine Apr 13, 2024
db94c8d
feat: sorting a set with no sort specified testcase
PokIsemaine Apr 13, 2024
d70aa4a
fix: TestZsetSort
PokIsemaine Apr 13, 2024
d63bab8
fix: TestZsetSort
PokIsemaine Apr 13, 2024
90d970d
refactor: move cmd_sort to cmd_key
PokIsemaine Apr 21, 2024
6ce9dfc
fix: wrong typo "unknown"
PokIsemaine Apr 21, 2024
4d51525
Merge branch 'apache:unstable' into unstable
PokIsemaine Apr 21, 2024
bbd2549
fix: SortResult
PokIsemaine Apr 21, 2024
84d4a02
docs: add doc strings for SortCompare
PokIsemaine Apr 22, 2024
cdba915
docs: fix typo
PokIsemaine Apr 22, 2024
5c8ce9e
Merge branch 'unstable' into unstable
jihuayu Apr 23, 2024
6b8880d
refactor: refactor the code based on review suggestions
PokIsemaine Apr 24, 2024
75b3e4c
fix: sort in case of get empty and add test
PokIsemaine Apr 26, 2024
b2e728b
fix: clang-tidy
PokIsemaine Apr 26, 2024
866fe85
fix: clang-tidy
PokIsemaine Apr 26, 2024
1b6d115
fix: distinguish between nil and empty string
PokIsemaine Apr 27, 2024
2c3ae39
refactor: use move_iterator
PokIsemaine May 4, 2024
85e572d
Merge branch 'unstable' into unstable
git-hulk May 4, 2024
7667559
refactor: remove move_iterator on vector<Slice>
PokIsemaine May 4, 2024
b7d7f68
fix: Return => Returns
PokIsemaine May 6, 2024
6cfa198
refactor: modify code according to reviewer suggestions
PokIsemaine May 6, 2024
a424037
Merge branch 'unstable' into unstable
mapleFU May 6, 2024
8785d45
Merge branch 'unstable' into unstable
mapleFU May 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 117 additions & 1 deletion src/commands/cmd_key.cc
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,120 @@ class CommandCopy : public Commander {
bool replace_ = false;
};

template <bool ReadOnly>
class CommandSort : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
CommandParser parser(args, 2);
while (parser.Good()) {
if (parser.EatEqICase("BY")) {
if (!sort_argument_.sortby.empty()) return {Status::InvalidArgument, "don't use multiple BY parameters"};
sort_argument_.sortby = GET_OR_RET(parser.TakeStr());
PokIsemaine marked this conversation as resolved.
Show resolved Hide resolved

if (sort_argument_.sortby.find('*') == std::string::npos) {
sort_argument_.dontsort = true;
} else {
/* TODO:
* If BY is specified with a real pattern, we can't accept it in cluster mode,
* unless we can make sure the keys formed by the pattern are in the same slot
* as the key to sort.
* If BY is specified with a real pattern, we can't accept
* it if no full ACL key access is applied for this command. */
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
}
} else if (parser.EatEqICase("LIMIT")) {
sort_argument_.offset = GET_OR_RET(parser.template TakeInt<int>());
sort_argument_.count = GET_OR_RET(parser.template TakeInt<int>());
} else if (parser.EatEqICase("GET")) {
/* TODO:
PokIsemaine marked this conversation as resolved.
Show resolved Hide resolved
* If GET is specified with a real pattern, we can't accept it in cluster mode,
* unless we can make sure the keys formed by the pattern are in the same slot
* as the key to sort. */
sort_argument_.getpatterns.push_back(GET_OR_RET(parser.TakeStr()));
} else if (parser.EatEqICase("ASC")) {
sort_argument_.desc = false;
} else if (parser.EatEqICase("DESC")) {
sort_argument_.desc = true;
} else if (parser.EatEqICase("ALPHA")) {
sort_argument_.alpha = true;
} else if (parser.EatEqICase("STORE")) {
if constexpr (ReadOnly) {
return {Status::RedisParseErr, "SORT_RO is read-only and does not support the STORE parameter"};
PragmaTwice marked this conversation as resolved.
Show resolved Hide resolved
}
sort_argument_.storekey = GET_OR_RET(parser.TakeStr());
} else {
return parser.InvalidSyntax();
}
}

return Status::OK();
}

Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::Database redis(srv->storage, conn->GetNamespace());
RedisType type = kRedisNone;
if (auto s = redis.Type(args_[1], &type); !s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

if (type != RedisType::kRedisList && type != RedisType::kRedisSet && type != RedisType::kRedisZSet) {
*output = Error("WRONGTYPE Operation against a key holding the wrong kind of value");
return Status::OK();
}

/* When sorting a set with no sort specified, we must sort the output
* so the result is consistent across scripting and replication.
*
* The other types (list, sorted set) will retain their native order
* even if no sort order is requested, so they remain stable across
* scripting and replication.
*
* TODO: support CLIENT_SCRIPT flag, (!storekey_.empty() || c->flags & CLIENT_SCRIPT)) */
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
if (sort_argument_.dontsort && type == RedisType::kRedisSet && (!sort_argument_.storekey.empty())) {
/* Force ALPHA sorting */
sort_argument_.dontsort = false;
sort_argument_.alpha = true;
sort_argument_.sortby = "";
}

std::vector<std::optional<std::string>> sorted_elems;
Database::SortResult res = Database::SortResult::DONE;

if (auto s = redis.Sort(type, args_[1], sort_argument_, &sorted_elems, &res); !s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

switch (res) {
case Database::SortResult::UNKNOWN_TYPE:
*output = redis::Error("Unknown Type");
break;
case Database::SortResult::DOUBLE_CONVERT_ERROR:
*output = redis::Error("One or more scores can't be converted into double");
break;
case Database::SortResult::LIMIT_EXCEEDED:
*output = redis::Error("The number of elements to be sorted exceeds SORT_LENGTH_LIMIT = " +
std::to_string(SORT_LENGTH_LIMIT));
break;
case Database::SortResult::DONE:
if (sort_argument_.storekey.empty()) {
std::vector<std::string> output_vec;
output_vec.reserve(sorted_elems.size());
for (const auto &elem : sorted_elems) {
output_vec.emplace_back(elem.has_value() ? redis::BulkString(elem.value()) : conn->NilString());
}
*output = redis::Array(output_vec);
} else {
*output = Integer(sorted_elems.size());
}
break;
}

return Status::OK();
}

private:
SortArgument sort_argument_;
};

REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandTTL>("ttl", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandPTTL>("pttl", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandType>("type", 2, "read-only", 1, 1, 1),
Expand All @@ -442,6 +556,8 @@ REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandTTL>("ttl", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandDel>("unlink", -2, "write no-dbsize-check", 1, -1, 1),
MakeCmdAttr<CommandRename>("rename", 3, "write", 1, 2, 1),
MakeCmdAttr<CommandRenameNX>("renamenx", 3, "write", 1, 2, 1),
MakeCmdAttr<CommandCopy>("copy", -3, "write", 1, 2, 1), )
MakeCmdAttr<CommandCopy>("copy", -3, "write", 1, 2, 1),
MakeCmdAttr<CommandSort<false>>("sort", -2, "write", 1, 1, 1),
MakeCmdAttr<CommandSort<true>>("sort_ro", -2, "read-only", 1, 1, 1))

} // namespace redis
212 changes: 212 additions & 0 deletions src/storage/redis_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@
#include "storage/redis_metadata.h"
#include "storage/storage.h"
#include "time_util.h"
#include "types/redis_hash.h"
#include "types/redis_list.h"
#include "types/redis_set.h"
#include "types/redis_string.h"
#include "types/redis_zset.h"

namespace redis {

Expand Down Expand Up @@ -768,4 +773,211 @@ rocksdb::Status Database::Copy(const std::string &key, const std::string &new_ke
return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}

std::optional<std::string> Database::lookupKeyByPattern(const std::string &pattern, const std::string &subst) {
if (pattern == "#") {
return subst;
}

auto match_pos = pattern.find('*');
if (match_pos == std::string::npos) {
return std::nullopt;
}

// hash field
std::string field;
auto arrow_pos = pattern.find("->", match_pos + 1);
if (arrow_pos != std::string::npos && arrow_pos + 2 < pattern.size()) {
field = pattern.substr(arrow_pos + 2);
}

std::string key = pattern.substr(0, match_pos + 1);
key.replace(match_pos, 1, subst);

std::string value;
RedisType type = RedisType::kRedisNone;
if (!field.empty()) {
auto hash_db = redis::Hash(storage_, namespace_);
if (auto s = hash_db.Type(key, &type); !s.ok() || type != RedisType::kRedisHash) {
git-hulk marked this conversation as resolved.
Show resolved Hide resolved
return std::nullopt;
}

if (auto s = hash_db.Get(key, field, &value); !s.ok()) {
return std::nullopt;
}
} else {
auto string_db = redis::String(storage_, namespace_);
if (auto s = string_db.Type(key, &type); !s.ok() || type != RedisType::kRedisString) {
return std::nullopt;
}
if (auto s = string_db.Get(key, &value); !s.ok()) {
return std::nullopt;
}
}
return value;
}

rocksdb::Status Database::Sort(RedisType type, const std::string &key, const SortArgument &args,
std::vector<std::optional<std::string>> *elems, SortResult *res) {
// Obtain the length of the object to sort.
const std::string ns_key = AppendNamespacePrefix(key);
Metadata metadata(type, false);
auto s = GetMetadata(GetOptions{}, {type}, ns_key, &metadata);
if (!s.ok()) return s;

if (metadata.size > SORT_LENGTH_LIMIT) {
*res = SortResult::LIMIT_EXCEEDED;
return rocksdb::Status::OK();
}
auto vectorlen = static_cast<int>(metadata.size);

// Adjust the offset and count of the limit
int offset = args.offset >= vectorlen ? 0 : std::clamp(args.offset, 0, vectorlen - 1);
int count = args.offset >= vectorlen ? 0 : std::clamp(args.count, -1, vectorlen - offset);
if (count == -1) count = vectorlen - offset;
mapleFU marked this conversation as resolved.
Show resolved Hide resolved

// Get the elements that need to be sorted
std::vector<std::string> str_vec;
if (count != 0) {
if (type == RedisType::kRedisList) {
auto list_db = redis::List(storage_, namespace_);

if (args.dontsort) {
if (args.desc) {
s = list_db.Range(key, -count - offset, -1 - offset, &str_vec);
if (!s.ok()) return s;
std::reverse(str_vec.begin(), str_vec.end());
} else {
s = list_db.Range(key, offset, offset + count - 1, &str_vec);
if (!s.ok()) return s;
}
} else {
s = list_db.Range(key, 0, -1, &str_vec);
if (!s.ok()) return s;
}
} else if (type == RedisType::kRedisSet) {
auto set_db = redis::Set(storage_, namespace_);
s = set_db.Members(key, &str_vec);
if (!s.ok()) return s;

if (args.dontsort) {
str_vec = std::vector(std::make_move_iterator(str_vec.begin() + offset),
std::make_move_iterator(str_vec.begin() + offset + count));
}
} else if (type == RedisType::kRedisZSet) {
auto zset_db = redis::ZSet(storage_, namespace_);
std::vector<MemberScore> member_scores;

if (args.dontsort) {
RangeRankSpec spec;
spec.start = offset;
spec.stop = offset + count - 1;
spec.reversed = args.desc;
s = zset_db.RangeByRank(key, spec, &member_scores, nullptr);
if (!s.ok()) return s;

for (auto &member_score : member_scores) {
str_vec.emplace_back(std::move(member_score.member));
}
} else {
s = zset_db.GetAllMemberScores(key, &member_scores);
if (!s.ok()) return s;

for (auto &member_score : member_scores) {
str_vec.emplace_back(std::move(member_score.member));
}
}
} else {
*res = SortResult::UNKNOWN_TYPE;
return s;
}
}

std::vector<RedisSortObject> sort_vec(str_vec.size());
for (size_t i = 0; i < str_vec.size(); ++i) {
sort_vec[i].obj = str_vec[i];
}

// Sort by BY, ALPHA, ASC/DESC
if (!args.dontsort) {
for (size_t i = 0; i < sort_vec.size(); ++i) {
std::string byval;
if (!args.sortby.empty()) {
auto lookup = lookupKeyByPattern(args.sortby, str_vec[i]);
if (!lookup.has_value()) continue;
byval = std::move(lookup.value());
} else {
byval = str_vec[i];
}

if (args.alpha && !args.sortby.empty()) {
sort_vec[i].v = byval;
} else if (!args.alpha && !byval.empty()) {
auto double_byval = ParseFloat<double>(byval);
if (!double_byval) {
*res = SortResult::DOUBLE_CONVERT_ERROR;
return rocksdb::Status::OK();
PokIsemaine marked this conversation as resolved.
Show resolved Hide resolved
}
sort_vec[i].v = *double_byval;
}
}

std::sort(sort_vec.begin(), sort_vec.end(), [&args](const RedisSortObject &a, const RedisSortObject &b) {
return RedisSortObject::SortCompare(a, b, args);
});

// Gets the element specified by Limit
if (offset != 0 || count != vectorlen) {
sort_vec = std::vector(std::make_move_iterator(sort_vec.begin() + offset),
std::make_move_iterator(sort_vec.begin() + offset + count));
}
}

// Perform storage
for (auto &elem : sort_vec) {
if (args.getpatterns.empty()) {
elems->emplace_back(elem.obj);
}
for (const std::string &pattern : args.getpatterns) {
std::optional<std::string> val = lookupKeyByPattern(pattern, elem.obj);
if (val.has_value()) {
elems->emplace_back(val.value());
} else {
elems->emplace_back(std::nullopt);
}
}
}

if (!args.storekey.empty()) {
std::vector<std::string> store_elems;
store_elems.reserve(elems->size());
for (const auto &e : *elems) {
store_elems.emplace_back(e.value_or(""));
}
redis::List list_db(storage_, namespace_);
s = list_db.Trim(args.storekey, -1, 0);
if (!s.ok()) return s;
uint64_t new_size = 0;
s = list_db.Push(args.storekey, std::vector<Slice>(store_elems.cbegin(), store_elems.cend()), false, &new_size);
if (!s.ok()) return s;
}

return rocksdb::Status::OK();
}

bool RedisSortObject::SortCompare(const RedisSortObject &a, const RedisSortObject &b, const SortArgument &args) {
if (!args.alpha) {
double score_a = std::get<double>(a.v);
double score_b = std::get<double>(b.v);
return !args.desc ? score_a < score_b : score_a > score_b;
} else {
if (!args.sortby.empty()) {
std::string cmp_a = std::get<std::string>(a.v);
std::string cmp_b = std::get<std::string>(b.v);
return !args.desc ? cmp_a < cmp_b : cmp_a > cmp_b;
} else {
return !args.desc ? a.obj < b.obj : a.obj > b.obj;
}
}
}

} // namespace redis
Loading
Loading