From 68a883d65ab56c9f92448a7f9f17cc037d1df209 Mon Sep 17 00:00:00 2001 From: TomlongTK Date: Thu, 13 Jun 2024 20:02:22 +0800 Subject: [PATCH 1/3] Restore the serialization mode of PackableMethod --- .../http12/h2/Http2ServerChannelObserver.java | 4 +- .../message/LengthFieldStreamingDecoder.java | 11 +- .../http12/message/StreamingDecoder.java | 7 - .../rpc/protocol/tri/DescriptorUtils.java | 7 +- .../tri/h12/grpc/GrpcCompositeCodec.java | 119 +++++++------- .../h12/grpc/GrpcCompositeCodecFactory.java | 7 +- .../GrpcHttp2ServerTransportListener.java | 27 +-- .../h12/grpc/ProtobufHttpMessageCodec.java | 56 ------- .../tri/h12/grpc/WrapperHttpMessageCodec.java | 154 ------------------ .../GenericHttp2ServerTransportListener.java | 4 +- 10 files changed, 78 insertions(+), 318 deletions(-) delete mode 100644 dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/ProtobufHttpMessageCodec.java delete mode 100644 dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/WrapperHttpMessageCodec.java diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java index bd43f2ad926..c3c8f8c9a82 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java @@ -79,7 +79,9 @@ public void cancel(Throwable throwable) { closed(); } } - this.cancellationContext.cancel(throwable); + if (cancellationContext != null) { + cancellationContext.cancel(throwable); + } long errorCode = 0; if (throwable instanceof ErrorCodeHolder) { errorCode = ((ErrorCodeHolder) throwable).getErrorCode(); diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java index 980ccf2ccf7..77821ea431a 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java @@ -20,7 +20,6 @@ import org.apache.dubbo.remoting.http12.exception.DecodeException; import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; @@ -46,8 +45,6 @@ public class LengthFieldStreamingDecoder implements StreamingDecoder { private int requiredLength; - private InputStream dataHeader = new ByteArrayInputStream(new byte[0]); - public LengthFieldStreamingDecoder() { this(4); } @@ -146,16 +143,12 @@ private void deliver() { } private void processHeader() throws IOException { - ByteArrayOutputStream bos = new ByteArrayOutputStream(lengthFieldOffset + lengthFieldLength); byte[] offsetData = new byte[lengthFieldOffset]; int ignore = accumulate.read(offsetData); - bos.write(offsetData); processOffset(new ByteArrayInputStream(offsetData), lengthFieldOffset); byte[] lengthBytes = new byte[lengthFieldLength]; ignore = accumulate.read(lengthBytes); - bos.write(lengthBytes); requiredLength = bytesToInt(lengthBytes); - this.dataHeader = new ByteArrayInputStream(bos.toByteArray()); // Continue reading the frame body. state = DecodeState.PAYLOAD; @@ -183,8 +176,8 @@ private void processBody() throws IOException { requiredLength = lengthFieldOffset + lengthFieldLength; } - protected void invokeListener(InputStream inputStream) { - this.listener.onFragmentMessage(dataHeader, inputStream); + public void invokeListener(InputStream inputStream) { + this.listener.onFragmentMessage(inputStream); } protected byte[] readRawMessage(InputStream inputStream, int length) throws IOException { diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/StreamingDecoder.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/StreamingDecoder.java index 5372eea9c03..c271ef212fe 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/StreamingDecoder.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/StreamingDecoder.java @@ -39,13 +39,6 @@ interface FragmentListener { */ void onFragmentMessage(InputStream rawMessage); - /** - * @param rawMessage raw message - */ - default void onFragmentMessage(InputStream dataHeader, InputStream rawMessage) { - onFragmentMessage(rawMessage); - } - default void onClose() {} } diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DescriptorUtils.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DescriptorUtils.java index 64f25fff4c5..c80eb21b66d 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DescriptorUtils.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DescriptorUtils.java @@ -18,6 +18,7 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.common.constants.CommonConstants; +import org.apache.dubbo.common.io.StreamUtils; import org.apache.dubbo.common.utils.CollectionUtils; import org.apache.dubbo.remoting.http12.exception.UnimplementedException; import org.apache.dubbo.rpc.Invoker; @@ -28,6 +29,8 @@ import org.apache.dubbo.rpc.service.ServiceDescriptorInternalCache; import org.apache.dubbo.rpc.stub.StubSuppliers; +import java.io.IOException; +import java.io.InputStream; import java.util.Arrays; import java.util.List; @@ -124,9 +127,10 @@ public static MethodDescriptor findReflectionMethodDescriptor( } public static MethodDescriptor findTripleMethodDescriptor( - ServiceDescriptor serviceDescriptor, String methodName, byte[] data) { + ServiceDescriptor serviceDescriptor, String methodName, InputStream rawMessage) throws IOException { MethodDescriptor methodDescriptor = findReflectionMethodDescriptor(serviceDescriptor, methodName); if (methodDescriptor == null) { + byte[] data = StreamUtils.readBytes(rawMessage); List methodDescriptors = serviceDescriptor.getMethods(methodName); TripleRequestWrapper request = TripleRequestWrapper.parseFrom(data); String[] paramTypes = request.getArgTypes().toArray(new String[0]); @@ -141,6 +145,7 @@ public static MethodDescriptor findTripleMethodDescriptor( if (methodDescriptor == null) { throw new UnimplementedException("method:" + methodName); } + rawMessage.reset(); } return methodDescriptor; } diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodec.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodec.java index df181003799..aef98805c1b 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodec.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodec.java @@ -16,38 +16,62 @@ */ package org.apache.dubbo.rpc.protocol.tri.h12.grpc; +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.config.ConfigurationUtils; +import org.apache.dubbo.common.io.StreamUtils; +import org.apache.dubbo.common.utils.ArrayUtils; import org.apache.dubbo.remoting.http12.exception.DecodeException; import org.apache.dubbo.remoting.http12.exception.EncodeException; +import org.apache.dubbo.remoting.http12.exception.HttpStatusException; import org.apache.dubbo.remoting.http12.message.HttpMessageCodec; import org.apache.dubbo.remoting.http12.message.MediaType; +import org.apache.dubbo.rpc.model.FrameworkModel; +import org.apache.dubbo.rpc.model.MethodDescriptor; +import org.apache.dubbo.rpc.model.PackableMethod; +import org.apache.dubbo.rpc.model.PackableMethodFactory; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.charset.Charset; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; -import com.google.protobuf.Message; - -import static org.apache.dubbo.common.constants.CommonConstants.PROTOBUF_MESSAGE_CLASS_NAME; +import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_KEY; +import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_PACKABLE_METHOD_FACTORY; public class GrpcCompositeCodec implements HttpMessageCodec { - private final ProtobufHttpMessageCodec protobufHttpMessageCodec; + private static final String PACKABLE_METHOD_CACHE = "PACKABLE_METHOD_CACHE"; - private final WrapperHttpMessageCodec wrapperHttpMessageCodec; + private final URL url; - public GrpcCompositeCodec( - ProtobufHttpMessageCodec protobufHttpMessageCodec, WrapperHttpMessageCodec wrapperHttpMessageCodec) { - this.protobufHttpMessageCodec = protobufHttpMessageCodec; - this.wrapperHttpMessageCodec = wrapperHttpMessageCodec; - } + private final FrameworkModel frameworkModel; + + private final String mediaType; - public void setEncodeTypes(Class[] encodeTypes) { - this.wrapperHttpMessageCodec.setEncodeTypes(encodeTypes); + private PackableMethod packableMethod; + + public GrpcCompositeCodec(URL url, FrameworkModel frameworkModel, String mediaType) { + this.url = url; + this.frameworkModel = frameworkModel; + this.mediaType = mediaType; } - public void setDecodeTypes(Class[] decodeTypes) { - this.wrapperHttpMessageCodec.setDecodeTypes(decodeTypes); + public void loadPackableMethod(MethodDescriptor methodDescriptor) { + if (methodDescriptor instanceof PackableMethod) { + packableMethod = (PackableMethod) methodDescriptor; + return; + } + Map cacheMap = (Map) url.getServiceModel() + .getServiceMetadata() + .getAttributeMap() + .computeIfAbsent(PACKABLE_METHOD_CACHE, k -> new ConcurrentHashMap<>()); + packableMethod = cacheMap.computeIfAbsent(methodDescriptor, md -> frameworkModel + .getExtensionLoader(PackableMethodFactory.class) + .getExtension(ConfigurationUtils.getGlobalConfiguration(url.getApplicationModel()) + .getString(DUBBO_PACKABLE_METHOD_FACTORY, DEFAULT_KEY)) + .create(methodDescriptor, url, mediaType)); } @Override @@ -58,34 +82,38 @@ public void encode(OutputStream outputStream, Object data, Charset charset) thro try { int compressed = 0; outputStream.write(compressed); - if (isProtobuf(data)) { - ProtobufWriter.write(protobufHttpMessageCodec, outputStream, data); - return; - } - // wrapper - wrapperHttpMessageCodec.encode(outputStream, data); - } catch (IOException e) { + byte[] bytes = packableMethod.packResponse(data); + writeLength(outputStream, bytes.length); + outputStream.write(bytes); + } catch (HttpStatusException e) { + throw e; + } catch (Exception e) { throw new EncodeException(e); } } @Override public Object decode(InputStream inputStream, Class targetType, Charset charset) throws DecodeException { - if (isProtoClass(targetType)) { - return protobufHttpMessageCodec.decode(inputStream, targetType, charset); + try { + byte[] data = StreamUtils.readBytes(inputStream); + return packableMethod.parseRequest(data); + } catch (HttpStatusException e) { + throw e; + } catch (Exception e) { + throw new DecodeException(e); } - return wrapperHttpMessageCodec.decode(inputStream, targetType, charset); } @Override public Object[] decode(InputStream inputStream, Class[] targetTypes, Charset charset) throws DecodeException { - if (targetTypes.length > 1) { - return wrapperHttpMessageCodec.decode(inputStream, targetTypes, charset); + Object message = decode(inputStream, ArrayUtils.isEmpty(targetTypes) ? null : targetTypes[0], charset); + if (message instanceof Object[]) { + return (Object[]) message; } - return HttpMessageCodec.super.decode(inputStream, targetTypes, charset); + return new Object[] {message}; } - private static void writeLength(OutputStream outputStream, int length) { + private void writeLength(OutputStream outputStream, int length) { try { outputStream.write(((length >> 24) & 0xFF)); outputStream.write(((length >> 16) & 0xFF)); @@ -100,39 +128,4 @@ private static void writeLength(OutputStream outputStream, int length) { public MediaType mediaType() { return MediaType.APPLICATION_GRPC; } - - private static boolean isProtobuf(Object data) { - if (data == null) { - return false; - } - return isProtoClass(data.getClass()); - } - - private static boolean isProtoClass(Class clazz) { - while (clazz != Object.class && clazz != null) { - Class[] interfaces = clazz.getInterfaces(); - if (interfaces.length > 0) { - for (Class clazzInterface : interfaces) { - if (PROTOBUF_MESSAGE_CLASS_NAME.equalsIgnoreCase(clazzInterface.getName())) { - return true; - } - } - } - clazz = clazz.getSuperclass(); - } - return false; - } - - /** - * lazy init protobuf class - */ - private static class ProtobufWriter { - - private static void write(HttpMessageCodec codec, OutputStream outputStream, Object data) { - int serializedSize = ((Message) data).getSerializedSize(); - // write length - writeLength(outputStream, serializedSize); - codec.encode(outputStream, data); - } - } } diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodecFactory.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodecFactory.java index 553c661cc0b..1fa6842b7dd 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodecFactory.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodecFactory.java @@ -22,7 +22,6 @@ import org.apache.dubbo.remoting.http12.message.HttpMessageDecoderFactory; import org.apache.dubbo.remoting.http12.message.HttpMessageEncoderFactory; import org.apache.dubbo.remoting.http12.message.MediaType; -import org.apache.dubbo.remoting.utils.UrlUtils; import org.apache.dubbo.rpc.model.FrameworkModel; @Activate @@ -30,11 +29,7 @@ public class GrpcCompositeCodecFactory implements HttpMessageEncoderFactory, Htt @Override public HttpMessageCodec createCodec(URL url, FrameworkModel frameworkModel, String mediaType) { - String serializeName = UrlUtils.serializationOrDefault(url); - WrapperHttpMessageCodec wrapperHttpMessageCodec = new WrapperHttpMessageCodec(url, frameworkModel); - wrapperHttpMessageCodec.setSerializeType(serializeName); - ProtobufHttpMessageCodec protobufHttpMessageCodec = new ProtobufHttpMessageCodec(); - return new GrpcCompositeCodec(protobufHttpMessageCodec, wrapperHttpMessageCodec); + return new GrpcCompositeCodec(url, frameworkModel, mediaType); } @Override diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java index 1b25dc57a1c..d8a57e205a1 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java @@ -18,7 +18,6 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.common.constants.CommonConstants; -import org.apache.dubbo.common.io.StreamUtils; import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.remoting.http12.HttpHeaders; @@ -28,11 +27,11 @@ import org.apache.dubbo.remoting.http12.h2.Http2Header; import org.apache.dubbo.remoting.http12.h2.Http2ServerChannelObserver; import org.apache.dubbo.remoting.http12.h2.Http2TransportListener; -import org.apache.dubbo.remoting.http12.message.MethodMetadata; import org.apache.dubbo.remoting.http12.message.StreamingDecoder; import org.apache.dubbo.rpc.RpcInvocation; import org.apache.dubbo.rpc.TriRpcStatus; import org.apache.dubbo.rpc.model.FrameworkModel; +import org.apache.dubbo.rpc.model.MethodDescriptor; import org.apache.dubbo.rpc.protocol.tri.DescriptorUtils; import org.apache.dubbo.rpc.protocol.tri.RpcInvocationBuildContext; import org.apache.dubbo.rpc.protocol.tri.compressor.DeCompressor; @@ -40,8 +39,6 @@ import org.apache.dubbo.rpc.protocol.tri.h12.HttpMessageListener; import org.apache.dubbo.rpc.protocol.tri.h12.http2.GenericHttp2ServerTransportListener; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; @@ -155,39 +152,29 @@ public void onMessage(InputStream inputStream) { private class DetermineMethodDescriptorListener implements StreamingDecoder.FragmentListener { - @Override - public void onFragmentMessage(InputStream rawMessage) {} - @Override public void onClose() { getStreamingDecoder().close(); } @Override - public void onFragmentMessage(InputStream dataHeader, InputStream rawMessage) { + public void onFragmentMessage(InputStream rawMessage) { try { - ByteArrayOutputStream merged = - new ByteArrayOutputStream(dataHeader.available() + rawMessage.available()); - StreamUtils.copy(dataHeader, merged); - byte[] data = StreamUtils.readBytes(rawMessage); - RpcInvocationBuildContext context = getContext(); if (null == context.getMethodDescriptor()) { - context.setMethodDescriptor(DescriptorUtils.findTripleMethodDescriptor( - context.getServiceDescriptor(), context.getMethodName(), data)); + MethodDescriptor methodDescriptor = DescriptorUtils.findTripleMethodDescriptor( + context.getServiceDescriptor(), context.getMethodName(), rawMessage); + context.setMethodDescriptor(methodDescriptor); setHttpMessageListener(GrpcHttp2ServerTransportListener.super.buildHttpMessageListener()); // replace decoder GrpcCompositeCodec grpcCompositeCodec = (GrpcCompositeCodec) context.getHttpMessageDecoder(); - MethodMetadata methodMetadata = context.getMethodMetadata(); - grpcCompositeCodec.setDecodeTypes(methodMetadata.getActualRequestTypes()); - grpcCompositeCodec.setEncodeTypes(new Class[] {methodMetadata.getActualResponseType()}); + grpcCompositeCodec.loadPackableMethod(methodDescriptor); getServerChannelObserver().setResponseEncoder(grpcCompositeCodec); } - merged.write(data); - getHttpMessageListener().onMessage(new ByteArrayInputStream(merged.toByteArray())); + getStreamingDecoder().invokeListener(rawMessage); } catch (IOException e) { throw new DecodeException(e); } diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/ProtobufHttpMessageCodec.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/ProtobufHttpMessageCodec.java deleted file mode 100644 index 81de958115a..00000000000 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/ProtobufHttpMessageCodec.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dubbo.rpc.protocol.tri.h12.grpc; - -import org.apache.dubbo.remoting.http12.exception.DecodeException; -import org.apache.dubbo.remoting.http12.exception.EncodeException; -import org.apache.dubbo.remoting.http12.message.HttpMessageCodec; -import org.apache.dubbo.remoting.http12.message.MediaType; -import org.apache.dubbo.rpc.protocol.tri.SingleProtobufUtils; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.charset.Charset; - -public class ProtobufHttpMessageCodec implements HttpMessageCodec { - - private static final MediaType MEDIA_TYPE = new MediaType("application", "x-protobuf"); - - @Override - public void encode(OutputStream outputStream, Object data, Charset charset) throws EncodeException { - try { - SingleProtobufUtils.serialize(data, outputStream); - } catch (IOException e) { - throw new EncodeException(e); - } - } - - @Override - public Object decode(InputStream inputStream, Class targetType, Charset charset) throws DecodeException { - try { - return SingleProtobufUtils.deserialize(inputStream, targetType); - } catch (IOException e) { - throw new DecodeException(e); - } - } - - @Override - public MediaType mediaType() { - return MEDIA_TYPE; - } -} diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/WrapperHttpMessageCodec.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/WrapperHttpMessageCodec.java deleted file mode 100644 index f7993c1ba86..00000000000 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/WrapperHttpMessageCodec.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dubbo.rpc.protocol.tri.h12.grpc; - -import org.apache.dubbo.common.URL; -import org.apache.dubbo.common.constants.CommonConstants; -import org.apache.dubbo.common.serialize.MultipleSerialization; -import org.apache.dubbo.config.Constants; -import org.apache.dubbo.remoting.http12.exception.DecodeException; -import org.apache.dubbo.remoting.http12.exception.EncodeException; -import org.apache.dubbo.remoting.http12.message.HttpMessageCodec; -import org.apache.dubbo.remoting.http12.message.MediaType; -import org.apache.dubbo.remoting.transport.CodecSupport; -import org.apache.dubbo.rpc.model.FrameworkModel; -import org.apache.dubbo.rpc.protocol.tri.TripleConstant; -import org.apache.dubbo.rpc.protocol.tri.TripleCustomerProtocolWapper; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.charset.Charset; - -public class WrapperHttpMessageCodec implements HttpMessageCodec { - - private static final MediaType MEDIA_TYPE = new MediaType("application", "triple+wrapper"); - - private static final String DEFAULT_SERIALIZE_TYPE = "fastjson2"; - - private final MultipleSerialization serialization; - - private final URL url; - - private Class[] encodeTypes; - - private Class[] decodeTypes; - - private String serializeType = DEFAULT_SERIALIZE_TYPE; - - public WrapperHttpMessageCodec(URL url, FrameworkModel frameworkModel) { - this.url = url; - this.serialization = frameworkModel - .getExtensionLoader(MultipleSerialization.class) - .getExtension(url.getParameter(Constants.MULTI_SERIALIZATION_KEY, CommonConstants.DEFAULT_KEY)); - } - - public void setSerializeType(String serializeType) { - this.serializeType = serializeType; - } - - public void setEncodeTypes(Class[] encodeTypes) { - this.encodeTypes = encodeTypes; - } - - public void setDecodeTypes(Class[] decodeTypes) { - this.decodeTypes = decodeTypes; - } - - @Override - public void encode(OutputStream outputStream, Object data, Charset charset) throws EncodeException { - try { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - serialization.serialize(url, serializeType, encodeTypes[0], data, bos); - byte[] encoded = TripleCustomerProtocolWapper.TripleResponseWrapper.Builder.newBuilder() - .setSerializeType(serializeType) - .setType(encodeTypes[0].getName()) - .setData(bos.toByteArray()) - .build() - .toByteArray(); - writeLength(outputStream, encoded.length); - outputStream.write(encoded); - } catch (IOException e) { - throw new EncodeException(e); - } - } - - @Override - public void encode(OutputStream outputStream, Object[] data, Charset charset) throws EncodeException { - // TODO - } - - @Override - public Object decode(InputStream inputStream, Class targetType, Charset charset) throws DecodeException { - Object[] decode = this.decode(inputStream, new Class[] {targetType}, charset); - if (decode == null || decode.length == 0) { - return null; - } - return decode[0]; - } - - @Override - public Object[] decode(InputStream inputStream, Class[] targetTypes, Charset charset) throws DecodeException { - try { - int len; - byte[] data = new byte[4096]; - ByteArrayOutputStream bos = new ByteArrayOutputStream(4096); - while ((len = inputStream.read(data)) != -1) { - bos.write(data, 0, len); - } - TripleCustomerProtocolWapper.TripleRequestWrapper wrapper = - TripleCustomerProtocolWapper.TripleRequestWrapper.parseFrom(bos.toByteArray()); - final String serializeType = convertHessianFromWrapper(wrapper.getSerializeType()); - CodecSupport.checkSerialization(serializeType, url); - setSerializeType(wrapper.getSerializeType()); - Object[] ret = new Object[wrapper.getArgs().size()]; - for (int i = 0; i < wrapper.getArgs().size(); i++) { - ByteArrayInputStream in = - new ByteArrayInputStream(wrapper.getArgs().get(i)); - try { - ret[i] = this.serialization.deserialize(url, wrapper.getSerializeType(), targetTypes[i], in); - } catch (ClassNotFoundException e) { - throw new DecodeException(e); - } - } - return ret; - } catch (IOException e) { - throw new DecodeException(e); - } - } - - @Override - public MediaType mediaType() { - return MEDIA_TYPE; - } - - private static void writeLength(OutputStream outputStream, int length) throws IOException { - outputStream.write(((length >> 24) & 0xFF)); - outputStream.write(((length >> 16) & 0xFF)); - outputStream.write(((length >> 8) & 0xFF)); - outputStream.write((length & 0xFF)); - } - - private static String convertHessianFromWrapper(String serializeType) { - if (TripleConstant.HESSIAN4.equals(serializeType)) { - return TripleConstant.HESSIAN2; - } - return serializeType; - } -} diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java index 9c46944ccf5..500e2abf398 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java @@ -210,7 +210,9 @@ protected void onFinally(Http2InputMessage message) {} @Override public void cancelByRemote(long errorCode) { serverChannelObserver.cancel(CancelStreamException.fromRemote(errorCode)); - serverCallListener.onCancel(errorCode); + if (serverCallListener != null) { + serverCallListener.onCancel(errorCode); + } } protected StreamingDecoder getStreamingDecoder() { From 648fae0d5faaa7107f232db48a1f5f6b7fccf371 Mon Sep 17 00:00:00 2001 From: TomlongTK Date: Mon, 17 Jun 2024 17:59:11 +0800 Subject: [PATCH 2/3] Add custom function when Setting Method Descriptor --- .../tri/h12/AbstractServerTransportListener.java | 3 +++ .../h12/grpc/GrpcHttp2ServerTransportListener.java | 13 ++++++++----- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/AbstractServerTransportListener.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/AbstractServerTransportListener.java index d856dfad945..1060ea2c0ce 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/AbstractServerTransportListener.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/AbstractServerTransportListener.java @@ -204,6 +204,7 @@ protected RpcInvocation buildRpcInvocation(RpcInvocationBuildContext context) { methodDescriptor = DescriptorUtils.findMethodDescriptor( context.getServiceDescriptor(), context.getMethodName(), context.isHasStub()); context.setMethodDescriptor(methodDescriptor); + onSettingMethodDescriptor(methodDescriptor); } MethodMetadata methodMetadata = context.getMethodMetadata(); if (methodMetadata == null) { @@ -280,4 +281,6 @@ protected final HttpMessageListener getHttpMessageListener() { protected void setHttpMessageListener(HttpMessageListener httpMessageListener) { this.httpMessageListener = httpMessageListener; } + + protected void onSettingMethodDescriptor(MethodDescriptor methodDescriptor) {} } diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java index 82eaa8f1090..a0bf8f6366a 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java @@ -135,6 +135,13 @@ protected GrpcStreamingDecoder getStreamingDecoder() { return (GrpcStreamingDecoder) super.getStreamingDecoder(); } + @Override + protected void onSettingMethodDescriptor(MethodDescriptor methodDescriptor) { + GrpcCompositeCodec grpcCompositeCodec = (GrpcCompositeCodec) getContext().getHttpMessageDecoder(); + grpcCompositeCodec.loadPackableMethod(methodDescriptor); + super.onSettingMethodDescriptor(methodDescriptor); + } + private class LazyFindMethodListener implements HttpMessageListener { private final StreamingDecoder streamingDecoder; @@ -166,13 +173,9 @@ public void onFragmentMessage(InputStream rawMessage) { MethodDescriptor methodDescriptor = DescriptorUtils.findTripleMethodDescriptor( context.getServiceDescriptor(), context.getMethodName(), rawMessage); context.setMethodDescriptor(methodDescriptor); + onSettingMethodDescriptor(methodDescriptor); setHttpMessageListener(GrpcHttp2ServerTransportListener.super.buildHttpMessageListener()); - - // replace decoder - GrpcCompositeCodec grpcCompositeCodec = (GrpcCompositeCodec) context.getHttpMessageDecoder(); - grpcCompositeCodec.loadPackableMethod(methodDescriptor); - getServerChannelObserver().setResponseEncoder(grpcCompositeCodec); } getStreamingDecoder().invokeListener(rawMessage); From 3ed9ac277fee10bf806c633a25fd46f5f77bc90a Mon Sep 17 00:00:00 2001 From: TomlongTK Date: Mon, 17 Jun 2024 18:02:51 +0800 Subject: [PATCH 3/3] Format code --- .../tri/h12/grpc/GrpcHttp2ServerTransportListener.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java index a0bf8f6366a..3aa1bc0ac77 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java @@ -137,7 +137,8 @@ protected GrpcStreamingDecoder getStreamingDecoder() { @Override protected void onSettingMethodDescriptor(MethodDescriptor methodDescriptor) { - GrpcCompositeCodec grpcCompositeCodec = (GrpcCompositeCodec) getContext().getHttpMessageDecoder(); + GrpcCompositeCodec grpcCompositeCodec = + (GrpcCompositeCodec) getContext().getHttpMessageDecoder(); grpcCompositeCodec.loadPackableMethod(methodDescriptor); super.onSettingMethodDescriptor(methodDescriptor); }