Skip to content

Commit

Permalink
Merge branch 'unstable' into blmpop
Browse files Browse the repository at this point in the history
  • Loading branch information
HolyLow authored Sep 22, 2023
2 parents 0b4df62 + 723829f commit b9d795f
Show file tree
Hide file tree
Showing 4 changed files with 242 additions and 31 deletions.
136 changes: 125 additions & 11 deletions src/commands/cmd_bloom_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,39 @@
#include "server/server.h"
#include "types/redis_bloom_chain.h"

namespace {

constexpr const char *errBadErrorRate = "Bad error rate";
constexpr const char *errBadCapacity = "Bad capacity";
constexpr const char *errBadExpansion = "Bad expansion";
constexpr const char *errInvalidErrorRate = "error rate should be between 0 and 1";
constexpr const char *errInvalidCapacity = "capacity should be larger than 0";
constexpr const char *errInvalidExpansion = "expansion should be greater or equal to 1";
constexpr const char *errNonscalingButExpand = "nonscaling filters cannot expand";
constexpr const char *errFilterFull = "ERR nonscaling filter is full";
} // namespace

namespace redis {

class CommandBFReserve : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
auto parse_error_rate = ParseFloat<double>(args[2]);
if (!parse_error_rate) {
return {Status::RedisParseErr, errValueIsNotFloat};
return {Status::RedisParseErr, errBadErrorRate};
}
error_rate_ = *parse_error_rate;
if (error_rate_ >= 1 || error_rate_ <= 0) {
return {Status::RedisParseErr, "error rate should be between 0 and 1"};
return {Status::RedisParseErr, errInvalidErrorRate};
}

auto parse_capacity = ParseInt<uint32_t>(args[3], 10);
if (!parse_capacity) {
return {Status::RedisParseErr, errValueNotInteger};
return {Status::RedisParseErr, errBadCapacity};
}
capacity_ = *parse_capacity;
if (capacity_ <= 0) {
return {Status::RedisParseErr, "capacity should be larger than 0"};
return {Status::RedisParseErr, errInvalidCapacity};
}

CommandParser parser(args, 4);
Expand All @@ -56,17 +68,21 @@ class CommandBFReserve : public Commander {
expansion_ = 0;
} else if (parser.EatEqICase("expansion")) {
has_expansion = true;
expansion_ = GET_OR_RET(parser.TakeInt<uint16_t>());
auto parse_expansion = parser.TakeInt<uint16_t>();
if (!parse_expansion.IsOK()) {
return {Status::RedisParseErr, errBadExpansion};
}
expansion_ = parse_expansion.GetValue();
if (expansion_ < 1) {
return {Status::RedisParseErr, "expansion should be greater or equal to 1"};
return {Status::RedisParseErr, errInvalidExpansion};
}
} else {
return {Status::RedisParseErr, errInvalidSyntax};
}
}

if (is_nonscaling && has_expansion) {
return {Status::RedisParseErr, "nonscaling filters cannot expand"};
return {Status::RedisParseErr, errNonscalingButExpand};
}

return Commander::Parse(args);
Expand Down Expand Up @@ -103,7 +119,7 @@ class CommandBFAdd : public Commander {
*output = redis::Integer(0);
break;
case BloomFilterAddResult::kFull:
*output = redis::Error("ERR nonscaling filter is full");
*output = redis::Error(errFilterFull);
break;
}
return Status::OK();
Expand Down Expand Up @@ -136,15 +152,112 @@ class CommandBFMAdd : public Commander {
*output += redis::Integer(0);
break;
case BloomFilterAddResult::kFull:
*output += redis::Error("ERR nonscaling filter is full");
*output += redis::Error(errFilterFull);
break;
}
}
return Status::OK();
}

private:
std::vector<std::string> items_;
};

class CommandBFInsert : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
CommandParser parser(args, 2);
bool is_nonscaling = false;
bool has_expansion = false;
bool has_items = false;
while (parser.Good()) {
if (parser.EatEqICase("capacity")) {
auto parse_capacity = parser.TakeInt<uint32_t>();
if (!parse_capacity.IsOK()) {
return {Status::RedisParseErr, errBadCapacity};
}
insert_options_.capacity = parse_capacity.GetValue();
if (insert_options_.capacity <= 0) {
return {Status::RedisParseErr, errInvalidCapacity};
}
} else if (parser.EatEqICase("error")) {
auto parse_error_rate = parser.TakeFloat<double>();
if (!parse_error_rate.IsOK()) {
return {Status::RedisParseErr, errBadErrorRate};
}
insert_options_.error_rate = parse_error_rate.GetValue();
if (insert_options_.error_rate >= 1 || insert_options_.error_rate <= 0) {
return {Status::RedisParseErr, errInvalidErrorRate};
}
} else if (parser.EatEqICase("nocreate")) {
insert_options_.auto_create = false;
} else if (parser.EatEqICase("nonscaling")) {
is_nonscaling = true;
insert_options_.expansion = 0;
} else if (parser.EatEqICase("expansion")) {
has_expansion = true;
auto parse_expansion = parser.TakeInt<uint16_t>();
if (!parse_expansion.IsOK()) {
return {Status::RedisParseErr, errBadExpansion};
}
insert_options_.expansion = parse_expansion.GetValue();
if (insert_options_.expansion < 1) {
return {Status::RedisParseErr, errInvalidExpansion};
}
} else if (parser.EatEqICase("items")) {
has_items = true;
break;
} else {
return {Status::RedisParseErr, errInvalidSyntax};
}
}

