Skip to content

Commit 4abba3e

Browse files
committed
PARQUET-787: Update byte buffer input streams for review comments.
1 parent e7c6c5d commit 4abba3e

File tree

2 files changed

+19
-11
lines changed

2 files changed

+19
-11
lines changed

parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import java.io.IOException;
2424
import java.nio.ByteBuffer;
2525
import java.util.ArrayList;
26-
import java.util.Collection;
2726
import java.util.Collections;
2827
import java.util.Iterator;
2928
import java.util.List;
@@ -32,7 +31,7 @@
3231
class MultiBufferInputStream extends ByteBufferInputStream {
3332
private static final ByteBuffer EMPTY = ByteBuffer.allocate(0);
3433

35-
private final Collection<ByteBuffer> buffers;
34+
private final List<ByteBuffer> buffers;
3635
private final long length;
3736

3837
private Iterator<ByteBuffer> iterator;
@@ -43,7 +42,7 @@ class MultiBufferInputStream extends ByteBufferInputStream {
4342
private long markLimit = 0;
4443
private List<ByteBuffer> markBuffers = new ArrayList<>();
4544

46-
MultiBufferInputStream(Collection<ByteBuffer> buffers) {
45+
MultiBufferInputStream(List<ByteBuffer> buffers) {
4746
this.buffers = buffers;
4847

4948
long totalLen = 0;
@@ -171,10 +170,11 @@ public List<ByteBuffer> sliceBuffers(long len) throws EOFException {
171170
}
172171

173172
List<ByteBuffer> buffers = new ArrayList<>();
174-
int bytesAccumulated = 0;
173+
long bytesAccumulated = 0;
175174
while (bytesAccumulated < len) {
176175
if (current.remaining() > 0) {
177176
// get a slice of the current buffer to return
177+
// always fits in an int because remaining returns an int that is >= 0
178178
int bufLen = (int) Math.min(len - bytesAccumulated, current.remaining());
179179
ByteBuffer slice = current.duplicate();
180180
slice.limit(slice.position() + bufLen);
@@ -211,6 +211,9 @@ public List<ByteBuffer> remainingBuffers() {
211211
@Override
212212
public int read(byte[] bytes, int off, int len) {
213213
if (len <= 0) {
214+
if (len < 0) {
215+
throw new IndexOutOfBoundsException("Read length must be greater than 0: " + len);
216+
}
214217
return 0;
215218
}
216219

@@ -267,7 +270,7 @@ public int available() {
267270
}
268271

269272
@Override
270-
public synchronized void mark(int readlimit) {
273+
public void mark(int readlimit) {
271274
if (mark >= 0) {
272275
discardMark();
273276
}
@@ -279,7 +282,7 @@ public synchronized void mark(int readlimit) {
279282
}
280283

281284
@Override
282-
public synchronized void reset() throws IOException {
285+
public void reset() throws IOException {
283286
if (mark >= 0 && position < markLimit) {
284287
this.position = mark;
285288
// replace the current iterator with one that adds back the buffers that
@@ -288,11 +291,11 @@ public synchronized void reset() throws IOException {
288291
discardMark();
289292
nextBuffer(); // go back to the marked buffers
290293
} else {
291-
throw new IOException("No mark defined");
294+
throw new IOException("No mark defined or has read past the previous mark limit");
292295
}
293296
}
294297

295-
private synchronized void discardMark() {
298+
private void discardMark() {
296299
this.mark = -1;
297300
this.markLimit = 0;
298301
markBuffers = new ArrayList<>();
@@ -353,13 +356,18 @@ public boolean hasNext() {
353356

354357
@Override
355358
public E next() {
356-
if (!hasNext()) {
359+
if (useFirst && !first.hasNext()) {
360+
useFirst = false;
361+
}
362+
363+
if (!useFirst && !second.hasNext()) {
357364
throw new NoSuchElementException();
358365
}
359366

360367
if (useFirst) {
361368
return first.next();
362369
}
370+
363371
return second.next();
364372
}
365373

parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,12 +151,12 @@ public List<ByteBuffer> remainingBuffers() {
151151
}
152152

153153
@Override
154-
public synchronized void mark(int readlimit) {
154+
public void mark(int readlimit) {
155155
this.mark = buffer.position();
156156
}
157157

158158
@Override
159-
public synchronized void reset() throws IOException {
159+
public void reset() throws IOException {
160160
if (mark >= 0) {
161161
buffer.position(mark);
162162
this.mark = -1;

0 commit comments

Comments
 (0)