Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AMORO-1960] Refactor data expiration to TableMaintainer #2328

Merged
merged 33 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
f1ad2f8
refator
XBaith Nov 14, 2023
b1213d2
Merge branch 'master' into refactor-expire
XBaith Nov 16, 2023
f5af522
move expiration to table maintainer
XBaith Nov 16, 2023
d9b612e
Merge branch 'master' into refactor-expire
XBaith Nov 20, 2023
02c7266
move expiration to table maintainer
XBaith Nov 20, 2023
4ab33a7
move expiration to table maintainer
XBaith Nov 20, 2023
a7b992d
Merge branch 'master' into refactor-expire
XBaith Nov 20, 2023
adc5a99
optimize code
XBaith Nov 20, 2023
1518247
Merge branch 'master' into refactor-expire
XBaith Nov 20, 2023
0d53589
move unit test
XBaith Nov 20, 2023
307429f
Merge branch 'master' into refactor-expire
XBaith Nov 21, 2023
2512c30
Merge branch 'master' into refactor-expire
XBaith Nov 21, 2023
5de31e7
optimize memo
XBaith Nov 22, 2023
cc0f0cf
remove MaintainStrategy
XBaith Nov 22, 2023
2a7b5a5
spotless
XBaith Nov 22, 2023
1d6e5a7
clean code
XBaith Nov 22, 2023
7af5b8d
review
XBaith Nov 23, 2023
4f01400
Merge branch 'master' into refactor-expire
XBaith Nov 23, 2023
0cf5447
spotless
XBaith Nov 23, 2023
f147fd7
fix
XBaith Nov 23, 2023
2d8efdb
fix
XBaith Nov 23, 2023
9b7870e
review
XBaith Nov 23, 2023
557af3a
Merge branch 'master' into refactor-expire
XBaith Nov 23, 2023
32cf15e
Merge branch 'master' into refactor-expire
XBaith Nov 23, 2023
7d383be
expire since latest_snapshot or current_timestamp
XBaith Nov 24, 2023
fc24e28
Merge branch 'master' into refactor-expire
XBaith Nov 26, 2023
00e548d
Merge branch 'master' into refactor-expire
XBaith Nov 27, 2023
a0f76ca
spotless
XBaith Nov 27, 2023
33c63cb
Merge branch 'master' into refactor-expire
XBaith Nov 28, 2023
3979da8
Merge branch 'master' into refactor-expire
XBaith Nov 29, 2023
8eed903
review
XBaith Nov 29, 2023
fe4de4a
Merge branch 'master' into refactor-expire
XBaith Nov 29, 2023
50e3da0
Merge branch 'master' into refactor-expire
XBaith Nov 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@

package com.netease.arctic.server.optimizing.maintainer;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.netease.arctic.IcebergFileEntry;
import com.netease.arctic.data.FileNameRules;
import com.netease.arctic.hive.utils.TableTypeUtil;
import com.netease.arctic.scan.TableEntriesScan;
import com.netease.arctic.server.table.DataExpirationConfig;
import com.netease.arctic.server.table.TableRuntime;
import com.netease.arctic.server.utils.HiveLocationUtil;
import com.netease.arctic.server.utils.IcebergTableUtil;
Expand All @@ -36,24 +39,35 @@
import com.netease.arctic.utils.TablePropertyUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.primitives.Longs;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructLikeMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.LinkedTransferQueue;
import java.util.stream.Collectors;

/** Table maintainer for mixed-iceberg and mixed-hive tables. */
Expand Down Expand Up @@ -120,18 +134,152 @@ public void expireSnapshots(TableRuntime tableRuntime) {
baseMaintainer.expireSnapshots(tableRuntime);
}

/**
 * Mixed-format tables do not support automatic tag creation; this always throws.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public void autoCreateTags(TableRuntime tableRuntime) {
throw new UnsupportedOperationException("Mixed table doesn't support auto create tags");
}

/**
 * Expires snapshots older than the given timestamp in the change store (when present)
 * and in the base store.
 *
 * @param mustOlderThan epoch-millisecond bound; only older snapshots may be expired
 */
