66package graceful
77
88import (
9+ "container/list"
910 "crypto/tls"
1011 "net"
1112 "os"
1213 "strings"
1314 "sync"
14- "sync/atomic"
1515 "syscall"
1616 "time"
1717
@@ -30,14 +30,15 @@ type ServeFunction = func(net.Listener) error
3030
3131// Server represents our graceful server
3232type Server struct {
33- network string
34- address string
35- listener net.Listener
36- wg sync.WaitGroup
37- state state
38- lock * sync.RWMutex
39- connections map [* wrappedConn ]struct {}
40- connectionsLock sync.RWMutex
33+ network string
34+ address string
35+ listener net.Listener
36+
37+ lock sync.RWMutex
38+ state state
39+ connList * list.List
40+ connEmptyCond * sync.Cond
41+
4142 BeforeBegin func (network , address string )
4243 OnShutdown func ()
4344 PerWriteTimeout time.Duration
@@ -52,15 +53,14 @@ func NewServer(network, address, name string) *Server {
5253 log .Info ("Starting new %s server: %s:%s on PID: %d" , name , network , address , os .Getpid ())
5354 }
5455 srv := & Server {
55- wg : sync.WaitGroup {},
5656 state : stateInit ,
57- lock : & sync.RWMutex {},
58- connections : make (map [* wrappedConn ]struct {}),
57+ connList : list .New (),
5958 network : network ,
6059 address : address ,
6160 PerWriteTimeout : setting .PerWriteTimeout ,
6261 PerWritePerKbTimeout : setting .PerWritePerKbTimeout ,
6362 }
63+ srv .connEmptyCond = sync .NewCond (& srv .lock )
6464
6565 srv .BeforeBegin = func (network , addr string ) {
6666 log .Debug ("Starting server on %s:%s (PID: %d)" , network , addr , syscall .Getpid ())
@@ -157,7 +157,7 @@ func (srv *Server) Serve(serve ServeFunction) error {
157157 GetManager ().RegisterServer ()
158158 err := serve (srv .listener )
159159 log .Debug ("Waiting for connections to finish... (PID: %d)" , syscall .Getpid ())
160- srv .wg . Wait ()
160+ srv .waitForActiveConnections ()
161161 srv .setState (stateTerminate )
162162 GetManager ().ServerDone ()
163163 // use of closed means that the listeners are closed - i.e. we should be shutting down - return nil
@@ -181,19 +181,60 @@ func (srv *Server) setState(st state) {
181181 srv .state = st
182182}
183183
184+ func (srv * Server ) waitForActiveConnections () {
185+ srv .lock .Lock ()
186+ for srv .connList .Len () > 0 {
187+ srv .connEmptyCond .Wait ()
188+ }
189+ srv .lock .Unlock ()
190+ }
191+
192+ func (srv * Server ) wrapConnection (c net.Conn ) (net.Conn , error ) {
193+ srv .lock .Lock ()
194+ defer srv .lock .Unlock ()
195+
196+ if srv .state != stateRunning {
197+ _ = c .Close ()
198+ return nil , syscall .EINVAL // same as AcceptTCP
199+ }
200+
201+ wc := & wrappedConn {Conn : c , server : srv }
202+ wc .listElem = srv .connList .PushBack (wc )
203+ return wc , nil
204+ }
205+
206+ func (srv * Server ) removeConnection (conn * wrappedConn ) {
207+ srv .lock .Lock ()
208+ defer srv .lock .Unlock ()
209+
210+ if conn .listElem == nil {
211+ return
212+ }
213+ srv .connList .Remove (conn .listElem )
214+ if srv .connList .Len () == 0 {
215+ srv .connEmptyCond .Broadcast ()
216+ }
217+ }
218+
184219// closeAllConnections forcefully closes all active connections
185220func (srv * Server ) closeAllConnections () {
186- srv .connectionsLock .Lock ()
187- connections := make ([]* wrappedConn , 0 , len (srv .connections ))
188- for conn := range srv .connections {
189- connections = append (connections , conn )
221+ srv .lock .Lock ()
222+ if srv .connList .Len () > 0 {
223+ log .Warn ("Forcefully closing all %d connections" , srv .connList .Len ())
190224 }
191- srv .connectionsLock .Unlock ()
225+ conns := make ([]* wrappedConn , 0 , srv .connList .Len ())
226+ for e := srv .connList .Front (); e != nil ; e = e .Next () {
227+ conn := e .Value .(* wrappedConn )
228+ conn .listElem = nil // mark as removed, will close it later to avoid deadlock of "server.lock"
229+ conns = append (conns , conn )
230+ }
231+ srv .connList = list .New ()
232+ srv .lock .Unlock ()
192233
193- // Close all connections outside the lock to avoid deadlock
194- for _ , conn := range connections {
195- _ = conn .Conn .Close () // Force close the underlying connection
234+ for _ , conn := range conns {
235+ _ = conn .Close () // do real close outside of lock
196236 }
237+ srv .connEmptyCond .Broadcast ()
197238}
198239
199240type filer interface {
@@ -202,61 +243,39 @@ type filer interface {
202243
203244type wrappedListener struct {
204245 net.Listener
205- stopped bool
206- server * Server
246+ server * Server
207247}
208248
249+ var (
250+ _ net.Listener = (* wrappedListener )(nil )
251+ _ filer = (* wrappedListener )(nil )
252+ )
253+
209254func newWrappedListener (l net.Listener , srv * Server ) * wrappedListener {
210255 return & wrappedListener {
211256 Listener : l ,
212257 server : srv ,
213258 }
214259}
215260
216- func (wl * wrappedListener ) Accept () (net.Conn , error ) {
217- var c net.Conn
218- // Set keepalive on TCPListeners connections.
261+ func (wl * wrappedListener ) Accept () (c net.Conn , err error ) {
219262 if tcl , ok := wl .Listener .(* net.TCPListener ); ok {
263+ // Set keepalive on TCPListeners connections if possible, http.tcpKeepAliveListener
220264 tc , err := tcl .AcceptTCP ()
221265 if err != nil {
222266 return nil , err
223267 }
224- _ = tc .SetKeepAlive (true ) // see http.tcpKeepAliveListener
225- _ = tc .SetKeepAlivePeriod (3 * time .Minute ) // see http.tcpKeepAliveListener
268+ _ = tc .SetKeepAlive (true )
269+ _ = tc .SetKeepAlivePeriod (3 * time .Minute )
226270 c = tc
227271 } else {
228- var err error
229272 c , err = wl .Listener .Accept ()
230273 if err != nil {
231274 return nil , err
232275 }
233276 }
234277
235- closed := int32 (0 )
236-
237- wc := & wrappedConn {
238- Conn : c ,
239- server : wl .server ,
240- closed : & closed ,
241- }
242-
243- wl .server .wg .Add (1 )
244-
245- // Track the connection
246- wl .server .connectionsLock .Lock ()
247- wl .server .connections [wc ] = struct {}{}
248- wl .server .connectionsLock .Unlock ()
249-
250- return wc , nil
251- }
252-
253- func (wl * wrappedListener ) Close () error {
254- if wl .stopped {
255- return syscall .EINVAL
256- }
257-
258- wl .stopped = true
259- return wl .Listener .Close ()
278+ return wl .server .wrapConnection (c )
260279}
261280
262281func (wl * wrappedListener ) File () (* os.File , error ) {
@@ -266,8 +285,12 @@ func (wl *wrappedListener) File() (*os.File, error) {
266285
267286type wrappedConn struct {
268287 net.Conn
288+
289+ // listElem is protected by the server's lock (used by the server to remove conn itself from the list)
290+ // nil means it has been removed
291+ listElem * list.Element
292+
269293 server * Server
270- closed * int32
271294 deadline time.Time
272295}
273296
@@ -286,25 +309,6 @@ func (w *wrappedConn) Write(p []byte) (n int, err error) {
286309}
287310
288311func (w * wrappedConn ) Close () error {
289- if atomic .CompareAndSwapInt32 (w .closed , 0 , 1 ) {
290- defer func () {
291- if err := recover (); err != nil {
292- select {
293- case <- GetManager ().IsHammer ():
294- // Likely deadlocked request released at hammertime
295- log .Warn ("Panic during connection close! %v. Likely there has been a deadlocked request which has been released by forced shutdown." , err )
296- default :
297- log .Error ("Panic during connection close! %v" , err )
298- }
299- }
300- }()
301-
302- // Remove from tracked connections
303- w .server .connectionsLock .Lock ()
304- delete (w .server .connections , w )
305- w .server .connectionsLock .Unlock ()
306-
307- w .server .wg .Done ()
308- }
312+ w .server .removeConnection (w )
309313 return w .Conn .Close ()
310314}
0 commit comments