Add support for the BF.MADD command #1758

Merged · 18 commits · Sep 19, 2023
38 changes: 37 additions & 1 deletion src/commands/cmd_bloom_filter.cc
@@ -95,11 +95,46 @@ class CommandBFAdd : public Commander {
auto s = bloom_db.Add(args_[1], args_[2], &ret);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::Integer(ret);
if (ret == -1) {
*output = redis::Error("ERR nonscaling filter is full");
} else {
*output = redis::Integer(ret);
}
return Status::OK();
}
};

class CommandBFMAdd : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
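// args layout: BF.MADD key item [item ...]; the items to insert start at index 2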
items_.reserve(args_.size() - 2);
for (size_t i = 2; i < args_.size(); ++i) {
items_.emplace_back(args_[i]);
}
return Commander::Parse(args);
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
redis::BloomChain bloom_db(svr->storage, conn->GetNamespace());
std::vector<int> rets(items_.size(), 0);
auto s = bloom_db.MAdd(args_[1], items_, &rets);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::MultiLen(items_.size());
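// One reply element per item: an integer (1 newly added, 0 already present) or an error when a nonscaling filter is full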
for (size_t i = 0; i < items_.size(); ++i) {
if (rets[i] == -1) {
*output += redis::Error("ERR nonscaling filter is full");
} else {
*output += redis::Integer(rets[i]);
}
}
return Status::OK();
}

private:
std::vector<Slice> items_;
};

class CommandBFExists : public Commander {
public:
Status Execute(Server *svr, Connection *conn, std::string *output) override {
@@ -226,6 +261,7 @@ class CommandBFCard : public Commander {

REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandBFReserve>("bf.reserve", -4, "write", 1, 1, 1),
MakeCmdAttr<CommandBFAdd>("bf.add", 3, "write", 1, 1, 1),
MakeCmdAttr<CommandBFMAdd>("bf.madd", -3, "write", 1, 1, 1),
MakeCmdAttr<CommandBFExists>("bf.exists", 3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandBFMExists>("bf.mexists", -3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandBFInfo>("bf.info", -2, "read-only", 1, 1, 1),
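Taken together, the handlers above change what clients see: BF.ADD now reports a full nonscaling filter as an error reply instead of a generic execution error, and the new BF.MADD returns an array with one integer or error per input item. Below is a minimal client-side sketch using the go-redis client that the test suite already depends on; the server address, key name, and items are illustrative assumptions, not part of this change.

```go
package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	// The address is an assumption for this example; point it at a running kvrocks instance.
	rdb := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6666"})

	// BF.MADD adds several items in one call; the reply has one element per item:
	// 1 (newly added), 0 (already present), or an error if a nonscaling filter is full.
	reply, err := rdb.Do(ctx, "bf.madd", "bf:example", "a", "b", "a").Result()
	if err != nil {
		panic(err)
	}
	for i, v := range reply.([]interface{}) {
		switch val := v.(type) {
		case int64:
			fmt.Printf("item %d -> %d\n", i, val)
		case error:
			fmt.Printf("item %d -> %v\n", i, val) // e.g. "ERR nonscaling filter is full"
		}
	}

	// BF.ADD on a full nonscaling filter now fails with the same error message.
	if err := rdb.Do(ctx, "bf.add", "bf:example", "c").Err(); err != nil {
		fmt.Println("bf.add:", err)
	}
}
```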
120 changes: 68 additions & 52 deletions src/types/redis_bloom_chain.cc
@@ -86,6 +86,22 @@ void BloomChain::createBloomFilterInBatch(const Slice &ns_key, BloomChainMetadata *metadata,
batch->Put(metadata_cf_handle_, ns_key, bloom_chain_meta_bytes);
}

rocksdb::Status BloomChain::getBFDataList(const std::vector<std::string> &bf_key_list,
std::vector<std::string> *bf_data_list) {
LatestSnapShot ss(storage_);
rocksdb::ReadOptions read_options;
read_options.snapshot = ss.GetSnapShot();

bf_data_list->reserve(bf_key_list.size());
for (const auto &bf_key : bf_key_list) {
std::string bf_data;
rocksdb::Status s = storage_->Get(read_options, bf_key, &bf_data);
if (!s.ok()) return s;
bf_data_list->push_back(std::move(bf_data));
}
return rocksdb::Status::OK();
}

void BloomChain::bloomAdd(const Slice &item, std::string *bf_data) {
BlockSplitBloomFilter block_split_bloom_filter;
block_split_bloom_filter.Init(std::move(*bf_data));
@@ -95,26 +111,12 @@ void BloomChain::bloomAdd(const Slice &item, std::string *bf_data) {
*bf_data = std::move(block_split_bloom_filter).GetData();
}

rocksdb::Status BloomChain::bloomCheck(const Slice &bf_key, const std::vector<Slice> &items,
std::vector<bool> *exists) {
LatestSnapShot ss(storage_);
rocksdb::ReadOptions read_options;
read_options.snapshot = ss.GetSnapShot();
std::string bf_data;
rocksdb::Status s = storage_->Get(read_options, bf_key, &bf_data);
if (!s.ok()) return s;
bool BloomChain::bloomCheck(const Slice &item, std::string &bf_data) {
BlockSplitBloomFilter block_split_bloom_filter;
block_split_bloom_filter.Init(std::move(bf_data));
block_split_bloom_filter.Init(bf_data);

for (size_t i = 0; i < items.size(); ++i) {
// this item exists in other bloomfilter already, and it's not necessary to check in this bloomfilter.
if ((*exists)[i]) {
continue;
}
uint64_t h = BlockSplitBloomFilter::Hash(items[i].data(), items[i].size());
(*exists)[i] = block_split_bloom_filter.FindHash(h);
}
return rocksdb::Status::OK();
uint64_t h = BlockSplitBloomFilter::Hash(item.data(), item.size());
return block_split_bloom_filter.FindHash(h);
}

rocksdb::Status BloomChain::Reserve(const Slice &user_key, uint32_t capacity, double error_rate, uint16_t expansion) {
@@ -132,6 +134,13 @@ rocksdb::Status BloomChain::Reserve(const Slice &user_key, uint32_t capacity, double error_rate, uint16_t expansion) {
}

rocksdb::Status BloomChain::Add(const Slice &user_key, const Slice &item, int *ret) {
std::vector<int> tmp{0};
rocksdb::Status s = MAdd(user_key, {item}, &tmp);
*ret = tmp[0];
return s;
}

rocksdb::Status BloomChain::MAdd(const Slice &user_key, const std::vector<Slice> &items, std::vector<int> *rets) {
std::string ns_key = AppendNamespacePrefix(user_key);
LockGuard guard(storage_->GetLockManager(), ns_key);

@@ -146,46 +155,49 @@ rocksdb::Status BloomChain::Add(const Slice &user_key, const Slice &item, int *ret) {
std::vector<std::string> bf_key_list;
getBFKeyList(ns_key, metadata, &bf_key_list);

std::vector<std::string> bf_data_list;
s = getBFDataList(bf_key_list, &bf_data_list);
if (!s.ok()) return s;

auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisBloomFilter, {"insert"});
batch->PutLogData(log_data.Encode());

// check
std::vector<bool> exist{false}; // TODO: to refine in BF.MADD
for (int i = metadata.n_filters - 1; i >= 0; --i) { // TODO: to test which direction for searching is better
s = bloomCheck(bf_key_list[i], {item}, &exist);
if (!s.ok()) return s;
if (exist[0]) {
*ret = 0;
break;
for (size_t i = 0; i < items.size(); ++i) {
// check
bool exists = false;
// TODO: to test which direction for searching is better
for (int ii = static_cast<int>(bf_data_list.size()) - 1; ii >= 0; --ii) {
exists = bloomCheck(items[i], bf_data_list[ii]);
if (exists) break;
}
}

// insert
if (!exist[0]) { // TODO: to refine in BF.MADD
std::string bf_data;
s = storage_->Get(rocksdb::ReadOptions(), bf_key_list.back(), &bf_data);
if (!s.ok()) return s;

if (metadata.size + 1 > metadata.GetCapacity()) {
if (metadata.IsScaling()) {
batch->Put(bf_key_list.back(), bf_data);
createBloomFilterInBatch(ns_key, &metadata, batch, &bf_data);
bf_key_list.push_back(getBFKey(ns_key, metadata, metadata.n_filters - 1));
} else {
return rocksdb::Status::Aborted("filter is full and is nonscaling");
// insert
if (exists) {
(*rets)[i] = 0;
} else {
if (metadata.size + 1 > metadata.GetCapacity()) {
if (metadata.IsScaling()) {
batch->Put(bf_key_list.back(), bf_data_list.back());
std::string bf_data;
createBloomFilterInBatch(ns_key, &metadata, batch, &bf_data);
bf_data_list.push_back(std::move(bf_data));
bf_key_list.push_back(getBFKey(ns_key, metadata, metadata.n_filters - 1));
} else {
(*rets)[i] = -1; // -1 means nonscaling filter is full
continue;
}
}
bloomAdd(items[i], &bf_data_list.back());
(*rets)[i] = 1;
metadata.size += 1;
}
bloomAdd(item, &bf_data);
batch->Put(bf_key_list.back(), bf_data);
*ret = 1;
metadata.size += 1;
}

std::string bloom_chain_metadata_bytes;
metadata.Encode(&bloom_chain_metadata_bytes);
batch->Put(metadata_cf_handle_, ns_key, bloom_chain_metadata_bytes);

batch->Put(bf_key_list.back(), bf_data_list.back());
Review comment (maintainer): we could keep a set of "modified bf indices" instead of doing this. Once scaling is triggered, the logic here gets tricky because only the final entry of bf_data_list is written back; and even when no item was actually added, a write batch is still issued.

return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}

@@ -210,15 +222,19 @@ rocksdb::Status BloomChain::MExists(const Slice &user_key, const std::vector<Slice> &items, std::vector<int> *rets) {
std::vector<std::string> bf_key_list;
getBFKeyList(ns_key, metadata, &bf_key_list);

// check
std::vector<bool> exists(items.size(), false);
for (int i = metadata.n_filters - 1; i >= 0; --i) { // TODO: to test which direction for searching is better
s = bloomCheck(bf_key_list[i], items, &exists);
if (!s.ok()) return s;
}
std::vector<std::string> bf_data_list;
s = getBFDataList(bf_key_list, &bf_data_list);
if (!s.ok()) return s;

for (size_t i = 0; i < items.size(); ++i) {
(*rets)[i] = exists[i] ? 1 : 0;
// check
bool exists = false;
// TODO: to test which direction for searching is better
for (int ii = static_cast<int>(bf_data_list.size()) - 1; ii >= 0; --ii) {
exists = bloomCheck(items[i], bf_data_list[ii]);
if (exists) break;
}
(*rets)[i] = exists ? 1 : 0;
}

return rocksdb::Status::OK();
8 changes: 4 additions & 4 deletions src/types/redis_bloom_chain.h
@@ -52,6 +52,7 @@ class BloomChain : public Database {
BloomChain(engine::Storage *storage, const std::string &ns) : Database(storage, ns) {}
rocksdb::Status Reserve(const Slice &user_key, uint32_t capacity, double error_rate, uint16_t expansion);
rocksdb::Status Add(const Slice &user_key, const Slice &item, int *ret);
rocksdb::Status MAdd(const Slice &user_key, const std::vector<Slice> &items, std::vector<int> *rets);
rocksdb::Status Exists(const Slice &user_key, const Slice &item, int *ret);
rocksdb::Status MExists(const Slice &user_key, const std::vector<Slice> &items, std::vector<int> *rets);
rocksdb::Status Info(const Slice &user_key, BloomFilterInfo *info);
@@ -65,12 +66,11 @@
void createBloomFilterInBatch(const Slice &ns_key, BloomChainMetadata *metadata,
ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch, std::string *bf_data);

rocksdb::Status getBFDataList(const std::vector<std::string> &bf_key_list, std::vector<std::string> *bf_data_list);

/// bf_data: [in/out] The content string of bloomfilter.
static void bloomAdd(const Slice &item, std::string *bf_data);

/// exists: [in/out] The items exist in bloomfilter already or not.
/// exists[i] is true means items[i] exists in other bloomfilter already, and it's not necessary to check in this
/// bloomfilter.
rocksdb::Status bloomCheck(const Slice &bf_key, const std::vector<Slice> &items, std::vector<bool> *exists);
static bool bloomCheck(const Slice &item, std::string &bf_data);
};
} // namespace redis
79 changes: 78 additions & 1 deletion tests/gocase/unit/type/bloom/bloom_test.go
@@ -25,6 +25,7 @@ import (

"github.com/apache/kvrocks/tests/gocase/util"
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

@@ -145,6 +146,82 @@ func TestBloom(t *testing.T) {
require.LessOrEqual(t, float64(falseExist), fpp*float64(totalCount))
})

t.Run("MAdd Basic Test", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, key).Err())
require.Equal(t, []interface{}{int64(0), int64(0), int64(0)}, rdb.Do(ctx, "bf.mexists", key, "xxx", "yyy", "zzz").Val())

require.Equal(t, []interface{}{int64(1), int64(1)}, rdb.Do(ctx, "bf.madd", key, "xxx", "zzz").Val())
require.Equal(t, int64(2), rdb.Do(ctx, "bf.card", key).Val())
require.Equal(t, []interface{}{int64(1), int64(0), int64(1)}, rdb.Do(ctx, "bf.mexists", key, "xxx", "yyy", "zzz").Val())

// add an existing value
require.Equal(t, []interface{}{int64(0)}, rdb.Do(ctx, "bf.madd", key, "zzz").Val())
require.Equal(t, []interface{}{int64(1), int64(0), int64(1)}, rdb.Do(ctx, "bf.mexists", key, "xxx", "yyy", "zzz").Val())

// add the same value twice in one call
require.Equal(t, []interface{}{int64(1), int64(0)}, rdb.Do(ctx, "bf.madd", key, "yyy", "yyy").Val())
require.Equal(t, []interface{}{int64(1), int64(1), int64(1)}, rdb.Do(ctx, "bf.mexists", key, "xxx", "yyy", "zzz").Val())
})

t.Run("MAdd nonscaling Test", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, key).Err())
require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.0001", "25", "nonscaling").Err())

// insert items, assuming no false positives
require.Equal(t, []interface{}{int64(1), int64(1), int64(1), int64(1)}, rdb.Do(ctx, "bf.madd", key, "x", "y", "z", "k").Val())
for i := 0; i < 10; i++ {
buf := util.RandString(7, 8, util.Alpha)
Add := rdb.Do(ctx, "bf.madd", key, buf, buf+"xx")
require.NoError(t, Add.Err())
}
require.Equal(t, int64(24), rdb.Do(ctx, "bf.card", key).Val())

Add := rdb.Do(ctx, "bf.madd", key, "a", "x", "xxx", "y", "z").Val()
ret := make([]interface{}, 0, 5)
for _, value := range Add.([]interface{}) {
switch v := value.(type) {
case int64:
ret = append(ret, v)
case error:
ret = append(ret, v.Error())
default:
}
}
assert.Equal(t, []interface{}{int64(1), int64(0), "ERR nonscaling filter is full", int64(0), int64(0)}, ret)
require.Equal(t, int64(25), rdb.Do(ctx, "bf.card", key).Val())
})

t.Run("MAdd scaling Test", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, key).Err())
require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.0001", "30", "expansion", "2").Err())

// insert items, assuming no false positives
for i := 0; i < 10; i++ {
buf := util.RandString(7, 8, util.Alpha)
Add := rdb.Do(ctx, "bf.madd", key, buf, buf+"xx", buf+"yy")
require.NoError(t, Add.Err())
}
require.Equal(t, []interface{}{"Capacity", int64(30), "Size", int64(128), "Number of filters", int64(1), "Number of items inserted", int64(30), "Expansion rate", int64(2)}, rdb.Do(ctx, "bf.info", key).Val())

// the bloom filter is full, so this add triggers scaling
require.NoError(t, rdb.Do(ctx, "bf.add", key, "xxx").Err())
require.Equal(t, []interface{}{"Capacity", int64(90), "Size", int64(384), "Number of filters", int64(2), "Number of items inserted", int64(31), "Expansion rate", int64(2)}, rdb.Do(ctx, "bf.info", key).Val())

// insert items, assuming no false positives
for i := 0; i < 59; i++ {
buf := util.RandString(7, 8, util.Alpha)
Add := rdb.Do(ctx, "bf.add", key, buf)
require.NoError(t, Add.Err())
}
require.Equal(t, []interface{}{"Capacity", int64(90), "Size", int64(384), "Number of filters", int64(2), "Number of items inserted", int64(90), "Expansion rate", int64(2)}, rdb.Do(ctx, "bf.info", key).Val())
// adding an existing value does not trigger scaling
require.NoError(t, rdb.Do(ctx, "bf.madd", key, "xxx").Err())
require.Equal(t, []interface{}{"Capacity", int64(90), "Size", int64(384), "Number of filters", int64(2), "Number of items inserted", int64(90), "Expansion rate", int64(2)}, rdb.Do(ctx, "bf.info", key).Val())
// the bloom filter is full again, so this add triggers another scaling
require.NoError(t, rdb.Do(ctx, "bf.add", key, "xxxx").Err())
require.Equal(t, []interface{}{"Capacity", int64(210), "Size", int64(896), "Number of filters", int64(3), "Number of items inserted", int64(91), "Expansion rate", int64(2)}, rdb.Do(ctx, "bf.info", key).Val())
})

t.Run("MExists Basic Test", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, key).Err())
require.Equal(t, []interface{}{int64(0), int64(0), int64(0)}, rdb.Do(ctx, "bf.mexists", key, "xxx", "yyy", "zzz").Val())
@@ -229,7 +306,7 @@ func TestBloom(t *testing.T) {
require.NoError(t, Add.Err())
}
require.Equal(t, int64(50), rdb.Do(ctx, "bf.info", key, "items").Val())
require.ErrorContains(t, rdb.Do(ctx, "bf.add", key, "xxx").Err(), "filter is full and is nonscaling")
require.ErrorContains(t, rdb.Do(ctx, "bf.add", key, "xxx").Err(), "ERR nonscaling filter is full")
require.Equal(t, int64(50), rdb.Do(ctx, "bf.info", key, "items").Val())
})
