Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

simplify batching on flush to cortex sink #1022

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
## Updated
* Use `T.TempDir` to create temporary directory in tests ([#944](https://github.com/stripe/veneur/pull/944)).
* When the request to send data from Cloudwatch & SFX sink fails, log the count of metrics that are dropped.
* Refactor Cortex sink to use a channel plus gofunc to batch work
asynchronously.

## Bugfixes
* A fix for forwarding metrics with gRPC using the kubernetes discoverer. Thanks, [androohan](https://github.com/androohan)!
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/golang/protobuf v1.4.2
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db
github.com/hashicorp/consul/api v1.1.0
github.com/hashicorp/go-multierror v1.1.1
github.com/kelseyhightower/envconfig v1.3.0
github.com/lightstep/lightstep-tracer-go v0.13.1-0.20170818234450-ea8cdd9df863
github.com/mitchellh/mapstructure v1.4.1
Expand Down Expand Up @@ -71,6 +72,7 @@ require (
github.com/google/gofuzz v1.1.0 // indirect
github.com/googleapis/gnostic v0.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.9.0 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.1 // indirect
github.com/hashicorp/go-immutable-radix v1.0.0 // indirect
github.com/hashicorp/go-msgpack v0.5.4 // indirect
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -412,8 +412,9 @@ github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjh
github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-msgpack v0.5.4 h1:SFT72YqIkOcLdWJUYcriVX7hbrZpwc/f7h8aW2NUqrA=
github.com/hashicorp/go-msgpack v0.5.4/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hashicorp/go-retryablehttp v0.6.4/go.mod h1:vAew36LZh98gCBJNLH42IQ1ER/9wtLZZ8meHqQvEYWY=
github.com/hashicorp/go-retryablehttp v0.6.6/go.mod h1:vAew36LZh98gCBJNLH42IQ1ER/9wtLZZ8meHqQvEYWY=
github.com/hashicorp/go-retryablehttp v0.6.7 h1:8/CAEZt/+F7kR7GevNHulKkUjLht3CPmn7egmhieNKo=
Expand Down
73 changes: 20 additions & 53 deletions sinks/cortex/cortex.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/golang/protobuf/proto"
"github.com/golang/snappy"
"github.com/hashicorp/go-multierror"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No concern from me, but if anyone has concern about increasing the dependency surface area alternatives are listed here: https://stackoverflow.com/questions/33470649/combine-multiple-error-strings

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh nice! golang/go#53435
1.20 (can't use yet)

"github.com/pkg/errors"
"github.com/prometheus/prometheus/prompb"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -207,66 +208,32 @@ func (s *CortexMetricSink) Flush(ctx context.Context, metrics []samplers.InterMe
return sinks.MetricFlushResult{}, nil
}

if s.batchWriteSize == 0 || len(metrics) <= s.batchWriteSize {
err := s.writeMetrics(ctx, metrics)
if err == nil {
flushedMetrics = len(metrics)
} else {
s.logger.Error(err)
droppedMetrics = len(metrics)
}

return sinks.MetricFlushResult{MetricsFlushed: flushedMetrics, MetricsDropped: droppedMetrics}, err
var allErrs *multierror.Error
batchSize := s.batchWriteSize
if s.batchWriteSize == 0 {
batchSize = len(metrics)
}

doIfNotDone := func(fn func() error) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👏 hooray for not needing a closure now!

batching:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm sure we can avoid using a label. Should a method be introduced?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or a done bool for the for loop

for i := 0; i < len(metrics); i += batchSize {
end := i + batchSize
if end > len(metrics) {
end = len(metrics)
}
Comment on lines +218 to +221
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
end := i + batchSize
if end > len(metrics) {
end = len(metrics)
}
end := math.Min(i + batchSize, len(metrics))

batch := metrics[i:end]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously, this made a new array&slice for batch.
Now it uses a slice from metrics.

I had to read https://go.dev/blog/slices-intro, but it looks like

  1. this will be more performant
  2. It's possible that changes (including append()) inside of anything that batch is passed to may write into the array metrics

I think this fixes it (alternatively we can read the code for where its passed, but then we have to hope it never gets changed)?

Suggested change
batch := metrics[i:end]
batch := metrics[i:end:end-i]

select {
case <-ctx.Done():
return errors.New("context finished before completing metrics flush")
droppedMetrics += len(metrics[i:])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably not important, but since we are looking at performance improvements anyway...
I think metrics[i:] allocates a single pointer. We could avoid the allocation by...

Suggested change
droppedMetrics += len(metrics[i:])
droppedMetrics += len(metrics)-i

break batching
Comment on lines +225 to +226
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previous behavior had this drop observable with s.logger.Error(err)

default:
return fn()
}
}

var batch []samplers.InterMetric
for _, metric := range metrics {
err := doIfNotDone(func() error {
batch = append(batch, metric)
if len(batch)%s.batchWriteSize == 0 {
err := s.writeMetrics(ctx, batch)
if err != nil {
return err
}

flushedMetrics += len(batch)
batch = []samplers.InterMetric{}
err := s.writeMetrics(ctx, batch)
if err != nil {
allErrs = multierror.Append(allErrs, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm reading this right...
Previously if we had a single failure, we stopped processing (it returned from the method). Now it will continue through the batch?

This sounds like a fix, but might cause unintended build up of failures on remote failures.

I think, if this was intended, a test should be added for it which failed before and passes now. That'll help show @arnavdugar-stripe the functional change as well for validation.

s.logger.Error(err)
droppedMetrics += len(batch)
}

return nil
})

if err != nil {
s.logger.Error(err)
droppedMetrics += len(metrics) - flushedMetrics
return sinks.MetricFlushResult{MetricsFlushed: flushedMetrics, MetricsDropped: droppedMetrics}, err
}
}

var err error
if len(batch) > 0 {
err = doIfNotDone(func() error {
return s.writeMetrics(ctx, batch)
})

if err == nil {
flushedMetrics += len(batch)
} else {
s.logger.Error(err)
droppedMetrics += len(batch)
}
}

return sinks.MetricFlushResult{MetricsFlushed: flushedMetrics, MetricsDropped: droppedMetrics}, err
return sinks.MetricFlushResult{MetricsFlushed: len(metrics) - droppedMetrics, MetricsDropped: droppedMetrics}, allErrs.ErrorOrNil()
}

func (s *CortexMetricSink) writeMetrics(ctx context.Context, metrics []samplers.InterMetric) error {
Expand Down
Loading