Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ESQL: mv_expand pushes down limit and project and keep the limit after it untouched #100782

Merged
merged 10 commits into from
Oct 24, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,100 @@ sum_a:long | b:integer
12555000 | 29
12555000 | 30
;

expandAfterSort
from employees | keep job_positions, emp_no | sort emp_no | mv_expand job_positions | limit 10;

job_positions:keyword |emp_no:integer
Accountant |10001
Senior Python Developer |10001
Senior Team Lead |10002
null |10003
Support Engineer |10004
Head Human Resources |10004
Reporting Analyst |10004
Tech Lead |10004
null |10005
Principal Support Engineer|10006
;

expandWithMultiSort
from employees | keep emp_no, job_positions | mv_expand job_positions | limit 10 | where emp_no <= 10006 | sort job_positions nulls first;

emp_no:integer | job_positions:keyword
10003 |null
10005 |null
10001 |Accountant
10004 |Head Human Resources
10006 |Principal Support Engineer
10004 |Reporting Analyst
10001 |Senior Python Developer
10002 |Senior Team Lead
10004 |Support Engineer
10004 |Tech Lead
;

filterMvExpanded
from employees | keep emp_no, job_positions | mv_expand job_positions | where job_positions like "A*" | sort job_positions, emp_no;

emp_no:integer | job_positions:keyword
10001 |Accountant
10012 |Accountant
10016 |Accountant
10023 |Accountant
10025 |Accountant
10028 |Accountant
10034 |Accountant
10037 |Accountant
10044 |Accountant
10045 |Accountant
10050 |Accountant
10051 |Accountant
10066 |Accountant
10081 |Accountant
10085 |Accountant
10089 |Accountant
10092 |Accountant
10094 |Accountant
10010 |Architect
10011 |Architect
10031 |Architect
10032 |Architect
10042 |Architect
10047 |Architect
10059 |Architect
10068 |Architect
10072 |Architect
10076 |Architect
10078 |Architect
10096 |Architect
10098 |Architect
;

doubleLimit_expandLimitLowerThanAvailable
from employees | where emp_no == 10004 | limit 1 | keep emp_no, job_positions | mv_expand job_positions | limit 2;

emp_no:integer | job_positions:keyword
10004 |Head Human Resources
10004 |Reporting Analyst
;

doubleLimit_expandLimitGreaterThanAvailable
from employees | where emp_no == 10004 | limit 1 | keep emp_no, job_positions | mv_expand job_positions | limit 5;

emp_no:integer | job_positions:keyword
10004 |Head Human Resources
10004 |Reporting Analyst
10004 |Support Engineer
10004 |Tech Lead
;

doubleLimitWithSort
from employees | where emp_no == 10004 | limit 1 | keep emp_no, job_positions | mv_expand job_positions | limit 5 | sort job_positions desc;

