-
Notifications
You must be signed in to change notification settings - Fork 1
/
multi_handler_dispatcher.go
337 lines (272 loc) · 8.63 KB
/
multi_handler_dispatcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
package flowprocess
import (
sj "github.com/guyannanfei25/go-simplejson"
"fmt"
"sync"
"sync/atomic"
"time"
"log"
"os"
"errors"
)
// ERRNONEEDPASSDOWN is the sentinel error a Handler's Process returns when the
// item was handled without failure but must not be forwarded to the downstream
// dispatchers. The worker loop skips forwarding and does not log this error.
var ERRNONEEDPASSDOWN = errors.New("no need to pass down")
// The configuration format was changed from ini to JSON, because JSON handles
// complex nested hierarchies better.
// http://www.guyannanfei25.site/2017/05/14/misc-talk/

// MultiHandlerDispatcher is one stage of a processing pipeline. A stage is
// configured from JSON, can be linked to upstream and downstream stages,
// receives items through Dispatch and is driven by Start / Tick / Close.
type MultiHandlerDispatcher interface {
	// Init configures the dispatcher from conf.
	Init(conf *sj.Json) error
	// SetName sets the dispatcher's name.
	SetName(name string)
	// GetName returns the dispatcher's name.
	GetName() string
	// Only dispatchers that need to be cascaded need the Register methods.
	// DownRegister registers a downstream dispatcher.
	DownRegister(d MultiHandlerDispatcher)
	// UpRegister registers an upstream dispatcher.
	UpRegister(d MultiHandlerDispatcher)
	// RegisterHandlerCreator registers the function that creates handlers.
	// Must be called before Start.
	RegisterHandlerCreator(c HandlerCreator)
	// Start begins running; call it after the dispatchers have been
	// registered and the handler creator has been set.
	Start()
	// IsRunning reports whether the dispatcher is running.
	IsRunning() bool
	// Dispatch hands one item to the dispatcher for processing.
	Dispatch(item interface{})
	// Ack reports a result back to the dispatcher.
	Ack(result interface{}) error
	// SetLogger installs a custom logger and the minimum level to log.
	SetLogger(l logger, lvl LogLevel)
	getLogger() (logger, LogLevel)
	// Tick runs periodic tasks, e.g. printing status or refreshing caches.
	Tick()
	// Close shuts the dispatcher down.
	Close()
}
// Handler holds the user's business logic for one worker goroutine.
// DefaultMultiHandlerDispatcher creates one Handler per worker, so the
// methods of a given Handler are only called from that worker's goroutine.
type Handler interface {
	// Init prepares the handler. id is the index of the worker goroutine
	// that owns this handler.
	Init(conf *sj.Json, id int) error
	// Process handles one item. On a nil error the returned value is what
	// DefaultMultiHandlerDispatcher forwards downstream; returning
	// ERRNONEEDPASSDOWN suppresses forwarding without logging an error.
	Process(interface{}) (interface{}, error)
	// Tick runs the handler's periodic work.
	Tick()
	// Close releases the handler's resources.
	Close() error
}
// HandlerCreator builds a new Handler. DefaultMultiHandlerDispatcher calls it
// once in every worker goroutine so that each worker owns its own Handler.
type HandlerCreator func() Handler
// DefaultMultiHandlerDispatcher is the stock MultiHandlerDispatcher: items
// sent to Dispatch are queued on msgChan, consumed by `concurrency` worker
// goroutines (each with its own Handler), and the results are forwarded to
// every registered downstream dispatcher.
type DefaultMultiHandlerDispatcher struct {
	name string
	// Conf keeps the configuration passed to Init so that users can read
	// whatever parameters they need from it.
	Conf *sj.Json
	// Maps rather than slices, so that repeated registrations are deduplicated.
	downDispatchers map[MultiHandlerDispatcher]int
	upDispatchers   map[MultiHandlerDispatcher]int
	msgChan         chan interface{} // *item
	msgMaxSize      int              // maximum buffer size of msgChan
	concurrency     int              // number of concurrent workers
	chanCount       uint32           // current number of worker goroutines
	running         bool             // whether the dispatcher is running
	cmdChans        []chan struct{}  // currently only used to send the tick command
	// creator is the function that builds handlers.
	creator  HandlerCreator
	handlers []Handler
	wg       sync.WaitGroup
	// Custom logger; logGuard protects logger and logLvl.
	logger   logger
	logLvl   LogLevel
	logGuard sync.RWMutex
}
// Init resets the dispatcher to a stopped state and configures it from conf.
//
// Keys read from conf:
//   - "concurrency": number of worker goroutines; MustInt fallback 1, values
//     below 1 are raised to 1.
//   - "msgMaxSize": buffer size of the message channel; MustInt fallback 0,
//     negative values become 0 (unbuffered).
//   - "name": dispatcher name; fallback "DefaultMultiHandlerDispatcher".
//
// The up/down dispatcher sets, the handler slots and the per-worker command
// channel slots are allocated fresh, the handler creator is cleared, and the
// logger is reset to a standard-library logger on stderr at LogLevelInfo.
// Init always returns nil.
func (d *DefaultMultiHandlerDispatcher) Init(conf *sj.Json) error {
	d.Conf = conf

	workers := conf.Get("concurrency").MustInt(1)
	if workers < 1 {
		workers = 1
	}
	d.concurrency = workers

	bufSize := conf.Get("msgMaxSize").MustInt(0)
	if bufSize < 0 {
		// A negative size means "no buffering".
		bufSize = 0
	}
	d.msgMaxSize = bufSize

	d.msgChan = make(chan interface{}, bufSize)
	// One command-channel slot per worker; each worker fills in its own
	// slot when it starts.
	d.cmdChans = make([]chan struct{}, workers)
	d.name = conf.Get("name").MustString("DefaultMultiHandlerDispatcher")

	d.downDispatchers = make(map[MultiHandlerDispatcher]int)
	d.upDispatchers = make(map[MultiHandlerDispatcher]int)
	d.handlers = make([]Handler, workers)

	d.chanCount = 0
	d.creator = nil
	d.running = false

	d.logger = log.New(os.Stderr, "", log.Flags())
	d.logLvl = LogLevelInfo

	return nil
}
// Start launches the worker goroutines and then starts every downstream
// dispatcher, so starting the head of a chain starts the whole chain.
// Calling Start on a dispatcher that is already running only logs a message.
//
// For each downstream dispatcher Start polls IsRunning once per second until
// it reports true; the dispatcher marks itself as running only after all
// downstream dispatchers are up.
func (d *DefaultMultiHandlerDispatcher) Start() {
	if d.running {
		d.logf(LogLevelInfo, " is already running!\n")
		return
	}

	for workerID := 0; workerID < d.concurrency; workerID++ {
		go d.process(workerID)
	}

	// The upstream dispatcher is responsible for starting everything below it.
	for child := range d.downDispatchers {
		child.Start()

		// Wait until the downstream dispatcher really reports running.
		for !child.IsRunning() {
			time.Sleep(time.Second)
		}
	}

	d.running = true
	d.logf(LogLevelInfo, " start running!\n")
}
// IsRunning reports whether the dispatcher is currently marked as running.
// NOTE(review): the flag is read without synchronization.
func (d *DefaultMultiHandlerDispatcher) IsRunning() bool {
	return d.running
}
// SetName replaces the dispatcher's name, which is used as a prefix in its
// log lines.
func (d *DefaultMultiHandlerDispatcher) SetName(name string) {
	d.name = name
}
// GetName returns the dispatcher's name.
func (d *DefaultMultiHandlerDispatcher) GetName() string {
	return d.name
}
// SetLogger installs l as the dispatcher's logger and lvl as the minimum
// level that gets logged. Both values are swapped together under the write
// lock of logGuard.
func (d *DefaultMultiHandlerDispatcher) SetLogger(l logger, lvl LogLevel) {
	d.logGuard.Lock()
	d.logger, d.logLvl = l, lvl
	d.logGuard.Unlock()
}
// getLogger returns the current logger together with the configured minimum
// log level. It only reads, so it takes the shared (read) side of logGuard;
// concurrent log calls do not serialize on each other, only against SetLogger.
func (d *DefaultMultiHandlerDispatcher) getLogger() (logger, LogLevel) {
	d.logGuard.RLock()
	defer d.logGuard.RUnlock()

	return d.logger, d.logLvl
}
// DownRegister adds down to this dispatcher's downstream set and registers
// this dispatcher as an upstream of down. The set is a map, so registering
// the same dispatcher more than once has no additional effect on it.
func (d *DefaultMultiHandlerDispatcher) DownRegister(down MultiHandlerDispatcher) {
	d.downDispatchers[down] = 1
	down.UpRegister(d)
}
// UpRegister adds up to this dispatcher's upstream set.
func (d *DefaultMultiHandlerDispatcher) UpRegister(up MultiHandlerDispatcher) {
	d.upDispatchers[up] = 1
}
// RegisterHandlerCreator stores the function used to build one Handler per
// worker goroutine. It must be called before Start. A nil creator is allowed:
// the dispatcher then only forwards items to its downstream dispatchers.
func (d *DefaultMultiHandlerDispatcher) RegisterHandlerCreator(c HandlerCreator) {
	d.creator = c
}
// process is the body of worker goroutine number id.
//
// It builds and initializes this worker's own Handler (when a creator is
// registered), creates the worker's command channel, and then loops:
//   - an item from msgChan is run through the handler and, unless the handler
//     returns an error, the result is dispatched to every downstream
//     dispatcher; without a handler the item is forwarded unchanged;
//   - a signal on the command channel runs the handler's Tick on this
//     goroutine.
//
// The loop ends when msgChan is closed. The handler is closed and the
// WaitGroup released on return.
//
// NOTE(review): wg.Add runs inside the goroutine, so a Close that executes
// before this goroutine has been scheduled will not wait for it. Fixing that
// requires moving the Add next to the `go` statement in Start.
func (d *DefaultMultiHandlerDispatcher) process(id int) {
	d.wg.Add(1)
	defer d.wg.Done()
	atomic.AddUint32(&d.chanCount, 1)

	// Read the creator exactly once. The loop below branches on whether this
	// worker actually owns a handler, not on d.creator, so registering a
	// creator after Start can no longer lead to a call on a nil handler.
	var innerHandler Handler
	if creator := d.creator; creator != nil {
		// One handler object per worker goroutine, so handlers do not have
		// to be safe for concurrent use.
		innerHandler = creator()
		d.handlers[id] = innerHandler

		// An Init failure is reported at fatal level.
		if err := innerHandler.Init(d.Conf, id); err != nil {
			d.logf(LogLevelFatal, "%s %dth handler init err[%s]\n", d.name, id, err)
		}

		defer innerHandler.Close()
	}

	// Create this worker's command channel.
	d.cmdChans[id] = make(chan struct{}, 10)

	d.logf(LogLevelInfo, " %dth process starting...\n", id)

processLoop:
	for {
		select {
		case item, ok := <-d.msgChan:
			if !ok {
				// msgChan was closed: shut this worker down.
				break processLoop
			}

			// ret is what gets forwarded downstream. A handler may simply
			// return the item itself; without a handler the item is
			// forwarded unchanged.
			ret := item
			if innerHandler != nil {
				var err error
				// A handler that succeeded but does not want the item
				// forwarded returns ERRNONEEDPASSDOWN, which is not logged.
				if ret, err = innerHandler.Process(item); err != nil {
					if err != ERRNONEEDPASSDOWN {
						d.logf(LogLevelError, "%s %dth process item[%v] err[%s]\n",
							d.name, id, item, err)
					}
					continue
				}
			}

			for sub := range d.downDispatchers {
				sub.Dispatch(ret)
			}

		// Tick is delivered through the command channel so that it runs on
		// this goroutine and never concurrently with Process.
		case <-d.cmdChans[id]:
			if innerHandler != nil {
				innerHandler.Tick()
			}
		}
	}

	d.logf(LogLevelInfo, " %dth process quiting...\n", id)
	// Adding ^uint32(0) decrements the unsigned counter by one.
	atomic.AddUint32(&d.chanCount, ^uint32(0))
}
// Dispatch queues item for the worker goroutines by sending it on msgChan.
// The send blocks while the channel has no free buffer slot and no worker is
// receiving. Close closes msgChan, so Dispatch must not be called after Close.
func (d *DefaultMultiHandlerDispatcher) Dispatch(item interface{}) {
	d.msgChan <- item
}
// Ack is a no-op in the default dispatcher: it ignores result and returns nil.
func (d *DefaultMultiHandlerDispatcher) Ack(result interface{}) error {
	// TODO: is this reasonable?
	// DefaultMultiHandlerDispatcher should not call the upstream dispatchers
	// itself; that is left to types that build on it.
	// for up, _ := range d.upDispatchers {
	// up.Ack()
	// }
	return nil
}
// Tick triggers periodic work (printing status, refreshing caches, ...): it
// signals every worker to run its handler's Tick and then calls Tick on every
// downstream dispatcher. Calling it on the first dispatcher therefore ticks
// the whole chain.
//
// Each worker's command channel is buffered; once that buffer is full, Tick
// blocks until the worker drains it, so drive Tick from a dedicated goroutine:
//
//	framework := new(DefaultMultiHandlerDispatcher)
//	tick := time.NewTicker(20 * time.Second)
//	defer tick.Stop()
//	go func() {
//		for range tick.C {
//			framework.Tick()
//		}
//	}()
func (d *DefaultMultiHandlerDispatcher) Tick() {
	// Handlers are not ticked directly from here: the signal goes through the
	// worker's command channel so that Handler.Tick runs on the goroutine
	// that owns the handler.
	for _, cmdChan := range d.cmdChans {
		// A worker that has not created its command channel yet (Tick before
		// Start, or before the worker got to run) has a nil slot. A send on a
		// nil channel blocks forever, so skip it.
		if cmdChan == nil {
			continue
		}
		cmdChan <- struct{}{}
	}

	for sub := range d.downDispatchers {
		sub.Tick()
	}
}
// logf formats a message and writes it through the configured logger as
// "[<level> <dispatcher name>] <message>".
//
// Nothing is written when no logger is set or when lvl is below the
// configured minimum level. After writing a message whose own level is
// LogLevelFatal the process exits with status 1. The decision is based on the
// level of the message, not on the configured threshold, so a fatal message
// terminates the program whatever the threshold is.
func (d *DefaultMultiHandlerDispatcher) logf(lvl LogLevel, line string, args ...interface{}) {
	out, minLvl := d.getLogger()
	if out == nil || minLvl > lvl {
		return
	}

	// Call depth 2 attributes the log line to the caller of logf.
	out.Output(2, fmt.Sprintf("[%-7s %s] %s", lvl, d.GetName(), fmt.Sprintf(line, args...)))

	if lvl == LogLevelFatal {
		os.Exit(1)
	}
}
// Close stops the dispatcher and then everything downstream of it.
//
// It closes msgChan, which makes the workers leave their loops, waits for
// them, marks the dispatcher as stopped, and then closes every downstream
// dispatcher that is still running, polling once per second until each
// reports that it has stopped.
//
// Close on a dispatcher that is not running only logs a message and returns.
// This makes a repeated Close (or a Close before Init/Start) harmless instead
// of panicking on the close of an already closed or nil channel.
func (d *DefaultMultiHandlerDispatcher) Close() {
	if !d.running {
		d.logf(LogLevelInfo, " is not running!\n")
		return
	}

	close(d.msgChan)
	d.wg.Wait()
	d.running = false

	// Shut down the downstream dispatchers.
	for sub := range d.downDispatchers {
		if sub.IsRunning() {
			sub.Close()
		}

		// Wait until the downstream dispatcher really reports stopped.
		for sub.IsRunning() {
			time.Sleep(time.Second)
		}
	}

	d.logf(LogLevelInfo, " exit Success\n")
}