Skip to content

Commit

Permalink
Make publishers publish on subjects with an offset when using --multi…
Browse files Browse the repository at this point in the history
…subject mode (#547)
  • Loading branch information
jnmoyne authored Jul 30, 2022
1 parent 06cd739 commit 7b15a8d
Showing 1 changed file with 8 additions and 8 deletions.
16 changes: 8 additions & 8 deletions cli/bench_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ func getPublishSubject(c *benchCmd, number int) string {
}
}

func coreNATSPublisher(c benchCmd, nc *nats.Conn, progress *uiprogress.Bar, msg []byte, numMsg int) {
func coreNATSPublisher(c benchCmd, nc *nats.Conn, progress *uiprogress.Bar, msg []byte, numMsg int, offset int) {

var m *nats.Msg
var err error
Expand All @@ -467,12 +467,12 @@ func coreNATSPublisher(c benchCmd, nc *nats.Conn, progress *uiprogress.Bar, msg
}

if !c.request {
err = nc.Publish(getPublishSubject(&c, i), msg)
err = nc.Publish(getPublishSubject(&c, i+offset), msg)
if err != nil {
log.Fatalf("Publish error: %v", err)
}
} else {
m, err = nc.Request(getPublishSubject(&c, i), msg, time.Second)
m, err = nc.Request(getPublishSubject(&c, i+offset), msg, time.Second)
if err != nil {
log.Fatalf("Request error %v", err)
}
Expand All @@ -486,7 +486,7 @@ func coreNATSPublisher(c benchCmd, nc *nats.Conn, progress *uiprogress.Bar, msg
state = "Finished "
}

func jsPublisher(c benchCmd, nc *nats.Conn, progress *uiprogress.Bar, msg []byte, numMsg int) {
func jsPublisher(c benchCmd, nc *nats.Conn, progress *uiprogress.Bar, msg []byte, numMsg int, offset int) {
js, err := nc.JetStream()
if err != nil {
log.Fatalf("Couldn't get the JetStream context: %v", err)
Expand All @@ -505,7 +505,7 @@ func jsPublisher(c benchCmd, nc *nats.Conn, progress *uiprogress.Bar, msg []byte
state = "Publishing"
futures := make([]nats.PubAckFuture, min(c.pubBatch, c.numMsg-i))
for j := 0; j < c.pubBatch && i+j < c.numMsg; j++ {
futures[j], err = js.PublishAsync(getPublishSubject(&c, i+j), msg)
futures[j], err = js.PublishAsync(getPublishSubject(&c, i+j+offset), msg)
if err != nil {
log.Fatalf("PubAsync error: %v", err)
}
Expand Down Expand Up @@ -538,7 +538,7 @@ func jsPublisher(c benchCmd, nc *nats.Conn, progress *uiprogress.Bar, msg []byte
if progress != nil {
progress.Incr()
}
_, err = js.Publish(getPublishSubject(&c, i), msg)
_, err = js.Publish(getPublishSubject(&c, i+offset), msg)
if err != nil {
log.Fatalf("Publish: %v", err)
}
Expand Down Expand Up @@ -604,11 +604,11 @@ func (c *benchCmd) runPublisher(bm *bench.Benchmark, nc *nats.Conn, startwg *syn
start := time.Now()

if !c.js && !c.kv {
coreNATSPublisher(*c, nc, progress, msg, numMsg)
coreNATSPublisher(*c, nc, progress, msg, numMsg, offset)
} else if c.kv {
kvPutter(*c, nc, progress, msg, numMsg, offset)
} else if c.js {
jsPublisher(*c, nc, progress, msg, numMsg)
jsPublisher(*c, nc, progress, msg, numMsg, offset)
}

err := nc.Flush()
Expand Down

0 comments on commit 7b15a8d

Please sign in to comment.