Skip to content

Commit

Permalink
Refactor BlockSplitBloomFilter
Browse files Browse the repository at this point in the history
  • Loading branch information
zncleon committed Sep 16, 2023
1 parent 9fa5445 commit 36157cf
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 133 deletions.
49 changes: 19 additions & 30 deletions src/types/bloom_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@
#include <cstdint>
#include <memory>

#include "glog/logging.h"
#include "xxh3.h"

BlockSplitBloomFilter::BlockSplitBloomFilter() = default;

void BlockSplitBloomFilter::Init(uint32_t num_bytes) {
std::string BlockSplitBloomFilter::Init(uint32_t num_bytes) {
if (num_bytes < kMinimumBloomFilterBytes) {
num_bytes = kMinimumBloomFilterBytes;
}
Expand All @@ -38,38 +37,21 @@ void BlockSplitBloomFilter::Init(uint32_t num_bytes) {
num_bytes = kMaximumBloomFilterBytes;
}

num_bytes_ = num_bytes;
data_.resize(num_bytes_, 0);
}

bool BlockSplitBloomFilter::Init(const uint8_t* bitset, uint32_t num_bytes) {
if (num_bytes < kMinimumBloomFilterBytes || num_bytes > kMaximumBloomFilterBytes ||
(num_bytes & (num_bytes - 1)) != 0) {
return false;
}

num_bytes_ = num_bytes;
data_ = {reinterpret_cast<const char*>(bitset), num_bytes};
return true;
std::string bitset;
bitset.resize(num_bytes, 0);
return bitset;
}

bool BlockSplitBloomFilter::Init(std::string bitset) {
bool BlockSplitBloomFilter::FindHash(uint64_t hash, const std::string& bitset) {
if (bitset.size() < kMinimumBloomFilterBytes || bitset.size() > kMaximumBloomFilterBytes ||
(bitset.size() & (bitset.size() - 1)) != 0) {
LOG(ERROR) << "bitset size error, you should get bitset from Init";
return false;
}

num_bytes_ = bitset.size();
data_ = std::move(bitset);
return true;
}

static constexpr uint32_t kBloomFilterHeaderSizeGuess = 256;

bool BlockSplitBloomFilter::FindHash(uint64_t hash) const {
const auto bucket_index = static_cast<uint32_t>(((hash >> 32) * (num_bytes_ / kBytesPerFilterBlock)) >> 32);
const auto bucket_index = static_cast<uint32_t>(((hash >> 32) * (bitset.size() / kBytesPerFilterBlock)) >> 32);
const auto key = static_cast<uint32_t>(hash);
const auto* bitset32 = reinterpret_cast<const uint32_t*>(data_.data());
const auto* bitset32 = reinterpret_cast<const uint32_t*>(bitset.data());

for (int i = 0; i < kBitsSetPerBlock; ++i) {
// Calculate mask for key in the given bitset.
Expand All @@ -81,16 +63,23 @@ bool BlockSplitBloomFilter::FindHash(uint64_t hash) const {
return true;
}

void BlockSplitBloomFilter::InsertHash(uint64_t hash) {
const auto bucket_index = static_cast<uint32_t>(((hash >> 32) * (num_bytes_ / kBytesPerFilterBlock)) >> 32);
bool BlockSplitBloomFilter::InsertHash(uint64_t hash, std::string* bitset) {
if (bitset->size() < kMinimumBloomFilterBytes || bitset->size() > kMaximumBloomFilterBytes ||
(bitset->size() & (bitset->size() - 1)) != 0) {
LOG(ERROR) << "bitset size error, you should get bitset from Init";
return false;
}

const auto bucket_index = static_cast<uint32_t>(((hash >> 32) * (bitset->size() / kBytesPerFilterBlock)) >> 32);
const auto key = static_cast<uint32_t>(hash);
auto* bitset32 = reinterpret_cast<uint32_t*>(data_.data());
auto* bitset32 = reinterpret_cast<uint32_t*>(bitset->data());

for (int i = 0; i < kBitsSetPerBlock; i++) {
// Calculate mask for key in the given bitset.
const uint32_t mask = UINT32_C(0x1) << ((key * SALT[i]) >> 27);
bitset32[bucket_index * kBitsSetPerBlock + i] |= mask;
}
return true;
}

/// Compute the 64-bit hash of a byte buffer with XXH64, always using a seed of 0.
///
/// @param data pointer to the first byte to hash.
/// @param length number of bytes to hash.
/// @return the XXH64 digest of the given bytes.
uint64_t BlockSplitBloomFilter::Hash(const char* data, size_t length) {
  constexpr uint64_t kHashSeed = 0;
  return XXH64(data, length, kHashSeed);
}
69 changes: 19 additions & 50 deletions src/types/bloom_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@ constexpr bool IsMultipleOf64(int64_t n) { return (n & 63) == 0; }

/// Returns true when n is an exact multiple of 8, i.e. its three lowest bits are all clear.
constexpr bool IsMultipleOf8(int64_t n) {
  constexpr int64_t kLowThreeBits = 0x7;
  return !(n & kLowThreeBits);
}

// Maximum Bloom filter size, it sets to HDFS default block size 128MB
// This value will be reconsidered when implementing Bloom filter producer.
static constexpr uint32_t kMaximumBloomFilterBytes = 128 * 1024 * 1024;

/// The BlockSplitBloomFilter is implemented using block-based Bloom filters from
/// Putze et al.'s "Cache-,Hash- and Space-Efficient Bloom filters". The basic idea is to
/// hash the item to a tiny Bloom filter which size fit a single cache line or smaller.
Expand All @@ -54,38 +50,23 @@ static constexpr uint32_t kMaximumBloomFilterBytes = 128 * 1024 * 1024;
/// filter is 32 bytes to take advantage of 32-byte SIMD instructions.
class BlockSplitBloomFilter {
public:
/// The constructor of BlockSplitBloomFilter. It uses XXH64 as hash function.
BlockSplitBloomFilter();
// Maximum Bloom filter size. It is set to the HDFS default block size of 128MB.
// This value will be reconsidered when implementing the Bloom filter producer.
static constexpr uint32_t kMaximumBloomFilterBytes = 128 * 1024 * 1024;

// Minimum Bloom filter size. It is set to 32 bytes to fit a tiny Bloom filter.
static constexpr uint32_t kMinimumBloomFilterBytes = 32;

BlockSplitBloomFilter() = delete;

/// Initialize the BlockSplitBloomFilter. The range of num_bytes should be within
/// [kMinimumBloomFilterBytes, kMaximumBloomFilterBytes], it will be
/// rounded up/down to lower/upper bound if num_bytes is out of range and also
/// will be rounded up to a power of 2.
///
/// @param num_bytes The number of bytes to store Bloom filter bitset.
void Init(uint32_t num_bytes);

/// Initialize the BlockSplitBloomFilter. It copies the bitset as underlying
/// bitset because the given bitset may not satisfy the 32-byte alignment requirement
/// which may lead to segfault when performing SIMD instructions. It is the caller's
/// responsibility to free the bitset passed in.
///
/// @param bitset The given bitset to initialize the Bloom filter.
/// @param num_bytes The number of bytes of given bitset.
/// @return false if the number of bytes of Bloom filter bitset is not a power of 2, and true means successfully init
bool Init(const uint8_t* bitset, uint32_t num_bytes);

/// Initialize the BlockSplitBloomFilter. It copies the bitset as underlying
/// bitset because the given bitset may not satisfy the 32-byte alignment requirement
/// which may lead to segfault when performing SIMD instructions. It is the caller's
/// responsibility to free the bitset passed in.
///
/// @param bitset The given bitset to initialize the Bloom filter.
/// @return false if the number of bytes of Bloom filter bitset is not a power of 2, and true means successfully init
bool Init(std::string bitset);

/// Minimum Bloom filter size, it sets to 32 bytes to fit a tiny Bloom filter.
static constexpr uint32_t kMinimumBloomFilterBytes = 32;
/// @return The underlying bitset of the bloomfilter.
static std::string Init(uint32_t num_bytes);

/// Calculate optimal size according to the number of distinct values and false
/// positive probability.
Expand All @@ -96,7 +77,6 @@ class BlockSplitBloomFilter {
/// kMaximumBloomFilterBytes, and the return value is always a power of 2
static uint32_t OptimalNumOfBytes(uint32_t ndv, double fpp) {
  // OptimalNumOfBits reports the size in bits; convert it to whole bytes.
  constexpr uint32_t kBitsInByte = 8;
  return OptimalNumOfBits(ndv, fpp) / kBitsInByte;
}

Expand All @@ -108,7 +88,6 @@ class BlockSplitBloomFilter {
/// @return it always returns a value between kMinimumBloomFilterBytes * 8 and
/// kMaximumBloomFilterBytes * 8, and the return value is always a power of 2
static uint32_t OptimalNumOfBits(uint32_t ndv, double fpp) {
// CHECK(fpp > 0.0 && fpp < 1.0);
const double m = -8.0 * ndv / log(1 - pow(fpp, 1.0 / 8));
uint32_t num_bits = 0;

Expand Down Expand Up @@ -140,23 +119,19 @@ class BlockSplitBloomFilter {
/// Determine whether an element exists in the set or not.
///
/// @param hash the hash of the element to look up.
/// @return false if value is definitely not in set, and true means PROBABLY
/// in set.
bool FindHash(uint64_t hash) const;
/// @param bitset the underlying bitset of the bloomfilter. The size of bitset should be within
/// [kMinimumBloomFilterBytes, kMaximumBloomFilterBytes] and also a power of 2
/// @return false if the size of bitset does not meet the requirement or if the value is definitely not in the set,
/// and true means it is PROBABLY in the set.
static bool FindHash(uint64_t hash, const std::string& bitset);

/// Insert element to set represented by Bloom filter bitset.
///
/// @param hash the hash of value to insert into Bloom filter.
void InsertHash(uint64_t hash);

uint32_t GetBitsetSize() const { return num_bytes_; }

/// Get the plain bitset value from the Bloom filter bitset.
///
/// @return bitset value;
const std::string& GetData() const& { return data_; }

std::string&& GetData() && { return std::move(data_); }
/// @param bitset the underlying bitset of the bloomfilter. The size of bitset should be within
/// [kMinimumBloomFilterBytes, kMaximumBloomFilterBytes] and also a power of 2
/// @return false if the size of bitset does not meet the requirement
static bool InsertHash(uint64_t hash, std::string* bitset);

/// Compute hash for string value by using its plain encoding result.
///
Expand All @@ -176,10 +151,4 @@ class BlockSplitBloomFilter {
// of bit to set, one bit in each 32-bit word.
static constexpr uint32_t SALT[kBitsSetPerBlock] = {0x47b6137bU, 0x44974d91U, 0x8824ad5bU, 0xa2b7289dU,
0x705495c7U, 0x2df1424bU, 0x9efc4947U, 0x5c6bfb31U};

// The underlying buffer of bitset.
std::string data_;

// The number of bytes of Bloom filter bitset.
uint32_t num_bytes_;
};
22 changes: 5 additions & 17 deletions src/types/redis_bloom_chain.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,6 @@ rocksdb::Status BloomChain::createBloomChain(const Slice &ns_key, double error_r
metadata->base_capacity = capacity;
metadata->bloom_bytes = BlockSplitBloomFilter::OptimalNumOfBytes(capacity, error_rate);

BlockSplitBloomFilter block_split_bloom_filter;
block_split_bloom_filter.Init(metadata->bloom_bytes);

auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisBloomFilter, {"createBloomChain"});
batch->PutLogData(log_data.Encode());
Expand All @@ -64,7 +61,7 @@ rocksdb::Status BloomChain::createBloomChain(const Slice &ns_key, double error_r
batch->Put(metadata_cf_handle_, ns_key, bloom_chain_meta_bytes);

std::string bf_key = getBFKey(ns_key, *metadata, metadata->n_filters - 1);
batch->Put(bf_key, block_split_bloom_filter.GetData());
batch->Put(bf_key, BlockSplitBloomFilter::Init(metadata->bloom_bytes));

return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}
Expand All @@ -77,9 +74,7 @@ void BloomChain::createBloomFilterInBatch(const Slice &ns_key, BloomChainMetadat
metadata->n_filters += 1;
metadata->bloom_bytes += bloom_filter_bytes;

BlockSplitBloomFilter block_split_bloom_filter;
block_split_bloom_filter.Init(bloom_filter_bytes);
*bf_data = std::move(block_split_bloom_filter).GetData();
*bf_data = BlockSplitBloomFilter::Init(bloom_filter_bytes);

std::string bloom_chain_meta_bytes;
metadata->Encode(&bloom_chain_meta_bytes);
Expand All @@ -103,20 +98,13 @@ rocksdb::Status BloomChain::getBFDataList(const std::vector<std::string> &bf_key
}

void BloomChain::bloomAdd(const Slice &item, std::string *bf_data) {
BlockSplitBloomFilter block_split_bloom_filter;
block_split_bloom_filter.Init(std::move(*bf_data));

uint64_t h = BlockSplitBloomFilter::Hash(item.data(), item.size());
block_split_bloom_filter.InsertHash(h);
*bf_data = std::move(block_split_bloom_filter).GetData();
BlockSplitBloomFilter::InsertHash(h, bf_data);
}

bool BloomChain::bloomCheck(const Slice &item, std::string &bf_data) {
BlockSplitBloomFilter block_split_bloom_filter;
block_split_bloom_filter.Init(bf_data);

bool BloomChain::bloomCheck(const Slice &item, const std::string &bf_data) {
uint64_t h = BlockSplitBloomFilter::Hash(item.data(), item.size());
return block_split_bloom_filter.FindHash(h);
return BlockSplitBloomFilter::FindHash(h, bf_data);
}

rocksdb::Status BloomChain::Reserve(const Slice &user_key, uint32_t capacity, double error_rate, uint16_t expansion) {
Expand Down
2 changes: 1 addition & 1 deletion src/types/redis_bloom_chain.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,6 @@ class BloomChain : public Database {
/// bf_data: [in/out] The content string of bloomfilter.
static void bloomAdd(const Slice &item, std::string *bf_data);

static bool bloomCheck(const Slice &item, std::string &bf_data);
static bool bloomCheck(const Slice &item, const std::string &bf_data);
};
} // namespace redis
60 changes: 25 additions & 35 deletions tests/cppunit/types/bloom_filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,16 @@
namespace test {

TEST(ConstructorTest, TestBloomFilter) {
BlockSplitBloomFilter bloom_filter;

// It returns false because the number of bytes of the Bloom filter bitset must be a power of 2.
std::unique_ptr<uint8_t[]> bitset1(new uint8_t[1024]());
EXPECT_FALSE(bloom_filter.Init(bitset1.get(), 1023));
std::string bitset1(1023, ' ');
EXPECT_FALSE(BlockSplitBloomFilter::FindHash(0, bitset1));

// It returns false because the number of bytes of the Bloom filter bitset must be a power of 2.
std::string bitset2(1022, 's');
EXPECT_FALSE(bloom_filter.Init(bitset2));
EXPECT_FALSE(BlockSplitBloomFilter::InsertHash(0, &bitset2));

std::string bitset3(1024, 's');
EXPECT_TRUE(BlockSplitBloomFilter::InsertHash(0, &bitset3));
}

// The BasicTest is used to test basic operations including InsertHash, FindHash and
Expand All @@ -51,47 +52,33 @@ TEST(BasicTest, TestBloomFilter) {
"-8", "b", "acb", "tyuio", "trewq"};

for (const auto bloom_filter_bytes : kBloomFilterSizes) {
BlockSplitBloomFilter bloom_filter;
bloom_filter.Init(bloom_filter_bytes);
std::string bloom_filter_bitset = BlockSplitBloomFilter::Init(bloom_filter_bytes);

// Empty bloom filter deterministically returns false
for (const auto& v : kStringInserts) {
EXPECT_FALSE(bloom_filter.FindHash(bloom_filter.Hash(v.data(), v.size())));
EXPECT_FALSE(
BlockSplitBloomFilter::FindHash(BlockSplitBloomFilter::Hash(v.data(), v.size()), bloom_filter_bitset));
}

// Insert all values
for (const auto& v : kStringInserts) {
bloom_filter.InsertHash(bloom_filter.Hash(v.data(), v.size()));
BlockSplitBloomFilter::InsertHash(BlockSplitBloomFilter::Hash(v.data(), v.size()), &bloom_filter_bitset);
}

// They should always lookup successfully
// They should always look up successfully
for (const auto& v : kStringInserts) {
EXPECT_TRUE(bloom_filter.FindHash(bloom_filter.Hash(v.data(), v.size())));
EXPECT_TRUE(
BlockSplitBloomFilter::FindHash(BlockSplitBloomFilter::Hash(v.data(), v.size()), bloom_filter_bitset));
}

// Values not inserted in the filter should only rarely lookup successfully
int false_positives = 0;
for (const auto& v : kStringLookups) {
false_positives += bloom_filter.FindHash(bloom_filter.Hash(v.data(), v.size()));
false_positives +=
BlockSplitBloomFilter::FindHash(BlockSplitBloomFilter::Hash(v.data(), v.size()), bloom_filter_bitset);
}
// (this is a crude check, see FPPTest below for a more rigorous formula)
EXPECT_LE(false_positives, 2);

// Serialize Bloom filter to string bitset
std::string data_saved = bloom_filter.GetData();
// ReBuild Bloom filter from string bitset
BlockSplitBloomFilter bloom_filter_new;
bloom_filter_new.Init(data_saved);

// Lookup previously inserted values
for (const auto& v : kStringInserts) {
EXPECT_TRUE(bloom_filter_new.FindHash(bloom_filter_new.Hash(v.data(), v.size())));
}
false_positives = 0;
for (const auto& v : kStringLookups) {
false_positives += bloom_filter_new.FindHash(bloom_filter_new.Hash(v.data(), v.size()));
}
EXPECT_LE(false_positives, 2);
}
}

Expand Down Expand Up @@ -120,22 +107,23 @@ TEST(FPPTest, TestBloomFilter) {
const double fpp = 0.01;

std::vector<std::string> members;
BlockSplitBloomFilter bloom_filter;
bloom_filter.Init(BlockSplitBloomFilter::OptimalNumOfBytes(total_count, fpp));
std::string bloom_filter_bitset =
BlockSplitBloomFilter::Init(BlockSplitBloomFilter::OptimalNumOfBytes(total_count, fpp));

// Insert elements into the Bloom filter
for (int i = 0; i < total_count; i++) {
// Insert random string which length is 8
std::string tmp = GetRandomString(8);
members.push_back(tmp);
bloom_filter.InsertHash(bloom_filter.Hash(tmp.data(), tmp.size()));
BlockSplitBloomFilter::InsertHash(BlockSplitBloomFilter::Hash(tmp.data(), tmp.size()), &bloom_filter_bitset);
}

for (int i = 0; i < total_count; i++) {
ASSERT_TRUE(bloom_filter.FindHash(bloom_filter.Hash(members[i].data(), members[i].size())));
ASSERT_TRUE(BlockSplitBloomFilter::FindHash(BlockSplitBloomFilter::Hash(members[i].data(), members[i].size()),
bloom_filter_bitset));
std::string tmp = GetRandomString(7);

if (bloom_filter.FindHash(bloom_filter.Hash(tmp.data(), tmp.size()))) {
if (BlockSplitBloomFilter::FindHash(BlockSplitBloomFilter::Hash(tmp.data(), tmp.size()), bloom_filter_bitset)) {
exist++;
}
}
Expand Down Expand Up @@ -180,8 +168,10 @@ TEST(OptimalValueTest, TestBloomFilter) {
test_optimal_num_estimation(4, 0.01, BlockSplitBloomFilter::kMinimumBloomFilterBytes * 8);
test_optimal_num_estimation(4, 0.25, BlockSplitBloomFilter::kMinimumBloomFilterBytes * 8);

test_optimal_num_estimation(std::numeric_limits<uint32_t>::max(), 0.01, kMaximumBloomFilterBytes * 8);
test_optimal_num_estimation(std::numeric_limits<uint32_t>::max(), 0.25, kMaximumBloomFilterBytes * 8);
test_optimal_num_estimation(std::numeric_limits<uint32_t>::max(), 0.01,
BlockSplitBloomFilter::kMaximumBloomFilterBytes * 8);
test_optimal_num_estimation(std::numeric_limits<uint32_t>::max(), 0.25,
BlockSplitBloomFilter::kMaximumBloomFilterBytes * 8);
}

} // namespace test

0 comments on commit 36157cf

Please sign in to comment.