Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement hard shutdown #77

Merged
merged 4 commits into from
Feb 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ jobs:
ports: ['7419:7419']

steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v2
uses: actions/setup-go@v4
with:
go-version: 1.17

- name: Install linter
run: curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.44.0
run: curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.56.1

- name: Test
run: make lint test
11 changes: 11 additions & 0 deletions Changes.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# faktory\_worker\_go

## 1.7.0

- Implement hard shutdown timeout of 25 seconds. [#76]
Your job funcs should implement `context` package semantics.
If you use `Manager.Run()`, FWG will now gracefully shutdown.
After a default delay of 25 seconds, FWG will cancel the root Context which should quickly cancel any lingering jobs running under that Manager.
If your jobs run long and do not respond to context cancellation, you risk orphaning any jobs in-progress.
They will linger on the Busy tab until the job's `reserve_for` timeout.

Please also note that `RunWithContext` added in `1.6.0` does not implement the shutdown delay but the README example contains the code to implement it.

## 1.6.0

- Upgrade to Go 1.17 and Faktory 1.6.0.
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
test:
go test -v ./...
go test ./...

work:
go run test/main.go
Expand Down
23 changes: 14 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@

![travis](https://travis-ci.org/contribsys/faktory_worker_go.svg?branch=master)

This repository provides a Faktory worker process for Go apps. This
worker process fetches background jobs from the Faktory server and processes them.
This repository provides a Faktory worker process for Go apps.
This worker process fetches background jobs from the Faktory server and processes them.

How is this different from all the other Go background worker libraries?
They all use Redis or another "dumb" datastore. This library is far
simpler because the Faktory server implements most of the data storage, retry logic,
Web UI, etc.
They all use Redis or another "dumb" datastore.
This library is far simpler because the Faktory server implements most of the data storage, retry logic, Web UI, etc.

# Installation

Expand Down Expand Up @@ -55,6 +54,8 @@ func main() {

// use up to N goroutines to execute jobs
mgr.Concurrency = 20
// wait up to 25 seconds to let jobs in progress finish
mgr.ShutdownTimeout = 25 * time.Second

// pull jobs from these queues, in this order of precedence
mgr.ProcessStrictPriorityQueues("critical", "default", "bulk")
Expand Down Expand Up @@ -99,6 +100,8 @@ func main() {

// use up to N goroutines to execute jobs
mgr.Concurrency = 20
// wait up to 25 seconds to let jobs in progress finish
mgr.ShutdownTimeout = 25 * time.Second

// pull jobs from these queues, in this order of precedence
mgr.ProcessStrictPriorityQueues("critical", "default", "bulk")
Expand All @@ -116,6 +119,7 @@ func main() {
stopSignals := []os.Signal{
syscall.SIGTERM,
syscall.SIGINT,
// TODO Implement the TSTP signal to call mgr.Quiet()
}
stop := make(chan os.Signal, len(stopSignals))
for _, s := range stopSignals {
Expand All @@ -127,17 +131,18 @@ func main() {
case <-ctx.Done():
return
case <-stop:
cancel()
break
}
}

_ = time.AfterFunc(mgr.ShutdownTimeout, cancel)
_ = mgr.Terminate(true) // never returns
}()

<-ctx.Done()
}
```

See `test/main.go` for a working example.

# Testing

`faktory_worker_go` provides helpers that allow you to configure tests to execute jobs inline if you prefer. In this example, the application has defined its own wrapper function for `client.Push`.
Expand Down Expand Up @@ -225,7 +230,7 @@ See [the wiki](https://github.com/contribsys/faktory/wiki) for details.

# Author

Mike Perham, @getajobmike, @contribsys
Mike Perham, https://ruby.social/@getajobmike

# License

Expand Down
16 changes: 7 additions & 9 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,13 @@ var (
// because execution should be orthogonal to
// most of the Job payload contents.
//
// func myJob(ctx context.Context, args ...interface{}) error {
// helper := worker.HelperFor(ctx)
// jid := helper.Jid()
//
// helper.With(func(cl *faktory.Client) error {
// cl.Push("anotherJob", 4, "arg")
// })
// func myJob(ctx context.Context, args ...interface{}) error {
// helper := worker.HelperFor(ctx)
// jid := helper.Jid()
//
// helper.With(func(cl *faktory.Client) error {
// cl.Push("anotherJob", 4, "arg")
// })
type Helper interface {
Jid() string
JobType() string
Expand Down Expand Up @@ -127,8 +126,7 @@ func HelperFor(ctx context.Context) Helper {
return nil
}

func jobContext(pool *faktory.Pool, job *faktory.Job) context.Context {
ctx := context.Background()
func jobContext(ctx context.Context, pool *faktory.Pool, job *faktory.Job) context.Context {
ctx = context.WithValue(ctx, poolKey, pool)
ctx = context.WithValue(ctx, jobKey, job)
return ctx
Expand Down
13 changes: 9 additions & 4 deletions context_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package faktory_worker

import (
"context"
"errors"
"regexp"
"testing"
"time"

faktory "github.com/contribsys/faktory/client"
"github.com/stretchr/testify/assert"
Expand All @@ -21,15 +23,16 @@ func TestSimpleContext(t *testing.T) {
//cl, err := faktory.Open()
//assert.NoError(t, err)
//cl.Push(job)

ctx := jobContext(pool, job)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
ctx = jobContext(ctx, pool, job)
help := HelperFor(ctx)
assert.Equal(t, help.Jid(), job.Jid)
assert.Empty(t, help.Bid())
assert.Equal(t, "something", help.JobType())

_, ok := ctx.Deadline()
assert.False(t, ok)
assert.True(t, ok)

//assert.NoError(t, ctx.TrackProgress(45, "Working....", nil))

Expand All @@ -51,7 +54,9 @@ func TestBatchContext(t *testing.T) {
job.SetCustom("track", 1)
job.SetCustom("bid", "nosuchbatch")

ctx := jobContext(pool, job)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
ctx = jobContext(ctx, pool, job)
help := HelperFor(ctx)
assert.Equal(t, help.Jid(), job.Jid)
assert.Equal(t, "nosuchbatch", help.Bid())
Expand Down
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
module github.com/contribsys/faktory_worker_go

go 1.20

toolchain go1.21.4
go 1.21

require (
github.com/contribsys/faktory v1.8.0
Expand Down
44 changes: 31 additions & 13 deletions manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"strconv"
"sync"
"time"

faktory "github.com/contribsys/faktory/client"
)
Expand All @@ -16,11 +17,12 @@ import (
type Manager struct {
mut sync.Mutex

Concurrency int
Logger Logger
ProcessWID string
Labels []string
Pool *faktory.Pool
Concurrency int
Logger Logger
ProcessWID string
Labels []string
Pool *faktory.Pool
ShutdownTimeout time.Duration

queues []string
middleware []MiddlewareFunc
Expand All @@ -31,6 +33,7 @@ type Manager struct {
shutdownWaiter *sync.WaitGroup
jobHandlers map[string]Handler
eventHandlers map[lifecycleEventType][]LifecycleEventHandler
cancelFunc context.CancelFunc

// This only needs to be computed once. Store it here to keep things fast.
weightedPriorityQueuesEnabled bool
Expand All @@ -57,10 +60,10 @@ func (mgr *Manager) isRegistered(name string) bool {
}

// dispatch immediately executes a job, including all middleware.
func (mgr *Manager) dispatch(job *faktory.Job) error {
func (mgr *Manager) dispatch(ctx context.Context, job *faktory.Job) error {
perform := mgr.jobHandlers[job.Type]

return dispatch(mgr.middleware, jobContext(mgr.Pool, job), job, perform)
return dispatch(jobContext(ctx, mgr.Pool, job), mgr.middleware, job, perform)
}

// InlineDispatch is designed for testing. It immediate executes a job, including all middleware,
Expand All @@ -75,7 +78,7 @@ func (mgr *Manager) InlineDispatch(job *faktory.Job) error {
return fmt.Errorf("couldn't set up worker process for inline dispatch - %w", err)
}

err = mgr.dispatch(job)
err = mgr.dispatch(context.Background(), job)
if err != nil {
return fmt.Errorf("job was dispatched inline but failed. Job type %s, with args %+v - %w", job.Type, job.Args, err)
}
Expand Down Expand Up @@ -117,8 +120,14 @@ func (mgr *Manager) Terminate(reallydie bool) {
mgr.Logger.Info("Shutting down...")
mgr.state = "terminate"
close(mgr.done)

if mgr.cancelFunc != nil {
// cancel any jobs which are lingering
time.AfterFunc(mgr.ShutdownTimeout, mgr.cancelFunc)
}
mgr.fireEvent(Shutdown)
mgr.shutdownWaiter.Wait()
mgr.shutdownWaiter.Wait() // can't pass this point until all jobs are done

mgr.Pool.Close()
mgr.Logger.Info("Goodbye")
if reallydie {
Expand All @@ -134,6 +143,12 @@ func NewManager() *Manager {
Labels: []string{"golang-" + Version},
Pool: nil,

// best practice is to give jobs 25 seconds to finish their work
// and then use the last 5 seconds to force any lingering jobs to
// stop by closing their Context. Many cloud services default to a
// hard 30 second timeout beforing KILLing the process.
ShutdownTimeout: 25 * time.Second,

state: "",
queues: []string{"default"},
done: make(chan interface{}),
Expand Down Expand Up @@ -182,7 +197,7 @@ func (mgr *Manager) setUpWorkerProcess() error {
// If the context is present then os signals will be ignored, the context must be canceled for the method to return
// after running.
func (mgr *Manager) RunWithContext(ctx context.Context) error {
err := mgr.boot()
err := mgr.boot(ctx)
if err != nil {
return err
}
Expand All @@ -191,7 +206,7 @@ func (mgr *Manager) RunWithContext(ctx context.Context) error {
return nil
}

func (mgr *Manager) boot() error {
func (mgr *Manager) boot(ctx context.Context) error {
err := mgr.setUpWorkerProcess()
if err != nil {
return err
Expand All @@ -203,7 +218,7 @@ func (mgr *Manager) boot() error {
mgr.Logger.Infof("faktory_worker_go %s PID %d now ready to process jobs", Version, os.Getpid())
mgr.Logger.Debugf("Using Faktory Client API %s", faktory.Version)
for i := 0; i < mgr.Concurrency; i++ {
go process(mgr, i)
go process(ctx, mgr, i)
}

return nil
Expand All @@ -212,7 +227,10 @@ func (mgr *Manager) boot() error {
// Run starts processing jobs.
// This method does not return unless an error is encountered while starting.
func (mgr *Manager) Run() error {
err := mgr.boot()
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
mgr.cancelFunc = cancel
err := mgr.boot(ctx)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ func (mgr *Manager) Use(middleware ...MiddlewareFunc) {
mgr.middleware = append(mgr.middleware, middleware...)
}

func dispatch(chain []MiddlewareFunc, ctx context.Context, job *faktory.Job, perform Handler) error {
func dispatch(ctx context.Context, chain []MiddlewareFunc, job *faktory.Job, perform Handler) error {
if len(chain) == 0 {
return perform(ctx, job)
}

link := chain[0]
rest := chain[1:]
return link(ctx, job, func(ctx context.Context) error {
return dispatch(rest, ctx, job, perform)
return dispatch(ctx, rest, job, perform)
})
}
7 changes: 5 additions & 2 deletions middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package faktory_worker
import (
"context"
"testing"
"time"

faktory "github.com/contribsys/faktory/client"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -34,12 +35,14 @@ func TestMiddleware(t *testing.T) {
return nil
}

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
job := faktory.NewJob("blah", 1, 2)
ctx := jobContext(mgr.Pool, job)
ctx = jobContext(ctx, mgr.Pool, job)
assert.Nil(t, ctx.Value(EXAMPLE))
assert.EqualValues(t, 0, counter)

err = dispatch(mgr.middleware, ctx, job, blahFunc)
err = dispatch(ctx, mgr.middleware, job, blahFunc)

assert.NoError(t, err)
assert.EqualValues(t, 1, counter)
Expand Down
Loading
Loading