Skip to content

Commit

Permalink
[7.x] SQL: Pushdown WHERE clause inside subqueries (#71362) (#71398)
Browse files Browse the repository at this point in the history
Push down filters inside subqueries, even when dealing with aggregates.
The rule already existed however it was not being used inside SQL.
When dealing with Aggregates, keep the aggregate functions in place but
try and push down conjunctions on non-aggregates.
  • Loading branch information
costin authored Apr 7, 2021
1 parent 9d04e61 commit b4e19a2
Show file tree
Hide file tree
Showing 8 changed files with 241 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.elasticsearch.xpack.ql.expression.Order.NullsPosition;
import org.elasticsearch.xpack.ql.expression.Order.OrderDirection;
import org.elasticsearch.xpack.ql.expression.predicate.Predicates;
import org.elasticsearch.xpack.ql.expression.predicate.logical.And;
import org.elasticsearch.xpack.ql.expression.predicate.logical.Not;
import org.elasticsearch.xpack.ql.expression.predicate.logical.Or;
import org.elasticsearch.xpack.ql.expression.predicate.nulls.IsNotNull;
Expand All @@ -47,6 +46,7 @@
import org.elasticsearch.xpack.ql.optimizer.OptimizerRules.OptimizerRule;
import org.elasticsearch.xpack.ql.optimizer.OptimizerRules.PropagateEquals;
import org.elasticsearch.xpack.ql.optimizer.OptimizerRules.PruneLiteralsInOrderBy;
import org.elasticsearch.xpack.ql.optimizer.OptimizerRules.PushDownAndCombineFilters;
import org.elasticsearch.xpack.ql.optimizer.OptimizerRules.ReplaceSurrogateFunction;
import org.elasticsearch.xpack.ql.optimizer.OptimizerRules.SetAsOptimized;
import org.elasticsearch.xpack.ql.optimizer.OptimizerRules.SimplifyComparisonsArithmetics;
Expand Down Expand Up @@ -93,13 +93,14 @@ protected Iterable<RuleExecutor<LogicalPlan>.Batch> batches() {
new PropagateNullable(),
new CombineBinaryComparisons(),
new CombineDisjunctionsToIn(),
new PushDownAndCombineFilters(),
new SimplifyComparisonsArithmetics(DataTypes::areCompatible),
// prune/elimination
new PruneFilters(),
new PruneLiteralsInOrderBy(),
new PruneCast(),
new CombineLimits());
new CombineLimits(),
new PushDownAndCombineFilters()
);

Batch constraints = new Batch("Infer constraints", Limiter.ONCE,
new PropagateJoinKeyConstraints());
Expand Down Expand Up @@ -189,25 +190,6 @@ protected LogicalPlan rule(Filter filter) {
}
}

static class PushDownAndCombineFilters extends OptimizerRule<Filter> {

@Override
protected LogicalPlan rule(Filter filter) {
LogicalPlan child = filter.child();
LogicalPlan plan = filter;

if (child instanceof Filter) {
Filter f = (Filter) child;
plan = new Filter(f.source(), f.child(), new And(f.source(), f.condition(), filter.condition()));
} else if (child instanceof UnaryPlan) {
UnaryPlan up = (UnaryPlan) child;
plan = child.replaceChildrenSameSize(singletonList(new Filter(filter.source(), up.child(), filter.condition())));
}

return plan;
}
}

