Skip to content

Commit

Permalink
status: fix/improve status handling (#6662)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley authored Oct 2, 2023
1 parent 1466283 commit 0772ed7
Show file tree
Hide file tree
Showing 9 changed files with 270 additions and 44 deletions.
38 changes: 37 additions & 1 deletion internal/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,45 @@ type Status struct {
s *spb.Status
}

// NewWithProto returns a new status including details from statusProto. This
// is meant to be used by the gRPC library only.
func NewWithProto(code codes.Code, message string, statusProto []string) *Status {
if len(statusProto) != 1 {
// No grpc-status-details bin header, or multiple; just ignore.
return &Status{s: &spb.Status{Code: normalizeCode(code), Message: message}}
}
st := &spb.Status{}
if err := proto.Unmarshal([]byte(statusProto[0]), st); err != nil {
// Probably not a google.rpc.Status proto; do not provide details.
return &Status{s: &spb.Status{Code: normalizeCode(code), Message: message}}
}
if st.Code == int32(code) {
// The codes match between the grpc-status header and the
// grpc-status-details-bin header; use the full details proto.
st.Code = normalizeCode(codes.Code(st.Code))
return &Status{s: st}
}
return &Status{
s: &spb.Status{
Code: int32(codes.Internal),
Message: fmt.Sprintf(
"grpc-status-details-bin mismatch: grpc-status=%v, grpc-message=%q, grpc-status-details-bin=%+v",
code, message, st,
),
},
}
}

func normalizeCode(c codes.Code) int32 {
if c > 16 {
return int32(codes.Unknown)
}
return int32(c)
}

// New returns a Status representing c and msg.
func New(c codes.Code, msg string) *Status {
return &Status{s: &spb.Status{Code: int32(c), Message: msg}}
return &Status{s: &spb.Status{Code: normalizeCode(c), Message: msg}}
}

// Newf returns New(c, fmt.Sprintf(format, a...)).
Expand Down
55 changes: 45 additions & 10 deletions internal/stubserver/stubserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"testing"
"time"

"golang.org/x/net/http2"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
Expand Down Expand Up @@ -110,8 +111,7 @@ func RegisterServiceServerOption(f func(*grpc.Server)) grpc.ServerOption {
return &registerServiceServerOption{f: f}
}

// StartServer only starts the server. It does not create a client to it.
func (ss *StubServer) StartServer(sopts ...grpc.ServerOption) error {
func (ss *StubServer) setupServer(sopts ...grpc.ServerOption) (net.Listener, error) {
if ss.Network == "" {
ss.Network = "tcp"
}
Expand All @@ -127,24 +127,59 @@ func (ss *StubServer) StartServer(sopts ...grpc.ServerOption) error {
var err error
lis, err = net.Listen(ss.Network, ss.Address)
if err != nil {
return fmt.Errorf("net.Listen(%q, %q) = %v", ss.Network, ss.Address, err)
return nil, fmt.Errorf("net.Listen(%q, %q) = %v", ss.Network, ss.Address, err)
}
}
ss.Address = lis.Addr().String()
ss.cleanups = append(ss.cleanups, func() { lis.Close() })

s := grpc.NewServer(sopts...)
ss.S = grpc.NewServer(sopts...)
for _, so := range sopts {
switch x := so.(type) {
case *registerServiceServerOption:
x.f(s)
x.f(ss.S)
}
}

testgrpc.RegisterTestServiceServer(ss.S, ss)
ss.cleanups = append(ss.cleanups, ss.S.Stop)
return lis, nil
}

// StartHandlerServer only starts an HTTP server with a gRPC server as the
// handler. It does not create a client to it. Cannot be used in a StubServer
// that also used StartServer.
func (ss *StubServer) StartHandlerServer(sopts ...grpc.ServerOption) error {
lis, err := ss.setupServer(sopts...)
if err != nil {
return err
}

go func() {
hs := &http2.Server{}
opts := &http2.ServeConnOpts{Handler: ss.S}
for {
conn, err := lis.Accept()
if err != nil {
return
}
hs.ServeConn(conn, opts)
}
}()
ss.cleanups = append(ss.cleanups, func() { lis.Close() })

return nil
}

// StartServer only starts the server. It does not create a client to it.
// Cannot be used in a StubServer that also used StartHandlerServer.
func (ss *StubServer) StartServer(sopts ...grpc.ServerOption) error {
lis, err := ss.setupServer(sopts...)
if err != nil {
return err
}

testgrpc.RegisterTestServiceServer(s, ss)
go s.Serve(lis)
ss.cleanups = append(ss.cleanups, s.Stop)
ss.S = s
go ss.S.Serve(lis)

return nil
}

Expand Down
11 changes: 7 additions & 4 deletions internal/transport/handler_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,18 +220,20 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro
h.Set("Grpc-Message", encodeGrpcMessage(m))
}

s.hdrMu.Lock()
if p := st.Proto(); p != nil && len(p.Details) > 0 {
delete(s.trailer, grpcStatusDetailsBinHeader)
stBytes, err := proto.Marshal(p)
if err != nil {
// TODO: return error instead, when callers are able to handle it.
panic(err)
}

h.Set("Grpc-Status-Details-Bin", encodeBinHeader(stBytes))
h.Set(grpcStatusDetailsBinHeader, encodeBinHeader(stBytes))
}

if md := s.Trailer(); len(md) > 0 {
for k, vv := range md {
if len(s.trailer) > 0 {
for k, vv := range s.trailer {
// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
if isReservedHeader(k) {
continue
Expand All @@ -243,6 +245,7 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro
}
}
}
s.hdrMu.Unlock()
})

if err == nil { // transport has not been closed
Expand Down Expand Up @@ -287,7 +290,7 @@ func (ht *serverHandlerTransport) writeCommonHeaders(s *Stream) {
}

// writeCustomHeaders sets custom headers set on the stream via SetHeader
// on the first write call (Write, WriteHeader, or WriteStatus).
// on the first write call (Write, WriteHeader, or WriteStatus)
func (ht *serverHandlerTransport) writeCustomHeaders(s *Stream) {
h := ht.rw.Header()

Expand Down
13 changes: 2 additions & 11 deletions internal/transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1399,7 +1399,6 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
mdata = make(map[string][]string)
contentTypeErr = "malformed header: missing HTTP content-type"
grpcMessage string
statusGen *status.Status
recvCompress string
httpStatusCode *int
httpStatusErr string
Expand Down Expand Up @@ -1434,12 +1433,6 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
rawStatusCode = codes.Code(uint32(code))
case "grpc-message":
grpcMessage = decodeGrpcMessage(hf.Value)
case "grpc-status-details-bin":
var err error
statusGen, err = decodeGRPCStatusDetails(hf.Value)
if err != nil {
headerError = fmt.Sprintf("transport: malformed grpc-status-details-bin: %v", err)
}
case ":status":
if hf.Value == "200" {
httpStatusErr = ""
Expand Down Expand Up @@ -1548,14 +1541,12 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
return
}

if statusGen == nil {
statusGen = status.New(rawStatusCode, grpcMessage)
}
status := istatus.NewWithProto(rawStatusCode, grpcMessage, mdata[grpcStatusDetailsBinHeader])

// If client received END_STREAM from server while stream was still active,
// send RST_STREAM.
rstStream := s.getState() == streamActive
t.closeStream(s, io.EOF, rstStream, http2.ErrCodeNo, statusGen, mdata, true)
t.closeStream(s, io.EOF, rstStream, http2.ErrCodeNo, status, mdata, true)
}

// readServerPreface reads and handles the initial settings frame from the
Expand Down
5 changes: 4 additions & 1 deletion internal/transport/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1057,12 +1057,15 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})

