Add a way to limit the number of samples we keep for buffered metrics #283

Merged
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,11 @@

[//]: # (comment: Don't forget to update statsd/telemetry.go:clientVersionTelemetryTag when releasing a new version)

# 5.4.0 / xxxx-xx-xx

- [FEATURE] Add `WithMaxSamplesPerContext()` option to limit the number of samples per context. See [#292][].
- [BUGFIX] Fix the `rate` of distributions and histograms when using client side aggregation. See [#283][].

# 5.3.0 / 2023-03-06

- [FEATURE] Added support for `DD_DOGSTATSD_URL`. You can now use this env var to set the URL to use to connect to DogStatsD. See [#273][]
10 changes: 10 additions & 0 deletions README.md
@@ -193,6 +193,16 @@ interesting data to know how useful extended aggregation is to your app.

This can be enabled with the `WithExtendedClientSideAggregation()` option.

### Maximum samples per context

This feature works best when combined with the aggregation mechanism described above. It limits the number of samples
kept per context for `histogram`, `distribution` and `timing` metrics.

It can be enabled with the `WithMaxSamplesPerContext(n int)` option. When enabled, at most `n` samples are kept
per context. The default value is 0, which means no limit.

Samples are selected with an algorithm that tries to keep the distribution of the retained samples uniform over time, so the kept samples stay representative of the full stream.
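
A minimal usage sketch of enabling the option (the agent address, metric name, and the limit of 1000 below are illustrative choices, not defaults):

```go
package main

import (
	"log"

	"github.com/DataDog/datadog-go/v5/statsd"
)

func main() {
	// Keep at most 1000 samples per context for histogram, distribution
	// and timing metrics; the default of 0 keeps every sample.
	client, err := statsd.New("127.0.0.1:8125",
		statsd.WithExtendedClientSideAggregation(),
		statsd.WithMaxSamplesPerContext(1000),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Every call below shares one context (same name and tags), so at most
	// 1000 of these values are retained per flush; the sample rate sent to
	// the agent reflects how many were kept versus submitted.
	for i := 0; i < 10000; i++ {
		client.Distribution("request.duration", float64(i), []string{"env:dev"}, 1)
	}
}
```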

## Performance / Metric drops

### Monitoring this client
8 changes: 4 additions & 4 deletions statsd/aggregator.go
@@ -43,15 +43,15 @@ type aggregator struct {
wg sync.WaitGroup
}

func newAggregator(c *Client) *aggregator {
func newAggregator(c *Client, maxSamplesPerContext int64) *aggregator {
return &aggregator{
client: c,
counts: countsMap{},
gauges: gaugesMap{},
sets: setsMap{},
histograms: newBufferedContexts(newHistogramMetric),
distributions: newBufferedContexts(newDistributionMetric),
timings: newBufferedContexts(newTimingMetric),
histograms: newBufferedContexts(newHistogramMetric, maxSamplesPerContext),
distributions: newBufferedContexts(newDistributionMetric, maxSamplesPerContext),
timings: newBufferedContexts(newTimingMetric, maxSamplesPerContext),
closed: make(chan struct{}),
stopChannelMode: make(chan struct{}),
}
129 changes: 125 additions & 4 deletions statsd/aggregator_test.go
@@ -11,7 +11,7 @@ import (
)

func TestAggregatorSample(t *testing.T) {
a := newAggregator(nil)
a := newAggregator(nil, 0)

tags := []string{"tag1", "tag2"}

@@ -47,7 +47,7 @@ func TestAggregatorSample(t *testing.T) {
}

func TestAggregatorFlush(t *testing.T) {
a := newAggregator(nil)
a := newAggregator(nil, 0)

tags := []string{"tag1", "tag2"}

@@ -195,8 +195,129 @@ func TestAggregatorFlush(t *testing.T) {
metrics)
}

func TestAggregatorFlushWithMaxSamplesPerContext(t *testing.T) {
// In this test we keep only 2 samples per context for metrics where it's relevant.
maxSamples := int64(2)
a := newAggregator(nil, maxSamples)

tags := []string{"tag1", "tag2"}

a.gauge("gaugeTest1", 21, tags)
a.gauge("gaugeTest1", 10, tags)
a.gauge("gaugeTest1", 15, tags)

a.count("countTest1", 21, tags)
a.count("countTest1", 10, tags)
a.count("countTest1", 1, tags)

a.set("setTest1", "value1", tags)
a.set("setTest1", "value1", tags)
a.set("setTest1", "value2", tags)

a.histogram("histogramTest1", 21, tags, 1)
a.histogram("histogramTest1", 22, tags, 1)
a.histogram("histogramTest1", 23, tags, 1)

a.distribution("distributionTest1", 21, tags, 1)
a.distribution("distributionTest1", 22, tags, 1)
a.distribution("distributionTest1", 23, tags, 1)

a.timing("timingTest1", 21, tags, 1)
a.timing("timingTest1", 22, tags, 1)
a.timing("timingTest1", 23, tags, 1)

metrics := a.flushMetrics()

assert.Len(t, a.gauges, 0)
assert.Len(t, a.counts, 0)
assert.Len(t, a.sets, 0)
assert.Len(t, a.histograms.values, 0)
assert.Len(t, a.distributions.values, 0)
assert.Len(t, a.timings.values, 0)

assert.Len(t, metrics, 7)

sort.Slice(metrics, func(i, j int) bool {
if metrics[i].metricType == metrics[j].metricType {
res := strings.Compare(metrics[i].name, metrics[j].name)
// this happens for set
if res == 0 {
return strings.Compare(metrics[i].svalue, metrics[j].svalue) != 1
}
return res != 1
}
return metrics[i].metricType < metrics[j].metricType
})

expectedMetrics := []metric{
metric{
metricType: gauge,
name: "gaugeTest1",
tags: tags,
rate: 1,
fvalue: float64(10),
},
metric{
metricType: count,
name: "countTest1",
tags: tags,
rate: 1,
ivalue: int64(31),
},
metric{
metricType: histogramAggregated,
name: "histogramTest1",
stags: strings.Join(tags, tagSeparatorSymbol),
rate: float64(maxSamples) / 3,
fvalues: []float64{21.0, 22.0, 23.0},
},
metric{
metricType: distributionAggregated,
name: "distributionTest1",
stags: strings.Join(tags, tagSeparatorSymbol),
rate: float64(maxSamples) / 3,
fvalues: []float64{21.0, 22.0, 23.0},
},
metric{
metricType: set,
name: "setTest1",
tags: tags,
rate: 1,
svalue: "value1",
},
metric{
metricType: set,
name: "setTest1",
tags: tags,
rate: 1,
svalue: "value2",
},
metric{
metricType: timingAggregated,
name: "timingTest1",
stags: strings.Join(tags, tagSeparatorSymbol),
rate: float64(maxSamples) / 3,
fvalues: []float64{21.0, 22.0, 23.0},
},
}

for i, m := range metrics {
assert.Equal(t, expectedMetrics[i].metricType, m.metricType)
assert.Equal(t, expectedMetrics[i].name, m.name)
assert.Equal(t, expectedMetrics[i].tags, m.tags)
if m.metricType == timingAggregated || m.metricType == histogramAggregated || m.metricType == distributionAggregated {
assert.Equal(t, expectedMetrics[i].rate, float64(len(m.fvalues))/float64(len(expectedMetrics[i].fvalues)))
assert.Subset(t, expectedMetrics[i].fvalues, m.fvalues)
assert.Len(t, m.fvalues, int(maxSamples))
} else {
assert.Equal(t, expectedMetrics[i].rate, m.rate)
assert.Equal(t, expectedMetrics[i].fvalues, m.fvalues)
}
}
}

func TestAggregatorFlushConcurrency(t *testing.T) {
a := newAggregator(nil)
a := newAggregator(nil, 0)

var wg sync.WaitGroup
wg.Add(10)
@@ -228,7 +349,7 @@ func TestAggregatorFlushConcurrency(t *testing.T) {
}

func TestAggregatorTagsCopy(t *testing.T) {
a := newAggregator(nil)
a := newAggregator(nil, 0)
tags := []string{"tag1", "tag2"}

a.gauge("gauge", 21, tags)
3 changes: 2 additions & 1 deletion statsd/buffer.go
@@ -72,7 +72,7 @@ func (b *statsdBuffer) writeHistogram(namespace string, globalTags []string, nam
}

// writeAggregated serializes as many values as possible in the current buffer and returns the position in values where it stopped.
func (b *statsdBuffer) writeAggregated(metricSymbol []byte, namespace string, globalTags []string, name string, values []float64, tags string, tagSize int, precision int) (int, error) {
func (b *statsdBuffer) writeAggregated(metricSymbol []byte, namespace string, globalTags []string, name string, values []float64, tags string, tagSize int, precision int, rate float64) (int, error) {
if b.elementCount >= b.maxElements {
return 0, errBufferFull
}
@@ -112,6 +112,7 @@ func (b *statsdBuffer) writeAggregated(metricSymbol []byte, namespace string, gl

b.buffer = append(b.buffer, '|')
b.buffer = append(b.buffer, metricSymbol...)
b.buffer = appendRate(b.buffer, rate)
b.buffer = appendTagsAggregated(b.buffer, globalTags, tags)
b.buffer = appendContainerID(b.buffer)
b.writeSeparator()
31 changes: 19 additions & 12 deletions statsd/buffer_test.go
@@ -166,58 +166,65 @@ func TestBufferSeparator(t *testing.T) {

func TestBufferAggregated(t *testing.T) {
buffer := newStatsdBuffer(1024, 1)
pos, err := buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1}, "", 12, -1)
pos, err := buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1}, "", 12, -1, 1)
assert.Nil(t, err)
assert.Equal(t, 1, pos)
assert.Equal(t, "namespace.metric:1|h|#tag:tag\n", string(buffer.bytes()))

buffer = newStatsdBuffer(1024, 1)
pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12, -1)
pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12, -1, 1)
assert.Nil(t, err)
assert.Equal(t, 4, pos)
assert.Equal(t, "namespace.metric:1:2:3:4|h|#tag:tag\n", string(buffer.bytes()))

// With a sampling rate
buffer = newStatsdBuffer(1024, 1)
pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12, -1, 0.33)
assert.Nil(t, err)
assert.Equal(t, 4, pos)
assert.Equal(t, "namespace.metric:1:2:3:4|h|@0.33|#tag:tag\n", string(buffer.bytes()))

// max element already used
buffer = newStatsdBuffer(1024, 1)
buffer.elementCount = 1
pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12, -1)
pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12, -1, 1)
assert.Equal(t, errBufferFull, err)

// not enought size to start serializing (tags and header too big)
// not enough size to start serializing (tags and header too big)
buffer = newStatsdBuffer(4, 1)
pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12, -1)
pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12, -1, 1)
assert.Equal(t, errBufferFull, err)

// not enought size to serializing one message
// not enough size to serializing one message
buffer = newStatsdBuffer(29, 1)
pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12, -1)
pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12, -1, 1)
assert.Equal(t, errBufferFull, err)

// space for only 1 number
buffer = newStatsdBuffer(30, 1)
pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12, -1)
pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12, -1, 1)
assert.Equal(t, errPartialWrite, err)
assert.Equal(t, 1, pos)
assert.Equal(t, "namespace.metric:1|h|#tag:tag\n", string(buffer.bytes()))

// first value too big
buffer = newStatsdBuffer(30, 1)
pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{12, 2, 3, 4}, "", 12, -1)
pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{12, 2, 3, 4}, "", 12, -1, 1)
assert.Equal(t, errBufferFull, err)
assert.Equal(t, 0, pos)
assert.Equal(t, "", string(buffer.bytes())) // checking that the buffer was reset

// not enough space left
buffer = newStatsdBuffer(40, 1)
buffer.buffer = append(buffer.buffer, []byte("abcdefghij")...)
pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{12, 2, 3, 4}, "", 12, -1)
pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{12, 2, 3, 4}, "", 12, -1, 1)
assert.Equal(t, errBufferFull, err)
assert.Equal(t, 0, pos)
assert.Equal(t, "abcdefghij", string(buffer.bytes())) // checking that the buffer was reset

// space for only 2 number
buffer = newStatsdBuffer(32, 1)
pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12, -1)
pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12, -1, 1)
assert.Equal(t, errPartialWrite, err)
assert.Equal(t, 2, pos)
assert.Equal(t, "namespace.metric:1:2|h|#tag:tag\n", string(buffer.bytes()))
@@ -227,7 +234,7 @@ func TestBufferAggregated(t *testing.T) {
defer resetContainerID()

buffer = newStatsdBuffer(1024, 1)
pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1}, "", 12, -1)
pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1}, "", 12, -1, 1)
assert.Nil(t, err)
assert.Equal(t, 1, pos)
assert.Equal(t, "namespace.metric:1|h|#tag:tag|c:container-id\n", string(buffer.bytes()))
46 changes: 29 additions & 17 deletions statsd/buffered_metric_context.go
@@ -25,10 +25,12 @@ type bufferedMetricContexts struct {
randomLock sync.Mutex
}

func newBufferedContexts(newMetric func(string, float64, string) *bufferedMetric) bufferedMetricContexts {
func newBufferedContexts(newMetric func(string, float64, string, int64) *bufferedMetric, maxSamples int64) bufferedMetricContexts {
return bufferedMetricContexts{
values: bufferedMetricMap{},
newMetric: newMetric,
values: bufferedMetricMap{},
newMetric: func(name string, value float64, stringTags string) *bufferedMetric {
return newMetric(name, value, stringTags, maxSamples)
},
// Note that calling "time.Now().UnixNano()" repeatedly quickly may return
// very similar values. That's fine for seeding the worker-specific random
// source because we just need an evenly distributed stream of float values.
@@ -44,36 +46,46 @@ func (bc *bufferedMetricContexts) flush(metrics []metric) []metric {
bc.mutex.Unlock()

for _, d := range values {
d.Lock()
metrics = append(metrics, d.flushUnsafe())
d.Unlock()
}
atomic.AddUint64(&bc.nbContext, uint64(len(values)))
return metrics
}

func (bc *bufferedMetricContexts) sample(name string, value float64, tags []string, rate float64) error {
if !shouldSample(rate, bc.random, &bc.randomLock) {
return nil
}
keepingSample := shouldSample(rate, bc.random, &bc.randomLock)

context, stringTags := getContextAndTags(name, tags)
var v *bufferedMetric = nil

bc.mutex.RLock()
if v, found := bc.values[context]; found {
v.sample(value)
bc.mutex.RUnlock()
return nil
}
v, _ = bc.values[context]
bc.mutex.RUnlock()

// Create it if it wasn't found
bc.mutex.Lock()
// Check if another goroutine hasn't created the value between the 'RUnlock' and 'Lock'
if v, found := bc.values[context]; found {
v.sample(value)
bc.mutex.Unlock()
return nil
if v == nil {
// It might have been created by another goroutine since last call
v, _ = bc.values[context]
if v == nil {
// We might keep a sample that we should have skipped, but that should not drastically affect performance.
bc.values[context] = bc.newMetric(name, value, stringTags)
// We added a new value, we need to unlock the mutex and quit
bc.mutex.Unlock()
return nil
}
}
bc.values[context] = bc.newMetric(name, value, stringTags)
bc.mutex.Unlock()

// Now we can keep the sample or skip it
if keepingSample {
v.maybeKeepSample(value, bc.random, &bc.randomLock)
} else {
v.skipSample()
}

return nil
}
