resourcemanager: fix deleting more workers than expected #40894

Merged: 14 commits, Feb 2, 2023
5 changes: 5 additions & 0 deletions resourcemanager/pooltask/task_manager.go
@@ -148,3 +148,8 @@ func (t *TaskManager[T, U, C, CT, TF]) StopTask(taskID uint64) {
}
}
}

// GetOriginConcurrency returns the concurrency of the pool at initialization.
func (t *TaskManager[T, U, C, CT, TF]) GetOriginConcurrency() int32 {
return t.concurrency
}
7 changes: 3 additions & 4 deletions resourcemanager/pooltask/task_manager_scheduler.go
@@ -15,14 +15,13 @@
package pooltask

// Overclock is to increase the concurrency of the pool.
// We should check that the concurrency meets the constraints before using it.
func (t *TaskManager[T, U, C, CT, TF]) Overclock() (tid uint64, task *TaskBox[T, U, C, CT, TF]) {
if t.concurrency > t.running.Load() {
return t.getBoostTask()
}
return 0, nil
return t.getBoostTask()
Comment from the PR author: the old logic was not very clear. If tasks in the queue are ready to run, there is no need to create a new task (see the sketch after this hunk).

}

// Downclock is to decrease the concurrency of the pool.
// We should check that the concurrency meets the constraints before using it.
func (t *TaskManager[T, U, C, CT, TF]) Downclock() {
t.pauseTask()
}
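
The comment above boils down to a queue discipline: boosting should only hand out work that is already queued, and report nothing when the queue is empty, so the caller never spins up a worker for work that does not exist. Below is a minimal standalone sketch of that idea in Go; the task type, the boost helper, and the slice-backed queue are all invented for illustration and are not the TiDB implementation.

package main

import "fmt"

// task is a stand-in for the real TaskBox type; purely illustrative.
type task struct{ id uint64 }

// boost pops an already-queued task if one is waiting; it never creates work.
func boost(queue []task) (*task, []task) {
    if len(queue) == 0 {
        return nil, queue // nothing queued, so no extra worker should start
    }
    t := queue[0]
    return &t, queue[1:]
}

func main() {
    queue := []task{{id: 7}}
    if t, rest := boost(queue); t != nil {
        fmt.Printf("boosted task %d, %d left in queue\n", t.id, len(rest))
    }
    if t, _ := boost(nil); t == nil {
        fmt.Println("empty queue: nothing to boost")
    }
}
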
3 changes: 3 additions & 0 deletions resourcemanager/util/util.go
@@ -23,6 +23,9 @@ import (
var (
// MinSchedulerInterval is the minimum interval between two scheduling rounds.
MinSchedulerInterval = atomic.NewDuration(200 * time.Millisecond)

// MaxOverclockCount is the maximum number of overclock goroutines.
MaxOverclockCount = 1
)

// GoroutinePool is a pool interface
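
To make the new limit concrete: a pool that started with concurrency N can now be tuned up to at most N + MaxOverclockCount, no matter how many times Tune is called, because the budget is measured against the origin concurrency rather than the current capacity. The following is a small self-contained sketch of that arithmetic with plain ints standing in for the real pool types; only the names MaxOverclockCount and GetOriginConcurrency come from this diff, everything else is hypothetical.

package main

import "fmt"

const maxOverclockCount = 1 // mirrors util.MaxOverclockCount above

// allowed reports whether a requested size respects the overclock budget.
// It mirrors the shape of the new check in Pool.Tune shown in the next hunk.
func allowed(originConcurrency, requested int) bool {
    if requested <= 0 {
        return false
    }
    return originConcurrency+maxOverclockCount >= requested
}

func main() {
    fmt.Println(allowed(4, 5)) // true: one overclock worker fits the budget
    fmt.Println(allowed(4, 6)) // false: Tune returns early and ignores the request
}
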
8 changes: 6 additions & 2 deletions util/gpool/spmc/spmcpool.go
@@ -137,14 +137,18 @@ func (p *Pool[T, U, C, CT, TF]) Tune(size int) {
if capacity == -1 || size <= 0 || size == capacity {
return
}
if p.taskManager.GetOriginConcurrency()+int32(util.MaxOverclockCount) < int32(size) {
return
}
p.SetLastTuneTs(time.Now())
p.capacity.Store(int32(size))
if size > capacity {
for i := 0; i < size-capacity; i++ {
if tid, boostTask := p.taskManager.Overclock(); boostTask != nil {
p.addWaitingTask()
p.taskManager.AddSubTask(tid, boostTask.Clone())
p.taskCh <- boostTask
newTask := boostTask.Clone()
p.taskManager.AddSubTask(tid, newTask)
p.taskCh <- newTask
Comment from the PR author: this was a bug. The clone was registered with the task manager, but the original object, not the clone, was pushed onto the task queue (see the sketch after this hunk).

}
}
if size-capacity == 1 {
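
A sketch of the pattern behind the fix above: whatever object is registered with the task manager must be the very object that is pushed onto the task channel, otherwise the bookkeeping and the dispatched work refer to two different copies. The registry map, channel, and taskBox type below are stand-ins invented for illustration, not the real spmc pool API.

package main

import "fmt"

type taskBox struct{ id uint64 }

func (t *taskBox) clone() *taskBox { c := *t; return &c }

func main() {
    registry := map[*taskBox]bool{} // stand-in for taskManager.AddSubTask bookkeeping
    ch := make(chan *taskBox, 1)    // stand-in for the pool's task channel
    boost := &taskBox{id: 42}

    // Fixed shape: clone once, then register and dispatch the same pointer.
    newTask := boost.clone()
    registry[newTask] = true
    ch <- newTask

    // The old shape registered boost.Clone() but sent boost itself, so the
    // object coming out of the channel was never the one being tracked.
    got := <-ch
    fmt.Println("dispatched object is the tracked one:", registry[got]) // true
}
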
39 changes: 30 additions & 9 deletions util/gpool/spmc/spmcpool_test.go
@@ -175,17 +175,22 @@ func testTunePool(t *testing.T, name string) {
}
}()
time.Sleep(1 * time.Second)
newSize := pool.Cap() - 1
pool.Tune(newSize)
time.Sleep(1 * time.Second)
require.Equal(t, newSize, pool.Cap())
require.Equal(t, int32(newSize), pool.taskManager.Running(tid))
downclockPool(t, pool, tid)
overclockPool(t, pool, tid)

newSize = pool.Cap() + 1
pool.Tune(newSize)
// now in overclock mode
overclockPool(t, pool, tid)

// Overclock again, but the request is invalid. The pool should keep the same size.
size := pool.Cap()
pool.Tune(pool.Cap() + 1)
time.Sleep(1 * time.Second)
require.Equal(t, newSize, pool.Cap())
require.Equal(t, int32(newSize), pool.taskManager.Running(tid))
require.Equal(t, size, pool.Cap())
require.Equal(t, int32(size), pool.taskManager.Running(tid))

for n := pool.Cap(); n > 1; n-- {
downclockPool(t, pool, tid)
}

// exit test
close(exit)
@@ -195,6 +200,22 @@ func testTunePool(t *testing.T, name string) {
pool.ReleaseAndWait()
}

func overclockPool[T any, U any, C any, CT any, TF pooltask.Context[CT]](t *testing.T, pool *Pool[T, U, C, CT, TF], tid uint64) {
newSize := pool.Cap() + 1
pool.Tune(newSize)
time.Sleep(1 * time.Second)
require.Equal(t, newSize, pool.Cap())
require.Equal(t, int32(newSize), pool.taskManager.Running(tid))
}

func downclockPool[T any, U any, C any, CT any, TF pooltask.Context[CT]](t *testing.T, pool *Pool[T, U, C, CT, TF], tid uint64) {
newSize := pool.Cap() - 1
pool.Tune(newSize)
time.Sleep(1 * time.Second)
require.Equal(t, newSize, pool.Cap())
require.Equal(t, int32(newSize), pool.taskManager.Running(tid))
}

func TestPoolWithEnoughCapacity(t *testing.T) {
const (
RunTimes = 1000
2 changes: 1 addition & 1 deletion util/gpool/spmc/worker.go
@@ -72,8 +72,8 @@ func (w *goWorker[T, U, C, CT, TF]) run() {
f.GetResultCh() <- w.pool.consumerFunc(t.Task, f.ConstArgs(), ctx)
f.Done()
}
w.pool.ExitSubTask(f.TaskID())
}
w.pool.ExitSubTask(f.TaskID())
f.Finish()
if ok := w.pool.revertWorker(w); !ok {
return
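
As far as this hunk shows, the ExitSubTask call moves from inside a nested block to immediately after it, so the exit is recorded exactly once per task box pulled from the task channel. The sketch below is a rough, hypothetical illustration of why the placement of such release-style bookkeeping matters: if the decrement runs a different number of times than the matching increment, the manager's running count drifts and the scheduler can later remove more workers than it really has to spare, which is the symptom this PR fixes. The counter type here is invented and is not the pool's real accounting.

package main

import "fmt"

// counter is a toy running-count, standing in for the task manager's state.
type counter struct{ running int }

func (c *counter) add()  { c.running++ } // like AddSubTask
func (c *counter) exit() { c.running-- } // like ExitSubTask

func main() {
    c := &counter{}

    // Balanced: one registration, exactly one exit when the box is drained.
    c.add()
    c.exit()
    fmt.Println("balanced count:", c.running) // 0

    // Drifted: a misplaced exit call runs one extra time, so the manager's
    // view of how many sub-tasks are still running no longer matches reality.
    c.add()
    c.exit()
    c.exit()
    fmt.Println("drifted count:", c.running) // -1
}
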