Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the support of the BF.MADD command #1758

Merged
merged 18 commits into from
Sep 19, 2023
Merged
67 changes: 59 additions & 8 deletions src/commands/cmd_bloom_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,24 +91,71 @@ class CommandBFAdd : public Commander {
public:
Status Execute(Server *svr, Connection *conn, std::string *output) override {
redis::BloomChain bloom_db(svr->storage, conn->GetNamespace());
int ret = 0;
BloomFilterAddResult ret = BloomFilterAddResult::kOk;
auto s = bloom_db.Add(args_[1], args_[2], &ret);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::Integer(ret);
switch (ret) {
case BloomFilterAddResult::kOk:
*output = redis::Integer(1);
break;
case BloomFilterAddResult::kExist:
*output = redis::Integer(0);
break;
case BloomFilterAddResult::kFull:
*output = redis::Error("ERR nonscaling filter is full");
zncleon marked this conversation as resolved.
Show resolved Hide resolved
break;
}
return Status::OK();
}
};

class CommandBFMAdd : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
items_.reserve(args_.size() - 2);
for (size_t i = 2; i < args_.size(); ++i) {
items_.emplace_back(args_[i]);
}
return Commander::Parse(args);
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
redis::BloomChain bloom_db(svr->storage, conn->GetNamespace());
std::vector<BloomFilterAddResult> rets(items_.size(), BloomFilterAddResult::kOk);
auto s = bloom_db.MAdd(args_[1], items_, &rets);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::MultiLen(items_.size());
for (size_t i = 0; i < items_.size(); ++i) {
switch (rets[i]) {
case BloomFilterAddResult::kOk:
*output += redis::Integer(1);
break;
case BloomFilterAddResult::kExist:
*output += redis::Integer(0);
break;
case BloomFilterAddResult::kFull:
*output += redis::Error("ERR nonscaling filter is full");
break;
}
}
return Status::OK();
}

private:
std::vector<Slice> items_;
};

