-
Notifications
You must be signed in to change notification settings - Fork 22
/
Copy pathqueuetask.go
147 lines (130 loc) · 3.33 KB
/
queuetask.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package task
import (
"errors"
"fmt"
"time"
)
// DefaultQueueSize is a default queue size of 1000.
// NOTE(review): it is not referenced in the visible part of this file;
// presumably it is the fallback buffer size for QueueTask.MessageChan —
// confirm against callers.
const DefaultQueueSize = 1000
// QueueTask is a task that owns a message channel in addition to the state
// it inherits from the embedded TaskInfo.
type QueueTask struct {
	TaskInfo

	// Interval is the run interval in milliseconds; it is effective when
	// TaskType is TaskType_Loop or TaskType_Queue.
	Interval int64

	// MessageChan carries the values sent by EnQueue; the handler started by
	// startQueueTask receives one value from it per run.
	MessageChan chan interface{}
}
// EnQueue sends value on the task's MessageChan.
// It is a plain channel send, so it blocks while the channel cannot accept
// the value.
func (task *QueueTask) EnQueue(value interface{}) {
	queue := task.MessageChan
	queue <- value
}
// Start launches the queue task.
// It does nothing when IsRun is false. Otherwise, if the task is in the
// Init or Stop state, the state is switched to Run and startQueueTask is
// called; any other state is left untouched.
func (task *QueueTask) Start() {
	if !task.IsRun {
		return
	}
	switch task.State {
	case TaskState_Init, TaskState_Stop:
		task.State = TaskState_Run
		startQueueTask(task)
	}
}
// RunOnce invokes the task handler a single time, synchronously, and returns
// the handler's error.
//
// Unlike the scheduled path in startQueueTask, no message is taken from
// MessageChan, none of the task service hooks are called and a handler panic
// is not recovered here.
//
// The context obtained from getTaskContext is handed back through
// putTaskContext once the handler returns (also when it panics), matching
// what startQueueTask does after each run.
func (task *QueueTask) RunOnce() error {
	taskCtx := task.getTaskContext()
	defer task.putTaskContext(taskCtx)
	return task.handler(taskCtx)
}
// GetConfig returns a new TaskConfig populated from the task's current
// settings.
//
// Interval reports the task's own Interval (milliseconds) so that the
// returned config can be passed back to Reset, which rejects configs whose
// Interval is not greater than zero. Express is always empty for a queue
// task.
func (task *QueueTask) GetConfig() *TaskConfig {
	return &TaskConfig{
		TaskID:   task.taskID,
		TaskType: task.TaskType,
		IsRun:    task.IsRun,
		Handler:  task.handler,
		DueTime:  task.DueTime,
		Interval: task.Interval,
		Express:  "",
		TaskData: task.TaskData,
	}
}
// Reset validates conf, then applies it to the task and restarts the task.
//
// A conf whose Interval is not greater than zero is rejected: the problem is
// written to the service's debug log and returned as an error, and the task
// is left unchanged. Otherwise the task is stopped, IsRun and Interval are
// taken from conf, TaskData and Handler are replaced only when conf supplies
// non-nil values, and Start is called again.
func (task *QueueTask) Reset(conf *TaskConfig) error {
	// trace writes a debug line through the task service logger.
	trace := func(parts ...interface{}) {
		task.taskService.Logger().Debug(fmt.Sprint(parts...))
	}

	if conf.Interval <= 0 {
		const reason = "interval is wrong format => must bigger then zero"
		trace("TaskInfo:Reset ", task, conf, "error", reason)
		return errors.New(reason)
	}

	// Stop first so that Start sees a state it is allowed to start from.
	task.Stop()

	task.IsRun = conf.IsRun
	if data := conf.TaskData; data != nil {
		task.TaskData = data
	}
	if h := conf.Handler; h != nil {
		task.handler = h
	}
	task.Interval = conf.Interval

	task.Start()
	trace("TaskInfo:Reset ", task, conf, "success")
	return nil
}
// NewQueueTask creates a queue task in the Init state.
//
// Parameters:
//   - taskID: identifier stored on the task.
//   - isRun: whether Start is allowed to launch the task.
//   - interval: run interval in milliseconds; must be greater than zero
//     (the same rule Reset enforces), because startQueueTask feeds it to
//     time.NewTicker, which panics on a non-positive duration.
//   - handler: function invoked for each run.
//   - taskData: user data stored on the task.
//   - queueSize: buffer capacity of MessageChan; must not be negative,
//     because make panics on a negative channel size. Zero yields an
//     unbuffered channel.
//
// It returns a nil Task and a non-nil error when interval or queueSize is
// out of range.
func NewQueueTask(taskID string, isRun bool, interval int64, handler TaskHandle, taskData interface{}, queueSize int64) (Task, error) {
	if interval <= 0 {
		return nil, fmt.Errorf("queue task %q: interval must be greater than zero, got %d", taskID, interval)
	}
	if queueSize < 0 {
		return nil, fmt.Errorf("queue task %q: queue size must not be negative, got %d", taskID, queueSize)
	}

	task := new(QueueTask)
	task.initCounters()
	task.taskID = taskID
	task.TaskType = TaskType_Queue
	task.IsRun = isRun
	task.handler = handler
	task.Interval = interval
	task.State = TaskState_Init
	task.TaskData = taskData
	task.MessageChan = make(chan interface{}, queueSize)
	return task, nil
}
// startQueueTask launches the background goroutine that drives a queue task.
//
// The goroutine creates a ticker from task.Interval (milliseconds), stores it
// in task.TimeTicker, performs one run immediately and then one run per tick.
// Each run:
//  1. acquires its own task context and increments RunCounter;
//  2. blocks until a value is available on task.MessageChan and stores it in
//     the context's Message field;
//  3. calls the service's OnBeforeHandler hook, if set;
//  4. calls the task handler unless the context was marked IsEnd;
//  5. on a handler error, records it on the context, increments ErrorCounter
//     and calls the service's ExceptionHandler, if set;
//  6. calls the service's OnEndHandler hook, if set.
//
// A panic during a run is recovered, counted in ErrorCounter and reported to
// ExceptionHandler; OnEndHandler is not called for that run.
//
// The loop in this function has no exit path of its own: stopping a
// time.Ticker does not close its channel, so after the ticker is stopped the
// goroutine stays parked on the channel receive.
func startQueueTask(task *QueueTask) {
	runOnce := func() {
		// Acquire a fresh context for every run. It is handed back with
		// putTaskContext when the run ends, so it must not be kept and
		// reused by the next run.
		taskCtx := task.getTaskContext()

		// Registered first so it runs last: the context is released only
		// after the panic handler below is done using it.
		defer task.putTaskContext(taskCtx)
		defer func() {
			if err := recover(); err != nil {
				task.CounterInfo().ErrorCounter.Inc(1)
				if task.taskService != nil && task.taskService.ExceptionHandler != nil {
					task.taskService.ExceptionHandler(taskCtx, fmt.Errorf("%v", err))
				}
			}
		}()

		task.CounterInfo().RunCounter.Inc(1)

		// Wait for the next queued value; this blocks while the queue is empty.
		taskCtx.Message = <-task.MessageChan

		if task.taskService != nil && task.taskService.OnBeforeHandler != nil {
			task.taskService.OnBeforeHandler(taskCtx)
		}

		// OnBeforeHandler may mark the context as ended to skip the handler.
		var err error
		if !taskCtx.IsEnd {
			err = task.handler(taskCtx)
		}
		if err != nil {
			taskCtx.Error = err
			task.CounterInfo().ErrorCounter.Inc(1)
			if task.taskService != nil && task.taskService.ExceptionHandler != nil {
				task.taskService.ExceptionHandler(taskCtx, err)
			}
		}

		if task.taskService != nil && task.taskService.OnEndHandler != nil {
			task.taskService.OnEndHandler(taskCtx)
		}
	}

	go func() {
		// Keep a local handle on this goroutine's ticker so the loop keeps
		// reading the ticker it created even if task.TimeTicker is replaced
		// later (for example by another call to startQueueTask).
		ticker := time.NewTicker(time.Duration(task.Interval) * time.Millisecond)
		task.TimeTicker = ticker

		runOnce()
		for range ticker.C {
			runOnce()
		}
	}()
}