Skip to content

Commit

Permalink
Add the support of the BF.MADD command (#1758)
Browse files Browse the repository at this point in the history
Co-authored-by: hulk <hulk.website@gmail.com>
Co-authored-by: mwish <maplewish117@gmail.com>
  • Loading branch information
3 people authored Sep 19, 2023
1 parent 84a6289 commit 3092081
Show file tree
Hide file tree
Showing 6 changed files with 274 additions and 110 deletions.
67 changes: 59 additions & 8 deletions src/commands/cmd_bloom_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,24 +91,71 @@ class CommandBFAdd : public Commander {
public:
Status Execute(Server *svr, Connection *conn, std::string *output) override {
redis::BloomChain bloom_db(svr->storage, conn->GetNamespace());
int ret = 0;
BloomFilterAddResult ret = BloomFilterAddResult::kOk;
auto s = bloom_db.Add(args_[1], args_[2], &ret);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::Integer(ret);
switch (ret) {
case BloomFilterAddResult::kOk:
*output = redis::Integer(1);
break;
case BloomFilterAddResult::kExist:
*output = redis::Integer(0);
break;
case BloomFilterAddResult::kFull:
*output = redis::Error("ERR nonscaling filter is full");
break;
}
return Status::OK();
}
};

class CommandBFMAdd : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
items_.reserve(args_.size() - 2);
for (size_t i = 2; i < args_.size(); ++i) {
items_.emplace_back(args_[i]);
}
return Commander::Parse(args);
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
redis::BloomChain bloom_db(svr->storage, conn->GetNamespace());
std::vector<BloomFilterAddResult> rets(items_.size(), BloomFilterAddResult::kOk);
auto s = bloom_db.MAdd(args_[1], items_, &rets);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::MultiLen(items_.size());
for (size_t i = 0; i < items_.size(); ++i) {
switch (rets[i]) {
case BloomFilterAddResult::kOk:
*output += redis::Integer(1);
break;
case BloomFilterAddResult::kExist:
*output += redis::Integer(0);
break;
case BloomFilterAddResult::kFull:
*output += redis::Error("ERR nonscaling filter is full");
break;
}
}
return Status::OK();
}

private:
std::vector<Slice> items_;
};

