diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java index e52e2e49a56..841ad7ca306 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java @@ -33,6 +33,7 @@ import java.util.TreeMap; import java.util.concurrent.TimeUnit; +import com.fasterxml.jackson.annotation.JsonCreator; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; @@ -52,8 +53,6 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.HTable; -import parquet.org.codehaus.jackson.annotate.JsonCreator; - import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java index 96ae257001e..08ea7f12ede 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java @@ -122,7 +122,7 @@ public static class HBaseSubScanSpec { protected byte[] stopRow; protected byte[] serializedFilter; - @parquet.org.codehaus.jackson.annotate.JsonCreator + @JsonCreator public HBaseSubScanSpec(@JsonProperty("tableName") String tableName, @JsonProperty("regionServer") String regionServer, @JsonProperty("startRow") byte[] startRow, diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java index 954fc4680a3..1e6ad99a985 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java @@ -37,7 +37,7 @@ import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.store.AbstractRecordReader; import org.apache.drill.exec.store.RecordReader; -import org.apache.drill.exec.store.parquet.DirectCodecFactory; +import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator; import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader; import org.apache.drill.exec.util.ImpersonationUtil; import org.apache.hadoop.conf.Configuration; @@ -50,9 +50,10 @@ import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; -import parquet.hadoop.ParquetFileReader; -import parquet.hadoop.metadata.BlockMetaData; -import parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.hadoop.CodecFactory; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; @SuppressWarnings("unused") public class HiveDrillNativeScanBatchCreator implements BatchCreator { @@ -125,7 +126,8 @@ public ScanBatch getBatch(FragmentContext context, HiveDrillNativeParquetSubScan context, 
Path.getPathWithoutSchemeAndAuthority(finalPath).toString(), rowGroupNum, fs, - new DirectCodecFactory(fs.getConf(), oContext.getAllocator()), + CodecFactory.createDirectCodecFactory(fs.getConf(), + new ParquetDirectByteBufferAllocator(oContext.getAllocator()), 0), parquetMetadata, newColumns) ); diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java index 5c31baf1a32..92a2bc74a5f 100644 --- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java @@ -31,6 +31,11 @@ import java.util.Set; import java.util.concurrent.TimeUnit; +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; import org.apache.commons.lang3.StringUtils; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; @@ -53,12 +58,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import parquet.org.codehaus.jackson.annotate.JsonCreator; - -import com.fasterxml.jackson.annotation.JacksonInject; -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java index fb6e09517b9..f3b7d4fd2b8 100644 --- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java @@ -132,7 +132,7 @@ public static class MongoSubScanSpec { protected BasicDBObject filter; - @parquet.org.codehaus.jackson.annotate.JsonCreator + @JsonCreator public MongoSubScanSpec(@JsonProperty("dbName") String dbName, @JsonProperty("collectionName") String collectionName, @JsonProperty("hosts") List hosts, diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml index c79f902c440..467b755f0be 100644 --- a/exec/java-exec/pom.xml +++ b/exec/java-exec/pom.xml @@ -159,7 +159,7 @@ test - com.twitter + org.apache.parquet parquet-column ${parquet.version} @@ -174,7 +174,7 @@ - com.twitter + org.apache.parquet parquet-hadoop ${parquet.version} @@ -189,9 +189,9 @@ - com.twitter + org.apache.parquet parquet-format - 2.1.1-drill-r1 + 2.3.0-incubating org.apache.hadoop @@ -204,7 +204,7 @@ - com.twitter + org.apache.parquet parquet-common ${parquet.version} @@ -219,7 +219,7 @@ - com.twitter + org.apache.parquet parquet-jackson ${parquet.version} @@ -234,7 +234,7 @@ - com.twitter + org.apache.parquet parquet-encoding ${parquet.version} @@ -249,7 +249,7 @@ - com.twitter + org.apache.parquet parquet-generator ${parquet.version} diff --git a/exec/java-exec/src/main/codegen/templates/JsonOutputRecordWriter.java b/exec/java-exec/src/main/codegen/templates/JsonOutputRecordWriter.java index 6a713a38434..b6b0e9c1456 100644 --- 
a/exec/java-exec/src/main/codegen/templates/JsonOutputRecordWriter.java +++ b/exec/java-exec/src/main/codegen/templates/JsonOutputRecordWriter.java @@ -22,36 +22,13 @@ package org.apache.drill.exec.store; -import com.fasterxml.jackson.core.JsonGenerator; -import com.google.common.collect.Lists; -import org.apache.drill.common.types.TypeProtos.MinorType; -import org.apache.drill.exec.expr.TypeHelper; -import org.apache.drill.exec.expr.holders.*; -import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter; -import org.apache.drill.exec.store.parquet.ParquetTypeHelper; -import org.apache.drill.exec.vector.*; -import org.apache.drill.exec.util.DecimalUtility; import org.apache.drill.exec.vector.complex.reader.FieldReader; -import parquet.io.api.RecordConsumer; -import parquet.schema.MessageType; -import parquet.io.api.Binary; -import io.netty.buffer.ByteBuf; -import org.apache.drill.exec.record.BatchSchema; -import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.vector.complex.fn.JsonOutput; - - -import org.apache.drill.common.types.TypeProtos; - -import org.joda.time.DateTimeUtils; - import java.io.IOException; import java.lang.UnsupportedOperationException; -import java.util.Arrays; import java.util.List; -import java.util.Map; /** * Abstract implementation of RecordWriter interface which exposes interface: diff --git a/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java index 787edc2dee8..74af3eac052 100644 --- a/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java +++ b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java @@ -17,7 +17,7 @@ */ import org.joda.time.DateTimeUtils; -import parquet.io.api.Binary; +import org.apache.parquet.io.api.Binary; import java.lang.Override; import java.lang.RuntimeException; @@ -39,9 +39,9 @@ import org.apache.drill.exec.vector.*; import org.apache.drill.exec.util.DecimalUtility; import org.apache.drill.exec.vector.complex.reader.FieldReader; -import parquet.io.api.RecordConsumer; -import parquet.schema.MessageType; -import parquet.io.api.Binary; +import org.apache.parquet.io.api.RecordConsumer; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.io.api.Binary; import io.netty.buffer.DrillBuf; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.MaterializedField; diff --git a/exec/java-exec/src/main/codegen/templates/ParquetTypeHelper.java b/exec/java-exec/src/main/codegen/templates/ParquetTypeHelper.java index 47881bcec66..ecd1d3e77a0 100644 --- a/exec/java-exec/src/main/codegen/templates/ParquetTypeHelper.java +++ b/exec/java-exec/src/main/codegen/templates/ParquetTypeHelper.java @@ -17,10 +17,10 @@ */ import org.apache.drill.common.types.MinorType; -import parquet.format.ConvertedType; -import parquet.schema.DecimalMetadata; -import parquet.schema.OriginalType; -import parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.format.ConvertedType; +import org.apache.parquet.schema.DecimalMetadata; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; <@pp.dropOutputFile /> <@pp.changeOutputFile name="org/apache/drill/exec/store/parquet/ParquetTypeHelper.java" /> @@ -31,10 +31,10 @@ import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.common.types.TypeProtos.MinorType; 
import org.apache.drill.exec.record.MaterializedField; -import parquet.schema.OriginalType; -import parquet.schema.DecimalMetadata; -import parquet.schema.PrimitiveType.PrimitiveTypeName; -import parquet.schema.Type.Repetition; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.DecimalMetadata; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Type.Repetition; import java.util.HashMap; import java.util.Map; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java index 49e81ec9762..cd94274021c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java @@ -78,7 +78,7 @@ import org.apache.drill.exec.vector.ValueVector; import org.apache.calcite.rel.RelFieldCollation.Direction; -import parquet.Preconditions; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.sun.codemodel.JConditional; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java index ee319296d4c..ddbbe7dc563 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java @@ -46,7 +46,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import parquet.Preconditions; +import com.google.common.base.Preconditions; @XmlRootElement public class QueryWrapper { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java index 262c94ca9a2..b1cdfdbe5a2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java @@ -25,10 +25,10 @@ import org.apache.hadoop.fs.FSDataInputStream; -import parquet.bytes.BytesInput; -import parquet.format.PageHeader; -import parquet.format.Util; -import parquet.hadoop.util.CompatibilityUtil; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.format.PageHeader; +import org.apache.parquet.format.Util; +import org.apache.parquet.hadoop.util.CompatibilityUtil; public class ColumnDataReader { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ColumnDataReader.class); @@ -46,6 +46,10 @@ public PageHeader readPageHeader() throws IOException{ return Util.readPageHeader(input); } + public FSDataInputStream getInputStream() { + return input; + } + public BytesInput getPageAsBytesInput(int pageLength) throws IOException{ byte[] b = new byte[pageLength]; input.read(b); @@ -55,8 +59,9 @@ public BytesInput getPageAsBytesInput(int pageLength) throws IOException{ public void loadPage(DrillBuf target, int pageLength) throws IOException { target.clear(); ByteBuffer directBuffer = target.nioBuffer(0, pageLength); - while (directBuffer.remaining() > 0) { - CompatibilityUtil.getBuf(input, directBuffer, directBuffer.remaining()); + int lengthLeftToRead = pageLength; + while (lengthLeftToRead > 
0) { + lengthLeftToRead -= CompatibilityUtil.getBuf(input, directBuffer, lengthLeftToRead); } target.writerIndex(pageLength); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/DirectCodecFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/DirectCodecFactory.java deleted file mode 100644 index 7abe05a1e05..00000000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/DirectCodecFactory.java +++ /dev/null @@ -1,378 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.store.parquet; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.DrillBuf; - -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.nio.channels.Channels; -import java.nio.channels.WritableByteChannel; - -import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.store.parquet.DirectCodecFactory.DirectBytesDecompressor; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.Decompressor; -import org.apache.hadoop.io.compress.DirectDecompressor; -import org.xerial.snappy.Snappy; - -import parquet.bytes.ByteBufferAllocator; -import parquet.bytes.BytesInput; -import parquet.hadoop.CodecFactory; -import parquet.hadoop.CodecFactory.BytesCompressor; -import parquet.hadoop.HeapCodecFactory.HeapBytesCompressor; -import parquet.hadoop.HeapCodecFactory.HeapBytesDecompressor; -import parquet.hadoop.metadata.CompressionCodecName; - -import com.google.common.base.Preconditions; - -public class DirectCodecFactory extends CodecFactory implements AutoCloseable { - - private final ByteBufferAllocator allocator; - - public DirectCodecFactory(Configuration config, ByteBufferAllocator allocator) { - super(config); - Preconditions.checkNotNull(allocator); - this.allocator = allocator; - } - - public DirectCodecFactory(Configuration config, BufferAllocator allocator) { - this(config, new ParquetDirectByteBufferAllocator(allocator)); - } - - private ByteBuffer ensure(ByteBuffer buffer, int size) { - if (buffer == null) { - buffer = allocator.allocate(size); - } else if (buffer.capacity() >= size) { - buffer.clear(); - } else { - allocator.release(buffer); - release(buffer); - buffer = allocator.allocate(size); - } - return buffer; - } - - ByteBuffer release(ByteBuffer buffer) { - if (buffer != null) { - allocator.release(buffer); - } - return null; - } - - @Override - protected BytesCompressor createCompressor(final CompressionCodecName codecName, final CompressionCodec codec, - int pageSize) { - - if (codec == null) { - return new NoopCompressor(); - } else if (codecName == CompressionCodecName.SNAPPY) { - // avoid using the 
Parquet Snappy codec since it allocates direct buffers at awkward spots. - return new SnappyCompressor(); - } else { - - // todo: move zlib above since it also generates allocateDirect calls. - return new HeapBytesCompressor(codecName, codec, pageSize); - } - } - - @Override - protected DirectBytesDecompressor createDecompressor(final CompressionCodec codec) { - // This is here so that debugging can be done if we see inconsistencies between our decompression and upstream - // decompression. - // if (true) { - // return new HeapFakeDirect(codec); - // } - - if (codec == null) { - return new NoopDecompressor(); - } else if (DirectCodecPool.INSTANCE.codec(codec).supportsDirectDecompression()) { - return new FullDirectDecompressor(codec); - } else { - return new IndirectDecompressor(codec); - } - } - - public void close() { - release(); - } - - /** - * Keeping this here for future debugging versus using custom implementations below. - */ - private class HeapFakeDirect extends DirectBytesDecompressor { - - private final ExposedHeapBytesDecompressor innerCompressor; - - public HeapFakeDirect(CompressionCodec codec){ - innerCompressor = new ExposedHeapBytesDecompressor(codec); - } - - @Override - public void decompress(DrillBuf input, int compressedSize, DrillBuf output, int uncompressedSize) - throws IOException { - BytesInput uncompressed = decompress(new ByteBufBytesInput(input), uncompressedSize); - output.clear(); - output.setBytes(0, uncompressed.toByteArray()); - output.writerIndex((int) uncompressed.size()); - } - - @Override - public BytesInput decompress(BytesInput paramBytesInput, int uncompressedSize) throws IOException { - return innerCompressor.decompress(paramBytesInput, uncompressedSize); - } - - @Override - protected void release() { - innerCompressor.release(); - } - - } - - private class ExposedHeapBytesDecompressor extends HeapBytesDecompressor { - public ExposedHeapBytesDecompressor(CompressionCodec codec) { - super(codec); - } - - public void release() { - super.release(); - } - } - - public class IndirectDecompressor extends DirectBytesDecompressor { - private final Decompressor decompressor; - - public IndirectDecompressor(CompressionCodec codec) { - this.decompressor = DirectCodecPool.INSTANCE.codec(codec).borrowDecompressor(); - } - - @Override - public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException { - decompressor.reset(); - byte[] inputBytes = bytes.toByteArray(); - decompressor.setInput(inputBytes, 0, inputBytes.length); - byte[] output = new byte[uncompressedSize]; - decompressor.decompress(output, 0, uncompressedSize); - return BytesInput.from(output); - } - - @Override - public void decompress(DrillBuf input, int compressedSize, DrillBuf output, int uncompressedSize) - throws IOException { - - decompressor.reset(); - byte[] inputBytes = new byte[input.capacity()]; - input.getBytes(0, inputBytes); - decompressor.setInput(inputBytes, 0, inputBytes.length); - byte[] outputBytes = new byte[uncompressedSize]; - decompressor.decompress(outputBytes, 0, uncompressedSize); - output.clear(); - output.writeBytes(outputBytes); - } - - @Override - protected void release() { - DirectCodecPool.INSTANCE.returnDecompressor(decompressor); - } - } - - public class FullDirectDecompressor extends DirectBytesDecompressor { - private final DirectDecompressor decompressor; - private ByteBuffer compressedBuffer; - private ByteBuffer uncompressedBuffer; - private ExposedHeapBytesDecompressor extraDecompressor; - public 
FullDirectDecompressor(CompressionCodec codec){ - this.decompressor = DirectCodecPool.INSTANCE.codec(codec).borrowDirectDecompressor(); - this.extraDecompressor = new ExposedHeapBytesDecompressor(codec); - } - - @Override - public BytesInput decompress(BytesInput compressedBytes, int uncompressedSize) throws IOException { - - if(false){ - // TODO: fix direct path. (currently, this code is causing issues when writing complex Parquet files. - ByteBuffer bufferIn = compressedBytes.toByteBuffer(); - uncompressedBuffer = ensure(uncompressedBuffer, uncompressedSize); - uncompressedBuffer.clear(); - - if (bufferIn.isDirect()) { - decompressor.decompress(bufferIn, uncompressedBuffer); - } else { - compressedBuffer = ensure(this.compressedBuffer, (int) compressedBytes.size()); - compressedBuffer.clear(); - compressedBuffer.put(bufferIn); - compressedBuffer.flip(); - decompressor.decompress(compressedBuffer, uncompressedBuffer); - } - return BytesInput.from(uncompressedBuffer, 0, uncompressedSize); - - } else { - return extraDecompressor.decompress(compressedBytes, uncompressedSize); - } - - - } - - - @Override - public void decompress(DrillBuf input, int compressedSize, DrillBuf output, int uncompressedSize) - throws IOException { - output.clear(); - decompressor.decompress(input.nioBuffer(0, compressedSize), output.nioBuffer(0, uncompressedSize)); - output.writerIndex(uncompressedSize); - } - - @Override - protected void release() { - compressedBuffer = DirectCodecFactory.this.release(compressedBuffer); - uncompressedBuffer = DirectCodecFactory.this.release(uncompressedBuffer); - DirectCodecPool.INSTANCE.returnDecompressor(decompressor); - extraDecompressor.release(); - } - - } - - public class NoopDecompressor extends DirectBytesDecompressor { - - @Override - public void decompress(DrillBuf input, int compressedSize, DrillBuf output, int uncompressedSize) - throws IOException { - Preconditions.checkArgument(compressedSize == uncompressedSize, - "Non-compressed data did not have matching compressed and uncompressed sizes."); - output.clear(); - output.writeBytes(input, compressedSize); - } - - @Override - public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException { - return bytes; - } - - @Override - protected void release() { - } - - } - - public class SnappyCompressor extends BytesCompressor { - - private ByteBuffer incoming; - private ByteBuffer outgoing; - - public SnappyCompressor() { - super(); - } - - @Override - public BytesInput compress(BytesInput bytes) throws IOException { - int maxOutputSize = Snappy.maxCompressedLength((int) bytes.size()); - ByteBuffer bufferIn = bytes.toByteBuffer(); - outgoing = ensure(outgoing, maxOutputSize); - final int size; - if (bufferIn.isDirect()) { - size = Snappy.compress(bufferIn, outgoing); - } else { - this.incoming = ensure(this.incoming, (int) bytes.size()); - this.incoming.put(bufferIn); - this.incoming.flip(); - size = Snappy.compress(this.incoming, outgoing); - } - - return BytesInput.from(outgoing, 0, (int) size); - } - - @Override - public CompressionCodecName getCodecName() { - return CompressionCodecName.SNAPPY; - } - - @Override - protected void release() { - outgoing = DirectCodecFactory.this.release(outgoing); - incoming = DirectCodecFactory.this.release(incoming); - } - - } - - public static class NoopCompressor extends BytesCompressor { - - @Override - public BytesInput compress(BytesInput bytes) throws IOException { - return bytes; - } - - @Override - public CompressionCodecName getCodecName() { - return 
CompressionCodecName.UNCOMPRESSED; - } - - @Override - protected void release() { - } - - } - - public static class ByteBufBytesInput extends BytesInput { - private final ByteBuf buf; - private final int length; - - public ByteBufBytesInput(ByteBuf buf) { - this(buf, 0, buf.capacity()); - } - - public ByteBufBytesInput(ByteBuf buf, int offset, int length) { - super(); - if(buf.capacity() == length && offset == 0){ - this.buf = buf; - }else{ - this.buf = buf.slice(offset, length); - } - - this.length = length; - } - - @Override - public void writeAllTo(OutputStream out) throws IOException { - final WritableByteChannel outputChannel = Channels.newChannel(out); - outputChannel.write(buf.nioBuffer()); - } - - @Override - public ByteBuffer toByteBuffer() throws IOException { - return buf.nioBuffer(); - } - - @Override - public long size() { - return length; - } - } - - - public abstract class DirectBytesDecompressor extends CodecFactory.BytesDecompressor { - public abstract void decompress(DrillBuf input, int compressedSize, DrillBuf output, int uncompressedSize) - throws IOException; - } - - - -} - diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/DirectCodecPool.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/DirectCodecPool.java deleted file mode 100644 index 26d97c96978..00000000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/DirectCodecPool.java +++ /dev/null @@ -1,187 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.drill.exec.store.parquet; - -import java.util.Collections; -import java.util.Map; - -import org.apache.commons.pool.BasePoolableObjectFactory; -import org.apache.commons.pool.impl.GenericObjectPool; -import org.apache.drill.common.exceptions.DrillRuntimeException; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.Compressor; -import org.apache.hadoop.io.compress.Decompressor; -import org.apache.hadoop.io.compress.DirectDecompressionCodec; -import org.apache.hadoop.io.compress.DirectDecompressor; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; - -public class DirectCodecPool { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DirectCodecPool.class); - - public static final DirectCodecPool INSTANCE = new DirectCodecPool(); - - @SuppressWarnings("unchecked") - private final Map codecs = (Map) (Object) Collections.synchronizedMap(Maps.newHashMap()); - - @SuppressWarnings("unchecked") - private final Map, GenericObjectPool> directDePools = (Map, GenericObjectPool>) (Object) Collections - .synchronizedMap(Maps.newHashMap()); - private final Map, GenericObjectPool> dePools = (Map, GenericObjectPool>) (Object) Collections - .synchronizedMap(Maps.newHashMap()); - private final Map, GenericObjectPool> cPools = (Map, GenericObjectPool>) (Object) Collections - .synchronizedMap(Maps.newHashMap()); - - private DirectCodecPool() { - } - - public class CodecPool { - private final GenericObjectPool compressorPool; - private final GenericObjectPool decompressorPool; - private final GenericObjectPool directDecompressorPool; - private final boolean supportDirectDecompressor; - - private CodecPool(final CompressionCodec codec){ - try { - boolean supportDirectDecompressor = codec instanceof DirectDecompressionCodec; - compressorPool = new GenericObjectPool(new BasePoolableObjectFactory() { - public Object makeObject() throws Exception { - return codec.createCompressor(); - } - }, Integer.MAX_VALUE); - - Object com = compressorPool.borrowObject(); - if (com != null) { - cPools.put(com.getClass(), compressorPool); - compressorPool.returnObject(com); - }else{ - logger.warn("Unable to find compressor for codec {}", codec.getClass().getName()); - } - - decompressorPool = new GenericObjectPool(new BasePoolableObjectFactory() { - public Object makeObject() throws Exception { - return codec.createDecompressor(); - } - }, Integer.MAX_VALUE); - - Object decom = decompressorPool.borrowObject(); - if (decom != null) { - dePools.put(decom.getClass(), decompressorPool); - decompressorPool.returnObject(decom); - } else { - logger.warn("Unable to find decompressor for codec {}", codec.getClass().getName()); - } - - if (supportDirectDecompressor) { - directDecompressorPool = new GenericObjectPool(new BasePoolableObjectFactory() { - public Object makeObject() throws Exception { - return ((DirectDecompressionCodec) codec).createDirectDecompressor(); - } - }, Integer.MAX_VALUE); - - Object ddecom = directDecompressorPool.borrowObject(); - if (ddecom != null) { - directDePools.put(ddecom.getClass(), directDecompressorPool); - directDecompressorPool.returnObject(ddecom); - - } else { - supportDirectDecompressor = false; - logger.warn("Unable to find direct decompressor for codec {}", codec.getClass().getName()); - } - - } else { - directDecompressorPool = null; - } - - this.supportDirectDecompressor = supportDirectDecompressor; - } catch (Exception e) { - throw new DrillRuntimeException(e); - 
} - } - - public DirectDecompressor borrowDirectDecompressor(){ - Preconditions.checkArgument(supportDirectDecompressor, "Tried to get a direct Decompressor from a non-direct codec."); - try { - return (DirectDecompressor) directDecompressorPool.borrowObject(); - } catch (Exception e) { - throw new DrillRuntimeException(e); - } - } - - public boolean supportsDirectDecompression() { - return supportDirectDecompressor; - } - - public Decompressor borrowDecompressor(){ - try { - return (Decompressor) decompressorPool.borrowObject(); - } catch (Exception e) { - throw new DrillRuntimeException(e); - } - } - - public Compressor borrowCompressor(){ - try { - return (Compressor) compressorPool.borrowObject(); - } catch (Exception e) { - throw new DrillRuntimeException(e); - } - } - } - - public CodecPool codec(CompressionCodec codec){ - CodecPool pools = codecs.get(codec); - if(pools == null){ - synchronized(this){ - pools = codecs.get(codec); - if(pools == null){ - pools = new CodecPool(codec); - codecs.put(codec, pools); - } - } - } - return pools; - } - - private void returnToPool(Object obj, Map, GenericObjectPool> pools) { - try { - GenericObjectPool pool = pools.get(obj.getClass()); - if (pool == null) { - throw new IllegalStateException("Received unexpected decompressor."); - } - pool.returnObject(obj); - } catch (Exception e) { - throw new DrillRuntimeException(e); - } - - } - - public void returnCompressor(Compressor compressor) { - returnToPool(compressor, cPools); - } - - public void returnDecompressor(Decompressor decompressor) { - returnToPool(decompressor, dePools); - } - - public void returnDecompressor(DirectDecompressor decompressor) { - returnToPool(decompressor, directDePools); - } - -} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java index d8495c9d456..b68ffbbd39e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java @@ -31,11 +31,11 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import parquet.bytes.BytesUtils; -import parquet.hadoop.Footer; -import parquet.hadoop.ParquetFileReader; -import parquet.hadoop.ParquetFileWriter; -import parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.bytes.BytesUtils; +import org.apache.parquet.hadoop.Footer; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java index 522ff924f97..1196f01526b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java @@ -40,18 +40,17 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; - -import parquet.column.statistics.Statistics; -import parquet.hadoop.ParquetFileReader; -import parquet.hadoop.metadata.BlockMetaData; -import parquet.hadoop.metadata.ColumnChunkMetaData; -import parquet.hadoop.metadata.ParquetMetadata; -import parquet.io.api.Binary; -import 
parquet.schema.GroupType; -import parquet.schema.MessageType; -import parquet.schema.OriginalType; -import parquet.schema.PrimitiveType.PrimitiveTypeName; -import parquet.schema.Type; +import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Type; import java.io.IOException; import java.util.ArrayList; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java index 059886d288c..91183151f65 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java @@ -26,7 +26,7 @@ import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.OperatorContext; -import parquet.bytes.ByteBufferAllocator; +import org.apache.parquet.bytes.ByteBufferAllocator; /** * {@link ByteBufferAllocator} implementation that uses Drill's {@link BufferAllocator} to allocate and release @@ -72,6 +72,11 @@ public void release(ByteBuffer b) { } } + @Override + public boolean isDirect() { + return true; + } + /** * ByteBuffer wrapper that computes a fixed hashcode. *

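The hunk above adds isDirect() to ParquetDirectByteBufferAllocator, and the other hunks in this patch replace Drill's DirectCodecFactory with the upstream org.apache.parquet.hadoop.CodecFactory obtained through createDirectCodecFactory. A rough, self-contained sketch of that wiring follows; the anonymous allocator is only a stand-in for ParquetDirectByteBufferAllocator (not Drill code), and the 0 page-size hint mirrors the scan-side callers in this patch.

import java.nio.ByteBuffer;

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.hadoop.CodecFactory;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

// Sketch of the replacement wiring; the anonymous allocator stands in for
// ParquetDirectByteBufferAllocator and is not Drill code.
public class DirectCodecFactorySketch {
  public static void main(String[] args) throws Exception {
    ByteBufferAllocator allocator = new ByteBufferAllocator() {
      @Override public ByteBuffer allocate(int size) { return ByteBuffer.allocateDirect(size); }
      @Override public void release(ByteBuffer b) { /* direct buffers reclaimed by GC in this sketch */ }
      @Override public boolean isDirect() { return true; } // same contract the hunk above adds
    };

    // Page-size hint of 0, as the scan-side callers in this patch pass.
    CodecFactory factory = CodecFactory.createDirectCodecFactory(new Configuration(), allocator, 0);
    try {
      BytesInput compressed = factory.getCompressor(CompressionCodecName.SNAPPY)
          .compress(BytesInput.from(new byte[] {1, 2, 3, 4}));
      System.out.println("compressed to " + compressed.size() + " bytes");
    } finally {
      factory.release(); // replaces the DirectCodecFactory.close() call removed by this patch
    }
  }
}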
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java index ff32b90e99e..0167700faf6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java @@ -58,8 +58,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; -import parquet.format.converter.ParquetMetadataConverter; -import parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.format.converter.ParquetMetadataConverter; +import org.apache.parquet.hadoop.ParquetFileWriter; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java index a145d793f1e..ce2f845ee90 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Set; +import com.fasterxml.jackson.annotation.JsonCreator; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -86,9 +87,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.joda.time.DateTimeUtils; - -import parquet.io.api.Binary; -import parquet.org.codehaus.jackson.annotate.JsonCreator; +import org.apache.parquet.io.api.Binary; import com.codahale.metrics.MetricRegistry; import com.fasterxml.jackson.annotation.JacksonInject; @@ -99,9 +98,8 @@ import com.google.common.base.Stopwatch; import com.google.common.collect.ListMultimap; import com.google.common.collect.Lists; - -import parquet.schema.OriginalType; -import parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; @JsonTypeName("parquet-scan") public class ParquetGroupScan extends AbstractFileGroupScan { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java index 6c15d3f23e7..f7845661fe7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java @@ -43,30 +43,30 @@ import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter; import org.apache.drill.exec.store.ParquetOutputRecordWriter; import org.apache.drill.exec.vector.BitVector; -import org.apache.drill.exec.vector.IntVector; import org.apache.drill.exec.vector.complex.reader.FieldReader; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import parquet.column.ColumnWriteStore; -import parquet.column.ParquetProperties.WriterVersion; -import parquet.column.impl.ColumnWriteStoreV1; -import parquet.column.page.PageWriteStore; -import parquet.hadoop.ColumnChunkPageWriteStoreExposer; -import parquet.hadoop.ParquetFileWriter; -import parquet.hadoop.metadata.CompressionCodecName; -import parquet.io.ColumnIOFactory; -import 
parquet.io.MessageColumnIO; -import parquet.io.api.RecordConsumer; -import parquet.schema.DecimalMetadata; -import parquet.schema.GroupType; -import parquet.schema.MessageType; -import parquet.schema.OriginalType; -import parquet.schema.PrimitiveType; -import parquet.schema.PrimitiveType.PrimitiveTypeName; -import parquet.schema.Type; -import parquet.schema.Type.Repetition; +import org.apache.parquet.column.ColumnWriteStore; +import org.apache.parquet.column.ParquetProperties.WriterVersion; +import org.apache.parquet.column.impl.ColumnWriteStoreV1; +import org.apache.parquet.column.page.PageWriteStore; +import org.apache.parquet.hadoop.CodecFactory; +import org.apache.parquet.hadoop.ColumnChunkPageWriteStoreExposer; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.io.ColumnIOFactory; +import org.apache.parquet.io.MessageColumnIO; +import org.apache.parquet.io.api.RecordConsumer; +import org.apache.parquet.schema.DecimalMetadata; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Type.Repetition; import com.google.common.collect.Lists; @@ -86,7 +86,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter { private boolean enableDictionary = false; private CompressionCodecName codec = CompressionCodecName.SNAPPY; private WriterVersion writerVersion = WriterVersion.PARQUET_1_0; - private DirectCodecFactory codecFactory; + private CodecFactory codecFactory; private long recordCount = 0; private long recordCountForNextMemCheck = MINIMUM_RECORD_COUNT_FOR_CHECK; @@ -108,7 +108,8 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter { public ParquetRecordWriter(FragmentContext context, ParquetWriter writer) throws OutOfMemoryException{ super(); this.oContext = context.newOperatorContext(writer, true); - this.codecFactory = new DirectCodecFactory(writer.getFormatPlugin().getFsConf(), oContext.getAllocator()); + this.codecFactory = CodecFactory.createDirectCodecFactory(writer.getFormatPlugin().getFsConf(), + new ParquetDirectByteBufferAllocator(oContext.getAllocator()), pageSize); this.partitionColumns = writer.getPartitionColumns(); this.hasPartitions = partitionColumns != null && partitionColumns.size() > 0; } @@ -186,11 +187,11 @@ private void newSchema() throws IOException { int initialBlockBufferSize = max(MINIMUM_BUFFER_SIZE, blockSize / this.schema.getColumns().size() / 5); pageStore = ColumnChunkPageWriteStoreExposer.newColumnChunkPageWriteStore(this.oContext, - codecFactory.getCompressor(codec, pageSize), - schema, - initialBlockBufferSize); + codecFactory.getCompressor(codec), + schema); int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize)); - store = new ColumnWriteStoreV1(pageStore, pageSize, initialPageBufferSize, dictionaryPageSize, enableDictionary, writerVersion); + store = new ColumnWriteStoreV1(pageStore, pageSize, initialPageBufferSize, enableDictionary, + writerVersion, new ParquetDirectByteBufferAllocator(oContext)); MessageColumnIO columnIO = new ColumnIOFactory(false).getColumnIO(this.schema); consumer = columnIO.getRecordWriter(store); setUp(schema, consumer); @@ -207,12 +208,12 @@ private PrimitiveType 
getPrimitiveType(MaterializedField field) { return new PrimitiveType(repetition, primitiveTypeName, length, name, originalType, decimalMetadata, null); } - private parquet.schema.Type getType(MaterializedField field) { + private Type getType(MaterializedField field) { MinorType minorType = field.getType().getMinorType(); DataMode dataMode = field.getType().getMode(); switch(minorType) { case MAP: - List types = Lists.newArrayList(); + List types = Lists.newArrayList(); for (MaterializedField childField : field.getChildren()) { types.add(getType(childField)); } @@ -243,6 +244,7 @@ public void checkForNewPartition(int index) { private void flush() throws IOException { if (recordCount > 0) { parquetFileWriter.startBlock(recordCount); + consumer.flush(); store.flush(); ColumnChunkPageWriteStoreExposer.flushPageStore(pageStore, parquetFileWriter); recordCount = 0; @@ -254,7 +256,9 @@ private void flush() throws IOException { } store.close(); - ColumnChunkPageWriteStoreExposer.close(pageStore); + // TODO(jaltekruse) - review this close method should no longer be necessary +// ColumnChunkPageWriteStoreExposer.close(pageStore); + store = null; pageStore = null; index++; @@ -371,6 +375,6 @@ public void abort() throws IOException { public void cleanup() throws IOException { flush(); - codecFactory.close(); + codecFactory.release(); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java index 441a707f574..d743fe17a11 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java @@ -42,11 +42,12 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import parquet.column.ColumnDescriptor; -import parquet.hadoop.ParquetFileReader; -import parquet.hadoop.metadata.ParquetMetadata; -import parquet.schema.MessageType; -import parquet.schema.Type; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.hadoop.CodecFactory; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Type; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -129,7 +130,9 @@ public ScanBatch getBatch(FragmentContext context, ParquetRowGroupScan rowGroupS readers.add( new ParquetRecordReader( context, e.getPath(), e.getRowGroupIndex(), fs, - new DirectCodecFactory(fs.getConf(), oContext.getAllocator()), + CodecFactory.createDirectCodecFactory( + fs.getConf(), + new ParquetDirectByteBufferAllocator(oContext.getAllocator()), 0), footers.get(e.getPath()), rowGroupScan.getColumns() ) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java index 986328eb700..b0c5fd09c4a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.store.parquet; +import com.fasterxml.jackson.annotation.JsonCreator; import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS; import com.fasterxml.jackson.annotation.JsonIgnore; @@ 
-26,7 +27,7 @@ public class RowGroupReadEntry extends ReadEntryFromHDFS { private int rowGroupIndex; - @parquet.org.codehaus.jackson.annotate.JsonCreator + @JsonCreator public RowGroupReadEntry(@JsonProperty("path") String path, @JsonProperty("start") long start, @JsonProperty("length") long length, @JsonProperty("rowGroupIndex") int rowGroupIndex) { super(path, start, length); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java index 81b80023161..b2a42dc71ac 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java @@ -21,9 +21,9 @@ import org.apache.drill.exec.vector.BitVector; import org.apache.drill.exec.vector.ValueVector; -import parquet.column.ColumnDescriptor; -import parquet.format.SchemaElement; -import parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.format.SchemaElement; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; final class BitReader extends ColumnReader { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java index 5650ae3e316..f62f42424ea 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java @@ -25,11 +25,11 @@ import org.apache.drill.exec.vector.BaseDataValueVector; import org.apache.drill.exec.vector.ValueVector; -import parquet.column.ColumnDescriptor; -import parquet.format.SchemaElement; -import parquet.hadoop.metadata.ColumnChunkMetaData; -import parquet.schema.PrimitiveType; -import parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.format.SchemaElement; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; public abstract class ColumnReader { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ColumnReader.class); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java index a13cde32052..532019861d2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java @@ -46,12 +46,12 @@ import org.apache.drill.exec.vector.VarBinaryVector; import org.apache.drill.exec.vector.VarCharVector; import org.apache.drill.exec.vector.VariableWidthVector; -import parquet.column.ColumnDescriptor; -import parquet.column.Encoding; -import parquet.format.ConvertedType; -import parquet.format.SchemaElement; -import parquet.hadoop.metadata.ColumnChunkMetaData; -import parquet.schema.PrimitiveType; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.format.ConvertedType; +import 
org.apache.parquet.format.SchemaElement; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.schema.PrimitiveType; public class ColumnReaderFactory { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java index ef2ae1b0880..76aa073cdc6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java @@ -36,10 +36,10 @@ import org.apache.drill.exec.vector.VariableWidthVector; import org.joda.time.DateTimeUtils; -import parquet.column.ColumnDescriptor; -import parquet.format.SchemaElement; -import parquet.hadoop.metadata.ColumnChunkMetaData; -import parquet.io.api.Binary; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.format.SchemaElement; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.io.api.Binary; class FixedByteAlignedReader extends ColumnReader { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java index d6e30ae36f2..501f5a6c89a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java @@ -24,9 +24,9 @@ import org.apache.drill.exec.vector.complex.RepeatedValueVector; import org.apache.drill.exec.vector.UInt4Vector; -import parquet.column.ColumnDescriptor; -import parquet.format.SchemaElement; -import parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.format.SchemaElement; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; public class FixedWidthRepeatedReader extends VarLengthColumn { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java index 8a8ac2983f9..bbc45a72795 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java @@ -21,9 +21,9 @@ import org.apache.drill.exec.vector.NullableBitVector; import org.apache.drill.exec.vector.ValueVector; -import parquet.column.ColumnDescriptor; -import parquet.format.SchemaElement; -import parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.format.SchemaElement; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; /** * This class is used in conjunction with its superclass to read nullable bit columns in a parquet file. 
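Several constructors in this patch (HBaseSubScanSpec, MongoSubScanSpec, RowGroupReadEntry) swap the shaded parquet.org.codehaus.jackson.annotate.JsonCreator annotation for com.fasterxml.jackson.annotation.JsonCreator, which is the annotation Drill's Jackson ObjectMapper actually inspects. Below is a minimal, self-contained sketch of that creator pattern; ReadEntrySketch and its sample JSON are illustrative stand-ins, not Drill classes.

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

// Illustrative stand-in for the read-entry classes touched above; shows the
// com.fasterxml @JsonCreator constructor pattern the patch standardizes on.
public class ReadEntrySketch {
  private final String path;
  private final long start;
  private final long length;
  private final int rowGroupIndex;

  @JsonCreator
  public ReadEntrySketch(@JsonProperty("path") String path,
                         @JsonProperty("start") long start,
                         @JsonProperty("length") long length,
                         @JsonProperty("rowGroupIndex") int rowGroupIndex) {
    this.path = path;
    this.start = start;
    this.length = length;
    this.rowGroupIndex = rowGroupIndex;
  }

  public String getPath() { return path; }
  public long getStart() { return start; }
  public long getLength() { return length; }
  public int getRowGroupIndex() { return rowGroupIndex; }

  public static void main(String[] args) throws Exception {
    String json = "{\"path\":\"/tmp/t.parquet\",\"start\":0,\"length\":1024,\"rowGroupIndex\":0}";
    ReadEntrySketch entry = new ObjectMapper().readValue(json, ReadEntrySketch.class);
    System.out.println(entry.getPath() + " rowGroup=" + entry.getRowGroupIndex());
  }
}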
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java index 9db87f4ea0b..b8c81b44148 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java @@ -24,9 +24,9 @@ import org.apache.drill.exec.vector.NullableVectorDefinitionSetter; import org.apache.drill.exec.vector.ValueVector; -import parquet.column.ColumnDescriptor; -import parquet.format.SchemaElement; -import parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.format.SchemaElement; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; abstract class NullableColumnReader extends ColumnReader{ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NullableColumnReader.class); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java index f658518dcde..52593453281 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java @@ -20,6 +20,7 @@ import io.netty.buffer.DrillBuf; import java.math.BigDecimal; +import java.nio.ByteBuffer; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.expr.holders.NullableDecimal28SparseHolder; @@ -43,10 +44,10 @@ import org.apache.drill.exec.vector.ValueVector; import org.joda.time.DateTimeUtils; -import parquet.column.ColumnDescriptor; -import parquet.format.SchemaElement; -import parquet.hadoop.metadata.ColumnChunkMetaData; -import parquet.io.api.Binary; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.format.SchemaElement; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.io.api.Binary; public class NullableFixedByteAlignedReaders { @@ -91,7 +92,8 @@ protected void readField(long recordsToReadInThisPass) { Binary currDictValToWrite; for (int i = 0; i < recordsReadInThisIteration; i++){ currDictValToWrite = pageReader.dictionaryValueReader.readBytes(); - mutator.setSafe(valuesReadInCurrentPass + i, currDictValToWrite.toByteBuffer(), 0, + ByteBuffer buf = currDictValToWrite.toByteBuffer(); + mutator.setSafe(valuesReadInCurrentPass + i, buf, buf.position(), currDictValToWrite.length()); } // Set the write Index. 
The next page that gets read might be a page that does not use dictionary encoding diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java index 528b6db01c9..b18a81c6065 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java @@ -24,9 +24,9 @@ import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.vector.ValueVector; -import parquet.column.ColumnDescriptor; -import parquet.format.SchemaElement; -import parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.format.SchemaElement; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; public abstract class NullableVarLengthValuesColumn extends VarLengthValuesColumn { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java index 8c73b2ac083..762f91a3ea7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java @@ -17,7 +17,8 @@ */ package org.apache.drill.exec.store.parquet.columnreaders; -import static parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics; +import static org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics; +import static org.apache.parquet.column.Encoding.valueOf; import io.netty.buffer.ByteBuf; import io.netty.buffer.DrillBuf; @@ -28,29 +29,27 @@ import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.store.parquet.ColumnDataReader; -import org.apache.drill.exec.store.parquet.DirectCodecFactory; -import org.apache.drill.exec.store.parquet.DirectCodecFactory.ByteBufBytesInput; -import org.apache.drill.exec.store.parquet.DirectCodecFactory.DirectBytesDecompressor; import org.apache.drill.exec.store.parquet.ParquetFormatPlugin; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import parquet.bytes.BytesInput; -import parquet.column.Dictionary; -import parquet.column.Encoding; -import parquet.column.ValuesType; -import parquet.column.page.DictionaryPage; -import parquet.column.statistics.Statistics; -import parquet.column.values.ValuesReader; -import parquet.column.values.dictionary.DictionaryValuesReader; -import parquet.format.PageHeader; -import parquet.format.PageType; -import parquet.format.Util; -import parquet.format.converter.ParquetMetadataConverter; -import parquet.hadoop.metadata.ColumnChunkMetaData; -import parquet.hadoop.metadata.CompressionCodecName; -import parquet.schema.PrimitiveType; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.column.Dictionary; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.ValuesType; +import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.column.values.ValuesReader; +import 
org.apache.parquet.column.values.dictionary.DictionaryValuesReader; +import org.apache.parquet.format.PageHeader; +import org.apache.parquet.format.PageType; +import org.apache.parquet.format.Util; +import org.apache.parquet.format.converter.ParquetMetadataConverter; +import org.apache.parquet.hadoop.CodecFactory; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.schema.PrimitiveType; import com.google.common.base.Preconditions; @@ -102,7 +101,7 @@ final class PageReader { // These need to be held throughout reading of the entire column chunk List allocatedDictionaryBuffers; - private final DirectCodecFactory codecFactory; + private final CodecFactory codecFactory; PageReader(ColumnReader parentStatus, FileSystem fs, Path path, ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException{ @@ -129,40 +128,49 @@ private void loadDictionaryIfExists(final ColumnReader parentStatus, f.seek(columnChunkMetaData.getDictionaryPageOffset()); final PageHeader pageHeader = Util.readPageHeader(f); assert pageHeader.type == PageType.DICTIONARY_PAGE; + readDictionaryPage(pageHeader, parentStatus); + } + } - final DrillBuf dictionaryData = allocateDictionaryBuffer(pageHeader.getUncompressed_page_size()); - - if (parentColumnReader.columnChunkMetaData.getCodec() == CompressionCodecName.UNCOMPRESSED) { - dataReader.loadPage(dictionaryData, pageHeader.compressed_page_size); - } else { - final DrillBuf compressedData = allocateTemporaryBuffer(pageHeader.compressed_page_size); - try { - dataReader.loadPage(compressedData, pageHeader.compressed_page_size); - DirectBytesDecompressor decompressor = codecFactory.getDecompressor(parentColumnReader.columnChunkMetaData - .getCodec()); - decompressor.decompress( - compressedData, - pageHeader.compressed_page_size, - dictionaryData, - pageHeader.getUncompressed_page_size()); - - } finally { - compressedData.release(); - } - } + private void readDictionaryPage(final PageHeader pageHeader, + final ColumnReader parentStatus) throws IOException { + int compressedSize = pageHeader.getCompressed_page_size(); + int uncompressedSize = pageHeader.getUncompressed_page_size(); + + final DrillBuf dictionaryData = allocateDictionaryBuffer(uncompressedSize); + readPage(compressedSize, uncompressedSize, dictionaryData); + + DictionaryPage page = new DictionaryPage( + asBytesInput(dictionaryData, 0, uncompressedSize), + pageHeader.uncompressed_page_size, + pageHeader.dictionary_page_header.num_values, + valueOf(pageHeader.dictionary_page_header.encoding.name())); + + this.dictionary = page.getEncoding().initDictionary(parentStatus.columnDescriptor, page); + } - DictionaryPage page = new DictionaryPage( - asBytesInput(dictionaryData, 0, pageHeader.uncompressed_page_size), - pageHeader.uncompressed_page_size, - pageHeader.dictionary_page_header.num_values, - parquet.column.Encoding.valueOf(pageHeader.dictionary_page_header.encoding.name()) - ); - this.dictionary = page.getEncoding().initDictionary(parentStatus.columnDescriptor, page); + public void readPage(int compressedSize, int uncompressedSize, DrillBuf dest) throws IOException { + if (parentColumnReader.columnChunkMetaData.getCodec() == CompressionCodecName.UNCOMPRESSED) { + dataReader.loadPage(dest, compressedSize); + } else { + final DrillBuf compressedData = allocateTemporaryBuffer(compressedSize); + try { + dataReader.loadPage(compressedData, compressedSize); + 
codecFactory.getDecompressor(parentColumnReader.columnChunkMetaData + .getCodec()).decompress( + compressedData.nioBuffer(0, compressedSize), + compressedSize, + dest.nioBuffer(0, uncompressedSize), + uncompressedSize); + + } finally { + compressedData.release(); + } } } public static BytesInput asBytesInput(DrillBuf buf, int offset, int length) throws IOException { - return new ByteBufBytesInput(buf); + return BytesInput.from(buf.nioBuffer(offset, length), 0, length); } /** @@ -190,50 +198,16 @@ public boolean next() throws IOException { do { pageHeader = dataReader.readPageHeader(); if (pageHeader.getType() == PageType.DICTIONARY_PAGE) { - - //TODO: Handle buffer allocation exception - DrillBuf uncompressedData = allocateDictionaryBuffer(pageHeader.getUncompressed_page_size()); - if( parentColumnReader.columnChunkMetaData.getCodec()== CompressionCodecName.UNCOMPRESSED) { - dataReader.loadPage(uncompressedData, pageHeader.compressed_page_size); - }else{ - final DrillBuf compressedData = allocateTemporaryBuffer(pageHeader.compressed_page_size); - try{ - dataReader.loadPage(compressedData, pageHeader.compressed_page_size); - codecFactory.getDecompressor(parentColumnReader.columnChunkMetaData.getCodec()).decompress( - compressedData, - pageHeader.compressed_page_size, - uncompressedData, - pageHeader.getUncompressed_page_size()); - } finally { - compressedData.release(); - } - } - DictionaryPage page = new DictionaryPage( - asBytesInput(uncompressedData, 0, pageHeader.uncompressed_page_size), - pageHeader.uncompressed_page_size, - pageHeader.dictionary_page_header.num_values, - parquet.column.Encoding.valueOf(pageHeader.dictionary_page_header.encoding.name()) - ); - this.dictionary = page.getEncoding().initDictionary(parentColumnReader.columnDescriptor, page); + readDictionaryPage(pageHeader, parentColumnReader); } } while (pageHeader.getType() == PageType.DICTIONARY_PAGE); //TODO: Handle buffer allocation exception allocatePageData(pageHeader.getUncompressed_page_size()); - - if(parentColumnReader.columnChunkMetaData.getCodec()==CompressionCodecName.UNCOMPRESSED) { - dataReader.loadPage(pageData, pageHeader.compressed_page_size); - }else{ - final DrillBuf compressedData = allocateTemporaryBuffer(pageHeader.compressed_page_size); - dataReader.loadPage(compressedData, pageHeader.compressed_page_size); - codecFactory.getDecompressor(parentColumnReader.columnChunkMetaData.getCodec()).decompress( - compressedData, - pageHeader.compressed_page_size, - pageData, - pageHeader.getUncompressed_page_size()); - compressedData.release(); - } + int compressedSize = pageHeader.getCompressed_page_size(); + int uncompressedSize = pageHeader.getUncompressed_page_size(); + readPage(compressedSize, uncompressedSize, pageData); currentPageCount = pageHeader.data_page_header.num_values; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java index 4969dc04f0b..a8e6c2ce4e0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java @@ -17,7 +17,6 @@ ******************************************************************************/ package org.apache.drill.exec.store.parquet.columnreaders; -import io.netty.buffer.DrillBuf; import 
org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.vector.BigIntVector; import org.apache.drill.exec.vector.Decimal18Vector; @@ -25,15 +24,14 @@ import org.apache.drill.exec.vector.Float4Vector; import org.apache.drill.exec.vector.Float8Vector; import org.apache.drill.exec.vector.IntVector; -import org.apache.drill.exec.vector.NullableVarBinaryVector; import org.apache.drill.exec.vector.TimeStampVector; import org.apache.drill.exec.vector.TimeVector; import org.apache.drill.exec.vector.VarBinaryVector; -import parquet.column.ColumnDescriptor; -import parquet.format.SchemaElement; -import parquet.hadoop.metadata.ColumnChunkMetaData; -import parquet.io.api.Binary; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.format.SchemaElement; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.io.api.Binary; public class ParquetFixedWidthDictionaryReaders { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java index 3a00a4c4426..f42996b26fb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java @@ -39,7 +39,6 @@ import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.MaterializedField.Key; import org.apache.drill.exec.store.AbstractRecordReader; -import org.apache.drill.exec.store.parquet.DirectCodecFactory; import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.NullableIntVector; import org.apache.drill.exec.vector.complex.RepeatedValueVector; @@ -47,15 +46,16 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import parquet.column.ColumnDescriptor; -import parquet.format.FileMetaData; -import parquet.format.SchemaElement; -import parquet.format.converter.ParquetMetadataConverter; -import parquet.hadoop.ParquetFileWriter; -import parquet.hadoop.metadata.BlockMetaData; -import parquet.hadoop.metadata.ColumnChunkMetaData; -import parquet.hadoop.metadata.ParquetMetadata; -import parquet.schema.PrimitiveType; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.format.FileMetaData; +import org.apache.parquet.format.SchemaElement; +import org.apache.parquet.format.converter.ParquetMetadataConverter; +import org.apache.parquet.hadoop.CodecFactory; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.PrimitiveType; import com.google.common.collect.Lists; @@ -101,7 +101,7 @@ public class ParquetRecordReader extends AbstractRecordReader { // records specified in the row group metadata long mockRecordsRead; - private final DirectCodecFactory codecFactory; + private final CodecFactory codecFactory; int rowGroupIndex; long totalRecordsRead; private final FragmentContext fragmentContext; @@ -110,7 +110,7 @@ public ParquetRecordReader(FragmentContext fragmentContext, String path, int rowGroupIndex, FileSystem fs, - DirectCodecFactory codecFactory, + CodecFactory codecFactory, ParquetMetadata footer, List columns) throws 
ExecutionSetupException { this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, path, rowGroupIndex, fs, codecFactory, footer, @@ -123,7 +123,7 @@ public ParquetRecordReader( String path, int rowGroupIndex, FileSystem fs, - DirectCodecFactory codecFactory, + CodecFactory codecFactory, ParquetMetadata footer, List columns) throws ExecutionSetupException { this.hadoopPath = new Path(path); @@ -136,7 +136,7 @@ public ParquetRecordReader( setColumns(columns); } - public DirectCodecFactory getCodecFactory() { + public CodecFactory getCodecFactory() { return codecFactory; } @@ -471,7 +471,7 @@ public void close() { columnStatuses = null; } - codecFactory.close(); + codecFactory.release(); if (varLengthReader != null) { for (final VarLengthColumn r : varLengthReader.columns) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java index a33e616609f..b6d1a729d5c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java @@ -17,21 +17,17 @@ ******************************************************************************/ package org.apache.drill.exec.store.parquet.columnreaders; -import static parquet.Preconditions.checkArgument; - -import org.apache.drill.common.exceptions.DrillRuntimeException; -import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.common.util.CoreDecimalUtility; -import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.store.parquet.ParquetReaderUtility; -import org.apache.drill.exec.work.ExecErrorConstants; -import parquet.format.ConvertedType; -import parquet.format.SchemaElement; -import parquet.schema.PrimitiveType; +import org.apache.parquet.format.ConvertedType; +import org.apache.parquet.format.SchemaElement; +import org.apache.parquet.schema.PrimitiveType; + +import static com.google.common.base.Preconditions.checkArgument; public class ParquetToDrillTypeConverter { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumn.java index 8e72bffbedc..a62e8c5b050 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumn.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumn.java @@ -22,11 +22,11 @@ import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.vector.ValueVector; -import parquet.column.ColumnDescriptor; -import parquet.format.Encoding; -import parquet.format.SchemaElement; -import parquet.hadoop.metadata.ColumnChunkMetaData; -import parquet.io.api.Binary; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.format.Encoding; +import org.apache.parquet.format.SchemaElement; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.io.api.Binary; public abstract class VarLengthColumn extends ColumnReader { static final org.slf4j.Logger logger 
= org.slf4j.LoggerFactory.getLogger(VarLengthColumn.class); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java index fe5266c6bcb..ba126d2fabf 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java @@ -20,6 +20,7 @@ import io.netty.buffer.DrillBuf; import java.math.BigDecimal; +import java.nio.ByteBuffer; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.expr.holders.Decimal28SparseHolder; @@ -34,9 +35,9 @@ import org.apache.drill.exec.vector.VarBinaryVector; import org.apache.drill.exec.vector.VarCharVector; -import parquet.column.ColumnDescriptor; -import parquet.format.SchemaElement; -import parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.format.SchemaElement; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; public class VarLengthColumnReaders { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VarLengthColumnReaders.class); @@ -222,7 +223,8 @@ public boolean setSafe(int index, DrillBuf value, int start, int length) { } if (usingDictionary) { - mutator.setSafe(index, currDictValToWrite.toByteBuffer(), 0, currDictValToWrite.length()); + ByteBuffer buf = currDictValToWrite.toByteBuffer(); + mutator.setSafe(index, buf, buf.position(), currDictValToWrite.length()); } else { mutator.setSafe(index, 1, start, start + length, value); } @@ -257,7 +259,8 @@ public boolean setSafe(int index, DrillBuf value, int start, int length) { if (usingDictionary) { currDictValToWrite = pageReader.dictionaryValueReader.readBytes(); - mutator.setSafe(index, currDictValToWrite.toByteBuffer(), 0, currDictValToWrite.length()); + ByteBuffer buf = currDictValToWrite.toByteBuffer(); + mutator.setSafe(index, buf, buf.position(), currDictValToWrite.length()); } else { mutator.setSafe(index, start, start + length, value); } @@ -294,8 +297,8 @@ public boolean setSafe(int index, DrillBuf value, int start, int length) { } if (usingDictionary) { - mutator.setSafe(index, currDictValToWrite.toByteBuffer(), 0, - currDictValToWrite.length()); + ByteBuffer buf = currDictValToWrite.toByteBuffer(); + mutator.setSafe(index, buf, buf.position(), currDictValToWrite.length()); } else { mutator.setSafe(index, 1, start, start + length, value); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java index 1e14a3e15c8..6a86cea424a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java @@ -25,11 +25,11 @@ import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.VariableWidthVector; -import parquet.column.ColumnDescriptor; -import parquet.format.Encoding; -import parquet.format.SchemaElement; -import parquet.hadoop.metadata.ColumnChunkMetaData; -import parquet.io.api.Binary; +import org.apache.parquet.column.ColumnDescriptor; +import 
org.apache.parquet.format.Encoding; +import org.apache.parquet.format.SchemaElement; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.io.api.Binary; public abstract class VarLengthValuesColumn extends VarLengthColumn { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java index 6b8154af8d8..5bc8ad227fa 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java @@ -67,17 +67,17 @@ import org.apache.drill.exec.vector.complex.writer.VarCharWriter; import org.joda.time.DateTimeUtils; -import parquet.io.api.Binary; -import parquet.io.api.Converter; -import parquet.io.api.GroupConverter; -import parquet.io.api.PrimitiveConverter; -import parquet.schema.DecimalMetadata; -import parquet.schema.GroupType; -import parquet.schema.MessageType; -import parquet.schema.OriginalType; -import parquet.schema.PrimitiveType; -import parquet.schema.Type; -import parquet.schema.Type.Repetition; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.io.api.Converter; +import org.apache.parquet.io.api.GroupConverter; +import org.apache.parquet.io.api.PrimitiveConverter; +import org.apache.parquet.schema.DecimalMetadata; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Type.Repetition; import com.google.common.collect.Lists; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java index 01a98537f53..9e6919bb801 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java @@ -42,7 +42,7 @@ import org.apache.drill.exec.record.MaterializedField.Key; import org.apache.drill.exec.store.AbstractRecordReader; import org.apache.drill.exec.store.dfs.DrillFileSystem; -import org.apache.drill.exec.store.parquet.DirectCodecFactory; +import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator; import org.apache.drill.exec.store.parquet.RowGroupReadEntry; import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.NullableIntVector; @@ -51,17 +51,19 @@ import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter; import org.apache.hadoop.fs.Path; -import parquet.column.ColumnDescriptor; -import parquet.common.schema.ColumnPath; -import parquet.hadoop.ColumnChunkIncReadStore; -import parquet.hadoop.metadata.BlockMetaData; -import parquet.hadoop.metadata.ColumnChunkMetaData; -import parquet.hadoop.metadata.ParquetMetadata; -import parquet.io.ColumnIOFactory; -import parquet.io.MessageColumnIO; -import parquet.schema.GroupType; -import parquet.schema.MessageType; -import parquet.schema.Type; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.hadoop.CodecFactory; +import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.io.RecordReader; +import 
org.apache.parquet.hadoop.ColumnChunkIncReadStore; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.io.ColumnIOFactory; +import org.apache.parquet.io.MessageColumnIO; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Type; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -80,7 +82,7 @@ public class DrillParquetReader extends AbstractRecordReader { private RowGroupReadEntry entry; private VectorContainerWriter writer; private ColumnChunkIncReadStore pageReadStore; - private parquet.io.RecordReader recordReader; + private RecordReader recordReader; private DrillParquetRecordMaterializer recordMaterializer; private int recordCount; private List primitiveVectors; @@ -246,7 +248,8 @@ public void setup(OperatorContext context, OutputMutator output) throws Executio recordCount = (int) blockMetaData.getRowCount(); pageReadStore = new ColumnChunkIncReadStore(recordCount, - new DirectCodecFactory(fileSystem.getConf(), operatorContext.getAllocator()), operatorContext.getAllocator(), + CodecFactory.createDirectCodecFactory(fileSystem.getConf(), + new ParquetDirectByteBufferAllocator(operatorContext.getAllocator()), 0), operatorContext.getAllocator(), fileSystem, filePath); for (String[] path : schema.getPaths()) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java index a80eb57a267..6b7edc44e01 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java @@ -22,9 +22,9 @@ import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; -import parquet.io.api.GroupConverter; -import parquet.io.api.RecordMaterializer; -import parquet.schema.MessageType; +import org.apache.parquet.io.api.GroupConverter; +import org.apache.parquet.io.api.RecordMaterializer; +import org.apache.parquet.schema.MessageType; import java.util.Collection; import java.util.List; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java index 22bd7df77f3..8ade25caefc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.List; +import com.fasterxml.jackson.annotation.JsonCreator; import com.google.common.collect.Lists; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; @@ -35,8 +36,6 @@ import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; import org.apache.drill.exec.store.StoragePluginRegistry; -import parquet.org.codehaus.jackson.annotate.JsonCreator; - import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; diff --git 
a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java similarity index 85% rename from exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java rename to exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java index 4559083f608..28f63902d9b 100644 --- a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java +++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package parquet.hadoop; +package org.apache.parquet.hadoop; import io.netty.buffer.ByteBuf; @@ -29,42 +29,42 @@ import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.OutOfMemoryRuntimeException; -import org.apache.drill.exec.store.parquet.DirectCodecFactory; +import org.apache.drill.exec.store.parquet.ColumnDataReader; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import parquet.bytes.BytesInput; -import parquet.column.ColumnDescriptor; -import parquet.column.page.DataPage; -import parquet.column.page.DataPageV1; -import parquet.column.page.DataPageV2; -import parquet.column.page.DictionaryPage; -import parquet.column.page.PageReadStore; -import parquet.column.page.PageReader; -import parquet.format.DataPageHeaderV2; -import parquet.format.PageHeader; -import parquet.format.Util; -import parquet.format.converter.ParquetMetadataConverter; -import parquet.hadoop.CodecFactory.BytesDecompressor; -import parquet.hadoop.metadata.ColumnChunkMetaData; -import parquet.hadoop.util.CompatibilityUtil; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.page.DataPage; +import org.apache.parquet.column.page.DataPageV1; +import org.apache.parquet.column.page.DataPageV2; +import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.column.page.PageReader; +import org.apache.parquet.format.DataPageHeaderV2; +import org.apache.parquet.format.PageHeader; +import org.apache.parquet.format.Util; +import org.apache.parquet.format.converter.ParquetMetadataConverter; +import org.apache.parquet.hadoop.CodecFactory.BytesDecompressor; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.util.CompatibilityUtil; -import static parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics; +import static org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics; public class ColumnChunkIncReadStore implements PageReadStore { private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter(); - private DirectCodecFactory codecFactory; + private CodecFactory codecFactory; private BufferAllocator allocator; private FileSystem fs; private Path path; private long rowCount; private List streams = new ArrayList(); - public ColumnChunkIncReadStore(long rowCount, DirectCodecFactory codecFactory, BufferAllocator allocator, + public ColumnChunkIncReadStore(long rowCount, CodecFactory codecFactory, BufferAllocator allocator, FileSystem fs, Path path) { this.codecFactory = codecFactory; this.allocator = allocator; @@ 
-88,7 +88,7 @@ public class ColumnChunkIncPageReader implements PageReader { private ByteBuf lastPage; - public ColumnChunkIncPageReader(ColumnChunkMetaData metaData, ColumnDescriptor columnDescriptor, FSDataInputStream in) { + public ColumnChunkIncPageReader(ColumnChunkMetaData metaData, ColumnDescriptor columnDescriptor, FSDataInputStream in) throws IOException { this.metaData = metaData; this.columnDescriptor = columnDescriptor; this.size = metaData.getTotalSize(); @@ -165,8 +165,9 @@ public DataPage readPage() { ByteBuf buf = allocator.buffer(pageHeader.compressed_page_size); lastPage = buf; ByteBuffer buffer = buf.nioBuffer(0, pageHeader.compressed_page_size); - while (buffer.remaining() > 0) { - CompatibilityUtil.getBuf(in, buffer, pageHeader.compressed_page_size); + int lengthLeftToRead = pageHeader.compressed_page_size; + while (lengthLeftToRead > 0) { + lengthLeftToRead -= CompatibilityUtil.getBuf(in, buffer, lengthLeftToRead); } return new DataPageV1( decompressor.decompress(BytesInput.from(buffer, 0, pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size()), @@ -183,8 +184,9 @@ public DataPage readPage() { buf = allocator.buffer(pageHeader.compressed_page_size); lastPage = buf; buffer = buf.nioBuffer(0, pageHeader.compressed_page_size); - while (buffer.remaining() > 0) { - CompatibilityUtil.getBuf(in, buffer, pageHeader.compressed_page_size); + lengthLeftToRead = pageHeader.compressed_page_size; + while (lengthLeftToRead > 0) { + lengthLeftToRead -= CompatibilityUtil.getBuf(in, buffer, lengthLeftToRead); } DataPageHeaderV2 dataHeaderV2 = pageHeader.getData_page_header_v2(); int dataSize = compressedPageSize - dataHeaderV2.getRepetition_levels_byte_length() - dataHeaderV2.getDefinition_levels_byte_length(); diff --git a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java similarity index 70% rename from exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java rename to exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java index 743d18549ec..564a0a46846 100644 --- a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java +++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java @@ -15,36 +15,33 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package parquet.hadoop; +package org.apache.parquet.hadoop; import java.io.IOException; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator; -import parquet.column.page.PageWriteStore; -import parquet.hadoop.CodecFactory.BytesCompressor; -import parquet.schema.MessageType; +import org.apache.parquet.column.page.PageWriteStore; +import org.apache.parquet.hadoop.CodecFactory.BytesCompressor; +import org.apache.parquet.schema.MessageType; public class ColumnChunkPageWriteStoreExposer { public static ColumnChunkPageWriteStore newColumnChunkPageWriteStore( OperatorContext oContext, BytesCompressor compressor, - MessageType schema, - int initialSize + MessageType schema ) { - return new ColumnChunkPageWriteStore(compressor, schema, initialSize, new ParquetDirectByteBufferAllocator(oContext)); + return new ColumnChunkPageWriteStore(compressor, schema, new ParquetDirectByteBufferAllocator(oContext)); } public static void flushPageStore(PageWriteStore pageStore, ParquetFileWriter w) throws IOException { ((ColumnChunkPageWriteStore) pageStore).flushToFileWriter(w); } - public static void close(PageWriteStore pageStore) throws IOException { - ((ColumnChunkPageWriteStore) pageStore).close(); - - } - + // TODO(jaltekruse) - review: this class used to expose a method for closing a pageStore. + // The rebased parquet code no longer includes that close method; confirm it is not needed + // and that nothing was lost in the merge. } diff --git a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java index 60116e2c466..352f487de90 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java +++ b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java @@ -166,7 +166,11 @@ private void compareMergedVectors(Map expectedRecords, Map expectedRecords, Map expectedRecords, Map actualRecords, int offset) { + StringBuilder expected = new StringBuilder(); + StringBuilder actual = new StringBuilder(); + expected.append("Expected Records near verification failure:\n"); + actual.append("Actual Records near verification failure:\n"); + int firstRecordToPrint = Math.max(0, offset - 5); + List expectedValuesInFirstColumn = expectedRecords.get(expectedRecords.keySet().iterator().next()); + List actualValuesInFirstColumn = actualRecords.get(actualRecords.keySet().iterator().next()); + int numberOfRecordsToPrint = Math.min(Math.min(10, expectedValuesInFirstColumn.size()), actualValuesInFirstColumn.size()); + for (int i = firstRecordToPrint; i < numberOfRecordsToPrint; i++) { + expected.append("Record Number: ").append(i).append(" { "); + actual.append("Record Number: ").append(i).append(" { "); + for (String s : actualRecords.keySet()) { + List actualValues = actualRecords.get(s); + actual.append(s).append(" : ").append(actualValues.get(i)).append(","); + } + for (String s : expectedRecords.keySet()) { + List expectedValues = expectedRecords.get(s); + expected.append(s).append(" : ").append(expectedValues.get(i)).append(","); + } + expected.append(" }\n"); + actual.append(" }\n"); + } + + return expected.append("\n\n").append(actual).toString(); + + } + private Map addToHyperVectorMap(List records, RecordBatchLoader loader, BatchSchema schema) throws SchemaChangeException, UnsupportedEncodingException { // TODO - this does not handle schema changes diff --git a/exec/java-exec/src/test/java/org/apache/drill/ParquetSchemaMerge.java 
b/exec/java-exec/src/test/java/org/apache/drill/ParquetSchemaMerge.java index f9cfdb5b6f5..204eeb0a0c5 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/ParquetSchemaMerge.java +++ b/exec/java-exec/src/test/java/org/apache/drill/ParquetSchemaMerge.java @@ -17,11 +17,11 @@ */ package org.apache.drill; -import parquet.schema.GroupType; -import parquet.schema.MessageType; -import parquet.schema.PrimitiveType; -import parquet.schema.PrimitiveType.PrimitiveTypeName; -import parquet.schema.Type.Repetition; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Type.Repetition; public class ParquetSchemaMerge { public static void main(String[] args) { diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestFrameworkTest.java b/exec/java-exec/src/test/java/org/apache/drill/TestFrameworkTest.java index deeb7cbca75..9d9c1a7192b 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestFrameworkTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestFrameworkTest.java @@ -23,6 +23,7 @@ import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.Types; import org.apache.drill.exec.planner.physical.PlannerSettings; +import org.hamcrest.CoreMatchers; import org.junit.Test; import java.math.BigDecimal; @@ -30,6 +31,7 @@ import java.util.Map; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; // TODO - update framework to remove any dependency on the Drill engine for reading baseline result sets // currently using it with the assumption that the csv and json readers are well tested, and handling diverse @@ -237,7 +239,8 @@ public void testCSVVerificationOfTypes() throws Throwable { .baselineColumns("employee_id", "first_name", "last_name") .build().run(); } catch (Exception ex) { - assertEquals("at position 0 column '`employee_id`' mismatched values, expected: 12(Integer) but received 12(Long)", ex.getMessage()); + assertThat(ex.getMessage(), CoreMatchers.containsString( + "at position 0 column '`employee_id`' mismatched values, expected: 12(Integer) but received 12(Long)")); // this indicates successful completion of the test return; } @@ -254,7 +257,8 @@ public void testCSVVerificationOfOrder_checkFailure() throws Throwable { .baselineColumns("employee_id", "first_name", "last_name") .build().run(); } catch (Exception ex) { - assertEquals("at position 0 column '`first_name`' mismatched values, expected: Jewel(String) but received Peggy(String)", ex.getMessage()); + assertThat(ex.getMessage(), CoreMatchers.containsString( + "at position 0 column '`first_name`' mismatched values, expected: Jewel(String) but received Peggy(String)")); // this indicates successful completion of the test return; } @@ -319,9 +323,9 @@ public void testRepeatedColumnMatching() throws Exception { .optionSettingQueriesForBaseline("alter system set `store.json.all_text_mode` = true") .build().run(); } catch (Exception ex) { - assertEquals("at position 1 column '`field_1`' mismatched values, " + - "expected: [\"5\",\"2\",\"3\",\"4\",\"1\",\"2\"](JsonStringArrayList) but received [\"5\"](JsonStringArrayList)", - ex.getMessage()); + assertThat(ex.getMessage(), CoreMatchers.containsString( + "at position 1 column '`field_1`' mismatched values, " + + "expected: [\"5\",\"2\",\"3\",\"4\",\"1\",\"2\"](JsonStringArrayList) but received 
[\"5\"](JsonStringArrayList)")); // this indicates successful completion of the test return; } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationDisabledWithMiniDFS.java b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationDisabledWithMiniDFS.java index e01e45d04b7..df63d7e57df 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationDisabledWithMiniDFS.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationDisabledWithMiniDFS.java @@ -18,11 +18,18 @@ package org.apache.drill.exec.impersonation; import com.google.common.collect.Maps; +import org.apache.drill.exec.physical.impl.writer.TestParquetWriter; import org.apache.drill.exec.store.dfs.WorkspaceConfig; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +/** + * Note to future devs: please do not put random tests here. Make sure that they actually require + * access to a DFS instead of the local filesystem implementation used by default in the rest of + * the tests. Running this mini cluster is slow and it is best for these tests to only cover + * necessary cases. + */ public class TestImpersonationDisabledWithMiniDFS extends BaseTestImpersonation { @BeforeClass @@ -37,6 +44,38 @@ private static void createTestData() throws Exception { // Create test table in minidfs.tmp schema for use in test queries test(String.format("CREATE TABLE %s.tmp.dfsRegion AS SELECT * FROM cp.`region.json`", MINIDFS_STORAGE_PLUGIN_NAME)); + + // generate a large enough file that the DFS will not fulfill requests to read a + // page of data all at once, see notes above testReadLargeParquetFileFromDFS() + test(String.format( + "CREATE TABLE %s.tmp.large_employee AS " + + "(SELECT employee_id, full_name FROM cp.`/employee.json`) " + + "UNION ALL (SELECT employee_id, full_name FROM cp.`/employee.json`)" + + "UNION ALL (SELECT employee_id, full_name FROM cp.`/employee.json`)" + + "UNION ALL (SELECT employee_id, full_name FROM cp.`/employee.json`)" + + "UNION ALL (SELECT employee_id, full_name FROM cp.`/employee.json`)" + + "UNION ALL (SELECT employee_id, full_name FROM cp.`/employee.json`)" + + "UNION ALL (SELECT employee_id, full_name FROM cp.`/employee.json`)" + + "UNION ALL (SELECT employee_id, full_name FROM cp.`/employee.json`)", + MINIDFS_STORAGE_PLUGIN_NAME)); + } + + /** + * When working on merging the Drill fork of parquet a bug was found that only manifested when + * run on a cluster. It appears that the local implementation of the Hadoop FileSystem API + * never fails to provide all of the bytes that are requested in a single read. The API is + * designed to allow for a subset of the requested bytes to be returned, and a client can decide + * if they want to do processing on the subset that is available now before requesting the rest. + * + * For parquet's block compression of page data, we need all of the bytes. This test is here as + * a sanity check to make sure we don't accidentally introduce an issue where only a subset of the + * bytes is read, which would otherwise require testing on a cluster to exercise the full contract + * of the read method we are using. 
+ */ + @Test + public void testReadLargeParquetFileFromDFS() throws Exception { + test(String.format("USE %s", MINIDFS_STORAGE_PLUGIN_NAME)); + test("SELECT * FROM tmp.`large_employee`"); } @Test // DRILL-3037 diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java index 7c4ac1e2698..51d5d080a39 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java @@ -19,7 +19,6 @@ import java.io.File; import java.io.FileWriter; -import java.io.IOException; import java.math.BigDecimal; import java.sql.Date; @@ -33,7 +32,6 @@ import org.joda.time.DateTime; import org.joda.time.Period; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Rule; @@ -47,6 +45,28 @@ public class TestParquetWriter extends BaseTestQuery { public TemporaryFolder folder = new TemporaryFolder(); static FileSystem fs; + private String allTypesSelection = + "cast( `int_col` AS int) `int_col`, " + + "cast( `bigint_col` AS bigint) `bigint_col`, " + + // TODO(DRILL-3367) +// "cast( `decimal9_col` AS decimal(9, 4)) `decimal9_col`, " + +// "cast( `decimal18_col` AS decimal(18,9)) `decimal18_col`, " + +// "cast( `decimal28sparse_col` AS decimal(28, 14)) `decimal28sparse_col`, " + +// "cast( `decimal38sparse_col` AS decimal(38, 19)) `decimal38sparse_col`, " + + "cast( `date_col` AS date) `date_col`, " + + "cast( `timestamp_col` AS timestamp) `timestamp_col`, " + + "cast( `float4_col` AS float) `float4_col`, " + + "cast( `float8_col` AS double) `float8_col`, " + + "cast( `varbinary_col` AS varbinary(65000)) `varbinary_col`, " + + // TODO(DRILL-2297) +// "cast( `intervalyear_col` AS interval year) `intervalyear_col`, " + + "cast( `intervalday_col` AS interval day) `intervalday_col`, " + + "cast( `bit_col` AS boolean) `bit_col`, " + + " `varchar_col` `varchar_col`, " + + "cast( `time_col` AS time) `time_col` "; + + private String allTypesTable = "cp.`/parquet/alltypes.json`"; + @BeforeClass public static void initFs() throws Exception { Configuration conf = new Configuration(); @@ -61,6 +81,12 @@ public static void disableDecimalDataType() throws Exception { test(String.format("alter session set `%s` = false", PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY)); } + @Test + public void testSmallFileValueReadWrite() throws Exception { + String selection = "key"; + String inputTable = "cp.`/store/json/intData.json`"; + runTestAndValidate(selection, selection, inputTable, "smallFileTest"); + } @Test public void testSimple() throws Exception { @@ -102,6 +128,47 @@ public void testLargeFooter() throws Exception { .build().run(); } + @Test + public void testAllScalarTypes() throws Exception { + /// read once with the flat reader + runTestAndValidate(allTypesSelection, "*", allTypesTable, "donuts_json"); + + try { + // read all of the types with the complex reader + test(String.format("alter session set %s = true", ExecConstants.PARQUET_NEW_RECORD_READER)); + runTestAndValidate(allTypesSelection, "*", allTypesTable, "donuts_json"); + } finally { + test(String.format("alter session set %s = false", ExecConstants.PARQUET_NEW_RECORD_READER)); + } + } + + @Test + public void testAllScalarTypesDictionary() throws Exception { + try { + test(String.format("alter session set %s = true", 
ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING)); + /// read once with the flat reader + runTestAndValidate(allTypesSelection, "*", allTypesTable, "donuts_json"); + + // read all of the types with the complex reader + test(String.format("alter session set %s = true", ExecConstants.PARQUET_NEW_RECORD_READER)); + runTestAndValidate(allTypesSelection, "*", allTypesTable, "donuts_json"); + } finally { + test(String.format("alter session set %s = false", ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING)); + } + } + + @Test + public void testDictionaryEncoding() throws Exception { + String selection = "type"; + String inputTable = "cp.`donuts.json`"; + try { + test(String.format("alter session set %s = true", ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING)); + runTestAndValidate(selection, selection, inputTable, "donuts_json"); + } finally { + test(String.format("alter session set %s = false", ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING)); + } + } + @Test public void testComplex() throws Exception { String selection = "*"; diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestDirectCodecFactory.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestDirectCodecFactory.java deleted file mode 100644 index 004a8d00f61..00000000000 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestDirectCodecFactory.java +++ /dev/null @@ -1,156 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.drill.exec.store; - -import io.netty.buffer.DrillBuf; - -import java.nio.ByteBuffer; -import java.util.Random; - -import org.apache.drill.common.DeferredException; -import org.apache.drill.common.config.DrillConfig; -import org.apache.drill.exec.ExecTest; -import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.memory.RootAllocatorFactory; -import org.apache.drill.exec.store.parquet.DirectCodecFactory; -import org.apache.drill.exec.store.parquet.DirectCodecFactory.ByteBufBytesInput; -import org.apache.drill.exec.store.parquet.DirectCodecFactory.DirectBytesDecompressor; -import org.apache.hadoop.conf.Configuration; -import org.junit.Assert; -import org.junit.Test; - -import parquet.bytes.BytesInput; -import parquet.hadoop.CodecFactory.BytesCompressor; -import parquet.hadoop.metadata.CompressionCodecName; - -public class TestDirectCodecFactory extends ExecTest { - //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestDirectCodecFactory.class); - private final DrillConfig drillConfig = DrillConfig.create(); - - private static enum Decompression { - ON_HEAP, OFF_HEAP, DRILLBUF - } - - private void test(int size, CompressionCodecName codec, boolean useOnHeapCompression, Decompression decomp) throws Exception { - DrillBuf rawBuf = null; - DrillBuf outBuf = null; - try (final BufferAllocator allocator = RootAllocatorFactory.newRoot(drillConfig); - final DirectCodecFactory codecFactory = new DirectCodecFactory(new Configuration(), allocator)) { - try { - rawBuf = allocator.buffer(size); - final byte[] rawArr = new byte[size]; - outBuf = allocator.buffer(size * 2); - final Random r = new Random(); - final byte[] random = new byte[1024]; - int pos = 0; - while (pos < size) { - r.nextBytes(random); - rawBuf.writeBytes(random); - System.arraycopy(random, 0, rawArr, pos, random.length); - pos += random.length; - } - - final BytesCompressor c = codecFactory.getCompressor(codec, 64 * 1024); - final DirectBytesDecompressor d = codecFactory.getDecompressor(codec); - - final BytesInput compressed; - if (useOnHeapCompression) { - compressed = c.compress(BytesInput.from(rawArr)); - } else { - compressed = c.compress(new ByteBufBytesInput(rawBuf)); - } - - switch (decomp) { - case DRILLBUF: { - final ByteBuffer buf = compressed.toByteBuffer(); - final DrillBuf b = allocator.buffer(buf.capacity()); - try { - b.writeBytes(buf); - d.decompress(b, (int) compressed.size(), outBuf, size); - for (int i = 0; i < size; i++) { - Assert.assertTrue("Data didn't match at " + i, outBuf.getByte(i) == rawBuf.getByte(i)); - } - } finally { - b.release(); - } - break; - } - - case OFF_HEAP: { - final ByteBuffer buf = compressed.toByteBuffer(); - final DrillBuf b = allocator.buffer(buf.capacity()); - try { - b.writeBytes(buf); - final BytesInput input = d.decompress(new ByteBufBytesInput(b), size); - Assert.assertArrayEquals(input.toByteArray(), rawArr); - } finally { - b.release(); - } - break; - } - case ON_HEAP: { - final byte[] buf = compressed.toByteArray(); - final BytesInput input = d.decompress(BytesInput.from(buf), size); - Assert.assertArrayEquals(input.toByteArray(), rawArr); - break; - } - } - } catch (Exception e) { - final String msg = String.format( - "Failure while testing Codec: %s, OnHeapCompressionInput: %s, Decompression Mode: %s, Data Size: %d", - codec.name(), - useOnHeapCompression, decomp.name(), size); - System.out.println(msg); - throw new RuntimeException(msg, e); - } finally { - if (rawBuf != null) { - 
rawBuf.release(); - } - if (outBuf != null) { - outBuf.release(); - } - } - } - } - - @Test - public void compressionCodecs() throws Exception { - final int[] sizes = { 4 * 1024, 1 * 1024 * 1024 }; - final boolean[] comp = { true, false }; - - try (final DeferredException ex = new DeferredException()) { - for (final int size : sizes) { - for (final boolean useOnHeapComp : comp) { - for (final Decompression decomp : Decompression.values()) { - for (final CompressionCodecName codec : CompressionCodecName.values()) { - if (codec == CompressionCodecName.LZO) { - // not installed as gpl. - continue; - } - try { - test(size, codec, useOnHeapComp, decomp); - } catch (Exception e) { - ex.addException(e); - } - } - } - } - } - } - } -} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java index 14cfd8eef45..d5f7352eafa 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java @@ -63,18 +63,19 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; +import org.apache.parquet.hadoop.CodecFactory; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; -import parquet.bytes.BytesInput; -import parquet.column.page.DataPageV1; -import parquet.column.page.PageReadStore; -import parquet.column.page.PageReader; -import parquet.hadoop.Footer; -import parquet.hadoop.ParquetFileReader; -import parquet.hadoop.metadata.ParquetMetadata; -import parquet.schema.MessageType; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.column.page.DataPageV1; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.column.page.PageReader; +import org.apache.parquet.hadoop.Footer; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.MessageType; import com.google.common.base.Charsets; import com.google.common.base.Stopwatch; @@ -638,7 +639,8 @@ public void testPerformance(@Injectable final DrillbitContext bitContext, final BufferAllocator allocator = RootAllocatorFactory.newRoot(c); for(int i = 0; i < 25; i++) { final ParquetRecordReader rr = new ParquetRecordReader(context, 256000, fileName, 0, fs, - new DirectCodecFactory(dfsConfig, allocator), f.getParquetMetadata(), columns); + CodecFactory.createDirectCodecFactory(dfsConfig, new ParquetDirectByteBufferAllocator(allocator), 0), + f.getParquetMetadata(), columns); final TestOutputMutator mutator = new TestOutputMutator(allocator); rr.setup(null, mutator); final Stopwatch watch = new Stopwatch(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java index 013ea955009..593e0dbe793 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java @@ -17,25 +17,26 @@ */ package org.apache.drill.exec.store.parquet; -import static parquet.column.Encoding.PLAIN; -import static parquet.column.Encoding.RLE; +import static org.apache.parquet.column.Encoding.PLAIN; +import static org.apache.parquet.column.Encoding.RLE; 
import java.util.HashMap; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.exec.store.ByteArrayUtil; +import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import parquet.bytes.BytesInput; -import parquet.bytes.DirectByteBufferAllocator; -import parquet.column.ColumnDescriptor; -import parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter; -import parquet.hadoop.ParquetFileWriter; -import parquet.hadoop.metadata.CompressionCodecName; -import parquet.schema.MessageType; -import parquet.schema.MessageTypeParser; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.bytes.DirectByteBufferAllocator; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.MessageTypeParser; public class TestFileGenerator { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestFileGenerator.class); @@ -185,16 +186,19 @@ public static void generateParquetFile(String filename, ParquetTestProperties pr ColumnDescriptor c1 = schema.getColumnDescription(path1); w.startColumn(c1, props.recordsPerRowGroup, codec); - int valsPerPage = (int) Math.ceil(props.recordsPerRowGroup / (float) fieldInfo.numberOfPages); + final int valsPerPage = (int) Math.ceil(props.recordsPerRowGroup / (float) fieldInfo.numberOfPages); + final int PAGE_SIZE = 1024 * 1024; // 1 MB byte[] bytes; RunLengthBitPackingHybridValuesWriter defLevels = new RunLengthBitPackingHybridValuesWriter( - MAX_EXPECTED_BIT_WIDTH_FOR_DEFINITION_LEVELS, - valsPerPage, - new DirectByteBufferAllocator()); + MAX_EXPECTED_BIT_WIDTH_FOR_DEFINITION_LEVELS, + valsPerPage, + PAGE_SIZE, + new DirectByteBufferAllocator()); RunLengthBitPackingHybridValuesWriter repLevels = new RunLengthBitPackingHybridValuesWriter( - MAX_EXPECTED_BIT_WIDTH_FOR_DEFINITION_LEVELS, - valsPerPage, - new DirectByteBufferAllocator()); + MAX_EXPECTED_BIT_WIDTH_FOR_DEFINITION_LEVELS, + valsPerPage, + PAGE_SIZE, + new DirectByteBufferAllocator()); // for variable length binary fields int bytesNeededToEncodeLength = 4; if ((int) fieldInfo.bitLength > 0) { diff --git a/exec/java-exec/src/test/resources/store/json/donuts_short.json b/exec/java-exec/src/test/resources/store/json/donuts_short.json new file mode 100644 index 00000000000..e69de29bb2d diff --git a/exec/jdbc/src/test/java/org/apache/drill/jdbc/NonClosableConnection.java b/exec/jdbc/src/test/java/org/apache/drill/jdbc/NonClosableConnection.java index d119ec89e1f..286ebf402bf 100644 --- a/exec/jdbc/src/test/java/org/apache/drill/jdbc/NonClosableConnection.java +++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/NonClosableConnection.java @@ -17,6 +17,8 @@ */ package org.apache.drill.jdbc; +import com.google.common.base.Preconditions; + import java.sql.Array; import java.sql.Blob; import java.sql.CallableStatement; @@ -36,8 +38,6 @@ import java.util.Properties; import java.util.concurrent.Executor; -import parquet.Preconditions; - /** * A connection decorator that ignores {@link Connection#close} calls. 
* diff --git a/pom.xml b/pom.xml index e799783f9cd..575eadd4f00 100644 --- a/pom.xml +++ b/pom.xml @@ -34,7 +34,7 @@ 4.11 1.7.6 2 - 1.6.0rc3-drill-r0.3 + 1.8.1-drill-r0 1.1.9-drill-r7
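Note on the ColumnChunkIncReadStore hunk above, where the page-read loop now tracks lengthLeftToRead instead of assuming one call to CompatibilityUtil.getBuf fills the buffer: the standalone sketch below restates the same read-fully pattern against the plain java.io.InputStream contract, which, like the Hadoop FileSystem API described in the TestImpersonationDisabledWithMiniDFS javadoc, may return fewer bytes than requested on any single call. The class and method names are illustrative only; they are not part of the patch.

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    // Illustration only: loop until the requested number of bytes has arrived,
    // because a single read() is allowed to return fewer bytes than asked for.
    public class ReadFullySketch {
      static void readFully(InputStream in, byte[] dest) throws IOException {
        int filled = 0;
        while (filled < dest.length) {
          int read = in.read(dest, filled, dest.length - filled);
          if (read < 0) {
            throw new IOException("stream ended after " + filled + " of " + dest.length + " bytes");
          }
          filled += read;
        }
      }

      public static void main(String[] args) throws IOException {
        byte[] compressedPage = new byte[64 * 1024];   // stand-in for a compressed parquet page
        readFully(new ByteArrayInputStream(compressedPage), new byte[compressedPage.length]);
        System.out.println("read " + compressedPage.length + " bytes");
      }
    }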
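Note on the dictionary read path in NullableFixedByteAlignedReaders and VarLengthColumnReaders above, where mutator.setSafe now starts from buf.position() rather than a hard-coded 0: the sketch below uses only java.nio and a made-up prefix/payload layout to show why a ByteBuffer that is a view over shared backing storage has to be read from its current position.

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    // Illustration only: copying from index 0 instead of the buffer's position
    // returns the wrong bytes when the buffer is a view into shared storage.
    public class BufferPositionSketch {
      public static void main(String[] args) {
        byte[] backing = "prefixPAYLOAD".getBytes(StandardCharsets.US_ASCII);
        ByteBuffer value = ByteBuffer.wrap(backing);
        value.position(6);                    // the value starts after a 6-byte prefix
        int length = value.remaining();       // 7 bytes: "PAYLOAD"

        byte[] fromZero = new byte[length];
        ByteBuffer.wrap(backing).get(fromZero, 0, length);   // reads "prefixP"

        byte[] fromPosition = new byte[length];
        value.duplicate().get(fromPosition, 0, length);      // reads "PAYLOAD"

        System.out.println(new String(fromZero, StandardCharsets.US_ASCII));
        System.out.println(new String(fromPosition, StandardCharsets.US_ASCII));
      }
    }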
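Note on the removal of DirectCodecFactory: the reader call sites above converge on one construction and teardown pattern, restated in isolation below. This is a sketch, not part of the patch; the Configuration and BufferAllocator arguments are placeholders supplied by the caller, the page-size argument of 0 simply mirrors the call sites in this patch, and compiling it assumes the patched Drill classes and the org.apache.parquet 1.8 artifacts are on the classpath.

    import org.apache.drill.exec.memory.BufferAllocator;
    import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.parquet.hadoop.CodecFactory;

    // Sketch of the codec-factory lifecycle the patch standardizes on.
    final class CodecFactoryLifecycleSketch {

      // Build the direct codec factory the readers now expect.
      static CodecFactory open(Configuration conf, BufferAllocator allocator) {
        return CodecFactory.createDirectCodecFactory(
            conf, new ParquetDirectByteBufferAllocator(allocator), 0);
      }

      // Tear it down with release(); this replaces the old DirectCodecFactory.close().
      static void close(CodecFactory codecFactory) {
        codecFactory.release();
      }
    }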