From 44f58cf96dd17fd6774fb968f8d508b5d092baec Mon Sep 17 00:00:00 2001 From: "382673304@qq.com" <382673304@qq.com> Date: Fri, 16 Oct 2020 13:45:47 +0800 Subject: [PATCH 1/2] feat: add dubbo-gen stream support --- common/proxy/proxy_factory/default.go | 14 +++- .../protoc-gen-dubbo/plugin/dubbo/dubbo.go | 73 ++++++------------- 2 files changed, 36 insertions(+), 51 deletions(-) diff --git a/common/proxy/proxy_factory/default.go b/common/proxy/proxy_factory/default.go index 1b8ca22201..2eb38ce826 100644 --- a/common/proxy/proxy_factory/default.go +++ b/common/proxy/proxy_factory/default.go @@ -94,6 +94,9 @@ func (pi *ProxyInvoker) Invoke(ctx context.Context, invocation protocol.Invocati proto := url.Protocol path := strings.TrimPrefix(url.Path, "/") args := invocation.Arguments() + for _, v := range args { + logger.Debugf("temp arg = %+v", v) + } // get service svc := common.ServiceMap.GetService(proto, path) @@ -105,6 +108,7 @@ func (pi *ProxyInvoker) Invoke(ctx context.Context, invocation protocol.Invocati // get method method := svc.Method()[methodName] + logger.Debugf("method = %s", method.Method().Name) if method == nil { logger.Errorf("cannot find method [%s] of service [%s] in %s", methodName, path, proto) result.SetError(perrors.Errorf("cannot find method [%s] of service [%s] in %s", methodName, path, proto)) @@ -112,10 +116,12 @@ func (pi *ProxyInvoker) Invoke(ctx context.Context, invocation protocol.Invocati } in := []reflect.Value{svc.Rcvr()} + logger.Debugf("in = %+v, len = %d", in, len(in)) if method.CtxType() != nil { ctx = context.WithValue(ctx, constant.AttachmentKey, invocation.Attachments()) in = append(in, method.SuiteContext(ctx)) } + logger.Debugf("after check method.CtxType") // prepare argv if (len(method.ArgsType()) == 1 || len(method.ArgsType()) == 2 && method.ReplyType() == nil) && method.ArgsType()[0].String() == "[]interface {}" { @@ -134,15 +140,19 @@ func (pi *ProxyInvoker) Invoke(ctx context.Context, invocation protocol.Invocati } } + logger.Debugf("after check ArgsType and so on") + // prepare replyv var replyv reflect.Value + logger.Debugf("argsType len = %d, name = %s", len(method.ArgsType()), method.ArgsType()[0].Name()) + if method.ReplyType() == nil && len(method.ArgsType()) > 0 { replyv = reflect.New(method.ArgsType()[len(method.ArgsType())-1].Elem()) in = append(in, replyv) } - + logger.Debugf("before call") returnValues := method.Method().Func.Call(in) - + logger.Debugf("after call") var retErr interface{} if len(returnValues) == 1 { retErr = returnValues[0].Interface() diff --git a/protocol/grpc/protoc-gen-dubbo/plugin/dubbo/dubbo.go b/protocol/grpc/protoc-gen-dubbo/plugin/dubbo/dubbo.go index 1af4fafdc6..a911e3c88c 100644 --- a/protocol/grpc/protoc-gen-dubbo/plugin/dubbo/dubbo.go +++ b/protocol/grpc/protoc-gen-dubbo/plugin/dubbo/dubbo.go @@ -211,7 +211,7 @@ func (g *dubboGrpc) generateService(file *generator.FileDescriptor, service *pb. g.P("HandlerType: (*", grpcserverType, ")(nil),") g.P("Methods: []", grpcPkg, ".MethodDesc{") for i, method := range service.Method { - if method.GetServerStreaming() || method.GetClientStreaming() { + if method.GetClientStreaming() || method.GetServerStreaming() { continue } g.P("{") @@ -220,7 +220,23 @@ func (g *dubboGrpc) generateService(file *generator.FileDescriptor, service *pb. g.P("},") } g.P("},") - g.P("Streams: []", grpcPkg, ".StreamDesc{},") + g.P("Streams: []", grpcPkg, ".StreamDesc{") + for i, method := range service.Method { + if !method.GetClientStreaming() && !method.GetServerStreaming() { + continue + } + g.P("{") + g.P("StreamName: ", strconv.Quote(method.GetName()), ",") + g.P("Handler: ", handlerNames[i], ",") + if method.GetServerStreaming() { + g.P("ServerStreams: true,") + } + if method.GetClientStreaming() { + g.P("ClientStreams: true,") + } + g.P("},") + } + g.P("},") g.P("Metadata: \"", file.GetName(), "\",") g.P("}") g.P("}") @@ -240,7 +256,7 @@ func (g *dubboGrpc) generateClientSignature(servName string, method *pb.MethodDe } respName := "out *" + g.typeName(method.GetOutputType()) if method.GetServerStreaming() || method.GetClientStreaming() { - respName = servName + "_" + generator.CamelCase(origMethName) + "Client" + respName = "stream *" + servName + "_" + generator.CamelCase(origMethName) + "Client" } return fmt.Sprintf("%s func(ctx %s.Context%s, %s) error", methName, contextPkg, reqArg, respName) } @@ -252,7 +268,6 @@ func (g *dubboGrpc) generateServerMethod(servName, fullServName string, method * methName := generator.CamelCase(method.GetName()) hname := fmt.Sprintf("_DUBBO_%s_%s_Handler", servName, methName) inType := g.typeName(method.GetInputType()) - outType := g.typeName(method.GetOutputType()) if !method.GetServerStreaming() && !method.GetClientStreaming() { g.P("func ", hname, "(srv interface{}, ctx ", contextPkg, ".Context, dec func(interface{}) error, interceptor ", grpcPkg, ".UnaryServerInterceptor) (interface{}, error) {") @@ -286,6 +301,11 @@ func (g *dubboGrpc) generateServerMethod(servName, fullServName string, method * } streamType := unexport(servName) + methName + "Server" g.P("func ", hname, "(srv interface{}, stream ", grpcPkg, ".ServerStream) error {") + g.P("_, ok := srv.(dgrpc.DubboGrpcService)") + g.P(`invo := invocation.NewRPCInvocation("`, methName, `", nil, nil)`) + g.P("if !ok {") + g.P("fmt.Println(invo)") + g.P("}") if !method.GetClientStreaming() { g.P("m := new(", inType, ")") g.P("if err := stream.RecvMsg(m); err != nil { return err }") @@ -296,50 +316,5 @@ func (g *dubboGrpc) generateServerMethod(servName, fullServName string, method * g.P("}") g.P() - genSend := method.GetServerStreaming() - genSendAndClose := !method.GetServerStreaming() - genRecv := method.GetClientStreaming() - - // Stream auxiliary types and methods. - g.P("type ", servName, "_", methName, "Server interface {") - if genSend { - g.P("Send(*", outType, ") error") - } - if genSendAndClose { - g.P("SendAndClose(*", outType, ") error") - } - if genRecv { - g.P("Recv() (*", inType, ", error)") - } - g.P(grpcPkg, ".ServerStream") - g.P("}") - g.P() - - g.P("type ", streamType, " struct {") - g.P(grpcPkg, ".ServerStream") - g.P("}") - g.P() - - if genSend { - g.P("func (x *", streamType, ") Send(m *", outType, ") error {") - g.P("return x.ServerStream.SendMsg(m)") - g.P("}") - g.P() - } - if genSendAndClose { - g.P("func (x *", streamType, ") SendAndClose(m *", outType, ") error {") - g.P("return x.ServerStream.SendMsg(m)") - g.P("}") - g.P() - } - if genRecv { - g.P("func (x *", streamType, ") Recv() (*", inType, ", error) {") - g.P("m := new(", inType, ")") - g.P("if err := x.ServerStream.RecvMsg(m); err != nil { return nil, err }") - g.P("return m, nil") - g.P("}") - g.P() - } - return hname } From 87228566f173d1d2e110b43c41a5a37abf5d7b53 Mon Sep 17 00:00:00 2001 From: "382673304@qq.com" <382673304@qq.com> Date: Fri, 16 Oct 2020 13:56:35 +0800 Subject: [PATCH 2/2] chore: fix clientImpl signature --- common/proxy/proxy_factory/default.go | 14 ++------------ .../grpc/protoc-gen-dubbo/plugin/dubbo/dubbo.go | 5 +++-- 2 files changed, 5 insertions(+), 14 deletions(-) diff --git a/common/proxy/proxy_factory/default.go b/common/proxy/proxy_factory/default.go index 2eb38ce826..1b8ca22201 100644 --- a/common/proxy/proxy_factory/default.go +++ b/common/proxy/proxy_factory/default.go @@ -94,9 +94,6 @@ func (pi *ProxyInvoker) Invoke(ctx context.Context, invocation protocol.Invocati proto := url.Protocol path := strings.TrimPrefix(url.Path, "/") args := invocation.Arguments() - for _, v := range args { - logger.Debugf("temp arg = %+v", v) - } // get service svc := common.ServiceMap.GetService(proto, path) @@ -108,7 +105,6 @@ func (pi *ProxyInvoker) Invoke(ctx context.Context, invocation protocol.Invocati // get method method := svc.Method()[methodName] - logger.Debugf("method = %s", method.Method().Name) if method == nil { logger.Errorf("cannot find method [%s] of service [%s] in %s", methodName, path, proto) result.SetError(perrors.Errorf("cannot find method [%s] of service [%s] in %s", methodName, path, proto)) @@ -116,12 +112,10 @@ func (pi *ProxyInvoker) Invoke(ctx context.Context, invocation protocol.Invocati } in := []reflect.Value{svc.Rcvr()} - logger.Debugf("in = %+v, len = %d", in, len(in)) if method.CtxType() != nil { ctx = context.WithValue(ctx, constant.AttachmentKey, invocation.Attachments()) in = append(in, method.SuiteContext(ctx)) } - logger.Debugf("after check method.CtxType") // prepare argv if (len(method.ArgsType()) == 1 || len(method.ArgsType()) == 2 && method.ReplyType() == nil) && method.ArgsType()[0].String() == "[]interface {}" { @@ -140,19 +134,15 @@ func (pi *ProxyInvoker) Invoke(ctx context.Context, invocation protocol.Invocati } } - logger.Debugf("after check ArgsType and so on") - // prepare replyv var replyv reflect.Value - logger.Debugf("argsType len = %d, name = %s", len(method.ArgsType()), method.ArgsType()[0].Name()) - if method.ReplyType() == nil && len(method.ArgsType()) > 0 { replyv = reflect.New(method.ArgsType()[len(method.ArgsType())-1].Elem()) in = append(in, replyv) } - logger.Debugf("before call") + returnValues := method.Method().Func.Call(in) - logger.Debugf("after call") + var retErr interface{} if len(returnValues) == 1 { retErr = returnValues[0].Interface() diff --git a/protocol/grpc/protoc-gen-dubbo/plugin/dubbo/dubbo.go b/protocol/grpc/protoc-gen-dubbo/plugin/dubbo/dubbo.go index a911e3c88c..a9f50e8287 100644 --- a/protocol/grpc/protoc-gen-dubbo/plugin/dubbo/dubbo.go +++ b/protocol/grpc/protoc-gen-dubbo/plugin/dubbo/dubbo.go @@ -211,7 +211,7 @@ func (g *dubboGrpc) generateService(file *generator.FileDescriptor, service *pb. g.P("HandlerType: (*", grpcserverType, ")(nil),") g.P("Methods: []", grpcPkg, ".MethodDesc{") for i, method := range service.Method { - if method.GetClientStreaming() || method.GetServerStreaming() { + if method.GetServerStreaming() || method.GetClientStreaming() { continue } g.P("{") @@ -256,7 +256,8 @@ func (g *dubboGrpc) generateClientSignature(servName string, method *pb.MethodDe } respName := "out *" + g.typeName(method.GetOutputType()) if method.GetServerStreaming() || method.GetClientStreaming() { - respName = "stream *" + servName + "_" + generator.CamelCase(origMethName) + "Client" + respName = servName + "_" + generator.CamelCase(origMethName) + "Client" + return fmt.Sprintf("%s func(ctx %s.Context%s) (%s, error)", methName, contextPkg, reqArg, respName) } return fmt.Sprintf("%s func(ctx %s.Context%s, %s) error", methName, contextPkg, reqArg, respName) }