Skip to content

Commit

Permalink
Add support for newline delimited JSON Content-Type (#22947)
Browse files Browse the repository at this point in the history
This commit adds support for the newline delimited JSON Content-Type, which is how
the bulk, multi-search, and multi-search template APIs expect data to be formatted. The
`elasticsearch-js` client has also been using this content type for these types of requests.

Closes #22943
  • Loading branch information
jaymode committed Feb 7, 2017
1 parent b9b9400 commit 4612931
Show file tree
Hide file tree
Showing 11 changed files with 258 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ public static XContentType fromMediaTypeOrFormat(String mediaType) {
return type;
}
}
if(mediaType.toLowerCase(Locale.ROOT).startsWith("application/*")) {
final String lowercaseMediaType = mediaType.toLowerCase(Locale.ROOT);
if (lowercaseMediaType.startsWith("application/*")) {
return JSON;
}

Expand All @@ -152,6 +153,7 @@ public static XContentType fromMediaType(String mediaType) {
return type;
}
}

return null;
}

Expand Down
38 changes: 30 additions & 8 deletions core/src/main/java/org/elasticsearch/rest/RestController.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -179,6 +180,10 @@ public void dispatchRequest(RestRequest request, RestChannel channel, ThreadCont

if (contentLength > 0 && hasContentTypeOrCanAutoDetect(request, handler) == false) {
sendContentTypeErrorMessage(request, responseChannel);
} else if (contentLength > 0 && handler != null && handler.supportsContentStream() &&
request.getXContentType() != XContentType.JSON && request.getXContentType() != XContentType.SMILE) {
responseChannel.sendResponse(BytesRestResponse.createSimpleErrorResponse(RestStatus.NOT_ACCEPTABLE, "Content-Type [" +
request.getXContentType() + "] does not support stream parsing. Use JSON or SMILE instead"));
} else {
if (canTripCircuitBreaker(request)) {
inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, "<http_request>");
Expand Down Expand Up @@ -242,22 +247,39 @@ private boolean hasContentTypeOrCanAutoDetect(final RestRequest restRequest, fin
// be removed!
deprecationLogger.deprecated("Plain text request bodies are deprecated. Use request parameters or body " +
"in a supported format.");
} else if (isContentTypeRequired) {
return false;
} else {
deprecationLogger.deprecated("Content type detection for rest requests is deprecated. Specify the content type using " +
"the [Content-Type] header.");
XContentType xContentType = XContentFactory.xContentType(restRequest.content());
if (xContentType == null) {
} else if (restHandler != null && restHandler.supportsContentStream() && restRequest.header("Content-Type") != null) {
final String lowercaseMediaType = restRequest.header("Content-Type").toLowerCase(Locale.ROOT);
// we also support line-delimited JSON, which isn't official and has a few variations
// http://specs.okfnlabs.org/ndjson/
// https://github.com/ndjson/ndjson-spec/blob/48ea03cea6796b614cfbff4d4eb921f0b1d35c26/specification.md
if (lowercaseMediaType.equals("application/x-ldjson") || lowercaseMediaType.equals("application/x-ndjson")) {
restRequest.setXContentType(XContentType.JSON);
} else if (isContentTypeRequired) {
return false;
} else {
restRequest.setXContentType(xContentType);
return autoDetectXContentType(restRequest);
}
} else if (isContentTypeRequired) {
return false;
} else {
return autoDetectXContentType(restRequest);
}
}
return true;
}

private boolean autoDetectXContentType(RestRequest restRequest) {
deprecationLogger.deprecated("Content type detection for rest requests is deprecated. Specify the content type using " +
"the [Content-Type] header.");
XContentType xContentType = XContentFactory.xContentType(restRequest.content());
if (xContentType == null) {
return false;
} else {
restRequest.setXContentType(xContentType);
}
return true;
}

private void sendContentTypeErrorMessage(RestRequest restRequest, RestChannel channel) throws IOException {
final List<String> contentTypeHeader = restRequest.getAllHeaderValues("Content-Type");
final String errorMessage;
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/java/org/elasticsearch/rest/RestHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.rest;

import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.xcontent.XContent;

/**
* Handler for REST requests
Expand All @@ -46,4 +47,13 @@ default boolean canTripCircuitBreaker() {
default boolean supportsPlainText() {
return false;
}

/**
* Indicates if the RestHandler supports content as a stream. A stream would be multiple objects delineated by
* {@link XContent#streamSeparator()}. If a handler returns true this will affect the types of content that can be sent to
* this endpoint.
*/
default boolean supportsContentStream() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ public RestResponse buildResponse(BulkResponse response, XContentBuilder builder
});
}

