Skip to content

Commit 31e0e53

Browse files
committed
Leverage Jackson non-blocking parser
This commit introduces the Jackson2Tokenizer as a replacement for the JsonObjectDecoder. The latter was dropped because of its complexity, and hard dependency on Netty's ByteBuf. The new Jackson2Tokenizer leverages the new non-blocking JSON parser, using it to parse the incoming data buffers into TokenBuffers, each token buffer representing one JSON object. As with JsonObjectDecoder, it also supports streaming individual JSON array elements. Issue: SPR-14528
1 parent b778f94 commit 31e0e53

File tree

6 files changed

+391
-424
lines changed

6 files changed

+391
-424
lines changed

build.gradle

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ configure(allprojects) { project ->
5959
ext.httpclientVersion = "4.5.3"
6060
ext.interceptorApiVersion = "1.2"
6161
ext.jackson2Version = "2.9.0.pr4"
62+
ext.jsonassertVersion = "1.5.0"
6263
ext.javamailVersion = "1.6.0-rc2"
6364
ext.jaxbVersion = "2.2.11"
6465
ext.jaxwsVersion = "2.2.11"
@@ -772,6 +773,7 @@ project("spring-web") {
772773
testCompile("com.squareup.okhttp3:mockwebserver:${okhttp3Version}")
773774
testCompile("org.xmlunit:xmlunit-matchers:${xmlunitVersion}")
774775
testCompile("org.jetbrains.kotlin:kotlin-reflect:${kotlinVersion}")
776+
testCompile("org.skyscreamer:jsonassert:${jsonassertVersion}")
775777
testRuntime("com.sun.mail:javax.mail:${javamailVersion}")
776778
testRuntime("com.sun.xml.bind:jaxb-core:${jaxbVersion}")
777779
testRuntime("com.sun.xml.bind:jaxb-impl:${jaxbVersion}")
@@ -1037,7 +1039,7 @@ project("spring-test") {
10371039
optional("org.seleniumhq.selenium:selenium-java:3.4.0") {
10381040
exclude group: "io.netty", module: "netty"
10391041
}
1040-
optional("org.skyscreamer:jsonassert:1.5.0")
1042+
optional("org.skyscreamer:jsonassert:${jsonassertVersion}")
10411043
optional("com.jayway.jsonpath:json-path:2.2.0")
10421044
optional("org.reactivestreams:reactive-streams")
10431045
optional("io.projectreactor:reactor-core")
@@ -1286,6 +1288,22 @@ configure(project(':spring-core')) {
12861288
}
12871289
}
12881290

1291+
/*
1292+
* Copyright 2002-2017 the original author or authors.
1293+
*
1294+
* Licensed under the Apache License, Version 2.0 (the "License");
1295+
* you may not use this file except in compliance with the License.
1296+
* You may obtain a copy of the License at
1297+
*
1298+
* http://www.apache.org/licenses/LICENSE-2.0
1299+
*
1300+
* Unless required by applicable law or agreed to in writing, software
1301+
* distributed under the License is distributed on an "AS IS" BASIS,
1302+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1303+
* See the License for the specific language governing permissions and
1304+
* limitations under the License.
1305+
*/
1306+
12891307
/*
12901308
* Support publication of artifacts versioned by topic branch.
12911309
* CI builds supply `-P BRANCH_NAME=<TOPIC>` to gradle at build time.

spring-web/src/main/java/org/springframework/http/codec/json/Jackson2JsonDecoder.java

Lines changed: 40 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,15 @@
2121
import java.util.List;
2222
import java.util.Map;
2323

24+
import com.fasterxml.jackson.core.JsonFactory;
25+
import com.fasterxml.jackson.core.JsonParser;
2426
import com.fasterxml.jackson.core.JsonProcessingException;
2527
import com.fasterxml.jackson.databind.JavaType;
2628
import com.fasterxml.jackson.databind.ObjectMapper;
2729
import com.fasterxml.jackson.databind.ObjectReader;
2830
import com.fasterxml.jackson.databind.exc.InvalidDefinitionException;
31+
import com.fasterxml.jackson.databind.util.TokenBuffer;
32+
import org.eclipse.jetty.io.RuntimeIOException;
2933
import org.reactivestreams.Publisher;
3034
import reactor.core.publisher.Flux;
3135
import reactor.core.publisher.Mono;
@@ -35,7 +39,6 @@
3539
import org.springframework.core.codec.CodecException;
3640
import org.springframework.core.codec.DecodingException;
3741
import org.springframework.core.io.buffer.DataBuffer;
38-
import org.springframework.core.io.buffer.DataBufferUtils;
3942
import org.springframework.http.codec.HttpMessageDecoder;
4043
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
4144
import org.springframework.http.server.reactive.ServerHttpRequest;
@@ -54,11 +57,6 @@
5457
*/
5558
public class Jackson2JsonDecoder extends Jackson2CodecSupport implements HttpMessageDecoder<Object> {
5659

57-
private final JsonObjectDecoder fluxDecoder = new JsonObjectDecoder(true);
58-
59-
private final JsonObjectDecoder monoDecoder = new JsonObjectDecoder(false);
60-
61-
6260
public Jackson2JsonDecoder() {
6361
super(Jackson2ObjectMapperBuilder.json().build());
6462
}
@@ -67,7 +65,6 @@ public Jackson2JsonDecoder(ObjectMapper mapper, MimeType... mimeTypes) {
6765
super(mapper, mimeTypes);
6866
}
6967

70-
7168
@Override
7269
public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType) {
7370
JavaType javaType = objectMapper().getTypeFactory().constructType(elementType.getType());
@@ -76,7 +73,6 @@ public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType
7673
objectMapper().canDeserialize(javaType) && supportsMimeType(mimeType));
7774
}
7875

79-
8076
@Override
8177
public List<MimeType> getDecodableMimeTypes() {
8278
return JSON_MIME_TYPES;
@@ -86,20 +82,27 @@ public List<MimeType> getDecodableMimeTypes() {
8682
public Flux<Object> decode(Publisher<DataBuffer> input, ResolvableType elementType,
8783
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
8884

89-
return decodeInternal(this.fluxDecoder, input, elementType, mimeType, hints);
85+
Flux<TokenBuffer> tokens = Flux.from(input)
86+
.flatMap(new Jackson2Tokenizer(nonBlockingParser(), true));
87+
88+
return decodeInternal(tokens, elementType, mimeType, hints);
9089
}
9190

9291
@Override
9392
public Mono<Object> decodeToMono(Publisher<DataBuffer> input, ResolvableType elementType,
9493
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
9594

96-
return decodeInternal(this.monoDecoder, input, elementType, mimeType, hints).singleOrEmpty();
95+
Flux<TokenBuffer> tokens = Flux.from(input)
96+
.flatMap(new Jackson2Tokenizer(nonBlockingParser(), false));
97+
98+
return decodeInternal(tokens, elementType, mimeType, hints).singleOrEmpty();
9799
}
98100

99-
private Flux<Object> decodeInternal(JsonObjectDecoder objectDecoder, Publisher<DataBuffer> inputStream,
100-
ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
101+
private Flux<Object> decodeInternal(Flux<TokenBuffer> tokens,
102+
ResolvableType elementType, @Nullable MimeType mimeType,
103+
@Nullable Map<String, Object> hints) {
101104

102-
Assert.notNull(inputStream, "'inputStream' must not be null");
105+
Assert.notNull(tokens, "'tokens' must not be null");
103106
Assert.notNull(elementType, "'elementType' must not be null");
104107

105108
Class<?> contextClass = getParameter(elementType).map(MethodParameter::getContainingClass).orElse(null);
@@ -110,26 +113,21 @@ private Flux<Object> decodeInternal(JsonObjectDecoder objectDecoder, Publisher<D
110113
objectMapper().readerWithView(jsonView).forType(javaType) :
111114
objectMapper().readerFor(javaType));
112115

113-
return objectDecoder.decode(inputStream, elementType, mimeType, hints)
114-
.flatMap(dataBuffer -> {
115-
if (dataBuffer.readableByteCount() == 0) {
116-
return Mono.empty();
117-
}
118-
try {
119-
Object value = reader.readValue(dataBuffer.asInputStream());
120-
DataBufferUtils.release(dataBuffer);
121-
return Mono.just(value);
122-
}
123-
catch (InvalidDefinitionException ex) {
124-
return Mono.error(new CodecException("Type definition error: " + ex.getType(), ex));
125-
}
126-
catch (JsonProcessingException ex) {
127-
return Mono.error(new DecodingException("JSON decoding error: " + ex.getOriginalMessage(), ex));
128-
}
129-
catch (IOException ex) {
130-
return Mono.error(new DecodingException("I/O error while parsing input stream", ex));
131-
}
132-
});
116+
return tokens.flatMap(tokenBuffer -> {
117+
try {
118+
Object value = reader.readValue(tokenBuffer.asParser());
119+
return Mono.just(value);
120+
}
121+
catch (InvalidDefinitionException ex) {
122+
return Mono.error(new CodecException("Type definition error: " + ex.getType(), ex));
123+
}
124+
catch (JsonProcessingException ex) {
125+
return Mono.error(new DecodingException("JSON decoding error: " + ex.getOriginalMessage(), ex));
126+
}
127+
catch (IOException ex) {
128+
return Mono.error(new DecodingException("I/O error while parsing input stream", ex));
129+
}
130+
});
133131
}
134132

135133

@@ -147,4 +145,13 @@ protected <A extends Annotation> A getAnnotation(MethodParameter parameter, Clas
147145
return parameter.getParameterAnnotation(annotType);
148146
}
149147

148+
private JsonParser nonBlockingParser() {
149+
try {
150+
JsonFactory factory = this.objectMapper().getFactory();
151+
return factory.createNonBlockingByteArrayParser();
152+
}
153+
catch (IOException ex) {
154+
throw new RuntimeIOException(ex);
155+
}
156+
}
150157
}
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
/*
2+
* Copyright 2002-2017 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.http.codec.json;
18+
19+
import java.io.IOException;
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
import java.util.function.Function;
23+
24+
import com.fasterxml.jackson.core.JsonParser;
25+
import com.fasterxml.jackson.core.JsonProcessingException;
26+
import com.fasterxml.jackson.core.JsonToken;
27+
import com.fasterxml.jackson.core.async.ByteArrayFeeder;
28+
import com.fasterxml.jackson.databind.util.TokenBuffer;
29+
import reactor.core.publisher.Flux;
30+
31+
import org.springframework.core.codec.DecodingException;
32+
import org.springframework.core.io.buffer.DataBuffer;
33+
import org.springframework.core.io.buffer.DataBufferUtils;
34+
import org.springframework.util.Assert;
35+
36+
/**
37+
* Function that transforms an arbitrary split byte stream representing JSON objects into a
38+
* {@code Flux<TokenBuffer>}, where each token buffer is a well-formed JSON object.
39+
*
40+
* @author Arjen Poutsma
41+
* @since 5.0
42+
*/
43+
class Jackson2Tokenizer implements Function<DataBuffer, Flux<TokenBuffer>> {
44+
45+
private final JsonParser parser;
46+
47+
private final boolean tokenizeArrayElements;
48+
49+
private TokenBuffer tokenBuffer;
50+
51+
private int objectDepth;
52+
53+
private int arrayDepth;
54+
55+
// TODO: change to ByteBufferFeeder when supported by Jackson
56+
private ByteArrayFeeder inputFeeder;
57+
58+
/**
59+
* Create a new instance of the {@code Jackson2Tokenizer}.
60+
* @param parser the non-blocking parser, obtained via
61+
* {@link com.fasterxml.jackson.core.JsonFactory#createNonBlockingByteArrayParser}
62+
* @param tokenizeArrayElements if {@code true} and the "top level" JSON object is an array,
63+
* each of its elements is returned individually and immediately after it was fully received
64+
*/
65+
public Jackson2Tokenizer(JsonParser parser, boolean tokenizeArrayElements) {
66+
Assert.notNull(parser, "'parser' must not be null");
67+
68+
this.parser = parser;
69+
this.tokenizeArrayElements = tokenizeArrayElements;
70+
this.tokenBuffer = new TokenBuffer(parser);
71+
this.inputFeeder = (ByteArrayFeeder) this.parser.getNonBlockingInputFeeder();
72+
}
73+
74+
@Override
75+
public Flux<TokenBuffer> apply(DataBuffer dataBuffer) {
76+
byte[] bytes = new byte[dataBuffer.readableByteCount()];
77+
dataBuffer.read(bytes);
78+
DataBufferUtils.release(dataBuffer);
79+
80+
try {
81+
this.inputFeeder.feedInput(bytes, 0, bytes.length);
82+
List<TokenBuffer> result = new ArrayList<>();
83+
84+
while (true) {
85+
JsonToken token = this.parser.nextToken();
86+
if (token == JsonToken.NOT_AVAILABLE) {
87+
break;
88+
}
89+
calculateDepth(token);
90+
91+
if (!this.tokenizeArrayElements) {
92+
processTokenNormal(token, result);
93+
}
94+
else {
95+
processTokenArray(token, result);
96+
}
97+
}
98+
return Flux.fromIterable(result);
99+
}
100+
catch (JsonProcessingException ex) {
101+
return Flux.error(new DecodingException(
102+
"JSON decoding error: " + ex.getOriginalMessage(), ex));
103+
}
104+
catch (Exception ex) {
105+
return Flux.error(ex);
106+
}
107+
}
108+
109+
private void calculateDepth(JsonToken token) {
110+
switch (token) {
111+
case START_OBJECT:
112+
this.objectDepth++;
113+
break;
114+
case END_OBJECT:
115+
this.objectDepth--;
116+
break;
117+
case START_ARRAY:
118+
this.arrayDepth++;
119+
break;
120+
case END_ARRAY:
121+
this.arrayDepth--;
122+
break;
123+
}
124+
}
125+
126+
private void processTokenNormal(JsonToken token, List<TokenBuffer> result) throws IOException {
127+
this.tokenBuffer.copyCurrentEvent(this.parser);
128+
129+
if (token == JsonToken.END_OBJECT || token == JsonToken.END_ARRAY) {
130+
if (this.objectDepth == 0 && this.arrayDepth == 0) {
131+
result.add(this.tokenBuffer);
132+
this.tokenBuffer = new TokenBuffer(this.parser);
133+
}
134+
}
135+
136+
}
137+
138+
private void processTokenArray(JsonToken token, List<TokenBuffer> result) throws IOException {
139+
if (token != JsonToken.START_ARRAY && token != JsonToken.END_ARRAY) {
140+
this.tokenBuffer.copyCurrentEvent(this.parser);
141+
}
142+
143+
if (token == JsonToken.END_OBJECT && this.objectDepth == 0 &&
144+
(this.arrayDepth == 1 || this.arrayDepth == 0)) {
145+
result.add(this.tokenBuffer);
146+
this.tokenBuffer = new TokenBuffer(this.parser);
147+
}
148+
149+
}
150+
151+
}

0 commit comments

Comments
 (0)