diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/CancelStreamException.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/CancelStreamException.java new file mode 100644 index 00000000000..62fdee1e6ec --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/CancelStreamException.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.http12.h2; + +import org.apache.dubbo.remoting.http12.ErrorCodeHolder; + +public class CancelStreamException extends RuntimeException implements ErrorCodeHolder { + + private final boolean cancelByRemote; + + private final long errorCode; + + private CancelStreamException(boolean cancelByRemote, long errorCode) { + this.cancelByRemote = cancelByRemote; + this.errorCode = errorCode; + } + + public boolean isCancelByRemote() { + return cancelByRemote; + } + + public static CancelStreamException fromRemote(long errorCode) { + return new CancelStreamException(true, errorCode); + } + + public static CancelStreamException fromLocal(long errorCode) { + return new CancelStreamException(false, errorCode); + } + + @Override + public long getErrorCode() { + return errorCode; + } +} diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java index c9fa6af4bb5..89d77965d62 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java @@ -37,6 +37,8 @@ public class Http2ServerChannelObserver extends AbstractServerHttpChannelObserve private boolean autoRequestN = true; + private boolean closed = false; + public Http2ServerChannelObserver(H2StreamChannel h2StreamChannel) { super(h2StreamChannel); } @@ -74,12 +76,33 @@ public CancellationContext getCancellationContext() { @Override public void cancel(Throwable throwable) { + if (throwable instanceof CancelStreamException) { + if (((CancelStreamException) throwable).isCancelByRemote()) { + closed = true; + } + } + this.cancellationContext.cancel(throwable); long errorCode = 0; if (throwable instanceof ErrorCodeHolder) { errorCode = ((ErrorCodeHolder) throwable).getErrorCode(); } getHttpChannel().writeResetFrame(errorCode); - this.cancellationContext.cancel(throwable); + } + + @Override + public void doOnNext(Object data) throws Throwable { + if (closed) { + return; + } + super.doOnNext(data); + } + + @Override + public void doOnError(Throwable throwable) throws Throwable { + if (closed) { + return; + } + super.doOnError(throwable); } @Override diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2TransportListener.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2TransportListener.java index 16531e7a936..09ad7fe4225 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2TransportListener.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2TransportListener.java @@ -16,4 +16,7 @@ */ package org.apache.dubbo.remoting.http12.h2; -public interface Http2TransportListener extends CancelableTransportListener {} +public interface Http2TransportListener extends CancelableTransportListener { + + void onStreamClosed(); +} diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java index c401cfeacd5..4e0a087ccd9 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java @@ -95,6 +95,9 @@ private void deliver() { if (inDelivery) { return; } + if (closed) { + return; + } inDelivery = true; try { // Process the uncompressed bytes. diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java index cc82019a7e4..582b6562728 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java @@ -24,6 +24,7 @@ import org.apache.dubbo.remoting.http12.exception.UnsupportedMediaTypeException; import org.apache.dubbo.remoting.http12.h2.H2StreamChannel; import org.apache.dubbo.remoting.http12.h2.Http2ServerTransportListenerFactory; +import org.apache.dubbo.remoting.http12.h2.Http2TransportListener; import org.apache.dubbo.remoting.http12.h2.command.Http2WriteQueueChannel; import org.apache.dubbo.remoting.http12.netty4.HttpWriteQueueHandler; import org.apache.dubbo.rpc.model.FrameworkModel; @@ -68,8 +69,9 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpMetadata metadata) { h2StreamChannel = new Http2WriteQueueChannel(h2StreamChannel, writeQueue); } ChannelPipeline pipeline = ctx.pipeline(); - pipeline.addLast( - new NettyHttp2FrameHandler(h2StreamChannel, factory.newInstance(h2StreamChannel, url, frameworkModel))); + Http2TransportListener http2TransportListener = factory.newInstance(h2StreamChannel, url, frameworkModel); + ctx.channel().closeFuture().addListener(future -> http2TransportListener.onStreamClosed()); + pipeline.addLast(new NettyHttp2FrameHandler(h2StreamChannel, http2TransportListener)); pipeline.remove(this); ctx.fireChannelRead(metadata); } diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java index 4fee48dfb43..4821563a093 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java @@ -20,7 +20,7 @@ import org.apache.dubbo.common.threadpool.manager.ExecutorRepository; import org.apache.dubbo.common.threadpool.serial.SerializingExecutor; import org.apache.dubbo.remoting.http12.HttpMethods; -import org.apache.dubbo.remoting.http12.exception.HttpStatusException; +import org.apache.dubbo.remoting.http12.h2.CancelStreamException; import org.apache.dubbo.remoting.http12.h2.H2StreamChannel; import org.apache.dubbo.remoting.http12.h2.Http2Header; import org.apache.dubbo.remoting.http12.h2.Http2InputMessage; @@ -176,7 +176,7 @@ protected void onError(Throwable throwable) { @Override public void cancelByRemote(long errorCode) { - serverChannelObserver.cancel(new HttpStatusException((int) errorCode)); + serverChannelObserver.cancel(CancelStreamException.fromRemote(errorCode)); serverCallListener.onCancel(errorCode); } @@ -188,6 +188,12 @@ protected final Http2ServerChannelObserver getServerChannelObserver() { return serverChannelObserver; } + @Override + public void onStreamClosed() { + // doing on event loop thread + getStreamingDecoder().close(); + } + private static class Http2StreamingDecodeListener implements ListeningDecoder.Listener { private final ServerCallListener serverCallListener;