diff --git a/grpc-dump/dump/dump_interceptor.go b/grpc-dump/dump/dump_interceptor.go index 228b47a..59a2bbb 100644 --- a/grpc-dump/dump/dump_interceptor.go +++ b/grpc-dump/dump/dump_interceptor.go @@ -8,7 +8,10 @@ import ( "github.com/bradleyjkemp/grpc-tools/internal" "github.com/bradleyjkemp/grpc-tools/internal/proto_decoder" - + "github.com/bradleyjkemp/grpc-tools/internal/proto_descriptor" + "github.com/golang/protobuf/jsonpb" + "github.com/jhump/protoreflect/desc" + "github.com/jhump/protoreflect/dynamic" "github.com/sirupsen/logrus" "google.golang.org/grpc" "google.golang.org/grpc/metadata" @@ -16,6 +19,27 @@ import ( ) // dump interceptor implements a gRPC.StreamingServerInterceptor that dumps all RPC details + +type pbm struct { + *dynamic.Message +} + +func (p *pbm) MarshalJSON() ([]byte, error) { + fd := make([]*desc.FileDescriptor, 0) + proto_descriptor.MsgDesc.Lock() + defer proto_descriptor.MsgDesc.Unlock() + for _, d := range proto_descriptor.MsgDesc.Desc { + fd = append(fd, d.GetFile()) + } + return p.MarshalJSONPB( + &jsonpb.Marshaler{ + AnyResolver: dynamic.AnyResolver( + dynamic.NewMessageFactoryWithDefaults(), + fd..., + ), + }) +} + func dumpInterceptor(logger logrus.FieldLogger, output io.Writer, decoder proto_decoder.MessageDecoder) grpc.StreamServerInterceptor { return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { dss := &recordedServerStream{ServerStream: ss} @@ -31,6 +55,8 @@ func dumpInterceptor(logger logrus.FieldLogger, output io.Writer, decoder proto_ fullMethod := strings.Split(info.FullMethod, "/") md, _ := metadata.FromIncomingContext(ss.Context()) + dss.Lock() + defer dss.Unlock() rpc := internal.RPC{ Service: fullMethod[1], Method: fullMethod[2], @@ -42,14 +68,17 @@ func dumpInterceptor(logger logrus.FieldLogger, output io.Writer, decoder proto_ } var err error - for _, message := range rpc.Messages { - message.Message, err = decoder.Decode(info.FullMethod, message) + for i := range rpc.Messages { + msg, err := decoder.Decode(info.FullMethod, rpc.Messages[i]) if err != nil { logger.WithError(err).Warn("Failed to decode message") } + rpc.Messages[i].Message = &pbm{msg} + } + dump, err := json.Marshal(rpc) + if err != nil { + logger.WithError(err).Fatal("Failed to marshal rpc") } - - dump, _ := json.Marshal(rpc) fmt.Fprintln(output, string(dump)) return rpcErr } diff --git a/internal/proto_decoder/unknown_message.go b/internal/proto_decoder/unknown_message.go index 905f825..a83cee4 100644 --- a/internal/proto_decoder/unknown_message.go +++ b/internal/proto_decoder/unknown_message.go @@ -2,6 +2,8 @@ package proto_decoder import ( "fmt" + "regexp" + "github.com/bradleyjkemp/grpc-tools/internal" "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes/empty" @@ -9,7 +11,6 @@ import ( "github.com/jhump/protoreflect/desc/builder" "github.com/jhump/protoreflect/dynamic" "github.com/pkg/errors" - "regexp" ) // When we don't have an actual proto message descriptor, this takes a best effort @@ -110,6 +111,9 @@ func (u *unknownFieldResolver) enrichMessage(descriptor *builder.MessageBuilder, if err != nil { return errors.Wrap(err, "failed to convert nested message to dynamic") } + if dynamicNestedMessage == nil { + return nil + } err = u.enrichMessage(nestedMessageDescriptor, dynamicNestedMessage) if err != nil { diff --git a/internal/proto_descriptor/protos.go b/internal/proto_descriptor/protos.go index 1200334..39ed9d3 100644 --- a/internal/proto_descriptor/protos.go +++ b/internal/proto_descriptor/protos.go @@ -2,10 +2,12 @@ package proto_descriptor import ( "fmt" - "github.com/jhump/protoreflect/desc" - "github.com/jhump/protoreflect/desc/protoparse" "os" "path/filepath" + "sync" + + "github.com/jhump/protoreflect/desc" + "github.com/jhump/protoreflect/desc/protoparse" ) func LoadProtoDescriptors(descriptorPaths ...string) (map[string]*desc.MethodDescriptor, error) { @@ -21,6 +23,13 @@ func LoadProtoDescriptors(descriptorPaths ...string) (map[string]*desc.MethodDes return convertDescriptorsToMap(descriptors), nil } +type MessageDesc struct { + Desc map[string]*desc.MessageDescriptor + sync.Mutex +} + +var MsgDesc = MessageDesc{Desc: make(map[string]*desc.MessageDescriptor, 0)} + // recursively walks through all files in the given directories and // finds .proto files that contains service definitions func LoadProtoDirectories(roots ...string) (map[string]*desc.MethodDescriptor, error) { @@ -45,6 +54,14 @@ func LoadProtoDirectories(roots ...string) (map[string]*desc.MethodDescriptor, e fmt.Fprintf(os.Stderr, "Skipping %s due to parse error %s", path, err) return nil } + fileDescs, err := parser.ParseFiles(relpath) + for _, fileDesc := range fileDescs { + for _, mt := range fileDesc.GetMessageTypes() { + MsgDesc.Lock() + MsgDesc.Desc[mt.GetFullyQualifiedName()] = mt + MsgDesc.Unlock() + } + } if len(descs[0].Service) > 0 { // this file is interesting so fileDesc, err := parser.ParseFiles(relpath) @@ -53,6 +70,11 @@ func LoadProtoDirectories(roots ...string) (map[string]*desc.MethodDescriptor, e return nil } servicesFiles = append(servicesFiles, fileDesc[0]) + for _, smt := range fileDesc[0].GetMessageTypes() { + MsgDesc.Lock() + MsgDesc.Desc[smt.GetFullyQualifiedName()] = smt + MsgDesc.Unlock() + } } } return nil