protected void expireSnapshots(long mustOlderThan) {
  Optional.ofNullable(changeMaintainer)
      .ifPresent(maintainer -> maintainer.expireSnapshots(mustOlderThan));
  baseMaintainer.expireSnapshots(mustOlderThan);
}

/**
 * Expires rows whose expiration-field value falls outside the configured retention window.
 *
 * <p>The reference instant is either the current wall-clock time or the timestamp of the
 * latest non-optimized snapshot, depending on the configured {@code data-expire.since}.
 * Failures are logged and swallowed so one table's expiration cannot break the
 * maintenance cycle.
 *
 * @param tableRuntime runtime handle providing this table's expiration configuration
 */
@Override
public void expireData(TableRuntime tableRuntime) {
try {
DataExpirationConfig expirationConfig =
tableRuntime.getTableConfiguration().getExpiringDataConfig();
Types.NestedField field =
arcticTable.schema().findField(expirationConfig.getExpirationField());
// Bail out early when expiration is disabled or the configured field is unusable.
if (!expirationConfig.isValid(field, arcticTable.name())) {
return;
}
ZoneId defaultZone = IcebergTableMaintainer.getDefaultZoneId(field);
Instant startInstant;
if (expirationConfig.getSince() == DataExpirationConfig.Since.CURRENT_TIMESTAMP) {
startInstant = Instant.now().atZone(defaultZone).toInstant();
} else {
// Since.LATEST_SNAPSHOT: anchor retention to the most recent snapshot not produced by
// optimizing, taking the older of the change- and base-store timestamps. A missing
// change store contributes Long.MAX_VALUE so it never wins the min().
long latestBaseTs =
IcebergTableMaintainer.fetchLatestNonOptimizedSnapshotTime(baseMaintainer.getTable());
long latestChangeTs =
changeMaintainer == null
? Long.MAX_VALUE
: IcebergTableMaintainer.fetchLatestNonOptimizedSnapshotTime(
changeMaintainer.getTable());
long latestNonOptimizedTs = Longs.min(latestChangeTs, latestBaseTs);

startInstant = Instant.ofEpochMilli(latestNonOptimizedTs).atZone(defaultZone).toInstant();
}
expireDataFrom(expirationConfig, startInstant);
} catch (Throwable t) {
// Deliberately broad catch: data expiration is best-effort background maintenance.
LOG.error("Unexpected purge error for table {} ", tableRuntime.getTableIdentifier(), t);
}
}

/**
 * Expires data older than {@code instant} minus the configured retention time.
 *
 * @param expirationConfig expiration settings for this table
 * @param instant reference point from which the retention window is measured
 */
@VisibleForTesting
public void expireDataFrom(DataExpirationConfig expirationConfig, Instant instant) {
  long expireTimestamp = instant.minusMillis(expirationConfig.getRetentionTime()).toEpochMilli();
  Types.NestedField expireField =
      arcticTable.schema().findField(expirationConfig.getExpirationField());
  LOG.info(
      "Expiring data older than {} in mixed table {} ",
      Instant.ofEpochMilli(expireTimestamp)
          .atZone(IcebergTableMaintainer.getDefaultZoneId(expireField))
          .toLocalDateTime(),
      arcticTable.name());

  Expression dataFilter =
      IcebergTableMaintainer.getDataExpression(
          arcticTable.schema(), expirationConfig, expireTimestamp);
  Pair<IcebergTableMaintainer.ExpireFiles, IcebergTableMaintainer.ExpireFiles> expiredFiles =
      mixedExpiredFileScan(expirationConfig, dataFilter, expireTimestamp);

  expireMixedFiles(expiredFiles.getLeft(), expiredFiles.getRight(), expireTimestamp);
}

/**
 * Scans the table for files eligible for expiration.
 *
 * <p>Keyed tables scan both the change and base stores; unkeyed tables scan only the base
 * store, in which case the change side of the returned pair is {@code null}.
 */
