Skip to content

Commit a1bd63d

Browse files
authored
Core: Add validation for row-level deletes with rewrites (#2865)
1 parent e6c5a94 commit a1bd63d

File tree

10 files changed

+211
-73
lines changed

10 files changed

+211
-73
lines changed

api/src/main/java/org/apache/iceberg/RewriteFiles.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,23 @@ default RewriteFiles rewriteFiles(Set<DataFile> filesToDelete, Set<DataFile> fil
5454
/**
5555
* Add a rewrite that replaces one set of files with another set that contains the same data.
5656
*
57-
* @param dataFilesToDelete data files that will be replaced (deleted).
58-
* @param deleteFilesToDelete delete files that will be replaced (deleted).
57+
* @param dataFilesToReplace data files that will be replaced (deleted).
58+
* @param deleteFilesToReplace delete files that will be replaced (deleted).
5959
* @param dataFilesToAdd data files that will be added.
6060
* @param deleteFilesToAdd delete files that will be added.
6161
* @return this for method chaining.
6262
*/
63-
RewriteFiles rewriteFiles(Set<DataFile> dataFilesToDelete, Set<DeleteFile> deleteFilesToDelete,
63+
RewriteFiles rewriteFiles(Set<DataFile> dataFilesToReplace, Set<DeleteFile> deleteFilesToReplace,
6464
Set<DataFile> dataFilesToAdd, Set<DeleteFile> deleteFilesToAdd);
65+
66+
/**
67+
* Set the snapshot ID used in any reads for this operation.
68+
* <p>
69+
* Validations will check changes after this snapshot ID. If this is not called, all ancestor snapshots through the
70+
* table's initial snapshot are validated.
71+
*
72+
* @param snapshotId a snapshot ID
73+
* @return this for method chaining
74+
*/
75+
RewriteFiles validateFromSnapshot(long snapshotId);
6576
}

core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,12 @@
2121

2222
import java.util.Set;
2323
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
24+
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
2425

2526
class BaseRewriteFiles extends MergingSnapshotProducer<RewriteFiles> implements RewriteFiles {
27+
private final Set<DataFile> replacedDataFiles = Sets.newHashSet();
28+
private Long startingSnapshotId = null;
29+
2630
BaseRewriteFiles(String tableName, TableOperations ops) {
2731
super(tableName, ops);
2832

@@ -63,15 +67,16 @@ private void verifyInputAndOutputFiles(Set<DataFile> dataFilesToDelete, Set<Dele
6367
}
6468

6569
@Override
66-
public RewriteFiles rewriteFiles(Set<DataFile> dataFilesToDelete, Set<DeleteFile> deleteFilesToDelete,
70+
public RewriteFiles rewriteFiles(Set<DataFile> dataFilesToReplace, Set<DeleteFile> deleteFilesToReplace,
6771
Set<DataFile> dataFilesToAdd, Set<DeleteFile> deleteFilesToAdd) {
68-
verifyInputAndOutputFiles(dataFilesToDelete, deleteFilesToDelete, dataFilesToAdd, deleteFilesToAdd);
72+
verifyInputAndOutputFiles(dataFilesToReplace, deleteFilesToReplace, dataFilesToAdd, deleteFilesToAdd);
73+
replacedDataFiles.addAll(dataFilesToReplace);
6974

70-
for (DataFile dataFile : dataFilesToDelete) {
75+
for (DataFile dataFile : dataFilesToReplace) {
7176
delete(dataFile);
7277
}
7378

74-
for (DeleteFile deleteFile : deleteFilesToDelete) {
79+
for (DeleteFile deleteFile : deleteFilesToReplace) {
7580
delete(deleteFile);
7681
}
7782

@@ -85,4 +90,18 @@ public RewriteFiles rewriteFiles(Set<DataFile> dataFilesToDelete, Set<DeleteFile
8590

8691
return this;
8792
}
93+
94+
@Override
95+
public RewriteFiles validateFromSnapshot(long snapshotId) {
96+
this.startingSnapshotId = snapshotId;
97+
return this;
98+
}
99+
100+
@Override
101+
protected void validate(TableMetadata base) {
102+
if (replacedDataFiles.size() > 0) {
103+
// if there are replaced data files, there cannot be any new row-level deletes for those data files
104+
validateNoNewDeletesForDataFiles(base, startingSnapshotId, replacedDataFiles);
105+
}
106+
}
88107
}

core/src/main/java/org/apache/iceberg/BaseRowDelta.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ protected void validate(TableMetadata base) {
9494
validateDataFilesExist(base, startingSnapshotId, referencedDataFiles, !validateDeletes);
9595
}
9696

97+
// TODO: does this need to check new delete files?
9798
if (conflictDetectionFilter != null) {
9899
validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter, caseSensitive);
99100
}

core/src/main/java/org/apache/iceberg/DeleteFileIndex.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,7 @@ static Builder builderFor(FileIO io, Iterable<ManifestFile> deleteManifests) {
312312
static class Builder {
313313
private final FileIO io;
314314
private final Set<ManifestFile> deleteManifests;
315+
private long minSequenceNumber = 0L;
315316
private Map<Integer, PartitionSpec> specsById = null;
316317
private Expression dataFilter = Expressions.alwaysTrue();
317318
private Expression partitionFilter = Expressions.alwaysTrue();
@@ -323,6 +324,11 @@ static class Builder {
323324
this.deleteManifests = Sets.newHashSet(deleteManifests);
324325
}
325326

327+
Builder afterSequenceNumber(long seq) {
328+
this.minSequenceNumber = seq;
329+
return this;
330+
}
331+
326332
Builder specsById(Map<Integer, PartitionSpec> newSpecsById) {
327333
this.specsById = newSpecsById;
328334
return this;
@@ -357,8 +363,10 @@ DeleteFileIndex build() {
357363
.run(deleteFile -> {
358364
try (CloseableIterable<ManifestEntry<DeleteFile>> reader = deleteFile) {
359365
for (ManifestEntry<DeleteFile> entry : reader) {
360-
// copy with stats for better filtering against data file stats
361-
deleteEntries.add(entry.copy());
366+
if (entry.sequenceNumber() > minSequenceNumber) {
367+
// copy with stats for better filtering against data file stats
368+
deleteEntries.add(entry.copy());
369+
}
362370
}
363371
} catch (IOException e) {
364372
throw new RuntimeIOException(e, "Failed to close");

core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java

Lines changed: 78 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
4242
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
4343
import org.apache.iceberg.util.CharSequenceSet;
44+
import org.apache.iceberg.util.Pair;
4445
import org.slf4j.Logger;
4546
import org.slf4j.LoggerFactory;
4647

@@ -62,6 +63,9 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
6263
ImmutableSet.of(DataOperations.OVERWRITE, DataOperations.REPLACE, DataOperations.DELETE);
6364
private static final Set<String> VALIDATE_DATA_FILES_EXIST_SKIP_DELETE_OPERATIONS =
6465
ImmutableSet.of(DataOperations.OVERWRITE, DataOperations.REPLACE);
66+
// delete files can be added in "overwrite" or "delete" operations
67+
private static final Set<String> VALIDATE_REPLACED_DATA_FILES_OPERATIONS =
68+
ImmutableSet.of(DataOperations.OVERWRITE, DataOperations.DELETE);
6569

6670
private final String tableName;
6771
private final TableOperations ops;
@@ -253,28 +257,10 @@ protected void validateAddedDataFiles(TableMetadata base, Long startingSnapshotI
253257
return;
254258
}
255259

256-
List<ManifestFile> manifests = Lists.newArrayList();
257-
Set<Long> newSnapshots = Sets.newHashSet();
258-
259-
Long currentSnapshotId = base.currentSnapshot().snapshotId();
260-
while (currentSnapshotId != null && !currentSnapshotId.equals(startingSnapshotId)) {
261-
Snapshot currentSnapshot = ops.current().snapshot(currentSnapshotId);
262-
263-
ValidationException.check(currentSnapshot != null,
264-
"Cannot determine history between starting snapshot %s and current %s",
265-
startingSnapshotId, currentSnapshotId);
266-
267-
if (VALIDATE_ADDED_FILES_OPERATIONS.contains(currentSnapshot.operation())) {
268-
newSnapshots.add(currentSnapshotId);
269-
for (ManifestFile manifest : currentSnapshot.dataManifests()) {
270-
if (manifest.snapshotId() == (long) currentSnapshotId) {
271-
manifests.add(manifest);
272-
}
273-
}
274-
}
275-
276-
currentSnapshotId = currentSnapshot.parentId();
277-
}
260+
Pair<List<ManifestFile>, Set<Long>> history =
261+
validationHistory(base, startingSnapshotId, VALIDATE_ADDED_FILES_OPERATIONS, ManifestContent.DATA);
262+
List<ManifestFile> manifests = history.first();
263+
Set<Long> newSnapshots = history.second();
278264

279265
ManifestGroup conflictGroup = new ManifestGroup(ops.io(), manifests, ImmutableList.of())
280266
.caseSensitive(caseSensitive)
@@ -297,6 +283,39 @@ protected void validateAddedDataFiles(TableMetadata base, Long startingSnapshotI
297283
}
298284
}
299285

286+
/**
287+
* Validates that no new delete files that must be applied to the given data files have been added to the table since
288+
* a starting snapshot.
289+
*
290+
* @param base table metadata to validate
291+
* @param startingSnapshotId id of the snapshot current at the start of the operation
292+
* @param dataFiles data files to validate have no new row deletes
293+
*/
294+
protected void validateNoNewDeletesForDataFiles(TableMetadata base, Long startingSnapshotId,
295+
Iterable<DataFile> dataFiles) {
296+
// if there is no current table state, no files have been added
297+
if (base.currentSnapshot() == null) {
298+
return;
299+
}
300+
301+
Pair<List<ManifestFile>, Set<Long>> history =
302+
validationHistory(base, startingSnapshotId, VALIDATE_REPLACED_DATA_FILES_OPERATIONS, ManifestContent.DELETES);
303+
List<ManifestFile> deleteManifests = history.first();
304+
305+
long startingSequenceNumber = startingSnapshotId == null ? 0 : base.snapshot(startingSnapshotId).sequenceNumber();
306+
DeleteFileIndex deletes = DeleteFileIndex.builderFor(ops.io(), deleteManifests)
307+
.afterSequenceNumber(startingSequenceNumber)
308+
.specsById(ops.current().specsById())
309+
.build();
310+
311+
for (DataFile dataFile : dataFiles) {
312+
// if any delete is found that applies to files written in or before the starting snapshot, fail
313+
if (deletes.forDataFile(startingSequenceNumber, dataFile).length > 0) {
314+
throw new ValidationException("Cannot commit, found new delete for replaced data file: %s", dataFile);
315+
}
316+
}
317+
}
318+
300319
@SuppressWarnings("CollectionUndefinedEquality")
301320
protected void validateDataFilesExist(TableMetadata base, Long startingSnapshotId,
302321
CharSequenceSet requiredDataFiles, boolean skipDeletes) {
@@ -309,6 +328,31 @@ protected void validateDataFilesExist(TableMetadata base, Long startingSnapshotI
309328
VALIDATE_DATA_FILES_EXIST_SKIP_DELETE_OPERATIONS :
310329
VALIDATE_DATA_FILES_EXIST_OPERATIONS;
311330

331+
Pair<List<ManifestFile>, Set<Long>> history =
332+
validationHistory(base, startingSnapshotId, matchingOperations, ManifestContent.DATA);
333+
List<ManifestFile> manifests = history.first();
334+
Set<Long> newSnapshots = history.second();
335+
336+
ManifestGroup matchingDeletesGroup = new ManifestGroup(ops.io(), manifests, ImmutableList.of())
337+
.filterManifestEntries(entry -> entry.status() != ManifestEntry.Status.ADDED &&
338+
newSnapshots.contains(entry.snapshotId()) && requiredDataFiles.contains(entry.file().path()))
339+
.specsById(base.specsById())
340+
.ignoreExisting();
341+
342+
try (CloseableIterator<ManifestEntry<DataFile>> deletes = matchingDeletesGroup.entries().iterator()) {
343+
if (deletes.hasNext()) {
344+
throw new ValidationException("Cannot commit, missing data files: %s",
345+
Iterators.toString(Iterators.transform(deletes, entry -> entry.file().path().toString())));
346+
}
347+
348+
} catch (IOException e) {
349+
throw new UncheckedIOException("Failed to validate required files exist", e);
350+
}
351+
}
352+
353+
private Pair<List<ManifestFile>, Set<Long>> validationHistory(TableMetadata base, Long startingSnapshotId,
354+
Set<String> matchingOperations,
355+
ManifestContent content) {
312356
List<ManifestFile> manifests = Lists.newArrayList();
313357
Set<Long> newSnapshots = Sets.newHashSet();
314358

@@ -322,31 +366,25 @@ protected void validateDataFilesExist(TableMetadata base, Long startingSnapshotI
322366

323367
if (matchingOperations.contains(currentSnapshot.operation())) {
324368
newSnapshots.add(currentSnapshotId);
325-
for (ManifestFile manifest : currentSnapshot.dataManifests()) {
326-
if (manifest.snapshotId() == (long) currentSnapshotId) {
327-
manifests.add(manifest);
369+
if (content == ManifestContent.DATA) {
370+
for (ManifestFile manifest : currentSnapshot.dataManifests()) {
371+
if (manifest.snapshotId() == (long) currentSnapshotId) {
372+
manifests.add(manifest);
373+
}
374+
}
375+
} else {
376+
for (ManifestFile manifest : currentSnapshot.deleteManifests()) {
377+
if (manifest.snapshotId() == (long) currentSnapshotId) {
378+
manifests.add(manifest);
379+
}
328380
}
329381
}
330382
}
331383

332384
currentSnapshotId = currentSnapshot.parentId();
333385
}
334386

335-
ManifestGroup matchingDeletesGroup = new ManifestGroup(ops.io(), manifests, ImmutableList.of())
336-
.filterManifestEntries(entry -> entry.status() != ManifestEntry.Status.ADDED &&
337-
newSnapshots.contains(entry.snapshotId()) && requiredDataFiles.contains(entry.file().path()))
338-
.specsById(base.specsById())
339-
.ignoreExisting();
340-
341-
try (CloseableIterator<ManifestEntry<DataFile>> deletes = matchingDeletesGroup.entries().iterator()) {
342-
if (deletes.hasNext()) {
343-
throw new ValidationException("Cannot commit, missing data files: %s",
344-
Iterators.toString(Iterators.transform(deletes, entry -> entry.file().path().toString())));
345-
}
346-
347-
} catch (IOException e) {
348-
throw new UncheckedIOException("Failed to validate required files exist", e);
349-
}
387+
return Pair.of(manifests, newSnapshots);
350388
}
351389

352390
@Override

core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -197,8 +197,14 @@ public BaseRewriteDataFilesAction<ThisT> filter(Expression expr) {
197197
@Override
198198
public RewriteDataFilesActionResult execute() {
199199
CloseableIterable<FileScanTask> fileScanTasks = null;
200+
if (table.currentSnapshot() == null) {
201+
return RewriteDataFilesActionResult.empty();
202+
}
203+
204+
long startingSnapshotId = table.currentSnapshot().snapshotId();
200205
try {
201206
fileScanTasks = table.newScan()
207+
.useSnapshot(startingSnapshotId)
202208
.caseSensitive(caseSensitive)
203209
.ignoreResiduals()
204210
.filter(filter)
@@ -241,7 +247,7 @@ public RewriteDataFilesActionResult execute() {
241247
List<DataFile> currentDataFiles = combinedScanTasks.stream()
242248
.flatMap(tasks -> tasks.files().stream().map(FileScanTask::file))
243249
.collect(Collectors.toList());
244-
replaceDataFiles(currentDataFiles, addedDataFiles);
250+
replaceDataFiles(currentDataFiles, addedDataFiles, startingSnapshotId);
245251

246252
return new RewriteDataFilesActionResult(currentDataFiles, addedDataFiles);
247253
}
@@ -262,10 +268,12 @@ private Map<StructLikeWrapper, Collection<FileScanTask>> groupTasksByPartition(
262268
return tasksGroupedByPartition.asMap();
263269
}
264270

265-
private void replaceDataFiles(Iterable<DataFile> deletedDataFiles, Iterable<DataFile> addedDataFiles) {
271+
private void replaceDataFiles(Iterable<DataFile> deletedDataFiles, Iterable<DataFile> addedDataFiles,
272+
long startingSnapshotId) {
266273
try {
267-
RewriteFiles rewriteFiles = table.newRewrite();
268-
rewriteFiles.rewriteFiles(Sets.newHashSet(deletedDataFiles), Sets.newHashSet(addedDataFiles));
274+
RewriteFiles rewriteFiles = table.newRewrite()
275+
.validateFromSnapshot(startingSnapshotId)
276+
.rewriteFiles(Sets.newHashSet(deletedDataFiles), Sets.newHashSet(addedDataFiles));
269277
commit(rewriteFiles);
270278
} catch (Exception e) {
271279
Tasks.foreach(Iterables.transform(addedDataFiles, f -> f.path().toString()))

core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.concurrent.TimeUnit;
2929
import java.util.concurrent.atomic.AtomicBoolean;
3030
import org.apache.iceberg.DataFile;
31+
import org.apache.iceberg.RewriteFiles;
3132
import org.apache.iceberg.Table;
3233
import org.apache.iceberg.exceptions.CommitStateUnknownException;
3334
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -46,9 +47,16 @@ public class RewriteDataFilesCommitManager {
4647
private static final Logger LOG = LoggerFactory.getLogger(RewriteDataFilesCommitManager.class);
4748

4849
private final Table table;
50+
private final long startingSnapshotId;
4951

52+
// constructor used for testing
5053
public RewriteDataFilesCommitManager(Table table) {
54+
this(table, table.currentSnapshot().snapshotId());
55+
}
56+
57+
public RewriteDataFilesCommitManager(Table table, long startingSnapshotId) {
5158
this.table = table;
59+
this.startingSnapshotId = startingSnapshotId;
5260
}
5361

5462
/**
@@ -64,9 +72,10 @@ public void commitFileGroups(Set<RewriteFileGroup> fileGroups) {
6472
addedDataFiles = Sets.union(addedDataFiles, group.addedFiles());
6573
}
6674

67-
table.newRewrite()
68-
.rewriteFiles(rewrittenDataFiles, addedDataFiles)
69-
.commit();
75+
RewriteFiles rewrite = table.newRewrite()
76+
.validateFromSnapshot(startingSnapshotId)
77+
.rewriteFiles(rewrittenDataFiles, addedDataFiles);
78+
rewrite.commit();
7079
}
7180

7281
/**

0 commit comments

Comments
 (0)