diff --git a/blaster/blaster.go b/blaster/blaster.go index effc996..09e2ff2 100644 --- a/blaster/blaster.go +++ b/blaster/blaster.go @@ -69,12 +69,13 @@ type Blaster struct { payloadRenderer renderer workerRenderer renderer - mainChannel chan struct{} + mainChannel chan int errorChannel chan error workerChannel chan workDef logChannel chan logRecord dataFinishedChannel chan struct{} workersFinishedChannel chan struct{} + itemFinishedChannel chan struct{} changeRateChannel chan float64 signalChannel chan os.Signal @@ -157,7 +158,7 @@ func New(ctx context.Context, cancel context.CancelFunc) *Blaster { changeRateChannel: make(chan float64, 1), errorChannel: make(chan error), logChannel: make(chan logRecord), - mainChannel: make(chan struct{}), + mainChannel: make(chan int), workerChannel: make(chan workDef), Rate: 10, Workers: 10, @@ -252,8 +253,8 @@ func (b *Blaster) Start(ctx context.Context) (Summary, error) { panic("Must specify workers!") } - if b.Rate <= 0 { - panic("Must specify rate!") + if b.Rate < 0 { + panic("Rate must not be negative!") } err := b.start(ctx) diff --git a/blaster/blaster_test.go b/blaster/blaster_test.go index 6246597..61dba4d 100644 --- a/blaster/blaster_test.go +++ b/blaster/blaster_test.go @@ -5,7 +5,6 @@ import ( "context" "encoding/csv" "regexp" - "strings" "sync" "testing" @@ -13,154 +12,342 @@ import ( "time" + "io" + + "strings" + "github.com/pkg/errors" ) -func defaultOptions( - ctx context.Context, - cancel context.CancelFunc, - in string, - workerType string, - logWriter csvWriteFlusher, - workerLog *LoggingWorkerLog, -) (*Blaster, *ThreadSafeBuffer) { +func TestSuccess(t *testing.T) { + + ctx, cancel := context.WithCancel(context.Background()) + b := New(ctx, cancel) + b.Rate = 0 // set rate to 0 so we can inject items synthetically + b.itemFinishedChannel = make(chan struct{}) + + workerLog := new(LoggingWorker) + b.SetWorker(workerLog.NewSuccess) + + finished := make(chan struct{}) + go func() { + must(t, b.start(ctx)) + close(finished) + }() + + // synthetically call the main channel, which is what the ticker would do + b.mainChannel <- 0 + <-b.itemFinishedChannel + + b.mainChannel <- 0 + <-b.itemFinishedChannel + + // start graceful exit process + close(b.dataFinishedChannel) + + // wait for the start method to finish + <-finished + + b.Exit() + + workerLog.mustLen(t, 2) + workerLog.must(t, 0, map[string]string{"_success": "true"}) + workerLog.must(t, 1, map[string]string{"_success": "true"}) + +} + +func TestFail(t *testing.T) { + + ctx, cancel := context.WithCancel(context.Background()) + b := New(ctx, cancel) + b.Rate = 0 // set rate to 0 so we can inject items synthetically + b.itemFinishedChannel = make(chan struct{}) + + worker := new(LoggingWorker) + b.SetWorker(worker.NewFail) + + finished := make(chan struct{}) + go func() { + must(t, b.start(ctx)) + close(finished) + }() + + // synthetically call the main channel, which is what the ticker would do + b.mainChannel <- 0 + <-b.itemFinishedChannel + + b.mainChannel <- 0 + <-b.itemFinishedChannel + + // start graceful exit process + close(b.dataFinishedChannel) + + // wait for the start method to finish + <-finished + + b.Exit() + + worker.mustLen(t, 2) + worker.must(t, 0, map[string]string{"_success": "false"}) + worker.must(t, 1, map[string]string{"_success": "false"}) + +} + +func TestHung(t *testing.T) { + + ctx, cancel := context.WithCancel(context.Background()) + b := New(ctx, cancel) + b.Rate = 0 // set rate to 0 so we can inject items synthetically + b.SetTimeout(200 * time.Millisecond) + b.itemFinishedChannel = make(chan struct{}) + + worker := new(LoggingWorker) + b.SetWorker(worker.NewHang(100)) + + finished := make(chan struct{}) + go func() { + must(t, b.start(ctx)) + close(finished) + }() + + // synthetically call the main channel, which is what the ticker would do + b.mainChannel <- 0 + <-b.itemFinishedChannel + + // start graceful exit process + close(b.dataFinishedChannel) + + // wait for the start method to finish + <-finished + + b.Exit() + + worker.mustLen(t, 1) + worker.must(t, 0, map[string]string{"_hung": "true"}) + +} + +func TestTimeout(t *testing.T) { + + ctx, cancel := context.WithCancel(context.Background()) b := New(ctx, cancel) - b.RegisterWorkerType("success", workerLog.NewSuccess) - b.RegisterWorkerType("fail", workerLog.NewFail) - b.RegisterWorkerType("hang", workerLog.NewHang) - b.Initialise(ctx, Config{ - Rate: 100, - Resume: true, - Workers: 1, - WorkerType: workerType, - Headers: []string{"head"}, - }) - b.SetData(strings.NewReader(in)) - b.logWriter = logWriter - outbuf := new(ThreadSafeBuffer) - b.SetOutput(outbuf) - return b, outbuf + b.Rate = 0 // set rate to 0 so we can inject items synthetically + b.SetTimeout(5 * time.Millisecond) + b.itemFinishedChannel = make(chan struct{}) + + worker := new(LoggingWorker) + b.SetWorker(worker.NewHang(500)) + + finished := make(chan struct{}) + go func() { + must(t, b.start(ctx)) + close(finished) + }() + + // synthetically call the main channel, which is what the ticker would do + b.mainChannel <- 0 + <-b.itemFinishedChannel + + // start graceful exit process + close(b.dataFinishedChannel) + + // wait for the start method to finish + <-finished + + b.Exit() + + worker.mustLen(t, 1) + worker.must(t, 0, map[string]string{"_cancelled": "true"}) + } -func TestNew(t *testing.T) { +func TestCancel(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) - workerLog := new(LoggingWorkerLog) - outLog := new(LoggingWriter) - outLog.Write([]string{"hash", "result"}) - - b, outbuf := defaultOptions( - ctx, - cancel, - "a\nb\nc", - "success", - outLog, - workerLog, - ) - - must(t, b.start(ctx)) - - mustMatch(t, outbuf, 1, `\n\[success\]\s*\n---------\s*\nCount\:\s+3\s`) - - outLog.must(t, 1, []string{"45583464115695f2|e60a15c85c691ab8", "true"}) - outLog.must(t, 2, []string{"6258a554f446f0a7|4111d6d36a631a68", "true"}) - outLog.must(t, 3, []string{"d0e4144aef1f25ee|f44a70605aeac064", "true"}) - - b1, outbuf1 := defaultOptions( - ctx, - cancel, - "a\nb\nc\nd", - "success", - outLog, - workerLog, - ) - - must(t, b1.LoadLogs(outLog.reader())) - must(t, b1.start(ctx)) - - mustMatch(t, outbuf1, 1, `\n\[success\]\s*\n---------\s*\nCount\:\s+1\s`) - mustMatch(t, outbuf1, 1, `\nSkipped\:\s+3 from previous runs`) - outLog.must(t, 4, []string{"b0528e8eb39663df|9010bda07e0d725b", "true"}) - - b2, outbuf2 := defaultOptions( - ctx, - cancel, - "e", - "fail", - outLog, - workerLog, - ) - - must(t, b2.start(ctx)) - mustMatch(t, outbuf2, 1, `\n\[fail\]\s*\n------\s*\nCount\:\s+1\s`) - outLog.must(t, 5, []string{"d91d9c633503397f|8ecfa63bc2072fe5", "false"}) + b := New(ctx, cancel) + b.Rate = 0 // set rate to 0 so we can inject items synthetically + b.SetTimeout(500 * time.Millisecond) + + worker := new(LoggingWorker) + b.SetWorker(worker.NewHang(400)) + + finished := make(chan struct{}) + go func() { + must(t, b.start(ctx)) + close(finished) + }() + + // synthetically call the main channel, which is what the ticker would do + b.mainChannel <- 0 + + b.Exit() + + // wait for the start method to finish + <-finished + + worker.mustLen(t, 1) + worker.must(t, 0, map[string]string{"_cancelled": "true"}) + +} + +func TestLog(t *testing.T) { + + ctx, cancel := context.WithCancel(context.Background()) + b := New(ctx, cancel) + b.Rate = 0 // set rate to 0 so we can inject items synthetically + b.itemFinishedChannel = make(chan struct{}) + + worker := new(LoggingWorker) + b.SetWorker(worker.NewSuccess) + + log := &LoggingWriter{buf: new(bytes.Buffer)} + b.SetLog(log) + + out := new(ThreadSafeBuffer) + b.SetOutput(out) + + b.Headers = []string{"head"} + b.SetData(strings.NewReader("a\nb")) + + finished := make(chan struct{}) + go func() { + must(t, b.start(ctx)) + close(finished) + }() + + // synthetically call the main channel, which is what the ticker would do + b.mainChannel <- 0 + <-b.itemFinishedChannel + + b.mainChannel <- 0 + <-b.itemFinishedChannel + + // another tick and the data will reach EOF, and gracefully exit + b.mainChannel <- 0 + + // wait for the start method to finish + <-finished + + b.Exit() + + worker.mustLen(t, 2) + worker.must(t, 0, map[string]string{"_success": "true"}) + worker.must(t, 1, map[string]string{"_success": "true"}) + + log.mustLen(t, 2) + log.must(t, 0, []string{"45583464115695f2|e60a15c85c691ab8", "true"}) + log.must(t, 1, []string{"6258a554f446f0a7|4111d6d36a631a68", "true"}) + + mustMatch(t, out, 1, `\n\[success\]\s*\n---------\s*\nCount\:\s+2\s`) + +} + +func TestResume(t *testing.T) { + + ctx, cancel := context.WithCancel(context.Background()) + b := New(ctx, cancel) + b.Resume = true + b.Rate = 0 // set rate to 0 so we can inject items synthetically + b.itemFinishedChannel = make(chan struct{}) + + worker := new(LoggingWorker) + b.SetWorker(worker.NewSuccess) + + log := &LoggingWriter{buf: new(bytes.Buffer)} + b.SetLog(log) + + out := new(ThreadSafeBuffer) + b.SetOutput(out) + + b.Headers = []string{"head"} + b.SetData(strings.NewReader("a\nb\nc")) + + // In this log fragment, second item failed on first run so will retry: + must(t, b.LoadLogs(bytes.NewBufferString("hash,result\n45583464115695f2|e60a15c85c691ab8,true\n6258a554f446f0a7|4111d6d36a631a68,false"))) + + finished := make(chan struct{}) + go func() { + must(t, b.start(ctx)) + close(finished) + }() + + // this will skip the first item and complete the second item + b.mainChannel <- 0 + <-b.itemFinishedChannel + + // this complete the third item + b.mainChannel <- 0 + <-b.itemFinishedChannel + + // another tick and the data will reach EOF, and gracefully exit + b.mainChannel <- 0 + + // wait for the start method to finish + <-finished + + b.Exit() + + mustMatch(t, out, 1, `\n\[success\]\s*\n---------\s*\nCount\:\s+2\s`) + mustMatch(t, out, 1, `\nSkipped\:\s+1 from previous runs`) + + log.mustLen(t, 2) + log.must(t, 0, []string{"6258a554f446f0a7|4111d6d36a631a68", "true"}) + log.must(t, 1, []string{"d0e4144aef1f25ee|f44a70605aeac064", "true"}) } func TestPayloadVariants(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) - workerLog := new(LoggingWorkerLog) - outLog := new(LoggingWriter) - outLog.Write([]string{"hash", "result"}) - - b, _ := defaultOptions( - ctx, - cancel, - "a\nb", - "success", - outLog, - workerLog, - ) + b := New(ctx, cancel) + b.Rate = 0 // set rate to 0 so we can inject items synthetically + b.itemFinishedChannel = make(chan struct{}) + must(t, b.SetPayloadTemplate(map[string]interface{}{ "v1": "{{.head}}-{{.p1}}", "v2": "{{.p2}}", })) + b.PayloadVariants = []map[string]string{ {"p1": "p1v1", "p2": "p2v1"}, {"p1": "p1v2", "p2": "p2v2"}, } - must(t, b.start(ctx)) - workerLog.must(t, 0, map[string]string{"v1": "a-p1v1", "v2": "p2v1"}) - workerLog.must(t, 1, map[string]string{"v1": "a-p1v2", "v2": "p2v2"}) - workerLog.must(t, 2, map[string]string{"v1": "b-p1v1", "v2": "p2v1"}) - workerLog.must(t, 3, map[string]string{"v1": "b-p1v2", "v2": "p2v2"}) + b.Headers = []string{"head"} + b.SetData(strings.NewReader("a\nb")) -} + worker := new(LoggingWorker) + b.SetWorker(worker.NewSuccess) -func TestCancel(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - workerLog := new(LoggingWorkerLog) - outLog := new(LoggingWriter) - outLog.Write([]string{"hash", "result"}) - - b, _ := defaultOptions( - ctx, - cancel, - "a\nb\nc", - "hang", - outLog, - workerLog, - ) - b.Rate = 20 finished := make(chan struct{}) go func() { must(t, b.start(ctx)) close(finished) }() - <-time.After(time.Millisecond * 70) // rate is 20/sec, so first will fire at 50ms - b.cancel() - select { - case <-finished: - case <-time.After(time.Millisecond * 200): - t.Fatal("timeout") - } - workerLog.mustLen(t, 1) - workerLog.must(t, 0, map[string]string{"_cancelled": "true"}) + // each signal on the main channel will complete all the payload variants of an item, but + // itemFinishedChannel needs to be read once for each variant + b.mainChannel <- 0 + <-b.itemFinishedChannel + <-b.itemFinishedChannel + + b.mainChannel <- 0 + <-b.itemFinishedChannel + <-b.itemFinishedChannel - //for i, v := range workerLog.Log { - // fmt.Println(i, v) - //} + // another tick and the data will reach EOF, and gracefully exit + b.mainChannel <- 0 + + // wait for the start method to finish + <-finished + + b.Exit() + + worker.mustLen(t, 4) + worker.must(t, -1, map[string]string{"_success": "true", "v1": "a-p1v1", "v2": "p2v1"}) + worker.must(t, -1, map[string]string{"_success": "true", "v1": "a-p1v2", "v2": "p2v2"}) + worker.must(t, -1, map[string]string{"_success": "true", "v1": "b-p1v1", "v2": "p2v1"}) + worker.must(t, -1, map[string]string{"_success": "true", "v1": "b-p1v2", "v2": "p2v2"}) } @@ -185,33 +372,65 @@ func mustMatch(t *testing.T, buf *ThreadSafeBuffer, num int, pattern string) { } type LoggingWriter struct { - Log [][]string + buf *bytes.Buffer +} + +func (l *LoggingWriter) Write(p []byte) (n int, err error) { + return l.buf.Write(p) } func (l *LoggingWriter) Debug() { - for _, v := range l.Log { - fmt.Println(v) + reader := csv.NewReader(bytes.NewBuffer(l.buf.Bytes())) + for { + r, err := reader.Read() + if err != nil { + if err == io.EOF { + break + } else { + fmt.Println(err.Error()) + } + } + fmt.Println(r) } } -func (l *LoggingWriter) Write(record []string) error { - l.Log = append(l.Log, record) - return nil -} -func (l *LoggingWriter) Flush() {} -func (l *LoggingWriter) reader() *bytes.Buffer { - buf := &bytes.Buffer{} - w := csv.NewWriter(buf) - for _, v := range l.Log { - w.Write(v) +func (l *LoggingWriter) mustLen(t *testing.T, expected int) { + t.Helper() + var log [][]string + reader := csv.NewReader(bytes.NewBuffer(l.buf.Bytes())) + for { + r, err := reader.Read() + if err != nil { + if err == io.EOF { + break + } else { + fmt.Println(err.Error()) + } + } + log = append(log, r) + } + if expected != len(log) { + t.Fatalf("Log is not length %d:\n%v", expected, log) } - w.Flush() - return buf } func (l *LoggingWriter) must(t *testing.T, index int, expected []string) { t.Helper() - record := l.Log[index] + var log [][]string + reader := csv.NewReader(bytes.NewBuffer(l.buf.Bytes())) + for { + r, err := reader.Read() + if err != nil { + if err == io.EOF { + break + } else { + fmt.Println(err.Error()) + } + } + log = append(log, r) + } + + record := log[index] if len(record) == len(expected) { found := true for i, value := range record { @@ -224,71 +443,95 @@ func (l *LoggingWriter) must(t *testing.T, index int, expected []string) { return } } - t.Fatalf("Record %s not found at index %d in output log %s", expected, index, l.Log) + t.Fatalf("Record %s not found at index %d in output log %s", expected, index, log) } -type LoggingWorkerLog struct { +type LoggingWorker struct { Log []map[string]string m sync.Mutex } -func (l *LoggingWorkerLog) mustLen(t *testing.T, length int) { +func (l *LoggingWorker) Debug() { + for _, r := range l.Log { + fmt.Println(r) + } +} + +func (l *LoggingWorker) mustLen(t *testing.T, length int) { if len(l.Log) != length { t.Fatalf("Worker log is not length %d:\n%v", length, l.Log) } } -func (l *LoggingWorkerLog) must(t *testing.T, index int, expected map[string]string) { +func (l *LoggingWorker) must(t *testing.T, index int, expected map[string]string) { t.Helper() - record := l.Log[index] - if len(record) == len(expected) { - found := true - for k, value := range record { - if value != expected[k] { - found = false - break + compare := func(record map[string]string) bool { + if len(record) == len(expected) { + found := true + for k, value := range record { + if value != expected[k] { + found = false + break + } + } + if found { + return true } } - if found { + return false + } + if index > -1 { + if compare(l.Log[index]) { return } + t.Fatalf("Record %s not found at index %d in worker log %s", expected, index, l.Log) + } else { + for _, record := range l.Log { + if compare(record) { + return + } + } + t.Fatalf("Record %s not found in worker log %s", expected, l.Log) } - t.Fatalf("Record %s not found at index %d in worker log %s", expected, index, l.Log) } -func (l *LoggingWorkerLog) Append(message map[string]string) { +func (l *LoggingWorker) Append(message map[string]string) { l.m.Lock() defer l.m.Unlock() l.Log = append(l.Log, message) } -type LoggingWorker struct { +type loggingWorker struct { Result bool - Hang bool - Log *LoggingWorkerLog + Hang int + Log *LoggingWorker } -func (l *LoggingWorkerLog) NewSuccess() Worker { - return &LoggingWorker{Log: l, Result: true} +func (l *LoggingWorker) NewSuccess() Worker { + return &loggingWorker{Log: l, Result: true} } -func (l *LoggingWorkerLog) NewFail() Worker { - return &LoggingWorker{Log: l, Result: false} +func (l *LoggingWorker) NewFail() Worker { + return &loggingWorker{Log: l, Result: false} } -func (l *LoggingWorkerLog) NewHang() Worker { - return &LoggingWorker{Log: l, Result: true, Hang: true} +func (l *LoggingWorker) NewHang(duration int) func() Worker { + return func() Worker { return &loggingWorker{Log: l, Result: true, Hang: duration} } } -func (l *LoggingWorker) Send(ctx context.Context, in map[string]interface{}) (map[string]interface{}, error) { +func (l *loggingWorker) Send(ctx context.Context, in map[string]interface{}) (map[string]interface{}, error) { log := map[string]string{} - if l.Hang { + if l.Hang > 0 { select { - case <-time.After(time.Second): + case <-time.After(time.Duration(l.Hang) * time.Millisecond): log["_hung"] = "true" case <-ctx.Done(): log["_cancelled"] = "true" } + } else if l.Result { + log["_success"] = "true" + } else { + log["_success"] = "false" } for k, v := range in { log[k] = fmt.Sprint(v) diff --git a/blaster/config.go b/blaster/config.go index 139b91b..6e00b7d 100644 --- a/blaster/config.go +++ b/blaster/config.go @@ -261,9 +261,7 @@ func (b *Blaster) unmarshalConfig(c *Config) error { // Initialise configures the Blaster with config options in a provided Config func (b *Blaster) Initialise(ctx context.Context, c Config) error { - if c.Rate != 0 { - b.Rate = c.Rate - } + b.Rate = c.Rate if c.Workers != 0 { b.Workers = c.Workers } diff --git a/blaster/examples_test.go b/blaster/examples_test.go index fd9a185..46e7a13 100644 --- a/blaster/examples_test.go +++ b/blaster/examples_test.go @@ -32,9 +32,11 @@ func ExampleBlaster_Start_batchJob() { fmt.Println(err.Error()) return } - fmt.Printf("%#v", summary) + fmt.Printf("Success == 2: %v\n", summary.Success == 2) + fmt.Printf("Fail == 0: %v", summary.Fail == 0) // Output: - // blaster.Summary{Success:2, Fail:0} + // Success == 2: true + // Fail == 0: true } func ExampleBlaster_Start_loadTest() { @@ -48,7 +50,7 @@ func ExampleBlaster_Start_loadTest() { }, } }) - b.Rate = 100 + b.Rate = 1000 wg := &sync.WaitGroup{} wg.Add(1) go func() { @@ -57,12 +59,14 @@ func ExampleBlaster_Start_loadTest() { fmt.Println(err.Error()) return } - fmt.Printf("Fail: %d", summary.Fail) + fmt.Printf("Success > 10: %v\n", summary.Success > 10) + fmt.Printf("Fail == 0: %v", summary.Fail == 0) wg.Done() }() <-time.After(time.Millisecond * 100) b.Exit() wg.Wait() // Output: - // Fail: 0 + // Success > 10: true + // Fail == 0: true } diff --git a/blaster/loop-main.go b/blaster/loop-main.go index 1763f97..cb6b833 100644 --- a/blaster/loop-main.go +++ b/blaster/loop-main.go @@ -21,7 +21,10 @@ func (b *Blaster) startMainLoop(ctx context.Context) { select { case <-ctx.Done(): return - case <-b.mainChannel: + case <-b.dataFinishedChannel: + // If dataFinishedChannel is closed externally (e.g. in tests), we should return. + return + case segment := <-b.mainChannel: for { var record []string if b.dataReader != nil { @@ -76,7 +79,7 @@ func (b *Blaster) startMainLoop(ctx context.Context) { skipped = false - b.workerChannel <- workDef{data: data, hash: hash} + b.workerChannel <- workDef{data: data, hash: hash, segment: segment} } if skipped { // if we've skipped all variants, continue with the next item immediately @@ -90,6 +93,7 @@ func (b *Blaster) startMainLoop(ctx context.Context) { } type workDef struct { - data map[string]string - hash farmhash.Uint128 + segment int + data map[string]string + hash farmhash.Uint128 } diff --git a/blaster/loop-ticker.go b/blaster/loop-ticker.go index 6281f48..f62bc57 100644 --- a/blaster/loop-ticker.go +++ b/blaster/loop-ticker.go @@ -8,31 +8,69 @@ import ( func (b *Blaster) startTickerLoop(ctx context.Context) { b.mainWait.Add(1) - ticker := time.NewTicker(time.Second / time.Duration(b.Rate/float64(len(b.PayloadVariants)))) + + var ticker *time.Ticker + + updateTicker := func() { + if b.Rate == 0 { + ticker = &time.Ticker{} // empty *time.Ticker will have nil C, so block forever. + return + } + ticker = time.NewTicker(time.Second / time.Duration(b.Rate/float64(len(b.PayloadVariants)))) + } + + changeRate := func(rate float64) { + b.Rate = rate + if ticker != nil { + ticker.Stop() + } + b.metrics.addSegment(b.Rate) + updateTicker() + b.printStatus(false) + } + + updateTicker() go func() { defer b.mainWait.Done() defer b.println("Exiting ticker loop") + defer func() { + if ticker != nil { + ticker.Stop() + } + }() for { - <-ticker.C + + // First wait for a tick... but we should also wait for an exit signal, data finished + // signal or rate change command (we could be waiting forever on rate = 0). select { + case <-ticker.C: + // continue case <-ctx.Done(): - ticker.Stop() return case <-b.dataFinishedChannel: - ticker.Stop() return case rate := <-b.changeRateChannel: - b.Rate = rate - ticker.Stop() - ticker = time.NewTicker(time.Second / time.Duration(b.Rate/float64(len(b.PayloadVariants)))) - b.metrics.addSegment(b.Rate) - b.printStatus(false) - case b.mainChannel <- struct{}{}: + // Restart the for loop after a rate change. If rate == 0, we may not want to send + // any more. + changeRate(rate) + continue + } + + segment := b.metrics.currentSegment() + + // Next send on the main channel. The channel won't have a listener if there is no idle + // worker. In this case we should continue and log a miss. + select { + case b.mainChannel <- segment: // if main loop is waiting, send it a message + case <-ctx.Done(): + return + case <-b.dataFinishedChannel: + return default: // if main loop is busy, skip this tick - b.metrics.logMiss() + b.metrics.logMiss(segment) } } }() diff --git a/blaster/loop-worker.go b/blaster/loop-worker.go index 583bcf5..a2ff7cb 100644 --- a/blaster/loop-worker.go +++ b/blaster/loop-worker.go @@ -65,6 +65,10 @@ func (b *Blaster) startWorkers(ctx context.Context) { b.error(err) return } + if b.itemFinishedChannel != nil { + // only used in tests + b.itemFinishedChannel <- struct{}{} + } } } }(i) @@ -73,10 +77,8 @@ func (b *Blaster) startWorkers(ctx context.Context) { func (b *Blaster) send(ctx context.Context, w Worker, work workDef) error { - currentSegment := b.metrics.currentSegment() - b.metrics.logStart(currentSegment) - - b.metrics.logBusy(currentSegment) + b.metrics.logStart(work.segment) + b.metrics.logBusy(work.segment) b.metrics.busy.Inc(1) defer b.metrics.busy.Dec(1) @@ -141,7 +143,7 @@ func (b *Blaster) send(ctx context.Context, w Worker, work workDef) error { if val == "" { val = "(none)" } - b.metrics.logFinish(currentSegment, val, time.Since(start), success) + b.metrics.logFinish(work.segment, val, time.Since(start), success) if b.logWriter != nil { var fields []string diff --git a/blaster/metrics.go b/blaster/metrics.go index 69e70c4..0cbbe6d 100644 --- a/blaster/metrics.go +++ b/blaster/metrics.go @@ -39,11 +39,11 @@ func newMetricsDef(b *Blaster) *metricsDef { return m } -func (m *metricsDef) logMiss() { +func (m *metricsDef) logMiss(segment int) { m.sync.RLock() defer m.sync.RUnlock() m.all.missed.Inc(1) - m.segments[m.current].missed.Inc(1) + m.segments[segment].missed.Inc(1) } func (m *metricsDef) logBusy(segment int) {