static class ReplaceRegexMatch extends org.elasticsearch.xpack.ql.optimizer.OptimizerRules.ReplaceRegexMatch {
@Override
protected Expression regexToEquals(RegexMatch<?> regexMatch, Literal literal) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import static org.elasticsearch.xpack.eql.EqlTestUtils.TEST_CFG;
import static org.elasticsearch.xpack.ql.TestUtils.UTC;
import static org.elasticsearch.xpack.ql.expression.Literal.TRUE;
import static org.elasticsearch.xpack.ql.optimizer.OptimizerRules.PushDownAndCombineFilters;
import static org.elasticsearch.xpack.ql.tree.Source.EMPTY;
import static org.elasticsearch.xpack.ql.type.DataTypes.INTEGER;

Expand Down Expand Up @@ -334,7 +335,7 @@ public void testCombineFilters() {
Filter filterChild = basicFilter(left);
Filter filterParent = new Filter(EMPTY, filterChild, right);

LogicalPlan result = new Optimizer.PushDownAndCombineFilters().apply(filterParent);
LogicalPlan result = new PushDownAndCombineFilters().apply(filterParent);

assertEquals(Filter.class, result.getClass());
Expression condition = ((Filter) result).condition();
Expand All @@ -359,7 +360,7 @@ public void testPushDownFilterUnary() {
OrderBy order = new OrderBy(EMPTY, rel(), emptyList());
Filter filter = new Filter(EMPTY, order, left);

LogicalPlan result = new Optimizer.PushDownAndCombineFilters().apply(filter);
LogicalPlan result = new PushDownAndCombineFilters().apply(filter);

assertEquals(OrderBy.class, result.getClass());
OrderBy o = (OrderBy) result;
Expand All @@ -386,7 +387,7 @@ public void testPushDownFilterDoesNotApplyOnNonUnary() {
Sequence s = sequence(rule1, rule2);
Filter filter = new Filter(EMPTY, s, left);

LogicalPlan result = new Optimizer.PushDownAndCombineFilters().apply(filter);
LogicalPlan result = new PushDownAndCombineFilters().apply(filter);

assertEquals(Filter.class, result.getClass());
Filter f = (Filter) result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.xpack.ql.expression.Nullability;
import org.elasticsearch.xpack.ql.expression.Order;
import org.elasticsearch.xpack.ql.expression.function.Function;
import org.elasticsearch.xpack.ql.expression.function.Functions;
import org.elasticsearch.xpack.ql.expression.function.scalar.SurrogateFunction;
import org.elasticsearch.xpack.ql.expression.predicate.BinaryOperator;
import org.elasticsearch.xpack.ql.expression.predicate.BinaryPredicate;
Expand Down Expand Up @@ -40,10 +41,12 @@
import org.elasticsearch.xpack.ql.expression.predicate.operator.comparison.NullEquals;
import org.elasticsearch.xpack.ql.expression.predicate.regex.RegexMatch;
import org.elasticsearch.xpack.ql.expression.predicate.regex.StringPattern;
import org.elasticsearch.xpack.ql.plan.logical.Aggregate;
import org.elasticsearch.xpack.ql.plan.logical.Filter;
import org.elasticsearch.xpack.ql.plan.logical.Limit;
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.ql.plan.logical.OrderBy;
import org.elasticsearch.xpack.ql.plan.logical.UnaryPlan;
import org.elasticsearch.xpack.ql.rule.Rule;
import org.elasticsearch.xpack.ql.type.DataType;
import org.elasticsearch.xpack.ql.type.DataTypes;
Expand All @@ -63,6 +66,7 @@

import static java.lang.Math.signum;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static org.elasticsearch.xpack.ql.expression.Literal.FALSE;
import static org.elasticsearch.xpack.ql.expression.Literal.TRUE;
import static org.elasticsearch.xpack.ql.expression.predicate.Predicates.combineAnd;
Expand Down Expand Up @@ -1139,6 +1143,51 @@ protected In createIn(Expression key, List<Expression> values, ZoneId zoneId) {
}
}

public static class PushDownAndCombineFilters extends OptimizerRule<Filter> {

@Override
protected LogicalPlan rule(Filter filter) {
LogicalPlan plan = filter;
LogicalPlan child = filter.child();
Expression condition = filter.condition();

if (child instanceof Filter) {
Filter f = (Filter) child;
plan = f.with(new And(f.source(), f.condition(), condition));
}
// as it stands, all other unary plans should allow filters to be pushed down
else if (child instanceof UnaryPlan) {
UnaryPlan unary = (UnaryPlan) child;
// in case of aggregates, worry about filters that contain aggregations
if (unary instanceof Aggregate && condition.anyMatch(Functions::isAggregate)) {
Aggregate agg = (Aggregate) unary;
List<Expression> conjunctions = new ArrayList<>(splitAnd(condition));
List<Expression> inPlace = new ArrayList<>();
// extract all conjunctions containing aggregates
for (Iterator<Expression> iterator = conjunctions.iterator(); iterator.hasNext();) {
Expression conjunction = iterator.next();
if (conjunction.anyMatch(Functions::isAggregate)) {
inPlace.add(conjunction);
iterator.remove();
}
}
// if at least one expression can be pushed down, update the tree
if (conjunctions.size() > 0) {
child = child.replaceChildrenSameSize(
singletonList(filter.with(unary.child(), Predicates.combineAnd(conjunctions)))
);
plan = filter.with(child, Predicates.combineAnd(inPlace));
}
} else {
// push down filter
plan = child.replaceChildrenSameSize(singletonList(filter.with(unary.child(), condition)));
}
}

return plan;
}
}

public static class ReplaceSurrogateFunction extends OptimizerExpressionRule {

public ReplaceSurrogateFunction() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.xpack.ql.expression.FieldAttribute;
import org.elasticsearch.xpack.ql.expression.Literal;
import org.elasticsearch.xpack.ql.expression.Nullability;
import org.elasticsearch.xpack.ql.expression.function.aggregate.Count;
import org.elasticsearch.xpack.ql.expression.predicate.BinaryOperator;
import org.elasticsearch.xpack.ql.expression.predicate.Predicates;
import org.elasticsearch.xpack.ql.expression.predicate.Range;
Expand Down Expand Up @@ -45,6 +46,10 @@
import org.elasticsearch.xpack.ql.optimizer.OptimizerRules.CombineBinaryComparisons;
import org.elasticsearch.xpack.ql.optimizer.OptimizerRules.ConstantFolding;
import org.elasticsearch.xpack.ql.optimizer.OptimizerRules.PropagateEquals;
import org.elasticsearch.xpack.ql.plan.logical.Aggregate;
import org.elasticsearch.xpack.ql.plan.logical.EsRelation;
import org.elasticsearch.xpack.ql.plan.logical.Filter;
import org.elasticsearch.xpack.ql.plan.logical.Project;
import org.elasticsearch.xpack.ql.tree.NodeInfo;
import org.elasticsearch.xpack.ql.tree.Source;
import org.elasticsearch.xpack.ql.type.DataType;
Expand All @@ -56,6 +61,7 @@
import java.util.List;

import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
import static org.elasticsearch.xpack.ql.TestUtils.equalsOf;
Expand All @@ -68,11 +74,13 @@
import static org.elasticsearch.xpack.ql.TestUtils.nullEqualsOf;
import static org.elasticsearch.xpack.ql.TestUtils.of;
import static org.elasticsearch.xpack.ql.TestUtils.rangeOf;
import static org.elasticsearch.xpack.ql.TestUtils.relation;
import static org.elasticsearch.xpack.ql.expression.Literal.FALSE;
import static org.elasticsearch.xpack.ql.expression.Literal.NULL;
import static org.elasticsearch.xpack.ql.expression.Literal.TRUE;
import static org.elasticsearch.xpack.ql.optimizer.OptimizerRules.CombineDisjunctionsToIn;
import static org.elasticsearch.xpack.ql.optimizer.OptimizerRules.PropagateNullable;
import static org.elasticsearch.xpack.ql.optimizer.OptimizerRules.PushDownAndCombineFilters;
import static org.elasticsearch.xpack.ql.optimizer.OptimizerRules.ReplaceRegexMatch;
import static org.elasticsearch.xpack.ql.tree.Source.EMPTY;
import static org.elasticsearch.xpack.ql.type.DataTypes.BOOLEAN;
Expand Down Expand Up @@ -1609,7 +1617,48 @@ public void testIsNullDisjunction() throws Exception {
assertEquals(and, new PropagateNullable().rule(and));
}

public void testSkipNull() throws Exception {
public void testCombineFilters() throws Exception {
EsRelation relation = relation();
GreaterThan conditionA = greaterThanOf(getFieldAttribute("a"), ONE);
LessThan conditionB = lessThanOf(getFieldAttribute("b"), TWO);

Filter fa = new Filter(EMPTY, relation, conditionA);
Filter fb = new Filter(EMPTY, fa, conditionB);

assertEquals(new Filter(EMPTY, relation, new And(EMPTY, conditionA, conditionB)), new PushDownAndCombineFilters().apply(fb));
}

public void testPushDownFilter() throws Exception {
EsRelation relation = relation();
GreaterThan conditionA = greaterThanOf(getFieldAttribute("a"), ONE);
LessThan conditionB = lessThanOf(getFieldAttribute("b"), TWO);

Filter fa = new Filter(EMPTY, relation, conditionA);
List<FieldAttribute> projections = singletonList(getFieldAttribute("b"));
Project project = new Project(EMPTY, fa, projections);
Filter fb = new Filter(EMPTY, project, conditionB);

Filter combinedFilter = new Filter(EMPTY, relation, new And(EMPTY, conditionA, conditionB));
assertEquals(new Project(EMPTY, combinedFilter, projections), new PushDownAndCombineFilters().apply(fb));
}

public void testPushDownFilterThroughAgg() throws Exception {
EsRelation relation = relation();
GreaterThan conditionA = greaterThanOf(getFieldAttribute("a"), ONE);
LessThan conditionB = lessThanOf(getFieldAttribute("b"), TWO);
GreaterThanOrEqual aggregateCondition = greaterThanOrEqualOf(new Count(EMPTY, ONE, false), THREE);

Filter fa = new Filter(EMPTY, relation, conditionA);
List<FieldAttribute> projections = singletonList(getFieldAttribute("b"));
// invalid aggregate but that's fine cause its properties are not used by this rule
Aggregate aggregate = new Aggregate(EMPTY, fa, emptyList(), emptyList());
Filter fb = new Filter(EMPTY, aggregate, new And(EMPTY, aggregateCondition, conditionB));

Filter combinedFilter = new Filter(EMPTY, relation, new And(EMPTY, conditionA, conditionB));

// expected
Filter expected = new Filter(EMPTY, new Aggregate(EMPTY, combinedFilter, emptyList(), emptyList()), aggregateCondition);
assertEquals(expected, new PushDownAndCombineFilters().apply(fb));

}
}
Loading

0 comments on commit b4e19a2

Please sign in to comment.