diff --git a/parquet-avro/src/test/java/parquet/avro/TestSpecificReadWrite.java b/parquet-avro/src/test/java/parquet/avro/TestSpecificReadWrite.java
index 48855a0323..fc0f80e2af 100644
--- a/parquet-avro/src/test/java/parquet/avro/TestSpecificReadWrite.java
+++ b/parquet-avro/src/test/java/parquet/avro/TestSpecificReadWrite.java
@@ -76,6 +76,17 @@ public void testFilterMatchesMultiple() throws IOException {
     assertNull(reader.read());
   }
 
+  @Test
+  public void testFilterMatchesMultipleBlocks() throws IOException {
+    Path path = writeCarsToParquetFile(10000, CompressionCodecName.UNCOMPRESSED, false, DEFAULT_BLOCK_SIZE/64, DEFAULT_PAGE_SIZE/64);
+    ParquetReader<Car> reader = new AvroParquetReader<Car>(path, column("make", equalTo("Volkswagen")));
+    for (int i = 0; i < 10000; i++) {
+      assertEquals(getVwPolo().toString(), reader.read().toString());
+      assertEquals(getVwPassat().toString(), reader.read().toString());
+    }
+    assertNull(reader.read());
+  }
+
   @Test
   public void testFilterWithDictionary() throws IOException {
     Path path = writeCarsToParquetFile(1,CompressionCodecName.UNCOMPRESSED,true);
@@ -159,6 +170,10 @@ public void testAvroReadSchema() throws IOException {
   }
 
   private Path writeCarsToParquetFile( int num, CompressionCodecName compression, boolean enableDictionary) throws IOException {
+    return writeCarsToParquetFile(num, compression, enableDictionary, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE);
+  }
+
+  private Path writeCarsToParquetFile( int num, CompressionCodecName compression, boolean enableDictionary, int blockSize, int pageSize) throws IOException {
     File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
     tmp.deleteOnExit();
     tmp.delete();
@@ -169,7 +184,7 @@ private Path writeCarsToParquetFile( int num, CompressionCodecName compression,
     Car bmwMini = getBmwMini();
 
     ParquetWriter<Car> writer = new AvroParquetWriter<Car>(path,Car.SCHEMA$, compression,
-        DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE, enableDictionary);
+        blockSize, pageSize, enableDictionary);
     for (int i = 0; i < num; i++) {
       writer.write(vwPolo);
       writer.write(vwPassat);
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java
index f1c952cfd7..8f7650b374 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java
@@ -52,7 +52,7 @@ class InternalParquetRecordReader<T> {
 
   private T currentValue;
   private long total;
-  private int current = 0;
+  private long current = 0;
   private int currentBlock = -1;
   private ParquetFileReader reader;
   private parquet.io.RecordReader<T> recordReader;
@@ -172,6 +172,10 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
       checkRead();
       currentValue = recordReader.read();
       if (DEBUG) LOG.debug("read value: " + currentValue);
+      if (currentValue == null) { // only happens with FilteredRecordReader at end of block
+        current = totalCountLoadedSoFar;
+        return nextKeyValue();
+      }
       current ++;
     } catch (RuntimeException e) {
       throw new ParquetDecodingException(format("Can not read value at %d in block %d in file %s", current, currentBlock, file), e);
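
For context, below is a minimal, self-contained simulation of the condition this patch fixes. It is illustrative only, not parquet-mr code: the class name FilteredBlockReadDemo and the in-memory BLOCKS list are invented, while the counters (current, totalCountLoadedSoFar, total) mirror the fields touched in InternalParquetRecordReader. With a record filter, current only advances for matching records, so it can sit below totalCountLoadedSoFar when a block runs out of matches; the old code treated the resulting null read as end-of-file, whereas the fix jumps current to the block boundary and retries, which loads the next block.

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class FilteredBlockReadDemo {

  // Each inner list stands for one row group (block) in a Parquet file.
  private static final List<List<String>> BLOCKS = Arrays.asList(
      Arrays.asList("Volkswagen", "BMW", "Volkswagen"),
      Arrays.asList("BMW", "Volkswagen"));

  public static void main(String[] args) {
    long current = 0;                  // position in the file, in records
    long totalCountLoadedSoFar = 0;    // records contained in blocks read so far
    long total = BLOCKS.stream().mapToLong(List::size).sum();
    int currentBlock = -1;
    Iterator<String> recordReader = null;

    while (current < total) {
      if (current >= totalCountLoadedSoFar) {  // plays the role of checkRead()
        currentBlock++;
        totalCountLoadedSoFar += BLOCKS.get(currentBlock).size();
        recordReader = BLOCKS.get(currentBlock).stream()
            .filter("Volkswagen"::equals)      // the record filter
            .iterator();
      }
      if (!recordReader.hasNext()) {
        // Analogous to FilteredRecordReader returning null: this block has no
        // more matches even though current < totalCountLoadedSoFar. The fix
        // skips current to the block boundary and loops, so the next
        // iteration loads the following block instead of reporting
        // end-of-file.
        current = totalCountLoadedSoFar;
        continue;
      }
      System.out.println(recordReader.next()); // prints three Volkswagens
      current++;
    }
  }
}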