Skip to content

Commit

Permalink
remove deprecated advisory lock uniqueness, consolidate insert logic
Browse files Browse the repository at this point in the history
This removes the original unique jobs implementation in its entirety. It
was already deprecated in the previous release.

All known use cases are better supported with the new unique jobs
implementation which is also dramatically faster and supports batch
insertion.

As part of this change, single insertions now inherit the behavior of
batch insertions as far as always setting a `scheduled_at` time in the
job args prior to hitting the database. This is due to the difficulty of
trying to pass an array of nullable timestamps for `scheduled_at` to the
database using sqlc. One side effect of this is that some tests needed
to be updated because they run in a transaction, which locks in a
particular `now()` time used in `JobGetAvailble` by default. Jobs
inserted _after_ the start of that transaction would pick up a scheduled
timestamp from Go code that is _later_ than the database transaction's
timestamp, and so those jobs would never run. This was fixed in some
cases by allowing `now` to be overridden by lower level callers of that
query, whereas other tests were updated to simply insert jobs with a
past `scheduled_at` to ensure their visibility.
  • Loading branch information
bgentry committed Sep 26, 2024
1 parent ce3cfbf commit 9184e75
Show file tree
Hide file tree
Showing 21 changed files with 288 additions and 1,110 deletions.
126 changes: 47 additions & 79 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,6 @@ type Client[TTx any] struct {
stopped <-chan struct{}
subscriptionManager *subscriptionManager
testSignals clientTestSignals
uniqueInserter *dbunique.UniqueInserter // deprecated fallback path for unique job insertion

// workCancel cancels the context used for all work goroutines. Normal Stop
// does not cancel that context.
Expand Down Expand Up @@ -495,10 +494,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
driver: driver,
producersByQueueName: make(map[string]*producer),
testSignals: clientTestSignals{},
uniqueInserter: baseservice.Init(archetype, &dbunique.UniqueInserter{
AdvisoryLockPrefix: config.AdvisoryLockPrefix,
}),
workCancel: func(cause error) {}, // replaced on start, but here in case StopAndCancel is called before start up
workCancel: func(cause error) {}, // replaced on start, but here in case StopAndCancel is called before start up
}
client.queues = &QueueBundle{addProducer: client.addProducer}

Expand Down Expand Up @@ -1162,10 +1158,10 @@ func (c *Client[TTx]) ID() string {
return c.config.ID
}

func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, config *Config, args JobArgs, insertOpts *InsertOpts, bulk bool) (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) {
func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, config *Config, args JobArgs, insertOpts *InsertOpts) (*riverdriver.JobInsertFastParams, error) {
encodedArgs, err := json.Marshal(args)
if err != nil {
return nil, nil, fmt.Errorf("error marshaling args to JSON: %w", err)
return nil, fmt.Errorf("error marshaling args to JSON: %w", err)
}

if insertOpts == nil {
Expand All @@ -1187,7 +1183,7 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf
queue := valutil.FirstNonZero(insertOpts.Queue, jobInsertOpts.Queue, rivercommon.QueueDefault)

if err := validateQueueName(queue); err != nil {
return nil, nil, err
return nil, err
}

tags := insertOpts.Tags
Expand All @@ -1199,24 +1195,24 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf
} else {
for _, tag := range tags {
if len(tag) > 255 {
return nil, nil, errors.New("tags should be a maximum of 255 characters long")
return nil, errors.New("tags should be a maximum of 255 characters long")
}
if !tagRE.MatchString(tag) {
return nil, nil, errors.New("tags should match regex " + tagRE.String())
return nil, errors.New("tags should match regex " + tagRE.String())
}
}
}

if priority > 4 {
return nil, nil, errors.New("priority must be between 1 and 4")
return nil, errors.New("priority must be between 1 and 4")
}

uniqueOpts := insertOpts.UniqueOpts
if uniqueOpts.isEmpty() {
uniqueOpts = jobInsertOpts.UniqueOpts
}
if err := uniqueOpts.validate(); err != nil {
return nil, nil, err
return nil, err
}

metadata := insertOpts.Metadata
Expand All @@ -1236,21 +1232,13 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf
State: rivertype.JobStateAvailable,
Tags: tags,
}
var returnUniqueOpts *dbunique.UniqueOpts
if !uniqueOpts.isEmpty() {
if uniqueOpts.isV1() {
if bulk {
return nil, nil, errors.New("bulk inserts do not support advisory lock uniqueness and cannot remove required states")
}
returnUniqueOpts = (*dbunique.UniqueOpts)(&uniqueOpts)
} else {
internalUniqueOpts := (*dbunique.UniqueOpts)(&uniqueOpts)
insertParams.UniqueKey, err = dbunique.UniqueKey(archetype.Time, internalUniqueOpts, insertParams)
if err != nil {
return nil, nil, err
}
insertParams.UniqueStates = internalUniqueOpts.StateBitmask()
internalUniqueOpts := (*dbunique.UniqueOpts)(&uniqueOpts)
insertParams.UniqueKey, err = dbunique.UniqueKey(archetype.Time, internalUniqueOpts, insertParams)
if err != nil {
return nil, err
}
insertParams.UniqueStates = internalUniqueOpts.StateBitmask()
}

switch {
Expand All @@ -1270,7 +1258,7 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf
insertParams.State = rivertype.JobStatePending
}

return insertParams, returnUniqueOpts, nil
return insertParams, nil
}

