From 3bf313f4c54d34f19ab2883db075e59596825b14 Mon Sep 17 00:00:00 2001 From: Jay Deng Date: Wed, 23 Aug 2023 10:09:40 -0700 Subject: [PATCH 1/4] Parameterize parent-join search tests and add coverage for sparse slice case Signed-off-by: Jay Deng --- .../AbstractParentChildTestCase.java | 44 +++++++++++++++++++ .../join/aggregations/ChildrenIT.java | 39 ++++++++++++++++ .../join/aggregations/ParentIT.java | 39 ++++++++++++++++ .../join/query/ChildQuerySearchIT.java | 23 ++++++++++ .../opensearch/join/query/InnerHitsIT.java | 23 ++++++++++ .../join/query/ParentChildTestCase.java | 7 ++- 6 files changed, 174 insertions(+), 1 deletion(-) diff --git a/modules/parent-join/src/internalClusterTest/java/org/opensearch/join/aggregations/AbstractParentChildTestCase.java b/modules/parent-join/src/internalClusterTest/java/org/opensearch/join/aggregations/AbstractParentChildTestCase.java index dac1b313777a6..e049edf843069 100644 --- a/modules/parent-join/src/internalClusterTest/java/org/opensearch/join/aggregations/AbstractParentChildTestCase.java +++ b/modules/parent-join/src/internalClusterTest/java/org/opensearch/join/aggregations/AbstractParentChildTestCase.java @@ -33,6 +33,11 @@ package org.opensearch.join.aggregations; import org.opensearch.action.index.IndexRequestBuilder; +import org.opensearch.action.search.SearchRequestBuilder; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.client.Requests; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; import org.opensearch.join.query.ParentChildTestCase; import org.junit.Before; @@ -44,6 +49,7 @@ import java.util.Set; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse; /** * Small base test-class which combines stuff used for Children and Parent aggregation tests @@ -52,6 +58,10 @@ public abstract class AbstractParentChildTestCase extends ParentChildTestCase { protected final Map categoryToControl = new HashMap<>(); protected final Map articleToControl = new HashMap<>(); + public AbstractParentChildTestCase(Settings dynamicSettings) { + super(dynamicSettings); + } + @Before public void setupCluster() throws Exception { assertAcked( @@ -154,4 +164,38 @@ private ParentControl(String category) { this.category = category; } } + + // Test when there is 1 child document and 1 parent document per segment. + public void testSparseSegments() throws InterruptedException { + assertAcked( + prepareCreate("sparse").setMapping( + addFieldMappings( + buildParentJoinFieldMappingFromSimplifiedDef("join_field", true, "article", "comment"), + "commenter", + "keyword", + "category", + "keyword" + ) + ) + .setSettings( + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ) + ); + + List requests = new ArrayList<>(); + requests.add(createIndexRequest("sparse", "article", "article-0", null, "category", List.of("0"))); + indexRandom(true, false, requests); + client().admin().indices().refresh(Requests.refreshRequest("sparse")).actionGet(); + requests = new ArrayList<>(); + requests.add(createIndexRequest("sparse", "comment", "comment-0", "article-0", "commenter", "0")); + indexRandom(true, false, requests); + + SearchResponse searchResponse = getSearchRequest().get(); + assertSearchResponse(searchResponse); + validateSpareSegmentsSearchResponse(searchResponse); + } + + abstract SearchRequestBuilder getSearchRequest(); + + abstract void validateSpareSegmentsSearchResponse(SearchResponse searchResponse); } diff --git a/modules/parent-join/src/internalClusterTest/java/org/opensearch/join/aggregations/ChildrenIT.java b/modules/parent-join/src/internalClusterTest/java/org/opensearch/join/aggregations/ChildrenIT.java index 72c502c616ff8..5fc0a202ae45e 100644 --- a/modules/parent-join/src/internalClusterTest/java/org/opensearch/join/aggregations/ChildrenIT.java +++ b/modules/parent-join/src/internalClusterTest/java/org/opensearch/join/aggregations/ChildrenIT.java @@ -31,13 +31,17 @@ package org.opensearch.join.aggregations; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + import org.apache.lucene.search.join.ScoreMode; import org.opensearch.action.index.IndexRequestBuilder; +import org.opensearch.action.search.SearchRequestBuilder; import org.opensearch.action.search.SearchResponse; import org.opensearch.action.update.UpdateResponse; import org.opensearch.client.Requests; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.search.SearchHit; import org.opensearch.search.aggregations.AggregationBuilders; import org.opensearch.search.aggregations.InternalAggregation; @@ -47,14 +51,18 @@ import org.opensearch.search.sort.SortOrder; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; +import static org.opensearch.index.query.QueryBuilders.matchAllQuery; import static org.opensearch.index.query.QueryBuilders.matchQuery; import static org.opensearch.index.query.QueryBuilders.termQuery; import static org.opensearch.join.aggregations.JoinAggregationBuilders.children; import static org.opensearch.join.query.JoinQueryBuilders.hasChildQuery; +import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING; import static org.opensearch.search.aggregations.AggregationBuilders.sum; import static org.opensearch.search.aggregations.AggregationBuilders.terms; import static org.opensearch.search.aggregations.AggregationBuilders.topHits; @@ -69,6 +77,23 @@ public class ChildrenIT extends AbstractParentChildTestCase { + public ChildrenIT(Settings settings) { + super(settings); + } + + @ParametersFactory + public static Collection parameters() { + return Arrays.asList( + new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() }, + new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() } + ); + } + + @Override + protected Settings featureFlagSettings() { + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, "true").build(); + } + public void testChildrenAggs() throws Exception { SearchResponse searchResponse = client().prepareSearch("test") .setQuery(matchQuery("randomized", true)) @@ -407,4 +432,18 @@ public void testPostCollectAllLeafReaders() throws Exception { children = parents.getBuckets().get(0).getAggregations().get("child_docs"); assertThat(children.getDocCount(), equalTo(2L)); } + + @Override + SearchRequestBuilder getSearchRequest() { + return client().prepareSearch("sparse") + .setSize(10000) + .setQuery(matchAllQuery()) + .addAggregation(children("to_comment", "comment").subAggregation(terms("commenters").field("commenter").size(10000))); + } + + @Override + void validateSpareSegmentsSearchResponse(SearchResponse searchResponse) { + Children children = searchResponse.getAggregations().get("to_comment"); + assertEquals(children.getDocCount(), 1); + } } diff --git a/modules/parent-join/src/internalClusterTest/java/org/opensearch/join/aggregations/ParentIT.java b/modules/parent-join/src/internalClusterTest/java/org/opensearch/join/aggregations/ParentIT.java index 351b0beec481b..e955da974d5be 100644 --- a/modules/parent-join/src/internalClusterTest/java/org/opensearch/join/aggregations/ParentIT.java +++ b/modules/parent-join/src/internalClusterTest/java/org/opensearch/join/aggregations/ParentIT.java @@ -32,12 +32,18 @@ package org.opensearch.join.aggregations; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + import org.opensearch.action.search.SearchRequestBuilder; import org.opensearch.action.search.SearchResponse; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.search.aggregations.Aggregation; import org.opensearch.search.aggregations.bucket.MultiBucketsAggregation; import org.opensearch.search.aggregations.bucket.terms.Terms; +import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -47,8 +53,10 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.opensearch.index.query.QueryBuilders.matchAllQuery; import static org.opensearch.index.query.QueryBuilders.matchQuery; import static org.opensearch.join.aggregations.JoinAggregationBuilders.parent; +import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING; import static org.opensearch.search.aggregations.AggregationBuilders.terms; import static org.opensearch.search.aggregations.AggregationBuilders.topHits; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse; @@ -56,6 +64,23 @@ public class ParentIT extends AbstractParentChildTestCase { + public ParentIT(Settings settings) { + super(settings); + } + + @ParametersFactory + public static Collection parameters() { + return Arrays.asList( + new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() }, + new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() } + ); + } + + @Override + protected Settings featureFlagSettings() { + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, "true").build(); + } + public void testSimpleParentAgg() throws Exception { final SearchRequestBuilder searchRequest = client().prepareSearch("test") .setSize(10000) @@ -264,4 +289,18 @@ public void testTermsParentAggTerms() throws Exception { } } } + + @Override + SearchRequestBuilder getSearchRequest() { + return client().prepareSearch("sparse") + .setSize(10000) + .setQuery(matchAllQuery()) + .addAggregation(parent("to_article", "comment").subAggregation(terms("category").field("category").size(10000))); + } + + @Override + void validateSpareSegmentsSearchResponse(SearchResponse searchResponse) { + Parent parentAgg = searchResponse.getAggregations().get("to_article"); + assertEquals(parentAgg.getDocCount(), 1); + } } diff --git a/modules/parent-join/src/internalClusterTest/java/org/opensearch/join/query/ChildQuerySearchIT.java b/modules/parent-join/src/internalClusterTest/java/org/opensearch/join/query/ChildQuerySearchIT.java index 037af62427f14..c43d6352b26f8 100644 --- a/modules/parent-join/src/internalClusterTest/java/org/opensearch/join/query/ChildQuerySearchIT.java +++ b/modules/parent-join/src/internalClusterTest/java/org/opensearch/join/query/ChildQuerySearchIT.java @@ -31,6 +31,8 @@ package org.opensearch.join.query; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + import org.apache.lucene.search.join.ScoreMode; import org.opensearch.action.explain.ExplainResponse; import org.opensearch.action.index.IndexRequestBuilder; @@ -42,6 +44,7 @@ import org.opensearch.common.lucene.search.function.FunctionScoreQuery; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.core.rest.RestStatus; import org.opensearch.index.query.BoolQueryBuilder; import org.opensearch.index.query.IdsQueryBuilder; @@ -65,6 +68,8 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -87,6 +92,7 @@ import static org.opensearch.join.query.JoinQueryBuilders.hasChildQuery; import static org.opensearch.join.query.JoinQueryBuilders.hasParentQuery; import static org.opensearch.join.query.JoinQueryBuilders.parentId; +import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures; @@ -100,6 +106,23 @@ public class ChildQuerySearchIT extends ParentChildTestCase { + public ChildQuerySearchIT(Settings settings) { + super(settings); + } + + @ParametersFactory + public static Collection parameters() { + return Arrays.asList( + new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() }, + new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() } + ); + } + + @Override + protected Settings featureFlagSettings() { + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, "true").build(); + } + public void testMultiLevelChild() throws Exception { assertAcked( prepareCreate("test").setMapping( diff --git a/modules/parent-join/src/internalClusterTest/java/org/opensearch/join/query/InnerHitsIT.java b/modules/parent-join/src/internalClusterTest/java/org/opensearch/join/query/InnerHitsIT.java index ffcc9cf38545f..39da86c7fd726 100644 --- a/modules/parent-join/src/internalClusterTest/java/org/opensearch/join/query/InnerHitsIT.java +++ b/modules/parent-join/src/internalClusterTest/java/org/opensearch/join/query/InnerHitsIT.java @@ -32,11 +32,15 @@ package org.opensearch.join.query; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + import org.apache.lucene.search.join.ScoreMode; import org.apache.lucene.util.ArrayUtil; import org.opensearch.action.index.IndexRequestBuilder; import org.opensearch.action.search.SearchPhaseExecutionException; import org.opensearch.action.search.SearchResponse; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.IndexSettings; import org.opensearch.index.query.BoolQueryBuilder; import org.opensearch.index.query.InnerHitBuilder; @@ -54,6 +58,7 @@ import org.opensearch.search.sort.SortOrder; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -73,6 +78,7 @@ import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; import static org.opensearch.join.query.JoinQueryBuilders.hasChildQuery; import static org.opensearch.join.query.JoinQueryBuilders.hasParentQuery; +import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures; @@ -87,6 +93,23 @@ public class InnerHitsIT extends ParentChildTestCase { + public InnerHitsIT(Settings settings) { + super(settings); + } + + @ParametersFactory + public static Collection parameters() { + return Arrays.asList( + new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() }, + new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() } + ); + } + + @Override + protected Settings featureFlagSettings() { + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, "true").build(); + } + @Override protected Collection> nodePlugins() { ArrayList> plugins = new ArrayList<>(super.nodePlugins()); diff --git a/modules/parent-join/src/internalClusterTest/java/org/opensearch/join/query/ParentChildTestCase.java b/modules/parent-join/src/internalClusterTest/java/org/opensearch/join/query/ParentChildTestCase.java index f10b0b26a8669..8c19c0aafe763 100644 --- a/modules/parent-join/src/internalClusterTest/java/org/opensearch/join/query/ParentChildTestCase.java +++ b/modules/parent-join/src/internalClusterTest/java/org/opensearch/join/query/ParentChildTestCase.java @@ -41,6 +41,7 @@ import org.opensearch.plugins.Plugin; import org.opensearch.test.InternalSettingsPlugin; import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.ParameterizedOpenSearchIntegTestCase; import java.io.IOException; import java.util.Arrays; @@ -50,7 +51,11 @@ import java.util.Map; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE) -public abstract class ParentChildTestCase extends OpenSearchIntegTestCase { +public abstract class ParentChildTestCase extends ParameterizedOpenSearchIntegTestCase { + + public ParentChildTestCase(Settings dynamicSettings) { + super(dynamicSettings); + } @Override protected boolean ignoreExternalCluster() { From d27b5c0637f4ba67bebbab0c8e172be080cbdcc2 Mon Sep 17 00:00:00 2001 From: Jay Deng Date: Wed, 23 Aug 2023 11:11:44 -0700 Subject: [PATCH 2/4] Separate request-based and settings-based concurrent segment search controls and introduce AggregatorFactory method to determine concurrent search support Signed-off-by: Jay Deng --- CHANGELOG.md | 1 + .../search/DefaultSearchContext.java | 30 ++++++++++++++----- .../org/opensearch/search/SearchService.java | 8 +++-- .../aggregations/AggregatorFactories.java | 4 +++ .../aggregations/AggregatorFactory.java | 13 ++++++++ .../bucket/terms/TermsAggregatorFactory.java | 4 +++ .../internal/FilteredSearchContext.java | 4 +-- .../search/internal/SearchContext.java | 4 +-- .../query/QueryPhaseSearcherWrapper.java | 4 +-- .../opensearch/search/SearchServiceTests.java | 6 ++-- .../internal/ContextIndexSearcherTests.java | 4 +-- .../search/query/QueryPhaseTests.java | 4 +-- .../opensearch/test/TestSearchContext.java | 2 +- 13 files changed, 63 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 199461fd93cd7..d7b83a023d60a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -152,6 +152,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add support for wrapping CollectorManager with profiling during concurrent execution ([#9129](https://github.com/opensearch-project/OpenSearch/pull/9129)) - Rethrow OpenSearch exception for non-concurrent path while using concurrent search ([#9177](https://github.com/opensearch-project/OpenSearch/pull/9177)) - Improve performance of encoding composite keys in multi-term aggregations ([#9412](https://github.com/opensearch-project/OpenSearch/pull/9412)) +- Separate request-based and settings-based concurrent segment search controls and introduce AggregatorFactory method to determine concurrent search support ([#9469](https://github.com/opensearch-project/OpenSearch/pull/9469)) ### Deprecated diff --git a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java index ef467d494f694..22a3614245842 100644 --- a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java @@ -65,6 +65,7 @@ import org.opensearch.index.search.NestedHelper; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.similarity.SimilarityService; +import org.opensearch.search.aggregations.AggregatorFactory; import org.opensearch.search.aggregations.BucketCollectorProcessor; import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.SearchContextAggregations; @@ -183,7 +184,8 @@ final class DefaultSearchContext extends SearchContext { private final QueryShardContext queryShardContext; private final FetchPhase fetchPhase; private final Function requestToAggReduceContextBuilder; - private final boolean useConcurrentSearch; + private final boolean concurrentSearchSettingsEnabled; + private boolean requestShouldUseConcurrentSearch; DefaultSearchContext( ReaderContext readerContext, @@ -214,14 +216,14 @@ final class DefaultSearchContext extends SearchContext { this.indexShard = readerContext.indexShard(); this.clusterService = clusterService; this.engineSearcher = readerContext.acquireSearcher("search"); - this.useConcurrentSearch = useConcurrentSearch(executor); + this.concurrentSearchSettingsEnabled = evaluateConcurrentSegmentSearchSettings(executor); this.searcher = new ContextIndexSearcher( engineSearcher.getIndexReader(), engineSearcher.getSimilarity(), engineSearcher.getQueryCache(), engineSearcher.getQueryCachingPolicy(), lowLevelCancellation, - useConcurrentSearch ? executor : null, + concurrentSearchSettingsEnabled ? executor : null, this ); this.relativeTimeSupplier = relativeTimeSupplier; @@ -879,8 +881,20 @@ public Profilers getProfilers() { * Returns concurrent segment search status for the search context */ @Override - public boolean isConcurrentSegmentSearchEnabled() { - return useConcurrentSearch; + public boolean shouldUseConcurrentSearch() { + // concurrentSearchSettingsEnabled is evaluated when searchContext is created + return concurrentSearchSettingsEnabled && requestShouldUseConcurrentSearch; + } + + public void evaluateRequestShouldUseConcurrentSearch() { + requestShouldUseConcurrentSearch = true; + if (aggregations() != null && aggregations().factories() != null) { + for (AggregatorFactory factory : aggregations().factories().getFactories()) { + if (factory.traverseChildFactories() == false) { + requestShouldUseConcurrentSearch = false; + } + } + } } public void setProfilers(Profilers profilers) { @@ -910,7 +924,7 @@ public ReaderContext readerContext() { @Override public InternalAggregation.ReduceContext partialOnShard() { InternalAggregation.ReduceContext rc = requestToAggReduceContextBuilder.apply(request.source()).forPartialReduction(); - rc.setSliceLevel(isConcurrentSegmentSearchEnabled()); + rc.setSliceLevel(shouldUseConcurrentSearch()); return rc; } @@ -929,7 +943,7 @@ public BucketCollectorProcessor bucketCollectorProcessor() { * @return true: use concurrent search * false: otherwise */ - private boolean useConcurrentSearch(Executor concurrentSearchExecutor) { + private boolean evaluateConcurrentSegmentSearchSettings(Executor concurrentSearchExecutor) { if (FeatureFlags.isEnabled(FeatureFlags.CONCURRENT_SEGMENT_SEARCH) && (clusterService != null) && (concurrentSearchExecutor != null)) { @@ -946,7 +960,7 @@ private boolean useConcurrentSearch(Executor concurrentSearchExecutor) { @Override public int getTargetMaxSliceCount() { - if (isConcurrentSegmentSearchEnabled() == false) { + if (shouldUseConcurrentSearch() == false) { throw new IllegalStateException("Target slice count should not be used when concurrent search is disabled"); } return clusterService.getClusterSettings().get(SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING); diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index fac461755acff..761a8efe1f669 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -1283,9 +1283,6 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc if (source.minScore() != null) { context.minimumScore(source.minScore()); } - if (source.profile()) { - context.setProfilers(new Profilers(context.searcher(), context.isConcurrentSegmentSearchEnabled())); - } if (source.timeout() != null) { context.timeout(source.timeout()); } @@ -1419,6 +1416,11 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc final CollapseContext collapseContext = source.collapse().build(queryShardContext); context.collapse(collapseContext); } + context.evaluateRequestShouldUseConcurrentSearch(); + if (source.profile()) { + context.setProfilers(new Profilers(context.searcher(), context.shouldUseConcurrentSearch())); + } + logger.info("Using concurrent search for request: " + context.shouldUseConcurrentSearch()); } /** diff --git a/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java b/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java index 81fd741e9139c..cb71131b6dfd7 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java +++ b/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java @@ -257,6 +257,10 @@ private AggregatorFactories(AggregatorFactory[] factories) { this.factories = factories; } + public AggregatorFactory[] getFactories() { + return factories; + } + /** * Create all aggregators so that they can be consumed with multiple * buckets. diff --git a/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactory.java index 05686f35c2166..53943f0097f34 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactory.java @@ -114,4 +114,17 @@ public AggregatorFactory getParent() { public String getStatsSubtype() { return OTHER_SUBTYPE; } + + protected boolean supportsConcurrentSegmentSearch() { + return false; + } + + public boolean traverseChildFactories() { + for (AggregatorFactory aggregatorFactory : factories.getFactories()) { + if (aggregatorFactory.traverseChildFactories() == false) { + return false; + } + } + return supportsConcurrentSegmentSearch(); + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java index 62844b4499dba..a4d73bfd3e634 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java @@ -558,4 +558,8 @@ public String toString() { } } + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } } diff --git a/server/src/main/java/org/opensearch/search/internal/FilteredSearchContext.java b/server/src/main/java/org/opensearch/search/internal/FilteredSearchContext.java index 5cd25d3b71704..32de5fc9864ce 100644 --- a/server/src/main/java/org/opensearch/search/internal/FilteredSearchContext.java +++ b/server/src/main/java/org/opensearch/search/internal/FilteredSearchContext.java @@ -561,8 +561,8 @@ public BucketCollectorProcessor bucketCollectorProcessor() { } @Override - public boolean isConcurrentSegmentSearchEnabled() { - return in.isConcurrentSegmentSearchEnabled(); + public boolean shouldUseConcurrentSearch() { + return in.shouldUseConcurrentSearch(); } @Override diff --git a/server/src/main/java/org/opensearch/search/internal/SearchContext.java b/server/src/main/java/org/opensearch/search/internal/SearchContext.java index 3320462727fce..590ce4b077cbc 100644 --- a/server/src/main/java/org/opensearch/search/internal/SearchContext.java +++ b/server/src/main/java/org/opensearch/search/internal/SearchContext.java @@ -399,7 +399,7 @@ public final void assignRescoreDocIds(RescoreDocIds rescoreDocIds) { /** * Returns concurrent segment search status for the search context */ - public boolean isConcurrentSegmentSearchEnabled() { + public boolean shouldUseConcurrentSearch() { return false; } @@ -407,7 +407,7 @@ public boolean isConcurrentSegmentSearchEnabled() { * Returns local bucket count thresholds based on concurrent segment search status */ public LocalBucketCountThresholds asLocalBucketCountThresholds(TermsAggregator.BucketCountThresholds bucketCountThresholds) { - if (isConcurrentSegmentSearchEnabled()) { + if (shouldUseConcurrentSearch()) { return new LocalBucketCountThresholds(0, ArrayUtil.MAX_ARRAY_LENGTH - 1); } else { return new LocalBucketCountThresholds(bucketCountThresholds.getShardMinDocCount(), bucketCountThresholds.getShardSize()); diff --git a/server/src/main/java/org/opensearch/search/query/QueryPhaseSearcherWrapper.java b/server/src/main/java/org/opensearch/search/query/QueryPhaseSearcherWrapper.java index 9336b490a5333..115f7503631c1 100644 --- a/server/src/main/java/org/opensearch/search/query/QueryPhaseSearcherWrapper.java +++ b/server/src/main/java/org/opensearch/search/query/QueryPhaseSearcherWrapper.java @@ -57,7 +57,7 @@ public boolean searchWith( boolean hasFilterCollector, boolean hasTimeout ) throws IOException { - if (searchContext.isConcurrentSegmentSearchEnabled()) { + if (searchContext.shouldUseConcurrentSearch()) { LOGGER.info("Using concurrent search over segments (experimental)"); return concurrentQueryPhaseSearcher.searchWith(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout); } else { @@ -72,7 +72,7 @@ public boolean searchWith( */ @Override public AggregationProcessor aggregationProcessor(SearchContext searchContext) { - if (searchContext.isConcurrentSegmentSearchEnabled()) { + if (searchContext.shouldUseConcurrentSearch()) { LOGGER.info("Using concurrent search over segments (experimental)"); return concurrentQueryPhaseSearcher.aggregationProcessor(searchContext); } else { diff --git a/server/src/test/java/org/opensearch/search/SearchServiceTests.java b/server/src/test/java/org/opensearch/search/SearchServiceTests.java index 4259ca9750ac7..d2b837b956e08 100644 --- a/server/src/test/java/org/opensearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/opensearch/search/SearchServiceTests.java @@ -1274,7 +1274,7 @@ public void testConcurrentSegmentSearchSearchContext() throws IOException { .get() .getSetting(index, IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey()) ); - assertEquals(concurrentSearchEnabled, searchContext.isConcurrentSegmentSearchEnabled()); + assertEquals(concurrentSearchEnabled, searchContext.shouldUseConcurrentSearch()); // verify executor nullability with concurrent search enabled/disabled if (concurrentSearchEnabled) { assertNotNull(searchContext.searcher().getExecutor()); @@ -1328,7 +1328,7 @@ public void testConcurrentSegmentSearchIsSetOnceDuringContextCreation() throws I .get(); try (DefaultSearchContext searchContext = service.createSearchContext(request, new TimeValue(System.currentTimeMillis()))) { // verify concurrent search state in context - assertEquals(concurrentSearchSetting, searchContext.isConcurrentSegmentSearchEnabled()); + assertEquals(concurrentSearchSetting, searchContext.shouldUseConcurrentSearch()); // verify executor state in searcher assertEquals(concurrentSearchSetting, (searchContext.searcher().getExecutor() != null)); @@ -1342,7 +1342,7 @@ public void testConcurrentSegmentSearchIsSetOnceDuringContextCreation() throws I .get(); // verify that concurrent segment search is still set to same expected value for the context - assertEquals(concurrentSearchSetting, searchContext.isConcurrentSegmentSearchEnabled()); + assertEquals(concurrentSearchSetting, searchContext.shouldUseConcurrentSearch()); } } diff --git a/server/src/test/java/org/opensearch/search/internal/ContextIndexSearcherTests.java b/server/src/test/java/org/opensearch/search/internal/ContextIndexSearcherTests.java index f8f1ff09a314c..b1f70dfce176c 100644 --- a/server/src/test/java/org/opensearch/search/internal/ContextIndexSearcherTests.java +++ b/server/src/test/java/org/opensearch/search/internal/ContextIndexSearcherTests.java @@ -383,7 +383,7 @@ public void testGetSlicesWithNonNullExecutorButCSDisabled() throws Exception { IndexShard indexShard = mock(IndexShard.class); when(searchContext.indexShard()).thenReturn(indexShard); when(searchContext.bucketCollectorProcessor()).thenReturn(SearchContext.NO_OP_BUCKET_COLLECTOR_PROCESSOR); - when(searchContext.isConcurrentSegmentSearchEnabled()).thenReturn(false); + when(searchContext.shouldUseConcurrentSearch()).thenReturn(false); ContextIndexSearcher searcher = new ContextIndexSearcher( directoryReader, IndexSearcher.getDefaultSimilarity(), @@ -406,7 +406,7 @@ public void testGetSlicesWithNonNullExecutorButCSDisabled() throws Exception { mock(ExecutorService.class), searchContext ); - when(searchContext.isConcurrentSegmentSearchEnabled()).thenReturn(true); + when(searchContext.shouldUseConcurrentSearch()).thenReturn(true); when(searchContext.getTargetMaxSliceCount()).thenReturn(4); int expectedSliceCount = 4; IndexSearcher.LeafSlice[] slices = searcher.slices(leaves); diff --git a/server/src/test/java/org/opensearch/search/query/QueryPhaseTests.java b/server/src/test/java/org/opensearch/search/query/QueryPhaseTests.java index d773962987682..39126a607f968 100644 --- a/server/src/test/java/org/opensearch/search/query/QueryPhaseTests.java +++ b/server/src/test/java/org/opensearch/search/query/QueryPhaseTests.java @@ -1209,7 +1209,7 @@ private static ContextIndexSearcher newContextSearcher(IndexReader reader, Execu IndexShard indexShard = mock(IndexShard.class); when(searchContext.indexShard()).thenReturn(indexShard); when(searchContext.bucketCollectorProcessor()).thenReturn(SearchContext.NO_OP_BUCKET_COLLECTOR_PROCESSOR); - when(searchContext.isConcurrentSegmentSearchEnabled()).thenReturn(executor != null); + when(searchContext.shouldUseConcurrentSearch()).thenReturn(executor != null); if (executor != null) { when(searchContext.getTargetMaxSliceCount()).thenReturn(randomIntBetween(0, 2)); } else { @@ -1232,7 +1232,7 @@ private static ContextIndexSearcher newEarlyTerminationContextSearcher(IndexRead IndexShard indexShard = mock(IndexShard.class); when(searchContext.indexShard()).thenReturn(indexShard); when(searchContext.bucketCollectorProcessor()).thenReturn(SearchContext.NO_OP_BUCKET_COLLECTOR_PROCESSOR); - when(searchContext.isConcurrentSegmentSearchEnabled()).thenReturn(executor != null); + when(searchContext.shouldUseConcurrentSearch()).thenReturn(executor != null); if (executor != null) { when(searchContext.getTargetMaxSliceCount()).thenReturn(randomIntBetween(0, 2)); } else { diff --git a/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java b/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java index a26488feed6fd..dd4a05b67271c 100644 --- a/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java +++ b/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java @@ -632,7 +632,7 @@ public Profilers getProfilers() { * Returns concurrent segment search status for the search context */ @Override - public boolean isConcurrentSegmentSearchEnabled() { + public boolean shouldUseConcurrentSearch() { return concurrentSegmentSearchEnabled; } From de1e8c3e385d5b3c48dd9a236d376f11c8c98992 Mon Sep 17 00:00:00 2001 From: Jay Deng Date: Wed, 23 Aug 2023 13:30:36 -0700 Subject: [PATCH 3/4] Add supportsConcurrentSegmentSearch override for all AggregatorFactory concrete classes Signed-off-by: Jay Deng --- .../matrix/stats/MatrixStatsAggregatorFactory.java | 5 +++++ .../bucket/geogrid/GeoHashGridAggregatorFactory.java | 5 +++++ .../bucket/geogrid/GeoTileGridAggregatorFactory.java | 5 +++++ .../aggregations/metrics/GeoBoundsAggregatorFactory.java | 5 +++++ .../java/org/opensearch/join/aggregations/ParentIT.java | 2 +- .../join/aggregations/ChildrenAggregatorFactory.java | 5 +++++ .../join/aggregations/ParentAggregatorFactory.java | 5 +++++ .../org/opensearch/action/search/TransportSearchIT.java | 5 +++++ .../java/org/opensearch/search/DefaultSearchContext.java | 7 ++++--- .../bucket/adjacency/AdjacencyMatrixAggregatorFactory.java | 4 ++++ .../bucket/composite/CompositeAggregationFactory.java | 5 +++++ .../bucket/filter/FilterAggregatorFactory.java | 4 ++++ .../bucket/filter/FiltersAggregatorFactory.java | 4 ++++ .../bucket/global/GlobalAggregatorFactory.java | 5 +++++ .../histogram/AutoDateHistogramAggregatorFactory.java | 5 +++++ .../bucket/histogram/DateHistogramAggregatorFactory.java | 5 +++++ .../bucket/histogram/HistogramAggregatorFactory.java | 5 +++++ .../histogram/VariableWidthHistogramAggregatorFactory.java | 5 +++++ .../bucket/missing/MissingAggregatorFactory.java | 5 +++++ .../bucket/nested/NestedAggregatorFactory.java | 4 ++++ .../bucket/nested/ReverseNestedAggregatorFactory.java | 5 +++++ .../bucket/range/AbstractRangeAggregatorFactory.java | 5 +++++ .../bucket/range/BinaryRangeAggregatorFactory.java | 4 ++++ .../bucket/range/DateRangeAggregatorFactory.java | 4 ++++ .../bucket/range/GeoDistanceRangeAggregatorFactory.java | 5 +++++ .../aggregations/bucket/range/RangeAggregatorFactory.java | 5 +++++ .../bucket/sampler/DiversifiedAggregatorFactory.java | 5 +++++ .../bucket/sampler/SamplerAggregatorFactory.java | 4 ++++ .../bucket/terms/MultiTermsAggregationFactory.java | 5 +++++ .../bucket/terms/RareTermsAggregatorFactory.java | 5 +++++ .../bucket/terms/SignificantTermsAggregatorFactory.java | 5 +++++ .../bucket/terms/SignificantTextAggregatorFactory.java | 5 +++++ .../search/aggregations/metrics/AvgAggregatorFactory.java | 5 +++++ .../aggregations/metrics/CardinalityAggregatorFactory.java | 5 +++++ .../metrics/ExtendedStatsAggregatorFactory.java | 5 +++++ .../aggregations/metrics/GeoCentroidAggregatorFactory.java | 5 +++++ .../search/aggregations/metrics/MaxAggregatorFactory.java | 5 +++++ .../metrics/MedianAbsoluteDeviationAggregatorFactory.java | 5 +++++ .../search/aggregations/metrics/MinAggregatorFactory.java | 5 +++++ .../metrics/PercentileRanksAggregatorFactory.java | 5 +++++ .../aggregations/metrics/PercentilesAggregatorFactory.java | 5 +++++ .../metrics/ScriptedMetricAggregatorFactory.java | 5 +++++ .../aggregations/metrics/StatsAggregatorFactory.java | 5 +++++ .../search/aggregations/metrics/SumAggregatorFactory.java | 5 +++++ .../aggregations/metrics/TopHitsAggregatorFactory.java | 4 ++++ .../aggregations/metrics/ValueCountAggregatorFactory.java | 5 +++++ .../aggregations/metrics/WeightedAvgAggregatorFactory.java | 5 +++++ .../aggregations/DelayedShardAggregationBuilder.java | 5 +++++ .../opensearch/search/aggregations/AggregatorTestCase.java | 5 +++++ 49 files changed, 232 insertions(+), 4 deletions(-) diff --git a/modules/aggs-matrix-stats/src/main/java/org/opensearch/search/aggregations/matrix/stats/MatrixStatsAggregatorFactory.java b/modules/aggs-matrix-stats/src/main/java/org/opensearch/search/aggregations/matrix/stats/MatrixStatsAggregatorFactory.java index f7ab0db3c9607..24f74f3859157 100644 --- a/modules/aggs-matrix-stats/src/main/java/org/opensearch/search/aggregations/matrix/stats/MatrixStatsAggregatorFactory.java +++ b/modules/aggs-matrix-stats/src/main/java/org/opensearch/search/aggregations/matrix/stats/MatrixStatsAggregatorFactory.java @@ -89,4 +89,9 @@ protected Aggregator doCreateInternal( } return new MatrixStatsAggregator(name, typedValuesSources, searchContext, parent, multiValueMode, metadata); } + + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } } diff --git a/modules/geo/src/main/java/org/opensearch/geo/search/aggregations/bucket/geogrid/GeoHashGridAggregatorFactory.java b/modules/geo/src/main/java/org/opensearch/geo/search/aggregations/bucket/geogrid/GeoHashGridAggregatorFactory.java index 197ab2d99f114..60ee1973c1080 100644 --- a/modules/geo/src/main/java/org/opensearch/geo/search/aggregations/bucket/geogrid/GeoHashGridAggregatorFactory.java +++ b/modules/geo/src/main/java/org/opensearch/geo/search/aggregations/bucket/geogrid/GeoHashGridAggregatorFactory.java @@ -196,4 +196,9 @@ static void registerAggregators(ValuesSourceRegistry.Builder builder) { true ); } + + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } } diff --git a/modules/geo/src/main/java/org/opensearch/geo/search/aggregations/bucket/geogrid/GeoTileGridAggregatorFactory.java b/modules/geo/src/main/java/org/opensearch/geo/search/aggregations/bucket/geogrid/GeoTileGridAggregatorFactory.java index d5a3919684345..54b82f9770b63 100644 --- a/modules/geo/src/main/java/org/opensearch/geo/search/aggregations/bucket/geogrid/GeoTileGridAggregatorFactory.java +++ b/modules/geo/src/main/java/org/opensearch/geo/search/aggregations/bucket/geogrid/GeoTileGridAggregatorFactory.java @@ -194,4 +194,9 @@ static void registerAggregators(ValuesSourceRegistry.Builder builder) { true ); } + + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } } diff --git a/modules/geo/src/main/java/org/opensearch/geo/search/aggregations/metrics/GeoBoundsAggregatorFactory.java b/modules/geo/src/main/java/org/opensearch/geo/search/aggregations/metrics/GeoBoundsAggregatorFactory.java index 780f25ba3d7fb..fc9cce3cf98c1 100644 --- a/modules/geo/src/main/java/org/opensearch/geo/search/aggregations/metrics/GeoBoundsAggregatorFactory.java +++ b/modules/geo/src/main/java/org/opensearch/geo/search/aggregations/metrics/GeoBoundsAggregatorFactory.java @@ -89,4 +89,9 @@ static void registerAggregators(ValuesSourceRegistry.Builder builder) { builder.register(GeoBoundsAggregationBuilder.REGISTRY_KEY, CoreValuesSourceType.GEOPOINT, GeoBoundsAggregator::new, true); builder.register(GeoBoundsAggregationBuilder.REGISTRY_KEY, CoreValuesSourceType.GEO_SHAPE, GeoBoundsGeoShapeAggregator::new, true); } + + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } } diff --git a/modules/parent-join/src/internalClusterTest/java/org/opensearch/join/aggregations/ParentIT.java b/modules/parent-join/src/internalClusterTest/java/org/opensearch/join/aggregations/ParentIT.java index e955da974d5be..04703a65aa19d 100644 --- a/modules/parent-join/src/internalClusterTest/java/org/opensearch/join/aggregations/ParentIT.java +++ b/modules/parent-join/src/internalClusterTest/java/org/opensearch/join/aggregations/ParentIT.java @@ -71,7 +71,7 @@ public ParentIT(Settings settings) { @ParametersFactory public static Collection parameters() { return Arrays.asList( - new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() }, + new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() }, new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() } ); } diff --git a/modules/parent-join/src/main/java/org/opensearch/join/aggregations/ChildrenAggregatorFactory.java b/modules/parent-join/src/main/java/org/opensearch/join/aggregations/ChildrenAggregatorFactory.java index 793b35111cfe2..bbca89fc56820 100644 --- a/modules/parent-join/src/main/java/org/opensearch/join/aggregations/ChildrenAggregatorFactory.java +++ b/modules/parent-join/src/main/java/org/opensearch/join/aggregations/ChildrenAggregatorFactory.java @@ -118,4 +118,9 @@ public String getStatsSubtype() { // Child Aggregation is registered in non-standard way, so it might return child's values type return OTHER_SUBTYPE; } + + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } } diff --git a/modules/parent-join/src/main/java/org/opensearch/join/aggregations/ParentAggregatorFactory.java b/modules/parent-join/src/main/java/org/opensearch/join/aggregations/ParentAggregatorFactory.java index 40c07c8f53e20..008ae3106641d 100644 --- a/modules/parent-join/src/main/java/org/opensearch/join/aggregations/ParentAggregatorFactory.java +++ b/modules/parent-join/src/main/java/org/opensearch/join/aggregations/ParentAggregatorFactory.java @@ -118,4 +118,9 @@ public String getStatsSubtype() { // Parent Aggregation is registered in non-standard way return OTHER_SUBTYPE; } + + @Override + protected boolean supportsConcurrentSegmentSearch() { + return false; + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/action/search/TransportSearchIT.java b/server/src/internalClusterTest/java/org/opensearch/action/search/TransportSearchIT.java index 2dffc393ef749..f0a3b5a5901ce 100644 --- a/server/src/internalClusterTest/java/org/opensearch/action/search/TransportSearchIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/action/search/TransportSearchIT.java @@ -594,6 +594,11 @@ protected Aggregator createInternal( ) throws IOException { return new TestAggregator(name, parent, searchContext); } + + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } }; } diff --git a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java index 22a3614245842..a9d23f4c60529 100644 --- a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java @@ -185,7 +185,7 @@ final class DefaultSearchContext extends SearchContext { private final FetchPhase fetchPhase; private final Function requestToAggReduceContextBuilder; private final boolean concurrentSearchSettingsEnabled; - private boolean requestShouldUseConcurrentSearch; + private boolean requestShouldUseConcurrentSearch = true; DefaultSearchContext( ReaderContext readerContext, @@ -882,12 +882,13 @@ public Profilers getProfilers() { */ @Override public boolean shouldUseConcurrentSearch() { - // concurrentSearchSettingsEnabled is evaluated when searchContext is created return concurrentSearchSettingsEnabled && requestShouldUseConcurrentSearch; } + /** + * Evaluate if parsed request supports concurrent segment search + */ public void evaluateRequestShouldUseConcurrentSearch() { - requestShouldUseConcurrentSearch = true; if (aggregations() != null && aggregations().factories() != null) { for (AggregatorFactory factory : aggregations().factories().getFactories()) { if (factory.traverseChildFactories() == false) { diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/adjacency/AdjacencyMatrixAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/adjacency/AdjacencyMatrixAggregatorFactory.java index fe1270e10c80e..99ffb563ba2a8 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/adjacency/AdjacencyMatrixAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/adjacency/AdjacencyMatrixAggregatorFactory.java @@ -91,4 +91,8 @@ public Aggregator createInternal( return new AdjacencyMatrixAggregator(name, factories, separator, keys, weights, searchContext, parent, metadata); } + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregationFactory.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregationFactory.java index 09691a69c75f4..2ff79fb623def 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregationFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregationFactory.java @@ -77,4 +77,9 @@ protected Aggregator createInternal( ) throws IOException { return new CompositeAggregator(name, factories, searchContext, parent, metadata, size, sources, afterKey); } + + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/filter/FilterAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/filter/FilterAggregatorFactory.java index 4ab573cf0a6b6..55c841f5b9c04 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/filter/FilterAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/filter/FilterAggregatorFactory.java @@ -101,4 +101,8 @@ public Aggregator createInternal( return new FilterAggregator(name, () -> this.getWeight(), factories, searchContext, parent, cardinality, metadata); } + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/filter/FiltersAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/filter/FiltersAggregatorFactory.java index 795f81a08d8d5..35d968b789a21 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/filter/FiltersAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/filter/FiltersAggregatorFactory.java @@ -146,4 +146,8 @@ public Aggregator createInternal( ); } + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/global/GlobalAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/global/GlobalAggregatorFactory.java index 419ae9f16d9e6..47de1fcda29c9 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/global/GlobalAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/global/GlobalAggregatorFactory.java @@ -82,4 +82,9 @@ public Aggregator createInternal( } return new GlobalAggregator(name, factories, searchContext, metadata); } + + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorFactory.java index 7434ef84ee92f..059b88c9475ed 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorFactory.java @@ -124,4 +124,9 @@ protected Aggregator createUnmapped(SearchContext searchContext, Aggregator pare metadata ); } + + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorFactory.java index dd74d83c665de..807ec1ab4e4b7 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorFactory.java @@ -148,4 +148,9 @@ protected Aggregator createUnmapped(SearchContext searchContext, Aggregator pare metadata ); } + + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/HistogramAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/HistogramAggregatorFactory.java index 321c16cdba970..7506dcde23641 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/HistogramAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/HistogramAggregatorFactory.java @@ -149,4 +149,9 @@ protected Aggregator createUnmapped(SearchContext searchContext, Aggregator pare metadata ); } + + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregatorFactory.java index d9d9a74eb958f..b846bf72ef4c5 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregatorFactory.java @@ -116,4 +116,9 @@ protected Aggregator createUnmapped(SearchContext searchContext, Aggregator pare metadata ); } + + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/missing/MissingAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/missing/MissingAggregatorFactory.java index cfa2bd3f7097c..3032d695a3ee2 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/missing/MissingAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/missing/MissingAggregatorFactory.java @@ -85,4 +85,9 @@ protected MissingAggregator doCreateInternal( .getAggregator(MissingAggregationBuilder.REGISTRY_KEY, config) .build(name, factories, config, searchContext, parent, cardinality, metadata); } + + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/nested/NestedAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/nested/NestedAggregatorFactory.java index ca1018795b518..a43d41882e475 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/nested/NestedAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/nested/NestedAggregatorFactory.java @@ -100,4 +100,8 @@ public InternalAggregation buildEmptyAggregation() { } } + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/nested/ReverseNestedAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/nested/ReverseNestedAggregatorFactory.java index 27cd8a2688836..816f05052b6a2 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/nested/ReverseNestedAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/nested/ReverseNestedAggregatorFactory.java @@ -83,6 +83,11 @@ public Aggregator createInternal( } } + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } + /** * Unmapped class for reverse nested agg * diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/range/AbstractRangeAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/range/AbstractRangeAggregatorFactory.java index bfd7845e7e16f..41f2768eb7544 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/range/AbstractRangeAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/range/AbstractRangeAggregatorFactory.java @@ -122,4 +122,9 @@ protected Aggregator doCreateInternal( metadata ); } + + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/range/BinaryRangeAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/range/BinaryRangeAggregatorFactory.java index 0ee440ecc8487..fc4b4273df703 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/range/BinaryRangeAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/range/BinaryRangeAggregatorFactory.java @@ -103,4 +103,8 @@ protected Aggregator doCreateInternal( .build(name, factories, config.getValuesSource(), config.format(), ranges, keyed, searchContext, parent, cardinality, metadata); } + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/range/DateRangeAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/range/DateRangeAggregatorFactory.java index d243a89c632d7..dcf6b84164991 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/range/DateRangeAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/range/DateRangeAggregatorFactory.java @@ -72,4 +72,8 @@ public DateRangeAggregatorFactory( ); } + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/range/GeoDistanceRangeAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/range/GeoDistanceRangeAggregatorFactory.java index 3208d35c6a407..728f43094cf7e 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/range/GeoDistanceRangeAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/range/GeoDistanceRangeAggregatorFactory.java @@ -172,6 +172,11 @@ protected Aggregator doCreateInternal( ); } + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } + /** * The source location for the distance calculation * diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/range/RangeAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/range/RangeAggregatorFactory.java index 803bceaf57fb5..c58b2e881803c 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/range/RangeAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/range/RangeAggregatorFactory.java @@ -73,4 +73,9 @@ public RangeAggregatorFactory( metadata ); } + + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/sampler/DiversifiedAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/sampler/DiversifiedAggregatorFactory.java index 41ef823a375c0..5f81c76b69385 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/sampler/DiversifiedAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/sampler/DiversifiedAggregatorFactory.java @@ -159,4 +159,9 @@ public InternalAggregation buildEmptyAggregation() { } }; } + + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/sampler/SamplerAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/sampler/SamplerAggregatorFactory.java index fa98c799352a6..d3db8a66ee21f 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/sampler/SamplerAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/sampler/SamplerAggregatorFactory.java @@ -73,4 +73,8 @@ public Aggregator createInternal( return new SamplerAggregator(name, shardSize, factories, searchContext, parent, metadata); } + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MultiTermsAggregationFactory.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MultiTermsAggregationFactory.java index aa6da630aa9f3..7134999e4aa85 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MultiTermsAggregationFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MultiTermsAggregationFactory.java @@ -157,6 +157,11 @@ protected Aggregator createInternal( ); } + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } + /** * Supplier for internal values source * diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/RareTermsAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/RareTermsAggregatorFactory.java index 93b8eca370d46..b5f3abe89ac59 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/RareTermsAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/RareTermsAggregatorFactory.java @@ -237,6 +237,11 @@ protected Aggregator doCreateInternal( ); } + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } + /** * Execution mode for rare terms agg * diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java index 54fb746b97ebb..f6802a58dfed2 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java @@ -306,6 +306,11 @@ protected Aggregator doCreateInternal( ); } + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } + /** * The execution mode for the significant terms agg * diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantTextAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantTextAggregatorFactory.java index 8acc69083dea4..81366c212c86c 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantTextAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantTextAggregatorFactory.java @@ -312,4 +312,9 @@ public void close() { Releasables.close(dupSequenceSpotters); } } + + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregatorFactory.java index 75419b7c64b12..0a09fae1eaebe 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregatorFactory.java @@ -90,4 +90,9 @@ protected Aggregator doCreateInternal( .getAggregator(AvgAggregationBuilder.REGISTRY_KEY, config) .build(name, config, searchContext, parent, metadata); } + + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorFactory.java index 47084436d3d4f..980667b45324e 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorFactory.java @@ -89,6 +89,11 @@ protected Aggregator doCreateInternal( .build(name, config, precision(), searchContext, parent, metadata); } + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } + private int precision() { return precisionThreshold == null ? HyperLogLogPlusPlus.DEFAULT_PRECISION diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/ExtendedStatsAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/ExtendedStatsAggregatorFactory.java index 20203b22b2459..99b3d09517a1f 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/ExtendedStatsAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/ExtendedStatsAggregatorFactory.java @@ -94,4 +94,9 @@ protected Aggregator doCreateInternal( .getAggregator(ExtendedStatsAggregationBuilder.REGISTRY_KEY, config) .build(name, config, searchContext, parent, sigma, metadata); } + + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/GeoCentroidAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/GeoCentroidAggregatorFactory.java index 1d450eeae98d8..a3fc91c6b62fb 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/GeoCentroidAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/GeoCentroidAggregatorFactory.java @@ -81,6 +81,11 @@ protected Aggregator doCreateInternal( .build(name, config, searchContext, parent, metadata); } + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } + static void registerAggregators(ValuesSourceRegistry.Builder builder) { builder.register(GeoCentroidAggregationBuilder.REGISTRY_KEY, CoreValuesSourceType.GEOPOINT, GeoCentroidAggregator::new, true); } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregatorFactory.java index 96f1af94f2d07..4fe936c8b7797 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregatorFactory.java @@ -90,4 +90,9 @@ protected Aggregator doCreateInternal( .getAggregator(MaxAggregationBuilder.REGISTRY_KEY, config) .build(name, config, searchContext, parent, metadata); } + + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/MedianAbsoluteDeviationAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/MedianAbsoluteDeviationAggregatorFactory.java index 9776595d5a76d..3ef3c2afc7875 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/MedianAbsoluteDeviationAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/MedianAbsoluteDeviationAggregatorFactory.java @@ -95,4 +95,9 @@ protected Aggregator doCreateInternal( .getAggregator(MedianAbsoluteDeviationAggregationBuilder.REGISTRY_KEY, config) .build(name, config.getValuesSource(), config.format(), searchContext, parent, metadata, compression); } + + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregatorFactory.java index b117f70c81baf..58fbe5edefd12 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregatorFactory.java @@ -90,4 +90,9 @@ protected Aggregator doCreateInternal( .getAggregator(MinAggregationBuilder.REGISTRY_KEY, config) .build(name, config, searchContext, parent, metadata); } + + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/PercentileRanksAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/PercentileRanksAggregatorFactory.java index 19352d30a5177..d3c18bcad1a43 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/PercentileRanksAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/PercentileRanksAggregatorFactory.java @@ -111,4 +111,9 @@ protected Aggregator doCreateInternal( .getAggregator(PercentileRanksAggregationBuilder.REGISTRY_KEY, config) .build(name, config.getValuesSource(), searchContext, parent, percents, percentilesConfig, keyed, config.format(), metadata); } + + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/PercentilesAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/PercentilesAggregatorFactory.java index e249863e25313..148e26e038923 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/PercentilesAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/PercentilesAggregatorFactory.java @@ -103,4 +103,9 @@ protected Aggregator doCreateInternal( .getAggregator(PercentilesAggregationBuilder.REGISTRY_KEY, config) .build(name, config.getValuesSource(), searchContext, parent, percents, percentilesConfig, keyed, config.format(), metadata); } + + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/ScriptedMetricAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/ScriptedMetricAggregatorFactory.java index 5c831d60f75a8..58ef54ed64482 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/ScriptedMetricAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/ScriptedMetricAggregatorFactory.java @@ -124,6 +124,11 @@ public Aggregator createInternal( ); } + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } + private static Script deepCopyScript(Script script, SearchContext context, Map aggParams) { if (script != null) { Map params = mergeParams(aggParams, deepCopyParams(script.getParams(), context)); diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/StatsAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/StatsAggregatorFactory.java index 0c10df174efa0..0e96e631044dd 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/StatsAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/StatsAggregatorFactory.java @@ -90,4 +90,9 @@ protected Aggregator doCreateInternal( .getAggregator(StatsAggregationBuilder.REGISTRY_KEY, config) .build(name, config, searchContext, parent, metadata); } + + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregatorFactory.java index b3506ff958833..ef9b93920ba18 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregatorFactory.java @@ -90,4 +90,9 @@ protected Aggregator doCreateInternal( .getAggregator(SumAggregationBuilder.REGISTRY_KEY, config) .build(name, config, searchContext, parent, metadata); } + + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/TopHitsAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/TopHitsAggregatorFactory.java index e312983cd6d24..ba371327c6893 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/TopHitsAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/TopHitsAggregatorFactory.java @@ -155,4 +155,8 @@ public Aggregator createInternal( return new TopHitsAggregator(searchContext.fetchPhase(), subSearchContext, name, searchContext, parent, metadata); } + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregatorFactory.java index feed42e911856..4a04dd2e0a932 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregatorFactory.java @@ -84,4 +84,9 @@ protected Aggregator doCreateInternal( .getAggregator(ValueCountAggregationBuilder.REGISTRY_KEY, config) .build(name, config, searchContext, parent, metadata); } + + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/WeightedAvgAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/WeightedAvgAggregatorFactory.java index 9a27e9801d5fe..111245cae99e5 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/WeightedAvgAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/WeightedAvgAggregatorFactory.java @@ -95,4 +95,9 @@ protected Aggregator doCreateInternal( public String getStatsSubtype() { return configs.get(VALUE_FIELD.getPreferredName()).valueSourceType().typeName(); } + + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } } diff --git a/test/external-modules/delayed-aggs/src/main/java/org/opensearch/search/aggregations/DelayedShardAggregationBuilder.java b/test/external-modules/delayed-aggs/src/main/java/org/opensearch/search/aggregations/DelayedShardAggregationBuilder.java index 02952eb7390dc..006632ca93925 100644 --- a/test/external-modules/delayed-aggs/src/main/java/org/opensearch/search/aggregations/DelayedShardAggregationBuilder.java +++ b/test/external-modules/delayed-aggs/src/main/java/org/opensearch/search/aggregations/DelayedShardAggregationBuilder.java @@ -137,6 +137,11 @@ protected Aggregator createInternal( } while (searchContext.getRelativeTimeInMillis() - start < delay.getMillis()); return factory.create(searchContext, parent, cardinality); } + + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } }; } diff --git a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java index 3a6147850f090..5c649f1dc832d 100644 --- a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java @@ -1059,6 +1059,11 @@ public InternalAggregation buildEmptyAggregation() { } }; } + + @Override + protected boolean supportsConcurrentSegmentSearch() { + return true; + } }; } From e19e3db69af2d7bd141439dabd9d67da28dc40b8 Mon Sep 17 00:00:00 2001 From: Jay Deng Date: Thu, 24 Aug 2023 18:45:36 -0700 Subject: [PATCH 4/4] Addressing feedback Signed-off-by: Jay Deng --- .../aggregations/ParentAggregatorFactory.java | 1 + .../opensearch/search/DefaultSearchContext.java | 17 ++++++++--------- .../org/opensearch/search/SearchService.java | 2 +- .../aggregations/AggregatorFactories.java | 9 +++++++-- .../search/aggregations/AggregatorFactory.java | 12 +++++------- .../opensearch/search/SearchServiceTests.java | 2 ++ 6 files changed, 24 insertions(+), 19 deletions(-) diff --git a/modules/parent-join/src/main/java/org/opensearch/join/aggregations/ParentAggregatorFactory.java b/modules/parent-join/src/main/java/org/opensearch/join/aggregations/ParentAggregatorFactory.java index 008ae3106641d..9a21cd1db3200 100644 --- a/modules/parent-join/src/main/java/org/opensearch/join/aggregations/ParentAggregatorFactory.java +++ b/modules/parent-join/src/main/java/org/opensearch/join/aggregations/ParentAggregatorFactory.java @@ -121,6 +121,7 @@ public String getStatsSubtype() { @Override protected boolean supportsConcurrentSegmentSearch() { + // See https://github.com/opensearch-project/OpenSearch/issues/9316 return false; } } diff --git a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java index a9d23f4c60529..ef8a6c9f36b0c 100644 --- a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java @@ -45,6 +45,7 @@ import org.opensearch.action.search.SearchType; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; +import org.opensearch.common.SetOnce; import org.opensearch.common.lease.Releasables; import org.opensearch.common.lucene.search.Queries; import org.opensearch.common.unit.TimeValue; @@ -65,7 +66,6 @@ import org.opensearch.index.search.NestedHelper; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.similarity.SimilarityService; -import org.opensearch.search.aggregations.AggregatorFactory; import org.opensearch.search.aggregations.BucketCollectorProcessor; import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.SearchContextAggregations; @@ -185,7 +185,7 @@ final class DefaultSearchContext extends SearchContext { private final FetchPhase fetchPhase; private final Function requestToAggReduceContextBuilder; private final boolean concurrentSearchSettingsEnabled; - private boolean requestShouldUseConcurrentSearch = true; + private final SetOnce requestShouldUseConcurrentSearch = new SetOnce<>(); DefaultSearchContext( ReaderContext readerContext, @@ -878,11 +878,12 @@ public Profilers getProfilers() { } /** - * Returns concurrent segment search status for the search context + * Returns concurrent segment search status for the search context. This should only be used after request parsing, during which requestShouldUseConcurrentSearch will be set. */ @Override public boolean shouldUseConcurrentSearch() { - return concurrentSearchSettingsEnabled && requestShouldUseConcurrentSearch; + assert requestShouldUseConcurrentSearch.get() != null : "requestShouldUseConcurrentSearch must be set"; + return concurrentSearchSettingsEnabled && Boolean.TRUE.equals(requestShouldUseConcurrentSearch.get()); } /** @@ -890,11 +891,9 @@ public boolean shouldUseConcurrentSearch() { */ public void evaluateRequestShouldUseConcurrentSearch() { if (aggregations() != null && aggregations().factories() != null) { - for (AggregatorFactory factory : aggregations().factories().getFactories()) { - if (factory.traverseChildFactories() == false) { - requestShouldUseConcurrentSearch = false; - } - } + requestShouldUseConcurrentSearch.set(aggregations().factories().allFactoriesSupportConcurrentSearch()); + } else { + requestShouldUseConcurrentSearch.set(true); } } diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index 761a8efe1f669..a02f9601eb093 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -1237,6 +1237,7 @@ private void processFailure(ReaderContext context, Exception exc) { private void parseSource(DefaultSearchContext context, SearchSourceBuilder source, boolean includeAggregations) { // nothing to parse... if (source == null) { + context.evaluateRequestShouldUseConcurrentSearch(); return; } SearchShardTarget shardTarget = context.shardTarget(); @@ -1420,7 +1421,6 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc if (source.profile()) { context.setProfilers(new Profilers(context.searcher(), context.shouldUseConcurrentSearch())); } - logger.info("Using concurrent search for request: " + context.shouldUseConcurrentSearch()); } /** diff --git a/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java b/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java index cb71131b6dfd7..9b8ebe0b4e5e4 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java +++ b/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java @@ -257,8 +257,13 @@ private AggregatorFactories(AggregatorFactory[] factories) { this.factories = factories; } - public AggregatorFactory[] getFactories() { - return factories; + public boolean allFactoriesSupportConcurrentSearch() { + for (AggregatorFactory factory : factories) { + if (factory.supportsConcurrentSegmentSearch() == false || factory.evaluateChildFactories() == false) { + return false; + } + } + return true; } /** diff --git a/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactory.java index 53943f0097f34..759d043743978 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactory.java @@ -115,16 +115,14 @@ public String getStatsSubtype() { return OTHER_SUBTYPE; } + /** + * Implementation should override this method and return true if the Aggregator created by the factory works with concurrent segment search execution model + */ protected boolean supportsConcurrentSegmentSearch() { return false; } - public boolean traverseChildFactories() { - for (AggregatorFactory aggregatorFactory : factories.getFactories()) { - if (aggregatorFactory.traverseChildFactories() == false) { - return false; - } - } - return supportsConcurrentSegmentSearch(); + public boolean evaluateChildFactories() { + return factories.allFactoriesSupportConcurrentSearch(); } } diff --git a/server/src/test/java/org/opensearch/search/SearchServiceTests.java b/server/src/test/java/org/opensearch/search/SearchServiceTests.java index d2b837b956e08..dc0eb62b9f0e5 100644 --- a/server/src/test/java/org/opensearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/opensearch/search/SearchServiceTests.java @@ -1274,6 +1274,7 @@ public void testConcurrentSegmentSearchSearchContext() throws IOException { .get() .getSetting(index, IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey()) ); + searchContext.evaluateRequestShouldUseConcurrentSearch(); assertEquals(concurrentSearchEnabled, searchContext.shouldUseConcurrentSearch()); // verify executor nullability with concurrent search enabled/disabled if (concurrentSearchEnabled) { @@ -1328,6 +1329,7 @@ public void testConcurrentSegmentSearchIsSetOnceDuringContextCreation() throws I .get(); try (DefaultSearchContext searchContext = service.createSearchContext(request, new TimeValue(System.currentTimeMillis()))) { // verify concurrent search state in context + searchContext.evaluateRequestShouldUseConcurrentSearch(); assertEquals(concurrentSearchSetting, searchContext.shouldUseConcurrentSearch()); // verify executor state in searcher assertEquals(concurrentSearchSetting, (searchContext.searcher().getExecutor() != null));