Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[AMORO-2417] Support partition filter on optimizing plan #2436

Merged
merged 12 commits
Dec 27, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,16 @@
import com.netease.arctic.server.optimizing.OptimizingType;
import com.netease.arctic.server.table.TableRuntime;
import com.netease.arctic.table.ArcticTable;
import com.netease.arctic.utils.ArcticTableUtil;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.BinPacking;
import org.apache.iceberg.util.Pair;

import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -41,7 +45,7 @@

public abstract class AbstractPartitionPlan implements PartitionEvaluator {

protected final String partition;
protected final Pair<Integer, StructLike> partition;
protected final OptimizingConfig config;
protected final TableRuntime tableRuntime;
private CommonPartitionEvaluator evaluator;
Expand Down Expand Up @@ -69,7 +73,10 @@ public abstract class AbstractPartitionPlan implements PartitionEvaluator {
protected final Set<String> reservedDeleteFiles = Sets.newHashSet();

public AbstractPartitionPlan(
TableRuntime tableRuntime, ArcticTable table, String partition, long planTime) {
TableRuntime tableRuntime,
ArcticTable table,
Pair<Integer, StructLike> partition,
long planTime) {
this.partition = partition;
this.tableObject = table;
this.config = tableRuntime.getOptimizingConfig();
Expand All @@ -78,7 +85,7 @@ public AbstractPartitionPlan(
}

@Override
public String getPartition() {
public Pair<Integer, StructLike> getPartition() {
return partition;
}

Expand Down Expand Up @@ -280,8 +287,14 @@ public TaskDescriptor buildTask(OptimizingInputProperties properties) {
readOnlyDeleteFiles.toArray(new ContentFile[0]),
rewriteDeleteFiles.toArray(new ContentFile[0]),
tableObject);
PartitionSpec spec =
ArcticTableUtil.getArcticTablePartitionSpecById(tableObject, partition.first());
String partitionPath = spec.partitionToPath(partition.second());
return new TaskDescriptor(
tableRuntime.getTableIdentifier().getId(), partition, input, properties.getProperties());
tableRuntime.getTableIdentifier().getId(),
partitionPath,
input,
properties.getProperties());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -39,7 +41,7 @@ public class CommonPartitionEvaluator implements PartitionEvaluator {
private final Set<String> deleteFileSet = Sets.newHashSet();
protected final TableRuntime tableRuntime;

private final String partition;
private final Pair<Integer, StructLike> partition;
protected final OptimizingConfig config;
protected final long fragmentSize;
protected final long minTargetSize;
Expand Down Expand Up @@ -72,7 +74,8 @@ public class CommonPartitionEvaluator implements PartitionEvaluator {
private OptimizingType optimizingType = null;
private String name;

public CommonPartitionEvaluator(TableRuntime tableRuntime, String partition, long planTime) {
public CommonPartitionEvaluator(
TableRuntime tableRuntime, Pair<Integer, StructLike> partition, long planTime) {
this.partition = partition;
this.tableRuntime = tableRuntime;
this.config = tableRuntime.getOptimizingConfig();
Expand All @@ -91,7 +94,7 @@ public CommonPartitionEvaluator(TableRuntime tableRuntime, String partition, lon
}

@Override
public String getPartition() {
public Pair<Integer, StructLike> getPartition() {
return partition;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,19 @@
import com.netease.arctic.optimizing.OptimizingInputProperties;
import com.netease.arctic.server.table.TableRuntime;
import com.netease.arctic.table.ArcticTable;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.util.Pair;

import java.util.List;
import java.util.stream.Collectors;

public class IcebergPartitionPlan extends AbstractPartitionPlan {

protected IcebergPartitionPlan(
TableRuntime tableRuntime, ArcticTable table, String partition, long planTime) {
TableRuntime tableRuntime,
ArcticTable table,
Pair<Integer, StructLike> partition,
long planTime) {
super(tableRuntime, table, partition, planTime);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import com.netease.arctic.table.ArcticTable;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.Pair;

import java.util.List;
import java.util.Map;
Expand All @@ -40,7 +42,7 @@ public class MixedHivePartitionPlan extends MixedIcebergPartitionPlan {
public MixedHivePartitionPlan(
TableRuntime tableRuntime,
ArcticTable table,
String partition,
Pair<Integer, StructLike> partition,
String hiveLocation,
long planTime) {
super(tableRuntime, table, partition, planTime);
Expand Down Expand Up @@ -120,7 +122,7 @@ protected static class MixedHivePartitionEvaluator extends MixedIcebergPartition

public MixedHivePartitionEvaluator(
TableRuntime tableRuntime,
String partition,
Pair<Integer, StructLike> partition,
Map<String, String> partitionProperties,
String hiveLocation,
long planTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.Pair;

import javax.annotation.Nonnull;

Expand All @@ -47,9 +49,12 @@ public class MixedIcebergPartitionPlan extends AbstractPartitionPlan {
protected final Map<String, String> partitionProperties;

public MixedIcebergPartitionPlan(
TableRuntime tableRuntime, ArcticTable table, String partition, long planTime) {
TableRuntime tableRuntime,
ArcticTable table,
Pair<Integer, StructLike> partition,
long planTime) {
super(tableRuntime, table, partition, planTime);
this.partitionProperties = TablePropertyUtil.getPartitionProperties(table, partition);
this.partitionProperties = TablePropertyUtil.getPartitionProperties(table, partition.second());
}

@Override
Expand Down Expand Up @@ -106,7 +111,7 @@ protected static class MixedIcebergPartitionEvaluator extends CommonPartitionEva

public MixedIcebergPartitionEvaluator(
TableRuntime tableRuntime,
String partition,
Pair<Integer, StructLike> partition,
Map<String, String> partitionProperties,
long planTime,
boolean keyedTable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package com.netease.arctic.server.optimizing.plan;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.netease.arctic.ams.api.TableFormat;
import com.netease.arctic.hive.table.SupportHive;
import com.netease.arctic.server.optimizing.scan.IcebergTableFileScanHelper;
Expand All @@ -29,13 +30,17 @@
import com.netease.arctic.server.table.TableSnapshot;
import com.netease.arctic.server.utils.IcebergTableUtil;
import com.netease.arctic.table.ArcticTable;
import com.netease.arctic.utils.ArcticTableUtil;
import com.netease.arctic.utils.TablePropertyUtil;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -98,8 +103,8 @@ protected void initEvaluator() {
System.currentTimeMillis() - startTime);
}

protected TableFileScanHelper.PartitionFilter getPartitionFilter() {
return null;
protected Expression getPartitionFilter() {
return Expressions.alwaysTrue();
}

private void initPartitionPlans(TableFileScanHelper tableFileScanHelper) {
Expand All @@ -108,17 +113,15 @@ private void initPartitionPlans(TableFileScanHelper tableFileScanHelper) {
try (CloseableIterable<TableFileScanHelper.FileScanResult> results =
tableFileScanHelper.scan()) {
for (TableFileScanHelper.FileScanResult fileScanResult : results) {
PartitionSpec partitionSpec;
if (arcticTable.format() == TableFormat.ICEBERG) {
partitionSpec = arcticTable.asUnkeyedTable().specs().get(fileScanResult.file().specId());
} else {
partitionSpec = arcticTable.spec();
}

PartitionSpec partitionSpec =
ArcticTableUtil.getArcticTablePartitionSpecById(
arcticTable, fileScanResult.file().specId());
StructLike partition = fileScanResult.file().partition();
String partitionPath = partitionSpec.partitionToPath(partition);
PartitionEvaluator evaluator =
partitionPlanMap.computeIfAbsent(partitionPath, this::buildEvaluator);
partitionPlanMap.computeIfAbsent(
partitionPath,
ignore -> buildEvaluator(Pair.of(partitionSpec.specId(), partition)));
evaluator.addFile(fileScanResult.file(), fileScanResult.deleteFiles());
count++;
}
Expand All @@ -133,28 +136,28 @@ private void initPartitionPlans(TableFileScanHelper tableFileScanHelper) {
partitionPlanMap.values().removeIf(plan -> !plan.isNecessary());
}

private Map<String, String> partitionProperties(String partitionPath) {
return TablePropertyUtil.getPartitionProperties(arcticTable, partitionPath);
private Map<String, String> partitionProperties(Pair<Integer, StructLike> partition) {
return TablePropertyUtil.getPartitionProperties(arcticTable, partition.second());
}

protected PartitionEvaluator buildEvaluator(String partitionPath) {
protected PartitionEvaluator buildEvaluator(Pair<Integer, StructLike> partition) {
if (TableFormat.ICEBERG == arcticTable.format()) {
return new CommonPartitionEvaluator(tableRuntime, partitionPath, System.currentTimeMillis());
return new CommonPartitionEvaluator(tableRuntime, partition, System.currentTimeMillis());
} else {
Map<String, String> partitionProperties = partitionProperties(partitionPath);
Map<String, String> partitionProperties = partitionProperties(partition);
if (com.netease.arctic.hive.utils.TableTypeUtil.isHive(arcticTable)) {
String hiveLocation = (((SupportHive) arcticTable).hiveLocation());
return new MixedHivePartitionPlan.MixedHivePartitionEvaluator(
tableRuntime,
partitionPath,
partition,
partitionProperties,
hiveLocation,
System.currentTimeMillis(),
arcticTable.isKeyedTable());
} else {
return new MixedIcebergPartitionPlan.MixedIcebergPartitionEvaluator(
tableRuntime,
partitionPath,
partition,
partitionProperties,
System.currentTimeMillis(),
arcticTable.isKeyedTable());
Expand All @@ -178,7 +181,7 @@ public PendingInput getPendingInput() {

public static class PendingInput {

private final Set<String> partitions = Sets.newHashSet();
@JsonIgnore private final Map<Integer, Set<StructLike>> partitions = Maps.newHashMap();

private int dataFileCount = 0;
private long dataFileSize = 0;
Expand All @@ -191,7 +194,9 @@ public PendingInput() {}

public PendingInput(Collection<PartitionEvaluator> evaluators) {
for (PartitionEvaluator evaluator : evaluators) {
partitions.add(evaluator.getPartition());
partitions
.computeIfAbsent(evaluator.getPartition().first(), ignore -> Sets.newHashSet())
.add(evaluator.getPartition().second());
dataFileCount += evaluator.getFragmentFileCount() + evaluator.getSegmentFileCount();
dataFileSize += evaluator.getFragmentFileSize() + evaluator.getSegmentFileSize();
positionalDeleteBytes += evaluator.getPosDeleteFileSize();
Expand All @@ -201,7 +206,7 @@ public PendingInput(Collection<PartitionEvaluator> evaluators) {
}
}

public Set<String> getPartitions() {
public Map<Integer, Set<StructLike>> getPartitions() {
return partitions;
}

Expand Down
Loading