Skip to content

Commit

Permalink
add yarpc middleware to capture grpc
Browse files Browse the repository at this point in the history
  • Loading branch information
shivam-srivastava28 committed Jul 13, 2024
1 parent 8d71902 commit 3963a01
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 0 deletions.
3 changes: 3 additions & 0 deletions runtime/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -926,6 +926,9 @@ func (gateway *Gateway) setupGRPCClientDispatcher(config *StaticConfig) error {
dispatcher := yarpc.NewDispatcher(yarpc.Config{
Name: config.MustGetString("serviceName"),
Outbounds: outbounds,
OutboundMiddleware: yarpc.OutboundMiddleware{
Unary: NewCaptureOutboundMiddleware(),
},
Logging: yarpc.LoggingConfig{
Zap: gateway.Logger,
// TODO: set proper extractors
Expand Down
100 changes: 100 additions & 0 deletions runtime/yarpc_middleware.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package zanzibar

import (
"bytes"
"context"
"fmt"
"io"
"maps"

"go.uber.org/yarpc/api/middleware"
"go.uber.org/yarpc/api/transport"
)

const _tracingKeyPrefix = "$tracing$"
const tracingKeyMappingSize = 100

// NewCaptureOutboundMiddleware captures outbound rpc calls
func NewCaptureOutboundMiddleware() middleware.UnaryOutbound {
return &captureOutboundMiddleware{}
}

type captureOutboundMiddleware struct {
}

func (m *captureOutboundMiddleware) Call(
ctx context.Context,
req *transport.Request,
next transport.UnaryOutbound,
) (*transport.Response, error) {
captureEnabled := isCaptureEnabled(ctx, req)
var event *GRPCOutgoingEvent
var err error

if req != nil && captureEnabled {
event, err = prepareRequest(req)
if err != nil || event == nil {
captureEnabled = false
}
}

// call next middleware
resp, clientErr := next.Call(ctx, req)

// if capture at this request is still enabled process response and store it in receiveInteraction
if captureEnabled && resp != nil && event != nil {
err = prepareResponse(req, resp, event)
if err != nil {
return resp, clientErr
}
if ec := GetEventContainer(ctx); ec != nil {
ec.Events = append(ec.Events, event)
}
}
return resp, clientErr
}

func isCaptureEnabled(ctx context.Context, req *transport.Request) bool {
if GetToCapture(ctx) && req != nil && req.Encoding == "grpc" {
return true
}
return false
}

func prepareRequest(request *transport.Request) (*GRPCOutgoingEvent, error) {
if request.Body == nil {
return nil, fmt.Errorf("req.Body is nil for %s::%s", request.Service, request.Procedure)
}
bodyBytes, err := io.ReadAll(request.Body)
if err != nil {
return nil, err
}
request.Body = io.NopCloser(bytes.NewReader(bodyBytes))
clonedHeaders := maps.Clone(request.Headers.OriginalItems())
return &GRPCOutgoingEvent{
ServiceName: request.Service,
MethodName: request.Procedure,
ReqHeaders: clonedHeaders,
Req: bodyBytes,
}, nil
}

func prepareResponse(req *transport.Request, resp *transport.Response, event *GRPCOutgoingEvent) error {
if resp.Body == nil {
return fmt.Errorf("resp.Body is nil for %s::%s", req.Service, req.Procedure)
}
responseBytes, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
// close body before swapping reader
err = resp.Body.Close()
if err != nil {
return err
}
resp.Body = io.NopCloser(bytes.NewReader(responseBytes))
event.Rsp = responseBytes
event.RspHeaders = maps.Clone(resp.Headers.Items())
event.Success = !resp.ApplicationError
return nil
}

0 comments on commit 3963a01

Please sign in to comment.