Skip to content

Commit

Permalink
honor time option (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
guoweis-work authored Aug 31, 2020
1 parent f83fabd commit b17b4e0
Showing 1 changed file with 21 additions and 4 deletions.
25 changes: 21 additions & 4 deletions grpcreplay/grpcreplay.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ type Resender struct {
streams []*stream
r io.Reader
srp SendRequestProcessor
opts *ResenderOptions
}

type SendRequestProcessor interface {
Expand All @@ -272,16 +273,20 @@ type SendRequestProcessor interface {
NextClose(i int)
}

func NewSender(filename string, srp SendRequestProcessor) (*Resender, error) {
type ResenderOptions struct {
HonorTime bool
}

func NewSender(filename string, srp SendRequestProcessor, opts *ResenderOptions) (*Resender, error) {
f, err := os.Open(filename)
if err != nil {
return nil, err
}
return NewResenderReader(f, srp)
return NewResenderReader(f, srp, opts)
}

func NewResenderReader(r io.Reader, srp SendRequestProcessor) (*Resender, error) {
return &Resender{srp: srp, r: bufio.NewReader(r)}, nil
func NewResenderReader(r io.Reader, srp SendRequestProcessor, opts *ResenderOptions) (*Resender, error) {
return &Resender{srp: srp, r: bufio.NewReader(r), opts: opts}, nil
}

func (rep *Resender) Run() error {
Expand All @@ -304,19 +309,28 @@ func (rep *Resender) Run() error {
}
switch e.kind {
case pb.Entry_REQUEST:
if rep.opts.HonorTime {
time.Sleep(time.Duration(e.delay))
}
rep.srp.NextCall(e.method, e.msg.msg)

case pb.Entry_RESPONSE:
continue

case pb.Entry_CREATE_STREAM:
if rep.opts.HonorTime {
time.Sleep(time.Duration(e.delay))
}
rep.srp.NextStream(e.method, i)
s := &stream{method: e.method, createIndex: i}
s.createErr = e.msg.err
streamsByIndex[i] = s
rep.streams = append(rep.streams, s)

case pb.Entry_SEND:
if rep.opts.HonorTime {
time.Sleep(time.Duration(e.delay))
}
s := streamsByIndex[e.refIndex]
if s == nil {
return fmt.Errorf("resender: no stream for send #%d", i)
Expand All @@ -327,6 +341,9 @@ func (rep *Resender) Run() error {
continue

case pb.Entry_CLOSE:
if rep.opts.HonorTime {
time.Sleep(time.Duration(e.delay))
}
rep.srp.NextClose(e.refIndex)

default:
Expand Down

0 comments on commit b17b4e0

Please sign in to comment.