Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add option for Processor to drop metrics #164

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions ddlambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ type (
// TraceContextExtractor is the function that extracts a root/parent trace context from the Lambda event body.
// See trace.DefaultTraceExtractor for an example.
TraceContextExtractor trace.ContextExtractor
// MetricsChannelCapacity sets the capacity to buffer metrics if ShouldUseLogForwarder is set to false.
// default: 2000
MetricsChannelCapacity uint32
// DropMetricsAtCapacity controls if metrics should be dropped
// instead of blocking to wait for capacity on the metrics channel.
// default: false
DropMetricsAtCapacity bool
}
)

Expand Down Expand Up @@ -273,6 +280,8 @@ func (cfg *Config) toMetricsConfig(isExtensionRunning bool) metrics.Config {
mc.Site = cfg.Site
mc.ShouldUseLogForwarder = cfg.ShouldUseLogForwarder
mc.HTTPClientTimeout = cfg.HTTPClientTimeout
mc.MetricsChannelCapacity = cfg.MetricsChannelCapacity
mc.DropMetricsAtCapacity = cfg.DropMetricsAtCapacity
}

if mc.Site == "" {
Expand Down
1 change: 1 addition & 0 deletions internal/metrics/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const (
defaultCircuitBreakerInterval = time.Second * 30
defaultCircuitBreakerTimeout = time.Second * 60
defaultCircuitBreakerTotalFailures = 4
defaultMetricsChannelCapacity = 2000
)

// MetricType enumerates all the available metric types
Expand Down
7 changes: 6 additions & 1 deletion internal/metrics/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type (
CircuitBreakerInterval time.Duration
CircuitBreakerTimeout time.Duration
CircuitBreakerTotalFailures uint32
MetricsChannelCapacity uint32
DropMetricsAtCapacity bool
LocalTest bool
}

Expand Down Expand Up @@ -85,6 +87,9 @@ func MakeListener(config Config, extensionManager *extension.ExtensionManager) L
if config.BatchInterval <= 0 {
config.BatchInterval = defaultBatchInterval
}
if config.MetricsChannelCapacity == 0 {
config.MetricsChannelCapacity = defaultMetricsChannelCapacity
}

var statsdClient *statsd.Client
// immediate call to the Agent, if not a 200, fallback to API
Expand Down Expand Up @@ -119,7 +124,7 @@ func (l *Listener) HandlerStarted(ctx context.Context, msg json.RawMessage) cont
}

ts := MakeTimeService()
pr := MakeProcessor(ctx, l.apiClient, ts, l.config.BatchInterval, l.config.ShouldRetryOnFailure, l.config.CircuitBreakerInterval, l.config.CircuitBreakerTimeout, l.config.CircuitBreakerTotalFailures)
pr := MakeProcessor(ctx, l.apiClient, ts, l.config.BatchInterval, l.config.ShouldRetryOnFailure, l.config.CircuitBreakerInterval, l.config.CircuitBreakerTimeout, l.config.CircuitBreakerTotalFailures, l.config.MetricsChannelCapacity, l.config.DropMetricsAtCapacity)
l.processor = pr

ctx = AddListener(ctx, l)
Expand Down
59 changes: 37 additions & 22 deletions internal/metrics/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package metrics

