[AMORO-1720] 0.6.x Fix Mixed Format KeyedTable expiring all the snapshots with optimized sequence (#2456)

* [AMORO-1720] Fix Mixed Format KeyedTable expiring all the snapshots with optimized sequence (#2394)

* should not expire the latest snapshot containing the optimized sequence

* add @VisibleForTesting annotations

* add fetchLatestNonOptimizedSnapshotTime for base store

* make get hive locations return the URI path

* refactor code and fix comments

* optimize expiring snapshots when the exclude-files set is empty

---------

Co-authored-by: ZhouJinsong <zhoujinsong0505@163.com>

* revert changes introduced by #2328

---------

Co-authored-by: ZhouJinsong <zhoujinsong0505@163.com>
wangtaohz and zhoujinsong authored Dec 21, 2023
1 parent adbc918 commit 564e7fb
Showing 3 changed files with 154 additions and 70 deletions.
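As the commit message above says, the core of the fix is that the latest snapshot carrying the optimized sequence must survive expiration. The maintainer now derives a single "must be older than" bound as the minimum of several timestamps, and the keyed-table base store adds the latest optimized-sequence snapshot time to that minimum. Below is a condensed, illustrative sketch of that logic, not the committed implementation; the class scaffolding and parameter shapes are assumptions, while the constants FLINK_MAX_COMMITTED_CHECKPOINT_ID and BLOB_TYPE_OPTIMIZED_SEQUENCE_EXIST come from the real code in the diffs.

import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

/**
 * Illustrative condensation of the new expiration bound; assumes the
 * constants and helpers named in the diffs below.
 */
final class RetentionBoundSketch {

  /** A snapshot may be expired only if it is older than every bound. */
  static long mustOlderThan(
      Table table,
      long now,
      long snapshotsKeepMillis,        // table TTL config, in millis
      long optimizingPlanSnapshotTime, // Long.MAX_VALUE if nothing is being optimized
      String flinkCheckpointKey,       // FLINK_MAX_COMMITTED_CHECKPOINT_ID
      String optimizedSequenceKey) {   // BLOB_TYPE_OPTIMIZED_SEQUENCE_EXIST
    long bound = now - snapshotsKeepMillis;
    bound = Math.min(bound, optimizingPlanSnapshotTime);
    bound = Math.min(bound, latestSnapshotTimeWithKey(table, flinkCheckpointKey));
    // The new bound from this commit: keep the latest snapshot that carries
    // the optimized sequence, since MOR reads depend on it.
    bound = Math.min(bound, latestSnapshotTimeWithKey(table, optimizedSequenceKey));
    return bound;
  }

  /**
   * Commit time of the latest snapshot whose summary contains the key, or
   * Long.MAX_VALUE (no restriction) if none exists; mirrors
   * findLatestSnapshotContainsKey in the first diff below.
   */
  static long latestSnapshotTimeWithKey(Table table, String summaryKey) {
    Snapshot latest = null;
    for (Snapshot snapshot : table.snapshots()) {
      if (snapshot.summary().containsKey(summaryKey)) {
        latest = snapshot;
      }
    }
    return latest == null ? Long.MAX_VALUE : latest.timestampMillis();
  }
}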
@@ -27,8 +27,6 @@
import com.netease.arctic.server.table.TableConfiguration;
import com.netease.arctic.server.table.TableRuntime;
import com.netease.arctic.server.utils.IcebergTableUtil;
import com.netease.arctic.table.TableProperties;
import com.netease.arctic.utils.CompatiblePropertyUtil;
import com.netease.arctic.utils.TableFileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.DeleteFile;
@@ -106,21 +104,23 @@ public void cleanOrphanFiles(TableRuntime tableRuntime) {
}

public void expireSnapshots(TableRuntime tableRuntime) {
TableConfiguration tableConfiguration = tableRuntime.getTableConfiguration();
if (!tableConfiguration.isExpireSnapshotEnabled()) {
if (!expireSnapshotEnabled(tableRuntime)) {
return;
}
expireSnapshots(
olderThanSnapshotNeedToExpire(tableRuntime), expireSnapshotNeedToExcludeFiles());
expireSnapshots(mustOlderThan(tableRuntime, System.currentTimeMillis()));
}

public void expireSnapshots(long mustOlderThan) {
expireSnapshots(
olderThanSnapshotNeedToExpire(mustOlderThan), expireSnapshotNeedToExcludeFiles());
protected boolean expireSnapshotEnabled(TableRuntime tableRuntime) {
TableConfiguration tableConfiguration = tableRuntime.getTableConfiguration();
return tableConfiguration.isExpireSnapshotEnabled();
}

@VisibleForTesting
public void expireSnapshots(long olderThan, Set<String> exclude) {
void expireSnapshots(long mustOlderThan) {
expireSnapshots(mustOlderThan, expireSnapshotNeedToExcludeFiles());
}

private void expireSnapshots(long olderThan, Set<String> exclude) {
LOG.debug("start expire snapshots older than {}, the exclude is {}", olderThan, exclude);
final AtomicInteger toDeleteFiles = new AtomicInteger(0);
final AtomicInteger deleteFiles = new AtomicInteger(0);
@@ -132,10 +132,14 @@ public void expireSnapshots(long olderThan, Set<String> exclude) {
.deleteWith(
file -> {
try {
String filePath = TableFileUtil.getUriPath(file);
if (!exclude.contains(filePath)
&& !exclude.contains(new Path(filePath).getParent().toString())) {
if (exclude.isEmpty()) {
arcticFileIO().deleteFile(file);
} else {
String fileUriPath = TableFileUtil.getUriPath(file);
if (!exclude.contains(fileUriPath)
&& !exclude.contains(new Path(fileUriPath).getParent().toString())) {
arcticFileIO().deleteFile(file);
}
}
parentDirectory.add(new Path(file).getParent().toString());
deleteFiles.incrementAndGet();
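A note on the hunk above: the delete callback now short-circuits when the exclude set is empty, skipping the per-file URI conversion. Here is a standalone sketch of that callback logic under stated assumptions: the wrapper class and method are illustrative, and TableFileUtil.getUriPath is approximated with Hadoop's Path/URI handling rather than the diff's exact helper.

import java.util.Set;
import java.util.function.Consumer;
import org.apache.hadoop.fs.Path;

final class ExcludeAwareDeleteSketch {

  /** Wraps a raw delete with the exclude-set check used during snapshot expiration. */
  static Consumer<String> deletingWith(Set<String> exclude, Consumer<String> deleteFile) {
    return file -> {
      if (exclude.isEmpty()) {
        // Fast path added by this commit: nothing to exclude, delete directly.
        deleteFile.accept(file);
        return;
      }
      String uriPath = new Path(file).toUri().getPath();
      // A file survives if it, or its parent directory, is in the exclude set.
      if (!exclude.contains(uriPath)
          && !exclude.contains(new Path(uriPath).getParent().toString())) {
        deleteFile.accept(file);
      }
    };
  }
}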
@@ -176,24 +180,18 @@ protected void cleanDanglingDeleteFiles() {
LOG.info("{} total delete {} dangling delete files", table.name(), danglingDeleteFilesCnt);
}

protected long olderThanSnapshotNeedToExpire(TableRuntime tableRuntime) {
long optimizingSnapshotTime = fetchOptimizingSnapshotTime(table, tableRuntime);
return olderThanSnapshotNeedToExpire(optimizingSnapshotTime);
protected long mustOlderThan(TableRuntime tableRuntime, long now) {
return min(
// The snapshots keep time
now - snapshotsKeepTime(tableRuntime),
// The snapshot the optimizing plan is based on should not be expired before committing
fetchOptimizingPlanSnapshotTime(table, tableRuntime),
// The latest Flink committed snapshot should not be expired, so the Flink job can recover
fetchLatestFlinkCommittedSnapshotTime(table));
}

protected long olderThanSnapshotNeedToExpire(long mustOlderThan) {
long baseSnapshotsKeepTime =
CompatiblePropertyUtil.propertyAsLong(
table.properties(),
TableProperties.BASE_SNAPSHOT_KEEP_MINUTES,
TableProperties.BASE_SNAPSHOT_KEEP_MINUTES_DEFAULT)
* 60
* 1000;
// The latest Flink checkpoint needs to be retained. If Flink does not continuously commit new
// snapshots, it can lead to issues with table partitions not expiring.
long latestFlinkCommitTime = fetchLatestFlinkCommittedSnapshotTime(table);
long olderThan = System.currentTimeMillis() - baseSnapshotsKeepTime;
return min(latestFlinkCommitTime, mustOlderThan, olderThan);
protected long snapshotsKeepTime(TableRuntime tableRuntime) {
return tableRuntime.getTableConfiguration().getSnapshotTTLMinutes() * 60 * 1000;
}

protected Set<String> expireSnapshotNeedToExcludeFiles() {
@@ -278,31 +276,28 @@ private int clearInternalTableDanglingDeleteFiles() {
}

/**
* When committing a snapshot, Flink will write a checkpoint id into the snapshot summary. The
* latest snapshot with checkpoint id should not be expired or the flink job can't recover from
* state.
When committing a snapshot, Flink will write a checkpoint id into the snapshot summary, which
will be used when the Flink job recovers from that checkpoint.
*
* @param table -
* @return commit time of snapshot with the latest flink checkpointId in summary
* @param table table
@return commit time of the snapshot with the latest flink checkpointId in its summary, or
Long.MAX_VALUE if no such snapshot exists
*/
public static long fetchLatestFlinkCommittedSnapshotTime(Table table) {
long latestCommitTime = Long.MAX_VALUE;
for (Snapshot snapshot : table.snapshots()) {
if (snapshot.summary().containsKey(FLINK_MAX_COMMITTED_CHECKPOINT_ID)) {
latestCommitTime = snapshot.timestampMillis();
}
}
return latestCommitTime;
Snapshot snapshot = findLatestSnapshotContainsKey(table, FLINK_MAX_COMMITTED_CHECKPOINT_ID);
return snapshot == null ? Long.MAX_VALUE : snapshot.timestampMillis();
}

/**
* When optimizing tasks are not committed, the snapshot with which it planned should not be
* expired, since it will use the snapshot to check conflict when committing.
When the current optimizing process is not yet committed, get the time of the snapshot the
optimizing process planned from. That snapshot is needed when the optimizing process commits.
*
* @param table - table
* @return commit time of snapshot for optimizing
* @param table table
* @param tableRuntime table runtime
@return time of the snapshot the optimizing process planned from, or Long.MAX_VALUE if no
optimizing process exists
*/
public static long fetchOptimizingSnapshotTime(Table table, TableRuntime tableRuntime) {
public static long fetchOptimizingPlanSnapshotTime(Table table, TableRuntime tableRuntime) {
if (tableRuntime.getOptimizingStatus().isProcessing()) {
long fromSnapshotId = tableRuntime.getOptimizingProcess().getTargetSnapshotId();

@@ -315,6 +310,16 @@ public static long fetchOptimizingSnapshotTime(Table table, TableRuntime tableRuntime) {
return Long.MAX_VALUE;
}

public static Snapshot findLatestSnapshotContainsKey(Table table, String summaryKey) {
Snapshot latestSnapshot = null;
for (Snapshot snapshot : table.snapshots()) {
if (snapshot.summary().containsKey(summaryKey)) {
latestSnapshot = snapshot;
}
}
return latestSnapshot;
}

private static int deleteInvalidFilesInFs(
SupportsFileSystemOperations fio, String location, long lastTime, Set<String> excludes) {
if (!fio.exists(location)) {
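The scan in the first file now funnels through the new findLatestSnapshotContainsKey helper, which the keyed-table maintainer in the next file reuses for the optimized-sequence flag. A minimal usage sketch, assuming an Iceberg Table in scope and the constants imported as in the test file below:

// Equivalent to the new body of fetchLatestFlinkCommittedSnapshotTime.
Snapshot latest =
    IcebergTableMaintainer.findLatestSnapshotContainsKey(table, FLINK_MAX_COMMITTED_CHECKPOINT_ID);
long flinkBound = latest == null ? Long.MAX_VALUE : latest.timestampMillis();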
@@ -18,6 +18,9 @@

package com.netease.arctic.server.optimizing.maintainer;

import static com.netease.arctic.utils.ArcticTableUtil.BLOB_TYPE_OPTIMIZED_SEQUENCE_EXIST;
import static org.apache.iceberg.relocated.com.google.common.primitives.Longs.min;

import com.netease.arctic.IcebergFileEntry;
import com.netease.arctic.data.FileNameRules;
import com.netease.arctic.hive.utils.TableTypeUtil;
@@ -29,20 +32,19 @@
import com.netease.arctic.table.BaseTable;
import com.netease.arctic.table.ChangeTable;
import com.netease.arctic.table.KeyedTable;
import com.netease.arctic.table.TableProperties;
import com.netease.arctic.table.UnkeyedTable;
import com.netease.arctic.utils.ArcticTableUtil;
import com.netease.arctic.utils.CompatiblePropertyUtil;
import com.netease.arctic.utils.TablePropertyUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.primitives.Longs;
import org.apache.iceberg.util.StructLikeMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -120,6 +122,7 @@ public void expireSnapshots(TableRuntime tableRuntime) {
baseMaintainer.expireSnapshots(tableRuntime);
}

@VisibleForTesting
protected void expireSnapshots(long mustOlderThan) {
if (changeMaintainer != null) {
changeMaintainer.expireSnapshots(mustOlderThan);
@@ -173,43 +176,46 @@ public Set<String> orphanFileCleanNeedToExcludeFiles() {
}

@Override
public void expireSnapshots(TableRuntime tableRuntime) {
expireSnapshots(Long.MAX_VALUE);
@VisibleForTesting
void expireSnapshots(long mustOlderThan) {
expireFiles(mustOlderThan);
super.expireSnapshots(mustOlderThan);
}

@Override
public void expireSnapshots(long mustOlderThan) {
long changeTTLPoint = getChangeTTLPoint();
expireFiles(Longs.min(getChangeTTLPoint(), mustOlderThan));
super.expireSnapshots(Longs.min(changeTTLPoint, mustOlderThan));
public void expireSnapshots(TableRuntime tableRuntime) {
if (!expireSnapshotEnabled(tableRuntime)) {
return;
}
long now = System.currentTimeMillis();
expireFiles(now - snapshotsKeepTime(tableRuntime));
expireSnapshots(mustOlderThan(tableRuntime, now));
}

@Override
protected long olderThanSnapshotNeedToExpire(long mustOlderThan) {
long latestChangeFlinkCommitTime = fetchLatestFlinkCommittedSnapshotTime(unkeyedTable);
return Longs.min(latestChangeFlinkCommitTime, mustOlderThan);
protected long mustOlderThan(TableRuntime tableRuntime, long now) {
return min(
// The change data TTL
now - snapshotsKeepTime(tableRuntime),
// The latest Flink committed snapshot should not be expired, so the Flink job can recover
fetchLatestFlinkCommittedSnapshotTime(table));
}

@Override
protected Set<String> expireSnapshotNeedToExcludeFiles() {
return Sets.union(baseFiles, hiveFiles);
}

@Override
protected long snapshotsKeepTime(TableRuntime tableRuntime) {
return tableRuntime.getTableConfiguration().getChangeDataTTLMinutes() * 60 * 1000;
}

public void expireFiles(long ttlPoint) {
List<IcebergFileEntry> expiredDataFileEntries = getExpiredDataFileEntries(ttlPoint);
deleteChangeFile(expiredDataFileEntries);
}

private long getChangeTTLPoint() {
return System.currentTimeMillis()
- CompatiblePropertyUtil.propertyAsLong(
unkeyedTable.properties(),
TableProperties.CHANGE_DATA_TTL,
TableProperties.CHANGE_DATA_TTL_DEFAULT)
* 60
* 1000;
}

private List<IcebergFileEntry> getExpiredDataFileEntries(long ttlPoint) {
TableEntriesScan entriesScan =
TableEntriesScan.builder(unkeyedTable).includeFileContent(FileContent.DATA).build();
@@ -331,6 +337,38 @@ public Set<String> orphanFileCleanNeedToExcludeFiles() {
return Sets.union(changeFiles, Sets.union(baseFiles, hiveFiles));
}

@Override
protected long mustOlderThan(TableRuntime tableRuntime, long now) {
return min(
// The snapshots keep time for base store
now - snapshotsKeepTime(tableRuntime),
// The snapshot the optimizing plan is based on should not be expired before committing
fetchOptimizingPlanSnapshotTime(table, tableRuntime),
// The latest Flink committed snapshot should not be expired, so the Flink job can recover
fetchLatestFlinkCommittedSnapshotTime(table),
// The latest snapshot containing the optimized sequence should not be expired, for MOR correctness
fetchLatestOptimizedSequenceSnapshotTime(table));
}

/**
* When committing a snapshot to the base store of a mixed-format keyed table, the optimized
* sequence is stored with the snapshot, and a flag is set in the snapshot summary.
*
* <p>The optimized sequence affects the correctness of MOR.
*
* @param table table
* @return time of the latest snapshot with the optimized sequence flag in its summary, or
*     Long.MAX_VALUE if none exists
*/
private long fetchLatestOptimizedSequenceSnapshotTime(Table table) {
if (arcticTable.isKeyedTable()) {
Snapshot snapshot =
findLatestSnapshotContainsKey(table, BLOB_TYPE_OPTIMIZED_SEQUENCE_EXIST);
return snapshot == null ? Long.MAX_VALUE : snapshot.timestampMillis();
} else {
return Long.MAX_VALUE;
}
}

@Override
protected Set<String> expireSnapshotNeedToExcludeFiles() {
return Sets.union(changeFiles, hiveFiles);
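To make the keyed-table fix concrete, consider an idle table whose last optimized-sequence snapshot is older than the snapshot TTL. All numbers below are hypothetical, and the example class is illustrative only:

// Hypothetical timestamps, illustrative only (milliseconds since epoch).
public class ExpireBoundExample {
  public static void main(String[] args) {
    long now = 1_703_116_800_000L;                        // time of the expire run
    long keepMillis = 12L * 60 * 60 * 1000;               // base snapshot TTL: 12 hours
    long optimizedSeqTime = now - 24L * 60 * 60 * 1000;   // optimized-sequence snapshot: 24h old

    long oldBound = now - keepMillis;                     // pre-fix bound (12h ago)
    long newBound = Math.min(oldBound, optimizedSeqTime); // post-fix bound (24h ago)

    // Pre-fix: the optimized-sequence snapshot is strictly older than the bound,
    // so expireOlderThan(oldBound) could remove it and break MOR.
    System.out.println(optimizedSeqTime < oldBound);      // true

    // Post-fix: the bound is clamped to that snapshot's own timestamp, so
    // expireOlderThan(newBound) keeps it; only strictly older snapshots go.
    System.out.println(optimizedSeqTime < newBound);      // false
  }
}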
@@ -19,6 +19,7 @@
package com.netease.arctic.server.optimizing.maintainer;

import static com.netease.arctic.server.optimizing.maintainer.IcebergTableMaintainer.FLINK_MAX_COMMITTED_CHECKPOINT_ID;
import static com.netease.arctic.utils.ArcticTableUtil.BLOB_TYPE_OPTIMIZED_SEQUENCE_EXIST;

import com.netease.arctic.BasicTableTestHelper;
import com.netease.arctic.TableTestHelper;
@@ -132,7 +133,7 @@ public void testExpireChangeTableFiles() {
private void writeOptimizedSequence(
KeyedTable testKeyedTable, StructLikeMap<Long> optimizedSequence) {
BaseTable baseTable = testKeyedTable.baseTable();
baseTable.newAppend().set(ArcticTableUtil.BLOB_TYPE_OPTIMIZED_SEQUENCE_EXIST, "true").commit();
baseTable.newAppend().set(BLOB_TYPE_OPTIMIZED_SEQUENCE_EXIST, "true").commit();
Snapshot snapshot = baseTable.currentSnapshot();
StatisticsFile statisticsFile =
StatisticsFileUtil.writerBuilder(baseTable)
@@ -253,7 +254,47 @@ public void testNotExpireFlinkLatestCommit4All() {

Assert.assertEquals(4, Iterables.size(table.snapshots()));

MixedTableMaintainer tableMaintainer = new MixedTableMaintainer(table);
MixedTableMaintainer tableMaintainer = new MixedTableMaintainer(getArcticTable());
tableMaintainer.expireSnapshots(tableRuntime);

Assert.assertEquals(2, Iterables.size(table.snapshots()));
List<Snapshot> expectedSnapshots = new ArrayList<>();
expectedSnapshots.add(checkpointTime2Snapshot);
expectedSnapshots.add(lastSnapshot);
Assert.assertTrue(
Iterators.elementsEqual(expectedSnapshots.iterator(), table.snapshots().iterator()));
}

@Test
public void testNotExpireOptimizedSequenceCommit4All() {
Assume.assumeTrue(isKeyedTable());
BaseTable table = getArcticTable().asKeyedTable().baseTable();
writeAndCommitBaseStore(table);

AppendFiles appendFiles = table.newAppend();
appendFiles.set(BLOB_TYPE_OPTIMIZED_SEQUENCE_EXIST, "true");
appendFiles.commit();

AppendFiles appendFiles2 = table.newAppend();
appendFiles2.set(BLOB_TYPE_OPTIMIZED_SEQUENCE_EXIST, "true");
appendFiles2.commit();
Snapshot checkpointTime2Snapshot = table.currentSnapshot();

writeAndCommitBaseStore(table);
Snapshot lastSnapshot = table.currentSnapshot();

table.updateProperties().set(TableProperties.BASE_SNAPSHOT_KEEP_MINUTES, "0").commit();
TableRuntime tableRuntime = Mockito.mock(TableRuntime.class);
Mockito.when(tableRuntime.getTableIdentifier())
.thenReturn(
ServerTableIdentifier.of(AmsUtil.toTableIdentifier(table.id()), getTestFormat()));
Mockito.when(tableRuntime.getOptimizingStatus()).thenReturn(OptimizingStatus.IDLE);
Mockito.when(tableRuntime.getTableConfiguration())
.thenReturn(TableConfiguration.parseConfig(table.properties()));

Assert.assertEquals(4, Iterables.size(table.snapshots()));

MixedTableMaintainer tableMaintainer = new MixedTableMaintainer(getArcticTable());
tableMaintainer.expireSnapshots(tableRuntime);

Assert.assertEquals(2, Iterables.size(table.snapshots()));
