resourcemanager: fix deleting more workers than expected #40894

Merged · 14 commits · Feb 2, 2023
13 changes: 13 additions & 0 deletions resourcemanager/pooltask/task_manager.go
@@ -117,6 +117,14 @@ func (t *TaskManager[T, U, C, CT, TF]) AddSubTask(taskID uint64, task *TaskBox[T
 	t.task[shardID].rw.Unlock()
 }
 
+// SubTaskCnt returns the number of pool tasks queued for the given task.
+func (t *TaskManager[T, U, C, CT, TF]) SubTaskCnt(taskID uint64) int {
+	shardID := getShardID(taskID)
+	t.task[shardID].rw.RLock()
+	defer t.task[shardID].rw.RUnlock()
+	return t.task[shardID].stats[taskID].stats.Len()
+}
+
 // ExitSubTask is to exit a task, and it will decrease the count of running pooltask.
 func (t *TaskManager[T, U, C, CT, TF]) ExitSubTask(taskID uint64) {
 	shardID := getShardID(taskID)
@@ -148,3 +156,8 @@ func (t *TaskManager[T, U, C, CT, TF]) StopTask(taskID uint64) {
 		}
 	}
 }
+
+// GetOriginConcurrency returns the concurrency the pool was initialized with.
+func (t *TaskManager[T, U, C, CT, TF]) GetOriginConcurrency() int32 {
+	return t.concurrency
+}
6 changes: 3 additions & 3 deletions resourcemanager/pooltask/task_manager_scheduler.go
@@ -16,10 +16,10 @@ package pooltask
 
 // Overclock is to increase the concurrency of pool.
 func (t *TaskManager[T, U, C, CT, TF]) Overclock() (tid uint64, task *TaskBox[T, U, C, CT, TF]) {
-	if t.concurrency > t.running.Load() {
-		return t.getBoostTask()
+	if t.running.Load()+1 >= int32(t.SubTaskCnt(tid)) {
+		return
 	}
-	return 0, nil
+	return t.getBoostTask()
 }
 
 // Downclock is to decrease the concurrency of pool.

Member (Author) commented on the changed condition:
The old logic was not very clear. If the tasks already in the queue are ready to run, it is unnecessary to create a new boost task. (See the sketch below.)
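
To make the new guard concrete, here is a minimal runnable sketch of the decision it encodes. The helper name shouldBoost and its plain integer arguments are invented for illustration; the real code reads these counters from TaskManager.

package main

import "fmt"

// shouldBoost mirrors the guard in Overclock above: creating a boost
// worker only pays off when the queued subtasks outnumber what the
// running workers, plus one extra, could already absorb.
func shouldBoost(running int32, queuedSubTasks int) bool {
	return running+1 < int32(queuedSubTasks)
}

func main() {
	fmt.Println(shouldBoost(4, 5)) // false: 4 running + 1 already covers 5 queued
	fmt.Println(shouldBoost(4, 6)) // true: the queue outgrows running + 1
}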
3 changes: 3 additions & 0 deletions resourcemanager/util/util.go
@@ -23,6 +23,9 @@ import (
 var (
 	// MinSchedulerInterval is the minimum interval between two scheduling.
 	MinSchedulerInterval = atomic.NewDuration(200 * time.Millisecond)
+
+	// MaxOverclockCount is the maximum number of overclock goroutines.
+	MaxOverclockCount = 1
 )
 
 // GoroutinePool is a pool interface
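
A small sketch of the bound this constant implies for Pool.Tune (shown in the next file): a pool may only be tuned up to its initial concurrency plus the overclock allowance. The helper tuneAllowed and its arguments are hypothetical; the real check lives inline inside Tune.

package main

import "fmt"

const maxOverclockCount = 1 // stands in for util.MaxOverclockCount

// tuneAllowed mirrors the new guard in Pool.Tune: reject any size that
// exceeds the origin concurrency plus the overclock allowance.
func tuneAllowed(originConcurrency int32, size int) bool {
	return int32(size) <= originConcurrency+int32(maxOverclockCount)
}

func main() {
	fmt.Println(tuneAllowed(4, 5)) // true: within 4 + 1
	fmt.Println(tuneAllowed(4, 6)) // false: beyond the overclock cap
}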
8 changes: 6 additions & 2 deletions util/gpool/spmc/spmcpool.go
@@ -137,14 +137,18 @@ func (p *Pool[T, U, C, CT, TF]) Tune(size int) {
 	if capacity == -1 || size <= 0 || size == capacity {
 		return
 	}
+	if p.taskManager.GetOriginConcurrency()+int32(util.MaxOverclockCount) < int32(size) {
+		return
+	}
 	p.SetLastTuneTs(time.Now())
 	p.capacity.Store(int32(size))
 	if size > capacity {
 		for i := 0; i < size-capacity; i++ {
 			if tid, boostTask := p.taskManager.Overclock(); boostTask != nil {
 				p.addWaitingTask()
-				p.taskManager.AddSubTask(tid, boostTask.Clone())
-				p.taskCh <- boostTask
+				newTask := boostTask.Clone()
+				p.taskManager.AddSubTask(tid, newTask)
+				p.taskCh <- newTask
 			}
 		}
 		if size-capacity == 1 {

Member (Author) commented on the clone fix:
This fixes a bug: the clone was registered with the task manager, but the original, untracked TaskBox was the object pushed into the task queue. (See the sketch below.)
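
The fix generalizes to a common pitfall: when an object is cloned for bookkeeping, the tracked instance and the dispatched instance must be the same pointer. A self-contained sketch with invented types (item, clone) rather than the pool's real ones:

package main

import "fmt"

type item struct{ id int }

func (i *item) clone() *item { c := *i; return &c }

func main() {
	tracked := make(map[*item]bool)
	ch := make(chan *item, 1)
	orig := &item{id: 7}

	// Buggy shape (old code): track one clone, send the original;
	// the receiver then holds a pointer the tracker has never seen.
	//   tracked[orig.clone()] = true
	//   ch <- orig

	// Fixed shape (new code): clone once, then track and send that same object.
	newItem := orig.clone()
	tracked[newItem] = true
	ch <- newItem

	fmt.Println(tracked[<-ch]) // true: the received item is the tracked one
}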
39 changes: 30 additions & 9 deletions util/gpool/spmc/spmcpool_test.go
@@ -175,17 +175,22 @@ func testTunePool(t *testing.T, name string) {
 		}
 	}()
 	time.Sleep(1 * time.Second)
-	newSize := pool.Cap() - 1
-	pool.Tune(newSize)
-	time.Sleep(1 * time.Second)
-	require.Equal(t, newSize, pool.Cap())
-	require.Equal(t, int32(newSize), pool.taskManager.Running(tid))
+	downclockPool(t, pool, tid)
+	overclockPool(t, pool, tid)
 
-	newSize = pool.Cap() + 1
-	pool.Tune(newSize)
+	// now in overclock mode
+	overclockPool(t, pool, tid)
+
+	// Overclocking beyond the limit is invalid; the pool should keep the same size.
+	size := pool.Cap()
+	pool.Tune(pool.Cap() + 1)
 	time.Sleep(1 * time.Second)
-	require.Equal(t, newSize, pool.Cap())
-	require.Equal(t, int32(newSize), pool.taskManager.Running(tid))
+	require.Equal(t, size, pool.Cap())
+	require.Equal(t, int32(size), pool.taskManager.Running(tid))
+
+	for n := pool.Cap(); n > 1; n-- {
+		downclockPool(t, pool, tid)
+	}
 
 	// exit test
 	close(exit)
@@ -195,6 +200,22 @@ func testTunePool(t *testing.T, name string) {
 	pool.ReleaseAndWait()
 }
 
+func overclockPool[T any, U any, C any, CT any, TF pooltask.Context[CT]](t *testing.T, pool *Pool[T, U, C, CT, TF], tid uint64) {
+	newSize := pool.Cap() + 1
+	pool.Tune(newSize)
+	time.Sleep(1 * time.Second)
+	require.Equal(t, newSize, pool.Cap())
+	require.Equal(t, int32(newSize), pool.taskManager.Running(tid))
+}
+
+func downclockPool[T any, U any, C any, CT any, TF pooltask.Context[CT]](t *testing.T, pool *Pool[T, U, C, CT, TF], tid uint64) {
+	newSize := pool.Cap() - 1
+	pool.Tune(newSize)
+	time.Sleep(1 * time.Second)
+	require.Equal(t, newSize, pool.Cap())
+	require.Equal(t, int32(newSize), pool.taskManager.Running(tid))
+}
+
 func TestPoolWithEnoughCapacity(t *testing.T) {
 	const (
 		RunTimes = 1000
2 changes: 1 addition & 1 deletion util/gpool/spmc/worker.go
@@ -72,8 +72,8 @@ func (w *goWorker[T, U, C, CT, TF]) run() {
 				f.GetResultCh() <- w.pool.consumerFunc(t.Task, f.ConstArgs(), ctx)
 				f.Done()
 			}
-			w.pool.ExitSubTask(f.TaskID())
 		}
+		w.pool.ExitSubTask(f.TaskID())
 		f.Finish()
 		if ok := w.pool.revertWorker(w); !ok {
 			return
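
The one-line move matters because ExitSubTask decrements the count of running pool tasks (see task_manager.go above). With the decrement one block too deep, it could run a different number of times than its matching increment, so the scheduler's view of live workers drifted and it deleted more workers than expected. A minimal sketch of the invariant, with an illustrative counter in place of the pool's real bookkeeping:

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var running atomic.Int32

	processBox := func(tasks []string) {
		running.Add(1) // one increment when a task box starts
		for range tasks {
			// consume each task in the box
		}
		running.Add(-1) // exactly one matching decrement, after the loop
	}

	processBox([]string{"a", "b", "c"})
	processBox(nil) // an empty box must balance its counter too
	fmt.Println(running.Load()) // 0: increments and decrements pair up
}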