Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the support of the BF.MEXISTS and BF.CARD Command #1756

Merged
merged 10 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 44 additions & 2 deletions src/commands/cmd_bloom_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,38 @@ class CommandBFExists : public Commander {
Status Execute(Server *svr, Connection *conn, std::string *output) override {
redis::BloomChain bloom_db(svr->storage, conn->GetNamespace());
int ret = 0;
auto s = bloom_db.Exist(args_[1], args_[2], &ret);
auto s = bloom_db.Exists(args_[1], args_[2], &ret);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::Integer(ret);
return Status::OK();
}
};

class CommandBFMExists : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
items_.reserve(args_.size() - 2);
for (size_t i = 2; i < args_.size(); ++i) {
zncleon marked this conversation as resolved.
Show resolved Hide resolved
items_.emplace_back(args_[i]);
}
return Commander::Parse(args);
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
redis::BloomChain bloom_db(svr->storage, conn->GetNamespace());
std::vector<int> rets(items_.size(), 0);
auto s = bloom_db.MExists(args_[1], items_, &rets);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::MultiInteger(rets);
return Status::OK();
}

private:
std::vector<Slice> items_;
};

class CommandBFInfo : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
Expand Down Expand Up @@ -184,8 +208,26 @@ class CommandBFInfo : public Commander {
BloomInfoType type_ = BloomInfoType::kAll;
};

class CommandBFCard : public Commander {
public:
Status Execute(Server *svr, Connection *conn, std::string *output) override {
redis::BloomChain bloom_db(svr->storage, conn->GetNamespace());
BloomFilterInfo info;
auto s = bloom_db.Info(args_[1], &info);
if (!s.ok() && !s.IsNotFound()) return {Status::RedisExecErr, s.ToString()};
if (s.IsNotFound()) {
*output = redis::Integer(0);
} else {
*output = redis::Integer(info.size);
}
return Status::OK();
}
};

REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandBFReserve>("bf.reserve", -4, "write", 1, 1, 1),
MakeCmdAttr<CommandBFAdd>("bf.add", 3, "write", 1, 1, 1),
MakeCmdAttr<CommandBFExists>("bf.exists", 3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandBFInfo>("bf.info", -2, "read-only", 1, 1, 1), )
MakeCmdAttr<CommandBFMExists>("bf.mexists", -3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandBFInfo>("bf.info", -2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandBFCard>("bf.card", 2, "read-only", 1, 1, 1), )
} // namespace redis
9 changes: 9 additions & 0 deletions src/server/redis_reply.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ std::string Integer(T data) {
return ":" + std::to_string(data) + CRLF;
}

template <typename T, std::enable_if_t<std::is_integral_v<T>, int> = 0>
std::string MultiInteger(const std::vector<T> &multi_data) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A quick question: would other Redis response can be help with MultiInteger?

Copy link
Member

@mapleFU mapleFU Sep 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Oh and @PragmaTwice and can we support a vector<bool> version to convert vector<bool> to 0/1 response?)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure since resp3 supports boolean.

Copy link
Contributor Author

@zncleon zncleon Sep 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's seems only here(BF.MExists and BF.MAdd) and SMIsMember can be help with MultiInteger. Should we give up MultiInteger?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can put it in src/commands/cmd_bloom_filter.cc in anonymous namespace as a helper first. If you find any other user would use it, you can move it out.

Copy link
Member

@PragmaTwice PragmaTwice Sep 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this function looks ok to me.

std::string result = "*" + std::to_string(multi_data.size()) + CRLF;
for (const auto &data : multi_data) {
result += Integer(data);
}
return result;
}

std::string BulkString(const std::string &data);
std::string NilString();

Expand Down
52 changes: 31 additions & 21 deletions src/types/redis_bloom_chain.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ void BloomChain::createBloomFilterInBatch(const Slice &ns_key, BloomChainMetadat
batch->Put(metadata_cf_handle_, ns_key, bloom_chain_meta_bytes);
}

