diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index b562a9586ec78..880b30b9478ee 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -793,7 +793,7 @@ public void initRestHandlers(Supplier nodesInCluster) { registerHandler.accept(new RestSimulateTemplateAction()); registerHandler.accept(new RestPutMappingAction()); - registerHandler.accept(new RestGetMappingAction(threadPool)); + registerHandler.accept(new RestGetMappingAction()); registerHandler.accept(new RestGetFieldMappingAction()); registerHandler.accept(new RestRefreshAction()); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/GetMappingsResponse.java b/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/GetMappingsResponse.java index c49ba18792875..531a9ac42d85f 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/GetMappingsResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/GetMappingsResponse.java @@ -14,19 +14,21 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ChunkedToXContent; import org.elasticsearch.core.RestApiVersion; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.ToXContentFragment; -import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; +import java.util.Iterator; import java.util.Map; import static org.elasticsearch.rest.BaseRestHandler.DEFAULT_INCLUDE_TYPE_NAME_POLICY; import static org.elasticsearch.rest.BaseRestHandler.INCLUDE_TYPE_NAME_PARAMETER; -public class GetMappingsResponse extends ActionResponse implements ToXContentFragment { +public class GetMappingsResponse extends ActionResponse implements ToXContentFragment, ChunkedToXContent { private static final ParseField MAPPINGS = new ParseField("mappings"); @@ -65,8 +67,8 @@ public void writeTo(StreamOutput out) throws IOException { } @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - for (final Map.Entry indexEntry : getMappings().entrySet()) { + public Iterator toXContentChunked() { + return getMappings().entrySet().stream().map(indexEntry -> (ToXContent) (builder, params) -> { builder.startObject(indexEntry.getKey()); boolean includeTypeName = params.paramAsBoolean(INCLUDE_TYPE_NAME_PARAMETER, DEFAULT_INCLUDE_TYPE_NAME_POLICY); if (builder.getRestApiVersion() == RestApiVersion.V_7 && includeTypeName && indexEntry.getValue() != null) { @@ -83,8 +85,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startObject(MAPPINGS.getPreferredName()).endObject(); } builder.endObject(); - } - return builder; + return builder; + }).iterator(); } @Override diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetMappingAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetMappingAction.java index 95f0a6ad2bd50..155739e639dbc 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetMappingAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetMappingAction.java @@ -8,27 +8,26 @@ package org.elasticsearch.rest.action.admin.indices; -import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.core.RestApiVersion; import org.elasticsearch.core.TimeValue; import org.elasticsearch.http.HttpChannel; import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.ChunkedRestResponseBody; import org.elasticsearch.rest.RestRequest; -import org.elasticsearch.rest.action.DispatchingRestToXContentListener; +import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.action.RestActionListener; import org.elasticsearch.rest.action.RestCancellableNodeClient; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xcontent.ToXContentObject; -import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; import java.util.List; -import java.util.function.LongSupplier; import static org.elasticsearch.rest.RestRequest.Method.GET; import static org.elasticsearch.rest.RestRequest.Method.HEAD; @@ -40,11 +39,7 @@ public class RestGetMappingAction extends BaseRestHandler { public static final String TYPES_DEPRECATION_MESSAGE = "[types removal] Specifying types in get mapping request is deprecated. " + "Use typeless api instead"; - private final ThreadPool threadPool; - - public RestGetMappingAction(ThreadPool threadPool) { - this.threadPool = threadPool; - } + public RestGetMappingAction() {} @Override public List routes() { @@ -97,37 +92,25 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC final HttpChannel httpChannel = request.getHttpChannel(); return channel -> new RestCancellableNodeClient(client, httpChannel).admin() .indices() - .getMappings( - getMappingsRequest, - new DispatchingRestToXContentListener<>(threadPool.executor(ThreadPool.Names.MANAGEMENT), channel, request).map( - getMappingsResponse -> new RestGetMappingsResponse(getMappingsResponse, threadPool::relativeTimeInMillis, timeout) - ) - ); - } - - private static final class RestGetMappingsResponse implements ToXContentObject { - private final GetMappingsResponse response; - private final LongSupplier relativeTimeSupplierMillis; - private final TimeValue timeout; - private final long startTimeMs; - - private RestGetMappingsResponse(GetMappingsResponse response, LongSupplier relativeTimeSupplierMillis, TimeValue timeout) { - this.response = response; - this.relativeTimeSupplierMillis = relativeTimeSupplierMillis; - this.timeout = timeout; - this.startTimeMs = relativeTimeSupplierMillis.getAsLong(); - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - if (relativeTimeSupplierMillis.getAsLong() - startTimeMs > timeout.millis()) { - throw new ElasticsearchTimeoutException("Timed out getting mappings"); - } - - builder.startObject(); - response.toXContent(builder, params); - builder.endObject(); - return builder; - } + .getMappings(getMappingsRequest, new RestActionListener<>(channel) { + @Override + protected void processResponse(GetMappingsResponse getMappingsResponse) throws Exception { + ensureOpen(); + channel.sendResponse( + new RestResponse( + RestStatus.OK, + ChunkedRestResponseBody.fromXContent( + () -> Iterators.concat( + Iterators.single((b, p) -> b.startObject()), + getMappingsResponse.toXContentChunked(), + Iterators.single((b, p) -> b.endObject()) + ), + request, + channel + ) + ) + ); + } + }); } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/mapping/get/GetMappingsResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/mapping/get/GetMappingsResponseTests.java index 64e3758b4a9f5..c6a742bfe4e9b 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/mapping/get/GetMappingsResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/mapping/get/GetMappingsResponseTests.java @@ -18,6 +18,9 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; public class GetMappingsResponseTests extends AbstractWireSerializingTestCase { @@ -63,6 +66,22 @@ protected GetMappingsResponse createTestInstance() { return resp; } + public void testChunkedXContentUsesChunkPerIndex() { + final int indexCount = randomIntBetween(1, 10); + final var response = new GetMappingsResponse( + IntStream.range(0, indexCount) + .mapToObj(i -> "index-" + i) + .collect(Collectors.toUnmodifiableMap(Function.identity(), k -> createMappingsForIndex())) + ); + final var chunks = response.toXContentChunked(); + int chunkCount = 0; + while (chunks.hasNext()) { + chunks.next(); + chunkCount++; + } + assertEquals(indexCount, chunkCount); + } + // Not meant to be exhaustive private static Map randomFieldMapping() { Map mappings = new HashMap<>();