Skip to content

Commit

Permalink
Core-local statistics
Browse files Browse the repository at this point in the history
  • Loading branch information
ajkr committed May 6, 2017
1 parent 73f0b5b commit 23214d0
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 189 deletions.
129 changes: 34 additions & 95 deletions monitoring/statistics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,9 @@ std::shared_ptr<Statistics> CreateDBStatistics() {
return std::make_shared<StatisticsImpl>(nullptr, false);
}

StatisticsImpl::StatisticsImpl(
std::shared_ptr<Statistics> stats,
bool enable_internal_stats)
: stats_shared_(stats),
stats_(stats.get()),
enable_internal_stats_(enable_internal_stats) {
}
StatisticsImpl::StatisticsImpl(std::shared_ptr<Statistics> stats,
bool enable_internal_stats)
: stats_(std::move(stats)), enable_internal_stats_(enable_internal_stats) {}

StatisticsImpl::~StatisticsImpl() {}

Expand All @@ -43,79 +39,36 @@ uint64_t StatisticsImpl::getTickerCountLocked(uint32_t tickerType) const {
enable_internal_stats_ ?
tickerType < INTERNAL_TICKER_ENUM_MAX :
tickerType < TICKER_ENUM_MAX);
uint64_t thread_local_sum = 0;
tickers_[tickerType].thread_value->Fold(
[](void* curr_ptr, void* res) {
auto* sum_ptr = static_cast<uint64_t*>(res);
*sum_ptr += static_cast<std::atomic_uint_fast64_t*>(curr_ptr)->load(
std::memory_order_relaxed);
},
&thread_local_sum);
return thread_local_sum +
tickers_[tickerType].merged_sum.load(std::memory_order_relaxed);
}

std::unique_ptr<HistogramImpl>
StatisticsImpl::HistogramInfo::getMergedHistogram() const {
std::unique_ptr<HistogramImpl> res_hist(new HistogramImpl());
{
MutexLock lock(&merge_lock);
res_hist->Merge(merged_hist);
uint64_t res = 0;
for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
res += per_core_stats_.AccessAtCore(core_idx)->tickers_[tickerType];
}
thread_value->Fold(
[](void* curr_ptr, void* res) {
auto tmp_res_hist = static_cast<HistogramImpl*>(res);
auto curr_hist = static_cast<HistogramImpl*>(curr_ptr);
tmp_res_hist->Merge(*curr_hist);
},
res_hist.get());
return res_hist;
return res;
}

void StatisticsImpl::histogramData(uint32_t histogramType,
HistogramData* const data) const {
MutexLock lock(&aggregate_lock_);
histogramDataLocked(histogramType, data);
getHistogramImplLocked(histogramType)->Data(data);
}

void StatisticsImpl::histogramDataLocked(uint32_t histogramType,
HistogramData* const data) const {
std::unique_ptr<HistogramImpl> StatisticsImpl::getHistogramImplLocked(
uint32_t histogramType) const {
assert(
enable_internal_stats_ ?
histogramType < INTERNAL_HISTOGRAM_ENUM_MAX :
histogramType < HISTOGRAM_ENUM_MAX);
histograms_[histogramType].getMergedHistogram()->Data(data);
std::unique_ptr<HistogramImpl> res_hist(new HistogramImpl());
for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
res_hist->Merge(
per_core_stats_.AccessAtCore(core_idx)->histograms_[histogramType]);
}
return std::move(res_hist);
}

std::string StatisticsImpl::getHistogramString(uint32_t histogramType) const {
MutexLock lock(&aggregate_lock_);
assert(enable_internal_stats_ ? histogramType < INTERNAL_HISTOGRAM_ENUM_MAX
: histogramType < HISTOGRAM_ENUM_MAX);
return histograms_[histogramType].getMergedHistogram()->ToString();
}

StatisticsImpl::ThreadTickerInfo* StatisticsImpl::getThreadTickerInfo(
uint32_t tickerType) {
auto info_ptr =
static_cast<ThreadTickerInfo*>(tickers_[tickerType].thread_value->Get());
if (info_ptr == nullptr) {
info_ptr =
new ThreadTickerInfo(0 /* value */, &tickers_[tickerType].merged_sum);
tickers_[tickerType].thread_value->Reset(info_ptr);
}
return info_ptr;
}

StatisticsImpl::ThreadHistogramInfo* StatisticsImpl::getThreadHistogramInfo(
uint32_t histogram_type) {
auto info_ptr = static_cast<ThreadHistogramInfo*>(
histograms_[histogram_type].thread_value->Get());
if (info_ptr == nullptr) {
info_ptr = new ThreadHistogramInfo(&histograms_[histogram_type].merged_hist,
&histograms_[histogram_type].merge_lock);
histograms_[histogram_type].thread_value->Reset(info_ptr);
}
return info_ptr;
return getHistogramImplLocked(histogramType)->ToString();
}

