Skip to content

Commit

Permalink
cleanups
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <shadowgun@i.ua>
  • Loading branch information
OlegDokuka committed Sep 14, 2020
1 parent b76e58a commit e355d53
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,7 @@ public boolean resumableFrameReceived(ByteBuf frame) {
}
}

@Override
public void pauseImplied() {
void pauseImplied() {
for (; ; ) {
final long impliedPosition = this.impliedPosition;

Expand All @@ -163,8 +162,7 @@ public void pauseImplied() {
}
}

@Override
public void resumeImplied() {
void resumeImplied() {
for (; ; ) {
final long impliedPosition = this.impliedPosition;

Expand Down Expand Up @@ -269,6 +267,7 @@ public void request(long n) {}

@Override
public void cancel() {
pauseImplied();
state = 0;
}

Expand All @@ -285,6 +284,7 @@ public void subscribe(CoreSubscriber<? super ByteBuf> actual) {
}

this.actual = actual;
resumeImplied();
STATE.compareAndSet(this, 0, 1);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,30 +74,33 @@ public boolean connect(DuplexConnection nextConnection) {

activeConnection.dispose();

final FrameReceivingSubscriber frameReceivingSubscriber =
new FrameReceivingSubscriber(resumableFramesStore, receiveSubscriber);
this.activeReceivingSubscriber = frameReceivingSubscriber;
final Disposable disposable =
resumableFramesStore
.resumeStream()
.subscribe(f -> nextConnection.sendFrame(FrameHeaderCodec.streamId(f), f));
resumableFramesStore.resumeImplied();
nextConnection.receive().subscribe(frameReceivingSubscriber);
nextConnection
.onClose()
.doFinally(
__ -> {
frameReceivingSubscriber.dispose();
disposable.dispose();
resumableFramesStore.pauseImplied();
})
.subscribe();
initConnection(nextConnection);

return true;
} else {
return false;
}
}

void initConnection(DuplexConnection nextConnection) {
final FrameReceivingSubscriber frameReceivingSubscriber =
new FrameReceivingSubscriber(resumableFramesStore, receiveSubscriber);
this.activeReceivingSubscriber = frameReceivingSubscriber;
final Disposable disposable =
resumableFramesStore
.resumeStream()
.subscribe(f -> nextConnection.sendFrame(FrameHeaderCodec.streamId(f), f));
nextConnection.receive().subscribe(frameReceivingSubscriber);
nextConnection
.onClose()
.doFinally(
__ -> {
frameReceivingSubscriber.dispose();
disposable.dispose();
})
.subscribe();
}

public void disconnect() {
final DuplexConnection activeConnection = this.activeConnection;
if (activeConnection != DisposedConnection.INSTANCE) {
Expand Down Expand Up @@ -172,6 +175,7 @@ public void dispose() {
}

framesSaverDisposable.dispose();
activeReceivingSubscriber.dispose();
savableFramesSender.dispose();
onClose.onComplete();
}
Expand All @@ -189,27 +193,7 @@ public SocketAddress remoteAddress() {
@Override
public void request(long n) {
if (state == 1 && STATE.compareAndSet(this, 1, 2)) {
final DuplexConnection connection = this.activeConnection;
if (connection != null) {
final FrameReceivingSubscriber frameReceivingSubscriber =
new FrameReceivingSubscriber(resumableFramesStore, receiveSubscriber);
this.activeReceivingSubscriber = frameReceivingSubscriber;
final Disposable disposable =
resumableFramesStore
.resumeStream()
.subscribe(f -> connection.sendFrame(FrameHeaderCodec.streamId(f), f));
resumableFramesStore.resumeImplied();
connection.receive().subscribe(activeReceivingSubscriber);
connection
.onClose()
.doFinally(
__ -> {
frameReceivingSubscriber.dispose();
disposable.dispose();
resumableFramesStore.pauseImplied();
})
.subscribe();
}
initConnection(this.activeConnection);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,11 @@ public interface ResumableFramesStore extends Closeable {
/** @return Implied frame position as defined by RSocket protocol */
long frameImpliedPosition();

void pauseImplied();

void resumeImplied();

/**
* Received resumable frame as defined by RSocket protocol. Implementation must increment frame
* implied position
*
* @return {@code true} if information about the frame has been stored
*/
boolean resumableFrameReceived(ByteBuf frame);
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ public LeaksTrackingByteBufAllocator assertHasNoLeaks() {

final Duration awaitZeroRefCntDuration = this.awaitZeroRefCntDuration;
if (!unreleased.isEmpty() && !awaitZeroRefCntDuration.isZero()) {
long end = System.nanoTime() + awaitZeroRefCntDuration.toNanos();
long endTimeInMillis = System.currentTimeMillis() + awaitZeroRefCntDuration.toMillis();
boolean hasUnreleased;
while (System.nanoTime() < end) {
while (System.currentTimeMillis() <= endTimeInMillis) {
hasUnreleased = false;
for (ByteBuf bb : unreleased) {
if (bb.refCnt() != 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ class TransportPair<T, S extends Closeable> implements Disposable {
private static final String metadata = "metadata";

private final LeaksTrackingByteBufAllocator byteBufAllocator =
LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT, Duration.ofSeconds(10));
LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT, Duration.ofMinutes(1));

private final TestRSocket responder;

Expand Down

0 comments on commit e355d53

Please sign in to comment.