From 51d325a915faf01d85beda3bc48c18ddedea0283 Mon Sep 17 00:00:00 2001 From: Scott Sandre Date: Fri, 31 Mar 2023 12:05:25 -0700 Subject: [PATCH 1/2] Barebones partition pruning with test --- .../main/java/io/delta/core/data/JsonRow.java | 10 +- .../io/delta/core/internal/TableSuite.scala | 16 ++++ .../core/expressions/BinaryComparison.java | 23 +++++ .../core/expressions/BinaryExpression.java | 61 ++++++++++++ .../core/expressions/BinaryOperator.java | 30 ++++++ .../core/expressions/CastingComparator.java | 42 ++++++++ .../io/delta/core/expressions/Column.java | 92 ++++++++++++++++++ .../io/delta/core/expressions/EqualTo.java | 17 ++++ .../io/delta/core/expressions/Expression.java | 40 ++++++++ .../core/expressions/LeafExpression.java | 27 ++++++ .../io/delta/core/expressions/Literal.java | 95 +++++++++++++++++++ .../io/delta/core/expressions/Predicate.java | 14 +++ .../delta/core/internal/ScanBuilderImpl.java | 37 ++++++-- .../java/io/delta/core/internal/ScanImpl.java | 88 +++++++++++++---- .../io/delta/core/internal/ScanTaskImpl.java | 9 +- .../io/delta/core/internal/SnapshotImpl.java | 12 ++- .../delta/core/internal/actions/AddFile.java | 13 +++ .../delta/core/internal/actions/Metadata.java | 20 +++- .../core/internal/data/PartitionRow.java | 69 ++++++++++++++ .../lang/FilteredCloseableIterator.java | 62 ++++++++++++ .../ReverseActionsToAddFilesIterator.java | 95 ++++++++----------- .../core/internal/util/PartitionUtils.java | 49 ++++++++++ .../java/io/delta/core/types/DataType.java | 16 ++++ .../java/io/delta/core/types/StructField.java | 20 +++- .../java/io/delta/core/types/StructType.java | 42 +++++++- 25 files changed, 897 insertions(+), 102 deletions(-) create mode 100644 kernel/src/main/java/io/delta/core/expressions/BinaryComparison.java create mode 100644 kernel/src/main/java/io/delta/core/expressions/BinaryExpression.java create mode 100644 kernel/src/main/java/io/delta/core/expressions/BinaryOperator.java create mode 100644 
kernel/src/main/java/io/delta/core/expressions/CastingComparator.java create mode 100644 kernel/src/main/java/io/delta/core/expressions/Column.java create mode 100644 kernel/src/main/java/io/delta/core/expressions/EqualTo.java create mode 100644 kernel/src/main/java/io/delta/core/expressions/LeafExpression.java create mode 100644 kernel/src/main/java/io/delta/core/expressions/Literal.java create mode 100644 kernel/src/main/java/io/delta/core/expressions/Predicate.java create mode 100644 kernel/src/main/java/io/delta/core/internal/data/PartitionRow.java create mode 100644 kernel/src/main/java/io/delta/core/internal/lang/FilteredCloseableIterator.java create mode 100644 kernel/src/main/java/io/delta/core/internal/util/PartitionUtils.java diff --git a/kernel-default/src/main/java/io/delta/core/data/JsonRow.java b/kernel-default/src/main/java/io/delta/core/data/JsonRow.java index 117ee7489..30bd508a7 100644 --- a/kernel-default/src/main/java/io/delta/core/data/JsonRow.java +++ b/kernel-default/src/main/java/io/delta/core/data/JsonRow.java @@ -114,21 +114,21 @@ private static Object decodeElement(JsonNode jsonValue, DataType dataType) { } private static Object decodeField(ObjectNode rootNode, StructField field) { - if (rootNode.get(field.name) == null) { - if (field.nullable) { + if (rootNode.get(field.getName()) == null) { + if (field.isNullable()) { return null; } throw new RuntimeException( String.format( "Root node at key %s is null but field isn't nullable. 
Root node: %s", - field.name, + field.getName(), rootNode ) ); } - return decodeElement(rootNode.get(field.name), field.dataType); + return decodeElement(rootNode.get(field.getName()), field.getDataType()); } //////////////////////////////////////////////////////////////////////////////// @@ -216,7 +216,7 @@ public String toString() { //////////////////////////////////////// private void assertType(int ordinal, DataType expectedType) { - final String actualTypeName = readSchema.at(ordinal).dataType.typeName(); + final String actualTypeName = readSchema.at(ordinal).getDataType().typeName(); if (!actualTypeName.equals(expectedType.typeName()) && !actualTypeName.equals(UnresolvedDataType.INSTANCE.typeName())) { throw new RuntimeException( diff --git a/kernel-default/src/test/scala/io/delta/core/internal/TableSuite.scala b/kernel-default/src/test/scala/io/delta/core/internal/TableSuite.scala index e2f9f7431..484a9b869 100644 --- a/kernel-default/src/test/scala/io/delta/core/internal/TableSuite.scala +++ b/kernel-default/src/test/scala/io/delta/core/internal/TableSuite.scala @@ -19,6 +19,7 @@ package io.delta.core.internal import scala.collection.JavaConverters._ import io.delta.core.Table +import io.delta.core.expressions.{EqualTo, Literal} import io.delta.core.helpers.DefaultTableHelper import io.delta.core.util.GoldenTableUtils import org.scalatest.funsuite.AnyFunSuite @@ -75,4 +76,19 @@ class TableSuite extends AnyFunSuite with GoldenTableUtils { snapshot.getAddFiles.forEachRemaining(x => println(x)) } } + + test("can perform partition pruning- basic - no checkpoint") { + withGoldenTable("basic-partitioned-no-checkpoint") { path => + val table = Table.forPath(path, new DefaultTableHelper()) + val snapshot = table.getLatestSnapshot.asInstanceOf[SnapshotImpl] + val schema = snapshot.getSchema + val partitionFilter = new EqualTo(schema.column("part_a"), Literal.of(0L)); + val scan = snapshot.getScanBuilder().withFilter(partitionFilter).build() + scan + .getTasks + 
.asScala + .map(task => task.asInstanceOf[ScanTaskImpl].getAddFile) + .foreach(add => assert(add.getPartitionValues.get("part_a").toLong == 0)) + } + } } diff --git a/kernel/src/main/java/io/delta/core/expressions/BinaryComparison.java b/kernel/src/main/java/io/delta/core/expressions/BinaryComparison.java new file mode 100644 index 000000000..f7b5a4301 --- /dev/null +++ b/kernel/src/main/java/io/delta/core/expressions/BinaryComparison.java @@ -0,0 +1,23 @@ +package io.delta.core.expressions; + +import java.util.Comparator; + +/** + * A {@link BinaryOperator} that compares the left and right {@link Expression}s and evaluates to a + * boolean value. + */ +public abstract class BinaryComparison extends BinaryOperator implements Predicate { + private final Comparator comparator; + + protected BinaryComparison(Expression left, Expression right, String symbol) { + super(left, right, symbol); + + // super asserted that left and right DataTypes were the same + + comparator = CastingComparator.forDataType(left.dataType()); + } + + protected int compare(Object leftResult, Object rightResult) { + return comparator.compare(leftResult, rightResult); + } +} diff --git a/kernel/src/main/java/io/delta/core/expressions/BinaryExpression.java b/kernel/src/main/java/io/delta/core/expressions/BinaryExpression.java new file mode 100644 index 000000000..ac6975fee --- /dev/null +++ b/kernel/src/main/java/io/delta/core/expressions/BinaryExpression.java @@ -0,0 +1,61 @@ +package io.delta.core.expressions; + +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +import io.delta.core.data.Row; + +/** + * An {@link Expression} with two inputs and one output. The output is by default evaluated to null + * if either input is evaluated to null. 
+ */ +public abstract class BinaryExpression implements Expression { + protected final Expression left; + protected final Expression right; + + protected BinaryExpression(Expression left, Expression right) { + this.left = left; + this.right = right; + } + + public Expression getLeft() { + return left; + } + + public Expression getRight() { + return right; + } + + @Override + public final Object eval(Row row) { + Object leftResult = left.eval(row); + if (null == leftResult) return null; + + Object rightResult = right.eval(row); + if (null == rightResult) return null; + + return nullSafeEval(leftResult, rightResult); + } + + protected abstract Object nullSafeEval(Object leftResult, Object rightResult); + + @Override + public List children() { + return Arrays.asList(left, right); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + BinaryExpression that = (BinaryExpression) o; + return Objects.equals(left, that.left) && + Objects.equals(right, that.right); + } + + @Override + public int hashCode() { + return Objects.hash(left, right); + } +} diff --git a/kernel/src/main/java/io/delta/core/expressions/BinaryOperator.java b/kernel/src/main/java/io/delta/core/expressions/BinaryOperator.java new file mode 100644 index 000000000..e3fc9661b --- /dev/null +++ b/kernel/src/main/java/io/delta/core/expressions/BinaryOperator.java @@ -0,0 +1,30 @@ +package io.delta.core.expressions; + +/** + * A {@link BinaryExpression} that is an operator, meaning the string representation is + * {@code x symbol y}, rather than {@code funcName(x, y)}. + *

+ * Requires both inputs to be of the same data type. + */ +public abstract class BinaryOperator extends BinaryExpression { + protected final String symbol; + + protected BinaryOperator(Expression left, Expression right, String symbol) { + super(left, right); + this.symbol = symbol; + + if (!left.dataType().equivalent(right.dataType())) { + throw new IllegalArgumentException( + String.format( + "BinaryOperator left and right DataTypes must be the same. Found %s and %s.", + left.dataType().typeName(), + right.dataType().typeName()) + ); + } + } + + @Override + public String toString() { + return String.format("(%s %s %s)", left.toString(), symbol, right.toString()); + } +} \ No newline at end of file diff --git a/kernel/src/main/java/io/delta/core/expressions/CastingComparator.java b/kernel/src/main/java/io/delta/core/expressions/CastingComparator.java new file mode 100644 index 000000000..3d4d22381 --- /dev/null +++ b/kernel/src/main/java/io/delta/core/expressions/CastingComparator.java @@ -0,0 +1,42 @@ +package io.delta.core.expressions; + +import java.util.Comparator; + +import io.delta.core.types.*; + +public class CastingComparator> implements Comparator { + + public static Comparator forDataType(DataType dataType) { + if (dataType instanceof IntegerType) { + return new CastingComparator(); + } + + if (dataType instanceof BooleanType) { + return new CastingComparator(); + } + + if (dataType instanceof LongType) { + return new CastingComparator(); + } + + if (dataType instanceof StringType) { + return new CastingComparator(); + } + + throw new IllegalArgumentException( + String.format("Unsupported DataType: %s", dataType.typeName()) + ); + } + + private final Comparator comparator; + + public CastingComparator() { + comparator = Comparator.naturalOrder(); + } + + @SuppressWarnings("unchecked") + @Override + public int compare(Object a, Object b) { + return comparator.compare((T) a, (T) b); + } +} diff --git 
a/kernel/src/main/java/io/delta/core/expressions/Column.java b/kernel/src/main/java/io/delta/core/expressions/Column.java new file mode 100644 index 000000000..9d20b2a5d --- /dev/null +++ b/kernel/src/main/java/io/delta/core/expressions/Column.java @@ -0,0 +1,92 @@ +package io.delta.core.expressions; + +import java.util.Collections; +import java.util.Objects; +import java.util.Set; + +import io.delta.core.data.Row; +import io.delta.core.types.*; + +/** + * A column whose row-value will be computed based on the data in a {@link io.delta.core.data.Row}. + *

+ * It is recommended that you instantiate using an existing table schema + * {@link io.delta.core.types.StructType} with {@link StructType#column(int)}. + *

+ * Only supports primitive data types, see + * Delta Transaction Log Protocol: Primitive Types. + */ +public final class Column extends LeafExpression { + private final int ordinal; + private final String name; + private final DataType dataType; + private final RowEvaluator evaluator; + + public Column(int ordinal, String name, DataType dataType) { + this.ordinal = ordinal; + this.name = name; + this.dataType = dataType; + + if (dataType instanceof IntegerType) { + evaluator = (row -> row.getInt(ordinal)); + } else if (dataType instanceof BooleanType) { + evaluator = (row -> row.getBoolean(ordinal)); + } else if (dataType instanceof LongType) { + evaluator = (row -> row.getLong(ordinal)); + } else if (dataType instanceof StringType) { + evaluator = (row -> row.getString(ordinal)); + } else { + throw new UnsupportedOperationException( + String.format( + "The data type %s of column %s at ordinal %s is not supported", + dataType.typeName(), + name, + ordinal) + ); + } + } + + public String name() { + return name; + } + + @Override + public Object eval(Row row) { + return row.isNullAt(ordinal) ? 
null : evaluator.nullSafeEval(row); + } + + @Override + public DataType dataType() { + return dataType; + } + + @Override + public String toString() { + return "Column(" + name + ")"; + } + + @Override + public Set references() { + return Collections.singleton(name); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Column column = (Column) o; + return Objects.equals(ordinal, column.ordinal) && + Objects.equals(name, column.name) && + Objects.equals(dataType, column.dataType); + } + + @Override + public int hashCode() { + return Objects.hash(name, dataType); + } + + @FunctionalInterface + private interface RowEvaluator { + Object nullSafeEval(Row row); + } +} diff --git a/kernel/src/main/java/io/delta/core/expressions/EqualTo.java b/kernel/src/main/java/io/delta/core/expressions/EqualTo.java new file mode 100644 index 000000000..09160c4da --- /dev/null +++ b/kernel/src/main/java/io/delta/core/expressions/EqualTo.java @@ -0,0 +1,17 @@ +package io.delta.core.expressions; + +/** + * Evaluates {@code expr1} = {@code expr2} for {@code new EqualTo(expr1, expr2)}. 
+ */ +public final class EqualTo extends BinaryComparison implements Predicate { + + public EqualTo(Expression left, Expression right) { + super(left, right, "="); + } + + @Override + protected Object nullSafeEval(Object leftResult, Object rightResult) { + return compare(leftResult, rightResult) == 0; + } +} + diff --git a/kernel/src/main/java/io/delta/core/expressions/Expression.java b/kernel/src/main/java/io/delta/core/expressions/Expression.java index 4a117af4d..95f9ba48c 100644 --- a/kernel/src/main/java/io/delta/core/expressions/Expression.java +++ b/kernel/src/main/java/io/delta/core/expressions/Expression.java @@ -1,4 +1,44 @@ package io.delta.core.expressions; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import io.delta.core.data.Row; +import io.delta.core.types.DataType; + +/** + * Generic interface for all Expressions + */ public interface Expression { + + /** + * @param row the input row to evaluate. + * @return the result of evaluating this expression on the given input {@link Row}. + */ + Object eval(Row row); + + /** + * @return the {@link DataType} of the result of evaluating this expression. + */ + DataType dataType(); + + /** + * @return the String representation of this expression. + */ + String toString(); + + /** + * @return a {@link List} of the immediate children of this node + */ + List children(); + + /** + * @return the names of columns referenced by this expression. 
+ */ + default Set references() { + Set result = new HashSet<>(); + children().forEach(child -> result.addAll(child.references())); + return result; + } } diff --git a/kernel/src/main/java/io/delta/core/expressions/LeafExpression.java b/kernel/src/main/java/io/delta/core/expressions/LeafExpression.java new file mode 100644 index 000000000..fb6598bb5 --- /dev/null +++ b/kernel/src/main/java/io/delta/core/expressions/LeafExpression.java @@ -0,0 +1,27 @@ +package io.delta.core.expressions; + +import java.util.Collections; +import java.util.List; +import java.util.Set; + +/** + * An {@link Expression} with no children. + */ +public abstract class LeafExpression implements Expression { + + protected LeafExpression() {} + + @Override + public List children() { + return Collections.emptyList(); + } + + @Override + public Set references() { + return Collections.emptySet(); + } + + public abstract boolean equals(Object o); + + public abstract int hashCode(); +} diff --git a/kernel/src/main/java/io/delta/core/expressions/Literal.java b/kernel/src/main/java/io/delta/core/expressions/Literal.java new file mode 100644 index 000000000..e5d53355d --- /dev/null +++ b/kernel/src/main/java/io/delta/core/expressions/Literal.java @@ -0,0 +1,95 @@ +package io.delta.core.expressions; + +import java.util.Objects; + +import io.delta.core.data.Row; +import io.delta.core.types.*; + +/** + * A literal value. + *

+ * Only supports primitive data types, see + * Delta Transaction Log Protocol: Primitive Types. + */ +public final class Literal extends LeafExpression { + + //////////////////////////////////////////////////////////////////////////////// + // Static Fields / Methods + //////////////////////////////////////////////////////////////////////////////// + + public static final Literal TRUE = Literal.of(true); + public static final Literal FALSE = Literal.of(false); + + /** + * @return a {@link Literal} with data type {@link IntegerType} + */ + public static Literal of(int value) { + return new Literal(value, IntegerType.INSTANCE); + } + + /** + * @return a {@link Literal} with data type {@link BooleanType} + */ + public static Literal of(boolean value) { + return new Literal(value, BooleanType.INSTANCE); + } + + /** + * @return a {@link Literal} with data type {@link LongType} + */ + public static Literal of(long value) { + return new Literal(value, LongType.INSTANCE); + } + + /** + * @return a {@link Literal} with data type {@link StringType} + */ + public static Literal of(String value) { + return new Literal(value, StringType.INSTANCE); + } + + //////////////////////////////////////////////////////////////////////////////// + // Instance Fields / Methods + //////////////////////////////////////////////////////////////////////////////// + + private final Object value; + private final DataType dataType; + + private Literal(Object value, DataType dataType) { + this.value = value; + this.dataType = dataType; + } + + public Object value() { + return value; + } + + @Override + public Object eval(Row record) { + return value; + } + + @Override + public DataType dataType() { + return dataType; + } + + @Override + public String toString() { + return String.valueOf(value); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Literal literal = (Literal) o; + return 
Objects.equals(value, literal.value) && + Objects.equals(dataType, literal.dataType); + } + + @Override + public int hashCode() { + return Objects.hash(value, dataType); + } +} diff --git a/kernel/src/main/java/io/delta/core/expressions/Predicate.java b/kernel/src/main/java/io/delta/core/expressions/Predicate.java new file mode 100644 index 000000000..c1664a000 --- /dev/null +++ b/kernel/src/main/java/io/delta/core/expressions/Predicate.java @@ -0,0 +1,14 @@ +package io.delta.core.expressions; + +import io.delta.core.types.BooleanType; +import io.delta.core.types.DataType; + +/** + * An {@link Expression} that defines a relation on inputs. Evaluates to true, false, or null. + */ +public interface Predicate extends Expression { + @Override + default DataType dataType() { + return BooleanType.INSTANCE; + } +} diff --git a/kernel/src/main/java/io/delta/core/internal/ScanBuilderImpl.java b/kernel/src/main/java/io/delta/core/internal/ScanBuilderImpl.java index 5cd8c914f..f4166e139 100644 --- a/kernel/src/main/java/io/delta/core/internal/ScanBuilderImpl.java +++ b/kernel/src/main/java/io/delta/core/internal/ScanBuilderImpl.java @@ -1,31 +1,50 @@ package io.delta.core.internal; +import java.util.Optional; + import io.delta.core.Scan; import io.delta.core.ScanBuilder; import io.delta.core.expressions.Expression; -import io.delta.core.internal.replay.LogReplay; +import io.delta.core.internal.actions.AddFile; import io.delta.core.types.StructType; +import io.delta.core.utils.CloseableIterator; public class ScanBuilderImpl implements ScanBuilder { - private final LogReplay logReplay; + private final StructType snapshotSchema; + private final StructType snapshotPartitionSchema; + private final CloseableIterator filesIter; + + private StructType readSchema; + private Optional filter; + + public ScanBuilderImpl( + StructType snapshotSchema, + StructType snapshotPartitionSchema, + CloseableIterator filesIter) { + this.snapshotSchema = snapshotSchema; + 
this.snapshotPartitionSchema = snapshotPartitionSchema; + this.filesIter = filesIter; - public ScanBuilderImpl(LogReplay logReplay) { - this.logReplay = logReplay; + this.readSchema = snapshotSchema; + this.filter = Optional.empty(); } @Override - public ScanBuilder withReadSchema(StructType schema) { - return null; + public ScanBuilder withReadSchema(StructType readSchema) { + // TODO: validate + this.readSchema = readSchema; + return this; } @Override - public ScanBuilder withFilter(Expression filter) { - return null; + public ScanBuilder withFilter(Expression expression) { + this.filter = Optional.of(expression); + return this; } @Override public Scan build() { - return new ScanImpl(logReplay); + return new ScanImpl(snapshotSchema, readSchema, snapshotPartitionSchema, filesIter, filter); } } diff --git a/kernel/src/main/java/io/delta/core/internal/ScanImpl.java b/kernel/src/main/java/io/delta/core/internal/ScanImpl.java index 273387c81..a74b2d9f3 100644 --- a/kernel/src/main/java/io/delta/core/internal/ScanImpl.java +++ b/kernel/src/main/java/io/delta/core/internal/ScanImpl.java @@ -1,39 +1,91 @@ package io.delta.core.internal; -import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Optional; import io.delta.core.Scan; import io.delta.core.ScanTask; +import io.delta.core.expressions.Expression; import io.delta.core.internal.actions.AddFile; -import io.delta.core.internal.replay.LogReplay; +import io.delta.core.internal.data.PartitionRow; +import io.delta.core.internal.lang.FilteredCloseableIterator; +import io.delta.core.internal.lang.Tuple2; +import io.delta.core.internal.util.PartitionUtils; +import io.delta.core.types.StructType; import io.delta.core.utils.CloseableIterator; public class ScanImpl implements Scan { - private final LogReplay logReplay; + /** Complete schema of the snapshot to be scanned. 
*/ + private final StructType snapshotSchema; - public ScanImpl(LogReplay logReplay) { - this.logReplay = logReplay; + /** Schema that we actually want to read. */ + private final StructType readSchema; + + /** Partition schema. */ + private final StructType snapshotPartitionSchema; + + private final CloseableIterator filesIter; + + /** Mapping from partitionColumnName to its ordinal in the `snapshotSchema`. */ + private final Map partitionColumnOrdinals; + + private final Optional metadataFilterConjunction; + private final Optional dataFilterConjunction; + + public ScanImpl( + StructType snapshotSchema, + StructType readSchema, + StructType snapshotPartitionSchema, + CloseableIterator filesIter, + Optional filter) { + this.snapshotSchema = snapshotSchema; + this.readSchema = readSchema; + this.snapshotPartitionSchema = snapshotPartitionSchema; + this.filesIter = filesIter; + this.partitionColumnOrdinals = PartitionUtils.getPartitionOrdinals(snapshotSchema, snapshotPartitionSchema); + + if (filter.isPresent()) { + final List partitionColumns = snapshotPartitionSchema.fieldNames(); + final Tuple2, Optional> metadataAndDataConjunctions = + PartitionUtils.splitMetadataAndDataPredicates(filter.get(), partitionColumns); + + this.metadataFilterConjunction = metadataAndDataConjunctions._1; + this.dataFilterConjunction = metadataAndDataConjunctions._2; + } else { + this.metadataFilterConjunction = Optional.empty(); + this.dataFilterConjunction = Optional.empty(); + } + + System.out.println("ScanImpl: snapshotSchema: " + snapshotSchema.fields()); + System.out.println("ScanImpl: readSchema: " + readSchema.fields()); + System.out.println("ScanImpl: snapshotPartitionSchema: " + snapshotPartitionSchema.fields()); + System.out.println("ScanImpl: partitionColumnOrdinals: " + partitionColumnOrdinals.toString()); + + System.out.println("ScanImpl: snapshotPartitionSchema: " + snapshotPartitionSchema.fields()); + System.out.println("ScanImpl: metadataFilterConjunction: " + 
metadataFilterConjunction.toString()); + System.out.println("ScanImpl: dataFilterConjunction: " + dataFilterConjunction.toString()); } @Override public CloseableIterator getTasks() { - return new CloseableIterator() { - final CloseableIterator addFileIter = logReplay.getAddFiles(); - + return new FilteredCloseableIterator(filesIter) { @Override - public void close() throws IOException { - addFileIter.close(); - } + protected Optional accept(AddFile addFile) { + if (!metadataFilterConjunction.isPresent()) { + return Optional.of(new ScanTaskImpl(addFile)); + } - @Override - public boolean hasNext() { - return addFileIter.hasNext(); - } + // Perform Partition Pruning; a null (unknown) predicate result prunes the file + final PartitionRow row = new PartitionRow(partitionColumnOrdinals, addFile.getPartitionValues()); + final boolean accept = Boolean.TRUE.equals(metadataFilterConjunction.get().eval(row)); - @Override - public ScanTask next() { - return new ScanTaskImpl(addFileIter.next()); + if (accept) { + return Optional.of(new ScanTaskImpl(addFile)); + } + + return Optional.empty(); } }; } diff --git a/kernel/src/main/java/io/delta/core/internal/ScanTaskImpl.java b/kernel/src/main/java/io/delta/core/internal/ScanTaskImpl.java index a2a7a92a9..a3694e22d 100644 --- a/kernel/src/main/java/io/delta/core/internal/ScanTaskImpl.java +++ b/kernel/src/main/java/io/delta/core/internal/ScanTaskImpl.java @@ -7,12 +7,19 @@ public class ScanTaskImpl implements ScanTask { + private final AddFile addFile; + public ScanTaskImpl(AddFile addFile) { - System.out.println("Created ScanTaskImpl for AddFile " + addFile.getPath()); + this.addFile = addFile; } @Override public CloseableIterator getData() { return null; } + + /** Visible for testing */ + public AddFile getAddFile() { + return this.addFile; + } } diff --git a/kernel/src/main/java/io/delta/core/internal/SnapshotImpl.java b/kernel/src/main/java/io/delta/core/internal/SnapshotImpl.java index 513eb559c..882acc427 100644 --- a/kernel/src/main/java/io/delta/core/internal/SnapshotImpl.java +++ 
b/kernel/src/main/java/io/delta/core/internal/SnapshotImpl.java @@ -50,18 +50,26 @@ public long getVersion() { @Override public StructType getSchema() { - return protocolAndMetadata.get()._2.getSchema(); + return getMetadata().getSchema(); } @Override public ScanBuilder getScanBuilder() { - return new ScanBuilderImpl(logReplay); + return new ScanBuilderImpl( + getSchema(), + getMetadata().getPartitionSchema(), + logReplay.getAddFiles() + ); } //////////////////////////////////////// // Internal APIs //////////////////////////////////////// + public Metadata getMetadata() { + return protocolAndMetadata.get()._2; + } + public CloseableIterator getAddFiles() { return logReplay.getAddFiles(); } diff --git a/kernel/src/main/java/io/delta/core/internal/actions/AddFile.java b/kernel/src/main/java/io/delta/core/internal/actions/AddFile.java index d6fe86619..b3ab259ce 100644 --- a/kernel/src/main/java/io/delta/core/internal/actions/AddFile.java +++ b/kernel/src/main/java/io/delta/core/internal/actions/AddFile.java @@ -1,5 +1,6 @@ package io.delta.core.internal.actions; +import java.util.Collections; import java.util.Map; import java.util.Optional; @@ -61,6 +62,18 @@ public AddFile copyWithDataChange(boolean dataChange) { return this; // TODO } + public Map getPartitionValues() { + return Collections.unmodifiableMap(partitionValues); + } + + public long getSize() { + return size; + } + + public long getModificationTime() { + return modificationTime; + } + @Override public String toString() { return "AddFile{" + diff --git a/kernel/src/main/java/io/delta/core/internal/actions/Metadata.java b/kernel/src/main/java/io/delta/core/internal/actions/Metadata.java index 06c46b78b..00985f91b 100644 --- a/kernel/src/main/java/io/delta/core/internal/actions/Metadata.java +++ b/kernel/src/main/java/io/delta/core/internal/actions/Metadata.java @@ -1,5 +1,8 @@ package io.delta.core.internal.actions; +import java.util.List; +import java.util.stream.Collectors; + import 
io.delta.core.data.Row; import io.delta.core.helpers.TableHelper; import io.delta.core.types.*; @@ -17,10 +20,11 @@ public static Metadata fromRow(Row row, TableHelper tableHelper) { final String description = row.getString(2); final Format format = Format.fromRow(row.getRecord(3)); final String schemaJson = row.getString(4); + final List partitionColumns = row.getList(5); Row schemaRow = tableHelper.parseJson(schemaJson, StructType.READ_SCHEMA); StructType schema = StructType.fromRow(schemaRow); - return new Metadata(schema); + return new Metadata(schema, partitionColumns); } public static final StructType READ_SCHEMA = new StructType() @@ -47,12 +51,24 @@ public static Metadata fromRow(Row row, TableHelper tableHelper) { // createdTime private final StructType schema; + private final List partitionColumns; - public Metadata(StructType schema) { + public Metadata(StructType schema, List partitionColumns) { this.schema = schema; + this.partitionColumns = partitionColumns; } public StructType getSchema() { return schema; } + + public List getPartitionColumns() { + return partitionColumns; + } + + public StructType getPartitionSchema() { + return new StructType( + partitionColumns.stream().map(schema::get).collect(Collectors.toList()) + ); + } } diff --git a/kernel/src/main/java/io/delta/core/internal/data/PartitionRow.java b/kernel/src/main/java/io/delta/core/internal/data/PartitionRow.java new file mode 100644 index 000000000..f7f44dd78 --- /dev/null +++ b/kernel/src/main/java/io/delta/core/internal/data/PartitionRow.java @@ -0,0 +1,69 @@ +package io.delta.core.internal.data; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import io.delta.core.data.Row; + +/** + * The type of Row that will be evaluated against {@link io.delta.core.expressions.Column}s. + * + * These Columns must be partition columns, and will have ordinals matching the latest snapshot + * schema. 
+ */ +public class PartitionRow implements Row { + + private final Map ordinalToValue; + + public PartitionRow(Map partitionOrdinals, Map partitionValuesMap) { + this.ordinalToValue = new HashMap<>(); + + for (Map.Entry entry : partitionOrdinals.entrySet()) { + final String partitionColumnName = entry.getKey(); + final int partitionColumnOrdinal = entry.getValue(); + final String partitionColumnValue = partitionValuesMap.get(partitionColumnName); + ordinalToValue.put(partitionColumnOrdinal, partitionColumnValue); + } + } + + @Override + public boolean isNullAt(int ordinal) { + return ordinalToValue.get(ordinal) == null; + } + + @Override + public boolean getBoolean(int ordinal) { + return Boolean.parseBoolean(ordinalToValue.get(ordinal)); + } + + @Override + public int getInt(int ordinal) { + return Integer.parseInt(ordinalToValue.get(ordinal)); + } + + @Override + public long getLong(int ordinal) { + return Long.parseLong(ordinalToValue.get(ordinal)); + } + + @Override + public String getString(int ordinal) { + return ordinalToValue.get(ordinal); + } + + @Override + public Row getRecord(int ordinal) { + throw new UnsupportedOperationException("Partition values can't be StructTypes"); + } + + @Override + public List getList(int ordinal) { + throw new UnsupportedOperationException("Partition values can't be Lists"); + } + + @Override + public Map getMap(int ordinal) { + throw new UnsupportedOperationException("Partition values can't be Maps"); + } +} diff --git a/kernel/src/main/java/io/delta/core/internal/lang/FilteredCloseableIterator.java b/kernel/src/main/java/io/delta/core/internal/lang/FilteredCloseableIterator.java new file mode 100644 index 000000000..533e99eb8 --- /dev/null +++ b/kernel/src/main/java/io/delta/core/internal/lang/FilteredCloseableIterator.java @@ -0,0 +1,62 @@ +package io.delta.core.internal.lang; + +import java.io.IOException; +import java.util.NoSuchElementException; +import java.util.Optional; + +import 
io.delta.core.utils.CloseableIterator; + +public abstract class FilteredCloseableIterator implements CloseableIterator { + + private final CloseableIterator iter; + private Optional nextValid; + private boolean closed; + + public FilteredCloseableIterator(CloseableIterator iter) { + this.iter = iter; + this.nextValid = Optional.empty(); + this.closed = false; + } + + protected abstract Optional accept(ITER_TYPE element); + + @Override + public final boolean hasNext() { + if (closed) { + throw new IllegalStateException("Can't call `hasNext` on a closed iterator."); + } + if (!nextValid.isPresent()) { + nextValid = findNextValid(); + } + return nextValid.isPresent(); + } + + @Override + public final RETURN_TYPE next() { + if (closed) { + throw new IllegalStateException("Can't call `next` on a closed iterator."); + } + if (!hasNext()) throw new NoSuchElementException(); + + // By the definition of hasNext, we know that nextValid is non-empty + + final RETURN_TYPE ret = nextValid.get(); + nextValid = Optional.empty(); + return ret; + } + + @Override + public final void close() throws IOException { + iter.close(); + this.closed = true; + } + + private Optional findNextValid() { + while (iter.hasNext()) { + final Optional acceptedElementOpt = accept(iter.next()); + if (acceptedElementOpt.isPresent()) return acceptedElementOpt; + } + + return Optional.empty(); + } +} diff --git a/kernel/src/main/java/io/delta/core/internal/replay/ReverseActionsToAddFilesIterator.java b/kernel/src/main/java/io/delta/core/internal/replay/ReverseActionsToAddFilesIterator.java index f72e5264e..5dc7870f0 100644 --- a/kernel/src/main/java/io/delta/core/internal/replay/ReverseActionsToAddFilesIterator.java +++ b/kernel/src/main/java/io/delta/core/internal/replay/ReverseActionsToAddFilesIterator.java @@ -1,18 +1,18 @@ package io.delta.core.internal.replay; -import java.io.IOException; import java.net.URI; import java.util.HashMap; -import java.util.NoSuchElementException; import 
java.util.Optional; import io.delta.core.internal.actions.Action; import io.delta.core.internal.actions.AddFile; import io.delta.core.internal.actions.RemoveFile; +import io.delta.core.internal.lang.FilteredCloseableIterator; import io.delta.core.internal.lang.Tuple2; import io.delta.core.utils.CloseableIterator; -public class ReverseActionsToAddFilesIterator implements CloseableIterator { +public class ReverseActionsToAddFilesIterator + extends FilteredCloseableIterator> { private final CloseableIterator> reverseActionIter; @@ -20,74 +20,55 @@ public class ReverseActionsToAddFilesIterator implements CloseableIterator addFilesFromJson; - private Optional nextValid; - public ReverseActionsToAddFilesIterator(CloseableIterator> reverseActionIter) { + super(reverseActionIter); + this.reverseActionIter = reverseActionIter; this.tombstonesFromJson = new HashMap<>(); this.addFilesFromJson = new HashMap<>(); - this.nextValid = Optional.empty(); } @Override - public boolean hasNext() { - if (!nextValid.isPresent()) { - nextValid = findNextValid(); - } - - return nextValid.isPresent(); - } - - @Override - public AddFile next() { - if (!hasNext()) throw new NoSuchElementException(); - - // By the definition of hasNext, we know that actionsIter is non-empty + protected Optional accept(Tuple2 element) { + final Action action = element._1; + final boolean isFromCheckpoint = element._2; + + if (action instanceof AddFile) { + final AddFile add = ((AddFile) action).copyWithDataChange(false); + final UniqueFileActionTuple key = + new UniqueFileActionTuple(add.toURI(), add.getDeletionVectorUniqueId()); + final boolean alreadyDeleted = tombstonesFromJson.containsKey(key); + final boolean alreadyReturned = addFilesFromJson.containsKey(key); + + if (!alreadyReturned) { + // Note: No AddFile will appear twice in a checkpoint, so we only need + // non-checkpoint AddFiles in the set + if (!isFromCheckpoint) { + addFilesFromJson.put(key, add); + } - final AddFile ret = nextValid.get(); - 
nextValid = Optional.empty(); - return ret; - } + if (!alreadyDeleted) { + return Optional.of(add); + } + } + } else if (action instanceof RemoveFile && !isFromCheckpoint) { + // Note: There's no reason to put a RemoveFile from a checkpoint into tombstones map + // since, when we generate a checkpoint, any corresponding AddFile would have + // been excluded + final RemoveFile remove = ((RemoveFile) action).copyWithDataChange(false); + final UniqueFileActionTuple key = + new UniqueFileActionTuple(remove.toURI(), remove.getDeletionVectorUniqueId()); + + tombstonesFromJson.put(key, remove); + } - @Override - public void close() throws IOException { - reverseActionIter.close(); + return Optional.empty(); } private Optional findNextValid() { while (reverseActionIter.hasNext()) { final Tuple2 tuple = reverseActionIter.next(); - final Action action = tuple._1; - final boolean isFromCheckpoint = tuple._2; - - if (action instanceof AddFile) { - final AddFile add = ((AddFile) action).copyWithDataChange(false); - final UniqueFileActionTuple key = - new UniqueFileActionTuple(add.toURI(), add.getDeletionVectorUniqueId()); - final boolean alreadyDeleted = tombstonesFromJson.containsKey(key); - final boolean alreadyReturned = addFilesFromJson.containsKey(key); - - if (!alreadyReturned) { - // Note: No AddFile will appear twice in a checkpoint, so we only need - // non-checkpoint AddFiles in the set - if (!isFromCheckpoint) { - addFilesFromJson.put(key, add); - } - - if (!alreadyDeleted) { - return Optional.of(add); - } - } - } else if (action instanceof RemoveFile && !isFromCheckpoint) { - // Note: There's no reason to put a RemoveFile from a checkpoint into tombstones map - // since, when we generate a checkpoint, any corresponding AddFile would have - // been excluded - final RemoveFile remove = ((RemoveFile) action).copyWithDataChange(false); - final UniqueFileActionTuple key = - new UniqueFileActionTuple(remove.toURI(), remove.getDeletionVectorUniqueId()); - - 
tombstonesFromJson.put(key, remove); - } + } return Optional.empty(); diff --git a/kernel/src/main/java/io/delta/core/internal/util/PartitionUtils.java b/kernel/src/main/java/io/delta/core/internal/util/PartitionUtils.java new file mode 100644 index 000000000..3c04c4c5b --- /dev/null +++ b/kernel/src/main/java/io/delta/core/internal/util/PartitionUtils.java @@ -0,0 +1,49 @@ +package io.delta.core.internal.util; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import io.delta.core.expressions.Expression; +import io.delta.core.internal.lang.Tuple2; +import io.delta.core.types.StructType; + +public class PartitionUtils { + + private PartitionUtils() { } + + public static Map getPartitionOrdinals( + StructType snapshotSchema, + StructType partitionSchema) { + final Map output = new HashMap<>(); + partitionSchema + .fieldNames() + .forEach(fieldName -> output.put(fieldName, snapshotSchema.indexOf(fieldName))); + + return output; + } + + /** + * Partition the given condition into two optional conjunctive predicates M, D such that + * condition = M AND D, where we define: + * - M: conjunction of predicates that can be evaluated using metadata only. + * - D: conjunction of other predicates. 
+ */ + public static Tuple2, Optional> splitMetadataAndDataPredicates( + Expression condition, + List partitionColumns) { + return new Tuple2<>(Optional.of(condition), Optional.empty()); + } + + private static List splitConjunctivePredicates(Expression condition) { + return null; + } + +// private def splitConjunctivePredicates(condition: Expression): Seq[Expression] = { +// condition match { +// case a: And => splitConjunctivePredicates(a.getLeft) ++ splitConjunctivePredicates(a.getRight) +// case other => other :: Nil +// } +// } +} diff --git a/kernel/src/main/java/io/delta/core/types/DataType.java b/kernel/src/main/java/io/delta/core/types/DataType.java index 920eb92b3..ec525cc80 100644 --- a/kernel/src/main/java/io/delta/core/types/DataType.java +++ b/kernel/src/main/java/io/delta/core/types/DataType.java @@ -20,5 +20,21 @@ public String typeName() { } return name.toLowerCase(Locale.ROOT); } + public boolean equivalent(DataType dt) { + return this.equals(dt); + } + + @Override + public String toString() { + return typeName(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DataType that = (DataType) o; + return typeName().equals(that.typeName()); + } } diff --git a/kernel/src/main/java/io/delta/core/types/StructField.java b/kernel/src/main/java/io/delta/core/types/StructField.java index 9c7dd37c8..a2f147491 100644 --- a/kernel/src/main/java/io/delta/core/types/StructField.java +++ b/kernel/src/main/java/io/delta/core/types/StructField.java @@ -24,9 +24,9 @@ public static StructField fromRow(Row row) { // Instance Fields / Methods //////////////////////////////////////////////////////////////////////////////// - public final String name; - public final DataType dataType; - public final boolean nullable; + private final String name; + private final DataType dataType; + private final boolean nullable; // private final FieldMetadata metadata; public 
StructField(String name, DataType dataType, boolean nullable) { @@ -35,8 +35,20 @@ public StructField(String name, DataType dataType, boolean nullable) { this.nullable = nullable; } + public String getName() { + return name; + } + + public DataType getDataType() { + return dataType; + } + + public boolean isNullable() { + return nullable; + } + @Override public String toString() { - return String.format("StructField(%s,%s,%s)", name, dataType, nullable); + return String.format("StructField(name=%s,type=%s,nullable=%s)", name, dataType, nullable); } } diff --git a/kernel/src/main/java/io/delta/core/types/StructType.java b/kernel/src/main/java/io/delta/core/types/StructType.java index 0b8b2caf3..265644566 100644 --- a/kernel/src/main/java/io/delta/core/types/StructType.java +++ b/kernel/src/main/java/io/delta/core/types/StructType.java @@ -1,11 +1,11 @@ package io.delta.core.types; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; +import java.util.*; import java.util.stream.Collectors; import io.delta.core.data.Row; +import io.delta.core.expressions.Column; +import io.delta.core.internal.lang.Tuple2; public final class StructType extends DataType { @@ -32,7 +32,9 @@ public static StructType fromRow(Row row) { // Instance Fields / Methods //////////////////////////////////////////////////////////////////////////////// + private final Map> nameToFieldAndOrdinal; private final List fields; + private final List fieldNames; public StructType() { this(new ArrayList<>()); @@ -40,6 +42,12 @@ public StructType() { public StructType(List fields) { this.fields = fields; + this.fieldNames = fields.stream().map(f -> f.getName()).collect(Collectors.toList()); + + this.nameToFieldAndOrdinal = new HashMap<>(); + for (int i = 0; i < fields.size(); i++) { + nameToFieldAndOrdinal.put(fields.get(i).getName(), new Tuple2<>(fields.get(i), i)); + } } public StructType add(StructField field) { @@ -58,17 +66,43 @@ public List fields() { } public List 
fieldNames() { - return fields.stream().map(f -> f.name).collect(Collectors.toList()); + return fieldNames; } public int length() { return fields.size(); } + public int indexOf(String fieldName) { + return fieldNames.indexOf(fieldName); + } + + public StructField get(String fieldName) { + return nameToFieldAndOrdinal.get(fieldName)._1; + } + public StructField at(int index) { return fields.get(index); } + /** + * Creates a {@link io.delta.core.expressions.Column} expression for the field at the given + * {@code ordinal}. + * + * @param ordinal the ordinal of the {@link StructField} to create a column for + * @return a {@link Column} expression for the {@link StructField} at position {@code ordinal} + */ + public Column column(int ordinal) { + final StructField field = at(ordinal); + return new Column(ordinal, field.getName(), field.getDataType()); + } + + public Column column(String fieldName) { + Tuple2 fieldAndOrdinal = nameToFieldAndOrdinal.get(fieldName); + System.out.println("Created column " + fieldName + " with ordinal " + fieldAndOrdinal._2); + return new Column(fieldAndOrdinal._2, fieldName, fieldAndOrdinal._1.getDataType()); + } + @Override public String toString() { return String.format( From 3df4e880f88d6b025c0a1044ba37049fb0f7c1c6 Mon Sep 17 00:00:00 2001 From: Scott Sandre Date: Fri, 31 Mar 2023 12:30:08 -0700 Subject: [PATCH 2/2] Got basic partition pruning using AND(part1, part2) --- .../io/delta/core/internal/TableSuite.scala | 26 +++++++-- .../java/io/delta/core/expressions/And.java | 46 +++++++++++++++ .../core/internal/util/PartitionUtils.java | 58 +++++++++++++++---- 3 files changed, 113 insertions(+), 17 deletions(-) create mode 100644 kernel/src/main/java/io/delta/core/expressions/And.java diff --git a/kernel-default/src/test/scala/io/delta/core/internal/TableSuite.scala b/kernel-default/src/test/scala/io/delta/core/internal/TableSuite.scala index 484a9b869..4cf21298c 100644 ---
a/kernel-default/src/test/scala/io/delta/core/internal/TableSuite.scala +++ b/kernel-default/src/test/scala/io/delta/core/internal/TableSuite.scala @@ -19,7 +19,7 @@ package io.delta.core.internal import scala.collection.JavaConverters._ import io.delta.core.Table -import io.delta.core.expressions.{EqualTo, Literal} +import io.delta.core.expressions.{And, EqualTo, Literal} import io.delta.core.helpers.DefaultTableHelper import io.delta.core.util.GoldenTableUtils import org.scalatest.funsuite.AnyFunSuite @@ -77,18 +77,34 @@ class TableSuite extends AnyFunSuite with GoldenTableUtils { } } - test("can perform partition pruning- basic - no checkpoint") { + test("can perform partition pruning - basic - no checkpoint") { withGoldenTable("basic-partitioned-no-checkpoint") { path => val table = Table.forPath(path, new DefaultTableHelper()) val snapshot = table.getLatestSnapshot.asInstanceOf[SnapshotImpl] val schema = snapshot.getSchema - val partitionFilter = new EqualTo(schema.column("part_a"), Literal.of(0L)); - val scan = snapshot.getScanBuilder().withFilter(partitionFilter).build() - scan + + val partitionFilter1 = new EqualTo(schema.column("part_a"), Literal.of(0L)); + val scan1 = snapshot.getScanBuilder().withFilter(partitionFilter1).build() + scan1 .getTasks .asScala .map(task => task.asInstanceOf[ScanTaskImpl].getAddFile) .foreach(add => assert(add.getPartitionValues.get("part_a").toLong == 0)) + + val partitionFilter2 = new And( + new EqualTo(schema.column("part_a"), Literal.of(0L)), + new EqualTo(schema.column("part_b"), Literal.of(0L)) + ) + val scan2 = snapshot.getScanBuilder().withFilter(partitionFilter2).build() + scan2 + .getTasks + .asScala + .map(task => task.asInstanceOf[ScanTaskImpl].getAddFile) + .foreach { add => assert( + add.getPartitionValues.get("part_a").toLong == 0 && + add.getPartitionValues.get("part_b").toLong == 0 + ) + } } } } diff --git a/kernel/src/main/java/io/delta/core/expressions/And.java 
b/kernel/src/main/java/io/delta/core/expressions/And.java new file mode 100644 index 000000000..72a89e020 --- /dev/null +++ b/kernel/src/main/java/io/delta/core/expressions/And.java @@ -0,0 +1,46 @@ +package io.delta.core.expressions; + +import java.util.Collection; + +import io.delta.core.types.BooleanType; + +/** + * Evaluates logical {@code expr1} AND {@code expr2} for {@code new And(expr1, expr2)}. + *

+ * Requires both left and right input expressions to evaluate to booleans. + */ +public final class And extends BinaryOperator implements Predicate { + + public static And apply(Collection conjunctions) { + if (conjunctions.size() == 0) { + throw new IllegalArgumentException("And.apply must be called with at least 1 element"); + } + + return (And) conjunctions + .stream() + // we start off with And(true, true) + // then we get the 1st expression: And(And(true, true), expr1) + // then we get the 2nd expression: And(And(And(true, true), expr1), expr2) etc. + .reduce(new And(Literal.TRUE, Literal.TRUE), And::new); + } + + public And(Expression left, Expression right) { + super(left, right, "&&"); + if (!(left.dataType() instanceof BooleanType) || + !(right.dataType() instanceof BooleanType)) { + + throw new IllegalArgumentException( + String.format( + "'And' requires expressions of type boolean. Got %s and %s.", + left.dataType().typeName(), + right.dataType().typeName() + ) + ); + } + } + + @Override + public Object nullSafeEval(Object leftResult, Object rightResult) { + return (boolean) leftResult && (boolean) rightResult; + } +} diff --git a/kernel/src/main/java/io/delta/core/internal/util/PartitionUtils.java b/kernel/src/main/java/io/delta/core/internal/util/PartitionUtils.java index 3c04c4c5b..e89aaa6a2 100644 --- a/kernel/src/main/java/io/delta/core/internal/util/PartitionUtils.java +++ b/kernel/src/main/java/io/delta/core/internal/util/PartitionUtils.java @@ -1,11 +1,12 @@ package io.delta.core.internal.util; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.util.*; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import io.delta.core.expressions.And; import io.delta.core.expressions.Expression; +import io.delta.core.internal.lang.ListUtils; import io.delta.core.internal.lang.Tuple2; import io.delta.core.types.StructType; @@ -33,17 +34,50 @@ public static Map getPartitionOrdinals(
public static Tuple2, Optional> splitMetadataAndDataPredicates( Expression condition, List partitionColumns) { - return new Tuple2<>(Optional.of(condition), Optional.empty()); + final Tuple2, List> metadataAndDataPredicates = ListUtils + .partition( + splitConjunctivePredicates(condition), + c -> isPredicateMetadataOnly(c, partitionColumns) + ); + + final Optional metadataConjunction; + if (metadataAndDataPredicates._1.isEmpty()) { + metadataConjunction = Optional.empty(); + } else { + metadataConjunction = Optional.of(And.apply(metadataAndDataPredicates._1)); + } + + final Optional dataConjunction; + if (metadataAndDataPredicates._2.isEmpty()) { + dataConjunction = Optional.empty(); + } else { + dataConjunction = Optional.of(And.apply(metadataAndDataPredicates._2)); + } + return new Tuple2<>(metadataConjunction, dataConjunction); } private static List splitConjunctivePredicates(Expression condition) { - return null; + if (condition instanceof And) { + final And andExpr = (And) condition; + return Stream.concat( + splitConjunctivePredicates(andExpr.getLeft()).stream(), + splitConjunctivePredicates(andExpr.getRight()).stream() + ).collect(Collectors.toList()); + } + return Collections.singletonList(condition); } -// private def splitConjunctivePredicates(condition: Expression): Seq[Expression] = { -// condition match { -// case a: And => splitConjunctivePredicates(a.getLeft) ++ splitConjunctivePredicates(a.getRight) -// case other => other :: Nil -// } -// } + private static boolean isPredicateMetadataOnly( + Expression condition, + List partitionColumns) { + Set lowercasePartCols = partitionColumns + .stream().map(s -> s.toLowerCase(Locale.ROOT)) + .collect(Collectors.toSet()); + + return condition + .references() + .stream() + .map(s -> s.toLowerCase(Locale.ROOT)) + .allMatch(lowercasePartCols::contains); + } }