From abf8a5af53b7ca5f916dc2b23f453279ea9ed443 Mon Sep 17 00:00:00 2001 From: Injun Song Date: Mon, 12 Dec 2022 21:01:59 +0900 Subject: [PATCH] feat(benchmark): add `--print-json` to print benchmark result as JSON This patch adds a new flag - `--print-json` to the benchmark tool's test command. This flag lets the benchmark print to its output as JSON. Updates #257 --- cmd/benchmark/test_command.go | 13 ++++ internal/benchmark/benchmark.go | 5 +- internal/benchmark/config.go | 7 ++ internal/benchmark/metrics.go | 76 ++++--------------- internal/benchmark/report.go | 117 ++++++++++++++++++++++++----- internal/benchmark/report_codec.go | 89 ++++++++++++++++++++++ internal/benchmark/target.go | 6 +- 7 files changed, 226 insertions(+), 87 deletions(-) create mode 100644 internal/benchmark/report_codec.go diff --git a/cmd/benchmark/test_command.go b/cmd/benchmark/test_command.go index 2286ff7b2..1b341b643 100644 --- a/cmd/benchmark/test_command.go +++ b/cmd/benchmark/test_command.go @@ -56,6 +56,10 @@ var ( Name: "report-interval", Value: benchmark.DefaultReportInterval, } + flagPrintJSON = &cli.BoolFlag{ + Name: "print-json", + Usage: "Print json output if it is set", + } ) func newCommandTest() *cli.Command { @@ -72,6 +76,7 @@ func newCommandTest() *cli.Command { flagSubscribers, flagDuration, flagReportInterval, + flagPrintJSON, }, Action: runCommandTest, } @@ -126,12 +131,20 @@ func runCommandTest(c *cli.Context) error { reportInterval := c.Duration(flagReportInterval.Name) + var enc benchmark.ReportEncoder + if c.Bool(flagPrintJSON.Name) { + enc = benchmark.JsonEncoder{} + } else { + enc = benchmark.StringEncoder{} + } + bm, err := benchmark.New( benchmark.WithClusterID(clusterID), benchmark.WithTargets(targets...), benchmark.WithMetadataRepository(c.StringSlice(flagMRAddrs.Name)), benchmark.WithDuration(duration), benchmark.WithReportInterval(reportInterval), + benchmark.WithReportEncoder(enc), ) if err != nil { return err diff --git a/internal/benchmark/benchmark.go b/internal/benchmark/benchmark.go index 44755e8e6..e74794566 100644 --- a/internal/benchmark/benchmark.go +++ b/internal/benchmark/benchmark.go @@ -82,7 +82,7 @@ func (bm *Benchmark) Run() error { defer func() { cancel() wg.Done() - fmt.Println(bm.metrics.String()) + fmt.Println(MustEncode(bm.reportEncoder, bm.metrics.Flush())) }() for { select { @@ -98,8 +98,7 @@ func (bm *Benchmark) Run() error { slog.Debug("loader failed") return case <-reportTick.C: - s := bm.metrics.String() - fmt.Println(s) + fmt.Println(MustEncode(bm.reportEncoder, bm.metrics.Flush())) } } diff --git a/internal/benchmark/config.go b/internal/benchmark/config.go index be6922c11..7c1d2ec82 100644 --- a/internal/benchmark/config.go +++ b/internal/benchmark/config.go @@ -22,6 +22,7 @@ type config struct { mraddrs []string duration time.Duration reportInterval time.Duration + reportEncoder ReportEncoder } func newConfig(opts []Option) (config, error) { @@ -99,3 +100,9 @@ func WithReportInterval(reportInterval time.Duration) Option { cfg.reportInterval = reportInterval }) } + +func WithReportEncoder(enc ReportEncoder) Option { + return newFuncOption(func(cfg *config) { + cfg.reportEncoder = enc + }) +} diff --git a/internal/benchmark/metrics.go b/internal/benchmark/metrics.go index 6a4eda54a..0ca023778 100644 --- a/internal/benchmark/metrics.go +++ b/internal/benchmark/metrics.go @@ -1,77 +1,22 @@ package benchmark import ( - "fmt" - "strings" "sync" "time" - - "github.com/kakao/varlog/pkg/util/units" ) type Metrics struct { loaderMetrics []*LoaderMetrics } -func (m Metrics) String() string { - // arps: appended requests per second - // abps: appended megabytes per second - // adur: mean/min/max append duration in milliseconds - // slps: subscribed logs per second - // sbps: subscribed megabytes per second - // eelat: end-to-end latency in milliseconds - var sb strings.Builder - fmt.Fprintf(&sb, "___tgt") // 6 spaces - fmt.Fprintf(&sb, "__arpsR") // 7 spaces - fmt.Fprintf(&sb, "__arpsT") // 7 spaces - - fmt.Fprintf(&sb, "_______abpsR") // 12 spaces - fmt.Fprintf(&sb, "_______abpsT") // 12 spaces - - fmt.Fprintf(&sb, "__adurR") // 7 spaces - fmt.Fprintf(&sb, "__adurT") // 7 spaces - - fmt.Fprintf(&sb, "__slpsR") // 7 spaces - fmt.Fprintf(&sb, "__slpsT") // 7 spaces - - fmt.Fprintf(&sb, "_______sbpsR") // 12 spaces - fmt.Fprintf(&sb, "_______sbpsT") // 12 spaces - - fmt.Fprintf(&sb, "__eelatR") // 8 spaces - fmt.Fprintf(&sb, "__eelatT\n") // 8 spaces +func (m Metrics) Flush() TargetReports { + trs := TargetReports{ + Reports: make([]TargetReport, len(m.loaderMetrics)), + } for idx, lm := range m.loaderMetrics { - recent, total := lm.Flush() - fmt.Fprintf(&sb, "%6s", lm.tgt) - - // arps - fmt.Fprintf(&sb, "%7.1f", recent.AppendReport.RequestsPerSecond) - fmt.Fprintf(&sb, "%7.1f", total.AppendReport.RequestsPerSecond) - - // abps - fmt.Fprintf(&sb, "%10s/s", units.ToByteSizeString(recent.AppendReport.BytesPerSecond)) - fmt.Fprintf(&sb, "%10s/s", units.ToByteSizeString(total.AppendReport.BytesPerSecond)) - - // adur - fmt.Fprintf(&sb, "%7.1f", recent.AppendReport.Duration) - fmt.Fprintf(&sb, "%7.1f", total.AppendReport.Duration) - - // slps - fmt.Fprintf(&sb, "%7.1f", recent.SubscribeReport.LogsPerSecond) - fmt.Fprintf(&sb, "%7.1f", total.SubscribeReport.LogsPerSecond) - - // sbps - fmt.Fprintf(&sb, "%10s/s", units.ToByteSizeString(recent.SubscribeReport.BytesPerSecond)) - fmt.Fprintf(&sb, "%10s/s", units.ToByteSizeString(total.SubscribeReport.BytesPerSecond)) - - // eelat - fmt.Fprintf(&sb, "%8.1f", recent.EndToEndReport.Latency) - fmt.Fprintf(&sb, "%8.1f", total.EndToEndReport.Latency) - - if idx < len(m.loaderMetrics)-1 { - fmt.Fprint(&sb, "\n") - } + trs.Reports[idx] = lm.Flush() } - return sb.String() + return trs } type LoaderMetrics struct { @@ -138,11 +83,12 @@ func (lm *LoaderMetrics) ReportSubscribeMetrics(m SubscribeMetrics) bool { return true } -func (lm *LoaderMetrics) Flush() (recent, total Report) { +func (lm *LoaderMetrics) Flush() TargetReport { lm.mu.Lock() defer lm.mu.Unlock() now := time.Now() + var recent, total Report total.AppendReport = NewAppendReportFromMetrics(lm.appendMetrics.total, now.Sub(lm.initTime)) total.SubscribeReport = NewSubscribeReportFromMetrics(lm.subscribeMetrics.total, now.Sub(lm.initTime)) @@ -154,5 +100,9 @@ func (lm *LoaderMetrics) Flush() (recent, total Report) { lm.subscribeMetrics.recent = SubscribeMetrics{} lm.lastTime = now - return recent, total + return TargetReport{ + Target: lm.tgt.String(), + Recent: recent, + Total: total, + } } diff --git a/internal/benchmark/report.go b/internal/benchmark/report.go index fcf221024..db44bff13 100644 --- a/internal/benchmark/report.go +++ b/internal/benchmark/report.go @@ -1,28 +1,14 @@ package benchmark import ( + "encoding/json" "time" ) type AppendReport struct { - RequestsPerSecond float64 - BytesPerSecond float64 - Duration float64 -} - -type SubscribeReport struct { - LogsPerSecond float64 - BytesPerSecond float64 -} - -type EndToEndReport struct { - Latency float64 -} - -type Report struct { - AppendReport - SubscribeReport - EndToEndReport + RequestsPerSecond float64 `json:"requestsPerSecond"` + BytesPerSecond float64 `json:"bytesPerSecond"` + Duration float64 `json:"duration"` } func NewAppendReportFromMetrics(metrics AppendMetrics, interval time.Duration) AppendReport { @@ -33,9 +19,104 @@ func NewAppendReportFromMetrics(metrics AppendMetrics, interval time.Duration) A } } +type SubscribeReport struct { + LogsPerSecond float64 `json:"logsPerSecond"` + BytesPerSecond float64 `json:"bytesPerSecond"` +} + func NewSubscribeReportFromMetrics(metrics SubscribeMetrics, interval time.Duration) SubscribeReport { return SubscribeReport{ LogsPerSecond: float64(metrics.logs) / interval.Seconds(), BytesPerSecond: float64(metrics.bytes) / interval.Seconds(), } } + +type EndToEndReport struct { + Latency float64 `json:"latency"` +} + +type Report struct { + AppendReport `json:"append"` + SubscribeReport `json:"subscribe"` + EndToEndReport `json:"endToEnd"` +} + +type TargetReport struct { + Target string `json:"target"` + Recent Report `json:"recent"` + Total Report `json:"total"` +} + +type TargetReports struct { + Reports []TargetReport +} + +func (trs TargetReports) String() string { + enc := StringEncoder{} + buf, err := enc.Encode(trs) + if err != nil { + panic(err) + } + return string(buf) + //// arps: appended requests per second + //// abps: appended megabytes per second + //// adur: mean/min/max append duration in milliseconds + //// slps: subscribed logs per second + //// sbps: subscribed megabytes per second + //// eelat: end-to-end latency in milliseconds + //var sb strings.Builder + //fmt.Fprintf(&sb, "___tgt") // 6 spaces + //fmt.Fprintf(&sb, "__arpsR") // 7 spaces + //fmt.Fprintf(&sb, "__arpsT") // 7 spaces + // + //fmt.Fprintf(&sb, "_______abpsR") // 12 spaces + //fmt.Fprintf(&sb, "_______abpsT") // 12 spaces + // + //fmt.Fprintf(&sb, "__adurR") // 7 spaces + //fmt.Fprintf(&sb, "__adurT") // 7 spaces + // + //fmt.Fprintf(&sb, "__slpsR") // 7 spaces + //fmt.Fprintf(&sb, "__slpsT") // 7 spaces + // + //fmt.Fprintf(&sb, "_______sbpsR") // 12 spaces + //fmt.Fprintf(&sb, "_______sbpsT") // 12 spaces + // + //fmt.Fprintf(&sb, "__eelatR") // 8 spaces + //fmt.Fprintf(&sb, "__eelatT\n") // 8 spaces + //for idx, rpt := range trs.Reports { + // fmt.Fprintf(&sb, "%6s", rpt.Target) + // + // // arps + // fmt.Fprintf(&sb, "%7.1f", rpt.Recent.AppendReport.RequestsPerSecond) + // fmt.Fprintf(&sb, "%7.1f", rpt.Total.AppendReport.RequestsPerSecond) + // + // // abps + // fmt.Fprintf(&sb, "%10s/s", units.ToByteSizeString(rpt.Recent.AppendReport.BytesPerSecond)) + // fmt.Fprintf(&sb, "%10s/s", units.ToByteSizeString(rpt.Total.AppendReport.BytesPerSecond)) + // + // // adur + // fmt.Fprintf(&sb, "%7.1f", rpt.Recent.AppendReport.Duration) + // fmt.Fprintf(&sb, "%7.1f", rpt.Total.AppendReport.Duration) + // + // // slps + // fmt.Fprintf(&sb, "%7.1f", rpt.Recent.SubscribeReport.LogsPerSecond) + // fmt.Fprintf(&sb, "%7.1f", rpt.Total.SubscribeReport.LogsPerSecond) + // + // // sbps + // fmt.Fprintf(&sb, "%10s/s", units.ToByteSizeString(rpt.Recent.SubscribeReport.BytesPerSecond)) + // fmt.Fprintf(&sb, "%10s/s", units.ToByteSizeString(rpt.Total.SubscribeReport.BytesPerSecond)) + // + // // eelat + // fmt.Fprintf(&sb, "%8.1f", rpt.Recent.EndToEndReport.Latency) + // fmt.Fprintf(&sb, "%8.1f", rpt.Total.EndToEndReport.Latency) + // + // if idx < len(trs.Reports)-1 { + // fmt.Fprint(&sb, "\n") + // } + //} + //return sb.String() +} + +func (trs TargetReports) JSON() ([]byte, error) { + return json.Marshal(trs) +} diff --git a/internal/benchmark/report_codec.go b/internal/benchmark/report_codec.go new file mode 100644 index 000000000..7dbcb2ee7 --- /dev/null +++ b/internal/benchmark/report_codec.go @@ -0,0 +1,89 @@ +package benchmark + +import ( + "bytes" + "encoding/json" + "fmt" + + "github.com/kakao/varlog/pkg/util/units" +) + +type ReportEncoder interface { + Encode(trs TargetReports) ([]byte, error) +} + +type JsonEncoder struct{} + +func (je JsonEncoder) Encode(trs TargetReports) ([]byte, error) { + return json.Marshal(trs) +} + +type StringEncoder struct{} + +func (se StringEncoder) Encode(trs TargetReports) ([]byte, error) { + var buf bytes.Buffer + // arps: appended requests per second + // abps: appended megabytes per second + // adur: mean/min/max append duration in milliseconds + // slps: subscribed logs per second + // sbps: subscribed megabytes per second + // eelat: end-to-end latency in milliseconds + fmt.Fprintf(&buf, "___tgt") // 6 spaces + fmt.Fprintf(&buf, "__arpsR") // 7 spaces + fmt.Fprintf(&buf, "__arpsT") // 7 spaces + + fmt.Fprintf(&buf, "_______abpsR") // 12 spaces + fmt.Fprintf(&buf, "_______abpsT") // 12 spaces + + fmt.Fprintf(&buf, "__adurR") // 7 spaces + fmt.Fprintf(&buf, "__adurT") // 7 spaces + + fmt.Fprintf(&buf, "__slpsR") // 7 spaces + fmt.Fprintf(&buf, "__slpsT") // 7 spaces + + fmt.Fprintf(&buf, "_______sbpsR") // 12 spaces + fmt.Fprintf(&buf, "_______sbpsT") // 12 spaces + + fmt.Fprintf(&buf, "__eelatR") // 8 spaces + fmt.Fprintf(&buf, "__eelatT\n") // 8 spaces + for idx, rpt := range trs.Reports { + fmt.Fprintf(&buf, "%6s", rpt.Target) + + // arps + fmt.Fprintf(&buf, "%7.1f", rpt.Recent.AppendReport.RequestsPerSecond) + fmt.Fprintf(&buf, "%7.1f", rpt.Total.AppendReport.RequestsPerSecond) + + // abps + fmt.Fprintf(&buf, "%10s/s", units.ToByteSizeString(rpt.Recent.AppendReport.BytesPerSecond)) + fmt.Fprintf(&buf, "%10s/s", units.ToByteSizeString(rpt.Total.AppendReport.BytesPerSecond)) + + // adur + fmt.Fprintf(&buf, "%7.1f", rpt.Recent.AppendReport.Duration) + fmt.Fprintf(&buf, "%7.1f", rpt.Total.AppendReport.Duration) + + // slps + fmt.Fprintf(&buf, "%7.1f", rpt.Recent.SubscribeReport.LogsPerSecond) + fmt.Fprintf(&buf, "%7.1f", rpt.Total.SubscribeReport.LogsPerSecond) + + // sbps + fmt.Fprintf(&buf, "%10s/s", units.ToByteSizeString(rpt.Recent.SubscribeReport.BytesPerSecond)) + fmt.Fprintf(&buf, "%10s/s", units.ToByteSizeString(rpt.Total.SubscribeReport.BytesPerSecond)) + + // eelat + fmt.Fprintf(&buf, "%8.1f", rpt.Recent.EndToEndReport.Latency) + fmt.Fprintf(&buf, "%8.1f", rpt.Total.EndToEndReport.Latency) + + if idx < len(trs.Reports)-1 { + fmt.Fprint(&buf, "\n") + } + } + return buf.Bytes(), nil +} + +func MustEncode(enc ReportEncoder, trs TargetReports) string { + buf, err := enc.Encode(trs) + if err != nil { + panic(err) + } + return string(buf) +} diff --git a/internal/benchmark/target.go b/internal/benchmark/target.go index c0f489c34..8c92990f0 100644 --- a/internal/benchmark/target.go +++ b/internal/benchmark/target.go @@ -8,15 +8,15 @@ import ( ) type Target struct { - TopicID types.TopicID - LogStreamID types.LogStreamID + TopicID types.TopicID `json:"topicId"` + LogStreamID types.LogStreamID `json:"logStreamId"` MessageSize uint BatchSize uint AppendersCount uint SubscribersCount uint } -func (tgt *Target) Valid() error { +func (tgt Target) Valid() error { if tgt.TopicID.Invalid() { return fmt.Errorf("invalid topic %v", tgt.TopicID) }