diff --git a/flink-formats/flink-orc-nohive/pom.xml b/flink-formats/flink-orc-nohive/pom.xml
index 2c3cb7c00a45a..3110af5bb3da3 100644
--- a/flink-formats/flink-orc-nohive/pom.xml
+++ b/flink-formats/flink-orc-nohive/pom.xml
@@ -82,6 +82,18 @@ under the License.
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-common</artifactId>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-hdfs</artifactId>
+			<scope>provided</scope>
+		</dependency>
+
diff --git a/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/OrcNoHiveBulkWriterFactory.java b/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/OrcNoHiveBulkWriterFactory.java
index a5a1facbe6d9a..b7a845357b0de 100644
--- a/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/OrcNoHiveBulkWriterFactory.java
+++ b/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/OrcNoHiveBulkWriterFactory.java
@@ -20,7 +20,7 @@
 import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.orc.nohive.writer.NoHivePhysicalWriterImpl;
+import org.apache.flink.orc.writer.HadoopNoCloseStream;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LocalZonedTimestampType;
@@ -31,7 +31,9 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.orc.OrcFile;
 import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.PhysicalFsWriter;
 import org.apache.orc.impl.WriterImpl;
+import org.apache.orc.impl.writer.WriterEncryptionVariant;
 import org.apache.orc.storage.common.type.HiveDecimal;
 import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
 import org.apache.orc.storage.ql.exec.vector.ColumnVector;
@@ -65,7 +67,10 @@ public BulkWriter<RowData> create(FSDataOutputStream out) throws IOException {
         OrcFile.WriterOptions opts = OrcFile.writerOptions(new Properties(), conf);
         TypeDescription description = TypeDescription.fromString(schema);
         opts.setSchema(description);
-        opts.physicalWriter(new NoHivePhysicalWriterImpl(out, opts));
+
+        HadoopNoCloseStream hadoopOutputStream = new HadoopNoCloseStream(out, null);
+        opts.physicalWriter(
+                new PhysicalFsWriter(hadoopOutputStream, opts, new WriterEncryptionVariant[0]));
 
         WriterImpl writer = new WriterImpl(null, new Path("."), opts);
         VectorizedRowBatch rowBatch = description.createRowBatch();
diff --git a/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/writer/NoHivePhysicalWriterImpl.java b/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/writer/NoHivePhysicalWriterImpl.java
deleted file mode 100644
index 0734e9bf90bc3..0000000000000
--- a/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/writer/NoHivePhysicalWriterImpl.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.orc.nohive.writer; - -import org.apache.flink.core.fs.FSDataOutputStream; -import org.apache.flink.orc.writer.PhysicalWriterImpl; - -import com.google.protobuf25.CodedOutputStream; -import org.apache.orc.OrcFile; -import org.apache.orc.OrcProto; - -import java.io.IOException; - -/** - * Protobuf is relocated in orc-core-nohive, therefore method calls involving PB classes need to use - * the relocated class names here. - */ -public class NoHivePhysicalWriterImpl extends PhysicalWriterImpl { - - // relocated PB class in orc-core-nohive - private final CodedOutputStream noHiveProtobufWriter; - - public NoHivePhysicalWriterImpl(FSDataOutputStream out, OrcFile.WriterOptions opts) - throws IOException { - super(out, opts); - noHiveProtobufWriter = CodedOutputStream.newInstance(writer); - } - - @Override - protected void writeMetadata(OrcProto.Metadata metadata) throws IOException { - metadata.writeTo(noHiveProtobufWriter); - noHiveProtobufWriter.flush(); - writer.flush(); - } - - @Override - protected void writeFileFooter(OrcProto.Footer footer) throws IOException { - footer.writeTo(noHiveProtobufWriter); - noHiveProtobufWriter.flush(); - writer.flush(); - } - - @Override - protected void writeStripeFooter(OrcProto.StripeFooter footer) throws IOException { - footer.writeTo(noHiveProtobufWriter); - noHiveProtobufWriter.flush(); - writer.flush(); - } -} diff --git a/flink-formats/flink-orc-nohive/src/test/java/org/apache/flink/orc/nohive/OrcColumnarRowSplitReaderNoHiveTest.java b/flink-formats/flink-orc-nohive/src/test/java/org/apache/flink/orc/nohive/OrcColumnarRowSplitReaderNoHiveTest.java index 8f84134a1c15f..f0f0094a390d0 100644 --- a/flink-formats/flink-orc-nohive/src/test/java/org/apache/flink/orc/nohive/OrcColumnarRowSplitReaderNoHiveTest.java +++ b/flink-formats/flink-orc-nohive/src/test/java/org/apache/flink/orc/nohive/OrcColumnarRowSplitReaderNoHiveTest.java @@ -47,11 +47,11 @@ protected void prepareReadFileWithTypes(String file, int rowSize) throws IOExcep TypeDescription schema = TypeDescription.fromString( "struct<" - + "f0:float," - + "f1:double," - + "f2:timestamp," - + "f3:tinyint," - + "f4:smallint" + + "_col0:float," + + "_col1:double," + + "_col2:timestamp," + + "_col3:tinyint," + + "_col4:smallint" + ">"); org.apache.hadoop.fs.Path filePath = new org.apache.hadoop.fs.Path(file); @@ -105,7 +105,9 @@ protected OrcColumnarRowSplitReader createReader( throws IOException { return OrcNoHiveSplitReaderUtil.genPartColumnarRowReader( new Configuration(), - IntStream.range(0, fullTypes.length).mapToObj(i -> "f" + i).toArray(String[]::new), + IntStream.range(0, fullTypes.length) + .mapToObj(i -> "_col" + i) + .toArray(String[]::new), fullTypes, partitionSpec, selectedFields, diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/HadoopNoCloseStream.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/HadoopNoCloseStream.java new file mode 100644 index 0000000000000..633b3c02c1d11 --- /dev/null +++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/HadoopNoCloseStream.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.orc.writer; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * This class is designed to not close the underlying flink stream to avoid exceptions when + * checkpointing. + */ +public class HadoopNoCloseStream extends FSDataOutputStream { + + public HadoopNoCloseStream(OutputStream out, FileSystem.Statistics stats) throws IOException { + super(out, stats); + } + + @Override + public void close() throws IOException { + // Don't close the internal stream here to avoid + // Stream Closed or ClosedChannelException when Flink performs checkpoint. + // noop + } +} diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/OrcBulkWriterFactory.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/OrcBulkWriterFactory.java index 5e4310107ab45..fa857039f7a51 100644 --- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/OrcBulkWriterFactory.java +++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/OrcBulkWriterFactory.java @@ -27,7 +27,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.orc.OrcFile; +import org.apache.orc.impl.PhysicalFsWriter; import org.apache.orc.impl.WriterImpl; +import org.apache.orc.impl.writer.WriterEncryptionVariant; import java.io.IOException; import java.util.HashMap; @@ -96,7 +98,9 @@ public OrcBulkWriterFactory( @Override public BulkWriter create(FSDataOutputStream out) throws IOException { OrcFile.WriterOptions opts = getWriterOptions(); - opts.physicalWriter(new PhysicalWriterImpl(out, opts)); + HadoopNoCloseStream hadoopOutputStream = new HadoopNoCloseStream(out, null); + opts.physicalWriter( + new PhysicalFsWriter(hadoopOutputStream, opts, new WriterEncryptionVariant[0])); // The path of the Writer is not used to indicate the destination file // in this case since we have used a dedicated physical writer to write diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/PhysicalWriterImpl.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/PhysicalWriterImpl.java deleted file mode 100644 index 56e16c99c0a35..0000000000000 --- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/PhysicalWriterImpl.java +++ /dev/null @@ -1,398 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.orc.writer; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.core.fs.FSDataOutputStream; - -import com.google.protobuf.CodedOutputStream; -import org.apache.orc.CompressionCodec; -import org.apache.orc.CompressionKind; -import org.apache.orc.OrcFile; -import org.apache.orc.OrcProto; -import org.apache.orc.PhysicalWriter; -import org.apache.orc.impl.HadoopShims; -import org.apache.orc.impl.OrcCodecPool; -import org.apache.orc.impl.OutStream; -import org.apache.orc.impl.StreamName; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; - -import static org.apache.orc.impl.WriterImpl.getEstimatedBufferSize; - -/** - * A slightly customised clone of {@link org.apache.orc.impl.PhysicalFsWriter}. - * - *

Whereas PhysicalFsWriter implementation works on the basis of a Path, this implementation - * leverages Flink's {@link FSDataOutputStream} to write the compressed data. - * - *

NOTE: If the ORC dependency version is updated, this file may have to be updated as well to be - * in sync with the new version's PhysicalFsWriter. - */ -@Internal -public class PhysicalWriterImpl implements PhysicalWriter { - - private static final Logger LOG = LoggerFactory.getLogger(PhysicalWriterImpl.class); - private static final byte[] ZEROS = new byte[64 * 1024]; - private static final int HDFS_BUFFER_SIZE = 256 * 1024; - - protected final OutStream writer; - private final CodedOutputStream protobufWriter; - private final CompressionKind compress; - private final Map streams; - private final HadoopShims shims; - private final int maxPadding; - private final int bufferSize; - private final long blockSize; - private final boolean addBlockPadding; - private final boolean writeVariableLengthBlocks; - - private CompressionCodec codec; - private FSDataOutputStream out; - private long headerLength; - private long stripeStart; - private long blockOffset; - private int metadataLength; - private int footerLength; - - public PhysicalWriterImpl(FSDataOutputStream out, OrcFile.WriterOptions opts) - throws IOException { - if (opts.isEnforceBufferSize()) { - this.bufferSize = opts.getBufferSize(); - } else { - this.bufferSize = - getEstimatedBufferSize( - opts.getStripeSize(), - opts.getSchema().getMaximumId() + 1, - opts.getBufferSize()); - } - - this.out = out; - this.blockOffset = 0; - this.blockSize = opts.getBlockSize(); - this.maxPadding = (int) (opts.getPaddingTolerance() * (double) opts.getBufferSize()); - this.compress = opts.getCompress(); - this.codec = OrcCodecPool.getCodec(this.compress); - this.streams = new TreeMap<>(); - this.writer = - new OutStream("metadata", this.bufferSize, this.codec, new DirectStream(this.out)); - this.shims = opts.getHadoopShims(); - this.addBlockPadding = opts.getBlockPadding(); - this.protobufWriter = CodedOutputStream.newInstance(this.writer); - this.writeVariableLengthBlocks = opts.getWriteVariableLengthBlocks(); - } - - @Override - public void writeHeader() throws IOException { - this.out.write("ORC".getBytes()); - this.headerLength = this.out.getPos(); - } - - @Override - public OutputReceiver createDataStream(StreamName name) throws IOException { - BufferedStream result = streams.get(name); - - if (result == null) { - result = new BufferedStream(); - streams.put(name, result); - } - - return result; - } - - @Override - public void writeIndex(StreamName name, OrcProto.RowIndex.Builder index, CompressionCodec codec) - throws IOException { - OutputStream stream = - new OutStream(this.toString(), bufferSize, codec, createDataStream(name)); - index.build().writeTo(stream); - stream.flush(); - } - - @Override - public void writeBloomFilter( - StreamName name, OrcProto.BloomFilterIndex.Builder bloom, CompressionCodec codec) - throws IOException { - OutputStream stream = - new OutStream(this.toString(), bufferSize, codec, createDataStream(name)); - bloom.build().writeTo(stream); - stream.flush(); - } - - @Override - public void finalizeStripe( - OrcProto.StripeFooter.Builder footerBuilder, - OrcProto.StripeInformation.Builder dirEntry) - throws IOException { - long indexSize = 0; - long dataSize = 0; - - for (Map.Entry pair : streams.entrySet()) { - BufferedStream receiver = pair.getValue(); - if (!receiver.isSuppressed) { - long streamSize = receiver.getOutputSize(); - StreamName name = pair.getKey(); - footerBuilder.addStreams( - OrcProto.Stream.newBuilder() - .setColumn(name.getColumn()) - .setKind(name.getKind()) - .setLength(streamSize)); - if 
(StreamName.Area.INDEX == name.getArea()) { - indexSize += streamSize; - } else { - dataSize += streamSize; - } - } - } - - dirEntry.setIndexLength(indexSize).setDataLength(dataSize); - OrcProto.StripeFooter footer = footerBuilder.build(); - // Do we need to pad the file so the stripe doesn't straddle a block boundary? - padStripe(indexSize + dataSize + footer.getSerializedSize()); - - // write out the data streams - for (Map.Entry pair : streams.entrySet()) { - pair.getValue().spillToDiskAndClear(out); - } - - // Write out the footer. - writeStripeFooter(footer, dataSize, indexSize, dirEntry); - } - - @Override - public void writeFileMetadata(OrcProto.Metadata.Builder builder) throws IOException { - long startPosition = out.getPos(); - OrcProto.Metadata metadata = builder.build(); - writeMetadata(metadata); - this.metadataLength = (int) (out.getPos() - startPosition); - } - - @Override - public void writeFileFooter(OrcProto.Footer.Builder builder) throws IOException { - long bodyLength = out.getPos() - metadataLength; - builder.setContentLength(bodyLength); - builder.setHeaderLength(headerLength); - long startPosition = out.getPos(); - OrcProto.Footer footer = builder.build(); - writeFileFooter(footer); - this.footerLength = (int) (out.getPos() - startPosition); - } - - @Override - public long writePostScript(OrcProto.PostScript.Builder builder) throws IOException { - builder.setFooterLength(footerLength); - builder.setMetadataLength(metadataLength); - - OrcProto.PostScript ps = builder.build(); - // need to write this uncompressed - long startPosition = out.getPos(); - ps.writeTo(out); - long length = out.getPos() - startPosition; - - if (length > 255) { - throw new IllegalArgumentException("PostScript too large at " + length); - } - - out.write((int) length); - return out.getPos(); - } - - @Override - public void close() { - // Just release the codec but don't close the internal stream here to avoid - // Stream Closed or ClosedChannelException when Flink performs checkpoint. 
- OrcCodecPool.returnCodec(compress, codec); - codec = null; - } - - @Override - public void flush() throws IOException { - out.flush(); - } - - @Override - public void appendRawStripe(ByteBuffer buffer, OrcProto.StripeInformation.Builder dirEntry) - throws IOException { - long start = out.getPos(); - int length = buffer.remaining(); - long availBlockSpace = blockSize - (start % blockSize); - - // see if stripe can fit in the current hdfs block, else pad the remaining - // space in the block - if (length < blockSize && length > availBlockSpace && addBlockPadding) { - byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, availBlockSpace)]; - LOG.info(String.format("Padding ORC by %d bytes while merging..", availBlockSpace)); - start += availBlockSpace; - while (availBlockSpace > 0) { - int writeLen = (int) Math.min(availBlockSpace, pad.length); - out.write(pad, 0, writeLen); - availBlockSpace -= writeLen; - } - } - - out.write(buffer.array(), buffer.arrayOffset() + buffer.position(), length); - dirEntry.setOffset(start); - } - - @Override - public CompressionCodec getCompressionCodec() { - return this.codec; - } - - @Override - public long getFileBytes(int column) { - long size = 0; - - for (final Map.Entry pair : streams.entrySet()) { - final BufferedStream receiver = pair.getValue(); - if (!receiver.isSuppressed) { - - final StreamName name = pair.getKey(); - if (name.getColumn() == column && name.getArea() != StreamName.Area.INDEX) { - size += receiver.getOutputSize(); - } - } - } - - return size; - } - - private void padStripe(long stripeSize) throws IOException { - this.stripeStart = out.getPos(); - long previousBytesInBlock = (stripeStart - blockOffset) % blockSize; - - // We only have options if this isn't the first stripe in the block - if (previousBytesInBlock > 0) { - if (previousBytesInBlock + stripeSize >= blockSize) { - // Try making a short block - if (writeVariableLengthBlocks && shims.endVariableLengthBlock(out)) { - blockOffset = stripeStart; - } else if (addBlockPadding) { - // if we cross the block boundary, figure out what we should do - long padding = blockSize - previousBytesInBlock; - if (padding <= maxPadding) { - writeZeros(out, padding); - stripeStart += padding; - } - } - } - } - } - - private void writeStripeFooter( - OrcProto.StripeFooter footer, - long dataSize, - long indexSize, - OrcProto.StripeInformation.Builder dirEntry) - throws IOException { - writeStripeFooter(footer); - - dirEntry.setOffset(stripeStart); - dirEntry.setFooterLength(out.getPos() - stripeStart - dataSize - indexSize); - } - - protected void writeMetadata(OrcProto.Metadata metadata) throws IOException { - metadata.writeTo(protobufWriter); - protobufWriter.flush(); - writer.flush(); - } - - protected void writeFileFooter(OrcProto.Footer footer) throws IOException { - footer.writeTo(protobufWriter); - protobufWriter.flush(); - writer.flush(); - } - - protected void writeStripeFooter(OrcProto.StripeFooter footer) throws IOException { - footer.writeTo(protobufWriter); - protobufWriter.flush(); - writer.flush(); - } - - private static void writeZeros(OutputStream output, long remaining) throws IOException { - while (remaining > 0) { - long size = Math.min(ZEROS.length, remaining); - output.write(ZEROS, 0, (int) size); - remaining -= size; - } - } - - private static class DirectStream implements OutputReceiver { - private final FSDataOutputStream output; - - DirectStream(FSDataOutputStream output) { - this.output = output; - } - - public void output(ByteBuffer buffer) throws IOException { - 
this.output.write( - buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining()); - } - - public void suppress() { - throw new UnsupportedOperationException("Can't suppress direct stream"); - } - } - - private static final class BufferedStream implements OutputReceiver { - private boolean isSuppressed = false; - private final List output = new ArrayList<>(); - - @Override - public void output(ByteBuffer buffer) { - if (!isSuppressed) { - output.add(buffer); - } - } - - public void suppress() { - isSuppressed = true; - output.clear(); - } - - void spillToDiskAndClear(FSDataOutputStream raw) throws IOException { - if (!isSuppressed) { - for (ByteBuffer buffer : output) { - raw.write( - buffer.array(), - buffer.arrayOffset() + buffer.position(), - buffer.remaining()); - } - output.clear(); - } - isSuppressed = false; - } - - public long getOutputSize() { - long result = 0; - for (ByteBuffer buffer : output) { - result += buffer.remaining(); - } - return result; - } - } -} diff --git a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowInputFormatTest.java b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowInputFormatTest.java index ccfa98cc8e4ed..73dfd1354fee7 100644 --- a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowInputFormatTest.java +++ b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowInputFormatTest.java @@ -58,6 +58,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; +import java.util.stream.IntStream; import static org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath; import static org.assertj.core.api.Assertions.assertThat; @@ -189,30 +190,38 @@ public void testReadDecimalTypeFile() throws IOException { @Test public void testReadFileWithPartitionFields() throws IOException { LinkedHashMap partSpec = new LinkedHashMap<>(); - partSpec.put("f1", "1"); - partSpec.put("f3", "3"); - partSpec.put("f5", "f5"); - partSpec.put("f8", BigDecimal.valueOf(5.333).toString()); - partSpec.put("f13", "f13"); + partSpec.put("_col1", "1"); + partSpec.put("_col3", "3"); + partSpec.put("_col5", "f5"); + partSpec.put("_col8", BigDecimal.valueOf(5.333).toString()); + partSpec.put("_col13", "f13"); final Path flatFile = copyFileFromResource("test-data-flat.orc", partSpec); + LogicalType[] dataTypes = { + /* 0 */ + DataTypes.INT().getLogicalType(), + /* 1 */ DataTypes.INT().getLogicalType(), // part-1 + /* 2 */ DataTypes.STRING().getLogicalType(), + /* 3 */ DataTypes.BIGINT().getLogicalType(), // part-2 + /* 4 */ DataTypes.STRING().getLogicalType(), + /* 5 */ DataTypes.STRING().getLogicalType(), // part-3 + /* 6 */ DataTypes.STRING().getLogicalType(), + /* 7 */ DataTypes.INT().getLogicalType(), + /* 8 */ DataTypes.DECIMAL(10, 5).getLogicalType(), // part-4 + /* 9 */ DataTypes.STRING().getLogicalType(), + /* 11*/ DataTypes.INT().getLogicalType(), + /* 12*/ DataTypes.INT().getLogicalType(), + /* 13*/ DataTypes.STRING().getLogicalType(), // part-5 + /* 14*/ DataTypes.INT().getLogicalType() + }; + RowType tableType = RowType.of( - /* 0 */ DataTypes.INT().getLogicalType(), - /* 1 */ DataTypes.INT().getLogicalType(), // part-1 - /* 2 */ DataTypes.STRING().getLogicalType(), - /* 3 */ DataTypes.BIGINT().getLogicalType(), // part-2 - /* 4 */ DataTypes.STRING().getLogicalType(), - /* 5 */ DataTypes.STRING().getLogicalType(), // part-3 - /* 6 */ DataTypes.STRING().getLogicalType(), - /* 7 */ 
DataTypes.INT().getLogicalType(), - /* 8 */ DataTypes.DECIMAL(10, 5).getLogicalType(), // part-4 - /* 9 */ DataTypes.STRING().getLogicalType(), - /* 11*/ DataTypes.INT().getLogicalType(), - /* 12*/ DataTypes.INT().getLogicalType(), - /* 13*/ DataTypes.STRING().getLogicalType(), // part-5 - /* 14*/ DataTypes.INT().getLogicalType()); + dataTypes, + IntStream.range(0, dataTypes.length) + .mapToObj(i -> "_col" + i) + .toArray(String[]::new)); int[] projectedFields = {8, 1, 3, 0, 5, 2}; diff --git a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowSplitReaderTest.java b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowSplitReaderTest.java index 1d0d05e7f0d08..513ac4b4d583d 100644 --- a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowSplitReaderTest.java +++ b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowSplitReaderTest.java @@ -147,11 +147,11 @@ public void testReadFileWithSelectFields() throws IOException { long totalF0 = 0; Map partSpec = new HashMap<>(); - partSpec.put("f1", 1); - partSpec.put("f3", 3L); - partSpec.put("f5", "f5"); - partSpec.put("f8", BigDecimal.valueOf(5.333)); - partSpec.put("f13", "f13"); + partSpec.put("_col1", 1); + partSpec.put("_col3", 3L); + partSpec.put("_col5", "f5"); + partSpec.put("_col8", BigDecimal.valueOf(5.333)); + partSpec.put("_col13", "f13"); // read all splits for (FileInputSplit split : splits) { @@ -239,11 +239,11 @@ protected void prepareReadFileWithTypes(String file, int rowSize) throws IOExcep TypeDescription schema = TypeDescription.fromString( "struct<" - + "f0:float," - + "f1:double," - + "f2:timestamp," - + "f3:tinyint," - + "f4:smallint" + + "_col0:float," + + "_col1:double," + + "_col2:timestamp," + + "_col3:tinyint," + + "_col4:smallint" + ">"); org.apache.hadoop.fs.Path filePath = new org.apache.hadoop.fs.Path(file); @@ -301,15 +301,15 @@ public void testReadFileWithTypes() throws IOException { int cnt = 0; Map partSpec = new HashMap<>(); - partSpec.put("f5", true); - partSpec.put("f6", new Date(562423)); - partSpec.put("f7", LocalDateTime.of(1999, 1, 1, 1, 1)); - partSpec.put("f8", 6.6); - partSpec.put("f9", null); - partSpec.put("f10", null); - partSpec.put("f11", null); - partSpec.put("f12", null); - partSpec.put("f13", null); + partSpec.put("_col5", true); + partSpec.put("_col6", new Date(562423)); + partSpec.put("_col7", LocalDateTime.of(1999, 1, 1, 1, 1)); + partSpec.put("_col8", 6.6); + partSpec.put("_col9", null); + partSpec.put("_col10", null); + partSpec.put("_col11", null); + partSpec.put("_col12", null); + partSpec.put("_col13", null); try (OrcColumnarRowSplitReader reader = createReader( new int[] {2, 0, 1, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}, @@ -400,7 +400,9 @@ protected OrcColumnarRowSplitReader createReader( return OrcSplitReaderUtil.genPartColumnarRowReader( "2.3.0", new Configuration(), - IntStream.range(0, fullTypes.length).mapToObj(i -> "f" + i).toArray(String[]::new), + IntStream.range(0, fullTypes.length) + .mapToObj(i -> "_col" + i) + .toArray(String[]::new), fullTypes, partitionSpec, selectedFields, diff --git a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFormatStatisticsReportTest.java b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFormatStatisticsReportTest.java index 6262f22e714ca..d30863d255f0c 100644 --- a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFormatStatisticsReportTest.java +++ 
b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFormatStatisticsReportTest.java @@ -206,10 +206,10 @@ protected static void assertOrcFormatTableStatsEquals( "f_timestamp9", new ColumnStats.Builder() .setMax( - DateTimeUtils.parseTimestampData("1990-10-16 12:12:43.123", 3) + DateTimeUtils.parseTimestampData("1990-10-16 12:12:43.123456789", 9) .toTimestamp()) .setMin( - DateTimeUtils.parseTimestampData("1990-10-14 12:12:43.123", 3) + DateTimeUtils.parseTimestampData("1990-10-14 12:12:43.123456789", 9) .toTimestamp()) .setNullCount(0L) .build()); diff --git a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/util/OrcBulkWriterTestUtil.java b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/util/OrcBulkWriterTestUtil.java index 33e649db3f0ea..eb543ac6e45da 100644 --- a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/util/OrcBulkWriterTestUtil.java +++ b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/util/OrcBulkWriterTestUtil.java @@ -43,36 +43,36 @@ public class OrcBulkWriterTestUtil { public static final String USER_METADATA_KEY = "userKey"; public static final ByteBuffer USER_METADATA_VALUE = ByteBuffer.wrap("hello".getBytes()); - public static void validate(File files, List expected) throws IOException { - final File[] buckets = files.listFiles(); - assertThat(buckets).isNotNull(); - assertThat(buckets).hasSize(1); + public static void validate(File files, List expected, CompressionKind compressionKind) + throws IOException { + assertThat(files).isNotNull(); + assertThat(files.exists()).isTrue(); + assertThat(expected).isNotNull(); + assertThat(expected).isNotEmpty(); + validateBucketAndFileSize(files, 1); + + final File[] buckets = files.listFiles(); final File[] partFiles = buckets[0].listFiles(); - assertThat(partFiles).isNotNull(); for (File partFile : partFiles) { - assertThat(partFile.length()).isGreaterThan(0); + Reader reader = getOrcReader(partFile); - OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(new Configuration()); - Reader reader = - OrcFile.createReader( - new org.apache.hadoop.fs.Path(partFile.toURI()), readerOptions); - - assertThat(reader.getNumberOfRows()).isEqualTo(3); - assertThat(reader.getSchema().getFieldNames()).hasSize(2); - assertThat(reader.getCompressionKind()).isSameAs(CompressionKind.LZ4); + assertThat(reader.getNumberOfRows()).isEqualTo(expected.size()); + assertThat(reader.getSchema().getFieldNames()) + .hasSize(expected.get(0).getClass().getDeclaredFields().length); + assertThat(reader.getCompressionKind()).isSameAs(compressionKind); assertThat(reader.hasMetadataValue(USER_METADATA_KEY)).isTrue(); assertThat(reader.getMetadataKeys()).contains(USER_METADATA_KEY); List results = getResults(reader); - assertThat(results).hasSize(3); + assertThat(results).hasSize(expected.size()); assertThat(results).isEqualTo(expected); } } - private static List getResults(Reader reader) throws IOException { + public static List getResults(Reader reader) throws IOException { List results = new ArrayList<>(); RecordReader recordReader = reader.rows(); @@ -96,4 +96,21 @@ private static List getResults(Reader reader) throws IOException { return results; } + + public static void validateBucketAndFileSize(File outputDir, int bucketCount) { + final File[] buckets = outputDir.listFiles(); + assertThat(buckets).isNotNull(); + assertThat(buckets.length).isEqualTo(bucketCount); + + final File[] partFiles = buckets[0].listFiles(); + assertThat(partFiles.length).isNotNull(); + } + + public static Reader 
getOrcReader(File orcFile) throws IOException { + assertThat(orcFile.exists()).isTrue(); + assertThat(orcFile.length()).isGreaterThan(0); + + OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(new Configuration()); + return OrcFile.createReader(new org.apache.hadoop.fs.Path(orcFile.toURI()), readerOptions); + } } diff --git a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/writer/OrcBulkWriterITCase.java b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/writer/OrcBulkWriterITCase.java index 81d143d874d93..944d31e304402 100644 --- a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/writer/OrcBulkWriterITCase.java +++ b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/writer/OrcBulkWriterITCase.java @@ -31,6 +31,7 @@ import org.apache.flink.util.TestLogger; import org.apache.hadoop.conf.Configuration; +import org.apache.orc.CompressionKind; import org.junit.ClassRule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -73,6 +74,6 @@ public void testOrcBulkWriter() throws Exception { env.execute(); - OrcBulkWriterTestUtil.validate(outDir, testData); + OrcBulkWriterTestUtil.validate(outDir, testData, CompressionKind.LZ4); } } diff --git a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/writer/OrcBulkWriterTest.java b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/writer/OrcBulkWriterTest.java index 4a5c15649b5bc..2bc6cd5711bf2 100644 --- a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/writer/OrcBulkWriterTest.java +++ b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/writer/OrcBulkWriterTest.java @@ -28,9 +28,10 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.hadoop.conf.Configuration; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; +import org.apache.orc.CompressionKind; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import java.io.File; import java.util.Arrays; @@ -40,22 +41,31 @@ /** Unit test for the ORC BulkWriter implementation. 
 */
 public class OrcBulkWriterTest {
-    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+    @TempDir private java.nio.file.Path tempDir;
     private final String schema = "struct<_col0:string,_col1:int>";
     private final List<Record> input =
             Arrays.asList(new Record("Shiv", 44), new Record("Jesse", 23), new Record("Walt", 50));
-    @Test
-    public void testOrcBulkWriter() throws Exception {
-        final File outDir = TEMPORARY_FOLDER.newFolder();
+    @ParameterizedTest
+    @EnumSource(CompressionKind.class)
+    public void writeOrcFileWithCodec(CompressionKind codec) throws Exception {
+        final File outDir = tempDir.toFile();
         final Properties writerProps = new Properties();
-        writerProps.setProperty("orc.compress", "LZ4");
+        writerProps.setProperty("orc.compress", codec.name());
-        final OrcBulkWriterFactory<Record> writer =
+        OrcBulkWriterFactory<Record> writer =
                 new OrcBulkWriterFactory<>(
                         new RecordVectorizer(schema), writerProps, new Configuration());
+        writeRecordsIntoOrcFile(outDir, writer);
+
+        // validate records and compression kind
+        OrcBulkWriterTestUtil.validate(outDir, input, codec);
+    }
+
+    private void writeRecordsIntoOrcFile(File outDir, OrcBulkWriterFactory<Record> writer)
+            throws Exception {
         StreamingFileSink<Record> sink =
                 StreamingFileSink.forBulkFormat(new Path(outDir.toURI()), writer)
                         .withBucketAssigner(new UniqueBucketAssigner<>("test"))
@@ -75,8 +85,6 @@ public void testOrcBulkWriter() throws Exception {
             testHarness.snapshot(1, ++time);
             testHarness.notifyOfCompletedCheckpoint(1);
-
-            OrcBulkWriterTestUtil.validate(outDir, input);
         }
     }
 }
diff --git a/flink-formats/flink-sql-orc/src/main/resources/META-INF/NOTICE b/flink-formats/flink-sql-orc/src/main/resources/META-INF/NOTICE
index 3346ec6a9e373..969ec29df0fcd 100644
--- a/flink-formats/flink-sql-orc/src/main/resources/META-INF/NOTICE
+++ b/flink-formats/flink-sql-orc/src/main/resources/META-INF/NOTICE
@@ -6,11 +6,10 @@ The Apache Software Foundation (http://www.apache.org/).
 This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
-- org.apache.orc:orc-core:1.5.6
-- org.apache.orc:orc-shims:1.5.6
-- org.apache.hive:hive-storage-api:2.6.0
-- io.airlift:aircompressor:0.10
-- commons-lang:commons-lang:2.6
+- org.apache.orc:orc-core:1.7.5
+- org.apache.orc:orc-shims:1.7.5
+- org.apache.hive:hive-storage-api:2.8.1
+- io.airlift:aircompressor:0.21
 This project bundles the following dependencies under the BSD license.
 See bundled license files for details.
diff --git a/pom.xml b/pom.xml
index fe14bbd874c30..db993310827a9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -167,7 +167,7 @@ under the License.
 		-->
 		3.2.0
 		2.3.9
-		<orc.version>1.5.6</orc.version>
+		<orc.version>1.7.5</orc.version>
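
The common thread of this patch is that both OrcBulkWriterFactory and OrcNoHiveBulkWriterFactory stop maintaining hand-written clones of ORC's PhysicalFsWriter (the deleted PhysicalWriterImpl and NoHivePhysicalWriterImpl) and instead hand ORC's own PhysicalFsWriter a Hadoop FSDataOutputStream wrapper whose close() is a no-op, so that closing the ORC writer cannot close the Flink part file that is still subject to checkpointing. Below is a minimal sketch of that wiring, assuming ORC 1.7.x and the HadoopNoCloseStream class added by this patch; the sketch class and method names are illustrative only, not part of the change.

// Illustrative sketch of how the pieces introduced in this patch fit together.
// Assumes ORC 1.7.x (PhysicalFsWriter taking a stream, options and encryption variants)
// and the HadoopNoCloseStream added above; OrcWriterWiringSketch/openOrcWriter are made-up names.
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.orc.writer.HadoopNoCloseStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.impl.PhysicalFsWriter;
import org.apache.orc.impl.WriterImpl;
import org.apache.orc.impl.writer.WriterEncryptionVariant;

import java.io.IOException;
import java.util.Properties;

class OrcWriterWiringSketch {

    static WriterImpl openOrcWriter(FSDataOutputStream out, String schema, Configuration conf)
            throws IOException {
        OrcFile.WriterOptions opts = OrcFile.writerOptions(new Properties(), conf);
        opts.setSchema(TypeDescription.fromString(schema));

        // Wrap Flink's stream in a Hadoop FSDataOutputStream whose close() is a no-op, so
        // closing the ORC writer cannot close the part file Flink is still checkpointing.
        HadoopNoCloseStream noCloseStream = new HadoopNoCloseStream(out, null);

        // Let ORC's own PhysicalFsWriter do the physical writing; no encryption is used,
        // hence the empty WriterEncryptionVariant array.
        opts.physicalWriter(
                new PhysicalFsWriter(noCloseStream, opts, new WriterEncryptionVariant[0]));

        // The Path argument is unused because a dedicated physical writer is supplied.
        return new WriterImpl(null, new Path("."), opts);
    }
}

Compared with the previous approach, which required keeping a roughly 400-line copy of PhysicalFsWriter (and a protobuf-relocated variant for orc-core-nohive) in sync with every ORC upgrade, as the NOTE in the deleted class's javadoc warned, the compatibility surface is reduced to the small no-close stream wrapper.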