From 53b7333b2b6cdbd7bc363a1c6fad968c6a4d6ae4 Mon Sep 17 00:00:00 2001 From: Rama Date: Thu, 19 Apr 2018 06:46:30 +0530 Subject: [PATCH 01/16] native log linear implementation Signed-off-by: Rama --- bazel/external/libcircllhist.BUILD | 9 + bazel/repositories.bzl | 11 + bazel/repository_locations.bzl | 4 + include/envoy/stats/stats.h | 81 +++++- include/envoy/thread_local/thread_local.h | 9 + source/common/common/logger.h | 4 +- source/common/stats/BUILD | 5 + source/common/stats/stats_impl.cc | 32 +++ source/common/stats/stats_impl.h | 54 +++- source/common/stats/thread_local_store.cc | 144 +++++++++- source/common/stats/thread_local_store.h | 126 ++++++++- .../common/thread_local/thread_local_impl.cc | 15 + .../common/thread_local/thread_local_impl.h | 4 + .../stat_sinks/common/statsd/statsd.h | 3 + .../grpc_metrics_service_impl.cc | 36 +++ .../grpc_metrics_service_impl.h | 26 +- source/server/http/admin.cc | 60 +++- source/server/http/admin.h | 3 +- source/server/server.cc | 40 ++- source/server/server.h | 10 +- test/common/stats/thread_local_store_test.cc | 266 +++++++++++++++++- .../thread_local/thread_local_impl_test.cc | 36 +++ .../grpc_metrics_service_impl_test.cc | 4 + .../metrics_service_integration_test.cc | 24 +- test/integration/server.h | 6 + test/mocks/stats/mocks.cc | 11 + test/mocks/stats/mocks.h | 26 ++ test/mocks/thread_local/mocks.h | 8 + test/server/server_test.cc | 2 +- 29 files changed, 986 insertions(+), 73 deletions(-) create mode 100644 bazel/external/libcircllhist.BUILD diff --git a/bazel/external/libcircllhist.BUILD b/bazel/external/libcircllhist.BUILD new file mode 100644 index 000000000000..4e109f0b38d4 --- /dev/null +++ b/bazel/external/libcircllhist.BUILD @@ -0,0 +1,9 @@ +cc_library( + name = "libcircllhist", + srcs = ["src/circllhist.c"], + hdrs = [ + "src/circllhist.h", + ], + includes = ["src"], + visibility = ["//visibility:public"], +) diff --git a/bazel/repositories.bzl b/bazel/repositories.bzl index 7b3ba8242984..8f2d60a31583 
100644 --- a/bazel/repositories.bzl +++ b/bazel/repositories.bzl @@ -226,6 +226,7 @@ def envoy_dependencies(path = "@envoy_deps//", skip_targets = []): _boringssl() _com_google_absl() _com_github_bombela_backward() + _com_github_circonus_labs_libcircllhist() _com_github_cyan4973_xxhash() _com_github_eile_tclap() _com_github_fmtlib_fmt() @@ -264,6 +265,16 @@ def _com_github_bombela_backward(): actual = "@com_github_bombela_backward//:backward", ) +def _com_github_circonus_labs_libcircllhist(): + _repository_impl( + name = "com_github_circonus_labs_libcircllhist", + build_file = "@envoy//bazel/external:libcircllhist.BUILD", + ) + native.bind( + name = "libcircllhist", + actual = "@com_github_circonus_labs_libcircllhist//:libcircllhist", + ) + def _com_github_cyan4973_xxhash(): _repository_impl( name = "com_github_cyan4973_xxhash", diff --git a/bazel/repository_locations.bzl b/bazel/repository_locations.bzl index cf9bd707dec8..bf19fc0fc56b 100644 --- a/bazel/repository_locations.bzl +++ b/bazel/repository_locations.bzl @@ -12,6 +12,10 @@ REPOSITORY_LOCATIONS = dict( commit = "44ae9609e860e3428cd057f7052e505b4819eb84", # 2018-02-06 remote = "https://github.com/bombela/backward-cpp", ), + com_github_circonus_labs_libcircllhist = dict( + commit = "97ef5e088fd01fa8ec5a86334a6308ac0d51ea6f", # 2018-04-07 + remote = "https://github.com/circonus-labs/libcircllhist", + ), com_github_cyan4973_xxhash = dict( commit = "7caf8bd76440c75dfe1070d3acfbd7891aea8fca", # v0.6.4 remote = "https://github.com/Cyan4973/xxHash", diff --git a/include/envoy/stats/stats.h b/include/envoy/stats/stats.h index b84e811047fa..1526c9f673cb 100644 --- a/include/envoy/stats/stats.h +++ b/include/envoy/stats/stats.h @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -114,6 +115,11 @@ class Metric { * Returns the name of the Metric with the portions designated as tags removed. 
*/ virtual const std::string& tagExtractedName() const PURE; + + /** + * Indicates whether this metric has been updated since the server was started. + */ + virtual bool used() const PURE; }; /** @@ -128,7 +134,6 @@ class Counter : public virtual Metric { virtual void inc() PURE; virtual uint64_t latch() PURE; virtual void reset() PURE; - virtual bool used() const PURE; virtual uint64_t value() const PURE; }; @@ -146,12 +151,34 @@ class Gauge : public virtual Metric { virtual void inc() PURE; virtual void set(uint64_t value) PURE; virtual void sub(uint64_t amount) PURE; - virtual bool used() const PURE; virtual uint64_t value() const PURE; }; typedef std::shared_ptr GaugeSharedPtr; +/** + * Holds the computed statistics for a histogram. + */ +class HistogramStatistics { +public: + virtual ~HistogramStatistics() {} + + /** + * Returns summary representation of the histogram. + */ + virtual std::string summary() const PURE; + + /** + * Returns supported quantiles. + */ + virtual const std::vector& supportedQuantiles() const PURE; + + /** + * Returns computed quantile values during the period. + */ + virtual const std::vector& computedQuantiles() const PURE; +}; + /** * A histogram that records values one at a time. * Note: Histograms now incorporate what used to be timers because the only difference between the @@ -171,6 +198,33 @@ class Histogram : public virtual Metric { typedef std::shared_ptr HistogramSharedPtr; +/** + * A histogram that is stored in main thread, manages all thread local histograms and provides + * summary view of the histogram. + */ +class ParentHistogram : public virtual Metric { +public: + virtual ~ParentHistogram() {} + + /** + * This method is called during the main stats flush process for each of the histogram and used + * to merge the histogram values. + */ + virtual void merge() PURE; + + /** + * Returns the interval histogram summary statistics for the flush interval. 
+ */ + virtual const HistogramStatistics& intervalStatistics() const PURE; + + /** + * Returns the cumulative histogram summary statistics. + */ + virtual const HistogramStatistics& cumulativeStatistics() const PURE; +}; + +typedef std::shared_ptr ParentHistogramSharedPtr; + /** * A sink for stats. Each sink is responsible for writing stats to a backing store. */ @@ -194,6 +248,11 @@ class Sink { */ virtual void flushGauge(const Gauge& gauge, uint64_t value) PURE; + /** + * Flush a histogram. + */ + virtual void flushHistogram(const ParentHistogram& histogram) PURE; + /** * This will be called after beginFlush(), some number of flushCounter(), and some number of * flushGauge(). Sinks can use this to optimize writing if desired. @@ -263,10 +322,20 @@ class Store : public Scope { * @return a list of all known gauges. */ virtual std::list gauges() const PURE; + + /** + * @return a list of all known histograms. + */ + virtual std::list histograms() const PURE; }; typedef std::unique_ptr StorePtr; +/** + * Callback invoked when a store's mergeHistograms() completes. + */ +typedef std::function PostMergeCb; + /** * The root of the stat store. */ @@ -294,6 +363,14 @@ class StoreRoot : public Store { * down. */ virtual void shutdownThreading() PURE; + + /** + * Called during the flush process to merge all the thread local histograms. The passed in + * callback will be called on the main thread, but it will happen after the method returns + * which means that the actual flush process will happen on the main thread after this method + * returns. 
+ */ + virtual void mergeHistograms(PostMergeCb merge_complete_cb) PURE; }; typedef std::unique_ptr StoreRootPtr; diff --git a/include/envoy/thread_local/thread_local.h b/include/envoy/thread_local/thread_local.h index 1db262c95720..f349715e94ff 100644 --- a/include/envoy/thread_local/thread_local.h +++ b/include/envoy/thread_local/thread_local.h @@ -46,6 +46,15 @@ class Slot { */ virtual void runOnAllThreads(Event::PostCb cb) PURE; + /** + * Run a callback on all registered threads with a barrier. A shutdown initiated during the + * running of the PostCBs may prevent all_threads_complete_cb from being called. + * @param cb supplies the callback to run on each thread. + * @param all_threads_complete_cb supplies the callback to run on main thread after threads are + * done. + */ + virtual void runOnAllThreads(Event::PostCb cb, Event::PostCb all_threads_complete_cb) PURE; + /** * Set thread local data on all threads previously registered via registerThread(). * @param initializeCb supplies the functor that will be called *on each thread*. 
The functor diff --git a/source/common/common/logger.h b/source/common/common/logger.h index 1002591cffdb..4bbda589c86b 100644 --- a/source/common/common/logger.h +++ b/source/common/common/logger.h @@ -43,7 +43,9 @@ namespace Logger { FUNCTION(testing) \ FUNCTION(tracing) \ FUNCTION(upstream) \ - FUNCTION(grpc) + FUNCTION(grpc) \ + FUNCTION(stats) + enum class Id { ALL_LOGGER_IDS(GENERATE_ENUM) diff --git a/source/common/stats/BUILD b/source/common/stats/BUILD index 1d6698e72c95..04ef67fafcf1 100644 --- a/source/common/stats/BUILD +++ b/source/common/stats/BUILD @@ -12,12 +12,17 @@ envoy_cc_library( name = "stats_lib", srcs = ["stats_impl.cc"], hdrs = ["stats_impl.h"], + external_deps = [ + "abseil_optional", + "libcircllhist", + ], deps = [ "//include/envoy/common:time_interface", "//include/envoy/server:options_interface", "//include/envoy/stats:stats_interface", "//source/common/common:assert_lib", "//source/common/common:hash_lib", + "//source/common/common:non_copyable", "//source/common/common:perf_annotation_lib", "//source/common/common:utility_lib", "//source/common/config:well_known_names", diff --git a/source/common/stats/stats_impl.cc b/source/common/stats/stats_impl.cc index 03b88d6b625d..2dce5d6be2e4 100644 --- a/source/common/stats/stats_impl.cc +++ b/source/common/stats/stats_impl.cc @@ -273,5 +273,37 @@ void RawStatData::initialize(absl::string_view key) { name_[xfer_size] = '\0'; } +HistogramStatisticsImpl::HistogramStatisticsImpl(const histogram_t* histogram_ptr) + : computed_quantiles_(supportedQuantiles().size(), 0.0) { + hist_approx_quantile(histogram_ptr, supportedQuantiles().data(), supportedQuantiles().size(), + computed_quantiles_.data()); +} + +const std::vector& HistogramStatisticsImpl::supportedQuantiles() const { + static const std::vector supported_quantiles = {0, 0.25, 0.5, 0.75, 0.90, + 0.95, 0.99, 0.999, 1}; + return supported_quantiles; +} + +std::string HistogramStatisticsImpl::summary() const { + std::vector summary; + const 
std::vector& supported_quantiles_ref = supportedQuantiles(); + for (size_t i = 0; i < supported_quantiles_ref.size(); ++i) { + summary.push_back( + fmt::format("P{}: {}", 100 * supported_quantiles_ref[i], computed_quantiles_[i])); + } + return absl::StrJoin(summary, ", "); +} + +/** + * Clears the old computed values and refreshes it with values computed from passed histogram. + */ +void HistogramStatisticsImpl::refresh(const histogram_t* new_histogram_ptr) { + std::fill(computed_quantiles_.begin(), computed_quantiles_.end(), 0.0); + ASSERT(supportedQuantiles().size() == computed_quantiles_.size()); + hist_approx_quantile(new_histogram_ptr, supportedQuantiles().data(), supportedQuantiles().size(), + computed_quantiles_.data()); +} + } // namespace Stats } // namespace Envoy diff --git a/source/common/stats/stats_impl.h b/source/common/stats/stats_impl.h index 85629d3449ea..fb609a5fa0b9 100644 --- a/source/common/stats/stats_impl.h +++ b/source/common/stats/stats_impl.h @@ -18,10 +18,13 @@ #include "common/common/assert.h" #include "common/common/hash.h" +#include "common/common/non_copyable.h" #include "common/common/utility.h" #include "common/protobuf/protobuf.h" +#include "absl/strings/str_join.h" #include "absl/strings/string_view.h" +#include "circllhist.h" namespace Envoy { namespace Stats { @@ -167,9 +170,6 @@ class Utility { * RawStatData::size() instead. */ struct RawStatData { - struct Flags { - static const uint8_t Used = 0x1; - }; /** * Due to the flexible-array-length of name_, c-style allocation @@ -284,6 +284,14 @@ class MetricImpl : public virtual Metric { const std::string& tagExtractedName() const override { return tag_extracted_name_; } const std::vector& tags() const override { return tags_; } +protected: + /** + * Flags used by all stats types to figure out whether they have been used. 
+ */ + struct Flags { + static const uint8_t Used = 0x1; + }; + private: const std::string name_; const std::string tag_extracted_name_; @@ -305,13 +313,13 @@ class CounterImpl : public Counter, public MetricImpl { void add(uint64_t amount) override { data_.value_ += amount; data_.pending_increment_ += amount; - data_.flags_ |= RawStatData::Flags::Used; + data_.flags_ |= Flags::Used; } void inc() override { add(1); } uint64_t latch() override { return data_.pending_increment_.exchange(0); } void reset() override { data_.value_ = 0; } - bool used() const override { return data_.flags_ & RawStatData::Flags::Used; } + bool used() const override { return data_.flags_ & Flags::Used; } uint64_t value() const override { return data_.value_; } private: @@ -333,13 +341,13 @@ class GaugeImpl : public Gauge, public MetricImpl { // Stats::Gauge virtual void add(uint64_t amount) override { data_.value_ += amount; - data_.flags_ |= RawStatData::Flags::Used; + data_.flags_ |= Flags::Used; } virtual void dec() override { sub(1); } virtual void inc() override { add(1); } virtual void set(uint64_t value) override { data_.value_ = value; - data_.flags_ |= RawStatData::Flags::Used; + data_.flags_ |= Flags::Used; } virtual void sub(uint64_t amount) override { ASSERT(data_.value_ >= amount); @@ -347,13 +355,36 @@ class GaugeImpl : public Gauge, public MetricImpl { data_.value_ -= amount; } virtual uint64_t value() const override { return data_.value_; } - bool used() const override { return data_.flags_ & RawStatData::Flags::Used; } + bool used() const override { return data_.flags_ & Flags::Used; } private: RawStatData& data_; RawStatDataAllocator& alloc_; }; +/** + * Implementation of HistogramStatistics for circllhist. + */ +class HistogramStatisticsImpl : public HistogramStatistics, NonCopyable { +public: + HistogramStatisticsImpl() : computed_quantiles_(supportedQuantiles().size(), 0.0) {} + /** + * HistogramStatisticsImpl object is constructed using the passed in histogram. 
+ * @param histogram_ptr pointer to the histogram for which stats will be calculated. This pointer + * will not be retained. + */ + HistogramStatisticsImpl(const histogram_t* histogram_ptr); + + std::string summary() const override; + const std::vector& supportedQuantiles() const override; + const std::vector& computedQuantiles() const override { return computed_quantiles_; } + + void refresh(const histogram_t* new_histogram_ptr); + +private: + std::vector computed_quantiles_; +}; + /** * Histogram implementation for the heap. */ @@ -366,6 +397,10 @@ class HistogramImpl : public Histogram, public MetricImpl { // Stats::Histogram void recordValue(uint64_t value) override { parent_.deliverHistogramToSinks(*this, value); } + bool used() const override { return true; } + +private: + // This is used for delivering the histogram data to sinks. Store& parent_; }; @@ -446,6 +481,9 @@ class IsolatedStoreImpl : public Store { // Stats::Store std::list counters() const override { return counters_.toList(); } std::list gauges() const override { return gauges_.toList(); } + std::list histograms() const override { + return std::list{}; + } private: struct ScopeImpl : public Scope { diff --git a/source/common/stats/thread_local_store.cc b/source/common/stats/thread_local_store.cc index 6d0036c6b1c7..56d7f5539325 100644 --- a/source/common/stats/thread_local_store.cc +++ b/source/common/stats/thread_local_store.cc @@ -61,6 +61,30 @@ std::list ThreadLocalStoreImpl::gauges() const { return ret; } +std::list ThreadLocalStoreImpl::histograms() const { + // Handle de-dup due to overlapping scopes. + std::list ret; + std::unordered_set names; + std::unique_lock lock(lock_); + // TODO(ramaraochavali): As histograms don't share storage, there is a chance of duplicate names + // here. We need process global storage for histograms similar to how we have a central storage + // in shared memory for counters/gauges. 
+ for (ScopeImpl* scope : scopes_) { + for (const auto& name_histogram_pair : scope->central_cache_.histograms_) { + const std::string& hist_name = name_histogram_pair.first; + const ParentHistogramSharedPtr& parent_hist = name_histogram_pair.second; + if (names.insert(hist_name).second) { + ret.push_back(parent_hist); + } else { + ENVOY_LOG(warn, "duplicate histogram {}.{}: data loss will occur on output", scope->prefix_, + hist_name); + } + } + } + + return ret; +} + void ThreadLocalStoreImpl::initializeThreading(Event::Dispatcher& main_thread_dispatcher, ThreadLocal::Instance& tls) { main_thread_dispatcher_ = &main_thread_dispatcher; @@ -75,6 +99,31 @@ void ThreadLocalStoreImpl::shutdownThreading() { shutting_down_ = true; } +void ThreadLocalStoreImpl::mergeHistograms(PostMergeCb merge_complete_cb) { + if (!shutting_down_) { + tls_->runOnAllThreads( + [this]() -> void { + for (ScopeImpl* scope : scopes_) { + for (const auto& name_histogram_pair : + tls_->getTyped().scope_cache_[scope].histograms_) { + const TlsHistogramSharedPtr& tls_hist = name_histogram_pair.second; + tls_hist->beginMerge(); + } + } + }, + [this, merge_complete_cb]() -> void { mergeInternal(merge_complete_cb); }); + } +} + +void ThreadLocalStoreImpl::mergeInternal(PostMergeCb merge_complete_cb) { + if (!shutting_down_) { + for (const ParentHistogramSharedPtr& histogram : histograms()) { + histogram->merge(); + } + merge_complete_cb(); + } +} + void ThreadLocalStoreImpl::releaseScopeCrossThread(ScopeImpl* scope) { std::unique_lock lock(lock_); ASSERT(scopes_.count(scope) == 1); @@ -204,7 +253,7 @@ Histogram& ThreadLocalStoreImpl::ScopeImpl::histogram(const std::string& name) { // See comments in counter(). There is no super clean way (via templates or otherwise) to // share this code so I'm leaving it largely duplicated for now. 
std::string final_name = prefix_ + name; - HistogramSharedPtr* tls_ref = nullptr; + TlsHistogramSharedPtr* tls_ref = nullptr; if (!parent_.shutting_down_ && parent_.tls_) { tls_ref = &parent_.tls_->getTyped().scope_cache_[this].histograms_[final_name]; } @@ -214,19 +263,98 @@ Histogram& ThreadLocalStoreImpl::ScopeImpl::histogram(const std::string& name) { } std::unique_lock lock(parent_.lock_); - HistogramSharedPtr& central_ref = central_cache_.histograms_[final_name]; + ParentHistogramImplSharedPtr& central_ref = central_cache_.histograms_[final_name]; + + std::vector tags; + std::string tag_extracted_name = parent_.getTagsForName(final_name, tags); if (!central_ref) { - std::vector tags; - std::string tag_extracted_name = parent_.getTagsForName(final_name, tags); - central_ref.reset( - new HistogramImpl(final_name, parent_, std::move(tag_extracted_name), std::move(tags))); + // Since MetricImpl only has move constructor, we are explicitly copying here. + std::string central_tag_extracted_name(tag_extracted_name); + std::vector central_tags(tags); + central_ref.reset(new ParentHistogramImpl( + final_name, parent_, std::move(central_tag_extracted_name), std::move(central_tags))); } + TlsHistogramSharedPtr hist_tls_ptr(new ThreadLocalHistogramImpl( + final_name, parent_, std::move(tag_extracted_name), std::move(tags))); + central_ref->addTlsHistogram(hist_tls_ptr); if (tls_ref) { - *tls_ref = central_ref; + *tls_ref = hist_tls_ptr; } + return *hist_tls_ptr; +} - return *central_ref; +ThreadLocalHistogramImpl::ThreadLocalHistogramImpl(const std::string& name, Store& parent, + std::string&& tag_extracted_name, + std::vector&& tags) + : MetricImpl(name, std::move(tag_extracted_name), std::move(tags)), parent_(parent), + current_active_(0), flags_(0) { + histograms_[0] = hist_alloc(); + histograms_[1] = hist_alloc(); +} + +ThreadLocalHistogramImpl::~ThreadLocalHistogramImpl() { + hist_free(histograms_[0]); + hist_free(histograms_[1]); +} + +void 
ThreadLocalHistogramImpl::recordValue(uint64_t value) { + hist_insert_intscale(histograms_[current_active_], value, 0, 1); + parent_.deliverHistogramToSinks(*this, value); + flags_ |= Flags::Used; +} + +void ThreadLocalHistogramImpl::merge(histogram_t* target) { + uint64_t other_histogram_index = otherHistogramIndex(); + hist_accumulate(target, &histograms_[other_histogram_index], 1); + hist_clear(histograms_[other_histogram_index]); +} + +ParentHistogramImpl::ParentHistogramImpl(const std::string& name, Store& parent, + std::string&& tag_extracted_name, std::vector&& tags) + : MetricImpl(name, std::move(tag_extracted_name), std::move(tags)), parent_(parent), + interval_histogram_(hist_alloc()), cumulative_histogram_(hist_alloc()), + interval_statistics_(interval_histogram_), cumulative_statistics_(cumulative_histogram_) {} + +bool ParentHistogramImpl::used() const { + std::unique_lock lock(merge_lock_); + return usedWorker(); +} + +ParentHistogramImpl::~ParentHistogramImpl() { + hist_free(interval_histogram_); + hist_free(cumulative_histogram_); +} + +void ParentHistogramImpl::merge() { + std::unique_lock lock(merge_lock_); + if (usedWorker()) { + hist_clear(interval_histogram_); + for (const TlsHistogramSharedPtr& tls_histogram : tls_histograms_) { + tls_histogram->merge(interval_histogram_); + } + // Since TLS merge is done, we can release the lock here. 
+ lock.unlock(); + hist_accumulate(cumulative_histogram_, &interval_histogram_, 1); + cumulative_statistics_.refresh(cumulative_histogram_); + interval_statistics_.refresh(interval_histogram_); + } +} + +void ParentHistogramImpl::addTlsHistogram(const TlsHistogramSharedPtr& hist_ptr) { + std::unique_lock lock(merge_lock_); + tls_histograms_.emplace_back(hist_ptr); +} + +bool ParentHistogramImpl::usedWorker() const { + bool any_tls_used = false; + for (const TlsHistogramSharedPtr& tls_histogram : tls_histograms_) { + if (tls_histogram->used()) { + any_tls_used = true; + break; + } + } + return any_tls_used; } } // namespace Stats diff --git a/source/common/stats/thread_local_store.h b/source/common/stats/thread_local_store.h index 030bb4dd4f28..1be78bd17d51 100644 --- a/source/common/stats/thread_local_store.h +++ b/source/common/stats/thread_local_store.h @@ -16,6 +16,95 @@ namespace Envoy { namespace Stats { +/** + * A histogram that is stored in TLS and used to record values per thread. This holds two + * histograms, one to collect the values and other as backup that is used for merge process. The + * swap happens during the merge process. + */ +class ThreadLocalHistogram : public virtual Histogram { +public: + virtual ~ThreadLocalHistogram() {} + + /** + * Called in the beginning of merge process. Swaps the histogram used for collection so that we do + * not have to lock the histogram in high throughput TLS writes. + */ + virtual void beginMerge() PURE; +}; + +/** + * Log Linear Histogram implementation per thread. 
+ */ +class ThreadLocalHistogramImpl : public ThreadLocalHistogram, public MetricImpl { +public: + ThreadLocalHistogramImpl(const std::string& name, Store& parent, std::string&& tag_extracted_name, + std::vector&& tags); + + ~ThreadLocalHistogramImpl(); + // Stats::Histogram + void recordValue(uint64_t value) override; + + bool used() const override { return flags_ & Flags::Used; } + void beginMerge() override { + // this switches the current_active_ between 1 and 0. + current_active_ = otherHistogramIndex(); + } + + void merge(histogram_t* target); + + Store& parent_; + +private: + uint64_t otherHistogramIndex() const { return 1 - current_active_; } + uint64_t current_active_; + histogram_t* histograms_[2]; + std::atomic flags_; +}; + +typedef std::shared_ptr TlsHistogramSharedPtr; + +/** + * Log Linear Histogram implementation that is stored in the main thread. + */ +class ParentHistogramImpl : public ParentHistogram, public MetricImpl { +public: + ParentHistogramImpl(const std::string& name, Store& parent, std::string&& tag_extracted_name, + std::vector&& tags); + + virtual ~ParentHistogramImpl(); + + bool used() const override; + + /** + * This method is called during the main stats flush process for each of the histograms. It + * iterates through the TLS histograms and collects the histogram data of all of them + * in to "interval_histogram". Then the collected "interval_histogram" is merged to a + * "cumulative_histogram". 
+ */ + void merge() override; + + const HistogramStatistics& intervalStatistics() const override { return interval_statistics_; } + const HistogramStatistics& cumulativeStatistics() const override { + return cumulative_statistics_; + } + + void addTlsHistogram(const TlsHistogramSharedPtr& hist_ptr); + + Store& parent_; + std::list tls_histograms_; + +private: + bool usedWorker() const; + + histogram_t* interval_histogram_; + histogram_t* cumulative_histogram_; + HistogramStatisticsImpl interval_statistics_; + HistogramStatisticsImpl cumulative_statistics_; + mutable std::mutex merge_lock_; +}; + +typedef std::shared_ptr ParentHistogramImplSharedPtr; + /** * Store implementation with thread local caching. This implementation supports the following * features: @@ -44,8 +133,27 @@ namespace Stats { * back to heap allocated stats if needed. NOTE: In this case, overlapping scopes will not share * the same backing store. This is to keep things simple, it could be done in the future if * needed. + * + * The threading model for managing histograms is as described below. + * Each Histogram implementation will have 2 parts. + * - "main" thread parent which is called "ParentHistogram". + * - "per-thread" collector which is called "ThreadLocalHistogram". + * Worker threads will write into their per-thread collector, without needing any locking. + * During the flush process the following sequence is followed. + * - The main thread starts the flush process by posting a message to every worker which tells the + * worker to swap its "active" histogram with its "backup" histogram. This is achieved via a call + * to "beginMerge" method. + * - Each TLS histogram has 2 histograms it makes use of, swapping back and forth. It manages a + * current_active index via which it writes to the correct histogram. + * - When all workers are done, the main thread continues with the flush process where the + * "actual" merging happens. 
+ * - As the active histograms are swapped in TLS histograms, on the main thread, we can be sure + * that no worker is writing into the "backup" histogram. + * - The main thread now goes through all histograms, collect them across each worker and + * accumulates in to "interval" histograms. + * - Finally the main "interval" histogram is merged to "cumulative" histogram. */ -class ThreadLocalStoreImpl : public StoreRoot { +class ThreadLocalStoreImpl : Logger::Loggable, public StoreRoot { public: ThreadLocalStoreImpl(RawStatDataAllocator& alloc); ~ThreadLocalStoreImpl(); @@ -62,8 +170,11 @@ class ThreadLocalStoreImpl : public StoreRoot { }; // Stats::Store + // TODO(ramaraochavali): Consider changing the implementation of these methods to use vectors and + // use std::sort, rather than inserting into a map and pulling it out for better performance. std::list counters() const override; std::list gauges() const override; + std::list histograms() const override; // Stats::StoreRoot void addSink(Sink& sink) override { timer_sinks_.push_back(sink); } @@ -74,11 +185,19 @@ class ThreadLocalStoreImpl : public StoreRoot { ThreadLocal::Instance& tls) override; void shutdownThreading() override; + void mergeHistograms(PostMergeCb mergeCb) override; + private: struct TlsCacheEntry { std::unordered_map counters_; std::unordered_map gauges_; - std::unordered_map histograms_; + std::unordered_map histograms_; + }; + + struct CentralCacheEntry { + std::unordered_map counters_; + std::unordered_map gauges_; + std::unordered_map histograms_; }; struct ScopeImpl : public Scope { @@ -97,7 +216,7 @@ class ThreadLocalStoreImpl : public StoreRoot { ThreadLocalStoreImpl& parent_; const std::string prefix_; - TlsCacheEntry central_cache_; + CentralCacheEntry central_cache_; }; struct TlsCache : public ThreadLocal::ThreadLocalObject { @@ -113,6 +232,7 @@ class ThreadLocalStoreImpl : public StoreRoot { void clearScopeFromCaches(ScopeImpl* scope); void releaseScopeCrossThread(ScopeImpl* scope); 
SafeAllocData safeAlloc(const std::string& name); + void mergeInternal(PostMergeCb mergeCb); RawStatDataAllocator& alloc_; Event::Dispatcher* main_thread_dispatcher_{}; diff --git a/source/common/thread_local/thread_local_impl.cc b/source/common/thread_local/thread_local_impl.cc index e1805e3e85f3..b43737993ae3 100644 --- a/source/common/thread_local/thread_local_impl.cc +++ b/source/common/thread_local/thread_local_impl.cc @@ -91,6 +91,21 @@ void InstanceImpl::runOnAllThreads(Event::PostCb cb) { cb(); } +void InstanceImpl::runOnAllThreads(Event::PostCb cb, Event::PostCb all_threads_complete_cb) { + ASSERT(std::this_thread::get_id() == main_thread_id_); + ASSERT(!shutdown_); + std::shared_ptr> worker_count = + std::make_shared>(registered_threads_.size()); + for (Event::Dispatcher& dispatcher : registered_threads_) { + dispatcher.post([this, worker_count, cb, all_threads_complete_cb]() -> void { + cb(); + if (--*worker_count == 0) { + main_thread_dispatcher_->post(all_threads_complete_cb); + } + }); + } +} + void InstanceImpl::SlotImpl::set(InitializeCb cb) { ASSERT(std::this_thread::get_id() == parent_.main_thread_id_); ASSERT(!parent_.shutdown_); diff --git a/source/common/thread_local/thread_local_impl.h b/source/common/thread_local/thread_local_impl.h index 17a835ac5d0d..820cd1504a95 100644 --- a/source/common/thread_local/thread_local_impl.h +++ b/source/common/thread_local/thread_local_impl.h @@ -35,6 +35,9 @@ class InstanceImpl : Logger::Loggable, public Instance { // ThreadLocal::Slot ThreadLocalObjectSharedPtr get() override; void runOnAllThreads(Event::PostCb cb) override { parent_.runOnAllThreads(cb); } + void runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback) override { + parent_.runOnAllThreads(cb, main_callback); + } void set(InitializeCb cb) override; InstanceImpl& parent_; @@ -48,6 +51,7 @@ class InstanceImpl : Logger::Loggable, public Instance { void removeSlot(SlotImpl& slot); void runOnAllThreads(Event::PostCb cb); + void 
runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback); static void setThreadLocal(uint32_t index, ThreadLocalObjectSharedPtr object); static thread_local ThreadLocalData thread_local_data_; diff --git a/source/extensions/stat_sinks/common/statsd/statsd.h b/source/extensions/stat_sinks/common/statsd/statsd.h index 78f0a76fc9a7..93e90234693a 100644 --- a/source/extensions/stat_sinks/common/statsd/statsd.h +++ b/source/extensions/stat_sinks/common/statsd/statsd.h @@ -51,6 +51,7 @@ class UdpStatsdSink : public Stats::Sink { void beginFlush() override {} void flushCounter(const Stats::Counter& counter, uint64_t delta) override; void flushGauge(const Stats::Gauge& gauge, uint64_t value) override; + void flushHistogram(const Stats::ParentHistogram&) override {} void endFlush() override {} void onHistogramComplete(const Stats::Histogram& histogram, uint64_t value) override; @@ -87,6 +88,8 @@ class TcpStatsdSink : public Stats::Sink { tls_->getTyped().flushGauge(gauge.name(), value); } + void flushHistogram(const Stats::ParentHistogram&) override {} + void endFlush() override { tls_->getTyped().endFlush(true); } void onHistogramComplete(const Stats::Histogram& histogram, uint64_t value) override { diff --git a/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.cc b/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.cc index 5aa68b19a3b8..010557966291 100644 --- a/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.cc +++ b/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.cc @@ -61,6 +61,42 @@ void GrpcMetricsStreamerImpl::ThreadLocalStreamer::send( MetricsServiceSink::MetricsServiceSink(const GrpcMetricsStreamerSharedPtr& grpc_metrics_streamer) : grpc_metrics_streamer_(grpc_metrics_streamer) {} +void MetricsServiceSink::flushCounter(const Stats::Counter& counter, uint64_t) { + io::prometheus::client::MetricFamily* metrics_family = message_.add_envoy_metrics(); + 
metrics_family->set_type(io::prometheus::client::MetricType::COUNTER); + metrics_family->set_name(counter.name()); + auto* metric = metrics_family->add_metric(); + metric->set_timestamp_ms(std::chrono::system_clock::now().time_since_epoch().count()); + auto* counter_metric = metric->mutable_counter(); + counter_metric->set_value(counter.value()); +} + +void MetricsServiceSink::flushGauge(const Stats::Gauge& gauge, uint64_t value) { + io::prometheus::client::MetricFamily* metrics_family = message_.add_envoy_metrics(); + metrics_family->set_type(io::prometheus::client::MetricType::GAUGE); + metrics_family->set_name(gauge.name()); + auto* metric = metrics_family->add_metric(); + metric->set_timestamp_ms(std::chrono::system_clock::now().time_since_epoch().count()); + auto* gauage_metric = metric->mutable_gauge(); + gauage_metric->set_value(value); +} +void MetricsServiceSink::flushHistogram(const Stats::ParentHistogram& histogram) { + io::prometheus::client::MetricFamily* metrics_family = message_.add_envoy_metrics(); + metrics_family->set_type(io::prometheus::client::MetricType::SUMMARY); + metrics_family->set_name(histogram.name()); + auto* metric = metrics_family->add_metric(); + metric->set_timestamp_ms(std::chrono::system_clock::now().time_since_epoch().count()); + auto* summary_metric = metric->mutable_summary(); + const Stats::HistogramStatistics& hist_stats = histogram.intervalStatistics(); + size_t index = 0; + for (double supported_quantile : hist_stats.supportedQuantiles()) { + auto* quantile = summary_metric->add_quantile(); + quantile->set_quantile(supported_quantile); + quantile->set_value(hist_stats.computedQuantiles()[index]); + index++; + } +} + } // namespace MetricsService } // namespace StatSinks } // namespace Extensions diff --git a/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.h b/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.h index cafaafc9ad03..084712cad7c8 100644 --- 
a/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.h +++ b/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.h @@ -112,25 +112,9 @@ class MetricsServiceSink : public Stats::Sink { void beginFlush() override { message_.clear_envoy_metrics(); } - void flushCounter(const Stats::Counter& counter, uint64_t) override { - io::prometheus::client::MetricFamily* metrics_family = message_.add_envoy_metrics(); - metrics_family->set_type(io::prometheus::client::MetricType::COUNTER); - metrics_family->set_name(counter.name()); - auto* metric = metrics_family->add_metric(); - metric->set_timestamp_ms(std::chrono::system_clock::now().time_since_epoch().count()); - auto* counter_metric = metric->mutable_counter(); - counter_metric->set_value(counter.value()); - } - - void flushGauge(const Stats::Gauge& gauge, uint64_t value) override { - io::prometheus::client::MetricFamily* metrics_family = message_.add_envoy_metrics(); - metrics_family->set_type(io::prometheus::client::MetricType::GAUGE); - metrics_family->set_name(gauge.name()); - auto* metric = metrics_family->add_metric(); - metric->set_timestamp_ms(std::chrono::system_clock::now().time_since_epoch().count()); - auto* gauage_metric = metric->mutable_gauge(); - gauage_metric->set_value(value); - } + void flushCounter(const Stats::Counter& counter, uint64_t) override; + void flushGauge(const Stats::Gauge& gauge, uint64_t value) override; + void flushHistogram(const Stats::ParentHistogram& histogram) override; void endFlush() override { grpc_metrics_streamer_->send(message_); @@ -140,9 +124,7 @@ class MetricsServiceSink : public Stats::Sink { } } - void onHistogramComplete(const Stats::Histogram&, uint64_t) override { - // TODO : Need to figure out how to map existing histogram to Proto Model - } + void onHistogramComplete(const Stats::Histogram&, uint64_t) override {} private: GrpcMetricsStreamerSharedPtr grpc_metrics_streamer_; diff --git a/source/server/http/admin.cc 
b/source/server/http/admin.cc index 6cb19190dc22..e2996bb9ce2a 100644 --- a/source/server/http/admin.cc +++ b/source/server/http/admin.cc @@ -36,6 +36,7 @@ #include "common/network/listen_socket_impl.h" #include "common/profiler/profiler.h" #include "common/router/config_impl.h" +#include "common/stats/stats_impl.h" #include "common/upstream/host_utility.h" #include "extensions/access_loggers/file/file_access_log_impl.h" @@ -388,11 +389,10 @@ Http::Code AdminImpl::handlerServerInfo(absl::string_view, Http::HeaderMap&, Http::Code AdminImpl::handlerStats(absl::string_view url, Http::HeaderMap& response_headers, Buffer::Instance& response) { - // We currently don't support timers locally (only via statsd) so just group all the counters - // and gauges together, alpha sort them, and spit them out. Http::Code rc = Http::Code::OK; const Http::Utility::QueryParams params = Http::Utility::parseQueryString(url); std::map all_stats; + std::map all_histograms; for (const Stats::CounterSharedPtr& counter : server_.stats().counters()) { all_stats.emplace(counter->name(), counter->value()); } @@ -401,18 +401,34 @@ Http::Code AdminImpl::handlerStats(absl::string_view url, Http::HeaderMap& respo all_stats.emplace(gauge->name(), gauge->value()); } + for (const Stats::ParentHistogramSharedPtr& histogram : server_.stats().histograms()) { + std::vector summary; + const std::vector& supported_quantiles_ref = + histogram->intervalStatistics().supportedQuantiles(); + for (size_t i = 0; i < supported_quantiles_ref.size(); ++i) { + summary.push_back(fmt::format("P{}({},{})", 100 * supported_quantiles_ref[i], + histogram->intervalStatistics().computedQuantiles()[i], + histogram->cumulativeStatistics().computedQuantiles()[i])); + } + + all_histograms.emplace(histogram->name(), absl::StrJoin(summary, " ")); + } + if (params.size() == 0) { // No Arguments so use the standard. 
for (auto stat : all_stats) { response.add(fmt::format("{}: {}\n", stat.first, stat.second)); } + for (auto histogram : all_histograms) { + response.add(fmt::format("{}: {}\n", histogram.first, histogram.second)); + } } else { const std::string format_key = params.begin()->first; const std::string format_value = params.begin()->second; if (format_key == "format" && format_value == "json") { response_headers.insertContentType().value().setReference( Http::Headers::get().ContentTypeValues.Json); - response.add(AdminImpl::statsAsJson(all_stats)); + response.add(AdminImpl::statsAsJson(all_stats, server_.stats().histograms())); } else if (format_key == "format" && format_value == "prometheus") { return handlerPrometheusStats(url, response_headers, response); } else { @@ -480,7 +496,9 @@ PrometheusStatsFormatter::statsAsPrometheus(const std::list& all_stats) { +std::string +AdminImpl::statsAsJson(const std::map& all_stats, + const std::list& all_histograms) { rapidjson::Document document; document.SetObject(); rapidjson::Value stats_array(rapidjson::kArrayType); @@ -496,6 +514,40 @@ std::string AdminImpl::statsAsJson(const std::map& all_st stat_obj.AddMember("value", stat_value, allocator); stats_array.PushBack(stat_obj, allocator); } + + for (Stats::ParentHistogramSharedPtr histogram : all_histograms) { + Value histogram_obj; + histogram_obj.SetObject(); + Value histogram_name; + histogram_name.SetString(histogram->name().c_str(), allocator); + histogram_obj.AddMember("name", histogram_name, allocator); + + rapidjson::Value quantile_array(rapidjson::kArrayType); + + // TODO(ramaraochavali): consider optimizing the model here. Quantiles can be added once, + // followed by two arrays interval and cumulative. 
+ for (size_t i = 0; i < histogram->intervalStatistics().supportedQuantiles().size(); ++i) { + Value quantile_obj; + quantile_obj.SetObject(); + Value quantile_type; + quantile_type.SetDouble(histogram->intervalStatistics().supportedQuantiles()[i] * 100); + quantile_obj.AddMember("quantile", quantile_type, allocator); + Value interval_value; + if (!std::isnan(histogram->intervalStatistics().computedQuantiles()[i])) { + interval_value.SetDouble(histogram->intervalStatistics().computedQuantiles()[i]); + } + quantile_obj.AddMember("interval_value", interval_value, allocator); + Value cumulative_value; + if (!std::isnan(histogram->cumulativeStatistics().computedQuantiles()[i])) { + cumulative_value.SetDouble(histogram->cumulativeStatistics().computedQuantiles()[i]); + } + quantile_obj.AddMember("cumulative_value", cumulative_value, allocator); + quantile_array.PushBack(quantile_obj, allocator); + } + histogram_obj.AddMember("quantiles", quantile_array, allocator); + stats_array.PushBack(histogram_obj, allocator); + } + document.AddMember("stats", stats_array, allocator); rapidjson::StringBuffer strbuf; rapidjson::PrettyWriter writer(strbuf); diff --git a/source/server/http/admin.h b/source/server/http/admin.h index 18409fc15c25..31f571b40147 100644 --- a/source/server/http/admin.h +++ b/source/server/http/admin.h @@ -133,7 +133,8 @@ class AdminImpl : public Admin, void addOutlierInfo(const std::string& cluster_name, const Upstream::Outlier::Detector* outlier_detector, Buffer::Instance& response); - static std::string statsAsJson(const std::map& all_stats); + static std::string statsAsJson(const std::map& all_stats, + const std::list& all_histograms); static std::string runtimeAsJson(const std::vector>& entries); std::vector sortedHandlers() const; diff --git a/source/server/server.cc b/source/server/server.cc index 74ea43d58989..04f91e3aeadf 100644 --- a/source/server/server.cc +++ b/source/server/server.cc @@ -99,8 +99,8 @@ void InstanceImpl::failHealthcheck(bool 
fail) { server_stats_->live_.set(!fail); } -void InstanceUtil::flushCountersAndGaugesToSinks(const std::list& sinks, - Stats::Store& store) { +void InstanceUtil::flushMetricsToSinks(const std::list& sinks, + Stats::Store& store) { for (const auto& sink : sinks) { sink->beginFlush(); } @@ -122,6 +122,14 @@ void InstanceUtil::flushCountersAndGaugesToSinks(const std::list } } + for (const Stats::ParentHistogramSharedPtr& histogram : store.histograms()) { + if (histogram->used()) { + for (const auto& sink : sinks) { + sink->flushHistogram(*histogram); + } + } + } + for (const auto& sink : sinks) { sink->endFlush(); } @@ -129,19 +137,21 @@ void InstanceUtil::flushCountersAndGaugesToSinks(const std::list void InstanceImpl::flushStats() { ENVOY_LOG(debug, "flushing stats"); - HotRestart::GetParentStatsInfo info; - restarter_.getParentStats(info); - server_stats_->uptime_.set(time(nullptr) - original_start_time_); - server_stats_->memory_allocated_.set(Memory::Stats::totalCurrentlyAllocated() + - info.memory_allocated_); - server_stats_->memory_heap_size_.set(Memory::Stats::totalCurrentlyReserved()); - server_stats_->parent_connections_.set(info.num_connections_); - server_stats_->total_connections_.set(numConnections() + info.num_connections_); - server_stats_->days_until_first_cert_expiring_.set( - sslContextManager().daysUntilFirstCertExpires()); - - InstanceUtil::flushCountersAndGaugesToSinks(config_->statsSinks(), stats_store_); - stat_flush_timer_->enableTimer(config_->statsFlushInterval()); + // TODO(ramaraochavali): consider adding different flush interval for histograms. 
+ stats_store_.mergeHistograms([this]() -> void { + HotRestart::GetParentStatsInfo info; + restarter_.getParentStats(info); + server_stats_->uptime_.set(time(nullptr) - original_start_time_); + server_stats_->memory_allocated_.set(Memory::Stats::totalCurrentlyAllocated() + + info.memory_allocated_); + server_stats_->memory_heap_size_.set(Memory::Stats::totalCurrentlyReserved()); + server_stats_->parent_connections_.set(info.num_connections_); + server_stats_->total_connections_.set(numConnections() + info.num_connections_); + server_stats_->days_until_first_cert_expiring_.set( + sslContextManager().daysUntilFirstCertExpires()); + InstanceUtil::flushMetricsToSinks(config_->statsSinks(), stats_store_); + stat_flush_timer_->enableTimer(config_->statsFlushInterval()); + }); } void InstanceImpl::getParentStats(HotRestart::GetParentStatsInfo& info) { diff --git a/source/server/server.h b/source/server/server.h index 645f0a006746..60c2b148f270 100644 --- a/source/server/server.h +++ b/source/server/server.h @@ -82,13 +82,13 @@ class InstanceUtil : Logger::Loggable { static Runtime::LoaderPtr createRuntime(Instance& server, Server::Configuration::Initial& config); /** - * Helper for flushing counters and gauges to sinks. This takes care of calling beginFlush(), - * latching of counters and flushing, flushing of gauges, and calling endFlush(), on each sink. + * Helper for flushing counters, gauges and hisograms to sinks. This takes care of calling + * beginFlush(), latching of counters and flushing, flushing of gauges, and calling endFlush(), on + * each sink. * @param sinks supplies the list of sinks. * @param store supplies the store to flush. */ - static void flushCountersAndGaugesToSinks(const std::list& sinks, - Stats::Store& store); + static void flushMetricsToSinks(const std::list& sinks, Stats::Store& store); /** * Load a bootstrap config from either v1 or v2 and perform validation. 
@@ -208,5 +208,5 @@ class InstanceImpl : Logger::Loggable, public Instance { std::unique_ptr file_logger_; }; -} // Server +} // namespace Server } // namespace Envoy diff --git a/test/common/stats/thread_local_store_test.cc b/test/common/stats/thread_local_store_test.cc index 9b352c236d48..5e76ebd116b7 100644 --- a/test/common/stats/thread_local_store_test.cc +++ b/test/common/stats/thread_local_store_test.cc @@ -11,6 +11,7 @@ #include "test/mocks/thread_local/mocks.h" #include "test/test_common/utility.h" +#include "absl/strings/str_split.h" #include "gmock/gmock.h" #include "gtest/gtest.h" @@ -90,6 +91,128 @@ class StatsThreadLocalStoreTest : public testing::Test, public RawStatDataAlloca std::unique_ptr store_; }; +class HistogramTest : public testing::Test, public RawStatDataAllocator { +public: + void SetUp() override { + InSequence s; + ON_CALL(*this, alloc(_)).WillByDefault(Invoke([this](const std::string& name) -> RawStatData* { + return alloc_.alloc(name); + })); + + ON_CALL(*this, free(_)).WillByDefault(Invoke([this](RawStatData& data) -> void { + return alloc_.free(data); + })); + + EXPECT_CALL(*this, alloc("stats.overflow")); + store_.reset(new ThreadLocalStoreImpl(*this)); + store_->addSink(sink_); + store_->initializeThreading(main_thread_dispatcher_, tls_); + } + + void TearDown() override { + store_->shutdownThreading(); + tls_.shutdownThread(); + // Includes overflow stat. + EXPECT_CALL(*this, free(_)); + } + + std::vector h1_cumulative_values, h2_cumulative_values, h1_interval_values, + h2_interval_values; + + typedef std::map NameHistogramMap; + + NameHistogramMap makeHistogramMap(const std::list& hist_list) { + NameHistogramMap name_histogram_map; + for (const Stats::ParentHistogramSharedPtr& histogram : hist_list) { + // Exclude the scope part of the name. 
+ const std::vector& split_vector = absl::StrSplit(histogram->name(), '.'); + name_histogram_map.insert(std::make_pair(split_vector.back(), histogram)); + } + return name_histogram_map; + } + + /** + * Validates that Histogram merge happens as desired and returns the processed histogram count + * that can be asserted later. + */ + uint64_t validateMerge() { + std::atomic merge_called{false}; + store_->mergeHistograms([&merge_called]() -> void { merge_called = true; }); + + EXPECT_TRUE(merge_called); + + std::list histogram_list = store_->histograms(); + + histogram_t* hist1_cumulative = hist_alloc(); + for (uint64_t value : h1_cumulative_values) { + hist_insert_intscale(hist1_cumulative, value, 0, 1); + } + + histogram_t* hist2_cumulative = hist_alloc(); + for (uint64_t value : h2_cumulative_values) { + hist_insert_intscale(hist2_cumulative, value, 0, 1); + } + + histogram_t* hist1_interval = hist_alloc(); + for (uint64_t value : h1_interval_values) { + hist_insert_intscale(hist1_interval, value, 0, 1); + } + + histogram_t* hist2_interval = hist_alloc(); + for (uint64_t value : h2_interval_values) { + hist_insert_intscale(hist2_interval, value, 0, 1); + } + + HistogramStatisticsImpl h1_cumulative_statistics(hist1_cumulative); + HistogramStatisticsImpl h2_cumulative_statistics(hist2_cumulative); + HistogramStatisticsImpl h1_interval_statistics(hist1_interval); + HistogramStatisticsImpl h2_interval_statistics(hist2_interval); + + NameHistogramMap name_histogram_map = makeHistogramMap(histogram_list); + const Stats::ParentHistogramSharedPtr& h1 = name_histogram_map["h1"]; + EXPECT_EQ(h1->cumulativeStatistics().summary(), h1_cumulative_statistics.summary()); + EXPECT_EQ(h1->intervalStatistics().summary(), h1_interval_statistics.summary()); + + if (histogram_list.size() > 1) { + const Stats::ParentHistogramSharedPtr& h2 = name_histogram_map["h2"]; + EXPECT_EQ(h2->cumulativeStatistics().summary(), h2_cumulative_statistics.summary()); + 
EXPECT_EQ(h2->intervalStatistics().summary(), h2_interval_statistics.summary()); + } + + hist_free(hist1_cumulative); + hist_free(hist2_cumulative); + hist_free(hist1_interval); + hist_free(hist2_interval); + + h1_interval_values.clear(); + h2_interval_values.clear(); + + return histogram_list.size(); + } + + void expectCallAndAccumulate(Histogram& histogram, uint64_t record_value) { + EXPECT_CALL(sink_, onHistogramComplete(Ref(histogram), record_value)); + histogram.recordValue(record_value); + + if (histogram.name() == "h1") { + h1_cumulative_values.push_back(record_value); + h1_interval_values.push_back(record_value); + } else { + h2_cumulative_values.push_back(record_value); + h2_interval_values.push_back(record_value); + } + } + + MOCK_METHOD1(alloc, RawStatData*(const std::string& name)); + MOCK_METHOD1(free, void(RawStatData& data)); + + NiceMock main_thread_dispatcher_; + NiceMock tls_; + TestAllocator alloc_; + MockSink sink_; + std::unique_ptr store_; +}; + TEST_F(StatsThreadLocalStoreTest, NoTls) { InSequence s; EXPECT_CALL(*this, alloc(_)).Times(2); @@ -101,7 +224,7 @@ TEST_F(StatsThreadLocalStoreTest, NoTls) { EXPECT_EQ(&g1, &store_->gauge("g1")); Histogram& h1 = store_->histogram("h1"); - EXPECT_EQ(&h1, &store_->histogram("h1")); + EXPECT_CALL(sink_, onHistogramComplete(Ref(h1), 200)); h1.recordValue(200); EXPECT_CALL(sink_, onHistogramComplete(Ref(h1), 100)); @@ -343,5 +466,146 @@ TEST_F(StatsThreadLocalStoreTest, ShuttingDown) { EXPECT_CALL(*this, free(_)).Times(5); } +// Histogram tests +TEST_F(HistogramTest, BasicSingleHistogramMerge) { + Histogram& h1 = store_->histogram("h1"); + EXPECT_EQ("h1", h1.name()); + + expectCallAndAccumulate(h1, 0); + expectCallAndAccumulate(h1, 43); + expectCallAndAccumulate(h1, 41); + expectCallAndAccumulate(h1, 415); + expectCallAndAccumulate(h1, 2201); + expectCallAndAccumulate(h1, 3201); + expectCallAndAccumulate(h1, 125); + expectCallAndAccumulate(h1, 13); + + EXPECT_EQ(1, validateMerge()); +} + 
+TEST_F(HistogramTest, BasicMultiHistogramMerge) { + Histogram& h1 = store_->histogram("h1"); + Histogram& h2 = store_->histogram("h2"); + EXPECT_EQ("h1", h1.name()); + EXPECT_EQ("h2", h2.name()); + + expectCallAndAccumulate(h1, 1); + expectCallAndAccumulate(h2, 1); + expectCallAndAccumulate(h2, 2); + + EXPECT_EQ(2, validateMerge()); +} + +TEST_F(HistogramTest, MultiHistogramMultipleMerges) { + Histogram& h1 = store_->histogram("h1"); + Histogram& h2 = store_->histogram("h2"); + EXPECT_EQ("h1", h1.name()); + EXPECT_EQ("h2", h2.name()); + + // Insert one value in to one histogram and validate + expectCallAndAccumulate(h1, 1); + EXPECT_EQ(2, validateMerge()); + + // Insert value into second histogram and validate that it is merged properly. + expectCallAndAccumulate(h2, 1); + EXPECT_EQ(2, validateMerge()); + + // Insert more values into both the histograms and validate that it is merged properly. + expectCallAndAccumulate(h1, 2); + EXPECT_EQ(2, validateMerge()); + + expectCallAndAccumulate(h2, 3); + EXPECT_EQ(2, validateMerge()); + + expectCallAndAccumulate(h2, 2); + EXPECT_EQ(2, validateMerge()); + + // Do not insert any value and validate that intervalSummary is empty for both the histograms and + // cumulativeSummary has right values. 
+ EXPECT_EQ(2, validateMerge()); +} + +TEST_F(HistogramTest, BasicScopeHistogramMerge) { + ScopePtr scope1 = store_->createScope("scope1."); + + Histogram& h1 = store_->histogram("h1"); + Histogram& h2 = scope1->histogram("h2"); + EXPECT_EQ("h1", h1.name()); + EXPECT_EQ("scope1.h2", h2.name()); + + expectCallAndAccumulate(h1, 2); + expectCallAndAccumulate(h2, 2); + EXPECT_EQ(2, validateMerge()); +} + +TEST_F(HistogramTest, BasicHistogramSummaryValidate) { + Histogram& h1 = store_->histogram("h1"); + Histogram& h2 = store_->histogram("h2"); + + expectCallAndAccumulate(h1, 1); + + EXPECT_EQ(2, validateMerge()); + + const std::string h1_expected_summary = + "P0: 1, P25: 1.025, P50: 1.05, P75: 1.075, P90: 1.09, P95: 1.095, " + "P99: 1.099, P99.9: 1.0999, P100: 1.1"; + const std::string h2_expected_summary = + "P0: 0, P25: 25, P50: 50, P75: 75, P90: 90, P95: 95, P99: 99, P99.9: 99.9, P100: 100"; + + for (size_t i = 0; i < 100; ++i) { + expectCallAndAccumulate(h2, i); + } + + EXPECT_EQ(2, validateMerge()); + + NameHistogramMap name_histogram_map = makeHistogramMap(store_->histograms()); + EXPECT_EQ(h1_expected_summary, name_histogram_map["h1"]->cumulativeStatistics().summary()); + EXPECT_EQ(h2_expected_summary, name_histogram_map["h2"]->cumulativeStatistics().summary()); +} + +// Validates the summary after known value merge in to same histogram. 
+TEST_F(HistogramTest, BasicHistogramMergeSummary) { + Histogram& h1 = store_->histogram("h1"); + + for (size_t i = 0; i < 50; ++i) { + expectCallAndAccumulate(h1, i); + } + EXPECT_EQ(1, validateMerge()); + + for (size_t i = 50; i < 100; ++i) { + expectCallAndAccumulate(h1, i); + } + EXPECT_EQ(1, validateMerge()); + + const std::string expected_summary = + "P0: 0, P25: 25, P50: 50, P75: 75, P90: 90, P95: 95, P99: 99, P99.9: 99.9, P100: 100"; + + NameHistogramMap name_histogram_map = makeHistogramMap(store_->histograms()); + EXPECT_EQ(expected_summary, name_histogram_map["h1"]->cumulativeStatistics().summary()); +} + +TEST_F(HistogramTest, BasicHistogramUsed) { + ScopePtr scope1 = store_->createScope("scope1."); + + Histogram& h1 = store_->histogram("h1"); + Histogram& h2 = scope1->histogram("h2"); + EXPECT_EQ("h1", h1.name()); + EXPECT_EQ("scope1.h2", h2.name()); + + EXPECT_CALL(sink_, onHistogramComplete(Ref(h1), 1)); + h1.recordValue(1); + + NameHistogramMap name_histogram_map = makeHistogramMap(store_->histograms()); + EXPECT_TRUE(name_histogram_map["h1"]->used()); + EXPECT_FALSE(name_histogram_map["h2"]->used()); + + EXPECT_CALL(sink_, onHistogramComplete(Ref(h2), 2)); + h2.recordValue(2); + + for (const Stats::ParentHistogramSharedPtr& histogram : store_->histograms()) { + EXPECT_TRUE(histogram->used()); + } +} + } // namespace Stats } // namespace Envoy diff --git a/test/common/thread_local/thread_local_impl_test.cc b/test/common/thread_local/thread_local_impl_test.cc index 8641bb6508b5..70f1ebdd1377 100644 --- a/test/common/thread_local/thread_local_impl_test.cc +++ b/test/common/thread_local/thread_local_impl_test.cc @@ -82,6 +82,42 @@ TEST_F(ThreadLocalInstanceImplTest, All) { tls_.shutdownThread(); } +// Validate ThreadLocal::runOnAllThreads behavior with all_thread_complete call back. 
+TEST_F(ThreadLocalInstanceImplTest, RunOnAllThreads) { + SlotPtr tlsptr = tls_.allocateSlot(); + + EXPECT_CALL(thread_dispatcher_, post(_)); + EXPECT_CALL(main_dispatcher_, post(_)); + + // Ensure that the thread local call back and all_thread_complete call back are called. + struct { + std::atomic thread_local_calls_{0}; + std::condition_variable condvar_; + std::mutex condvar_mutex_; + bool all_threads_complete_; + } thread_local_data; + + tlsptr->runOnAllThreads( + [&thread_local_data]() -> void { ++thread_local_data.thread_local_calls_; }, + [&thread_local_data]() -> void { + EXPECT_EQ(thread_local_data.thread_local_calls_, 1); + std::unique_lock lock(thread_local_data.condvar_mutex_); + thread_local_data.all_threads_complete_ = true; + thread_local_data.condvar_.notify_one(); + }); + + { + std::unique_lock lock(thread_local_data.condvar_mutex_); + thread_local_data.condvar_.wait( + lock, [&thread_local_data] { return thread_local_data.all_threads_complete_; }); + } + + EXPECT_TRUE(thread_local_data.all_threads_complete_); + + tls_.shutdownGlobalThreading(); + tls_.shutdownThread(); +} + // Validate ThreadLocal::InstanceImpl's dispatcher() behavior. 
TEST(ThreadLocalInstanceImplDispatcherTest, Dispatcher) { InstanceImpl tls; diff --git a/test/extensions/stats_sinks/metrics_service/grpc_metrics_service_impl_test.cc b/test/extensions/stats_sinks/metrics_service/grpc_metrics_service_impl_test.cc index b8adeb1700f6..42dba8622621 100644 --- a/test/extensions/stats_sinks/metrics_service/grpc_metrics_service_impl_test.cc +++ b/test/extensions/stats_sinks/metrics_service/grpc_metrics_service_impl_test.cc @@ -110,6 +110,10 @@ TEST(MetricsServiceSinkTest, CheckSendCall) { NiceMock gauge; gauge.name_ = "test_gauge"; sink.flushGauge(gauge, 1); + + NiceMock histogram; + histogram.name_ = "test_histogram"; + sink.flushHistogram(histogram); EXPECT_CALL(*streamer_, send(_)); sink.endFlush(); diff --git a/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc b/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc index c81aea51577c..907f6213cc67 100644 --- a/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc +++ b/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc @@ -68,6 +68,7 @@ class MetricsServiceIntegrationTest : public HttpIntegrationTest, request_msg.envoy_metrics(); bool known_counter_exists = false; bool known_gauge_exists = false; + bool known_histogram_exists = false; for (::io::prometheus::client::MetricFamily metrics_family : envoy_metrics) { if (metrics_family.name() == "cluster.cluster_0.membership_change" && metrics_family.type() == ::io::prometheus::client::MetricType::COUNTER) { @@ -79,13 +80,21 @@ class MetricsServiceIntegrationTest : public HttpIntegrationTest, known_gauge_exists = true; EXPECT_EQ(1, metrics_family.metric(0).gauge().value()); } + if (metrics_family.name() == "cluster.cluster_0.upstream_rq_time" && + metrics_family.type() == ::io::prometheus::client::MetricType::SUMMARY) { + known_histogram_exists = true; + Stats::HistogramStatisticsImpl empty_statistics; + 
EXPECT_EQ(metrics_family.metric(0).summary().quantile_size(), + empty_statistics.supportedQuantiles().size()); + } ASSERT(metrics_family.metric(0).has_timestamp_ms()); - if (known_counter_exists && known_gauge_exists) { + if (known_counter_exists && known_gauge_exists && known_histogram_exists) { break; } } EXPECT_TRUE(known_counter_exists); EXPECT_TRUE(known_gauge_exists); + EXPECT_TRUE(known_histogram_exists); } void cleanup() { @@ -102,18 +111,29 @@ class MetricsServiceIntegrationTest : public HttpIntegrationTest, INSTANTIATE_TEST_CASE_P(IpVersionsClientType, MetricsServiceIntegrationTest, GRPC_CLIENT_INTEGRATION_PARAMS); -// Test a basic full access logging flow. +// Test a basic metric service flow. TEST_P(MetricsServiceIntegrationTest, BasicFlow) { initialize(); + // Send an empty request so that histogram values merged for cluster_0. + codec_client_ = makeHttpConnection(makeClientConnection(lookupPort("http"))); + Http::TestHeaderMapImpl request_headers{{":method", "GET"}, + {":path", "/test/long/url"}, + {":scheme", "http"}, + {":authority", "host"}, + {"x-lyft-user-id", "123"}}; + sendRequestAndWaitForResponse(request_headers, 0, default_response_headers_, 0); + waitForMetricsServiceConnection(); waitForMetricsStream(); waitForMetricsRequest(); + // Send an empty response and end the stream. This should never happen but make sure nothing // breaks and we make a new stream on a follow up request. 
metrics_service_request_->startGrpcStream(); envoy::service::metrics::v2::StreamMetricsResponse response_msg; metrics_service_request_->sendGrpcMessage(response_msg); metrics_service_request_->finishGrpcStream(Grpc::Status::Ok); + switch (clientType()) { case Grpc::ClientType::EnvoyGrpc: test_server_->waitForGaugeEq("cluster.metrics_service.upstream_rq_active", 0); diff --git a/test/integration/server.h b/test/integration/server.h index 15591dcd213c..117dd6cae051 100644 --- a/test/integration/server.h +++ b/test/integration/server.h @@ -176,11 +176,17 @@ class TestIsolatedStoreImpl : public StoreRoot { return store_.gauges(); } + std::list histograms() const override { + std::unique_lock lock(lock_); + return store_.histograms(); + } + // Stats::StoreRoot void addSink(Sink&) override {} void setTagProducer(TagProducerPtr&&) override {} void initializeThreading(Event::Dispatcher&, ThreadLocal::Instance&) override {} void shutdownThreading() override {} + void mergeHistograms(PostMergeCb) override {} private: mutable std::mutex lock_; diff --git a/test/mocks/stats/mocks.cc b/test/mocks/stats/mocks.cc index cb107c441ce3..a62ff79f8faf 100644 --- a/test/mocks/stats/mocks.cc +++ b/test/mocks/stats/mocks.cc @@ -5,6 +5,7 @@ using testing::Invoke; using testing::NiceMock; +using testing::Return; using testing::ReturnRef; using testing::_; @@ -34,8 +35,18 @@ MockHistogram::MockHistogram() { ON_CALL(*this, tagExtractedName()).WillByDefault(ReturnRef(name_)); ON_CALL(*this, tags()).WillByDefault(ReturnRef(tags_)); } + MockHistogram::~MockHistogram() {} +MockParentHistogram::MockParentHistogram() { + ON_CALL(*this, tagExtractedName()).WillByDefault(ReturnRef(name_)); + ON_CALL(*this, tags()).WillByDefault(ReturnRef(tags_)); + ON_CALL(*this, intervalStatistics()).WillByDefault(ReturnRef(*histogram_stats_)); + ON_CALL(*this, cumulativeStatistics()).WillByDefault(ReturnRef(*histogram_stats_)); +} + +MockParentHistogram::~MockParentHistogram() {} + MockSink::MockSink() {} 
MockSink::~MockSink() {} diff --git a/test/mocks/stats/mocks.h b/test/mocks/stats/mocks.h index d70478d5b6a3..7012b96f4750 100644 --- a/test/mocks/stats/mocks.h +++ b/test/mocks/stats/mocks.h @@ -68,10 +68,34 @@ class MockHistogram : public Histogram { MOCK_CONST_METHOD0(tagExtractedName, const std::string&()); MOCK_CONST_METHOD0(tags, const std::vector&()); MOCK_METHOD1(recordValue, void(uint64_t value)); + MOCK_CONST_METHOD0(used, bool()); + + std::string name_; + std::vector tags_; + Store* store_; +}; + +class MockParentHistogram : public ParentHistogram { +public: + MockParentHistogram(); + ~MockParentHistogram(); + + // Note: cannot be mocked because it is accessed as a Property in a gmock EXPECT_CALL. This + // creates a deadlock in gmock and is an unintended use of mock functions. + const std::string& name() const override { return name_; }; + void merge() override {} + + MOCK_CONST_METHOD0(used, bool()); + MOCK_CONST_METHOD0(tagExtractedName, const std::string&()); + MOCK_CONST_METHOD0(tags, const std::vector&()); + MOCK_CONST_METHOD0(cumulativeStatistics, const HistogramStatistics&()); + MOCK_CONST_METHOD0(intervalStatistics, const HistogramStatistics&()); std::string name_; std::vector tags_; Store* store_; + std::shared_ptr histogram_stats_ = + std::make_shared(); }; class MockSink : public Sink { @@ -82,6 +106,7 @@ class MockSink : public Sink { MOCK_METHOD0(beginFlush, void()); MOCK_METHOD2(flushCounter, void(const Counter& counter, uint64_t delta)); MOCK_METHOD2(flushGauge, void(const Gauge& gauge, uint64_t value)); + MOCK_METHOD1(flushHistogram, void(const ParentHistogram& histogram)); MOCK_METHOD0(endFlush, void()); MOCK_METHOD2(onHistogramComplete, void(const Histogram& histogram, uint64_t value)); }; @@ -100,6 +125,7 @@ class MockStore : public Store { MOCK_METHOD1(gauge, Gauge&(const std::string&)); MOCK_CONST_METHOD0(gauges, std::list()); MOCK_METHOD1(histogram, Histogram&(const std::string& name)); + MOCK_CONST_METHOD0(histograms, 
std::list()); testing::NiceMock counter_; std::vector> histograms_; diff --git a/test/mocks/thread_local/mocks.h b/test/mocks/thread_local/mocks.h index fb9d3ab01331..af871695420d 100644 --- a/test/mocks/thread_local/mocks.h +++ b/test/mocks/thread_local/mocks.h @@ -28,6 +28,11 @@ class MockInstance : public Instance { SlotPtr allocateSlot_() { return SlotPtr{new SlotImpl(*this, current_slot_++)}; } void runOnAllThreads_(Event::PostCb cb) { cb(); } + void runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback) { + cb(); + main_callback(); + } + void shutdownThread_() { shutdown_ = true; // Reverse order which is same as the production code. @@ -53,6 +58,9 @@ class MockInstance : public Instance { // ThreadLocal::Slot ThreadLocalObjectSharedPtr get() override { return parent_.data_[index_]; } void runOnAllThreads(Event::PostCb cb) override { parent_.runOnAllThreads(cb); } + void runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback) override { + parent_.runOnAllThreads(cb, main_callback); + } void set(InitializeCb cb) override { parent_.data_[index_] = cb(parent_.dispatcher_); } MockInstance& parent_; diff --git a/test/server/server_test.cc b/test/server/server_test.cc index 80f62cb4c918..06b0afe019ec 100644 --- a/test/server/server_test.cc +++ b/test/server/server_test.cc @@ -36,7 +36,7 @@ TEST(ServerInstanceUtil, flushHelper) { std::list sinks; sinks.emplace_back(std::move(sink)); - InstanceUtil::flushCountersAndGaugesToSinks(sinks, store); + InstanceUtil::flushMetricsToSinks(sinks, store); } class RunHelperTest : public testing::Test { From 5ba7949b516ad2487c4cd0bd9c6f11e0646edd87 Mon Sep 17 00:00:00 2001 From: Rama Date: Thu, 19 Apr 2018 20:12:02 +0530 Subject: [PATCH 02/16] addressed review comments Signed-off-by: Rama --- source/server/http/admin.cc | 2 +- source/server/server.cc | 4 +- test/common/stats/thread_local_store_test.cc | 55 +++++++++----------- 3 files changed, 28 insertions(+), 33 deletions(-) diff --git 
a/source/server/http/admin.cc b/source/server/http/admin.cc index e2996bb9ce2a..66d1c32bf6c8 100644 --- a/source/server/http/admin.cc +++ b/source/server/http/admin.cc @@ -515,7 +515,7 @@ AdminImpl::statsAsJson(const std::map& all_stats, stats_array.PushBack(stat_obj, allocator); } - for (Stats::ParentHistogramSharedPtr histogram : all_histograms) { + for (const Stats::ParentHistogramSharedPtr& histogram : all_histograms) { Value histogram_obj; histogram_obj.SetObject(); Value histogram_name; diff --git a/source/server/server.cc b/source/server/server.cc index 04f91e3aeadf..d2f499216b72 100644 --- a/source/server/server.cc +++ b/source/server/server.cc @@ -137,7 +137,8 @@ void InstanceUtil::flushMetricsToSinks(const std::list& sinks, void InstanceImpl::flushStats() { ENVOY_LOG(debug, "flushing stats"); - // TODO(ramaraochavali): consider adding different flush interval for histograms. + // A shutdown initiated before this callback may prevent this from being called as per + // the semantics documented in ThreadLocal's runAllThreads method. stats_store_.mergeHistograms([this]() -> void { HotRestart::GetParentStatsInfo info; restarter_.getParentStats(info); @@ -150,6 +151,7 @@ void InstanceImpl::flushStats() { server_stats_->days_until_first_cert_expiring_.set( sslContextManager().daysUntilFirstCertExpires()); InstanceUtil::flushMetricsToSinks(config_->statsSinks(), stats_store_); + // TODO(ramaraochavali): consider adding different flush interval for histograms. 
stat_flush_timer_->enableTimer(config_->statsFlushInterval()); }); } diff --git a/test/common/stats/thread_local_store_test.cc b/test/common/stats/thread_local_store_test.cc index 5e76ebd116b7..962fe264eeff 100644 --- a/test/common/stats/thread_local_store_test.cc +++ b/test/common/stats/thread_local_store_test.cc @@ -93,8 +93,10 @@ class StatsThreadLocalStoreTest : public testing::Test, public RawStatDataAlloca class HistogramTest : public testing::Test, public RawStatDataAllocator { public: + typedef std::map NameHistogramMap; + InSequence s; + void SetUp() override { - InSequence s; ON_CALL(*this, alloc(_)).WillByDefault(Invoke([this](const std::string& name) -> RawStatData* { return alloc_.alloc(name); })); @@ -116,10 +118,8 @@ class HistogramTest : public testing::Test, public RawStatDataAllocator { EXPECT_CALL(*this, free(_)); } - std::vector h1_cumulative_values, h2_cumulative_values, h1_interval_values, - h2_interval_values; - - typedef std::map NameHistogramMap; + std::vector h1_cumulative_values_, h2_cumulative_values_, h1_interval_values_, + h2_interval_values_; NameHistogramMap makeHistogramMap(const std::list& hist_list) { NameHistogramMap name_histogram_map; @@ -136,32 +136,17 @@ class HistogramTest : public testing::Test, public RawStatDataAllocator { * that can be asserted later. 
*/ uint64_t validateMerge() { - std::atomic merge_called{false}; + bool merge_called = false; store_->mergeHistograms([&merge_called]() -> void { merge_called = true; }); EXPECT_TRUE(merge_called); std::list histogram_list = store_->histograms(); - histogram_t* hist1_cumulative = hist_alloc(); - for (uint64_t value : h1_cumulative_values) { - hist_insert_intscale(hist1_cumulative, value, 0, 1); - } - - histogram_t* hist2_cumulative = hist_alloc(); - for (uint64_t value : h2_cumulative_values) { - hist_insert_intscale(hist2_cumulative, value, 0, 1); - } - - histogram_t* hist1_interval = hist_alloc(); - for (uint64_t value : h1_interval_values) { - hist_insert_intscale(hist1_interval, value, 0, 1); - } - - histogram_t* hist2_interval = hist_alloc(); - for (uint64_t value : h2_interval_values) { - hist_insert_intscale(hist2_interval, value, 0, 1); - } + histogram_t* hist1_cumulative = makeHistogram(h1_cumulative_values_); + histogram_t* hist2_cumulative = makeHistogram(h2_cumulative_values_); + histogram_t* hist1_interval = makeHistogram(h1_interval_values_); + histogram_t* hist2_interval = makeHistogram(h2_interval_values_); HistogramStatisticsImpl h1_cumulative_statistics(hist1_cumulative); HistogramStatisticsImpl h2_cumulative_statistics(hist2_cumulative); @@ -184,8 +169,8 @@ class HistogramTest : public testing::Test, public RawStatDataAllocator { hist_free(hist1_interval); hist_free(hist2_interval); - h1_interval_values.clear(); - h2_interval_values.clear(); + h1_interval_values_.clear(); + h2_interval_values_.clear(); return histogram_list.size(); } @@ -195,12 +180,20 @@ class HistogramTest : public testing::Test, public RawStatDataAllocator { histogram.recordValue(record_value); if (histogram.name() == "h1") { - h1_cumulative_values.push_back(record_value); - h1_interval_values.push_back(record_value); + h1_cumulative_values_.push_back(record_value); + h1_interval_values_.push_back(record_value); } else { - h2_cumulative_values.push_back(record_value); - 
h2_interval_values.push_back(record_value); + h2_cumulative_values_.push_back(record_value); + h2_interval_values_.push_back(record_value); + } + } + + histogram_t* makeHistogram(const std::vector& values) { + histogram_t* histogram = hist_alloc(); + for (uint64_t value : values) { + hist_insert_intscale(histogram, value, 0, 1); } + return histogram; } MOCK_METHOD1(alloc, RawStatData*(const std::string& name)); From 5fa7ea671af46dfb643978be66d152717b683436 Mon Sep 17 00:00:00 2001 From: Rama Date: Fri, 20 Apr 2018 16:56:10 +0530 Subject: [PATCH 03/16] address review comments,updated release notes Signed-off-by: Rama --- docs/root/intro/version_history.rst | 1 + docs/root/operations/admin.rst | 10 ++-- include/envoy/stats/stats.h | 5 +- include/envoy/thread_local/thread_local.h | 4 +- source/common/stats/stats_impl.cc | 1 + source/common/stats/stats_impl.h | 5 +- source/common/stats/thread_local_store.cc | 16 +++-- source/common/stats/thread_local_store.h | 10 ++-- .../grpc_metrics_service_impl.cc | 8 +-- source/server/http/admin.cc | 1 + .../thread_local/thread_local_impl_test.cc | 59 +++++++++++++++---- 11 files changed, 79 insertions(+), 41 deletions(-) diff --git a/docs/root/intro/version_history.rst b/docs/root/intro/version_history.rst index b91e5f614d92..845e317abb83 100644 --- a/docs/root/intro/version_history.rst +++ b/docs/root/intro/version_history.rst @@ -44,6 +44,7 @@ Version history :ref:`cluster specific ` options. * sockets: added `IP_TRANSPARENT` socket option support for :ref:`listeners `. +* stats: added support for histograms. * tracing: the sampling decision is now delegated to the tracers, allowing the tracer to decide when and if to use it. For example, if the :ref:`x-b3-sampled ` header is supplied with the client request, its value will override any sampling decision made by the Envoy proxy. 
diff --git a/docs/root/operations/admin.rst b/docs/root/operations/admin.rst index 69ce90f25c59..88627320cbce 100644 --- a/docs/root/operations/admin.rst +++ b/docs/root/operations/admin.rst @@ -181,10 +181,12 @@ The fields are: .. http:get:: /stats - Outputs all statistics on demand. This includes only counters and gauges. Histograms are not - output as Envoy currently has no built in histogram support and relies on statsd for - aggregation. This command is very useful for local debugging. See :ref:`here ` - for more information. + Outputs all statistics on demand. This command is very useful for local debugging. + Histograms will output the computed statistics of it like P0,P50,P75 etc. The output + for each quantile will be in the form of (inteval,cumulative) where interval value + represents the summary since last flush interval and cumulative value represents the + summary since the start of envoy instance. + See :ref:`here ` for more information. .. http:get:: /stats?format=json diff --git a/include/envoy/stats/stats.h b/include/envoy/stats/stats.h index 1526c9f673cb..37e0788a1365 100644 --- a/include/envoy/stats/stats.h +++ b/include/envoy/stats/stats.h @@ -199,15 +199,14 @@ class Histogram : public virtual Metric { typedef std::shared_ptr HistogramSharedPtr; /** - * A histogram that is stored in main thread, manages all thread local histograms and provides - * summary view of the histogram. + * A histogram that is stored in main thread and provides summary view of the histogram. */ class ParentHistogram : public virtual Metric { public: virtual ~ParentHistogram() {} /** - * This method is called during the main stats flush process for each of the histogram and used + * This method is called during the main stats flush process for each of the histograms and used * to merge the histogram values. 
*/ virtual void merge() PURE; diff --git a/include/envoy/thread_local/thread_local.h b/include/envoy/thread_local/thread_local.h index f349715e94ff..c6eb0b54bb4a 100644 --- a/include/envoy/thread_local/thread_local.h +++ b/include/envoy/thread_local/thread_local.h @@ -50,8 +50,8 @@ class Slot { * Run a callback on all registered threads with a barrier. A shutdown initiated during the * running of the PostCBs may prevent all_threads_complete_cb from being called. * @param cb supplies the callback to run on each thread. - * @param all_threads_complete_cb supplies the callback to run on main thread after threads are - * done. + * @param all_threads_complete_cb supplies the callback to run on main thread after cb has + * been run on all registered threads. */ virtual void runOnAllThreads(Event::PostCb cb, Event::PostCb all_threads_complete_cb) PURE; diff --git a/source/common/stats/stats_impl.cc b/source/common/stats/stats_impl.cc index 2dce5d6be2e4..8e67c38390dd 100644 --- a/source/common/stats/stats_impl.cc +++ b/source/common/stats/stats_impl.cc @@ -288,6 +288,7 @@ const std::vector& HistogramStatisticsImpl::supportedQuantiles() const { std::string HistogramStatisticsImpl::summary() const { std::vector summary; const std::vector& supported_quantiles_ref = supportedQuantiles(); + summary.reserve(supported_quantiles_ref.size()); for (size_t i = 0; i < supported_quantiles_ref.size(); ++i) { summary.push_back( fmt::format("P{}: {}", 100 * supported_quantiles_ref[i], computed_quantiles_[i])); diff --git a/source/common/stats/stats_impl.h b/source/common/stats/stats_impl.h index fb609a5fa0b9..424ff8d2bb93 100644 --- a/source/common/stats/stats_impl.h +++ b/source/common/stats/stats_impl.h @@ -375,12 +375,13 @@ class HistogramStatisticsImpl : public HistogramStatistics, NonCopyable { */ HistogramStatisticsImpl(const histogram_t* histogram_ptr); + void refresh(const histogram_t* new_histogram_ptr); + + // HistogramStatistics std::string summary() const override; const 
std::vector& supportedQuantiles() const override; const std::vector& computedQuantiles() const override { return computed_quantiles_; } - void refresh(const histogram_t* new_histogram_ptr); - private: std::vector computed_quantiles_; }; diff --git a/source/common/stats/thread_local_store.cc b/source/common/stats/thread_local_store.cc index 56d7f5539325..537763151687 100644 --- a/source/common/stats/thread_local_store.cc +++ b/source/common/stats/thread_local_store.cc @@ -76,7 +76,7 @@ std::list ThreadLocalStoreImpl::histograms() const { if (names.insert(hist_name).second) { ret.push_back(parent_hist); } else { - ENVOY_LOG(warn, "duplicate histogram {}.{}: data loss will occur on output", scope->prefix_, + ENVOY_LOG(warn, "duplicate histogram {}{}: data loss will occur on output", scope->prefix_, hist_name); } } @@ -305,7 +305,7 @@ void ThreadLocalHistogramImpl::recordValue(uint64_t value) { } void ThreadLocalHistogramImpl::merge(histogram_t* target) { - uint64_t other_histogram_index = otherHistogramIndex(); + const uint64_t other_histogram_index = otherHistogramIndex(); hist_accumulate(target, &histograms_[other_histogram_index], 1); hist_clear(histograms_[other_histogram_index]); } @@ -318,7 +318,7 @@ ParentHistogramImpl::ParentHistogramImpl(const std::string& name, Store& parent, bool ParentHistogramImpl::used() const { std::unique_lock lock(merge_lock_); - return usedWorker(); + return usedLockHeld(); } ParentHistogramImpl::~ParentHistogramImpl() { @@ -328,7 +328,7 @@ ParentHistogramImpl::~ParentHistogramImpl() { void ParentHistogramImpl::merge() { std::unique_lock lock(merge_lock_); - if (usedWorker()) { + if (usedLockHeld()) { hist_clear(interval_histogram_); for (const TlsHistogramSharedPtr& tls_histogram : tls_histograms_) { tls_histogram->merge(interval_histogram_); @@ -346,15 +346,13 @@ void ParentHistogramImpl::addTlsHistogram(const TlsHistogramSharedPtr& hist_ptr) tls_histograms_.emplace_back(hist_ptr); } -bool ParentHistogramImpl::usedWorker() const { 
- bool any_tls_used = false; +bool ParentHistogramImpl::usedLockHeld() const { for (const TlsHistogramSharedPtr& tls_histogram : tls_histograms_) { if (tls_histogram->used()) { - any_tls_used = true; - break; + return true; } } - return any_tls_used; + return false; } } // namespace Stats diff --git a/source/common/stats/thread_local_store.h b/source/common/stats/thread_local_store.h index 1be78bd17d51..98777a1cc157 100644 --- a/source/common/stats/thread_local_store.h +++ b/source/common/stats/thread_local_store.h @@ -43,15 +43,14 @@ class ThreadLocalHistogramImpl : public ThreadLocalHistogram, public MetricImpl ~ThreadLocalHistogramImpl(); // Stats::Histogram void recordValue(uint64_t value) override; + void merge(histogram_t* target); bool used() const override { return flags_ & Flags::Used; } void beginMerge() override { - // this switches the current_active_ between 1 and 0. + // This switches the current_active_ between 1 and 0. current_active_ = otherHistogramIndex(); } - void merge(histogram_t* target); - Store& parent_; private: @@ -72,6 +71,7 @@ class ParentHistogramImpl : public ParentHistogram, public MetricImpl { std::vector&& tags); virtual ~ParentHistogramImpl(); + void addTlsHistogram(const TlsHistogramSharedPtr& hist_ptr); bool used() const override; @@ -88,13 +88,11 @@ class ParentHistogramImpl : public ParentHistogram, public MetricImpl { return cumulative_statistics_; } - void addTlsHistogram(const TlsHistogramSharedPtr& hist_ptr); - Store& parent_; std::list tls_histograms_; private: - bool usedWorker() const; + bool usedLockHeld() const; histogram_t* interval_histogram_; histogram_t* cumulative_histogram_; diff --git a/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.cc b/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.cc index 010557966291..d4c4e9d07970 100644 --- a/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.cc +++ 
b/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.cc @@ -88,12 +88,10 @@ void MetricsServiceSink::flushHistogram(const Stats::ParentHistogram& histogram) metric->set_timestamp_ms(std::chrono::system_clock::now().time_since_epoch().count()); auto* summary_metric = metric->mutable_summary(); const Stats::HistogramStatistics& hist_stats = histogram.intervalStatistics(); - size_t index = 0; - for (double supported_quantile : hist_stats.supportedQuantiles()) { + for (size_t i = 0; i < hist_stats.supportedQuantiles().size(); i++) { auto* quantile = summary_metric->add_quantile(); - quantile->set_quantile(supported_quantile); - quantile->set_value(hist_stats.computedQuantiles()[index]); - index++; + quantile->set_quantile(hist_stats.supportedQuantiles()[i]); + quantile->set_value(hist_stats.computedQuantiles()[i]); } } diff --git a/source/server/http/admin.cc b/source/server/http/admin.cc index 66d1c32bf6c8..e521615f0f21 100644 --- a/source/server/http/admin.cc +++ b/source/server/http/admin.cc @@ -469,6 +469,7 @@ std::string PrometheusStatsFormatter::metricName(const std::string& extractedNam return fmt::format("envoy_{0}", sanitizeName(extractedName)); } +// TODO(ramaraochavali): Add summary histogram output for Prometheus. uint64_t PrometheusStatsFormatter::statsAsPrometheus(const std::list& counters, const std::list& gauges, diff --git a/test/common/thread_local/thread_local_impl_test.cc b/test/common/thread_local/thread_local_impl_test.cc index 70f1ebdd1377..53e51e994f06 100644 --- a/test/common/thread_local/thread_local_impl_test.cc +++ b/test/common/thread_local/thread_local_impl_test.cc @@ -92,30 +92,69 @@ TEST_F(ThreadLocalInstanceImplTest, RunOnAllThreads) { // Ensure that the thread local call back and all_thread_complete call back are called. 
struct { std::atomic thread_local_calls_{0}; - std::condition_variable condvar_; - std::mutex condvar_mutex_; - bool all_threads_complete_; + bool all_threads_complete_ = false; } thread_local_data; tlsptr->runOnAllThreads( [&thread_local_data]() -> void { ++thread_local_data.thread_local_calls_; }, [&thread_local_data]() -> void { EXPECT_EQ(thread_local_data.thread_local_calls_, 1); - std::unique_lock lock(thread_local_data.condvar_mutex_); thread_local_data.all_threads_complete_ = true; - thread_local_data.condvar_.notify_one(); }); + EXPECT_TRUE(thread_local_data.all_threads_complete_); + + tls_.shutdownGlobalThreading(); + tls_.shutdownThread(); +} + +// Validate ThreadLocal::runOnAllThreads behavior with real threads. +TEST(RunOnAllThreadsTest, RunOnAllThreads) { + InstanceImpl tls; + Event::DispatcherImpl main_dispatcher; + Event::DispatcherImpl thread_dispatcher_1, thread_dispatcher_2; + SlotPtr tlsptr = tls.allocateSlot(); + + tls.registerThread(main_dispatcher, true); + tls.registerThread(thread_dispatcher_1, false); + tls.registerThread(thread_dispatcher_2, false); + + main_dispatcher.run(Event::Dispatcher::RunType::Block); + EXPECT_EQ(&main_dispatcher, &tls.dispatcher()); + + Thread::Thread( + [&thread_dispatcher_1]() { thread_dispatcher_1.run(Event::Dispatcher::RunType::Block); }); + + Thread::Thread( + [&thread_dispatcher_2]() { thread_dispatcher_2.run(Event::Dispatcher::RunType::Block); }); + + struct { + std::atomic thread_local_calls_{0}; + std::condition_variable condvar_; + std::mutex condvar_mutex_; + bool all_threads_complete_; + } thread_local_data; + + tlsptr->runOnAllThreads( + [&thread_local_data, &tls]() -> void { + std::unique_lock lock(thread_local_data.condvar_mutex_); + ++thread_local_data.thread_local_calls_; + thread_local_data.condvar_.notify_one(); + tls.dispatcher().exit(); + }, + [&thread_local_data, &tls]() -> void { + thread_local_data.all_threads_complete_ = true; + tls.dispatcher().exit(); + }); { std::unique_lock 
lock(thread_local_data.condvar_mutex_); thread_local_data.condvar_.wait( - lock, [&thread_local_data] { return thread_local_data.all_threads_complete_; }); + lock, [&thread_local_data] { return (thread_local_data.thread_local_calls_ == 2); }); } + EXPECT_EQ(2, thread_local_data.thread_local_calls_); - EXPECT_TRUE(thread_local_data.all_threads_complete_); - - tls_.shutdownGlobalThreading(); - tls_.shutdownThread(); + tls.shutdownGlobalThreading(); + tls.shutdownThread(); } // Validate ThreadLocal::InstanceImpl's dispatcher() behavior. From 0c0f8ef1d88686ab88cec6ec222bbf030f2ff561 Mon Sep 17 00:00:00 2001 From: Rama Date: Fri, 20 Apr 2018 18:02:10 +0530 Subject: [PATCH 04/16] removed the real thread test Signed-off-by: Rama --- .../thread_local/thread_local_impl_test.cc | 51 +------------------ 1 file changed, 1 insertion(+), 50 deletions(-) diff --git a/test/common/thread_local/thread_local_impl_test.cc b/test/common/thread_local/thread_local_impl_test.cc index 53e51e994f06..59519891363e 100644 --- a/test/common/thread_local/thread_local_impl_test.cc +++ b/test/common/thread_local/thread_local_impl_test.cc @@ -91,7 +91,7 @@ TEST_F(ThreadLocalInstanceImplTest, RunOnAllThreads) { // Ensure that the thread local call back and all_thread_complete call back are called. struct { - std::atomic thread_local_calls_{0}; + uint64_t thread_local_calls_{0}; bool all_threads_complete_ = false; } thread_local_data; @@ -108,55 +108,6 @@ TEST_F(ThreadLocalInstanceImplTest, RunOnAllThreads) { tls_.shutdownThread(); } -// Validate ThreadLocal::runOnAllThreads behavior with real threads. 
-TEST(RunOnAllThreadsTest, RunOnAllThreads) { - InstanceImpl tls; - Event::DispatcherImpl main_dispatcher; - Event::DispatcherImpl thread_dispatcher_1, thread_dispatcher_2; - SlotPtr tlsptr = tls.allocateSlot(); - - tls.registerThread(main_dispatcher, true); - tls.registerThread(thread_dispatcher_1, false); - tls.registerThread(thread_dispatcher_2, false); - - main_dispatcher.run(Event::Dispatcher::RunType::Block); - EXPECT_EQ(&main_dispatcher, &tls.dispatcher()); - - Thread::Thread( - [&thread_dispatcher_1]() { thread_dispatcher_1.run(Event::Dispatcher::RunType::Block); }); - - Thread::Thread( - [&thread_dispatcher_2]() { thread_dispatcher_2.run(Event::Dispatcher::RunType::Block); }); - - struct { - std::atomic thread_local_calls_{0}; - std::condition_variable condvar_; - std::mutex condvar_mutex_; - bool all_threads_complete_; - } thread_local_data; - - tlsptr->runOnAllThreads( - [&thread_local_data, &tls]() -> void { - std::unique_lock lock(thread_local_data.condvar_mutex_); - ++thread_local_data.thread_local_calls_; - thread_local_data.condvar_.notify_one(); - tls.dispatcher().exit(); - }, - [&thread_local_data, &tls]() -> void { - thread_local_data.all_threads_complete_ = true; - tls.dispatcher().exit(); - }); - { - std::unique_lock lock(thread_local_data.condvar_mutex_); - thread_local_data.condvar_.wait( - lock, [&thread_local_data] { return (thread_local_data.thread_local_calls_ == 2); }); - } - EXPECT_EQ(2, thread_local_data.thread_local_calls_); - - tls.shutdownGlobalThreading(); - tls.shutdownThread(); -} - // Validate ThreadLocal::InstanceImpl's dispatcher() behavior. 
TEST(ThreadLocalInstanceImplDispatcherTest, Dispatcher) { InstanceImpl tls; From 15f48f1282d56a9f28e82ad63738eae1700ef489 Mon Sep 17 00:00:00 2001 From: Rama Date: Fri, 20 Apr 2018 19:56:05 +0530 Subject: [PATCH 05/16] correct the docs Signed-off-by: Rama --- docs/root/operations/admin.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/root/operations/admin.rst b/docs/root/operations/admin.rst index 88627320cbce..adfe07c711d0 100644 --- a/docs/root/operations/admin.rst +++ b/docs/root/operations/admin.rst @@ -182,8 +182,8 @@ The fields are: .. http:get:: /stats Outputs all statistics on demand. This command is very useful for local debugging. - Histograms will output the computed statistics of it like P0,P50,P75 etc. The output - for each quantile will be in the form of (inteval,cumulative) where interval value + Histograms will output the computed quantiles, i.e. P0, P25, P50, P75, P90, P99, P99.9 and P100. + The output for each quantile will be in the form of (interval,cumulative) where interval value represents the summary since last flush interval and cumulative value represents the summary since the start of envoy instance. See :ref:`here ` for more information. 
From fb32a09ec8a8a66973652862f1768ecee6a7929c Mon Sep 17 00:00:00 2001 From: Rama Date: Sat, 21 Apr 2018 17:29:19 +0530 Subject: [PATCH 06/16] moved the logic of TLS histogram creation to recordValue Signed-off-by: Rama --- include/envoy/stats/stats.h | 2 +- source/common/stats/thread_local_store.cc | 65 ++++++++++++------- source/common/stats/thread_local_store.h | 67 +++++++++++--------- test/common/stats/thread_local_store_test.cc | 1 + test/mocks/stats/mocks.cc | 5 ++ test/mocks/stats/mocks.h | 1 + 6 files changed, 86 insertions(+), 55 deletions(-) diff --git a/include/envoy/stats/stats.h b/include/envoy/stats/stats.h index 37e0788a1365..7993b7cd4f6e 100644 --- a/include/envoy/stats/stats.h +++ b/include/envoy/stats/stats.h @@ -201,7 +201,7 @@ typedef std::shared_ptr HistogramSharedPtr; /** * A histogram that is stored in main thread and provides summary view of the histogram. */ -class ParentHistogram : public virtual Metric { +class ParentHistogram : public virtual Histogram { public: virtual ~ParentHistogram() {} diff --git a/source/common/stats/thread_local_store.cc b/source/common/stats/thread_local_store.cc index 537763151687..12f223afc3fb 100644 --- a/source/common/stats/thread_local_store.cc +++ b/source/common/stats/thread_local_store.cc @@ -253,15 +253,6 @@ Histogram& ThreadLocalStoreImpl::ScopeImpl::histogram(const std::string& name) { // See comments in counter(). There is no super clean way (via templates or otherwise) to // share this code so I'm leaving it largely duplicated for now. 
std::string final_name = prefix_ + name; - TlsHistogramSharedPtr* tls_ref = nullptr; - if (!parent_.shutting_down_ && parent_.tls_) { - tls_ref = &parent_.tls_->getTyped().scope_cache_[this].histograms_[final_name]; - } - - if (tls_ref && *tls_ref) { - return **tls_ref; - } - std::unique_lock lock(parent_.lock_); ParentHistogramImplSharedPtr& central_ref = central_cache_.histograms_[final_name]; @@ -271,11 +262,31 @@ Histogram& ThreadLocalStoreImpl::ScopeImpl::histogram(const std::string& name) { // Since MetricImpl only has move constructor, we are explicitly copying here. std::string central_tag_extracted_name(tag_extracted_name); std::vector central_tags(tags); - central_ref.reset(new ParentHistogramImpl( - final_name, parent_, std::move(central_tag_extracted_name), std::move(central_tags))); + central_ref.reset(new ParentHistogramImpl(final_name, parent_, *this, + std::move(central_tag_extracted_name), + std::move(central_tags))); + } + return *central_ref; +} + +Histogram& ThreadLocalStoreImpl::ScopeImpl::tlsHistogram(const std::string& name) { + // Here prefix will not be considered because, by the time ParentHistogram calls this method + // during recordValue, the prefix is already attached to the name. 
+ TlsHistogramSharedPtr* tls_ref = nullptr; + if (!parent_.shutting_down_ && parent_.tls_) { + tls_ref = &parent_.tls_->getTyped().scope_cache_[this].histograms_[name]; + } + + if (tls_ref && *tls_ref) { + return **tls_ref; } - TlsHistogramSharedPtr hist_tls_ptr(new ThreadLocalHistogramImpl( - final_name, parent_, std::move(tag_extracted_name), std::move(tags))); + + std::unique_lock lock(parent_.lock_); + std::vector tags; + std::string tag_extracted_name = parent_.getTagsForName(name, tags); + TlsHistogramSharedPtr hist_tls_ptr( + new ThreadLocalHistogramImpl(name, std::move(tag_extracted_name), std::move(tags))); + ParentHistogramImplSharedPtr& central_ref = central_cache_.histograms_[name]; central_ref->addTlsHistogram(hist_tls_ptr); if (tls_ref) { @@ -284,11 +295,11 @@ Histogram& ThreadLocalStoreImpl::ScopeImpl::histogram(const std::string& name) { return *hist_tls_ptr; } -ThreadLocalHistogramImpl::ThreadLocalHistogramImpl(const std::string& name, Store& parent, +ThreadLocalHistogramImpl::ThreadLocalHistogramImpl(const std::string& name, std::string&& tag_extracted_name, std::vector&& tags) - : MetricImpl(name, std::move(tag_extracted_name), std::move(tags)), parent_(parent), - current_active_(0), flags_(0) { + : MetricImpl(name, std::move(tag_extracted_name), std::move(tags)), current_active_(0), + flags_(0), created_thread_id_(std::this_thread::get_id()) { histograms_[0] = hist_alloc(); histograms_[1] = hist_alloc(); } @@ -299,8 +310,8 @@ ThreadLocalHistogramImpl::~ThreadLocalHistogramImpl() { } void ThreadLocalHistogramImpl::recordValue(uint64_t value) { + ASSERT(std::this_thread::get_id() == created_thread_id_); hist_insert_intscale(histograms_[current_active_], value, 0, 1); - parent_.deliverHistogramToSinks(*this, value); flags_ |= Flags::Used; } @@ -310,22 +321,28 @@ void ThreadLocalHistogramImpl::merge(histogram_t* target) { hist_clear(histograms_[other_histogram_index]); } -ParentHistogramImpl::ParentHistogramImpl(const std::string& name, Store& 
parent, +ParentHistogramImpl::ParentHistogramImpl(const std::string& name, Store& parent, TlsScope& tlsScope, std::string&& tag_extracted_name, std::vector&& tags) : MetricImpl(name, std::move(tag_extracted_name), std::move(tags)), parent_(parent), - interval_histogram_(hist_alloc()), cumulative_histogram_(hist_alloc()), + tlsScope_(tlsScope), interval_histogram_(hist_alloc()), cumulative_histogram_(hist_alloc()), interval_statistics_(interval_histogram_), cumulative_statistics_(cumulative_histogram_) {} -bool ParentHistogramImpl::used() const { - std::unique_lock lock(merge_lock_); - return usedLockHeld(); -} - ParentHistogramImpl::~ParentHistogramImpl() { hist_free(interval_histogram_); hist_free(cumulative_histogram_); } +void ParentHistogramImpl::recordValue(uint64_t value) { + Histogram& tls_histogram = tlsScope_.tlsHistogram(name()); + tls_histogram.recordValue(value); + parent_.deliverHistogramToSinks(*this, value); +} + +bool ParentHistogramImpl::used() const { + std::unique_lock lock(merge_lock_); + return usedLockHeld(); +} + void ParentHistogramImpl::merge() { std::unique_lock lock(merge_lock_); if (usedLockHeld()) { diff --git a/source/common/stats/thread_local_store.h b/source/common/stats/thread_local_store.h index 98777a1cc157..c935a3baec0a 100644 --- a/source/common/stats/thread_local_store.h +++ b/source/common/stats/thread_local_store.h @@ -21,60 +21,65 @@ namespace Stats { * histograms, one to collect the values and other as backup that is used for merge process. The * swap happens during the merge process. */ -class ThreadLocalHistogram : public virtual Histogram { +class ThreadLocalHistogramImpl : public Histogram, public MetricImpl { public: - virtual ~ThreadLocalHistogram() {} - - /** - * Called in the beginning of merge process. Swaps the histogram used for collection so that we do - * not have to lock the histogram in high throughput TLS writes. 
- */ - virtual void beginMerge() PURE; -}; - -/** - * Log Linear Histogram implementation per thread. - */ -class ThreadLocalHistogramImpl : public ThreadLocalHistogram, public MetricImpl { -public: - ThreadLocalHistogramImpl(const std::string& name, Store& parent, std::string&& tag_extracted_name, + ThreadLocalHistogramImpl(const std::string& name, std::string&& tag_extracted_name, std::vector&& tags); ~ThreadLocalHistogramImpl(); - // Stats::Histogram - void recordValue(uint64_t value) override; + void merge(histogram_t* target); - bool used() const override { return flags_ & Flags::Used; } - void beginMerge() override { + /** + * Called in the beginning of merge process. Swaps the histogram used for collection so that we do + * not have to lock the histogram in high throughput TLS writes. + */ + void beginMerge() { // This switches the current_active_ between 1 and 0. current_active_ = otherHistogramIndex(); } - Store& parent_; + // Stats::Histogram + void recordValue(uint64_t value) override; + bool used() const override { return flags_ & Flags::Used; } private: uint64_t otherHistogramIndex() const { return 1 - current_active_; } uint64_t current_active_; histogram_t* histograms_[2]; std::atomic flags_; + std::thread::id created_thread_id_; }; typedef std::shared_ptr TlsHistogramSharedPtr; +/** + * Class used to create ThreadLocalHistogram in the scope. + */ +class TlsScope : public Scope { +public: + virtual ~TlsScope() {} + + /** + * @return a ThreadLocalHistogram within the scope's namespace. + */ + virtual Histogram& tlsHistogram(const std::string& name) PURE; +}; + /** * Log Linear Histogram implementation that is stored in the main thread. 
*/ class ParentHistogramImpl : public ParentHistogram, public MetricImpl { public: - ParentHistogramImpl(const std::string& name, Store& parent, std::string&& tag_extracted_name, - std::vector&& tags); + ParentHistogramImpl(const std::string& name, Store& parent, TlsScope& tlsScope, + std::string&& tag_extracted_name, std::vector&& tags); virtual ~ParentHistogramImpl(); + void addTlsHistogram(const TlsHistogramSharedPtr& hist_ptr); bool used() const override; - + void recordValue(uint64_t value) override; /** * This method is called during the main stats flush process for each of the histograms. It * iterates through the TLS histograms and collects the histogram data of all of them @@ -88,17 +93,17 @@ class ParentHistogramImpl : public ParentHistogram, public MetricImpl { return cumulative_statistics_; } - Store& parent_; - std::list tls_histograms_; - private: bool usedLockHeld() const; + Store& parent_; + TlsScope& tlsScope_; histogram_t* interval_histogram_; histogram_t* cumulative_histogram_; HistogramStatisticsImpl interval_statistics_; HistogramStatisticsImpl cumulative_statistics_; mutable std::mutex merge_lock_; + std::list tls_histograms_; }; typedef std::shared_ptr ParentHistogramImplSharedPtr; @@ -136,8 +141,9 @@ typedef std::shared_ptr ParentHistogramImplSharedPtr; * Each Histogram implementation will have 2 parts. * - "main" thread parent which is called "ParentHistogram". * - "per-thread" collector which is called "ThreadLocalHistogram". - * Worker threads will write into their per-thread collector, without needing any locking. - * During the flush process the following sequence is followed. + * Worker threads will write to ParentHistogram which checks whether a TLS histogram is available. + * If there is a TLS histogram already available it will write to it otherwise creates one and + * writes to it. During the flush process the following sequence is followed. 
* - The main thread starts the flush process by posting a message to every worker which tells the * worker to swap its "active" histogram with its "backup" histogram. This is acheived via a call * to "beginMerge" method. @@ -198,7 +204,7 @@ class ThreadLocalStoreImpl : Logger::Loggable, public StoreRo std::unordered_map histograms_; }; - struct ScopeImpl : public Scope { + struct ScopeImpl : public TlsScope { ScopeImpl(ThreadLocalStoreImpl& parent, const std::string& prefix) : parent_(parent), prefix_(Utility::sanitizeStatsName(prefix)) {} ~ScopeImpl(); @@ -211,6 +217,7 @@ class ThreadLocalStoreImpl : Logger::Loggable, public StoreRo void deliverHistogramToSinks(const Histogram& histogram, uint64_t value) override; Gauge& gauge(const std::string& name) override; Histogram& histogram(const std::string& name) override; + Histogram& tlsHistogram(const std::string& name) override; ThreadLocalStoreImpl& parent_; const std::string prefix_; diff --git a/test/common/stats/thread_local_store_test.cc b/test/common/stats/thread_local_store_test.cc index 962fe264eeff..3866a5cb05ad 100644 --- a/test/common/stats/thread_local_store_test.cc +++ b/test/common/stats/thread_local_store_test.cc @@ -217,6 +217,7 @@ TEST_F(StatsThreadLocalStoreTest, NoTls) { EXPECT_EQ(&g1, &store_->gauge("g1")); Histogram& h1 = store_->histogram("h1"); + EXPECT_EQ(&h1, &store_->histogram("h1")); EXPECT_CALL(sink_, onHistogramComplete(Ref(h1), 200)); h1.recordValue(200); diff --git a/test/mocks/stats/mocks.cc b/test/mocks/stats/mocks.cc index a62ff79f8faf..7aea6e68deb7 100644 --- a/test/mocks/stats/mocks.cc +++ b/test/mocks/stats/mocks.cc @@ -39,6 +39,11 @@ MockHistogram::MockHistogram() { MockHistogram::~MockHistogram() {} MockParentHistogram::MockParentHistogram() { + ON_CALL(*this, recordValue(_)).WillByDefault(Invoke([this](uint64_t value) { + if (store_ != nullptr) { + store_->deliverHistogramToSinks(*this, value); + } + })); ON_CALL(*this, tagExtractedName()).WillByDefault(ReturnRef(name_)); 
ON_CALL(*this, tags()).WillByDefault(ReturnRef(tags_)); ON_CALL(*this, intervalStatistics()).WillByDefault(ReturnRef(*histogram_stats_)); diff --git a/test/mocks/stats/mocks.h b/test/mocks/stats/mocks.h index 7012b96f4750..9df1ae1014a0 100644 --- a/test/mocks/stats/mocks.h +++ b/test/mocks/stats/mocks.h @@ -88,6 +88,7 @@ class MockParentHistogram : public ParentHistogram { MOCK_CONST_METHOD0(used, bool()); MOCK_CONST_METHOD0(tagExtractedName, const std::string&()); MOCK_CONST_METHOD0(tags, const std::vector&()); + MOCK_METHOD1(recordValue, void(uint64_t value)); MOCK_CONST_METHOD0(cumulativeStatistics, const HistogramStatistics&()); MOCK_CONST_METHOD0(intervalStatistics, const HistogramStatistics&()); From 4930a24b6c6e90948adeeaca97996a75154772e6 Mon Sep 17 00:00:00 2001 From: Rama Date: Sat, 21 Apr 2018 19:10:12 +0530 Subject: [PATCH 07/16] address review comments Signed-off-by: Rama --- source/common/stats/thread_local_store.h | 4 ++-- .../common/thread_local/thread_local_impl_test.cc | 15 +++++++++------ 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/source/common/stats/thread_local_store.h b/source/common/stats/thread_local_store.h index c935a3baec0a..eb4d7f60e2b9 100644 --- a/source/common/stats/thread_local_store.h +++ b/source/common/stats/thread_local_store.h @@ -142,8 +142,8 @@ typedef std::shared_ptr ParentHistogramImplSharedPtr; * - "main" thread parent which is called "ParentHistogram". * - "per-thread" collector which is called "ThreadLocalHistogram". * Worker threads will write to ParentHistogram which checks whether a TLS histogram is available. - * If there is a TLS histogram already available it will write to it otherwise creates one and - * writes to it. During the flush process the following sequence is followed. + * If there is one it will write to it, otherwise creates new one and writes to it. + * During the flush process the following sequence is followed. 
* - The main thread starts the flush process by posting a message to every worker which tells the * worker to swap its "active" histogram with its "backup" histogram. This is acheived via a call * to "beginMerge" method. diff --git a/test/common/thread_local/thread_local_impl_test.cc b/test/common/thread_local/thread_local_impl_test.cc index 59519891363e..0a442684e4b6 100644 --- a/test/common/thread_local/thread_local_impl_test.cc +++ b/test/common/thread_local/thread_local_impl_test.cc @@ -82,6 +82,9 @@ TEST_F(ThreadLocalInstanceImplTest, All) { tls_.shutdownThread(); } +// TODO(ramaraochavali): Run this test with real threads. The current issue in the unit +// testing environment is, the post to main_dispatcher is not working as expected. + // Validate ThreadLocal::runOnAllThreads behavior with all_thread_complete call back. TEST_F(ThreadLocalInstanceImplTest, RunOnAllThreads) { SlotPtr tlsptr = tls_.allocateSlot(); @@ -93,16 +96,16 @@ TEST_F(ThreadLocalInstanceImplTest, RunOnAllThreads) { struct { uint64_t thread_local_calls_{0}; bool all_threads_complete_ = false; - } thread_local_data; + } thread_status; tlsptr->runOnAllThreads( - [&thread_local_data]() -> void { ++thread_local_data.thread_local_calls_; }, - [&thread_local_data]() -> void { - EXPECT_EQ(thread_local_data.thread_local_calls_, 1); - thread_local_data.all_threads_complete_ = true; + [&thread_status]() -> void { ++thread_status.thread_local_calls_; }, + [&thread_status]() -> void { + EXPECT_EQ(thread_status.thread_local_calls_, 1); + thread_status.all_threads_complete_ = true; }); - EXPECT_TRUE(thread_local_data.all_threads_complete_); + EXPECT_TRUE(thread_status.all_threads_complete_); tls_.shutdownGlobalThreading(); tls_.shutdownThread(); From e9e685ab23bc08c704822db00922058994b5e66b Mon Sep 17 00:00:00 2001 From: Rama Date: Sat, 21 Apr 2018 19:13:17 +0530 Subject: [PATCH 08/16] formatted Signed-off-by: Rama --- source/common/stats/thread_local_store.h | 2 +- 1 file changed, 1 insertion(+), 1 
deletion(-) diff --git a/source/common/stats/thread_local_store.h b/source/common/stats/thread_local_store.h index eb4d7f60e2b9..7d8348206e0c 100644 --- a/source/common/stats/thread_local_store.h +++ b/source/common/stats/thread_local_store.h @@ -142,7 +142,7 @@ typedef std::shared_ptr ParentHistogramImplSharedPtr; * - "main" thread parent which is called "ParentHistogram". * - "per-thread" collector which is called "ThreadLocalHistogram". * Worker threads will write to ParentHistogram which checks whether a TLS histogram is available. - * If there is one it will write to it, otherwise creates new one and writes to it. + * If there is one it will write to it, otherwise creates new one and writes to it. * During the flush process the following sequence is followed. * - The main thread starts the flush process by posting a message to every worker which tells the * worker to swap its "active" histogram with its "backup" histogram. This is acheived via a call From d6b613dee5715870e86c5ffc210d47d663e5954d Mon Sep 17 00:00:00 2001 From: Rama Date: Sat, 21 Apr 2018 19:15:39 +0530 Subject: [PATCH 09/16] formatted Signed-off-by: Rama --- test/common/thread_local/thread_local_impl_test.cc | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/test/common/thread_local/thread_local_impl_test.cc b/test/common/thread_local/thread_local_impl_test.cc index 0a442684e4b6..bb7fe0810ae8 100644 --- a/test/common/thread_local/thread_local_impl_test.cc +++ b/test/common/thread_local/thread_local_impl_test.cc @@ -98,12 +98,11 @@ TEST_F(ThreadLocalInstanceImplTest, RunOnAllThreads) { bool all_threads_complete_ = false; } thread_status; - tlsptr->runOnAllThreads( - [&thread_status]() -> void { ++thread_status.thread_local_calls_; }, - [&thread_status]() -> void { - EXPECT_EQ(thread_status.thread_local_calls_, 1); - thread_status.all_threads_complete_ = true; - }); + tlsptr->runOnAllThreads([&thread_status]() -> void { ++thread_status.thread_local_calls_; }, + 
[&thread_status]() -> void { + EXPECT_EQ(thread_status.thread_local_calls_, 1); + thread_status.all_threads_complete_ = true; + }); EXPECT_TRUE(thread_status.all_threads_complete_); From a3b7b972b5b9049023fbab221dcc99abaee19606 Mon Sep 17 00:00:00 2001 From: Rama Date: Sun, 22 Apr 2018 09:51:42 +0530 Subject: [PATCH 10/16] addressed review feedback Signed-off-by: Rama --- source/common/stats/thread_local_store.cc | 17 ++++++++++++----- source/server/server.cc | 2 +- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/source/common/stats/thread_local_store.cc b/source/common/stats/thread_local_store.cc index 12f223afc3fb..7259f07a018d 100644 --- a/source/common/stats/thread_local_store.cc +++ b/source/common/stats/thread_local_store.cc @@ -270,6 +270,8 @@ Histogram& ThreadLocalStoreImpl::ScopeImpl::histogram(const std::string& name) { } Histogram& ThreadLocalStoreImpl::ScopeImpl::tlsHistogram(const std::string& name) { + // See comments in counter() which explains the logic here. + // Here prefix will not be considered because, by the time ParentHistogram calls this method // during recordValue, the prefix is already attached to the name. 
TlsHistogramSharedPtr* tls_ref = nullptr; @@ -284,8 +286,9 @@ Histogram& ThreadLocalStoreImpl::ScopeImpl::tlsHistogram(const std::string& name std::unique_lock lock(parent_.lock_); std::vector tags; std::string tag_extracted_name = parent_.getTagsForName(name, tags); - TlsHistogramSharedPtr hist_tls_ptr( - new ThreadLocalHistogramImpl(name, std::move(tag_extracted_name), std::move(tags))); + TlsHistogramSharedPtr hist_tls_ptr = std::make_shared( + name, std::move(tag_extracted_name), std::move(tags)); + ParentHistogramImplSharedPtr& central_ref = central_cache_.histograms_[name]; central_ref->addTlsHistogram(hist_tls_ptr); @@ -316,9 +319,9 @@ void ThreadLocalHistogramImpl::recordValue(uint64_t value) { } void ThreadLocalHistogramImpl::merge(histogram_t* target) { - const uint64_t other_histogram_index = otherHistogramIndex(); - hist_accumulate(target, &histograms_[other_histogram_index], 1); - hist_clear(histograms_[other_histogram_index]); + histogram_t** other_histogram = &histograms_[otherHistogramIndex()]; + hist_accumulate(target, other_histogram, 1); + hist_clear(*other_histogram); } ParentHistogramImpl::ParentHistogramImpl(const std::string& name, Store& parent, TlsScope& tlsScope, @@ -347,6 +350,10 @@ void ParentHistogramImpl::merge() { std::unique_lock lock(merge_lock_); if (usedLockHeld()) { hist_clear(interval_histogram_); + // Here we could copy all the pointers to TLS histograms in the tls_histogram_ list, + // then release the lock before we do the actual merge. However it is not a big deal + // because the tls_histogram merge is not that expensive as it is a single histogram + // merge and adding TLS histograms is rare. 
for (const TlsHistogramSharedPtr& tls_histogram : tls_histograms_) { tls_histogram->merge(interval_histogram_); } diff --git a/source/server/server.cc b/source/server/server.cc index d2f499216b72..b49d2f563c22 100644 --- a/source/server/server.cc +++ b/source/server/server.cc @@ -138,7 +138,7 @@ void InstanceUtil::flushMetricsToSinks(const std::list& sinks, void InstanceImpl::flushStats() { ENVOY_LOG(debug, "flushing stats"); // A shutdown initiated before this callback may prevent this from being called as per - // the semantics documented in ThreadLocal's runAllThreads method. + // the semantics documented in ThreadLocal's runOnAllThreads method. stats_store_.mergeHistograms([this]() -> void { HotRestart::GetParentStatsInfo info; restarter_.getParentStats(info); From e4805b7949b3c5a2f9ceb49b9e28945f70648f4f Mon Sep 17 00:00:00 2001 From: Rama Date: Sun, 22 Apr 2018 19:47:18 +0530 Subject: [PATCH 11/16] ensure only one merge is in progress Signed-off-by: Rama --- source/common/stats/thread_local_store.cc | 4 +++- source/common/stats/thread_local_store.h | 1 + 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/source/common/stats/thread_local_store.cc b/source/common/stats/thread_local_store.cc index 7259f07a018d..0107f7e983bf 100644 --- a/source/common/stats/thread_local_store.cc +++ b/source/common/stats/thread_local_store.cc @@ -100,7 +100,8 @@ void ThreadLocalStoreImpl::shutdownThreading() { } void ThreadLocalStoreImpl::mergeHistograms(PostMergeCb merge_complete_cb) { - if (!shutting_down_) { + if (!shutting_down_ && !merge_in_progress_) { + merge_in_progress_ = true; tls_->runOnAllThreads( [this]() -> void { for (ScopeImpl* scope : scopes_) { @@ -121,6 +122,7 @@ void ThreadLocalStoreImpl::mergeInternal(PostMergeCb merge_complete_cb) { histogram->merge(); } merge_complete_cb(); + merge_in_progress_ = false; } } diff --git a/source/common/stats/thread_local_store.h b/source/common/stats/thread_local_store.h index 7d8348206e0c..b41871176b0b 100644 --- 
a/source/common/stats/thread_local_store.h +++ b/source/common/stats/thread_local_store.h @@ -248,6 +248,7 @@ class ThreadLocalStoreImpl : Logger::Loggable, public StoreRo std::list> timer_sinks_; TagProducerPtr tag_producer_; std::atomic shutting_down_{}; + std::atomic merge_in_progress_{}; Counter& num_last_resort_stats_; HeapRawStatDataAllocator heap_allocator_; }; From 0a1927f29007fe26dd2fde71a20921c92f454cdb Mon Sep 17 00:00:00 2001 From: Rama Date: Mon, 23 Apr 2018 10:02:36 +0530 Subject: [PATCH 12/16] moved variable declarations Signed-off-by: Rama --- test/common/stats/thread_local_store_test.cc | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/test/common/stats/thread_local_store_test.cc b/test/common/stats/thread_local_store_test.cc index 3866a5cb05ad..803cdd54d39a 100644 --- a/test/common/stats/thread_local_store_test.cc +++ b/test/common/stats/thread_local_store_test.cc @@ -94,7 +94,6 @@ class StatsThreadLocalStoreTest : public testing::Test, public RawStatDataAlloca class HistogramTest : public testing::Test, public RawStatDataAllocator { public: typedef std::map NameHistogramMap; - InSequence s; void SetUp() override { ON_CALL(*this, alloc(_)).WillByDefault(Invoke([this](const std::string& name) -> RawStatData* { @@ -118,9 +117,6 @@ class HistogramTest : public testing::Test, public RawStatDataAllocator { EXPECT_CALL(*this, free(_)); } - std::vector h1_cumulative_values_, h2_cumulative_values_, h1_interval_values_, - h2_interval_values_; - NameHistogramMap makeHistogramMap(const std::list& hist_list) { NameHistogramMap name_histogram_map; for (const Stats::ParentHistogramSharedPtr& histogram : hist_list) { @@ -204,6 +200,9 @@ class HistogramTest : public testing::Test, public RawStatDataAllocator { TestAllocator alloc_; MockSink sink_; std::unique_ptr store_; + InSequence s; + std::vector h1_cumulative_values_, h2_cumulative_values_, h1_interval_values_, + h2_interval_values_; }; TEST_F(StatsThreadLocalStoreTest, NoTls) { From 
0707303045b0108fec917c2a1406579c338b7be8 Mon Sep 17 00:00:00 2001 From: Rama Date: Mon, 23 Apr 2018 19:48:40 +0530 Subject: [PATCH 13/16] added ASSERT for merge in progress Signed-off-by: Rama --- include/envoy/stats/stats.h | 3 ++- source/common/stats/thread_local_store.cc | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/include/envoy/stats/stats.h b/include/envoy/stats/stats.h index 7993b7cd4f6e..f11045a6169d 100644 --- a/include/envoy/stats/stats.h +++ b/include/envoy/stats/stats.h @@ -367,7 +367,8 @@ class StoreRoot : public Store { * Called during the flush process to merge all the thread local histograms. The passed in * callback will be called on the main thread, but it will happen after the method returns * which means that the actual flush process will happen on the main thread after this method - * returns. + * returns. It is expected that only one merge runs at any time and concurrent calls to this + * method would be asserted. */ virtual void mergeHistograms(PostMergeCb merge_complete_cb) PURE; }; diff --git a/source/common/stats/thread_local_store.cc b/source/common/stats/thread_local_store.cc index 0107f7e983bf..65f91ec65dcc 100644 --- a/source/common/stats/thread_local_store.cc +++ b/source/common/stats/thread_local_store.cc @@ -100,7 +100,8 @@ void ThreadLocalStoreImpl::shutdownThreading() { } void ThreadLocalStoreImpl::mergeHistograms(PostMergeCb merge_complete_cb) { - if (!shutting_down_ && !merge_in_progress_) { + ASSERT(!merge_in_progress_); + if (!shutting_down_) { merge_in_progress_ = true; tls_->runOnAllThreads( [this]() -> void { From 1eaf0fdd594ee1e7a6eca3977eccc3d36947585d Mon Sep 17 00:00:00 2001 From: Rama Date: Mon, 23 Apr 2018 20:59:25 +0530 Subject: [PATCH 14/16] addressed review comments Signed-off-by: Rama --- source/common/stats/thread_local_store.cc | 8 +++---- source/common/stats/thread_local_store.h | 29 +++++++++++++---------- 2 files changed, 20 insertions(+), 17 deletions(-) diff --git 
a/source/common/stats/thread_local_store.cc b/source/common/stats/thread_local_store.cc index 65f91ec65dcc..b5231aa369a4 100644 --- a/source/common/stats/thread_local_store.cc +++ b/source/common/stats/thread_local_store.cc @@ -272,7 +272,8 @@ Histogram& ThreadLocalStoreImpl::ScopeImpl::histogram(const std::string& name) { return *central_ref; } -Histogram& ThreadLocalStoreImpl::ScopeImpl::tlsHistogram(const std::string& name) { +Histogram& ThreadLocalStoreImpl::ScopeImpl::tlsHistogram(const std::string& name, + ParentHistogramImpl& parent) { // See comments in counter() which explains the logic here. // Here prefix will not be considered because, by the time ParentHistogram calls this method @@ -292,8 +293,7 @@ Histogram& ThreadLocalStoreImpl::ScopeImpl::tlsHistogram(const std::string& name TlsHistogramSharedPtr hist_tls_ptr = std::make_shared( name, std::move(tag_extracted_name), std::move(tags)); - ParentHistogramImplSharedPtr& central_ref = central_cache_.histograms_[name]; - central_ref->addTlsHistogram(hist_tls_ptr); + parent.addTlsHistogram(hist_tls_ptr); if (tls_ref) { *tls_ref = hist_tls_ptr; @@ -339,7 +339,7 @@ ParentHistogramImpl::~ParentHistogramImpl() { } void ParentHistogramImpl::recordValue(uint64_t value) { - Histogram& tls_histogram = tlsScope_.tlsHistogram(name()); + Histogram& tls_histogram = tlsScope_.tlsHistogram(name(), *this); tls_histogram.recordValue(value); parent_.deliverHistogramToSinks(*this, value); } diff --git a/source/common/stats/thread_local_store.h b/source/common/stats/thread_local_store.h index b41871176b0b..5e1fd96fdd2b 100644 --- a/source/common/stats/thread_local_store.h +++ b/source/common/stats/thread_local_store.h @@ -53,18 +53,7 @@ class ThreadLocalHistogramImpl : public Histogram, public MetricImpl { typedef std::shared_ptr TlsHistogramSharedPtr; -/** - * Class used to create ThreadLocalHistogram in the scope. 
- */ -class TlsScope : public Scope { -public: - virtual ~TlsScope() {} - - /** - * @return a ThreadLocalHistogram within the scope's namespace. - */ - virtual Histogram& tlsHistogram(const std::string& name) PURE; -}; +class TlsScope; /** * Log Linear Histogram implementation that is stored in the main thread. @@ -108,6 +97,20 @@ class ParentHistogramImpl : public ParentHistogram, public MetricImpl { typedef std::shared_ptr ParentHistogramImplSharedPtr; +/** + * Class used to create ThreadLocalHistogram in the scope. + */ +class TlsScope : public Scope { +public: + virtual ~TlsScope() {} + + /** + * @return a ThreadLocalHistogram within the scope's namespace. + * @param name name of the histogram with scope prefix attached. + */ + virtual Histogram& tlsHistogram(const std::string& name, ParentHistogramImpl& parent) PURE; +}; + /** * Store implementation with thread local caching. This implementation supports the following * features: @@ -217,7 +220,7 @@ class ThreadLocalStoreImpl : Logger::Loggable, public StoreRo void deliverHistogramToSinks(const Histogram& histogram, uint64_t value) override; Gauge& gauge(const std::string& name) override; Histogram& histogram(const std::string& name) override; - Histogram& tlsHistogram(const std::string& name) override; + Histogram& tlsHistogram(const std::string& name, ParentHistogramImpl& parent) override; ThreadLocalStoreImpl& parent_; const std::string prefix_; From 1fc140ad0f7aa0e7bd6c297c8def21f11e255aaf Mon Sep 17 00:00:00 2001 From: Rama Date: Mon, 23 Apr 2018 22:37:12 +0530 Subject: [PATCH 15/16] added tls cache for parent histograms Signed-off-by: Rama --- source/common/stats/thread_local_store.cc | 14 ++++++++++++++ source/common/stats/thread_local_store.h | 1 + 2 files changed, 15 insertions(+) diff --git a/source/common/stats/thread_local_store.cc b/source/common/stats/thread_local_store.cc index b5231aa369a4..e1b7186d6bad 100644 --- a/source/common/stats/thread_local_store.cc +++ 
b/source/common/stats/thread_local_store.cc @@ -256,6 +256,16 @@ Histogram& ThreadLocalStoreImpl::ScopeImpl::histogram(const std::string& name) { // See comments in counter(). There is no super clean way (via templates or otherwise) to // share this code so I'm leaving it largely duplicated for now. std::string final_name = prefix_ + name; + ParentHistogramSharedPtr* tls_ref = nullptr; + + if (!parent_.shutting_down_ && parent_.tls_) { + tls_ref = &parent_.tls_->getTyped().scope_cache_[this].parent_histograms_[final_name]; + } + + if (tls_ref && *tls_ref) { + return **tls_ref; + } + std::unique_lock lock(parent_.lock_); ParentHistogramImplSharedPtr& central_ref = central_cache_.histograms_[final_name]; @@ -269,6 +279,10 @@ Histogram& ThreadLocalStoreImpl::ScopeImpl::histogram(const std::string& name) { std::move(central_tag_extracted_name), std::move(central_tags))); } + + if (tls_ref) { + *tls_ref = central_ref; + } return *central_ref; } diff --git a/source/common/stats/thread_local_store.h b/source/common/stats/thread_local_store.h index 5e1fd96fdd2b..5e28e8a6ad90 100644 --- a/source/common/stats/thread_local_store.h +++ b/source/common/stats/thread_local_store.h @@ -199,6 +199,7 @@ class ThreadLocalStoreImpl : Logger::Loggable, public StoreRo std::unordered_map counters_; std::unordered_map gauges_; std::unordered_map histograms_; + std::unordered_map parent_histograms_; }; struct CentralCacheEntry { From e26731ff486e9870355770f4a60e17c13f194985 Mon Sep 17 00:00:00 2001 From: Rama Date: Mon, 23 Apr 2018 23:03:59 +0530 Subject: [PATCH 16/16] addressed review comments Signed-off-by: Rama --- docs/root/operations/admin.rst | 2 +- source/common/stats/thread_local_store.cc | 9 +++++---- source/common/stats/thread_local_store.h | 3 ++- source/server/http/admin.cc | 1 + 4 files changed, 9 insertions(+), 6 deletions(-) diff --git a/docs/root/operations/admin.rst b/docs/root/operations/admin.rst index adfe07c711d0..43c10553263e 100644 --- 
a/docs/root/operations/admin.rst +++ b/docs/root/operations/admin.rst @@ -182,7 +182,7 @@ The fields are: .. http:get:: /stats Outputs all statistics on demand. This command is very useful for local debugging. - Histograms will output the computed quantiles i.e P0,P25,P50,P75,P90, P99, P99.9 and P100. + Histograms will output the computed quantiles i.e P0,P25,P50,P75,P90,P99,P99.9 and P100. The output for each quantile will be in the form of (inteval,cumulative) where interval value represents the summary since last flush interval and cumulative value represents the summary since the start of envoy instance. diff --git a/source/common/stats/thread_local_store.cc b/source/common/stats/thread_local_store.cc index e1b7186d6bad..d929566241ae 100644 --- a/source/common/stats/thread_local_store.cc +++ b/source/common/stats/thread_local_store.cc @@ -341,10 +341,11 @@ void ThreadLocalHistogramImpl::merge(histogram_t* target) { hist_clear(*other_histogram); } -ParentHistogramImpl::ParentHistogramImpl(const std::string& name, Store& parent, TlsScope& tlsScope, - std::string&& tag_extracted_name, std::vector&& tags) +ParentHistogramImpl::ParentHistogramImpl(const std::string& name, Store& parent, + TlsScope& tls_scope, std::string&& tag_extracted_name, + std::vector&& tags) : MetricImpl(name, std::move(tag_extracted_name), std::move(tags)), parent_(parent), - tlsScope_(tlsScope), interval_histogram_(hist_alloc()), cumulative_histogram_(hist_alloc()), + tls_scope_(tls_scope), interval_histogram_(hist_alloc()), cumulative_histogram_(hist_alloc()), interval_statistics_(interval_histogram_), cumulative_statistics_(cumulative_histogram_) {} ParentHistogramImpl::~ParentHistogramImpl() { @@ -353,7 +354,7 @@ ParentHistogramImpl::~ParentHistogramImpl() { } void ParentHistogramImpl::recordValue(uint64_t value) { - Histogram& tls_histogram = tlsScope_.tlsHistogram(name(), *this); + Histogram& tls_histogram = tls_scope_.tlsHistogram(name(), *this); tls_histogram.recordValue(value); 
parent_.deliverHistogramToSinks(*this, value); } diff --git a/source/common/stats/thread_local_store.h b/source/common/stats/thread_local_store.h index 5e28e8a6ad90..6520cb4c3057 100644 --- a/source/common/stats/thread_local_store.h +++ b/source/common/stats/thread_local_store.h @@ -86,7 +86,7 @@ class ParentHistogramImpl : public ParentHistogram, public MetricImpl { bool usedLockHeld() const; Store& parent_; - TlsScope& tlsScope_; + TlsScope& tls_scope_; histogram_t* interval_histogram_; histogram_t* cumulative_histogram_; HistogramStatisticsImpl interval_statistics_; @@ -104,6 +104,7 @@ class TlsScope : public Scope { public: virtual ~TlsScope() {} + // TODO(ramaraochavali): Allow direct TLS access for the advanced consumers. /** * @return a ThreadLocalHistogram within the scope's namespace. * @param name name of the histogram with scope prefix attached. diff --git a/source/server/http/admin.cc b/source/server/http/admin.cc index e521615f0f21..7d1414a2c408 100644 --- a/source/server/http/admin.cc +++ b/source/server/http/admin.cc @@ -405,6 +405,7 @@ Http::Code AdminImpl::handlerStats(absl::string_view url, Http::HeaderMap& respo std::vector summary; const std::vector& supported_quantiles_ref = histogram->intervalStatistics().supportedQuantiles(); + summary.reserve(supported_quantiles_ref.size()); for (size_t i = 0; i < supported_quantiles_ref.size(); ++i) { summary.push_back(fmt::format("P{}({},{})", 100 * supported_quantiles_ref[i], histogram->intervalStatistics().computedQuantiles()[i],