Skip to content

Commit

Permalink
Refactor unknown message decoding (#30)
Browse files Browse the repository at this point in the history
* Initial unknown resolver refactor

* Finish moving all decoders

* Add test coverage for proto file parsing
  • Loading branch information
bradleyjkemp authored Jul 2, 2019
1 parent 94c7a06 commit 1b97c5b
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 68 deletions.
41 changes: 4 additions & 37 deletions grpc-dump/dump_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,15 @@ import (
"encoding/json"
"fmt"
"github.com/bradleyjkemp/grpc-tools/internal"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes/empty"
"github.com/jhump/protoreflect/desc"
"github.com/jhump/protoreflect/dynamic"
"github.com/bradleyjkemp/grpc-tools/internal/proto_decoder"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"os"
"strings"
)

// dump interceptor implements a gRPC.StreamingServerInterceptor that dumps all RPC details
func dumpInterceptor(knownMethods map[string]*desc.MethodDescriptor) grpc.StreamServerInterceptor {
func dumpInterceptor(decoder proto_decoder.MessageDecoder) grpc.StreamServerInterceptor {
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
dss := &recordedServerStream{ServerStream: ss}
err := handler(srv, dss)
Expand All @@ -39,38 +35,9 @@ func dumpInterceptor(knownMethods map[string]*desc.MethodDescriptor) grpc.Stream
Metadata: md,
}

knownMethod := knownMethods[info.FullMethod]
for _, message := range rpc.Messages {
dyn, _ := dynamic.AsDynamicMessage(&empty.Empty{})
if knownMethod != nil {
// have proper type information so e.g. can have field names in the text representation
switch message.MessageOrigin {
case internal.ClientMessage:
dyn = dynamic.NewMessage(knownMethod.GetInputType())
case internal.ServerMessage:
dyn = dynamic.NewMessage(knownMethod.GetOutputType())
}
}
err = proto.Unmarshal(message.RawMessage, dyn)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to unmarshal message: %v\n", err)
}

if knownMethod == nil {
unknownMessage, err := generateDescriptorForUnknownMessage(dyn).Build()
if err != nil {
fmt.Fprintf(os.Stderr, "failed to decode unknown message: %v\n", err)
continue
}
dyn = dynamic.NewMessage(unknownMessage)
// now unmarshal again using the new generated message type
err = proto.Unmarshal(message.RawMessage, dyn)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to unmarshal message: %v\n", err)
continue
}
}
message.Message = dyn
message.Message, err = decoder.Decode(info.FullMethod, message.MessageOrigin, message.RawMessage)
// TODO: log warning if error occurs here
}

dump, _ := json.Marshal(rpc)
Expand Down
21 changes: 9 additions & 12 deletions grpc-dump/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ import (
"flag"
"fmt"
"github.com/bradleyjkemp/grpc-tools/grpc-proxy"
"github.com/bradleyjkemp/grpc-tools/internal/proto_descriptor"
"github.com/bradleyjkemp/grpc-tools/internal/proto_decoder"
_ "github.com/bradleyjkemp/grpc-tools/internal/versionflag"
"github.com/jhump/protoreflect/desc"
"os"
"strings"
)
Expand All @@ -28,28 +27,26 @@ func main() {
}

