Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the support of the BF.MADD command #1758

Merged
merged 18 commits into from
Sep 19, 2023
Merged
55 changes: 47 additions & 8 deletions src/commands/cmd_bloom_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,24 +91,59 @@ class CommandBFAdd : public Commander {
public:
// BF.ADD key item — add a single item to the bloom filter chain.
// Replies with an error when a nonscaling filter is already full,
// otherwise 1 if the item was newly added, 0 if it (probably) existed.
Status Execute(Server *svr, Connection *conn, std::string *output) override {
  redis::BloomChain bloom_db(svr->storage, conn->GetNamespace());
  BloomFilterAddResult ret = BloomFilterAddResult::kOk;
  auto s = bloom_db.Add(args_[1], args_[2], &ret);
  if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

  if (ret == BloomFilterAddResult::kFull) {
    *output = redis::Error("ERR nonscaling filter is full");
  } else {
    // kOk -> newly inserted (1); kExist -> already present (0).
    *output = redis::Integer(ret == BloomFilterAddResult::kOk ? 1 : 0);
  }
  return Status::OK();
}
};

// BF.MADD key item [item ...] — batch variant of BF.ADD.
// Replies with a multi-bulk array: one entry per item, either an error
// ("nonscaling filter is full") or an integer (1 = added, 0 = existed).
class CommandBFMAdd : public Commander {
 public:
  // Collect every item argument after the key (args_[2..]).
  Status Parse(const std::vector<std::string> &args) override {
    items_.reserve(args_.size() - 2);
    for (size_t i = 2; i < args_.size(); ++i) {
      items_.emplace_back(args_[i]);
    }
    return Commander::Parse(args);
  }

  Status Execute(Server *svr, Connection *conn, std::string *output) override {
    redis::BloomChain bloom_db(svr->storage, conn->GetNamespace());
    std::vector<BloomFilterAddResult> rets(items_.size(), BloomFilterAddResult::kOk);
    auto s = bloom_db.MAdd(args_[1], items_, &rets);
    if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

    *output = redis::MultiLen(items_.size());
    for (size_t i = 0; i < items_.size(); ++i) {
      if (rets[i] == BloomFilterAddResult::kFull) {
        // Qualified with redis:: for consistency with the sibling commands.
        *output += redis::Error("ERR nonscaling filter is full");
      } else {
        *output += redis::Integer(rets[i] == BloomFilterAddResult::kOk ? 1 : 0);
      }
    }
    return Status::OK();
  }

 private:
  // Slices reference the argument strings owned by the Commander (args_),
  // which outlive the command execution.
  std::vector<Slice> items_;
};

