diff --git a/labs/arthas-grpc-server/README.md b/labs/arthas-grpc-server/README.md new file mode 100644 index 0000000000..20bf89a4c4 --- /dev/null +++ b/labs/arthas-grpc-server/README.md @@ -0,0 +1,3 @@ +# Arthas Grpc + +这个模块提供了一个轻量级的 Grpc 实现,目前任在开发中 \ No newline at end of file diff --git a/labs/arthas-grpc-server/pom.xml b/labs/arthas-grpc-server/pom.xml new file mode 100644 index 0000000000..0074082ace --- /dev/null +++ b/labs/arthas-grpc-server/pom.xml @@ -0,0 +1,170 @@ + + + + + arthas-all + com.taobao.arthas + ${revision} + ../../pom.xml + + 4.0.0 + arthas-grpc-server + arthas-grpc-server + https://github.com/alibaba/arthas + + + 8 + 8 + UTF-8 + 1.46.0 + + + + + + io.grpc + grpc-bom + ${grpc.version} + pom + import + + + + + + + + + io.netty + netty-codec-http2 + 4.1.72.Final + + + + com.google.protobuf + protobuf-java + 3.19.2 + + + + + org.slf4j + slf4j-api + 2.0.12 + + + + + ch.qos.logback + logback-classic + 1.5.0 + + + + com.taobao.arthas + arthas-common + ${project.version} + + + + + + io.grpc + grpc-netty + provided + + + io.netty + netty-codec-http2 + + + + + io.grpc + grpc-services + provided + + + org.junit.vintage + junit-vintage-engine + test + + + org.junit.jupiter + junit-jupiter + test + + + + javax.annotation + javax.annotation-api + 1.3.2 + provided + true + + + com.alibaba.arthas + arthas-repackage-logger + + + ch.qos.logback + logback-classic + + + ch.qos.logback + logback-core + + + org.slf4j + slf4j-api + + + + + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + 0.6.1 + + ${basedir}/src/main/proto + com.google.protobuf:protoc:3.11.0:exe:${os.detected.classifier} + grpc-java + io.grpc:protoc-gen-grpc-java:1.28.0:exe:${os.detected.classifier} + + + + + compile + compile-custom + + + + + + + + kr.motd.maven + os-maven-plugin + 1.4.1.Final + + + + + + + mac + + + mac + + + + osx-x86_64 + + + + \ No newline at end of file diff --git a/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/ArthasGrpcBootstrap.java b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/ArthasGrpcBootstrap.java new file mode 100644 index 0000000000..c953ffc954 --- /dev/null +++ b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/ArthasGrpcBootstrap.java @@ -0,0 +1,13 @@ +package com.taobao.arthas.grpc.server; + +/** + * @author: FengYe + * @date: 2024/10/13 02:40 + * @description: ArthasGrpcServerBootstrap + */ +public class ArthasGrpcBootstrap { + public static void main(String[] args) { + ArthasGrpcServer arthasGrpcServer = new ArthasGrpcServer(9090, null); + arthasGrpcServer.start(); + } +} diff --git a/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/ArthasGrpcServer.java b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/ArthasGrpcServer.java new file mode 100644 index 0000000000..a931f54284 --- /dev/null +++ b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/ArthasGrpcServer.java @@ -0,0 +1,73 @@ +package com.taobao.arthas.grpc.server; + +import com.alibaba.arthas.deps.ch.qos.logback.classic.Level; +import com.alibaba.arthas.deps.ch.qos.logback.classic.LoggerContext; +import com.alibaba.arthas.deps.org.slf4j.Logger; +import com.alibaba.arthas.deps.org.slf4j.LoggerFactory; +import com.taobao.arthas.grpc.server.handler.GrpcDispatcher; +import com.taobao.arthas.grpc.server.handler.Http2Handler; +import com.taobao.arthas.grpc.server.handler.executor.GrpcExecutorFactory; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http2.Http2FrameCodecBuilder; +import io.netty.util.concurrent.DefaultEventExecutorGroup; +import io.netty.util.concurrent.EventExecutorGroup; + +import java.lang.invoke.MethodHandles; + +/** + * @author: FengYe + * @date: 2024/7/3 上午12:30 + * @description: ArthasGrpcServer + */ +public class ArthasGrpcServer { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass().getName()); + + private int port = 9090; + + private String grpcServicePackageName; + + public ArthasGrpcServer(int port, String grpcServicePackageName) { + this.port = port; + this.grpcServicePackageName = grpcServicePackageName; + } + + public void start() { + EventLoopGroup bossGroup = new NioEventLoopGroup(1); + EventLoopGroup workerGroup = new NioEventLoopGroup(10); + + GrpcDispatcher grpcDispatcher = new GrpcDispatcher(); + grpcDispatcher.loadGrpcService(grpcServicePackageName); + GrpcExecutorFactory grpcExecutorFactory = new GrpcExecutorFactory(); + grpcExecutorFactory.loadExecutor(grpcDispatcher); + + try { + ServerBootstrap b = new ServerBootstrap(); + b.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .option(ChannelOption.SO_BACKLOG, 1024) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) { + ch.pipeline().addLast(Http2FrameCodecBuilder.forServer().build()); + ch.pipeline().addLast(new Http2Handler(grpcDispatcher, grpcExecutorFactory)); + } + }); + Channel channel = b.bind(port).sync().channel(); + logger.info("ArthasGrpcServer start successfully on port: {}", port); + channel.closeFuture().sync(); + } catch (InterruptedException e) { + logger.error("ArthasGrpcServer start error", e); + } finally { + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + } +} diff --git a/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/GrpcDispatcher.java b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/GrpcDispatcher.java new file mode 100644 index 0000000000..4ad3313f11 --- /dev/null +++ b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/GrpcDispatcher.java @@ -0,0 +1,198 @@ +package com.taobao.arthas.grpc.server.handler; + + +import com.alibaba.arthas.deps.org.slf4j.Logger; +import com.alibaba.arthas.deps.org.slf4j.LoggerFactory; +import com.taobao.arthas.grpc.server.handler.annotation.GrpcMethod; +import com.taobao.arthas.grpc.server.handler.annotation.GrpcService; +import com.taobao.arthas.grpc.server.handler.constant.GrpcInvokeTypeEnum; +import com.taobao.arthas.grpc.server.utils.ReflectUtil; + +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodType; +import java.lang.reflect.Method; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * @author: FengYe + * @date: 2024/9/6 01:12 + * @description: GrpcDelegrate + */ +public class GrpcDispatcher { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass().getName()); + + public static final String DEFAULT_GRPC_SERVICE_PACKAGE_NAME = "com.taobao.arthas.grpc.server.service.impl"; + + public static Map grpcInvokeMap = new HashMap<>(); + +// public static Map clientStreamInvokeMap = new HashMap<>(); + + public static Map requestParseFromMap = new HashMap<>(); + + public static Map requestToByteArrayMap = new HashMap<>(); + + public static Map responseParseFromMap = new HashMap<>(); + + public static Map responseToByteArrayMap = new HashMap<>(); + + public static Map grpcInvokeTypeMap = new HashMap<>(); + + public void loadGrpcService(String grpcServicePackageName) { + List> classes = ReflectUtil.findClasses(Optional.ofNullable(grpcServicePackageName).orElse(DEFAULT_GRPC_SERVICE_PACKAGE_NAME)); + for (Class clazz : classes) { + if (clazz.isAnnotationPresent(GrpcService.class)) { + try { + // 处理 service + GrpcService grpcService = clazz.getAnnotation(GrpcService.class); + Object instance = clazz.getDeclaredConstructor().newInstance(); + // 处理 method + MethodHandles.Lookup lookup = MethodHandles.lookup(); + Method[] declaredMethods = clazz.getDeclaredMethods(); + for (Method method : declaredMethods) { + if (method.isAnnotationPresent(GrpcMethod.class)) { + GrpcMethod grpcMethod = method.getAnnotation(GrpcMethod.class); + MethodHandle grpcInvoke = lookup.unreflect(method); + String grpcMethodKey = generateGrpcMethodKey(grpcService.value(), grpcMethod.value()); + grpcInvokeTypeMap.put(grpcMethodKey, grpcMethod.grpcType()); + grpcInvokeMap.put(grpcMethodKey, grpcInvoke.bindTo(instance)); + + + Class requestClass = null; + Class responseClass = null; + if (GrpcInvokeTypeEnum.UNARY.equals(grpcMethod.grpcType())) { + requestClass = grpcInvoke.type().parameterType(1); + responseClass = grpcInvoke.type().returnType(); + } else if (GrpcInvokeTypeEnum.CLIENT_STREAM.equals(grpcMethod.grpcType()) || GrpcInvokeTypeEnum.BI_STREAM.equals(grpcMethod.grpcType())) { + responseClass = getInnerGenericClass(method.getGenericParameterTypes()[0]); + requestClass = getInnerGenericClass(method.getGenericReturnType()); + } else if (GrpcInvokeTypeEnum.SERVER_STREAM.equals(grpcMethod.grpcType())) { + requestClass = getInnerGenericClass(method.getGenericParameterTypes()[0]); + responseClass = getInnerGenericClass(method.getGenericParameterTypes()[1]); + } + MethodHandle requestParseFrom = lookup.findStatic(requestClass, "parseFrom", MethodType.methodType(requestClass, byte[].class)); + MethodHandle responseParseFrom = lookup.findStatic(responseClass, "parseFrom", MethodType.methodType(responseClass, byte[].class)); + MethodHandle requestToByteArray = lookup.findVirtual(requestClass, "toByteArray", MethodType.methodType(byte[].class)); + MethodHandle responseToByteArray = lookup.findVirtual(responseClass, "toByteArray", MethodType.methodType(byte[].class)); + requestParseFromMap.put(grpcMethodKey, requestParseFrom); + responseParseFromMap.put(grpcMethodKey, responseParseFrom); + requestToByteArrayMap.put(grpcMethodKey, requestToByteArray); + responseToByteArrayMap.put(grpcMethodKey, responseToByteArray); + + +// switch (grpcMethod.grpcType()) { +// case UNARY: +// unaryInvokeMap.put(grpcMethodKey, grpcInvoke.bindTo(instance)); +// return; +// case CLIENT_STREAM: +// Object invoke = grpcInvoke.bindTo(instance).invoke(); +// if (!(invoke instanceof StreamObserver)) { +// throw new RuntimeException(grpcMethodKey + " return class is not StreamObserver!"); +// } +// clientStreamInvokeMap.put(grpcMethodKey, (StreamObserver) invoke); +// return; +// case SERVER_STREAM: +// return; +// case BI_STREAM: +// return; +// } + } + } + } catch (Throwable e) { + logger.error("GrpcDispatcher loadGrpcService error.", e); + } + } + } + } + + public GrpcResponse doUnaryExecute(String service, String method, byte[] arg) throws Throwable { + MethodHandle methodHandle = grpcInvokeMap.get(generateGrpcMethodKey(service, method)); + MethodType type = grpcInvokeMap.get(generateGrpcMethodKey(service, method)).type(); + Object req = requestParseFromMap.get(generateGrpcMethodKey(service, method)).invoke(arg); + Object execute = methodHandle.invoke(req); + GrpcResponse grpcResponse = new GrpcResponse(); + grpcResponse.setClazz(type.returnType()); + grpcResponse.setService(service); + grpcResponse.setMethod(method); + grpcResponse.writeResponseData(execute); + return grpcResponse; + } + + public GrpcResponse unaryExecute(GrpcRequest request) throws Throwable { + MethodHandle methodHandle = grpcInvokeMap.get(request.getGrpcMethodKey()); + MethodType type = grpcInvokeMap.get(request.getGrpcMethodKey()).type(); + Object req = requestParseFromMap.get(request.getGrpcMethodKey()).invoke(request.readData()); + Object execute = methodHandle.invoke(req); + GrpcResponse grpcResponse = new GrpcResponse(); + grpcResponse.setClazz(type.returnType()); + grpcResponse.setService(request.getService()); + grpcResponse.setMethod(request.getMethod()); + grpcResponse.writeResponseData(execute); + return grpcResponse; + } + + public StreamObserver clientStreamExecute(GrpcRequest request, StreamObserver responseObserver) throws Throwable { + MethodHandle methodHandle = grpcInvokeMap.get(request.getGrpcMethodKey()); + return (StreamObserver) methodHandle.invoke(responseObserver); + } + + public void serverStreamExecute(GrpcRequest request, StreamObserver responseObserver) throws Throwable { + MethodHandle methodHandle = grpcInvokeMap.get(request.getGrpcMethodKey()); + Object req = requestParseFromMap.get(request.getGrpcMethodKey()).invoke(request.readData()); + methodHandle.invoke(req, responseObserver); + } + + public StreamObserver biStreamExecute(GrpcRequest request, StreamObserver responseObserver) throws Throwable { + MethodHandle methodHandle = grpcInvokeMap.get(request.getGrpcMethodKey()); + return (StreamObserver) methodHandle.invoke(responseObserver); + } + + /** + * 获取指定 service method 对应的入参类型 + * + * @param serviceName + * @param methodName + * @return + */ + public static Class getRequestClass(String serviceName, String methodName) { + //protobuf 规范只能有单入参 + return Optional.ofNullable(grpcInvokeMap.get(generateGrpcMethodKey(serviceName, methodName))).orElseThrow(() -> new RuntimeException("The specified grpc method does not exist")).type().parameterArray()[0]; + } + + public static String generateGrpcMethodKey(String serviceName, String methodName) { + return serviceName + "." + methodName; + } + + public static void checkGrpcType(GrpcRequest request) { + request.setGrpcType( + Optional.ofNullable(grpcInvokeTypeMap.get(generateGrpcMethodKey(request.getService(), request.getMethod()))) + .orElse(GrpcInvokeTypeEnum.UNARY) + ); + request.setStreamFirstData(true); + } + + public static Class getInnerGenericClass(Type type) { + if (type instanceof Class) { + return (Class) type; + } + if (type instanceof ParameterizedType) { + ParameterizedType paramType = (ParameterizedType) type; + Type[] actualTypeArguments = paramType.getActualTypeArguments(); + if (actualTypeArguments.length > 0) { + Type innerType = actualTypeArguments[0]; // 获取第一个实际类型参数 + if (innerType instanceof ParameterizedType) { + return getInnerGenericClass(innerType); // 递归调用获取最内层类型 + } else if (innerType instanceof Class) { + return (Class) innerType; // 直接返回 Class 类型 + } + } + } + return null; // 如果没有找到对应的类型 + } +} diff --git a/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/GrpcRequest.java b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/GrpcRequest.java new file mode 100644 index 0000000000..53b6fb1341 --- /dev/null +++ b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/GrpcRequest.java @@ -0,0 +1,193 @@ +package com.taobao.arthas.grpc.server.handler; + +import com.taobao.arthas.grpc.server.handler.constant.GrpcInvokeTypeEnum; +import com.taobao.arthas.grpc.server.utils.ByteUtil; +import io.netty.buffer.ByteBuf; +import io.netty.handler.codec.http2.Http2Headers; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.zip.GZIPInputStream; + +/** + * @author: FengYe + * @date: 2024/9/4 23:07 + * @description: GrpcRequest grpc 请求体 + */ +public class GrpcRequest { + + /** + * 请求对应的 streamId + */ + private Integer streamId; + + /** + * 请求的 service + */ + private String service; + + /** + * 请求的 method + */ + private String method; + + /** + * 二进制数据,可能包含多个 grpc body,每个 body 都带有 5 个 byte 的前缀,分别是 boolean compressed - int length + */ + private ByteBuf byteData; + + /** + * 二进制数据的长度 + */ + private int length; + + /** + * 请求class + */ + private Class clazz; + + /** + * 是否是 grpc 流式请求 + */ + private boolean stream; + + /** + * 是否是 grpc 流式请求的第一个data + */ + private boolean streamFirstData; + + /** + * http2 headers + */ + private Http2Headers headers; + + /** + * grpc 调用类型 + */ + private GrpcInvokeTypeEnum grpcType; + + + public GrpcRequest(Integer streamId, String path, String method) { + this.streamId = streamId; + this.service = path; + this.method = method; + this.byteData = ByteUtil.newByteBuf(); + } + + public void writeData(ByteBuf byteBuf) { + byte[] bytes = ByteUtil.getBytes(byteBuf); + if (bytes.length == 0) { + return; + } + byte[] decompressedData = decompressGzip(bytes); + if (decompressedData == null) { + return; + } + byteData.writeBytes(ByteUtil.newByteBuf(decompressedData)); + } + + /** + * 读取部分数据 + * + * @return + */ + public synchronized byte[] readData() { + if (byteData.readableBytes() == 0) { + return null; + } + boolean compressed = byteData.readBoolean(); + int length = byteData.readInt(); + byte[] bytes = new byte[length]; + byteData.readBytes(bytes); + return bytes; + } + + public void clearData() { + byteData.clear(); + } + + private byte[] decompressGzip(byte[] compressedData) { + boolean isGzip = (compressedData.length > 2 && (compressedData[0] & 0xff) == 0x1f && (compressedData[1] & 0xff) == 0x8b); + if (isGzip) { + try (InputStream byteStream = new ByteArrayInputStream(compressedData); + GZIPInputStream gzipStream = new GZIPInputStream(byteStream); + ByteArrayOutputStream out = new ByteArrayOutputStream()) { + + byte[] buffer = new byte[1024]; + int len; + while ((len = gzipStream.read(buffer)) != -1) { + out.write(buffer, 0, len); + } + return out.toByteArray(); + } catch (IOException e) { + System.err.println("Failed to decompress GZIP data: " + e.getMessage()); + // Optionally rethrow the exception or return an Optional + return null; // or throw new RuntimeException(e); + } + } else { + return compressedData; + } + } + + public String getGrpcMethodKey() { + return service + "." + method; + } + + public Integer getStreamId() { + return streamId; + } + + public String getService() { + return service; + } + + public String getMethod() { + return method; + } + + public ByteBuf getByteData() { + return byteData; + } + + public Class getClazz() { + return clazz; + } + + public void setClazz(Class clazz) { + this.clazz = clazz; + } + + public boolean isStream() { + return stream; + } + + public void setStream(boolean stream) { + this.stream = stream; + } + + public boolean isStreamFirstData() { + return streamFirstData; + } + + public void setStreamFirstData(boolean streamFirstData) { + this.streamFirstData = streamFirstData; + } + + public Http2Headers getHeaders() { + return headers; + } + + public void setHeaders(Http2Headers headers) { + this.headers = headers; + } + + public GrpcInvokeTypeEnum getGrpcType() { + return grpcType; + } + + public void setGrpcType(GrpcInvokeTypeEnum grpcType) { + this.grpcType = grpcType; + } +} diff --git a/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/GrpcResponse.java b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/GrpcResponse.java new file mode 100644 index 0000000000..a275dd0144 --- /dev/null +++ b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/GrpcResponse.java @@ -0,0 +1,114 @@ +package com.taobao.arthas.grpc.server.handler; + + +import arthas.grpc.common.ArthasGrpc; +import com.taobao.arthas.grpc.server.handler.annotation.GrpcMethod; +import com.taobao.arthas.grpc.server.handler.annotation.GrpcService; +import com.taobao.arthas.grpc.server.utils.ByteUtil; +import io.netty.buffer.ByteBuf; +import io.netty.handler.codec.http2.DefaultHttp2Headers; +import io.netty.handler.codec.http2.Http2Headers; + +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Map; + +/** + * @author: FengYe + * @date: 2024/9/5 02:05 + * @description: GrpcResponse + */ +public class GrpcResponse { + + private Map headers; + + /** + * 请求的 service + */ + private String service; + + /** + * 请求的 method + */ + private String method; + + /** + * 二进制数据 + */ + private ByteBuf byteData; + + /** + * 响应class + */ + private Class clazz; + + { + headers = new HashMap<>(); + headers.put("content-type", "application/grpc"); + headers.put("grpc-encoding", "identity"); + headers.put("grpc-accept-encoding", "identity,deflate,gzip"); + } + + public GrpcResponse() { + } + + public GrpcResponse(Method method) { + this.service = method.getDeclaringClass().getAnnotation(GrpcService.class).value(); + this.method = method.getAnnotation(GrpcMethod.class).value(); + } + + public Http2Headers getEndHeader() { + Http2Headers endHeader = new DefaultHttp2Headers().status("200"); + headers.forEach(endHeader::set); + return endHeader; + } + + public Http2Headers getEndStreamHeader() { + return new DefaultHttp2Headers().set("grpc-status", "0"); + } + + public static Http2Headers getDefaultEndStreamHeader() { + return new DefaultHttp2Headers().set("grpc-status", "0"); + } + + public ByteBuf getResponseData() { + return byteData; + } + + public void writeResponseData(Object response) { + byte[] encode = null; + try { + if (ArthasGrpc.ErrorRes.class.equals(clazz)) { + encode = ((ArthasGrpc.ErrorRes) response).toByteArray(); + } else { + encode = (byte[]) GrpcDispatcher.responseToByteArrayMap.get(GrpcDispatcher.generateGrpcMethodKey(service, method)).invoke(response); + } + } catch (Throwable e) { + throw new RuntimeException(e); + } + this.byteData = ByteUtil.newByteBuf(); + this.byteData.writeBoolean(false); + this.byteData.writeInt(encode.length); + this.byteData.writeBytes(encode); + } + + public void setClazz(Class clazz) { + this.clazz = clazz; + } + + public String getService() { + return service; + } + + public void setService(String service) { + this.service = service; + } + + public String getMethod() { + return method; + } + + public void setMethod(String method) { + this.method = method; + } +} diff --git a/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/Http2FrameRequest.java b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/Http2FrameRequest.java new file mode 100644 index 0000000000..1e8058dcce --- /dev/null +++ b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/Http2FrameRequest.java @@ -0,0 +1,16 @@ +package com.taobao.arthas.grpc.server.handler; + +import java.util.List; + +/** + * @author: FengYe + * @date: 2024/9/18 23:12 + * @description: 一个 http2 的 frame 中可能存在多个 grpc 的请求体 + */ +public class Http2FrameRequest { + + /** + * grpc 请求体 + */ + private List grpcRequests; +} diff --git a/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/Http2Handler.java b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/Http2Handler.java new file mode 100644 index 0000000000..d0068b213f --- /dev/null +++ b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/Http2Handler.java @@ -0,0 +1,113 @@ +package com.taobao.arthas.grpc.server.handler; + + +import arthas.grpc.common.ArthasGrpc; +import com.alibaba.arthas.deps.org.slf4j.Logger; +import com.alibaba.arthas.deps.org.slf4j.LoggerFactory; +import com.taobao.arthas.grpc.server.handler.executor.GrpcExecutorFactory; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.handler.codec.http2.*; +import io.netty.util.concurrent.EventExecutorGroup; + +import java.io.*; +import java.lang.invoke.MethodHandles; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author: FengYe + * @date: 2024/7/7 下午9:58 + * @description: Http2Handler + */ +public class Http2Handler extends SimpleChannelInboundHandler { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass().getName()); + + private GrpcDispatcher grpcDispatcher; + + private GrpcExecutorFactory grpcExecutorFactory; + + private final EventExecutorGroup executorGroup = new NioEventLoopGroup(); + + /** + * 暂存收到的所有请求的数据 + */ + private ConcurrentHashMap dataBuffer = new ConcurrentHashMap<>(); + + private static final String HEADER_PATH = ":path"; + + public Http2Handler(GrpcDispatcher grpcDispatcher, GrpcExecutorFactory grpcExecutorFactory) { + this.grpcDispatcher = grpcDispatcher; + this.grpcExecutorFactory = grpcExecutorFactory; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + super.channelRead(ctx, msg); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Http2Frame frame) throws IOException { + if (frame instanceof Http2HeadersFrame) { + handleGrpcRequest((Http2HeadersFrame) frame, ctx); + } else if (frame instanceof Http2DataFrame) { + handleGrpcData((Http2DataFrame) frame, ctx); + } else if (frame instanceof Http2ResetFrame) { + handleResetStream((Http2ResetFrame) frame, ctx); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + cause.printStackTrace(); + ctx.close(); + } + + private void handleGrpcRequest(Http2HeadersFrame headersFrame, ChannelHandlerContext ctx) { + int id = headersFrame.stream().id(); + String path = headersFrame.headers().get(HEADER_PATH).toString(); + // 去掉前面的斜杠,然后按斜杠分割 + String[] parts = path.substring(1).split("/"); + GrpcRequest grpcRequest = new GrpcRequest(headersFrame.stream().id(), parts[0], parts[1]); + grpcRequest.setHeaders(headersFrame.headers()); + GrpcDispatcher.checkGrpcType(grpcRequest); + dataBuffer.put(id, grpcRequest); + System.out.println("Received headers: " + headersFrame.headers()); + } + + private void handleGrpcData(Http2DataFrame dataFrame, ChannelHandlerContext ctx) throws IOException { + int streamId = dataFrame.stream().id(); + GrpcRequest grpcRequest = dataBuffer.get(streamId); + ByteBuf content = dataFrame.content(); + grpcRequest.writeData(content); + + executorGroup.execute(() -> { + try { + grpcExecutorFactory.getExecutor(grpcRequest.getGrpcType()).execute(grpcRequest, dataFrame, ctx); + } catch (Throwable e) { + logger.error("handleGrpcData error", e); + processError(ctx, e, dataFrame.stream()); + } + }); + } + + private void handleResetStream(Http2ResetFrame resetFrame, ChannelHandlerContext ctx) { + int id = resetFrame.stream().id(); + System.out.println("handleResetStream"); + dataBuffer.remove(id); + } + + private void processError(ChannelHandlerContext ctx, Throwable e, Http2FrameStream stream) { + GrpcResponse response = new GrpcResponse(); + ArthasGrpc.ErrorRes.Builder builder = ArthasGrpc.ErrorRes.newBuilder(); + ArthasGrpc.ErrorRes errorRes = builder.setErrorMsg(Optional.ofNullable(e.getMessage()).orElse("")).build(); + response.setClazz(ArthasGrpc.ErrorRes.class); + response.writeResponseData(errorRes); + ctx.writeAndFlush(new DefaultHttp2HeadersFrame(response.getEndHeader()).stream(stream)); + ctx.writeAndFlush(new DefaultHttp2DataFrame(response.getResponseData()).stream(stream)); + ctx.writeAndFlush(new DefaultHttp2HeadersFrame(response.getEndStreamHeader(), true).stream(stream)); + } +} \ No newline at end of file diff --git a/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/StreamObserver.java b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/StreamObserver.java new file mode 100644 index 0000000000..570661ee3b --- /dev/null +++ b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/StreamObserver.java @@ -0,0 +1,13 @@ +package com.taobao.arthas.grpc.server.handler; + +/** + * @author: FengYe + * @date: 2024/10/24 00:22 + * @description: StreamObserver + */ +public interface StreamObserver { + + void onNext(V req); + + void onCompleted(); +} diff --git a/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/annotation/GrpcMethod.java b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/annotation/GrpcMethod.java new file mode 100644 index 0000000000..dec0e2bef0 --- /dev/null +++ b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/annotation/GrpcMethod.java @@ -0,0 +1,23 @@ +package com.taobao.arthas.grpc.server.handler.annotation; + +import com.taobao.arthas.grpc.server.handler.constant.GrpcInvokeTypeEnum; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * @author: FengYe + * @date: 2024/9/6 01:57 + * @description: GrpcMethod + */ +@Target({ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +public @interface GrpcMethod { + String value() default ""; + + boolean stream() default false; + + GrpcInvokeTypeEnum grpcType() default GrpcInvokeTypeEnum.UNARY; +} diff --git a/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/annotation/GrpcService.java b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/annotation/GrpcService.java new file mode 100644 index 0000000000..c54dc7d624 --- /dev/null +++ b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/annotation/GrpcService.java @@ -0,0 +1,17 @@ +package com.taobao.arthas.grpc.server.handler.annotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * @author: FengYe + * @date: 2024/9/6 01:57 + * @description: GrpcService + */ +@Target({ElementType.TYPE}) +@Retention(RetentionPolicy.RUNTIME) +public @interface GrpcService { + String value() default ""; +} diff --git a/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/constant/GrpcInvokeTypeEnum.java b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/constant/GrpcInvokeTypeEnum.java new file mode 100644 index 0000000000..46fb252b15 --- /dev/null +++ b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/constant/GrpcInvokeTypeEnum.java @@ -0,0 +1,13 @@ +package com.taobao.arthas.grpc.server.handler.constant; + +/** + * @author: FengYe + * @date: 2024/10/24 01:06 + * @description: StreamTypeEnum + */ +public enum GrpcInvokeTypeEnum { + UNARY, + SERVER_STREAM, + CLIENT_STREAM, + BI_STREAM; +} diff --git a/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/executor/AbstractGrpcExecutor.java b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/executor/AbstractGrpcExecutor.java new file mode 100644 index 0000000000..d814749dd3 --- /dev/null +++ b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/executor/AbstractGrpcExecutor.java @@ -0,0 +1,22 @@ +package com.taobao.arthas.grpc.server.handler.executor; + +import com.taobao.arthas.grpc.server.handler.GrpcDispatcher; +import com.taobao.arthas.grpc.server.handler.GrpcRequest; +import com.taobao.arthas.grpc.server.handler.StreamObserver; + +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author: FengYe + * @date: 2024/10/24 02:07 + * @description: AbstractGrpcExecutor + */ +public abstract class AbstractGrpcExecutor implements GrpcExecutor{ + protected GrpcDispatcher dispatcher; + + protected ConcurrentHashMap> requestStreamObserverMap = new ConcurrentHashMap<>(); + + public AbstractGrpcExecutor(GrpcDispatcher dispatcher) { + this.dispatcher = dispatcher; + } +} diff --git a/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/executor/BiStreamExecutor.java b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/executor/BiStreamExecutor.java new file mode 100644 index 0000000000..1291463997 --- /dev/null +++ b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/executor/BiStreamExecutor.java @@ -0,0 +1,66 @@ +package com.taobao.arthas.grpc.server.handler.executor; + +import com.taobao.arthas.grpc.server.handler.GrpcDispatcher; +import com.taobao.arthas.grpc.server.handler.GrpcRequest; +import com.taobao.arthas.grpc.server.handler.GrpcResponse; +import com.taobao.arthas.grpc.server.handler.StreamObserver; +import com.taobao.arthas.grpc.server.handler.constant.GrpcInvokeTypeEnum; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http2.DefaultHttp2DataFrame; +import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame; +import io.netty.handler.codec.http2.Http2DataFrame; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * @author: FengYe + * @date: 2024/10/24 01:52 + * @description: BiStreamProcessor + */ +public class BiStreamExecutor extends AbstractGrpcExecutor { + + public BiStreamExecutor(GrpcDispatcher dispatcher) { + super(dispatcher); + } + + @Override + public GrpcInvokeTypeEnum supportGrpcType() { + return GrpcInvokeTypeEnum.BI_STREAM; + } + + @Override + public void execute(GrpcRequest request, Http2DataFrame frame, ChannelHandlerContext context) throws Throwable { + Integer streamId = request.getStreamId(); + + StreamObserver requestObserver = requestStreamObserverMap.computeIfAbsent(streamId, id->{ + StreamObserver responseObserver = new StreamObserver() { + AtomicBoolean sendHeader = new AtomicBoolean(false); + + @Override + public void onNext(GrpcResponse res) { + // 控制流只能响应一次header + if (!sendHeader.get()) { + sendHeader.compareAndSet(false, true); + context.writeAndFlush(new DefaultHttp2HeadersFrame(res.getEndHeader()).stream(frame.stream())); + } + context.writeAndFlush(new DefaultHttp2DataFrame(res.getResponseData()).stream(frame.stream())); + } + + @Override + public void onCompleted() { + context.writeAndFlush(new DefaultHttp2HeadersFrame(GrpcResponse.getDefaultEndStreamHeader(), true).stream(frame.stream())); + } + }; + try { + return dispatcher.biStreamExecute(request, responseObserver); + } catch (Throwable e) { + throw new RuntimeException(e); + } + }); + + requestObserver.onNext(request); + if (frame.isEndStream()) { + requestObserver.onCompleted(); + } + } +} diff --git a/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/executor/ClientStreamExecutor.java b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/executor/ClientStreamExecutor.java new file mode 100644 index 0000000000..d58547928a --- /dev/null +++ b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/executor/ClientStreamExecutor.java @@ -0,0 +1,66 @@ +package com.taobao.arthas.grpc.server.handler.executor; + +import com.taobao.arthas.grpc.server.handler.GrpcDispatcher; +import com.taobao.arthas.grpc.server.handler.GrpcRequest; +import com.taobao.arthas.grpc.server.handler.GrpcResponse; +import com.taobao.arthas.grpc.server.handler.StreamObserver; +import com.taobao.arthas.grpc.server.handler.constant.GrpcInvokeTypeEnum; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http2.DefaultHttp2DataFrame; +import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame; +import io.netty.handler.codec.http2.Http2DataFrame; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * @author: FengYe + * @date: 2024/10/24 01:51 + * @description: UnaryProcessor + */ +public class ClientStreamExecutor extends AbstractGrpcExecutor { + + public ClientStreamExecutor(GrpcDispatcher dispatcher) { + super(dispatcher); + } + + @Override + public GrpcInvokeTypeEnum supportGrpcType() { + return GrpcInvokeTypeEnum.CLIENT_STREAM; + } + + @Override + public void execute(GrpcRequest request, Http2DataFrame frame, ChannelHandlerContext context) throws Throwable { + Integer streamId = request.getStreamId(); + + StreamObserver requestObserver = requestStreamObserverMap.computeIfAbsent(streamId, id->{ + StreamObserver responseObserver = new StreamObserver() { + AtomicBoolean sendHeader = new AtomicBoolean(false); + + @Override + public void onNext(GrpcResponse res) { + // 控制流只能响应一次header + if (!sendHeader.get()) { + sendHeader.compareAndSet(false, true); + context.writeAndFlush(new DefaultHttp2HeadersFrame(res.getEndHeader()).stream(frame.stream())); + } + context.writeAndFlush(new DefaultHttp2DataFrame(res.getResponseData()).stream(frame.stream())); + } + + @Override + public void onCompleted() { + context.writeAndFlush(new DefaultHttp2HeadersFrame(GrpcResponse.getDefaultEndStreamHeader(), true).stream(frame.stream())); + } + }; + try { + return dispatcher.clientStreamExecute(request, responseObserver); + } catch (Throwable e) { + throw new RuntimeException(e); + } + }); + + requestObserver.onNext(request); + if (frame.isEndStream()) { + requestObserver.onCompleted(); + } + } +} diff --git a/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/executor/GrpcExecutor.java b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/executor/GrpcExecutor.java new file mode 100644 index 0000000000..6f1fa7f00b --- /dev/null +++ b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/executor/GrpcExecutor.java @@ -0,0 +1,17 @@ +package com.taobao.arthas.grpc.server.handler.executor; + +import com.taobao.arthas.grpc.server.handler.GrpcRequest; +import com.taobao.arthas.grpc.server.handler.constant.GrpcInvokeTypeEnum; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http2.Http2DataFrame; + +/** + * @author: FengYe + * @date: 2024/10/24 01:50 + * @description: GrpcProcessor + */ +public interface GrpcExecutor { + GrpcInvokeTypeEnum supportGrpcType(); + + void execute(GrpcRequest request, Http2DataFrame frame, ChannelHandlerContext context) throws Throwable; +} diff --git a/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/executor/GrpcExecutorFactory.java b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/executor/GrpcExecutorFactory.java new file mode 100644 index 0000000000..c102a25764 --- /dev/null +++ b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/executor/GrpcExecutorFactory.java @@ -0,0 +1,55 @@ +package com.taobao.arthas.grpc.server.handler.executor; + +import com.alibaba.arthas.deps.org.slf4j.Logger; +import com.alibaba.arthas.deps.org.slf4j.LoggerFactory; +import com.taobao.arthas.grpc.server.handler.GrpcDispatcher; +import com.taobao.arthas.grpc.server.handler.constant.GrpcInvokeTypeEnum; +import com.taobao.arthas.grpc.server.utils.ReflectUtil; + +import java.lang.invoke.MethodHandles; +import java.lang.reflect.Constructor; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @author: FengYe + * @date: 2024/10/24 01:56 + * @description: GrpcExecutorFactory + */ +public class GrpcExecutorFactory { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass().getName()); + + public static final String DEFAULT_GRPC_EXECUTOR_PACKAGE_NAME = "com.taobao.arthas.grpc.server.handler.executor"; + + private final Map map = new HashMap<>(); + + public void loadExecutor(GrpcDispatcher dispatcher) { + List> classes = ReflectUtil.findClasses(DEFAULT_GRPC_EXECUTOR_PACKAGE_NAME); + for (Class clazz : classes) { + if (GrpcExecutor.class.isAssignableFrom(clazz)) { + try { + if (AbstractGrpcExecutor.class.equals(clazz) || GrpcExecutor.class.equals(clazz)) { + continue; + } + if (AbstractGrpcExecutor.class.isAssignableFrom(clazz)) { + Constructor constructor = clazz.getConstructor(GrpcDispatcher.class); + GrpcExecutor executor = (GrpcExecutor) constructor.newInstance(dispatcher); + map.put(executor.supportGrpcType(), executor); + } else { + Constructor constructor = clazz.getConstructor(); + GrpcExecutor executor = (GrpcExecutor) constructor.newInstance(); + map.put(executor.supportGrpcType(), executor); + } + } catch (Exception e) { + logger.error("GrpcExecutorFactory loadExecutor error", e); + } + } + } + } + + public GrpcExecutor getExecutor(GrpcInvokeTypeEnum grpcType) { + return map.get(grpcType); + } +} diff --git a/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/executor/ServerStreamExecutor.java b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/executor/ServerStreamExecutor.java new file mode 100644 index 0000000000..5192fa41de --- /dev/null +++ b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/executor/ServerStreamExecutor.java @@ -0,0 +1,57 @@ +package com.taobao.arthas.grpc.server.handler.executor; + +import com.taobao.arthas.grpc.server.handler.GrpcDispatcher; +import com.taobao.arthas.grpc.server.handler.GrpcRequest; +import com.taobao.arthas.grpc.server.handler.GrpcResponse; +import com.taobao.arthas.grpc.server.handler.StreamObserver; +import com.taobao.arthas.grpc.server.handler.constant.GrpcInvokeTypeEnum; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http2.DefaultHttp2DataFrame; +import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame; +import io.netty.handler.codec.http2.Http2DataFrame; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * @author: FengYe + * @date: 2024/10/24 01:51 + * @description: UnaryProcessor + */ +public class ServerStreamExecutor extends AbstractGrpcExecutor { + + public ServerStreamExecutor(GrpcDispatcher dispatcher) { + super(dispatcher); + } + + @Override + public GrpcInvokeTypeEnum supportGrpcType() { + return GrpcInvokeTypeEnum.SERVER_STREAM; + } + + @Override + public void execute(GrpcRequest request, Http2DataFrame frame, ChannelHandlerContext context) throws Throwable { + StreamObserver responseObserver = new StreamObserver() { + AtomicBoolean sendHeader = new AtomicBoolean(false); + + @Override + public void onNext(GrpcResponse res) { + // 控制流只能响应一次header + if (!sendHeader.get()) { + sendHeader.compareAndSet(false, true); + context.writeAndFlush(new DefaultHttp2HeadersFrame(res.getEndHeader()).stream(frame.stream())); + } + context.writeAndFlush(new DefaultHttp2DataFrame(res.getResponseData()).stream(frame.stream())); + } + + @Override + public void onCompleted() { + context.writeAndFlush(new DefaultHttp2HeadersFrame(GrpcResponse.getDefaultEndStreamHeader(), true).stream(frame.stream())); + } + }; + try { + dispatcher.serverStreamExecute(request, responseObserver); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } +} diff --git a/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/executor/UnaryExecutor.java b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/executor/UnaryExecutor.java new file mode 100644 index 0000000000..0995bd3f89 --- /dev/null +++ b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/executor/UnaryExecutor.java @@ -0,0 +1,38 @@ +package com.taobao.arthas.grpc.server.handler.executor; + +import com.taobao.arthas.grpc.server.handler.GrpcDispatcher; +import com.taobao.arthas.grpc.server.handler.GrpcRequest; +import com.taobao.arthas.grpc.server.handler.GrpcResponse; +import com.taobao.arthas.grpc.server.handler.constant.GrpcInvokeTypeEnum; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http2.DefaultHttp2DataFrame; +import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame; +import io.netty.handler.codec.http2.Http2DataFrame; + +/** + * @author: FengYe + * @date: 2024/10/24 01:51 + * @description: UnaryProcessor + */ +public class UnaryExecutor extends AbstractGrpcExecutor { + + public UnaryExecutor(GrpcDispatcher dispatcher) { + super(dispatcher); + } + + @Override + public GrpcInvokeTypeEnum supportGrpcType() { + return GrpcInvokeTypeEnum.UNARY; + } + + @Override + public void execute(GrpcRequest request, Http2DataFrame frame, ChannelHandlerContext context) throws Throwable { + // 一元调用,等到 endStream 再响应 + if (frame.isEndStream()) { + GrpcResponse response = dispatcher.unaryExecute(request); + context.writeAndFlush(new DefaultHttp2HeadersFrame(response.getEndHeader()).stream(frame.stream())); + context.writeAndFlush(new DefaultHttp2DataFrame(response.getResponseData()).stream(frame.stream())); + context.writeAndFlush(new DefaultHttp2HeadersFrame(response.getEndStreamHeader(), true).stream(frame.stream())); + } + } +} diff --git a/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/service/ArthasSampleService.java b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/service/ArthasSampleService.java new file mode 100644 index 0000000000..f0098f93c5 --- /dev/null +++ b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/service/ArthasSampleService.java @@ -0,0 +1,26 @@ +package com.taobao.arthas.grpc.server.service; + +import arthas.grpc.unittest.ArthasUnittest; +import com.taobao.arthas.grpc.server.handler.GrpcRequest; +import com.taobao.arthas.grpc.server.handler.GrpcResponse; +import com.taobao.arthas.grpc.server.handler.StreamObserver; + + +/** + * @author: FengYe + * @date: 2024/6/30 下午11:42 + * @description: ArthasSampleService + */ +public interface ArthasSampleService { + ArthasUnittest.ArthasUnittestResponse unary(ArthasUnittest.ArthasUnittestRequest command); + + ArthasUnittest.ArthasUnittestResponse unaryAddSum(ArthasUnittest.ArthasUnittestRequest command); + + ArthasUnittest.ArthasUnittestResponse unaryGetSum(ArthasUnittest.ArthasUnittestRequest command); + + StreamObserver> clientStreamSum(StreamObserver> observer); + + void serverStream(ArthasUnittest.ArthasUnittestRequest request, StreamObserver> observer); + + StreamObserver> biStream(StreamObserver> observer); +} diff --git a/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/service/impl/ArthasSampleServiceImpl.java b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/service/impl/ArthasSampleServiceImpl.java new file mode 100644 index 0000000000..a3961cb31b --- /dev/null +++ b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/service/impl/ArthasSampleServiceImpl.java @@ -0,0 +1,133 @@ +package com.taobao.arthas.grpc.server.service.impl; + +import arthas.grpc.unittest.ArthasUnittest; +import com.google.protobuf.InvalidProtocolBufferException; +import com.taobao.arthas.grpc.server.handler.GrpcRequest; +import com.taobao.arthas.grpc.server.handler.GrpcResponse; +import com.taobao.arthas.grpc.server.handler.StreamObserver; +import com.taobao.arthas.grpc.server.handler.annotation.GrpcMethod; +import com.taobao.arthas.grpc.server.handler.annotation.GrpcService; +import com.taobao.arthas.grpc.server.handler.constant.GrpcInvokeTypeEnum; +import com.taobao.arthas.grpc.server.service.ArthasSampleService; +import com.taobao.arthas.grpc.server.utils.ByteUtil; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author: FengYe + * @date: 2024/6/30 下午11:43 + * @description: ArthasSampleServiceImpl + */ +@GrpcService("arthas.grpc.unittest.ArthasUnittestService") +public class ArthasSampleServiceImpl implements ArthasSampleService { + + private ConcurrentHashMap map = new ConcurrentHashMap<>(); + + @Override + @GrpcMethod(value = "unary") + public ArthasUnittest.ArthasUnittestResponse unary(ArthasUnittest.ArthasUnittestRequest command) { + ArthasUnittest.ArthasUnittestResponse.Builder builder = ArthasUnittest.ArthasUnittestResponse.newBuilder(); + builder.setMessage(command.getMessage()); + return builder.build(); + } + + @Override + @GrpcMethod(value = "unaryAddSum") + public ArthasUnittest.ArthasUnittestResponse unaryAddSum(ArthasUnittest.ArthasUnittestRequest command) { + ArthasUnittest.ArthasUnittestResponse.Builder builder = ArthasUnittest.ArthasUnittestResponse.newBuilder(); + builder.setMessage(command.getMessage()); + map.merge(command.getId(), command.getNum(), Integer::sum); + return builder.build(); + } + + @Override + @GrpcMethod(value = "unaryGetSum") + public ArthasUnittest.ArthasUnittestResponse unaryGetSum(ArthasUnittest.ArthasUnittestRequest command) { + ArthasUnittest.ArthasUnittestResponse.Builder builder = ArthasUnittest.ArthasUnittestResponse.newBuilder(); + builder.setMessage(command.getMessage()); + Integer sum = map.getOrDefault(command.getId(), 0); + builder.setNum(sum); + return builder.build(); + } + + @Override + @GrpcMethod(value = "clientStreamSum", grpcType = GrpcInvokeTypeEnum.CLIENT_STREAM) + public StreamObserver> clientStreamSum(StreamObserver> observer) { + return new StreamObserver>() { + AtomicInteger sum = new AtomicInteger(0); + + @Override + public void onNext(GrpcRequest req) { + try { + byte[] bytes = req.readData(); + while (bytes != null && bytes.length != 0) { + ArthasUnittest.ArthasUnittestRequest request = ArthasUnittest.ArthasUnittestRequest.parseFrom(bytes); + sum.addAndGet(request.getNum()); + bytes = req.readData(); + } + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + } + + @Override + public void onCompleted() { + ArthasUnittest.ArthasUnittestResponse response = ArthasUnittest.ArthasUnittestResponse.newBuilder() + .setNum(sum.get()) + .build(); + GrpcResponse grpcResponse = new GrpcResponse<>(); + grpcResponse.setService("arthas.grpc.unittest.ArthasUnittestService"); + grpcResponse.setMethod("clientStreamSum"); + grpcResponse.writeResponseData(response); + observer.onNext(grpcResponse); + observer.onCompleted(); + } + }; + } + + @Override + @GrpcMethod(value = "serverStream", grpcType = GrpcInvokeTypeEnum.SERVER_STREAM) + public void serverStream(ArthasUnittest.ArthasUnittestRequest request, StreamObserver> observer) { + + for (int i = 0; i < 5; i++) { + ArthasUnittest.ArthasUnittestResponse response = ArthasUnittest.ArthasUnittestResponse.newBuilder() + .setMessage("Server response " + i + " to " + request.getMessage()) + .build(); + GrpcResponse grpcResponse = new GrpcResponse<>(); + grpcResponse.setService("arthas.grpc.unittest.ArthasUnittestService"); + grpcResponse.setMethod("serverStream"); + grpcResponse.writeResponseData(response); + observer.onNext(grpcResponse); + } + observer.onCompleted(); + } + + @Override + @GrpcMethod(value = "biStream", grpcType = GrpcInvokeTypeEnum.BI_STREAM) + public StreamObserver> biStream(StreamObserver> observer) { + return new StreamObserver>() { + @Override + public void onNext(GrpcRequest req) { + try { + byte[] bytes = req.readData(); + while (bytes != null && bytes.length != 0) { + GrpcResponse grpcResponse = new GrpcResponse<>(); + grpcResponse.setService("arthas.grpc.unittest.ArthasUnittestService"); + grpcResponse.setMethod("biStream"); + grpcResponse.writeResponseData(ArthasUnittest.ArthasUnittestResponse.parseFrom(bytes)); + observer.onNext(grpcResponse); + bytes = req.readData(); + } + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + } + + @Override + public void onCompleted() { + observer.onCompleted(); + } + }; + } +} \ No newline at end of file diff --git a/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/utils/ByteUtil.java b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/utils/ByteUtil.java new file mode 100644 index 0000000000..2bd239521d --- /dev/null +++ b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/utils/ByteUtil.java @@ -0,0 +1,33 @@ +package com.taobao.arthas.grpc.server.utils; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; + +/** + * @author: FengYe + * @date: 2024/9/5 00:51 + * @description: ByteUtil + */ +public class ByteUtil { + + public static ByteBuf newByteBuf() { + return PooledByteBufAllocator.DEFAULT.buffer(); + } + + public static ByteBuf newByteBuf(byte[] bytes) { + return PooledByteBufAllocator.DEFAULT.buffer(bytes.length).writeBytes(bytes); + } + + public static byte[] getBytes(ByteBuf buf) { + if (buf.hasArray()) { + // 如果 ByteBuf 是一个支持底层数组的实现,直接获取数组 + return buf.array(); + } else { + // 创建一个新的 byte 数组 + byte[] bytes = new byte[buf.readableBytes()]; + // 将 ByteBuf 的内容复制到 byte 数组中 + buf.getBytes(buf.readerIndex(), bytes); + return bytes; + } + } +} diff --git a/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/utils/ReflectUtil.java b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/utils/ReflectUtil.java new file mode 100644 index 0000000000..c6e3e053ed --- /dev/null +++ b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/utils/ReflectUtil.java @@ -0,0 +1,35 @@ +package com.taobao.arthas.grpc.server.utils; + +import java.io.File; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; + +/** + * @author: FengYe + * @date: 2024/9/6 02:20 + * @description: ReflectUtil + */ +public class ReflectUtil { + public static List> findClasses(String packageName) { + List> classes = new ArrayList<>(); + String path = packageName.replace('.', '/'); + try { + URL resource = Thread.currentThread().getContextClassLoader().getResource(path); + if (resource != null) { + File directory = new File(resource.toURI()); + if (directory.exists()) { + for (File file : directory.listFiles()) { + if (file.isFile() && file.getName().endsWith(".class")) { + String className = packageName + '.' + file.getName().replace(".class", ""); + classes.add(Class.forName(className)); + } + } + } + } + } catch (Exception e) { + + } + return classes; + } +} diff --git a/labs/arthas-grpc-server/src/main/proto/arthasGrpc.proto b/labs/arthas-grpc-server/src/main/proto/arthasGrpc.proto new file mode 100644 index 0000000000..103bb9b4c4 --- /dev/null +++ b/labs/arthas-grpc-server/src/main/proto/arthasGrpc.proto @@ -0,0 +1,7 @@ +syntax = "proto3"; + +package arthas.grpc.common; + +message ErrorRes { + string errorMsg = 1; +} \ No newline at end of file diff --git a/labs/arthas-grpc-server/src/main/proto/arthasUnittest.proto b/labs/arthas-grpc-server/src/main/proto/arthasUnittest.proto new file mode 100644 index 0000000000..6b925dcce9 --- /dev/null +++ b/labs/arthas-grpc-server/src/main/proto/arthasUnittest.proto @@ -0,0 +1,24 @@ +syntax = "proto3"; + +package arthas.grpc.unittest; + +service ArthasUnittestService { + rpc unary(ArthasUnittestRequest) returns (ArthasUnittestResponse); + rpc unaryAddSum(ArthasUnittestRequest) returns (ArthasUnittestResponse); + rpc unaryGetSum(ArthasUnittestRequest) returns (ArthasUnittestResponse); + rpc clientStreamSum(stream ArthasUnittestRequest) returns (ArthasUnittestResponse); + rpc serverStream(ArthasUnittestRequest) returns (stream ArthasUnittestResponse); + rpc biStream(stream ArthasUnittestRequest) returns (stream ArthasUnittestResponse); +} + +message ArthasUnittestRequest { + int32 id = 1; + string message = 2; + int32 num = 3; +} + +message ArthasUnittestResponse{ + int32 id = 1; + string message = 2; + int32 num = 3; +} diff --git a/labs/arthas-grpc-server/src/test/java/unittest/grpc/GrpcTest.java b/labs/arthas-grpc-server/src/test/java/unittest/grpc/GrpcTest.java new file mode 100644 index 0000000000..0181a94258 --- /dev/null +++ b/labs/arthas-grpc-server/src/test/java/unittest/grpc/GrpcTest.java @@ -0,0 +1,268 @@ +package unittest.grpc; + +import arthas.grpc.unittest.ArthasUnittest; +import arthas.grpc.unittest.ArthasUnittestServiceGrpc; +import com.taobao.arthas.grpc.server.ArthasGrpcServer; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; + +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author: FengYe + * @date: 2024/9/24 00:17 + * @description: GrpcUnaryTest + */ +public class GrpcTest { + private static final String HOST = "localhost"; + private static final int PORT = 9090; + private static final String HOST_PORT = HOST + ":" + PORT; + private static final String UNIT_TEST_GRPC_SERVICE_PACKAGE_NAME = "unittest.grpc.service.impl"; + private ArthasUnittestServiceGrpc.ArthasUnittestServiceBlockingStub blockingStub = null; + Random random = new Random(); + ExecutorService threadPool = Executors.newFixedThreadPool(10); + + @Before + public void startServer() { + Thread grpcWebProxyStart = new Thread(() -> { + ArthasGrpcServer arthasGrpcServer = new ArthasGrpcServer(PORT, UNIT_TEST_GRPC_SERVICE_PACKAGE_NAME); + arthasGrpcServer.start(); + }); + grpcWebProxyStart.start(); + } + + @Test + public void testUnary() { + ManagedChannel channel = ManagedChannelBuilder.forTarget(HOST_PORT) + .usePlaintext() + .build(); + + ArthasUnittestServiceGrpc.ArthasUnittestServiceBlockingStub stub = ArthasUnittestServiceGrpc.newBlockingStub(channel); + + try { + ArthasUnittest.ArthasUnittestRequest request = ArthasUnittest.ArthasUnittestRequest.newBuilder().setMessage("unaryInvoke").build(); + ArthasUnittest.ArthasUnittestResponse res = stub.unary(request); + System.out.println(res.getMessage()); + } finally { + channel.shutdownNow(); + } + } + + @Test + public void testUnarySum() throws InterruptedException { + ManagedChannel channel = ManagedChannelBuilder.forTarget(HOST_PORT) + .usePlaintext() + .build(); + + ArthasUnittestServiceGrpc.ArthasUnittestServiceBlockingStub stub = ArthasUnittestServiceGrpc.newBlockingStub(channel); + for (int i = 0; i < 10; i++) { + AtomicInteger sum = new AtomicInteger(0); + int finalId = i; + for (int j = 0; j < 100; j++) { + int num = random.nextInt(101); + sum.addAndGet(num); + threadPool.submit(() -> { + addSum(stub, finalId, num); + }); + } + Thread.sleep(2000L); + int grpcSum = getSum(stub, finalId); + System.out.println("id:" + finalId + ",sum:" + sum.get() + ",grpcSum:" + grpcSum); + Assert.assertEquals(sum.get(), grpcSum); + } + channel.shutdown(); + } + + // 用于测试客户端流 + @Test + public void testClientStreamSum() throws Throwable { + ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 9090) + .usePlaintext() + .build(); + + ArthasUnittestServiceGrpc.ArthasUnittestServiceStub stub = ArthasUnittestServiceGrpc.newStub(channel); + + AtomicInteger sum = new AtomicInteger(0); + CountDownLatch latch = new CountDownLatch(1); + StreamObserver clientStreamObserver = stub.clientStreamSum(new StreamObserver() { + @Override + public void onNext(ArthasUnittest.ArthasUnittestResponse response) { + System.out.println("local sum:" + sum + ", grpc sum:" + response.getNum()); + Assert.assertEquals(sum.get(), response.getNum()); + } + + @Override + public void onError(Throwable t) { + System.err.println("Error: " + t); + } + + @Override + public void onCompleted() { + System.out.println("testClientStreamSum completed."); + latch.countDown(); + } + }); + + for (int j = 0; j < 1000; j++) { + int num = random.nextInt(1001); + sum.addAndGet(num); + clientStreamObserver.onNext(ArthasUnittest.ArthasUnittestRequest.newBuilder().setNum(num).build()); + } + + clientStreamObserver.onCompleted(); + latch.await(); + channel.shutdown(); + } + + // 用于测试请求数据隔离性 + @Test + public void testDataIsolation() throws InterruptedException { + ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 9090) + .usePlaintext() + .build(); + + ArthasUnittestServiceGrpc.ArthasUnittestServiceStub stub = ArthasUnittestServiceGrpc.newStub(channel); + for (int i = 0; i < 10; i++) { + threadPool.submit(() -> { + AtomicInteger sum = new AtomicInteger(0); + CountDownLatch latch = new CountDownLatch(1); + StreamObserver clientStreamObserver = stub.clientStreamSum(new StreamObserver() { + @Override + public void onNext(ArthasUnittest.ArthasUnittestResponse response) { + System.out.println("local sum:" + sum + ", grpc sum:" + response.getNum()); + Assert.assertEquals(sum.get(), response.getNum()); + } + + @Override + public void onError(Throwable t) { + System.err.println("Error: " + t); + } + + @Override + public void onCompleted() { + System.out.println("testDataIsolation completed."); + latch.countDown(); + } + }); + + for (int j = 0; j < 5; j++) { + int num = random.nextInt(101); + try { + Thread.sleep(1000L); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + sum.addAndGet(num); + clientStreamObserver.onNext(ArthasUnittest.ArthasUnittestRequest.newBuilder().setNum(num).build()); + } + + clientStreamObserver.onCompleted(); + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + channel.shutdown(); + }); + } + Thread.sleep(7000L); + } + + @Test + public void testServerStream() throws InterruptedException { + ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 9090) + .usePlaintext() + .build(); + + ArthasUnittestServiceGrpc.ArthasUnittestServiceStub stub = ArthasUnittestServiceGrpc.newStub(channel); + + ArthasUnittest.ArthasUnittestRequest request = ArthasUnittest.ArthasUnittestRequest.newBuilder().setMessage("serverStream").build(); + + stub.serverStream(request, new StreamObserver() { + @Override + public void onNext(ArthasUnittest.ArthasUnittestResponse value) { + System.out.println("testServerStream client receive: " + value.getMessage()); + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onCompleted() { + System.out.println("testServerStream completed"); + } + }); + + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + channel.shutdown(); + } + } + + // 用于测试双向流 + @Test + public void testBiStream() throws Throwable { + ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 9090) + .usePlaintext() + .build(); + + ArthasUnittestServiceGrpc.ArthasUnittestServiceStub stub = ArthasUnittestServiceGrpc.newStub(channel); + + CountDownLatch latch = new CountDownLatch(1); + StreamObserver biStreamObserver = stub.biStream(new StreamObserver() { + @Override + public void onNext(ArthasUnittest.ArthasUnittestResponse response) { + System.out.println("testBiStream receive: "+response.getMessage()); + } + + @Override + public void onError(Throwable t) { + System.err.println("Error: " + t); + } + + @Override + public void onCompleted() { + System.out.println("testBiStream completed."); + latch.countDown(); + } + }); + + String[] messages = new String[]{"testBiStream1","testBiStream2","testBiStream3"}; + for (String msg : messages) { + ArthasUnittest.ArthasUnittestRequest request = ArthasUnittest.ArthasUnittestRequest.newBuilder().setMessage(msg).build(); + biStreamObserver.onNext(request); + } + + Thread.sleep(2000); + biStreamObserver.onCompleted(); + latch.await(); + channel.shutdown(); + } + + private void addSum(ArthasUnittestServiceGrpc.ArthasUnittestServiceBlockingStub stub, int id, int num) { + ArthasUnittest.ArthasUnittestRequest request = ArthasUnittest.ArthasUnittestRequest.newBuilder().setId(id).setNum(num).build(); + ArthasUnittest.ArthasUnittestResponse res = stub.unaryAddSum(request); + } + + private int getSum(ArthasUnittestServiceGrpc.ArthasUnittestServiceBlockingStub stub, int id) { + ArthasUnittest.ArthasUnittestRequest request = ArthasUnittest.ArthasUnittestRequest.newBuilder().setId(id).build(); + ArthasUnittest.ArthasUnittestResponse res = stub.unaryGetSum(request); + return res.getNum(); + } +} diff --git a/labs/arthas-grpc-server/src/test/java/unittest/grpc/service/ArthasUnittestService.java b/labs/arthas-grpc-server/src/test/java/unittest/grpc/service/ArthasUnittestService.java new file mode 100644 index 0000000000..6ba3081d21 --- /dev/null +++ b/labs/arthas-grpc-server/src/test/java/unittest/grpc/service/ArthasUnittestService.java @@ -0,0 +1,26 @@ +package unittest.grpc.service; + +import arthas.grpc.unittest.ArthasUnittest.ArthasUnittestRequest; +import arthas.grpc.unittest.ArthasUnittest.ArthasUnittestResponse; +import com.taobao.arthas.grpc.server.handler.GrpcRequest; +import com.taobao.arthas.grpc.server.handler.GrpcResponse; +import com.taobao.arthas.grpc.server.handler.StreamObserver; + +/** + * @author: FengYe + * @date: 2024/6/30 下午11:42 + * @description: ArthasSampleService + */ +public interface ArthasUnittestService { + ArthasUnittestResponse unary(ArthasUnittestRequest command); + + ArthasUnittestResponse unaryAddSum(ArthasUnittestRequest command); + + ArthasUnittestResponse unaryGetSum(ArthasUnittestRequest command); + + StreamObserver> clientStreamSum(StreamObserver> observer); + + void serverStream(ArthasUnittestRequest request, StreamObserver> observer); + + StreamObserver> biStream(StreamObserver> observer); +} diff --git a/labs/arthas-grpc-server/src/test/java/unittest/grpc/service/impl/ArthasUnittestServiceImpl.java b/labs/arthas-grpc-server/src/test/java/unittest/grpc/service/impl/ArthasUnittestServiceImpl.java new file mode 100644 index 0000000000..aba8faff18 --- /dev/null +++ b/labs/arthas-grpc-server/src/test/java/unittest/grpc/service/impl/ArthasUnittestServiceImpl.java @@ -0,0 +1,134 @@ +package unittest.grpc.service.impl; + +import arthas.grpc.unittest.ArthasUnittest; +import arthas.grpc.unittest.ArthasUnittest.ArthasUnittestRequest; +import arthas.grpc.unittest.ArthasUnittest.ArthasUnittestResponse; +import com.google.protobuf.InvalidProtocolBufferException; +import com.taobao.arthas.grpc.server.handler.GrpcRequest; +import com.taobao.arthas.grpc.server.handler.GrpcResponse; +import com.taobao.arthas.grpc.server.handler.StreamObserver; +import com.taobao.arthas.grpc.server.handler.annotation.GrpcMethod; +import com.taobao.arthas.grpc.server.handler.annotation.GrpcService; +import com.taobao.arthas.grpc.server.handler.constant.GrpcInvokeTypeEnum; +import unittest.grpc.service.ArthasUnittestService; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author: FengYe + * @date: 2024/6/30 下午11:43 + * @description: ArthasSampleServiceImpl + */ +@GrpcService("arthas.grpc.unittest.ArthasUnittestService") +public class ArthasUnittestServiceImpl implements ArthasUnittestService { + + private ConcurrentHashMap map = new ConcurrentHashMap<>(); + + @Override + @GrpcMethod(value = "unary") + public ArthasUnittestResponse unary(ArthasUnittestRequest command) { + ArthasUnittestResponse.Builder builder = ArthasUnittestResponse.newBuilder(); + builder.setMessage(command.getMessage()); + return builder.build(); + } + + @Override + @GrpcMethod(value = "unaryAddSum") + public ArthasUnittestResponse unaryAddSum(ArthasUnittestRequest command) { + ArthasUnittestResponse.Builder builder = ArthasUnittestResponse.newBuilder(); + builder.setMessage(command.getMessage()); + map.merge(command.getId(), command.getNum(), Integer::sum); + return builder.build(); + } + + @Override + @GrpcMethod(value = "unaryGetSum") + public ArthasUnittestResponse unaryGetSum(ArthasUnittestRequest command) { + ArthasUnittestResponse.Builder builder = ArthasUnittestResponse.newBuilder(); + builder.setMessage(command.getMessage()); + Integer sum = map.getOrDefault(command.getId(), 0); + builder.setNum(sum); + return builder.build(); + } + + @Override + @GrpcMethod(value = "clientStreamSum", grpcType = GrpcInvokeTypeEnum.CLIENT_STREAM) + public StreamObserver> clientStreamSum(StreamObserver> observer) { + return new StreamObserver>() { + AtomicInteger sum = new AtomicInteger(0); + + @Override + public void onNext(GrpcRequest req) { + try { + byte[] bytes = req.readData(); + while (bytes != null && bytes.length != 0) { + ArthasUnittestRequest request = ArthasUnittestRequest.parseFrom(bytes); + sum.addAndGet(request.getNum()); + bytes = req.readData(); + } + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + } + + @Override + public void onCompleted() { + ArthasUnittestResponse response = ArthasUnittestResponse.newBuilder() + .setNum(sum.get()) + .build(); + GrpcResponse grpcResponse = new GrpcResponse<>(); + grpcResponse.setService("arthas.grpc.unittest.ArthasUnittestService"); + grpcResponse.setMethod("clientStreamSum"); + grpcResponse.writeResponseData(response); + observer.onNext(grpcResponse); + observer.onCompleted(); + } + }; + } + + @Override + @GrpcMethod(value = "serverStream", grpcType = GrpcInvokeTypeEnum.SERVER_STREAM) + public void serverStream(ArthasUnittestRequest request, StreamObserver> observer) { + + for (int i = 0; i < 5; i++) { + ArthasUnittest.ArthasUnittestResponse response = ArthasUnittest.ArthasUnittestResponse.newBuilder() + .setMessage("Server response " + i + " to " + request.getMessage()) + .build(); + GrpcResponse grpcResponse = new GrpcResponse<>(); + grpcResponse.setService("arthas.grpc.unittest.ArthasUnittestService"); + grpcResponse.setMethod("serverStream"); + grpcResponse.writeResponseData(response); + observer.onNext(grpcResponse); + } + observer.onCompleted(); + } + + @Override + @GrpcMethod(value = "biStream", grpcType = GrpcInvokeTypeEnum.BI_STREAM) + public StreamObserver> biStream(StreamObserver> observer) { + return new StreamObserver>() { + @Override + public void onNext(GrpcRequest req) { + try { + byte[] bytes = req.readData(); + while (bytes != null && bytes.length != 0) { + GrpcResponse grpcResponse = new GrpcResponse<>(); + grpcResponse.setService("arthas.grpc.unittest.ArthasUnittestService"); + grpcResponse.setMethod("biStream"); + grpcResponse.writeResponseData(ArthasUnittestResponse.parseFrom(bytes)); + observer.onNext(grpcResponse); + bytes = req.readData(); + } + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + } + + @Override + public void onCompleted() { + observer.onCompleted(); + } + }; + } +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index bf39b515e6..3250fdb135 100644 --- a/pom.xml +++ b/pom.xml @@ -80,6 +80,7 @@ labs/cluster-management/native-agent-management-web labs/cluster-management/native-agent-proxy labs/cluster-management/native-agent-common + labs/arthas-grpc-server