Skip to content

Commit

Permalink
Add the support of the BF.MEXISTS and BF.CARD Command (#1756)
Browse files Browse the repository at this point in the history
Co-authored-by: Twice <twice@apache.org>
  • Loading branch information
zncleon and PragmaTwice authored Sep 14, 2023
1 parent 5c9137c commit 83aaa75
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 30 deletions.
46 changes: 44 additions & 2 deletions src/commands/cmd_bloom_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,38 @@ class CommandBFExists : public Commander {
Status Execute(Server *svr, Connection *conn, std::string *output) override {
redis::BloomChain bloom_db(svr->storage, conn->GetNamespace());
int ret = 0;
auto s = bloom_db.Exist(args_[1], args_[2], &ret);
auto s = bloom_db.Exists(args_[1], args_[2], &ret);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::Integer(ret);
return Status::OK();
}
};

class CommandBFMExists : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
items_.reserve(args_.size() - 2);
for (size_t i = 2; i < args_.size(); ++i) {
items_.emplace_back(args_[i]);
}
return Commander::Parse(args);
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
redis::BloomChain bloom_db(svr->storage, conn->GetNamespace());
std::vector<int> rets(items_.size(), 0);
auto s = bloom_db.MExists(args_[1], items_, &rets);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::MultiInteger(rets);
return Status::OK();
}

private:
std::vector<Slice> items_;
};

class CommandBFInfo : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
Expand Down Expand Up @@ -184,8 +208,26 @@ class CommandBFInfo : public Commander {
BloomInfoType type_ = BloomInfoType::kAll;
};

class CommandBFCard : public Commander {
public:
Status Execute(Server *svr, Connection *conn, std::string *output) override {
redis::BloomChain bloom_db(svr->storage, conn->GetNamespace());
BloomFilterInfo info;
auto s = bloom_db.Info(args_[1], &info);
if (!s.ok() && !s.IsNotFound()) return {Status::RedisExecErr, s.ToString()};
if (s.IsNotFound()) {
*output = redis::Integer(0);
} else {
*output = redis::Integer(info.size);
}
return Status::OK();
}
};

REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandBFReserve>("bf.reserve", -4, "write", 1, 1, 1),
MakeCmdAttr<CommandBFAdd>("bf.add", 3, "write", 1, 1, 1),
MakeCmdAttr<CommandBFExists>("bf.exists", 3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandBFInfo>("bf.info", -2, "read-only", 1, 1, 1), )
MakeCmdAttr<CommandBFMExists>("bf.mexists", -3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandBFInfo>("bf.info", -2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandBFCard>("bf.card", 2, "read-only", 1, 1, 1), )
} // namespace redis
9 changes: 9 additions & 0 deletions src/server/redis_reply.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ std::string Integer(T data) {
return ":" + std::to_string(data) + CRLF;
}

template <typename T, std::enable_if_t<std::is_integral_v<T>, int> = 0>
std::string MultiInteger(const std::vector<T> &multi_data) {
std::string result = "*" + std::to_string(multi_data.size()) + CRLF;
for (const auto &data : multi_data) {
result += Integer(data);
}
return result;
}

std::string BulkString(const std::string &data);
std::string NilString();

Expand Down
52 changes: 31 additions & 21 deletions src/types/redis_bloom_chain.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ void BloomChain::createBloomFilterInBatch(const Slice &ns_key, BloomChainMetadat
batch->Put(metadata_cf_handle_, ns_key, bloom_chain_meta_bytes);
}

