@@ -36,6 +36,7 @@ final class GzipReadableByteChannel implements UnbufferedReadableByteChannel {
36
36
37
37
private boolean retEOF = false ;
38
38
private ScatteringByteChannel delegate ;
39
+ private ByteBuffer leftovers ;
39
40
40
41
GzipReadableByteChannel (UnbufferedReadableByteChannel source , ApiFuture <String > contentEncoding ) {
41
42
this .source = source ;
@@ -51,11 +52,11 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
51
52
// if our delegate is null, that means this is the first read attempt
52
53
if (delegate == null ) {
53
54
// try to determine if the underlying data coming out of `source` is gzip
54
- byte [] first4 = new byte [4 ]; // 4 bytes = 32-bits
55
- final ByteBuffer wrap = ByteBuffer .wrap (first4 );
56
- // Step 1: initiate a read of the first 4 bytes of the object
55
+ byte [] firstByte = new byte [1 ];
56
+ ByteBuffer wrap = ByteBuffer .wrap (firstByte );
57
+ // Step 1: initiate a read of the first byte of the object
57
58
// this will have minimal overhead as the messages coming from gcs are inherently windowed
58
- // if the object size is between 5 and 2MiB the remaining bytes will be held in the channel
59
+ // if the object size is between 2 bytes and 2MiB the remaining bytes will be held in the channel
59
60
// for later read.
60
61
source .read (wrap );
61
62
try {
@@ -65,13 +66,13 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
65
66
// this will have a copy impact as we are no longer controlling all the buffers
66
67
if ("gzip" .equals (contentEncoding ) || "x-gzip" .equals (contentEncoding )) {
67
68
// to wire gzip decompression into the byte path:
68
- // Create an input stream of the first4 bytes we already read
69
- ByteArrayInputStream first4again = new ByteArrayInputStream (first4 );
69
+ // Create an input stream of the first byte we already read
70
+ ByteArrayInputStream firstByteAgain = new ByteArrayInputStream (firstByte );
70
71
// Create an InputStream facade of source
71
72
InputStream sourceInputStream = Channels .newInputStream (source );
72
- // create a new InputStream with the first4 bytes prepended to source
73
+ // create a new InputStream with the first byte prepended to source
73
74
SequenceInputStream first4AndSource =
74
- new SequenceInputStream (first4again , sourceInputStream );
75
+ new SequenceInputStream (firstByteAgain , sourceInputStream );
75
76
// add gzip decompression
76
77
GZIPInputStream decompress =
77
78
new GZIPInputStream (new OptimisticAvailabilityInputStream (first4AndSource ));
@@ -84,14 +85,22 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
84
85
// to source
85
86
wrap .flip ();
86
87
bytesRead += Buffers .copy (wrap , dsts , offset , length );
88
+ if (wrap .hasRemaining ()) {
89
+ leftovers = wrap ;
90
+ }
87
91
delegate = source ;
88
92
}
89
93
} catch (InterruptedException | ExecutionException e ) {
90
94
throw new IOException (e );
91
95
}
96
+ } else if (leftovers != null && leftovers .hasRemaining ()) {
97
+ bytesRead += Buffers .copy (leftovers , dsts , offset , length );
98
+ if (!leftovers .hasRemaining ()) {
99
+ leftovers = null ;
100
+ }
92
101
}
93
102
94
- // Because we're pre-reading a few bytes of the object in order to determine if we need to
103
+ // Because we're pre-reading a byte of the object in order to determine if we need to
95
104
// plumb in gzip decompress, there is the possibility we will reach EOF while probing.
96
105
// In order to maintain correctness of EOF propagation, determine if we will need to signal EOF
97
106
// upon the next read.
0 commit comments