if (is_nonscaling && has_expansion) {
return {Status::RedisParseErr, errNonscalingButExpand};
}

if (not has_items) {
return {Status::RedisParseErr, errInvalidSyntax};
}

while (parser.Good()) {
items_.emplace_back(GET_OR_RET(parser.TakeStr()));
}

if (items_.size() == 0) {
return {Status::RedisParseErr, "num of items should be greater than 0"};
}

return Commander::Parse(args);
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
redis::BloomChain bloom_db(svr->storage, conn->GetNamespace());
std::vector<BloomFilterAddResult> rets(items_.size(), BloomFilterAddResult::kOk);
auto s = bloom_db.InsertCommon(args_[1], items_, insert_options_, &rets);
if (s.IsNotFound()) return {Status::RedisExecErr, "key is not found"};
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::MultiLen(items_.size());
for (size_t i = 0; i < items_.size(); ++i) {
switch (rets[i]) {
case BloomFilterAddResult::kOk:
*output += redis::Integer(1);
break;
case BloomFilterAddResult::kExist:
*output += redis::Integer(0);
break;
case BloomFilterAddResult::kFull:
*output += redis::Error(errFilterFull);
break;
}
}
return Status::OK();
}

private:
std::vector<Slice> items_;
std::vector<std::string> items_;
BloomFilterInsertOptions insert_options_;
};

class CommandBFExists : public Commander {
Expand Down Expand Up @@ -184,7 +297,7 @@ class CommandBFMExists : public Commander {
}

private:
std::vector<Slice> items_;
std::vector<std::string> items_;
};

