Refactor: Reduce rescuer LOCs / number of queries + optimize
I've been looking at ways we could start consolidating queries in
`river_job.sql` because the file's currently a bit of a mess. One pretty
easy win that came out of that is that we have multiple queries for the
rescuer that can be collapsed into just one without much trouble.

Here, make that change, which also lets us simplify the rescuer's core
run loop quite a bit and reduce the lines of code in there. We can also
expect a slight performance improvement since we need only one round trip
to the database, and the streamlined code lets us preallocate a number of
Go slices and cut down on the number of `append`s happening.
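
For context, the consolidation leans on Postgres array parameters plus
`unnest` to apply per-row values in a single statement. A rough, generic
sketch of the pattern (hypothetical `some_table` and column names, not
River's actual schema):

-- Sketch only: table and column names are hypothetical.
-- Each array parameter carries one value per target row; unnest() zips the
-- arrays into a derived table that's joined back to the target by id, so the
-- whole batch is updated in one round trip.
UPDATE some_table
SET
    state        = updated.state,
    scheduled_at = updated.scheduled_at
FROM (
    SELECT
        unnest($1::bigint[])      AS id,
        unnest($2::text[])        AS state,
        unnest($3::timestamptz[]) AS scheduled_at
) AS updated
WHERE some_table.id = updated.id;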
brandur committed Nov 15, 2023
1 parent 03f52d0 commit a8678fb
Showing 5 changed files with 101 additions and 166 deletions.
15 changes: 7 additions & 8 deletions client_test.go
@@ -1035,26 +1035,27 @@ func Test_Client_Maintenance(t *testing.T) {
// with the number of errors that corresponds to its attempt count. Without
// that, the rescued/errored jobs can retry immediately with no backoff and
// cause flakiness as they quickly get picked back up again.
var errors []AttemptError
errorCount := int(params.Attempt - 1)
if params.Attempt == 0 {
errorCount = int(params.Attempt)
}

errorsBytes := make([][]byte, errorCount)
for i := 0; i < errorCount; i++ {
errors = append(errors, AttemptError{
var err error
errorsBytes[i], err = json.Marshal(AttemptError{
At: time.Now(),
Error: "mocked error",
Num: i + 1,
Trace: "none",
})
require.NoError(t, err)
}
errorBytes, err := marshalAllErrors(errors)
require.NoError(t, err)

job, err := queries.JobInsert(ctx, dbtx, dbsqlc.JobInsertParams{
Attempt: valutil.FirstNonZero(params.Attempt, int16(1)),
AttemptedAt: params.AttemptedAt,
Errors: errorBytes,
Errors: errorsBytes,
FinalizedAt: params.FinalizedAt,
Kind: valutil.FirstNonZero(params.Kind, "test_kind"),
MaxAttempts: valutil.FirstNonZero(params.MaxAttempts, int16(rivercommon.DefaultMaxAttempts)),
@@ -1222,10 +1223,8 @@ func Test_Client_Maintenance(t *testing.T) {

client.testSignals.electedLeader.WaitOrTimeout()
svc := maintenance.GetService[*rescuer](client.queueMaintainer)
// We should both schedule retries and discard jobs:
svc.TestSignals.FetchedBatch.WaitOrTimeout()
svc.TestSignals.RetriedJobs.WaitOrTimeout()
svc.TestSignals.DiscardedJobs.WaitOrTimeout()
svc.TestSignals.UpdatedBatch.WaitOrTimeout()

requireJobHasState := func(jobID int64, state dbsqlc.JobState) {
t.Helper()
22 changes: 6 additions & 16 deletions internal/dbsqlc/river_job.sql
@@ -458,32 +458,22 @@ updated_job AS (
updated_job
) ORDER BY scheduled_at DESC NULLS LAST LIMIT 1;

-- name: JobUpdateStuckForDiscard :exec
UPDATE river_job
SET
errors = array_append(errors, updated_job.error),
state = updated_job.state,
finalized_at = now()
FROM (
SELECT
unnest(@id::bigint[]) AS id,
unnest(@errors::jsonb[]) AS error,
'discarded'::river_job_state AS state
) AS updated_job
WHERE river_job.id = updated_job.id;

-- name: JobUpdateStuckForRetry :exec
-- Run by the rescuer to queue for retry or discard depending on job state.
-- name: JobRescueMany :exec
UPDATE river_job
SET
errors = array_append(errors, updated_job.error),
finalized_at = updated_job.finalized_at,
scheduled_at = updated_job.scheduled_at,
state = updated_job.state
FROM (
SELECT
unnest(@id::bigint[]) AS id,
unnest(@error::jsonb[]) AS error,
nullif(unnest(@finalized_at::timestamptz[]), '0001-01-01 00:00:00 +0000') AS finalized_at,
unnest(@scheduled_at::timestamptz[]) AS scheduled_at,
unnest(@errors::jsonb[]) AS error,
'retryable'::river_job_state AS state
unnest(@state::text[])::river_job_state AS state
) AS updated_job
WHERE river_job.id = updated_job.id;

90 changes: 38 additions & 52 deletions internal/dbsqlc/river_job.sql.go

Some generated files are not rendered by default.

127 changes: 44 additions & 83 deletions rescuer.go
@@ -25,15 +25,13 @@ const (

// Test-only properties.
type rescuerTestSignals struct {
DiscardedJobs rivercommon.TestSignal[struct{}] // notifies when runOnce has discarded jobs from the batch
FetchedBatch rivercommon.TestSignal[struct{}] // notifies when runOnce has fetched a batch of jobs
RetriedJobs rivercommon.TestSignal[struct{}] // notifies when runOnce has retried jobs from the batch
FetchedBatch rivercommon.TestSignal[struct{}] // notifies when runOnce has fetched a batch of jobs
UpdatedBatch rivercommon.TestSignal[struct{}] // notifies when runOnce has updated rescued jobs from a batch
}

func (ts *rescuerTestSignals) Init() {
ts.DiscardedJobs.Init()
ts.FetchedBatch.Init()
ts.RetriedJobs.Init()
ts.UpdatedBatch.Init()
}

type rescuerConfig struct {
@@ -79,7 +77,7 @@ type rescuer struct {
Config *rescuerConfig
TestSignals rescuerTestSignals

batchSize int32 // configurable for test purposes
batchSize int // configurable for test purposes
dbExecutor dbutil.Executor
queries *dbsqlc.Queries
}
@@ -133,7 +131,7 @@ func (s *rescuer) Start(ctx context.Context) error {

s.Logger.InfoContext(ctx, s.Name+": Ran successfully",
slog.Int64("num_jobs_discarded", res.NumJobsDiscarded),
slog.Int64("num_jobs_retry_scheduled", res.NumJobsRetryScheduled),
slog.Int64("num_jobs_retry_scheduled", res.NumJobsRetried),
)
}
}()
@@ -142,8 +140,8 @@ func (s *rescuer) Start(ctx context.Context) error {
}

type rescuerRunOnceResult struct {
NumJobsRetryScheduled int64
NumJobsDiscarded int64
NumJobsDiscarded int64
NumJobsRetried int64
}

func (s *rescuer) runOnce(ctx context.Context) (*rescuerRunOnceResult, error) {
Expand All @@ -157,86 +155,63 @@ func (s *rescuer) runOnce(ctx context.Context) (*rescuerRunOnceResult, error) {

s.TestSignals.FetchedBatch.Signal(struct{}{})

jobIDsToDiscard := make([]int64, 0, len(stuckJobs))
attemptErrorsForDiscard := make([]AttemptError, 0, len(stuckJobs))

jobIDsToRetry := make([]int64, 0, len(stuckJobs))
attemptErrorsForRetry := make([]AttemptError, 0, len(stuckJobs))
timestampsForRetry := make([]time.Time, 0, len(stuckJobs))
// Return quickly in case there's no work to do.
if len(stuckJobs) < 1 {
return res, nil
}

now := time.Now().UTC()

for _, job := range stuckJobs {
shouldRetry, retryAt := s.makeRetryDecision(ctx, job)
if shouldRetry {
jobIDsToRetry = append(jobIDsToRetry, job.ID)
timestampsForRetry = append(timestampsForRetry, retryAt)
attemptError := AttemptError{
At: now,
Error: "Stuck job rescued by Rescuer",
Num: max(int(job.Attempt), 0),
Trace: "TODO",
}
attemptErrorsForRetry = append(attemptErrorsForRetry, attemptError)
} else {
jobIDsToDiscard = append(jobIDsToDiscard, job.ID)
attemptError := AttemptError{
At: now,
Error: "Stuck job rescued by Rescuer",
Num: max(int(job.Attempt), 0),
Trace: "TODO",
}
attemptErrorsForDiscard = append(attemptErrorsForDiscard, attemptError)
}
rescueManyParams := dbsqlc.JobRescueManyParams{
ID: make([]int64, len(stuckJobs)),
Error: make([][]byte, len(stuckJobs)),
FinalizedAt: make([]time.Time, len(stuckJobs)),
ScheduledAt: make([]time.Time, len(stuckJobs)),
State: make([]string, len(stuckJobs)),
}

if len(jobIDsToRetry) > 0 {
marshaledRetryErrors, err := marshalAllErrors(attemptErrorsForRetry)
if err != nil {
return nil, fmt.Errorf("error marshaling retry errors: %w", err)
}
for i, job := range stuckJobs {
rescueManyParams.ID[i] = job.ID

err = s.queries.JobUpdateStuckForRetry(ctx, s.dbExecutor, dbsqlc.JobUpdateStuckForRetryParams{
ID: jobIDsToRetry,
ScheduledAt: timestampsForRetry,
Errors: marshaledRetryErrors,
rescueManyParams.Error[i], err = json.Marshal(AttemptError{
At: now,
Error: "Stuck job rescued by Rescuer",
Num: max(int(job.Attempt), 0),
Trace: "TODO",
})
if err != nil {
return nil, fmt.Errorf("error updating stuck jobs for retry: %w", err)
return nil, fmt.Errorf("error marshaling error JSON: %w", err)
}
s.TestSignals.RetriedJobs.Signal(struct{}{})
}

if len(jobIDsToDiscard) > 0 {
marshaledDiscardErrors, err := marshalAllErrors(attemptErrorsForDiscard)
if err != nil {
return nil, fmt.Errorf("error marshaling discard errors: %w", err)
shouldRetry, retryAt := s.makeRetryDecision(ctx, job)
if shouldRetry {
res.NumJobsRetried++
rescueManyParams.ScheduledAt[i] = retryAt
rescueManyParams.State[i] = string(dbsqlc.JobStateRetryable)
} else {
res.NumJobsDiscarded++
rescueManyParams.FinalizedAt[i] = now
rescueManyParams.ScheduledAt[i] = job.ScheduledAt // reuse previous value
rescueManyParams.State[i] = string(dbsqlc.JobStateDiscarded)
}
}

err = s.queries.JobUpdateStuckForDiscard(ctx, s.dbExecutor, dbsqlc.JobUpdateStuckForDiscardParams{
ID: jobIDsToDiscard,
Errors: marshaledDiscardErrors,
})
if err != nil {
return nil, fmt.Errorf("error updating stuck jobs for discard: %w", err)
}
s.TestSignals.DiscardedJobs.Signal(struct{}{})
err = s.queries.JobRescueMany(ctx, s.dbExecutor, rescueManyParams)
if err != nil {
return nil, fmt.Errorf("error rescuing stuck jobs: %w", err)
}

numDiscarded := int64(len(jobIDsToDiscard))
numRetried := int64(len(jobIDsToRetry))
res.NumJobsDiscarded += numDiscarded
res.NumJobsRetryScheduled += numRetried
s.TestSignals.UpdatedBatch.Signal(struct{}{})

// Number of rows fetched was less than query `LIMIT` which means work is
// done for this round:
if int32(len(stuckJobs)) < s.batchSize {
if len(stuckJobs) < s.batchSize {
break
}

s.Logger.InfoContext(ctx, s.Name+": Rescued batch of jobs",
slog.Int64("num_jobs_discarded", numDiscarded),
slog.Int64("num_jobs_retried", numRetried),
slog.Int64("num_jobs_discarded", res.NumJobsDiscarded),
slog.Int64("num_jobs_retried", res.NumJobsRetried),
)

s.CancellableSleepRandomBetween(ctx, maintenance.BatchBackoffMin, maintenance.BatchBackoffMax)
@@ -253,7 +228,7 @@ func (s *rescuer) getStuckJobs(ctx context.Context) ([]*dbsqlc.RiverJob, error)

return s.queries.JobGetStuck(ctx, s.dbExecutor, dbsqlc.JobGetStuckParams{
StuckHorizon: stuckHorizon,
LimitCount: s.batchSize,
LimitCount: int32(s.batchSize),
})
}

@@ -280,17 +255,3 @@ func (s *rescuer) makeRetryDecision(ctx context.Context, internalJob *dbsqlc.Riv
}
return job.Attempt < max(int(internalJob.MaxAttempts), 0), nextRetry
}

func marshalAllErrors(errors []AttemptError) ([][]byte, error) {
results := make([][]byte, len(errors))

for i, attemptErr := range errors {
payload, err := json.Marshal(attemptErr)
if err != nil {
return nil, err
}
results[i] = payload
}

return results, nil
}