func run() error {
var knownMethods map[string]*desc.MethodDescriptor
var resolvers []proto_decoder.MessageResolver
if *protoRoots != "" {
descs, err := proto_descriptor.LoadProtoDirectories(strings.Split(*protoRoots, ",")...)
r, err := proto_decoder.NewFileResolver(strings.Split(*protoRoots, ",")...)
if err != nil {
return err
} else {
fmt.Fprintln(os.Stderr, "Loaded", len(descs), "service descriptors")
knownMethods = descs
}
resolvers = append(resolvers, r)
}
if *protoDescriptors != "" {
descs, err := proto_descriptor.LoadProtoDescriptors(strings.Split(*protoDescriptors, ",")...)
r, err := proto_decoder.NewDescriptorResolver(strings.Split(*protoRoots, ",")...)
if err != nil {
return err
} else {
fmt.Fprintln(os.Stderr, "Loaded", len(descs), "service descriptors")
knownMethods = descs
}
resolvers = append(resolvers, r)
}
// Always use the unknown message resolver
resolvers = append(resolvers, proto_decoder.NewUnknownResolver())

proxy, err := grpc_proxy.New(
grpc_proxy.WithInterceptor(dumpInterceptor(knownMethods)),
grpc_proxy.WithInterceptor(dumpInterceptor(proto_decoder.NewDecoder(resolvers...))),
grpc_proxy.DefaultFlags(),
)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion integration_test/test-golden.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{"service":"bradleyjkemp.github.io.TestService","method":"TestUnaryClientRequest","messages":[{"message_origin":"client","raw_message":"ChEaDUNsaWVudFJlcXVlc3QgARAB","message":{"1":{"3":"ClientRequest","4":1},"2":1},"timestamp":"2019-06-24T19:19:46.644943+01:00"},{"message_origin":"server","raw_message":"ChIaDlNlcnZlclJlc3BvbnNlIAIQAg==","message":{"1":{"3":"ServerResponse","4":2},"2":2},"timestamp":"2019-06-24T19:19:46.644943+01:00"}],"metadata":{":authority":["bradleyjkemp.github.io:444"],"content-type":["application/grpc"],"user-agent":["grpc-go/1.21.1"],"via":["HTTP/2.0 127.0.0.1:16354"]}}
{"service":"bradleyjkemp.github.io.TestService","method":"TestUnaryClientRequest","messages":[{"message_origin":"client","raw_message":"ChEaDUNsaWVudFJlcXVlc3QgARAB","message":{"outerValue":{"innerValue":"ClientRequest","innerNum":1},"outerNum":1},"timestamp":"2019-06-24T19:19:46.644943+01:00"},{"message_origin":"server","raw_message":"ChIaDlNlcnZlclJlc3BvbnNlIAIQAg==","message":{"outerValue":{"innerValue":"ServerResponse","innerNum":2},"outerNum":2},"timestamp":"2019-06-24T19:19:46.644943+01:00"}],"metadata":{":authority":["bradleyjkemp.github.io:444"],"content-type":["application/grpc"],"user-agent":["grpc-go/1.21.1"],"via":["HTTP/2.0 127.0.0.1:16354"]}}
{"service":"bradleyjkemp.github.io.TestService","method":"TestStreamingServerMessages","messages":[{"message_origin":"server","raw_message":"ChIaDlNlcnZlck1lc3NhZ2UxIAMQAw==","message":{"1":{"3":"ServerMessage1","4":3},"2":3},"timestamp":"2019-06-24T19:19:46.644943+01:00"},{"message_origin":"server","raw_message":"ChIaDlNlcnZlck1lc3NhZ2UyIAQQBA==","message":{"1":{"3":"ServerMessage2","4":4},"2":4},"timestamp":"2019-06-24T19:19:46.644943+01:00"}],"metadata":{":authority":["a-different-domain.github.io:444"],"content-type":["application/grpc"],"forwarded":["proto=https"],"user-agent":["grpc-go/1.21.1"],"via":["HTTP/2.0 127.0.0.1:16354"]}}
{"service":"grpc.gateway.testing.EchoService","method":"Echo","messages":[{"message_origin":"server","raw_message":"ChIaDlNlcnZlck1lc3NhZ2UxIAMQAw==","message":{"1":{"3":"ServerMessage1","4":3},"2":3},"timestamp":"2019-06-24T19:19:46.644943+01:00"},{"message_origin":"server","raw_message":"ChIaDlNlcnZlck1lc3NhZ2UxIAMQAw==","message":{"1":{"3":"ServerMessage1","4":3},"2":3},"timestamp":"2019-06-24T19:19:46.644943+01:00"}],"metadata":{":authority":["grpc-web.github.io"],"accept":["*/*"],"accept-encoding":["gzip, deflate, br"],"accept-language":["en-US,en;q=0.9"],"cache-control":["no-cache"],"content-type":["application/grpc+proto"],"custom-header-1":["value1"],"origin":["http://localhost:8081"],"pragma":["no-cache"],"referer":["http://localhost:8081/echotest.html"],"user-agent":["Mozilla/5.0"],"via":["HTTP/2.0 127.0.0.1:16354"],"x-grpc-web":["1"],"x-user-agent":["grpc-web-javascript/0.1"]}}
{"service":"grpc.gateway.testing.EchoService","method":"Echo","messages":[{"message_origin":"server","raw_message":"ChIaDlNlcnZlck1lc3NhZ2UxIAMQAw==","message":{"1":{"3":"ServerMessage1","4":3},"2":3},"timestamp":"2019-06-24T19:19:46.644943+01:00"},{"message_origin":"server","raw_message":"ChIaDlNlcnZlck1lc3NhZ2UxIAMQAw==","message":{"1":{"3":"ServerMessage1","4":3},"2":3},"timestamp":"2019-06-24T19:19:46.644943+01:00"}],"metadata":{":authority":["grpc-web.github.io:1234"],"accept":["*/*"],"accept-encoding":["gzip, deflate, br"],"accept-language":["en-US,en;q=0.9"],"cache-control":["no-cache"],"content-type":["application/grpc+proto"],"custom-header-1":["value1"],"forwarded":["proto=https"],"origin":["http://localhost:8081"],"pragma":["no-cache"],"referer":["http://localhost:8081/echotest.html"],"user-agent":["Mozilla/5.0"],"via":["HTTP/2.0 127.0.0.1:16354"],"x-grpc-web":["1"],"x-user-agent":["grpc-web-javascript/0.1"]}}
6 changes: 5 additions & 1 deletion integration_test/test.proto
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
syntax = "proto3";
package main;
package bradleyjkemp.github.io;

message Outer {
Inner outer_value = 1;
Expand All @@ -10,3 +10,7 @@ message Inner {
string inner_value = 3;
int64 inner_num = 4;
}

service TestService {
rpc TestUnaryClientRequest(Outer) returns (Outer) {};
}
1 change: 1 addition & 0 deletions integration_test/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ fixturePID=$!
# grpc-dump will dump the requests and responses to the fixture
http_proxy=http://localhost:16353 ./grpc-dump \
--port=16354 \
--proto_roots=. \
--cert="${certFile}" \
--key="${keyFile}" > test-result.json &
dumpPID=$!
Expand Down
47 changes: 47 additions & 0 deletions internal/proto_decoder/decode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package proto_decoder

import (
"github.com/bradleyjkemp/grpc-tools/internal"
"github.com/golang/protobuf/proto"
"github.com/jhump/protoreflect/desc"
"github.com/jhump/protoreflect/dynamic"
)

type MessageResolver interface {
resolve(fullMethod string, direction internal.MessageOrigin, raw []byte) (*desc.MessageDescriptor, error)
}

type MessageDecoder interface {
Decode(fullMethod string, direction internal.MessageOrigin, raw []byte) (*dynamic.Message, error)
}

type messageDecoder struct {
resolvers []MessageResolver
}

// Chain together a number of resolvers to decode incoming messages.
// Resolvers are in priority order, the first to return a nil error
// is used to decode the message.
func NewDecoder(resolvers ...MessageResolver) *messageDecoder {
return &messageDecoder{
resolvers: resolvers,
}
}

func (d *messageDecoder) Decode(fullMethod string, direction internal.MessageOrigin, raw []byte) (*dynamic.Message, error) {
var err error
for _, resolver := range d.resolvers {
var descriptor *desc.MessageDescriptor
descriptor, err = resolver.resolve(fullMethod, direction, raw)
if err != nil {
continue
}
dyn := dynamic.NewMessage(descriptor)
// now unmarshal again using the new generated message type
err = proto.Unmarshal(raw, dyn)
if err == nil {
return dyn, nil
}
}
return nil, err
}
46 changes: 46 additions & 0 deletions internal/proto_decoder/proto_resolver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package proto_decoder

import (
"fmt"
"github.com/bradleyjkemp/grpc-tools/internal"
"github.com/bradleyjkemp/grpc-tools/internal/proto_descriptor"
"github.com/jhump/protoreflect/desc"
)

type descriptorResolver struct {
methodDescriptors map[string]*desc.MethodDescriptor
}

func (d *descriptorResolver) resolve(fullMethod string, direction internal.MessageOrigin, raw []byte) (*desc.MessageDescriptor, error) {
if descriptor, ok := d.methodDescriptors[fullMethod]; ok {
switch direction {
case internal.ClientMessage:
return descriptor.GetInputType(), nil
case internal.ServerMessage:
return descriptor.GetOutputType(), nil
}
}
return nil, fmt.Errorf("method not known")
}

func NewFileResolver(protoFileRoots ...string) (*descriptorResolver, error) {
descs, err := proto_descriptor.LoadProtoDirectories(protoFileRoots...)
if err != nil {
return nil, err
}

return &descriptorResolver{
descs,
}, nil
}

func NewDescriptorResolver(protoFileDescriptors ...string) (*descriptorResolver, error) {
descs, err := proto_descriptor.LoadProtoDescriptors(protoFileDescriptors...)
if err != nil {
return nil, err
}

return &descriptorResolver{
descs,
}, nil
}
Original file line number Diff line number Diff line change
@@ -1,46 +1,63 @@
package main
package proto_decoder

import (
"fmt"
"github.com/bradleyjkemp/grpc-tools/internal"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes/empty"
"github.com/jhump/protoreflect/desc"
"github.com/jhump/protoreflect/desc/builder"
"github.com/jhump/protoreflect/dynamic"
"regexp"
"sync"
"sync/atomic"
)

// When we don't have an actual proto message descriptor, this takes a best effort
// approach to generating one. It's definitely not perfect but is more useful than nothing.

type unknownMessageResolver struct {
messageCounter int64 // must only be accessed atomically
}

func NewUnknownResolver() *unknownMessageResolver {
return &unknownMessageResolver{
messageCounter: 0,
}
}

func (u *unknownMessageResolver) resolve(fullMethod string, direction internal.MessageOrigin, raw []byte) (*desc.MessageDescriptor, error) {
dyn, _ := dynamic.AsDynamicMessage(&empty.Empty{})
err := proto.Unmarshal(raw, dyn)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal bytes: %v", err)
}

return u.generateDescriptorForUnknownMessage(dyn).Build()
}

var (
asciiPattern = regexp.MustCompile(`^[ -~]*$`)
messageCounter = 0
messageCounterMutex = sync.Mutex{}
asciiPattern = regexp.MustCompile(`^[ -~]*$`)
)

// All messages must have unique names, this function generates them
func getNewMessageName() string {
messageCounterMutex.Lock()
defer messageCounterMutex.Unlock()
messageCounter++
return fmt.Sprintf("unknown_message_%d", messageCounter)
func (u *unknownMessageResolver) getNewMessageName() string {
return fmt.Sprintf("unknown_message_%d", atomic.AddInt64(&u.messageCounter, 1))
}

// This takes a dynamic.Message of unknown type and returns
// a message descriptor which will mean all fields are included
// in the JSON output.
// TODO: this would probably be better implemented within github.com/jhump/protoreflect
func generateDescriptorForUnknownMessage(message *dynamic.Message) *builder.MessageBuilder {
func (u *unknownMessageResolver) generateDescriptorForUnknownMessage(message *dynamic.Message) *builder.MessageBuilder {
fields := map[int32][]dynamic.UnknownField{}
for _, unknownFieldNum := range message.GetUnknownFields() {
fields[unknownFieldNum] = message.GetUnknownField(unknownFieldNum)
}
return makeDescriptorForFields(fields)
return u.makeDescriptorForFields(fields)
}

func makeDescriptorForFields(fields map[int32][]dynamic.UnknownField) *builder.MessageBuilder {
msg := builder.NewMessage(getNewMessageName())
func (u *unknownMessageResolver) makeDescriptorForFields(fields map[int32][]dynamic.UnknownField) *builder.MessageBuilder {
msg := builder.NewMessage(u.getNewMessageName())
for fieldNum, instances := range fields {
var fieldType *builder.FieldType
// TODO: look at all instances and merge the discovered fields together
Expand All @@ -49,7 +66,7 @@ func makeDescriptorForFields(fields map[int32][]dynamic.UnknownField) *builder.M
switch instances[0].Encoding {
// TODO: handle all wire types
case proto.WireBytes:
fieldType = handleWireBytes(instances[0])
fieldType = u.handleWireBytes(instances[0])
default:
// Fixed precision number
fieldType = builder.FieldTypeFixed64()
Expand All @@ -65,7 +82,7 @@ func makeDescriptorForFields(fields map[int32][]dynamic.UnknownField) *builder.M
return msg
}

func handleWireBytes(instance dynamic.UnknownField) *builder.FieldType {
func (u *unknownMessageResolver) handleWireBytes(instance dynamic.UnknownField) *builder.FieldType {
if asciiPattern.Match(instance.Contents) {
// highly unlikely that an entirely ASCII string is actually an embedded proto message
// TODO: make this heuristic cleverer
Expand All @@ -82,5 +99,5 @@ func handleWireBytes(instance dynamic.UnknownField) *builder.FieldType {
// looks like it wasn't a valid proto message
return builder.FieldTypeString()
}
return builder.FieldTypeMessage(generateDescriptorForUnknownMessage(dyn))
return builder.FieldTypeMessage(u.generateDescriptorForUnknownMessage(dyn))
}

0 comments on commit 1b97c5b

Please sign in to comment.