Skip to content

Commit

Permalink
Tracing for deep search path
Browse files Browse the repository at this point in the history
Signed-off-by: David Zane <davizane@amazon.com>
  • Loading branch information
dzane17 committed Jan 31, 2024
1 parent fb2c5f2 commit 7896045
Show file tree
Hide file tree
Showing 18 changed files with 316 additions and 89 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Allow to pass the list settings through environment variables (like [], ["a", "b", "c"], ...) ([#10625](https://github.com/opensearch-project/OpenSearch/pull/10625))
- [Admission Control] Integrate CPU AC with ResourceUsageCollector and add CPU AC stats to nodes/stats ([#10887](https://github.com/opensearch-project/OpenSearch/pull/10887))
- [S3 Repository] Add setting to control connection count for sync client ([#12028](https://github.com/opensearch-project/OpenSearch/pull/12028))
- Tracing for deep search path ([#12099](https://github.com/opensearch-project/OpenSearch/pull/12099))

### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.internal.ShardSearchRequest;
import org.opensearch.search.pipeline.PipelinedRequest;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.transport.Transport;

import java.util.ArrayDeque;
Expand Down Expand Up @@ -220,6 +221,7 @@ public final void start() {
null
)
);
searchRequestContext.getSearchRequestOperationsListener().onRequestEnd(searchRequestContext);
return;
}
executePhase(this);
Expand Down Expand Up @@ -439,21 +441,23 @@ private void onPhaseEnd(SearchRequestContext searchRequestContext) {
}
}

void onPhaseStart(SearchPhase phase) {
void onPhaseStart(SearchPhase phase, SearchRequestContext searchRequestContext) {
setCurrentPhase(phase);
if (SearchPhaseName.isValidName(phase.getName())) {
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseStart(this);
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseStart(this, searchRequestContext);
}
}

private void onRequestEnd(SearchRequestContext searchRequestContext) {
this.searchRequestContext.getSearchRequestOperationsListener().onRequestEnd(this, searchRequestContext);
this.searchRequestContext.getSearchRequestOperationsListener().onRequestEnd(searchRequestContext);
}

private void executePhase(SearchPhase phase) {
try {
onPhaseStart(phase);
phase.recordAndRun();
onPhaseStart(phase, searchRequestContext);
try (SpanScope spanScope = searchRequestContext.getTracer().withSpanInScope(searchRequestContext.getPhaseSpan())) {
phase.recordAndRun();
}
} catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug(new ParameterizedMessage("Failed to execute [{}] while moving to [{}] phase", request, phase.getName()), e);
Expand Down Expand Up @@ -705,6 +709,7 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At
searchContextId = null;
}
}
searchRequestContext.setSearchTask(getTask());
searchRequestContext.setTotalHits(internalSearchResponse.hits().getTotalHits());
searchRequestContext.setShardStats(results.getNumShards(), successfulOps.get(), skippedOps.get(), failures.length);
onPhaseEnd(searchRequestContext);
Expand All @@ -717,7 +722,7 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At
@Override
public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause) {
if (SearchPhaseName.isValidName(phase.getName())) {
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseFailure(this);
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseFailure(this, searchRequestContext);
}
raisePhaseFailure(new SearchPhaseExecutionException(phase.getName(), msg, cause, buildShardFailures()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@

import org.apache.lucene.search.TotalHits;
import org.opensearch.common.annotation.InternalApi;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.telemetry.tracing.noop.NoopSpan;
import org.opensearch.telemetry.tracing.noop.NoopTracer;

import java.util.EnumMap;
import java.util.HashMap;
Expand All @@ -23,20 +27,45 @@
*/
@InternalApi
class SearchRequestContext {
private final SearchRequest searchRequest;
private SearchTask searchTask;
private final SearchRequestOperationsListener searchRequestOperationsListener;
private long absoluteStartNanos;
private final Map<String, Long> phaseTookMap;
private TotalHits totalHits;
private final EnumMap<ShardStatsFieldNames, Integer> shardStats;
private final Tracer tracer;
private Span requestSpan;
private Span phaseSpan;

private final SearchRequest searchRequest;
/**
* This constructor is for testing only
*/
SearchRequestContext(SearchRequestOperationsListener searchRequestOperationsListener, SearchRequest searchRequest) {
this(searchRequestOperationsListener, searchRequest, NoopTracer.INSTANCE);
}

SearchRequestContext(final SearchRequestOperationsListener searchRequestOperationsListener, final SearchRequest searchRequest) {
SearchRequestContext(SearchRequestOperationsListener searchRequestOperationsListener, SearchRequest searchRequest, Tracer tracer) {
this.searchRequestOperationsListener = searchRequestOperationsListener;
this.searchRequest = searchRequest;
this.tracer = tracer;
this.absoluteStartNanos = System.nanoTime();
this.phaseTookMap = new HashMap<>();
this.shardStats = new EnumMap<>(ShardStatsFieldNames.class);
this.searchRequest = searchRequest;
this.requestSpan = NoopSpan.INSTANCE;
this.phaseSpan = NoopSpan.INSTANCE;
}

SearchRequest getSearchRequest() {
return searchRequest;
}

void setSearchTask(SearchTask searchTask) {
this.searchTask = searchTask;
}

SearchTask getSearchTask() {
return searchTask;
}

SearchRequestOperationsListener getSearchRequestOperationsListener() {
Expand Down Expand Up @@ -107,6 +136,26 @@ String formattedShardStats() {
);
}
}

Tracer getTracer() {
return tracer;
}

void setRequestSpan(Span requestSpan) {
this.requestSpan = requestSpan;
}

Span getRequestSpan() {
return requestSpan;
}

void setPhaseSpan(Span phaseSpan) {
this.phaseSpan = phaseSpan;
}

Span getPhaseSpan() {
return phaseSpan;
}
}

enum ShardStatsFieldNames {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.telemetry.tracing.AttributeNames;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanBuilder;
import org.opensearch.telemetry.tracing.SpanContext;
import org.opensearch.telemetry.tracing.Tracer;

import static org.opensearch.core.common.Strings.capitalize;

/**
* Listener for search request tracing on the coordinator node
*
* @opensearch.internal
*/
public final class SearchRequestCoordinatorTrace extends SearchRequestOperationsListener {
private final Tracer tracer;

public SearchRequestCoordinatorTrace(Tracer tracer) {
this.tracer = tracer;
}

@Override
void onPhaseStart(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
searchRequestContext.setPhaseSpan(
tracer.startSpan(
SpanBuilder.from(
"coord" + capitalize(context.getCurrentPhase().getName()),
new SpanContext(searchRequestContext.getRequestSpan())
)
)
);
}

@Override
void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
searchRequestContext.getPhaseSpan().endSpan();
}

@Override
void onPhaseFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
searchRequestContext.getPhaseSpan().endSpan();
}

@Override
void onRequestEnd(SearchRequestContext searchRequestContext) {
Span requestSpan = searchRequestContext.getRequestSpan();

// add response-related attributes on request end
requestSpan.addAttribute(
AttributeNames.TOTAL_HITS,
searchRequestContext.totalHits() == null ? "0" : searchRequestContext.totalHits().toString()
);
requestSpan.addAttribute(
AttributeNames.SHARDS,
searchRequestContext.formattedShardStats().isEmpty() ? "null" : searchRequestContext.formattedShardStats()
);
requestSpan.addAttribute(
AttributeNames.SOURCE,
searchRequestContext.getSearchRequest().source() == null ? "null" : searchRequestContext.getSearchRequest().source().toString()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ protected SearchRequestOperationsListener(final boolean enabled) {
this.enabled = enabled;
}

abstract void onPhaseStart(SearchPhaseContext context);
abstract void onPhaseStart(SearchPhaseContext context, SearchRequestContext searchRequestContext);

abstract void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext);

abstract void onPhaseFailure(SearchPhaseContext context);
abstract void onPhaseFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext);

void onRequestStart(SearchRequestContext searchRequestContext) {}

void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}
void onRequestEnd(SearchRequestContext searchRequestContext) {}

boolean isEnabled(SearchRequest searchRequest) {
return isEnabled();
Expand Down Expand Up @@ -69,10 +69,10 @@ static final class CompositeListener extends SearchRequestOperationsListener {
}

@Override
void onPhaseStart(SearchPhaseContext context) {
void onPhaseStart(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
for (SearchRequestOperationsListener listener : listeners) {
try {
listener.onPhaseStart(context);
listener.onPhaseStart(context, searchRequestContext);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onPhaseStart listener [{}] failed", listener), e);
}
Expand All @@ -91,10 +91,10 @@ void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestCo
}

@Override
void onPhaseFailure(SearchPhaseContext context) {
void onPhaseFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
for (SearchRequestOperationsListener listener : listeners) {
try {
listener.onPhaseFailure(context);
listener.onPhaseFailure(context, searchRequestContext);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onPhaseFailure listener [{}] failed", listener), e);
}
Expand All @@ -113,10 +113,10 @@ void onRequestStart(SearchRequestContext searchRequestContext) {
}

@Override
public void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
public void onRequestEnd(SearchRequestContext searchRequestContext) {
for (SearchRequestOperationsListener listener : listeners) {
try {
listener.onRequestEnd(context, searchRequestContext);
listener.onRequestEnd(searchRequestContext);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onRequestEnd listener [{}] failed", listener), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,29 +134,29 @@ public SearchRequestSlowLog(ClusterService clusterService) {
}

@Override
void onPhaseStart(SearchPhaseContext context) {}
void onPhaseStart(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}

@Override
void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}

@Override
void onPhaseFailure(SearchPhaseContext context) {}
void onPhaseFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}

@Override
void onRequestStart(SearchRequestContext searchRequestContext) {}

@Override
void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
void onRequestEnd(SearchRequestContext searchRequestContext) {
long tookInNanos = System.nanoTime() - searchRequestContext.getAbsoluteStartNanos();

if (warnThreshold >= 0 && tookInNanos > warnThreshold && level.isLevelEnabledFor(SlowLogLevel.WARN)) {
logger.warn(new SearchRequestSlowLogMessage(context, tookInNanos, searchRequestContext));
logger.warn(new SearchRequestSlowLogMessage(tookInNanos, searchRequestContext));
} else if (infoThreshold >= 0 && tookInNanos > infoThreshold && level.isLevelEnabledFor(SlowLogLevel.INFO)) {
logger.info(new SearchRequestSlowLogMessage(context, tookInNanos, searchRequestContext));
logger.info(new SearchRequestSlowLogMessage(tookInNanos, searchRequestContext));
} else if (debugThreshold >= 0 && tookInNanos > debugThreshold && level.isLevelEnabledFor(SlowLogLevel.DEBUG)) {
logger.debug(new SearchRequestSlowLogMessage(context, tookInNanos, searchRequestContext));
logger.debug(new SearchRequestSlowLogMessage(tookInNanos, searchRequestContext));
} else if (traceThreshold >= 0 && tookInNanos > traceThreshold && level.isLevelEnabledFor(SlowLogLevel.TRACE)) {
logger.trace(new SearchRequestSlowLogMessage(context, tookInNanos, searchRequestContext));
logger.trace(new SearchRequestSlowLogMessage(tookInNanos, searchRequestContext));
}
}

Expand All @@ -167,15 +167,11 @@ void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequest
*/
static final class SearchRequestSlowLogMessage extends OpenSearchLogMessage {

SearchRequestSlowLogMessage(SearchPhaseContext context, long tookInNanos, SearchRequestContext searchRequestContext) {
super(prepareMap(context, tookInNanos, searchRequestContext), message(context, tookInNanos, searchRequestContext));
SearchRequestSlowLogMessage(long tookInNanos, SearchRequestContext searchRequestContext) {
super(prepareMap(tookInNanos, searchRequestContext), message(tookInNanos, searchRequestContext));
}

private static Map<String, Object> prepareMap(
SearchPhaseContext context,
long tookInNanos,
SearchRequestContext searchRequestContext
) {
private static Map<String, Object> prepareMap(long tookInNanos, SearchRequestContext searchRequestContext) {
final Map<String, Object> messageFields = new HashMap<>();
messageFields.put("took", TimeValue.timeValueNanos(tookInNanos));
messageFields.put("took_millis", TimeUnit.NANOSECONDS.toMillis(tookInNanos));
Expand All @@ -185,22 +181,24 @@ private static Map<String, Object> prepareMap(
} else {
messageFields.put("total_hits", "-1");
}
messageFields.put("search_type", context.getRequest().searchType());
messageFields.put("search_type", searchRequestContext.getSearchRequest().searchType());
messageFields.put("shards", searchRequestContext.formattedShardStats());

if (context.getRequest().source() != null) {
String source = escapeJson(context.getRequest().source().toString(FORMAT_PARAMS));
if (searchRequestContext.getSearchRequest().source() != null) {
String source = escapeJson(searchRequestContext.getSearchRequest().source().toString(FORMAT_PARAMS));
messageFields.put("source", source);
} else {
messageFields.put("source", "{}");
}

messageFields.put("id", context.getTask().getHeader(Task.X_OPAQUE_ID));
if (searchRequestContext.getSearchTask() != null) {
messageFields.put("id", searchRequestContext.getSearchTask().getHeader(Task.X_OPAQUE_ID));
} else {
messageFields.put("id", "");
}
return messageFields;
}

// Message will be used in plaintext logs
private static String message(SearchPhaseContext context, long tookInNanos, SearchRequestContext searchRequestContext) {
private static String message(long tookInNanos, SearchRequestContext searchRequestContext) {
final StringBuilder sb = new StringBuilder();
sb.append("took[").append(TimeValue.timeValueNanos(tookInNanos)).append("], ");
sb.append("took_millis[").append(TimeUnit.NANOSECONDS.toMillis(tookInNanos)).append("], ");
Expand All @@ -210,15 +208,15 @@ private static String message(SearchPhaseContext context, long tookInNanos, Sear
} else {
sb.append("total_hits[-1]");
}
sb.append("search_type[").append(context.getRequest().searchType()).append("], ");
sb.append("search_type[").append(searchRequestContext.getSearchRequest().searchType()).append("], ");
sb.append("shards[").append(searchRequestContext.formattedShardStats()).append("], ");
if (context.getRequest().source() != null) {
sb.append("source[").append(context.getRequest().source().toString(FORMAT_PARAMS)).append("], ");
if (searchRequestContext.getSearchRequest().source() != null) {
sb.append("source[").append(searchRequestContext.getSearchRequest().source().toString(FORMAT_PARAMS)).append("], ");
} else {
sb.append("source[], ");
}
if (context.getTask().getHeader(Task.X_OPAQUE_ID) != null) {
sb.append("id[").append(context.getTask().getHeader(Task.X_OPAQUE_ID)).append("]");
if (searchRequestContext.getSearchTask() != null) {
sb.append("id[").append(searchRequestContext.getSearchTask().getHeader(Task.X_OPAQUE_ID)).append("]");
} else {
sb.append("id[]");
}
Expand Down
Loading

0 comments on commit 7896045

Please sign in to comment.