void BloomChain::bloomAdd(const std::string &item, std::string *bf_data) {
void BloomChain::bloomAdd(const Slice &item, std::string *bf_data) {
BlockSplitBloomFilter block_split_bloom_filter;
block_split_bloom_filter.Init(std::move(*bf_data));

Expand All @@ -95,7 +95,8 @@ void BloomChain::bloomAdd(const std::string &item, std::string *bf_data) {
*bf_data = std::move(block_split_bloom_filter).GetData();
}

rocksdb::Status BloomChain::bloomCheck(const Slice &bf_key, const std::string &item, bool *exist) {
rocksdb::Status BloomChain::bloomCheck(const Slice &bf_key, const std::vector<Slice> &items,
std::vector<bool> *exists) {
LatestSnapShot ss(storage_);
rocksdb::ReadOptions read_options;
read_options.snapshot = ss.GetSnapShot();
Expand All @@ -105,9 +106,14 @@ rocksdb::Status BloomChain::bloomCheck(const Slice &bf_key, const std::string &i
BlockSplitBloomFilter block_split_bloom_filter;
block_split_bloom_filter.Init(std::move(bf_data));

uint64_t h = BlockSplitBloomFilter::Hash(item.data(), item.size());
*exist = block_split_bloom_filter.FindHash(h);

for (size_t i = 0; i < items.size(); ++i) {
// this item exists in other bloomfilter already, and it's not necessary to check in this bloomfilter.
if (exists->at(i)) {
zncleon marked this conversation as resolved.
Show resolved Hide resolved
zncleon marked this conversation as resolved.
Show resolved Hide resolved
continue;
}
uint64_t h = BlockSplitBloomFilter::Hash(items[i].data(), items[i].size());
exists->at(i) = block_split_bloom_filter.FindHash(h);
zncleon marked this conversation as resolved.
Show resolved Hide resolved
}
return rocksdb::Status::OK();
}

Expand Down Expand Up @@ -144,21 +150,19 @@ rocksdb::Status BloomChain::Add(const Slice &user_key, const Slice &item, int *r
WriteBatchLogData log_data(kRedisBloomFilter, {"insert"});
batch->PutLogData(log_data.Encode());

std::string item_string = item.ToString();

// check
bool exist = false;
std::vector<bool> exist(1, false); // TODO: to refine in BF.MADD
for (int i = metadata.n_filters - 1; i >= 0; --i) { // TODO: to test which direction for searching is better
s = bloomCheck(bf_key_list[i], item_string, &exist);
s = bloomCheck(bf_key_list[i], {item}, &exist);
git-hulk marked this conversation as resolved.
Show resolved Hide resolved
if (!s.ok()) return s;
if (exist) {
if (exist[0]) {
*ret = 0;
break;
}
}

// insert
if (!exist) {
if (!exist[0]) { // TODO: to refine in BF.MADD
std::string bf_data;
s = storage_->Get(rocksdb::ReadOptions(), bf_key_list.back(), &bf_data);
if (!s.ok()) return s;
Expand All @@ -172,7 +176,7 @@ rocksdb::Status BloomChain::Add(const Slice &user_key, const Slice &item, int *r
return rocksdb::Status::Aborted("filter is full and is nonscaling");
}
}
bloomAdd(item_string, &bf_data);
bloomAdd(item, &bf_data);
batch->Put(bf_key_list.back(), bf_data);
*ret = 1;
metadata.size += 1;
Expand All @@ -185,31 +189,37 @@ rocksdb::Status BloomChain::Add(const Slice &user_key, const Slice &item, int *r
return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}

rocksdb::Status BloomChain::Exist(const Slice &user_key, const Slice &item, int *ret) {
rocksdb::Status BloomChain::Exists(const Slice &user_key, const Slice &item, int *ret) {
std::vector<int> tmp(1, 0);
zncleon marked this conversation as resolved.
Show resolved Hide resolved
rocksdb::Status s = MExists(user_key, {item}, &tmp);
*ret = tmp[0];
return s;
}

rocksdb::Status BloomChain::MExists(const Slice &user_key, const std::vector<Slice> &items, std::vector<int> *rets) {
std::string ns_key = AppendNamespacePrefix(user_key);

BloomChainMetadata metadata;
rocksdb::Status s = getBloomChainMetadata(ns_key, &metadata);
if (s.IsNotFound()) {
*ret = 0;
std::fill(rets->begin(), rets->end(), 0);
return rocksdb::Status::OK();
}
if (!s.ok()) return s;

std::vector<std::string> bf_key_list;
getBFKeyList(ns_key, metadata, &bf_key_list);

std::string item_string = item.ToString();
// check
bool exist = false;
std::vector<bool> exists(items.size(), false);
for (int i = metadata.n_filters - 1; i >= 0; --i) { // TODO: to test which direction for searching is better
s = bloomCheck(bf_key_list[i], item_string, &exist);
s = bloomCheck(bf_key_list[i], items, &exists);
if (!s.ok()) return s;
if (exist) {
break;
}
}
*ret = exist ? 1 : 0;

for (size_t i = 0; i < items.size(); ++i) {
rets->at(i) = exists[i] ? 1 : 0;
zncleon marked this conversation as resolved.
Show resolved Hide resolved
}

return rocksdb::Status::OK();
}
Expand Down
13 changes: 10 additions & 3 deletions src/types/redis_bloom_chain.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ class BloomChain : public Database {
BloomChain(engine::Storage *storage, const std::string &ns) : Database(storage, ns) {}
rocksdb::Status Reserve(const Slice &user_key, uint32_t capacity, double error_rate, uint16_t expansion);
rocksdb::Status Add(const Slice &user_key, const Slice &item, int *ret);
rocksdb::Status Exist(const Slice &user_key, const Slice &item, int *ret);
rocksdb::Status Exists(const Slice &user_key, const Slice &item, int *ret);
rocksdb::Status MExists(const Slice &user_key, const std::vector<Slice> &items, std::vector<int> *rets);
rocksdb::Status Info(const Slice &user_key, BloomFilterInfo *info);

private:
Expand All @@ -63,7 +64,13 @@ class BloomChain : public Database {
BloomChainMetadata *metadata);
void createBloomFilterInBatch(const Slice &ns_key, BloomChainMetadata *metadata,
ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch, std::string *bf_data);
static void bloomAdd(const std::string &item, std::string *bf_data);
rocksdb::Status bloomCheck(const Slice &bf_key, const std::string &item, bool *exist);

/// bf_data: [in/out] The content string of bloomfilter.
static void bloomAdd(const Slice &item, std::string *bf_data);

/// exists: [in/out] The items exist in bloomfilter already or not.
zncleon marked this conversation as resolved.
Show resolved Hide resolved
/// exists[i] is true means items[i] exists in other bloomfilter already, and it's not necessary to check in this
/// bloomfilter.
rocksdb::Status bloomCheck(const Slice &bf_key, const std::vector<Slice> &items, std::vector<bool> *exists);
};
} // namespace redis
6 changes: 3 additions & 3 deletions tests/cppunit/types/bloom_chain_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ TEST_F(RedisBloomChainTest, Reserve) {
TEST_F(RedisBloomChainTest, BasicAddAndTest) {
int ret = 0;

auto s = sb_chain_->Exist("no_exist_key", "test_item", &ret);
auto s = sb_chain_->Exists("no_exist_key", "test_item", &ret);
EXPECT_EQ(ret, 0);
s = sb_chain_->Del("no_exist_key");

Expand All @@ -67,14 +67,14 @@ TEST_F(RedisBloomChainTest, BasicAddAndTest) {
}

for (const auto& insert_item : insert_items) {
s = sb_chain_->Exist(key_, insert_item, &ret);
s = sb_chain_->Exists(key_, insert_item, &ret);
EXPECT_TRUE(s.ok());
EXPECT_EQ(ret, 1);
}

std::string no_insert_items[] = {"item303", "item404", "1", "2", "3"};
for (const auto& no_insert_item : no_insert_items) {
s = sb_chain_->Exist(key_, no_insert_item, &ret);
s = sb_chain_->Exists(key_, no_insert_item, &ret);
EXPECT_TRUE(s.ok());
EXPECT_EQ(ret, 0);
}
Expand Down
32 changes: 31 additions & 1 deletion tests/gocase/unit/type/bloom/bloom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,21 @@ func TestBloom(t *testing.T) {
require.LessOrEqual(t, float64(falseExist), fpp*float64(totalCount))
})

t.Run("MExists Basic Test", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, key).Err())
require.Equal(t, []interface{}{int64(0), int64(0), int64(0)}, rdb.Do(ctx, "bf.mexists", key, "xxx", "yyy", "zzz").Val())

require.Equal(t, int64(1), rdb.Do(ctx, "bf.add", key, "xxx").Val())
require.Equal(t, int64(1), rdb.Do(ctx, "bf.card", key).Val())
require.Equal(t, []interface{}{int64(1), int64(0)}, rdb.Do(ctx, "bf.mexists", key, "xxx", "yyy").Val())

require.Equal(t, int64(1), rdb.Do(ctx, "bf.add", key, "zzz").Val())
require.Equal(t, []interface{}{int64(1), int64(0), int64(1)}, rdb.Do(ctx, "bf.mexists", key, "xxx", "yyy", "zzz").Val())

require.Equal(t, int64(1), rdb.Do(ctx, "bf.add", key, "yyy").Val())
require.Equal(t, []interface{}{int64(1), int64(1), int64(1)}, rdb.Do(ctx, "bf.mexists", key, "xxx", "yyy", "zzz").Val())
})

t.Run("Get info of no exists key ", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, "no_exist_key").Err())
require.ErrorContains(t, rdb.Do(ctx, "bf.info", "no_exist_key").Err(), "ERR key is not found")
Expand Down Expand Up @@ -253,6 +268,21 @@ func TestBloom(t *testing.T) {
require.Equal(t, "MBbloom--", rdb.Type(ctx, key).Val())
})

// TODO: Add the testcase of get filters of bloom filter after complete the scaling.
t.Run("Get Card of bloom filter", func(t *testing.T) {
// if bf.card no exist key, it would return 0
require.NoError(t, rdb.Del(ctx, "no_exist_key").Err())
require.Equal(t, int64(0), rdb.Do(ctx, "bf.card", "no_exist_key").Val())

require.NoError(t, rdb.Del(ctx, key).Err())
require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.02", "1000", "expansion", "1").Err())
require.Equal(t, int64(0), rdb.Do(ctx, "bf.card", key).Val())
require.NoError(t, rdb.Do(ctx, "bf.add", key, "item1").Err())
require.Equal(t, int64(1), rdb.Do(ctx, "bf.card", key).Val())
// insert the duplicate key, insert would return 0 and the card of bloom filter would not change
require.Equal(t, int64(0), rdb.Do(ctx, "bf.add", key, "item1").Val())
require.Equal(t, int64(1), rdb.Do(ctx, "bf.card", key).Val())
require.NoError(t, rdb.Do(ctx, "bf.add", key, "item2").Err())
require.Equal(t, int64(2), rdb.Do(ctx, "bf.card", key).Val())
zncleon marked this conversation as resolved.
Show resolved Hide resolved
})

}