Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert RestGetMappingAction to chunked encoding #89906

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -793,7 +793,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
registerHandler.accept(new RestSimulateTemplateAction());

registerHandler.accept(new RestPutMappingAction());
registerHandler.accept(new RestGetMappingAction(threadPool));
registerHandler.accept(new RestGetMappingAction());
registerHandler.accept(new RestGetFieldMappingAction());

registerHandler.accept(new RestRefreshAction());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,21 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;

import static org.elasticsearch.rest.BaseRestHandler.DEFAULT_INCLUDE_TYPE_NAME_POLICY;
import static org.elasticsearch.rest.BaseRestHandler.INCLUDE_TYPE_NAME_PARAMETER;

public class GetMappingsResponse extends ActionResponse implements ToXContentFragment {
public class GetMappingsResponse extends ActionResponse implements ToXContentFragment, ChunkedToXContent {

private static final ParseField MAPPINGS = new ParseField("mappings");

Expand Down Expand Up @@ -65,8 +67,8 @@ public void writeTo(StreamOutput out) throws IOException {
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
for (final Map.Entry<String, MappingMetadata> indexEntry : getMappings().entrySet()) {
public Iterator<ToXContent> toXContentChunked() {
return getMappings().entrySet().stream().map(indexEntry -> (ToXContent) (builder, params) -> {
builder.startObject(indexEntry.getKey());
boolean includeTypeName = params.paramAsBoolean(INCLUDE_TYPE_NAME_PARAMETER, DEFAULT_INCLUDE_TYPE_NAME_POLICY);
if (builder.getRestApiVersion() == RestApiVersion.V_7 && includeTypeName && indexEntry.getValue() != null) {
Expand All @@ -83,8 +85,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.startObject(MAPPINGS.getPreferredName()).endObject();
}
builder.endObject();
}
return builder;
return builder;
}).iterator();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,26 @@

package org.elasticsearch.rest.action.admin.indices;

import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.http.HttpChannel;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.ChunkedRestResponseBody;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.DispatchingRestToXContentListener;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestActionListener;
import org.elasticsearch.rest.action.RestCancellableNodeClient;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.List;
import java.util.function.LongSupplier;

import static org.elasticsearch.rest.RestRequest.Method.GET;
import static org.elasticsearch.rest.RestRequest.Method.HEAD;
Expand All @@ -40,11 +39,7 @@ public class RestGetMappingAction extends BaseRestHandler {
public static final String TYPES_DEPRECATION_MESSAGE = "[types removal] Specifying types in get mapping request is deprecated. "
+ "Use typeless api instead";

private final ThreadPool threadPool;

public RestGetMappingAction(ThreadPool threadPool) {
this.threadPool = threadPool;
}
public RestGetMappingAction() {}

@Override
public List<Route> routes() {
Expand Down Expand Up @@ -97,37 +92,25 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
final HttpChannel httpChannel = request.getHttpChannel();
return channel -> new RestCancellableNodeClient(client, httpChannel).admin()
.indices()
.getMappings(
getMappingsRequest,
new DispatchingRestToXContentListener<>(threadPool.executor(ThreadPool.Names.MANAGEMENT), channel, request).map(
getMappingsResponse -> new RestGetMappingsResponse(getMappingsResponse, threadPool::relativeTimeInMillis, timeout)
)
);
}

private static final class RestGetMappingsResponse implements ToXContentObject {
private final GetMappingsResponse response;
private final LongSupplier relativeTimeSupplierMillis;
private final TimeValue timeout;
private final long startTimeMs;

private RestGetMappingsResponse(GetMappingsResponse response, LongSupplier relativeTimeSupplierMillis, TimeValue timeout) {
this.response = response;
this.relativeTimeSupplierMillis = relativeTimeSupplierMillis;
this.timeout = timeout;
this.startTimeMs = relativeTimeSupplierMillis.getAsLong();
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
if (relativeTimeSupplierMillis.getAsLong() - startTimeMs > timeout.millis()) {
throw new ElasticsearchTimeoutException("Timed out getting mappings");
}

builder.startObject();
response.toXContent(builder, params);
builder.endObject();
return builder;
}
.getMappings(getMappingsRequest, new RestActionListener<>(channel) {
@Override
protected void processResponse(GetMappingsResponse getMappingsResponse) throws Exception {
ensureOpen();
channel.sendResponse(
new RestResponse(
RestStatus.OK,
ChunkedRestResponseBody.fromXContent(
() -> Iterators.concat(
Iterators.single((b, p) -> b.startObject()),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not quite sure why the GetMappingsResponse is a fragment and we had the complicated wrapping here but didn't want to change it in this PR so I manually wrapped it for now.

getMappingsResponse.toXContentChunked(),
Iterators.single((b, p) -> b.endObject())
),
request,
channel
)
)
);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class GetMappingsResponseTests extends AbstractWireSerializingTestCase<GetMappingsResponse> {

Expand Down Expand Up @@ -63,6 +66,22 @@ protected GetMappingsResponse createTestInstance() {
return resp;
}

public void testChunkedXContentUsesChunkPerIndex() {
final int indexCount = randomIntBetween(1, 10);
final var response = new GetMappingsResponse(
IntStream.range(0, indexCount)
.mapToObj(i -> "index-" + i)
.collect(Collectors.toUnmodifiableMap(Function.identity(), k -> createMappingsForIndex()))
);
final var chunks = response.toXContentChunked();
int chunkCount = 0;
while (chunks.hasNext()) {
chunks.next();
chunkCount++;
}
assertEquals(indexCount, chunkCount);
}

// Not meant to be exhaustive
private static Map<String, Object> randomFieldMapping() {
Map<String, Object> mappings = new HashMap<>();
Expand Down