import (
"context"
"errors"
"fmt"
"sync"
"time"
Expand All @@ -19,6 +20,9 @@ import (
"github.com/sony/gobreaker"
)

// ErrMetricsChannelAtCapacity reports that the processor's metricsChan is at capacity and unable to receive more metrics.
var ErrMetricsChannelAtCapacity = errors.New("channel is at capacity, metrics will be dropped")

type (
// Processor is used to batch metrics on a background thread, and send them on to a client periodically.
Processor interface {
Expand All @@ -33,36 +37,38 @@ type (
}

processor struct {
context context.Context
metricsChan chan Metric
timeService TimeService
waitGroup sync.WaitGroup
batchInterval time.Duration
client Client
batcher *Batcher
shouldRetryOnFail bool
isProcessing bool
breaker *gobreaker.CircuitBreaker
context context.Context
metricsChan chan Metric
dropMetricsAtCapacity bool
timeService TimeService
waitGroup sync.WaitGroup
batchInterval time.Duration
client Client
batcher *Batcher
shouldRetryOnFail bool
isProcessing bool
breaker *gobreaker.CircuitBreaker
}
)

// MakeProcessor creates a new metrics context
func MakeProcessor(ctx context.Context, client Client, timeService TimeService, batchInterval time.Duration, shouldRetryOnFail bool, circuitBreakerInterval time.Duration, circuitBreakerTimeout time.Duration, circuitBreakerTotalFailures uint32) Processor {
func MakeProcessor(ctx context.Context, client Client, timeService TimeService, batchInterval time.Duration, shouldRetryOnFail bool, circuitBreakerInterval time.Duration, circuitBreakerTimeout time.Duration, circuitBreakerTotalFailures uint32, metricsChanCapacity uint32, dropMetricsAtCapacity bool) Processor {
batcher := MakeBatcher(batchInterval)

breaker := MakeCircuitBreaker(circuitBreakerInterval, circuitBreakerTimeout, circuitBreakerTotalFailures)

return &processor{
context: ctx,
metricsChan: make(chan Metric, 2000),
batchInterval: batchInterval,
waitGroup: sync.WaitGroup{},
client: client,
batcher: batcher,
shouldRetryOnFail: shouldRetryOnFail,
timeService: timeService,
isProcessing: false,
breaker: breaker,
context: ctx,
metricsChan: make(chan Metric, metricsChanCapacity),
dropMetricsAtCapacity: dropMetricsAtCapacity,
batchInterval: batchInterval,
waitGroup: sync.WaitGroup{},
client: client,
batcher: batcher,
shouldRetryOnFail: shouldRetryOnFail,
timeService: timeService,
isProcessing: false,
breaker: breaker,
}
}

Expand All @@ -83,7 +89,16 @@ func MakeCircuitBreaker(circuitBreakerInterval time.Duration, circuitBreakerTime
// AddMetric queues metric on the processor's metrics channel, sending it exactly once.
//
// When dropMetricsAtCapacity is false the send blocks until the channel has
// room. When it is true the send is attempted without blocking; if the channel
// is full the metric is discarded and ErrMetricsChannelAtCapacity is logged.
func (p *processor) AddMetric(metric Metric) {
	if !p.dropMetricsAtCapacity {
		// We use a large buffer in the metrics channel, to make this operation non-blocking.
		// However, if the channel does fill up, this will become a blocking operation.
		p.metricsChan <- metric
		return
	}

	// Non-blocking send: the default case is taken when the channel cannot
	// accept the metric right now, so the caller is never stalled.
	select {
	case p.metricsChan <- metric:
	default:
		logger.Error(ErrMetricsChannelAtCapacity)
	}
}

func (p *processor) StartProcessing() {
Expand Down
50 changes: 45 additions & 5 deletions internal/metrics/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestProcessorBatches(t *testing.T) {
mts.now, _ = time.Parse(time.RFC3339, "2006-01-02T15:04:05Z")
nowUnix := float64(mts.now.Unix())

processor := MakeProcessor(context.Background(), &mc, &mts, 1000, false, time.Hour*1000, time.Hour*1000, math.MaxUint32)
processor := MakeProcessor(context.Background(), &mc, &mts, 1000, false, time.Hour*1000, time.Hour*1000, math.MaxUint32, 2000, false)

d1 := Distribution{
Name: "metric-1",
Expand Down Expand Up @@ -114,7 +114,7 @@ func TestProcessorBatchesPerTick(t *testing.T) {
secondTimeUnix := float64(secondTime.Unix())
mts.now = firstTime

processor := MakeProcessor(context.Background(), &mc, &mts, 1000, false, time.Hour*1000, time.Hour*1000, math.MaxUint32)
processor := MakeProcessor(context.Background(), &mc, &mts, 1000, false, time.Hour*1000, time.Hour*1000, math.MaxUint32, 2000, false)

d1 := Distribution{
Name: "metric-1",
Expand Down Expand Up @@ -189,7 +189,7 @@ func TestProcessorPerformsRetry(t *testing.T) {
mts.now, _ = time.Parse(time.RFC3339, "2006-01-02T15:04:05Z")

shouldRetry := true
processor := MakeProcessor(context.Background(), &mc, &mts, 1000, shouldRetry, time.Hour*1000, time.Hour*1000, math.MaxUint32)
processor := MakeProcessor(context.Background(), &mc, &mts, 1000, shouldRetry, time.Hour*1000, time.Hour*1000, math.MaxUint32, 2000, false)

d1 := Distribution{
Name: "metric-1",
Expand All @@ -214,7 +214,7 @@ func TestProcessorCancelsWithContext(t *testing.T) {

shouldRetry := true
ctx, cancelFunc := context.WithCancel(context.Background())
processor := MakeProcessor(ctx, &mc, &mts, 1000, shouldRetry, time.Hour*1000, time.Hour*1000, math.MaxUint32)
processor := MakeProcessor(ctx, &mc, &mts, 1000, shouldRetry, time.Hour*1000, time.Hour*1000, math.MaxUint32, 2000, false)

d1 := Distribution{
Name: "metric-1",
Expand All @@ -240,7 +240,7 @@ func TestProcessorBatchesWithOpeningCircuitBreaker(t *testing.T) {

// Will open the circuit breaker at number of total failures > 1
circuitBreakerTotalFailures := uint32(1)
processor := MakeProcessor(context.Background(), &mc, &mts, 1000, false, time.Hour*1000, time.Hour*1000, circuitBreakerTotalFailures)
processor := MakeProcessor(context.Background(), &mc, &mts, 1000, false, time.Hour*1000, time.Hour*1000, circuitBreakerTotalFailures, 2000, false)

d1 := Distribution{
Name: "metric-1",
Expand All @@ -257,3 +257,43 @@ func TestProcessorBatchesWithOpeningCircuitBreaker(t *testing.T) {
// It should have retried 3 times, but circuit breaker opened at the second time
assert.Equal(t, 1, mc.sendMetricsCalledCount)
}

// TestProcessorBatchesDropMetricsAtCapacity builds a processor whose metrics
// channel has capacity 1 with dropping enabled, adds two metrics before
// processing starts, and checks that the first batch contains only the values
// of the first metric.
func TestProcessorBatchesDropMetricsAtCapacity(t *testing.T) {
	client := makeMockClient()
	clock := makeMockTimeService()

	clock.now, _ = time.Parse(time.RFC3339, "2006-01-02T15:04:05Z")
	stamp := float64(clock.now.Unix())

	const (
		channelCapacity = 1
		dropWhenFull    = true
	)
	proc := MakeProcessor(context.Background(), &client, &clock, 1000, false, time.Hour*1000, time.Hour*1000, math.MaxUint32, channelCapacity, dropWhenFull)

	// Fills the single slot in the channel.
	kept := Distribution{
		Name: "metric-1",
		Tags: []string{"a", "b", "c"},
		Values: []MetricValue{
			{Timestamp: clock.now, Value: 1},
			{Timestamp: clock.now, Value: 2},
			{Timestamp: clock.now, Value: 3},
		},
	}
	// Added while the channel is still full, since processing has not started yet.
	overflow := Distribution{
		Name: "metric-1",
		Tags: []string{"a", "b", "c"},
		Values: []MetricValue{
			{Timestamp: clock.now, Value: 4},
			{Timestamp: clock.now, Value: 5},
			{Timestamp: clock.now, Value: 6},
		},
	}

	proc.AddMetric(&kept)
	proc.AddMetric(&overflow)

	proc.StartProcessing()
	proc.FinishProcessing()

	// Expected points: one [timestamp, [value]] entry per value of the first metric.
	points := make([]interface{}, 0, 3)
	for _, v := range []float64{1, 2, 3} {
		points = append(points, []interface{}{stamp, []interface{}{v}})
	}
	want := []APIMetric{{
		Name:       "metric-1",
		Tags:       []string{"a", "b", "c"},
		MetricType: DistributionType,
		Points:     points,
	}}

	got := <-client.batches

	assert.Equal(t, want, got)
}