diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java index e5d9304255f..0f921c173aa 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java @@ -578,6 +578,17 @@ public void testMvjoinExplain() throws IOException { assertJsonEqualsIgnoreId(expected, result); } + @Test + public void testPreventLimitPushdown() throws IOException { + enabledOnlyWhenPushdownIsEnabled(); + setMaxResultWindow("opensearch-sql_test_index_account", 1); + String query = "source=opensearch-sql_test_index_account | head 1 from 1"; + var result = explainQueryToString(query); + String expected = loadExpectedPlan("explain_prevent_limit_push.yaml"); + assertYamlEqualsJsonIgnoreId(expected, result); + resetMaxResultWindow("opensearch-sql_test_index_account"); + } + @Test public void testPushdownLimitIntoAggregation() throws IOException { enabledOnlyWhenPushdownIsEnabled(); diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_prevent_limit_push.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_prevent_limit_push.yaml new file mode 100644 index 00000000000..e7019b44d7d --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_prevent_limit_push.yaml @@ -0,0 +1,10 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10]) + LogicalSort(offset=[1], fetch=[1]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableLimit(offset=[1], fetch=[1]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["account_number","firstname","address","balance","gender","city","employer","state","age","email","lastname"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) \ No newline at end of file diff --git a/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/3102.yml b/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/3102.yml index ffd8d5510f1..532f4161b5b 100644 --- a/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/3102.yml +++ b/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/3102.yml @@ -3,6 +3,11 @@ setup: features: - headers - allowed_warnings + - do: + query.settings: + body: + transient: + plugins.calcite.enabled: true - do: indices.create: index: test @@ -21,6 +26,14 @@ setup: - '{"index": {}}' - '{"id": 3}' +--- +teardown: + - do: + query.settings: + body: + transient: + plugins.calcite.enabled : false + --- "Prevent push down limit if the offset reach max_result_window": - do: diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/AbstractCalciteIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/AbstractCalciteIndexScan.java index 0173454f026..a6189a8f4d8 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/AbstractCalciteIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/AbstractCalciteIndexScan.java @@ -170,6 +170,7 @@ public static class PushDownContext extends ArrayDeque { @Getter private AggPushDownAction aggPushDownAction; @Getter private boolean isLimitPushed = false; @Getter private boolean isProjectPushed = false; + @Getter private int startFrom = 0; @Override public PushDownContext clone() { @@ -184,6 +185,7 @@ public boolean add(PushDownAction pushDownAction) { } if (pushDownAction.type == PushDownType.LIMIT) { isLimitPushed = true; + startFrom += ((LimitDigest) pushDownAction.digest).offset(); } if (pushDownAction.type == PushDownType.PROJECT) { isProjectPushed = true; diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java index 84cead5d2b8..26b0e5d5b7a 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java @@ -45,6 +45,7 @@ import org.opensearch.sql.opensearch.planner.physical.EnumerableIndexScanRule; import org.opensearch.sql.opensearch.planner.physical.OpenSearchIndexRules; import org.opensearch.sql.opensearch.request.AggregateAnalyzer; +import org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder; import org.opensearch.sql.opensearch.request.PredicateAnalyzer; import org.opensearch.sql.opensearch.request.PredicateAnalyzer.QueryExpression; import org.opensearch.sql.opensearch.response.agg.OpenSearchAggregationResponseParser; @@ -323,6 +324,13 @@ public AbstractRelNode pushDownLimit(LogicalSort sort, Integer limit, Integer of return offset > 0 ? sort.copy(sort.getTraitSet(), List.of(newScan)) : newScan; } else { CalciteLogicalIndexScan newScan = this.copyWithNewSchema(getRowType()); + int newStartFrom = newScan.pushDownContext.getStartFrom() + offset; + if (newStartFrom >= newScan.osIndex.getMaxResultWindow()) { + throw new OpenSearchRequestBuilder.PushDownUnSupportedException( + String.format( + "Requested offset %d should be less than the max result window %d", + newStartFrom, newScan.osIndex.getMaxResultWindow())); + } newScan.pushDownContext.add( PushDownAction.of( PushDownType.LIMIT,