Merge pull request #261 from stripe/cory-more-dd-plugification
Move Datadog to a plugin
cory-stripe authored Oct 10, 2017
2 parents 3a2e442 + 41c6c5f commit 44fff07
Showing 20 changed files with 369 additions and 541 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -8,6 +8,9 @@
* Our super spiffy logo, designed by [mercedes](https://github.com/mercedes-stripe), is now included at the top of the README!
* Refactor internals to use a new intermediary metric struct to facilitate new plugins. Thanks [gphat](https://github.com/gphat)!

## Improvements
* Introduced a new `metricSink` which provides a common interface for metric backends. In an upcoming release all plugins will be converted to this interface. Thanks [gphat](https://github.com/gphat)!

## Deprecations
* `veneur-emit` no longer supports the `-config` argument, as it's no longer possible to reliably detect which statsd host/port to connect to. The `-hostport` option now takes a URL of the same form `statsd_listen_addresses` takes to explicitly tell it what address it should send to.

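The `metricSink` interface mentioned in the CHANGELOG entry above is not itself part of this diff. Below is a minimal Go sketch of what such an interface could look like, inferred only from the call sites visible later in flusher.go (`s.metricSinks[0].Flush(...)` and `s.metricSinks[0].FlushEventsChecks(...)`); the package name, the `Name` method, the service-check type, and the exact signatures are assumptions rather than the definition from this commit.

```go
// Hypothetical sketch of a metric sink interface; names and signatures are
// inferred from call sites in this diff and are not the actual definition.
package sinks

import (
	"context"

	"github.com/stripe/veneur/samplers"
)

// MetricSink is the shape a metric backend (such as the new Datadog plugin)
// might implement so the server can flush without backend-specific code.
type MetricSink interface {
	// Name identifies the sink in metric names and log output (assumed).
	Name() string
	// Flush sends one interval's worth of aggregated metrics to the backend.
	Flush(ctx context.Context, metrics []samplers.InterMetric) error
	// FlushEventsChecks forwards events and service checks. The check type
	// samplers.UDPServiceCheck is an assumption; only UDPEvent appears below.
	FlushEventsChecks(ctx context.Context, events []samplers.UDPEvent, checks []samplers.UDPServiceCheck)
}
```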
Binary file modified fixtures/aws/PutObject/2016/10/13/1476370612.tsv.gz
Binary file not shown.
Binary file modified fixtures/aws/PutObject/2016/10/14/1476481302.tsv.gz
Binary file not shown.
181 changes: 29 additions & 152 deletions flusher.go
@@ -10,9 +10,6 @@ import (
"net"
"net/http"
"net/url"
"runtime"
"strings"
"sync"
"time"

"github.com/DataDog/datadog-go/statsd"
@@ -44,8 +41,12 @@ func (s *Server) FlushGlobal(ctx context.Context) {
span, _ := trace.StartSpanFromContext(ctx, "")
defer span.Finish()

go s.flushEventsChecks(span.Attach(ctx)) // we can do all of this separately
go s.flushTraces(span.Attach(ctx)) // this too!
events, checks := s.EventWorker.Flush()
s.Statsd.Count("worker.events_flushed_total", int64(len(events)), nil, 1.0)
s.Statsd.Count("worker.checks_flushed_total", int64(len(checks)), nil, 1.0)

go s.metricSinks[0].FlushEventsChecks(span.Attach(ctx), events, checks) // we can do all of this separately
go s.flushTraces(span.Attach(ctx)) // this too!

percentiles := s.HistogramPercentiles

@@ -56,7 +57,7 @@ func (s *Server) FlushGlobal(ctx context.Context) {
ms.totalLength += ms.totalSets
ms.totalLength += ms.totalGlobalCounters

finalMetrics := s.generateDDMetrics(span.Attach(ctx), percentiles, tempMetrics, ms)
finalMetrics := s.generateInterMetrics(span.Attach(ctx), percentiles, tempMetrics, ms)

s.reportMetricsFlushCounts(ms)

@@ -65,7 +66,7 @@ func (s *Server) FlushGlobal(ctx context.Context) {
go func() {
for _, p := range s.getPlugins() {
start := time.Now()
err := p.Flush(finalMetrics, s.Hostname)
err := p.Flush(span.Attach(ctx), finalMetrics)
s.Statsd.TimeInMilliseconds(fmt.Sprintf("flush.plugins.%s.total_duration_ns", p.Name()), float64(time.Since(start).Nanoseconds()), []string{"part:post"}, 1.0)
if err != nil {
countName := fmt.Sprintf("flush.plugins.%s.error_total", p.Name())
@@ -75,7 +76,8 @@ func (s *Server) FlushGlobal(ctx context.Context) {
}
}()

s.flushRemote(span.Attach(ctx), finalMetrics)
// TODO Don't hardcode this
s.metricSinks[0].Flush(span.Attach(ctx), finalMetrics)
}

// FlushLocal takes the slices of metrics, combines them and marshals them to json
@@ -84,16 +86,19 @@ func (s *Server) FlushLocal(ctx context.Context) {
span, _ := trace.StartSpanFromContext(ctx, "")
defer span.Finish()

go s.flushEventsChecks(span.Attach(ctx)) // we can do all of this separately
go s.flushTraces(span.Attach(ctx)) // this too!
events, checks := s.EventWorker.Flush()
s.Statsd.Count("worker.checks_flushed_total", int64(len(checks)), nil, 1.0)

go s.metricSinks[0].FlushEventsChecks(span.Attach(ctx), events, checks) // we can do all of this separately
go s.flushTraces(span.Attach(ctx))

// don't publish percentiles if we're a local veneur; that's the global
// veneur's job
var percentiles []float64

tempMetrics, ms := s.tallyMetrics(percentiles)

finalMetrics := s.generateDDMetrics(span.Attach(ctx), percentiles, tempMetrics, ms)
finalMetrics := s.generateInterMetrics(span.Attach(ctx), percentiles, tempMetrics, ms)

s.reportMetricsFlushCounts(ms)

@@ -103,10 +108,15 @@ func (s *Server) FlushLocal(ctx context.Context) {
// since not everything in tempMetrics is safe for sharing
go s.flushForward(span.Attach(ctx), tempMetrics)

// If there's nothing to flush, don't bother calling the plugins and stuff.
if len(finalMetrics) == 0 {
return
}

go func() {
for _, p := range s.getPlugins() {
start := time.Now()
err := p.Flush(finalMetrics, s.Hostname)
err := p.Flush(span.Attach(ctx), finalMetrics)
s.Statsd.TimeInMilliseconds(fmt.Sprintf("flush.plugins.%s.total_duration_ns", p.Name()), float64(time.Since(start).Nanoseconds()), []string{"part:post"}, 1.0)
if err != nil {
countName := fmt.Sprintf("flush.plugins.%s.error_total", p.Name())
@@ -116,7 +126,8 @@ func (s *Server) FlushLocal(ctx context.Context) {
}
}()

s.flushRemote(span.Attach(ctx), finalMetrics)
// TODO Don't hardcode this
s.metricSinks[0].Flush(span.Attach(ctx), finalMetrics)
}
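The plugin loops above now call `p.Flush(span.Attach(ctx), finalMetrics)` instead of the old `p.Flush(finalMetrics, s.Hostname)`. A minimal sketch of the plugin interface that the new call implies is below; the package name and exact definition are assumptions, since the `plugins` package itself is not shown in this diff.

```go
// Hypothetical sketch of the plugin interface implied by the call
// p.Flush(span.Attach(ctx), finalMetrics); not the actual definition.
package plugins

import (
	"context"

	"github.com/stripe/veneur/samplers"
)

type Plugin interface {
	// Flush receives the fully aggregated metrics for this flush interval.
	// The context carries the flush trace span attached by the server.
	Flush(ctx context.Context, metrics []samplers.InterMetric) error
	// Name is used to build per-plugin statsd metric names such as
	// "flush.plugins.<name>.total_duration_ns" and error counters.
	Name() string
}
```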

type metricsSummary struct {
@@ -141,7 +152,7 @@ type metricsSummary struct {
// for performance
func (s *Server) tallyMetrics(percentiles []float64) ([]WorkerMetrics, metricsSummary) {
// allocating this long array to count up the sizes is cheaper than appending
// the []DDMetrics together one at a time
// the []WorkerMetrics together one at a time
tempMetrics := make([]WorkerMetrics, 0, len(s.Workers))

gatherStart := time.Now()
@@ -180,15 +191,15 @@ func (s *Server) tallyMetrics(percentiles []float64) ([]WorkerMetrics, metricsSu
return tempMetrics, ms
}

// generateDDMetrics calls the Flush method on each
// generateInterMetrics calls the Flush method on each
// counter/gauge/histogram/timer/set in order to
// generate a DDMetric corresponding to that value
func (s *Server) generateDDMetrics(ctx context.Context, percentiles []float64, tempMetrics []WorkerMetrics, ms metricsSummary) []samplers.DDMetric {
// generate an InterMetric corresponding to that value
func (s *Server) generateInterMetrics(ctx context.Context, percentiles []float64, tempMetrics []WorkerMetrics, ms metricsSummary) []samplers.InterMetric {

span, _ := trace.StartSpanFromContext(ctx, "")
defer span.Finish()

finalMetrics := make([]samplers.DDMetric, 0, ms.totalLength)
finalMetrics := make([]samplers.InterMetric, 0, ms.totalLength)
for _, wm := range tempMetrics {
for _, c := range wm.counters {
finalMetrics = append(finalMetrics, c.Flush(s.interval)...)
@@ -237,7 +248,6 @@ func (s *Server) generateDDMetrics(ctx context.Context, percentiles []float64, t
}
}

finalizeMetrics(s.Hostname, s.Tags, finalMetrics)
s.Statsd.TimeInMilliseconds("flush.total_duration_ns", float64(time.Since(span.Start).Nanoseconds()), []string{"part:combine"}, 1.0)

return finalMetrics
@@ -274,83 +284,6 @@ func (s *Server) reportGlobalMetricsFlushCounts(ms metricsSummary) {
s.Statsd.Count("worker.metrics_flushed_total", int64(ms.totalTimers), []string{"metric_type:timer"}, 1.0)
}

// flushRemote breaks up the final metrics into chunks
// (to avoid hitting the size cap) and POSTs them to the remote API
func (s *Server) flushRemote(ctx context.Context, finalMetrics []samplers.DDMetric) {
span, _ := trace.StartSpanFromContext(ctx, "")
defer span.Finish()

mem := &runtime.MemStats{}
runtime.ReadMemStats(mem)

s.Statsd.Gauge("mem.heap_alloc_bytes", float64(mem.HeapAlloc), nil, 1.0)
s.Statsd.Gauge("gc.number", float64(mem.NumGC), nil, 1.0)
s.Statsd.Gauge("gc.pause_total_ns", float64(mem.PauseTotalNs), nil, 1.0)

s.Statsd.Gauge("flush.post_metrics_total", float64(len(finalMetrics)), nil, 1.0)
// Check to see if we have anything to do
if len(finalMetrics) == 0 {
log.Info("Nothing to flush, skipping.")
return
}

// break the metrics into chunks of approximately equal size, such that
// each chunk is less than the limit
// we compute the chunks using rounding-up integer division
workers := ((len(finalMetrics) - 1) / s.FlushMaxPerBody) + 1
chunkSize := ((len(finalMetrics) - 1) / workers) + 1
log.WithField("workers", workers).Debug("Worker count chosen")
log.WithField("chunkSize", chunkSize).Debug("Chunk size chosen")
var wg sync.WaitGroup
flushStart := time.Now()
for i := 0; i < workers; i++ {
chunk := finalMetrics[i*chunkSize:]
if i < workers-1 {
// trim to chunk size unless this is the last one
chunk = chunk[:chunkSize]
}
wg.Add(1)
go s.flushPart(span.Attach(ctx), chunk, &wg)
}
wg.Wait()
s.Statsd.TimeInMilliseconds("flush.total_duration_ns", float64(time.Since(flushStart).Nanoseconds()), []string{"part:post"}, 1.0)

log.WithField("metrics", len(finalMetrics)).Info("Completed flush to Datadog")
}
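The removed `flushRemote` above splits the final metrics into roughly equal chunks with rounding-up integer division so that no chunk exceeds `FlushMaxPerBody`. As a worked example: with 2,500 metrics and a limit of 1,000, `workers = ((2500-1)/1000)+1 = 3` and `chunkSize = ((2500-1)/3)+1 = 834`, so the three POSTs carry 834, 834, and 832 metrics. The standalone sketch below (not code from this commit) reproduces just that arithmetic.

```go
// Standalone illustration of the rounding-up chunking used by the removed
// flushRemote: every chunk is at most chunkSize, and chunkSize <= maxPerBody.
package main

import "fmt"

func chunkCounts(total, maxPerBody int) []int {
	if total == 0 {
		return nil
	}
	workers := ((total - 1) / maxPerBody) + 1 // ceil(total / maxPerBody)
	chunkSize := ((total - 1) / workers) + 1  // ceil(total / workers)
	counts := make([]int, 0, workers)
	for i := 0; i < workers; i++ {
		n := chunkSize
		if remaining := total - i*chunkSize; remaining < chunkSize {
			n = remaining // the last chunk picks up the remainder
		}
		counts = append(counts, n)
	}
	return counts
}

func main() {
	fmt.Println(chunkCounts(2500, 1000)) // [834 834 832]
}
```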

func finalizeMetrics(hostname string, tags []string, finalMetrics []samplers.DDMetric) {
for i := range finalMetrics {
// Let's look for "magic tags" that override metric fields host and device.
for j, tag := range finalMetrics[i].Tags {
// This overrides hostname
if strings.HasPrefix(tag, "host:") {
// delete the tag from the list
finalMetrics[i].Tags = append(finalMetrics[i].Tags[:j], finalMetrics[i].Tags[j+1:]...)
// Override the hostname with the tag, trimming off the prefix
finalMetrics[i].Hostname = tag[5:]
} else if strings.HasPrefix(tag, "device:") {
// Same as above, but device this time
finalMetrics[i].Tags = append(finalMetrics[i].Tags[:j], finalMetrics[i].Tags[j+1:]...)
finalMetrics[i].DeviceName = tag[7:]
}
}
if finalMetrics[i].Hostname == "" {
// No magic tag, set the hostname
finalMetrics[i].Hostname = hostname
}

finalMetrics[i].Tags = append(finalMetrics[i].Tags, tags...)
}
}

// flushPart flushes a set of metrics to the remote API server
func (s *Server) flushPart(ctx context.Context, metricSlice []samplers.DDMetric, wg *sync.WaitGroup) {
defer wg.Done()
postHelper(ctx, s.HTTPClient, s.Statsd, fmt.Sprintf("%s/api/v1/series?api_key=%s", s.DDHostname, s.DDAPIKey), map[string][]samplers.DDMetric{
"series": metricSlice,
}, "flush", true)
}

func (s *Server) flushForward(ctx context.Context, wms []WorkerMetrics) {
span, _ := trace.StartSpanFromContext(ctx, "")
defer span.Finish()
@@ -480,62 +413,6 @@ func (s *Server) flushTraces(ctx context.Context) {
s.SpanWorker.Flush()
}

func (s *Server) flushEventsChecks(ctx context.Context) {
span, _ := trace.StartSpanFromContext(ctx, "")
defer span.Finish()

events, checks := s.EventWorker.Flush()
s.Statsd.Count("worker.events_flushed_total", int64(len(events)), nil, 1.0)
s.Statsd.Count("worker.checks_flushed_total", int64(len(checks)), nil, 1.0)

// fill in the default hostname for packets that didn't set it
for i := range events {
if events[i].Hostname == "" {
events[i].Hostname = s.Hostname
}
events[i].Tags = append(events[i].Tags, s.Tags...)
}
for i := range checks {
if checks[i].Hostname == "" {
checks[i].Hostname = s.Hostname
}
checks[i].Tags = append(checks[i].Tags, s.Tags...)
}

if len(events) != 0 {
// this endpoint is not documented at all, its existence is only known from
// the official dd-agent
// we don't actually pass all the body keys that dd-agent passes here... but
// it still works
err := postHelper(context.TODO(), s.HTTPClient, s.Statsd, fmt.Sprintf("%s/intake?api_key=%s", s.DDHostname, s.DDAPIKey), map[string]map[string][]samplers.UDPEvent{
"events": {
"api": events,
},
}, "flush_events", true)
if err == nil {
log.WithField("events", len(events)).Info("Completed flushing events to Datadog")
} else {
log.WithFields(logrus.Fields{
"events": len(events),
logrus.ErrorKey: err}).Warn("Error flushing events to Datadog")
}
}

if len(checks) != 0 {
// this endpoint is not documented to take an array... but it does
// another curious constraint of this endpoint is that it does not
// support "Content-Encoding: deflate"
err := postHelper(context.TODO(), s.HTTPClient, s.Statsd, fmt.Sprintf("%s/api/v1/check_run?api_key=%s", s.DDHostname, s.DDAPIKey), checks, "flush_checks", false)
if err == nil {
log.WithField("checks", len(checks)).Info("Completed flushing service checks to Datadog")
} else {
log.WithFields(logrus.Fields{
"checks": len(checks),
logrus.ErrorKey: err}).Warn("Error flushing checks to Datadog")
}
}
}

// shared code for POSTing to an endpoint, that consumes JSON, that is zlib-
// compressed, that returns 202 on success, that has a small response
// action is a string used for statsd metric names and log messages emitted from