Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Progress bar for benchmark tests #196

Closed
wants to merge 9 commits into from
35 changes: 32 additions & 3 deletions benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package main

import (
"errors"
"io/ioutil"
"os"
"os/signal"
"runtime"
Expand All @@ -31,9 +32,14 @@ import (
"github.com/yarpc/yab/limiter"
"github.com/yarpc/yab/statsd"
"github.com/yarpc/yab/transport"

pb "gopkg.in/cheggaaa/pb.v1"
)

var (
progressMarker int
progressUnit pb.Units

errNegativeDuration = errors.New("duration cannot be negative")
errNegativeMaxReqs = errors.New("max requests cannot be negative")
)
Expand Down Expand Up @@ -75,8 +81,10 @@ func (o BenchmarkOptions) enabled() bool {
return o.MaxDuration != 0 || o.MaxRequests != 0
}

func runWorker(t transport.Transport, m benchmarkMethod, s *benchmarkState, run *limiter.Run) {
func runWorker(t transport.Transport, m benchmarkMethod, s *benchmarkState, run *limiter.Run, pb *pb.ProgressBar) {
for cur := run; cur.More(); {
pb.Increment()

latency, err := m.call(t)
if err != nil {
s.recordError(err)
Expand All @@ -99,8 +107,10 @@ func runBenchmark(out output, allOpts Options, m benchmarkMethod) {

if opts.RPS > 0 && opts.MaxDuration > 0 {
// The RPS * duration in seconds may cap opts.MaxRequests.
// This int cast rounds down 1.0 * 999ms (0.999) = 0.999 = int(0.999) = 0
rpsMax := int(float64(opts.RPS) * opts.MaxDuration.Seconds())
if rpsMax < opts.MaxRequests || opts.MaxRequests == 0 {

if (rpsMax > 0 && rpsMax < opts.MaxRequests) || opts.MaxRequests == 0 {
opts.MaxRequests = rpsMax
}
}
Expand Down Expand Up @@ -132,6 +142,15 @@ func runBenchmark(out output, allOpts Options, m benchmarkMethod) {
states[i] = newBenchmarkState(statter)
}

progressMarker, progressUnit = progressBarSetup(&opts)
progressBar := pb.New(progressMarker)
progressBar.SetUnits(progressUnit)
progressBar.Output = ioutil.Discard
if opts.ProgressBar {
progressBar.Output = out
}
progressBar.Start()

run := limiter.New(opts.MaxRequests, opts.RPS, opts.MaxDuration)
stopOnInterrupt(out, run)

Expand All @@ -143,7 +162,7 @@ func runBenchmark(out output, allOpts Options, m benchmarkMethod) {
wg.Add(1)
go func(c transport.Transport) {
defer wg.Done()
runWorker(c, m, state, run)
runWorker(c, m, state, run, progressBar)
}(c)
}
}
Expand All @@ -153,6 +172,9 @@ func runBenchmark(out output, allOpts Options, m benchmarkMethod) {
// Wait for all the worker goroutines to end.
wg.Wait()
total := time.Since(start)
progressBar.Finish()

// progressBar.FinishPrint("Benchmark finished")

// Merge all the states into 0
overall := states[0]
Expand Down Expand Up @@ -180,3 +202,10 @@ func stopOnInterrupt(out output, r *limiter.Run) {
r.Stop()
}()
}

func progressBarSetup(opts *BenchmarkOptions) (int, pb.Units) {
if opts.MaxRequests > 0 {
return opts.MaxRequests, pb.U_NO
}
return int(opts.MaxDuration), pb.U_DURATION
}
100 changes: 94 additions & 6 deletions benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/uber/tchannel-go/testutils"
"go.uber.org/atomic"
"gopkg.in/cheggaaa/pb.v1"
)

func TestBenchmark(t *testing.T) {
Expand All @@ -41,24 +42,33 @@ func TestBenchmark(t *testing.T) {
rps int
want int
wantDuration time.Duration

wantMarker int
wantUnit pb.Units
}{
{
msg: "Capped by max requests",
n: 100,
d: 100 * time.Second,
want: 100,
msg: "Capped by max requests",
n: 100,
d: 100 * time.Second,
want: 100,
wantMarker: 100,
wantUnit: pb.U_NO,
},
{
msg: "Capped by RPS * duration",
d: 500 * time.Millisecond,
rps: 120,
want: 60,
wantDuration: 500 * time.Millisecond,
wantMarker: 60,
wantUnit: pb.U_NO,
},
{
msg: "Capped by duration",
d: 500 * time.Millisecond,
wantDuration: 500 * time.Millisecond,
wantMarker: int(500 * time.Millisecond),
wantUnit: pb.U_DURATION,
},
}

Expand All @@ -71,18 +81,20 @@ func TestBenchmark(t *testing.T) {
}))

m := benchmarkMethodForTest(t, fooMethod, transport.TChannel)
buf, _, out := getOutput(t)

for _, tt := range tests {
buf.Reset()
requests.Store(0)

start := time.Now()
buf, _, out := getOutput(t)

runBenchmark(out, Options{
BOpts: BenchmarkOptions{
MaxRequests: tt.n,
MaxDuration: tt.d,
RPS: tt.rps,
Connections: 50,
Connections: 25,
Concurrency: 2,
},
TOpts: s.transportOpts(),
Expand All @@ -96,6 +108,8 @@ func TestBenchmark(t *testing.T) {
assert.EqualValues(t, tt.want, requests.Load(),
"%v: Invalid number of requests", tt.msg)
}
assert.Equal(t, tt.wantMarker, progressMarker, "progress bar total should be: %v", tt.wantMarker)
assert.Equal(t, tt.wantUnit, progressUnit, "progress bar unit should be %v", tt.wantUnit)

if tt.wantDuration != 0 {
// Make sure the total duration is within a delta.
Expand Down Expand Up @@ -149,3 +163,77 @@ func TestRunBenchmarkErrors(t *testing.T) {
assert.Contains(t, fatalMessage, tt.wantErr, "Missing error for %+v", tt.opts)
}
}

func TestBenchmarkProgressBar(t *testing.T) {
tests := []struct {
msg string
maxRequests int
d time.Duration
rps int

wantOut string
wantMarker int
wantUnit pb.Units
}{
{
msg: "RPS simple progrss bar",
maxRequests: 100,
wantMarker: 100,
wantUnit: pb.U_NO,
wantOut: "100 / 100 100.00%",
},
{
msg: "Duration progress bar",
d: 1 * time.Second,
wantMarker: 1,
wantUnit: pb.U_DURATION,
wantOut: "1s",
},
{
msg: "RPS times duration in seconds 1 second",
d: 1 * time.Second,
rps: 177,
wantMarker: 177,
wantUnit: pb.U_NO,
wantOut: "177 / 177 100.00%",
},
{
msg: "RPS times duration in seconds",
d: 500 * time.Millisecond,
rps: 100,
wantMarker: 50,
wantUnit: pb.U_NO,
wantOut: "50 / 50 100.00%",
},
}
var requests atomic.Int32
s := newServer(t)
defer s.shutdown()
s.register(fooMethod, methods.errorIf(func() bool {
requests.Inc()
return false
}))

m := benchmarkMethodForTest(t, fooMethod, transport.TChannel)

buf, _, out := getOutput(t)
for _, tt := range tests {
buf.Reset()
requests.Store(0)

t.Run(fmt.Sprintf(tt.msg), func(t *testing.T) {
runBenchmark(out, Options{
BOpts: BenchmarkOptions{
MaxRequests: tt.maxRequests,
MaxDuration: tt.d,
RPS: tt.rps,
Connections: 50,
Concurrency: 2,
ProgressBar: true,
},
TOpts: s.transportOpts(),
}, m)
assert.Contains(t, buf.String(), tt.wantOut)
})
}
}
67 changes: 60 additions & 7 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import:
version: ^1
- package: github.com/uber/jaeger-client-go
version: ^1
- package: gopkg.in/cheggaaa/pb.v1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the github repo supports semver, maybe better to go directly with that and ^1

version: ^1
testImport:
- package: github.com/apache/thrift
version: master
Expand Down
3 changes: 3 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ type BenchmarkOptions struct {

// Benchmark metrics can optionally be reported via statsd.
StatsdHostPort string `long:"statsd" description:"Optional host:port of a StatsD server to report metrics"`

// Output options
ProgressBar bool `long:"progress-bar" description:"show a progress bar for the benchmark."`
}

func newOptions() *Options {
Expand Down