More tests
dave committed Oct 23, 2017
1 parent 487724b commit dbe0e3e
Showing 4 changed files with 268 additions and 56 deletions.
24 changes: 14 additions & 10 deletions blaster.go
@@ -9,23 +9,25 @@ import (
"runtime"
"sync"
"time"

"github.com/leemcloughlin/gofarmhash"
)

const DEBUG = false
const INSTANT_COUNT = 100

type Blaster struct {
config *configDef
rate float64
skip map[farmhash.Uint128]struct{}
dataCloser io.Closer
dataReader DataReader
dataHeaders []string
logCloser io.Closer
logWriter LogWriteFlusher
cancel context.CancelFunc
out io.Writer
config *configDef
rate float64
skip map[farmhash.Uint128]struct{}
dataCloser io.Closer
dataReader DataReader
dataHeaders []string
logCloser io.Closer
logWriter LogWriteFlusher
cancel context.CancelFunc
out io.Writer
rateInputReader io.Reader

mainChannel chan struct{}
errorChannel chan error
@@ -122,6 +124,8 @@ func (b *Blaster) Start(ctx context.Context) error {
}
defer b.flushAndCloseLog()

b.rateInputReader = os.Stdin

return b.start(ctx)
}

284 changes: 242 additions & 42 deletions blaster_test.go
@@ -4,60 +4,169 @@ import (
"bytes"
"context"
"encoding/csv"
"errors"
"regexp"
"strings"
"sync"
"testing"

"fmt"

"time"

"github.com/pkg/errors"
)

func defaultOptions(
ctx context.Context,
cancel context.CancelFunc,
in string,
workerType string,
logWriter LogWriteFlusher,
workerLog *LoggingWorkerLog,
) (*Blaster, *bytes.Buffer) {
b := New(ctx, cancel)
b.RegisterWorkerType("success", workerLog.NewSuccess)
b.RegisterWorkerType("fail", workerLog.NewFail)
b.RegisterWorkerType("hang", workerLog.NewHang)
b.config = &configDef{}
b.config.Workers = 1
b.config.WorkerType = workerType
b.config.PayloadVariants = []map[string]string{{}}
b.rate = 100
b.dataHeaders = []string{"head"}
b.dataReader = csv.NewReader(strings.NewReader(in))
b.logWriter = logWriter
outbuf := new(bytes.Buffer)
b.out = outbuf
b.rateInputReader = strings.NewReader("")
return b, outbuf
}

