extract Client subscriptions into service (#379)
* extract Client subscriptions into service

This change extracts the Client's subscription logic into a separate
`startstop.Service` which can be started and stopped along with the
other services. The key change that enables this is switching job
events from a _callback_ to a _channel_. The channel is passed to the
completer during init, and the completer then owns it as the sole
sender. When the completer is stopped, it must close the channel to
indicate that there are no more job completion events to be processed.
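
As a rough sketch of that ownership model (the types and names below are
simplified stand-ins rather than the real internals), the completer is the
only sender on the channel and closes it when it stops, while the consumer
simply ranges until the close:

```go
package main

import "fmt"

// Sketch only: a simplified stand-in for the real completer/job update types.
type jobUpdate struct{ jobID int64 }

// The completer owns the channel as its sole sender; stopping it closes the
// channel to signal that no more completion events will arrive.
type completer struct {
	subscribeCh chan []jobUpdate
}

func (c *completer) complete(id int64) {
	c.subscribeCh <- []jobUpdate{{jobID: id}}
}

func (c *completer) stop() {
	close(c.subscribeCh)
}

func main() {
	ch := make(chan []jobUpdate, 10)
	comp := &completer{subscribeCh: ch}

	done := make(chan struct{})
	go func() {
		defer close(done)
		// The consumer ranges until the completer closes the channel.
		for batch := range ch {
			fmt.Printf("received %d update(s)\n", len(batch))
		}
	}()

	comp.complete(1)
	comp.stop()
	<-done
}
```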

This moves us closer to managing all the key client services as a
single pool whose shutdown can be initiated in parallel. Importantly,
it paves the way for additional services to be added (even by external
libraries) without needing to deal with more complex startup & shutdown
ordering scenarios.

In order to make this work with a client that can be started and stopped
repeatedly, a new `ResetSubscribeChan` method was added to the
`JobCompleter` interface to be called at the beginning of each
`Client.Start()` call.
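
A minimal sketch of that wiring, assuming `ResetSubscribeChan` simply swaps
in a fresh channel on both sides (only `ResetSubscribeChan` and
`CompleterJobUpdated` come from this commit; the surrounding types are
illustrative stubs):

```go
package main

// Sketch only: the types here are simplified stand-ins, and
// ResetSubscribeChan is the only method shown from the interface.
type CompleterJobUpdated struct{}

type JobCompleter interface {
	ResetSubscribeChan(subscribeCh chan []CompleterJobUpdated)
}

type stubCompleter struct{ subscribeCh chan []CompleterJobUpdated }

func (s *stubCompleter) ResetSubscribeChan(ch chan []CompleterJobUpdated) { s.subscribeCh = ch }

type subscriptionManager struct{ subscribeCh chan []CompleterJobUpdated }

func (m *subscriptionManager) ResetSubscribeChan(ch chan []CompleterJobUpdated) { m.subscribeCh = ch }

type client struct {
	completer           JobCompleter
	subscriptionManager *subscriptionManager
}

// Each Start hands out a fresh channel because the previous one was closed
// when the completer shut down.
func (c *client) start() {
	subscribeCh := make(chan []CompleterJobUpdated, 10)
	c.completer.ResetSubscribeChan(subscribeCh)
	c.subscriptionManager.ResetSubscribeChan(subscribeCh)
}

func main() {
	c := &client{completer: &stubCompleter{}, subscriptionManager: &subscriptionManager{}}
	c.start() // safe to call repeatedly across start/stop cycles
	c.start()
}
```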

* normalize subscription manager service

The subscription manager now respects a stop signal, but when one
arrives it makes sure to drain the subscription channel before exiting,
which means it still correctly clears all events on a client shutdown.
This also gives us a way to use the stress test: all we need to do is
close the channel in advance before calling startstoptest.Stress (the
service is still a little unusual compared to the others because it
requires that channel close, but less so than before).
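
Reusing the simplified types from the sketches above, the drain-on-stop
behavior might look roughly like this (the select shape, the `run`/`stop`
names, and the `distribute` helper are assumptions for illustration, not
taken from the actual implementation):

```go
// distribute is a hypothetical helper standing in for fanning an update
// batch out to subscriber channels.
func (m *subscriptionManager) distribute(batch []CompleterJobUpdated) { _ = batch }

// Sketch only: on stop, keep draining until the completer closes the channel
// so that no completion events are dropped during a client shutdown.
func (m *subscriptionManager) run(stop <-chan struct{}, updates <-chan []CompleterJobUpdated) {
	for {
		select {
		case <-stop:
			// Stop requested: clear out any remaining events. This relies on
			// the completer closing the channel when it shuts down.
			for batch := range updates {
				m.distribute(batch)
			}
			return
		case batch, ok := <-updates:
			if !ok {
				// Channel closed by the completer; nothing left to process.
				return
			}
			m.distribute(batch)
		}
	}
}
```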

Also normalizes things a bit by removing the custom Stop implementation,
which most services shouldn't need.

Co-Authored-By: Brandur Leach <brandur@brandur.org>

---------

Co-authored-by: Brandur Leach <brandur@brandur.org>
bgentry and brandur authored Jun 10, 2024
1 parent e6dc4a0 commit 0e57338
Showing 9 changed files with 688 additions and 400 deletions.
184 changes: 15 additions & 169 deletions client.go
@@ -9,15 +9,13 @@ import (
"os"
"regexp"
"strings"
"sync"
"time"

"github.com/riverqueue/river/internal/baseservice"
"github.com/riverqueue/river/internal/componentstatus"
"github.com/riverqueue/river/internal/dblist"
"github.com/riverqueue/river/internal/dbunique"
"github.com/riverqueue/river/internal/jobcompleter"
"github.com/riverqueue/river/internal/jobstats"
"github.com/riverqueue/river/internal/leadership"
"github.com/riverqueue/river/internal/maintenance"
"github.com/riverqueue/river/internal/maintenance/startstop"
@@ -304,6 +302,7 @@ type Client[TTx any] struct {
baseStartStop startstop.BaseStartStop

completer jobcompleter.JobCompleter
completerSubscribeCh chan []jobcompleter.CompleterJobUpdated
config *Config
driver riverdriver.Driver[TTx]
elector *leadership.Elector
@@ -314,12 +313,7 @@ type Client[TTx any] struct {
producersByQueueName map[string]*producer
queueMaintainer *maintenance.QueueMaintainer
services []startstop.Service
subscriptions map[int]*eventSubscription
subscriptionsMu sync.Mutex
subscriptionsSeq int // used for generating simple IDs
statsAggregate jobstats.JobStatistics
statsMu sync.Mutex
statsNumJobs int
subscriptionManager *subscriptionManager
stopped chan struct{}
testSignals clientTestSignals
uniqueInserter *dbunique.UniqueInserter
@@ -471,7 +465,6 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
driver: driver,
monitor: newClientMonitor(),
producersByQueueName: make(map[string]*producer),
subscriptions: make(map[int]*eventSubscription),
testSignals: clientTestSignals{},
uniqueInserter: baseservice.Init(archetype, &dbunique.UniqueInserter{
AdvisoryLockPrefix: config.AdvisoryLockPrefix,
@@ -490,8 +483,9 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
return nil, errMissingDatabasePoolWithQueues
}

client.completer = jobcompleter.NewBatchCompleter(archetype, driver.GetExecutor())
client.services = append(client.services, client.completer)
client.completer = jobcompleter.NewBatchCompleter(archetype, driver.GetExecutor(), nil)
client.subscriptionManager = newSubscriptionManager(archetype, nil)
client.services = append(client.services, client.completer, client.subscriptionManager)

// In poll only mode, we don't try to initialize a notifier that uses
// listen/notify. Instead, each service polls for changes it's
@@ -517,7 +511,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
MaxWorkers: queueConfig.MaxWorkers,
Notifier: client.notifier,
Queue: queue,
QueueEventCallback: client.distributeQueueEvent,
QueueEventCallback: client.subscriptionManager.distributeQueueEvent,
RetryPolicy: config.RetryPolicy,
SchedulerInterval: config.schedulerInterval,
StatusFunc: client.monitor.SetProducerStatus,
Expand Down Expand Up @@ -666,6 +660,13 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
return fmt.Errorf("error making initial connection to database: %w", err)
}

// Each time we start, we need a fresh completer subscribe channel to
// send job completion events on, because the completer will close it
// each time it shuts down.
c.completerSubscribeCh = make(chan []jobcompleter.CompleterJobUpdated, 10)
c.completer.ResetSubscribeChan(c.completerSubscribeCh)
c.subscriptionManager.ResetSubscribeChan(c.completerSubscribeCh)

// In case of error, stop any services that might have started. This
// is safe because even services that were never started will still
// tolerate being stopped.
@@ -695,10 +696,6 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
return err
}

// Receives job complete notifications from the completer and
// distributes them to any subscriptions.
c.completer.Subscribe(c.distributeJobCompleterCallback)

// We use separate contexts for fetching and working to allow for a graceful
// stop. Both inherit from the provided context, so if it's cancelled, a
// more aggressive stop will be initiated.
@@ -758,17 +755,6 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
c.queueMaintainer,
))

// Remove all subscriptions and close corresponding channels.
func() {
c.subscriptionsMu.Lock()
defer c.subscriptionsMu.Unlock()

for subID, sub := range c.subscriptions {
close(sub.Chan)
delete(c.subscriptions, subID)
}
}()

// Shut down the monitor last so it can broadcast final status updates:
c.monitor.Stop()
}()
@@ -881,154 +867,14 @@ type SubscribeConfig struct {

// Special internal variant that lets us inject an overridden size.
func (c *Client[TTx]) SubscribeConfig(config *SubscribeConfig) (<-chan *Event, func()) {
if config.ChanSize < 0 {
panic("SubscribeConfig.ChanSize must be greater or equal to 1")
}
if config.ChanSize == 0 {
config.ChanSize = subscribeChanSizeDefault
}

for _, kind := range config.Kinds {
if _, ok := allKinds[kind]; !ok {
panic(fmt.Errorf("unknown event kind: %s", kind))
}
}

c.subscriptionsMu.Lock()
defer c.subscriptionsMu.Unlock()

subChan := make(chan *Event, config.ChanSize)

// Just gives us an easy way of removing the subscription again later.
subID := c.subscriptionsSeq
c.subscriptionsSeq++

c.subscriptions[subID] = &eventSubscription{
Chan: subChan,
Kinds: sliceutil.KeyBy(config.Kinds, func(k EventKind) (EventKind, struct{}) { return k, struct{}{} }),
}

cancel := func() {
c.subscriptionsMu.Lock()
defer c.subscriptionsMu.Unlock()

// May no longer be present in case this was called after a stop.
sub, ok := c.subscriptions[subID]
if !ok {
return
}

close(sub.Chan)

delete(c.subscriptions, subID)
}

return subChan, cancel
}

// Distribute a single event into any listening subscriber channels.
//
// Job events should specify the job and stats, while queue events should only specify
// the queue.
func (c *Client[TTx]) distributeJobEvent(job *rivertype.JobRow, stats *JobStatistics) {
c.subscriptionsMu.Lock()
defer c.subscriptionsMu.Unlock()

// Quick path so we don't need to allocate anything if no one is listening.
if len(c.subscriptions) < 1 {
return
}

var event *Event
switch job.State {
case rivertype.JobStateCancelled:
event = &Event{Kind: EventKindJobCancelled, Job: job, JobStats: stats}
case rivertype.JobStateCompleted:
event = &Event{Kind: EventKindJobCompleted, Job: job, JobStats: stats}
case rivertype.JobStateScheduled:
event = &Event{Kind: EventKindJobSnoozed, Job: job, JobStats: stats}
case rivertype.JobStateAvailable, rivertype.JobStateDiscarded, rivertype.JobStateRetryable, rivertype.JobStateRunning:
event = &Event{Kind: EventKindJobFailed, Job: job, JobStats: stats}
case rivertype.JobStatePending:
panic("completion subscriber unexpectedly received job in pending state, river bug")
default:
// linter exhaustive rule prevents this from being reached
panic("unreachable state to distribute, river bug")
}

// All subscription channels are non-blocking so this is always fast and
// there's no risk of falling behind what producers are sending.
for _, sub := range c.subscriptions {
if sub.ListensFor(event.Kind) {
select {
case sub.Chan <- event:
default:
}
}
}
}

func (c *Client[TTx]) distributeQueueEvent(event *Event) {
c.subscriptionsMu.Lock()
defer c.subscriptionsMu.Unlock()

// All subscription channels are non-blocking so this is always fast and
// there's no risk of falling behind what producers are sending.
for _, sub := range c.subscriptions {
if sub.ListensFor(event.Kind) {
select {
case sub.Chan <- event:
default:
}
}
}
}

// Callback invoked by the completer and which prompts the client to update
// statistics and distribute jobs into any listening subscriber channels.
// (Subscriber channels are non-blocking so this should be quite fast.)
func (c *Client[TTx]) distributeJobCompleterCallback(update jobcompleter.CompleterJobUpdated) {
func() {
c.statsMu.Lock()
defer c.statsMu.Unlock()

stats := update.JobStats
c.statsAggregate.CompleteDuration += stats.CompleteDuration
c.statsAggregate.QueueWaitDuration += stats.QueueWaitDuration
c.statsAggregate.RunDuration += stats.RunDuration
c.statsNumJobs++
}()

c.distributeJobEvent(update.Job, jobStatisticsFromInternal(update.JobStats))
return c.subscriptionManager.SubscribeConfig(config)
}

// Dump aggregate stats from job completions to logs periodically. These
// numbers don't mean much in themselves, but can give a rough idea of the
// proportions of each compared to each other, and may help flag outlying values
// indicative of a problem.
func (c *Client[TTx]) logStatsLoop(ctx context.Context, shouldStart bool, stopped chan struct{}) error {
// Handles a potential divide by zero.
safeDurationAverage := func(d time.Duration, n int) time.Duration {
if n == 0 {
return 0
}
return d / time.Duration(n)
}

logStats := func() {
c.statsMu.Lock()
defer c.statsMu.Unlock()

c.baseService.Logger.InfoContext(ctx, c.baseService.Name+": Job stats (since last stats line)",
"num_jobs_run", c.statsNumJobs,
"average_complete_duration", safeDurationAverage(c.statsAggregate.CompleteDuration, c.statsNumJobs),
"average_queue_wait_duration", safeDurationAverage(c.statsAggregate.QueueWaitDuration, c.statsNumJobs),
"average_run_duration", safeDurationAverage(c.statsAggregate.RunDuration, c.statsNumJobs))

c.statsAggregate = jobstats.JobStatistics{}
c.statsNumJobs = 0
}

if !shouldStart {
return nil
}
@@ -1047,7 +893,7 @@ func (c *Client[TTx]) logStatsLoop(ctx context.Context, shouldStart bool, stoppe
return

case <-ticker.C:
logStats()
c.subscriptionManager.logStats(ctx, c.baseService.Name)
}
}
}()
2 changes: 1 addition & 1 deletion client_test.go
@@ -2957,7 +2957,7 @@ func Test_Client_Subscribe(t *testing.T) {
// Drops through immediately because the channel is closed.
riverinternaltest.WaitOrTimeout(t, subscribeChan)

require.Empty(t, client.subscriptions)
require.Empty(t, client.subscriptionManager.subscriptions)
})
}
