diff --git a/api/src/main/java/org/apache/iceberg/PartitionStatistics.java b/api/src/main/java/org/apache/iceberg/PartitionStatistics.java new file mode 100644 index 000000000000..10df7303d500 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/PartitionStatistics.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +/** Interface for partition statistics returned from a {@link PartitionStatisticsScan}. */ +public interface PartitionStatistics extends StructLike { + + /** Returns the partition of these partition statistics */ + StructLike partition(); + + /** Returns the spec ID of the partition of these partition statistics */ + Integer specId(); + + /** Returns the number of data records in the partition */ + Long dataRecordCount(); + + /** Returns the number of data files in the partition */ + Integer dataFileCount(); + + /** Returns the total size of data files in bytes in the partition */ + Long totalDataFileSizeInBytes(); + + /** + * Returns the number of positional delete records in the partition. 
Also includes dv record count + * as per spec + */ + Long positionDeleteRecordCount(); + + /** Returns the number of positional delete files in the partition */ + Integer positionDeleteFileCount(); + + /** Returns the number of equality delete records in the partition */ + Long equalityDeleteRecordCount(); + + /** Returns the number of equality delete files in the partition */ + Integer equalityDeleteFileCount(); + + /** Returns the total number of records in the partition */ + Long totalRecords(); + + /** Returns the timestamp in milliseconds when the partition was last updated */ + Long lastUpdatedAt(); + + /** Returns the ID of the snapshot that last updated this partition */ + Long lastUpdatedSnapshotId(); + + /** Returns the number of delete vectors in the partition */ + Integer dvCount(); +} diff --git a/api/src/main/java/org/apache/iceberg/PartitionStatisticsScan.java b/api/src/main/java/org/apache/iceberg/PartitionStatisticsScan.java new file mode 100644 index 000000000000..18d8b2031821 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/PartitionStatisticsScan.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg; + +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.io.CloseableIterable; + +/** API for configuring partition statistics scan. */ +public interface PartitionStatisticsScan { + + /** + * Create a new scan from this scan's configuration that will use the given snapshot by ID. + * + * @param snapshotId a snapshot ID + * @return a new scan based on this with the given snapshot ID + * @throws IllegalArgumentException if the snapshot cannot be found + */ + PartitionStatisticsScan useSnapshot(long snapshotId); + + /** + * Create a new scan from the results of this, where partitions are filtered by the {@link + * Expression}. + * + * @param filter a filter expression + * @return a new scan based on this with results filtered by the expression + */ + PartitionStatisticsScan filter(Expression filter); + + /** + * Create a new scan from this with the schema as its projection. + * + * @param schema a projection schema + * @return a new scan based on this with the given projection + */ + PartitionStatisticsScan project(Schema schema); + + /** + * Scans a partition statistics file belonging to a particular snapshot + * + * @return an Iterable of partition statistics + */ + CloseableIterable scan(); +} diff --git a/api/src/main/java/org/apache/iceberg/Table.java b/api/src/main/java/org/apache/iceberg/Table.java index 97ea9ba76526..3c0689e89288 100644 --- a/api/src/main/java/org/apache/iceberg/Table.java +++ b/api/src/main/java/org/apache/iceberg/Table.java @@ -83,6 +83,18 @@ default IncrementalChangelogScan newIncrementalChangelogScan() { throw new UnsupportedOperationException("Incremental changelog scan is not supported"); } + /** + * Create a new {@link PartitionStatisticsScan} for this table. + * + *

Once a partition statistics scan is created, it can be refined to project columns and filter + * data. + * + * @return a partition statistics scan for this table + */ + default PartitionStatisticsScan newPartitionStatisticsScan() { + throw new UnsupportedOperationException("Partition statistics scan is not supported"); + } + /** * Return the {@link Schema schema} for this table. * diff --git a/core/src/main/java/org/apache/iceberg/BasePartitionStatistics.java b/core/src/main/java/org/apache/iceberg/BasePartitionStatistics.java new file mode 100644 index 000000000000..c17718281b57 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/BasePartitionStatistics.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg; + +import org.apache.iceberg.avro.SupportsIndexProjection; +import org.apache.iceberg.types.Types; + +public class BasePartitionStatistics extends SupportsIndexProjection + implements PartitionStatistics { + + private StructLike partition; + private Integer specId; + private Long dataRecordCount; + private Integer dataFileCount; + private Long totalDataFileSizeInBytes; + private Long positionDeleteRecordCount; + private Integer positionDeleteFileCount; + private Long equalityDeleteRecordCount; + private Integer equalityDeleteFileCount; + private Long totalRecordCount; // Not calculated, as it needs scanning the data. Remains null + private Long lastUpdatedAt; + private Long lastUpdatedSnapshotId; + private Integer dvCount; + + private static final int STATS_COUNT = 13; + + /** Used by internal readers to instantiate this class with a projection schema. */ + public BasePartitionStatistics(Types.StructType projection) { + super(STATS_COUNT); + } + + @Override + public StructLike partition() { + return partition; + } + + @Override + public Integer specId() { + return specId; + } + + @Override + public Long dataRecordCount() { + return dataRecordCount; + } + + @Override + public Integer dataFileCount() { + return dataFileCount; + } + + @Override + public Long totalDataFileSizeInBytes() { + return totalDataFileSizeInBytes; + } + + @Override + public Long positionDeleteRecordCount() { + return positionDeleteRecordCount; + } + + @Override + public Integer positionDeleteFileCount() { + return positionDeleteFileCount; + } + + @Override + public Long equalityDeleteRecordCount() { + return equalityDeleteRecordCount; + } + + @Override + public Integer equalityDeleteFileCount() { + return equalityDeleteFileCount; + } + + @Override + public Long totalRecords() { + return totalRecordCount; + } + + @Override + public Long lastUpdatedAt() { + return lastUpdatedAt; + } + + @Override + public Long lastUpdatedSnapshotId() { + return 
lastUpdatedSnapshotId; + } + + @Override + public Integer dvCount() { + return dvCount; + } + + @Override + protected T internalGet(int pos, Class javaClass) { + return javaClass.cast(getByPos(pos)); + } + + private Object getByPos(int pos) { + switch (pos) { + case 0: + return partition; + case 1: + return specId; + case 2: + return dataRecordCount; + case 3: + return dataFileCount; + case 4: + return totalDataFileSizeInBytes; + case 5: + return positionDeleteRecordCount; + case 6: + return positionDeleteFileCount; + case 7: + return equalityDeleteRecordCount; + case 8: + return equalityDeleteFileCount; + case 9: + return totalRecordCount; + case 10: + return lastUpdatedAt; + case 11: + return lastUpdatedSnapshotId; + case 12: + return dvCount; + default: + throw new UnsupportedOperationException("Unknown position: " + pos); + } + } + + @Override + protected void internalSet(int pos, T value) { + if (value == null) { + return; + } + + switch (pos) { + case 0: + this.partition = (StructLike) value; + break; + case 1: + this.specId = (int) value; + break; + case 2: + this.dataRecordCount = (long) value; + break; + case 3: + this.dataFileCount = (int) value; + break; + case 4: + this.totalDataFileSizeInBytes = (long) value; + break; + case 5: + this.positionDeleteRecordCount = (long) value; + break; + case 6: + this.positionDeleteFileCount = (int) value; + break; + case 7: + this.equalityDeleteRecordCount = (long) value; + break; + case 8: + this.equalityDeleteFileCount = (int) value; + break; + case 9: + this.totalRecordCount = (Long) value; + break; + case 10: + this.lastUpdatedAt = (Long) value; + break; + case 11: + this.lastUpdatedSnapshotId = (Long) value; + break; + case 12: + this.dvCount = (int) value; + break; + default: + throw new UnsupportedOperationException("Unknown position: " + pos); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/BasePartitionStatisticsScan.java b/core/src/main/java/org/apache/iceberg/BasePartitionStatisticsScan.java 
new file mode 100644 index 000000000000..075a1a85d394 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/BasePartitionStatisticsScan.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.util.Optional; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Types; + +public class BasePartitionStatisticsScan implements PartitionStatisticsScan { + + private final Table table; + private Long snapshotId; + + public BasePartitionStatisticsScan(Table table) { + this.table = table; + } + + @Override + public PartitionStatisticsScan useSnapshot(long newSnapshotId) { + Preconditions.checkArgument( + table.snapshot(newSnapshotId) != null, "Cannot find snapshot with ID %s", newSnapshotId); + + this.snapshotId = newSnapshotId; + return this; + } + + @Override + public PartitionStatisticsScan filter(Expression newFilter) { + throw new UnsupportedOperationException("Filtering is not supported"); + } + + @Override + public PartitionStatisticsScan project(Schema newSchema) { + throw new 
UnsupportedOperationException("Projection is not supported"); + } + + @Override + public CloseableIterable scan() { + if (snapshotId == null) { + if (table.currentSnapshot() == null) { + return CloseableIterable.empty(); + } + + snapshotId = table.currentSnapshot().snapshotId(); + } + + Optional statsFile = + table.partitionStatisticsFiles().stream() + .filter(f -> f.snapshotId() == snapshotId) + .findFirst(); + + if (statsFile.isEmpty()) { + return CloseableIterable.empty(); + } + + Types.StructType partitionType = Partitioning.partitionType(table); + Schema schema = PartitionStatsHandler.schema(partitionType, TableUtil.formatVersion(table)); + + FileFormat fileFormat = FileFormat.fromFileName(statsFile.get().path()); + Preconditions.checkNotNull( + fileFormat, "Unable to determine format of file: %s", statsFile.get().path()); + + return InternalData.read(fileFormat, table.io().newInputFile(statsFile.get().path())) + .project(schema) + .setRootType(BasePartitionStatistics.class) + .build(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java b/core/src/main/java/org/apache/iceberg/BaseTable.java index 23299a962ce5..c489c3bfb517 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseTable.java @@ -90,6 +90,11 @@ public IncrementalChangelogScan newIncrementalChangelogScan() { return new BaseIncrementalChangelogScan(this); } + @Override + public PartitionStatisticsScan newPartitionStatisticsScan() { + return new BasePartitionStatisticsScan(this); + } + @Override public Schema schema() { return ops.current().schema(); diff --git a/core/src/main/java/org/apache/iceberg/PartitionStats.java b/core/src/main/java/org/apache/iceberg/PartitionStats.java index 9051c8535c7e..e8a4e18916bc 100644 --- a/core/src/main/java/org/apache/iceberg/PartitionStats.java +++ b/core/src/main/java/org/apache/iceberg/PartitionStats.java @@ -20,6 +20,12 @@ import 
org.apache.iceberg.relocated.com.google.common.base.Preconditions; +/** + * Class to hold partition statistics values. + * + * @deprecated will be removed in 1.12.0. Use {@link BasePartitionStatistics} instead + */ +@Deprecated public class PartitionStats implements StructLike { private static final int STATS_COUNT = 13; diff --git a/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java b/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java index 4e7c1b104ee8..7259a1f0684b 100644 --- a/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java +++ b/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java @@ -275,7 +275,9 @@ static PartitionStatisticsFile writePartitionStatsFile( * * @param schema The {@link Schema} of the partition statistics file. * @param inputFile An {@link InputFile} pointing to the partition stats file. + * @deprecated will be removed in 1.12.0, use {@link PartitionStatisticsScan} instead */ + @Deprecated public static CloseableIterable readPartitionStatsFile( Schema schema, InputFile inputFile) { Preconditions.checkArgument(schema != null, "Invalid schema: null"); diff --git a/core/src/test/java/org/apache/iceberg/PartitionStatisticsScanTestBase.java b/core/src/test/java/org/apache/iceberg/PartitionStatisticsScanTestBase.java new file mode 100644 index 000000000000..89eb70959c6d --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/PartitionStatisticsScanTestBase.java @@ -0,0 +1,480 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import static org.apache.iceberg.PartitionStatsHandler.PARTITION_FIELD_ID; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.io.IOException; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Comparators; +import org.apache.iceberg.types.Types; +import org.assertj.core.groups.Tuple; +import org.junit.jupiter.api.Test; + +public abstract class PartitionStatisticsScanTestBase extends PartitionStatisticsTestBase { + + public abstract FileFormat format(); + + private final Map fileFormatProperty = + ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format().name()); + + @Test + public void testEmptyTable() throws Exception { + Table testTable = + TestTables.create( + tempDir("scan_empty_table"), "scan_empty_table", SCHEMA, SPEC, 2, fileFormatProperty); + + assertThat(Lists.newArrayList(testTable.newPartitionStatisticsScan().scan())).isEmpty(); + } + + @Test + public void testInvalidSnapshotId() throws Exception { + Table testTable = + TestTables.create( + tempDir("scan_invalid_snapshot"), + "scan_invalid_snapshot", + SCHEMA, + SPEC, + 2, + fileFormatProperty); + + assertThatThrownBy(() -> 
testTable.newPartitionStatisticsScan().useSnapshot(1234L).scan()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot find snapshot with ID 1234"); + } + + @Test + public void testNoStatsForSnapshot() throws Exception { + Table testTable = + TestTables.create( + tempDir("scan_no_stats"), "scan_no_stats", SCHEMA, SPEC, 2, fileFormatProperty); + + DataFile dataFile = + DataFiles.builder(SPEC) + .withPath("some_path") + .withFileSizeInBytes(15) + .withFormat(format()) + .withRecordCount(1) + .build(); + testTable.newAppend().appendFile(dataFile).commit(); + long snapshotId = testTable.currentSnapshot().snapshotId(); + + assertThat(testTable.newPartitionStatisticsScan().useSnapshot(snapshotId).scan()).isEmpty(); + } + + @Test + public void testReadingStatsWithInvalidSchema() throws Exception { + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").build(); + Table testTable = + TestTables.create( + tempDir("scan_with_old_schema"), + "scan_with_old_schema", + SCHEMA, + spec, + 2, + fileFormatProperty); + Types.StructType partitionType = Partitioning.partitionType(testTable); + Schema oldSchema = invalidOldSchema(partitionType); + + // Add a dummy file to the table to have a snapshot + DataFile dataFile = + DataFiles.builder(spec) + .withPath("some_path") + .withFileSizeInBytes(15) + .withFormat(FileFormat.PARQUET) + .withRecordCount(1) + .build(); + testTable.newAppend().appendFile(dataFile).commit(); + long snapshotId = testTable.currentSnapshot().snapshotId(); + + testTable + .updatePartitionStatistics() + .setPartitionStatistics( + PartitionStatsHandler.writePartitionStatsFile( + testTable, + snapshotId, + oldSchema, + Collections.singletonList(randomStats(partitionType)))) + .commit(); + + try (CloseableIterable recordIterator = + testTable.newPartitionStatisticsScan().useSnapshot(snapshotId).scan()) { + + if (format() == FileFormat.PARQUET) { + assertThatThrownBy(() -> Lists.newArrayList(recordIterator)) + 
.isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Not a primitive type: struct"); + } else if (format() == FileFormat.AVRO) { + assertThatThrownBy(() -> Lists.newArrayList(recordIterator)) + .isInstanceOf(ClassCastException.class) + .hasMessageContaining("Integer cannot be cast to class org.apache.iceberg.StructLike"); + } + } + } + + @Test + public void testV2toV3SchemaEvolution() throws Exception { + Table testTable = + TestTables.create( + tempDir("scan_with_schema_evolution"), + "scan_with_schema_evolution", + SCHEMA, + SPEC, + 2, + fileFormatProperty); + + // write stats file using v2 schema + DataFile dataFile = + FileGenerationUtil.generateDataFile(testTable, TestHelpers.Row.of("foo", "A")); + testTable.newAppend().appendFile(dataFile).commit(); + + testTable + .updatePartitionStatistics() + .setPartitionStatistics( + PartitionStatsHandler.computeAndWriteStatsFile( + testTable, testTable.currentSnapshot().snapshotId())) + .commit(); + + Types.StructType partitionSchema = Partitioning.partitionType(testTable); + + // read with v2 schema + List partitionStatsV2; + try (CloseableIterable recordIterator = + testTable.newPartitionStatisticsScan().scan()) { + partitionStatsV2 = Lists.newArrayList(recordIterator); + } + + testTable.updateProperties().set(TableProperties.FORMAT_VERSION, "3").commit(); + + // read with v3 schema + List partitionStatsV3; + try (CloseableIterable recordIterator = + testTable.newPartitionStatisticsScan().scan()) { + partitionStatsV3 = Lists.newArrayList(recordIterator); + } + + assertThat(partitionStatsV2).hasSameSizeAs(partitionStatsV3); + Comparator comparator = Comparators.forType(partitionSchema); + for (int i = 0; i < partitionStatsV2.size(); i++) { + assertThat(isEqual(comparator, partitionStatsV2.get(i), partitionStatsV3.get(i))).isTrue(); + } + } + + @SuppressWarnings("checkstyle:MethodLength") + @Test + public void testScanPartitionStatsForCurrentSnapshot() throws Exception { + Table testTable = + 
TestTables.create( + tempDir("scan_partition_stats"), + "scan_partition_stats", + SCHEMA, + SPEC, + 2, + fileFormatProperty); + + DataFile dataFile1 = + FileGenerationUtil.generateDataFile(testTable, TestHelpers.Row.of("foo", "A")); + DataFile dataFile2 = + FileGenerationUtil.generateDataFile(testTable, TestHelpers.Row.of("foo", "B")); + DataFile dataFile3 = + FileGenerationUtil.generateDataFile(testTable, TestHelpers.Row.of("bar", "A")); + DataFile dataFile4 = + FileGenerationUtil.generateDataFile(testTable, TestHelpers.Row.of("bar", "B")); + + for (int i = 0; i < 3; i++) { + // insert same set of seven records thrice to have a new manifest files + testTable + .newAppend() + .appendFile(dataFile1) + .appendFile(dataFile2) + .appendFile(dataFile3) + .appendFile(dataFile4) + .commit(); + } + + Snapshot snapshot1 = testTable.currentSnapshot(); + Schema recordSchema = PartitionStatsHandler.schema(Partitioning.partitionType(testTable), 2); + + Types.StructType partitionType = + recordSchema.findField(PARTITION_FIELD_ID).type().asStructType(); + computeAndValidatePartitionStats( + testTable, + testTable.currentSnapshot().snapshotId(), + Tuple.tuple( + partitionRecord(partitionType, "foo", "A"), + 0, + 3 * dataFile1.recordCount(), + 3, + 3 * dataFile1.fileSizeInBytes(), + 0L, + 0, + 0L, + 0, + null, + snapshot1.timestampMillis(), + snapshot1.snapshotId(), + null), + Tuple.tuple( + partitionRecord(partitionType, "foo", "B"), + 0, + 3 * dataFile2.recordCount(), + 3, + 3 * dataFile2.fileSizeInBytes(), + 0L, + 0, + 0L, + 0, + null, + snapshot1.timestampMillis(), + snapshot1.snapshotId(), + null), + Tuple.tuple( + partitionRecord(partitionType, "bar", "A"), + 0, + 3 * dataFile3.recordCount(), + 3, + 3 * dataFile3.fileSizeInBytes(), + 0L, + 0, + 0L, + 0, + null, + snapshot1.timestampMillis(), + snapshot1.snapshotId(), + null), + Tuple.tuple( + partitionRecord(partitionType, "bar", "B"), + 0, + 3 * dataFile4.recordCount(), + 3, + 3 * dataFile4.fileSizeInBytes(), + 0L, + 0, + 
0L, + 0, + null, + snapshot1.timestampMillis(), + snapshot1.snapshotId(), + null)); + + DeleteFile posDelete = + FileGenerationUtil.generatePositionDeleteFile(testTable, TestHelpers.Row.of("bar", "A")); + testTable.newRowDelta().addDeletes(posDelete).commit(); + // snapshot2 is unused in the result as same partition was updated by snapshot4 + + DeleteFile eqDelete = + FileGenerationUtil.generateEqualityDeleteFile(testTable, TestHelpers.Row.of("foo", "A")); + testTable.newRowDelta().addDeletes(eqDelete).commit(); + Snapshot snapshot3 = testTable.currentSnapshot(); + + testTable.updateProperties().set(TableProperties.FORMAT_VERSION, "3").commit(); + DeleteFile dv = FileGenerationUtil.generateDV(testTable, dataFile3); + testTable.newRowDelta().addDeletes(dv).commit(); + Snapshot snapshot4 = testTable.currentSnapshot(); + + computeAndValidatePartitionStats( + testTable, + testTable.currentSnapshot().snapshotId(), + Tuple.tuple( + partitionRecord(partitionType, "foo", "A"), + 0, + 3 * dataFile1.recordCount(), + 3, + 3 * dataFile1.fileSizeInBytes(), + 0L, + 0, + eqDelete.recordCount(), + 1, + null, + snapshot3.timestampMillis(), + snapshot3.snapshotId(), + 0), + Tuple.tuple( + partitionRecord(partitionType, "foo", "B"), + 0, + 3 * dataFile2.recordCount(), + 3, + 3 * dataFile2.fileSizeInBytes(), + 0L, + 0, + 0L, + 0, + null, + snapshot1.timestampMillis(), + snapshot1.snapshotId(), + 0), + Tuple.tuple( + partitionRecord(partitionType, "bar", "A"), + 0, + 3 * dataFile3.recordCount(), + 3, + 3 * dataFile3.fileSizeInBytes(), + posDelete.recordCount() + dv.recordCount(), + 1, + 0L, + 0, + null, + snapshot4.timestampMillis(), + snapshot4.snapshotId(), + 1), // dv count + Tuple.tuple( + partitionRecord(partitionType, "bar", "B"), + 0, + 3 * dataFile4.recordCount(), + 3, + 3 * dataFile4.fileSizeInBytes(), + 0L, + 0, + 0L, + 0, + null, + snapshot1.timestampMillis(), + snapshot1.snapshotId(), + 0)); + } + + @Test + public void testScanPartitionStatsForOlderSnapshot() throws 
Exception { + Table testTable = + TestTables.create( + tempDir("scan_older_snapshot"), + "scan_older_snapshot", + SCHEMA, + SPEC, + 2, + fileFormatProperty); + + DataFile dataFile1 = + FileGenerationUtil.generateDataFile(testTable, TestHelpers.Row.of("foo", "A")); + DataFile dataFile2 = + FileGenerationUtil.generateDataFile(testTable, TestHelpers.Row.of("foo", "B")); + + testTable.newAppend().appendFile(dataFile1).appendFile(dataFile2).commit(); + + Snapshot firstSnapshot = testTable.currentSnapshot(); + + testTable.newAppend().appendFile(dataFile1).appendFile(dataFile2).commit(); + + Schema recordSchema = PartitionStatsHandler.schema(Partitioning.partitionType(testTable), 2); + + Types.StructType partitionType = + recordSchema.findField(PARTITION_FIELD_ID).type().asStructType(); + + computeAndValidatePartitionStats( + testTable, + firstSnapshot.snapshotId(), + Tuple.tuple( + partitionRecord(partitionType, "foo", "A"), + 0, + dataFile1.recordCount(), + 1, + dataFile1.fileSizeInBytes(), + 0L, + 0, + 0L, + 0, + null, + firstSnapshot.timestampMillis(), + firstSnapshot.snapshotId(), + null), + Tuple.tuple( + partitionRecord(partitionType, "foo", "B"), + 0, + dataFile2.recordCount(), + 1, + dataFile2.fileSizeInBytes(), + 0L, + 0, + 0L, + 0, + null, + firstSnapshot.timestampMillis(), + firstSnapshot.snapshotId(), + null)); + } + + private static void computeAndValidatePartitionStats( + Table testTable, long snapshotId, Tuple... 
expectedValues) throws IOException { + PartitionStatisticsFile result = + PartitionStatsHandler.computeAndWriteStatsFile(testTable, snapshotId); + testTable.updatePartitionStatistics().setPartitionStatistics(result).commit(); + assertThat(result.snapshotId()).isEqualTo(snapshotId); + + PartitionStatisticsScan statScan = testTable.newPartitionStatisticsScan(); + if (testTable.currentSnapshot().snapshotId() != snapshotId) { + statScan.useSnapshot(snapshotId); + } + + List partitionStats; + try (CloseableIterable recordIterator = statScan.scan()) { + partitionStats = Lists.newArrayList(recordIterator); + } + + assertThat(partitionStats) + .extracting( + PartitionStatistics::partition, + PartitionStatistics::specId, + PartitionStatistics::dataRecordCount, + PartitionStatistics::dataFileCount, + PartitionStatistics::totalDataFileSizeInBytes, + PartitionStatistics::positionDeleteRecordCount, + PartitionStatistics::positionDeleteFileCount, + PartitionStatistics::equalityDeleteRecordCount, + PartitionStatistics::equalityDeleteFileCount, + PartitionStatistics::totalRecords, + PartitionStatistics::lastUpdatedAt, + PartitionStatistics::lastUpdatedSnapshotId, + PartitionStatistics::dvCount) + .containsExactlyInAnyOrder(expectedValues); + } + + @SuppressWarnings("checkstyle:CyclomaticComplexity") + protected static boolean isEqual( + Comparator partitionComparator, + PartitionStatistics stats1, + PartitionStatistics stats2) { + if (stats1 == stats2) { + return true; + } else if (stats1 == null || stats2 == null) { + return false; + } + + return partitionComparator.compare(stats1.partition(), stats2.partition()) == 0 + && Objects.equals(stats1.specId(), stats2.specId()) + && Objects.equals(stats1.dataRecordCount(), stats2.dataRecordCount()) + && Objects.equals(stats1.dataFileCount(), stats2.dataFileCount()) + && Objects.equals(stats1.totalDataFileSizeInBytes(), stats2.totalDataFileSizeInBytes()) + && Objects.equals(stats1.positionDeleteRecordCount(), 
stats2.positionDeleteRecordCount()) + && Objects.equals(stats1.positionDeleteFileCount(), stats2.positionDeleteFileCount()) + && Objects.equals(stats1.equalityDeleteRecordCount(), stats2.equalityDeleteRecordCount()) + && Objects.equals(stats1.equalityDeleteFileCount(), stats2.equalityDeleteFileCount()) + && Objects.equals(stats1.totalRecords(), stats2.totalRecords()) + && Objects.equals(stats1.lastUpdatedAt(), stats2.lastUpdatedAt()) + && Objects.equals(stats1.lastUpdatedSnapshotId(), stats2.lastUpdatedSnapshotId()); + } +} diff --git a/core/src/test/java/org/apache/iceberg/PartitionStatisticsTestBase.java b/core/src/test/java/org/apache/iceberg/PartitionStatisticsTestBase.java new file mode 100644 index 000000000000..72a5405d7f1b --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/PartitionStatisticsTestBase.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg; + +import static org.apache.iceberg.PartitionStatsHandler.DATA_FILE_COUNT; +import static org.apache.iceberg.PartitionStatsHandler.DATA_RECORD_COUNT; +import static org.apache.iceberg.PartitionStatsHandler.EQUALITY_DELETE_FILE_COUNT; +import static org.apache.iceberg.PartitionStatsHandler.EQUALITY_DELETE_RECORD_COUNT; +import static org.apache.iceberg.PartitionStatsHandler.LAST_UPDATED_AT; +import static org.apache.iceberg.PartitionStatsHandler.LAST_UPDATED_SNAPSHOT_ID; +import static org.apache.iceberg.PartitionStatsHandler.PARTITION_FIELD_NAME; +import static org.apache.iceberg.PartitionStatsHandler.POSITION_DELETE_FILE_COUNT; +import static org.apache.iceberg.PartitionStatsHandler.POSITION_DELETE_RECORD_COUNT; +import static org.apache.iceberg.PartitionStatsHandler.SPEC_ID; +import static org.apache.iceberg.PartitionStatsHandler.TOTAL_DATA_FILE_SIZE_IN_BYTES; +import static org.apache.iceberg.PartitionStatsHandler.TOTAL_RECORD_COUNT; +import static org.apache.iceberg.types.Types.NestedField.optional; + +import java.io.File; +import java.io.IOException; +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.io.TempDir; + +public abstract class PartitionStatisticsTestBase { + + @TempDir private File temp; + + // positions in StructLike + protected static final int DATA_RECORD_COUNT_POSITION = 2; + protected static final int DATA_FILE_COUNT_POSITION = 3; + protected static final int TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION = 4; + protected static final int POSITION_DELETE_RECORD_COUNT_POSITION = 5; + protected static final int POSITION_DELETE_FILE_COUNT_POSITION = 6; + protected static final int EQUALITY_DELETE_RECORD_COUNT_POSITION = 7; + protected static final int EQUALITY_DELETE_FILE_COUNT_POSITION = 8; + protected static final int TOTAL_RECORD_COUNT_POSITION = 9; + protected static final int 
LAST_UPDATED_AT_POSITION = 10; + protected static final int LAST_UPDATED_SNAPSHOT_ID_POSITION = 11; + protected static final int DV_COUNT_POSITION = 12; + + protected static final Schema SCHEMA = + new Schema( + optional(1, "c1", Types.IntegerType.get()), + optional(2, "c2", Types.StringType.get()), + optional(3, "c3", Types.StringType.get())); + + protected static final PartitionSpec SPEC = + PartitionSpec.builderFor(SCHEMA).identity("c2").identity("c3").build(); + + private static final Random RANDOM = ThreadLocalRandom.current(); + + protected Schema invalidOldSchema(Types.StructType unifiedPartitionType) { + // field ids starts from 0 instead of 1 + return new Schema( + Types.NestedField.required(0, PARTITION_FIELD_NAME, unifiedPartitionType), + Types.NestedField.required(1, SPEC_ID.name(), Types.IntegerType.get()), + Types.NestedField.required(2, DATA_RECORD_COUNT.name(), Types.LongType.get()), + Types.NestedField.required(3, DATA_FILE_COUNT.name(), Types.IntegerType.get()), + Types.NestedField.required(4, TOTAL_DATA_FILE_SIZE_IN_BYTES.name(), Types.LongType.get()), + Types.NestedField.optional(5, POSITION_DELETE_RECORD_COUNT.name(), Types.LongType.get()), + Types.NestedField.optional(6, POSITION_DELETE_FILE_COUNT.name(), Types.IntegerType.get()), + Types.NestedField.optional(7, EQUALITY_DELETE_RECORD_COUNT.name(), Types.LongType.get()), + Types.NestedField.optional(8, EQUALITY_DELETE_FILE_COUNT.name(), Types.IntegerType.get()), + Types.NestedField.optional(9, TOTAL_RECORD_COUNT.name(), Types.LongType.get()), + Types.NestedField.optional(10, LAST_UPDATED_AT.name(), Types.LongType.get()), + Types.NestedField.optional(11, LAST_UPDATED_SNAPSHOT_ID.name(), Types.LongType.get())); + } + + protected PartitionStats randomStats(Types.StructType partitionType) { + PartitionData partitionData = new PartitionData(partitionType); + partitionData.set(0, RANDOM.nextInt()); + + return randomStats(partitionData); + } + + protected PartitionStats randomStats(PartitionData 
partitionData) { + PartitionStats stats = new PartitionStats(partitionData, RANDOM.nextInt(10)); + stats.set(DATA_RECORD_COUNT_POSITION, RANDOM.nextLong()); + stats.set(DATA_FILE_COUNT_POSITION, RANDOM.nextInt()); + stats.set(TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION, 1024L * RANDOM.nextInt(20)); + return stats; + } + + protected File tempDir(String folderName) throws IOException { + return java.nio.file.Files.createTempDirectory(temp.toPath(), folderName).toFile(); + } + + protected static StructLike partitionRecord( + Types.StructType partitionType, String val1, String val2) { + GenericRecord record = GenericRecord.create(partitionType); + record.set(0, val1); + record.set(1, val2); + return record; + } +} diff --git a/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java b/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java index 71fdc9507d58..9b93013a9b06 100644 --- a/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java +++ b/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java @@ -18,25 +18,12 @@ */ package org.apache.iceberg; -import static org.apache.iceberg.PartitionStatsHandler.DATA_FILE_COUNT; -import static org.apache.iceberg.PartitionStatsHandler.DATA_RECORD_COUNT; -import static org.apache.iceberg.PartitionStatsHandler.EQUALITY_DELETE_FILE_COUNT; -import static org.apache.iceberg.PartitionStatsHandler.EQUALITY_DELETE_RECORD_COUNT; -import static org.apache.iceberg.PartitionStatsHandler.LAST_UPDATED_AT; -import static org.apache.iceberg.PartitionStatsHandler.LAST_UPDATED_SNAPSHOT_ID; import static org.apache.iceberg.PartitionStatsHandler.PARTITION_FIELD_ID; -import static org.apache.iceberg.PartitionStatsHandler.PARTITION_FIELD_NAME; -import static org.apache.iceberg.PartitionStatsHandler.POSITION_DELETE_FILE_COUNT; -import static org.apache.iceberg.PartitionStatsHandler.POSITION_DELETE_RECORD_COUNT; -import static org.apache.iceberg.PartitionStatsHandler.SPEC_ID; -import 
static org.apache.iceberg.PartitionStatsHandler.TOTAL_DATA_FILE_SIZE_IN_BYTES; -import static org.apache.iceberg.PartitionStatsHandler.TOTAL_RECORD_COUNT; import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import java.io.File; import java.io.IOException; import java.math.BigDecimal; import java.nio.ByteBuffer; @@ -46,10 +33,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Random; import java.util.UUID; -import java.util.concurrent.ThreadLocalRandom; -import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.expressions.Literal; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -61,10 +45,9 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; -import org.junit.jupiter.api.io.TempDir; @ExtendWith(ParameterizedTestExtension.class) -public abstract class PartitionStatsHandlerTestBase { +public abstract class PartitionStatsHandlerTestBase extends PartitionStatisticsTestBase { public abstract FileFormat format(); @@ -75,35 +58,9 @@ protected static List formatVersions() { @Parameter protected int formatVersion; - private static final Schema SCHEMA = - new Schema( - optional(1, "c1", Types.IntegerType.get()), - optional(2, "c2", Types.StringType.get()), - optional(3, "c3", Types.StringType.get())); - - protected static final PartitionSpec SPEC = - PartitionSpec.builderFor(SCHEMA).identity("c2").identity("c3").build(); - - @TempDir public File temp; - - private static final Random RANDOM = ThreadLocalRandom.current(); - private final Map fileFormatProperty = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format().name()); - // position in StructLike - 
private static final int DATA_RECORD_COUNT_POSITION = 2; - private static final int DATA_FILE_COUNT_POSITION = 3; - private static final int TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION = 4; - private static final int POSITION_DELETE_RECORD_COUNT_POSITION = 5; - private static final int POSITION_DELETE_FILE_COUNT_POSITION = 6; - private static final int EQUALITY_DELETE_RECORD_COUNT_POSITION = 7; - private static final int EQUALITY_DELETE_FILE_COUNT_POSITION = 8; - private static final int TOTAL_RECORD_COUNT_POSITION = 9; - private static final int LAST_UPDATED_AT_POSITION = 10; - private static final int LAST_UPDATED_SNAPSHOT_ID_POSITION = 11; - private static final int DV_COUNT_POSITION = 12; - @Test public void testPartitionStatsOnEmptyTable() throws Exception { Table testTable = @@ -223,10 +180,7 @@ public void testAllDatatypePartitionWriting() throws Exception { partitionData.set(13, new BigDecimal("12345678901234567890.1234567890")); partitionData.set(14, Literal.of("10:10:10").to(Types.TimeType.get()).value()); - PartitionStats partitionStats = new PartitionStats(partitionData, RANDOM.nextInt(10)); - partitionStats.set(DATA_RECORD_COUNT_POSITION, RANDOM.nextLong()); - partitionStats.set(DATA_FILE_COUNT_POSITION, RANDOM.nextInt()); - partitionStats.set(TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION, 1024L * RANDOM.nextInt(20)); + PartitionStats partitionStats = randomStats(partitionData); List expected = Collections.singletonList(partitionStats); PartitionStatisticsFile statisticsFile = PartitionStatsHandler.writePartitionStatsFile(testTable, 42L, dataSchema, expected); @@ -262,14 +216,8 @@ public void testOptionalFieldsWriting() throws Exception { ImmutableList.Builder partitionListBuilder = ImmutableList.builder(); for (int i = 0; i < 5; i++) { - PartitionData partitionData = - new PartitionData(dataSchema.findField(PARTITION_FIELD_ID).type().asStructType()); - partitionData.set(0, RANDOM.nextInt()); - - PartitionStats stats = new PartitionStats(partitionData, 
RANDOM.nextInt(10)); - stats.set(DATA_RECORD_COUNT_POSITION, RANDOM.nextLong()); - stats.set(DATA_FILE_COUNT_POSITION, RANDOM.nextInt()); - stats.set(TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION, 1024L * RANDOM.nextInt(20)); + PartitionStats stats = + randomStats(dataSchema.findField(PARTITION_FIELD_ID).type().asStructType()); stats.set(POSITION_DELETE_RECORD_COUNT_POSITION, null); stats.set(POSITION_DELETE_FILE_COUNT_POSITION, null); stats.set(EQUALITY_DELETE_RECORD_COUNT_POSITION, null); @@ -315,8 +263,12 @@ public void testOptionalFieldsWriting() throws Exception { } } + /** + * @deprecated will be removed in 1.12.0 + */ @SuppressWarnings("checkstyle:MethodLength") @Test + @Deprecated public void testPartitionStats() throws Exception { Table testTable = TestTables.create( @@ -611,7 +563,11 @@ public void testLatestStatsFileWithBranch() throws Exception { assertThat(PartitionStatsHandler.latestStatsFile(testTable, snapshotBranchBId)).isNull(); } + /** + * @deprecated will be removed in 1.12.0 + */ @Test + @Deprecated public void testReadingStatsWithInvalidSchema() throws Exception { PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").build(); Table testTable = @@ -677,7 +633,11 @@ public void testFullComputeFallbackWithInvalidStats() throws Exception { assertThat(partitionStats.get(0).dataFileCount()).isEqualTo(2); } + /** + * @deprecated will be removed in 1.12.0 + */ @Test + @Deprecated public void testV2toV3SchemaEvolution() throws Exception { Table testTable = TestTables.create( @@ -718,14 +678,6 @@ public void testV2toV3SchemaEvolution() throws Exception { } } - private static StructLike partitionRecord( - Types.StructType partitionType, String val1, String val2) { - GenericRecord record = GenericRecord.create(partitionType); - record.set(0, val1); - record.set(1, val2); - return record; - } - private static void computeAndValidatePartitionStats( Table testTable, Schema recordSchema, Tuple... 
expectedValues) throws IOException { // compute and commit partition stats file @@ -760,38 +712,6 @@ private static void computeAndValidatePartitionStats( .containsExactlyInAnyOrder(expectedValues); } - private File tempDir(String folderName) throws IOException { - return java.nio.file.Files.createTempDirectory(temp.toPath(), folderName).toFile(); - } - - private Schema invalidOldSchema(Types.StructType unifiedPartitionType) { - // field ids starts from 0 instead of 1 - return new Schema( - Types.NestedField.required(0, PARTITION_FIELD_NAME, unifiedPartitionType), - Types.NestedField.required(1, SPEC_ID.name(), Types.IntegerType.get()), - Types.NestedField.required(2, DATA_RECORD_COUNT.name(), Types.LongType.get()), - Types.NestedField.required(3, DATA_FILE_COUNT.name(), Types.IntegerType.get()), - Types.NestedField.required(4, TOTAL_DATA_FILE_SIZE_IN_BYTES.name(), Types.LongType.get()), - Types.NestedField.optional(5, POSITION_DELETE_RECORD_COUNT.name(), Types.LongType.get()), - Types.NestedField.optional(6, POSITION_DELETE_FILE_COUNT.name(), Types.IntegerType.get()), - Types.NestedField.optional(7, EQUALITY_DELETE_RECORD_COUNT.name(), Types.LongType.get()), - Types.NestedField.optional(8, EQUALITY_DELETE_FILE_COUNT.name(), Types.IntegerType.get()), - Types.NestedField.optional(9, TOTAL_RECORD_COUNT.name(), Types.LongType.get()), - Types.NestedField.optional(10, LAST_UPDATED_AT.name(), Types.LongType.get()), - Types.NestedField.optional(11, LAST_UPDATED_SNAPSHOT_ID.name(), Types.LongType.get())); - } - - private PartitionStats randomStats(Types.StructType partitionType) { - PartitionData partitionData = new PartitionData(partitionType); - partitionData.set(0, RANDOM.nextInt()); - - PartitionStats stats = new PartitionStats(partitionData, RANDOM.nextInt(10)); - stats.set(DATA_RECORD_COUNT_POSITION, RANDOM.nextLong()); - stats.set(DATA_FILE_COUNT_POSITION, RANDOM.nextInt()); - stats.set(TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION, 1024L * RANDOM.nextInt(20)); - return 
stats; - } - @SuppressWarnings("checkstyle:CyclomaticComplexity") private static boolean isEqual( Comparator partitionComparator, PartitionStats stats1, PartitionStats stats2) { diff --git a/core/src/test/java/org/apache/iceberg/avro/TestAvroPartitionStatisticsScan.java b/core/src/test/java/org/apache/iceberg/avro/TestAvroPartitionStatisticsScan.java new file mode 100644 index 000000000000..54e03180cedb --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/avro/TestAvroPartitionStatisticsScan.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.avro; + +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionStatisticsScanTestBase; + +public class TestAvroPartitionStatisticsScan extends PartitionStatisticsScanTestBase { + + public FileFormat format() { + return FileFormat.AVRO; + } +} diff --git a/orc/src/test/java/org/apache/iceberg/orc/TestOrcPartitionStatisticsScan.java b/orc/src/test/java/org/apache/iceberg/orc/TestOrcPartitionStatisticsScan.java new file mode 100644 index 000000000000..2040f046ee5d --- /dev/null +++ b/orc/src/test/java/org/apache/iceberg/orc/TestOrcPartitionStatisticsScan.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.orc; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionStatisticsScanTestBase; + +public class TestOrcPartitionStatisticsScan extends PartitionStatisticsScanTestBase { + @Override + public FileFormat format() { + return FileFormat.ORC; + } + + @Override + public void testScanPartitionStatsForCurrentSnapshot() throws Exception { + assertThatThrownBy(super::testScanPartitionStatsForCurrentSnapshot) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot write using unregistered internal data format: ORC"); + } + + @Override + public void testScanPartitionStatsForOlderSnapshot() throws Exception { + assertThatThrownBy(super::testScanPartitionStatsForOlderSnapshot) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot write using unregistered internal data format: ORC"); + } + + @Override + public void testReadingStatsWithInvalidSchema() throws Exception { + assertThatThrownBy(super::testReadingStatsWithInvalidSchema) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot write using unregistered internal data format: ORC"); + } + + @Override + public void testV2toV3SchemaEvolution() throws Exception { + assertThatThrownBy(super::testV2toV3SchemaEvolution) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot write using unregistered internal data format: ORC"); + } +} diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetPartitionStatisticsScan.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetPartitionStatisticsScan.java new file mode 100644 index 000000000000..5152e31b28e5 --- /dev/null +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetPartitionStatisticsScan.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.parquet; + +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionStatisticsScanTestBase; + +public class TestParquetPartitionStatisticsScan extends PartitionStatisticsScanTestBase { + + @Override + public FileFormat format() { + return FileFormat.PARQUET; + } +}