[AMORO-1720] Fix Mixed Format KeyedTable expiring all the snapshots with optimized sequence (apache#2394)

* should not expire the latest snapshot that contains the optimized sequence

* add @VisibleForTesting annotations

* add fetchLatestNonOptimizedSnapshotTime for base store

* make getting Hive locations return the URI path

* refactor code and fix comments

* improve snapshot expiration when the exclude file set is empty

---------

Co-authored-by: ZhouJinsong <zhoujinsong0505@163.com>
2 people authored and ShawHee committed Dec 29, 2023
1 parent bf06bdd commit d16c373
Showing 4 changed files with 167 additions and 81 deletions.
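The heart of the change is easiest to see in the first file below: instead of a single keep-time cutoff, the expiration threshold becomes the minimum of the keep-time boundary and the timestamps of every snapshot that must survive. A minimal, self-contained sketch of that rule — plain Java, with hypothetical stand-in values where the real code reads table metadata:

```java
import java.util.stream.LongStream;

// Sketch of the retention rule this commit introduces (not the Amoro classes
// themselves): a snapshot may only expire if it is older than the minimum of
// the keep-time boundary and every "protected" snapshot timestamp. Absent
// protections are represented as Long.MAX_VALUE so they never win the min.
public class ExpireThresholdSketch {

  static long mustOlderThan(long now, long snapshotsKeepMillis, long... protectedSnapshotTimes) {
    return LongStream.concat(
            LongStream.of(now - snapshotsKeepMillis),
            LongStream.of(protectedSnapshotTimes))
        .min()
        .getAsLong();
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    long keepMillis = 12L * 60 * 60 * 1000; // assume a 12-hour keep time
    // The latest snapshot carrying the optimized sequence is 24 hours old,
    // i.e. already past the keep time:
    long optimizedSequenceSnapshotTime = now - 24L * 60 * 60 * 1000;
    long threshold = mustOlderThan(now, keepMillis, optimizedSequenceSnapshotTime);
    // The threshold clamps to that snapshot's timestamp, so the snapshot
    // itself (and everything newer) is retained; before this fix it would
    // have been expired together with the optimized sequence it carries.
    System.out.println(threshold == optimizedSequenceSnapshotTime); // true
  }
}
```

In the real code the protected timestamps come from `fetchOptimizingPlanSnapshotTime`, `fetchLatestNonOptimizedSnapshotTime`, `fetchLatestFlinkCommittedSnapshotTime`, and, for keyed tables, `fetchLatestOptimizedSequenceSnapshotTime`, each returning Long.MAX_VALUE when there is nothing to protect.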
@@ -20,11 +20,6 @@
 
 import static org.apache.iceberg.relocated.com.google.common.primitives.Longs.min;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Strings;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
 import com.netease.arctic.ams.api.CommitMetaProducer;
 import com.netease.arctic.io.ArcticFileIO;
 import com.netease.arctic.io.PathInfo;
@@ -34,8 +29,6 @@
 import com.netease.arctic.server.table.TableConfiguration;
 import com.netease.arctic.server.table.TableRuntime;
 import com.netease.arctic.server.utils.IcebergTableUtil;
-import com.netease.arctic.table.TableProperties;
-import com.netease.arctic.utils.CompatiblePropertyUtil;
 import com.netease.arctic.utils.TableFileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.ContentFile;
@@ -61,6 +54,11 @@
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileInfo;
 import org.apache.iceberg.io.SupportsPrefixOperations;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Optional;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Type;
@@ -152,21 +150,23 @@ public void cleanOrphanFiles(TableRuntime tableRuntime) {
 
   @Override
   public void expireSnapshots(TableRuntime tableRuntime) {
-    TableConfiguration tableConfiguration = tableRuntime.getTableConfiguration();
-    if (!tableConfiguration.isExpireSnapshotEnabled()) {
+    if (!expireSnapshotEnabled(tableRuntime)) {
       return;
     }
-    expireSnapshots(
-        olderThanSnapshotNeedToExpire(tableRuntime), expireSnapshotNeedToExcludeFiles());
+    expireSnapshots(mustOlderThan(tableRuntime, System.currentTimeMillis()));
   }
 
-  public void expireSnapshots(long mustOlderThan) {
-    expireSnapshots(
-        olderThanSnapshotNeedToExpire(mustOlderThan), expireSnapshotNeedToExcludeFiles());
+  protected boolean expireSnapshotEnabled(TableRuntime tableRuntime) {
+    TableConfiguration tableConfiguration = tableRuntime.getTableConfiguration();
+    return tableConfiguration.isExpireSnapshotEnabled();
   }
 
   @VisibleForTesting
-  public void expireSnapshots(long olderThan, Set<String> exclude) {
+  void expireSnapshots(long mustOlderThan) {
+    expireSnapshots(mustOlderThan, expireSnapshotNeedToExcludeFiles());
+  }
+
+  private void expireSnapshots(long olderThan, Set<String> exclude) {
     LOG.debug("start expire snapshots older than {}, the exclude is {}", olderThan, exclude);
     final AtomicInteger toDeleteFiles = new AtomicInteger(0);
     final AtomicInteger deleteFiles = new AtomicInteger(0);
@@ -178,10 +178,14 @@ public void expireSnapshots(long olderThan, Set<String> exclude) {
         .deleteWith(
             file -> {
               try {
-                String filePath = TableFileUtil.getUriPath(file);
-                if (!exclude.contains(filePath)
-                    && !exclude.contains(new Path(filePath).getParent().toString())) {
+                if (exclude.isEmpty()) {
                   arcticFileIO().deleteFile(file);
+                } else {
+                  String fileUriPath = TableFileUtil.getUriPath(file);
+                  if (!exclude.contains(fileUriPath)
+                      && !exclude.contains(new Path(fileUriPath).getParent().toString())) {
+                    arcticFileIO().deleteFile(file);
+                  }
                 }
                 parentDirectory.add(new Path(file).getParent().toString());
                 deleteFiles.incrementAndGet();
@@ -278,26 +282,20 @@ protected void cleanDanglingDeleteFiles() {
     LOG.info("{} total delete {} dangling delete files", table.name(), danglingDeleteFilesCnt);
   }
 
-  protected long olderThanSnapshotNeedToExpire(TableRuntime tableRuntime) {
-    long optimizingSnapshotTime = fetchOptimizingSnapshotTime(table, tableRuntime);
-    return olderThanSnapshotNeedToExpire(optimizingSnapshotTime);
+  protected long mustOlderThan(TableRuntime tableRuntime, long now) {
+    return min(
+        // The snapshots keep time boundary
+        now - snapshotsKeepTime(tableRuntime),
+        // The snapshot the optimizing plan is based on should not be expired before committing
+        fetchOptimizingPlanSnapshotTime(table, tableRuntime),
+        // The latest non-optimized snapshot should not be expired, for data expiring
+        fetchLatestNonOptimizedSnapshotTime(table),
+        // The latest Flink-committed snapshot should not be expired, for recovering the Flink job
+        fetchLatestFlinkCommittedSnapshotTime(table));
   }
 
-  protected long olderThanSnapshotNeedToExpire(long mustOlderThan) {
-    long baseSnapshotsKeepTime =
-        CompatiblePropertyUtil.propertyAsLong(
-                table.properties(),
-                TableProperties.BASE_SNAPSHOT_KEEP_MINUTES,
-                TableProperties.BASE_SNAPSHOT_KEEP_MINUTES_DEFAULT)
-            * 60
-            * 1000;
-    // Latest checkpoint of flink need retain. If Flink does not continuously commit new snapshots,
-    // it can lead to issues with table partitions not expiring.
-    long latestFlinkCommitTime = fetchLatestFlinkCommittedSnapshotTime(table);
-    // Retain the latest non-optimized snapshot for remember the real latest update
-    long latestNonOptimizedTime = fetchLatestNonOptimizedSnapshotTime(table);
-    long olderThan = System.currentTimeMillis() - baseSnapshotsKeepTime;
-    return min(latestFlinkCommitTime, latestNonOptimizedTime, mustOlderThan, olderThan);
+  protected long snapshotsKeepTime(TableRuntime tableRuntime) {
+    return tableRuntime.getTableConfiguration().getSnapshotTTLMinutes() * 60 * 1000;
   }
 
   protected Set<String> expireSnapshotNeedToExcludeFiles() {
@@ -382,31 +380,28 @@ private int clearInternalTableDanglingDeleteFiles() {
   }
 
   /**
-   * When committing a snapshot, Flink will write a checkpoint id into the snapshot summary. The
-   * latest snapshot with checkpoint id should not be expired or the flink job can't recover from
-   * state.
+   * When committing a snapshot, Flink writes a checkpoint id into the snapshot summary, which
+   * will be used when the Flink job recovers from the checkpoint.
    *
-   * @param table -
-   * @return commit time of snapshot with the latest flink checkpointId in summary
+   * @param table table
+   * @return commit time of the snapshot with the latest Flink checkpoint id in its summary, or
+   *     Long.MAX_VALUE if no such snapshot exists
    */
   public static long fetchLatestFlinkCommittedSnapshotTime(Table table) {
-    long latestCommitTime = Long.MAX_VALUE;
-    for (Snapshot snapshot : table.snapshots()) {
-      if (snapshot.summary().containsKey(FLINK_MAX_COMMITTED_CHECKPOINT_ID)) {
-        latestCommitTime = snapshot.timestampMillis();
-      }
-    }
-    return latestCommitTime;
+    Snapshot snapshot = findLatestSnapshotContainsKey(table, FLINK_MAX_COMMITTED_CHECKPOINT_ID);
+    return snapshot == null ? Long.MAX_VALUE : snapshot.timestampMillis();
   }
 
   /**
-   * When optimizing tasks are not committed, the snapshot with which it planned should not be
-   * expired, since it will use the snapshot to check conflict when committing.
+   * When the current optimizing process has not committed yet, get the time of the snapshot the
+   * optimizing process planned from. That snapshot will be used when the process commits.
    *
-   * @param table - table
-   * @return commit time of snapshot for optimizing
+   * @param table table
+   * @param tableRuntime table runtime
+   * @return time of the snapshot the optimizing process planned from, or Long.MAX_VALUE if no
+   *     optimizing process exists
    */
-  public static long fetchOptimizingSnapshotTime(Table table, TableRuntime tableRuntime) {
+  public static long fetchOptimizingPlanSnapshotTime(Table table, TableRuntime tableRuntime) {
     if (tableRuntime.getOptimizingStatus().isProcessing()) {
       long fromSnapshotId = tableRuntime.getOptimizingProcess().getTargetSnapshotId();
 
@@ -419,6 +414,16 @@ public static long fetchOptimizingSnapshotTime(Table table, TableRuntime tableRuntime) {
     return Long.MAX_VALUE;
   }
 
+  public static Snapshot findLatestSnapshotContainsKey(Table table, String summaryKey) {
+    Snapshot latestSnapshot = null;
+    for (Snapshot snapshot : table.snapshots()) {
+      if (snapshot.summary().containsKey(summaryKey)) {
+        latestSnapshot = snapshot;
+      }
+    }
+    return latestSnapshot;
+  }
+
   /**
    * When expiring historic data and `data-expire.since` is `CURRENT_SNAPSHOT`, the latest snapshot
    * should not be produced by Amoro.
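Two details of the first file are worth calling out. `findLatestSnapshotContainsKey` relies on `table.snapshots()` iterating in commit order, so the last match is the latest snapshot; and the rewritten `deleteWith` callback now skips per-file URI normalization entirely when the exclude set is empty. A sketch of that callback pattern, with a hypothetical `deleteFn` standing in for `arcticFileIO()::deleteFile`:

```java
import java.net.URI;
import java.nio.file.Paths;
import java.util.Set;
import java.util.function.Consumer;

// Sketch of the exclude-aware delete callback from the diff above. deleteFn
// and toUriPath are stand-ins for arcticFileIO()::deleteFile and
// TableFileUtil.getUriPath, which are not shown here.
final class ExcludeAwareDelete {

  static Consumer<String> deleteWith(Set<String> exclude, Consumer<String> deleteFn) {
    return file -> {
      if (exclude.isEmpty()) {
        // Fast path added by this commit: no URI conversion per file.
        deleteFn.accept(file);
      } else {
        String uriPath = toUriPath(file);
        String parent = Paths.get(uriPath).getParent().toString();
        // Skip files excluded directly or via their parent directory.
        if (!exclude.contains(uriPath) && !exclude.contains(parent)) {
          deleteFn.accept(file);
        }
      }
    };
  }

  // Hypothetical normalization: drop scheme and authority, keep the path.
  static String toUriPath(String location) {
    return URI.create(location).getPath();
  }
}
```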
@@ -18,8 +18,9 @@
 
 package com.netease.arctic.server.optimizing.maintainer;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
+import static com.netease.arctic.utils.ArcticTableUtil.BLOB_TYPE_OPTIMIZED_SEQUENCE_EXIST;
+import static org.apache.iceberg.relocated.com.google.common.primitives.Longs.min;
+
 import com.netease.arctic.IcebergFileEntry;
 import com.netease.arctic.data.FileNameRules;
 import com.netease.arctic.hive.utils.TableTypeUtil;
@@ -32,10 +33,8 @@
 import com.netease.arctic.table.BaseTable;
 import com.netease.arctic.table.ChangeTable;
 import com.netease.arctic.table.KeyedTable;
-import com.netease.arctic.table.TableProperties;
 import com.netease.arctic.table.UnkeyedTable;
 import com.netease.arctic.utils.ArcticTableUtil;
-import com.netease.arctic.utils.CompatiblePropertyUtil;
 import com.netease.arctic.utils.TablePropertyUtil;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
@@ -46,9 +45,12 @@
 import org.apache.iceberg.FileContent;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Literal;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.relocated.com.google.common.primitives.Longs;
 import org.apache.iceberg.types.Types;
@@ -134,6 +136,7 @@ public void expireSnapshots(TableRuntime tableRuntime) {
     baseMaintainer.expireSnapshots(tableRuntime);
   }
 
+  @VisibleForTesting
   protected void expireSnapshots(long mustOlderThan) {
     if (changeMaintainer != null) {
       changeMaintainer.expireSnapshots(mustOlderThan);
@@ -315,43 +318,46 @@ public Set<String> orphanFileCleanNeedToExcludeFiles() {
   }
 
   @Override
-  public void expireSnapshots(TableRuntime tableRuntime) {
-    expireSnapshots(Long.MAX_VALUE);
+  @VisibleForTesting
+  void expireSnapshots(long mustOlderThan) {
+    expireFiles(mustOlderThan);
+    super.expireSnapshots(mustOlderThan);
   }
 
   @Override
-  public void expireSnapshots(long mustOlderThan) {
-    long changeTTLPoint = getChangeTTLPoint();
-    expireFiles(Longs.min(getChangeTTLPoint(), mustOlderThan));
-    super.expireSnapshots(Longs.min(changeTTLPoint, mustOlderThan));
+  public void expireSnapshots(TableRuntime tableRuntime) {
+    if (!expireSnapshotEnabled(tableRuntime)) {
+      return;
+    }
+    long now = System.currentTimeMillis();
+    expireFiles(now - snapshotsKeepTime(tableRuntime));
+    expireSnapshots(mustOlderThan(tableRuntime, now));
   }
 
   @Override
-  protected long olderThanSnapshotNeedToExpire(long mustOlderThan) {
-    long latestChangeFlinkCommitTime = fetchLatestFlinkCommittedSnapshotTime(unkeyedTable);
-    return Longs.min(latestChangeFlinkCommitTime, mustOlderThan);
+  protected long mustOlderThan(TableRuntime tableRuntime, long now) {
+    return min(
+        // The change data TTL time
+        now - snapshotsKeepTime(tableRuntime),
+        // The latest Flink-committed snapshot should not be expired, for recovering the Flink job
+        fetchLatestFlinkCommittedSnapshotTime(table));
   }
 
   @Override
   protected Set<String> expireSnapshotNeedToExcludeFiles() {
     return Sets.union(baseFiles, hiveFiles);
   }
 
+  @Override
+  protected long snapshotsKeepTime(TableRuntime tableRuntime) {
+    return tableRuntime.getTableConfiguration().getChangeDataTTLMinutes() * 60 * 1000;
+  }
+
   public void expireFiles(long ttlPoint) {
     List<IcebergFileEntry> expiredDataFileEntries = getExpiredDataFileEntries(ttlPoint);
     deleteChangeFile(expiredDataFileEntries);
   }
 
-  private long getChangeTTLPoint() {
-    return System.currentTimeMillis()
-        - CompatiblePropertyUtil.propertyAsLong(
-                unkeyedTable.properties(),
-                TableProperties.CHANGE_DATA_TTL,
-                TableProperties.CHANGE_DATA_TTL_DEFAULT)
-            * 60
-            * 1000;
-  }
-
   private List<IcebergFileEntry> getExpiredDataFileEntries(long ttlPoint) {
     TableEntriesScan entriesScan =
         TableEntriesScan.builder(unkeyedTable).includeFileContent(FileContent.DATA).build();
@@ -473,6 +479,40 @@ public Set<String> orphanFileCleanNeedToExcludeFiles() {
     return Sets.union(changeFiles, Sets.union(baseFiles, hiveFiles));
   }
 
+  @Override
+  protected long mustOlderThan(TableRuntime tableRuntime, long now) {
+    return min(
+        // The snapshots keep time for the base store
+        now - snapshotsKeepTime(tableRuntime),
+        // The snapshot the optimizing plan is based on should not be expired before committing
+        fetchOptimizingPlanSnapshotTime(table, tableRuntime),
+        // The latest non-optimized snapshot should not be expired, for data expiring
+        fetchLatestNonOptimizedSnapshotTime(table),
+        // The latest Flink-committed snapshot should not be expired, for recovering the Flink job
+        fetchLatestFlinkCommittedSnapshotTime(table),
+        // The latest snapshot containing the optimized sequence should not be expired, for MOR
+        fetchLatestOptimizedSequenceSnapshotTime(table));
+  }
+
+  /**
+   * When committing a snapshot to the base store of a mixed-format keyed table, the optimized
+   * sequence is stored with the snapshot, and a flag is set in the summary of that snapshot.
+   *
+   * <p>The optimized sequence affects the correctness of MOR (merge-on-read).
+   *
+   * @param table table
+   * @return time of the latest snapshot with the optimized sequence flag in its summary, or
+   *     Long.MAX_VALUE if no such snapshot exists
+   */
+  private long fetchLatestOptimizedSequenceSnapshotTime(Table table) {
+    if (arcticTable.isKeyedTable()) {
+      Snapshot snapshot =
+          findLatestSnapshotContainsKey(table, BLOB_TYPE_OPTIMIZED_SEQUENCE_EXIST);
+      return snapshot == null ? Long.MAX_VALUE : snapshot.timestampMillis();
+    } else {
+      return Long.MAX_VALUE;
+    }
+  }
+
   @Override
   protected Set<String> expireSnapshotNeedToExcludeFiles() {
     return Sets.union(changeFiles, hiveFiles);
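The guard specific to this bug is `fetchLatestOptimizedSequenceSnapshotTime` above: the base store of a mixed-format keyed table flags the snapshot that carries the optimized sequence, and that snapshot must outlive plain TTL expiration because merge-on-read needs the sequence to order base and change data. A standalone sketch of the guard against the Iceberg API — the literal value behind `BLOB_TYPE_OPTIMIZED_SEQUENCE_EXIST` is not shown in the diff, so it is taken as a parameter here:

```java
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

// Sketch of the keyed-table guard; summaryKey stands in for the
// BLOB_TYPE_OPTIMIZED_SEQUENCE_EXIST constant referenced in the diff.
final class OptimizedSequenceGuard {

  static long latestOptimizedSequenceSnapshotTime(
      Table table, boolean isKeyedTable, String summaryKey) {
    if (!isKeyedTable) {
      return Long.MAX_VALUE; // unkeyed tables carry no optimized sequence
    }
    Snapshot latest = null;
    for (Snapshot snapshot : table.snapshots()) { // oldest-to-newest iteration
      if (snapshot.summary().containsKey(summaryKey)) {
        latest = snapshot;
      }
    }
    // Long.MAX_VALUE leaves the min(...) threshold untouched when absent.
    return latest == null ? Long.MAX_VALUE : latest.timestampMillis();
  }
}
```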
@@ -18,9 +18,6 @@
 
 package com.netease.arctic.server.utils;
 
-import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
 import com.netease.arctic.IcebergFileEntry;
 import com.netease.arctic.scan.TableEntriesScan;
 import com.netease.arctic.server.ArcticServiceConstants;
@@ -39,6 +36,9 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Optional;
+import org.apache.iceberg.relocated.com.google.common.base.Predicate;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
(diff for the fourth changed file not loaded)
