Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v3grpc: adds log interceptor unit tests #19448

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/etcdserver/api/v3rpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnarySer
}

chainUnaryInterceptors := []grpc.UnaryServerInterceptor{
newLogUnaryInterceptor(s),
newLogUnaryInterceptor(s.Logger(), s.Cfg.WarningUnaryRequestDuration),
newUnaryInterceptor(s),
serverMetrics.UnaryServerInterceptor(),
}
Expand Down
80 changes: 47 additions & 33 deletions server/etcdserver/api/v3rpc/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package v3rpc

import (
"context"
"go.uber.org/zap/zapcore"
"sync"
"time"
"unicode/utf8"
Expand Down Expand Up @@ -75,13 +76,12 @@ func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
}
}

func newLogUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
func newLogUnaryInterceptor(lg *zap.Logger, warnLatency time.Duration) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
startTime := time.Now()
resp, err := handler(ctx, req)
lg := s.Logger()
if lg != nil { // acquire stats if debug level is enabled or RequestInfo is expensive
defer logUnaryRequestStats(ctx, lg, s.Cfg.WarningUnaryRequestDuration, info, startTime, req, resp)
defer logUnaryRequestStats(ctx, lg, warnLatency, info, startTime, req, resp)
}
return resp, err
}
Expand Down Expand Up @@ -173,43 +173,57 @@ func logUnaryRequestStats(ctx context.Context, lg *zap.Logger, warnLatency time.
respSize = -1
}

rs := requestStats{
startTime: startTime,
timeSpent: duration,
remote: remote,
responseType: responseType,
reqCount: reqCount,
reqSize: reqSize,
respCount: respCount,
respSize: respSize,
reqContent: reqContent,
}

if enabledDebugLevel {
logGenericRequestStats(lg, startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent)
logGenericRequestStats(lg, rs)
} else if expensiveRequest {
logExpensiveRequestStats(lg, startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent)
logExpensiveRequestStats(lg, rs)
}
}

