diff --git a/core/src/main/java/org/opensearch/sql/calcite/plan/OpenSearchRuleConfig.java b/core/src/main/java/org/opensearch/sql/calcite/plan/OpenSearchRuleConfig.java new file mode 100644 index 00000000000..ca031afbb51 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/calcite/plan/OpenSearchRuleConfig.java @@ -0,0 +1,23 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.plan; + +import org.apache.calcite.plan.Contexts; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.rel.core.RelFactories; +import org.apache.calcite.tools.RelBuilderFactory; +import org.immutables.value.Value; +import org.opensearch.sql.calcite.utils.CalciteToolsHelper; + +public interface OpenSearchRuleConfig extends RelRule.Config { + + /** Return a custom RelBuilderFactory for creating OpenSearchRelBuilder */ + @Override + @Value.Default + default RelBuilderFactory relBuilderFactory() { + return CalciteToolsHelper.proto(Contexts.of(RelFactories.DEFAULT_STRUCT)); + } +} diff --git a/core/src/main/java/org/opensearch/sql/calcite/plan/PPLAggGroupMergeRule.java b/core/src/main/java/org/opensearch/sql/calcite/plan/PPLAggGroupMergeRule.java index 5a6ac7d5370..01ac18f1fb1 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/plan/PPLAggGroupMergeRule.java +++ b/core/src/main/java/org/opensearch/sql/calcite/plan/PPLAggGroupMergeRule.java @@ -105,7 +105,7 @@ public void apply(RelOptRuleCall call, LogicalAggregate aggregate, LogicalProjec /** Rule configuration. */ @Value.Immutable - public interface Config extends RelRule.Config { + public interface Config extends OpenSearchRuleConfig { Config GROUP_MERGE = ImmutablePPLAggGroupMergeRule.Config.builder() .build() diff --git a/core/src/main/java/org/opensearch/sql/calcite/plan/PPLAggregateConvertRule.java b/core/src/main/java/org/opensearch/sql/calcite/plan/PPLAggregateConvertRule.java index 2b8a6b4b0e0..6c5d6172fec 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/plan/PPLAggregateConvertRule.java +++ b/core/src/main/java/org/opensearch/sql/calcite/plan/PPLAggregateConvertRule.java @@ -241,7 +241,7 @@ private RexNode aliasMaybe(RelBuilder builder, RexNode node, String alias) { /** Rule configuration. */ @Value.Immutable - public interface Config extends RelRule.Config { + public interface Config extends OpenSearchRuleConfig { Config SUM_CONVERTER = ImmutablePPLAggregateConvertRule.Config.builder() .build() diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java b/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java index 463706d52c3..23c83749389 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java +++ b/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java @@ -83,6 +83,7 @@ import org.apache.calcite.tools.FrameworkConfig; import org.apache.calcite.tools.Frameworks; import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.tools.RelBuilderFactory; import org.apache.calcite.tools.RelRunner; import org.apache.calcite.util.Holder; import org.apache.calcite.util.Util; @@ -127,6 +128,10 @@ public static Connection connect(FrameworkConfig config, JavaTypeFactory typeFac } } + public static RelBuilderFactory proto(final Context context) { + return (cluster, schema) -> new OpenSearchRelBuilder(context, cluster, schema); + } + /** * This method copied from {@link Frameworks#withPrepare(FrameworkConfig, * Frameworks.BasePrepareAction)}. The purpose is the method {@link diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/clickbench/PPLClickBenchIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/clickbench/PPLClickBenchIT.java index 056e1840f8e..27a5db80bd4 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/clickbench/PPLClickBenchIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/clickbench/PPLClickBenchIT.java @@ -72,12 +72,12 @@ public void test() throws IOException { } logger.info("Running Query{}", i); String ppl = sanitize(loadFromFile("clickbench/queries/q" + i + ".ppl")); - timing(summary, "q" + i, ppl); // V2 gets unstable scripts, ignore them when comparing plan if (isCalciteEnabled()) { String expected = loadExpectedPlan("clickbench/q" + i + ".yaml"); assertYamlEqualsIgnoreId(expected, explainQueryYaml(ppl)); } + timing(summary, "q" + i, ppl); } } } diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java index cdd8dc18471..9a7f425b1c3 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java @@ -1187,6 +1187,24 @@ public void testExplainSortOnMeasureMultiTermsWithScript() throws IOException { + " sort `count()`")); } + @Test + public void testExplainSortOnMeasureComplex() throws IOException { + enabledOnlyWhenPushdownIsEnabled(); + String expected = loadExpectedPlan("explain_agg_sort_on_measure_complex1.yaml"); + assertYamlEqualsIgnoreId( + expected, + explainQueryYaml( + "source=opensearch-sql_test_index_account | stats bucket_nullable=false sum(balance)," + + " count() as c, dc(employer) by state | sort - c")); + expected = loadExpectedPlan("explain_agg_sort_on_measure_complex2.yaml"); + assertYamlEqualsIgnoreId( + expected, + explainQueryYaml( + "source=opensearch-sql_test_index_account | eval new_state = lower(state) | stats" + + " bucket_nullable=false sum(balance), count(), dc(employer) as d by gender," + + " new_state | sort - d")); + } + @Test public void testExplainCompositeMultiBucketsAutoDateThenSortOnMeasureNotPushdown() throws IOException { @@ -1239,7 +1257,7 @@ public void testExplainCompositeRangeAutoDateThenSortOnMeasureNotPushdown() thro } @Test - public void testExplainMultipleAggregatorsWithSortOnOneMeasureNotPushDown() throws IOException { + public void testExplainMultipleCollationsWithSortOnOneMeasureNotPushDown() throws IOException { enabledOnlyWhenPushdownIsEnabled(); String expected = loadExpectedPlan("explain_multiple_agg_with_sort_on_one_measure_not_push1.yaml"); @@ -1247,7 +1265,7 @@ public void testExplainMultipleAggregatorsWithSortOnOneMeasureNotPushDown() thro expected, explainQueryYaml( "source=opensearch-sql_test_index_account | stats bucket_nullable=false count() as c," - + " sum(balance) as s by state | sort c")); + + " sum(balance) as s by state | sort c, state")); expected = loadExpectedPlan("explain_multiple_agg_with_sort_on_one_measure_not_push2.yaml"); assertYamlEqualsIgnoreId( expected, @@ -1256,6 +1274,17 @@ public void testExplainMultipleAggregatorsWithSortOnOneMeasureNotPushDown() thro + " sum(balance) as s by state | sort c, s")); } + @Test + public void testExplainSortOnMeasureMultiBucketsNotMultiTermsNotPushDown() throws IOException { + enabledOnlyWhenPushdownIsEnabled(); + String expected = loadExpectedPlan("explain_agg_sort_on_measure_multi_buckets_not_pushed.yaml"); + assertYamlEqualsIgnoreId( + expected, + explainQueryYaml( + "source=opensearch-sql_test_index_account | stats bucket_nullable=false count() as c," + + " sum(balance) as s by state, span(age, 5) | sort c")); + } + @Test public void testExplainEvalMax() throws IOException { String expected = loadExpectedPlan("explain_eval_max.json"); diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/StatsCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/StatsCommandIT.java index 446643388b7..da857179be7 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/StatsCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/StatsCommandIT.java @@ -1166,4 +1166,55 @@ public void testStatsSpanSortOnMeasureMultiTermsWithScript() throws IOException resetQueryBucketSize(); } } + + @Test + public void testStatsSortOnMeasureComplex() throws IOException { + try { + setQueryBucketSize(5); + JSONObject response = + executeQuery( + String.format( + "source=%s | stats bucket_nullable=false sum(balance), count() as c, dc(employer)" + + " as d by state | sort - c | head 5", + TEST_INDEX_ACCOUNT)); + verifySchema( + response, + schema("sum(balance)", null, "bigint"), + schema("c", null, "bigint"), + schema("d", null, "bigint"), + schema("state", null, "string")); + System.out.println(response); + verifyDataRows( + response, + rows(782199, 30, 30, "TX"), + rows(732523, 28, 28, "MD"), + rows(657957, 27, 27, "ID"), + rows(541575, 25, 25, "ME"), + rows(643489, 25, 25, "AL")); + response = + executeQuery( + String.format( + "source=%s | eval new_state = lower(state) | stats bucket_nullable=false" + + " sum(balance), count() as c, dc(employer) as d by gender, new_state | sort" + + " - d | head 5", + TEST_INDEX_ACCOUNT)); + verifySchema( + response, + schema("sum(balance)", null, "bigint"), + schema("c", null, "bigint"), + schema("d", null, "bigint"), + schema("gender", null, "string"), + schema("new_state", null, "string")); + System.out.println(response); + verifyDataRows( + response, + rows(484567, 18, 18, "M", "md"), + rows(376394, 17, 17, "M", "id"), + rows(505688, 17, 17, "F", "tx"), + rows(375409, 16, 16, "M", "me"), + rows(432776, 15, 15, "M", "ok")); + } finally { + resetQueryBucketSize(); + } + } } diff --git a/integ-test/src/test/resources/expectedOutput/calcite/clickbench/q10.yaml b/integ-test/src/test/resources/expectedOutput/calcite/clickbench/q10.yaml index 8138d506a93..4e6c0e1f59f 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/clickbench/q10.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite/clickbench/q10.yaml @@ -8,7 +8,4 @@ calcite: LogicalFilter(condition=[IS NOT NULL($68)]) CalciteLogicalIndexScan(table=[[OpenSearch, hits]]) physical: | - EnumerableLimit(fetch=[10000]) - EnumerableLimit(fetch=[10]) - EnumerableSort(sort0=[$1], dir0=[DESC-nulls-last]) - CalciteEnumerableIndexScan(table=[[OpenSearch, hits]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={1},sum(AdvEngineID)=SUM($0),c=COUNT(),avg(ResolutionWidth)=AVG($2),dc(UserID)=COUNT(DISTINCT $3)), PROJECT->[sum(AdvEngineID), c, avg(ResolutionWidth), dc(UserID), RegionID]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":10000,"sources":[{"RegionID":{"terms":{"field":"RegionID","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"sum(AdvEngineID)":{"sum":{"field":"AdvEngineID"}},"avg(ResolutionWidth)":{"avg":{"field":"ResolutionWidth"}},"dc(UserID)":{"cardinality":{"field":"UserID"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) + CalciteEnumerableIndexScan(table=[[OpenSearch, hits]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={1},sum(AdvEngineID)=SUM($0),c=COUNT(),avg(ResolutionWidth)=AVG($2),dc(UserID)=COUNT(DISTINCT $3)), SORT_AGG_METRICS->[2 DESC LAST], PROJECT->[sum(AdvEngineID), c, avg(ResolutionWidth), dc(UserID), RegionID], LIMIT->10, LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"RegionID":{"terms":{"field":"RegionID","size":10,"min_doc_count":1,"shard_min_doc_count":0,"show_term_doc_count_error":false,"order":[{"c":"desc"},{"_key":"asc"}]},"aggregations":{"sum(AdvEngineID)":{"sum":{"field":"AdvEngineID"}},"avg(ResolutionWidth)":{"avg":{"field":"ResolutionWidth"}},"dc(UserID)":{"cardinality":{"field":"UserID"}},"c":{"value_count":{"field":"_index"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/clickbench/q23.yaml b/integ-test/src/test/resources/expectedOutput/calcite/clickbench/q23.yaml index f258552964f..acbf78ee28d 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/clickbench/q23.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite/clickbench/q23.yaml @@ -9,7 +9,4 @@ calcite: LogicalFilter(condition=[AND(ILIKE($97, '%Google%', '\'), <>($63, ''), NOT(ILIKE($26, '%.google.%', '\')))]) CalciteLogicalIndexScan(table=[[OpenSearch, hits]]) physical: | - EnumerableLimit(fetch=[10000]) - EnumerableLimit(fetch=[10]) - EnumerableSort(sort0=[$0], dir0=[DESC-nulls-last]) - CalciteEnumerableIndexScan(table=[[OpenSearch, hits]], PushDownContext=[[PROJECT->[URL, SearchPhrase, UserID, Title], FILTER->AND(ILIKE($3, '%Google%', '\'), <>($1, ''), NOT(ILIKE($0, '%.google.%', '\'))), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},c=COUNT(),dc(UserID)=COUNT(DISTINCT $1)), PROJECT->[c, dc(UserID), SearchPhrase]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"must":[{"wildcard":{"Title":{"wildcard":"*Google*","case_insensitive":true,"boost":1.0}}},{"bool":{"must":[{"exists":{"field":"SearchPhrase","boost":1.0}}],"must_not":[{"term":{"SearchPhrase":{"value":"","boost":1.0}}}],"adjust_pure_negative":true,"boost":1.0}},{"bool":{"must_not":[{"wildcard":{"URL":{"wildcard":"*.google.*","case_insensitive":true,"boost":1.0}}}],"adjust_pure_negative":true,"boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"_source":{"includes":["URL","SearchPhrase","UserID","Title"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":10000,"sources":[{"SearchPhrase":{"terms":{"field":"SearchPhrase","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"dc(UserID)":{"cardinality":{"field":"UserID"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) + CalciteEnumerableIndexScan(table=[[OpenSearch, hits]], PushDownContext=[[PROJECT->[URL, SearchPhrase, UserID, Title], FILTER->AND(ILIKE($3, '%Google%', '\'), <>($1, ''), NOT(ILIKE($0, '%.google.%', '\'))), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},c=COUNT(),dc(UserID)=COUNT(DISTINCT $1)), SORT_AGG_METRICS->[1 DESC LAST], PROJECT->[c, dc(UserID), SearchPhrase], LIMIT->10, LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"must":[{"wildcard":{"Title":{"wildcard":"*Google*","case_insensitive":true,"boost":1.0}}},{"bool":{"must":[{"exists":{"field":"SearchPhrase","boost":1.0}}],"must_not":[{"term":{"SearchPhrase":{"value":"","boost":1.0}}}],"adjust_pure_negative":true,"boost":1.0}},{"bool":{"must_not":[{"wildcard":{"URL":{"wildcard":"*.google.*","case_insensitive":true,"boost":1.0}}}],"adjust_pure_negative":true,"boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"_source":{"includes":["URL","SearchPhrase","UserID","Title"],"excludes":[]},"aggregations":{"SearchPhrase":{"terms":{"field":"SearchPhrase","size":10,"min_doc_count":1,"shard_min_doc_count":0,"show_term_doc_count_error":false,"order":[{"c":"desc"},{"_key":"asc"}]},"aggregations":{"dc(UserID)":{"cardinality":{"field":"UserID"}},"c":{"value_count":{"field":"_index"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/clickbench/q31.yaml b/integ-test/src/test/resources/expectedOutput/calcite/clickbench/q31.yaml index a0bab4f2aed..16e58d05b48 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/clickbench/q31.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite/clickbench/q31.yaml @@ -9,7 +9,4 @@ calcite: LogicalFilter(condition=[<>($63, '')]) CalciteLogicalIndexScan(table=[[OpenSearch, hits]]) physical: | - EnumerableLimit(fetch=[10000]) - EnumerableLimit(fetch=[10]) - EnumerableSort(sort0=[$0], dir0=[DESC-nulls-last]) - CalciteEnumerableIndexScan(table=[[OpenSearch, hits]], PushDownContext=[[PROJECT->[SearchPhrase, SearchEngineID, IsRefresh, ClientIP, ResolutionWidth], FILTER->AND(<>($0, ''), IS NOT NULL($1), IS NOT NULL($3)), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},c=COUNT(),sum(IsRefresh)=SUM($2),avg(ResolutionWidth)=AVG($3)), PROJECT->[c, sum(IsRefresh), avg(ResolutionWidth), SearchEngineID, ClientIP]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"must":[{"bool":{"must":[{"exists":{"field":"SearchPhrase","boost":1.0}}],"must_not":[{"term":{"SearchPhrase":{"value":"","boost":1.0}}}],"adjust_pure_negative":true,"boost":1.0}},{"exists":{"field":"SearchEngineID","boost":1.0}},{"exists":{"field":"ClientIP","boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"_source":{"includes":["SearchPhrase","SearchEngineID","IsRefresh","ClientIP","ResolutionWidth"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":10000,"sources":[{"SearchEngineID":{"terms":{"field":"SearchEngineID","missing_bucket":false,"order":"asc"}}},{"ClientIP":{"terms":{"field":"ClientIP","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"sum(IsRefresh)":{"sum":{"field":"IsRefresh"}},"avg(ResolutionWidth)":{"avg":{"field":"ResolutionWidth"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) + CalciteEnumerableIndexScan(table=[[OpenSearch, hits]], PushDownContext=[[PROJECT->[SearchPhrase, SearchEngineID, IsRefresh, ClientIP, ResolutionWidth], FILTER->AND(<>($0, ''), IS NOT NULL($1), IS NOT NULL($3)), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},c=COUNT(),sum(IsRefresh)=SUM($2),avg(ResolutionWidth)=AVG($3)), SORT_AGG_METRICS->[2 DESC LAST], PROJECT->[c, sum(IsRefresh), avg(ResolutionWidth), SearchEngineID, ClientIP], LIMIT->10, LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"must":[{"bool":{"must":[{"exists":{"field":"SearchPhrase","boost":1.0}}],"must_not":[{"term":{"SearchPhrase":{"value":"","boost":1.0}}}],"adjust_pure_negative":true,"boost":1.0}},{"exists":{"field":"SearchEngineID","boost":1.0}},{"exists":{"field":"ClientIP","boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"_source":{"includes":["SearchPhrase","SearchEngineID","IsRefresh","ClientIP","ResolutionWidth"],"excludes":[]},"aggregations":{"SearchEngineID|ClientIP":{"multi_terms":{"terms":[{"field":"SearchEngineID"},{"field":"ClientIP"}],"size":10,"min_doc_count":1,"shard_min_doc_count":0,"show_term_doc_count_error":false,"order":[{"c":"desc"},{"_key":"asc"}]},"aggregations":{"sum(IsRefresh)":{"sum":{"field":"IsRefresh"}},"avg(ResolutionWidth)":{"avg":{"field":"ResolutionWidth"}},"c":{"value_count":{"field":"_index"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/clickbench/q32.yaml b/integ-test/src/test/resources/expectedOutput/calcite/clickbench/q32.yaml index 60e5f7af061..2c78447a13d 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/clickbench/q32.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite/clickbench/q32.yaml @@ -9,7 +9,4 @@ calcite: LogicalFilter(condition=[<>($63, '')]) CalciteLogicalIndexScan(table=[[OpenSearch, hits]]) physical: | - EnumerableLimit(fetch=[10000]) - EnumerableLimit(fetch=[10]) - EnumerableSort(sort0=[$0], dir0=[DESC-nulls-last]) - CalciteEnumerableIndexScan(table=[[OpenSearch, hits]], PushDownContext=[[PROJECT->[WatchID, SearchPhrase, IsRefresh, ClientIP, ResolutionWidth], FILTER->AND(<>($1, ''), IS NOT NULL($0), IS NOT NULL($3)), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},c=COUNT(),sum(IsRefresh)=SUM($2),avg(ResolutionWidth)=AVG($3)), PROJECT->[c, sum(IsRefresh), avg(ResolutionWidth), WatchID, ClientIP]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"must":[{"bool":{"must":[{"exists":{"field":"SearchPhrase","boost":1.0}}],"must_not":[{"term":{"SearchPhrase":{"value":"","boost":1.0}}}],"adjust_pure_negative":true,"boost":1.0}},{"exists":{"field":"WatchID","boost":1.0}},{"exists":{"field":"ClientIP","boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"_source":{"includes":["WatchID","SearchPhrase","IsRefresh","ClientIP","ResolutionWidth"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":10000,"sources":[{"WatchID":{"terms":{"field":"WatchID","missing_bucket":false,"order":"asc"}}},{"ClientIP":{"terms":{"field":"ClientIP","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"sum(IsRefresh)":{"sum":{"field":"IsRefresh"}},"avg(ResolutionWidth)":{"avg":{"field":"ResolutionWidth"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) + CalciteEnumerableIndexScan(table=[[OpenSearch, hits]], PushDownContext=[[PROJECT->[WatchID, SearchPhrase, IsRefresh, ClientIP, ResolutionWidth], FILTER->AND(<>($1, ''), IS NOT NULL($0), IS NOT NULL($3)), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},c=COUNT(),sum(IsRefresh)=SUM($2),avg(ResolutionWidth)=AVG($3)), SORT_AGG_METRICS->[2 DESC LAST], PROJECT->[c, sum(IsRefresh), avg(ResolutionWidth), WatchID, ClientIP], LIMIT->10, LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"must":[{"bool":{"must":[{"exists":{"field":"SearchPhrase","boost":1.0}}],"must_not":[{"term":{"SearchPhrase":{"value":"","boost":1.0}}}],"adjust_pure_negative":true,"boost":1.0}},{"exists":{"field":"WatchID","boost":1.0}},{"exists":{"field":"ClientIP","boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"_source":{"includes":["WatchID","SearchPhrase","IsRefresh","ClientIP","ResolutionWidth"],"excludes":[]},"aggregations":{"WatchID|ClientIP":{"multi_terms":{"terms":[{"field":"WatchID"},{"field":"ClientIP"}],"size":10,"min_doc_count":1,"shard_min_doc_count":0,"show_term_doc_count_error":false,"order":[{"c":"desc"},{"_key":"asc"}]},"aggregations":{"sum(IsRefresh)":{"sum":{"field":"IsRefresh"}},"avg(ResolutionWidth)":{"avg":{"field":"ResolutionWidth"}},"c":{"value_count":{"field":"_index"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/clickbench/q33.yaml b/integ-test/src/test/resources/expectedOutput/calcite/clickbench/q33.yaml index 998d052f16e..964dcece0f6 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/clickbench/q33.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite/clickbench/q33.yaml @@ -8,7 +8,4 @@ calcite: LogicalFilter(condition=[AND(IS NOT NULL($41), IS NOT NULL($76))]) CalciteLogicalIndexScan(table=[[OpenSearch, hits]]) physical: | - EnumerableLimit(fetch=[10000]) - EnumerableLimit(fetch=[10]) - EnumerableSort(sort0=[$0], dir0=[DESC-nulls-last]) - CalciteEnumerableIndexScan(table=[[OpenSearch, hits]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 2},c=COUNT(),sum(IsRefresh)=SUM($1),avg(ResolutionWidth)=AVG($3)), PROJECT->[c, sum(IsRefresh), avg(ResolutionWidth), WatchID, ClientIP]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":10000,"sources":[{"WatchID":{"terms":{"field":"WatchID","missing_bucket":false,"order":"asc"}}},{"ClientIP":{"terms":{"field":"ClientIP","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"sum(IsRefresh)":{"sum":{"field":"IsRefresh"}},"avg(ResolutionWidth)":{"avg":{"field":"ResolutionWidth"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) + CalciteEnumerableIndexScan(table=[[OpenSearch, hits]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 2},c=COUNT(),sum(IsRefresh)=SUM($1),avg(ResolutionWidth)=AVG($3)), SORT_AGG_METRICS->[2 DESC LAST], PROJECT->[c, sum(IsRefresh), avg(ResolutionWidth), WatchID, ClientIP], LIMIT->10, LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"WatchID|ClientIP":{"multi_terms":{"terms":[{"field":"WatchID"},{"field":"ClientIP"}],"size":10,"min_doc_count":1,"shard_min_doc_count":0,"show_term_doc_count_error":false,"order":[{"c":"desc"},{"_key":"asc"}]},"aggregations":{"sum(IsRefresh)":{"sum":{"field":"IsRefresh"}},"avg(ResolutionWidth)":{"avg":{"field":"ResolutionWidth"}},"c":{"value_count":{"field":"_index"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_agg_sort_on_measure_complex1.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_agg_sort_on_measure_complex1.yaml new file mode 100644 index 00000000000..e75e44a129d --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_agg_sort_on_measure_complex1.yaml @@ -0,0 +1,11 @@ +calcite: + logical: | + LogicalSystemLimit(sort0=[$1], dir0=[DESC-nulls-last], fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalSort(sort0=[$1], dir0=[DESC-nulls-last]) + LogicalProject(sum(balance)=[$1], c=[$2], dc(employer)=[$3], state=[$0]) + LogicalAggregate(group=[{0}], sum(balance)=[SUM($1)], c=[COUNT()], dc(employer)=[COUNT(DISTINCT $2)]) + LogicalProject(state=[$7], balance=[$3], employer=[$6]) + LogicalFilter(condition=[IS NOT NULL($7)]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + physical: | + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={2},sum(balance)=SUM($0),c=COUNT(),dc(employer)=COUNT(DISTINCT $1)), SORT_AGG_METRICS->[2 DESC LAST], PROJECT->[sum(balance), c, dc(employer), state], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"state":{"terms":{"field":"state.keyword","size":1000,"min_doc_count":1,"shard_min_doc_count":0,"show_term_doc_count_error":false,"order":[{"c":"desc"},{"_key":"asc"}]},"aggregations":{"sum(balance)":{"sum":{"field":"balance"}},"dc(employer)":{"cardinality":{"field":"employer.keyword"}},"c":{"value_count":{"field":"_index"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_agg_sort_on_measure_complex2.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_agg_sort_on_measure_complex2.yaml new file mode 100644 index 00000000000..1a3df86b66a --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_agg_sort_on_measure_complex2.yaml @@ -0,0 +1,12 @@ +calcite: + logical: | + LogicalSystemLimit(sort0=[$2], dir0=[DESC-nulls-last], fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalSort(sort0=[$2], dir0=[DESC-nulls-last]) + LogicalProject(sum(balance)=[$2], count()=[$3], d=[$4], gender=[$0], new_state=[$1]) + LogicalAggregate(group=[{0, 1}], sum(balance)=[SUM($2)], count()=[COUNT()], d=[COUNT(DISTINCT $3)]) + LogicalProject(gender=[$4], new_state=[$17], balance=[$3], employer=[$6]) + LogicalFilter(condition=[AND(IS NOT NULL($4), IS NOT NULL($17))]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], new_state=[LOWER($7)]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + physical: | + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={1, 3},sum(balance)=SUM($0),count()=COUNT(),d=COUNT(DISTINCT $2)), SORT_AGG_METRICS->[4 DESC LAST], PROJECT->[sum(balance), count(), d, gender, new_state], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"gender|new_state":{"multi_terms":{"terms":[{"field":"gender.keyword"},{"script":{"source":"{\"langType\":\"calcite\",\"script\":\"rO0ABXNyABFqYXZhLnV0aWwuQ29sbFNlcleOq7Y6G6gRAwABSQADdGFneHAAAAADdwQAAAAGdAAHcm93VHlwZXQAlHsKICAiZmllbGRzIjogWwogICAgewogICAgICAidHlwZSI6ICJWQVJDSEFSIiwKICAgICAgIm51bGxhYmxlIjogdHJ1ZSwKICAgICAgInByZWNpc2lvbiI6IC0xLAogICAgICAibmFtZSI6ICJzdGF0ZSIKICAgIH0KICBdLAogICJudWxsYWJsZSI6IGZhbHNlCn10AARleHBydACjewogICJvcCI6IHsKICAgICJuYW1lIjogIkxPV0VSIiwKICAgICJraW5kIjogIk9USEVSX0ZVTkNUSU9OIiwKICAgICJzeW50YXgiOiAiRlVOQ1RJT04iCiAgfSwKICAib3BlcmFuZHMiOiBbCiAgICB7CiAgICAgICJpbnB1dCI6IDAsCiAgICAgICJuYW1lIjogIiQwIgogICAgfQogIF0KfXQACmZpZWxkVHlwZXNzcgARamF2YS51dGlsLkhhc2hNYXAFB9rBwxZg0QMAAkYACmxvYWRGYWN0b3JJAAl0aHJlc2hvbGR4cD9AAAAAAAAMdwgAAAAQAAAAAXQABXN0YXRlc3IAOm9yZy5vcGVuc2VhcmNoLnNxbC5vcGVuc2VhcmNoLmRhdGEudHlwZS5PcGVuU2VhcmNoVGV4dFR5cGWtg6OTBOMxRAIAAUwABmZpZWxkc3QAD0xqYXZhL3V0aWwvTWFwO3hyADpvcmcub3BlbnNlYXJjaC5zcWwub3BlbnNlYXJjaC5kYXRhLnR5cGUuT3BlblNlYXJjaERhdGFUeXBlwmO8ygL6BTUCAANMAAxleHByQ29yZVR5cGV0ACtMb3JnL29wZW5zZWFyY2gvc3FsL2RhdGEvdHlwZS9FeHByQ29yZVR5cGU7TAALbWFwcGluZ1R5cGV0AEhMb3JnL29wZW5zZWFyY2gvc3FsL29wZW5zZWFyY2gvZGF0YS90eXBlL09wZW5TZWFyY2hEYXRhVHlwZSRNYXBwaW5nVHlwZTtMAApwcm9wZXJ0aWVzcQB+AAt4cH5yAClvcmcub3BlbnNlYXJjaC5zcWwuZGF0YS50eXBlLkV4cHJDb3JlVHlwZQAAAAAAAAAAEgAAeHIADmphdmEubGFuZy5FbnVtAAAAAAAAAAASAAB4cHQAB1VOS05PV05+cgBGb3JnLm9wZW5zZWFyY2guc3FsLm9wZW5zZWFyY2guZGF0YS50eXBlLk9wZW5TZWFyY2hEYXRhVHlwZSRNYXBwaW5nVHlwZQAAAAAAAAAAEgAAeHEAfgARdAAEVGV4dHNyADxzaGFkZWQuY29tLmdvb2dsZS5jb21tb24uY29sbGVjdC5JbW11dGFibGVNYXAkU2VyaWFsaXplZEZvcm0AAAAAAAAAAAIAAkwABGtleXN0ABJMamF2YS9sYW5nL09iamVjdDtMAAZ2YWx1ZXNxAH4AGHhwdXIAE1tMamF2YS5sYW5nLk9iamVjdDuQzlifEHMpbAIAAHhwAAAAAHVxAH4AGgAAAABzcQB+AAAAAAADdwQAAAACdAAHa2V5d29yZHNxAH4ADH5xAH4AEHQABlNUUklOR35xAH4AFHQAB0tleXdvcmRxAH4AGXh4eA==\"}","lang":"opensearch_compounded_script","params":{"utcTimestamp": 0}}}],"size":1000,"min_doc_count":1,"shard_min_doc_count":0,"show_term_doc_count_error":false,"order":[{"d":"desc"},{"_key":"asc"}]},"aggregations":{"sum(balance)":{"sum":{"field":"balance"}},"d":{"cardinality":{"field":"employer.keyword"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_agg_sort_on_measure_multi_buckets_not_pushed.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_agg_sort_on_measure_multi_buckets_not_pushed.yaml new file mode 100644 index 00000000000..4ecd0e026bb --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_agg_sort_on_measure_multi_buckets_not_pushed.yaml @@ -0,0 +1,13 @@ +calcite: + logical: | + LogicalSystemLimit(sort0=[$0], dir0=[ASC-nulls-first], fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalSort(sort0=[$0], dir0=[ASC-nulls-first]) + LogicalProject(c=[$2], s=[$3], span(age,5)=[$1], state=[$0]) + LogicalAggregate(group=[{0, 2}], c=[COUNT()], s=[SUM($1)]) + LogicalProject(state=[$7], balance=[$3], span(age,5)=[SPAN($8, 5, null:NULL)]) + LogicalFilter(condition=[AND(IS NOT NULL($8), IS NOT NULL($7))]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableSort(sort0=[$0], dir0=[ASC-nulls-first]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[balance, state, age], FILTER->AND(IS NOT NULL($2), IS NOT NULL($1)), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 2},c=COUNT(),s=SUM($1)), PROJECT->[c, s, span(age,5), state]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"must":[{"exists":{"field":"age","boost":1.0}},{"exists":{"field":"state","boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"_source":{"includes":["balance","state","age"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"state":{"terms":{"field":"state.keyword","missing_bucket":false,"order":"asc"}}},{"span(age,5)":{"histogram":{"field":"age","missing_bucket":false,"order":"asc","interval":5.0}}}]},"aggregations":{"s":{"sum":{"field":"balance"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_multiple_agg_with_sort_on_one_measure_not_push1.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_multiple_agg_with_sort_on_one_measure_not_push1.yaml index 6a5bc8ea0f5..8ac888eab4a 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_multiple_agg_with_sort_on_one_measure_not_push1.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_multiple_agg_with_sort_on_one_measure_not_push1.yaml @@ -1,7 +1,7 @@ calcite: logical: | - LogicalSystemLimit(sort0=[$0], dir0=[ASC-nulls-first], fetch=[10000], type=[QUERY_SIZE_LIMIT]) - LogicalSort(sort0=[$0], dir0=[ASC-nulls-first]) + LogicalSystemLimit(sort0=[$0], sort1=[$2], dir0=[ASC-nulls-first], dir1=[ASC-nulls-first], fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalSort(sort0=[$0], sort1=[$2], dir0=[ASC-nulls-first], dir1=[ASC-nulls-first]) LogicalProject(c=[$1], s=[$2], state=[$0]) LogicalAggregate(group=[{0}], c=[COUNT()], s=[SUM($1)]) LogicalProject(state=[$7], balance=[$3]) @@ -9,5 +9,5 @@ calcite: CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) physical: | EnumerableLimit(fetch=[10000]) - EnumerableSort(sort0=[$0], dir0=[ASC-nulls-first]) + EnumerableSort(sort0=[$0], sort1=[$2], dir0=[ASC-nulls-first], dir1=[ASC-nulls-first]) CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={1},c=COUNT(),s=SUM($0)), PROJECT->[c, s, state]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"state":{"terms":{"field":"state.keyword","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"s":{"sum":{"field":"balance"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) \ No newline at end of file diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/AggregateIndexScanRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/AggregateIndexScanRule.java index db489c29f37..c1e6fbc2bad 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/AggregateIndexScanRule.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/AggregateIndexScanRule.java @@ -23,6 +23,7 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.immutables.value.Value; import org.opensearch.sql.ast.expression.Argument; +import org.opensearch.sql.calcite.plan.OpenSearchRuleConfig; import org.opensearch.sql.calcite.utils.PlanUtils; import org.opensearch.sql.expression.function.udf.binning.WidthBucketFunction; import org.opensearch.sql.opensearch.storage.scan.AbstractCalciteIndexScan; @@ -91,7 +92,7 @@ protected void apply( /** Rule configuration. */ @Value.Immutable - public interface Config extends RelRule.Config { + public interface Config extends OpenSearchRuleConfig { Config DEFAULT = ImmutableAggregateIndexScanRule.Config.builder() .build() diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/DedupPushdownRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/DedupPushdownRule.java index fa4ed750ee0..79fbe10d00d 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/DedupPushdownRule.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/DedupPushdownRule.java @@ -22,6 +22,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.immutables.value.Value; +import org.opensearch.sql.calcite.plan.OpenSearchRuleConfig; import org.opensearch.sql.calcite.utils.PlanUtils; import org.opensearch.sql.opensearch.storage.scan.AbstractCalciteIndexScan; import org.opensearch.sql.opensearch.storage.scan.CalciteLogicalIndexScan; @@ -106,7 +107,7 @@ private static boolean validFilter(LogicalFilter filter) { * LogicalFilter(condition=[IS NOT NULL($0)])
*/ @Value.Immutable - public interface Config extends RelRule.Config { + public interface Config extends OpenSearchRuleConfig { Config DEFAULT = ImmutableDedupPushdownRule.Config.builder() .build() diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/ExpandCollationOnProjectExprRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/ExpandCollationOnProjectExprRule.java index b1bd711601d..204ecacbd39 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/ExpandCollationOnProjectExprRule.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/ExpandCollationOnProjectExprRule.java @@ -19,6 +19,7 @@ import org.apache.calcite.rel.core.Project; import org.apache.commons.lang3.tuple.Pair; import org.immutables.value.Value; +import org.opensearch.sql.calcite.plan.OpenSearchRuleConfig; import org.opensearch.sql.calcite.utils.PlanUtils; import org.opensearch.sql.opensearch.util.OpenSearchRelOptUtil; @@ -94,7 +95,7 @@ public void onMatch(RelOptRuleCall call) { } @Value.Immutable - public interface Config extends RelRule.Config { + public interface Config extends OpenSearchRuleConfig { /** * Only match ENUMERABLE convention RelNode combination like below to narrow the optimization diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/FilterIndexScanRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/FilterIndexScanRule.java index 8aa2f77b6ac..b0c4f55aa3d 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/FilterIndexScanRule.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/FilterIndexScanRule.java @@ -12,6 +12,7 @@ import org.apache.calcite.rel.core.Filter; import org.apache.calcite.rel.logical.LogicalFilter; import org.immutables.value.Value; +import org.opensearch.sql.calcite.plan.OpenSearchRuleConfig; import org.opensearch.sql.opensearch.storage.scan.AbstractCalciteIndexScan; import org.opensearch.sql.opensearch.storage.scan.CalciteLogicalIndexScan; @@ -48,7 +49,7 @@ protected void apply(RelOptRuleCall call, Filter filter, CalciteLogicalIndexScan /** Rule configuration. */ @Value.Immutable - public interface Config extends RelRule.Config { + public interface Config extends OpenSearchRuleConfig { /** Config that matches Filter on CalciteLogicalIndexScan. */ Config DEFAULT = ImmutableFilterIndexScanRule.Config.builder() diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/LimitIndexScanRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/LimitIndexScanRule.java index 1c24c7664fc..5d5412ce86d 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/LimitIndexScanRule.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/LimitIndexScanRule.java @@ -13,6 +13,7 @@ import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; import org.immutables.value.Value; +import org.opensearch.sql.calcite.plan.OpenSearchRuleConfig; import org.opensearch.sql.calcite.utils.PlanUtils; import org.opensearch.sql.opensearch.storage.scan.CalciteLogicalIndexScan; @@ -82,7 +83,7 @@ private static Integer extractOffsetValue(RexNode offset) { /** Rule configuration. */ @Value.Immutable - public interface Config extends RelRule.Config { + public interface Config extends OpenSearchRuleConfig { LimitIndexScanRule.Config DEFAULT = ImmutableLimitIndexScanRule.Config.builder() .build() diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/ProjectIndexScanRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/ProjectIndexScanRule.java index d4c7986145e..629869be547 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/ProjectIndexScanRule.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/ProjectIndexScanRule.java @@ -21,6 +21,7 @@ import org.apache.calcite.util.mapping.Mapping; import org.apache.calcite.util.mapping.Mappings; import org.immutables.value.Value; +import org.opensearch.sql.calcite.plan.OpenSearchRuleConfig; import org.opensearch.sql.opensearch.storage.OpenSearchIndex; import org.opensearch.sql.opensearch.storage.scan.CalciteLogicalIndexScan; @@ -102,7 +103,7 @@ public boolean isIdentity(Integer size) { /** Rule configuration. */ @Value.Immutable - public interface Config extends RelRule.Config { + public interface Config extends OpenSearchRuleConfig { /** Config that matches Project on ProjectIndexScanRule. */ Config DEFAULT = ImmutableProjectIndexScanRule.Config.builder() diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/RareTopPushdownRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/RareTopPushdownRule.java index 5672e756977..649ce9465bc 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/RareTopPushdownRule.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/RareTopPushdownRule.java @@ -19,6 +19,7 @@ import org.apache.calcite.rex.RexWindow; import org.apache.calcite.sql.SqlKind; import org.immutables.value.Value; +import org.opensearch.sql.calcite.plan.OpenSearchRuleConfig; import org.opensearch.sql.calcite.utils.PlanUtils; import org.opensearch.sql.opensearch.storage.scan.AbstractCalciteIndexScan; import org.opensearch.sql.opensearch.storage.scan.CalciteLogicalIndexScan; @@ -75,7 +76,7 @@ public void onMatch(RelOptRuleCall call) { } @Value.Immutable - public interface Config extends RelRule.Config { + public interface Config extends OpenSearchRuleConfig { RareTopPushdownRule.Config DEFAULT = ImmutableRareTopPushdownRule.Config.builder() .build() diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/RelevanceFunctionPushdownRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/RelevanceFunctionPushdownRule.java index 31a67f49757..6ec968ebc6d 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/RelevanceFunctionPushdownRule.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/RelevanceFunctionPushdownRule.java @@ -18,6 +18,7 @@ import org.apache.calcite.rex.RexVisitorImpl; import org.apache.calcite.sql.SqlOperator; import org.immutables.value.Value; +import org.opensearch.sql.calcite.plan.OpenSearchRuleConfig; import org.opensearch.sql.opensearch.storage.scan.CalciteLogicalIndexScan; /** @@ -100,7 +101,7 @@ boolean hasRelevanceFunction() { /** Rule configuration. */ @Value.Immutable - public interface Config extends RelRule.Config { + public interface Config extends OpenSearchRuleConfig { /** Config that matches Filter on CalciteLogicalIndexScan. */ Config DEFAULT = ImmutableRelevanceFunctionPushdownRule.Config.builder() diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/SortAggregateMeasureRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/SortAggregateMeasureRule.java index 1b40063e6b1..62587a2d430 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/SortAggregateMeasureRule.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/SortAggregateMeasureRule.java @@ -11,6 +11,7 @@ import org.apache.calcite.rel.core.Sort; import org.apache.calcite.rel.logical.LogicalSort; import org.immutables.value.Value; +import org.opensearch.sql.calcite.plan.OpenSearchRuleConfig; import org.opensearch.sql.calcite.utils.PlanUtils; import org.opensearch.sql.opensearch.storage.scan.AbstractCalciteIndexScan; import org.opensearch.sql.opensearch.storage.scan.CalciteLogicalIndexScan; @@ -34,7 +35,7 @@ public void onMatch(RelOptRuleCall call) { /** Rule configuration. */ @Value.Immutable - public interface Config extends RelRule.Config { + public interface Config extends OpenSearchRuleConfig { // TODO support multiple measures, only support single measure sort Predicate hasOneFieldCollation = sort -> sort.getCollation().getFieldCollations().size() == 1; diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/SortIndexScanRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/SortIndexScanRule.java index ff30324d09f..86a039cc145 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/SortIndexScanRule.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/SortIndexScanRule.java @@ -10,6 +10,7 @@ import org.apache.calcite.plan.RelRule; import org.apache.calcite.rel.core.Sort; import org.immutables.value.Value; +import org.opensearch.sql.calcite.plan.OpenSearchRuleConfig; import org.opensearch.sql.calcite.utils.PlanUtils; import org.opensearch.sql.opensearch.storage.scan.AbstractCalciteIndexScan; @@ -37,7 +38,7 @@ public void onMatch(RelOptRuleCall call) { /** Rule configuration. */ @Value.Immutable - public interface Config extends RelRule.Config { + public interface Config extends OpenSearchRuleConfig { SortIndexScanRule.Config DEFAULT = ImmutableSortIndexScanRule.Config.builder() .build() diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/SortProjectExprTransposeRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/SortProjectExprTransposeRule.java index 48684020909..9f27bfd3954 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/SortProjectExprTransposeRule.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/SortProjectExprTransposeRule.java @@ -26,6 +26,7 @@ import org.apache.calcite.rex.RexNode; import org.apache.commons.lang3.tuple.Pair; import org.immutables.value.Value; +import org.opensearch.sql.calcite.plan.OpenSearchRuleConfig; import org.opensearch.sql.calcite.utils.PlanUtils; import org.opensearch.sql.opensearch.util.OpenSearchRelOptUtil; @@ -121,7 +122,7 @@ public void onMatch(RelOptRuleCall call) { * and physical conventions, aka LogicalSort with fetch vs EnumerableLimit. */ @Value.Immutable - public interface Config extends RelRule.Config { + public interface Config extends OpenSearchRuleConfig { SortProjectExprTransposeRule.Config DEFAULT = ImmutableSortProjectExprTransposeRule.Config.builder() .build() diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java index 71c4c2087e0..f1535550998 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java @@ -224,18 +224,18 @@ public void pushDownFilterForCalcite(QueryBuilder query) { /** * Push down aggregation to DSL request. * - * @param aggregationBuilder pair of aggregation query and aggregation parser. + * @param builderAndParser pair of aggregation query and aggregation parser. */ public void pushDownAggregation( - Pair, OpenSearchAggregationResponseParser> aggregationBuilder) { - aggregationBuilder.getLeft().forEach(sourceBuilder::aggregation); + Pair, OpenSearchAggregationResponseParser> builderAndParser) { + builderAndParser.getLeft().forEach(sourceBuilder::aggregation); sourceBuilder.size(0); - exprValueFactory.setParser(aggregationBuilder.getRight()); + exprValueFactory.setParser(builderAndParser.getRight()); // no need to sort docs for aggregation if (sourceBuilder.sorts() != null) { sourceBuilder.sorts().clear(); } - if (aggregationBuilder.getRight() instanceof CountAsTotalHitsParser) { + if (builderAndParser.getRight() instanceof CountAsTotalHitsParser) { sourceBuilder.trackTotalHits(true); } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java index ae456d9bbbe..503bc5f4fd9 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java @@ -295,7 +295,7 @@ public CalciteLogicalIndexScan pushDownSortAggregateMeasure(Sort sort) { try { if (!pushDownContext.isAggregatePushed()) return null; List aggregationBuilders = - pushDownContext.getAggPushDownAction().getAggregationBuilder().getLeft(); + pushDownContext.getAggPushDownAction().getBuilderAndParser().getLeft(); if (aggregationBuilders.size() != 1) { return null; } @@ -303,7 +303,7 @@ public CalciteLogicalIndexScan pushDownSortAggregateMeasure(Sort sort) { return null; } List collationNames = getCollationNames(sort.getCollation().getFieldCollations()); - if (!isAllCollationNamesEqualAggregators(collationNames)) { + if (!isAnyCollationNameInAggregators(collationNames)) { return null; } CalciteLogicalIndexScan newScan = copyWithNewTraitSet(sort.getTraitSet()); @@ -367,7 +367,7 @@ public AbstractRelNode pushDownAggregate(Aggregate aggregate, Project project) { AggregateAnalyzer.AggregateBuilderHelper helper = new AggregateAnalyzer.AggregateBuilderHelper( getRowType(), fieldTypes, getCluster(), bucketNullable, bucketSize); - final Pair, OpenSearchAggregationResponseParser> aggregationBuilder = + final Pair, OpenSearchAggregationResponseParser> builderAndParser = AggregateAnalyzer.analyze(aggregate, project, outputFields, helper); Map extendedTypeMapping = aggregate.getRowType().getFieldList().stream() @@ -380,7 +380,7 @@ public AbstractRelNode pushDownAggregate(Aggregate aggregate, Project project) { field.getType())))); AggPushDownAction action = new AggPushDownAction( - aggregationBuilder, + builderAndParser, extendedTypeMapping, outputFields.subList(0, aggregate.getGroupSet().cardinality())); newScan.pushDownContext.add(PushDownType.AGGREGATION, aggregate, action); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/AggPushDownAction.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/AggPushDownAction.java index a45f06a14fe..a236267b130 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/AggPushDownAction.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/AggPushDownAction.java @@ -15,7 +15,6 @@ import lombok.Getter; import org.apache.calcite.rel.RelFieldCollation; import org.apache.commons.lang3.tuple.Pair; -import org.opensearch.search.aggregations.AbstractAggregationBuilder; import org.opensearch.search.aggregations.AggregationBuilder; import org.opensearch.search.aggregations.AggregationBuilders; import org.opensearch.search.aggregations.AggregatorFactories; @@ -44,21 +43,22 @@ @Getter @EqualsAndHashCode public class AggPushDownAction implements OSRequestBuilderAction { + private static final int MAX_BUCKET_SIZE = 65535; - private Pair, OpenSearchAggregationResponseParser> aggregationBuilder; + private Pair, OpenSearchAggregationResponseParser> builderAndParser; private final Map extendedTypeMapping; private final long scriptCount; // Record the output field names of all buckets as the sequence of buckets private List bucketNames; public AggPushDownAction( - Pair, OpenSearchAggregationResponseParser> aggregationBuilder, + Pair, OpenSearchAggregationResponseParser> builderAndParser, Map extendedTypeMapping, List bucketNames) { - this.aggregationBuilder = aggregationBuilder; + this.builderAndParser = builderAndParser; this.extendedTypeMapping = extendedTypeMapping; this.scriptCount = - aggregationBuilder.getLeft().stream().mapToInt(AggPushDownAction::getScriptCount).sum(); + builderAndParser.getLeft().stream().mapToInt(AggPushDownAction::getScriptCount).sum(); this.bucketNames = bucketNames; } @@ -82,7 +82,7 @@ private static int getScriptCount(AggregationBuilder aggBuilder) { @Override public void apply(OpenSearchRequestBuilder requestBuilder) { - requestBuilder.pushDownAggregation(aggregationBuilder); + requestBuilder.pushDownAggregation(builderAndParser); requestBuilder.pushTypeMapping(extendedTypeMapping); } @@ -109,8 +109,8 @@ private String multiTermsBucketNameAsString(CompositeAggregationBuilder composit /** Re-pushdown a sort aggregation measure to replace the pushed composite aggregation */ public void rePushDownSortAggMeasure( List collations, List fieldNames) { - if (aggregationBuilder.getLeft().isEmpty()) return; - AggregationBuilder builder = aggregationBuilder.getLeft().get(0); + if (builderAndParser.getLeft().isEmpty()) return; + AggregationBuilder builder = builderAndParser.getLeft().get(0); if (builder instanceof CompositeAggregationBuilder) { CompositeAggregationBuilder composite = (CompositeAggregationBuilder) builder; String path = getAggregationPath(collations, fieldNames, composite); @@ -118,136 +118,113 @@ public void rePushDownSortAggMeasure( collations.get(0).getDirection() == RelFieldCollation.Direction.ASCENDING ? BucketOrder.aggregation(path, true) : BucketOrder.aggregation(path, false); - + AggregationBuilder aggregationBuilder = null; if (composite.sources().size() == 1) { if (composite.sources().get(0) instanceof TermsValuesSourceBuilder) { TermsValuesSourceBuilder terms = (TermsValuesSourceBuilder) composite.sources().get(0); if (!terms.missingBucket()) { - TermsAggregationBuilder termsBuilder = - buildTermsAggregationBuilder(terms, bucketOrder, composite.size()); - attachSubAggregations(composite.getSubAggregations(), path, termsBuilder); - aggregationBuilder = - Pair.of( - Collections.singletonList(termsBuilder), - convertTo(aggregationBuilder.getRight())); - return; + aggregationBuilder = buildTermsAggregationBuilder(terms, bucketOrder, composite.size()); + attachSubAggregations(composite.getSubAggregations(), path, aggregationBuilder); + } else { + throw new OpenSearchRequestBuilder.PushDownUnSupportedException( + "Cannot pushdown sort aggregate measure"); } } else if (composite.sources().get(0) instanceof DateHistogramValuesSourceBuilder) { DateHistogramValuesSourceBuilder dateHisto = (DateHistogramValuesSourceBuilder) composite.sources().get(0); - DateHistogramAggregationBuilder dateHistoBuilder = - buildDateHistogramAggregationBuilder(dateHisto, bucketOrder); - attachSubAggregations(composite.getSubAggregations(), path, dateHistoBuilder); - aggregationBuilder = - Pair.of( - Collections.singletonList(dateHistoBuilder), - convertTo(aggregationBuilder.getRight())); - return; + aggregationBuilder = buildDateHistogramAggregationBuilder(dateHisto, bucketOrder); + attachSubAggregations(composite.getSubAggregations(), path, aggregationBuilder); } else if (composite.sources().get(0) instanceof HistogramValuesSourceBuilder) { HistogramValuesSourceBuilder histo = (HistogramValuesSourceBuilder) composite.sources().get(0); if (!histo.missingBucket()) { - HistogramAggregationBuilder histoBuilder = - buildHistogramAggregationBuilder(histo, bucketOrder); - attachSubAggregations(composite.getSubAggregations(), path, histoBuilder); - aggregationBuilder = - Pair.of( - Collections.singletonList(histoBuilder), - convertTo(aggregationBuilder.getRight())); - return; + aggregationBuilder = buildHistogramAggregationBuilder(histo, bucketOrder); + attachSubAggregations(composite.getSubAggregations(), path, aggregationBuilder); + } else { + throw new OpenSearchRequestBuilder.PushDownUnSupportedException( + "Cannot pushdown sort aggregate measure"); } } } else { if (composite.sources().stream() - .allMatch( - src -> src instanceof TermsValuesSourceBuilder - && !((TermsValuesSourceBuilder) src).missingBucket())) { + .allMatch(src -> src instanceof TermsValuesSourceBuilder && !src.missingBucket())) { // multi-term agg - MultiTermsAggregationBuilder multiTermsBuilder = - buildMultiTermsAggregationBuilder(composite, bucketOrder); - attachSubAggregations(composite.getSubAggregations(), path, multiTermsBuilder); - aggregationBuilder = - Pair.of( - Collections.singletonList(multiTermsBuilder), - convertTo(aggregationBuilder.getRight())); - return; + aggregationBuilder = buildMultiTermsAggregationBuilder(composite, bucketOrder); + attachSubAggregations(composite.getSubAggregations(), path, aggregationBuilder); + } else { + throw new OpenSearchRequestBuilder.PushDownUnSupportedException( + "Cannot pushdown sort aggregate measure"); } } - throw new OpenSearchRequestBuilder.PushDownUnSupportedException( - "Cannot pushdown sort aggregate metrics"); + builderAndParser = + Pair.of( + Collections.singletonList(aggregationBuilder), + convertTo(builderAndParser.getRight())); } } /** Re-pushdown a nested aggregation for rare/top to replace the pushed composite aggregation */ public void rePushDownRareTop(RareTopDigest digest) { - if (aggregationBuilder.getLeft().isEmpty()) return; - AggregationBuilder builder = aggregationBuilder.getLeft().get(0); + if (builderAndParser.getLeft().isEmpty()) return; + AggregationBuilder builder = builderAndParser.getLeft().get(0); if (builder instanceof CompositeAggregationBuilder) { CompositeAggregationBuilder composite = (CompositeAggregationBuilder) builder; BucketOrder bucketOrder = digest.direction() == RelFieldCollation.Direction.ASCENDING ? BucketOrder.count(true) : BucketOrder.count(false); + AggregationBuilder aggregationBuilder = null; if (composite.sources().size() == 1) { if (composite.sources().get(0) instanceof TermsValuesSourceBuilder) { TermsValuesSourceBuilder terms = (TermsValuesSourceBuilder) composite.sources().get(0); if (!terms.missingBucket()) { - TermsAggregationBuilder termsBuilder = - buildTermsAggregationBuilder(terms, bucketOrder, digest.number()); - aggregationBuilder = - Pair.of( - Collections.singletonList(termsBuilder), - convertTo(aggregationBuilder.getRight())); - return; + aggregationBuilder = buildTermsAggregationBuilder(terms, bucketOrder, digest.number()); + } else { + throw new OpenSearchRequestBuilder.PushDownUnSupportedException( + "Cannot pushdown " + digest); } } else if (composite.sources().get(0) instanceof DateHistogramValuesSourceBuilder) { DateHistogramValuesSourceBuilder dateHisto = (DateHistogramValuesSourceBuilder) composite.sources().get(0); // for top/rare, only field can be used in by-clause, so this branch never accessed now - DateHistogramAggregationBuilder dateHistoBuilder = - buildDateHistogramAggregationBuilder(dateHisto, bucketOrder); - aggregationBuilder = - Pair.of( - Collections.singletonList(dateHistoBuilder), - convertTo(aggregationBuilder.getRight())); - return; + aggregationBuilder = buildDateHistogramAggregationBuilder(dateHisto, bucketOrder); } else if (composite.sources().get(0) instanceof HistogramValuesSourceBuilder) { HistogramValuesSourceBuilder histo = (HistogramValuesSourceBuilder) composite.sources().get(0); if (!histo.missingBucket()) { // for top/rare, only field can be used in by-clause, so this branch never accessed now - HistogramAggregationBuilder histoBuilder = - buildHistogramAggregationBuilder(histo, bucketOrder); - aggregationBuilder = - Pair.of( - Collections.singletonList(histoBuilder), - convertTo(aggregationBuilder.getRight())); - return; + aggregationBuilder = buildHistogramAggregationBuilder(histo, bucketOrder); + } else { + throw new OpenSearchRequestBuilder.PushDownUnSupportedException( + "Cannot pushdown " + digest); } + } else { + throw new OpenSearchRequestBuilder.PushDownUnSupportedException( + "Cannot pushdown " + digest); } } else { if (composite.sources().stream() - .allMatch( - src -> src instanceof TermsValuesSourceBuilder - && !((TermsValuesSourceBuilder) src).missingBucket())) { + .allMatch(src -> src instanceof TermsValuesSourceBuilder && !src.missingBucket())) { // nested term agg TermsAggregationBuilder termsBuilder = null; for (int i = 0; i < composite.sources().size(); i++) { TermsValuesSourceBuilder terms = (TermsValuesSourceBuilder) composite.sources().get(i); if (i == 0) { // first - termsBuilder = buildTermsAggregationBuilder(terms, null, 65535); + aggregationBuilder = buildTermsAggregationBuilder(terms, null, MAX_BUCKET_SIZE); } else if (i == composite.sources().size() - 1) { // last - termsBuilder.subAggregation( + aggregationBuilder.subAggregation( buildTermsAggregationBuilder(terms, bucketOrder, digest.number())); } else { - termsBuilder.subAggregation(buildTermsAggregationBuilder(terms, null, 65535)); + aggregationBuilder.subAggregation( + buildTermsAggregationBuilder(terms, null, MAX_BUCKET_SIZE)); } } - aggregationBuilder = - Pair.of( - Collections.singletonList(termsBuilder), - convertTo(aggregationBuilder.getRight())); - return; + } else { + throw new OpenSearchRequestBuilder.PushDownUnSupportedException( + "Cannot pushdown " + digest); } } - throw new OpenSearchRequestBuilder.PushDownUnSupportedException("Cannot pushdown " + digest); + builderAndParser = + Pair.of( + Collections.singletonList(aggregationBuilder), + convertTo(builderAndParser.getRight())); } } @@ -341,29 +318,29 @@ private String getAggregationPath( List collations, List fieldNames, CompositeAggregationBuilder composite) { - String path; AggregationBuilder metric = composite.getSubAggregations().stream().findFirst().orElse(null); - if (metric == null) { - // count agg optimized, get the path name from field names - path = fieldNames.get(collations.get(0).getFieldIndex()); - } else if (metric instanceof ValuesSourceAggregationBuilder.LeafOnly) { - path = metric.getName(); - } else { - // we do not support pushdown sort aggregate measure for nested aggregation + if (metric != null && !(metric instanceof ValuesSourceAggregationBuilder.LeafOnly)) { + // do not pushdown sort aggregate measure for nested aggregation, e.g. composite then range throw new OpenSearchRequestBuilder.PushDownUnSupportedException( "Cannot pushdown sort aggregate measure, composite.getSubAggregations() is not a" + " LeafOnly"); } - return path; + return fieldNames.get(collations.get(0).getFieldIndex()); } - private > T attachSubAggregations( - Collection subAggregations, String path, T aggregationBuilder) { + private AggregationBuilder attachSubAggregations( + Collection subAggregations, + String path, + AggregationBuilder aggregationBuilder) { AggregatorFactories.Builder metricBuilder = new AggregatorFactories.Builder(); if (subAggregations.isEmpty()) { metricBuilder.addAggregator(AggregationBuilders.count(path).field("_index")); } else { - metricBuilder.addAggregator(subAggregations.stream().collect(Collectors.toList()).get(0)); + subAggregations.forEach(metricBuilder::addAggregator); + // the count aggregator may be eliminated by doc_count optimization, add it back + if (subAggregations.stream().noneMatch(sub -> sub.getName().equals(path))) { + metricBuilder.addAggregator(AggregationBuilders.count(path).field("_index")); + } } aggregationBuilder.subAggregations(metricBuilder); return aggregationBuilder; @@ -372,8 +349,8 @@ private > T attachSubAggregations( public void pushDownSortIntoAggBucket( List collations, List fieldNames) { // aggregationBuilder.getLeft() could be empty when count agg optimization works - if (aggregationBuilder.getLeft().isEmpty()) return; - AggregationBuilder builder = aggregationBuilder.getLeft().get(0); + if (builderAndParser.getLeft().isEmpty()) return; + AggregationBuilder builder = builderAndParser.getLeft().get(0); List selected = new ArrayList<>(collations.size()); if (builder instanceof CompositeAggregationBuilder) { CompositeAggregationBuilder compositeAggBuilder = (CompositeAggregationBuilder) builder; @@ -427,13 +404,13 @@ public void pushDownSortIntoAggBucket( }); AggregatorFactories.Builder newAggBuilder = new AggregatorFactories.Builder(); compositeAggBuilder.getSubAggregations().forEach(newAggBuilder::addAggregator); - aggregationBuilder = + builderAndParser = Pair.of( Collections.singletonList( AggregationBuilders.composite("composite_buckets", newBuckets) .subAggregations(newAggBuilder) .size(compositeAggBuilder.size())), - aggregationBuilder.getRight()); + builderAndParser.getRight()); bucketNames = newBucketNames; } if (builder instanceof TermsAggregationBuilder) { @@ -449,8 +426,8 @@ public void pushDownSortIntoAggBucket( */ public boolean pushDownLimitIntoBucketSize(Integer size) { // aggregationBuilder.getLeft() could be empty when count agg optimization works - if (aggregationBuilder.getLeft().isEmpty()) return false; - AggregationBuilder builder = aggregationBuilder.getLeft().get(0); + if (builderAndParser.getLeft().isEmpty()) return false; + AggregationBuilder builder = builderAndParser.getLeft().get(0); if (builder instanceof CompositeAggregationBuilder) { CompositeAggregationBuilder compositeAggBuilder = (CompositeAggregationBuilder) builder; if (size < compositeAggBuilder.size()) { diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/PushDownContext.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/PushDownContext.java index 1b50a2a8751..9098d1ca17c 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/PushDownContext.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/PushDownContext.java @@ -94,6 +94,7 @@ ArrayDeque getOperationsForAgg() { @Override public boolean add(PushDownOperation operation) { + operation.action().transform(this, operation); if (operation.type() == PushDownType.AGGREGATION) { isAggregatePushed = true; this.aggPushDownAction = (AggPushDownAction) operation.action(); @@ -116,7 +117,6 @@ public boolean add(PushDownOperation operation) { if (operation.type() == PushDownType.RARE_TOP) { isRareTopPushed = true; } - operation.action().transform(this, operation); return true; }