Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AMORO-1960] Refactor data expiration to TableMaintainer #2328

Merged
merged 33 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
f1ad2f8
refator
XBaith Nov 14, 2023
b1213d2
Merge branch 'master' into refactor-expire
XBaith Nov 16, 2023
f5af522
move expiration to table maintainer
XBaith Nov 16, 2023
d9b612e
Merge branch 'master' into refactor-expire
XBaith Nov 20, 2023
02c7266
move expiration to table maintainer
XBaith Nov 20, 2023
4ab33a7
move expiration to table maintainer
XBaith Nov 20, 2023
a7b992d
Merge branch 'master' into refactor-expire
XBaith Nov 20, 2023
adc5a99
optimize code
XBaith Nov 20, 2023
1518247
Merge branch 'master' into refactor-expire
XBaith Nov 20, 2023
0d53589
move unit test
XBaith Nov 20, 2023
307429f
Merge branch 'master' into refactor-expire
XBaith Nov 21, 2023
2512c30
Merge branch 'master' into refactor-expire
XBaith Nov 21, 2023
5de31e7
optimize memo
XBaith Nov 22, 2023
cc0f0cf
remove MaintainStrategy
XBaith Nov 22, 2023
2a7b5a5
spotless
XBaith Nov 22, 2023
1d6e5a7
clean code
XBaith Nov 22, 2023
7af5b8d
review
XBaith Nov 23, 2023
4f01400
Merge branch 'master' into refactor-expire
XBaith Nov 23, 2023
0cf5447
spotless
XBaith Nov 23, 2023
f147fd7
fix
XBaith Nov 23, 2023
2d8efdb
fix
XBaith Nov 23, 2023
9b7870e
review
XBaith Nov 23, 2023
557af3a
Merge branch 'master' into refactor-expire
XBaith Nov 23, 2023
32cf15e
Merge branch 'master' into refactor-expire
XBaith Nov 23, 2023
7d383be
expire since latest_snapshot or current_timestamp
XBaith Nov 24, 2023
fc24e28
Merge branch 'master' into refactor-expire
XBaith Nov 26, 2023
00e548d
Merge branch 'master' into refactor-expire
XBaith Nov 27, 2023
a0f76ca
spotless
XBaith Nov 27, 2023
33c63cb
Merge branch 'master' into refactor-expire
XBaith Nov 28, 2023
3979da8
Merge branch 'master' into refactor-expire
XBaith Nov 29, 2023
8eed903
review
XBaith Nov 29, 2023
fe4de4a
Merge branch 'master' into refactor-expire
XBaith Nov 29, 2023
50e3da0
Merge branch 'master' into refactor-expire
XBaith Nov 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@

package com.netease.arctic.server.optimizing.maintainer;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.netease.arctic.IcebergFileEntry;
import com.netease.arctic.data.FileNameRules;
import com.netease.arctic.hive.utils.TableTypeUtil;
import com.netease.arctic.scan.TableEntriesScan;
import com.netease.arctic.server.table.DataExpirationConfig;
import com.netease.arctic.server.table.TableRuntime;
import com.netease.arctic.server.utils.HiveLocationUtil;
import com.netease.arctic.server.utils.IcebergTableUtil;
Expand All @@ -36,24 +39,35 @@
import com.netease.arctic.utils.TablePropertyUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.primitives.Longs;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructLikeMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.LinkedTransferQueue;
import java.util.stream.Collectors;

/** Table maintainer for mixed-iceberg and mixed-hive tables. */
Expand Down Expand Up @@ -120,18 +134,152 @@
baseMaintainer.expireSnapshots(tableRuntime);
}

@Override
public void autoCreateTags(TableRuntime tableRuntime) {
throw new UnsupportedOperationException("Mixed table doesn't support auto create tags");
}

protected void expireSnapshots(long mustOlderThan) {
if (changeMaintainer != null) {
changeMaintainer.expireSnapshots(mustOlderThan);
}
baseMaintainer.expireSnapshots(mustOlderThan);
}