void BloomChain::bloomAdd(const std::string &item, std::string *bf_data) {
void BloomChain::bloomAdd(const Slice &item, std::string *bf_data) {
BlockSplitBloomFilter block_split_bloom_filter;
block_split_bloom_filter.Init(std::move(*bf_data));

Expand All @@ -95,7 +95,8 @@ void BloomChain::bloomAdd(const std::string &item, std::string *bf_data) {
*bf_data = std::move(block_split_bloom_filter).GetData();
}

rocksdb::Status BloomChain::bloomCheck(const Slice &bf_key, const std::string &item, bool *exist) {
rocksdb::Status BloomChain::bloomCheck(const Slice &bf_key, const std::vector<Slice> &items,
std::vector<bool> *exists) {
LatestSnapShot ss(storage_);
rocksdb::ReadOptions read_options;
read_options.snapshot = ss.GetSnapShot();
Expand All @@ -105,9 +106,14 @@ rocksdb::Status BloomChain::bloomCheck(const Slice &bf_key, const std::string &i
BlockSplitBloomFilter block_split_bloom_filter;
block_split_bloom_filter.Init(std::move(bf_data));

uint64_t h = BlockSplitBloomFilter::Hash(item.data(), item.size());
*exist = block_split_bloom_filter.FindHash(h);

for (size_t i = 0; i < items.size(); ++i) {
// this item exists in other bloomfilter already, and it's not necessary to check in this bloomfilter.
if ((*exists)[i]) {
continue;
}
uint64_t h = BlockSplitBloomFilter::Hash(items[i].data(), items[i].size());
(*exists)[i] = block_split_bloom_filter.FindHash(h);
}
return rocksdb::Status::OK();
}

Expand Down Expand Up @@ -144,21 +150,19 @@ rocksdb::Status BloomChain::Add(const Slice &user_key, const Slice &item, int *r
WriteBatchLogData log_data(kRedisBloomFilter, {"insert"});
batch->PutLogData(log_data.Encode());

std::string item_string = item.ToString();

// check
bool exist = false;
std::vector<bool> exist{false}; // TODO: to refine in BF.MADD
for (int i = metadata.n_filters - 1; i >= 0; --i) { // TODO: to test which direction for searching is better
s = bloomCheck(bf_key_list[i], item_string, &exist);
s = bloomCheck(bf_key_list[i], {item}, &exist);
if (!s.ok()) return s;
if (exist) {
if (exist[0]) {
*ret = 0;
break;
}
}

// insert
if (!exist) {
if (!exist[0]) { // TODO: to refine in BF.MADD
std::string bf_data;
s = storage_->Get(rocksdb::ReadOptions(), bf_key_list.back(), &bf_data);
if (!s.ok()) return s;
Expand All @@ -172,7 +176,7 @@ rocksdb::Status BloomChain::Add(const Slice &user_key, const Slice &item, int *r
return rocksdb::Status::Aborted("filter is full and is nonscaling");
}
}
bloomAdd(item_string, &bf_data);
bloomAdd(item, &bf_data);
batch->Put(bf_key_list.back(), bf_data);
*ret = 1;
metadata.size += 1;
Expand All @@ -185,31 +189,37 @@ rocksdb::Status BloomChain::Add(const Slice &user_key, const Slice &item, int *r
return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}

rocksdb::Status BloomChain::Exist(const Slice &user_key, const Slice &item, int *ret) {
rocksdb::Status BloomChain::Exists(const Slice &user_key, const Slice &item, int *ret) {
std::vector<int> tmp{0};
rocksdb::Status s = MExists(user_key, {item}, &tmp);
*ret = tmp[0];
return s;
}

rocksdb::Status BloomChain::MExists(const Slice &user_key, const std::vector<Slice> &items, std::vector<int> *rets) {
std::string ns_key = AppendNamespacePrefix(user_key);

BloomChainMetadata metadata;
rocksdb::Status s = getBloomChainMetadata(ns_key, &metadata);
if (s.IsNotFound()) {
*ret = 0;
std::fill(rets->begin(), rets->end(), 0);
return rocksdb::Status::OK();
}
if (!s.ok()) return s;

std::vector<std::string> bf_key_list;
getBFKeyList(ns_key, metadata, &bf_key_list);

std::string item_string = item.ToString();
// check
bool exist = false;
std::vector<bool> exists(items.size(), false);
for (int i = metadata.n_filters - 1; i >= 0; --i) { // TODO: to test which direction for searching is better
s = bloomCheck(bf_key_list[i], item_string, &exist);
s = bloomCheck(bf_key_list[i], items, &exists);
if (!s.ok()) return s;
if (exist) {
break;
}
}
*ret = exist ? 1 : 0;

for (size_t i = 0; i < items.size(); ++i) {
(*rets)[i] = exists[i] ? 1 : 0;
}

return rocksdb::Status::OK();
}
Expand Down
13 changes: 10 additions & 3 deletions src/types/redis_bloom_chain.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ class BloomChain : public Database {
BloomChain(engine::Storage *storage, const std::string &ns) : Database(storage, ns) {}
rocksdb::Status Reserve(const Slice &user_key, uint32_t capacity, double error_rate, uint16_t expansion);
rocksdb::Status Add(const Slice &user_key, const Slice &item, int *ret);
rocksdb::Status Exist(const Slice &user_key, const Slice &item, int *ret);
rocksdb::Status Exists(const Slice &user_key, const Slice &item, int *ret);
rocksdb::Status MExists(const Slice &user_key, const std::vector<Slice> &items, std::vector<int> *rets);
rocksdb::Status Info(const Slice &user_key, BloomFilterInfo *info);

private:
Expand All @@ -63,7 +64,13 @@ class BloomChain : public Database {
BloomChainMetadata *metadata);
void createBloomFilterInBatch(const Slice &ns_key, BloomChainMetadata *metadata,
ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch, std::string *bf_data);
static void bloomAdd(const std::string &item, std::string *bf_data);
rocksdb::Status bloomCheck(const Slice &bf_key, const std::string &item, bool *exist);

/// bf_data: [in/out] The content string of bloomfilter.
static void bloomAdd(const Slice &item, std::string *bf_data);

/// exists: [in/out] The items exist in bloomfilter already or not.
/// exists[i] is true means items[i] exists in other bloomfilter already, and it's not necessary to check in this
/// bloomfilter.
rocksdb::Status bloomCheck(const Slice &bf_key, const std::vector<Slice> &items, std::vector<bool> *exists);
};
} // namespace redis
6 changes: 3 additions & 3 deletions tests/cppunit/types/bloom_chain_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ TEST_F(RedisBloomChainTest, Reserve) {
TEST_F(RedisBloomChainTest, BasicAddAndTest) {
int ret = 0;

auto s = sb_chain_->Exist("no_exist_key", "test_item", &ret);
auto s = sb_chain_->Exists("no_exist_key", "test_item", &ret);
EXPECT_EQ(ret, 0);
s = sb_chain_->Del("no_exist_key");

Expand All @@ -67,14 +67,14 @@ TEST_F(RedisBloomChainTest, BasicAddAndTest) {
}

for (const auto& insert_item : insert_items) {
s = sb_chain_->Exist(key_, insert_item, &ret);
s = sb_chain_->Exists(key_, insert_item, &ret);
EXPECT_TRUE(s.ok());
EXPECT_EQ(ret, 1);
}

std::string no_insert_items[] = {"item303", "item404", "1", "2", "3"};
for (const auto& no_insert_item : no_insert_items) {
s = sb_chain_->Exist(key_, no_insert_item, &ret);
s = sb_chain_->Exists(key_, no_insert_item, &ret);
EXPECT_TRUE(s.ok());
EXPECT_EQ(ret, 0);
}
Expand Down
32 changes: 31 additions & 1 deletion tests/gocase/unit/type/bloom/bloom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,21 @@ func TestBloom(t *testing.T) {
require.LessOrEqual(t, float64(falseExist), fpp*float64(totalCount))
})

t.Run("MExists Basic Test", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, key).Err())
require.Equal(t, []interface{}{int64(0), int64(0), int64(0)}, rdb.Do(ctx, "bf.mexists", key, "xxx", "yyy", "zzz").Val())

require.Equal(t, int64(1), rdb.Do(ctx, "bf.add", key, "xxx").Val())
require.Equal(t, int64(1), rdb.Do(ctx, "bf.card", key).Val())
require.Equal(t, []interface{}{int64(1), int64(0)}, rdb.Do(ctx, "bf.mexists", key, "xxx", "yyy").Val())

require.Equal(t, int64(1), rdb.Do(ctx, "bf.add", key, "zzz").Val())
require.Equal(t, []interface{}{int64(1), int64(0), int64(1)}, rdb.Do(ctx, "bf.mexists", key, "xxx", "yyy", "zzz").Val())

require.Equal(t, int64(1), rdb.Do(ctx, "bf.add", key, "yyy").Val())
require.Equal(t, []interface{}{int64(1), int64(1), int64(1)}, rdb.Do(ctx, "bf.mexists", key, "xxx", "yyy", "zzz").Val())
})

t.Run("Get info of no exists key ", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, "no_exist_key").Err())
require.ErrorContains(t, rdb.Do(ctx, "bf.info", "no_exist_key").Err(), "ERR key is not found")
Expand Down Expand Up @@ -253,6 +268,21 @@ func TestBloom(t *testing.T) {
require.Equal(t, "MBbloom--", rdb.Type(ctx, key).Val())
})

// TODO: Add the testcase of get filters of bloom filter after complete the scaling.
t.Run("Get Card of bloom filter", func(t *testing.T) {
// if bf.card no exist key, it would return 0
require.NoError(t, rdb.Del(ctx, "no_exist_key").Err())
require.Equal(t, int64(0), rdb.Do(ctx, "bf.card", "no_exist_key").Val())

require.NoError(t, rdb.Del(ctx, key).Err())
require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.02", "1000", "expansion", "1").Err())
require.Equal(t, int64(0), rdb.Do(ctx, "bf.card", key).Val())
require.NoError(t, rdb.Do(ctx, "bf.add", key, "item1").Err())
require.Equal(t, int64(1), rdb.Do(ctx, "bf.card", key).Val())
// insert the duplicate key, insert would return 0 and the card of bloom filter would not change
require.Equal(t, int64(0), rdb.Do(ctx, "bf.add", key, "item1").Val())
require.Equal(t, int64(1), rdb.Do(ctx, "bf.card", key).Val())
require.NoError(t, rdb.Do(ctx, "bf.add", key, "item2").Err())
require.Equal(t, int64(2), rdb.Do(ctx, "bf.card", key).Val())
})

}

0 comments on commit 83aaa75

Please sign in to comment.