From dc5a4d024b4d95f63e60ccc898fa9b4adb7a3502 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Thu, 2 Feb 2023 22:20:38 +0800 Subject: [PATCH 1/9] resourcemanager: fix unavailable Stop Signed-off-by: Weizhen Wang --- resourcemanager/pooltask/task.go | 28 +++++++++++++++------------- util/gpool/spmc/spmcpool.go | 16 ++++++++++++++-- util/gpool/spmc/spmcpool_test.go | 5 +---- 3 files changed, 30 insertions(+), 19 deletions(-) diff --git a/resourcemanager/pooltask/task.go b/resourcemanager/pooltask/task.go index e166e24f76b4c..81db478426904 100644 --- a/resourcemanager/pooltask/task.go +++ b/resourcemanager/pooltask/task.go @@ -127,27 +127,29 @@ type GPool[T any, U any, C any, CT any, TF Context[CT]] interface { // TaskController is a controller that can control or watch the pool. type TaskController[T any, U any, C any, CT any, TF Context[CT]] struct { - pool GPool[T, U, C, CT, TF] - close chan struct{} - wg *sync.WaitGroup - taskID uint64 - resultCh chan U + pool GPool[T, U, C, CT, TF] + closeCh chan struct{} + productCloseCh chan struct{} + wg *sync.WaitGroup + taskID uint64 + resultCh chan U } // NewTaskController create a controller to deal with pooltask's status. -func NewTaskController[T any, U any, C any, CT any, TF Context[CT]](p GPool[T, U, C, CT, TF], taskID uint64, closeCh chan struct{}, wg *sync.WaitGroup, resultCh chan U) TaskController[T, U, C, CT, TF] { +func NewTaskController[T any, U any, C any, CT any, TF Context[CT]](p GPool[T, U, C, CT, TF], taskID uint64, closeCh, productCloseCh chan struct{}, wg *sync.WaitGroup, resultCh chan U) TaskController[T, U, C, CT, TF] { return TaskController[T, U, C, CT, TF]{ - pool: p, - taskID: taskID, - close: closeCh, - wg: wg, - resultCh: resultCh, + pool: p, + taskID: taskID, + closeCh: closeCh, + productCloseCh: productCloseCh, + wg: wg, + resultCh: resultCh, } } // Wait is to wait the pool task to stop. func (t *TaskController[T, U, C, CT, TF]) Wait() { - <-t.close + <-t.closeCh t.wg.Wait() close(t.resultCh) t.pool.DeleteTask(t.taskID) @@ -155,7 +157,7 @@ func (t *TaskController[T, U, C, CT, TF]) Wait() { // Stop is to send stop command to the task. But you still need to wait the task to stop. func (t *TaskController[T, U, C, CT, TF]) Stop() { - t.pool.StopTask(t.TaskID()) + close(t.productCloseCh) } // TaskID is to get the task id. diff --git a/util/gpool/spmc/spmcpool.go b/util/gpool/spmc/spmcpool.go index 6f65ca98aba01..1f91b3367ddaf 100644 --- a/util/gpool/spmc/spmcpool.go +++ b/util/gpool/spmc/spmcpool.go @@ -261,8 +261,9 @@ func (p *Pool[T, U, C, CT, TF]) AddProduceBySlice(producer func() ([]T, error), var wg sync.WaitGroup result := make(chan U, opt.ResultChanLen) closeCh := make(chan struct{}) + productCloseCh := make(chan struct{}) inputCh := make(chan pooltask.Task[T], opt.TaskChanLen) - tc := pooltask.NewTaskController[T, U, C, CT, TF](p, taskID, closeCh, &wg, result) + tc := pooltask.NewTaskController[T, U, C, CT, TF](p, taskID, closeCh, productCloseCh, &wg, result) p.taskManager.RegisterTask(taskID, int32(opt.Concurrency)) for i := 0; i < opt.Concurrency; i++ { err := p.run() @@ -298,6 +299,11 @@ func (p *Pool[T, U, C, CT, TF]) AddProduceBySlice(producer func() ([]T, error), } inputCh <- task } + select { + case <-productCloseCh: + return + default: + } } }() return result, tc @@ -310,10 +316,11 @@ func (p *Pool[T, U, C, CT, TF]) AddProducer(producer func() (T, error), constArg taskID := p.NewTaskID() var wg sync.WaitGroup result := make(chan U, opt.ResultChanLen) + productCloseCh := make(chan struct{}) closeCh := make(chan struct{}) inputCh := make(chan pooltask.Task[T], opt.TaskChanLen) p.taskManager.RegisterTask(taskID, int32(opt.Concurrency)) - tc := pooltask.NewTaskController[T, U, C, CT, TF](p, taskID, closeCh, &wg, result) + tc := pooltask.NewTaskController[T, U, C, CT, TF](p, taskID, closeCh, productCloseCh, &wg, result) for i := 0; i < opt.Concurrency; i++ { err := p.run() if err == gpool.ErrPoolClosed { @@ -346,6 +353,11 @@ func (p *Pool[T, U, C, CT, TF]) AddProducer(producer func() (T, error), constArg Task: task, } inputCh <- t + select { + case <-productCloseCh: + return + default: + } } }() return result, tc diff --git a/util/gpool/spmc/spmcpool_test.go b/util/gpool/spmc/spmcpool_test.go index 83d02d2d47ac2..053f931d9d46c 100644 --- a/util/gpool/spmc/spmcpool_test.go +++ b/util/gpool/spmc/spmcpool_test.go @@ -114,10 +114,7 @@ func TestStopPool(t *testing.T) { }() // Waiting task finishing control.Stop() - close(exit) control.Wait() - // it should pass. Stop can be used after the pool is closed. we should prevent it from panic. - control.Stop() wg.Wait() // close pool pool.ReleaseAndWait() @@ -193,7 +190,7 @@ func testTunePool(t *testing.T, name string) { } // exit test - close(exit) + control.Stop() control.Wait() wg.Wait() // close pool From 91f694c0b4e2868b26338f2c6744d7945a40a1fd Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Thu, 2 Feb 2023 22:36:42 +0800 Subject: [PATCH 2/9] resourcemanager: fix unavailable Stop Signed-off-by: Weizhen Wang --- resourcemanager/pooltask/task.go | 1 + util/gpool/spmc/spmcpool.go | 10 +++++----- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/resourcemanager/pooltask/task.go b/resourcemanager/pooltask/task.go index 81db478426904..ad048129856ec 100644 --- a/resourcemanager/pooltask/task.go +++ b/resourcemanager/pooltask/task.go @@ -158,6 +158,7 @@ func (t *TaskController[T, U, C, CT, TF]) Wait() { // Stop is to send stop command to the task. But you still need to wait the task to stop. func (t *TaskController[T, U, C, CT, TF]) Stop() { close(t.productCloseCh) + t.pool.StopTask(t.TaskID()) } // TaskID is to get the task id. diff --git a/util/gpool/spmc/spmcpool.go b/util/gpool/spmc/spmcpool.go index 1f91b3367ddaf..bc7fee844581e 100644 --- a/util/gpool/spmc/spmcpool.go +++ b/util/gpool/spmc/spmcpool.go @@ -298,11 +298,11 @@ func (p *Pool[T, U, C, CT, TF]) AddProduceBySlice(producer func() ([]T, error), Task: task, } inputCh <- task - } - select { - case <-productCloseCh: - return - default: + select { + case <-productCloseCh: + return + default: + } } } }() From 67c9de01f409c028899571df3dbb6e2f42609bc0 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Thu, 2 Feb 2023 22:46:00 +0800 Subject: [PATCH 3/9] resourcemanager: fix unavailable Stop Signed-off-by: Weizhen Wang --- resourcemanager/pooltask/task.go | 5 ++++- util/gpool/spmc/spmcpool.go | 12 ++++++++---- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/resourcemanager/pooltask/task.go b/resourcemanager/pooltask/task.go index ad048129856ec..edcad4f26e587 100644 --- a/resourcemanager/pooltask/task.go +++ b/resourcemanager/pooltask/task.go @@ -131,18 +131,20 @@ type TaskController[T any, U any, C any, CT any, TF Context[CT]] struct { closeCh chan struct{} productCloseCh chan struct{} wg *sync.WaitGroup + prodWg *sync.WaitGroup taskID uint64 resultCh chan U } // NewTaskController create a controller to deal with pooltask's status. -func NewTaskController[T any, U any, C any, CT any, TF Context[CT]](p GPool[T, U, C, CT, TF], taskID uint64, closeCh, productCloseCh chan struct{}, wg *sync.WaitGroup, resultCh chan U) TaskController[T, U, C, CT, TF] { +func NewTaskController[T any, U any, C any, CT any, TF Context[CT]](p GPool[T, U, C, CT, TF], taskID uint64, closeCh, productCloseCh chan struct{}, wg, prodWg *sync.WaitGroup, resultCh chan U) TaskController[T, U, C, CT, TF] { return TaskController[T, U, C, CT, TF]{ pool: p, taskID: taskID, closeCh: closeCh, productCloseCh: productCloseCh, wg: wg, + prodWg: prodWg, resultCh: resultCh, } } @@ -158,6 +160,7 @@ func (t *TaskController[T, U, C, CT, TF]) Wait() { // Stop is to send stop command to the task. But you still need to wait the task to stop. func (t *TaskController[T, U, C, CT, TF]) Stop() { close(t.productCloseCh) + t.wg.Wait() t.pool.StopTask(t.TaskID()) } diff --git a/util/gpool/spmc/spmcpool.go b/util/gpool/spmc/spmcpool.go index bc7fee844581e..b5c74ca275976 100644 --- a/util/gpool/spmc/spmcpool.go +++ b/util/gpool/spmc/spmcpool.go @@ -258,12 +258,12 @@ func (p *Pool[T, U, C, CT, TF]) SetConsumerFunc(consumerFunc func(T, C, CT) U) { func (p *Pool[T, U, C, CT, TF]) AddProduceBySlice(producer func() ([]T, error), constArg C, contextFn TF, options ...TaskOption) (<-chan U, pooltask.TaskController[T, U, C, CT, TF]) { opt := loadTaskOptions(options...) taskID := p.NewTaskID() - var wg sync.WaitGroup + var wg, prodWg sync.WaitGroup result := make(chan U, opt.ResultChanLen) closeCh := make(chan struct{}) productCloseCh := make(chan struct{}) inputCh := make(chan pooltask.Task[T], opt.TaskChanLen) - tc := pooltask.NewTaskController[T, U, C, CT, TF](p, taskID, closeCh, productCloseCh, &wg, result) + tc := pooltask.NewTaskController[T, U, C, CT, TF](p, taskID, closeCh, productCloseCh, &wg, &prodWg, result) p.taskManager.RegisterTask(taskID, int32(opt.Concurrency)) for i := 0; i < opt.Concurrency; i++ { err := p.run() @@ -275,6 +275,7 @@ func (p *Pool[T, U, C, CT, TF]) AddProduceBySlice(producer func() ([]T, error), p.taskManager.AddSubTask(taskID, &taskBox) p.taskCh <- &taskBox } + prodWg.Add(1) go func() { defer func() { if r := recover(); r != nil { @@ -282,6 +283,7 @@ func (p *Pool[T, U, C, CT, TF]) AddProduceBySlice(producer func() ([]T, error), } close(closeCh) close(inputCh) + prodWg.Done() }() for { tasks, err := producer() @@ -314,13 +316,13 @@ func (p *Pool[T, U, C, CT, TF]) AddProduceBySlice(producer func() ([]T, error), func (p *Pool[T, U, C, CT, TF]) AddProducer(producer func() (T, error), constArg C, contextFn TF, options ...TaskOption) (<-chan U, pooltask.TaskController[T, U, C, CT, TF]) { opt := loadTaskOptions(options...) taskID := p.NewTaskID() - var wg sync.WaitGroup + var wg, prodWg sync.WaitGroup result := make(chan U, opt.ResultChanLen) productCloseCh := make(chan struct{}) closeCh := make(chan struct{}) inputCh := make(chan pooltask.Task[T], opt.TaskChanLen) p.taskManager.RegisterTask(taskID, int32(opt.Concurrency)) - tc := pooltask.NewTaskController[T, U, C, CT, TF](p, taskID, closeCh, productCloseCh, &wg, result) + tc := pooltask.NewTaskController[T, U, C, CT, TF](p, taskID, closeCh, productCloseCh, &wg, &prodWg, result) for i := 0; i < opt.Concurrency; i++ { err := p.run() if err == gpool.ErrPoolClosed { @@ -331,6 +333,7 @@ func (p *Pool[T, U, C, CT, TF]) AddProducer(producer func() (T, error), constArg p.taskManager.AddSubTask(taskID, &taskBox) p.taskCh <- &taskBox } + prodWg.Add(1) go func() { defer func() { if r := recover(); r != nil { @@ -338,6 +341,7 @@ func (p *Pool[T, U, C, CT, TF]) AddProducer(producer func() (T, error), constArg } close(closeCh) close(inputCh) + prodWg.Done() }() for { task, err := producer() From 2546192f348812290d24996828834526a58b2323 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Thu, 2 Feb 2023 22:49:06 +0800 Subject: [PATCH 4/9] resourcemanager: fix unavailable Stop Signed-off-by: Weizhen Wang --- resourcemanager/pooltask/task.go | 6 ++---- util/gpool/spmc/spmcpool.go | 8 ++------ 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/resourcemanager/pooltask/task.go b/resourcemanager/pooltask/task.go index edcad4f26e587..bbd79ebe7e019 100644 --- a/resourcemanager/pooltask/task.go +++ b/resourcemanager/pooltask/task.go @@ -128,7 +128,6 @@ type GPool[T any, U any, C any, CT any, TF Context[CT]] interface { // TaskController is a controller that can control or watch the pool. type TaskController[T any, U any, C any, CT any, TF Context[CT]] struct { pool GPool[T, U, C, CT, TF] - closeCh chan struct{} productCloseCh chan struct{} wg *sync.WaitGroup prodWg *sync.WaitGroup @@ -137,11 +136,10 @@ type TaskController[T any, U any, C any, CT any, TF Context[CT]] struct { } // NewTaskController create a controller to deal with pooltask's status. -func NewTaskController[T any, U any, C any, CT any, TF Context[CT]](p GPool[T, U, C, CT, TF], taskID uint64, closeCh, productCloseCh chan struct{}, wg, prodWg *sync.WaitGroup, resultCh chan U) TaskController[T, U, C, CT, TF] { +func NewTaskController[T any, U any, C any, CT any, TF Context[CT]](p GPool[T, U, C, CT, TF], taskID uint64, productCloseCh chan struct{}, wg, prodWg *sync.WaitGroup, resultCh chan U) TaskController[T, U, C, CT, TF] { return TaskController[T, U, C, CT, TF]{ pool: p, taskID: taskID, - closeCh: closeCh, productCloseCh: productCloseCh, wg: wg, prodWg: prodWg, @@ -151,7 +149,7 @@ func NewTaskController[T any, U any, C any, CT any, TF Context[CT]](p GPool[T, U // Wait is to wait the pool task to stop. func (t *TaskController[T, U, C, CT, TF]) Wait() { - <-t.closeCh + t.prodWg.Wait() t.wg.Wait() close(t.resultCh) t.pool.DeleteTask(t.taskID) diff --git a/util/gpool/spmc/spmcpool.go b/util/gpool/spmc/spmcpool.go index b5c74ca275976..d8b330f4a5c76 100644 --- a/util/gpool/spmc/spmcpool.go +++ b/util/gpool/spmc/spmcpool.go @@ -260,10 +260,9 @@ func (p *Pool[T, U, C, CT, TF]) AddProduceBySlice(producer func() ([]T, error), taskID := p.NewTaskID() var wg, prodWg sync.WaitGroup result := make(chan U, opt.ResultChanLen) - closeCh := make(chan struct{}) productCloseCh := make(chan struct{}) inputCh := make(chan pooltask.Task[T], opt.TaskChanLen) - tc := pooltask.NewTaskController[T, U, C, CT, TF](p, taskID, closeCh, productCloseCh, &wg, &prodWg, result) + tc := pooltask.NewTaskController[T, U, C, CT, TF](p, taskID, productCloseCh, &wg, &prodWg, result) p.taskManager.RegisterTask(taskID, int32(opt.Concurrency)) for i := 0; i < opt.Concurrency; i++ { err := p.run() @@ -281,7 +280,6 @@ func (p *Pool[T, U, C, CT, TF]) AddProduceBySlice(producer func() ([]T, error), if r := recover(); r != nil { logutil.BgLogger().Error("producer panic", zap.Any("recover", r), zap.Stack("stack")) } - close(closeCh) close(inputCh) prodWg.Done() }() @@ -319,10 +317,9 @@ func (p *Pool[T, U, C, CT, TF]) AddProducer(producer func() (T, error), constArg var wg, prodWg sync.WaitGroup result := make(chan U, opt.ResultChanLen) productCloseCh := make(chan struct{}) - closeCh := make(chan struct{}) inputCh := make(chan pooltask.Task[T], opt.TaskChanLen) p.taskManager.RegisterTask(taskID, int32(opt.Concurrency)) - tc := pooltask.NewTaskController[T, U, C, CT, TF](p, taskID, closeCh, productCloseCh, &wg, &prodWg, result) + tc := pooltask.NewTaskController[T, U, C, CT, TF](p, taskID, productCloseCh, &wg, &prodWg, result) for i := 0; i < opt.Concurrency; i++ { err := p.run() if err == gpool.ErrPoolClosed { @@ -339,7 +336,6 @@ func (p *Pool[T, U, C, CT, TF]) AddProducer(producer func() (T, error), constArg if r := recover(); r != nil { logutil.BgLogger().Error("producer panic", zap.Any("recover", r), zap.Stack("stack")) } - close(closeCh) close(inputCh) prodWg.Done() }() From 668afe1de95c7ef15d623863f099ffa19a9f6008 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Thu, 2 Feb 2023 22:58:15 +0800 Subject: [PATCH 5/9] resourcemanager: fix unavailable Stop Signed-off-by: Weizhen Wang --- util/gpool/spmc/spmcpool.go | 59 +++++++++++++++++++------------------ 1 file changed, 30 insertions(+), 29 deletions(-) diff --git a/util/gpool/spmc/spmcpool.go b/util/gpool/spmc/spmcpool.go index d8b330f4a5c76..1f25e557639e7 100644 --- a/util/gpool/spmc/spmcpool.go +++ b/util/gpool/spmc/spmcpool.go @@ -284,26 +284,27 @@ func (p *Pool[T, U, C, CT, TF]) AddProduceBySlice(producer func() ([]T, error), prodWg.Done() }() for { - tasks, err := producer() - if err != nil { - if errors.Is(err, gpool.ErrProducerClosed) { - return - } - log.Error("producer error", zap.Error(err)) + select { + case <-productCloseCh: return - } - for _, task := range tasks { - wg.Add(1) - task := pooltask.Task[T]{ - Task: task, - } - inputCh <- task - select { - case <-productCloseCh: + default: + tasks, err := producer() + if err != nil { + if errors.Is(err, gpool.ErrProducerClosed) { + return + } + log.Error("producer error", zap.Error(err)) return - default: + } + for _, task := range tasks { + wg.Add(1) + task := pooltask.Task[T]{ + Task: task, + } + inputCh <- task } } + } }() return result, tc @@ -340,23 +341,23 @@ func (p *Pool[T, U, C, CT, TF]) AddProducer(producer func() (T, error), constArg prodWg.Done() }() for { - task, err := producer() - if err != nil { - if errors.Is(err, gpool.ErrProducerClosed) { - return - } - log.Error("producer error", zap.Error(err)) - return - } - wg.Add(1) - t := pooltask.Task[T]{ - Task: task, - } - inputCh <- t select { case <-productCloseCh: return default: + task, err := producer() + if err != nil { + if errors.Is(err, gpool.ErrProducerClosed) { + return + } + log.Error("producer error", zap.Error(err)) + return + } + wg.Add(1) + t := pooltask.Task[T]{ + Task: task, + } + inputCh <- t } } }() From 49892de546e4a5a5e57b352ff1fa77d5703ee30e Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Thu, 2 Feb 2023 23:14:39 +0800 Subject: [PATCH 6/9] resourcemanager: fix unavailable Stop Signed-off-by: Weizhen Wang --- resourcemanager/pooltask/task.go | 2 +- util/gpool/spmc/option.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/resourcemanager/pooltask/task.go b/resourcemanager/pooltask/task.go index bbd79ebe7e019..de768c7c020d8 100644 --- a/resourcemanager/pooltask/task.go +++ b/resourcemanager/pooltask/task.go @@ -158,7 +158,7 @@ func (t *TaskController[T, U, C, CT, TF]) Wait() { // Stop is to send stop command to the task. But you still need to wait the task to stop. func (t *TaskController[T, U, C, CT, TF]) Stop() { close(t.productCloseCh) - t.wg.Wait() + t.prodWg.Wait() t.pool.StopTask(t.TaskID()) } diff --git a/util/gpool/spmc/option.go b/util/gpool/spmc/option.go index e317ce157b93d..8c86497860be9 100644 --- a/util/gpool/spmc/option.go +++ b/util/gpool/spmc/option.go @@ -103,8 +103,8 @@ func loadTaskOptions(options ...TaskOption) *TaskOptions { if opts.ResultChanLen == 0 { opts.ResultChanLen = uint64(opts.Concurrency) } - if opts.ResultChanLen == 0 { - opts.ResultChanLen = uint64(opts.Concurrency) + if opts.TaskChanLen == 0 { + opts.TaskChanLen = uint64(opts.Concurrency) } return opts } From a301a626a65621ec299d582881fff741baf055c3 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Thu, 2 Feb 2023 23:21:44 +0800 Subject: [PATCH 7/9] resourcemanager: fix unavailable Stop Signed-off-by: Weizhen Wang --- resourcemanager/pooltask/task.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/resourcemanager/pooltask/task.go b/resourcemanager/pooltask/task.go index de768c7c020d8..4f675c54b9198 100644 --- a/resourcemanager/pooltask/task.go +++ b/resourcemanager/pooltask/task.go @@ -17,6 +17,8 @@ package pooltask import ( "sync" "sync/atomic" + + "github.com/pingcap/tidb/util/channel" ) // Context is a interface that can be used to create a context. @@ -160,6 +162,7 @@ func (t *TaskController[T, U, C, CT, TF]) Stop() { close(t.productCloseCh) t.prodWg.Wait() t.pool.StopTask(t.TaskID()) + channel.Clear(t.resultCh) } // TaskID is to get the task id. From 4cf07cc9fed0e92d0ce73b4d0353bc2ae84eeb00 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Thu, 2 Feb 2023 23:32:51 +0800 Subject: [PATCH 8/9] resourcemanager: fix unavailable Stop Signed-off-by: Weizhen Wang --- resourcemanager/pooltask/BUILD.bazel | 5 +++- resourcemanager/pooltask/task.go | 6 ++-- util/gpool/spmc/BUILD.bazel | 1 + util/gpool/spmc/spmcpool.go | 4 +-- util/gpool/spmc/spmcpool_test.go | 42 ++++++++++++++++++++++++++++ 5 files changed, 53 insertions(+), 5 deletions(-) diff --git a/resourcemanager/pooltask/BUILD.bazel b/resourcemanager/pooltask/BUILD.bazel index c4113b69dd141..6171e1fa3598d 100644 --- a/resourcemanager/pooltask/BUILD.bazel +++ b/resourcemanager/pooltask/BUILD.bazel @@ -10,7 +10,10 @@ go_library( ], importpath = "github.com/pingcap/tidb/resourcemanager/pooltask", visibility = ["//visibility:public"], - deps = ["@org_uber_go_atomic//:atomic"], + deps = [ + "//util/channel", + "@org_uber_go_atomic//:atomic", + ], ) go_test( diff --git a/resourcemanager/pooltask/task.go b/resourcemanager/pooltask/task.go index 4f675c54b9198..0c2f2f406af21 100644 --- a/resourcemanager/pooltask/task.go +++ b/resourcemanager/pooltask/task.go @@ -135,10 +135,11 @@ type TaskController[T any, U any, C any, CT any, TF Context[CT]] struct { prodWg *sync.WaitGroup taskID uint64 resultCh chan U + inputCh chan Task[T] } // NewTaskController create a controller to deal with pooltask's status. -func NewTaskController[T any, U any, C any, CT any, TF Context[CT]](p GPool[T, U, C, CT, TF], taskID uint64, productCloseCh chan struct{}, wg, prodWg *sync.WaitGroup, resultCh chan U) TaskController[T, U, C, CT, TF] { +func NewTaskController[T any, U any, C any, CT any, TF Context[CT]](p GPool[T, U, C, CT, TF], taskID uint64, productCloseCh chan struct{}, wg, prodWg *sync.WaitGroup, inputCh chan Task[T], resultCh chan U) TaskController[T, U, C, CT, TF] { return TaskController[T, U, C, CT, TF]{ pool: p, taskID: taskID, @@ -146,6 +147,7 @@ func NewTaskController[T any, U any, C any, CT any, TF Context[CT]](p GPool[T, U wg: wg, prodWg: prodWg, resultCh: resultCh, + inputCh: inputCh, } } @@ -160,7 +162,7 @@ func (t *TaskController[T, U, C, CT, TF]) Wait() { // Stop is to send stop command to the task. But you still need to wait the task to stop. func (t *TaskController[T, U, C, CT, TF]) Stop() { close(t.productCloseCh) - t.prodWg.Wait() + channel.Clear(t.inputCh) t.pool.StopTask(t.TaskID()) channel.Clear(t.resultCh) } diff --git a/util/gpool/spmc/BUILD.bazel b/util/gpool/spmc/BUILD.bazel index db4d724052666..4f1ae6965cf0f 100644 --- a/util/gpool/spmc/BUILD.bazel +++ b/util/gpool/spmc/BUILD.bazel @@ -34,6 +34,7 @@ go_test( embed = [":spmc"], flaky = True, race = "on", + shard_count = 2, deps = [ "//resourcemanager/pooltask", "//resourcemanager/util", diff --git a/util/gpool/spmc/spmcpool.go b/util/gpool/spmc/spmcpool.go index 1f25e557639e7..b434b4d4d945b 100644 --- a/util/gpool/spmc/spmcpool.go +++ b/util/gpool/spmc/spmcpool.go @@ -262,7 +262,7 @@ func (p *Pool[T, U, C, CT, TF]) AddProduceBySlice(producer func() ([]T, error), result := make(chan U, opt.ResultChanLen) productCloseCh := make(chan struct{}) inputCh := make(chan pooltask.Task[T], opt.TaskChanLen) - tc := pooltask.NewTaskController[T, U, C, CT, TF](p, taskID, productCloseCh, &wg, &prodWg, result) + tc := pooltask.NewTaskController[T, U, C, CT, TF](p, taskID, productCloseCh, &wg, &prodWg, inputCh, result) p.taskManager.RegisterTask(taskID, int32(opt.Concurrency)) for i := 0; i < opt.Concurrency; i++ { err := p.run() @@ -320,7 +320,7 @@ func (p *Pool[T, U, C, CT, TF]) AddProducer(producer func() (T, error), constArg productCloseCh := make(chan struct{}) inputCh := make(chan pooltask.Task[T], opt.TaskChanLen) p.taskManager.RegisterTask(taskID, int32(opt.Concurrency)) - tc := pooltask.NewTaskController[T, U, C, CT, TF](p, taskID, productCloseCh, &wg, &prodWg, result) + tc := pooltask.NewTaskController[T, U, C, CT, TF](p, taskID, productCloseCh, &wg, &prodWg, inputCh, result) for i := 0; i < opt.Concurrency; i++ { err := p.run() if err == gpool.ErrPoolClosed { diff --git a/util/gpool/spmc/spmcpool_test.go b/util/gpool/spmc/spmcpool_test.go index 053f931d9d46c..62387feb24aad 100644 --- a/util/gpool/spmc/spmcpool_test.go +++ b/util/gpool/spmc/spmcpool_test.go @@ -120,6 +120,48 @@ func TestStopPool(t *testing.T) { pool.ReleaseAndWait() } +func TestStopPoolWithSlice(t *testing.T) { + type ConstArgs struct { + a int + } + myArgs := ConstArgs{a: 10} + // init the pool + // input type, output type, constArgs type + pool, err := NewSPMCPool[int, int, ConstArgs, any, pooltask.NilContext]("TestStopPoolWithSlice", 3, rmutil.UNKNOWN) + require.NoError(t, err) + pool.SetConsumerFunc(func(task int, constArgs ConstArgs, ctx any) int { + return task + constArgs.a + }) + + exit := make(chan struct{}) + + pfunc := func() ([]int, error) { + select { + case <-exit: + return nil, gpool.ErrProducerClosed + default: + return []int{1, 2, 3}, nil + } + } + // add new task + resultCh, control := pool.AddProduceBySlice(pfunc, myArgs, pooltask.NilContext{}, WithConcurrency(4)) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for result := range resultCh { + require.Greater(t, result, 10) + } + }() + // Waiting task finishing + control.Stop() + control.Wait() + wg.Wait() + // close pool + pool.ReleaseAndWait() +} + func TestTuneSimplePool(t *testing.T) { testTunePool(t, "TestTuneSimplePool") } From e741e6234195417c0dff53aed8fe521847fbd3b5 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Thu, 2 Feb 2023 23:35:40 +0800 Subject: [PATCH 9/9] resourcemanager: fix unavailable Stop Signed-off-by: Weizhen Wang --- resourcemanager/pooltask/task.go | 38 ++++++++-------- util/gpool/spmc/BUILD.bazel | 1 - util/gpool/spmc/option.go | 4 +- util/gpool/spmc/spmcpool.go | 76 +++++++++++++++----------------- util/gpool/spmc/spmcpool_test.go | 19 +++++--- 5 files changed, 71 insertions(+), 67 deletions(-) diff --git a/resourcemanager/pooltask/task.go b/resourcemanager/pooltask/task.go index 0c2f2f406af21..88134e684065b 100644 --- a/resourcemanager/pooltask/task.go +++ b/resourcemanager/pooltask/task.go @@ -129,31 +129,28 @@ type GPool[T any, U any, C any, CT any, TF Context[CT]] interface { // TaskController is a controller that can control or watch the pool. type TaskController[T any, U any, C any, CT any, TF Context[CT]] struct { - pool GPool[T, U, C, CT, TF] - productCloseCh chan struct{} - wg *sync.WaitGroup - prodWg *sync.WaitGroup - taskID uint64 - resultCh chan U - inputCh chan Task[T] + pool GPool[T, U, C, CT, TF] + productExitCh chan struct{} + wg *sync.WaitGroup + taskID uint64 + resultCh chan U + inputCh chan Task[T] } // NewTaskController create a controller to deal with pooltask's status. -func NewTaskController[T any, U any, C any, CT any, TF Context[CT]](p GPool[T, U, C, CT, TF], taskID uint64, productCloseCh chan struct{}, wg, prodWg *sync.WaitGroup, inputCh chan Task[T], resultCh chan U) TaskController[T, U, C, CT, TF] { +func NewTaskController[T any, U any, C any, CT any, TF Context[CT]](p GPool[T, U, C, CT, TF], taskID uint64, productExitCh chan struct{}, wg *sync.WaitGroup, inputCh chan Task[T], resultCh chan U) TaskController[T, U, C, CT, TF] { return TaskController[T, U, C, CT, TF]{ - pool: p, - taskID: taskID, - productCloseCh: productCloseCh, - wg: wg, - prodWg: prodWg, - resultCh: resultCh, - inputCh: inputCh, + pool: p, + taskID: taskID, + productExitCh: productExitCh, + wg: wg, + resultCh: resultCh, + inputCh: inputCh, } } // Wait is to wait the pool task to stop. func (t *TaskController[T, U, C, CT, TF]) Wait() { - t.prodWg.Wait() t.wg.Wait() close(t.resultCh) t.pool.DeleteTask(t.taskID) @@ -161,9 +158,14 @@ func (t *TaskController[T, U, C, CT, TF]) Wait() { // Stop is to send stop command to the task. But you still need to wait the task to stop. func (t *TaskController[T, U, C, CT, TF]) Stop() { - close(t.productCloseCh) - channel.Clear(t.inputCh) + close(t.productExitCh) + // Clear all the task in the task queue and mark all task complete. + // so that ```t.Wait``` is able to close resultCh + for range t.inputCh { + t.wg.Done() + } t.pool.StopTask(t.TaskID()) + // Clear the resultCh to avoid blocking the consumer put result into the channel and cannot exit. channel.Clear(t.resultCh) } diff --git a/util/gpool/spmc/BUILD.bazel b/util/gpool/spmc/BUILD.bazel index 4f1ae6965cf0f..db4d724052666 100644 --- a/util/gpool/spmc/BUILD.bazel +++ b/util/gpool/spmc/BUILD.bazel @@ -34,7 +34,6 @@ go_test( embed = [":spmc"], flaky = True, race = "on", - shard_count = 2, deps = [ "//resourcemanager/pooltask", "//resourcemanager/util", diff --git a/util/gpool/spmc/option.go b/util/gpool/spmc/option.go index 8c86497860be9..af456e3c79772 100644 --- a/util/gpool/spmc/option.go +++ b/util/gpool/spmc/option.go @@ -18,6 +18,8 @@ import ( "time" ) +const defaultTaskChanLen = 1 + // Option represents the optional function. type Option func(opts *Options) @@ -104,7 +106,7 @@ func loadTaskOptions(options ...TaskOption) *TaskOptions { opts.ResultChanLen = uint64(opts.Concurrency) } if opts.TaskChanLen == 0 { - opts.TaskChanLen = uint64(opts.Concurrency) + opts.TaskChanLen = defaultTaskChanLen } return opts } diff --git a/util/gpool/spmc/spmcpool.go b/util/gpool/spmc/spmcpool.go index b434b4d4d945b..eaa10c3b9a53a 100644 --- a/util/gpool/spmc/spmcpool.go +++ b/util/gpool/spmc/spmcpool.go @@ -219,7 +219,6 @@ func (p *Pool[T, U, C, CT, TF]) release() { // There might be some callers waiting in retrieveWorker(), so we need to wake them up to prevent // those callers blocking infinitely. p.cond.Broadcast() - close(p.taskCh) } func isClose(exitCh chan struct{}) bool { @@ -258,11 +257,11 @@ func (p *Pool[T, U, C, CT, TF]) SetConsumerFunc(consumerFunc func(T, C, CT) U) { func (p *Pool[T, U, C, CT, TF]) AddProduceBySlice(producer func() ([]T, error), constArg C, contextFn TF, options ...TaskOption) (<-chan U, pooltask.TaskController[T, U, C, CT, TF]) { opt := loadTaskOptions(options...) taskID := p.NewTaskID() - var wg, prodWg sync.WaitGroup + var wg sync.WaitGroup result := make(chan U, opt.ResultChanLen) - productCloseCh := make(chan struct{}) + productExitCh := make(chan struct{}) inputCh := make(chan pooltask.Task[T], opt.TaskChanLen) - tc := pooltask.NewTaskController[T, U, C, CT, TF](p, taskID, productCloseCh, &wg, &prodWg, inputCh, result) + tc := pooltask.NewTaskController[T, U, C, CT, TF](p, taskID, productExitCh, &wg, inputCh, result) p.taskManager.RegisterTask(taskID, int32(opt.Concurrency)) for i := 0; i < opt.Concurrency; i++ { err := p.run() @@ -274,37 +273,34 @@ func (p *Pool[T, U, C, CT, TF]) AddProduceBySlice(producer func() ([]T, error), p.taskManager.AddSubTask(taskID, &taskBox) p.taskCh <- &taskBox } - prodWg.Add(1) + wg.Add(1) go func() { defer func() { if r := recover(); r != nil { logutil.BgLogger().Error("producer panic", zap.Any("recover", r), zap.Stack("stack")) } close(inputCh) - prodWg.Done() + wg.Done() }() for { - select { - case <-productCloseCh: + if isClose(productExitCh) { return - default: - tasks, err := producer() - if err != nil { - if errors.Is(err, gpool.ErrProducerClosed) { - return - } - log.Error("producer error", zap.Error(err)) + } + tasks, err := producer() + if err != nil { + if errors.Is(err, gpool.ErrProducerClosed) { return } - for _, task := range tasks { - wg.Add(1) - task := pooltask.Task[T]{ - Task: task, - } - inputCh <- task + log.Error("producer error", zap.Error(err)) + return + } + for _, task := range tasks { + wg.Add(1) + task := pooltask.Task[T]{ + Task: task, } + inputCh <- task } - } }() return result, tc @@ -315,12 +311,12 @@ func (p *Pool[T, U, C, CT, TF]) AddProduceBySlice(producer func() ([]T, error), func (p *Pool[T, U, C, CT, TF]) AddProducer(producer func() (T, error), constArg C, contextFn TF, options ...TaskOption) (<-chan U, pooltask.TaskController[T, U, C, CT, TF]) { opt := loadTaskOptions(options...) taskID := p.NewTaskID() - var wg, prodWg sync.WaitGroup + var wg sync.WaitGroup result := make(chan U, opt.ResultChanLen) - productCloseCh := make(chan struct{}) + productExitCh := make(chan struct{}) inputCh := make(chan pooltask.Task[T], opt.TaskChanLen) p.taskManager.RegisterTask(taskID, int32(opt.Concurrency)) - tc := pooltask.NewTaskController[T, U, C, CT, TF](p, taskID, productCloseCh, &wg, &prodWg, inputCh, result) + tc := pooltask.NewTaskController[T, U, C, CT, TF](p, taskID, productExitCh, &wg, inputCh, result) for i := 0; i < opt.Concurrency; i++ { err := p.run() if err == gpool.ErrPoolClosed { @@ -331,34 +327,32 @@ func (p *Pool[T, U, C, CT, TF]) AddProducer(producer func() (T, error), constArg p.taskManager.AddSubTask(taskID, &taskBox) p.taskCh <- &taskBox } - prodWg.Add(1) + wg.Add(1) go func() { defer func() { if r := recover(); r != nil { logutil.BgLogger().Error("producer panic", zap.Any("recover", r), zap.Stack("stack")) } close(inputCh) - prodWg.Done() + wg.Done() }() for { - select { - case <-productCloseCh: + if isClose(productExitCh) { return - default: - task, err := producer() - if err != nil { - if errors.Is(err, gpool.ErrProducerClosed) { - return - } - log.Error("producer error", zap.Error(err)) + } + task, err := producer() + if err != nil { + if errors.Is(err, gpool.ErrProducerClosed) { return } - wg.Add(1) - t := pooltask.Task[T]{ - Task: task, - } - inputCh <- t + log.Error("producer error", zap.Error(err)) + return + } + wg.Add(1) + t := pooltask.Task[T]{ + Task: task, } + inputCh <- t } }() return result, tc diff --git a/util/gpool/spmc/spmcpool_test.go b/util/gpool/spmc/spmcpool_test.go index 62387feb24aad..25fb62aaeb0ca 100644 --- a/util/gpool/spmc/spmcpool_test.go +++ b/util/gpool/spmc/spmcpool_test.go @@ -53,7 +53,7 @@ func TestPool(t *testing.T) { } } // add new task - resultCh, control := pool.AddProducer(pfunc, myArgs, pooltask.NilContext{}, WithConcurrency(4)) + resultCh, control := pool.AddProducer(pfunc, myArgs, pooltask.NilContext{}, WithConcurrency(5)) var count atomic.Uint32 var wg sync.WaitGroup @@ -112,8 +112,12 @@ func TestStopPool(t *testing.T) { require.Greater(t, result, 10) } }() + wg.Add(1) + go func() { + defer wg.Done() + control.Stop() + }() // Waiting task finishing - control.Stop() control.Wait() wg.Wait() // close pool @@ -152,10 +156,10 @@ func TestStopPoolWithSlice(t *testing.T) { defer wg.Done() for result := range resultCh { require.Greater(t, result, 10) + control.Stop() } }() // Waiting task finishing - control.Stop() control.Wait() wg.Wait() // close pool @@ -230,9 +234,12 @@ func testTunePool(t *testing.T, name string) { for n := pool.Cap(); n > 1; n-- { downclockPool(t, pool, tid) } - - // exit test - control.Stop() + wg.Add(1) + go func() { + // exit test + control.Stop() + wg.Done() + }() control.Wait() wg.Wait() // close pool