Skip to content

Commit

Permalink
improves BaseDuplexConnection and related subclasses
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <oleh.dokuka@icloud.com>
  • Loading branch information
OlegDokuka committed Sep 14, 2022
1 parent 9804688 commit 000f6da
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public void sendFrame(int streamId, ByteBuf frame) {
protected abstract void doOnClose();

@Override
public final Mono<Void> onClose() {
public Mono<Void> onClose() {
return onClose.asMono();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.net.SocketAddress;
import java.util.Objects;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;

/** An implementation of {@link DuplexConnection} that connects via TCP. */
Expand Down Expand Up @@ -67,24 +68,19 @@ protected void doOnClose() {
connection.dispose();
}

@Override
public Mono<Void> onClose() {
return super.onClose().and(connection.onDispose());
}

@Override
public void sendErrorAndClose(RSocketErrorException e) {
final ByteBuf errorFrame = ErrorFrameCodec.encode(alloc(), 0, e);
connection
.outbound()
.sendObject(FrameLengthCodec.encode(alloc(), errorFrame.readableBytes(), errorFrame))
.then()
.subscribe(
null,
t -> onClose.tryEmitError(t),
() -> {
final Throwable cause = e.getCause();
if (cause == null) {
onClose.tryEmitEmpty();
} else {
onClose.tryEmitError(cause);
}
});
.subscribe(connection.disposeSubscriber());
sender.onComplete();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.net.SocketAddress;
import java.util.Objects;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;

/**
Expand Down Expand Up @@ -72,6 +73,11 @@ protected void doOnClose() {
connection.dispose();
}

@Override
public Mono<Void> onClose() {
return super.onClose().and(connection.onDispose());
}

@Override
public Flux<ByteBuf> receive() {
return connection.inbound().receive();
Expand All @@ -83,17 +89,7 @@ public void sendErrorAndClose(RSocketErrorException e) {
connection
.outbound()
.sendObject(new BinaryWebSocketFrame(errorFrame))
.then()
.subscribe(
null,
t -> onClose.tryEmitError(t),
() -> {
final Throwable cause = e.getCause();
if (cause == null) {
onClose.tryEmitEmpty();
} else {
onClose.tryEmitError(cause);
}
});
.subscribe(connection.disposeSubscriber());
sender.onComplete();
}
}

0 comments on commit 000f6da

Please sign in to comment.