diff --git a/CHANGELOG.md b/CHANGELOG.md index 28dc6ea78..c2680075b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,9 @@ * Our super spiffy logo, designed by [mercedes](https://github.com/mercedes-stripe), is now included at the top of the README! * Refactor internals to use a new intermediary metric struct to facilitate new plugins. Thanks [gphat](https://github.com/gphat)! +## Improvements +* Introduced a new `metricSink`, which provides a common interface for metric backends; a sketch of this interface appears after the diff below. In an upcoming release, all plugins will be converted to this interface. Thanks [gphat](https://github.com/gphat)! + ## Deprecations * `veneur-emit` no longer supports the `-config` argument, as it's no longer possible to reliably detect which statsd host/port to connect to. The `-hostport` option now takes a URL of the same form `statsd_listen_addresses` takes to explicitly tell it what address it should send to. diff --git a/fixtures/aws/PutObject/2016/10/13/1476370612.tsv.gz b/fixtures/aws/PutObject/2016/10/13/1476370612.tsv.gz index 44d35bcda..f7449eaa6 100644 Binary files a/fixtures/aws/PutObject/2016/10/13/1476370612.tsv.gz and b/fixtures/aws/PutObject/2016/10/13/1476370612.tsv.gz differ diff --git a/fixtures/aws/PutObject/2016/10/14/1476481302.tsv.gz b/fixtures/aws/PutObject/2016/10/14/1476481302.tsv.gz index 16608d1dd..8aed71d41 100644 Binary files a/fixtures/aws/PutObject/2016/10/14/1476481302.tsv.gz and b/fixtures/aws/PutObject/2016/10/14/1476481302.tsv.gz differ diff --git a/flusher.go b/flusher.go index 04d1543d1..42062b673 100644 --- a/flusher.go +++ b/flusher.go @@ -10,9 +10,6 @@ import ( "net" "net/http" "net/url" - "runtime" - "strings" - "sync" "time" "github.com/DataDog/datadog-go/statsd" @@ -44,8 +41,12 @@ func (s *Server) FlushGlobal(ctx context.Context) { span, _ := trace.StartSpanFromContext(ctx, "") defer span.Finish() - go s.flushEventsChecks(span.Attach(ctx)) // we can do all of this separately - go s.flushTraces(span.Attach(ctx)) // this too! + events, checks := s.EventWorker.Flush() + s.Statsd.Count("worker.events_flushed_total", int64(len(events)), nil, 1.0) + s.Statsd.Count("worker.checks_flushed_total", int64(len(checks)), nil, 1.0) + + go s.metricSinks[0].FlushEventsChecks(span.Attach(ctx), events, checks) // we can do all of this separately + go s.flushTraces(span.Attach(ctx)) // this too!
percentiles := s.HistogramPercentiles @@ -56,7 +57,7 @@ func (s *Server) FlushGlobal(ctx context.Context) { ms.totalLength += ms.totalSets ms.totalLength += ms.totalGlobalCounters - finalMetrics := s.generateDDMetrics(span.Attach(ctx), percentiles, tempMetrics, ms) + finalMetrics := s.generateInterMetrics(span.Attach(ctx), percentiles, tempMetrics, ms) s.reportMetricsFlushCounts(ms) @@ -65,7 +66,7 @@ func (s *Server) FlushGlobal(ctx context.Context) { go func() { for _, p := range s.getPlugins() { start := time.Now() - err := p.Flush(finalMetrics, s.Hostname) + err := p.Flush(span.Attach(ctx), finalMetrics) s.Statsd.TimeInMilliseconds(fmt.Sprintf("flush.plugins.%s.total_duration_ns", p.Name()), float64(time.Since(start).Nanoseconds()), []string{"part:post"}, 1.0) if err != nil { countName := fmt.Sprintf("flush.plugins.%s.error_total", p.Name()) @@ -75,7 +76,8 @@ func (s *Server) FlushGlobal(ctx context.Context) { } }() - s.flushRemote(span.Attach(ctx), finalMetrics) + // TODO Don't hardcode this + s.metricSinks[0].Flush(span.Attach(ctx), finalMetrics) } // FlushLocal takes the slices of metrics, combines then and marshals them to json @@ -84,8 +86,11 @@ func (s *Server) FlushLocal(ctx context.Context) { span, _ := trace.StartSpanFromContext(ctx, "") defer span.Finish() - go s.flushEventsChecks(span.Attach(ctx)) // we can do all of this separately - go s.flushTraces(span.Attach(ctx)) // this too! + events, checks := s.EventWorker.Flush() + s.Statsd.Count("worker.checks_flushed_total", int64(len(checks)), nil, 1.0) + + go s.metricSinks[0].FlushEventsChecks(span.Attach(ctx), events, checks) // we can do all of this separately + go s.flushTraces(span.Attach(ctx)) // don't publish percentiles if we're a local veneur; that's the global // veneur's job @@ -93,7 +98,7 @@ func (s *Server) FlushLocal(ctx context.Context) { tempMetrics, ms := s.tallyMetrics(percentiles) - finalMetrics := s.generateDDMetrics(span.Attach(ctx), percentiles, tempMetrics, ms) + finalMetrics := s.generateInterMetrics(span.Attach(ctx), percentiles, tempMetrics, ms) s.reportMetricsFlushCounts(ms) @@ -103,10 +108,15 @@ func (s *Server) FlushLocal(ctx context.Context) { // since not everything in tempMetrics is safe for sharing go s.flushForward(span.Attach(ctx), tempMetrics) + // If there's nothing to flush, don't bother calling the plugins and stuff. 
+ if len(finalMetrics) == 0 { + return + } + go func() { for _, p := range s.getPlugins() { start := time.Now() - err := p.Flush(finalMetrics, s.Hostname) + err := p.Flush(span.Attach(ctx), finalMetrics) s.Statsd.TimeInMilliseconds(fmt.Sprintf("flush.plugins.%s.total_duration_ns", p.Name()), float64(time.Since(start).Nanoseconds()), []string{"part:post"}, 1.0) if err != nil { countName := fmt.Sprintf("flush.plugins.%s.error_total", p.Name()) @@ -116,7 +126,8 @@ func (s *Server) FlushLocal(ctx context.Context) { } }() - s.flushRemote(span.Attach(ctx), finalMetrics) + // TODO Don't hardcode this + s.metricSinks[0].Flush(span.Attach(ctx), finalMetrics) } type metricsSummary struct { @@ -141,7 +152,7 @@ type metricsSummary struct { // for performance func (s *Server) tallyMetrics(percentiles []float64) ([]WorkerMetrics, metricsSummary) { // allocating this long array to count up the sizes is cheaper than appending - // the []DDMetrics together one at a time + // the []WorkerMetrics together one at a time tempMetrics := make([]WorkerMetrics, 0, len(s.Workers)) gatherStart := time.Now() @@ -180,15 +191,15 @@ func (s *Server) tallyMetrics(percentiles []float64) ([]WorkerMetrics, metricsSu return tempMetrics, ms } -// generateDDMetrics calls the Flush method on each +// generateInterMetrics calls the Flush method on each // counter/gauge/histogram/timer/set in order to -// generate a DDMetric corresponding to that value -func (s *Server) generateDDMetrics(ctx context.Context, percentiles []float64, tempMetrics []WorkerMetrics, ms metricsSummary) []samplers.DDMetric { +// generate an InterMetric corresponding to that value +func (s *Server) generateInterMetrics(ctx context.Context, percentiles []float64, tempMetrics []WorkerMetrics, ms metricsSummary) []samplers.InterMetric { span, _ := trace.StartSpanFromContext(ctx, "") defer span.Finish() - finalMetrics := make([]samplers.DDMetric, 0, ms.totalLength) + finalMetrics := make([]samplers.InterMetric, 0, ms.totalLength) for _, wm := range tempMetrics { for _, c := range wm.counters { finalMetrics = append(finalMetrics, c.Flush(s.interval)...)
@@ -237,7 +248,6 @@ func (s *Server) generateDDMetrics(ctx context.Context, percentiles []float64, t } } - finalizeMetrics(s.Hostname, s.Tags, finalMetrics) s.Statsd.TimeInMilliseconds("flush.total_duration_ns", float64(time.Since(span.Start).Nanoseconds()), []string{"part:combine"}, 1.0) return finalMetrics @@ -274,83 +284,6 @@ func (s *Server) reportGlobalMetricsFlushCounts(ms metricsSummary) { s.Statsd.Count("worker.metrics_flushed_total", int64(ms.totalTimers), []string{"metric_type:timer"}, 1.0) } -// flushRemote breaks up the final metrics into chunks -// (to avoid hitting the size cap) and POSTs them to the remote API -func (s *Server) flushRemote(ctx context.Context, finalMetrics []samplers.DDMetric) { - span, _ := trace.StartSpanFromContext(ctx, "") - defer span.Finish() - - mem := &runtime.MemStats{} - runtime.ReadMemStats(mem) - - s.Statsd.Gauge("mem.heap_alloc_bytes", float64(mem.HeapAlloc), nil, 1.0) - s.Statsd.Gauge("gc.number", float64(mem.NumGC), nil, 1.0) - s.Statsd.Gauge("gc.pause_total_ns", float64(mem.PauseTotalNs), nil, 1.0) - - s.Statsd.Gauge("flush.post_metrics_total", float64(len(finalMetrics)), nil, 1.0) - // Check to see if we have anything to do - if len(finalMetrics) == 0 { - log.Info("Nothing to flush, skipping.") - return - } - - // break the metrics into chunks of approximately equal size, such that - // each chunk is less than the limit - // we compute the chunks using rounding-up integer division - workers := ((len(finalMetrics) - 1) / s.FlushMaxPerBody) + 1 - chunkSize := ((len(finalMetrics) - 1) / workers) + 1 - log.WithField("workers", workers).Debug("Worker count chosen") - log.WithField("chunkSize", chunkSize).Debug("Chunk size chosen") - var wg sync.WaitGroup - flushStart := time.Now() - for i := 0; i < workers; i++ { - chunk := finalMetrics[i*chunkSize:] - if i < workers-1 { - // trim to chunk size unless this is the last one - chunk = chunk[:chunkSize] - } - wg.Add(1) - go s.flushPart(span.Attach(ctx), chunk, &wg) - } - wg.Wait() - s.Statsd.TimeInMilliseconds("flush.total_duration_ns", float64(time.Since(flushStart).Nanoseconds()), []string{"part:post"}, 1.0) - - log.WithField("metrics", len(finalMetrics)).Info("Completed flush to Datadog") -} - -func finalizeMetrics(hostname string, tags []string, finalMetrics []samplers.DDMetric) { - for i := range finalMetrics { - // Let's look for "magic tags" that override metric fields host and device. - for j, tag := range finalMetrics[i].Tags { - // This overrides hostname - if strings.HasPrefix(tag, "host:") { - // delete the tag from the list - finalMetrics[i].Tags = append(finalMetrics[i].Tags[:j], finalMetrics[i].Tags[j+1:]...) - // Override the hostname with the tag, trimming off the prefix - finalMetrics[i].Hostname = tag[5:] - } else if strings.HasPrefix(tag, "device:") { - // Same as above, but device this time - finalMetrics[i].Tags = append(finalMetrics[i].Tags[:j], finalMetrics[i].Tags[j+1:]...) - finalMetrics[i].DeviceName = tag[7:] - } - } - if finalMetrics[i].Hostname == "" { - // No magic tag, set the hostname - finalMetrics[i].Hostname = hostname - } - - finalMetrics[i].Tags = append(finalMetrics[i].Tags, tags...) 
- } -} - -// flushPart flushes a set of metrics to the remote API server -func (s *Server) flushPart(ctx context.Context, metricSlice []samplers.DDMetric, wg *sync.WaitGroup) { - defer wg.Done() - postHelper(ctx, s.HTTPClient, s.Statsd, fmt.Sprintf("%s/api/v1/series?api_key=%s", s.DDHostname, s.DDAPIKey), map[string][]samplers.DDMetric{ - "series": metricSlice, - }, "flush", true) -} - func (s *Server) flushForward(ctx context.Context, wms []WorkerMetrics) { span, _ := trace.StartSpanFromContext(ctx, "") defer span.Finish() @@ -480,62 +413,6 @@ func (s *Server) flushTraces(ctx context.Context) { s.SpanWorker.Flush() } -func (s *Server) flushEventsChecks(ctx context.Context) { - span, _ := trace.StartSpanFromContext(ctx, "") - defer span.Finish() - - events, checks := s.EventWorker.Flush() - s.Statsd.Count("worker.events_flushed_total", int64(len(events)), nil, 1.0) - s.Statsd.Count("worker.checks_flushed_total", int64(len(checks)), nil, 1.0) - - // fill in the default hostname for packets that didn't set it - for i := range events { - if events[i].Hostname == "" { - events[i].Hostname = s.Hostname - } - events[i].Tags = append(events[i].Tags, s.Tags...) - } - for i := range checks { - if checks[i].Hostname == "" { - checks[i].Hostname = s.Hostname - } - checks[i].Tags = append(checks[i].Tags, s.Tags...) - } - - if len(events) != 0 { - // this endpoint is not documented at all, its existence is only known from - // the official dd-agent - // we don't actually pass all the body keys that dd-agent passes here... but - // it still works - err := postHelper(context.TODO(), s.HTTPClient, s.Statsd, fmt.Sprintf("%s/intake?api_key=%s", s.DDHostname, s.DDAPIKey), map[string]map[string][]samplers.UDPEvent{ - "events": { - "api": events, - }, - }, "flush_events", true) - if err == nil { - log.WithField("events", len(events)).Info("Completed flushing events to Datadog") - } else { - log.WithFields(logrus.Fields{ - "events": len(events), - logrus.ErrorKey: err}).Warn("Error flushing events to Datadog") - } - } - - if len(checks) != 0 { - // this endpoint is not documented to take an array... 
but it does - // another curious constraint of this endpoint is that it does not - // support "Content-Encoding: deflate" - err := postHelper(context.TODO(), s.HTTPClient, s.Statsd, fmt.Sprintf("%s/api/v1/check_run?api_key=%s", s.DDHostname, s.DDAPIKey), checks, "flush_checks", false) - if err == nil { - log.WithField("checks", len(checks)).Info("Completed flushing service checks to Datadog") - } else { - log.WithFields(logrus.Fields{ - "checks": len(checks), - logrus.ErrorKey: err}).Warn("Error flushing checks to Datadog") - } - } -} - // shared code for POSTing to an endpoint, that consumes JSON, that is zlib- // compressed, that returns 202 on success, that has a small response // action is a string used for statsd metric names and log messages emitted from diff --git a/flusher_test.go b/flusher_test.go index 582d4adc8..0f63f0b22 100644 --- a/flusher_test.go +++ b/flusher_test.go @@ -16,48 +16,83 @@ import ( "github.com/stripe/veneur/samplers" ) +func TestDatadogRate(t *testing.T) { + ddSink := datadogMetricSink{ + hostname: "somehostname", + tags: []string{"a:b", "c:d"}, + interval: 10, + } + + metrics := []samplers.InterMetric{{ + Name: "foo.bar.baz", + Timestamp: time.Now().Unix(), + Value: float64(10), + Tags: []string{"gorch:frobble", "x:e"}, + Type: samplers.CounterMetric, + }} + ddMetrics := ddSink.finalizeMetrics(metrics) + assert.Equal(t, "rate", ddMetrics[0].MetricType, "Metric type should be rate") + assert.Equal(t, float64(1.0), ddMetrics[0].Value[0][1], "Metric rate wasnt computed correctly") +} + func TestServerTags(t *testing.T) { - metrics := []samplers.DDMetric{{ - Name: "foo.bar.baz", - Value: [1][2]float64{{float64(time.Now().Unix()), float64(1.0)}}, - Tags: []string{"gorch:frobble", "x:e"}, - MetricType: "rate", - Interval: 10, + ddSink := datadogMetricSink{ + hostname: "somehostname", + tags: []string{"a:b", "c:d"}, + interval: 10, + } + + metrics := []samplers.InterMetric{{ + Name: "foo.bar.baz", + Timestamp: time.Now().Unix(), + Value: float64(10), + Tags: []string{"gorch:frobble", "x:e"}, + Type: samplers.CounterMetric, }} - finalizeMetrics("somehostname", []string{"a:b", "c:d"}, metrics) - assert.Equal(t, "somehostname", metrics[0].Hostname, "Metric hostname uses argument") - assert.Contains(t, metrics[0].Tags, "a:b", "Tags should contain server tags") + ddMetrics := ddSink.finalizeMetrics(metrics) + assert.Equal(t, "somehostname", ddMetrics[0].Hostname, "Metric hostname uses argument") + assert.Contains(t, ddMetrics[0].Tags, "a:b", "Tags should contain server tags") } func TestHostMagicTag(t *testing.T) { - metrics := []samplers.DDMetric{{ - Name: "foo.bar.baz", - Value: [1][2]float64{{float64(time.Now().Unix()), float64(1.0)}}, - Tags: []string{"gorch:frobble", "host:abc123", "x:e"}, - MetricType: "rate", - Interval: 10, + ddSink := datadogMetricSink{ + hostname: "badhostname", + tags: []string{"a:b", "c:d"}, + } + + metrics := []samplers.InterMetric{{ + Name: "foo.bar.baz", + Timestamp: time.Now().Unix(), + Value: float64(10), + Tags: []string{"gorch:frobble", "host:abc123", "x:e"}, + Type: samplers.CounterMetric, }} - finalizeMetrics("badhostname", []string{"a:b", "c:d"}, metrics) - assert.Equal(t, "abc123", metrics[0].Hostname, "Metric hostname should be from tag") - assert.NotContains(t, metrics[0].Tags, "host:abc123", "Host tag should be removed") - assert.Contains(t, metrics[0].Tags, "x:e", "Last tag is still around") + ddMetrics := ddSink.finalizeMetrics(metrics) + assert.Equal(t, "abc123", ddMetrics[0].Hostname, "Metric hostname should be from tag") + 
assert.NotContains(t, ddMetrics[0].Tags, "host:abc123", "Host tag should be removed") + assert.Contains(t, ddMetrics[0].Tags, "x:e", "Last tag is still around") } func TestDeviceMagicTag(t *testing.T) { - metrics := []samplers.DDMetric{{ - Name: "foo.bar.baz", - Value: [1][2]float64{{float64(time.Now().Unix()), float64(1.0)}}, - Tags: []string{"gorch:frobble", "device:abc123", "x:e"}, - MetricType: "rate", - Interval: 10, + ddSink := datadogMetricSink{ + hostname: "badhostname", + tags: []string{"a:b", "c:d"}, + } + + metrics := []samplers.InterMetric{{ + Name: "foo.bar.baz", + Timestamp: time.Now().Unix(), + Value: float64(10), + Tags: []string{"gorch:frobble", "device:abc123", "x:e"}, + Type: samplers.CounterMetric, }} - finalizeMetrics("badhostname", []string{"a:b", "c:d"}, metrics) - assert.Equal(t, "abc123", metrics[0].DeviceName, "Metric devicename should be from tag") - assert.NotContains(t, metrics[0].Tags, "device:abc123", "Host tag should be removed") - assert.Contains(t, metrics[0].Tags, "x:e", "Last tag is still around") + ddMetrics := ddSink.finalizeMetrics(metrics) + assert.Equal(t, "abc123", ddMetrics[0].DeviceName, "Metric devicename should be from tag") + assert.NotContains(t, ddMetrics[0].Tags, "device:abc123", "Host tag should be removed") + assert.Contains(t, ddMetrics[0].Tags, "x:e", "Last tag is still around") } func TestFlushTracesDatadog(t *testing.T) { @@ -135,8 +170,6 @@ func testFlushTraceDatadog(t *testing.T, protobuf, jsn io.Reader) { server.spanSinks = append(server.spanSinks, ddSink) - assert.Equal(t, server.DDTraceAddress, config.DatadogTraceAPIAddress) - packet, err := ioutil.ReadAll(protobuf) assert.NoError(t, err) @@ -169,8 +202,6 @@ func testFlushTraceLightstep(t *testing.T, protobuf, jsn io.Reader) { lsSink, err := NewLightStepSpanSink(&config, server.Statsd, server.TagsAsMap) server.spanSinks = append(server.spanSinks, lsSink) - assert.Equal(t, server.DDTraceAddress, config.DatadogTraceAPIAddress) - packet, err := ioutil.ReadAll(protobuf) assert.NoError(t, err) diff --git a/metric_sink.go b/metric_sink.go index c0b43427e..b32d899b3 100644 --- a/metric_sink.go +++ b/metric_sink.go @@ -32,6 +32,18 @@ type datadogMetricSink struct { interval float64 } +// DDMetric is a data structure that represents the JSON that Datadog +// wants when posting to the API +type DDMetric struct { + Name string `json:"metric"` + Value [1][2]float64 `json:"points"` + Tags []string `json:"tags,omitempty"` + MetricType string `json:"type"` + Hostname string `json:"host,omitempty"` + DeviceName string `json:"device_name,omitempty"` + Interval int32 `json:"interval,omitempty"` +} + // NewDatadogMetricSink creates a new Datadog sink for trace spans. func NewDatadogMetricSink(config *Config, interval float64, httpClient *http.Client, stats *statsd.Client) (*datadogMetricSink, error) { return &datadogMetricSink{ @@ -103,7 +115,7 @@ func (dd *datadogMetricSink) FlushEventsChecks(ctx context.Context, events []sam // the official dd-agent // we don't actually pass all the body keys that dd-agent passes here... 
but // it still works - err := postHelper(context.TODO(), dd.HTTPClient, dd.statsd, fmt.Sprintf("%s/intake?api_key=%s"), map[string]map[string][]samplers.UDPEvent{ + err := postHelper(context.TODO(), dd.HTTPClient, dd.statsd, fmt.Sprintf("%s/intake?api_key=%s", dd.ddHostname, dd.apiKey), map[string]map[string][]samplers.UDPEvent{ "events": { "api": events, }, @@ -121,7 +133,7 @@ func (dd *datadogMetricSink) FlushEventsChecks(ctx context.Context, events []sam // this endpoint is not documented to take an array... but it does // another curious constraint of this endpoint is that it does not // support "Content-Encoding: deflate" - err := postHelper(context.TODO(), dd.HTTPClient, dd.statsd, fmt.Sprintf("%s/api/v1/check_run?api_key=%s"), checks, "flush_checks", false) + err := postHelper(context.TODO(), dd.HTTPClient, dd.statsd, fmt.Sprintf("%s/api/v1/check_run?api_key=%s", dd.ddHostname, dd.apiKey), checks, "flush_checks", false) if err == nil { log.WithField("checks", len(checks)).Info("Completed flushing service checks to Datadog") } else { @@ -132,20 +144,29 @@ func (dd *datadogMetricSink) FlushEventsChecks(ctx context.Context, events []sam } } -func (dd *datadogMetricSink) finalizeMetrics(metrics []samplers.InterMetric) []samplers.DDMetric { - ddMetrics := make([]samplers.DDMetric, len(metrics)) +func (dd *datadogMetricSink) finalizeMetrics(metrics []samplers.InterMetric) []DDMetric { + ddMetrics := make([]DDMetric, len(metrics)) for i, m := range metrics { // Defensively copy tags since we're gonna mutate it tags := make([]string, len(dd.tags)) copy(tags, dd.tags) - metricType := m.Type.String() + + metricType := "" value := m.Value - // We convert Datadog counters into rates - if metricType == "counter" { + + switch m.Type { + case samplers.CounterMetric: + // We convert counters into rates for Datadog metricType = "rate" value = m.Value / dd.interval + case samplers.GaugeMetric: + metricType = "gauge" + default: + log.WithField("metric_type", m.Type).Warn("Encountered an unknown metric type") + continue } - ddMetric := samplers.DDMetric{ + + ddMetric := DDMetric{ Name: m.Name, Value: [1][2]float64{ [2]float64{ @@ -181,9 +202,9 @@ func (dd *datadogMetricSink) finalizeMetrics(metrics []samplers.InterMetric) []s return ddMetrics } -func (dd *datadogMetricSink) flushPart(ctx context.Context, metricSlice []samplers.DDMetric, wg *sync.WaitGroup) { +func (dd *datadogMetricSink) flushPart(ctx context.Context, metricSlice []DDMetric, wg *sync.WaitGroup) { defer wg.Done() - postHelper(ctx, dd.HTTPClient, dd.statsd, fmt.Sprintf("%s/api/v1/series?api_key=%s", dd.ddHostname, dd.apiKey), map[string][]samplers.DDMetric{ + postHelper(ctx, dd.HTTPClient, dd.statsd, fmt.Sprintf("%s/api/v1/series?api_key=%s", dd.ddHostname, dd.apiKey), map[string][]DDMetric{ "series": metricSlice, }, "flush", true) } diff --git a/plugin_test.go b/plugin_test.go index fedca768d..cafb0b5c4 100644 --- a/plugin_test.go +++ b/plugin_test.go @@ -2,6 +2,7 @@ package veneur import ( "compress/gzip" + "context" "encoding/csv" "io" "os" @@ -22,11 +23,11 @@ import ( type dummyPlugin struct { logger *logrus.Logger statsd *statsd.Client - flush func([]samplers.DDMetric, string) error + flush func(context.Context, []samplers.InterMetric) error } -func (dp *dummyPlugin) Flush(metrics []samplers.DDMetric, hostname string) error { - return dp.flush(metrics, hostname) +func (dp *dummyPlugin) Flush(ctx context.Context, metrics []samplers.InterMetric) error { + return dp.flush(ctx, metrics) } func (dp *dummyPlugin) Name() string { @@ 
-56,13 +57,11 @@ func TestGlobalServerPluginFlush(t *testing.T) { dp := &dummyPlugin{logger: log, statsd: f.server.Statsd} - dp.flush = func(metrics []samplers.DDMetric, hostname string) error { + dp.flush = func(ctx context.Context, metrics []samplers.InterMetric) error { assert.Equal(t, len(expectedMetrics), len(metrics)) firstName := metrics[0].Name - assert.Equal(t, expectedMetrics[firstName], metrics[0].Value[0][1]) - - assert.Equal(t, hostname, f.server.Hostname) + assert.Equal(t, expectedMetrics[firstName], metrics[0].Value) RemoteResponseChan <- struct{}{} return nil @@ -138,14 +137,14 @@ func TestGlobalServerS3PluginFlush(t *testing.T) { assert.Equal(t, len(expectedRecords), len(records)) - assertCSVFieldsMatch(t, expectedRecords, records, []int{0, 1, 2, 3, 4, 5, 6}) + assertCSVFieldsMatch(t, expectedRecords, records, []int{0, 1, 2, 3, 4, 6}) //assert.Equal(t, expectedRecords, records) RemoteResponseChan <- struct{}{} return &s3.PutObjectOutput{ETag: aws.String("912ec803b2ce49e4a541068d495ab570")}, nil }) - s3p := &s3p.S3Plugin{Logger: log, Svc: client} + s3p := &s3p.S3Plugin{Logger: log, Svc: client, Interval: 10} f.server.registerPlugin(s3p) diff --git a/plugins/influxdb/influxdb.go b/plugins/influxdb/influxdb.go index 9177797d7..8e4020a9d 100644 --- a/plugins/influxdb/influxdb.go +++ b/plugins/influxdb/influxdb.go @@ -2,6 +2,7 @@ package influxdb import ( "bytes" + "context" "fmt" "io" "io/ioutil" @@ -58,7 +59,7 @@ func NewInfluxDBPlugin(logger *logrus.Logger, addr string, consistency string, d } // Flush sends a slice of metrics to InfluxDB -func (p *InfluxDBPlugin) Flush(metrics []samplers.DDMetric, hostname string) error { +func (p *InfluxDBPlugin) Flush(ctx context.Context, metrics []samplers.InterMetric) error { p.Statsd.Gauge("flush.post_metrics_total", float64(len(metrics)), nil, 1.0) // Check to see if we have anything to do if len(metrics) == 0 { @@ -74,7 +75,7 @@ func (p *InfluxDBPlugin) Flush(metrics []samplers.DDMetric, hostname string) err // rather than name value pairs, we have to do this ugly conversion cleanTags := colons.ReplaceAllLiteralString(tags, "=") buff.WriteString( - fmt.Sprintf("%s,%s value=%f %d\n", metric.Name, cleanTags, metric.Value[0][1], int64(metric.Value[0][0])), + fmt.Sprintf("%s,%s value=%f %d\n", metric.Name, cleanTags, metric.Value, metric.Timestamp), ) } diff --git a/plugins/localfile/localfile.go b/plugins/localfile/localfile.go index 2a113a6ae..e746e50b8 100644 --- a/plugins/localfile/localfile.go +++ b/plugins/localfile/localfile.go @@ -2,6 +2,7 @@ package localfile import ( "compress/gzip" + "context" "encoding/csv" "fmt" "io" @@ -20,31 +21,33 @@ var _ plugins.Plugin = &Plugin{} type Plugin struct { FilePath string Logger *logrus.Logger + hostname string + interval int } // Delimiter defines what kind of delimiter we'll use in the CSV format -- in this case, we want TSV const Delimiter = '\t' // Flush the metrics from the LocalFilePlugin -func (p *Plugin) Flush(metrics []samplers.DDMetric, hostname string) error { +func (p *Plugin) Flush(ctx context.Context, metrics []samplers.InterMetric) error { f, err := os.OpenFile(p.FilePath, os.O_RDWR|os.O_APPEND|os.O_CREATE, os.ModePerm) defer f.Close() if err != nil { return fmt.Errorf("couldn't open %s for appending: %s", p.FilePath, err) } - appendToWriter(f, metrics, hostname) + appendToWriter(f, metrics, p.hostname, p.interval) return nil } -func appendToWriter(appender io.Writer, metrics []samplers.DDMetric, hostname string) error { +func appendToWriter(appender io.Writer, metrics 
[]samplers.InterMetric, hostname string, interval int) error { gzW := gzip.NewWriter(appender) csvW := csv.NewWriter(gzW) csvW.Comma = Delimiter partitionDate := time.Now() for _, metric := range metrics { - s3.EncodeDDMetricCSV(metric, csvW, &partitionDate, hostname) + s3.EncodeInterMetricCSV(metric, csvW, &partitionDate, hostname, interval) } csvW.Flush() gzW.Close() diff --git a/plugins/localfile/localfile_test.go b/plugins/localfile/localfile_test.go index bc1ef09c1..cf9b9687e 100644 --- a/plugins/localfile/localfile_test.go +++ b/plugins/localfile/localfile_test.go @@ -2,6 +2,7 @@ package localfile import ( "bytes" + "context" "fmt" "testing" @@ -24,27 +25,20 @@ func TestName(t *testing.T) { func TestAppendToWriter(t *testing.T) { b := &bytes.Buffer{} - metrics := []samplers.DDMetric{ - samplers.DDMetric{ - Name: "a.b.c.max", - Value: [1][2]float64{ - [2]float64{ - 1476119058, - 100, - }, - }, + metrics := []samplers.InterMetric{ + samplers.InterMetric{ + Name: "a.b.c.max", + Timestamp: 1476119058, + Value: float64(100), Tags: []string{ "foo:bar", "baz:quz", }, - MetricType: "gauge", - Hostname: "globalstats", - DeviceName: "food", - Interval: 0, + Type: samplers.GaugeMetric, }, } - err := appendToWriter(b, metrics, metrics[0].Hostname) + err := appendToWriter(b, metrics, "globblestoots", 10) assert.NoError(t, err) assert.NotEqual(t, b.Len(), 0) } @@ -52,49 +46,43 @@ func TestAppendToWriter(t *testing.T) { func TestHandlesErrorsInAppendToWriter(t *testing.T) { b := &badWriter{} - err := appendToWriter(b, []samplers.DDMetric{ - samplers.DDMetric{ - Name: "sketchy.metric", - Value: [1][2]float64{[2]float64{1476119058, 100}}, - Tags: []string{"skepticism:high"}, - MetricType: "gauge", - Hostname: "globblestoots", - DeviceName: "¬_¬", - Interval: -1, + err := appendToWriter(b, []samplers.InterMetric{ + samplers.InterMetric{ + Name: "sketchy.metric", + Timestamp: 1476119058, + Value: float64(100), + Tags: []string{"skepticism:high"}, + Type: samplers.GaugeMetric, }, - }, "globblestoots") + }, "globblestoots", 10) assert.Error(t, err) } func TestWritesToDevNull(t *testing.T) { - plugin := Plugin{FilePath: "/dev/null", Logger: logrus.New()} - err := plugin.Flush([]samplers.DDMetric{ - samplers.DDMetric{ - Name: "sketchy.metric", - Value: [1][2]float64{[2]float64{1476119058, 100}}, - Tags: []string{"skepticism:high"}, - MetricType: "gauge", - Hostname: "globblestoots", - DeviceName: "¬_¬", - Interval: -1, + plugin := Plugin{FilePath: "/dev/null", Logger: logrus.New(), hostname: "globblestoots"} + err := plugin.Flush(context.TODO(), []samplers.InterMetric{ + samplers.InterMetric{ + Name: "sketchy.metric", + Timestamp: 1476119058, + Value: float64(100), + Tags: []string{"skepticism:high"}, + Type: samplers.GaugeMetric, }, - }, "globblestoots") + }) assert.NoError(t, err) } func TestFailsWritingToInvalidPath(t *testing.T) { - plugin := Plugin{FilePath: "", Logger: logrus.New()} - err := plugin.Flush([]samplers.DDMetric{ - samplers.DDMetric{ - Name: "sketchy.metric", - Value: [1][2]float64{[2]float64{1476119058, 100}}, - Tags: []string{"skepticism:high"}, - MetricType: "gauge", - Hostname: "globblestoots", - DeviceName: "¬_¬", - Interval: -1, + plugin := Plugin{FilePath: "", Logger: logrus.New(), hostname: "globblestoots"} + err := plugin.Flush(context.TODO(), []samplers.InterMetric{ + samplers.InterMetric{ + Name: "sketchy.metric", + Timestamp: 1476119058, + Value: float64(100), + Tags: []string{"skepticism:high"}, + Type: samplers.GaugeMetric, }, - }, "globblestoots") + }) assert.Error(t, err) 
} diff --git a/plugins/plugins.go b/plugins/plugins.go index c461dcbbe..59567939f 100644 --- a/plugins/plugins.go +++ b/plugins/plugins.go @@ -1,8 +1,12 @@ package plugins -import "github.com/stripe/veneur/samplers" +import ( + "context" -// A plugin flushes the metrics provided to an arbitrary destination. + "github.com/stripe/veneur/samplers" +) + +// Plugin flushes the metrics provided to an arbitrary destination. // The metrics slice may be shared between plugins, so the plugin may not // write to it or modify any of its components. // The name should be a short, lowercase, snake-cased identifier for the plugin. @@ -10,6 +14,6 @@ import "github.com/stripe/veneur/samplers" // the number of errors encountered are automatically reported by veneur, using // the plugin name. type Plugin interface { - Flush(metrics []samplers.DDMetric, hostname string) error + Flush(ctx context.Context, metrics []samplers.InterMetric) error Name() string } diff --git a/plugins/s3/csv.go b/plugins/s3/csv.go index 388364a73..d9b39ace3 100644 --- a/plugins/s3/csv.go +++ b/plugins/s3/csv.go @@ -2,6 +2,7 @@ package s3 import ( "encoding/csv" + "errors" "fmt" "strconv" "strings" @@ -22,13 +23,9 @@ const ( TsvTags TsvMetricType - // The hostName attached to the metric - TsvHostname - // The hostName of the server flushing the data TsvVeneurHostname - TsvDeviceName TsvInterval TsvTimestamp @@ -44,8 +41,6 @@ var tsvSchema = [...]string{ TsvName: "Name", TsvTags: "Tags", TsvMetricType: "MetricType", - TsvHostname: "Hostname", - TsvDeviceName: "DeviceName", TsvInterval: "Interval", TsvVeneurHostname: "VeneurHostname", TsvTimestamp: "Timestamp", @@ -53,34 +48,40 @@ var tsvSchema = [...]string{ TsvPartition: "Partition", } -// EncodeDDMetricCSV generates a newline-terminated CSV row that describes -// the data represented by the DDMetric. +// EncodeInterMetricCSV generates a newline-terminated CSV row that describes +// the data represented by the InterMetric. // The caller is responsible for setting w.Comma as the appropriate delimiter. 
// For performance, encodeCSV does not flush after every call; the caller is // expected to flush at the end of the operation cycle -func EncodeDDMetricCSV(d samplers.DDMetric, w *csv.Writer, partitionDate *time.Time, hostName string) error { - - timestamp := d.Value[0][0] - value := strconv.FormatFloat(d.Value[0][1], 'f', -1, 64) - interval := strconv.Itoa(int(d.Interval)) - +func EncodeInterMetricCSV(d samplers.InterMetric, w *csv.Writer, partitionDate *time.Time, hostName string, interval int) error { // TODO(aditya) some better error handling for this // to guarantee that the result is proper JSON tags := "{" + strings.Join(d.Tags, ",") + "}" + metricType := "" + metricValue := d.Value + switch d.Type { + case samplers.CounterMetric: + metricValue = d.Value / float64(interval) + metricType = "rate" + + case samplers.GaugeMetric: + metricType = "gauge" + default: + return errors.New(fmt.Sprintf("Encountered an unknown metric type %s", d.Type.String())) + } + fields := [...]string{ // the order here doesn't actually matter // as long as the keys are right TsvName: d.Name, TsvTags: tags, - TsvMetricType: d.MetricType, - TsvHostname: d.Hostname, - TsvDeviceName: d.DeviceName, - TsvInterval: interval, + TsvMetricType: metricType, + TsvInterval: strconv.Itoa(interval), TsvVeneurHostname: hostName, - TsvValue: value, + TsvValue: strconv.FormatFloat(metricValue, 'f', -1, 64), - TsvTimestamp: time.Unix(int64(timestamp), 0).UTC().Format(RedshiftDateFormat), + TsvTimestamp: time.Unix(d.Timestamp, 0).UTC().Format(RedshiftDateFormat), // TODO avoid edge case at midnight TsvPartition: partitionDate.UTC().Format(PartitionDateFormat), diff --git a/plugins/s3/csv_test.go b/plugins/s3/csv_test.go index cfb463ea1..ca05ffa91 100644 --- a/plugins/s3/csv_test.go +++ b/plugins/s3/csv_test.go @@ -15,9 +15,9 @@ import ( ) type CSVTestCase struct { - Name string - DDMetric samplers.DDMetric - Row io.Reader + Name string + InterMetric samplers.InterMetric + Row io.Reader } func CSVTestCases() []CSVTestCase { @@ -27,52 +27,43 @@ func CSVTestCases() []CSVTestCase { return []CSVTestCase{ { Name: "BasicDDMetric", - DDMetric: samplers.DDMetric{ - Name: "a.b.c.max", - Value: [1][2]float64{[2]float64{1476119058, - 100}}, + InterMetric: samplers.InterMetric{ + Name: "a.b.c.max", + Timestamp: 1476119058, + Value: float64(100), Tags: []string{"foo:bar", "baz:quz"}, - MetricType: "gauge", - Hostname: "globalstats", - DeviceName: "food", - Interval: 0, + Type: samplers.GaugeMetric, }, - Row: strings.NewReader(fmt.Sprintf("a.b.c.max\t{foo:bar,baz:quz}\tgauge\tglobalstats\ttestbox-c3eac9\tfood\t0\t2016-10-10 05:04:18\t100\t%s\n", partition)), + Row: strings.NewReader(fmt.Sprintf("a.b.c.max\t{foo:bar,baz:quz}\tgauge\ttestbox-c3eac9\t10\t2016-10-10 05:04:18\t100\t%s\n", partition)), }, { // Test that we are able to handle a missing field (DeviceName) Name: "MissingDeviceName", - DDMetric: samplers.DDMetric{ - Name: "a.b.c.max", - Value: [1][2]float64{[2]float64{1476119058, - 100}}, + InterMetric: samplers.InterMetric{ + Name: "a.b.c.max", + Timestamp: 1476119058, + Value: float64(100), Tags: []string{"foo:bar", "baz:quz"}, - MetricType: "rate", - Hostname: "localhost", - DeviceName: "", - Interval: 10, + Type: samplers.CounterMetric, }, - Row: strings.NewReader(fmt.Sprintf("a.b.c.max\t{foo:bar,baz:quz}\trate\tlocalhost\ttestbox-c3eac9\t\t10\t2016-10-10 05:04:18\t100\t%s\n", partition)), + Row: strings.NewReader(fmt.Sprintf("a.b.c.max\t{foo:bar,baz:quz}\trate\ttestbox-c3eac9\t10\t2016-10-10 05:04:18\t10\t%s\n", partition)), }, { 
// Test that we are able to handle tags which have tab characters in them // by quoting the entire field // (tags shouldn't do this, but we should handle them properly anyway) Name: "TabTag", - DDMetric: samplers.DDMetric{ - Name: "a.b.c.max", - Value: [1][2]float64{[2]float64{1476119058, - 100}}, + InterMetric: samplers.InterMetric{ + Name: "a.b.c.count", + Timestamp: 1476119058, + Value: float64(100), Tags: []string{"foo:b\tar", "baz:quz"}, - MetricType: "rate", - Hostname: "localhost", - DeviceName: "eniac", - Interval: 10, + Type: samplers.CounterMetric, }, - Row: strings.NewReader(fmt.Sprintf("a.b.c.max\t\"{foo:b\tar,baz:quz}\"\trate\tlocalhost\ttestbox-c3eac9\teniac\t10\t2016-10-10 05:04:18\t100\t%s\n", partition)), + Row: strings.NewReader(fmt.Sprintf("a.b.c.count\t\"{foo:b\tar,baz:quz}\"\trate\ttestbox-c3eac9\t10\t2016-10-10 05:04:18\t10\t%s\n", partition)), }, } } @@ -89,7 +80,7 @@ func TestEncodeCSV(t *testing.T) { w.Comma = '\t' tm := time.Now() - err := EncodeDDMetricCSV(tc.DDMetric, w, &tm, "testbox-c3eac9") + err := EncodeInterMetricCSV(tc.InterMetric, w, &tm, "testbox-c3eac9", 10) assert.NoError(t, err) // We need to flush or there won't actually be any data there diff --git a/plugins/s3/s3.go b/plugins/s3/s3.go index 0e0491a38..af1693b20 100644 --- a/plugins/s3/s3.go +++ b/plugins/s3/s3.go @@ -3,6 +3,7 @@ package s3 import ( "bytes" "compress/gzip" + "context" "encoding/csv" "errors" "io" @@ -28,13 +29,14 @@ type S3Plugin struct { Svc s3iface.S3API S3Bucket string Hostname string + Interval int } -func (p *S3Plugin) Flush(metrics []samplers.DDMetric, hostname string) error { +func (p *S3Plugin) Flush(ctx context.Context, metrics []samplers.InterMetric) error { const Delimiter = '\t' const IncludeHeaders = false - csv, err := EncodeDDMetricsCSV(metrics, Delimiter, IncludeHeaders, p.Hostname) + csv, err := EncodeInterMetricsCSV(metrics, Delimiter, IncludeHeaders, p.Hostname, p.Interval) if err != nil { p.Logger.WithFields(logrus.Fields{ logrus.ErrorKey: err, @@ -43,7 +45,7 @@ func (p *S3Plugin) Flush(metrics []samplers.DDMetric, hostname string) error { return err } - err = p.S3Post(hostname, csv, tsvGzFt) + err = p.S3Post(p.Hostname, csv, tsvGzFt) if err != nil { p.Logger.WithFields(logrus.Fields{ logrus.ErrorKey: err, @@ -93,10 +95,10 @@ func S3Path(hostname string, ft filetype) *string { return aws.String(path.Join(t.Format("2006/01/02"), hostname, filename)) } -// EncodeDDMetricsCSV returns a reader containing the gzipped CSV representation of the -// DDMetrics data, one row per DDMetric. +// EncodeInterMetricsCSV returns a reader containing the gzipped CSV representation of the +// InterMetric data, one row per InterMetric. 
// the AWS sdk requires seekable input, so we return a ReadSeeker here -func EncodeDDMetricsCSV(metrics []samplers.DDMetric, delimiter rune, includeHeaders bool, hostname string) (io.ReadSeeker, error) { +func EncodeInterMetricsCSV(metrics []samplers.InterMetric, delimiter rune, includeHeaders bool, hostname string, interval int) (io.ReadSeeker, error) { b := &bytes.Buffer{} gzw := gzip.NewWriter(b) w := csv.NewWriter(gzw) @@ -110,8 +112,6 @@ func EncodeDDMetricsCSV(metrics []samplers.DDMetric, delimiter rune, includeHead TsvName: TsvName.String(), TsvTags: TsvTags.String(), TsvMetricType: TsvMetricType.String(), - TsvHostname: TsvHostname.String(), - TsvDeviceName: TsvDeviceName.String(), TsvInterval: TsvInterval.String(), TsvVeneurHostname: TsvVeneurHostname.String(), TsvValue: TsvValue.String(), @@ -125,7 +125,7 @@ func EncodeDDMetricsCSV(metrics []samplers.DDMetric, delimiter rune, includeHead // TODO avoid edge case at midnight partitionDate := time.Now() for _, metric := range metrics { - EncodeDDMetricCSV(metric, w, &partitionDate, hostname) + EncodeInterMetricCSV(metric, w, &partitionDate, hostname, interval) } w.Flush() diff --git a/plugins/s3/s3_test.go b/plugins/s3/s3_test.go index c14af35c4..02eab4b16 100644 --- a/plugins/s3/s3_test.go +++ b/plugins/s3/s3_test.go @@ -134,23 +134,23 @@ func TestS3PostNoCredentials(t *testing.T) { } func TestEncodeDDMetricsCSV(t *testing.T) { - const ExpectedHeader = "Name\tTags\tMetricType\tHostname\tVeneurHostname\tDeviceName\tInterval\tTimestamp\tValue\tPartition" + const ExpectedHeader = "Name\tTags\tMetricType\tVeneurHostname\tInterval\tTimestamp\tValue\tPartition" const Delimiter = '\t' const VeneurHostname = "testbox-c3eac9" testCases := CSVTestCases() - metrics := make([]samplers.DDMetric, len(testCases)) + metrics := make([]samplers.InterMetric, len(testCases)) for i, tc := range testCases { - metrics[i] = tc.DDMetric + metrics[i] = tc.InterMetric } - c, err := EncodeDDMetricsCSV(metrics, Delimiter, true, VeneurHostname) + c, err := EncodeInterMetricsCSV(metrics, Delimiter, true, VeneurHostname, 10) assert.NoError(t, err) gzr, err := gzip.NewReader(c) assert.NoError(t, err) r := csv.NewReader(gzr) - r.FieldsPerRecord = 10 + r.FieldsPerRecord = 8 r.Comma = Delimiter // first line should always contain header information diff --git a/samplers/metrictype_string.go b/samplers/metrictype_string.go index dff126e95..f45d9914f 100644 --- a/samplers/metrictype_string.go +++ b/samplers/metrictype_string.go @@ -4,9 +4,9 @@ package samplers import "fmt" -const _MetricType_name = "CounterMetricGaugeMetricHistogramMetricRateMetricSetMetricTimerMetric" +const _MetricType_name = "CounterMetricGaugeMetric" -var _MetricType_index = [...]uint8{0, 13, 24, 39, 49, 58, 69} +var _MetricType_index = [...]uint8{0, 13, 24} func (i MetricType) String() string { if i < 0 || i >= MetricType(len(_MetricType_index)-1) { diff --git a/samplers/samplers.go b/samplers/samplers.go index 019256938..18dece7c2 100644 --- a/samplers/samplers.go +++ b/samplers/samplers.go @@ -21,14 +21,6 @@ const ( CounterMetric MetricType = iota // GaugeMetric is a gauge GaugeMetric - // HistogramMetric is a histogram - HistogramMetric - // RateMetric is a rate - RateMetric - // SetMetric is a set - SetMetric - // TimerMetric is a timer - TimerMetric ) // InterMetric represents a metric that has been completed and is ready for @@ -41,41 +33,18 @@ type InterMetric struct { Type MetricType } -// DDMetric is a data structure that represents the JSON that Datadog -// wants when posting to the API 
-type DDMetric struct { - Name string `json:"metric"` - Value [1][2]float64 `json:"points"` - Tags []string `json:"tags,omitempty"` - MetricType string `json:"type"` - Hostname string `json:"host,omitempty"` - DeviceName string `json:"device_name,omitempty"` - Interval int32 `json:"interval,omitempty"` -} - -// Aggregate is one of the supported aggregation formulas. type Aggregate int const ( - // AggregateMin is a minimum aggregation AggregateMin Aggregate = 1 << iota - // AggregateMax is a maximum aggregation AggregateMax - // AggregateMedian is a median aggregation AggregateMedian - // AggregateAverage is an average aggregation AggregateAverage - // AggregateCount is a count aggregation, aka how many measurements were - // received. AggregateCount - // AggregateSum is a sum aggregation, aka the sum of all measurements AggregateSum - // AggregateHarmonicMean is a harmonic mean aggregation AggregateHarmonicMean ) -// AggregatesLookup is a map to Aggregate table for converting a config value -// to the actual Aggregate type var AggregatesLookup = map[string]Aggregate{ "min": AggregateMin, "max": AggregateMax, @@ -86,9 +55,6 @@ var AggregatesLookup = map[string]Aggregate{ "hmean": AggregateHarmonicMean, } -// HistogramAggregates is the aggregates that need to be exported, ANDed -// together and a count summing the number of "on" aggregations contained in -// Value. type HistogramAggregates struct { Value Aggregate Count int @@ -126,16 +92,16 @@ func (c *Counter) Sample(sample float64, sampleRate float32) { c.value += int64(sample) * int64(1/sampleRate) } -// Flush generates a DDMetric from the current state of this Counter. -func (c *Counter) Flush(interval time.Duration) []DDMetric { +// Flush generates an InterMetric from the current state of this Counter. +func (c *Counter) Flush(interval time.Duration) []InterMetric { tags := make([]string, len(c.Tags)) copy(tags, c.Tags) - return []DDMetric{{ - Name: c.Name, - Value: [1][2]float64{{float64(time.Now().Unix()), float64(c.value) / interval.Seconds()}}, - Tags: tags, - MetricType: "rate", - Interval: int32(interval.Seconds()), + return []InterMetric{{ + Name: c.Name, + Timestamp: time.Now().Unix(), + Value: float64(c.value), + Tags: tags, + Type: CounterMetric, }} } @@ -191,15 +157,16 @@ func (g *Gauge) Sample(sample float64, sampleRate float32) { g.value = sample } -// Flush generates a DDMetric from the current state of this gauge. -func (g *Gauge) Flush() []DDMetric { +// Flush generates an InterMetric from the current state of this gauge. +func (g *Gauge) Flush() []InterMetric { tags := make([]string, len(g.Tags)) copy(tags, g.Tags) - return []DDMetric{{ - Name: g.Name, - Value: [1][2]float64{{float64(time.Now().Unix()), float64(g.value)}}, - Tags: tags, - MetricType: "gauge", + return []InterMetric{{ + Name: g.Name, + Timestamp: time.Now().Unix(), + Value: float64(g.value), + Tags: tags, + Type: GaugeMetric, }} } @@ -233,15 +200,16 @@ func NewSet(Name string, Tags []string) *Set { } } -// Flush generates a DDMetric for the state of this Set. -func (s *Set) Flush() []DDMetric { +// Flush generates an InterMetric for the state of this Set. 
+func (s *Set) Flush() []InterMetric { tags := make([]string, len(s.Tags)) copy(tags, s.Tags) - return []DDMetric{{ - Name: s.Name, - Value: [1][2]float64{{float64(time.Now().Unix()), float64(s.Hll.Estimate())}}, - Tags: tags, - MetricType: "gauge", + return []InterMetric{{ + Name: s.Name, + Timestamp: time.Now().Unix(), + Value: float64(s.Hll.Estimate()), + Tags: tags, + Type: GaugeMetric, }} } @@ -321,47 +289,46 @@ func NewHist(Name string, Tags []string) *Histo { } } -// Flush generates DDMetrics for the current state of the Histo. percentiles +// Flush generates InterMetrics for the current state of the Histo. percentiles // indicates what percentiles should be exported from the histogram. -func (h *Histo) Flush(interval time.Duration, percentiles []float64, aggregates HistogramAggregates) []DDMetric { - now := float64(time.Now().Unix()) - // we only want to flush the number of samples we received locally, since - // any other samples have already been flushed by a local veneur instance - // before this was forwarded to us - rate := h.LocalWeight / interval.Seconds() - metrics := make([]DDMetric, 0, aggregates.Count+len(percentiles)) +func (h *Histo) Flush(interval time.Duration, percentiles []float64, aggregates HistogramAggregates) []InterMetric { + now := time.Now().Unix() + metrics := make([]InterMetric, 0, aggregates.Count+len(percentiles)) if (aggregates.Value&AggregateMax) == AggregateMax && !math.IsInf(h.LocalMax, 0) { - // Defensively recopy tags to avoid aliasing bugs in case multiple DDMetrics share the same + // Defensively recopy tags to avoid aliasing bugs in case multiple InterMetrics share the same // tag array in the future tags := make([]string, len(h.Tags)) copy(tags, h.Tags) - metrics = append(metrics, DDMetric{ - Name: fmt.Sprintf("%s.max", h.Name), - Value: [1][2]float64{{now, h.LocalMax}}, - Tags: tags, - MetricType: "gauge", + metrics = append(metrics, InterMetric{ + Name: fmt.Sprintf("%s.max", h.Name), + Timestamp: now, + Value: float64(h.LocalMax), + Tags: tags, + Type: GaugeMetric, }) } if (aggregates.Value&AggregateMin) == AggregateMin && !math.IsInf(h.LocalMin, 0) { tags := make([]string, len(h.Tags)) copy(tags, h.Tags) - metrics = append(metrics, DDMetric{ - Name: fmt.Sprintf("%s.min", h.Name), - Value: [1][2]float64{{now, h.LocalMin}}, - Tags: tags, - MetricType: "gauge", + metrics = append(metrics, InterMetric{ + Name: fmt.Sprintf("%s.min", h.Name), + Timestamp: now, + Value: float64(h.LocalMin), + Tags: tags, + Type: GaugeMetric, }) } if (aggregates.Value&AggregateSum) == AggregateSum && h.LocalSum != 0 { tags := make([]string, len(h.Tags)) copy(tags, h.Tags) - metrics = append(metrics, DDMetric{ - Name: fmt.Sprintf("%s.sum", h.Name), - Value: [1][2]float64{{now, h.LocalSum}}, - Tags: tags, - MetricType: "gauge", + metrics = append(metrics, InterMetric{ + Name: fmt.Sprintf("%s.sum", h.Name), + Timestamp: now, + Value: float64(h.LocalSum), + Tags: tags, + Type: GaugeMetric, }) } @@ -370,26 +337,27 @@ func (h *Histo) Flush(interval time.Duration, percentiles []float64, aggregates // to submit an average tags := make([]string, len(h.Tags)) copy(tags, h.Tags) - metrics = append(metrics, DDMetric{ - Name: fmt.Sprintf("%s.avg", h.Name), - Value: [1][2]float64{{now, h.LocalSum / h.LocalWeight}}, - Tags: tags, - MetricType: "gauge", + metrics = append(metrics, InterMetric{ + Name: fmt.Sprintf("%s.avg", h.Name), + Timestamp: now, + Value: float64(h.LocalSum / h.LocalWeight), + Tags: tags, + Type: GaugeMetric, }) } - if (aggregates.Value&AggregateCount) == 
AggregateCount && rate != 0 { + if (aggregates.Value&AggregateCount) == AggregateCount && h.LocalWeight != 0 { // if we haven't received any local samples, then leave this sparse, // otherwise it can lead to some misleading zeroes in between the // flushes of downstream instances tags := make([]string, len(h.Tags)) copy(tags, h.Tags) - metrics = append(metrics, DDMetric{ - Name: fmt.Sprintf("%s.count", h.Name), - Value: [1][2]float64{{now, rate}}, - Tags: tags, - MetricType: "rate", - Interval: int32(interval.Seconds()), + metrics = append(metrics, InterMetric{ + Name: fmt.Sprintf("%s.count", h.Name), + Timestamp: now, + Value: float64(h.LocalWeight), + Tags: tags, + Type: CounterMetric, }) } @@ -398,11 +366,12 @@ func (h *Histo) Flush(interval time.Duration, percentiles []float64, aggregates copy(tags, h.Tags) metrics = append( metrics, - DDMetric{ - Name: fmt.Sprintf("%s.median", h.Name), - Value: [1][2]float64{{now, h.Value.Quantile(0.5)}}, - Tags: tags, - MetricType: "gauge", + InterMetric{ + Name: fmt.Sprintf("%s.median", h.Name), + Timestamp: now, + Value: float64(h.Value.Quantile(0.5)), + Tags: tags, + Type: GaugeMetric, }, ) } @@ -412,11 +381,12 @@ func (h *Histo) Flush(interval time.Duration, percentiles []float64, aggregates // to submit an average tags := make([]string, len(h.Tags)) copy(tags, h.Tags) - metrics = append(metrics, DDMetric{ - Name: fmt.Sprintf("%s.hmean", h.Name), - Value: [1][2]float64{{now, h.LocalWeight / h.LocalReciprocalSum}}, - Tags: tags, - MetricType: "gauge", + metrics = append(metrics, InterMetric{ + Name: fmt.Sprintf("%s.hmean", h.Name), + Timestamp: now, + Value: float64(h.LocalWeight / h.LocalReciprocalSum), + Tags: tags, + Type: GaugeMetric, }) } @@ -426,11 +396,12 @@ func (h *Histo) Flush(interval time.Duration, percentiles []float64, aggregates metrics = append( metrics, // TODO Fix to allow for p999, etc - DDMetric{ - Name: fmt.Sprintf("%s.%dpercentile", h.Name, int(p*100)), - Value: [1][2]float64{{now, h.Value.Quantile(p)}}, - Tags: tags, - MetricType: "gauge", + InterMetric{ + Name: fmt.Sprintf("%s.%dpercentile", h.Name, int(p*100)), + Timestamp: now, + Value: float64(h.Value.Quantile(p)), + Tags: tags, + Type: GaugeMetric, }, ) } diff --git a/samplers/samplers_test.go b/samplers/samplers_test.go index 3b56ebd25..5ad27a2cc 100644 --- a/samplers/samplers_test.go +++ b/samplers/samplers_test.go @@ -22,12 +22,10 @@ func TestCounterEmpty(t *testing.T) { assert.Len(t, metrics, 1, "Flushes 1 metric") m1 := metrics[0] - assert.Equal(t, int32(10), m1.Interval, "Interval") - assert.Equal(t, "rate", m1.MetricType, "Type") + assert.Equal(t, CounterMetric, m1.Type, "Type") assert.Len(t, c.Tags, 1, "Tag length") assert.Equal(t, c.Tags[0], "a:b", "Tag contents") - // The counter returns an array with a single tuple of timestamp,value - assert.Equal(t, 0.1, m1.Value[0][1], "Metric value") + assert.Equal(t, float64(1), m1.Value, "Metric value") } func TestCounterRate(t *testing.T) { @@ -35,9 +33,8 @@ func TestCounterRate(t *testing.T) { c.Sample(5, 1.0) - // The counter returns an array with a single tuple of timestamp,value metrics := c.Flush(10 * time.Second) - assert.Equal(t, 0.5, metrics[0].Value[0][1], "Metric value") + assert.Equal(t, float64(5), metrics[0].Value, "Metric value") } func TestCounterSampleRate(t *testing.T) { @@ -45,9 +42,8 @@ func TestCounterSampleRate(t *testing.T) { c.Sample(5, 0.5) - // The counter returns an array with a single tuple of timestamp,value metrics := c.Flush(10 * time.Second) - assert.Equal(t, float64(1), 
metrics[0].Value[0][1], "Metric value") + assert.Equal(t, float64(10), metrics[0].Value, "Metric value") } func TestCounterMerge(t *testing.T) { @@ -68,12 +64,12 @@ func TestCounterMerge(t *testing.T) { assert.NoError(t, cGlobal.Combine(jm.Value), "should have combined counters successfully") metrics := cGlobal.Flush(10 * time.Second) - assert.Equal(t, float64(1), metrics[0].Value[0][1]) + assert.Equal(t, float64(10), metrics[0].Value) assert.NoError(t, cGlobal.Combine(jm2.Value), "should have combined counters successfully") metrics = cGlobal.Flush(10 * time.Second) - assert.Equal(t, float64(3.8), metrics[0].Value[0][1]) + assert.Equal(t, float64(38), metrics[0].Value) } func TestGauge(t *testing.T) { @@ -89,15 +85,12 @@ func TestGauge(t *testing.T) { assert.Len(t, metrics, 1, "Flushed metric count") m1 := metrics[0] - // Interval is not meaningful for this - assert.Equal(t, int32(0), m1.Interval, "Interval") - assert.Equal(t, "gauge", m1.MetricType, "Type") + assert.Equal(t, GaugeMetric, m1.Type, "Type") tags := m1.Tags assert.Len(t, tags, 1, "Tag length") assert.Equal(t, tags[0], "a:b", "Tag contents") - // The counter returns an array with a single tuple of timestamp,value - assert.Equal(t, float64(5), m1.Value[0][1], "Value") + assert.Equal(t, float64(5), m1.Value, "Value") } func TestSet(t *testing.T) { @@ -120,12 +113,10 @@ func TestSet(t *testing.T) { assert.Len(t, metrics, 1, "Flush") m1 := metrics[0] - // Interval is not meaningful for this - assert.Equal(t, int32(0), m1.Interval, "Interval") - assert.Equal(t, "gauge", m1.MetricType, "Type") + assert.Equal(t, GaugeMetric, m1.Type, "Type") assert.Len(t, m1.Tags, 1, "Tag count") assert.Equal(t, "a:b", m1.Tags[0], "First tag") - assert.Equal(t, float64(4), m1.Value[0][1], "Value") + assert.Equal(t, float64(4), m1.Value, "Value") } func TestSetMerge(t *testing.T) { @@ -180,83 +171,67 @@ func TestHisto(t *testing.T) { // the max m2 := metrics[0] assert.Equal(t, "a.b.c.max", m2.Name, "Name") - assert.Equal(t, int32(0), m2.Interval, "Interval") - assert.Equal(t, "gauge", m2.MetricType, "Type") + assert.Equal(t, GaugeMetric, m2.Type, "Type") assert.Len(t, m2.Tags, 1, "Tag count") assert.Equal(t, "a:b", m2.Tags[0], "First tag") - // The counter returns an array with a single tuple of timestamp,value - assert.Equal(t, float64(25), m2.Value[0][1], "Value") + assert.Equal(t, float64(25), m2.Value, "Value") // the min m3 := metrics[1] assert.Equal(t, "a.b.c.min", m3.Name, "Name") - assert.Equal(t, int32(0), m3.Interval, "Interval") - assert.Equal(t, "gauge", m3.MetricType, "Type") + assert.Equal(t, GaugeMetric, m3.Type, "Type") assert.Len(t, m3.Tags, 1, "Tag count") assert.Equal(t, "a:b", m3.Tags[0], "First tag") - // The counter returns an array with a single tuple of timestamp,value - assert.Equal(t, float64(5), m3.Value[0][1], "Value") + assert.Equal(t, float64(5), m3.Value, "Value") // the sum m4 := metrics[2] assert.Equal(t, "a.b.c.sum", m4.Name, "Name") - assert.Equal(t, int32(0), m4.Interval, "Interval") - assert.Equal(t, "gauge", m4.MetricType, "Type") + assert.Equal(t, GaugeMetric, m4.Type, "Type") assert.Len(t, m4.Tags, 1, "Tag count") assert.Equal(t, "a:b", m4.Tags[0], "First tag") - // The counter returns an array with a single tuple of timestamp,value - assert.Equal(t, float64(75), m4.Value[0][1], "Value") + assert.Equal(t, float64(75), m4.Value, "Value") // the average m5 := metrics[3] assert.Equal(t, "a.b.c.avg", m5.Name, "Name") - assert.Equal(t, int32(0), m5.Interval, "Interval") - assert.Equal(t, "gauge", m5.MetricType, 
"Type") + assert.Equal(t, GaugeMetric, m5.Type, "Type") assert.Len(t, m5.Tags, 1, "Tag count") assert.Equal(t, "a:b", m5.Tags[0], "First tag") - // The counter returns an array with a single tuple of timestamp,value - assert.Equal(t, float64(15), m5.Value[0][1], "Value") + assert.Equal(t, float64(15), m5.Value, "Value") // the count m1 := metrics[4] assert.Equal(t, "a.b.c.count", m1.Name, "Name") - assert.Equal(t, int32(10), m1.Interval, "Interval") - assert.Equal(t, "rate", m1.MetricType, "Type") + assert.Equal(t, CounterMetric, m1.Type, "Type") assert.Len(t, m1.Tags, 1, "Tag count") assert.Equal(t, "a:b", m1.Tags[0], "First tag") - // The counter returns an array with a single tuple of timestamp,value - assert.Equal(t, float64(0.5), m1.Value[0][1], "Value") + assert.Equal(t, float64(5), m1.Value, "Value") // the median m6 := metrics[5] assert.Equal(t, "a.b.c.median", m6.Name, "Name") - assert.Equal(t, int32(0), m6.Interval, "Interval") - assert.Equal(t, "gauge", m6.MetricType, "Type") + assert.Equal(t, GaugeMetric, m6.Type, "Type") assert.Len(t, m6.Tags, 1, "Tag count") assert.Equal(t, "a:b", m6.Tags[0], "First tag") - // The counter returns an array with a single tuple of timestamp,value - assert.Equal(t, float64(15), m6.Value[0][1], "Value") + assert.Equal(t, float64(15), m6.Value, "Value") // the average m8 := metrics[6] assert.Equal(t, "a.b.c.hmean", m8.Name, "Name") - assert.Equal(t, int32(0), m8.Interval, "Interval") - assert.Equal(t, "gauge", m8.MetricType, "Type") + assert.Equal(t, GaugeMetric, m8.Type, "Type") assert.Len(t, m8.Tags, 1, "Tag count") assert.Equal(t, "a:b", m8.Tags[0], "First tag") - // The counter returns an array with a single tuple of timestamp,value expected := float64(5.0 / ((1.0 / 5) + (1.0 / 10) + (1.0 / 15) + (1.0 / 20) + (1.0 / 25))) - assert.Equal(t, expected, m8.Value[0][1], "Value") + assert.Equal(t, expected, m8.Value, "Value") // And the percentile m7 := metrics[7] assert.Equal(t, "a.b.c.90percentile", m7.Name, "Name") - assert.Equal(t, int32(0), m7.Interval, "Interval") - assert.Equal(t, "gauge", m7.MetricType, "Type") + assert.Equal(t, GaugeMetric, m7.Type, "Type") assert.Len(t, m7.Tags, 1, "Tag count") assert.Equal(t, "a:b", m7.Tags[0], "First tag") - // The counter returns an array with a single tuple of timestamp,value - assert.Equal(t, float64(23.75), m7.Value[0][1], "Value") + assert.Equal(t, float64(23.75), m7.Value, "Value") } func TestHistoAvgOnly(t *testing.T) { @@ -287,12 +262,10 @@ func TestHistoAvgOnly(t *testing.T) { // the average m5 := metrics[0] assert.Equal(t, "a.b.c.avg", m5.Name, "Name") - assert.Equal(t, int32(0), m5.Interval, "Interval") - assert.Equal(t, "gauge", m5.MetricType, "Type") + assert.Equal(t, GaugeMetric, m5.Type, "Type") assert.Len(t, m5.Tags, 1, "Tag count") assert.Equal(t, "a:b", m5.Tags[0], "First tag") - // The counter returns an array with a single tuple of timestamp,value - assert.Equal(t, float64(15), m5.Value[0][1], "Value") + assert.Equal(t, float64(15), m5.Value, "Value") } func TestHistoHMeanOnly(t *testing.T) { @@ -323,13 +296,11 @@ func TestHistoHMeanOnly(t *testing.T) { // the average m5 := metrics[0] assert.Equal(t, "a.b.c.hmean", m5.Name, "Name") - assert.Equal(t, int32(0), m5.Interval, "Interval") - assert.Equal(t, "gauge", m5.MetricType, "Type") + assert.Equal(t, GaugeMetric, m5.Type, "Type") assert.Len(t, m5.Tags, 1, "Tag count") assert.Equal(t, "a:b", m5.Tags[0], "First tag") - // The counter returns an array with a single tuple of timestamp,value expected := float64(5.0 / ((1.0 / 5) + (1.0 / 10) 
+ (1.0 / 15) + (1.0 / 20) + (1.0 / 25))) - assert.Equal(t, expected, m5.Value[0][1], "Value") + assert.Equal(t, expected, m5.Value, "Value") } func TestHistoSampleRate(t *testing.T) { @@ -355,11 +326,11 @@ func TestHistoSampleRate(t *testing.T) { // First the max m1 := metrics[0] assert.Equal(t, "a.b.c.max", m1.Name, "Max name") - assert.Equal(t, float64(25), m1.Value[0][1], "Sampled max as rate") + assert.Equal(t, float64(25), m1.Value, "Sampled max as rate") count := metrics[2] assert.Equal(t, "a.b.c.count", count.Name, "count name") - assert.Equal(t, float64(1), count.Value[0][1], "count value") + assert.Equal(t, float64(10), count.Value, "count value") } func TestHistoMerge(t *testing.T) { diff --git a/server.go b/server.go index c02472b0c..a925acff1 100644 --- a/server.go +++ b/server.go @@ -73,10 +73,7 @@ type Server struct { Tags []string TagsAsMap map[string]string - DDHostname string - DDAPIKey string - DDTraceAddress string - HTTPClient *http.Client + HTTPClient *http.Client HTTPAddr string @@ -98,7 +95,6 @@ type Server struct { shutdown chan struct{} HistogramPercentiles []float64 - FlushMaxPerBody int plugins []plugins.Plugin pluginMtx sync.Mutex @@ -107,7 +103,8 @@ type Server struct { HistogramAggregates samplers.HistogramAggregates - spanSinks []spanSink + spanSinks []spanSink + metricSinks []metricSink traceLightstepAccessToken string } @@ -128,9 +125,6 @@ func NewFromConfig(conf Config) (ret Server, err error) { } ret.TagsAsMap = mappedTags - ret.DDHostname = conf.DatadogAPIHostname - ret.DDAPIKey = conf.DatadogAPIKey - ret.DDTraceAddress = conf.DatadogTraceAPIAddress ret.traceLightstepAccessToken = conf.TraceLightstepAccessToken ret.HistogramPercentiles = conf.Percentiles if len(conf.Aggregates) == 0 { @@ -153,7 +147,6 @@ func NewFromConfig(conf Config) (ret Server, err error) { Timeout: ret.interval * 9 / 10, // we're fine with using the default transport and redirect behavior } - ret.FlushMaxPerBody = conf.FlushMaxPerBody ret.Statsd, err = statsd.NewBuffered(conf.StatsAddress, 1024) if err != nil { @@ -274,9 +267,14 @@ func NewFromConfig(conf Config) (ret Server, err error) { conf.TLSKey = REDACTED log.WithField("config", conf).Debug("Initialized server") - // Configure tracing sinks - if ret.tracingSinkEnabled() && len(conf.SsfListenAddresses) > 0 { + ddSink, err := NewDatadogMetricSink(&conf, ret.interval.Seconds(), ret.HTTPClient, ret.Statsd) + if err != nil { + return + } + ret.metricSinks = append(ret.metricSinks, ddSink) + // Configure tracing sinks + if len(conf.SsfListenAddresses) > 0 && (conf.DatadogTraceAPIAddress != "" || conf.TraceLightstepAccessToken != "") { // Set a sane default if conf.SsfBufferSize == 0 { conf.SsfBufferSize = defaultSpanBufferSize @@ -284,8 +282,8 @@ func NewFromConfig(conf Config) (ret Server, err error) { trace.Enable() - // configure Datadog as sink - if ret.DDTraceAddress != "" { + // configure Datadog as a Span sink + if conf.DatadogTraceAPIAddress != "" { ddSink, err := NewDatadogSpanSink(&conf, ret.Statsd, ret.HTTPClient, ret.TagsAsMap) if err != nil { @@ -296,7 +294,7 @@ func NewFromConfig(conf Config) (ret Server, err error) { log.Info("Configured Datadog trace sink") } - // configure Lightstep as Sink + // configure Lightstep as a Span Sink if ret.traceLightstepAccessToken != "" { var lsSink spanSink @@ -851,9 +849,3 @@ func (s *Server) TracingEnabled() bool { //TODO we now need to check that the backends are flushing the data too return s.SpanWorker != nil } - -// tracingSinkEnabled returns true if at least one -// tracing 
sink has been enabled -func (s *Server) tracingSinkEnabled() bool { - return s.DDTraceAddress != "" || s.traceLightstepAccessToken != "" -} diff --git a/server_test.go b/server_test.go index fb36a7d88..58e20369e 100644 --- a/server_test.go +++ b/server_test.go @@ -176,7 +176,7 @@ func setupVeneurServer(t *testing.T, config Config, transport http.RoundTripper) // for sending metrics data to Datadog // Eventually we'll want to define this symmetrically. type DDMetricsRequest struct { - Series []samplers.DDMetric + Series []DDMetric } // fixture sets up a mock Datadog API server and Veneur @@ -852,31 +852,6 @@ func TestHandleTCPGoroutineTimeout(t *testing.T) { } } -func TestNewFromServerConfigRenamedVariables(t *testing.T) { - // test the variables that have been renamed - config := Config{ - DatadogAPIKey: "apikey", - DatadogAPIHostname: "http://api", - DatadogTraceAPIAddress: "http://trace", - SsfListenAddresses: []string{"udp://127.0.0.1:99"}, - - // required or NewFromConfig fails - Interval: "10s", - StatsAddress: "localhost:62251", - } - s, err := NewFromConfig(config) - if err != nil { - t.Fatal(err) - } - - assert.Equal(t, "apikey", s.DDAPIKey) - assert.Equal(t, "http://api", s.DDHostname) - assert.Equal(t, "http://trace", s.DDTraceAddress) - addr := s.SSFListenAddrs[0].(*net.UDPAddr) - assert.True(t, addr.IP.IsLoopback(), "TraceAddr should be loopback") - assert.Equal(t, 99, addr.Port) -} - // This is necessary until we can import // github.com/sirupsen/logrus/test - it's currently failing due to dep // insisting on pulling the repo in with its capitalized name.
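The `metricSink` introduced in the CHANGELOG entry at the top of this diff is never defined in these hunks; only its call sites appear (`s.metricSinks[0].Flush(span.Attach(ctx), finalMetrics)` and `FlushEventsChecks(...)` in flusher.go, plus the `datadogMetricSink` methods in metric_sink.go). The sketch below restates the surface this changeset converges on: `InterMetric` as the backend-agnostic intermediary value and the context-aware `Plugin.Flush` signature from plugins/plugins.go. The local type declarations and the `printPlugin` implementation are stand-ins invented for illustration (the real types live in the samplers and plugins packages), so treat this as a compiling sketch of the shape, not code copied from the repository.

```go
package main

import (
	"context"
	"fmt"
)

// MetricType mirrors samplers.MetricType, which this diff trims down to
// counters and gauges.
type MetricType int

const (
	CounterMetric MetricType = iota
	GaugeMetric
)

// InterMetric mirrors the intermediary struct that plugins and sinks now
// receive instead of the Datadog-specific DDMetric: a single value with its
// own timestamp rather than a [1][2]float64 points array.
type InterMetric struct {
	Name      string
	Timestamp int64
	Value     float64
	Tags      []string
	Type      MetricType
}

// Plugin matches the updated interface in plugins/plugins.go: Flush now takes
// a context and InterMetrics, and the hostname argument is gone (hostname and
// interval become fields on the plugins that still need them).
type Plugin interface {
	Flush(ctx context.Context, metrics []InterMetric) error
	Name() string
}

// printPlugin is a toy implementation, analogous to dummyPlugin in
// plugin_test.go; it exists only to show the new call shape.
type printPlugin struct{}

func (p *printPlugin) Flush(ctx context.Context, metrics []InterMetric) error {
	for _, m := range metrics {
		fmt.Printf("%s=%v @%d tags=%v\n", m.Name, m.Value, m.Timestamp, m.Tags)
	}
	return nil
}

func (p *printPlugin) Name() string { return "print" }

func main() {
	var p Plugin = &printPlugin{}
	err := p.Flush(context.Background(), []InterMetric{{
		Name:      "foo.bar.baz",
		Timestamp: 1476119058,
		Value:     10,
		Tags:      []string{"gorch:frobble"},
		Type:      CounterMetric,
	}})
	if err != nil {
		fmt.Println("flush failed:", err)
	}
}
```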
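A behavioral detail worth spelling out: `Counter.Flush` now reports the raw count as a `CounterMetric` (see samplers_test.go, where `TestCounterRate` expects 5 instead of 0.5), and each consumer converts to a rate itself — `datadogMetricSink.finalizeMetrics` and `EncodeInterMetricCSV` both divide by the flush interval. The snippet below only restates that arithmetic with plain strings in place of `samplers.MetricType`; `toDatadogValue` is a hypothetical helper, not a function from the repository. The numbers match `TestDatadogRate`: a counter value of 10 over a 10-second interval yields a rate of 1.

```go
package main

import "fmt"

// toDatadogValue restates the conversion added to
// datadogMetricSink.finalizeMetrics and EncodeInterMetricCSV: counters become
// rates (value divided by the flush interval), gauges pass through unchanged,
// and any other type is rejected because only counters and gauges remain.
func toDatadogValue(metricType string, value, interval float64) (string, float64, error) {
	switch metricType {
	case "counter":
		return "rate", value / interval, nil
	case "gauge":
		return "gauge", value, nil
	default:
		return "", 0, fmt.Errorf("unknown metric type %q", metricType)
	}
}

func main() {
	ddType, ddValue, _ := toDatadogValue("counter", 10, 10)
	fmt.Println(ddType, ddValue) // rate 1 — the same numbers TestDatadogRate asserts
}
```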
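The TSV layout written by the s3 and localfile plugins shrinks along with the metric struct: the `Hostname` and `DeviceName` columns disappear, leaving the eight fields asserted by `TestEncodeDDMetricsCSV` (`r.FieldsPerRecord = 8`). The short program below just prints that expected header with `encoding/csv`, using the same tab delimiter the plugins use; it is a reading aid for the schema change, not code from either plugin.

```go
package main

import (
	"encoding/csv"
	"os"
)

func main() {
	w := csv.NewWriter(os.Stdout)
	w.Comma = '\t' // the Delimiter constant shared by the localfile and s3 plugins

	// Column order after this diff, matching ExpectedHeader in s3_test.go:
	// Hostname and DeviceName are gone; VeneurHostname and Interval remain.
	w.Write([]string{
		"Name", "Tags", "MetricType", "VeneurHostname",
		"Interval", "Timestamp", "Value", "Partition",
	})
	w.Flush()
}
```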