From a8678fb67d4a1fbb960c49b23dfa7793a309bee4 Mon Sep 17 00:00:00 2001 From: Brandur Date: Tue, 14 Nov 2023 17:20:23 -0800 Subject: [PATCH] Refactor: Reduce rescuer LOCs / number of queries + optimize I've been looking at ways we could start consolidating queries in `river_job.sql` a bit because the file's currently quite a bit of a mess. One pretty easy one that came out is that we have multiple queries for the rescuer that can be just one without much trouble. Here, make that change, which also allows us to simplify the rescuer's core run loop by quite a bit and reduce the lines of code in there. Also expect a slight performance improvement as we need only one round trip to the database and the streamlined code lets us preallocate a number of Go slices and reduce the number of `append`s happening. --- client_test.go | 15 ++-- internal/dbsqlc/river_job.sql | 22 ++---- internal/dbsqlc/river_job.sql.go | 90 +++++++++------------- rescuer.go | 127 +++++++++++-------------------- rescuer_test.go | 13 ++-- 5 files changed, 101 insertions(+), 166 deletions(-) diff --git a/client_test.go b/client_test.go index 7f6a4f23..1bd42318 100644 --- a/client_test.go +++ b/client_test.go @@ -1035,26 +1035,27 @@ func Test_Client_Maintenance(t *testing.T) { // with the number of errors that corresponds to its attempt count. Without // that, the rescued/errored jobs can retry immediately with no backoff and // cause flakiness as they quickly get picked back up again. 
- var errors []AttemptError errorCount := int(params.Attempt - 1) if params.Attempt == 0 { errorCount = int(params.Attempt) } + + errorsBytes := make([][]byte, errorCount) for i := 0; i < errorCount; i++ { - errors = append(errors, AttemptError{ + var err error + errorsBytes[i], err = json.Marshal(AttemptError{ At: time.Now(), Error: "mocked error", Num: i + 1, Trace: "none", }) + require.NoError(t, err) } - errorBytes, err := marshalAllErrors(errors) - require.NoError(t, err) job, err := queries.JobInsert(ctx, dbtx, dbsqlc.JobInsertParams{ Attempt: valutil.FirstNonZero(params.Attempt, int16(1)), AttemptedAt: params.AttemptedAt, - Errors: errorBytes, + Errors: errorsBytes, FinalizedAt: params.FinalizedAt, Kind: valutil.FirstNonZero(params.Kind, "test_kind"), MaxAttempts: valutil.FirstNonZero(params.MaxAttempts, int16(rivercommon.DefaultMaxAttempts)), @@ -1222,10 +1223,8 @@ func Test_Client_Maintenance(t *testing.T) { client.testSignals.electedLeader.WaitOrTimeout() svc := maintenance.GetService[*rescuer](client.queueMaintainer) - // We should both schedule retries and discard jobs: svc.TestSignals.FetchedBatch.WaitOrTimeout() - svc.TestSignals.RetriedJobs.WaitOrTimeout() - svc.TestSignals.DiscardedJobs.WaitOrTimeout() + svc.TestSignals.UpdatedBatch.WaitOrTimeout() requireJobHasState := func(jobID int64, state dbsqlc.JobState) { t.Helper() diff --git a/internal/dbsqlc/river_job.sql b/internal/dbsqlc/river_job.sql index 0894fb45..06f8107b 100644 --- a/internal/dbsqlc/river_job.sql +++ b/internal/dbsqlc/river_job.sql @@ -458,32 +458,22 @@ updated_job AS ( updated_job ) ORDER BY scheduled_at DESC NULLS LAST LIMIT 1; --- name: JobUpdateStuckForDiscard :exec -UPDATE river_job -SET - errors = array_append(errors, updated_job.error), - state = updated_job.state, - finalized_at = now() -FROM ( - SELECT - unnest(@id::bigint[]) AS id, - unnest(@errors::jsonb[]) AS error, - 'discarded'::river_job_state AS state -) AS updated_job -WHERE river_job.id = updated_job.id; --- name: 
JobUpdateStuckForRetry :exec +-- Run by the rescuer to queue for retry or discard depending on job state. +-- name: JobRescueMany :exec UPDATE river_job SET errors = array_append(errors, updated_job.error), + finalized_at = updated_job.finalized_at, scheduled_at = updated_job.scheduled_at, state = updated_job.state FROM ( SELECT unnest(@id::bigint[]) AS id, + unnest(@error::jsonb[]) AS error, + nullif(unnest(@finalized_at::timestamptz[]), '0001-01-01 00:00:00 +0000') AS finalized_at, unnest(@scheduled_at::timestamptz[]) AS scheduled_at, - unnest(@errors::jsonb[]) AS error, - 'retryable'::river_job_state AS state + unnest(@state::text[])::river_job_state AS state ) AS updated_job WHERE river_job.id = updated_job.id; diff --git a/internal/dbsqlc/river_job.sql.go b/internal/dbsqlc/river_job.sql.go index 8d3e34c2..f12408fa 100644 --- a/internal/dbsqlc/river_job.sql.go +++ b/internal/dbsqlc/river_job.sql.go @@ -549,6 +549,44 @@ type JobInsertManyParams struct { Tags []string } +const jobRescueMany = `-- name: JobRescueMany :exec +UPDATE river_job +SET + errors = array_append(errors, updated_job.error), + finalized_at = updated_job.finalized_at, + scheduled_at = updated_job.scheduled_at, + state = updated_job.state +FROM ( + SELECT + unnest($1::bigint[]) AS id, + unnest($2::jsonb[]) AS error, + nullif(unnest($3::timestamptz[]), '0001-01-01 00:00:00 +0000') AS finalized_at, + unnest($4::timestamptz[]) AS scheduled_at, + unnest($5::text[])::river_job_state AS state +) AS updated_job +WHERE river_job.id = updated_job.id +` + +type JobRescueManyParams struct { + ID []int64 + Error [][]byte + FinalizedAt []time.Time + ScheduledAt []time.Time + State []string +} + +// Run by the rescuer to queue for retry or discard depending on job state. 
+func (q *Queries) JobRescueMany(ctx context.Context, db DBTX, arg JobRescueManyParams) error { + _, err := db.Exec(ctx, jobRescueMany, + arg.ID, + arg.Error, + arg.FinalizedAt, + arg.ScheduledAt, + arg.State, + ) + return err +} + const jobSchedule = `-- name: JobSchedule :one WITH jobs_to_schedule AS ( SELECT id @@ -1078,58 +1116,6 @@ func (q *Queries) JobUpdate(ctx context.Context, db DBTX, arg JobUpdateParams) ( return &i, err } -const jobUpdateStuckForDiscard = `-- name: JobUpdateStuckForDiscard :exec -UPDATE river_job -SET - errors = array_append(errors, updated_job.error), - state = updated_job.state, - finalized_at = now() -FROM ( - SELECT - unnest($1::bigint[]) AS id, - unnest($2::jsonb[]) AS error, - 'discarded'::river_job_state AS state -) AS updated_job -WHERE river_job.id = updated_job.id -` - -type JobUpdateStuckForDiscardParams struct { - ID []int64 - Errors [][]byte -} - -func (q *Queries) JobUpdateStuckForDiscard(ctx context.Context, db DBTX, arg JobUpdateStuckForDiscardParams) error { - _, err := db.Exec(ctx, jobUpdateStuckForDiscard, arg.ID, arg.Errors) - return err -} - -const jobUpdateStuckForRetry = `-- name: JobUpdateStuckForRetry :exec -UPDATE river_job -SET - errors = array_append(errors, updated_job.error), - scheduled_at = updated_job.scheduled_at, - state = updated_job.state -FROM ( - SELECT - unnest($1::bigint[]) AS id, - unnest($2::timestamptz[]) AS scheduled_at, - unnest($3::jsonb[]) AS error, - 'retryable'::river_job_state AS state -) AS updated_job -WHERE river_job.id = updated_job.id -` - -type JobUpdateStuckForRetryParams struct { - ID []int64 - ScheduledAt []time.Time - Errors [][]byte -} - -func (q *Queries) JobUpdateStuckForRetry(ctx context.Context, db DBTX, arg JobUpdateStuckForRetryParams) error { - _, err := db.Exec(ctx, jobUpdateStuckForRetry, arg.ID, arg.ScheduledAt, arg.Errors) - return err -} - const pGAdvisoryXactLock = `-- name: PGAdvisoryXactLock :exec SELECT pg_advisory_xact_lock($1) ` diff --git a/rescuer.go 
b/rescuer.go index 036e7fe6..ad5541fd 100644 --- a/rescuer.go +++ b/rescuer.go @@ -25,15 +25,13 @@ const ( // Test-only properties. type rescuerTestSignals struct { - DiscardedJobs rivercommon.TestSignal[struct{}] // notifies when runOnce has discarded jobs from the batch - FetchedBatch rivercommon.TestSignal[struct{}] // notifies when runOnce has fetched a batch of jobs - RetriedJobs rivercommon.TestSignal[struct{}] // notifies when runOnce has retried jobs from the batch + FetchedBatch rivercommon.TestSignal[struct{}] // notifies when runOnce has fetched a batch of jobs + UpdatedBatch rivercommon.TestSignal[struct{}] // notifies when runOnce has updated rescued jobs from a batch } func (ts *rescuerTestSignals) Init() { - ts.DiscardedJobs.Init() ts.FetchedBatch.Init() - ts.RetriedJobs.Init() + ts.UpdatedBatch.Init() } type rescuerConfig struct { @@ -79,7 +77,7 @@ type rescuer struct { Config *rescuerConfig TestSignals rescuerTestSignals - batchSize int32 // configurable for test purposes + batchSize int // configurable for test purposes dbExecutor dbutil.Executor queries *dbsqlc.Queries } @@ -133,7 +131,7 @@ func (s *rescuer) Start(ctx context.Context) error { s.Logger.InfoContext(ctx, s.Name+": Ran successfully", slog.Int64("num_jobs_discarded", res.NumJobsDiscarded), - slog.Int64("num_jobs_retry_scheduled", res.NumJobsRetryScheduled), + slog.Int64("num_jobs_retry_scheduled", res.NumJobsRetried), ) } }() @@ -142,8 +140,8 @@ func (s *rescuer) Start(ctx context.Context) error { } type rescuerRunOnceResult struct { - NumJobsRetryScheduled int64 - NumJobsDiscarded int64 + NumJobsDiscarded int64 + NumJobsRetried int64 } func (s *rescuer) runOnce(ctx context.Context) (*rescuerRunOnceResult, error) { @@ -157,86 +155,63 @@ func (s *rescuer) runOnce(ctx context.Context) (*rescuerRunOnceResult, error) { s.TestSignals.FetchedBatch.Signal(struct{}{}) - jobIDsToDiscard := make([]int64, 0, len(stuckJobs)) - attemptErrorsForDiscard := make([]AttemptError, 0, len(stuckJobs)) - - 
jobIDsToRetry := make([]int64, 0, len(stuckJobs)) - attemptErrorsForRetry := make([]AttemptError, 0, len(stuckJobs)) - timestampsForRetry := make([]time.Time, 0, len(stuckJobs)) + // Return quickly in case there's no work to do. + if len(stuckJobs) < 1 { + return res, nil + } now := time.Now().UTC() - for _, job := range stuckJobs { - shouldRetry, retryAt := s.makeRetryDecision(ctx, job) - if shouldRetry { - jobIDsToRetry = append(jobIDsToRetry, job.ID) - timestampsForRetry = append(timestampsForRetry, retryAt) - attemptError := AttemptError{ - At: now, - Error: "Stuck job rescued by Rescuer", - Num: max(int(job.Attempt), 0), - Trace: "TODO", - } - attemptErrorsForRetry = append(attemptErrorsForRetry, attemptError) - } else { - jobIDsToDiscard = append(jobIDsToDiscard, job.ID) - attemptError := AttemptError{ - At: now, - Error: "Stuck job rescued by Rescuer", - Num: max(int(job.Attempt), 0), - Trace: "TODO", - } - attemptErrorsForDiscard = append(attemptErrorsForDiscard, attemptError) - } + rescueManyParams := dbsqlc.JobRescueManyParams{ + ID: make([]int64, len(stuckJobs)), + Error: make([][]byte, len(stuckJobs)), + FinalizedAt: make([]time.Time, len(stuckJobs)), + ScheduledAt: make([]time.Time, len(stuckJobs)), + State: make([]string, len(stuckJobs)), } - if len(jobIDsToRetry) > 0 { - marshaledRetryErrors, err := marshalAllErrors(attemptErrorsForRetry) - if err != nil { - return nil, fmt.Errorf("error marshaling retry errors: %w", err) - } + for i, job := range stuckJobs { + rescueManyParams.ID[i] = job.ID - err = s.queries.JobUpdateStuckForRetry(ctx, s.dbExecutor, dbsqlc.JobUpdateStuckForRetryParams{ - ID: jobIDsToRetry, - ScheduledAt: timestampsForRetry, - Errors: marshaledRetryErrors, + rescueManyParams.Error[i], err = json.Marshal(AttemptError{ + At: now, + Error: "Stuck job rescued by Rescuer", + Num: max(int(job.Attempt), 0), + Trace: "TODO", }) if err != nil { - return nil, fmt.Errorf("error updating stuck jobs for retry: %w", err) + return nil, 
fmt.Errorf("error marshaling error JSON: %w", err) } - s.TestSignals.RetriedJobs.Signal(struct{}{}) - } - if len(jobIDsToDiscard) > 0 { - marshaledDiscardErrors, err := marshalAllErrors(attemptErrorsForDiscard) - if err != nil { - return nil, fmt.Errorf("error marshaling discard errors: %w", err) + shouldRetry, retryAt := s.makeRetryDecision(ctx, job) + if shouldRetry { + res.NumJobsRetried++ + rescueManyParams.ScheduledAt[i] = retryAt + rescueManyParams.State[i] = string(dbsqlc.JobStateRetryable) + } else { + res.NumJobsDiscarded++ + rescueManyParams.FinalizedAt[i] = now + rescueManyParams.ScheduledAt[i] = job.ScheduledAt // reuse previous value + rescueManyParams.State[i] = string(dbsqlc.JobStateDiscarded) } + } - err = s.queries.JobUpdateStuckForDiscard(ctx, s.dbExecutor, dbsqlc.JobUpdateStuckForDiscardParams{ - ID: jobIDsToDiscard, - Errors: marshaledDiscardErrors, - }) - if err != nil { - return nil, fmt.Errorf("error updating stuck jobs for discard: %w", err) - } - s.TestSignals.DiscardedJobs.Signal(struct{}{}) + err = s.queries.JobRescueMany(ctx, s.dbExecutor, rescueManyParams) + if err != nil { + return nil, fmt.Errorf("error rescuing stuck jobs: %w", err) } - numDiscarded := int64(len(jobIDsToDiscard)) - numRetried := int64(len(jobIDsToRetry)) - res.NumJobsDiscarded += numDiscarded - res.NumJobsRetryScheduled += numRetried + s.TestSignals.UpdatedBatch.Signal(struct{}{}) // Number of rows fetched was less than query `LIMIT` which means work is // done for this round: - if int32(len(stuckJobs)) < s.batchSize { + if len(stuckJobs) < s.batchSize { break } s.Logger.InfoContext(ctx, s.Name+": Rescued batch of jobs", - slog.Int64("num_jobs_discarded", numDiscarded), - slog.Int64("num_jobs_retried", numRetried), + slog.Int64("num_jobs_discarded", res.NumJobsDiscarded), + slog.Int64("num_jobs_retried", res.NumJobsRetried), ) s.CancellableSleepRandomBetween(ctx, maintenance.BatchBackoffMin, maintenance.BatchBackoffMax) @@ -253,7 +228,7 @@ func (s *rescuer) 
getStuckJobs(ctx context.Context) ([]*dbsqlc.RiverJob, error) return s.queries.JobGetStuck(ctx, s.dbExecutor, dbsqlc.JobGetStuckParams{ StuckHorizon: stuckHorizon, - LimitCount: s.batchSize, + LimitCount: int32(s.batchSize), }) } @@ -280,17 +255,3 @@ func (s *rescuer) makeRetryDecision(ctx context.Context, internalJob *dbsqlc.Riv } return job.Attempt < max(int(internalJob.MaxAttempts), 0), nextRetry } - -func marshalAllErrors(errors []AttemptError) ([][]byte, error) { - results := make([][]byte, len(errors)) - - for i, attemptErr := range errors { - payload, err := json.Marshal(attemptErr) - if err != nil { - return nil, err - } - results[i] = payload - } - - return results, nil -} diff --git a/rescuer_test.go b/rescuer_test.go index ee5ea288..ed8ec285 100644 --- a/rescuer_test.go +++ b/rescuer_test.go @@ -139,8 +139,7 @@ func TestRescuer(t *testing.T) { require.NoError(cleaner.Start(ctx)) cleaner.TestSignals.FetchedBatch.WaitOrTimeout() - cleaner.TestSignals.DiscardedJobs.WaitOrTimeout() - cleaner.TestSignals.RetriedJobs.WaitOrTimeout() + cleaner.TestSignals.UpdatedBatch.WaitOrTimeout() confirmRetried := func(jobBefore *dbsqlc.RiverJob) { jobAfter, err := queries.JobGetByID(ctx, bundle.tx, jobBefore.ID) @@ -186,7 +185,7 @@ func TestRescuer(t *testing.T) { jobs := make([]*dbsqlc.RiverJob, numJobs) - for i := 0; i < int(numJobs); i++ { + for i := 0; i < numJobs; i++ { job := insertJob(ctx, bundle.tx, insertJobParams{State: dbsqlc.JobStateRunning, AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour))}) jobs[i] = job } @@ -195,9 +194,9 @@ func TestRescuer(t *testing.T) { // See comment above. Exactly two batches are expected. 
cleaner.TestSignals.FetchedBatch.WaitOrTimeout() - cleaner.TestSignals.RetriedJobs.WaitOrTimeout() + cleaner.TestSignals.UpdatedBatch.WaitOrTimeout() cleaner.TestSignals.FetchedBatch.WaitOrTimeout() - cleaner.TestSignals.RetriedJobs.WaitOrTimeout() // need to wait until after this for the conn to be free + cleaner.TestSignals.UpdatedBatch.WaitOrTimeout() // need to wait until after this for the conn to be free for _, job := range jobs { jobUpdated, err := queries.JobGetByID(ctx, bundle.tx, job.ID) @@ -261,7 +260,7 @@ func TestRescuer(t *testing.T) { require.NoError(t, rescuer.Start(ctx)) rescuer.TestSignals.FetchedBatch.WaitOrTimeout() - rescuer.TestSignals.DiscardedJobs.WaitOrTimeout() + rescuer.TestSignals.UpdatedBatch.WaitOrTimeout() rescuer.Stop() @@ -270,7 +269,7 @@ func TestRescuer(t *testing.T) { require.NoError(t, rescuer.Start(ctx)) rescuer.TestSignals.FetchedBatch.WaitOrTimeout() - rescuer.TestSignals.DiscardedJobs.WaitOrTimeout() + rescuer.TestSignals.UpdatedBatch.WaitOrTimeout() job1After, err := queries.JobGetByID(ctx, bundle.tx, job1.ID) require.NoError(t, err)