
Convert RestGetMappingAction to chunked encoding #89906

Merged
Changes from 1 commit
@@ -0,0 +1,73 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.http;

import org.apache.http.client.methods.HttpGet;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Request;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParserConfiguration;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.hamcrest.Matchers;

import java.io.InputStream;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.aMapWithSize;

@LuceneTestCase.SuppressFileSystems(value = "HandleLimitFS") // we sometimes have >2048 open files
public class RestGetMappingsIT extends HttpSmokeTestCase {

public void testGetLargeMappingsResponse() throws Exception {
Contributor:
Hmm, I think if we approach testing chunked encoding like this across all APIs, we'll end up adding a lot of costly tests. Could we instead reduce the chunk size in these smoke tests to force chunking at a more reasonable scale?

Also I think this test passes even without the production code changes. Can we assert that the REST response really did arrive in multiple chunks?

Member (author):

Hmm, now that I think about it more, maybe this test is pointless. I really wanted a test that would actually cause a round of chunked encoding where the channel becomes not-writable at least once, but on reflection that's probably redundant. I'll just remove this and rely on the existing REST tests (which will now see a chunked response) and our Netty-specific tests around flushing messages. I think that's good enough here. Sorry for the noise.

Contributor:

Maybe I'm confused, but I was expecting that we'd send the response in chunks regardless of writability. I think it's worth checking that, but MAX_BYTES_PER_WRITE = 1 << 18 (256 KiB) is just a bit big for a test.

Member (author):

Yes, the response is always chunked, but the fact that it serializes fine is already tested elsewhere. Adding a test that verifies we actually get chunked bytes back seems quite redundant, doesn't it? Unless someone wilfully changes the code to revert to a normal response (which I'll make very hard shortly, PR incoming :)), this doesn't guard against anything, does it?

Contributor:

Ehh, maybe I'm paranoid, but I could imagine some future refactoring accidentally putting all the objects into a single chunk, quietly destroying the memory efficiency of this API.

Member (author):

That makes sense :) How about b6517df to make sure that won't happen?
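The reviewer's concern — that a large response should demonstrably span more than one chunk — can be illustrated in isolation. The following is a minimal, hypothetical plain-Java sketch (not the actual commit b6517df, and not Elasticsearch's internal API): it splits a body into writes of at most `MAX_BYTES_PER_WRITE` bytes and asserts that a body just over that limit yields more than one chunk.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class ChunkCountSketch {
    // Assumed analogue of the transport's MAX_BYTES_PER_WRITE mentioned above.
    static final int MAX_BYTES_PER_WRITE = 1 << 18; // 256 KiB

    // Split a response body into writes of at most maxBytesPerWrite bytes each.
    static List<byte[]> splitIntoWrites(byte[] body, int maxBytesPerWrite) {
        List<byte[]> writes = new ArrayList<>();
        for (int offset = 0; offset < body.length; offset += maxBytesPerWrite) {
            int len = Math.min(maxBytesPerWrite, body.length - offset);
            byte[] chunk = new byte[len];
            System.arraycopy(body, offset, chunk, 0, len);
            writes.add(chunk);
        }
        return writes;
    }

    public static void main(String[] args) {
        // A large mappings response easily exceeds one write's worth of bytes.
        byte[] largeBody = "x".repeat(MAX_BYTES_PER_WRITE + 1).getBytes(StandardCharsets.UTF_8);
        List<byte[]> writes = splitIntoWrites(largeBody, MAX_BYTES_PER_WRITE);
        // The point of the reviewer's ask: the response must span more than one chunk.
        if (writes.size() < 2) throw new AssertionError("expected multiple chunks, got " + writes.size());
        System.out.println(writes.size());
    }
}
```

This also shows why `1 << 18` is awkward in a test: forcing a second chunk requires generating over 256 KiB of mapping JSON, hence the suggestion to lower the limit in smoke tests.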

        final XContentBuilder builder = JsonXContent.contentBuilder()
            .startObject()
            .startObject(MapperService.SINGLE_MAPPING_NAME)
            .startObject("properties");
        final int fields = randomIntBetween(500, 1000);
        for (int i = 0; i < fields; i++) {
            builder.startObject("loooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooong-named-field-" + i)
                .field("type", "text")
                .field("store", true)
                .endObject();
        }
        builder.endObject().endObject().endObject();
        assertAcked(admin().indices().preparePutTemplate("large-mapping").setPatterns(List.of("test-idx-*")).setMapping(builder).get());
        final int indexCount = randomIntBetween(50, 150);
        final PlainActionFuture<Void> f = PlainActionFuture.newFuture();
        final ActionListener<CreateIndexResponse> listener = new GroupedActionListener<>(f.map(l -> null), indexCount);
        for (int i = 0; i < indexCount; i++) {
            admin().indices()
                .prepareCreate("test-idx-" + i)
                .setSettings(AbstractSnapshotIntegTestCase.SINGLE_SHARD_NO_REPLICA)
                .execute(listener);
        }
        f.get();
        final var response = getRestClient().performRequest(new Request(HttpGet.METHOD_NAME, "/_mappings"));
        try (
            InputStream input = response.getEntity().getContent();
            XContentParser parser = JsonXContent.jsonXContent.createParser(XContentParserConfiguration.EMPTY, input)
        ) {
            final Map<String, Object> mappings = parser.map();
            assertThat(mappings, Matchers.aMapWithSize(indexCount));
            final Object idx0 = mappings.get("test-idx-0");
            @SuppressWarnings("unchecked")
            var properties = ((Map<String, Map<String, Object>>) idx0).get("mappings").get("properties");
            assertThat((Map<?, ?>) properties, aMapWithSize(fields));
            mappings.forEach((key, value) -> assertEquals(key, idx0, value));
        }
    }
}
@@ -791,7 +791,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
registerHandler.accept(new RestSimulateTemplateAction());

registerHandler.accept(new RestPutMappingAction());
registerHandler.accept(new RestGetMappingAction(threadPool));
registerHandler.accept(new RestGetMappingAction());
registerHandler.accept(new RestGetFieldMappingAction());

registerHandler.accept(new RestRefreshAction());
@@ -14,19 +14,21 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;

import static org.elasticsearch.rest.BaseRestHandler.DEFAULT_INCLUDE_TYPE_NAME_POLICY;
import static org.elasticsearch.rest.BaseRestHandler.INCLUDE_TYPE_NAME_PARAMETER;

public class GetMappingsResponse extends ActionResponse implements ToXContentFragment {
public class GetMappingsResponse extends ActionResponse implements ToXContentFragment, ChunkedToXContent {

private static final ParseField MAPPINGS = new ParseField("mappings");

@@ -65,8 +67,8 @@ public void writeTo(StreamOutput out) throws IOException {
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
for (final Map.Entry<String, MappingMetadata> indexEntry : getMappings().entrySet()) {
public Iterator<ToXContent> toXContentChunked() {
return getMappings().entrySet().stream().map(indexEntry -> (ToXContent) (builder, params) -> {
builder.startObject(indexEntry.getKey());
boolean includeTypeName = params.paramAsBoolean(INCLUDE_TYPE_NAME_PARAMETER, DEFAULT_INCLUDE_TYPE_NAME_POLICY);
if (builder.getRestApiVersion() == RestApiVersion.V_7 && includeTypeName && indexEntry.getValue() != null) {
@@ -83,8 +85,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.startObject(MAPPINGS.getPreferredName()).endObject();
}
builder.endObject();
}
return builder;
return builder;
}).iterator();
}

@Override
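The diff above replaces the monolithic `toXContent` loop with an `Iterator<ToXContent>`, one element per index. The shape of that pattern can be sketched in plain Java (illustrative names only, not the Elasticsearch types): each chunk is a small closure that renders one map entry, and the consumer pulls chunks one at a time instead of materializing the whole body up front.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

class ChunkedSerializerSketch {
    // Each chunk knows how to render itself, mirroring Iterator<ToXContent>.
    interface Chunk { String render(); }

    // One chunk per map entry, created lazily as the iterator is consumed.
    static Iterator<Chunk> toChunks(Map<String, String> mappings) {
        return mappings.entrySet().stream()
            .map(e -> (Chunk) () -> "\"" + e.getKey() + "\":" + e.getValue())
            .iterator();
    }

    public static void main(String[] args) {
        Map<String, String> mappings = new LinkedHashMap<>();
        mappings.put("test-idx-0", "{\"mappings\":{}}");
        mappings.put("test-idx-1", "{\"mappings\":{}}");
        // The consumer (here, a StringBuilder standing in for the HTTP layer)
        // drains the iterator one chunk at a time.
        StringBuilder body = new StringBuilder("{");
        Iterator<Chunk> it = toChunks(mappings);
        while (it.hasNext()) {
            body.append(it.next().render());
            if (it.hasNext()) body.append(',');
        }
        body.append('}');
        System.out.println(body);
    }
}
```

In the real response the chunks are written to the channel as they are produced, which is where the memory saving over a single large buffer comes from.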
@@ -8,27 +8,26 @@

package org.elasticsearch.rest.action.admin.indices;

import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.http.HttpChannel;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.ChunkedRestResponseBody;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.DispatchingRestToXContentListener;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestActionListener;
import org.elasticsearch.rest.action.RestCancellableNodeClient;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.List;
import java.util.function.LongSupplier;

import static org.elasticsearch.rest.RestRequest.Method.GET;
import static org.elasticsearch.rest.RestRequest.Method.HEAD;
@@ -40,11 +39,7 @@ public class RestGetMappingAction extends BaseRestHandler {
public static final String TYPES_DEPRECATION_MESSAGE = "[types removal] Specifying types in get mapping request is deprecated. "
+ "Use typeless api instead";

private final ThreadPool threadPool;

public RestGetMappingAction(ThreadPool threadPool) {
this.threadPool = threadPool;
}
public RestGetMappingAction() {}

@Override
public List<Route> routes() {
@@ -97,37 +92,25 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
final HttpChannel httpChannel = request.getHttpChannel();
return channel -> new RestCancellableNodeClient(client, httpChannel).admin()
.indices()
.getMappings(
getMappingsRequest,
new DispatchingRestToXContentListener<>(threadPool.executor(ThreadPool.Names.MANAGEMENT), channel, request).map(
getMappingsResponse -> new RestGetMappingsResponse(getMappingsResponse, threadPool::relativeTimeInMillis, timeout)
)
);
}

private static final class RestGetMappingsResponse implements ToXContentObject {
private final GetMappingsResponse response;
private final LongSupplier relativeTimeSupplierMillis;
private final TimeValue timeout;
private final long startTimeMs;

private RestGetMappingsResponse(GetMappingsResponse response, LongSupplier relativeTimeSupplierMillis, TimeValue timeout) {
this.response = response;
this.relativeTimeSupplierMillis = relativeTimeSupplierMillis;
this.timeout = timeout;
this.startTimeMs = relativeTimeSupplierMillis.getAsLong();
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
if (relativeTimeSupplierMillis.getAsLong() - startTimeMs > timeout.millis()) {
throw new ElasticsearchTimeoutException("Timed out getting mappings");
}

builder.startObject();
response.toXContent(builder, params);
builder.endObject();
return builder;
}
.getMappings(getMappingsRequest, new RestActionListener<>(channel) {
@Override
protected void processResponse(GetMappingsResponse getMappingsResponse) throws Exception {
ensureOpen();
channel.sendResponse(
new RestResponse(
RestStatus.OK,
ChunkedRestResponseBody.fromXContent(
() -> Iterators.concat(
Iterators.single((b, p) -> b.startObject()),
Member (author):

I'm not quite sure why GetMappingsResponse is a fragment and why we had the complicated wrapping here, but I didn't want to change that in this PR, so I manually wrapped it for now.

getMappingsResponse.toXContentChunked(),
Iterators.single((b, p) -> b.endObject())
),
request,
channel
)
)
);
}
});
}
}
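As the author notes, GetMappingsResponse serializes as a fragment, so the handler concatenates an opening-brace chunk, the response's own chunks, and a closing-brace chunk. A minimal plain-Java sketch of that concatenation (illustrative only, not the `Iterators`/`ChunkedRestResponseBody` API):

```java
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

class WrappedFragmentSketch {
    // Concatenate a '{' chunk, the fragment's chunks, and a '}' chunk,
    // analogous to Iterators.concat(single(startObject), chunks, single(endObject)).
    static Iterator<String> wrapInObject(List<String> fragmentChunks) {
        return Stream.concat(
            Stream.concat(Stream.of("{"), fragmentChunks.stream()),
            Stream.of("}")
        ).iterator();
    }

    public static void main(String[] args) {
        // Hypothetical fragment chunks, one per index, standing in for toXContentChunked().
        List<String> fragment = List.of("\"test-idx-0\":{\"mappings\":{}}");
        StringBuilder body = new StringBuilder();
        wrapInObject(fragment).forEachRemaining(body::append);
        System.out.println(body);
    }
}
```

The wrapping chunks are trivially small, so they add no meaningful overhead; the benefit is that the fragment's per-index chunks are still streamed rather than collapsed into one buffer.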