Skip to content

Commit 6e3329f

Browse files
authored
Prevent aggregation push down when it has inner filter (#4002)
* Prevent aggregation push down when it has inner filter Signed-off-by: Heng Qian <qianheng@amazon.com> * Fix IT & Remove log Signed-off-by: Heng Qian <qianheng@amazon.com> * Fix 4009 Signed-off-by: Heng Qian <qianheng@amazon.com> * Fix IT Signed-off-by: Heng Qian <qianheng@amazon.com> --------- Signed-off-by: Heng Qian <qianheng@amazon.com>
1 parent 569bf94 commit 6e3329f

File tree

9 files changed

+159
-12
lines changed

9 files changed

+159
-12
lines changed

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,17 @@ public void testExplainWithReverse() throws IOException {
134134
assertTrue(result.contains("dir0=[DESC]"));
135135
}
136136

137+
@Test
138+
public void noPushDownForAggOnWindow() throws IOException {
139+
Assume.assumeTrue("This test is only for push down enabled", isPushdownEnabled());
140+
String query =
141+
"source=opensearch-sql_test_index_account | patterns address method=BRAIN | stats count()"
142+
+ " by patterns_field";
143+
var result = explainQueryToString(query);
144+
String expected = loadFromFile("expectedOutput/calcite/explain_agg_on_window.json");
145+
assertJsonEqualsIgnoreId(expected, result);
146+
}
147+
137148
/**
138149
* Executes the PPL query and returns the result as a string with windows-style line breaks
139150
* replaced with Unix-style ones.
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"calcite": {
3+
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(count()=[$1], patterns_field=[$0])\n LogicalAggregate(group=[{0}], count()=[COUNT()])\n LogicalProject(patterns_field=[SAFE_CAST(ITEM(PATTERN_PARSER($2, pattern($2, 10, 100000) OVER ()), 'pattern'))])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
4+
"physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..1=[{inputs}], count()=[$t1], patterns_field=[$t0])\n EnumerableAggregate(group=[{0}], count()=[COUNT()])\n EnumerableCalc(expr#0..1=[{inputs}], expr#2=[PATTERN_PARSER($t0, $t1)], expr#3=['pattern'], expr#4=[ITEM($t2, $t3)], expr#5=[SAFE_CAST($t4)], patterns_field=[$t5])\n EnumerableWindow(window#0=[window(aggs [pattern($0, $1, $2)])])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[address]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"address\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
5+
}
6+
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"calcite": {
33
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(age2=[$2])\n LogicalFilter(condition=[<=($3, 1)])\n LogicalProject(avg_age=[$0], state=[$1], age2=[$2], _row_number_=[ROW_NUMBER() OVER (PARTITION BY $2 ORDER BY $2)])\n LogicalFilter(condition=[IS NOT NULL($2)])\n LogicalProject(avg_age=[$0], state=[$1], age2=[+($0, 2)])\n LogicalSort(sort0=[$1], dir0=[ASC-nulls-first])\n LogicalProject(avg_age=[$2], state=[$0], city=[$1])\n LogicalAggregate(group=[{0, 1}], avg_age=[AVG($2)])\n LogicalProject(state=[$7], city=[$5], age=[$8])\n LogicalFilter(condition=[>($8, 30)])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
4-
"physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..2=[{inputs}], expr#3=[1], expr#4=[<=($t2, $t3)], age2=[$t1], $condition=[$t4])\n EnumerableWindow(window#0=[window(partition {1} order by [1] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])\n EnumerableCalc(expr#0..2=[{inputs}], expr#3=[2], expr#4=[+($t2, $t3)], expr#5=[IS NOT NULL($t2)], state=[$t0], age2=[$t4], $condition=[$t5])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[city, state, age], FILTER->>($2, 30), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},avg_age=AVG($2)), SORT->[0 ASC FIRST]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"city\",\"state\",\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":10,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}},{\"city\":{\"terms\":{\"field\":\"city.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
4+
"physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..2=[{inputs}], expr#3=[1], expr#4=[<=($t2, $t3)], age2=[$t1], $condition=[$t4])\n EnumerableWindow(window#0=[window(partition {1} order by [1] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])\n EnumerableCalc(expr#0..2=[{inputs}], expr#3=[2], expr#4=[+($t2, $t3)], expr#5=[IS NOT NULL($t2)], state=[$t0], age2=[$t4], $condition=[$t5])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[city, state, age], FILTER->>($2, 30), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},avg_age=AVG($2)), SORT->[0 ASC FIRST]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"city\",\"state\",\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}},{\"city\":{\"terms\":{\"field\":\"city.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
55
}
66
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
setup:
2+
- do:
3+
query.settings:
4+
body:
5+
transient:
6+
plugins.calcite.enabled : true
7+
8+
- do:
9+
indices.create:
10+
index: test
11+
body:
12+
settings:
13+
number_of_shards: 1
14+
number_of_replicas: 0
15+
mappings:
16+
properties:
17+
body:
18+
type: text
19+
20+
---
21+
teardown:
22+
- do:
23+
query.settings:
24+
body:
25+
transient:
26+
plugins.calcite.enabled : false
27+
28+
---
29+
"Fix incorrectly push down aggregate with filter":
30+
- skip:
31+
features:
32+
- headers
33+
- allowed_warnings
34+
- do:
35+
bulk:
36+
index: test
37+
refresh: true
38+
body:
39+
- '{"index": {}}'
40+
- '{"body": "[2025-07-31T05:44:45.164Z] \"GET /api/data HTTP/1.1\" 200 - via_upstream - \"-\" 0 221 2 2 \"-\" \"python-requests/2.32.4\" \"80a2a234-bbb2-9bf3-bbc6-ba7554aee8b6\" \"frontend-proxy:8080\" \"172.18.0.25:8080\" frontend 172.18.0.27:46596 172.18.0.27:8080 172.18.0.26:53294 - -"}'
41+
42+
- do:
43+
allowed_warnings:
44+
- 'Loading the fielddata on the _id field is deprecated and will be removed in future versions. If you require sorting or aggregating on this field you should also include the id in the body of your documents, and map this field as a keyword field that has [doc_values] enabled'
45+
headers:
46+
Content-Type: 'application/json'
47+
ppl:
48+
body:
49+
query: source=test | parse body 'HTTP/1.1\" (?<httpstatus>\\d+)' | eval status2xx=if(httpstatus>='200' and httpstatus<'300', 1, 0), status3xx=if(httpstatus>='300' and httpstatus<'400', 1, 0), status4xx=if(httpstatus>='400' and httpstatus<'500', 1, 0), status5xx=if(httpstatus>='500' and httpstatus<'600', 1, 0), statusOther=if(httpstatus>='600', 1, 0) | stats count() as `Request Count`, sum(status2xx) as `HTTP 2xx`, sum(status3xx) as `HTTP 3xx`, sum(status4xx) as `HTTP 4xx`, sum(status5xx) as `HTTP 5xx`, sum(statusOther) as `Other`
50+
- match: {"total": 1}
51+
- match: {"schema": [{"name": "Request Count", "type": "bigint"}, {"name": "HTTP 2xx", "type": "bigint"}, {"name": "HTTP 3xx", "type": "bigint"}, {"name": "HTTP 4xx", "type": "bigint"}, {"name": "HTTP 5xx", "type": "bigint"}, {"name": "Other", "type": "bigint"}]}
52+
- match: {"datarows": [[1, 1, 0, 0, 0, 0]]}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
setup:
2+
- do:
3+
query.settings:
4+
body:
5+
transient:
6+
plugins.calcite.enabled : true
7+
8+
- do:
9+
indices.create:
10+
index: test
11+
body:
12+
settings:
13+
number_of_shards: 1
14+
number_of_replicas: 0
15+
mappings:
16+
properties:
17+
body:
18+
type: text
19+
20+
---
21+
teardown:
22+
- do:
23+
query.settings:
24+
body:
25+
transient:
26+
plugins.calcite.enabled : false
27+
28+
---
29+
"Fix bucket size missing":
30+
- skip:
31+
features:
32+
- headers
33+
- allowed_warnings
34+
- do:
35+
bulk:
36+
index: test
37+
refresh: true
38+
body:
39+
- '{"index": {}}'
40+
- '{ "title" : "document 1"}'
41+
- '{"index": {}}'
42+
- '{ "title" : "document 2"}'
43+
- '{"index": {}}'
44+
- '{ "title" : "document 3"}'
45+
- '{"index": {}}'
46+
- '{ "title" : "document 4"}'
47+
- '{"index": {}}'
48+
- '{ "title" : "document 5"}'
49+
- '{"index": {}}'
50+
- '{ "title" : "document 6"}'
51+
- '{"index": {}}'
52+
- '{ "title" : "document 7"}'
53+
- '{"index": {}}'
54+
- '{ "title" : "document 8"}'
55+
- '{"index": {}}'
56+
- '{ "title" : "document 9"}'
57+
- '{"index": {}}'
58+
- '{ "title" : "document 10"}'
59+
- '{"index": {}}'
60+
- '{ "title" : "document 11"}'
61+
62+
- do:
63+
allowed_warnings:
64+
- 'Loading the fielddata on the _id field is deprecated and will be removed in future versions. If you require sorting or aggregating on this field you should also include the id in the body of your documents, and map this field as a keyword field that has [doc_values] enabled'
65+
headers:
66+
Content-Type: 'application/json'
67+
ppl:
68+
body:
69+
query: source=test | stats count() by title | sort title
70+
- match: {"total": 11}

opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchAggregateIndexScanRule.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import java.util.function.Predicate;
88
import org.apache.calcite.plan.RelOptRuleCall;
99
import org.apache.calcite.plan.RelRule;
10+
import org.apache.calcite.rel.core.AggregateCall;
1011
import org.apache.calcite.rel.logical.LogicalAggregate;
1112
import org.apache.calcite.rel.logical.LogicalProject;
1213
import org.apache.calcite.sql.SqlKind;
@@ -64,10 +65,17 @@ public interface Config extends RelRule.Config {
6465
.withOperandSupplier(
6566
b0 ->
6667
b0.operand(LogicalAggregate.class)
68+
.predicate(
69+
agg ->
70+
// Cannot push down aggregation with inner filter
71+
agg.getAggCallList().stream().noneMatch(AggregateCall::hasFilter))
6772
.oneInput(
6873
b1 ->
6974
b1.operand(LogicalProject.class)
70-
.predicate(OpenSearchIndexScanRule::distinctProjectList)
75+
.predicate(
76+
// Don't push down aggregate on window function
77+
Predicate.not(OpenSearchIndexScanRule::containsRexOver)
78+
.and(OpenSearchIndexScanRule::distinctProjectList))
7179
.oneInput(
7280
b2 ->
7381
b2.operand(CalciteLogicalIndexScan.class)
@@ -92,7 +100,8 @@ public interface Config extends RelRule.Config {
92100
.allMatch(
93101
call ->
94102
call.getAggregation().kind == SqlKind.COUNT
95-
&& call.getArgList().isEmpty()))
103+
&& call.getArgList().isEmpty()
104+
&& !call.hasFilter()))
96105
.oneInput(
97106
b1 ->
98107
b1.operand(CalciteLogicalIndexScan.class)

opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchIndexScanRule.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.apache.calcite.rel.logical.LogicalProject;
1313
import org.apache.calcite.rel.logical.LogicalSort;
1414
import org.apache.calcite.rex.RexNode;
15+
import org.apache.calcite.rex.RexOver;
1516
import org.opensearch.sql.opensearch.storage.OpenSearchIndex;
1617
import org.opensearch.sql.opensearch.storage.scan.AbstractCalciteIndexScan;
1718

@@ -42,6 +43,10 @@ static boolean distinctProjectList(LogicalProject project) {
4243
return project.getProjects().stream().allMatch(rexSet::add);
4344
}
4445

46+
static boolean containsRexOver(LogicalProject project) {
47+
return project.getProjects().stream().anyMatch(RexOver::containsOver);
48+
}
49+
4550
/**
4651
* The LogicalSort is a LIMIT that should be pushed down when its fetch field is not null and its
4752
* collation is empty. For example: <code>sort name | head 5</code> should not be pushed down

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/AbstractCalciteIndexScan.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import static java.util.Objects.requireNonNull;
99
import static org.opensearch.sql.common.setting.Settings.Key.CALCITE_PUSHDOWN_ROWCOUNT_ESTIMATION_FACTOR;
10+
import static org.opensearch.sql.opensearch.request.AggregateAnalyzer.AGGREGATION_BUCKET_SIZE;
1011

1112
import java.util.ArrayDeque;
1213
import java.util.ArrayList;
@@ -293,8 +294,6 @@ public AbstractCalciteIndexScan pushDownSort(List<RelFieldCollation> collations)
293294
} catch (Exception e) {
294295
if (LOG.isDebugEnabled()) {
295296
LOG.debug("Cannot pushdown the sort {}", getCollationNames(collations), e);
296-
} else {
297-
LOG.info("Cannot pushdown the sort {}, ", getCollationNames(collations));
298297
}
299298
}
300299
return null;
@@ -389,7 +388,8 @@ public void pushDownSortIntoAggBucket(List<RelFieldCollation> collations) {
389388
Pair.of(
390389
Collections.singletonList(
391390
AggregationBuilders.composite("composite_buckets", newBuckets)
392-
.subAggregations(newAggBuilder)),
391+
.subAggregations(newAggBuilder)
392+
.size(AGGREGATION_BUCKET_SIZE)),
393393
aggregationBuilder.getRight());
394394
}
395395
}

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -142,8 +142,6 @@ public AbstractRelNode pushDownFilter(Filter filter) {
142142
} catch (Exception e) {
143143
if (LOG.isDebugEnabled()) {
144144
LOG.debug("Cannot pushdown the filter condition.", e);
145-
} else {
146-
LOG.info("Cannot pushdown the filter condition.");
147145
}
148146
}
149147
return null;
@@ -248,8 +246,6 @@ public CalciteLogicalIndexScan pushDownAggregate(Aggregate aggregate, Project pr
248246
} catch (Exception e) {
249247
if (LOG.isDebugEnabled()) {
250248
LOG.debug("Cannot pushdown the aggregate {}", aggregate, e);
251-
} else {
252-
LOG.info("Cannot pushdown the aggregate {}, ", aggregate);
253249
}
254250
}
255251
return null;
@@ -267,8 +263,6 @@ public CalciteLogicalIndexScan pushDownLimit(Integer limit, Integer offset) {
267263
} catch (Exception e) {
268264
if (LOG.isDebugEnabled()) {
269265
LOG.debug("Cannot pushdown limit {} with offset {}", limit, offset, e);
270-
} else {
271-
LOG.info("Cannot pushdown limit {} with offset {}", limit, offset);
272266
}
273267
}
274268
return null;

0 commit comments

Comments
 (0)