From 8ed41d643ae148bab8e4cc324778f56a7032836a Mon Sep 17 00:00:00 2001 From: Dave Brophy Date: Thu, 26 Oct 2017 19:07:43 -0700 Subject: [PATCH] Responses in summary --- blast-config.yaml | 2 +- blaster.go | 6 +- blaster_test.go | 8 +- dummyworker/dummyworker.go | 6 +- httpworker/httpworker.go | 6 +- loop-rate.go | 3 +- loop-status.go | 185 +++++++++++++++++++++++++++++-------- loop-ticker.go | 3 +- loop-worker.go | 9 +- 9 files changed, 174 insertions(+), 54 deletions(-) diff --git a/blast-config.yaml b/blast-config.yaml index 2e670f2..aac3348 100644 --- a/blast-config.yaml +++ b/blast-config.yaml @@ -64,7 +64,7 @@ log-data: # []string with the BLAST_LOG_OUTPUT environment variable or the --log-output flag. # ------------------------- log-output: - - "code" + - "status" # ------------------------- # payload-template diff --git a/blaster.go b/blaster.go index 813772e..495d459 100644 --- a/blaster.go +++ b/blaster.go @@ -65,6 +65,8 @@ type statsDef struct { requestsFailed uint64 requestsSuccessDuration uint64 requestsDurationQueue *FiloQueue + requestsStatusQueue *FiloQueue + requestsStatusTotal *ThreadSaveMapIntInt workersBusy int64 ticksSkipped uint64 @@ -83,6 +85,8 @@ func New(ctx context.Context, cancel context.CancelFunc) *Blaster { changeRateChannel: make(chan float64, 1), stats: statsDef{ requestsDurationQueue: &FiloQueue{}, + requestsStatusQueue: &FiloQueue{}, + requestsStatusTotal: NewThreadSaveMapIntInt(), }, } @@ -155,7 +159,7 @@ func (b *Blaster) start(ctx context.Context) error { fmt.Fprintln(b.out, "Waiting for processes to finish...") b.mainWait.Wait() - b.printStatus() + b.printStatus(true) return nil } diff --git a/blaster_test.go b/blaster_test.go index 2ccc634..910b547 100644 --- a/blaster_test.go +++ b/blaster_test.go @@ -59,7 +59,7 @@ func TestNew(t *testing.T) { must(t, b.start(ctx)) - mustMatch(t, outbuf, 1, `\nSuccess\:\s+3\n`) + mustMatch(t, outbuf, 1, `\nSuccess\:\s+3 requests\n`) outLog.must(t, 1, []string{"45583464115695f2|e60a15c85c691ab8", "true"}) outLog.must(t, 2, []string{"6258a554f446f0a7|4111d6d36a631a68", "true"}) @@ -77,8 +77,8 @@ func TestNew(t *testing.T) { must(t, b1.loadPreviousLogsFromReader(outLog.reader())) must(t, b1.start(ctx)) - mustMatch(t, outbuf1, 1, `\nSuccess\:\s+1\n`) - mustMatch(t, outbuf1, 1, `\nSkipped\:\s+3 \(from previous run\)\n`) + mustMatch(t, outbuf1, 1, `\nSuccess\:\s+1 requests\n`) + mustMatch(t, outbuf1, 1, `\nSkipped\:\s+3 requests \(from previous run\)\n`) outLog.must(t, 4, []string{"b0528e8eb39663df|9010bda07e0d725b", "true"}) b2, outbuf2 := defaultOptions( @@ -91,7 +91,7 @@ func TestNew(t *testing.T) { ) must(t, b2.start(ctx)) - mustMatch(t, outbuf2, 1, `\nFailed\:\s+1\n`) + mustMatch(t, outbuf2, 1, `\nFailed\:\s+1 requests\n`) outLog.must(t, 5, []string{"d91d9c633503397f|8ecfa63bc2072fe5", "false"}) } diff --git a/dummyworker/dummyworker.go b/dummyworker/dummyworker.go index 141af2a..bb3ee7a 100644 --- a/dummyworker/dummyworker.go +++ b/dummyworker/dummyworker.go @@ -56,11 +56,11 @@ func (w *Worker) Send(ctx context.Context, raw map[string]interface{}) (map[stri // Dummy worker - return an error sometimes errorrand := r.Float64() if errorrand > 0.95 { - return map[string]interface{}{"code": 500}, errors.New("Error 500") + return map[string]interface{}{"status": 500}, errors.New("Error 500") } else if errorrand > 0.7 { - return map[string]interface{}{"code": 404}, errors.New("Error 404") + return map[string]interface{}{"status": 404}, errors.New("Error 404") } else { - return map[string]interface{}{"code": 200}, nil + return map[string]interface{}{"status": 200}, nil } } diff --git a/httpworker/httpworker.go b/httpworker/httpworker.go index ad2cc85..0375697 100644 --- a/httpworker/httpworker.go +++ b/httpworker/httpworker.go @@ -30,12 +30,12 @@ func (w *Worker) Send(ctx context.Context, raw map[string]interface{}) (response request = request.WithContext(ctx) r, err := http.DefaultClient.Do(request) if err != nil { - return map[string]interface{}{"code": r.StatusCode}, errors.WithStack(err) + return map[string]interface{}{"status": r.StatusCode}, errors.WithStack(err) } if r.StatusCode != 200 { - return map[string]interface{}{"code": r.StatusCode}, errors.New("Non 200 status code") + return map[string]interface{}{"status": r.StatusCode}, errors.New("Non 200 status") } - return map[string]interface{}{"code": 200}, nil + return map[string]interface{}{"status": 200}, nil } type def struct { diff --git a/loop-rate.go b/loop-rate.go index e10bc53..4202ed0 100644 --- a/loop-rate.go +++ b/loop-rate.go @@ -45,8 +45,7 @@ func (b *Blaster) startRateLoop(ctx context.Context) { case s := <-readString(): s = strings.TrimSpace(s) if s == "" { - b.printStatus() - b.printRatePrompt() + b.printStatus(false) continue } f, err := strconv.ParseFloat(s, 64) diff --git a/loop-status.go b/loop-status.go index 68b879e..adc66fc 100644 --- a/loop-status.go +++ b/loop-status.go @@ -3,8 +3,10 @@ package blast import ( "context" "fmt" + "sort" "sync" "sync/atomic" + "text/tabwriter" "time" ) @@ -23,49 +25,121 @@ func (b *Blaster) startStatusLoop(ctx context.Context) { ticker.Stop() return case <-ticker.C: - b.printStatus() - b.printRatePrompt() + b.printStatus(false) } } }() } -func (b *Blaster) printStatus() { - var durationTotal, durationInstant uint64 +func (b *Blaster) printStatus(final bool) { success := atomic.LoadUint64(&b.stats.requestsSuccess) + + type def struct { + status int + total uint64 + instant uint64 + } + statusQueue := b.stats.requestsStatusQueue.All() + statusTotals := b.stats.requestsStatusTotal.All() + summary := map[int]def{} + for _, status := range statusQueue { + if status == 0 { + continue + } + if _, found := summary[status]; !found { + summary[status] = def{ + status: status, + total: 0, + instant: 0, + } + } + current := summary[status] + summary[status] = def{ + status: current.status, + total: current.total, + instant: current.instant + 1, + } + } + for status, total := range statusTotals { + if status == 0 { + continue + } + if _, found := summary[status]; !found { + summary[status] = def{ + status: status, + total: 0, + instant: 0, + } + } + current := summary[status] + summary[status] = def{ + status: current.status, + total: total, + instant: current.instant, + } + } + var ordered []def + for _, v := range summary { + ordered = append(ordered, v) + } + sort.Slice(ordered, func(i, j int) bool { return ordered[i].status < ordered[j].status }) + + w := tabwriter.NewWriter(b.out, 0, 0, 1, ' ', 0) + + w.Write([]byte("\n")) + w.Write([]byte("Summary\n")) + w.Write([]byte("=======\n")) + if !final { + if len(b.config.PayloadVariants) > 1 { + requestsPerSec := b.rate * float64(len(b.config.PayloadVariants)) + w.Write([]byte(fmt.Sprintf("Rate:\t%.0f items/sec (%.0f requests/sec)\n", b.rate, requestsPerSec))) + } else { + w.Write([]byte(fmt.Sprintf("Rate:\t%.0f items/sec \n", b.rate))) + } + } + w.Write([]byte(fmt.Sprintf("Started:\t%d requests\n", atomic.LoadUint64(&b.stats.requestsStarted)))) + w.Write([]byte(fmt.Sprintf("Finished:\t%d requests\n", atomic.LoadUint64(&b.stats.requestsFinished)))) + w.Write([]byte(fmt.Sprintf("Success:\t%d requests\n", atomic.LoadUint64(&b.stats.requestsSuccess)))) + w.Write([]byte(fmt.Sprintf("Failed:\t%d requests\n", atomic.LoadUint64(&b.stats.requestsFailed)))) + + requestsSkipped := atomic.LoadUint64(&b.stats.requestsSkipped) + if requestsSkipped > 0 { + w.Write([]byte(fmt.Sprintf("Skipped:\t%d requests (from previous run)\n", requestsSkipped))) + } + if success > 0 { - durationTotal = atomic.LoadUint64(&b.stats.requestsSuccessDuration) / success + durationTotal := atomic.LoadUint64(&b.stats.requestsSuccessDuration) / success + if success > INSTANT_COUNT { + durationInstant := b.stats.requestsDurationQueue.Sum() / INSTANT_COUNT + w.Write([]byte(fmt.Sprintf("Latency:\t%v ms per request (last %d: %v ms per request)\n", durationTotal, INSTANT_COUNT, durationInstant))) + } else { + w.Write([]byte(fmt.Sprintf("Latency:\t%v ms per request\n", durationTotal))) + } + } else { + w.Write([]byte(fmt.Sprintf("Latency:\tn/a\n"))) } - if success > INSTANT_COUNT { - durationInstant = b.stats.requestsDurationQueue.Sum() / INSTANT_COUNT + + if !final { + w.Write([]byte(fmt.Sprintf("Concurrency:\t%d / %d workers in use\n", atomic.LoadInt64(&b.stats.workersBusy), b.config.Workers))) } - fmt.Fprintf(b.out, ` -Status -====== -Rate: %.0f items / second -Started: %d items -Finished: %d items -Success: %d -Failed: %d -Skipped: %d (from previous run) -Latency: %v ms (last %d requests: %v ms) -Concurrency: %d / %d workers in use -Skipped ticks: %d (when all workers are busy) + skippedTicks := atomic.LoadUint64(&b.stats.ticksSkipped) + if skippedTicks > 0 { + w.Write([]byte(fmt.Sprintf("Skipped ticks:\t%d (when all workers are busy)\n", skippedTicks))) + } + w.Write([]byte("\t\n")) + if len(ordered) > 0 { + w.Write([]byte("Responses\t\n")) + w.Write([]byte("=========\t\n")) + for _, v := range ordered { + w.Write([]byte(fmt.Sprintf("%d:\t%d requests (last %d: %d requests)\n", v.status, v.total, INSTANT_COUNT, v.instant))) + } + w.Write([]byte("\n")) + } + w.Flush() -`, - b.rate, - atomic.LoadUint64(&b.stats.requestsStarted), - atomic.LoadUint64(&b.stats.requestsFinished), - atomic.LoadUint64(&b.stats.requestsSuccess), - atomic.LoadUint64(&b.stats.requestsFailed), - atomic.LoadUint64(&b.stats.requestsSkipped), - durationTotal, - INSTANT_COUNT, - durationInstant, - atomic.LoadInt64(&b.stats.workersBusy), - b.config.Workers, - atomic.LoadUint64(&b.stats.ticksSkipped), - ) + if !final { + b.printRatePrompt() + } } func (b *Blaster) printRatePrompt() { @@ -78,13 +152,40 @@ Rate? ) } +func NewThreadSaveMapIntInt() *ThreadSaveMapIntInt { + return &ThreadSaveMapIntInt{ + data: map[int]uint64{}, + } +} + +type ThreadSaveMapIntInt struct { + data map[int]uint64 + m sync.Mutex +} + +func (t *ThreadSaveMapIntInt) Increment(key int) { + t.m.Lock() + defer t.m.Unlock() + t.data[key] = t.data[key] + 1 +} + +func (t *ThreadSaveMapIntInt) All() map[int]uint64 { + t.m.Lock() + defer t.m.Unlock() + out := map[int]uint64{} + for k, v := range t.data { + out[k] = v + } + return out +} + type FiloQueue struct { - data [INSTANT_COUNT]uint64 + data [INSTANT_COUNT]int m sync.Mutex cursor int } -func (f *FiloQueue) Add(v uint64) { +func (f *FiloQueue) Add(v int) { f.m.Lock() defer f.m.Unlock() f.data[f.cursor] = v @@ -94,12 +195,22 @@ func (f *FiloQueue) Add(v uint64) { } } -func (f *FiloQueue) Sum() uint64 { +func (f *FiloQueue) Sum() int { f.m.Lock() defer f.m.Unlock() - var sum uint64 + var sum int for _, v := range f.data { sum += v } return sum } + +func (f *FiloQueue) All() [INSTANT_COUNT]int { + f.m.Lock() + defer f.m.Unlock() + var out [INSTANT_COUNT]int + for i, v := range f.data { + out[i] = v + } + return out +} diff --git a/loop-ticker.go b/loop-ticker.go index 97d46fd..7e752c0 100644 --- a/loop-ticker.go +++ b/loop-ticker.go @@ -28,8 +28,7 @@ func (b *Blaster) startTickerLoop(ctx context.Context) { b.rate = rate ticker.Stop() ticker = time.NewTicker(time.Second / time.Duration(b.rate)) - b.printStatus() - b.printRatePrompt() + b.printStatus(false) case b.mainChannel <- struct{}{}: // if main loop is waiting, send it a message default: diff --git a/loop-worker.go b/loop-worker.go index 5e0aea0..88d7d3d 100644 --- a/loop-worker.go +++ b/loop-worker.go @@ -114,7 +114,7 @@ func (b *Blaster) send(ctx context.Context, w Worker, workerVariantData map[stri if success { atomic.AddUint64(&b.stats.requestsSuccess, 1) atomic.AddUint64(&b.stats.requestsSuccessDuration, uint64(elapsed)) - b.stats.requestsDurationQueue.Add(uint64(elapsed)) + b.stats.requestsDurationQueue.Add(int(elapsed)) } else { atomic.AddUint64(&b.stats.requestsFailed, 1) } @@ -138,6 +138,13 @@ func (b *Blaster) send(ctx context.Context, w Worker, workerVariantData map[stri extraFields = append(extraFields, val) } + if status, ok := out["status"]; ok { + if statusInt, ok := status.(int); ok { + b.stats.requestsStatusTotal.Increment(statusInt) + b.stats.requestsStatusQueue.Add(statusInt) + } + } + lr := logRecord{ PayloadHash: hash, Result: success,