Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split Inter in ZSet::InterStore into a separate function #1726

Merged
merged 6 commits into from
Sep 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/commands/cmd_zset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1228,7 +1228,7 @@ class CommandZUnion : public Commander {
Status Execute(Server *svr, Connection *conn, std::string *output) override {
redis::ZSet zset_db(svr->storage, conn->GetNamespace());
std::vector<MemberScore> member_scores;
auto s = zset_db.Union(keys_weights_, aggregate_method_, nullptr, &member_scores);
auto s = zset_db.Union(keys_weights_, aggregate_method_, &member_scores);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
Expand Down
27 changes: 15 additions & 12 deletions src/types/redis_zset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,16 @@ rocksdb::Status ZSet::Overwrite(const Slice &user_key, const MemberScores &mscor

rocksdb::Status ZSet::InterStore(const Slice &dst, const std::vector<KeyWeight> &keys_weights,
AggregateMethod aggregate_method, uint64_t *saved_cnt) {
*saved_cnt = 0;
std::vector<MemberScore> members;
auto s = Inter(keys_weights, aggregate_method, &members);
if (!s.ok()) return s;
*saved_cnt = members.size();
return Overwrite(dst, members);
}

rocksdb::Status ZSet::Inter(const std::vector<KeyWeight> &keys_weights, AggregateMethod aggregate_method,
std::vector<MemberScore> *members) {
std::vector<std::string> lock_keys;
lock_keys.reserve(keys_weights.size());
for (const auto &key_weight : keys_weights) {
Expand All @@ -634,8 +644,6 @@ rocksdb::Status ZSet::InterStore(const Slice &dst, const std::vector<KeyWeight>
}
MultiLockGuard guard(storage_->GetLockManager(), lock_keys);

if (saved_cnt) *saved_cnt = 0;

std::map<std::string, double> dst_zset;
std::map<std::string, size_t> member_counters;
std::vector<MemberScore> target_mscores;
Expand Down Expand Up @@ -680,14 +688,12 @@ rocksdb::Status ZSet::InterStore(const Slice &dst, const std::vector<KeyWeight>
}
}
}
if (!dst_zset.empty()) {
std::vector<MemberScore> mscores;
if (members && !dst_zset.empty()) {
members->reserve(dst_zset.size());
for (const auto &iter : dst_zset) {
if (member_counters[iter.first] != keys_weights.size()) continue;
mscores.emplace_back(MemberScore{iter.first, iter.second});
members->emplace_back(MemberScore{iter.first, iter.second});
}
if (saved_cnt) *saved_cnt = mscores.size();
Overwrite(dst, mscores);
}

return rocksdb::Status::OK();
Expand All @@ -697,14 +703,14 @@ rocksdb::Status ZSet::UnionStore(const Slice &dst, const std::vector<KeyWeight>
AggregateMethod aggregate_method, uint64_t *saved_cnt) {
*saved_cnt = 0;
std::vector<MemberScore> members;
auto s = Union(keys_weights, aggregate_method, saved_cnt, &members);
auto s = Union(keys_weights, aggregate_method, &members);
if (!s.ok()) return s;
*saved_cnt = members.size();
return Overwrite(dst, members);
}

rocksdb::Status ZSet::Union(const std::vector<KeyWeight> &keys_weights, AggregateMethod aggregate_method,
uint64_t *saved_cnt, std::vector<MemberScore> *members) {
std::vector<MemberScore> *members) {
std::vector<std::string> lock_keys;
lock_keys.reserve(keys_weights.size());
for (const auto &key_weight : keys_weights) {
Expand All @@ -713,8 +719,6 @@ rocksdb::Status ZSet::Union(const std::vector<KeyWeight> &keys_weights, Aggregat
}
MultiLockGuard guard(storage_->GetLockManager(), lock_keys);

if (saved_cnt) *saved_cnt = 0;

std::map<std::string, double> dst_zset;
std::vector<MemberScore> target_mscores;
uint64_t target_size = 0;
Expand Down Expand Up @@ -753,7 +757,6 @@ rocksdb::Status ZSet::Union(const std::vector<KeyWeight> &keys_weights, Aggregat
for (const auto &iter : dst_zset) {
members->emplace_back(MemberScore{iter.first, iter.second});
}
if (saved_cnt) *saved_cnt = members->size();
}
return rocksdb::Status::OK();
}
Expand Down
4 changes: 3 additions & 1 deletion src/types/redis_zset.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,12 @@ class ZSet : public SubKeyScanner {
rocksdb::Status Overwrite(const Slice &user_key, const MemberScores &mscores);
rocksdb::Status InterStore(const Slice &dst, const std::vector<KeyWeight> &keys_weights,
AggregateMethod aggregate_method, uint64_t *saved_cnt);
rocksdb::Status Inter(const std::vector<KeyWeight> &keys_weights, AggregateMethod aggregate_method,
std::vector<MemberScore> *members);
rocksdb::Status UnionStore(const Slice &dst, const std::vector<KeyWeight> &keys_weights,
AggregateMethod aggregate_method, uint64_t *saved_cnt);
rocksdb::Status Union(const std::vector<KeyWeight> &keys_weights, AggregateMethod aggregate_method,
uint64_t *saved_cnt, std::vector<MemberScore> *members);
std::vector<MemberScore> *members);
rocksdb::Status MGet(const Slice &user_key, const std::vector<Slice> &members, std::map<std::string, double> *scores);
rocksdb::Status GetMetadata(const Slice &ns_key, ZSetMetadata *metadata);

Expand Down