Skip to content
This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

stats: add option for percentiles to be emitted by client for distribution metrics #41

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions datadog.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ type Options struct {
// GlobalTags holds a set of tags that will automatically be applied to all
// exported spans.
GlobalTags map[string]interface{}

// DisableCountPerBuckets specifies whether to emit count_per_bucket metrics
DisableCountPerBuckets bool

// HistogramPercentiles given a list of percentiles [0.5, 0.95, 0.99], for each one will estimate the
// percentile from the Distribution metric and emit a unique metric for each
HistogramPercentiles []float64
bbassingthwaite marked this conversation as resolved.
Show resolved Hide resolved
}

func (o *Options) onError(err error) {
Expand Down
45 changes: 45 additions & 0 deletions datadog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,3 +272,48 @@ func TestHistogram(t *testing.T) {
t.Errorf("Expected: %v, Got: %v\n", vd, actual)
}
}

func TestPercentile_buildMetricNameForPercentile(t *testing.T) {
testCases := []struct {
Percentile float64
Expected string
}{
{
0.5,
"50percentile",
},
{
0.75,
"75percentile",
},
{
0.92,
"92percentile",
},
{
0.95,
"95percentile",
},
{
0.99,
"99percentile",
},
{
0.995,
"995percentile",
},
{
0.999,
"999percentile",
},
}

for _, tc := range testCases {
t.Run(fmt.Sprintf("%f", tc.Percentile), func(t *testing.T) {
got := buildMetricNameForPercentile(tc.Percentile)
if got != tc.Expected {
t.Errorf("Expected: %v, Got %v\n", tc.Expected, got)
}
})
}
}
45 changes: 41 additions & 4 deletions stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package datadog

import (
"fmt"
"math"
"sync"

"github.com/DataDog/datadog-go/statsd"
Expand Down Expand Up @@ -43,6 +44,12 @@ func newStatsExporter(o Options) (*statsExporter, error) {
return nil, err
}

for _, percentile := range o.HistogramPercentiles {
if percentile < 0 || percentile > 1 {
return nil, fmt.Errorf("'HistogramPercentiles' must be between 0 and 1: Received %f", percentile)
}
}

return &statsExporter{
opts: o,
viewData: make(map[string]*view.Data),
Expand Down Expand Up @@ -92,17 +99,47 @@ func (s *statsExporter) submitMetric(v *view.View, row *view.Row, metricName str
"avg": data.Mean,
"squared_dev_sum": data.SumOfSquaredDev,
}
for _, percentile := range s.opts.HistogramPercentiles {
metrics[buildMetricNameForPercentile(percentile)] = calculatePercentile(percentile, v.Aggregation.Buckets, data.CountPerBucket)
}

for name, value := range metrics {
err = client.Gauge(metricName+"."+name, value, opt.tagMetrics(row.Tags, tags), rate)
}

for x := range data.CountPerBucket {
addlTags := []string{"bucket_idx:" + fmt.Sprint(x)}
err = client.Gauge(metricName+".count_per_bucket", float64(data.CountPerBucket[x]), opt.tagMetrics(row.Tags, addlTags), rate)
if !s.opts.DisableCountPerBuckets {
for x := range data.CountPerBucket {
addlTags := []string{"bucket_idx:" + fmt.Sprint(x)}
err = client.Gauge(metricName+".count_per_bucket", float64(data.CountPerBucket[x]), opt.tagMetrics(row.Tags, addlTags), rate)
}
}
return err
default:
return fmt.Errorf("aggregation %T is not supported", v.Aggregation)
}
}

func calculatePercentile(percentile float64, buckets []float64, countPerBucket []int64) float64 {
cumulativePerBucket := make([]int64, len(countPerBucket))
var sum int64
for n, count := range countPerBucket {
sum += count
cumulativePerBucket[n] = sum
}
atBin := int64(math.Floor(percentile * float64(sum)))

var previousCount int64
for n, count := range cumulativePerBucket {
if atBin >= previousCount && atBin <= count {
return buckets[n]
}
previousCount = count
}
return buckets[len(buckets)-1]
}

func buildMetricNameForPercentile(percentile float64) string {
if percentile > 0.99 {
return fmt.Sprintf("%dpercentile", int64(percentile*1000+0.5))
}
return fmt.Sprintf("%dpercentile", int64(percentile*100+0.5))
}
76 changes: 76 additions & 0 deletions stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package datadog

import (
"fmt"
"math/rand"
"testing"
"time"

Expand Down Expand Up @@ -103,3 +104,78 @@ func TestNilAggregation(t *testing.T) {
t.Errorf("Expected: %v, Got: %v", fmt.Errorf("aggregation *view.Aggregation is not supported"), actual)
}
}

func Test_calculatePercentile(t *testing.T) {
var buckets []float64
for i := float64(-100); i < 100; i += 0.1 {
buckets = append(buckets, i)
}

normalDistribution := calculateNormalDistribution(buckets, 0, 100)
AlexandreYang marked this conversation as resolved.
Show resolved Hide resolved
tsts := []struct {
expected int64
percentile float64
buckets []float64
countsPerBucket []int64
}{
{
0,
0.5,
buckets,
normalDistribution,
},
{
44,
0.75,
buckets,
normalDistribution,
},
{
86,
0.95,
buckets,
normalDistribution,
},
{
97,
0.99,
buckets,
normalDistribution,
},
{
99,
0.999,
buckets,
normalDistribution,
},
}

for _, tst := range tsts {
t.Run(fmt.Sprintf("%v", tst.percentile), func(t *testing.T) {
got := calculatePercentile(tst.percentile, tst.buckets, tst.countsPerBucket)

if tst.expected != int64(got) {
t.Errorf("Expected: %v, Got: %v", tst.expected, got)
}
})

}
}

func calculateNormalDistribution(buckets []float64, seed int64, standardDeviation float64) []int64 {
r := rand.New(rand.NewSource(seed))

normalDistribution := make([]int64, len(buckets))
for n := 0; n < 1e6; n++ {
rnd := r.NormFloat64() * standardDeviation
var previousBucket float64
for bidx, bucket := range buckets {
if rnd > previousBucket && rnd <= bucket {
normalDistribution[bidx]++
break
}
previousBucket = bucket
}
}
return normalDistribution
}