@Override
public boolean supportsContentStream() {
return true;
}

static final class Fields {
static final String ITEMS = "items";
static final String ERRORS = "errors";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,11 @@ public static void parseMultiLineRequest(RestRequest request, IndicesOptions ind
}
}

@Override
public boolean supportsContentStream() {
return true;
}

private static int findNextMarker(byte marker, int from, BytesReference data, int length) {
for (int i = from; i < length; i++) {
if (data.get(i) == marker) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.script.Script;
import org.elasticsearch.test.ESTestCase;
Expand Down Expand Up @@ -68,6 +69,17 @@ public void testSimpleBulk1() throws Exception {
assertThat(((IndexRequest) bulkRequest.requests().get(2)).source(), equalTo(new BytesArray("{ \"field1\" : \"value3\" }")));
}

public void testSimpleBulkWithCarriageReturn() throws Exception {
String bulkAction = "{ \"index\":{\"_index\":\"test\",\"_type\":\"type1\",\"_id\":\"1\"} }\r\n{ \"field1\" : \"value1\" }\r\n";
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(bulkAction.getBytes(StandardCharsets.UTF_8), 0, bulkAction.length(), null, null, XContentType.JSON);
assertThat(bulkRequest.numberOfActions(), equalTo(1));
assertThat(((IndexRequest) bulkRequest.requests().get(0)).source(), equalTo(new BytesArray("{ \"field1\" : \"value1\" }\r")));
Map<String, Object> sourceMap = XContentHelper.convertToMap(((IndexRequest) bulkRequest.requests().get(0)).source(),
false, XContentType.JSON).v2();
assertEquals("value1", sourceMap.get("field1"));
}

public void testSimpleBulk2() throws Exception {
String bulkAction = copyToStringFromClasspath("/org/elasticsearch/action/bulk/simple-bulk2.json");
BulkRequest bulkRequest = new BulkRequest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,19 @@ public void testSimpleAdd() throws Exception {
assertThat(request.requests().get(7).types().length, equalTo(0));
}

public void testSimpleAddWithCarriageReturn() throws Exception {
final String requestContent = "{\"index\":\"test\", \"ignore_unavailable\" : true, \"expand_wildcards\" : \"open,closed\"}}\r\n" +
"{\"query\" : {\"match_all\" :{}}}\r\n";
FakeRestRequest restRequest = new FakeRestRequest.Builder(xContentRegistry())
.withContent(new BytesArray(requestContent), XContentType.JSON).build();
MultiSearchRequest request = RestMultiSearchAction.parseRequest(restRequest, true);
assertThat(request.requests().size(), equalTo(1));
assertThat(request.requests().get(0).indices()[0], equalTo("test"));
assertThat(request.requests().get(0).indicesOptions(),
equalTo(IndicesOptions.fromOptions(true, true, true, true, IndicesOptions.strictExpandOpenAndForbidClosed())));
assertThat(request.requests().get(0).types().length, equalTo(0));
}

public void testSimpleAdd2() throws Exception {
MultiSearchRequest request = parseMultiSearchRequest("/org/elasticsearch/action/search/simple-msearch2.json");
assertThat(request.requests().size(), equalTo(5));
Expand Down
151 changes: 151 additions & 0 deletions core/src/test/java/org/elasticsearch/rest/RestControllerTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.rest;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -44,6 +45,7 @@
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.yaml.YamlXContent;
import org.elasticsearch.http.HttpInfo;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.http.HttpStats;
Expand Down Expand Up @@ -311,6 +313,155 @@ public void testDispatchWorksWithAutoDetection() {
assertWarnings("Content type detection for rest requests is deprecated. Specify the content type using the [Content-Type] header.");
}

public void testDispatchWorksWithNewlineDelimitedJson() {
final String mimeType = randomFrom("application/x-ldjson", "application/x-ndjson");
String content = randomAsciiOfLengthBetween(1, BREAKER_LIMIT.bytesAsInt());
FakeRestRequest fakeRestRequest = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY)
.withContent(new BytesArray(content), null).withPath("/foo")
.withHeaders(Collections.singletonMap("Content-Type", Collections.singletonList(mimeType))).build();
AssertingChannel channel = new AssertingChannel(fakeRestRequest, true, RestStatus.OK);
restController.registerHandler(RestRequest.Method.GET, "/foo", new RestHandler() {
@Override
public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
channel.sendResponse(new BytesRestResponse(RestStatus.OK, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY));
}

@Override
public boolean supportsContentStream() {
return true;
}
});

assertFalse(channel.sendResponseCalled.get());
restController.dispatchRequest(fakeRestRequest, channel, new ThreadContext(Settings.EMPTY));
assertTrue(channel.sendResponseCalled.get());
}

public void testDispatchWithContentStream() {
final String mimeType = randomFrom("application/json", "application/smile");
String content = randomAsciiOfLengthBetween(1, BREAKER_LIMIT.bytesAsInt());
FakeRestRequest fakeRestRequest = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY)
.withContent(new BytesArray(content), null).withPath("/foo")
.withHeaders(Collections.singletonMap("Content-Type", Collections.singletonList(mimeType))).build();
AssertingChannel channel = new AssertingChannel(fakeRestRequest, true, RestStatus.OK);
restController.registerHandler(RestRequest.Method.GET, "/foo", new RestHandler() {
@Override
public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
channel.sendResponse(new BytesRestResponse(RestStatus.OK, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY));
}

@Override
public boolean supportsContentStream() {
return true;
}
});

assertFalse(channel.sendResponseCalled.get());
restController.dispatchRequest(fakeRestRequest, channel, new ThreadContext(Settings.EMPTY));
assertTrue(channel.sendResponseCalled.get());
}

