From e355d53c4ab299408a30f8f9d8743bca8ec902b1 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Mon, 14 Sep 2020 15:41:09 +0300 Subject: [PATCH] cleanups Signed-off-by: Oleh Dokuka --- .../resume/InMemoryResumableFramesStore.java | 8 +-- .../resume/ResumableDuplexConnection.java | 62 +++++++------------ .../rsocket/resume/ResumableFramesStore.java | 6 +- .../test/LeaksTrackingByteBufAllocator.java | 4 +- .../java/io/rsocket/test/TransportTest.java | 2 +- 5 files changed, 32 insertions(+), 50 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/resume/InMemoryResumableFramesStore.java b/rsocket-core/src/main/java/io/rsocket/resume/InMemoryResumableFramesStore.java index d68e564df..a6148bd08 100644 --- a/rsocket-core/src/main/java/io/rsocket/resume/InMemoryResumableFramesStore.java +++ b/rsocket-core/src/main/java/io/rsocket/resume/InMemoryResumableFramesStore.java @@ -151,8 +151,7 @@ public boolean resumableFrameReceived(ByteBuf frame) { } } - @Override - public void pauseImplied() { + void pauseImplied() { for (; ; ) { final long impliedPosition = this.impliedPosition; @@ -163,8 +162,7 @@ public void pauseImplied() { } } - @Override - public void resumeImplied() { + void resumeImplied() { for (; ; ) { final long impliedPosition = this.impliedPosition; @@ -269,6 +267,7 @@ public void request(long n) {} @Override public void cancel() { + pauseImplied(); state = 0; } @@ -285,6 +284,7 @@ public void subscribe(CoreSubscriber actual) { } this.actual = actual; + resumeImplied(); STATE.compareAndSet(this, 0, 1); } } diff --git a/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java index 9873e55f2..60484f9d1 100644 --- a/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java +++ b/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java @@ -74,30 +74,33 @@ public boolean connect(DuplexConnection nextConnection) { activeConnection.dispose(); - final FrameReceivingSubscriber frameReceivingSubscriber = - new FrameReceivingSubscriber(resumableFramesStore, receiveSubscriber); - this.activeReceivingSubscriber = frameReceivingSubscriber; - final Disposable disposable = - resumableFramesStore - .resumeStream() - .subscribe(f -> nextConnection.sendFrame(FrameHeaderCodec.streamId(f), f)); - resumableFramesStore.resumeImplied(); - nextConnection.receive().subscribe(frameReceivingSubscriber); - nextConnection - .onClose() - .doFinally( - __ -> { - frameReceivingSubscriber.dispose(); - disposable.dispose(); - resumableFramesStore.pauseImplied(); - }) - .subscribe(); + initConnection(nextConnection); + return true; } else { return false; } } + void initConnection(DuplexConnection nextConnection) { + final FrameReceivingSubscriber frameReceivingSubscriber = + new FrameReceivingSubscriber(resumableFramesStore, receiveSubscriber); + this.activeReceivingSubscriber = frameReceivingSubscriber; + final Disposable disposable = + resumableFramesStore + .resumeStream() + .subscribe(f -> nextConnection.sendFrame(FrameHeaderCodec.streamId(f), f)); + nextConnection.receive().subscribe(frameReceivingSubscriber); + nextConnection + .onClose() + .doFinally( + __ -> { + frameReceivingSubscriber.dispose(); + disposable.dispose(); + }) + .subscribe(); + } + public void disconnect() { final DuplexConnection activeConnection = this.activeConnection; if (activeConnection != DisposedConnection.INSTANCE) { @@ -172,6 +175,7 @@ public void dispose() { } framesSaverDisposable.dispose(); + activeReceivingSubscriber.dispose(); savableFramesSender.dispose(); onClose.onComplete(); } @@ -189,27 +193,7 @@ public SocketAddress remoteAddress() { @Override public void request(long n) { if (state == 1 && STATE.compareAndSet(this, 1, 2)) { - final DuplexConnection connection = this.activeConnection; - if (connection != null) { - final FrameReceivingSubscriber frameReceivingSubscriber = - new FrameReceivingSubscriber(resumableFramesStore, receiveSubscriber); - this.activeReceivingSubscriber = frameReceivingSubscriber; - final Disposable disposable = - resumableFramesStore - .resumeStream() - .subscribe(f -> connection.sendFrame(FrameHeaderCodec.streamId(f), f)); - resumableFramesStore.resumeImplied(); - connection.receive().subscribe(activeReceivingSubscriber); - connection - .onClose() - .doFinally( - __ -> { - frameReceivingSubscriber.dispose(); - disposable.dispose(); - resumableFramesStore.pauseImplied(); - }) - .subscribe(); - } + initConnection(this.activeConnection); } } diff --git a/rsocket-core/src/main/java/io/rsocket/resume/ResumableFramesStore.java b/rsocket-core/src/main/java/io/rsocket/resume/ResumableFramesStore.java index 66470d29a..80d9a36dd 100644 --- a/rsocket-core/src/main/java/io/rsocket/resume/ResumableFramesStore.java +++ b/rsocket-core/src/main/java/io/rsocket/resume/ResumableFramesStore.java @@ -47,13 +47,11 @@ public interface ResumableFramesStore extends Closeable { /** @return Implied frame position as defined by RSocket protocol */ long frameImpliedPosition(); - void pauseImplied(); - - void resumeImplied(); - /** * Received resumable frame as defined by RSocket protocol. Implementation must increment frame * implied position + * + * @return {@code true} if information about the frame has been stored */ boolean resumableFrameReceived(ByteBuf frame); } diff --git a/rsocket-test/src/main/java/io/rsocket/test/LeaksTrackingByteBufAllocator.java b/rsocket-test/src/main/java/io/rsocket/test/LeaksTrackingByteBufAllocator.java index a12c87d30..2ca646936 100644 --- a/rsocket-test/src/main/java/io/rsocket/test/LeaksTrackingByteBufAllocator.java +++ b/rsocket-test/src/main/java/io/rsocket/test/LeaksTrackingByteBufAllocator.java @@ -60,9 +60,9 @@ public LeaksTrackingByteBufAllocator assertHasNoLeaks() { final Duration awaitZeroRefCntDuration = this.awaitZeroRefCntDuration; if (!unreleased.isEmpty() && !awaitZeroRefCntDuration.isZero()) { - long end = System.nanoTime() + awaitZeroRefCntDuration.toNanos(); + long endTimeInMillis = System.currentTimeMillis() + awaitZeroRefCntDuration.toMillis(); boolean hasUnreleased; - while (System.nanoTime() < end) { + while (System.currentTimeMillis() <= endTimeInMillis) { hasUnreleased = false; for (ByteBuf bb : unreleased) { if (bb.refCnt() != 0) { diff --git a/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java b/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java index e5e351768..48472dec9 100644 --- a/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java +++ b/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java @@ -446,7 +446,7 @@ class TransportPair implements Disposable { private static final String metadata = "metadata"; private final LeaksTrackingByteBufAllocator byteBufAllocator = - LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT, Duration.ofSeconds(10)); + LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT, Duration.ofMinutes(1)); private final TestRSocket responder;