Skip to content

Commit

Permalink
fixes OverflowException if UnicstProcessr request and onNext race (#985)
Browse files Browse the repository at this point in the history
  • Loading branch information
OlegDokuka authored Feb 25, 2021
1 parent 769ab2d commit 0601f88
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 1 deletion.
10 changes: 9 additions & 1 deletion rsocket-core/src/main/java/io/rsocket/core/RequestOperator.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ public Context currentContext() {

@Override
public void request(long n) {
this.s.request(n);
if (!firstRequest) {
try {
this.hookOnRemainingRequests(n);
Expand All @@ -115,6 +114,15 @@ public void request(long n) {
if (firstLoop) {
firstLoop = false;
try {
// since in all the scenarios where RequestOperator is used, the
// CorePublisher is either UnicastProcessor or UnicastProcessor.next()
// we are free to propagate unbounded demand to that publisher right after
// the first request happens. UnicastProcessor is only there to allow sending signals from
// the
// connection to a real subscriber and does not have to check the real demand
// For more info see
// https://github.com/rsocket/rsocket/blob/master/Protocol.md#handling-the-unexpected
this.s.request(Long.MAX_VALUE);
this.hookOnFirstRequest(n);
} catch (Throwable throwable) {
onError(throwable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1142,6 +1142,51 @@ public void testWorkaround858() {
rule.assertHasNoLeaks();
}

@ParameterizedTest
@ValueSource(strings = {"stream", "channel"})
// see https://github.com/rsocket/rsocket-java/issues/959
public void testWorkaround959(String type) {
for (int i = 1; i < 20000; i += 2) {
ByteBuf buffer = rule.alloc().buffer();
buffer.writeCharSequence("test", CharsetUtil.UTF_8);

final AssertSubscriber<Payload> assertSubscriber = new AssertSubscriber<>(3);
if (type.equals("stream")) {
rule.socket.requestStream(ByteBufPayload.create(buffer)).subscribe(assertSubscriber);
} else if (type.equals("channel")) {
rule.socket
.requestChannel(Flux.just(ByteBufPayload.create(buffer)))
.subscribe(assertSubscriber);
}

final ByteBuf payloadFrame =
PayloadFrameCodec.encode(
rule.alloc(), i, false, false, true, Unpooled.EMPTY_BUFFER, Unpooled.EMPTY_BUFFER);

RaceTestUtils.race(
() -> {
rule.connection.addToReceivedBuffer(payloadFrame.copy());
rule.connection.addToReceivedBuffer(payloadFrame.copy());
rule.connection.addToReceivedBuffer(payloadFrame);
},
() -> {
assertSubscriber.request(1);
assertSubscriber.request(1);
assertSubscriber.request(1);
});

Assertions.assertThat(rule.connection.getSent()).allMatch(ByteBuf::release);

Assertions.assertThat(rule.socket.isDisposed()).isFalse();

assertSubscriber.values().forEach(ReferenceCountUtil::safeRelease);
assertSubscriber.assertNoError();

rule.connection.clearSendReceiveBuffers();
rule.assertHasNoLeaks();
}
}

public static class ClientSocketRule extends AbstractSocketRule<RSocketRequester> {
@Override
protected RSocketRequester newRSocket() {
Expand Down

0 comments on commit 0601f88

Please sign in to comment.