From 9e5cac714041f46fb4f66bcc2da9db52aa37515f Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Sat, 16 Nov 2024 14:37:13 -0600 Subject: [PATCH] unified insert path for periodic jobs (#679) Job insert middleware were not being utilized for periodic jobs, resulting in behavioral differences (especially Pro features). This insertion path has been refactored to rely on the unified insertion path from the client. Fixes #675. --- CHANGELOG.md | 1 + client.go | 41 +++++--- client_test.go | 33 +++++++ internal/maintenance/job_scheduler.go | 3 + internal/maintenance/periodic_job_enqueuer.go | 42 +++----- .../maintenance/periodic_job_enqueuer_test.go | 96 ++++--------------- 6 files changed, 97 insertions(+), 119 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 308cfad9..fee22369 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed - Cancellation of running jobs relied on a channel that was only being received when in the job fetch routine, meaning that jobs which were cancelled would not be cancelled until the next scheduled fetch. This was fixed by also receiving from the job cancellation channel when in the main producer loop, even if no fetches are happening. [PR #678](https://github.com/riverqueue/river/pull/678). +- Job insert middleware were not being utilized for periodic jobs. This insertion path has been refactored to rely on the unified insertion path from the client. Fixes #675. [PR #679](https://github.com/riverqueue/river/pull/679). ## [0.14.1] - 2024-11-04 diff --git a/client.go b/client.go index e3df81f4..0032c683 100644 --- a/client.go +++ b/client.go @@ -611,7 +611,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client { periodicJobEnqueuer := maintenance.NewPeriodicJobEnqueuer(archetype, &maintenance.PeriodicJobEnqueuerConfig{ AdvisoryLockPrefix: config.AdvisoryLockPrefix, - NotifyInsert: client.maybeNotifyInsertForQueues, + Insert: client.insertMany, }, driver.GetExecutor()) maintenanceServices = append(maintenanceServices, periodicJobEnqueuer) client.testSignals.periodicJobEnqueuer = &periodicJobEnqueuer.TestSignals @@ -1335,7 +1335,7 @@ func (c *Client[TTx]) InsertTx(ctx context.Context, tx TTx, args JobArgs, opts * func (c *Client[TTx]) insert(ctx context.Context, tx riverdriver.ExecutorTx, args JobArgs, opts *InsertOpts) (*rivertype.JobInsertResult, error) { params := []InsertManyParams{{Args: args, InsertOpts: opts}} - results, err := c.insertMany(ctx, tx, params) + results, err := c.validateParamsAndInsertMany(ctx, tx, params) if err != nil { return nil, err } @@ -1386,7 +1386,7 @@ func (c *Client[TTx]) InsertMany(ctx context.Context, params []InsertManyParams) } defer tx.Rollback(ctx) - inserted, err := c.insertMany(ctx, tx, params) + inserted, err := c.validateParamsAndInsertMany(ctx, tx, params) if err != nil { return nil, err } @@ -1421,11 +1421,26 @@ func (c *Client[TTx]) InsertMany(ctx context.Context, params []InsertManyParams) // commits, and if the transaction rolls back, so too is the inserted job. func (c *Client[TTx]) InsertManyTx(ctx context.Context, tx TTx, params []InsertManyParams) ([]*rivertype.JobInsertResult, error) { exec := c.driver.UnwrapExecutor(tx) - return c.insertMany(ctx, exec, params) + return c.validateParamsAndInsertMany(ctx, exec, params) } -func (c *Client[TTx]) insertMany(ctx context.Context, tx riverdriver.ExecutorTx, params []InsertManyParams) ([]*rivertype.JobInsertResult, error) { - return c.insertManyShared(ctx, tx, params, func(ctx context.Context, insertParams []*riverdriver.JobInsertFastParams) ([]*rivertype.JobInsertResult, error) { +// validateParamsAndInsertMany is a helper method that wraps the insertMany +// method to provide param validation and conversion prior to calling the actual +// insertMany method. This allows insertMany to be reused by the +// PeriodicJobEnqueuer which cannot reference top-level river package types. +func (c *Client[TTx]) validateParamsAndInsertMany(ctx context.Context, tx riverdriver.ExecutorTx, params []InsertManyParams) ([]*rivertype.JobInsertResult, error) { + insertParams, err := c.insertManyParams(params) + if err != nil { + return nil, err + } + + return c.insertMany(ctx, tx, insertParams) +} + +// insertMany is a shared code path for InsertMany and InsertManyTx, also used +// by the PeriodicJobEnqueuer. +func (c *Client[TTx]) insertMany(ctx context.Context, tx riverdriver.ExecutorTx, insertParams []*rivertype.JobInsertParams) ([]*rivertype.JobInsertResult, error) { + return c.insertManyShared(ctx, tx, insertParams, func(ctx context.Context, insertParams []*riverdriver.JobInsertFastParams) ([]*rivertype.JobInsertResult, error) { results, err := c.pilot.JobInsertMany(ctx, tx, insertParams) if err != nil { return nil, err @@ -1446,14 +1461,9 @@ func (c *Client[TTx]) insertMany(ctx context.Context, tx riverdriver.ExecutorTx, func (c *Client[TTx]) insertManyShared( ctx context.Context, tx riverdriver.ExecutorTx, - rawParams []InsertManyParams, + insertParams []*rivertype.JobInsertParams, execute func(context.Context, []*riverdriver.JobInsertFastParams) ([]*rivertype.JobInsertResult, error), ) ([]*rivertype.JobInsertResult, error) { - insertParams, err := c.insertManyParams(rawParams) - if err != nil { - return nil, err - } - doInner := func(ctx context.Context) ([]*rivertype.JobInsertResult, error) { finalInsertParams := sliceutil.Map(insertParams, func(params *rivertype.JobInsertParams) *riverdriver.JobInsertFastParams { return (*riverdriver.JobInsertFastParams)(params) @@ -1584,7 +1594,12 @@ func (c *Client[TTx]) InsertManyFastTx(ctx context.Context, tx TTx, params []Ins } func (c *Client[TTx]) insertManyFast(ctx context.Context, tx riverdriver.ExecutorTx, params []InsertManyParams) (int, error) { - results, err := c.insertManyShared(ctx, tx, params, func(ctx context.Context, insertParams []*riverdriver.JobInsertFastParams) ([]*rivertype.JobInsertResult, error) { + insertParams, err := c.insertManyParams(params) + if err != nil { + return 0, err + } + + results, err := c.insertManyShared(ctx, tx, insertParams, func(ctx context.Context, insertParams []*riverdriver.JobInsertFastParams) ([]*rivertype.JobInsertResult, error) { count, err := tx.JobInsertFastManyNoReturning(ctx, insertParams) if err != nil { return nil, err diff --git a/client_test.go b/client_test.go index 8b47c4f2..49451d4a 100644 --- a/client_test.go +++ b/client_test.go @@ -3559,6 +3559,39 @@ func Test_Client_Maintenance(t *testing.T) { } }) + t.Run("PeriodicJobEnqueuerUsesMiddleware", func(t *testing.T) { + t.Parallel() + + config := newTestConfig(t, nil) + + worker := &periodicJobWorker{} + AddWorker(config.Workers, worker) + config.PeriodicJobs = []*PeriodicJob{ + NewPeriodicJob(cron.Every(time.Minute), func() (JobArgs, *InsertOpts) { + return periodicJobArgs{}, nil + }, &PeriodicJobOpts{RunOnStart: true}), + } + config.JobInsertMiddleware = []rivertype.JobInsertMiddleware{&overridableJobMiddleware{ + insertManyFunc: func(ctx context.Context, manyParams []*rivertype.JobInsertParams, doInner func(ctx context.Context) ([]*rivertype.JobInsertResult, error)) ([]*rivertype.JobInsertResult, error) { + for _, job := range manyParams { + job.EncodedArgs = []byte(`{"from": "middleware"}`) + } + return doInner(ctx) + }, + }} + + client, bundle := setup(t, config) + + startAndWaitForQueueMaintainer(ctx, t, client) + + svc := maintenance.GetService[*maintenance.PeriodicJobEnqueuer](client.queueMaintainer) + svc.TestSignals.InsertedJobs.WaitOrTimeout() + + jobs, err := bundle.exec.JobGetByKindMany(ctx, []string{(periodicJobArgs{}).Kind()}) + require.NoError(t, err) + require.Len(t, jobs, 1, "Expected to find exactly one job of kind: "+(periodicJobArgs{}).Kind()) + }) + t.Run("QueueCleaner", func(t *testing.T) { t.Parallel() diff --git a/internal/maintenance/job_scheduler.go b/internal/maintenance/job_scheduler.go index b09bc464..2f9a1b0c 100644 --- a/internal/maintenance/job_scheduler.go +++ b/internal/maintenance/job_scheduler.go @@ -15,6 +15,7 @@ import ( "github.com/riverqueue/river/rivershared/util/serviceutil" "github.com/riverqueue/river/rivershared/util/timeutil" "github.com/riverqueue/river/rivershared/util/valutil" + "github.com/riverqueue/river/rivertype" ) const ( @@ -33,6 +34,8 @@ func (ts *JobSchedulerTestSignals) Init() { ts.ScheduledBatch.Init() } +type InsertFunc func(ctx context.Context, tx riverdriver.ExecutorTx, insertParams []*rivertype.JobInsertParams) ([]*rivertype.JobInsertResult, error) + // NotifyInsert is a function to call to emit notifications for queues where // jobs were scheduled. type NotifyInsertFunc func(ctx context.Context, tx riverdriver.ExecutorTx, queues []string) error diff --git a/internal/maintenance/periodic_job_enqueuer.go b/internal/maintenance/periodic_job_enqueuer.go index 6ccf11c6..68b4a987 100644 --- a/internal/maintenance/periodic_job_enqueuer.go +++ b/internal/maintenance/periodic_job_enqueuer.go @@ -21,16 +21,14 @@ var ErrNoJobToInsert = errors.New("a nil job was returned, nothing to insert") // Test-only properties. type PeriodicJobEnqueuerTestSignals struct { - EnteredLoop testsignal.TestSignal[struct{}] // notifies when the enqueuer finishes start up and enters its initial run loop - InsertedJobs testsignal.TestSignal[struct{}] // notifies when a batch of jobs is inserted - NotifiedQueues testsignal.TestSignal[[]string] // notifies when queues are sent an insert notification - SkippedJob testsignal.TestSignal[struct{}] // notifies when a job is skipped because of nil JobInsertParams + EnteredLoop testsignal.TestSignal[struct{}] // notifies when the enqueuer finishes start up and enters its initial run loop + InsertedJobs testsignal.TestSignal[struct{}] // notifies when a batch of jobs is inserted + SkippedJob testsignal.TestSignal[struct{}] // notifies when a job is skipped because of nil JobInsertParams } func (ts *PeriodicJobEnqueuerTestSignals) Init() { ts.EnteredLoop.Init() ts.InsertedJobs.Init() - ts.NotifiedQueues.Init() ts.SkippedJob.Init() } @@ -59,9 +57,8 @@ func (j *PeriodicJob) mustValidate() *PeriodicJob { type PeriodicJobEnqueuerConfig struct { AdvisoryLockPrefix int32 - // NotifyInsert is a function to call to emit notifications for queues - // where jobs were scheduled. - NotifyInsert NotifyInsertFunc + // Insert is the function to call to insert jobs into the database. + Insert InsertFunc // PeriodicJobs are the periodic jobs with which to configure the enqueuer. PeriodicJobs []*PeriodicJob @@ -104,7 +101,7 @@ func NewPeriodicJobEnqueuer(archetype *baseservice.Archetype, config *PeriodicJo svc := baseservice.Init(archetype, &PeriodicJobEnqueuer{ Config: (&PeriodicJobEnqueuerConfig{ AdvisoryLockPrefix: config.AdvisoryLockPrefix, - NotifyInsert: config.NotifyInsert, + Insert: config.Insert, PeriodicJobs: config.PeriodicJobs, }).mustValidate(), @@ -223,7 +220,7 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error { defer s.mu.RUnlock() var ( - insertParamsMany []*riverdriver.JobInsertFastParams + insertParamsMany []*rivertype.JobInsertParams now = s.Time.NowUTC() ) @@ -269,7 +266,7 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error { for { select { case <-timerUntilNextRun.C: - var insertParamsMany []*riverdriver.JobInsertFastParams + var insertParamsMany []*rivertype.JobInsertParams now := s.Time.NowUTC() @@ -329,7 +326,7 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error { return nil } -func (s *PeriodicJobEnqueuer) insertBatch(ctx context.Context, insertParamsMany []*riverdriver.JobInsertFastParams) { +func (s *PeriodicJobEnqueuer) insertBatch(ctx context.Context, insertParamsMany []*rivertype.JobInsertParams) { if len(insertParamsMany) == 0 { return } @@ -341,28 +338,13 @@ func (s *PeriodicJobEnqueuer) insertBatch(ctx context.Context, insertParamsMany } defer tx.Rollback(ctx) - queues := make([]string, 0, len(insertParamsMany)) - if len(insertParamsMany) > 0 { - results, err := tx.JobInsertFastMany(ctx, insertParamsMany) + _, err := s.Config.Insert(ctx, tx, insertParamsMany) if err != nil { s.Logger.ErrorContext(ctx, s.Name+": Error inserting periodic jobs", "error", err.Error(), "num_jobs", len(insertParamsMany)) return } - for _, result := range results { - if !result.UniqueSkippedAsDuplicate { - queues = append(queues, result.Job.Queue) - } - } - } - - if len(queues) > 0 { - if err := s.Config.NotifyInsert(ctx, tx, queues); err != nil { - s.Logger.ErrorContext(ctx, s.Name+": Error notifying insert", "error", err.Error()) - return - } - s.TestSignals.NotifiedQueues.Signal(queues) } if err := tx.Commit(ctx); err != nil { @@ -373,7 +355,7 @@ func (s *PeriodicJobEnqueuer) insertBatch(ctx context.Context, insertParamsMany s.TestSignals.InsertedJobs.Signal(struct{}{}) } -func (s *PeriodicJobEnqueuer) insertParamsFromConstructor(ctx context.Context, constructorFunc func() (*rivertype.JobInsertParams, error), scheduledAt time.Time) (*riverdriver.JobInsertFastParams, bool) { +func (s *PeriodicJobEnqueuer) insertParamsFromConstructor(ctx context.Context, constructorFunc func() (*rivertype.JobInsertParams, error), scheduledAt time.Time) (*rivertype.JobInsertParams, bool) { insertParams, err := constructorFunc() if err != nil { if errors.Is(err, ErrNoJobToInsert) { @@ -389,7 +371,7 @@ func (s *PeriodicJobEnqueuer) insertParamsFromConstructor(ctx context.Context, c insertParams.ScheduledAt = &scheduledAt } - return (*riverdriver.JobInsertFastParams)(insertParams), true + return insertParams, true } const periodicJobEnqueuerVeryLongDuration = 24 * time.Hour diff --git a/internal/maintenance/periodic_job_enqueuer_test.go b/internal/maintenance/periodic_job_enqueuer_test.go index 189ff9fc..55fc0ca7 100644 --- a/internal/maintenance/periodic_job_enqueuer_test.go +++ b/internal/maintenance/periodic_job_enqueuer_test.go @@ -2,7 +2,6 @@ package maintenance import ( "context" - "errors" "fmt" "sync" "testing" @@ -19,6 +18,7 @@ import ( "github.com/riverqueue/river/rivershared/startstop" "github.com/riverqueue/river/rivershared/startstoptest" "github.com/riverqueue/river/rivershared/util/randutil" + "github.com/riverqueue/river/rivershared/util/sliceutil" "github.com/riverqueue/river/rivertype" ) @@ -76,6 +76,23 @@ func TestPeriodicJobEnqueuer(t *testing.T) { } } + // A simplified version of `Client.insertMany` that only inserts jobs directly + // via the driver instead of using the pilot. + insertFunc := func(ctx context.Context, tx riverdriver.ExecutorTx, insertParams []*rivertype.JobInsertParams) ([]*rivertype.JobInsertResult, error) { + finalInsertParams := sliceutil.Map(insertParams, func(params *rivertype.JobInsertParams) *riverdriver.JobInsertFastParams { + return (*riverdriver.JobInsertFastParams)(params) + }) + results, err := tx.JobInsertFastMany(ctx, finalInsertParams) + if err != nil { + return nil, err + } + return sliceutil.Map(results, + func(result *riverdriver.JobInsertFastResult) *rivertype.JobInsertResult { + return (*rivertype.JobInsertResult)(result) + }, + ), nil + } + setup := func(t *testing.T) (*PeriodicJobEnqueuer, *testBundle) { t.Helper() @@ -85,16 +102,7 @@ func TestPeriodicJobEnqueuer(t *testing.T) { waitChan: make(chan struct{}), } - svc := NewPeriodicJobEnqueuer( - riversharedtest.BaseServiceArchetype(t), - &PeriodicJobEnqueuerConfig{ - NotifyInsert: func(ctx context.Context, tx riverdriver.ExecutorTx, queues []string) error { - for _, queue := range queues { - bundle.notificationsByQueue[queue]++ - } - return nil - }, - }, bundle.exec) + svc := NewPeriodicJobEnqueuer(riversharedtest.BaseServiceArchetype(t), &PeriodicJobEnqueuerConfig{Insert: insertFunc}, bundle.exec) svc.StaggerStartupDisable(true) svc.TestSignals.Init() @@ -208,8 +216,6 @@ func TestPeriodicJobEnqueuer(t *testing.T) { svc.TestSignals.InsertedJobs.WaitOrTimeout() requireNJobs(t, bundle.exec, "unique_periodic_job_500ms", 1) - // This initial insert should emit a notification: - svc.TestSignals.NotifiedQueues.WaitOrTimeout() // Another insert was attempted, but there's still only one job due to // uniqueness conditions. @@ -218,14 +224,6 @@ func TestPeriodicJobEnqueuer(t *testing.T) { svc.TestSignals.InsertedJobs.WaitOrTimeout() requireNJobs(t, bundle.exec, "unique_periodic_job_500ms", 1) - - // Ensure that no notifications were emitted beyond the first one because no - // additional jobs were inserted: - select { - case queues := <-svc.TestSignals.NotifiedQueues.WaitC(): - t.Fatalf("Expected no notification to be emitted, but got one for queues: %v", queues) - case <-time.After(100 * time.Millisecond): - } }) t.Run("RunOnStart", func(t *testing.T) { @@ -350,7 +348,6 @@ func TestPeriodicJobEnqueuer(t *testing.T) { for i := 0; i < 100; i++ { svc.TestSignals.InsertedJobs.WaitOrTimeout() - svc.TestSignals.NotifiedQueues.WaitOrTimeout() } }) @@ -362,7 +359,7 @@ func TestPeriodicJobEnqueuer(t *testing.T) { svc := NewPeriodicJobEnqueuer( riversharedtest.BaseServiceArchetype(t), &PeriodicJobEnqueuerConfig{ - NotifyInsert: func(ctx context.Context, tx riverdriver.ExecutorTx, queues []string) error { return nil }, + Insert: insertFunc, PeriodicJobs: []*PeriodicJob{ {ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms", false), RunOnStart: true}, {ScheduleFunc: periodicIntervalSchedule(1500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_1500ms", false), RunOnStart: true}, @@ -565,59 +562,6 @@ func TestPeriodicJobEnqueuer(t *testing.T) { riversharedtest.WaitOrTimeout(t, stopped) }) - t.Run("TriggersNotificationsOnEachQueueWithNewlyAvailableJobs", func(t *testing.T) { - t.Parallel() - - svc, _ := setup(t) - - svc.AddMany([]*PeriodicJob{ - {ScheduleFunc: periodicIntervalSchedule(5 * time.Second), ConstructorFunc: jobConstructorWithQueueFunc("periodic_job_5s", false, rivercommon.QueueDefault), RunOnStart: true}, - {ScheduleFunc: periodicIntervalSchedule(5 * time.Second), ConstructorFunc: jobConstructorWithQueueFunc("periodic_job_15m", false, "queue2"), RunOnStart: true}, - {ScheduleFunc: periodicIntervalSchedule(5 * time.Second), ConstructorFunc: jobConstructorWithQueueFunc("unique_periodic_job_5s", true, "unique"), RunOnStart: true}, - }) - - queueCh := make(chan []string, 1) - svc.Config.NotifyInsert = func(ctx context.Context, tx riverdriver.ExecutorTx, queues []string) error { - queueCh <- queues - return nil - } - - startService(t, svc) - svc.TestSignals.EnteredLoop.WaitOrTimeout() - - svc.TestSignals.InsertedJobs.WaitOrTimeout() - queues := svc.TestSignals.NotifiedQueues.WaitOrTimeout() - require.Equal(t, []string{rivercommon.QueueDefault, "queue2", "unique"}, queues) - require.Equal(t, queues, riversharedtest.WaitOrTimeout(t, queueCh)) - }) - - t.Run("RollsBackUponErrorFromNotificationAttempt", func(t *testing.T) { - t.Parallel() - - svc, bundle := setup(t) - - svc.AddMany([]*PeriodicJob{ - {ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("unique_periodic_job_500ms", true)}, - }) - - svc.Config.NotifyInsert = func(ctx context.Context, tx riverdriver.ExecutorTx, queues []string) error { - return errors.New("test error") - } - - startService(t, svc) - svc.TestSignals.EnteredLoop.WaitOrTimeout() - - // Ensure that no jobs were inserted because the notification errored: - select { - case <-svc.TestSignals.InsertedJobs.WaitC(): - t.Fatal("Expected no jobs to be inserted, but one was") - case <-time.After(100 * time.Millisecond): - } - - // Should be no jobs in the DB either: - requireNJobs(t, bundle.exec, "unique_periodic_job_500ms", 0) - }) - t.Run("TimeUntilNextRun", func(t *testing.T) { t.Parallel()