From 93cc736cbd293a14c6eae83df2aeba8458335134 Mon Sep 17 00:00:00 2001 From: Ticheng Lin Date: Wed, 19 Jul 2023 23:36:42 -0700 Subject: [PATCH 1/4] Add support for aggregation profiler with concurrent aggregation (#8801) Signed-off-by: Ticheng Lin --- CHANGELOG.md | 1 + .../aggregation/AggregationProfilerIT.java | 167 ++++++++++++-- .../search/internal/ContextIndexSearcher.java | 4 + .../profile/AbstractProfileBreakdown.java | 14 +- .../search/profile/ProfileResult.java | 103 ++++++++- .../opensearch/search/profile/Profilers.java | 7 +- .../org/opensearch/search/profile/Timer.java | 13 +- .../AggregationProfileBreakdown.java | 15 ++ .../aggregation/AggregationProfiler.java | 23 +- .../ConcurrentAggregationProfiler.java | 216 ++++++++++++++++++ .../InternalAggregationProfileTree.java | 4 + .../search/profile/ProfileResultTests.java | 20 ++ .../opensearch/search/profile/TimerTests.java | 6 +- .../ConcurrentAggregationProfilerTests.java | 207 +++++++++++++++++ 14 files changed, 754 insertions(+), 46 deletions(-) create mode 100644 server/src/main/java/org/opensearch/search/profile/aggregation/ConcurrentAggregationProfiler.java create mode 100644 server/src/test/java/org/opensearch/search/profile/aggregation/ConcurrentAggregationProfilerTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 861674753861a..0158523877eea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -100,6 +100,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Exclude 'benchmarks' from codecov report ([#8805](https://github.com/opensearch-project/OpenSearch/pull/8805)) - [Refactor] MediaTypeParser to MediaTypeParserRegistry ([#8636](https://github.com/opensearch-project/OpenSearch/pull/8636)) - Create separate SourceLookup instance per segment slice in SignificantTextAggregatorFactory ([#8807](https://github.com/opensearch-project/OpenSearch/pull/8807)) +- Add support for aggregation profiler with concurrent aggregation ([#8801](https://github.com/opensearch-project/OpenSearch/pull/8801)) ### Deprecated diff --git a/server/src/internalClusterTest/java/org/opensearch/search/profile/aggregation/AggregationProfilerIT.java b/server/src/internalClusterTest/java/org/opensearch/search/profile/aggregation/AggregationProfilerIT.java index 0f08c537d74d8..95350bdb012e0 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/profile/aggregation/AggregationProfilerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/profile/aggregation/AggregationProfilerIT.java @@ -90,7 +90,52 @@ public class AggregationProfilerIT extends OpenSearchIntegTestCase { COLLECT + "_count", POST_COLLECTION + "_count", BUILD_AGGREGATION + "_count", - REDUCE + "_count" + REDUCE + "_count", + INITIALIZE + "_startTime", + BUILD_LEAF_COLLECTOR + "_startTime", + COLLECT + "_startTime", + POST_COLLECTION + "_startTime", + BUILD_AGGREGATION + "_startTime", + REDUCE + "_startTime" + ); + + private static final Set CONCURRENT_SEARCH_BREAKDOWN_KEYS = Set.of( + INITIALIZE, + BUILD_LEAF_COLLECTOR, + COLLECT, + POST_COLLECTION, + BUILD_AGGREGATION, + REDUCE, + INITIALIZE + "_count", + BUILD_LEAF_COLLECTOR + "_count", + COLLECT + "_count", + POST_COLLECTION + "_count", + BUILD_AGGREGATION + "_count", + REDUCE + "_count", + "max_" + INITIALIZE, + "max_" + BUILD_LEAF_COLLECTOR, + "max_" + COLLECT, + "max_" + POST_COLLECTION, + "max_" + BUILD_AGGREGATION, + "max_" + REDUCE, + "min_" + INITIALIZE, + "min_" + BUILD_LEAF_COLLECTOR, + "min_" + COLLECT, + "min_" + POST_COLLECTION, + "min_" + BUILD_AGGREGATION, + "min_" + REDUCE, + "avg_" + INITIALIZE, + "avg_" + BUILD_LEAF_COLLECTOR, + "avg_" + COLLECT, + "avg_" + POST_COLLECTION, + "avg_" + BUILD_AGGREGATION, + "avg_" + REDUCE, + "max_" + BUILD_LEAF_COLLECTOR + "_count", + "max_" + COLLECT + "_count", + "min_" + BUILD_LEAF_COLLECTOR + "_count", + "min_" + COLLECT + "_count", + "avg_" + BUILD_LEAF_COLLECTOR + "_count", + "avg_" + COLLECT + "_count" ); private static final String TOTAL_BUCKETS = "total_buckets"; @@ -169,7 +214,11 @@ public void testSimpleProfile() { assertThat(histoAggResult.getTime(), greaterThan(0L)); Map breakdown = histoAggResult.getTimeBreakdown(); assertThat(breakdown, notNullValue()); - assertThat(breakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + if (histoAggResult.isConcurrent()) { + assertThat(breakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); + } else { + assertThat(breakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + } assertThat(breakdown.get(INITIALIZE), greaterThan(0L)); assertThat(breakdown.get(COLLECT), greaterThan(0L)); assertThat(breakdown.get(BUILD_AGGREGATION).longValue(), greaterThan(0L)); @@ -212,7 +261,11 @@ public void testMultiLevelProfile() { assertThat(histoAggResult.getTime(), greaterThan(0L)); Map histoBreakdown = histoAggResult.getTimeBreakdown(); assertThat(histoBreakdown, notNullValue()); - assertThat(histoBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + if (histoAggResult.isConcurrent()) { + assertThat(histoBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); + } else { + assertThat(histoBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + } assertThat(histoBreakdown.get(INITIALIZE), greaterThan(0L)); assertThat(histoBreakdown.get(COLLECT), greaterThan(0L)); assertThat(histoBreakdown.get(BUILD_AGGREGATION), greaterThan(0L)); @@ -230,7 +283,11 @@ public void testMultiLevelProfile() { assertThat(termsAggResult.getTime(), greaterThan(0L)); Map termsBreakdown = termsAggResult.getTimeBreakdown(); assertThat(termsBreakdown, notNullValue()); - assertThat(termsBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + if (termsAggResult.isConcurrent()) { + assertThat(termsBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); + } else { + assertThat(termsBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + } assertThat(termsBreakdown.get(INITIALIZE), greaterThan(0L)); assertThat(termsBreakdown.get(COLLECT), greaterThan(0L)); assertThat(termsBreakdown.get(BUILD_AGGREGATION), greaterThan(0L)); @@ -245,7 +302,11 @@ public void testMultiLevelProfile() { assertThat(avgAggResult.getTime(), greaterThan(0L)); Map avgBreakdown = termsAggResult.getTimeBreakdown(); assertThat(avgBreakdown, notNullValue()); - assertThat(avgBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + if (avgAggResult.isConcurrent()) { + assertThat(avgBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); + } else { + assertThat(avgBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + } assertThat(avgBreakdown.get(INITIALIZE), greaterThan(0L)); assertThat(avgBreakdown.get(COLLECT), greaterThan(0L)); assertThat(avgBreakdown.get(BUILD_AGGREGATION), greaterThan(0L)); @@ -298,7 +359,11 @@ public void testMultiLevelProfileBreadthFirst() { assertThat(histoAggResult.getTime(), greaterThan(0L)); Map histoBreakdown = histoAggResult.getTimeBreakdown(); assertThat(histoBreakdown, notNullValue()); - assertThat(histoBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + if (histoAggResult.isConcurrent()) { + assertThat(histoBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); + } else { + assertThat(histoBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + } assertThat(histoBreakdown.get(INITIALIZE), greaterThan(0L)); assertThat(histoBreakdown.get(COLLECT), greaterThan(0L)); assertThat(histoBreakdown.get(BUILD_AGGREGATION), greaterThan(0L)); @@ -316,7 +381,11 @@ public void testMultiLevelProfileBreadthFirst() { assertThat(termsAggResult.getTime(), greaterThan(0L)); Map termsBreakdown = termsAggResult.getTimeBreakdown(); assertThat(termsBreakdown, notNullValue()); - assertThat(termsBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + if (termsAggResult.isConcurrent()) { + assertThat(termsBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); + } else { + assertThat(termsBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + } assertThat(termsBreakdown.get(INITIALIZE), greaterThan(0L)); assertThat(termsBreakdown.get(COLLECT), greaterThan(0L)); assertThat(termsBreakdown.get(BUILD_AGGREGATION), greaterThan(0L)); @@ -331,7 +400,11 @@ public void testMultiLevelProfileBreadthFirst() { assertThat(avgAggResult.getTime(), greaterThan(0L)); Map avgBreakdown = avgAggResult.getTimeBreakdown(); assertThat(avgBreakdown, notNullValue()); - assertThat(avgBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + if (avgAggResult.isConcurrent()) { + assertThat(avgBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); + } else { + assertThat(avgBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + } assertThat(avgBreakdown.get(INITIALIZE), greaterThan(0L)); assertThat(avgBreakdown.get(COLLECT), greaterThan(0L)); assertThat(avgBreakdown.get(BUILD_AGGREGATION), greaterThan(0L)); @@ -369,7 +442,11 @@ public void testDiversifiedAggProfile() { assertThat(diversifyAggResult.getTime(), greaterThan(0L)); Map diversifyBreakdown = diversifyAggResult.getTimeBreakdown(); assertThat(diversifyBreakdown, notNullValue()); - assertThat(diversifyBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + if (diversifyAggResult.isConcurrent()) { + assertThat(diversifyBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); + } else { + assertThat(diversifyBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + } assertThat(diversifyBreakdown.get(INITIALIZE), greaterThan(0L)); assertThat(diversifyBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L)); assertThat(diversifyBreakdown.get(COLLECT), greaterThan(0L)); @@ -386,7 +463,11 @@ public void testDiversifiedAggProfile() { assertThat(maxAggResult.getTime(), greaterThan(0L)); Map maxBreakdown = maxAggResult.getTimeBreakdown(); assertThat(maxBreakdown, notNullValue()); - assertThat(maxBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + if (maxAggResult.isConcurrent()) { + assertThat(maxBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); + } else { + assertThat(maxBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + } assertThat(diversifyBreakdown.get(INITIALIZE), greaterThan(0L)); assertThat(diversifyBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L)); assertThat(diversifyBreakdown.get(COLLECT), greaterThan(0L)); @@ -439,7 +520,11 @@ public void testComplexProfile() { assertThat(histoAggResult.getTime(), greaterThan(0L)); Map histoBreakdown = histoAggResult.getTimeBreakdown(); assertThat(histoBreakdown, notNullValue()); - assertThat(histoBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + if (histoAggResult.isConcurrent()) { + assertThat(histoBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); + } else { + assertThat(histoBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + } assertThat(histoBreakdown.get(INITIALIZE), greaterThan(0L)); assertThat(histoBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L)); assertThat(histoBreakdown.get(COLLECT), greaterThan(0L)); @@ -462,7 +547,11 @@ public void testComplexProfile() { assertThat(tagsAggResult.getTime(), greaterThan(0L)); Map tagsBreakdown = tagsAggResult.getTimeBreakdown(); assertThat(tagsBreakdown, notNullValue()); - assertThat(tagsBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + if (tagsAggResult.isConcurrent()) { + assertThat(tagsBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); + } else { + assertThat(tagsBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + } assertThat(tagsBreakdown.get(INITIALIZE), greaterThan(0L)); assertThat(tagsBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L)); assertThat(tagsBreakdown.get(COLLECT), greaterThan(0L)); @@ -482,7 +571,11 @@ public void testComplexProfile() { assertThat(avgAggResult.getTime(), greaterThan(0L)); Map avgBreakdown = avgAggResult.getTimeBreakdown(); assertThat(avgBreakdown, notNullValue()); - assertThat(avgBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + if (avgAggResult.isConcurrent()) { + assertThat(avgBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); + } else { + assertThat(avgBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + } assertThat(avgBreakdown.get(INITIALIZE), greaterThan(0L)); assertThat(avgBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L)); assertThat(avgBreakdown.get(COLLECT), greaterThan(0L)); @@ -498,7 +591,11 @@ public void testComplexProfile() { assertThat(maxAggResult.getTime(), greaterThan(0L)); Map maxBreakdown = maxAggResult.getTimeBreakdown(); assertThat(maxBreakdown, notNullValue()); - assertThat(maxBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + if (maxAggResult.isConcurrent()) { + assertThat(maxBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); + } else { + assertThat(maxBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + } assertThat(maxBreakdown.get(INITIALIZE), greaterThan(0L)); assertThat(maxBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L)); assertThat(maxBreakdown.get(COLLECT), greaterThan(0L)); @@ -514,7 +611,11 @@ public void testComplexProfile() { assertThat(stringsAggResult.getTime(), greaterThan(0L)); Map stringsBreakdown = stringsAggResult.getTimeBreakdown(); assertThat(stringsBreakdown, notNullValue()); - assertThat(stringsBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + if (stringsAggResult.isConcurrent()) { + assertThat(stringsBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); + } else { + assertThat(stringsBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + } assertThat(stringsBreakdown.get(INITIALIZE), greaterThan(0L)); assertThat(stringsBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L)); assertThat(stringsBreakdown.get(COLLECT), greaterThan(0L)); @@ -534,7 +635,11 @@ public void testComplexProfile() { assertThat(avgAggResult.getTime(), greaterThan(0L)); avgBreakdown = avgAggResult.getTimeBreakdown(); assertThat(avgBreakdown, notNullValue()); - assertThat(avgBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + if (avgAggResult.isConcurrent()) { + assertThat(avgBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); + } else { + assertThat(avgBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + } assertThat(avgBreakdown.get(INITIALIZE), greaterThan(0L)); assertThat(avgBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L)); assertThat(avgBreakdown.get(COLLECT), greaterThan(0L)); @@ -550,7 +655,11 @@ public void testComplexProfile() { assertThat(maxAggResult.getTime(), greaterThan(0L)); maxBreakdown = maxAggResult.getTimeBreakdown(); assertThat(maxBreakdown, notNullValue()); - assertThat(maxBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + if (maxAggResult.isConcurrent()) { + assertThat(maxBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); + } else { + assertThat(maxBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + } assertThat(maxBreakdown.get(INITIALIZE), greaterThan(0L)); assertThat(maxBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L)); assertThat(maxBreakdown.get(COLLECT), greaterThan(0L)); @@ -567,7 +676,11 @@ public void testComplexProfile() { assertThat(tagsAggResult.getTime(), greaterThan(0L)); tagsBreakdown = tagsAggResult.getTimeBreakdown(); assertThat(tagsBreakdown, notNullValue()); - assertThat(tagsBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + if (tagsAggResult.isConcurrent()) { + assertThat(tagsBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); + } else { + assertThat(tagsBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + } assertThat(tagsBreakdown.get(INITIALIZE), greaterThan(0L)); assertThat(tagsBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L)); assertThat(tagsBreakdown.get(COLLECT), greaterThan(0L)); @@ -587,7 +700,11 @@ public void testComplexProfile() { assertThat(avgAggResult.getTime(), greaterThan(0L)); avgBreakdown = avgAggResult.getTimeBreakdown(); assertThat(avgBreakdown, notNullValue()); - assertThat(avgBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + if (avgAggResult.isConcurrent()) { + assertThat(avgBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); + } else { + assertThat(avgBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + } assertThat(avgBreakdown.get(INITIALIZE), greaterThan(0L)); assertThat(avgBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L)); assertThat(avgBreakdown.get(COLLECT), greaterThan(0L)); @@ -603,7 +720,11 @@ public void testComplexProfile() { assertThat(maxAggResult.getTime(), greaterThan(0L)); maxBreakdown = maxAggResult.getTimeBreakdown(); assertThat(maxBreakdown, notNullValue()); - assertThat(maxBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + if (maxAggResult.isConcurrent()) { + assertThat(maxBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); + } else { + assertThat(maxBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + } assertThat(maxBreakdown.get(INITIALIZE), greaterThan(0L)); assertThat(maxBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L)); assertThat(maxBreakdown.get(COLLECT), greaterThan(0L)); @@ -700,7 +821,11 @@ public void testGlobalAggWithStatsSubAggregatorProfile() { assertThat(globalAggResult.getTime(), greaterThan(0L)); Map breakdown = globalAggResult.getTimeBreakdown(); assertThat(breakdown, notNullValue()); - assertEquals(BREAKDOWN_KEYS, breakdown.keySet()); + if (globalAggResult.isConcurrent()) { + assertEquals(CONCURRENT_SEARCH_BREAKDOWN_KEYS, breakdown.keySet()); + } else { + assertEquals(BREAKDOWN_KEYS, breakdown.keySet()); + } assertThat(breakdown.get(INITIALIZE), greaterThan(0L)); assertThat(breakdown.get(COLLECT), greaterThan(0L)); assertThat(breakdown.get(BUILD_AGGREGATION).longValue(), greaterThan(0L)); diff --git a/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java b/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java index 5384b47cc69ec..d2eda91b48c69 100644 --- a/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java +++ b/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java @@ -449,6 +449,10 @@ public DirectoryReader getDirectoryReader() { return (DirectoryReader) reader; } + public SearchContext getSearchContext() { + return searchContext; + } + private static class MutableQueryTimeout implements ExitableDirectoryReader.QueryCancellation { private final Set runnables = new HashSet<>(); diff --git a/server/src/main/java/org/opensearch/search/profile/AbstractProfileBreakdown.java b/server/src/main/java/org/opensearch/search/profile/AbstractProfileBreakdown.java index a29d4f9a0ee20..d8590d06ed935 100644 --- a/server/src/main/java/org/opensearch/search/profile/AbstractProfileBreakdown.java +++ b/server/src/main/java/org/opensearch/search/profile/AbstractProfileBreakdown.java @@ -50,8 +50,8 @@ public abstract class AbstractProfileBreakdown> { /** * The accumulated timings for this query node */ - private final Timer[] timings; - private final T[] timingTypes; + public final Timer[] timings; + public final T[] timingTypes; /** Sole constructor. */ public AbstractProfileBreakdown(Class clazz) { @@ -80,11 +80,11 @@ public Map toBreakdownMap() { /** * Build a timing count breakdown for arbitrary instance */ - protected final Map buildBreakdownMap(AbstractProfileBreakdown breakdown) { - Map map = new HashMap<>(breakdown.timings.length * 2); + protected Map buildBreakdownMap(AbstractProfileBreakdown breakdown) { + Map map = new HashMap<>(breakdown.timings.length * 3); for (T timingType : breakdown.timingTypes) { map.put(timingType.toString(), breakdown.timings[timingType.ordinal()].getApproximateTiming()); - map.put(timingType.toString() + "_count", breakdown.timings[timingType.ordinal()].getCount()); + map.put(timingType + "_count", breakdown.timings[timingType.ordinal()].getCount()); } return Collections.unmodifiableMap(map); } @@ -103,4 +103,8 @@ public final long toNodeTime() { } return total; } + + public final long toNodeStartTime() { + return timings[timingTypes[0].ordinal()].getEarliestTimerStartTime(); + } } diff --git a/server/src/main/java/org/opensearch/search/profile/ProfileResult.java b/server/src/main/java/org/opensearch/search/profile/ProfileResult.java index 89c3d7504de66..0d90d03e28bf1 100644 --- a/server/src/main/java/org/opensearch/search/profile/ProfileResult.java +++ b/server/src/main/java/org/opensearch/search/profile/ProfileResult.java @@ -32,6 +32,7 @@ package org.opensearch.search.profile; +import org.opensearch.Version; import org.opensearch.core.ParseField; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; @@ -44,8 +45,10 @@ import java.io.IOException; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.LinkedHashMap; import java.util.Objects; import java.util.concurrent.TimeUnit; @@ -69,7 +72,13 @@ public final class ProfileResult implements Writeable, ToXContentObject { static final ParseField BREAKDOWN = new ParseField("breakdown"); static final ParseField DEBUG = new ParseField("debug"); static final ParseField NODE_TIME = new ParseField("time"); + static final ParseField MAX_SLICE_NODE_TIME = new ParseField("max_slice_time"); + static final ParseField MIN_SLICE_NODE_TIME = new ParseField("min_slice_time"); + static final ParseField AVG_SLICE_NODE_TIME = new ParseField("avg_slice_time"); static final ParseField NODE_TIME_RAW = new ParseField("time_in_nanos"); + static final ParseField MAX_SLICE_NODE_TIME_RAW = new ParseField("max_slice_time_in_nanos"); + static final ParseField MIN_SLICE_NODE_TIME_RAW = new ParseField("min_slice_time_in_nanos"); + static final ParseField AVG_SLICE_NODE_TIME_RAW = new ParseField("avg_slice_time_in_nanos"); static final ParseField CHILDREN = new ParseField("children"); private final String type; @@ -77,7 +86,11 @@ public final class ProfileResult implements Writeable, ToXContentObject { private final Map breakdown; private final Map debug; private final long nodeTime; + private final long maxSliceNodeTime; + private final long minSliceNodeTime; + private final long avgSliceNodeTime; private final List children; + private final boolean concurrent; public ProfileResult( String type, @@ -86,6 +99,21 @@ public ProfileResult( Map debug, long nodeTime, List children + ) { + this(type, description, breakdown, debug, nodeTime, children, false, -1, -1, -1); + } + + public ProfileResult( + String type, + String description, + Map breakdown, + Map debug, + long nodeTime, + List children, + boolean concurrent, + long maxSliceNodeTime, + long minSliceNodeTime, + long avgSliceNodeTime ) { this.type = type; this.description = description; @@ -93,6 +121,10 @@ public ProfileResult( this.debug = debug == null ? Map.of() : debug; this.children = children == null ? List.of() : children; this.nodeTime = nodeTime; + this.concurrent = concurrent; + this.maxSliceNodeTime = maxSliceNodeTime; + this.minSliceNodeTime = minSliceNodeTime; + this.avgSliceNodeTime = avgSliceNodeTime; } /** @@ -105,6 +137,23 @@ public ProfileResult(StreamInput in) throws IOException { breakdown = in.readMap(StreamInput::readString, StreamInput::readLong); debug = in.readMap(StreamInput::readString, StreamInput::readGenericValue); children = in.readList(ProfileResult::new); + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + this.concurrent = in.readBoolean(); + if (concurrent) { + this.maxSliceNodeTime = in.readLong(); + this.minSliceNodeTime = in.readLong(); + this.avgSliceNodeTime = in.readLong(); + } else { + this.maxSliceNodeTime = -1; + this.minSliceNodeTime = -1; + this.avgSliceNodeTime = -1; + } + } else { + this.concurrent = false; + this.maxSliceNodeTime = -1; + this.minSliceNodeTime = -1; + this.avgSliceNodeTime = -1; + } } @Override @@ -115,6 +164,14 @@ public void writeTo(StreamOutput out) throws IOException { out.writeMap(breakdown, StreamOutput::writeString, StreamOutput::writeLong); out.writeMap(debug, StreamOutput::writeString, StreamOutput::writeGenericValue); out.writeList(children); + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + out.writeBoolean(concurrent); + if (concurrent) { + out.writeLong(maxSliceNodeTime); + out.writeLong(minSliceNodeTime); + out.writeLong(avgSliceNodeTime); + } + } } /** @@ -154,6 +211,22 @@ public long getTime() { return nodeTime; } + public long getMaxSliceTime() { + return maxSliceNodeTime; + } + + public long getMinSliceTime() { + return minSliceNodeTime; + } + + public long getAvgSliceTime() { + return avgSliceNodeTime; + } + + public boolean isConcurrent() { + return concurrent; + } + /** * Returns a list of all profiled children queries */ @@ -168,9 +241,19 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(DESCRIPTION.getPreferredName(), description); if (builder.humanReadable()) { builder.field(NODE_TIME.getPreferredName(), new TimeValue(getTime(), TimeUnit.NANOSECONDS).toString()); + if (concurrent) { + builder.field(MAX_SLICE_NODE_TIME.getPreferredName(), new TimeValue(getMaxSliceTime(), TimeUnit.NANOSECONDS).toString()); + builder.field(MIN_SLICE_NODE_TIME.getPreferredName(), new TimeValue(getMinSliceTime(), TimeUnit.NANOSECONDS).toString()); + builder.field(AVG_SLICE_NODE_TIME.getPreferredName(), new TimeValue(getAvgSliceTime(), TimeUnit.NANOSECONDS).toString()); + } } builder.field(NODE_TIME_RAW.getPreferredName(), getTime()); - builder.field(BREAKDOWN.getPreferredName(), breakdown); + if (concurrent) { + builder.field(MAX_SLICE_NODE_TIME_RAW.getPreferredName(), getMaxSliceTime()); + builder.field(MIN_SLICE_NODE_TIME_RAW.getPreferredName(), getMinSliceTime()); + builder.field(AVG_SLICE_NODE_TIME_RAW.getPreferredName(), getAvgSliceTime()); + } + createBreakownView(builder); if (false == debug.isEmpty()) { builder.field(DEBUG.getPreferredName(), debug); } @@ -186,6 +269,24 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder.endObject(); } + private void createBreakownView(XContentBuilder builder) throws IOException { + Map modifiedBreakdown = new LinkedHashMap<>(breakdown); + if (!concurrent) { + removeStartTimeFields(modifiedBreakdown); + } + builder.field(BREAKDOWN.getPreferredName(), modifiedBreakdown); + } + + static void removeStartTimeFields(Map modifiedBreakdown) { + Iterator> iterator = modifiedBreakdown.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + if (entry.getKey().endsWith("_startTime")) { + iterator.remove(); + } + } + } + private static final InstantiatingObjectParser PARSER; static { InstantiatingObjectParser.Builder parser = InstantiatingObjectParser.builder( diff --git a/server/src/main/java/org/opensearch/search/profile/Profilers.java b/server/src/main/java/org/opensearch/search/profile/Profilers.java index 2bc2f3a5a3920..68285f9417762 100644 --- a/server/src/main/java/org/opensearch/search/profile/Profilers.java +++ b/server/src/main/java/org/opensearch/search/profile/Profilers.java @@ -34,6 +34,7 @@ import org.opensearch.search.internal.ContextIndexSearcher; import org.opensearch.search.profile.aggregation.AggregationProfiler; +import org.opensearch.search.profile.aggregation.ConcurrentAggregationProfiler; import org.opensearch.search.profile.query.QueryProfiler; import java.util.ArrayList; @@ -55,13 +56,15 @@ public final class Profilers { public Profilers(ContextIndexSearcher searcher) { this.searcher = searcher; this.queryProfilers = new ArrayList<>(); - this.aggProfiler = new AggregationProfiler(); + this.aggProfiler = searcher.getSearchContext().isConcurrentSegmentSearchEnabled() + ? new ConcurrentAggregationProfiler() + : new AggregationProfiler(); addQueryProfiler(); } /** Switch to a new profile. */ public QueryProfiler addQueryProfiler() { - QueryProfiler profiler = new QueryProfiler(searcher.getExecutor() != null); + QueryProfiler profiler = new QueryProfiler(searcher.getSearchContext().isConcurrentSegmentSearchEnabled()); searcher.setProfiler(profiler); queryProfilers.add(profiler); return profiler; diff --git a/server/src/main/java/org/opensearch/search/profile/Timer.java b/server/src/main/java/org/opensearch/search/profile/Timer.java index 231324b4a5598..172762cabeb6a 100644 --- a/server/src/main/java/org/opensearch/search/profile/Timer.java +++ b/server/src/main/java/org/opensearch/search/profile/Timer.java @@ -51,7 +51,7 @@ public class Timer { private boolean doTiming; - private long timing, count, lastCount, start; + private long timing, count, lastCount, start, earliestTimerStartTime; /** pkg-private for testing */ long nanoTime() { @@ -71,6 +71,9 @@ public final void start() { doTiming = (count - lastCount) >= Math.min(lastCount >>> 8, 1024); if (doTiming) { start = nanoTime(); + if (count == 0) { + earliestTimerStartTime = start; + } } count++; } @@ -92,6 +95,14 @@ public final long getCount() { return count; } + /** Return the timer start time in nanoseconds.*/ + public final long getEarliestTimerStartTime() { + if (start != 0) { + throw new IllegalStateException("#start call misses a matching #stop call"); + } + return earliestTimerStartTime; + } + /** Return an approximation of the total time spent between consecutive calls of #start and #stop. */ public final long getApproximateTiming() { if (start != 0) { diff --git a/server/src/main/java/org/opensearch/search/profile/aggregation/AggregationProfileBreakdown.java b/server/src/main/java/org/opensearch/search/profile/aggregation/AggregationProfileBreakdown.java index 24eccba575e77..608a3cc406e37 100644 --- a/server/src/main/java/org/opensearch/search/profile/aggregation/AggregationProfileBreakdown.java +++ b/server/src/main/java/org/opensearch/search/profile/aggregation/AggregationProfileBreakdown.java @@ -34,6 +34,7 @@ import org.opensearch.search.profile.AbstractProfileBreakdown; +import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -62,4 +63,18 @@ public void addDebugInfo(String key, Object value) { protected Map toDebugMap() { return unmodifiableMap(extra); } + + /** + * Build a timing count startTime breakdown for aggregation timing types + */ + @Override + protected final Map buildBreakdownMap(AbstractProfileBreakdown breakdown) { + Map map = new HashMap<>(breakdown.timings.length * 3); + for (AggregationTimingType timingType : breakdown.timingTypes) { + map.put(timingType.toString(), breakdown.timings[timingType.ordinal()].getApproximateTiming()); + map.put(timingType + "_count", breakdown.timings[timingType.ordinal()].getCount()); + map.put(timingType + "_startTime", breakdown.timings[timingType.ordinal()].getEarliestTimerStartTime()); + } + return Collections.unmodifiableMap(map); + } } diff --git a/server/src/main/java/org/opensearch/search/profile/aggregation/AggregationProfiler.java b/server/src/main/java/org/opensearch/search/profile/aggregation/AggregationProfiler.java index 1d2cf424ee5a7..7b9de4804a40e 100644 --- a/server/src/main/java/org/opensearch/search/profile/aggregation/AggregationProfiler.java +++ b/server/src/main/java/org/opensearch/search/profile/aggregation/AggregationProfiler.java @@ -36,8 +36,6 @@ import org.opensearch.search.profile.AbstractProfiler; import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; import java.util.Map; /** @@ -47,29 +45,24 @@ */ public class AggregationProfiler extends AbstractProfiler { - private final Map, AggregationProfileBreakdown> profileBreakdownLookup = new HashMap<>(); + private final Map profileBreakdownLookup = new HashMap<>(); public AggregationProfiler() { super(new InternalAggregationProfileTree()); } + /** + * This method does not need to be thread safe for concurrent search use case as well. + * The `AggregationProfileBreakdown` for each Aggregation operator is created in sync path when `preCollection` is + * called on the Aggregation collector instances during construction. + */ @Override public AggregationProfileBreakdown getQueryBreakdown(Aggregator agg) { - List path = getAggregatorPath(agg); - AggregationProfileBreakdown aggregationProfileBreakdown = profileBreakdownLookup.get(path); + AggregationProfileBreakdown aggregationProfileBreakdown = profileBreakdownLookup.get(agg); if (aggregationProfileBreakdown == null) { aggregationProfileBreakdown = super.getQueryBreakdown(agg); - profileBreakdownLookup.put(path, aggregationProfileBreakdown); + profileBreakdownLookup.put(agg, aggregationProfileBreakdown); } return aggregationProfileBreakdown; } - - public static List getAggregatorPath(Aggregator agg) { - LinkedList path = new LinkedList<>(); - while (agg != null) { - path.addFirst(agg.name()); - agg = agg.parent(); - } - return path; - } } diff --git a/server/src/main/java/org/opensearch/search/profile/aggregation/ConcurrentAggregationProfiler.java b/server/src/main/java/org/opensearch/search/profile/aggregation/ConcurrentAggregationProfiler.java new file mode 100644 index 0000000000000..81dbf13a15911 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/profile/aggregation/ConcurrentAggregationProfiler.java @@ -0,0 +1,216 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.search.profile.aggregation; + +import org.opensearch.search.profile.ProfileResult; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * Main class to profile aggregations with concurrent execution + * + * @opensearch.internal + */ +public class ConcurrentAggregationProfiler extends AggregationProfiler { + + private static final String MAX_PREFIX = "max_"; + private static final String MIN_PREFIX = "min_"; + private static final String AVG_PREFIX = "avg_"; + private final String[] breakdownCountStatsTypes = { "build_leaf_collector_count", "collect_count" }; + + @Override + public List getTree() { + List tree = profileTree.getTree(); + List reducedTree = new LinkedList<>(); + Map> sliceLevelAggregationMap = getSliceLevelAggregationMap(tree); + for (List profileResultsAcrossSlices : sliceLevelAggregationMap.values()) { + reducedTree.addAll(reduceProfileResultsTree(profileResultsAcrossSlices)); + } + return reducedTree; + } + + private List reduceProfileResultsTree(List profileResultsAcrossSlices) { + String type = profileResultsAcrossSlices.get(0).getQueryName(); + String description = profileResultsAcrossSlices.get(0).getLuceneDescription(); + long maxSliceNodeEndTime = Long.MIN_VALUE; + long minSliceNodeStartTime = Long.MAX_VALUE; + long maxSliceNodeTime = Long.MIN_VALUE; + long minSliceNodeTime = Long.MAX_VALUE; + long avgSliceNodeTime = 0L; + Map breakdown = new HashMap<>(); + Map timeStatsMap = new HashMap<>(); + Map minSliceStartTimeMap = new HashMap<>(); + Map maxSliceEndTimeMap = new HashMap<>(); + Map countStatsMap = new HashMap<>(); + Map debug = new HashMap<>(); + List children = new LinkedList<>(); + + for (ProfileResult profileResult : profileResultsAcrossSlices) { + long profileNodeTime = profileResult.getTime(); + long sliceStartTime = profileResult.getTimeBreakdown().get("initialize_startTime"); + + // Profiled total time + maxSliceNodeEndTime = Math.max(maxSliceNodeEndTime, sliceStartTime + profileNodeTime); + minSliceNodeStartTime = Math.min(minSliceNodeStartTime, sliceStartTime); + + // Profiled total time stats + maxSliceNodeTime = Math.max(maxSliceNodeTime, profileNodeTime); + minSliceNodeTime = Math.min(minSliceNodeTime, profileNodeTime); + avgSliceNodeTime += profileNodeTime; + + // Profiled breakdown time stats + for (AggregationTimingType timingType : AggregationTimingType.values()) { + buildBreakdownStatsMap(timeStatsMap, profileResult, timingType.toString()); + } + + // Profiled breakdown total time + for (AggregationTimingType timingType : AggregationTimingType.values()) { + String breakdownTimingType = timingType.toString(); + Long startTime = profileResult.getTimeBreakdown().get(breakdownTimingType + "_startTime"); + Long endTime = startTime + profileResult.getTimeBreakdown().get(breakdownTimingType); + minSliceStartTimeMap.put( + breakdownTimingType, + Math.min(minSliceStartTimeMap.getOrDefault(breakdownTimingType, Long.MAX_VALUE), startTime) + ); + maxSliceEndTimeMap.put( + breakdownTimingType, + Math.max(maxSliceEndTimeMap.getOrDefault(breakdownTimingType, Long.MIN_VALUE), endTime) + ); + } + + // Profiled breakdown count stats + for (String breakdownCountType : breakdownCountStatsTypes) { + buildBreakdownStatsMap(countStatsMap, profileResult, breakdownCountType); + } + + // Profiled breakdown count + for (AggregationTimingType timingType : AggregationTimingType.values()) { + String breakdownType = timingType.toString(); + String breakdownTypeCount = breakdownType + "_count"; + breakdown.put( + breakdownTypeCount, + breakdown.getOrDefault(breakdownTypeCount, 0L) + profileResult.getTimeBreakdown().get(breakdownTypeCount) + ); + } + + debug = profileResult.getDebugInfo(); + children.addAll(profileResult.getProfiledChildren()); + } + // nodeTime + long nodeTime = maxSliceNodeEndTime - minSliceNodeStartTime; + avgSliceNodeTime /= profileResultsAcrossSlices.size(); + + // Profiled breakdown time stats + for (AggregationTimingType breakdownTimingType : AggregationTimingType.values()) { + buildBreakdownMap(profileResultsAcrossSlices.size(), breakdown, timeStatsMap, breakdownTimingType.toString()); + } + + // Profiled breakdown total time + for (AggregationTimingType breakdownTimingType : AggregationTimingType.values()) { + String breakdownType = breakdownTimingType.toString(); + breakdown.put(breakdownType, maxSliceEndTimeMap.get(breakdownType) - minSliceStartTimeMap.get(breakdownType)); + } + + // Profiled breakdown count stats + for (String breakdownCountType : breakdownCountStatsTypes) { + buildBreakdownMap(profileResultsAcrossSlices.size(), breakdown, countStatsMap, breakdownCountType); + } + + // children + List reducedChildrenTree = new LinkedList<>(); + if (!children.isEmpty()) { + Map> sliceLevelAggregationMap = getSliceLevelAggregationMap(children); + for (List profileResults : sliceLevelAggregationMap.values()) { + reducedChildrenTree.addAll(reduceProfileResultsTree(profileResults)); + } + } + + ProfileResult reducedResult = new ProfileResult( + type, + description, + breakdown, + debug, + nodeTime, + reducedChildrenTree, + true, + maxSliceNodeTime, + minSliceNodeTime, + avgSliceNodeTime + ); + return List.of(reducedResult); + } + + static void buildBreakdownMap(int treeSize, Map breakdown, Map statsMap, String breakdownType) { + String maxBreakdownType = MAX_PREFIX + breakdownType; + String minBreakdownType = MIN_PREFIX + breakdownType; + String avgBreakdownType = AVG_PREFIX + breakdownType; + breakdown.put(maxBreakdownType, statsMap.get(maxBreakdownType)); + breakdown.put(minBreakdownType, statsMap.get(minBreakdownType)); + breakdown.put(avgBreakdownType, statsMap.get(avgBreakdownType) / treeSize); + } + + static void buildBreakdownStatsMap(Map statsMap, ProfileResult result, String breakdownType) { + String maxBreakdownType = MAX_PREFIX + breakdownType; + String minBreakdownType = MIN_PREFIX + breakdownType; + String avgBreakdownType = AVG_PREFIX + breakdownType; + statsMap.put( + maxBreakdownType, + Math.max(statsMap.getOrDefault(maxBreakdownType, Long.MIN_VALUE), result.getTimeBreakdown().get(breakdownType)) + ); + statsMap.put( + minBreakdownType, + Math.min(statsMap.getOrDefault(minBreakdownType, Long.MAX_VALUE), result.getTimeBreakdown().get(breakdownType)) + ); + statsMap.put(avgBreakdownType, statsMap.getOrDefault(avgBreakdownType, 0L) + result.getTimeBreakdown().get(breakdownType)); + } + + /** + * @return a slice level aggregation map where the key is the description of the aggregation and + * the value is a list of ProfileResult across all slices. + */ + static Map> getSliceLevelAggregationMap(List tree) { + Map> sliceLevelAggregationMap = new HashMap<>(); + for (ProfileResult result : tree) { + String description = result.getLuceneDescription(); + final List sliceLevelAggregationList = sliceLevelAggregationMap.computeIfAbsent( + description, + k -> new LinkedList<>() + ); + sliceLevelAggregationList.add(result); + } + return sliceLevelAggregationMap; + } +} diff --git a/server/src/main/java/org/opensearch/search/profile/aggregation/InternalAggregationProfileTree.java b/server/src/main/java/org/opensearch/search/profile/aggregation/InternalAggregationProfileTree.java index 36cfc53f41ccd..34716b87c7c9c 100644 --- a/server/src/main/java/org/opensearch/search/profile/aggregation/InternalAggregationProfileTree.java +++ b/server/src/main/java/org/opensearch/search/profile/aggregation/InternalAggregationProfileTree.java @@ -62,6 +62,10 @@ protected String getTypeFromElement(Aggregator element) { return element.getClass().getSimpleName(); } + /** + * @return is used to group aggregations with same name across slices. + * So the name returned here should be same across slices for an aggregation operator. + */ @Override protected String getDescriptionFromElement(Aggregator element) { return element.name(); diff --git a/server/src/test/java/org/opensearch/search/profile/ProfileResultTests.java b/server/src/test/java/org/opensearch/search/profile/ProfileResultTests.java index ae5a07478e814..994aa6ce4f752 100644 --- a/server/src/test/java/org/opensearch/search/profile/ProfileResultTests.java +++ b/server/src/test/java/org/opensearch/search/profile/ProfileResultTests.java @@ -116,6 +116,10 @@ private void doFromXContentTestWithRandomFields(boolean addRandomFields) throws assertNull(parser.nextToken()); } assertEquals(profileResult.getTime(), parsed.getTime()); + assertEquals(profileResult.getMaxSliceTime(), parsed.getMaxSliceTime()); + assertEquals(profileResult.getMinSliceTime(), parsed.getMinSliceTime()); + assertEquals(profileResult.getAvgSliceTime(), parsed.getAvgSliceTime()); + assertEquals(profileResult.isConcurrent(), parsed.isConcurrent()); assertToXContentEquivalent(originalBytes, toXContent(parsed, xContentType, humanReadable), xContentType); } @@ -239,4 +243,20 @@ public void testToXContent() throws IOException { Strings.toString(builder) ); } + + public void testRemoveStartTimeFields() { + Map breakdown = new HashMap<>(); + breakdown.put("initialize_startTime", 123456L); + breakdown.put("initialize_count", 1L); + breakdown.put("initialize", 654321L); + Map modifiedBreakdown = new LinkedHashMap<>(breakdown); + assertEquals(3, modifiedBreakdown.size()); + assertEquals(123456L, (long) modifiedBreakdown.get("initialize_startTime")); + assertEquals(1L, (long) modifiedBreakdown.get("initialize_count")); + assertEquals(654321L, (long) modifiedBreakdown.get("initialize")); + ProfileResult.removeStartTimeFields(modifiedBreakdown); + assertFalse(modifiedBreakdown.containsKey("initialize_startTime")); + assertTrue(modifiedBreakdown.containsKey("initialize_count")); + assertTrue(modifiedBreakdown.containsKey("initialize")); + } } diff --git a/server/src/test/java/org/opensearch/search/profile/TimerTests.java b/server/src/test/java/org/opensearch/search/profile/TimerTests.java index deed451c21933..5997292eb8f56 100644 --- a/server/src/test/java/org/opensearch/search/profile/TimerTests.java +++ b/server/src/test/java/org/opensearch/search/profile/TimerTests.java @@ -71,10 +71,14 @@ long nanoTime() { return time += 42; } }; - for (int i = 1; i < 100000; ++i) { + t.start(); + t.stop(); + long timerStartTime = t.getEarliestTimerStartTime(); + for (int i = 2; i < 100000; ++i) { t.start(); t.stop(); assertEquals(i, t.getCount()); + assertEquals(timerStartTime, t.getEarliestTimerStartTime()); // Make sure the cumulated timing is 42 times the number of calls as expected assertEquals(i * 42L, t.getApproximateTiming()); } diff --git a/server/src/test/java/org/opensearch/search/profile/aggregation/ConcurrentAggregationProfilerTests.java b/server/src/test/java/org/opensearch/search/profile/aggregation/ConcurrentAggregationProfilerTests.java new file mode 100644 index 0000000000000..88e822f576600 --- /dev/null +++ b/server/src/test/java/org/opensearch/search/profile/aggregation/ConcurrentAggregationProfilerTests.java @@ -0,0 +1,207 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.search.profile.aggregation; + +import org.opensearch.search.profile.ProfileResult; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +public class ConcurrentAggregationProfilerTests extends OpenSearchTestCase { + + public static List createConcurrentSearchProfileTree() { + List tree = new ArrayList<>(); + // Aggregation + tree.add( + new ProfileResult( + "NumericTermsAggregator", + "test_scoped_agg", + new LinkedHashMap<>(), + new HashMap<>(), + 10847417L, + List.of( + new ProfileResult( + "GlobalOrdinalsStringTermsAggregator", + "test_terms", + new LinkedHashMap<>(), + new HashMap<>(), + 3359835L, + List.of(), + true, + 1490667L, + 1180123L, + 1240676L + ) + ), + true, + 94582L, + 18667L, + 211749L + ) + ); + tree.add( + new ProfileResult( + "NumericTermsAggregator", + "test_scoped_agg", + new LinkedHashMap<>(), + new HashMap<>(), + 10776655L, + List.of( + new ProfileResult( + "GlobalOrdinalsStringTermsAggregator", + "test_terms", + new LinkedHashMap<>(), + new HashMap<>(), + 3359567L, + List.of(), + true, + 1390554L, + 1180321L, + 1298776L + ) + ), + true, + 94560L, + 11237L, + 236440L + ) + ); + // Global Aggregation + tree.add( + new ProfileResult( + "GlobalAggregator", + "test_global_agg", + new LinkedHashMap<>(), + new HashMap<>(), + 19631335L, + List.of(), + true, + 563002L, + 142210L, + 1216631L + ) + ); + tree.add( + new ProfileResult( + "GlobalAggregator", + "test_global_agg", + new LinkedHashMap<>(), + new HashMap<>(), + 19634567L, + List.of(), + true, + 563333L, + 146783L, + 1496600L + ) + ); + return tree; + } + + public void testBuildTimeStatsBreakdownMap() { + List tree = createConcurrentSearchProfileTree(); + Map breakdown = new HashMap<>(); + Map timeStatsMap = new HashMap<>(); + timeStatsMap.put("max_initialize", 30L); + timeStatsMap.put("min_initialize", 10L); + timeStatsMap.put("avg_initialize", 60L); + ConcurrentAggregationProfiler.buildBreakdownMap(tree.size(), breakdown, timeStatsMap, "initialize"); + assertTrue(breakdown.containsKey("max_initialize")); + assertTrue(breakdown.containsKey("min_initialize")); + assertTrue(breakdown.containsKey("avg_initialize")); + assertEquals(30L, (long) breakdown.get("max_initialize")); + assertEquals(10L, (long) breakdown.get("min_initialize")); + assertEquals(15L, (long) breakdown.get("avg_initialize")); + } + + public void testBuildCountStatsBreakdownMap() { + List tree = createConcurrentSearchProfileTree(); + Map breakdown = new HashMap<>(); + Map countStatsMap = new HashMap<>(); + countStatsMap.put("max_collect_count", 3L); + countStatsMap.put("min_collect_count", 1L); + countStatsMap.put("avg_collect_count", 6L); + ConcurrentAggregationProfiler.buildBreakdownMap(tree.size(), breakdown, countStatsMap, "collect_count"); + assertTrue(breakdown.containsKey("max_collect_count")); + assertTrue(breakdown.containsKey("min_collect_count")); + assertTrue(breakdown.containsKey("avg_collect_count")); + assertEquals(3L, (long) breakdown.get("max_collect_count")); + assertEquals(1L, (long) breakdown.get("min_collect_count")); + assertEquals(1L, (long) breakdown.get("avg_collect_count")); + } + + public void testBuildBreakdownStatsMap() { + Map statsMap = new HashMap<>(); + ConcurrentAggregationProfiler.buildBreakdownStatsMap( + statsMap, + new ProfileResult("NumericTermsAggregator", "desc", Map.of("initialize", 100L), Map.of(), 130L, List.of()), + "initialize" + ); + assertTrue(statsMap.containsKey("max_initialize")); + assertTrue(statsMap.containsKey("min_initialize")); + assertTrue(statsMap.containsKey("avg_initialize")); + assertEquals(100L, (long) statsMap.get("max_initialize")); + assertEquals(100L, (long) statsMap.get("min_initialize")); + assertEquals(100L, (long) statsMap.get("avg_initialize")); + ConcurrentAggregationProfiler.buildBreakdownStatsMap( + statsMap, + new ProfileResult("NumericTermsAggregator", "desc", Map.of("initialize", 50L), Map.of(), 120L, List.of()), + "initialize" + ); + assertEquals(100L, (long) statsMap.get("max_initialize")); + assertEquals(50L, (long) statsMap.get("min_initialize")); + assertEquals(150L, (long) statsMap.get("avg_initialize")); + } + + public void testGetSliceLevelAggregationMap() { + List tree = createConcurrentSearchProfileTree(); + Map> aggregationMap = ConcurrentAggregationProfiler.getSliceLevelAggregationMap(tree); + assertEquals(2, aggregationMap.size()); + assertTrue(aggregationMap.containsKey("test_scoped_agg")); + assertTrue(aggregationMap.containsKey("test_global_agg")); + assertEquals(2, aggregationMap.get("test_scoped_agg").size()); + assertEquals(2, aggregationMap.get("test_global_agg").size()); + for (int slice_id : new int[] { 0, 1 }) { + assertEquals(1, aggregationMap.get("test_scoped_agg").get(slice_id).getProfiledChildren().size()); + assertEquals( + "test_terms", + aggregationMap.get("test_scoped_agg").get(slice_id).getProfiledChildren().get(0).getLuceneDescription() + ); + assertEquals(0, aggregationMap.get("test_global_agg").get(slice_id).getProfiledChildren().size()); + } + } +} From f53ead2e7680bdf425885d19fa6317cdc588ffd9 Mon Sep 17 00:00:00 2001 From: Ticheng Lin Date: Thu, 27 Jul 2023 18:30:10 -0700 Subject: [PATCH 2/4] Address review comments for support for aggregation profiler with concurrent aggregation (#8801) Signed-off-by: Ticheng Lin --- .../aggregation/AggregationProfilerIT.java | 12 ++-- .../org/opensearch/search/SearchService.java | 2 +- .../search/internal/ContextIndexSearcher.java | 4 -- .../profile/AbstractProfileBreakdown.java | 25 +++---- .../search/profile/ProfileResult.java | 2 +- .../opensearch/search/profile/Profilers.java | 10 +-- .../AggregationProfileBreakdown.java | 12 ++-- .../aggregation/AggregationProfiler.java | 5 +- .../ConcurrentAggregationProfiler.java | 30 ++------ .../ConcurrentQueryProfileBreakdown.java | 4 +- .../search/profile/ProfileResultTests.java | 69 ++++++++++++++++++- .../opensearch/test/TestSearchContext.java | 2 +- 12 files changed, 106 insertions(+), 71 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/search/profile/aggregation/AggregationProfilerIT.java b/server/src/internalClusterTest/java/org/opensearch/search/profile/aggregation/AggregationProfilerIT.java index 95350bdb012e0..51c92e8653585 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/profile/aggregation/AggregationProfilerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/profile/aggregation/AggregationProfilerIT.java @@ -91,12 +91,12 @@ public class AggregationProfilerIT extends OpenSearchIntegTestCase { POST_COLLECTION + "_count", BUILD_AGGREGATION + "_count", REDUCE + "_count", - INITIALIZE + "_startTime", - BUILD_LEAF_COLLECTOR + "_startTime", - COLLECT + "_startTime", - POST_COLLECTION + "_startTime", - BUILD_AGGREGATION + "_startTime", - REDUCE + "_startTime" + INITIALIZE + "_start_time", + BUILD_LEAF_COLLECTOR + "_start_time", + COLLECT + "_start_time", + POST_COLLECTION + "_start_time", + BUILD_AGGREGATION + "_start_time", + REDUCE + "_start_time" ); private static final Set CONCURRENT_SEARCH_BREAKDOWN_KEYS = Set.of( diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index c9c70ed52c376..0259731992f2d 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -1270,7 +1270,7 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc context.minimumScore(source.minScore()); } if (source.profile()) { - context.setProfilers(new Profilers(context.searcher())); + context.setProfilers(new Profilers(context.searcher(), context.isConcurrentSegmentSearchEnabled())); } if (source.timeout() != null) { context.timeout(source.timeout()); diff --git a/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java b/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java index d2eda91b48c69..5384b47cc69ec 100644 --- a/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java +++ b/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java @@ -449,10 +449,6 @@ public DirectoryReader getDirectoryReader() { return (DirectoryReader) reader; } - public SearchContext getSearchContext() { - return searchContext; - } - private static class MutableQueryTimeout implements ExitableDirectoryReader.QueryCancellation { private final Set runnables = new HashSet<>(); diff --git a/server/src/main/java/org/opensearch/search/profile/AbstractProfileBreakdown.java b/server/src/main/java/org/opensearch/search/profile/AbstractProfileBreakdown.java index d8590d06ed935..67ab062c0e3ca 100644 --- a/server/src/main/java/org/opensearch/search/profile/AbstractProfileBreakdown.java +++ b/server/src/main/java/org/opensearch/search/profile/AbstractProfileBreakdown.java @@ -50,8 +50,10 @@ public abstract class AbstractProfileBreakdown> { /** * The accumulated timings for this query node */ - public final Timer[] timings; - public final T[] timingTypes; + protected final Timer[] timings; + protected final T[] timingTypes; + public static final String TIMING_TYPE_COUNT_SUFFIX = "_count"; + public static final String TIMING_TYPE_START_TIME_SUFFIX = "_start_time"; /** Sole constructor. */ public AbstractProfileBreakdown(Class clazz) { @@ -74,17 +76,10 @@ public void setTimer(T timing, Timer timer) { * Build a timing count breakdown for current instance */ public Map toBreakdownMap() { - return buildBreakdownMap(this); - } - - /** - * Build a timing count breakdown for arbitrary instance - */ - protected Map buildBreakdownMap(AbstractProfileBreakdown breakdown) { - Map map = new HashMap<>(breakdown.timings.length * 3); - for (T timingType : breakdown.timingTypes) { - map.put(timingType.toString(), breakdown.timings[timingType.ordinal()].getApproximateTiming()); - map.put(timingType + "_count", breakdown.timings[timingType.ordinal()].getCount()); + Map map = new HashMap<>(this.timings.length * 3); + for (T timingType : this.timingTypes) { + map.put(timingType.toString(), this.timings[timingType.ordinal()].getApproximateTiming()); + map.put(timingType + TIMING_TYPE_COUNT_SUFFIX, this.timings[timingType.ordinal()].getCount()); } return Collections.unmodifiableMap(map); } @@ -103,8 +98,4 @@ public final long toNodeTime() { } return total; } - - public final long toNodeStartTime() { - return timings[timingTypes[0].ordinal()].getEarliestTimerStartTime(); - } } diff --git a/server/src/main/java/org/opensearch/search/profile/ProfileResult.java b/server/src/main/java/org/opensearch/search/profile/ProfileResult.java index 0d90d03e28bf1..dd936c33ab942 100644 --- a/server/src/main/java/org/opensearch/search/profile/ProfileResult.java +++ b/server/src/main/java/org/opensearch/search/profile/ProfileResult.java @@ -281,7 +281,7 @@ static void removeStartTimeFields(Map modifiedBreakdown) { Iterator> iterator = modifiedBreakdown.entrySet().iterator(); while (iterator.hasNext()) { Map.Entry entry = iterator.next(); - if (entry.getKey().endsWith("_startTime")) { + if (entry.getKey().endsWith(AbstractProfileBreakdown.TIMING_TYPE_START_TIME_SUFFIX)) { iterator.remove(); } } diff --git a/server/src/main/java/org/opensearch/search/profile/Profilers.java b/server/src/main/java/org/opensearch/search/profile/Profilers.java index 68285f9417762..8e87c7ff4acd4 100644 --- a/server/src/main/java/org/opensearch/search/profile/Profilers.java +++ b/server/src/main/java/org/opensearch/search/profile/Profilers.java @@ -51,20 +51,20 @@ public final class Profilers { private final ContextIndexSearcher searcher; private final List queryProfilers; private final AggregationProfiler aggProfiler; + private final boolean isConcurrentSegmentSearchEnabled; /** Sole constructor. This {@link Profilers} instance will initially wrap one {@link QueryProfiler}. */ - public Profilers(ContextIndexSearcher searcher) { + public Profilers(ContextIndexSearcher searcher, boolean isConcurrentSegmentSearchEnabled) { this.searcher = searcher; + this.isConcurrentSegmentSearchEnabled = isConcurrentSegmentSearchEnabled; this.queryProfilers = new ArrayList<>(); - this.aggProfiler = searcher.getSearchContext().isConcurrentSegmentSearchEnabled() - ? new ConcurrentAggregationProfiler() - : new AggregationProfiler(); + this.aggProfiler = isConcurrentSegmentSearchEnabled ? new ConcurrentAggregationProfiler() : new AggregationProfiler(); addQueryProfiler(); } /** Switch to a new profile. */ public QueryProfiler addQueryProfiler() { - QueryProfiler profiler = new QueryProfiler(searcher.getSearchContext().isConcurrentSegmentSearchEnabled()); + QueryProfiler profiler = new QueryProfiler(isConcurrentSegmentSearchEnabled); searcher.setProfiler(profiler); queryProfilers.add(profiler); return profiler; diff --git a/server/src/main/java/org/opensearch/search/profile/aggregation/AggregationProfileBreakdown.java b/server/src/main/java/org/opensearch/search/profile/aggregation/AggregationProfileBreakdown.java index 608a3cc406e37..d0c67915e6d8d 100644 --- a/server/src/main/java/org/opensearch/search/profile/aggregation/AggregationProfileBreakdown.java +++ b/server/src/main/java/org/opensearch/search/profile/aggregation/AggregationProfileBreakdown.java @@ -68,12 +68,12 @@ protected Map toDebugMap() { * Build a timing count startTime breakdown for aggregation timing types */ @Override - protected final Map buildBreakdownMap(AbstractProfileBreakdown breakdown) { - Map map = new HashMap<>(breakdown.timings.length * 3); - for (AggregationTimingType timingType : breakdown.timingTypes) { - map.put(timingType.toString(), breakdown.timings[timingType.ordinal()].getApproximateTiming()); - map.put(timingType + "_count", breakdown.timings[timingType.ordinal()].getCount()); - map.put(timingType + "_startTime", breakdown.timings[timingType.ordinal()].getEarliestTimerStartTime()); + public Map toBreakdownMap() { + Map map = new HashMap<>(timings.length * 3); + for (AggregationTimingType timingType : timingTypes) { + map.put(timingType.toString(), timings[timingType.ordinal()].getApproximateTiming()); + map.put(timingType + TIMING_TYPE_COUNT_SUFFIX, timings[timingType.ordinal()].getCount()); + map.put(timingType + TIMING_TYPE_START_TIME_SUFFIX, timings[timingType.ordinal()].getEarliestTimerStartTime()); } return Collections.unmodifiableMap(map); } diff --git a/server/src/main/java/org/opensearch/search/profile/aggregation/AggregationProfiler.java b/server/src/main/java/org/opensearch/search/profile/aggregation/AggregationProfiler.java index 7b9de4804a40e..39620c25dc5a3 100644 --- a/server/src/main/java/org/opensearch/search/profile/aggregation/AggregationProfiler.java +++ b/server/src/main/java/org/opensearch/search/profile/aggregation/AggregationProfiler.java @@ -53,8 +53,9 @@ public AggregationProfiler() { /** * This method does not need to be thread safe for concurrent search use case as well. - * The `AggregationProfileBreakdown` for each Aggregation operator is created in sync path when `preCollection` is - * called on the Aggregation collector instances during construction. + * The {@link AggregationProfileBreakdown} for each Aggregation operator is created in sync path when + * {@link org.opensearch.search.aggregations.BucketCollector#preCollection()} is called + * on the Aggregation collector instances during construction. */ @Override public AggregationProfileBreakdown getQueryBreakdown(Aggregator agg) { diff --git a/server/src/main/java/org/opensearch/search/profile/aggregation/ConcurrentAggregationProfiler.java b/server/src/main/java/org/opensearch/search/profile/aggregation/ConcurrentAggregationProfiler.java index 81dbf13a15911..9048ac9f2892d 100644 --- a/server/src/main/java/org/opensearch/search/profile/aggregation/ConcurrentAggregationProfiler.java +++ b/server/src/main/java/org/opensearch/search/profile/aggregation/ConcurrentAggregationProfiler.java @@ -6,25 +6,6 @@ * compatible open source license. */ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - /* * Modifications Copyright OpenSearch Contributors. See * GitHub history for details. @@ -33,6 +14,7 @@ package org.opensearch.search.profile.aggregation; import org.opensearch.search.profile.ProfileResult; +import org.opensearch.search.profile.AbstractProfileBreakdown; import java.util.HashMap; import java.util.LinkedList; @@ -49,7 +31,8 @@ public class ConcurrentAggregationProfiler extends AggregationProfiler { private static final String MAX_PREFIX = "max_"; private static final String MIN_PREFIX = "min_"; private static final String AVG_PREFIX = "avg_"; - private final String[] breakdownCountStatsTypes = { "build_leaf_collector_count", "collect_count" }; + private static final String START_TIME_KEY = AggregationTimingType.INITIALIZE + AbstractProfileBreakdown.TIMING_TYPE_START_TIME_SUFFIX; + private static final String[] breakdownCountStatsTypes = { "build_leaf_collector_count", "collect_count" }; @Override public List getTree() { @@ -80,7 +63,7 @@ private List reduceProfileResultsTree(List profile for (ProfileResult profileResult : profileResultsAcrossSlices) { long profileNodeTime = profileResult.getTime(); - long sliceStartTime = profileResult.getTimeBreakdown().get("initialize_startTime"); + long sliceStartTime = profileResult.getTimeBreakdown().get(START_TIME_KEY); // Profiled total time maxSliceNodeEndTime = Math.max(maxSliceNodeEndTime, sliceStartTime + profileNodeTime); @@ -99,7 +82,8 @@ private List reduceProfileResultsTree(List profile // Profiled breakdown total time for (AggregationTimingType timingType : AggregationTimingType.values()) { String breakdownTimingType = timingType.toString(); - Long startTime = profileResult.getTimeBreakdown().get(breakdownTimingType + "_startTime"); + Long startTime = profileResult.getTimeBreakdown() + .get(breakdownTimingType + AbstractProfileBreakdown.TIMING_TYPE_START_TIME_SUFFIX); Long endTime = startTime + profileResult.getTimeBreakdown().get(breakdownTimingType); minSliceStartTimeMap.put( breakdownTimingType, @@ -119,7 +103,7 @@ private List reduceProfileResultsTree(List profile // Profiled breakdown count for (AggregationTimingType timingType : AggregationTimingType.values()) { String breakdownType = timingType.toString(); - String breakdownTypeCount = breakdownType + "_count"; + String breakdownTypeCount = breakdownType + AbstractProfileBreakdown.TIMING_TYPE_COUNT_SUFFIX; breakdown.put( breakdownTypeCount, breakdown.getOrDefault(breakdownTypeCount, 0L) + profileResult.getTimeBreakdown().get(breakdownTypeCount) diff --git a/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdown.java b/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdown.java index 74ef78bc93c5f..6f0c78e8b307d 100644 --- a/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdown.java +++ b/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdown.java @@ -44,10 +44,10 @@ public AbstractProfileBreakdown context(Object context) { @Override public Map toBreakdownMap() { - final Map map = new HashMap<>(buildBreakdownMap(this)); + final Map map = new HashMap<>(super.toBreakdownMap()); for (final AbstractProfileBreakdown context : contexts.values()) { - for (final Map.Entry entry : buildBreakdownMap(context).entrySet()) { + for (final Map.Entry entry : context.toBreakdownMap().entrySet()) { map.merge(entry.getKey(), entry.getValue(), Long::sum); } } diff --git a/server/src/test/java/org/opensearch/search/profile/ProfileResultTests.java b/server/src/test/java/org/opensearch/search/profile/ProfileResultTests.java index 994aa6ce4f752..4c0417a4a7b36 100644 --- a/server/src/test/java/org/opensearch/search/profile/ProfileResultTests.java +++ b/server/src/test/java/org/opensearch/search/profile/ProfileResultTests.java @@ -242,20 +242,83 @@ public void testToXContent() throws IOException { + "}", Strings.toString(builder) ); + + result = new ProfileResult( + "profileName", + "some description", + Map.of("key1", 1234L), + Map.of(), + 1234L, + List.of(), + true, + 321L, + 123L, + 222L + ); + builder = XContentFactory.jsonBuilder().prettyPrint(); + result.toXContent(builder, ToXContent.EMPTY_PARAMS); + assertEquals( + "{\n" + + " \"type\" : \"profileName\",\n" + + " \"description\" : \"some description\",\n" + + " \"time_in_nanos\" : 1234,\n" + + " \"max_slice_time_in_nanos\" : 321,\n" + + " \"min_slice_time_in_nanos\" : 123,\n" + + " \"avg_slice_time_in_nanos\" : 222,\n" + + " \"breakdown\" : {\n" + + " \"key1\" : 1234\n" + + " }\n" + + "}", + Strings.toString(builder) + ); + + result = new ProfileResult( + "profileName", + "some description", + Map.of("key1", 1234567890L), + Map.of(), + 1234567890L, + List.of(), + true, + 87654321L, + 12345678, + 54637281L + ); + builder = XContentFactory.jsonBuilder().prettyPrint().humanReadable(true); + result.toXContent(builder, ToXContent.EMPTY_PARAMS); + assertEquals( + "{\n" + + " \"type\" : \"profileName\",\n" + + " \"description\" : \"some description\",\n" + + " \"time\" : \"1.2s\",\n" + + " \"max_slice_time\" : \"87.6ms\",\n" + + " \"min_slice_time\" : \"12.3ms\",\n" + + " \"avg_slice_time\" : \"54.6ms\",\n" + + " \"time_in_nanos\" : 1234567890,\n" + + " \"max_slice_time_in_nanos\" : 87654321,\n" + + " \"min_slice_time_in_nanos\" : 12345678,\n" + + " \"avg_slice_time_in_nanos\" : 54637281,\n" + + " \"breakdown\" : {\n" + + " \"key1\" : 1234567890\n" + + " }\n" + + "}", + Strings.toString(builder) + ); + } public void testRemoveStartTimeFields() { Map breakdown = new HashMap<>(); - breakdown.put("initialize_startTime", 123456L); + breakdown.put("initialize_start_time", 123456L); breakdown.put("initialize_count", 1L); breakdown.put("initialize", 654321L); Map modifiedBreakdown = new LinkedHashMap<>(breakdown); assertEquals(3, modifiedBreakdown.size()); - assertEquals(123456L, (long) modifiedBreakdown.get("initialize_startTime")); + assertEquals(123456L, (long) modifiedBreakdown.get("initialize_start_time")); assertEquals(1L, (long) modifiedBreakdown.get("initialize_count")); assertEquals(654321L, (long) modifiedBreakdown.get("initialize")); ProfileResult.removeStartTimeFields(modifiedBreakdown); - assertFalse(modifiedBreakdown.containsKey("initialize_startTime")); + assertFalse(modifiedBreakdown.containsKey("initialize_start_time")); assertTrue(modifiedBreakdown.containsKey("initialize_count")); assertTrue(modifiedBreakdown.containsKey("initialize")); } diff --git a/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java b/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java index 4e44791e77566..07efb603ac0e2 100644 --- a/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java +++ b/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java @@ -686,7 +686,7 @@ public TestSearchContext withCleanQueryResult() { * Add profilers to the query */ public TestSearchContext withProfilers() { - this.profilers = new Profilers(searcher); + this.profilers = new Profilers(searcher, concurrentSegmentSearchEnabled); return this; } } From 7fd4d6953241ab7acae0e0c384c257c3e8b0b19d Mon Sep 17 00:00:00 2001 From: Ticheng Lin Date: Fri, 28 Jul 2023 18:18:52 -0700 Subject: [PATCH 3/4] Refactor ProfileResult class and add more tests Signed-off-by: Ticheng Lin --- .../aggregation/AggregationProfilerIT.java | 60 +++++++++----- .../search/profile/ProfileResult.java | 79 ++++++++----------- .../ConcurrentAggregationProfiler.java | 1 - .../search/profile/ProfileResultTests.java | 47 +++++------ .../AggregationProfileShardResultTests.java | 2 +- .../ConcurrentAggregationProfilerTests.java | 25 ------ .../query/QueryProfileShardResultTests.java | 2 +- 7 files changed, 102 insertions(+), 114 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/search/profile/aggregation/AggregationProfilerIT.java b/server/src/internalClusterTest/java/org/opensearch/search/profile/aggregation/AggregationProfilerIT.java index 51c92e8653585..9d0c30c5a488f 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/profile/aggregation/AggregationProfilerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/profile/aggregation/AggregationProfilerIT.java @@ -214,7 +214,8 @@ public void testSimpleProfile() { assertThat(histoAggResult.getTime(), greaterThan(0L)); Map breakdown = histoAggResult.getTimeBreakdown(); assertThat(breakdown, notNullValue()); - if (histoAggResult.isConcurrent()) { + if (histoAggResult.getMaxSliceTime() != null) { + // concurrent segment search enabled assertThat(breakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); } else { assertThat(breakdown.keySet(), equalTo(BREAKDOWN_KEYS)); @@ -261,7 +262,8 @@ public void testMultiLevelProfile() { assertThat(histoAggResult.getTime(), greaterThan(0L)); Map histoBreakdown = histoAggResult.getTimeBreakdown(); assertThat(histoBreakdown, notNullValue()); - if (histoAggResult.isConcurrent()) { + if (histoAggResult.getMaxSliceTime() != null) { + // concurrent segment search enabled assertThat(histoBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); } else { assertThat(histoBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); @@ -283,7 +285,8 @@ public void testMultiLevelProfile() { assertThat(termsAggResult.getTime(), greaterThan(0L)); Map termsBreakdown = termsAggResult.getTimeBreakdown(); assertThat(termsBreakdown, notNullValue()); - if (termsAggResult.isConcurrent()) { + if (termsAggResult.getMaxSliceTime() != null) { + // concurrent segment search enabled assertThat(termsBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); } else { assertThat(termsBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); @@ -302,7 +305,8 @@ public void testMultiLevelProfile() { assertThat(avgAggResult.getTime(), greaterThan(0L)); Map avgBreakdown = termsAggResult.getTimeBreakdown(); assertThat(avgBreakdown, notNullValue()); - if (avgAggResult.isConcurrent()) { + if (avgAggResult.getMaxSliceTime() != null) { + // concurrent segment search enabled assertThat(avgBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); } else { assertThat(avgBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); @@ -359,7 +363,8 @@ public void testMultiLevelProfileBreadthFirst() { assertThat(histoAggResult.getTime(), greaterThan(0L)); Map histoBreakdown = histoAggResult.getTimeBreakdown(); assertThat(histoBreakdown, notNullValue()); - if (histoAggResult.isConcurrent()) { + if (histoAggResult.getMaxSliceTime() != null) { + // concurrent segment search enabled assertThat(histoBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); } else { assertThat(histoBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); @@ -381,7 +386,8 @@ public void testMultiLevelProfileBreadthFirst() { assertThat(termsAggResult.getTime(), greaterThan(0L)); Map termsBreakdown = termsAggResult.getTimeBreakdown(); assertThat(termsBreakdown, notNullValue()); - if (termsAggResult.isConcurrent()) { + if (termsAggResult.getMaxSliceTime() != null) { + // concurrent segment search enabled assertThat(termsBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); } else { assertThat(termsBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); @@ -400,7 +406,8 @@ public void testMultiLevelProfileBreadthFirst() { assertThat(avgAggResult.getTime(), greaterThan(0L)); Map avgBreakdown = avgAggResult.getTimeBreakdown(); assertThat(avgBreakdown, notNullValue()); - if (avgAggResult.isConcurrent()) { + if (avgAggResult.getMaxSliceTime() != null) { + // concurrent segment search enabled assertThat(avgBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); } else { assertThat(avgBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); @@ -442,7 +449,8 @@ public void testDiversifiedAggProfile() { assertThat(diversifyAggResult.getTime(), greaterThan(0L)); Map diversifyBreakdown = diversifyAggResult.getTimeBreakdown(); assertThat(diversifyBreakdown, notNullValue()); - if (diversifyAggResult.isConcurrent()) { + if (diversifyAggResult.getMaxSliceTime() != null) { + // concurrent segment search enabled assertThat(diversifyBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); } else { assertThat(diversifyBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); @@ -463,7 +471,8 @@ public void testDiversifiedAggProfile() { assertThat(maxAggResult.getTime(), greaterThan(0L)); Map maxBreakdown = maxAggResult.getTimeBreakdown(); assertThat(maxBreakdown, notNullValue()); - if (maxAggResult.isConcurrent()) { + if (maxAggResult.getMaxSliceTime() != null) { + // concurrent segment search enabled assertThat(maxBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); } else { assertThat(maxBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); @@ -520,7 +529,8 @@ public void testComplexProfile() { assertThat(histoAggResult.getTime(), greaterThan(0L)); Map histoBreakdown = histoAggResult.getTimeBreakdown(); assertThat(histoBreakdown, notNullValue()); - if (histoAggResult.isConcurrent()) { + if (histoAggResult.getMaxSliceTime() != null) { + // concurrent segment search enabled assertThat(histoBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); } else { assertThat(histoBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); @@ -547,7 +557,8 @@ public void testComplexProfile() { assertThat(tagsAggResult.getTime(), greaterThan(0L)); Map tagsBreakdown = tagsAggResult.getTimeBreakdown(); assertThat(tagsBreakdown, notNullValue()); - if (tagsAggResult.isConcurrent()) { + if (tagsAggResult.getMaxSliceTime() != null) { + // concurrent segment search enabled assertThat(tagsBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); } else { assertThat(tagsBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); @@ -571,7 +582,8 @@ public void testComplexProfile() { assertThat(avgAggResult.getTime(), greaterThan(0L)); Map avgBreakdown = avgAggResult.getTimeBreakdown(); assertThat(avgBreakdown, notNullValue()); - if (avgAggResult.isConcurrent()) { + if (avgAggResult.getMaxSliceTime() != null) { + // concurrent segment search enabled assertThat(avgBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); } else { assertThat(avgBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); @@ -591,7 +603,8 @@ public void testComplexProfile() { assertThat(maxAggResult.getTime(), greaterThan(0L)); Map maxBreakdown = maxAggResult.getTimeBreakdown(); assertThat(maxBreakdown, notNullValue()); - if (maxAggResult.isConcurrent()) { + if (maxAggResult.getMaxSliceTime() != null) { + // concurrent segment search enabled assertThat(maxBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); } else { assertThat(maxBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); @@ -611,7 +624,8 @@ public void testComplexProfile() { assertThat(stringsAggResult.getTime(), greaterThan(0L)); Map stringsBreakdown = stringsAggResult.getTimeBreakdown(); assertThat(stringsBreakdown, notNullValue()); - if (stringsAggResult.isConcurrent()) { + if (stringsAggResult.getMaxSliceTime() != null) { + // concurrent segment search enabled assertThat(stringsBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); } else { assertThat(stringsBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); @@ -635,7 +649,8 @@ public void testComplexProfile() { assertThat(avgAggResult.getTime(), greaterThan(0L)); avgBreakdown = avgAggResult.getTimeBreakdown(); assertThat(avgBreakdown, notNullValue()); - if (avgAggResult.isConcurrent()) { + if (avgAggResult.getMaxSliceTime() != null) { + // concurrent segment search enabled assertThat(avgBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); } else { assertThat(avgBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); @@ -655,7 +670,8 @@ public void testComplexProfile() { assertThat(maxAggResult.getTime(), greaterThan(0L)); maxBreakdown = maxAggResult.getTimeBreakdown(); assertThat(maxBreakdown, notNullValue()); - if (maxAggResult.isConcurrent()) { + if (maxAggResult.getMaxSliceTime() != null) { + // concurrent segment search enabled assertThat(maxBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); } else { assertThat(maxBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); @@ -676,7 +692,8 @@ public void testComplexProfile() { assertThat(tagsAggResult.getTime(), greaterThan(0L)); tagsBreakdown = tagsAggResult.getTimeBreakdown(); assertThat(tagsBreakdown, notNullValue()); - if (tagsAggResult.isConcurrent()) { + if (tagsAggResult.getMaxSliceTime() != null) { + // concurrent segment search enabled assertThat(tagsBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); } else { assertThat(tagsBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); @@ -700,7 +717,8 @@ public void testComplexProfile() { assertThat(avgAggResult.getTime(), greaterThan(0L)); avgBreakdown = avgAggResult.getTimeBreakdown(); assertThat(avgBreakdown, notNullValue()); - if (avgAggResult.isConcurrent()) { + if (avgAggResult.getMaxSliceTime() != null) { + // concurrent segment search enabled assertThat(avgBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); } else { assertThat(avgBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); @@ -720,7 +738,8 @@ public void testComplexProfile() { assertThat(maxAggResult.getTime(), greaterThan(0L)); maxBreakdown = maxAggResult.getTimeBreakdown(); assertThat(maxBreakdown, notNullValue()); - if (maxAggResult.isConcurrent()) { + if (maxAggResult.getMaxSliceTime() != null) { + // concurrent segment search enabled assertThat(maxBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); } else { assertThat(maxBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); @@ -821,7 +840,8 @@ public void testGlobalAggWithStatsSubAggregatorProfile() { assertThat(globalAggResult.getTime(), greaterThan(0L)); Map breakdown = globalAggResult.getTimeBreakdown(); assertThat(breakdown, notNullValue()); - if (globalAggResult.isConcurrent()) { + if (globalAggResult.getMaxSliceTime() != null) { + // concurrent segment search enabled assertEquals(CONCURRENT_SEARCH_BREAKDOWN_KEYS, breakdown.keySet()); } else { assertEquals(BREAKDOWN_KEYS, breakdown.keySet()); diff --git a/server/src/main/java/org/opensearch/search/profile/ProfileResult.java b/server/src/main/java/org/opensearch/search/profile/ProfileResult.java index dd936c33ab942..d96db1d2dd8da 100644 --- a/server/src/main/java/org/opensearch/search/profile/ProfileResult.java +++ b/server/src/main/java/org/opensearch/search/profile/ProfileResult.java @@ -86,11 +86,10 @@ public final class ProfileResult implements Writeable, ToXContentObject { private final Map breakdown; private final Map debug; private final long nodeTime; - private final long maxSliceNodeTime; - private final long minSliceNodeTime; - private final long avgSliceNodeTime; + private Long maxSliceNodeTime; + private Long minSliceNodeTime; + private Long avgSliceNodeTime; private final List children; - private final boolean concurrent; public ProfileResult( String type, @@ -100,7 +99,7 @@ public ProfileResult( long nodeTime, List children ) { - this(type, description, breakdown, debug, nodeTime, children, false, -1, -1, -1); + this(type, description, breakdown, debug, nodeTime, children, null, null, null); } public ProfileResult( @@ -110,10 +109,9 @@ public ProfileResult( Map debug, long nodeTime, List children, - boolean concurrent, - long maxSliceNodeTime, - long minSliceNodeTime, - long avgSliceNodeTime + Long maxSliceNodeTime, + Long minSliceNodeTime, + Long avgSliceNodeTime ) { this.type = type; this.description = description; @@ -121,7 +119,6 @@ public ProfileResult( this.debug = debug == null ? Map.of() : debug; this.children = children == null ? List.of() : children; this.nodeTime = nodeTime; - this.concurrent = concurrent; this.maxSliceNodeTime = maxSliceNodeTime; this.minSliceNodeTime = minSliceNodeTime; this.avgSliceNodeTime = avgSliceNodeTime; @@ -138,21 +135,13 @@ public ProfileResult(StreamInput in) throws IOException { debug = in.readMap(StreamInput::readString, StreamInput::readGenericValue); children = in.readList(ProfileResult::new); if (in.getVersion().onOrAfter(Version.V_3_0_0)) { - this.concurrent = in.readBoolean(); - if (concurrent) { - this.maxSliceNodeTime = in.readLong(); - this.minSliceNodeTime = in.readLong(); - this.avgSliceNodeTime = in.readLong(); - } else { - this.maxSliceNodeTime = -1; - this.minSliceNodeTime = -1; - this.avgSliceNodeTime = -1; - } + this.maxSliceNodeTime = in.readOptionalLong(); + this.minSliceNodeTime = in.readOptionalLong(); + this.avgSliceNodeTime = in.readOptionalLong(); } else { - this.concurrent = false; - this.maxSliceNodeTime = -1; - this.minSliceNodeTime = -1; - this.avgSliceNodeTime = -1; + this.maxSliceNodeTime = null; + this.minSliceNodeTime = null; + this.avgSliceNodeTime = null; } } @@ -165,12 +154,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeMap(debug, StreamOutput::writeString, StreamOutput::writeGenericValue); out.writeList(children); if (out.getVersion().onOrAfter(Version.V_3_0_0)) { - out.writeBoolean(concurrent); - if (concurrent) { - out.writeLong(maxSliceNodeTime); - out.writeLong(minSliceNodeTime); - out.writeLong(avgSliceNodeTime); - } + out.writeOptionalLong(maxSliceNodeTime); + out.writeOptionalLong(minSliceNodeTime); + out.writeOptionalLong(avgSliceNodeTime); } } @@ -211,22 +197,18 @@ public long getTime() { return nodeTime; } - public long getMaxSliceTime() { + public Long getMaxSliceTime() { return maxSliceNodeTime; } - public long getMinSliceTime() { + public Long getMinSliceTime() { return minSliceNodeTime; } - public long getAvgSliceTime() { + public Long getAvgSliceTime() { return avgSliceNodeTime; } - public boolean isConcurrent() { - return concurrent; - } - /** * Returns a list of all profiled children queries */ @@ -241,19 +223,27 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(DESCRIPTION.getPreferredName(), description); if (builder.humanReadable()) { builder.field(NODE_TIME.getPreferredName(), new TimeValue(getTime(), TimeUnit.NANOSECONDS).toString()); - if (concurrent) { + if (getMaxSliceTime() != null) { builder.field(MAX_SLICE_NODE_TIME.getPreferredName(), new TimeValue(getMaxSliceTime(), TimeUnit.NANOSECONDS).toString()); + } + if (getMinSliceTime() != null) { builder.field(MIN_SLICE_NODE_TIME.getPreferredName(), new TimeValue(getMinSliceTime(), TimeUnit.NANOSECONDS).toString()); + } + if (getAvgSliceTime() != null) { builder.field(AVG_SLICE_NODE_TIME.getPreferredName(), new TimeValue(getAvgSliceTime(), TimeUnit.NANOSECONDS).toString()); } } builder.field(NODE_TIME_RAW.getPreferredName(), getTime()); - if (concurrent) { + if (getMaxSliceTime() != null) { builder.field(MAX_SLICE_NODE_TIME_RAW.getPreferredName(), getMaxSliceTime()); + } + if (getMinSliceTime() != null) { builder.field(MIN_SLICE_NODE_TIME_RAW.getPreferredName(), getMinSliceTime()); + } + if (getAvgSliceTime() != null) { builder.field(AVG_SLICE_NODE_TIME_RAW.getPreferredName(), getAvgSliceTime()); } - createBreakownView(builder); + createBreakdownView(builder); if (false == debug.isEmpty()) { builder.field(DEBUG.getPreferredName(), debug); } @@ -269,11 +259,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder.endObject(); } - private void createBreakownView(XContentBuilder builder) throws IOException { + private void createBreakdownView(XContentBuilder builder) throws IOException { Map modifiedBreakdown = new LinkedHashMap<>(breakdown); - if (!concurrent) { - removeStartTimeFields(modifiedBreakdown); - } + removeStartTimeFields(modifiedBreakdown); builder.field(BREAKDOWN.getPreferredName(), modifiedBreakdown); } @@ -300,6 +288,9 @@ static void removeStartTimeFields(Map modifiedBreakdown) { parser.declareObject(optionalConstructorArg(), (p, c) -> p.map(), DEBUG); parser.declareLong(constructorArg(), NODE_TIME_RAW); parser.declareObjectArray(optionalConstructorArg(), (p, c) -> fromXContent(p), CHILDREN); + parser.declareLong(optionalConstructorArg(), MAX_SLICE_NODE_TIME_RAW); + parser.declareLong(optionalConstructorArg(), MIN_SLICE_NODE_TIME_RAW); + parser.declareLong(optionalConstructorArg(), AVG_SLICE_NODE_TIME_RAW); PARSER = parser.build(); } diff --git a/server/src/main/java/org/opensearch/search/profile/aggregation/ConcurrentAggregationProfiler.java b/server/src/main/java/org/opensearch/search/profile/aggregation/ConcurrentAggregationProfiler.java index 9048ac9f2892d..7c9bf55a97de5 100644 --- a/server/src/main/java/org/opensearch/search/profile/aggregation/ConcurrentAggregationProfiler.java +++ b/server/src/main/java/org/opensearch/search/profile/aggregation/ConcurrentAggregationProfiler.java @@ -149,7 +149,6 @@ private List reduceProfileResultsTree(List profile debug, nodeTime, reducedChildrenTree, - true, maxSliceNodeTime, minSliceNodeTime, avgSliceNodeTime diff --git a/server/src/test/java/org/opensearch/search/profile/ProfileResultTests.java b/server/src/test/java/org/opensearch/search/profile/ProfileResultTests.java index 4c0417a4a7b36..70b876b41ba08 100644 --- a/server/src/test/java/org/opensearch/search/profile/ProfileResultTests.java +++ b/server/src/test/java/org/opensearch/search/profile/ProfileResultTests.java @@ -56,7 +56,7 @@ public class ProfileResultTests extends OpenSearchTestCase { - public static ProfileResult createTestItem(int depth) { + public static ProfileResult createTestItem(int depth, boolean concurrentSegmentSearchEnabled) { String type = randomAlphaOfLengthBetween(5, 10); String description = randomAlphaOfLengthBetween(5, 10); int breakdownsSize = randomIntBetween(0, 5); @@ -77,13 +77,28 @@ public static ProfileResult createTestItem(int depth) { int childrenSize = depth > 0 ? randomIntBetween(0, 1) : 0; List children = new ArrayList<>(childrenSize); for (int i = 0; i < childrenSize; i++) { - children.add(createTestItem(depth - 1)); + children.add(createTestItem(depth - 1, concurrentSegmentSearchEnabled)); + } + if (concurrentSegmentSearchEnabled) { + return new ProfileResult( + type, + description, + breakdown, + debug, + randomNonNegativeLong(), + children, + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong() + ); + } else { + return new ProfileResult(type, description, breakdown, debug, randomNonNegativeLong(), children); } - return new ProfileResult(type, description, breakdown, debug, randomNonNegativeLong(), children); } public void testFromXContent() throws IOException { - doFromXContentTestWithRandomFields(false); + doFromXContentTestWithRandomFields(false, false); + doFromXContentTestWithRandomFields(false, true); } /** @@ -91,11 +106,12 @@ public void testFromXContent() throws IOException { * back to be forward compatible with additions to the xContent */ public void testFromXContentWithRandomFields() throws IOException { - doFromXContentTestWithRandomFields(true); + doFromXContentTestWithRandomFields(true, false); + doFromXContentTestWithRandomFields(true, true); } - private void doFromXContentTestWithRandomFields(boolean addRandomFields) throws IOException { - ProfileResult profileResult = createTestItem(2); + private void doFromXContentTestWithRandomFields(boolean addRandomFields, boolean concurrentSegmentSearchEnabled) throws IOException { + ProfileResult profileResult = createTestItem(2, concurrentSegmentSearchEnabled); XContentType xContentType = randomFrom(XContentType.values()); boolean humanReadable = randomBoolean(); BytesReference originalBytes = toShuffledXContent(profileResult, xContentType, ToXContent.EMPTY_PARAMS, humanReadable); @@ -119,7 +135,6 @@ private void doFromXContentTestWithRandomFields(boolean addRandomFields) throws assertEquals(profileResult.getMaxSliceTime(), parsed.getMaxSliceTime()); assertEquals(profileResult.getMinSliceTime(), parsed.getMinSliceTime()); assertEquals(profileResult.getAvgSliceTime(), parsed.getAvgSliceTime()); - assertEquals(profileResult.isConcurrent(), parsed.isConcurrent()); assertToXContentEquivalent(originalBytes, toXContent(parsed, xContentType, humanReadable), xContentType); } @@ -243,18 +258,7 @@ public void testToXContent() throws IOException { Strings.toString(builder) ); - result = new ProfileResult( - "profileName", - "some description", - Map.of("key1", 1234L), - Map.of(), - 1234L, - List.of(), - true, - 321L, - 123L, - 222L - ); + result = new ProfileResult("profileName", "some description", Map.of("key1", 1234L), Map.of(), 1234L, List.of(), 321L, 123L, 222L); builder = XContentFactory.jsonBuilder().prettyPrint(); result.toXContent(builder, ToXContent.EMPTY_PARAMS); assertEquals( @@ -279,9 +283,8 @@ public void testToXContent() throws IOException { Map.of(), 1234567890L, List.of(), - true, 87654321L, - 12345678, + 12345678L, 54637281L ); builder = XContentFactory.jsonBuilder().prettyPrint().humanReadable(true); diff --git a/server/src/test/java/org/opensearch/search/profile/aggregation/AggregationProfileShardResultTests.java b/server/src/test/java/org/opensearch/search/profile/aggregation/AggregationProfileShardResultTests.java index 33c95725dcd13..75fb846909aa1 100644 --- a/server/src/test/java/org/opensearch/search/profile/aggregation/AggregationProfileShardResultTests.java +++ b/server/src/test/java/org/opensearch/search/profile/aggregation/AggregationProfileShardResultTests.java @@ -57,7 +57,7 @@ public static AggregationProfileShardResult createTestItem(int depth) { int size = randomIntBetween(0, 5); List aggProfileResults = new ArrayList<>(size); for (int i = 0; i < size; i++) { - aggProfileResults.add(ProfileResultTests.createTestItem(1)); + aggProfileResults.add(ProfileResultTests.createTestItem(depth, false)); } return new AggregationProfileShardResult(aggProfileResults); } diff --git a/server/src/test/java/org/opensearch/search/profile/aggregation/ConcurrentAggregationProfilerTests.java b/server/src/test/java/org/opensearch/search/profile/aggregation/ConcurrentAggregationProfilerTests.java index 88e822f576600..e36b65f0a7b69 100644 --- a/server/src/test/java/org/opensearch/search/profile/aggregation/ConcurrentAggregationProfilerTests.java +++ b/server/src/test/java/org/opensearch/search/profile/aggregation/ConcurrentAggregationProfilerTests.java @@ -6,25 +6,6 @@ * compatible open source license. */ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - /* * Modifications Copyright OpenSearch Contributors. See * GitHub history for details. @@ -61,13 +42,11 @@ public static List createConcurrentSearchProfileTree() { new HashMap<>(), 3359835L, List.of(), - true, 1490667L, 1180123L, 1240676L ) ), - true, 94582L, 18667L, 211749L @@ -88,13 +67,11 @@ public static List createConcurrentSearchProfileTree() { new HashMap<>(), 3359567L, List.of(), - true, 1390554L, 1180321L, 1298776L ) ), - true, 94560L, 11237L, 236440L @@ -109,7 +86,6 @@ public static List createConcurrentSearchProfileTree() { new HashMap<>(), 19631335L, List.of(), - true, 563002L, 142210L, 1216631L @@ -123,7 +99,6 @@ public static List createConcurrentSearchProfileTree() { new HashMap<>(), 19634567L, List.of(), - true, 563333L, 146783L, 1496600L diff --git a/server/src/test/java/org/opensearch/search/profile/query/QueryProfileShardResultTests.java b/server/src/test/java/org/opensearch/search/profile/query/QueryProfileShardResultTests.java index e703396f5cf02..b371015621647 100644 --- a/server/src/test/java/org/opensearch/search/profile/query/QueryProfileShardResultTests.java +++ b/server/src/test/java/org/opensearch/search/profile/query/QueryProfileShardResultTests.java @@ -54,7 +54,7 @@ public static QueryProfileShardResult createTestItem() { int size = randomIntBetween(0, 5); List queryProfileResults = new ArrayList<>(size); for (int i = 0; i < size; i++) { - queryProfileResults.add(ProfileResultTests.createTestItem(1)); + queryProfileResults.add(ProfileResultTests.createTestItem(1, false)); } CollectorResult profileCollector = CollectorResultTests.createTestItem(2, false); long rewriteTime = randomNonNegativeLong(); From 289208e7492a039fe8a805ff35f2d2ec7efa2666 Mon Sep 17 00:00:00 2001 From: Ticheng Lin Date: Fri, 28 Jul 2023 23:29:54 -0700 Subject: [PATCH 4/4] Fix flaky QueryProfilePhaseTests.testCollapseQuerySearchResults test Signed-off-by: Ticheng Lin --- .../src/main/java/org/opensearch/test/TestSearchContext.java | 1 + 1 file changed, 1 insertion(+) diff --git a/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java b/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java index 07efb603ac0e2..0ce63fbe2977e 100644 --- a/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java +++ b/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java @@ -160,6 +160,7 @@ public TestSearchContext( this.indexShard = indexShard; this.queryShardContext = queryShardContext; this.searcher = searcher; + this.concurrentSegmentSearchEnabled = searcher != null && (searcher.getExecutor() != null); this.scrollContext = scrollContext; }