var errNoDriverDBPool = errors.New("driver must have non-nil database pool to use non-transactional methods like Insert and InsertMany (try InsertTx or InsertManyTx instead")
Expand All @@ -1290,7 +1278,21 @@ func (c *Client[TTx]) Insert(ctx context.Context, args JobArgs, opts *InsertOpts
return nil, errNoDriverDBPool
}

return c.insert(ctx, c.driver.GetExecutor(), args, opts, false)
tx, err := c.driver.GetExecutor().Begin(ctx)
if err != nil {
return nil, err
}
defer tx.Rollback(ctx)

inserted, err := c.insert(ctx, tx, args, opts)
if err != nil {
return nil, err
}

if err := tx.Commit(ctx); err != nil {
return nil, err
}
return inserted, nil
}

// InsertTx inserts a new job with the provided args on the given transaction.
Expand All @@ -1311,52 +1313,28 @@ func (c *Client[TTx]) Insert(ctx context.Context, args JobArgs, opts *InsertOpts
// transactions, the job will not be worked until the transaction has committed,
// and if the transaction rolls back, so too is the inserted job.
func (c *Client[TTx]) InsertTx(ctx context.Context, tx TTx, args JobArgs, opts *InsertOpts) (*rivertype.JobInsertResult, error) {
return c.insert(ctx, c.driver.UnwrapExecutor(tx), args, opts, false)
return c.insert(ctx, c.driver.UnwrapExecutor(tx), args, opts)
}

func (c *Client[TTx]) insert(ctx context.Context, exec riverdriver.Executor, args JobArgs, opts *InsertOpts, bulk bool) (*rivertype.JobInsertResult, error) {
if err := c.validateJobArgs(args); err != nil {
return nil, err
}

params, uniqueOpts, err := insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, args, opts, bulk)
if err != nil {
return nil, err
}

tx, err := exec.Begin(ctx)
if err != nil {
return nil, err
}
defer tx.Rollback(ctx)

// TODO: consolidate insertion paths for single + multi, remove deprecated uniqueness design
var jobInsertRes *riverdriver.JobInsertFastResult
if uniqueOpts == nil {
jobInsertRes, err = tx.JobInsertFast(ctx, params)
if err != nil {
return nil, err
}
} else {
if bulk {
return nil, errors.New("bulk inserts do not support advisory lock uniqueness")
}
// Old deprecated advisory lock route
c.baseService.Logger.WarnContext(ctx, "Using deprecated advisory lock uniqueness for job insert")
jobInsertRes, err = c.uniqueInserter.JobInsert(ctx, tx, params, uniqueOpts)
func (c *Client[TTx]) insert(ctx context.Context, tx riverdriver.ExecutorTx, args JobArgs, opts *InsertOpts) (*rivertype.JobInsertResult, error) {
params := []InsertManyParams{{Args: args, InsertOpts: opts}}
results, err := c.insertManyShared(ctx, tx, params, func(ctx context.Context, insertParams []*riverdriver.JobInsertFastParams) ([]*rivertype.JobInsertResult, error) {
results, err := tx.JobInsertFastMany(ctx, insertParams)
if err != nil {
return nil, err
}
}

if err := c.maybeNotifyInsert(ctx, tx, params.State, params.Queue); err != nil {
return nil, err
}
if err := tx.Commit(ctx); err != nil {
return sliceutil.Map(results,
func(result *riverdriver.JobInsertFastResult) *rivertype.JobInsertResult {
return (*rivertype.JobInsertResult)(result)
},
), nil
})
if err != nil {
return nil, err
}

return (*rivertype.JobInsertResult)(jobInsertRes), nil
return results[0], nil
}

// InsertManyParams encapsulates a single job combined with insert options for
Expand Down Expand Up @@ -1455,13 +1433,10 @@ func (c *Client[TTx]) insertMany(ctx context.Context, tx riverdriver.ExecutorTx,
})
}

// The shared code path for all InsertMany methods. It takes a function that
// executes the actual insert operation and allows for different implementations
// of the insert query to be passed in, each mapping their results back to a
// common result type.
//
// TODO(bgentry): this isn't yet used for the single insert path. The only thing
// blocking that is the removal of advisory lock unique inserts.
// The shared code path for all Insert and InsertMany methods. It takes a
// function that executes the actual insert operation and allows for different
// implementations of the insert query to be passed in, each mapping their
// results back to a common result type.
func (c *Client[TTx]) insertManyShared(
ctx context.Context,
tx riverdriver.ExecutorTx,
Expand Down Expand Up @@ -1503,7 +1478,7 @@ func (c *Client[TTx]) insertManyParams(params []InsertManyParams) ([]*riverdrive
return nil, err
}

insertParamsItem, _, err := insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, param.Args, param.InsertOpts, true)
insertParamsItem, err := insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, param.Args, param.InsertOpts)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1598,13 +1573,6 @@ func (c *Client[TTx]) insertManyFast(ctx context.Context, tx riverdriver.Executo
return len(results), nil
}

func (c *Client[TTx]) maybeNotifyInsert(ctx context.Context, tx riverdriver.ExecutorTx, state rivertype.JobState, queue string) error {
if state != rivertype.JobStateAvailable {
return nil
}
return c.maybeNotifyInsertForQueues(ctx, tx, []string{queue})
}

// Notify the given queues that new jobs are available. The queues list will be
// deduplicated and each will be checked to see if it is due for an insert
// notification from this client.
Expand Down
Loading

0 comments on commit 9184e75

Please sign in to comment.