-
Notifications
You must be signed in to change notification settings - Fork 174
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
simplify batching on flush to cortex sink #1022
base: master
Are you sure you want to change the base?
Changes from all commits
2a7c4ac
3df06e1
f4c74c9
46e39dc
2911a1a
9d31191
0d76a62
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -13,6 +13,7 @@ import ( | |||||||||||
|
||||||||||||
"github.com/golang/protobuf/proto" | ||||||||||||
"github.com/golang/snappy" | ||||||||||||
"github.com/hashicorp/go-multierror" | ||||||||||||
"github.com/pkg/errors" | ||||||||||||
"github.com/prometheus/prometheus/prompb" | ||||||||||||
"github.com/sirupsen/logrus" | ||||||||||||
|
@@ -207,66 +208,32 @@ func (s *CortexMetricSink) Flush(ctx context.Context, metrics []samplers.InterMe | |||||||||||
return sinks.MetricFlushResult{}, nil | ||||||||||||
} | ||||||||||||
|
||||||||||||
if s.batchWriteSize == 0 || len(metrics) <= s.batchWriteSize { | ||||||||||||
err := s.writeMetrics(ctx, metrics) | ||||||||||||
if err == nil { | ||||||||||||
flushedMetrics = len(metrics) | ||||||||||||
} else { | ||||||||||||
s.logger.Error(err) | ||||||||||||
droppedMetrics = len(metrics) | ||||||||||||
} | ||||||||||||
|
||||||||||||
return sinks.MetricFlushResult{MetricsFlushed: flushedMetrics, MetricsDropped: droppedMetrics}, err | ||||||||||||
var allErrs *multierror.Error | ||||||||||||
batchSize := s.batchWriteSize | ||||||||||||
if s.batchWriteSize == 0 { | ||||||||||||
batchSize = len(metrics) | ||||||||||||
} | ||||||||||||
|
||||||||||||
doIfNotDone := func(fn func() error) error { | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👏 hooray for not needing a closure now! |
||||||||||||
batching: | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm sure we can avoid using a label. Should a method be introduced? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Or a |
||||||||||||
for i := 0; i < len(metrics); i += batchSize { | ||||||||||||
end := i + batchSize | ||||||||||||
if end > len(metrics) { | ||||||||||||
end = len(metrics) | ||||||||||||
} | ||||||||||||
Comment on lines
+218
to
+221
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||
batch := metrics[i:end] | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Previously, this made a new array&slice for batch. I had to read https://go.dev/blog/slices-intro, but it looks like
I think this fixes it (alternatively we can read the code for where its passed, but then we have to hope it never gets changed)?
Suggested change
|
||||||||||||
select { | ||||||||||||
case <-ctx.Done(): | ||||||||||||
return errors.New("context finished before completing metrics flush") | ||||||||||||
droppedMetrics += len(metrics[i:]) | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. probably not important, but since we are looking at performance improvements anyway...
Suggested change
|
||||||||||||
break batching | ||||||||||||
Comment on lines
+225
to
+226
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Previous behavior had this drop observable with |
||||||||||||
default: | ||||||||||||
return fn() | ||||||||||||
} | ||||||||||||
} | ||||||||||||
|
||||||||||||
var batch []samplers.InterMetric | ||||||||||||
for _, metric := range metrics { | ||||||||||||
err := doIfNotDone(func() error { | ||||||||||||
batch = append(batch, metric) | ||||||||||||
if len(batch)%s.batchWriteSize == 0 { | ||||||||||||
err := s.writeMetrics(ctx, batch) | ||||||||||||
if err != nil { | ||||||||||||
return err | ||||||||||||
} | ||||||||||||
|
||||||||||||
flushedMetrics += len(batch) | ||||||||||||
batch = []samplers.InterMetric{} | ||||||||||||
err := s.writeMetrics(ctx, batch) | ||||||||||||
if err != nil { | ||||||||||||
allErrs = multierror.Append(allErrs, err) | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If I'm reading this right... This sounds like a fix, but might cause unintended build up of failures on remote failures. I think, if this was intended, a test should be added for it which failed before and passes now. That'll help show @arnavdugar-stripe the functional change as well for validation. |
||||||||||||
s.logger.Error(err) | ||||||||||||
droppedMetrics += len(batch) | ||||||||||||
} | ||||||||||||
|
||||||||||||
return nil | ||||||||||||
}) | ||||||||||||
|
||||||||||||
if err != nil { | ||||||||||||
s.logger.Error(err) | ||||||||||||
droppedMetrics += len(metrics) - flushedMetrics | ||||||||||||
return sinks.MetricFlushResult{MetricsFlushed: flushedMetrics, MetricsDropped: droppedMetrics}, err | ||||||||||||
} | ||||||||||||
} | ||||||||||||
|
||||||||||||
var err error | ||||||||||||
if len(batch) > 0 { | ||||||||||||
err = doIfNotDone(func() error { | ||||||||||||
return s.writeMetrics(ctx, batch) | ||||||||||||
}) | ||||||||||||
|
||||||||||||
if err == nil { | ||||||||||||
flushedMetrics += len(batch) | ||||||||||||
} else { | ||||||||||||
s.logger.Error(err) | ||||||||||||
droppedMetrics += len(batch) | ||||||||||||
} | ||||||||||||
} | ||||||||||||
|
||||||||||||
return sinks.MetricFlushResult{MetricsFlushed: flushedMetrics, MetricsDropped: droppedMetrics}, err | ||||||||||||
return sinks.MetricFlushResult{MetricsFlushed: len(metrics) - droppedMetrics, MetricsDropped: droppedMetrics}, allErrs.ErrorOrNil() | ||||||||||||
} | ||||||||||||
|
||||||||||||
func (s *CortexMetricSink) writeMetrics(ctx context.Context, metrics []samplers.InterMetric) error { | ||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No concern from me, but if anyone has concern about increasing the dependency surface area alternatives are listed here: https://stackoverflow.com/questions/33470649/combine-multiple-error-strings
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh nice! golang/go#53435
1.20 (can't use yet)