From 500c7d6567e3843f7e38bbc07fe47e761368bae4 Mon Sep 17 00:00:00 2001 From: Gianni Gambetti <99784476+ggambetti@users.noreply.github.com> Date: Thu, 14 Apr 2022 12:27:04 -0400 Subject: [PATCH] Adds Shutdown to Metrics API Short lived applications run the risk of losing metrics that are generated near the end of their lifetimes because MetricSinks can and do buffer data locally. The exact amount of data loss depends on MetricSink implementation, caller configuration, and the timing of the last metric sync. This adds a Shutdown function to the package API and the MetricSink interface. Shutdown flushes locally buffered data to storage, and then frees resources allocated to the MetricSink. Currently not all MetricSinks support exiting, and so resource release when calling Shutdown is best effort. Since Shutdown is intended for use immediately prior to application exit this is deemed acceptable, since resource leakage is minimized in this case. --- circonus/circonus.go | 7 +++++++ circonus/circonus_test.go | 7 +++++++ const_unix.go | 1 + const_windows.go | 1 + datadog/dogstatsd.go | 4 ++++ datadog/dogstatsd_test.go | 5 +++++ inmem.go | 4 ++++ metrics.go | 4 ++++ prometheus/prometheus.go | 7 ++++++- prometheus/prometheus_test.go | 12 ++++++++++-- sink.go | 10 ++++++++++ sink_test.go | 1 + start.go | 10 ++++++++++ 13 files changed, 70 insertions(+), 3 deletions(-) diff --git a/circonus/circonus.go b/circonus/circonus.go index eb41b99..2892d24 100644 --- a/circonus/circonus.go +++ b/circonus/circonus.go @@ -97,6 +97,13 @@ func (s *CirconusSink) AddSampleWithLabels(key []string, val float32, labels []m s.metrics.RecordValue(flatKey, float64(val)) } +func (s *CirconusSink) Shutdown() { + // The used version of the circonus metrics library does not support a shutdown operation. + // Instead we call Flush which blocks until metrics are submitted to storage, and then exit + // as the README examples do. + s.metrics.Flush() +} + // Flattens key to Circonus metric name func (s *CirconusSink) flattenKey(parts []string) string { joined := strings.Join(parts, "`") diff --git a/circonus/circonus_test.go b/circonus/circonus_test.go index 2644d57..77e9f57 100644 --- a/circonus/circonus_test.go +++ b/circonus/circonus_test.go @@ -8,6 +8,8 @@ import ( "net/http/httptest" "strings" "testing" + + "github.com/armon/go-metrics" ) func TestNewCirconusSink(t *testing.T) { @@ -152,3 +154,8 @@ func TestAddSample(t *testing.T) { } } + +func TestMetricSinkInterface(t *testing.T) { + var cs *CirconusSink + _ = metrics.MetricSink(cs) +} diff --git a/const_unix.go b/const_unix.go index 31098dd..511202d 100644 --- a/const_unix.go +++ b/const_unix.go @@ -1,3 +1,4 @@ +//go:build !windows // +build !windows package metrics diff --git a/const_windows.go b/const_windows.go index 38136af..6bb1897 100644 --- a/const_windows.go +++ b/const_windows.go @@ -1,3 +1,4 @@ +//go:build windows // +build windows package metrics diff --git a/datadog/dogstatsd.go b/datadog/dogstatsd.go index fe021d0..637ddf8 100644 --- a/datadog/dogstatsd.go +++ b/datadog/dogstatsd.go @@ -120,6 +120,10 @@ func (s *DogStatsdSink) AddSampleWithLabels(key []string, val float32, labels [] s.client.TimeInMilliseconds(flatKey, float64(val), tags, rate) } +func (s *DogStatsdSink) Shutdown() { + s.client.Close() +} + func (s *DogStatsdSink) getFlatkeyAndCombinedLabels(key []string, labels []metrics.Label) (string, []string) { key, parsedLabels := s.parseKey(key) flatKey := s.flattenKey(key) diff --git a/datadog/dogstatsd_test.go b/datadog/dogstatsd_test.go index cd3f833..3545e8d 100644 --- a/datadog/dogstatsd_test.go +++ b/datadog/dogstatsd_test.go @@ -151,3 +151,8 @@ func assertServerMatchesExpected(t *testing.T, server *net.UDPConn, buf []byte, t.Fatalf("Line %s does not match expected: %s", string(msg), expected) } } + +func TestMetricSinkInterface(t *testing.T) { + var dd *DogStatsdSink + _ = metrics.MetricSink(dd) +} diff --git a/inmem.go b/inmem.go index 7c427ac..a52d0ff 100644 --- a/inmem.go +++ b/inmem.go @@ -230,6 +230,10 @@ func (i *InmemSink) AddSampleWithLabels(key []string, val float32, labels []Labe agg.Ingest(float64(val), i.rateDenom) } +func (i *InmemSink) Shutdown() { + // Do nothing. InmemSink does not have cleanup associated with shutdown. +} + // Data is used to retrieve all the aggregated metrics // Intervals may be in use, and a read lock should be acquired func (i *InmemSink) Data() []*IntervalMetrics { diff --git a/metrics.go b/metrics.go index 6753b13..047657b 100644 --- a/metrics.go +++ b/metrics.go @@ -172,6 +172,10 @@ func (m *Metrics) UpdateFilterAndLabels(allow, block, allowedLabels, blockedLabe } } +func (m *Metrics) Shutdown() { + m.sink.Shutdown() +} + // labelIsAllowed return true if a should be included in metric // the caller should lock m.filterLock while calling this method func (m *Metrics) labelIsAllowed(label *Label) bool { diff --git a/prometheus/prometheus.go b/prometheus/prometheus.go index 5a8282f..9789e20 100644 --- a/prometheus/prometheus.go +++ b/prometheus/prometheus.go @@ -1,3 +1,4 @@ +//go:build go1.9 // +build go1.9 package prometheus @@ -20,7 +21,7 @@ var ( // PrometheusSink. DefaultPrometheusOpts = PrometheusOpts{ Expiration: 60 * time.Second, - Name: "default_prometheus_sink", + Name: "default_prometheus_sink", } ) @@ -393,6 +394,10 @@ func (p *PrometheusSink) IncrCounterWithLabels(parts []string, val float32, labe } } +// Shutdown is not implemented. PrometheusSink is in memory storage. +func (p *PrometheusSink) Shutdown() { +} + // PrometheusPushSink wraps a normal prometheus sink and provides an address and facilities to export it to an address // on an interval. type PrometheusPushSink struct { diff --git a/prometheus/prometheus_test.go b/prometheus/prometheus_test.go index 6190874..321a7b9 100644 --- a/prometheus/prometheus_test.go +++ b/prometheus/prometheus_test.go @@ -54,6 +54,7 @@ func TestNewPrometheusSink(t *testing.T) { t.Fatalf("Unregister(sink) = false, want true") } } + // TestMultiplePrometheusSink tests registering multiple sinks on the same registerer with different descriptors func TestMultiplePrometheusSink(t *testing.T) { gaugeDef := GaugeDefinition{ @@ -66,14 +67,14 @@ func TestMultiplePrometheusSink(t *testing.T) { GaugeDefinitions: append([]GaugeDefinition{}, gaugeDef), SummaryDefinitions: append([]SummaryDefinition{}), CounterDefinitions: append([]CounterDefinition{}), - Name: "sink1", + Name: "sink1", } sink1, err := NewPrometheusSinkFrom(cfg) if err != nil { t.Fatalf("err = %v, want nil", err) } - + reg := prometheus.DefaultRegisterer if reg == nil { t.Fatalf("Expected default register to be non nil, got nil.") @@ -359,3 +360,10 @@ func TestDefinitionsWithLabels(t *testing.T) { return true }) } + +func TestMetricSinkInterface(t *testing.T) { + var ps *PrometheusSink + _ = metrics.MetricSink(ps) + var pps *PrometheusPushSink + _ = metrics.MetricSink(pps) +} diff --git a/sink.go b/sink.go index 0b7d6e4..4e6a8df 100644 --- a/sink.go +++ b/sink.go @@ -22,6 +22,9 @@ type MetricSink interface { // Samples are for timing information, where quantiles are used AddSample(key []string, val float32) AddSampleWithLabels(key []string, val float32, labels []Label) + + // Shutdown the sink, flushing data, and performing cleanup as necessary. + Shutdown() } // BlackholeSink is used to just blackhole messages @@ -34,6 +37,7 @@ func (*BlackholeSink) IncrCounter(key []string, val float32) func (*BlackholeSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {} func (*BlackholeSink) AddSample(key []string, val float32) {} func (*BlackholeSink) AddSampleWithLabels(key []string, val float32, labels []Label) {} +func (*BlackholeSink) Shutdown() {} // FanoutSink is used to sink to fanout values to multiple sinks type FanoutSink []MetricSink @@ -74,6 +78,12 @@ func (fh FanoutSink) AddSampleWithLabels(key []string, val float32, labels []Lab } } +func (fh FanoutSink) Shutdown() { + for _, s := range fh { + s.Shutdown() + } +} + // sinkURLFactoryFunc is an generic interface around the *SinkFromURL() function provided // by each sink type type sinkURLFactoryFunc func(*url.URL) (MetricSink, error) diff --git a/sink_test.go b/sink_test.go index 36da370..e10f678 100644 --- a/sink_test.go +++ b/sink_test.go @@ -63,6 +63,7 @@ func (m *MockSink) AddSampleWithLabels(key []string, val float32, labels []Label m.vals = append(m.vals, val) m.labels = append(m.labels, labels) } +func (m *MockSink) Shutdown() {} func TestFanoutSink_Gauge(t *testing.T) { m1 := &MockSink{} diff --git a/start.go b/start.go index 6aa0bd3..7747d0b 100644 --- a/start.go +++ b/start.go @@ -144,3 +144,13 @@ func UpdateFilter(allow, block []string) { func UpdateFilterAndLabels(allow, block, allowedLabels, blockedLabels []string) { globalMetrics.Load().(*Metrics).UpdateFilterAndLabels(allow, block, allowedLabels, blockedLabels) } + +// Shutdown flushes and disables metric collection, blocking while waiting for this to complete. +// WARNING: Not all MetricSink backends support this functionality, and calling this will cause resource leaks. +// This is intended for use immediately prior to application exit. +func Shutdown() { + m := globalMetrics.Load().(*Metrics) + // Replace global metrics with the BlackholeSink like how init setup the library. + globalMetrics.Store(&Metrics{sink: &BlackholeSink{}}) + m.Shutdown() +}