Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the support of the bloom BF.INFO command #1710

Merged
merged 15 commits into from
Aug 31, 2023
65 changes: 59 additions & 6 deletions src/commands/cmd_bloom_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,13 @@ class CommandBFReserve : public Commander {
}

CommandParser parser(args, 4);
bool nonscaling = false;
bool is_nonscaling = false;
bool has_expansion = false;
while (parser.Good()) {
if (parser.EatEqICase("nonscaling")) {
nonscaling = true;
is_nonscaling = true;
} else if (parser.EatEqICase("expansion")) {
has_expansion = true;
expansion_ = GET_OR_RET(parser.TakeInt<uint16_t>());
if (expansion_ < 1) {
return {Status::RedisParseErr, "expansion should be greater or equal to 1"};
Expand All @@ -62,8 +64,7 @@ class CommandBFReserve : public Commander {
}
}

// if nonscaling is true, expansion should be 0
if (nonscaling && expansion_ != 0) {
if (is_nonscaling && has_expansion) {
return {Status::RedisParseErr, "nonscaling filters cannot expand"};
}

Expand All @@ -82,7 +83,7 @@ class CommandBFReserve : public Commander {
private:
double error_rate_;
uint32_t capacity_;
uint16_t expansion_ = 0;
uint16_t expansion_ = kBFDefaultExpansion;
};

class CommandBFAdd : public Commander {
Expand Down Expand Up @@ -111,7 +112,59 @@ class CommandBFExists : public Commander {
}
};

class CommandBFInfo : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
CommandParser parser(args, 2);
while (parser.Good()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
while (parser.Good()) {
if (parser.Good()) {

if (parser.EatEqICase("capacity")) {
type_ = CAPACITY;
} else if (parser.EatEqICase("size")) {
type_ = SIZE;
} else if (parser.EatEqICase("filters")) {
type_ = FILTERS;
} else if (parser.EatEqICase("items")) {
type_ = ITEMS;
} else if (parser.EatEqICase("expansion")) {
type_ = EXPANSION;
} else {
return {Status::RedisParseErr, "Invalid info argument"};
}
}

return Commander::Parse(args);
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
redis::BloomChain bloom_db(svr->storage, conn->GetNamespace());
std::vector<int> rets;
rets.reserve(all_nums_);
auto s = bloom_db.Info(args_[1], type_, &rets);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

if (type_ == ALL) {
*output = "*" + std::to_string(2 * all_nums_) +
CRLF; // todo: this reply only used in here, whether should I place it in redis_reply.h
for (int i = 0; i < all_nums_; ++i) {
*output += redis::SimpleString(all_info_rets_[i]);
*output += redis::Integer(rets[i]);
}
} else {
*output = redis::Integer(rets[0]);
}

return Status::OK();
}

private:
BloomInfoType type_ = ALL;
const int all_nums_ = 5;
const std::vector<std::string> all_info_rets_ = {"Capacity", "Size", "Number of filters", "Number of items inserted",
"Expansion rate"};
};

REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandBFReserve>("bf.reserve", -4, "write", 1, 1, 1),
MakeCmdAttr<CommandBFAdd>("bf.add", 3, "write", 1, 1, 1),
MakeCmdAttr<CommandBFExists>("bf.exists", 3, "read-only", 1, 1, 1), )
MakeCmdAttr<CommandBFExists>("bf.exists", 3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandBFInfo>("bf.info", -2, "read-only", 1, 1, 1), )
} // namespace redis
2 changes: 1 addition & 1 deletion src/storage/redis_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -450,5 +450,5 @@ uint32_t BloomChainMetadata::GetCapacity() const {
if (expansion == 1) {
return base_capacity * n_filters;
}
return static_cast<uint32_t>(base_capacity * (1 - pow(expansion, n_filters)) / 1 - expansion);
return static_cast<uint32_t>(base_capacity * (1 - pow(expansion, n_filters)) / (1 - expansion));
}
40 changes: 40 additions & 0 deletions src/types/redis_bloom_chain.cc
Original file line number Diff line number Diff line change
Expand Up @@ -195,4 +195,44 @@ rocksdb::Status BloomChain::Exist(const Slice &user_key, const Slice &item, int

return rocksdb::Status::OK();
}

rocksdb::Status BloomChain::Info(const Slice &user_key, const BloomInfoType &type, std::vector<int> *rets) {
std::string ns_key = AppendNamespacePrefix(user_key);
LockGuard guard(storage_->GetLockManager(), ns_key);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Info doesn't need to lock since it just atomicly get the metadata?


BloomChainMetadata metadata;
rocksdb::Status s = getBloomChainMetadata(ns_key, &metadata);
if (s.IsNotFound()) return rocksdb::Status::NotFound("key is not found");
if (!s.ok()) return s;

switch (type) {
case ALL:
rets->push_back(static_cast<int>(metadata.GetCapacity()));
rets->push_back(static_cast<int>(metadata.bloom_bytes));
rets->push_back(static_cast<int>(metadata.n_filters));
rets->push_back(static_cast<int>(metadata.size));
rets->push_back(static_cast<int>(metadata.expansion));
break;
case CAPACITY:
rets->push_back(static_cast<int>(metadata.GetCapacity()));
break;
case SIZE:
rets->push_back(static_cast<int>(metadata.bloom_bytes));
break;
case FILTERS:
rets->push_back(static_cast<int>(metadata.n_filters));
break;
case ITEMS:
rets->push_back(static_cast<int>(metadata.size));
break;
case EXPANSION:
rets->push_back(static_cast<int>(metadata.expansion));
break;
default:
LOG(ERROR) << "Failed to parse the type of BF.INFO command";
}

return rocksdb::Status::OK();
}

} // namespace redis
10 changes: 10 additions & 0 deletions src/types/redis_bloom_chain.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,22 @@ const uint32_t kBFDefaultInitCapacity = 100;
const double kBFDefaultErrorRate = 0.01;
const uint16_t kBFDefaultExpansion = 2;

