-
Notifications
You must be signed in to change notification settings - Fork 5.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
New output buffer implementation #5002
Conversation
e175a9a
to
4a4e2bf
Compare
4a4e2bf
to
4fe40f0
Compare
Note: Github, in its infinite wisdom, has shuffled the commit order. |
4fe40f0
to
a4c1581
Compare
@haozhun can you review "Add new PartitionedBuffer" and "Add new BroadcastBuffer" |
|
||
public static OutputBuffers createInitialEmptyOutputBuffers() | ||
{ | ||
return new OutputBuffers(0, false, ImmutableMap.<TaskId, Integer>of()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ImmutableMap.of()
will just work
I renamed the classes to make it clear which ones are specific to the old SharedBuffer code and which ones are generic. I also named OutputBuffer implementations to end with "OutputBuffer" for clarity.
I reworked the URI and BufferId management logic in the query scheduler. The key insight was that I could force the partition number and the taskId to be the same for partitioned outputs, with some minor changes. With that, I didn't need most of my changes, so the old code is restored:
Finally, when working on the new BroadcastOutputBuffer, I discovered that I could reuse the internal PartitionedOutputBuffer.Partition object in BroadcastOutputBuffer by adding reference counting. I pulled out this class into a ClientBuffer and added tests for the client protocol. I'll squash the first two commits:
|
Note: Please squash "Extract PartitionedOutputBuffer.Partition into ClientBuffer". For these multi-threaded code, I find it easier to reason about correctness when I have the full picture. |
* Declare out buffers immediately for non-broadcast stages
* PartitionedOutputBufferManager.addOutputBuffers
* Validate that noMoreBuffer does not change from `true` to `false`, unless there's a valid use case to do that.
* Calling `withBuffer` a bunch of times will allocate a bunch of objects and create the `version` a lot. I guess these don't really matter. But I would create a map and then pass the map in just to be on the safe side. The The real issues is with the version number which the * Move shared buffer not-full notification to a new thread
* Question about `OutputBufferMemoryManager`:
* Is there any particular reason `updateMemoryUsage` submit the job outside the sync block and `setNoBlockOnFull` submit the job inside the sync block? No, I simplified the code. * Add new PartitionedOutputBuffer
* Why is calling `checkFlushComplete` necessary/useful in `setNoMorePages` or the constructor?
* Add reference counting to ClientBuffer
* In `destroy`, `pendingRead` should be completed with `emptyResults(taskInstanceId, sequenceId, true)` This doesn't matter. There is only one client and the destroy comes from the client. This has the benefit of keeping the API surface area smaller. |
I will merge once the current release goes out |
Rename PartitionBuffer to SharedOutputBufferPartition
Field was being read in synchronized and unsynchronized contexts.
LazyOutputBuffer delays buffer creation until query plan is recieved so we can select different buffer implementations. Add config option to enable new buffer implementation.
No description provided.