diff --git a/conf/pika.conf b/conf/pika.conf index 4e1f9630e7..8492bd9e5b 100644 --- a/conf/pika.conf +++ b/conf/pika.conf @@ -231,6 +231,8 @@ slave-priority : 100 # The disable_auto_compactions option is [true | false] disable_auto_compactions : false +# Rocksdb max_subcompactions +max-subcompactions : 1 # The minimum disk usage ratio for checking resume. # If the disk usage ratio is lower than min-check-resume-ratio, it will not check resume, only higher will check resume. # Its default value is 0.7. @@ -296,6 +298,25 @@ max-write-buffer-size : 10737418240 # If max-write-buffer-num > 3, writing will be slowed down. max-write-buffer-num : 2 +# `min_write_buffer_number_to_merge` is the minimum number of memtables +# that need to be merged before placing the order. For example, if the +# option is set to 2, immutable memtables will only be flushed if there +# are two of them - a single immutable memtable will never be flushed. +# If multiple memtables are merged together, less data will be written +# to storage because the two updates are merged into a single key. However, +# each Get() must linearly traverse all unmodifiable memtables and check +# whether the key exists. Setting this value too high may hurt performance. +min-write-buffer-number-to-merge : 1 + +# rocksdb level0_stop_writes_trigger +level0-stop-writes-trigger : 36 + +# rocksdb level0_slowdown_writes_trigger +level0-slowdown-writes-trigger : 20 + +# rocksdb level0_file_num_compaction_trigger +level0-file-num-compaction-trigger : 4 + # The maximum size of the response package to client to prevent memory # exhaustion caused by commands like 'keys *' and 'Scan' which can generate huge response. # Supported Units [K|M|G]. The default unit is in [bytes]. diff --git a/include/pika_conf.h b/include/pika_conf.h index b302b27142..9fe1cc19ac 100644 --- a/include/pika_conf.h +++ b/include/pika_conf.h @@ -102,6 +102,10 @@ class PikaConf : public pstd::BaseConf { std::shared_lock l(rwlock_); return compact_interval_; } + int max_subcompactions() { + std::shared_lock l(rwlock_); + return max_subcompactions_; + } bool disable_auto_compactions() { std::shared_lock l(rwlock_); return disable_auto_compactions_; @@ -122,6 +126,22 @@ class PikaConf : public pstd::BaseConf { std::shared_lock l(rwlock_); return write_buffer_size_; } + int min_write_buffer_number_to_merge() { + std::shared_lock l(rwlock_); + return min_write_buffer_number_to_merge_; + } + int level0_stop_writes_trigger() { + std::shared_lock l(rwlock_); + return level0_stop_writes_trigger_; + } + int level0_slowdown_writes_trigger() { + std::shared_lock l(rwlock_); + return level0_slowdown_writes_trigger_; + } + int level0_file_num_compaction_trigger() { + std::shared_lock l(rwlock_); + return level0_file_num_compaction_trigger_; + } int64_t arena_block_size() { std::shared_lock l(rwlock_); return arena_block_size_; @@ -577,6 +597,11 @@ class PikaConf : public pstd::BaseConf { TryPushDiffCommands("disable_auto_compactions", value); disable_auto_compactions_ = value == "true"; } + void SetMaxSubcompactions(const int& value) { + std::lock_guard l(rwlock_); + TryPushDiffCommands("max-subcompactions", std::to_string(value)); + max_subcompactions_ = value; + } void SetLeastResumeFreeDiskSize(const int64_t& value) { std::lock_guard l(rwlock_); TryPushDiffCommands("least-free-disk-resume-size", std::to_string(value)); @@ -620,6 +645,26 @@ class PikaConf : public pstd::BaseConf { TryPushDiffCommands("write-buffer-size", std::to_string(value)); write_buffer_size_ = value; } + void SetMinWriteBufferNumberToMerge(const int& value) { + std::lock_guard l(rwlock_); + TryPushDiffCommands("min-write-buffer-number-to-merge", std::to_string(value)); + min_write_buffer_number_to_merge_ = value; + } + void SetLevel0StopWritesTrigger(const int& value) { + std::lock_guard l(rwlock_); + TryPushDiffCommands("level0-stop-writes-trigger", std::to_string(value)); + level0_stop_writes_trigger_ = value; + } + void SetLevel0SlowdownWritesTrigger(const int& value) { + std::lock_guard l(rwlock_); + TryPushDiffCommands("level0-slowdown-writes-trigger", std::to_string(value)); + level0_slowdown_writes_trigger_ = value; + } + void SetLevel0FileNumCompactionTrigger(const int& value) { + std::lock_guard l(rwlock_); + TryPushDiffCommands("level0-file-num-compaction-trigger", std::to_string(value)); + level0_file_num_compaction_trigger_ = value; + } void SetMaxWriteBufferNumber(const int& value) { std::lock_guard l(rwlock_); TryPushDiffCommands("max-write-buffer-num", std::to_string(value)); @@ -706,8 +751,11 @@ class PikaConf : public pstd::BaseConf { std::string db_path_; int db_instance_num_ = 0; std::string db_sync_path_; + + // compact std::string compact_cron_; std::string compact_interval_; + int max_subcompactions_ = 1; bool disable_auto_compactions_ = false; int64_t resume_check_interval_ = 60; // seconds int64_t least_free_disk_to_resume_ = 268435456; // 256 MB @@ -718,6 +766,10 @@ class PikaConf : public pstd::BaseConf { int64_t thread_migrate_keys_num_ = 0; int64_t max_write_buffer_size_ = 0; int max_write_buffer_num_ = 0; + int min_write_buffer_number_to_merge_ = 1; + int level0_stop_writes_trigger_ = 36; + int level0_slowdown_writes_trigger_ = 20; + int level0_file_num_compaction_trigger_ = 4; int64_t max_client_response_size_ = 0; bool daemonize_ = false; int timeout_ = 0; diff --git a/src/pika_admin.cc b/src/pika_admin.cc index ccab653218..015d1c2a0e 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -1858,6 +1858,30 @@ void ConfigCmd::ConfigGet(std::string& ret) { EncodeNumber(&config_body, g_pika_conf->max_write_buffer_size()); } + if (pstd::stringmatch(pattern.data(), "min-write-buffer-number-to-merge", 1) != 0) { + elements += 2; + EncodeString(&config_body, "min-write-buffer-number-to-merge"); + EncodeNumber(&config_body, g_pika_conf->min_write_buffer_number_to_merge()); + } + + if (pstd::stringmatch(pattern.data(), "level0-stop-writes-trigger", 1) != 0) { + elements += 2; + EncodeString(&config_body, "level0-stop-writes-trigger"); + EncodeNumber(&config_body, g_pika_conf->level0_stop_writes_trigger()); + } + + if (pstd::stringmatch(pattern.data(), "level0-slowdown-writes-trigger", 1) != 0) { + elements += 2; + EncodeString(&config_body, "level0-slowdown-writes-trigger"); + EncodeNumber(&config_body, g_pika_conf->level0_slowdown_writes_trigger()); + } + + if (pstd::stringmatch(pattern.data(), "level0-file-num-compaction-trigger", 1) != 0) { + elements += 2; + EncodeString(&config_body, "level0-file-num-compaction-trigger"); + EncodeNumber(&config_body, g_pika_conf->level0_file_num_compaction_trigger()); + } + if (pstd::stringmatch(pattern.data(), "max-client-response-size", 1) != 0) { elements += 2; EncodeString(&config_body, "max-client-response-size"); @@ -2193,6 +2217,10 @@ void ConfigCmd::ConfigSet(std::shared_ptr db) { // MutableColumnFamilyOptions "write-buffer-size", "max-write-buffer-num", + "min-write-buffer-number-to-merge", + "level0-slowdown-writes-trigger", + "level0-stop-writes-trigger", + "level0-file-num-compaction-trigger", "arena-block-size", "throttle-bytes-per-second", "max-rsync-parallel-num", @@ -2547,6 +2575,58 @@ void ConfigCmd::ConfigSet(std::shared_ptr db) { } g_pika_conf->SetMaxWriteBufferNumber(static_cast(ival)); res_.AppendStringRaw("+OK\r\n"); + } else if (set_item == "min-write-buffer-number-to-merge") { + if (pstd::string2int(value.data(), value.size(), &ival) == 0) { + res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'min-write-buffer-number-to-merge'\r\n"); + return; + } + std::unordered_map options_map{{"min_write_buffer_number_to_merge", value}}; + storage::Status s = g_pika_server->RewriteStorageOptions(storage::OptionType::kColumnFamily, options_map); + if (!s.ok()) { + res_.AppendStringRaw("-ERR Set min-write-buffer-number-to-merge wrong: " + s.ToString() + "\r\n"); + return; + } + g_pika_conf->SetMinWriteBufferNumberToMerge(static_cast(ival)); + res_.AppendStringRaw("+OK\r\n"); + } else if (set_item == "level0-stop-writes-trigger") { + if (pstd::string2int(value.data(), value.size(), &ival) == 0) { + res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'level0-stop-writes-trigger'\r\n"); + return; + } + std::unordered_map options_map{{"level0_stop_writes_trigger", value}}; + storage::Status s = g_pika_server->RewriteStorageOptions(storage::OptionType::kColumnFamily, options_map); + if (!s.ok()) { + res_.AppendStringRaw("-ERR Set level0-stop-writes-trigger wrong: " + s.ToString() + "\r\n"); + return; + } + g_pika_conf->SetLevel0StopWritesTrigger(static_cast(ival)); + res_.AppendStringRaw("+OK\r\n"); + } else if (set_item == "level0-slowdown-writes-trigger") { + if (pstd::string2int(value.data(), value.size(), &ival) == 0) { + res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'level0-slowdown-writes-trigger'\r\n"); + return; + } + std::unordered_map options_map{{"level0_slowdown_writes_trigger", value}}; + storage::Status s = g_pika_server->RewriteStorageOptions(storage::OptionType::kColumnFamily, options_map); + if (!s.ok()) { + res_.AppendStringRaw("-ERR Set level0-slowdown-writes-trigger wrong: " + s.ToString() + "\r\n"); + return; + } + g_pika_conf->SetLevel0SlowdownWritesTrigger(static_cast(ival)); + res_.AppendStringRaw("+OK\r\n"); + } else if (set_item == "level0-file-num-compaction-trigger") { + if (pstd::string2int(value.data(), value.size(), &ival) == 0) { + res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'level0-file-num-compaction-trigger'\r\n"); + return; + } + std::unordered_map options_map{{"level0_file_num_compaction_trigger", value}}; + storage::Status s = g_pika_server->RewriteStorageOptions(storage::OptionType::kColumnFamily, options_map); + if (!s.ok()) { + res_.AppendStringRaw("-ERR Set level0-file-num-compaction-trigger wrong: " + s.ToString() + "\r\n"); + return; + } + g_pika_conf->SetLevel0FileNumCompactionTrigger(static_cast(ival)); + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "arena-block-size") { if (pstd::string2int(value.data(), value.size(), &ival) == 0) { res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'arena-block-size'\r\n"); diff --git a/src/pika_conf.cc b/src/pika_conf.cc index 1a09ac9dfc..be7ac23ab1 100644 --- a/src/pika_conf.cc +++ b/src/pika_conf.cc @@ -251,6 +251,11 @@ int PikaConf::Load() { } } + GetConfInt("max-subcompactions", &max_subcompactions_); + if (max_subcompactions_ < 1) { + max_subcompactions_ = 1; + } + // least-free-disk-resume-size GetConfInt64Human("least-free-disk-resume-size", &least_free_disk_to_resume_); if (least_free_disk_to_resume_ <= 0) { @@ -273,6 +278,26 @@ int PikaConf::Load() { write_buffer_size_ = 268435456; // 256Mb } + GetConfInt("level0-stop-writes-trigger", &level0_stop_writes_trigger_); + if (level0_stop_writes_trigger_ < 36) { + level0_stop_writes_trigger_ = 36; + } + + GetConfInt("level0-slowdown-writes-trigger", &level0_slowdown_writes_trigger_); + if (level0_slowdown_writes_trigger_ < 20) { + level0_slowdown_writes_trigger_ = 20; + } + + GetConfInt("level0-file-num-compaction-trigger", &level0_file_num_compaction_trigger_); + if (level0_file_num_compaction_trigger_ < 4) { + level0_file_num_compaction_trigger_ = 4; + } + + GetConfInt("min-write-buffer-number-to-merge", &min_write_buffer_number_to_merge_); + if (min_write_buffer_number_to_merge_ < 1) { + min_write_buffer_number_to_merge_ = 1; // 1 for immutable memtable to merge + } + // arena_block_size GetConfInt64Human("arena-block-size", &arena_block_size_); if (arena_block_size_ <= 0) { @@ -685,6 +710,10 @@ int PikaConf::ConfigRewrite() { SetConfInt("max-background-jobs", max_background_jobs_); SetConfInt("max-write-buffer-num", max_write_buffer_num_); SetConfInt64("write-buffer-size", write_buffer_size_); + SetConfInt("min-write-buffer-number-to-merge", min_write_buffer_number_to_merge_); + SetConfInt("level0-stop-writes-trigger", level0_stop_writes_trigger_); + SetConfInt("level0-slowdown-writes-trigger", level0_slowdown_writes_trigger_); + SetConfInt("level0-file-num-compaction-trigger", level0_file_num_compaction_trigger_); SetConfInt64("arena-block-size", arena_block_size_); SetConfInt64("slotmigrate", slotmigrate_); // slaveof config item is special diff --git a/src/pika_server.cc b/src/pika_server.cc index f123f82209..5f2b2c51ff 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -1306,6 +1306,12 @@ void PikaServer::InitStorageOptions() { storage_options_.options.write_buffer_manager = std::make_shared(g_pika_conf->max_write_buffer_size()); storage_options_.options.max_write_buffer_number = g_pika_conf->max_write_buffer_number(); + storage_options_.options.level0_file_num_compaction_trigger = g_pika_conf->level0_file_num_compaction_trigger(); + storage_options_.options.level0_stop_writes_trigger = g_pika_conf->level0_stop_writes_trigger(); + storage_options_.options.level0_slowdown_writes_trigger = g_pika_conf->level0_slowdown_writes_trigger(); + storage_options_.options.min_write_buffer_number_to_merge = g_pika_conf->min_write_buffer_number_to_merge(); + storage_options_.options.max_bytes_for_level_base = g_pika_conf->level0_file_num_compaction_trigger() * g_pika_conf->write_buffer_size(); + storage_options_.options.max_subcompactions = g_pika_conf->max_subcompactions(); storage_options_.options.target_file_size_base = g_pika_conf->target_file_size_base(); storage_options_.options.max_background_flushes = g_pika_conf->max_background_flushes(); storage_options_.options.max_background_compactions = g_pika_conf->max_background_compactions();