Skip to content

Commit e78b4df

Browse files
committed
Restart zero worker if there is still work to do (go-gitea#18658)
Backport go-gitea#18658 It is possible for the zero worker to timeout before all the work is finished. This may mean that work may take a long time to complete because a worker will only be induced on repushing. Also ensure that requested count is reset after pulls and push mirror sync requests and add some more trace logging to the queue push. Fix go-gitea#18607 Signed-off-by: Andrew Thornton <art27@cantab.net>
1 parent 8671602 commit e78b4df

File tree

2 files changed

+48
-10
lines changed

2 files changed

+48
-10
lines changed

modules/queue/workerpool.go

+30
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,20 @@ func (p *WorkerPool) Push(data Data) {
8787
}
8888
}
8989

90+
// HasNoWorkerScaling will return true if the queue has no workers, and has no worker boosting
91+
func (p *WorkerPool) HasNoWorkerScaling() bool {
92+
p.lock.Lock()
93+
defer p.lock.Unlock()
94+
return p.hasNoWorkerScaling()
95+
}
96+
97+
func (p *WorkerPool) hasNoWorkerScaling() bool {
98+
return p.numberOfWorkers == 0 && (p.boostTimeout == 0 || p.boostWorkers == 0 || p.maxNumberOfWorkers == 0)
99+
}
100+
101+
// zeroBoost will add a temporary boost worker for a no worker queue
102+
// p.lock must be locked at the start of this function BUT it will be unlocked by the end of this function
103+
// (This is because addWorkers has to be called whilst unlocked)
90104
func (p *WorkerPool) zeroBoost() {
91105
ctx, cancel := context.WithTimeout(p.baseCtx, p.boostTimeout)
92106
mq := GetManager().GetManagedQueue(p.qid)
@@ -277,6 +291,22 @@ func (p *WorkerPool) addWorkers(ctx context.Context, cancel context.CancelFunc,
277291
p.cond.Broadcast()
278292
cancel()
279293
}
294+
295+
select {
296+
case <-p.baseCtx.Done():
297+
// Don't warn if the baseCtx is shutdown
298+
default:
299+
if p.hasNoWorkerScaling() {
300+
log.Warn(
301+
"Queue: %d is configured to be non-scaling and has no workers - this configuration is likely incorrect.\n"+
302+
"The queue will be paused to prevent data-loss with the assumption that you will add workers and unpause as required.", p.qid)
303+
} else if p.numberOfWorkers == 0 && atomic.LoadInt64(&p.numInQueue) > 0 {
304+
// OK there are no workers but... there's still work to be done -> Reboost
305+
p.zeroBoost()
306+
// p.lock will be unlocked by zeroBoost
307+
return
308+
}
309+
}
280310
p.lock.Unlock()
281311
}()
282312
}

services/mirror/mirror.go

+18-10
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,13 @@ func Update(ctx context.Context, pullLimit, pushLimit int) error {
5959

6060
handler := func(idx int, bean interface{}, limit int) error {
6161
var item SyncRequest
62+
var repo *repo_model.Repository
6263
if m, ok := bean.(*repo_model.Mirror); ok {
6364
if m.Repo == nil {
6465
log.Error("Disconnected mirror found: %d", m.ID)
6566
return nil
6667
}
68+
repo = m.Repo
6769
item = SyncRequest{
6870
Type: PullMirrorType,
6971
RepoID: m.RepoID,
@@ -73,6 +75,7 @@ func Update(ctx context.Context, pullLimit, pushLimit int) error {
7375
log.Error("Disconnected push-mirror found: %d", m.ID)
7476
return nil
7577
}
78+
repo = m.Repo
7679
item = SyncRequest{
7780
Type: PushMirrorType,
7881
RepoID: m.RepoID,
@@ -89,17 +92,16 @@ func Update(ctx context.Context, pullLimit, pushLimit int) error {
8992
default:
9093
}
9194

92-
// Check if this request is already in the queue
93-
has, err := mirrorQueue.Has(&item)
94-
if err != nil {
95-
return err
96-
}
97-
if has {
98-
return nil
99-
}
100-
10195
// Push to the Queue
10296
if err := mirrorQueue.Push(&item); err != nil {
97+
if err == queue.ErrAlreadyInQueue {
98+
if item.Type == PushMirrorType {
99+
log.Trace("PushMirrors for %-v already queued for sync", repo)
100+
} else {
101+
log.Trace("PullMirrors for %-v already queued for sync", repo)
102+
}
103+
return nil
104+
}
103105
return err
104106
}
105107

@@ -110,23 +112,29 @@ func Update(ctx context.Context, pullLimit, pushLimit int) error {
110112
return nil
111113
}
112114

115+
pullMirrorsRequested := 0
113116
if pullLimit != 0 {
117+
requested = 0
114118
if err := repo_model.MirrorsIterate(func(idx int, bean interface{}) error {
115119
return handler(idx, bean, pullLimit)
116120
}); err != nil && err != errLimit {
117121
log.Error("MirrorsIterate: %v", err)
118122
return err
119123
}
124+
pullMirrorsRequested, requested = requested, 0
120125
}
126+
pushMirrorsRequested := 0
121127
if pushLimit != 0 {
128+
requested = 0
122129
if err := repo_model.PushMirrorsIterate(func(idx int, bean interface{}) error {
123130
return handler(idx, bean, pushLimit)
124131
}); err != nil && err != errLimit {
125132
log.Error("PushMirrorsIterate: %v", err)
126133
return err
127134
}
135+
pushMirrorsRequested, requested = requested, 0
128136
}
129-
log.Trace("Finished: Update")
137+
log.Trace("Finished: Update: %d pull mirrors and %d push mirrors queued", pullMirrorsRequested, pushMirrorsRequested)
130138
return nil
131139
}
132140

0 commit comments

Comments
 (0)