Skip to content

Commit

Permalink
tools: Make FilterPolicy settable via URI/CreateFromString (#71)
Browse files Browse the repository at this point in the history
  • Loading branch information
mrambacher authored and Yuval-Ariel committed May 4, 2023
1 parent f0eeb55 commit 0586cb8
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 91 deletions.
1 change: 1 addition & 0 deletions db_stress_tool/db_stress_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ DECLARE_bool(allow_concurrent_memtable_write);
DECLARE_double(experimental_mempurge_threshold);
DECLARE_bool(enable_write_thread_adaptive_yield);
DECLARE_int32(reopen);
DECLARE_string(filter_uri);
DECLARE_double(bloom_bits);
DECLARE_int32(ribbon_starting_level);
DECLARE_bool(partition_filters);
Expand Down
1 change: 1 addition & 0 deletions db_stress_tool/db_stress_gflags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,7 @@ DEFINE_int32(reopen, 10, "Number of times database reopens");
static const bool FLAGS_reopen_dummy __attribute__((__unused__)) =
RegisterFlagValidator(&FLAGS_reopen, &ValidateInt32Positive);

DEFINE_string(filter_uri, "", "Filter Policy URI");
DEFINE_double(bloom_bits, 10,
"Bloom filter bits per key. "
"Negative means use default settings.");
Expand Down
43 changes: 33 additions & 10 deletions db_stress_tool/db_stress_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,39 @@ namespace ROCKSDB_NAMESPACE {
namespace {

std::shared_ptr<const FilterPolicy> CreateFilterPolicy() {
if (FLAGS_bloom_bits < 0) {
if (!FLAGS_filter_uri.empty()) {
ConfigOptions config_options;
std::shared_ptr<const FilterPolicy> policy;
config_options.ignore_unsupported_options = false;
std::string bits_str;
if (FLAGS_bloom_bits > 0) {
bits_str = ":" + FormatDoubleParam(FLAGS_bloom_bits);
fprintf(stderr, "note: appending --bloom-bits (%f) to --filter-uri\n",
FLAGS_bloom_bits);
}
Status s = FilterPolicy::CreateFromString(
config_options, FLAGS_filter_uri + bits_str, &policy);
if (!s.ok() || !policy) {
fprintf(stderr, "Cannot create filter policy(%s%s): %s\n",
FLAGS_filter_uri.c_str(), bits_str.c_str(), s.ToString().c_str());
exit(1);
}
return policy;
} else if (FLAGS_bloom_bits < 0) {
return BlockBasedTableOptions().filter_policy;
}
const FilterPolicy* new_policy;
if (FLAGS_ribbon_starting_level >= 999) {
// Use Bloom API
new_policy = NewBloomFilterPolicy(FLAGS_bloom_bits, false);
} else {
new_policy = NewRibbonFilterPolicy(
FLAGS_bloom_bits, /* bloom_before_level */ FLAGS_ribbon_starting_level);
const FilterPolicy* new_policy;
if (FLAGS_ribbon_starting_level >= 999) {
// Use Bloom API
new_policy = NewBloomFilterPolicy(FLAGS_bloom_bits, false);
} else {
new_policy = NewRibbonFilterPolicy(
FLAGS_bloom_bits,
/* bloom_before_level */ FLAGS_ribbon_starting_level);
}
return std::shared_ptr<const FilterPolicy>(new_policy);
}
return std::shared_ptr<const FilterPolicy>(new_policy);
}

} // namespace

StressTest::StressTest()
Expand Down Expand Up @@ -2397,6 +2416,10 @@ void StressTest::PrintEnv() const {
FLAGS_file_checksum_impl.c_str());
fprintf(stdout, "Bloom bits / key : %s\n",
FormatDoubleParam(FLAGS_bloom_bits).c_str());
if (!FLAGS_filter_uri.empty()) {
fprintf(stdout, "Filter Policy : %s\n",
FLAGS_filter_uri.c_str());
}
fprintf(stdout, "Max subcompactions : %" PRIu64 "\n",
FLAGS_subcompactions);
fprintf(stdout, "Use MultiGet : %s\n",
Expand Down
28 changes: 24 additions & 4 deletions tools/db_bench_tool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -739,9 +739,9 @@ DEFINE_int32(random_access_max_buffer_size, 1024 * 1024,
DEFINE_int32(writable_file_max_buffer_size, 1024 * 1024,
"Maximum write buffer for Writable File");

DEFINE_int32(bloom_bits, -1,
"Bloom filter bits per key. Negative means use default."
"Zero disables.");
// Bits-per-key setting for the filter policy; declared as a double so that
// non-integer values can be supplied on the command line.
// Adjacent string literals are concatenated verbatim, so the first fragment
// carries its own trailing space to keep the help text readable.
DEFINE_double(bloom_bits, -1,
              "Bloom filter bits per key. Negative means use default. "
              "Zero disables.");

DEFINE_bool(use_ribbon_filter, false, "Use Ribbon instead of Bloom filter");

Expand Down Expand Up @@ -1341,6 +1341,8 @@ static bool ValidateTableCacheNumshardbits(const char* flagname,
}
DEFINE_int32(table_cache_numshardbits, 4, "");

DEFINE_string(filter_uri, "", "URI for registry FilterPolicy");

DEFINE_string(env_uri, "",
"URI for registry Env lookup. Mutually exclusive with --fs_uri");
DEFINE_string(fs_uri, "",
Expand Down Expand Up @@ -4629,7 +4631,25 @@ class Benchmark {
// block cache, even with OPTIONS file provided.
table_options->block_cache = cache_;
}
if (table_options->filter_policy == nullptr) {
if (!FLAGS_filter_uri.empty()) {
std::string bits_str;
if (FLAGS_bloom_bits > 0) {
bits_str = ":" + std::to_string(FLAGS_bloom_bits);
fprintf(stderr, "note: appending --bloom-bits (%f) to --filter-uri\n",
FLAGS_bloom_bits);
}
ConfigOptions config_options;
config_options.ignore_unsupported_options = false;
Status s = FilterPolicy::CreateFromString(
config_options, FLAGS_filter_uri + bits_str,
&table_options->filter_policy);
if (!s.ok()) {
fprintf(stderr, "failure creating filter policy[%s%s]: %s\n",
FLAGS_filter_uri.c_str(), bits_str.c_str(),
s.ToString().c_str());
exit(1);
}
} else if (table_options->filter_policy == nullptr) {
if (FLAGS_bloom_bits < 0) {
table_options->filter_policy = BlockBasedTableOptions().filter_policy;
} else if (FLAGS_bloom_bits == 0) {
Expand Down
160 changes: 83 additions & 77 deletions util/filter_bench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ int main() {
#include <cinttypes>
#include <iostream>
#include <sstream>
#include <utility>
#include <vector>

#include "memory/arena.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/cache.h"
#include "rocksdb/convenience.h"
#include "rocksdb/env.h"
#include "rocksdb/system_clock.h"
#include "rocksdb/table.h"
Expand Down Expand Up @@ -83,10 +85,10 @@ DEFINE_bool(use_plain_table_bloom, false,
DEFINE_bool(new_builder, false,
"Whether to create a new builder for each new filter");

DEFINE_uint32(impl, 0,
DEFINE_string(impl, "0",
"Select filter implementation. Without -use_plain_table_bloom:"
"0 = legacy full Bloom filter, "
"1 = format_version 5 Bloom filter, 2 = Ribbon128 filter. With "
"1 = format_version 5 Bloom filter, 2 = Ribbon128 filter. "
"name and options of the filter to use. With "
"-use_plain_table_bloom: 0 = no locality, 1 = locality.");

DEFINE_bool(net_includes_hashing, false,
Expand Down Expand Up @@ -139,36 +141,7 @@ void _always_assert_fail(int line, const char *file, const char *expr) {
// accurate speed tests
#define PREDICT_FP_RATE
#endif

using ROCKSDB_NAMESPACE::Arena;
using ROCKSDB_NAMESPACE::BlockContents;
using ROCKSDB_NAMESPACE::BloomFilterPolicy;
using ROCKSDB_NAMESPACE::BloomHash;
using ROCKSDB_NAMESPACE::BloomLikeFilterPolicy;
using ROCKSDB_NAMESPACE::BuiltinFilterBitsBuilder;
using ROCKSDB_NAMESPACE::CachableEntry;
using ROCKSDB_NAMESPACE::Cache;
using ROCKSDB_NAMESPACE::CacheEntryRole;
using ROCKSDB_NAMESPACE::CacheEntryRoleOptions;
using ROCKSDB_NAMESPACE::EncodeFixed32;
using ROCKSDB_NAMESPACE::Env;
using ROCKSDB_NAMESPACE::FastRange32;
using ROCKSDB_NAMESPACE::FilterBitsReader;
using ROCKSDB_NAMESPACE::FilterBuildingContext;
using ROCKSDB_NAMESPACE::FilterPolicy;
using ROCKSDB_NAMESPACE::FullFilterBlockReader;
using ROCKSDB_NAMESPACE::GetSliceHash;
using ROCKSDB_NAMESPACE::GetSliceHash64;
using ROCKSDB_NAMESPACE::Lower32of64;
using ROCKSDB_NAMESPACE::LRUCacheOptions;
using ROCKSDB_NAMESPACE::ParsedFullFilterBlock;
using ROCKSDB_NAMESPACE::PlainTableBloomV1;
using ROCKSDB_NAMESPACE::Random32;
using ROCKSDB_NAMESPACE::Slice;
using ROCKSDB_NAMESPACE::static_cast_with_check;
using ROCKSDB_NAMESPACE::Status;
using ROCKSDB_NAMESPACE::StderrLogger;
using ROCKSDB_NAMESPACE::mock::MockBlockBasedTableTester;
namespace ROCKSDB_NAMESPACE {

struct KeyMaker {
KeyMaker(size_t avg_size)
Expand Down Expand Up @@ -209,17 +182,6 @@ struct KeyMaker {
}
};

// Warns on stdout when the build configuration (compiler optimization off,
// or assertions compiled in) makes benchmark numbers slower than they need
// to be. Prints nothing in an optimized NDEBUG build.
void PrintWarnings() {
#if defined(__GNUC__) && !defined(__OPTIMIZE__)
  fputs("WARNING: Optimization is disabled: benchmarks unnecessarily slow\n",
        stdout);
#endif
#ifndef NDEBUG
  fputs("WARNING: Assertions are enabled; benchmarks unnecessarily slow\n",
        stdout);
#endif
}

// Writes `error` to stderr as a single line prefixed with "ERROR: ".
void PrintError(const char *error) {
  fprintf(stderr, "ERROR: %s\n", error);
}

struct FilterInfo {
Expand Down Expand Up @@ -296,29 +258,22 @@ static uint32_t DryRunHash64(Slice &s) {
return Lower32of64(GetSliceHash64(s));
}

// Returns the filter policy shared by the benchmark, held in a function-local
// static. It is built on first use from -impl (an index into
// BloomLikeFilterPolicy::GetAllFixedImpls()) and -bits_per_key; later calls
// return the cached pointer. If creation yields a null policy, the next call
// tries again.
const std::shared_ptr<const FilterPolicy> &GetPolicy() {
  static std::shared_ptr<const FilterPolicy> cached_policy;
  if (cached_policy) {
    return cached_policy;
  }
  cached_policy = BloomLikeFilterPolicy::Create(
      BloomLikeFilterPolicy::GetAllFixedImpls().at(FLAGS_impl),
      FLAGS_bits_per_key);
  return cached_policy;
}

struct FilterBench : public MockBlockBasedTableTester {
struct FilterBench : public mock::MockBlockBasedTableTester {
std::vector<KeyMaker> kms_;
std::vector<FilterInfo> infos_;
Random32 random_;
std::ostringstream fp_rate_report_;
Arena arena_;
double m_queries_;
StderrLogger stderr_logger_;
int filter_index_;

FilterBench()
: MockBlockBasedTableTester(GetPolicy()),
FilterBench(const std::shared_ptr<const FilterPolicy> &filter_policy,
int filter_index)
: MockBlockBasedTableTester(filter_policy),
random_(FLAGS_seed),
m_queries_(0) {
m_queries_(0),
filter_index_(filter_index) {
for (uint32_t i = 0; i < FLAGS_batch_size; ++i) {
kms_.emplace_back(FLAGS_key_size < 8 ? 8 : FLAGS_key_size);
}
Expand Down Expand Up @@ -354,17 +309,6 @@ void FilterBench::Go() {
throw std::runtime_error(
"Can't combine -use_plain_table_bloom and -use_full_block_reader");
}
if (FLAGS_use_plain_table_bloom) {
if (FLAGS_impl > 1) {
throw std::runtime_error(
"-impl must currently be >= 0 and <= 1 for Plain table");
}
} else {
if (FLAGS_impl > 2) {
throw std::runtime_error(
"-impl must currently be >= 0 and <= 2 for Block-based table");
}
}

if (FLAGS_vary_key_count_ratio < 0.0 || FLAGS_vary_key_count_ratio > 1.0) {
throw std::runtime_error("-vary_key_count_ratio must be >= 0.0 and <= 1.0");
Expand Down Expand Up @@ -395,7 +339,7 @@ void FilterBench::Go() {

std::unique_ptr<BuiltinFilterBitsBuilder> builder;

size_t total_memory_used = 0;
[[maybe_unused]] size_t total_memory_used = 0;
size_t total_size = 0;
size_t total_keys_added = 0;
#ifdef PREDICT_FP_RATE
Expand Down Expand Up @@ -432,7 +376,7 @@ void FilterBench::Go() {
info.plain_table_bloom_.reset(new PlainTableBloomV1());
info.plain_table_bloom_->SetTotalBits(
&arena_, static_cast<uint32_t>(keys_to_add * FLAGS_bits_per_key),
FLAGS_impl, 0 /*huge_page*/, nullptr /*logger*/);
filter_index_, 0 /*huge_page*/, nullptr /*logger*/);
for (uint32_t i = 0; i < keys_to_add; ++i) {
uint32_t hash = GetSliceHash(kms_[0].Get(filter_id, i));
info.plain_table_bloom_->AddHash(hash);
Expand Down Expand Up @@ -601,7 +545,8 @@ double FilterBench::RandomQueryTest(uint32_t inside_threshold, bool dry_run,

auto dry_run_hash_fn = DryRunNoHash;
if (!FLAGS_net_includes_hashing) {
if (FLAGS_impl == 0 || FLAGS_use_plain_table_bloom) {
if ((filter_index_ >= 0 && filter_index_ < 2) ||
FLAGS_use_plain_table_bloom) {
dry_run_hash_fn = DryRunHash32;
} else {
dry_run_hash_fn = DryRunHash64;
Expand Down Expand Up @@ -790,6 +735,19 @@ double FilterBench::RandomQueryTest(uint32_t inside_threshold, bool dry_run,
return ns;
}

} // namespace ROCKSDB_NAMESPACE

// Warns on stdout when the build configuration (compiler optimization off,
// or assertions compiled in) makes benchmark numbers slower than they need
// to be. Prints nothing in an optimized NDEBUG build.
void PrintWarnings() {
#if defined(__GNUC__) && !defined(__OPTIMIZE__)
  fputs("WARNING: Optimization is disabled: benchmarks unnecessarily slow\n",
        stdout);
#endif
#ifndef NDEBUG
  fputs("WARNING: Assertions are enabled; benchmarks unnecessarily slow\n",
        stdout);
#endif
}

int main(int argc, char **argv) {
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
SetUsageMessage(std::string("\nUSAGE:\n") + std::string(argv[0]) +
Expand Down Expand Up @@ -825,13 +783,61 @@ int main(int argc, char **argv) {
<< " \"Skewed X% in Y%\" - like \"Random filter\" except Y% of"
<< "\n the filters are designated as \"hot\" and receive X%"
<< "\n of queries." << std::endl;
} else if (FLAGS_use_plain_table_bloom && FLAGS_use_full_block_reader) {
throw std::runtime_error(
"Can't combine -use_plain_table_bloom and -use_full_block_reader");
} else if (FLAGS_vary_key_count_ratio < 0.0 ||
FLAGS_vary_key_count_ratio > 1.0) {
throw std::runtime_error("-vary_key_count_ratio must be >= 0.0 and <= 1.0");
}
std::shared_ptr<const ROCKSDB_NAMESPACE::FilterPolicy> policy;

int bloom_idx = -1;
uint64_t id;
const auto &bloom_like_filters =
ROCKSDB_NAMESPACE::BloomLikeFilterPolicy::GetAllFixedImpls();
ROCKSDB_NAMESPACE::Slice impl(FLAGS_impl);
if (ROCKSDB_NAMESPACE::ConsumeDecimalNumber(&impl, &id) &&
id < bloom_like_filters.size() && impl.empty()) {
policy = ROCKSDB_NAMESPACE::BloomLikeFilterPolicy::Create(
bloom_like_filters.at(id), FLAGS_bits_per_key);
if (!policy) {
fprintf(stderr, "Failed to create BloomLikeFilterPolicy: %s\n",
FLAGS_impl.c_str());
exit(-1);
} else {
bloom_idx = static_cast<int>(id);
}
} else {
FilterBench b;
for (uint32_t i = 0; i < FLAGS_runs; ++i) {
b.Go();
FLAGS_seed += 100;
b.random_.Seed(FLAGS_seed);
ROCKSDB_NAMESPACE::ConfigOptions config_options;
config_options.ignore_unsupported_options = false;
std::string bits_str;
if (FLAGS_bits_per_key > 0) {
bits_str = ":" + std::to_string(FLAGS_bits_per_key);
}
auto s = ROCKSDB_NAMESPACE::FilterPolicy::CreateFromString(
config_options, FLAGS_impl + bits_str, &policy);
if (!s.ok() || !policy) {
fprintf(stderr, "Failed to create FilterPolicy[%s%s]: %s\n",
FLAGS_impl.c_str(), bits_str.c_str(), s.ToString().c_str());
exit(-1);
}
}
// Validate the selected implementation against the table type before any
// benchmark state is built. bloom_idx is the numeric -impl selector, or -1
// when the policy was created from a name/URI string instead.
if (FLAGS_use_plain_table_bloom) {
  // Plain table mode only accepts the numeric selectors 0 and 1, so a
  // policy created by name (bloom_idx < 0) is rejected as well.
  if (bloom_idx < 0 || bloom_idx > 1) {
    // Diagnostics end with a newline so they are not fused with whatever
    // the shell prints after the process exits.
    fprintf(stderr, "-impl must currently be 0 or 1 for Plain table\n");
    exit(-1);
  }
} else if (bloom_idx == 1) {
  // NOTE(review): the -impl help text describes 1 as the format_version 5
  // Bloom filter; confirm that rejecting index 1 here is still intended.
  fprintf(stderr,
          "Block-based filter not currently supported by filter_bench\n");
  exit(-1);
}
ROCKSDB_NAMESPACE::FilterBench b(policy, bloom_idx);
for (uint32_t i = 0; i < FLAGS_runs; ++i) {
b.Go();
FLAGS_seed += 100;
b.random_.Seed(FLAGS_seed);
}

return 0;
Expand Down

0 comments on commit 0586cb8

Please sign in to comment.