Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core-local statistics #2258

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# Rocksdb Change Log
## Unreleased
### New Features
* Change ticker/histogram statistics implementations to use core-local storage. This improves aggregation speed compared to our previous thread-local approach, particularly for applications with many threads.

## 5.5.0 (05/17/2017)
### New Features
Expand Down
129 changes: 34 additions & 95 deletions monitoring/statistics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,9 @@ std::shared_ptr<Statistics> CreateDBStatistics() {
return std::make_shared<StatisticsImpl>(nullptr, false);
}

StatisticsImpl::StatisticsImpl(
std::shared_ptr<Statistics> stats,
bool enable_internal_stats)
: stats_shared_(stats),
stats_(stats.get()),
enable_internal_stats_(enable_internal_stats) {
}
// Constructs the statistics object.
//
// `stats` may be null (CreateDBStatistics() passes nullptr). When it is
// non-null, recordTick()/measureTime() additionally forward updates whose type
// is below TICKER_ENUM_MAX / HISTOGRAM_ENUM_MAX to it. The shared_ptr is taken
// by value and moved into `stats_`, so this object shares ownership of the
// wrapped instance.
StatisticsImpl::StatisticsImpl(std::shared_ptr<Statistics> stats,
bool enable_internal_stats)
: stats_(std::move(stats)), enable_internal_stats_(enable_internal_stats) {}

// Defined out of line with an empty body; no explicit cleanup is performed
// here.
StatisticsImpl::~StatisticsImpl() {}

Expand All @@ -43,79 +39,36 @@ uint64_t StatisticsImpl::getTickerCountLocked(uint32_t tickerType) const {
enable_internal_stats_ ?
tickerType < INTERNAL_TICKER_ENUM_MAX :
tickerType < TICKER_ENUM_MAX);
uint64_t thread_local_sum = 0;
tickers_[tickerType].thread_value->Fold(
[](void* curr_ptr, void* res) {
auto* sum_ptr = static_cast<uint64_t*>(res);
*sum_ptr += static_cast<std::atomic_uint_fast64_t*>(curr_ptr)->load(
std::memory_order_relaxed);
},
&thread_local_sum);
return thread_local_sum +
tickers_[tickerType].merged_sum.load(std::memory_order_relaxed);
}

std::unique_ptr<HistogramImpl>
StatisticsImpl::HistogramInfo::getMergedHistogram() const {
std::unique_ptr<HistogramImpl> res_hist(new HistogramImpl());
{
MutexLock lock(&merge_lock);
res_hist->Merge(merged_hist);
uint64_t res = 0;
for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
res += per_core_stats_.AccessAtCore(core_idx)->tickers_[tickerType];
}
thread_value->Fold(
[](void* curr_ptr, void* res) {
auto tmp_res_hist = static_cast<HistogramImpl*>(res);
auto curr_hist = static_cast<HistogramImpl*>(curr_ptr);
tmp_res_hist->Merge(*curr_hist);
},
res_hist.get());
return res_hist;
return res;
}

void StatisticsImpl::histogramData(uint32_t histogramType,
HistogramData* const data) const {
MutexLock lock(&aggregate_lock_);
histogramDataLocked(histogramType, data);
getHistogramImplLocked(histogramType)->Data(data);
}

void StatisticsImpl::histogramDataLocked(uint32_t histogramType,
HistogramData* const data) const {
std::unique_ptr<HistogramImpl> StatisticsImpl::getHistogramImplLocked(
uint32_t histogramType) const {
assert(
enable_internal_stats_ ?
histogramType < INTERNAL_HISTOGRAM_ENUM_MAX :
histogramType < HISTOGRAM_ENUM_MAX);
histograms_[histogramType].getMergedHistogram()->Data(data);
std::unique_ptr<HistogramImpl> res_hist(new HistogramImpl());
for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
res_hist->Merge(
per_core_stats_.AccessAtCore(core_idx)->histograms_[histogramType]);
}
return res_hist;
}

std::string StatisticsImpl::getHistogramString(uint32_t histogramType) const {
MutexLock lock(&aggregate_lock_);
assert(enable_internal_stats_ ? histogramType < INTERNAL_HISTOGRAM_ENUM_MAX
: histogramType < HISTOGRAM_ENUM_MAX);
return histograms_[histogramType].getMergedHistogram()->ToString();
}

