Skip to content

Commit

Permalink
Add verbose pipeline parameter to output each processor's execution d…
Browse files Browse the repository at this point in the history
…etails

Junwei Dai <junweid@amazon.com>
  • Loading branch information
Junwei Dai committed Dec 12, 2024
1 parent 5aa6509 commit e0aa8a1
Show file tree
Hide file tree
Showing 13 changed files with 470 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.opensearch.search.aggregations.Aggregations;
import org.opensearch.search.aggregations.InternalAggregations;
import org.opensearch.search.internal.InternalSearchResponse;
import org.opensearch.search.pipeline.ProcessorExecutionDetail;
import org.opensearch.search.profile.ProfileShardResult;
import org.opensearch.search.profile.SearchProfileShardResults;
import org.opensearch.search.suggest.Suggest;
Expand Down Expand Up @@ -394,6 +395,7 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
List<ShardSearchFailure> failures = new ArrayList<>();
Clusters clusters = Clusters.EMPTY;
List<SearchExtBuilder> extBuilders = new ArrayList<>();
List<ProcessorExecutionDetail> processorResult = new ArrayList<>();
for (Token token = parser.nextToken(); token != Token.END_OBJECT; token = parser.nextToken()) {
if (token == Token.FIELD_NAME) {
currentFieldName = parser.currentName();
Expand Down Expand Up @@ -530,7 +532,8 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
terminatedEarly,
profile,
numReducePhases,
extBuilders
extBuilders,
processorResult
);
return new SearchResponse(
searchResponseSections,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.search.SearchExtBuilder;
import org.opensearch.search.SearchHits;
import org.opensearch.search.aggregations.Aggregations;
import org.opensearch.search.pipeline.ProcessorExecutionDetail;
import org.opensearch.search.profile.ProfileShardResult;
import org.opensearch.search.profile.SearchProfileShardResults;
import org.opensearch.search.suggest.Suggest;
Expand All @@ -65,7 +66,6 @@
public class SearchResponseSections implements ToXContentFragment {

public static final ParseField EXT_FIELD = new ParseField("ext");

protected final SearchHits hits;
protected final Aggregations aggregations;
protected final Suggest suggest;
Expand All @@ -74,6 +74,7 @@ public class SearchResponseSections implements ToXContentFragment {
protected final Boolean terminatedEarly;
protected final int numReducePhases;
protected final List<SearchExtBuilder> searchExtBuilders = new ArrayList<>();
protected final List<ProcessorExecutionDetail> processorResult = new ArrayList<>();

public SearchResponseSections(
SearchHits hits,
Expand All @@ -84,7 +85,17 @@ public SearchResponseSections(
SearchProfileShardResults profileResults,
int numReducePhases
) {
this(hits, aggregations, suggest, timedOut, terminatedEarly, profileResults, numReducePhases, Collections.emptyList());
this(
hits,
aggregations,
suggest,
timedOut,
terminatedEarly,
profileResults,
numReducePhases,
Collections.emptyList(),
Collections.emptyList()
);
}

public SearchResponseSections(
Expand All @@ -95,7 +106,8 @@ public SearchResponseSections(
Boolean terminatedEarly,
SearchProfileShardResults profileResults,
int numReducePhases,
List<SearchExtBuilder> searchExtBuilders
List<SearchExtBuilder> searchExtBuilders,
List<ProcessorExecutionDetail> processorResult
) {
this.hits = hits;
this.aggregations = aggregations;
Expand All @@ -104,6 +116,7 @@ public SearchResponseSections(
this.timedOut = timedOut;
this.terminatedEarly = terminatedEarly;
this.numReducePhases = numReducePhases;
this.processorResult.addAll(processorResult);
this.searchExtBuilders.addAll(Objects.requireNonNull(searchExtBuilders, "searchExtBuilders must not be null"));
}

Expand Down Expand Up @@ -166,13 +179,21 @@ public final XContentBuilder toXContent(XContentBuilder builder, Params params)
}
builder.endObject();
}

if (!processorResult.isEmpty()) {
builder.field("processor_result", processorResult);
}
return builder;
}

public List<SearchExtBuilder> getSearchExtBuilders() {
return Collections.unmodifiableList(this.searchExtBuilders);
}

public List<ProcessorExecutionDetail> getProcessorResult() {
return processorResult;
}

protected void writeTo(StreamOutput out) throws IOException {
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,9 @@ private static void parseSearchSource(final SearchSourceBuilder searchSourceBuil
if (request.hasParam("timeout")) {
searchSourceBuilder.timeout(request.paramAsTime("timeout", null));
}
if (request.hasParam("verbose_pipeline")) {
searchSourceBuilder.verbosePipeline(request.paramAsBoolean("verbose_pipeline", false));
}
if (request.hasParam("terminate_after")) {
int terminateAfter = request.paramAsInt("terminate_after", SearchContext.DEFAULT_TERMINATE_AFTER);
if (terminateAfter < 0) {
Expand Down
17 changes: 17 additions & 0 deletions server/src/main/java/org/opensearch/search/SearchHits.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.lucene.search.TotalHits.Relation;
import org.opensearch.common.Nullable;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -166,6 +167,22 @@ public SearchHit[] getHits() {
return this.hits;
}

/**
* Creates a deep copy of this SearchHits instance.
*
* @return a deep copy of the current SearchHits object
* @throws IOException if an I/O exception occurs during serialization or deserialization
*/
public SearchHits deepCopy() throws IOException {
try (BytesStreamOutput out = new BytesStreamOutput()) {
this.writeTo(out);

try (StreamInput in = out.bytes().streamInput()) {
return new SearchHits(in);
}
}
}

/**
* Return the hit as the provided position.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ public final class SearchSourceBuilder implements Writeable, ToXContentObject, R
public static final ParseField SLICE = new ParseField("slice");
public static final ParseField POINT_IN_TIME = new ParseField("pit");
public static final ParseField SEARCH_PIPELINE = new ParseField("search_pipeline");
public static final ParseField VERBOSE_SEARCH_PIPELINE = new ParseField("verbose_pipeline");

public static SearchSourceBuilder fromXContent(XContentParser parser) throws IOException {
return fromXContent(parser, true);
Expand Down Expand Up @@ -226,6 +227,8 @@ public static HighlightBuilder highlight() {

private String searchPipeline;

private boolean verbosePipeline;

/**
* Constructs a new search source builder.
*/
Expand Down Expand Up @@ -301,6 +304,7 @@ public SearchSourceBuilder(StreamInput in) throws IOException {
}
if (in.getVersion().onOrAfter(Version.V_2_18_0)) {
searchPipeline = in.readOptionalString();
verbosePipeline = in.readBoolean();
}
}

Expand Down Expand Up @@ -384,6 +388,7 @@ public void writeTo(StreamOutput out) throws IOException {
}
if (out.getVersion().onOrAfter(Version.V_2_18_0)) {
out.writeOptionalString(searchPipeline);
out.writeOptionalBoolean(verbosePipeline);
}
}

Expand Down Expand Up @@ -1142,6 +1147,26 @@ public SearchSourceBuilder pipeline(String searchPipeline) {
return this;
}

/**
* Enables or disables verbose mode for the search pipeline.
*
* When verbose mode is enabled, detailed information about each processor
* in the search pipeline is included in the search response. This includes
* the processor name, execution status, input, output, and time taken for processing.
*
* This parameter is primarily intended for debugging purposes, allowing users
* to track how data flows and transforms through the search pipeline.
*
*/
public SearchSourceBuilder verbosePipeline(boolean verbosePipeline) {
this.verbosePipeline = verbosePipeline;
return this;
}

public Boolean verbosePipeline() {
return verbosePipeline;
}

/**
* Rewrites this search source builder into its primitive form. e.g. by
* rewriting the QueryBuilder. If the builder did not change the identity
Expand Down Expand Up @@ -1240,6 +1265,7 @@ private SearchSourceBuilder shallowCopy(
rewrittenBuilder.derivedFieldsObject = derivedFieldsObject;
rewrittenBuilder.derivedFields = derivedFields;
rewrittenBuilder.searchPipeline = searchPipeline;
rewrittenBuilder.verbosePipeline = verbosePipeline;
return rewrittenBuilder;
}

Expand Down Expand Up @@ -1309,6 +1335,8 @@ public void parseXContent(XContentParser parser, boolean checkTrailingTokens) th
profile = parser.booleanValue();
} else if (SEARCH_PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) {
searchPipeline = parser.text();
} else if (VERBOSE_SEARCH_PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) {
verbosePipeline = parser.booleanValue();
} else {
throw new ParsingException(
parser.getTokenLocation(),
Expand Down Expand Up @@ -1920,7 +1948,8 @@ public int hashCode() {
pointInTimeBuilder,
derivedFieldsObject,
derivedFields,
searchPipeline
searchPipeline,
verbosePipeline
);
}

Expand Down Expand Up @@ -1966,7 +1995,8 @@ public boolean equals(Object obj) {
&& Objects.equals(pointInTimeBuilder, other.pointInTimeBuilder)
&& Objects.equals(derivedFieldsObject, other.derivedFieldsObject)
&& Objects.equals(derivedFields, other.derivedFields)
&& Objects.equals(searchPipeline, other.searchPipeline);
&& Objects.equals(searchPipeline, other.searchPipeline)
&& Objects.equals(verbosePipeline, other.verbosePipeline);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.search.SearchExtBuilder;
import org.opensearch.search.SearchHits;
import org.opensearch.search.aggregations.InternalAggregations;
import org.opensearch.search.pipeline.ProcessorExecutionDetail;
import org.opensearch.search.profile.SearchProfileShardResults;
import org.opensearch.search.suggest.Suggest;

Expand Down Expand Up @@ -73,7 +74,17 @@ public InternalSearchResponse(
Boolean terminatedEarly,
int numReducePhases
) {
this(hits, aggregations, suggest, profileResults, timedOut, terminatedEarly, numReducePhases, Collections.emptyList());
this(
hits,
aggregations,
suggest,
profileResults,
timedOut,
terminatedEarly,
numReducePhases,
Collections.emptyList(),
Collections.emptyList()
);
}

public InternalSearchResponse(
Expand All @@ -84,9 +95,20 @@ public InternalSearchResponse(
boolean timedOut,
Boolean terminatedEarly,
int numReducePhases,
List<SearchExtBuilder> searchExtBuilderList
List<SearchExtBuilder> searchExtBuilderList,
List<ProcessorExecutionDetail> processorResult
) {
super(hits, aggregations, suggest, timedOut, terminatedEarly, profileResults, numReducePhases, searchExtBuilderList);
super(
hits,
aggregations,
suggest,
timedOut,
terminatedEarly,
profileResults,
numReducePhases,
searchExtBuilderList,
processorResult
);
}

public InternalSearchResponse(StreamInput in) throws IOException {
Expand All @@ -98,7 +120,8 @@ public InternalSearchResponse(StreamInput in) throws IOException {
in.readOptionalBoolean(),
in.readOptionalWriteable(SearchProfileShardResults::new),
in.readVInt(),
readSearchExtBuildersOnOrAfter(in)
readSearchExtBuildersOnOrAfter(in),
readProcessorResultOnOrAfter(in)
);
}

Expand All @@ -112,6 +135,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(profileResults);
out.writeVInt(numReducePhases);
writeSearchExtBuildersOnOrAfter(out, searchExtBuilders);
writeProcessorResultOnOrAfter(out, processorResult);
}

private static List<SearchExtBuilder> readSearchExtBuildersOnOrAfter(StreamInput in) throws IOException {
Expand All @@ -123,4 +147,15 @@ private static void writeSearchExtBuildersOnOrAfter(StreamOutput out, List<Searc
out.writeNamedWriteableList(searchExtBuilders);
}
}

private static List<ProcessorExecutionDetail> readProcessorResultOnOrAfter(StreamInput in) throws IOException {
return (in.getVersion().onOrAfter(Version.V_2_18_0)) ? in.readList(ProcessorExecutionDetail::new) : Collections.emptyList();
}

private static void writeProcessorResultOnOrAfter(StreamOutput out, List<ProcessorExecutionDetail> processorResult) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_18_0)) {
out.writeCollection(processorResult, (o, detail) -> detail.writeTo(o));
}
}

}
Loading

0 comments on commit e0aa8a1

Please sign in to comment.