From 0741e39c188c3c3546a7d7651d546a8a356a08f4 Mon Sep 17 00:00:00 2001 From: guowei shieh Date: Mon, 31 Aug 2020 12:02:01 -0700 Subject: [PATCH] honor time option --- grpcreplay/grpcreplay.go | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/grpcreplay/grpcreplay.go b/grpcreplay/grpcreplay.go index 013e779..e63e219 100644 --- a/grpcreplay/grpcreplay.go +++ b/grpcreplay/grpcreplay.go @@ -263,6 +263,7 @@ type Resender struct { streams []*stream r io.Reader srp SendRequestProcessor + opts *ResenderOptions } type SendRequestProcessor interface { @@ -272,16 +273,20 @@ type SendRequestProcessor interface { NextClose(i int) } -func NewSender(filename string, srp SendRequestProcessor) (*Resender, error) { +type ResenderOptions struct { + HonorTime bool +} + +func NewSender(filename string, srp SendRequestProcessor, opts *ResenderOptions) (*Resender, error) { f, err := os.Open(filename) if err != nil { return nil, err } - return NewResenderReader(f, srp) + return NewResenderReader(f, srp, opts) } -func NewResenderReader(r io.Reader, srp SendRequestProcessor) (*Resender, error) { - return &Resender{srp: srp, r: bufio.NewReader(r)}, nil +func NewResenderReader(r io.Reader, srp SendRequestProcessor, opts *ResenderOptions) (*Resender, error) { + return &Resender{srp: srp, r: bufio.NewReader(r), opts: opts}, nil } func (rep *Resender) Run() error { @@ -304,12 +309,18 @@ func (rep *Resender) Run() error { } switch e.kind { case pb.Entry_REQUEST: + if rep.opts.HonorTime { + time.Sleep(time.Duration(e.delay)) + } rep.srp.NextCall(e.method, e.msg.msg) case pb.Entry_RESPONSE: continue case pb.Entry_CREATE_STREAM: + if rep.opts.HonorTime { + time.Sleep(time.Duration(e.delay)) + } rep.srp.NextStream(e.method, i) s := &stream{method: e.method, createIndex: i} s.createErr = e.msg.err @@ -317,6 +328,9 @@ func (rep *Resender) Run() error { rep.streams = append(rep.streams, s) case pb.Entry_SEND: + if rep.opts.HonorTime { + time.Sleep(time.Duration(e.delay)) + } s := streamsByIndex[e.refIndex] if s == nil { return fmt.Errorf("resender: no stream for send #%d", i) @@ -327,6 +341,9 @@ func (rep *Resender) Run() error { continue case pb.Entry_CLOSE: + if rep.opts.HonorTime { + time.Sleep(time.Duration(e.delay)) + } rep.srp.NextClose(e.refIndex) default: