diff --git a/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java b/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java new file mode 100644 index 000000000000..eaef8e0bccaa --- /dev/null +++ b/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.action; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.apache.spark.sql.functions.col; +import static org.apache.spark.sql.functions.current_date; +import static org.apache.spark.sql.functions.date_add; +import static org.apache.spark.sql.functions.expr; + +import java.io.IOException; +import java.util.Collections; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.NullOrder; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortDirection; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.Table; +import org.apache.iceberg.actions.BinPackStrategy; +import org.apache.iceberg.relocated.com.google.common.io.Files; +import org.apache.iceberg.spark.Spark3Util; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkSessionCatalog; +import org.apache.iceberg.spark.actions.SparkActions; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.expressions.Transform; +import org.apache.spark.sql.types.DataTypes; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Timeout; + +@Fork(1) +@State(Scope.Benchmark) +@Measurement(iterations = 10) +@BenchmarkMode(Mode.SingleShotTime) +@Timeout(time = 1000, timeUnit = TimeUnit.HOURS) +public class IcebergSortCompactionBenchmark { + + private static final String[] NAMESPACE = new String[] {"default"}; + private static final String NAME = "sortbench"; + private static final 
Identifier IDENT = Identifier.of(NAMESPACE, NAME); + private static final int NUM_FILES = 8; + private static final long NUM_ROWS = 7500000L; + private static final long UNIQUE_VALUES = NUM_ROWS / 4; + + private final Configuration hadoopConf = initHadoopConf(); + private SparkSession spark; + + @Setup + public void setupBench() { + setupSpark(); + } + + @TearDown + public void teardownBench() { + tearDownSpark(); + } + + @Setup(Level.Iteration) + public void setupIteration() { + initTable(); + appendData(); + } + + @TearDown(Level.Iteration) + public void cleanUpIteration() throws IOException { + cleanupFiles(); + } + + @Benchmark + @Threads(1) + public void sortInt() { + SparkActions.get() + .rewriteDataFiles(table()) + .option(BinPackStrategy.REWRITE_ALL, "true") + .sort( + SortOrder.builderFor(table().schema()) + .sortBy("intCol", SortDirection.ASC, NullOrder.NULLS_FIRST) + .build()) + .execute(); + } + + @Benchmark + @Threads(1) + public void sortInt2() { + SparkActions.get() + .rewriteDataFiles(table()) + .option(BinPackStrategy.REWRITE_ALL, "true") + .sort( + SortOrder.builderFor(table().schema()) + .sortBy("intCol", SortDirection.ASC, NullOrder.NULLS_FIRST) + .sortBy("intCol2", SortDirection.ASC, NullOrder.NULLS_FIRST) + .build()) + .execute(); + } + + @Benchmark + @Threads(1) + public void sortInt3() { + SparkActions.get() + .rewriteDataFiles(table()) + .option(BinPackStrategy.REWRITE_ALL, "true") + .sort( + SortOrder.builderFor(table().schema()) + .sortBy("intCol", SortDirection.ASC, NullOrder.NULLS_FIRST) + .sortBy("intCol2", SortDirection.ASC, NullOrder.NULLS_FIRST) + .sortBy("intCol3", SortDirection.ASC, NullOrder.NULLS_FIRST) + .sortBy("intCol4", SortDirection.ASC, NullOrder.NULLS_FIRST) + .build()) + .execute(); + } + + @Benchmark + @Threads(1) + public void sortInt4() { + SparkActions.get() + .rewriteDataFiles(table()) + .option(BinPackStrategy.REWRITE_ALL, "true") + .sort( + SortOrder.builderFor(table().schema()) + .sortBy("intCol", SortDirection.ASC, NullOrder.NULLS_FIRST) + .sortBy("intCol2", SortDirection.ASC, NullOrder.NULLS_FIRST) + .sortBy("intCol3", SortDirection.ASC, NullOrder.NULLS_FIRST) + .sortBy("intCol4", SortDirection.ASC, NullOrder.NULLS_FIRST) + .build()) + .execute(); + } + + @Benchmark + @Threads(1) + public void sortString() { + SparkActions.get() + .rewriteDataFiles(table()) + .option(BinPackStrategy.REWRITE_ALL, "true") + .sort( + SortOrder.builderFor(table().schema()) + .sortBy("stringCol", SortDirection.ASC, NullOrder.NULLS_FIRST) + .build()) + .execute(); + } + + @Benchmark + @Threads(1) + public void sortFourColumns() { + SparkActions.get() + .rewriteDataFiles(table()) + .option(BinPackStrategy.REWRITE_ALL, "true") + .sort( + SortOrder.builderFor(table().schema()) + .sortBy("stringCol", SortDirection.ASC, NullOrder.NULLS_FIRST) + .sortBy("intCol", SortDirection.ASC, NullOrder.NULLS_FIRST) + .sortBy("dateCol", SortDirection.DESC, NullOrder.NULLS_FIRST) + .sortBy("doubleCol", SortDirection.DESC, NullOrder.NULLS_FIRST) + .build()) + .execute(); + } + + @Benchmark + @Threads(1) + public void sortSixColumns() { + SparkActions.get() + .rewriteDataFiles(table()) + .option(BinPackStrategy.REWRITE_ALL, "true") + .sort( + SortOrder.builderFor(table().schema()) + .sortBy("stringCol", SortDirection.ASC, NullOrder.NULLS_FIRST) + .sortBy("intCol", SortDirection.ASC, NullOrder.NULLS_FIRST) + .sortBy("dateCol", SortDirection.DESC, NullOrder.NULLS_FIRST) + .sortBy("timestampCol", SortDirection.DESC, NullOrder.NULLS_FIRST) + .sortBy("doubleCol", SortDirection.DESC, 
NullOrder.NULLS_FIRST) + .sortBy("longCol", SortDirection.DESC, NullOrder.NULLS_FIRST) + .build()) + .execute(); + } + + @Benchmark + @Threads(1) + public void zSortInt() { + SparkActions.get() + .rewriteDataFiles(table()) + .option(BinPackStrategy.REWRITE_ALL, "true") + .zOrder("intCol") + .execute(); + } + + @Benchmark + @Threads(1) + public void zSortInt2() { + SparkActions.get() + .rewriteDataFiles(table()) + .option(BinPackStrategy.REWRITE_ALL, "true") + .zOrder("intCol", "intCol2") + .execute(); + } + + @Benchmark + @Threads(1) + public void zSortInt3() { + SparkActions.get() + .rewriteDataFiles(table()) + .option(BinPackStrategy.REWRITE_ALL, "true") + .zOrder("intCol", "intCol2", "intCol3") + .execute(); + } + + @Benchmark + @Threads(1) + public void zSortInt4() { + SparkActions.get() + .rewriteDataFiles(table()) + .option(BinPackStrategy.REWRITE_ALL, "true") + .zOrder("intCol", "intCol2", "intCol3", "intCol4") + .execute(); + } + + @Benchmark + @Threads(1) + public void zSortString() { + SparkActions.get() + .rewriteDataFiles(table()) + .option(BinPackStrategy.REWRITE_ALL, "true") + .zOrder("stringCol") + .execute(); + } + + @Benchmark + @Threads(1) + public void zSortFourColumns() { + SparkActions.get() + .rewriteDataFiles(table()) + .option(BinPackStrategy.REWRITE_ALL, "true") + .zOrder("stringCol", "intCol", "dateCol", "doubleCol") + .execute(); + } + + @Benchmark + @Threads(1) + public void zSortSixColumns() { + SparkActions.get() + .rewriteDataFiles(table()) + .option(BinPackStrategy.REWRITE_ALL, "true") + .zOrder("stringCol", "intCol", "dateCol", "timestampCol", "doubleCol", "longCol") + .execute(); + } + + protected Configuration initHadoopConf() { + return new Configuration(); + } + + protected final void initTable() { + Schema schema = + new Schema( + required(1, "longCol", Types.LongType.get()), + required(2, "intCol", Types.IntegerType.get()), + required(3, "intCol2", Types.IntegerType.get()), + required(4, "intCol3", Types.IntegerType.get()), + required(5, "intCol4", Types.IntegerType.get()), + required(6, "floatCol", Types.FloatType.get()), + optional(7, "doubleCol", Types.DoubleType.get()), + optional(8, "dateCol", Types.DateType.get()), + optional(9, "timestampCol", Types.TimestampType.withZone()), + optional(10, "stringCol", Types.StringType.get())); + + SparkSessionCatalog catalog; + try { + catalog = + (SparkSessionCatalog) Spark3Util.catalogAndIdentifier(spark(), "spark_catalog").catalog(); + catalog.dropTable(IDENT); + catalog.createTable( + IDENT, SparkSchemaUtil.convert(schema), new Transform[0], Collections.emptyMap()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private void appendData() { + Dataset df = + spark() + .range(0, NUM_ROWS * NUM_FILES, 1, NUM_FILES) + .drop("id") + .withColumn("longCol", new RandomGeneratingUDF(UNIQUE_VALUES).randomLongUDF().apply()) + .withColumn( + "intCol", + new RandomGeneratingUDF(UNIQUE_VALUES) + .randomLongUDF() + .apply() + .cast(DataTypes.IntegerType)) + .withColumn( + "intCol2", + new RandomGeneratingUDF(UNIQUE_VALUES) + .randomLongUDF() + .apply() + .cast(DataTypes.IntegerType)) + .withColumn( + "intCol3", + new RandomGeneratingUDF(UNIQUE_VALUES) + .randomLongUDF() + .apply() + .cast(DataTypes.IntegerType)) + .withColumn( + "intCol4", + new RandomGeneratingUDF(UNIQUE_VALUES) + .randomLongUDF() + .apply() + .cast(DataTypes.IntegerType)) + .withColumn( + "floatCol", + new RandomGeneratingUDF(UNIQUE_VALUES) + .randomLongUDF() + .apply() + .cast(DataTypes.FloatType)) + .withColumn( + "doubleCol", 
+ new RandomGeneratingUDF(UNIQUE_VALUES) + .randomLongUDF() + .apply() + .cast(DataTypes.DoubleType)) + .withColumn("dateCol", date_add(current_date(), col("intCol").mod(NUM_FILES))) + .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)")) + .withColumn("stringCol", new RandomGeneratingUDF(UNIQUE_VALUES).randomString().apply()); + writeData(df); + } + + private void writeData(Dataset df) { + df.write().format("iceberg").mode(SaveMode.Append).save(NAME); + } + + protected final Table table() { + try { + return Spark3Util.loadIcebergTable(spark(), NAME); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + protected final SparkSession spark() { + return spark; + } + + protected String getCatalogWarehouse() { + String location = Files.createTempDir().getAbsolutePath() + "/" + UUID.randomUUID() + "/"; + return location; + } + + protected void cleanupFiles() throws IOException { + spark.sql("DROP TABLE IF EXISTS " + NAME); + } + + protected void setupSpark() { + SparkSession.Builder builder = + SparkSession.builder() + .config( + "spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") + .config("spark.sql.catalog.spark_catalog.type", "hadoop") + .config("spark.sql.catalog.spark_catalog.warehouse", getCatalogWarehouse()) + .master("local[*]"); + spark = builder.getOrCreate(); + Configuration sparkHadoopConf = spark.sessionState().newHadoopConf(); + hadoopConf.forEach(entry -> sparkHadoopConf.set(entry.getKey(), entry.getValue())); + } + + protected void tearDownSpark() { + spark.stop(); + } +} diff --git a/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/action/RandomGeneratingUDF.java b/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/action/RandomGeneratingUDF.java new file mode 100644 index 000000000000..63d24f7da553 --- /dev/null +++ b/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/action/RandomGeneratingUDF.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.action; + +import static org.apache.spark.sql.functions.udf; + +import java.io.Serializable; +import java.util.Random; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.RandomUtil; +import org.apache.spark.sql.expressions.UserDefinedFunction; +import org.apache.spark.sql.types.DataTypes; + +class RandomGeneratingUDF implements Serializable { + private final long uniqueValues; + private Random rand = new Random(); + + RandomGeneratingUDF(long uniqueValues) { + this.uniqueValues = uniqueValues; + } + + UserDefinedFunction randomLongUDF() { + return udf(() -> rand.nextLong() % (uniqueValues / 2), DataTypes.LongType) + .asNondeterministic() + .asNonNullable(); + } + + UserDefinedFunction randomString() { + return udf( + () -> (String) RandomUtil.generatePrimitive(Types.StringType.get(), rand), + DataTypes.StringType) + .asNondeterministic() + .asNonNullable(); + } +} diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSpark3Action.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSpark3Action.java index a12ada501796..67331e828104 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSpark3Action.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSpark3Action.java @@ -22,6 +22,7 @@ import org.apache.iceberg.actions.BinPackStrategy; import org.apache.iceberg.actions.RewriteDataFiles; import org.apache.iceberg.actions.SortStrategy; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.spark.sql.SparkSession; public class BaseRewriteDataFilesSpark3Action extends BaseRewriteDataFilesSparkAction { @@ -40,6 +41,11 @@ protected SortStrategy sortStrategy() { return new Spark3SortStrategy(table(), spark()); } + @Override + protected SortStrategy zOrderStrategy(String... columnNames) { + return new Spark3ZOrderStrategy(table(), spark(), Lists.newArrayList(columnNames)); + } + @Override protected RewriteDataFiles self() { return this; diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java index 6791ef433e60..1de6d0c0bdce 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java @@ -111,6 +111,9 @@ protected Table table() { /** The framework specific {@link SortStrategy} */ protected abstract SortStrategy sortStrategy(); + /** The framework specific {@link Spark3ZOrderStrategy} */ + protected abstract SortStrategy zOrderStrategy(String... columnNames); + @Override public RewriteDataFiles binPack() { Preconditions.checkArgument( @@ -141,6 +144,16 @@ public RewriteDataFiles sort() { return this; } + @Override + public RewriteDataFiles zOrder(String... 
columnNames) { + Preconditions.checkArgument( + this.strategy == null, + "Cannot set strategy to zorder, it has already been set to %s", + this.strategy); + this.strategy = zOrderStrategy(columnNames); + return this; + } + @Override public RewriteDataFiles filter(Expression expression) { filter = Expressions.and(filter, expression); diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java index cfa1d66729c6..e2ca6dcf9468 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java @@ -162,4 +162,16 @@ protected LogicalPlan sortPlan( Distribution distribution, SortOrder[] ordering, LogicalPlan plan, SQLConf conf) { return DistributionAndOrderingUtils$.MODULE$.prepareQuery(distribution, ordering, plan, conf); } + + protected double sizeEstimateMultiple() { + return sizeEstimateMultiple; + } + + protected FileScanTaskSetManager manager() { + return manager; + } + + protected FileRewriteCoordinator rewriteCoordinator() { + return rewriteCoordinator; + } } diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/Spark3ZOrderStrategy.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/Spark3ZOrderStrategy.java new file mode 100644 index 000000000000..157b23ed1db8 --- /dev/null +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/Spark3ZOrderStrategy.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.actions; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.NullOrder; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortDirection; +import org.apache.iceberg.Table; +import org.apache.iceberg.actions.RewriteStrategy; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.spark.Spark3Util; +import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.spark.SparkWriteOptions; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.util.SortOrderUtil; +import org.apache.iceberg.util.ZOrderByteUtils; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; +import org.apache.spark.sql.connector.iceberg.distributions.Distribution; +import org.apache.spark.sql.connector.iceberg.distributions.Distributions; +import org.apache.spark.sql.connector.iceberg.expressions.SortOrder; +import org.apache.spark.sql.functions; +import org.apache.spark.sql.internal.SQLConf; +import org.apache.spark.sql.types.StructField; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Spark3ZOrderStrategy extends Spark3SortStrategy { + private static final Logger LOG = LoggerFactory.getLogger(Spark3ZOrderStrategy.class); + + private static final String Z_COLUMN = "ICEZVALUE"; + private static final Schema Z_SCHEMA = + new Schema(NestedField.required(0, Z_COLUMN, Types.BinaryType.get())); + private static final org.apache.iceberg.SortOrder Z_SORT_ORDER = + org.apache.iceberg.SortOrder.builderFor(Z_SCHEMA) + .sortBy(Z_COLUMN, SortDirection.ASC, NullOrder.NULLS_LAST) + .build(); + + /** + * Controls the amount of bytes interleaved in the ZOrder Algorithm. Default is all bytes being + * interleaved. + */ + private static final String MAX_OUTPUT_SIZE_KEY = "max-output-size"; + + private static final int DEFAULT_MAX_OUTPUT_SIZE = Integer.MAX_VALUE; + + /** + * Controls the number of bytes considered from an input column of a type with variable length + * (String, Binary). 
Default is to use the same size as primitives {@link + * ZOrderByteUtils#PRIMITIVE_BUFFER_SIZE} + */ + private static final String VAR_LENGTH_CONTRIBUTION_KEY = "var-length-contribution"; + + private static final int DEFAULT_VAR_LENGTH_CONTRIBUTION = ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE; + + private final List<String> zOrderColNames; + + private int maxOutputSize; + private int varLengthContribution; + + @Override + public Set<String> validOptions() { + return ImmutableSet.<String>builder() + .addAll(super.validOptions()) + .add(VAR_LENGTH_CONTRIBUTION_KEY) + .add(MAX_OUTPUT_SIZE_KEY) + .build(); + } + + @Override + public RewriteStrategy options(Map<String, String> options) { + super.options(options); + + varLengthContribution = + PropertyUtil.propertyAsInt( + options, VAR_LENGTH_CONTRIBUTION_KEY, DEFAULT_VAR_LENGTH_CONTRIBUTION); + Preconditions.checkArgument( + varLengthContribution > 0, + "Cannot use less than 1 byte for variable length types with zOrder, %s was set to %s", + VAR_LENGTH_CONTRIBUTION_KEY, + varLengthContribution); + + maxOutputSize = + PropertyUtil.propertyAsInt(options, MAX_OUTPUT_SIZE_KEY, DEFAULT_MAX_OUTPUT_SIZE); + Preconditions.checkArgument( + maxOutputSize > 0, + "Cannot have the interleaved ZOrder value use less than 1 byte, %s was set to %s", + MAX_OUTPUT_SIZE_KEY, + maxOutputSize); + + return this; + } + + public Spark3ZOrderStrategy(Table table, SparkSession spark, List<String> zOrderColNames) { + super(table, spark); + + Preconditions.checkArgument( + zOrderColNames != null && !zOrderColNames.isEmpty(), + "Cannot ZOrder when no columns are specified"); + + Stream<String> identityPartitionColumns = + table.spec().fields().stream() + .filter(f -> f.transform().isIdentity()) + .map(PartitionField::name); + List<String> partZOrderCols = + identityPartitionColumns.filter(zOrderColNames::contains).collect(Collectors.toList()); + + if (!partZOrderCols.isEmpty()) { + LOG.warn( + "Cannot ZOrder on an Identity partition column as these values are constant within a partition " + + "and will be removed from the ZOrder expression: {}", + partZOrderCols); + zOrderColNames.removeAll(partZOrderCols); + Preconditions.checkArgument( + !zOrderColNames.isEmpty(), + "Cannot perform ZOrdering, all columns provided were identity partition columns and cannot be used."); + } + + validateColumnsExistence(table, spark, zOrderColNames); + + this.zOrderColNames = zOrderColNames; + } + + private void validateColumnsExistence(Table table, SparkSession spark, List<String> colNames) { + boolean caseSensitive = Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive")); + Schema schema = table.schema(); + colNames.forEach( + col -> { + NestedField nestedField = + caseSensitive ? 
schema.findField(col) : schema.caseInsensitiveFindField(col); + if (nestedField == null) { + throw new IllegalArgumentException( + String.format( + "Cannot find column '%s' in table schema: %s", col, schema.asStruct())); + } + }); + } + + @Override + public String name() { + return "Z-ORDER"; + } + + @Override + protected void validateOptions() { + // Ignore SortStrategy validation + return; + } + + @Override + public Set<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite) { + Spark3ZOrderUDF zOrderUDF = + new Spark3ZOrderUDF(zOrderColNames.size(), varLengthContribution, maxOutputSize); + + String groupID = UUID.randomUUID().toString(); + boolean requiresRepartition = !filesToRewrite.get(0).spec().equals(table().spec()); + + SortOrder[] ordering; + if (requiresRepartition) { + ordering = Spark3Util.convert(SortOrderUtil.buildSortOrder(table(), sortOrder())); + } else { + ordering = Spark3Util.convert(sortOrder()); + } + + Distribution distribution = Distributions.ordered(ordering); + + try { + manager().stageTasks(table(), groupID, filesToRewrite); + + // Disable Adaptive Query Execution as this may change the output partitioning of our write + SparkSession cloneSession = spark().cloneSession(); + cloneSession.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false); + + // Reset Shuffle Partitions for our sort + long numOutputFiles = + numOutputFiles((long) (inputFileSize(filesToRewrite) * sizeEstimateMultiple())); + cloneSession.conf().set(SQLConf.SHUFFLE_PARTITIONS().key(), Math.max(1, numOutputFiles)); + + Dataset<Row> scanDF = + cloneSession + .read() + .format("iceberg") + .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID) + .load(table().name()); + + Column[] originalColumns = + Arrays.stream(scanDF.schema().names()).map(n -> functions.col(n)).toArray(Column[]::new); + + List<StructField> zOrderColumns = + zOrderColNames.stream().map(scanDF.schema()::apply).collect(Collectors.toList()); + + Column zvalueArray = + functions.array( + zOrderColumns.stream() + .map( + colStruct -> + zOrderUDF.sortedLexicographically( + functions.col(colStruct.name()), colStruct.dataType())) + .toArray(Column[]::new)); + + Dataset<Row> zvalueDF = scanDF.withColumn(Z_COLUMN, zOrderUDF.interleaveBytes(zvalueArray)); + + SQLConf sqlConf = cloneSession.sessionState().conf(); + LogicalPlan sortPlan = sortPlan(distribution, ordering, zvalueDF.logicalPlan(), sqlConf); + Dataset<Row> sortedDf = new Dataset<>(cloneSession, sortPlan, zvalueDF.encoder()); + sortedDf + .select(originalColumns) + .write() + .format("iceberg") + .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupID) + .option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize()) + .mode("append") + .save(table().name()); + + return rewriteCoordinator().fetchNewDataFiles(table(), groupID); + } finally { + manager().removeTasks(table(), groupID); + rewriteCoordinator().clearRewrite(table(), groupID); + } + } + + @Override + protected org.apache.iceberg.SortOrder sortOrder() { + return Z_SORT_ORDER; + } +} diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/Spark3ZOrderUDF.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/Spark3ZOrderUDF.java new file mode 100644 index 000000000000..ff80792e1fa3 --- /dev/null +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/Spark3ZOrderUDF.java @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.actions; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.nio.charset.CharsetEncoder; +import java.nio.charset.StandardCharsets; +import org.apache.iceberg.util.ZOrderByteUtils; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.expressions.UserDefinedFunction; +import org.apache.spark.sql.functions; +import org.apache.spark.sql.types.BinaryType; +import org.apache.spark.sql.types.BooleanType; +import org.apache.spark.sql.types.ByteType; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.DateType; +import org.apache.spark.sql.types.DoubleType; +import org.apache.spark.sql.types.FloatType; +import org.apache.spark.sql.types.IntegerType; +import org.apache.spark.sql.types.LongType; +import org.apache.spark.sql.types.ShortType; +import org.apache.spark.sql.types.StringType; +import org.apache.spark.sql.types.TimestampType; +import scala.collection.JavaConverters; +import scala.collection.Seq; + +class Spark3ZOrderUDF implements Serializable { + private static final byte[] PRIMITIVE_EMPTY = new byte[ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE]; + + /** + * Every Spark task runs iteratively on rows in a single thread so ThreadLocal should protect + * from concurrent access to any of these structures. 
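+ * Per-column input buffers and the interleave output buffer are allocated lazily per thread and then reused for every row.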
+ */ + private transient ThreadLocal<ByteBuffer> outputBuffer; + + private transient ThreadLocal<byte[][]> inputHolder; + private transient ThreadLocal<ByteBuffer[]> inputBuffers; + private transient ThreadLocal<CharsetEncoder> encoder; + + private final int numCols; + + private int inputCol = 0; + private int totalOutputBytes = 0; + private final int varTypeSize; + private final int maxOutputSize; + + Spark3ZOrderUDF(int numCols, int varTypeSize, int maxOutputSize) { + this.numCols = numCols; + this.varTypeSize = varTypeSize; + this.maxOutputSize = maxOutputSize; + } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + inputBuffers = ThreadLocal.withInitial(() -> new ByteBuffer[numCols]); + inputHolder = ThreadLocal.withInitial(() -> new byte[numCols][]); + outputBuffer = ThreadLocal.withInitial(() -> ByteBuffer.allocate(totalOutputBytes)); + encoder = ThreadLocal.withInitial(() -> StandardCharsets.UTF_8.newEncoder()); + } + + private ByteBuffer inputBuffer(int position, int size) { + ByteBuffer buffer = inputBuffers.get()[position]; + if (buffer == null) { + buffer = ByteBuffer.allocate(size); + inputBuffers.get()[position] = buffer; + } + return buffer; + } + + byte[] interleaveBits(Seq<byte[]> scalaBinary) { + byte[][] columnsBinary = JavaConverters.seqAsJavaList(scalaBinary).toArray(inputHolder.get()); + return ZOrderByteUtils.interleaveBits(columnsBinary, totalOutputBytes, outputBuffer.get()); + } + + private UserDefinedFunction tinyToOrderedBytesUDF() { + int position = inputCol; + UserDefinedFunction udf = + functions + .udf( + (Byte value) -> { + if (value == null) { + return PRIMITIVE_EMPTY; + } + return ZOrderByteUtils.tinyintToOrderedBytes( + value, inputBuffer(position, ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE)) + .array(); + }, + DataTypes.BinaryType) + .withName("TINY_ORDERED_BYTES"); + + this.inputCol++; + increaseOutputSize(ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE); + + return udf; + } + + private UserDefinedFunction shortToOrderedBytesUDF() { + int position = inputCol; + UserDefinedFunction udf = + functions + .udf( + (Short value) -> { + if (value == null) { + return PRIMITIVE_EMPTY; + } + return ZOrderByteUtils.shortToOrderedBytes( + value, inputBuffer(position, ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE)) + .array(); + }, + DataTypes.BinaryType) + .withName("SHORT_ORDERED_BYTES"); + + this.inputCol++; + increaseOutputSize(ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE); + + return udf; + } + + private UserDefinedFunction intToOrderedBytesUDF() { + int position = inputCol; + UserDefinedFunction udf = + functions + .udf( + (Integer value) -> { + if (value == null) { + return PRIMITIVE_EMPTY; + } + return ZOrderByteUtils.intToOrderedBytes( + value, inputBuffer(position, ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE)) + .array(); + }, + DataTypes.BinaryType) + .withName("INT_ORDERED_BYTES"); + + this.inputCol++; + increaseOutputSize(ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE); + + return udf; + } + + private UserDefinedFunction longToOrderedBytesUDF() { + int position = inputCol; + UserDefinedFunction udf = + functions + .udf( + (Long value) -> { + if (value == null) { + return PRIMITIVE_EMPTY; + } + return ZOrderByteUtils.longToOrderedBytes( + value, inputBuffer(position, ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE)) + .array(); + }, + DataTypes.BinaryType) + .withName("LONG_ORDERED_BYTES"); + + this.inputCol++; + increaseOutputSize(ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE); + + return udf; + } + + private UserDefinedFunction floatToOrderedBytesUDF() { + int position = inputCol; + UserDefinedFunction 
udf = + functions + .udf( + (Float value) -> { + if (value == null) { + return PRIMITIVE_EMPTY; + } + return ZOrderByteUtils.floatToOrderedBytes( + value, inputBuffer(position, ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE)) + .array(); + }, + DataTypes.BinaryType) + .withName("FLOAT_ORDERED_BYTES"); + + this.inputCol++; + increaseOutputSize(ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE); + + return udf; + } + + private UserDefinedFunction doubleToOrderedBytesUDF() { + int position = inputCol; + UserDefinedFunction udf = + functions + .udf( + (Double value) -> { + if (value == null) { + return PRIMITIVE_EMPTY; + } + return ZOrderByteUtils.doubleToOrderedBytes( + value, inputBuffer(position, ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE)) + .array(); + }, + DataTypes.BinaryType) + .withName("DOUBLE_ORDERED_BYTES"); + + this.inputCol++; + increaseOutputSize(ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE); + + return udf; + } + + private UserDefinedFunction booleanToOrderedBytesUDF() { + int position = inputCol; + UserDefinedFunction udf = + functions + .udf( + (Boolean value) -> { + ByteBuffer buffer = inputBuffer(position, ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE); + buffer.put(0, (byte) (value ? -127 : 0)); + return buffer.array(); + }, + DataTypes.BinaryType) + .withName("BOOLEAN-LEXICAL-BYTES"); + + this.inputCol++; + increaseOutputSize(ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE); + return udf; + } + + private UserDefinedFunction stringToOrderedBytesUDF() { + int position = inputCol; + UserDefinedFunction udf = + functions + .udf( + (String value) -> + ZOrderByteUtils.stringToOrderedBytes( + value, varTypeSize, inputBuffer(position, varTypeSize), encoder.get()) + .array(), + DataTypes.BinaryType) + .withName("STRING-LEXICAL-BYTES"); + + this.inputCol++; + increaseOutputSize(varTypeSize); + + return udf; + } + + private UserDefinedFunction bytesTruncateUDF() { + int position = inputCol; + UserDefinedFunction udf = + functions + .udf( + (byte[] value) -> + ZOrderByteUtils.byteTruncateOrFill( + value, varTypeSize, inputBuffer(position, varTypeSize)) + .array(), + DataTypes.BinaryType) + .withName("BYTE-TRUNCATE"); + + this.inputCol++; + increaseOutputSize(varTypeSize); + + return udf; + } + + private final UserDefinedFunction interleaveUDF = + functions + .udf((Seq arrayBinary) -> interleaveBits(arrayBinary), DataTypes.BinaryType) + .withName("INTERLEAVE_BYTES"); + + Column interleaveBytes(Column arrayBinary) { + return interleaveUDF.apply(arrayBinary); + } + + @SuppressWarnings("checkstyle:CyclomaticComplexity") + Column sortedLexicographically(Column column, DataType type) { + if (type instanceof ByteType) { + return tinyToOrderedBytesUDF().apply(column); + } else if (type instanceof ShortType) { + return shortToOrderedBytesUDF().apply(column); + } else if (type instanceof IntegerType) { + return intToOrderedBytesUDF().apply(column); + } else if (type instanceof LongType) { + return longToOrderedBytesUDF().apply(column); + } else if (type instanceof FloatType) { + return floatToOrderedBytesUDF().apply(column); + } else if (type instanceof DoubleType) { + return doubleToOrderedBytesUDF().apply(column); + } else if (type instanceof StringType) { + return stringToOrderedBytesUDF().apply(column); + } else if (type instanceof BinaryType) { + return bytesTruncateUDF().apply(column); + } else if (type instanceof BooleanType) { + return booleanToOrderedBytesUDF().apply(column); + } else if (type instanceof TimestampType) { + return longToOrderedBytesUDF().apply(column.cast(DataTypes.LongType)); + } else if (type instanceof DateType) { + 
return longToOrderedBytesUDF().apply(column.cast(DataTypes.LongType)); + } else { + throw new IllegalArgumentException( + String.format( + "Cannot use column %s of type %s in ZOrdering, the type is unsupported", + column, type)); + } + } + + private void increaseOutputSize(int bytes) { + totalOutputBytes = Math.min(totalOutputBytes + bytes, maxOutputSize); + } +} diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java index 6d62fd223963..da336a3185a0 100644 --- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java +++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java @@ -19,6 +19,10 @@ package org.apache.iceberg.spark.actions; import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.apache.spark.sql.functions.current_date; +import static org.apache.spark.sql.functions.date_add; +import static org.apache.spark.sql.functions.expr; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.doAnswer; @@ -72,6 +76,7 @@ import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.encryption.EncryptionKeyMetadata; import org.apache.iceberg.exceptions.CommitStateUnknownException; +import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.io.CloseableIterable; @@ -1047,6 +1052,104 @@ public void testCommitStateUnknownException() { shouldHaveSnapshots(table, 2); // Commit actually Succeeded } + @Test + public void testZOrderSort() { + int originalFiles = 20; + Table table = createTable(originalFiles); + shouldHaveLastCommitUnsorted(table, "c2"); + shouldHaveFiles(table, originalFiles); + + List originalData = currentData(); + double originalFilesC2 = percentFilesRequired(table, "c2", "foo23"); + double originalFilesC3 = percentFilesRequired(table, "c3", "bar21"); + double originalFilesC2C3 = + percentFilesRequired(table, new String[] {"c2", "c3"}, new String[] {"foo23", "bar23"}); + + Assert.assertTrue("Should require all files to scan c2", originalFilesC2 > 0.99); + Assert.assertTrue("Should require all files to scan c3", originalFilesC3 > 0.99); + + RewriteDataFiles.Result result = + basicRewrite(table) + .zOrder("c2", "c3") + .option( + SortStrategy.MAX_FILE_SIZE_BYTES, + Integer.toString((averageFileSize(table) / 2) + 2)) + // Divide files in 2 + .option( + RewriteDataFiles.TARGET_FILE_SIZE_BYTES, + Integer.toString(averageFileSize(table) / 2)) + .option(SortStrategy.MIN_INPUT_FILES, "1") + .execute(); + + Assert.assertEquals("Should have 1 fileGroups", 1, result.rewriteResults().size()); + int zOrderedFilesTotal = Iterables.size(table.currentSnapshot().addedDataFiles(table.io())); + Assert.assertTrue("Should have written 40+ files", zOrderedFilesTotal >= 40); + + table.refresh(); + + List postRewriteData = currentData(); + assertEquals("We shouldn't have changed the data", originalData, postRewriteData); + + shouldHaveSnapshots(table, 2); + shouldHaveACleanCache(table); + + double filesScannedC2 = percentFilesRequired(table, "c2", "foo23"); + double filesScannedC3 = percentFilesRequired(table, "c3", "bar21"); + double filesScannedC2C3 = + 
percentFilesRequired(table, new String[] {"c2", "c3"}, new String[] {"foo23", "bar23"}); + + Assert.assertTrue( + "Should have reduced the number of files required for c2", + filesScannedC2 < originalFilesC2); + Assert.assertTrue( + "Should have reduced the number of files required for c3", + filesScannedC3 < originalFilesC3); + Assert.assertTrue( + "Should have reduced the number of files required for a c2,c3 predicate", + filesScannedC2C3 < originalFilesC2C3); + } + + @Test + public void testZOrderAllTypesSort() { + Table table = createTypeTestTable(); + shouldHaveFiles(table, 10); + + List originalRaw = + spark.read().format("iceberg").load(tableLocation).sort("longCol").collectAsList(); + List originalData = rowsToJava(originalRaw); + + // TODO add in UUID when it is supported in Spark + RewriteDataFiles.Result result = + basicRewrite(table) + .zOrder( + "longCol", + "intCol", + "floatCol", + "doubleCol", + "dateCol", + "timestampCol", + "stringCol", + "binaryCol", + "booleanCol") + .option(SortStrategy.MIN_INPUT_FILES, "1") + .option(SortStrategy.REWRITE_ALL, "true") + .execute(); + + Assert.assertEquals("Should have 1 fileGroups", 1, result.rewriteResults().size()); + int zOrderedFilesTotal = Iterables.size(table.currentSnapshot().addedDataFiles(table.io())); + Assert.assertEquals("Should have written 1 file", 1, zOrderedFilesTotal); + + table.refresh(); + + List postRaw = + spark.read().format("iceberg").load(tableLocation).sort("longCol").collectAsList(); + List postRewriteData = rowsToJava(postRaw); + assertEquals("We shouldn't have changed the data", originalData, postRewriteData); + + shouldHaveSnapshots(table, 2); + shouldHaveACleanCache(table); + } + @Test public void testInvalidAPIUsage() { Table table = createTable(1); @@ -1206,6 +1309,41 @@ public void testRewriteJobOrderFilesDesc() { Assert.assertNotEquals("Number of files order should not be ascending", actual, expected); } + private Table createTypeTestTable() { + Schema schema = + new Schema( + required(1, "longCol", Types.LongType.get()), + required(2, "intCol", Types.IntegerType.get()), + required(3, "floatCol", Types.FloatType.get()), + optional(4, "doubleCol", Types.DoubleType.get()), + optional(5, "dateCol", Types.DateType.get()), + optional(6, "timestampCol", Types.TimestampType.withZone()), + optional(7, "stringCol", Types.StringType.get()), + optional(8, "booleanCol", Types.BooleanType.get()), + optional(9, "binaryCol", Types.BinaryType.get())); + + Map options = Maps.newHashMap(); + Table table = TABLES.create(schema, PartitionSpec.unpartitioned(), options, tableLocation); + + spark + .range(0, 10, 1, 10) + .withColumnRenamed("id", "longCol") + .withColumn("intCol", expr("CAST(longCol AS INT)")) + .withColumn("floatCol", expr("CAST(longCol AS FLOAT)")) + .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)")) + .withColumn("dateCol", date_add(current_date(), 1)) + .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)")) + .withColumn("stringCol", expr("CAST(dateCol AS STRING)")) + .withColumn("booleanCol", expr("longCol > 5")) + .withColumn("binaryCol", expr("CAST(longCol AS BINARY)")) + .write() + .format("iceberg") + .mode("append") + .save(tableLocation); + + return table; + } + private Stream toGroupStream( Table table, BaseRewriteDataFilesSparkAction rewrite) { rewrite.validateAndInitOptions(); @@ -1476,6 +1614,21 @@ private Set cacheContents(Table table) { .build(); } + private double percentFilesRequired(Table table, String col, String value) { + return percentFilesRequired(table, new String[] {col}, 
new String[] {value}); + } + + private double percentFilesRequired(Table table, String[] cols, String[] values) { + Preconditions.checkArgument(cols.length == values.length); + Expression restriction = Expressions.alwaysTrue(); + for (int i = 0; i < cols.length; i++) { + restriction = Expressions.and(restriction, Expressions.equal(cols[i], values[i])); + } + int totalFiles = Iterables.size(table.newScan().planFiles()); + int filteredFiles = Iterables.size(table.newScan().filter(restriction).planFiles()); + return (double) filteredFiles / (double) totalFiles; + } + class GroupInfoMatcher implements ArgumentMatcher { private final Set groupIDs;