Skip to content

Commit

Permalink
Support for shutting down external task processor
Browse files Browse the repository at this point in the history
mlaflamm authored and liderman committed Jun 8, 2022

Verified

This commit was signed with the committer’s verified signature.
not-an-aardvark Teddy Katz
1 parent 29c23b8 commit 8ae3615
Showing 1 changed file with 50 additions and 21 deletions.
71 changes: 50 additions & 21 deletions processor/processor.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package processor

import (
"context"
"errors"
"fmt"
"math/rand"
"runtime/debug"
"sync"
"time"

camundaclientgo "github.com/citilinkru/camunda-client-go/v3"
@@ -15,6 +17,11 @@ type Processor struct {
client *camundaclientgo.Client
options *Options
logger func(err error)

// shutdown support
workerGroup *sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
}

// Options options for Processor
@@ -45,11 +52,16 @@ func NewProcessor(client *camundaclientgo.Client, options *Options, logger func(
options.WorkerId = fmt.Sprintf("worker-%d", rand.Int())
}

ctx, cancel := context.WithCancel(context.Background())
workerGroup := new(sync.WaitGroup)

return &Processor{
client: client,
options: options,
logger: logger,
}
client: client,
options: options,
logger: logger,
workerGroup: workerGroup,
ctx: ctx,
cancel: cancel}
}

// Handler a handler for external task
@@ -91,7 +103,14 @@ func (c *Context) HandleFailure(query QueryHandleFailure) error {
})
}

// AddHandler a add handler for external task
// Shutdown stop this processor and wait for running handlers to complete in-flight processing.
// The Processor cannot be reused after shutdown.
func (proc *Processor) Shutdown() {
	// Cancel the shared context so every puller goroutine stops fetching new
	// work, then block until all workers have drained their in-flight tasks.
	proc.cancel()
	proc.workerGroup.Wait()
}

// AddHandler register an external task handler and start pulling for work. Calling this after a Shutdown has no effect.
func (p *Processor) AddHandler(topics []*camundaclientgo.QueryFetchAndLockTopic, handler Handler) {
if topics != nil && p.options.LockDuration != 0 {
for _, v := range topics {
@@ -109,7 +128,7 @@ func (p *Processor) AddHandler(topics []*camundaclientgo.QueryFetchAndLockTopic,
asyncResponseTimeout = &msValue
}

go p.startPuller(camundaclientgo.QueryFetchAndLock{
p.startPuller(camundaclientgo.QueryFetchAndLock{
WorkerId: p.options.WorkerId,
MaxTasks: p.options.MaxTasks,
UsePriority: p.options.UsePriority,
@@ -128,29 +147,39 @@ func (p *Processor) startPuller(query camundaclientgo.QueryFetchAndLock, handler

// create worker pool
for i := 0; i < maxParallelTaskPerHandler; i++ {
p.workerGroup.Add(1)
go p.runWorker(handler, tasksChan)
}

retries := 0
for {
tasks, err := p.client.ExternalTask.FetchAndLock(query)
if err != nil {
if retries < 60 {
retries += 1
go func() {
retries := 0
for {
select {
case <-p.ctx.Done():
close(tasksChan)
return
default:
tasks, err := p.client.ExternalTask.FetchAndLock(query)
if err != nil {
if retries < 60 {
retries += 1
}
p.logger(fmt.Errorf("failed pull: %w, sleeping: %d seconds", err, retries))
time.Sleep(time.Duration(retries) * time.Second)
continue
}
retries = 0

for _, task := range tasks {
tasksChan <- task
}
}
p.logger(fmt.Errorf("failed pull: %w, sleeping: %d seconds", err, retries))
time.Sleep(time.Duration(retries) * time.Second)
continue
}
retries = 0

for _, task := range tasks {
tasksChan <- task
}
}
}()
}

func (p *Processor) runWorker(handler Handler, tasksChan chan *camundaclientgo.ResLockedExternalTask) {
defer p.workerGroup.Done()
for task := range tasksChan {
p.handle(&Context{
Task: task,

0 comments on commit 8ae3615

Please sign in to comment.