Move Datadog to a plugin #261

Merged
13 commits merged on Oct 10, 2017
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -8,6 +8,9 @@
* Our super spiffy logo, designed by [mercedes](https://github.com/mercedes-stripe), is now included at the top of the README!
* Refactor internals to use a new intermediary metric struct to facilitate new plugins. Thanks [gphat](https://github.com/gphat)!

## Improvements
* Introduced a new `metricSink` which provides a common interface for metric backends. In an upcoming release all plugins will be converted to this interface. Thanks [gphat](https://github.com/gphat)!
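
For orientation, here is a rough sketch of the `metricSink` interface implied by the call sites in this diff. The method set, signatures, and the service-check type are inferences, not taken verbatim from the PR:

```go
// Sketch only: inferred from s.metricSinks[0].Flush(...) and
// s.metricSinks[0].FlushEventsChecks(...) below; exact signatures may differ.
type metricSink interface {
	// Flush sends a batch of InterMetrics to the backend.
	Flush(ctx context.Context, metrics []samplers.InterMetric) error
	// FlushEventsChecks sends events and service checks to the backend.
	// The checks element type is assumed; it is not shown in this diff.
	FlushEventsChecks(ctx context.Context, events []samplers.UDPEvent, checks []samplers.UDPServiceCheck)
}
```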

## Deprecations
* `veneur-emit` no longer supports the `-config` argument, as it's no longer possible to reliably detect which statsd host/port to connect to. The `-hostport` option now takes a URL of the same form `statsd_listen_addresses` takes to explicitly tell it what address it should send to.

Binary file modified fixtures/aws/PutObject/2016/10/13/1476370612.tsv.gz
Binary file not shown.
Binary file modified fixtures/aws/PutObject/2016/10/14/1476481302.tsv.gz
Binary file not shown.
181 changes: 29 additions & 152 deletions flusher.go
@@ -10,9 +10,6 @@ import (
"net"
"net/http"
"net/url"
"runtime"
"strings"
"sync"
"time"

"github.com/DataDog/datadog-go/statsd"
@@ -44,8 +41,12 @@ func (s *Server) FlushGlobal(ctx context.Context) {
span, _ := trace.StartSpanFromContext(ctx, "")
defer span.Finish()

go s.flushEventsChecks(span.Attach(ctx)) // we can do all of this separately
go s.flushTraces(span.Attach(ctx)) // this too!
events, checks := s.EventWorker.Flush()
s.Statsd.Count("worker.events_flushed_total", int64(len(events)), nil, 1.0)
s.Statsd.Count("worker.checks_flushed_total", int64(len(checks)), nil, 1.0)

go s.metricSinks[0].FlushEventsChecks(span.Attach(ctx), events, checks) // we can do all of this separately
go s.flushTraces(span.Attach(ctx)) // this too!

percentiles := s.HistogramPercentiles

@@ -56,7 +57,7 @@ func (s *Server) FlushGlobal(ctx context.Context) {
ms.totalLength += ms.totalSets
ms.totalLength += ms.totalGlobalCounters

finalMetrics := s.generateDDMetrics(span.Attach(ctx), percentiles, tempMetrics, ms)
finalMetrics := s.generateInterMetrics(span.Attach(ctx), percentiles, tempMetrics, ms)

s.reportMetricsFlushCounts(ms)

@@ -65,7 +66,7 @@ func (s *Server) FlushGlobal(ctx context.Context) {
go func() {
for _, p := range s.getPlugins() {
start := time.Now()
err := p.Flush(finalMetrics, s.Hostname)
err := p.Flush(span.Attach(ctx), finalMetrics)
s.Statsd.TimeInMilliseconds(fmt.Sprintf("flush.plugins.%s.total_duration_ns", p.Name()), float64(time.Since(start).Nanoseconds()), []string{"part:post"}, 1.0)
if err != nil {
countName := fmt.Sprintf("flush.plugins.%s.error_total", p.Name())
@@ -75,7 +76,8 @@ func (s *Server) FlushGlobal(ctx context.Context) {
}
}()

s.flushRemote(span.Attach(ctx), finalMetrics)
// TODO Don't hardcode this
s.metricSinks[0].Flush(span.Attach(ctx), finalMetrics)
}

// FlushLocal takes the slices of metrics, combines them and marshals them to json
@@ -84,16 +86,19 @@ func (s *Server) FlushLocal(ctx context.Context) {
span, _ := trace.StartSpanFromContext(ctx, "")
defer span.Finish()

go s.flushEventsChecks(span.Attach(ctx)) // we can do all of this separately
go s.flushTraces(span.Attach(ctx)) // this too!
events, checks := s.EventWorker.Flush()
s.Statsd.Count("worker.checks_flushed_total", int64(len(checks)), nil, 1.0)

go s.metricSinks[0].FlushEventsChecks(span.Attach(ctx), events, checks) // we can do all of this separately
go s.flushTraces(span.Attach(ctx))
Contributor: feels like we could provide a method to cancel those goroutines somehow, passing in a context?

Contributor (author): You think it's ok to defer this work until it's no longer a hardcoded s.metricSinks[0]? My next move is to convert plugins into this interface and make this agnostic to backends. That feels like a good time to fix the cancellation.

Contributor: That sounds good to me!
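As an illustration of the cancellation idea discussed in the thread above, here is a minimal, self-contained sketch (hypothetical names; none of this code is from the PR): the flush goroutines take a cancellable context and abandon their work when it is cancelled, for example from the server's shutdown path.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// flushOnce stands in for a sink flush; it watches ctx so that a shutdown
// can interrupt an in-flight flush. (Hypothetical example, not PR code.)
func flushOnce(ctx context.Context, name string) {
	select {
	case <-ctx.Done():
		fmt.Println(name, "flush cancelled:", ctx.Err())
	case <-time.After(2 * time.Second): // simulated network round trip
		fmt.Println(name, "flush completed")
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	go flushOnce(ctx, "events/checks")
	go flushOnce(ctx, "traces")

	time.Sleep(500 * time.Millisecond)
	cancel() // e.g. called from the server's shutdown path
	time.Sleep(100 * time.Millisecond) // let the goroutines report
}
```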

// don't publish percentiles if we're a local veneur; that's the global
// veneur's job
var percentiles []float64

tempMetrics, ms := s.tallyMetrics(percentiles)

finalMetrics := s.generateDDMetrics(span.Attach(ctx), percentiles, tempMetrics, ms)
finalMetrics := s.generateInterMetrics(span.Attach(ctx), percentiles, tempMetrics, ms)

s.reportMetricsFlushCounts(ms)

@@ -103,10 +108,15 @@ func (s *Server) FlushLocal(ctx context.Context) {
// since not everything in tempMetrics is safe for sharing
go s.flushForward(span.Attach(ctx), tempMetrics)

// If there's nothing to flush, don't bother calling the plugins and stuff.
if len(finalMetrics) == 0 {
return
}

go func() {
for _, p := range s.getPlugins() {
start := time.Now()
err := p.Flush(finalMetrics, s.Hostname)
err := p.Flush(span.Attach(ctx), finalMetrics)
s.Statsd.TimeInMilliseconds(fmt.Sprintf("flush.plugins.%s.total_duration_ns", p.Name()), float64(time.Since(start).Nanoseconds()), []string{"part:post"}, 1.0)
if err != nil {
countName := fmt.Sprintf("flush.plugins.%s.error_total", p.Name())
@@ -116,7 +126,8 @@ func (s *Server) FlushLocal(ctx context.Context) {
}
}()

s.flushRemote(span.Attach(ctx), finalMetrics)
// TODO Don't hardcode this
s.metricSinks[0].Flush(span.Attach(ctx), finalMetrics)
}

type metricsSummary struct {
@@ -141,7 +152,7 @@ type metricsSummary struct {
// for performance
func (s *Server) tallyMetrics(percentiles []float64) ([]WorkerMetrics, metricsSummary) {
// allocating this long array to count up the sizes is cheaper than appending
// the []DDMetrics together one at a time
// the []WorkerMetrics together one at a time
tempMetrics := make([]WorkerMetrics, 0, len(s.Workers))

gatherStart := time.Now()
@@ -180,15 +191,15 @@ func (s *Server) tallyMetrics(percentiles []float64) ([]WorkerMetrics, metricsSu
return tempMetrics, ms
}

// generateDDMetrics calls the Flush method on each
// generateInterMetrics calls the Flush method on each
// counter/gauge/histogram/timer/set in order to
// generate a DDMetric corresponding to that value
func (s *Server) generateDDMetrics(ctx context.Context, percentiles []float64, tempMetrics []WorkerMetrics, ms metricsSummary) []samplers.DDMetric {
// generate an InterMetric corresponding to that value
func (s *Server) generateInterMetrics(ctx context.Context, percentiles []float64, tempMetrics []WorkerMetrics, ms metricsSummary) []samplers.InterMetric {

span, _ := trace.StartSpanFromContext(ctx, "")
defer span.Finish()

finalMetrics := make([]samplers.DDMetric, 0, ms.totalLength)
finalMetrics := make([]samplers.InterMetric, 0, ms.totalLength)
for _, wm := range tempMetrics {
for _, c := range wm.counters {
finalMetrics = append(finalMetrics, c.Flush(s.interval)...)
@@ -237,7 +248,6 @@ func (s *Server) generateDDMetrics(ctx context.Context, percentiles []float64, t
}
}

finalizeMetrics(s.Hostname, s.Tags, finalMetrics)
s.Statsd.TimeInMilliseconds("flush.total_duration_ns", float64(time.Since(span.Start).Nanoseconds()), []string{"part:combine"}, 1.0)

return finalMetrics
@@ -274,83 +284,6 @@ func (s *Server) reportGlobalMetricsFlushCounts(ms metricsSummary) {
s.Statsd.Count("worker.metrics_flushed_total", int64(ms.totalTimers), []string{"metric_type:timer"}, 1.0)
}

// flushRemote breaks up the final metrics into chunks
// (to avoid hitting the size cap) and POSTs them to the remote API
func (s *Server) flushRemote(ctx context.Context, finalMetrics []samplers.DDMetric) {
span, _ := trace.StartSpanFromContext(ctx, "")
defer span.Finish()

mem := &runtime.MemStats{}
runtime.ReadMemStats(mem)

s.Statsd.Gauge("mem.heap_alloc_bytes", float64(mem.HeapAlloc), nil, 1.0)
s.Statsd.Gauge("gc.number", float64(mem.NumGC), nil, 1.0)
s.Statsd.Gauge("gc.pause_total_ns", float64(mem.PauseTotalNs), nil, 1.0)

s.Statsd.Gauge("flush.post_metrics_total", float64(len(finalMetrics)), nil, 1.0)
// Check to see if we have anything to do
if len(finalMetrics) == 0 {
log.Info("Nothing to flush, skipping.")
return
}

// break the metrics into chunks of approximately equal size, such that
// each chunk is less than the limit
// we compute the chunks using rounding-up integer division
workers := ((len(finalMetrics) - 1) / s.FlushMaxPerBody) + 1
chunkSize := ((len(finalMetrics) - 1) / workers) + 1
log.WithField("workers", workers).Debug("Worker count chosen")
log.WithField("chunkSize", chunkSize).Debug("Chunk size chosen")
var wg sync.WaitGroup
flushStart := time.Now()
for i := 0; i < workers; i++ {
chunk := finalMetrics[i*chunkSize:]
if i < workers-1 {
// trim to chunk size unless this is the last one
chunk = chunk[:chunkSize]
}
wg.Add(1)
go s.flushPart(span.Attach(ctx), chunk, &wg)
}
wg.Wait()
s.Statsd.TimeInMilliseconds("flush.total_duration_ns", float64(time.Since(flushStart).Nanoseconds()), []string{"part:post"}, 1.0)

log.WithField("metrics", len(finalMetrics)).Info("Completed flush to Datadog")
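For reference, the removed chunking logic above relies on rounding-up integer division; a small standalone example with assumed numbers (10,250 metrics and a FlushMaxPerBody of 5,000) shows how the chunk count and size come out:

```go
package main

import "fmt"

func main() {
	// Assumed inputs, for illustration only.
	total, maxPerBody := 10250, 5000

	// Rounding-up integer division, as in the removed flushRemote.
	workers := ((total - 1) / maxPerBody) + 1 // 3 workers
	chunkSize := ((total - 1) / workers) + 1  // 3417 metrics per chunk

	fmt.Println(workers, chunkSize) // 3 3417 -> chunks of 3417, 3417, 3416
}
```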
}

func finalizeMetrics(hostname string, tags []string, finalMetrics []samplers.DDMetric) {
for i := range finalMetrics {
// Let's look for "magic tags" that override metric fields host and device.
for j, tag := range finalMetrics[i].Tags {
// This overrides hostname
if strings.HasPrefix(tag, "host:") {
// delete the tag from the list
finalMetrics[i].Tags = append(finalMetrics[i].Tags[:j], finalMetrics[i].Tags[j+1:]...)
// Override the hostname with the tag, trimming off the prefix
finalMetrics[i].Hostname = tag[5:]
} else if strings.HasPrefix(tag, "device:") {
// Same as above, but device this time
finalMetrics[i].Tags = append(finalMetrics[i].Tags[:j], finalMetrics[i].Tags[j+1:]...)
finalMetrics[i].DeviceName = tag[7:]
}
}
if finalMetrics[i].Hostname == "" {
// No magic tag, set the hostname
finalMetrics[i].Hostname = hostname
}

finalMetrics[i].Tags = append(finalMetrics[i].Tags, tags...)
}
}
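To make the removed magic-tag behaviour concrete, here is a standalone sketch (simplified; not the PR's code): a host: or device: tag is dropped from the tag list and its value overrides the corresponding metric field.

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	tags := []string{"env:prod", "host:web-01", "device:sda1"}
	hostname, device := "default-host", ""

	kept := make([]string, 0, len(tags))
	for _, tag := range tags {
		switch {
		case strings.HasPrefix(tag, "host:"):
			hostname = strings.TrimPrefix(tag, "host:") // override hostname, drop tag
		case strings.HasPrefix(tag, "device:"):
			device = strings.TrimPrefix(tag, "device:") // override device name, drop tag
		default:
			kept = append(kept, tag)
		}
	}
	fmt.Println(hostname, device, kept) // web-01 sda1 [env:prod]
}
```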

// flushPart flushes a set of metrics to the remote API server
func (s *Server) flushPart(ctx context.Context, metricSlice []samplers.DDMetric, wg *sync.WaitGroup) {
defer wg.Done()
postHelper(ctx, s.HTTPClient, s.Statsd, fmt.Sprintf("%s/api/v1/series?api_key=%s", s.DDHostname, s.DDAPIKey), map[string][]samplers.DDMetric{
"series": metricSlice,
}, "flush", true)
}

func (s *Server) flushForward(ctx context.Context, wms []WorkerMetrics) {
span, _ := trace.StartSpanFromContext(ctx, "")
defer span.Finish()
@@ -480,62 +413,6 @@ func (s *Server) flushTraces(ctx context.Context) {
s.SpanWorker.Flush()
}

func (s *Server) flushEventsChecks(ctx context.Context) {
span, _ := trace.StartSpanFromContext(ctx, "")
defer span.Finish()

events, checks := s.EventWorker.Flush()
s.Statsd.Count("worker.events_flushed_total", int64(len(events)), nil, 1.0)
s.Statsd.Count("worker.checks_flushed_total", int64(len(checks)), nil, 1.0)

// fill in the default hostname for packets that didn't set it
for i := range events {
if events[i].Hostname == "" {
events[i].Hostname = s.Hostname
}
events[i].Tags = append(events[i].Tags, s.Tags...)
}
for i := range checks {
if checks[i].Hostname == "" {
checks[i].Hostname = s.Hostname
}
checks[i].Tags = append(checks[i].Tags, s.Tags...)
}

if len(events) != 0 {
// this endpoint is not documented at all, its existence is only known from
// the official dd-agent
// we don't actually pass all the body keys that dd-agent passes here... but
// it still works
err := postHelper(context.TODO(), s.HTTPClient, s.Statsd, fmt.Sprintf("%s/intake?api_key=%s", s.DDHostname, s.DDAPIKey), map[string]map[string][]samplers.UDPEvent{
"events": {
"api": events,
},
}, "flush_events", true)
if err == nil {
log.WithField("events", len(events)).Info("Completed flushing events to Datadog")
} else {
log.WithFields(logrus.Fields{
"events": len(events),
logrus.ErrorKey: err}).Warn("Error flushing events to Datadog")
}
}

if len(checks) != 0 {
// this endpoint is not documented to take an array... but it does
// another curious constraint of this endpoint is that it does not
// support "Content-Encoding: deflate"
err := postHelper(context.TODO(), s.HTTPClient, s.Statsd, fmt.Sprintf("%s/api/v1/check_run?api_key=%s", s.DDHostname, s.DDAPIKey), checks, "flush_checks", false)
if err == nil {
log.WithField("checks", len(checks)).Info("Completed flushing service checks to Datadog")
} else {
log.WithFields(logrus.Fields{
"checks": len(checks),
logrus.ErrorKey: err}).Warn("Error flushing checks to Datadog")
}
}
}

// shared code for POSTing to an endpoint, that consumes JSON, that is zlib-
// compressed, that returns 202 on success, that has a small response
// action is a string used for statsd metric names and log messages emitted from