diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java index b55ffbb9e6..662f88bb05 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java @@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.PartitionStatisticsFile; import org.apache.iceberg.Snapshot; import org.apache.iceberg.StatisticsFile; import org.apache.iceberg.TableMetadata; @@ -124,10 +125,19 @@ public void close() { snapshot.sequenceNumber(), "/metadata/" + UUID.randomUUID() + ".stats", fileIO); + PartitionStatisticsFile partitionStatisticsFile1 = + TaskTestUtils.writePartitionStatsFile( + snapshot.snapshotId(), + "/metadata/" + "partition-stats-" + UUID.randomUUID() + ".parquet", + fileIO); String firstMetadataFile = "v1-295495059.metadata.json"; TableMetadata firstMetadata = TaskTestUtils.writeTableMetadata( - fileIO, firstMetadataFile, List.of(statisticsFile1), snapshot); + fileIO, + firstMetadataFile, + List.of(statisticsFile1), + List.of(partitionStatisticsFile1), + snapshot); assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue(); ManifestFile manifestFile3 = @@ -148,6 +158,11 @@ public void close() { snapshot2.sequenceNumber(), "/metadata/" + UUID.randomUUID() + ".stats", fileIO); + PartitionStatisticsFile partitionStatisticsFile2 = + TaskTestUtils.writePartitionStatsFile( + snapshot2.snapshotId(), + "/metadata/" + "partition-stats-" + UUID.randomUUID() + ".parquet", + fileIO); String secondMetadataFile = "v1-295495060.metadata.json"; TableMetadata secondMetadata = TaskTestUtils.writeTableMetadata( @@ -156,18 +171,19 @@ public void close() { firstMetadata, firstMetadataFile, List.of(statisticsFile2), + List.of(partitionStatisticsFile2), snapshot2); assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue(); assertThat(TaskUtils.exists(secondMetadataFile, fileIO)).isTrue(); List cleanupFiles = - Stream.concat( - secondMetadata.previousFiles().stream() - .map(TableMetadata.MetadataLogEntry::file) - .filter(file -> TaskUtils.exists(file, fileIO)), - secondMetadata.statisticsFiles().stream() - .map(StatisticsFile::path) - .filter(file -> TaskUtils.exists(file, fileIO))) + Stream.of( + secondMetadata.previousFiles().stream().map(TableMetadata.MetadataLogEntry::file), + secondMetadata.statisticsFiles().stream().map(StatisticsFile::path), + secondMetadata.partitionStatisticsFiles().stream() + .map(PartitionStatisticsFile::path)) + .flatMap(s -> s) + .filter(file -> TaskUtils.exists(file, fileIO)) .toList(); TaskEntity task = @@ -183,12 +199,9 @@ public void close() { assertThatPredicate(handler::canHandleTask).accepts(task); assertThat(handler.handleTask(task, callCtx)).isTrue(); - assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO)) - .rejects(firstMetadataFile); - assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO)) - .rejects(statisticsFile1.path()); - assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO)) - .rejects(statisticsFile2.path()); + for (String cleanupFile : cleanupFiles) { + assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO)).rejects(cleanupFile); + } } } diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java index 33e96e2a16..a237aebb3f 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java @@ -32,6 +32,7 @@ import org.apache.commons.codec.binary.Base64; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.ManifestFiles; +import org.apache.iceberg.PartitionStatisticsFile; import org.apache.iceberg.Snapshot; import org.apache.iceberg.StatisticsFile; import org.apache.iceberg.TableMetadata; @@ -506,10 +507,19 @@ public void testTableCleanupMultipleMetadata() throws IOException { snapshot.sequenceNumber(), "/metadata/" + UUID.randomUUID() + ".stats", fileIO); + PartitionStatisticsFile partitionStatisticsFile1 = + TaskTestUtils.writePartitionStatsFile( + snapshot.snapshotId(), + "/metadata/" + "partition-stats-" + UUID.randomUUID() + ".parquet", + fileIO); String firstMetadataFile = "v1-295495059.metadata.json"; TableMetadata firstMetadata = TaskTestUtils.writeTableMetadata( - fileIO, firstMetadataFile, List.of(statisticsFile1), snapshot); + fileIO, + firstMetadataFile, + List.of(statisticsFile1), + List.of(partitionStatisticsFile1), + snapshot); assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue(); ManifestFile manifestFile3 = @@ -530,6 +540,11 @@ public void testTableCleanupMultipleMetadata() throws IOException { snapshot2.sequenceNumber(), "/metadata/" + UUID.randomUUID() + ".stats", fileIO); + PartitionStatisticsFile partitionStatisticsFile2 = + TaskTestUtils.writePartitionStatsFile( + snapshot2.snapshotId(), + "/metadata/" + "partition-stats-" + UUID.randomUUID() + ".parquet", + fileIO); String secondMetadataFile = "v1-295495060.metadata.json"; TaskTestUtils.writeTableMetadata( fileIO, @@ -537,6 +552,7 @@ public void testTableCleanupMultipleMetadata() throws IOException { firstMetadata, firstMetadataFile, List.of(statisticsFile2), + List.of(partitionStatisticsFile2), snapshot2); assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue(); assertThat(TaskUtils.exists(secondMetadataFile, fileIO)).isTrue(); @@ -596,7 +612,9 @@ public void testTableCleanupMultipleMetadata() throws IOException { snapshot.manifestListLocation(), snapshot2.manifestListLocation(), statisticsFile1.path(), - statisticsFile2.path())), + statisticsFile2.path(), + partitionStatisticsFile1.path(), + partitionStatisticsFile2.path())), entity -> entity.readData( BatchFileCleanupTaskHandler.BatchFileCleanupTask.class))); diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TaskTestUtils.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TaskTestUtils.java index 83f32f9b3b..a4a00060c7 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TaskTestUtils.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TaskTestUtils.java @@ -20,6 +20,7 @@ import jakarta.annotation.Nonnull; import java.io.IOException; +import java.io.UncheckedIOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Arrays; @@ -30,10 +31,12 @@ import org.apache.iceberg.FileFormat; import org.apache.iceberg.GenericBlobMetadata; import org.apache.iceberg.GenericStatisticsFile; +import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.ManifestFiles; import org.apache.iceberg.ManifestWriter; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PartitionStatisticsFile; import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; import org.apache.iceberg.SortOrder; @@ -71,7 +74,7 @@ static ManifestFile manifestFile( static TableMetadata writeTableMetadata(FileIO fileIO, String metadataFile, Snapshot... snapshots) throws IOException { - return writeTableMetadata(fileIO, metadataFile, null, null, null, snapshots); + return writeTableMetadata(fileIO, metadataFile, null, null, null, null, snapshots); } static TableMetadata writeTableMetadata( @@ -80,7 +83,18 @@ static TableMetadata writeTableMetadata( List statisticsFiles, Snapshot... snapshots) throws IOException { - return writeTableMetadata(fileIO, metadataFile, null, null, statisticsFiles, snapshots); + return writeTableMetadata(fileIO, metadataFile, null, null, statisticsFiles, null, snapshots); + } + + static TableMetadata writeTableMetadata( + FileIO fileIO, + String metadataFile, + List statisticsFiles, + List partitionStatsFiles, + Snapshot... snapshots) + throws IOException { + return writeTableMetadata( + fileIO, metadataFile, null, null, statisticsFiles, partitionStatsFiles, snapshots); } static TableMetadata writeTableMetadata( @@ -89,6 +103,7 @@ static TableMetadata writeTableMetadata( TableMetadata prevMetadata, String prevMetadataFile, List statisticsFiles, + List partitionStatsFiles, Snapshot... snapshots) throws IOException { TableMetadata.Builder tmBuilder; @@ -106,11 +121,15 @@ static TableMetadata writeTableMetadata( .addPartitionSpec(PartitionSpec.unpartitioned()); int statisticsFileIndex = 0; + int partitionStatsFileIndex = 0; for (Snapshot snapshot : snapshots) { tmBuilder.addSnapshot(snapshot); if (statisticsFiles != null) { tmBuilder.setStatistics(statisticsFiles.get(statisticsFileIndex++)); } + if (partitionStatsFiles != null) { + tmBuilder.setPartitionStatistics(partitionStatsFiles.get(partitionStatsFileIndex++)); + } } TableMetadata tableMetadata = tmBuilder.build(); PositionOutputStream out = fileIO.newOutputFile(metadataFile).createOrOverwrite(); @@ -161,4 +180,21 @@ public static StatisticsFile writeStatsFile( puffinWriter.writtenBlobsMetadata().stream().map(GenericBlobMetadata::from).toList()); } } + + public static PartitionStatisticsFile writePartitionStatsFile( + long snapshotId, String statsLocation, FileIO fileIO) throws UncheckedIOException { + PositionOutputStream positionOutputStream; + try { + positionOutputStream = fileIO.newOutputFile(statsLocation).create(); + positionOutputStream.close(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + + return ImmutableGenericPartitionStatisticsFile.builder() + .snapshotId(snapshotId) + .path(statsLocation) + .fileSizeInBytes(42L) + .build(); + } } diff --git a/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java b/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java index f9f1c2f35f..ff791bf188 100644 --- a/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java +++ b/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java @@ -25,6 +25,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.PartitionStatisticsFile; import org.apache.iceberg.Snapshot; import org.apache.iceberg.StatisticsFile; import org.apache.iceberg.TableMetadata; @@ -112,7 +113,6 @@ public boolean handleTask(TaskEntity cleanupTask, CallContext callContext) { metaStoreManager, polarisCallContext); - // TODO: handle partition statistics files Stream metadataFileCleanupTasks = getMetadataTaskStream( cleanupTask, @@ -243,12 +243,13 @@ private Stream getMetadataTaskStream( private List> getMetadataFileBatches(TableMetadata tableMetadata, int batchSize) { List> result = new ArrayList<>(); List metadataFiles = - Stream.concat( - Stream.concat( - tableMetadata.previousFiles().stream() - .map(TableMetadata.MetadataLogEntry::file), - tableMetadata.snapshots().stream().map(Snapshot::manifestListLocation)), - tableMetadata.statisticsFiles().stream().map(StatisticsFile::path)) + Stream.of( + tableMetadata.previousFiles().stream().map(TableMetadata.MetadataLogEntry::file), + tableMetadata.snapshots().stream().map(Snapshot::manifestListLocation), + tableMetadata.statisticsFiles().stream().map(StatisticsFile::path), + tableMetadata.partitionStatisticsFiles().stream() + .map(PartitionStatisticsFile::path)) + .flatMap(s -> s) .toList(); for (int i = 0; i < metadataFiles.size(); i += batchSize) {