Skip to content

Commit

Permalink
Record and export the keyspace hit/miss count to INFO command
Browse files Browse the repository at this point in the history
Currently, Redis supports to inspect the keyspace hit/miss count via
INFO command to see if the hit ratio is expected. So we also export
the same metric to align with Redis metrics.
  • Loading branch information
git-hulk committed Dec 26, 2023
1 parent 343bbd6 commit 27f7000
Show file tree
Hide file tree
Showing 12 changed files with 157 additions and 62 deletions.
6 changes: 3 additions & 3 deletions src/commands/cmd_replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class CommandPSync : public Commander {
}

if (need_full_sync) {
srv->stats.IncrPSyncErrCounter();
srv->stats.IncrPSyncErrCount();
return {Status::RedisExecErr, *output};
}

Expand All @@ -98,7 +98,7 @@ class CommandPSync : public Commander {
return s.Prefixed("failed to set blocking mode on socket");
}

srv->stats.IncrPSyncOKCounter();
srv->stats.IncrPSyncOKCount();
s = srv->AddSlave(conn, next_repl_seq_);
if (!s.IsOK()) {
std::string err = "-ERR " + s.Msg() + "\r\n";
Expand Down Expand Up @@ -216,7 +216,7 @@ class CommandFetchMeta : public Commander {

conn->NeedNotFreeBufferEvent();
conn->EnableFlag(redis::Connection::kCloseAsync);
srv->stats.IncrFullSyncCounter();
srv->stats.IncrFullSyncCount();

// Feed-replica-meta thread
auto t = GET_OR_RET(util::CreateThread("feed-repl-info", [srv, repl_fd, ip, bev = conn->GetBufferEvent()] {
Expand Down
2 changes: 1 addition & 1 deletion src/server/redis_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ void Connection::OnEvent(bufferevent *bev, int16_t events) {
}

void Connection::Reply(const std::string &msg) {
owner_->srv->stats.IncrOutbondBytes(msg.size());
owner_->srv->stats.IncrOutboundBytes(msg.size());
redis::Reply(bufferevent_get_output(bev_), msg);
}

Expand Down
6 changes: 3 additions & 3 deletions src/server/redis_request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ Status Request::Tokenize(evbuffer *input) {
}

pipeline_size++;
srv_->stats.IncrInbondBytes(line.length);
srv_->stats.IncrInboundBytes(line.length);
if (line[0] == '*') {
auto parse_result = ParseInt<int64_t>(std::string(line.get() + 1, line.length - 1), 10);
if (!parse_result) {
Expand Down Expand Up @@ -101,7 +101,7 @@ Status Request::Tokenize(evbuffer *input) {
UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
if (!line || line.length <= 0) return Status::OK();

srv_->stats.IncrInbondBytes(line.length);
srv_->stats.IncrInboundBytes(line.length);
if (line[0] != '$') {
return {Status::NotOK, "Protocol error: expected '$'"};
}
Expand All @@ -125,7 +125,7 @@ Status Request::Tokenize(evbuffer *input) {
char *data = reinterpret_cast<char *>(evbuffer_pullup(input, static_cast<ssize_t>(bulk_len_ + 2)));
tokens_.emplace_back(data, bulk_len_);
evbuffer_drain(input, bulk_len_ + 2);
srv_->stats.IncrInbondBytes(bulk_len_ + 2);
srv_->stats.IncrInboundBytes(bulk_len_ + 2);
--multi_bulk_len_;
if (multi_bulk_len_ == 0) {
state_ = ArrayLen;
Expand Down
22 changes: 14 additions & 8 deletions src/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -801,8 +801,8 @@ void Server::GetRocksDBInfo(std::string *info) {
<< "]:" << cf_stats_map["memtable-limit-stops"] << "\r\n";
}

auto db_stats = storage->GetDB()->GetDBOptions().statistics;
if (db_stats) {
auto rocksdb_stats = storage->GetDB()->GetDBOptions().statistics;
if (rocksdb_stats) {
std::map<std::string, uint32_t> block_cache_stats = {
{"block_cache_hit", rocksdb::Tickers::BLOCK_CACHE_HIT},
{"block_cache_index_hit", rocksdb::Tickers::BLOCK_CACHE_INDEX_HIT},
Expand All @@ -814,7 +814,7 @@ void Server::GetRocksDBInfo(std::string *info) {
{"block_cache_data_miss", rocksdb::Tickers::BLOCK_CACHE_DATA_MISS},
};
for (const auto &iter : block_cache_stats) {
string_stream << iter.first << ":" << db_stats->getTickerCount(iter.second) << "\r\n";
string_stream << iter.first << ":" << rocksdb_stats->getTickerCount(iter.second) << "\r\n";
}
}

Expand All @@ -829,8 +829,9 @@ void Server::GetRocksDBInfo(std::string *info) {
string_stream << "num_live_versions:" << num_live_versions << "\r\n";
string_stream << "num_super_version:" << num_super_version << "\r\n";
string_stream << "num_background_errors:" << num_background_errors << "\r\n";
string_stream << "flush_count:" << storage->GetFlushCount() << "\r\n";
string_stream << "compaction_count:" << storage->GetCompactionCount() << "\r\n";
auto db_stats = storage->GetDBStats();
string_stream << "flush_count:" << db_stats->flush_count << "\r\n";
string_stream << "compaction_count:" << db_stats->compaction_count << "\r\n";
string_stream << "put_per_sec:" << stats.GetInstantaneousMetric(STATS_METRIC_ROCKSDB_PUT) << "\r\n";
string_stream << "get_per_sec:"
<< stats.GetInstantaneousMetric(STATS_METRIC_ROCKSDB_GET) +
Expand Down Expand Up @@ -1027,9 +1028,14 @@ void Server::GetStatsInfo(std::string *info) {
<< static_cast<float>(stats.GetInstantaneousMetric(STATS_METRIC_NET_INPUT) / 1024) << "\r\n";
string_stream << "instantaneous_output_kbps:"
<< static_cast<float>(stats.GetInstantaneousMetric(STATS_METRIC_NET_OUTPUT) / 1024) << "\r\n";
string_stream << "sync_full:" << stats.fullsync_counter << "\r\n";
string_stream << "sync_partial_ok:" << stats.psync_ok_counter << "\r\n";
string_stream << "sync_partial_err:" << stats.psync_err_counter << "\r\n";
string_stream << "sync_full:" << stats.fullsync_count << "\r\n";
string_stream << "sync_partial_ok:" << stats.psync_ok_count << "\r\n";
string_stream << "sync_partial_err:" << stats.psync_err_count << "\r\n";

auto db_stats = storage->GetDBStats();
string_stream << "keyspace_hits:" << db_stats->keyspace_hits << "\r\n";
string_stream << "keyspace_misses:" << db_stats->keyspace_misses << "\r\n";

{
std::lock_guard<std::mutex> lg(pubsub_channels_mu_);
string_stream << "pubsub_channels:" << pubsub_channels_.size() << "\r\n";
Expand Down
16 changes: 8 additions & 8 deletions src/stats/stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,19 @@ class Stats {
mutable std::shared_mutex inst_metrics_mutex;
std::vector<InstMetric> inst_metrics;

std::atomic<uint64_t> fullsync_counter = {0};
std::atomic<uint64_t> psync_err_counter = {0};
std::atomic<uint64_t> psync_ok_counter = {0};
std::atomic<uint64_t> fullsync_count = {0};
std::atomic<uint64_t> psync_err_count = {0};
std::atomic<uint64_t> psync_ok_count = {0};
std::map<std::string, CommandStat> commands_stats;

Stats();
void IncrCalls(const std::string &command_name);
void IncrLatency(uint64_t latency, const std::string &command_name);
void IncrInbondBytes(uint64_t bytes) { in_bytes.fetch_add(bytes, std::memory_order_relaxed); }
void IncrOutbondBytes(uint64_t bytes) { out_bytes.fetch_add(bytes, std::memory_order_relaxed); }
void IncrFullSyncCounter() { fullsync_counter.fetch_add(1, std::memory_order_relaxed); }
void IncrPSyncErrCounter() { psync_err_counter.fetch_add(1, std::memory_order_relaxed); }
void IncrPSyncOKCounter() { psync_ok_counter.fetch_add(1, std::memory_order_relaxed); }
void IncrInboundBytes(uint64_t bytes) { in_bytes.fetch_add(bytes, std::memory_order_relaxed); }
void IncrOutboundBytes(uint64_t bytes) { out_bytes.fetch_add(bytes, std::memory_order_relaxed); }
void IncrFullSyncCount() { fullsync_count.fetch_add(1, std::memory_order_relaxed); }
void IncrPSyncErrCount() { psync_err_count.fetch_add(1, std::memory_order_relaxed); }
void IncrPSyncOKCount() { psync_ok_count.fetch_add(1, std::memory_order_relaxed); }
static int64_t GetMemoryRSS();
void TrackInstantaneousMetric(int metric, uint64_t current_reading);
uint64_t GetInstantaneousMetric(int metric) const;
Expand Down
4 changes: 2 additions & 2 deletions src/storage/event_listener.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ void EventListener::OnCompactionCompleted(rocksdb::DB *db, const rocksdb::Compac
<< ", input bytes: " << ci.stats.total_input_bytes << ", output bytes:" << ci.stats.total_output_bytes
<< ", is_manual_compaction:" << (ci.stats.is_manual_compaction ? "yes" : "no")
<< ", elapsed(micro): " << ci.stats.elapsed_micros;
storage_->IncrCompactionCount(1);
storage_->RecordStat(engine::StatType::CompactionCount, 1);
storage_->CheckDBSizeLimit();
}

Expand All @@ -94,7 +94,7 @@ void EventListener::OnFlushBegin(rocksdb::DB *db, const rocksdb::FlushJobInfo &f
}

void EventListener::OnFlushCompleted(rocksdb::DB *db, const rocksdb::FlushJobInfo &fi) {
storage_->IncrFlushCount(1);
storage_->RecordStat(engine::StatType::FlushCount, 1);
storage_->CheckDBSizeLimit();
LOG(INFO) << "[event_listener/flush_completed] column family: " << fi.cf_name << ", thread_id: " << fi.thread_id
<< ", job_id: " << fi.job_id << ", file: " << fi.file_path
Expand Down
15 changes: 14 additions & 1 deletion src/storage/redis_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <utility>

#include "cluster/redis_slot.h"
#include "common/scope_exit.h"
#include "db_util.h"
#include "parse_util.h"
#include "rocksdb/iterator.h"
Expand All @@ -44,6 +45,15 @@ rocksdb::Status Database::ParseMetadata(RedisTypes types, Slice *bytes, Metadata
std::string old_metadata;
metadata->Encode(&old_metadata);

bool is_keyspace_hit = false;
ScopeExit se([this, &is_keyspace_hit] {
if (is_keyspace_hit) {
storage_->RecordStat(engine::StatType::KeyspaceHits, 1);
} else {
storage_->RecordStat(engine::StatType::KeyspaceMisses, 1);
}
});

auto s = metadata->Decode(bytes);
if (!s.ok()) return s;

Expand All @@ -64,6 +74,7 @@ rocksdb::Status Database::ParseMetadata(RedisTypes types, Slice *bytes, Metadata
auto _ [[maybe_unused]] = metadata->Decode(old_metadata);
return rocksdb::Status::NotFound("no element found");
}
is_keyspace_hit = true;
return s;
}

Expand All @@ -77,7 +88,9 @@ rocksdb::Status Database::GetMetadata(RedisType type, const Slice &ns_key, std::
Slice *rest) {
auto s = GetRawMetadata(ns_key, raw_value);
*rest = *raw_value;
if (!s.ok()) return s;
if (!s.ok()) {
return s;
}
return ParseMetadata({type}, rest, metadata);
}

Expand Down
2 changes: 1 addition & 1 deletion src/storage/redis_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class Database {
static constexpr uint64_t RANDOM_KEY_SCAN_LIMIT = 60;

explicit Database(engine::Storage *storage, std::string ns = "");
[[nodiscard]] static rocksdb::Status ParseMetadata(RedisTypes types, Slice *bytes, Metadata *metadata);
[[nodiscard]] rocksdb::Status ParseMetadata(RedisTypes types, Slice *bytes, Metadata *metadata);
[[nodiscard]] rocksdb::Status GetMetadata(RedisType type, const Slice &ns_key, Metadata *metadata);
[[nodiscard]] rocksdb::Status GetMetadata(RedisType type, const Slice &ns_key, std::string *raw_value,
Metadata *metadata, Slice *rest);
Expand Down
49 changes: 45 additions & 4 deletions src/storage/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -519,10 +519,15 @@ rocksdb::Status Storage::Get(const rocksdb::ReadOptions &options, const rocksdb:

rocksdb::Status Storage::Get(const rocksdb::ReadOptions &options, rocksdb::ColumnFamilyHandle *column_family,
const rocksdb::Slice &key, std::string *value) {
rocksdb::Status s;
if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
return txn_write_batch_->GetFromBatchAndDB(db_.get(), options, column_family, key, value);
s = txn_write_batch_->GetFromBatchAndDB(db_.get(), options, column_family, key, value);
} else {
s = db_->Get(options, column_family, key, value);
}
return db_->Get(options, column_family, key, value);

recordKeyspaceStat(column_family, s);
return s;
}

rocksdb::Status Storage::Get(const rocksdb::ReadOptions &options, const rocksdb::Slice &key,
Expand All @@ -532,16 +537,31 @@ rocksdb::Status Storage::Get(const rocksdb::ReadOptions &options, const rocksdb:

rocksdb::Status Storage::Get(const rocksdb::ReadOptions &options, rocksdb::ColumnFamilyHandle *column_family,
const rocksdb::Slice &key, rocksdb::PinnableSlice *value) {
rocksdb::Status s;
if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
return txn_write_batch_->GetFromBatchAndDB(db_.get(), options, column_family, key, value);
s = txn_write_batch_->GetFromBatchAndDB(db_.get(), options, column_family, key, value);
} else {
s = db_->Get(options, column_family, key, value);
}
return db_->Get(options, column_family, key, value);

recordKeyspaceStat(column_family, s);
return s;
}

rocksdb::Iterator *Storage::NewIterator(const rocksdb::ReadOptions &options) {
return NewIterator(options, db_->DefaultColumnFamily());
}

void Storage::recordKeyspaceStat(const rocksdb::ColumnFamilyHandle *column_family, const rocksdb::Status &s) {
if (column_family->GetName() != kMetadataColumnFamilyName) return;

// Don't record keyspace hits here because we cannot tell
// if the key was expired or not. So we record it when parsing the metadata.
if (s.IsNotFound() || s.IsInvalidArgument()) {
RecordStat(StatType::KeyspaceMisses, 1);
}
}

rocksdb::Iterator *Storage::NewIterator(const rocksdb::ReadOptions &options,
rocksdb::ColumnFamilyHandle *column_family) {
auto iter = db_->NewIterator(options, column_family);
Expand All @@ -560,6 +580,10 @@ void Storage::MultiGet(const rocksdb::ReadOptions &options, rocksdb::ColumnFamil
} else {
db_->MultiGet(options, column_family, num_keys, keys, values, statuses, false);
}

for (size_t i = 0; i < num_keys; i++) {
recordKeyspaceStat(column_family, statuses[i]);
}
}

rocksdb::Status Storage::Write(const rocksdb::WriteOptions &options, rocksdb::WriteBatch *updates) {
Expand Down Expand Up @@ -635,6 +659,23 @@ Status Storage::ReplicaApplyWriteBatch(std::string &&raw_batch) {
return Status::OK();
}

void Storage::RecordStat(StatType type, uint64_t v) {
switch (type) {
case StatType::FlushCount:
db_stats_.flush_count += v;
break;
case StatType::CompactionCount:
db_stats_.compaction_count += v;
break;
case StatType::KeyspaceHits:
db_stats_.keyspace_hits += v;
break;
case StatType::KeyspaceMisses:
db_stats_.keyspace_misses += v;
break;
}
}

rocksdb::ColumnFamilyHandle *Storage::GetCFHandle(const std::string &name) {
if (name == kMetadataColumnFamilyName) {
return cf_handles_[1];
Expand Down
26 changes: 20 additions & 6 deletions src/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,20 @@ inline const std::vector<CompressionOption> CompressionOptions = {
{rocksdb::kZSTD, "zstd", "kZSTD"},
};

enum class StatType {
CompactionCount,
FlushCount,
KeyspaceHits,
KeyspaceMisses,
};

struct DBStats {
std::atomic<uint64_t> compaction_count = 0;
std::atomic<uint64_t> flush_count = 0;
std::atomic<uint64_t> keyspace_hits = 0;
std::atomic<uint64_t> keyspace_misses = 0;
};

class Storage {
public:
explicit Storage(Config *config);
Expand Down Expand Up @@ -144,13 +158,12 @@ class Storage {
std::shared_lock<std::shared_mutex> ReadLockGuard();
std::unique_lock<std::shared_mutex> WriteLockGuard();

uint64_t GetFlushCount() const { return flush_count_; }
void IncrFlushCount(uint64_t n) { flush_count_.fetch_add(n); }
uint64_t GetCompactionCount() const { return compaction_count_; }
void IncrCompactionCount(uint64_t n) { compaction_count_.fetch_add(n); }
bool IsSlotIdEncoded() const { return config_->slot_id_encoded; }
Config *GetConfig() const { return config_; }

const DBStats *GetDBStats() const { return &db_stats_; }
void RecordStat(StatType type, uint64_t v);

Status BeginTxn();
Status CommitTxn();
ObserverOrUniquePtr<rocksdb::WriteBatchBase> GetWriteBatchBase();
Expand Down Expand Up @@ -213,8 +226,8 @@ class Storage {
std::vector<rocksdb::ColumnFamilyHandle *> cf_handles_;
LockManager lock_mgr_;
bool db_size_limit_reached_ = false;
std::atomic<uint64_t> flush_count_{0};
std::atomic<uint64_t> compaction_count_{0};

DBStats db_stats_;

std::shared_mutex db_rw_lock_;
bool db_closing_ = true;
Expand All @@ -233,6 +246,7 @@ class Storage {
rocksdb::WriteOptions write_opts_ = rocksdb::WriteOptions();

rocksdb::Status writeToDB(const rocksdb::WriteOptions &options, rocksdb::WriteBatch *updates);
void recordKeyspaceStat(const rocksdb::ColumnFamilyHandle *column_family, const rocksdb::Status &s);
};

} // namespace engine
Loading

0 comments on commit 27f7000

Please sign in to comment.