Skip to content

Commit

Permalink
Close stream when channel was inactive (#14643)
Browse files Browse the repository at this point in the history
refine
  • Loading branch information
finefuture authored Sep 6, 2024
1 parent ac6704d commit 08c74b7
Show file tree
Hide file tree
Showing 8 changed files with 32 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,14 @@ public void onComplete(
}
}

@Override
public void onClose() {
if (done) {
return;
}
onCancelByRemote(TriRpcStatus.CANCELLED);
}

@Override
public void onStart() {
listener.onStart(TripleClientCall.this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ default void onComplete(
boolean isReturnTriException) {
onComplete(status, attachments);
}

void onClose();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,5 +472,10 @@ public void cancelByRemote(long errorCode) {
finishProcess(transportError, null, false);
});
}

@Override
public void onClose() {
executor.execute(listener::onClose);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,14 @@ public void cancelByRemote(long errorCode) {
executor.execute(() -> listener.onCancelByRemote(
TriRpcStatus.CANCELLED.withDescription("Canceled by client ,errorCode=" + errorCode)));
}

@Override
public void onClose() {
if (listener == null) {
return;
}
executor.execute(() -> listener.onCancelByRemote(TriRpcStatus.CANCELLED));
}
}

private static class ServerDecoderListener implements TriDecoder.Listener {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,6 @@ public interface H2TransportListener {
void onData(ByteBuf data, boolean endStream);

void cancelByRemote(long errorCode);

void onClose();
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ private void onResetRead(ChannelHandlerContext ctx, Http2ResetFrame resetFrame)

@Override
public void channelInactive(ChannelHandlerContext ctx) {
transportListener.onClose();
ctx.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ public void onComplete(TriRpcStatus status, Map<String, Object> attachments) {
this.status = status;
}

@Override
public void onClose() {}

@Override
public void onMessage(byte[] message, boolean isNeedReturnException) {
this.message = message;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ public void onData(ByteBuf data, boolean endStream) {}

@Override
public void cancelByRemote(long errorCode) {}

@Override
public void onClose() {}
};
DefaultHttp2Headers headers = new DefaultHttp2Headers();
headers.scheme(HTTPS.name()).path("/foo.bar").method(HttpMethod.POST.asciiName());
Expand Down

0 comments on commit 08c74b7

Please sign in to comment.