Skip to content

Commit

Permalink
use bloom filter, add event listener to collect stats of compaction, …
Browse files Browse the repository at this point in the history
…improve update rocksdb options (vesoft-inc#1959)

* open bloom filter in default conf, add event listener to collect stats of compaction

* imporve update rocksdb configs

* fix ut error

Co-authored-by: yaphet <darion.wang@vesoft.com>
  • Loading branch information
critical27 and yaphet authored Mar 27, 2020
1 parent b80a4f8 commit 322ab76
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 36 deletions.
6 changes: 5 additions & 1 deletion src/common/base/Configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@

namespace nebula {

Configuration::Configuration() {
content_ = std::make_unique<folly::dynamic>(folly::dynamic::object());
}

Configuration::Configuration(folly::dynamic content) {
CHECK(content.isObject()) << "The content is not a valid configuration";
content_ = std::make_unique<folly::dynamic>(std::move(content));
Expand Down Expand Up @@ -160,7 +164,7 @@ Status Configuration::fetchAsSubConf(const char *key, Configuration &subconf) co
}


Status Configuration::updateStringField(const char* key, const std::string& val) {
Status Configuration::upsertStringField(const char* key, const std::string& val) {
DCHECK(content_ != nullptr);
auto iter = content_->find(key);
if (iter == content_->items().end() || iter->second.isString()) {
Expand Down
4 changes: 2 additions & 2 deletions src/common/base/Configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ namespace nebula {

class Configuration final {
public:
Configuration() = default;
Configuration();
explicit Configuration(folly::dynamic content);

~Configuration() = default;
Expand Down Expand Up @@ -58,7 +58,7 @@ class Configuration final {

Status MUST_USE_RESULT fetchAsSubConf(const char *key, Configuration &val) const;

Status MUST_USE_RESULT updateStringField(const char* key, const std::string& val);
Status MUST_USE_RESULT upsertStringField(const char* key, const std::string& val);

// Iterate through every key in the configuration
Status forEachKey(std::function<void(const std::string&)> processor) const;
Expand Down
28 changes: 28 additions & 0 deletions src/kvstore/EventListner.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include "base/Base.h"
#include "rocksdb/db.h"
#include "rocksdb/listener.h"

namespace nebula {
namespace kvstore {

class EventListener : public rocksdb::EventListener {
public:
void OnCompactionCompleted(rocksdb::DB*, const rocksdb::CompactionJobInfo& info) override {
LOG(INFO) << "Rocksdb compact column family: " << info.cf_name
<< " because of " << static_cast<int32_t>(info.compaction_reason)
<< ", status: " << info.status.ToString()
<< ", compacted " << info.input_files.size()
<< " files into " << info.output_files.size()
<< ", base level is " << info.base_input_level
<< ", output level is " << info.output_level;
}
};

} // namespace kvstore
} // namespace nebula
2 changes: 2 additions & 0 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,7 @@ ResultCode NebulaStore::compact(GraphSpaceID spaceId) {

auto code = ResultCode::SUCCEEDED;
std::vector<std::thread> threads;
LOG(INFO) << "Space " << spaceId << " start compaction.";
for (auto& engine : space->engines_) {
threads.emplace_back(std::thread([&engine, &code] {
auto ret = engine->compact();
Expand All @@ -640,6 +641,7 @@ ResultCode NebulaStore::compact(GraphSpaceID spaceId) {
for (auto& t : threads) {
t.join();
}
LOG(INFO) << "Space " << spaceId << " compaction done.";
return code;
}

Expand Down
9 changes: 1 addition & 8 deletions src/kvstore/PartManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,26 +95,19 @@ void MetaServerBasedPartManager::onSpaceOptionUpdated(
static std::unordered_set<std::string> supportedOpt = {
"disable_auto_compactions",
"max_write_buffer_number",
// TODO: write_buffer_size will cause rocksdb crash
// "write_buffer_size",
"compression",
"write_buffer_size",
"level0_file_num_compaction_trigger",
"level0_slowdown_writes_trigger",
"level0_stop_writes_trigger",
"target_file_size_base",
"target_file_size_multiplier",
"max_bytes_for_level_base",
"max_bytes_for_level_multiplier",
"ttl",
"block_size",
"block_restart_interval"
};
static std::unordered_set<std::string> supportedDbOpt = {
"max_total_wal_size",
"delete_obsolete_files_period_micros",
"max_background_jobs",
"base_background_compactions",
"max_background_compactions",
"stats_dump_period_sec",
"compaction_readahead_size",
"writable_file_max_buffer_size",
Expand Down
13 changes: 13 additions & 0 deletions src/kvstore/RocksEngineConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@

#include "base/Base.h"
#include "kvstore/RocksEngineConfig.h"
#include "kvstore/EventListner.h"
#include "rocksdb/db.h"
#include "rocksdb/cache.h"
#include "rocksdb/convenience.h"
#include "rocksdb/utilities/options_util.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/filter_policy.h"
#include "base/Configuration.h"

// [WAL]
Expand Down Expand Up @@ -44,6 +46,7 @@ DEFINE_int32(rocksdb_batch_size,
DEFINE_int64(rocksdb_block_cache, 1024,
"The default block cache size used in BlockBasedTable. The unit is MB");

DEFINE_bool(enable_partitioned_index_filter, false, "True for partitioned index filters");

namespace nebula {
namespace kvstore {
Expand All @@ -62,6 +65,7 @@ rocksdb::Status initRocksdbOptions(rocksdb::Options &baseOpts) {
if (!s.ok()) {
return s;
}
dbOpts.listeners.emplace_back(new EventListener());

std::unordered_map<std::string, std::string> cfOptsMap;
if (!loadOptionsMap(cfOptsMap, FLAGS_rocksdb_column_family_options)) {
Expand All @@ -87,6 +91,15 @@ rocksdb::Status initRocksdbOptions(rocksdb::Options &baseOpts) {
static std::shared_ptr<rocksdb::Cache> blockCache
= rocksdb::NewLRUCache(FLAGS_rocksdb_block_cache * 1024 * 1024);
bbtOpts.block_cache = blockCache;
bbtOpts.filter_policy.reset(rocksdb::NewBloomFilterPolicy(10, false));
if (FLAGS_enable_partitioned_index_filter) {
bbtOpts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
bbtOpts.partition_filters = true;
bbtOpts.cache_index_and_filter_blocks = true;
bbtOpts.cache_index_and_filter_blocks_with_high_priority = true;
bbtOpts.pin_l0_filter_and_index_blocks_in_cache =
baseOpts.compaction_style == rocksdb::CompactionStyle::kCompactionStyleLevel;
}
baseOpts.table_factory.reset(NewBlockBasedTableFactory(bbtOpts));
baseOpts.create_if_missing = true;
return s;
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ DEFINE_uint64(raft_snapshot_timeout, 60 * 5, "Max seconds between two snapshot r

DEFINE_uint32(max_batch_size, 256, "The max number of logs in a batch");

DEFINE_int32(wal_ttl, 86400, "Default wal ttl");
DEFINE_int32(wal_ttl, 14400, "Default wal ttl");
DEFINE_int64(wal_file_size, 16 * 1024 * 1024, "Default wal file size");
DEFINE_int32(wal_buffer_size, 8 * 1024 * 1024, "Default wal buffer size");
DEFINE_int32(wal_buffer_num, 2, "Default wal buffer number");
Expand Down
18 changes: 6 additions & 12 deletions src/meta/processors/configMan/SetConfigProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ std::unordered_set<std::string> SetConfigProcessor::mutableFields_ = {
// rocksdb_column_family_options
"disable_auto_compactions",
// TODO: write_buffer_size will cause rocksdb crash
// "write_buffer_size",
"write_buffer_size",
"max_write_buffer_number",
"level0_file_num_compaction_trigger",
"level0_slowdown_writes_trigger",
Expand All @@ -23,14 +23,11 @@ std::unordered_set<std::string> SetConfigProcessor::mutableFields_ = {
"target_file_size_multiplier",
"max_bytes_for_level_base",
"max_bytes_for_level_multiplier",
"ttl",

// rocksdb_db_options
"max_total_wal_size",
"delete_obsolete_files_period_micros",
"max_background_jobs",
"base_background_compactions",
"max_background_compactions",
"stats_dump_period_sec",
"compaction_readahead_size",
"writable_file_max_buffer_size",
Expand Down Expand Up @@ -95,7 +92,6 @@ void SetConfigProcessor::process(const cpp2::SetConfigReq& req) {
doSyncPutAndUpdate(std::move(data));
return;
}
return;
} while (false);

handleErrorCode(code);
Expand Down Expand Up @@ -141,9 +137,6 @@ cpp2::ErrorCode SetConfigProcessor::setNestedConfig(const cpp2::ConfigModule& mo
}

Configuration conf;
auto confRet = conf.parseFromString(item.get_value());
CHECK(confRet.ok());

std::vector<std::string> updateFields;
folly::split(",", updateList, updateFields, true);
bool updated = false;
Expand All @@ -156,11 +149,12 @@ cpp2::ErrorCode SetConfigProcessor::setNestedConfig(const cpp2::ConfigModule& mo
auto key = field.substr(0, pos);
auto value = field.substr(pos + 1);
// TODO: Maybe need to handle illegal value here
if (mutableFields_.count(key) && conf.updateStringField(key.c_str(), value).ok()) {
if (!conf.upsertStringField(key.c_str(), value).ok()) {
LOG(ERROR) << "Update configs failed: " << key;
return cpp2::ErrorCode::E_UNSUPPORTED;
}
if (mutableFields_.count(key)) {
updated = true;
} else {
LOG(ERROR) << "Unsupported configs " << key;
return cpp2::ErrorCode::E_NOT_FOUND;
}
}

Expand Down
26 changes: 14 additions & 12 deletions src/meta/test/ConfigManTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ DEFINE_int64(int64_key, 101, "test");
DEFINE_bool(bool_key, false, "test");
DEFINE_double(double_key, 1.23, "test");
DEFINE_string(string_key, "something", "test");
DEFINE_string(nested_key, R"({"max_background_compactions":"4"})", "test");
DEFINE_string(nested_key, R"({"max_background_jobs":"4"})", "test");
DEFINE_string(test0, "v0", "test");
DEFINE_string(test1, "v1", "test");
DEFINE_string(test2, "v2", "test");
Expand Down Expand Up @@ -166,7 +166,7 @@ TEST(ConfigManTest, ConfigProcessorTest) {
item3.set_mode(cpp2::ConfigMode::MUTABLE);
// default value is a json string
std::string defaultValue = R"({
"max_background_compactions":"4"
"max_background_jobs":"4"
})";
item3.set_value(defaultValue);

Expand All @@ -190,7 +190,7 @@ TEST(ConfigManTest, ConfigProcessorTest) {
updated.set_type(cpp2::ConfigType::NESTED);
updated.set_mode(cpp2::ConfigMode::MUTABLE);
// update from consle as format of update list
updated.set_value("max_background_compactions=8,level0_file_num_compaction_trigger=10");
updated.set_value("max_background_jobs=8,level0_file_num_compaction_trigger=10");

cpp2::SetConfigReq req;
req.set_item(updated);
Expand Down Expand Up @@ -221,7 +221,7 @@ TEST(ConfigManTest, ConfigProcessorTest) {
ASSERT_TRUE(confRet.ok());

std::string val;
auto status = conf.fetchAsString("max_background_compactions", val);
auto status = conf.fetchAsString("max_background_jobs", val);
ASSERT_TRUE(status.ok());
ASSERT_EQ(val, "8");
status = conf.fetchAsString("level0_file_num_compaction_trigger", val);
Expand Down Expand Up @@ -423,10 +423,10 @@ TEST(ConfigManTest, MetaConfigManTest) {
{
std::string name = "nested_key";
auto type = cpp2::ConfigType::NESTED;
ASSERT_EQ(FLAGS_nested_key, R"({"max_background_compactions":"4"})");
ASSERT_EQ(FLAGS_nested_key, R"({"max_background_jobs":"4"})");

// update config
std::string newValue = "max_background_compactions=8";
std::string newValue = "max_background_jobs=8";
auto setRet = cfgMan.setConfig(module, name, type, newValue).get();
ASSERT_TRUE(setRet.ok());

Expand All @@ -440,15 +440,15 @@ TEST(ConfigManTest, MetaConfigManTest) {
auto confRet = conf.parseFromString(value);
ASSERT_TRUE(confRet.ok());
std::string val;
auto status = conf.fetchAsString("max_background_compactions", val);
auto status = conf.fetchAsString("max_background_jobs", val);
ASSERT_TRUE(status.ok());
ASSERT_EQ(val, "8");

// get from cache
sleep(FLAGS_heartbeat_interval_secs + 1);
confRet = conf.parseFromString(FLAGS_nested_key);
ASSERT_TRUE(confRet.ok());
status = conf.fetchAsString("max_background_compactions", val);
status = conf.fetchAsString("max_background_jobs", val);
ASSERT_TRUE(status.ok());
ASSERT_EQ(val, "8");
}
Expand Down Expand Up @@ -557,7 +557,7 @@ TEST(ConfigManTest, RocksdbOptionsTest) {
{
std::vector<cpp2::ConfigItem> configItems;
FLAGS_rocksdb_db_options = R"({
"max_background_compactions":"4"
"max_background_jobs":"4"
})";
configItems.emplace_back(toThriftConfigItem(
module, "rocksdb_db_options", type,
Expand Down Expand Up @@ -586,7 +586,7 @@ TEST(ConfigManTest, RocksdbOptionsTest) {
storage::TestUtils::waitUntilAllElected(sc->kvStore_.get(), spaceId, 9);
{
std::string name = "rocksdb_db_options";
std::string updateValue = "max_background_compactions=10";
std::string updateValue = "max_background_jobs=10";
// update config
auto setRet = cfgMan.setConfig(module, name, type, updateValue).get();
ASSERT_TRUE(setRet.ok());
Expand All @@ -603,7 +603,8 @@ TEST(ConfigManTest, RocksdbOptionsTest) {
{
std::string name = "rocksdb_column_family_options";
std::string updateValue = "disable_auto_compactions=true,"
"level0_file_num_compaction_trigger=8";
"level0_file_num_compaction_trigger=8,"
"write_buffer_size=1048576";
// update config
auto setRet = cfgMan.setConfig(module, name, type, updateValue).get();
ASSERT_TRUE(setRet.ok());
Expand All @@ -627,9 +628,10 @@ TEST(ConfigManTest, RocksdbOptionsTest) {
rocksdb::Status status = rocksdb::LoadLatestOptions(rocksPath, rocksdb::Env::Default(),
&loadedDbOpt, &loadedCfDescs);
ASSERT_TRUE(status.ok());
EXPECT_EQ(10, loadedDbOpt.max_background_compactions);
EXPECT_EQ(10, loadedDbOpt.max_background_jobs);
EXPECT_EQ(true, loadedCfDescs[0].options.disable_auto_compactions);
EXPECT_EQ(8, loadedCfDescs[0].options.level0_file_num_compaction_trigger);
EXPECT_EQ(1048576, loadedCfDescs[0].options.write_buffer_size);
}
}

Expand Down

0 comments on commit 322ab76

Please sign in to comment.