ESQL: mv_expand pushes down limit and project and keep the limit after it untouched #100782
Conversation
- properly accept a limit after mv_expand when there is also a second limit before the mv_expand
Pinging @elastic/es-ql (Team:QL)
Hi @astefan, I've created a changelog YAML for you.
Pinging @elastic/elasticsearch-esql (:Query Languages/ES|QL)
@@ -383,7 +385,8 @@ protected LogicalPlan rule(Limit limit) {
// check if there's a 'visible' descendant limit lower than the current one
// and if so, align the current limit since it adds no value
// this applies for cases such as | limit 1 | sort field | limit 10
else {
// but NOT for mv_expand (ie | limit 1 | mv_expand x | limit 20) where we want that last "limit" to apply on expand results
else if (unary instanceof MvExpand == false) {
Don't combine the limits for mv_expand.
This works only if the limit is right above mv_expand; if it's hidden by another node (keep, where, etc.) the rule won't see it.
Move the check into descendantLimit so that, if mv_expand is encountered, no limit is returned, just like with an agg.
Good point. Haven't thought of this scenario.
The current approach can be improved and made more reliable - see my comments.
public void testCombineOrderByThroughMvExpand() {
    LogicalPlan plan = optimizedPlan("""
        from test
        | sort emp_no
        | mv_expand first_name
        | sort first_name""");

    var topN = as(plan, TopN.class);
    assertThat(orderNames(topN), contains("first_name", "emp_no"));
    var mvExpand = as(topN.child(), MvExpand.class);
    as(mvExpand.child(), EsRelation.class);
}
public void testPushDownMvExpandPastProject() {
    LogicalPlan plan = optimizedPlan("""
        from test
        | rename first_name as x
        | keep x
        | mv_expand x
        """);

    var keep = as(plan, Project.class);
    var limit = as(keep.child(), Limit.class);
    var mvExpand = as(limit.child(), MvExpand.class);
    assertThat(as(mvExpand.target(), FieldAttribute.class).name(), is("first_name"));
}
public void testDontPushDownLimitPastMvExpand() {
Add as javadoc the expanded plans.
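A possible shape for such a javadoc, sketched with a hypothetical query and plan (the actual test body is not shown in this diff):

/**
 * Expects a plan along these lines for a query such as
 *   from test | mv_expand first_name | limit 5
 * where the limit after mv_expand is kept in place and only a copy of it is pushed below the expand:
 *   Limit[5]
 *   \_MvExpand[first_name]
 *     \_Limit[5]
 *       \_EsRelation[test]
 */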
@@ -653,8 +656,22 @@ protected LogicalPlan rule(Enrich re) {
    }
}

protected static class PushDownAndCombineOrderBy extends OptimizerRules.OptimizerRule<OrderBy> {
protected static class PushDownMvExpand extends OptimizerRules.OptimizerRule<MvExpand> {
What's the advantage of pushing down MvExpand? I'd argue we want the opposite as this increases the amount of data earlier in the pipeline.
Let me add a couple of comments, because I think the situation here is a bit more convoluted:
- pushing down mv_expand is a very peculiar operation: sort a, b | mv_expand c could be rewritten as mv_expand c | sort a, b (they have the same semantics), but this doesn't apply to sort a, b | mv_expand a (if a is expanded before the sort, the final result will be different)
- pushing down limit vs mv_expand can still be done (and is necessary in some cases), but with a slightly different logic: mv_expand a | limit 10 is different from limit 10 | mv_expand a, but is equivalent to limit 10 | mv_expand a | limit 10. We will need this kind of logic especially when we have sort, eg. for sort a | mv_expand b | limit 10, if we don't push down limit, we won't be able to build a TopN.
- if we apply the rule above (ie. push down the limit, but also maintain the original limit at the end) we have to be careful, the plan optimization could enter an infinite loop (a possible guard is sketched after this list), eg.
  sort ... | keep ... | mv_expand a | limit 10
  sort ... | keep ... | limit 10 | mv_expand a | limit 10
  sort ... | limit 10 | keep ... | mv_expand a | limit 10
  topN(10) | keep ... | mv_expand a | limit 10
  topN(10) | keep ... | limit 10 | mv_expand a | limit 10
  topN(10) | limit 10 | keep ... | mv_expand a | limit 10
  and so on
- mv_expand is a pretty heavy command (mostly in terms of memory consumption), so having a limit after it could be a sub-optimal solution. It could be much better to enhance MvExpandOperator with an internal limit, like for TopN
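A minimal sketch of such a guard, assuming the usual QL logical plan node types (UnaryPlan, Limit, MvExpand, Aggregate); the helper name and the descent logic are illustrative, not necessarily what the PR ends up with:

// walk down from mv_expand looking for a limit; an aggregate acts as a hard boundary
private static Limit limitBeforeMvExpand(MvExpand mvExpand) {
    LogicalPlan current = mvExpand.child();
    while (current instanceof UnaryPlan unary && current instanceof Aggregate == false) {
        if (current instanceof Limit lim) {
            return lim;   // a limit already exists below mv_expand: do not push another copy down
        }
        current = unary.child();
    }
    return null;          // no limit below mv_expand yet: a single copy can safely be pushed down
}

With this check, the duplicate limit is introduced at most once, so the rewrite chain above reaches a fixed point instead of looping.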
Thank you very much @astefan, I think we are much closer to a solution, but apparently there are still some cases that are not covered.
I managed to break the planning (unknown physical plan node [OrderExec]) with the following:
row a = 1 | sort a | mv_expand a | eval b = 100 | sort b | limit 10
I think the problem here is that MV_EXPAND between the two SORTs cuts out some optimization rules, so that the first sort (that is practically useless) does not get removed.
@luigidellaquila thank you for this scenario. I didn't think of testing such a query.
Left a small round of comments - a potential problem with the current approach is looking at things top-down vs bottom-up; the latter might simplify the rule.
while (plan instanceof Aggregate == false) {
    if (plan instanceof Limit limit) {
        return limit;
    } else if (plan instanceof MvExpand) {
Why not apply the same behavior as for Aggregate and stop searching for a limit once an MvExpand is found?
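In other words, a sketch of the suggestion (reusing the descendant-walking loop from the diff above; the descent step is assumed, the real helper may differ): MvExpand would act as a barrier in the loop condition, just like Aggregate.

// stop the descent at either an aggregate or an mv_expand; both mean no 'visible' descendant limit exists
while (plan instanceof Aggregate == false && plan instanceof MvExpand == false) {
    if (plan instanceof Limit lim) {
        return lim;
    }
    plan = ((UnaryPlan) plan).child();   // hypothetical descent step
}
return null;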
else if (unary instanceof MvExpand || (unary instanceof OrderBy orderBy && orderBy.child() instanceof MvExpand)) {
    MvExpand mvExpand = unary instanceof MvExpand mve ? mve : (MvExpand) (unary.child());
    Limit limitBeforeMvExpand = limitBeforeMvExpand(mvExpand);
    // if there is no "appropriate" limit before mv_expand, then push down a copy of the one after it so that:
    // - a possible TopN is properly built as low as possible in the tree (close to Lucene)
    // - the input of mv_expand is as small as possible before it is expanded (fewer rows to inflate and occupy memory)
    if (limitBeforeMvExpand == null) {
        var duplicateLimit = new Limit(limit.source(), limit.limit(), mvExpand.child());
        if (unary instanceof OrderBy orderBy) {
            return limit.replaceChild(orderBy.replaceChild(mvExpand.replaceChild(duplicateLimit)));
        } else {
            return limit.replaceChild(mvExpand.replaceChild(duplicateLimit));
        }
    }
}
This block needs to be simplified as the main condition is checked 2-3 times.
Do a simple boolean isMvExpand = unary instanceof MvExpand, boolean orderByChild = unary instanceof OrderBy && ..., then use those later in the code.
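A sketch of what that simplification could look like (illustrative only; variable names and the final shape of the PR code may differ):

boolean isMvExpand = unary instanceof MvExpand;
boolean orderByOverMvExpand = unary instanceof OrderBy ob && ob.child() instanceof MvExpand;
if (isMvExpand || orderByOverMvExpand) {
    MvExpand mvExpand = isMvExpand ? (MvExpand) unary : (MvExpand) unary.child();
    // only push a copy of the limit below mv_expand if no limit exists down there already
    if (limitBeforeMvExpand(mvExpand) == null) {
        var duplicateLimit = new Limit(limit.source(), limit.limit(), mvExpand.child());
        LogicalPlan expanded = mvExpand.replaceChild(duplicateLimit);
        return limit.replaceChild(orderByOverMvExpand ? unary.replaceChild(expanded) : expanded);
    }
}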
Add a default TopN for cases when there is only a sort at Lucene level
I have another solution for how we handle mv_expand that is also adapted to further scenarios discovered in the meantime. But it has some bigger implications, non-ideal ones. This change is not final, but I am providing these new points below for awareness and feedback.
LGTM.
The comments below can be done in a separate PR as they are mainly cosmetic and do NOT impact the 8.11 code.
The issue this ticket works on deals with generators, that is, operators that create additional rows.
We only have them as a Source, and thus no mechanism for them per se, which makes this issue somewhat out of scope (one approach is to let the generator decide internally whether it can impose a limit or not).
My main comment is around moving the MvExpand handling from the CombineAndPushDownLimit into a separate rule; it shares little code with that rule and also complicates the logic.
Secondly, avoid making the Optimizer context aware and instead pick up the default limit from the plan (potentially through a separate path). If the limit was somehow optimized, try to incorporate that.
Again, this is more of a tweak and can be done separately. On the flip side, it would reduce the size of this PR.
FTR, I've raised #101248 to get away from the weird gradle compilation errors in the CI (and I was able to reproduce). I've cherry-picked the changes, squashed them and rebased them on main.
Add OrderBy node type to the exceptions for duplicating the limit after mv_expand
Thanks @costin. I've found why the CI was complaining; as you guessed, it was a faulty merge.
I've extracted that specific logic into a separate rule.
I've created #101266 to track this.
💔 Backport failed
You can use sqren/backport to manually backport by running
… untouched (elastic#100782) - allow mv_expand to push down limit and project past it - accept a limit after mv_expand when there is also a second limit before the mv_expand - adds a default TopN for cases when there is only a sort at Lucene level - adds OrderBy node type to the exceptions for duplicating the limit after mv_expand (cherry picked from commit 4679b09)
… untouched (#100782) (#101268) - allow mv_expand to push down limit and project past it - accept a limit after mv_expand when there is also a second limit before the mv_expand - adds a default TopN for cases when there is only a sort at Lucene level - adds OrderBy node type to the exceptions for duplicating the limit after mv_expand (cherry picked from commit 4679b09)
Allow mv_expand to push down limit and project past it.
Have the limit before mv_expand not override the limit after it, since mv_expand is special from this point of view, in the sense that it creates more rows than the original ones as defined by the from command.
Fixes #99971
Fixes #100774
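For illustration, a test in the style of the optimizer tests above could capture this; the query and the exact optimized plan shape below are assumptions, the point being that the two limits are not folded into the smaller one:

public void testLimitsAroundMvExpandAreKeptSeparate() {
    LogicalPlan plan = optimizedPlan("""
        from test
        | limit 1
        | mv_expand first_name
        | limit 20""");

    var limitAfter = as(plan, Limit.class);                 // limit 20, applied to the expanded rows
    var mvExpand = as(limitAfter.child(), MvExpand.class);
    var limitBefore = as(mvExpand.child(), Limit.class);    // limit 1, applied to the rows coming from the source
    as(limitBefore.child(), EsRelation.class);
}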