// BF.EXISTS key item — test whether a single item is (probably) in the filter.
// Replies 1 when the item may exist, 0 when it definitely does not.
class CommandBFExists : public Commander {
 public:
  Status Execute(Server *svr, Connection *conn, std::string *output) override {
    redis::BloomChain bloom_db(svr->storage, conn->GetNamespace());
    bool exist = false;
    auto s = bloom_db.Exists(args_[1], args_[2], &exist);
    if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

    *output = redis::Integer(exist ? 1 : 0);
    return Status::OK();
  }
};
Expand All @@ -125,11 +160,14 @@ class CommandBFMExists : public Commander {

// BF.MEXISTS key item [item ...] — batch existence check.
// Replies with a multi-bulk array of integers, one per item
// (1 = may exist, 0 = definitely absent).
Status Execute(Server *svr, Connection *conn, std::string *output) override {
  redis::BloomChain bloom_db(svr->storage, conn->GetNamespace());
  std::vector<bool> exists(items_.size(), false);
  auto s = bloom_db.MExists(args_[1], items_, &exists);
  if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

  *output = redis::MultiLen(items_.size());
  for (size_t i = 0; i < items_.size(); ++i) {
    // Qualified with redis:: for consistency with the sibling commands.
    *output += redis::Integer(exists[i] ? 1 : 0);
  }
  return Status::OK();
}

Expand Down Expand Up @@ -226,6 +264,7 @@ class CommandBFCard : public Commander {

REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandBFReserve>("bf.reserve", -4, "write", 1, 1, 1),
MakeCmdAttr<CommandBFAdd>("bf.add", 3, "write", 1, 1, 1),
MakeCmdAttr<CommandBFMAdd>("bf.madd", -3, "write", 1, 1, 1),
MakeCmdAttr<CommandBFExists>("bf.exists", 3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandBFMExists>("bf.mexists", -3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandBFInfo>("bf.info", -2, "read-only", 1, 1, 1),
Expand Down
9 changes: 0 additions & 9 deletions src/server/redis_reply.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,6 @@ std::string Integer(T data) {
return ":" + std::to_string(data) + CRLF;
}

/// Build a RESP multi-bulk reply of integers: an array header
/// ("*<count>" + CRLF) followed by one Integer() reply per element.
/// Only participates in overload resolution for integral element types.
template <typename T, std::enable_if_t<std::is_integral_v<T>, int> = 0>
std::string MultiInteger(const std::vector<T> &multi_data) {
  std::string reply = "*" + std::to_string(multi_data.size()) + CRLF;
  for (size_t i = 0; i < multi_data.size(); ++i) {
    reply += Integer(multi_data[i]);
  }
  return reply;
}

std::string BulkString(const std::string &data);
std::string NilString();

Expand Down
6 changes: 6 additions & 0 deletions src/types/bloom_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ bool BlockSplitBloomFilter::Init(std::string bitset) {
return true;
}

std::unique_ptr<const BlockSplitBloomFilter> BlockSplitBloomFilter::CreateNonOwned(const std::string& bitset) {
  // std::make_unique cannot be used here: the bitset-taking constructor is private.
  auto* filter = new BlockSplitBloomFilter(bitset);
  return std::unique_ptr<const BlockSplitBloomFilter>(filter);
}

static constexpr uint32_t kBloomFilterHeaderSizeGuess = 256;

bool BlockSplitBloomFilter::FindHash(uint64_t hash) const {
Expand Down Expand Up @@ -94,3 +98,5 @@ void BlockSplitBloomFilter::InsertHash(uint64_t hash) {
}

// Hash the raw bytes with XXH64 using a fixed zero seed, so the same input
// always yields the same hash (required for persisted bloom filter bitsets).
uint64_t BlockSplitBloomFilter::Hash(const char* data, size_t length) { return XXH64(data, length, /*seed=*/0); }

// Private constructor backing CreateNonOwned: initializes data_ from the
// caller's bitset and records its size. Per CreateNonOwned's contract, the
// caller must keep the bitset alive and unchanged for the filter's lifetime.
BlockSplitBloomFilter::BlockSplitBloomFilter(const std::string& bitset) : data_(bitset), num_bytes_(bitset.size()) {}
10 changes: 10 additions & 0 deletions src/types/bloom_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ class BlockSplitBloomFilter {
/// @return false if the number of bytes of Bloom filter bitset is not a power of 2, and true means successfully init
bool Init(std::string bitset);

/// Create a non-owned BlockSplitBloomFilter that uses the given bitset as its underlying bitset. It is the
/// caller's responsibility to ensure the bitset is not modified (and stays alive) while the filter is in use.
///
/// @param bitset The given bitset for the Bloom filter underlying bitset.
/// @return the unique_ptr of the const non-owned BlockSplitBloomFilter
static std::unique_ptr<const BlockSplitBloomFilter> CreateNonOwned(const std::string& bitset);

/// Minimum Bloom filter size, it sets to 32 bytes to fit a tiny Bloom filter.
static constexpr uint32_t kMinimumBloomFilterBytes = 32;

Expand Down Expand Up @@ -166,6 +173,9 @@ class BlockSplitBloomFilter {
static uint64_t Hash(const char* data, size_t length);

private:
// The private constructor of BlockSplitBloomFilter. It's only used for CreateNonOwned
explicit BlockSplitBloomFilter(const std::string& bitset);

// Bytes in a tiny Bloom filter block.
static constexpr int kBytesPerFilterBlock = 32;

Expand Down
133 changes: 74 additions & 59 deletions src/types/redis_bloom_chain.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,22 @@ void BloomChain::createBloomFilterInBatch(const Slice &ns_key, BloomChainMetadat
batch->Put(metadata_cf_handle_, ns_key, bloom_chain_meta_bytes);
}

/// Load the serialized bitset of every sub-filter in the chain.
/// All reads happen under a single snapshot so the chain is observed
/// consistently; stops and returns the error of the first failed read.
rocksdb::Status BloomChain::getBFDataList(const std::vector<std::string> &bf_key_list,
                                          std::vector<std::string> *bf_data_list) {
  LatestSnapShot ss(storage_);
  rocksdb::ReadOptions read_options;
  read_options.snapshot = ss.GetSnapShot();

  bf_data_list->reserve(bf_key_list.size());
  for (size_t i = 0; i < bf_key_list.size(); ++i) {
    std::string data;
    auto s = storage_->Get(read_options, bf_key_list[i], &data);
    if (!s.ok()) return s;
    bf_data_list->push_back(std::move(data));
  }
  return rocksdb::Status::OK();
}

void BloomChain::bloomAdd(const Slice &item, std::string *bf_data) {
BlockSplitBloomFilter block_split_bloom_filter;
block_split_bloom_filter.Init(std::move(*bf_data));
Expand All @@ -95,26 +111,12 @@ void BloomChain::bloomAdd(const Slice &item, std::string *bf_data) {
*bf_data = std::move(block_split_bloom_filter).GetData();
}

rocksdb::Status BloomChain::bloomCheck(const Slice &bf_key, const std::vector<Slice> &items,
std::vector<bool> *exists) {
LatestSnapShot ss(storage_);
rocksdb::ReadOptions read_options;
read_options.snapshot = ss.GetSnapShot();
std::string bf_data;
rocksdb::Status s = storage_->Get(read_options, bf_key, &bf_data);
if (!s.ok()) return s;
BlockSplitBloomFilter block_split_bloom_filter;
block_split_bloom_filter.Init(std::move(bf_data));
bool BloomChain::bloomCheck(const Slice &item, std::string &bf_data) {
std::unique_ptr<const BlockSplitBloomFilter> block_split_bloom_filter_non_owned =
BlockSplitBloomFilter::CreateNonOwned(bf_data);

for (size_t i = 0; i < items.size(); ++i) {
// this item exists in other bloomfilter already, and it's not necessary to check in this bloomfilter.
if ((*exists)[i]) {
continue;
}
uint64_t h = BlockSplitBloomFilter::Hash(items[i].data(), items[i].size());
(*exists)[i] = block_split_bloom_filter.FindHash(h);
}
return rocksdb::Status::OK();
uint64_t h = BlockSplitBloomFilter::Hash(item.data(), item.size());
return block_split_bloom_filter_non_owned->FindHash(h);
}

rocksdb::Status BloomChain::Reserve(const Slice &user_key, uint32_t capacity, double error_rate, uint16_t expansion) {
Expand All @@ -131,7 +133,15 @@ rocksdb::Status BloomChain::Reserve(const Slice &user_key, uint32_t capacity, do
return createBloomChain(ns_key, error_rate, capacity, expansion, &bloom_chain_metadata);
}

/// Single-item BF.ADD: delegates to MAdd() with a one-element batch and
/// reports that element's result (kOk = added, kExist = duplicate,
/// kFull = nonscaling filter is full).
rocksdb::Status BloomChain::Add(const Slice &user_key, const Slice &item, BloomFilterAddResult *ret) {
  std::vector<BloomFilterAddResult> tmp{BloomFilterAddResult::kOk};
  rocksdb::Status s = MAdd(user_key, {item}, &tmp);
  *ret = tmp[0];
  return s;
}

rocksdb::Status BloomChain::MAdd(const Slice &user_key, const std::vector<Slice> &items,
std::vector<BloomFilterAddResult> *rets) {
std::string ns_key = AppendNamespacePrefix(user_key);
LockGuard guard(storage_->GetLockManager(), ns_key);

Expand All @@ -146,79 +156,84 @@ rocksdb::Status BloomChain::Add(const Slice &user_key, const Slice &item, int *r
std::vector<std::string> bf_key_list;
getBFKeyList(ns_key, metadata, &bf_key_list);

std::vector<std::string> bf_data_list;
s = getBFDataList(bf_key_list, &bf_data_list);
if (!s.ok()) return s;

auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisBloomFilter, {"insert"});
batch->PutLogData(log_data.Encode());

// check
std::vector<bool> exist{false}; // TODO: to refine in BF.MADD
for (int i = metadata.n_filters - 1; i >= 0; --i) { // TODO: to test which direction for searching is better
s = bloomCheck(bf_key_list[i], {item}, &exist);
if (!s.ok()) return s;
if (exist[0]) {
*ret = 0;
break;
for (size_t i = 0; i < items.size(); ++i) {
// check
bool exist = false;
// TODO: to test which direction for searching is better
for (int ii = static_cast<int>(bf_data_list.size()) - 1; ii >= 0; --ii) {
exist = bloomCheck(items[i], bf_data_list[ii]);
if (exist) break;
}
}

// insert
if (!exist[0]) { // TODO: to refine in BF.MADD
std::string bf_data;
s = storage_->Get(rocksdb::ReadOptions(), bf_key_list.back(), &bf_data);
if (!s.ok()) return s;

if (metadata.size + 1 > metadata.GetCapacity()) {
if (metadata.IsScaling()) {
batch->Put(bf_key_list.back(), bf_data);
createBloomFilterInBatch(ns_key, &metadata, batch, &bf_data);
bf_key_list.push_back(getBFKey(ns_key, metadata, metadata.n_filters - 1));
} else {
return rocksdb::Status::Aborted("filter is full and is nonscaling");
// insert
if (exist) {
(*rets)[i] = BloomFilterAddResult::kExist;
} else {
if (metadata.size + 1 > metadata.GetCapacity()) {
if (metadata.IsScaling()) {
batch->Put(bf_key_list.back(), bf_data_list.back());
std::string bf_data;
createBloomFilterInBatch(ns_key, &metadata, batch, &bf_data);
bf_data_list.push_back(std::move(bf_data));
bf_key_list.push_back(getBFKey(ns_key, metadata, metadata.n_filters - 1));
} else {
(*rets)[i] = BloomFilterAddResult::kFull;
continue;
}
}
bloomAdd(items[i], &bf_data_list.back());
(*rets)[i] = BloomFilterAddResult::kOk;
metadata.size += 1;
}
bloomAdd(item, &bf_data);
batch->Put(bf_key_list.back(), bf_data);
*ret = 1;
metadata.size += 1;
}

std::string bloom_chain_metadata_bytes;
metadata.Encode(&bloom_chain_metadata_bytes);
batch->Put(metadata_cf_handle_, ns_key, bloom_chain_metadata_bytes);

batch->Put(bf_key_list.back(), bf_data_list.back());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could track a set of "modified bf indices" instead. Suppose we trigger "scaling": the logic here becomes tricky, because it only writes the final entry of bf_data_list. And even when no data has been added, this will still write a batch.

return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}

/// Single-item BF.EXISTS: delegates to MExists() with a one-element batch.
/// *exist is set to true when the item may be present, false when it is
/// definitely absent.
rocksdb::Status BloomChain::Exists(const Slice &user_key, const Slice &item, bool *exist) {
  std::vector<bool> tmp{false};
  rocksdb::Status s = MExists(user_key, {item}, &tmp);
  *exist = tmp[0];
  return s;
}

rocksdb::Status BloomChain::MExists(const Slice &user_key, const std::vector<Slice> &items, std::vector<int> *rets) {
rocksdb::Status BloomChain::MExists(const Slice &user_key, const std::vector<Slice> &items, std::vector<bool> *exists) {
std::string ns_key = AppendNamespacePrefix(user_key);

BloomChainMetadata metadata;
rocksdb::Status s = getBloomChainMetadata(ns_key, &metadata);
if (s.IsNotFound()) {
std::fill(rets->begin(), rets->end(), 0);
std::fill(exists->begin(), exists->end(), false);
return rocksdb::Status::OK();
}
if (!s.ok()) return s;

std::vector<std::string> bf_key_list;
getBFKeyList(ns_key, metadata, &bf_key_list);

// check
std::vector<bool> exists(items.size(), false);
for (int i = metadata.n_filters - 1; i >= 0; --i) { // TODO: to test which direction for searching is better
s = bloomCheck(bf_key_list[i], items, &exists);
if (!s.ok()) return s;
}
std::vector<std::string> bf_data_list;
s = getBFDataList(bf_key_list, &bf_data_list);
if (!s.ok()) return s;

for (size_t i = 0; i < items.size(); ++i) {
(*rets)[i] = exists[i] ? 1 : 0;
// check
// TODO: to test which direction for searching is better
for (int ii = static_cast<int>(bf_data_list.size()) - 1; ii >= 0; --ii) {
(*exists)[i] = bloomCheck(items[i], bf_data_list[ii]);
if ((*exists)[i]) break;
}
}

return rocksdb::Status::OK();
Expand Down
20 changes: 13 additions & 7 deletions src/types/redis_bloom_chain.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ enum class BloomInfoType {
kExpansion,
};

enum class BloomFilterAddResult {
kOk,
kExist,
kFull,
};

struct BloomFilterInfo {
uint32_t capacity;
uint32_t bloom_bytes;
Expand All @@ -51,9 +57,10 @@ class BloomChain : public Database {
public:
BloomChain(engine::Storage *storage, const std::string &ns) : Database(storage, ns) {}
rocksdb::Status Reserve(const Slice &user_key, uint32_t capacity, double error_rate, uint16_t expansion);
rocksdb::Status Add(const Slice &user_key, const Slice &item, int *ret);
rocksdb::Status Exists(const Slice &user_key, const Slice &item, int *ret);
rocksdb::Status MExists(const Slice &user_key, const std::vector<Slice> &items, std::vector<int> *rets);
rocksdb::Status Add(const Slice &user_key, const Slice &item, BloomFilterAddResult *ret);
rocksdb::Status MAdd(const Slice &user_key, const std::vector<Slice> &items, std::vector<BloomFilterAddResult> *rets);
rocksdb::Status Exists(const Slice &user_key, const Slice &item, bool *exist);
rocksdb::Status MExists(const Slice &user_key, const std::vector<Slice> &items, std::vector<bool> *exists);
rocksdb::Status Info(const Slice &user_key, BloomFilterInfo *info);

private:
Expand All @@ -65,12 +72,11 @@ class BloomChain : public Database {
void createBloomFilterInBatch(const Slice &ns_key, BloomChainMetadata *metadata,
ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch, std::string *bf_data);

rocksdb::Status getBFDataList(const std::vector<std::string> &bf_key_list, std::vector<std::string> *bf_data_list);

/// bf_data: [in/out] The content string of bloomfilter.
static void bloomAdd(const Slice &item, std::string *bf_data);

/// exists: [in/out] The items exist in bloomfilter already or not.
/// exists[i] is true means items[i] exists in other bloomfilter already, and it's not necessary to check in this
/// bloomfilter.
rocksdb::Status bloomCheck(const Slice &bf_key, const std::vector<Slice> &items, std::vector<bool> *exists);
static bool bloomCheck(const Slice &item, std::string &bf_data);
};
} // namespace redis
Loading