@Override
public void expireData(TableRuntime tableRuntime) {
try {
DataExpirationConfig expirationConfig =
tableRuntime.getTableConfiguration().getExpiringDataConfig();
Types.NestedField field =
arcticTable.schema().findField(expirationConfig.getExpirationField());

Check warning on line 150 in ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java#L147-L150

Added lines #L147 - L150 were not covered by tests
if (!expirationConfig.isValid(field, arcticTable.name())) {
return;

Check warning on line 152 in ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java#L152

Added line #L152 was not covered by tests
}
ZoneId defaultZone = IcebergTableMaintainer.getDefaultZoneId(field);

Check warning on line 154 in ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java#L154

Added line #L154 was not covered by tests
Instant startInstant;
if (expirationConfig.getSince() == DataExpirationConfig.Since.CURRENT_TIMESTAMP) {
startInstant = Instant.now().atZone(defaultZone).toInstant();

Check warning on line 157 in ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java#L157

Added line #L157 was not covered by tests
} else {
long latestBaseTs =
IcebergTableMaintainer.fetchLatestNonOptimizedSnapshotTime(baseMaintainer.getTable());

Check warning on line 160 in ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java#L159-L160

Added lines #L159 - L160 were not covered by tests
long latestChangeTs =
changeMaintainer == null
? Long.MAX_VALUE
: IcebergTableMaintainer.fetchLatestNonOptimizedSnapshotTime(
changeMaintainer.getTable());
long latestNonOptimizedTs = Longs.min(latestChangeTs, latestBaseTs);

Check warning on line 166 in ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java#L164-L166

Added lines #L164 - L166 were not covered by tests

startInstant = Instant.ofEpochMilli(latestNonOptimizedTs).atZone(defaultZone).toInstant();

Check warning on line 168 in ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java#L168

Added line #L168 was not covered by tests
}
expireDataFrom(expirationConfig, startInstant);
} catch (Throwable t) {
LOG.error("Unexpected purge error for table {} ", tableRuntime.getTableIdentifier(), t);
}
}

Check warning on line 174 in ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java#L170-L174

Added lines #L170 - L174 were not covered by tests

@VisibleForTesting
public void expireDataFrom(DataExpirationConfig expirationConfig, Instant instant) {
long expireTimestamp = instant.minusMillis(expirationConfig.getRetentionTime()).toEpochMilli();
Types.NestedField field = arcticTable.schema().findField(expirationConfig.getExpirationField());
LOG.info(
"Expiring data older than {} in mixed table {} ",
Instant.ofEpochMilli(expireTimestamp)
.atZone(IcebergTableMaintainer.getDefaultZoneId(field))
.toLocalDateTime(),
arcticTable.name());

Expression dataFilter =
IcebergTableMaintainer.getDataExpression(
arcticTable.schema(), expirationConfig, expireTimestamp);

Pair<IcebergTableMaintainer.ExpireFiles, IcebergTableMaintainer.ExpireFiles> mixedExpiredFiles =
mixedExpiredFileScan(expirationConfig, dataFilter, expireTimestamp);

expireMixedFiles(mixedExpiredFiles.getLeft(), mixedExpiredFiles.getRight(), expireTimestamp);
}

private Pair<IcebergTableMaintainer.ExpireFiles, IcebergTableMaintainer.ExpireFiles>
mixedExpiredFileScan(
DataExpirationConfig expirationConfig, Expression dataFilter, long expireTimestamp) {
return arcticTable.isKeyedTable()
? keyedExpiredFileScan(expirationConfig, dataFilter, expireTimestamp)
: Pair.of(
null,
getBaseMaintainer().expiredFileScan(expirationConfig, dataFilter, expireTimestamp));
}

private Pair<IcebergTableMaintainer.ExpireFiles, IcebergTableMaintainer.ExpireFiles>
keyedExpiredFileScan(
DataExpirationConfig expirationConfig, Expression dataFilter, long expireTimestamp) {
Map<StructLike, IcebergTableMaintainer.DataFileFreshness> partitionFreshness =
Maps.newConcurrentMap();

KeyedTable keyedTable = arcticTable.asKeyedTable();
ChangeTable changeTable = keyedTable.changeTable();
BaseTable baseTable = keyedTable.baseTable();

CloseableIterable<MixedFileEntry> changeEntries =
CloseableIterable.transform(
changeMaintainer.fileScan(changeTable, dataFilter, expirationConfig),
e -> new MixedFileEntry(e.getFile(), e.getTsBound(), true));
CloseableIterable<MixedFileEntry> baseEntries =
CloseableIterable.transform(
baseMaintainer.fileScan(baseTable, dataFilter, expirationConfig),
e -> new MixedFileEntry(e.getFile(), e.getTsBound(), false));
IcebergTableMaintainer.ExpireFiles changeExpiredFiles =
new IcebergTableMaintainer.ExpireFiles();
IcebergTableMaintainer.ExpireFiles baseExpiredFiles = new IcebergTableMaintainer.ExpireFiles();

try (CloseableIterable<MixedFileEntry> entries =
CloseableIterable.withNoopClose(
com.google.common.collect.Iterables.concat(changeEntries, baseEntries))) {
Queue<MixedFileEntry> fileEntries = new LinkedTransferQueue<>();
entries.forEach(
e -> {
if (IcebergTableMaintainer.mayExpired(e, partitionFreshness, expireTimestamp)) {
fileEntries.add(e);
}
});
fileEntries
.parallelStream()
.filter(
e -> IcebergTableMaintainer.willNotRetain(e, expirationConfig, partitionFreshness))
.forEach(
e -> {
if (e.isChange()) {
changeExpiredFiles.addFile(e);
} else {
baseExpiredFiles.addFile(e);
}
});
} catch (IOException e) {
throw new RuntimeException(e);

Check warning on line 252 in ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java#L251-L252

Added lines #L251 - L252 were not covered by tests
}

return Pair.of(changeExpiredFiles, baseExpiredFiles);
}