class CommandBFExists : public Commander {
public:
Status Execute(Server *svr, Connection *conn, std::string *output) override {
redis::BloomChain bloom_db(svr->storage, conn->GetNamespace());
int ret = 0;
auto s = bloom_db.Exists(args_[1], args_[2], &ret);
bool exist = false;
auto s = bloom_db.Exists(args_[1], args_[2], &exist);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::Integer(ret);
*output = redis::Integer(exist ? 1 : 0);
return Status::OK();
}
};
Expand All @@ -125,11 +172,14 @@ class CommandBFMExists : public Commander {

Status Execute(Server *svr, Connection *conn, std::string *output) override {
redis::BloomChain bloom_db(svr->storage, conn->GetNamespace());
std::vector<int> rets(items_.size(), 0);
auto s = bloom_db.MExists(args_[1], items_, &rets);
std::vector<bool> exists(items_.size(), false);
auto s = bloom_db.MExists(args_[1], items_, &exists);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::MultiInteger(rets);
*output = redis::MultiLen(items_.size());
for (size_t i = 0; i < items_.size(); ++i) {
*output += Integer(exists[i] ? 1 : 0);
}
return Status::OK();
}

Expand Down Expand Up @@ -226,6 +276,7 @@ class CommandBFCard : public Commander {

REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandBFReserve>("bf.reserve", -4, "write", 1, 1, 1),
MakeCmdAttr<CommandBFAdd>("bf.add", 3, "write", 1, 1, 1),
MakeCmdAttr<CommandBFMAdd>("bf.madd", -3, "write", 1, 1, 1),
MakeCmdAttr<CommandBFExists>("bf.exists", 3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandBFMExists>("bf.mexists", -3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandBFInfo>("bf.info", -2, "read-only", 1, 1, 1),
Expand Down
9 changes: 0 additions & 9 deletions src/server/redis_reply.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,6 @@ std::string Integer(T data) {
return ":" + std::to_string(data) + CRLF;
}

template <typename T, std::enable_if_t<std::is_integral_v<T>, int> = 0>
std::string MultiInteger(const std::vector<T> &multi_data) {
std::string result = "*" + std::to_string(multi_data.size()) + CRLF;
for (const auto &data : multi_data) {
result += Integer(data);
}
return result;
}

std::string BulkString(const std::string &data);
std::string NilString();

Expand Down
165 changes: 96 additions & 69 deletions src/types/redis_bloom_chain.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@

namespace redis {

rocksdb::Status BloomChain::getBloomChainMetadata(const Slice &ns_key, BloomChainMetadata *metadata) {
return Database::GetMetadata(kRedisBloomFilter, ns_key, metadata);
}

std::string BloomChain::getBFKey(const Slice &ns_key, const BloomChainMetadata &metadata, uint16_t filters_index) {
std::string sub_key;
PutFixed16(&sub_key, filters_index);
Expand All @@ -40,8 +44,27 @@ void BloomChain::getBFKeyList(const Slice &ns_key, const BloomChainMetadata &met
}
}

rocksdb::Status BloomChain::getBloomChainMetadata(const Slice &ns_key, BloomChainMetadata *metadata) {
return Database::GetMetadata(kRedisBloomFilter, ns_key, metadata);
rocksdb::Status BloomChain::getBFDataList(const std::vector<std::string> &bf_key_list,
std::vector<std::string> *bf_data_list) {
LatestSnapShot ss(storage_);
rocksdb::ReadOptions read_options;
read_options.snapshot = ss.GetSnapShot();

bf_data_list->reserve(bf_key_list.size());
for (const auto &bf_key : bf_key_list) {
std::string bf_data;
rocksdb::Status s = storage_->Get(read_options, bf_key, &bf_data);
if (!s.ok()) return s;
bf_data_list->push_back(std::move(bf_data));
}
return rocksdb::Status::OK();
}

void BloomChain::getItemHashList(const std::vector<Slice> &items, std::vector<uint64_t> *item_hash_list) {
item_hash_list->reserve(items.size());
for (const auto &item : items) {
item_hash_list->push_back(BlockSplitBloomFilter::Hash(item.data(), item.size()));
}
}

rocksdb::Status BloomChain::createBloomChain(const Slice &ns_key, double error_rate, uint32_t capacity,
Expand Down Expand Up @@ -85,32 +108,14 @@ void BloomChain::createBloomFilterInBatch(const Slice &ns_key, BloomChainMetadat
batch->Put(metadata_cf_handle_, ns_key, bloom_chain_meta_bytes);
}

void BloomChain::bloomAdd(const Slice &item, std::string *bf_data) {
BlockSplitBloomFilter block_split_bloom_filter(*bf_data);

uint64_t h = BlockSplitBloomFilter::Hash(item.data(), item.size());
block_split_bloom_filter.InsertHash(h);
}

rocksdb::Status BloomChain::bloomCheck(const Slice &bf_key, const std::vector<Slice> &items,
std::vector<bool> *exists) {
LatestSnapShot ss(storage_);
rocksdb::ReadOptions read_options;
read_options.snapshot = ss.GetSnapShot();
std::string bf_data;
rocksdb::Status s = storage_->Get(read_options, bf_key, &bf_data);
if (!s.ok()) return s;
void BloomChain::bloomAdd(uint64_t item_hash, std::string &bf_data) {
BlockSplitBloomFilter block_split_bloom_filter(bf_data);
block_split_bloom_filter.InsertHash(item_hash);
}

for (size_t i = 0; i < items.size(); ++i) {
// this item exists in other bloomfilter already, and it's not necessary to check in this bloomfilter.
if ((*exists)[i]) {
continue;
}
uint64_t h = BlockSplitBloomFilter::Hash(items[i].data(), items[i].size());
(*exists)[i] = block_split_bloom_filter.FindHash(h);
}
return rocksdb::Status::OK();
bool BloomChain::bloomCheck(uint64_t item_hash, std::string &bf_data) {
const BlockSplitBloomFilter block_split_bloom_filter(bf_data);
return block_split_bloom_filter.FindHash(item_hash);
}

rocksdb::Status BloomChain::Reserve(const Slice &user_key, uint32_t capacity, double error_rate, uint16_t expansion) {
Expand All @@ -127,7 +132,15 @@ rocksdb::Status BloomChain::Reserve(const Slice &user_key, uint32_t capacity, do
return createBloomChain(ns_key, error_rate, capacity, expansion, &bloom_chain_metadata);
}

rocksdb::Status BloomChain::Add(const Slice &user_key, const Slice &item, int *ret) {
rocksdb::Status BloomChain::Add(const Slice &user_key, const Slice &item, BloomFilterAddResult *ret) {
std::vector<BloomFilterAddResult> tmp{BloomFilterAddResult::kOk};
rocksdb::Status s = MAdd(user_key, {item}, &tmp);
*ret = tmp[0];
return s;
}

rocksdb::Status BloomChain::MAdd(const Slice &user_key, const std::vector<Slice> &items,
std::vector<BloomFilterAddResult> *rets) {
std::string ns_key = AppendNamespacePrefix(user_key);
LockGuard guard(storage_->GetLockManager(), ns_key);

Expand All @@ -142,79 +155,93 @@ rocksdb::Status BloomChain::Add(const Slice &user_key, const Slice &item, int *r
std::vector<std::string> bf_key_list;
getBFKeyList(ns_key, metadata, &bf_key_list);

std::vector<std::string> bf_data_list;
s = getBFDataList(bf_key_list, &bf_data_list);
if (!s.ok()) return s;

std::vector<uint64_t> item_hash_list;
getItemHashList(items, &item_hash_list);

uint64_t origin_size = metadata.size;
auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisBloomFilter, {"insert"});
batch->PutLogData(log_data.Encode());

// check
std::vector<bool> exist{false}; // TODO: to refine in BF.MADD
for (int i = metadata.n_filters - 1; i >= 0; --i) { // TODO: to test which direction for searching is better
s = bloomCheck(bf_key_list[i], {item}, &exist);
if (!s.ok()) return s;
if (exist[0]) {
*ret = 0;
break;
for (size_t i = 0; i < items.size(); ++i) {
// check
bool exist = false;
// TODO: to test which direction for searching is better
for (int ii = static_cast<int>(bf_data_list.size()) - 1; ii >= 0; --ii) {
exist = bloomCheck(item_hash_list[i], bf_data_list[ii]);
if (exist) break;
}
}

// insert
if (!exist[0]) { // TODO: to refine in BF.MADD
std::string bf_data;
s = storage_->Get(rocksdb::ReadOptions(), bf_key_list.back(), &bf_data);
if (!s.ok()) return s;

if (metadata.size + 1 > metadata.GetCapacity()) {
if (metadata.IsScaling()) {
batch->Put(bf_key_list.back(), bf_data);
createBloomFilterInBatch(ns_key, &metadata, batch, &bf_data);
bf_key_list.push_back(getBFKey(ns_key, metadata, metadata.n_filters - 1));
} else {
return rocksdb::Status::Aborted("filter is full and is nonscaling");
// insert
if (exist) {
(*rets)[i] = BloomFilterAddResult::kExist;
} else {
if (metadata.size + 1 > metadata.GetCapacity()) {
if (metadata.IsScaling()) {
batch->Put(bf_key_list.back(), bf_data_list.back());
std::string bf_data;
createBloomFilterInBatch(ns_key, &metadata, batch, &bf_data);
bf_data_list.push_back(std::move(bf_data));
bf_key_list.push_back(getBFKey(ns_key, metadata, metadata.n_filters - 1));
} else {
(*rets)[i] = BloomFilterAddResult::kFull;
continue;
}
}
bloomAdd(item_hash_list[i], bf_data_list.back());
(*rets)[i] = BloomFilterAddResult::kOk;
metadata.size += 1;
}
bloomAdd(item, &bf_data);
batch->Put(bf_key_list.back(), bf_data);
*ret = 1;
metadata.size += 1;
}

std::string bloom_chain_metadata_bytes;
metadata.Encode(&bloom_chain_metadata_bytes);
batch->Put(metadata_cf_handle_, ns_key, bloom_chain_metadata_bytes);

if (metadata.size != origin_size) {
std::string bloom_chain_metadata_bytes;
metadata.Encode(&bloom_chain_metadata_bytes);
batch->Put(metadata_cf_handle_, ns_key, bloom_chain_metadata_bytes);
batch->Put(bf_key_list.back(), bf_data_list.back());
}
return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}

rocksdb::Status BloomChain::Exists(const Slice &user_key, const Slice &item, int *ret) {
std::vector<int> tmp{0};
rocksdb::Status BloomChain::Exists(const Slice &user_key, const Slice &item, bool *exist) {
std::vector<bool> tmp{false};
rocksdb::Status s = MExists(user_key, {item}, &tmp);
*ret = tmp[0];
*exist = tmp[0];
return s;
}

rocksdb::Status BloomChain::MExists(const Slice &user_key, const std::vector<Slice> &items, std::vector<int> *rets) {
rocksdb::Status BloomChain::MExists(const Slice &user_key, const std::vector<Slice> &items, std::vector<bool> *exists) {
std::string ns_key = AppendNamespacePrefix(user_key);

BloomChainMetadata metadata;
rocksdb::Status s = getBloomChainMetadata(ns_key, &metadata);
if (s.IsNotFound()) {
std::fill(rets->begin(), rets->end(), 0);
std::fill(exists->begin(), exists->end(), false);
return rocksdb::Status::OK();
}
if (!s.ok()) return s;

std::vector<std::string> bf_key_list;
getBFKeyList(ns_key, metadata, &bf_key_list);

// check
std::vector<bool> exists(items.size(), false);
for (int i = metadata.n_filters - 1; i >= 0; --i) { // TODO: to test which direction for searching is better
s = bloomCheck(bf_key_list[i], items, &exists);
if (!s.ok()) return s;
}
std::vector<std::string> bf_data_list;
s = getBFDataList(bf_key_list, &bf_data_list);
if (!s.ok()) return s;

std::vector<uint64_t> item_hash_list;
getItemHashList(items, &item_hash_list);

for (size_t i = 0; i < items.size(); ++i) {
(*rets)[i] = exists[i] ? 1 : 0;
// check
// TODO: to test which direction for searching is better
for (int ii = static_cast<int>(bf_data_list.size()) - 1; ii >= 0; --ii) {
(*exists)[i] = bloomCheck(item_hash_list[i], bf_data_list[ii]);
if ((*exists)[i]) break;
}
}

return rocksdb::Status::OK();
Expand Down
Loading

0 comments on commit 3092081

Please sign in to comment.