Skip to content

Commit

Permalink
unified insert path for periodic jobs (#679)
Browse files Browse the repository at this point in the history
Job insert middleware were not being utilized for periodic jobs,
resulting in behavioral differences (especially Pro features). This
insertion path has been refactored to rely on the unified insertion path
from the client.

Fixes #675.
  • Loading branch information
bgentry authored Nov 16, 2024
1 parent e9cbdd0 commit 9e5cac7
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 119 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed

- Cancellation of running jobs relied on a channel that was only being received when in the job fetch routine, meaning that jobs which were cancelled would not be cancelled until the next scheduled fetch. This was fixed by also receiving from the job cancellation channel when in the main producer loop, even if no fetches are happening. [PR #678](https://github.com/riverqueue/river/pull/678).
- Job insert middleware were not being utilized for periodic jobs. This insertion path has been refactored to rely on the unified insertion path from the client. Fixes #675. [PR #679](https://github.com/riverqueue/river/pull/679).

## [0.14.1] - 2024-11-04

Expand Down
41 changes: 28 additions & 13 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
{
periodicJobEnqueuer := maintenance.NewPeriodicJobEnqueuer(archetype, &maintenance.PeriodicJobEnqueuerConfig{
AdvisoryLockPrefix: config.AdvisoryLockPrefix,
NotifyInsert: client.maybeNotifyInsertForQueues,
Insert: client.insertMany,
}, driver.GetExecutor())
maintenanceServices = append(maintenanceServices, periodicJobEnqueuer)
client.testSignals.periodicJobEnqueuer = &periodicJobEnqueuer.TestSignals
Expand Down Expand Up @@ -1335,7 +1335,7 @@ func (c *Client[TTx]) InsertTx(ctx context.Context, tx TTx, args JobArgs, opts *

func (c *Client[TTx]) insert(ctx context.Context, tx riverdriver.ExecutorTx, args JobArgs, opts *InsertOpts) (*rivertype.JobInsertResult, error) {
params := []InsertManyParams{{Args: args, InsertOpts: opts}}
results, err := c.insertMany(ctx, tx, params)
results, err := c.validateParamsAndInsertMany(ctx, tx, params)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1386,7 +1386,7 @@ func (c *Client[TTx]) InsertMany(ctx context.Context, params []InsertManyParams)
}
defer tx.Rollback(ctx)

inserted, err := c.insertMany(ctx, tx, params)
inserted, err := c.validateParamsAndInsertMany(ctx, tx, params)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1421,11 +1421,26 @@ func (c *Client[TTx]) InsertMany(ctx context.Context, params []InsertManyParams)
// commits, and if the transaction rolls back, so too is the inserted job.
func (c *Client[TTx]) InsertManyTx(ctx context.Context, tx TTx, params []InsertManyParams) ([]*rivertype.JobInsertResult, error) {
exec := c.driver.UnwrapExecutor(tx)
return c.insertMany(ctx, exec, params)
return c.validateParamsAndInsertMany(ctx, exec, params)
}

func (c *Client[TTx]) insertMany(ctx context.Context, tx riverdriver.ExecutorTx, params []InsertManyParams) ([]*rivertype.JobInsertResult, error) {
return c.insertManyShared(ctx, tx, params, func(ctx context.Context, insertParams []*riverdriver.JobInsertFastParams) ([]*rivertype.JobInsertResult, error) {
// validateParamsAndInsertMany is a helper method that wraps the insertMany
// method to provide param validation and conversion prior to calling the actual
// insertMany method. This allows insertMany to be reused by the
// PeriodicJobEnqueuer which cannot reference top-level river package types.
func (c *Client[TTx]) validateParamsAndInsertMany(ctx context.Context, tx riverdriver.ExecutorTx, params []InsertManyParams) ([]*rivertype.JobInsertResult, error) {
insertParams, err := c.insertManyParams(params)
if err != nil {
return nil, err
}

return c.insertMany(ctx, tx, insertParams)
}

// insertMany is a shared code path for InsertMany and InsertManyTx, also used
// by the PeriodicJobEnqueuer.
func (c *Client[TTx]) insertMany(ctx context.Context, tx riverdriver.ExecutorTx, insertParams []*rivertype.JobInsertParams) ([]*rivertype.JobInsertResult, error) {
return c.insertManyShared(ctx, tx, insertParams, func(ctx context.Context, insertParams []*riverdriver.JobInsertFastParams) ([]*rivertype.JobInsertResult, error) {
results, err := c.pilot.JobInsertMany(ctx, tx, insertParams)
if err != nil {
return nil, err
Expand All @@ -1446,14 +1461,9 @@ func (c *Client[TTx]) insertMany(ctx context.Context, tx riverdriver.ExecutorTx,
func (c *Client[TTx]) insertManyShared(
ctx context.Context,
tx riverdriver.ExecutorTx,
rawParams []InsertManyParams,
insertParams []*rivertype.JobInsertParams,
execute func(context.Context, []*riverdriver.JobInsertFastParams) ([]*rivertype.JobInsertResult, error),
) ([]*rivertype.JobInsertResult, error) {
insertParams, err := c.insertManyParams(rawParams)
if err != nil {
return nil, err
}

doInner := func(ctx context.Context) ([]*rivertype.JobInsertResult, error) {
finalInsertParams := sliceutil.Map(insertParams, func(params *rivertype.JobInsertParams) *riverdriver.JobInsertFastParams {
return (*riverdriver.JobInsertFastParams)(params)
Expand Down Expand Up @@ -1584,7 +1594,12 @@ func (c *Client[TTx]) InsertManyFastTx(ctx context.Context, tx TTx, params []Ins
}

func (c *Client[TTx]) insertManyFast(ctx context.Context, tx riverdriver.ExecutorTx, params []InsertManyParams) (int, error) {
results, err := c.insertManyShared(ctx, tx, params, func(ctx context.Context, insertParams []*riverdriver.JobInsertFastParams) ([]*rivertype.JobInsertResult, error) {
insertParams, err := c.insertManyParams(params)
if err != nil {
return 0, err
}

results, err := c.insertManyShared(ctx, tx, insertParams, func(ctx context.Context, insertParams []*riverdriver.JobInsertFastParams) ([]*rivertype.JobInsertResult, error) {
count, err := tx.JobInsertFastManyNoReturning(ctx, insertParams)
if err != nil {
return nil, err
Expand Down
33 changes: 33 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3559,6 +3559,39 @@ func Test_Client_Maintenance(t *testing.T) {
}
})

t.Run("PeriodicJobEnqueuerUsesMiddleware", func(t *testing.T) {
t.Parallel()

config := newTestConfig(t, nil)

worker := &periodicJobWorker{}
AddWorker(config.Workers, worker)
config.PeriodicJobs = []*PeriodicJob{
NewPeriodicJob(cron.Every(time.Minute), func() (JobArgs, *InsertOpts) {
return periodicJobArgs{}, nil
}, &PeriodicJobOpts{RunOnStart: true}),
}
config.JobInsertMiddleware = []rivertype.JobInsertMiddleware{&overridableJobMiddleware{
insertManyFunc: func(ctx context.Context, manyParams []*rivertype.JobInsertParams, doInner func(ctx context.Context) ([]*rivertype.JobInsertResult, error)) ([]*rivertype.JobInsertResult, error) {
for _, job := range manyParams {
job.EncodedArgs = []byte(`{"from": "middleware"}`)
}
return doInner(ctx)
},
}}

client, bundle := setup(t, config)

startAndWaitForQueueMaintainer(ctx, t, client)

svc := maintenance.GetService[*maintenance.PeriodicJobEnqueuer](client.queueMaintainer)
svc.TestSignals.InsertedJobs.WaitOrTimeout()

jobs, err := bundle.exec.JobGetByKindMany(ctx, []string{(periodicJobArgs{}).Kind()})
require.NoError(t, err)
require.Len(t, jobs, 1, "Expected to find exactly one job of kind: "+(periodicJobArgs{}).Kind())
})

t.Run("QueueCleaner", func(t *testing.T) {
t.Parallel()

Expand Down
3 changes: 3 additions & 0 deletions internal/maintenance/job_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/riverqueue/river/rivershared/util/serviceutil"
"github.com/riverqueue/river/rivershared/util/timeutil"
"github.com/riverqueue/river/rivershared/util/valutil"
"github.com/riverqueue/river/rivertype"
)

const (
Expand All @@ -33,6 +34,8 @@ func (ts *JobSchedulerTestSignals) Init() {
ts.ScheduledBatch.Init()
}

type InsertFunc func(ctx context.Context, tx riverdriver.ExecutorTx, insertParams []*rivertype.JobInsertParams) ([]*rivertype.JobInsertResult, error)

// NotifyInsert is a function to call to emit notifications for queues where
// jobs were scheduled.
type NotifyInsertFunc func(ctx context.Context, tx riverdriver.ExecutorTx, queues []string) error
Expand Down
42 changes: 12 additions & 30 deletions internal/maintenance/periodic_job_enqueuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,14 @@ var ErrNoJobToInsert = errors.New("a nil job was returned, nothing to insert")

// Test-only properties.
type PeriodicJobEnqueuerTestSignals struct {
EnteredLoop testsignal.TestSignal[struct{}] // notifies when the enqueuer finishes start up and enters its initial run loop
InsertedJobs testsignal.TestSignal[struct{}] // notifies when a batch of jobs is inserted
NotifiedQueues testsignal.TestSignal[[]string] // notifies when queues are sent an insert notification
SkippedJob testsignal.TestSignal[struct{}] // notifies when a job is skipped because of nil JobInsertParams
EnteredLoop testsignal.TestSignal[struct{}] // notifies when the enqueuer finishes start up and enters its initial run loop
InsertedJobs testsignal.TestSignal[struct{}] // notifies when a batch of jobs is inserted
SkippedJob testsignal.TestSignal[struct{}] // notifies when a job is skipped because of nil JobInsertParams
}

func (ts *PeriodicJobEnqueuerTestSignals) Init() {
ts.EnteredLoop.Init()
ts.InsertedJobs.Init()
ts.NotifiedQueues.Init()
ts.SkippedJob.Init()
}

Expand Down Expand Up @@ -59,9 +57,8 @@ func (j *PeriodicJob) mustValidate() *PeriodicJob {
type PeriodicJobEnqueuerConfig struct {
AdvisoryLockPrefix int32

// NotifyInsert is a function to call to emit notifications for queues
// where jobs were scheduled.
NotifyInsert NotifyInsertFunc
// Insert is the function to call to insert jobs into the database.
Insert InsertFunc

// PeriodicJobs are the periodic jobs with which to configure the enqueuer.
PeriodicJobs []*PeriodicJob
Expand Down Expand Up @@ -104,7 +101,7 @@ func NewPeriodicJobEnqueuer(archetype *baseservice.Archetype, config *PeriodicJo
svc := baseservice.Init(archetype, &PeriodicJobEnqueuer{
Config: (&PeriodicJobEnqueuerConfig{
AdvisoryLockPrefix: config.AdvisoryLockPrefix,
NotifyInsert: config.NotifyInsert,
Insert: config.Insert,
PeriodicJobs: config.PeriodicJobs,
}).mustValidate(),

Expand Down Expand Up @@ -223,7 +220,7 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error {
defer s.mu.RUnlock()

var (
insertParamsMany []*riverdriver.JobInsertFastParams
insertParamsMany []*rivertype.JobInsertParams
now = s.Time.NowUTC()
)

Expand Down Expand Up @@ -269,7 +266,7 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error {
for {
select {
case <-timerUntilNextRun.C:
var insertParamsMany []*riverdriver.JobInsertFastParams
var insertParamsMany []*rivertype.JobInsertParams

now := s.Time.NowUTC()

Expand Down Expand Up @@ -329,7 +326,7 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error {
return nil
}

func (s *PeriodicJobEnqueuer) insertBatch(ctx context.Context, insertParamsMany []*riverdriver.JobInsertFastParams) {
func (s *PeriodicJobEnqueuer) insertBatch(ctx context.Context, insertParamsMany []*rivertype.JobInsertParams) {
if len(insertParamsMany) == 0 {
return
}
Expand All @@ -341,28 +338,13 @@ func (s *PeriodicJobEnqueuer) insertBatch(ctx context.Context, insertParamsMany
}
defer tx.Rollback(ctx)

queues := make([]string, 0, len(insertParamsMany))

if len(insertParamsMany) > 0 {
results, err := tx.JobInsertFastMany(ctx, insertParamsMany)
_, err := s.Config.Insert(ctx, tx, insertParamsMany)
if err != nil {
s.Logger.ErrorContext(ctx, s.Name+": Error inserting periodic jobs",
"error", err.Error(), "num_jobs", len(insertParamsMany))
return
}
for _, result := range results {
if !result.UniqueSkippedAsDuplicate {
queues = append(queues, result.Job.Queue)
}
}
}

if len(queues) > 0 {
if err := s.Config.NotifyInsert(ctx, tx, queues); err != nil {
s.Logger.ErrorContext(ctx, s.Name+": Error notifying insert", "error", err.Error())
return
}
s.TestSignals.NotifiedQueues.Signal(queues)
}

if err := tx.Commit(ctx); err != nil {
Expand All @@ -373,7 +355,7 @@ func (s *PeriodicJobEnqueuer) insertBatch(ctx context.Context, insertParamsMany
s.TestSignals.InsertedJobs.Signal(struct{}{})
}

func (s *PeriodicJobEnqueuer) insertParamsFromConstructor(ctx context.Context, constructorFunc func() (*rivertype.JobInsertParams, error), scheduledAt time.Time) (*riverdriver.JobInsertFastParams, bool) {
func (s *PeriodicJobEnqueuer) insertParamsFromConstructor(ctx context.Context, constructorFunc func() (*rivertype.JobInsertParams, error), scheduledAt time.Time) (*rivertype.JobInsertParams, bool) {
insertParams, err := constructorFunc()
if err != nil {
if errors.Is(err, ErrNoJobToInsert) {
Expand All @@ -389,7 +371,7 @@ func (s *PeriodicJobEnqueuer) insertParamsFromConstructor(ctx context.Context, c
insertParams.ScheduledAt = &scheduledAt
}

return (*riverdriver.JobInsertFastParams)(insertParams), true
return insertParams, true
}

const periodicJobEnqueuerVeryLongDuration = 24 * time.Hour
Expand Down
Loading

0 comments on commit 9e5cac7

Please sign in to comment.