diff --git a/cmd/benchmark/test_command.go b/cmd/benchmark/test_command.go index 1b341b643..32beac2c3 100644 --- a/cmd/benchmark/test_command.go +++ b/cmd/benchmark/test_command.go @@ -60,6 +60,11 @@ var ( Name: "print-json", Usage: "Print json output if it is set", } + flagPipelineSize = &cli.IntFlag{ + Name: "pipeline-size", + Usage: "Pipeline size, no pipelined requests if zero. Not support per-target pipeline size yet.", + Value: 0, + } ) func newCommandTest() *cli.Command { @@ -77,6 +82,7 @@ func newCommandTest() *cli.Command { flagDuration, flagReportInterval, flagPrintJSON, + flagPipelineSize, }, Action: runCommandTest, } @@ -109,6 +115,7 @@ func runCommandTest(c *cli.Context) error { return fmt.Errorf("malformed target %s: invalid log stream %s", str, toks[1]) } } + target.PipelineSize = c.Int(flagPipelineSize.Name) targets[idx] = target } diff --git a/internal/benchmark/loader.go b/internal/benchmark/loader.go index b086a06ec..805242903 100644 --- a/internal/benchmark/loader.go +++ b/internal/benchmark/loader.go @@ -120,50 +120,110 @@ func (loader *Loader) Close() error { return err } -func (loader *Loader) appendLoop(ctx context.Context, c varlog.Log) error { +func (loader *Loader) makeAppendFunc(ctx context.Context, c varlog.Log, am *AppendMetrics) (appendFunc func(), closeFunc func()) { begin := true - var am AppendMetrics - for { - select { - case <-ctx.Done(): - return ctx.Err() - default: + notifyBegin := func(meta varlogpb.LogEntryMeta) { + loader.begin.ch <- varlogpb.LogSequenceNumber{ + LLSN: meta.LLSN, + GLSN: meta.GLSN, } + begin = false + } - ts := time.Now() - var res varlog.AppendResult - if loader.LogStreamID.Invalid() { - res = c.Append(ctx, loader.TopicID, loader.batch) - } else { - res = c.AppendTo(ctx, loader.TopicID, loader.LogStreamID, loader.batch) - } - dur := time.Since(ts) - if res.Err != nil { - return fmt.Errorf("append: %w", res.Err) - } - if begin { - loader.begin.ch <- varlogpb.LogSequenceNumber{ - LLSN: res.Metadata[0].LLSN, - GLSN: res.Metadata[0].GLSN, - } - begin = false - } - cnt := len(res.Metadata) + debugLog := func(meta []varlogpb.LogEntryMeta) { + cnt := len(meta) + loader.logger.Debug("append", + slog.Int("count", cnt), + slog.Any("logstream", meta[0].LogStreamID), + slog.Any("firstGLSN", meta[0].GLSN), + slog.Any("lastGLSN", meta[cnt-1].GLSN), + slog.Any("firstLLSN", meta[0].LLSN), + slog.Any("lastLLSN", meta[cnt-1].LLSN), + ) + } + + recordMetrics := func(dur time.Duration) { am.bytes += int64(loader.BatchSize * loader.MessageSize) am.requests++ am.durationMS += dur.Milliseconds() - if loader.metrics.ReportAppendMetrics(am) { - am = AppendMetrics{} + if loader.metrics.ReportAppendMetrics(*am) { + *am = AppendMetrics{} } + } - loader.logger.Debug("append", - slog.Int("count", cnt), - slog.Any("logstream", res.Metadata[0].LogStreamID), - slog.Any("firstGLSN", res.Metadata[0].GLSN), - slog.Any("lastGLSN", res.Metadata[cnt-1].GLSN), - slog.Any("firstLLSN", res.Metadata[0].LLSN), - slog.Any("lastLLSN", res.Metadata[cnt-1].LLSN), - ) + tpid, lsid := loader.TopicID, loader.LogStreamID + + closeFunc = func() {} + if lsid.Invalid() { + return func() { + ts := time.Now() + res := c.Append(ctx, loader.TopicID, loader.batch) + if res.Err != nil { + panic(res.Err) + } + dur := time.Since(ts) + recordMetrics(dur) + if begin { + notifyBegin(res.Metadata[0]) + } + debugLog(res.Metadata) + }, closeFunc + } + + if loader.PipelineSize == 0 { + return func() { + ts := time.Now() + res := c.AppendTo(ctx, tpid, lsid, loader.batch) + if res.Err != nil { + panic(res.Err) + } + dur := time.Since(ts) + recordMetrics(dur) + if begin { + notifyBegin(res.Metadata[0]) + } + debugLog(res.Metadata) + }, closeFunc + } + + lsa, err := c.NewLogStreamAppender(loader.TopicID, loader.LogStreamID, + varlog.WithPipelineSize(loader.PipelineSize), + ) + if err != nil { + panic(err) + } + closeFunc = lsa.Close + return func() { + ts := time.Now() + err := lsa.AppendBatch(loader.batch, func(lem []varlogpb.LogEntryMeta, err error) { + if err != nil { + panic(err) + } + dur := time.Since(ts) + recordMetrics(dur) + if begin { + notifyBegin(lem[0]) + } + debugLog(lem) + }) + if err != nil { + panic(err) + } + }, closeFunc +} + +func (loader *Loader) appendLoop(ctx context.Context, c varlog.Log) error { + var am AppendMetrics + appendFunc, closeFunc := loader.makeAppendFunc(ctx, c, &am) + defer closeFunc() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + appendFunc() + } } } diff --git a/internal/benchmark/target.go b/internal/benchmark/target.go index 8c92990f0..ec2f516f9 100644 --- a/internal/benchmark/target.go +++ b/internal/benchmark/target.go @@ -14,6 +14,7 @@ type Target struct { BatchSize uint AppendersCount uint SubscribersCount uint + PipelineSize int } func (tgt Target) Valid() error {