From 0590176a23c056b274bd30ff7cec8f8a203f0665 Mon Sep 17 00:00:00 2001 From: zeripath Date: Sun, 2 May 2021 08:22:30 +0100 Subject: [PATCH] Only use boost workers for leveldb shadow queues (#15696) * The leveldb shadow queue of a persistable channel queue should always start with 0 workers and just use boost to add additional workers if necessary. * create a zero boost so that if there are no workers in a pool - boost to start the workers * actually set timeout appropriately on boosted workers Signed-off-by: Andrew Thornton --- modules/queue/queue_disk_channel.go | 6 ++-- modules/queue/unique_queue_disk_channel.go | 10 +++--- modules/queue/workerpool.go | 42 ++++++++++++++++++++-- 3 files changed, 48 insertions(+), 10 deletions(-) diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go index 433435c3015f2..801fd8a12235c 100644 --- a/modules/queue/queue_disk_channel.go +++ b/modules/queue/queue_disk_channel.go @@ -75,10 +75,10 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) ( BatchLength: config.BatchLength, BlockTimeout: 1 * time.Second, BoostTimeout: 5 * time.Minute, - BoostWorkers: 5, - MaxWorkers: 6, + BoostWorkers: 1, + MaxWorkers: 5, }, - Workers: 1, + Workers: 0, Name: config.Name + "-level", }, DataDir: config.DataDir, diff --git a/modules/queue/unique_queue_disk_channel.go b/modules/queue/unique_queue_disk_channel.go index 4a69b43eae0e3..47c4f2bdd574d 100644 --- a/modules/queue/unique_queue_disk_channel.go +++ b/modules/queue/unique_queue_disk_channel.go @@ -73,12 +73,12 @@ func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interfac WorkerPoolConfiguration: WorkerPoolConfiguration{ QueueLength: config.QueueLength, BatchLength: config.BatchLength, - BlockTimeout: 0, - BoostTimeout: 0, - BoostWorkers: 0, - MaxWorkers: 1, + BlockTimeout: 1 * time.Second, + BoostTimeout: 5 * time.Minute, + BoostWorkers: 1, + MaxWorkers: 5, }, - Workers: 1, + Workers: 0, Name: config.Name + "-level", }, DataDir: config.DataDir, diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go index 45378e3dae145..0f15ccac9efd7 100644 --- a/modules/queue/workerpool.go +++ b/modules/queue/workerpool.go @@ -70,7 +70,11 @@ func (p *WorkerPool) Push(data Data) { atomic.AddInt64(&p.numInQueue, 1) p.lock.Lock() if p.blockTimeout > 0 && p.boostTimeout > 0 && (p.numberOfWorkers <= p.maxNumberOfWorkers || p.maxNumberOfWorkers < 0) { - p.lock.Unlock() + if p.numberOfWorkers == 0 { + p.zeroBoost() + } else { + p.lock.Unlock() + } p.pushBoost(data) } else { p.lock.Unlock() @@ -78,6 +82,40 @@ func (p *WorkerPool) Push(data Data) { } } +func (p *WorkerPool) zeroBoost() { + ctx, cancel := context.WithCancel(p.baseCtx) + mq := GetManager().GetManagedQueue(p.qid) + boost := p.boostWorkers + if (boost+p.numberOfWorkers) > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0 { + boost = p.maxNumberOfWorkers - p.numberOfWorkers + } + if mq != nil { + log.Warn("WorkerPool: %d (for %s) has zero workers - adding %d temporary workers for %s", p.qid, mq.Name, boost, p.boostTimeout) + + start := time.Now() + pid := mq.RegisterWorkers(boost, start, true, start.Add(p.boostTimeout), cancel, false) + go func() { + select { + case <-ctx.Done(): + case <-time.After(p.boostTimeout): + } + mq.RemoveWorkers(pid) + cancel() + }() + } else { + log.Warn("WorkerPool: %d has zero workers - adding %d temporary workers for %s", p.qid, p.boostWorkers, p.boostTimeout) + go func() { + select { + case <-ctx.Done(): + case <-time.After(p.boostTimeout): + } + cancel() + }() + } + p.lock.Unlock() + p.addWorkers(ctx, boost) +} + func (p *WorkerPool) pushBoost(data Data) { select { case p.dataChan <- data: @@ -112,7 +150,7 @@ func (p *WorkerPool) pushBoost(data Data) { log.Warn("WorkerPool: %d (for %s) Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, mq.Name, ourTimeout, boost, p.boostTimeout, p.blockTimeout) start := time.Now() - pid := mq.RegisterWorkers(boost, start, false, start, cancel, false) + pid := mq.RegisterWorkers(boost, start, true, start.Add(p.boostTimeout), cancel, false) go func() { <-ctx.Done() mq.RemoveWorkers(pid)