func TestNew(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
workerLog := new(LoggingWorkerLog)
outLog := new(LoggingWriter)
outLog.Write([]string{"hash", "result"})

newBlast := func(in string, workerType string, logbuf *bytes.Buffer) (*Blaster, *bytes.Buffer) {
b := New(ctx, cancel)
b.RegisterWorkerType("success", NewImmediateSuccessWorker)
b.RegisterWorkerType("fail", NewImmediateFailWorker)
b.config = &configDef{}
b.config.Workers = 1
b.config.WorkerType = workerType
b.config.PayloadVariants = []map[string]string{{}}
b.rate = 100
b.dataHeaders = []string{"head"}
b.dataReader = csv.NewReader(strings.NewReader(in))
b.logWriter = csv.NewWriter(logbuf)
outbuf := new(bytes.Buffer)
b.out = outbuf
return b, outbuf
}

logbuf := new(bytes.Buffer)
logbuf.WriteString("hash,result\n")
b, outbuf := defaultOptions(
ctx,
cancel,
"a\nb\nc",
"success",
outLog,
workerLog,
)

b, outbuf := newBlast("a\nb\nc", "success", logbuf)
must(t, b.start(ctx))
b.logWriter.Flush()

mustMatch(t, outbuf, 1, `\nSuccess\:\s+0\n`)
mustMatch(t, outbuf, 1, `\nSuccess\:\s+3\n`)
mustMatch(t, logbuf, 1, `3763b9c0e1b2307c\|c1377b027e806557\,true\n`)
mustMatch(t, logbuf, 1, `db7a669e37739bf\|b4a36ba02942a475\,true\n`)
mustMatch(t, logbuf, 1, `deb69562b047222\|3cec67420f8a6588\,true\n`)

b1, outbuf1 := newBlast("a\nb\nc\nd", "success", logbuf)
outLog.must(t, 1, []string{"3763b9c0e1b2307c|c1377b027e806557", "true"})
outLog.must(t, 2, []string{"db7a669e37739bf|b4a36ba02942a475", "true"})
outLog.must(t, 3, []string{"deb69562b047222|3cec67420f8a6588", "true"})

b1, outbuf1 := defaultOptions(
ctx,
cancel,
"a\nb\nc\nd",
"success",
outLog,
workerLog,
)

must(t, b1.loadPreviousLogsFromReader(bytes.NewBuffer(logbuf.Bytes())))
must(t, b1.loadPreviousLogsFromReader(outLog.reader()))
must(t, b1.start(ctx))
b1.logWriter.Flush()

mustMatch(t, outbuf1, 1, `\nSuccess\:\s+0\n`)
mustMatch(t, outbuf1, 1, `\nSuccess\:\s+1\n`)
mustMatch(t, outbuf1, 1, `\nSkipped\:\s+3 \(from previous run\)\n`)
mustMatch(t, logbuf, 1, `73d81ec7b7251e65\|fab9096e8c84809f\,true\n`)
outLog.must(t, 4, []string{"73d81ec7b7251e65|fab9096e8c84809f", "true"})

b2, outbuf2 := defaultOptions(
ctx,
cancel,
"e",
"fail",
outLog,
workerLog,
)

b2, outbuf2 := newBlast("e", "fail", logbuf)
must(t, b2.start(ctx))
b2.logWriter.Flush()
mustMatch(t, outbuf2, 1, `\nFailed\:\s+1\n`)
outLog.must(t, 5, []string{"21e8bfb8d271d28d|c546009ae399ca09", "false"})

//for i, v := range outLog.Log {
// fmt.Println(i, v)
//}

}

func TestPayloadVariants(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
workerLog := new(LoggingWorkerLog)
outLog := new(LoggingWriter)
outLog.Write([]string{"hash", "result"})

b, _ := defaultOptions(
ctx,
cancel,
"a\nb",
"success",
outLog,
workerLog,
)
b.config.PayloadTemplate = map[string]interface{}{
"v1": "{{head}}-{{p1}}",
"v2": "{{p2}}",
}
b.config.PayloadVariants = []map[string]string{
{"p1": "p1v1", "p2": "p2v1"},
{"p1": "p1v2", "p2": "p2v2"},
}
must(t, b.start(ctx))

workerLog.must(t, 0, map[string]string{"v1": "a-p1v1", "v2": "p2v1"})
workerLog.must(t, 1, map[string]string{"v1": "a-p1v2", "v2": "p2v2"})
workerLog.must(t, 2, map[string]string{"v1": "b-p1v1", "v2": "p2v1"})
workerLog.must(t, 3, map[string]string{"v1": "b-p1v2", "v2": "p2v2"})

}

func TestCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
workerLog := new(LoggingWorkerLog)
outLog := new(LoggingWriter)
outLog.Write([]string{"hash", "result"})

b, _ := defaultOptions(
ctx,
cancel,
"a\nb\nc",
"hang",
outLog,
workerLog,
)
b.rate = 20
finished := make(chan struct{})
go func() {
must(t, b.start(ctx))
close(finished)
}()
<-time.After(time.Millisecond * 70) // rate is 20/sec, so first will fire at 50ms
b.cancel()
select {
case <-finished:
case <-time.After(time.Millisecond * 200):
t.Fatal("timeout")
}

workerLog.mustLen(t, 1)
workerLog.must(t, 0, map[string]string{"_cancelled": "true"})

