diff --git a/server/src/internalClusterTest/java/org/opensearch/search/stats/ResourceTrackerIT.java b/server/src/internalClusterTest/java/org/opensearch/search/stats/ResourceTrackerIT.java new file mode 100644 index 0000000000000..84b9a00a07a65 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/search/stats/ResourceTrackerIT.java @@ -0,0 +1,202 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.stats; + +import org.junit.AfterClass; +import org.opensearch.action.index.IndexRequestBuilder; +import org.opensearch.action.search.SearchRequestBuilder; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.metrics.ResourceTracker; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexModule; +import org.opensearch.index.query.MatchQueryBuilder; +import org.opensearch.index.shard.SearchOperationListener; +import org.opensearch.indices.IndicesRequestCache; +import org.opensearch.plugins.Plugin; +import org.opensearch.search.SearchService; +import org.opensearch.search.internal.SearchContext; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.opensearch.index.query.QueryBuilders.matchQuery; +import static org.opensearch.search.aggregations.AggregationBuilders.terms; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse; + +@OpenSearchIntegTestCase.SuiteScopeTestCase +public class ResourceTrackerIT extends OpenSearchIntegTestCase { + + String index = "idx"; + + @Override + public void setupSuiteScopeCluster() throws Exception { + assertAcked( + client().admin() + .indices() + .prepareCreate(index) + .addMapping("type", "tag", "type=keyword", "value", "type=keyword") + .setSettings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), false) + ) + ); + + assertAcked( + client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(Collections.singletonMap(SearchService.RESOURCE_TRACKING_SETTING.getKey(), true)) + ); + + addDocuments(index, 100); + } + + public void testToggleResourceTrackingSetting() { + assertAcked( + client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(Collections.singletonMap(SearchService.RESOURCE_TRACKING_SETTING.getKey(), false)) + ); + + SearchRequestBuilder queryBuilder = client().prepareSearch(index).setQuery(new MatchQueryBuilder("tag", "tag99")).setSize(5); + assertSearchResponse(queryBuilder.get()); + assertNull(SearchResourceTrackerPlugin.queryTracker); + assertNull(SearchResourceTrackerPlugin.fetchTracker); + + assertAcked( + client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(Collections.singletonMap(SearchService.RESOURCE_TRACKING_SETTING.getKey(), true)) + ); + assertSearchResponse(queryBuilder.get()); + assertNotNull(SearchResourceTrackerPlugin.queryTracker); + assertNotNull(SearchResourceTrackerPlugin.fetchTracker); + } + + public void testQueryPhaseMemoryTracking() { + // While exact memory usage per query is hard to assert and will vary by platform, + // this test runs 3 increasingly expensive queries and validates the ordering of memory usage + SearchRequestBuilder simpleMatchQuery = client().prepareSearch(index).setQuery(new MatchQueryBuilder("tag", "tag1")).setSize(0); + long simpleMatchMemory = getResourcesForQuery(simpleMatchQuery, false); + + SearchRequestBuilder termsAggQuery = client().prepareSearch(index).setSize(0).addAggregation(terms("values").field("value")); + long termsAggMemory = getResourcesForQuery(termsAggQuery, false); + assertTrue( + "termsAggregation memory consumption of [" + + termsAggMemory + + "] was found to be lower than simple match query [" + + simpleMatchMemory + + "]", + termsAggMemory > simpleMatchMemory + ); + + SearchRequestBuilder multiLevelAggQuery = client().prepareSearch(index) + .setSize(0) + .addAggregation(terms("values").field("value").subAggregation(terms("tagSub").field("tag"))); + long multiLevelAggMemory = getResourcesForQuery(multiLevelAggQuery, false); + assert multiLevelAggMemory > termsAggMemory; + } + + public void testFetchPhaseMemoryTracking() { + SearchRequestBuilder simpleMatchQuery5 = client().prepareSearch(index).setQuery(new MatchQueryBuilder("tag", "tag99")).setSize(5); + long simpleMatchMemory5 = getResourcesForQuery(simpleMatchQuery5, true); + + SearchRequestBuilder simpleMatchQuery100 = client().prepareSearch(index) + .setQuery(new MatchQueryBuilder("tag", "tag99")) + .setSize(50); + long simpleMatchMemory100 = getResourcesForQuery(simpleMatchQuery100, true); + assert simpleMatchMemory100 > simpleMatchMemory5; + + SearchRequestBuilder simpleMatchQuery300 = client().prepareSearch(index).setQuery(matchQuery("tag", "tag99")).setSize(100); + long simpleMatchMemory300 = getResourcesForQuery(simpleMatchQuery300, true); + assert simpleMatchMemory300 > simpleMatchMemory100; + } + + // Helper that ingests duplicate docs + private void addDocuments(String index, int count) throws Exception { + List builders = new ArrayList<>(); + for (int i = 0; i < count; i++) { + for (int j = 0; j < i; j++) { + builders.add( + client().prepareIndex(index, "type") + .setSource(jsonBuilder().startObject().field("value", "val-" + i).field("tag", "tag" + i).endObject()) + ); + } + } + indexRandom(true, builders); + } + + // helper that returns the memory allocated by a query + private long getResourcesForQuery(SearchRequestBuilder request, boolean fetchPhase) { + // we are doing some warm-up runs as first run uses showing higher memory + // TODO: why first run in test takes more memory - points towards caching but cache is being cleared after each run + // is there some other initialization overhead? buy a beer to anyone who can help answer this + for (int i = 0; i < 2; i++) { + assertSearchResponse(request.get()); + client().admin().indices().prepareClearCache("*").setFieldDataCache(true).setQueryCache(true).setRequestCache(true).execute(); + } + return fetchPhase ? SearchResourceTrackerPlugin.fetchMemorySnapshot : SearchResourceTrackerPlugin.queryMemorySnapshot; + } + + @AfterClass + public static void cleanup() throws Exception { + assertAcked( + client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().putNull(SearchService.RESOURCE_TRACKING_SETTING.getKey())) + ); + } + + @Override + protected Collection> nodePlugins() { + return Collections.singleton(SearchResourceTrackerPlugin.class); + } + + // Helper plugin tracks the resource tracker against the last shard that was queried on the node + public static class SearchResourceTrackerPlugin extends Plugin { + public static ResourceTracker fetchTracker; + public static ResourceTracker queryTracker; + + // As query and fetch trackers are references which get reset, capture last snapshots here + public static long fetchMemorySnapshot; + public static long queryMemorySnapshot; + + @Override + public void onIndexModule(IndexModule indexModule) { + indexModule.addSearchOperationListener(new SearchOperationListener() { + @Override + public void onQueryPhase(SearchContext searchContext, long tookInNanos) { + queryTracker = searchContext.resourceTracker(); + if (queryTracker != null) { + queryMemorySnapshot = queryTracker.getMemoryAllocated(); + } + } + + @Override + public void onFetchPhase(SearchContext searchContext, long tookInNanos) { + fetchTracker = searchContext.resourceTracker(); + if (fetchTracker != null) { + fetchMemorySnapshot = fetchTracker.getMemoryAllocated(); + } + } + }); + super.onIndexModule(indexModule); + } + } +} diff --git a/server/src/main/java/org/opensearch/common/metrics/ResourceTracker.java b/server/src/main/java/org/opensearch/common/metrics/ResourceTracker.java new file mode 100644 index 0000000000000..9d3edb5e7e4b9 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/metrics/ResourceTracker.java @@ -0,0 +1,71 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.metrics; + +import com.sun.management.ThreadMXBean; +import org.opensearch.common.SuppressForbidden; + +import java.lang.management.ManagementFactory; + +/* + Simple tracker for CPU consumption and memory allocated by the current thread +*/ +@SuppressForbidden(reason = "ThreadMXBean enables tracking resource consumption by a thread. " + + "It is platform dependent and i am not aware of an alternate mechanism to extract this info") +public class ResourceTracker { + + static ThreadMXBean threadMXBean = (ThreadMXBean) ManagementFactory.getThreadMXBean(); + long startingAllocatedBytes; + long startingCPUTime; + long memoryAllocated; + long cpuTime; + + public ResourceTracker() { + reset(); + } + + /** + * Takes current snapshot of resource usage by thread since the creation of this object + */ + public void updateMetrics() { + this.memoryAllocated = threadMXBean.getThreadAllocatedBytes(Thread.currentThread().getId()) - startingAllocatedBytes; + this.cpuTime = threadMXBean.getCurrentThreadCpuTime() - startingCPUTime; + } + + /** + * Wipes out local state and resets to a clean tracker + */ + public void reset() { + this.startingCPUTime = threadMXBean.getCurrentThreadCpuTime(); + this.startingAllocatedBytes = threadMXBean.getThreadAllocatedBytes(Thread.currentThread().getId()); + this.memoryAllocated = 0; + this.cpuTime = 0; + } + + /** + * Tracks CPU usage by the current thread + * @return cpu time in nanoseconds + */ + public long getCpuTime() { + return this.cpuTime; + } + + /** + * Returns memory allocated by the thread between object creation and last update of metrics + * @return memory allocated in bytes + */ + public long getMemoryAllocated() { + return this.memoryAllocated; + } + + @Override + public String toString() { + return "resource_tracker[memoryAllocatedBytes=" + memoryAllocated + ",cpuTime=" + cpuTime + "]"; + } +} diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 7a189ebc261a6..df1c0efb3d77a 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -349,6 +349,7 @@ public void apply(Settings value, Settings current, Settings previous) { MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, SearchService.DEFAULT_SEARCH_TIMEOUT_SETTING, SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS, + SearchService.RESOURCE_TRACKING_SETTING, ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING, TransportSearchAction.SHARD_COUNT_LIMIT_SETTING, TransportSearchAction.SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING, diff --git a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java index 9ee1e6c15a2e7..afd04c41d8c9f 100644 --- a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java @@ -46,6 +46,7 @@ import org.opensearch.common.Nullable; import org.opensearch.common.lease.Releasables; import org.opensearch.common.lucene.search.Queries; +import org.opensearch.common.metrics.ResourceTracker; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.BigArrays; import org.opensearch.index.IndexService; @@ -119,6 +120,7 @@ final class DefaultSearchContext extends SearchContext { // terminate after count private int terminateAfter = DEFAULT_TERMINATE_AFTER; private List groupStats; + private ResourceTracker resourceTracker; private boolean explain; private boolean version = false; // by default, we don't return versions private boolean seqAndPrimaryTerm = false; @@ -757,6 +759,17 @@ public void groupStats(List groupStats) { this.groupStats = groupStats; } + @Override + @Nullable + public ResourceTracker resourceTracker() { + return this.resourceTracker; + } + + @Override + public void resourceTracker(ResourceTracker resourceTracker) { + this.resourceTracker = resourceTracker; + } + @Override public boolean version() { return version; diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index 7dc993f4f1cd9..7fe0e589f8b5a 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -56,6 +56,7 @@ import org.opensearch.common.lease.Releasable; import org.opensearch.common.lease.Releasables; import org.opensearch.common.lucene.Lucene; +import org.opensearch.common.metrics.ResourceTracker; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; @@ -213,6 +214,16 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv Property.NodeScope ); + /** + * Enables the tracking of resource consumption per query. This is an experimental setting and underlyign implementation can change + */ + public static final Setting RESOURCE_TRACKING_SETTING = Setting.boolSetting( + "search.resource_tracking", + false, + Property.Dynamic, + Property.NodeScope + ); + public static final int DEFAULT_SIZE = 10; public static final int DEFAULT_FROM = 0; @@ -244,6 +255,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv private volatile boolean lowLevelCancellation; + private volatile boolean resourceTrackingEnabled; + private volatile int maxOpenScrollContext; private final Cancellable keepAliveReaper; @@ -257,6 +270,10 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv private final AtomicInteger openScrollContexts = new AtomicInteger(); private final String sessionId = UUIDs.randomBase64UUID(); + public boolean isResourceTrackingEnabled() { + return resourceTrackingEnabled; + } + public SearchService( ClusterService clusterService, IndicesService indicesService, @@ -302,6 +319,9 @@ public SearchService( lowLevelCancellation = LOW_LEVEL_CANCELLATION_SETTING.get(settings); clusterService.getClusterSettings().addSettingsUpdateConsumer(LOW_LEVEL_CANCELLATION_SETTING, this::setLowLevelCancellation); + + resourceTrackingEnabled = RESOURCE_TRACKING_SETTING.get(settings); + clusterService.getClusterSettings().addSettingsUpdateConsumer(RESOURCE_TRACKING_SETTING, this::setResourceTracking); } private void validateKeepAlives(TimeValue defaultKeepAlive, TimeValue maxKeepAlive) { @@ -348,6 +368,10 @@ private void setLowLevelCancellation(Boolean lowLevelCancellation) { this.lowLevelCancellation = lowLevelCancellation; } + private void setResourceTracking(Boolean resourceTrackingEnabled) { + this.resourceTrackingEnabled = resourceTrackingEnabled; + } + @Override public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRemovalReason reason) { // once an index is removed due to deletion or closing, we can just clean up all the pending search context information @@ -818,6 +842,9 @@ final SearchContext createContext( ) throws IOException { final DefaultSearchContext context = createSearchContext(readerContext, request, defaultSearchTimeout); try { + if (resourceTrackingEnabled) { + context.resourceTracker(new ResourceTracker()); + } if (request.scroll() != null) { context.scrollContext().scroll = request.scroll(); } @@ -1445,6 +1472,7 @@ private static final class SearchOperationListenerExecutor implements AutoClosea this.context = context; time = startTime; this.fetch = fetch; + resetResourceTracking(context); if (fetch) { listener.onPreFetchPhase(context); } else { @@ -1461,6 +1489,7 @@ public void close() { assert closed == false : "already closed - while technically ok double closing is a likely a bug in this case"; if (closed == false) { closed = true; + updateResourceTracking(context); if (afterQueryTime != -1) { if (fetch) { listener.onFetchPhase(context, afterQueryTime - time); @@ -1476,5 +1505,26 @@ public void close() { } } } + + private void updateResourceTracking(SearchContext context) { + if (context.resourceTracker() != null) { + context.resourceTracker().updateMetrics(); + if (logger.isTraceEnabled()) { + logger.trace( + "Resources used by action: {} on query: {} for shard {} = {}", + context.getTask().getAction(), + context.request().source(), + context.indexShard(), + context.resourceTracker().toString() + ); + } + } + } + + private void resetResourceTracking(SearchContext context) { + if (context.resourceTracker() != null) { + context.resourceTracker().reset(); + } + } } } diff --git a/server/src/main/java/org/opensearch/search/internal/SearchContext.java b/server/src/main/java/org/opensearch/search/internal/SearchContext.java index 7ff0eaed4be63..31c3a1480c70c 100644 --- a/server/src/main/java/org/opensearch/search/internal/SearchContext.java +++ b/server/src/main/java/org/opensearch/search/internal/SearchContext.java @@ -39,6 +39,7 @@ import org.opensearch.common.Nullable; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lease.Releasables; +import org.opensearch.common.metrics.ResourceTracker; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.BigArrays; import org.opensearch.index.cache.bitset.BitsetFilterCache; @@ -330,6 +331,11 @@ public final void assignRescoreDocIds(RescoreDocIds rescoreDocIds) { public abstract void groupStats(List groupStats); + @Nullable + public abstract ResourceTracker resourceTracker(); + + public abstract void resourceTracker(ResourceTracker resourceTracker); + public abstract boolean version(); public abstract void version(boolean version); diff --git a/server/src/main/java/org/opensearch/search/internal/SubSearchContext.java b/server/src/main/java/org/opensearch/search/internal/SubSearchContext.java index 842c6a514f112..593ca8c2532bf 100644 --- a/server/src/main/java/org/opensearch/search/internal/SubSearchContext.java +++ b/server/src/main/java/org/opensearch/search/internal/SubSearchContext.java @@ -32,6 +32,7 @@ package org.opensearch.search.internal; import org.apache.lucene.search.Query; +import org.opensearch.common.metrics.ResourceTracker; import org.opensearch.common.unit.TimeValue; import org.opensearch.index.query.ParsedQuery; import org.opensearch.search.aggregations.SearchContextAggregations; @@ -79,6 +80,7 @@ public class SubSearchContext extends FilteredSearchContext { private boolean trackScores; private boolean version; private boolean seqNoAndPrimaryTerm; + private ResourceTracker resourceTracker; public SubSearchContext(SearchContext context) { super(context); @@ -359,4 +361,14 @@ public FetchSearchResult fetchResult() { public long getRelativeTimeInMillis() { throw new UnsupportedOperationException("Not supported"); } + + @Override + public void resourceTracker(ResourceTracker resourceTracker) { + this.resourceTracker = resourceTracker; + } + + @Override + public ResourceTracker resourceTracker() { + return this.resourceTracker; + } } diff --git a/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java b/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java index a5c6eff15060d..6e7f54aa6c367 100644 --- a/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java +++ b/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java @@ -37,6 +37,7 @@ import org.opensearch.action.OriginalIndices; import org.opensearch.action.search.SearchShardTask; import org.opensearch.action.search.SearchType; +import org.opensearch.common.metrics.ResourceTracker; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.BigArrays; import org.opensearch.index.IndexService; @@ -109,6 +110,7 @@ public class TestSearchContext extends SearchContext { private int terminateAfter = DEFAULT_TERMINATE_AFTER; private SearchContextAggregations aggregations; private ScrollContext scrollContext; + private ResourceTracker resourceTracker; private final long originNanoTime = System.nanoTime(); private final Map searchExtBuilders = new HashMap<>(); @@ -387,6 +389,16 @@ public SearchContext trackTotalHitsUpTo(int trackTotalHitsUpTo) { return this; } + @Override + public void resourceTracker(ResourceTracker resourceTracker) { + this.resourceTracker = resourceTracker; + } + + @Override + public ResourceTracker resourceTracker() { + return this.resourceTracker; + } + @Override public int trackTotalHitsUpTo() { return trackTotalHitsUpTo;