private void expireMixedFiles(
IcebergTableMaintainer.ExpireFiles changeFiles,
IcebergTableMaintainer.ExpireFiles baseFiles,
long expireTimestamp) {
Optional.ofNullable(changeMaintainer)
XBaith marked this conversation as resolved.
Show resolved Hide resolved
.ifPresent(
c ->
c.expireFiles(
IcebergTableUtil.getSnapshotId(changeMaintainer.getTable(), false),
changeFiles,
expireTimestamp));
Optional.ofNullable(baseMaintainer)
.ifPresent(
c ->
c.expireFiles(
IcebergTableUtil.getSnapshotId(baseMaintainer.getTable(), false),
baseFiles,
expireTimestamp));
}

@Override
public void autoCreateTags(TableRuntime tableRuntime) {
throw new UnsupportedOperationException("Mixed table doesn't support auto create tags");

Check warning on line 280 in ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java#L280

Added line #L280 was not covered by tests
}

protected void cleanContentFiles(long lastTime) {
if (changeMaintainer != null) {
changeMaintainer.cleanContentFiles(lastTime);
Expand Down Expand Up @@ -341,4 +489,18 @@
return Sets.union(changeFiles, hiveFiles);
}
}

public static class MixedFileEntry extends IcebergTableMaintainer.FileEntry {

private final boolean isChange;

MixedFileEntry(ContentFile<?> file, Literal<Long> tsBound, boolean isChange) {
super(file, tsBound);
this.isChange = isChange;
}

public boolean isChange() {
return isChange;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ public interface TableMaintainer {
*/
void expireSnapshots(TableRuntime tableRuntime);

/**
* Expire historical data based on the expiration field, and data that exceeds the retention
* period will be purged
*
* @param tableRuntime TableRuntime
*/
void expireData(TableRuntime tableRuntime);

/** Auto create tags for table. */
void autoCreateTags(TableRuntime tableRuntime);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Locale;
import java.util.Map;
Expand All @@ -31,6 +33,8 @@
private String dateTimePattern;
// data-expire.datetime-number-format
private String numberDateFormat;
// data-expire.since
private Since since;

@VisibleForTesting
public enum ExpireLevel {
Expand All @@ -47,9 +51,27 @@
}
}

@VisibleForTesting
public enum Since {
LATEST_SNAPSHOT,
CURRENT_TIMESTAMP;

public static Since fromString(String since) {
Preconditions.checkArgument(null != since, "data-expire.since is invalid: null");
try {
return Since.valueOf(since.toUpperCase(Locale.ENGLISH));
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(
String.format("Unable to expire data since: %s", since), e);

Check warning on line 65 in ams/server/src/main/java/com/netease/arctic/server/table/DataExpirationConfig.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/table/DataExpirationConfig.java#L63-L65

Added lines #L63 - L65 were not covered by tests
}
}
}

public static final Set<Type.TypeID> FIELD_TYPES =
Sets.newHashSet(Type.TypeID.TIMESTAMP, Type.TypeID.STRING, Type.TypeID.LONG);

private static final Logger LOG = LoggerFactory.getLogger(DataExpirationConfig.class);

public DataExpirationConfig() {}

public DataExpirationConfig(
Expand All @@ -58,13 +80,15 @@
ExpireLevel expirationLevel,
long retentionTime,
String dateTimePattern,
String numberDateFormat) {
String numberDateFormat,
Since since) {

Check warning on line 84 in ams/server/src/main/java/com/netease/arctic/server/table/DataExpirationConfig.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/table/DataExpirationConfig.java#L84

Added line #L84 was not covered by tests
this.enabled = enabled;
this.expirationField = expirationField;
this.expirationLevel = expirationLevel;
this.retentionTime = retentionTime;
this.dateTimePattern = dateTimePattern;
this.numberDateFormat = numberDateFormat;
this.since = since;

Check warning on line 91 in ams/server/src/main/java/com/netease/arctic/server/table/DataExpirationConfig.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/table/DataExpirationConfig.java#L91

Added line #L91 was not covered by tests
}

public DataExpirationConfig(ArcticTable table) {
Expand Down Expand Up @@ -109,6 +133,12 @@
properties,
TableProperties.DATA_EXPIRATION_DATE_NUMBER_FORMAT,
TableProperties.DATA_EXPIRATION_DATE_NUMBER_FORMAT_DEFAULT);
since =
Since.fromString(
CompatiblePropertyUtil.propertyAsString(
properties,
TableProperties.DATA_EXPIRATION_SINCE,
TableProperties.DATA_EXPIRATION_SINCE_DEFAULT));
}

public static DataExpirationConfig parse(Map<String, String> properties) {
Expand Down Expand Up @@ -137,7 +167,13 @@
CompatiblePropertyUtil.propertyAsString(
properties,
TableProperties.DATA_EXPIRATION_DATE_NUMBER_FORMAT,
TableProperties.DATA_EXPIRATION_DATE_NUMBER_FORMAT_DEFAULT));
TableProperties.DATA_EXPIRATION_DATE_NUMBER_FORMAT_DEFAULT))
.setSince(
Since.fromString(
CompatiblePropertyUtil.propertyAsString(
properties,
TableProperties.DATA_EXPIRATION_SINCE,
TableProperties.DATA_EXPIRATION_SINCE_DEFAULT)));
String retention =
CompatiblePropertyUtil.propertyAsString(
properties, TableProperties.DATA_EXPIRATION_RETENTION_TIME, null);
Expand Down Expand Up @@ -202,6 +238,15 @@
return this;
}

