(batchprocessor): block requests until response is received #71
Conversation
} else { // waiter gets a complete response.
	numItemsBefore += b.pending[0].numItems
	if trigger == triggerTimeout {
		err = multierr.Append(err, errTimedOut)
Hmm, this looks like we're making up an error, let's discuss. Is this necessary?
Oh, maybe I'm confused: should the waiter be notified if their request times out? I thought the answer was yes, which is why I'm letting the completeError indicate the timeout. Do you think the timeout should simply be logged instead?
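A minimal sketch of the behavior under discussion, with hypothetical `waiter` and `complete` names (the processor's actual internals differ): when a flush is driven by the timeout trigger, the timeout error is delivered to the blocked producer rather than only being logged.

```go
package main

import (
	"errors"
	"fmt"
)

// errTimedOut mirrors the sentinel error from the diff above.
var errTimedOut = errors.New("processing items timed out")

// waiter is a hypothetical producer blocked on a per-request response channel.
type waiter struct {
	respCh chan error
}

// complete notifies one waiter. If the flush was driven by the timeout
// trigger, the timeout error is attached so the producer can observe it.
func complete(w waiter, timedOut bool) {
	var err error
	if timedOut {
		err = errTimedOut
	}
	w.respCh <- err
}

func main() {
	w := waiter{respCh: make(chan error, 1)}
	complete(w, true)
	fmt.Println(<-w.respCh) // prints: processing items timed out
}
```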
numItemsBefore := b.totalSent
numItemsAfter := b.totalSent + sent
// The current batch can contain items from several different producers. Ensure each producer gets a response back.
Note: in a follow-on PR I'd like the export to happen in the background. That will mean running through this loop before calling export, so we know the set of complete/pending responses that will be returned when the response comes back.
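That follow-on shape could look roughly like the sketch below (names and structure are hypothetical, not from this PR): capture the waiter set for the current batch first, then run export in a goroutine and fan the single result back out to each producer.

```go
package main

import "fmt"

// fanOut delivers one export result to every waiter of a batch.
func fanOut(waiters []chan error, err error) {
	for _, w := range waiters {
		w <- err
	}
}

func main() {
	// Two hypothetical producers, each blocked on its own response channel.
	waiters := []chan error{make(chan error, 1), make(chan error, 1)}

	// Export runs in the background; the waiter set was captured beforehand.
	go func() {
		var exportErr error // stand-in for the real export call
		fanOut(waiters, exportErr)
	}()

	for i, w := range waiters {
		fmt.Printf("producer %d got: %v\n", i, <-w)
	}
}
```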
var (
	// errTooManyBatchers is returned when the MetadataCardinalityLimit has been reached.
	errTooManyBatchers = consumererror.NewPermanent(errors.New("too many batcher metadata-value combinations"))
	errTimedOut = errors.New("processing items timed out")
Note: the place below where you used this variable looks to me like a non-error condition. Maybe we can remove it; let's discuss.
respCh := make(chan error, 1)
// TODO: add a semaphore to only write to channel if sizeof(data) keeps
// us below some configured inflight byte limit.
sb.batcher.newItem <- dataItem{
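One stdlib-only way to sketch that TODO is a condition-variable limiter on in-flight bytes. All names here (`inflightLimiter`, `acquire`, `release`) are illustrative; `golang.org/x/sync/semaphore.Weighted` would be the context-aware alternative.

```go
package main

import (
	"fmt"
	"sync"
)

// inflightLimiter caps the total bytes in flight; acquire blocks until
// capacity is available. Illustrative only: a real implementation would
// also honor context cancellation (see golang.org/x/sync/semaphore).
type inflightLimiter struct {
	mu    sync.Mutex
	cond  *sync.Cond
	used  int64
	limit int64
}

func newInflightLimiter(limit int64) *inflightLimiter {
	l := &inflightLimiter{limit: limit}
	l.cond = sync.NewCond(&l.mu)
	return l
}

// acquire blocks until n bytes of capacity are free.
func (l *inflightLimiter) acquire(n int64) {
	l.mu.Lock()
	defer l.mu.Unlock()
	for l.used+n > l.limit {
		l.cond.Wait()
	}
	l.used += n
}

// release frees n bytes and wakes any blocked producers.
func (l *inflightLimiter) release(n int64) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.used -= n
	l.cond.Broadcast()
}

func main() {
	l := newInflightLimiter(1 << 20) // at most 1 MiB in flight
	l.acquire(512)
	fmt.Println("in flight:", l.used)
	l.release(512)
}
```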
Note: not for now, but to consider: if the other side of this channel is being slow, this will block until after the context times out. This could instead be more defensive, like
select {
case sb.batcher.newItem <- dataItem{...}:
// ok
case <-ctx.Done():
return ctx.Err()
}
Thanks for pointing this out! I went ahead and added the select statement you suggested.
Thanks! Nice work. The test changes look tedious, thanks for doing this.
Part 2 for #80
This PR adds the batchprocessor to otel-arrow with a couple enhancements:
The main files to review are batch_processor.go and batch_processor_test.go.