Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic partition pruning #54

Open
wants to merge 2 commits into
base: delta-core-3
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions kernel-default/src/main/java/io/delta/core/data/JsonRow.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,21 +114,21 @@ private static Object decodeElement(JsonNode jsonValue, DataType dataType) {
}

private static Object decodeField(ObjectNode rootNode, StructField field) {
if (rootNode.get(field.name) == null) {
if (field.nullable) {
if (rootNode.get(field.getName()) == null) {
if (field.isNullable()) {
return null;
}

throw new RuntimeException(
String.format(
"Root node at key %s is null but field isn't nullable. Root node: %s",
field.name,
field.getName(),
rootNode
)
);
}

return decodeElement(rootNode.get(field.name), field.dataType);
return decodeElement(rootNode.get(field.getName()), field.getDataType());
}

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -216,7 +216,7 @@ public String toString() {
////////////////////////////////////////

private void assertType(int ordinal, DataType expectedType) {
final String actualTypeName = readSchema.at(ordinal).dataType.typeName();
final String actualTypeName = readSchema.at(ordinal).getDataType().typeName();
if (!actualTypeName.equals(expectedType.typeName()) &&
!actualTypeName.equals(UnresolvedDataType.INSTANCE.typeName())) {
throw new RuntimeException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package io.delta.core.internal
import scala.collection.JavaConverters._

import io.delta.core.Table
import io.delta.core.expressions.{And, EqualTo, Literal}
import io.delta.core.helpers.DefaultTableHelper
import io.delta.core.util.GoldenTableUtils
import org.scalatest.funsuite.AnyFunSuite
Expand Down Expand Up @@ -75,4 +76,35 @@ class TableSuite extends AnyFunSuite with GoldenTableUtils {
snapshot.getAddFiles.forEachRemaining(x => println(x))
}
}

test("can perform partition pruning - basic - no checkpoint") {
  // Builds scans over a golden table with partition-column filters and checks that every
  // AddFile surfaced by the scan carries the partition value(s) the filter asked for.
  // NOTE(review): each check iterates the scan's tasks, so it passes vacuously if a scan
  // yields no tasks at all.
  withGoldenTable("basic-partitioned-no-checkpoint") { path =>
    val snapshot = Table
      .forPath(path, new DefaultTableHelper())
      .getLatestSnapshot
      .asInstanceOf[SnapshotImpl]
    val schema = snapshot.getSchema

    // Case 1: single-column filter, part_a = 0.
    val partAIsZero = new EqualTo(schema.column("part_a"), Literal.of(0L))
    val partAScan = snapshot.getScanBuilder().withFilter(partAIsZero).build()
    for (task <- partAScan.getTasks.asScala) {
      val add = task.asInstanceOf[ScanTaskImpl].getAddFile
      assert(add.getPartitionValues.get("part_a").toLong == 0)
    }

    // Case 2: conjunction over two partition columns, part_a = 0 AND part_b = 0.
    val bothPartsAreZero = new And(
      new EqualTo(schema.column("part_a"), Literal.of(0L)),
      new EqualTo(schema.column("part_b"), Literal.of(0L))
    )
    val bothPartsScan = snapshot.getScanBuilder().withFilter(bothPartsAreZero).build()
    for (task <- bothPartsScan.getTasks.asScala) {
      val add = task.asInstanceOf[ScanTaskImpl].getAddFile
      assert(
        add.getPartitionValues.get("part_a").toLong == 0 &&
          add.getPartitionValues.get("part_b").toLong == 0
      )
    }
  }
}
}
46 changes: 46 additions & 0 deletions kernel/src/main/java/io/delta/core/expressions/And.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package io.delta.core.expressions;

import java.util.Collection;

import io.delta.core.types.BooleanType;

/**
 * Logical conjunction: {@code new And(expr1, expr2)} evaluates {@code expr1} AND {@code expr2}.
 * <p>
 * Both operands must have a boolean data type; the constructor rejects anything else.
 */
public final class And extends BinaryOperator implements Predicate {

    /**
     * Folds a collection of expressions into one left-nested conjunction.
     * <p>
     * The fold is seeded with {@code And(true, true)}, so for the inputs {@code e1, e2} the
     * result is {@code And(And(And(true, true), e1), e2)}.
     *
     * @param conjunctions the expressions to combine; must contain at least one element
     * @return the conjunction of the seed and every element, in iteration order
     * @throws IllegalArgumentException if {@code conjunctions} is empty, or if the
     *                                  {@code And} constructor rejects one of the elements
     */
    public static And apply(Collection<Expression> conjunctions) {
        if (conjunctions.isEmpty()) {
            throw new IllegalArgumentException("And.apply must be called with at least 1 element");
        }

        And combined = new And(Literal.TRUE, Literal.TRUE);
        for (Expression conjunct : conjunctions) {
            combined = new And(combined, conjunct);
        }
        return combined;
    }

    /**
     * Creates the conjunction of two boolean expressions.
     *
     * @param left  the left operand
     * @param right the right operand
     * @throws IllegalArgumentException if either operand's data type is not a
     *                                  {@link BooleanType}
     */
    public And(Expression left, Expression right) {
        super(left, right, "&&");

        final boolean bothBoolean =
            left.dataType() instanceof BooleanType && right.dataType() instanceof BooleanType;
        if (!bothBoolean) {
            throw new IllegalArgumentException(
                String.format(
                    "'And' requires expressions of type boolean. Got %s and %s.",
                    left.dataType().typeName(),
                    right.dataType().typeName()
                )
            );
        }
    }

    /**
     * Combines two already-evaluated operand results.
     *
     * @param leftResult  the left operand's result, cast to {@code boolean}
     * @param rightResult the right operand's result, cast to {@code boolean} only when the left
     *                    result is {@code true}
     * @return {@code true} only if both results are {@code true}
     */
    @Override
    public Object nullSafeEval(Object leftResult, Object rightResult) {
        if (!((boolean) leftResult)) {
            return false;
        }
        return (boolean) rightResult;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.delta.core.expressions;

import java.util.Comparator;

/**
 * Base class for {@link BinaryOperator}s that compare their left and right {@link Expression}s
 * and produce a boolean result.
 * <p>
 * Subclasses call {@link #compare(Object, Object)} to order two evaluated operand values.
 */
public abstract class BinaryComparison extends BinaryOperator implements Predicate {
    /** Orders evaluated operand values; selected once from the left operand's data type. */
    private final Comparator<Object> comparator;

    /**
     * @param left   the left operand
     * @param right  the right operand
     * @param symbol the operator symbol passed through to {@link BinaryOperator}
     * @throws IllegalArgumentException if {@link CastingComparator#forDataType} does not support
     *                                  the left operand's data type
     */
    protected BinaryComparison(Expression left, Expression right, String symbol) {
        super(left, right, symbol);

        // The BinaryOperator constructor has already rejected operands whose data types are not
        // equivalent, so the left operand's type is used to pick the comparator for both sides.
        this.comparator = CastingComparator.forDataType(left.dataType());
    }

    /**
     * Compares two evaluated operand values using the comparator chosen at construction.
     *
     * @param leftResult  the evaluated left operand
     * @param rightResult the evaluated right operand
     * @return the comparator's result for {@code (leftResult, rightResult)}
     */
    protected int compare(Object leftResult, Object rightResult) {
        final int ordering = comparator.compare(leftResult, rightResult);
        return ordering;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package io.delta.core.expressions;

import java.util.Arrays;
import java.util.List;
import java.util.Objects;

import io.delta.core.data.Row;

/**
 * An {@link Expression} with exactly two child expressions and a single result.
 * <p>
 * Evaluation is null-propagating: if either child evaluates to {@code null}, the result is
 * {@code null} and {@link #nullSafeEval(Object, Object)} is not invoked.
 */
public abstract class BinaryExpression implements Expression {
    protected final Expression left;
    protected final Expression right;

    /**
     * @param left  the left child expression
     * @param right the right child expression
     */
    protected BinaryExpression(Expression left, Expression right) {
        this.left = left;
        this.right = right;
    }

    /** @return the left child expression */
    public Expression getLeft() {
        return left;
    }

    /** @return the right child expression */
    public Expression getRight() {
        return right;
    }

    /**
     * Evaluates the left child, then the right child, and combines the two results.
     * <p>
     * The right child is not evaluated when the left child yields {@code null}.
     *
     * @param row the row to evaluate both children against
     * @return {@code null} if either child evaluates to {@code null}; otherwise the value
     *         returned by {@link #nullSafeEval(Object, Object)}
     */
    @Override
    public final Object eval(Row row) {
        final Object lhs = left.eval(row);
        if (lhs != null) {
            final Object rhs = right.eval(row);
            if (rhs != null) {
                return nullSafeEval(lhs, rhs);
            }
        }
        return null;
    }

    /**
     * Combines the two child results. Only called from {@link #eval(Row)} with non-null
     * arguments.
     *
     * @param leftResult  the non-null result of the left child
     * @param rightResult the non-null result of the right child
     * @return the result of this expression
     */
    protected abstract Object nullSafeEval(Object leftResult, Object rightResult);

    /** @return a two-element list holding the left child followed by the right child */
    @Override
    public List<Expression> children() {
        return Arrays.asList(left, right);
    }

    /**
     * Two binary expressions are equal when they have the same runtime class and equal left and
     * right children.
     */
    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null || o.getClass() != getClass()) {
            return false;
        }

        final BinaryExpression other = (BinaryExpression) o;
        if (!Objects.equals(left, other.left)) {
            return false;
        }
        return Objects.equals(right, other.right);
    }

    /** Hash derived from the left and right children, matching {@link #equals(Object)}. */
    @Override
    public int hashCode() {
        return Objects.hash(left, right);
    }
}
30 changes: 30 additions & 0 deletions kernel/src/main/java/io/delta/core/expressions/BinaryOperator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.delta.core.expressions;

/**
 * A {@link BinaryExpression} written in infix form: its string representation is
 * {@code (x symbol y)} rather than {@code funcName(x, y)}.
 * <p>
 * The data types of the two operands must be equivalent; the constructor enforces this.
 */
public abstract class BinaryOperator extends BinaryExpression {
    /** The infix symbol placed between the operands by {@link #toString()}. */
    protected final String symbol;

    /**
     * @param left   the left operand
     * @param right  the right operand
     * @param symbol the infix symbol used in the string representation
     * @throws IllegalArgumentException if the left operand's data type is not equivalent to the
     *                                  right operand's data type
     */
    protected BinaryOperator(Expression left, Expression right, String symbol) {
        super(left, right);
        this.symbol = symbol;

        final boolean typesMatch = left.dataType().equivalent(right.dataType());
        if (!typesMatch) {
            final String message = String.format(
                "BinaryOperator left and right DataTypes must be the same. Found %s and %s.",
                left.dataType().typeName(),
                right.dataType().typeName());
            throw new IllegalArgumentException(message);
        }
    }

    /** @return the operands and symbol rendered as {@code (left symbol right)} */
    @Override
    public String toString() {
        return "(" + left.toString() + " " + symbol + " " + right.toString() + ")";
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package io.delta.core.expressions;

import java.util.Comparator;

import io.delta.core.types.*;

/**
 * A {@link Comparator} over {@code Object}s that casts both arguments to {@code T} and compares
 * them by natural ordering.
 * <p>
 * The casts are unchecked; callers are expected to pass values whose runtime type matches the
 * data type the comparator was created for.
 *
 * @param <T> the comparable type both arguments are cast to
 */
public class CastingComparator<T extends Comparable<T>> implements Comparator<Object> {

    /** Natural-order comparator that the cast arguments are delegated to. */
    private final Comparator<T> comparator = Comparator.naturalOrder();

    /**
     * Returns a comparator for values of the given data type.
     *
     * @param dataType the data type of the values to compare; integer, boolean, long and string
     *                 types are supported
     * @return a comparator for that type
     * @throws IllegalArgumentException if {@code dataType} is none of the supported types
     */
    public static Comparator<Object> forDataType(DataType dataType) {
        if (dataType instanceof IntegerType) {
            return new CastingComparator<Integer>();
        } else if (dataType instanceof BooleanType) {
            return new CastingComparator<Boolean>();
        } else if (dataType instanceof LongType) {
            return new CastingComparator<Long>();
        } else if (dataType instanceof StringType) {
            return new CastingComparator<String>();
        }

        final String message = String.format("Unsupported DataType: %s", dataType.typeName());
        throw new IllegalArgumentException(message);
    }

    /** Creates a comparator that uses the natural ordering of {@code T}. */
    public CastingComparator() {
    }

    /**
     * Casts both arguments to {@code T} and compares them by natural ordering.
     *
     * @param a the first value
     * @param b the second value
     * @return the natural-order comparison of {@code a} and {@code b}
     */
    @SuppressWarnings("unchecked")
    @Override
    public int compare(Object a, Object b) {
        final T first = (T) a;
        final T second = (T) b;
        return comparator.compare(first, second);
    }
}
92 changes: 92 additions & 0 deletions kernel/src/main/java/io/delta/core/expressions/Column.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package io.delta.core.expressions;

import java.util.Collections;
import java.util.Objects;
import java.util.Set;

import io.delta.core.data.Row;
import io.delta.core.types.*;

/**
 * A column reference whose value is read from a {@link io.delta.core.data.Row} at a fixed
 * ordinal.
 * <p>
 * It is recommended that you instantiate using an existing table schema
 * {@link io.delta.core.types.StructType} with {@link StructType#column(int)}.
 * <p>
 * Only integer, boolean, long and string data types are accepted; see
 * <a href="https://github.com/delta-io/delta/blob/master/PROTOCOL.md#primitive-types">Delta Transaction Log Protocol: Primitive Types</a>.
 */
public final class Column extends LeafExpression {
    private final int ordinal;
    private final String name;
    private final DataType dataType;
    /** Typed accessor for this column, chosen once from {@code dataType}. */
    private final RowEvaluator evaluator;

    /**
     * @param ordinal  the position of this column in the rows it will be evaluated against
     * @param name     the column name
     * @param dataType the column's data type
     * @throws UnsupportedOperationException if {@code dataType} is not one of the supported
     *                                       types
     */
    public Column(int ordinal, String name, DataType dataType) {
        this.ordinal = ordinal;
        this.name = name;
        this.dataType = dataType;
        this.evaluator = evaluatorFor(ordinal, name, dataType);
    }

    /** Picks the typed {@link Row} getter matching {@code dataType}, or throws if unsupported. */
    private static RowEvaluator evaluatorFor(int ordinal, String name, DataType dataType) {
        if (dataType instanceof IntegerType) {
            return row -> row.getInt(ordinal);
        }
        if (dataType instanceof BooleanType) {
            return row -> row.getBoolean(ordinal);
        }
        if (dataType instanceof LongType) {
            return row -> row.getLong(ordinal);
        }
        if (dataType instanceof StringType) {
            return row -> row.getString(ordinal);
        }

        throw new UnsupportedOperationException(
            String.format(
                "The data type %s of column %s at ordinal %s is not supported",
                dataType.typeName(),
                name,
                ordinal)
        );
    }

    /** @return the column name */
    public String name() {
        return name;
    }

    /**
     * Reads this column's value from the given row.
     *
     * @param row the row to read from
     * @return {@code null} if the row reports null at this column's ordinal; otherwise the value
     *         read with the getter matching this column's data type
     */
    @Override
    public Object eval(Row row) {
        if (row.isNullAt(ordinal)) {
            return null;
        }
        return evaluator.nullSafeEval(row);
    }

    /** @return the column's data type */
    @Override
    public DataType dataType() {
        return dataType;
    }

    @Override
    public String toString() {
        return "Column(" + name + ")";
    }

    /** @return a singleton set containing this column's name */
    @Override
    public Set<String> references() {
        return Collections.singleton(name);
    }

    /** Columns are equal when their ordinal, name and data type are all equal. */
    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null || o.getClass() != getClass()) {
            return false;
        }

        final Column other = (Column) o;
        return ordinal == other.ordinal
            && Objects.equals(name, other.name)
            && Objects.equals(dataType, other.dataType);
    }

    /**
     * Hash of the name and data type. The ordinal is not part of the hash; equal columns still
     * produce equal hashes because equality also requires an equal name and data type.
     */
    @Override
    public int hashCode() {
        return Objects.hash(name, dataType);
    }

    /** Reads a value from a row that is known to be non-null at this column's ordinal. */
    @FunctionalInterface
    private interface RowEvaluator {
        Object nullSafeEval(Row row);
    }
}
17 changes: 17 additions & 0 deletions kernel/src/main/java/io/delta/core/expressions/EqualTo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.delta.core.expressions;

/**
 * Equality predicate: {@code new EqualTo(expr1, expr2)} evaluates {@code expr1} = {@code expr2}.
 */
public final class EqualTo extends BinaryComparison implements Predicate {

    /**
     * @param left  the left operand
     * @param right the right operand
     */
    public EqualTo(Expression left, Expression right) {
        super(left, right, "=");
    }

    /**
     * @param leftResult  the evaluated left operand
     * @param rightResult the evaluated right operand
     * @return {@code true} when the two results compare as equal, {@code false} otherwise
     */
    @Override
    protected Object nullSafeEval(Object leftResult, Object rightResult) {
        final boolean isEqual = compare(leftResult, rightResult) == 0;
        return isEqual;
    }
}

Loading