Skip to content

Commit

Permalink
Merge branch 'master' into task-refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
majin1102 authored Oct 10, 2024
2 parents 58e90c8 + 6ed8162 commit 6fc406f
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.amoro.shade.guava32.com.google.common.base.MoreObjects;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
import org.apache.amoro.utils.TableFileUtil;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileContent;
Expand Down Expand Up @@ -226,8 +227,12 @@ public boolean fileShouldRewrite(DataFile dataFile, List<ContentFile<?>> deletes

public boolean segmentShouldRewritePos(DataFile dataFile, List<ContentFile<?>> deletes) {
Preconditions.checkArgument(!isFragmentFile(dataFile), "Unsupported fragment file.");
if (deletes.stream().filter(delete -> delete.content() == FileContent.POSITION_DELETES).count()
>= 2) {
long posDeleteFileCount =
deletes.stream().filter(delete -> delete.content() == FileContent.POSITION_DELETES).count();
if (posDeleteFileCount == 1) {
return !TableFileUtil.isOptimizingPosDeleteFile(
dataFile.path().toString(), deletes.get(0).path().toString());
} else if (posDeleteFileCount > 1) {
combinePosSegmentFileCount++;
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,12 @@

package org.apache.amoro.process;

/**
* Status of any {@link AmoroProcess}. Only UNKNOWN, RUNNING, FINISHED(SUCCESS, CLOSED, FAILED) are
* necessary Stage classes are used to define multiple phases of one process such as OptimizingStage
*/
/** Status of any {@link AmoroProcess}. */
public enum ProcessStatus {
UNKNOWN,
PENDING,

/** This status containing scheduled and running phases */
ACTIVE,
RUNNING,
SUBMITTED,
SUCCESS,
CLOSED,
FAILED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public abstract class TableProcess<T extends TableProcessState> implements Amoro
protected final TableRuntime tableRuntime;
private final SimpleFuture submitFuture = new SimpleFuture();
private final SimpleFuture completeFuture = new SimpleFuture();
private volatile ProcessStatus status = ProcessStatus.ACTIVE;
private volatile ProcessStatus status = ProcessStatus.RUNNING;
private volatile String failedReason;

protected TableProcess(T state, TableRuntime tableRuntime) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class TableProcessState implements ProcessState {
private final ServerTableIdentifier tableIdentifier;
@StateField private long startTime;
@StateField private long endTime = -1L;
@StateField private ProcessStatus status = ProcessStatus.ACTIVE;
@StateField private ProcessStatus status = ProcessStatus.SUBMITTED;
@StateField private volatile String failedReason;
private volatile Map<String, String> summary;

Expand Down Expand Up @@ -108,7 +108,7 @@ protected void setStatus(ProcessStatus status) {
|| status == ProcessStatus.FAILED
|| status == ProcessStatus.CLOSED) {
endTime = System.currentTimeMillis();
} else if (this.status != ProcessStatus.ACTIVE && status == ProcessStatus.ACTIVE) {
} else if (this.status != ProcessStatus.SUBMITTED && status == ProcessStatus.SUBMITTED) {
endTime = -1L;
failedReason = null;
summary = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ protected OptimizingProcessInfo getOptimizingInfo(
HoodieInstant inf =
instantMap.get(instantTimestamp + "_" + HoodieInstant.State.INFLIGHT.name());
if (inf != null) {
processInfo.setStatus(ProcessStatus.ACTIVE);
processInfo.setStatus(ProcessStatus.RUNNING);
}
}
return processInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,10 @@ private void flushDeletes() {
String fileDir = TableFileUtil.getFileDir(filePath.get().toString());
String deleteFilePath =
format.addExtension(
String.format("%s/%s-delete-%s", fileDir, fileName, fileNameSuffix));
String.format(
"%s/%s",
fileDir,
TableFileUtil.optimizingPosDeleteFileName(fileName, fileNameSuffix)));
EncryptedOutputFile outputFile =
encryptionManager.encrypt(fileIO.newOutputFile(deleteFilePath));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

public class TableFileUtil {
private static final Logger LOG = LoggerFactory.getLogger(TableFileUtil.class);
private static final String POS_DELETE_FILE_IDENTIFIER = "delete";

/**
* Parse file name form file path
Expand Down Expand Up @@ -192,4 +193,13 @@ public static String getParent(String path) {
Path p = new Path(path);
return p.getParent().toString();
}

public static String optimizingPosDeleteFileName(String dataFileName, String suffix) {
return String.format("%s-%s-%s", dataFileName, POS_DELETE_FILE_IDENTIFIER, suffix);
}

public static boolean isOptimizingPosDeleteFile(String dataFilePath, String posDeleteFilePath) {
return getFileName(posDeleteFilePath)
.startsWith(String.format("%s-%s", getFileName(dataFilePath), POS_DELETE_FILE_IDENTIFIER));
}
}

0 comments on commit 6fc406f

Please sign in to comment.