From 9614fd7b7e6662e5d6046011ccbfc0ce6e740d08 Mon Sep 17 00:00:00 2001 From: Ben Hale Date: Mon, 16 Apr 2018 09:35:54 -0700 Subject: [PATCH] Fix Reference Count Leak Previously, there were a couple of frames that converted Strings to ByteBufs automatically as part of construction. Those frames though, retained any ByteBufs passed into them so that they could be released by the creators as necessary. Since the automatic construction was not doing this release on the String-based ByteBufs, there was a minor leak of references. This change updates those locations to release the String-based ByteBufs and accurately account for their retention and release. --- .../fragmentation/FrameFragmenter.java | 2 +- .../fragmentation/FrameReassembler.java | 2 +- .../framing/AbstractRecyclableFrame.java | 4 +- .../java/io/rsocket/framing/DataFrame.java | 1 + .../java/io/rsocket/framing/ErrorFrame.java | 9 +- .../io/rsocket/framing/ExtensionFrame.java | 13 ++- .../io/rsocket/framing/MetadataFrame.java | 1 + .../io/rsocket/framing/MetadataPushFrame.java | 10 ++- .../java/io/rsocket/framing/PayloadFrame.java | 12 ++- .../rsocket/framing/RequestChannelFrame.java | 18 ++-- .../framing/RequestFireAndForgetFrame.java | 13 ++- .../rsocket/framing/RequestResponseFrame.java | 12 ++- .../rsocket/framing/RequestStreamFrame.java | 17 ++-- .../java/io/rsocket/framing/ResumeFrame.java | 18 ++-- .../java/io/rsocket/framing/SetupFrame.java | 87 +++++++++++-------- .../util/AbstractionLeakingFrameUtils.java | 5 +- ...sposableUtil.java => DisposableUtils.java} | 8 +- 17 files changed, 161 insertions(+), 71 deletions(-) rename rsocket-core/src/main/java/io/rsocket/util/{DisposableUtil.java => DisposableUtils.java} (88%) diff --git a/rsocket-core/src/main/java/io/rsocket/fragmentation/FrameFragmenter.java b/rsocket-core/src/main/java/io/rsocket/fragmentation/FrameFragmenter.java index 8b65ee63c..e178bc971 100644 --- a/rsocket-core/src/main/java/io/rsocket/fragmentation/FrameFragmenter.java +++ b/rsocket-core/src/main/java/io/rsocket/fragmentation/FrameFragmenter.java @@ -17,7 +17,7 @@ package io.rsocket.fragmentation; import static io.rsocket.framing.PayloadFrame.createPayloadFrame; -import static io.rsocket.util.DisposableUtil.disposeQuietly; +import static io.rsocket.util.DisposableUtils.disposeQuietly; import static java.lang.Math.min; import io.netty.buffer.ByteBuf; diff --git a/rsocket-core/src/main/java/io/rsocket/fragmentation/FrameReassembler.java b/rsocket-core/src/main/java/io/rsocket/fragmentation/FrameReassembler.java index 7b4e08232..ba1886bcf 100644 --- a/rsocket-core/src/main/java/io/rsocket/fragmentation/FrameReassembler.java +++ b/rsocket-core/src/main/java/io/rsocket/fragmentation/FrameReassembler.java @@ -16,7 +16,7 @@ package io.rsocket.fragmentation; -import static io.rsocket.util.DisposableUtil.disposeQuietly; +import static io.rsocket.util.DisposableUtils.disposeQuietly; import static io.rsocket.util.RecyclerFactory.createRecycler; import io.netty.buffer.ByteBuf; diff --git a/rsocket-core/src/main/java/io/rsocket/framing/AbstractRecyclableFrame.java b/rsocket-core/src/main/java/io/rsocket/framing/AbstractRecyclableFrame.java index 5eb369bd9..3cd56f93d 100644 --- a/rsocket-core/src/main/java/io/rsocket/framing/AbstractRecyclableFrame.java +++ b/rsocket-core/src/main/java/io/rsocket/framing/AbstractRecyclableFrame.java @@ -16,12 +16,14 @@ package io.rsocket.framing; +import static io.netty.util.ReferenceCountUtil.release; import static java.nio.charset.StandardCharsets.UTF_8; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; import io.netty.util.Recycler.Handle; +import io.netty.util.ReferenceCounted; import java.util.Objects; import reactor.util.annotation.Nullable; @@ -51,7 +53,7 @@ abstract class AbstractRecyclableFrame getMetadataLength() { * if you store it. * * @return the metadata directly, or {@code null} if the Metadata flag is not set + * @see #getMetadataAsUtf8() * @see #mapMetadata(Function) */ @Nullable diff --git a/rsocket-core/src/main/java/io/rsocket/framing/MetadataPushFrame.java b/rsocket-core/src/main/java/io/rsocket/framing/MetadataPushFrame.java index 8b982455d..2490fe84c 100644 --- a/rsocket-core/src/main/java/io/rsocket/framing/MetadataPushFrame.java +++ b/rsocket-core/src/main/java/io/rsocket/framing/MetadataPushFrame.java @@ -16,6 +16,7 @@ package io.rsocket.framing; +import static io.netty.util.ReferenceCountUtil.release; import static io.rsocket.framing.FrameType.METADATA_PUSH; import static io.rsocket.util.RecyclerFactory.createRecycler; @@ -69,8 +70,13 @@ public static MetadataPushFrame createMetadataPushFrame(ByteBuf byteBuf) { public static MetadataPushFrame createMetadataPushFrame( ByteBufAllocator byteBufAllocator, String metadata) { - return createMetadataPushFrame( - byteBufAllocator, getUtf8AsByteBufRequired(metadata, "metadata must not be null")); + ByteBuf metadataByteBuf = getUtf8AsByteBufRequired(metadata, "metadata must not be null"); + + try { + return createMetadataPushFrame(byteBufAllocator, metadataByteBuf); + } finally { + release(metadataByteBuf); + } } /** diff --git a/rsocket-core/src/main/java/io/rsocket/framing/PayloadFrame.java b/rsocket-core/src/main/java/io/rsocket/framing/PayloadFrame.java index 50b6e42b2..c91777df6 100644 --- a/rsocket-core/src/main/java/io/rsocket/framing/PayloadFrame.java +++ b/rsocket-core/src/main/java/io/rsocket/framing/PayloadFrame.java @@ -16,6 +16,7 @@ package io.rsocket.framing; +import static io.netty.util.ReferenceCountUtil.release; import static io.rsocket.util.RecyclerFactory.createRecycler; import io.netty.buffer.ByteBuf; @@ -78,8 +79,15 @@ public static PayloadFrame createPayloadFrame( @Nullable String metadata, @Nullable String data) { - return createPayloadFrame( - byteBufAllocator, follows, complete, getUtf8AsByteBuf(metadata), getUtf8AsByteBuf(data)); + ByteBuf metadataByteBuf = getUtf8AsByteBuf(metadata); + ByteBuf dataByteBuf = getUtf8AsByteBuf(data); + + try { + return createPayloadFrame(byteBufAllocator, follows, complete, metadataByteBuf, dataByteBuf); + } finally { + release(metadataByteBuf); + release(dataByteBuf); + } } /** diff --git a/rsocket-core/src/main/java/io/rsocket/framing/RequestChannelFrame.java b/rsocket-core/src/main/java/io/rsocket/framing/RequestChannelFrame.java index f1e74bd62..95b5def86 100644 --- a/rsocket-core/src/main/java/io/rsocket/framing/RequestChannelFrame.java +++ b/rsocket-core/src/main/java/io/rsocket/framing/RequestChannelFrame.java @@ -16,6 +16,7 @@ package io.rsocket.framing; +import static io.netty.util.ReferenceCountUtil.release; import static io.rsocket.util.RecyclerFactory.createRecycler; import io.netty.buffer.ByteBuf; @@ -83,13 +84,16 @@ public static RequestChannelFrame createRequestChannelFrame( @Nullable String metadata, @Nullable String data) { - return createRequestChannelFrame( - byteBufAllocator, - follows, - complete, - initialRequestN, - getUtf8AsByteBuf(metadata), - getUtf8AsByteBuf(data)); + ByteBuf metadataByteBuf = getUtf8AsByteBuf(metadata); + ByteBuf dataByteBuf = getUtf8AsByteBuf(data); + + try { + return createRequestChannelFrame( + byteBufAllocator, follows, complete, initialRequestN, metadataByteBuf, dataByteBuf); + } finally { + release(metadataByteBuf); + release(dataByteBuf); + } } /** diff --git a/rsocket-core/src/main/java/io/rsocket/framing/RequestFireAndForgetFrame.java b/rsocket-core/src/main/java/io/rsocket/framing/RequestFireAndForgetFrame.java index 3749ee329..2f4dbe978 100644 --- a/rsocket-core/src/main/java/io/rsocket/framing/RequestFireAndForgetFrame.java +++ b/rsocket-core/src/main/java/io/rsocket/framing/RequestFireAndForgetFrame.java @@ -16,6 +16,7 @@ package io.rsocket.framing; +import static io.netty.util.ReferenceCountUtil.release; import static io.rsocket.util.RecyclerFactory.createRecycler; import io.netty.buffer.ByteBuf; @@ -74,8 +75,16 @@ public static RequestFireAndForgetFrame createRequestFireAndForgetFrame( @Nullable String metadata, @Nullable String data) { - return createRequestFireAndForgetFrame( - byteBufAllocator, follows, getUtf8AsByteBuf(metadata), getUtf8AsByteBuf(data)); + ByteBuf metadataByteBuf = getUtf8AsByteBuf(metadata); + ByteBuf dataByteBuf = getUtf8AsByteBuf(data); + + try { + return createRequestFireAndForgetFrame( + byteBufAllocator, follows, metadataByteBuf, dataByteBuf); + } finally { + release(metadataByteBuf); + release(dataByteBuf); + } } /** diff --git a/rsocket-core/src/main/java/io/rsocket/framing/RequestResponseFrame.java b/rsocket-core/src/main/java/io/rsocket/framing/RequestResponseFrame.java index e0b0d9291..ce834ca20 100644 --- a/rsocket-core/src/main/java/io/rsocket/framing/RequestResponseFrame.java +++ b/rsocket-core/src/main/java/io/rsocket/framing/RequestResponseFrame.java @@ -16,6 +16,7 @@ package io.rsocket.framing; +import static io.netty.util.ReferenceCountUtil.release; import static io.rsocket.util.RecyclerFactory.createRecycler; import io.netty.buffer.ByteBuf; @@ -74,8 +75,15 @@ public static RequestResponseFrame createRequestResponseFrame( @Nullable String metadata, @Nullable String data) { - return createRequestResponseFrame( - byteBufAllocator, follows, getUtf8AsByteBuf(metadata), getUtf8AsByteBuf(data)); + ByteBuf metadataByteBuf = getUtf8AsByteBuf(metadata); + ByteBuf dataByteBuf = getUtf8AsByteBuf(data); + + try { + return createRequestResponseFrame(byteBufAllocator, follows, metadataByteBuf, dataByteBuf); + } finally { + release(metadataByteBuf); + release(dataByteBuf); + } } /** diff --git a/rsocket-core/src/main/java/io/rsocket/framing/RequestStreamFrame.java b/rsocket-core/src/main/java/io/rsocket/framing/RequestStreamFrame.java index f372c631a..fa5747a63 100644 --- a/rsocket-core/src/main/java/io/rsocket/framing/RequestStreamFrame.java +++ b/rsocket-core/src/main/java/io/rsocket/framing/RequestStreamFrame.java @@ -16,6 +16,7 @@ package io.rsocket.framing; +import static io.netty.util.ReferenceCountUtil.release; import static io.rsocket.util.RecyclerFactory.createRecycler; import io.netty.buffer.ByteBuf; @@ -79,12 +80,16 @@ public static RequestStreamFrame createRequestStreamFrame( @Nullable String metadata, @Nullable String data) { - return createRequestStreamFrame( - byteBufAllocator, - follows, - initialRequestN, - getUtf8AsByteBuf(metadata), - getUtf8AsByteBuf(data)); + ByteBuf metadataByteBuf = getUtf8AsByteBuf(metadata); + ByteBuf dataByteBuf = getUtf8AsByteBuf(data); + + try { + return createRequestStreamFrame( + byteBufAllocator, follows, initialRequestN, metadataByteBuf, dataByteBuf); + } finally { + release(metadataByteBuf); + release(dataByteBuf); + } } /** diff --git a/rsocket-core/src/main/java/io/rsocket/framing/ResumeFrame.java b/rsocket-core/src/main/java/io/rsocket/framing/ResumeFrame.java index c575d2ba2..1ea2eda31 100644 --- a/rsocket-core/src/main/java/io/rsocket/framing/ResumeFrame.java +++ b/rsocket-core/src/main/java/io/rsocket/framing/ResumeFrame.java @@ -16,6 +16,7 @@ package io.rsocket.framing; +import static io.netty.util.ReferenceCountUtil.release; import static io.rsocket.framing.FrameType.RESUME; import static io.rsocket.framing.LengthUtils.getLengthAsUnsignedShort; import static io.rsocket.util.NumberUtils.requireUnsignedShort; @@ -70,12 +71,19 @@ public static ResumeFrame createResumeFrame( long lastReceivedServerPosition, long firstAvailableClientPosition) { - return createResumeFrame( - byteBufAllocator, + ByteBuf resumeIdentificationTokenByteBuf = getUtf8AsByteBufRequired( - resumeIdentificationToken, "resumeIdentificationToken must not be null"), - lastReceivedServerPosition, - firstAvailableClientPosition); + resumeIdentificationToken, "resumeIdentificationToken must not be null"); + + try { + return createResumeFrame( + byteBufAllocator, + resumeIdentificationTokenByteBuf, + lastReceivedServerPosition, + firstAvailableClientPosition); + } finally { + release(resumeIdentificationToken); + } } /** diff --git a/rsocket-core/src/main/java/io/rsocket/framing/SetupFrame.java b/rsocket-core/src/main/java/io/rsocket/framing/SetupFrame.java index b58f2a943..0591578a6 100644 --- a/rsocket-core/src/main/java/io/rsocket/framing/SetupFrame.java +++ b/rsocket-core/src/main/java/io/rsocket/framing/SetupFrame.java @@ -16,6 +16,7 @@ package io.rsocket.framing; +import static io.netty.util.ReferenceCountUtil.release; import static io.rsocket.framing.LengthUtils.getLengthAsUnsignedByte; import static io.rsocket.framing.LengthUtils.getLengthAsUnsignedShort; import static io.rsocket.util.NumberUtils.requireUnsignedShort; @@ -110,16 +111,26 @@ public static SetupFrame createSetupFrame( @Nullable String metadata, @Nullable String data) { - return createSetupFrame( - byteBufAllocator, - lease, - keepAliveInterval, - maxLifetime, - getUtf8AsByteBuf(resumeIdentificationToken), - metadataMimeType, - dataMimeType, - getUtf8AsByteBuf(metadata), - getUtf8AsByteBuf(data)); + ByteBuf resumeIdentificationTokenByteBuf = getUtf8AsByteBuf(resumeIdentificationToken); + ByteBuf metadataByteBuf = getUtf8AsByteBuf(metadata); + ByteBuf dataByteBuf = getUtf8AsByteBuf(data); + + try { + return createSetupFrame( + byteBufAllocator, + lease, + keepAliveInterval, + maxLifetime, + resumeIdentificationTokenByteBuf, + metadataMimeType, + dataMimeType, + metadataByteBuf, + dataByteBuf); + } finally { + release(resumeIdentificationTokenByteBuf); + release(metadataByteBuf); + release(dataByteBuf); + } } /** @@ -211,38 +222,46 @@ public static SetupFrame createSetupFrame( ByteBuf dataMimeTypeByteBuf = getUtf8AsByteBufRequired(dataMimeType, "dataMimeType must not be null"); - ByteBuf byteBuf = createFrameTypeAndFlags(byteBufAllocator, FrameType.SETUP); - - if (lease) { - byteBuf = setFlag(byteBuf, FLAG_LEASE); - } + try { + ByteBuf byteBuf = createFrameTypeAndFlags(byteBufAllocator, FrameType.SETUP); - byteBuf = - byteBuf - .writeShort(requireUnsignedShort(majorVersion)) - .writeShort(requireUnsignedShort(minorVersion)) - .writeInt(toIntExact(keepAliveInterval.toMillis())) - .writeInt(toIntExact(maxLifetime.toMillis())); + if (lease) { + byteBuf = setFlag(byteBuf, FLAG_LEASE); + } - if (resumeIdentificationToken != null) { byteBuf = - setFlag(byteBuf, FLAG_RESUME_ENABLED) - .writeShort(getLengthAsUnsignedShort(resumeIdentificationToken)); + byteBuf + .writeShort(requireUnsignedShort(majorVersion)) + .writeShort(requireUnsignedShort(minorVersion)) + .writeInt(toIntExact(keepAliveInterval.toMillis())) + .writeInt(toIntExact(maxLifetime.toMillis())); + + if (resumeIdentificationToken != null) { + byteBuf = + setFlag(byteBuf, FLAG_RESUME_ENABLED) + .writeShort(getLengthAsUnsignedShort(resumeIdentificationToken)); + byteBuf = + Unpooled.wrappedBuffer( + byteBuf, resumeIdentificationToken.retain(), byteBufAllocator.buffer()); + } + + byteBuf = byteBuf.writeByte(getLengthAsUnsignedByte(metadataMimeTypeByteBuf)); byteBuf = Unpooled.wrappedBuffer( - byteBuf, resumeIdentificationToken.retain(), byteBufAllocator.buffer()); - } + byteBuf, metadataMimeTypeByteBuf.retain(), byteBufAllocator.buffer()); - byteBuf = byteBuf.writeByte(getLengthAsUnsignedByte(metadataMimeTypeByteBuf)); - byteBuf = Unpooled.wrappedBuffer(byteBuf, metadataMimeTypeByteBuf, byteBufAllocator.buffer()); - - byteBuf = byteBuf.writeByte(getLengthAsUnsignedByte(dataMimeTypeByteBuf)); - byteBuf = Unpooled.wrappedBuffer(byteBuf, dataMimeTypeByteBuf, byteBufAllocator.buffer()); + byteBuf = byteBuf.writeByte(getLengthAsUnsignedByte(dataMimeTypeByteBuf)); + byteBuf = + Unpooled.wrappedBuffer(byteBuf, dataMimeTypeByteBuf.retain(), byteBufAllocator.buffer()); - byteBuf = appendMetadata(byteBufAllocator, byteBuf, metadata); - byteBuf = appendData(byteBuf, data); + byteBuf = appendMetadata(byteBufAllocator, byteBuf, metadata); + byteBuf = appendData(byteBuf, data); - return RECYCLER.get().setByteBuf(byteBuf); + return RECYCLER.get().setByteBuf(byteBuf); + } finally { + release(metadataMimeTypeByteBuf); + release(dataMimeTypeByteBuf); + } } /** diff --git a/rsocket-core/src/main/java/io/rsocket/util/AbstractionLeakingFrameUtils.java b/rsocket-core/src/main/java/io/rsocket/util/AbstractionLeakingFrameUtils.java index c7215da36..7e5a5d771 100644 --- a/rsocket-core/src/main/java/io/rsocket/util/AbstractionLeakingFrameUtils.java +++ b/rsocket-core/src/main/java/io/rsocket/util/AbstractionLeakingFrameUtils.java @@ -16,9 +16,10 @@ package io.rsocket.util; +import static io.netty.util.ReferenceCountUtil.release; import static io.rsocket.framing.FrameLengthFrame.createFrameLengthFrame; import static io.rsocket.framing.StreamIdFrame.createStreamIdFrame; -import static io.rsocket.util.DisposableUtil.disposeQuietly; +import static io.rsocket.util.DisposableUtils.disposeQuietly; import io.netty.buffer.ByteBufAllocator; import io.rsocket.Frame; @@ -60,7 +61,7 @@ public static Tuple2 fromAbstractionLeakingFr return Tuples.of(streamIdFrame.getStreamId(), frame); } finally { disposeQuietly(frameLengthFrame, streamIdFrame); - abstractionLeakingFrame.release(); + release(abstractionLeakingFrame); } } diff --git a/rsocket-core/src/main/java/io/rsocket/util/DisposableUtil.java b/rsocket-core/src/main/java/io/rsocket/util/DisposableUtils.java similarity index 88% rename from rsocket-core/src/main/java/io/rsocket/util/DisposableUtil.java rename to rsocket-core/src/main/java/io/rsocket/util/DisposableUtils.java index 92dd1d82c..c87a08220 100644 --- a/rsocket-core/src/main/java/io/rsocket/util/DisposableUtil.java +++ b/rsocket-core/src/main/java/io/rsocket/util/DisposableUtils.java @@ -19,9 +19,9 @@ import reactor.core.Disposable; /** Utilities for working with the {@link Disposable} type. */ -public final class DisposableUtil { +public final class DisposableUtils { - private DisposableUtil() {} + private DisposableUtils() {} /** * Calls the {@link Disposable#dispose()} method if the instance is not null. If any exceptions @@ -34,7 +34,9 @@ public static void disposeQuietly(Disposable... disposables) { .forEach( disposable -> { try { - disposable.dispose(); + if (disposable != null) { + disposable.dispose(); + } } catch (RuntimeException e) { // Suppress any exceptions during disposal }