void StatisticsImpl::setTickerCount(uint32_t tickerType, uint64_t count) {
Expand All @@ -131,14 +84,12 @@ void StatisticsImpl::setTickerCount(uint32_t tickerType, uint64_t count) {
void StatisticsImpl::setTickerCountLocked(uint32_t tickerType, uint64_t count) {
assert(enable_internal_stats_ ? tickerType < INTERNAL_TICKER_ENUM_MAX
: tickerType < TICKER_ENUM_MAX);
if (tickerType < TICKER_ENUM_MAX || enable_internal_stats_) {
tickers_[tickerType].thread_value->Fold(
[](void* curr_ptr, void* res) {
static_cast<std::atomic<uint64_t>*>(curr_ptr)->store(
0, std::memory_order_relaxed);
},
nullptr /* res */);
tickers_[tickerType].merged_sum.store(count, std::memory_order_relaxed);
for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
if (core_idx == 0) {
per_core_stats_.AccessAtCore(core_idx)->tickers_[tickerType] = count;
} else {
per_core_stats_.AccessAtCore(core_idx)->tickers_[tickerType] = 0;
}
}
}

Expand All @@ -148,16 +99,10 @@ uint64_t StatisticsImpl::getAndResetTickerCount(uint32_t tickerType) {
MutexLock lock(&aggregate_lock_);
assert(enable_internal_stats_ ? tickerType < INTERNAL_TICKER_ENUM_MAX
: tickerType < TICKER_ENUM_MAX);
if (tickerType < TICKER_ENUM_MAX || enable_internal_stats_) {
tickers_[tickerType].thread_value->Fold(
[](void* curr_ptr, void* res) {
auto* sum_ptr = static_cast<uint64_t*>(res);
*sum_ptr += static_cast<std::atomic<uint64_t>*>(curr_ptr)->exchange(
0, std::memory_order_relaxed);
},
&sum);
sum += tickers_[tickerType].merged_sum.exchange(
0, std::memory_order_relaxed);
for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
sum +=
per_core_stats_.AccessAtCore(core_idx)->tickers_[tickerType].exchange(
0, std::memory_order_relaxed);
}
}
if (stats_ && tickerType < TICKER_ENUM_MAX) {
Expand All @@ -171,10 +116,8 @@ void StatisticsImpl::recordTick(uint32_t tickerType, uint64_t count) {
enable_internal_stats_ ?
tickerType < INTERNAL_TICKER_ENUM_MAX :
tickerType < TICKER_ENUM_MAX);
if (tickerType < TICKER_ENUM_MAX || enable_internal_stats_) {
auto info_ptr = getThreadTickerInfo(tickerType);
info_ptr->value.fetch_add(count, std::memory_order_relaxed);
}
per_core_stats_.Access()->tickers_[tickerType].fetch_add(
count, std::memory_order_relaxed);
if (stats_ && tickerType < TICKER_ENUM_MAX) {
stats_->recordTick(tickerType, count);
}
Expand All @@ -185,9 +128,7 @@ void StatisticsImpl::measureTime(uint32_t histogramType, uint64_t value) {
enable_internal_stats_ ?
histogramType < INTERNAL_HISTOGRAM_ENUM_MAX :
histogramType < HISTOGRAM_ENUM_MAX);
if (histogramType < HISTOGRAM_ENUM_MAX || enable_internal_stats_) {
getThreadHistogramInfo(histogramType)->value.Add(value);
}
per_core_stats_.Access()->histograms_[histogramType].Add(value);
if (stats_ && histogramType < HISTOGRAM_ENUM_MAX) {
stats_->measureTime(histogramType, value);
}
Expand All @@ -199,11 +140,9 @@ Status StatisticsImpl::Reset() {
setTickerCountLocked(i, 0);
}
for (uint32_t i = 0; i < HISTOGRAM_ENUM_MAX; ++i) {
histograms_[i].thread_value->Fold(
[](void* curr_ptr, void* res) {
static_cast<HistogramImpl*>(curr_ptr)->Clear();
},
nullptr /* res */);
for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
per_core_stats_.AccessAtCore(core_idx)->histograms_[i].Clear();
}
}
return Status::OK();
}
Expand Down Expand Up @@ -231,7 +170,7 @@ std::string StatisticsImpl::ToString() const {
if (h.first < HISTOGRAM_ENUM_MAX || enable_internal_stats_) {
char buffer[kTmpStrBufferSize];
HistogramData hData;
histogramDataLocked(h.first, &hData);
getHistogramImplLocked(h.first)->Data(&hData);
snprintf(
buffer, kTmpStrBufferSize,
"%s statistics Percentiles :=> 50 : %f 95 : %f 99 : %f 100 : %f\n",
Expand Down
104 changes: 22 additions & 82 deletions monitoring/statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
#include "monitoring/histogram.h"
#include "port/likely.h"
#include "port/port.h"
#include "util/core_local.h"
#include "util/mutexlock.h"
#include "util/thread_local.h"

namespace rocksdb {

Expand Down Expand Up @@ -52,97 +52,37 @@ class StatisticsImpl : public Statistics {
virtual bool HistEnabledForType(uint32_t type) const override;

private:
std::shared_ptr<Statistics> stats_shared_;
Statistics* stats_;
// If non-nullptr, forwards updates to the object pointed to by `stats_`.
std::shared_ptr<Statistics> stats_;
// TODO(ajkr): clean this up since there are no internal stats anymore
bool enable_internal_stats_;
// Synchronizes anything that operates on other threads' thread-specific data
// Synchronizes anything that operates across other cores' local data,
// such that operations like Reset() can be performed atomically.
mutable port::Mutex aggregate_lock_;

// Holds data maintained by each thread for implementing tickers.
struct ThreadTickerInfo {
std::atomic_uint_fast64_t value;
// During teardown, value will be summed into *merged_sum.
std::atomic_uint_fast64_t* merged_sum;

ThreadTickerInfo(uint_fast64_t _value,
std::atomic_uint_fast64_t* _merged_sum)
: value(_value), merged_sum(_merged_sum) {}
};

// Holds data maintained by each thread for implementing histograms.
struct ThreadHistogramInfo {
HistogramImpl value;
// During teardown, value will be merged into *merged_hist while holding
// *merge_lock, which also syncs with the merges necessary for reads.
HistogramImpl* merged_hist;
port::Mutex* merge_lock;

ThreadHistogramInfo(HistogramImpl* _merged_hist, port::Mutex* _merge_lock)
: value(), merged_hist(_merged_hist), merge_lock(_merge_lock) {}
};

// Holds global data for implementing tickers.
struct TickerInfo {
TickerInfo()
: thread_value(new ThreadLocalPtr(&mergeThreadValue)), merged_sum(0) {}
// Holds thread-specific pointer to ThreadTickerInfo
std::unique_ptr<ThreadLocalPtr> thread_value;
// Sum of thread-specific values for tickers that have been reset due to
// thread termination or ThreadLocalPtr destruction. Also, this is used by
// setTickerCount() to conveniently change the global value by setting this
// while simultaneously zeroing all thread-local values.
std::atomic_uint_fast64_t merged_sum;

static void mergeThreadValue(void* ptr) {
auto info_ptr = static_cast<ThreadTickerInfo*>(ptr);
*info_ptr->merged_sum += info_ptr->value;
delete info_ptr;
}
};

// Holds global data for implementing histograms.
struct HistogramInfo {
HistogramInfo()
: merged_hist(),
merge_lock(),
thread_value(new ThreadLocalPtr(&mergeThreadValue)) {}
// Merged thread-specific values for histograms that have been reset due to
// thread termination or ThreadLocalPtr destruction. Note these must be
// destroyed after thread_value since its destructor accesses them.
HistogramImpl merged_hist;
mutable port::Mutex merge_lock;
// Holds thread-specific pointer to ThreadHistogramInfo
std::unique_ptr<ThreadLocalPtr> thread_value;

static void mergeThreadValue(void* ptr) {
auto info_ptr = static_cast<ThreadHistogramInfo*>(ptr);
{
MutexLock lock(info_ptr->merge_lock);
info_ptr->merged_hist->Merge(info_ptr->value);
// The ticker/histogram data are stored in this structure, which we will store
// per-core. It is cache-aligned, so tickers/histograms belonging to different
// cores can never share the same cache line.
//
// Alignment attributes expand to nothing depending on the platform
__declspec(align(64)) struct StatisticsData {
StatisticsData() {
for (uint32_t i = 0; i < INTERNAL_TICKER_ENUM_MAX; ++i) {
std::atomic_init(&tickers_[i], static_cast<uint64_t>(0));
}
delete info_ptr;
}
std::atomic_uint_fast64_t tickers_[INTERNAL_TICKER_ENUM_MAX];
HistogramImpl histograms_[INTERNAL_HISTOGRAM_ENUM_MAX];
} __attribute__((__aligned__(64)));

static_assert(sizeof(StatisticsData) % 64 == 0, "Expected 64-byte aligned");

// Returns a histogram that merges all histograms (thread-specific and
// previously merged ones).
std::unique_ptr<HistogramImpl> getMergedHistogram() const;
};
CoreLocalArray<StatisticsData> per_core_stats_;

uint64_t getTickerCountLocked(uint32_t ticker_type) const;
void histogramDataLocked(uint32_t histogram_type,
HistogramData* const data) const;
std::unique_ptr<HistogramImpl> getHistogramImplLocked(
uint32_t histogram_type) const;
void setTickerCountLocked(uint32_t ticker_type, uint64_t count);

// Returns the info for this tickerType/thread. It sets a new info with zeroed
// counter if none exists.
ThreadTickerInfo* getThreadTickerInfo(uint32_t ticker_type);
// Returns the info for this histogramType/thread. It sets a new histogram
// with zeroed data if none exists.
ThreadHistogramInfo* getThreadHistogramInfo(uint32_t histogram_type);

TickerInfo tickers_[INTERNAL_TICKER_ENUM_MAX];
HistogramInfo histograms_[INTERNAL_HISTOGRAM_ENUM_MAX];
};

// Utility functions
Expand Down
Loading

0 comments on commit 23214d0

Please sign in to comment.