func logGenericRequestStats(lg *zap.Logger, startTime time.Time, duration time.Duration, remote string, responseType string,
reqCount int64, reqSize int, respCount int64, respSize int, reqContent string,
) {
lg.Debug("request stats",
zap.Time("start time", startTime),
zap.Duration("time spent", duration),
zap.String("remote", remote),
zap.String("response type", responseType),
zap.Int64("request count", reqCount),
zap.Int("request size", reqSize),
zap.Int64("response count", respCount),
zap.Int("response size", respSize),
zap.String("request content", reqContent),
)
type requestStats struct {
startTime time.Time
timeSpent time.Duration
remote string
responseType string
reqCount int64
reqSize int
respCount int64
respSize int
reqContent string
}

func (rs requestStats) MarshalLogObject(enc zapcore.ObjectEncoder) error {
enc.AddTime("start time", rs.startTime)
enc.AddDuration("time spent", rs.timeSpent)
enc.AddString("remote", rs.remote)
enc.AddString("response type", rs.responseType)
enc.AddInt64("request count", rs.reqCount)
enc.AddInt("request size", rs.reqSize)
enc.AddInt64("response count", rs.respCount)
enc.AddInt("response size", rs.respSize)
enc.AddString("request content", rs.reqContent)

return nil
}

func logGenericRequestStats(lg *zap.Logger, rs requestStats) {
lg.Debug("request stats", zap.Inline(rs))
}

func logExpensiveRequestStats(lg *zap.Logger, startTime time.Time, duration time.Duration, remote string, responseType string,
reqCount int64, reqSize int, respCount int64, respSize int, reqContent string,
) {
lg.Warn("request stats",
zap.Time("start time", startTime),
zap.Duration("time spent", duration),
zap.String("remote", remote),
zap.String("response type", responseType),
zap.Int64("request count", reqCount),
zap.Int("request size", reqSize),
zap.Int64("response count", respCount),
zap.Int("response size", respSize),
zap.String("request content", reqContent),
)
func logExpensiveRequestStats(lg *zap.Logger, rs requestStats) {
lg.Warn("request stats", zap.Inline(rs))
}

func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor {
Expand Down
177 changes: 177 additions & 0 deletions server/etcdserver/api/v3rpc/interceptor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package v3rpc

import (
"context"
"github.com/stretchr/testify/assert"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"
"google.golang.org/grpc"
"google.golang.org/grpc/peer"
"net"
"testing"
"time"
)

func buildMockUnaryHandler(t *testing.T, mockResp interface{}, handlerLatency time.Duration) grpc.UnaryHandler {
t.Helper()
return func(ctx context.Context, req interface{}) (interface{}, error) {
// Add latency to mock handler.
time.Sleep(handlerLatency)
return mockResp, nil
}
}

func TestLogUnaryInterceptor(t *testing.T) {
// Warn on request latency if > 5ms.
handlerWarnLatencyThreshold := time.Millisecond * 5

unaryServerInfo := &grpc.UnaryServerInfo{
FullMethod: "/foo/bar",
}

address := "10.0.0.1:37928"
addr, err := net.ResolveTCPAddr("tcp", address)
assert.NoError(t, err)
p := &peer.Peer{
Addr: addr,
}

testcases := []struct {
name string
req interface{}
resp interface{}
reqLatency time.Duration
debugLogLevel bool // debugLogLevel indicates whether log level is debug.
expectedRequestStats *requestStats
}{
{"sample transaction with successful compare",
&pb.TxnRequest{
Compare: []*pb.Compare{{
Key: []byte("/users/12345/email"),
Result: pb.Compare_EQUAL,
Target: pb.Compare_VALUE,
TargetUnion: &pb.Compare_Value{Value: []byte("old.address@johndoe.com")},
}},
Success: []*pb.RequestOp{{
Request: &pb.RequestOp_RequestPut{
RequestPut: &pb.PutRequest{
Key: []byte("/users/12345/email"),
Value: []byte("new.address@johndoe.com"),
},
},
}},
},
&pb.TxnResponse{
Succeeded: true,
Responses: []*pb.ResponseOp{{Response: &pb.ResponseOp_ResponsePut{}}},
},
0, true,
&requestStats{
reqCount: 1,
reqSize: 47,
respCount: 0,
respSize: 4,
reqContent: "compare:<target:VALUE key:\"/users/12345/email\" value_size:23 > success:<request_put:<key:\"/users/12345/email\" value_size:23 >> failure:<>",
},
},
{"sample transaction with failed compare",
&pb.TxnRequest{
Compare: []*pb.Compare{{
Key: []byte("/users/12345/email"),
Result: pb.Compare_EQUAL,
Target: pb.Compare_VALUE,
TargetUnion: &pb.Compare_Value{Value: []byte("old.address@johndoe.com")},
}},
Failure: []*pb.RequestOp{{
Request: &pb.RequestOp_RequestRange{
RequestRange: &pb.RangeRequest{
Key: []byte("/users/12345/email"),
},
}},
},
},
&pb.TxnResponse{
Succeeded: false,
Responses: []*pb.ResponseOp{{Response: &pb.ResponseOp_ResponsePut{}}},
},
0, true,
&requestStats{
reqCount: 1,
reqSize: 22,
respCount: 0,
respSize: 2,
reqContent: "compare:<target:VALUE key:\"/users/12345/email\" value_size:23 > success:<> failure:<request_range:<key:\"/users/12345/email\" > >",
},
},
{"expensive request with debug logs disabled", &pb.RangeRequest{Key: []byte("fooKey")}, &pb.RangeResponse{Count: 2},
time.Millisecond * 10, false,
&requestStats{
reqCount: 0,
reqSize: 8,
respCount: 2,
respSize: 2,
reqContent: "key:\"fooKey\" ",
},
},
// Unrecognized response types result in -1 values.
{"default request stats -1", "fooRequest", "fooResponse", 0, true,
&requestStats{
reqCount: -1,
reqSize: -1,
respCount: -1,
respSize: -1,
reqContent: "",
}},
// Low-latency handler without debug-level logging enabled generates no request stat logs.
{"no debug or warn level logs", "fooRequest", "fooResponse", 0, false, nil},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
expensiveUnaryHandler := buildMockUnaryHandler(t, tc.resp, tc.reqLatency)
logLevel := zapcore.InfoLevel
if tc.debugLogLevel {
logLevel = zapcore.DebugLevel
}

observedZapCore, observedLogs := observer.New(logLevel)
observedLogger := zap.New(observedZapCore)

interceptor := newLogUnaryInterceptor(observedLogger, handlerWarnLatencyThreshold)

ctx := peer.NewContext(context.TODO(), p)
_, err := interceptor(ctx, tc.req, unaryServerInfo, expensiveUnaryHandler)
assert.NoError(t, err)

// Filter for request stats log messages.
rsLogs := observedLogs.FilterMessage("request stats")

// No request stats if log-level is not debug or warn latency threshold is not exceeded.
if !(tc.debugLogLevel || tc.reqLatency > handlerWarnLatencyThreshold) {
assert.Equal(t, 0, rsLogs.Len())
} else {
assert.Equal(t, 1, rsLogs.Len())
le := rsLogs.All()[0]
assert.Equal(t, 1, len(le.Context))
fld := le.Context[0]
rs, ok := fld.Interface.(requestStats)
assert.True(t, ok)
assert.Equal(t, tc.expectedRequestStats.reqCount, rs.reqCount)
assert.Equal(t, tc.expectedRequestStats.reqSize, rs.reqSize)
assert.Equal(t, tc.expectedRequestStats.respCount, rs.respCount)
assert.Equal(t, tc.expectedRequestStats.respSize, rs.respSize)
assert.Equal(t, tc.expectedRequestStats.reqContent, rs.reqContent)
assert.Equal(t, unaryServerInfo.FullMethod, rs.responseType)
// Check peer info read from context.
assert.Equal(t, address, rs.remote)
if tc.debugLogLevel {
assert.Equal(t, zapcore.DebugLevel, le.Entry.Level)
} else {
// Expensive request produce warn-level log if debug-level logs are disabled.
assert.Equal(t, zapcore.WarnLevel, le.Entry.Level)
}
}
})
}
}