Skip to content

Commit

Permalink
feat(benchmark): add --print-json to print benchmark result as JSON
Browse files Browse the repository at this point in the history
This patch adds a new flag - `--print-json` to the benchmark tool's test command. This flag lets the
benchmark print to its output as JSON.

Updates #257
  • Loading branch information
ijsong committed Dec 12, 2022
1 parent 8c91902 commit abf8a5a
Show file tree
Hide file tree
Showing 7 changed files with 226 additions and 87 deletions.
13 changes: 13 additions & 0 deletions cmd/benchmark/test_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ var (
Name: "report-interval",
Value: benchmark.DefaultReportInterval,
}
flagPrintJSON = &cli.BoolFlag{
Name: "print-json",
Usage: "Print json output if it is set",
}
)

func newCommandTest() *cli.Command {
Expand All @@ -72,6 +76,7 @@ func newCommandTest() *cli.Command {
flagSubscribers,
flagDuration,
flagReportInterval,
flagPrintJSON,
},
Action: runCommandTest,
}
Expand Down Expand Up @@ -126,12 +131,20 @@ func runCommandTest(c *cli.Context) error {

reportInterval := c.Duration(flagReportInterval.Name)

var enc benchmark.ReportEncoder
if c.Bool(flagPrintJSON.Name) {
enc = benchmark.JsonEncoder{}
} else {
enc = benchmark.StringEncoder{}
}

bm, err := benchmark.New(
benchmark.WithClusterID(clusterID),
benchmark.WithTargets(targets...),
benchmark.WithMetadataRepository(c.StringSlice(flagMRAddrs.Name)),
benchmark.WithDuration(duration),
benchmark.WithReportInterval(reportInterval),
benchmark.WithReportEncoder(enc),
)
if err != nil {
return err
Expand Down
5 changes: 2 additions & 3 deletions internal/benchmark/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (bm *Benchmark) Run() error {
defer func() {
cancel()
wg.Done()
fmt.Println(bm.metrics.String())
fmt.Println(MustEncode(bm.reportEncoder, bm.metrics.Flush()))
}()
for {
select {
Expand All @@ -98,8 +98,7 @@ func (bm *Benchmark) Run() error {
slog.Debug("loader failed")
return
case <-reportTick.C:
s := bm.metrics.String()
fmt.Println(s)
fmt.Println(MustEncode(bm.reportEncoder, bm.metrics.Flush()))
}
}

Expand Down
7 changes: 7 additions & 0 deletions internal/benchmark/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type config struct {
mraddrs []string
duration time.Duration
reportInterval time.Duration
reportEncoder ReportEncoder
}

func newConfig(opts []Option) (config, error) {
Expand Down Expand Up @@ -99,3 +100,9 @@ func WithReportInterval(reportInterval time.Duration) Option {
cfg.reportInterval = reportInterval
})
}

func WithReportEncoder(enc ReportEncoder) Option {
return newFuncOption(func(cfg *config) {
cfg.reportEncoder = enc
})
}
76 changes: 13 additions & 63 deletions internal/benchmark/metrics.go
Original file line number Diff line number Diff line change
@@ -1,77 +1,22 @@
package benchmark

import (
"fmt"
"strings"
"sync"
"time"

"github.com/kakao/varlog/pkg/util/units"
)

type Metrics struct {
loaderMetrics []*LoaderMetrics
}

func (m Metrics) String() string {
// arps: appended requests per second
// abps: appended megabytes per second
// adur: mean/min/max append duration in milliseconds
// slps: subscribed logs per second
// sbps: subscribed megabytes per second
// eelat: end-to-end latency in milliseconds
var sb strings.Builder
fmt.Fprintf(&sb, "___tgt") // 6 spaces
fmt.Fprintf(&sb, "__arpsR") // 7 spaces
fmt.Fprintf(&sb, "__arpsT") // 7 spaces

fmt.Fprintf(&sb, "_______abpsR") // 12 spaces
fmt.Fprintf(&sb, "_______abpsT") // 12 spaces

fmt.Fprintf(&sb, "__adurR") // 7 spaces
fmt.Fprintf(&sb, "__adurT") // 7 spaces

fmt.Fprintf(&sb, "__slpsR") // 7 spaces
fmt.Fprintf(&sb, "__slpsT") // 7 spaces

fmt.Fprintf(&sb, "_______sbpsR") // 12 spaces
fmt.Fprintf(&sb, "_______sbpsT") // 12 spaces

fmt.Fprintf(&sb, "__eelatR") // 8 spaces
fmt.Fprintf(&sb, "__eelatT\n") // 8 spaces
func (m Metrics) Flush() TargetReports {
trs := TargetReports{
Reports: make([]TargetReport, len(m.loaderMetrics)),
}
for idx, lm := range m.loaderMetrics {
recent, total := lm.Flush()
fmt.Fprintf(&sb, "%6s", lm.tgt)

// arps
fmt.Fprintf(&sb, "%7.1f", recent.AppendReport.RequestsPerSecond)
fmt.Fprintf(&sb, "%7.1f", total.AppendReport.RequestsPerSecond)

// abps
fmt.Fprintf(&sb, "%10s/s", units.ToByteSizeString(recent.AppendReport.BytesPerSecond))
fmt.Fprintf(&sb, "%10s/s", units.ToByteSizeString(total.AppendReport.BytesPerSecond))

// adur
fmt.Fprintf(&sb, "%7.1f", recent.AppendReport.Duration)
fmt.Fprintf(&sb, "%7.1f", total.AppendReport.Duration)

// slps
fmt.Fprintf(&sb, "%7.1f", recent.SubscribeReport.LogsPerSecond)
fmt.Fprintf(&sb, "%7.1f", total.SubscribeReport.LogsPerSecond)

// sbps
fmt.Fprintf(&sb, "%10s/s", units.ToByteSizeString(recent.SubscribeReport.BytesPerSecond))
fmt.Fprintf(&sb, "%10s/s", units.ToByteSizeString(total.SubscribeReport.BytesPerSecond))

// eelat
fmt.Fprintf(&sb, "%8.1f", recent.EndToEndReport.Latency)
fmt.Fprintf(&sb, "%8.1f", total.EndToEndReport.Latency)

if idx < len(m.loaderMetrics)-1 {
fmt.Fprint(&sb, "\n")
}
trs.Reports[idx] = lm.Flush()
}
return sb.String()
return trs
}

type LoaderMetrics struct {
Expand Down Expand Up @@ -138,11 +83,12 @@ func (lm *LoaderMetrics) ReportSubscribeMetrics(m SubscribeMetrics) bool {
return true
}

func (lm *LoaderMetrics) Flush() (recent, total Report) {
func (lm *LoaderMetrics) Flush() TargetReport {
lm.mu.Lock()
defer lm.mu.Unlock()

now := time.Now()
var recent, total Report

total.AppendReport = NewAppendReportFromMetrics(lm.appendMetrics.total, now.Sub(lm.initTime))
total.SubscribeReport = NewSubscribeReportFromMetrics(lm.subscribeMetrics.total, now.Sub(lm.initTime))
Expand All @@ -154,5 +100,9 @@ func (lm *LoaderMetrics) Flush() (recent, total Report) {
lm.subscribeMetrics.recent = SubscribeMetrics{}
lm.lastTime = now

return recent, total
return TargetReport{
Target: lm.tgt.String(),
Recent: recent,
Total: total,
}
}
117 changes: 99 additions & 18 deletions internal/benchmark/report.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,14 @@
package benchmark

import (
"encoding/json"
"time"
)

type AppendReport struct {
RequestsPerSecond float64
BytesPerSecond float64
Duration float64
}

type SubscribeReport struct {
LogsPerSecond float64
BytesPerSecond float64
}

type EndToEndReport struct {
Latency float64
}

type Report struct {
AppendReport
SubscribeReport
EndToEndReport
RequestsPerSecond float64 `json:"requestsPerSecond"`
BytesPerSecond float64 `json:"bytesPerSecond"`
Duration float64 `json:"duration"`
}

func NewAppendReportFromMetrics(metrics AppendMetrics, interval time.Duration) AppendReport {
Expand All @@ -33,9 +19,104 @@ func NewAppendReportFromMetrics(metrics AppendMetrics, interval time.Duration) A
}
}

type SubscribeReport struct {
LogsPerSecond float64 `json:"logsPerSecond"`
BytesPerSecond float64 `json:"bytesPerSecond"`
}

func NewSubscribeReportFromMetrics(metrics SubscribeMetrics, interval time.Duration) SubscribeReport {
return SubscribeReport{
LogsPerSecond: float64(metrics.logs) / interval.Seconds(),
BytesPerSecond: float64(metrics.bytes) / interval.Seconds(),
}
}

type EndToEndReport struct {
Latency float64 `json:"latency"`
}

type Report struct {
AppendReport `json:"append"`
SubscribeReport `json:"subscribe"`
EndToEndReport `json:"endToEnd"`
}

type TargetReport struct {
Target string `json:"target"`
Recent Report `json:"recent"`
Total Report `json:"total"`
}

type TargetReports struct {
Reports []TargetReport
}

func (trs TargetReports) String() string {
enc := StringEncoder{}
buf, err := enc.Encode(trs)
if err != nil {
panic(err)
}
return string(buf)
//// arps: appended requests per second
//// abps: appended megabytes per second
//// adur: mean/min/max append duration in milliseconds
//// slps: subscribed logs per second
//// sbps: subscribed megabytes per second
//// eelat: end-to-end latency in milliseconds
//var sb strings.Builder
//fmt.Fprintf(&sb, "___tgt") // 6 spaces
//fmt.Fprintf(&sb, "__arpsR") // 7 spaces
//fmt.Fprintf(&sb, "__arpsT") // 7 spaces
//
//fmt.Fprintf(&sb, "_______abpsR") // 12 spaces
//fmt.Fprintf(&sb, "_______abpsT") // 12 spaces
//
//fmt.Fprintf(&sb, "__adurR") // 7 spaces
//fmt.Fprintf(&sb, "__adurT") // 7 spaces
//
//fmt.Fprintf(&sb, "__slpsR") // 7 spaces
//fmt.Fprintf(&sb, "__slpsT") // 7 spaces
//
//fmt.Fprintf(&sb, "_______sbpsR") // 12 spaces
//fmt.Fprintf(&sb, "_______sbpsT") // 12 spaces
//
//fmt.Fprintf(&sb, "__eelatR") // 8 spaces
//fmt.Fprintf(&sb, "__eelatT\n") // 8 spaces
//for idx, rpt := range trs.Reports {
// fmt.Fprintf(&sb, "%6s", rpt.Target)
//
// // arps
// fmt.Fprintf(&sb, "%7.1f", rpt.Recent.AppendReport.RequestsPerSecond)
// fmt.Fprintf(&sb, "%7.1f", rpt.Total.AppendReport.RequestsPerSecond)
//
// // abps
// fmt.Fprintf(&sb, "%10s/s", units.ToByteSizeString(rpt.Recent.AppendReport.BytesPerSecond))
// fmt.Fprintf(&sb, "%10s/s", units.ToByteSizeString(rpt.Total.AppendReport.BytesPerSecond))
//
// // adur
// fmt.Fprintf(&sb, "%7.1f", rpt.Recent.AppendReport.Duration)
// fmt.Fprintf(&sb, "%7.1f", rpt.Total.AppendReport.Duration)
//
// // slps
// fmt.Fprintf(&sb, "%7.1f", rpt.Recent.SubscribeReport.LogsPerSecond)
// fmt.Fprintf(&sb, "%7.1f", rpt.Total.SubscribeReport.LogsPerSecond)
//
// // sbps
// fmt.Fprintf(&sb, "%10s/s", units.ToByteSizeString(rpt.Recent.SubscribeReport.BytesPerSecond))
// fmt.Fprintf(&sb, "%10s/s", units.ToByteSizeString(rpt.Total.SubscribeReport.BytesPerSecond))
//
// // eelat
// fmt.Fprintf(&sb, "%8.1f", rpt.Recent.EndToEndReport.Latency)
// fmt.Fprintf(&sb, "%8.1f", rpt.Total.EndToEndReport.Latency)
//
// if idx < len(trs.Reports)-1 {
// fmt.Fprint(&sb, "\n")
// }
//}
//return sb.String()
}

func (trs TargetReports) JSON() ([]byte, error) {
return json.Marshal(trs)
}
Loading

0 comments on commit abf8a5a

Please sign in to comment.