Skip to content

Commit

Permalink
Record and export the keyspace hit/miss count to INFO command (#1971)
Browse files Browse the repository at this point in the history
Currently, Redis supports inspecting the keyspace hit/miss count via
INFO command to see if the hit ratio is expected. So we also export
the same metric to align with Redis metrics.
  • Loading branch information
git-hulk authored Dec 27, 2023
1 parent 7a08dfd commit 5b54f9c
Show file tree
Hide file tree
Showing 12 changed files with 154 additions and 107 deletions.
6 changes: 3 additions & 3 deletions src/commands/cmd_replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class CommandPSync : public Commander {
}

if (need_full_sync) {
srv->stats.IncrPSyncErrCounter();
srv->stats.IncrPSyncErrCount();
return {Status::RedisExecErr, *output};
}

Expand All @@ -98,7 +98,7 @@ class CommandPSync : public Commander {
return s.Prefixed("failed to set blocking mode on socket");
}

srv->stats.IncrPSyncOKCounter();
srv->stats.IncrPSyncOKCount();
s = srv->AddSlave(conn, next_repl_seq_);
if (!s.IsOK()) {
std::string err = "-ERR " + s.Msg() + "\r\n";
Expand Down Expand Up @@ -216,7 +216,7 @@ class CommandFetchMeta : public Commander {

conn->NeedNotFreeBufferEvent();
conn->EnableFlag(redis::Connection::kCloseAsync);
srv->stats.IncrFullSyncCounter();
srv->stats.IncrFullSyncCount();

// Feed-replica-meta thread
auto t = GET_OR_RET(util::CreateThread("feed-repl-info", [srv, repl_fd, ip, bev = conn->GetBufferEvent()] {
Expand Down
2 changes: 1 addition & 1 deletion src/server/redis_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ void Connection::OnEvent(bufferevent *bev, int16_t events) {
}

void Connection::Reply(const std::string &msg) {
owner_->srv->stats.IncrOutbondBytes(msg.size());
owner_->srv->stats.IncrOutboundBytes(msg.size());
redis::Reply(bufferevent_get_output(bev_), msg);
}

Expand Down
6 changes: 3 additions & 3 deletions src/server/redis_request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ Status Request::Tokenize(evbuffer *input) {
}

pipeline_size++;
srv_->stats.IncrInbondBytes(line.length);
srv_->stats.IncrInboundBytes(line.length);
if (line[0] == '*') {
auto parse_result = ParseInt<int64_t>(std::string(line.get() + 1, line.length - 1), 10);
if (!parse_result) {
Expand Down Expand Up @@ -101,7 +101,7 @@ Status Request::Tokenize(evbuffer *input) {
UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
if (!line || line.length <= 0) return Status::OK();

srv_->stats.IncrInbondBytes(line.length);
srv_->stats.IncrInboundBytes(line.length);
if (line[0] != '$') {
return {Status::NotOK, "Protocol error: expected '$'"};
}
Expand All @@ -125,7 +125,7 @@ Status Request::Tokenize(evbuffer *input) {
char *data = reinterpret_cast<char *>(evbuffer_pullup(input, static_cast<ssize_t>(bulk_len_ + 2)));
tokens_.emplace_back(data, bulk_len_);
evbuffer_drain(input, bulk_len_ + 2);
srv_->stats.IncrInbondBytes(bulk_len_ + 2);
srv_->stats.IncrInboundBytes(bulk_len_ + 2);
--multi_bulk_len_;
if (multi_bulk_len_ == 0) {
state_ = ArrayLen;
Expand Down
22 changes: 14 additions & 8 deletions src/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -801,8 +801,8 @@ void Server::GetRocksDBInfo(std::string *info) {
<< "]:" << cf_stats_map["memtable-limit-stops"] << "\r\n";
}

auto db_stats = storage->GetDB()->GetDBOptions().statistics;
if (db_stats) {
auto rocksdb_stats = storage->GetDB()->GetDBOptions().statistics;
if (rocksdb_stats) {
std::map<std::string, uint32_t> block_cache_stats = {
{"block_cache_hit", rocksdb::Tickers::BLOCK_CACHE_HIT},
{"block_cache_index_hit", rocksdb::Tickers::BLOCK_CACHE_INDEX_HIT},
Expand All @@ -814,7 +814,7 @@ void Server::GetRocksDBInfo(std::string *info) {
{"block_cache_data_miss", rocksdb::Tickers::BLOCK_CACHE_DATA_MISS},
};
for (const auto &iter : block_cache_stats) {
string_stream << iter.first << ":" << db_stats->getTickerCount(iter.second) << "\r\n";
string_stream << iter.first << ":" << rocksdb_stats->getTickerCount(iter.second) << "\r\n";
}
}

Expand All @@ -829,8 +829,9 @@ void Server::GetRocksDBInfo(std::string *info) {
string_stream << "num_live_versions:" << num_live_versions << "\r\n";
string_stream << "num_super_version:" << num_super_version << "\r\n";
string_stream << "num_background_errors:" << num_background_errors << "\r\n";
string_stream << "flush_count:" << storage->GetFlushCount() << "\r\n";
string_stream << "compaction_count:" << storage->GetCompactionCount() << "\r\n";
auto db_stats = storage->GetDBStats();
string_stream << "flush_count:" << db_stats->flush_count << "\r\n";
string_stream << "compaction_count:" << db_stats->compaction_count << "\r\n";
string_stream << "put_per_sec:" << stats.GetInstantaneousMetric(STATS_METRIC_ROCKSDB_PUT) << "\r\n";
string_stream << "get_per_sec:"
<< stats.GetInstantaneousMetric(STATS_METRIC_ROCKSDB_GET) +
Expand Down Expand Up @@ -1027,9 +1028,14 @@ void Server::GetStatsInfo(std::string *info) {
<< static_cast<float>(stats.GetInstantaneousMetric(STATS_METRIC_NET_INPUT) / 1024) << "\r\n";
string_stream << "instantaneous_output_kbps:"
<< static_cast<float>(stats.GetInstantaneousMetric(STATS_METRIC_NET_OUTPUT) / 1024) << "\r\n";
string_stream << "sync_full:" << stats.fullsync_counter << "\r\n";
string_stream << "sync_partial_ok:" << stats.psync_ok_counter << "\r\n";
string_stream << "sync_partial_err:" << stats.psync_err_counter << "\r\n";
string_stream << "sync_full:" << stats.fullsync_count << "\r\n";
string_stream << "sync_partial_ok:" << stats.psync_ok_count << "\r\n";
string_stream << "sync_partial_err:" << stats.psync_err_count << "\r\n";

auto db_stats = storage->GetDBStats();
string_stream << "keyspace_hits:" << db_stats->keyspace_hits << "\r\n";
string_stream << "keyspace_misses:" << db_stats->keyspace_misses << "\r\n";

{
std::lock_guard<std::mutex> lg(pubsub_channels_mu_);
string_stream << "pubsub_channels:" << pubsub_channels_.size() << "\r\n";
Expand Down
16 changes: 8 additions & 8 deletions src/stats/stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,19 @@ class Stats {
mutable std::shared_mutex inst_metrics_mutex;
std::vector<InstMetric> inst_metrics;

std::atomic<uint64_t> fullsync_counter = {0};
std::atomic<uint64_t> psync_err_counter = {0};
std::atomic<uint64_t> psync_ok_counter = {0};
std::atomic<uint64_t> fullsync_count = {0};
std::atomic<uint64_t> psync_err_count = {0};
std::atomic<uint64_t> psync_ok_count = {0};
std::map<std::string, CommandStat> commands_stats;

Stats();
void IncrCalls(const std::string &command_name);
void IncrLatency(uint64_t latency, const std::string &command_name);
void IncrInbondBytes(uint64_t bytes) { in_bytes.fetch_add(bytes, std::memory_order_relaxed); }
void IncrOutbondBytes(uint64_t bytes) { out_bytes.fetch_add(bytes, std::memory_order_relaxed); }
void IncrFullSyncCounter() { fullsync_counter.fetch_add(1, std::memory_order_relaxed); }
void IncrPSyncErrCounter() { psync_err_counter.fetch_add(1, std::memory_order_relaxed); }
void IncrPSyncOKCounter() { psync_ok_counter.fetch_add(1, std::memory_order_relaxed); }
void IncrInboundBytes(uint64_t bytes) { in_bytes.fetch_add(bytes, std::memory_order_relaxed); }
void IncrOutboundBytes(uint64_t bytes) { out_bytes.fetch_add(bytes, std::memory_order_relaxed); }
void IncrFullSyncCount() { fullsync_count.fetch_add(1, std::memory_order_relaxed); }
void IncrPSyncErrCount() { psync_err_count.fetch_add(1, std::memory_order_relaxed); }
void IncrPSyncOKCount() { psync_ok_count.fetch_add(1, std::memory_order_relaxed); }
static int64_t GetMemoryRSS();
void TrackInstantaneousMetric(int metric, uint64_t current_reading);
uint64_t GetInstantaneousMetric(int metric) const;
Expand Down
4 changes: 2 additions & 2 deletions src/storage/event_listener.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ void EventListener::OnCompactionCompleted(rocksdb::DB *db, const rocksdb::Compac
<< ", input bytes: " << ci.stats.total_input_bytes << ", output bytes:" << ci.stats.total_output_bytes
<< ", is_manual_compaction:" << (ci.stats.is_manual_compaction ? "yes" : "no")
<< ", elapsed(micro): " << ci.stats.elapsed_micros;
storage_->IncrCompactionCount(1);
storage_->RecordStat(engine::StatType::CompactionCount, 1);
storage_->CheckDBSizeLimit();
}

Expand All @@ -94,7 +94,7 @@ void EventListener::OnFlushBegin(rocksdb::DB *db, const rocksdb::FlushJobInfo &f
}

void EventListener::OnFlushCompleted(rocksdb::DB *db, const rocksdb::FlushJobInfo &fi) {
storage_->IncrFlushCount(1);
storage_->RecordStat(engine::StatType::FlushCount, 1);
storage_->CheckDBSizeLimit();
LOG(INFO) << "[event_listener/flush_completed] column family: " << fi.cf_name << ", thread_id: " << fi.thread_id
<< ", job_id: " << fi.job_id << ", file: " << fi.file_path
Expand Down
48 changes: 11 additions & 37 deletions src/storage/redis_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <utility>

#include "cluster/redis_slot.h"
#include "common/scope_exit.h"
#include "db_util.h"
#include "parse_util.h"
#include "rocksdb/iterator.h"
Expand All @@ -44,6 +45,15 @@ rocksdb::Status Database::ParseMetadata(RedisTypes types, Slice *bytes, Metadata
std::string old_metadata;
metadata->Encode(&old_metadata);

bool is_keyspace_hit = false;
ScopeExit se([this, &is_keyspace_hit] {
if (is_keyspace_hit) {
storage_->RecordStat(engine::StatType::KeyspaceHits, 1);
} else {
storage_->RecordStat(engine::StatType::KeyspaceMisses, 1);
}
});

auto s = metadata->Decode(bytes);
if (!s.ok()) return s;

Expand All @@ -64,6 +74,7 @@ rocksdb::Status Database::ParseMetadata(RedisTypes types, Slice *bytes, Metadata
auto _ [[maybe_unused]] = metadata->Decode(old_metadata);
return rocksdb::Status::NotFound("no element found");
}
is_keyspace_hit = true;
return s;
}

Expand Down Expand Up @@ -597,43 +608,6 @@ rocksdb::Status Database::ClearKeysOfSlot(const rocksdb::Slice &ns, int slot) {
return rocksdb::Status::OK();
}

rocksdb::Status Database::GetSlotKeysInfo(int slot, std::map<int, uint64_t> *slotskeys, std::vector<std::string> *keys,
int count) {
LatestSnapShot ss(storage_);
rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
read_options.snapshot = ss.GetSnapShot();

auto iter = util::UniqueIterator(storage_, read_options, metadata_cf_handle_);
bool end = false;
for (int i = 0; i < HASH_SLOTS_SIZE; i++) {
std::string prefix = ComposeSlotKeyPrefix(namespace_, i);
uint64_t total = 0;
int cnt = 0;
if (slot != -1 && i != slot) {
(*slotskeys)[i] = total;
continue;
}
for (iter->Seek(prefix); iter->Valid(); iter->Next()) {
if (!iter->key().starts_with(prefix)) {
break;
}
total++;
if (slot != -1 && count > 0 && !end) {
// Get user key
if (cnt < count) {
auto [_, user_key] = ExtractNamespaceKey(iter->key(), true);
keys->emplace_back(user_key.ToString());
cnt++;
}
}
}
// Maybe cnt < count
if (cnt > 0) end = true;
(*slotskeys)[i] = total;
}
return rocksdb::Status::OK();
}

rocksdb::Status Database::KeyExist(const std::string &key) {
int cnt = 0;
std::vector<rocksdb::Slice> keys;
Expand Down
4 changes: 1 addition & 3 deletions src/storage/redis_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class Database {
static constexpr uint64_t RANDOM_KEY_SCAN_LIMIT = 60;

explicit Database(engine::Storage *storage, std::string ns = "");
[[nodiscard]] static rocksdb::Status ParseMetadata(RedisTypes types, Slice *bytes, Metadata *metadata);
[[nodiscard]] rocksdb::Status ParseMetadata(RedisTypes types, Slice *bytes, Metadata *metadata);
[[nodiscard]] rocksdb::Status GetMetadata(RedisTypes types, const Slice &ns_key, Metadata *metadata);
[[nodiscard]] rocksdb::Status GetMetadata(RedisTypes types, const Slice &ns_key, std::string *raw_value,
Metadata *metadata, Slice *rest);
Expand All @@ -60,8 +60,6 @@ class Database {
std::string *begin, std::string *end,
rocksdb::ColumnFamilyHandle *cf_handle = nullptr);
[[nodiscard]] rocksdb::Status ClearKeysOfSlot(const rocksdb::Slice &ns, int slot);
[[nodiscard]] rocksdb::Status GetSlotKeysInfo(int slot, std::map<int, uint64_t> *slotskeys,
std::vector<std::string> *keys, int count);
[[nodiscard]] rocksdb::Status KeyExist(const std::string &key);

protected:
Expand Down
55 changes: 45 additions & 10 deletions src/storage/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -200,12 +200,6 @@ Status Storage::SetOptionForAllColumnFamilies(const std::string &key, const std:
return Status::OK();
}

Status Storage::SetOption(const std::string &key, const std::string &value) {
auto s = db_->SetOptions({{key, value}});
if (!s.ok()) return {Status::NotOK, s.ToString()};
return Status::OK();
}

Status Storage::SetDBOption(const std::string &key, const std::string &value) {
auto s = db_->SetDBOptions({{key, value}});
if (!s.ok()) return {Status::NotOK, s.ToString()};
Expand Down Expand Up @@ -525,10 +519,15 @@ rocksdb::Status Storage::Get(const rocksdb::ReadOptions &options, const rocksdb:

rocksdb::Status Storage::Get(const rocksdb::ReadOptions &options, rocksdb::ColumnFamilyHandle *column_family,
const rocksdb::Slice &key, std::string *value) {
rocksdb::Status s;
if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
return txn_write_batch_->GetFromBatchAndDB(db_.get(), options, column_family, key, value);
s = txn_write_batch_->GetFromBatchAndDB(db_.get(), options, column_family, key, value);
} else {
s = db_->Get(options, column_family, key, value);
}
return db_->Get(options, column_family, key, value);

recordKeyspaceStat(column_family, s);
return s;
}

rocksdb::Status Storage::Get(const rocksdb::ReadOptions &options, const rocksdb::Slice &key,
Expand All @@ -538,16 +537,31 @@ rocksdb::Status Storage::Get(const rocksdb::ReadOptions &options, const rocksdb:

rocksdb::Status Storage::Get(const rocksdb::ReadOptions &options, rocksdb::ColumnFamilyHandle *column_family,
const rocksdb::Slice &key, rocksdb::PinnableSlice *value) {
rocksdb::Status s;
if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
return txn_write_batch_->GetFromBatchAndDB(db_.get(), options, column_family, key, value);
s = txn_write_batch_->GetFromBatchAndDB(db_.get(), options, column_family, key, value);
} else {
s = db_->Get(options, column_family, key, value);
}
return db_->Get(options, column_family, key, value);

recordKeyspaceStat(column_family, s);
return s;
}

rocksdb::Iterator *Storage::NewIterator(const rocksdb::ReadOptions &options) {
return NewIterator(options, db_->DefaultColumnFamily());
}

void Storage::recordKeyspaceStat(const rocksdb::ColumnFamilyHandle *column_family, const rocksdb::Status &s) {
if (column_family->GetName() != kMetadataColumnFamilyName) return;

// Don't record keyspace hits here because we cannot tell
// if the key was expired or not. So we record it when parsing the metadata.
if (s.IsNotFound() || s.IsInvalidArgument()) {
RecordStat(StatType::KeyspaceMisses, 1);
}
}

rocksdb::Iterator *Storage::NewIterator(const rocksdb::ReadOptions &options,
rocksdb::ColumnFamilyHandle *column_family) {
auto iter = db_->NewIterator(options, column_family);
Expand All @@ -566,6 +580,10 @@ void Storage::MultiGet(const rocksdb::ReadOptions &options, rocksdb::ColumnFamil
} else {
db_->MultiGet(options, column_family, num_keys, keys, values, statuses, false);
}

for (size_t i = 0; i < num_keys; i++) {
recordKeyspaceStat(column_family, statuses[i]);
}
}

rocksdb::Status Storage::Write(const rocksdb::WriteOptions &options, rocksdb::WriteBatch *updates) {
Expand Down Expand Up @@ -641,6 +659,23 @@ Status Storage::ReplicaApplyWriteBatch(std::string &&raw_batch) {
return Status::OK();
}

void Storage::RecordStat(StatType type, uint64_t v) {
switch (type) {
case StatType::FlushCount:
db_stats_.flush_count += v;
break;
case StatType::CompactionCount:
db_stats_.compaction_count += v;
break;
case StatType::KeyspaceHits:
db_stats_.keyspace_hits += v;
break;
case StatType::KeyspaceMisses:
db_stats_.keyspace_misses += v;
break;
}
}

rocksdb::ColumnFamilyHandle *Storage::GetCFHandle(const std::string &name) {
if (name == kMetadataColumnFamilyName) {
return cf_handles_[1];
Expand Down
Loading

0 comments on commit 5b54f9c

Please sign in to comment.