From 14c1a276a6e5b4f3a4086891f5584668923b4130 Mon Sep 17 00:00:00 2001 From: Brian Tiger Chow Date: Thu, 8 Mar 2018 16:06:34 +0000 Subject: [PATCH 1/2] add begin time to stats.End struct --- server.go | 12 ++++++++---- stats/stats.go | 2 ++ stream.go | 16 ++++++++++------ 3 files changed, 20 insertions(+), 10 deletions(-) diff --git a/server.go b/server.go index 0f7ff5d6022a..c55d6ab2a818 100644 --- a/server.go +++ b/server.go @@ -777,13 +777,15 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) { sh := s.opts.statsHandler if sh != nil { + beginTime := time.Now() begin := &stats.Begin{ - BeginTime: time.Now(), + BeginTime: beginTime, } sh.HandleRPC(stream.Context(), begin) defer func() { end := &stats.End{ - EndTime: time.Now(), + BeginTime: beginTime, + EndTime: time.Now(), } if err != nil && err != io.EOF { end.Error = toRPCErr(err) @@ -977,13 +979,15 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) { sh := s.opts.statsHandler if sh != nil { + beginTime := time.Now() begin := &stats.Begin{ - BeginTime: time.Now(), + BeginTime: beginTime, } sh.HandleRPC(stream.Context(), begin) defer func() { end := &stats.End{ - EndTime: time.Now(), + BeginTime: beginTime, + EndTime: time.Now(), } if err != nil && err != io.EOF { end.Error = toRPCErr(err) diff --git a/stats/stats.go b/stats/stats.go index d5aa2f793bf3..3f13190a0ac6 100644 --- a/stats/stats.go +++ b/stats/stats.go @@ -169,6 +169,8 @@ func (s *OutTrailer) isRPCStats() {} type End struct { // Client is true if this End is from client side. Client bool + // BeginTime is the time when the RPC began. + BeginTime time.Time // EndTime is the time when the RPC ends. EndTime time.Time // Error is the error the RPC ended with. It is an error generated from diff --git a/stream.go b/stream.go index deb735927288..d7793008cafd 100644 --- a/stream.go +++ b/stream.go @@ -204,9 +204,10 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth sh := cc.dopts.copts.StatsHandler if sh != nil { ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast}) + beginTime := time.Now() begin := &stats.Begin{ Client: true, - BeginTime: time.Now(), + BeginTime: beginTime, FailFast: c.failFast, } sh.HandleRPC(ctx, begin) @@ -214,8 +215,10 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth if err != nil { // Only handle end stats if err != nil. end := &stats.End{ - Client: true, - Error: err, + Client: true, + Error: err, + BeginTime: beginTime, + EndTime: time.Now(), } sh.HandleRPC(ctx, end) } @@ -509,9 +512,10 @@ func (cs *clientStream) finish(err error) { } if cs.statsHandler != nil { end := &stats.End{ - Client: true, - EndTime: time.Now(), - Error: err, + Client: true, + BeginTime: time.Now(), + EndTime: time.Now(), + Error: err, } cs.statsHandler.HandleRPC(cs.statsCtx, end) } From 0f045e4d503d5e8c8ea6dc4779c64d945da812a5 Mon Sep 17 00:00:00 2001 From: Brian Tiger Chow Date: Thu, 8 Mar 2018 16:25:20 +0000 Subject: [PATCH 2/2] add test --- stats/stats_test.go | 3 +++ stream.go | 7 +++++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/stats/stats_test.go b/stats/stats_test.go index b6c7b998ce47..00a5e4f31b82 100644 --- a/stats/stats_test.go +++ b/stats/stats_test.go @@ -638,6 +638,9 @@ func checkEnd(t *testing.T, d *gotData, e *expectedData) { if d.ctx == nil { t.Fatalf("d.ctx = nil, want ") } + if st.BeginTime.IsZero() { + t.Fatalf("st.BeginTime = %v, want ", st.BeginTime) + } if st.EndTime.IsZero() { t.Fatalf("st.EndTime = %v, want ", st.EndTime) } diff --git a/stream.go b/stream.go index d7793008cafd..a3d6ea2b5a35 100644 --- a/stream.go +++ b/stream.go @@ -202,9 +202,10 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth } ctx = newContextWithRPCInfo(ctx, c.failFast) sh := cc.dopts.copts.StatsHandler + var beginTime time.Time if sh != nil { ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast}) - beginTime := time.Now() + beginTime = time.Now() begin := &stats.Begin{ Client: true, BeginTime: beginTime, @@ -283,6 +284,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth statsCtx: ctx, statsHandler: cc.dopts.copts.StatsHandler, + beginTime: beginTime, } if desc != unaryStreamDesc { // Listen on cc and stream contexts to cleanup when the user closes the @@ -337,6 +339,7 @@ type clientStream struct { // so that all the generated stats for a particular RPC can be associated in the processing phase. statsCtx context.Context statsHandler stats.Handler + beginTime time.Time } func (cs *clientStream) Context() context.Context { @@ -513,7 +516,7 @@ func (cs *clientStream) finish(err error) { if cs.statsHandler != nil { end := &stats.End{ Client: true, - BeginTime: time.Now(), + BeginTime: cs.beginTime, EndTime: time.Now(), Error: err, }