diff --git a/conf/pika.conf b/conf/pika.conf
index 496d974174..bd39e56c8d 100644
--- a/conf/pika.conf
+++ b/conf/pika.conf
@@ -636,4 +636,38 @@ cache-lfu-decay-time: 1
 # 'internal-used-unfinished-full-sync' is used to generate a metric 'is_eligible_for_master_election'
 # which serves for the scenario of codis-pika cluster reelection
 # You'd better [DO NOT MODIFY IT UNLESS YOU KNOW WHAT YOU ARE DOING]
-internal-used-unfinished-full-sync : 
\ No newline at end of file
+internal-used-unfinished-full-sync :
+
+# Pika automatic compaction strategy, a complement to rocksdb compaction.
+# The compaction background task is triggered periodically according to `compact-interval`.
+# Can choose `full-compact` or `obd-compact`.
+# obd-compact: https://github.com/OpenAtomFoundation/pika/issues/2255
+compaction-strategy : obd-compact
+
+# For OBD_Compact
+# According to the number of sst files in rocksdb,
+# compact once for every `compact-every-num-of-files` files.
+compact-every-num-of-files : 10
+
+# For OBD_Compact
+# In addition, if an sst file was created more than
+# `force-compact-file-age-seconds` seconds ago and its delete ratio has
+# reached `force-compact-min-delete-ratio`, a compaction over that file's
+# key range is forced (at most one forced compaction per
+# `compact-every-num-of-files` files in a single round).
+force-compact-file-age-seconds : 300
+
+# For OBD_Compact
+# Minimum delete ratio (percentage of deleted keys) an sst file must
+# reach before it is force-compacted, see `force-compact-file-age-seconds`.
+force-compact-min-delete-ratio : 10
+
+# For OBD_Compact
+# SST files created within the last `dont-compact-sst-created-in-seconds`
+# seconds are never picked for compaction.
+dont-compact-sst-created-in-seconds : 600
+
+# For OBD_Compact
+# Minimum delete ratio (percentage of deleted keys) an sst file must
+# reach to be picked as the best-delete-ratio compaction candidate.
+best-delete-min-ratio : 10
diff --git a/conf/s_pika.conf b/conf/s_pika.conf
new file mode 100644
index 0000000000..41b7f39ce4
--- /dev/null
+++ b/conf/s_pika.conf
@@ -0,0 +1,99 @@
+# 360 dba pika conf pika3.5.2
+port : 9221
+thread-num : 8
+log-path : ./log/
+loglevel : info
+db-path : ./db/
+write-buffer-size : 256M
+timeout : 30
+#requirepass : 06154eee364854d5
+#masterauth : 06154eee364854d5
+#userpass : 06154eee364854d5360
+#userblacklist : bgsave,dumpoff,client
+dump-prefix : pika-
+dump-expire : 1
+pidfile : .pika.pid
+daemonize : yes
+dump-path : ./dump/
+maxclients : 20000
+target-file-size-base : 20971520
+expire-logs-days : 7
+expire-logs-nums : 300
+root-connection-num : 10
+slowlog-log-slower-than : 100000
+binlog-file-size : 104857600
+compression : snappy
+db-sync-path : ./dbsync
+db-sync-speed : 60
+slowlog-write-errorlog : yes
+small-compaction-threshold : 5000
+max-write-buffer-size : 20737418240
+max-cache-files : 8000
+replication-num : 0
+consensus-level : 0
+max-cache-statistic-keys : 0
+thread-pool-size : 30
+default-slot-num : 1024
+instance-mode : classic
+databases : 1
+sync-thread-num : 1
+arena-block-size : 33554432
+max-background-jobs : 12
+max-background-flushes : 3
+max-background-compactions : 9
+rate-limiter-bandwidth : 1099511627776
+db-instance-num : 1
+block-size : 4096
+#block-cache : 5368709120
+block-cache : 4294967296
+max-subcompactions : 8
+
+#cache-maxmemory : 5368709120
+cache-lfu-decay-time : 1
+cache-maxmemory-samples : 5
+cache-maxmemory-policy : 1
+cache-num : 8
+cache-model : 0
+zset-cache-field-num-per-key : 512
+zset-cache-start-direction : 0
+cache-type :
+
+share-block-cache : yes
+throttle-bytes-per-second : 102400000
+max-rsync-parallel-num : 4
+write-binlog : no
+slotmigrate : no
+
+# Pika automatic compaction strategy, a complement to rocksdb compaction.
+# The compaction background task is triggered periodically according to `compact-interval`.
+# Can choose `full-compact` or `obd-compact`.
+# obd-compact: https://github.com/OpenAtomFoundation/pika/issues/2255
+compaction-strategy : obd-compact
+
+# For OBD_Compact
+# According to the number of sst files in rocksdb,
+# compact once for every `compact-every-num-of-files` files.
+compact-every-num-of-files : 10
+
+# For OBD_Compact
+# In addition, if an sst file was created more than
+# `force-compact-file-age-seconds` seconds ago and its delete ratio has
+# reached `force-compact-min-delete-ratio`, a compaction over that file's
+# key range is forced (at most one forced compaction per
+# `compact-every-num-of-files` files in a single round).
+force-compact-file-age-seconds : 300
+
+# For OBD_Compact
+# Minimum delete ratio (percentage of deleted keys) an sst file must
+# reach before it is force-compacted, see `force-compact-file-age-seconds`.
+force-compact-min-delete-ratio : 10
+
+# For OBD_Compact
+# SST files created within the last `dont-compact-sst-created-in-seconds`
+# seconds are never picked for compaction.
+dont-compact-sst-created-in-seconds : 5
+
+# For OBD_Compact
+# Minimum delete ratio (percentage of deleted keys) an sst file must
+# reach to be picked as the best-delete-ratio compaction candidate.
+best-delete-min-ratio : 10
diff --git a/include/pika_conf.h b/include/pika_conf.h
index bec1ae14ca..244db8afe1 100644
--- a/include/pika_conf.h
+++ b/include/pika_conf.h
@@ -28,6 +28,12 @@ const uint32_t configReplicationIDSize = 50;
 // global class, class members well initialized
 class PikaConf : public pstd::BaseConf {
+ public:
+  enum CompactionStrategy {
+    FullCompact,
+    OldestOrBestDeleteRatioSstCompact
+  };
+
  public:
   PikaConf(const std::string& path);
   ~PikaConf() override = default;
@@ -114,6 +120,30 @@ class PikaConf : public pstd::BaseConf {
     std::shared_lock l(rwlock_);
     return max_subcompactions_;
   }
+  int compact_every_num_of_files() {
+    std::shared_lock l(rwlock_);
+    return compact_every_num_of_files_;
+  }
+  int force_compact_file_age_seconds() {
+    std::shared_lock l(rwlock_);
+    return force_compact_file_age_seconds_;
+  }
+  int force_compact_min_delete_ratio() {
+    std::shared_lock l(rwlock_);
+    return force_compact_min_delete_ratio_;
+  }
+  int dont_compact_sst_created_in_seconds() {
+    std::shared_lock l(rwlock_);
+    return dont_compact_sst_created_in_seconds_;
+  }
+  int best_delete_min_ratio() {
+    std::shared_lock l(rwlock_);
+    return best_delete_min_ratio_;
+  }
+  CompactionStrategy compaction_strategy() {
+    std::shared_lock l(rwlock_);
+    return compaction_strategy_;
+  }
   bool disable_auto_compactions() {
     std::shared_lock l(rwlock_);
     return disable_auto_compactions_;
   }
@@ -914,6 +944,15 @@ class PikaConf : public pstd::BaseConf {
   std::string compact_interval_;
   int max_subcompactions_ = 1;
   bool disable_auto_compactions_ = false;
+
+  // for obd_compact
+  int compact_every_num_of_files_ = 10;
+  int force_compact_file_age_seconds_ = 300;
+  int force_compact_min_delete_ratio_ = 10;
+  int dont_compact_sst_created_in_seconds_ = 600;
+  int best_delete_min_ratio_ = 10;
+  CompactionStrategy compaction_strategy_ = FullCompact;
+
   int64_t resume_check_interval_ = 60;  // seconds
   int64_t least_free_disk_to_resume_ = 268435456;  // 256 MB
   double min_check_resume_ratio_ = 0.7;
diff --git a/include/pika_db.h b/include/pika_db.h
index c3d4fce211..edf691d2c5 100644
--- a/include/pika_db.h
+++ b/include/pika_db.h
@@ -125,6 +125,8 @@ class DB : public std::enable_shared_from_this<DB>, public pstd::noncopyable {
   // Compact use;
   void Compact(const storage::DataType& type);
   void CompactRange(const storage::DataType& type, const
std::string& start, const std::string& end); + // void FullCompact(); + void LongestNotCompactiontSstCompact(const storage::DataType& type); void SetCompactRangeOptions(const bool is_canceled); diff --git a/include/pika_server.h b/include/pika_server.h index 1374158f88..a4f8a86152 100644 --- a/include/pika_server.h +++ b/include/pika_server.h @@ -59,6 +59,7 @@ enum TaskType { kStopKeyScan, kBgSave, kCompactRangeAll, + kCompactOldestOrBestDeleteRatioSst, }; struct TaskArg { diff --git a/src/pika_conf.cc b/src/pika_conf.cc index 913f669880..d45d83b8cc 100644 --- a/src/pika_conf.cc +++ b/src/pika_conf.cc @@ -301,6 +301,41 @@ int PikaConf::Load() { max_subcompactions_ = 1; } + GetConfInt("compact-every-num-of-files", &compact_every_num_of_files_); + if (compact_every_num_of_files_ < 10) { + compact_every_num_of_files_ = 10; + } + + GetConfInt("force-compact-file-age-seconds", &force_compact_file_age_seconds_); + if (force_compact_file_age_seconds_ < 300) { + force_compact_file_age_seconds_ = 300; + } + + GetConfInt("force-compact-min-delete-ratio", &force_compact_min_delete_ratio_); + if (force_compact_min_delete_ratio_ < 10) { + force_compact_min_delete_ratio_ = 10; + } + + GetConfInt("dont-compact-sst-created-in-seconds", &dont_compact_sst_created_in_seconds_); + if (dont_compact_sst_created_in_seconds_ < 600) { + dont_compact_sst_created_in_seconds_ = 600; + } + + GetConfInt("best-delete-min-ratio", &best_delete_min_ratio_); + if (best_delete_min_ratio_ < 10) { + best_delete_min_ratio_ = 10; + } + + std::string cs_; + GetConfStr("compaction-strategy", &cs_); + if (cs_ == "full-compact") { + compaction_strategy_ = FullCompact; + } else if (cs_ == "obd-compact") { + compaction_strategy_ = OldestOrBestDeleteRatioSstCompact; + } else { + compaction_strategy_ = FullCompact; + } + // least-free-disk-resume-size GetConfInt64Human("least-free-disk-resume-size", &least_free_disk_to_resume_); if (least_free_disk_to_resume_ <= 0) { @@ -410,6 +445,11 @@ int PikaConf::Load() { max_write_buffer_num_ = 2; // 1 for immutable memtable, 1 for mutable memtable } + GetConfInt("min-write-buffer-number-to-merge", &min_write_buffer_number_to_merge_); + if (min_write_buffer_number_to_merge_ < 1) { + min_write_buffer_number_to_merge_ = 1; // 1 for immutable memtable to merge + } + // max_client_response_size GetConfInt64Human("max-client-response-size", &max_client_response_size_); if (max_client_response_size_ <= 0) { @@ -774,8 +814,40 @@ int PikaConf::ConfigRewrite() { SetConfInt("small-compaction-duration-threshold", small_compaction_duration_threshold_); SetConfInt("max-client-response-size", static_cast(max_client_response_size_)); SetConfInt("db-sync-speed", db_sync_speed_); + // compact SetConfStr("compact-cron", compact_cron_); SetConfStr("compact-interval", compact_interval_); + SetConfInt("compact-every-num-of-files", compact_every_num_of_files_); + if (compact_every_num_of_files_ < 1) { + compact_every_num_of_files_ = 1; + } + SetConfInt("force-compact-file-age-seconds", force_compact_file_age_seconds_); + if (force_compact_file_age_seconds_ < 300) { + force_compact_file_age_seconds_ = 300; + } + SetConfInt("force-compact-min-delete-ratio", force_compact_min_delete_ratio_); + if (force_compact_min_delete_ratio_ < 5) { + force_compact_min_delete_ratio_ = 5; + } + SetConfInt("dont-compact-sst-created-in-seconds", dont_compact_sst_created_in_seconds_); + if (dont_compact_sst_created_in_seconds_ < 300) { + dont_compact_sst_created_in_seconds_ = 300; + } + SetConfInt("best-delete-min-ratio", 
best_delete_min_ratio_);
+  if (best_delete_min_ratio_ < 10) {
+    best_delete_min_ratio_ = 10;
+  }
+
+  std::string cs_;
+  if (compaction_strategy_ == FullCompact) {
+    cs_ = "full-compact";
+  } else if (compaction_strategy_ == OldestOrBestDeleteRatioSstCompact) {
+    cs_ = "obd-compact";
+  } else {
+    cs_ = "full-compact";
+  }
+  SetConfStr("compaction-strategy", cs_);
+  SetConfStr("disable_auto_compactions", disable_auto_compactions_ ? "true" : "false");
   SetConfStr("cache-type", scachetype);
   SetConfInt64("least-free-disk-resume-size", least_free_disk_to_resume_);
diff --git a/src/pika_db.cc b/src/pika_db.cc
index add26c66c1..92e1bc4899 100644
--- a/src/pika_db.cc
+++ b/src/pika_db.cc
@@ -166,6 +166,22 @@ void DB::CompactRange(const storage::DataType& type, const std::string& start, const std::string& end) {
   storage_->CompactRange(type, start, end);
 }
 
+// void DB::FullCompact() {
+//   std::lock_guard rwl(dbs_rw_);
+//   if (!opened_) {
+//     return;
+//   }
+//   storage_->FullCompact();
+// }
+
+void DB::LongestNotCompactiontSstCompact(const storage::DataType& type) {
+  std::lock_guard rwl(dbs_rw_);
+  if (!opened_) {
+    return;
+  }
+  storage_->LongestNotCompactiontSstCompact(type);
+}
+
 void DB::DoKeyScan(void* arg) {
   std::unique_ptr<BgTaskArg> bg_task_arg(static_cast<BgTaskArg*>(arg));
   bg_task_arg->db->RunKeyScan();
diff --git a/src/pika_server.cc b/src/pika_server.cc
index 35efd46747..7a694559b0 100644
--- a/src/pika_server.cc
+++ b/src/pika_server.cc
@@ -424,6 +424,9 @@ Status PikaServer::DoSameThingSpecificDB(const std::set<std::string>& dbs, const TaskArg& arg) {
     case TaskType::kCompactRangeAll:
       db_item.second->CompactRange(storage::DataType::kAll, arg.argv[0], arg.argv[1]);
       break;
+    case TaskType::kCompactOldestOrBestDeleteRatioSst:
+      db_item.second->LongestNotCompactiontSstCompact(storage::DataType::kAll);
+      break;
     default:
       break;
   }
@@ -1138,7 +1141,12 @@ void PikaServer::AutoCompactRange() {
   gettimeofday(&last_check_compact_time_, nullptr);
   if ((static_cast<double>(free_size) / static_cast<double>(total_size)) * 100 >= usage) {
     std::set<std::string> dbs = g_pika_server->GetAllDBName();
-    Status s = DoSameThingSpecificDB(dbs, {TaskType::kCompactAll});
+    Status s;
+    if (g_pika_conf->compaction_strategy() == PikaConf::FullCompact) {
+      s = DoSameThingSpecificDB(dbs, {TaskType::kCompactAll});
+    } else if (g_pika_conf->compaction_strategy() == PikaConf::OldestOrBestDeleteRatioSstCompact) {
+      s = DoSameThingSpecificDB(dbs, {TaskType::kCompactOldestOrBestDeleteRatioSst});
+    }
     if (s.ok()) {
       LOG(INFO) << "[Interval]schedule compactRange, freesize: " << free_size / 1048576
                 << "MB, disksize: " << total_size / 1048576 << "MB";
@@ -1333,6 +1341,12 @@ void PikaServer::InitStorageOptions() {
   storage_options_.options.max_bytes_for_level_base = g_pika_conf->level0_file_num_compaction_trigger() * g_pika_conf->write_buffer_size();
   storage_options_.options.max_subcompactions = g_pika_conf->max_subcompactions();
   storage_options_.options.target_file_size_base = g_pika_conf->target_file_size_base();
+  storage_options_.options.level0_file_num_compaction_trigger = g_pika_conf->level0_file_num_compaction_trigger();
+  storage_options_.options.level0_stop_writes_trigger = g_pika_conf->level0_stop_writes_trigger();
+  storage_options_.options.level0_slowdown_writes_trigger = g_pika_conf->level0_slowdown_writes_trigger();
+  storage_options_.options.min_write_buffer_number_to_merge = g_pika_conf->min_write_buffer_number_to_merge();
+  storage_options_.options.max_bytes_for_level_base = g_pika_conf->level0_file_num_compaction_trigger() * g_pika_conf->write_buffer_size();
+  storage_options_.options.max_subcompactions =
g_pika_conf->max_subcompactions(); storage_options_.options.max_compaction_bytes = g_pika_conf->max_compaction_bytes(); storage_options_.options.max_background_flushes = g_pika_conf->max_background_flushes(); storage_options_.options.max_background_compactions = g_pika_conf->max_background_compactions(); @@ -1386,6 +1400,13 @@ void PikaServer::InitStorageOptions() { storage_options_.statistics_max_size = g_pika_conf->max_cache_statistic_keys(); storage_options_.small_compaction_threshold = g_pika_conf->small_compaction_threshold(); + // For Storage compaction + storage_options_.compact_param_.best_delete_min_ratio_ = g_pika_conf->best_delete_min_ratio(); + storage_options_.compact_param_.dont_compact_sst_created_in_seconds_ = g_pika_conf->dont_compact_sst_created_in_seconds(); + storage_options_.compact_param_.force_compact_file_age_seconds_ = g_pika_conf->force_compact_file_age_seconds(); + storage_options_.compact_param_.force_compact_min_delete_ratio_ = g_pika_conf->force_compact_min_delete_ratio(); + storage_options_.compact_param_.compact_every_num_of_files_ = g_pika_conf->compact_every_num_of_files(); + // rocksdb blob if (g_pika_conf->enable_blob_files()) { storage_options_.options.enable_blob_files = g_pika_conf->enable_blob_files(); diff --git a/src/storage/include/storage/storage.h b/src/storage/include/storage/storage.h index 0b520f5800..11e00aabfd 100644 --- a/src/storage/include/storage/storage.h +++ b/src/storage/include/storage/storage.h @@ -71,6 +71,15 @@ struct StorageOptions { size_t statistics_max_size = 0; size_t small_compaction_threshold = 5000; size_t small_compaction_duration_threshold = 10000; + struct CompactParam { + // for LongestNotCompactiontSstCompact function + int compact_every_num_of_files_; + int force_compact_file_age_seconds_; + int force_compact_min_delete_ratio_; + int dont_compact_sst_created_in_seconds_; + int best_delete_min_ratio_; + }; + CompactParam compact_param_; Status ResetOptions(const OptionType& option_type, const std::unordered_map& options_map); }; @@ -153,7 +162,8 @@ enum BitOpType { kBitOpAnd = 1, kBitOpOr, kBitOpXor, kBitOpNot, kBitOpDefault }; enum Operation { kNone = 0, kCleanAll, - kCompactRange + kCompactRange, + kCompactOldestOrBestDeleteRatioSst, }; struct BGTask { @@ -1069,6 +1079,13 @@ class Storage { Status CompactRange(const DataType& type, const std::string& start, const std::string& end, bool sync = false); Status DoCompactRange(const DataType& type, const std::string& start, const std::string& end); Status DoCompactSpecificKey(const DataType& type, const std::string& key); + /** + * LongestNotCompactiontSstCompact will execute the compact command for any cf in the given type + * @param type. data type like `kStrings` + * @param sync. 
if true, block function + * @return Status + */ + Status LongestNotCompactiontSstCompact(const DataType &type, bool sync = false); Status SetMaxCacheStatisticKeys(uint32_t max_cache_statistic_keys); Status SetSmallCompactionThreshold(uint32_t small_compaction_threshold); @@ -1093,6 +1110,8 @@ class Storage { const std::string& db_type, const std::unordered_map& options); void GetRocksDBInfo(std::string& info); + const StorageOptions& GetStorageOptions(); + private: std::vector> insts_; std::unique_ptr slot_indexer_; @@ -1100,6 +1119,7 @@ class Storage { int db_instance_num_ = 3; int slot_num_ = 1024; bool is_classic_mode_ = true; + StorageOptions storage_options_; std::unique_ptr> cursors_store_; diff --git a/src/storage/src/redis.cc b/src/storage/src/redis.cc index 8b796c111d..1c8e31d0b5 100644 --- a/src/storage/src/redis.cc +++ b/src/storage/src/redis.cc @@ -11,6 +11,8 @@ #include "src/lists_filter.h" #include "src/base_filter.h" #include "src/zsets_filter.h" +#include "pstd/include/pstd_string.h" +#include "pstd/include/pstd_defer.h" namespace storage { @@ -199,6 +201,213 @@ Status Redis::CompactRange(const rocksdb::Slice* begin, const rocksdb::Slice* en return Status::OK(); } +void SelectColumnFamilyHandles(const DataType& option_type, const ColumnFamilyType& type, + std::vector& handleIdxVec) { + switch (option_type) { + case DataType::kStrings: + handleIdxVec.push_back(kMetaCF); + break; + case DataType::kHashes: + if (type == kMeta || type == kMetaAndData) { + handleIdxVec.push_back(kMetaCF); + } + if (type == kData || type == kMetaAndData) { + handleIdxVec.push_back(kHashesDataCF); + } + break; + case DataType::kSets: + if (type == kMeta || type == kMetaAndData) { + handleIdxVec.push_back(kMetaCF); + } + if (type == kData || type == kMetaAndData) { + handleIdxVec.push_back(kSetsDataCF); + } + break; + case DataType::kLists: + if (type == kMeta || type == kMetaAndData) { + handleIdxVec.push_back(kMetaCF); + } + if (type == kData || type == kMetaAndData) { + handleIdxVec.push_back(kListsDataCF); + } + break; + case DataType::kZSets: + if (type == kMeta || type == kMetaAndData) { + handleIdxVec.push_back(kMetaCF); + } + if (type == kData || type == kMetaAndData) { + handleIdxVec.push_back(kZsetsDataCF); + handleIdxVec.push_back(kZsetsScoreCF); + } + break; + case DataType::kStreams: + if (type == kMeta || type == kMetaAndData) { + handleIdxVec.push_back(kMetaCF); + } + if (type == kData || type == kMetaAndData) { + handleIdxVec.push_back(kStreamsDataCF); + } + break; + case DataType::kNones: + enum ColumnFamilyIndex s; + for (s = kMetaCF; s <= kStreamsDataCF; s = (ColumnFamilyIndex)(s + 1)) { + handleIdxVec.push_back(s); + } + break; + default: + break; + } +} + +Status Redis::LongestNotCompactiontSstCompact(const DataType& option_type, std::vector* compact_result_vec, + const ColumnFamilyType& type) { + bool no_compact = false; + bool to_comapct = true; + if (!in_compact_flag_.compare_exchange_weak(no_compact, to_comapct, std::memory_order_relaxed, + std::memory_order_relaxed)) { + return Status::Corruption("compact running"); + } + + DEFER { in_compact_flag_.store(false); }; + std::vector handleIdxVec; + SelectColumnFamilyHandles(option_type, type, handleIdxVec); + if (handleIdxVec.size() == 0) { + return Status::Corruption("Invalid data type"); + } + + if (compact_result_vec) { + compact_result_vec->clear(); + } + + for (auto idx : handleIdxVec) { + rocksdb::TablePropertiesCollection props; + Status s = db_->GetPropertiesOfAllTables(handles_[idx], &props); + if (!s.ok()) { + if 
(compact_result_vec) {
+        compact_result_vec->push_back(
+            Status::Corruption(handles_[idx]->GetName() +
+                               " LongestNotCompactiontSstCompact GetPropertiesOfAllTables error: " + s.ToString()));
+      }
+      continue;
+    }
+
+    // The main goal of compaction is to reclaim disk space and drop tombstones.
+    // Scheduling a compaction is unnecessary when there are too few live files;
+    // the threshold is hard-coded to 1 here.
+    if (props.size() <= 1) {
+      LOG(WARNING) << "LongestNotCompactiontSstCompact " << handles_[idx]->GetName() << " only one file";
+      if (compact_result_vec) {
+        compact_result_vec->push_back(Status::OK());
+      }
+      continue;
+    }
+
+    size_t max_files_to_compact = 1;
+    const StorageOptions& storageOptions = storage_->GetStorageOptions();
+    if (props.size() / storageOptions.compact_param_.compact_every_num_of_files_ > max_files_to_compact) {
+      max_files_to_compact = props.size() / storageOptions.compact_param_.compact_every_num_of_files_;
+    }
+
+    int64_t now =
+        std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch())
+            .count();
+
+    auto force_compact_min_ratio =
+        static_cast<double>(storageOptions.compact_param_.force_compact_min_delete_ratio_) / 100.0;
+    auto best_delete_min_ratio = static_cast<double>(storageOptions.compact_param_.best_delete_min_ratio_) / 100.0;
+
+    std::string best_filename;
+    double best_delete_ratio = 0;
+    int64_t total_keys = 0, deleted_keys = 0;
+    rocksdb::Slice start_key, stop_key, best_start_key, best_stop_key;
+    Status compact_result;
+    for (const auto& iter : props) {
+      uint64_t file_creation_time = iter.second->file_creation_time;
+      if (file_creation_time == 0) {
+        // Fall back to the file modification time to avoid repeatedly compacting the same file;
+        // a file_creation_time of 0 means the creation time is unknown to rocksdb.
+        auto s = rocksdb::Env::Default()->GetFileModificationTime(iter.first, &file_creation_time);
+        if (!s.ok()) {
+          LOG(WARNING) << handles_[idx]->GetName() << " Failed to get the file creation time: " << iter.first
+                       << " in " << handles_[idx]->GetName() << ", err: " << s.ToString();
+          continue;
+        }
+      }
+
+      for (const auto& property_iter : iter.second->user_collected_properties) {
+        if (property_iter.first == "total_keys") {
+          if (!pstd::string2int(property_iter.second.c_str(), property_iter.second.length(), &total_keys)) {
+            LOG(WARNING) << handles_[idx]->GetName() << " " << iter.first << " Parse total_keys error";
+            continue;
+          }
+        }
+        if (property_iter.first == "deleted_keys") {
+          if (!pstd::string2int(property_iter.second.c_str(), property_iter.second.length(), &deleted_keys)) {
+            LOG(WARNING) << handles_[idx]->GetName() << " " << iter.first << " Parse deleted_keys error";
+            continue;
+          }
+        }
+        if (property_iter.first == "start_key") {
+          start_key = property_iter.second;
+        }
+        if (property_iter.first == "stop_key") {
+          stop_key = property_iter.second;
+        }
+      }
+
+      if (start_key.empty() || stop_key.empty()) {
+        continue;
+      }
+      double delete_ratio = static_cast<double>(deleted_keys) / static_cast<double>(total_keys);
+
+      // Pick the file according to the force-compact policy.
+      if (file_creation_time <
+              static_cast<uint64_t>(now - storageOptions.compact_param_.force_compact_file_age_seconds_) &&
+          delete_ratio >= force_compact_min_ratio) {
+        compact_result = db_->CompactRange(default_compact_range_options_, handles_[idx], &start_key, &stop_key);
+        if (--max_files_to_compact == 0) {
+          break;
+        }
+        continue;
+      }
+
+      // Skip SST files created within the last `dont_compact_sst_created_in_seconds_` seconds.
+ if (file_creation_time > + static_cast(now - storageOptions.compact_param_.dont_compact_sst_created_in_seconds_)) { + continue; + } + + // pick the file which has highest delete ratio + if (total_keys != 0 && delete_ratio > best_delete_ratio) { + best_delete_ratio = delete_ratio; + best_filename = iter.first; + best_start_key = start_key; + start_key.clear(); + best_stop_key = stop_key; + stop_key.clear(); + } + } + + if (best_delete_ratio > best_delete_min_ratio && !best_start_key.empty() && !best_stop_key.empty()) { + compact_result = + db_->CompactRange(default_compact_range_options_, handles_[idx], &best_start_key, &best_stop_key); + } + + if (!compact_result.ok()) { + if (compact_result_vec) { + compact_result_vec->push_back( + Status::Corruption(handles_[idx]->GetName() + " Failed to do compaction " + compact_result.ToString())); + } + continue; + } + + if (compact_result_vec) { + compact_result_vec->push_back(Status::OK()); + } + } + return Status::OK(); +} + Status Redis::SetSmallCompactionThreshold(uint64_t small_compaction_threshold) { small_compaction_threshold_ = small_compaction_threshold; return Status::OK(); diff --git a/src/storage/src/redis.h b/src/storage/src/redis.h index ccad635263..3b54edfa64 100644 --- a/src/storage/src/redis.h +++ b/src/storage/src/redis.h @@ -106,6 +106,9 @@ class Redis { virtual Status CompactRange(const rocksdb::Slice* begin, const rocksdb::Slice* end); + virtual Status LongestNotCompactiontSstCompact(const DataType& option_type, std::vector* compact_result_vec, + const ColumnFamilyType& type = kMetaAndData); + virtual Status GetProperty(const std::string& property, uint64_t* out); Status ScanKeyNum(std::vector* key_info); @@ -477,6 +480,7 @@ class Redis { rocksdb::WriteOptions default_write_options_; rocksdb::ReadOptions default_read_options_; rocksdb::CompactRangeOptions default_compact_range_options_; + std::atomic in_compact_flag_; // For Scan std::unique_ptr> scan_cursors_store_; diff --git a/src/storage/src/storage.cc b/src/storage/src/storage.cc index a264783927..4417d3dc49 100644 --- a/src/storage/src/storage.cc +++ b/src/storage/src/storage.cc @@ -93,6 +93,7 @@ Status Storage::Open(const StorageOptions& storage_options, const std::string& d mkpath(db_path.c_str(), 0755); int inst_count = db_instance_num_; + storage_options_ = storage_options; for (int index = 0; index < inst_count; index++) { insts_.emplace_back(std::make_unique(this, index)); Status s = insts_.back()->Open(storage_options, AppendSubDirectory(db_path, index)); @@ -1701,6 +1702,8 @@ Status Storage::RunBGTask() { if (task.operation == kCleanAll) { DoCompactRange(task.type, "", ""); + } else if (task.operation == kCompactOldestOrBestDeleteRatioSst) { + LongestNotCompactiontSstCompact(task.type, true); } else if (task.operation == kCompactRange) { if (task.argv.size() == 1) { DoCompactSpecificKey(task.type, task.argv[0]); @@ -1722,6 +1725,25 @@ Status Storage::Compact(const DataType& type, bool sync) { return Status::OK(); } +Status Storage::LongestNotCompactiontSstCompact(const DataType &type, bool sync) { + if (sync) { + Status s; + for (const auto& inst : insts_) { + std::vector compact_result_vec; + s = inst->LongestNotCompactiontSstCompact(type, &compact_result_vec); + for (auto compact_result : compact_result_vec) { + if (!compact_result.ok()) { + LOG(ERROR) << compact_result.ToString(); + } + } + } + return s; + } else { + AddBGTask({type, kCompactOldestOrBestDeleteRatioSst}); + } + return Status::OK(); +} + // run compactrange for all rocksdb instance Status 
Storage::DoCompactRange(const DataType& type, const std::string& start, const std::string& end) { if (type != DataType::kAll) { @@ -1740,6 +1762,9 @@ Status Storage::DoCompactRange(const DataType& type, const std::string& start, c for (const auto& inst : insts_) { current_task_type_ = Operation::kCleanAll; s = inst->CompactRange(start_ptr, end_ptr); + if (!s.ok()) { + LOG(ERROR) << "DoCompactRange error: " << s.ToString(); + } } current_task_type_ = Operation::kNone; return s; @@ -1929,6 +1954,10 @@ void Storage::GetRocksDBInfo(std::string& info) { } } +const StorageOptions& Storage::GetStorageOptions() { + return storage_options_; +} + int64_t Storage::IsExist(const Slice& key, std::map* type_status) { int64_t type_count = 0; auto& inst = GetDBInstance(key);
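The five obd-compact knobs documented in conf/pika.conf cooperate in a single per-SST decision inside Redis::LongestNotCompactiontSstCompact. The following self-contained C++ sketch restates that decision under illustrative names (CompactParamSketch, SstInfo and Classify are not part of this patch); it only mirrors how the configured integers become age and delete-ratio thresholds.

#include <cstdint>
#include <iostream>

// Illustrative mirror of StorageOptions::CompactParam (field names taken from the patch).
struct CompactParamSketch {
  int compact_every_num_of_files_ = 10;
  int force_compact_file_age_seconds_ = 300;
  int force_compact_min_delete_ratio_ = 10;        // percent
  int dont_compact_sst_created_in_seconds_ = 600;
  int best_delete_min_ratio_ = 10;                 // percent
};

enum class SstDecision { kForceCompact, kSkipTooYoung, kBestRatioCandidate, kIgnore };

// One SST file's metadata as the picker sees it.
struct SstInfo {
  int64_t age_seconds;   // now - file_creation_time
  int64_t total_keys;
  int64_t deleted_keys;
};

SstDecision Classify(const CompactParamSketch& p, const SstInfo& f) {
  double delete_ratio =
      f.total_keys == 0 ? 0.0 : static_cast<double>(f.deleted_keys) / static_cast<double>(f.total_keys);
  // Old enough and deleted enough: compact it right away.
  if (f.age_seconds > p.force_compact_file_age_seconds_ &&
      delete_ratio >= p.force_compact_min_delete_ratio_ / 100.0) {
    return SstDecision::kForceCompact;
  }
  // Freshly written files are left alone.
  if (f.age_seconds < p.dont_compact_sst_created_in_seconds_) {
    return SstDecision::kSkipTooYoung;
  }
  // Otherwise the file competes for "best delete ratio" if it clears the floor.
  if (delete_ratio > p.best_delete_min_ratio_ / 100.0) {
    return SstDecision::kBestRatioCandidate;
  }
  return SstDecision::kIgnore;
}

int main() {
  CompactParamSketch p;
  std::cout << static_cast<int>(Classify(p, {400, 1000, 200})) << "\n";  // old file, 20% deleted -> force compact
  std::cout << static_cast<int>(Classify(p, {30, 1000, 900})) << "\n";   // too young -> skip
}

As in the patch, the force check runs before the freshness check, and the number of force-compactions per round is additionally capped at props.size() / compact_every_num_of_files_.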
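The picker reads "total_keys", "deleted_keys", "start_key" and "stop_key" from each SST's user_collected_properties; this patch only consumes those values and assumes pika's existing table property collectors publish them. As a heavily hedged illustration of how such properties can be produced with the stock rocksdb API (this is not pika's actual collector), a collector could look like:

#include <string>
#include <rocksdb/table_properties.h>

// Hypothetical collector: counts keys/deletes and remembers the key range.
// It only illustrates the property names consumed by LongestNotCompactiontSstCompact.
class DeleteRatioCollector : public rocksdb::TablePropertiesCollector {
 public:
  rocksdb::Status AddUserKey(const rocksdb::Slice& key, const rocksdb::Slice& /*value*/,
                             rocksdb::EntryType type, rocksdb::SequenceNumber /*seq*/,
                             uint64_t /*file_size*/) override {
    if (start_key_.empty()) start_key_ = key.ToString();
    stop_key_ = key.ToString();
    ++total_keys_;
    if (type == rocksdb::kEntryDelete || type == rocksdb::kEntrySingleDelete) ++deleted_keys_;
    return rocksdb::Status::OK();
  }
  rocksdb::Status Finish(rocksdb::UserCollectedProperties* props) override {
    (*props)["total_keys"] = std::to_string(total_keys_);
    (*props)["deleted_keys"] = std::to_string(deleted_keys_);
    (*props)["start_key"] = start_key_;
    (*props)["stop_key"] = stop_key_;
    return rocksdb::Status::OK();
  }
  rocksdb::UserCollectedProperties GetReadableProperties() const override { return {}; }
  const char* Name() const override { return "DeleteRatioCollector"; }

 private:
  uint64_t total_keys_ = 0;
  uint64_t deleted_keys_ = 0;
  std::string start_key_;
  std::string stop_key_;
};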
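End to end, the wiring added by this patch is: `compaction-strategy : obd-compact` makes PikaConf::compaction_strategy() return OldestOrBestDeleteRatioSstCompact; PikaServer::AutoCompactRange then schedules TaskType::kCompactOldestOrBestDeleteRatioSst instead of kCompactAll; each DB forwards to Storage::LongestNotCompactiontSstCompact, which either walks every rocksdb instance inline (sync == true) or enqueues a kCompactOldestOrBestDeleteRatioSst background task. A minimal caller-side sketch follows; the helper name and the already-opened storage instance are assumptions, not code from this patch.

#include "storage/storage.h"

// Sketch only: trigger the OBD compaction across all data types of one Storage instance.
void TriggerObdCompact(storage::Storage* storage, bool wait_for_completion) {
  // sync == true runs the per-instance compaction in the calling thread and logs
  // per-column-family failures; sync == false only enqueues a background task.
  storage::Status s = storage->LongestNotCompactiontSstCompact(storage::DataType::kAll, wait_for_completion);
  if (!s.ok()) {
    // The Redis-level implementation returns Status::Corruption("compact running")
    // when a previous pass on the same instance is still in flight.
  }
}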