Skip to content

Commit

Permalink
fixes test
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <shadowgun@i.ua>
  • Loading branch information
OlegDokuka committed Apr 21, 2020
1 parent 5492447 commit 8a3103e
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ public void accept(long l) {
sendProcessor.onNext(RequestNFrameFlyweight.encode(allocator, streamId, n));
}
})
.doFinally(signalType -> channelProcessors.remove(streamId))
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER);

// not chained, as the payload should be enqueued in the Unicast processor before this method
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
* for releasing the payload to free memory when they no long need it.
*/
public class ZeroCopyPayloadDecoder implements PayloadDecoder {

@Override
public Payload apply(ByteBuf byteBuf) {
ByteBuf m;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,8 +357,7 @@ public void shouldThrownExceptionIfGivenPayloadIsExitsSizeAllowanceWithNoFragmen
.hasMessage(INVALID_PAYLOAD_ERROR_MESSAGE))
.verify();
// FIXME: should be removed
Assertions.assertThat(rule.connection.getSent())
.allMatch(bb -> bb.release());
Assertions.assertThat(rule.connection.getSent()).allMatch(bb -> bb.release());
rule.assertHasNoLeaks();
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ public Mono<Payload> requestResponse(Payload payload) {
@Test
@Timeout(2_000)
public void shouldThrownExceptionIfGivenPayloadIsExitsSizeAllowanceWithNoFragmentation() {
ByteBufAllocator allocator = rule.alloc();
final int streamId = 4;
final AtomicBoolean cancelled = new AtomicBoolean();
byte[] metadata = new byte[FrameLengthFlyweight.FRAME_LENGTH_MASK];
Expand Down Expand Up @@ -226,18 +225,26 @@ protected void hookOnSubscribe(Subscription subscription) {
.isInstanceOf(IllegalArgumentException.class)
.hasToString("java.lang.IllegalArgumentException: " + INVALID_PAYLOAD_ERROR_MESSAGE);
Assertions.assertThat(rule.connection.getSent())
.filteredOn(bb -> FrameHeaderFlyweight.frameType(bb) == FrameType.ERROR)
.hasSize(1)
.first()
.matches(bb -> FrameHeaderFlyweight.frameType(bb) == FrameType.ERROR)
.matches(bb -> ErrorFrameFlyweight.dataUtf8(bb).contains(INVALID_PAYLOAD_ERROR_MESSAGE))
.matches(ReferenceCounted::release);

assertThat("Subscription not cancelled.", cancelled.get(), is(true));

// FIXME: needs to be removed. May have cancel frame as well
Assertions.assertThat(rule.connection.getSent())
.filteredOn(bb -> bb.refCnt() != 0)
.allMatch(ReferenceCounted::release);
rule.init();
rule.setAcceptingSocket(acceptingSocket);
}
// FIXME: needs to be removed
Assertions.assertThat(rule.connection.getSent()).allMatch(ReferenceCounted::release);

// FIXME: needs to be removed. May have cancel frame as well
Assertions.assertThat(rule.connection.getSent())
.filteredOn(bb -> bb.refCnt() != 0)
.allMatch(ReferenceCounted::release);
rule.assertHasNoLeaks();
}

Expand Down

0 comments on commit 8a3103e

Please sign in to comment.