Skip to content

Commit

Permalink
test(benchmark): support log stream appender
Browse files Browse the repository at this point in the history
This PR adds the `--pipeline-size` flag to the benchmark tool. It makes the benchmark test client
use LogStreamAppender, which allows pipelined append requests.

- No pipelined requests if the flag is zero (default value).
- Use pipelined requests by using LogStreamAppender if the flag is greater than zero.

The benchmark tool cannot support per-target level pipelined yet. Thus, if the `--pipeline-size` is
greater than zero, all the log stream-specific targets will use LogStreamAppender.
  • Loading branch information
ijsong committed Jun 4, 2023
1 parent f8a7676 commit 53b2258
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 36 deletions.
7 changes: 7 additions & 0 deletions cmd/benchmark/test_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ var (
Name: "print-json",
Usage: "Print json output if it is set",
}
flagPipelineSize = &cli.IntFlag{
Name: "pipeline-size",
Usage: "Pipeline size, no pipelined requests if zero. Not support per-target pipeline size yet.",
Value: 0,
}
)

func newCommandTest() *cli.Command {
Expand All @@ -77,6 +82,7 @@ func newCommandTest() *cli.Command {
flagDuration,
flagReportInterval,
flagPrintJSON,
flagPipelineSize,
},
Action: runCommandTest,
}
Expand Down Expand Up @@ -109,6 +115,7 @@ func runCommandTest(c *cli.Context) error {
return fmt.Errorf("malformed target %s: invalid log stream %s", str, toks[1])
}
}
target.PipelineSize = c.Int(flagPipelineSize.Name)
targets[idx] = target
}

Expand Down
132 changes: 96 additions & 36 deletions internal/benchmark/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,50 +120,110 @@ func (loader *Loader) Close() error {
return err
}

func (loader *Loader) appendLoop(ctx context.Context, c varlog.Log) error {
func (loader *Loader) makeAppendFunc(ctx context.Context, c varlog.Log, am *AppendMetrics) (appendFunc func(), closeFunc func()) {
begin := true
var am AppendMetrics
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
notifyBegin := func(meta varlogpb.LogEntryMeta) {
loader.begin.ch <- varlogpb.LogSequenceNumber{
LLSN: meta.LLSN,
GLSN: meta.GLSN,
}
begin = false
}

ts := time.Now()
var res varlog.AppendResult
if loader.LogStreamID.Invalid() {
res = c.Append(ctx, loader.TopicID, loader.batch)
} else {
res = c.AppendTo(ctx, loader.TopicID, loader.LogStreamID, loader.batch)
}
dur := time.Since(ts)
if res.Err != nil {
return fmt.Errorf("append: %w", res.Err)
}
if begin {
loader.begin.ch <- varlogpb.LogSequenceNumber{
LLSN: res.Metadata[0].LLSN,
GLSN: res.Metadata[0].GLSN,
}
begin = false
}
cnt := len(res.Metadata)
debugLog := func(meta []varlogpb.LogEntryMeta) {
cnt := len(meta)
loader.logger.Debug("append",
slog.Int("count", cnt),
slog.Any("logstream", meta[0].LogStreamID),
slog.Any("firstGLSN", meta[0].GLSN),
slog.Any("lastGLSN", meta[cnt-1].GLSN),
slog.Any("firstLLSN", meta[0].LLSN),
slog.Any("lastLLSN", meta[cnt-1].LLSN),
)
}

recordMetrics := func(dur time.Duration) {
am.bytes += int64(loader.BatchSize * loader.MessageSize)
am.requests++
am.durationMS += dur.Milliseconds()
if loader.metrics.ReportAppendMetrics(am) {
am = AppendMetrics{}
if loader.metrics.ReportAppendMetrics(*am) {
*am = AppendMetrics{}
}
}

loader.logger.Debug("append",
slog.Int("count", cnt),
slog.Any("logstream", res.Metadata[0].LogStreamID),
slog.Any("firstGLSN", res.Metadata[0].GLSN),
slog.Any("lastGLSN", res.Metadata[cnt-1].GLSN),
slog.Any("firstLLSN", res.Metadata[0].LLSN),
slog.Any("lastLLSN", res.Metadata[cnt-1].LLSN),
)
tpid, lsid := loader.TopicID, loader.LogStreamID

closeFunc = func() {}
if lsid.Invalid() {
return func() {
ts := time.Now()
res := c.Append(ctx, loader.TopicID, loader.batch)
if res.Err != nil {
panic(res.Err)
}
dur := time.Since(ts)
recordMetrics(dur)
if begin {
notifyBegin(res.Metadata[0])
}
debugLog(res.Metadata)
}, closeFunc
}

if loader.PipelineSize == 0 {
return func() {
ts := time.Now()
res := c.AppendTo(ctx, tpid, lsid, loader.batch)
if res.Err != nil {
panic(res.Err)
}
dur := time.Since(ts)
recordMetrics(dur)
if begin {
notifyBegin(res.Metadata[0])
}
debugLog(res.Metadata)
}, closeFunc
}

lsa, err := c.NewLogStreamAppender(loader.TopicID, loader.LogStreamID,
varlog.WithPipelineSize(loader.PipelineSize),
)
if err != nil {
panic(err)
}
closeFunc = lsa.Close
return func() {
ts := time.Now()
err := lsa.AppendBatch(loader.batch, func(lem []varlogpb.LogEntryMeta, err error) {
if err != nil {
panic(err)
}
dur := time.Since(ts)
recordMetrics(dur)
if begin {
notifyBegin(lem[0])
}
debugLog(lem)
})
if err != nil {
panic(err)
}
}, closeFunc
}

func (loader *Loader) appendLoop(ctx context.Context, c varlog.Log) error {
var am AppendMetrics
appendFunc, closeFunc := loader.makeAppendFunc(ctx, c, &am)
defer closeFunc()

for {
select {
case <-ctx.Done():
return ctx.Err()
default:
appendFunc()
}
}
}

Expand Down
1 change: 1 addition & 0 deletions internal/benchmark/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type Target struct {
BatchSize uint
AppendersCount uint
SubscribersCount uint
PipelineSize int
}

func (tgt Target) Valid() error {
Expand Down

0 comments on commit 53b2258

Please sign in to comment.