Add support for aggregation profiler with concurrent aggregation #8801

Merged · 4 commits · Jul 31, 2023
CHANGELOG.md (1 addition, 0 deletions)
@@ -100,6 +100,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Exclude 'benchmarks' from codecov report ([#8805](https://github.com/opensearch-project/OpenSearch/pull/8805))
 - [Refactor] MediaTypeParser to MediaTypeParserRegistry ([#8636](https://github.com/opensearch-project/OpenSearch/pull/8636))
 - Create separate SourceLookup instance per segment slice in SignificantTextAggregatorFactory ([#8807](https://github.com/opensearch-project/OpenSearch/pull/8807))
+- Add support for aggregation profiler with concurrent aggregation ([#8801](https://github.com/opensearch-project/OpenSearch/pull/8801))

### Deprecated


Large diffs are not rendered by default.

@@ -1270,7 +1270,7 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc
context.minimumScore(source.minScore());
}
if (source.profile()) {
-            context.setProfilers(new Profilers(context.searcher()));
+            context.setProfilers(new Profilers(context.searcher(), context.isConcurrentSegmentSearchEnabled()));
}
if (source.timeout() != null) {
context.timeout(source.timeout());
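For orientation, profiling is opted into per request: the `profile` flag on the search source is what routes execution through the `parseSource` branch above. A minimal request sketch (names illustrative, not part of this diff):

```java
import org.opensearch.search.aggregations.AggregationBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;

class ProfiledRequestSketch {
    static SearchSourceBuilder profiledSource() {
        // profile(true) is what makes parseSource(...) install Profilers --
        // now constructed with the concurrent-segment-search flag as well.
        return new SearchSourceBuilder().profile(true)
            .aggregation(AggregationBuilders.terms("categories").field("category.keyword"));
    }
}
```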
@@ -50,8 +50,10 @@ public abstract class AbstractProfileBreakdown<T extends Enum<T>> {
/**
* The accumulated timings for this query node
*/
-    private final Timer[] timings;
-    private final T[] timingTypes;
+    protected final Timer[] timings;
+    protected final T[] timingTypes;
+    public static final String TIMING_TYPE_COUNT_SUFFIX = "_count";
+    public static final String TIMING_TYPE_START_TIME_SUFFIX = "_start_time";

/** Sole constructor. */
public AbstractProfileBreakdown(Class<T> clazz) {
@@ -74,17 +76,10 @@ public void setTimer(T timing, Timer timer) {
* Build a timing count breakdown for current instance
*/
public Map<String, Long> toBreakdownMap() {
-        return buildBreakdownMap(this);
-    }
-
-    /**
-     * Build a timing count breakdown for arbitrary instance
-     */
-    protected final Map<String, Long> buildBreakdownMap(AbstractProfileBreakdown<T> breakdown) {
-        Map<String, Long> map = new HashMap<>(breakdown.timings.length * 2);
-        for (T timingType : breakdown.timingTypes) {
-            map.put(timingType.toString(), breakdown.timings[timingType.ordinal()].getApproximateTiming());
-            map.put(timingType.toString() + "_count", breakdown.timings[timingType.ordinal()].getCount());
+        Map<String, Long> map = new HashMap<>(this.timings.length * 3);
+        for (T timingType : this.timingTypes) {
+            map.put(timingType.toString(), this.timings[timingType.ordinal()].getApproximateTiming());
+            map.put(timingType + TIMING_TYPE_COUNT_SUFFIX, this.timings[timingType.ordinal()].getCount());
}
return Collections.unmodifiableMap(map);
}
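As a concrete illustration of the keys the base breakdown now emits (values invented), each timing type contributes a time entry and a `_count` entry; the map is pre-sized for three entries per type because the aggregation subclass further down adds a `_start_time` entry on top:

```java
import java.util.Map;

class BreakdownShapeSketch {
    // Hypothetical breakdown for a single timing type named "collect".
    static final Map<String, Long> EXAMPLE = Map.of(
        "collect", 1_250_000L,   // getApproximateTiming(): cumulative nanos
        "collect_count", 42L     // getCount(): number of start/stop cycles
    );
}
```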
@@ -32,6 +32,7 @@

package org.opensearch.search.profile;

+import org.opensearch.Version;
import org.opensearch.core.ParseField;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
@@ -44,8 +45,10 @@

import java.io.IOException;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.LinkedHashMap;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

@@ -69,14 +72,23 @@ public final class ProfileResult implements Writeable, ToXContentObject {
static final ParseField BREAKDOWN = new ParseField("breakdown");
static final ParseField DEBUG = new ParseField("debug");
static final ParseField NODE_TIME = new ParseField("time");
+    static final ParseField MAX_SLICE_NODE_TIME = new ParseField("max_slice_time");
+    static final ParseField MIN_SLICE_NODE_TIME = new ParseField("min_slice_time");
+    static final ParseField AVG_SLICE_NODE_TIME = new ParseField("avg_slice_time");
static final ParseField NODE_TIME_RAW = new ParseField("time_in_nanos");
+    static final ParseField MAX_SLICE_NODE_TIME_RAW = new ParseField("max_slice_time_in_nanos");
+    static final ParseField MIN_SLICE_NODE_TIME_RAW = new ParseField("min_slice_time_in_nanos");
+    static final ParseField AVG_SLICE_NODE_TIME_RAW = new ParseField("avg_slice_time_in_nanos");
static final ParseField CHILDREN = new ParseField("children");

private final String type;
private final String description;
private final Map<String, Long> breakdown;
private final Map<String, Object> debug;
private final long nodeTime;
+    private Long maxSliceNodeTime;
+    private Long minSliceNodeTime;
+    private Long avgSliceNodeTime;
private final List<ProfileResult> children;

public ProfileResult(
@@ -86,13 +98,30 @@ public ProfileResult(
Map<String, Object> debug,
long nodeTime,
List<ProfileResult> children
) {
+        this(type, description, breakdown, debug, nodeTime, children, null, null, null);
+    }
+
+    public ProfileResult(
+        String type,
+        String description,
+        Map<String, Long> breakdown,
+        Map<String, Object> debug,
+        long nodeTime,
+        List<ProfileResult> children,
+        Long maxSliceNodeTime,
+        Long minSliceNodeTime,
+        Long avgSliceNodeTime
+    ) {
this.type = type;
this.description = description;
this.breakdown = Objects.requireNonNull(breakdown, "required breakdown argument missing");
this.debug = debug == null ? Map.of() : debug;
this.children = children == null ? List.of() : children;
this.nodeTime = nodeTime;
+        this.maxSliceNodeTime = maxSliceNodeTime;
+        this.minSliceNodeTime = minSliceNodeTime;
+        this.avgSliceNodeTime = avgSliceNodeTime;
}

/**
@@ -105,6 +134,15 @@ public ProfileResult(StreamInput in) throws IOException {
breakdown = in.readMap(StreamInput::readString, StreamInput::readLong);
debug = in.readMap(StreamInput::readString, StreamInput::readGenericValue);
children = in.readList(ProfileResult::new);
+        if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
+            this.maxSliceNodeTime = in.readOptionalLong();
+            this.minSliceNodeTime = in.readOptionalLong();
+            this.avgSliceNodeTime = in.readOptionalLong();
+        } else {
+            this.maxSliceNodeTime = null;
+            this.minSliceNodeTime = null;
+            this.avgSliceNodeTime = null;
+        }
}

@Override
@@ -115,6 +153,11 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeMap(breakdown, StreamOutput::writeString, StreamOutput::writeLong);
out.writeMap(debug, StreamOutput::writeString, StreamOutput::writeGenericValue);
out.writeList(children);
+        if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
+            out.writeOptionalLong(maxSliceNodeTime);
+            out.writeOptionalLong(minSliceNodeTime);
+            out.writeOptionalLong(avgSliceNodeTime);
+        }
}

/**
@@ -154,6 +197,18 @@ public long getTime() {
return nodeTime;
}

+    public Long getMaxSliceTime() {
+        return maxSliceNodeTime;
+    }
+
+    public Long getMinSliceTime() {
+        return minSliceNodeTime;
+    }
+
+    public Long getAvgSliceTime() {
+        return avgSliceNodeTime;
+    }

/**
* Returns a list of all profiled children queries
*/
@@ -168,9 +223,27 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(DESCRIPTION.getPreferredName(), description);
if (builder.humanReadable()) {
builder.field(NODE_TIME.getPreferredName(), new TimeValue(getTime(), TimeUnit.NANOSECONDS).toString());
+            if (getMaxSliceTime() != null) {
+                builder.field(MAX_SLICE_NODE_TIME.getPreferredName(), new TimeValue(getMaxSliceTime(), TimeUnit.NANOSECONDS).toString());
+            }
+            if (getMinSliceTime() != null) {
+                builder.field(MIN_SLICE_NODE_TIME.getPreferredName(), new TimeValue(getMinSliceTime(), TimeUnit.NANOSECONDS).toString());
+            }
+            if (getAvgSliceTime() != null) {
+                builder.field(AVG_SLICE_NODE_TIME.getPreferredName(), new TimeValue(getAvgSliceTime(), TimeUnit.NANOSECONDS).toString());
+            }
}
builder.field(NODE_TIME_RAW.getPreferredName(), getTime());
-        builder.field(BREAKDOWN.getPreferredName(), breakdown);
+        if (getMaxSliceTime() != null) {
+            builder.field(MAX_SLICE_NODE_TIME_RAW.getPreferredName(), getMaxSliceTime());
+        }
+        if (getMinSliceTime() != null) {
+            builder.field(MIN_SLICE_NODE_TIME_RAW.getPreferredName(), getMinSliceTime());
+        }
+        if (getAvgSliceTime() != null) {
+            builder.field(AVG_SLICE_NODE_TIME_RAW.getPreferredName(), getAvgSliceTime());
+        }
+        createBreakdownView(builder);
if (false == debug.isEmpty()) {
builder.field(DEBUG.getPreferredName(), debug);
}
@@ -186,6 +259,22 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder.endObject();
}

+    private void createBreakdownView(XContentBuilder builder) throws IOException {
+        Map<String, Long> modifiedBreakdown = new LinkedHashMap<>(breakdown);
+        removeStartTimeFields(modifiedBreakdown);
+        builder.field(BREAKDOWN.getPreferredName(), modifiedBreakdown);
+    }
+
+    static void removeStartTimeFields(Map<String, Long> modifiedBreakdown) {
+        Iterator<Map.Entry<String, Long>> iterator = modifiedBreakdown.entrySet().iterator();
+        while (iterator.hasNext()) {
+            Map.Entry<String, Long> entry = iterator.next();
+            if (entry.getKey().endsWith(AbstractProfileBreakdown.TIMING_TYPE_START_TIME_SUFFIX)) {
+                iterator.remove();
+            }
+        }
+    }

private static final InstantiatingObjectParser<ProfileResult, Void> PARSER;
static {
InstantiatingObjectParser.Builder<ProfileResult, Void> parser = InstantiatingObjectParser.builder(
@@ -199,6 +288,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
parser.declareObject(optionalConstructorArg(), (p, c) -> p.map(), DEBUG);
parser.declareLong(constructorArg(), NODE_TIME_RAW);
parser.declareObjectArray(optionalConstructorArg(), (p, c) -> fromXContent(p), CHILDREN);
+        parser.declareLong(optionalConstructorArg(), MAX_SLICE_NODE_TIME_RAW);
+        parser.declareLong(optionalConstructorArg(), MIN_SLICE_NODE_TIME_RAW);
+        parser.declareLong(optionalConstructorArg(), AVG_SLICE_NODE_TIME_RAW);
PARSER = parser.build();
}

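The new slice-time fields are read and written only when both sides of the connection are on a version that knows about them, which keeps mixed-version clusters compatible. A minimal sketch of the pattern (class and field names invented):

```java
import java.io.IOException;

import org.opensearch.Version;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;

// Hypothetical minimal Writeable mirroring the version gate above: older nodes
// never see the new optional longs, and readers default them to null.
class SliceTimeSketch implements Writeable {
    private final Long maxSliceNanos; // null when concurrent search was off

    SliceTimeSketch(StreamInput in) throws IOException {
        maxSliceNanos = in.getVersion().onOrAfter(Version.V_3_0_0) ? in.readOptionalLong() : null;
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
            out.writeOptionalLong(maxSliceNanos);
        }
    }
}
```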
@@ -34,6 +34,7 @@

import org.opensearch.search.internal.ContextIndexSearcher;
import org.opensearch.search.profile.aggregation.AggregationProfiler;
+import org.opensearch.search.profile.aggregation.ConcurrentAggregationProfiler;
import org.opensearch.search.profile.query.QueryProfiler;

import java.util.ArrayList;
@@ -50,18 +51,20 @@ public final class Profilers {
private final ContextIndexSearcher searcher;
private final List<QueryProfiler> queryProfilers;
private final AggregationProfiler aggProfiler;
+    private final boolean isConcurrentSegmentSearchEnabled;

/** Sole constructor. This {@link Profilers} instance will initially wrap one {@link QueryProfiler}. */
-    public Profilers(ContextIndexSearcher searcher) {
+    public Profilers(ContextIndexSearcher searcher, boolean isConcurrentSegmentSearchEnabled) {
this.searcher = searcher;
+        this.isConcurrentSegmentSearchEnabled = isConcurrentSegmentSearchEnabled;
this.queryProfilers = new ArrayList<>();
-        this.aggProfiler = new AggregationProfiler();
+        this.aggProfiler = isConcurrentSegmentSearchEnabled ? new ConcurrentAggregationProfiler() : new AggregationProfiler();
addQueryProfiler();
}

/** Switch to a new profile. */
public QueryProfiler addQueryProfiler() {
-        QueryProfiler profiler = new QueryProfiler(searcher.getExecutor() != null);
+        QueryProfiler profiler = new QueryProfiler(isConcurrentSegmentSearchEnabled);
searcher.setProfiler(profiler);
queryProfilers.add(profiler);
return profiler;
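Both profilers are now driven by one explicit per-request setting instead of inferring concurrency from `searcher.getExecutor() != null`, presumably because an executor may be present even when concurrent segment search is disabled for a given request. A small wiring sketch (only the constructor shown is from this diff):

```java
import org.opensearch.search.internal.ContextIndexSearcher;
import org.opensearch.search.profile.Profilers;

class ProfilerWiringSketch {
    // Hypothetical helper: the same boolean selects the QueryProfiler mode and
    // whether the aggregation side gets ConcurrentAggregationProfiler.
    static Profilers forRequest(ContextIndexSearcher searcher, boolean concurrentSegmentSearchEnabled) {
        return new Profilers(searcher, concurrentSegmentSearchEnabled);
    }
}
```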
server/src/main/java/org/opensearch/search/profile/Timer.java (12 additions, 1 deletion)
@@ -51,7 +51,7 @@
public class Timer {

private boolean doTiming;
-    private long timing, count, lastCount, start;
+    private long timing, count, lastCount, start, earliestTimerStartTime;

/** pkg-private for testing */
long nanoTime() {
@@ -71,6 +71,9 @@ public final void start() {
doTiming = (count - lastCount) >= Math.min(lastCount >>> 8, 1024);
if (doTiming) {
start = nanoTime();
+            if (count == 0) {
+                earliestTimerStartTime = start;
+            }
}
count++;
}
@@ -92,6 +95,14 @@ public final long getCount() {
return count;
}

+    /** Return the timer start time in nanoseconds. */
+    public final long getEarliestTimerStartTime() {
+        if (start != 0) {
+            throw new IllegalStateException("#start call misses a matching #stop call");
+        }
+        return earliestTimerStartTime;
+    }

/** Return an approximation of the total time spent between consecutive calls of #start and #stop. */
public final long getApproximateTiming() {
if (start != 0) {
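A minimal usage sketch of the new accessor: the very first `start()` always takes a timing sample (the sampling condition is `0 >= 0` on the first call), so the anchor is recorded exactly once, and reading it while a `start()` is missing its `stop()` throws, mirroring `getApproximateTiming()`:

```java
import org.opensearch.search.profile.Timer;

class TimerSketch {
    static void demo() {
        Timer timer = new Timer();
        timer.start();
        // ... measured work runs here ...
        timer.stop();

        long firstStart = timer.getEarliestTimerStartTime(); // nanoTime() at first start()
        long totalNanos = timer.getApproximateTiming();      // cumulative approximate time
        long cycles = timer.getCount();                      // start/stop cycles
    }
}
```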
@@ -34,6 +34,7 @@

import org.opensearch.search.profile.AbstractProfileBreakdown;

+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

@@ -62,4 +63,18 @@ public void addDebugInfo(String key, Object value) {
protected Map<String, Object> toDebugMap() {
return unmodifiableMap(extra);
}

+    /**
+     * Build a timing count startTime breakdown for aggregation timing types
+     */
+    @Override
+    public Map<String, Long> toBreakdownMap() {
+        Map<String, Long> map = new HashMap<>(timings.length * 3);
+        for (AggregationTimingType timingType : timingTypes) {
+            map.put(timingType.toString(), timings[timingType.ordinal()].getApproximateTiming());
+            map.put(timingType + TIMING_TYPE_COUNT_SUFFIX, timings[timingType.ordinal()].getCount());
+            map.put(timingType + TIMING_TYPE_START_TIME_SUFFIX, timings[timingType.ordinal()].getEarliestTimerStartTime());
+        }
+        return Collections.unmodifiableMap(map);
+    }
}
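These per-slice start times are what allow a concurrent profiler to reduce several slice breakdowns into the max/min/avg slice statistics surfaced in `ProfileResult`. The merge itself lives in `ConcurrentAggregationProfiler`, which is not rendered in this diff; a hypothetical sketch of such a reduction (assumes a non-empty input list):

```java
import java.util.List;

class SliceStatsSketch {
    // Hypothetical reduction over per-slice times for one aggregator, mirroring
    // the shape of max/min/avg_slice_time_in_nanos in the profile response.
    static long[] maxMinAvg(List<Long> perSliceNanos) {
        long max = Long.MIN_VALUE, min = Long.MAX_VALUE, sum = 0L;
        for (long nanos : perSliceNanos) {
            max = Math.max(max, nanos);
            min = Math.min(min, nanos);
            sum += nanos;
        }
        return new long[] { max, min, sum / perSliceNanos.size() };
    }
}
```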
@@ -36,8 +36,6 @@
import org.opensearch.search.profile.AbstractProfiler;

import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Map;

/**
@@ -47,29 +45,25 @@
*/
public class AggregationProfiler extends AbstractProfiler<AggregationProfileBreakdown, Aggregator> {

-    private final Map<List<String>, AggregationProfileBreakdown> profileBreakdownLookup = new HashMap<>();
+    private final Map<Aggregator, AggregationProfileBreakdown> profileBreakdownLookup = new HashMap<>();

public AggregationProfiler() {
super(new InternalAggregationProfileTree());
}

+    /**
+     * This method does not need to be thread safe for the concurrent search use case: the
+     * {@link AggregationProfileBreakdown} for each aggregation operator is created on the synchronous
+     * path, when {@link org.opensearch.search.aggregations.BucketCollector#preCollection()} is called
+     * on the aggregation collector instances during construction.
+     */
@Override
public AggregationProfileBreakdown getQueryBreakdown(Aggregator agg) {
-        List<String> path = getAggregatorPath(agg);
-        AggregationProfileBreakdown aggregationProfileBreakdown = profileBreakdownLookup.get(path);
+        AggregationProfileBreakdown aggregationProfileBreakdown = profileBreakdownLookup.get(agg);
if (aggregationProfileBreakdown == null) {
aggregationProfileBreakdown = super.getQueryBreakdown(agg);
-            profileBreakdownLookup.put(path, aggregationProfileBreakdown);
+            profileBreakdownLookup.put(agg, aggregationProfileBreakdown);
}
return aggregationProfileBreakdown;
}

-    public static List<String> getAggregatorPath(Aggregator agg) {
-        LinkedList<String> path = new LinkedList<>();
-        while (agg != null) {
-            path.addFirst(agg.name());
-            agg = agg.parent();
-        }
-        return path;
-    }
}
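The switch from path-keyed to instance-keyed lookup is the concurrency-relevant change here: under concurrent segment search each slice builds its own `Aggregator` instances with identical name paths, so keying by path would conflate distinct operators. A self-contained sketch of the collision (the stand-in class is invented; like the real `Aggregator`, it is assumed not to override `equals`/`hashCode`):

```java
import java.util.HashMap;
import java.util.Map;

class PathKeyCollisionSketch {
    // Stand-in for Aggregator; identity-based equality, as assumed above.
    static final class Agg {
        final String name;
        Agg(String name) { this.name = name; }
    }

    public static void main(String[] args) {
        Agg slice0 = new Agg("categories"); // same aggregation, slice 0
        Agg slice1 = new Agg("categories"); // same aggregation, slice 1

        Map<String, String> byPath = new HashMap<>();
        byPath.put(slice0.name, "breakdown-0");
        byPath.put(slice1.name, "breakdown-1"); // clobbers slice0's entry

        Map<Agg, String> byInstance = new HashMap<>();
        byInstance.put(slice0, "breakdown-0");
        byInstance.put(slice1, "breakdown-1");

        System.out.println(byPath.size());     // 1 -- path keys collide
        System.out.println(byInstance.size()); // 2 -- one breakdown per slice
    }
}
```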