private Pair<IcebergTableMaintainer.ExpireFiles, IcebergTableMaintainer.ExpireFiles>
    mixedExpiredFileScan(
        DataExpirationConfig expirationConfig, Expression dataFilter, long expireTimestamp) {
  if (arcticTable.isKeyedTable()) {
    return keyedExpiredFileScan(expirationConfig, dataFilter, expireTimestamp);
  }
  return Pair.of(
      null, getBaseMaintainer().expiredFileScan(expirationConfig, dataFilter, expireTimestamp));
}

/**
 * Scans both stores of a keyed table for entries older than {@code expireTimestamp} and
 * splits the expirable ones by origin store.
 *
 * <p>Change- and base-store entries are merged into one pass so partition freshness is
 * computed across the whole table before retention filtering.
 *
 * @return a pair of (change-store expired files, base-store expired files)
 */
private Pair<IcebergTableMaintainer.ExpireFiles, IcebergTableMaintainer.ExpireFiles>
    keyedExpiredFileScan(
        DataExpirationConfig expirationConfig, Expression dataFilter, long expireTimestamp) {
  // Concurrent map: it is updated from the parallel stream below.
  Map<StructLike, IcebergTableMaintainer.DataFileFreshness> partitionFreshness =
      Maps.newConcurrentMap();

  KeyedTable keyedTable = arcticTable.asKeyedTable();
  ChangeTable changeTable = keyedTable.changeTable();
  BaseTable baseTable = keyedTable.baseTable();

  // Tag every entry with its origin so expired files can be committed to the right store.
  CloseableIterable<MixedFileEntry> changeEntries =
      CloseableIterable.transform(
          changeMaintainer.fileScan(changeTable, dataFilter, expirationConfig),
          e -> new MixedFileEntry(e.getFile(), e.getTsBound(), true));
  CloseableIterable<MixedFileEntry> baseEntries =
      CloseableIterable.transform(
          baseMaintainer.fileScan(baseTable, dataFilter, expirationConfig),
          e -> new MixedFileEntry(e.getFile(), e.getTsBound(), false));
  IcebergTableMaintainer.ExpireFiles changeExpiredFiles =
      new IcebergTableMaintainer.ExpireFiles();
  IcebergTableMaintainer.ExpireFiles baseExpiredFiles = new IcebergTableMaintainer.ExpireFiles();

  try (CloseableIterable<MixedFileEntry> entries =
      CloseableIterable.withNoopClose(
          com.google.common.collect.Iterables.concat(changeEntries, baseEntries))) {
    // First pass (sequential): collect expiration candidates and build partition freshness.
    Queue<MixedFileEntry> fileEntries = new LinkedTransferQueue<>();
    entries.forEach(
        e -> {
          if (IcebergTableMaintainer.mayExpired(e, partitionFreshness, expireTimestamp)) {
            fileEntries.add(e);
          }
        });
    // Second pass (parallel): drop candidates still protected by retention rules.
    fileEntries
        .parallelStream()
        .filter(
            e -> IcebergTableMaintainer.willNotRetain(e, expirationConfig, partitionFreshness))
        .forEach(
            e -> {
              if (e.isChange()) {
                changeExpiredFiles.addFile(e);
              } else {
                baseExpiredFiles.addFile(e);
              }
            });
  } catch (IOException e) {
    // UncheckedIOException preserves the I/O failure type while remaining a
    // RuntimeException subtype, so existing callers are unaffected.
    throw new UncheckedIOException(e);
  }

  return Pair.of(changeExpiredFiles, baseExpiredFiles);
}

