Skip to content

Commit

Permalink
Merge pull request #794 from LaurenceLiZhixin/feat/dubbo-gen-stream
Browse files Browse the repository at this point in the history
[Ftr]: dubbo-gen  stream support
  • Loading branch information
zouyx authored Oct 22, 2020
2 parents ea00aab + e5e906b commit d8b2beb
Showing 1 changed file with 23 additions and 47 deletions.
70 changes: 23 additions & 47 deletions protocol/grpc/protoc-gen-dubbo/plugin/dubbo/dubbo.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,23 @@ func (g *dubboGrpc) generateService(file *generator.FileDescriptor, service *pb.
g.P("},")
}
g.P("},")
g.P("Streams: []", grpcPkg, ".StreamDesc{},")
g.P("Streams: []", grpcPkg, ".StreamDesc{")
for i, method := range service.Method {
if !method.GetClientStreaming() && !method.GetServerStreaming() {
continue
}
g.P("{")
g.P("StreamName: ", strconv.Quote(method.GetName()), ",")
g.P("Handler: ", handlerNames[i], ",")
if method.GetServerStreaming() {
g.P("ServerStreams: true,")
}
if method.GetClientStreaming() {
g.P("ClientStreams: true,")
}
g.P("},")
}
g.P("},")
g.P("Metadata: \"", file.GetName(), "\",")
g.P("}")
g.P("}")
Expand All @@ -241,6 +257,7 @@ func (g *dubboGrpc) generateClientSignature(servName string, method *pb.MethodDe
respName := "out *" + g.typeName(method.GetOutputType())
if method.GetServerStreaming() || method.GetClientStreaming() {
respName = servName + "_" + generator.CamelCase(origMethName) + "Client"
return fmt.Sprintf("%s func(ctx %s.Context%s) (%s, error)", methName, contextPkg, reqArg, respName)
}
return fmt.Sprintf("%s func(ctx %s.Context%s, %s) error", methName, contextPkg, reqArg, respName)
}
Expand All @@ -252,7 +269,6 @@ func (g *dubboGrpc) generateServerMethod(servName, fullServName string, method *
methName := generator.CamelCase(method.GetName())
hname := fmt.Sprintf("_DUBBO_%s_%s_Handler", servName, methName)
inType := g.typeName(method.GetInputType())
outType := g.typeName(method.GetOutputType())

if !method.GetServerStreaming() && !method.GetClientStreaming() {
g.P("func ", hname, "(srv interface{}, ctx ", contextPkg, ".Context, dec func(interface{}) error, interceptor ", grpcPkg, ".UnaryServerInterceptor) (interface{}, error) {")
Expand Down Expand Up @@ -286,6 +302,11 @@ func (g *dubboGrpc) generateServerMethod(servName, fullServName string, method *
}
streamType := unexport(servName) + methName + "Server"
g.P("func ", hname, "(srv interface{}, stream ", grpcPkg, ".ServerStream) error {")
g.P("_, ok := srv.(dgrpc.DubboGrpcService)")
g.P(`invo := invocation.NewRPCInvocation("`, methName, `", nil, nil)`)
g.P("if !ok {")
g.P("fmt.Println(invo)")
g.P("}")
if !method.GetClientStreaming() {
g.P("m := new(", inType, ")")
g.P("if err := stream.RecvMsg(m); err != nil { return err }")
Expand All @@ -296,50 +317,5 @@ func (g *dubboGrpc) generateServerMethod(servName, fullServName string, method *
g.P("}")
g.P()

genSend := method.GetServerStreaming()
genSendAndClose := !method.GetServerStreaming()
genRecv := method.GetClientStreaming()

// Stream auxiliary types and methods.
g.P("type ", servName, "_", methName, "Server interface {")
if genSend {
g.P("Send(*", outType, ") error")
}
if genSendAndClose {
g.P("SendAndClose(*", outType, ") error")
}
if genRecv {
g.P("Recv() (*", inType, ", error)")
}
g.P(grpcPkg, ".ServerStream")
g.P("}")
g.P()

g.P("type ", streamType, " struct {")
g.P(grpcPkg, ".ServerStream")
g.P("}")
g.P()

if genSend {
g.P("func (x *", streamType, ") Send(m *", outType, ") error {")
g.P("return x.ServerStream.SendMsg(m)")
g.P("}")
g.P()
}
if genSendAndClose {
g.P("func (x *", streamType, ") SendAndClose(m *", outType, ") error {")
g.P("return x.ServerStream.SendMsg(m)")
g.P("}")
g.P()
}
if genRecv {
g.P("func (x *", streamType, ") Recv() (*", inType, ", error) {")
g.P("m := new(", inType, ")")
g.P("if err := x.ServerStream.RecvMsg(m); err != nil { return nil, err }")
g.P("return m, nil")
g.P("}")
g.P()
}

return hname
}

0 comments on commit d8b2beb

Please sign in to comment.