diff --git a/src/common/base/Configuration.cpp b/src/common/base/Configuration.cpp index bdc920c1a10..7f3f2c2c591 100644 --- a/src/common/base/Configuration.cpp +++ b/src/common/base/Configuration.cpp @@ -8,6 +8,10 @@ namespace nebula { +Configuration::Configuration() { + content_ = std::make_unique(folly::dynamic::object()); +} + Configuration::Configuration(folly::dynamic content) { CHECK(content.isObject()) << "The content is not a valid configuration"; content_ = std::make_unique(std::move(content)); @@ -160,7 +164,7 @@ Status Configuration::fetchAsSubConf(const char *key, Configuration &subconf) co } -Status Configuration::updateStringField(const char* key, const std::string& val) { +Status Configuration::upsertStringField(const char* key, const std::string& val) { DCHECK(content_ != nullptr); auto iter = content_->find(key); if (iter == content_->items().end() || iter->second.isString()) { diff --git a/src/common/base/Configuration.h b/src/common/base/Configuration.h index a147aa1e64c..870d1788488 100644 --- a/src/common/base/Configuration.h +++ b/src/common/base/Configuration.h @@ -19,7 +19,7 @@ namespace nebula { class Configuration final { public: - Configuration() = default; + Configuration(); explicit Configuration(folly::dynamic content); ~Configuration() = default; @@ -58,7 +58,7 @@ class Configuration final { Status MUST_USE_RESULT fetchAsSubConf(const char *key, Configuration &val) const; - Status MUST_USE_RESULT updateStringField(const char* key, const std::string& val); + Status MUST_USE_RESULT upsertStringField(const char* key, const std::string& val); // Iterate through every key in the configuration Status forEachKey(std::function processor) const; diff --git a/src/kvstore/EventListner.h b/src/kvstore/EventListner.h new file mode 100644 index 00000000000..01fc50f5c19 --- /dev/null +++ b/src/kvstore/EventListner.h @@ -0,0 +1,28 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include "base/Base.h" +#include "rocksdb/db.h" +#include "rocksdb/listener.h" + +namespace nebula { +namespace kvstore { + +class EventListener : public rocksdb::EventListener { +public: + void OnCompactionCompleted(rocksdb::DB*, const rocksdb::CompactionJobInfo& info) override { + LOG(INFO) << "Rocksdb compact column family: " << info.cf_name + << " because of " << static_cast(info.compaction_reason) + << ", status: " << info.status.ToString() + << ", compacted " << info.input_files.size() + << " files into " << info.output_files.size() + << ", base level is " << info.base_input_level + << ", output level is " << info.output_level; + } +}; + +} // namespace kvstore +} // namespace nebula diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index b610b769caf..8a6ae57dc19 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -627,6 +627,7 @@ ResultCode NebulaStore::compact(GraphSpaceID spaceId) { auto code = ResultCode::SUCCEEDED; std::vector threads; + LOG(INFO) << "Space " << spaceId << " start compaction."; for (auto& engine : space->engines_) { threads.emplace_back(std::thread([&engine, &code] { auto ret = engine->compact(); @@ -640,6 +641,7 @@ ResultCode NebulaStore::compact(GraphSpaceID spaceId) { for (auto& t : threads) { t.join(); } + LOG(INFO) << "Space " << spaceId << " compaction done."; return code; } diff --git a/src/kvstore/PartManager.cpp b/src/kvstore/PartManager.cpp index 21c94530d66..3bdf20e8acd 100644 --- a/src/kvstore/PartManager.cpp +++ b/src/kvstore/PartManager.cpp @@ -95,9 +95,7 @@ void MetaServerBasedPartManager::onSpaceOptionUpdated( static std::unordered_set supportedOpt = { "disable_auto_compactions", "max_write_buffer_number", - // TODO: write_buffer_size will cause rocksdb crash - // "write_buffer_size", - "compression", + "write_buffer_size", "level0_file_num_compaction_trigger", "level0_slowdown_writes_trigger", "level0_stop_writes_trigger", @@ -105,16 +103,11 @@ void MetaServerBasedPartManager::onSpaceOptionUpdated( "target_file_size_multiplier", "max_bytes_for_level_base", "max_bytes_for_level_multiplier", - "ttl", - "block_size", - "block_restart_interval" }; static std::unordered_set supportedDbOpt = { "max_total_wal_size", "delete_obsolete_files_period_micros", "max_background_jobs", - "base_background_compactions", - "max_background_compactions", "stats_dump_period_sec", "compaction_readahead_size", "writable_file_max_buffer_size", diff --git a/src/kvstore/RocksEngineConfig.cpp b/src/kvstore/RocksEngineConfig.cpp index d0033cbd5aa..70f2cacaa97 100644 --- a/src/kvstore/RocksEngineConfig.cpp +++ b/src/kvstore/RocksEngineConfig.cpp @@ -6,11 +6,13 @@ #include "base/Base.h" #include "kvstore/RocksEngineConfig.h" +#include "kvstore/EventListner.h" #include "rocksdb/db.h" #include "rocksdb/cache.h" #include "rocksdb/convenience.h" #include "rocksdb/utilities/options_util.h" #include "rocksdb/slice_transform.h" +#include "rocksdb/filter_policy.h" #include "base/Configuration.h" // [WAL] @@ -44,6 +46,7 @@ DEFINE_int32(rocksdb_batch_size, DEFINE_int64(rocksdb_block_cache, 1024, "The default block cache size used in BlockBasedTable. The unit is MB"); +DEFINE_bool(enable_partitioned_index_filter, false, "True for partitioned index filters"); namespace nebula { namespace kvstore { @@ -62,6 +65,7 @@ rocksdb::Status initRocksdbOptions(rocksdb::Options &baseOpts) { if (!s.ok()) { return s; } + dbOpts.listeners.emplace_back(new EventListener()); std::unordered_map cfOptsMap; if (!loadOptionsMap(cfOptsMap, FLAGS_rocksdb_column_family_options)) { @@ -87,6 +91,15 @@ rocksdb::Status initRocksdbOptions(rocksdb::Options &baseOpts) { static std::shared_ptr blockCache = rocksdb::NewLRUCache(FLAGS_rocksdb_block_cache * 1024 * 1024); bbtOpts.block_cache = blockCache; + bbtOpts.filter_policy.reset(rocksdb::NewBloomFilterPolicy(10, false)); + if (FLAGS_enable_partitioned_index_filter) { + bbtOpts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + bbtOpts.partition_filters = true; + bbtOpts.cache_index_and_filter_blocks = true; + bbtOpts.cache_index_and_filter_blocks_with_high_priority = true; + bbtOpts.pin_l0_filter_and_index_blocks_in_cache = + baseOpts.compaction_style == rocksdb::CompactionStyle::kCompactionStyleLevel; + } baseOpts.table_factory.reset(NewBlockBasedTableFactory(bbtOpts)); baseOpts.create_if_missing = true; return s; diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp index 058cec2c36c..39dcb8eaecf 100644 --- a/src/kvstore/raftex/RaftPart.cpp +++ b/src/kvstore/raftex/RaftPart.cpp @@ -28,7 +28,7 @@ DEFINE_uint64(raft_snapshot_timeout, 60 * 5, "Max seconds between two snapshot r DEFINE_uint32(max_batch_size, 256, "The max number of logs in a batch"); -DEFINE_int32(wal_ttl, 86400, "Default wal ttl"); +DEFINE_int32(wal_ttl, 14400, "Default wal ttl"); DEFINE_int64(wal_file_size, 16 * 1024 * 1024, "Default wal file size"); DEFINE_int32(wal_buffer_size, 8 * 1024 * 1024, "Default wal buffer size"); DEFINE_int32(wal_buffer_num, 2, "Default wal buffer number"); diff --git a/src/meta/processors/configMan/SetConfigProcessor.cpp b/src/meta/processors/configMan/SetConfigProcessor.cpp index 2bc3019e892..70fe30b0d75 100644 --- a/src/meta/processors/configMan/SetConfigProcessor.cpp +++ b/src/meta/processors/configMan/SetConfigProcessor.cpp @@ -14,7 +14,7 @@ std::unordered_set SetConfigProcessor::mutableFields_ = { // rocksdb_column_family_options "disable_auto_compactions", // TODO: write_buffer_size will cause rocksdb crash - // "write_buffer_size", + "write_buffer_size", "max_write_buffer_number", "level0_file_num_compaction_trigger", "level0_slowdown_writes_trigger", @@ -23,14 +23,11 @@ std::unordered_set SetConfigProcessor::mutableFields_ = { "target_file_size_multiplier", "max_bytes_for_level_base", "max_bytes_for_level_multiplier", - "ttl", // rocksdb_db_options "max_total_wal_size", "delete_obsolete_files_period_micros", "max_background_jobs", - "base_background_compactions", - "max_background_compactions", "stats_dump_period_sec", "compaction_readahead_size", "writable_file_max_buffer_size", @@ -95,7 +92,6 @@ void SetConfigProcessor::process(const cpp2::SetConfigReq& req) { doSyncPutAndUpdate(std::move(data)); return; } - return; } while (false); handleErrorCode(code); @@ -141,9 +137,6 @@ cpp2::ErrorCode SetConfigProcessor::setNestedConfig(const cpp2::ConfigModule& mo } Configuration conf; - auto confRet = conf.parseFromString(item.get_value()); - CHECK(confRet.ok()); - std::vector updateFields; folly::split(",", updateList, updateFields, true); bool updated = false; @@ -156,11 +149,12 @@ cpp2::ErrorCode SetConfigProcessor::setNestedConfig(const cpp2::ConfigModule& mo auto key = field.substr(0, pos); auto value = field.substr(pos + 1); // TODO: Maybe need to handle illegal value here - if (mutableFields_.count(key) && conf.updateStringField(key.c_str(), value).ok()) { + if (!conf.upsertStringField(key.c_str(), value).ok()) { + LOG(ERROR) << "Update configs failed: " << key; + return cpp2::ErrorCode::E_UNSUPPORTED; + } + if (mutableFields_.count(key)) { updated = true; - } else { - LOG(ERROR) << "Unsupported configs " << key; - return cpp2::ErrorCode::E_NOT_FOUND; } } diff --git a/src/meta/test/ConfigManTest.cpp b/src/meta/test/ConfigManTest.cpp index fb77474d390..519bebb7b99 100644 --- a/src/meta/test/ConfigManTest.cpp +++ b/src/meta/test/ConfigManTest.cpp @@ -30,7 +30,7 @@ DEFINE_int64(int64_key, 101, "test"); DEFINE_bool(bool_key, false, "test"); DEFINE_double(double_key, 1.23, "test"); DEFINE_string(string_key, "something", "test"); -DEFINE_string(nested_key, R"({"max_background_compactions":"4"})", "test"); +DEFINE_string(nested_key, R"({"max_background_jobs":"4"})", "test"); DEFINE_string(test0, "v0", "test"); DEFINE_string(test1, "v1", "test"); DEFINE_string(test2, "v2", "test"); @@ -166,7 +166,7 @@ TEST(ConfigManTest, ConfigProcessorTest) { item3.set_mode(cpp2::ConfigMode::MUTABLE); // default value is a json string std::string defaultValue = R"({ - "max_background_compactions":"4" + "max_background_jobs":"4" })"; item3.set_value(defaultValue); @@ -190,7 +190,7 @@ TEST(ConfigManTest, ConfigProcessorTest) { updated.set_type(cpp2::ConfigType::NESTED); updated.set_mode(cpp2::ConfigMode::MUTABLE); // update from consle as format of update list - updated.set_value("max_background_compactions=8,level0_file_num_compaction_trigger=10"); + updated.set_value("max_background_jobs=8,level0_file_num_compaction_trigger=10"); cpp2::SetConfigReq req; req.set_item(updated); @@ -221,7 +221,7 @@ TEST(ConfigManTest, ConfigProcessorTest) { ASSERT_TRUE(confRet.ok()); std::string val; - auto status = conf.fetchAsString("max_background_compactions", val); + auto status = conf.fetchAsString("max_background_jobs", val); ASSERT_TRUE(status.ok()); ASSERT_EQ(val, "8"); status = conf.fetchAsString("level0_file_num_compaction_trigger", val); @@ -423,10 +423,10 @@ TEST(ConfigManTest, MetaConfigManTest) { { std::string name = "nested_key"; auto type = cpp2::ConfigType::NESTED; - ASSERT_EQ(FLAGS_nested_key, R"({"max_background_compactions":"4"})"); + ASSERT_EQ(FLAGS_nested_key, R"({"max_background_jobs":"4"})"); // update config - std::string newValue = "max_background_compactions=8"; + std::string newValue = "max_background_jobs=8"; auto setRet = cfgMan.setConfig(module, name, type, newValue).get(); ASSERT_TRUE(setRet.ok()); @@ -440,7 +440,7 @@ TEST(ConfigManTest, MetaConfigManTest) { auto confRet = conf.parseFromString(value); ASSERT_TRUE(confRet.ok()); std::string val; - auto status = conf.fetchAsString("max_background_compactions", val); + auto status = conf.fetchAsString("max_background_jobs", val); ASSERT_TRUE(status.ok()); ASSERT_EQ(val, "8"); @@ -448,7 +448,7 @@ TEST(ConfigManTest, MetaConfigManTest) { sleep(FLAGS_heartbeat_interval_secs + 1); confRet = conf.parseFromString(FLAGS_nested_key); ASSERT_TRUE(confRet.ok()); - status = conf.fetchAsString("max_background_compactions", val); + status = conf.fetchAsString("max_background_jobs", val); ASSERT_TRUE(status.ok()); ASSERT_EQ(val, "8"); } @@ -557,7 +557,7 @@ TEST(ConfigManTest, RocksdbOptionsTest) { { std::vector configItems; FLAGS_rocksdb_db_options = R"({ - "max_background_compactions":"4" + "max_background_jobs":"4" })"; configItems.emplace_back(toThriftConfigItem( module, "rocksdb_db_options", type, @@ -586,7 +586,7 @@ TEST(ConfigManTest, RocksdbOptionsTest) { storage::TestUtils::waitUntilAllElected(sc->kvStore_.get(), spaceId, 9); { std::string name = "rocksdb_db_options"; - std::string updateValue = "max_background_compactions=10"; + std::string updateValue = "max_background_jobs=10"; // update config auto setRet = cfgMan.setConfig(module, name, type, updateValue).get(); ASSERT_TRUE(setRet.ok()); @@ -603,7 +603,8 @@ TEST(ConfigManTest, RocksdbOptionsTest) { { std::string name = "rocksdb_column_family_options"; std::string updateValue = "disable_auto_compactions=true," - "level0_file_num_compaction_trigger=8"; + "level0_file_num_compaction_trigger=8," + "write_buffer_size=1048576"; // update config auto setRet = cfgMan.setConfig(module, name, type, updateValue).get(); ASSERT_TRUE(setRet.ok()); @@ -627,9 +628,10 @@ TEST(ConfigManTest, RocksdbOptionsTest) { rocksdb::Status status = rocksdb::LoadLatestOptions(rocksPath, rocksdb::Env::Default(), &loadedDbOpt, &loadedCfDescs); ASSERT_TRUE(status.ok()); - EXPECT_EQ(10, loadedDbOpt.max_background_compactions); + EXPECT_EQ(10, loadedDbOpt.max_background_jobs); EXPECT_EQ(true, loadedCfDescs[0].options.disable_auto_compactions); EXPECT_EQ(8, loadedCfDescs[0].options.level0_file_num_compaction_trigger); + EXPECT_EQ(1048576, loadedCfDescs[0].options.write_buffer_size); } }