diff --git a/rsocket-core/src/main/java/io/rsocket/core/RequestChannelRequesterFlux.java b/rsocket-core/src/main/java/io/rsocket/core/RequestChannelRequesterFlux.java index 809125402..aab491793 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RequestChannelRequesterFlux.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RequestChannelRequesterFlux.java @@ -86,6 +86,8 @@ final class RequestChannelRequesterFlux extends Flux Context cachedContext; CoreSubscriber inboundSubscriber; boolean inboundDone; + long requested; + long produced; CompositeByteBuf frames; @@ -138,6 +140,8 @@ public final void request(long n) { return; } + this.requested = Operators.addCap(this.requested, n); + long previousState = addRequestN(STATE, this, n, this.requesterLeaseTracker == null); if (isTerminated(previousState)) { return; @@ -706,6 +710,27 @@ public final void handlePayload(Payload value) { return; } + final long produced = this.produced; + if (this.requested == produced) { + value.release(); + if (!tryCancel()) { + return; + } + + final Throwable cause = + Exceptions.failWithOverflow( + "The number of messages received exceeds the number requested"); + final RequestInterceptor requestInterceptor = this.requestInterceptor; + if (requestInterceptor != null) { + requestInterceptor.onTerminate(streamId, FrameType.REQUEST_CHANNEL, cause); + } + + this.inboundSubscriber.onError(cause); + return; + } + + this.produced = produced + 1; + this.inboundSubscriber.onNext(value); } } diff --git a/rsocket-core/src/main/java/io/rsocket/core/RequestChannelResponderSubscriber.java b/rsocket-core/src/main/java/io/rsocket/core/RequestChannelResponderSubscriber.java index 8dac9858d..32128fee4 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RequestChannelResponderSubscriber.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RequestChannelResponderSubscriber.java @@ -88,6 +88,8 @@ final class RequestChannelResponderSubscriber extends Flux boolean inboundDone; boolean outboundDone; + long requested; + long produced; public RequestChannelResponderSubscriber( int streamId, @@ -179,6 +181,8 @@ public void request(long n) { return; } + this.requested = Operators.addCap(this.requested, n); + long previousState = StateUtils.addRequestN(STATE, this, n); if (isTerminated(previousState)) { // full termination can be the result of both sides completion / cancelFrame / remote or local @@ -196,6 +200,9 @@ public void request(long n) { Payload firstPayload = this.firstPayload; if (firstPayload != null) { this.firstPayload = null; + + this.produced++; + inboundSubscriber.onNext(firstPayload); } @@ -216,6 +223,8 @@ public void request(long n) { final Payload firstPayload = this.firstPayload; this.firstPayload = null; + this.produced++; + inboundSubscriber.onNext(firstPayload); inboundSubscriber.onComplete(); @@ -238,6 +247,9 @@ public void request(long n) { final Payload firstPayload = this.firstPayload; this.firstPayload = null; + + this.produced++; + inboundSubscriber.onNext(firstPayload); previousState = markFirstFrameSent(STATE, this); @@ -416,6 +428,58 @@ final void handlePayload(Payload p) { return; } + final long produced = this.produced; + if (this.requested == produced) { + p.release(); + + this.inboundDone = true; + + final Throwable cause = + Exceptions.failWithOverflow( + "The number of messages received exceeds the number requested"); + boolean wasThrowableAdded = Exceptions.addThrowable(INBOUND_ERROR, this, cause); + + long previousState = markTerminated(STATE, this); + if (isTerminated(previousState)) { + if (!wasThrowableAdded) { + Operators.onErrorDropped(cause, this.inboundSubscriber.currentContext()); + } + return; + } + + this.requesterResponderSupport.remove(this.streamId, this); + + this.connection.sendFrame( + streamId, + ErrorFrameCodec.encode( + this.allocator, streamId, new CanceledException(cause.getMessage()))); + + if (!isSubscribed(previousState)) { + final Payload firstPayload = this.firstPayload; + this.firstPayload = null; + firstPayload.release(); + } else if (isFirstFrameSent(previousState) && !isInboundTerminated(previousState)) { + Throwable inboundError = Exceptions.terminate(INBOUND_ERROR, this); + if (inboundError != TERMINATED) { + //noinspection ConstantConditions + this.inboundSubscriber.onError(inboundError); + } + } + + // this is downstream subscription so need to cancel it just in case error signal has not + // reached it + // needs for disconnected upstream and downstream case + this.outboundSubscription.cancel(); + + final RequestInterceptor interceptor = requestInterceptor; + if (interceptor != null) { + interceptor.onTerminate(this.streamId, FrameType.REQUEST_CHANNEL, cause); + } + return; + } + + this.produced = produced + 1; + this.inboundSubscriber.onNext(p); } } diff --git a/rsocket-core/src/main/java/io/rsocket/core/RequestStreamRequesterFlux.java b/rsocket-core/src/main/java/io/rsocket/core/RequestStreamRequesterFlux.java index 424451a58..55ec43feb 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RequestStreamRequesterFlux.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RequestStreamRequesterFlux.java @@ -65,6 +65,8 @@ final class RequestStreamRequesterFlux extends Flux CoreSubscriber inboundSubscriber; CompositeByteBuf frames; boolean done; + long requested; + long produced; RequestStreamRequesterFlux(Payload payload, RequesterResponderSupport requesterResponderSupport) { this.allocator = requesterResponderSupport.getAllocator(); @@ -134,6 +136,8 @@ public final void request(long n) { return; } + this.requested = Operators.addCap(this.requested, n); + final RequesterLeaseTracker requesterLeaseTracker = this.requesterLeaseTracker; final boolean leaseEnabled = requesterLeaseTracker != null; final long previousState = addRequestN(STATE, this, n, !leaseEnabled); @@ -295,6 +299,34 @@ public final void handlePayload(Payload p) { return; } + final long produced = this.produced; + if (this.requested == produced) { + p.release(); + + long previousState = markTerminated(STATE, this); + if (isTerminated(previousState)) { + return; + } + + final int streamId = this.streamId; + this.requesterResponderSupport.remove(streamId, this); + + final IllegalStateException cause = + Exceptions.failWithOverflow( + "The number of messages received exceeds the number requested"); + this.connection.sendFrame(streamId, CancelFrameCodec.encode(this.allocator, streamId)); + + final RequestInterceptor requestInterceptor = this.requestInterceptor; + if (requestInterceptor != null) { + requestInterceptor.onTerminate(streamId, FrameType.REQUEST_STREAM, cause); + } + + this.inboundSubscriber.onError(cause); + return; + } + + this.produced = produced + 1; + this.inboundSubscriber.onNext(p); } diff --git a/rsocket-core/src/test/java/io/rsocket/core/RequestChannelRequesterFluxTest.java b/rsocket-core/src/test/java/io/rsocket/core/RequestChannelRequesterFluxTest.java index e39b0d690..c1e0a6876 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RequestChannelRequesterFluxTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RequestChannelRequesterFluxTest.java @@ -16,6 +16,7 @@ package io.rsocket.core; import static io.rsocket.frame.FrameLengthCodec.FRAME_LENGTH_MASK; +import static io.rsocket.frame.FrameType.CANCEL; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -40,6 +41,7 @@ import java.util.stream.Stream; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -513,6 +515,77 @@ public void errorShouldTerminateExecution(String terminationMode) { stateAssert.isTerminated(); } + @Test + public void failOnOverflow() { + final TestRequesterResponderSupport activeStreams = TestRequesterResponderSupport.client(); + final LeaksTrackingByteBufAllocator allocator = activeStreams.getAllocator(); + final TestDuplexConnection sender = activeStreams.getDuplexConnection(); + final TestPublisher publisher = TestPublisher.create(); + + final RequestChannelRequesterFlux requestChannelRequesterFlux = + new RequestChannelRequesterFlux(publisher, activeStreams); + final StateAssert stateAssert = + StateAssert.assertThat(requestChannelRequesterFlux); + + // state machine check + + stateAssert.isUnsubscribed(); + activeStreams.assertNoActiveStreams(); + + final AssertSubscriber assertSubscriber = + requestChannelRequesterFlux.subscribeWith(AssertSubscriber.create(0)); + activeStreams.assertNoActiveStreams(); + + // state machine check + stateAssert.hasSubscribedFlagOnly(); + + assertSubscriber.request(1); + stateAssert.hasSubscribedFlag().hasRequestN(1).hasNoFirstFrameSentFlag(); + activeStreams.assertNoActiveStreams(); + + Payload payload1 = TestRequesterResponderSupport.randomPayload(allocator); + + publisher.next(payload1.retain()); + + FrameAssert.assertThat(sender.awaitFrame()) + .typeOf(FrameType.REQUEST_CHANNEL) + .hasPayload(payload1) + .hasRequestN(1) + .hasNoLeaks(); + payload1.release(); + + stateAssert.hasSubscribedFlag().hasRequestN(1).hasFirstFrameSentFlag(); + activeStreams.assertHasStream(1, requestChannelRequesterFlux); + + publisher.assertMaxRequested(1); + + Payload nextPayload = TestRequesterResponderSupport.genericPayload(allocator); + requestChannelRequesterFlux.handlePayload(nextPayload); + + Payload unrequestedPayload = TestRequesterResponderSupport.genericPayload(allocator); + requestChannelRequesterFlux.handlePayload(unrequestedPayload); + + final ByteBuf cancelFrame = sender.awaitFrame(); + FrameAssert.assertThat(cancelFrame) + .isNotNull() + .typeOf(CANCEL) + .hasClientSideStreamId() + .hasStreamId(1) + .hasNoLeaks(); + + assertSubscriber + .assertValuesWith(p -> PayloadAssert.assertThat(p).isSameAs(nextPayload).hasNoLeaks()) + .assertError() + .assertErrorMessage("The number of messages received exceeds the number requested"); + + publisher.assertWasCancelled(); + + activeStreams.assertNoActiveStreams(); + // state machine check + stateAssert.isTerminated(); + Assertions.assertThat(sender.isEmpty()).isTrue(); + } + /* * +--------------------------------+ * | Racing Test Cases | diff --git a/rsocket-core/src/test/java/io/rsocket/core/RequestChannelResponderSubscriberTest.java b/rsocket-core/src/test/java/io/rsocket/core/RequestChannelResponderSubscriberTest.java index 32af4e3b6..890458caf 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RequestChannelResponderSubscriberTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RequestChannelResponderSubscriberTest.java @@ -263,6 +263,143 @@ public void requestNFrameShouldBeSentOnSubscriptionAndThenSeparately(String comp allocator.assertHasNoLeaks(); } + @Test + public void failOnOverflow() { + final TestRequesterResponderSupport activeStreams = TestRequesterResponderSupport.client(); + final LeaksTrackingByteBufAllocator allocator = activeStreams.getAllocator(); + final TestDuplexConnection sender = activeStreams.getDuplexConnection(); + final Payload firstPayload = TestRequesterResponderSupport.genericPayload(allocator); + final TestPublisher publisher = TestPublisher.create(); + + final RequestChannelResponderSubscriber requestChannelResponderSubscriber = + new RequestChannelResponderSubscriber(1, 1, firstPayload, activeStreams); + final StateAssert stateAssert = + StateAssert.assertThat(requestChannelResponderSubscriber); + activeStreams.activeStreams.put(1, requestChannelResponderSubscriber); + + // state machine check + stateAssert.isUnsubscribed().hasRequestN(0); + activeStreams.assertHasStream(1, requestChannelResponderSubscriber); + + publisher.subscribe(requestChannelResponderSubscriber); + publisher.assertMaxRequested(1); + // state machine check + stateAssert.isUnsubscribed().hasRequestN(0); + + final AssertSubscriber assertSubscriber = + requestChannelResponderSubscriber.subscribeWith(AssertSubscriber.create(0)); + Assertions.assertThat(firstPayload.refCnt()).isOne(); + + // state machine check + stateAssert.hasSubscribedFlagOnly().hasRequestN(0); + + assertSubscriber.request(1); + + // state machine check + stateAssert.hasSubscribedFlag().hasFirstFrameSentFlag().hasRequestN(1); + + // should not send requestN since 1 is remaining + Assertions.assertThat(sender.isEmpty()).isTrue(); + + assertSubscriber.request(1); + + stateAssert.hasSubscribedFlag().hasRequestN(2).hasFirstFrameSentFlag(); + + // should not send requestN since 1 is remaining + FrameAssert.assertThat(sender.awaitFrame()) + .typeOf(REQUEST_N) + .hasStreamId(1) + .hasRequestN(1) + .hasNoLeaks(); + + Payload nextPayload = TestRequesterResponderSupport.genericPayload(allocator); + requestChannelResponderSubscriber.handlePayload(nextPayload); + + Payload unrequestedPayload = TestRequesterResponderSupport.genericPayload(allocator); + requestChannelResponderSubscriber.handlePayload(unrequestedPayload); + + final ByteBuf cancelErrorFrame = sender.awaitFrame(); + FrameAssert.assertThat(cancelErrorFrame) + .isNotNull() + .typeOf(ERROR) + .hasData("The number of messages received exceeds the number requested") + .hasClientSideStreamId() + .hasStreamId(1) + .hasNoLeaks(); + + assertSubscriber + .assertValuesWith( + p -> PayloadAssert.assertThat(p).isSameAs(firstPayload).hasNoLeaks(), + p -> PayloadAssert.assertThat(p).isSameAs(nextPayload).hasNoLeaks()) + .assertErrorMessage("The number of messages received exceeds the number requested"); + + Assertions.assertThat(firstPayload.refCnt()).isZero(); + Assertions.assertThat(nextPayload.refCnt()).isZero(); + Assertions.assertThat(unrequestedPayload.refCnt()).isZero(); + stateAssert.isTerminated(); + activeStreams.assertNoActiveStreams(); + + Assertions.assertThat(sender.isEmpty()).isTrue(); + allocator.assertHasNoLeaks(); + } + + @Test + public void failOnOverflowBeforeFirstPayloadIsSent() { + final TestRequesterResponderSupport activeStreams = TestRequesterResponderSupport.client(); + final LeaksTrackingByteBufAllocator allocator = activeStreams.getAllocator(); + final TestDuplexConnection sender = activeStreams.getDuplexConnection(); + final Payload firstPayload = TestRequesterResponderSupport.genericPayload(allocator); + final TestPublisher publisher = TestPublisher.create(); + + final RequestChannelResponderSubscriber requestChannelResponderSubscriber = + new RequestChannelResponderSubscriber(1, 1, firstPayload, activeStreams); + final StateAssert stateAssert = + StateAssert.assertThat(requestChannelResponderSubscriber); + activeStreams.activeStreams.put(1, requestChannelResponderSubscriber); + + // state machine check + stateAssert.isUnsubscribed().hasRequestN(0); + activeStreams.assertHasStream(1, requestChannelResponderSubscriber); + + publisher.subscribe(requestChannelResponderSubscriber); + publisher.assertMaxRequested(1); + // state machine check + stateAssert.isUnsubscribed().hasRequestN(0); + + final AssertSubscriber assertSubscriber = + requestChannelResponderSubscriber.subscribeWith(AssertSubscriber.create(0)); + Assertions.assertThat(firstPayload.refCnt()).isOne(); + + // state machine check + stateAssert.hasSubscribedFlagOnly().hasRequestN(0); + + Payload unrequestedPayload = TestRequesterResponderSupport.genericPayload(allocator); + requestChannelResponderSubscriber.handlePayload(unrequestedPayload); + + final ByteBuf cancelErrorFrame = sender.awaitFrame(); + FrameAssert.assertThat(cancelErrorFrame) + .isNotNull() + .typeOf(ERROR) + .hasData("The number of messages received exceeds the number requested") + .hasClientSideStreamId() + .hasStreamId(1) + .hasNoLeaks(); + + assertSubscriber.request(1); + + assertSubscriber + .assertValuesWith(p -> PayloadAssert.assertThat(p).isSameAs(firstPayload).hasNoLeaks()) + .assertErrorMessage("The number of messages received exceeds the number requested"); + + Assertions.assertThat(firstPayload.refCnt()).isZero(); + Assertions.assertThat(unrequestedPayload.refCnt()).isZero(); + stateAssert.isTerminated(); + activeStreams.assertNoActiveStreams(); + + Assertions.assertThat(sender.isEmpty()).isTrue(); + allocator.assertHasNoLeaks(); + } + /* * +--------------------------------+ * | Racing Test Cases | @@ -664,7 +801,7 @@ public void shouldHaveNoLeaksOnReassemblyAndCancelRacing(String terminationMode) ; final TestPublisher publisher = TestPublisher.createNoncompliant(DEFER_CANCELLATION, CLEANUP_ON_TERMINATE); - final AssertSubscriber assertSubscriber = new AssertSubscriber<>(1); + final AssertSubscriber assertSubscriber = new AssertSubscriber<>(2); Payload firstPayload = TestRequesterResponderSupport.genericPayload(allocator); final RequestChannelResponderSubscriber requestOperator = @@ -725,8 +862,17 @@ public void shouldHaveNoLeaksOnReassemblyAndCancelRacing(String terminationMode) assertSubscriber.assertTerminated().assertError(); } - final ByteBuf frame = sender.awaitFrame(); - FrameAssert.assertThat(frame) + final ByteBuf requstFrame = sender.awaitFrame(); + FrameAssert.assertThat(requstFrame) + .isNotNull() + .typeOf(REQUEST_N) + .hasRequestN(1) + .hasClientSideStreamId() + .hasStreamId(1) + .hasNoLeaks(); + + final ByteBuf terminalFrame = sender.awaitFrame(); + FrameAssert.assertThat(terminalFrame) .isNotNull() .typeOf(terminationMode.equals("cancel") ? CANCEL : ERROR) .hasClientSideStreamId() diff --git a/rsocket-core/src/test/java/io/rsocket/core/RequestStreamRequesterFluxTest.java b/rsocket-core/src/test/java/io/rsocket/core/RequestStreamRequesterFluxTest.java index 88dd5441e..8702d1a80 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RequestStreamRequesterFluxTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RequestStreamRequesterFluxTest.java @@ -926,7 +926,7 @@ public void shouldErrorOnIncorrectRefCntInGivenPayloadLatePhase() { final TestRequesterResponderSupport activeStreams = TestRequesterResponderSupport.client(); final LeaksTrackingByteBufAllocator allocator = activeStreams.getAllocator(); final TestDuplexConnection sender = activeStreams.getDuplexConnection(); - ; + final Payload payload = ByteBufPayload.create(""); final RequestStreamRequesterFlux requestStreamRequesterFlux = @@ -1129,6 +1129,87 @@ static Stream> shouldErrorIfNoAvailabilityS .isInstanceOf(RuntimeException.class)); } + @Test + public void failOnOverflow() { + final TestRequesterResponderSupport activeStreams = TestRequesterResponderSupport.client(); + final LeaksTrackingByteBufAllocator allocator = activeStreams.getAllocator(); + final TestDuplexConnection sender = activeStreams.getDuplexConnection(); + final Payload payload = TestRequesterResponderSupport.genericPayload(allocator); + + final RequestStreamRequesterFlux requestStreamRequesterFlux = + new RequestStreamRequesterFlux(payload, activeStreams); + final StateAssert stateAssert = + StateAssert.assertThat(requestStreamRequesterFlux); + + // state machine check + + stateAssert.isUnsubscribed(); + activeStreams.assertNoActiveStreams(); + + final AssertSubscriber assertSubscriber = + requestStreamRequesterFlux.subscribeWith(AssertSubscriber.create(0)); + Assertions.assertThat(payload.refCnt()).isOne(); + activeStreams.assertNoActiveStreams(); + // state machine check + stateAssert.hasSubscribedFlagOnly(); + + assertSubscriber.request(1); + + Assertions.assertThat(payload.refCnt()).isZero(); + activeStreams.assertHasStream(1, requestStreamRequesterFlux); + + // state machine check + stateAssert.hasSubscribedFlag().hasRequestN(1).hasFirstFrameSentFlag(); + + final ByteBuf frame = sender.awaitFrame(); + FrameAssert.assertThat(frame) + .isNotNull() + .hasPayloadSize( + "testData".getBytes(CharsetUtil.UTF_8).length + + "testMetadata".getBytes(CharsetUtil.UTF_8).length) + .hasMetadata("testMetadata") + .hasData("testData") + .hasNoFragmentsFollow() + .hasRequestN(1) + .typeOf(FrameType.REQUEST_STREAM) + .hasClientSideStreamId() + .hasStreamId(1) + .hasNoLeaks(); + + Assertions.assertThat(sender.isEmpty()).isTrue(); + + Payload requestedPayload = TestRequesterResponderSupport.randomPayload(allocator); + requestStreamRequesterFlux.handlePayload(requestedPayload); + + Payload unrequestedPayload = TestRequesterResponderSupport.randomPayload(allocator); + requestStreamRequesterFlux.handlePayload(unrequestedPayload); + + final ByteBuf cancelFrame = sender.awaitFrame(); + FrameAssert.assertThat(cancelFrame) + .isNotNull() + .typeOf(FrameType.CANCEL) + .hasClientSideStreamId() + .hasStreamId(1) + .hasNoLeaks(); + + assertSubscriber + .assertValuesWith(p -> PayloadAssert.assertThat(p).isEqualTo(requestedPayload).hasNoLeaks()) + .assertError() + .assertErrorMessage("The number of messages received exceeds the number requested"); + + PayloadAssert.assertThat(requestedPayload).isReleased(); + PayloadAssert.assertThat(unrequestedPayload).isReleased(); + + Assertions.assertThat(payload.refCnt()).isZero(); + activeStreams.assertNoActiveStreams(); + + Assertions.assertThat(sender.isEmpty()).isTrue(); + + // state machine check + stateAssert.isTerminated(); + allocator.assertHasNoLeaks(); + } + @Test public void checkName() { final TestRequesterResponderSupport activeStreams = TestRequesterResponderSupport.client();