class CommandBFExists : public Commander {
public:
Status Execute(Server *svr, Connection *conn, std::string *output) override {
redis::BloomChain bloom_db(svr->storage, conn->GetNamespace());
int ret = 0;
auto s = bloom_db.Exists(args_[1], args_[2], &ret);
bool exist = false;
auto s = bloom_db.Exists(args_[1], args_[2], &exist);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::Integer(ret);
*output = redis::Integer(exist ? 1 : 0);
return Status::OK();
}
};
Expand All @@ -125,11 +172,14 @@ class CommandBFMExists : public Commander {

Status Execute(Server *svr, Connection *conn, std::string *output) override {
redis::BloomChain bloom_db(svr->storage, conn->GetNamespace());
std::vector<int> rets(items_.size(), 0);
auto s = bloom_db.MExists(args_[1], items_, &rets);
std::vector<bool> exists(items_.size(), false);
auto s = bloom_db.MExists(args_[1], items_, &exists);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::MultiInteger(rets);
*output = redis::MultiLen(items_.size());
for (size_t i = 0; i < items_.size(); ++i) {
*output += Integer(exists[i] ? 1 : 0);
}
return Status::OK();
}

Expand Down Expand Up @@ -226,6 +276,7 @@ class CommandBFCard : public Commander {

REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandBFReserve>("bf.reserve", -4, "write", 1, 1, 1),
MakeCmdAttr<CommandBFAdd>("bf.add", 3, "write", 1, 1, 1),
MakeCmdAttr<CommandBFMAdd>("bf.madd", -3, "write", 1, 1, 1),
MakeCmdAttr<CommandBFExists>("bf.exists", 3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandBFMExists>("bf.mexists", -3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandBFInfo>("bf.info", -2, "read-only", 1, 1, 1),
Expand Down
9 changes: 0 additions & 9 deletions src/server/redis_reply.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,6 @@ std::string Integer(T data) {
return ":" + std::to_string(data) + CRLF;
}

template <typename T, std::enable_if_t<std::is_integral_v<T>, int> = 0>
std::string MultiInteger(const std::vector<T> &multi_data) {
std::string result = "*" + std::to_string(multi_data.size()) + CRLF;
for (const auto &data : multi_data) {
result += Integer(data);
}
return result;
}

std::string BulkString(const std::string &data);
std::string NilString();

Expand Down
165 changes: 96 additions & 69 deletions src/types/redis_bloom_chain.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@

namespace redis {

rocksdb::Status BloomChain::getBloomChainMetadata(const Slice &ns_key, BloomChainMetadata *metadata) {
return Database::GetMetadata(kRedisBloomFilter, ns_key, metadata);
}

std::string BloomChain::getBFKey(const Slice &ns_key, const BloomChainMetadata &metadata, uint16_t filters_index) {
std::string sub_key;
PutFixed16(&sub_key, filters_index);
Expand All @@ -40,8 +44,27 @@ void BloomChain::getBFKeyList(const Slice &ns_key, const BloomChainMetadata &met
}
}

rocksdb::Status BloomChain::getBloomChainMetadata(const Slice &ns_key, BloomChainMetadata *metadata) {
return Database::GetMetadata(kRedisBloomFilter, ns_key, metadata);
rocksdb::Status BloomChain::getBFDataList(const std::vector<std::string> &bf_key_list,
std::vector<std::string> *bf_data_list) {
LatestSnapShot ss(storage_);
rocksdb::ReadOptions read_options;
read_options.snapshot = ss.GetSnapShot();

bf_data_list->reserve(bf_key_list.size());
for (const auto &bf_key : bf_key_list) {
std::string bf_data;
rocksdb::Status s = storage_->Get(read_options, bf_key, &bf_data);
if (!s.ok()) return s;
bf_data_list->push_back(std::move(bf_data));
}
return rocksdb::Status::OK();
}

void BloomChain::getItemHashList(const std::vector<Slice> &items, std::vector<uint64_t> *item_hash_list) {
item_hash_list->reserve(items.size());
for (const auto &item : items) {
item_hash_list->push_back(BlockSplitBloomFilter::Hash(item.data(), item.size()));
}
}

rocksdb::Status BloomChain::createBloomChain(const Slice &ns_key, double error_rate, uint32_t capacity,
Expand Down Expand Up @@ -85,32 +108,14 @@ void BloomChain::createBloomFilterInBatch(const Slice &ns_key, BloomChainMetadat
batch->Put(metadata_cf_handle_, ns_key, bloom_chain_meta_bytes);
}

void BloomChain::bloomAdd(const Slice &item, std::string *bf_data) {
BlockSplitBloomFilter block_split_bloom_filter(*bf_data);

uint64_t h = BlockSplitBloomFilter::Hash(item.data(), item.size());
block_split_bloom_filter.InsertHash(h);
}

rocksdb::Status BloomChain::bloomCheck(const Slice &bf_key, const std::vector<Slice> &items,
std::vector<bool> *exists) {
LatestSnapShot ss(storage_);
rocksdb::ReadOptions read_options;
read_options.snapshot = ss.GetSnapShot();
std::string bf_data;
rocksdb::Status s = storage_->Get(read_options, bf_key, &bf_data);
if (!s.ok()) return s;
void BloomChain::bloomAdd(uint64_t item_hash, std::string &bf_data) {
BlockSplitBloomFilter block_split_bloom_filter(bf_data);
block_split_bloom_filter.InsertHash(item_hash);
}

for (size_t i = 0; i < items.size(); ++i) {
// this item exists in other bloomfilter already, and it's not necessary to check in this bloomfilter.
if ((*exists)[i]) {
continue;
}
uint64_t h = BlockSplitBloomFilter::Hash(items[i].data(), items[i].size());
(*exists)[i] = block_split_bloom_filter.FindHash(h);
}
return rocksdb::Status::OK();
bool BloomChain::bloomCheck(uint64_t item_hash, std::string &bf_data) {
const BlockSplitBloomFilter block_split_bloom_filter(bf_data);
return block_split_bloom_filter.FindHash(item_hash);
}

rocksdb::Status BloomChain::Reserve(const Slice &user_key, uint32_t capacity, double error_rate, uint16_t expansion) {
Expand All @@ -127,7 +132,15 @@ rocksdb::Status BloomChain::Reserve(const Slice &user_key, uint32_t capacity, do
return createBloomChain(ns_key, error_rate, capacity, expansion, &bloom_chain_metadata);
}

rocksdb::Status BloomChain::Add(const Slice &user_key, const Slice &item, int *ret) {
rocksdb::Status BloomChain::Add(const Slice &user_key, const Slice &item, BloomFilterAddResult *ret) {
std::vector<BloomFilterAddResult> tmp{BloomFilterAddResult::kOk};
rocksdb::Status s = MAdd(user_key, {item}, &tmp);
*ret = tmp[0];
return s;
}

rocksdb::Status BloomChain::MAdd(const Slice &user_key, const std::vector<Slice> &items,
std::vector<BloomFilterAddResult> *rets) {
std::string ns_key = AppendNamespacePrefix(user_key);
LockGuard guard(storage_->GetLockManager(), ns_key);

Expand All @@ -142,79 +155,93 @@ rocksdb::Status BloomChain::Add(const Slice &user_key, const Slice &item, int *r
std::vector<std::string> bf_key_list;
getBFKeyList(ns_key, metadata, &bf_key_list);

std::vector<std::string> bf_data_list;
s = getBFDataList(bf_key_list, &bf_data_list);
if (!s.ok()) return s;

std::vector<uint64_t> item_hash_list;
getItemHashList(items, &item_hash_list);

uint64_t origin_size = metadata.size;
auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisBloomFilter, {"insert"});
batch->PutLogData(log_data.Encode());

// check
std::vector<bool> exist{false}; // TODO: to refine in BF.MADD
for (int i = metadata.n_filters - 1; i >= 0; --i) { // TODO: to test which direction for searching is better
s = bloomCheck(bf_key_list[i], {item}, &exist);
if (!s.ok()) return s;
if (exist[0]) {
*ret = 0;
break;
for (size_t i = 0; i < items.size(); ++i) {
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
// check
bool exist = false;
// TODO: to test which direction for searching is better
for (int ii = static_cast<int>(bf_data_list.size()) - 1; ii >= 0; --ii) {
exist = bloomCheck(item_hash_list[i], bf_data_list[ii]);
if (exist) break;
}
}

// insert
if (!exist[0]) { // TODO: to refine in BF.MADD
std::string bf_data;
s = storage_->Get(rocksdb::ReadOptions(), bf_key_list.back(), &bf_data);
if (!s.ok()) return s;

if (metadata.size + 1 > metadata.GetCapacity()) {
if (metadata.IsScaling()) {
batch->Put(bf_key_list.back(), bf_data);
createBloomFilterInBatch(ns_key, &metadata, batch, &bf_data);
bf_key_list.push_back(getBFKey(ns_key, metadata, metadata.n_filters - 1));
} else {
return rocksdb::Status::Aborted("filter is full and is nonscaling");
// insert
if (exist) {
(*rets)[i] = BloomFilterAddResult::kExist;
} else {
if (metadata.size + 1 > metadata.GetCapacity()) {
if (metadata.IsScaling()) {
batch->Put(bf_key_list.back(), bf_data_list.back());
std::string bf_data;
createBloomFilterInBatch(ns_key, &metadata, batch, &bf_data);
bf_data_list.push_back(std::move(bf_data));
bf_key_list.push_back(getBFKey(ns_key, metadata, metadata.n_filters - 1));
} else {
(*rets)[i] = BloomFilterAddResult::kFull;
continue;
}
}
bloomAdd(item_hash_list[i], bf_data_list.back());
(*rets)[i] = BloomFilterAddResult::kOk;
metadata.size += 1;
}
bloomAdd(item, &bf_data);
batch->Put(bf_key_list.back(), bf_data);
*ret = 1;
metadata.size += 1;
}

std::string bloom_chain_metadata_bytes;
metadata.Encode(&bloom_chain_metadata_bytes);
batch->Put(metadata_cf_handle_, ns_key, bloom_chain_metadata_bytes);

if (metadata.size != origin_size) {
std::string bloom_chain_metadata_bytes;
metadata.Encode(&bloom_chain_metadata_bytes);
batch->Put(metadata_cf_handle_, ns_key, bloom_chain_metadata_bytes);
batch->Put(bf_key_list.back(), bf_data_list.back());
}
return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}

rocksdb::Status BloomChain::Exists(const Slice &user_key, const Slice &item, int *ret) {
std::vector<int> tmp{0};
rocksdb::Status BloomChain::Exists(const Slice &user_key, const Slice &item, bool *exist) {
std::vector<bool> tmp{false};
rocksdb::Status s = MExists(user_key, {item}, &tmp);
*ret = tmp[0];
*exist = tmp[0];
return s;
}

rocksdb::Status BloomChain::MExists(const Slice &user_key, const std::vector<Slice> &items, std::vector<int> *rets) {
rocksdb::Status BloomChain::MExists(const Slice &user_key, const std::vector<Slice> &items, std::vector<bool> *exists) {
std::string ns_key = AppendNamespacePrefix(user_key);

BloomChainMetadata metadata;
rocksdb::Status s = getBloomChainMetadata(ns_key, &metadata);
if (s.IsNotFound()) {
std::fill(rets->begin(), rets->end(), 0);
std::fill(exists->begin(), exists->end(), false);
return rocksdb::Status::OK();
}
if (!s.ok()) return s;

std::vector<std::string> bf_key_list;
getBFKeyList(ns_key, metadata, &bf_key_list);

// check
std::vector<bool> exists(items.size(), false);
for (int i = metadata.n_filters - 1; i >= 0; --i) { // TODO: to test which direction for searching is better
s = bloomCheck(bf_key_list[i], items, &exists);
if (!s.ok()) return s;
}
std::vector<std::string> bf_data_list;
s = getBFDataList(bf_key_list, &bf_data_list);
if (!s.ok()) return s;

std::vector<uint64_t> item_hash_list;
getItemHashList(items, &item_hash_list);

for (size_t i = 0; i < items.size(); ++i) {
(*rets)[i] = exists[i] ? 1 : 0;
// check
// TODO: to test which direction for searching is better
for (int ii = static_cast<int>(bf_data_list.size()) - 1; ii >= 0; --ii) {
(*exists)[i] = bloomCheck(item_hash_list[i], bf_data_list[ii]);
if ((*exists)[i]) break;
}
}

return rocksdb::Status::OK();
Expand Down
Loading