StatisticsImpl::ThreadTickerInfo* StatisticsImpl::getThreadTickerInfo(
uint32_t tickerType) {
auto info_ptr =
static_cast<ThreadTickerInfo*>(tickers_[tickerType].thread_value->Get());
if (info_ptr == nullptr) {
info_ptr =
new ThreadTickerInfo(0 /* value */, &tickers_[tickerType].merged_sum);
tickers_[tickerType].thread_value->Reset(info_ptr);
}
return info_ptr;
}

StatisticsImpl::ThreadHistogramInfo* StatisticsImpl::getThreadHistogramInfo(
uint32_t histogram_type) {
auto info_ptr = static_cast<ThreadHistogramInfo*>(
histograms_[histogram_type].thread_value->Get());
if (info_ptr == nullptr) {
info_ptr = new ThreadHistogramInfo(&histograms_[histogram_type].merged_hist,
&histograms_[histogram_type].merge_lock);
histograms_[histogram_type].thread_value->Reset(info_ptr);
}
return info_ptr;
return getHistogramImplLocked(histogramType)->ToString();
}

void StatisticsImpl::setTickerCount(uint32_t tickerType, uint64_t count) {
Expand All @@ -131,14 +84,12 @@ void StatisticsImpl::setTickerCount(uint32_t tickerType, uint64_t count) {
void StatisticsImpl::setTickerCountLocked(uint32_t tickerType, uint64_t count) {
assert(enable_internal_stats_ ? tickerType < INTERNAL_TICKER_ENUM_MAX
: tickerType < TICKER_ENUM_MAX);
if (tickerType < TICKER_ENUM_MAX || enable_internal_stats_) {
tickers_[tickerType].thread_value->Fold(
[](void* curr_ptr, void* res) {
static_cast<std::atomic<uint64_t>*>(curr_ptr)->store(
0, std::memory_order_relaxed);
},
nullptr /* res */);
tickers_[tickerType].merged_sum.store(count, std::memory_order_relaxed);
for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
if (core_idx == 0) {
per_core_stats_.AccessAtCore(core_idx)->tickers_[tickerType] = count;
} else {
per_core_stats_.AccessAtCore(core_idx)->tickers_[tickerType] = 0;
}
}
}

Expand All @@ -148,16 +99,10 @@ uint64_t StatisticsImpl::getAndResetTickerCount(uint32_t tickerType) {
MutexLock lock(&aggregate_lock_);
assert(enable_internal_stats_ ? tickerType < INTERNAL_TICKER_ENUM_MAX
: tickerType < TICKER_ENUM_MAX);
if (tickerType < TICKER_ENUM_MAX || enable_internal_stats_) {
tickers_[tickerType].thread_value->Fold(
[](void* curr_ptr, void* res) {
auto* sum_ptr = static_cast<uint64_t*>(res);
*sum_ptr += static_cast<std::atomic<uint64_t>*>(curr_ptr)->exchange(
0, std::memory_order_relaxed);
},
&sum);
sum += tickers_[tickerType].merged_sum.exchange(
0, std::memory_order_relaxed);
for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
sum +=
per_core_stats_.AccessAtCore(core_idx)->tickers_[tickerType].exchange(
0, std::memory_order_relaxed);
}
}
if (stats_ && tickerType < TICKER_ENUM_MAX) {
Expand All @@ -171,10 +116,8 @@ void StatisticsImpl::recordTick(uint32_t tickerType, uint64_t count) {
enable_internal_stats_ ?
tickerType < INTERNAL_TICKER_ENUM_MAX :
tickerType < TICKER_ENUM_MAX);
if (tickerType < TICKER_ENUM_MAX || enable_internal_stats_) {
auto info_ptr = getThreadTickerInfo(tickerType);
info_ptr->value.fetch_add(count, std::memory_order_relaxed);
}
per_core_stats_.Access()->tickers_[tickerType].fetch_add(
count, std::memory_order_relaxed);
if (stats_ && tickerType < TICKER_ENUM_MAX) {
stats_->recordTick(tickerType, count);
}
Expand All @@ -185,9 +128,7 @@ void StatisticsImpl::measureTime(uint32_t histogramType, uint64_t value) {
enable_internal_stats_ ?
histogramType < INTERNAL_HISTOGRAM_ENUM_MAX :
histogramType < HISTOGRAM_ENUM_MAX);
if (histogramType < HISTOGRAM_ENUM_MAX || enable_internal_stats_) {
getThreadHistogramInfo(histogramType)->value.Add(value);
}
per_core_stats_.Access()->histograms_[histogramType].Add(value);
if (stats_ && histogramType < HISTOGRAM_ENUM_MAX) {
stats_->measureTime(histogramType, value);
}
Expand All @@ -199,11 +140,9 @@ Status StatisticsImpl::Reset() {
setTickerCountLocked(i, 0);
}
for (uint32_t i = 0; i < HISTOGRAM_ENUM_MAX; ++i) {
histograms_[i].thread_value->Fold(
[](void* curr_ptr, void* res) {
static_cast<HistogramImpl*>(curr_ptr)->Clear();
},
nullptr /* res */);
for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
per_core_stats_.AccessAtCore(core_idx)->histograms_[i].Clear();
}
}
return Status::OK();
}
Expand Down Expand Up @@ -231,7 +170,7 @@ std::string StatisticsImpl::ToString() const {
if (h.first < HISTOGRAM_ENUM_MAX || enable_internal_stats_) {
char buffer[kTmpStrBufferSize];
HistogramData hData;
histogramDataLocked(h.first, &hData);
getHistogramImplLocked(h.first)->Data(&hData);
snprintf(
buffer, kTmpStrBufferSize,
"%s statistics Percentiles :=> 50 : %f 95 : %f 99 : %f 100 : %f\n",
Expand Down
111 changes: 29 additions & 82 deletions monitoring/statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,14 @@
#include "monitoring/histogram.h"
#include "port/likely.h"
#include "port/port.h"
#include "util/core_local.h"
#include "util/mutexlock.h"
#include "util/thread_local.h"

// ROCKSDB_FIELD_UNUSED marks a data member as intentionally unused. Under
// clang it expands to __attribute__((__unused__)); with any other compiler it
// expands to nothing.
// NOTE(review): presumably this exists to silence clang's unused-private-field
// warning on padding members -- confirm.
#ifdef __clang__
#define ROCKSDB_FIELD_UNUSED __attribute__((__unused__))
#else
#define ROCKSDB_FIELD_UNUSED
#endif // __clang__

namespace rocksdb {

Expand Down Expand Up @@ -52,97 +58,38 @@ class StatisticsImpl : public Statistics {
virtual bool HistEnabledForType(uint32_t type) const override;

private:
std::shared_ptr<Statistics> stats_shared_;
Statistics* stats_;
// If non-nullptr, forwards updates to the object pointed to by `stats_`.
std::shared_ptr<Statistics> stats_;
// TODO(ajkr): clean this up since there are no internal stats anymore
bool enable_internal_stats_;
// Synchronizes anything that operates on other threads' thread-specific data
// Synchronizes anything that operates across other cores' local data,
// such that operations like Reset() can be performed atomically.
mutable port::Mutex aggregate_lock_;

// Holds data maintained by each thread for implementing tickers.
struct ThreadTickerInfo {
std::atomic_uint_fast64_t value;
// During teardown, value will be summed into *merged_sum.
std::atomic_uint_fast64_t* merged_sum;

ThreadTickerInfo(uint_fast64_t _value,
std::atomic_uint_fast64_t* _merged_sum)
: value(_value), merged_sum(_merged_sum) {}
// The ticker/histogram data are stored in this structure, which we will store
// per-core. Its size is padded up to a multiple of the cache line size so
// that, when instances are placed on cache-line-aligned storage,
// tickers/histograms belonging to different cores never share a cache line.
//
// Attributes such as ROCKSDB_FIELD_UNUSED expand to nothing on some platforms.
// Ticker and histogram storage for a single core.
//
// `tickers_` holds one atomic counter per ticker type (zero-initialized) and
// `histograms_` one HistogramImpl per histogram type.
//
// The trailing `padding` member rounds the payload size up to a multiple of
// CACHE_LINE_SIZE. Its length is always in [1, CACHE_LINE_SIZE]: when the
// payload is already an exact multiple, one full extra cache line is added
// rather than declaring a zero-length array, which ISO C++ does not permit
// (it only compiles as a compiler extension). The computation assumes the
// compiler inserts no padding between the members.
struct StatisticsData {
  std::atomic_uint_fast64_t tickers_[INTERNAL_TICKER_ENUM_MAX] = {{0}};
  HistogramImpl histograms_[INTERNAL_HISTOGRAM_ENUM_MAX];
  char
      padding[CACHE_LINE_SIZE -
              (INTERNAL_TICKER_ENUM_MAX * sizeof(std::atomic_uint_fast64_t) +
               INTERNAL_HISTOGRAM_ENUM_MAX * sizeof(HistogramImpl)) %
                  CACHE_LINE_SIZE] ROCKSDB_FIELD_UNUSED;
};

// Holds data maintained by each thread for implementing histograms.
struct ThreadHistogramInfo {
HistogramImpl value;
// During teardown, value will be merged into *merged_hist while holding
// *merge_lock, which also syncs with the merges necessary for reads.
HistogramImpl* merged_hist;
port::Mutex* merge_lock;
static_assert(sizeof(StatisticsData) % 64 == 0, "Expected 64-byte aligned");

ThreadHistogramInfo(HistogramImpl* _merged_hist, port::Mutex* _merge_lock)
: value(), merged_hist(_merged_hist), merge_lock(_merge_lock) {}
};

// Holds global data for implementing tickers.
struct TickerInfo {
TickerInfo()
: thread_value(new ThreadLocalPtr(&mergeThreadValue)), merged_sum(0) {}
// Holds thread-specific pointer to ThreadTickerInfo
std::unique_ptr<ThreadLocalPtr> thread_value;
// Sum of thread-specific values for tickers that have been reset due to
// thread termination or ThreadLocalPtr destruction. Also, this is used by
// setTickerCount() to conveniently change the global value by setting this
// while simultaneously zeroing all thread-local values.
std::atomic_uint_fast64_t merged_sum;

static void mergeThreadValue(void* ptr) {
auto info_ptr = static_cast<ThreadTickerInfo*>(ptr);
*info_ptr->merged_sum += info_ptr->value;
delete info_ptr;
}
};

// Holds global data for implementing histograms.
struct HistogramInfo {
HistogramInfo()
: merged_hist(),
merge_lock(),
thread_value(new ThreadLocalPtr(&mergeThreadValue)) {}
// Merged thread-specific values for histograms that have been reset due to
// thread termination or ThreadLocalPtr destruction. Note these must be
// destroyed after thread_value since its destructor accesses them.
HistogramImpl merged_hist;
mutable port::Mutex merge_lock;
// Holds thread-specific pointer to ThreadHistogramInfo
std::unique_ptr<ThreadLocalPtr> thread_value;

static void mergeThreadValue(void* ptr) {
auto info_ptr = static_cast<ThreadHistogramInfo*>(ptr);
{
MutexLock lock(info_ptr->merge_lock);
info_ptr->merged_hist->Merge(info_ptr->value);
}
delete info_ptr;
}

// Returns a histogram that merges all histograms (thread-specific and
// previously merged ones).
std::unique_ptr<HistogramImpl> getMergedHistogram() const;
};
CoreLocalArray<StatisticsData> per_core_stats_;

uint64_t getTickerCountLocked(uint32_t ticker_type) const;
void histogramDataLocked(uint32_t histogram_type,
HistogramData* const data) const;
std::unique_ptr<HistogramImpl> getHistogramImplLocked(
uint32_t histogram_type) const;
void setTickerCountLocked(uint32_t ticker_type, uint64_t count);

// Returns the info for this tickerType/thread. It sets a new info with zeroed
// counter if none exists.
ThreadTickerInfo* getThreadTickerInfo(uint32_t ticker_type);
// Returns the info for this histogramType/thread. It sets a new histogram
// with zeroed data if none exists.
ThreadHistogramInfo* getThreadHistogramInfo(uint32_t histogram_type);

TickerInfo tickers_[INTERNAL_TICKER_ENUM_MAX];
HistogramInfo histograms_[INTERNAL_HISTOGRAM_ENUM_MAX];
};

// Utility functions
Expand Down
Loading