Skip to content

Commit

Permalink
ARROW-11066: [Java][FlightRPC] fix zero-copy optimization
Browse files Browse the repository at this point in the history
It turns out the zero-copy optimization was never actually applied in Java. Fixing this brings us +50% throughput on localhost.

Before:

```
Transferred 100000000 records totaling 3200000000 bytes at 329.056671 MiB/s. 10782528.996111 record/s. 2633.309231 batch/s.
Transferred 100000000 records totaling 3200000000 bytes at 518.238607 MiB/s. 16981642.690398 record/s. 4147.256778 batch/s.
Transferred 100000000 records totaling 3200000000 bytes at 614.855615 MiB/s. 20147588.800460 record/s. 4920.444137 batch/s.
Transferred 100000000 records totaling 3200000000 bytes at 614.137783 MiB/s. 20124066.880701 record/s. 4914.699614 batch/s.
Transferred 100000000 records totaling 3200000000 bytes at 650.376169 MiB/s. 21311526.320277 record/s. 5204.700958 batch/s.
Transferred 100000000 records totaling 3200000000 bytes at 633.765603 MiB/s. 20767231.292374 record/s. 5071.773226 batch/s.
Transferred 100000000 records totaling 3200000000 bytes at 639.984121 MiB/s. 20970999.671369 record/s. 5121.537540 batch/s.
Transferred 100000000 records totaling 3200000000 bytes at 601.255186 MiB/s. 19701929.947094 record/s. 4811.605332 batch/s.
Transferred 100000000 records totaling 3200000000 bytes at 593.782434 MiB/s. 19457062.784742 record/s. 4751.803873 batch/s.
Transferred 100000000 records totaling 3200000000 bytes at 610.451501 MiB/s. 20003274.776120 record/s. 4885.199766 batch/s.
Summary:
Average throughput: 580.590369 MiB/s, standard deviation: 90.650159 MiB/s
```

After:
```
Transferred 100000000 records totaling 3200000000 bytes at 411.363707 MiB/s. 13479565.944378 record/s. 3291.979595 batch/s.
Transferred 100000000 records totaling 3200000000 bytes at 709.494366 MiB/s. 23248711.374843 record/s. 5677.800292 batch/s.
Transferred 100000000 records totaling 3200000000 bytes at 870.269771 MiB/s. 28516999.860372 record/s. 6964.421706 batch/s.
Transferred 100000000 records totaling 3200000000 bytes at 963.586464 MiB/s. 31574801.258002 record/s. 7711.197963 batch/s.
Transferred 100000000 records totaling 3200000000 bytes at 924.437899 MiB/s. 30291981.064273 record/s. 7397.907616 batch/s.
Transferred 100000000 records totaling 3200000000 bytes at 954.509764 MiB/s. 31277375.943584 record/s. 7638.560753 batch/s.
Transferred 100000000 records totaling 3200000000 bytes at 966.933269 MiB/s. 31684469.365217 record/s. 7737.981108 batch/s.
Transferred 100000000 records totaling 3200000000 bytes at 884.759214 MiB/s. 28991789.926919 record/s. 7080.374936 batch/s.
Transferred 100000000 records totaling 3200000000 bytes at 962.418507 MiB/s. 31536529.643890 record/s. 7701.851270 batch/s.
Transferred 100000000 records totaling 3200000000 bytes at 997.708658 MiB/s. 32692917.301419 record/s. 7984.264263 batch/s.
Summary:
Average throughput: 864.548162 MiB/s, standard deviation: 170.041331 MiB/s
```

I looked at adding a unit test, but we'd need reflection to construct the internal gRPC classes. IMO, we're better off establishing benchmarks to guard against a regression here.

Closes apache#9354 from lidavidm/arrow-11066

Authored-by: David Li <li.davidm96@gmail.com>
Signed-off-by: Micah Kornfield <emkornfield@gmail.com>
  • Loading branch information
lidavidm authored and emkornfield committed Jan 31, 2021
1 parent e0b3c9a commit c3e3073
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.arrow.flight.grpc;

import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
Expand Down Expand Up @@ -88,11 +87,8 @@ public class AddWritableBuffer {
* @param buf The buffer to add.
* @param stream The Candidate OutputStream to add to.
* @return True if added. False if not possible.
* @throws IOException on error
*/
public static boolean add(ByteBuf buf, OutputStream stream) throws IOException {
buf.readBytes(stream, buf.readableBytes());

public static boolean add(ByteBuf buf, OutputStream stream) {
if (bufChainOut == null) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void register(FlightProducer.ServerStreamListener listener) {

@Override
public WaitResult waitForListener(long timeout) {
while (!listener.isReady()) {
while (!listener.isReady() && !listener.isCancelled()) {
// busy wait
}
return WaitResult.READY;
Expand Down

0 comments on commit c3e3073

Please sign in to comment.