
[AMORO-1720] Fix Mixed Format KeyedTable expiring all the snapshots with optimized sequence #2394

Merged — 11 commits, Dec 6, 2023
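In short, the fix computes the snapshot-expiration watermark as the minimum over every timestamp that must be kept, and, for the base store of a keyed table, adds the latest snapshot carrying the optimized sequence to that set so merge-on-read keeps working. Below is a minimal, self-contained sketch of that idea, not the actual Amoro code; the class name, helper signature, and the literal keep time are illustrative only.

import java.util.stream.LongStream;

public class ExpireWatermarkSketch {

  // Illustrative inputs only: in the PR these come from the table configuration and from
  // scanning snapshot summaries (Flink checkpoint id, optimized-sequence flag, ...).
  static long mustOlderThan(
      long now,
      long snapshotsKeepTimeMs,
      long optimizingPlanSnapshotTime,
      long latestNonOptimizedSnapshotTime,
      long latestFlinkCommitTime,
      long latestOptimizedSequenceSnapshotTime) {
    return LongStream.of(
            now - snapshotsKeepTimeMs,
            optimizingPlanSnapshotTime,
            latestNonOptimizedSnapshotTime,
            latestFlinkCommitTime,
            latestOptimizedSequenceSnapshotTime)
        .min()
        .getAsLong();
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    long keepMs = 12 * 60 * 60 * 1000L;                      // e.g. a 12h snapshot keep time
    long sequenceSnapshotTime = now - 24 * 60 * 60 * 1000L;  // optimized-sequence snapshot older than that
    long watermark = mustOlderThan(
        now, keepMs, Long.MAX_VALUE, Long.MAX_VALUE, Long.MAX_VALUE, sequenceSnapshotTime);
    // The optimized-sequence snapshot caps the watermark, so it survives expiration.
    System.out.println(watermark == sequenceSnapshotTime);   // prints true
  }
}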
ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java
@@ -20,11 +20,6 @@

import static org.apache.iceberg.relocated.com.google.common.primitives.Longs.min;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Strings;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.netease.arctic.ams.api.CommitMetaProducer;
import com.netease.arctic.io.ArcticFileIO;
import com.netease.arctic.io.PathInfo;
@@ -34,8 +29,6 @@
import com.netease.arctic.server.table.TableConfiguration;
import com.netease.arctic.server.table.TableRuntime;
import com.netease.arctic.server.utils.IcebergTableUtil;
import com.netease.arctic.table.TableProperties;
import com.netease.arctic.utils.CompatiblePropertyUtil;
import com.netease.arctic.utils.TableFileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.ContentFile;
@@ -61,6 +54,11 @@
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.SupportsPrefixOperations;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Optional;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
@@ -152,21 +150,23 @@

@Override
public void expireSnapshots(TableRuntime tableRuntime) {
TableConfiguration tableConfiguration = tableRuntime.getTableConfiguration();
if (!tableConfiguration.isExpireSnapshotEnabled()) {
if (!expireSnapshotEnabled(tableRuntime)) {
return;
}
expireSnapshots(
olderThanSnapshotNeedToExpire(tableRuntime), expireSnapshotNeedToExcludeFiles());
expireSnapshots(mustOlderThan(tableRuntime, System.currentTimeMillis()));
}

public void expireSnapshots(long mustOlderThan) {
expireSnapshots(
olderThanSnapshotNeedToExpire(mustOlderThan), expireSnapshotNeedToExcludeFiles());
protected boolean expireSnapshotEnabled(TableRuntime tableRuntime) {
TableConfiguration tableConfiguration = tableRuntime.getTableConfiguration();
return tableConfiguration.isExpireSnapshotEnabled();
}

@VisibleForTesting
public void expireSnapshots(long olderThan, Set<String> exclude) {
void expireSnapshots(long mustOlderThan) {
expireSnapshots(mustOlderThan, expireSnapshotNeedToExcludeFiles());
}

private void expireSnapshots(long olderThan, Set<String> exclude) {
LOG.debug("start expire snapshots older than {}, the exclude is {}", olderThan, exclude);
final AtomicInteger toDeleteFiles = new AtomicInteger(0);
final AtomicInteger deleteFiles = new AtomicInteger(0);
@@ -178,10 +178,14 @@
.deleteWith(
file -> {
try {
String filePath = TableFileUtil.getUriPath(file);
if (!exclude.contains(filePath)
&& !exclude.contains(new Path(filePath).getParent().toString())) {
if (exclude.isEmpty()) {
arcticFileIO().deleteFile(file);
} else {
String fileUriPath = TableFileUtil.getUriPath(file);
if (!exclude.contains(fileUriPath)
&& !exclude.contains(new Path(fileUriPath).getParent().toString())) {
arcticFileIO().deleteFile(file);
}
}
parentDirectory.add(new Path(file).getParent().toString());
deleteFiles.incrementAndGet();
@@ -278,26 +282,20 @@
LOG.info("{} total delete {} dangling delete files", table.name(), danglingDeleteFilesCnt);
}

protected long olderThanSnapshotNeedToExpire(TableRuntime tableRuntime) {
long optimizingSnapshotTime = fetchOptimizingSnapshotTime(table, tableRuntime);
return olderThanSnapshotNeedToExpire(optimizingSnapshotTime);
protected long mustOlderThan(TableRuntime tableRuntime, long now) {
return min(
// The snapshots keep time
now - snapshotsKeepTime(tableRuntime),
// The snapshot optimizing plan based should not be expired for committing
fetchOptimizingPlanSnapshotTime(table, tableRuntime),
// The latest non-optimized snapshot should not be expired for data expiring
fetchLatestNonOptimizedSnapshotTime(table),
// The latest flink committed snapshot should not be expired for recovering flink job
fetchLatestFlinkCommittedSnapshotTime(table));
}

protected long olderThanSnapshotNeedToExpire(long mustOlderThan) {
long baseSnapshotsKeepTime =
CompatiblePropertyUtil.propertyAsLong(
table.properties(),
TableProperties.BASE_SNAPSHOT_KEEP_MINUTES,
TableProperties.BASE_SNAPSHOT_KEEP_MINUTES_DEFAULT)
* 60
* 1000;
// Latest checkpoint of flink need retain. If Flink does not continuously commit new snapshots,
// it can lead to issues with table partitions not expiring.
long latestFlinkCommitTime = fetchLatestFlinkCommittedSnapshotTime(table);
// Retain the latest non-optimized snapshot for remember the real latest update
long latestNonOptimizedTime = fetchLatestNonOptimizedSnapshotTime(table);
long olderThan = System.currentTimeMillis() - baseSnapshotsKeepTime;
return min(latestFlinkCommitTime, latestNonOptimizedTime, mustOlderThan, olderThan);
protected long snapshotsKeepTime(TableRuntime tableRuntime) {
return tableRuntime.getTableConfiguration().getSnapshotTTLMinutes() * 60 * 1000;
}

protected Set<String> expireSnapshotNeedToExcludeFiles() {
@@ -382,31 +380,28 @@
}

/**
* When committing a snapshot, Flink will write a checkpoint id into the snapshot summary. The
* latest snapshot with checkpoint id should not be expired or the flink job can't recover from
* state.
When committing a snapshot, Flink will write a checkpoint id into the snapshot summary, which
will be used when the Flink job recovers from the checkpoint.
*
* @param table -
* @return commit time of snapshot with the latest flink checkpointId in summary
* @param table table
@return commit time of the snapshot with the latest flink checkpointId in summary; returns
Long.MAX_VALUE if no such snapshot exists
*/
public static long fetchLatestFlinkCommittedSnapshotTime(Table table) {
long latestCommitTime = Long.MAX_VALUE;
for (Snapshot snapshot : table.snapshots()) {
if (snapshot.summary().containsKey(FLINK_MAX_COMMITTED_CHECKPOINT_ID)) {
latestCommitTime = snapshot.timestampMillis();
}
}
return latestCommitTime;
Snapshot snapshot = findLatestSnapshotContainsKey(table, FLINK_MAX_COMMITTED_CHECKPOINT_ID);
return snapshot == null ? Long.MAX_VALUE : snapshot.timestampMillis();
}

/**
* When optimizing tasks are not committed, the snapshot with which it planned should not be
* expired, since it will use the snapshot to check conflict when committing.
When the current optimizing process is not yet committed, get the time of the snapshot the
optimizing process planned from. This snapshot will be used when the optimizing process commits.
*
* @param table - table
* @return commit time of snapshot for optimizing
* @param table table
* @param tableRuntime table runtime
@return time of the snapshot the optimizing process planned from; returns Long.MAX_VALUE if no
optimizing process exists
*/
public static long fetchOptimizingSnapshotTime(Table table, TableRuntime tableRuntime) {
public static long fetchOptimizingPlanSnapshotTime(Table table, TableRuntime tableRuntime) {
if (tableRuntime.getOptimizingStatus().isProcessing()) {
long fromSnapshotId = tableRuntime.getOptimizingProcess().getTargetSnapshotId();

@@ -419,6 +414,16 @@
return Long.MAX_VALUE;
}

public static Snapshot findLatestSnapshotContainsKey(Table table, String summaryKey) {
Snapshot latestSnapshot = null;
for (Snapshot snapshot : table.snapshots()) {
if (snapshot.summary().containsKey(summaryKey)) {
latestSnapshot = snapshot;
}
}
return latestSnapshot;
}
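For orientation, this helper is the shared building block for both protections; only the summary key differs. A hedged usage sketch follows: 'table' is assumed to be an already loaded Iceberg Table, and the checkpoint-id key is the convention Iceberg's Flink sink writes into the summary.

// Sketch of how the helper above is meant to be used; not new behaviour, just the call pattern.
Snapshot latestFlinkCommit =
    findLatestSnapshotContainsKey(table, FLINK_MAX_COMMITTED_CHECKPOINT_ID);
long flinkProtectedTime =
    latestFlinkCommit == null ? Long.MAX_VALUE : latestFlinkCommit.timestampMillis();

// The keyed base store does the same lookup with BLOB_TYPE_OPTIMIZED_SEQUENCE_EXIST
// (see MixedTableMaintainer further down in this diff).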

/**
* When expiring historic data and `data-expire.since` is `CURRENT_SNAPSHOT`, the latest snapshot
* should not be produced by Amoro.
ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java
@@ -18,8 +18,9 @@

package com.netease.arctic.server.optimizing.maintainer;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import static com.netease.arctic.utils.ArcticTableUtil.BLOB_TYPE_OPTIMIZED_SEQUENCE_EXIST;
import static org.apache.iceberg.relocated.com.google.common.primitives.Longs.min;

import com.netease.arctic.IcebergFileEntry;
import com.netease.arctic.data.FileNameRules;
import com.netease.arctic.hive.utils.TableTypeUtil;
@@ -32,10 +33,8 @@
import com.netease.arctic.table.BaseTable;
import com.netease.arctic.table.ChangeTable;
import com.netease.arctic.table.KeyedTable;
import com.netease.arctic.table.TableProperties;
import com.netease.arctic.table.UnkeyedTable;
import com.netease.arctic.utils.ArcticTableUtil;
import com.netease.arctic.utils.CompatiblePropertyUtil;
import com.netease.arctic.utils.TablePropertyUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
@@ -46,9 +45,12 @@
import org.apache.iceberg.FileContent;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.primitives.Longs;
import org.apache.iceberg.types.Types;
@@ -134,6 +136,7 @@
baseMaintainer.expireSnapshots(tableRuntime);
}

@VisibleForTesting
protected void expireSnapshots(long mustOlderThan) {
if (changeMaintainer != null) {
changeMaintainer.expireSnapshots(mustOlderThan);
@@ -315,43 +318,46 @@
}

@Override
public void expireSnapshots(TableRuntime tableRuntime) {
expireSnapshots(Long.MAX_VALUE);
@VisibleForTesting
void expireSnapshots(long mustOlderThan) {
expireFiles(mustOlderThan);
super.expireSnapshots(mustOlderThan);
}

@Override
public void expireSnapshots(long mustOlderThan) {
long changeTTLPoint = getChangeTTLPoint();
expireFiles(Longs.min(getChangeTTLPoint(), mustOlderThan));
super.expireSnapshots(Longs.min(changeTTLPoint, mustOlderThan));
public void expireSnapshots(TableRuntime tableRuntime) {
if (!expireSnapshotEnabled(tableRuntime)) {
return;
}
long now = System.currentTimeMillis();
expireFiles(now - snapshotsKeepTime(tableRuntime));
expireSnapshots(mustOlderThan(tableRuntime, now));
}

@Override
protected long olderThanSnapshotNeedToExpire(long mustOlderThan) {
long latestChangeFlinkCommitTime = fetchLatestFlinkCommittedSnapshotTime(unkeyedTable);
return Longs.min(latestChangeFlinkCommitTime, mustOlderThan);
protected long mustOlderThan(TableRuntime tableRuntime, long now) {
return min(
// The change data ttl time
now - snapshotsKeepTime(tableRuntime),
// The latest flink committed snapshot should not be expired for recovering flink job
fetchLatestFlinkCommittedSnapshotTime(table));
}

@Override
protected Set<String> expireSnapshotNeedToExcludeFiles() {
return Sets.union(baseFiles, hiveFiles);
}

@Override
protected long snapshotsKeepTime(TableRuntime tableRuntime) {
return tableRuntime.getTableConfiguration().getChangeDataTTLMinutes() * 60 * 1000;
}

public void expireFiles(long ttlPoint) {
List<IcebergFileEntry> expiredDataFileEntries = getExpiredDataFileEntries(ttlPoint);
deleteChangeFile(expiredDataFileEntries);
}

private long getChangeTTLPoint() {
return System.currentTimeMillis()
- CompatiblePropertyUtil.propertyAsLong(
unkeyedTable.properties(),
TableProperties.CHANGE_DATA_TTL,
TableProperties.CHANGE_DATA_TTL_DEFAULT)
* 60
* 1000;
}

private List<IcebergFileEntry> getExpiredDataFileEntries(long ttlPoint) {
TableEntriesScan entriesScan =
TableEntriesScan.builder(unkeyedTable).includeFileContent(FileContent.DATA).build();
@@ -473,6 +479,40 @@
return Sets.union(changeFiles, Sets.union(baseFiles, hiveFiles));
}

@Override
protected long mustOlderThan(TableRuntime tableRuntime, long now) {
return min(
// The snapshots keep time for base store
now - snapshotsKeepTime(tableRuntime),
// The snapshot optimizing plan based should not be expired for committing
fetchOptimizingPlanSnapshotTime(table, tableRuntime),
// The latest non-optimized snapshot should not be expired for data expiring
fetchLatestNonOptimizedSnapshotTime(table),
// The latest flink committed snapshot should not be expired for recovering flink job
fetchLatestFlinkCommittedSnapshotTime(table),
// The latest snapshot contains the optimized sequence should not be expired for MOR
fetchLatestOptimizedSequenceSnapshotTime(table));
}

/**
* When committing a snapshot to the base store of a mixed format keyed table, the optimized
* sequence is stored with the snapshot, and a flag is set in the summary of this snapshot.
*
* <p>The optimized sequence will affect the correctness of MOR.
*
* @param table table
* @return time of the latest snapshot with the optimized sequence flag in summary
*/
private long fetchLatestOptimizedSequenceSnapshotTime(Table table) {
if (arcticTable.isKeyedTable()) {
Snapshot snapshot =
findLatestSnapshotContainsKey(table, BLOB_TYPE_OPTIMIZED_SEQUENCE_EXIST);
return snapshot == null ? Long.MAX_VALUE : snapshot.timestampMillis();
} else {
return Long.MAX_VALUE;
}
}
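A note on the Long.MAX_VALUE fallback used here and in the other fetch methods: since the watermark is a min() over constraints, MAX_VALUE makes an absent constraint a no-op, as the toy example below shows.

// min(x, Long.MAX_VALUE) == x: a missing optimized-sequence snapshot never holds back expiration,
// while a real snapshot timestamp caps the watermark at that snapshot.
long withConstraint = Math.min(1_700_000_000_000L, 1_699_990_000_000L); // == 1_699_990_000_000L
long withoutConstraint = Math.min(1_700_000_000_000L, Long.MAX_VALUE);  // == 1_700_000_000_000L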

@Override
protected Set<String> expireSnapshotNeedToExcludeFiles() {
return Sets.union(changeFiles, hiveFiles);
@@ -18,9 +18,6 @@

package com.netease.arctic.server.utils;

import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.netease.arctic.IcebergFileEntry;
import com.netease.arctic.scan.TableEntriesScan;
import com.netease.arctic.server.ArcticServiceConstants;
@@ -39,6 +36,9 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.base.Optional;
import org.apache.iceberg.relocated.com.google.common.base.Predicate;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
