Skip to content

Commit 54a59d9

Browse files
authored
Append limit operator for QUEERY_SIZE_LIMIT (#3940) (#3966)
* Append limit operator for QUEERY_SIZE_LIMIT * Add LogicalSystemLimit * Revert part of #3880 * Fix IT after merging main --------- (cherry picked from commit 5316c0a) Signed-off-by: Heng Qian <qianheng@amazon.com>
1 parent 4bd23cf commit 54a59d9

File tree

105 files changed

+348
-230
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

105 files changed

+348
-230
lines changed
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.calcite.plan;
7+
8+
import java.util.Collections;
9+
import java.util.List;
10+
import lombok.Getter;
11+
import org.apache.calcite.plan.Convention;
12+
import org.apache.calcite.plan.RelOptCluster;
13+
import org.apache.calcite.plan.RelTraitSet;
14+
import org.apache.calcite.rel.RelCollation;
15+
import org.apache.calcite.rel.RelCollationTraitDef;
16+
import org.apache.calcite.rel.RelNode;
17+
import org.apache.calcite.rel.RelWriter;
18+
import org.apache.calcite.rel.core.Sort;
19+
import org.apache.calcite.rel.hint.RelHint;
20+
import org.apache.calcite.rel.logical.LogicalSort;
21+
import org.apache.calcite.rex.RexNode;
22+
import org.checkerframework.checker.nullness.qual.Nullable;
23+
24+
/** System level limit logical plan, comparing to user level plan {@link LogicalSort}. */
25+
public class LogicalSystemLimit extends Sort {
26+
27+
public enum SystemLimitType {
28+
/**
29+
* System limit type for system level limit.
30+
*
31+
* <p>This type is used to indicate that the limit is applied to the system level.
32+
*/
33+
QUERY_SIZE_LIMIT
34+
}
35+
36+
@Getter private final SystemLimitType type;
37+
38+
private LogicalSystemLimit(
39+
SystemLimitType type,
40+
RelOptCluster cluster,
41+
RelTraitSet traitSet,
42+
RelNode input,
43+
RelCollation collation,
44+
@Nullable RexNode offset,
45+
@Nullable RexNode fetch) {
46+
this(type, cluster, traitSet, Collections.emptyList(), input, collation, offset, fetch);
47+
}
48+
49+
private LogicalSystemLimit(
50+
SystemLimitType type,
51+
RelOptCluster cluster,
52+
RelTraitSet traitSet,
53+
List<RelHint> hints,
54+
RelNode input,
55+
RelCollation collation,
56+
@Nullable RexNode offset,
57+
@Nullable RexNode fetch) {
58+
super(cluster, traitSet, hints, input, collation, offset, fetch);
59+
assert traitSet.containsIfApplicable(Convention.NONE);
60+
this.type = type;
61+
}
62+
63+
public static LogicalSystemLimit create(SystemLimitType type, RelNode input, RexNode fetch) {
64+
return create(type, input, input.getTraitSet().getCollation(), null, fetch);
65+
}
66+
67+
public static LogicalSystemLimit create(
68+
SystemLimitType type,
69+
RelNode input,
70+
RelCollation collation,
71+
@Nullable RexNode offset,
72+
@Nullable RexNode fetch) {
73+
RelOptCluster cluster = input.getCluster();
74+
collation = RelCollationTraitDef.INSTANCE.canonize(collation);
75+
RelTraitSet traitSet = input.getTraitSet().replace(Convention.NONE).replace(collation);
76+
return new LogicalSystemLimit(type, cluster, traitSet, input, collation, offset, fetch);
77+
}
78+
79+
@Override
80+
public Sort copy(
81+
RelTraitSet traitSet,
82+
RelNode newInput,
83+
RelCollation newCollation,
84+
@Nullable RexNode offset,
85+
@Nullable RexNode fetch) {
86+
return new LogicalSystemLimit(
87+
this.type, getCluster(), traitSet, hints, newInput, newCollation, offset, fetch);
88+
}
89+
90+
@Override
91+
public RelWriter explainTerms(RelWriter pw) {
92+
super.explainTerms(pw);
93+
// Show type in the explain
94+
pw.item("type", type);
95+
return pw;
96+
}
97+
}

core/src/main/java/org/opensearch/sql/calcite/plan/Scannable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@
1616
*/
1717
public interface Scannable {
1818

19-
public Enumerable<@Nullable Object> scanWithLimit();
19+
public Enumerable<@Nullable Object> scan();
2020
}

core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ protected PreparedResult implement(RelRoot root) {
302302
RelDataType resultType = root.rel.getRowType();
303303
boolean isDml = root.kind.belongsTo(SqlKind.DML);
304304
if (root.rel instanceof Scannable) {
305-
final Bindable bindable = dataContext -> ((Scannable) root.rel).scanWithLimit();
305+
final Bindable bindable = dataContext -> ((Scannable) root.rel).scan();
306306

307307
return new PreparedResultImpl(
308308
resultType,

core/src/main/java/org/opensearch/sql/executor/QueryService.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
import org.opensearch.sql.calcite.CalcitePlanContext;
3737
import org.opensearch.sql.calcite.CalciteRelNodeVisitor;
3838
import org.opensearch.sql.calcite.OpenSearchSchema;
39+
import org.opensearch.sql.calcite.plan.LogicalSystemLimit;
40+
import org.opensearch.sql.calcite.plan.LogicalSystemLimit.SystemLimitType;
3941
import org.opensearch.sql.common.response.ResponseListener;
4042
import org.opensearch.sql.common.setting.Settings;
4143
import org.opensearch.sql.common.setting.Settings.Key;
@@ -100,7 +102,7 @@ public void executeWithCalcite(
100102
CalcitePlanContext.create(
101103
buildFrameworkConfig(), getQuerySizeLimit(), queryType);
102104
RelNode relNode = analyze(plan, context);
103-
RelNode optimized = optimize(relNode);
105+
RelNode optimized = optimize(relNode, context);
104106
RelNode calcitePlan = convertToCalcitePlan(optimized);
105107
executionEngine.execute(calcitePlan, context, listener);
106108
return null;
@@ -133,7 +135,7 @@ public void explainWithCalcite(
133135
CalcitePlanContext.create(
134136
buildFrameworkConfig(), getQuerySizeLimit(), queryType);
135137
RelNode relNode = analyze(plan, context);
136-
RelNode optimized = optimize(relNode);
138+
RelNode optimized = optimize(relNode, context);
137139
RelNode calcitePlan = convertToCalcitePlan(optimized);
138140
executionEngine.explain(calcitePlan, format, context, listener);
139141
return null;
@@ -250,8 +252,13 @@ public PhysicalPlan plan(LogicalPlan plan) {
250252
return planner.plan(plan);
251253
}
252254

253-
public RelNode optimize(RelNode plan) {
254-
return planner.customOptimize(plan);
255+
/**
256+
* Try to optimize the plan by appending a limit operator for QUERY_SIZE_LIMIT Don't add for
257+
* `EXPLAIN` to avoid changing its output plan.
258+
*/
259+
public RelNode optimize(RelNode plan, CalcitePlanContext context) {
260+
return LogicalSystemLimit.create(
261+
SystemLimitType.QUERY_SIZE_LIMIT, plan, context.relBuilder.literal(context.querySizeLimit));
255262
}
256263

257264
private boolean isCalciteFallbackAllowed() {

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLExplainIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,8 @@ public void testExplainCommandCost() throws IOException {
8080
? loadFromFile("expectedOutput/calcite/explain_filter_cost_w_pushdown.txt")
8181
: loadFromFile("expectedOutput/calcite/explain_filter_cost_wo_pushdown.txt");
8282
assertTrue(
83-
String.format("Got: %s\n, expected: %s", result, expected), result.contains(expected));
83+
String.format("Got: %s\n, expected: %s", result, expected),
84+
result.contains(expected.trim()));
8485
}
8586

8687
@Test

integ-test/src/test/resources/expectedOutput/calcite/explain_agg_with_script.json

Lines changed: 2 additions & 2 deletions
Large diffs are not rendered by default.
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"calcite": {
3-
"logical": "LogicalAggregate(group=[{}], cnt=[COUNT()])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
4-
"physical": "CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={},cnt=COUNT())], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"aggregations\":{\"cnt\":{\"value_count\":{\"field\":\"_index\"}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
3+
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalAggregate(group=[{}], cnt=[COUNT()])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
4+
"physical": "EnumerableLimit(fetch=[10000])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={},cnt=COUNT())], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"aggregations\":{\"cnt\":{\"value_count\":{\"field\":\"_index\"}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
55
}
66
}
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"calcite": {
3-
"logical": "LogicalProject(age=[COALESCE($8, -1)], balance=[COALESCE($3, -1)])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
4-
"physical": "EnumerableCalc(expr#0..1=[{inputs}], expr#2=[-1], expr#3=[COALESCE($t0, $t2)], expr#4=[COALESCE($t1, $t2)], $f0=[$t3], $f1=[$t4])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[age, balance]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\",\"balance\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
3+
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(age=[COALESCE($8, -1)], balance=[COALESCE($3, -1)])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
4+
"physical": "EnumerableCalc(expr#0..1=[{inputs}], expr#2=[-1], expr#3=[COALESCE($t0, $t2)], expr#4=[COALESCE($t1, $t2)], $f0=[$t3], $f1=[$t4])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[age, balance], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\",\"balance\"],\"excludes\":[]}}, requestedTotalSize=10000, pageSize=null, startFrom=0)])\n"
55
}
66
}
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"calcite": {
3-
"logical": "LogicalProject(avg_age=[$2], state=[$0], city=[$1])\n LogicalAggregate(group=[{0, 1}], avg_age=[AVG($2)])\n LogicalProject(state=[$7], city=[$5], age=[$8])\n LogicalFilter(condition=[>($8, 30)])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
4-
"physical": "EnumerableCalc(expr#0..2=[{inputs}], avg_age=[$t2], state=[$t0], city=[$t1])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[city, state, age], FILTER->>($2, 30), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},avg_age=AVG($2))], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"city\",\"state\",\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}},{\"city\":{\"terms\":{\"field\":\"city.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
3+
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(avg_age=[$2], state=[$0], city=[$1])\n LogicalAggregate(group=[{0, 1}], avg_age=[AVG($2)])\n LogicalProject(state=[$7], city=[$5], age=[$8])\n LogicalFilter(condition=[>($8, 30)])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
4+
"physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..2=[{inputs}], avg_age=[$t2], state=[$t0], city=[$t1])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[city, state, age], FILTER->>($2, 30), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},avg_age=AVG($2))], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"city\",\"state\",\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}},{\"city\":{\"terms\":{\"field\":\"city.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
55
}
66
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"calcite": {
3+
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(host=[$0])\n LogicalFilter(condition=[GREATER_IP($0, IP('1.1.1.1':VARCHAR))])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]])\n",
4+
"physical": "CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]], PushDownContext=[[PROJECT->[host], SCRIPT->GREATER_IP($0, IP('1.1.1.1':VARCHAR)), LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"script\":{\"script\":{\"source\":\"{\\\"langType\\\":\\\"calcite\\\",\\\"script\\\":\\\"rO0ABXNyABFqYXZhLnV0aWwuQ29sbFNlcleOq7Y6G6gRAwABSQADdGFneHAAAAADdwQAAAAGdAAHcm93VHlwZXQAensKICAiZmllbGRzIjogWwogICAgewogICAgICAidHlwZSI6ICJPVEhFUiIsCiAgICAgICJudWxsYWJsZSI6IHRydWUsCiAgICAgICJuYW1lIjogImhvc3QiCiAgICB9CiAgXSwKICAibnVsbGFibGUiOiBmYWxzZQp9dAAEZXhwcnQDfXsKICAib3AiOiB7CiAgICAibmFtZSI6ICJHUkVBVEVSX0lQIiwKICAgICJraW5kIjogIk9USEVSX0ZVTkNUSU9OIiwKICAgICJzeW50YXgiOiAiRlVOQ1RJT04iCiAgfSwKICAib3BlcmFuZHMiOiBbCiAgICB7CiAgICAgICJpbnB1dCI6IDAsCiAgICAgICJuYW1lIjogIiQwIgogICAgfSwKICAgIHsKICAgICAgIm9wIjogewogICAgICAgICJuYW1lIjogIklQIiwKICAgICAgICAia2luZCI6ICJPVEhFUl9GVU5DVElPTiIsCiAgICAgICAgInN5bnRheCI6ICJGVU5DVElPTiIKICAgICAgfSwKICAgICAgIm9wZXJhbmRzIjogWwogICAgICAgIHsKICAgICAgICAgICJsaXRlcmFsIjogIjEuMS4xLjEiLAogICAgICAgICAgInR5cGUiOiB7CiAgICAgICAgICAgICJ0eXBlIjogIlZBUkNIQVIiLAogICAgICAgICAgICAibnVsbGFibGUiOiBmYWxzZSwKICAgICAgICAgICAgInByZWNpc2lvbiI6IC0xCiAgICAgICAgICB9CiAgICAgICAgfQogICAgICBdLAogICAgICAiY2xhc3MiOiAib3JnLm9wZW5zZWFyY2guc3FsLmV4cHJlc3Npb24uZnVuY3Rpb24uVXNlckRlZmluZWRGdW5jdGlvbkJ1aWxkZXIkMSIsCiAgICAgICJ0eXBlIjogewogICAgICAgICJ0eXBlIjogIk9USEVSIiwKICAgICAgICAibnVsbGFibGUiOiB0cnVlCiAgICAgIH0sCiAgICAgICJkZXRlcm1pbmlzdGljIjogdHJ1ZSwKICAgICAgImR5bmFtaWMiOiBmYWxzZQogICAgfQogIF0sCiAgImNsYXNzIjogIm9yZy5vcGVuc2VhcmNoLnNxbC5leHByZXNzaW9uLmZ1bmN0aW9uLlVzZXJEZWZpbmVkRnVuY3Rpb25CdWlsZGVyJDEiLAogICJ0eXBlIjogewogICAgInR5cGUiOiAiQk9PTEVBTiIsCiAgICAibnVsbGFibGUiOiB0cnVlCiAgfSwKICAiZGV0ZXJtaW5pc3RpYyI6IHRydWUsCiAgImR5bmFtaWMiOiBmYWxzZQp9dAAKZmllbGRUeXBlc3NyABFqYXZhLnV0aWwuSGFzaE1hcAUH2sHDFmDRAwACRgAKbG9hZEZhY3RvckkACXRocmVzaG9sZHhwP0AAAAAAAAx3CAAAABAAAAABdAAEaG9zdH5yAClvcmcub3BlbnNlYXJjaC5zcWwuZGF0YS50eXBlLkV4cHJDb3JlVHlwZQAAAAAAAAAAEgAAeHIADmphdmEubGFuZy5FbnVtAAAAAAAAAAASAAB4cHQAAklQeHg=\\\"}\",\"lang\":\"opensearch_compounded_script\",\"params\":{\"utcTimestamp\":*}},\"boost\":1.0}},\"_source\":{\"includes\":[\"host\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, requestedTotalSize=10000, pageSize=null, startFrom=0)])\n"
5+
}
6+
}

0 commit comments

Comments
 (0)