Merge remote-tracking branch 'upstream/unstable' into store_key
enjoy-binbin committed Sep 12, 2023
2 parents 3f160c4 + 11a0140 commit f27eafb
Showing 5 changed files with 94 additions and 24 deletions.
5 changes: 2 additions & 3 deletions src/storage/redis_metadata.h
@@ -250,8 +250,7 @@ class BloomChainMetadata : public Metadata {
using Metadata::Decode;
rocksdb::Status Decode(Slice *bytes) override;

/// Get the total capacity of the bloom chain (the sum capacity of all sub-filters)
///
/// @return the total capacity value
uint32_t GetCapacity() const;

bool IsScaling() const { return expansion != 0; };
};
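
The new `IsScaling()` helper treats any nonzero `expansion` as a scaling chain. `GetCapacity()`'s body is not part of this diff; a minimal sketch of the documented behavior (the sum of all sub-filter capacities, assuming each sub-filter holds `expansion` times more items than the previous one) that reproduces the 50/150/350 totals asserted in the test below:

```cpp
#include <cstdint>

// Sketch only -- not the implementation from this commit.
// With base_capacity = 50 and expansion = 2 this yields 50, 150, 350
// for n_filters = 1, 2, 3, matching BF.INFO "Capacity" in bloom_test.go.
uint32_t GetCapacity(uint32_t base_capacity, uint16_t expansion, uint16_t n_filters) {
  if (expansion == 0) return base_capacity;  // nonscaling: a single fixed-size filter
  uint32_t total = 0;
  uint32_t sub_capacity = base_capacity;
  for (uint16_t i = 0; i < n_filters; ++i) {
    total += sub_capacity;
    sub_capacity *= expansion;  // each sub-filter scales by the expansion factor
  }
  return total;
}
```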
4 changes: 3 additions & 1 deletion src/types/bloom_filter.h
@@ -154,7 +154,9 @@ class BlockSplitBloomFilter {
/// Get the plain bitset value from the Bloom filter bitset.
///
/// @return bitset value;
const std::string& GetData() { return data_; }
const std::string& GetData() const& { return data_; }

std::string&& GetData() && { return std::move(data_); }

/// Compute hash for string value by using its plain encoding result.
///
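The `GetData()` change introduces ref-qualified overloads: lvalue callers get a cheap `const&` view of the bitset, while rvalue callers (`std::move(filter).GetData()`, as used in `redis_bloom_chain.cc` below) move the bitset out instead of copying it. A self-contained sketch of the pattern, with hypothetical names:

```cpp
#include <iostream>
#include <string>
#include <utility>

// Sketch of the ref-qualified accessor pattern used by GetData().
class Holder {
 public:
  explicit Holder(std::string data) : data_(std::move(data)) {}

  // Called on lvalues: hand out a reference, keep ownership.
  const std::string& Get() const& { return data_; }

  // Called on rvalues: the object is expiring, so move the data out.
  std::string&& Get() && { return std::move(data_); }

 private:
  std::string data_;
};

int main() {
  Holder h("payload");
  std::cout << h.Get() << "\n";            // const& overload: h keeps its data
  std::string owned = std::move(h).Get();  // && overload: data moved out, no copy
  std::cout << owned << "\n";
}
```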
60 changes: 41 additions & 19 deletions src/types/redis_bloom_chain.cc
@@ -56,31 +56,43 @@ rocksdb::Status BloomChain::createBloomChain(const Slice &ns_key, double error_rate, uint32_t capacity, uint16_t expansion,
block_split_bloom_filter.Init(metadata->bloom_bytes);

auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisBloomFilter, {"createSBChain"});
WriteBatchLogData log_data(kRedisBloomFilter, {"createBloomChain"});
batch->PutLogData(log_data.Encode());

std::string sb_chain_meta_bytes;
metadata->Encode(&sb_chain_meta_bytes);
batch->Put(metadata_cf_handle_, ns_key, sb_chain_meta_bytes);
std::string bloom_chain_meta_bytes;
metadata->Encode(&bloom_chain_meta_bytes);
batch->Put(metadata_cf_handle_, ns_key, bloom_chain_meta_bytes);

std::string bf_key = getBFKey(ns_key, *metadata, metadata->n_filters - 1);
batch->Put(bf_key, block_split_bloom_filter.GetData());

return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}

rocksdb::Status BloomChain::bloomAdd(const Slice &bf_key, const std::string &item) {
std::string bf_data;
rocksdb::Status s = storage_->Get(rocksdb::ReadOptions(), bf_key, &bf_data);
if (!s.ok()) return s;
void BloomChain::createBloomFilterInBatch(const Slice &ns_key, BloomChainMetadata *metadata,
ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch, std::string *bf_data) {
uint32_t bloom_filter_bytes = BlockSplitBloomFilter::OptimalNumOfBytes(
static_cast<uint32_t>(metadata->base_capacity * pow(metadata->expansion, metadata->n_filters)),
metadata->error_rate);
metadata->n_filters += 1;
metadata->bloom_bytes += bloom_filter_bytes;

BlockSplitBloomFilter block_split_bloom_filter;
block_split_bloom_filter.Init(std::move(bf_data));
block_split_bloom_filter.Init(bloom_filter_bytes);
*bf_data = std::move(block_split_bloom_filter).GetData();

std::string bloom_chain_meta_bytes;
metadata->Encode(&bloom_chain_meta_bytes);
batch->Put(metadata_cf_handle_, ns_key, bloom_chain_meta_bytes);
}

void BloomChain::bloomAdd(const std::string &item, std::string *bf_data) {
BlockSplitBloomFilter block_split_bloom_filter;
block_split_bloom_filter.Init(std::move(*bf_data));

uint64_t h = BlockSplitBloomFilter::Hash(item.data(), item.size());
block_split_bloom_filter.InsertHash(h);
auto batch = storage_->GetWriteBatchBase();
batch->Put(bf_key, block_split_bloom_filter.GetData());
return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
*bf_data = std::move(block_split_bloom_filter).GetData();
}

rocksdb::Status BloomChain::bloomCheck(const Slice &bf_key, const std::string &item, bool *exist) {
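
In `createBloomFilterInBatch`, the next sub-filter is sized for `base_capacity * pow(expansion, n_filters)` items at the chain's error rate, so with an expansion of 2 each new bitset is roughly twice the previous one. A back-of-the-envelope check against the scaling test below, assuming (per that test's expectations) `OptimalNumOfBytes` returns 256 bytes for 50 items at error rate 0.0001:

```cpp
#include <cstdint>
#include <cstdio>

int main() {
  // Assumption taken from the test below: the first 50-item sub-filter is 256 bytes.
  uint32_t sub_filter_bytes = 256;
  uint32_t total_bytes = 0;
  for (int n_filters = 1; n_filters <= 3; ++n_filters) {
    total_bytes += sub_filter_bytes;
    std::printf("n_filters=%d bloom_bytes=%u\n", n_filters, total_bytes);
    sub_filter_bytes *= 2;  // expansion = 2 doubles the next sub-filter's capacity
  }
  // Prints 256, 768, 1792 -- the "Size" values asserted in bloom_test.go.
}
```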
@@ -147,18 +159,28 @@ rocksdb::Status BloomChain::Add(const Slice &user_key, const Slice &item, int *ret) {

// insert
if (!exist) {
if (metadata.size + 1 > metadata.GetCapacity()) { // TODO: scaling would be supported later
return rocksdb::Status::Aborted("filter is full");
}
s = bloomAdd(bf_key_list.back(), item_string);
std::string bf_data;
s = storage_->Get(rocksdb::ReadOptions(), bf_key_list.back(), &bf_data);
if (!s.ok()) return s;

if (metadata.size + 1 > metadata.GetCapacity()) {
if (metadata.IsScaling()) {
batch->Put(bf_key_list.back(), bf_data);
createBloomFilterInBatch(ns_key, &metadata, batch, &bf_data);
bf_key_list.push_back(getBFKey(ns_key, metadata, metadata.n_filters - 1));
} else {
return rocksdb::Status::Aborted("filter is full and is nonscaling");
}
}
bloomAdd(item_string, &bf_data);
batch->Put(bf_key_list.back(), bf_data);
*ret = 1;
metadata.size += 1;
}

std::string sb_chain_metadata_bytes;
metadata.Encode(&sb_chain_metadata_bytes);
batch->Put(metadata_cf_handle_, ns_key, sb_chain_metadata_bytes);
std::string bloom_chain_metadata_bytes;
metadata.Encode(&bloom_chain_metadata_bytes);
batch->Put(metadata_cf_handle_, ns_key, bloom_chain_metadata_bytes);

return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}
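Taken together, the new `Add` path reads the newest sub-filter's bitset once, grows the chain inside the same write batch when the capacity check fails (or aborts if nonscaling), sets the item's bits in memory, and commits everything with a single `storage_->Write`. A reduced, runnable sketch of just the scaling decision, with the storage layer abstracted away:

```cpp
#include <cstdint>
#include <cstdio>

// Sketch: the scaling decision in BloomChain::Add, reduced to plain integers.
// Mirrors the hunk above: grow the chain only when the next insert would
// exceed the summed capacity and expansion != 0.
struct Chain {
  uint32_t base_capacity = 50;
  uint16_t expansion = 2;  // 0 would mean nonscaling
  uint16_t n_filters = 1;
  uint32_t size = 0;
  uint32_t capacity = 50;  // sum of all sub-filter capacities
};

bool Add(Chain* c) {
  if (c->size + 1 > c->capacity) {
    if (c->expansion == 0) return false;  // "filter is full and is nonscaling"
    uint32_t next = c->base_capacity;
    for (uint16_t i = 0; i < c->n_filters; ++i) next *= c->expansion;
    c->n_filters += 1;
    c->capacity += next;  // createBloomFilterInBatch analogue
  }
  c->size += 1;  // bloomAdd analogue
  return true;
}

int main() {
  Chain c;
  for (int i = 0; i < 51; ++i) Add(&c);
  std::printf("filters=%u capacity=%u size=%u\n", c.n_filters, c.capacity, c.size);
  // filters=2 capacity=150 size=51 -- matching BF.INFO in the test below.
}
```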
4 changes: 3 additions & 1 deletion src/types/redis_bloom_chain.h
@@ -61,7 +61,9 @@ class BloomChain : public Database {
rocksdb::Status getBloomChainMetadata(const Slice &ns_key, BloomChainMetadata *metadata);
rocksdb::Status createBloomChain(const Slice &ns_key, double error_rate, uint32_t capacity, uint16_t expansion,
BloomChainMetadata *metadata);
rocksdb::Status bloomAdd(const Slice &bf_key, const std::string &item);
void createBloomFilterInBatch(const Slice &ns_key, BloomChainMetadata *metadata,
ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch, std::string *bf_data);
static void bloomAdd(const std::string &item, std::string *bf_data);
rocksdb::Status bloomCheck(const Slice &bf_key, const std::string &item, bool *exist);
};
} // namespace redis
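
Note that `bloomAdd` is now declared `static`: after the refactor it only transforms the in-memory bitset passed through `bf_data` and touches neither member state nor storage, which is what lets `Add` above fold every mutation into one write batch.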
45 changes: 45 additions & 0 deletions tests/gocase/unit/type/bloom/bloom_test.go
@@ -203,11 +203,56 @@ func TestBloom(t *testing.T) {
require.Equal(t, int64(1), rdb.Do(ctx, "bf.info", key, "items").Val())
})

t.Run("Bloom filter full and nonscaling", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, key).Err())
require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.0001", "50", "nonscaling").Err())

// insert items, assuming no false positives occur
for i := 0; i < 50; i++ {
buf := util.RandString(7, 8, util.Alpha)
Add := rdb.Do(ctx, "bf.add", key, buf)
require.NoError(t, Add.Err())
}
require.Equal(t, int64(50), rdb.Do(ctx, "bf.info", key, "items").Val())
require.ErrorContains(t, rdb.Do(ctx, "bf.add", key, "xxx").Err(), "filter is full and is nonscaling")
require.Equal(t, int64(50), rdb.Do(ctx, "bf.info", key, "items").Val())
})

t.Run("Bloom filter full and scaling", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, key).Err())
require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.0001", "50", "expansion", "2").Err())

// insert items, assuming no false positives occur
for i := 0; i < 50; i++ {
buf := util.RandString(7, 8, util.Alpha)
Add := rdb.Do(ctx, "bf.add", key, buf)
require.NoError(t, Add.Err())
}
require.Equal(t, []interface{}{"Capacity", int64(50), "Size", int64(256), "Number of filters", int64(1), "Number of items inserted", int64(50), "Expansion rate", int64(2)}, rdb.Do(ctx, "bf.info", key).Val())

// bloom filter is full and scaling
require.NoError(t, rdb.Do(ctx, "bf.add", key, "xxx").Err())
require.Equal(t, []interface{}{"Capacity", int64(150), "Size", int64(768), "Number of filters", int64(2), "Number of items inserted", int64(51), "Expansion rate", int64(2)}, rdb.Do(ctx, "bf.info", key).Val())

// insert items, assuming no false positives occur
for i := 0; i < 99; i++ {
buf := util.RandString(7, 8, util.Alpha)
Add := rdb.Do(ctx, "bf.add", key, buf)
require.NoError(t, Add.Err())
}
require.Equal(t, []interface{}{"Capacity", int64(150), "Size", int64(768), "Number of filters", int64(2), "Number of items inserted", int64(150), "Expansion rate", int64(2)}, rdb.Do(ctx, "bf.info", key).Val())

// bloom filter is full and scaling
require.NoError(t, rdb.Do(ctx, "bf.add", key, "xxxx").Err())
require.Equal(t, []interface{}{"Capacity", int64(350), "Size", int64(1792), "Number of filters", int64(3), "Number of items inserted", int64(151), "Expansion rate", int64(2)}, rdb.Do(ctx, "bf.info", key).Val())
})

t.Run("Get type of bloom filter", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, key).Err())
require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.02", "1000").Err())
require.Equal(t, "MBbloom--", rdb.Type(ctx, key).Val())
})

// TODO: Add a test case for getting the filters of a bloom filter once scaling is complete.

}
