Skip to content

Commit

Permalink
Responses in summary
Browse files Browse the repository at this point in the history
  • Loading branch information
dave committed Oct 27, 2017
1 parent d4fc170 commit 8ed41d6
Show file tree
Hide file tree
Showing 9 changed files with 174 additions and 54 deletions.
2 changes: 1 addition & 1 deletion blast-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ log-data:
# []string with the BLAST_LOG_OUTPUT environment variable or the --log-output flag.
# -------------------------
log-output:
- "code"
- "status"

# -------------------------
# payload-template
Expand Down
6 changes: 5 additions & 1 deletion blaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ type statsDef struct {
requestsFailed uint64
requestsSuccessDuration uint64
requestsDurationQueue *FiloQueue
requestsStatusQueue *FiloQueue
requestsStatusTotal *ThreadSaveMapIntInt

workersBusy int64
ticksSkipped uint64
Expand All @@ -83,6 +85,8 @@ func New(ctx context.Context, cancel context.CancelFunc) *Blaster {
changeRateChannel: make(chan float64, 1),
stats: statsDef{
requestsDurationQueue: &FiloQueue{},
requestsStatusQueue: &FiloQueue{},
requestsStatusTotal: NewThreadSaveMapIntInt(),
},
}

Expand Down Expand Up @@ -155,7 +159,7 @@ func (b *Blaster) start(ctx context.Context) error {
fmt.Fprintln(b.out, "Waiting for processes to finish...")
b.mainWait.Wait()

b.printStatus()
b.printStatus(true)

return nil
}
Expand Down
8 changes: 4 additions & 4 deletions blaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestNew(t *testing.T) {

must(t, b.start(ctx))

mustMatch(t, outbuf, 1, `\nSuccess\:\s+3\n`)
mustMatch(t, outbuf, 1, `\nSuccess\:\s+3 requests\n`)

outLog.must(t, 1, []string{"45583464115695f2|e60a15c85c691ab8", "true"})
outLog.must(t, 2, []string{"6258a554f446f0a7|4111d6d36a631a68", "true"})
Expand All @@ -77,8 +77,8 @@ func TestNew(t *testing.T) {
must(t, b1.loadPreviousLogsFromReader(outLog.reader()))
must(t, b1.start(ctx))

mustMatch(t, outbuf1, 1, `\nSuccess\:\s+1\n`)
mustMatch(t, outbuf1, 1, `\nSkipped\:\s+3 \(from previous run\)\n`)
mustMatch(t, outbuf1, 1, `\nSuccess\:\s+1 requests\n`)
mustMatch(t, outbuf1, 1, `\nSkipped\:\s+3 requests \(from previous run\)\n`)
outLog.must(t, 4, []string{"b0528e8eb39663df|9010bda07e0d725b", "true"})

b2, outbuf2 := defaultOptions(
Expand All @@ -91,7 +91,7 @@ func TestNew(t *testing.T) {
)

must(t, b2.start(ctx))
mustMatch(t, outbuf2, 1, `\nFailed\:\s+1\n`)
mustMatch(t, outbuf2, 1, `\nFailed\:\s+1 requests\n`)
outLog.must(t, 5, []string{"d91d9c633503397f|8ecfa63bc2072fe5", "false"})

}
Expand Down
6 changes: 3 additions & 3 deletions dummyworker/dummyworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ func (w *Worker) Send(ctx context.Context, raw map[string]interface{}) (map[stri
// Dummy worker - return an error sometimes
errorrand := r.Float64()
if errorrand > 0.95 {
return map[string]interface{}{"code": 500}, errors.New("Error 500")
return map[string]interface{}{"status": 500}, errors.New("Error 500")
} else if errorrand > 0.7 {
return map[string]interface{}{"code": 404}, errors.New("Error 404")
return map[string]interface{}{"status": 404}, errors.New("Error 404")
} else {
return map[string]interface{}{"code": 200}, nil
return map[string]interface{}{"status": 200}, nil
}
}

Expand Down
6 changes: 3 additions & 3 deletions httpworker/httpworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ func (w *Worker) Send(ctx context.Context, raw map[string]interface{}) (response
request = request.WithContext(ctx)
r, err := http.DefaultClient.Do(request)
if err != nil {
return map[string]interface{}{"code": r.StatusCode}, errors.WithStack(err)
return map[string]interface{}{"status": r.StatusCode}, errors.WithStack(err)
}
if r.StatusCode != 200 {
return map[string]interface{}{"code": r.StatusCode}, errors.New("Non 200 status code")
return map[string]interface{}{"status": r.StatusCode}, errors.New("Non 200 status")
}
return map[string]interface{}{"code": 200}, nil
return map[string]interface{}{"status": 200}, nil
}

type def struct {
Expand Down
3 changes: 1 addition & 2 deletions loop-rate.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ func (b *Blaster) startRateLoop(ctx context.Context) {
case s := <-readString():
s = strings.TrimSpace(s)
if s == "" {
b.printStatus()
b.printRatePrompt()
b.printStatus(false)
continue
}
f, err := strconv.ParseFloat(s, 64)
Expand Down
185 changes: 148 additions & 37 deletions loop-status.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package blast
import (
"context"
"fmt"
"sort"
"sync"
"sync/atomic"
"text/tabwriter"
"time"
)

Expand All @@ -23,49 +25,121 @@ func (b *Blaster) startStatusLoop(ctx context.Context) {
ticker.Stop()
return
case <-ticker.C:
b.printStatus()
b.printRatePrompt()
b.printStatus(false)
}
}
}()
}

func (b *Blaster) printStatus() {
var durationTotal, durationInstant uint64
func (b *Blaster) printStatus(final bool) {
success := atomic.LoadUint64(&b.stats.requestsSuccess)

type def struct {
status int
total uint64
instant uint64
}
statusQueue := b.stats.requestsStatusQueue.All()
statusTotals := b.stats.requestsStatusTotal.All()
summary := map[int]def{}
for _, status := range statusQueue {
if status == 0 {
continue
}
if _, found := summary[status]; !found {
summary[status] = def{
status: status,
total: 0,
instant: 0,
}
}
current := summary[status]
summary[status] = def{
status: current.status,
total: current.total,
instant: current.instant + 1,
}
}
for status, total := range statusTotals {
if status == 0 {
continue
}
if _, found := summary[status]; !found {
summary[status] = def{
status: status,
total: 0,
instant: 0,
}
}
current := summary[status]
summary[status] = def{
status: current.status,
total: total,
instant: current.instant,
}
}
var ordered []def
for _, v := range summary {
ordered = append(ordered, v)
}
sort.Slice(ordered, func(i, j int) bool { return ordered[i].status < ordered[j].status })

w := tabwriter.NewWriter(b.out, 0, 0, 1, ' ', 0)

w.Write([]byte("\n"))
w.Write([]byte("Summary\n"))
w.Write([]byte("=======\n"))
if !final {
if len(b.config.PayloadVariants) > 1 {
requestsPerSec := b.rate * float64(len(b.config.PayloadVariants))
w.Write([]byte(fmt.Sprintf("Rate:\t%.0f items/sec (%.0f requests/sec)\n", b.rate, requestsPerSec)))
} else {
w.Write([]byte(fmt.Sprintf("Rate:\t%.0f items/sec \n", b.rate)))
}
}
w.Write([]byte(fmt.Sprintf("Started:\t%d requests\n", atomic.LoadUint64(&b.stats.requestsStarted))))
w.Write([]byte(fmt.Sprintf("Finished:\t%d requests\n", atomic.LoadUint64(&b.stats.requestsFinished))))
w.Write([]byte(fmt.Sprintf("Success:\t%d requests\n", atomic.LoadUint64(&b.stats.requestsSuccess))))
w.Write([]byte(fmt.Sprintf("Failed:\t%d requests\n", atomic.LoadUint64(&b.stats.requestsFailed))))

requestsSkipped := atomic.LoadUint64(&b.stats.requestsSkipped)
if requestsSkipped > 0 {
w.Write([]byte(fmt.Sprintf("Skipped:\t%d requests (from previous run)\n", requestsSkipped)))
}

if success > 0 {
durationTotal = atomic.LoadUint64(&b.stats.requestsSuccessDuration) / success
durationTotal := atomic.LoadUint64(&b.stats.requestsSuccessDuration) / success
if success > INSTANT_COUNT {
durationInstant := b.stats.requestsDurationQueue.Sum() / INSTANT_COUNT
w.Write([]byte(fmt.Sprintf("Latency:\t%v ms per request (last %d: %v ms per request)\n", durationTotal, INSTANT_COUNT, durationInstant)))
} else {
w.Write([]byte(fmt.Sprintf("Latency:\t%v ms per request\n", durationTotal)))
}
} else {
w.Write([]byte(fmt.Sprintf("Latency:\tn/a\n")))
}
if success > INSTANT_COUNT {
durationInstant = b.stats.requestsDurationQueue.Sum() / INSTANT_COUNT

if !final {
w.Write([]byte(fmt.Sprintf("Concurrency:\t%d / %d workers in use\n", atomic.LoadInt64(&b.stats.workersBusy), b.config.Workers)))
}
fmt.Fprintf(b.out, `
Status
======
Rate: %.0f items / second
Started: %d items
Finished: %d items
Success: %d
Failed: %d
Skipped: %d (from previous run)
Latency: %v ms (last %d requests: %v ms)
Concurrency: %d / %d workers in use
Skipped ticks: %d (when all workers are busy)
skippedTicks := atomic.LoadUint64(&b.stats.ticksSkipped)
if skippedTicks > 0 {
w.Write([]byte(fmt.Sprintf("Skipped ticks:\t%d (when all workers are busy)\n", skippedTicks)))
}
w.Write([]byte("\t\n"))
if len(ordered) > 0 {
w.Write([]byte("Responses\t\n"))
w.Write([]byte("=========\t\n"))
for _, v := range ordered {
w.Write([]byte(fmt.Sprintf("%d:\t%d requests (last %d: %d requests)\n", v.status, v.total, INSTANT_COUNT, v.instant)))
}
w.Write([]byte("\n"))
}
w.Flush()

`,
b.rate,
atomic.LoadUint64(&b.stats.requestsStarted),
atomic.LoadUint64(&b.stats.requestsFinished),
atomic.LoadUint64(&b.stats.requestsSuccess),
atomic.LoadUint64(&b.stats.requestsFailed),
atomic.LoadUint64(&b.stats.requestsSkipped),
durationTotal,
INSTANT_COUNT,
durationInstant,
atomic.LoadInt64(&b.stats.workersBusy),
b.config.Workers,
atomic.LoadUint64(&b.stats.ticksSkipped),
)
if !final {
b.printRatePrompt()
}
}

func (b *Blaster) printRatePrompt() {
Expand All @@ -78,13 +152,40 @@ Rate?
)
}

func NewThreadSaveMapIntInt() *ThreadSaveMapIntInt {
return &ThreadSaveMapIntInt{
data: map[int]uint64{},
}
}

type ThreadSaveMapIntInt struct {
data map[int]uint64
m sync.Mutex
}

func (t *ThreadSaveMapIntInt) Increment(key int) {
t.m.Lock()
defer t.m.Unlock()
t.data[key] = t.data[key] + 1
}

func (t *ThreadSaveMapIntInt) All() map[int]uint64 {
t.m.Lock()
defer t.m.Unlock()
out := map[int]uint64{}
for k, v := range t.data {
out[k] = v
}
return out
}

type FiloQueue struct {
data [INSTANT_COUNT]uint64
data [INSTANT_COUNT]int
m sync.Mutex
cursor int
}

func (f *FiloQueue) Add(v uint64) {
func (f *FiloQueue) Add(v int) {
f.m.Lock()
defer f.m.Unlock()
f.data[f.cursor] = v
Expand All @@ -94,12 +195,22 @@ func (f *FiloQueue) Add(v uint64) {
}
}

func (f *FiloQueue) Sum() uint64 {
func (f *FiloQueue) Sum() int {
f.m.Lock()
defer f.m.Unlock()
var sum uint64
var sum int
for _, v := range f.data {
sum += v
}
return sum
}

func (f *FiloQueue) All() [INSTANT_COUNT]int {
f.m.Lock()
defer f.m.Unlock()
var out [INSTANT_COUNT]int
for i, v := range f.data {
out[i] = v
}
return out
}
3 changes: 1 addition & 2 deletions loop-ticker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ func (b *Blaster) startTickerLoop(ctx context.Context) {
b.rate = rate
ticker.Stop()
ticker = time.NewTicker(time.Second / time.Duration(b.rate))
b.printStatus()
b.printRatePrompt()
b.printStatus(false)
case b.mainChannel <- struct{}{}:
// if main loop is waiting, send it a message
default:
Expand Down
9 changes: 8 additions & 1 deletion loop-worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (b *Blaster) send(ctx context.Context, w Worker, workerVariantData map[stri
if success {
atomic.AddUint64(&b.stats.requestsSuccess, 1)
atomic.AddUint64(&b.stats.requestsSuccessDuration, uint64(elapsed))
b.stats.requestsDurationQueue.Add(uint64(elapsed))
b.stats.requestsDurationQueue.Add(int(elapsed))
} else {
atomic.AddUint64(&b.stats.requestsFailed, 1)
}
Expand All @@ -138,6 +138,13 @@ func (b *Blaster) send(ctx context.Context, w Worker, workerVariantData map[stri
extraFields = append(extraFields, val)
}

if status, ok := out["status"]; ok {
if statusInt, ok := status.(int); ok {
b.stats.requestsStatusTotal.Increment(statusInt)
b.stats.requestsStatusQueue.Add(statusInt)
}
}

lr := logRecord{
PayloadHash: hash,
Result: success,
Expand Down

0 comments on commit 8ed41d6

Please sign in to comment.