Skip to content

Commit 25101fb

Browse files
committed
Additional fixes for discarding data buffers
Closes gh-26232
1 parent cb44ae6 commit 25101fb

File tree

2 files changed

+22
-11
lines changed

2 files changed

+22
-11
lines changed

spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,8 @@ public Mono<Void> write(Publisher<? extends T> inputStream, ResolvableType eleme
132132
message.getHeaders().setContentLength(buffer.readableByteCount());
133133
return message.writeWith(Mono.just(buffer)
134134
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release));
135-
});
135+
})
136+
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
136137
}
137138

138139
if (isStreamingMediaType(contentType)) {

spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java

+20-10
Original file line numberDiff line numberDiff line change
@@ -213,17 +213,27 @@ public final Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
213213
return ((Mono<? extends DataBuffer>) body)
214214
.flatMap(buffer -> {
215215
touchDataBuffer(buffer);
216-
return doCommit(() -> {
217-
try {
218-
return writeWithInternal(Mono.fromCallable(() -> buffer)
219-
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release));
220-
}
221-
catch (Throwable ex) {
222-
return Mono.error(ex);
223-
}
224-
}).doOnError(ex -> DataBufferUtils.release(buffer));
216+
AtomicReference<Boolean> subscribed = new AtomicReference<>(false);
217+
return doCommit(
218+
() -> {
219+
try {
220+
return writeWithInternal(Mono.fromCallable(() -> buffer)
221+
.doOnSubscribe(s -> subscribed.set(true))
222+
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release));
223+
}
224+
catch (Throwable ex) {
225+
return Mono.error(ex);
226+
}
227+
})
228+
.doOnError(ex -> DataBufferUtils.release(buffer))
229+
.doOnCancel(() -> {
230+
if (!subscribed.get()) {
231+
DataBufferUtils.release(buffer);
232+
}
233+
});
225234
})
226-
.doOnError(t -> getHeaders().clearContentHeaders());
235+
.doOnError(t -> getHeaders().clearContentHeaders())
236+
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
227237
}
228238
else {
229239
return new ChannelSendOperator<>(body, inner -> doCommit(() -> writeWithInternal(inner)))

0 commit comments

Comments
 (0)