private void expireMixedFiles(
IcebergTableMaintainer.ExpireFiles changeFiles,
IcebergTableMaintainer.ExpireFiles baseFiles,
long expireTimestamp) {
Optional.ofNullable(changeMaintainer)
XBaith marked this conversation as resolved.
Show resolved Hide resolved
.ifPresent(
c ->
c.expireFiles(
IcebergTableUtil.getSnapshotId(changeMaintainer.getTable(), false),
changeFiles,
expireTimestamp));
Optional.ofNullable(baseMaintainer)
.ifPresent(
c ->
c.expireFiles(
IcebergTableUtil.getSnapshotId(baseMaintainer.getTable(), false),
baseFiles,
expireTimestamp));
}

/**
 * Mixed-format tables do not support automatic tag creation; this always throws.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public void autoCreateTags(TableRuntime tableRuntime) {
throw new UnsupportedOperationException("Mixed table doesn't support auto create tags");
}

protected void cleanContentFiles(long lastTime) {
if (changeMaintainer != null) {
changeMaintainer.cleanContentFiles(lastTime);
Expand Down Expand Up @@ -341,4 +489,18 @@ protected Set<String> expireSnapshotNeedToExcludeFiles() {
return Sets.union(changeFiles, hiveFiles);
}
}

/**
 * A file entry from either the change store or the base store of a mixed table, tagged
 * with its origin so expiration can route it back to the correct store.
 */
