Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tracing for deep search path #12103

Merged
merged 7 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Views, simplify data access and manipulation by providing a virtual layer over one or more indices ([#11957](https://github.com/opensearch-project/OpenSearch/pull/11957))
- Add Remote Store Migration Experimental flag and allow mixed mode clusters under same ([#11986](https://github.com/opensearch-project/OpenSearch/pull/11986))
- [Admission Control] Integrate IO Usage Tracker to the Resource Usage Collector Service and Emit IO Usage Stats ([#11880](https://github.com/opensearch-project/OpenSearch/pull/11880))
- Tracing for deep search path ([#12103](https://github.com/opensearch-project/OpenSearch/pull/12103))

### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,19 @@ public SpanContext(Span span) {
Span getSpan() {
return span;
}

/**
* Sets the error for the current span behind this context
* @param cause error
*/
public void setError(final Exception cause) {
span.setError(cause);
}

/**
* Ends current span
*/
public void endSpan() {
reta marked this conversation as resolved.
Show resolved Hide resolved
span.endSpan();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ public SpanCreationContext attributes(Attributes attributes) {
}

/**
* Sets the parent for spann
* @param parent parent
* Sets the parent for span
* @param parent parent span context
* @return spanCreationContext
*/
public SpanCreationContext parent(SpanContext parent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@
public void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}

@Override
public void onPhaseFailure(SearchPhaseContext context) {}
public void onPhaseFailure(SearchPhaseContext context, Throwable cause) {}

Check warning on line 119 in plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java

View check run for this annotation

Codecov / codecov/patch

plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java#L119

Added line #L119 was not covered by tests

@Override
public void onRequestStart(SearchRequestContext searchRequestContext) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.internal.ShardSearchRequest;
import org.opensearch.search.pipeline.PipelinedRequest;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanCreationContext;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.transport.Transport;

import java.util.ArrayDeque;
Expand Down Expand Up @@ -116,6 +120,7 @@
private final Map<String, PendingExecutions> pendingExecutionsPerNode = new ConcurrentHashMap<>();
private final boolean throttleConcurrentRequests;
private final SearchRequestContext searchRequestContext;
private final Tracer tracer;

private SearchPhase currentPhase;
private boolean currentPhaseHasLifecycle;
Expand All @@ -140,7 +145,8 @@
SearchPhaseResults<Result> resultConsumer,
int maxConcurrentRequestsPerNode,
SearchResponse.Clusters clusters,
SearchRequestContext searchRequestContext
SearchRequestContext searchRequestContext,
Tracer tracer
) {
super(name);
final List<SearchShardIterator> toSkipIterators = new ArrayList<>();
Expand Down Expand Up @@ -177,6 +183,7 @@
this.results = resultConsumer;
this.clusters = clusters;
this.searchRequestContext = searchRequestContext;
this.tracer = tracer;
}

@Override
Expand Down Expand Up @@ -221,6 +228,7 @@
null
)
);
onRequestEnd(searchRequestContext);
return;
}
executePhase(this);
Expand Down Expand Up @@ -454,15 +462,24 @@
}

private void executePhase(SearchPhase phase) {
try {
Span phaseSpan = tracer.startSpan(SpanCreationContext.server().name("[phase/" + phase.getName() + "]"));
dzane17 marked this conversation as resolved.
Show resolved Hide resolved
try (final SpanScope scope = tracer.withSpanInScope(phaseSpan)) {
onPhaseStart(phase);
phase.recordAndRun();
} catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug(new ParameterizedMessage("Failed to execute [{}] while moving to [{}] phase", request, phase.getName()), e);
}

if (currentPhaseHasLifecycle == false) {
phaseSpan.setError(e);

Check warning on line 475 in server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java#L475

Added line #L475 was not covered by tests
}

onPhaseFailure(phase, "", e);
} finally {
if (currentPhaseHasLifecycle == false) {
phaseSpan.endSpan();
}
}
}

Expand Down Expand Up @@ -727,7 +744,7 @@
@Override
public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause) {
if (currentPhaseHasLifecycle) {
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseFailure(this);
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseFailure(this, cause);
}
raisePhaseFailure(new SearchPhaseExecutionException(phase.getName(), msg, cause, buildShardFailures()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.search.sort.FieldSortBuilder;
import org.opensearch.search.sort.MinAndMax;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.transport.Transport;

import java.util.Comparator;
Expand Down Expand Up @@ -91,7 +92,8 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMa
SearchTask task,
Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory,
SearchResponse.Clusters clusters,
SearchRequestContext searchRequestContext
SearchRequestContext searchRequestContext,
Tracer tracer
) {
// We set max concurrent shard requests to the number of shards so no throttling happens for can_match requests
super(
Expand All @@ -112,7 +114,8 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMa
new CanMatchSearchPhaseResults(shardsIts.size()),
shardsIts.size(),
clusters,
searchRequestContext
searchRequestContext,
tracer
);
this.phaseFactory = phaseFactory;
this.shardsIts = shardsIts;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.search.dfs.AggregatedDfs;
import org.opensearch.search.dfs.DfsSearchResult;
import org.opensearch.search.internal.AliasFilter;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.transport.Transport;

import java.util.List;
Expand Down Expand Up @@ -77,7 +78,8 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
final ClusterState clusterState,
final SearchTask task,
SearchResponse.Clusters clusters,
SearchRequestContext searchRequestContext
SearchRequestContext searchRequestContext,
final Tracer tracer
) {
super(
SearchPhaseName.DFS_PRE_QUERY.getName(),
Expand All @@ -97,7 +99,8 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
new ArraySearchPhaseResults<>(shardsIts.size()),
request.getMaxConcurrentShardRequests(),
clusters,
searchRequestContext
searchRequestContext,
tracer
);
this.queryPhaseResultConsumer = queryPhaseResultConsumer;
this.searchPhaseController = searchPhaseController;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.internal.ShardSearchRequest;
import org.opensearch.search.query.QuerySearchResult;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.transport.Transport;

import java.util.Map;
Expand Down Expand Up @@ -82,7 +83,8 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh
ClusterState clusterState,
SearchTask task,
SearchResponse.Clusters clusters,
SearchRequestContext searchRequestContext
SearchRequestContext searchRequestContext,
final Tracer tracer
) {
super(
SearchPhaseName.QUERY.getName(),
Expand All @@ -102,7 +104,8 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh
resultConsumer,
request.getMaxConcurrentShardRequests(),
clusters,
searchRequestContext
searchRequestContext,
tracer
);
this.topDocsSize = SearchPhaseController.getTopDocsSize(request);
this.trackTotalHitsUpTo = request.resolveTrackTotalHitsUpTo();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ void setTotalHits(TotalHits totalHits) {
this.totalHits = totalHits;
}

TotalHits totalHits() {
public TotalHits totalHits() {
return totalHits;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@
@InternalApi
public abstract class SearchRequestOperationsListener {
private volatile boolean enabled;
public static final SearchRequestOperationsListener NOOP = new SearchRequestOperationsListener(false) {
reta marked this conversation as resolved.
Show resolved Hide resolved
@Override
protected void onPhaseStart(SearchPhaseContext context) {}

Check warning on line 27 in server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java#L27

Added line #L27 was not covered by tests

@Override
protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}

Check warning on line 30 in server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java#L30

Added line #L30 was not covered by tests

@Override
protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) {}

Check warning on line 33 in server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java#L33

Added line #L33 was not covered by tests
};

protected SearchRequestOperationsListener() {
this.enabled = true;
Expand All @@ -35,7 +45,7 @@

protected abstract void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext);

protected abstract void onPhaseFailure(SearchPhaseContext context);
protected abstract void onPhaseFailure(SearchPhaseContext context, Throwable cause);

protected void onRequestStart(SearchRequestContext searchRequestContext) {}

Expand Down Expand Up @@ -91,10 +101,10 @@
}

@Override
protected void onPhaseFailure(SearchPhaseContext context) {
protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) {
for (SearchRequestOperationsListener listener : listeners) {
try {
listener.onPhaseFailure(context);
listener.onPhaseFailure(context, cause);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onPhaseFailure listener [{}] failed", listener), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@
protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}

@Override
protected void onPhaseFailure(SearchPhaseContext context) {}
protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) {}

Check warning on line 143 in server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java#L143

Added line #L143 was not covered by tests

@Override
protected void onRequestStart(SearchRequestContext searchRequestContext) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searc
}

@Override
protected void onPhaseFailure(SearchPhaseContext context) {
protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) {
phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()).current.dec();
}

Expand Down
Loading
Loading