Producer refactor #544
Conversation
Useless abstraction right this instant, but will be very useful soon.
Add an eachPartition to the produceSet
Now that nobody uses `batch` after the set is constructed, we no longer have to nil out its entries when we return errors.
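A minimal sketch of what such an `eachPartition` helper might look like, assuming a simplified `produceSet` that buckets messages by topic and partition (the types and names here are stand-ins for illustration, not Sarama's actual ones):

```go
package producer

// ProducerMessage is a stand-in for sarama.ProducerMessage.
type ProducerMessage struct {
	Topic     string
	Partition int32
	Value     []byte
}

// produceSet buckets buffered messages by topic and then partition.
type produceSet struct {
	msgs map[string]map[int32][]*ProducerMessage
}

// eachPartition invokes cb once per (topic, partition) bucket, so callers can
// return errors/successes/retries per partition without touching a raw
// `batch` slice.
func (ps *produceSet) eachPartition(cb func(topic string, partition int32, msgs []*ProducerMessage)) {
	for topic, partitions := range ps.msgs {
		for partition, msgs := range partitions {
			cb(topic, partition, msgs)
		}
	}
}
```

A caller like `returnErrors` can then be a single callback over the set instead of a pair of nested loops with nil checks.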
Temporary breakage ensues.
Get rid of keyCache and valueCache
This simplifies a lot of things because we can filter out messages before we batch them (except when they fail to encode).
Clean up logging
This moves key and value encoding earlier in the process without the need for the old keyCache/valueCache.
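A rough sketch of the "encode earlier" idea, using illustrative types rather than the real Sarama code: the key and value are encoded once, when the message is buffered, and the encoded bytes travel with the buffered entry, so the parallel keyCache/valueCache slices are unnecessary and messages that fail to encode never enter the batch.

```go
package producer

// Encoder mirrors the shape of sarama.Encoder for this sketch.
type Encoder interface {
	Encode() ([]byte, error)
}

type ProducerMessage struct {
	Topic      string
	Partition  int32
	Key, Value Encoder
}

// bufferedMessage carries the encoded bytes alongside the original message,
// replacing the old parallel keyCache/valueCache slices.
type bufferedMessage struct {
	msg        *ProducerMessage
	key, value []byte
}

// encodeForBuffer encodes key and value up front; a failure here means the
// message is rejected immediately instead of being nil-ed out later.
func encodeForBuffer(msg *ProducerMessage) (*bufferedMessage, error) {
	buf := &bufferedMessage{msg: msg}
	var err error
	if msg.Key != nil {
		if buf.key, err = msg.Key.Encode(); err != nil {
			return nil, err
		}
	}
	if msg.Value != nil {
		if buf.value, err = msg.Value.Encode(); err != nil {
			return nil, err
		}
	}
	return buf, nil
}
```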
State management is now sane; we no longer have to flush just because we see a chaser message.
We now only build the request when we have messages, which means the request can never be nil, and so the input into the errors channel can never be nil either.
That was a bad pattern: subtle bugs could easily have let it get out of sync with what was actually sent to the flusher. Instead, have the flusher pass back the set it sent alongside the response. This forces us to use a struct, which lets us combine the `responses` and `errors` channels too.
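A hedged sketch of that pattern, with illustrative names (not necessarily the exact ones used in the PR): the flusher sends back a single struct pairing the broker's response or error with the exact set that was flushed, replacing the separate `responses` and `errors` channels.

```go
package producer

// ProduceResponse and produceSet are stand-ins for the real types.
type ProduceResponse struct{ /* broker response fields elided */ }
type produceSet struct{ /* buffered messages elided */ }

// brokerProducerResponse is what the flusher goroutine hands back: because it
// carries the set that was actually sent, the sender can never get out of
// sync with what went over the wire.
type brokerProducerResponse struct {
	set *produceSet
	err error            // transport-level failure, if any
	res *ProduceResponse // broker response, if the request succeeded
}
```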
The problem is that receiving a response can change the retry state of a partition (or the entire broker). This potentially requires us to retry any messages we'd had buffered for that partition, which changes the size of the produceSet we'd accumulated, potentially even making it empty again. Things are complicated further by the fact that we must be prepared to handle responses while forcing blocking flushes, midway through the message processing. This does a quick fix, but I'm not really happy with it:
- Flush the buffer in the appropriate response/error cases.
- Move the blocking flush to the beginning of the message processing so any state changes it causes are picked up correctly for that message.
- When we handle a response which leaves our buffer empty, don't try to send it.

This isn't perfect in that a response which shrinks our buffer below the flush threshold may still end up getting flushed, but at least we won't flush a buffer that's entirely empty.
Added the requested comment, cleaned up the (mis)-use of the
- Turn the "if wouldOverflow then flush" logic into a more complete `waitForSpace` method which handles all of that logic, including re-checking error states and buffer sizes as necessary.
- Extract the "when should we be setting `output`" logic into the end of every single iteration. This requires tracking an additional `timerFired` instance variable on the `brokerProducer`, but is much easier to follow and allows us to be more precise when e.g. a response flushes part but not all of our buffer.
- Extract the "if this is a chaser, change state" logic out of `retryReason` and rename it to `needsRetry`. This makes it a pure function and avoids unnecessary work in `waitForSpace`.
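An illustrative sketch of the `output` gating described above, assuming a simplified `brokerProducer` (the real code differs in detail): at the end of each loop iteration, `output` is set to the real channel only when the buffer is non-empty and either the timer has fired or the flush conditions are met; a nil channel disables that case in the `select`.

```go
package producer

import "time"

// produceSet is a stand-in tracking only what this sketch needs.
type produceSet struct{ bufferCount int }

func (ps *produceSet) empty() bool { return ps.bufferCount == 0 }

// readyToFlush is illustrative; the real conditions also consider bytes,
// retry state, and configuration.
func (ps *produceSet) readyToFlush() bool { return ps.bufferCount >= 100 }

type brokerProducer struct {
	buffer     *produceSet
	output     chan<- *produceSet // nil disables the send case in the select
	realOutput chan<- *produceSet
	timer      <-chan time.Time
	timerFired bool
}

// updateOutput runs at the end of every iteration of the main loop: it is the
// single place that decides whether we should currently be trying to flush.
func (bp *brokerProducer) updateOutput() {
	if bp.buffer.empty() {
		bp.output = nil // never flush an empty buffer
		return
	}
	if bp.timerFired || bp.buffer.readyToFlush() {
		bp.output = bp.realOutput
	} else {
		bp.output = nil
	}
}
```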
Force-pushed from 1480a1f to 5fc13db.
OK, another refactor, and I think I'm happy with this again. This most recent commit ended up solving another problem with the version currently in master: if a response is received which invalidates some of the messages we have buffered, to the point where we no longer meet our flush conditions, we now stop trying to flush until the conditions are re-met.
Worth noting: this morning's rework has partially lost us benefit point 3 (retry state being handled message-at-a-time), since we have to handle responses arriving when a batch is already collected. I think the only way around that would be to not collect batches while a response is pending, which is of course unacceptably slow in the common success case.
I am going to extract this into several stand-alone chunks which should be easier to reason about and review.
First stand-alone chunk extracted from #544. This introduces and uses a `produceSet` structure which takes care of collecting messages, rejecting ones which don't encode, and turning the rest into a `ProduceRequest`. This has several knock-on effects:
- we no longer nil out array entries, so nil checks in several places (e.g. `returnErrors`) can be removed
- `parseResponse` gets re-indented since it loops over the partitions in the set via a callback now rather than a pair of nested loops
- `groupAndFilter` is much simpler, a lot of its logic now lives in the `produceSet`
- the flusher has to use the set to return errors/successes/retries, rather than `batch`, which may now contain messages not in the eventual request
- `keyCache` and `valueCache` can be removed
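A rough illustration of the "loop via a callback" point above, with simplified stand-in types (not the real Sarama signatures): response handling walks the set once through an `eachPartition`-style helper and looks up the matching response block, instead of nesting loops over the request's topics and partitions.

```go
package producer

// Simplified stand-ins for the real sarama types.
type ProducerMessage struct {
	Topic     string
	Partition int32
}

type ProduceResponseBlock struct {
	Err    error
	Offset int64
}

type ProduceResponse struct {
	Blocks map[string]map[int32]*ProduceResponseBlock
}

type produceSet struct {
	msgs map[string]map[int32][]*ProducerMessage
}

func (ps *produceSet) eachPartition(cb func(topic string, partition int32, msgs []*ProducerMessage)) {
	for topic, partitions := range ps.msgs {
		for partition, msgs := range partitions {
			cb(topic, partition, msgs)
		}
	}
}

// handleResponse shows the shape of callback-based response parsing: one pass
// over the set that was sent, matching each bucket against its response block.
func handleResponse(sent *produceSet, response *ProduceResponse,
	onSuccess, onRetry func(msgs []*ProducerMessage)) {

	sent.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
		block := response.Blocks[topic][partition]
		switch {
		case block == nil:
			onRetry(msgs) // no block for a partition we sent: treat as retriable
		case block.Err != nil:
			onRetry(msgs)
		default:
			onSuccess(msgs)
		}
	})
}
```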
Second stand-alone chunk extracted from #544 (first chunk: #549). This uses the `produceSet` struct in the aggregator as well, and moves the `wouldOverflow` and `readyToFlush` methods to methods on the `produceSet`. Knock-on effects:
- now that we do per-partition size tracking in the aggregator we can do much more precise overflow checking (see the compressed-message-batch-size-limit case in `wouldOverflow`, which has changed), which will be more efficient in high-volume scenarios
- since the `produceSet` encodes immediately, messages which fail to encode are now rejected from the aggregator and don't count towards batch size
- we still have to iterate the messages in the flusher in order to reject those which need retrying due to the state machine; for simplicity I add them to a second `produceSet` still, which means all messages get encoded twice; this is a definite major performance regression which will go away again in part 3 of this refactor
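A hedged sketch of what per-partition overflow checking can look like once the sizes live on the `produceSet`; the limits and field names here are illustrative, not Sarama's actual configuration:

```go
package producer

// Illustrative limits; the real values come from the producer configuration.
const (
	maxRequestBytes     = 100 * 1024 * 1024 // overall request limit
	maxCompressedBatch  = 1024 * 1024       // per-partition limit when compressing
	maxBufferedMessages = 10000
)

type ProducerMessage struct {
	Topic     string
	Partition int32
}

type produceSet struct {
	bufferBytes int
	bufferCount int
	// perPartitionBytes tracks encoded bytes per (topic, partition), which is
	// what makes the compressed-batch check precise rather than a guess.
	perPartitionBytes map[string]map[int32]int
	compressing       bool
}

// wouldOverflow reports whether adding msg (with the given encoded size)
// would push the set past one of its limits.
func (ps *produceSet) wouldOverflow(msg *ProducerMessage, msgSize int) bool {
	switch {
	case ps.bufferBytes+msgSize >= maxRequestBytes:
		return true
	case ps.bufferCount+1 > maxBufferedMessages:
		return true
	case ps.compressing &&
		ps.perPartitionBytes[msg.Topic][msg.Partition]+msgSize >= maxCompressedBatch:
		// when compressing, each partition's batch becomes a single message,
		// so it is the per-partition total that must stay under the limit
		return true
	default:
		return false
	}
}
```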
This is an experimental refactor of the entire aggregator/flusher pipeline. If successful it would close #433 as well as properly solve the issue papered over by #538.
I did a decent job of making each commit stand-alone and self-explanatory, so you can read commit-at-a-time, or just treat it as entirely new code. The total diff against the old code is impossible to follow.
High-level, this is what I did:
1. Introduce a `produceSet` structure which collects messages, rejecting those which fail to encode and incrementally constructing a `ProduceRequest` out of those which succeed.
2. Use the `produceSet` in the aggregator instead of just a `[]*ProducerMessage`. Move all the "how many bytes/messages in this batch so far?" logic into the `produceSet`.
3. Merge the aggregator and the flusher into a single `brokerProducer`. Revive the flusher as a trivial goroutine whose sole purpose is to let the aggregator `select` on the result of its network calls (a rough sketch of this loop appears at the end of this description).

This new layout has a number of benefits:

1. Messages are owned by exactly one `produceSet`, which makes them much easier to track vs. having them spread across a slice, a map, and a long-lived request object.
2. Messages only enter a `produceSet` when they encode properly, which means that a whole raft of logic can go away around handling requests which encoded successfully but contained no successful messages. This also means that we no longer need to nil out array entries for failed messages, getting rid of another bunch of ugly logic.
3. Retry state is handled message-at-a-time as messages enter the `brokerProducer`, rather than batch-at-a-time as they enter the `flusher`. This is much easier to follow, cleans up a few hacks that were necessary for the aggregator to correctly pass along flagged messages, and makes for a sane state machine that clearly terminates in all cases.
4. The `produceSet` can track how many bytes it has accumulated in each individual partition. It also has access to the actual encoded length, not just the estimate provided by the `Encoder` interface. This makes it more precise, and lets it accumulate much larger batches safely. In particular, this lets it accumulate requests larger than `MaxMessageSize` (usually 1MB) when using compression.

@wvanbergen
cc @kvs @andremedeiros @bai @cep21
P.S. I don't want to make any claims about performance without measurements to back them up, but in theory this is a performance win too: we do fewer passes over each message, we generate less garbage for the collector, and we do more work without needing to context-switch.
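To make the merged aggregator/flusher layout concrete, here is a rough sketch of the shape such a run loop could take, using simplified stand-in types (an approximation for illustration, not the code in this PR): the `brokerProducer` selects over incoming messages, flush results, and the flush timer, while the revived flusher is just a goroutine that performs the blocking network call and reports back.

```go
package producer

import "time"

// Stand-ins for the real types used elsewhere in this description.
type ProducerMessage struct{}
type ProduceResponse struct{}
type produceSet struct{ bufferCount int }

func newProduceSet() *produceSet            { return &produceSet{} }
func (ps *produceSet) add(*ProducerMessage) { ps.bufferCount++ }
func (ps *produceSet) empty() bool          { return ps.bufferCount == 0 }

// brokerProducerResponse pairs a flushed set with its result (see earlier sketch).
type brokerProducerResponse struct {
	set *produceSet
	res *ProduceResponse
	err error
}

// sendToBroker is a placeholder for the blocking network call.
func sendToBroker(set *produceSet) (*ProduceResponse, error) { return &ProduceResponse{}, nil }

type brokerProducer struct {
	input     <-chan *ProducerMessage
	flush     chan *produceSet // consumed by the flusher goroutine
	responses chan *brokerProducerResponse
	buffer    *produceSet
}

// run is the single state machine that replaces the separate aggregator and
// flusher loops; the flusher survives only as the goroutine started below.
func (bp *brokerProducer) run() {
	go func() { // trivial flusher: do the blocking call, report back
		for set := range bp.flush {
			res, err := sendToBroker(set)
			bp.responses <- &brokerProducerResponse{set: set, res: res, err: err}
		}
	}()

	timer := time.NewTicker(500 * time.Millisecond) // flush frequency is illustrative
	defer timer.Stop()
	timerFired := false

	for {
		// output stays nil (and thus never selected) unless we should flush.
		var output chan *produceSet
		if timerFired && !bp.buffer.empty() {
			output = bp.flush
		}

		select {
		case msg, ok := <-bp.input:
			if !ok {
				close(bp.flush)
				return
			}
			bp.buffer.add(msg) // retry state would be checked per-message here
		case response := <-bp.responses:
			_ = response // successes/errors/retries handled via response.set
		case <-timer.C:
			timerFired = true
		case output <- bp.buffer:
			bp.buffer = newProduceSet() // start accumulating the next batch
			timerFired = false
		}
	}
}
```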