Batch completer + additional completer test suite and benchmarks
Here, add a new completer using a completion strategy designed to be
much faster than what we're doing right now. Rather than blindly
throwing completion work into goroutine slots, it accumulates "batches"
of completions to be carried out, and uses a debounced channel that
fires periodically (currently, up to every 100 milliseconds) to submit
entire batches of up to 2,000 jobs for completion at once.
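
For illustration, here's a rough sketch of the accumulate-and-flush
idea. It's not the real `BatchCompleter` (it uses a plain ticker rather
than a debounced channel, and every name in it is made up), but it shows
the shape of the flush loop:

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    const (
        tickInterval = 100 * time.Millisecond // flush at most this often
        maxBatchSize = 2000                   // flush early once the batch is this large
    )

    type completion struct{ jobID int64 }

    // runBatcher accumulates completions and flushes them either when the
    // ticker fires or when the batch hits maxBatchSize, so that many jobs
    // can be marked completed with a single database round trip.
    func runBatcher(ctx context.Context, in <-chan completion, flush func([]completion)) {
        ticker := time.NewTicker(tickInterval)
        defer ticker.Stop()

        batch := make([]completion, 0, maxBatchSize)
        flushBatch := func() {
            if len(batch) == 0 {
                return
            }
            flush(batch)
            batch = make([]completion, 0, maxBatchSize)
        }

        for {
            select {
            case c := <-in:
                batch = append(batch, c)
                if len(batch) >= maxBatchSize {
                    flushBatch()
                }
            case <-ticker.C:
                flushBatch()
            case <-ctx.Done():
                flushBatch() // drain anything left before shutting down
                return
            }
        }
    }

    func main() {
        ctx, cancel := context.WithCancel(context.Background())

        in := make(chan completion)
        done := make(chan struct{})
        go func() {
            defer close(done)
            runBatcher(ctx, in, func(batch []completion) {
                fmt.Printf("flushing %d completions in one query\n", len(batch))
            })
        }()

        for i := int64(0); i < 5000; i++ {
            in <- completion{jobID: i}
        }
        time.Sleep(150 * time.Millisecond) // let a tick flush the remainder
        cancel()
        <-done
    }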

For the purposes of not grossly expanding the `riverdriver` interface,
the completer only batches jobs being set to `completed`, which under
most normal workloads we expect to be the vast common case. Jobs going
to other states are fed into a member `AsyncCompleter`, thereby allowing
the `BatchCompleter` to keep its implementation quite simple.
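
As a sketch of that routing (hypothetical names and types for
illustration only, not River's real completer interfaces), the idea is
roughly:

    package main

    import "fmt"

    type jobState string

    const jobStateCompleted jobState = "completed"

    type completer interface {
        jobSetState(jobID int64, state jobState) error
    }

    // asyncStub stands in for the wrapped AsyncCompleter.
    type asyncStub struct{}

    func (asyncStub) jobSetState(jobID int64, state jobState) error {
        fmt.Printf("async path: job %d -> %s\n", jobID, state)
        return nil
    }

    // batchRouter batches only the common `completed` case and delegates
    // every other state, which keeps the batching code path simple.
    type batchRouter struct {
        batchCh chan int64 // drained by a flush loop like the sketch above
        async   completer
    }

    func (r *batchRouter) jobSetState(jobID int64, state jobState) error {
        if state == jobStateCompleted {
            r.batchCh <- jobID // accumulate for the next batch flush
            return nil
        }
        return r.async.jobSetState(jobID, state)
    }

    func main() {
        r := &batchRouter{batchCh: make(chan int64, 16), async: asyncStub{}}
        _ = r.jobSetState(1, jobStateCompleted) // batched
        _ = r.jobSetState(2, "discarded")       // delegated to the async path
        fmt.Printf("completions waiting in batch: %d\n", len(r.batchCh))
    }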

According to in-package benchmarking, the new completer is in the range
of 3-5x faster than `AsyncCompleter` (the one currently in use by the
River client), and 10-15x faster than `InlineCompleter`.

    $ go test -bench=. ./internal/jobcompleter
    goos: darwin
    goarch: arm64
    pkg: github.com/riverqueue/river/internal/jobcompleter
    BenchmarkAsyncCompleter_Concurrency10/Completion-8                 10851            112318 ns/op
    BenchmarkAsyncCompleter_Concurrency10/RotatingStates-8             11386            120706 ns/op
    BenchmarkAsyncCompleter_Concurrency100/Completion-8                 9763            116773 ns/op
    BenchmarkAsyncCompleter_Concurrency100/RotatingStates-8            10884            115718 ns/op
    BenchmarkBatchCompleter/Completion-8                               54916             27314 ns/op
    BenchmarkBatchCompleter/RotatingStates-8                           11518            100997 ns/op
    BenchmarkInlineCompleter/Completion-8                               4656            369281 ns/op
    BenchmarkInlineCompleter/RotatingStates-8                           1561            794136 ns/op
    PASS
    ok      github.com/riverqueue/river/internal/jobcompleter       21.123s

Along with the new completer, we also add a vastly more thorough test
suite to help tease out race conditions and exercise edge cases that
were previously being ignored completely. For most cases we drop the
heavy mocking that was happening before, which had the effect of
minimizing the surface area under test and producing misleading,
unrealistic timings.

Similarly, we bring in a new benchmark framework to allow us to easily
vet and compare completer implementations relative to each other. The
expectation is that this will act as a more synthetic proxy, with the
new benchmarking tool in #254 providing a more realistic end-to-end
measurement.
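
As a loose sketch of the shape of that framework (placeholder names, not
the package's real test harness), the comparison is essentially a
table-driven benchmark that runs an identical workload against each
implementation:

    // completer_bench_test.go (illustrative only): a shared, table-driven
    // benchmark that runs the same workload against each completer
    // implementation so they can be compared in one `go test -bench=.` run.
    // completerStub and noopCompleter are placeholders rather than the
    // package's real test harness.
    package completerbench

    import "testing"

    type completerStub interface {
        Complete(jobID int64) error
        Wait()
    }

    type noopCompleter struct{}

    func (noopCompleter) Complete(int64) error { return nil }
    func (noopCompleter) Wait()                {}

    func BenchmarkCompleters(b *testing.B) {
        cases := []struct {
            name    string
            newFunc func() completerStub
        }{
            {"Noop", func() completerStub { return noopCompleter{} }},
            // Real implementations would be registered here as well.
        }

        for _, tc := range cases {
            b.Run(tc.name, func(b *testing.B) {
                completer := tc.newFunc()
                b.ResetTimer()
                for i := 0; i < b.N; i++ {
                    if err := completer.Complete(int64(i)); err != nil {
                        b.Fatal(err)
                    }
                }
                completer.Wait() // include time to flush queued completions
            })
        }
    }
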
brandur committed Mar 12, 2024
1 parent ccea9c7 commit b7480ad
Showing 21 changed files with 1,598 additions and 281 deletions.
84 changes: 62 additions & 22 deletions client.go
@@ -429,10 +429,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
TimeNowUTC: func() time.Time { return time.Now().UTC() },
}

completer := jobcompleter.NewAsyncCompleter(archetype, driver.GetExecutor(), 100)

client := &Client[TTx]{
completer: completer,
config: config,
driver: driver,
monitor: newClientMonitor(),
@@ -461,6 +458,9 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
// we'll need to add a config for this.
instanceName := "default"

client.completer = jobcompleter.NewBatchCompleter(archetype, driver.GetExecutor())
client.services = append(client.services, client.completer)

client.notifier = notifier.New(archetype, driver.GetListener(), client.monitor.SetNotifierStatus)
client.services = append(client.services, client.notifier)

@@ -606,6 +606,14 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
return fmt.Errorf("error making initial connection to database: %w", err)
}

// In case of error, stop any services that might have started. This
// is safe because even services that were never started will still
// tolerate being stopped.
stopServicesOnError := func() {
startstop.StopAllParallel(c.services)
c.monitor.Stop()
}

// Monitor should be the first subprocess to start, and the last to stop.
// It's not part of the waitgroup because we need to wait for everything else
// to shut down prior to closing the monitor.
@@ -616,26 +624,24 @@
return err
}

// Receives job complete notifications from the completer and distributes
// them to any subscriptions.
c.completer.Subscribe(c.distributeJobCompleterCallback)

for _, service := range c.services {
if err := service.Start(fetchNewWorkCtx); err != nil {
// In case of error, stop any services that might have started. This
// is safe because even services that were never started will still
// tolerate being stopped.
startstop.StopAllParallel(c.services)

c.monitor.Stop()

stopServicesOnError()
if errors.Is(context.Cause(ctx), rivercommon.ErrShutdown) {
return nil
}
return err
}
}

// The completer is not included in the main services list so that it can be
// stopped last.
if c.completer != nil {
// Receives job complete notifications from the completer and
// distributes them to any subscriptions.
c.completer.Subscribe(c.distributeJobCompleterCallback)
}

if c.elector != nil {
c.wg.Add(1)
go func() {
@@ -658,14 +664,14 @@ func (c *Client[TTx]) signalStopComplete(ctx context.Context) {
c.wg.Wait()

// Stop all mainline services where stop order isn't important.
startstop.StopAllParallel(c.services)

// Once the producers have all finished, we know that completers have at least
// enqueued any remaining work. Wait for the completer to finish.
//
// This list of services contains the completer, which should always stop
// after the producers so that any remaining work that was enqueued will
// have a chance to have its state completed as it finishes.
//
// TODO: there's a risk here that the completer is stuck on a job that won't
// complete. We probably need a timeout or way to move on in those cases.
c.completer.Wait()
startstop.StopAllParallel(c.services)

// Will only be started if this client was leader, but can tolerate a stop
// without having been started.
@@ -764,7 +770,41 @@ func (c *Client[TTx]) Stopped() <-chan struct{} {
// versions. If new event kinds are added, callers will have to explicitly add
// them to their requested list and ensure they can be handled correctly.
func (c *Client[TTx]) Subscribe(kinds ...EventKind) (<-chan *Event, func()) {
for _, kind := range kinds {
return c.SubscribeConfig(&SubscribeConfig{Kinds: kinds})
}

// The default maximum size of the subscribe channel. Events that would overflow
// it will be dropped.
const subscribeChanSizeDefault = 1_000

// SubscribeConfig is a more thorough subscription configuration used for
// Client.SubscribeConfig.
type SubscribeConfig struct {
// ChanSize is the size of the buffered channel that will be created for the
// subscription. Incoming events that overflow this number because a listener
// isn't reading from the channel in a timely manner will be dropped.
//
// Defaults to 1000.
ChanSize int

// Kinds are the kinds of events that the subscription will receive.
// Requiring that kinds are specified explicitly allows for forward
// compatibility in case new kinds of events are added in future versions.
// If new event kinds are added, callers will have to explicitly add them to
// their requested list and ensure they can be handled correctly.
Kinds []EventKind
}

// Special internal variant that lets us inject an overridden size.
func (c *Client[TTx]) SubscribeConfig(config *SubscribeConfig) (<-chan *Event, func()) {
if config.ChanSize < 0 {
panic("SubscribeConfig.ChanSize must be greater or equal to 1")
}
if config.ChanSize == 0 {
config.ChanSize = subscribeChanSizeDefault
}

for _, kind := range config.Kinds {
if _, ok := allKinds[kind]; !ok {
panic(fmt.Errorf("unknown event kind: %s", kind))
}
@@ -773,15 +813,15 @@ func (c *Client[TTx]) Subscribe(kinds ...EventKind) (<-chan *Event, func()) {
c.subscriptionsMu.Lock()
defer c.subscriptionsMu.Unlock()

subChan := make(chan *Event, subscribeChanSize)
subChan := make(chan *Event, config.ChanSize)

// Just gives us an easy way of removing the subscription again later.
subID := c.subscriptionsSeq
c.subscriptionsSeq++

c.subscriptions[subID] = &eventSubscription{
Chan: subChan,
Kinds: sliceutil.KeyBy(kinds, func(k EventKind) (EventKind, struct{}) { return k, struct{}{} }),
Kinds: sliceutil.KeyBy(config.Kinds, func(k EventKind) (EventKind, struct{}) { return k, struct{}{} }),
}

cancel := func() {
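
For context on the new subscribe API added above, here's a hedged usage
sketch. It assumes an already-started River client, relies on
`EventKindJobCompleted` from River's public API, and names like
`printCompletions` are purely illustrative:

    package subscribeexample

    import (
        "fmt"

        "github.com/riverqueue/river"
    )

    // printCompletions is a usage sketch only: it assumes an already-started
    // River client and relies on EventKindJobCompleted from River's public
    // API; error handling and shutdown are elided.
    func printCompletions[TTx any](client *river.Client[TTx]) {
        events, cancel := client.SubscribeConfig(&river.SubscribeConfig{
            ChanSize: 5_000, // larger buffer than the 1,000 default
            Kinds:    []river.EventKind{river.EventKindJobCompleted},
        })
        defer cancel()

        for event := range events {
            fmt.Printf("job %d finished in state %s\n", event.Job.ID, event.Job.State)
        }
    }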