From 23214d08ddbe73da4bfbd04e9e559d51e372250a Mon Sep 17 00:00:00 2001
From: Andrew Kryczka
Date: Fri, 5 May 2017 17:58:13 -0700
Subject: [PATCH] Core-local statistics

---
 monitoring/statistics.cc | 129 +++++++++++----------------------
 monitoring/statistics.h  | 104 +++++++------------------------
 util/core_local.h        |  24 ++++----
 3 files changed, 68 insertions(+), 189 deletions(-)

diff --git a/monitoring/statistics.cc b/monitoring/statistics.cc
index 2ba2473912e..caab71c610f 100644
--- a/monitoring/statistics.cc
+++ b/monitoring/statistics.cc
@@ -23,13 +23,9 @@ std::shared_ptr<Statistics> CreateDBStatistics() {
   return std::make_shared<StatisticsImpl>(nullptr, false);
 }
 
-StatisticsImpl::StatisticsImpl(
-    std::shared_ptr<Statistics> stats,
-    bool enable_internal_stats)
-    : stats_shared_(stats),
-      stats_(stats.get()),
-      enable_internal_stats_(enable_internal_stats) {
-}
+StatisticsImpl::StatisticsImpl(std::shared_ptr<Statistics> stats,
+                               bool enable_internal_stats)
+    : stats_(std::move(stats)), enable_internal_stats_(enable_internal_stats) {}
 
 StatisticsImpl::~StatisticsImpl() {}
 
@@ -43,79 +39,36 @@ uint64_t StatisticsImpl::getTickerCountLocked(uint32_t tickerType) const {
     enable_internal_stats_ ?
     tickerType < INTERNAL_TICKER_ENUM_MAX :
     tickerType < TICKER_ENUM_MAX);
-  uint64_t thread_local_sum = 0;
-  tickers_[tickerType].thread_value->Fold(
-      [](void* curr_ptr, void* res) {
-        auto* sum_ptr = static_cast<uint64_t*>(res);
-        *sum_ptr += static_cast<std::atomic_uint_fast64_t*>(curr_ptr)->load(
-            std::memory_order_relaxed);
-      },
-      &thread_local_sum);
-  return thread_local_sum +
-         tickers_[tickerType].merged_sum.load(std::memory_order_relaxed);
-}
-
-std::unique_ptr<HistogramImpl>
-StatisticsImpl::HistogramInfo::getMergedHistogram() const {
-  std::unique_ptr<HistogramImpl> res_hist(new HistogramImpl());
-  {
-    MutexLock lock(&merge_lock);
-    res_hist->Merge(merged_hist);
+  uint64_t res = 0;
+  for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
+    res += per_core_stats_.AccessAtCore(core_idx)->tickers_[tickerType];
   }
-  thread_value->Fold(
-      [](void* curr_ptr, void* res) {
-        auto tmp_res_hist = static_cast<HistogramImpl*>(res);
-        auto curr_hist = static_cast<HistogramImpl*>(curr_ptr);
-        tmp_res_hist->Merge(*curr_hist);
-      },
-      res_hist.get());
-  return res_hist;
+  return res;
 }
 
 void StatisticsImpl::histogramData(uint32_t histogramType,
                                    HistogramData* const data) const {
   MutexLock lock(&aggregate_lock_);
-  histogramDataLocked(histogramType, data);
+  getHistogramImplLocked(histogramType)->Data(data);
 }
 
-void StatisticsImpl::histogramDataLocked(uint32_t histogramType,
-                                         HistogramData* const data) const {
+std::unique_ptr<HistogramImpl> StatisticsImpl::getHistogramImplLocked(
+    uint32_t histogramType) const {
   assert(
     enable_internal_stats_ ?
     histogramType < INTERNAL_HISTOGRAM_ENUM_MAX :
     histogramType < HISTOGRAM_ENUM_MAX);
-  histograms_[histogramType].getMergedHistogram()->Data(data);
+  std::unique_ptr<HistogramImpl> res_hist(new HistogramImpl());
+  for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
+    res_hist->Merge(
+        per_core_stats_.AccessAtCore(core_idx)->histograms_[histogramType]);
+  }
+  return std::move(res_hist);
 }
 
 std::string StatisticsImpl::getHistogramString(uint32_t histogramType) const {
   MutexLock lock(&aggregate_lock_);
-  assert(enable_internal_stats_ ? histogramType < INTERNAL_HISTOGRAM_ENUM_MAX
-                                : histogramType < HISTOGRAM_ENUM_MAX);
-  return histograms_[histogramType].getMergedHistogram()->ToString();
-}
-
-StatisticsImpl::ThreadTickerInfo* StatisticsImpl::getThreadTickerInfo(
-    uint32_t tickerType) {
-  auto info_ptr =
-      static_cast<ThreadTickerInfo*>(tickers_[tickerType].thread_value->Get());
-  if (info_ptr == nullptr) {
-    info_ptr =
-        new ThreadTickerInfo(0 /* value */, &tickers_[tickerType].merged_sum);
-    tickers_[tickerType].thread_value->Reset(info_ptr);
-  }
-  return info_ptr;
-}
-
-StatisticsImpl::ThreadHistogramInfo* StatisticsImpl::getThreadHistogramInfo(
-    uint32_t histogram_type) {
-  auto info_ptr = static_cast<ThreadHistogramInfo*>(
-      histograms_[histogram_type].thread_value->Get());
-  if (info_ptr == nullptr) {
-    info_ptr = new ThreadHistogramInfo(&histograms_[histogram_type].merged_hist,
-                                       &histograms_[histogram_type].merge_lock);
-    histograms_[histogram_type].thread_value->Reset(info_ptr);
-  }
-  return info_ptr;
+  return getHistogramImplLocked(histogramType)->ToString();
 }
 
 void StatisticsImpl::setTickerCount(uint32_t tickerType, uint64_t count) {
@@ -131,14 +84,12 @@ void StatisticsImpl::setTickerCount(uint32_t tickerType, uint64_t count) {
 void StatisticsImpl::setTickerCountLocked(uint32_t tickerType, uint64_t count) {
   assert(enable_internal_stats_ ? tickerType < INTERNAL_TICKER_ENUM_MAX
                                 : tickerType < TICKER_ENUM_MAX);
-  if (tickerType < TICKER_ENUM_MAX || enable_internal_stats_) {
-    tickers_[tickerType].thread_value->Fold(
-        [](void* curr_ptr, void* res) {
-          static_cast<std::atomic<uint64_t>*>(curr_ptr)->store(
-              0, std::memory_order_relaxed);
-        },
-        nullptr /* res */);
-    tickers_[tickerType].merged_sum.store(count, std::memory_order_relaxed);
+  for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
+    if (core_idx == 0) {
+      per_core_stats_.AccessAtCore(core_idx)->tickers_[tickerType] = count;
+    } else {
+      per_core_stats_.AccessAtCore(core_idx)->tickers_[tickerType] = 0;
+    }
   }
 }
 
@@ -148,16 +99,10 @@ uint64_t StatisticsImpl::getAndResetTickerCount(uint32_t tickerType) {
     MutexLock lock(&aggregate_lock_);
     assert(enable_internal_stats_ ? tickerType < INTERNAL_TICKER_ENUM_MAX
                                   : tickerType < TICKER_ENUM_MAX);
-    if (tickerType < TICKER_ENUM_MAX || enable_internal_stats_) {
-      tickers_[tickerType].thread_value->Fold(
-          [](void* curr_ptr, void* res) {
-            auto* sum_ptr = static_cast<uint64_t*>(res);
-            *sum_ptr += static_cast<std::atomic<uint64_t>*>(curr_ptr)->exchange(
-                0, std::memory_order_relaxed);
-          },
-          &sum);
-      sum += tickers_[tickerType].merged_sum.exchange(
-          0, std::memory_order_relaxed);
+    for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
+      sum +=
+          per_core_stats_.AccessAtCore(core_idx)->tickers_[tickerType].exchange(
+              0, std::memory_order_relaxed);
     }
   }
   if (stats_ && tickerType < TICKER_ENUM_MAX) {
@@ -171,10 +116,8 @@ void StatisticsImpl::recordTick(uint32_t tickerType, uint64_t count) {
     enable_internal_stats_ ?
     tickerType < INTERNAL_TICKER_ENUM_MAX :
     tickerType < TICKER_ENUM_MAX);
-  if (tickerType < TICKER_ENUM_MAX || enable_internal_stats_) {
-    auto info_ptr = getThreadTickerInfo(tickerType);
-    info_ptr->value.fetch_add(count, std::memory_order_relaxed);
-  }
+  per_core_stats_.Access()->tickers_[tickerType].fetch_add(
+      count, std::memory_order_relaxed);
   if (stats_ && tickerType < TICKER_ENUM_MAX) {
     stats_->recordTick(tickerType, count);
   }
@@ -185,9 +128,7 @@ void StatisticsImpl::measureTime(uint32_t histogramType, uint64_t value) {
   assert(
     enable_internal_stats_ ?
     histogramType < INTERNAL_HISTOGRAM_ENUM_MAX :
    histogramType < HISTOGRAM_ENUM_MAX);
-  if (histogramType < HISTOGRAM_ENUM_MAX || enable_internal_stats_) {
-    getThreadHistogramInfo(histogramType)->value.Add(value);
-  }
+  per_core_stats_.Access()->histograms_[histogramType].Add(value);
   if (stats_ && histogramType < HISTOGRAM_ENUM_MAX) {
     stats_->measureTime(histogramType, value);
   }
@@ -199,11 +140,9 @@ Status StatisticsImpl::Reset() {
     setTickerCountLocked(i, 0);
   }
   for (uint32_t i = 0; i < HISTOGRAM_ENUM_MAX; ++i) {
-    histograms_[i].thread_value->Fold(
-        [](void* curr_ptr, void* res) {
-          static_cast<HistogramImpl*>(curr_ptr)->Clear();
-        },
-        nullptr /* res */);
+    for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
+      per_core_stats_.AccessAtCore(core_idx)->histograms_[i].Clear();
+    }
   }
   return Status::OK();
 }
@@ -231,7 +170,7 @@ std::string StatisticsImpl::ToString() const {
     if (h.first < HISTOGRAM_ENUM_MAX || enable_internal_stats_) {
       char buffer[kTmpStrBufferSize];
       HistogramData hData;
-      histogramDataLocked(h.first, &hData);
+      getHistogramImplLocked(h.first)->Data(&hData);
       snprintf(
           buffer, kTmpStrBufferSize,
           "%s statistics Percentiles :=> 50 : %f 95 : %f 99 : %f 100 : %f\n",
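The statistics.cc change is easiest to read as one pattern applied everywhere: recordTick() now does a single relaxed fetch_add on the ticker slot belonging to the caller's core, and every reader (getTickerCountLocked(), getAndResetTickerCount(), Reset()) folds or clears all slots under aggregate_lock_. Below is a minimal standalone sketch of that writer/reader split, not RocksDB code; kNumSlots, PerCoreCounter, Add(), and Total() are illustrative names only:

#include <atomic>
#include <cstddef>
#include <cstdint>

constexpr size_t kNumSlots = 8;  // stand-in for CoreLocalArray::Size()

struct PerCoreCounter {
  std::atomic<uint64_t> slots[kNumSlots]{};

  // Writer path, like recordTick(): bump only the caller's slot (RocksDB
  // derives the slot index from the physical core id).
  void Add(size_t slot_idx, uint64_t count) {
    slots[slot_idx % kNumSlots].fetch_add(count, std::memory_order_relaxed);
  }

  // Reader path, like getTickerCountLocked(): fold every slot. Relaxed loads
  // are enough because a ticker read is not required to observe one
  // consistent snapshot across all slots.
  uint64_t Total() const {
    uint64_t res = 0;
    for (size_t i = 0; i < kNumSlots; ++i) {
      res += slots[i].load(std::memory_order_relaxed);
    }
    return res;
  }
};

Writes stay cheap and, in the common case, contention-free, while reads become linear in the number of cores; that is the right trade for counters that are bumped constantly and read rarely.
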
diff --git a/monitoring/statistics.h b/monitoring/statistics.h
index 6dc98a70d7c..acb5028c52f 100644
--- a/monitoring/statistics.h
+++ b/monitoring/statistics.h
@@ -15,8 +15,8 @@
 #include "monitoring/histogram.h"
 #include "port/likely.h"
 #include "port/port.h"
+#include "util/core_local.h"
 #include "util/mutexlock.h"
-#include "util/thread_local.h"
 
 namespace rocksdb {
 
@@ -52,97 +52,37 @@ class StatisticsImpl : public Statistics {
   virtual bool HistEnabledForType(uint32_t type) const override;
 
  private:
-  std::shared_ptr<Statistics> stats_shared_;
-  Statistics* stats_;
+  // If non-nullptr, forwards updates to the object pointed to by `stats_`.
+  std::shared_ptr<Statistics> stats_;
+  // TODO(ajkr): clean this up since there are no internal stats anymore
   bool enable_internal_stats_;
-  // Synchronizes anything that operates on other threads' thread-specific data
+  // Synchronizes anything that operates across other cores' local data,
   // such that operations like Reset() can be performed atomically.
   mutable port::Mutex aggregate_lock_;
 
-  // Holds data maintained by each thread for implementing tickers.
-  struct ThreadTickerInfo {
-    std::atomic_uint_fast64_t value;
-    // During teardown, value will be summed into *merged_sum.
-    std::atomic_uint_fast64_t* merged_sum;
-
-    ThreadTickerInfo(uint_fast64_t _value,
-                     std::atomic_uint_fast64_t* _merged_sum)
-        : value(_value), merged_sum(_merged_sum) {}
-  };
-
-  // Holds data maintained by each thread for implementing histograms.
-  struct ThreadHistogramInfo {
-    HistogramImpl value;
-    // During teardown, value will be merged into *merged_hist while holding
-    // *merge_lock, which also syncs with the merges necessary for reads.
-    HistogramImpl* merged_hist;
-    port::Mutex* merge_lock;
-
-    ThreadHistogramInfo(HistogramImpl* _merged_hist, port::Mutex* _merge_lock)
-        : value(), merged_hist(_merged_hist), merge_lock(_merge_lock) {}
-  };
-
-  // Holds global data for implementing tickers.
-  struct TickerInfo {
-    TickerInfo()
-        : thread_value(new ThreadLocalPtr(&mergeThreadValue)), merged_sum(0) {}
-    // Holds thread-specific pointer to ThreadTickerInfo
-    std::unique_ptr<ThreadLocalPtr> thread_value;
-    // Sum of thread-specific values for tickers that have been reset due to
-    // thread termination or ThreadLocalPtr destruction. Also, this is used by
-    // setTickerCount() to conveniently change the global value by setting this
-    // while simultaneously zeroing all thread-local values.
-    std::atomic_uint_fast64_t merged_sum;
-
-    static void mergeThreadValue(void* ptr) {
-      auto info_ptr = static_cast<ThreadTickerInfo*>(ptr);
-      *info_ptr->merged_sum += info_ptr->value;
-      delete info_ptr;
-    }
-  };
-
-  // Holds global data for implementing histograms.
-  struct HistogramInfo {
-    HistogramInfo()
-        : merged_hist(),
-          merge_lock(),
-          thread_value(new ThreadLocalPtr(&mergeThreadValue)) {}
-    // Merged thread-specific values for histograms that have been reset due to
-    // thread termination or ThreadLocalPtr destruction. Note these must be
-    // destroyed after thread_value since its destructor accesses them.
-    HistogramImpl merged_hist;
-    mutable port::Mutex merge_lock;
-    // Holds thread-specific pointer to ThreadHistogramInfo
-    std::unique_ptr<ThreadLocalPtr> thread_value;
-
-    static void mergeThreadValue(void* ptr) {
-      auto info_ptr = static_cast<ThreadHistogramInfo*>(ptr);
-      {
-        MutexLock lock(info_ptr->merge_lock);
-        info_ptr->merged_hist->Merge(info_ptr->value);
+  // The ticker/histogram data are stored in this structure, which we will store
+  // per-core. It is cache-aligned, so tickers/histograms belonging to different
+  // cores can never share the same cache line.
+  //
+  // Alignment attributes expand to nothing depending on the platform
+  __declspec(align(64)) struct StatisticsData {
+    StatisticsData() {
+      for (uint32_t i = 0; i < INTERNAL_TICKER_ENUM_MAX; ++i) {
+        std::atomic_init(&tickers_[i], static_cast<uint_fast64_t>(0));
       }
-      delete info_ptr;
     }
+    std::atomic_uint_fast64_t tickers_[INTERNAL_TICKER_ENUM_MAX];
+    HistogramImpl histograms_[INTERNAL_HISTOGRAM_ENUM_MAX];
+  } __attribute__((__aligned__(64)));
+
+  static_assert(sizeof(StatisticsData) % 64 == 0, "Expected 64-byte aligned");
 
-  // Returns a histogram that merges all histograms (thread-specific and
-  // previously merged ones).
-  std::unique_ptr<HistogramImpl> getMergedHistogram() const;
-  };
+  CoreLocalArray<StatisticsData> per_core_stats_;
 
   uint64_t getTickerCountLocked(uint32_t ticker_type) const;
-  void histogramDataLocked(uint32_t histogram_type,
-                           HistogramData* const data) const;
+  std::unique_ptr<HistogramImpl> getHistogramImplLocked(
+      uint32_t histogram_type) const;
   void setTickerCountLocked(uint32_t ticker_type, uint64_t count);
-
-  // Returns the info for this tickerType/thread. It sets a new info with zeroed
-  // counter if none exists.
-  ThreadTickerInfo* getThreadTickerInfo(uint32_t ticker_type);
-  // Returns the info for this histogramType/thread. It sets a new histogram
-  // with zeroed data if none exists.
-  ThreadHistogramInfo* getThreadHistogramInfo(uint32_t histogram_type);
-
-  TickerInfo tickers_[INTERNAL_TICKER_ENUM_MAX];
-  HistogramInfo histograms_[INTERNAL_HISTOGRAM_ENUM_MAX];
 };
 
 // Utility functions
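StatisticsData combines two guarantees: the struct starts on a cache-line boundary, and sizeof(StatisticsData) is a multiple of 64, so no two cores' elements of the per-core array can share a line. A portable C++11 rendering of the same trick, using alignas in place of the compiler-specific attributes in the patch (Padded is an illustrative name, not a RocksDB type):

#include <atomic>
#include <cstdint>

// alignas(64) also pads sizeof(Padded) up to a multiple of 64, so adjacent
// elements of a Padded array begin on distinct cache lines and a writer on
// one core cannot invalidate the line another core is writing to (no false
// sharing).
struct alignas(64) Padded {
  std::atomic<uint64_t> value{0};
};

static_assert(sizeof(Padded) % 64 == 0, "Expected 64-byte aligned");

One caveat worth knowing: before C++17, a plain new[] is not guaranteed to honor extended alignment, so the size check keeps adjacent elements on separate cache lines only when the array's base address is itself 64-byte aligned.
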
diff --git a/util/core_local.h b/util/core_local.h
index 4608011fe71..e45283a44ad 100644
--- a/util/core_local.h
+++ b/util/core_local.h
@@ -7,19 +7,20 @@
 
 #pragma once
 
-#include "port/likely.h"
-#include "port/port.h"
-#include "util/random.h"
-
 #include <cstddef>
 #include <thread>
+#include <utility>
 #include <vector>
 
+#include "port/likely.h"
+#include "port/port.h"
+#include "util/random.h"
+
 namespace rocksdb {
 
 // An array of core-local values. Ideally the value type, T, is cache aligned to
 // prevent false sharing.
-template<typename T>
+template <typename T>
 class CoreLocalArray {
  public:
   CoreLocalArray();
@@ -41,7 +42,7 @@ class CoreLocalArray {
   size_t size_;
 };
 
-template<typename T>
+template <typename T>
 CoreLocalArray<T>::CoreLocalArray() {
   unsigned int num_cpus = std::thread::hardware_concurrency();
   // find a power of two >= num_cpus and >= 8
@@ -52,31 +53,30 @@ CoreLocalArray<T>::CoreLocalArray() {
   data_.reset(new T[size_]);
 }
 
-template<typename T>
+template <typename T>
 size_t CoreLocalArray<T>::Size() const {
   return size_;
 }
 
-template<typename T>
+template <typename T>
 T* CoreLocalArray<T>::Access() const {
   return AccessElementAndIndex().first;
 }
 
-template<typename T>
+template <typename T>
 std::pair<T*, size_t> CoreLocalArray<T>::AccessElementAndIndex() const {
   int cpuid = port::PhysicalCoreID();
   size_t core_idx;
   if (UNLIKELY(cpuid < 0)) {
     // cpu id unavailable, just pick randomly
-    core_idx =
-        Random::GetTLSInstance()->Uniform(static_cast<int>(size_));
+    core_idx = Random::GetTLSInstance()->Uniform(static_cast<int>(size_));
   } else {
     core_idx = static_cast<size_t>(cpuid) % size_;
  }
   return {AccessAtCore(core_idx), core_idx};
 }
 
-template<typename T>
+template <typename T>
 T* CoreLocalArray<T>::AccessAtCore(size_t core_idx) const {
   assert(core_idx < size_);
   return &data_[core_idx];
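To tie the pieces together, here is a hypothetical caller of the interface above, assuming the full CoreLocalArray<T> definition from util/core_local.h is available; everything outside that header (Counter, the printf harness) is illustrative:

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <cstdio>

#include "util/core_local.h"

// Per the comment in core_local.h, the value type should be cache-aligned.
struct alignas(64) Counter {
  std::atomic<uint64_t> value{0};
};

int main() {
  rocksdb::CoreLocalArray<Counter> counters;

  // Hot path: a relaxed increment on this core's slot. Access() falls back
  // to a randomly chosen slot when port::PhysicalCoreID() is unavailable.
  counters.Access()->value.fetch_add(1, std::memory_order_relaxed);

  // Cold path: fold every slot, the way StatisticsImpl's readers do under
  // aggregate_lock_.
  uint64_t total = 0;
  for (size_t core_idx = 0; core_idx < counters.Size(); ++core_idx) {
    total +=
        counters.AccessAtCore(core_idx)->value.load(std::memory_order_relaxed);
  }
  std::printf("total = %llu\n", static_cast<unsigned long long>(total));
  return 0;
}

Note that correctness never depends on the cpuid-to-slot mapping: two threads that land on the same slot simply contend on one atomic, so the random fallback and the modulo wrap can degrade performance but not accuracy.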