Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue #96] this is a PoC to solve Issue #96. #97

Merged
merged 5 commits into from
Jun 30, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 35 additions & 5 deletions grpc-dump/dump/dump_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,38 @@ import (

"github.com/bradleyjkemp/grpc-tools/internal"
"github.com/bradleyjkemp/grpc-tools/internal/proto_decoder"

"github.com/bradleyjkemp/grpc-tools/internal/proto_descriptor"
"github.com/golang/protobuf/jsonpb"
"github.com/jhump/protoreflect/desc"
"github.com/jhump/protoreflect/dynamic"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)

// dump interceptor implements a gRPC.StreamingServerInterceptor that dumps all RPC details

type pbm struct {
*dynamic.Message
}

func (p *pbm) MarshalJSON() ([]byte, error) {
fd := make([]*desc.FileDescriptor, 0)
proto_descriptor.MsgDesc.Lock()
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As proto_descriptor.MsgDesc is only going to be mutated during startup (i.e. when loading proto files), I wonder if this step of converting to creating a new dynamic.AnyResolver needs to be done every time?

Should (/could) this just be done once during proto file loading?

Copy link
Contributor Author

@marcellanz marcellanz Jun 22, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Beside that, and as I mentioned in #96,

is probably not clean in the general way grpc-tools is designed

proto_descriptor.MsgDesc is a global variable and I'm not sure if this is the right way to bring these message describtors into the dump interceptor. It was the fastest way to have them be available, but should be probably done in a cleaner way. Wdyt?

defer proto_descriptor.MsgDesc.Unlock()
for _, d := range proto_descriptor.MsgDesc.Desc {
fd = append(fd, d.GetFile())
}
return p.MarshalJSONPB(
&jsonpb.Marshaler{
AnyResolver: dynamic.AnyResolver(
dynamic.NewMessageFactoryWithDefaults(),
fd...,
),
})
}

func dumpInterceptor(logger logrus.FieldLogger, output io.Writer, decoder proto_decoder.MessageDecoder) grpc.StreamServerInterceptor {
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
dss := &recordedServerStream{ServerStream: ss}
Expand All @@ -31,6 +55,8 @@ func dumpInterceptor(logger logrus.FieldLogger, output io.Writer, decoder proto_

fullMethod := strings.Split(info.FullMethod, "/")
md, _ := metadata.FromIncomingContext(ss.Context())
dss.Lock()
marcellanz marked this conversation as resolved.
Show resolved Hide resolved
defer dss.Unlock()
rpc := internal.RPC{
Service: fullMethod[1],
Method: fullMethod[2],
Expand All @@ -42,15 +68,19 @@ func dumpInterceptor(logger logrus.FieldLogger, output io.Writer, decoder proto_
}

var err error
for _, message := range rpc.Messages {
message.Message, err = decoder.Decode(info.FullMethod, message)
for i := range rpc.Messages {
msg, err := decoder.Decode(info.FullMethod, rpc.Messages[i])
if err != nil {
logger.WithError(err).Warn("Failed to decode message")
}
rpc.Messages[i].Message = &pbm{msg}
}
dump, err := json.Marshal(rpc)
if err != nil {
logger.WithError(err).Fatal("Failed to marshal rpc")
}

dump, _ := json.Marshal(rpc)
fmt.Fprintln(output, string(dump))
dss.Unlock()
marcellanz marked this conversation as resolved.
Show resolved Hide resolved
marcellanz marked this conversation as resolved.
Show resolved Hide resolved
return rpcErr
}
}
6 changes: 5 additions & 1 deletion internal/proto_decoder/unknown_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package proto_decoder

import (
"fmt"
"regexp"

"github.com/bradleyjkemp/grpc-tools/internal"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes/empty"
"github.com/jhump/protoreflect/desc"
"github.com/jhump/protoreflect/desc/builder"
"github.com/jhump/protoreflect/dynamic"
"github.com/pkg/errors"
"regexp"
)

// When we don't have an actual proto message descriptor, this takes a best effort
Expand Down Expand Up @@ -110,6 +111,9 @@ func (u *unknownFieldResolver) enrichMessage(descriptor *builder.MessageBuilder,
if err != nil {
return errors.Wrap(err, "failed to convert nested message to dynamic")
}
if dynamicNestedMessage == nil {
return nil
}

err = u.enrichMessage(nestedMessageDescriptor, dynamicNestedMessage)
if err != nil {
Expand Down
26 changes: 24 additions & 2 deletions internal/proto_descriptor/protos.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package proto_descriptor

import (
"fmt"
"github.com/jhump/protoreflect/desc"
"github.com/jhump/protoreflect/desc/protoparse"
"os"
"path/filepath"
"sync"

"github.com/jhump/protoreflect/desc"
"github.com/jhump/protoreflect/desc/protoparse"
)

func LoadProtoDescriptors(descriptorPaths ...string) (map[string]*desc.MethodDescriptor, error) {
Expand All @@ -21,6 +23,13 @@ func LoadProtoDescriptors(descriptorPaths ...string) (map[string]*desc.MethodDes
return convertDescriptorsToMap(descriptors), nil
}

type MessageDesc struct {
Desc map[string]*desc.MessageDescriptor
sync.Mutex
}

var MsgDesc = MessageDesc{Desc: make(map[string]*desc.MessageDescriptor, 0)}

// recursively walks through all files in the given directories and
// finds .proto files that contains service definitions
func LoadProtoDirectories(roots ...string) (map[string]*desc.MethodDescriptor, error) {
Expand All @@ -45,6 +54,14 @@ func LoadProtoDirectories(roots ...string) (map[string]*desc.MethodDescriptor, e
fmt.Fprintf(os.Stderr, "Skipping %s due to parse error %s", path, err)
return nil
}
fileDescs, err := parser.ParseFiles(relpath)
for _, fileDesc := range fileDescs {
for _, mt := range fileDesc.GetMessageTypes() {
MsgDesc.Lock()
MsgDesc.Desc[mt.GetFullyQualifiedName()] = mt
MsgDesc.Unlock()
}
}
if len(descs[0].Service) > 0 {
// this file is interesting so
fileDesc, err := parser.ParseFiles(relpath)
Expand All @@ -53,6 +70,11 @@ func LoadProtoDirectories(roots ...string) (map[string]*desc.MethodDescriptor, e
return nil
}
servicesFiles = append(servicesFiles, fileDesc[0])
for _, smt := range fileDesc[0].GetMessageTypes() {
MsgDesc.Lock()
MsgDesc.Desc[smt.GetFullyQualifiedName()] = smt
MsgDesc.Unlock()
}
}
}
return nil
Expand Down