[ARCTIC-1190][AMS] Table expire service should not expire the snapshot that optimizing was planned with (#1193)

* The last snapshot with flink.max-committed-checkpoint-id should not be expired

* 1. Refactor TableExpireService for unit testing
  2. Refactor the fetchLatestFlinkCommittedSnapshotTime method

* Remove the use of traceId
  Change to hiveLocations
  Remove the baseTable == null and changeTable == null checks
  Change to baseExcludePaths
  Add comments for fetchLatestFlinkCommittedSnapshotTime
  Use a static variable for flink.max-committed-checkpoint-id

* Table expire should not expire the snapshot an optimizing plan is based on (sketched below)

* Add comments for the optimizer
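As a rough illustration of the idea behind this change (a minimal standalone sketch with hypothetical names, not code from this patch): a snapshot may now be expired only if it is older than every timestamp that still needs it, so the expire boundary is the minimum of the Flink-commit protection time, the optimize-plan protection time, and the configured keep-time cutoff.

public class ExpireBoundarySketch {

    // Long.MAX_VALUE stands for "no constraint", mirroring the patch's fallback
    // when there is no Flink commit and no pending optimize plan.
    static long expireBoundary(long latestFlinkCommitTime, long optimizingSnapshotTime, long keepTimeCutoff) {
        return Math.min(Math.min(latestFlinkCommitTime, optimizingSnapshotTime), keepTimeCutoff);
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long keepCutoff = now - 12 * 60 * 60 * 1000L;       // keep-time cutoff, e.g. 12 hours
        long optimizePlanned = now - 24 * 60 * 60 * 1000L;  // plan based on an older snapshot
        // The pending optimize plan pulls the boundary back, so its snapshot survives.
        System.out.println(expireBoundary(Long.MAX_VALUE, optimizePlanned, keepCutoff) == optimizePlanned); // true
    }
}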
wangtaohz authored Mar 6, 2023
1 parent 4c28850 commit 2f2d4c8
Showing 3 changed files with 93 additions and 7 deletions.
@@ -51,6 +51,7 @@
import com.netease.arctic.utils.CompatiblePropertyUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.ibatis.session.SqlSession;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.util.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -190,8 +191,9 @@ private void clearTableCache(TableIdentifier tableIdentifier) {
}
}

-private void addTableIntoCache(TableOptimizeItem arcticTableItem, Map<String, String> properties,
-                               boolean persistRuntime) {
+@VisibleForTesting
+void addTableIntoCache(TableOptimizeItem arcticTableItem, Map<String, String> properties,
+                       boolean persistRuntime) {
cachedTables.put(arcticTableItem.getTableIdentifier(), arcticTableItem);
try {
String groupName = CompatiblePropertyUtil.propertyAsString(properties,
@@ -19,6 +19,8 @@
package com.netease.arctic.ams.server.service.impl;

import com.netease.arctic.ams.api.DataFileInfo;
+import com.netease.arctic.ams.api.NoSuchObjectException;
+import com.netease.arctic.ams.server.optimize.TableOptimizeItem;
import com.netease.arctic.ams.server.service.ITableExpireService;
import com.netease.arctic.ams.server.service.ServiceContainer;
import com.netease.arctic.ams.server.utils.CatalogUtil;
@@ -143,7 +145,12 @@ public static void expireArcticTable(ArcticTable arcticTable) {
Set<String> baseExcludePaths = UnKeyedTableUtil.getAllContentFilePath(changeTable);
baseExcludePaths.addAll(finalHiveLocations);
long latestBaseFlinkCommitTime = fetchLatestFlinkCommittedSnapshotTime(baseTable);
-expireSnapshots(baseTable, Math.min(latestBaseFlinkCommitTime, startTime - baseSnapshotsKeepTime),
+long optimizingSnapshotTime = fetchOptimizingSnapshotTime(baseTable);
+long baseOlderThan = startTime - baseSnapshotsKeepTime;
+LOG.info("{} base table expire with latestFlinkCommitTime={}, optimizingSnapshotTime={}, olderThan={}",
+    arcticTable.id(), latestBaseFlinkCommitTime, optimizingSnapshotTime, baseOlderThan);
+expireSnapshots(baseTable,
+    min(latestBaseFlinkCommitTime, optimizingSnapshotTime, baseOlderThan),
baseExcludePaths);
long baseCleanedTime = System.currentTimeMillis();
LOG.info("{} base expire cost {} ms", arcticTable.id(), baseCleanedTime - startTime);
@@ -159,16 +166,24 @@ public static void expireArcticTable(ArcticTable arcticTable) {
Set<String> changeExclude = UnKeyedTableUtil.getAllContentFilePath(baseTable);
changeExclude.addAll(finalHiveLocations);
long latestChangeFlinkCommitTime = fetchLatestFlinkCommittedSnapshotTime(changeTable);
-expireSnapshots(changeTable, Math.min(latestChangeFlinkCommitTime, startTime - changeSnapshotsKeepTime),
+long changeOlderThan = startTime - changeSnapshotsKeepTime;
+LOG.info("{} change table expire with latestFlinkCommitTime={}, olderThan={}", arcticTable.id(),
+    latestChangeFlinkCommitTime, changeOlderThan);
+expireSnapshots(changeTable,
+    Math.min(latestChangeFlinkCommitTime, changeOlderThan),
changeExclude);
return null;
});
LOG.info("{} expire cost total {} ms", arcticTable.id(),
System.currentTimeMillis() - startTime);
LOG.info("{} expire cost total {} ms", arcticTable.id(), System.currentTimeMillis() - startTime);
} else {
UnkeyedTable unKeyedArcticTable = arcticTable.asUnkeyedTable();
long latestFlinkCommitTime = fetchLatestFlinkCommittedSnapshotTime(unKeyedArcticTable);
-expireSnapshots(unKeyedArcticTable, Math.min(latestFlinkCommitTime, startTime - baseSnapshotsKeepTime),
+long optimizingSnapshotTime = fetchOptimizingSnapshotTime(unKeyedArcticTable);
+long olderThan = startTime - baseSnapshotsKeepTime;
+LOG.info("{} unKeyedTable expire with latestFlinkCommitTime={}, optimizingSnapshotTime={}, olderThan={}",
+    arcticTable.id(), latestFlinkCommitTime, optimizingSnapshotTime, olderThan);
+expireSnapshots(unKeyedArcticTable,
+    min(latestFlinkCommitTime, optimizingSnapshotTime, olderThan),
hiveLocations);
long baseCleanedTime = System.currentTimeMillis();
LOG.info("{} unKeyedTable expire cost {} ms", arcticTable.id(), baseCleanedTime - startTime);
@@ -192,6 +207,34 @@ public static long fetchLatestFlinkCommittedSnapshotTime(UnkeyedTable table) {
return latestCommitTime;
}

+  /**
+   * When optimize tasks are planned but not yet committed, the snapshot they were planned with should
+   * not be expired, since it is needed for conflict checks when the tasks commit.
+   *
+   * @param table - table
+   * @return commit time of the snapshot the current optimize plan is based on, or Long.MAX_VALUE if none
+   */
+  public static long fetchOptimizingSnapshotTime(UnkeyedTable table) {
+    try {
+      TableOptimizeItem tableOptimizeItem = ServiceContainer.getOptimizeService().getTableOptimizeItem(table.id());
+      if (!tableOptimizeItem.getOptimizeTasks().isEmpty()) {
+        long currentSnapshotId = tableOptimizeItem.getTableOptimizeRuntime().getCurrentSnapshotId();
+        for (Snapshot snapshot : table.snapshots()) {
+          if (snapshot.snapshotId() == currentSnapshotId) {
+            return snapshot.timestampMillis();
+          }
+        }
+      }
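+      // no pending optimize tasks, or the planned snapshot no longer exists: nothing to protect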
+      return Long.MAX_VALUE;
+    } catch (NoSuchObjectException e) {
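+      // the table is not registered with the optimize service, so there is no plan to protect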
+      return Long.MAX_VALUE;
+    }
+  }
+
+  public static long min(long a, long b, long c) {
+    return Math.min(Math.min(a, b), c);
+  }

public static void deleteChangeFile(KeyedTable keyedTable, List<DataFileInfo> changeDataFiles) {
if (CollectionUtils.isEmpty(changeDataFiles)) {
return;
@@ -20,6 +20,10 @@

import com.netease.arctic.TableTestBase;
import com.netease.arctic.ams.api.DataFileInfo;
+import com.netease.arctic.ams.api.OptimizeTaskId;
+import com.netease.arctic.ams.server.model.BasicOptimizeTask;
+import com.netease.arctic.ams.server.model.TableMetadata;
+import com.netease.arctic.ams.server.service.ServiceContainer;
import com.netease.arctic.ams.server.service.impl.TableExpireService;
import com.netease.arctic.ams.server.util.DataFileInfoUtils;
import com.netease.arctic.ams.server.utils.UnKeyedTableUtil;
@@ -48,6 +52,7 @@
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -155,6 +160,42 @@ public void testNotExpireFlinkLatestCommit() throws IOException {
Assert.assertEquals(2, Iterables.size(testKeyedTable.changeTable().snapshots()));
}

+  @Test
+  public void testNotExpireOptimizeCommit() {
+    // commit two snapshots and expire with keep time 0: only the current snapshot survives
+    testTable.newAppend().commit();
+    testTable.newAppend().commit();
+    testTable.updateProperties().set(TableProperties.BASE_SNAPSHOT_KEEP_MINUTES, "0").commit();
+    TableExpireService.expireArcticTable(testTable);
+    Assert.assertEquals(1, Iterables.size(testTable.snapshots()));
+
+    testTable.newAppend().commit();
+
+    // init optimize tasks planned with the current snapshot
+    long optimizeSnapshotId = testTable.currentSnapshot().snapshotId();
+    OptimizeService optimizeService = (OptimizeService) ServiceContainer.getOptimizeService();
+    TableMetadata metadata = new TableMetadata();
+    metadata.setTableIdentifier(testTable.id());
+    metadata.setProperties(testTable.properties());
+    TableOptimizeItem tableOptimizeItem =
+        new TableOptimizeItem(testTable, metadata, System.currentTimeMillis() + 6 * 60 * 60 * 1000);
+    optimizeService.addTableIntoCache(tableOptimizeItem, Collections.emptyMap(), false);
+    BasicOptimizeTask basicOptimizeTask = new BasicOptimizeTask();
+    basicOptimizeTask.setTaskId(new OptimizeTaskId());
+    tableOptimizeItem.initOptimizeTasks(Collections.singletonList(new OptimizeTaskItem(basicOptimizeTask, null)));
+    tableOptimizeItem.getTableOptimizeRuntime().setCurrentSnapshotId(optimizeSnapshotId);
+
+    testTable.newAppend().commit();
+    testTable.newAppend().commit();
+
+    TableExpireService.expireArcticTable(testTable);
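+    // the snapshot the optimize plan was based on and the two newer snapshots must survive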
+    Assert.assertEquals(3, Iterables.size(testTable.snapshots()));
+  }

private List<DataFile> insertChangeDataFiles(long transactionId) throws IOException {
GenericChangeTaskWriter writer = GenericTaskWriters.builderFor(testKeyedTable)
.withChangeAction(ChangeAction.INSERT)
