Skip to content

Commit

Permalink
address review
Browse files Browse the repository at this point in the history
  • Loading branch information
XBaith committed Dec 10, 2024
1 parent 204793a commit b82e209
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Schema;
Expand All @@ -65,6 +64,7 @@
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ThreadPools;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -244,19 +244,14 @@ public void expireData(TableRuntime tableRuntime) {
try {
DataExpirationConfig expirationConfig =
tableRuntime.getTableConfiguration().getExpiringDataConfig();
Types.NestedField field = table.schema().findField(expirationConfig.getExpirationField());
if (!TableConfigurations.isValidDataExpirationField(expirationConfig, field, table.name())) {
return;
}

expireDataFrom(expirationConfig, expireBaseOnRule(expirationConfig, field));
expireDataFrom(expirationConfig, expireBaseOnRule(expirationConfig));
} catch (Throwable t) {
LOG.error("Unexpected purge error for table {} ", tableRuntime.getTableIdentifier(), t);
}
}

protected Instant expireBaseOnRule(
DataExpirationConfig expirationConfig, Types.NestedField field) {
protected Instant expireBaseOnRule(DataExpirationConfig expirationConfig) {
switch (expirationConfig.getBaseOnRule()) {
case CURRENT_TIME:
return Instant.now();
Expand All @@ -283,22 +278,23 @@ protected Instant expireBaseOnRule(
* zone
*/
@VisibleForTesting
public void expireDataFrom(DataExpirationConfig expirationConfig, Instant instant) {
public void expireDataFrom(DataExpirationConfig expirationConfig, @NotNull Instant instant) {
if (instant.equals(Instant.MIN)) {
return;
}
Types.NestedField field = table.schema().findField(expirationConfig.getExpirationField());
if (TableConfigurations.isInvalidDataExpirationField(
expirationConfig, field, table.spec(), table.name())) {
return;
}

long expireTimestamp = instant.minusMillis(expirationConfig.getRetentionTime()).toEpochMilli();
LOG.info(
"Expiring data older than {} in table {} ",
Instant.ofEpochMilli(expireTimestamp)
.atZone(
getDefaultZoneId(table.schema().findField(expirationConfig.getExpirationField())))
.toLocalDateTime(),
Instant.ofEpochMilli(expireTimestamp).atZone(getDefaultZoneId(field)).toLocalDateTime(),
table.name());

Expression dataFilter =
getDataExpression(table.schema(), table.spec(), expirationConfig, expireTimestamp);
Expression dataFilter = getDataExpression(table.schema(), expirationConfig, expireTimestamp);

ExpireFiles expiredFiles = expiredFileScan(expirationConfig, dataFilter, expireTimestamp);
expireFiles(expiredFiles, expireTimestamp);
Expand Down Expand Up @@ -725,23 +721,12 @@ protected ExpireFiles expiredFileScan(
* filter for expired files at the scanning stage
*
* @param schema table schema
* @param spec current partition spec
* @param expirationConfig expiration configuration
* @param expireTimestamp expired timestamp
*/
protected static Expression getDataExpression(
Schema schema,
PartitionSpec spec,
DataExpirationConfig expirationConfig,
long expireTimestamp) {
Schema schema, DataExpirationConfig expirationConfig, long expireTimestamp) {
if (expirationConfig.getExpirationLevel().equals(DataExpirationConfig.ExpireLevel.PARTITION)) {
Set<String> currentPartitionColumns =
spec.fields().stream()
.map(p -> schema.findColumnName(p.sourceId()))
.collect(Collectors.toSet());
if (!currentPartitionColumns.contains(expirationConfig.getExpirationField())) {
return Expressions.alwaysFalse();
}
return Expressions.alwaysTrue();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructLikeMap;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -121,26 +122,19 @@ public void expireData(TableRuntime tableRuntime) {
try {
DataExpirationConfig expirationConfig =
tableRuntime.getTableConfiguration().getExpiringDataConfig();
Types.NestedField field =
mixedTable.schema().findField(expirationConfig.getExpirationField());
if (!TableConfigurations.isValidDataExpirationField(
expirationConfig, field, mixedTable.name())) {
return;
}

expireDataFrom(expirationConfig, expireMixedBaseOnRule(expirationConfig, field));
expireDataFrom(expirationConfig, expireMixedBaseOnRule(expirationConfig));
} catch (Throwable t) {
LOG.error("Unexpected purge error for table {} ", tableRuntime.getTableIdentifier(), t);
}
}

protected Instant expireMixedBaseOnRule(
DataExpirationConfig expirationConfig, Types.NestedField field) {
protected Instant expireMixedBaseOnRule(DataExpirationConfig expirationConfig) {
Instant changeInstant =
Optional.ofNullable(changeMaintainer).isPresent()
? changeMaintainer.expireBaseOnRule(expirationConfig, field)
? changeMaintainer.expireBaseOnRule(expirationConfig)
: Instant.MIN;
Instant baseInstant = baseMaintainer.expireBaseOnRule(expirationConfig, field);
Instant baseInstant = baseMaintainer.expireBaseOnRule(expirationConfig);
if (changeInstant.compareTo(baseInstant) >= 0) {
return changeInstant;
} else {
Expand All @@ -149,13 +143,17 @@ protected Instant expireMixedBaseOnRule(
}

@VisibleForTesting
public void expireDataFrom(DataExpirationConfig expirationConfig, Instant instant) {
public void expireDataFrom(DataExpirationConfig expirationConfig, @NotNull Instant instant) {
if (instant.equals(Instant.MIN)) {
return;
}
Types.NestedField field = mixedTable.schema().findField(expirationConfig.getExpirationField());
if (TableConfigurations.isInvalidDataExpirationField(
expirationConfig, field, mixedTable.spec(), mixedTable.name())) {
return;
}

long expireTimestamp = instant.minusMillis(expirationConfig.getRetentionTime()).toEpochMilli();
Types.NestedField field = mixedTable.schema().findField(expirationConfig.getExpirationField());
LOG.info(
"Expiring data older than {} in mixed table {} ",
Instant.ofEpochMilli(expireTimestamp)
Expand All @@ -165,7 +163,7 @@ public void expireDataFrom(DataExpirationConfig expirationConfig, Instant instan

Expression dataFilter =
IcebergTableMaintainer.getDataExpression(
mixedTable.schema(), mixedTable.spec(), expirationConfig, expireTimestamp);
mixedTable.schema(), expirationConfig, expireTimestamp);

Pair<IcebergTableMaintainer.ExpireFiles, IcebergTableMaintainer.ExpireFiles> mixedExpiredFiles =
mixedExpiredFileScan(expirationConfig, dataFilter, expireTimestamp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.amoro.utils.CompatiblePropertyUtil;
import org.apache.amoro.utils.PropertyUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.slf4j.Logger;
Expand Down Expand Up @@ -168,42 +169,52 @@ private static DataExpirationConfig.BaseOnRule parseDataExpirationBaseOnRule(Str
}

/**
* Check if the given field is valid for data expiration.
* Check if the given field is invalid for data expiration.
*
* @param config data expiration config
* @param field table nested field
* @param name table name
* @return true if field is valid
* @param tableName table name
* @return true if field is invalid
*/
public static boolean isValidDataExpirationField(
DataExpirationConfig config, Types.NestedField field, String name) {
return config.isEnabled()
&& config.getRetentionTime() > 0
&& validateExpirationField(field, name, config.getExpirationField());
public static boolean isInvalidDataExpirationField(
DataExpirationConfig config, Types.NestedField field, PartitionSpec spec, String tableName) {
return !config.isEnabled()
|| config.getRetentionTime() <= 0
|| !validateExpirationField(config, field, spec, tableName);
}

public static final Set<Type.TypeID> DATA_EXPIRATION_FIELD_TYPES =
Sets.newHashSet(Type.TypeID.TIMESTAMP, Type.TypeID.STRING, Type.TypeID.LONG);

private static boolean validateExpirationField(
Types.NestedField field, String name, String expirationField) {
DataExpirationConfig config, Types.NestedField field, PartitionSpec spec, String tableName) {
String expirationField = config.getExpirationField();

if (StringUtils.isBlank(expirationField) || null == field) {
LOG.warn(
String.format(
"Field(%s) used to determine data expiration is illegal for table(%s)",
expirationField, name));
"Field({}) used to determine data expiration is illegal for table({})",
expirationField,
tableName);
return false;
}
Type.TypeID typeID = field.type().typeId();
if (!DATA_EXPIRATION_FIELD_TYPES.contains(typeID)) {
LOG.warn(
String.format(
"Table(%s) field(%s) type(%s) is not supported for data expiration, please use the "
+ "following types: %s",
name,
expirationField,
typeID.name(),
StringUtils.join(DATA_EXPIRATION_FIELD_TYPES, ", ")));
"Table({}) field({}) type({}) is not supported for data expiration, please use the "
+ "following types: {}",
tableName,
expirationField,
typeID.name(),
StringUtils.join(DATA_EXPIRATION_FIELD_TYPES, ", "));
return false;
}
DataExpirationConfig.ExpireLevel level = config.getExpirationLevel();
if (level == DataExpirationConfig.ExpireLevel.PARTITION
&& spec.getFieldsBySourceId(field.fieldId()).isEmpty()) {
LOG.warn(
"Expiration field({}) must be a partition field for the table({})",
expirationField,
tableName);
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,9 +487,8 @@ public void testBaseOnRule() {
if (getTestFormat().equals(TableFormat.ICEBERG)) {
Table table = getMixedTable().asUnkeyedTable();
IcebergTableMaintainer icebergTableMaintainer = new IcebergTableMaintainer(table);
Types.NestedField field = table.schema().findField(config.getExpirationField());
long lastSnapshotTime = table.currentSnapshot().timestampMillis();
long lastCommitTime = icebergTableMaintainer.expireBaseOnRule(config, field).toEpochMilli();
long lastCommitTime = icebergTableMaintainer.expireBaseOnRule(config).toEpochMilli();
Assert.assertEquals(lastSnapshotTime, lastCommitTime);
} else {
MixedTable mixedTable = getMixedTable();
Expand All @@ -510,8 +509,7 @@ public void testBaseOnRule() {
} else {
lastSnapshotTime = mixedTable.asUnkeyedTable().currentSnapshot().timestampMillis();
}
long lastCommitTime =
mixedTableMaintainer.expireMixedBaseOnRule(config, field).toEpochMilli();
long lastCommitTime = mixedTableMaintainer.expireMixedBaseOnRule(config).toEpochMilli();
Assert.assertEquals(lastSnapshotTime, lastCommitTime);
}
}
Expand All @@ -524,23 +522,19 @@ protected void getMaintainerAndExpire(DataExpirationConfig config, String dateti
icebergTableMaintainer.expireDataFrom(
config,
StringUtils.isBlank(datetime)
? icebergTableMaintainer.expireBaseOnRule(config, field)
? icebergTableMaintainer.expireBaseOnRule(config)
: LocalDateTime.parse(datetime)
.atZone(
IcebergTableMaintainer.getDefaultZoneId(
getMixedTable().schema().findField(config.getExpirationField())))
.atZone(IcebergTableMaintainer.getDefaultZoneId(field))
.toInstant());
} else {
MixedTableMaintainer mixedTableMaintainer = new MixedTableMaintainer(getMixedTable());
Types.NestedField field = getMixedTable().schema().findField(config.getExpirationField());
mixedTableMaintainer.expireDataFrom(
config,
StringUtils.isBlank(datetime)
? mixedTableMaintainer.expireMixedBaseOnRule(config, field)
? mixedTableMaintainer.expireMixedBaseOnRule(config)
: LocalDateTime.parse(datetime)
.atZone(
IcebergTableMaintainer.getDefaultZoneId(
getMixedTable().schema().findField(config.getExpirationField())))
.atZone(IcebergTableMaintainer.getDefaultZoneId(field))
.toInstant());
}
}
Expand Down

0 comments on commit b82e209

Please sign in to comment.