-
Notifications
You must be signed in to change notification settings - Fork 0
/
fly.go
234 lines (207 loc) · 6.87 KB
/
fly.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
// Package fly provides graceful shutdown support for servers: close
// functions are registered at integer priority levels and executed in
// ascending level order when a monitored stop signal arrives.
//
// @author : deng zhi
// @time : 2023/3/31 10:01
package fly
import (
"context"
"fmt"
"log"
"math/rand"
"os"
"os/signal"
"sort"
"sync"
"sync/atomic"
"syscall"
"time"
)
// shutdownPollIntervalMax is the max polling interval when checking
// quiescence during Server.Shutdown. Polling starts with a small
// interval and backs off to the max.
// Ideally we could find a solution that doesn't involve polling,
// but which also doesn't have a high runtime cost (and doesn't
// involve any contentious mutexes), but that is left as an
// exercise for the reader.
// (This constant and the polling scheme mirror net/http's Shutdown.)
const shutdownPollIntervalMax = 500 * time.Millisecond
// logger is the optional user-supplied logger registered via CustomLog;
// when nil, defaultLog is used instead (see doLog).
var logger Logger
// defaultLog is the fallback logger: stdout with a "gracefulCloser: " prefix.
var defaultLog = log.New(os.Stdout, "gracefulCloser: ", log.LstdFlags)
// ShutdownSignals receives shutdown signals to process.
//
// NOTE: SIGKILL and SIGSTOP cannot be caught or ignored by a process, so
// registering them with signal.Notify has no effect; they (and os.Kill,
// which is SIGKILL) are intentionally omitted from this list.
var ShutdownSignals = []os.Signal{
	os.Interrupt,
	syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGILL, syscall.SIGTRAP,
	syscall.SIGABRT, syscall.SIGSYS, syscall.SIGTERM,
}
// Logger is the minimal logging interface this package writes through.
// Register an implementation with CustomLog to redirect the package's
// log output away from the default stdout logger.
type Logger interface {
	// Print writes a single, already-formatted log message.
	Print(info string)
}
// CustomLog registers l as the package-wide logger; all subsequent log
// output goes through it instead of the default stdout logger. Passing
// nil restores the default logger.
func CustomLog(l Logger) {
	logger = l
}
// GracefulCloser coordinates ordered, graceful shutdown of application
// resources. Close functions are grouped by an integer level; lower
// level values run earlier.
type GracefulCloser struct {
	// closeFuncChain maps a priority level to the closer functions
	// registered at that level.
	closeFuncChain map[int][]func(ctx context.Context)
	// levelAfterClosedWait maps a level to an extra delay slept after
	// that level's closers finish, before the next level starts.
	levelAfterClosedWait map[int]time.Duration
	// timeout bounds the whole shutdown sequence (context deadline).
	timeout time.Duration
	// mutex guards closeFuncChain and levelAfterClosedWait during registration.
	mutex sync.Mutex
	// quit receives the OS signals registered in NewAndMonitor.
	quit chan os.Signal
}
// NewAndMonitor creates a GracefulCloser and starts listening for stop
// signals.
//
// timeout bounds the whole graceful-close sequence; a non-positive value
// defaults to 30 seconds. sig lists the signals to monitor; when empty,
// ShutdownSignals is used.
func NewAndMonitor(timeout time.Duration, sig ...os.Signal) *GracefulCloser {
	// Guard zero AND negative timeouts: context.WithTimeout with a
	// non-positive duration would expire immediately, defeating the
	// graceful shutdown entirely.
	if timeout <= 0 {
		timeout = 30 * time.Second
	}
	gracefulCloser := &GracefulCloser{
		closeFuncChain:       make(map[int][]func(ctx context.Context)),
		quit:                 make(chan os.Signal, 1),
		levelAfterClosedWait: make(map[int]time.Duration),
		timeout:              timeout,
	}
	if len(sig) == 0 {
		sig = ShutdownSignals
	}
	signal.Notify(gracefulCloser.quit, sig...)
	return gracefulCloser
}
// AddCloser registers one or more closer functions at the given priority
// level and returns the receiver so calls can be chained.
//
// Each closer receives a context.Context that carries the global shutdown
// deadline (see doShutdown); closers may watch it themselves, but a global
// timeout is enforced regardless. The level parameter is the priority of
// the group: the smaller the value, the earlier it is executed, which
// allows interdependent resources to be closed in a safe order.
func (closer *GracefulCloser) AddCloser(level int, closerCoequal ...func(ctx context.Context)) *GracefulCloser {
	// len covers both a nil and an explicitly empty variadic slice.
	if len(closerCoequal) == 0 {
		doLog("AddCloser: no closer functions supplied for level %v, ignored", level)
		return closer
	}
	closer.mutex.Lock()
	defer closer.mutex.Unlock()
	// append copies into package-owned storage (allocating when the level
	// is new), so the caller's variadic slice is never aliased by the chain.
	closer.closeFuncChain[level] = append(closer.closeFuncChain[level], closerCoequal...)
	return closer
}
// AddCloserLevelWait registers closer functions at the given priority level
// (see AddCloser) and additionally records levelWait, a delay slept after
// that level's closers finish before the next level starts. Returns the
// receiver so calls can be chained.
func (closer *GracefulCloser) AddCloserLevelWait(level int, levelWait time.Duration, closerCoequal ...func(ctx context.Context)) *GracefulCloser {
	// len covers both a nil and an explicitly empty variadic slice.
	if len(closerCoequal) == 0 {
		doLog("AddCloserLevelWait: no closer functions supplied for level %v, ignored", level)
		return closer
	}
	closer.mutex.Lock()
	defer closer.mutex.Unlock()
	// Copy into package-owned storage; never alias the caller's slice.
	closer.closeFuncChain[level] = append(closer.closeFuncChain[level], closerCoequal...)
	closer.levelAfterClosedWait[level] = levelWait
	return closer
}
// SoftLanding starts the graceful shutdown watcher and returns immediately.
// When a monitored signal arrives, the registered closers are executed level
// by level (lowest level first) under a context bounded by the configured
// timeout; done is closed once the whole sequence completes.
//
// Panics if the quit channel was never set up (construct via NewAndMonitor).
func (closer *GracefulCloser) SoftLanding(done chan<- bool) {
	quit := closer.quit
	if quit == nil {
		panic("Please call the MonitorSignal method to monitor the signal")
	}
	go func() {
		<-quit
		doLog("Server is shutting down...")
		// BUG FIX: done is now always closed, even when there is nothing
		// to do, so a caller blocking on <-done is never deadlocked (the
		// old code returned early and never signalled done).
		defer close(done)
		if len(closer.closeFuncChain) == 0 {
			doLog("no closer functions registered, nothing to close")
			return
		}
		// Bound the whole shutdown sequence by the configured timeout.
		ctx, cancel := context.WithTimeout(context.Background(), closer.timeout)
		defer cancel()
		// Execute levels in ascending order: the smaller the level value,
		// the higher the priority, so it runs first.
		for _, level := range sortLevel(closer.closeFuncChain) {
			doLog("Start to execute closeFunc of group %v", level)
			// Run this level's closers concurrently and wait for them
			// (or for the global timeout).
			doShutdown(closer.closeFuncChain[level], ctx)
			// Optional settle time before the next level starts.
			if waitTime, ok := closer.levelAfterClosedWait[level]; ok {
				time.Sleep(waitTime)
			}
		}
	}()
}
// Sort according to level, the one with higher priority will be executed first
func sortLevel(closeFuncChain map[int][]func(ctx context.Context)) []int {
levels := make([]int, len(closeFuncChain))
i := 0
for key := range closeFuncChain {
levels[i] = key
i++
}
sort.Ints(levels)
return levels
}
// Execute the close operation and execute closeFunS concurrently
func doShutdown(closeFunS []func(ctx context.Context), ctx context.Context) {
// used to wait for the program to complete
doneCount := atomic.Int32{}
// Count initialization, which means to wait for len(closeFunS) goroutines
doneCount.Store(int32(len(closeFunS)))
for _, closeFun := range closeFunS {
fun := closeFun
go func() {
// Call Done when the function exits to notify the parent that the work is done
defer func() {
if err := recover(); err != nil {
if logger == nil {
defaultLog.Printf("closeFunc execute panic %v", err)
} else {
logger.Print(fmt.Sprintf("closeFunc execute panic:%v", err))
}
}
doneCount.Add(-1)
}()
fun(ctx)
}()
}
// Detect context timeout
pollIntervalBase := time.Millisecond
timer := time.NewTimer(nextPollInterval(&pollIntervalBase))
defer timer.Stop()
for doneCount.Load() > 0 {
select {
case <-ctx.Done():
doLog("Graceful shutdown timeout, forced to end: %v", ctx.Err())
case <-timer.C:
timer.Reset(nextPollInterval(&pollIntervalBase))
}
}
}
// nextPollInterval returns the next quiescence-poll interval, adding up to
// 10% jitter, then doubles the base for next time, clamped at
// shutdownPollIntervalMax.
func nextPollInterval(pollIntervalBase *time.Duration) time.Duration {
	interval := *pollIntervalBase
	// Add 10% jitter. Guard the argument: rand.Intn panics when n <= 0,
	// which would happen for a base below 10ns.
	if jitter := int(*pollIntervalBase / 10); jitter > 0 {
		interval += time.Duration(rand.Intn(jitter))
	}
	// Double and clamp for next time.
	*pollIntervalBase *= 2
	if *pollIntervalBase > shutdownPollIntervalMax {
		*pollIntervalBase = shutdownPollIntervalMax
	}
	return interval
}
// doLog formats a message and routes it to the user-registered Logger when
// one is set, falling back to the package's default stdout logger.
func doLog(format string, param ...any) {
	if logger != nil {
		logger.Print(fmt.Sprintf(format, param...))
		return
	}
	defaultLog.Printf(format, param...)
}