Skip to content

Commit

Permalink
fixes integration test
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <shadowgun@i.ua>
  • Loading branch information
OlegDokuka committed Sep 14, 2020
1 parent 78a747a commit 2c0e76d
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import io.rsocket.transport.ServerTransport;
import io.rsocket.util.DefaultPayload;
import java.time.Duration;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import reactor.core.publisher.UnicastProcessor;
Expand Down Expand Up @@ -47,7 +46,6 @@ void responderRejectSetup() {
}

@Test
@Disabled("FIXME: needs to be revised")
void requesterStreamsTerminatedOnZeroErrorFrame() {
LeaksTrackingByteBufAllocator allocator =
LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public LeaksTrackingByteBufAllocator assertHasNoLeaks() {

final Duration awaitZeroRefCntDuration = this.awaitZeroRefCntDuration;
if (!unreleased.isEmpty() && !awaitZeroRefCntDuration.isZero()) {
long endTimeInMillis = System.currentTimeMillis() + awaitZeroRefCntDuration.toMillis();
final long startTime = System.currentTimeMillis();
final long endTimeInMillis = startTime + awaitZeroRefCntDuration.toMillis();
boolean hasUnreleased;
while (System.currentTimeMillis() <= endTimeInMillis) {
hasUnreleased = false;
Expand All @@ -72,14 +73,21 @@ public LeaksTrackingByteBufAllocator assertHasNoLeaks() {
}

if (!hasUnreleased) {
break;
System.out.println("all the buffers are released...");
return this;
}

parkNanos(100);
System.out.println("await buffers to be released");
for (int i = 0; i < 100; i++) {
System.gc();
parkNanos(1000);
System.gc();
}
}
}

Assertions.assertThat(unreleased).allMatch(bb -> bb.refCnt() == 0);
System.out.println("all the buffers are released...");
} finally {
tracker.clear();
}
Expand Down
5 changes: 5 additions & 0 deletions rsocket-test/src/main/java/io/rsocket/test/TransportTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ default void setUp() {
default void close() {
getTransportPair().responder.awaitAllInteractionTermination(getTimeout());
getTransportPair().dispose();
getTransportPair().awaitClosed();
getTransportPair().byteBufAllocator.assertHasNoLeaks();
Hooks.resetOnOperatorDebug();
}
Expand Down Expand Up @@ -611,6 +612,10 @@ public String expectedPayloadMetadata() {
return metadata;
}

public void awaitClosed() {
server.onClose().and(client.onClose()).block(Duration.ofMinutes(1));
}

private static class AsyncDuplexConnection implements DuplexConnection {

private final DuplexConnection duplexConnection;
Expand Down

0 comments on commit 2c0e76d

Please sign in to comment.