Skip to content

Commit

Permalink
database/sql: fix InsertManyTx default scheduled_at
Browse files Browse the repository at this point in the history
The database/sql driver had a slightly different behavior than the pgx
driver in that it was setting the `scheduled_at` time to `null`, rather
than setting the current timestamp if one was not specified.

Additionally, the driver test suite did not cover this case at all.

Fix both of these issues. Fixes #502.
  • Loading branch information
bgentry committed Aug 5, 2024
1 parent 330176c commit aaf8a18
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 46 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- `database/sql` driver: fix default value of `scheduled_at` for `InsertManyTx` when it is not specified in `InsertOpts`. [PR #504](https://github.com/riverqueue/river/pull/504).

## [0.11.0] - 2024-08-02

### Added
Expand Down
5 changes: 2 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1169,9 +1169,8 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf
}

// If the time is stubbed (in a test), use that for `created_at`. Otherwise,
// leave an empty value which will use the database's `now()` value, which
// keeps the timestamps of jobs inserted across many different computers
// more consistent (i.e. in case of minor time drifts).
// leave an empty value which will either use the database's `now()` or be defaulted
// by drivers as necessary.
createdAt := archetype.Time.NowUTCOrNil()

maxAttempts := valutil.FirstNonZero(insertOpts.MaxAttempts, jobInsertOpts.MaxAttempts, config.MaxAttempts)
Expand Down
116 changes: 74 additions & 42 deletions internal/riverinternaltest/riverdrivertest/riverdrivertest.go
Original file line number Diff line number Diff line change
Expand Up @@ -899,53 +899,85 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
t.Run("JobInsertFastMany", func(t *testing.T) {
t.Parallel()

exec, _ := setup(ctx, t)
t.Run("AllArgs", func(t *testing.T) {
exec, _ := setup(ctx, t)

// This test needs to use a time from before the transaction begins, otherwise
// the newly-scheduled jobs won't yet show as available because their
// scheduled_at (which gets a default value from time.Now() in code) will be
// after the start of the transaction.
now := time.Now().UTC().Add(-1 * time.Minute)
// This test needs to use a time from before the transaction begins, otherwise
// the newly-scheduled jobs won't yet show as available because their
// scheduled_at (which gets a default value from time.Now() in code) will be
// after the start of the transaction.
now := time.Now().UTC().Add(-1 * time.Minute)

insertParams := make([]*riverdriver.JobInsertFastParams, 10)
for i := 0; i < len(insertParams); i++ {
insertParams[i] = &riverdriver.JobInsertFastParams{
EncodedArgs: []byte(`{"encoded": "args"}`),
Kind: "test_kind",
MaxAttempts: rivercommon.MaxAttemptsDefault,
Metadata: []byte(`{"meta": "data"}`),
Priority: rivercommon.PriorityDefault,
Queue: rivercommon.QueueDefault,
ScheduledAt: &now,
State: rivertype.JobStateAvailable,
Tags: []string{"tag"},
insertParams := make([]*riverdriver.JobInsertFastParams, 10)
for i := 0; i < len(insertParams); i++ {
insertParams[i] = &riverdriver.JobInsertFastParams{
EncodedArgs: []byte(`{"encoded": "args"}`),
Kind: "test_kind",
MaxAttempts: rivercommon.MaxAttemptsDefault,
Metadata: []byte(`{"meta": "data"}`),
Priority: rivercommon.PriorityDefault,
Queue: rivercommon.QueueDefault,
ScheduledAt: &now,
State: rivertype.JobStateAvailable,
Tags: []string{"tag"},
}
insertParams[i].ScheduledAt = &now
}
insertParams[i].ScheduledAt = &now
}

count, err := exec.JobInsertFastMany(ctx, insertParams)
require.NoError(t, err)
require.Len(t, insertParams, count)
count, err := exec.JobInsertFastMany(ctx, insertParams)
require.NoError(t, err)
require.Len(t, insertParams, count)

jobsAfter, err := exec.JobGetByKindMany(ctx, []string{"test_kind"})
require.NoError(t, err)
require.Len(t, jobsAfter, len(insertParams))
for _, job := range jobsAfter {
require.Equal(t, 0, job.Attempt)
require.Nil(t, job.AttemptedAt)
require.WithinDuration(t, time.Now().UTC(), job.CreatedAt, 2*time.Second)
require.Equal(t, []byte(`{"encoded": "args"}`), job.EncodedArgs)
require.Empty(t, job.Errors)
require.Nil(t, job.FinalizedAt)
require.Equal(t, "test_kind", job.Kind)
require.Equal(t, rivercommon.MaxAttemptsDefault, job.MaxAttempts)
require.Equal(t, []byte(`{"meta": "data"}`), job.Metadata)
require.Equal(t, rivercommon.PriorityDefault, job.Priority)
require.Equal(t, rivercommon.QueueDefault, job.Queue)
requireEqualTime(t, now, job.ScheduledAt)
require.Equal(t, rivertype.JobStateAvailable, job.State)
require.Equal(t, []string{"tag"}, job.Tags)
}
})

jobsAfter, err := exec.JobGetByKindMany(ctx, []string{"test_kind"})
require.NoError(t, err)
require.Len(t, jobsAfter, len(insertParams))
for _, job := range jobsAfter {
require.Equal(t, 0, job.Attempt)
require.Nil(t, job.AttemptedAt)
require.WithinDuration(t, time.Now().UTC(), job.CreatedAt, 2*time.Second)
require.Equal(t, []byte(`{"encoded": "args"}`), job.EncodedArgs)
require.Empty(t, job.Errors)
require.Nil(t, job.FinalizedAt)
require.Equal(t, "test_kind", job.Kind)
require.Equal(t, rivercommon.MaxAttemptsDefault, job.MaxAttempts)
require.Equal(t, []byte(`{"meta": "data"}`), job.Metadata)
require.Equal(t, rivercommon.PriorityDefault, job.Priority)
require.Equal(t, rivercommon.QueueDefault, job.Queue)
requireEqualTime(t, now, job.ScheduledAt)
require.Equal(t, rivertype.JobStateAvailable, job.State)
require.Equal(t, []string{"tag"}, job.Tags)
}
t.Run("MissingScheduledAtDefaultsToNow", func(t *testing.T) {
exec, _ := setup(ctx, t)

insertParams := make([]*riverdriver.JobInsertFastParams, 10)
for i := 0; i < len(insertParams); i++ {
insertParams[i] = &riverdriver.JobInsertFastParams{
EncodedArgs: []byte(`{"encoded": "args"}`),
Kind: "test_kind",
MaxAttempts: rivercommon.MaxAttemptsDefault,
Metadata: []byte(`{"meta": "data"}`),
Priority: rivercommon.PriorityDefault,
Queue: rivercommon.QueueDefault,
ScheduledAt: nil, // explicit nil
State: rivertype.JobStateAvailable,
Tags: []string{"tag"},
}
}

count, err := exec.JobInsertFastMany(ctx, insertParams)
require.NoError(t, err)
require.Len(t, insertParams, count)

jobsAfter, err := exec.JobGetByKindMany(ctx, []string{"test_kind"})
require.NoError(t, err)
require.Len(t, jobsAfter, len(insertParams))
for _, job := range jobsAfter {
require.WithinDuration(t, time.Now().UTC(), job.ScheduledAt, 2*time.Second)
}
})
})

t.Run("JobInsertFull", func(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion riverdriver/riverdatabasesql/river_database_sql_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,11 +228,12 @@ func (e *Executor) JobInsertFastMany(ctx context.Context, params []*riverdriver.
State: make([]dbsqlc.RiverJobState, len(params)),
Tags: make([]string, len(params)),
}
now := time.Now()

for i := 0; i < len(params); i++ {
params := params[i]

var scheduledAt time.Time
scheduledAt := now
if params.ScheduledAt != nil {
scheduledAt = *params.ScheduledAt
}
Expand Down

0 comments on commit aaf8a18

Please sign in to comment.