Skip to content

Commit

Permalink
MergingSnapshotProducer: Change file holder to be generic
Browse files Browse the repository at this point in the history
This will allow re-using it for insert files for specific data versions in future commits
  • Loading branch information
jasonf20 committed Dec 11, 2023
1 parent f21199d commit 934a3f9
Showing 1 changed file with 19 additions and 19 deletions.
38 changes: 19 additions & 19 deletions core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
// update data
private final List<DataFile> newDataFiles = Lists.newArrayList();
private Long newDataFilesDataSequenceNumber;
private final Map<Integer, List<DeleteFileHolder>> newDeleteFilesBySpec = Maps.newHashMap();
private final Map<Integer, List<FileHolder<DeleteFile>>> newDeleteFilesBySpec = Maps.newHashMap();
private final List<ManifestFile> appendManifests = Lists.newArrayList();
private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
private final SnapshotSummary.Builder addedFilesSummary = SnapshotSummary.builder();
Expand Down Expand Up @@ -237,22 +237,22 @@ protected void add(DataFile file) {
/** Add a delete file to the new snapshot. */
protected void add(DeleteFile file) {
Preconditions.checkNotNull(file, "Invalid delete file: null");
add(new DeleteFileHolder(file));
addDelete(new FileHolder<>(file));
}

/** Add a delete file to the new snapshot. */
protected void add(DeleteFile file, long dataSequenceNumber) {
Preconditions.checkNotNull(file, "Invalid delete file: null");
add(new DeleteFileHolder(file, dataSequenceNumber));
addDelete(new FileHolder<>(file, dataSequenceNumber));
}

private void add(DeleteFileHolder fileHolder) {
int specId = fileHolder.deleteFile().specId();
private void addDelete(FileHolder<DeleteFile> fileHolder) {
int specId = fileHolder.file().specId();
PartitionSpec fileSpec = ops.current().spec(specId);
List<DeleteFileHolder> deleteFiles =
List<FileHolder<DeleteFile>> deleteFiles =
newDeleteFilesBySpec.computeIfAbsent(specId, s -> Lists.newArrayList());
deleteFiles.add(fileHolder);
addedFilesSummary.addedFile(fileSpec, fileHolder.deleteFile());
addedFilesSummary.addedFile(fileSpec, fileHolder.file());
hasNewDeleteFiles = true;
}

Expand Down Expand Up @@ -1006,9 +1006,9 @@ private List<ManifestFile> newDeleteFilesAsManifests() {
deleteFiles.forEach(
df -> {
if (df.dataSequenceNumber() != null) {
writer.add(df.deleteFile(), df.dataSequenceNumber());
writer.add(df.file(), df.dataSequenceNumber());
} else {
writer.add(df.deleteFile());
writer.add(df.file());
}
});
} finally {
Expand Down Expand Up @@ -1132,33 +1132,33 @@ protected ManifestReader<DeleteFile> newManifestReader(ManifestFile manifest) {
}
}

private static class DeleteFileHolder {
private final DeleteFile deleteFile;
private static class FileHolder<T extends ContentFile<?>>{
private final T file;
private final Long dataSequenceNumber;

/**
* Wrap a delete file for commit with a given data sequence number
*
* @param deleteFile delete file
* @param file content file
* @param dataSequenceNumber data sequence number to apply
*/
DeleteFileHolder(DeleteFile deleteFile, long dataSequenceNumber) {
this.deleteFile = deleteFile;
FileHolder(T file, long dataSequenceNumber) {
this.file = file;
this.dataSequenceNumber = dataSequenceNumber;
}

/**
* Wrap a delete file for commit with the latest sequence number
*
* @param deleteFile delete file
* @param file the content fle
*/
DeleteFileHolder(DeleteFile deleteFile) {
this.deleteFile = deleteFile;
FileHolder(T file) {
this.file = file;
this.dataSequenceNumber = null;
}

public DeleteFile deleteFile() {
return deleteFile;
public T file() {
return file;
}

public Long dataSequenceNumber() {
Expand Down

0 comments on commit 934a3f9

Please sign in to comment.