public Since getSince() {
return since;
}

public DataExpirationConfig setSince(Since since) {
this.since = since;
return this;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -216,7 +261,8 @@
&& Objects.equal(expirationField, config.expirationField)
&& expirationLevel == config.expirationLevel
&& Objects.equal(dateTimePattern, config.dateTimePattern)
&& Objects.equal(numberDateFormat, config.numberDateFormat);
&& Objects.equal(numberDateFormat, config.numberDateFormat)
&& since == config.since;
}

@Override
Expand All @@ -227,6 +273,38 @@
expirationLevel,
retentionTime,
dateTimePattern,
numberDateFormat);
numberDateFormat,
since);
}

public boolean isValid(Types.NestedField field, String name) {
return isEnabled()
&& getRetentionTime() > 0
&& validateExpirationField(field, name, getExpirationField());
}

private boolean validateExpirationField(
Types.NestedField field, String name, String expirationField) {
if (StringUtils.isBlank(expirationField) || null == field) {
LOG.warn(
String.format(

Check warning on line 290 in ams/server/src/main/java/com/netease/arctic/server/table/DataExpirationConfig.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/table/DataExpirationConfig.java#L289-L290

Added lines #L289 - L290 were not covered by tests
"Field(%s) used to determine data expiration is illegal for table(%s)",
expirationField, name));
return false;

Check warning on line 293 in ams/server/src/main/java/com/netease/arctic/server/table/DataExpirationConfig.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/table/DataExpirationConfig.java#L293

Added line #L293 was not covered by tests
}
Type.TypeID typeID = field.type().typeId();

Check warning on line 295 in ams/server/src/main/java/com/netease/arctic/server/table/DataExpirationConfig.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/table/DataExpirationConfig.java#L295

Added line #L295 was not covered by tests
if (!DataExpirationConfig.FIELD_TYPES.contains(typeID)) {
LOG.warn(
String.format(

Check warning on line 298 in ams/server/src/main/java/com/netease/arctic/server/table/DataExpirationConfig.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/table/DataExpirationConfig.java#L297-L298

Added lines #L297 - L298 were not covered by tests
"Table(%s) field(%s) type(%s) is not supported for data expiration, please use the "
+ "following types: %s",
name,
expirationField,
typeID.name(),
StringUtils.join(DataExpirationConfig.FIELD_TYPES, ", ")));
return false;

Check warning on line 305 in ams/server/src/main/java/com/netease/arctic/server/table/DataExpirationConfig.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/table/DataExpirationConfig.java#L303-L305

Added lines #L303 - L305 were not covered by tests
}

return true;

Check warning on line 308 in ams/server/src/main/java/com/netease/arctic/server/table/DataExpirationConfig.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/table/DataExpirationConfig.java#L308

Added line #L308 was not covered by tests
}
}
Loading