diff --git a/pkg/schedule/schedulers/scheduler_controller.go b/pkg/schedule/schedulers/scheduler_controller.go index 4d72699b0fe..58cb802d629 100644 --- a/pkg/schedule/schedulers/scheduler_controller.go +++ b/pkg/schedule/schedulers/scheduler_controller.go @@ -30,6 +30,7 @@ import ( "github.com/tikv/pd/pkg/schedule/plan" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/logutil" + "github.com/tikv/pd/pkg/utils/syncutil" "go.uber.org/zap" ) @@ -40,7 +41,7 @@ var denySchedulersByLabelerCounter = labeler.LabelerEventCounter.WithLabelValues // Controller is used to manage all schedulers. type Controller struct { sync.RWMutex - wg sync.WaitGroup + wg *syncutil.FlexibleWaitGroup ctx context.Context cluster sche.SchedulerCluster storage endpoint.ConfigStorage @@ -57,6 +58,7 @@ type Controller struct { func NewController(ctx context.Context, cluster sche.SchedulerCluster, storage endpoint.ConfigStorage, opController *operator.Controller) *Controller { return &Controller{ ctx: ctx, + wg: syncutil.NewFlexibleWaitGroup(), cluster: cluster, storage: storage, schedulers: make(map[string]*ScheduleController), diff --git a/pkg/utils/syncutil/flexible_wait_group.go b/pkg/utils/syncutil/flexible_wait_group.go new file mode 100644 index 00000000000..3ee602d7624 --- /dev/null +++ b/pkg/utils/syncutil/flexible_wait_group.go @@ -0,0 +1,60 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package syncutil + +import ( + "sync" +) + +// FlexibleWaitGroup is a flexible wait group. +// Note: we can't use sync.WaitGroup because it doesn't support to call `Add` after `Wait` finished. +type FlexibleWaitGroup struct { + sync.Mutex + count int + cond *sync.Cond +} + +// NewFlexibleWaitGroup creates a FlexibleWaitGroup. +func NewFlexibleWaitGroup() *FlexibleWaitGroup { + dwg := &FlexibleWaitGroup{} + dwg.cond = sync.NewCond(&dwg.Mutex) + return dwg +} + +// Add adds delta, which may be negative, to the FlexibleWaitGroup counter. +// If the counter becomes zero, all goroutines blocked on Wait are released. +func (fwg *FlexibleWaitGroup) Add(delta int) { + fwg.Lock() + defer fwg.Unlock() + + fwg.count += delta + if fwg.count <= 0 { + fwg.cond.Broadcast() + } +} + +// Done decrements the FlexibleWaitGroup counter. +func (fwg *FlexibleWaitGroup) Done() { + fwg.Add(-1) +} + +// Wait blocks until the FlexibleWaitGroup counter is zero. +func (fwg *FlexibleWaitGroup) Wait() { + fwg.Lock() + for fwg.count > 0 { + fwg.cond.Wait() + } + fwg.Unlock() +} diff --git a/pkg/utils/syncutil/flexible_wait_group_test.go b/pkg/utils/syncutil/flexible_wait_group_test.go new file mode 100644 index 00000000000..e98074cf55d --- /dev/null +++ b/pkg/utils/syncutil/flexible_wait_group_test.go @@ -0,0 +1,68 @@ +// Copyright 2022 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package syncutil + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestFlexibleWaitGroup(t *testing.T) { + re := require.New(t) + fwg := NewFlexibleWaitGroup() + for i := 20; i >= 0; i-- { + fwg.Add(1) + go func(i int) { + defer fwg.Done() + time.Sleep(time.Millisecond * time.Duration(i*50)) + }(i) + } + now := time.Now() + fwg.Wait() + re.GreaterOrEqual(time.Since(now).Milliseconds(), int64(1000)) +} + +func TestAddAfterWait(t *testing.T) { + fwg := NewFlexibleWaitGroup() + startWait := make(chan struct{}) + addTwice := make(chan struct{}) + done := make(chan struct{}) + + // First goroutine: Adds a task, then waits for the second task to be added before finishing. + go func() { + defer fwg.Done() + fwg.Add(1) + <-addTwice + }() + + // Second goroutine: adds a second task after ensure the third goroutine has started to wait + // and triggers the first goroutine to finish. + go func() { + defer fwg.Done() + <-startWait + fwg.Add(1) + addTwice <- struct{}{} + }() + + // Third goroutine: waits for all tasks to be added, then finishes. + go func() { + startWait <- struct{}{} + fwg.Wait() + done <- struct{}{} + }() + <-done +}