emp_no:integer | job_positions:keyword
10004 |Tech Lead
10004 |Support Engineer
10004 |Reporting Analyst
10004 |Head Human Resources
;
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.Grok;
import org.elasticsearch.xpack.esql.plan.logical.MvExpand;
import org.elasticsearch.xpack.esql.plan.logical.TopN;
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
import org.elasticsearch.xpack.esql.plan.physical.DissectExec;
Expand Down Expand Up @@ -258,6 +259,7 @@ public static List<PlanNameRegistry.Entry> namedTypeEntries() {
of(LogicalPlan.class, Filter.class, PlanNamedTypes::writeFilter, PlanNamedTypes::readFilter),
of(LogicalPlan.class, Grok.class, PlanNamedTypes::writeGrok, PlanNamedTypes::readGrok),
of(LogicalPlan.class, Limit.class, PlanNamedTypes::writeLimit, PlanNamedTypes::readLimit),
of(LogicalPlan.class, MvExpand.class, PlanNamedTypes::writeMvExpand, PlanNamedTypes::readMvExpand),
of(LogicalPlan.class, OrderBy.class, PlanNamedTypes::writeOrderBy, PlanNamedTypes::readOrderBy),
of(LogicalPlan.class, Project.class, PlanNamedTypes::writeProject, PlanNamedTypes::readProject),
of(LogicalPlan.class, TopN.class, PlanNamedTypes::writeTopN, PlanNamedTypes::readTopN),
Expand Down Expand Up @@ -763,6 +765,16 @@ static void writeLimit(PlanStreamOutput out, Limit limit) throws IOException {
out.writeLogicalPlanNode(limit.child());
}

static MvExpand readMvExpand(PlanStreamInput in) throws IOException {
return new MvExpand(in.readSource(), in.readLogicalPlanNode(), in.readNamedExpression());
}

static void writeMvExpand(PlanStreamOutput out, MvExpand mvExpand) throws IOException {
out.writeNoSource();
out.writeLogicalPlanNode(mvExpand.child());
out.writeNamedExpression(mvExpand.target());
}

static OrderBy readOrderBy(PlanStreamInput in) throws IOException {
return new OrderBy(
in.readSource(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.In;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.MvExpand;
import org.elasticsearch.xpack.esql.plan.logical.RegexExtract;
import org.elasticsearch.xpack.esql.plan.logical.TopN;
import org.elasticsearch.xpack.esql.plan.logical.local.EsqlProject;
Expand Down Expand Up @@ -126,6 +127,7 @@ protected static List<Batch<LogicalPlan>> rules() {
new PushDownEval(),
new PushDownRegexExtract(),
new PushDownEnrich(),
new PushDownMvExpand(),
new PushDownAndCombineOrderBy(),
new PruneOrderByBeforeStats(),
new PruneRedundantSortClauses()
Expand Down Expand Up @@ -383,7 +385,8 @@ protected LogicalPlan rule(Limit limit) {
// check if there's a 'visible' descendant limit lower than the current one
// and if so, align the current limit since it adds no value
// this applies for cases such as | limit 1 | sort field | limit 10
else {
// but NOT for mv_expand (ie | limit 1 | mv_expand x | limit 20) where we want that last "limit" to apply on expand results
else if (unary instanceof MvExpand == false) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't combine the limits for mv_expand.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this works only if the limit is right above mv_expand - if it's hidden by another node (keep, where, etc..) the rule won't see it.
Move the check into descendantLimit so that if mv_expand is encountered, just like with an agg, no limit is returned.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Haven't thought of this scenario.

Limit descendantLimit = descendantLimit(unary);
if (descendantLimit != null) {
var l1 = (int) limit.limit().fold();
Expand Down Expand Up @@ -653,8 +656,22 @@ protected LogicalPlan rule(Enrich re) {
}
}

protected static class PushDownAndCombineOrderBy extends OptimizerRules.OptimizerRule<OrderBy> {
protected static class PushDownMvExpand extends OptimizerRules.OptimizerRule<MvExpand> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the advantage of pushing down MvExpand? I'd argue we want the opposite as this increases the amount of data earlier in the pipeline.

@Override
protected LogicalPlan rule(MvExpand mve) {
LogicalPlan child = mve.child();

if (child instanceof OrderBy orderBy) {
return orderBy.replaceChild(mve.replaceChild(orderBy.child()));
} else if (child instanceof Project) {
return pushDownPastProject(mve);
}

return mve;
}
}

protected static class PushDownAndCombineOrderBy extends OptimizerRules.OptimizerRule<OrderBy> {
@Override
protected LogicalPlan rule(OrderBy orderBy) {
LogicalPlan child = orderBy.child();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.Grok;
import org.elasticsearch.xpack.esql.plan.logical.MvExpand;
import org.elasticsearch.xpack.esql.plan.logical.TopN;
import org.elasticsearch.xpack.esql.plan.logical.local.EsqlProject;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
Expand Down Expand Up @@ -888,6 +889,50 @@ public void testCombineOrderByThroughFilter() {
as(filter.child(), EsRelation.class);
}

public void testCombineOrderByThroughMvExpand() {
LogicalPlan plan = optimizedPlan("""
from test
| sort emp_no
| mv_expand first_name
| sort first_name""");

var topN = as(plan, TopN.class);
assertThat(orderNames(topN), contains("first_name", "emp_no"));
var mvExpand = as(topN.child(), MvExpand.class);
as(mvExpand.child(), EsRelation.class);
}

public void testPushDownMvExpandPastProject() {
LogicalPlan plan = optimizedPlan("""
from test
| rename first_name as x
| keep x
| mv_expand x
""");

var keep = as(plan, Project.class);
var limit = as(keep.child(), Limit.class);
var mvExpand = as(limit.child(), MvExpand.class);
assertThat(as(mvExpand.target(), FieldAttribute.class).name(), is("first_name"));
}

public void testDontPushDownLimitPastMvExpand() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add as javadoc the expanded plans.

LogicalPlan plan = optimizedPlan("""
from test
| limit 1
| keep first_name, last_name
| mv_expand first_name
| limit 10""");

var project = as(plan, Project.class);
var limit = as(project.child(), Limit.class);
assertThat(limit.limit().fold(), equalTo(10));
var mvExpand = as(limit.child(), MvExpand.class);
limit = as(mvExpand.child(), Limit.class);
assertThat(limit.limit().fold(), equalTo(1));
as(limit.child(), EsRelation.class);
}

private static List<String> orderNames(TopN topN) {
return topN.order().stream().map(o -> as(o.child(), NamedExpression.class).name()).toList();
}
Expand Down