Skip to content

Commit

Permalink
add a flexible wg
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <admin@liudos.us>
  • Loading branch information
lhy1024 committed Sep 20, 2023
1 parent 24fffdf commit 0367c5b
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 1 deletion.
4 changes: 3 additions & 1 deletion pkg/schedule/schedulers/scheduler_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/tikv/pd/pkg/schedule/plan"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"go.uber.org/zap"
)

Expand All @@ -40,7 +41,7 @@ var denySchedulersByLabelerCounter = labeler.LabelerEventCounter.WithLabelValues
// Controller is used to manage all schedulers.
type Controller struct {
sync.RWMutex
wg sync.WaitGroup
wg *syncutil.FlexibleWaitGroup
ctx context.Context
cluster sche.SchedulerCluster
storage endpoint.ConfigStorage
Expand All @@ -57,6 +58,7 @@ type Controller struct {
func NewController(ctx context.Context, cluster sche.SchedulerCluster, storage endpoint.ConfigStorage, opController *operator.Controller) *Controller {
return &Controller{
ctx: ctx,
wg: syncutil.NewFlexibleWaitGroup(),
cluster: cluster,
storage: storage,
schedulers: make(map[string]*ScheduleController),
Expand Down
60 changes: 60 additions & 0 deletions pkg/utils/syncutil/flexible_wait_group.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package syncutil

import (
"sync"
)

// FlexibleWaitGroup is a flexible wait group.
// Note: we can't use sync.WaitGroup because it doesn't support to call `Add` after `Wait` finished.
type FlexibleWaitGroup struct {
sync.Mutex
count int
cond *sync.Cond
}

// NewFlexibleWaitGroup creates a FlexibleWaitGroup.
func NewFlexibleWaitGroup() *FlexibleWaitGroup {
dwg := &FlexibleWaitGroup{}
dwg.cond = sync.NewCond(&dwg.Mutex)
return dwg
}

// Add adds delta, which may be negative, to the FlexibleWaitGroup counter.
// If the counter becomes zero, all goroutines blocked on Wait are released.
func (fwg *FlexibleWaitGroup) Add(delta int) {
fwg.Lock()
defer fwg.Unlock()

fwg.count += delta
if fwg.count <= 0 {
fwg.cond.Broadcast()
}
}

// Done decrements the FlexibleWaitGroup counter.
func (fwg *FlexibleWaitGroup) Done() {
fwg.Add(-1)
}

// Wait blocks until the FlexibleWaitGroup counter is zero.
func (fwg *FlexibleWaitGroup) Wait() {
fwg.Lock()
for fwg.count > 0 {
fwg.cond.Wait()
}
fwg.Unlock()
}
68 changes: 68 additions & 0 deletions pkg/utils/syncutil/flexible_wait_group_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package syncutil

import (
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestFlexibleWaitGroup(t *testing.T) {
re := require.New(t)
fwg := NewFlexibleWaitGroup()
for i := 20; i >= 0; i-- {
fwg.Add(1)
go func(i int) {
defer fwg.Done()
time.Sleep(time.Millisecond * time.Duration(i*50))
}(i)
}
now := time.Now()
fwg.Wait()
re.GreaterOrEqual(time.Since(now).Milliseconds(), int64(1000))
}

func TestAddAfterWait(t *testing.T) {
fwg := NewFlexibleWaitGroup()
startWait := make(chan struct{})
addTwice := make(chan struct{})
done := make(chan struct{})

// First goroutine: Adds a task, then waits for the second task to be added before finishing.
go func() {
defer fwg.Done()
fwg.Add(1)
<-addTwice
}()

// Second goroutine: adds a second task after ensure the third goroutine has started to wait
// and triggers the first goroutine to finish.
go func() {
defer fwg.Done()
<-startWait
fwg.Add(1)
addTwice <- struct{}{}
}()

// Third goroutine: waits for all tasks to be added, then finishes.
go func() {
startWait <- struct{}{}
fwg.Wait()
done <- struct{}{}
}()
<-done
}

0 comments on commit 0367c5b

Please sign in to comment.