From b1dd30047d084777e8005d08965780e2d36930bc Mon Sep 17 00:00:00 2001
From: Andrew Thornton <art27@cantab.net>
Date: Mon, 7 Feb 2022 17:21:20 +0000
Subject: [PATCH 1/3] Restart zero worker if there is still work to do

It is possible for the zero worker to timeout before all the work is finished.
This may mean that work may take a long time to complete because a worker will only
be induced on repushing.

Fix #18607

Signed-off-by: Andrew Thornton <art27@cantab.net>
---
 modules/queue/workerpool.go | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go
index 20108d35886ca..39ea59b7b1c21 100644
--- a/modules/queue/workerpool.go
+++ b/modules/queue/workerpool.go
@@ -115,6 +115,9 @@ func (p *WorkerPool) hasNoWorkerScaling() bool {
 	return p.numberOfWorkers == 0 && (p.boostTimeout == 0 || p.boostWorkers == 0 || p.maxNumberOfWorkers == 0)
 }
 
+// zeroBoost will add a temporary boost worker for a no worker queue
+// p.lock must be locked at the start of this function BUT it will be unlocked by the end of this function
+// (This is because addWorkers has to be called whilst unlocked)
 func (p *WorkerPool) zeroBoost() {
 	ctx, cancel := context.WithTimeout(p.baseCtx, p.boostTimeout)
 	mq := GetManager().GetManagedQueue(p.qid)
@@ -316,6 +319,17 @@ func (p *WorkerPool) addWorkers(ctx context.Context, cancel context.CancelFunc,
 				}
 				p.pause()
 			}
+			select {
+			case <-p.baseCtx.Done():
+				// this worker queue is shut-down don't reboost
+			default:
+				if p.numberOfWorkers == 0 && atomic.LoadInt64(&p.numInQueue) > 0 {
+					// OK there are no workers but... there's still work to be done -> Reboost
+					p.zeroBoost()
+					// p.lock will be unlocked by zeroBoost
+					return
+				}
+			}
 			p.lock.Unlock()
 		}()
 	}

From 71fc12e3186682bf5d24e3576b2c2da9b2f6cdd3 Mon Sep 17 00:00:00 2001
From: Andrew Thornton <art27@cantab.net>
Date: Mon, 7 Feb 2022 18:12:41 +0000
Subject: [PATCH 2/3] Also handle ErrInQueue properly

Signed-off-by: Andrew Thornton <art27@cantab.net>
---
 services/mirror/mirror.go | 12 +++---------
 1 file changed, 3 insertions(+), 9 deletions(-)

diff --git a/services/mirror/mirror.go b/services/mirror/mirror.go
index 6f285ec467c63..015743920c4e6 100644
--- a/services/mirror/mirror.go
+++ b/services/mirror/mirror.go
@@ -89,17 +89,11 @@ func Update(ctx context.Context, pullLimit, pushLimit int) error {
 		default:
 		}
 
-		// Check if this request is already in the queue
-		has, err := mirrorQueue.Has(&item)
-		if err != nil {
-			return err
-		}
-		if has {
-			return nil
-		}
-
 		// Push to the Queue
 		if err := mirrorQueue.Push(&item); err != nil {
+			if err == queue.ErrAlreadyInQueue {
+				return nil
+			}
 			return err
 		}
 

From a7ae4d767e9ff039300884ef50b0c717b63facdc Mon Sep 17 00:00:00 2001
From: Andrew Thornton <art27@cantab.net>
Date: Mon, 7 Feb 2022 18:23:27 +0000
Subject: [PATCH 3/3] ensure requested is reset between pull and push mirrors

Signed-off-by: Andrew Thornton <art27@cantab.net>
---
 services/mirror/mirror.go | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git a/services/mirror/mirror.go b/services/mirror/mirror.go
index 015743920c4e6..5639a08f96401 100644
--- a/services/mirror/mirror.go
+++ b/services/mirror/mirror.go
@@ -59,11 +59,13 @@ func Update(ctx context.Context, pullLimit, pushLimit int) error {
 
 	handler := func(idx int, bean interface{}, limit int) error {
 		var item SyncRequest
+		var repo *repo_model.Repository
 		if m, ok := bean.(*repo_model.Mirror); ok {
 			if m.Repo == nil {
 				log.Error("Disconnected mirror found: %d", m.ID)
 				return nil
 			}
+			repo = m.Repo
 			item = SyncRequest{
 				Type:   PullMirrorType,
 				RepoID: m.RepoID,
@@ -73,6 +75,7 @@ func Update(ctx context.Context, pullLimit, pushLimit int) error {
 				log.Error("Disconnected push-mirror found: %d", m.ID)
 				return nil
 			}
+			repo = m.Repo
 			item = SyncRequest{
 				Type:   PushMirrorType,
 				RepoID: m.RepoID,
@@ -92,6 +95,11 @@ func Update(ctx context.Context, pullLimit, pushLimit int) error {
 		// Push to the Queue
 		if err := mirrorQueue.Push(&item); err != nil {
 			if err == queue.ErrAlreadyInQueue {
+				if item.Type == PushMirrorType {
+					log.Trace("PushMirrors for %-v already queued for sync", repo)
+				} else {
+					log.Trace("PullMirrors for %-v already queued for sync", repo)
+				}
 				return nil
 			}
 			return err
@@ -104,23 +112,29 @@ func Update(ctx context.Context, pullLimit, pushLimit int) error {
 		return nil
 	}
 
+	pullMirrorsRequested := 0
 	if pullLimit != 0 {
+		requested = 0
 		if err := repo_model.MirrorsIterate(func(idx int, bean interface{}) error {
 			return handler(idx, bean, pullLimit)
 		}); err != nil && err != errLimit {
 			log.Error("MirrorsIterate: %v", err)
 			return err
 		}
+		pullMirrorsRequested, requested = requested, 0
 	}
+	pushMirrorsRequested := 0
 	if pushLimit != 0 {
+		requested = 0
 		if err := repo_model.PushMirrorsIterate(func(idx int, bean interface{}) error {
 			return handler(idx, bean, pushLimit)
 		}); err != nil && err != errLimit {
 			log.Error("PushMirrorsIterate: %v", err)
 			return err
 		}
+		pushMirrorsRequested, requested = requested, 0
 	}
-	log.Trace("Finished: Update")
+	log.Trace("Finished: Update: %d pull mirrors and %d push mirrors queued", pullMirrorsRequested, pushMirrorsRequested)
 	return nil
 }