diff --git a/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanDSL.java b/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanDSL.java index 4185d55c55..cdd3d3a103 100644 --- a/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanDSL.java +++ b/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanDSL.java @@ -18,7 +18,6 @@ import org.opensearch.sql.expression.Expression; import org.opensearch.sql.expression.LiteralExpression; import org.opensearch.sql.expression.NamedExpression; -import org.opensearch.sql.expression.ParseExpression; import org.opensearch.sql.expression.ReferenceExpression; import org.opensearch.sql.expression.aggregation.NamedAggregator; import org.opensearch.sql.expression.window.WindowDefinition; diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java index 15f0261f0d..5c339cc7bb 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java @@ -31,6 +31,7 @@ import java.nio.file.Paths; import java.util.Locale; +import static com.google.common.base.Strings.isNullOrEmpty; import static org.opensearch.sql.legacy.TestUtils.createIndexByRestClient; import static org.opensearch.sql.legacy.TestUtils.getAccountIndexMapping; import static org.opensearch.sql.legacy.TestUtils.getBankIndexMapping; @@ -71,6 +72,8 @@ public abstract class SQLIntegTestCase extends OpenSearchSQLRestTestCase { public static final String TRANSIENT = "transient"; public static final Integer DEFAULT_QUERY_SIZE_LIMIT = Integer.parseInt(System.getProperty("defaultQuerySizeLimit", "200")); + public static final Integer DEFAULT_MAX_RESULT_WINDOW = + Integer.parseInt(System.getProperty("defaultMaxResultWindow", "10000")); public boolean shouldResetQuerySizeLimit() { return true; @@ -161,6 +164,15 @@ protected static void 
wipeAllClusterSettings() throws IOException { updateClusterSettings(new ClusterSetting("transient", "*", null)); } + protected void setMaxResultWindow(String indexName, Integer window) throws IOException { + updateIndexSettings(indexName, "{ \"index\": { \"max_result_window\":" + window + " } }"); + } + + protected void resetMaxResultWindow(String indexName) throws IOException { + updateIndexSettings(indexName, + "{ \"index\": { \"max_result_window\": " + DEFAULT_MAX_RESULT_WINDOW + " } }"); + } + /** * Provide for each test to load test index, data and other setup work */ @@ -378,6 +390,18 @@ public String toString() { } } + protected static JSONObject updateIndexSettings(String indexName, String setting) + throws IOException { + Request request = new Request("PUT", "/" + indexName + "/_settings"); + if (!isNullOrEmpty(setting)) { + request.setJsonEntity(setting); + } + RequestOptions.Builder restOptionsBuilder = RequestOptions.DEFAULT.toBuilder(); + restOptionsBuilder.addHeader("Content-Type", "application/json"); + request.setOptions(restOptionsBuilder); + return new JSONObject(executeRequest(request)); + } + protected String makeRequest(String query) { return makeRequest(query, 0); } diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/HeadCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/HeadCommandIT.java index 1ae45ab469..48c489ce10 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/HeadCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/HeadCommandIT.java @@ -14,6 +14,7 @@ import org.json.JSONObject; import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.jupiter.api.Test; public class HeadCommandIT extends PPLIntegTestCase { @@ -26,6 +27,7 @@ public void beforeTest() throws IOException { @After public void afterTest() throws IOException { resetQuerySizeLimit(); + resetMaxResultWindow(TEST_INDEX_ACCOUNT); } @Override @@ -60,6 +62,76 @@ public void testHeadWithNumber() 
throws IOException { rows("Nanette", 28)); } + @Ignore("Fix https://github.com/opensearch-project/sql/issues/703#issuecomment-1211422130") + @Test + public void testHeadWithNumberLargerThanQuerySizeLimit() throws IOException { + setQuerySizeLimit(5); + JSONObject result = + executeQuery(String.format( + "source=%s | fields firstname, age | head 10", TEST_INDEX_ACCOUNT)); + verifyDataRows(result, + rows("Amber", 32), + rows("Hattie", 36), + rows("Nanette", 28), + rows("Dale", 33), + rows("Elinor", 36), + rows("Virginia", 39), + rows("Dillard", 34), + rows("Mcgee", 39), + rows("Aurelia", 37), + rows("Fulton", 23)); + } + + @Test + public void testHeadWithNumberLargerThanMaxResultWindow() throws IOException { + setMaxResultWindow(TEST_INDEX_ACCOUNT, 10); + JSONObject result = + executeQuery(String.format( + "source=%s | fields firstname, age | head 15", TEST_INDEX_ACCOUNT)); + verifyDataRows(result, + rows("Amber", 32), + rows("Hattie", 36), + rows("Nanette", 28), + rows("Dale", 33), + rows("Elinor", 36), + rows("Virginia", 39), + rows("Dillard", 34), + rows("Mcgee", 39), + rows("Aurelia", 37), + rows("Fulton", 23), + rows("Burton", 31), + rows("Josie", 32), + rows("Hughes", 30), + rows("Hall", 25), + rows("Deidre", 33)); + } + + @Ignore("Fix https://github.com/opensearch-project/sql/issues/703#issuecomment-1211422130") + @Test + public void testHeadWithLargeNumber() throws IOException { + setQuerySizeLimit(5); + setMaxResultWindow(TEST_INDEX_ACCOUNT, 10); + JSONObject result = + executeQuery(String.format( + "source=%s | fields firstname, age | head 15", TEST_INDEX_ACCOUNT)); + verifyDataRows(result, + rows("Amber", 32), + rows("Hattie", 36), + rows("Nanette", 28), + rows("Dale", 33), + rows("Elinor", 36), + rows("Virginia", 39), + rows("Dillard", 34), + rows("Mcgee", 39), + rows("Aurelia", 37), + rows("Fulton", 23), + rows("Burton", 31), + rows("Josie", 32), + rows("Hughes", 30), + rows("Hall", 25), + rows("Deidre", 33)); + } + @Test public void 
testHeadWithNumberAndFrom() throws IOException { JSONObject result = diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchClient.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchClient.java index c1b7d782d2..09a83f65a5 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchClient.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchClient.java @@ -30,6 +30,14 @@ public interface OpenSearchClient { */ Map getIndexMappings(String... indexExpression); + /** + * Fetch index.max_result_window settings according to index expression given. + * + * @param indexExpression index expression + * @return map from index name to its max result window + */ + Map getIndexMaxResultWindows(String... indexExpression); + /** * Perform search query in the search request. * diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java index fe26280812..db35f3580c 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java @@ -24,11 +24,14 @@ import org.opensearch.client.node.NodeClient; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.AliasMetadata; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.MappingMetadata; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.collect.ImmutableOpenMap; +import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.index.IndexSettings; import org.opensearch.sql.opensearch.mapping.IndexMapping; import 
org.opensearch.sql.opensearch.request.OpenSearchRequest; import org.opensearch.sql.opensearch.response.OpenSearchResponse; @@ -86,6 +89,29 @@ public Map getIndexMappings(String... indexExpression) { } } + /** + * Fetch index.max_result_window settings according to index expression given. + * + * @param indexExpression index expression + * @return map from index name to its max result window + */ + @Override + public Map getIndexMaxResultWindows(String... indexExpression) { + ClusterState state = clusterService.state(); + ImmutableOpenMap indicesMetadata = state.metadata().getIndices(); + String[] concreteIndices = resolveIndexExpression(state, indexExpression); + + ImmutableMap.Builder result = ImmutableMap.builder(); + for (String index : concreteIndices) { + Settings settings = indicesMetadata.get(index).getSettings(); + Integer maxResultWindow = settings.getAsInt("index.max_result_window", + IndexSettings.MAX_RESULT_WINDOW_SETTING.getDefault(settings)); + result.put(index, maxResultWindow); + } + + return result.build(); + } + /** * TODO: Scroll doesn't work for aggregation. Support aggregation later. 
*/ diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchRestClient.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchRestClient.java index 9da8c442e0..f354215e05 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchRestClient.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchRestClient.java @@ -11,12 +11,15 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; import lombok.RequiredArgsConstructor; import org.opensearch.action.admin.cluster.settings.ClusterGetSettingsRequest; +import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest; +import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse; import org.opensearch.action.search.ClearScrollRequest; import org.opensearch.client.RequestOptions; import org.opensearch.client.RestHighLevelClient; @@ -26,6 +29,7 @@ import org.opensearch.client.indices.GetMappingsResponse; import org.opensearch.client.node.NodeClient; import org.opensearch.cluster.metadata.AliasMetadata; +import org.opensearch.common.collect.ImmutableOpenMap; import org.opensearch.common.settings.Settings; import org.opensearch.sql.opensearch.mapping.IndexMapping; import org.opensearch.sql.opensearch.request.OpenSearchRequest; @@ -54,6 +58,46 @@ public Map getIndexMappings(String... indexExpression) { } } + /** + * Fetch index.max_result_window settings according to index expression given. + * + * @param indexExpression index expression + * @return map from index name to its max result window + * @throws IllegalStateException if the settings request fails with an I/O error + */ + @Override + public Map getIndexMaxResultWindows(String... 
indexExpression) { + GetSettingsRequest request = new GetSettingsRequest() + .indices(indexExpression).includeDefaults(true); + try { + GetSettingsResponse response = client.indices().getSettings(request, RequestOptions.DEFAULT); + ImmutableOpenMap settings = response.getIndexToSettings(); + ImmutableOpenMap defaultSettings = response.getIndexToDefaultSettings(); + Map result = new HashMap<>(); + + // Start from the default settings reported for each index. + defaultSettings.forEach(entry -> { + Integer maxResultWindow = entry.value.getAsInt("index.max_result_window", null); + if (maxResultWindow != null) { + result.put(entry.key, maxResultWindow); + } + }); + + // Explicit index settings are applied last so they override the defaults. + settings.forEach(entry -> { + Integer maxResultWindow = entry.value.getAsInt("index.max_result_window", null); + if (maxResultWindow != null) { + result.put(entry.key, maxResultWindow); + } + }); + + return result; + } catch (IOException e) { + throw new IllegalStateException( + "Failed to get max result window for " + Arrays.toString(indexExpression), e); + } + } + @Override public OpenSearchResponse search(OpenSearchRequest request) { return request.search( diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java index 5ca3670ca1..6f6fea841b 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java @@ -49,7 +49,7 @@ public class OpenSearchQueryRequest implements OpenSearchRequest { /** - * ElasticsearchExprValueFactory. + * OpenSearchExprValueFactory. */ @EqualsAndHashCode.Exclude @ToString.Exclude @@ -61,7 +61,7 @@ public class OpenSearchQueryRequest implements OpenSearchRequest { private boolean searchDone = false; /** - * Constructor of ElasticsearchQueryRequest. + * Constructor of OpenSearchQueryRequest. 
*/ public OpenSearchQueryRequest(String indexName, int size, OpenSearchExprValueFactory factory) { @@ -69,7 +69,7 @@ public OpenSearchQueryRequest(String indexName, int size, } /** - * Constructor of ElasticsearchQueryRequest. + * Constructor of OpenSearchQueryRequest. */ public OpenSearchQueryRequest(IndexName indexName, int size, OpenSearchExprValueFactory factory) { @@ -81,6 +81,16 @@ public OpenSearchQueryRequest(IndexName indexName, int size, this.exprValueFactory = factory; } + /** + * Constructor of OpenSearchQueryRequest. + */ + public OpenSearchQueryRequest(IndexName indexName, SearchSourceBuilder sourceBuilder, + OpenSearchExprValueFactory factory) { + this.indexName = indexName; + this.sourceBuilder = sourceBuilder; + this.exprValueFactory = factory; + } + @Override public OpenSearchResponse search(Function searchAction, Function scrollAction) { diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java new file mode 100644 index 0000000000..646395d790 --- /dev/null +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java @@ -0,0 +1,202 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + + +package org.opensearch.sql.opensearch.request; + +import static org.opensearch.search.sort.FieldSortBuilder.DOC_FIELD_NAME; +import static org.opensearch.search.sort.SortOrder.ASC; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.ToString; +import org.apache.commons.lang3.tuple.Pair; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.index.query.BoolQueryBuilder; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.index.query.QueryBuilders; +import 
org.opensearch.search.aggregations.AggregationBuilder; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.search.fetch.subphase.highlight.HighlightBuilder; +import org.opensearch.search.sort.SortBuilder; +import org.opensearch.sql.common.setting.Settings; +import org.opensearch.sql.common.utils.StringUtils; +import org.opensearch.sql.data.type.ExprType; +import org.opensearch.sql.expression.ReferenceExpression; +import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory; +import org.opensearch.sql.opensearch.response.agg.OpenSearchAggregationResponseParser; + +/** + * OpenSearch search request builder. + */ +@EqualsAndHashCode +@Getter +@ToString +public class OpenSearchRequestBuilder { + + /** + * Default query timeout in minutes. + */ + public static final TimeValue DEFAULT_QUERY_TIMEOUT = TimeValue.timeValueMinutes(1L); + + /** + * {@link OpenSearchRequest.IndexName}. + */ + private final OpenSearchRequest.IndexName indexName; + + /** + * Index max result window. + */ + private final Integer maxResultWindow; + + /** + * Search request source builder. + */ + private final SearchSourceBuilder sourceBuilder; + + /** + * OpenSearchExprValueFactory. + */ + @EqualsAndHashCode.Exclude + @ToString.Exclude + private final OpenSearchExprValueFactory exprValueFactory; + + /** + * Query size of the request. + */ + private Integer querySize; + + public OpenSearchRequestBuilder(String indexName, + Integer maxResultWindow, + Settings settings, + OpenSearchExprValueFactory exprValueFactory) { + this(new OpenSearchRequest.IndexName(indexName), maxResultWindow, settings, exprValueFactory); + } + + /** + * Constructor. 
+ */ + public OpenSearchRequestBuilder(OpenSearchRequest.IndexName indexName, + Integer maxResultWindow, + Settings settings, + OpenSearchExprValueFactory exprValueFactory) { + this.indexName = indexName; + this.maxResultWindow = maxResultWindow; + this.sourceBuilder = new SearchSourceBuilder(); + this.exprValueFactory = exprValueFactory; + this.querySize = settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT); + sourceBuilder.from(0); + sourceBuilder.size(querySize); + sourceBuilder.timeout(DEFAULT_QUERY_TIMEOUT); + } + + /** + * Build DSL request. + * + * @return query request or scroll request + */ + public OpenSearchRequest build() { + Integer from = sourceBuilder.from(); + Integer size = sourceBuilder.size(); + + if (from + size <= maxResultWindow) { + return new OpenSearchQueryRequest(indexName, sourceBuilder, exprValueFactory); + } else { + sourceBuilder.size(maxResultWindow - from); + return new OpenSearchScrollRequest(indexName, sourceBuilder, exprValueFactory); + } + } + + /** + * Push down query to DSL request. + * + * @param query query request + */ + public void pushDown(QueryBuilder query) { + QueryBuilder current = sourceBuilder.query(); + + if (current == null) { + sourceBuilder.query(query); + } else { + if (isBoolFilterQuery(current)) { + ((BoolQueryBuilder) current).filter(query); + } else { + sourceBuilder.query(QueryBuilders.boolQuery() + .filter(current) + .filter(query)); + } + } + + if (sourceBuilder.sorts() == null) { + sourceBuilder.sort(DOC_FIELD_NAME, ASC); // Make sure consistent order + } + } + + /** + * Push down aggregation to DSL request. + * + * @param aggregationBuilder pair of aggregation query and aggregation parser. 
+ */ + public void pushDownAggregation( + Pair, OpenSearchAggregationResponseParser> aggregationBuilder) { + aggregationBuilder.getLeft().forEach(builder -> sourceBuilder.aggregation(builder)); + sourceBuilder.size(0); + exprValueFactory.setParser(aggregationBuilder.getRight()); + } + + /** + * Push down sort to DSL request. + * + * @param sortBuilders sort builders added to the request in list order. + */ + public void pushDownSort(List> sortBuilders) { + for (SortBuilder sortBuilder : sortBuilders) { + sourceBuilder.sort(sortBuilder); + } + } + + /** + * Push down size (limit) and from (offset) to DSL request; limit also becomes the query size. + */ + public void pushDownLimit(Integer limit, Integer offset) { + querySize = limit; + sourceBuilder.from(offset).size(limit); + } + + /** + * Add highlight to DSL requests. + * @param field name of the field to highlight + */ + public void pushDownHighlight(String field) { + if (sourceBuilder.highlighter() != null) { + sourceBuilder.highlighter().field(StringUtils.unquoteText(field)); + } else { + HighlightBuilder highlightBuilder = + new HighlightBuilder().field(StringUtils.unquoteText(field)); + sourceBuilder.highlighter(highlightBuilder); + } + } + + /** + * Push down project list to DSL request. 
+ */ + public void pushDownProjects(Set projects) { + final Set projectsSet = + projects.stream().map(ReferenceExpression::getAttr).collect(Collectors.toSet()); + sourceBuilder.fetchSource(projectsSet.toArray(new String[0]), new String[0]); + } + + public void pushTypeMapping(Map typeMapping) { + exprValueFactory.setTypeMapping(typeMapping); + } + + private boolean isBoolFilterQuery(QueryBuilder current) { + return (current instanceof BoolQueryBuilder); + } +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequest.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequest.java index ebbebcd8eb..4509e443c0 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequest.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequest.java @@ -54,10 +54,12 @@ public class OpenSearchScrollRequest implements OpenSearchRequest { private String scrollId; /** Search request source builder. */ - private final SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); + private final SearchSourceBuilder sourceBuilder; + /** Constructor. */ public OpenSearchScrollRequest(IndexName indexName, OpenSearchExprValueFactory exprValueFactory) { this.indexName = indexName; + this.sourceBuilder = new SearchSourceBuilder(); this.exprValueFactory = exprValueFactory; } @@ -65,6 +67,16 @@ public OpenSearchScrollRequest(String indexName, OpenSearchExprValueFactory expr this(new IndexName(indexName), exprValueFactory); } + /** Constructor. */ + public OpenSearchScrollRequest(IndexName indexName, + SearchSourceBuilder sourceBuilder, + OpenSearchExprValueFactory exprValueFactory) { + this.indexName = indexName; + this.sourceBuilder = sourceBuilder; + this.exprValueFactory = exprValueFactory; + } + + /** Constructor. 
*/ @Override public OpenSearchResponse search(Function searchAction, Function scrollAction) { diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/system/OpenSearchDescribeIndexRequest.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/system/OpenSearchDescribeIndexRequest.java index 5c6d3687c6..f321497099 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/system/OpenSearchDescribeIndexRequest.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/system/OpenSearchDescribeIndexRequest.java @@ -121,6 +121,19 @@ public Map getFieldTypes() { return fieldTypes; } + /** + * Get the minimum of the max result windows of the indices. + * + * @return max result window + * @throws IllegalStateException if no max result window is available for the indices + */ + public Integer getMaxResultWindow() { + return client.getIndexMaxResultWindows(indexName.getIndexNames()) + .values().stream().min(Integer::compare) + .orElseThrow(() -> new IllegalStateException( + "No max result window found for " + String.join(",", indexName.getIndexNames()))); + } + private ExprType transformESTypeToExprType(String openSearchType) { return OPENSEARCH_TYPE_TO_EXPR_TYPE_MAPPING.getOrDefault(openSearchType, ExprCoreType.UNKNOWN); } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java index c028f283a2..ef6159020f 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java @@ -58,6 +58,11 @@ public class OpenSearchIndex implements Table { */ private Map cachedFieldTypes = null; + /** + * The cached max result window setting of index. + */ + private Integer cachedMaxResultWindow = null; + /** * Constructor. */ @@ -80,13 +85,24 @@ public Map getFieldTypes() { return cachedFieldTypes; } + /** + * Get the max result window setting of the table. 
+ */ + public Integer getMaxResultWindow() { + if (cachedMaxResultWindow == null) { + cachedMaxResultWindow = + new OpenSearchDescribeIndexRequest(client, indexName).getMaxResultWindow(); + } + return cachedMaxResultWindow; + } + /** * TODO: Push down operations to index scan operator as much as possible in future. */ @Override public PhysicalPlan implement(LogicalPlan plan) { OpenSearchIndexScan indexScan = new OpenSearchIndexScan(client, settings, indexName, - new OpenSearchExprValueFactory(getFieldTypes())); + getMaxResultWindow(), new OpenSearchExprValueFactory(getFieldTypes())); /* * Visit logical plan with index scan as context so logical operators visited, such as @@ -128,7 +144,7 @@ public PhysicalPlan visitIndexScan(OpenSearchLogicalIndexScan node, OpenSearchIndexScan context) { if (null != node.getSortList()) { final SortQueryBuilder builder = new SortQueryBuilder(); - context.pushDownSort(node.getSortList().stream() + context.getRequestBuilder().pushDownSort(node.getSortList().stream() .map(sort -> builder.build(sort.getValue(), sort.getKey())) .collect(Collectors.toList())); } @@ -136,15 +152,15 @@ public PhysicalPlan visitIndexScan(OpenSearchLogicalIndexScan node, if (null != node.getFilter()) { FilterQueryBuilder queryBuilder = new FilterQueryBuilder(new DefaultExpressionSerializer()); QueryBuilder query = queryBuilder.build(node.getFilter()); - context.pushDown(query); + context.getRequestBuilder().pushDown(query); } if (node.getLimit() != null) { - context.pushDownLimit(node.getLimit(), node.getOffset()); + context.getRequestBuilder().pushDownLimit(node.getLimit(), node.getOffset()); } if (node.hasProjects()) { - context.pushDownProjects(node.getProjectList()); + context.getRequestBuilder().pushDownProjects(node.getProjectList()); } return indexScan; } @@ -158,15 +174,15 @@ public PhysicalPlan visitIndexAggregation(OpenSearchLogicalIndexAgg node, FilterQueryBuilder queryBuilder = new FilterQueryBuilder( new DefaultExpressionSerializer()); 
QueryBuilder query = queryBuilder.build(node.getFilter()); - context.pushDown(query); + context.getRequestBuilder().pushDown(query); } AggregationQueryBuilder builder = new AggregationQueryBuilder(new DefaultExpressionSerializer()); Pair, OpenSearchAggregationResponseParser> aggregationBuilder = builder.buildAggregationBuilder(node.getAggregatorList(), node.getGroupByList(), node.getSortList()); - context.pushDownAggregation(aggregationBuilder); - context.pushTypeMapping( + context.getRequestBuilder().pushDownAggregation(aggregationBuilder); + context.getRequestBuilder().pushTypeMapping( builder.buildTypeMapping(node.getAggregatorList(), node.getGroupByList())); return indexScan; @@ -191,7 +207,7 @@ public PhysicalPlan visitAD(LogicalAD node, OpenSearchIndexScan context) { @Override public PhysicalPlan visitHighlight(LogicalHighlight node, OpenSearchIndexScan context) { - context.pushDownHighlight(node.getHighlightField().toString()); + context.getRequestBuilder().pushDownHighlight(node.getHighlightField().toString()); return visitChild(node, context); } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexScan.java index 6e88f3de89..e9746e1fae 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexScan.java @@ -6,38 +6,18 @@ package org.opensearch.sql.opensearch.storage; -import static org.opensearch.search.sort.FieldSortBuilder.DOC_FIELD_NAME; -import static org.opensearch.search.sort.SortOrder.ASC; - -import com.google.common.collect.Iterables; -import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; import lombok.EqualsAndHashCode; import lombok.Getter; import 
lombok.ToString; -import org.apache.commons.lang3.tuple.Pair; -import org.opensearch.index.query.BoolQueryBuilder; -import org.opensearch.index.query.QueryBuilder; -import org.opensearch.index.query.QueryBuilders; -import org.opensearch.search.aggregations.AggregationBuilder; -import org.opensearch.search.builder.SearchSourceBuilder; -import org.opensearch.search.fetch.subphase.highlight.HighlightBuilder; -import org.opensearch.search.sort.SortBuilder; import org.opensearch.sql.common.setting.Settings; -import org.opensearch.sql.common.utils.StringUtils; import org.opensearch.sql.data.model.ExprValue; -import org.opensearch.sql.data.type.ExprType; -import org.opensearch.sql.expression.ReferenceExpression; import org.opensearch.sql.opensearch.client.OpenSearchClient; import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory; -import org.opensearch.sql.opensearch.request.OpenSearchQueryRequest; import org.opensearch.sql.opensearch.request.OpenSearchRequest; +import org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder; import org.opensearch.sql.opensearch.response.OpenSearchResponse; -import org.opensearch.sql.opensearch.response.agg.OpenSearchAggregationResponseParser; import org.opensearch.sql.storage.TableScanOperator; /** @@ -50,11 +30,24 @@ public class OpenSearchIndexScan extends TableScanOperator { /** OpenSearch client. */ private final OpenSearchClient client; - /** Search request. */ + /** Search request builder. */ @EqualsAndHashCode.Include @Getter @ToString.Include - private final OpenSearchRequest request; + private final OpenSearchRequestBuilder requestBuilder; + + /** Search request. */ + @EqualsAndHashCode.Include + @ToString.Include + private OpenSearchRequest request; + + /** Total query size. */ + @EqualsAndHashCode.Include + @ToString.Include + private Integer querySize; + + /** Number of rows returned. */ + private Integer queryCount; /** Search response for current batch. 
*/ private Iterator iterator; @@ -62,133 +55,57 @@ public class OpenSearchIndexScan extends TableScanOperator { /** * Constructor. */ - public OpenSearchIndexScan(OpenSearchClient client, - Settings settings, String indexName, + public OpenSearchIndexScan(OpenSearchClient client, Settings settings, + String indexName, Integer maxResultWindow, OpenSearchExprValueFactory exprValueFactory) { - this(client, settings, new OpenSearchRequest.IndexName(indexName), exprValueFactory); + this(client, settings, + new OpenSearchRequest.IndexName(indexName),maxResultWindow, exprValueFactory); } /** * Constructor. */ - public OpenSearchIndexScan(OpenSearchClient client, - Settings settings, OpenSearchRequest.IndexName indexName, - OpenSearchExprValueFactory exprValueFactory) { + public OpenSearchIndexScan(OpenSearchClient client, Settings settings, + OpenSearchRequest.IndexName indexName, Integer maxResultWindow, + OpenSearchExprValueFactory exprValueFactory) { this.client = client; - this.request = new OpenSearchQueryRequest(indexName, - settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT), exprValueFactory); + this.requestBuilder = new OpenSearchRequestBuilder( + indexName, maxResultWindow, settings,exprValueFactory); } @Override public void open() { super.open(); - - // For now pull all results immediately once open - List responses = new ArrayList<>(); - OpenSearchResponse response = client.search(request); - while (!response.isEmpty()) { - responses.add(response); - response = client.search(request); - } - iterator = Iterables.concat(responses.toArray(new OpenSearchResponse[0])).iterator(); + querySize = requestBuilder.getQuerySize(); + request = requestBuilder.build(); + iterator = Collections.emptyIterator(); + queryCount = 0; + fetchNextBatch(); } @Override public boolean hasNext() { + if (queryCount >= querySize) { + iterator = Collections.emptyIterator(); + } else if (!iterator.hasNext()) { + fetchNextBatch(); + } return iterator.hasNext(); } @Override public 
ExprValue next() { + queryCount++; return iterator.next(); } - /** - * Push down query to DSL request. - * @param query query request - */ - public void pushDown(QueryBuilder query) { - SearchSourceBuilder source = request.getSourceBuilder(); - QueryBuilder current = source.query(); - - if (current == null) { - source.query(query); - } else { - if (isBoolFilterQuery(current)) { - ((BoolQueryBuilder) current).filter(query); - } else { - source.query(QueryBuilders.boolQuery() - .filter(current) - .filter(query)); - } - } - - if (source.sorts() == null) { - source.sort(DOC_FIELD_NAME, ASC); // Make sure consistent order - } - } - - /** - * Push down aggregation to DSL request. - * @param aggregationBuilder pair of aggregation query and aggregation parser. - */ - public void pushDownAggregation( - Pair, OpenSearchAggregationResponseParser> aggregationBuilder) { - SearchSourceBuilder source = request.getSourceBuilder(); - aggregationBuilder.getLeft().forEach(builder -> source.aggregation(builder)); - source.size(0); - request.getExprValueFactory().setParser(aggregationBuilder.getRight()); - } - - /** - * Push down sort to DSL request. - * - * @param sortBuilders sortBuilders. - */ - public void pushDownSort(List> sortBuilders) { - SearchSourceBuilder source = request.getSourceBuilder(); - for (SortBuilder sortBuilder : sortBuilders) { - source.sort(sortBuilder); - } - } - - /** - * Push down size (limit) and from (offset) to DSL request. - */ - public void pushDownLimit(Integer limit, Integer offset) { - SearchSourceBuilder sourceBuilder = request.getSourceBuilder(); - sourceBuilder.from(offset).size(limit); - } - - /** - * Add highlight to DSL requests. 
- * @param field name of the field to highlight - */ - public void pushDownHighlight(String field) { - SearchSourceBuilder sourceBuilder = request.getSourceBuilder(); - if (sourceBuilder.highlighter() != null) { - sourceBuilder.highlighter().field(StringUtils.unquoteText(field)); - } else { - HighlightBuilder highlightBuilder = - new HighlightBuilder().field(StringUtils.unquoteText(field)); - sourceBuilder.highlighter(highlightBuilder); + private void fetchNextBatch() { + OpenSearchResponse response = client.search(request); + if (!response.isEmpty()) { + iterator = response.iterator(); } } - /** - * Push down project list to DSL requets. - */ - public void pushDownProjects(Set projects) { - SearchSourceBuilder sourceBuilder = request.getSourceBuilder(); - final Set projectsSet = - projects.stream().map(ReferenceExpression::getAttr).collect(Collectors.toSet()); - sourceBuilder.fetchSource(projectsSet.toArray(new String[0]), new String[0]); - } - - public void pushTypeMapping(Map typeMapping) { - request.getExprValueFactory().setTypeMapping(typeMapping); - } - @Override public void close() { super.close(); @@ -196,12 +113,8 @@ public void close() { client.cleanup(request); } - private boolean isBoolFilterQuery(QueryBuilder current) { - return (current instanceof BoolQueryBuilder); - } - @Override public String explain() { - return getRequest().toString(); + return getRequestBuilder().build().toString(); } } diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClientTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClientTest.java index 50410e07cc..8fdb93427b 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClientTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClientTest.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.net.URL; import java.util.Arrays; +import java.util.Collections; import 
java.util.Iterator; import java.util.List; import java.util.Map; @@ -72,6 +73,7 @@ class OpenSearchNodeClientTest { private static final String TEST_MAPPING_FILE = "mappings/accounts.json"; + private static final String TEST_MAPPING_SETTINGS_FILE = "mappings/accounts2.json"; @Mock(answer = RETURNS_DEEP_STUBS) private NodeClient nodeClient; @@ -151,6 +153,36 @@ public void getIndexMappingsWithNonExistIndex() { assertThrows(IndexNotFoundException.class, () -> client.getIndexMappings("non_exist_index")); } + @Test + public void getIndexMaxResultWindows() throws IOException { + URL url = Resources.getResource(TEST_MAPPING_SETTINGS_FILE); + String mappings = Resources.toString(url, Charsets.UTF_8); + String indexName = "accounts"; + ClusterService clusterService = mockClusterServiceForSettings(indexName, mappings); + OpenSearchNodeClient client = new OpenSearchNodeClient(clusterService, nodeClient); + + Map indexMaxResultWindows = client.getIndexMaxResultWindows(indexName); + assertEquals(1, indexMaxResultWindows.size()); + + Integer indexMaxResultWindow = indexMaxResultWindows.values().iterator().next(); + assertEquals(100, indexMaxResultWindow); + } + + @Test + public void getIndexMaxResultWindowsWithDefaultSettings() throws IOException { + URL url = Resources.getResource(TEST_MAPPING_FILE); + String mappings = Resources.toString(url, Charsets.UTF_8); + String indexName = "accounts"; + ClusterService clusterService = mockClusterServiceForSettings(indexName, mappings); + OpenSearchNodeClient client = new OpenSearchNodeClient(clusterService, nodeClient); + + Map indexMaxResultWindows = client.getIndexMaxResultWindows(indexName); + assertEquals(1, indexMaxResultWindows.size()); + + Integer indexMaxResultWindow = indexMaxResultWindows.values().iterator().next(); + assertEquals(10000, indexMaxResultWindow); + } + /** Jacoco enforce this constant lambda be tested. 
*/ @Test public void testAllFieldsPredicate() { @@ -353,6 +385,33 @@ public ClusterService mockClusterService(String indexName, Throwable t) { return mockService; } + public ClusterService mockClusterServiceForSettings(String indexName, String mappings) { + ClusterService mockService = mock(ClusterService.class); + ClusterState mockState = mock(ClusterState.class); + Metadata mockMetaData = mock(Metadata.class); + + when(mockService.state()).thenReturn(mockState); + when(mockState.metadata()).thenReturn(mockMetaData); + try { + ImmutableOpenMap.Builder indexBuilder = + ImmutableOpenMap.builder(); + IndexMetadata indexMetadata = IndexMetadata.fromXContent(createParser(mappings)); + + indexBuilder.put(indexName, indexMetadata); + when(mockMetaData.getIndices()).thenReturn(indexBuilder.build()); + + // IndexNameExpressionResolver use this method to check if index exists. If not, + // IndexNotFoundException is thrown. + IndexAbstraction indexAbstraction = mock(IndexAbstraction.class); + when(indexAbstraction.getIndices()).thenReturn(Collections.singletonList(indexMetadata)); + when(mockMetaData.getIndicesLookup()) + .thenReturn(ImmutableSortedMap.of(indexName, indexAbstraction)); + } catch (IOException e) { + throw new IllegalStateException("Failed to mock cluster service", e); + } + return mockService; + } + private XContentParser createParser(String mappings) throws IOException { return XContentType.JSON .xContent() diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchRestClientTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchRestClientTest.java index 0c2503ea57..bc334aaf39 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchRestClientTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchRestClientTest.java @@ -34,6 +34,8 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import 
org.opensearch.action.admin.cluster.settings.ClusterGetSettingsResponse; +import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest; +import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse; import org.opensearch.action.search.SearchResponse; import org.opensearch.client.RequestOptions; import org.opensearch.client.RestHighLevelClient; @@ -43,6 +45,7 @@ import org.opensearch.client.indices.GetMappingsResponse; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.MappingMetadata; +import org.opensearch.common.collect.ImmutableOpenMap; import org.opensearch.common.settings.Settings; import org.opensearch.common.xcontent.DeprecationHandler; import org.opensearch.common.xcontent.NamedXContentRegistry; @@ -128,6 +131,61 @@ void getIndexMappingsWithIOException() throws IOException { assertThrows(IllegalStateException.class, () -> client.getIndexMappings("test")); } + @Test + void getIndexMaxResultWindowsSettings() throws IOException { + String indexName = "test"; + Integer maxResultWindow = 1000; + + GetSettingsResponse response = mock(GetSettingsResponse.class); + Settings maxResultWindowSettings = Settings.builder() + .put("index.max_result_window", maxResultWindow) + .build(); + Settings emptySettings = Settings.builder().build(); + ImmutableOpenMap indexToSettings = + mockSettings(indexName, maxResultWindowSettings); + ImmutableOpenMap indexToDefaultSettings = + mockSettings(indexName, emptySettings); + when(response.getIndexToSettings()).thenReturn(indexToSettings); + when(response.getIndexToDefaultSettings()).thenReturn(indexToDefaultSettings); + when(restClient.indices().getSettings(any(GetSettingsRequest.class), any())) + .thenReturn(response); + + Map indexMaxResultWindows = client.getIndexMaxResultWindows(indexName); + assertEquals(1, indexMaxResultWindows.size()); + assertEquals(maxResultWindow, indexMaxResultWindows.values().iterator().next()); + } + + @Test + void 
getIndexMaxResultWindowsDefaultSettings() throws IOException { + String indexName = "test"; + Integer maxResultWindow = 10000; + + GetSettingsResponse response = mock(GetSettingsResponse.class); + Settings maxResultWindowSettings = Settings.builder() + .put("index.max_result_window", maxResultWindow) + .build(); + Settings emptySettings = Settings.builder().build(); + ImmutableOpenMap indexToSettings = + mockSettings(indexName, emptySettings); + ImmutableOpenMap indexToDefaultSettings = + mockSettings(indexName, maxResultWindowSettings); + when(response.getIndexToSettings()).thenReturn(indexToSettings); + when(response.getIndexToDefaultSettings()).thenReturn(indexToDefaultSettings); + when(restClient.indices().getSettings(any(GetSettingsRequest.class), any())) + .thenReturn(response); + + Map indexMaxResultWindows = client.getIndexMaxResultWindows(indexName); + assertEquals(1, indexMaxResultWindows.size()); + assertEquals(maxResultWindow, indexMaxResultWindows.values().iterator().next()); + } + + @Test + void getIndexMaxResultWindowsWithIOException() throws IOException { + when(restClient.indices().getSettings(any(GetSettingsRequest.class), any())) + .thenThrow(new IOException()); + assertThrows(IllegalStateException.class, () -> client.getIndexMaxResultWindows("test")); + } + @Test void search() throws IOException { // Mock first scroll request @@ -277,6 +335,12 @@ private Map mockFieldMappings(String indexName, String return ImmutableMap.of(indexName, IndexMetadata.fromXContent(createParser(mappings)).mapping()); } + private ImmutableOpenMap mockSettings(String indexName, Settings settings) { + ImmutableOpenMap.Builder indexToSettingsBuilder = ImmutableOpenMap.builder(); + indexToSettingsBuilder.put(indexName, settings); + return indexToSettingsBuilder.build(); + } + private XContentParser createParser(String mappings) throws IOException { return XContentType.JSON .xContent() diff --git 
a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngineTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngineTest.java index 24c305a75e..f1a0a7d5d7 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngineTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngineTest.java @@ -24,6 +24,7 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import lombok.RequiredArgsConstructor; import org.junit.jupiter.api.BeforeEach; @@ -126,7 +127,7 @@ void explainSuccessfully() { Settings settings = mock(Settings.class); when(settings.getSettingValue(QUERY_SIZE_LIMIT)).thenReturn(100); PhysicalPlan plan = new OpenSearchIndexScan(mock(OpenSearchClient.class), - settings, "test", mock(OpenSearchExprValueFactory.class)); + settings, "test", 10000, mock(OpenSearchExprValueFactory.class)); AtomicReference result = new AtomicReference<>(); executor.explain(plan, new ResponseListener() { diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java index ee981a4abc..e5c5046b81 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java @@ -88,6 +88,7 @@ public void testProtectIndexScan() { when(settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT)).thenReturn(200); String indexName = "test"; + Integer maxResultWindow = 10000; NamedExpression include = named("age", ref("age", INTEGER)); ReferenceExpression exclude = ref("name", STRING); ReferenceExpression 
dedupeField = ref("name", STRING); @@ -124,7 +125,7 @@ public void testProtectIndexScan() { resourceMonitor( new OpenSearchIndexScan( client, settings, indexName, - exprValueFactory)), + maxResultWindow, exprValueFactory)), filterExpr), aggregators, groupByExprs), @@ -152,7 +153,7 @@ public void testProtectIndexScan() { filter( new OpenSearchIndexScan( client, settings, indexName, - exprValueFactory), + maxResultWindow, exprValueFactory), filterExpr), aggregators, groupByExprs), diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilderTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilderTest.java new file mode 100644 index 0000000000..43b9353190 --- /dev/null +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilderTest.java @@ -0,0 +1,76 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + + +package org.opensearch.sql.opensearch.request; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.when; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.sql.common.setting.Settings; +import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory; + +@ExtendWith(MockitoExtension.class) +public class OpenSearchRequestBuilderTest { + + public static final TimeValue DEFAULT_QUERY_TIMEOUT = TimeValue.timeValueMinutes(1L); + @Mock + private Settings settings; + + @Mock + private OpenSearchExprValueFactory factory; + + @BeforeEach + void setup() { + when(settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT)).thenReturn(200); + } + + @Test + void buildQueryRequest() { + Integer 
maxResultWindow = 500; + Integer limit = 200; + Integer offset = 0; + OpenSearchRequestBuilder builder = + new OpenSearchRequestBuilder("test", maxResultWindow, settings, factory); + builder.pushDownLimit(limit, offset); + + assertEquals( + new OpenSearchQueryRequest( + new OpenSearchRequest.IndexName("test"), + new SearchSourceBuilder() + .from(offset) + .size(limit) + .timeout(DEFAULT_QUERY_TIMEOUT), + factory), + builder.build()); + } + + @Test + void buildScrollRequestWithCorrectSize() { + Integer maxResultWindow = 500; + Integer limit = 800; + Integer offset = 10; + OpenSearchRequestBuilder builder = + new OpenSearchRequestBuilder("test", maxResultWindow, settings, factory); + builder.pushDownLimit(limit, offset); + + assertEquals( + new OpenSearchScrollRequest( + new OpenSearchRequest.IndexName("test"), + new SearchSourceBuilder() + .from(offset) + .size(maxResultWindow - offset) + .timeout(DEFAULT_QUERY_TIMEOUT), + factory), + builder.build()); + } +} diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchDefaultImplementorTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchDefaultImplementorTest.java index c83172955c..b85d60c1fb 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchDefaultImplementorTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchDefaultImplementorTest.java @@ -20,6 +20,7 @@ import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.sql.opensearch.client.OpenSearchClient; +import org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder; import org.opensearch.sql.planner.logical.LogicalAD; import org.opensearch.sql.planner.logical.LogicalHighlight; import org.opensearch.sql.planner.logical.LogicalMLCommons; @@ -76,10 +77,12 @@ public void visitHighlight() { LogicalHighlight node = Mockito.mock(LogicalHighlight.class, Answers.RETURNS_DEEP_STUBS); 
Mockito.when(node.getChild().get(0)).thenReturn(Mockito.mock(LogicalPlan.class)); + OpenSearchRequestBuilder requestBuilder = Mockito.mock(OpenSearchRequestBuilder.class); + Mockito.when(indexScan.getRequestBuilder()).thenReturn(requestBuilder); OpenSearchIndex.OpenSearchDefaultImplementor implementor = new OpenSearchIndex.OpenSearchDefaultImplementor(indexScan, client); implementor.visitHighlight(node, indexScan); - verify(indexScan).pushDownHighlight(any()); + verify(requestBuilder).pushDownHighlight(any()); } } diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexScanTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexScanTest.java index 41769914d9..a1f2869ca5 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexScanTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexScanTest.java @@ -62,7 +62,7 @@ void setup() { void queryEmptyResult() { mockResponse(); try (OpenSearchIndexScan indexScan = - new OpenSearchIndexScan(client, settings, "test", exprValueFactory)) { + new OpenSearchIndexScan(client, settings, "test", 3, exprValueFactory)) { indexScan.open(); assertFalse(indexScan.hasNext()); } @@ -70,13 +70,90 @@ void queryEmptyResult() { } @Test - void queryAllResults() { + void queryAllResultsWithQuery() { + mockResponse(new ExprValue[]{ + employee(1, "John", "IT"), + employee(2, "Smith", "HR"), + employee(3, "Allen", "IT")}); + + try (OpenSearchIndexScan indexScan = + new OpenSearchIndexScan(client, settings, "employees", 10, exprValueFactory)) { + indexScan.open(); + + assertTrue(indexScan.hasNext()); + assertEquals(employee(1, "John", "IT"), indexScan.next()); + + assertTrue(indexScan.hasNext()); + assertEquals(employee(2, "Smith", "HR"), indexScan.next()); + + assertTrue(indexScan.hasNext()); + assertEquals(employee(3, "Allen", "IT"), indexScan.next()); + + assertFalse(indexScan.hasNext()); + } + 
verify(client).cleanup(any()); + } + + @Test + void queryAllResultsWithScroll() { mockResponse( new ExprValue[]{employee(1, "John", "IT"), employee(2, "Smith", "HR")}, new ExprValue[]{employee(3, "Allen", "IT")}); try (OpenSearchIndexScan indexScan = - new OpenSearchIndexScan(client, settings, "employees", exprValueFactory)) { + new OpenSearchIndexScan(client, settings, "employees", 2, exprValueFactory)) { + indexScan.open(); + + assertTrue(indexScan.hasNext()); + assertEquals(employee(1, "John", "IT"), indexScan.next()); + + assertTrue(indexScan.hasNext()); + assertEquals(employee(2, "Smith", "HR"), indexScan.next()); + + assertTrue(indexScan.hasNext()); + assertEquals(employee(3, "Allen", "IT"), indexScan.next()); + + assertFalse(indexScan.hasNext()); + } + verify(client).cleanup(any()); + } + + @Test + void querySomeResultsWithQuery() { + mockResponse(new ExprValue[]{ + employee(1, "John", "IT"), + employee(2, "Smith", "HR"), + employee(3, "Allen", "IT"), + employee(4, "Bob", "HR")}); + + try (OpenSearchIndexScan indexScan = + new OpenSearchIndexScan(client, settings, "employees", 10, exprValueFactory)) { + indexScan.getRequestBuilder().pushDownLimit(3, 0); + indexScan.open(); + + assertTrue(indexScan.hasNext()); + assertEquals(employee(1, "John", "IT"), indexScan.next()); + + assertTrue(indexScan.hasNext()); + assertEquals(employee(2, "Smith", "HR"), indexScan.next()); + + assertTrue(indexScan.hasNext()); + assertEquals(employee(3, "Allen", "IT"), indexScan.next()); + + assertFalse(indexScan.hasNext()); + } + verify(client).cleanup(any()); + } + + @Test + void querySomeResultsWithScroll() { + mockResponse( + new ExprValue[]{employee(1, "John", "IT"), employee(2, "Smith", "HR")}, + new ExprValue[]{employee(3, "Allen", "IT"), employee(4, "Bob", "HR")}); + + try (OpenSearchIndexScan indexScan = + new OpenSearchIndexScan(client, settings, "employees", 2, exprValueFactory)) { + indexScan.getRequestBuilder().pushDownLimit(3, 0); indexScan.open(); 
assertTrue(indexScan.hasNext()); @@ -135,19 +212,19 @@ public PushDownAssertion(OpenSearchClient client, OpenSearchExprValueFactory valueFactory, Settings settings) { this.client = client; - this.indexScan = new OpenSearchIndexScan(client, settings, "test", valueFactory); + this.indexScan = new OpenSearchIndexScan(client, settings, "test", 10000, valueFactory); this.response = mock(OpenSearchResponse.class); this.factory = valueFactory; when(response.isEmpty()).thenReturn(true); } PushDownAssertion pushDown(QueryBuilder query) { - indexScan.pushDown(query); + indexScan.getRequestBuilder().pushDown(query); return this; } PushDownAssertion pushDownHighlight(String query) { - indexScan.pushDownHighlight(query); + indexScan.getRequestBuilder().pushDownHighlight(query); return this; } @@ -187,10 +264,8 @@ public OpenSearchResponse answer(InvocationOnMock invocation) { when(response.isEmpty()).thenReturn(false); ExprValue[] searchHit = searchHitBatches[batchNum]; when(response.iterator()).thenReturn(Arrays.asList(searchHit).iterator()); - } else if (batchNum == totalBatch) { - when(response.isEmpty()).thenReturn(true); } else { - fail("Search request after empty response returned already"); + when(response.isEmpty()).thenReturn(true); } batchNum++; diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java index 847ac8dfc0..f1754a455d 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java @@ -70,7 +70,6 @@ import org.opensearch.sql.planner.physical.PhysicalPlan; import org.opensearch.sql.planner.physical.PhysicalPlanDSL; import org.opensearch.sql.planner.physical.ProjectOperator; -import org.opensearch.sql.storage.Table; @ExtendWith(MockitoExtension.class) class OpenSearchIndexTest { @@ -109,7 +108,7 @@ 
void getFieldTypes() { .put("blob", "binary") .build()))); - Table index = new OpenSearchIndex(client, settings, "test"); + OpenSearchIndex index = new OpenSearchIndex(client, settings, "test"); Map fieldTypes = index.getFieldTypes(); assertThat( fieldTypes, @@ -134,30 +133,35 @@ void getFieldTypes() { @Test void implementRelationOperatorOnly() { when(settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT)).thenReturn(200); + when(client.getIndexMaxResultWindows("test")).thenReturn(Map.of("test", 10000)); String indexName = "test"; LogicalPlan plan = relation(indexName); - Table index = new OpenSearchIndex(client, settings, indexName); + OpenSearchIndex index = new OpenSearchIndex(client, settings, indexName); + Integer maxResultWindow = index.getMaxResultWindow(); assertEquals( - new OpenSearchIndexScan(client, settings, indexName, exprValueFactory), + new OpenSearchIndexScan(client, settings, indexName, maxResultWindow, exprValueFactory), index.implement(plan)); } @Test void implementRelationOperatorWithOptimization() { when(settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT)).thenReturn(200); + when(client.getIndexMaxResultWindows("test")).thenReturn(Map.of("test", 10000)); String indexName = "test"; LogicalPlan plan = relation(indexName); - Table index = new OpenSearchIndex(client, settings, indexName); + OpenSearchIndex index = new OpenSearchIndex(client, settings, indexName); + Integer maxResultWindow = index.getMaxResultWindow(); assertEquals( - new OpenSearchIndexScan(client, settings, indexName, exprValueFactory), + new OpenSearchIndexScan(client, settings, indexName, maxResultWindow, exprValueFactory), index.implement(index.optimize(plan))); } @Test void implementOtherLogicalOperators() { when(settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT)).thenReturn(200); + when(client.getIndexMaxResultWindows("test")).thenReturn(Map.of("test", 10000)); String indexName = "test"; NamedExpression include = named("age", ref("age", INTEGER)); @@ -191,7 +195,8 
@@ void implementOtherLogicalOperators() { dedupeField), include); - Table index = new OpenSearchIndex(client, settings, indexName); + OpenSearchIndex index = new OpenSearchIndex(client, settings, indexName); + Integer maxResultWindow = index.getMaxResultWindow(); assertEquals( PhysicalPlanDSL.project( PhysicalPlanDSL.dedupe( @@ -200,7 +205,7 @@ void implementOtherLogicalOperators() { PhysicalPlanDSL.remove( PhysicalPlanDSL.rename( new OpenSearchIndexScan(client, settings, indexName, - exprValueFactory), + maxResultWindow, exprValueFactory), mappings), exclude), newEvalField), @@ -213,6 +218,7 @@ void implementOtherLogicalOperators() { @Test void shouldImplLogicalIndexScan() { when(settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT)).thenReturn(200); + when(client.getIndexMaxResultWindows("test")).thenReturn(Map.of("test", 10000)); ReferenceExpression field = ref("name", STRING); NamedExpression named = named("n", field); @@ -235,6 +241,7 @@ void shouldImplLogicalIndexScan() { @Test void shouldNotPushDownFilterFarFromRelation() { when(settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT)).thenReturn(200); + when(client.getIndexMaxResultWindows("test")).thenReturn(Map.of("test", 10000)); ReferenceExpression field = ref("name", STRING); Expression filterExpr = dsl.equal(field, literal("John")); @@ -260,6 +267,7 @@ void shouldNotPushDownFilterFarFromRelation() { @Test void shouldImplLogicalIndexScanAgg() { when(settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT)).thenReturn(200); + when(client.getIndexMaxResultWindows("test")).thenReturn(Map.of("test", 10000)); ReferenceExpression field = ref("name", STRING); Expression filterExpr = dsl.equal(field, literal("John")); @@ -296,6 +304,7 @@ void shouldImplLogicalIndexScanAgg() { @Test void shouldNotPushDownAggregationFarFromRelation() { when(settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT)).thenReturn(200); + when(client.getIndexMaxResultWindows("test")).thenReturn(Map.of("test", 10000)); 
ReferenceExpression field = ref("name", STRING); Expression filterExpr = dsl.equal(field, literal("John")); @@ -320,6 +329,7 @@ void shouldNotPushDownAggregationFarFromRelation() { @Test void shouldImplIndexScanWithSort() { when(settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT)).thenReturn(200); + when(client.getIndexMaxResultWindows("test")).thenReturn(Map.of("test", 10000)); ReferenceExpression field = ref("name", STRING); NamedExpression named = named("n", field); @@ -342,6 +352,7 @@ void shouldImplIndexScanWithSort() { @Test void shouldImplIndexScanWithLimit() { when(settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT)).thenReturn(200); + when(client.getIndexMaxResultWindows("test")).thenReturn(Map.of("test", 10000)); ReferenceExpression field = ref("name", STRING); NamedExpression named = named("n", field); @@ -363,6 +374,7 @@ void shouldImplIndexScanWithLimit() { @Test void shouldImplIndexScanWithSortAndLimit() { when(settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT)).thenReturn(200); + when(client.getIndexMaxResultWindows("test")).thenReturn(Map.of("test", 10000)); ReferenceExpression field = ref("name", STRING); NamedExpression named = named("n", field); @@ -387,6 +399,7 @@ void shouldImplIndexScanWithSortAndLimit() { @Test void shouldNotPushDownLimitFarFromRelationButUpdateScanSize() { when(settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT)).thenReturn(200); + when(client.getIndexMaxResultWindows("test")).thenReturn(Map.of("test", 10000)); String indexName = "test"; OpenSearchIndex index = new OpenSearchIndex(client, settings, indexName); @@ -411,6 +424,7 @@ void shouldNotPushDownLimitFarFromRelationButUpdateScanSize() { @Test void shouldPushDownProjects() { when(settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT)).thenReturn(200); + when(client.getIndexMaxResultWindows("test")).thenReturn(Map.of("test", 10000)); String indexName = "test"; OpenSearchIndex index = new OpenSearchIndex(client, settings, indexName); @@ -425,7 +439,7 @@ 
indexName, projects(ref("intV", INTEGER)) assertTrue(((ProjectOperator) plan).getInput() instanceof OpenSearchIndexScan); final FetchSourceContext fetchSource = - ((OpenSearchIndexScan) ((ProjectOperator) plan).getInput()).getRequest() + ((OpenSearchIndexScan) ((ProjectOperator) plan).getInput()).getRequestBuilder() .getSourceBuilder().fetchSource(); assertThat(fetchSource.includes(), arrayContaining("intV")); assertThat(fetchSource.excludes(), emptyArray()); diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/utils/Utils.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/utils/Utils.java index 15ca9d491f..2ed9a16434 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/utils/Utils.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/utils/Utils.java @@ -31,7 +31,9 @@ public class Utils { * Build ElasticsearchLogicalIndexScan. */ public static LogicalPlan indexScan(String tableName, Expression filter) { - return OpenSearchLogicalIndexScan.builder().relationName(tableName).filter(filter).build(); + return OpenSearchLogicalIndexScan.builder().relationName(tableName) + .filter(filter) + .build(); } /** diff --git a/opensearch/src/test/resources/mappings/accounts2.json b/opensearch/src/test/resources/mappings/accounts2.json new file mode 100644 index 0000000000..d300b8c523 --- /dev/null +++ b/opensearch/src/test/resources/mappings/accounts2.json @@ -0,0 +1,93 @@ +{ + "accounts": { + "mappings": { + "_doc": { + "properties": { + "address": { + "type": "text" + }, + "age": { + "type": "integer" + }, + "balance": { + "type": "double" + }, + "city": { + "type": "keyword" + }, + "birthday": { + "type": "date" + }, + "location": { + "type": "geo_point" + }, + "new_field": { + "type": "some_new_es_type_outside_type_system" + }, + "field with spaces": { + "type": "text" + }, + "employer": { + "type": "text", + "fields": { + "raw": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "projects": { + "type": "nested", 
+ "properties": { + "members": { + "type": "nested", + "properties": { + "name": { + "type": "text" + } + } + }, + "active": { + "type": "boolean" + }, + "release": { + "type": "date" + } + } + }, + "manager": { + "properties": { + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "address": { + "type": "keyword" + }, + "salary": { + "type": "long" + } + } + } + } + } + }, + "settings": { + "index": { + "number_of_shards": 5, + "number_of_replicas": 0, + "max_result_window": 100, + "version": { + "created": "6050399" + } + } + }, + "mapping_version": "1", + "settings_version": "1" + } +} \ No newline at end of file