Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

database/sql: default to current time if scheduled_at unspecified #504

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- `database/sql` driver: fix default value of `scheduled_at` for `InsertManyTx` when it is not specified in `InsertOpts`. [PR #504](https://github.com/riverqueue/river/pull/504).

## [0.11.0] - 2024-08-02

### Added
Expand Down
5 changes: 2 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1169,9 +1169,8 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf
}

// If the time is stubbed (in a test), use that for `created_at`. Otherwise,
// leave an empty value which will use the database's `now()` value, which
// keeps the timestamps of jobs inserted across many different computers
// more consistent (i.e. in case of minor time drifts).
// leave an empty value which will either use the database's `now()` or be defaulted
// by drivers as necessary.
createdAt := archetype.Time.NowUTCOrNil()

maxAttempts := valutil.FirstNonZero(insertOpts.MaxAttempts, jobInsertOpts.MaxAttempts, config.MaxAttempts)
Expand Down
15 changes: 15 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1823,6 +1823,21 @@ func Test_Client_InsertManyTx(t *testing.T) {
require.Len(t, jobs, 2, "Expected to find exactly two jobs of kind: "+(noOpArgs{}).Kind())
})

t.Run("SetsScheduledAtToNowByDefault", func(t *testing.T) {
t.Parallel()

client, bundle := setup(t)

_, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{{noOpArgs{}, nil}})
require.NoError(t, err)

insertedJobs, err := client.driver.UnwrapExecutor(bundle.tx).JobGetByKindMany(ctx, []string{(noOpArgs{}).Kind()})
require.NoError(t, err)
require.Len(t, insertedJobs, 1)
require.Equal(t, rivertype.JobStateAvailable, insertedJobs[0].State)
require.WithinDuration(t, time.Now(), insertedJobs[0].ScheduledAt, 2*time.Second)
})

