[ARCTIC-930][AMS] Snapshot expire service should not expire snapshot with the latest checkpoint id #1178

Merged: 7 commits, Mar 6, 2023
Changes from all commits
@@ -54,7 +54,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

public class SupportHiveSyncService implements ISupportHiveSyncService {
@@ -89,33 +88,32 @@ public static class SupportHiveSyncTask implements ScheduledTasks.Task {
@Override
public void run() {
long startTime = System.currentTimeMillis();
final String traceId = UUID.randomUUID().toString();
try {
LOG.info("[{}] {} start hive sync", traceId, tableIdentifier);
LOG.info("{} start hive sync", tableIdentifier);
ArcticCatalog catalog =
CatalogLoader.load(ServiceContainer.getTableMetastoreHandler(), tableIdentifier.getCatalog());
ArcticTable arcticTable = catalog.loadTable(tableIdentifier);
if (!TableTypeUtil.isHive(arcticTable)) {
LOG.debug("[{}] {} is not a support hive table", traceId, tableIdentifier);
LOG.debug("{} is not a support hive table", tableIdentifier);
return;
}

syncIcebergToHive(arcticTable, traceId);
syncIcebergToHive(arcticTable);
} catch (Exception e) {
LOG.error("[{}] {} hive sync failed", traceId, tableIdentifier, e);
LOG.error("{} hive sync failed", tableIdentifier, e);
} finally {
LOG.info("[{}] {} hive sync finished, cost {}ms", traceId, tableIdentifier,
LOG.info("{} hive sync finished, cost {}ms", tableIdentifier,
System.currentTimeMillis() - startTime);
}
}

public static void syncIcebergToHive(ArcticTable arcticTable, String traceId) throws Exception {
public static void syncIcebergToHive(ArcticTable arcticTable) throws Exception {
UnkeyedTable baseTable = arcticTable.isKeyedTable() ?
arcticTable.asKeyedTable().baseTable() : arcticTable.asUnkeyedTable();
StructLikeMap<Map<String, String>> partitionProperty = baseTable.partitionProperty();

if (arcticTable.spec().isUnpartitioned()) {
syncNoPartitionTable(arcticTable, partitionProperty, traceId);
syncNoPartitionTable(arcticTable, partitionProperty);
} else {
syncPartitionTable(arcticTable, partitionProperty);
}
@@ -126,11 +124,10 @@ public static void syncIcebergToHive(ArcticTable arcticTable, String traceId) th
* because only Arctic updates the hive table location for unpartitioned tables.
*/
private static void syncNoPartitionTable(ArcticTable arcticTable,
StructLikeMap<Map<String, String>> partitionProperty,
String traceId) {
StructLikeMap<Map<String, String>> partitionProperty) {
Map<String, String> property = partitionProperty.get(TablePropertyUtil.EMPTY_STRUCT);
if (property == null || property.get(HiveTableProperties.PARTITION_PROPERTIES_KEY_HIVE_LOCATION) == null) {
LOG.debug("[{}] {} has no hive location in partition property", traceId, arcticTable.id());
LOG.debug("{} has no hive location in partition property", arcticTable.id());
return;
}

@@ -142,7 +139,7 @@ private static void syncNoPartitionTable(ArcticTable arcticTable,
return hiveTable.getSd().getLocation();
});
} catch (Exception e) {
LOG.error("[{}] {} get hive location failed", traceId, arcticTable.id(), e);
LOG.error("{} get hive location failed", arcticTable.id(), e);
return;
}

@@ -155,7 +152,7 @@ private static void syncNoPartitionTable(ArcticTable arcticTable,
return null;
});
} catch (Exception e) {
LOG.error("[{}] {} alter hive location failed", traceId, arcticTable.id(), e);
LOG.error("{} alter hive location failed", arcticTable.id(), e);
}
}
}
@@ -45,6 +45,7 @@
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.util.StructLikeMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,13 +57,16 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class TableExpireService implements ITableExpireService {
private static final Logger LOG = LoggerFactory.getLogger(TableExpireService.class);
private static final long EXPIRE_INTERVAL = 3600_000; // 1 hour
/**
* the same as org.apache.iceberg.flink.sink.IcebergFilesCommitter#MAX_COMMITTED_CHECKPOINT_ID
*/
public static final String FLINK_MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";

private ScheduledTasks<TableIdentifier, TableExpireTask> cleanTasks;

@@ -90,10 +94,7 @@ public static class TableExpireTask implements ScheduledTasks.Task {

@Override
public void run() {
long startTime = System.currentTimeMillis();
final String traceId = UUID.randomUUID().toString();
try {
LOG.info("[{}] {} start expire", traceId, tableIdentifier);
ArcticCatalog catalog =
CatalogLoader.load(ServiceContainer.getTableMetastoreHandler(), tableIdentifier.getCatalog());
ArcticTable arcticTable = catalog.loadTable(tableIdentifier);
@@ -103,69 +104,92 @@ public void run() {
if (!needClean) {
return;
}
long changeDataTTL = Long.parseLong(arcticTable.properties()
.getOrDefault(TableProperties.CHANGE_DATA_TTL,
TableProperties.CHANGE_DATA_TTL_DEFAULT)) * 60 * 1000;
long baseSnapshotsKeepTime = Long.parseLong(arcticTable.properties()
.getOrDefault(TableProperties.BASE_SNAPSHOT_KEEP_MINUTES,
TableProperties.BASE_SNAPSHOT_KEEP_MINUTES_DEFAULT)) * 60 * 1000;
long changeSnapshotsKeepTime = Long.parseLong(arcticTable.properties()
.getOrDefault(TableProperties.CHANGE_SNAPSHOT_KEEP_MINUTES,
TableProperties.CHANGE_SNAPSHOT_KEEP_MINUTES_DEFAULT)) * 60 * 1000;

Set<String> hiveLocation = new HashSet<>();
if (TableTypeUtil.isHive(arcticTable)) {
hiveLocation = HiveLocationUtils.getHiveLocation(arcticTable);
}
expireArcticTable(arcticTable);
} catch (Throwable t) {
LOG.error("unexpected expire error of table {} ", tableIdentifier, t);
}
}
}

if (arcticTable.isKeyedTable()) {
KeyedTable keyedArcticTable = arcticTable.asKeyedTable();
Set<String> finalHiveLocation = hiveLocation;
keyedArcticTable.io().doAs(() -> {
UnkeyedTable baseTable = keyedArcticTable.baseTable();
if (baseTable == null) {
LOG.warn("[{}] Base table is null: {} ", traceId, tableIdentifier);
return null;
}
UnkeyedTable changeTable = keyedArcticTable.changeTable();
if (changeTable == null) {
LOG.warn("[{}] Change table is null: {}", traceId, tableIdentifier);
return null;
}
public static void expireArcticTable(ArcticTable arcticTable) {
TableIdentifier tableIdentifier = arcticTable.id();
long startTime = System.currentTimeMillis();
LOG.info("{} start expire", tableIdentifier);

// get valid files in the change store which shouldn't be physically deleted when expiring snapshots
// in the base store
Set<String> baseExclude = UnKeyedTableUtil.getAllContentFilePath(changeTable);
baseExclude.addAll(finalHiveLocation);
expireSnapshots(baseTable, startTime - baseSnapshotsKeepTime, baseExclude);
long baseCleanedTime = System.currentTimeMillis();
LOG.info("[{}] {} base expire cost {} ms", traceId, arcticTable.id(), baseCleanedTime - startTime);

// delete ttl files
List<DataFileInfo> changeDataFiles = ServiceContainer.getFileInfoCacheService()
.getChangeTableTTLDataFiles(keyedArcticTable.id().buildTableIdentifier(),
System.currentTimeMillis() - changeDataTTL);
deleteChangeFile(keyedArcticTable, changeDataFiles);

// get valid files in the base store which shouldn't be physically deleted when expiring snapshots
// in the change store
Set<String> changeExclude = UnKeyedTableUtil.getAllContentFilePath(baseTable);
changeExclude.addAll(finalHiveLocation);
expireSnapshots(changeTable, startTime - changeSnapshotsKeepTime, changeExclude);
return null;
});
LOG.info("[{}] {} expire cost total {} ms", traceId, arcticTable.id(),
System.currentTimeMillis() - startTime);
} else {
UnkeyedTable unKeyedArcticTable = arcticTable.asUnkeyedTable();
expireSnapshots(unKeyedArcticTable, startTime - baseSnapshotsKeepTime, hiveLocation);
long baseCleanedTime = System.currentTimeMillis();
LOG.info("[{}] {} unKeyedTable expire cost {} ms", traceId, arcticTable.id(), baseCleanedTime - startTime);
}
} catch (Throwable t) {
LOG.error("[" + traceId + "] unexpected expire error of table " + tableIdentifier, t);
long changeDataTTL = Long.parseLong(arcticTable.properties()
.getOrDefault(TableProperties.CHANGE_DATA_TTL,
TableProperties.CHANGE_DATA_TTL_DEFAULT)) * 60 * 1000;
long baseSnapshotsKeepTime = Long.parseLong(arcticTable.properties()
.getOrDefault(TableProperties.BASE_SNAPSHOT_KEEP_MINUTES,
TableProperties.BASE_SNAPSHOT_KEEP_MINUTES_DEFAULT)) * 60 * 1000;
long changeSnapshotsKeepTime = Long.parseLong(arcticTable.properties()
.getOrDefault(TableProperties.CHANGE_SNAPSHOT_KEEP_MINUTES,
TableProperties.CHANGE_SNAPSHOT_KEEP_MINUTES_DEFAULT)) * 60 * 1000;

Set<String> hiveLocations = new HashSet<>();
if (TableTypeUtil.isHive(arcticTable)) {
hiveLocations = HiveLocationUtils.getHiveLocation(arcticTable);
}

if (arcticTable.isKeyedTable()) {
KeyedTable keyedArcticTable = arcticTable.asKeyedTable();
Set<String> finalHiveLocations = hiveLocations;
keyedArcticTable.io().doAs(() -> {
UnkeyedTable baseTable = keyedArcticTable.baseTable();
UnkeyedTable changeTable = keyedArcticTable.changeTable();

// get valid files in the change store which shouldn't be physically deleted when expiring snapshots
// in the base store
Set<String> baseExcludePaths = UnKeyedTableUtil.getAllContentFilePath(changeTable);
baseExcludePaths.addAll(finalHiveLocations);
long latestBaseFlinkCommitTime = fetchLatestFlinkCommittedSnapshotTime(baseTable);
expireSnapshots(baseTable, Math.min(latestBaseFlinkCommitTime, startTime - baseSnapshotsKeepTime),
baseExcludePaths);
long baseCleanedTime = System.currentTimeMillis();
LOG.info("{} base expire cost {} ms", arcticTable.id(), baseCleanedTime - startTime);

// delete ttl files
List<DataFileInfo> changeDataFiles = ServiceContainer.getFileInfoCacheService()
.getChangeTableTTLDataFiles(keyedArcticTable.id().buildTableIdentifier(),
System.currentTimeMillis() - changeDataTTL);
deleteChangeFile(keyedArcticTable, changeDataFiles);

// get valid files in the base store which shouldn't be physically deleted when expiring snapshots
// in the change store
Set<String> changeExclude = UnKeyedTableUtil.getAllContentFilePath(baseTable);
changeExclude.addAll(finalHiveLocations);
long latestChangeFlinkCommitTime = fetchLatestFlinkCommittedSnapshotTime(changeTable);
expireSnapshots(changeTable, Math.min(latestChangeFlinkCommitTime, startTime - changeSnapshotsKeepTime),
changeExclude);
return null;
});
LOG.info("{} expire cost total {} ms", arcticTable.id(),
System.currentTimeMillis() - startTime);
} else {
UnkeyedTable unKeyedArcticTable = arcticTable.asUnkeyedTable();
long latestFlinkCommitTime = fetchLatestFlinkCommittedSnapshotTime(unKeyedArcticTable);
expireSnapshots(unKeyedArcticTable, Math.min(latestFlinkCommitTime, startTime - baseSnapshotsKeepTime),
hiveLocations);
long baseCleanedTime = System.currentTimeMillis();
LOG.info("{} unKeyedTable expire cost {} ms", arcticTable.id(), baseCleanedTime - startTime);
}
}

/**
* When committing a snapshot, Flink will write a checkpoint id into the snapshot summary.
* The latest snapshot with a checkpoint id should not be expired, or the Flink job can't recover from its state.
*
* @param table - the table whose snapshots are checked
* @return commit time of snapshot with the latest flink checkpointId in summary
*/
public static long fetchLatestFlinkCommittedSnapshotTime(UnkeyedTable table) {
long latestCommitTime = Long.MAX_VALUE;
for (Snapshot snapshot : table.snapshots()) {
if (snapshot.summary().containsKey(FLINK_MAX_COMMITTED_CHECKPOINT_ID)) {
latestCommitTime = snapshot.timestampMillis();
}
}
return latestCommitTime;
}

public static void deleteChangeFile(KeyedTable keyedTable, List<DataFileInfo> changeDataFiles) {
@@ -229,7 +253,7 @@ public static void deleteChangeFile(KeyedTable keyedTable, List<DataFileInfo> ch
public static void expireSnapshots(UnkeyedTable arcticInternalTable,
long olderThan,
Set<String> exclude) {
LOG.debug("start expire snapshots, the exclude is {}", exclude);
LOG.debug("start expire snapshots older than {}, the exclude is {}", olderThan, exclude);
final AtomicInteger toDeleteFiles = new AtomicInteger(0);
final AtomicInteger deleteFiles = new AtomicInteger(0);
Set<String> parentDirectory = new HashSet<>();
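
The new guard works by capping the expiration cutoff: a snapshot is only expired if it is older than both the configured keep time and the latest snapshot whose summary carries flink.max-committed-checkpoint-id. A minimal sketch of that call pattern, reusing the patch's public static methods and assuming the same imports as TableExpireService; the helper name expireWithFlinkGuard and its parameters are illustrative only, not part of the patch:

    // Sketch: expire snapshots older than the keep time, but never the latest
    // snapshot committed by Flink (identified by the checkpoint id in its summary).
    static void expireWithFlinkGuard(UnkeyedTable table, long keepTimeMs, Set<String> exclude) {
      long defaultCutoff = System.currentTimeMillis() - keepTimeMs;
      // Returns Long.MAX_VALUE when no snapshot carries the checkpoint id, so the
      // guard is a no-op for tables never written by a Flink job.
      long latestFlinkCommitTime = TableExpireService.fetchLatestFlinkCommittedSnapshotTime(table);
      TableExpireService.expireSnapshots(table, Math.min(latestFlinkCommitTime, defaultCutoff), exclude);
    }
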
@@ -124,6 +124,37 @@ public void testExpiredChangeTableFilesInBase() throws Exception {
Assert.assertFalse(testKeyedTable.io().exists((String) s1Files.get(1).path()));
}

@Test
public void testNotExpireFlinkLatestCommit() throws IOException {
insertChangeDataFiles(1);
insertChangeDataFiles(2);
Assert.assertEquals(Long.MAX_VALUE,
TableExpireService.fetchLatestFlinkCommittedSnapshotTime(testKeyedTable.changeTable()));

AppendFiles appendFiles = testKeyedTable.changeTable().newAppend();
appendFiles.set(TableExpireService.FLINK_MAX_COMMITTED_CHECKPOINT_ID, "100");
appendFiles.commit();
long checkpointTime = testKeyedTable.changeTable().currentSnapshot().timestampMillis();
Assert.assertEquals(checkpointTime,
TableExpireService.fetchLatestFlinkCommittedSnapshotTime(testKeyedTable.changeTable()));

AppendFiles appendFiles2 = testKeyedTable.changeTable().newAppend();
appendFiles2.set(TableExpireService.FLINK_MAX_COMMITTED_CHECKPOINT_ID, "101");
appendFiles2.commit();
long checkpointTime2 = testKeyedTable.changeTable().currentSnapshot().timestampMillis();
Assert.assertEquals(checkpointTime2,
TableExpireService.fetchLatestFlinkCommittedSnapshotTime(testKeyedTable.changeTable()));

insertChangeDataFiles(2);
Assert.assertEquals(checkpointTime2,
TableExpireService.fetchLatestFlinkCommittedSnapshotTime(testKeyedTable.changeTable()));

testKeyedTable.updateProperties().set(TableProperties.CHANGE_SNAPSHOT_KEEP_MINUTES, "0").commit();
TableExpireService.expireArcticTable(testKeyedTable);

Assert.assertEquals(2, Iterables.size(testKeyedTable.changeTable().snapshots()));
}

private List<DataFile> insertChangeDataFiles(long transactionId) throws IOException {
GenericChangeTaskWriter writer = GenericTaskWriters.builderFor(testKeyedTable)
.withChangeAction(ChangeAction.INSERT)
@@ -65,7 +65,7 @@ public void testUnPartitionTableSyncInIceberg() throws Exception {
});
Assert.assertNotEquals(newLocation, hiveLocation);

SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testUnPartitionKeyedHiveTable, "UnitTest");
SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testUnPartitionKeyedHiveTable);
hiveLocation = ((SupportHive) testUnPartitionKeyedHiveTable).getHMSClient().run(client -> {
Table hiveTable = client.getTable(testUnPartitionKeyedHiveTable.id().getDatabase(),
testUnPartitionKeyedHiveTable.id().getTableName());
@@ -85,7 +85,7 @@ public void testUnPartitionTableSyncNotInIceberg() throws Exception {
return hiveTable.getSd().getLocation();
});

SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testUnPartitionKeyedHiveTable, "UnitTest");
SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testUnPartitionKeyedHiveTable);
String newHiveLocation = ((SupportHive) testUnPartitionKeyedHiveTable).getHMSClient().run(client -> {
Table hiveTable = client.getTable(testUnPartitionKeyedHiveTable.id().getDatabase(),
testUnPartitionKeyedHiveTable.id().getTableName());
@@ -110,7 +110,7 @@ public void testSyncOnlyInIceberg() throws Exception {
client.getPartition(testKeyedHiveTable.id().getDatabase(),
testKeyedHiveTable.id().getTableName(), partitionValues)));

SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testKeyedHiveTable, "UnitTest");
SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testKeyedHiveTable);
Partition hivePartition = ((SupportHive) testKeyedHiveTable).getHMSClient().run(client ->
client.getPartition(testKeyedHiveTable.id().getDatabase(),
testKeyedHiveTable.id().getTableName(), partitionValues));
@@ -160,7 +160,7 @@ public void testSyncOnlyInHiveCreateByArctic() throws Exception {
testKeyedHiveTable.id().getTableName(), partitionValues));
Assert.assertEquals(partitionLocation, hivePartition.getSd().getLocation());

SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testKeyedHiveTable, "UnitTest");
SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testKeyedHiveTable);

Assert.assertThrows(NoSuchObjectException.class, () -> ((SupportHive) testKeyedHiveTable).getHMSClient().run(client ->
client.getPartition(testKeyedHiveTable.id().getDatabase(),
@@ -209,7 +209,7 @@ public void testSyncOnlyInHiveCreateNotByArctic() throws Exception {
testKeyedHiveTable.id().getTableName(), partitionValues));
Assert.assertEquals(partitionLocation, hivePartition.getSd().getLocation());

SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testKeyedHiveTable, "UnitTest");
SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testKeyedHiveTable);

hivePartition = ((SupportHive) testKeyedHiveTable).getHMSClient().run(client ->
client.getPartition(testKeyedHiveTable.id().getDatabase(),
@@ -266,7 +266,7 @@ public void testSyncInBoth() throws Exception {
.commit();
Assert.assertNotEquals(newPartitionLocation, hivePartition.getSd().getLocation());

SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testKeyedHiveTable, "UnitTest");
SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testKeyedHiveTable);

hivePartition = ((SupportHive) testKeyedHiveTable).getHMSClient().run(client ->
client.getPartition(testKeyedHiveTable.id().getDatabase(),
@@ -19,6 +19,12 @@
package com.netease.arctic.optimizer.operator.executor;

public interface Executor {
/**
* Execute and return the execution result.
*
* @return OptimizeTaskResult - only the Prepared task.
* @throws Exception - exception
*/
OptimizeTaskResult execute() throws Exception;

void close();