[ARCTIC-930][AMS] Snapshot expire service should not expire the snapshot with the latest checkpoint id (#1178)

* The latest snapshot with flink.max-committed-checkpoint-id in its summary should not be expired

* 1. refactor TableExpireService for unit testing
2. refactor the method fetchLatestFlinkCommittedSnapshotTime

* remove the use of traceId
rename hiveLocation to hiveLocations
remove the baseTable == null and changeTable == null checks
rename baseExclude to baseExcludePaths
add comments for fetchLatestFlinkCommittedSnapshotTime
introduce a static constant for flink.max-committed-checkpoint-id

* add comments to the optimizer Executor interface
wangtaohz authored Mar 6, 2023
1 parent 0d632f0 commit fdac57e
Showing 5 changed files with 142 additions and 84 deletions.
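
At its core, the change caps the snapshot-expiration cutoff at the commit time of the newest snapshot whose summary carries flink.max-committed-checkpoint-id, so that snapshot and everything after it always survive. A minimal sketch of the guard, assuming the helpers introduced in TableExpireService below (the keep time is illustrative, and the import for TableExpireService itself is omitted because its package is not shown in this diff):

    import com.netease.arctic.table.UnkeyedTable;
    import java.util.Set;

    // Sketch only: mirrors the cutoff logic added to TableExpireService in this commit.
    public class FlinkAwareExpireSketch {
      public static void expireWithFlinkGuard(UnkeyedTable table, long keepTimeMs, Set<String> excludePaths) {
        long startTime = System.currentTimeMillis();
        // Commit time of the newest snapshot carrying flink.max-committed-checkpoint-id,
        // or Long.MAX_VALUE when no such snapshot exists.
        long latestFlinkCommitTime = TableExpireService.fetchLatestFlinkCommittedSnapshotTime(table);
        // Taking the minimum means the cutoff can never move past the latest
        // Flink-committed snapshot, so a Flink job can still restore from its state.
        TableExpireService.expireSnapshots(table,
            Math.min(latestFlinkCommitTime, startTime - keepTimeMs),
            excludePaths);
      }
    }
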
@@ -54,7 +54,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

public class SupportHiveSyncService implements ISupportHiveSyncService {
@@ -89,33 +88,32 @@ public static class SupportHiveSyncTask implements ScheduledTasks.Task {
@Override
public void run() {
long startTime = System.currentTimeMillis();
final String traceId = UUID.randomUUID().toString();
try {
LOG.info("[{}] {} start hive sync", traceId, tableIdentifier);
LOG.info("{} start hive sync", tableIdentifier);
ArcticCatalog catalog =
CatalogLoader.load(ServiceContainer.getTableMetastoreHandler(), tableIdentifier.getCatalog());
ArcticTable arcticTable = catalog.loadTable(tableIdentifier);
if (!TableTypeUtil.isHive(arcticTable)) {
LOG.debug("[{}] {} is not a support hive table", traceId, tableIdentifier);
LOG.debug("{} is not a support hive table", tableIdentifier);
return;
}

syncIcebergToHive(arcticTable, traceId);
syncIcebergToHive(arcticTable);
} catch (Exception e) {
LOG.error("[{}] {} hive sync failed", traceId, tableIdentifier, e);
LOG.error("{} hive sync failed", tableIdentifier, e);
} finally {
LOG.info("[{}] {} hive sync finished, cost {}ms", traceId, tableIdentifier,
LOG.info("{} hive sync finished, cost {}ms", tableIdentifier,
System.currentTimeMillis() - startTime);
}
}

public static void syncIcebergToHive(ArcticTable arcticTable, String traceId) throws Exception {
public static void syncIcebergToHive(ArcticTable arcticTable) throws Exception {
UnkeyedTable baseTable = arcticTable.isKeyedTable() ?
arcticTable.asKeyedTable().baseTable() : arcticTable.asUnkeyedTable();
StructLikeMap<Map<String, String>> partitionProperty = baseTable.partitionProperty();

if (arcticTable.spec().isUnpartitioned()) {
syncNoPartitionTable(arcticTable, partitionProperty, traceId);
syncNoPartitionTable(arcticTable, partitionProperty);
} else {
syncPartitionTable(arcticTable, partitionProperty);
}
@@ -126,11 +124,10 @@ public static void syncIcebergToHive(ArcticTable arcticTable, String traceId) th
* because only Arctic updates the hive table location for unpartitioned tables.
*/
private static void syncNoPartitionTable(ArcticTable arcticTable,
StructLikeMap<Map<String, String>> partitionProperty,
String traceId) {
StructLikeMap<Map<String, String>> partitionProperty) {
Map<String, String> property = partitionProperty.get(TablePropertyUtil.EMPTY_STRUCT);
if (property == null || property.get(HiveTableProperties.PARTITION_PROPERTIES_KEY_HIVE_LOCATION) == null) {
LOG.debug("[{}] {} has no hive location in partition property", traceId, arcticTable.id());
LOG.debug("{} has no hive location in partition property", arcticTable.id());
return;
}

@@ -142,7 +139,7 @@ private static void syncNoPartitionTable(ArcticTable arcticTable,
return hiveTable.getSd().getLocation();
});
} catch (Exception e) {
LOG.error("[{}] {} get hive location failed", traceId, arcticTable.id(), e);
LOG.error("{} get hive location failed", arcticTable.id(), e);
return;
}

@@ -155,7 +152,7 @@ private static void syncNoPartitionTable(ArcticTable arcticTable,
return null;
});
} catch (Exception e) {
LOG.error("[{}] {} alter hive location failed", traceId, arcticTable.id(), e);
LOG.error("{} alter hive location failed", arcticTable.id(), e);
}
}
}
@@ -45,6 +45,7 @@
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.util.StructLikeMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,13 +57,16 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class TableExpireService implements ITableExpireService {
private static final Logger LOG = LoggerFactory.getLogger(TableExpireService.class);
private static final long EXPIRE_INTERVAL = 3600_000; // 1 hour
/**
* the same as org.apache.iceberg.flink.sink.IcebergFilesCommitter#MAX_COMMITTED_CHECKPOINT_ID
*/
public static final String FLINK_MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";

private ScheduledTasks<TableIdentifier, TableExpireTask> cleanTasks;

@@ -90,10 +94,7 @@ public static class TableExpireTask implements ScheduledTasks.Task {

@Override
public void run() {
long startTime = System.currentTimeMillis();
final String traceId = UUID.randomUUID().toString();
try {
LOG.info("[{}] {} start expire", traceId, tableIdentifier);
ArcticCatalog catalog =
CatalogLoader.load(ServiceContainer.getTableMetastoreHandler(), tableIdentifier.getCatalog());
ArcticTable arcticTable = catalog.loadTable(tableIdentifier);
@@ -103,69 +104,92 @@ public void run() {
if (!needClean) {
return;
}
long changeDataTTL = Long.parseLong(arcticTable.properties()
.getOrDefault(TableProperties.CHANGE_DATA_TTL,
TableProperties.CHANGE_DATA_TTL_DEFAULT)) * 60 * 1000;
long baseSnapshotsKeepTime = Long.parseLong(arcticTable.properties()
.getOrDefault(TableProperties.BASE_SNAPSHOT_KEEP_MINUTES,
TableProperties.BASE_SNAPSHOT_KEEP_MINUTES_DEFAULT)) * 60 * 1000;
long changeSnapshotsKeepTime = Long.parseLong(arcticTable.properties()
.getOrDefault(TableProperties.CHANGE_SNAPSHOT_KEEP_MINUTES,
TableProperties.CHANGE_SNAPSHOT_KEEP_MINUTES_DEFAULT)) * 60 * 1000;

Set<String> hiveLocation = new HashSet<>();
if (TableTypeUtil.isHive(arcticTable)) {
hiveLocation = HiveLocationUtils.getHiveLocation(arcticTable);
}
expireArcticTable(arcticTable);
} catch (Throwable t) {
LOG.error("unexpected expire error of table {} ", tableIdentifier, t);
}
}
}

if (arcticTable.isKeyedTable()) {
KeyedTable keyedArcticTable = arcticTable.asKeyedTable();
Set<String> finalHiveLocation = hiveLocation;
keyedArcticTable.io().doAs(() -> {
UnkeyedTable baseTable = keyedArcticTable.baseTable();
if (baseTable == null) {
LOG.warn("[{}] Base table is null: {} ", traceId, tableIdentifier);
return null;
}
UnkeyedTable changeTable = keyedArcticTable.changeTable();
if (changeTable == null) {
LOG.warn("[{}] Change table is null: {}", traceId, tableIdentifier);
return null;
}
public static void expireArcticTable(ArcticTable arcticTable) {
TableIdentifier tableIdentifier = arcticTable.id();
long startTime = System.currentTimeMillis();
LOG.info("{} start expire", tableIdentifier);

// get valid files in the change store which shouldn't physically delete when expire the snapshot
// in the base store
Set<String> baseExclude = UnKeyedTableUtil.getAllContentFilePath(changeTable);
baseExclude.addAll(finalHiveLocation);
expireSnapshots(baseTable, startTime - baseSnapshotsKeepTime, baseExclude);
long baseCleanedTime = System.currentTimeMillis();
LOG.info("[{}] {} base expire cost {} ms", traceId, arcticTable.id(), baseCleanedTime - startTime);

// delete ttl files
List<DataFileInfo> changeDataFiles = ServiceContainer.getFileInfoCacheService()
.getChangeTableTTLDataFiles(keyedArcticTable.id().buildTableIdentifier(),
System.currentTimeMillis() - changeDataTTL);
deleteChangeFile(keyedArcticTable, changeDataFiles);

// get valid files in the base store which shouldn't physically delete when expire the snapshot
// in the change store
Set<String> changeExclude = UnKeyedTableUtil.getAllContentFilePath(baseTable);
changeExclude.addAll(finalHiveLocation);
expireSnapshots(changeTable, startTime - changeSnapshotsKeepTime, changeExclude);
return null;
});
LOG.info("[{}] {} expire cost total {} ms", traceId, arcticTable.id(),
System.currentTimeMillis() - startTime);
} else {
UnkeyedTable unKeyedArcticTable = arcticTable.asUnkeyedTable();
expireSnapshots(unKeyedArcticTable, startTime - baseSnapshotsKeepTime, hiveLocation);
long baseCleanedTime = System.currentTimeMillis();
LOG.info("[{}] {} unKeyedTable expire cost {} ms", traceId, arcticTable.id(), baseCleanedTime - startTime);
}
} catch (Throwable t) {
LOG.error("[" + traceId + "] unexpected expire error of table " + tableIdentifier, t);
long changeDataTTL = Long.parseLong(arcticTable.properties()
.getOrDefault(TableProperties.CHANGE_DATA_TTL,
TableProperties.CHANGE_DATA_TTL_DEFAULT)) * 60 * 1000;
long baseSnapshotsKeepTime = Long.parseLong(arcticTable.properties()
.getOrDefault(TableProperties.BASE_SNAPSHOT_KEEP_MINUTES,
TableProperties.BASE_SNAPSHOT_KEEP_MINUTES_DEFAULT)) * 60 * 1000;
long changeSnapshotsKeepTime = Long.parseLong(arcticTable.properties()
.getOrDefault(TableProperties.CHANGE_SNAPSHOT_KEEP_MINUTES,
TableProperties.CHANGE_SNAPSHOT_KEEP_MINUTES_DEFAULT)) * 60 * 1000;

Set<String> hiveLocations = new HashSet<>();
if (TableTypeUtil.isHive(arcticTable)) {
hiveLocations = HiveLocationUtils.getHiveLocation(arcticTable);
}

if (arcticTable.isKeyedTable()) {
KeyedTable keyedArcticTable = arcticTable.asKeyedTable();
Set<String> finalHiveLocations = hiveLocations;
keyedArcticTable.io().doAs(() -> {
UnkeyedTable baseTable = keyedArcticTable.baseTable();
UnkeyedTable changeTable = keyedArcticTable.changeTable();

// get valid files in the change store which shouldn't physically delete when expire the snapshot
// in the base store
Set<String> baseExcludePaths = UnKeyedTableUtil.getAllContentFilePath(changeTable);
baseExcludePaths.addAll(finalHiveLocations);
long latestBaseFlinkCommitTime = fetchLatestFlinkCommittedSnapshotTime(baseTable);
expireSnapshots(baseTable, Math.min(latestBaseFlinkCommitTime, startTime - baseSnapshotsKeepTime),
baseExcludePaths);
long baseCleanedTime = System.currentTimeMillis();
LOG.info("{} base expire cost {} ms", arcticTable.id(), baseCleanedTime - startTime);

// delete ttl files
List<DataFileInfo> changeDataFiles = ServiceContainer.getFileInfoCacheService()
.getChangeTableTTLDataFiles(keyedArcticTable.id().buildTableIdentifier(),
System.currentTimeMillis() - changeDataTTL);
deleteChangeFile(keyedArcticTable, changeDataFiles);

// get valid files in the base store which shouldn't physically delete when expire the snapshot
// in the change store
Set<String> changeExclude = UnKeyedTableUtil.getAllContentFilePath(baseTable);
changeExclude.addAll(finalHiveLocations);
long latestChangeFlinkCommitTime = fetchLatestFlinkCommittedSnapshotTime(changeTable);
expireSnapshots(changeTable, Math.min(latestChangeFlinkCommitTime, startTime - changeSnapshotsKeepTime),
changeExclude);
return null;
});
LOG.info("{} expire cost total {} ms", arcticTable.id(),
System.currentTimeMillis() - startTime);
} else {
UnkeyedTable unKeyedArcticTable = arcticTable.asUnkeyedTable();
long latestFlinkCommitTime = fetchLatestFlinkCommittedSnapshotTime(unKeyedArcticTable);
expireSnapshots(unKeyedArcticTable, Math.min(latestFlinkCommitTime, startTime - baseSnapshotsKeepTime),
hiveLocations);
long baseCleanedTime = System.currentTimeMillis();
LOG.info("{} unKeyedTable expire cost {} ms", arcticTable.id(), baseCleanedTime - startTime);
}
}

/**
* When committing a snapshot, Flink writes its checkpoint id into the snapshot summary.
* The latest snapshot carrying a checkpoint id must not be expired, or the Flink job cannot recover from its state.
*
* @param table the table to check
* @return commit time of the snapshot with the latest Flink checkpoint id in its summary
*/
public static long fetchLatestFlinkCommittedSnapshotTime(UnkeyedTable table) {
long latestCommitTime = Long.MAX_VALUE;
for (Snapshot snapshot : table.snapshots()) {
if (snapshot.summary().containsKey(FLINK_MAX_COMMITTED_CHECKPOINT_ID)) {
latestCommitTime = snapshot.timestampMillis();
}
}
return latestCommitTime;
}

public static void deleteChangeFile(KeyedTable keyedTable, List<DataFileInfo> changeDataFiles) {
@@ -229,7 +253,7 @@ public static void deleteChangeFile(KeyedTable keyedTable, List<DataFileInfo> ch
public static void expireSnapshots(UnkeyedTable arcticInternalTable,
long olderThan,
Set<String> exclude) {
LOG.debug("start expire snapshots, the exclude is {}", exclude);
LOG.debug("start expire snapshots older than {}, the exclude is {}", olderThan, exclude);
final AtomicInteger toDeleteFiles = new AtomicInteger(0);
final AtomicInteger deleteFiles = new AtomicInteger(0);
Set<String> parentDirectory = new HashSet<>();
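
One point worth spelling out about the new bound: when a table has no Flink-committed snapshot at all, fetchLatestFlinkCommittedSnapshotTime falls back to Long.MAX_VALUE, so the Math.min above reduces to the plain keep-time cutoff and expiration behaves exactly as before. A small illustrative sketch of both cases (the keep time and checkpoint id are made up; the newAppend().set(...) call mirrors what the test below does):

    // Illustrative only: how the expiration bound is chosen.
    long keepTimeMs = 60 * 60 * 1000L;                     // e.g. keep 60 minutes of snapshots
    long cutoff = System.currentTimeMillis() - keepTimeMs;

    // Case 1: no snapshot carries flink.max-committed-checkpoint-id.
    // The helper returns Long.MAX_VALUE, so the effective bound is simply the regular cutoff.
    long bound = Math.min(Long.MAX_VALUE, cutoff);         // == cutoff

    // Case 2: a Flink-style commit marks a snapshot, as the test below does:
    //   changeTable.newAppend()
    //       .set(TableExpireService.FLINK_MAX_COMMITTED_CHECKPOINT_ID, "101")
    //       .commit();
    // That snapshot's timestampMillis() becomes the bound instead, so it and
    // every later snapshot survive even when the keep time is zero.
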
@@ -124,6 +124,37 @@ public void testExpiredChangeTableFilesInBase() throws Exception {
Assert.assertFalse(testKeyedTable.io().exists((String) s1Files.get(1).path()));
}

@Test
public void testNotExpireFlinkLatestCommit() throws IOException {
insertChangeDataFiles(1);
insertChangeDataFiles(2);
Assert.assertEquals(Long.MAX_VALUE,
TableExpireService.fetchLatestFlinkCommittedSnapshotTime(testKeyedTable.changeTable()));

AppendFiles appendFiles = testKeyedTable.changeTable().newAppend();
appendFiles.set(TableExpireService.FLINK_MAX_COMMITTED_CHECKPOINT_ID, "100");
appendFiles.commit();
long checkpointTime = testKeyedTable.changeTable().currentSnapshot().timestampMillis();
Assert.assertEquals(checkpointTime,
TableExpireService.fetchLatestFlinkCommittedSnapshotTime(testKeyedTable.changeTable()));

AppendFiles appendFiles2 = testKeyedTable.changeTable().newAppend();
appendFiles2.set(TableExpireService.FLINK_MAX_COMMITTED_CHECKPOINT_ID, "101");
appendFiles2.commit();
long checkpointTime2 = testKeyedTable.changeTable().currentSnapshot().timestampMillis();
Assert.assertEquals(checkpointTime2,
TableExpireService.fetchLatestFlinkCommittedSnapshotTime(testKeyedTable.changeTable()));

insertChangeDataFiles(2);
Assert.assertEquals(checkpointTime2,
TableExpireService.fetchLatestFlinkCommittedSnapshotTime(testKeyedTable.changeTable()));

testKeyedTable.updateProperties().set(TableProperties.CHANGE_SNAPSHOT_KEEP_MINUTES, "0").commit();
TableExpireService.expireArcticTable(testKeyedTable);

Assert.assertEquals(2, Iterables.size(testKeyedTable.changeTable().snapshots()));
}

private List<DataFile> insertChangeDataFiles(long transactionId) throws IOException {
GenericChangeTaskWriter writer = GenericTaskWriters.builderFor(testKeyedTable)
.withChangeAction(ChangeAction.INSERT)
@@ -65,7 +65,7 @@ public void testUnPartitionTableSyncInIceberg() throws Exception {
});
Assert.assertNotEquals(newLocation, hiveLocation);

SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testUnPartitionKeyedHiveTable, "UnitTest");
SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testUnPartitionKeyedHiveTable);
hiveLocation = ((SupportHive) testUnPartitionKeyedHiveTable).getHMSClient().run(client -> {
Table hiveTable = client.getTable(testUnPartitionKeyedHiveTable.id().getDatabase(),
testUnPartitionKeyedHiveTable.id().getTableName());
@@ -85,7 +85,7 @@ public void testUnPartitionTableSyncNotInIceberg() throws Exception {
return hiveTable.getSd().getLocation();
});

SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testUnPartitionKeyedHiveTable, "UnitTest");
SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testUnPartitionKeyedHiveTable);
String newHiveLocation = ((SupportHive) testUnPartitionKeyedHiveTable).getHMSClient().run(client -> {
Table hiveTable = client.getTable(testUnPartitionKeyedHiveTable.id().getDatabase(),
testUnPartitionKeyedHiveTable.id().getTableName());
@@ -110,7 +110,7 @@ public void testSyncOnlyInIceberg() throws Exception {
client.getPartition(testKeyedHiveTable.id().getDatabase(),
testKeyedHiveTable.id().getTableName(), partitionValues)));

SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testKeyedHiveTable, "UnitTest");
SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testKeyedHiveTable);
Partition hivePartition = ((SupportHive) testKeyedHiveTable).getHMSClient().run(client ->
client.getPartition(testKeyedHiveTable.id().getDatabase(),
testKeyedHiveTable.id().getTableName(), partitionValues));
@@ -160,7 +160,7 @@ public void testSyncOnlyInHiveCreateByArctic() throws Exception {
testKeyedHiveTable.id().getTableName(), partitionValues));
Assert.assertEquals(partitionLocation, hivePartition.getSd().getLocation());

SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testKeyedHiveTable, "UnitTest");
SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testKeyedHiveTable);

Assert.assertThrows(NoSuchObjectException.class, () -> ((SupportHive) testKeyedHiveTable).getHMSClient().run(client ->
client.getPartition(testKeyedHiveTable.id().getDatabase(),
@@ -209,7 +209,7 @@ public void testSyncOnlyInHiveCreateNotByArctic() throws Exception {
testKeyedHiveTable.id().getTableName(), partitionValues));
Assert.assertEquals(partitionLocation, hivePartition.getSd().getLocation());

SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testKeyedHiveTable, "UnitTest");
SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testKeyedHiveTable);

hivePartition = ((SupportHive) testKeyedHiveTable).getHMSClient().run(client ->
client.getPartition(testKeyedHiveTable.id().getDatabase(),
@@ -266,7 +266,7 @@ public void testSyncInBoth() throws Exception {
.commit();
Assert.assertNotEquals(newPartitionLocation, hivePartition.getSd().getLocation());

SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testKeyedHiveTable, "UnitTest");
SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testKeyedHiveTable);

hivePartition = ((SupportHive) testKeyedHiveTable).getHMSClient().run(client ->
client.getPartition(testKeyedHiveTable.id().getDatabase(),
@@ -19,6 +19,12 @@
package com.netease.arctic.optimizer.operator.executor;

public interface Executor {
/**
* Execute and return the execution result.
*
* @return OptimizeTaskResult - only the Prepared task.
* @throws Exception if the execution fails
*/
OptimizeTaskResult execute() throws Exception;

void close();