Merge pull request #283 from iksaif/corentin.chary/max-samples-and-distrib-rates

Add a way to limit the number of samples we keep for buffered metrics
remeh authored Oct 16, 2023
2 parents fe1c33e + 1ee2618 commit aafbe8f
Showing 14 changed files with 450 additions and 131 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,11 @@

[//]: # (comment: Don't forget to update statsd/telemetry.go:clientVersionTelemetryTag when releasing a new version)

# 5.4.0 / xxxx-xx-xx

- [FEATURE] Add `WithMaxSamplesPerContext()` option to limit the number of samples per context. See [#292][].
- [BUGFIX] Fix the `rate` of distributions and histograms when using client side aggregation. See [#283][].

# 5.3.0 / 2023-03-06

- [FEATURE] Added support for `DD_DOGSTATSD_URL`. You can now use this env var to set the URL to use to connect to DogStatsD. See [#273][]
10 changes: 10 additions & 0 deletions README.md
@@ -193,6 +193,16 @@ interesting data to know how useful extended aggregation is to your app.

This can be enabled with the `WithExtendedClientSideAggregation()` option.

### Maximum samples per context

This feature is best coupled with the previous aggregation mechanism. It limits the number of samples kept per
context for `histogram`, `distribution` and `timing` metrics.

This can be enabled with the `WithMaxSamplesPerContext(n int)` option. When enabled, up to `n` samples are kept
per context between two flushes. The default value is 0, which means no limit.
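
For illustration, here is a minimal sketch of a client enabling both options (the address, the limit of 64 and the
metric name are arbitrary examples, not recommendations):

```go
package main

import (
	"log"

	"github.com/DataDog/datadog-go/v5/statsd"
)

func main() {
	client, err := statsd.New("127.0.0.1:8125",
		statsd.WithExtendedClientSideAggregation(),
		statsd.WithMaxSamplesPerContext(64), // keep at most 64 samples per context
	)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// "request.duration" plus these tags form one context; at most 64 of its
	// samples are kept between two flushes, and the emitted sampling rate is
	// adjusted so the backend can rescale counts.
	client.Distribution("request.duration", 0.42, []string{"endpoint:/home"}, 1)
}
```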

Samples are selected using an algorithm that aims to keep the kept samples uniformly distributed over time: every
sample observed during a flush window has the same probability of being retained.
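
The exact selection code is not part of this diff, but classic reservoir sampling — sketched below purely as an
illustration, with hypothetical names — has that uniformity property: after `seen` samples, each of them was kept
with probability `k/seen`.

```go
package sampling

import "math/rand"

// reservoir keeps at most k samples such that, at any point in time, every
// sample offered so far had the same probability of being in r.data.
type reservoir struct {
	k    int64
	seen int64
	data []float64
	rng  *rand.Rand // e.g. rand.New(rand.NewSource(seed))
}

func (r *reservoir) offer(v float64) {
	r.seen++
	if int64(len(r.data)) < r.k {
		r.data = append(r.data, v) // reservoir not full yet: always keep
		return
	}
	// Keep the new sample with probability k/seen by overwriting a
	// uniformly chosen slot; otherwise discard it.
	if i := r.rng.Int63n(r.seen); i < r.k {
		r.data[i] = v
	}
}

// effectiveRate is the ratio a flush can report so that the backend can
// rescale counts: samples kept over samples observed.
func (r *reservoir) effectiveRate() float64 {
	return float64(len(r.data)) / float64(r.seen)
}
```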

## Performance / Metric drops

### Monitoring this client
8 changes: 4 additions & 4 deletions statsd/aggregator.go
@@ -43,15 +43,15 @@ type aggregator struct {
wg sync.WaitGroup
}

- func newAggregator(c *Client) *aggregator {
+ func newAggregator(c *Client, maxSamplesPerContext int64) *aggregator {
return &aggregator{
client: c,
counts: countsMap{},
gauges: gaugesMap{},
sets: setsMap{},
- histograms: newBufferedContexts(newHistogramMetric),
- distributions: newBufferedContexts(newDistributionMetric),
- timings: newBufferedContexts(newTimingMetric),
+ histograms: newBufferedContexts(newHistogramMetric, maxSamplesPerContext),
+ distributions: newBufferedContexts(newDistributionMetric, maxSamplesPerContext),
+ timings: newBufferedContexts(newTimingMetric, maxSamplesPerContext),
closed: make(chan struct{}),
stopChannelMode: make(chan struct{}),
}
129 changes: 125 additions & 4 deletions statsd/aggregator_test.go
@@ -11,7 +11,7 @@ import (
)

func TestAggregatorSample(t *testing.T) {
- a := newAggregator(nil)
+ a := newAggregator(nil, 0)

tags := []string{"tag1", "tag2"}

@@ -47,7 +47,7 @@ func TestAggregatorSample(t *testing.T) {
}

func TestAggregatorFlush(t *testing.T) {
- a := newAggregator(nil)
+ a := newAggregator(nil, 0)

tags := []string{"tag1", "tag2"}

@@ -195,8 +195,129 @@ func TestAggregatorFlush(t *testing.T) {
metrics)
}

func TestAggregatorFlushWithMaxSamplesPerContext(t *testing.T) {
// In this test we keep only 2 samples per context for the metric types where the limit applies (histogram, distribution and timing).
maxSamples := int64(2)
a := newAggregator(nil, maxSamples)

tags := []string{"tag1", "tag2"}

a.gauge("gaugeTest1", 21, tags)
a.gauge("gaugeTest1", 10, tags)
a.gauge("gaugeTest1", 15, tags)

a.count("countTest1", 21, tags)
a.count("countTest1", 10, tags)
a.count("countTest1", 1, tags)

a.set("setTest1", "value1", tags)
a.set("setTest1", "value1", tags)
a.set("setTest1", "value2", tags)

a.histogram("histogramTest1", 21, tags, 1)
a.histogram("histogramTest1", 22, tags, 1)
a.histogram("histogramTest1", 23, tags, 1)

a.distribution("distributionTest1", 21, tags, 1)
a.distribution("distributionTest1", 22, tags, 1)
a.distribution("distributionTest1", 23, tags, 1)

a.timing("timingTest1", 21, tags, 1)
a.timing("timingTest1", 22, tags, 1)
a.timing("timingTest1", 23, tags, 1)

metrics := a.flushMetrics()

assert.Len(t, a.gauges, 0)
assert.Len(t, a.counts, 0)
assert.Len(t, a.sets, 0)
assert.Len(t, a.histograms.values, 0)
assert.Len(t, a.distributions.values, 0)
assert.Len(t, a.timings.values, 0)

assert.Len(t, metrics, 7)

sort.Slice(metrics, func(i, j int) bool {
if metrics[i].metricType == metrics[j].metricType {
res := strings.Compare(metrics[i].name, metrics[j].name)
// this happens for sets (several entries share the same name)
if res == 0 {
return strings.Compare(metrics[i].svalue, metrics[j].svalue) != 1
}
return res != 1
}
return metrics[i].metricType < metrics[j].metricType
})

expectedMetrics := []metric{
metric{
metricType: gauge,
name: "gaugeTest1",
tags: tags,
rate: 1,
fvalue: float64(10),
},
metric{
metricType: count,
name: "countTest1",
tags: tags,
rate: 1,
ivalue: int64(31),
},
metric{
metricType: histogramAggregated,
name: "histogramTest1",
stags: strings.Join(tags, tagSeparatorSymbol),
rate: float64(maxSamples) / 3,
fvalues: []float64{21.0, 22.0, 23.0},
},
metric{
metricType: distributionAggregated,
name: "distributionTest1",
stags: strings.Join(tags, tagSeparatorSymbol),
rate: float64(maxSamples) / 3,
fvalues: []float64{21.0, 22.0, 23.0},
},
metric{
metricType: set,
name: "setTest1",
tags: tags,
rate: 1,
svalue: "value1",
},
metric{
metricType: set,
name: "setTest1",
tags: tags,
rate: 1,
svalue: "value2",
},
metric{
metricType: timingAggregated,
name: "timingTest1",
stags: strings.Join(tags, tagSeparatorSymbol),
rate: float64(maxSamples) / 3,
fvalues: []float64{21.0, 22.0, 23.0},
},
}

for i, m := range metrics {
assert.Equal(t, expectedMetrics[i].metricType, m.metricType)
assert.Equal(t, expectedMetrics[i].name, m.name)
assert.Equal(t, expectedMetrics[i].tags, m.tags)
if m.metricType == timingAggregated || m.metricType == histogramAggregated || m.metricType == distributionAggregated {
assert.Equal(t, expectedMetrics[i].rate, float64(len(m.fvalues))/float64(len(expectedMetrics[i].fvalues)))
assert.Subset(t, expectedMetrics[i].fvalues, m.fvalues)
assert.Len(t, m.fvalues, int(maxSamples))
} else {
assert.Equal(t, expectedMetrics[i].rate, m.rate)
assert.Equal(t, expectedMetrics[i].fvalues, m.fvalues)
}
}
}

func TestAggregatorFlushConcurrency(t *testing.T) {
- a := newAggregator(nil)
+ a := newAggregator(nil, 0)

var wg sync.WaitGroup
wg.Add(10)
@@ -228,7 +349,7 @@ func TestAggregatorFlushConcurrency(t *testing.T) {
}

func TestAggregatorTagsCopy(t *testing.T) {
- a := newAggregator(nil)
+ a := newAggregator(nil, 0)
tags := []string{"tag1", "tag2"}

a.gauge("gauge", 21, tags)
3 changes: 2 additions & 1 deletion statsd/buffer.go
@@ -72,7 +72,7 @@ func (b *statsdBuffer) writeHistogram(namespace string, globalTags []string, nam
}

// writeAggregated serializes as many values as possible in the current buffer and returns the position in values where it stopped.
- func (b *statsdBuffer) writeAggregated(metricSymbol []byte, namespace string, globalTags []string, name string, values []float64, tags string, tagSize int, precision int) (int, error) {
+ func (b *statsdBuffer) writeAggregated(metricSymbol []byte, namespace string, globalTags []string, name string, values []float64, tags string, tagSize int, precision int, rate float64) (int, error) {
if b.elementCount >= b.maxElements {
return 0, errBufferFull
}
@@ -112,6 +112,7 @@ func (b *statsdBuffer) writeAggregated(metricSymbol []byte, namespace string, gl

b.buffer = append(b.buffer, '|')
b.buffer = append(b.buffer, metricSymbol...)
+ b.buffer = appendRate(b.buffer, rate)
b.buffer = appendTagsAggregated(b.buffer, globalTags, tags)
b.buffer = appendContainerID(b.buffer)
b.writeSeparator()
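
`appendRate` is not new in this commit — it is already used on the non-aggregated write paths — and its body is
not shown in this excerpt. A sketch of its assumed behavior, consistent with the tests below, would be:

```go
package sketch

import "strconv"

// Assumed behavior of appendRate (hypothetical body): serialize the sampling
// rate as "|@<rate>", omitting the field entirely for the default rate of 1.
func appendRate(buffer []byte, rate float64) []byte {
	if rate == 1 {
		return buffer
	}
	buffer = append(buffer, "|@"...)
	return strconv.AppendFloat(buffer, rate, 'f', -1, 64)
}
```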
31 changes: 19 additions & 12 deletions statsd/buffer_test.go
@@ -166,58 +166,65 @@ func TestBufferSeparator(t *testing.T) {

func TestBufferAggregated(t *testing.T) {
buffer := newStatsdBuffer(1024, 1)
- pos, err := buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1}, "", 12, -1)
+ pos, err := buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1}, "", 12, -1, 1)
assert.Nil(t, err)
assert.Equal(t, 1, pos)
assert.Equal(t, "namespace.metric:1|h|#tag:tag\n", string(buffer.bytes()))

buffer = newStatsdBuffer(1024, 1)
- pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12, -1)
+ pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12, -1, 1)
assert.Nil(t, err)
assert.Equal(t, 4, pos)
assert.Equal(t, "namespace.metric:1:2:3:4|h|#tag:tag\n", string(buffer.bytes()))

+ // With a sampling rate
+ buffer = newStatsdBuffer(1024, 1)
+ pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12, -1, 0.33)
+ assert.Nil(t, err)
+ assert.Equal(t, 4, pos)
+ assert.Equal(t, "namespace.metric:1:2:3:4|h|@0.33|#tag:tag\n", string(buffer.bytes()))

// max element already used
buffer = newStatsdBuffer(1024, 1)
buffer.elementCount = 1
- pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12, -1)
+ pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12, -1, 1)
assert.Equal(t, errBufferFull, err)

- // not enought size to start serializing (tags and header too big)
+ // not enough size to start serializing (tags and header too big)
buffer = newStatsdBuffer(4, 1)
- pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12, -1)
+ pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12, -1, 1)
assert.Equal(t, errBufferFull, err)

- // not enought size to serializing one message
+ // not enough size to serializing one message
buffer = newStatsdBuffer(29, 1)
- pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12, -1)
+ pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12, -1, 1)
assert.Equal(t, errBufferFull, err)

// space for only 1 number
buffer = newStatsdBuffer(30, 1)
- pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12, -1)
+ pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12, -1, 1)
assert.Equal(t, errPartialWrite, err)
assert.Equal(t, 1, pos)
assert.Equal(t, "namespace.metric:1|h|#tag:tag\n", string(buffer.bytes()))

// first value too big
buffer = newStatsdBuffer(30, 1)
- pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{12, 2, 3, 4}, "", 12, -1)
+ pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{12, 2, 3, 4}, "", 12, -1, 1)
assert.Equal(t, errBufferFull, err)
assert.Equal(t, 0, pos)
assert.Equal(t, "", string(buffer.bytes())) // checking that the buffer was reset

// not enough space left
buffer = newStatsdBuffer(40, 1)
buffer.buffer = append(buffer.buffer, []byte("abcdefghij")...)
- pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{12, 2, 3, 4}, "", 12, -1)
+ pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{12, 2, 3, 4}, "", 12, -1, 1)
assert.Equal(t, errBufferFull, err)
assert.Equal(t, 0, pos)
assert.Equal(t, "abcdefghij", string(buffer.bytes())) // checking that the buffer was reset

// space for only 2 number
buffer = newStatsdBuffer(32, 1)
- pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12, -1)
+ pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12, -1, 1)
assert.Equal(t, errPartialWrite, err)
assert.Equal(t, 2, pos)
assert.Equal(t, "namespace.metric:1:2|h|#tag:tag\n", string(buffer.bytes()))
@@ -227,7 +234,7 @@ func TestBufferAggregated(t *testing.T) {
defer resetContainerID()

buffer = newStatsdBuffer(1024, 1)
- pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1}, "", 12, -1)
+ pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1}, "", 12, -1, 1)
assert.Nil(t, err)
assert.Equal(t, 1, pos)
assert.Equal(t, "namespace.metric:1|h|#tag:tag|c:container-id\n", string(buffer.bytes()))
46 changes: 29 additions & 17 deletions statsd/buffered_metric_context.go
@@ -25,10 +25,12 @@ type bufferedMetricContexts struct {
randomLock sync.Mutex
}

- func newBufferedContexts(newMetric func(string, float64, string) *bufferedMetric) bufferedMetricContexts {
+ func newBufferedContexts(newMetric func(string, float64, string, int64) *bufferedMetric, maxSamples int64) bufferedMetricContexts {
return bufferedMetricContexts{
- values: bufferedMetricMap{},
- newMetric: newMetric,
+ values: bufferedMetricMap{},
+ newMetric: func(name string, value float64, stringTags string) *bufferedMetric {
+ return newMetric(name, value, stringTags, maxSamples)
+ },
// Note that calling "time.Now().UnixNano()" repeatedly quickly may return
// very similar values. That's fine for seeding the worker-specific random
// source because we just need an evenly distributed stream of float values.
@@ -44,36 +46,46 @@ func (bc *bufferedMetricContexts) flush(metrics []metric) []metric {
bc.mutex.Unlock()

for _, d := range values {
+ d.Lock()
metrics = append(metrics, d.flushUnsafe())
+ d.Unlock()
}
atomic.AddUint64(&bc.nbContext, uint64(len(values)))
return metrics
}

func (bc *bufferedMetricContexts) sample(name string, value float64, tags []string, rate float64) error {
- if !shouldSample(rate, bc.random, &bc.randomLock) {
- return nil
- }
+ keepingSample := shouldSample(rate, bc.random, &bc.randomLock)

context, stringTags := getContextAndTags(name, tags)
+ var v *bufferedMetric = nil

bc.mutex.RLock()
- if v, found := bc.values[context]; found {
- v.sample(value)
- bc.mutex.RUnlock()
- return nil
- }
+ v, _ = bc.values[context]
bc.mutex.RUnlock()

// Create it if it wasn't found
bc.mutex.Lock()
- // Check if another goroutines hasn't created the value betwen the 'RUnlock' and 'Lock'
- if v, found := bc.values[context]; found {
- v.sample(value)
- bc.mutex.Unlock()
- return nil
- }
+ if v == nil {
+ // It might have been created by another goroutine since last call
+ v, _ = bc.values[context]
+ if v == nil {
+ // We might keep a sample that we should have skipped, but that should not drastically affect performance.
+ bc.values[context] = bc.newMetric(name, value, stringTags)
+ // We added a new value, we need to unlock the mutex and quit
+ bc.mutex.Unlock()
+ return nil
+ }
+ }
- bc.values[context] = bc.newMetric(name, value, stringTags)
bc.mutex.Unlock()

+ // Now we can keep the sample or skip it
+ if keepingSample {
+ v.maybeKeepSample(value, bc.random, &bc.randomLock)
+ } else {
+ v.skipSample()
+ }

return nil
}
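
The `maybeKeepSample` and `skipSample` methods live in `statsd/buffered_metric.go`, one of the changed files not
shown in this excerpt. Based on how `sample()` uses them — and on why `flush()` now takes each metric's lock,
since kept samples can be replaced concurrently — a compatible sketch, with hypothetical field names, would be:

```go
package sketch

import (
	"math/rand"
	"sync"
)

// Hypothetical shape of bufferedMetric; the real one lives in
// statsd/buffered_metric.go (not shown in this excerpt).
type bufferedMetric struct {
	sync.Mutex
	data       []float64 // kept samples (the reservoir)
	total      int64     // samples observed, kept or skipped
	maxSamples int64     // <= 0 means no limit
}

func (m *bufferedMetric) maybeKeepSample(value float64, random *rand.Rand, randomLock *sync.Mutex) {
	m.Lock()
	m.total++
	if m.maxSamples <= 0 || int64(len(m.data)) < m.maxSamples {
		m.data = append(m.data, value) // no limit, or reservoir not full yet
	} else {
		randomLock.Lock()
		i := random.Int63n(m.total) // reservoir replacement, uniform over time
		randomLock.Unlock()
		if i < m.maxSamples {
			m.data[i] = value
		}
	}
	m.Unlock()
}

func (m *bufferedMetric) skipSample() {
	m.Lock()
	m.total++ // skipped samples still count, so rate = kept/total stays correct
	m.Unlock()
}
```

On flush, such a metric would report `rate = len(data)/total`, which is what
`TestAggregatorFlushWithMaxSamplesPerContext` asserts above: 2 samples kept out of 3 observed yield a rate of 2/3.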