public void testDispatchWithContentStreamAutoDetect() {
FakeRestRequest fakeRestRequest = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY)
.withContent(new BytesArray("{}"), null).withPath("/foo").build();
AssertingChannel channel = new AssertingChannel(fakeRestRequest, true, RestStatus.OK);
restController.registerHandler(RestRequest.Method.GET, "/foo", new RestHandler() {
@Override
public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
channel.sendResponse(new BytesRestResponse(RestStatus.OK, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY));
}

@Override
public boolean supportsContentStream() {
return true;
}
});

assertFalse(channel.sendResponseCalled.get());
restController.dispatchRequest(fakeRestRequest, channel, new ThreadContext(Settings.EMPTY));
assertTrue(channel.sendResponseCalled.get());
assertWarnings("Content type detection for rest requests is deprecated. Specify the content type using the [Content-Type] header.");
}

public void testNonStreamingXContentCausesErrorResponse() throws IOException {
// auto detect
FakeRestRequest fakeRestRequest = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY)
.withContent(YamlXContent.contentBuilder().startObject().endObject().bytes(), null).withPath("/foo").build();
AssertingChannel channel = new AssertingChannel(fakeRestRequest, true, RestStatus.NOT_ACCEPTABLE);
restController.registerHandler(RestRequest.Method.GET, "/foo", new RestHandler() {
@Override
public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
channel.sendResponse(new BytesRestResponse(RestStatus.OK, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY));
}

@Override
public boolean supportsContentStream() {
return true;
}
});

assertFalse(channel.sendResponseCalled.get());
restController.dispatchRequest(fakeRestRequest, channel, new ThreadContext(Settings.EMPTY));
assertTrue(channel.sendResponseCalled.get());

assertWarnings("Content type detection for rest requests is deprecated. Specify the content type using the [Content-Type] header.");

// specified
fakeRestRequest = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY)
.withContent(YamlXContent.contentBuilder().startObject().endObject().bytes(), XContentType.YAML).withPath("/foo").build();
channel = new AssertingChannel(fakeRestRequest, true, RestStatus.NOT_ACCEPTABLE);
assertFalse(channel.sendResponseCalled.get());
restController.dispatchRequest(fakeRestRequest, channel, new ThreadContext(Settings.EMPTY));
assertTrue(channel.sendResponseCalled.get());
}

