Skip to content

Commit

Permalink
DefaultPayload#create properly copies ByteBuf content
Browse files Browse the repository at this point in the history
Closes gh-970

Signed-off-by: Rossen Stoyanchev <rstoyanchev@vmware.com>
  • Loading branch information
rstoyanchev committed Mar 3, 2021
1 parent 0601f88 commit e06ee7f
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 4 deletions.
13 changes: 11 additions & 2 deletions rsocket-core/src/main/java/io/rsocket/util/DefaultPayload.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public static Payload create(ByteBuf data) {

public static Payload create(ByteBuf data, @Nullable ByteBuf metadata) {
try {
return create(data.nioBuffer(), metadata == null ? null : metadata.nioBuffer());
return create(toBytes(data), metadata != null ? toBytes(metadata) : null);
} finally {
data.release();
if (metadata != null) {
Expand All @@ -110,7 +110,16 @@ public static Payload create(ByteBuf data, @Nullable ByteBuf metadata) {
}

public static Payload create(Payload payload) {
return create(payload.getData(), payload.hasMetadata() ? payload.getMetadata() : null);
return create(
toBytes(payload.data()), payload.hasMetadata() ? toBytes(payload.metadata()) : null);
}

private static byte[] toBytes(ByteBuf byteBuf) {
byte[] bytes = new byte[byteBuf.readableBytes()];
byteBuf.markReaderIndex();
byteBuf.readBytes(bytes);
byteBuf.resetReaderIndex();
return bytes;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,14 @@ public void ensuresThatSetupPayloadCanBeRetained() {
@Test
public void ensuresThatMonoFromRSocketConnectorCanBeUsedForMultipleSubscriptions() {
Payload setupPayload = ByteBufPayload.create("TestData", "TestMetadata");

Assertions.assertThat(setupPayload.refCnt()).isOne();

// Keep the data and metadata around so we can try changing them independently
ByteBuf dataBuf = setupPayload.data();
ByteBuf metadataBuf = setupPayload.metadata();
dataBuf.retain();
metadataBuf.retain();

TestClientTransport testClientTransport = new TestClientTransport();
Mono<RSocket> connectionMono =
RSocketConnector.create().setupPayload(setupPayload).connect(testClientTransport);
Expand All @@ -92,6 +97,15 @@ public void ensuresThatMonoFromRSocketConnectorCanBeUsedForMultipleSubscriptions
.expectComplete()
.verify(Duration.ofMillis(100));

// Changing the original data and metadata should not impact the SetupPayload
dataBuf.writerIndex(dataBuf.readerIndex());
dataBuf.writeChar('d');
dataBuf.release();

metadataBuf.writerIndex(metadataBuf.readerIndex());
metadataBuf.writeChar('m');
metadataBuf.release();

Assertions.assertThat(testClientTransport.testConnection().getSent())
.hasSize(2)
.allMatch(
Expand All @@ -100,7 +114,11 @@ public void ensuresThatMonoFromRSocketConnectorCanBeUsedForMultipleSubscriptions
return payload.getDataUtf8().equals("TestData")
&& payload.getMetadataUtf8().equals("TestMetadata");
})
.allMatch(ReferenceCounted::release);
.allMatch(
byteBuf -> {
System.out.println("calling release " + byteBuf.refCnt());
return byteBuf.release();
});
Assertions.assertThat(setupPayload.refCnt()).isZero();
}

Expand Down

0 comments on commit e06ee7f

Please sign in to comment.