Skip to content

Commit

Permalink
Additional DataBuffer hints
Browse files Browse the repository at this point in the history
  • Loading branch information
rstoyanchev committed Dec 8, 2020
1 parent 01fb4db commit cb44ae6
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ public DataBuffer encodeValue(Object value, DataBufferFactory bufferFactory,
byte[] bytes = byteBuilder.toByteArray();
DataBuffer buffer = bufferFactory.allocateBuffer(bytes.length);
buffer.write(bytes);
Hints.touchDataBuffer(buffer, hints, logger);

return buffer;
}
Expand Down Expand Up @@ -267,6 +268,7 @@ private DataBuffer encodeStreamingValue(Object value, DataBufferFactory bufferFa
DataBuffer buffer = bufferFactory.allocateBuffer(length + separator.length);
buffer.write(bytes, offset, length);
buffer.write(separator);
Hints.touchDataBuffer(buffer, hints, logger);

return buffer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,16 +211,18 @@ public final Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
// We must resolve value first however, for a chance to handle potential error.
if (body instanceof Mono) {
return ((Mono<? extends DataBuffer>) body)
.flatMap(buffer ->
doCommit(() -> {
try {
return writeWithInternal(Mono.fromCallable(() -> buffer)
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release));
}
catch (Throwable ex) {
return Mono.error(ex);
}
}).doOnError(ex -> DataBufferUtils.release(buffer)))
.flatMap(buffer -> {
touchDataBuffer(buffer);
return doCommit(() -> {
try {
return writeWithInternal(Mono.fromCallable(() -> buffer)
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release));
}
catch (Throwable ex) {
return Mono.error(ex);
}
}).doOnError(ex -> DataBufferUtils.release(buffer));
})
.doOnError(t -> getHeaders().clearContentHeaders());
}
else {
Expand Down Expand Up @@ -323,4 +325,13 @@ else if (this.state.compareAndSet(State.COMMIT_ACTION_FAILED, State.COMMITTING))
*/
protected abstract void applyCookies();

/**
* Allow sub-classes to associate a hint with the data buffer if it is a
* pooled buffer and supports leak tracking.
* @param buffer the buffer to attach a hint to
* @since 5.3.2
*/
protected void touchDataBuffer(DataBuffer buffer) {
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@
import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelId;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.server.HttpServerResponse;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
Expand All @@ -43,6 +47,9 @@
*/
class ReactorServerHttpResponse extends AbstractServerHttpResponse implements ZeroCopyHttpOutputMessage {

private static final Log logger = LogFactory.getLog(ReactorServerHttpResponse.class);


private final HttpServerResponse response;


Expand Down Expand Up @@ -115,4 +122,14 @@ private Publisher<ByteBuf> toByteBufs(Publisher<? extends DataBuffer> dataBuffer
Flux.from(dataBuffers).map(NettyDataBufferFactory::toByteBuf);
}

@Override
protected void touchDataBuffer(DataBuffer buffer) {
if (logger.isDebugEnabled()) {
this.response.withConnection(connection -> {
ChannelId id = connection.channel().id();
DataBufferUtils.touch(buffer, "Channel id: " + id.asShortText());
});
}
}

}

0 comments on commit cb44ae6

Please sign in to comment.