resourcemanager: scheduler subtask in the pool's task #40670

Merged: 13 commits, Jan 29, 2023
2 changes: 2 additions & 0 deletions resourcemanager/pooltask/BUILD.bazel
@@ -5,6 +5,8 @@ go_library(
srcs = [
"task.go",
"task_manager.go",
"task_manager_iterator.go",
"task_manager_scheduler.go",
],
importpath = "github.com/pingcap/tidb/resourcemanager/pooltask",
visibility = ["//visibility:public"],
36 changes: 18 additions & 18 deletions resourcemanager/pooltask/task_manager.go
@@ -32,29 +32,29 @@ type tContainer[T any, U any, C any, CT any, TF Context[CT]] struct {
task *TaskBox[T, U, C, CT, TF]
}

-type meta struct {
-stats *list.List
-createTS time.Time
-origin int32
-running int32
+type meta[T any, U any, C any, CT any, TF Context[CT]] struct {
+stats *list.List
+createTS time.Time
+initialConcurrency int32
+running atomic.Int32
}

-func newStats(concurrency int32) *meta {
-s := &meta{
-createTS: time.Now(),
-stats: list.New(),
-origin: concurrency,
+func newStats[T any, U any, C any, CT any, TF Context[CT]](concurrency int32) *meta[T, U, C, CT, TF] {
+s := &meta[T, U, C, CT, TF]{
+createTS: time.Now(),
+stats: list.New(),
+initialConcurrency: concurrency,
}
return s
}

-func (m *meta) getOriginConcurrency() int32 {
-return m.origin
+func (m *meta[T, U, C, CT, TF]) getOriginConcurrency() int32 {
+return m.initialConcurrency
}

// TaskStatusContainer is a container that can control or watch the pool.
type TaskStatusContainer[T any, U any, C any, CT any, TF Context[CT]] struct {
-stats map[uint64]*meta
+stats map[uint64]*meta[T, U, C, CT, TF]
rw sync.RWMutex
}

@@ -70,7 +70,7 @@ func NewTaskManager[T any, U any, C any, CT any, TF Context[CT]](c int32) TaskMa
task := make([]TaskStatusContainer[T, U, C, CT, TF], shard)
for i := 0; i < shard; i++ {
task[i] = TaskStatusContainer[T, U, C, CT, TF]{
-stats: make(map[uint64]*meta),
+stats: make(map[uint64]*meta[T, U, C, CT, TF]),
}
}
return TaskManager[T, U, C, CT, TF]{
@@ -83,7 +83,7 @@ func NewTaskManager[T any, U any, C any, CT any, TF Context[CT]](c int32) TaskMa
func (t *TaskManager[T, U, C, CT, TF]) RegisterTask(taskID uint64, concurrency int32) {
id := getShardID(taskID)
t.task[id].rw.Lock()
-t.task[id].stats[taskID] = newStats(concurrency)
+t.task[id].stats[taskID] = newStats[T, U, C, CT, TF](concurrency)
t.task[id].rw.Unlock()
}

@@ -113,7 +113,7 @@ func (t *TaskManager[T, U, C, CT, TF]) AddSubTask(taskID uint64, task *TaskBox[T
t.running.Inc()
t.task[shardID].rw.Lock()
t.task[shardID].stats[taskID].stats.PushBack(tc)
-t.task[shardID].stats[taskID].running++ // running job in this task
+t.task[shardID].stats[taskID].running.Inc() // running job in this task
t.task[shardID].rw.Unlock()
}

@@ -122,7 +122,7 @@ func (t *TaskManager[T, U, C, CT, TF]) ExitSubTask(taskID uint64) {
shardID := getShardID(taskID)
t.running.Dec() // total running tasks
t.task[shardID].rw.Lock()
-t.task[shardID].stats[taskID].running-- // running job in this task
+t.task[shardID].stats[taskID].running.Dec() // running job in this task
t.task[shardID].rw.Unlock()
}

@@ -131,7 +131,7 @@ func (t *TaskManager[T, U, C, CT, TF]) Running(taskID uint64) int32 {
shardID := getShardID(taskID)
t.task[shardID].rw.Lock()
defer t.task[shardID].rw.Unlock()
-return t.task[shardID].stats[taskID].running
+return t.task[shardID].stats[taskID].running.Load()
}

// StopTask is to stop a task by TaskID.
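The accounting pattern the diff above moves to — per-shard task maps guarded by an RWMutex, with the per-task running counter stored atomically — can be illustrated with a small standalone sketch. Everything below (manager, taskMeta, shardOf, adjust) is an illustrative simplification, not the PR's API; the PR additionally keeps a list of subtasks per task and uses its own getShardID and atomic wrapper.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const shard = 8

type taskMeta struct {
	initialConcurrency int32
	running            atomic.Int32 // atomic counter: Add/Load need no write lock
}

type taskShard struct {
	rw    sync.RWMutex
	stats map[uint64]*taskMeta
}

type manager struct {
	shards [shard]taskShard
}

func newManager() *manager {
	m := &manager{}
	for i := range m.shards {
		m.shards[i].stats = make(map[uint64]*taskMeta)
	}
	return m
}

// shardOf maps a task ID to a shard (the PR uses getShardID for this).
func (m *manager) shardOf(taskID uint64) *taskShard {
	return &m.shards[taskID%shard]
}

// register creates the per-task metadata, in the spirit of RegisterTask.
func (m *manager) register(taskID uint64, concurrency int32) {
	s := m.shardOf(taskID)
	s.rw.Lock()
	s.stats[taskID] = &taskMeta{initialConcurrency: concurrency}
	s.rw.Unlock()
}

// adjust moves the per-task running counter up or down, loosely mirroring
// AddSubTask/ExitSubTask (the PR also maintains the subtask list under a
// write lock, which is omitted here).
func (m *manager) adjust(taskID uint64, delta int32) {
	s := m.shardOf(taskID)
	s.rw.RLock()
	s.stats[taskID].running.Add(delta)
	s.rw.RUnlock()
}

// runningCount plays the role of Running in the diff.
func (m *manager) runningCount(taskID uint64) int32 {
	s := m.shardOf(taskID)
	s.rw.RLock()
	defer s.rw.RUnlock()
	return s.stats[taskID].running.Load()
}

func main() {
	m := newManager()
	m.register(7, 4)
	m.adjust(7, 1)
	m.adjust(7, 1)
	m.adjust(7, -1)
	fmt.Println(m.runningCount(7)) // prints 1
}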
128 changes: 128 additions & 0 deletions resourcemanager/pooltask/task_manager_iterator.go
@@ -0,0 +1,128 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pooltask

import (
"container/list"
"time"
)

func (t *TaskManager[T, U, C, CT, TF]) getBoostTask() (tid uint64, result *TaskBox[T, U, C, CT, TF]) {
// Boost a task when:
// 1. the number of running subtasks is below its concurrency;
// 2. the less time a task has been running, the more likely it is to be boosted.
tid, element := t.iter(canBoost[T, U, C, CT, TF])
if element != nil {
return tid, element.Value.(tContainer[T, U, C, CT, TF]).task
}
return 0, nil
}

func (t *TaskManager[T, U, C, CT, TF]) pauseTask() {
// Pause a task:
// 1. the longer a task has been running, the more likely it is to be paused;
// 2. if a task has been boosted, it is paused first.
tid, result := t.iter(canPause[T, U, C, CT, TF])
if result != nil {
result.Value.(tContainer[T, U, C, CT, TF]).task.status.CompareAndSwap(RunningTask, StopTask)
// delete it from list
shardID := getShardID(tid)
t.task[shardID].rw.Lock()
defer t.task[shardID].rw.Unlock()
t.task[shardID].stats[tid].stats.Remove(result)
}
}

func (t *TaskManager[T, U, C, CT, TF]) iter(fn func(m *meta[T, U, C, CT, TF], max time.Time) (*list.Element, bool)) (tid uint64, result *list.Element) {
var compareTS time.Time
for i := 0; i < shard; i++ {
breakFind := func(index int) (breakFind bool) {
t.task[i].rw.RLock()
defer t.task[i].rw.RUnlock()
for id, stats := range t.task[i].stats {
if result == nil {
result = findTask[T, U, C, CT, TF](stats, RunningTask)
tid = id
compareTS = stats.createTS
continue
}
newResult, pauseFind := fn(stats, compareTS)
if pauseFind {
result = newResult
Contributor (review comment):

If we call canBoost here and it returns (nil, true) (https://github.com/pingcap/tidb/pull/40670/files#diff-f1e67ff3fe9cec68e358a2e0ab7bc91ecca0dfcee2e9827282a0b6c060449a64R101), then newResult is nil. Is this expected, and why?

Member (author):

Fixed, it was a misjudgment.

tid = id
compareTS = stats.createTS
return true
}
if newResult != nil {
result = newResult
tid = id
compareTS = stats.createTS
}
}
return false
}(shard)
if breakFind {
break
}
}
return tid, result
}

func canPause[T any, U any, C any, CT any, TF Context[CT]](m *meta[T, U, C, CT, TF], max time.Time) (result *list.Element, isBreak bool) {
if m.initialConcurrency < m.running.Load() {
box := findTask[T, U, C, CT, TF](m, RunningTask)
if box != nil {
return box, true
}
}
if m.createTS.Before(max) {
box := findTask[T, U, C, CT, TF](m, RunningTask)
if box != nil {
return box, false
}
}
return nil, false
}

func canBoost[T any, U any, C any, CT any, TF Context[CT]](m *meta[T, U, C, CT, TF], min time.Time) (result *list.Element, isBreak bool) {
if m.running.Load() >= m.initialConcurrency {
return nil, true
}
if m.createTS.After(min) {
box := getTask[T, U, C, CT, TF](m)
if box != nil {
return box, false
}
}
return nil, false
}

func findTask[T any, U any, C any, CT any, TF Context[CT]](m *meta[T, U, C, CT, TF], status int32) *list.Element {
for e := m.stats.Front(); e != nil; e = e.Next() {
box := e.Value.(tContainer[T, U, C, CT, TF])
if box.task.status.Load() == status {
return e
}
}
return nil
}

func getTask[T any, U any, C any, CT any, TF Context[CT]](m *meta[T, U, C, CT, TF]) *list.Element {
e := m.stats.Front()
if e != nil {
return e
}
return nil
}
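The selection policy encoded by canBoost and canPause above can be summarized in a standalone sketch: boosting prefers the most recently created task that is still below its own concurrency, while pausing prefers a task that has been boosted past its initial concurrency and otherwise the oldest task. The types and helpers below (taskStats, pickBoost, pickPause) are illustrative simplifications that ignore shards, the subtask list, and the iterator's early-exit handling; they are not code from this PR.

package main

import (
	"fmt"
	"time"
)

type taskStats struct {
	id                 uint64
	createTS           time.Time
	initialConcurrency int32
	running            int32
}

// pickBoost returns the ID of the task that should get an extra worker, or 0
// if no task qualifies: only tasks still below their own concurrency are
// candidates, and newer tasks are preferred.
func pickBoost(tasks []taskStats) uint64 {
	var best *taskStats
	for i := range tasks {
		t := &tasks[i]
		if t.running >= t.initialConcurrency {
			continue // at or above its own concurrency: cannot be boosted
		}
		if best == nil || t.createTS.After(best.createTS) {
			best = t
		}
	}
	if best == nil {
		return 0
	}
	return best.id
}

// pickPause returns the ID of the task that should give a worker back:
// a task running above its initial concurrency (one that was boosted) is
// paused first; otherwise the oldest task is paused.
func pickPause(tasks []taskStats) uint64 {
	var best *taskStats
	for i := range tasks {
		t := &tasks[i]
		if t.running > t.initialConcurrency {
			return t.id
		}
		if best == nil || t.createTS.Before(best.createTS) {
			best = t
		}
	}
	if best == nil {
		return 0
	}
	return best.id
}

func main() {
	now := time.Now()
	tasks := []taskStats{
		{id: 1, createTS: now.Add(-time.Minute), initialConcurrency: 4, running: 4},
		{id: 2, createTS: now, initialConcurrency: 4, running: 2},
	}
	fmt.Println(pickBoost(tasks)) // 2: newest task still below its concurrency
	fmt.Println(pickPause(tasks)) // 1: nothing is boosted, so the oldest task is paused
}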
28 changes: 28 additions & 0 deletions resourcemanager/pooltask/task_manager_scheduler.go
@@ -0,0 +1,28 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pooltask

// Overclock is to increase the concurrency of the pool.
func (t *TaskManager[T, U, C, CT, TF]) Overclock() (tid uint64, task *TaskBox[T, U, C, CT, TF]) {
if t.concurrency > t.running.Load() {
return t.getBoostTask()
}
return 0, nil
}

// Downclock is to decrease the concurrency of the pool.
func (t *TaskManager[T, U, C, CT, TF]) Downclock() {
t.pauseTask()
}
12 changes: 11 additions & 1 deletion util/gpool/spmc/spmcpool.go
@@ -140,12 +140,22 @@ func (p *Pool[T, U, C, CT, TF]) Tune(size int) {
p.SetLastTuneTs(time.Now())
p.capacity.Store(int32(size))
if size > capacity {
// boost
for i := 0; i < size-capacity; i++ {
if tid, boostTask := p.taskManager.Overclock(); boostTask != nil {
p.addWaitingTask()
p.taskManager.AddSubTask(tid, boostTask.Clone())
p.taskCh <- boostTask
}
}
if size-capacity == 1 {
p.cond.Signal()
return
}
p.cond.Broadcast()
return
}
if size < capacity {
p.taskManager.Downclock()
}
}

74 changes: 74 additions & 0 deletions util/gpool/spmc/spmcpool_test.go
@@ -15,9 +15,11 @@
package spmc

import (
"fmt"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/pingcap/tidb/resourcemanager/pooltask"
rmutil "github.com/pingcap/tidb/resourcemanager/util"
@@ -121,6 +123,78 @@ func TestStopPool(t *testing.T) {
pool.ReleaseAndWait()
}

func TestTuneSimplePool(t *testing.T) {
testTunePool(t, "TestTuneSimplePool")
}

func TestTuneMultiPool(t *testing.T) {
var concurrency = 5
var wg sync.WaitGroup
wg.Add(concurrency)
for i := 0; i < concurrency; i++ {
go func(id int) {
testTunePool(t, fmt.Sprintf("TestTuneMultiPool%d", id))
wg.Done()
}(i)
}
wg.Wait()
}

func testTunePool(t *testing.T, name string) {
type ConstArgs struct {
a int
}
myArgs := ConstArgs{a: 10}
// init the pool
// input type, output type, constArgs type
pool, err := NewSPMCPool[int, int, ConstArgs, any, pooltask.NilContext](name, 10, rmutil.UNKNOWN)
require.NoError(t, err)
pool.SetConsumerFunc(func(task int, constArgs ConstArgs, ctx any) int {
return task + constArgs.a
})

exit := make(chan struct{})

pfunc := func() (int, error) {
select {
case <-exit:
return 0, gpool.ErrProducerClosed
default:
return 1, nil
}
}
// add new task
resultCh, control := pool.AddProducer(pfunc, myArgs, pooltask.NilContext{}, WithConcurrency(10))
tid := control.TaskID()
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for result := range resultCh {
require.Greater(t, result, 10)
}
}()
time.Sleep(1 * time.Second)
newSize := pool.Cap() - 1
pool.Tune(newSize)
time.Sleep(1 * time.Second)
require.Equal(t, newSize, pool.Cap())
require.Equal(t, int32(newSize), pool.taskManager.Running(tid))

newSize = pool.Cap() + 1
pool.Tune(newSize)
time.Sleep(1 * time.Second)
require.Equal(t, newSize, pool.Cap())
require.Equal(t, int32(newSize), pool.taskManager.Running(tid))

// exit test
close(exit)
control.Wait()
wg.Wait()
// close pool
pool.ReleaseAndWait()
}

func TestPoolWithEnoughCapacity(t *testing.T) {
const (
RunTimes = 1000
2 changes: 1 addition & 1 deletion util/gpool/spmc/worker.go
@@ -67,7 +67,7 @@ func (w *goWorker[T, U, C, CT, TF]) run() {
for t := range f.GetTaskCh() {
if f.GetStatus() == pooltask.StopTask {
f.Done()
-continue
+break
}
f.GetResultCh() <- w.pool.consumerFunc(t.Task, f.ConstArgs(), ctx)
f.Done()
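A minimal sketch of what the continue-to-break change above means for a worker, assuming simplified types (taskBox and runWorker are illustrative, not the PR's goWorker): with continue the worker kept draining a stopped task box and dropping its tasks; with break it leaves that box's loop as soon as it sees the stop status.

package main

import "fmt"

type taskBox struct {
	stopped bool
	tasks   chan int
}

// runWorker consumes one task box; the PR's change corresponds to the break
// below (it used to be a continue).
func runWorker(b *taskBox, handle func(int)) {
	for t := range b.tasks {
		if b.stopped {
			// was `continue`: keep reading the channel and drop every task;
			// now `break`: stop working on this task box altogether.
			break
		}
		handle(t)
	}
}

func main() {
	b := &taskBox{tasks: make(chan int, 2)}
	b.tasks <- 1
	b.tasks <- 2
	close(b.tasks)
	runWorker(b, func(t int) { fmt.Println("handled", t) })
}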