Skip to content

Commit e276791

Browse files
Add cleanup support for partition-level statistics files when DROP TABLE PURGE (#1508)
* cleaning up partition stats * update partition stat file extension * update test partition stat write impl
1 parent da23e66 commit e276791

File tree

4 files changed

+93
-25
lines changed

4 files changed

+93
-25
lines changed

quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.concurrent.atomic.AtomicInteger;
3636
import java.util.stream.Stream;
3737
import org.apache.iceberg.ManifestFile;
38+
import org.apache.iceberg.PartitionStatisticsFile;
3839
import org.apache.iceberg.Snapshot;
3940
import org.apache.iceberg.StatisticsFile;
4041
import org.apache.iceberg.TableMetadata;
@@ -124,10 +125,19 @@ public void close() {
124125
snapshot.sequenceNumber(),
125126
"/metadata/" + UUID.randomUUID() + ".stats",
126127
fileIO);
128+
PartitionStatisticsFile partitionStatisticsFile1 =
129+
TaskTestUtils.writePartitionStatsFile(
130+
snapshot.snapshotId(),
131+
"/metadata/" + "partition-stats-" + UUID.randomUUID() + ".parquet",
132+
fileIO);
127133
String firstMetadataFile = "v1-295495059.metadata.json";
128134
TableMetadata firstMetadata =
129135
TaskTestUtils.writeTableMetadata(
130-
fileIO, firstMetadataFile, List.of(statisticsFile1), snapshot);
136+
fileIO,
137+
firstMetadataFile,
138+
List.of(statisticsFile1),
139+
List.of(partitionStatisticsFile1),
140+
snapshot);
131141
assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue();
132142

133143
ManifestFile manifestFile3 =
@@ -148,6 +158,11 @@ public void close() {
148158
snapshot2.sequenceNumber(),
149159
"/metadata/" + UUID.randomUUID() + ".stats",
150160
fileIO);
161+
PartitionStatisticsFile partitionStatisticsFile2 =
162+
TaskTestUtils.writePartitionStatsFile(
163+
snapshot2.snapshotId(),
164+
"/metadata/" + "partition-stats-" + UUID.randomUUID() + ".parquet",
165+
fileIO);
151166
String secondMetadataFile = "v1-295495060.metadata.json";
152167
TableMetadata secondMetadata =
153168
TaskTestUtils.writeTableMetadata(
@@ -156,18 +171,19 @@ public void close() {
156171
firstMetadata,
157172
firstMetadataFile,
158173
List.of(statisticsFile2),
174+
List.of(partitionStatisticsFile2),
159175
snapshot2);
160176
assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue();
161177
assertThat(TaskUtils.exists(secondMetadataFile, fileIO)).isTrue();
162178

163179
List<String> cleanupFiles =
164-
Stream.concat(
165-
secondMetadata.previousFiles().stream()
166-
.map(TableMetadata.MetadataLogEntry::file)
167-
.filter(file -> TaskUtils.exists(file, fileIO)),
168-
secondMetadata.statisticsFiles().stream()
169-
.map(StatisticsFile::path)
170-
.filter(file -> TaskUtils.exists(file, fileIO)))
180+
Stream.of(
181+
secondMetadata.previousFiles().stream().map(TableMetadata.MetadataLogEntry::file),
182+
secondMetadata.statisticsFiles().stream().map(StatisticsFile::path),
183+
secondMetadata.partitionStatisticsFiles().stream()
184+
.map(PartitionStatisticsFile::path))
185+
.flatMap(s -> s)
186+
.filter(file -> TaskUtils.exists(file, fileIO))
171187
.toList();
172188

173189
TaskEntity task =
@@ -183,12 +199,9 @@ public void close() {
183199
assertThatPredicate(handler::canHandleTask).accepts(task);
184200
assertThat(handler.handleTask(task, callCtx)).isTrue();
185201

186-
assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO))
187-
.rejects(firstMetadataFile);
188-
assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO))
189-
.rejects(statisticsFile1.path());
190-
assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO))
191-
.rejects(statisticsFile2.path());
202+
for (String cleanupFile : cleanupFiles) {
203+
assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO)).rejects(cleanupFile);
204+
}
192205
}
193206
}
194207

quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.commons.codec.binary.Base64;
3434
import org.apache.iceberg.ManifestFile;
3535
import org.apache.iceberg.ManifestFiles;
36+
import org.apache.iceberg.PartitionStatisticsFile;
3637
import org.apache.iceberg.Snapshot;
3738
import org.apache.iceberg.StatisticsFile;
3839
import org.apache.iceberg.TableMetadata;
@@ -519,10 +520,19 @@ public void testTableCleanupMultipleMetadata() throws IOException {
519520
snapshot.sequenceNumber(),
520521
"/metadata/" + UUID.randomUUID() + ".stats",
521522
fileIO);
523+
PartitionStatisticsFile partitionStatisticsFile1 =
524+
TaskTestUtils.writePartitionStatsFile(
525+
snapshot.snapshotId(),
526+
"/metadata/" + "partition-stats-" + UUID.randomUUID() + ".parquet",
527+
fileIO);
522528
String firstMetadataFile = "v1-295495059.metadata.json";
523529
TableMetadata firstMetadata =
524530
TaskTestUtils.writeTableMetadata(
525-
fileIO, firstMetadataFile, List.of(statisticsFile1), snapshot);
531+
fileIO,
532+
firstMetadataFile,
533+
List.of(statisticsFile1),
534+
List.of(partitionStatisticsFile1),
535+
snapshot);
526536
assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue();
527537

528538
ManifestFile manifestFile3 =
@@ -543,13 +553,19 @@ public void testTableCleanupMultipleMetadata() throws IOException {
543553
snapshot2.sequenceNumber(),
544554
"/metadata/" + UUID.randomUUID() + ".stats",
545555
fileIO);
556+
PartitionStatisticsFile partitionStatisticsFile2 =
557+
TaskTestUtils.writePartitionStatsFile(
558+
snapshot2.snapshotId(),
559+
"/metadata/" + "partition-stats-" + UUID.randomUUID() + ".parquet",
560+
fileIO);
546561
String secondMetadataFile = "v1-295495060.metadata.json";
547562
TaskTestUtils.writeTableMetadata(
548563
fileIO,
549564
secondMetadataFile,
550565
firstMetadata,
551566
firstMetadataFile,
552567
List.of(statisticsFile2),
568+
List.of(partitionStatisticsFile2),
553569
snapshot2);
554570
assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue();
555571
assertThat(TaskUtils.exists(secondMetadataFile, fileIO)).isTrue();
@@ -609,7 +625,9 @@ public void testTableCleanupMultipleMetadata() throws IOException {
609625
snapshot.manifestListLocation(),
610626
snapshot2.manifestListLocation(),
611627
statisticsFile1.path(),
612-
statisticsFile2.path())),
628+
statisticsFile2.path(),
629+
partitionStatisticsFile1.path(),
630+
partitionStatisticsFile2.path())),
613631
entity ->
614632
entity.readData(
615633
BatchFileCleanupTaskHandler.BatchFileCleanupTask.class)));

quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TaskTestUtils.java

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import jakarta.annotation.Nonnull;
2222
import java.io.IOException;
23+
import java.io.UncheckedIOException;
2324
import java.nio.ByteBuffer;
2425
import java.nio.charset.StandardCharsets;
2526
import java.util.Arrays;
@@ -30,10 +31,12 @@
3031
import org.apache.iceberg.FileFormat;
3132
import org.apache.iceberg.GenericBlobMetadata;
3233
import org.apache.iceberg.GenericStatisticsFile;
34+
import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile;
3335
import org.apache.iceberg.ManifestFile;
3436
import org.apache.iceberg.ManifestFiles;
3537
import org.apache.iceberg.ManifestWriter;
3638
import org.apache.iceberg.PartitionSpec;
39+
import org.apache.iceberg.PartitionStatisticsFile;
3740
import org.apache.iceberg.Schema;
3841
import org.apache.iceberg.Snapshot;
3942
import org.apache.iceberg.SortOrder;
@@ -71,7 +74,7 @@ static ManifestFile manifestFile(
7174

7275
static TableMetadata writeTableMetadata(FileIO fileIO, String metadataFile, Snapshot... snapshots)
7376
throws IOException {
74-
return writeTableMetadata(fileIO, metadataFile, null, null, null, snapshots);
77+
return writeTableMetadata(fileIO, metadataFile, null, null, null, null, snapshots);
7578
}
7679

7780
static TableMetadata writeTableMetadata(
@@ -80,7 +83,18 @@ static TableMetadata writeTableMetadata(
8083
List<StatisticsFile> statisticsFiles,
8184
Snapshot... snapshots)
8285
throws IOException {
83-
return writeTableMetadata(fileIO, metadataFile, null, null, statisticsFiles, snapshots);
86+
return writeTableMetadata(fileIO, metadataFile, null, null, statisticsFiles, null, snapshots);
87+
}
88+
89+
static TableMetadata writeTableMetadata(
90+
FileIO fileIO,
91+
String metadataFile,
92+
List<StatisticsFile> statisticsFiles,
93+
List<PartitionStatisticsFile> partitionStatsFiles,
94+
Snapshot... snapshots)
95+
throws IOException {
96+
return writeTableMetadata(
97+
fileIO, metadataFile, null, null, statisticsFiles, partitionStatsFiles, snapshots);
8498
}
8599

86100
static TableMetadata writeTableMetadata(
@@ -89,6 +103,7 @@ static TableMetadata writeTableMetadata(
89103
TableMetadata prevMetadata,
90104
String prevMetadataFile,
91105
List<StatisticsFile> statisticsFiles,
106+
List<PartitionStatisticsFile> partitionStatsFiles,
92107
Snapshot... snapshots)
93108
throws IOException {
94109
TableMetadata.Builder tmBuilder;
@@ -106,11 +121,15 @@ static TableMetadata writeTableMetadata(
106121
.addPartitionSpec(PartitionSpec.unpartitioned());
107122

108123
int statisticsFileIndex = 0;
124+
int partitionStatsFileIndex = 0;
109125
for (Snapshot snapshot : snapshots) {
110126
tmBuilder.addSnapshot(snapshot);
111127
if (statisticsFiles != null) {
112128
tmBuilder.setStatistics(statisticsFiles.get(statisticsFileIndex++));
113129
}
130+
if (partitionStatsFiles != null) {
131+
tmBuilder.setPartitionStatistics(partitionStatsFiles.get(partitionStatsFileIndex++));
132+
}
114133
}
115134
TableMetadata tableMetadata = tmBuilder.build();
116135
PositionOutputStream out = fileIO.newOutputFile(metadataFile).createOrOverwrite();
@@ -161,4 +180,21 @@ public static StatisticsFile writeStatsFile(
161180
puffinWriter.writtenBlobsMetadata().stream().map(GenericBlobMetadata::from).toList());
162181
}
163182
}
183+
184+
public static PartitionStatisticsFile writePartitionStatsFile(
185+
long snapshotId, String statsLocation, FileIO fileIO) throws UncheckedIOException {
186+
PositionOutputStream positionOutputStream;
187+
try {
188+
positionOutputStream = fileIO.newOutputFile(statsLocation).create();
189+
positionOutputStream.close();
190+
} catch (IOException e) {
191+
throw new UncheckedIOException(e);
192+
}
193+
194+
return ImmutableGenericPartitionStatisticsFile.builder()
195+
.snapshotId(snapshotId)
196+
.path(statsLocation)
197+
.fileSizeInBytes(42L)
198+
.build();
199+
}
164200
}

service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.stream.Collectors;
2626
import java.util.stream.Stream;
2727
import org.apache.iceberg.ManifestFile;
28+
import org.apache.iceberg.PartitionStatisticsFile;
2829
import org.apache.iceberg.Snapshot;
2930
import org.apache.iceberg.StatisticsFile;
3031
import org.apache.iceberg.TableMetadata;
@@ -112,7 +113,6 @@ public boolean handleTask(TaskEntity cleanupTask, CallContext callContext) {
112113
metaStoreManager,
113114
polarisCallContext);
114115

115-
// TODO: handle partition statistics files
116116
Stream<TaskEntity> metadataFileCleanupTasks =
117117
getMetadataTaskStream(
118118
cleanupTask,
@@ -243,12 +243,13 @@ private Stream<TaskEntity> getMetadataTaskStream(
243243
private List<List<String>> getMetadataFileBatches(TableMetadata tableMetadata, int batchSize) {
244244
List<List<String>> result = new ArrayList<>();
245245
List<String> metadataFiles =
246-
Stream.concat(
247-
Stream.concat(
248-
tableMetadata.previousFiles().stream()
249-
.map(TableMetadata.MetadataLogEntry::file),
250-
tableMetadata.snapshots().stream().map(Snapshot::manifestListLocation)),
251-
tableMetadata.statisticsFiles().stream().map(StatisticsFile::path))
246+
Stream.of(
247+
tableMetadata.previousFiles().stream().map(TableMetadata.MetadataLogEntry::file),
248+
tableMetadata.snapshots().stream().map(Snapshot::manifestListLocation),
249+
tableMetadata.statisticsFiles().stream().map(StatisticsFile::path),
250+
tableMetadata.partitionStatisticsFiles().stream()
251+
.map(PartitionStatisticsFile::path))
252+
.flatMap(s -> s)
252253
.toList();
253254

254255
for (int i = 0; i < metadataFiles.size(); i += batchSize) {

0 commit comments

Comments
 (0)