diff --git a/circonus/circonus.go b/circonus/circonus.go index eb41b99..2892d24 100644 --- a/circonus/circonus.go +++ b/circonus/circonus.go @@ -97,6 +97,13 @@ func (s *CirconusSink) AddSampleWithLabels(key []string, val float32, labels []m s.metrics.RecordValue(flatKey, float64(val)) } +func (s *CirconusSink) Shutdown() { + // The used version of the circonus metrics library does not support a shutdown operation. + // Instead we call Flush which blocks until metrics are submitted to storage, and then exit + // as the README examples do. + s.metrics.Flush() +} + // Flattens key to Circonus metric name func (s *CirconusSink) flattenKey(parts []string) string { joined := strings.Join(parts, "`") diff --git a/circonus/circonus_test.go b/circonus/circonus_test.go index 2644d57..77e9f57 100644 --- a/circonus/circonus_test.go +++ b/circonus/circonus_test.go @@ -8,6 +8,8 @@ import ( "net/http/httptest" "strings" "testing" + + "github.com/armon/go-metrics" ) func TestNewCirconusSink(t *testing.T) { @@ -152,3 +154,8 @@ func TestAddSample(t *testing.T) { } } + +func TestMetricSinkInterface(t *testing.T) { + var cs *CirconusSink + _ = metrics.MetricSink(cs) +} diff --git a/const_unix.go b/const_unix.go index 31098dd..511202d 100644 --- a/const_unix.go +++ b/const_unix.go @@ -1,3 +1,4 @@ +//go:build !windows // +build !windows package metrics diff --git a/const_windows.go b/const_windows.go index 38136af..6bb1897 100644 --- a/const_windows.go +++ b/const_windows.go @@ -1,3 +1,4 @@ +//go:build windows // +build windows package metrics diff --git a/datadog/dogstatsd.go b/datadog/dogstatsd.go index fe021d0..637ddf8 100644 --- a/datadog/dogstatsd.go +++ b/datadog/dogstatsd.go @@ -120,6 +120,10 @@ func (s *DogStatsdSink) AddSampleWithLabels(key []string, val float32, labels [] s.client.TimeInMilliseconds(flatKey, float64(val), tags, rate) } +func (s *DogStatsdSink) Shutdown() { + s.client.Close() +} + func (s *DogStatsdSink) getFlatkeyAndCombinedLabels(key []string, labels []metrics.Label) (string, []string) { key, parsedLabels := s.parseKey(key) flatKey := s.flattenKey(key) diff --git a/datadog/dogstatsd_test.go b/datadog/dogstatsd_test.go index cd3f833..3545e8d 100644 --- a/datadog/dogstatsd_test.go +++ b/datadog/dogstatsd_test.go @@ -151,3 +151,8 @@ func assertServerMatchesExpected(t *testing.T, server *net.UDPConn, buf []byte, t.Fatalf("Line %s does not match expected: %s", string(msg), expected) } } + +func TestMetricSinkInterface(t *testing.T) { + var dd *DogStatsdSink + _ = metrics.MetricSink(dd) +} diff --git a/inmem.go b/inmem.go index 7c427ac..a52d0ff 100644 --- a/inmem.go +++ b/inmem.go @@ -230,6 +230,10 @@ func (i *InmemSink) AddSampleWithLabels(key []string, val float32, labels []Labe agg.Ingest(float64(val), i.rateDenom) } +func (i *InmemSink) Shutdown() { + // Do nothing. InmemSink does not have cleanup associated with shutdown. +} + // Data is used to retrieve all the aggregated metrics // Intervals may be in use, and a read lock should be acquired func (i *InmemSink) Data() []*IntervalMetrics { diff --git a/metrics.go b/metrics.go index 6753b13..047657b 100644 --- a/metrics.go +++ b/metrics.go @@ -172,6 +172,10 @@ func (m *Metrics) UpdateFilterAndLabels(allow, block, allowedLabels, blockedLabe } } +func (m *Metrics) Shutdown() { + m.sink.Shutdown() +} + // labelIsAllowed return true if a should be included in metric // the caller should lock m.filterLock while calling this method func (m *Metrics) labelIsAllowed(label *Label) bool { diff --git a/prometheus/prometheus.go b/prometheus/prometheus.go index 5a8282f..9789e20 100644 --- a/prometheus/prometheus.go +++ b/prometheus/prometheus.go @@ -1,3 +1,4 @@ +//go:build go1.9 // +build go1.9 package prometheus @@ -20,7 +21,7 @@ var ( // PrometheusSink. DefaultPrometheusOpts = PrometheusOpts{ Expiration: 60 * time.Second, - Name: "default_prometheus_sink", + Name: "default_prometheus_sink", } ) @@ -393,6 +394,10 @@ func (p *PrometheusSink) IncrCounterWithLabels(parts []string, val float32, labe } } +// Shutdown is not implemented. PrometheusSink is in memory storage. +func (p *PrometheusSink) Shutdown() { +} + // PrometheusPushSink wraps a normal prometheus sink and provides an address and facilities to export it to an address // on an interval. type PrometheusPushSink struct { diff --git a/prometheus/prometheus_test.go b/prometheus/prometheus_test.go index 6190874..321a7b9 100644 --- a/prometheus/prometheus_test.go +++ b/prometheus/prometheus_test.go @@ -54,6 +54,7 @@ func TestNewPrometheusSink(t *testing.T) { t.Fatalf("Unregister(sink) = false, want true") } } + // TestMultiplePrometheusSink tests registering multiple sinks on the same registerer with different descriptors func TestMultiplePrometheusSink(t *testing.T) { gaugeDef := GaugeDefinition{ @@ -66,14 +67,14 @@ func TestMultiplePrometheusSink(t *testing.T) { GaugeDefinitions: append([]GaugeDefinition{}, gaugeDef), SummaryDefinitions: append([]SummaryDefinition{}), CounterDefinitions: append([]CounterDefinition{}), - Name: "sink1", + Name: "sink1", } sink1, err := NewPrometheusSinkFrom(cfg) if err != nil { t.Fatalf("err = %v, want nil", err) } - + reg := prometheus.DefaultRegisterer if reg == nil { t.Fatalf("Expected default register to be non nil, got nil.") @@ -359,3 +360,10 @@ func TestDefinitionsWithLabels(t *testing.T) { return true }) } + +func TestMetricSinkInterface(t *testing.T) { + var ps *PrometheusSink + _ = metrics.MetricSink(ps) + var pps *PrometheusPushSink + _ = metrics.MetricSink(pps) +} diff --git a/sink.go b/sink.go index 0b7d6e4..4e6a8df 100644 --- a/sink.go +++ b/sink.go @@ -22,6 +22,9 @@ type MetricSink interface { // Samples are for timing information, where quantiles are used AddSample(key []string, val float32) AddSampleWithLabels(key []string, val float32, labels []Label) + + // Shutdown the sink, flushing data, and performing cleanup as necessary. + Shutdown() } // BlackholeSink is used to just blackhole messages @@ -34,6 +37,7 @@ func (*BlackholeSink) IncrCounter(key []string, val float32) func (*BlackholeSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {} func (*BlackholeSink) AddSample(key []string, val float32) {} func (*BlackholeSink) AddSampleWithLabels(key []string, val float32, labels []Label) {} +func (*BlackholeSink) Shutdown() {} // FanoutSink is used to sink to fanout values to multiple sinks type FanoutSink []MetricSink @@ -74,6 +78,12 @@ func (fh FanoutSink) AddSampleWithLabels(key []string, val float32, labels []Lab } } +func (fh FanoutSink) Shutdown() { + for _, s := range fh { + s.Shutdown() + } +} + // sinkURLFactoryFunc is an generic interface around the *SinkFromURL() function provided // by each sink type type sinkURLFactoryFunc func(*url.URL) (MetricSink, error) diff --git a/sink_test.go b/sink_test.go index 36da370..e10f678 100644 --- a/sink_test.go +++ b/sink_test.go @@ -63,6 +63,7 @@ func (m *MockSink) AddSampleWithLabels(key []string, val float32, labels []Label m.vals = append(m.vals, val) m.labels = append(m.labels, labels) } +func (m *MockSink) Shutdown() {} func TestFanoutSink_Gauge(t *testing.T) { m1 := &MockSink{} diff --git a/start.go b/start.go index 6aa0bd3..7747d0b 100644 --- a/start.go +++ b/start.go @@ -144,3 +144,13 @@ func UpdateFilter(allow, block []string) { func UpdateFilterAndLabels(allow, block, allowedLabels, blockedLabels []string) { globalMetrics.Load().(*Metrics).UpdateFilterAndLabels(allow, block, allowedLabels, blockedLabels) } + +// Shutdown flushes and disables metric collection, blocking while waiting for this to complete. +// WARNING: Not all MetricSink backends support this functionality, and calling this will cause resource leaks. +// This is intended for use immediately prior to application exit. +func Shutdown() { + m := globalMetrics.Load().(*Metrics) + // Replace global metrics with the BlackholeSink like how init setup the library. + globalMetrics.Store(&Metrics{sink: &BlackholeSink{}}) + m.Shutdown() +}