diff --git a/processor/processor.go b/processor/processor.go index 83a2cb8..ca32864 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -1,10 +1,12 @@ package processor import ( + "context" "errors" "fmt" "math/rand" "runtime/debug" + "sync" "time" camundaclientgo "github.com/citilinkru/camunda-client-go/v3" @@ -15,6 +17,11 @@ type Processor struct { client *camundaclientgo.Client options *Options logger func(err error) + + // shutdown support + workerGroup *sync.WaitGroup + ctx context.Context + cancel context.CancelFunc } // Options options for Processor @@ -45,11 +52,16 @@ func NewProcessor(client *camundaclientgo.Client, options *Options, logger func( options.WorkerId = fmt.Sprintf("worker-%d", rand.Int()) } + ctx, cancel := context.WithCancel(context.Background()) + workerGroup := new(sync.WaitGroup) + return &Processor{ - client: client, - options: options, - logger: logger, - } + client: client, + options: options, + logger: logger, + workerGroup: workerGroup, + ctx: ctx, + cancel: cancel} } // Handler a handler for external task @@ -91,7 +103,14 @@ func (c *Context) HandleFailure(query QueryHandleFailure) error { }) } -// AddHandler a add handler for external task +// Shutdown stop this processor and wait for running handlers to complete in-flight processing. +// The Processor cannot be reused after shutdown. +func (p *Processor) Shutdown() { + p.cancel() + p.workerGroup.Wait() +} + +// AddHandler register an external task handler and start pulling for work. Calling this after a Shutdown has no effect. func (p *Processor) AddHandler(topics []*camundaclientgo.QueryFetchAndLockTopic, handler Handler) { if topics != nil && p.options.LockDuration != 0 { for _, v := range topics { @@ -109,7 +128,7 @@ func (p *Processor) AddHandler(topics []*camundaclientgo.QueryFetchAndLockTopic, asyncResponseTimeout = &msValue } - go p.startPuller(camundaclientgo.QueryFetchAndLock{ + p.startPuller(camundaclientgo.QueryFetchAndLock{ WorkerId: p.options.WorkerId, MaxTasks: p.options.MaxTasks, UsePriority: p.options.UsePriority, @@ -128,29 +147,39 @@ func (p *Processor) startPuller(query camundaclientgo.QueryFetchAndLock, handler // create worker pool for i := 0; i < maxParallelTaskPerHandler; i++ { + p.workerGroup.Add(1) go p.runWorker(handler, tasksChan) } - retries := 0 - for { - tasks, err := p.client.ExternalTask.FetchAndLock(query) - if err != nil { - if retries < 60 { - retries += 1 + go func() { + retries := 0 + for { + select { + case <-p.ctx.Done(): + close(tasksChan) + return + default: + tasks, err := p.client.ExternalTask.FetchAndLock(query) + if err != nil { + if retries < 60 { + retries += 1 + } + p.logger(fmt.Errorf("failed pull: %w, sleeping: %d seconds", err, retries)) + time.Sleep(time.Duration(retries) * time.Second) + continue + } + retries = 0 + + for _, task := range tasks { + tasksChan <- task + } } - p.logger(fmt.Errorf("failed pull: %w, sleeping: %d seconds", err, retries)) - time.Sleep(time.Duration(retries) * time.Second) - continue - } - retries = 0 - - for _, task := range tasks { - tasksChan <- task } - } + }() } func (p *Processor) runWorker(handler Handler, tasksChan chan *camundaclientgo.ResLockedExternalTask) { + defer p.workerGroup.Done() for task := range tasksChan { p.handle(&Context{ Task: task,