From 2f0899886e629b85f0748a5a30de8a108c55ffcc Mon Sep 17 00:00:00 2001 From: Ticheng Lin Date: Wed, 19 Jul 2023 23:36:42 -0700 Subject: [PATCH] Add support for aggregation profiler with concurrent aggregation Signed-off-by: Ticheng Lin --- CHANGELOG.md | 1 + .../profile/AbstractInternalProfileTree.java | 1 + .../profile/AbstractProfileBreakdown.java | 7 +- .../search/profile/ProfileResult.java | 115 +++++++++++++ .../opensearch/search/profile/Profilers.java | 2 +- .../org/opensearch/search/profile/Timer.java | 16 +- .../aggregation/AggregationProfiler.java | 161 ++++++++++++++++-- .../InternalAggregationProfileTree.java | 6 + 8 files changed, 293 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d3eda3f852a24..5b4901360f3a0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -84,6 +84,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Changed - Perform aggregation postCollection in ContextIndexSearcher after searching leaves ([#8303](https://github.com/opensearch-project/OpenSearch/pull/8303)) - Make Span exporter configurable ([#8620](https://github.com/opensearch-project/OpenSearch/issues/8620)) +- Add support for aggregation profiler with concurrent aggregation ([#8801](https://github.com/opensearch-project/OpenSearch/pull/8801)) ### Deprecated diff --git a/server/src/main/java/org/opensearch/search/profile/AbstractInternalProfileTree.java b/server/src/main/java/org/opensearch/search/profile/AbstractInternalProfileTree.java index 4d0949624ebed..5d041f0c8a49b 100644 --- a/server/src/main/java/org/opensearch/search/profile/AbstractInternalProfileTree.java +++ b/server/src/main/java/org/opensearch/search/profile/AbstractInternalProfileTree.java @@ -186,6 +186,7 @@ private ProfileResult doGetTree(int token) { breakdown.toBreakdownMap(), breakdown.toDebugMap(), breakdown.toNodeTime(), + breakdown.toNodeStartTime(), childrenProfileResults ); } diff --git a/server/src/main/java/org/opensearch/search/profile/AbstractProfileBreakdown.java b/server/src/main/java/org/opensearch/search/profile/AbstractProfileBreakdown.java index a29d4f9a0ee20..d45ea61528c38 100644 --- a/server/src/main/java/org/opensearch/search/profile/AbstractProfileBreakdown.java +++ b/server/src/main/java/org/opensearch/search/profile/AbstractProfileBreakdown.java @@ -84,7 +84,8 @@ protected final Map buildBreakdownMap(AbstractProfileBreakdown Map map = new HashMap<>(breakdown.timings.length * 2); for (T timingType : breakdown.timingTypes) { map.put(timingType.toString(), breakdown.timings[timingType.ordinal()].getApproximateTiming()); - map.put(timingType.toString() + "_count", breakdown.timings[timingType.ordinal()].getCount()); + map.put(timingType + "_count", breakdown.timings[timingType.ordinal()].getCount()); + map.put(timingType + "_startTime", breakdown.timings[timingType.ordinal()].getTimerStartTime()); } return Collections.unmodifiableMap(map); } @@ -103,4 +104,8 @@ public final long toNodeTime() { } return total; } + + public final long toNodeStartTime() { + return timings[timingTypes[0].ordinal()].getTimerStartTime(); + } } diff --git a/server/src/main/java/org/opensearch/search/profile/ProfileResult.java b/server/src/main/java/org/opensearch/search/profile/ProfileResult.java index 89c3d7504de66..9f5d994a46170 100644 --- a/server/src/main/java/org/opensearch/search/profile/ProfileResult.java +++ b/server/src/main/java/org/opensearch/search/profile/ProfileResult.java @@ -44,6 +44,7 @@ import java.io.IOException; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -69,15 +70,29 @@ public final class ProfileResult implements Writeable, ToXContentObject { static final ParseField BREAKDOWN = new ParseField("breakdown"); static final ParseField DEBUG = new ParseField("debug"); static final ParseField NODE_TIME = new ParseField("time"); + static final ParseField SLICE_START_TIME = new ParseField("slice_start_time"); + static final ParseField MAX_SLICE_NODE_TIME = new ParseField("max_slice_time"); + static final ParseField MIN_SLICE_NODE_TIME = new ParseField("min_slice_time"); + static final ParseField AVG_SLICE_NODE_TIME = new ParseField("avg_slice_time"); static final ParseField NODE_TIME_RAW = new ParseField("time_in_nanos"); + static final ParseField SLICE_START_TIME_RAW = new ParseField("slice_start_time_in_nanos"); + static final ParseField MAX_SLICE_NODE_TIME_RAW = new ParseField("max_slice_time_in_nanos"); + static final ParseField MIN_SLICE_NODE_TIME_RAW = new ParseField("min_slice_time_in_nanos"); + static final ParseField AVG_SLICE_NODE_TIME_RAW = new ParseField("avg_slice_time_in_nanos"); static final ParseField CHILDREN = new ParseField("children"); + static final ParseField CONCURRENT = new ParseField("concurrent"); private final String type; private final String description; private final Map breakdown; private final Map debug; private final long nodeTime; + private final long nodeStartTime; + private final long maxSliceNodeTime; + private final long minSliceNodeTime; + private final long avgSliceNodeTime; private final List children; + private final boolean concurrent; public ProfileResult( String type, @@ -86,6 +101,48 @@ public ProfileResult( Map debug, long nodeTime, List children + ) { + this(type, description, breakdown, debug, nodeTime, -1, nodeTime, nodeTime, nodeTime, children, false); + } + + public ProfileResult( + String type, + String description, + Map breakdown, + Map debug, + long nodeTime, + long nodeStartTime, + List children + ) { + this(type, description, breakdown, debug, nodeTime, nodeStartTime, nodeTime, nodeTime, nodeTime, children, false); + } + + public ProfileResult( + String type, + String description, + Map breakdown, + Map debug, + long nodeTime, + long maxSliceNodeTime, + long minSliceNodeTime, + long avgSliceNodeTime, + List children + ) { + this(type, description, breakdown, debug, nodeTime, -1, maxSliceNodeTime, minSliceNodeTime, avgSliceNodeTime, children, true); + } + + public ProfileResult( + String type, + String description, + Map breakdown, + Map debug, + long nodeTime, + long nodeStartTime, + long maxSliceNodeTime, + long minSliceNodeTime, + long avgSliceNodeTime, + List children, + boolean concurrent ) { this.type = type; this.description = description; @@ -93,6 +150,11 @@ public ProfileResult( this.debug = debug == null ? Map.of() : debug; this.children = children == null ? List.of() : children; this.nodeTime = nodeTime; + this.nodeStartTime = nodeStartTime; + this.maxSliceNodeTime = maxSliceNodeTime; + this.minSliceNodeTime = minSliceNodeTime; + this.avgSliceNodeTime = avgSliceNodeTime; + this.concurrent = concurrent; } /** @@ -102,9 +164,14 @@ public ProfileResult(StreamInput in) throws IOException { this.type = in.readString(); this.description = in.readString(); this.nodeTime = in.readLong(); + this.nodeStartTime = in.readLong(); + this.maxSliceNodeTime = in.readLong(); + this.minSliceNodeTime = in.readLong(); + this.avgSliceNodeTime = in.readLong(); breakdown = in.readMap(StreamInput::readString, StreamInput::readLong); debug = in.readMap(StreamInput::readString, StreamInput::readGenericValue); children = in.readList(ProfileResult::new); + this.concurrent = in.readBoolean(); } @Override @@ -112,9 +179,14 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(type); out.writeString(description); out.writeLong(nodeTime); // not Vlong because can be negative + out.writeLong(nodeStartTime); + out.writeLong(maxSliceNodeTime); + out.writeLong(minSliceNodeTime); + out.writeLong(avgSliceNodeTime); out.writeMap(breakdown, StreamOutput::writeString, StreamOutput::writeLong); out.writeMap(debug, StreamOutput::writeString, StreamOutput::writeGenericValue); out.writeList(children); + out.writeBoolean(concurrent); } /** @@ -154,6 +226,26 @@ public long getTime() { return nodeTime; } + public long getSliceStartTime() { + return nodeStartTime; + } + + public long getMaxSliceTime() { + return maxSliceNodeTime; + } + + public long getMinSliceTime() { + return minSliceNodeTime; + } + + public long getAvgSliceTime() { + return avgSliceNodeTime; + } + + public boolean isConcurrent() { + return concurrent; + } + /** * Returns a list of all profiled children queries */ @@ -168,8 +260,26 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(DESCRIPTION.getPreferredName(), description); if (builder.humanReadable()) { builder.field(NODE_TIME.getPreferredName(), new TimeValue(getTime(), TimeUnit.NANOSECONDS).toString()); + if (concurrent) { + builder.field(MAX_SLICE_NODE_TIME.getPreferredName(), new TimeValue(getMaxSliceTime(), TimeUnit.NANOSECONDS).toString()); + builder.field(MIN_SLICE_NODE_TIME.getPreferredName(), new TimeValue(getMinSliceTime(), TimeUnit.NANOSECONDS).toString()); + builder.field(AVG_SLICE_NODE_TIME.getPreferredName(), new TimeValue(getAvgSliceTime(), TimeUnit.NANOSECONDS).toString()); + } } builder.field(NODE_TIME_RAW.getPreferredName(), getTime()); + if (concurrent) { + builder.field(MAX_SLICE_NODE_TIME_RAW.getPreferredName(), getMaxSliceTime()); + builder.field(MIN_SLICE_NODE_TIME_RAW.getPreferredName(), getMinSliceTime()); + builder.field(AVG_SLICE_NODE_TIME_RAW.getPreferredName(), getAvgSliceTime()); + } else { + Iterator> iterator = breakdown.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + if (entry.getKey().endsWith("_startTime")) { + iterator.remove(); + } + } + } builder.field(BREAKDOWN.getPreferredName(), breakdown); if (false == debug.isEmpty()) { builder.field(DEBUG.getPreferredName(), debug); @@ -198,7 +308,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws parser.declareObject(constructorArg(), (p, c) -> p.map(), BREAKDOWN); parser.declareObject(optionalConstructorArg(), (p, c) -> p.map(), DEBUG); parser.declareLong(constructorArg(), NODE_TIME_RAW); + parser.declareLong(constructorArg(), SLICE_START_TIME_RAW); + parser.declareLong(constructorArg(), MAX_SLICE_NODE_TIME_RAW); + parser.declareLong(constructorArg(), MIN_SLICE_NODE_TIME_RAW); + parser.declareLong(constructorArg(), AVG_SLICE_NODE_TIME_RAW); parser.declareObjectArray(optionalConstructorArg(), (p, c) -> fromXContent(p), CHILDREN); + parser.declareBoolean(constructorArg(), CONCURRENT); PARSER = parser.build(); } diff --git a/server/src/main/java/org/opensearch/search/profile/Profilers.java b/server/src/main/java/org/opensearch/search/profile/Profilers.java index 2bc2f3a5a3920..833c5b776a4cc 100644 --- a/server/src/main/java/org/opensearch/search/profile/Profilers.java +++ b/server/src/main/java/org/opensearch/search/profile/Profilers.java @@ -55,7 +55,7 @@ public final class Profilers { public Profilers(ContextIndexSearcher searcher) { this.searcher = searcher; this.queryProfilers = new ArrayList<>(); - this.aggProfiler = new AggregationProfiler(); + this.aggProfiler = new AggregationProfiler(searcher.getExecutor() != null); addQueryProfiler(); } diff --git a/server/src/main/java/org/opensearch/search/profile/Timer.java b/server/src/main/java/org/opensearch/search/profile/Timer.java index 231324b4a5598..ef65833ce3f7a 100644 --- a/server/src/main/java/org/opensearch/search/profile/Timer.java +++ b/server/src/main/java/org/opensearch/search/profile/Timer.java @@ -50,8 +50,8 @@ */ public class Timer { - private boolean doTiming; - private long timing, count, lastCount, start; + private boolean doTiming, isStarted; + private long timing, count, lastCount, start, timerStartTime; /** pkg-private for testing */ long nanoTime() { @@ -71,6 +71,10 @@ public final void start() { doTiming = (count - lastCount) >= Math.min(lastCount >>> 8, 1024); if (doTiming) { start = nanoTime(); + if (isStarted == false) { + timerStartTime = start; + isStarted = true; + } } count++; } @@ -92,6 +96,14 @@ public final long getCount() { return count; } + /** Return the timer start time in nanoseconds.*/ + public final long getTimerStartTime() { + if (start != 0) { + throw new IllegalStateException("#start call misses a matching #stop call"); + } + return timerStartTime; + } + /** Return an approximation of the total time spent between consecutive calls of #start and #stop. */ public final long getApproximateTiming() { if (start != 0) { diff --git a/server/src/main/java/org/opensearch/search/profile/aggregation/AggregationProfiler.java b/server/src/main/java/org/opensearch/search/profile/aggregation/AggregationProfiler.java index 1d2cf424ee5a7..45ea4f8a777ae 100644 --- a/server/src/main/java/org/opensearch/search/profile/aggregation/AggregationProfiler.java +++ b/server/src/main/java/org/opensearch/search/profile/aggregation/AggregationProfiler.java @@ -34,6 +34,7 @@ import org.opensearch.search.aggregations.Aggregator; import org.opensearch.search.profile.AbstractProfiler; +import org.opensearch.search.profile.ProfileResult; import java.util.HashMap; import java.util.LinkedList; @@ -47,29 +48,165 @@ */ public class AggregationProfiler extends AbstractProfiler { - private final Map, AggregationProfileBreakdown> profileBreakdownLookup = new HashMap<>(); + private final Map profileBreakdownLookup = new HashMap<>(); + private final boolean concurrent; - public AggregationProfiler() { - super(new InternalAggregationProfileTree()); + private final String[] breakdownCountStatsTypes = { "build_leaf_collector_count", "collect_count" }; + + public AggregationProfiler(boolean concurrent) { + super(new InternalAggregationProfileTree(concurrent)); + this.concurrent = concurrent; } @Override public AggregationProfileBreakdown getQueryBreakdown(Aggregator agg) { - List path = getAggregatorPath(agg); - AggregationProfileBreakdown aggregationProfileBreakdown = profileBreakdownLookup.get(path); + AggregationProfileBreakdown aggregationProfileBreakdown = profileBreakdownLookup.get(agg); if (aggregationProfileBreakdown == null) { aggregationProfileBreakdown = super.getQueryBreakdown(agg); - profileBreakdownLookup.put(path, aggregationProfileBreakdown); + profileBreakdownLookup.put(agg, aggregationProfileBreakdown); } return aggregationProfileBreakdown; } - public static List getAggregatorPath(Aggregator agg) { - LinkedList path = new LinkedList<>(); - while (agg != null) { - path.addFirst(agg.name()); - agg = agg.parent(); + @Override + public List getTree() { + List tree = profileTree.getTree(); + if (concurrent) { + List reducedTree = new LinkedList<>(); + Map> sliceLevelAggregationMap = getSliceLevelAggregationMap(tree); + for (List profileResults : sliceLevelAggregationMap.values()) { + reducedTree.addAll(reduceProfileResultsTree(profileResults)); + } + return reducedTree; + } else { + return tree; + } + } + + private List reduceProfileResultsTree(List tree) { + String type = tree.get(0).getQueryName(); + String description = tree.get(0).getLuceneDescription(); + long maxSliceNodeEndTime = Long.MIN_VALUE; + long minSliceNodeStartTime = Long.MAX_VALUE; + long maxSliceNodeTime = Long.MIN_VALUE; + long minSliceNodeTime = Long.MAX_VALUE; + long avgSliceNodeTime = 0L; + Map breakdown = new HashMap<>(); + Map timeStatsMap = new HashMap<>(); + Map minSliceStartTimeMap = new HashMap<>(); + Map maxSliceEndTimeMap = new HashMap<>(); + Map countStatsMap = new HashMap<>(); + Map debug = new HashMap<>(); + List children = new LinkedList<>(); + + for (ProfileResult profileResult : tree) { + long profileNodeTime = profileResult.getTime(); + long sliceStartTime = profileResult.getSliceStartTime(); + + // Profiled total time + maxSliceNodeEndTime = Math.max(maxSliceNodeEndTime, sliceStartTime + profileNodeTime); + minSliceNodeStartTime = Math.min(minSliceNodeStartTime, sliceStartTime); + + // Profiled total time stats + maxSliceNodeTime = Math.max(maxSliceNodeTime, profileNodeTime); + minSliceNodeTime = Math.min(minSliceNodeTime, profileNodeTime); + avgSliceNodeTime += profileNodeTime; + + // Profiled breakdown time stats + for (AggregationTimingType timingType : AggregationTimingType.values()) { + buildBreakdownStatsMap(timeStatsMap, profileResult, timingType.toString()); + } + + // Profiled breakdown total time + for (AggregationTimingType timingType : AggregationTimingType.values()) { + String breakdownTimingType = timingType.toString(); + Long startTime = profileResult.getTimeBreakdown().get(breakdownTimingType + "_startTime"); + Long endTime = startTime + profileResult.getTimeBreakdown().get(breakdownTimingType); + minSliceStartTimeMap.put(breakdownTimingType, Math.min(minSliceStartTimeMap.getOrDefault(breakdownTimingType, Long.MAX_VALUE), startTime)); + maxSliceEndTimeMap.put(breakdownTimingType, Math.max(maxSliceEndTimeMap.getOrDefault(breakdownTimingType, Long.MIN_VALUE), endTime)); + } + + // Profiled breakdown count stats + for (String breakdownCountType : breakdownCountStatsTypes) { + buildBreakdownStatsMap(countStatsMap, profileResult, breakdownCountType); + } + + // Profiled breakdown count + for (AggregationTimingType timingType : AggregationTimingType.values()) { + String breakdownType = timingType.toString(); + String breakdownTypeCount = breakdownType + "_count"; + breakdown.put(breakdownTypeCount, breakdown.getOrDefault(breakdownTypeCount, 0L) + profileResult.getTimeBreakdown().get(breakdownTypeCount)); + } + + debug = profileResult.getDebugInfo(); + children.addAll(profileResult.getProfiledChildren()); + } + // nodeTime + long nodeTime = maxSliceNodeEndTime - minSliceNodeStartTime; + + // Profiled breakdown time stats + for (AggregationTimingType breakdownTimingType : AggregationTimingType.values()) { + buildBreakdownMap(tree, breakdown, timeStatsMap, breakdownTimingType.toString()); + } + + // Profiled breakdown total time + for (AggregationTimingType breakdownTimingType : AggregationTimingType.values()) { + String breakdownType = breakdownTimingType.toString(); + breakdown.put(breakdownType, maxSliceEndTimeMap.get(breakdownType) - minSliceStartTimeMap.get(breakdownType)); + } + + // Profiled breakdown count stats + for (String breakdownCountType : breakdownCountStatsTypes) { + buildBreakdownMap(tree, breakdown, countStatsMap, breakdownCountType); + } + + // children + if (!children.isEmpty()) { + children = reduceProfileResultsTree(children); + } + + ProfileResult reducedResult = new ProfileResult( + type, + description, + breakdown, + debug, + nodeTime, + maxSliceNodeTime, + minSliceNodeTime, + avgSliceNodeTime, + children + ); + List ret = new LinkedList<>(); + ret.add(reducedResult); + return ret; + } + + private void buildBreakdownMap(List tree, Map breakdown, Map timeStatsMap, String breakdownType) { + String maxBreakdownType = "max_" + breakdownType; + String minBreakdownType = "min_" + breakdownType; + String avgBreakdownType = "avg_" + breakdownType; + breakdown.put(maxBreakdownType, timeStatsMap.get(maxBreakdownType)); + breakdown.put(minBreakdownType, timeStatsMap.get(minBreakdownType)); + breakdown.put(avgBreakdownType, timeStatsMap.get(avgBreakdownType)/tree.size()); + } + + private void buildBreakdownStatsMap(Map statsMap, ProfileResult result, String breakdownType) { + String maxBreakdownType = "max_" + breakdownType; + String minBreakdownType = "min_" + breakdownType; + String avgBreakdownType = "avg_" + breakdownType; + statsMap.put(maxBreakdownType, Math.max(statsMap.getOrDefault(maxBreakdownType, Long.MIN_VALUE), result.getTimeBreakdown().get(breakdownType))); + statsMap.put(minBreakdownType, Math.min(statsMap.getOrDefault(minBreakdownType, Long.MAX_VALUE), result.getTimeBreakdown().get(breakdownType))); + statsMap.put(avgBreakdownType, statsMap.getOrDefault(avgBreakdownType, 0L) + result.getTimeBreakdown().get(breakdownType)); + } + + private Map> getSliceLevelAggregationMap(List tree) { + Map> sliceLevelAggregationMap = new HashMap<>(); + for (ProfileResult result : tree) { + String description = result.getLuceneDescription(); + List sliceLevelAggregationList = sliceLevelAggregationMap.getOrDefault(description, new LinkedList<>()); + sliceLevelAggregationList.add(result); + sliceLevelAggregationMap.put(description, sliceLevelAggregationList); } - return path; + return sliceLevelAggregationMap; } } diff --git a/server/src/main/java/org/opensearch/search/profile/aggregation/InternalAggregationProfileTree.java b/server/src/main/java/org/opensearch/search/profile/aggregation/InternalAggregationProfileTree.java index 36cfc53f41ccd..4b932b4c82985 100644 --- a/server/src/main/java/org/opensearch/search/profile/aggregation/InternalAggregationProfileTree.java +++ b/server/src/main/java/org/opensearch/search/profile/aggregation/InternalAggregationProfileTree.java @@ -42,6 +42,12 @@ */ public class InternalAggregationProfileTree extends AbstractInternalProfileTree { + private final boolean concurrent; + + InternalAggregationProfileTree(boolean concurrent) { + this.concurrent = concurrent; + } + @Override protected AggregationProfileBreakdown createProfileBreakdown() { return new AggregationProfileBreakdown();