-
Notifications
You must be signed in to change notification settings - Fork 174
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
simplify batching on flush to cortex sink #1022
base: master
Are you sure you want to change the base?
Conversation
|
} | ||
|
||
doIfNotDone := func(fn func() error) error { | ||
batching: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm sure we can avoid using a label. Should a method be introduced?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or a done bool
for the for loop
end := i + batchSize | ||
if end > len(metrics) { | ||
end = len(metrics) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
end := i + batchSize | |
if end > len(metrics) { | |
end = len(metrics) | |
} | |
end := math.Min(i + batchSize, len(metrics)) |
droppedMetrics += len(metrics[i:]) | ||
break batching |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Previous behavior had this drop observable with s.logger.Error(err)
batch = []samplers.InterMetric{} | ||
err := s.writeMetrics(ctx, batch) | ||
if err != nil { | ||
allErrs = multierror.Append(allErrs, err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I'm reading this right...
Previously if we had a single failure, we stopped processing (it returned from the method). Now it will continue through the batch?
This sounds like a fix, but might cause unintended build up of failures on remote failures.
I think, if this was intended, a test should be added for it which failed before and passes now. That'll help show @arnavdugar-stripe the functional change as well for validation.
@@ -13,6 +13,7 @@ import ( | |||
|
|||
"github.com/golang/protobuf/proto" | |||
"github.com/golang/snappy" | |||
"github.com/hashicorp/go-multierror" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No concern from me, but if anyone has concern about increasing the dependency surface area alternatives are listed here: https://stackoverflow.com/questions/33470649/combine-multiple-error-strings
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh nice! golang/go#53435
1.20 (can't use yet)
if end > len(metrics) { | ||
end = len(metrics) | ||
} | ||
batch := metrics[i:end] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Previously, this made a new array&slice for batch.
Now it uses a slice from metrics.
I had to read https://go.dev/blog/slices-intro, but it looks like
- this will be more performant
- It's possible that changes (including append()) inside of anything that
batch
is passed to may write into the arraymetrics
I think this fixes it (alternatively we can read the code for where its passed, but then we have to hope it never gets changed)?
batch := metrics[i:end] | |
batch := metrics[i:end:end-i] |
select { | ||
case <-ctx.Done(): | ||
return errors.New("context finished before completing metrics flush") | ||
droppedMetrics += len(metrics[i:]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably not important, but since we are looking at performance improvements anyway...
I think metrics[i:]
allocates a single pointer. We could avoid the allocation by...
droppedMetrics += len(metrics[i:]) | |
droppedMetrics += len(metrics)-i |
} | ||
|
||
doIfNotDone := func(fn func() error) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👏 hooray for not needing a closure now!
Summary
This change uses a simple gofunc + channel to implement the batching logic. The break function requires a label, I did avoid using
goto
.Motivation
Simplifying the code to enable the use of channel with select without a function closure.
Test plan
Updated existing tests, there is no functional change. However, the test does require one update due to the async nature changing subtly.
Rollout/monitoring/revert plan
This change can be reverted and should not change the behavior after deploy.