From 53a591be4a4a23bd80dfba6e71d2379637d00def Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Mon, 11 Aug 2025 18:30:10 +0000 Subject: [PATCH 1/2] Prevent aggregation push down when it has inner filter (#4002) * Prevent aggregation push down when it has inner filter Signed-off-by: Heng Qian * Fix IT & Remove log Signed-off-by: Heng Qian * Fix 4009 Signed-off-by: Heng Qian * Fix IT Signed-off-by: Heng Qian --------- Signed-off-by: Heng Qian (cherry picked from commit 6e3329faca3efd77a2f9746194664ccf8ea1df73) Signed-off-by: github-actions[bot] --- .../sql/calcite/remote/CalciteExplainIT.java | 11 +++ .../calcite/explain_agg_on_window.json | 6 ++ .../calcite/explain_output.json | 2 +- .../rest-api-spec/test/issues/3996.yml | 52 ++++++++++++++ .../rest-api-spec/test/issues/4009.yml | 70 +++++++++++++++++++ .../OpenSearchAggregateIndexScanRule.java | 13 +++- .../physical/OpenSearchIndexScanRule.java | 5 ++ .../scan/AbstractCalciteIndexScan.java | 6 +- .../storage/scan/CalciteLogicalIndexScan.java | 6 -- 9 files changed, 159 insertions(+), 12 deletions(-) create mode 100644 integ-test/src/test/resources/expectedOutput/calcite/explain_agg_on_window.json create mode 100644 integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/3996.yml create mode 100644 integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/4009.yml diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java index 2c9ba3107f7..92b30686294 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java @@ -134,6 +134,17 @@ public void testExplainWithReverse() throws IOException { assertTrue(result.contains("dir0=[DESC]")); } + @Test + public void noPushDownForAggOnWindow() throws IOException { + Assume.assumeTrue("This test is only for push down 
enabled", isPushdownEnabled()); + String query = + "source=opensearch-sql_test_index_account | patterns address method=BRAIN | stats count()" + + " by patterns_field"; + var result = explainQueryToString(query); + String expected = loadFromFile("expectedOutput/calcite/explain_agg_on_window.json"); + assertJsonEqualsIgnoreId(expected, result); + } + /** * Executes the PPL query and returns the result as a string with windows-style line breaks * replaced with Unix-style ones. diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_agg_on_window.json b/integ-test/src/test/resources/expectedOutput/calcite/explain_agg_on_window.json new file mode 100644 index 00000000000..e5c93f8aa41 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_agg_on_window.json @@ -0,0 +1,6 @@ +{ + "calcite": { + "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(count()=[$1], patterns_field=[$0])\n LogicalAggregate(group=[{0}], count()=[COUNT()])\n LogicalProject(patterns_field=[SAFE_CAST(ITEM(PATTERN_PARSER($2, pattern($2, 10, 100000) OVER ()), 'pattern'))])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n", + "physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..1=[{inputs}], count()=[$t1], patterns_field=[$t0])\n EnumerableAggregate(group=[{0}], count()=[COUNT()])\n EnumerableCalc(expr#0..1=[{inputs}], expr#2=[PATTERN_PARSER($t0, $t1)], expr#3=['pattern'], expr#4=[ITEM($t2, $t3)], expr#5=[SAFE_CAST($t4)], patterns_field=[$t5])\n EnumerableWindow(window#0=[window(aggs [pattern($0, $1, $2)])])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[address]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"address\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n" + } +} diff --git 
a/integ-test/src/test/resources/expectedOutput/calcite/explain_output.json b/integ-test/src/test/resources/expectedOutput/calcite/explain_output.json index 874d675de35..9474e4d1e31 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_output.json +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_output.json @@ -1,6 +1,6 @@ { "calcite": { "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(age2=[$2])\n LogicalFilter(condition=[<=($3, 1)])\n LogicalProject(avg_age=[$0], state=[$1], age2=[$2], _row_number_=[ROW_NUMBER() OVER (PARTITION BY $2 ORDER BY $2)])\n LogicalFilter(condition=[IS NOT NULL($2)])\n LogicalProject(avg_age=[$0], state=[$1], age2=[+($0, 2)])\n LogicalSort(sort0=[$1], dir0=[ASC-nulls-first])\n LogicalProject(avg_age=[$2], state=[$0], city=[$1])\n LogicalAggregate(group=[{0, 1}], avg_age=[AVG($2)])\n LogicalProject(state=[$7], city=[$5], age=[$8])\n LogicalFilter(condition=[>($8, 30)])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n", - "physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..2=[{inputs}], expr#3=[1], expr#4=[<=($t2, $t3)], age2=[$t1], $condition=[$t4])\n EnumerableWindow(window#0=[window(partition {1} order by [1] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])\n EnumerableCalc(expr#0..2=[{inputs}], expr#3=[2], expr#4=[+($t2, $t3)], expr#5=[IS NOT NULL($t2)], state=[$t0], age2=[$t4], $condition=[$t5])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[city, state, age], FILTER->>($2, 30), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},avg_age=AVG($2)), SORT->[0 ASC FIRST]], 
OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"city\",\"state\",\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":10,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}},{\"city\":{\"terms\":{\"field\":\"city.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n" + "physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..2=[{inputs}], expr#3=[1], expr#4=[<=($t2, $t3)], age2=[$t1], $condition=[$t4])\n EnumerableWindow(window#0=[window(partition {1} order by [1] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])\n EnumerableCalc(expr#0..2=[{inputs}], expr#3=[2], expr#4=[+($t2, $t3)], expr#5=[IS NOT NULL($t2)], state=[$t0], age2=[$t4], $condition=[$t5])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[city, state, age], FILTER->>($2, 30), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},avg_age=AVG($2)), SORT->[0 ASC FIRST]], 
OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"city\",\"state\",\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}},{\"city\":{\"terms\":{\"field\":\"city.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n" } } diff --git a/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/3996.yml b/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/3996.yml new file mode 100644 index 00000000000..0b2ac29bc42 --- /dev/null +++ b/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/3996.yml @@ -0,0 +1,52 @@ +setup: + - do: + query.settings: + body: + transient: + plugins.calcite.enabled : true + + - do: + indices.create: + index: test + body: + settings: + number_of_shards: 1 + number_of_replicas: 0 + mappings: + properties: + body: + type: text + +--- +teardown: + - do: + query.settings: + body: + transient: + plugins.calcite.enabled : false + +--- +"Fix incorrectly push down aggregate with filter": + - skip: + features: + - headers + - allowed_warnings + - do: + bulk: + index: test + refresh: true + body: + - '{"index": {}}' + - '{"body": "[2025-07-31T05:44:45.164Z] \"GET /api/data HTTP/1.1\" 200 - via_upstream - \"-\" 0 221 2 2 \"-\" \"python-requests/2.32.4\" \"80a2a234-bbb2-9bf3-bbc6-ba7554aee8b6\" \"frontend-proxy:8080\" \"172.18.0.25:8080\" frontend 172.18.0.27:46596 172.18.0.27:8080 172.18.0.26:53294 - -"}' + + - do: + allowed_warnings: + - 'Loading the fielddata on the _id field 
is deprecated and will be removed in future versions. If you require sorting or aggregating on this field you should also include the id in the body of your documents, and map this field as a keyword field that has [doc_values] enabled' + headers: + Content-Type: 'application/json' + ppl: + body: + query: source=test | parse body 'HTTP/1.1\" (?<httpstatus>\\d+)' | eval status2xx=if(httpstatus>='200' and httpstatus<'300', 1, 0), status3xx=if(httpstatus>='300' and httpstatus<'400', 1, 0), status4xx=if(httpstatus>='400' and httpstatus<'500', 1, 0), status5xx=if(httpstatus>='500' and httpstatus<'600', 1, 0), statusOther=if(httpstatus>='600', 1, 0) | stats count() as `Request Count`, sum(status2xx) as `HTTP 2xx`, sum(status3xx) as `HTTP 3xx`, sum(status4xx) as `HTTP 4xx`, sum(status5xx) as `HTTP 5xx`, sum(statusOther) as `Other` + - match: {"total": 1} + - match: {"schema": [{"name": "Request Count", "type": "bigint"}, {"name": "HTTP 2xx", "type": "bigint"}, {"name": "HTTP 3xx", "type": "bigint"}, {"name": "HTTP 4xx", "type": "bigint"}, {"name": "HTTP 5xx", "type": "bigint"}, {"name": "Other", "type": "bigint"}]} + - match: {"datarows": [[1, 1, 0, 0, 0, 0]]} diff --git a/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/4009.yml b/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/4009.yml new file mode 100644 index 00000000000..ed53edbdb09 --- /dev/null +++ b/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/4009.yml @@ -0,0 +1,70 @@ +setup: + - do: + query.settings: + body: + transient: + plugins.calcite.enabled : true + + - do: + indices.create: + index: test + body: + settings: + number_of_shards: 1 + number_of_replicas: 0 + mappings: + properties: + body: + type: text + +--- +teardown: + - do: + query.settings: + body: + transient: + plugins.calcite.enabled : false + +--- +"Fix bucket size missing": + - skip: + features: + - headers + - allowed_warnings + - do: + bulk: + index: test + refresh: true + body: + - '{"index": {}}' + - '{ 
"title" : "document 1"}' + - '{"index": {}}' + - '{ "title" : "document 2"}' + - '{"index": {}}' + - '{ "title" : "document 3"}' + - '{"index": {}}' + - '{ "title" : "document 4"}' + - '{"index": {}}' + - '{ "title" : "document 5"}' + - '{"index": {}}' + - '{ "title" : "document 6"}' + - '{"index": {}}' + - '{ "title" : "document 7"}' + - '{"index": {}}' + - '{ "title" : "document 8"}' + - '{"index": {}}' + - '{ "title" : "document 9"}' + - '{"index": {}}' + - '{ "title" : "document 10"}' + - '{"index": {}}' + - '{ "title" : "document 11"}' + + - do: + allowed_warnings: + - 'Loading the fielddata on the _id field is deprecated and will be removed in future versions. If you require sorting or aggregating on this field you should also include the id in the body of your documents, and map this field as a keyword field that has [doc_values] enabled' + headers: + Content-Type: 'application/json' + ppl: + body: + query: source=test | stats count() by title | sort title + - match: {"total": 11} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchAggregateIndexScanRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchAggregateIndexScanRule.java index 35c171ab0f7..a93fecd2df1 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchAggregateIndexScanRule.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchAggregateIndexScanRule.java @@ -7,6 +7,7 @@ import java.util.function.Predicate; import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.plan.RelRule; +import org.apache.calcite.rel.core.AggregateCall; import org.apache.calcite.rel.logical.LogicalAggregate; import org.apache.calcite.rel.logical.LogicalProject; import org.apache.calcite.sql.SqlKind; @@ -64,10 +65,17 @@ public interface Config extends RelRule.Config { .withOperandSupplier( b0 -> b0.operand(LogicalAggregate.class) + .predicate( + agg 
-> + // Cannot push down aggregation with inner filter + agg.getAggCallList().stream().noneMatch(AggregateCall::hasFilter)) .oneInput( b1 -> b1.operand(LogicalProject.class) - .predicate(OpenSearchIndexScanRule::distinctProjectList) + .predicate( + // Don't push down aggregate on window function + Predicate.not(OpenSearchIndexScanRule::containsRexOver) + .and(OpenSearchIndexScanRule::distinctProjectList)) .oneInput( b2 -> b2.operand(CalciteLogicalIndexScan.class) @@ -92,7 +100,8 @@ public interface Config extends RelRule.Config { .allMatch( call -> call.getAggregation().kind == SqlKind.COUNT - && call.getArgList().isEmpty())) + && call.getArgList().isEmpty() + && !call.hasFilter())) .oneInput( b1 -> b1.operand(CalciteLogicalIndexScan.class) diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchIndexScanRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchIndexScanRule.java index 010f7ee47ca..ebedac0284c 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchIndexScanRule.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchIndexScanRule.java @@ -12,6 +12,7 @@ import org.apache.calcite.rel.logical.LogicalProject; import org.apache.calcite.rel.logical.LogicalSort; import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexOver; import org.opensearch.sql.opensearch.storage.OpenSearchIndex; import org.opensearch.sql.opensearch.storage.scan.AbstractCalciteIndexScan; @@ -42,6 +43,10 @@ static boolean distinctProjectList(LogicalProject project) { return project.getProjects().stream().allMatch(rexSet::add); } + static boolean containsRexOver(LogicalProject project) { + return project.getProjects().stream().anyMatch(RexOver::containsOver); + } + /** * The LogicalSort is a LIMIT that should be pushed down when its fetch field is not null and its * collation is empty. 
For example: sort name | head 5 should not be pushed down diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/AbstractCalciteIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/AbstractCalciteIndexScan.java index 68f477d834c..fd08b53803f 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/AbstractCalciteIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/AbstractCalciteIndexScan.java @@ -7,6 +7,7 @@ import static java.util.Objects.requireNonNull; import static org.opensearch.sql.common.setting.Settings.Key.CALCITE_PUSHDOWN_ROWCOUNT_ESTIMATION_FACTOR; +import static org.opensearch.sql.opensearch.request.AggregateAnalyzer.AGGREGATION_BUCKET_SIZE; import java.util.ArrayDeque; import java.util.ArrayList; @@ -316,8 +317,6 @@ public AbstractCalciteIndexScan pushDownSort(List collations) } catch (Exception e) { if (LOG.isDebugEnabled()) { LOG.debug("Cannot pushdown the sort {}", getCollationNames(collations), e); - } else { - LOG.info("Cannot pushdown the sort {}, ", getCollationNames(collations)); } } return null; @@ -438,7 +437,8 @@ public void pushDownSortIntoAggBucket(List collations) { Pair.of( Collections.singletonList( AggregationBuilders.composite("composite_buckets", newBuckets) - .subAggregations(newAggBuilder)), + .subAggregations(newAggBuilder) + .size(AGGREGATION_BUCKET_SIZE)), aggregationBuilder.getRight()); } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java index a344a9b61c7..26d85dc601c 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java @@ -142,8 +142,6 @@ public AbstractRelNode pushDownFilter(Filter filter) { } 
catch (Exception e) { if (LOG.isDebugEnabled()) { LOG.debug("Cannot pushdown the filter condition.", e); - } else { - LOG.info("Cannot pushdown the filter condition."); } } return null; @@ -247,8 +245,6 @@ public CalciteLogicalIndexScan pushDownAggregate(Aggregate aggregate, Project pr } catch (Exception e) { if (LOG.isDebugEnabled()) { LOG.debug("Cannot pushdown the aggregate {}", aggregate, e); - } else { - LOG.info("Cannot pushdown the aggregate {}, ", aggregate); } } return null; @@ -266,8 +262,6 @@ public CalciteLogicalIndexScan pushDownLimit(Integer limit, Integer offset) { } catch (Exception e) { if (LOG.isDebugEnabled()) { LOG.debug("Cannot pushdown limit {} with offset {}", limit, offset, e); - } else { - LOG.info("Cannot pushdown limit {} with offset {}", limit, offset); } } return null; From a43bd3936e503d08ad05f7095ef8abc65e53da74 Mon Sep 17 00:00:00 2001 From: Heng Qian Date: Mon, 18 Aug 2025 11:00:07 +0800 Subject: [PATCH 2/2] Fix IT Signed-off-by: Heng Qian --- .../yamlRestTest/resources/rest-api-spec/test/issues/3996.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/3996.yml b/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/3996.yml index 0b2ac29bc42..f5c22c31be9 100644 --- a/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/3996.yml +++ b/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/3996.yml @@ -46,7 +46,7 @@ teardown: Content-Type: 'application/json' ppl: body: - query: source=test | parse body 'HTTP/1.1\" (?<httpstatus>\\d+)' | eval status2xx=if(httpstatus>='200' and httpstatus<'300', 1, 0), status3xx=if(httpstatus>='300' and httpstatus<'400', 1, 0), status4xx=if(httpstatus>='400' and httpstatus<'500', 1, 0), status5xx=if(httpstatus>='500' and httpstatus<'600', 1, 0), statusOther=if(httpstatus>='600', 1, 0) | stats count() as `Request Count`, sum(status2xx) as `HTTP 2xx`, sum(status3xx) as `HTTP 3xx`, sum(status4xx) as `HTTP 
4xx`, sum(status5xx) as `HTTP 5xx`, sum(statusOther) as `Other` + query: source=test | parse body 'HTTP/1.1\" (?<httpstatus>\d+)' | eval status2xx=if(httpstatus>='200' and httpstatus<'300', 1, 0), status3xx=if(httpstatus>='300' and httpstatus<'400', 1, 0), status4xx=if(httpstatus>='400' and httpstatus<'500', 1, 0), status5xx=if(httpstatus>='500' and httpstatus<'600', 1, 0), statusOther=if(httpstatus>='600', 1, 0) | stats count() as `Request Count`, sum(status2xx) as `HTTP 2xx`, sum(status3xx) as `HTTP 3xx`, sum(status4xx) as `HTTP 4xx`, sum(status5xx) as `HTTP 5xx`, sum(statusOther) as `Other` - match: {"total": 1} - match: {"schema": [{"name": "Request Count", "type": "bigint"}, {"name": "HTTP 2xx", "type": "bigint"}, {"name": "HTTP 3xx", "type": "bigint"}, {"name": "HTTP 4xx", "type": "bigint"}, {"name": "HTTP 5xx", "type": "bigint"}, {"name": "Other", "type": "bigint"}]} - match: {"datarows": [[1, 1, 0, 0, 0, 0]]}