Skip to content

Commit

Permalink
compactkey for spop timely (OpenAtomFoundation#96)
Browse files Browse the repository at this point in the history
  • Loading branch information
Axlgrep authored Jul 24, 2018
1 parent e7b29f1 commit 0c79d77
Show file tree
Hide file tree
Showing 10 changed files with 120 additions and 14 deletions.
13 changes: 9 additions & 4 deletions include/blackwidow/blackwidow.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,17 @@ enum Operation {
kCleanZSets,
kCleanSets,
kCleanLists,
kCompactKey
};

struct BGTask {
DataType type;
Operation operation;
std::string argv;

BGTask() : type(DataType::kAll) {}
BGTask(const DataType _type) : type(_type) {}
BGTask(const DataType& _type = DataType::kAll,
const Operation& _opeation = Operation::kNone,
const std::string& _argv = "") : type(_type), operation(_opeation), argv(_argv) {}
};

class BlackWidow {
Expand Down Expand Up @@ -971,8 +975,9 @@ class BlackWidow {
Status RunBGTask();
Status AddBGTask(const BGTask& bg_task);

Status Compact(DataType type, bool sync = false);
Status DoCompact(DataType type);
Status Compact(const DataType& type, bool sync = false);
Status DoCompact(const DataType& type);
Status CompactKey(const DataType& type, const std::string& key);

std::string GetCurrentTaskType();
Status GetUsage(const std::string& type, uint64_t *result);
Expand Down
1 change: 1 addition & 0 deletions include/blackwidow/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ namespace blackwidow {
int mkpath(const char *path, mode_t mode);
int delete_dir(const char* dirname);
int is_dir(const char* filename);
int CalculateStartAndEndKey(const std::string& key, std::string* start_key, std::string* end_key);
}

#endif // SRC_UTIL_H_
33 changes: 28 additions & 5 deletions src/blackwidow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,12 @@ Status BlackWidow::SMove(const Slice& source, const Slice& destination,
}

Status BlackWidow::SPop(const Slice& key, std::string* member) {
return sets_db_->SPop(key, member);
bool need_compact = false;
Status status = sets_db_->SPop(key, member, &need_compact);
if (need_compact) {
AddBGTask({kSets, kCompactKey, key.ToString()});
}
return status;
}

Status BlackWidow::SRandmember(const Slice& key, int32_t count,
Expand Down Expand Up @@ -1280,21 +1285,26 @@ Status BlackWidow::RunBGTask() {
if (bg_tasks_should_exit_) {
return Status::Incomplete("bgtask return with bg_tasks_should_exit_ true");
}
DoCompact(task.type);

if (task.operation == kCleanAll) {
DoCompact(task.type);
} else if (task.operation == kCompactKey) {
CompactKey(task.type, task.argv);
}
}
return Status::OK();
}

Status BlackWidow::Compact(DataType type, bool sync) {
Status BlackWidow::Compact(const DataType& type, bool sync) {
if (sync) {
return DoCompact(type);
} else {
AddBGTask({type});
AddBGTask({type, kCleanAll});
}
return Status::OK();
}

Status BlackWidow::DoCompact(DataType type) {
Status BlackWidow::DoCompact(const DataType& type) {
if (type != kAll
&& type != kStrings
&& type != kHashes
Expand Down Expand Up @@ -1332,6 +1342,19 @@ Status BlackWidow::DoCompact(DataType type) {
return s;
}

Status BlackWidow::CompactKey(const DataType& type, const std::string& key) {

Status s;
std::string start_key, end_key;
CalculateStartAndEndKey(key, &start_key, &end_key);
if (type == kSets) {
Slice slice_begin(start_key);
Slice slice_end(end_key);
s = sets_db_->CompactRange(&slice_begin, &slice_end);
}
return s;
}

std::string BlackWidow::GetCurrentTaskType() {
int type = current_task_type_;
switch (type) {
Expand Down
2 changes: 2 additions & 0 deletions src/redis_hashes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,7 @@ Status RedisHashes::HScan(const Slice& key, int64_t cursor, const std::string& p
}

Status RedisHashes::GetHScanStartField(const Slice& key, const Slice& pattern, int64_t cursor, std::string* start_field) {
slash::MutexLock l(&hscan_cursors_mutex_);
std::string index_key = key.ToString() + "_" + pattern.ToString() + "_" + std::to_string(cursor);
if (hscan_cursors_store_.map_.find(index_key) == hscan_cursors_store_.map_.end()) {
return Status::NotFound();
Expand All @@ -764,6 +765,7 @@ Status RedisHashes::GetHScanStartField(const Slice& key, const Slice& pattern, i
}

Status RedisHashes::StoreHScanNextField(const Slice& key, const Slice& pattern, int64_t cursor, const std::string& next_field) {
slash::MutexLock l(&hscan_cursors_mutex_);
std::string index_key = key.ToString() + "_" + pattern.ToString() + "_" + std::to_string(cursor);
if (hscan_cursors_store_.list_.size() > hscan_cursors_store_.max_size_) {
std::string tail = hscan_cursors_store_.list_.back();
Expand Down
3 changes: 2 additions & 1 deletion src/redis_hashes.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ class RedisHashes : public Redis {
private:
std::vector<rocksdb::ColumnFamilyHandle*> handles_;

// For HScan
slash::Mutex hscan_cursors_mutex_;
BlackWidow::LRU<std::string, std::string> hscan_cursors_store_;
std::shared_ptr<Mutex> sscan_cursors_mutex_;

Status GetHScanStartField(const Slice& key, const Slice& pattern, int64_t cursor, std::string* start_field);
Status StoreHScanNextField(const Slice& key, const Slice& pattern, int64_t cursor, const std::string& next_field);
Expand Down
44 changes: 43 additions & 1 deletion src/redis_sets.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
namespace blackwidow {

RedisSets::RedisSets() {
spop_counts_store_.max_size_ = 1000;
sscan_cursors_store_.max_size_ = 5000;
}

Expand Down Expand Up @@ -727,14 +728,15 @@ Status RedisSets::SMove(const Slice& source, const Slice& destination,
return db_->Write(default_write_options_, &batch);
}

Status RedisSets::SPop(const Slice& key, std::string* member) {
Status RedisSets::SPop(const Slice& key, std::string* member, bool* need_compact) {

std::default_random_engine engine;

std::string meta_value;
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

uint64_t start_us = slash::NowMicros();
Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedSetsMetaValue parsed_sets_meta_value(&meta_value);
Expand Down Expand Up @@ -770,9 +772,47 @@ Status RedisSets::SPop(const Slice& key, std::string* member) {
} else {
return s;
}

uint64_t count = 0;
uint64_t duration = slash::NowMicros() - start_us;
AddAndGetSpopCount(key.ToString(), &count);
if (duration >= SPOP_COMPACT_THRESHOLD_DURATION
|| count >= SPOP_COMPACT_THRESHOLD_COUNT) {
*need_compact = true;
ResetSpopCount(key.ToString());
}
return db_->Write(default_write_options_, &batch);
}

Status RedisSets::ResetSpopCount(const std::string& key) {
slash::MutexLock l(&spop_counts_mutex_);
if (spop_counts_store_.map_.find(key) == spop_counts_store_.map_.end()) {
return Status::NotFound();
}
spop_counts_store_.map_.erase(key);
spop_counts_store_.list_.remove(key);
return Status::OK();
}

Status RedisSets::AddAndGetSpopCount(const std::string& key, uint64_t* count) {
slash::MutexLock l(&spop_counts_mutex_);
if (spop_counts_store_.map_.find(key) == spop_counts_store_.map_.end()) {
*count = ++spop_counts_store_.map_[key];
spop_counts_store_.list_.push_front(key);
} else {
*count = ++spop_counts_store_.map_[key];
spop_counts_store_.list_.remove(key);
spop_counts_store_.list_.push_front(key);
}

if (spop_counts_store_.list_.size() > spop_counts_store_.max_size_) {
std::string tail = sscan_cursors_store_.list_.back();
spop_counts_store_.map_.erase(tail);
spop_counts_store_.list_.pop_back();
}
return Status::OK();
}

Status RedisSets::SRandmember(const Slice& key, int32_t count,
std::vector<std::string>* members) {
if (count == 0) {
Expand Down Expand Up @@ -1072,6 +1112,7 @@ Status RedisSets::SScan(const Slice& key, int64_t cursor, const std::string& pat
}

Status RedisSets::GetSScanStartMember(const Slice& key, const Slice& pattern, int64_t cursor, std::string* start_member) {
slash::MutexLock l(&sscan_cursors_mutex_);
std::string index_key = key.ToString() + "_" + pattern.ToString() + "_" + std::to_string(cursor);
if (sscan_cursors_store_.map_.find(index_key) == sscan_cursors_store_.map_.end()) {
return Status::NotFound();
Expand All @@ -1082,6 +1123,7 @@ Status RedisSets::GetSScanStartMember(const Slice& key, const Slice& pattern, in
}

Status RedisSets::StoreSScanNextMember(const Slice& key, const Slice& pattern, int64_t cursor, const std::string& next_member) {
slash::MutexLock l(&sscan_cursors_mutex_);
std::string index_key = key.ToString() + "_" + pattern.ToString() + "_" + std::to_string(cursor);
if (sscan_cursors_store_.list_.size() > sscan_cursors_store_.max_size_) {
std::string tail = sscan_cursors_store_.list_.back();
Expand Down
17 changes: 15 additions & 2 deletions src/redis_sets.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,15 @@
#include <vector>
#include <unordered_set>

#include "slash/include/env.h"

#include "src/redis.h"
#include "src/custom_comparator.h"
#include "blackwidow/blackwidow.h"

#define SPOP_COMPACT_THRESHOLD_COUNT 500
#define SPOP_COMPACT_THRESHOLD_DURATION 1000 // 1000us

namespace blackwidow {

class RedisSets : public Redis {
Expand Down Expand Up @@ -51,7 +56,7 @@ class RedisSets : public Redis {
std::vector<std::string>* members);
Status SMove(const Slice& source, const Slice& destination,
const Slice& member, int32_t* ret);
Status SPop(const Slice& key, std::string* member);
Status SPop(const Slice& key, std::string* member, bool* need_compact);
Status SRandmember(const Slice& key, int32_t count,
std::vector<std::string>* members);
Status SRem(const Slice& key, const std::vector<std::string>& members,
Expand Down Expand Up @@ -80,11 +85,19 @@ class RedisSets : public Redis {
private:
std::vector<rocksdb::ColumnFamilyHandle*> handles_;

// For compact in time after multiple spop
slash::Mutex spop_counts_mutex_;
BlackWidow::LRU<std::string, uint64_t> spop_counts_store_;
Status ResetSpopCount(const std::string& key);
Status AddAndGetSpopCount(const std::string& key, uint64_t* count);

// For SScan
slash::Mutex sscan_cursors_mutex_;
BlackWidow::LRU<std::string, std::string> sscan_cursors_store_;
std::shared_ptr<Mutex> sscan_cursors_mutex_;

Status GetSScanStartMember(const Slice& key, const Slice& pattern, int64_t cursor, std::string* start_member);
Status StoreSScanNextMember(const Slice& key, const Slice& pattern, int64_t cursor, const std::string& next_member);

};

} // namespace blackwidow
Expand Down
2 changes: 2 additions & 0 deletions src/redis_zsets.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1393,6 +1393,7 @@ Status RedisZSets::ZScan(const Slice& key, int64_t cursor, const std::string& pa
}

Status RedisZSets::GetZScanStartMember(const Slice& key, const Slice& pattern, int64_t cursor, std::string* start_member) {
slash::MutexLock l(&zscan_cursors_mutex_);
std::string index_key = key.ToString() + "_" + pattern.ToString() + "_" + std::to_string(cursor);
if (zscan_cursors_store_.map_.find(index_key) == zscan_cursors_store_.map_.end()) {
return Status::NotFound();
Expand All @@ -1403,6 +1404,7 @@ Status RedisZSets::GetZScanStartMember(const Slice& key, const Slice& pattern, i
}

Status RedisZSets::StoreZScanNextMember(const Slice& key, const Slice& pattern, int64_t cursor, const std::string& next_member) {
slash::MutexLock l(&zscan_cursors_mutex_);
std::string index_key = key.ToString() + "_" + pattern.ToString() + "_" + std::to_string(cursor);
if (zscan_cursors_store_.list_.size() > zscan_cursors_store_.max_size_) {
std::string tail = zscan_cursors_store_.list_.back();
Expand Down
3 changes: 2 additions & 1 deletion src/redis_zsets.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,9 @@ class RedisZSets : public Redis {
private:
std::vector<rocksdb::ColumnFamilyHandle*> handles_;

// For ZScan
slash::Mutex zscan_cursors_mutex_;
BlackWidow::LRU<std::string, std::string> zscan_cursors_store_;
std::shared_ptr<Mutex> zscan_cursors_mutex_;

Status GetZScanStartMember(const Slice& key, const Slice& pattern, int64_t cursor, std::string* start_member);
Status StoreZScanNextMember(const Slice& key, const Slice& pattern, int64_t cursor, const std::string& next_member);
Expand Down
16 changes: 16 additions & 0 deletions src/util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <stdint.h>
#include <limits.h>

#include "src/coding.h"
#include "blackwidow/util.h"

namespace blackwidow {
Expand Down Expand Up @@ -452,4 +453,19 @@ int is_dir(const char* filename) {
return -1;
}

int CalculateStartAndEndKey(const std::string& key, std::string* start_key, std::string* end_key) {
size_t needed = sizeof(int32_t) + key.size() + 1;
char* dst = new char[needed];
const char* start = dst;
EncodeFixed32(dst, key.size());
dst += sizeof(int32_t);
memcpy(dst, key.data(), key.size());
dst += key.size();
start_key->assign(start, sizeof(int32_t) + key.size());
*dst = static_cast<uint8_t>(0xff);
end_key->assign(start, sizeof(int32_t) + key.size() + 1);
delete[] start;
return 0;
}

} // namespace blackwidow

0 comments on commit 0c79d77

Please sign in to comment.