Skip to content

Commit

Permalink
Add XContent chunking to SearchResponse (#94736)
Browse files Browse the repository at this point in the history
This commit adds xcontent chunking to SearchResponse and MultiSearchResponse
by making SearchHits implement ChunkedToXContent.

Relates to #89838
  • Loading branch information
romseygeek authored May 12, 2023
1 parent 7710faa commit a3edf6b
Show file tree
Hide file tree
Showing 30 changed files with 259 additions and 156 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestStatusToXContentListener;
import org.elasticsearch.rest.action.RestChunkedToXContentListener;

import java.util.List;

Expand All @@ -38,6 +38,6 @@ public String getName() {
@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) {
SearchRequest searchRequest = new SearchRequest();
return channel -> client.execute(NoopSearchAction.INSTANCE, searchRequest, new RestStatusToXContentListener<>(channel));
return channel -> client.execute(NoopSearchAction.INSTANCE, searchRequest, new RestChunkedToXContentListener<>(channel));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
Expand Down Expand Up @@ -133,7 +134,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws

if (searchResponse != null) {
builder.field("response");
searchResponse.toXContent(builder, params);
ChunkedToXContent.wrapAsToXContent(searchResponse).toXContent(builder, params);
}
if (error != null) {
builder.startObject("error");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.common.xcontent.StatusToXContentObject;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xcontent.ParseField;
Expand Down Expand Up @@ -101,16 +102,15 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder;
}

public XContentBuilder innerToXContent(XContentBuilder builder, Params params) throws IOException {
void innerToXContent(XContentBuilder builder, Params params) throws IOException {
if (hasResponse()) {
response.innerToXContent(builder, params);
ChunkedToXContent.wrapAsToXContent(p -> response.innerToXContentChunked(p)).toXContent(builder, params);
} else {
// we can assume the template is always json as we convert it before compiling it
try (InputStream stream = source.streamInput()) {
builder.rawField(TEMPLATE_OUTPUT_FIELD.getPreferredName(), stream, XContentType.JSON);
}
}
return builder;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,12 +570,11 @@ public static void generateThrowableXContent(XContentBuilder builder, Params par
* This method is usually used when the {@link Exception} is rendered as a full XContent object, and its output can be parsed
* by the {@link #failureFromXContent(XContentParser)} method.
*/
public static void generateFailureXContent(XContentBuilder builder, Params params, @Nullable Exception e, boolean detailed)
public static XContentBuilder generateFailureXContent(XContentBuilder builder, Params params, @Nullable Exception e, boolean detailed)
throws IOException {
// No exception to render as an error
if (e == null) {
builder.field(ERROR, "unknown");
return;
return builder.field(ERROR, "unknown");
}

// Render the exception with a simple message
Expand All @@ -589,8 +588,7 @@ public static void generateFailureXContent(XContentBuilder builder, Params param
}
t = t.getCause();
}
builder.field(ERROR, message);
return;
return builder.field(ERROR, message);
}

// Render the exception with all details
Expand All @@ -606,7 +604,7 @@ public static void generateFailureXContent(XContentBuilder builder, Params param
builder.endArray();
}
generateThrowableXContent(builder, params, e);
builder.endObject();
return builder.endObject();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParser.Token;

Expand All @@ -34,7 +36,7 @@
/**
* A multi search response.
*/
public class MultiSearchResponse extends ActionResponse implements Iterable<MultiSearchResponse.Item>, ToXContentObject {
public class MultiSearchResponse extends ActionResponse implements Iterable<MultiSearchResponse.Item>, ChunkedToXContentObject {

private static final ParseField RESPONSES = new ParseField(Fields.RESPONSES);
private static final ParseField TOOK_IN_MILLIS = new ParseField("took");
Expand All @@ -52,7 +54,7 @@ public class MultiSearchResponse extends ActionResponse implements Iterable<Mult
/**
* A search response item, holding the actual search response, or an error message if it failed.
*/
public static class Item implements Writeable {
public static class Item implements Writeable, ChunkedToXContent {
private final SearchResponse response;
private final Exception exception;

Expand Down Expand Up @@ -82,6 +84,25 @@ public void writeTo(StreamOutput out) throws IOException {
}
}

@Override
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
if (isFailure()) {
return Iterators.concat(
ChunkedToXContentHelper.startObject(),
Iterators.single((b, p) -> ElasticsearchException.generateFailureXContent(b, p, Item.this.getFailure(), true)),
Iterators.single((b, p) -> b.field(Fields.STATUS, ExceptionsHelper.status(Item.this.getFailure()).getStatus())),
ChunkedToXContentHelper.endObject()
);
} else {
return Iterators.concat(
ChunkedToXContentHelper.startObject(),
Item.this.getResponse().innerToXContentChunked(params),
Iterators.single((b, p) -> b.field(Fields.STATUS, Item.this.getResponse().status().getStatus())),
ChunkedToXContentHelper.endObject()
);
}
}

/**
* Is it a failed search?
*/
Expand Down Expand Up @@ -150,24 +171,14 @@ public void writeTo(StreamOutput out) throws IOException {
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("took", tookInMillis);
builder.startArray(Fields.RESPONSES);
for (Item item : items) {
builder.startObject();
if (item.isFailure()) {
ElasticsearchException.generateFailureXContent(builder, params, item.getFailure(), true);
builder.field(Fields.STATUS, ExceptionsHelper.status(item.getFailure()).getStatus());
} else {
item.getResponse().innerToXContent(builder, params);
builder.field(Fields.STATUS, item.getResponse().status().getStatus());
}
builder.endObject();
}
builder.endArray();
builder.endObject();
return builder;
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
return Iterators.concat(
ChunkedToXContentHelper.startObject(),
Iterators.single((b, p) -> b.field("took", tookInMillis).startArray(Fields.RESPONSES)),
Iterators.flatMap(Iterators.forArray(items), item -> item.toXContentChunked(params)),
Iterators.single((b, p) -> b.endArray()),
ChunkedToXContentHelper.endObject()
);
}

public static MultiSearchResponse fromXContext(XContentParser parser) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.StatusToXContentObject;
import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.rest.RestStatus;
Expand All @@ -29,13 +31,15 @@
import org.elasticsearch.search.profile.SearchProfileShardResult;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParser.Token;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -47,7 +51,7 @@
/**
* A response of a search request.
*/
public class SearchResponse extends ActionResponse implements StatusToXContentObject {
public class SearchResponse extends ActionResponse implements ChunkedToXContentObject {

private static final ParseField SCROLL_ID = new ParseField("_scroll_id");
private static final ParseField POINT_IN_TIME_ID = new ParseField("pit_id");
Expand Down Expand Up @@ -129,7 +133,6 @@ public SearchResponse(
: "SearchResponse can't have both scrollId [" + scrollId + "] and searchContextId [" + pointInTimeId + "]";
}

@Override
public RestStatus status() {
return RestStatus.status(successfulShards, totalShards, shardFailures);
}
Expand Down Expand Up @@ -264,14 +267,23 @@ public Clusters getClusters() {
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
innerToXContent(builder, params);
builder.endObject();
return builder;
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
return Iterators.concat(
ChunkedToXContentHelper.startObject(),
this.innerToXContentChunked(params),
ChunkedToXContentHelper.endObject()
);
}

public Iterator<? extends ToXContent> innerToXContentChunked(ToXContent.Params params) {
return Iterators.concat(
ChunkedToXContentHelper.singleChunk(SearchResponse.this::headerToXContent),
Iterators.single(clusters),
internalResponse.toXContentChunked(params)
);
}

public XContentBuilder innerToXContent(XContentBuilder builder, Params params) throws IOException {
public XContentBuilder headerToXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
if (scrollId != null) {
builder.field(SCROLL_ID.getPreferredName(), scrollId);
}
Expand All @@ -295,8 +307,6 @@ public XContentBuilder innerToXContent(XContentBuilder builder, Params params) t
getFailedShards(),
getShardFailures()
);
clusters.toXContent(builder, params);
internalResponse.toXContent(builder, params);
return builder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,19 @@

package org.elasticsearch.action.search;

import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.profile.SearchProfileResults;
import org.elasticsearch.search.profile.SearchProfileShardResult;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.ToXContent;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;

/**
Expand All @@ -29,7 +31,7 @@
* to parse aggregations into, which are not serializable. This is the common part that can be
* shared between core and client.
*/
public class SearchResponseSections implements ToXContentFragment {
public class SearchResponseSections implements ChunkedToXContent {

protected final SearchHits hits;
protected final Aggregations aggregations;
Expand Down Expand Up @@ -98,18 +100,33 @@ public final Map<String, SearchProfileShardResult> profile() {
}

@Override
public final XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
hits.toXContent(builder, params);
if (aggregations != null) {
aggregations.toXContent(builder, params);
}
if (suggest != null) {
suggest.toXContent(builder, params);
}
if (profileResults != null) {
profileResults.toXContent(builder, params);
}
return builder;
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
return Iterators.concat(
Iterators.flatMap(Iterators.single(hits), r -> r.toXContentChunked(params)),
Iterators.single((ToXContent) (b, p) -> {
if (aggregations != null) {
aggregations.toXContent(b, p);
}
return b;
}),
Iterators.single((b, p) -> {
if (suggest != null) {
suggest.toXContent(b, p);
}
return b;
}),
Iterators.single((b, p) -> {
if (profileResults != null) {
profileResults.toXContent(b, p);
}
return b;
})
);
}

@Override
public boolean isFragment() {
return true;
}

protected void writeTo(StreamOutput out) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
*/
package org.elasticsearch.common.xcontent;

import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.ToXContentObject;

import java.util.Iterator;

/**
* Chunked equivalent of {@link org.elasticsearch.xcontent.ToXContentObject} that serializes as a full object.
*/
Expand All @@ -16,4 +21,20 @@ public interface ChunkedToXContentObject extends ChunkedToXContent {
default boolean isFragment() {
return false;
}

/**
* Wraps the given instance in a {@link ToXContentObject} that will fully serialize the instance when serialized.
*
* @param chunkedToXContent instance to wrap
* @return x-content instance
*/
static ToXContentObject wrapAsToXContentObject(ChunkedToXContentObject chunkedToXContent) {
return (builder, params) -> {
Iterator<? extends ToXContent> serialization = chunkedToXContent.toXContentChunked(params);
while (serialization.hasNext()) {
serialization.next().toXContent(builder, params);
}
return builder;
};
}
}
Loading

0 comments on commit a3edf6b

Please sign in to comment.