From cadf9748fe6cf4a3510ff8158738438ea9293c2f Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Mon, 15 Apr 2024 17:57:01 -0700 Subject: [PATCH] Add cluster setting to dynamically configure filter rewrite optimization (#13179) --------- Signed-off-by: bowenlan-amzn --- CHANGELOG.md | 18 ++++++++++ .../aggregations/bucket/FilterRewriteIT.java | 36 ++++++++++++++++++- .../common/settings/ClusterSettings.java | 1 + .../search/DefaultSearchContext.java | 15 ++++++++ .../org/opensearch/search/SearchService.java | 9 +++++ .../bucket/FastFilterRewriteHelper.java | 13 ++++--- .../search/internal/SearchContext.java | 4 +++ 7 files changed, 90 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 607a46ea79a8c..057a998e332d3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,24 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## [Unreleased 2.12.x] ### Added +- Constant Keyword Field ([#12285](https://github.com/opensearch-project/OpenSearch/pull/12285)) +- Convert ingest processor supports ip type ([#12818](https://github.com/opensearch-project/OpenSearch/pull/12818)) +- Add a counter to node stat api to track shard going from idle to non-idle ([#12768](https://github.com/opensearch-project/OpenSearch/pull/12768)) +- Allow setting KEYSTORE_PASSWORD through env variable ([#12865](https://github.com/opensearch-project/OpenSearch/pull/12865)) +- [Concurrent Segment Search] Perform buildAggregation concurrently and support Composite Aggregations ([#12697](https://github.com/opensearch-project/OpenSearch/pull/12697)) +- [Concurrent Segment Search] Disable concurrent segment search for system indices and throttled requests ([#12954](https://github.com/opensearch-project/OpenSearch/pull/12954)) +- [Tiered Caching] Make took time caching policy setting dynamic ([#13063](https://github.com/opensearch-project/OpenSearch/pull/13063)) +- Derived fields support to derive field values at query time without indexing ([#12569](https://github.com/opensearch-project/OpenSearch/pull/12569)) +- Detect breaking changes on pull requests ([#9044](https://github.com/opensearch-project/OpenSearch/pull/9044)) +- Add cluster primary balance contraint for rebalancing with buffer ([#12656](https://github.com/opensearch-project/OpenSearch/pull/12656)) +- [Remote Store] Make translog transfer timeout configurable ([#12704](https://github.com/opensearch-project/OpenSearch/pull/12704)) +- Reject Resize index requests (i.e, split, shrink and clone), While DocRep to SegRep migration is in progress.([#12686](https://github.com/opensearch-project/OpenSearch/pull/12686)) +- Add support for more than one protocol for transport ([#12967](https://github.com/opensearch-project/OpenSearch/pull/12967)) +- [Tiered Caching] Add dimension-based stats to ICache implementations. ([#12531](https://github.com/opensearch-project/OpenSearch/pull/12531)) +- Add changes for overriding remote store and replication settings during snapshot restore. ([#11868](https://github.com/opensearch-project/OpenSearch/pull/11868)) +- Add an individual setting of rate limiter for segment replication ([#12959](https://github.com/opensearch-project/OpenSearch/pull/12959)) +- [Streaming Indexing] Ensure support of the new transport by security plugin ([#13174](https://github.com/opensearch-project/OpenSearch/pull/13174)) +- Add cluster setting to dynamically configure the buckets for filter rewrite optimization. ([#13179](https://github.com/opensearch-project/OpenSearch/pull/13179)) ### Dependencies diff --git a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/FilterRewriteIT.java b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/FilterRewriteIT.java index 35e024603f0f2..0789e771a25da 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/FilterRewriteIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/FilterRewriteIT.java @@ -10,6 +10,7 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; import org.opensearch.action.index.IndexRequestBuilder; import org.opensearch.action.search.SearchResponse; import org.opensearch.common.settings.Settings; @@ -34,6 +35,7 @@ import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING; +import static org.opensearch.search.SearchService.MAX_AGGREGATION_REWRITE_FILTERS; import static org.opensearch.search.aggregations.AggregationBuilders.dateHistogram; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @@ -93,7 +95,7 @@ public void testMinDocCountOnDateHistogram() throws Exception { final SearchResponse allResponse = client().prepareSearch("idx") .setSize(0) .setQuery(QUERY) - .addAggregation(dateHistogram("histo").field("date").dateHistogramInterval(DateHistogramInterval.DAY).minDocCount(0)) + .addAggregation(dateHistogram("histo").field("date").calendarInterval(DateHistogramInterval.DAY).minDocCount(0)) .get(); final Histogram allHisto = allResponse.getAggregations().get("histo"); @@ -104,4 +106,36 @@ public void testMinDocCountOnDateHistogram() throws Exception { assertEquals(entry.getValue(), results.get(entry.getKey())); } } + + public void testDisableOptimizationGivesSameResults() throws Exception { + SearchResponse response = client().prepareSearch("idx") + .setSize(0) + .addAggregation(dateHistogram("histo").field("date").calendarInterval(DateHistogramInterval.DAY).minDocCount(0)) + .get(); + + final Histogram allHisto1 = response.getAggregations().get("histo"); + + final ClusterUpdateSettingsResponse updateSettingResponse = client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(MAX_AGGREGATION_REWRITE_FILTERS.getKey(), 0)) + .get(); + + assertEquals(updateSettingResponse.getTransientSettings().get(MAX_AGGREGATION_REWRITE_FILTERS.getKey()), "0"); + + response = client().prepareSearch("idx") + .setSize(0) + .addAggregation(dateHistogram("histo").field("date").calendarInterval(DateHistogramInterval.DAY).minDocCount(0)) + .get(); + + final Histogram allHisto2 = response.getAggregations().get("histo"); + + assertEquals(allHisto1, allHisto2); + + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().putNull(MAX_AGGREGATION_REWRITE_FILTERS.getKey())) + .get(); + } } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index e79112750f69d..9ff2f8760f1b2 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -516,6 +516,7 @@ public void apply(Settings value, Settings current, Settings previous) { SearchService.MAX_OPEN_SCROLL_CONTEXT, SearchService.MAX_OPEN_PIT_CONTEXT, SearchService.MAX_PIT_KEEPALIVE_SETTING, + SearchService.MAX_AGGREGATION_REWRITE_FILTERS, CreatePitController.PIT_INIT_KEEP_ALIVE, Node.WRITE_PORTS_FILE_SETTING, Node.NODE_NAME_SETTING, diff --git a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java index 498d763c00db5..cd836163c5f13 100644 --- a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java @@ -107,6 +107,7 @@ import java.util.function.LongSupplier; import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING; +import static org.opensearch.search.SearchService.MAX_AGGREGATION_REWRITE_FILTERS; /** * The main search context used during search phase @@ -185,6 +186,7 @@ final class DefaultSearchContext extends SearchContext { private final Function requestToAggReduceContextBuilder; private final boolean concurrentSearchSettingsEnabled; private final SetOnce requestShouldUseConcurrentSearch = new SetOnce<>(); + private final int maxAggRewriteFilters; DefaultSearchContext( ReaderContext readerContext, @@ -238,6 +240,8 @@ final class DefaultSearchContext extends SearchContext { queryBoost = request.indexBoost(); this.lowLevelCancellation = lowLevelCancellation; this.requestToAggReduceContextBuilder = requestToAggReduceContextBuilder; + + this.maxAggRewriteFilters = evaluateFilterRewriteSetting(); } @Override @@ -977,4 +981,15 @@ public int getTargetMaxSliceCount() { return clusterService.getClusterSettings().get(SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING); } + @Override + public int maxAggRewriteFilters() { + return maxAggRewriteFilters; + } + + private int evaluateFilterRewriteSetting() { + if (clusterService != null) { + return clusterService.getClusterSettings().get(MAX_AGGREGATION_REWRITE_FILTERS); + } + return 0; + } } diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index 572ef2034ba38..7f11283c9af80 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -270,6 +270,15 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv Property.NodeScope ); + // value 0 means rewrite filters optimization in aggregations will be disabled + public static final Setting MAX_AGGREGATION_REWRITE_FILTERS = Setting.intSetting( + "search.max_aggregation_rewrite_filters", + 72, + 0, + Property.Dynamic, + Property.NodeScope + ); + public static final int DEFAULT_SIZE = 10; public static final int DEFAULT_FROM = 0; diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java index ea2cb36add9b2..80816c026b1e3 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java @@ -68,7 +68,6 @@ private FastFilterRewriteHelper() {} private static final Logger logger = LogManager.getLogger(FastFilterRewriteHelper.class); - private static final int MAX_NUM_FILTER_BUCKETS = 1024; private static final Map, Function> queryWrappers; // Initialize the wrapper map for unwrapping the query @@ -191,8 +190,9 @@ private static Weight[] createFilterForAggregations( int bucketCount = 0; while (roundedLow <= fieldType.convertNanosToMillis(high)) { bucketCount++; - if (bucketCount > MAX_NUM_FILTER_BUCKETS) { - logger.debug("Max number of filters reached [{}], skip the fast filter optimization", MAX_NUM_FILTER_BUCKETS); + int maxNumFilterBuckets = context.maxAggRewriteFilters(); + if (bucketCount > maxNumFilterBuckets) { + logger.debug("Max number of filters reached [{}], skip the fast filter optimization", maxNumFilterBuckets); return null; } // Below rounding is needed as the interval could return in @@ -272,6 +272,8 @@ public void setAggregationType(AggregationType aggregationType) { } public boolean isRewriteable(final Object parent, final int subAggLength) { + if (context.maxAggRewriteFilters() == 0) return false; + boolean rewriteable = aggregationType.isRewriteable(parent, subAggLength); logger.debug("Fast filter rewriteable: {} for shard {}", rewriteable, context.indexShard().shardId()); this.rewriteable = rewriteable; @@ -588,8 +590,9 @@ private static long[][] createRangesFromAgg( int bucketCount = 0; while (roundedLow <= fieldType.convertNanosToMillis(high)) { bucketCount++; - if (bucketCount > MAX_NUM_FILTER_BUCKETS) { - logger.debug("Max number of filters reached [{}], skip the fast filter optimization", MAX_NUM_FILTER_BUCKETS); + int maxNumFilterBuckets = context.maxAggRewriteFilters(); + if (bucketCount > maxNumFilterBuckets) { + logger.debug("Max number of filters reached [{}], skip the fast filter optimization", maxNumFilterBuckets); return null; } // Below rounding is needed as the interval could return in diff --git a/server/src/main/java/org/opensearch/search/internal/SearchContext.java b/server/src/main/java/org/opensearch/search/internal/SearchContext.java index 57f5dc955a5da..ff45c44216b75 100644 --- a/server/src/main/java/org/opensearch/search/internal/SearchContext.java +++ b/server/src/main/java/org/opensearch/search/internal/SearchContext.java @@ -489,4 +489,8 @@ public String toString() { public abstract boolean shouldUseTimeSeriesDescSortOptimization(); public abstract int getTargetMaxSliceCount(); + + public int maxAggRewriteFilters() { + return 0; + } }