Skip to content

Commit d2f2918

Browse files
committed
Backport fixes for discarding data buffers
Closes gh-26232
1 parent 900c45e commit d2f2918

File tree

2 files changed

+32
-13
lines changed

2 files changed

+32
-13
lines changed

spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java

+8-9
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -29,8 +29,8 @@
2929
import org.springframework.core.codec.Encoder;
3030
import org.springframework.core.codec.Hints;
3131
import org.springframework.core.io.buffer.DataBuffer;
32+
import org.springframework.core.io.buffer.DataBufferUtils;
3233
import org.springframework.core.io.buffer.PooledDataBuffer;
33-
import org.springframework.http.HttpHeaders;
3434
import org.springframework.http.HttpLogging;
3535
import org.springframework.http.MediaType;
3636
import org.springframework.http.ReactiveHttpOutputMessage;
@@ -108,7 +108,6 @@ public boolean canWrite(ResolvableType elementType, @Nullable MediaType mediaTyp
108108
return this.encoder.canEncode(elementType, mediaType);
109109
}
110110

111-
@SuppressWarnings("unchecked")
112111
@Override
113112
public Mono<Void> write(Publisher<? extends T> inputStream, ResolvableType elementType,
114113
@Nullable MediaType mediaType, ReactiveHttpOutputMessage message, Map<String, Object> hints) {
@@ -119,23 +118,23 @@ public Mono<Void> write(Publisher<? extends T> inputStream, ResolvableType eleme
119118
inputStream, message.bufferFactory(), elementType, contentType, hints);
120119

121120
if (inputStream instanceof Mono) {
122-
HttpHeaders headers = message.getHeaders();
123121
return body
124122
.singleOrEmpty()
125123
.switchIfEmpty(Mono.defer(() -> {
126-
headers.setContentLength(0);
124+
message.getHeaders().setContentLength(0);
127125
return message.setComplete().then(Mono.empty());
128126
}))
129127
.flatMap(buffer -> {
130-
headers.setContentLength(buffer.readableByteCount());
128+
message.getHeaders().setContentLength(buffer.readableByteCount());
131129
return message.writeWith(Mono.just(buffer)
132-
.doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release));
133-
});
130+
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release));
131+
})
132+
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
134133
}
135134

136135
if (isStreamingMediaType(contentType)) {
137136
return message.writeAndFlushWith(body.map(buffer ->
138-
Mono.just(buffer).doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release)));
137+
Mono.just(buffer).doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)));
139138
}
140139

141140
return message.writeWith(body);

spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java

+24-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -180,9 +180,29 @@ public final Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
180180
// Write as Mono if possible as an optimization hint to Reactor Netty
181181
// ChannelSendOperator not necessary for Mono
182182
if (body instanceof Mono) {
183-
return ((Mono<? extends DataBuffer>) body).flatMap(buffer ->
184-
doCommit(() -> writeWithInternal(Mono.just(buffer)))
185-
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release));
183+
return ((Mono<? extends DataBuffer>) body)
184+
.flatMap(buffer -> {
185+
AtomicReference<Boolean> subscribed = new AtomicReference<>(false);
186+
return doCommit(
187+
() -> {
188+
try {
189+
return writeWithInternal(Mono.fromCallable(() -> buffer)
190+
.doOnSubscribe(s -> subscribed.set(true))
191+
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release));
192+
}
193+
catch (Throwable ex) {
194+
return Mono.error(ex);
195+
}
196+
})
197+
.doOnError(ex -> DataBufferUtils.release(buffer))
198+
.doOnCancel(() -> {
199+
if (!subscribed.get()) {
200+
DataBufferUtils.release(buffer);
201+
}
202+
});
203+
})
204+
.doOnError(t -> removeContentLength())
205+
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
186206
}
187207
return new ChannelSendOperator<>(body, inner -> doCommit(() -> writeWithInternal(inner)))
188208
.doOnError(t -> removeContentLength());

0 commit comments

Comments
 (0)