class CommandBFInfo : public Commander {
Expand Down Expand Up @@ -277,6 +390,7 @@ class CommandBFCard : public Commander {
REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandBFReserve>("bf.reserve", -4, "write", 1, 1, 1),
MakeCmdAttr<CommandBFAdd>("bf.add", 3, "write", 1, 1, 1),
MakeCmdAttr<CommandBFMAdd>("bf.madd", -3, "write", 1, 1, 1),
MakeCmdAttr<CommandBFInsert>("bf.insert", -4, "write", 1, 1, 1),
MakeCmdAttr<CommandBFExists>("bf.exists", 3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandBFMExists>("bf.mexists", -3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandBFInfo>("bf.info", -2, "read-only", 1, 1, 1),
Expand Down
23 changes: 16 additions & 7 deletions src/types/redis_bloom_chain.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ rocksdb::Status BloomChain::getBFDataList(const std::vector<std::string> &bf_key
return rocksdb::Status::OK();
}

void BloomChain::getItemHashList(const std::vector<Slice> &items, std::vector<uint64_t> *item_hash_list) {
void BloomChain::getItemHashList(const std::vector<std::string> &items, std::vector<uint64_t> *item_hash_list) {
item_hash_list->reserve(items.size());
for (const auto &item : items) {
item_hash_list->push_back(BlockSplitBloomFilter::Hash(item.data(), item.size()));
Expand Down Expand Up @@ -132,23 +132,31 @@ rocksdb::Status BloomChain::Reserve(const Slice &user_key, uint32_t capacity, do
return createBloomChain(ns_key, error_rate, capacity, expansion, &bloom_chain_metadata);
}

rocksdb::Status BloomChain::Add(const Slice &user_key, const Slice &item, BloomFilterAddResult *ret) {
rocksdb::Status BloomChain::Add(const Slice &user_key, const std::string &item, BloomFilterAddResult *ret) {
std::vector<BloomFilterAddResult> tmp{BloomFilterAddResult::kOk};
rocksdb::Status s = MAdd(user_key, {item}, &tmp);
*ret = tmp[0];
return s;
}

rocksdb::Status BloomChain::MAdd(const Slice &user_key, const std::vector<Slice> &items,
rocksdb::Status BloomChain::MAdd(const Slice &user_key, const std::vector<std::string> &items,
std::vector<BloomFilterAddResult> *rets) {
BloomFilterInsertOptions insert_options;
return InsertCommon(user_key, items, insert_options, rets);
}

rocksdb::Status BloomChain::InsertCommon(const Slice &user_key, const std::vector<std::string> &items,
const BloomFilterInsertOptions &insert_options,
std::vector<BloomFilterAddResult> *rets) {
std::string ns_key = AppendNamespacePrefix(user_key);
LockGuard guard(storage_->GetLockManager(), ns_key);

BloomChainMetadata metadata;
rocksdb::Status s = getBloomChainMetadata(ns_key, &metadata);

if (s.IsNotFound()) {
s = createBloomChain(ns_key, kBFDefaultErrorRate, kBFDefaultInitCapacity, kBFDefaultExpansion, &metadata);
if (s.IsNotFound() && insert_options.auto_create) {
s = createBloomChain(ns_key, insert_options.error_rate, insert_options.capacity, insert_options.expansion,
&metadata);
}
if (!s.ok()) return s;

Expand Down Expand Up @@ -207,14 +215,15 @@ rocksdb::Status BloomChain::MAdd(const Slice &user_key, const std::vector<Slice>
return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}

rocksdb::Status BloomChain::Exists(const Slice &user_key, const Slice &item, bool *exist) {
rocksdb::Status BloomChain::Exists(const Slice &user_key, const std::string &item, bool *exist) {
std::vector<bool> tmp{false};
rocksdb::Status s = MExists(user_key, {item}, &tmp);
*exist = tmp[0];
return s;
}

rocksdb::Status BloomChain::MExists(const Slice &user_key, const std::vector<Slice> &items, std::vector<bool> *exists) {
rocksdb::Status BloomChain::MExists(const Slice &user_key, const std::vector<std::string> &items,
std::vector<bool> *exists) {
std::string ns_key = AppendNamespacePrefix(user_key);

BloomChainMetadata metadata;
Expand Down
20 changes: 15 additions & 5 deletions src/types/redis_bloom_chain.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ enum class BloomFilterAddResult {
kFull,
};

struct BloomFilterInsertOptions {
double error_rate = kBFDefaultErrorRate;
uint32_t capacity = kBFDefaultInitCapacity;
uint16_t expansion = kBFDefaultExpansion;
bool auto_create = true;
};

struct BloomFilterInfo {
uint32_t capacity;
uint32_t bloom_bytes;
Expand All @@ -57,18 +64,21 @@ class BloomChain : public Database {
public:
BloomChain(engine::Storage *storage, const std::string &ns) : Database(storage, ns) {}
rocksdb::Status Reserve(const Slice &user_key, uint32_t capacity, double error_rate, uint16_t expansion);
rocksdb::Status Add(const Slice &user_key, const Slice &item, BloomFilterAddResult *ret);
rocksdb::Status MAdd(const Slice &user_key, const std::vector<Slice> &items, std::vector<BloomFilterAddResult> *rets);
rocksdb::Status Exists(const Slice &user_key, const Slice &item, bool *exist);
rocksdb::Status MExists(const Slice &user_key, const std::vector<Slice> &items, std::vector<bool> *exists);
rocksdb::Status Add(const Slice &user_key, const std::string &item, BloomFilterAddResult *ret);
rocksdb::Status MAdd(const Slice &user_key, const std::vector<std::string> &items,
std::vector<BloomFilterAddResult> *rets);
rocksdb::Status InsertCommon(const Slice &user_key, const std::vector<std::string> &items,
const BloomFilterInsertOptions &insert_options, std::vector<BloomFilterAddResult> *rets);
rocksdb::Status Exists(const Slice &user_key, const std::string &item, bool *exist);
rocksdb::Status MExists(const Slice &user_key, const std::vector<std::string> &items, std::vector<bool> *exists);
rocksdb::Status Info(const Slice &user_key, BloomFilterInfo *info);

private:
rocksdb::Status getBloomChainMetadata(const Slice &ns_key, BloomChainMetadata *metadata);
std::string getBFKey(const Slice &ns_key, const BloomChainMetadata &metadata, uint16_t filters_index);
void getBFKeyList(const Slice &ns_key, const BloomChainMetadata &metadata, std::vector<std::string> *bf_key_list);
rocksdb::Status getBFDataList(const std::vector<std::string> &bf_key_list, std::vector<std::string> *bf_data_list);
static void getItemHashList(const std::vector<Slice> &items, std::vector<uint64_t> *item_hash_list);
static void getItemHashList(const std::vector<std::string> &items, std::vector<uint64_t> *item_hash_list);

rocksdb::Status createBloomChain(const Slice &ns_key, double error_rate, uint32_t capacity, uint16_t expansion,
BloomChainMetadata *metadata);
Expand Down
Loading

0 comments on commit b9d795f

Please sign in to comment.