if p := st.Proto(); p != nil && len(p.Details) > 0 {
// Do not use the user's grpc-status-details-bin (if present) if we are
// even attempting to set our own.
delete(s.trailer, grpcStatusDetailsBinHeader)
stBytes, err := proto.Marshal(p)
if err != nil {
// TODO: return error instead, when callers are able to handle it.
t.logger.Errorf("Failed to marshal rpc status: %s, error: %v", pretty.ToJSON(p), err)
} else {
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
headerFields = append(headerFields, hpack.HeaderField{Name: grpcStatusDetailsBinHeader, Value: encodeBinHeader(stBytes)})
}
}

Expand Down
18 changes: 2 additions & 16 deletions internal/transport/http_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,9 @@ import (
"time"
"unicode/utf8"

"github.com/golang/protobuf/proto"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

const (
Expand Down Expand Up @@ -88,6 +85,8 @@ var (
}
)

var grpcStatusDetailsBinHeader = "grpc-status-details-bin"

// isReservedHeader checks whether hdr belongs to HTTP2 headers
// reserved by gRPC protocol. Any other headers are classified as the
// user-specified metadata.
Expand All @@ -103,7 +102,6 @@ func isReservedHeader(hdr string) bool {
"grpc-message",
"grpc-status",
"grpc-timeout",
"grpc-status-details-bin",
// Intentionally exclude grpc-previous-rpc-attempts and
// grpc-retry-pushback-ms, which are "reserved", but their API
// intentionally works via metadata.
Expand Down Expand Up @@ -154,18 +152,6 @@ func decodeMetadataHeader(k, v string) (string, error) {
return v, nil
}

func decodeGRPCStatusDetails(rawDetails string) (*status.Status, error) {
v, err := decodeBinHeader(rawDetails)
if err != nil {
return nil, err
}
st := &spb.Status{}
if err = proto.Unmarshal(v, st); err != nil {
return nil, err
}
return status.FromProto(st), nil
}

type timeoutUnit uint8

const (
Expand Down
Loading

0 comments on commit 0772ed7

Please sign in to comment.