-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor the producer, part 1 #549
Conversation
First stand-alone chunk extracted from #544. This introduces and uses a produceSet structure which takes care of collecting messages, rejecting ones which don't encode, and turning the rest into a `ProduceRequest`. This has several knock-on effects: - we no longer nil out array entries, so nil checks in several places (e.g. `returnErrors`) can be removed - parseResponse gets re-indented since it loops over the partitions in the set via a callback now rather than a pair of nested loops - groupAndFilter is much simpler, a lot of its logic now lives in the produceSet - the flusher has to use the set to return errors/successes/retries, rather than `batch` which may now contain messages not in the eventual request - keyCache and valueCache can be removed
for i := range msgs { | ||
msgs[i].Offset = block.Offset + int64(i) | ||
} | ||
f.parent.returnSuccesses(msgs) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can return a success immediately after setting the offset? That prevents a second iteration of the message collection. Might be a micro-optimization that is not worth it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ya, not right now anyways, we'd have to split returnSuccesses
out into a returnSuccess
method too
This is a much easier to swallow changeset. 👍 |
Second stand-alone chunk extracted from #544, (first chunk: #549). This uses the `produceSet` struct in the aggregator as well, and moves the `wouldOverflow` and `readyToFlush` methods to methods on the `produceSet`. Knock-on effects: - now that we do per-partition size tracking in the aggregator we can do much more precise overflow checking (see the compressed-message-batch-size-limit case in `wouldOverflow` which has changed) which will be more efficient in high-volume scenarios - since the produceSet encodes immediately, messages which fail to encode are now rejected from the aggregator and don't count towards batch size - we still have to iterate the messages in the flusher in order to reject those which need retrying due to the state machine; for simplicity I add them to a second produceSet still, which means all messages get encoded twice; this is a definite major performance regression which will go away again in part 3 of this refactor
Second stand-alone chunk extracted from #544, (first chunk: #549). This uses the `produceSet` struct in the aggregator as well, and moves the `wouldOverflow` and `readyToFlush` methods to methods on the `produceSet`. Knock-on effects: - now that we do per-partition size tracking in the aggregator we can do much more precise overflow checking (see the compressed-message-batch-size-limit case in `wouldOverflow` which has changed) which will be more efficient in high-volume scenarios - since the produceSet encodes immediately, messages which fail to encode are now rejected from the aggregator and don't count towards batch size - we still have to iterate the messages in the flusher in order to reject those which need retrying due to the state machine; for simplicity I add them to a second produceSet still, which means all messages get encoded twice; this is a definite major performance regression which will go away again in part 3 of this refactor
First stand-alone chunk extracted from #544. This introduces and uses a
produceSet structure which takes care of collecting messages, rejecting ones
which don't encode, and turning the rest into a
ProduceRequest
.This has several knock-on effects:
returnErrors
) can be removedvia a callback now rather than a pair of nested loops
batch
which may now contain messages not in the eventual request@wvanbergen cc @kvs