diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java
index fd28f2de6e..376e22ec71 100644
--- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java
+++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java
@@ -24,12 +24,16 @@
 import com.netease.arctic.server.optimizing.OptimizingType;
 import com.netease.arctic.server.table.TableRuntime;
 import com.netease.arctic.table.ArcticTable;
+import com.netease.arctic.utils.ArcticTableUtil;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.util.BinPacking;
+import org.apache.iceberg.util.Pair;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -41,7 +45,7 @@ public abstract class AbstractPartitionPlan implements PartitionEvaluator {
 
-  protected final String partition;
+  protected final Pair<Integer, StructLike> partition;
   protected final OptimizingConfig config;
   protected final TableRuntime tableRuntime;
   private CommonPartitionEvaluator evaluator;
@@ -69,7 +73,10 @@ public abstract class AbstractPartitionPlan implements PartitionEvaluator {
   protected final Set<String> reservedDeleteFiles = Sets.newHashSet();
 
   public AbstractPartitionPlan(
-      TableRuntime tableRuntime, ArcticTable table, String partition, long planTime) {
+      TableRuntime tableRuntime,
+      ArcticTable table,
+      Pair<Integer, StructLike> partition,
+      long planTime) {
     this.partition = partition;
     this.tableObject = table;
     this.config = tableRuntime.getOptimizingConfig();
@@ -78,7 +85,7 @@ public AbstractPartitionPlan(
   }
 
   @Override
-  public String getPartition() {
+  public Pair<Integer, StructLike> getPartition() {
     return partition;
   }
 
@@ -280,8 +287,14 @@ public TaskDescriptor buildTask(OptimizingInputProperties properties) {
             readOnlyDeleteFiles.toArray(new ContentFile[0]),
             rewriteDeleteFiles.toArray(new ContentFile[0]),
             tableObject);
+    PartitionSpec spec =
+        ArcticTableUtil.getArcticTablePartitionSpecById(tableObject, partition.first());
+    String partitionPath = spec.partitionToPath(partition.second());
     return new TaskDescriptor(
-        tableRuntime.getTableIdentifier().getId(), partition, input, properties.getProperties());
+        tableRuntime.getTableIdentifier().getId(),
+        partitionPath,
+        input,
+        properties.getProperties());
   }
 }
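[Reviewer note, not part of the patch] A partition is now identified by a (spec id, partition data) pair instead of a rendered partition-path string, so a plan stays unambiguous even when the partition spec evolves. A minimal sketch of recovering a display path from the pair; the `plan` and `table` variables are assumed to be in scope:

    // assumed in scope: AbstractPartitionPlan plan; ArcticTable table;
    Pair<Integer, StructLike> partition = plan.getPartition();
    PartitionSpec spec =
        ArcticTableUtil.getArcticTablePartitionSpecById(table, partition.first());
    // e.g. "op_time_day=2022-01-01" for a day-partitioned table
    String partitionPath = spec.partitionToPath(partition.second());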
diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/CommonPartitionEvaluator.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/CommonPartitionEvaluator.java
index 023f9b819d..68fd4db67e 100644
--- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/CommonPartitionEvaluator.java
+++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/CommonPartitionEvaluator.java
@@ -24,9 +24,11 @@
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileContent;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,7 +41,7 @@ public class CommonPartitionEvaluator implements PartitionEvaluator {
   private final Set<String> deleteFileSet = Sets.newHashSet();
 
   protected final TableRuntime tableRuntime;
-  private final String partition;
+  private final Pair<Integer, StructLike> partition;
   protected final OptimizingConfig config;
   protected final long fragmentSize;
   protected final long minTargetSize;
@@ -72,7 +74,8 @@ public class CommonPartitionEvaluator implements PartitionEvaluator {
   private OptimizingType optimizingType = null;
   private String name;
 
-  public CommonPartitionEvaluator(TableRuntime tableRuntime, String partition, long planTime) {
+  public CommonPartitionEvaluator(
+      TableRuntime tableRuntime, Pair<Integer, StructLike> partition, long planTime) {
     this.partition = partition;
     this.tableRuntime = tableRuntime;
     this.config = tableRuntime.getOptimizingConfig();
@@ -91,7 +94,7 @@ public CommonPartitionEvaluator(
   }
 
   @Override
-  public String getPartition() {
+  public Pair<Integer, StructLike> getPartition() {
     return partition;
   }

diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/IcebergPartitionPlan.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/IcebergPartitionPlan.java
index e701e6f25d..4d9db0f284 100644
--- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/IcebergPartitionPlan.java
+++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/IcebergPartitionPlan.java
@@ -22,6 +22,8 @@
 import com.netease.arctic.optimizing.OptimizingInputProperties;
 import com.netease.arctic.server.table.TableRuntime;
 import com.netease.arctic.table.ArcticTable;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.util.Pair;
 
 import java.util.List;
 import java.util.stream.Collectors;
@@ -29,7 +31,10 @@ public class IcebergPartitionPlan extends AbstractPartitionPlan {
 
   protected IcebergPartitionPlan(
-      TableRuntime tableRuntime, ArcticTable table, String partition, long planTime) {
+      TableRuntime tableRuntime,
+      ArcticTable table,
+      Pair<Integer, StructLike> partition,
+      long planTime) {
     super(tableRuntime, table, partition, planTime);
   }

diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedHivePartitionPlan.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedHivePartitionPlan.java
index 7ea5c67ec8..24edbfc686 100644
--- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedHivePartitionPlan.java
+++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedHivePartitionPlan.java
@@ -27,7 +27,9 @@
 import com.netease.arctic.table.ArcticTable;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.Pair;
 
 import java.util.List;
 import java.util.Map;
@@ -40,7 +42,7 @@ public class MixedHivePartitionPlan extends MixedIcebergPartitionPlan {
   public MixedHivePartitionPlan(
       TableRuntime tableRuntime,
       ArcticTable table,
-      String partition,
+      Pair<Integer, StructLike> partition,
       String hiveLocation,
       long planTime) {
     super(tableRuntime, table, partition, planTime);
@@ -120,7 +122,7 @@ protected static class MixedHivePartitionEvaluator extends MixedIcebergPartition
 
     public MixedHivePartitionEvaluator(
         TableRuntime tableRuntime,
-        String partition,
+        Pair<Integer, StructLike> partition,
         Map<String, String> partitionProperties,
         String hiveLocation,
         long planTime,
diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedIcebergPartitionPlan.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedIcebergPartitionPlan.java
index 0dc68ed3e9..5c58c6bdbf 100644
--- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedIcebergPartitionPlan.java
+++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedIcebergPartitionPlan.java
@@ -30,9 +30,11 @@
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileContent;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.Pair;
 
 import javax.annotation.Nonnull;
 
@@ -47,9 +49,12 @@ public class MixedIcebergPartitionPlan extends AbstractPartitionPlan {
   protected final Map<String, String> partitionProperties;
 
   public MixedIcebergPartitionPlan(
-      TableRuntime tableRuntime, ArcticTable table, String partition, long planTime) {
+      TableRuntime tableRuntime,
+      ArcticTable table,
+      Pair<Integer, StructLike> partition,
+      long planTime) {
     super(tableRuntime, table, partition, planTime);
-    this.partitionProperties = TablePropertyUtil.getPartitionProperties(table, partition);
+    this.partitionProperties = TablePropertyUtil.getPartitionProperties(table, partition.second());
   }
 
   @Override
@@ -106,7 +111,7 @@ protected static class MixedIcebergPartitionEvaluator extends CommonPartitionEva
 
     public MixedIcebergPartitionEvaluator(
         TableRuntime tableRuntime,
-        String partition,
+        Pair<Integer, StructLike> partition,
         Map<String, String> partitionProperties,
         long planTime,
         boolean keyedTable) {

diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/OptimizingEvaluator.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/OptimizingEvaluator.java
index 406d190826..52797e5ef8 100644
--- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/OptimizingEvaluator.java
+++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/OptimizingEvaluator.java
@@ -18,6 +18,7 @@
 
 package com.netease.arctic.server.optimizing.plan;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.netease.arctic.ams.api.TableFormat;
 import com.netease.arctic.hive.table.SupportHive;
 import com.netease.arctic.server.optimizing.scan.IcebergTableFileScanHelper;
@@ -29,13 +30,17 @@
 import com.netease.arctic.server.table.TableSnapshot;
 import com.netease.arctic.server.utils.IcebergTableUtil;
 import com.netease.arctic.table.ArcticTable;
+import com.netease.arctic.utils.ArcticTableUtil;
 import com.netease.arctic.utils.TablePropertyUtil;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.StructLike;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -98,8 +103,8 @@ protected void initEvaluator() {
         System.currentTimeMillis() - startTime);
   }
 
-  protected TableFileScanHelper.PartitionFilter getPartitionFilter() {
-    return null;
+  protected Expression getPartitionFilter() {
+    return Expressions.alwaysTrue();
   }
 
   private void initPartitionPlans(TableFileScanHelper tableFileScanHelper) {
@@ -108,17 +113,15 @@ private void initPartitionPlans(TableFileScanHelper tableFileScanHelper) {
     try (CloseableIterable<TableFileScanHelper.FileScanResult> results =
         tableFileScanHelper.scan()) {
       for (TableFileScanHelper.FileScanResult fileScanResult : results) {
-        PartitionSpec partitionSpec;
-        if (arcticTable.format() == TableFormat.ICEBERG) {
-          partitionSpec = arcticTable.asUnkeyedTable().specs().get(fileScanResult.file().specId());
-        } else {
-          partitionSpec = arcticTable.spec();
-        }
-
+        PartitionSpec partitionSpec =
+            ArcticTableUtil.getArcticTablePartitionSpecById(
+                arcticTable, fileScanResult.file().specId());
         StructLike partition = fileScanResult.file().partition();
         String partitionPath = partitionSpec.partitionToPath(partition);
         PartitionEvaluator evaluator =
-            partitionPlanMap.computeIfAbsent(partitionPath, this::buildEvaluator);
+            partitionPlanMap.computeIfAbsent(
+                partitionPath,
+                ignore -> buildEvaluator(Pair.of(partitionSpec.specId(), partition)));
         evaluator.addFile(fileScanResult.file(), fileScanResult.deleteFiles());
         count++;
       }
@@ -133,20 +136,20 @@ private void initPartitionPlans(TableFileScanHelper tableFileScanHelper) {
     partitionPlanMap.values().removeIf(plan -> !plan.isNecessary());
   }
 
-  private Map<String, String> partitionProperties(String partitionPath) {
-    return TablePropertyUtil.getPartitionProperties(arcticTable, partitionPath);
+  private Map<String, String> partitionProperties(Pair<Integer, StructLike> partition) {
+    return TablePropertyUtil.getPartitionProperties(arcticTable, partition.second());
   }
 
-  protected PartitionEvaluator buildEvaluator(String partitionPath) {
+  protected PartitionEvaluator buildEvaluator(Pair<Integer, StructLike> partition) {
     if (TableFormat.ICEBERG == arcticTable.format()) {
-      return new CommonPartitionEvaluator(tableRuntime, partitionPath, System.currentTimeMillis());
+      return new CommonPartitionEvaluator(tableRuntime, partition, System.currentTimeMillis());
     } else {
-      Map<String, String> partitionProperties = partitionProperties(partitionPath);
+      Map<String, String> partitionProperties = partitionProperties(partition);
       if (com.netease.arctic.hive.utils.TableTypeUtil.isHive(arcticTable)) {
         String hiveLocation = (((SupportHive) arcticTable).hiveLocation());
         return new MixedHivePartitionPlan.MixedHivePartitionEvaluator(
             tableRuntime,
-            partitionPath,
+            partition,
             partitionProperties,
             hiveLocation,
             System.currentTimeMillis(),
@@ -154,7 +157,7 @@ protected PartitionEvaluator buildEvaluator(
       } else {
         return new MixedIcebergPartitionPlan.MixedIcebergPartitionEvaluator(
             tableRuntime,
-            partitionPath,
+            partition,
             partitionProperties,
             System.currentTimeMillis(),
             arcticTable.isKeyedTable());
@@ -178,7 +181,7 @@ public PendingInput getPendingInput() {
 
   public static class PendingInput {
 
-    private final Set<String> partitions = Sets.newHashSet();
+    @JsonIgnore private final Map<Integer, Set<StructLike>> partitions = Maps.newHashMap();
 
     private int dataFileCount = 0;
     private long dataFileSize = 0;
@@ -191,7 +194,9 @@ public PendingInput() {}
 
     public PendingInput(Collection<PartitionEvaluator> evaluators) {
       for (PartitionEvaluator evaluator : evaluators) {
-        partitions.add(evaluator.getPartition());
+        partitions
+            .computeIfAbsent(evaluator.getPartition().first(), ignore -> Sets.newHashSet())
+            .add(evaluator.getPartition().second());
         dataFileCount += evaluator.getFragmentFileCount() + evaluator.getSegmentFileCount();
         dataFileSize += evaluator.getFragmentFileSize() + evaluator.getSegmentFileSize();
         positionalDeleteBytes += evaluator.getPosDeleteFileSize();
@@ -201,7 +206,7 @@ public PendingInput(Collection<PartitionEvaluator> evaluators) {
       }
     }
 
-    public Set<String> getPartitions() {
+    public Map<Integer, Set<StructLike>> getPartitions() {
       return partitions;
     }

diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/OptimizingPlanner.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/OptimizingPlanner.java
index 7bdd1f323c..c89d0feaec 100644
--- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/OptimizingPlanner.java
+++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/OptimizingPlanner.java
@@ -22,11 +22,17 @@
 import com.netease.arctic.hive.table.SupportHive;
 import com.netease.arctic.server.ArcticServiceConstants;
 import com.netease.arctic.server.optimizing.OptimizingType;
-import com.netease.arctic.server.optimizing.scan.TableFileScanHelper;
 import com.netease.arctic.server.table.KeyedTableSnapshot;
 import com.netease.arctic.server.table.TableRuntime;
 import com.netease.arctic.table.ArcticTable;
+import com.netease.arctic.utils.ArcticTableUtil;
+import com.netease.arctic.utils.ExpressionUtil;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,7 +46,7 @@ public class OptimizingPlanner extends OptimizingEvaluator {
   private static final Logger LOG = LoggerFactory.getLogger(OptimizingPlanner.class);
 
-  private final TableFileScanHelper.PartitionFilter partitionFilter;
+  private final Expression partitionFilter;
 
   protected long processId;
   private final double availableCore;
@@ -60,8 +66,14 @@ public OptimizingPlanner(
     super(tableRuntime, table);
     this.partitionFilter =
         tableRuntime.getPendingInput() == null
-            ? null
-            : tableRuntime.getPendingInput().getPartitions()::contains;
+            ? Expressions.alwaysTrue()
+            : tableRuntime.getPendingInput().getPartitions().entrySet().stream()
+                .map(
+                    entry ->
+                        ExpressionUtil.convertPartitionDataToDataFilter(
+                            table, entry.getKey(), entry.getValue()))
+                .reduce(Expressions::or)
+                .orElse(Expressions.alwaysTrue());
     this.availableCore = availableCore;
     this.planTime = System.currentTimeMillis();
     this.processId = Math.max(tableRuntime.getNewestProcessId() + 1, planTime);
@@ -70,8 +82,8 @@ public OptimizingPlanner(
   }
 
   @Override
-  protected PartitionEvaluator buildEvaluator(String partitionPath) {
-    return partitionPlannerFactory.buildPartitionPlanner(partitionPath);
+  protected PartitionEvaluator buildEvaluator(Pair<Integer, StructLike> partition) {
+    return partitionPlannerFactory.buildPartitionPlanner(partition);
   }
 
   public Map<String, Long> getFromSequence() {
@@ -79,7 +91,14 @@ public Map<String, Long> getFromSequence() {
         .filter(p -> p.getFromSequence() != null)
         .collect(
             Collectors.toMap(
-                AbstractPartitionPlan::getPartition, AbstractPartitionPlan::getFromSequence));
+                partitionPlan -> {
+                  Pair<Integer, StructLike> partition = partitionPlan.getPartition();
+                  PartitionSpec spec =
+                      ArcticTableUtil.getArcticTablePartitionSpecById(
+                          arcticTable, partition.first());
+                  return spec.partitionToPath(partition.second());
+                },
+                AbstractPartitionPlan::getFromSequence));
   }
 
   public Map<String, Long> getToSequence() {
@@ -87,11 +106,18 @@ public Map<String, Long> getToSequence() {
         .filter(p -> p.getToSequence() != null)
         .collect(
             Collectors.toMap(
-                AbstractPartitionPlan::getPartition, AbstractPartitionPlan::getToSequence));
+                partitionPlan -> {
+                  Pair<Integer, StructLike> partition = partitionPlan.getPartition();
+                  PartitionSpec spec =
+                      ArcticTableUtil.getArcticTablePartitionSpecById(
+                          arcticTable, partition.first());
+                  return spec.partitionToPath(partition.second());
+                },
+                AbstractPartitionPlan::getToSequence));
   }
 
   @Override
-  protected TableFileScanHelper.PartitionFilter getPartitionFilter() {
+  protected Expression getPartitionFilter() {
     return partitionFilter;
   }
 
@@ -207,15 +233,15 @@ public PartitionPlannerFactory(
       }
     }
 
-    public PartitionEvaluator buildPartitionPlanner(String partitionPath) {
+    public PartitionEvaluator buildPartitionPlanner(Pair<Integer, StructLike> partition) {
       if (TableFormat.ICEBERG == arcticTable.format()) {
-        return new IcebergPartitionPlan(tableRuntime, arcticTable, partitionPath, planTime);
+        return new IcebergPartitionPlan(tableRuntime, arcticTable, partition, planTime);
       } else {
         if (com.netease.arctic.hive.utils.TableTypeUtil.isHive(arcticTable)) {
           return new MixedHivePartitionPlan(
-              tableRuntime, arcticTable, partitionPath, hiveLocation, planTime);
+              tableRuntime, arcticTable, partition, hiveLocation, planTime);
         } else {
-          return new MixedIcebergPartitionPlan(tableRuntime, arcticTable, partitionPath, planTime);
+          return new MixedIcebergPartitionPlan(tableRuntime, arcticTable, partition, planTime);
         }
       }
     }
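[Reviewer note, not part of the patch] The pending input now records partitions per spec id, and the planner folds them into a single Iceberg Expression. The same reduction in isolation, assuming a hypothetical `pendingPartitions` map and a `table` in scope:

    // assumed in scope: Map<Integer, Set<StructLike>> pendingPartitions; ArcticTable table;
    Expression filter =
        pendingPartitions.entrySet().stream()
            .map(e -> ExpressionUtil.convertPartitionDataToDataFilter(table, e.getKey(), e.getValue()))
            .reduce(Expressions::or)           // a file may match any pending partition
            .orElse(Expressions.alwaysTrue()); // no pending input: scan everything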
diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/PartitionEvaluator.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/PartitionEvaluator.java
index 2f4ee9ea61..46056e3b0b 100644
--- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/PartitionEvaluator.java
+++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/PartitionEvaluator.java
@@ -21,6 +21,9 @@
 import com.netease.arctic.server.optimizing.OptimizingType;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.util.Pair;
 
 import java.util.List;
 
@@ -34,11 +37,12 @@ public interface PartitionEvaluator {
   interface Weight extends Comparable<Weight> {}
 
   /**
-   * Get the partition name.
+   * Get the partition represented by a Pair of {@link PartitionSpec#specId()} and partition {@link
+   * StructLike}.
    *
-   * @return the partition name
+   * @return the Pair of the partition spec id and the partition
    */
-  String getPartition();
+  Pair<Integer, StructLike> getPartition();
 
   /**
    * Add a Data file and its related Delete files to this evaluator

diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/scan/IcebergTableFileScanHelper.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/scan/IcebergTableFileScanHelper.java
index 41e38c56a9..a444a78ac2 100644
--- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/scan/IcebergTableFileScanHelper.java
+++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/scan/IcebergTableFileScanHelper.java
@@ -20,17 +20,15 @@
 import com.netease.arctic.server.ArcticServiceConstants;
 import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
-import java.util.Map;
-
 public class IcebergTableFileScanHelper implements TableFileScanHelper {
   private final Table table;
-  private PartitionFilter partitionFilter;
+  private Expression partitionFilter = Expressions.alwaysTrue();
   private final long snapshotId;
 
   public IcebergTableFileScanHelper(Table table, long snapshotId) {
@@ -43,19 +41,8 @@ public CloseableIterable<FileScanResult> scan() {
     if (snapshotId == ArcticServiceConstants.INVALID_SNAPSHOT_ID) {
       return CloseableIterable.empty();
     }
-    Map<Integer, PartitionSpec> specs = table.specs();
     return CloseableIterable.transform(
-        CloseableIterable.filter(
-            table.newScan().useSnapshot(snapshotId).planFiles(),
-            fileScanTask -> {
-              if (partitionFilter != null) {
-                StructLike partition = fileScanTask.file().partition();
-                String partitionPath =
-                    specs.get(fileScanTask.file().specId()).partitionToPath(partition);
-                return partitionFilter.test(partitionPath);
-              }
-              return true;
-            }),
+        table.newScan().useSnapshot(snapshotId).filter(partitionFilter).planFiles(),
         this::buildFileScanResult);
   }
 
@@ -64,7 +51,7 @@ protected FileScanResult buildFileScanResult(FileScanTask fileScanTask) {
   }
 
   @Override
-  public TableFileScanHelper withPartitionFilter(PartitionFilter partitionFilter) {
+  public TableFileScanHelper withPartitionFilter(Expression partitionFilter) {
     this.partitionFilter = partitionFilter;
     return this;
   }
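[Reviewer note, not part of the patch] The scan helper no longer post-filters planned tasks by partition path; it hands the Expression to Iceberg, which can prune whole manifests during planning. Equivalent standalone usage, assuming an Iceberg `table`, a `snapshotId`, and a `filter` built as above:

    CloseableIterable<FileScanTask> tasks =
        table.newScan()
            .useSnapshot(snapshotId) // assumed snapshot id
            .filter(filter)          // pruned at planning time, not afterwards
            .planFiles();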
diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/scan/KeyedTableFileScanHelper.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/scan/KeyedTableFileScanHelper.java
index a33521d466..0eead65dd6 100644
--- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/scan/KeyedTableFileScanHelper.java
+++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/scan/KeyedTableFileScanHelper.java
@@ -37,7 +37,8 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotSummary;
-import org.apache.iceberg.StructLike;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -65,7 +66,7 @@ public class KeyedTableFileScanHelper implements TableFileScanHelper {
   private final KeyedTable arcticTable;
   private final long changeSnapshotId;
   private final long baseSnapshotId;
-  private PartitionFilter partitionFilter;
+  private Expression partitionFilter = Expressions.alwaysTrue();
 
   public KeyedTableFileScanHelper(KeyedTable arcticTable, KeyedTableSnapshot snapshot) {
     this.arcticTable = arcticTable;
@@ -171,6 +172,7 @@ public CloseableIterable<FileScanResult> scan() {
       ChangeTableIncrementalScan changeTableIncrementalScan =
           changeTable
               .newScan()
+              .filter(partitionFilter)
               .fromSequence(optimizedSequence)
               .toSequence(maxSequence)
               .useSnapshot(changeSnapshotId);
@@ -187,7 +189,6 @@ public CloseableIterable<FileScanResult> scan() {
           CloseableIterable.withNoopClose(
               changeFiles
                   .allInsertFiles()
-                  .filter(insertFile -> filterFilePartition(partitionSpec, insertFile))
                   .map(
                       insertFile -> {
                         List<ContentFile<?>> relatedDeleteFiles =
@@ -200,12 +201,9 @@ public CloseableIterable<FileScanResult> scan() {
 
     CloseableIterable<FileScanResult> baseScanResult = CloseableIterable.empty();
     if (baseSnapshotId != ArcticServiceConstants.INVALID_SNAPSHOT_ID) {
-      PartitionSpec partitionSpec = baseTable.spec();
       baseScanResult =
           CloseableIterable.transform(
-              CloseableIterable.filter(
-                  baseTable.newScan().useSnapshot(baseSnapshotId).planFiles(),
-                  fileScanTask -> filterFilePartition(partitionSpec, fileScanTask.file())),
+              baseTable.newScan().filter(partitionFilter).useSnapshot(baseSnapshotId).planFiles(),
              fileScanTask -> {
                 DataFile dataFile = wrapBaseFile(fileScanTask.file());
                 List<ContentFile<?>> deleteFiles = new ArrayList<>(fileScanTask.deletes());
@@ -220,7 +218,7 @@ public CloseableIterable<FileScanResult> scan() {
   }
 
   @Override
-  public KeyedTableFileScanHelper withPartitionFilter(PartitionFilter partitionFilter) {
+  public KeyedTableFileScanHelper withPartitionFilter(Expression partitionFilter) {
     this.partitionFilter = partitionFilter;
     return this;
   }
@@ -233,16 +231,6 @@ private DataFile wrapBaseFile(DataFile dataFile) {
     return DefaultKeyedFile.parseBase(dataFile);
   }
 
-  private boolean filterFilePartition(PartitionSpec partitionSpec, ContentFile<?> file) {
-    if (partitionFilter != null) {
-      StructLike partition = file.partition();
-      String partitionPath = partitionSpec.partitionToPath(partition);
-      return partitionFilter.test(partitionPath);
-    } else {
-      return true;
-    }
-  }
-
   private long getMaxSequenceLimit(
       KeyedTable arcticTable,
       long changeSnapshotId,
@@ -265,6 +253,7 @@ private long getMaxSequenceLimit(
     ChangeTableIncrementalScan changeTableIncrementalScan =
         changeTable
             .newScan()
+            .filter(partitionFilter)
            .fromSequence(partitionOptimizedSequence)
            .useSnapshot(changeSnapshot.snapshotId());
     Map changeFilesGroupBySequence = new HashMap<>();

diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/scan/TableFileScanHelper.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/scan/TableFileScanHelper.java
index d101d4afba..2e2f4acef6 100644
--- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/scan/TableFileScanHelper.java
+++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/scan/TableFileScanHelper.java
@@ -20,6 +20,7 @@
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;
 
 import java.util.List;
@@ -43,17 +44,7 @@ public List<ContentFile<?>> deleteFiles() {
     }
   }
 
-  interface PartitionFilter {
-    /**
-     * If we should keep or skip this partition
-     *
-     * @param partition -
-     * @return true for keep this partition
-     */
-    boolean test(String partition);
-  }
-
   CloseableIterable<FileScanResult> scan();
 
-  TableFileScanHelper withPartitionFilter(PartitionFilter partitionFilter);
+  TableFileScanHelper withPartitionFilter(Expression partitionFilter);
 }
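[Reviewer note, not part of the patch] Call sites move from a path-string predicate to a column-level expression; roughly, the before/after looks like this (the `helper`, `table`, and `sampleFile` names are illustrative):

    // before: string comparison against the rendered partition path
    // helper.withPartitionFilter(path -> "op_time_day=2022-01-01".equals(path));

    // after: an expression Iceberg evaluates against file metadata
    helper.withPartitionFilter(
        ExpressionUtil.convertPartitionDataToDataFilter(
            table, sampleFile.specId(), sampleFile.partition()));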
"op_time_day=2022-01-01" : ""; } diff --git a/ams/server/src/test/java/com/netease/arctic/server/optimizing/plan/TestOptimizingEvaluator.java b/ams/server/src/test/java/com/netease/arctic/server/optimizing/plan/TestOptimizingEvaluator.java index 3829013a1f..4a9cdee5f9 100644 --- a/ams/server/src/test/java/com/netease/arctic/server/optimizing/plan/TestOptimizingEvaluator.java +++ b/ams/server/src/test/java/com/netease/arctic/server/optimizing/plan/TestOptimizingEvaluator.java @@ -28,9 +28,10 @@ import com.netease.arctic.server.optimizing.scan.TableFileScanHelper; import com.netease.arctic.server.optimizing.scan.UnkeyedTableFileScanHelper; import org.apache.iceberg.DataFile; -import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; import org.apache.iceberg.data.Record; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.junit.Assert; import org.junit.Test; @@ -38,6 +39,7 @@ import org.junit.runners.Parameterized; import java.util.List; +import java.util.Map; import java.util.Set; @RunWith(Parameterized.class) @@ -104,7 +106,7 @@ public void testFragmentFiles() { Assert.assertTrue(optimizingEvaluator.isNecessary()); pendingInput = optimizingEvaluator.getPendingInput(); - assertInput(pendingInput, FileInfo.buildFileInfo(getArcticTable().spec(), dataFiles)); + assertInput(pendingInput, FileInfo.buildFileInfo(dataFiles)); } protected OptimizingEvaluator buildOptimizingEvaluator() { @@ -133,7 +135,7 @@ protected void assertInput(OptimizingEvaluator.PendingInput input, FileInfo file } private static class FileInfo { - private final Set partitions = Sets.newHashSet(); + private final Map> partitions = Maps.newHashMap(); private int dataFileCount = 0; private long dataFileSize = 0; private final int equalityDeleteFileCount = 0; @@ -141,17 +143,20 @@ private static class FileInfo { private final long positionalDeleteBytes = 0L; private final long equalityDeleteBytes = 0L; - public static FileInfo buildFileInfo(PartitionSpec spec, List dataFiles) { + public static FileInfo buildFileInfo(List dataFiles) { FileInfo fileInfo = new FileInfo(); for (DataFile dataFile : dataFiles) { fileInfo.dataFileCount++; fileInfo.dataFileSize += dataFile.fileSizeInBytes(); - fileInfo.partitions.add(spec.partitionToPath(dataFile.partition())); + fileInfo + .partitions + .computeIfAbsent(dataFile.specId(), ignore -> Sets.newHashSet()) + .add(dataFile.partition()); } return fileInfo; } - public Set getPartitions() { + public Map> getPartitions() { return partitions; } diff --git a/ams/server/src/test/java/com/netease/arctic/server/optimizing/scan/TestKeyedTableFileScanHelper.java b/ams/server/src/test/java/com/netease/arctic/server/optimizing/scan/TestKeyedTableFileScanHelper.java index 4daa7a629a..23b5e4a290 100644 --- a/ams/server/src/test/java/com/netease/arctic/server/optimizing/scan/TestKeyedTableFileScanHelper.java +++ b/ams/server/src/test/java/com/netease/arctic/server/optimizing/scan/TestKeyedTableFileScanHelper.java @@ -30,6 +30,7 @@ import com.netease.arctic.server.utils.IcebergTableUtil; import com.netease.arctic.table.KeyedTable; import com.netease.arctic.table.TableProperties; +import com.netease.arctic.utils.ExpressionUtil; import org.apache.iceberg.AppendFiles; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; @@ -125,15 +126,19 @@ public void testScanChangeAndBase() { 
             tableTestHelper().generateTestRecord(4, "444", 0, "2022-01-02T12:00:00"));
     long transactionId = getArcticTable().beginTransaction("");
-    OptimizingTestHelpers.appendBase(
-        getArcticTable(),
-        tableTestHelper().writeBaseStore(getArcticTable(), transactionId, newRecords, false));
+    List<DataFile> dataFiles =
+        OptimizingTestHelpers.appendBase(
+            getArcticTable(),
+            tableTestHelper().writeBaseStore(getArcticTable(), transactionId, newRecords, false));
+    // partition field = "2022-01-01T12:00:00"
+    DataFile sampleFile = dataFiles.get(0);
 
     transactionId = getArcticTable().beginTransaction("");
-    appendChange(
+    List<DataFile> dataFiles1 =
         tableTestHelper()
             .writeChangeStore(
-                getArcticTable(), transactionId, ChangeAction.DELETE, newRecords, false));
+                getArcticTable(), transactionId, ChangeAction.DELETE, newRecords, false);
+    appendChange(dataFiles1);
 
     newRecords =
         Lists.newArrayList(
@@ -156,7 +161,9 @@ public void testScanChangeAndBase() {
     scan =
         scanFiles(
             buildFileScanHelper()
-                .withPartitionFilter(partition -> getPartition().equals(partition)));
+                .withPartitionFilter(
+                    ExpressionUtil.convertPartitionDataToDataFilter(
+                        getArcticTable(), sampleFile.specId(), sampleFile.partition())));
     if (isPartitionedTable()) {
       assertScanResult(scan, 4, 1);
     } else {

diff --git a/ams/server/src/test/java/com/netease/arctic/server/optimizing/scan/TestUnkeyedTableFileScanHelper.java b/ams/server/src/test/java/com/netease/arctic/server/optimizing/scan/TestUnkeyedTableFileScanHelper.java
index 832bd50056..346cbce9cf 100644
--- a/ams/server/src/test/java/com/netease/arctic/server/optimizing/scan/TestUnkeyedTableFileScanHelper.java
+++ b/ams/server/src/test/java/com/netease/arctic/server/optimizing/scan/TestUnkeyedTableFileScanHelper.java
@@ -27,6 +27,7 @@
 import com.netease.arctic.server.optimizing.OptimizingTestHelpers;
 import com.netease.arctic.server.utils.IcebergTableUtil;
 import com.netease.arctic.table.UnkeyedTable;
+import com.netease.arctic.utils.ExpressionUtil;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.data.Record;
@@ -83,9 +84,13 @@ public void testScan() {
             tableTestHelper().generateTestRecord(2, "222", 0, "2022-01-01T12:00:00"),
             tableTestHelper().generateTestRecord(3, "333", 0, "2022-01-02T12:00:00"),
             tableTestHelper().generateTestRecord(4, "444", 0, "2022-01-02T12:00:00"));
-    OptimizingTestHelpers.appendBase(
-        getArcticTable(),
-        tableTestHelper().writeBaseStore(getArcticTable(), 0L, newRecords, false));
+    List<DataFile> dataFiles =
+        OptimizingTestHelpers.appendBase(
+            getArcticTable(),
+            tableTestHelper().writeBaseStore(getArcticTable(), 0L, newRecords, false));
+    // partition field = "2022-01-01T12:00:00"
+    DataFile sampleFile = dataFiles.get(0);
+
     OptimizingTestHelpers.appendBase(
         getArcticTable(),
         tableTestHelper().writeBaseStore(getArcticTable(), 0L, newRecords, false));
@@ -102,7 +107,9 @@ public void testScan() {
     scan =
         scanFiles(
             buildFileScanHelper()
-                .withPartitionFilter(partition -> getPartition().equals(partition)));
+                .withPartitionFilter(
+                    ExpressionUtil.convertPartitionDataToDataFilter(
+                        getArcticTable(), sampleFile.specId(), sampleFile.partition())));
     assertScanResult(scan, 2, null, 0);
   }

diff --git a/core/src/main/java/com/netease/arctic/utils/ArcticTableUtil.java b/core/src/main/java/com/netease/arctic/utils/ArcticTableUtil.java
index 3a998c079f..ff7ca520a8 100644
--- a/core/src/main/java/com/netease/arctic/utils/ArcticTableUtil.java
+++ b/core/src/main/java/com/netease/arctic/utils/ArcticTableUtil.java
@@ -23,6 +23,7 @@
 import com.netease.arctic.table.KeyedTable;
 import com.netease.arctic.table.TableProperties;
 import com.netease.arctic.table.UnkeyedTable;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.StatisticsFile;
 import org.apache.iceberg.Table;
@@ -172,4 +173,21 @@ private static StructLikeMap<Map<String, String>> readLegacyPartitionProperties(
       throw new IllegalArgumentException("Unknown type: " + type);
     }
   }
+
+  /**
+   * Return the {@link PartitionSpec} of the Arctic table for the given {@link
+   * PartitionSpec#specId()}. A mixed-format table has only one partition spec, which is returned
+   * directly after checking that the id matches.
+   */
+  public static PartitionSpec getArcticTablePartitionSpecById(ArcticTable arcticTable, int specId) {
+    if (arcticTable.format() == TableFormat.ICEBERG) {
+      return arcticTable.asUnkeyedTable().specs().get(specId);
+    } else {
+      PartitionSpec spec = arcticTable.spec();
+      if (spec.specId() != specId) {
+        throw new IllegalArgumentException(
+            "Partition spec id " + specId + " not found in table " + arcticTable.name());
+      }
+      return spec;
+    }
+  }
 }
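[Reviewer note, not part of the patch] A native Iceberg table may carry several historical partition specs, while a mixed-format table has exactly one, hence the id check above. A usage sketch, with `arcticTable` and `dataFile` assumed in scope:

    // Iceberg format: looked up in Table#specs(), possibly a historical spec;
    // mixed format: the single current spec, after verifying the id matches.
    PartitionSpec spec =
        ArcticTableUtil.getArcticTablePartitionSpecById(arcticTable, dataFile.specId());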
diff --git a/core/src/main/java/com/netease/arctic/utils/ExpressionUtil.java b/core/src/main/java/com/netease/arctic/utils/ExpressionUtil.java
new file mode 100644
index 0000000000..244e931e48
--- /dev/null
+++ b/core/src/main/java/com/netease/arctic/utils/ExpressionUtil.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netease.arctic.utils;
+
+import com.netease.arctic.table.ArcticTable;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.UnboundTerm;
+import org.apache.iceberg.types.Types;
+
+import java.util.Collection;
+
+/** Utility class for working with {@link Expression}. */
+public class ExpressionUtil {
+
+  /**
+   * Convert partition data to data filter.
+   *
+   * @param table the {@link ArcticTable} table
+   * @param specId the partition spec id
+   * @param partitions the collection of partition data
+   * @return data filter converted from partition data
+   */
+  public static Expression convertPartitionDataToDataFilter(
+      ArcticTable table, int specId, Collection<StructLike> partitions) {
+    Expression filter = Expressions.alwaysFalse();
+    for (StructLike partition : partitions) {
+      filter = Expressions.or(filter, convertPartitionDataToDataFilter(table, specId, partition));
+    }
+    return filter;
+  }
+
+  /**
+   * Convert partition data to data filter.
+   *
+   * @param table the {@link ArcticTable} table
+   * @param specId the partition spec id
+   * @param partition the partition data
+   * @return data filter converted from partition data
+   */
+  public static Expression convertPartitionDataToDataFilter(
+      ArcticTable table, int specId, StructLike partition) {
+    PartitionSpec spec = ArcticTableUtil.getArcticTablePartitionSpecById(table, specId);
+    Schema schema = table.schema();
+    Expression filter = Expressions.alwaysTrue();
+    for (int i = 0; i < spec.fields().size(); i++) {
+      PartitionField partitionField = spec.fields().get(i);
+      Types.NestedField sourceField = schema.findField(partitionField.sourceId());
+      UnboundTerm transform =
+          Expressions.transform(sourceField.name(), partitionField.transform());
+
+      Class<?> resultType =
+          partitionField.transform().getResultType(sourceField.type()).typeId().javaClass();
+      Object partitionValue = partition.get(i, resultType);
+      filter = Expressions.and(filter, Expressions.equal(transform, partitionValue));
+    }
+    return filter;
+  }
+}

diff --git a/core/src/main/java/com/netease/arctic/utils/TablePropertyUtil.java b/core/src/main/java/com/netease/arctic/utils/TablePropertyUtil.java
index 49fb92a795..2cb7eca7dd 100644
--- a/core/src/main/java/com/netease/arctic/utils/TablePropertyUtil.java
+++ b/core/src/main/java/com/netease/arctic/utils/TablePropertyUtil.java
@@ -100,23 +100,12 @@ public static StructLikeMap<Long> getPartitionLongProperties(
   }
 
   public static Map<String, String> getPartitionProperties(
-      ArcticTable arcticTable, String partitionPath) {
+      ArcticTable arcticTable, StructLike partition) {
     return getPartitionProperties(
         arcticTable.isKeyedTable()
             ? arcticTable.asKeyedTable().baseTable()
             : arcticTable.asUnkeyedTable(),
-        partitionPath);
-  }
-
-  public static Map<String, String> getPartitionProperties(
-      UnkeyedTable unkeyedTable, String partitionPath) {
-    StructLike partitionData;
-    if (unkeyedTable.spec().isUnpartitioned()) {
-      partitionData = TablePropertyUtil.EMPTY_STRUCT;
-    } else {
-      partitionData = ArcticDataFiles.data(unkeyedTable.spec(), partitionPath);
-    }
-    return getPartitionProperties(unkeyedTable, partitionData);
+        partition);
   }
 
   public static Map<String, String> getPartitionProperties(
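[Reviewer note, not part of the patch] For every partition field the method rebuilds the predicate transform(sourceColumn) == storedPartitionValue. For a spec with day("op_time") and a partition holding the day ordinal of 2022-01-01, the produced filter is roughly this hand-built expression (column name taken from the test schema; the snippet additionally assumes java.time.LocalDate):

    // day(...) stores an int day ordinal since 1970-01-01; the generated filter
    // compares the transformed source column against that stored value
    Expression equivalent =
        Expressions.equal(
            Expressions.day("op_time"),
            (int) LocalDate.of(2022, 1, 1).toEpochDay());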
diff --git a/core/src/test/java/com/netease/arctic/BasicTableTestHelper.java b/core/src/test/java/com/netease/arctic/BasicTableTestHelper.java
index e1b6641ddb..6a51a00a46 100644
--- a/core/src/test/java/com/netease/arctic/BasicTableTestHelper.java
+++ b/core/src/test/java/com/netease/arctic/BasicTableTestHelper.java
@@ -92,6 +92,14 @@ public BasicTableTestHelper(boolean hasPrimaryKey, boolean hasPartition) {
     this(hasPrimaryKey, hasPartition, DEFAULT_FILE_FORMAT_DEFAULT);
   }
 
+  public BasicTableTestHelper(boolean hasPrimaryKey, PartitionSpec partitionSpec) {
+    this(
+        TABLE_SCHEMA,
+        hasPrimaryKey ? PRIMARY_KEY_SPEC : PrimaryKeySpec.noPrimaryKey(),
+        partitionSpec,
+        buildTableFormat(DEFAULT_FILE_FORMAT_DEFAULT));
+  }
+
   @Override
   public Schema tableSchema() {
     return tableSchema;

diff --git a/core/src/test/java/com/netease/arctic/data/TestUpsertPushDown.java b/core/src/test/java/com/netease/arctic/data/TestUpsertPushDown.java
index 129aa40b60..2cc93cc6be 100644
--- a/core/src/test/java/com/netease/arctic/data/TestUpsertPushDown.java
+++ b/core/src/test/java/com/netease/arctic/data/TestUpsertPushDown.java
@@ -76,26 +76,41 @@ public static Object[] parameters() {
   @Before
   public void initChangeStoreData() {
     MixedDataTestHelpers.writeAndCommitChangeStore(
-        getArcticTable().asKeyedTable(), 1L, ChangeAction.DELETE, writeRecords(1, "aaa", 0, 1));
+        getArcticTable().asKeyedTable(),
+        1L,
+        ChangeAction.DELETE,
+        writeRecords(1, "aaa", 0, 1),
+        false);
     MixedDataTestHelpers.writeAndCommitChangeStore(
         getArcticTable().asKeyedTable(),
         2L,
         ChangeAction.UPDATE_AFTER,
-        writeRecords(1, "aaa", 0, 1));
+        writeRecords(1, "aaa", 0, 1),
+        false);
     MixedDataTestHelpers.writeAndCommitChangeStore(
-        getArcticTable().asKeyedTable(), 3L, ChangeAction.DELETE, writeRecords(2, "bbb", 0, 2));
+        getArcticTable().asKeyedTable(),
+        3L,
+        ChangeAction.DELETE,
+        writeRecords(2, "bbb", 0, 2),
+        false);
     MixedDataTestHelpers.writeAndCommitChangeStore(
         getArcticTable().asKeyedTable(),
         3L,
         ChangeAction.UPDATE_AFTER,
-        writeRecords(2, "bbb", 0, 2));
+        writeRecords(2, "bbb", 0, 2),
+        false);
     MixedDataTestHelpers.writeAndCommitChangeStore(
-        getArcticTable().asKeyedTable(), 4L, ChangeAction.DELETE, writeRecords(2, "ccc", 0, 2));
+        getArcticTable().asKeyedTable(),
+        4L,
+        ChangeAction.DELETE,
+        writeRecords(2, "ccc", 0, 2),
+        false);
     MixedDataTestHelpers.writeAndCommitChangeStore(
         getArcticTable().asKeyedTable(),
         5L,
         ChangeAction.UPDATE_AFTER,
-        writeRecords(2, "ccc", 0, 2));
+        writeRecords(2, "ccc", 0, 2),
+        false);
   }
 
   @Test

diff --git a/core/src/test/java/com/netease/arctic/io/MixedDataTestHelpers.java b/core/src/test/java/com/netease/arctic/io/MixedDataTestHelpers.java
index ba47e192ae..63775dc6c5 100644
--- a/core/src/test/java/com/netease/arctic/io/MixedDataTestHelpers.java
+++ b/core/src/test/java/com/netease/arctic/io/MixedDataTestHelpers.java
@@ -200,8 +200,12 @@ public static List<DataFile> writeAndCommitBaseStore(
   }
 
   public static List<DataFile> writeAndCommitChangeStore(
-      KeyedTable keyedTable, Long txId, ChangeAction action, List<Record> records) {
-    List<DataFile> writeFiles = writeChangeStore(keyedTable, txId, action, records, false);
+      KeyedTable keyedTable,
+      Long txId,
+      ChangeAction action,
+      List<Record> records,
+      boolean orderedWrite) {
+    List<DataFile> writeFiles = writeChangeStore(keyedTable, txId, action, records, orderedWrite);
     AppendFiles appendFiles = keyedTable.changeTable().newAppend();
     writeFiles.forEach(appendFiles::appendFile);
     appendFiles.commit();

diff --git a/core/src/test/java/com/netease/arctic/utils/TestKeyedExpressionUtil.java b/core/src/test/java/com/netease/arctic/utils/TestKeyedExpressionUtil.java
new file mode 100644
index 0000000000..a50e3084a4
--- /dev/null
+++ b/core/src/test/java/com/netease/arctic/utils/TestKeyedExpressionUtil.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netease.arctic.utils;
+
+import static com.netease.arctic.BasicTableTestHelper.TABLE_SCHEMA;
+
+import com.netease.arctic.BasicTableTestHelper;
+import com.netease.arctic.ams.api.TableFormat;
+import com.netease.arctic.catalog.BasicCatalogTestHelper;
+import com.netease.arctic.catalog.TableTestBase;
+import com.netease.arctic.data.ChangeAction;
+import com.netease.arctic.io.MixedDataTestHelpers;
+import com.netease.arctic.scan.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+@RunWith(Parameterized.class)
+public class TestKeyedExpressionUtil extends TableTestBase {
+
+  public TestKeyedExpressionUtil(PartitionSpec partitionSpec) {
+    super(
+        new BasicCatalogTestHelper(TableFormat.MIXED_ICEBERG),
+        new BasicTableTestHelper(true, partitionSpec));
+  }
+
+  @Parameterized.Parameters(name = "{0}")
+  public static Object[] parameters() {
+    return new Object[][] {
+      {PartitionSpec.builderFor(TABLE_SCHEMA).identity("op_time").build()},
+      {PartitionSpec.builderFor(TABLE_SCHEMA).bucket("name", 2).build()},
+      {PartitionSpec.builderFor(TABLE_SCHEMA).truncate("ts", 10).build()},
+      {PartitionSpec.builderFor(TABLE_SCHEMA).year("op_time").build()},
+      {PartitionSpec.builderFor(TABLE_SCHEMA).month("op_time").build()},
+      {PartitionSpec.builderFor(TABLE_SCHEMA).day("op_time").build()},
+      {PartitionSpec.builderFor(TABLE_SCHEMA).hour("op_time").build()},
+      {PartitionSpec.unpartitioned()}
+    };
+  }
+
+  @Test
+  public void testKeyedConvertPartitionStructLikeToDataFilter() {
+    Assume.assumeTrue(isKeyedTable());
+    ArrayList<Record> baseStoreRecords =
+        Lists.newArrayList(
+            // hash("111") = -210118348, hash("222") = -699778209
+            tableTestHelper().generateTestRecord(1, "111", 1, "2021-01-01T01:00:00"),
+            tableTestHelper().generateTestRecord(2, "111", 1, "2021-01-01T01:00:00"),
+            tableTestHelper().generateTestRecord(3, "222", 11, "2022-02-02T02:00:00"),
+            tableTestHelper().generateTestRecord(4, "222", 11, "2022-02-02T02:00:00"));
+    ArrayList<Record> changeStoreRecords =
+        Lists.newArrayList(
+            tableTestHelper().generateTestRecord(5, "111", 1, "2021-01-01T01:00:00"),
+            tableTestHelper().generateTestRecord(6, "111", 1, "2021-01-01T01:00:00"),
+            tableTestHelper().generateTestRecord(7, "222", 11, "2022-02-02T02:00:00"),
+            tableTestHelper().generateTestRecord(8, "222", 11, "2022-02-02T02:00:00"));
+    // 4 files
+    List<DataFile> baseStoreFiles =
+        MixedDataTestHelpers.writeAndCommitBaseStore(getArcticTable(), 1L, baseStoreRecords, true);
+    // identity: opTime=2021-01-01T01:00:00; bucket: id_bucket=0;
+    // truncate: ts_trunc=0; year: op_time_year=2021; month: op_time_month=2021-01;
+    // day: op_time_day=2021-01-01; hour: op_time_hour=2021-01-01-01
+    DataFile sampleFile = baseStoreFiles.get(0);
+
+    MixedDataTestHelpers.writeAndCommitChangeStore(
+        getArcticTable().asKeyedTable(), 2L, ChangeAction.INSERT, changeStoreRecords, true);
+
+    Expression partitionFilter =
+        ExpressionUtil.convertPartitionDataToDataFilter(
+            getArcticTable(), sampleFile.specId(), Sets.newHashSet(sampleFile.partition()));
+    assertPlanHalfWithPartitionFilter(partitionFilter);
+  }
+
+  private void assertPlanHalfWithPartitionFilter(Expression partitionFilter) {
+    // plan all
+    Set<DataFile> baseDataFiles = Sets.newHashSet();
+    Set<DataFile> insertFiles = Sets.newHashSet();
+    try (CloseableIterable<CombinedScanTask> it =
+        getArcticTable().asKeyedTable().newScan().planTasks()) {
+      it.forEach(
+          cst ->
+              cst.tasks()
+                  .forEach(
+                      t -> {
+                        t.baseTasks().forEach(fileTask -> baseDataFiles.add(fileTask.file()));
+                        t.insertTasks().forEach(fileTask -> insertFiles.add(fileTask.file()));
+                      }));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    Assert.assertEquals(4, baseDataFiles.size());
+    Assert.assertEquals(4, insertFiles.size());
+    baseDataFiles.clear();
+    insertFiles.clear();
+
+    // plan with partition filter
+    try (CloseableIterable<CombinedScanTask> it =
+        getArcticTable().asKeyedTable().newScan().filter(partitionFilter).planTasks()) {
+      it.forEach(
+          cst ->
+              cst.tasks()
+                  .forEach(
+                      t -> {
+                        t.baseTasks().forEach(fileTask -> baseDataFiles.add(fileTask.file()));
+                        t.insertTasks().forEach(fileTask -> insertFiles.add(fileTask.file()));
+                      }));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    if (isPartitionedTable()) {
+      Assert.assertEquals(2, baseDataFiles.size());
+      Assert.assertEquals(2, insertFiles.size());
+    } else {
+      Assert.assertEquals(4, baseDataFiles.size());
+      Assert.assertEquals(4, insertFiles.size());
+    }
+  }
+}

diff --git a/core/src/test/java/com/netease/arctic/utils/TestUnkeyedExpressionUtil.java b/core/src/test/java/com/netease/arctic/utils/TestUnkeyedExpressionUtil.java
new file mode 100644
index 0000000000..2aa26b1979
--- /dev/null
+++ b/core/src/test/java/com/netease/arctic/utils/TestUnkeyedExpressionUtil.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netease.arctic.utils;
+
+import static com.netease.arctic.BasicTableTestHelper.TABLE_SCHEMA;
+
+import com.netease.arctic.BasicTableTestHelper;
+import com.netease.arctic.ams.api.TableFormat;
+import com.netease.arctic.catalog.BasicCatalogTestHelper;
+import com.netease.arctic.catalog.TableTestBase;
+import com.netease.arctic.io.IcebergDataTestHelpers;
+import com.netease.arctic.table.UnkeyedTable;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Set;
+
+@RunWith(Parameterized.class)
+public class TestUnkeyedExpressionUtil extends TableTestBase {
+
+  public TestUnkeyedExpressionUtil(TableFormat tableFormat, PartitionSpec partitionSpec) {
+    super(new BasicCatalogTestHelper(tableFormat), new BasicTableTestHelper(false, partitionSpec));
+  }
+
+  @Parameterized.Parameters(name = "{0}, {1}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+      {TableFormat.ICEBERG, PartitionSpec.builderFor(TABLE_SCHEMA).identity("op_time").build()},
+      {TableFormat.ICEBERG, PartitionSpec.builderFor(TABLE_SCHEMA).bucket("name", 2).build()},
+      {TableFormat.ICEBERG, PartitionSpec.builderFor(TABLE_SCHEMA).truncate("ts", 10).build()},
+      {TableFormat.ICEBERG, PartitionSpec.builderFor(TABLE_SCHEMA).year("op_time").build()},
+      {TableFormat.ICEBERG, PartitionSpec.builderFor(TABLE_SCHEMA).month("op_time").build()},
+      {TableFormat.ICEBERG, PartitionSpec.builderFor(TABLE_SCHEMA).day("op_time").build()},
+      {TableFormat.ICEBERG, PartitionSpec.builderFor(TABLE_SCHEMA).hour("op_time").build()},
+      {TableFormat.ICEBERG, PartitionSpec.unpartitioned()},
+      {
+        TableFormat.MIXED_ICEBERG,
+        PartitionSpec.builderFor(TABLE_SCHEMA).identity("op_time").build()
+      },
+      {TableFormat.MIXED_ICEBERG, PartitionSpec.builderFor(TABLE_SCHEMA).bucket("name", 2).build()},
+      {
+        TableFormat.MIXED_ICEBERG, PartitionSpec.builderFor(TABLE_SCHEMA).truncate("ts", 10).build()
+      },
+      {TableFormat.MIXED_ICEBERG, PartitionSpec.builderFor(TABLE_SCHEMA).year("op_time").build()},
+      {TableFormat.MIXED_ICEBERG, PartitionSpec.builderFor(TABLE_SCHEMA).month("op_time").build()},
+      {TableFormat.MIXED_ICEBERG, PartitionSpec.builderFor(TABLE_SCHEMA).day("op_time").build()},
+      {TableFormat.MIXED_ICEBERG, PartitionSpec.builderFor(TABLE_SCHEMA).hour("op_time").build()},
+      {TableFormat.MIXED_ICEBERG, PartitionSpec.unpartitioned()}
+    };
+  }
+
+  @Test
+  public void testUnkeyedConvertPartitionStructLikeToDataFilter() throws IOException {
+    Assume.assumeTrue(getArcticTable().isUnkeyedTable());
+    UnkeyedTable table = getArcticTable().asUnkeyedTable();
+    ArrayList<Record> records =
+        Lists.newArrayList(
+            // hash("111") = -210118348, hash("222") = -699778209
+            tableTestHelper().generateTestRecord(1, "111", 1, "2021-01-01T01:00:00"),
+            tableTestHelper().generateTestRecord(2, "111", 1, "2021-01-01T01:00:00"),
+            tableTestHelper().generateTestRecord(3, "222", 11, "2022-02-02T02:00:00"),
+            tableTestHelper().generateTestRecord(4, "222", 11, "2022-02-02T02:00:00"));
"2022-02-02T02:00:00")); + // 2 files for partition table, 1 file for unpartition table + DataFile[] dataFiles = IcebergDataTestHelpers.insert(table, records).dataFiles(); + AppendFiles appendFiles = table.newAppend(); + for (DataFile dataFile : dataFiles) { + appendFiles.appendFile(dataFile); + } + appendFiles.commit(); + // identity: opTime=2021-01-01T01:00:00; bucket: id_bucket=0; + // truncate: ts_trunc=0; year: op_time_year=2021; month: op_time_month=2021-01; + // day: op_time_day=2021-01-01; hour: op_time_hour=2021-01-01-01 + DataFile sampleFile = dataFiles[0]; + Expression partitionFilter = + ExpressionUtil.convertPartitionDataToDataFilter( + getArcticTable(), sampleFile.specId(), Sets.newHashSet(sampleFile.partition())); + assertPlanHalfWithPartitionFilter(partitionFilter); + } + + private void assertPlanHalfWithPartitionFilter(Expression partitionFilter) { + // plan all + Set baseDataFiles = Sets.newHashSet(); + try (CloseableIterable fileScanTasks = + getArcticTable().asUnkeyedTable().newScan().planFiles()) { + for (FileScanTask fileScanTask : fileScanTasks) { + baseDataFiles.add(fileScanTask.file()); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + if (isPartitionedTable()) { + Assert.assertEquals(2, baseDataFiles.size()); + } else { + Assert.assertEquals(1, baseDataFiles.size()); + } + baseDataFiles.clear(); + + // plan with partition filter + try (CloseableIterable fileScanTasks = + getArcticTable().asUnkeyedTable().newScan().filter(partitionFilter).planFiles()) { + for (FileScanTask fileScanTask : fileScanTasks) { + baseDataFiles.add(fileScanTask.file()); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + if (isPartitionedTable()) { + Assert.assertEquals(1, baseDataFiles.size()); + } else { + Assert.assertEquals(1, baseDataFiles.size()); + } + } +}