diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/CommonPartitionEvaluator.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/CommonPartitionEvaluator.java
index 013f11f87a..8038e97618 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/CommonPartitionEvaluator.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/CommonPartitionEvaluator.java
@@ -24,6 +24,7 @@
 import org.apache.amoro.shade.guava32.com.google.common.base.MoreObjects;
 import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
+import org.apache.amoro.utils.TableFileUtil;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileContent;
@@ -226,8 +227,24 @@ public boolean fileShouldRewrite(DataFile dataFile, List<ContentFile<?>> deletes
 
   public boolean segmentShouldRewritePos(DataFile dataFile, List<ContentFile<?>> deletes) {
     Preconditions.checkArgument(!isFragmentFile(dataFile), "Unsupported fragment file.");
-    if (deletes.stream().filter(delete -> delete.content() == FileContent.POSITION_DELETES).count()
-        >= 2) {
+    // Locate the position-delete files explicitly: the list is filtered by content type when
+    // counting, so its first element is not guaranteed to be the position delete.
+    ContentFile<?> posDelete = null;
+    int posDeleteFileCount = 0;
+    for (ContentFile<?> delete : deletes) {
+      if (delete.content() == FileContent.POSITION_DELETES) {
+        posDelete = delete;
+        posDeleteFileCount++;
+      }
+    }
+    if (posDeleteFileCount == 1) {
+      if (!TableFileUtil.isOptimizingPosDeleteFile(
+          dataFile.path().toString(), posDelete.path().toString())) {
+        return true;
+      }
+      // A single optimizing-named position delete does not trigger a rewrite by itself; fall
+      // through so the remaining checks still see any other delete files.
+    } else if (posDeleteFileCount > 1) {
       combinePosSegmentFileCount++;
       return true;
     }
diff --git a/amoro-common/src/main/java/org/apache/amoro/process/ProcessStatus.java b/amoro-common/src/main/java/org/apache/amoro/process/ProcessStatus.java
index c89cbc25ed..77d2ae0236 100644
--- a/amoro-common/src/main/java/org/apache/amoro/process/ProcessStatus.java
+++ b/amoro-common/src/main/java/org/apache/amoro/process/ProcessStatus.java
@@ -18,16 +18,12 @@
 
 package org.apache.amoro.process;
 
-/**
- * Status of any {@link AmoroProcess}. Only UNKNOWN, RUNNING, FINISHED(SUCCESS, CLOSED, FAILED) are
- * necessary Stage classes are used to define multiple phases of one process such as OptimizingStage
- */
+/** Status of any {@link AmoroProcess}. */
 public enum ProcessStatus {
   UNKNOWN,
   PENDING,
-
-  /** This status containing scheduled and running phases */
-  ACTIVE,
+  RUNNING,
+  SUBMITTED,
   SUCCESS,
   CLOSED,
   FAILED
diff --git a/amoro-common/src/main/java/org/apache/amoro/process/TableProcess.java b/amoro-common/src/main/java/org/apache/amoro/process/TableProcess.java
index a3b6be1e01..2df63b7fcf 100644
--- a/amoro-common/src/main/java/org/apache/amoro/process/TableProcess.java
+++ b/amoro-common/src/main/java/org/apache/amoro/process/TableProcess.java
@@ -31,7 +31,7 @@ public abstract class TableProcess<T extends TableProcessState> implements Amoro
   protected final TableRuntime tableRuntime;
   private final SimpleFuture submitFuture = new SimpleFuture();
   private final SimpleFuture completeFuture = new SimpleFuture();
-  private volatile ProcessStatus status = ProcessStatus.ACTIVE;
+  private volatile ProcessStatus status = ProcessStatus.RUNNING;
   private volatile String failedReason;
 
   protected TableProcess(T state, TableRuntime tableRuntime) {
diff --git a/amoro-common/src/main/java/org/apache/amoro/process/TableProcessState.java b/amoro-common/src/main/java/org/apache/amoro/process/TableProcessState.java
index c358e64aff..024484ddca 100644
--- a/amoro-common/src/main/java/org/apache/amoro/process/TableProcessState.java
+++ b/amoro-common/src/main/java/org/apache/amoro/process/TableProcessState.java
@@ -32,7 +32,7 @@ public class TableProcessState implements ProcessState {
   private final ServerTableIdentifier tableIdentifier;
   @StateField private long startTime;
   @StateField private long endTime = -1L;
-  @StateField private ProcessStatus status = ProcessStatus.ACTIVE;
+  @StateField private ProcessStatus status = ProcessStatus.SUBMITTED;
   @StateField private volatile String failedReason;
   private volatile Map<String, String> summary;
 
@@ -108,7 +108,7 @@ protected void setStatus(ProcessStatus status) {
         || status == ProcessStatus.FAILED
         || status == ProcessStatus.CLOSED) {
       endTime = System.currentTimeMillis();
-    } else if (this.status != ProcessStatus.ACTIVE && status == ProcessStatus.ACTIVE) {
+    } else if (this.status != ProcessStatus.SUBMITTED && status == ProcessStatus.SUBMITTED) {
       endTime = -1L;
       failedReason = null;
       summary = null;
diff --git a/amoro-format-hudi/src/main/java/org/apache/amoro/formats/hudi/HudiTableDescriptor.java b/amoro-format-hudi/src/main/java/org/apache/amoro/formats/hudi/HudiTableDescriptor.java
index a70dd0ab4b..8720958b0b 100644
--- a/amoro-format-hudi/src/main/java/org/apache/amoro/formats/hudi/HudiTableDescriptor.java
+++ b/amoro-format-hudi/src/main/java/org/apache/amoro/formats/hudi/HudiTableDescriptor.java
@@ -447,7 +447,7 @@ protected OptimizingProcessInfo getOptimizingInfo(
       HoodieInstant inf =
           instantMap.get(instantTimestamp + "_" + HoodieInstant.State.INFLIGHT.name());
       if (inf != null) {
-        processInfo.setStatus(ProcessStatus.ACTIVE);
+        processInfo.setStatus(ProcessStatus.RUNNING);
       }
     }
     return processInfo;
diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/IcebergFanoutPosDeleteWriter.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/IcebergFanoutPosDeleteWriter.java
index b709e18f21..ea3b7ef89d 100644
--- a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/IcebergFanoutPosDeleteWriter.java
+++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/IcebergFanoutPosDeleteWriter.java
@@ -157,7 +157,10 @@ private void flushDeletes() {
           String fileDir = TableFileUtil.getFileDir(filePath.get().toString());
           String deleteFilePath =
               format.addExtension(
-                  String.format("%s/%s-delete-%s", fileDir, fileName, fileNameSuffix));
+                  String.format(
+                      "%s/%s",
+                      fileDir,
+                      TableFileUtil.optimizingPosDeleteFileName(fileName, fileNameSuffix)));
           EncryptedOutputFile outputFile =
               encryptionManager.encrypt(fileIO.newOutputFile(deleteFilePath));
 
diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/TableFileUtil.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/TableFileUtil.java
index e1b57e45dd..e140c79ebf 100644
--- a/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/TableFileUtil.java
+++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/TableFileUtil.java
@@ -33,6 +33,7 @@
 
 public class TableFileUtil {
   private static final Logger LOG = LoggerFactory.getLogger(TableFileUtil.class);
+  private static final String POS_DELETE_FILE_IDENTIFIER = "delete";
 
   /**
    * Parse file name form file path
@@ -192,4 +193,29 @@ public static String getParent(String path) {
     Path p = new Path(path);
     return p.getParent().toString();
   }
+
+  /**
+   * Build the name of the position-delete file written for a data file.
+   *
+   * @param dataFileName name of the data file the deletes apply to
+   * @param suffix suffix appended after the delete identifier
+   * @return the name in the form {@code dataFileName-delete-suffix}
+   */
+  public static String optimizingPosDeleteFileName(String dataFileName, String suffix) {
+    return String.format("%s-%s-%s", dataFileName, POS_DELETE_FILE_IDENTIFIER, suffix);
+  }
+
+  /**
+   * Check whether a position-delete file is named by {@link #optimizingPosDeleteFileName} for the
+   * given data file. NOTE(review): assumes the writer passes the same data file name that
+   * getFileName(dataFilePath) yields (including any extension) - confirm against the caller.
+   *
+   * @param dataFilePath path of the data file
+   * @param posDeleteFilePath path of the position-delete file
+   * @return true if the delete file name starts with {@code dataFileName-delete-}
+   */
+  public static boolean isOptimizingPosDeleteFile(String dataFilePath, String posDeleteFilePath) {
+    // Keep the trailing separator in the prefix so that e.g. "f-deleted-1" is not matched.
+    String prefix = optimizingPosDeleteFileName(getFileName(dataFilePath), "");
+    return getFileName(posDeleteFilePath).startsWith(prefix);
+  }
 }