Skip to content

Commit

Permalink
Improve graceful manager code/comment (#28063)
Browse files Browse the repository at this point in the history
The graceful manager has some bugs (#27643, #28062). This is a
preparation for further fixes.
  • Loading branch information
wxiaoguang authored Nov 15, 2023
1 parent f65977d commit 79394b3
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 83 deletions.
7 changes: 7 additions & 0 deletions modules/graceful/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ import (
"context"
)

// Shutdown procedure:
// * cancel ShutdownContext: the registered context consumers have time to do their cleanup (they could use the hammer context)
// * cancel HammerContext: the all context consumers have limited time to do their cleanup (wait for a few seconds)
// * cancel TerminateContext: the registered context consumers have time to do their cleanup (but they shouldn't use shutdown/hammer context anymore)
// * cancel manager context
// If the shutdown is triggered again during the shutdown procedure, the hammer context will be canceled immediately to force to shut down.

// ShutdownContext returns a context.Context that is Done at shutdown
// Callers using this context should ensure that they are registered as a running server
// in order that they are waited for.
Expand Down
52 changes: 8 additions & 44 deletions modules/graceful/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ type RunCanceler interface {
// and add a function to call manager.InformCleanup if it's not going to be used
const numberOfServersToCreate = 4

// Manager represents the graceful server manager interface
var manager *Manager

var initOnce = sync.Once{}
var (
manager *Manager
initOnce sync.Once
)

// GetManager returns the Manager
func GetManager() *Manager {
Expand Down Expand Up @@ -147,12 +147,12 @@ func (g *Manager) doShutdown() {
go g.doHammerTime(setting.GracefulHammerTime)
}
go func() {
g.WaitForServers()
g.runningServerWaitGroup.Wait()
// Mop up any remaining unclosed events.
g.doHammerTime(0)
<-time.After(1 * time.Second)
g.doTerminate()
g.WaitForTerminate()
g.terminateWaitGroup.Wait()
g.lock.Lock()
g.managerCtxCancel()
g.lock.Unlock()
Expand Down Expand Up @@ -199,55 +199,26 @@ func (g *Manager) IsChild() bool {
}

// IsShutdown returns a channel which will be closed at shutdown.
// The order of closure is IsShutdown, IsHammer (potentially), IsTerminate
// The order of closure is shutdown, hammer (potentially), terminate
func (g *Manager) IsShutdown() <-chan struct{} {
return g.shutdownCtx.Done()
}

// IsHammer returns a channel which will be closed at hammer
// The order of closure is IsShutdown, IsHammer (potentially), IsTerminate
// IsHammer returns a channel which will be closed at hammer.
// Servers running within the running server wait group should respond to IsHammer
// if not shutdown already
func (g *Manager) IsHammer() <-chan struct{} {
return g.hammerCtx.Done()
}

// IsTerminate returns a channel which will be closed at terminate
// The order of closure is IsShutdown, IsHammer (potentially), IsTerminate
// IsTerminate will only close once all running servers have stopped
func (g *Manager) IsTerminate() <-chan struct{} {
return g.terminateCtx.Done()
}

// ServerDone declares a running server done and subtracts one from the
// running server wait group. Users probably do not want to call this
// and should use one of the RunWithShutdown* functions
func (g *Manager) ServerDone() {
g.runningServerWaitGroup.Done()
}

// WaitForServers waits for all running servers to finish. Users should probably
// instead use AtTerminate or IsTerminate
func (g *Manager) WaitForServers() {
g.runningServerWaitGroup.Wait()
}

// WaitForTerminate waits for all terminating actions to finish.
// Only the main go-routine should use this
func (g *Manager) WaitForTerminate() {
g.terminateWaitGroup.Wait()
}

func (g *Manager) getState() state {
g.lock.RLock()
defer g.lock.RUnlock()
return g.state
}

func (g *Manager) setStateTransition(old, new state) bool {
if old != g.getState() {
return false
}
g.lock.Lock()
if g.state != old {
g.lock.Unlock()
Expand All @@ -258,13 +229,6 @@ func (g *Manager) setStateTransition(old, new state) bool {
return true
}

func (g *Manager) setState(st state) {
g.lock.Lock()
defer g.lock.Unlock()

g.state = st
}

// InformCleanup tells the cleanup wait group that we have either taken a listener or will not be taking a listener.
// At the moment the total number of servers (numberOfServersToCreate) are pre-defined as a const before global init,
// so this function MUST be called if a server is not used.
Expand Down
4 changes: 3 additions & 1 deletion modules/graceful/manager_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,9 @@ func (g *Manager) start(ctx context.Context) {
defer pprof.SetGoroutineLabels(ctx)

// Set the running state & handle signals
g.setState(stateRunning)
if !g.setStateTransition(stateInit, stateRunning) {
panic("invalid graceful manager state: transition from init to running failed")
}
g.notify(statusMsg("Starting Gitea"))
g.notify(pidMsg())
go g.handleSignals(g.managerCtx)
Expand Down
4 changes: 3 additions & 1 deletion modules/graceful/manager_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ func (g *Manager) start() {
g.shutdownRequested = make(chan struct{})

// Set the running state
g.setState(stateRunning)
if !g.setStateTransition(stateInit, stateRunning) {
panic("invalid graceful manager state: transition from init to running failed")
}
if skip, _ := strconv.ParseBool(os.Getenv("SKIP_MINWINSVC")); skip {
log.Trace("Skipping SVC check as SKIP_MINWINSVC is set")
return
Expand Down
8 changes: 2 additions & 6 deletions modules/graceful/net_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,8 @@ func CloseProvidedListeners() error {
return returnableError
}

// DefaultGetListener obtains a listener for the local network address. The network must be
// a stream-oriented network: "tcp", "tcp4", "tcp6", "unix" or "unixpacket". It
// returns an provided net.Listener for the matching network and address, or
// creates a new one using net.Listen. This function can be replaced by changing the
// GetListener variable at the top of this file, for example to listen on an onion service using
// github.com/cretz/bine
// DefaultGetListener obtains a listener for the stream-oriented local network address:
// "tcp", "tcp4", "tcp6", "unix" or "unixpacket".
func DefaultGetListener(network, address string) (net.Listener, error) {
// Add a deferral to say that we've tried to grab a listener
defer GetManager().InformCleanup()
Expand Down
4 changes: 1 addition & 3 deletions modules/graceful/net_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ package graceful
import "net"

// DefaultGetListener obtains a listener for the local network address.
// On windows this is basically just a shim around net.Listen. This function
// can be replaced by changing the GetListener variable at the top of this file,
// for example to listen on an onion service using github.com/cretz/bine
// On windows this is basically just a shim around net.Listen.
func DefaultGetListener(network, address string) (net.Listener, error) {
// Add a deferral to say that we've tried to grab a listener
defer GetManager().InformCleanup()
Expand Down
26 changes: 3 additions & 23 deletions modules/graceful/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,11 @@ import (
"code.gitea.io/gitea/modules/setting"
)

var (
// DefaultReadTimeOut default read timeout
DefaultReadTimeOut time.Duration
// DefaultWriteTimeOut default write timeout
DefaultWriteTimeOut time.Duration
// DefaultMaxHeaderBytes default max header bytes
DefaultMaxHeaderBytes int
// PerWriteWriteTimeout timeout for writes
PerWriteWriteTimeout = 30 * time.Second
// PerWriteWriteTimeoutKbTime is a timeout taking account of how much there is to be written
PerWriteWriteTimeoutKbTime = 10 * time.Second
)

// GetListener returns a listener from a GetListener function, which must have the
// signature: `func FunctioName(network, address string) (net.Listener, error)`.
// This determines the implementation of net.Listener which the server will use.`
// It is implemented in this way so that downstreams may specify the type of listener
// they want to provide Gitea on by default, such as with a hidden service or a p2p network
// No need to worry about "breaking" if there would be a refactoring for the Listeners. No compatibility-guarantee for this mechanism
// GetListener returns a net listener
// This determines the implementation of net.Listener which the server will use,
// so that downstreams could provide their own Listener, such as with a hidden service or a p2p network
var GetListener = DefaultGetListener

func init() {
DefaultMaxHeaderBytes = 0 // use http.DefaultMaxHeaderBytes - which currently is 1 << 20 (1MB)
}

// ServeFunction represents a listen.Accept loop
type ServeFunction = func(net.Listener) error

Expand Down
7 changes: 2 additions & 5 deletions modules/graceful/server_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,8 @@ import (
func newHTTPServer(network, address, name string, handler http.Handler) (*Server, ServeFunction) {
server := NewServer(network, address, name)
httpServer := http.Server{
ReadTimeout: DefaultReadTimeOut,
WriteTimeout: DefaultWriteTimeOut,
MaxHeaderBytes: DefaultMaxHeaderBytes,
Handler: handler,
BaseContext: func(net.Listener) context.Context { return GetManager().HammerContext() },
Handler: handler,
BaseContext: func(net.Listener) context.Context { return GetManager().HammerContext() },
}
server.OnShutdown = func() {
httpServer.SetKeepAlivesEnabled(false)
Expand Down

0 comments on commit 79394b3

Please sign in to comment.