diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java
index f651406df740..8277546b5255 100644
--- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java
@@ -431,6 +431,44 @@ public static void setDefaultValues(Record record, List<Types.NestedField> field
     }
   }
 
+  /**
+   * Sets a value into a {@link Record} using a struct-only field path (top-level column or nested
+   * through structs). Intermediate struct records are created as needed.
+   *
+   * <p>
If the path traverses a non-struct type (e.g. list/map), the operation is ignored. + */ + public static void setStructField(Record root, String[] path, Object value) { + if (root == null || path == null || path.length == 0) { + return; + } + Record current = root; + Types.StructType currentStruct = root.struct(); + + for (int i = 0; i < path.length - 1; i++) { + String fieldName = path[i]; + Types.NestedField field = currentStruct.field(fieldName); + if (field == null || !field.type().isStructType()) { + return; + } + Types.StructType nestedStruct = field.type().asStructType(); + current = getOrCreateStructRecord(current, fieldName, nestedStruct); + currentStruct = nestedStruct; + } + + current.setField(path[path.length - 1], value); + } + + private static Record getOrCreateStructRecord( + Record parent, String fieldName, Types.StructType structType) { + Object value = parent.getField(fieldName); + if (value instanceof Record) { + return (Record) value; + } + Record record = GenericRecord.create(structType); + parent.setField(fieldName, record); + return record; + } + // Special method for nested structs that always applies defaults to null fields private static void setDefaultValuesForNestedStruct(Record record, List fields) { for (Types.NestedField field : fields) { diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveFileWriterFactory.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveFileWriterFactory.java index 6489c78f9edf..234cf928432b 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveFileWriterFactory.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveFileWriterFactory.java @@ -20,7 +20,6 @@ package org.apache.iceberg.mr.hive.writer; import java.util.Map; -import java.util.function.Supplier; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Schema; import org.apache.iceberg.SortOrder; @@ -33,12 +32,13 @@ import org.apache.iceberg.data.parquet.GenericParquetWriter; import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.parquet.VariantShreddingFunction; import org.apache.iceberg.parquet.VariantUtil; class HiveFileWriterFactory extends BaseFileWriterFactory { private final Map properties; - private Supplier sampleRecord = null; + private Record sampleRecord = null; HiveFileWriterFactory( Table table, @@ -85,9 +85,34 @@ protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) { @Override protected void configureDataWrite(Parquet.DataWriteBuilder builder) { builder.createWriterFunc(GenericParquetWriter::create); - // Configure variant shredding function if conditions are met: - VariantUtil.variantShreddingFunc(dataSchema(), sampleRecord, properties) - .ifPresent(builder::variantShreddingFunc); + // Configure variant shredding if enabled and a sample record is available + if (VariantUtil.shouldUseVariantShredding(properties, dataSchema())) { + setVariantShreddingFunc(builder, VariantUtil.variantShreddingFunc(sampleRecord, dataSchema())); + } + } + + /** + * Sets a {@link VariantShreddingFunction} on the underlying Parquet write builder. + * + *
<p>
{@link Parquet.DataWriteBuilder} does not expose {@code variantShreddingFunc} directly; it is set on an + * internal write builder held in the private {@code appenderBuilder} field. This method uses reflection to + * access that internal builder and invoke {@code variantShreddingFunc(VariantShreddingFunction)}. + * + * TODO: Replace with {@code DataWriteBuilder.variantShreddingFunc(VariantShreddingFunction)} + * once it becomes publicly available. + */ + private static void setVariantShreddingFunc(Parquet.DataWriteBuilder dataWriteBuilder, + VariantShreddingFunction fn) { + try { + java.lang.reflect.Field field = dataWriteBuilder.getClass().getDeclaredField("appenderBuilder"); + field.setAccessible(true); + Object writeBuilder = field.get(dataWriteBuilder); + writeBuilder.getClass() + .getMethod("variantShreddingFunc", VariantShreddingFunction.class) + .invoke(writeBuilder, fn); + } catch (ReflectiveOperationException e) { + throw new RuntimeException(e); + } } @Override @@ -164,7 +189,7 @@ HiveFileWriterFactory build() { * Set a sample record to use for data-driven variant shredding schema generation. * Should be called before the Parquet writer is created. */ - public void initialize(Supplier record) { + public void initialize(Record record) { if (sampleRecord == null) { sampleRecord = record; } diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergCopyOnWriteRecordWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergCopyOnWriteRecordWriter.java index e8d5ed2a8e4b..f48bf21345e2 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergCopyOnWriteRecordWriter.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergCopyOnWriteRecordWriter.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.util.List; -import java.util.Set; import org.apache.hadoop.io.Writable; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; @@ -29,7 +28,6 @@ import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; import org.apache.iceberg.deletes.PositionDelete; -import org.apache.iceberg.hive.HiveSchemaUtil; import org.apache.iceberg.io.DataWriteResult; import org.apache.iceberg.io.OutputFileFactory; import org.apache.iceberg.mr.hive.FilesForCommit; @@ -37,32 +35,18 @@ import org.apache.iceberg.mr.hive.writer.WriterBuilder.Context; import org.apache.iceberg.mr.mapred.Container; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.types.Types; -class HiveIcebergCopyOnWriteRecordWriter extends HiveIcebergWriterBase { - - private final int currentSpecId; - private final Set missingColumns; - private final List missingOrStructFields; +class HiveIcebergCopyOnWriteRecordWriter extends SchemaInferringDefaultsWriter { private final GenericRecord rowDataTemplate; private final List replacedDataFiles; - private final HiveFileWriterFactory fileWriterFactory; - HiveIcebergCopyOnWriteRecordWriter(Table table, HiveFileWriterFactory writerFactory, OutputFileFactory deleteFileFactory, Context context) { - super(table, newDataWriter(table, writerFactory, deleteFileFactory, context)); + super(table, writerFactory, deleteFileFactory, context); - this.currentSpecId = table.spec().specId(); this.rowDataTemplate = GenericRecord.create(table.schema()); this.replacedDataFiles = Lists.newArrayList(); - - this.missingColumns = context.missingColumns(); - this.missingOrStructFields = 
specs.get(currentSpecId).schema().asStruct().fields().stream() - .filter(field -> missingColumns.contains(field.name()) || field.type().isStructType()) - .toList(); - this.fileWriterFactory = writerFactory; } @Override @@ -82,9 +66,7 @@ public void write(Writable row) throws IOException { .build(); replacedDataFiles.add(dataFile); } else { - HiveSchemaUtil.setDefaultValues(rowData, missingOrStructFields, missingColumns); - fileWriterFactory.initialize(() -> rowData); - writer.write(rowData, specs.get(currentSpecId), partition(rowData, currentSpecId)); + writeOrBuffer(rowData); } } diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java index ca9d232e3d58..cb9a21e9780c 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java @@ -21,45 +21,27 @@ import java.io.IOException; import java.util.List; -import java.util.Set; import org.apache.hadoop.io.Writable; import org.apache.iceberg.DataFile; import org.apache.iceberg.Table; import org.apache.iceberg.data.Record; -import org.apache.iceberg.hive.HiveSchemaUtil; import org.apache.iceberg.io.DataWriteResult; import org.apache.iceberg.io.OutputFileFactory; import org.apache.iceberg.mr.hive.FilesForCommit; import org.apache.iceberg.mr.hive.writer.WriterBuilder.Context; import org.apache.iceberg.mr.mapred.Container; -import org.apache.iceberg.types.Types; -class HiveIcebergRecordWriter extends HiveIcebergWriterBase { - - private final int currentSpecId; - private final Set missingColumns; - private final List missingOrStructFields; - - private final HiveFileWriterFactory fileWriterFactory; +class HiveIcebergRecordWriter extends SchemaInferringDefaultsWriter { HiveIcebergRecordWriter(Table table, HiveFileWriterFactory fileWriterFactory, OutputFileFactory dataFileFactory, Context context) { - super(table, newDataWriter(table, fileWriterFactory, dataFileFactory, context)); - - this.currentSpecId = table.spec().specId(); - this.missingColumns = context.missingColumns(); - this.missingOrStructFields = specs.get(currentSpecId).schema().asStruct().fields().stream() - .filter(field -> missingColumns.contains(field.name()) || field.type().isStructType()) - .toList(); - this.fileWriterFactory = fileWriterFactory; + super(table, fileWriterFactory, dataFileFactory, context); } @Override public void write(Writable row) throws IOException { Record record = ((Container) row).get(); - HiveSchemaUtil.setDefaultValues(record, missingOrStructFields, missingColumns); - fileWriterFactory.initialize(() -> record); - writer.write(record, specs.get(currentSpecId), partition(record, currentSpecId)); + writeOrBuffer(record); } @Override diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergWriterBase.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergWriterBase.java index f797c37c130c..ad026a83040b 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergWriterBase.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergWriterBase.java @@ -80,7 +80,7 @@ public void close(boolean abort) throws IOException { .retry(3) .suppressFailureWhenFinished() .onFailure((file, exception) -> LOG.debug("Failed on to 
remove file {} on abort", file, exception)) - .run(file -> io.deleteFile(file.path().toString())); + .run(file -> io.deleteFile(file.location())); LOG.warn("HiveIcebergWriter is closed with abort"); } diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/SchemaInferringDefaultsWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/SchemaInferringDefaultsWriter.java new file mode 100644 index 000000000000..487085b6631d --- /dev/null +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/SchemaInferringDefaultsWriter.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.hive.writer; + +import java.io.IOException; +import java.util.BitSet; +import java.util.List; +import java.util.Set; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.hive.HiveSchemaUtil; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.mr.hive.writer.WriterBuilder.Context; +import org.apache.iceberg.parquet.VariantUtil; +import org.apache.iceberg.parquet.VariantUtil.VariantField; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types.NestedField; + +abstract class SchemaInferringDefaultsWriter extends HiveIcebergWriterBase { + + private static final int VARIANT_SAMPLE_BUFFER_SIZE = 100; + + private final HiveFileWriterFactory fileWriterFactory; + + private final int currentSpecId; + private final Set missingColumns; + private final List missingOrStructFields; + + private final List variantFields; + private final BitSet sampledVariantFields; + + private final List buffer; + private final Record accumulatedSample; + private boolean sampleInitialized = false; + + SchemaInferringDefaultsWriter( + Table table, + HiveFileWriterFactory fileWriterFactory, + OutputFileFactory dataFileFactory, + Context context) { + + super(table, newDataWriter(table, fileWriterFactory, dataFileFactory, context)); + Schema schema = table.schema(); + this.fileWriterFactory = fileWriterFactory; + + this.currentSpecId = table.spec().specId(); + this.missingColumns = context.missingColumns(); + this.missingOrStructFields = schema.columns().stream() + .filter(field -> missingColumns.contains(field.name()) || field.type().isStructType()) + .toList(); + + this.variantFields = VariantUtil.variantFieldsForShredding(table.properties(), schema); + this.sampledVariantFields = new BitSet(variantFields.size()); + + boolean shouldBuffer = !variantFields.isEmpty(); + this.buffer = shouldBuffer ? 
Lists.newArrayListWithCapacity(VARIANT_SAMPLE_BUFFER_SIZE) : null; + this.accumulatedSample = shouldBuffer ? GenericRecord.create(schema) : null; + } + + protected void writeOrBuffer(Record record) { + HiveSchemaUtil.setDefaultValues(record, missingOrStructFields, missingColumns); + + if (buffer != null && !sampleInitialized) { + accumulateSample(record); + + if (allVariantFieldsSampled() || buffer.size() >= VARIANT_SAMPLE_BUFFER_SIZE) { + // Use accumulated sample for schema inference + fileWriterFactory.initialize(accumulatedSample); + sampleInitialized = true; + + flushBufferedRecords(); + } else { + buffer.add(record.copy()); + return; + } + } + writeRecord(record); + } + + private void writeRecord(Record record) { + writer.write(record, specs.get(currentSpecId), partition(record, currentSpecId)); + } + + private void flushBufferedRecords() { + for (Record bufferedRecord : buffer) { + writeRecord(bufferedRecord); + } + buffer.clear(); + } + + private boolean allVariantFieldsSampled() { + return sampledVariantFields.nextClearBit(0) >= variantFields.size(); + } + + private void accumulateSample(Record record) { + if (accumulatedSample == null || allVariantFieldsSampled()) { + return; + } + for (int fieldIndex = sampledVariantFields.nextClearBit(0); + fieldIndex < variantFields.size(); + fieldIndex = sampledVariantFields.nextClearBit(fieldIndex + 1)) { + trySampleVariantField(fieldIndex, record); + } + } + + private void trySampleVariantField(int fieldIndex, Record record) { + VariantField variantField = variantFields.get(fieldIndex); + Object val = safeGet(variantField, record); + if (!VariantUtil.isShreddable(val)) { + return; + } + HiveSchemaUtil.setStructField(accumulatedSample, variantField.path(), val); + sampledVariantFields.set(fieldIndex); + } + + private static Object safeGet(VariantField variantField, Record record) { + try { + return variantField.accessor().get(record); + } catch (RuntimeException e) { + // Treat unexpected access failures as "no sample" and keep scanning. + return null; + } + } + + @Override + public void close(boolean abort) throws IOException { + if (buffer != null) { + if (abort) { + // Don't write anything on abort. Just drop any buffered records. + buffer.clear(); + } else if (!buffer.isEmpty()) { + if (!sampleInitialized) { + // Use whatever we have accumulated so far + fileWriterFactory.initialize(accumulatedSample); + } + flushBufferedRecords(); + } + } + super.close(abort); + } + +} diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/Parquet.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/Parquet.java deleted file mode 100644 index 56e3f3480c74..000000000000 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ /dev/null @@ -1,1524 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iceberg.parquet; - -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Collection; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Objects; -import java.util.function.BiConsumer; -import java.util.function.BiFunction; -import java.util.function.Function; -import java.util.stream.Collectors; -import java.util.stream.IntStream; -import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.Files; -import org.apache.iceberg.InternalData; -import org.apache.iceberg.MetricsConfig; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Schema; -import org.apache.iceberg.SchemaParser; -import org.apache.iceberg.SortOrder; -import org.apache.iceberg.StructLike; -import org.apache.iceberg.SystemConfigs; -import org.apache.iceberg.Table; -import org.apache.iceberg.avro.AvroSchemaUtil; -import org.apache.iceberg.data.parquet.GenericParquetWriter; -import org.apache.iceberg.deletes.EqualityDeleteWriter; -import org.apache.iceberg.deletes.PositionDeleteWriter; -import org.apache.iceberg.encryption.EncryptedOutputFile; -import org.apache.iceberg.encryption.EncryptionKeyMetadata; -import org.apache.iceberg.encryption.NativeEncryptionInputFile; -import org.apache.iceberg.encryption.NativeEncryptionOutputFile; -import org.apache.iceberg.exceptions.RuntimeIOException; -import org.apache.iceberg.expressions.Expression; -import org.apache.iceberg.hadoop.HadoopInputFile; -import org.apache.iceberg.hadoop.HadoopOutputFile; -import org.apache.iceberg.io.CloseableIterable; -import org.apache.iceberg.io.DataWriter; -import org.apache.iceberg.io.DeleteSchemaUtil; -import org.apache.iceberg.io.FileAppender; -import org.apache.iceberg.io.InputFile; -import org.apache.iceberg.io.OutputFile; -import org.apache.iceberg.mapping.NameMapping; -import org.apache.iceberg.parquet.ParquetValueWriters.PositionDeleteStructWriter; -import org.apache.iceberg.parquet.ParquetValueWriters.StructWriter; -import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; -import org.apache.iceberg.util.ArrayUtil; -import org.apache.iceberg.util.ByteBuffers; -import org.apache.iceberg.util.PropertyUtil; -import org.apache.parquet.HadoopReadOptions; -import org.apache.parquet.ParquetReadOptions; -import org.apache.parquet.avro.AvroReadSupport; -import org.apache.parquet.avro.AvroWriteSupport; -import org.apache.parquet.column.ParquetProperties; -import org.apache.parquet.column.ParquetProperties.WriterVersion; -import org.apache.parquet.conf.PlainParquetConfiguration; -import org.apache.parquet.crypto.FileDecryptionProperties; -import org.apache.parquet.crypto.FileEncryptionProperties; -import org.apache.parquet.hadoop.ParquetFileReader; -import org.apache.parquet.hadoop.ParquetFileWriter; -import org.apache.parquet.hadoop.ParquetOutputFormat; -import org.apache.parquet.hadoop.ParquetReader; -import org.apache.parquet.hadoop.ParquetWriter; -import org.apache.parquet.hadoop.api.ReadSupport; -import org.apache.parquet.hadoop.api.WriteSupport; -import 
org.apache.parquet.hadoop.metadata.CompressionCodecName; -import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.Type.ID; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION; -import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION_LEVEL; -import static org.apache.iceberg.TableProperties.DELETE_PARQUET_DICT_SIZE_BYTES; -import static org.apache.iceberg.TableProperties.DELETE_PARQUET_PAGE_ROW_LIMIT; -import static org.apache.iceberg.TableProperties.DELETE_PARQUET_PAGE_SIZE_BYTES; -import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT; -import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT; -import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_SIZE_BYTES; -import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX; -import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_FPP_PREFIX; -import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES; -import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT; -import static org.apache.iceberg.TableProperties.PARQUET_COLUMN_STATS_ENABLED_PREFIX; -import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; -import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_DEFAULT; -import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL; -import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT; -import static org.apache.iceberg.TableProperties.PARQUET_DICT_SIZE_BYTES; -import static org.apache.iceberg.TableProperties.PARQUET_DICT_SIZE_BYTES_DEFAULT; -import static org.apache.iceberg.TableProperties.PARQUET_PAGE_ROW_LIMIT; -import static org.apache.iceberg.TableProperties.PARQUET_PAGE_ROW_LIMIT_DEFAULT; -import static org.apache.iceberg.TableProperties.PARQUET_PAGE_SIZE_BYTES; -import static org.apache.iceberg.TableProperties.PARQUET_PAGE_SIZE_BYTES_DEFAULT; -import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT; -import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT_DEFAULT; -import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT; -import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT_DEFAULT; -import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES; -import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT; - -// TODO: remove class once upgraded to Iceberg v1.11.0 (https://github.com/apache/iceberg/pull/14153) - -public class Parquet { - private static final Logger LOG = LoggerFactory.getLogger(Parquet.class); - - private Parquet() { - } - - private static final Collection READ_PROPERTIES_TO_REMOVE = - Sets.newHashSet( - "parquet.read.filter", - "parquet.private.read.filter.predicate", - "parquet.read.support.class", - "parquet.crypto.factory.class"); - - public static WriteBuilder write(OutputFile file) { - if (file instanceof EncryptedOutputFile) { - return write((EncryptedOutputFile) file); - } - - return new WriteBuilder(file); - } - - public static WriteBuilder write(EncryptedOutputFile file) { - if (file instanceof NativeEncryptionOutputFile) { - NativeEncryptionOutputFile nativeFile = (NativeEncryptionOutputFile) file; - return 
write(nativeFile.plainOutputFile()) - .withFileEncryptionKey(nativeFile.keyMetadata().encryptionKey()) - .withAADPrefix(nativeFile.keyMetadata().aadPrefix()); - } else { - return write(file.encryptingOutputFile()); - } - } - - public static class WriteBuilder implements InternalData.WriteBuilder { - private final OutputFile file; - private final Configuration conf; - private final Map metadata = Maps.newLinkedHashMap(); - private final Map config = Maps.newLinkedHashMap(); - private Schema schema = null; - private VariantShreddingFunction variantShreddingFunc = null; - private String name = "table"; - private WriteSupport writeSupport = null; - private BiFunction> createWriterFunc = null; - private MetricsConfig metricsConfig = MetricsConfig.getDefault(); - private ParquetFileWriter.Mode writeMode = ParquetFileWriter.Mode.CREATE; - private WriterVersion writerVersion = WriterVersion.PARQUET_1_0; - private Function, Context> createContextFunc = Context::dataContext; - private ByteBuffer fileEncryptionKey = null; - private ByteBuffer fileAADPrefix = null; - - private WriteBuilder(OutputFile file) { - this.file = file; - if (file instanceof HadoopOutputFile) { - this.conf = new Configuration(((HadoopOutputFile) file).getConf()); - } else { - this.conf = new Configuration(); - } - } - - public WriteBuilder forTable(Table table) { - schema(table.schema()); - setAll(table.properties()); - metricsConfig(MetricsConfig.forTable(table)); - return this; - } - - @Override - public WriteBuilder schema(Schema newSchema) { - this.schema = newSchema; - return this; - } - - /** - * Set a {@link VariantShreddingFunction} that is called with each variant field's name and - * field ID to produce the shredding type as a {@code typed_value} field. This field is added to - * the result variant struct alongside the {@code metadata} and {@code value} fields. - * - * @param func {@link VariantShreddingFunction} that produces a shredded {@code typed_value} - * @return this for method chaining - */ - public WriteBuilder variantShreddingFunc(VariantShreddingFunction func) { - this.variantShreddingFunc = func; - return this; - } - - @Override - public WriteBuilder named(String newName) { - this.name = newName; - return this; - } - - public WriteBuilder writeSupport(WriteSupport newWriteSupport) { - this.writeSupport = newWriteSupport; - return this; - } - - @Override - public WriteBuilder set(String property, String value) { - config.put(property, value); - return this; - } - - public WriteBuilder setAll(Map properties) { - config.putAll(properties); - return this; - } - - @Override - public WriteBuilder meta(String property, String value) { - metadata.put(property, value); - return this; - } - - public WriteBuilder createWriterFunc( - Function> newCreateWriterFunc) { - if (newCreateWriterFunc != null) { - this.createWriterFunc = (icebergSchema, type) -> newCreateWriterFunc.apply(type); - } - return this; - } - - public WriteBuilder createWriterFunc( - BiFunction> newCreateWriterFunc) { - this.createWriterFunc = newCreateWriterFunc; - return this; - } - - public WriteBuilder metricsConfig(MetricsConfig newMetricsConfig) { - this.metricsConfig = newMetricsConfig; - return this; - } - - @Override - public WriteBuilder overwrite() { - return overwrite(true); - } - - public WriteBuilder overwrite(boolean enabled) { - this.writeMode = enabled ? 
ParquetFileWriter.Mode.OVERWRITE : ParquetFileWriter.Mode.CREATE; - return this; - } - - public WriteBuilder writerVersion(WriterVersion version) { - this.writerVersion = version; - return this; - } - - public WriteBuilder withFileEncryptionKey(ByteBuffer encryptionKey) { - this.fileEncryptionKey = encryptionKey; - return this; - } - - public WriteBuilder withAADPrefix(ByteBuffer aadPrefix) { - this.fileAADPrefix = aadPrefix; - return this; - } - - @SuppressWarnings("unchecked") - private WriteSupport getWriteSupport(MessageType type) { - if (writeSupport != null) { - return (WriteSupport) writeSupport; - } else { - return new AvroWriteSupport<>( - type, - ParquetAvro.parquetAvroSchema(AvroSchemaUtil.convert(schema, name)), - ParquetAvro.DEFAULT_MODEL); - } - } - - /* - * Sets the writer version. Default value is PARQUET_1_0 (v1). - */ - @VisibleForTesting - WriteBuilder withWriterVersion(WriterVersion version) { - this.writerVersion = version; - return this; - } - - // supposed to always be a private method used strictly by data and delete write builders - private WriteBuilder createContextFunc( - Function, Context> newCreateContextFunc) { - this.createContextFunc = newCreateContextFunc; - return this; - } - - private void setBloomFilterConfig( - Context context, - Map colNameToParquetPathMap, - BiConsumer withBloomFilterEnabled, - BiConsumer withBloomFilterFPP) { - - context - .columnBloomFilterEnabled() - .forEach( - (colPath, isEnabled) -> { - String parquetColumnPath = colNameToParquetPathMap.get(colPath); - if (parquetColumnPath == null) { - LOG.warn("Skipping bloom filter config for missing field: {}", colPath); - return; - } - - withBloomFilterEnabled.accept(parquetColumnPath, Boolean.valueOf(isEnabled)); - String fpp = context.columnBloomFilterFpp().get(colPath); - if (fpp != null) { - withBloomFilterFPP.accept(parquetColumnPath, Double.parseDouble(fpp)); - } - }); - } - - private void setColumnStatsConfig( - Context context, - Map colNameToParquetPathMap, - BiConsumer withColumnStatsEnabled) { - - context - .columnStatsEnabled() - .forEach( - (colPath, isEnabled) -> { - String parquetColumnPath = colNameToParquetPathMap.get(colPath); - if (parquetColumnPath == null) { - LOG.warn("Skipping column statistics config for missing field: {}", colPath); - return; - } - withColumnStatsEnabled.accept(parquetColumnPath, Boolean.valueOf(isEnabled)); - }); - } - - @Override - public FileAppender build() throws IOException { - Preconditions.checkNotNull(schema, "Schema is required"); - Preconditions.checkNotNull(name, "Table name is required and cannot be null"); - - // add the Iceberg schema to keyValueMetadata - meta("iceberg.schema", SchemaParser.toJson(schema)); - - // Map Iceberg properties to pass down to the Parquet writer - Context context = createContextFunc.apply(config); - - int rowGroupSize = context.rowGroupSize(); - int pageSize = context.pageSize(); - int pageRowLimit = context.pageRowLimit(); - int dictionaryPageSize = context.dictionaryPageSize(); - String compressionLevel = context.compressionLevel(); - CompressionCodecName codec = context.codec(); - int rowGroupCheckMinRecordCount = context.rowGroupCheckMinRecordCount(); - int rowGroupCheckMaxRecordCount = context.rowGroupCheckMaxRecordCount(); - int bloomFilterMaxBytes = context.bloomFilterMaxBytes(); - boolean dictionaryEnabled = context.dictionaryEnabled(); - - if (compressionLevel != null) { - switch (codec) { - case GZIP: - config.put("zlib.compress.level", compressionLevel); - break; - case BROTLI: - 
config.put("compression.brotli.quality", compressionLevel); - break; - case ZSTD: - // keep "io.compression.codec.zstd.level" for backwards compatibility - config.put("io.compression.codec.zstd.level", compressionLevel); - config.put("parquet.compression.codec.zstd.level", compressionLevel); - break; - default: - // compression level is not supported; ignore it - } - } - - set("parquet.avro.write-old-list-structure", "false"); - MessageType type = ParquetSchemaUtil.convert(schema, name, variantShreddingFunc); - - FileEncryptionProperties fileEncryptionProperties = null; - if (fileEncryptionKey != null) { - byte[] encryptionKeyArray = ByteBuffers.toByteArray(fileEncryptionKey); - byte[] aadPrefixArray = ByteBuffers.toByteArray(fileAADPrefix); - - fileEncryptionProperties = - FileEncryptionProperties.builder(encryptionKeyArray) - .withAADPrefix(aadPrefixArray) - .withoutAADPrefixStorage() - .build(); - } else { - Preconditions.checkState(fileAADPrefix == null, "AAD prefix set with null encryption key"); - } - - Map colNameToParquetPathMap = - type.getColumns().stream() - .filter( - col -> { - ID id = col.getPrimitiveType().getId(); - return id != null && schema.findColumnName(id.intValue()) != null; - }) - .collect( - Collectors.toMap( - col -> schema.findColumnName(col.getPrimitiveType().getId().intValue()), - col -> String.join(".", col.getPath()))); - - if (createWriterFunc != null) { - Preconditions.checkArgument( - writeSupport == null, "Cannot write with both write support and Parquet value writer"); - - for (Map.Entry entry : config.entrySet()) { - conf.set(entry.getKey(), entry.getValue()); - } - - ParquetProperties.Builder propsBuilder = - ParquetProperties.builder() - .withWriterVersion(writerVersion) - .withPageSize(pageSize) - .withPageRowCountLimit(pageRowLimit) - .withDictionaryEncoding(dictionaryEnabled) - .withDictionaryPageSize(dictionaryPageSize) - .withMinRowCountForPageSizeCheck(rowGroupCheckMinRecordCount) - .withMaxRowCountForPageSizeCheck(rowGroupCheckMaxRecordCount) - .withMaxBloomFilterBytes(bloomFilterMaxBytes); - - setBloomFilterConfig( - context, - colNameToParquetPathMap, - propsBuilder::withBloomFilterEnabled, - propsBuilder::withBloomFilterFPP); - - setColumnStatsConfig(context, colNameToParquetPathMap, propsBuilder::withStatisticsEnabled); - - ParquetProperties parquetProperties = propsBuilder.build(); - - return new org.apache.iceberg.parquet.ParquetWriter<>( - conf, - file, - schema, - type, - rowGroupSize, - metadata, - createWriterFunc, - codec, - parquetProperties, - metricsConfig, - writeMode, - fileEncryptionProperties); - } else { - ParquetWriteBuilder parquetWriteBuilder = - new ParquetWriteBuilder(ParquetIO.file(file)) - .withWriterVersion(writerVersion) - .setType(type) - .setConfig(config) - .setKeyValueMetadata(metadata) - .setWriteSupport(getWriteSupport(type)) - .withCompressionCodec(codec) - .withWriteMode(writeMode) - .withRowGroupSize(rowGroupSize) - .withPageSize(pageSize) - .withPageRowCountLimit(pageRowLimit) - .withDictionaryEncoding(dictionaryEnabled) - .withDictionaryPageSize(dictionaryPageSize) - .withEncryption(fileEncryptionProperties); - - setBloomFilterConfig( - context, - colNameToParquetPathMap, - parquetWriteBuilder::withBloomFilterEnabled, - parquetWriteBuilder::withBloomFilterFPP); - - setColumnStatsConfig( - context, colNameToParquetPathMap, parquetWriteBuilder::withStatisticsEnabled); - - return new ParquetWriteAdapter<>(parquetWriteBuilder.build(), metricsConfig); - } - } - - private static class Context { - private final 
int rowGroupSize; - private final int pageSize; - private final int pageRowLimit; - private final int dictionaryPageSize; - private final CompressionCodecName codec; - private final String compressionLevel; - private final int rowGroupCheckMinRecordCount; - private final int rowGroupCheckMaxRecordCount; - private final int bloomFilterMaxBytes; - private final Map columnBloomFilterFpp; - private final Map columnBloomFilterEnabled; - private final Map columnStatsEnabled; - private final boolean dictionaryEnabled; - - private Context( - int rowGroupSize, - int pageSize, - int pageRowLimit, - int dictionaryPageSize, - CompressionCodecName codec, - String compressionLevel, - int rowGroupCheckMinRecordCount, - int rowGroupCheckMaxRecordCount, - int bloomFilterMaxBytes, - Map columnBloomFilterFpp, - Map columnBloomFilterEnabled, - Map columnStatsEnabled, - boolean dictionaryEnabled) { - this.rowGroupSize = rowGroupSize; - this.pageSize = pageSize; - this.pageRowLimit = pageRowLimit; - this.dictionaryPageSize = dictionaryPageSize; - this.codec = codec; - this.compressionLevel = compressionLevel; - this.rowGroupCheckMinRecordCount = rowGroupCheckMinRecordCount; - this.rowGroupCheckMaxRecordCount = rowGroupCheckMaxRecordCount; - this.bloomFilterMaxBytes = bloomFilterMaxBytes; - this.columnBloomFilterFpp = columnBloomFilterFpp; - this.columnBloomFilterEnabled = columnBloomFilterEnabled; - this.columnStatsEnabled = columnStatsEnabled; - this.dictionaryEnabled = dictionaryEnabled; - } - - static Context dataContext(Map config) { - int rowGroupSize = - PropertyUtil.propertyAsInt( - config, PARQUET_ROW_GROUP_SIZE_BYTES, PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT); - Preconditions.checkArgument(rowGroupSize > 0, "Row group size must be > 0"); - - int pageSize = - PropertyUtil.propertyAsInt( - config, PARQUET_PAGE_SIZE_BYTES, PARQUET_PAGE_SIZE_BYTES_DEFAULT); - Preconditions.checkArgument(pageSize > 0, "Page size must be > 0"); - - int pageRowLimit = - PropertyUtil.propertyAsInt( - config, PARQUET_PAGE_ROW_LIMIT, PARQUET_PAGE_ROW_LIMIT_DEFAULT); - Preconditions.checkArgument(pageRowLimit > 0, "Page row count limit must be > 0"); - - int dictionaryPageSize = - PropertyUtil.propertyAsInt( - config, PARQUET_DICT_SIZE_BYTES, PARQUET_DICT_SIZE_BYTES_DEFAULT); - Preconditions.checkArgument(dictionaryPageSize > 0, "Dictionary page size must be > 0"); - - String codecAsString = - config.getOrDefault(PARQUET_COMPRESSION, PARQUET_COMPRESSION_DEFAULT); - CompressionCodecName codec = toCodec(codecAsString); - - String compressionLevel = - config.getOrDefault(PARQUET_COMPRESSION_LEVEL, PARQUET_COMPRESSION_LEVEL_DEFAULT); - - int rowGroupCheckMinRecordCount = - PropertyUtil.propertyAsInt( - config, - PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT, - PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT_DEFAULT); - Preconditions.checkArgument( - rowGroupCheckMinRecordCount > 0, "Row group check minimal record count must be > 0"); - - int rowGroupCheckMaxRecordCount = - PropertyUtil.propertyAsInt( - config, - PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT, - PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT_DEFAULT); - Preconditions.checkArgument( - rowGroupCheckMaxRecordCount > 0, "Row group check maximum record count must be > 0"); - Preconditions.checkArgument( - rowGroupCheckMaxRecordCount >= rowGroupCheckMinRecordCount, - "Row group check maximum record count must be >= minimal record count"); - - int bloomFilterMaxBytes = - PropertyUtil.propertyAsInt( - config, PARQUET_BLOOM_FILTER_MAX_BYTES, PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT); - 
Preconditions.checkArgument(bloomFilterMaxBytes > 0, "bloom Filter Max Bytes must be > 0"); - - Map columnBloomFilterFpp = - PropertyUtil.propertiesWithPrefix(config, PARQUET_BLOOM_FILTER_COLUMN_FPP_PREFIX); - - Map columnBloomFilterEnabled = - PropertyUtil.propertiesWithPrefix(config, PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX); - - Map columnStatsEnabled = - PropertyUtil.propertiesWithPrefix(config, PARQUET_COLUMN_STATS_ENABLED_PREFIX); - - boolean dictionaryEnabled = - PropertyUtil.propertyAsBoolean(config, ParquetOutputFormat.ENABLE_DICTIONARY, true); - - return new Context( - rowGroupSize, - pageSize, - pageRowLimit, - dictionaryPageSize, - codec, - compressionLevel, - rowGroupCheckMinRecordCount, - rowGroupCheckMaxRecordCount, - bloomFilterMaxBytes, - columnBloomFilterFpp, - columnBloomFilterEnabled, - columnStatsEnabled, - dictionaryEnabled); - } - - static Context deleteContext(Map config) { - // default delete config using data config - Context dataContext = dataContext(config); - - int rowGroupSize = - PropertyUtil.propertyAsInt( - config, DELETE_PARQUET_ROW_GROUP_SIZE_BYTES, dataContext.rowGroupSize()); - Preconditions.checkArgument(rowGroupSize > 0, "Row group size must be > 0"); - - int pageSize = - PropertyUtil.propertyAsInt( - config, DELETE_PARQUET_PAGE_SIZE_BYTES, dataContext.pageSize()); - Preconditions.checkArgument(pageSize > 0, "Page size must be > 0"); - - int pageRowLimit = - PropertyUtil.propertyAsInt( - config, DELETE_PARQUET_PAGE_ROW_LIMIT, dataContext.pageRowLimit()); - Preconditions.checkArgument(pageRowLimit > 0, "Page row count limit must be > 0"); - - int dictionaryPageSize = - PropertyUtil.propertyAsInt( - config, DELETE_PARQUET_DICT_SIZE_BYTES, dataContext.dictionaryPageSize()); - Preconditions.checkArgument(dictionaryPageSize > 0, "Dictionary page size must be > 0"); - - String codecAsString = config.get(DELETE_PARQUET_COMPRESSION); - CompressionCodecName codec = - codecAsString != null ? 
toCodec(codecAsString) : dataContext.codec(); - - String compressionLevel = - config.getOrDefault(DELETE_PARQUET_COMPRESSION_LEVEL, dataContext.compressionLevel()); - - int rowGroupCheckMinRecordCount = - PropertyUtil.propertyAsInt( - config, - DELETE_PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT, - dataContext.rowGroupCheckMinRecordCount()); - Preconditions.checkArgument( - rowGroupCheckMinRecordCount > 0, "Row group check minimal record count must be > 0"); - - int rowGroupCheckMaxRecordCount = - PropertyUtil.propertyAsInt( - config, - DELETE_PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT, - dataContext.rowGroupCheckMaxRecordCount()); - Preconditions.checkArgument( - rowGroupCheckMaxRecordCount > 0, "Row group check maximum record count must be > 0"); - Preconditions.checkArgument( - rowGroupCheckMaxRecordCount >= rowGroupCheckMinRecordCount, - "Row group check maximum record count must be >= minimal record count"); - - boolean dictionaryEnabled = - PropertyUtil.propertyAsBoolean(config, ParquetOutputFormat.ENABLE_DICTIONARY, true); - - return new Context( - rowGroupSize, - pageSize, - pageRowLimit, - dictionaryPageSize, - codec, - compressionLevel, - rowGroupCheckMinRecordCount, - rowGroupCheckMaxRecordCount, - PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT, - ImmutableMap.of(), - ImmutableMap.of(), - ImmutableMap.of(), - dictionaryEnabled); - } - - private static CompressionCodecName toCodec(String codecAsString) { - try { - return CompressionCodecName.valueOf(codecAsString.toUpperCase(Locale.ENGLISH)); - } catch (IllegalArgumentException e) { - throw new IllegalArgumentException("Unsupported compression codec: " + codecAsString); - } - } - - int rowGroupSize() { - return rowGroupSize; - } - - int pageSize() { - return pageSize; - } - - int pageRowLimit() { - return pageRowLimit; - } - - int dictionaryPageSize() { - return dictionaryPageSize; - } - - CompressionCodecName codec() { - return codec; - } - - String compressionLevel() { - return compressionLevel; - } - - int rowGroupCheckMinRecordCount() { - return rowGroupCheckMinRecordCount; - } - - int rowGroupCheckMaxRecordCount() { - return rowGroupCheckMaxRecordCount; - } - - int bloomFilterMaxBytes() { - return bloomFilterMaxBytes; - } - - Map columnBloomFilterFpp() { - return columnBloomFilterFpp; - } - - Map columnBloomFilterEnabled() { - return columnBloomFilterEnabled; - } - - Map columnStatsEnabled() { - return columnStatsEnabled; - } - - boolean dictionaryEnabled() { - return dictionaryEnabled; - } - } - } - - public static DataWriteBuilder writeData(OutputFile file) { - return new DataWriteBuilder(file); - } - - public static DataWriteBuilder writeData(EncryptedOutputFile file) { - if (file instanceof NativeEncryptionOutputFile) { - NativeEncryptionOutputFile nativeFile = (NativeEncryptionOutputFile) file; - return writeData(nativeFile.plainOutputFile()) - .withFileEncryptionKey(nativeFile.keyMetadata().encryptionKey()) - .withAADPrefix(nativeFile.keyMetadata().aadPrefix()); - } else { - return writeData(file.encryptingOutputFile()); - } - } - - public static class DataWriteBuilder { - private final WriteBuilder appenderBuilder; - private final String location; - private PartitionSpec spec = null; - private StructLike partition = null; - private EncryptionKeyMetadata keyMetadata = null; - private SortOrder sortOrder = null; - - private DataWriteBuilder(OutputFile file) { - this.appenderBuilder = write(file); - this.location = file.location(); - } - - public DataWriteBuilder forTable(Table table) { - schema(table.schema()); - 
withSpec(table.spec()); - setAll(table.properties()); - metricsConfig(MetricsConfig.forTable(table)); - return this; - } - - public DataWriteBuilder schema(Schema newSchema) { - appenderBuilder.schema(newSchema); - return this; - } - - public DataWriteBuilder set(String property, String value) { - appenderBuilder.set(property, value); - return this; - } - - public DataWriteBuilder setAll(Map properties) { - appenderBuilder.setAll(properties); - return this; - } - - public DataWriteBuilder meta(String property, String value) { - appenderBuilder.meta(property, value); - return this; - } - - public DataWriteBuilder overwrite() { - return overwrite(true); - } - - public DataWriteBuilder overwrite(boolean enabled) { - appenderBuilder.overwrite(enabled); - return this; - } - - public DataWriteBuilder metricsConfig(MetricsConfig newMetricsConfig) { - appenderBuilder.metricsConfig(newMetricsConfig); - return this; - } - - public DataWriteBuilder createWriterFunc( - Function> newCreateWriterFunc) { - appenderBuilder.createWriterFunc(newCreateWriterFunc); - return this; - } - - public DataWriteBuilder createWriterFunc( - BiFunction> newCreateWriterFunc) { - appenderBuilder.createWriterFunc(newCreateWriterFunc); - return this; - } - - public DataWriteBuilder variantShreddingFunc(VariantShreddingFunction func) { - appenderBuilder.variantShreddingFunc(func); - return this; - } - - public DataWriteBuilder withSpec(PartitionSpec newSpec) { - this.spec = newSpec; - return this; - } - - public DataWriteBuilder withPartition(StructLike newPartition) { - this.partition = newPartition; - return this; - } - - public DataWriteBuilder withKeyMetadata(EncryptionKeyMetadata metadata) { - this.keyMetadata = metadata; - return this; - } - - public DataWriteBuilder withFileEncryptionKey(ByteBuffer fileEncryptionKey) { - appenderBuilder.withFileEncryptionKey(fileEncryptionKey); - return this; - } - - public DataWriteBuilder withAADPrefix(ByteBuffer aadPrefix) { - appenderBuilder.withAADPrefix(aadPrefix); - return this; - } - - public DataWriteBuilder withSortOrder(SortOrder newSortOrder) { - this.sortOrder = newSortOrder; - return this; - } - - public DataWriter build() throws IOException { - Preconditions.checkArgument(spec != null, "Cannot create data writer without spec"); - Preconditions.checkArgument( - spec.isUnpartitioned() || partition != null, - "Partition must not be null when creating data writer for partitioned spec"); - - FileAppender fileAppender = appenderBuilder.build(); - return new DataWriter<>( - fileAppender, FileFormat.PARQUET, location, spec, partition, keyMetadata, sortOrder); - } - } - - public static DeleteWriteBuilder writeDeletes(OutputFile file) { - return new DeleteWriteBuilder(file); - } - - public static DeleteWriteBuilder writeDeletes(EncryptedOutputFile file) { - if (file instanceof NativeEncryptionOutputFile) { - NativeEncryptionOutputFile nativeFile = (NativeEncryptionOutputFile) file; - return writeDeletes(nativeFile.plainOutputFile()) - .withFileEncryptionKey(nativeFile.keyMetadata().encryptionKey()) - .withAADPrefix(nativeFile.keyMetadata().aadPrefix()); - } else { - return writeDeletes(file.encryptingOutputFile()); - } - } - - public static class DeleteWriteBuilder { - private final WriteBuilder appenderBuilder; - private final String location; - private BiFunction> createWriterFunc = null; - private Schema rowSchema = null; - private PartitionSpec spec = null; - private StructLike partition = null; - private EncryptionKeyMetadata keyMetadata = null; - private int[] 
equalityFieldIds = null; - private SortOrder sortOrder; - private Function pathTransformFunc = Function.identity(); - - private DeleteWriteBuilder(OutputFile file) { - this.appenderBuilder = write(file); - this.location = file.location(); - } - - public DeleteWriteBuilder forTable(Table table) { - rowSchema(table.schema()); - withSpec(table.spec()); - setAll(table.properties()); - metricsConfig(MetricsConfig.forTable(table)); - return this; - } - - public DeleteWriteBuilder set(String property, String value) { - appenderBuilder.set(property, value); - return this; - } - - public DeleteWriteBuilder setAll(Map properties) { - appenderBuilder.setAll(properties); - return this; - } - - public DeleteWriteBuilder meta(String property, String value) { - appenderBuilder.meta(property, value); - return this; - } - - public DeleteWriteBuilder overwrite() { - return overwrite(true); - } - - public DeleteWriteBuilder overwrite(boolean enabled) { - appenderBuilder.overwrite(enabled); - return this; - } - - public DeleteWriteBuilder metricsConfig(MetricsConfig newMetricsConfig) { - appenderBuilder.metricsConfig(newMetricsConfig); - return this; - } - - public DeleteWriteBuilder createWriterFunc( - Function> newCreateWriterFunc) { - this.createWriterFunc = (ignored, fileSchema) -> newCreateWriterFunc.apply(fileSchema); - return this; - } - - public DeleteWriteBuilder createWriterFunc( - BiFunction> newCreateWriterFunc) { - this.createWriterFunc = newCreateWriterFunc; - return this; - } - - public DeleteWriteBuilder rowSchema(Schema newSchema) { - this.rowSchema = newSchema; - return this; - } - - public DeleteWriteBuilder withSpec(PartitionSpec newSpec) { - this.spec = newSpec; - return this; - } - - public DeleteWriteBuilder withPartition(StructLike key) { - this.partition = key; - return this; - } - - public DeleteWriteBuilder withKeyMetadata(EncryptionKeyMetadata metadata) { - this.keyMetadata = metadata; - return this; - } - - public DeleteWriteBuilder withFileEncryptionKey(ByteBuffer fileEncryptionKey) { - appenderBuilder.withFileEncryptionKey(fileEncryptionKey); - return this; - } - - public DeleteWriteBuilder withAADPrefix(ByteBuffer aadPrefix) { - appenderBuilder.withAADPrefix(aadPrefix); - return this; - } - - public DeleteWriteBuilder equalityFieldIds(List fieldIds) { - this.equalityFieldIds = ArrayUtil.toIntArray(fieldIds); - return this; - } - - public DeleteWriteBuilder equalityFieldIds(int... 
fieldIds) { - this.equalityFieldIds = fieldIds; - return this; - } - - public DeleteWriteBuilder transformPaths(Function newPathTransformFunc) { - this.pathTransformFunc = newPathTransformFunc; - return this; - } - - public DeleteWriteBuilder withSortOrder(SortOrder newSortOrder) { - this.sortOrder = newSortOrder; - return this; - } - - public EqualityDeleteWriter buildEqualityWriter() throws IOException { - Preconditions.checkState( - rowSchema != null, "Cannot create equality delete file without a schema"); - Preconditions.checkState( - equalityFieldIds != null, "Cannot create equality delete file without delete field ids"); - Preconditions.checkState( - createWriterFunc != null, - "Cannot create equality delete file unless createWriterFunc is set"); - Preconditions.checkArgument( - spec != null, "Spec must not be null when creating equality delete writer"); - Preconditions.checkArgument( - spec.isUnpartitioned() || partition != null, - "Partition must not be null for partitioned writes"); - - meta("delete-type", "equality"); - meta( - "delete-field-ids", - IntStream.of(equalityFieldIds) - .mapToObj(Objects::toString) - .collect(Collectors.joining(", "))); - - // the appender uses the row schema without extra columns - appenderBuilder.schema(rowSchema); - appenderBuilder.createWriterFunc(createWriterFunc); - appenderBuilder.createContextFunc(WriteBuilder.Context::deleteContext); - - return new EqualityDeleteWriter<>( - appenderBuilder.build(), - FileFormat.PARQUET, - location, - spec, - partition, - keyMetadata, - sortOrder, - equalityFieldIds); - } - - public PositionDeleteWriter buildPositionWriter() throws IOException { - Preconditions.checkState( - equalityFieldIds == null, "Cannot create position delete file using delete field ids"); - Preconditions.checkArgument( - spec != null, "Spec must not be null when creating position delete writer"); - Preconditions.checkArgument( - spec.isUnpartitioned() || partition != null, - "Partition must not be null for partitioned writes"); - Preconditions.checkArgument( - rowSchema == null || createWriterFunc != null, - "Create function should be provided if we write row data"); - - meta("delete-type", "position"); - - if (rowSchema != null && createWriterFunc != null) { - // the appender uses the row schema wrapped with position fields - appenderBuilder.schema(DeleteSchemaUtil.posDeleteSchema(rowSchema)); - - appenderBuilder.createWriterFunc( - (schema, parquetSchema) -> { - ParquetValueWriter writer = createWriterFunc.apply(schema, parquetSchema); - if (writer instanceof StructWriter) { - return new PositionDeleteStructWriter( - (StructWriter) writer, pathTransformFunc); - } else { - throw new UnsupportedOperationException( - "Cannot wrap writer for position deletes: " + writer.getClass()); - } - }); - - } else { - appenderBuilder.schema(DeleteSchemaUtil.pathPosSchema()); - - // We ignore the 'createWriterFunc' and 'rowSchema' even if is provided, since we do not - // write row data itself - appenderBuilder.createWriterFunc( - (schema, parquetSchema) -> - new PositionDeleteStructWriter( - (StructWriter) GenericParquetWriter.create(schema, parquetSchema), - Function.identity())); - } - - appenderBuilder.createContextFunc(WriteBuilder.Context::deleteContext); - - return new PositionDeleteWriter<>( - appenderBuilder.build(), FileFormat.PARQUET, location, spec, partition, keyMetadata); - } - } - - private static class ParquetWriteBuilder - extends ParquetWriter.Builder> { - private Map keyValueMetadata = Maps.newHashMap(); - private Map config = 
Maps.newHashMap(); - private MessageType type; - private WriteSupport writeSupport; - - private ParquetWriteBuilder(org.apache.parquet.io.OutputFile path) { - super(path); - } - - @Override - protected ParquetWriteBuilder self() { - return this; - } - - public ParquetWriteBuilder setKeyValueMetadata(Map keyValueMetadata) { - this.keyValueMetadata = keyValueMetadata; - return self(); - } - - public ParquetWriteBuilder setConfig(Map config) { - this.config = config; - return self(); - } - - public ParquetWriteBuilder setType(MessageType type) { - this.type = type; - return self(); - } - - public ParquetWriteBuilder setWriteSupport(WriteSupport writeSupport) { - this.writeSupport = writeSupport; - return self(); - } - - @Override - protected WriteSupport getWriteSupport(Configuration configuration) { - for (Map.Entry entry : config.entrySet()) { - configuration.set(entry.getKey(), entry.getValue()); - } - return new ParquetWriteSupport<>(type, keyValueMetadata, writeSupport); - } - } - - public static ReadBuilder read(InputFile file) { - if (file instanceof NativeEncryptionInputFile) { - NativeEncryptionInputFile nativeFile = (NativeEncryptionInputFile) file; - return new ReadBuilder(nativeFile.encryptedInputFile()) - .withFileEncryptionKey(nativeFile.keyMetadata().encryptionKey()) - .withAADPrefix(nativeFile.keyMetadata().aadPrefix()); - } else { - return new ReadBuilder(file); - } - } - - public static class ReadBuilder implements InternalData.ReadBuilder { - private final InputFile file; - private final Map properties = Maps.newHashMap(); - private Long start = null; - private Long length = null; - private Schema schema = null; - private Expression filter = null; - private ReadSupport readSupport = null; - private Function> batchedReaderFunc = null; - private Function> readerFunc = null; - private BiFunction> readerFuncWithSchema = null; - private boolean filterRecords = true; - private boolean caseSensitive = true; - private boolean callInit = false; - private boolean reuseContainers = false; - private int maxRecordsPerBatch = 10000; - private NameMapping nameMapping = null; - private ByteBuffer fileEncryptionKey = null; - private ByteBuffer fileAADPrefix = null; - - private ReadBuilder(InputFile file) { - this.file = file; - } - - /** - * Restricts the read to the given range: [start, start + length). 
- * - * @param newStart the start position for this read - * @param newLength the length of the range this read should scan - * @return this builder for method chaining - */ - @Override - public ReadBuilder split(long newStart, long newLength) { - this.start = newStart; - this.length = newLength; - return this; - } - - @Override - public ReadBuilder project(Schema newSchema) { - this.schema = newSchema; - return this; - } - - public ReadBuilder caseInsensitive() { - return caseSensitive(false); - } - - public ReadBuilder caseSensitive(boolean newCaseSensitive) { - this.caseSensitive = newCaseSensitive; - return this; - } - - public ReadBuilder filterRecords(boolean newFilterRecords) { - this.filterRecords = newFilterRecords; - return this; - } - - public ReadBuilder filter(Expression newFilter) { - this.filter = newFilter; - return this; - } - - /** - * @deprecated will be removed in 2.0.0; use {@link #createReaderFunc(Function)} instead - */ - @Deprecated - public ReadBuilder readSupport(ReadSupport newFilterSupport) { - this.readSupport = newFilterSupport; - return this; - } - - public ReadBuilder createReaderFunc( - Function> newReaderFunction) { - Preconditions.checkArgument( - this.batchedReaderFunc == null, - "Cannot set reader function: batched reader function already set"); - Preconditions.checkArgument( - this.readerFuncWithSchema == null, - "Cannot set reader function: 2-argument reader function already set"); - this.readerFunc = newReaderFunction; - return this; - } - - public ReadBuilder createReaderFunc( - BiFunction> newReaderFunction) { - Preconditions.checkArgument( - this.readerFunc == null, - "Cannot set 2-argument reader function: reader function already set"); - Preconditions.checkArgument( - this.batchedReaderFunc == null, - "Cannot set 2-argument reader function: batched reader function already set"); - this.readerFuncWithSchema = newReaderFunction; - return this; - } - - public ReadBuilder createBatchedReaderFunc(Function> func) { - Preconditions.checkArgument( - this.readerFunc == null, - "Cannot set batched reader function: reader function already set"); - Preconditions.checkArgument( - this.readerFuncWithSchema == null, - "Cannot set batched reader function: 2-argument reader function already set"); - this.batchedReaderFunc = func; - return this; - } - - public ReadBuilder set(String key, String value) { - properties.put(key, value); - return this; - } - - /** - * @deprecated will be removed in 2.0.0; use {@link #createReaderFunc(Function)} instead - */ - @Deprecated - public ReadBuilder callInit() { - this.callInit = true; - return this; - } - - @Override - public ReadBuilder reuseContainers() { - this.reuseContainers = true; - return this; - } - - public ReadBuilder recordsPerBatch(int numRowsPerBatch) { - this.maxRecordsPerBatch = numRowsPerBatch; - return this; - } - - public ReadBuilder withNameMapping(NameMapping newNameMapping) { - this.nameMapping = newNameMapping; - return this; - } - - @Override - public ReadBuilder setRootType(Class rootClass) { - throw new UnsupportedOperationException("Custom types are not yet supported"); - } - - @Override - public ReadBuilder setCustomType(int fieldId, Class structClass) { - throw new UnsupportedOperationException("Custom types are not yet supported"); - } - - public ReadBuilder withFileEncryptionKey(ByteBuffer encryptionKey) { - this.fileEncryptionKey = encryptionKey; - return this; - } - - public ReadBuilder withAADPrefix(ByteBuffer aadPrefix) { - this.fileAADPrefix = aadPrefix; - return this; - } - - @Override - 
@SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity"}) - public CloseableIterable build() { - FileDecryptionProperties fileDecryptionProperties = null; - if (fileEncryptionKey != null) { - byte[] encryptionKeyArray = ByteBuffers.toByteArray(fileEncryptionKey); - byte[] aadPrefixArray = ByteBuffers.toByteArray(fileAADPrefix); - fileDecryptionProperties = - FileDecryptionProperties.builder() - .withFooterKey(encryptionKeyArray) - .withAADPrefix(aadPrefixArray) - .build(); - } else { - Preconditions.checkState(fileAADPrefix == null, "AAD prefix set with null encryption key"); - } - - if (readerFunc != null || readerFuncWithSchema != null || batchedReaderFunc != null) { - ParquetReadOptions.Builder optionsBuilder; - if (file instanceof HadoopInputFile) { - // remove read properties already set that may conflict with this read - Configuration conf = new Configuration(((HadoopInputFile) file).getConf()); - for (String property : READ_PROPERTIES_TO_REMOVE) { - conf.unset(property); - } - optionsBuilder = HadoopReadOptions.builder(conf); - } else { - optionsBuilder = ParquetReadOptions.builder(new PlainParquetConfiguration()); - } - - for (Map.Entry entry : properties.entrySet()) { - optionsBuilder.set(entry.getKey(), entry.getValue()); - } - - if (start != null) { - optionsBuilder.withRange(start, start + length); - } - - if (fileDecryptionProperties != null) { - optionsBuilder.withDecryption(fileDecryptionProperties); - } - - ParquetReadOptions options = optionsBuilder.build(); - - NameMapping mapping; - if (nameMapping != null) { - mapping = nameMapping; - } else if (SystemConfigs.NETFLIX_UNSAFE_PARQUET_ID_FALLBACK_ENABLED.value()) { - mapping = null; - } else { - mapping = NameMapping.empty(); - } - - if (batchedReaderFunc != null) { - return new VectorizedParquetReader<>( - file, - schema, - options, - batchedReaderFunc, - mapping, - filter, - reuseContainers, - caseSensitive, - maxRecordsPerBatch); - } else { - Function> readBuilder = - readerFuncWithSchema != null ? - fileType -> readerFuncWithSchema.apply(schema, fileType) : - readerFunc; - return new org.apache.iceberg.parquet.ParquetReader<>( - file, schema, options, readBuilder, mapping, filter, reuseContainers, caseSensitive); - } - } - - ParquetReadBuilder builder = new ParquetReadBuilder<>(ParquetIO.file(file)); - - builder.project(schema); - - if (readSupport != null) { - builder.readSupport((ReadSupport) readSupport); - } else { - builder.readSupport(new AvroReadSupport<>(ParquetAvro.DEFAULT_MODEL)); - } - - // default options for readers - builder - .set("parquet.strict.typing", "false") // allow type promotion - .set("parquet.avro.compatible", "false") // use the new RecordReader with Utf8 support - .set( - "parquet.avro.add-list-element-records", - "false"); // assume that lists use a 3-level schema - - for (Map.Entry entry : properties.entrySet()) { - builder.set(entry.getKey(), entry.getValue()); - } - - if (filter != null) { - // TODO: should not need to get the schema to push down before opening the file. 
- // Parquet should allow setting a filter inside its read support - ParquetReadOptions decryptOptions = - ParquetReadOptions.builder(new PlainParquetConfiguration()) - .withDecryption(fileDecryptionProperties) - .build(); - MessageType type; - try (ParquetFileReader schemaReader = - ParquetFileReader.open(ParquetIO.file(file), decryptOptions)) { - type = schemaReader.getFileMetaData().getSchema(); - } catch (IOException e) { - throw new RuntimeIOException(e); - } - Schema fileSchema = ParquetSchemaUtil.convert(type); - builder - .useStatsFilter() - .useDictionaryFilter() - .useRecordFilter(filterRecords) - .useBloomFilter() - .withFilter(ParquetFilters.convert(fileSchema, filter, caseSensitive)); - } else { - // turn off filtering - builder - .useStatsFilter(false) - .useDictionaryFilter(false) - .useBloomFilter(false) - .useRecordFilter(false); - } - - if (callInit) { - builder.callInit(); - } - - if (start != null) { - builder.withFileRange(start, start + length); - } - - if (nameMapping != null) { - builder.withNameMapping(nameMapping); - } - - if (fileDecryptionProperties != null) { - builder.withDecryption(fileDecryptionProperties); - } - - return new ParquetIterable<>(builder); - } - } - - private static class ParquetReadBuilder extends ParquetReader.Builder { - private Schema schema = null; - private ReadSupport readSupport = null; - private boolean callInit = false; - private NameMapping nameMapping = null; - - private ParquetReadBuilder(org.apache.parquet.io.InputFile file) { - super(file); - } - - public ParquetReadBuilder project(Schema newSchema) { - this.schema = newSchema; - return this; - } - - public ParquetReadBuilder withNameMapping(NameMapping newNameMapping) { - this.nameMapping = newNameMapping; - return this; - } - - public ParquetReadBuilder readSupport(ReadSupport newReadSupport) { - this.readSupport = newReadSupport; - return this; - } - - public ParquetReadBuilder callInit() { - this.callInit = true; - return this; - } - - @Override - protected ReadSupport getReadSupport() { - return new ParquetReadSupport<>(schema, readSupport, callInit, nameMapping); - } - } - - /** - * Combines several files into one - * - * @param inputFiles an {@link Iterable} of parquet files. 
The order of iteration determines the - * order in which content of files are read and written to the {@code outputFile} - * @param outputFile the output parquet file containing all the data from {@code inputFiles} - * @param rowGroupSize the row group size to use when writing the {@code outputFile} - * @param schema the schema of the data - * @param metadata extraMetadata to write at the footer of the {@code outputFile} - */ - public static void concat( - Iterable inputFiles, - File outputFile, - int rowGroupSize, - Schema schema, - Map metadata) - throws IOException { - OutputFile file = Files.localOutput(outputFile); - try (ParquetFileWriter writer = - new ParquetFileWriter( - ParquetIO.file(file), - ParquetSchemaUtil.convert(schema, "table"), - ParquetFileWriter.Mode.CREATE, - rowGroupSize, - 0)) { - writer.start(); - for (File inputFile : inputFiles) { - writer.appendFile(ParquetIO.file(Files.localInput(inputFile))); - } - writer.end(metadata); - } - } -} diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/VariantUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/VariantUtil.java index 7d0f0feec8c0..736a39b895c8 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/VariantUtil.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/VariantUtil.java @@ -19,64 +19,106 @@ package org.apache.iceberg.parquet; +import java.util.List; import java.util.Map; -import java.util.Optional; -import java.util.function.Supplier; +import org.apache.iceberg.Accessor; import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; import org.apache.iceberg.data.Record; import org.apache.iceberg.mr.InputFormatConfig; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types; +import org.apache.iceberg.variants.PhysicalType; import org.apache.iceberg.variants.Variant; +/** + * Utilities for variant shredding support in Parquet writers. + * + *

This includes checking whether variant shredding is enabled via table properties, collecting the VARIANT fields of a schema that are eligible for shredding, and building a shredding function from a sample data record. + *

+ */ public class VariantUtil { private VariantUtil() { } /** - * Create a VariantShreddingFunction if variant shredding is enabled and the schema has variant columns. + * A VARIANT field in an Iceberg {@link Schema} that can be used for shredding. * - * @param schema The Iceberg schema - * @param sampleRecord A sample record to infer variant schemas from actual data (can be null) - * @param properties Table properties to check if variant shredding is enabled - * @return An Optional containing the VariantShreddingFunction if applicable + *

Shredding is supported for top-level VARIANT columns or VARIANT fields nested in structs. + * VARIANT fields stored inside lists or maps are excluded because schema accessors do not retrieve + * list/map contents. + */ + public record VariantField(int fieldId, Accessor accessor, String[] path) { + } + + /** + * Check if variant shredding is enabled via table properties. */ - public static Optional variantShreddingFunc( - Schema schema, - Supplier sampleRecord, - Map properties) { - - // Preconditions: must have variant columns + property enabled - if (!hasVariantColumns(schema) || !isVariantShreddingEnabled(properties)) { - return Optional.empty(); + public static boolean isVariantShreddingEnabled(Map properties) { + String shreddingEnabled = properties.get(InputFormatConfig.VARIANT_SHREDDING_ENABLED); + return Boolean.parseBoolean(shreddingEnabled); + } + + public static boolean isShreddable(Object value) { + if (value instanceof Variant variant) { + return variant.value().type() != PhysicalType.NULL; } + return false; + } - VariantShreddingFunction fn = - constructVariantShreddingFunc(sampleRecord.get(), schema); + public static List variantFieldsForShredding( + Map properties, Schema schema) { + if (!isVariantShreddingEnabled(properties)) { + return List.of(); + } + return variantFieldsForShredding(schema); + } + + /** + * Returns all VARIANT fields that are eligible for shredding: top-level VARIANT columns and VARIANT + * fields nested in structs (excluding lists/maps). + */ + private static List variantFieldsForShredding(Schema schema) { + List results = Lists.newArrayList(); + new VariantFieldVisitor(schema).collect(results); + return results; + } + + public static boolean shouldUseVariantShredding(Map properties, Schema schema) { + return isVariantShreddingEnabled(properties) && hasVariantFields(schema); + } - return Optional.of(fn); + private static boolean hasVariantFields(Schema schema) { + return new VariantFieldVisitor(schema).hasVariantField(); } - private static VariantShreddingFunction constructVariantShreddingFunc( + public static VariantShreddingFunction variantShreddingFunc( Record sampleRecord, Schema schema) { - return (id, name) -> { + return (id, ignoredName) -> { // Validate the field exists and is a variant type Types.NestedField field = schema.findField(id); - if (field == null || !(field.type() instanceof Types.VariantType)) { + if (field == null || !field.type().isVariantType()) { return null; // Not a variant field, no shredding } // If we have a sample record, try to generate schema from actual data if (sampleRecord != null) { try { - Object variantValue = sampleRecord.getField(name); + // NOTE: Parquet conversion passes the field's local name, not the full path. + // Use an accessor to support variant fields nested in structs. + Accessor accessor = schema.accessorForField(id); + Object variantValue = accessor != null ? accessor.get(sampleRecord) : null; if (variantValue instanceof Variant variant) { - // Use ParquetVariantUtil to generate schema from actual variant value return ParquetVariantUtil.toParquetSchema(variant.value()); } - } catch (Exception e) { + } catch (RuntimeException e) { // Fall through to default schema } } @@ -84,20 +126,66 @@ private static VariantShreddingFunction constructVariantShreddingFunc( }; } - /** - * Check if the schema contains any variant columns. 
- */ - private static boolean hasVariantColumns(Schema schema) { - return schema.columns().stream() - .anyMatch(field -> field.type() instanceof Types.VariantType); - } + private static final class VariantFieldVisitor { + private final Schema schema; - /** - * Check if variant shredding is enabled via table properties. - */ - private static boolean isVariantShreddingEnabled(Map properties) { - String shreddingEnabled = properties.get(InputFormatConfig.VARIANT_SHREDDING_ENABLED); - return Boolean.parseBoolean(shreddingEnabled); - } + private VariantFieldVisitor(Schema schema) { + this.schema = schema; + } + + private boolean hasVariantField() { + return hasVariantField(schema.asStruct()); + } + + private boolean hasVariantField(Types.StructType struct) { + for (Types.NestedField field : struct.fields()) { + if (field.type().isVariantType()) { + // Accessors don't retrieve values contained in lists/maps so this enforces the "struct-only" + // nesting rule for shredding. + if (schema.accessorForField(field.fieldId()) != null) { + return true; + } + } else if (field.type().isStructType() && hasVariantField(field.type().asStructType())) { + return true; + } + // Do not recurse into List or Map (shredding is not supported there) + } + return false; + } + private void collect(List results) { + collect(schema.asStruct(), Lists.newArrayList(), results); + } + + private void collect( + Types.StructType struct, + List parents, + List results) { + + for (Types.NestedField field : struct.fields()) { + if (field.type().isVariantType()) { + // Accessors don't retrieve values contained in lists/maps so this enforces the "struct-only" + // nesting rule for shredding. + Accessor accessor = schema.accessorForField(field.fieldId()); + if (accessor == null) { + continue; + } + + String[] path = new String[parents.size() + 1]; + for (int i = 0; i < parents.size(); i++) { + path[i] = parents.get(i); + } + path[parents.size()] = field.name(); + + results.add(new VariantField(field.fieldId(), accessor, path)); + + } else if (field.type().isStructType()) { + parents.add(field.name()); + collect(field.type().asStructType(), parents, results); + parents.removeLast(); + } + // Do not recurse into List or Map (shredding is not supported there) + } + } + } } diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java index 990d6e782fdb..8a1cf740067b 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java @@ -22,9 +22,6 @@ import java.io.IOException; import java.util.List; import java.util.stream.Collectors; -import java.util.stream.StreamSupport; -import org.apache.hadoop.fs.Path; -import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; @@ -35,16 +32,11 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; -import org.apache.parquet.hadoop.ParquetFileReader; -import org.apache.parquet.hadoop.util.HadoopInputFile; -import org.apache.parquet.schema.GroupType; -import org.apache.parquet.schema.MessageType; import org.junit.Assert; import org.junit.Test; import static org.apache.iceberg.types.Types.NestedField.optional; import static 
org.apache.iceberg.types.Types.NestedField.required; -import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assume.assumeTrue; /** @@ -195,68 +187,6 @@ public void testSpecialCharacters() { Assert.assertArrayEquals(new Object[]{"star", 2L}, result.get(1)); } - @Test - public void testVariantSelectProjection() throws IOException { - assumeTrue(fileFormat == FileFormat.PARQUET); - assumeTrue(!isVectorized); - - TableIdentifier table = TableIdentifier.of("default", "variant_projection"); - shell.executeStatement(String.format("DROP TABLE IF EXISTS %s", table)); - - shell.executeStatement( - String.format( - "CREATE TABLE %s (id INT, payload VARIANT) STORED BY ICEBERG STORED AS %s %s %s", - table, - fileFormat, - testTables.locationForCreateTableSQL(table), - testTables.propertiesForCreateTableSQL( - ImmutableMap.of("format-version", "3", "variant.shredding.enabled", "true")))); - - shell.executeStatement( - String.format( - "INSERT INTO %s VALUES " + - "(1, parse_json('{\"name\":\"Alice\",\"age\":30}'))," + - "(2, parse_json('{\"name\":\"Bob\"}'))", - table)); - - List rows = - shell.executeStatement( - String.format( - "SELECT id, " + - "variant_get(payload, '$.name') AS name, " + - "try_variant_get(payload, '$.age', 'int') AS age " + - "FROM %s ORDER BY id", - table)); - - Assert.assertEquals(2, rows.size()); - Assert.assertEquals(1, ((Number) rows.get(0)[0]).intValue()); - Assert.assertEquals("Alice", rows.get(0)[1]); - Assert.assertEquals(30, ((Number) rows.get(0)[2]).intValue()); - - Assert.assertEquals(2, ((Number) rows.get(1)[0]).intValue()); - Assert.assertEquals("Bob", rows.get(1)[1]); - Assert.assertNull(rows.get(1)[2]); - - Table icebergTable = testTables.loadTable(table); - Types.NestedField variantField = icebergTable.schema().findField("payload"); - Assert.assertNotNull("Variant column should exist", variantField); - DataFile dataFile = - StreamSupport.stream( - icebergTable.currentSnapshot().addedDataFiles(icebergTable.io()).spliterator(), false) - .findFirst() - .orElseThrow(() -> new IllegalStateException("No data files written for test table")); - - Path parquetPath = new Path(dataFile.path().toString()); - try (ParquetFileReader reader = - ParquetFileReader.open(HadoopInputFile.fromPath(parquetPath, shell.getHiveConf()))) { - MessageType parquetSchema = reader.getFooter().getFileMetaData().getSchema(); - GroupType variantType = parquetSchema.getType(variantField.name()).asGroupType(); - assertThat(variantType.containsField("typed_value")).isTrue(); - } - - shell.executeStatement(String.format("DROP TABLE IF EXISTS %s", table)); - } - @Test public void testScanTableCaseInsensitive() throws IOException { testTables.createTable(shell, "customers", diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergVariant.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergVariant.java new file mode 100644 index 000000000000..7ec594a609cf --- /dev/null +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergVariant.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.hive; + +import java.io.IOException; +import java.util.List; +import java.util.stream.StreamSupport; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.types.Types; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.junit.Assert; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assume.assumeTrue; + +public class TestHiveIcebergVariant extends HiveIcebergStorageHandlerWithEngineBase { + private static final String TYPED_VALUE_FIELD = "typed_value"; + + @Test + public void testVariantSelectProjection() throws IOException { + assumeParquetNonVectorized(); + + TableIdentifier table = TableIdentifier.of("default", "variant_projection"); + shell.executeStatement(String.format("DROP TABLE IF EXISTS %s", table)); + + shell.executeStatement( + String.format( + "CREATE TABLE %s (id INT, payload VARIANT) STORED BY ICEBERG STORED AS %s %s %s", + table, + fileFormat, + testTables.locationForCreateTableSQL(table), + testTables.propertiesForCreateTableSQL( + ImmutableMap.of("format-version", "3", "variant.shredding.enabled", "true")))); + + shell.executeStatement( + String.format( + "INSERT INTO %s VALUES " + + "(1, parse_json('null'))," + + "(2, parse_json('{\"name\":\"Alice\",\"age\":30}'))," + + "(3, parse_json('{\"name\":\"Bob\"}'))", + table)); + + List rows = + shell.executeStatement( + String.format( + "SELECT id, " + + "variant_get(payload, '$.name') AS name, " + + "try_variant_get(payload, '$.age', 'int') AS age " + + "FROM %s ORDER BY id", + table)); + + Assert.assertEquals(3, rows.size()); + + Assert.assertEquals(1, ((Number) rows.get(0)[0]).intValue()); + Assert.assertNull(rows.get(0)[1]); + Assert.assertNull(rows.get(0)[2]); + + Assert.assertEquals(2, ((Number) rows.get(1)[0]).intValue()); + Assert.assertEquals("Alice", rows.get(1)[1]); + Assert.assertEquals(30, ((Number) rows.get(1)[2]).intValue()); + + Assert.assertEquals(3, ((Number) rows.get(2)[0]).intValue()); + Assert.assertEquals("Bob", rows.get(2)[1]); + Assert.assertNull(rows.get(2)[2]); + + Table icebergTable = testTables.loadTable(table); + Types.NestedField variantField = requiredField(icebergTable, "payload", "Variant column should exist"); + MessageType parquetSchema = readParquetSchema(firstDataFile(icebergTable)); + assertThat(hasTypedValue(parquetSchema, variantField.name())).isTrue(); + } + + @Test + public void testVariantShreddingInStruct() throws IOException { + assumeParquetNonVectorized(); + + TableIdentifier table = TableIdentifier.of("default", "variant_struct_shredding"); + shell.executeStatement(String.format("DROP TABLE IF EXISTS %s", table)); + + shell.executeStatement( + String.format( + 
"CREATE TABLE %s (id INT, payload STRUCT) STORED BY ICEBERG STORED AS %s %s %s", + table, + fileFormat, + testTables.locationForCreateTableSQL(table), + testTables.propertiesForCreateTableSQL( + ImmutableMap.of("format-version", "3", "variant.shredding.enabled", "true")))); + + shell.executeStatement( + String.format( + "INSERT INTO %s VALUES " + + "(1, named_struct('info', parse_json('null')))," + + "(2, named_struct('info', parse_json('{\"city\":\"Seattle\",\"state\":\"WA\"}')))", + table)); + + Table icebergTable = testTables.loadTable(table); + Types.NestedField payloadField = requiredField(icebergTable, "payload", "Struct column should exist"); + MessageType parquetSchema = readParquetSchema(firstDataFile(icebergTable)); + assertThat(hasTypedValue(parquetSchema, payloadField.name(), "info")).isTrue(); + } + + @Test + public void testVariantShreddingNotAppliedInArrayOrMap() throws IOException { + assumeParquetNonVectorized(); + + TableIdentifier table = TableIdentifier.of("default", "variant_container_no_shredding"); + shell.executeStatement(String.format("DROP TABLE IF EXISTS %s", table)); + + shell.executeStatement( + String.format( + "CREATE TABLE %s (id INT, arr ARRAY, mp MAP) " + + "STORED BY ICEBERG STORED AS %s %s %s", + table, + fileFormat, + testTables.locationForCreateTableSQL(table), + testTables.propertiesForCreateTableSQL( + ImmutableMap.of("format-version", "3", "variant.shredding.enabled", "true")))); + + shell.executeStatement( + String.format( + "INSERT INTO %s VALUES " + + "(1, array(parse_json('{\"a\":1}')), map('k', parse_json('{\"b\":2}')))", + table)); + + Table icebergTable = testTables.loadTable(table); + MessageType parquetSchema = readParquetSchema(firstDataFile(icebergTable)); + // The element/value types should remain as the base VARIANT struct (no typed_value). + assertThat(hasTypedValue(parquetSchema, "arr", "list", "element")).isFalse(); + assertThat(hasTypedValue(parquetSchema, "mp", "key_value", "value")).isFalse(); + } + + private void assumeParquetNonVectorized() { + assumeTrue(fileFormat == FileFormat.PARQUET); + assumeTrue(!isVectorized); + } + + private static Types.NestedField requiredField(Table table, String fieldName, String message) { + Types.NestedField field = table.schema().findField(fieldName); + Assert.assertNotNull(message, field); + return field; + } + + private static DataFile firstDataFile(Table table) { + return StreamSupport.stream(table.currentSnapshot().addedDataFiles(table.io()).spliterator(), false) + .findFirst() + .orElseThrow(() -> new IllegalStateException("No data files written for test table")); + } + + private MessageType readParquetSchema(DataFile dataFile) throws IOException { + Path parquetPath = new Path(dataFile.location()); + try (ParquetFileReader reader = + ParquetFileReader.open(HadoopInputFile.fromPath(parquetPath, shell.getHiveConf()))) { + return reader.getFooter().getFileMetaData().getSchema(); + } + } + + private static GroupType groupAt(MessageType parquetSchema, String... path) { + org.apache.parquet.schema.Type type = parquetSchema.getType(path[0]); + for (int i = 1; i < path.length; i++) { + type = type.asGroupType().getType(path[i]); + } + return type.asGroupType(); + } + + private static boolean hasTypedValue(MessageType parquetSchema, String... 
pathToVariantGroup) { + return groupAt(parquetSchema, pathToVariantGroup).containsField(TYPED_VALUE_FIELD); + } +} diff --git a/iceberg/iceberg-shading/pom.xml b/iceberg/iceberg-shading/pom.xml index ac4bf4a9b106..17d50dd91d62 100644 --- a/iceberg/iceberg-shading/pom.xml +++ b/iceberg/iceberg-shading/pom.xml @@ -117,7 +117,6 @@ - *:* @@ -128,13 +127,6 @@ static/ - - - org.apache.iceberg:iceberg-parquet - - org/apache/iceberg/parquet/Parquet.class - -
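
Usage note (not part of the patch): the sketch below shows how a Parquet writer might wire up the new VariantUtil API introduced above. Only the VariantUtil calls (shouldUseVariantShredding, variantShreddingFunc, isShreddable) come from the patch; the schema, the literal property key, and the placeholder sample record are illustrative assumptions, and the key may differ from the actual InputFormatConfig.VARIANT_SHREDDING_ENABLED constant.

import java.util.Map;

import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.parquet.VariantShreddingFunction;
import org.apache.iceberg.parquet.VariantUtil;
import org.apache.iceberg.types.Types;

public class VariantShreddingUsageSketch {
  public static void main(String[] args) {
    // A format-version 3 schema with a top-level VARIANT column.
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.IntegerType.get()),
        Types.NestedField.optional(2, "payload", Types.VariantType.get()));

    // Writer-side properties; the key mirrors the table property used in the tests
    // ("variant.shredding.enabled") and is assumed to match the config constant.
    Map<String, String> properties = Map.of("variant.shredding.enabled", "true");

    if (VariantUtil.shouldUseVariantShredding(properties, schema)) {
      // In the writer, the sample would be the first buffered row whose variant value
      // is shreddable (VariantUtil.isShreddable rejects variants holding JSON null).
      // An empty record is used here only as a placeholder.
      Record sample = GenericRecord.create(schema);

      VariantShreddingFunction shreddingFunc =
          VariantUtil.variantShreddingFunc(sample, schema);
      // The function maps a variant field id to a Parquet typed_value schema derived
      // from the sample, or returns null when the field cannot be shredded.
    }
  }
}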
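A second sketch, under the same assumptions, illustrating the "struct-only" eligibility rule that the new tests exercise: variantFieldsForShredding is expected to report top-level and struct-nested VARIANT fields but skip those inside lists or maps. Field ids and names below are made up for the example.

import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.iceberg.Schema;
import org.apache.iceberg.parquet.VariantUtil;
import org.apache.iceberg.types.Types;

public class VariantEligibilitySketch {
  public static void main(String[] args) {
    Schema schema = new Schema(
        // Top-level VARIANT: eligible, path ["v"]
        Types.NestedField.optional(1, "v", Types.VariantType.get()),
        // VARIANT nested in a struct: eligible, path ["s", "inner"]
        Types.NestedField.optional(2, "s", Types.StructType.of(
            Types.NestedField.optional(3, "inner", Types.VariantType.get()))),
        // VARIANT inside a list: skipped, because schema accessors cannot reach list elements
        Types.NestedField.optional(4, "l",
            Types.ListType.ofOptional(5, Types.VariantType.get())));

    List<VariantUtil.VariantField> fields = VariantUtil.variantFieldsForShredding(
        Map.of("variant.shredding.enabled", "true"), schema);

    for (VariantUtil.VariantField field : fields) {
      // Expected: field 1 -> [v] and field 3 -> [s, inner]; the list element (id 5) is excluded.
      System.out.println(field.fieldId() + " -> " + Arrays.toString(field.path()));
    }
  }
}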