-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathworkerpool.go
148 lines (130 loc) · 2.49 KB
/
workerpool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package xwp
import (
"fmt"
"runtime"
"sync"
"sync/atomic"
"time"
)
// JobQueue 任务队列
type JobQueue chan interface{}
// RunI
type RunI interface {
Do(data interface{})
}
// WorkerPool 调度器
type WorkerPool struct {
JobQueue JobQueue
// default == runtime.NumCPU()
MaxWorkers int
// default == MaxWorkers
InitWorkers int
// default == InitWorkers
MaxIdleWorkers int
RunF func(data interface{})
RunI RunI
workers *sync.Map
workerCount int64
workerQueuePool chan JobQueue
wg *sync.WaitGroup
quit chan bool
}
// Run 执行
func (t *WorkerPool) Run() {
t.Start()
t.Wait()
}
// init
func (t *WorkerPool) init() {
if t.MaxWorkers == 0 {
t.MaxWorkers = runtime.NumCPU()
}
if t.InitWorkers == 0 {
t.InitWorkers = t.MaxWorkers
}
if t.MaxIdleWorkers == 0 {
t.MaxIdleWorkers = t.InitWorkers
}
t.workers = &sync.Map{}
t.workerQueuePool = make(chan JobQueue, t.MaxIdleWorkers)
t.wg = &sync.WaitGroup{}
t.quit = make(chan bool)
if t.RunF == nil && t.RunI == nil {
panic(fmt.Errorf("xwp.WorkerPool RunF & RunI field is empty"))
}
}
// Start 启动
func (t *WorkerPool) Start() {
t.init()
for i := 0; i < t.InitWorkers; i++ {
NewWorker(t).Run()
}
go func() {
timer := time.NewTimer(time.Millisecond)
timer.Stop()
for {
select {
case data := <-t.JobQueue:
if data == nil {
t.workers.Range(func(key, value interface{}) bool {
w := value.(*Worker)
w.Stop()
return true
})
return
}
func() {
for {
select {
case ch := <-t.workerQueuePool:
ch <- data
default:
if atomic.LoadInt64(&t.workerCount) < int64(t.MaxWorkers) {
NewWorker(t).Run()
continue
} else {
// 设定时间的监听
timer.Reset(10 * time.Millisecond)
select {
case ch := <-t.workerQueuePool:
timer.Stop()
ch <- data
case <-timer.C:
continue
}
}
}
return
}
}()
case <-t.quit:
close(t.JobQueue)
}
}
}()
}
// Stop 停止
func (t *WorkerPool) Stop() {
go func() {
t.quit <- true
}()
}
// Wait 等待执行完成
func (t *WorkerPool) Wait() {
t.wg.Wait()
}
type Statistic struct {
Active int `json:"active"`
Idle int `json:"idle"`
Total int `json:"total"`
}
// Stats 统计
func (t *WorkerPool) Stats() *Statistic {
total := int(t.workerCount)
idle := len(t.workerQueuePool)
return &Statistic{
Active: total - idle,
Idle: idle,
Total: total,
}
}