public static class MixedFileEntry extends IcebergTableMaintainer.FileEntry {

// true when the entry comes from the change store, false when from the base store
private final boolean isChange;

MixedFileEntry(ContentFile<?> file, Literal<Long> tsBound, boolean isChange) {
super(file, tsBound);
this.isChange = isChange;
}

public boolean isChange() {
return isChange;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ public interface TableMaintainer {
*/
void expireSnapshots(TableRuntime tableRuntime);

/**
 * Expires historical data based on the expiration field; data that exceeds the retention
 * period is purged.
 *
 * @param tableRuntime the runtime handle of the table whose data should be expired
 */
void expireData(TableRuntime tableRuntime);

/** Auto create tags for table. */
void autoCreateTags(TableRuntime tableRuntime);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Locale;
import java.util.Map;
Expand All @@ -31,6 +33,8 @@ public class DataExpirationConfig {
private String dateTimePattern;
// data-expire.datetime-number-format
private String numberDateFormat;
// data-expire.since
private Since since;

@VisibleForTesting
public enum ExpireLevel {
Expand All @@ -47,9 +51,27 @@ public static ExpireLevel fromString(String level) {
}
}

/** Reference point from which the data retention period is measured. */
@VisibleForTesting
public enum Since {
// measure retention backwards from the latest non-optimized snapshot
LATEST_SNAPSHOT,
// measure retention backwards from the current wall-clock time
CURRENT_TIMESTAMP;

/**
 * Parses a case-insensitive property value into a {@link Since} constant.
 *
 * @throws IllegalArgumentException if {@code since} is null or matches no constant
 */
public static Since fromString(String since) {
Preconditions.checkArgument(null != since, "data-expire.since is invalid: null");
try {
return Since.valueOf(since.toUpperCase(Locale.ENGLISH));
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(
String.format("Unable to expire data since: %s", since), e);
}
}
}

public static final Set<Type.TypeID> FIELD_TYPES =
Sets.newHashSet(Type.TypeID.TIMESTAMP, Type.TypeID.STRING, Type.TypeID.LONG);

private static final Logger LOG = LoggerFactory.getLogger(DataExpirationConfig.class);

public DataExpirationConfig() {}

public DataExpirationConfig(
Expand All @@ -58,13 +80,15 @@ public DataExpirationConfig(
ExpireLevel expirationLevel,
long retentionTime,
String dateTimePattern,
String numberDateFormat) {
String numberDateFormat,
Since since) {
this.enabled = enabled;
this.expirationField = expirationField;
this.expirationLevel = expirationLevel;
this.retentionTime = retentionTime;
this.dateTimePattern = dateTimePattern;
this.numberDateFormat = numberDateFormat;
this.since = since;
}

public DataExpirationConfig(ArcticTable table) {
Expand Down Expand Up @@ -109,6 +133,12 @@ public DataExpirationConfig(ArcticTable table) {
properties,
TableProperties.DATA_EXPIRATION_DATE_NUMBER_FORMAT,
TableProperties.DATA_EXPIRATION_DATE_NUMBER_FORMAT_DEFAULT);
since =
Since.fromString(
CompatiblePropertyUtil.propertyAsString(
properties,
TableProperties.DATA_EXPIRATION_SINCE,
TableProperties.DATA_EXPIRATION_SINCE_DEFAULT));
}

public static DataExpirationConfig parse(Map<String, String> properties) {
Expand Down Expand Up @@ -137,7 +167,13 @@ public static DataExpirationConfig parse(Map<String, String> properties) {
CompatiblePropertyUtil.propertyAsString(
properties,
TableProperties.DATA_EXPIRATION_DATE_NUMBER_FORMAT,
TableProperties.DATA_EXPIRATION_DATE_NUMBER_FORMAT_DEFAULT));
TableProperties.DATA_EXPIRATION_DATE_NUMBER_FORMAT_DEFAULT))
.setSince(
Since.fromString(
CompatiblePropertyUtil.propertyAsString(
properties,
TableProperties.DATA_EXPIRATION_SINCE,
TableProperties.DATA_EXPIRATION_SINCE_DEFAULT)));
String retention =
CompatiblePropertyUtil.propertyAsString(
properties, TableProperties.DATA_EXPIRATION_RETENTION_TIME, null);
Expand Down Expand Up @@ -202,6 +238,15 @@ public DataExpirationConfig setNumberDateFormat(String numberDateFormat) {
return this;
}

/** Returns the reference point ({@code data-expire.since}) used for retention. */
public Since getSince() {
return since;
}

/** Sets the retention reference point and returns {@code this} for chaining. */
public DataExpirationConfig setSince(Since since) {
this.since = since;
return this;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -216,7 +261,8 @@ public boolean equals(Object o) {
&& Objects.equal(expirationField, config.expirationField)
&& expirationLevel == config.expirationLevel
&& Objects.equal(dateTimePattern, config.dateTimePattern)
&& Objects.equal(numberDateFormat, config.numberDateFormat);
&& Objects.equal(numberDateFormat, config.numberDateFormat)
&& since == config.since;
}

@Override
Expand All @@ -227,6 +273,38 @@ public int hashCode() {
expirationLevel,
retentionTime,
dateTimePattern,
numberDateFormat);
numberDateFormat,
since);
}

/**
 * Returns whether data expiration can run for this table: expiration must be enabled,
 * the retention time positive, and the expiration field present with a supported type.
 */
public boolean isValid(Types.NestedField field, String name) {
  if (!isEnabled() || getRetentionTime() <= 0) {
    return false;
  }
  return validateExpirationField(field, name, getExpirationField());
}

/**
 * Checks that the configured expiration field exists and has a supported type.
 *
 * <p>Logs a warning and returns {@code false} instead of throwing, so an ill-configured
 * table is skipped rather than failing maintenance.
 *
 * @param field the resolved schema field, or null when not found
 * @param name table name, used only in log messages
 * @param expirationField the configured field name
 */
private boolean validateExpirationField(
    Types.NestedField field, String name, String expirationField) {
  if (StringUtils.isBlank(expirationField) || null == field) {
    // SLF4J parameterized logging: message is only formatted when WARN is enabled.
    LOG.warn(
        "Field({}) used to determine data expiration is illegal for table({})",
        expirationField,
        name);
    return false;
  }
  Type.TypeID typeID = field.type().typeId();
  if (!DataExpirationConfig.FIELD_TYPES.contains(typeID)) {
    LOG.warn(
        "Table({}) field({}) type({}) is not supported for data expiration, please use the "
            + "following types: {}",
        name,
        expirationField,
        typeID.name(),
        StringUtils.join(DataExpirationConfig.FIELD_TYPES, ", "));
    return false;
  }

  return true;
}
}
Loading
Loading