From 620aa144d3c359b691af6f0628de3927ebcbca7c Mon Sep 17 00:00:00 2001 From: TomlongTK Date: Tue, 28 May 2024 17:11:00 +0800 Subject: [PATCH 1/7] Triple http limiting the size of the HTTP request and response --- .../dubbo/remoting/http12/HttpUtils.java | 6 +- .../exception/HttpOverPayloadException.java | 24 +++++++ .../h1/Http1ServerUnaryChannelObserver.java | 16 +++++ .../http12/h1/LimitedByteBufOutputStream.java | 62 +++++++++++++++++++ .../http12/message/codec/BinaryCodec.java | 3 + .../http12/message/codec/HtmlCodec.java | 3 + .../http12/message/codec/JsonCodec.java | 9 ++- .../http12/message/codec/JsonPbCodec.java | 5 ++ .../http12/message/codec/PlainTextCodec.java | 3 + .../message/codec/UrlEncodeFormCodec.java | 5 ++ .../http12/message/codec/XmlCodec.java | 5 ++ .../http12/message/codec/YamlCodec.java | 9 +++ .../http12/netty4/h1/NettyHttp1Channel.java | 11 +++- .../http12/netty4/h1/NettyHttp1Codec.java | 9 ++- .../h1/NettyHttp1ConnectionHandler.java | 7 ++- .../rpc/protocol/tri/TripleHttp2Protocol.java | 2 +- .../tri/rest/RestHttpMessageCodec.java | 3 + 17 files changed, 171 insertions(+), 11 deletions(-) create mode 100644 dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/exception/HttpOverPayloadException.java create mode 100644 dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/LimitedByteBufOutputStream.java diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpUtils.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpUtils.java index a25c1076698..23c53f8a3f3 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpUtils.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpUtils.java @@ -185,14 +185,14 @@ public static String readPostValue(InterfaceHttpData item) { } public static HttpRequest.FileUpload readUpload(InterfaceHttpData item) { - return new DefaultFileUploadAdaptee((FileUpload) item); + return new DefaultFileUploadAdapter((FileUpload) item); } - private static class DefaultFileUploadAdaptee implements HttpRequest.FileUpload { + private static class DefaultFileUploadAdapter implements HttpRequest.FileUpload { private final FileUpload fu; private InputStream inputStream; - DefaultFileUploadAdaptee(FileUpload fu) { + DefaultFileUploadAdapter(FileUpload fu) { this.fu = fu; } diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/exception/HttpOverPayloadException.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/exception/HttpOverPayloadException.java new file mode 100644 index 00000000000..c98d51ba4bb --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/exception/HttpOverPayloadException.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.http12.exception; + +public class HttpOverPayloadException extends HttpStatusException { + + public HttpOverPayloadException(String message) { + super(500, message); + } +} diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerUnaryChannelObserver.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerUnaryChannelObserver.java index 8cb2a36edd2..682701b7339 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerUnaryChannelObserver.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerUnaryChannelObserver.java @@ -20,8 +20,11 @@ import org.apache.dubbo.remoting.http12.HttpHeaderNames; import org.apache.dubbo.remoting.http12.HttpMetadata; import org.apache.dubbo.remoting.http12.HttpOutputMessage; +import org.apache.dubbo.remoting.http12.exception.HttpOverPayloadException; import java.io.OutputStream; +import java.util.Arrays; +import java.util.List; import io.netty.buffer.ByteBufOutputStream; @@ -40,6 +43,10 @@ protected void doOnNext(Object data) throws Throwable { @Override protected void doOnError(Throwable throwable) throws Throwable { + if (throwable instanceof HttpOverPayloadException) { + handleOverPayload((HttpOverPayloadException) throwable); + return; + } String statusCode = resolveStatusCode(throwable); Object data = buildErrorResponse(statusCode, throwable); HttpOutputMessage httpOutputMessage = buildMessage(data); @@ -55,4 +62,13 @@ protected void preMetadata(HttpMetadata httpMetadata, HttpOutputMessage outputMe httpMetadata.headers().set(HttpHeaderNames.CONTENT_LENGTH.getName(), String.valueOf(contentLength)); } } + + private void handleOverPayload(HttpOverPayloadException overPayloadException) { + HttpMetadata httpMetadata = encodeHttpMetadata(); + List overPayloadStatus = + Arrays.asList(String.valueOf(overPayloadException.getStatusCode()), overPayloadException.getMessage()); + httpMetadata.headers().put(HttpHeaderNames.STATUS.getName(), overPayloadStatus); + httpMetadata.headers().set(HttpHeaderNames.CONTENT_LENGTH.getName(), "0"); + sendHeader(httpMetadata); + } } diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/LimitedByteBufOutputStream.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/LimitedByteBufOutputStream.java new file mode 100644 index 00000000000..895f1f75686 --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/LimitedByteBufOutputStream.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.http12.h1; + +import org.apache.dubbo.remoting.http12.exception.HttpOverPayloadException; + +import java.io.IOException; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufOutputStream; + +public class LimitedByteBufOutputStream extends ByteBufOutputStream { + + private final int capacity; + + private int writeIndex; + + public LimitedByteBufOutputStream(ByteBuf byteBuf, int capacity) { + super(byteBuf); + this.capacity = capacity == 0 ? Integer.MAX_VALUE : capacity; + this.writeIndex = 0; + } + + @Override + public void write(int b) throws IOException { + ensureCapacity(1); + super.write(b); + } + + @Override + public void write(byte[] b) throws IOException { + ensureCapacity(b.length); + super.write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + ensureCapacity(len); + super.write(b, off, len); + } + + private void ensureCapacity(int len) { + writeIndex += len; + if (writeIndex > capacity) { + throw new HttpOverPayloadException("Response Entity Too Large"); + } + } +} diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/BinaryCodec.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/BinaryCodec.java index bb3ee1af5c4..b9399277648 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/BinaryCodec.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/BinaryCodec.java @@ -19,6 +19,7 @@ import org.apache.dubbo.common.io.StreamUtils; import org.apache.dubbo.remoting.http12.exception.DecodeException; import org.apache.dubbo.remoting.http12.exception.EncodeException; +import org.apache.dubbo.remoting.http12.exception.HttpStatusException; import org.apache.dubbo.remoting.http12.message.HttpMessageCodec; import org.apache.dubbo.remoting.http12.message.MediaType; @@ -49,6 +50,8 @@ public void encode(OutputStream os, Object data, Charset charset) throws EncodeE public Object decode(InputStream is, Class targetType, Charset charset) throws DecodeException { try { return StreamUtils.readBytes(is); + } catch (HttpStatusException e) { + throw e; } catch (Exception e) { throw new DecodeException(e); } diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/HtmlCodec.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/HtmlCodec.java index ac1ae805abe..ca3fff4f107 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/HtmlCodec.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/HtmlCodec.java @@ -19,6 +19,7 @@ import org.apache.dubbo.common.io.StreamUtils; import org.apache.dubbo.remoting.http12.exception.DecodeException; import org.apache.dubbo.remoting.http12.exception.EncodeException; +import org.apache.dubbo.remoting.http12.exception.HttpStatusException; import org.apache.dubbo.remoting.http12.message.HttpMessageCodec; import org.apache.dubbo.remoting.http12.message.MediaType; @@ -48,6 +49,8 @@ public Object decode(InputStream is, Class targetType, Charset charset) throw if (targetType == String.class) { return StreamUtils.toString(is, charset); } + } catch (HttpStatusException e) { + throw e; } catch (Exception e) { throw new DecodeException(e); } diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/JsonCodec.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/JsonCodec.java index aea3fdb66a8..c16dbe2b247 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/JsonCodec.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/JsonCodec.java @@ -20,6 +20,7 @@ import org.apache.dubbo.common.utils.JsonUtils; import org.apache.dubbo.remoting.http12.exception.DecodeException; import org.apache.dubbo.remoting.http12.exception.EncodeException; +import org.apache.dubbo.remoting.http12.exception.HttpStatusException; import org.apache.dubbo.remoting.http12.message.HttpMessageCodec; import org.apache.dubbo.remoting.http12.message.MediaType; @@ -35,6 +36,8 @@ public class JsonCodec implements HttpMessageCodec { public void encode(OutputStream os, Object data, Charset charset) throws EncodeException { try { os.write(JsonUtils.toJson(data).getBytes(charset)); + } catch (HttpStatusException e) { + throw e; } catch (Throwable t) { throw new EncodeException("Error encoding json", t); } @@ -43,6 +46,8 @@ public void encode(OutputStream os, Object data, Charset charset) throws EncodeE public void encode(OutputStream os, Object[] data, Charset charset) throws EncodeException { try { os.write(JsonUtils.toJson(data).getBytes(charset)); + } catch (HttpStatusException e) { + throw e; } catch (Throwable t) { throw new EncodeException("Error encoding json", t); } @@ -52,6 +57,8 @@ public void encode(OutputStream os, Object[] data, Charset charset) throws Encod public Object decode(InputStream is, Class targetType, Charset charset) throws DecodeException { try { return JsonUtils.toJavaObject(StreamUtils.toString(is, charset), targetType); + } catch (HttpStatusException e) { + throw e; } catch (Throwable t) { throw new DecodeException("Error decoding json", t); } @@ -78,7 +85,7 @@ public Object[] decode(InputStream is, Class[] targetTypes, Charset charset) return new Object[] {JsonUtils.convertObject(obj, targetTypes[0])}; } throw new DecodeException("Json must be array"); - } catch (DecodeException e) { + } catch (HttpStatusException e) { throw e; } catch (Throwable t) { throw new DecodeException("Error decoding json", t); diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/JsonPbCodec.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/JsonPbCodec.java index 86131abb996..a3b2dbdea4d 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/JsonPbCodec.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/JsonPbCodec.java @@ -20,6 +20,7 @@ import org.apache.dubbo.common.utils.MethodUtils; import org.apache.dubbo.remoting.http12.exception.DecodeException; import org.apache.dubbo.remoting.http12.exception.EncodeException; +import org.apache.dubbo.remoting.http12.exception.HttpStatusException; import java.io.IOException; import java.io.InputStream; @@ -54,6 +55,8 @@ public Object decode(InputStream is, Class targetType, Charset charset) throw JsonFormat.parser().ignoringUnknownFields().merge(StreamUtils.toString(is, charset), newBuilder); return newBuilder.build(); } + } catch (HttpStatusException e) { + throw e; } catch (Throwable e) { throw new DecodeException("Error decoding jsonPb", e); } @@ -67,6 +70,8 @@ public Object[] decode(InputStream is, Class[] targetTypes, Charset charset) // protobuf only support one parameter return new Object[] {decode(is, targetTypes[0], charset)}; } + } catch (HttpStatusException e) { + throw e; } catch (Throwable e) { throw new DecodeException("Error decoding jsonPb", e); } diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/PlainTextCodec.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/PlainTextCodec.java index 33d067686ed..48e41c657dc 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/PlainTextCodec.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/PlainTextCodec.java @@ -19,6 +19,7 @@ import org.apache.dubbo.common.io.StreamUtils; import org.apache.dubbo.remoting.http12.exception.DecodeException; import org.apache.dubbo.remoting.http12.exception.EncodeException; +import org.apache.dubbo.remoting.http12.exception.HttpStatusException; import org.apache.dubbo.remoting.http12.message.HttpMessageCodec; import org.apache.dubbo.remoting.http12.message.MediaType; @@ -51,6 +52,8 @@ public Object decode(InputStream is, Class targetType, Charset charset) throw if (targetType == String.class) { return StreamUtils.toString(is, charset); } + } catch (HttpStatusException e) { + throw e; } catch (Exception e) { throw new DecodeException(e); } diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/UrlEncodeFormCodec.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/UrlEncodeFormCodec.java index bd51a2c91e6..643bc6211e5 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/UrlEncodeFormCodec.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/UrlEncodeFormCodec.java @@ -20,6 +20,7 @@ import org.apache.dubbo.common.io.StreamUtils; import org.apache.dubbo.remoting.http12.exception.DecodeException; import org.apache.dubbo.remoting.http12.exception.EncodeException; +import org.apache.dubbo.remoting.http12.exception.HttpStatusException; import org.apache.dubbo.remoting.http12.message.HttpMessageCodec; import org.apache.dubbo.remoting.http12.message.MediaType; @@ -63,6 +64,8 @@ public void encode(OutputStream outputStream, Object data, Charset charset) thro } else { throw new EncodeException("UrlEncodeFrom media-type only supports String or Map as return type."); } + } catch (HttpStatusException e) { + throw e; } catch (Exception e) { throw new EncodeException(e); } @@ -99,6 +102,8 @@ else if (Arrays.stream(targetTypes) } else { return res.values().toArray(); } + } catch (HttpStatusException e) { + throw e; } catch (Exception e) { throw new DecodeException(e); } diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/XmlCodec.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/XmlCodec.java index d63728083a3..3627aea6e4c 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/XmlCodec.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/XmlCodec.java @@ -18,6 +18,7 @@ import org.apache.dubbo.remoting.http12.exception.DecodeException; import org.apache.dubbo.remoting.http12.exception.EncodeException; +import org.apache.dubbo.remoting.http12.exception.HttpStatusException; import org.apache.dubbo.remoting.http12.message.HttpMessageCodec; import org.apache.dubbo.remoting.http12.message.MediaType; @@ -47,6 +48,8 @@ public void encode(OutputStream os, Object data, Charset charset) throws EncodeE try (OutputStreamWriter writer = new OutputStreamWriter(os, charset)) { marshaller.marshal(data, writer); } + } catch (HttpStatusException e) { + throw e; } catch (Exception e) { throw new EncodeException("Error encoding xml", e); } @@ -63,6 +66,8 @@ public Object decode(InputStream is, Class targetType, Charset charset) throw Unmarshaller unmarshaller = context.createUnmarshaller(); return unmarshaller.unmarshal(xmlSource); } + } catch (HttpStatusException e) { + throw e; } catch (Exception e) { throw new DecodeException("Error decoding xml", e); } diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/YamlCodec.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/YamlCodec.java index 30380eb1599..9362bcaf8b1 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/YamlCodec.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/YamlCodec.java @@ -20,6 +20,7 @@ import org.apache.dubbo.common.utils.DefaultSerializeClassChecker; import org.apache.dubbo.remoting.http12.exception.DecodeException; import org.apache.dubbo.remoting.http12.exception.EncodeException; +import org.apache.dubbo.remoting.http12.exception.HttpStatusException; import org.apache.dubbo.remoting.http12.message.HttpMessageCodec; import org.apache.dubbo.remoting.http12.message.MediaType; @@ -43,6 +44,8 @@ public class YamlCodec implements HttpMessageCodec { public Object decode(InputStream is, Class targetType, Charset charset) throws DecodeException { try (InputStreamReader reader = new InputStreamReader(is, charset)) { return createYaml().loadAs(reader, (Class) targetType); + } catch (HttpStatusException e) { + throw e; } catch (Throwable t) { throw new DecodeException("Error decoding yaml", t); } @@ -69,6 +72,8 @@ public Object[] decode(InputStream is, Class[] targetTypes, Charset charset) } } return results; + } catch (HttpStatusException e) { + throw e; } catch (Throwable t) { throw new DecodeException("Error decoding yaml", t); } @@ -78,6 +83,8 @@ public Object[] decode(InputStream is, Class[] targetTypes, Charset charset) public void encode(OutputStream os, Object data, Charset charset) throws EncodeException { try (OutputStreamWriter writer = new OutputStreamWriter(os, charset)) { createYaml().dump(data, writer); + } catch (HttpStatusException e) { + throw e; } catch (Throwable t) { throw new EncodeException("Error encoding yaml", t); } @@ -87,6 +94,8 @@ public void encode(OutputStream os, Object data, Charset charset) throws EncodeE public void encode(OutputStream os, Object[] data, Charset charset) throws EncodeException { try (OutputStreamWriter writer = new OutputStreamWriter(os, charset)) { createYaml().dump(data, writer); + } catch (HttpStatusException e) { + throw e; } catch (Throwable t) { throw new EncodeException("Error encoding yaml", t); } diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1Channel.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1Channel.java index 443bd21aec0..b3683349389 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1Channel.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1Channel.java @@ -16,24 +16,28 @@ */ package org.apache.dubbo.remoting.http12.netty4.h1; +import org.apache.dubbo.config.nested.TripleConfig; import org.apache.dubbo.remoting.http12.HttpChannel; import org.apache.dubbo.remoting.http12.HttpMetadata; import org.apache.dubbo.remoting.http12.HttpOutputMessage; import org.apache.dubbo.remoting.http12.h1.Http1OutputMessage; +import org.apache.dubbo.remoting.http12.h1.LimitedByteBufOutputStream; import org.apache.dubbo.remoting.http12.netty4.NettyHttpChannelFutureListener; import java.net.SocketAddress; import java.util.concurrent.CompletableFuture; -import io.netty.buffer.ByteBufOutputStream; import io.netty.channel.Channel; public class NettyHttp1Channel implements HttpChannel { private final Channel channel; - public NettyHttp1Channel(Channel channel) { + private final TripleConfig tripleConfig; + + public NettyHttp1Channel(Channel channel, TripleConfig tripleConfig) { this.channel = channel; + this.tripleConfig = tripleConfig; } @Override @@ -52,7 +56,8 @@ public CompletableFuture writeMessage(HttpOutputMessage httpOutputMessage) @Override public HttpOutputMessage newOutputMessage() { - return new Http1OutputMessage(new ByteBufOutputStream(channel.alloc().buffer())); + return new Http1OutputMessage( + new LimitedByteBufOutputStream(channel.alloc().buffer(), tripleConfig.getMaxResponseBodySize())); } @Override diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1Codec.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1Codec.java index dfb7e3e39d7..5e4fee46a82 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1Codec.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1Codec.java @@ -16,6 +16,7 @@ */ package org.apache.dubbo.remoting.http12.netty4.h1; +import org.apache.dubbo.common.utils.CollectionUtils; import org.apache.dubbo.remoting.http12.HttpHeaderNames; import org.apache.dubbo.remoting.http12.HttpMetadata; import org.apache.dubbo.remoting.http12.HttpOutputMessage; @@ -85,8 +86,12 @@ private void doWriteHeader(ChannelHandlerContext ctx, HttpMetadata msg, ChannelP // process status List statusHeaders = msg.headers().remove(HttpHeaderNames.STATUS.getName()); HttpResponseStatus status = HttpResponseStatus.OK; - if (!(statusHeaders == null || statusHeaders.isEmpty())) { - status = HttpResponseStatus.valueOf(Integer.parseInt(statusHeaders.get(0))); + if (CollectionUtils.isNotEmpty(statusHeaders)) { + if (statusHeaders.size() == 1) { + status = HttpResponseStatus.valueOf(Integer.parseInt(statusHeaders.get(0))); + } else { + status = new HttpResponseStatus(Integer.parseInt(statusHeaders.get(0)), statusHeaders.get(1)); + } } // process normal headers DefaultHttpResponse defaultHttpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, status); diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1ConnectionHandler.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1ConnectionHandler.java index 7380024f7c7..72649fce24e 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1ConnectionHandler.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1ConnectionHandler.java @@ -17,6 +17,7 @@ package org.apache.dubbo.remoting.http12.netty4.h1; import org.apache.dubbo.common.URL; +import org.apache.dubbo.config.nested.TripleConfig; import org.apache.dubbo.remoting.http12.h1.Http1Request; import org.apache.dubbo.remoting.http12.h1.Http1ServerTransportListener; import org.apache.dubbo.remoting.http12.h1.Http1ServerTransportListenerFactory; @@ -33,12 +34,16 @@ public class NettyHttp1ConnectionHandler extends SimpleChannelInboundHandler handlers) { handlers.add(new ChannelHandlerPretender(new HttpObjectAggregator(tripleConfig.getMaxBodySize()))); handlers.add(new ChannelHandlerPretender(new NettyHttp1Codec())); handlers.add(new ChannelHandlerPretender(new NettyHttp1ConnectionHandler( - url, frameworkModel, DefaultHttp11ServerTransportListenerFactory.INSTANCE))); + url, frameworkModel, tripleConfig, DefaultHttp11ServerTransportListenerFactory.INSTANCE))); } private Http2MultiplexHandler buildHttp2MultiplexHandler(URL url) { diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/RestHttpMessageCodec.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/RestHttpMessageCodec.java index 23462b7b6b6..448a2b7db45 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/RestHttpMessageCodec.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/RestHttpMessageCodec.java @@ -21,6 +21,7 @@ import org.apache.dubbo.remoting.http12.HttpResponse; import org.apache.dubbo.remoting.http12.exception.DecodeException; import org.apache.dubbo.remoting.http12.exception.EncodeException; +import org.apache.dubbo.remoting.http12.exception.HttpStatusException; import org.apache.dubbo.remoting.http12.message.HttpMessageDecoder; import org.apache.dubbo.remoting.http12.message.HttpMessageEncoder; import org.apache.dubbo.remoting.http12.message.MediaType; @@ -108,6 +109,8 @@ public void encode(OutputStream os, Object data) throws EncodeException { if (messageEncoder.mediaType().isPureText() && type != String.class) { data = typeConverter.convert(data, String.class); } + } catch (HttpStatusException e) { + throw e; } catch (Exception e) { throw new EncodeException(e); } From b2406cf6714dc00fc80c4e54d8b1ffaccc273150 Mon Sep 17 00:00:00 2001 From: TomlongTK Date: Mon, 3 Jun 2024 20:18:42 +0800 Subject: [PATCH 2/7] Limit http1 and http2 response body --- .../AbstractServerHttpChannelObserver.java | 20 +++++++++++++--- .../remoting/http12/HttpHeaderNames.java | 4 +++- .../h1/Http1ServerUnaryChannelObserver.java | 23 +++++++++++-------- .../http12/h1/LimitedByteBufOutputStream.java | 12 +++++----- .../http12/netty4/h1/NettyHttp1Channel.java | 2 +- .../http12/netty4/h1/NettyHttp1Codec.java | 9 ++------ .../netty4/h2/NettyH2StreamChannel.java | 10 ++++++-- .../h2/NettyHttp2ProtocolSelectorHandler.java | 7 +++++- .../rpc/protocol/tri/TripleHeaderEnum.java | 3 ++- .../rpc/protocol/tri/TripleHttp2Protocol.java | 16 ++++++------- .../GenericHttp2ServerTransportListener.java | 14 +++++++++++ 11 files changed, 80 insertions(+), 40 deletions(-) diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java index 26ce8b705e1..5392267610d 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java @@ -17,6 +17,7 @@ package org.apache.dubbo.remoting.http12; import org.apache.dubbo.remoting.http12.exception.EncodeException; +import org.apache.dubbo.remoting.http12.exception.HttpOverPayloadException; import org.apache.dubbo.remoting.http12.exception.HttpResultPayloadException; import org.apache.dubbo.remoting.http12.exception.HttpStatusException; import org.apache.dubbo.remoting.http12.message.HttpMessageEncoder; @@ -35,6 +36,8 @@ public abstract class AbstractServerHttpChannelObserver implements CustomizableH private boolean headerSent; + private boolean completed; + protected AbstractServerHttpChannelObserver(HttpChannel httpChannel) { this.httpChannel = httpChannel; } @@ -87,7 +90,11 @@ protected void doOnNext(Object data) throws Throwable { public final void onError(Throwable throwable) { if (throwable instanceof HttpResultPayloadException) { onNext(((HttpResultPayloadException) throwable).getResult()); - doOnCompleted(null); + onCompleted(null); + return; + } + if (throwable instanceof HttpOverPayloadException) { + onCompleted(throwable); return; } try { @@ -95,7 +102,7 @@ public final void onError(Throwable throwable) { } catch (Throwable ex) { throwable = new EncodeException(ex); } finally { - doOnCompleted(throwable); + onCompleted(throwable); } } @@ -110,7 +117,14 @@ protected void doOnError(Throwable throwable) throws Throwable { @Override public final void onCompleted() { - doOnCompleted(null); + onCompleted(null); + } + + private void onCompleted(Throwable throwable) { + if (!completed) { + doOnCompleted(throwable); + completed = true; + } } protected void doOnCompleted(Throwable throwable) { diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpHeaderNames.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpHeaderNames.java index 65623ccacb7..a75f5da1ce0 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpHeaderNames.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpHeaderNames.java @@ -27,7 +27,9 @@ public enum HttpHeaderNames { TE("te"), - ACCEPT("accept"); + ACCEPT("accept"), + + ERROR_MESSAGE("error-message"); private final String name; diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerUnaryChannelObserver.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerUnaryChannelObserver.java index 682701b7339..fcddb5366ca 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerUnaryChannelObserver.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerUnaryChannelObserver.java @@ -18,13 +18,12 @@ import org.apache.dubbo.remoting.http12.HttpChannel; import org.apache.dubbo.remoting.http12.HttpHeaderNames; +import org.apache.dubbo.remoting.http12.HttpHeaders; import org.apache.dubbo.remoting.http12.HttpMetadata; import org.apache.dubbo.remoting.http12.HttpOutputMessage; import org.apache.dubbo.remoting.http12.exception.HttpOverPayloadException; import java.io.OutputStream; -import java.util.Arrays; -import java.util.List; import io.netty.buffer.ByteBufOutputStream; @@ -43,10 +42,6 @@ protected void doOnNext(Object data) throws Throwable { @Override protected void doOnError(Throwable throwable) throws Throwable { - if (throwable instanceof HttpOverPayloadException) { - handleOverPayload((HttpOverPayloadException) throwable); - return; - } String statusCode = resolveStatusCode(throwable); Object data = buildErrorResponse(statusCode, throwable); HttpOutputMessage httpOutputMessage = buildMessage(data); @@ -63,12 +58,20 @@ protected void preMetadata(HttpMetadata httpMetadata, HttpOutputMessage outputMe } } + @Override + protected void doOnCompleted(Throwable throwable) { + if (throwable instanceof HttpOverPayloadException) { + handleOverPayload((HttpOverPayloadException) throwable); + } + super.doOnCompleted(throwable); + } + private void handleOverPayload(HttpOverPayloadException overPayloadException) { HttpMetadata httpMetadata = encodeHttpMetadata(); - List overPayloadStatus = - Arrays.asList(String.valueOf(overPayloadException.getStatusCode()), overPayloadException.getMessage()); - httpMetadata.headers().put(HttpHeaderNames.STATUS.getName(), overPayloadStatus); - httpMetadata.headers().set(HttpHeaderNames.CONTENT_LENGTH.getName(), "0"); + HttpHeaders headers = httpMetadata.headers(); + headers.set(HttpHeaderNames.STATUS.getName(), String.valueOf(overPayloadException.getStatusCode())); + headers.set(HttpHeaderNames.CONTENT_LENGTH.getName(), "0"); + headers.set(HttpHeaderNames.ERROR_MESSAGE.getName(), overPayloadException.getMessage()); sendHeader(httpMetadata); } } diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/LimitedByteBufOutputStream.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/LimitedByteBufOutputStream.java index 895f1f75686..160506db282 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/LimitedByteBufOutputStream.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/LimitedByteBufOutputStream.java @@ -27,12 +27,13 @@ public class LimitedByteBufOutputStream extends ByteBufOutputStream { private final int capacity; - private int writeIndex; - public LimitedByteBufOutputStream(ByteBuf byteBuf, int capacity) { - super(byteBuf); + this(byteBuf, false, capacity); + } + + public LimitedByteBufOutputStream(ByteBuf byteBuf, boolean releaseOnClose, int capacity) { + super(byteBuf, releaseOnClose); this.capacity = capacity == 0 ? Integer.MAX_VALUE : capacity; - this.writeIndex = 0; } @Override @@ -54,8 +55,7 @@ public void write(byte[] b, int off, int len) throws IOException { } private void ensureCapacity(int len) { - writeIndex += len; - if (writeIndex > capacity) { + if (writtenBytes() + len > capacity) { throw new HttpOverPayloadException("Response Entity Too Large"); } } diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1Channel.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1Channel.java index b3683349389..34ce1169cee 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1Channel.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1Channel.java @@ -57,7 +57,7 @@ public CompletableFuture writeMessage(HttpOutputMessage httpOutputMessage) @Override public HttpOutputMessage newOutputMessage() { return new Http1OutputMessage( - new LimitedByteBufOutputStream(channel.alloc().buffer(), tripleConfig.getMaxResponseBodySize())); + new LimitedByteBufOutputStream(channel.alloc().buffer(), true, tripleConfig.getMaxResponseBodySize())); } @Override diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1Codec.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1Codec.java index 5e4fee46a82..dfb7e3e39d7 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1Codec.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1Codec.java @@ -16,7 +16,6 @@ */ package org.apache.dubbo.remoting.http12.netty4.h1; -import org.apache.dubbo.common.utils.CollectionUtils; import org.apache.dubbo.remoting.http12.HttpHeaderNames; import org.apache.dubbo.remoting.http12.HttpMetadata; import org.apache.dubbo.remoting.http12.HttpOutputMessage; @@ -86,12 +85,8 @@ private void doWriteHeader(ChannelHandlerContext ctx, HttpMetadata msg, ChannelP // process status List statusHeaders = msg.headers().remove(HttpHeaderNames.STATUS.getName()); HttpResponseStatus status = HttpResponseStatus.OK; - if (CollectionUtils.isNotEmpty(statusHeaders)) { - if (statusHeaders.size() == 1) { - status = HttpResponseStatus.valueOf(Integer.parseInt(statusHeaders.get(0))); - } else { - status = new HttpResponseStatus(Integer.parseInt(statusHeaders.get(0)), statusHeaders.get(1)); - } + if (!(statusHeaders == null || statusHeaders.isEmpty())) { + status = HttpResponseStatus.valueOf(Integer.parseInt(statusHeaders.get(0))); } // process normal headers DefaultHttpResponse defaultHttpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, status); diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyH2StreamChannel.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyH2StreamChannel.java index ade163606a3..2aa9892a92e 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyH2StreamChannel.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyH2StreamChannel.java @@ -16,8 +16,10 @@ */ package org.apache.dubbo.remoting.http12.netty4.h2; +import org.apache.dubbo.config.nested.TripleConfig; import org.apache.dubbo.remoting.http12.HttpMetadata; import org.apache.dubbo.remoting.http12.HttpOutputMessage; +import org.apache.dubbo.remoting.http12.h1.LimitedByteBufOutputStream; import org.apache.dubbo.remoting.http12.h2.H2StreamChannel; import org.apache.dubbo.remoting.http12.h2.Http2OutputMessage; import org.apache.dubbo.remoting.http12.h2.Http2OutputMessageFrame; @@ -35,8 +37,11 @@ public class NettyH2StreamChannel implements H2StreamChannel { private final Http2StreamChannel http2StreamChannel; - public NettyH2StreamChannel(Http2StreamChannel http2StreamChannel) { + private final TripleConfig tripleConfig; + + public NettyH2StreamChannel(Http2StreamChannel http2StreamChannel, TripleConfig tripleConfig) { this.http2StreamChannel = http2StreamChannel; + this.tripleConfig = tripleConfig; } @Override @@ -57,7 +62,8 @@ public CompletableFuture writeMessage(HttpOutputMessage httpOutputMessage) @Override public Http2OutputMessage newOutputMessage(boolean endStream) { ByteBuf buffer = http2StreamChannel.alloc().buffer(); - ByteBufOutputStream outputStream = new ByteBufOutputStream(buffer, true); + ByteBufOutputStream outputStream = + new LimitedByteBufOutputStream(buffer, true, tripleConfig.getMaxResponseBodySize()); return new Http2OutputMessageFrame(outputStream, endStream); } diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java index 582b6562728..23596186891 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java @@ -17,6 +17,7 @@ package org.apache.dubbo.remoting.http12.netty4.h2; import org.apache.dubbo.common.URL; +import org.apache.dubbo.config.nested.TripleConfig; import org.apache.dubbo.remoting.http12.HttpHeaderNames; import org.apache.dubbo.remoting.http12.HttpHeaders; import org.apache.dubbo.remoting.http12.HttpMetadata; @@ -42,14 +43,18 @@ public class NettyHttp2ProtocolSelectorHandler extends SimpleChannelInboundHandl private final FrameworkModel frameworkModel; + private final TripleConfig tripleConfig; + private final Http2ServerTransportListenerFactory defaultHttp2ServerTransportListenerFactory; public NettyHttp2ProtocolSelectorHandler( URL url, FrameworkModel frameworkModel, + TripleConfig tripleConfig, Http2ServerTransportListenerFactory defaultHttp2ServerTransportListenerFactory) { this.url = url; this.frameworkModel = frameworkModel; + this.tripleConfig = tripleConfig; this.defaultHttp2ServerTransportListenerFactory = defaultHttp2ServerTransportListenerFactory; } @@ -61,7 +66,7 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpMetadata metadata) { if (factory == null) { throw new UnsupportedMediaTypeException(contentType); } - H2StreamChannel h2StreamChannel = new NettyH2StreamChannel((Http2StreamChannel) ctx.channel()); + H2StreamChannel h2StreamChannel = new NettyH2StreamChannel((Http2StreamChannel) ctx.channel(), tripleConfig); HttpWriteQueueHandler writeQueueHandler = ctx.channel().parent().pipeline().get(HttpWriteQueueHandler.class); if (writeQueueHandler != null) { diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHeaderEnum.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHeaderEnum.java index 2c42b3f7bdf..1d0ea4ee31c 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHeaderEnum.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHeaderEnum.java @@ -46,7 +46,8 @@ public enum TripleHeaderEnum { SERVICE_TIMEOUT("tri-service-timeout"), TRI_HEADER_CONVERT("tri-header-convert"), TRI_EXCEPTION_CODE("tri-exception-code"), - ; + TRI_STATUS("tri-status"), + TRI_MESSAGE("tri-message"); static final Map enumMap = new HashMap<>(); diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java index 6e8eb69a241..f34d01b0374 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java @@ -142,11 +142,11 @@ private void configurerHttp1Handlers(URL url, List handlers) { protocol -> { if (AsciiString.contentEquals(Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME, protocol)) { return new Http2ServerUpgradeCodec( - buildHttp2FrameCodec(url), + buildHttp2FrameCodec(tripleConfig), new HttpWriteQueueHandler(), new FlushConsolidationHandler(64, true), new TripleServerConnectionHandler(), - buildHttp2MultiplexHandler(url), + buildHttp2MultiplexHandler(url, tripleConfig), new TripleTailHandler()); } // Not upgrade request @@ -162,21 +162,22 @@ private void configurerHttp1Handlers(URL url, List handlers) { url, frameworkModel, tripleConfig, DefaultHttp11ServerTransportListenerFactory.INSTANCE))); } - private Http2MultiplexHandler buildHttp2MultiplexHandler(URL url) { + private Http2MultiplexHandler buildHttp2MultiplexHandler(URL url, TripleConfig tripleConfig) { return new Http2MultiplexHandler(new ChannelInitializer() { @Override protected void initChannel(Http2StreamChannel ch) { final ChannelPipeline p = ch.pipeline(); p.addLast(new NettyHttp2FrameCodec()); p.addLast(new NettyHttp2ProtocolSelectorHandler( - url, frameworkModel, GenericHttp2ServerTransportListenerFactory.INSTANCE)); + url, frameworkModel, tripleConfig, GenericHttp2ServerTransportListenerFactory.INSTANCE)); } }); } private void configurerHttp2Handlers(URL url, List handlers) { - final Http2FrameCodec codec = buildHttp2FrameCodec(url); - final Http2MultiplexHandler handler = buildHttp2MultiplexHandler(url); + TripleConfig tripleConfig = getTripleConfig(url); + final Http2FrameCodec codec = buildHttp2FrameCodec(tripleConfig); + final Http2MultiplexHandler handler = buildHttp2MultiplexHandler(url, tripleConfig); handlers.add(new ChannelHandlerPretender(new HttpWriteQueueHandler())); handlers.add(new ChannelHandlerPretender(codec)); handlers.add(new ChannelHandlerPretender(new FlushConsolidationHandler(64, true))); @@ -185,8 +186,7 @@ private void configurerHttp2Handlers(URL url, List handlers) { handlers.add(new ChannelHandlerPretender(new TripleTailHandler())); } - private Http2FrameCodec buildHttp2FrameCodec(URL url) { - TripleConfig tripleConfig = getTripleConfig(url); + private Http2FrameCodec buildHttp2FrameCodec(TripleConfig tripleConfig) { return TripleHttp2FrameCodecBuilder.forServer() .customizeConnection((connection) -> connection.remote().flowController(new TriHttp2RemoteFlowController(connection, tripleConfig))) diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java index a44fefcd479..916a108710a 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java @@ -19,7 +19,9 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.common.threadpool.manager.ExecutorRepository; import org.apache.dubbo.common.threadpool.serial.SerializingExecutor; +import org.apache.dubbo.remoting.http12.HttpHeaders; import org.apache.dubbo.remoting.http12.HttpMethods; +import org.apache.dubbo.remoting.http12.exception.HttpOverPayloadException; import org.apache.dubbo.remoting.http12.h2.CancelStreamException; import org.apache.dubbo.remoting.http12.h2.H2StreamChannel; import org.apache.dubbo.remoting.http12.h2.Http2Header; @@ -42,6 +44,7 @@ import org.apache.dubbo.rpc.model.MethodDescriptor; import org.apache.dubbo.rpc.protocol.tri.ReflectionPackableMethod; import org.apache.dubbo.rpc.protocol.tri.RpcInvocationBuildContext; +import org.apache.dubbo.rpc.protocol.tri.TripleHeaderEnum; import org.apache.dubbo.rpc.protocol.tri.h12.AbstractServerTransportListener; import org.apache.dubbo.rpc.protocol.tri.h12.BiStreamServerCallListener; import org.apache.dubbo.rpc.protocol.tri.h12.HttpMessageListener; @@ -74,6 +77,17 @@ public GenericHttp2ServerTransportListener( serverChannelObserver = new Http2ServerCallToObserverAdapter(frameworkModel, h2StreamChannel); serverChannelObserver.setResponseEncoder(JsonCodec.INSTANCE); serverChannelObserver.setStreamingDecoder(streamingDecoder); + serverChannelObserver.setTrailersCustomizer(this::httpTrailersCustomize); + } + + private void httpTrailersCustomize(HttpHeaders httpHeaders, Throwable throwable) { + if (throwable instanceof HttpOverPayloadException) { + HttpOverPayloadException exception = (HttpOverPayloadException) throwable; + httpHeaders.set(TripleHeaderEnum.TRI_STATUS.getHeader(), String.valueOf(exception.getStatusCode())); + httpHeaders.set(TripleHeaderEnum.TRI_MESSAGE.getHeader(), exception.getMessage()); + } else { + httpHeaders.set(TripleHeaderEnum.TRI_STATUS.getHeader(), "0"); + } } protected StreamingDecoder newStreamingDecoder() { From 4d9e7b255a2edd5a6886fc20fec3c6b833f77da9 Mon Sep 17 00:00:00 2001 From: TomlongTK Date: Wed, 5 Jun 2024 17:24:52 +0800 Subject: [PATCH 3/7] Native http2 unary calls use Http2ServerUnaryChannelObserver, The flag should also be set when the http1 connection is disconnected --- .../AbstractServerHttpChannelObserver.java | 25 +++++++-- .../remoting/http12/HttpChannelObserver.java | 2 +- .../http12/HttpTransportListener.java | 2 +- .../h1/Http1ServerUnaryChannelObserver.java | 19 ------- .../http12/h2/Http2ServerChannelObserver.java | 25 ++------- .../http12/h2/Http2TransportListener.java | 5 +- .../h1/NettyHttp1ConnectionHandler.java | 1 + .../h2/NettyHttp2ProtocolSelectorHandler.java | 2 +- .../rpc/protocol/tri/TripleHeaderEnum.java | 4 +- .../GrpcHttp2ServerTransportListener.java | 3 ++ .../DefaultHttp11ServerTransportListener.java | 5 ++ .../GenericHttp2ServerTransportListener.java | 51 ++++++++++--------- .../Http2ServerUnaryChannelObserver.java | 44 ++++++++++++++++ 13 files changed, 108 insertions(+), 80 deletions(-) create mode 100644 dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2ServerUnaryChannelObserver.java diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java index 5392267610d..fb9c54f79cc 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java @@ -17,7 +17,6 @@ package org.apache.dubbo.remoting.http12; import org.apache.dubbo.remoting.http12.exception.EncodeException; -import org.apache.dubbo.remoting.http12.exception.HttpOverPayloadException; import org.apache.dubbo.remoting.http12.exception.HttpResultPayloadException; import org.apache.dubbo.remoting.http12.exception.HttpStatusException; import org.apache.dubbo.remoting.http12.message.HttpMessageEncoder; @@ -38,6 +37,8 @@ public abstract class AbstractServerHttpChannelObserver implements CustomizableH private boolean completed; + private boolean closed; + protected AbstractServerHttpChannelObserver(HttpChannel httpChannel) { this.httpChannel = httpChannel; } @@ -72,6 +73,9 @@ public void setResponseEncoder(HttpMessageEncoder responseEncoder) { @Override public final void onNext(Object data) { + if (closed) { + return; + } try { doOnNext(data); } catch (Throwable e) { @@ -88,15 +92,14 @@ protected void doOnNext(Object data) throws Throwable { @Override public final void onError(Throwable throwable) { + if (closed) { + return; + } if (throwable instanceof HttpResultPayloadException) { onNext(((HttpResultPayloadException) throwable).getResult()); onCompleted(null); return; } - if (throwable instanceof HttpOverPayloadException) { - onCompleted(throwable); - return; - } try { doOnError(throwable); } catch (Throwable ex) { @@ -117,6 +120,9 @@ protected void doOnError(Throwable throwable) throws Throwable { @Override public final void onCompleted() { + if (closed) { + return; + } onCompleted(null); } @@ -212,4 +218,13 @@ protected final void sendHeader(HttpMetadata httpMetadata) { getHttpChannel().writeHeader(httpMetadata); headerSent = true; } + + @Override + public void close() throws Exception { + closed(); + } + + protected final void closed() { + closed = true; + } } diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpChannelObserver.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpChannelObserver.java index 4cb03426428..294d7311bff 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpChannelObserver.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpChannelObserver.java @@ -18,7 +18,7 @@ import org.apache.dubbo.common.stream.StreamObserver; -public interface HttpChannelObserver extends StreamObserver { +public interface HttpChannelObserver extends StreamObserver, AutoCloseable { HttpChannel getHttpChannel(); } diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpTransportListener.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpTransportListener.java index 9265a3ba93a..185a1717287 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpTransportListener.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpTransportListener.java @@ -16,7 +16,7 @@ */ package org.apache.dubbo.remoting.http12; -public interface HttpTransportListener
{ +public interface HttpTransportListener
extends AutoCloseable { void onMetadata(HEADER metadata); diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerUnaryChannelObserver.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerUnaryChannelObserver.java index fcddb5366ca..8cb2a36edd2 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerUnaryChannelObserver.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerUnaryChannelObserver.java @@ -18,10 +18,8 @@ import org.apache.dubbo.remoting.http12.HttpChannel; import org.apache.dubbo.remoting.http12.HttpHeaderNames; -import org.apache.dubbo.remoting.http12.HttpHeaders; import org.apache.dubbo.remoting.http12.HttpMetadata; import org.apache.dubbo.remoting.http12.HttpOutputMessage; -import org.apache.dubbo.remoting.http12.exception.HttpOverPayloadException; import java.io.OutputStream; @@ -57,21 +55,4 @@ protected void preMetadata(HttpMetadata httpMetadata, HttpOutputMessage outputMe httpMetadata.headers().set(HttpHeaderNames.CONTENT_LENGTH.getName(), String.valueOf(contentLength)); } } - - @Override - protected void doOnCompleted(Throwable throwable) { - if (throwable instanceof HttpOverPayloadException) { - handleOverPayload((HttpOverPayloadException) throwable); - } - super.doOnCompleted(throwable); - } - - private void handleOverPayload(HttpOverPayloadException overPayloadException) { - HttpMetadata httpMetadata = encodeHttpMetadata(); - HttpHeaders headers = httpMetadata.headers(); - headers.set(HttpHeaderNames.STATUS.getName(), String.valueOf(overPayloadException.getStatusCode())); - headers.set(HttpHeaderNames.CONTENT_LENGTH.getName(), "0"); - headers.set(HttpHeaderNames.ERROR_MESSAGE.getName(), overPayloadException.getMessage()); - sendHeader(httpMetadata); - } } diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java index 61568cc5bb5..bd43f2ad926 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java @@ -37,8 +37,6 @@ public class Http2ServerChannelObserver extends AbstractServerHttpChannelObserve private boolean autoRequestN = true; - private boolean closed = false; - public Http2ServerChannelObserver(H2StreamChannel h2StreamChannel) { super(h2StreamChannel); } @@ -78,7 +76,7 @@ public CancellationContext getCancellationContext() { public void cancel(Throwable throwable) { if (throwable instanceof CancelStreamException) { if (((CancelStreamException) throwable).isCancelByRemote()) { - closed = true; + closed(); } } this.cancellationContext.cancel(throwable); @@ -89,22 +87,6 @@ public void cancel(Throwable throwable) { getHttpChannel().writeResetFrame(errorCode); } - @Override - public void doOnNext(Object data) throws Throwable { - if (closed) { - return; - } - super.doOnNext(data); - } - - @Override - public void doOnError(Throwable throwable) throws Throwable { - if (closed) { - return; - } - super.doOnError(throwable); - } - @Override public void request(int count) { this.streamingDecoder.request(count); @@ -120,8 +102,9 @@ public boolean isAutoRequestN() { return autoRequestN; } - public void onStreamClosed() { - closed = true; + @Override + public void close() throws Exception { + super.close(); streamingDecoder.onStreamClosed(); } } diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2TransportListener.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2TransportListener.java index 09ad7fe4225..16531e7a936 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2TransportListener.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2TransportListener.java @@ -16,7 +16,4 @@ */ package org.apache.dubbo.remoting.http12.h2; -public interface Http2TransportListener extends CancelableTransportListener { - - void onStreamClosed(); -} +public interface Http2TransportListener extends CancelableTransportListener {} diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1ConnectionHandler.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1ConnectionHandler.java index 72649fce24e..452faba1b27 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1ConnectionHandler.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1ConnectionHandler.java @@ -55,5 +55,6 @@ protected void channelRead0(ChannelHandlerContext ctx, Http1Request http1Request new NettyHttp1Channel(ctx.channel(), tripleConfig), url, frameworkModel); http1TransportListener.onMetadata(http1Request); http1TransportListener.onData(http1Request); + ctx.channel().closeFuture().addListener(future -> http1TransportListener.close()); } } diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java index 23596186891..26fda3e87e8 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java @@ -75,7 +75,7 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpMetadata metadata) { } ChannelPipeline pipeline = ctx.pipeline(); Http2TransportListener http2TransportListener = factory.newInstance(h2StreamChannel, url, frameworkModel); - ctx.channel().closeFuture().addListener(future -> http2TransportListener.onStreamClosed()); + ctx.channel().closeFuture().addListener(future -> http2TransportListener.close()); pipeline.addLast(new NettyHttp2FrameHandler(h2StreamChannel, http2TransportListener)); pipeline.remove(this); ctx.fireChannelRead(metadata); diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHeaderEnum.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHeaderEnum.java index 1d0ea4ee31c..1bd9545b467 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHeaderEnum.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHeaderEnum.java @@ -45,9 +45,7 @@ public enum TripleHeaderEnum { SERVICE_GROUP("tri-service-group"), SERVICE_TIMEOUT("tri-service-timeout"), TRI_HEADER_CONVERT("tri-header-convert"), - TRI_EXCEPTION_CODE("tri-exception-code"), - TRI_STATUS("tri-status"), - TRI_MESSAGE("tri-message"); + TRI_EXCEPTION_CODE("tri-exception-code"); static final Map enumMap = new HashMap<>(); diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java index 9ae78813025..de69baafe9f 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java @@ -80,6 +80,9 @@ protected HttpMessageListener buildHttpMessageListener() { return getContext().isHasStub() ? super.buildHttpMessageListener() : new LazyFindMethodListener(); } + @Override + protected void onUnary() {} + @Override protected void onMetadataCompletion(Http2Header metadata) { super.onMetadataCompletion(metadata); diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java index 950a2d93506..3f9406a3ccc 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java @@ -108,6 +108,11 @@ protected void onError(Throwable throwable) { serverChannelObserver.onError(throwable); } + @Override + public void close() throws Exception { + serverChannelObserver.close(); + } + private static class AutoCompleteUnaryServerCallListener extends UnaryServerCallListener { public AutoCompleteUnaryServerCallListener( diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java index 916a108710a..0c18f24de8d 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java @@ -19,9 +19,7 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.common.threadpool.manager.ExecutorRepository; import org.apache.dubbo.common.threadpool.serial.SerializingExecutor; -import org.apache.dubbo.remoting.http12.HttpHeaders; import org.apache.dubbo.remoting.http12.HttpMethods; -import org.apache.dubbo.remoting.http12.exception.HttpOverPayloadException; import org.apache.dubbo.remoting.http12.h2.CancelStreamException; import org.apache.dubbo.remoting.http12.h2.H2StreamChannel; import org.apache.dubbo.remoting.http12.h2.Http2Header; @@ -44,7 +42,6 @@ import org.apache.dubbo.rpc.model.MethodDescriptor; import org.apache.dubbo.rpc.protocol.tri.ReflectionPackableMethod; import org.apache.dubbo.rpc.protocol.tri.RpcInvocationBuildContext; -import org.apache.dubbo.rpc.protocol.tri.TripleHeaderEnum; import org.apache.dubbo.rpc.protocol.tri.h12.AbstractServerTransportListener; import org.apache.dubbo.rpc.protocol.tri.h12.BiStreamServerCallListener; import org.apache.dubbo.rpc.protocol.tri.h12.HttpMessageListener; @@ -64,8 +61,9 @@ public class GenericHttp2ServerTransportListener extends AbstractServerTransport private final ExecutorSupport executorSupport; private final StreamingDecoder streamingDecoder; - private final Http2ServerChannelObserver serverChannelObserver; - + private final FrameworkModel frameworkModel; + private final H2StreamChannel h2StreamChannel; + private Http2ServerChannelObserver serverChannelObserver; private ServerCallListener serverCallListener; public GenericHttp2ServerTransportListener( @@ -77,17 +75,8 @@ public GenericHttp2ServerTransportListener( serverChannelObserver = new Http2ServerCallToObserverAdapter(frameworkModel, h2StreamChannel); serverChannelObserver.setResponseEncoder(JsonCodec.INSTANCE); serverChannelObserver.setStreamingDecoder(streamingDecoder); - serverChannelObserver.setTrailersCustomizer(this::httpTrailersCustomize); - } - - private void httpTrailersCustomize(HttpHeaders httpHeaders, Throwable throwable) { - if (throwable instanceof HttpOverPayloadException) { - HttpOverPayloadException exception = (HttpOverPayloadException) throwable; - httpHeaders.set(TripleHeaderEnum.TRI_STATUS.getHeader(), String.valueOf(exception.getStatusCode())); - httpHeaders.set(TripleHeaderEnum.TRI_MESSAGE.getHeader(), exception.getMessage()); - } else { - httpHeaders.set(TripleHeaderEnum.TRI_STATUS.getHeader(), "0"); - } + this.frameworkModel = frameworkModel; + this.h2StreamChannel = h2StreamChannel; } protected StreamingDecoder newStreamingDecoder() { @@ -128,11 +117,9 @@ protected HttpMessageListener buildHttpMessageListener() { private ServerCallListener startListener( RpcInvocation invocation, MethodDescriptor methodDescriptor, Invoker invoker) { - Http2ServerChannelObserver responseObserver = getServerChannelObserver(); - CancellationContext cancellationContext = RpcContext.getCancellationContext(); - responseObserver.setCancellationContext(cancellationContext); switch (methodDescriptor.getRpcType()) { case UNARY: + onUnary(); boolean applyCustomizeException = false; if (!getContext().isHasStub()) { MethodMetadata methodMetadata = getContext().getMethodMetadata(); @@ -141,19 +128,34 @@ private ServerCallListener startListener( methodMetadata.getActualRequestTypes(), methodMetadata.getActualResponseType()); } - UnaryServerCallListener unaryServerCallListener = startUnary(invocation, invoker, responseObserver); + onListenerStart(); + UnaryServerCallListener unaryServerCallListener = + startUnary(invocation, invoker, getServerChannelObserver()); unaryServerCallListener.setApplyCustomizeException(applyCustomizeException); return unaryServerCallListener; case SERVER_STREAM: - return startServerStreaming(invocation, invoker, responseObserver); + onListenerStart(); + return startServerStreaming(invocation, invoker, getServerChannelObserver()); case BI_STREAM: case CLIENT_STREAM: - return startBiStreaming(invocation, invoker, responseObserver); + onListenerStart(); + return startBiStreaming(invocation, invoker, getServerChannelObserver()); default: throw new IllegalStateException("Can not reach here"); } } + protected void onUnary() { + serverChannelObserver = new Http2ServerUnaryChannelObserver(frameworkModel, h2StreamChannel); + serverChannelObserver.setResponseEncoder(JsonCodec.INSTANCE); + serverChannelObserver.setStreamingDecoder(streamingDecoder); + } + + protected void onListenerStart() { + CancellationContext cancellationContext = RpcContext.getCancellationContext(); + serverChannelObserver.setCancellationContext(cancellationContext); + } + private UnaryServerCallListener startUnary( RpcInvocation invocation, Invoker invoker, Http2ServerChannelObserver responseObserver) { return new UnaryServerCallListener(invocation, invoker, responseObserver); @@ -215,9 +217,8 @@ protected final Http2ServerChannelObserver getServerChannelObserver() { } @Override - public void onStreamClosed() { - // doing on event loop thread - getServerChannelObserver().onStreamClosed(); + public void close() throws Exception { + getServerChannelObserver().close(); } private static class Http2StreamingDecodeListener implements ListeningDecoder.Listener { diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2ServerUnaryChannelObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2ServerUnaryChannelObserver.java new file mode 100644 index 00000000000..d5c16e5c614 --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2ServerUnaryChannelObserver.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.protocol.tri.h12.http2; + +import org.apache.dubbo.remoting.http12.HttpOutputMessage; +import org.apache.dubbo.remoting.http12.h2.H2StreamChannel; +import org.apache.dubbo.rpc.model.FrameworkModel; + +public class Http2ServerUnaryChannelObserver extends Http2ServerCallToObserverAdapter { + + public Http2ServerUnaryChannelObserver(FrameworkModel frameworkModel, H2StreamChannel h2StreamChannel) { + super(frameworkModel, h2StreamChannel); + } + + @Override + public void doOnNext(Object data) throws Throwable { + HttpOutputMessage httpOutputMessage = buildMessage(data); + sendHeader(buildMetadata(resolveStatusCode(data), data, httpOutputMessage)); + sendMessage(httpOutputMessage); + } + + @Override + public void doOnError(Throwable throwable) throws Throwable { + String statusCode = resolveStatusCode(throwable); + Object data = buildErrorResponse(statusCode, throwable); + HttpOutputMessage httpOutputMessage = buildMessage(data); + sendHeader(buildMetadata(statusCode, data, httpOutputMessage)); + sendMessage(httpOutputMessage); + } +} From d6e5fde264dc374972625dc0ef0613722cea39ab Mon Sep 17 00:00:00 2001 From: TomlongTK Date: Wed, 5 Jun 2024 17:33:18 +0800 Subject: [PATCH 4/7] Format code --- .../org/apache/dubbo/remoting/http12/HttpHeaderNames.java | 4 +--- .../apache/dubbo/remoting/http12/HttpTransportListener.java | 3 ++- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpHeaderNames.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpHeaderNames.java index a75f5da1ce0..65623ccacb7 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpHeaderNames.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpHeaderNames.java @@ -27,9 +27,7 @@ public enum HttpHeaderNames { TE("te"), - ACCEPT("accept"), - - ERROR_MESSAGE("error-message"); + ACCEPT("accept"); private final String name; diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpTransportListener.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpTransportListener.java index 185a1717287..57e1419eb01 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpTransportListener.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpTransportListener.java @@ -16,7 +16,8 @@ */ package org.apache.dubbo.remoting.http12; -public interface HttpTransportListener
extends AutoCloseable { +public interface HttpTransportListener
+ extends AutoCloseable { void onMetadata(HEADER metadata); From 0dc3eee2a3f022fde4aee7422568595d27492e9f Mon Sep 17 00:00:00 2001 From: TomlongTK Date: Fri, 7 Jun 2024 11:52:20 +0800 Subject: [PATCH 5/7] Fix some problems --- .../{h1 => }/LimitedByteBufOutputStream.java | 2 +- .../http12/netty4/h1/NettyHttp1Channel.java | 2 +- .../netty4/h2/NettyH2StreamChannel.java | 2 +- .../netty4/h2/NettyHttp2FrameCodec.java | 2 +- .../rpc/protocol/tri/TripleHttp2Protocol.java | 2 ++ .../GrpcHttp2ServerTransportListener.java | 7 +++++ .../h12/grpc/GrpcServerChannelObserver.java | 31 +++++++++++++++++++ .../GenericHttp2ServerTransportListener.java | 7 ++++- .../Http2ServerUnaryChannelObserver.java | 8 +++++ 9 files changed, 58 insertions(+), 5 deletions(-) rename dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/{h1 => }/LimitedByteBufOutputStream.java (97%) create mode 100644 dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcServerChannelObserver.java diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/LimitedByteBufOutputStream.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/LimitedByteBufOutputStream.java similarity index 97% rename from dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/LimitedByteBufOutputStream.java rename to dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/LimitedByteBufOutputStream.java index 160506db282..f9124a924e3 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/LimitedByteBufOutputStream.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/LimitedByteBufOutputStream.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dubbo.remoting.http12.h1; +package org.apache.dubbo.remoting.http12; import org.apache.dubbo.remoting.http12.exception.HttpOverPayloadException; diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1Channel.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1Channel.java index 34ce1169cee..e8d6b21f6ff 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1Channel.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1Channel.java @@ -21,7 +21,7 @@ import org.apache.dubbo.remoting.http12.HttpMetadata; import org.apache.dubbo.remoting.http12.HttpOutputMessage; import org.apache.dubbo.remoting.http12.h1.Http1OutputMessage; -import org.apache.dubbo.remoting.http12.h1.LimitedByteBufOutputStream; +import org.apache.dubbo.remoting.http12.LimitedByteBufOutputStream; import org.apache.dubbo.remoting.http12.netty4.NettyHttpChannelFutureListener; import java.net.SocketAddress; diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyH2StreamChannel.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyH2StreamChannel.java index 2aa9892a92e..ba11b394b28 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyH2StreamChannel.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyH2StreamChannel.java @@ -19,7 +19,7 @@ import org.apache.dubbo.config.nested.TripleConfig; import org.apache.dubbo.remoting.http12.HttpMetadata; import org.apache.dubbo.remoting.http12.HttpOutputMessage; -import org.apache.dubbo.remoting.http12.h1.LimitedByteBufOutputStream; +import org.apache.dubbo.remoting.http12.LimitedByteBufOutputStream; import org.apache.dubbo.remoting.http12.h2.H2StreamChannel; import org.apache.dubbo.remoting.http12.h2.Http2OutputMessage; import org.apache.dubbo.remoting.http12.h2.Http2OutputMessageFrame; diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2FrameCodec.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2FrameCodec.java index 9496c94a003..ccac52f5ffb 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2FrameCodec.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2FrameCodec.java @@ -91,7 +91,7 @@ private Http2InputMessage onHttp2DataFrame(Http2DataFrame dataFrame) { private Http2HeadersFrame encodeHttp2HeadersFrame(Http2Header http2Header) { HttpHeaders headers = http2Header.headers(); - DefaultHttp2Headers http2Headers = new DefaultHttp2Headers(); + DefaultHttp2Headers http2Headers = new DefaultHttp2Headers(false); for (Map.Entry> entry : headers.entrySet()) { String name = entry.getKey(); List value = entry.getValue(); diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java index f34d01b0374..cee75e3ec36 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java @@ -97,6 +97,7 @@ public void configClientPipeline(URL url, ChannelOperator operator, ContextOpera .maxFrameSize(tripleConfig.getMaxFrameSize()) .maxHeaderListSize(tripleConfig.getMaxHeaderListSize())) .frameLogger(CLIENT_LOGGER) + .validateHeaders(false) .build(); // codec.connection().local().flowController().frameWriter(codec.encoder().frameWriter()); List handlers = new ArrayList<>(); @@ -198,6 +199,7 @@ private Http2FrameCodec buildHttp2FrameCodec(TripleConfig tripleConfig) { .maxFrameSize(tripleConfig.getMaxFrameSize()) .maxHeaderListSize(tripleConfig.getMaxHeaderListSize())) .frameLogger(SERVER_LOGGER) + .validateHeaders(false) .build(); } diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java index de69baafe9f..1b25dc57a1c 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java @@ -26,6 +26,7 @@ import org.apache.dubbo.remoting.http12.exception.UnimplementedException; import org.apache.dubbo.remoting.http12.h2.H2StreamChannel; import org.apache.dubbo.remoting.http12.h2.Http2Header; +import org.apache.dubbo.remoting.http12.h2.Http2ServerChannelObserver; import org.apache.dubbo.remoting.http12.h2.Http2TransportListener; import org.apache.dubbo.remoting.http12.message.MethodMetadata; import org.apache.dubbo.remoting.http12.message.StreamingDecoder; @@ -75,6 +76,12 @@ protected StreamingDecoder newStreamingDecoder() { return new GrpcStreamingDecoder(); } + @Override + protected Http2ServerChannelObserver newHttp2ServerChannelObserver( + FrameworkModel frameworkModel, H2StreamChannel h2StreamChannel) { + return new GrpcServerChannelObserver(frameworkModel, h2StreamChannel); + } + @Override protected HttpMessageListener buildHttpMessageListener() { return getContext().isHasStub() ? super.buildHttpMessageListener() : new LazyFindMethodListener(); diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcServerChannelObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcServerChannelObserver.java new file mode 100644 index 00000000000..5b044c400fe --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcServerChannelObserver.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.protocol.tri.h12.grpc; + +import org.apache.dubbo.remoting.http12.h2.H2StreamChannel; +import org.apache.dubbo.rpc.model.FrameworkModel; +import org.apache.dubbo.rpc.protocol.tri.h12.http2.Http2ServerCallToObserverAdapter; + +public class GrpcServerChannelObserver extends Http2ServerCallToObserverAdapter { + + public GrpcServerChannelObserver(FrameworkModel frameworkModel, H2StreamChannel h2StreamChannel) { + super(frameworkModel, h2StreamChannel); + } + + @Override + protected void doOnError(Throwable throwable) {} +} diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java index 0c18f24de8d..9c46944ccf5 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java @@ -72,7 +72,7 @@ public GenericHttp2ServerTransportListener( executorSupport = ExecutorRepository.getInstance(url.getOrDefaultApplicationModel()) .getExecutorSupport(url); streamingDecoder = newStreamingDecoder(); - serverChannelObserver = new Http2ServerCallToObserverAdapter(frameworkModel, h2StreamChannel); + serverChannelObserver = newHttp2ServerChannelObserver(frameworkModel, h2StreamChannel); serverChannelObserver.setResponseEncoder(JsonCodec.INSTANCE); serverChannelObserver.setStreamingDecoder(streamingDecoder); this.frameworkModel = frameworkModel; @@ -83,6 +83,11 @@ protected StreamingDecoder newStreamingDecoder() { return new DefaultStreamingDecoder(); } + protected Http2ServerChannelObserver newHttp2ServerChannelObserver( + FrameworkModel frameworkModel, H2StreamChannel h2StreamChannel) { + return new Http2ServerCallToObserverAdapter(frameworkModel, h2StreamChannel); + } + @Override protected Executor initializeExecutor(Http2Header metadata) { return new SerializingExecutor(executorSupport.getExecutor(metadata)); diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2ServerUnaryChannelObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2ServerUnaryChannelObserver.java index d5c16e5c614..10d2c2a5306 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2ServerUnaryChannelObserver.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2ServerUnaryChannelObserver.java @@ -41,4 +41,12 @@ public void doOnError(Throwable throwable) throws Throwable { sendHeader(buildMetadata(statusCode, data, httpOutputMessage)); sendMessage(httpOutputMessage); } + + @Override + protected void doOnCompleted(Throwable throwable) {} + + @Override + protected HttpOutputMessage encodeHttpOutputMessage(Object data) { + return getHttpChannel().newOutputMessage(true); + } } From b3ed3e4ea7431bfa02535ff4892a9a26f9753a4b Mon Sep 17 00:00:00 2001 From: TomlongTK Date: Fri, 7 Jun 2024 11:58:40 +0800 Subject: [PATCH 6/7] Revert netty new api --- .../dubbo/remoting/http12/LimitedByteBufOutputStream.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/LimitedByteBufOutputStream.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/LimitedByteBufOutputStream.java index f9124a924e3..ac70f88f42a 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/LimitedByteBufOutputStream.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/LimitedByteBufOutputStream.java @@ -28,11 +28,7 @@ public class LimitedByteBufOutputStream extends ByteBufOutputStream { private final int capacity; public LimitedByteBufOutputStream(ByteBuf byteBuf, int capacity) { - this(byteBuf, false, capacity); - } - - public LimitedByteBufOutputStream(ByteBuf byteBuf, boolean releaseOnClose, int capacity) { - super(byteBuf, releaseOnClose); + super(byteBuf); this.capacity = capacity == 0 ? Integer.MAX_VALUE : capacity; } From fbba3fdd1c143fca8424f8759bcf6aa091f33b06 Mon Sep 17 00:00:00 2001 From: TomlongTK Date: Fri, 7 Jun 2024 14:08:49 +0800 Subject: [PATCH 7/7] Code format --- .../dubbo/remoting/http12/netty4/h1/NettyHttp1Channel.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1Channel.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1Channel.java index 588245153c0..086dde1afdc 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1Channel.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1Channel.java @@ -20,8 +20,8 @@ import org.apache.dubbo.remoting.http12.HttpChannel; import org.apache.dubbo.remoting.http12.HttpMetadata; import org.apache.dubbo.remoting.http12.HttpOutputMessage; -import org.apache.dubbo.remoting.http12.h1.Http1OutputMessage; import org.apache.dubbo.remoting.http12.LimitedByteBufOutputStream; +import org.apache.dubbo.remoting.http12.h1.Http1OutputMessage; import org.apache.dubbo.remoting.http12.netty4.NettyHttpChannelFutureListener; import java.net.SocketAddress;