t.Run("SupportsScheduledJobs", func(t *testing.T) {
t.Parallel()

Expand Down
116 changes: 74 additions & 42 deletions internal/riverinternaltest/riverdrivertest/riverdrivertest.go
Original file line number Diff line number Diff line change
Expand Up @@ -899,53 +899,85 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
t.Run("JobInsertFastMany", func(t *testing.T) {
t.Parallel()

exec, _ := setup(ctx, t)
t.Run("AllArgs", func(t *testing.T) {
exec, _ := setup(ctx, t)

// This test needs to use a time from before the transaction begins, otherwise
// the newly-scheduled jobs won't yet show as available because their
// scheduled_at (which gets a default value from time.Now() in code) will be
// after the start of the transaction.
now := time.Now().UTC().Add(-1 * time.Minute)
// This test needs to use a time from before the transaction begins, otherwise
// the newly-scheduled jobs won't yet show as available because their
// scheduled_at (which gets a default value from time.Now() in code) will be
// after the start of the transaction.
now := time.Now().UTC().Add(-1 * time.Minute)

insertParams := make([]*riverdriver.JobInsertFastParams, 10)
for i := 0; i < len(insertParams); i++ {
insertParams[i] = &riverdriver.JobInsertFastParams{
EncodedArgs: []byte(`{"encoded": "args"}`),
Kind: "test_kind",
MaxAttempts: rivercommon.MaxAttemptsDefault,
Metadata: []byte(`{"meta": "data"}`),
Priority: rivercommon.PriorityDefault,
Queue: rivercommon.QueueDefault,
ScheduledAt: &now,
State: rivertype.JobStateAvailable,
Tags: []string{"tag"},
insertParams := make([]*riverdriver.JobInsertFastParams, 10)
for i := 0; i < len(insertParams); i++ {
insertParams[i] = &riverdriver.JobInsertFastParams{
EncodedArgs: []byte(`{"encoded": "args"}`),
Kind: "test_kind",
MaxAttempts: rivercommon.MaxAttemptsDefault,
Metadata: []byte(`{"meta": "data"}`),
Priority: rivercommon.PriorityDefault,
Queue: rivercommon.QueueDefault,
ScheduledAt: &now,
State: rivertype.JobStateAvailable,
Tags: []string{"tag"},
}
insertParams[i].ScheduledAt = &now
}
insertParams[i].ScheduledAt = &now
}

count, err := exec.JobInsertFastMany(ctx, insertParams)
require.NoError(t, err)
require.Len(t, insertParams, count)
count, err := exec.JobInsertFastMany(ctx, insertParams)
require.NoError(t, err)
require.Len(t, insertParams, count)

jobsAfter, err := exec.JobGetByKindMany(ctx, []string{"test_kind"})
require.NoError(t, err)
require.Len(t, jobsAfter, len(insertParams))
for _, job := range jobsAfter {
require.Equal(t, 0, job.Attempt)
require.Nil(t, job.AttemptedAt)
require.WithinDuration(t, time.Now().UTC(), job.CreatedAt, 2*time.Second)
require.Equal(t, []byte(`{"encoded": "args"}`), job.EncodedArgs)
require.Empty(t, job.Errors)
require.Nil(t, job.FinalizedAt)
require.Equal(t, "test_kind", job.Kind)
require.Equal(t, rivercommon.MaxAttemptsDefault, job.MaxAttempts)
require.Equal(t, []byte(`{"meta": "data"}`), job.Metadata)
require.Equal(t, rivercommon.PriorityDefault, job.Priority)
require.Equal(t, rivercommon.QueueDefault, job.Queue)
requireEqualTime(t, now, job.ScheduledAt)
require.Equal(t, rivertype.JobStateAvailable, job.State)
require.Equal(t, []string{"tag"}, job.Tags)
}
})

jobsAfter, err := exec.JobGetByKindMany(ctx, []string{"test_kind"})
require.NoError(t, err)
require.Len(t, jobsAfter, len(insertParams))
for _, job := range jobsAfter {
require.Equal(t, 0, job.Attempt)
require.Nil(t, job.AttemptedAt)
require.WithinDuration(t, time.Now().UTC(), job.CreatedAt, 2*time.Second)
require.Equal(t, []byte(`{"encoded": "args"}`), job.EncodedArgs)
require.Empty(t, job.Errors)
require.Nil(t, job.FinalizedAt)
require.Equal(t, "test_kind", job.Kind)
require.Equal(t, rivercommon.MaxAttemptsDefault, job.MaxAttempts)
require.Equal(t, []byte(`{"meta": "data"}`), job.Metadata)
require.Equal(t, rivercommon.PriorityDefault, job.Priority)
require.Equal(t, rivercommon.QueueDefault, job.Queue)
requireEqualTime(t, now, job.ScheduledAt)
require.Equal(t, rivertype.JobStateAvailable, job.State)
require.Equal(t, []string{"tag"}, job.Tags)
}
t.Run("MissingScheduledAtDefaultsToNow", func(t *testing.T) {
exec, _ := setup(ctx, t)

insertParams := make([]*riverdriver.JobInsertFastParams, 10)
for i := 0; i < len(insertParams); i++ {
insertParams[i] = &riverdriver.JobInsertFastParams{
EncodedArgs: []byte(`{"encoded": "args"}`),
Kind: "test_kind",
MaxAttempts: rivercommon.MaxAttemptsDefault,
Metadata: []byte(`{"meta": "data"}`),
Priority: rivercommon.PriorityDefault,
Queue: rivercommon.QueueDefault,
ScheduledAt: nil, // explicit nil
State: rivertype.JobStateAvailable,
Tags: []string{"tag"},
}
}

count, err := exec.JobInsertFastMany(ctx, insertParams)
require.NoError(t, err)
require.Len(t, insertParams, count)

jobsAfter, err := exec.JobGetByKindMany(ctx, []string{"test_kind"})
require.NoError(t, err)
require.Len(t, jobsAfter, len(insertParams))
for _, job := range jobsAfter {
require.WithinDuration(t, time.Now().UTC(), job.ScheduledAt, 2*time.Second)
}
})
})

t.Run("JobInsertFull", func(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion riverdriver/riverdatabasesql/river_database_sql_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,11 +228,12 @@ func (e *Executor) JobInsertFastMany(ctx context.Context, params []*riverdriver.
State: make([]dbsqlc.RiverJobState, len(params)),
Tags: make([]string, len(params)),
}
now := time.Now()

for i := 0; i < len(params); i++ {
params := params[i]

var scheduledAt time.Time
scheduledAt := now
if params.ScheduledAt != nil {
scheduledAt = *params.ScheduledAt
}
Expand Down
Loading