//for i, v := range workerLog.Log {
// fmt.Println(i, v)
//}

}

@@ -80,26 +189,117 @@ func mustMatch(t *testing.T, buf *bytes.Buffer, num int, pattern string) {
}
}

type DummyCloser struct{}
type LoggingWriter struct {
Log [][]string
}

func (DummyCloser) Close() error { return nil }
func (l *LoggingWriter) Write(record []string) error {
l.Log = append(l.Log, record)
return nil
}
func (l *LoggingWriter) Flush() {}

func NewImmediateSuccessWorker() Worker {
return &ImmediateSuccessWorker{}
func (l *LoggingWriter) reader() *bytes.Buffer {
buf := &bytes.Buffer{}
w := csv.NewWriter(buf)
for _, v := range l.Log {
w.Write(v)
}
w.Flush()
return buf
}

type ImmediateSuccessWorker struct{}
func (l *LoggingWriter) must(t *testing.T, index int, expected []string) {
t.Helper()
record := l.Log[index]
if len(record) == len(expected) {
found := true
for i, value := range record {
if value != expected[i] {
found = false
break
}
}
if found {
return
}
}
t.Fatalf("Record %s not found at index %d in output log %s", expected, index, l.Log)
}

func (*ImmediateSuccessWorker) Send(context.Context, map[string]interface{}) error {
return nil
type LoggingWorkerLog struct {
Log []map[string]string
m sync.Mutex
}

func NewImmediateFailWorker() Worker {
return &ImmediateFailWorker{}
func (l *LoggingWorkerLog) mustLen(t *testing.T, length int) {
if len(l.Log) != length {
t.Fatalf("Worker log is not length %d:\n%v", length, l.Log)
}
}

type ImmediateFailWorker struct{}
func (l *LoggingWorkerLog) must(t *testing.T, index int, expected map[string]string) {
t.Helper()
record := l.Log[index]
if len(record) == len(expected) {
found := true
for k, value := range record {
if value != expected[k] {
found = false
break
}
}
if found {
return
}
}
t.Fatalf("Record %s not found at index %d in worker log %s", expected, index, l.Log)
}

func (*ImmediateFailWorker) Send(context.Context, map[string]interface{}) error {
func (l *LoggingWorkerLog) Append(message map[string]string) {
l.m.Lock()
defer l.m.Unlock()
l.Log = append(l.Log, message)
}

type LoggingWorker struct {
Result bool
Hang bool
Log *LoggingWorkerLog
}

func (l *LoggingWorkerLog) NewSuccess() Worker {
return &LoggingWorker{Log: l, Result: true}
}

func (l *LoggingWorkerLog) NewFail() Worker {
return &LoggingWorker{Log: l, Result: false}
}

func (l *LoggingWorkerLog) NewHang() Worker {
return &LoggingWorker{Log: l, Result: true, Hang: true}
}

func (l *LoggingWorker) Send(ctx context.Context, in map[string]interface{}) error {
log := map[string]string{}
if l.Hang {
select {
case <-time.After(time.Second):
log["_hung"] = "true"
case <-ctx.Done():
log["_cancelled"] = "true"
}
}
for k, v := range in {
log[k] = fmt.Sprint(v)
}
l.Log.Append(log)
if l.Result {
return nil
}
return errors.New("fail")
}

type DummyCloser struct{}

func (DummyCloser) Close() error { return nil }
2 changes: 1 addition & 1 deletion logs.go
@@ -97,7 +97,7 @@ func (b *Blaster) openLogAndInit() error {
b.logWriter = csv.NewWriter(logFile)
b.logCloser = logFile
if s.Size() == 0 {
fields := append([]string{"payload-hash", "result"}, b.config.LogData...)
fields := append([]string{"hash", "result"}, b.config.LogData...)
if err := b.logWriter.Write(fields); err != nil {
return errors.WithStack(err)
}