diff --git a/cli/bench_command.go b/cli/bench_command.go index f293146a..7dd9f6a8 100644 --- a/cli/bench_command.go +++ b/cli/bench_command.go @@ -445,7 +445,7 @@ func getPublishSubject(c *benchCmd, number int) string { } } -func coreNATSPublisher(c benchCmd, nc *nats.Conn, progress *uiprogress.Bar, msg []byte, numMsg int) { +func coreNATSPublisher(c benchCmd, nc *nats.Conn, progress *uiprogress.Bar, msg []byte, numMsg int, offset int) { var m *nats.Msg var err error @@ -467,12 +467,12 @@ func coreNATSPublisher(c benchCmd, nc *nats.Conn, progress *uiprogress.Bar, msg } if !c.request { - err = nc.Publish(getPublishSubject(&c, i), msg) + err = nc.Publish(getPublishSubject(&c, i+offset), msg) if err != nil { log.Fatalf("Publish error: %v", err) } } else { - m, err = nc.Request(getPublishSubject(&c, i), msg, time.Second) + m, err = nc.Request(getPublishSubject(&c, i+offset), msg, time.Second) if err != nil { log.Fatalf("Request error %v", err) } @@ -486,7 +486,7 @@ func coreNATSPublisher(c benchCmd, nc *nats.Conn, progress *uiprogress.Bar, msg state = "Finished " } -func jsPublisher(c benchCmd, nc *nats.Conn, progress *uiprogress.Bar, msg []byte, numMsg int) { +func jsPublisher(c benchCmd, nc *nats.Conn, progress *uiprogress.Bar, msg []byte, numMsg int, offset int) { js, err := nc.JetStream() if err != nil { log.Fatalf("Couldn't get the JetStream context: %v", err) @@ -505,7 +505,7 @@ func jsPublisher(c benchCmd, nc *nats.Conn, progress *uiprogress.Bar, msg []byte state = "Publishing" futures := make([]nats.PubAckFuture, min(c.pubBatch, c.numMsg-i)) for j := 0; j < c.pubBatch && i+j < c.numMsg; j++ { - futures[j], err = js.PublishAsync(getPublishSubject(&c, i+j), msg) + futures[j], err = js.PublishAsync(getPublishSubject(&c, i+j+offset), msg) if err != nil { log.Fatalf("PubAsync error: %v", err) } @@ -538,7 +538,7 @@ func jsPublisher(c benchCmd, nc *nats.Conn, progress *uiprogress.Bar, msg []byte if progress != nil { progress.Incr() } - _, err = js.Publish(getPublishSubject(&c, i), msg) + _, err = js.Publish(getPublishSubject(&c, i+offset), msg) if err != nil { log.Fatalf("Publish: %v", err) } @@ -604,11 +604,11 @@ func (c *benchCmd) runPublisher(bm *bench.Benchmark, nc *nats.Conn, startwg *syn start := time.Now() if !c.js && !c.kv { - coreNATSPublisher(*c, nc, progress, msg, numMsg) + coreNATSPublisher(*c, nc, progress, msg, numMsg, offset) } else if c.kv { kvPutter(*c, nc, progress, msg, numMsg, offset) } else if c.js { - jsPublisher(*c, nc, progress, msg, numMsg) + jsPublisher(*c, nc, progress, msg, numMsg, offset) } err := nc.Flush()