[ARCTIC-1190][AMS] Table expire service should not expire the snapshot that optimizing was planned with (#1193)

Merged Mar 6, 2023 (9 commits)
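In short: when a table still has uncommitted optimizing tasks, the expire watermark is clamped so that the snapshot the plan was based on survives, because the optimize commit uses that snapshot for conflict detection. A minimal sketch of the idea against Iceberg's ExpireSnapshots API (the helper name expireSafely and its parameters are hypothetical, not code from this PR):

```java
import org.apache.iceberg.Table;

class ExpireSketch {
  // Hypothetical helper: clamp the expire watermark so the snapshot a pending
  // optimize plan was based on survives until the optimize commit lands.
  static void expireSafely(Table table, long keepMillis, long optimizingSnapshotTime) {
    long retentionWatermark = System.currentTimeMillis() - keepMillis;
    // optimizingSnapshotTime is Long.MAX_VALUE when nothing is pending,
    // so min() falls back to the plain retention watermark.
    long watermark = Math.min(retentionWatermark, optimizingSnapshotTime);
    table.expireSnapshots()
        .expireOlderThan(watermark) // removes snapshots strictly older than the watermark
        .commit();
  }
}
```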
OptimizeService.java:

@@ -51,6 +51,7 @@
 import com.netease.arctic.utils.CompatiblePropertyUtil;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.ibatis.session.SqlSession;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.util.PropertyUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -190,8 +191,9 @@ private void clearTableCache(TableIdentifier tableIdentifier) {
     }
   }
 
-  private void addTableIntoCache(TableOptimizeItem arcticTableItem, Map<String, String> properties,
-                                 boolean persistRuntime) {
+  @VisibleForTesting
+  void addTableIntoCache(TableOptimizeItem arcticTableItem, Map<String, String> properties,
+                         boolean persistRuntime) {
     cachedTables.put(arcticTableItem.getTableIdentifier(), arcticTableItem);
     try {
       String groupName = CompatiblePropertyUtil.propertyAsString(properties,
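The only change above is visibility: addTableIntoCache goes from private to package-private so the new test can seed the optimize service's cache directly, and @VisibleForTesting (Guava's annotation, relocated inside Iceberg) records that the looser visibility exists only for tests. The general pattern, as a sketch (class and method names are illustrative):

```java
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;

class TableCache {
  // Package-private rather than private so same-package tests can call it;
  // production code should keep treating it as private.
  @VisibleForTesting
  void put(String key, Object value) {
    // ... store the entry ...
  }
}
```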
TableExpireService.java:

@@ -19,6 +19,8 @@
 package com.netease.arctic.ams.server.service.impl;
 
 import com.netease.arctic.ams.api.DataFileInfo;
+import com.netease.arctic.ams.api.NoSuchObjectException;
+import com.netease.arctic.ams.server.optimize.TableOptimizeItem;
 import com.netease.arctic.ams.server.service.ITableExpireService;
 import com.netease.arctic.ams.server.service.ServiceContainer;
 import com.netease.arctic.ams.server.utils.CatalogUtil;
@@ -143,7 +145,12 @@ public static void expireArcticTable(ArcticTable arcticTable) {
       Set<String> baseExcludePaths = UnKeyedTableUtil.getAllContentFilePath(changeTable);
       baseExcludePaths.addAll(finalHiveLocations);
       long latestBaseFlinkCommitTime = fetchLatestFlinkCommittedSnapshotTime(baseTable);
-      expireSnapshots(baseTable, Math.min(latestBaseFlinkCommitTime, startTime - baseSnapshotsKeepTime),
+      long optimizingSnapshotTime = fetchOptimizingSnapshotTime(baseTable);
+      long baseOlderThan = startTime - baseSnapshotsKeepTime;
+      LOG.info("{} base table expire with latestFlinkCommitTime={}, optimizingSnapshotTime={}, olderThan={}",
+          arcticTable.id(), latestBaseFlinkCommitTime, optimizingSnapshotTime, baseOlderThan);
+      expireSnapshots(baseTable,
+          min(latestBaseFlinkCommitTime, optimizingSnapshotTime, baseOlderThan),
           baseExcludePaths);
       long baseCleanedTime = System.currentTimeMillis();
       LOG.info("{} base expire cost {} ms", arcticTable.id(), baseCleanedTime - startTime);
@@ -159,16 +166,24 @@ public static void expireArcticTable(ArcticTable arcticTable) {
         Set<String> changeExclude = UnKeyedTableUtil.getAllContentFilePath(baseTable);
         changeExclude.addAll(finalHiveLocations);
         long latestChangeFlinkCommitTime = fetchLatestFlinkCommittedSnapshotTime(changeTable);
-        expireSnapshots(changeTable, Math.min(latestChangeFlinkCommitTime, startTime - changeSnapshotsKeepTime),
+        long changeOlderThan = startTime - changeSnapshotsKeepTime;
+        LOG.info("{} change table expire with latestFlinkCommitTime={}, olderThan={}", arcticTable.id(),
+            latestChangeFlinkCommitTime, changeOlderThan);
+        expireSnapshots(changeTable,
+            Math.min(latestChangeFlinkCommitTime, changeOlderThan),
             changeExclude);
         return null;
       });
-      LOG.info("{} expire cost total {} ms", arcticTable.id(),
-          System.currentTimeMillis() - startTime);
+      LOG.info("{} expire cost total {} ms", arcticTable.id(), System.currentTimeMillis() - startTime);
     } else {
       UnkeyedTable unKeyedArcticTable = arcticTable.asUnkeyedTable();
       long latestFlinkCommitTime = fetchLatestFlinkCommittedSnapshotTime(unKeyedArcticTable);
-      expireSnapshots(unKeyedArcticTable, Math.min(latestFlinkCommitTime, startTime - baseSnapshotsKeepTime),
+      long optimizingSnapshotTime = fetchOptimizingSnapshotTime(unKeyedArcticTable);
+      long olderThan = startTime - baseSnapshotsKeepTime;
+      LOG.info("{} unKeyedTable expire with latestFlinkCommitTime={}, optimizingSnapshotTime={}, olderThan={}",
+          arcticTable.id(), latestFlinkCommitTime, optimizingSnapshotTime, olderThan);
+      expireSnapshots(unKeyedArcticTable,
+          min(latestFlinkCommitTime, optimizingSnapshotTime, olderThan),
           hiveLocations);
       long baseCleanedTime = System.currentTimeMillis();
       LOG.info("{} unKeyedTable expire cost {} ms", arcticTable.id(), baseCleanedTime - startTime);
@@ -192,6 +207,34 @@ public static long fetchLatestFlinkCommittedSnapshotTime(UnkeyedTable table) {
     return latestCommitTime;
   }
 
+  /**
+   * When optimizing tasks have not been committed yet, the snapshot they were planned from must not
+   * be expired, because the optimize commit uses that snapshot for conflict checking.
+   *
+   * @param table table to check for pending optimize tasks
+   * @return commit time of the snapshot the pending plan is based on, or Long.MAX_VALUE if there is none
+   */
+  public static long fetchOptimizingSnapshotTime(UnkeyedTable table) {
+    try {
+      TableOptimizeItem tableOptimizeItem = ServiceContainer.getOptimizeService().getTableOptimizeItem(table.id());
+      if (!tableOptimizeItem.getOptimizeTasks().isEmpty()) {
+        long currentSnapshotId = tableOptimizeItem.getTableOptimizeRuntime().getCurrentSnapshotId();
+        for (Snapshot snapshot : table.snapshots()) {
+          if (snapshot.snapshotId() == currentSnapshotId) {
+            return snapshot.timestampMillis();
+          }
+        }
+      }
+      return Long.MAX_VALUE;
+    } catch (NoSuchObjectException e) {
+      // table is not managed by the optimize service, nothing to protect
+      return Long.MAX_VALUE;
+    }
+  }
+
+  public static long min(long a, long b, long c) {
+    return Math.min(Math.min(a, b), c);
+  }
+
   public static void deleteChangeFile(KeyedTable keyedTable, List<DataFileInfo> changeDataFiles) {
     if (CollectionUtils.isEmpty(changeDataFiles)) {
       return;
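For intuition on the sentinel: fetchOptimizingSnapshotTime returns Long.MAX_VALUE when the table has no pending optimize tasks (or is unknown to the optimize service), so the new three-way min collapses back to the previous two-way behavior. A self-contained illustration of the arithmetic (all values made up):

```java
public class WatermarkDemo {
  public static void main(String[] args) {
    long startTime = System.currentTimeMillis();
    long olderThan = startTime - 12 * 60 * 60 * 1000L; // startTime - baseSnapshotsKeepTime
    long flinkCommitTime = Long.MAX_VALUE;  // sentinel: no uncommitted Flink snapshot
    long optimizingTime = Long.MAX_VALUE;   // sentinel: no pending optimize tasks

    long watermark = Math.min(Math.min(flinkCommitTime, optimizingTime), olderThan);
    System.out.println(watermark == olderThan); // true: plain retention applies

    // A pending plan based on an older snapshot pulls the watermark back:
    optimizingTime = olderThan - 1;
    watermark = Math.min(Math.min(flinkCommitTime, optimizingTime), olderThan);
    System.out.println(watermark == optimizingTime); // true: planned snapshot is protected
  }
}
```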
TableExpireService test:

@@ -20,6 +20,10 @@
 
 import com.netease.arctic.TableTestBase;
 import com.netease.arctic.ams.api.DataFileInfo;
+import com.netease.arctic.ams.api.OptimizeTaskId;
+import com.netease.arctic.ams.server.model.BasicOptimizeTask;
+import com.netease.arctic.ams.server.model.TableMetadata;
+import com.netease.arctic.ams.server.service.ServiceContainer;
 import com.netease.arctic.ams.server.service.impl.TableExpireService;
 import com.netease.arctic.ams.server.util.DataFileInfoUtils;
 import com.netease.arctic.ams.server.utils.UnKeyedTableUtil;
@@ -48,6 +52,7 @@
 import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -155,6 +160,42 @@ public void testNotExpireFlinkLatestCommit() throws IOException {
     Assert.assertEquals(2, Iterables.size(testKeyedTable.changeTable().snapshots()));
   }
 
+  @Test
+  public void testNotExpireOptimizeCommit() {
+    // commit two snapshots and expire with zero retention: only the latest survives
+    testTable.newAppend().commit();
+    testTable.newAppend().commit();
+    testTable.updateProperties().set(TableProperties.BASE_SNAPSHOT_KEEP_MINUTES, "0").commit();
+    TableExpireService.expireArcticTable(testTable);
+    Assert.assertEquals(1, Iterables.size(testTable.snapshots()));
+
+    testTable.newAppend().commit();
+
+    // init optimize tasks planned from the current snapshot
+    long optimizeSnapshotId = testTable.currentSnapshot().snapshotId();
+    OptimizeService optimizeService = (OptimizeService) ServiceContainer.getOptimizeService();
+    TableMetadata metadata = new TableMetadata();
+    metadata.setTableIdentifier(testTable.id());
+    metadata.setProperties(testTable.properties());
+    TableOptimizeItem tableOptimizeItem =
+        new TableOptimizeItem(testTable, metadata, System.currentTimeMillis() + 6 * 60 * 60 * 1000);
+    optimizeService.addTableIntoCache(tableOptimizeItem, Collections.emptyMap(), false);
+    BasicOptimizeTask basicOptimizeTask = new BasicOptimizeTask();
+    basicOptimizeTask.setTaskId(new OptimizeTaskId());
+    tableOptimizeItem.initOptimizeTasks(Collections.singletonList(new OptimizeTaskItem(basicOptimizeTask, null)));
+    tableOptimizeItem.getTableOptimizeRuntime().setCurrentSnapshotId(optimizeSnapshotId);
+
+    testTable.newAppend().commit();
+    testTable.newAppend().commit();
+
+    // the optimize-planned snapshot and everything after it must survive the expire
+    TableExpireService.expireArcticTable(testTable);
+    Assert.assertEquals(3, Iterables.size(testTable.snapshots()));
+  }
+
   private List<DataFile> insertChangeDataFiles(long transactionId) throws IOException {
     GenericChangeTaskWriter writer = GenericTaskWriters.builderFor(testKeyedTable)
         .withChangeAction(ChangeAction.INSERT)
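Reading the final assertion: at the second expire the table holds four snapshots, and the watermark is clamped to the optimize-planned snapshot's timestamp, so that snapshot and the two newer ones survive while the single older one expires. A self-contained walkthrough of that arithmetic (timestamps are illustrative, not taken from the test):

```java
import java.util.stream.LongStream;

public class TimelineDemo {
  public static void main(String[] args) {
    // Four snapshots alive at the second expire; s2 is the optimize-planned one.
    long s1 = 1_000, s2 = 2_000, s3 = 3_000, s4 = 4_000;
    long now = 5_000;
    long keepMillis = 0; // BASE_SNAPSHOT_KEEP_MINUTES = "0"

    // min(no Flink commit, planned snapshot time, retention watermark)
    long watermark = Math.min(Math.min(Long.MAX_VALUE, s2), now - keepMillis);

    // Only s1 is strictly older than the watermark, so three snapshots survive.
    long survivors = LongStream.of(s1, s2, s3, s4).filter(t -> t >= watermark).count();
    System.out.println(watermark == s2 && survivors == 3); // true
  }
}
```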