-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathretry_task.go
95 lines (82 loc) · 2.19 KB
/
retry_task.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
// Copyright 2018 Changkun Ou. All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
package tests
import (
"fmt"
"sync/atomic"
"time"
"unsafe"
)
// RetryTask is a task used by the tests in this package. Its Execute method
// keeps asking to be retried until RetryCount exceeds MaxRetry.
//
// NOTE(review): the previous comment said the type implements task.Interface
// but cannot be scheduled by sched because it has no exported field.
// RetryCount and MaxRetry are exported, so confirm that statement against the
// sched package before relying on it.
type RetryTask struct {
// RetryCount is the number of retries requested so far. Execute bumps it
// with atomic.AddInt64, so keep it as the first field: sync/atomic only
// guarantees 64-bit alignment for the first word of an allocated struct
// on 32-bit platforms.
RetryCount int64
// MaxRetry is the threshold Execute compares RetryCount against; once
// RetryCount is greater than MaxRetry, Execute stops requesting retries.
MaxRetry int64
// id is the task identifier returned by GetID and replaced by SetID.
id string
// execution is the execution time returned by GetExecution.
execution time.Time
}
// NewRetryTask returns a RetryTask identified by id, with execution time e
// and retry threshold maxRetry. The retry counter starts at zero.
func NewRetryTask(id string, e time.Time, maxRetry int64) *RetryTask {
	task := new(RetryTask) // zero value already has RetryCount == 0
	task.id = id
	task.execution = e
	task.MaxRetry = maxRetry
	return task
}
// GetID returns the identifier currently assigned to the task.
func (t *RetryTask) GetID() (id string) {
	return t.id
}
// GetExecution returns the execution time stored on the task.
func (t *RetryTask) GetExecution() (execute time.Time) {
	return t.execution
}
// GetTimeout returns the execution timeout of the task, which is fixed at
// one millisecond.
func (t *RetryTask) GetTimeout() (executeTimeout time.Duration) {
	const timeout = time.Millisecond
	return timeout
}
// GetRetryTime returns the time of the next retry: the current UTC time plus
// 100 milliseconds.
func (t *RetryTask) GetRetryTime() time.Time {
	const retryDelay = time.Millisecond * 100
	now := time.Now().UTC()
	return now.Add(retryDelay)
}
// SetID replaces the task's identifier with id.
func (t *RetryTask) SetID(id string) {
	t.id = id
}
// IsValidID reports whether the task's id is valid. This implementation
// performs no check and always reports true.
func (t *RetryTask) IsValidID() bool {
	return true
}
// SetExecution sets the execution time of a task
func (t *RetryTask) SetExecution(current time.Time) time.Time {
var ptr = unsafe.Pointer(&t.execution)
var old unsafe.Pointer
// spin lock
for {
old = atomic.LoadPointer(&ptr)
if atomic.CompareAndSwapPointer(&ptr, old, unsafe.Pointer(¤t)) {
return *((*time.Time)(old))
}
}
}
// Execute runs one attempt of the task.
//
// While RetryCount has not exceeded MaxRetry, it pushes the task id to O,
// records the first-execution time on O if none is set yet, increments
// RetryCount and returns retry == true. Once RetryCount is greater than
// MaxRetry, it records the last-execution time on O and returns
// retry == false.
//
// result is a human-readable string containing the task id, the retry count
// and the time elapsed since GetExecution(). fail is always nil.
func (t *RetryTask) Execute() (result interface{}, retry bool, fail error) {
	// RetryCount is incremented atomically below, so it is read atomically
	// too; mixing atomic and plain access to the same word is a data race.
	if count := atomic.LoadInt64(&t.RetryCount); count > t.MaxRetry {
		O.SetLast(time.Now().UTC())
		return fmt.Sprintf(
			"retry task %s, retry count: %d, tollerance: %v, last.",
			t.id, count, time.Now().UTC().Sub(t.GetExecution())), false, nil
	}

	O.Push(t.id)
	if O.IsFirstZero() {
		O.SetFirst(time.Now().UTC())
	}

	// Report the value produced by this increment rather than re-reading
	// the field.
	count := atomic.AddInt64(&t.RetryCount, 1)
	return fmt.Sprintf(
		"retry task %s, retry count: %d. tollerance: %v",
		t.id, count, time.Now().UTC().Sub(t.GetExecution())), true, nil
}