From c7bcd608c44615af47ac47f216c2dc55d0a236f2 Mon Sep 17 00:00:00 2001 From: Xavier Bai Date: Thu, 30 Nov 2023 07:05:28 -0600 Subject: [PATCH] [AMORO-1960] Refactor data expiration to TableMaintainer (#2328) --- .../maintainer/IcebergTableMaintainer.java | 453 +++++++++++++++- .../maintainer/MixedTableMaintainer.java | 161 +++++- .../maintainer/TableMaintainer.java | 8 + .../server/table/DataExpirationConfig.java | 86 ++- .../table/executor/DataExpiringExecutor.java | 489 +----------------- .../arctic/server/utils/IcebergTableUtil.java | 8 + .../maintainer}/TestDataExpire.java | 58 ++- .../maintainer}/TestDataExpireHive.java | 2 +- .../maintainer}/TestDataExpireIceberg.java | 2 +- .../netease/arctic/table/TableProperties.java | 2 + docs/user-guides/configurations.md | 29 +- docs/user-guides/using-tables.md | 15 +- 12 files changed, 768 insertions(+), 545 deletions(-) rename ams/server/src/test/java/com/netease/arctic/server/{table/executor => optimizing/maintainer}/TestDataExpire.java (92%) rename ams/server/src/test/java/com/netease/arctic/server/{table/executor => optimizing/maintainer}/TestDataExpireHive.java (97%) rename ams/server/src/test/java/com/netease/arctic/server/{table/executor => optimizing/maintainer}/TestDataExpireIceberg.java (98%) diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java index 2808075574..242d7e7bd4 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java @@ -20,10 +20,18 @@ import static org.apache.iceberg.relocated.com.google.common.primitives.Longs.min; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; import com.google.common.base.Strings; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.netease.arctic.ams.api.CommitMetaProducer; import com.netease.arctic.io.ArcticFileIO; import com.netease.arctic.io.PathInfo; import com.netease.arctic.io.SupportsFileSystemOperations; +import com.netease.arctic.op.SnapshotSummary; +import com.netease.arctic.server.ArcticServiceConstants; +import com.netease.arctic.server.table.DataExpirationConfig; import com.netease.arctic.server.table.TableConfiguration; import com.netease.arctic.server.table.TableRuntime; import com.netease.arctic.server.utils.IcebergTableUtil; @@ -31,32 +39,56 @@ import com.netease.arctic.utils.CompatiblePropertyUtil; import com.netease.arctic.utils.TableFileUtil; import org.apache.hadoop.fs.Path; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.ContentScanTask; +import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.DeleteFiles; +import org.apache.iceberg.FileContent; +import org.apache.iceberg.FileScanTask; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.ReachableFileUtil; import org.apache.iceberg.RewriteFiles; +import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; +import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; +import org.apache.iceberg.TableScan; import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.expressions.Literal; +import 
org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileInfo; import org.apache.iceberg.io.SupportsPrefixOperations; -import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; -import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; +import java.io.IOException; import java.time.Instant; +import java.time.LocalDate; import java.time.LocalDateTime; import java.time.ZoneId; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Queue; import java.util.Set; +import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; +import java.util.stream.Collectors; import java.util.stream.Stream; +import java.util.stream.StreamSupport; /** Table maintainer for iceberg tables. */ public class IcebergTableMaintainer implements TableMaintainer { @@ -71,6 +103,9 @@ public class IcebergTableMaintainer implements TableMaintainer { public static final String FLINK_MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id"; + public static final String EXPIRE_TIMESTAMP_MS = "TIMESTAMP_MS"; + public static final String EXPIRE_TIMESTAMP_S = "TIMESTAMP_S"; + protected Table table; public IcebergTableMaintainer(Table table) { @@ -116,14 +151,7 @@ public void expireSnapshots(TableRuntime tableRuntime) { olderThanSnapshotNeedToExpire(tableRuntime), expireSnapshotNeedToExcludeFiles()); } - @Override - public void autoCreateTags(TableRuntime tableRuntime) { - new AutoCreateIcebergTagAction( - table, tableRuntime.getTableConfiguration().getTagConfiguration(), LocalDateTime.now()) - .execute(); - } - - void expireSnapshots(long mustOlderThan) { + public void expireSnapshots(long mustOlderThan) { expireSnapshots( olderThanSnapshotNeedToExpire(mustOlderThan), expireSnapshotNeedToExcludeFiles()); } @@ -163,6 +191,62 @@ public void expireSnapshots(long olderThan, Set exclude) { LOG.info("to delete {} files, success delete {} files", toDeleteFiles.get(), deleteFiles.get()); } + @Override + public void expireData(TableRuntime tableRuntime) { + try { + DataExpirationConfig expirationConfig = + tableRuntime.getTableConfiguration().getExpiringDataConfig(); + Types.NestedField field = table.schema().findField(expirationConfig.getExpirationField()); + if (!expirationConfig.isValid(field, table.name())) { + return; + } + Instant startInstant; + if (expirationConfig.getSince() == DataExpirationConfig.Since.CURRENT_TIMESTAMP) { + startInstant = Instant.now().atZone(getDefaultZoneId(field)).toInstant(); + } else { + startInstant = + Instant.ofEpochMilli(fetchLatestNonOptimizedSnapshotTime(table)) + .atZone(getDefaultZoneId(field)) + .toInstant(); + } + + expireDataFrom(expirationConfig, startInstant); + } catch (Throwable t) { + LOG.error("Unexpected purge error for table {} ", tableRuntime.getTableIdentifier(), t); + } + } + + /** + * Purge data older than the specified UTC timestamp + * + * @param expirationConfig expiration configs + * @param instant timestamp/timestampz/long field type uses UTC, others will use the local time + * zone + */ + @VisibleForTesting + public 
void expireDataFrom(DataExpirationConfig expirationConfig, Instant instant) {
+    long expireTimestamp = instant.minusMillis(expirationConfig.getRetentionTime()).toEpochMilli();
+    LOG.info(
+        "Expiring data older than {} in table {} ",
+        Instant.ofEpochMilli(expireTimestamp)
+            .atZone(
+                getDefaultZoneId(table.schema().findField(expirationConfig.getExpirationField())))
+            .toLocalDateTime(),
+        table.name());
+
+    Expression dataFilter = getDataExpression(table.schema(), expirationConfig, expireTimestamp);
+
+    ExpireFiles expiredFiles = expiredFileScan(expirationConfig, dataFilter, expireTimestamp);
+    expireFiles(expiredFiles, expireTimestamp);
+  }
+
+  @Override
+  public void autoCreateTags(TableRuntime tableRuntime) {
+    new AutoCreateIcebergTagAction(
+            table, tableRuntime.getTableConfiguration().getTagConfiguration(), LocalDateTime.now())
+        .execute();
+  }
+
   protected void cleanContentFiles(long lastTime) {
     // For clean data files, should getRuntime valid files in the base store and the change store,
     // so acquire in advance
@@ -201,8 +285,10 @@ protected long olderThanSnapshotNeedToExpire(long mustOlderThan) {
     // Latest checkpoint of flink need retain. If Flink does not continuously commit new snapshots,
     // it can lead to issues with table partitions not expiring.
     long latestFlinkCommitTime = fetchLatestFlinkCommittedSnapshotTime(table);
+    // Retain the latest non-optimized snapshot to remember the real latest update
+    long latestNonOptimizedTime = fetchLatestNonOptimizedSnapshotTime(table);
     long olderThan = System.currentTimeMillis() - baseSnapshotsKeepTime;
-    return min(latestFlinkCommitTime, mustOlderThan, olderThan);
+    return min(latestFlinkCommitTime, latestNonOptimizedTime, mustOlderThan, olderThan);
   }
 
   protected Set<String> expireSnapshotNeedToExcludeFiles() {
@@ -324,6 +410,20 @@ public static long fetchOptimizingSnapshotTime(Table table, TableRuntime tableRu
     return Long.MAX_VALUE;
   }
 
+  /**
+   * When expiring historical data and `data-expire.since` is `LATEST_SNAPSHOT`, the expiration
+   * start time should be taken from the latest snapshot that was not produced by Amoro's
+   * optimizing commits.
+   *
+   * @param table iceberg table
+   * @return the timestamp of the latest non-optimized snapshot
+   */
+  public static long fetchLatestNonOptimizedSnapshotTime(Table table) {
+    Optional<Snapshot> snapshot =
+        IcebergTableUtil.findSnapshot(
+            table, s -> s.summary().containsValue(CommitMetaProducer.OPTIMIZE.name()));
+    return snapshot.isPresent() ? 
snapshot.get().timestampMillis() : Long.MAX_VALUE; + } + private static int deleteInvalidFilesInFs( SupportsFileSystemOperations fio, String location, long lastTime, Set excludes) { if (!fio.exists(location)) { @@ -449,4 +549,335 @@ private static String formatTime(long timestamp) { return LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()) .toString(); } + + CloseableIterable fileScan( + Table table, Expression dataFilter, DataExpirationConfig expirationConfig) { + TableScan tableScan = table.newScan().filter(dataFilter).includeColumnStats(); + + CloseableIterable tasks; + Snapshot snapshot = IcebergTableUtil.getSnapshot(table, false); + long snapshotId = snapshot.snapshotId(); + if (snapshotId == ArcticServiceConstants.INVALID_SNAPSHOT_ID) { + tasks = tableScan.planFiles(); + } else { + tasks = tableScan.useSnapshot(snapshotId).planFiles(); + } + long deleteFileCnt = + Long.parseLong( + snapshot + .summary() + .getOrDefault(org.apache.iceberg.SnapshotSummary.TOTAL_DELETE_FILES_PROP, "0")); + CloseableIterable dataFiles = + CloseableIterable.transform(tasks, ContentScanTask::file); + CloseableIterable hasDeleteTask = + deleteFileCnt > 0 + ? CloseableIterable.filter(tasks, t -> !t.deletes().isEmpty()) + : CloseableIterable.empty(); + + Set deleteFiles = + StreamSupport.stream(hasDeleteTask.spliterator(), true) + .flatMap(e -> e.deletes().stream()) + .collect(Collectors.toSet()); + + Types.NestedField field = table.schema().findField(expirationConfig.getExpirationField()); + return CloseableIterable.transform( + CloseableIterable.withNoopClose( + com.google.common.collect.Iterables.concat(dataFiles, deleteFiles)), + contentFile -> { + Literal literal = + getExpireTimestampLiteral( + contentFile, + field, + DateTimeFormatter.ofPattern( + expirationConfig.getDateTimePattern(), Locale.getDefault()), + expirationConfig.getNumberDateFormat()); + return new FileEntry(contentFile.copyWithoutStats(), literal); + }); + } + + protected ExpireFiles expiredFileScan( + DataExpirationConfig expirationConfig, Expression dataFilter, long expireTimestamp) { + Map partitionFreshness = Maps.newConcurrentMap(); + ExpireFiles expiredFiles = new ExpireFiles(); + try (CloseableIterable entries = fileScan(table, dataFilter, expirationConfig)) { + Queue fileEntries = new LinkedTransferQueue<>(); + entries.forEach( + e -> { + if (mayExpired(e, partitionFreshness, expireTimestamp)) { + fileEntries.add(e); + } + }); + fileEntries + .parallelStream() + .filter(e -> willNotRetain(e, expirationConfig, partitionFreshness)) + .forEach(expiredFiles::addFile); + } catch (IOException e) { + throw new RuntimeException(e); + } + return expiredFiles; + } + + /** + * Create a filter expression for expired files for the `FILE` level. 
For the `PARTITION` level, + * we need to collect the oldest files to determine if the partition is obsolete, so we will not + * filter for expired files at the scanning stage + * + * @param expirationConfig expiration configuration + * @param expireTimestamp expired timestamp + */ + protected static Expression getDataExpression( + Schema schema, DataExpirationConfig expirationConfig, long expireTimestamp) { + if (expirationConfig.getExpirationLevel().equals(DataExpirationConfig.ExpireLevel.PARTITION)) { + return Expressions.alwaysTrue(); + } + + Types.NestedField field = schema.findField(expirationConfig.getExpirationField()); + Type.TypeID typeID = field.type().typeId(); + switch (typeID) { + case TIMESTAMP: + return Expressions.lessThanOrEqual(field.name(), expireTimestamp * 1000); + case LONG: + if (expirationConfig.getNumberDateFormat().equals(EXPIRE_TIMESTAMP_MS)) { + return Expressions.lessThanOrEqual(field.name(), expireTimestamp); + } else if (expirationConfig.getNumberDateFormat().equals(EXPIRE_TIMESTAMP_S)) { + return Expressions.lessThanOrEqual(field.name(), expireTimestamp / 1000); + } else { + return Expressions.alwaysTrue(); + } + case STRING: + String expireDateTime = + LocalDateTime.ofInstant(Instant.ofEpochMilli(expireTimestamp), getDefaultZoneId(field)) + .format( + DateTimeFormatter.ofPattern( + expirationConfig.getDateTimePattern(), Locale.getDefault())); + return Expressions.lessThanOrEqual(field.name(), expireDateTime); + default: + return Expressions.alwaysTrue(); + } + } + + void expireFiles(ExpireFiles expiredFiles, long expireTimestamp) { + long snapshotId = IcebergTableUtil.getSnapshotId(table, false); + Queue dataFiles = expiredFiles.dataFiles; + Queue deleteFiles = expiredFiles.deleteFiles; + if (dataFiles.isEmpty() && deleteFiles.isEmpty()) { + return; + } + // expire data files + DeleteFiles delete = table.newDelete(); + dataFiles.forEach(delete::deleteFile); + delete.set(SnapshotSummary.SNAPSHOT_PRODUCER, "DATA_EXPIRATION"); + delete.commit(); + // expire delete files + if (!deleteFiles.isEmpty()) { + RewriteFiles rewriteFiles = table.newRewrite().validateFromSnapshot(snapshotId); + deleteFiles.forEach(rewriteFiles::deleteFile); + rewriteFiles.set(SnapshotSummary.SNAPSHOT_PRODUCER, "DATA_EXPIRATION"); + rewriteFiles.commit(); + } + + // TODO: persistent table expiration record. Contains some meta information such as table_id, + // snapshotId, + // file_infos(file_content, path, recordCount, fileSizeInBytes, equalityFieldIds, + // partitionPath, + // sequenceNumber) and expireTimestamp... 
+
+    LOG.info(
+        "Table {} expired files older than {}: {} data files[{}] and {} delete files[{}]",
+        table.name(),
+        expireTimestamp,
+        dataFiles.size(),
+        dataFiles.stream().map(ContentFile::path).collect(Collectors.joining(",")),
+        deleteFiles.size(),
+        deleteFiles.stream().map(ContentFile::path).collect(Collectors.joining(",")));
+  }
+
+  public static class ExpireFiles {
+    Queue<DataFile> dataFiles;
+    Queue<DeleteFile> deleteFiles;
+
+    ExpireFiles() {
+      this.dataFiles = new LinkedTransferQueue<>();
+      this.deleteFiles = new LinkedTransferQueue<>();
+    }
+
+    void addFile(FileEntry entry) {
+      ContentFile<?> file = entry.getFile();
+      switch (file.content()) {
+        case DATA:
+          dataFiles.add((DataFile) file.copyWithoutStats());
+          break;
+        case EQUALITY_DELETES:
+        case POSITION_DELETES:
+          deleteFiles.add((DeleteFile) file.copyWithoutStats());
+          break;
+        default:
+          throw new IllegalArgumentException(file.content().name() + " cannot be expired");
+      }
+    }
+  }
+
+  public static class DataFileFreshness {
+    long latestExpiredSeq;
+    long latestUpdateMillis;
+    long expiredDataFileCount;
+    long totalDataFileCount;
+
+    DataFileFreshness(long sequenceNumber, long latestUpdateMillis) {
+      this.latestExpiredSeq = sequenceNumber;
+      this.latestUpdateMillis = latestUpdateMillis;
+    }
+
+    DataFileFreshness updateLatestMillis(long ts) {
+      this.latestUpdateMillis = ts;
+      return this;
+    }
+
+    DataFileFreshness updateExpiredSeq(Long seq) {
+      this.latestExpiredSeq = seq;
+      return this;
+    }
+
+    DataFileFreshness incTotalCount() {
+      totalDataFileCount++;
+      return this;
+    }
+
+    DataFileFreshness incExpiredCount() {
+      expiredDataFileCount++;
+      return this;
+    }
+  }
+
+  static boolean mayExpired(
+      FileEntry fileEntry,
+      Map<StructLike, DataFileFreshness> partitionFreshness,
+      Long expireTimestamp) {
+    ContentFile<?> contentFile = fileEntry.getFile();
+    StructLike partition = contentFile.partition();
+
+    boolean expired = true;
+    if (contentFile.content().equals(FileContent.DATA)) {
+      Literal<Long> literal = fileEntry.getTsBound();
+      if (partitionFreshness.containsKey(partition)) {
+        DataFileFreshness freshness = partitionFreshness.get(partition).incTotalCount();
+        if (freshness.latestUpdateMillis <= literal.value()) {
+          partitionFreshness.put(partition, freshness.updateLatestMillis(literal.value()));
+        }
+      } else {
+        partitionFreshness.putIfAbsent(
+            partition,
+            new DataFileFreshness(fileEntry.getFile().dataSequenceNumber(), literal.value())
+                .incTotalCount());
+      }
+      expired = literal.comparator().compare(expireTimestamp, literal.value()) >= 0;
+      if (expired) {
+        partitionFreshness.computeIfPresent(
+            partition,
+            (k, v) ->
+                v.updateExpiredSeq(fileEntry.getFile().dataSequenceNumber()).incExpiredCount());
+      }
+    }
+
+    return expired;
+  }
+
+  static boolean willNotRetain(
+      FileEntry fileEntry,
+      DataExpirationConfig expirationConfig,
+      Map<StructLike, DataFileFreshness> partitionFreshness) {
+    ContentFile<?> contentFile = fileEntry.getFile();
+
+    switch (expirationConfig.getExpirationLevel()) {
+      case PARTITION:
+        // if only partial expired files in a partition, all the files in that partition should be
+        // preserved
+        return partitionFreshness.containsKey(contentFile.partition())
+            && partitionFreshness.get(contentFile.partition()).expiredDataFileCount
+                == partitionFreshness.get(contentFile.partition()).totalDataFileCount;
+      case FILE:
+        if (!contentFile.content().equals(FileContent.DATA)) {
+          long seqUpperBound =
+              partitionFreshness.getOrDefault(
+                      contentFile.partition(),
+                      new DataFileFreshness(Long.MIN_VALUE, Long.MAX_VALUE))
+                  .latestExpiredSeq;
+          // only expire delete files with sequence-number less than or equal to 
expired data file + // there may be some dangling delete files, they will be cleaned by + // OrphanFileCleaningExecutor + return fileEntry.getFile().dataSequenceNumber() <= seqUpperBound; + } else { + return true; + } + default: + return false; + } + } + + private static Literal getExpireTimestampLiteral( + ContentFile contentFile, + Types.NestedField field, + DateTimeFormatter formatter, + String numberDateFormatter) { + Type type = field.type(); + Object upperBound = + Conversions.fromByteBuffer(type, contentFile.upperBounds().get(field.fieldId())); + Literal literal = Literal.of(Long.MAX_VALUE); + if (null == upperBound) { + return literal; + } else if (upperBound instanceof Long) { + switch (type.typeId()) { + case TIMESTAMP: + // nanosecond -> millisecond + literal = Literal.of((Long) upperBound / 1000); + break; + default: + if (numberDateFormatter.equals(EXPIRE_TIMESTAMP_MS)) { + literal = Literal.of((Long) upperBound); + } else if (numberDateFormatter.equals(EXPIRE_TIMESTAMP_S)) { + // second -> millisecond + literal = Literal.of((Long) upperBound * 1000); + } + } + } else if (type.typeId().equals(Type.TypeID.STRING)) { + literal = + Literal.of( + LocalDate.parse(upperBound.toString(), formatter) + .atStartOfDay() + .atZone(getDefaultZoneId(field)) + .toInstant() + .toEpochMilli()); + } + return literal; + } + + public Table getTable() { + return table; + } + + public static ZoneId getDefaultZoneId(Types.NestedField expireField) { + Type type = expireField.type(); + if (type.typeId() == Type.TypeID.STRING) { + return ZoneId.systemDefault(); + } + return ZoneOffset.UTC; + } + + public static class FileEntry { + private final ContentFile file; + private final Literal tsBound; + + FileEntry(ContentFile file, Literal tsBound) { + this.file = file; + this.tsBound = tsBound; + } + + public ContentFile getFile() { + return file; + } + + public Literal getTsBound() { + return tsBound; + } + } } diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java index 76ee0e1e14..a57807a0c9 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java @@ -18,10 +18,13 @@ package com.netease.arctic.server.optimizing.maintainer; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Maps; import com.netease.arctic.IcebergFileEntry; import com.netease.arctic.data.FileNameRules; import com.netease.arctic.hive.utils.TableTypeUtil; import com.netease.arctic.scan.TableEntriesScan; +import com.netease.arctic.server.table.DataExpirationConfig; import com.netease.arctic.server.table.TableRuntime; import com.netease.arctic.server.utils.HiveLocationUtil; import com.netease.arctic.server.utils.IcebergTableUtil; @@ -36,24 +39,35 @@ import com.netease.arctic.utils.TablePropertyUtil; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.iceberg.ContentFile; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFiles; import org.apache.iceberg.FileContent; import org.apache.iceberg.Snapshot; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Literal; import 
org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.primitives.Longs; +import org.apache.iceberg.types.Types; import org.apache.iceberg.util.StructLikeMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.UncheckedIOException; +import java.time.Instant; +import java.time.ZoneId; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.Queue; import java.util.Set; +import java.util.concurrent.LinkedTransferQueue; import java.util.stream.Collectors; /** Table maintainer for mixed-iceberg and mixed-hive tables. */ @@ -120,11 +134,6 @@ public void expireSnapshots(TableRuntime tableRuntime) { baseMaintainer.expireSnapshots(tableRuntime); } - @Override - public void autoCreateTags(TableRuntime tableRuntime) { - throw new UnsupportedOperationException("Mixed table doesn't support auto create tags"); - } - protected void expireSnapshots(long mustOlderThan) { if (changeMaintainer != null) { changeMaintainer.expireSnapshots(mustOlderThan); @@ -132,6 +141,134 @@ protected void expireSnapshots(long mustOlderThan) { baseMaintainer.expireSnapshots(mustOlderThan); } + @Override + public void expireData(TableRuntime tableRuntime) { + try { + DataExpirationConfig expirationConfig = + tableRuntime.getTableConfiguration().getExpiringDataConfig(); + Types.NestedField field = + arcticTable.schema().findField(expirationConfig.getExpirationField()); + if (!expirationConfig.isValid(field, arcticTable.name())) { + return; + } + ZoneId defaultZone = IcebergTableMaintainer.getDefaultZoneId(field); + Instant startInstant; + if (expirationConfig.getSince() == DataExpirationConfig.Since.CURRENT_TIMESTAMP) { + startInstant = Instant.now().atZone(defaultZone).toInstant(); + } else { + long latestBaseTs = + IcebergTableMaintainer.fetchLatestNonOptimizedSnapshotTime(baseMaintainer.getTable()); + long latestChangeTs = + changeMaintainer == null + ? 
Long.MAX_VALUE + : IcebergTableMaintainer.fetchLatestNonOptimizedSnapshotTime( + changeMaintainer.getTable()); + long latestNonOptimizedTs = Longs.min(latestChangeTs, latestBaseTs); + + startInstant = Instant.ofEpochMilli(latestNonOptimizedTs).atZone(defaultZone).toInstant(); + } + expireDataFrom(expirationConfig, startInstant); + } catch (Throwable t) { + LOG.error("Unexpected purge error for table {} ", tableRuntime.getTableIdentifier(), t); + } + } + + @VisibleForTesting + public void expireDataFrom(DataExpirationConfig expirationConfig, Instant instant) { + long expireTimestamp = instant.minusMillis(expirationConfig.getRetentionTime()).toEpochMilli(); + Types.NestedField field = arcticTable.schema().findField(expirationConfig.getExpirationField()); + LOG.info( + "Expiring data older than {} in mixed table {} ", + Instant.ofEpochMilli(expireTimestamp) + .atZone(IcebergTableMaintainer.getDefaultZoneId(field)) + .toLocalDateTime(), + arcticTable.name()); + + Expression dataFilter = + IcebergTableMaintainer.getDataExpression( + arcticTable.schema(), expirationConfig, expireTimestamp); + + Pair mixedExpiredFiles = + mixedExpiredFileScan(expirationConfig, dataFilter, expireTimestamp); + + expireMixedFiles(mixedExpiredFiles.getLeft(), mixedExpiredFiles.getRight(), expireTimestamp); + } + + private Pair + mixedExpiredFileScan( + DataExpirationConfig expirationConfig, Expression dataFilter, long expireTimestamp) { + return arcticTable.isKeyedTable() + ? keyedExpiredFileScan(expirationConfig, dataFilter, expireTimestamp) + : Pair.of( + new IcebergTableMaintainer.ExpireFiles(), + getBaseMaintainer().expiredFileScan(expirationConfig, dataFilter, expireTimestamp)); + } + + private Pair + keyedExpiredFileScan( + DataExpirationConfig expirationConfig, Expression dataFilter, long expireTimestamp) { + Map partitionFreshness = + Maps.newConcurrentMap(); + + KeyedTable keyedTable = arcticTable.asKeyedTable(); + ChangeTable changeTable = keyedTable.changeTable(); + BaseTable baseTable = keyedTable.baseTable(); + + CloseableIterable changeEntries = + CloseableIterable.transform( + changeMaintainer.fileScan(changeTable, dataFilter, expirationConfig), + e -> new MixedFileEntry(e.getFile(), e.getTsBound(), true)); + CloseableIterable baseEntries = + CloseableIterable.transform( + baseMaintainer.fileScan(baseTable, dataFilter, expirationConfig), + e -> new MixedFileEntry(e.getFile(), e.getTsBound(), false)); + IcebergTableMaintainer.ExpireFiles changeExpiredFiles = + new IcebergTableMaintainer.ExpireFiles(); + IcebergTableMaintainer.ExpireFiles baseExpiredFiles = new IcebergTableMaintainer.ExpireFiles(); + + try (CloseableIterable entries = + CloseableIterable.withNoopClose( + com.google.common.collect.Iterables.concat(changeEntries, baseEntries))) { + Queue fileEntries = new LinkedTransferQueue<>(); + entries.forEach( + e -> { + if (IcebergTableMaintainer.mayExpired(e, partitionFreshness, expireTimestamp)) { + fileEntries.add(e); + } + }); + fileEntries + .parallelStream() + .filter( + e -> IcebergTableMaintainer.willNotRetain(e, expirationConfig, partitionFreshness)) + .forEach( + e -> { + if (e.isChange()) { + changeExpiredFiles.addFile(e); + } else { + baseExpiredFiles.addFile(e); + } + }); + } catch (IOException e) { + throw new RuntimeException(e); + } + + return Pair.of(changeExpiredFiles, baseExpiredFiles); + } + + private void expireMixedFiles( + IcebergTableMaintainer.ExpireFiles changeFiles, + IcebergTableMaintainer.ExpireFiles baseFiles, + long expireTimestamp) { + 
Optional.ofNullable(changeMaintainer)
+        .ifPresent(c -> c.expireFiles(changeFiles, expireTimestamp));
+    Optional.ofNullable(baseMaintainer).ifPresent(c -> c.expireFiles(baseFiles, expireTimestamp));
+  }
+
+  @Override
+  public void autoCreateTags(TableRuntime tableRuntime) {
+    throw new UnsupportedOperationException("Mixed table doesn't support auto create tags");
+  }
+
   protected void cleanContentFiles(long lastTime) {
     if (changeMaintainer != null) {
       changeMaintainer.cleanContentFiles(lastTime);
@@ -341,4 +478,18 @@ protected Set expireSnapshotNeedToExcludeFiles() {
       return Sets.union(changeFiles, hiveFiles);
     }
   }
+
+  public static class MixedFileEntry extends IcebergTableMaintainer.FileEntry {
+
+    private final boolean isChange;
+
+    MixedFileEntry(ContentFile<?> file, Literal<Long> tsBound, boolean isChange) {
+      super(file, tsBound);
+      this.isChange = isChange;
+    }
+
+    public boolean isChange() {
+      return isChange;
+    }
+  }
 }
diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/TableMaintainer.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/TableMaintainer.java
index 75a1b499c1..f439512dc8 100644
--- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/TableMaintainer.java
+++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/TableMaintainer.java
@@ -42,6 +42,14 @@ public interface TableMaintainer {
    */
   void expireSnapshots(TableRuntime tableRuntime);
 
+  /**
+   * Expire historical data based on the expiration field; data that exceeds the retention period
+   * will be purged.
+   *
+   * @param tableRuntime TableRuntime
+   */
+  void expireData(TableRuntime tableRuntime);
+
   /** Auto create tags for table. */
   void autoCreateTags(TableRuntime tableRuntime);
 
diff --git a/ams/server/src/main/java/com/netease/arctic/server/table/DataExpirationConfig.java b/ams/server/src/main/java/com/netease/arctic/server/table/DataExpirationConfig.java
index 93747e91c2..bfe55fcc16 100644
--- a/ams/server/src/main/java/com/netease/arctic/server/table/DataExpirationConfig.java
+++ b/ams/server/src/main/java/com/netease/arctic/server/table/DataExpirationConfig.java
@@ -12,6 +12,8 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Locale;
 import java.util.Map;
@@ -31,6 +33,8 @@ public class DataExpirationConfig {
   private String dateTimePattern;
   // data-expire.datetime-number-format
   private String numberDateFormat;
+  // data-expire.since
+  private Since since;
 
   @VisibleForTesting
   public enum ExpireLevel {
@@ -47,9 +51,27 @@ public static ExpireLevel fromString(String level) {
     }
   }
 
+  @VisibleForTesting
+  public enum Since {
+    LATEST_SNAPSHOT,
+    CURRENT_TIMESTAMP;
+
+    public static Since fromString(String since) {
+      Preconditions.checkArgument(null != since, "data-expire.since is invalid: null");
+      try {
+        return Since.valueOf(since.toUpperCase(Locale.ENGLISH));
+      } catch (IllegalArgumentException e) {
+        throw new IllegalArgumentException(
+            String.format("Invalid value for data-expire.since: %s", since), e);
+      }
+    }
+  }
+
   public static final Set<Type.TypeID> FIELD_TYPES =
       Sets.newHashSet(Type.TypeID.TIMESTAMP, Type.TypeID.STRING, Type.TypeID.LONG);
 
+  private static final Logger LOG = LoggerFactory.getLogger(DataExpirationConfig.class);
+
   public DataExpirationConfig() {}
 
   public DataExpirationConfig(
@@ -58,13 +80,15 @@ public DataExpirationConfig(
       boolean enabled,
       String expirationField,
       ExpireLevel expirationLevel,
       long retentionTime,
String dateTimePattern, - String numberDateFormat) { + String numberDateFormat, + Since since) { this.enabled = enabled; this.expirationField = expirationField; this.expirationLevel = expirationLevel; this.retentionTime = retentionTime; this.dateTimePattern = dateTimePattern; this.numberDateFormat = numberDateFormat; + this.since = since; } public DataExpirationConfig(ArcticTable table) { @@ -109,6 +133,12 @@ public DataExpirationConfig(ArcticTable table) { properties, TableProperties.DATA_EXPIRATION_DATE_NUMBER_FORMAT, TableProperties.DATA_EXPIRATION_DATE_NUMBER_FORMAT_DEFAULT); + since = + Since.fromString( + CompatiblePropertyUtil.propertyAsString( + properties, + TableProperties.DATA_EXPIRATION_SINCE, + TableProperties.DATA_EXPIRATION_SINCE_DEFAULT)); } public static DataExpirationConfig parse(Map properties) { @@ -137,7 +167,13 @@ public static DataExpirationConfig parse(Map properties) { CompatiblePropertyUtil.propertyAsString( properties, TableProperties.DATA_EXPIRATION_DATE_NUMBER_FORMAT, - TableProperties.DATA_EXPIRATION_DATE_NUMBER_FORMAT_DEFAULT)); + TableProperties.DATA_EXPIRATION_DATE_NUMBER_FORMAT_DEFAULT)) + .setSince( + Since.fromString( + CompatiblePropertyUtil.propertyAsString( + properties, + TableProperties.DATA_EXPIRATION_SINCE, + TableProperties.DATA_EXPIRATION_SINCE_DEFAULT))); String retention = CompatiblePropertyUtil.propertyAsString( properties, TableProperties.DATA_EXPIRATION_RETENTION_TIME, null); @@ -202,6 +238,15 @@ public DataExpirationConfig setNumberDateFormat(String numberDateFormat) { return this; } + public Since getSince() { + return since; + } + + public DataExpirationConfig setSince(Since since) { + this.since = since; + return this; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -216,7 +261,8 @@ public boolean equals(Object o) { && Objects.equal(expirationField, config.expirationField) && expirationLevel == config.expirationLevel && Objects.equal(dateTimePattern, config.dateTimePattern) - && Objects.equal(numberDateFormat, config.numberDateFormat); + && Objects.equal(numberDateFormat, config.numberDateFormat) + && since == config.since; } @Override @@ -227,6 +273,38 @@ public int hashCode() { expirationLevel, retentionTime, dateTimePattern, - numberDateFormat); + numberDateFormat, + since); + } + + public boolean isValid(Types.NestedField field, String name) { + return isEnabled() + && getRetentionTime() > 0 + && validateExpirationField(field, name, getExpirationField()); + } + + private boolean validateExpirationField( + Types.NestedField field, String name, String expirationField) { + if (StringUtils.isBlank(expirationField) || null == field) { + LOG.warn( + String.format( + "Field(%s) used to determine data expiration is illegal for table(%s)", + expirationField, name)); + return false; + } + Type.TypeID typeID = field.type().typeId(); + if (!DataExpirationConfig.FIELD_TYPES.contains(typeID)) { + LOG.warn( + String.format( + "Table(%s) field(%s) type(%s) is not supported for data expiration, please use the " + + "following types: %s", + name, + expirationField, + typeID.name(), + StringUtils.join(DataExpirationConfig.FIELD_TYPES, ", "))); + return false; + } + + return true; } } diff --git a/ams/server/src/main/java/com/netease/arctic/server/table/executor/DataExpiringExecutor.java b/ams/server/src/main/java/com/netease/arctic/server/table/executor/DataExpiringExecutor.java index f29ba568a6..f5c0abca1f 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/table/executor/DataExpiringExecutor.java +++ 
b/ams/server/src/main/java/com/netease/arctic/server/table/executor/DataExpiringExecutor.java @@ -18,59 +18,15 @@ package com.netease.arctic.server.table.executor; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.netease.arctic.IcebergFileEntry; -import com.netease.arctic.op.SnapshotSummary; -import com.netease.arctic.server.ArcticServiceConstants; -import com.netease.arctic.server.table.DataExpirationConfig; +import com.netease.arctic.AmoroTable; +import com.netease.arctic.server.optimizing.maintainer.TableMaintainer; import com.netease.arctic.server.table.TableConfiguration; import com.netease.arctic.server.table.TableManager; import com.netease.arctic.server.table.TableRuntime; -import com.netease.arctic.table.ArcticTable; -import com.netease.arctic.table.BaseTable; -import com.netease.arctic.table.ChangeTable; -import com.netease.arctic.table.KeyedTable; -import com.netease.arctic.table.UnkeyedTable; -import com.netease.arctic.utils.ManifestEntryFields; -import org.apache.commons.lang3.StringUtils; -import org.apache.iceberg.ContentFile; -import org.apache.iceberg.ContentScanTask; -import org.apache.iceberg.DataFile; -import org.apache.iceberg.DeleteFile; -import org.apache.iceberg.DeleteFiles; -import org.apache.iceberg.FileContent; -import org.apache.iceberg.FileScanTask; -import org.apache.iceberg.RewriteFiles; -import org.apache.iceberg.Snapshot; -import org.apache.iceberg.StructLike; -import org.apache.iceberg.TableScan; -import org.apache.iceberg.expressions.Expression; -import org.apache.iceberg.expressions.Expressions; -import org.apache.iceberg.expressions.Literal; -import org.apache.iceberg.io.CloseableIterable; -import org.apache.iceberg.types.Conversions; -import org.apache.iceberg.types.Type; -import org.apache.iceberg.types.Types; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.time.Duration; -import java.time.Instant; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.ZoneId; -import java.time.ZoneOffset; -import java.time.format.DateTimeFormatter; -import java.util.LinkedList; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; public class DataExpiringExecutor extends BaseTableExecutor { @@ -78,9 +34,6 @@ public class DataExpiringExecutor extends BaseTableExecutor { private final Duration interval; - public static final String EXPIRE_TIMESTAMP_MS = "TIMESTAMP_MS"; - public static final String EXPIRE_TIMESTAMP_S = "TIMESTAMP_S"; - protected DataExpiringExecutor(TableManager tableManager, int poolSize, Duration interval) { super(tableManager, poolSize); this.interval = interval; @@ -101,444 +54,14 @@ public void handleConfigChanged(TableRuntime tableRuntime, TableConfiguration or scheduleIfNecessary(tableRuntime, getStartDelay()); } - private boolean validateExpirationField(ArcticTable table, String expirationField) { - Types.NestedField field = table.schema().findField(expirationField); - - if (StringUtils.isBlank(expirationField) || null == field) { - LOG.warn( - String.format( - "Field(%s) used to determine data expiration is illegal for table(%s)", - expirationField, table.name())); - return false; - } - Type.TypeID typeID = field.type().typeId(); - if (!DataExpirationConfig.FIELD_TYPES.contains(typeID)) { - LOG.warn( - String.format( - "Table(%s) field(%s) type(%s) is not supported 
for data expiration, please use the " - + "following types: %s", - table.name(), - expirationField, - typeID.name(), - StringUtils.join(DataExpirationConfig.FIELD_TYPES, ", "))); - return false; - } - - return true; - } - @Override protected void execute(TableRuntime tableRuntime) { try { - ArcticTable arcticTable = (ArcticTable) loadTable(tableRuntime).originalTable(); - DataExpirationConfig expirationConfig = - tableRuntime.getTableConfiguration().getExpiringDataConfig(); - if (!expirationConfig.isEnabled() - || expirationConfig.getRetentionTime() <= 0 - || !validateExpirationField(arcticTable, expirationConfig.getExpirationField())) { - return; - } - - purgeTableFrom( - arcticTable, - expirationConfig, - Instant.now() - .atZone( - getDefaultZoneId( - arcticTable.schema().findField(expirationConfig.getExpirationField()))) - .toInstant()); + AmoroTable amoroTable = loadTable(tableRuntime); + TableMaintainer tableMaintainer = TableMaintainer.ofTable(amoroTable); + tableMaintainer.expireData(tableRuntime); } catch (Throwable t) { - LOG.error("Unexpected purge error for table {} ", tableRuntime.getTableIdentifier(), t); - } - } - - /** - * Purge data older than the specified UTC timestamp - * - * @param table Arctic table - * @param expirationConfig expiration configs - * @param instant timestamp/timestampz/long field type uses UTC, others will use the local time - * zone - */ - protected static void purgeTableFrom( - ArcticTable table, DataExpirationConfig expirationConfig, Instant instant) { - long expireTimestamp = instant.minusMillis(expirationConfig.getRetentionTime()).toEpochMilli(); - LOG.info( - "Expiring Data older than {} in table {} ", - Instant.ofEpochMilli(expireTimestamp) - .atZone( - getDefaultZoneId(table.schema().findField(expirationConfig.getExpirationField()))) - .toLocalDateTime(), - table.name()); - - purgeTableData(table, expirationConfig, expireTimestamp); - } - - protected static ZoneId getDefaultZoneId(Types.NestedField expireField) { - Type type = expireField.type(); - if (type.typeId() == Type.TypeID.STRING) { - return ZoneId.systemDefault(); - } - return ZoneOffset.UTC; - } - - private static CloseableIterable fileScan( - UnkeyedTable table, Expression partitionFilter, Snapshot snapshot) { - TableScan tableScan = table.newScan().filter(partitionFilter).includeColumnStats(); - - CloseableIterable tasks; - long snapshotId = getSnapshotId(snapshot); - if (snapshotId == ArcticServiceConstants.INVALID_SNAPSHOT_ID) { - tasks = tableScan.planFiles(); - } else { - tasks = tableScan.useSnapshot(snapshotId).planFiles(); - } - CloseableIterable dataFiles = - CloseableIterable.transform(tasks, ContentScanTask::file); - Set deleteFiles = - StreamSupport.stream(tasks.spliterator(), false) - .flatMap(e -> e.deletes().stream()) - .collect(Collectors.toSet()); - - return CloseableIterable.transform( - CloseableIterable.withNoopClose(Iterables.concat(dataFiles, deleteFiles)), - contentFile -> - new IcebergFileEntry( - snapshotId, - contentFile.dataSequenceNumber(), - ManifestEntryFields.Status.EXISTING, - contentFile)); - } - - private static void purgeTableData( - ArcticTable table, DataExpirationConfig expirationConfig, long expireTimestamp) { - Expression dataFilter = getDataExpression(table, expirationConfig, expireTimestamp); - Map partitionFreshness = Maps.newHashMap(); - - if (table.isKeyedTable()) { - KeyedTable keyedTable = table.asKeyedTable(); - ChangeTable changeTable = keyedTable.changeTable(); - BaseTable baseTable = keyedTable.baseTable(); - Snapshot changeSnapshot = 
changeTable.currentSnapshot(); - Snapshot baseSnapshot = baseTable.currentSnapshot(); - - CloseableIterable changeEntries = - fileScan(changeTable, dataFilter, changeSnapshot); - CloseableIterable baseEntries = - fileScan(baseTable, dataFilter, baseSnapshot); - ExpireFiles changeExpiredFiles = new ExpireFiles(); - ExpireFiles baseExpiredFiles = new ExpireFiles(); - CloseableIterable changed = - CloseableIterable.transform(changeEntries, e -> new FileEntry(e, true)); - CloseableIterable based = - CloseableIterable.transform(baseEntries, e -> new FileEntry(e, false)); - - try (CloseableIterable entries = - CloseableIterable.withNoopClose(Iterables.concat(changed, based))) { - CloseableIterable mayExpiredFiles = - CloseableIterable.withNoopClose( - Lists.newArrayList( - CloseableIterable.filter( - entries, - e -> - mayExpired( - table, e, expirationConfig, partitionFreshness, expireTimestamp)))); - CloseableIterable.filter( - mayExpiredFiles, e -> willNotRetain(e, expirationConfig, partitionFreshness)) - .forEach( - e -> { - if (e.isChange) { - changeExpiredFiles.addFile(e); - } else { - baseExpiredFiles.addFile(e); - } - }); - } catch (IOException e) { - throw new RuntimeException(e); - } - expireFiles(changeTable, getSnapshotId(changeSnapshot), changeExpiredFiles, expireTimestamp); - expireFiles(baseTable, getSnapshotId(baseSnapshot), baseExpiredFiles, expireTimestamp); - } else { - UnkeyedTable unkeyedTable = table.asUnkeyedTable(); - Snapshot snapshot = unkeyedTable.currentSnapshot(); - ExpireFiles expiredFiles = new ExpireFiles(); - try (CloseableIterable entries = - fileScan(unkeyedTable, dataFilter, snapshot)) { - CloseableIterable mayExpiredFiles = - CloseableIterable.withNoopClose( - Lists.newArrayList( - CloseableIterable.filter( - entries, - e -> - mayExpired( - table, e, expirationConfig, partitionFreshness, expireTimestamp)))); - CloseableIterable.filter( - mayExpiredFiles, e -> willNotRetain(e, expirationConfig, partitionFreshness)) - .forEach(expiredFiles::addFile); - } catch (IOException e) { - throw new RuntimeException(e); - } - expireFiles(unkeyedTable, getSnapshotId(snapshot), expiredFiles, expireTimestamp); - } - } - - /** - * Create a filter expression for expired files for the `FILE` level. 
For the `PARTITION` level, - * we need to collect the oldest files to determine if the partition is obsolete, so we will not - * filter for expired files at the scanning stage - * - * @param expirationConfig expiration configuration - * @param expireTimestamp expired timestamp - * @return filter expression - */ - private static Expression getDataExpression( - ArcticTable table, DataExpirationConfig expirationConfig, long expireTimestamp) { - if (expirationConfig.getExpirationLevel().equals(DataExpirationConfig.ExpireLevel.PARTITION)) { - return Expressions.alwaysTrue(); - } - - Types.NestedField field = table.schema().findField(expirationConfig.getExpirationField()); - Type.TypeID typeID = field.type().typeId(); - switch (typeID) { - case TIMESTAMP: - return Expressions.lessThanOrEqual(field.name(), expireTimestamp * 1000); - case LONG: - if (expirationConfig.getNumberDateFormat().equals(EXPIRE_TIMESTAMP_MS)) { - return Expressions.lessThanOrEqual(field.name(), expireTimestamp); - } else if (expirationConfig.getNumberDateFormat().equals(EXPIRE_TIMESTAMP_S)) { - return Expressions.lessThanOrEqual(field.name(), expireTimestamp / 1000); - } else { - return Expressions.alwaysTrue(); - } - case STRING: - String expireDateTime = - LocalDateTime.ofInstant(Instant.ofEpochMilli(expireTimestamp), getDefaultZoneId(field)) - .format( - DateTimeFormatter.ofPattern( - expirationConfig.getDateTimePattern(), Locale.getDefault())); - return Expressions.lessThanOrEqual(field.name(), expireDateTime); - default: - return Expressions.alwaysTrue(); - } - } - - private static void expireFiles( - UnkeyedTable table, long snapshotId, ExpireFiles expiredFiles, long expireTimestamp) { - List dataFiles = expiredFiles.dataFiles; - List deleteFiles = expiredFiles.deleteFiles; - if (dataFiles.isEmpty() && deleteFiles.isEmpty()) { - return; - } - // expire data files - DeleteFiles delete = table.newDelete(); - dataFiles.forEach(delete::deleteFile); - delete.set(SnapshotSummary.SNAPSHOT_PRODUCER, "DATA_EXPIRATION"); - delete.commit(); - // expire delete files - if (!deleteFiles.isEmpty()) { - RewriteFiles rewriteFiles = table.newRewrite().validateFromSnapshot(snapshotId); - deleteFiles.forEach(rewriteFiles::deleteFile); - rewriteFiles.set(SnapshotSummary.SNAPSHOT_PRODUCER, "DATA_EXPIRATION"); - rewriteFiles.commit(); - } - - // TODO: persistent table expiration record. Contains some meta information such as table_id, - // snapshotId, - // file_infos(file_content, path, recordCount, fileSizeInBytes, equalityFieldIds, - // partitionPath, - // sequenceNumber) and expireTimestamp... 
- - LOG.info( - "Expired {} files older than {}, {} data files[{}] and {} delete files[{}]", - table.name(), - expireTimestamp, - dataFiles.size(), - dataFiles.stream().map(ContentFile::path).collect(Collectors.joining(",")), - deleteFiles.size(), - deleteFiles.stream().map(ContentFile::path).collect(Collectors.joining(","))); - } - - private static class ExpireFiles { - List dataFiles; - List deleteFiles; - - ExpireFiles() { - this.dataFiles = new LinkedList<>(); - this.deleteFiles = new LinkedList<>(); - } - - void addFile(IcebergFileEntry entry) { - ContentFile file = entry.getFile(); - switch (file.content()) { - case DATA: - dataFiles.add((DataFile) file.copyWithoutStats()); - break; - case EQUALITY_DELETES: - case POSITION_DELETES: - deleteFiles.add((DeleteFile) file.copyWithoutStats()); - break; - default: - throw new IllegalArgumentException(file.content().name() + "cannot be expired"); - } - } - } - - private static class DataFileFreshness { - long latestExpiredSeq; - long latestUpdateMillis; - long expiredDataFileCount; - long totalDataFileCount; - - DataFileFreshness(long sequenceNumber, long latestUpdateMillis) { - this.latestExpiredSeq = sequenceNumber; - this.latestUpdateMillis = latestUpdateMillis; - } - - DataFileFreshness updateLatestMillis(long ts) { - this.latestUpdateMillis = ts; - return this; - } - - DataFileFreshness updateExpiredSeq(Long seq) { - this.latestExpiredSeq = seq; - return this; - } - - DataFileFreshness incTotalCount() { - totalDataFileCount++; - return this; - } - - DataFileFreshness incExpiredCount() { - expiredDataFileCount++; - return this; - } - } - - private static boolean mayExpired( - ArcticTable table, - IcebergFileEntry fileEntry, - DataExpirationConfig expirationConfig, - Map partitionFreshness, - Long expireTimestamp) { - ContentFile contentFile = fileEntry.getFile(); - StructLike partition = contentFile.partition(); - - boolean expired = true; - Types.NestedField field = table.schema().findField(expirationConfig.getExpirationField()); - if (contentFile.content().equals(FileContent.DATA)) { - Literal literal = - getExpireTimestampLiteral( - contentFile, - field, - DateTimeFormatter.ofPattern( - expirationConfig.getDateTimePattern(), Locale.getDefault()), - expirationConfig.getNumberDateFormat()); - if (partitionFreshness.containsKey(partition)) { - DataFileFreshness freshness = partitionFreshness.get(partition).incTotalCount(); - if (freshness.latestUpdateMillis <= literal.value()) { - partitionFreshness.put(partition, freshness.updateLatestMillis(literal.value())); - } - } else { - partitionFreshness.putIfAbsent( - partition, - new DataFileFreshness(fileEntry.getSequenceNumber(), literal.value()).incTotalCount()); - } - expired = literal.comparator().compare(expireTimestamp, literal.value()) >= 0; - if (expired) { - partitionFreshness.computeIfPresent( - partition, - (k, v) -> v.updateExpiredSeq(fileEntry.getSequenceNumber()).incExpiredCount()); - } - } - - return expired; - } - - private static boolean willNotRetain( - IcebergFileEntry fileEntry, - DataExpirationConfig expirationConfig, - Map partitionFreshness) { - ContentFile contentFile = fileEntry.getFile(); - - switch (expirationConfig.getExpirationLevel()) { - case PARTITION: - // if only partial expired files in a partition, all the files in that partition should be - // preserved - return partitionFreshness.containsKey(contentFile.partition()) - && partitionFreshness.get(contentFile.partition()).expiredDataFileCount - == 
partitionFreshness.get(contentFile.partition()).totalDataFileCount; - case FILE: - if (!contentFile.content().equals(FileContent.DATA)) { - long seqUpperBound = - partitionFreshness.getOrDefault( - contentFile.partition(), - new DataFileFreshness(Long.MIN_VALUE, Long.MAX_VALUE)) - .latestExpiredSeq; - // only expire delete files with sequence-number less or equal to expired data file - // there may be some dangling delete files, they will be cleaned by - // OrphanFileCleaningExecutor - return fileEntry.getSequenceNumber() <= seqUpperBound; - } else { - return true; - } - default: - return false; - } - } - - private static Literal getExpireTimestampLiteral( - ContentFile contentFile, - Types.NestedField field, - DateTimeFormatter formatter, - String numberDateFormatter) { - Type type = field.type(); - Object upperBound = - Conversions.fromByteBuffer(type, contentFile.upperBounds().get(field.fieldId())); - Literal literal = Literal.of(Long.MAX_VALUE); - if (null == upperBound) { - return literal; - } else if (upperBound instanceof Long) { - switch (type.typeId()) { - case TIMESTAMP: - // nanosecond -> millisecond - literal = Literal.of((Long) upperBound / 1000); - break; - default: - if (numberDateFormatter.equals(EXPIRE_TIMESTAMP_MS)) { - literal = Literal.of((Long) upperBound); - } else if (numberDateFormatter.equals(EXPIRE_TIMESTAMP_S)) { - // second -> millisecond - literal = Literal.of((Long) upperBound * 1000); - } - } - } else if (type.typeId().equals(Type.TypeID.STRING)) { - literal = - Literal.of( - LocalDate.parse(upperBound.toString(), formatter) - .atStartOfDay() - .atZone(getDefaultZoneId(field)) - .toInstant() - .toEpochMilli()); - } - return literal; - } - - private static long getSnapshotId(Snapshot snapshot) { - return null == snapshot ? 
ArcticServiceConstants.INVALID_SNAPSHOT_ID : snapshot.snapshotId(); - } - - protected static class FileEntry extends IcebergFileEntry { - - private final boolean isChange; - - FileEntry(IcebergFileEntry fileEntry, boolean isChange) { - super( - fileEntry.getSnapshotId(), - fileEntry.getSequenceNumber(), - fileEntry.getStatus(), - fileEntry.getFile()); - this.isChange = isChange; + LOG.error("unexpected expire error of table {} ", tableRuntime.getTableIdentifier(), t); } } } diff --git a/ams/server/src/main/java/com/netease/arctic/server/utils/IcebergTableUtil.java b/ams/server/src/main/java/com/netease/arctic/server/utils/IcebergTableUtil.java index 170221a040..969fc441ab 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/utils/IcebergTableUtil.java +++ b/ams/server/src/main/java/com/netease/arctic/server/utils/IcebergTableUtil.java @@ -18,6 +18,9 @@ package com.netease.arctic.server.utils; +import com.google.common.base.Optional; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; import com.netease.arctic.IcebergFileEntry; import com.netease.arctic.scan.TableEntriesScan; import com.netease.arctic.server.ArcticServiceConstants; @@ -75,6 +78,11 @@ public static Snapshot getSnapshot(Table table, boolean refresh) { return table.currentSnapshot(); } + public static Optional findSnapshot(Table table, Predicate predicate) { + Iterable snapshots = table.snapshots(); + return Iterables.tryFind(snapshots, predicate); + } + public static Set getAllContentFilePath(Table internalTable) { Set validFilesPath = new HashSet<>(); diff --git a/ams/server/src/test/java/com/netease/arctic/server/table/executor/TestDataExpire.java b/ams/server/src/test/java/com/netease/arctic/server/optimizing/maintainer/TestDataExpire.java similarity index 92% rename from ams/server/src/test/java/com/netease/arctic/server/table/executor/TestDataExpire.java rename to ams/server/src/test/java/com/netease/arctic/server/optimizing/maintainer/TestDataExpire.java index 8cdc26b05f..97f16023f2 100644 --- a/ams/server/src/test/java/com/netease/arctic/server/table/executor/TestDataExpire.java +++ b/ams/server/src/test/java/com/netease/arctic/server/optimizing/maintainer/TestDataExpire.java @@ -16,7 +16,7 @@ * limitations under the License. 
*/ -package com.netease.arctic.server.table.executor; +package com.netease.arctic.server.optimizing.maintainer; import static com.netease.arctic.BasicTableTestHelper.PRIMARY_KEY_SPEC; import static com.netease.arctic.BasicTableTestHelper.SPEC; @@ -37,6 +37,7 @@ import com.netease.arctic.server.optimizing.scan.UnkeyedTableFileScanHelper; import com.netease.arctic.server.table.DataExpirationConfig; import com.netease.arctic.server.table.KeyedTableSnapshot; +import com.netease.arctic.server.table.executor.ExecutorTestBase; import com.netease.arctic.server.utils.IcebergTableUtil; import com.netease.arctic.table.ArcticTable; import com.netease.arctic.table.KeyedTable; @@ -183,14 +184,8 @@ private void testUnKeyedPartitionLevel() { getArcticTable(), tableTestHelper().writeBaseStore(getArcticTable(), 0, records, false)); DataExpirationConfig config = new DataExpirationConfig(getArcticTable()); - DataExpiringExecutor.purgeTableFrom( - getArcticTable(), - config, - LocalDateTime.parse("2022-01-03T18:00:00.000") - .atZone( - DataExpiringExecutor.getDefaultZoneId( - getArcticTable().schema().findField(config.getExpirationField()))) - .toInstant()); + + getMaintainerAndExpire(config, "2022-01-03T18:00:00.000"); List result = readSortedBaseRecords(getArcticTable()); @@ -241,12 +236,12 @@ private void testKeyedPartitionLevel() { // expire partitions that order than 2022-01-02 18:00:00.000 DataExpirationConfig config = new DataExpirationConfig(keyedTable); - DataExpiringExecutor.purgeTableFrom( - keyedTable, + MixedTableMaintainer tableMaintainer = new MixedTableMaintainer(keyedTable); + tableMaintainer.expireDataFrom( config, LocalDateTime.parse("2022-01-03T18:00:00.000") .atZone( - DataExpiringExecutor.getDefaultZoneId( + IcebergTableMaintainer.getDefaultZoneId( keyedTable.schema().findField(config.getExpirationField()))) .toInstant()); @@ -324,12 +319,12 @@ private void testKeyedFileLevel() { // expire partitions that order than 2022-01-02 18:00:00.000 DataExpirationConfig config = new DataExpirationConfig(keyedTable); - DataExpiringExecutor.purgeTableFrom( - keyedTable, + MixedTableMaintainer mixedTableMaintainer = new MixedTableMaintainer(getArcticTable()); + mixedTableMaintainer.expireDataFrom( config, LocalDateTime.parse("2022-01-03T18:00:00.000") .atZone( - DataExpiringExecutor.getDefaultZoneId( + IcebergTableMaintainer.getDefaultZoneId( keyedTable.schema().findField(config.getExpirationField()))) .toInstant()); @@ -362,14 +357,8 @@ private void testUnKeyedFileLevel() { // expire partitions that order than 2022-01-02 18:00:00.000 DataExpirationConfig config = new DataExpirationConfig(getArcticTable()); - DataExpiringExecutor.purgeTableFrom( - getArcticTable(), - config, - LocalDateTime.parse("2022-01-03T18:00:00.000") - .atZone( - DataExpiringExecutor.getDefaultZoneId( - getArcticTable().schema().findField(config.getExpirationField()))) - .toInstant()); + + getMaintainerAndExpire(config, "2022-01-03T18:00:00.000"); List result = readSortedBaseRecords(getArcticTable()); @@ -387,6 +376,29 @@ private void testUnKeyedFileLevel() { Assert.assertEquals(expected, result); } + protected void getMaintainerAndExpire(DataExpirationConfig config, String datetime) { + if (getTestFormat().equals(TableFormat.ICEBERG)) { + IcebergTableMaintainer icebergTableMaintainer = + new IcebergTableMaintainer(getArcticTable().asUnkeyedTable()); + icebergTableMaintainer.expireDataFrom( + config, + LocalDateTime.parse(datetime) + .atZone( + IcebergTableMaintainer.getDefaultZoneId( + 
+                      getArcticTable().schema().findField(config.getExpirationField())))
+              .toInstant());
+    } else {
+      MixedTableMaintainer mixedTableMaintainer = new MixedTableMaintainer(getArcticTable());
+      mixedTableMaintainer.expireDataFrom(
+          config,
+          LocalDateTime.parse(datetime)
+              .atZone(
+                  IcebergTableMaintainer.getDefaultZoneId(
+                      getArcticTable().schema().findField(config.getExpirationField())))
+              .toInstant());
+    }
+  }
+
   @Test
   public void testNormalFieldPartitionLevel() {
     getArcticTable().updateProperties().set(TableProperties.DATA_EXPIRATION_FIELD, "ts").commit();
diff --git a/ams/server/src/test/java/com/netease/arctic/server/table/executor/TestDataExpireHive.java b/ams/server/src/test/java/com/netease/arctic/server/optimizing/maintainer/TestDataExpireHive.java
similarity index 97%
rename from ams/server/src/test/java/com/netease/arctic/server/table/executor/TestDataExpireHive.java
rename to ams/server/src/test/java/com/netease/arctic/server/optimizing/maintainer/TestDataExpireHive.java
index a0b84a0fcb..9be98bf14c 100644
--- a/ams/server/src/test/java/com/netease/arctic/server/table/executor/TestDataExpireHive.java
+++ b/ams/server/src/test/java/com/netease/arctic/server/optimizing/maintainer/TestDataExpireHive.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */

-package com.netease.arctic.server.table.executor;
+package com.netease.arctic.server.optimizing.maintainer;

 import com.netease.arctic.TableTestHelper;
 import com.netease.arctic.ams.api.TableFormat;
diff --git a/ams/server/src/test/java/com/netease/arctic/server/table/executor/TestDataExpireIceberg.java b/ams/server/src/test/java/com/netease/arctic/server/optimizing/maintainer/TestDataExpireIceberg.java
similarity index 98%
rename from ams/server/src/test/java/com/netease/arctic/server/table/executor/TestDataExpireIceberg.java
rename to ams/server/src/test/java/com/netease/arctic/server/optimizing/maintainer/TestDataExpireIceberg.java
index 759a6fdde4..6517d843f0 100644
--- a/ams/server/src/test/java/com/netease/arctic/server/table/executor/TestDataExpireIceberg.java
+++ b/ams/server/src/test/java/com/netease/arctic/server/optimizing/maintainer/TestDataExpireIceberg.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */

-package com.netease.arctic.server.table.executor;
+package com.netease.arctic.server.optimizing.maintainer;

 import static com.netease.arctic.BasicTableTestHelper.SPEC;
diff --git a/core/src/main/java/com/netease/arctic/table/TableProperties.java b/core/src/main/java/com/netease/arctic/table/TableProperties.java
index 4668f83f52..cef4fc2fa3 100644
--- a/core/src/main/java/com/netease/arctic/table/TableProperties.java
+++ b/core/src/main/java/com/netease/arctic/table/TableProperties.java
@@ -158,6 +158,8 @@ private TableProperties() {}
       "data-expire.datetime-number-format";
   public static final String DATA_EXPIRATION_DATE_NUMBER_FORMAT_DEFAULT = "TIMESTAMP_MS";
   public static final String DATA_EXPIRATION_RETENTION_TIME = "data-expire.retention-time";
+  public static final String DATA_EXPIRATION_SINCE = "data-expire.since";
+  public static final String DATA_EXPIRATION_SINCE_DEFAULT = "LATEST_SNAPSHOT";
   public static final String ENABLE_DANGLING_DELETE_FILES_CLEAN =
       "clean-dangling-delete-files.enabled";
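As a quick illustration of how this property pair might be consumed, here is a minimal sketch, assuming a plain table-properties map and Iceberg's `PropertyUtil`; the actual parsing in this patch lives in `DataExpirationConfig`.

```java
import java.util.Locale;
import java.util.Map;
import org.apache.iceberg.util.PropertyUtil;

public class DataExpireSinceSketch {

  /** Mirrors the two valid values documented for data-expire.since. */
  public enum Since {
    LATEST_SNAPSHOT,
    CURRENT_TIMESTAMP
  }

  // Resolve the property with its default. Treating the value as
  // case-insensitive is an assumption of this sketch, not a guarantee
  // of the server.
  public static Since resolveSince(Map<String, String> tableProperties) {
    String value =
        PropertyUtil.propertyAsString(tableProperties, "data-expire.since", "LATEST_SNAPSHOT");
    return Since.valueOf(value.toUpperCase(Locale.ROOT));
  }
}
```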
diff --git a/docs/user-guides/configurations.md b/docs/user-guides/configurations.md
index 4645592d77..ed51491fe4 100644
--- a/docs/user-guides/configurations.md
+++ b/docs/user-guides/configurations.md
@@ -47,20 +47,21 @@ Self-optimizing configurations are applicable to both Iceberg Format and Mixed s

 Data-cleaning configurations are applicable to both Iceberg Format and Mixed streaming Format.

-| Key                                         | Default       | Description                                                                                                              |
-|---------------------------------------------|---------------|--------------------------------------------------------------------------------------------------------------------------|
-| table-expire.enabled                        | true          | Enables periodically expire table                                                                                        |
-| change.data.ttl.minutes                     | 10080(7 days) | Time to live in minutes for data of ChangeStore                                                                          |
-| snapshot.base.keep.minutes                  | 720(12 hours) | Table-Expiration keeps the latest snapshots of BaseStore within a specified time in minutes                              |
-| clean-orphan-file.enabled                   | false         | Enables periodically clean orphan files                                                                                  |
-| clean-orphan-file.min-existing-time-minutes | 2880(2 days)  | Cleaning orphan files keeps the files modified within a specified time in minutes                                        |
-| clean-dangling-delete-files.enabled         | true          | Whether to enable cleaning of dangling delete files                                                                      |
-| data-expire.enabled                         | false         | Whether to enable data expiration                                                                                        |
-| data-expire.level                           | partition     | Level of data expiration. Including partition and file                                                                  |
-| data-expire.field                           | NULL          | Field used to determine data expiration, supporting timestamp/timestampz/long type and string type field in date format |
-| data-expire.datetime-string-pattern         | yyyy-MM-dd    | Pattern used for matching string datetime                                                                                |
-| data-expire.datetime-number-format          | TIMESTAMP_MS  | Timestamp unit for long field. Including TIMESTAMP_MS and TIMESTAMP_S                                                    |
-| data-expire.retention-time                  | NULL          | Retention period for data expiration. For example, 1d means retaining data for 1 day. Other supported units include h (hour), min (minute), s (second), ms (millisecond), etc. |
+| Key                                         | Default         | Description                                                                                                                                                                                                                                                 |
+|---------------------------------------------|-----------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| table-expire.enabled                        | true            | Enables periodic table expiration                                                                                                                                                                                                                           |
+| change.data.ttl.minutes                     | 10080(7 days)   | Time to live in minutes for data of ChangeStore                                                                                                                                                                                                             |
+| snapshot.base.keep.minutes                  | 720(12 hours)   | Table-Expiration keeps the latest snapshots of BaseStore within a specified time in minutes                                                                                                                                                                 |
+| clean-orphan-file.enabled                   | false           | Enables periodic orphan file cleaning                                                                                                                                                                                                                       |
+| clean-orphan-file.min-existing-time-minutes | 2880(2 days)    | Cleaning orphan files keeps the files modified within a specified time in minutes                                                                                                                                                                           |
+| clean-dangling-delete-files.enabled         | true            | Whether to enable cleaning of dangling delete files                                                                                                                                                                                                         |
+| data-expire.enabled                         | false           | Whether to enable data expiration                                                                                                                                                                                                                           |
+| data-expire.level                           | partition       | Level of data expiration. Valid values are partition and file                                                                                                                                                                                               |
+| data-expire.field                           | NULL            | Field used to determine data expiration, supporting timestamp/timestampz/long types and string fields in date format                                                                                                                                        |
+| data-expire.datetime-string-pattern         | yyyy-MM-dd      | Pattern used for matching string datetime                                                                                                                                                                                                                   |
+| data-expire.datetime-number-format          | TIMESTAMP_MS    | Timestamp unit for long fields. Valid values are TIMESTAMP_MS and TIMESTAMP_S                                                                                                                                                                               |
+| data-expire.retention-time                  | NULL            | Retention period for data expiration. For example, 1d means retaining data for 1 day. Other supported units include h (hour), min (minute), s (second), ms (millisecond), etc.                                                                              |
+| data-expire.since                           | LATEST_SNAPSHOT | The event that marks the starting point of data expiration. Valid values are LATEST_SNAPSHOT and CURRENT_TIMESTAMP. LATEST_SNAPSHOT uses the timestamp of the latest **non-optimized** snapshot as the start of expiration, which ensures the table retains at least retention-time of data |

 ## Tags configurations
diff --git a/docs/user-guides/using-tables.md b/docs/user-guides/using-tables.md
index 9bb21a0fc5..110cbdc8fb 100644
--- a/docs/user-guides/using-tables.md
+++ b/docs/user-guides/using-tables.md
@@ -155,7 +155,7 @@ ALTER TABLE test_db.test_log_store set tblproperties (
     'data-expire.enabled' = 'true');
 ```

-### Set the data retention period
+### Set retention period

 The configuration for data retention duration consists of a number and a unit. For example, '90d' represents retaining data for 90 days, and '12h' indicates 12 hours.

@@ -164,7 +164,7 @@ ALTER TABLE test_db.test_log_store set tblproperties (
     'data-expire.retention-time' = '90d');
 ```

-### Select the event-time field
+### Select expiration field

 Data expiration requires users to specify a field for determining expiration. In addition to supporting timestampz/timestamp field types for this purpose, it also supports string and long field types.

@@ -186,7 +186,7 @@ ALTER TABLE test_db.test_log_store set tblproperties (
     'data-expire.datetime-number-format' = 'TIMESTAMP_MS');
 ```

-### Adjust the data expiration level
+### Adjust expiration level

 Data expiration supports two levels, including `PARTITION` and `FILE`. The default level is `PARTITION`, which means that AMS deletes files only when all the files within a partition have expired.
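The two levels can be made concrete with a small sketch, as shown below. All names are hypothetical illustration types, not the server's actual data structures: at `PARTITION` level a file is dropped only once every file in its partition has expired, while `FILE` level judges each file on its own.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;

public class ExpireLevelSketch {

  /** Hypothetical per-file descriptor used only for this illustration. */
  public static class FileStat {
    final String partition;
    final boolean expired;

    FileStat(String partition, boolean expired) {
      this.partition = partition;
      this.expired = expired;
    }
  }

  // FILE level: each expired file is dropped independently.
  public static boolean shouldDropFileLevel(FileStat file) {
    return file.expired;
  }

  // PARTITION level: drop a file only when its whole partition has expired.
  public static boolean shouldDropPartitionLevel(
      FileStat file, Map<String, List<FileStat>> filesByPartition) {
    Collection<FileStat> partitionFiles = filesByPartition.get(file.partition);
    return partitionFiles != null && partitionFiles.stream().allMatch(f -> f.expired);
  }
}
```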
@@ -195,6 +195,15 @@ ALTER TABLE test_db.test_log_store set tblproperties (
     'data-expire.level' = 'partition');
 ```

+### Specify start time
+
+Amoro expires data starting from either `LATEST_SNAPSHOT` or `CURRENT_TIMESTAMP`. `LATEST_SNAPSHOT` uses the timestamp of the table's most recent **non-optimized** snapshot as the start time of the expiration, which ensures that the table retains `data-expire.retention-time` of data; `CURRENT_TIMESTAMP` uses the current time of the service.
+
+```sql
+ALTER TABLE test_db.test_log_store set tblproperties (
+    'data-expire.since' = 'current_timestamp');
+```
+
 ## Delete table

 After logging into the AMS Dashboard, you can modify a table by entering the modification statement in the `terminal` and executing it.
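The retention string format used in these docs ('90d', '12h') combines a number and a unit. Below is a minimal parsing sketch under that assumption; the server's actual parser also accepts min, s, and ms units, which this illustration omits.

```java
import java.time.Duration;

public class RetentionTimeSketch {

  // Parses strings such as "90d" or "12h" into a Duration. Only the day and
  // hour units are handled here; the real parser supports more units.
  public static Duration parseRetention(String retention) {
    String value = retention.trim().toLowerCase();
    long amount = Long.parseLong(value.substring(0, value.length() - 1));
    char unit = value.charAt(value.length() - 1);
    switch (unit) {
      case 'd':
        return Duration.ofDays(amount);
      case 'h':
        return Duration.ofHours(amount);
      default:
        throw new IllegalArgumentException("Unsupported retention unit: " + retention);
    }
  }
}
```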