Skip to content

Commit df44017

Browse files
authored
Restart zero worker if there is still work to do (#18658)
* Restart zero worker if there is still work to do It is possible for the zero worker to timeout before all the work is finished. This may mean that work may take a long time to complete because a worker will only be induced on repushing. Also ensure that requested count is reset after pulls and push mirror sync requests and add some more trace logging to the queue push. Fix #18607 Signed-off-by: Andrew Thornton <art27@cantab.net>
1 parent 4d93984 commit df44017

File tree

2 files changed

+32
-10
lines changed

2 files changed

+32
-10
lines changed

modules/queue/workerpool.go

+14
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,9 @@ func (p *WorkerPool) hasNoWorkerScaling() bool {
115115
return p.numberOfWorkers == 0 && (p.boostTimeout == 0 || p.boostWorkers == 0 || p.maxNumberOfWorkers == 0)
116116
}
117117

118+
// zeroBoost will add a temporary boost worker for a no worker queue
119+
// p.lock must be locked at the start of this function BUT it will be unlocked by the end of this function
120+
// (This is because addWorkers has to be called whilst unlocked)
118121
func (p *WorkerPool) zeroBoost() {
119122
ctx, cancel := context.WithTimeout(p.baseCtx, p.boostTimeout)
120123
mq := GetManager().GetManagedQueue(p.qid)
@@ -316,6 +319,17 @@ func (p *WorkerPool) addWorkers(ctx context.Context, cancel context.CancelFunc,
316319
}
317320
p.pause()
318321
}
322+
select {
323+
case <-p.baseCtx.Done():
324+
// this worker queue is shut-down don't reboost
325+
default:
326+
if p.numberOfWorkers == 0 && atomic.LoadInt64(&p.numInQueue) > 0 {
327+
// OK there are no workers but... there's still work to be done -> Reboost
328+
p.zeroBoost()
329+
// p.lock will be unlocked by zeroBoost
330+
return
331+
}
332+
}
319333
p.lock.Unlock()
320334
}()
321335
}

services/mirror/mirror.go

+18-10
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,13 @@ func Update(ctx context.Context, pullLimit, pushLimit int) error {
5959

6060
handler := func(idx int, bean interface{}, limit int) error {
6161
var item SyncRequest
62+
var repo *repo_model.Repository
6263
if m, ok := bean.(*repo_model.Mirror); ok {
6364
if m.Repo == nil {
6465
log.Error("Disconnected mirror found: %d", m.ID)
6566
return nil
6667
}
68+
repo = m.Repo
6769
item = SyncRequest{
6870
Type: PullMirrorType,
6971
RepoID: m.RepoID,
@@ -73,6 +75,7 @@ func Update(ctx context.Context, pullLimit, pushLimit int) error {
7375
log.Error("Disconnected push-mirror found: %d", m.ID)
7476
return nil
7577
}
78+
repo = m.Repo
7679
item = SyncRequest{
7780
Type: PushMirrorType,
7881
RepoID: m.RepoID,
@@ -89,17 +92,16 @@ func Update(ctx context.Context, pullLimit, pushLimit int) error {
8992
default:
9093
}
9194

92-
// Check if this request is already in the queue
93-
has, err := mirrorQueue.Has(&item)
94-
if err != nil {
95-
return err
96-
}
97-
if has {
98-
return nil
99-
}
100-
10195
// Push to the Queue
10296
if err := mirrorQueue.Push(&item); err != nil {
97+
if err == queue.ErrAlreadyInQueue {
98+
if item.Type == PushMirrorType {
99+
log.Trace("PushMirrors for %-v already queued for sync", repo)
100+
} else {
101+
log.Trace("PullMirrors for %-v already queued for sync", repo)
102+
}
103+
return nil
104+
}
103105
return err
104106
}
105107

@@ -110,23 +112,29 @@ func Update(ctx context.Context, pullLimit, pushLimit int) error {
110112
return nil
111113
}
112114

115+
pullMirrorsRequested := 0
113116
if pullLimit != 0 {
117+
requested = 0
114118
if err := repo_model.MirrorsIterate(func(idx int, bean interface{}) error {
115119
return handler(idx, bean, pullLimit)
116120
}); err != nil && err != errLimit {
117121
log.Error("MirrorsIterate: %v", err)
118122
return err
119123
}
124+
pullMirrorsRequested, requested = requested, 0
120125
}
126+
pushMirrorsRequested := 0
121127
if pushLimit != 0 {
128+
requested = 0
122129
if err := repo_model.PushMirrorsIterate(func(idx int, bean interface{}) error {
123130
return handler(idx, bean, pushLimit)
124131
}); err != nil && err != errLimit {
125132
log.Error("PushMirrorsIterate: %v", err)
126133
return err
127134
}
135+
pushMirrorsRequested, requested = requested, 0
128136
}
129-
log.Trace("Finished: Update")
137+
log.Trace("Finished: Update: %d pull mirrors and %d push mirrors queued", pullMirrorsRequested, pushMirrorsRequested)
130138
return nil
131139
}
132140

0 commit comments

Comments
 (0)