Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

util: gorotinue pool #39872

Merged
merged 40 commits into from
Jan 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
087d18e
util: gorotinue pool
hawkingrei Dec 13, 2022
6e37a0f
util: gorotinue pool
hawkingrei Dec 13, 2022
bd7e2b1
util: gorotinue pool
hawkingrei Dec 13, 2022
178f653
util: gorotinue pool
hawkingrei Dec 13, 2022
1ad4d6b
Update util/gpool/gpool.go
hawkingrei Dec 19, 2022
13d829a
remove useless function
hawkingrei Dec 19, 2022
e52db31
remove useless function
hawkingrei Dec 19, 2022
10b8f02
remove useless function
hawkingrei Dec 19, 2022
2e2ee3f
update
hawkingrei Dec 21, 2022
476e024
update
hawkingrei Dec 21, 2022
f66c2d8
update
hawkingrei Dec 21, 2022
38b3d52
update
hawkingrei Dec 21, 2022
4820b68
update
hawkingrei Dec 21, 2022
477c016
update
hawkingrei Dec 21, 2022
adece54
Update util/gpool/spmc/worker_loop_queue.go
hawkingrei Dec 21, 2022
72bad05
improve code
hawkingrei Dec 22, 2022
ec7afaf
improve code
hawkingrei Dec 26, 2022
e3e9558
improve code
hawkingrei Dec 26, 2022
f8e9516
improve code
hawkingrei Dec 26, 2022
43df89f
improve code
hawkingrei Dec 26, 2022
08ce0aa
improve code
hawkingrei Dec 26, 2022
6389546
improve code
hawkingrei Dec 29, 2022
47cd1c6
improve code
hawkingrei Dec 29, 2022
d88ecd9
improve tes code
hawkingrei Dec 29, 2022
b726a7e
improve tes code
hawkingrei Dec 29, 2022
5817d6a
improve tes code
hawkingrei Dec 29, 2022
fb55887
improve tes code
hawkingrei Dec 29, 2022
19f167c
improve tes code
hawkingrei Dec 29, 2022
a38397c
improve tes code
hawkingrei Dec 29, 2022
b0501bb
improve tes code
hawkingrei Dec 29, 2022
c1704e1
improve tes code
hawkingrei Dec 30, 2022
9cedd5f
improve tes code
hawkingrei Dec 30, 2022
728e38a
fix bug and make test stable
hawkingrei Jan 3, 2023
15a4a5b
fix bug and make test stable
hawkingrei Jan 3, 2023
fe1e900
fix bug and make test stable
hawkingrei Jan 3, 2023
da506e4
fix bug that nil in the taskBoxCh
hawkingrei Jan 3, 2023
5814339
rename test name
hawkingrei Jan 3, 2023
f62832a
fix exit bug
hawkingrei Jan 3, 2023
b29e662
simple code
hawkingrei Jan 3, 2023
26fa2e8
Merge branch 'master' into add_simple_pool
ti-chi-bot Jan 4, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions resourcemanager/pooltask/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "pooltask",
srcs = ["task.go"],
importpath = "github.com/pingcap/tidb/resourcemanager/pooltask",
visibility = ["//visibility:public"],
)
132 changes: 132 additions & 0 deletions resourcemanager/pooltask/task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pooltask

import (
"sync"
)

// Context is a interface that can be used to create a context.
type Context[T any] interface {
GetContext() T
}

// NilContext is to create a nil as context
type NilContext struct{}

// GetContext is to get a nil as context
func (NilContext) GetContext() any {
return nil
}

// TaskBox is a box which contains all info about pooltask.
type TaskBox[T any, U any, C any, CT any, TF Context[CT]] struct {
constArgs C
contextFunc TF
wg *sync.WaitGroup
task chan Task[T]
resultCh chan U
taskID uint64
}

// NewTaskBox is to create a task box for pool.
func NewTaskBox[T any, U any, C any, CT any, TF Context[CT]](constArgs C, contextFunc TF, wg *sync.WaitGroup, taskCh chan Task[T], resultCh chan U, taskID uint64) TaskBox[T, U, C, CT, TF] {
return TaskBox[T, U, C, CT, TF]{
constArgs: constArgs,
contextFunc: contextFunc,
wg: wg,
task: taskCh,
hawkingrei marked this conversation as resolved.
Show resolved Hide resolved
resultCh: resultCh,
taskID: taskID,
}
}

// TaskID is to get the task id.
func (t TaskBox[T, U, C, CT, TF]) TaskID() uint64 {
return t.taskID
}

// ConstArgs is to get the const args.
func (t *TaskBox[T, U, C, CT, TF]) ConstArgs() C {
return t.constArgs
}

// GetTaskCh is to get the task channel.
func (t *TaskBox[T, U, C, CT, TF]) GetTaskCh() chan Task[T] {
return t.task
}

// GetResultCh is to get result channel
func (t *TaskBox[T, U, C, CT, TF]) GetResultCh() chan U {
return t.resultCh
}

// GetContextFunc is to get context func.
func (t *TaskBox[T, U, C, CT, TF]) GetContextFunc() TF {
return t.contextFunc
}

