Batch completer + additional completer test suite and benchmarks #258

Merged · 1 commit · Mar 17, 2024
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -10,9 +10,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

- The River CLI now supports `river bench` to benchmark River's job throughput against a database. [PR #254](https://github.com/riverqueue/river/pull/254).
- The River client gets a new `Client.SubscribeConfig` function that lets a subscriber specify the maximum size of their subscription channel. [PR #258](https://github.com/riverqueue/river/pull/258).

### Changed

- River uses a new job completer that batches up completion work so that large numbers of them can be performed more efficiently. In a purely synthetic (i.e. mostly unrealistic) benchmark, River's job throughput increases ~4.5x. [PR #258](https://github.com/riverqueue/river/pull/258).
- Changed default client IDs to be a combination of hostname and the time at which the client started. This can still be changed by specifying `Config.ID`. [PR #255](https://github.com/riverqueue/river/pull/255).
- Notifier refactored for better robustness and testability. [PR #253](https://github.com/riverqueue/river/pull/253).

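As context for the `Config.ID` change above, here is a minimal sketch of pinning a stable client ID instead of relying on the new hostname-plus-start-time default. The pool setup and connection string are illustrative and not part of this PR; only `Config.ID` itself comes from the changelog entry.

```go
package main

import (
	"context"
	"log"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/riverqueue/river"
	"github.com/riverqueue/river/riverdriver/riverpgxv5"
)

func main() {
	ctx := context.Background()

	dbPool, err := pgxpool.New(ctx, "postgres:///river_dev") // hypothetical DSN
	if err != nil {
		log.Fatal(err)
	}
	defer dbPool.Close()

	// Config.ID overrides the default ID, which is now derived from the
	// hostname and the client's start time.
	client, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		ID: "worker-1",
	})
	if err != nil {
		log.Fatal(err)
	}
	_ = client
}
```
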
138 changes: 98 additions & 40 deletions client.go
@@ -271,10 +271,10 @@ type Client[TTx any] struct {
driver riverdriver.Driver[TTx]
elector *leadership.Elector

// fetchNewWorkCancel cancels the context used for fetching new work. This
// fetchWorkCancel cancels the context used for fetching new work. This
// will be used to stop fetching new work whenever stop is initiated, or
// when the context provided to Run is itself cancelled.
fetchNewWorkCancel context.CancelCauseFunc
fetchWorkCancel context.CancelCauseFunc
Contributor Author:

This was alternatively named fetchWork versus fetchNewWork depending on the place, so here just standardized on the former since it's a little more succinct and the naming still describes it nicely.


monitor *clientMonitor
notifier *notifier.Notifier
@@ -428,10 +428,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
TimeNowUTC: func() time.Time { return time.Now().UTC() },
}

completer := jobcompleter.NewAsyncCompleter(archetype, driver.GetExecutor(), 100)

client := &Client[TTx]{
completer: completer,
config: config,
driver: driver,
monitor: newClientMonitor(),
@@ -460,6 +457,9 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
// we'll need to add a config for this.
instanceName := "default"

client.completer = jobcompleter.NewBatchCompleter(archetype, driver.GetExecutor())
client.services = append(client.services, client.completer)

client.notifier = notifier.New(archetype, driver.GetListener(), client.monitor.SetNotifierStatus)
client.services = append(client.services, client.notifier)
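
Swapping `NewAsyncCompleter` for `NewBatchCompleter` here is the core of the PR. The real completer lives in River's internal `jobcompleter` package; the sketch below only illustrates the general batching idea it relies on — accumulate completions, then mark many rows in one statement — with made-up names, batch size, and flush interval.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// batcher accumulates completed job IDs and flushes them in groups so that
// many completions cost one statement rather than one round trip per job.
// All names, sizes, and intervals here are illustrative.
type batcher struct {
	completions chan int64
	flush       func(ids []int64) // e.g. one UPDATE ... WHERE id = any($1)
	maxBatch    int
}

func (b *batcher) run(ctx context.Context) {
	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()

	batch := make([]int64, 0, b.maxBatch)
	flushBatch := func() {
		if len(batch) > 0 {
			b.flush(batch)
			batch = batch[:0]
		}
	}

	for {
		select {
		case id := <-b.completions:
			batch = append(batch, id)
			if len(batch) >= b.maxBatch {
				flushBatch()
			}
		case <-ticker.C:
			flushBatch()
		case <-ctx.Done():
			flushBatch() // a real completer would also drain b.completions here
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	b := &batcher{
		completions: make(chan int64, 1024),
		flush:       func(ids []int64) { fmt.Println("completed in one batch:", len(ids), "jobs") },
		maxBatch:    100,
	}

	done := make(chan struct{})
	go func() { b.run(ctx); close(done) }()

	for id := int64(1); id <= 250; id++ {
		b.completions <- id
	}

	cancel()
	<-done
}
```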

@@ -582,14 +582,6 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
return errors.New("at least one Worker must be added to the Workers bundle")
}

// We use separate contexts for fetching and working to allow for a graceful
// stop. However, both inherit from the provided context so if it is
// cancelled a more aggressive stop will be initiated.
fetchCtx, fetchNewWorkCancel := context.WithCancelCause(ctx)
c.fetchNewWorkCancel = fetchNewWorkCancel
workCtx, workCancel := context.WithCancelCause(withClient[TTx](ctx, c))
c.workCancel = workCancel
Contributor Author:

Just moved these down a ways so it's harder to accidentally use these vars in the interim.


// Before doing anything else, make an initial connection to the database to
// verify that it appears healthy. Many of the subcomponents below start up
// in a goroutine and in case of initial failure, only produce a log line,
@@ -602,6 +594,14 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
return fmt.Errorf("error making initial connection to database: %w", err)
}

// In case of error, stop any services that might have started. This
// is safe because even services that were never started will still
// tolerate being stopped.
stopServicesOnError := func() {
startstop.StopAllParallel(c.services)
c.monitor.Stop()
}

// Monitor should be the first subprocess to start, and the last to stop.
// It's not part of the waitgroup because we need to wait for everything else
// to shut down prior to closing the monitor.
@@ -612,19 +612,40 @@
return err
}

// Receives job complete notifications from the completer and distributes
// them to any subscriptions.
c.completer.Subscribe(c.distributeJobCompleterCallback)
if c.completer != nil {
// The completer is part of the services list below, but although it can
// stop gracefully along with all the other services, it needs to be
// started with a context that's _not_ fetchCtx. This ensures that even
// when fetch is cancelled on shutdown, the completer is still given a
// separate opportunity to start stopping only after the producers have
// finished up and returned.
if err := c.completer.Start(ctx); err != nil {
stopServicesOnError()
return err
}

for _, service := range c.services {
if err := service.Start(fetchCtx); err != nil {
// In case of error, stop any services that might have started. This
// is safe because even services that were never started will still
// tolerate being stopped.
startstop.StopAllParallel(c.services)
// Receives job complete notifications from the completer and
// distributes them to any subscriptions.
c.completer.Subscribe(c.distributeJobCompleterCallback)
}

c.monitor.Stop()
// We use separate contexts for fetching and working to allow for a graceful
// stop. However, both inherit from the provided context so if it is
// cancelled a more aggressive stop will be initiated.
fetchCtx, fetchWorkCancel := context.WithCancelCause(ctx)
c.fetchWorkCancel = fetchWorkCancel
workCtx, workCancel := context.WithCancelCause(withClient[TTx](ctx, c))
c.workCancel = workCancel

for _, service := range c.services {
// TODO(brandur): Reevaluate the use of fetchCtx here. It's currently
// necessary to speed up shutdown so that all services start shutting
// down before having to wait for the producers to finish, but as
// stopping becomes more normalized (hopefully by making the client
// itself a start/stop service), we can likely accomplish that in a
// cleaner way.
if err := service.Start(fetchCtx); err != nil {
stopServicesOnError()
if errors.Is(context.Cause(ctx), rivercommon.ErrShutdown) {
return nil
}
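
The separate fetch and work contexts reintroduced above are what make the soft-stop/hard-stop split work: cancelling `fetchCtx` stops fetching new jobs while running work continues under `workCtx`. The standalone sketch below shows the same `context.WithCancelCause` pattern in isolation; `errShutdown` is a stand-in for `rivercommon.ErrShutdown` and the rest is not River's code.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errShutdown = errors.New("shutdown initiated") // stand-in for rivercommon.ErrShutdown

func main() {
	ctx := context.Background()

	// Both contexts inherit from ctx, so cancelling ctx stops everything at
	// once; cancelling only fetchCtx is the graceful path.
	fetchCtx, fetchWorkCancel := context.WithCancelCause(ctx)
	workCtx, workCancel := context.WithCancelCause(ctx)

	go func() {
		<-fetchCtx.Done()
		fmt.Println("fetch loop stopping:", context.Cause(fetchCtx))
	}()
	go func() {
		<-workCtx.Done()
		fmt.Println("in-flight work cancelled:", context.Cause(workCtx))
	}()

	// Soft stop: stop fetching new work but let running jobs finish.
	fetchWorkCancel(errShutdown)
	time.Sleep(100 * time.Millisecond)

	// Hard stop: cancel running work too.
	workCancel(errShutdown)
	time.Sleep(100 * time.Millisecond)

	// Callers can distinguish an orderly shutdown from other failures.
	if errors.Is(context.Cause(fetchCtx), errShutdown) {
		fmt.Println("orderly shutdown")
	}
}
```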
@@ -656,18 +677,21 @@ func (c *Client[TTx]) signalStopComplete(ctx context.Context) {
producer.Stop()
}

// Stop all mainline services where stop order isn't important. Contains the
// elector and notifier, amongst others.
startstop.StopAllParallel(c.services)

// Once the producers have all finished, we know that completers have at least
// enqueued any remaining work. Wait for the completer to finish.
//
// TODO: there's a risk here that the completer is stuck on a job that won't
// complete. We probably need a timeout or way to move on in those cases.
c.completer.Wait()
// Stop all mainline services where stop order isn't important.
startstop.StopAllParallel(append(
// This list of services contains the completer, which should always
// stop after the producers so that any remaining work that was enqueued
// will have a chance to have its state completed as it finishes.
//
// TODO: there's a risk here that the completer is stuck on a job that
// won't complete. We probably need a timeout or way to move on in those
// cases.
c.services,

c.queueMaintainer.Stop()
// Will only be started if this client was leader, but can tolerate a stop
// without having been started.
c.queueMaintainer,
Contributor Author:

Added queueMaintainer here so that it starts being stopped at the same time as other services. Makes shutdown a little faster.

Contributor:

Oh yeah, there's probably no reason for this not to stop earlier. Well, maybe it should wait until after the elector has given up any leadership it may hold? 🤔 I guess it's fine if not; worst case they skip a cycle that's about to happen, but somebody else will pick it up, right? Is that true of all maintenance services?

Contributor Author:

Yeah, my rationale is basically: the maintenance services are going to be spending most of their time in wait loops between runs anyway under typical operation. Shutdown should take < 1s once it hits this point, so a new elector and maintenance services should come up quite quickly.

The only exception that comes to mind is the periodic job enqueuer because it's essentially stateless now. If it was 59 minutes into waiting on a once-an-hour job, that wait will have to start again on a client elsewhere. But that said, the difference from stopping the queue maintainer a second later would be negligible.

The queue maintainer is a constellation of ~5 services, so I did find that having it start to stop in parallel along with these other services helped somewhat in making the top-level client tests a little faster in aggregate (since the client is stopped in ~every test). I think overall the tiny tradeoff (if there even is one) is worth it.

))

c.baseService.Logger.InfoContext(ctx, c.baseService.Name+": All services stopped")

@@ -701,12 +725,12 @@ func (c *Client[TTx]) signalStopComplete(ctx context.Context) {
// There's no need to call this method if a hard stop has already been initiated
// by cancelling the context passed to Start or by calling StopAndCancel.
func (c *Client[TTx]) Stop(ctx context.Context) error {
if c.fetchNewWorkCancel == nil {
if c.fetchWorkCancel == nil {
return errors.New("client not started")
}

c.baseService.Logger.InfoContext(ctx, c.baseService.Name+": Stop started")
c.fetchNewWorkCancel(rivercommon.ErrShutdown)
c.fetchWorkCancel(rivercommon.ErrShutdown)
return c.awaitStop(ctx)
}

@@ -731,7 +755,7 @@ func (c *Client[TTx]) awaitStop(ctx context.Context) error {
// instead.
func (c *Client[TTx]) StopAndCancel(ctx context.Context) error {
c.baseService.Logger.InfoContext(ctx, c.baseService.Name+": Hard stop started; cancelling all work")
c.fetchNewWorkCancel(rivercommon.ErrShutdown)
c.fetchWorkCancel(rivercommon.ErrShutdown)
c.workCancel(rivercommon.ErrShutdown)
return c.awaitStop(ctx)
}
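
From the caller's side, the renamed cancel func changes nothing about the public shutdown flow. A typical usage sketch (assuming an already-started client; this helper is not part of the diff) is to attempt a graceful `Stop` with a deadline and escalate to `StopAndCancel` if it doesn't finish in time:

```go
package main

import (
	"context"
	"time"

	"github.com/jackc/pgx/v5"
	"github.com/riverqueue/river"
)

// shutdown attempts a graceful stop, then escalates to a hard stop if the
// graceful attempt doesn't finish within the timeout.
func shutdown(client *river.Client[pgx.Tx]) error {
	softCtx, softCancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer softCancel()

	// Stop: no new jobs are fetched; in-flight jobs are given time to finish.
	if err := client.Stop(softCtx); err == nil {
		return nil
	}

	hardCtx, hardCancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer hardCancel()

	// StopAndCancel: also cancels the work context for jobs still running.
	return client.StopAndCancel(hardCtx)
}
```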
@@ -762,7 +786,41 @@ func (c *Client[TTx]) Stopped() <-chan struct{} {
// versions. If new event kinds are added, callers will have to explicitly add
// them to their requested list and ensure they can be handled correctly.
func (c *Client[TTx]) Subscribe(kinds ...EventKind) (<-chan *Event, func()) {
for _, kind := range kinds {
return c.SubscribeConfig(&SubscribeConfig{Kinds: kinds})
}

// The default maximum size of the subscribe channel. Events that would overflow
// it will be dropped.
const subscribeChanSizeDefault = 1_000

// SubscribeConfig is a more thorough subscription configuration used for
// Client.SubscribeConfig.
type SubscribeConfig struct {
// ChanSize is the size of the buffered channel that will be created for the
// subscription. Incoming events that overflow this number because a listener
// isn't reading from the channel in a timely manner will be dropped.
//
// Defaults to 1000.
ChanSize int

// Kinds are the kinds of events that the subscription will receive.
// Requiring that kinds are specified explicitly allows for forward
// compatibility in case new kinds of events are added in future versions.
// If new event kinds are added, callers will have to explicitly add them to
// their requested list and ensure they can be handled correctly.
Kinds []EventKind
}

// SubscribeConfig is like Subscribe except that it accepts a SubscribeConfig
// struct, letting a subscriber further customize details like the maximum
// size of the subscription channel.
func (c *Client[TTx]) SubscribeConfig(config *SubscribeConfig) (<-chan *Event, func()) {
if config.ChanSize < 0 {
panic("SubscribeConfig.ChanSize must be greater or equal to 1")
}
Comment on lines +816 to +818

Contributor:

I guess we don't care about a max? If you want to be dumb, go right ahead...?

Contributor Author:

Yeah, not sure it makes sense to try and enforce a cap. Any Go user can try to make(chan struct{}, 1_000_000_000_000) anytime they want.

if config.ChanSize == 0 {
config.ChanSize = subscribeChanSizeDefault
}

for _, kind := range config.Kinds {
if _, ok := allKinds[kind]; !ok {
panic(fmt.Errorf("unknown event kind: %s", kind))
}
@@ -771,15 +829,15 @@ func (c *Client[TTx]) Subscribe(kinds ...EventKind) (<-chan *Event, func()) {
c.subscriptionsMu.Lock()
defer c.subscriptionsMu.Unlock()

subChan := make(chan *Event, subscribeChanSize)
subChan := make(chan *Event, config.ChanSize)

// Just gives us an easy way of removing the subscription again later.
subID := c.subscriptionsSeq
c.subscriptionsSeq++

c.subscriptions[subID] = &eventSubscription{
Chan: subChan,
Kinds: sliceutil.KeyBy(kinds, func(k EventKind) (EventKind, struct{}) { return k, struct{}{} }),
Kinds: sliceutil.KeyBy(config.Kinds, func(k EventKind) (EventKind, struct{}) { return k, struct{}{} }),
}

cancel := func() {
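
To round out the new subscribe API, here is a usage sketch of `Client.SubscribeConfig` requesting a larger buffer than the 1,000-event default. The `ChanSize` and `Kinds` fields come from the diff above; the particular event kinds chosen and the consuming loop are illustrative.

```go
package main

import (
	"log"

	"github.com/jackc/pgx/v5"
	"github.com/riverqueue/river"
)

// consumeEvents subscribes with a larger buffer than the default so a slow
// consumer is less likely to have events dropped under heavy load.
func consumeEvents(client *river.Client[pgx.Tx]) {
	events, cancel := client.SubscribeConfig(&river.SubscribeConfig{
		ChanSize: 10_000,
		Kinds:    []river.EventKind{river.EventKindJobCompleted, river.EventKindJobFailed},
	})
	defer cancel()

	for event := range events {
		log.Printf("job %d finished with state %s", event.Job.ID, event.Job.State)
	}
}
```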