public void testStrictModeContentStream() {
restController = new RestController(
Settings.builder().put(HttpTransportSettings.SETTING_HTTP_CONTENT_TYPE_REQUIRED.getKey(), true).build(),
Collections.emptySet(), null, null, circuitBreakerService);
FakeRestRequest fakeRestRequest = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY)
.withContent(new BytesArray("{}"), null).withPath("/foo")
.build();
AssertingChannel channel = new AssertingChannel(fakeRestRequest, true, RestStatus.NOT_ACCEPTABLE);
restController.registerHandler(RestRequest.Method.GET, "/foo", new RestHandler() {
@Override
public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
channel.sendResponse(new BytesRestResponse(RestStatus.OK, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY));
}

@Override
public boolean supportsContentStream() {
return true;
}
});
assertFalse(channel.sendResponseCalled.get());
restController.dispatchRequest(fakeRestRequest, channel, new ThreadContext(Settings.EMPTY));
assertTrue(channel.sendResponseCalled.get());
}

public void testUnknownContentWithContentStream() {
FakeRestRequest fakeRestRequest = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY)
.withContent(new BytesArray("aaaabbbbb"), null).withPath("/foo")
.withHeaders(Collections.singletonMap("Content-Type", Collections.singletonList("foo/bar")))
.build();
AssertingChannel channel = new AssertingChannel(fakeRestRequest, true, RestStatus.NOT_ACCEPTABLE);
restController.registerHandler(RestRequest.Method.GET, "/foo", new RestHandler() {
@Override
public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
channel.sendResponse(new BytesRestResponse(RestStatus.OK, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY));
}

@Override
public boolean supportsContentStream() {
return true;
}
});
assertFalse(channel.sendResponseCalled.get());
restController.dispatchRequest(fakeRestRequest, channel, new ThreadContext(Settings.EMPTY));
assertTrue(channel.sendResponseCalled.get());
assertWarnings("Content type detection for rest requests is deprecated. Specify the content type using the [Content-Type] header.");
}

private static final class TestHttpServerTransport extends AbstractLifecycleComponent implements
HttpServerTransport {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,9 @@ public static MultiSearchTemplateRequest parseRequest(RestRequest restRequest, b
});
return multiRequest;
}

@Override
public boolean supportsContentStream() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,24 @@ public void testParseRequest() throws Exception {
assertEquals(1, request.requests().get(1).getScriptParams().size());
assertEquals(1, request.requests().get(2).getScriptParams().size());
}

public void testParseWithCarriageReturn() throws Exception {
final String content = "{\"index\":[\"test0\", \"test1\"], \"request_cache\": true}\r\n" +
"{\"inline\": {\"query\" : {\"match_{{template}}\" :{}}}, \"params\": {\"template\": \"all\" } }\r\n";
RestRequest restRequest = new FakeRestRequest.Builder(xContentRegistry())
.withContent(new BytesArray(content), XContentType.JSON).build();

MultiSearchTemplateRequest request = RestMultiSearchTemplateAction.parseRequest(restRequest, true);

assertThat(request.requests().size(), equalTo(1));
assertThat(request.requests().get(0).getRequest().indices()[0], equalTo("test0"));
assertThat(request.requests().get(0).getRequest().indices()[1], equalTo("test1"));
assertThat(request.requests().get(0).getRequest().indices(), arrayContaining("test0", "test1"));
assertThat(request.requests().get(0).getRequest().requestCache(), equalTo(true));
assertThat(request.requests().get(0).getRequest().preference(), nullValue());
assertNotNull(request.requests().get(0).getScript());
assertEquals(ScriptType.INLINE, request.requests().get(0).getScriptType());
assertEquals("{\"query\":{\"match_{{template}}\":{}}}", request.requests().get(0).getScript());
assertEquals(1, request.requests().get(0).getScriptParams().size());
}
}
Loading

0 comments on commit 4612931

Please sign in to comment.