Skip to content

Commit

Permalink
feat: add "ftl bench" command (#2884)
Browse files Browse the repository at this point in the history
This benchmarks FTL calls at the specified concurrency level.

eg.

```
~/dev/ftl $ ftl bench -j16 -c1024 echo.echo
Starting benchmark
  Verb: echo.echo
  Count: 1024
  Parallelism: 16
Results:
  Successes: 16384
  Errors: 0
Timing percentiles:
  50%: 10.868375ms
  90%: 15.968125ms
  95%: 18.338459ms
  99%: 24.135084ms
Standard deviation: ±3.713237ms
```

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
alecthomas and github-actions[bot] committed Sep 28, 2024
1 parent 68aa44b commit 647858f
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 7 deletions.
22 changes: 16 additions & 6 deletions backend/controller/timeline/timeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"time"

"github.com/alecthomas/atomic"

"github.com/TBD54566975/ftl/backend/controller/encryption"
"github.com/TBD54566975/ftl/backend/controller/observability"
"github.com/TBD54566975/ftl/backend/controller/sql/sqltypes"
Expand Down Expand Up @@ -42,10 +44,12 @@ type InEvent interface {
}

type Service struct {
ctx context.Context
conn *stdsql.DB
encryption *encryption.Service
events chan InEvent
ctx context.Context
conn *stdsql.DB
encryption *encryption.Service
events chan InEvent
lastDroppedError atomic.Value[time.Time]
lastFailedError atomic.Value[time.Time]
}

func New(ctx context.Context, conn *stdsql.DB, encryption *encryption.Service) *Service {
Expand All @@ -71,7 +75,10 @@ func (s *Service) EnqueueEvent(ctx context.Context, event InEvent) {
select {
case s.events <- event:
default:
log.FromContext(ctx).Warnf("Dropping event %T due to full queue", event)
if time.Since(s.lastDroppedError.Load()) > 10*time.Second {
log.FromContext(ctx).Warnf("Dropping event %T due to full queue", event)
s.lastDroppedError.Store(time.Now())
}
}
}

Expand Down Expand Up @@ -133,7 +140,10 @@ func (s *Service) flushEvents(events []InEvent) {
lastError = err
}
if lastError != nil {
logger.Errorf(lastError, "Failed to insert %d events, most recent error", failures)
if time.Since(s.lastFailedError.Load()) > 10*time.Second {
logger.Errorf(lastError, "Failed to insert %d events, most recent error", failures)
s.lastFailedError.Store(time.Now())
}
observability.Timeline.Failed(s.ctx, failures)
}
observability.Timeline.Inserted(s.ctx, len(events)-failures)
Expand Down
129 changes: 129 additions & 0 deletions frontend/cli/cmd_bench.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package main

import (
"context"
"fmt"
"math"
"sort"
"sync/atomic"
"time"

"connectrpc.com/connect"
"github.com/jpillora/backoff"
"github.com/titanous/json5"
"golang.org/x/sync/errgroup"

ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
"github.com/TBD54566975/ftl/go-runtime/ftl/reflection"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/rpc"
)

type benchCmd struct {
Count int `short:"c" help:"Number of times to call the Verb in each thread." default:"128"`
Parallelism int `short:"j" help:"Number of concurrent benchmarks to create." default:"${numcpu}"`
Wait time.Duration `short:"w" help:"Wait up to this elapsed time for the FTL cluster to become available." default:"1m"`
Verb reflection.Ref `arg:"" required:"" help:"Full path of Verb to call." predictor:"verbs"`
Request string `arg:"" optional:"" help:"JSON5 request payload." default:"{}"`
}

func (c *benchCmd) Run(ctx context.Context, client ftlv1connect.VerbServiceClient) error {
ctx, cancel := context.WithTimeout(ctx, c.Wait)
defer cancel()
if err := rpc.Wait(ctx, backoff.Backoff{Max: time.Second * 2}, client); err != nil {
return fmt.Errorf("FTL cluster did not become ready: %w", err)
}
logger := log.FromContext(ctx)
request := map[string]any{}
err := json5.Unmarshal([]byte(c.Request), &request)
if err != nil {
return fmt.Errorf("invalid request: %w", err)
}

fmt.Printf("Starting benchmark\n")
fmt.Printf(" Verb: %s\n", c.Verb)
fmt.Printf(" Count: %d\n", c.Count)
fmt.Printf(" Parallelism: %d\n", c.Parallelism)

var errors int64
var success int64
wg := errgroup.Group{}
timings := make([][]time.Duration, c.Parallelism)
for job := range c.Parallelism {
wg.Go(func() error {
for range c.Count {
start := time.Now()
// otherwise, we have a match so call the verb
_, err := client.Call(ctx, connect.NewRequest(&ftlv1.CallRequest{
Verb: c.Verb.ToProto(),
Body: []byte(c.Request),
}))
if err != nil {
// Only log error once.
if atomic.AddInt64(&errors, 1) == 1 {
logger.Errorf(err, "Error calling %s", c.Verb)
}
} else {
atomic.AddInt64(&success, 1)
}
timings[job] = append(timings[job], time.Since(start))
}
return nil
})
}
_ = wg.Wait() //nolint: errcheck

// Display timing percentiles.
var allTimings []time.Duration
for _, t := range timings {
allTimings = append(allTimings, t...)
}
sort.Slice(allTimings, func(i, j int) bool { return allTimings[i] < allTimings[j] })
fmt.Printf("Results:\n")
fmt.Printf(" Successes: %d\n", success)
fmt.Printf(" Errors: %d\n", errors)
fmt.Printf("Timing percentiles:\n")
for p, t := range computePercentiles(allTimings) {
fmt.Printf(" %d%%: %s\n", p, t)
}
fmt.Printf("Standard deviation: ±%v\n", computeStandardDeviation(allTimings))
return nil
}

func computePercentiles(timings []time.Duration) map[int]time.Duration {
percentiles := map[int]time.Duration{}
for _, p := range []int{50, 90, 95, 99} {
percentiles[p] = percentile(timings, p)
}
return percentiles
}

func percentile(timings []time.Duration, p int) time.Duration {
if len(timings) == 0 {
return 0
}
i := int(float64(len(timings)) * float64(p) / 100)
return timings[i]
}

func computeStandardDeviation(timings []time.Duration) time.Duration {
if len(timings) == 0 {
return 0
}

var sum time.Duration
for _, t := range timings {
sum += t
}
mean := float64(sum) / float64(len(timings))

var varianceSum float64
for _, t := range timings {
diff := float64(t) - mean
varianceSum += diff * diff
}
variance := varianceSum / float64(len(timings))

return time.Duration(math.Sqrt(variance))
}
2 changes: 1 addition & 1 deletion frontend/cli/cmd_call.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func callVerb(ctx context.Context, client ftlv1connect.VerbServiceClient, ctlCli
if cerr := new(connect.Error); errors.As(err, &cerr) && cerr.Code() == connect.CodeNotFound {
suggestions, err := findSuggestions(ctx, ctlCli, verb)

// if we have suggestions, return a helpful error message. otherwise continue to the original error
// If we have suggestions, return a helpful error message, otherwise continue to the original error.
if err == nil {
return fmt.Errorf("verb not found: %s\n\nDid you mean one of these?\n%s", verb, strings.Join(suggestions, "\n"))
}
Expand Down
1 change: 1 addition & 0 deletions frontend/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type InteractiveCLI struct {
New newCmd `cmd:"" help:"Create a new FTL module."`
PS psCmd `cmd:"" help:"List deployments."`
Call callCmd `cmd:"" help:"Call an FTL function."`
Bench benchCmd `cmd:"" help:"Benchmark an FTL function."`
Replay replayCmd `cmd:"" help:"Call an FTL function with the same request body as the last invocation."`
Update updateCmd `cmd:"" help:"Update a deployment."`
Kill killCmd `cmd:"" help:"Kill a deployment."`
Expand Down

0 comments on commit 647858f

Please sign in to comment.