// Done is to set the pooltask status to complete.
func (t *TaskBox[T, U, C, CT, TF]) Done() {
t.wg.Done()
}

// Clone is to copy the box
func (t *TaskBox[T, U, C, CT, TF]) Clone() *TaskBox[T, U, C, CT, TF] {
newBox := NewTaskBox[T, U, C, CT, TF](t.constArgs, t.contextFunc, t.wg, t.task, t.resultCh, t.taskID)
return &newBox
}

// GPool is a goroutine pool.
type GPool[T any, U any, C any, CT any, TF Context[CT]] interface {
Tune(size int)
}

// TaskController is a controller that can control or watch the pool.
type TaskController[T any, U any, C any, CT any, TF Context[CT]] struct {
pool GPool[T, U, C, CT, TF]
close chan struct{}
wg *sync.WaitGroup
taskID uint64
resultCh chan U
}

// NewTaskController create a controller to deal with pooltask's status.
func NewTaskController[T any, U any, C any, CT any, TF Context[CT]](p GPool[T, U, C, CT, TF], taskID uint64, closeCh chan struct{}, wg *sync.WaitGroup, resultCh chan U) TaskController[T, U, C, CT, TF] {
return TaskController[T, U, C, CT, TF]{
pool: p,
taskID: taskID,
close: closeCh,
wg: wg,
resultCh: resultCh,
}
}

// Wait is to wait the pool task to stop.
func (t *TaskController[T, U, C, CT, TF]) Wait() {
<-t.close
t.wg.Wait()
close(t.resultCh)
}

// TaskID is to get the task id.
func (t *TaskController[T, U, C, CT, TF]) TaskID() uint64 {
return t.taskID
}

// Task is a task that can be executed.
type Task[T any] struct {
Task T
}
11 changes: 11 additions & 0 deletions util/gpool/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "gpool",
srcs = [
"gpool.go",
"spinlock.go",
],
importpath = "github.com/pingcap/tidb/util/gpool",
visibility = ["//visibility:public"],
)
69 changes: 69 additions & 0 deletions util/gpool/gpool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package gpool

import (
"errors"
"sync/atomic"
"time"
)

const (
// DefaultCleanIntervalTime is the interval time to clean up goroutines.
DefaultCleanIntervalTime = 5 * time.Second

// OPENED represents that the pool is opened.
OPENED = iota

// CLOSED represents that the pool is closed.
CLOSED
)

var (
// ErrPoolClosed will be returned when submitting task to a closed pool.
ErrPoolClosed = errors.New("this pool has been closed")

// ErrPoolOverload will be returned when the pool is full and no workers available.
ErrPoolOverload = errors.New("too many goroutines blocked on submit or Nonblocking is set")

// ErrProducerClosed will be returned when the producer is closed.
ErrProducerClosed = errors.New("this producer has been closed")
)

// BasePool is base class of pool
hawkingrei marked this conversation as resolved.
Show resolved Hide resolved
type BasePool struct {
name string
generator atomic.Uint64
}

// NewBasePool is to create a new BasePool.
func NewBasePool() BasePool {
return BasePool{}
}

// SetName is to set name.
func (p *BasePool) SetName(name string) {
p.name = name
}

// Name is to get name.
func (p *BasePool) Name() string {
return p.name
}

// NewTaskID is to get a new task ID.
func (p *BasePool) NewTaskID() uint64 {
return p.generator.Add(1)
}
47 changes: 47 additions & 0 deletions util/gpool/spinlock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package gpool

import (
"runtime"
"sync"
"sync/atomic"
)

type spinLock uint32

const maxBackoff = 16

func (sl *spinLock) Lock() {
backoff := 1
for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
// Leverage the exponential backoff algorithm, see https://en.wikipedia.org/wiki/Exponential_backoff.
for i := 0; i < backoff; i++ {
runtime.Gosched()
}
if backoff < maxBackoff {
backoff <<= 1
}
}
}

func (sl *spinLock) Unlock() {
atomic.StoreUint32((*uint32)(sl), 0)
}

// NewSpinLock instantiates a spin-lock.
func NewSpinLock() sync.Locker {
return new(spinLock)
}
43 changes: 43 additions & 0 deletions util/gpool/spmc/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "spmc",
srcs = [
"option.go",
"spmcpool.go",
"worker.go",
"worker_loop_queue.go",
],
importpath = "github.com/pingcap/tidb/util/gpool/spmc",
visibility = ["//visibility:public"],
deps = [
"//resourcemanager/pooltask",
"//util/gpool",
"//util/logutil",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_log//:log",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
],
)

go_test(
name = "spmc_test",
srcs = [
"main_test.go",
"spmcpool_benchmark_test.go",
"spmcpool_test.go",
"worker_loop_queue_test.go",
],
embed = [":spmc"],
race = "on",
deps = [
"//resourcemanager/pooltask",
"//testkit/testsetup",
"//util",
"//util/gpool",
"@com_github_stretchr_testify//require",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_goleak//:goleak",
],
)
27 changes: 27 additions & 0 deletions util/gpool/spmc/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package spmc

import (
"testing"

"github.com/pingcap/tidb/testkit/testsetup"
"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
testsetup.SetupForCommonTest()
goleak.VerifyTestMain(m)
}
Loading