Skip to content

Commit

Permalink
feat: add support for recovery callbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
acaloiaro authored and pconstantinou committed Jun 26, 2024
1 parent 93ad210 commit 584a182
Show file tree
Hide file tree
Showing 9 changed files with 243 additions and 41 deletions.
3 changes: 3 additions & 0 deletions backends/memory/memory_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ func (m *MemBackend) Start(ctx context.Context, h handler.Handler) (err error) {
queueCapacity = defaultMemQueueCapacity
}

h.RecoverCallback = m.config.RecoveryCallback

m.handlers.Store(h.Queue, h)
m.queues.Store(h.Queue, make(chan *jobs.Job, queueCapacity))

Expand Down Expand Up @@ -183,6 +185,7 @@ func (m *MemBackend) StartCron(ctx context.Context, cronSpec string, h handler.H
m.cancelFuncs = append(m.cancelFuncs, cancel)
m.mu.Unlock()
h.Queue = queue
h.RecoverCallback = m.config.RecoveryCallback

err = m.Start(ctx, h)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions backends/postgres/postgres_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,7 @@ func (p *PgBackend) Start(ctx context.Context, h handler.Handler) (err error) {
p.logger.Debug("starting job processing", slog.String("queue", h.Queue))
p.mu.Lock()
p.cancelFuncs = append(p.cancelFuncs, cancel)
h.RecoverCallback = p.config.RecoveryCallback
p.handlers[h.Queue] = h
p.mu.Unlock()

Expand Down Expand Up @@ -491,6 +492,7 @@ func (p *PgBackend) StartCron(ctx context.Context, cronSpec string, h handler.Ha

queue := internal.StripNonAlphanum(strcase.ToSnake(*cdStr))
h.Queue = queue
h.RecoverCallback = p.config.RecoveryCallback

ctx, cancel := context.WithCancel(ctx)
p.mu.Lock()
Expand Down
58 changes: 58 additions & 0 deletions backends/postgres/postgres_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1095,3 +1095,61 @@ func TestJobWithPastDeadline(t *testing.T) {
t.Errorf("job should have resulted in a status of 'failed', but its status is %s", status)
}
}

// TestHandlerRecoveryCallback verifies that the recovery callback configured with
// neoq.WithRecoveryCallback is invoked when a job handler panics. The test fails if the
// callback has not signalled within five seconds.
func TestHandlerRecoveryCallback(t *testing.T) {
	connString, _ := prepareAndCleanupDB(t)
	const queue = "testing"
	timeoutTimer := time.After(5 * time.Second)
	// Buffered and intentionally never closed: the callback runs on a goroutine that this
	// test does not control, and a send on a closed channel would panic.
	recoveryFuncCalled := make(chan bool, 1)

	ctx := context.Background()
	nq, err := neoq.New(ctx,
		neoq.WithBackend(postgres.Backend),
		postgres.WithConnectionString(connString),
		neoq.WithRecoveryCallback(func(_ context.Context, _ error) (err error) {
			// Non-blocking send: one signal is enough, and the handler goroutine must
			// never block on the test if the callback fires more than once.
			select {
			case recoveryFuncCalled <- true:
			default:
			}
			return
		}))
	if err != nil {
		t.Fatal(err)
	}
	defer nq.Shutdown(ctx)

	h := handler.New(queue, func(ctx context.Context) (err error) {
		panic("abort mission!")
	})
	h.WithOptions(
		handler.JobTimeout(500*time.Millisecond),
		handler.Concurrency(1),
	)

	// process jobs on the test queue
	err = nq.Start(ctx, h)
	if err != nil {
		t.Error(err)
	}

	jid, err := nq.Enqueue(ctx, &jobs.Job{
		Queue: queue,
		Payload: map[string]interface{}{
			"message": "hello world",
		},
	})
	if err != nil || jid == jobs.DuplicateJobID {
		t.Fatal("job was not enqueued. either it was duplicate or this error caused it:", err)
	}

	select {
	case <-timeoutTimer:
		// Fall through to the error report below; returning here would let the test pass
		// even though the recovery callback was never called.
		err = errors.New("timed out waiting for job") // nolint: goerr113
	case <-recoveryFuncCalled:
	}

	if err != nil {
		t.Error(err)
	}
}
3 changes: 3 additions & 0 deletions backends/redis/redis_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ func (b *RedisBackend) Enqueue(ctx context.Context, job *jobs.Job, jobOptions ..

// Start starts processing jobs with the specified queue and handler
func (b *RedisBackend) Start(_ context.Context, h handler.Handler) (err error) {
h.RecoverCallback = b.config.RecoveryCallback

b.mux.HandleFunc(h.Queue, func(ctx context.Context, t *asynq.Task) (err error) {
taskID := t.ResultWriter().TaskID()
var p map[string]any
Expand Down Expand Up @@ -279,6 +281,7 @@ func (b *RedisBackend) StartCron(ctx context.Context, cronSpec string, h handler

queue := internal.StripNonAlphanum(strcase.ToSnake(*cdStr))
h.Queue = queue
h.RecoverCallback = b.config.RecoveryCallback

err = b.Start(ctx, h)
if err != nil {
Expand Down
56 changes: 49 additions & 7 deletions backends/redis/redis_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,12 @@ import (
"time"

"github.com/acaloiaro/neoq"
"github.com/acaloiaro/neoq/backends"
"github.com/acaloiaro/neoq/handler"
"github.com/acaloiaro/neoq/internal"
"github.com/acaloiaro/neoq/jobs"
"github.com/acaloiaro/neoq/logging"
"github.com/acaloiaro/neoq/testutils"
"github.com/hibiken/asynq"
"github.com/stretchr/testify/suite"
)

const (
Expand Down Expand Up @@ -444,25 +442,69 @@ result_loop:
}
}

func TestSuite(t *testing.T) {
// TestHandlerRecoveryCallback enqueues a job whose handler panics and then waits up to
// five seconds for the recovery callback passed to neoq.WithRecoveryCallback to signal
// that it was called.
func TestHandlerRecoveryCallback(t *testing.T) {
const queue = "testing"
timeoutTimer := time.After(5 * time.Second)
// buffered so the callback's first send does not block before the test starts receiving
recoveryFuncCalled := make(chan bool, 1)
defer close(recoveryFuncCalled)
ctx := context.Background()

connString := os.Getenv("TEST_REDIS_URL")
if connString == "" {
t.Skip("Skipping: TEST_REDIS_URL not set")
return
}

password := os.Getenv("REDIS_PASSWORD")
ctx := context.Background()
nq, err := neoq.New(
ctx,
neoq.WithBackend(Backend),
neoq.WithLogLevel(logging.LogLevelDebug),
WithAddr(connString),
WithPassword(password),
WithShutdownTimeout(500*time.Millisecond))
neoq.WithRecoveryCallback(func(ctx context.Context, _ error) (err error) {
recoveryFuncCalled <- true
return
}),
)
if err != nil {
t.Fatal(err)
}
defer nq.Shutdown(ctx)

// the handler always panics so that the recovery path is exercised
h := handler.New(queue, func(ctx context.Context) (err error) {
panic("abort mission!")
})
h.WithOptions(
handler.JobTimeout(500*time.Millisecond),
handler.Concurrency(1),
)

// process jobs on the test queue
err = nq.Start(ctx, h)
if err != nil {
t.Error(err)
}

jid, err := nq.Enqueue(ctx, &jobs.Job{
Queue: queue,
Payload: map[string]interface{}{
"message": fmt.Sprintf("hello world %d", internal.RandInt(10000000)),
},
})
if err != nil || jid == jobs.DuplicateJobID {
t.Fatal("job was not enqueued. either it was duplicate or this error caused it:", err)
}

s := backends.NewNeoQTestSuite(nq)
suite.Run(t, s)
select {
case <-timeoutTimer:
// NOTE(review): the return below exits before the t.Error check at the end of the
// function, so a timeout is not reported as a test failure — confirm this is intended.
err = errors.New("timed out waiting for job") // nolint: goerr113
return
case <-recoveryFuncCalled:
break
}

if err != nil {
t.Error(err)
}
}
8 changes: 4 additions & 4 deletions gomod2nix.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ schema = 3
version = "v0.0.0-20221227161230-091c0ba34f0a"
hash = "sha256-rBtUw15WPPDp2eulHXH5e2zCIed1OPFYwlCpgDOnGRM="
[mod."github.com/jackc/pgx/v5"]
version = "v5.3.1"
hash = "sha256-0v6gXZIirv80mlnUx3ycxB2/TLvv3rUnm98Ke1ZjYDQ="
version = "v5.5.4"
hash = "sha256-T4nYUbDDiyN7v6BRhEkPJ9slatzUMrEyoGAyjfK9syI="
[mod."github.com/jackc/puddle/v2"]
version = "v2.2.0"
hash = "sha256-S9Ldac+a4auQt99hToXZ/WSuUhcEk/A5aDgQAb48B8M="
version = "v2.2.1"
hash = "sha256-Edf8SLT/8l+xfHm9IjUGxs1MHtic2VgRyfqb6OzGA9k="
[mod."github.com/jsuar/go-cron-descriptor"]
version = "v0.1.0"
hash = "sha256-zbADYCEzVcOlvemQa+Ly+6mRcCu3qsFxyeTd9jzZj38="
Expand Down
69 changes: 49 additions & 20 deletions handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"context"
"errors"
"fmt"
"log"
"runtime"
"runtime/debug"
"strings"
"time"

"golang.org/x/exp/slog"
)

const (
Expand All @@ -24,13 +25,23 @@ var (
// Func is a function that Handlers execute for every Job on a queue
type Func func(ctx context.Context) error

// RecoveryCallback is a function to be called when fatal errors/panics occur in Handlers.
//
// It receives the context the handler was running with and an error describing the
// recovered panic. Returning a non-nil error reports that the recovery itself failed.
type RecoveryCallback func(ctx context.Context, err error) error

// DefaultRecoveryCallback is the function that gets called by default when handlers panic.
//
// It logs the error describing the panic together with the current stack trace, and
// always returns nil.
func DefaultRecoveryCallback(_ context.Context, err error) error {
	slog.Error(
		"recovering from a panic in the job handler",
		// Include the panic error so the log line says what went wrong, not only where.
		slog.Any("error", err),
		slog.String("stack", string(debug.Stack())),
	)

	return nil
}

// Handler handles jobs on a queue. It bundles the function run for each job with the
// settings that control how that function is executed.
type Handler struct {
Handle Func
Concurrency int
JobTimeout time.Duration
QueueCapacity int64
Queue string
Handle Func // function executed for every job on the queue
Concurrency int
JobTimeout time.Duration // timeout that Exec applies to each execution of the handler
QueueCapacity int64
Queue string // name of the queue this handler processes
RecoverCallback RecoveryCallback // function called when fatal handler errors occur
}

// Option is function that sets optional configuration for Handlers
Expand Down Expand Up @@ -75,6 +86,13 @@ func Queue(queue string) Option {
}
}

// RecoverCallback returns an Option that installs f as the Handler's recovery function,
// i.e. the function invoked when a fatal error (panic) occurs while handling a job.
func RecoverCallback(f RecoveryCallback) Option {
	return Option(func(target *Handler) {
		target.RecoverCallback = f
	})
}

// New creates new queue handlers for specific queues. This function is to be used to create new Handlers for
// non-periodic jobs (most jobs). Use [NewPeriodic] to initialize handlers for periodic jobs.
func New(queue string, f Func, opts ...Option) (h Handler) {
Expand Down Expand Up @@ -104,6 +122,24 @@ func NewPeriodic(f Func, opts ...Option) (h Handler) {
return
}

// errorFromPanic converts a recovered panic value into an error that, when possible,
// names the source location that triggered the panic.
//
// It must be called while the panicking stack is still live, i.e. from the deferred
// function that called recover. x is the value returned by recover.
//
// The location is found by walking the call stack: everything up to and including
// runtime.gopanic belongs to the recovery machinery (this function and the deferred
// function), and the first non-runtime frame after it is the code that panicked. Runtime
// frames after gopanic are skipped because runtime-raised panics (nil map writes, bad
// indexes, nil dereferences) pass through runtime helpers before reaching user code.
//
// If no such frame is found the error carries only the panic value.
func errorFromPanic(x any) (err error) {
	const maxFrames = 64

	pcs := make([]uintptr, maxFrames)
	// Skip runtime.Callers and errorFromPanic themselves.
	n := runtime.Callers(2, pcs) //nolint: gomnd
	frames := runtime.CallersFrames(pcs[:n])

	pastPanic := false
	for {
		frame, more := frames.Next()

		isRuntime := strings.HasPrefix(frame.Function, "runtime.") ||
			strings.HasPrefix(frame.Function, "internal/runtime/")

		switch {
		case frame.Function == "runtime.gopanic":
			pastPanic = true
		case pastPanic && !isRuntime && frame.File != "":
			// Include the file and line number of the code that triggered the panic.
			return fmt.Errorf("panic [%s:%d]: %v", frame.File, frame.Line, x) // nolint: goerr113
		}

		if !more {
			break
		}
	}

	return fmt.Errorf("panic: %v", x) // nolint: goerr113
}

// Exec executes handler functions with a concrete timeout
func Exec(ctx context.Context, handler Handler) (err error) {
timeoutCtx, cancel := context.WithTimeout(ctx, handler.JobTimeout)
Expand All @@ -115,22 +151,15 @@ func Exec(ctx context.Context, handler Handler) (err error) {
go func(ctx context.Context) {
defer func() {
if x := recover(); x != nil {
log.Printf("recovering from a panic in the job handler:\n%s", string(debug.Stack()))
_, file, line, ok := runtime.Caller(1) // skip the first frame (panic itself)
if ok && strings.Contains(file, "runtime/") {
// The panic came from the runtime, most likely due to incorrect
// map/slice usage. The parent frame should have the real trigger.
_, file, line, ok = runtime.Caller(2) //nolint: gomnd
}

// Include the file and line number info in the error, if runtime.Caller returned ok.
if ok {
errCh <- fmt.Errorf("panic [%s:%d]: %v", file, line, x) // nolint: goerr113
} else {
errCh <- fmt.Errorf("panic: %v", x) // nolint: goerr113
err = errorFromPanic(x)
errCh <- err
if handler.RecoverCallback != nil {
err = handler.RecoverCallback(ctx, err)
if err != nil {
slog.Error("handler recovery callback also failed while recovering from panic", slog.Any("error", err))
}
}
}

done <- true
}()

Expand Down
31 changes: 21 additions & 10 deletions neoq.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,17 @@ var ErrBackendNotSpecified = errors.New("a backend must be specified")
// per-handler basis.
type Config struct {
BackendInitializer BackendInitializer
BackendAuthPassword string // password with which to authenticate to the backend
BackendConcurrency int // total number of backend processes available to process jobs
ConnectionString string // a string containing connection details for the backend
JobCheckInterval time.Duration // the interval of time between checking for new future/retry jobs
FutureJobWindow time.Duration // time duration between current time and job.RunAfter that goroutines schedule for future jobs
IdleTransactionTimeout int // the number of milliseconds PgBackend transaction may idle before the connection is killed
ShutdownTimeout time.Duration // duration to wait for jobs to finish during shutdown
SynchronousCommit bool // Postgres: Enable synchronous commits (increases durability, decreases performance)
LogLevel logging.LogLevel // the log level of the default logger
PGConnectionTimeout time.Duration // the amount of time to wait for a connection to become available before timing out
BackendAuthPassword string // password with which to authenticate to the backend
BackendConcurrency int // total number of backend processes available to process jobs
ConnectionString string // a string containing connection details for the backend
JobCheckInterval time.Duration // the interval of time between checking for new future/retry jobs
FutureJobWindow time.Duration // time duration between current time and job.RunAfter that future jobs get scheduled
IdleTransactionTimeout int // number of milliseconds PgBackend transaction may idle before the connection is killed
ShutdownTimeout time.Duration // duration to wait for jobs to finish during shutdown
SynchronousCommit bool // Postgres: Enable synchronous commits (increases durability, decreases performance)
LogLevel logging.LogLevel // the log level of the default logger
PGConnectionTimeout time.Duration // the amount of time to wait for a connection to become available before timing out
RecoveryCallback handler.RecoveryCallback // the recovery handler applied to all Handlers executed by the associated Neoq instance
}

// JobOptions
Expand All @@ -58,6 +59,7 @@ func NewConfig() *Config {
return &Config{
FutureJobWindow: DefaultFutureJobWindow,
JobCheckInterval: DefaultJobCheckInterval,
RecoveryCallback: handler.DefaultRecoveryCallback,
}
}

Expand Down Expand Up @@ -126,6 +128,15 @@ func WithBackend(initializer BackendInitializer) ConfigOption {
}
}

// WithRecoveryCallback returns a ConfigOption that stores cb as the Config's recovery
// callback: the function to be called when fatal errors occur in job Handlers.
//
// Typical uses are forwarding errors to an error logger and collecting error metrics.
func WithRecoveryCallback(cb handler.RecoveryCallback) ConfigOption {
	return ConfigOption(func(cfg *Config) {
		cfg.RecoveryCallback = cb
	})
}

// WithJobCheckInterval configures the duration of time between checking for future jobs
func WithJobCheckInterval(interval time.Duration) ConfigOption {
return func(c *Config) {
Expand Down
Loading

0 comments on commit 584a182

Please sign in to comment.