enum BloomInfoType {
ALL,
CAPACITY,
SIZE,
FILTERS,
ITEMS,
EXPANSION,
};

class BloomChain : public Database {
public:
BloomChain(engine::Storage *storage, const std::string &ns) : Database(storage, ns) {}
rocksdb::Status Reserve(const Slice &user_key, uint32_t capacity, double error_rate, uint16_t expansion);
rocksdb::Status Add(const Slice &user_key, const Slice &item, int *ret);
rocksdb::Status Exist(const Slice &user_key, const Slice &item, int *ret);
rocksdb::Status Info(const Slice &user_key, const BloomInfoType &type, std::vector<int> *rets);

private:
std::string getBFKey(const Slice &ns_key, const BloomChainMetadata &metadata, uint16_t filters_index);
Expand Down
42 changes: 41 additions & 1 deletion tests/gocase/unit/type/bloom/bloom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestBloom(t *testing.T) {
})

t.Run("Check no exists key", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, key).Err())
require.NoError(t, rdb.Del(ctx, "no_exist_key").Err())
require.ErrorContains(t, rdb.Do(ctx, "bf.exists", "no_exist_key", "item1").Err(), "ERR NotFound: key is not found")
require.NoError(t, rdb.Del(ctx, "no_exist_key").Err())
})
Expand Down Expand Up @@ -116,4 +116,44 @@ func TestBloom(t *testing.T) {
}
require.LessOrEqual(t, float64(exist), fpp*float64(totalCount))
})

t.Run("Get info of no exists key ", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, "no_exist_key").Err())
require.ErrorContains(t, rdb.Do(ctx, "bf.info", "no_exist_key").Err(), "ERR NotFound: key is not found")
require.NoError(t, rdb.Del(ctx, "no_exist_key").Err())
})

t.Run("Get all info of bloom filter", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, key).Err())
require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.02", "1000", "expansion", "3").Err())
require.Equal(t, []interface{}{"Capacity", int64(1000), "Size", int64(2048), "Number of filters", int64(1), "Number of items inserted", int64(0), "Expansion rate", int64(3)}, rdb.Do(ctx, "bf.info", key).Val())
})

t.Run("Get capacity of bloom filter", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, key).Err())
require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.01", "2000").Err())
require.Equal(t, int64(2000), rdb.Do(ctx, "bf.info", key, "capacity").Val())
})

t.Run("Get expansion of bloom filter", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, key).Err())
require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.01", "2000", "expansion", "1").Err())
require.Equal(t, int64(1), rdb.Do(ctx, "bf.info", key, "expansion").Val())
})

t.Run("Get size of bloom filter", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, key).Err())
require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.02", "1000", "expansion", "1").Err())
require.Equal(t, int64(2048), rdb.Do(ctx, "bf.info", key, "size").Val())
})

t.Run("Get items of bloom filter", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, key).Err())
require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.02", "1000", "expansion", "1").Err())
require.Equal(t, int64(0), rdb.Do(ctx, "bf.info", key, "items").Val())
require.NoError(t, rdb.Do(ctx, "bf.add", key, "item").Err())
require.Equal(t, int64(1), rdb.Do(ctx, "bf.info", key, "items").Val())
})

// TODO: Add the testcase of get filters of bloom filter after complete the scaling.
}