diff --git a/conf/pika.conf b/conf/pika.conf index 7f91a580ca..e902d45a1e 100644 --- a/conf/pika.conf +++ b/conf/pika.conf @@ -268,6 +268,7 @@ max-cache-statistic-keys : 0 # a small compact is triggered automatically if the small compaction feature is enabled. # small-compaction-threshold default value is 5000 and the value range is [1, 100000]. small-compaction-threshold : 5000 +small-compaction-duration-threshold : 10000 # The maximum total size of all live memtables of the RocksDB instance that owned by Pika. # Flushing from memtable to disk will be triggered if the actual memory usage of RocksDB diff --git a/include/pika_conf.h b/include/pika_conf.h index 40bf3206a9..e65cd9a132 100644 --- a/include/pika_conf.h +++ b/include/pika_conf.h @@ -220,6 +220,10 @@ class PikaConf : public pstd::BaseConf { std::shared_lock l(rwlock_); return small_compaction_threshold_; } + int small_compaction_duration_threshold() { + std::shared_lock l(rwlock_); + return small_compaction_duration_threshold_; + } int max_background_flushes() { std::shared_lock l(rwlock_); return max_background_flushes_; @@ -425,6 +429,11 @@ class PikaConf : public pstd::BaseConf { TryPushDiffCommands("small-compaction-threshold", std::to_string(value)); small_compaction_threshold_ = value; } + void SetSmallCompactionDurationThreshold(const int value) { + std::lock_guard l(rwlock_); + TryPushDiffCommands("small-compaction-duration-threshold", std::to_string(value)); + small_compaction_duration_threshold_ = value; + } void SetMaxClientResponseSize(const int value) { std::lock_guard l(rwlock_); TryPushDiffCommands("max-client-response-size", std::to_string(value)); @@ -684,6 +693,7 @@ class PikaConf : public pstd::BaseConf { int max_cache_statistic_keys_ = 0; int small_compaction_threshold_ = 0; + int small_compaction_duration_threshold_ = 0; int max_background_flushes_ = 0; int max_background_compactions_ = 0; int max_background_jobs_ = 0; diff --git a/include/pika_server.h b/include/pika_server.h index 72a59a3f2c..98594cbf8f 100644 --- a/include/pika_server.h +++ b/include/pika_server.h @@ -218,6 +218,7 @@ class PikaServer : public pstd::noncopyable { void PrepareSlotTrySync(); void SlotSetMaxCacheStatisticKeys(uint32_t max_cache_statistic_keys); void SlotSetSmallCompactionThreshold(uint32_t small_compaction_threshold); + void SlotSetSmallCompactionDurationThreshold(uint32_t small_compaction_duration_threshold); bool GetDBSlotBinlogOffset(const std::string& db_name, uint32_t slot_id, BinlogOffset* boffset); std::shared_ptr GetSlotByDBName(const std::string& db_name); std::shared_ptr GetDBSlotById(const std::string& db_name, uint32_t slot_id); diff --git a/src/pika_admin.cc b/src/pika_admin.cc index 5977f05965..d2f02610f6 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -1733,6 +1733,12 @@ void ConfigCmd::ConfigGet(std::string& ret) { EncodeNumber(&config_body, g_pika_conf->small_compaction_threshold()); } + if (pstd::stringmatch(pattern.data(), "small-compaction-duration-threshold", 1) != 0) { + elements += 2; + EncodeString(&config_body, "small-compaction-duration-threshold"); + EncodeNumber(&config_body, g_pika_conf->small_compaction_duration_threshold()); + } + if (pstd::stringmatch(pattern.data(), "max-background-flushes", 1) != 0) { elements += 2; EncodeString(&config_body, "max-background-flushes"); @@ -2090,7 +2096,7 @@ void ConfigCmd::ConfigGet(std::string& ret) { void ConfigCmd::ConfigSet(std::string& ret, std::shared_ptr slot) { std::string set_item = config_args_v_[1]; if (set_item == "*") { - ret = "*28\r\n"; + ret = "*29\r\n"; EncodeString(&ret, "timeout"); EncodeString(&ret, "requirepass"); EncodeString(&ret, "masterauth"); @@ -2109,6 +2115,7 @@ void ConfigCmd::ConfigSet(std::string& ret, std::shared_ptr slot) { EncodeString(&ret, "write-binlog"); EncodeString(&ret, "max-cache-statistic-keys"); EncodeString(&ret, "small-compaction-threshold"); + EncodeString(&ret, "small-compaction-duration-threshold"); EncodeString(&ret, "max-client-response-size"); EncodeString(&ret, "db-sync-speed"); EncodeString(&ret, "compact-cron"); @@ -2241,6 +2248,14 @@ void ConfigCmd::ConfigSet(std::string& ret, std::shared_ptr slot) { g_pika_conf->SetSmallCompactionThreshold(static_cast(ival)); g_pika_server->SlotSetSmallCompactionThreshold(static_cast(ival)); ret = "+OK\r\n"; + } else if (set_item == "small-compaction-duration-threshold") { + if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival < 0) { + ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'small-compaction-duration-threshold'\r\n"; + return; + } + g_pika_conf->SetSmallCompactionDurationThreshold(static_cast(ival)); + g_pika_server->SlotSetSmallCompactionDurationThreshold(static_cast(ival)); + ret = "+OK\r\n"; } else if (set_item == "max-client-response-size") { if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival < 0) { ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'max-client-response-size'\r\n"; diff --git a/src/pika_conf.cc b/src/pika_conf.cc index 9aa414ae78..007d7490d7 100644 --- a/src/pika_conf.cc +++ b/src/pika_conf.cc @@ -451,8 +451,18 @@ int PikaConf::Load() { small_compaction_threshold_ = 5000; GetConfInt("small-compaction-threshold", &small_compaction_threshold_); - if (small_compaction_threshold_ <= 0 || small_compaction_threshold_ >= 100000) { - small_compaction_threshold_ = 5000; + if (small_compaction_threshold_ < 0) { + small_compaction_threshold_ = 0; + } else if (small_compaction_threshold_ >= 100000) { + small_compaction_threshold_ = 100000; + } + + small_compaction_duration_threshold_ = 10000; + GetConfInt("small-compaction-duration-threshold", &small_compaction_duration_threshold_); + if (small_compaction_duration_threshold_ < 0) { + small_compaction_duration_threshold_ = 0; + } else if (small_compaction_duration_threshold_ >= 1000000) { + small_compaction_duration_threshold_ = 1000000; } max_background_flushes_ = 1; @@ -724,6 +734,7 @@ int PikaConf::ConfigRewrite() { SetConfStr("replication-id", replication_id_); SetConfInt("max-cache-statistic-keys", max_cache_statistic_keys_); SetConfInt("small-compaction-threshold", small_compaction_threshold_); + SetConfInt("small-compaction-duration-threshold", small_compaction_duration_threshold_); SetConfInt("max-client-response-size", static_cast(max_client_response_size_)); SetConfInt("db-sync-speed", db_sync_speed_); SetConfStr("compact-cron", compact_cron_); diff --git a/src/pika_server.cc b/src/pika_server.cc index 7b9cdf4f9b..3d09f47625 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -527,6 +527,17 @@ void PikaServer::SlotSetSmallCompactionThreshold(uint32_t small_compaction_thres } } +void PikaServer::SlotSetSmallCompactionDurationThreshold(uint32_t small_compaction_duration_threshold) { + std::shared_lock rwl(dbs_rw_); + for (const auto& db_item : dbs_) { + for (const auto& slot_item : db_item.second->slots_) { + slot_item.second->DbRWLockReader(); + slot_item.second->db()->SetSmallCompactionDurationThreshold(small_compaction_duration_threshold); + slot_item.second->DbRWUnLock(); + } + } +} + bool PikaServer::GetDBSlotBinlogOffset(const std::string& db_name, uint32_t slot_id, BinlogOffset* const boffset) { std::shared_ptr slot = diff --git a/src/storage/include/storage/storage.h b/src/storage/include/storage/storage.h index b756c5635f..fce6d546c5 100644 --- a/src/storage/include/storage/storage.h +++ b/src/storage/include/storage/storage.h @@ -66,6 +66,7 @@ struct StorageOptions { bool share_block_cache = false; size_t statistics_max_size = 0; size_t small_compaction_threshold = 5000; + size_t small_compaction_duration_threshold = 10000; Status ResetOptions(const OptionType& option_type, const std::unordered_map& options_map); }; @@ -1034,6 +1035,7 @@ class Storage { Status SetMaxCacheStatisticKeys(uint32_t max_cache_statistic_keys); Status SetSmallCompactionThreshold(uint32_t small_compaction_threshold); + Status SetSmallCompactionDurationThreshold(uint32_t small_compaction_duration_threshold); std::string GetCurrentTaskType(); Status GetUsage(const std::string& property, uint64_t* result); diff --git a/src/storage/src/redis.cc b/src/storage/src/redis.cc index ca56c55091..b6b848c6d4 100644 --- a/src/storage/src/redis.cc +++ b/src/storage/src/redis.cc @@ -12,8 +12,9 @@ Redis::Redis(Storage* const s, const DataType& type) : storage_(s), type_(type), lock_mgr_(std::make_shared(1000, 0, std::make_shared())), - small_compaction_threshold_(5000) { - statistics_store_ = std::make_unique>(); + small_compaction_threshold_(5000), + small_compaction_duration_threshold_(10000) { + statistics_store_ = std::make_unique>(); scan_cursors_store_ = std::make_unique>(); scan_cursors_store_->SetCapacity(5000); default_compact_range_options_.exclusive_manual_compaction = false; @@ -46,23 +47,40 @@ Status Redis::SetMaxCacheStatisticKeys(size_t max_cache_statistic_keys) { return Status::OK(); } -Status Redis::SetSmallCompactionThreshold(size_t small_compaction_threshold) { +Status Redis::SetSmallCompactionThreshold(uint64_t small_compaction_threshold) { small_compaction_threshold_ = small_compaction_threshold; return Status::OK(); } -Status Redis::UpdateSpecificKeyStatistics(const std::string& key, size_t count) { - if ((statistics_store_->Capacity() != 0U) && (count != 0U)) { - size_t total = 0; - statistics_store_->Lookup(key, &total); - statistics_store_->Insert(key, total + count); - AddCompactKeyTaskIfNeeded(key, total + count); +Status Redis::SetSmallCompactionDurationThreshold(uint64_t small_compaction_duration_threshold) { + small_compaction_duration_threshold_ = small_compaction_duration_threshold; + return Status::OK(); +} + +Status Redis::UpdateSpecificKeyStatistics(const std::string& key, uint64_t count) { + if ((statistics_store_->Capacity() != 0U) && (count != 0U) && (small_compaction_threshold_ != 0U)) { + KeyStatistics data; + statistics_store_->Lookup(key, &data); + data.AddModifyCount(count); + statistics_store_->Insert(key, data); + AddCompactKeyTaskIfNeeded(key, data.ModifyCount(), data.AvgDuration()); + } + return Status::OK(); +} + +Status Redis::UpdateSpecificKeyDuration(const std::string& key, uint64_t duration) { + if ((statistics_store_->Capacity() != 0U) && (duration != 0U) && (small_compaction_duration_threshold_ != 0U)) { + KeyStatistics data; + statistics_store_->Lookup(key, &data); + data.AddDuration(duration); + statistics_store_->Insert(key, data); + AddCompactKeyTaskIfNeeded(key, data.ModifyCount(), data.AvgDuration()); } return Status::OK(); } -Status Redis::AddCompactKeyTaskIfNeeded(const std::string& key, size_t total) { - if (total < small_compaction_threshold_) { +Status Redis::AddCompactKeyTaskIfNeeded(const std::string& key, uint64_t count, uint64_t duration) { + if (count < small_compaction_threshold_ || duration < small_compaction_duration_threshold_) { return Status::OK(); } else { storage_->AddBGTask({type_, kCompactRange, {key, key}}); diff --git a/src/storage/src/redis.h b/src/storage/src/redis.h index ac1b560f51..f1615e7b8f 100644 --- a/src/storage/src/redis.h +++ b/src/storage/src/redis.h @@ -14,6 +14,8 @@ #include "rocksdb/slice.h" #include "rocksdb/status.h" +#include "pstd/include/env.h" + #include "src/lock_mgr.h" #include "src/lru_cache.h" #include "src/mutex_impl.h" @@ -30,6 +32,61 @@ class Redis { rocksdb::DB* GetDB() { return db_; } + struct KeyStatistics { + size_t window_size; + std::deque durations; + + uint64_t modify_count; + + KeyStatistics() : KeyStatistics(10) {} + + KeyStatistics(size_t size) : window_size(size + 2), modify_count(0) {} + + void AddDuration(uint64_t duration) { + durations.push_back(duration); + while (durations.size() > window_size) { + durations.pop_front(); + } + } + uint64_t AvgDuration() { + if (durations.size () < window_size) { + return 0; + } + uint64_t min = durations[0]; + uint64_t max = durations[0]; + uint64_t sum = 0; + for (auto duration : durations) { + if (duration < min) { + min = duration; + } + if (duration > max) { + max = duration; + } + sum += duration; + } + return (sum - max - min) / (durations.size() - 2); + } + void AddModifyCount(uint64_t count) { + modify_count += count; + } + uint64_t ModifyCount() { + return modify_count; + } + }; + + struct KeyStatisticsDurationGuard { + Redis* ctx; + std::string key; + uint64_t start_us; + KeyStatisticsDurationGuard(Redis* that, const std::string& key): ctx(that), key(key), start_us(pstd::NowMicros()) { + } + ~KeyStatisticsDurationGuard() { + uint64_t end_us = pstd::NowMicros(); + uint64_t duration = end_us > start_us ? end_us - start_us : 0; + ctx->UpdateSpecificKeyDuration(key, duration); + } + }; + Status SetOptions(const OptionType& option_type, const std::unordered_map& options); void SetWriteWalOptions(const bool is_wal_disable); @@ -54,7 +111,8 @@ class Redis { virtual Status TTL(const Slice& key, int64_t* timestamp) = 0; Status SetMaxCacheStatisticKeys(size_t max_cache_statistic_keys); - Status SetSmallCompactionThreshold(size_t small_compaction_threshold); + Status SetSmallCompactionThreshold(uint64_t small_compaction_threshold); + Status SetSmallCompactionDurationThreshold(uint64_t small_compaction_duration_threshold); void GetRocksDBInfo(std::string &info, const char *prefix); protected: @@ -75,11 +133,13 @@ class Redis { Status StoreScanNextPoint(const Slice& key, const Slice& pattern, int64_t cursor, const std::string& next_point); // For Statistics - std::atomic small_compaction_threshold_; - std::unique_ptr> statistics_store_; + std::atomic_uint64_t small_compaction_threshold_; + std::atomic_uint64_t small_compaction_duration_threshold_; + std::unique_ptr> statistics_store_; - Status UpdateSpecificKeyStatistics(const std::string& key, size_t count); - Status AddCompactKeyTaskIfNeeded(const std::string& key, size_t total); + Status UpdateSpecificKeyStatistics(const std::string& key, uint64_t count); + Status UpdateSpecificKeyDuration(const std::string& key, uint64_t duration); + Status AddCompactKeyTaskIfNeeded(const std::string& key, uint64_t count, uint64_t duration); }; } // namespace storage diff --git a/src/storage/src/redis_hashes.cc b/src/storage/src/redis_hashes.cc index 6d28f76ec7..4d1c9bf6b7 100644 --- a/src/storage/src/redis_hashes.cc +++ b/src/storage/src/redis_hashes.cc @@ -22,6 +22,7 @@ RedisHashes::RedisHashes(Storage* const s, const DataType& type) : Redis(s, type Status RedisHashes::Open(const StorageOptions& storage_options, const std::string& db_path) { statistics_store_->SetCapacity(storage_options.statistics_max_size); small_compaction_threshold_ = storage_options.small_compaction_threshold; + small_compaction_duration_threshold_ = storage_options.small_compaction_duration_threshold; rocksdb::Options ops(storage_options.options); Status s = rocksdb::DB::Open(ops, db_path, &db_); @@ -298,6 +299,7 @@ Status RedisHashes::HGetall(const Slice& key, std::vector* fvs) { version = parsed_hashes_meta_value.version(); HashesDataKey hashes_data_key(key, version, ""); Slice prefix = hashes_data_key.Encode(); + KeyStatisticsDurationGuard guard(this, key.ToString()); auto iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) { ParsedHashesDataKey parsed_hashes_data_key(iter->key()); @@ -516,6 +518,7 @@ Status RedisHashes::HKeys(const Slice& key, std::vector* fields) { version = parsed_hashes_meta_value.version(); HashesDataKey hashes_data_key(key, version, ""); Slice prefix = hashes_data_key.Encode(); + KeyStatisticsDurationGuard guard(this, key.ToString()); auto iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) { ParsedHashesDataKey parsed_hashes_data_key(iter->key()); @@ -788,6 +791,7 @@ Status RedisHashes::HVals(const Slice& key, std::vector* values) { version = parsed_hashes_meta_value.version(); HashesDataKey hashes_data_key(key, version, ""); Slice prefix = hashes_data_key.Encode(); + KeyStatisticsDurationGuard guard(this, key.ToString()); auto iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) { values->push_back(iter->value().ToString()); @@ -850,6 +854,7 @@ Status RedisHashes::HScan(const Slice& key, int64_t cursor, const std::string& p HashesDataKey hashes_data_prefix(key, version, sub_field); HashesDataKey hashes_start_data_key(key, version, start_point); std::string prefix = hashes_data_prefix.Encode().ToString(); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(hashes_start_data_key.Encode()); iter->Valid() && rest > 0 && iter->key().starts_with(prefix); iter->Next()) { @@ -900,6 +905,7 @@ Status RedisHashes::HScanx(const Slice& key, const std::string& start_field, con HashesDataKey hashes_data_prefix(key, version, Slice()); HashesDataKey hashes_start_data_key(key, version, start_field); std::string prefix = hashes_data_prefix.Encode().ToString(); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(hashes_start_data_key.Encode()); iter->Valid() && rest > 0 && iter->key().starts_with(prefix); iter->Next()) { @@ -956,6 +962,7 @@ Status RedisHashes::PKHScanRange(const Slice& key, const Slice& field_start, con HashesDataKey hashes_data_prefix(key, version, Slice()); HashesDataKey hashes_start_data_key(key, version, field_start); std::string prefix = hashes_data_prefix.Encode().ToString(); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(start_no_limit ? prefix : hashes_start_data_key.Encode()); iter->Valid() && remain > 0 && iter->key().starts_with(prefix); iter->Next()) { @@ -1016,6 +1023,7 @@ Status RedisHashes::PKHRScanRange(const Slice& key, const Slice& field_start, co HashesDataKey hashes_data_prefix(key, version, Slice()); HashesDataKey hashes_start_data_key(key, start_key_version, start_key_field); std::string prefix = hashes_data_prefix.Encode().ToString(); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[1]); for (iter->SeekForPrev(hashes_start_data_key.Encode().ToString()); iter->Valid() && remain > 0 && iter->key().starts_with(prefix); iter->Prev()) { diff --git a/src/storage/src/redis_lists.cc b/src/storage/src/redis_lists.cc index 6360f94620..e2d484b3e4 100644 --- a/src/storage/src/redis_lists.cc +++ b/src/storage/src/redis_lists.cc @@ -26,6 +26,7 @@ RedisLists::RedisLists(Storage* const s, const DataType& type) : Redis(s, type) Status RedisLists::Open(const StorageOptions& storage_options, const std::string& db_path) { statistics_store_->SetCapacity(storage_options.statistics_max_size); small_compaction_threshold_ = storage_options.small_compaction_threshold; + small_compaction_duration_threshold_ = storage_options.small_compaction_duration_threshold; rocksdb::Options ops(storage_options.options); Status s = rocksdb::DB::Open(ops, db_path, &db_); diff --git a/src/storage/src/redis_sets.cc b/src/storage/src/redis_sets.cc index 4d8c7b2eee..f76217eb32 100644 --- a/src/storage/src/redis_sets.cc +++ b/src/storage/src/redis_sets.cc @@ -21,8 +21,6 @@ namespace storage { RedisSets::RedisSets(Storage* const s, const DataType& type) : Redis(s, type) { - spop_counts_store_ = std::make_unique>(); - spop_counts_store_->SetCapacity(1000); } RedisSets::~RedisSets() = default; @@ -30,6 +28,7 @@ RedisSets::~RedisSets() = default; rocksdb::Status RedisSets::Open(const StorageOptions& storage_options, const std::string& db_path) { statistics_store_->SetCapacity(storage_options.statistics_max_size); small_compaction_threshold_ = storage_options.small_compaction_threshold; + small_compaction_duration_threshold_ = storage_options.small_compaction_duration_threshold; rocksdb::Options ops(storage_options.options); rocksdb::Status s = rocksdb::DB::Open(ops, db_path, &db_); @@ -325,6 +324,7 @@ rocksdb::Status RedisSets::SDiff(const std::vector& keys, std::vect version = parsed_sets_meta_value.version(); SetsMemberKey sets_member_key(keys[0], version, Slice()); prefix = sets_member_key.Encode(); + KeyStatisticsDurationGuard guard(this, keys[0]); auto iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) { ParsedSetsMemberKey parsed_sets_member_key(iter->key()); @@ -393,6 +393,7 @@ rocksdb::Status RedisSets::SDiffstore(const Slice& destination, const std::vecto version = parsed_sets_meta_value.version(); SetsMemberKey sets_member_key(keys[0], version, Slice()); Slice prefix = sets_member_key.Encode(); + KeyStatisticsDurationGuard guard(this, keys[0]); auto iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) { ParsedSetsMemberKey parsed_sets_member_key(iter->key()); @@ -493,6 +494,7 @@ rocksdb::Status RedisSets::SInter(const std::vector& keys, std::vec version = parsed_sets_meta_value.version(); SetsMemberKey sets_member_key(keys[0], version, Slice()); Slice prefix = sets_member_key.Encode(); + KeyStatisticsDurationGuard guard(this, keys[0]); auto iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) { ParsedSetsMemberKey parsed_sets_member_key(iter->key()); @@ -575,6 +577,7 @@ rocksdb::Status RedisSets::SInterstore(const Slice& destination, const std::vect version = parsed_sets_meta_value.version(); SetsMemberKey sets_member_key(keys[0], version, Slice()); Slice prefix = sets_member_key.Encode(); + KeyStatisticsDurationGuard guard(this, keys[0]); auto iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) { ParsedSetsMemberKey parsed_sets_member_key(iter->key()); @@ -685,6 +688,7 @@ rocksdb::Status RedisSets::SMembers(const Slice& key, std::vector* version = parsed_sets_meta_value.version(); SetsMemberKey sets_member_key(key, version, Slice()); Slice prefix = sets_member_key.Encode(); + KeyStatisticsDurationGuard guard(this, key.ToString()); auto iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) { ParsedSetsMemberKey parsed_sets_member_key(iter->key()); @@ -832,7 +836,7 @@ rocksdb::Status RedisSets::SMove(const Slice& source, const Slice& destination, return s; } -rocksdb::Status RedisSets::SPop(const Slice& key, std::vector* members, bool* need_compact, int64_t cnt) { +rocksdb::Status RedisSets::SPop(const Slice& key, std::vector* members, int64_t cnt) { std::default_random_engine engine; std::string meta_value; @@ -890,6 +894,7 @@ rocksdb::Status RedisSets::SPop(const Slice& key, std::vector* memb SetsMemberKey sets_member_key(key, version, Slice()); int64_t del_count = 0; + KeyStatisticsDurationGuard guard(this, key.ToString()); auto iter = db_->NewIterator(default_read_options_, handles_[1]); for (iter->Seek(sets_member_key.Encode()); iter->Valid() && cur_index < size; @@ -911,34 +916,14 @@ rocksdb::Status RedisSets::SPop(const Slice& key, std::vector* memb parsed_sets_meta_value.ModifyCount(static_cast(-cnt)); batch.Put(handles_[0], key, meta_value); delete iter; - } - } } else { return s; } - uint64_t count = 0; - uint64_t duration = pstd::NowMicros() - start_us; - AddAndGetSpopCount(key.ToString(), &count); - if (duration >= SPOP_COMPACT_THRESHOLD_DURATION - || count >= SPOP_COMPACT_THRESHOLD_COUNT) { - *need_compact = true; - ResetSpopCount(key.ToString()); - } return db_->Write(default_write_options_, &batch); } -rocksdb::Status RedisSets::ResetSpopCount(const std::string& key) { return spop_counts_store_->Remove(key); } - -rocksdb::Status RedisSets::AddAndGetSpopCount(const std::string& key, uint64_t* count) { - size_t old_count = 0; - spop_counts_store_->Lookup(key, &old_count); - spop_counts_store_->Insert(key, old_count + 1); - *count = old_count + 1; - return rocksdb::Status::OK(); -} - rocksdb::Status RedisSets::SRandmember(const Slice& key, int32_t count, std::vector* members) { if (count == 0) { return rocksdb::Status::OK(); @@ -988,6 +973,7 @@ rocksdb::Status RedisSets::SRandmember(const Slice& key, int32_t count, std::vec int32_t cur_index = 0; int32_t idx = 0; SetsMemberKey sets_member_key(key, version, Slice()); + KeyStatisticsDurationGuard guard(this, key.ToString()); auto iter = db_->NewIterator(default_read_options_, handles_[1]); for (iter->Seek(sets_member_key.Encode()); iter->Valid() && cur_index < size; iter->Next(), cur_index++) { if (static_cast(idx) >= targets.size()) { @@ -1087,6 +1073,7 @@ rocksdb::Status RedisSets::SUnion(const std::vector& keys, std::vec for (const auto& key_version : vaild_sets) { SetsMemberKey sets_member_key(key_version.key, key_version.version, Slice()); prefix = sets_member_key.Encode(); + KeyStatisticsDurationGuard guard(this, key_version.key); auto iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) { ParsedSetsMemberKey parsed_sets_member_key(iter->key()); @@ -1136,6 +1123,7 @@ rocksdb::Status RedisSets::SUnionstore(const Slice& destination, const std::vect for (const auto& key_version : vaild_sets) { SetsMemberKey sets_member_key(key_version.key, key_version.version, Slice()); prefix = sets_member_key.Encode(); + KeyStatisticsDurationGuard guard(this, key_version.key); auto iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) { ParsedSetsMemberKey parsed_sets_member_key(iter->key()); @@ -1220,6 +1208,7 @@ rocksdb::Status RedisSets::SScan(const Slice& key, int64_t cursor, const std::st SetsMemberKey sets_member_prefix(key, version, sub_member); SetsMemberKey sets_member_key(key, version, start_point); std::string prefix = sets_member_prefix.Encode().ToString(); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(sets_member_key.Encode()); iter->Valid() && rest > 0 && iter->key().starts_with(prefix); iter->Next()) { diff --git a/src/storage/src/redis_sets.h b/src/storage/src/redis_sets.h index aa303b16a5..2898d0e9e7 100644 --- a/src/storage/src/redis_sets.h +++ b/src/storage/src/redis_sets.h @@ -10,15 +10,10 @@ #include #include -#include "pstd/include/env.h" - #include "src/custom_comparator.h" #include "src/lru_cache.h" #include "src/redis.h" -#define SPOP_COMPACT_THRESHOLD_COUNT 500 -#define SPOP_COMPACT_THRESHOLD_DURATION (1000 * 1000) // 1000ms - namespace storage { class RedisSets : public Redis { @@ -46,7 +41,7 @@ class RedisSets : public Redis { Status SMembers(const Slice& key, std::vector* members); Status SMembersWithTTL(const Slice& key, std::vector* members, int64_t* ttl); Status SMove(const Slice& source, const Slice& destination, const Slice& member, int32_t* ret); - Status SPop(const Slice& key, std::vector* members, bool* need_compact, int64_t cnt); + Status SPop(const Slice& key, std::vector* members, int64_t cnt); Status SRandmember(const Slice& key, int32_t count, std::vector* members); Status SRem(const Slice& key, const std::vector& members, int32_t* ret); Status SUnion(const std::vector& keys, std::vector* members); @@ -71,12 +66,6 @@ class RedisSets : public Redis { // Iterate all data void ScanDatabase(); - - private: - // For compact in time after multiple spop - std::unique_ptr> spop_counts_store_; - Status ResetSpopCount(const std::string& key); - Status AddAndGetSpopCount(const std::string& key, uint64_t* count); }; } // namespace storage diff --git a/src/storage/src/redis_zsets.cc b/src/storage/src/redis_zsets.cc index e516f9d140..4da415901f 100644 --- a/src/storage/src/redis_zsets.cc +++ b/src/storage/src/redis_zsets.cc @@ -31,6 +31,7 @@ RedisZSets::RedisZSets(Storage* const s, const DataType& type) : Redis(s, type) Status RedisZSets::Open(const StorageOptions& storage_options, const std::string& db_path) { statistics_store_->SetCapacity(storage_options.statistics_max_size); small_compaction_threshold_ = storage_options.small_compaction_threshold; + small_compaction_duration_threshold_ = storage_options.small_compaction_duration_threshold; rocksdb::Options ops(storage_options.options); Status s = rocksdb::DB::Open(ops, db_path, &db_); @@ -229,6 +230,7 @@ Status RedisZSets::ZPopMax(const Slice& key, const int64_t count, std::vector::max(), Slice()); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(default_read_options_, handles_[2]); int32_t del_cnt = 0; for (iter->SeekForPrev(zsets_score_key.Encode()); iter->Valid() && del_cnt < num; iter->Prev()) { @@ -274,6 +276,7 @@ Status RedisZSets::ZPopMin(const Slice& key, const int64_t count, std::vector::lowest(), Slice()); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(default_read_options_, handles_[2]); int32_t del_cnt = 0; for (iter->Seek(zsets_score_key.Encode()); iter->Valid() && del_cnt < num; iter->Next()) { @@ -439,6 +442,7 @@ Status RedisZSets::ZCount(const Slice& key, double min, double max, bool left_cl int32_t stop_index = parsed_zsets_meta_value.count() - 1; ScoreMember score_member; ZSetsScoreKey zsets_score_key(key, version, min, Slice()); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[2]); for (iter->Seek(zsets_score_key.Encode()); iter->Valid() && cur_index <= stop_index; iter->Next(), ++cur_index) { bool left_pass = false; @@ -563,6 +567,7 @@ Status RedisZSets::ZRange(const Slice& key, int32_t start, int32_t stop, std::ve int32_t cur_index = 0; ScoreMember score_member; ZSetsScoreKey zsets_score_key(key, version, std::numeric_limits::lowest(), Slice()); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[2]); for (iter->Seek(zsets_score_key.Encode()); iter->Valid() && cur_index <= stop_index; iter->Next(), ++cur_index) { if (cur_index >= start_index) { @@ -661,6 +666,7 @@ Status RedisZSets::ZRangebyscore(const Slice& key, double min, double max, bool int64_t skipped = 0; ScoreMember score_member; ZSetsScoreKey zsets_score_key(key, version, min, Slice()); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[2]); for (iter->Seek(zsets_score_key.Encode()); iter->Valid() && index <= stop_index; iter->Next(), ++index) { bool left_pass = false; @@ -725,6 +731,7 @@ Status RedisZSets::ZRank(const Slice& key, const Slice& member, int32_t* rank) { int32_t stop_index = parsed_zsets_meta_value.count() - 1; ScoreMember score_member; ZSetsScoreKey zsets_score_key(key, version, std::numeric_limits::lowest(), Slice()); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[2]); for (iter->Seek(zsets_score_key.Encode()); iter->Valid() && index <= stop_index; iter->Next(), ++index) { ParsedZSetsScoreKey parsed_zsets_score_key(iter->key()); @@ -830,6 +837,7 @@ Status RedisZSets::ZRemrangebyrank(const Slice& key, int32_t start, int32_t stop return s; } ZSetsScoreKey zsets_score_key(key, version, std::numeric_limits::lowest(), Slice()); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(default_read_options_, handles_[2]); for (iter->Seek(zsets_score_key.Encode()); iter->Valid() && cur_index <= stop_index; iter->Next(), ++cur_index) { if (cur_index >= start_index) { @@ -878,6 +886,7 @@ Status RedisZSets::ZRemrangebyscore(const Slice& key, double min, double max, bo int32_t stop_index = parsed_zsets_meta_value.count() - 1; int32_t version = parsed_zsets_meta_value.version(); ZSetsScoreKey zsets_score_key(key, version, min, Slice()); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(default_read_options_, handles_[2]); for (iter->Seek(zsets_score_key.Encode()); iter->Valid() && cur_index <= stop_index; iter->Next(), ++cur_index) { bool left_pass = false; @@ -953,6 +962,7 @@ Status RedisZSets::ZRevrange(const Slice& key, int32_t start, int32_t stop, std: int32_t cur_index = count - 1; ScoreMember score_member; ZSetsScoreKey zsets_score_key(key, version, std::numeric_limits::max(), Slice()); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[2]); for (iter->SeekForPrev(zsets_score_key.Encode()); iter->Valid() && cur_index >= start_index; iter->Prev(), --cur_index) { @@ -991,6 +1001,7 @@ Status RedisZSets::ZRevrangebyscore(const Slice& key, double min, double max, bo int64_t skipped = 0; ScoreMember score_member; ZSetsScoreKey zsets_score_key(key, version, std::nextafter(max, std::numeric_limits::max()), Slice()); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[2]); for (iter->SeekForPrev(zsets_score_key.Encode()); iter->Valid() && left > 0; iter->Prev(), --left) { bool left_pass = false; @@ -1055,6 +1066,7 @@ Status RedisZSets::ZRevrank(const Slice& key, const Slice& member, int32_t* rank int32_t left = parsed_zsets_meta_value.count(); int32_t version = parsed_zsets_meta_value.version(); ZSetsScoreKey zsets_score_key(key, version, std::numeric_limits::max(), Slice()); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[2]); for (iter->SeekForPrev(zsets_score_key.Encode()); iter->Valid() && left >= 0; iter->Prev(), --left, ++rev_index) { ParsedZSetsScoreKey parsed_zsets_score_key(iter->key()); @@ -1137,6 +1149,7 @@ Status RedisZSets::ZUnionstore(const Slice& destination, const std::vector::lowest(), Slice()); + KeyStatisticsDurationGuard guard(this, keys[idx]); rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[2]); for (iter->Seek(zsets_score_key.Encode()); iter->Valid() && cur_index <= stop_index; iter->Next(), ++cur_index) { @@ -1253,6 +1266,7 @@ Status RedisZSets::ZInterstore(const Slice& destination, const std::vector::lowest(), Slice()); + KeyStatisticsDurationGuard guard(this, vaild_zsets[0].key); rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[2]); for (iter->Seek(zsets_score_key.Encode()); iter->Valid() && cur_index <= stop_index; iter->Next(), ++cur_index) { ParsedZSetsScoreKey parsed_zsets_score_key(iter->key()); @@ -1357,6 +1371,7 @@ Status RedisZSets::ZRangebylex(const Slice& key, const Slice& min, const Slice& int32_t cur_index = 0; int32_t stop_index = parsed_zsets_meta_value.count() - 1; ZSetsMemberKey zsets_member_key(key, version, Slice()); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(zsets_member_key.Encode()); iter->Valid() && cur_index <= stop_index; iter->Next(), ++cur_index) { bool left_pass = false; @@ -1417,6 +1432,7 @@ Status RedisZSets::ZRemrangebylex(const Slice& key, const Slice& min, const Slic int32_t cur_index = 0; int32_t stop_index = parsed_zsets_meta_value.count() - 1; ZSetsMemberKey zsets_member_key(key, version, Slice()); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(zsets_member_key.Encode()); iter->Valid() && cur_index <= stop_index; iter->Next(), ++cur_index) { bool left_pass = false; @@ -1641,6 +1657,7 @@ Status RedisZSets::ZScan(const Slice& key, int64_t cursor, const std::string& pa ZSetsMemberKey zsets_member_prefix(key, version, sub_member); ZSetsMemberKey zsets_member_key(key, version, start_point); std::string prefix = zsets_member_prefix.Encode().ToString(); + KeyStatisticsDurationGuard guard(this, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[1]); for (iter->Seek(zsets_member_key.Encode()); iter->Valid() && rest > 0 && iter->key().starts_with(prefix); iter->Next()) { diff --git a/src/storage/src/storage.cc b/src/storage/src/storage.cc index a7c25043c7..d16548b9c2 100644 --- a/src/storage/src/storage.cc +++ b/src/storage/src/storage.cc @@ -337,11 +337,7 @@ Status Storage::SMove(const Slice& source, const Slice& destination, const Slice } Status Storage::SPop(const Slice& key, std::vector* members, int64_t count) { - bool need_compact = false; - Status status = sets_db_->SPop(key, members, &need_compact, count); - if (need_compact) { - AddBGTask({kSets, kCompactRange, {key.ToString(), key.ToString()}}); - } + Status status = sets_db_->SPop(key, members, count); return status; } @@ -1661,6 +1657,14 @@ Status Storage::SetSmallCompactionThreshold(uint32_t small_compaction_threshold) return Status::OK(); } +Status Storage::SetSmallCompactionDurationThreshold(uint32_t small_compaction_duration_threshold) { + std::vector dbs = {sets_db_.get(), zsets_db_.get(), hashes_db_.get(), lists_db_.get()}; + for (const auto& db : dbs) { + db->SetSmallCompactionDurationThreshold(small_compaction_duration_threshold); + } + return Status::OK(); +} + std::string Storage::GetCurrentTaskType() { int type = current_task_type_; switch (type) { diff --git a/tests/helpers/redis_queue.py b/tests/helpers/redis_queue.py index 3d0eb8fe12..9203c2d0db 100644 --- a/tests/helpers/redis_queue.py +++ b/tests/helpers/redis_queue.py @@ -6,13 +6,18 @@ START_FLAG = True + def enqueue(client: redis.Redis, queue_name: str): while START_FLAG: + n = client.zcard(queue_name) + if n >= 1000: + time.sleep(0.1) + continue now_ms = int(time.time() * 1000) pipeline = client.pipeline(transaction=False) for i in range(10): score = now_ms << 5 | i - pipeline.zadd(queue_name, {str(i): score}) + pipeline.zadd(queue_name, {str(score): score}) pipeline.execute() print("enqueue exit") @@ -23,6 +28,7 @@ def dequeue(client: redis.Redis, queue_name: str): start_time = time.time() n = client.zcard(queue_name) if n <= 10: + time.sleep(0.1) continue res = client.zremrangebyrank(queue_name, 0, 9) latency = time.time() - start_time @@ -45,37 +51,58 @@ def compact(client: redis.Redis, queue_name: str): print("compact exit") +def auto_compact(client: redis.Redis): + client.config_set("max-cache-statistic-keys", 10000) + client.config_set("small-compaction-threshold", 10000) + client.config_set("small-compaction-duration-threshold", 10000) + + def main(): - if len(sys.argv) != 4: - print("Usage: python redis_queue.py ") + if len(sys.argv) != 5: + print("Usage: python redis_queue.py $redis_host $port $passwd [compact | auto_compact]") sys.exit(1) host = sys.argv[1] port = int(sys.argv[2]) passwd = sys.argv[3] - client_enqueue = redis.Redis(host=host, port=port, password=passwd) - client_dequeue = redis.Redis(host=host, port=port, password=passwd) - client_compact = redis.Redis(host=host, port=port, password=passwd) + mode = sys.argv[4] + + thread_list = [] queue_name = "test_queue" + + client_enqueue = redis.Redis(host=host, port=port, password=passwd) t1 = threading.Thread(target=enqueue, args=(client_enqueue, queue_name)) - t1.start() + thread_list.append(t1) + + client_dequeue = redis.Redis(host=host, port=port, password=passwd) t2 = threading.Thread(target=dequeue, args=(client_dequeue, queue_name)) - t2.start() - t3 = threading.Thread(target=compact, args=(client_compact, queue_name)) - t3.start() - - def signal_handler(signal, frame): + thread_list.append(t2) + + client_compact = redis.Redis(host=host, port=port, password=passwd) + if mode == "compact": + t3 = threading.Thread(target=compact, args=(client_compact, queue_name)) + thread_list.append(t3) + elif mode == "auto_compact": + auto_compact(client_compact) + else: + print("invalid compact mode: {}".format(mode)) + sys.exit(1) + + for t in thread_list: + t.start() + + def signal_handler(signal, frame): print("revc signal: {}".format(signal)) global START_FLAG START_FLAG = False - t1.join() - t2.join() - t3.join() + for t in thread_list: + t.join() print("exit") sys.exit(0) signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGQUIT, signal_handler) + while True: time.sleep(60)