From 08c74b79d254426351062560f5e5c00e02275517 Mon Sep 17 00:00:00 2001 From: TomlongTK Date: Sat, 7 Sep 2024 00:15:15 +0800 Subject: [PATCH] Close stream when channel was inactive (#14643) refine --- .../dubbo/rpc/protocol/tri/call/TripleClientCall.java | 8 ++++++++ .../dubbo/rpc/protocol/tri/stream/ClientStream.java | 2 ++ .../dubbo/rpc/protocol/tri/stream/TripleClientStream.java | 5 +++++ .../dubbo/rpc/protocol/tri/stream/TripleServerStream.java | 8 ++++++++ .../rpc/protocol/tri/transport/H2TransportListener.java | 2 ++ .../tri/transport/TripleHttp2ClientResponseHandler.java | 1 + .../rpc/protocol/tri/stream/MockClientStreamListener.java | 3 +++ .../tri/transport/AbstractH2TransportListenerTest.java | 3 +++ 8 files changed, 32 insertions(+) diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java index 330deaa0bd9..ead2951da68 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java @@ -136,6 +136,14 @@ public void onComplete( } } + @Override + public void onClose() { + if (done) { + return; + } + onCancelByRemote(TriRpcStatus.CANCELLED); + } + @Override public void onStart() { listener.onStart(TripleClientCall.this); diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/ClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/ClientStream.java index 304d3c46539..d0a73c0eef8 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/ClientStream.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/ClientStream.java @@ -56,6 +56,8 @@ default void onComplete( boolean isReturnTriException) { onComplete(status, attachments); } + + void onClose(); } /** diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java index 8b060ae82df..120b6489b62 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java @@ -472,5 +472,10 @@ public void cancelByRemote(long errorCode) { finishProcess(transportError, null, false); }); } + + @Override + public void onClose() { + executor.execute(listener::onClose); + } } } diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java index e093e748c56..aa8799a39ef 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java @@ -483,6 +483,14 @@ public void cancelByRemote(long errorCode) { executor.execute(() -> listener.onCancelByRemote( TriRpcStatus.CANCELLED.withDescription("Canceled by client ,errorCode=" + errorCode))); } + + @Override + public void onClose() { + if (listener == null) { + return; + } + executor.execute(() -> listener.onCancelByRemote(TriRpcStatus.CANCELLED)); + } } private static class ServerDecoderListener implements TriDecoder.Listener { diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/H2TransportListener.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/H2TransportListener.java index 2d6c62e0be4..78f0569ac41 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/H2TransportListener.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/H2TransportListener.java @@ -42,4 +42,6 @@ public interface H2TransportListener { void onData(ByteBuf data, boolean endStream); void cancelByRemote(long errorCode); + + void onClose(); } diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2ClientResponseHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2ClientResponseHandler.java index b8e9230eb50..e7a1d8b2971 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2ClientResponseHandler.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2ClientResponseHandler.java @@ -80,6 +80,7 @@ private void onResetRead(ChannelHandlerContext ctx, Http2ResetFrame resetFrame) @Override public void channelInactive(ChannelHandlerContext ctx) { + transportListener.onClose(); ctx.close(); } diff --git a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/MockClientStreamListener.java b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/MockClientStreamListener.java index 4daeea81370..35219f1f05b 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/MockClientStreamListener.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/MockClientStreamListener.java @@ -36,6 +36,9 @@ public void onComplete(TriRpcStatus status, Map attachments) { this.status = status; } + @Override + public void onClose() {} + @Override public void onMessage(byte[] message, boolean isNeedReturnException) { this.message = message; diff --git a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/transport/AbstractH2TransportListenerTest.java b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/transport/AbstractH2TransportListenerTest.java index 92d5b3c65cc..e17496372ba 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/transport/AbstractH2TransportListenerTest.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/transport/AbstractH2TransportListenerTest.java @@ -40,6 +40,9 @@ public void onData(ByteBuf data, boolean endStream) {} @Override public void cancelByRemote(long errorCode) {} + + @Override + public void onClose() {} }; DefaultHttp2Headers headers = new DefaultHttp2Headers(); headers.scheme(HTTPS.name()).path("/foo.bar").method(HttpMethod.POST.asciiName());