Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use bloom filter, add event listener to collect stats of compaction, improve update rocksdb options #1959

Merged
merged 4 commits into from
Mar 27, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/common/base/Configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@

namespace nebula {

Configuration::Configuration() {
content_ = std::make_unique<folly::dynamic>(folly::dynamic::object());
}

Configuration::Configuration(folly::dynamic content) {
CHECK(content.isObject()) << "The content is not a valid configuration";
content_ = std::make_unique<folly::dynamic>(std::move(content));
Expand Down Expand Up @@ -160,7 +164,7 @@ Status Configuration::fetchAsSubConf(const char *key, Configuration &subconf) co
}


Status Configuration::updateStringField(const char* key, const std::string& val) {
Status Configuration::upsertStringField(const char* key, const std::string& val) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A question, Why naming upsert?
What I understand that upsert is : should be override the old value if key exists; should be insert a new value if the key doesn't exist.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly, so we can insert some option which doesn't exists, and update some option with a new value. The function actually is a upsert, a bad name.

DCHECK(content_ != nullptr);
auto iter = content_->find(key);
if (iter == content_->items().end() || iter->second.isString()) {
Expand Down
4 changes: 2 additions & 2 deletions src/common/base/Configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ namespace nebula {

class Configuration final {
public:
Configuration() = default;
Configuration();
explicit Configuration(folly::dynamic content);

~Configuration() = default;
Expand Down Expand Up @@ -58,7 +58,7 @@ class Configuration final {

Status MUST_USE_RESULT fetchAsSubConf(const char *key, Configuration &val) const;

Status MUST_USE_RESULT updateStringField(const char* key, const std::string& val);
Status MUST_USE_RESULT upsertStringField(const char* key, const std::string& val);

// Iterate through every key in the configuration
Status forEachKey(std::function<void(const std::string&)> processor) const;
Expand Down
28 changes: 28 additions & 0 deletions src/kvstore/EventListner.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include "base/Base.h"
#include "rocksdb/db.h"
#include "rocksdb/listener.h"

namespace nebula {
namespace kvstore {

class EventListener : public rocksdb::EventListener {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good Job

public:
void OnCompactionCompleted(rocksdb::DB*, const rocksdb::CompactionJobInfo& info) override {
LOG(INFO) << "Rocksdb compact column family: " << info.cf_name
<< " because of " << static_cast<int32_t>(info.compaction_reason)
<< ", status: " << info.status.ToString()
<< ", compacted " << info.input_files.size()
<< " files into " << info.output_files.size()
<< ", base level is " << info.base_input_level
<< ", output level is " << info.output_level;
}
};

} // namespace kvstore
} // namespace nebula
2 changes: 2 additions & 0 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,7 @@ ResultCode NebulaStore::compact(GraphSpaceID spaceId) {

auto code = ResultCode::SUCCEEDED;
std::vector<std::thread> threads;
LOG(INFO) << "Space " << spaceId << " start compaction.";
for (auto& engine : space->engines_) {
threads.emplace_back(std::thread([&engine, &code] {
auto ret = engine->compact();
Expand All @@ -640,6 +641,7 @@ ResultCode NebulaStore::compact(GraphSpaceID spaceId) {
for (auto& t : threads) {
t.join();
}
LOG(INFO) << "Space " << spaceId << " compaction done.";
return code;
}

Expand Down
9 changes: 1 addition & 8 deletions src/kvstore/PartManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,26 +95,19 @@ void MetaServerBasedPartManager::onSpaceOptionUpdated(
static std::unordered_set<std::string> supportedOpt = {
"disable_auto_compactions",
"max_write_buffer_number",
// TODO: write_buffer_size will cause rocksdb crash
// "write_buffer_size",
"compression",
"write_buffer_size",
"level0_file_num_compaction_trigger",
"level0_slowdown_writes_trigger",
"level0_stop_writes_trigger",
"target_file_size_base",
"target_file_size_multiplier",
"max_bytes_for_level_base",
"max_bytes_for_level_multiplier",
"ttl",
"block_size",
"block_restart_interval"
};
static std::unordered_set<std::string> supportedDbOpt = {
"max_total_wal_size",
"delete_obsolete_files_period_micros",
"max_background_jobs",
Copy link
Contributor

@dangleptr dangleptr Mar 26, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

support max_subcompactions

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it can't be changed dynamically, for those options, it would take effect when reboot.

"base_background_compactions",
"max_background_compactions",
"stats_dump_period_sec",
"compaction_readahead_size",
"writable_file_max_buffer_size",
Expand Down
13 changes: 13 additions & 0 deletions src/kvstore/RocksEngineConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@

#include "base/Base.h"
#include "kvstore/RocksEngineConfig.h"
#include "kvstore/EventListner.h"
#include "rocksdb/db.h"
#include "rocksdb/cache.h"
#include "rocksdb/convenience.h"
#include "rocksdb/utilities/options_util.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/filter_policy.h"
#include "base/Configuration.h"

// [WAL]
Expand Down Expand Up @@ -44,6 +46,7 @@ DEFINE_int32(rocksdb_batch_size,
DEFINE_int64(rocksdb_block_cache, 1024,
"The default block cache size used in BlockBasedTable. The unit is MB");

DEFINE_bool(enable_partitioned_index_filter, false, "True for partitioned index filters");

namespace nebula {
namespace kvstore {
Expand All @@ -62,6 +65,7 @@ rocksdb::Status initRocksdbOptions(rocksdb::Options &baseOpts) {
if (!s.ok()) {
return s;
}
dbOpts.listeners.emplace_back(new EventListener());

std::unordered_map<std::string, std::string> cfOptsMap;
if (!loadOptionsMap(cfOptsMap, FLAGS_rocksdb_column_family_options)) {
Expand All @@ -87,6 +91,15 @@ rocksdb::Status initRocksdbOptions(rocksdb::Options &baseOpts) {
static std::shared_ptr<rocksdb::Cache> blockCache
= rocksdb::NewLRUCache(FLAGS_rocksdb_block_cache * 1024 * 1024);
bbtOpts.block_cache = blockCache;
bbtOpts.filter_policy.reset(rocksdb::NewBloomFilterPolicy(10, false));
if (FLAGS_enable_partitioned_index_filter) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about add prefix bloom filter? for the prefix 12bytes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good suggestion. Later I will check if it improves perf.

bbtOpts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
bbtOpts.partition_filters = true;
bbtOpts.cache_index_and_filter_blocks = true;
bbtOpts.cache_index_and_filter_blocks_with_high_priority = true;
bbtOpts.pin_l0_filter_and_index_blocks_in_cache =
baseOpts.compaction_style == rocksdb::CompactionStyle::kCompactionStyleLevel;
}
baseOpts.table_factory.reset(NewBlockBasedTableFactory(bbtOpts));
baseOpts.create_if_missing = true;
return s;
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ DEFINE_uint64(raft_snapshot_timeout, 60 * 5, "Max seconds between two snapshot r

DEFINE_uint32(max_batch_size, 256, "The max number of logs in a batch");

DEFINE_int32(wal_ttl, 86400, "Default wal ttl");
DEFINE_int32(wal_ttl, 14400, "Default wal ttl");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

current is 4 hour ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Current is 1 day, @whitewum suggest change it shorter.

DEFINE_int64(wal_file_size, 16 * 1024 * 1024, "Default wal file size");
DEFINE_int32(wal_buffer_size, 8 * 1024 * 1024, "Default wal buffer size");
DEFINE_int32(wal_buffer_num, 2, "Default wal buffer number");
Expand Down
18 changes: 6 additions & 12 deletions src/meta/processors/configMan/SetConfigProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ std::unordered_set<std::string> SetConfigProcessor::mutableFields_ = {
// rocksdb_column_family_options
"disable_auto_compactions",
// TODO: write_buffer_size will cause rocksdb crash
// "write_buffer_size",
"write_buffer_size",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"write_buffer_size" works well now ? if yes , do we need to uncomment in MetaServerBasedPartManager::onSpaceOptionUpdated?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"write_buffer_size" works well now ? if yes , do we need to uncomment in MetaServerBasedPartManager::onSpaceOptionUpdated?

Sorry, the comment has been removed. I'm wrong.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forgot to remove the TODO : )

"max_write_buffer_number",
"level0_file_num_compaction_trigger",
"level0_slowdown_writes_trigger",
Expand All @@ -23,14 +23,11 @@ std::unordered_set<std::string> SetConfigProcessor::mutableFields_ = {
"target_file_size_multiplier",
"max_bytes_for_level_base",
"max_bytes_for_level_multiplier",
"ttl",

// rocksdb_db_options
"max_total_wal_size",
"delete_obsolete_files_period_micros",
"max_background_jobs",
"base_background_compactions",
"max_background_compactions",
"stats_dump_period_sec",
"compaction_readahead_size",
"writable_file_max_buffer_size",
Expand Down Expand Up @@ -95,7 +92,6 @@ void SetConfigProcessor::process(const cpp2::SetConfigReq& req) {
doSyncPutAndUpdate(std::move(data));
return;
}
return;
} while (false);

handleErrorCode(code);
Expand Down Expand Up @@ -141,9 +137,6 @@ cpp2::ErrorCode SetConfigProcessor::setNestedConfig(const cpp2::ConfigModule& mo
}

Configuration conf;
auto confRet = conf.parseFromString(item.get_value());
CHECK(confRet.ok());

std::vector<std::string> updateFields;
folly::split(",", updateList, updateFields, true);
bool updated = false;
Expand All @@ -156,11 +149,12 @@ cpp2::ErrorCode SetConfigProcessor::setNestedConfig(const cpp2::ConfigModule& mo
auto key = field.substr(0, pos);
auto value = field.substr(pos + 1);
// TODO: Maybe need to handle illegal value here
if (mutableFields_.count(key) && conf.updateStringField(key.c_str(), value).ok()) {
if (!conf.upsertStringField(key.c_str(), value).ok()) {
LOG(ERROR) << "Update configs failed: " << key;
return cpp2::ErrorCode::E_UNSUPPORTED;
}
if (mutableFields_.count(key)) {
updated = true;
} else {
LOG(ERROR) << "Unsupported configs " << key;
return cpp2::ErrorCode::E_NOT_FOUND;
}
}

Expand Down
26 changes: 14 additions & 12 deletions src/meta/test/ConfigManTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ DEFINE_int64(int64_key, 101, "test");
DEFINE_bool(bool_key, false, "test");
DEFINE_double(double_key, 1.23, "test");
DEFINE_string(string_key, "something", "test");
DEFINE_string(nested_key, R"({"max_background_compactions":"4"})", "test");
DEFINE_string(nested_key, R"({"max_background_jobs":"4"})", "test");
DEFINE_string(test0, "v0", "test");
DEFINE_string(test1, "v1", "test");
DEFINE_string(test2, "v2", "test");
Expand Down Expand Up @@ -166,7 +166,7 @@ TEST(ConfigManTest, ConfigProcessorTest) {
item3.set_mode(cpp2::ConfigMode::MUTABLE);
// default value is a json string
std::string defaultValue = R"({
"max_background_compactions":"4"
"max_background_jobs":"4"
})";
item3.set_value(defaultValue);

Expand All @@ -190,7 +190,7 @@ TEST(ConfigManTest, ConfigProcessorTest) {
updated.set_type(cpp2::ConfigType::NESTED);
updated.set_mode(cpp2::ConfigMode::MUTABLE);
// update from consle as format of update list
updated.set_value("max_background_compactions=8,level0_file_num_compaction_trigger=10");
updated.set_value("max_background_jobs=8,level0_file_num_compaction_trigger=10");

cpp2::SetConfigReq req;
req.set_item(updated);
Expand Down Expand Up @@ -221,7 +221,7 @@ TEST(ConfigManTest, ConfigProcessorTest) {
ASSERT_TRUE(confRet.ok());

std::string val;
auto status = conf.fetchAsString("max_background_compactions", val);
auto status = conf.fetchAsString("max_background_jobs", val);
ASSERT_TRUE(status.ok());
ASSERT_EQ(val, "8");
status = conf.fetchAsString("level0_file_num_compaction_trigger", val);
Expand Down Expand Up @@ -423,10 +423,10 @@ TEST(ConfigManTest, MetaConfigManTest) {
{
std::string name = "nested_key";
auto type = cpp2::ConfigType::NESTED;
ASSERT_EQ(FLAGS_nested_key, R"({"max_background_compactions":"4"})");
ASSERT_EQ(FLAGS_nested_key, R"({"max_background_jobs":"4"})");

// update config
std::string newValue = "max_background_compactions=8";
std::string newValue = "max_background_jobs=8";
auto setRet = cfgMan.setConfig(module, name, type, newValue).get();
ASSERT_TRUE(setRet.ok());

Expand All @@ -440,15 +440,15 @@ TEST(ConfigManTest, MetaConfigManTest) {
auto confRet = conf.parseFromString(value);
ASSERT_TRUE(confRet.ok());
std::string val;
auto status = conf.fetchAsString("max_background_compactions", val);
auto status = conf.fetchAsString("max_background_jobs", val);
ASSERT_TRUE(status.ok());
ASSERT_EQ(val, "8");

// get from cache
sleep(FLAGS_heartbeat_interval_secs + 1);
confRet = conf.parseFromString(FLAGS_nested_key);
ASSERT_TRUE(confRet.ok());
status = conf.fetchAsString("max_background_compactions", val);
status = conf.fetchAsString("max_background_jobs", val);
ASSERT_TRUE(status.ok());
ASSERT_EQ(val, "8");
}
Expand Down Expand Up @@ -557,7 +557,7 @@ TEST(ConfigManTest, RocksdbOptionsTest) {
{
std::vector<cpp2::ConfigItem> configItems;
FLAGS_rocksdb_db_options = R"({
"max_background_compactions":"4"
"max_background_jobs":"4"
})";
configItems.emplace_back(toThriftConfigItem(
module, "rocksdb_db_options", type,
Expand Down Expand Up @@ -586,7 +586,7 @@ TEST(ConfigManTest, RocksdbOptionsTest) {
storage::TestUtils::waitUntilAllElected(sc->kvStore_.get(), spaceId, 9);
{
std::string name = "rocksdb_db_options";
std::string updateValue = "max_background_compactions=10";
std::string updateValue = "max_background_jobs=10";
// update config
auto setRet = cfgMan.setConfig(module, name, type, updateValue).get();
ASSERT_TRUE(setRet.ok());
Expand All @@ -603,7 +603,8 @@ TEST(ConfigManTest, RocksdbOptionsTest) {
{
std::string name = "rocksdb_column_family_options";
std::string updateValue = "disable_auto_compactions=true,"
"level0_file_num_compaction_trigger=8";
"level0_file_num_compaction_trigger=8,"
"write_buffer_size=1048576";
// update config
auto setRet = cfgMan.setConfig(module, name, type, updateValue).get();
ASSERT_TRUE(setRet.ok());
Expand All @@ -627,9 +628,10 @@ TEST(ConfigManTest, RocksdbOptionsTest) {
rocksdb::Status status = rocksdb::LoadLatestOptions(rocksPath, rocksdb::Env::Default(),
&loadedDbOpt, &loadedCfDescs);
ASSERT_TRUE(status.ok());
EXPECT_EQ(10, loadedDbOpt.max_background_compactions);
EXPECT_EQ(10, loadedDbOpt.max_background_jobs);
EXPECT_EQ(true, loadedCfDescs[0].options.disable_auto_compactions);
EXPECT_EQ(8, loadedCfDescs[0].options.level0_file_num_compaction_trigger);
EXPECT_EQ(1048576, loadedCfDescs[0].options.write_buffer_size);
}
}

Expand Down