-
Notifications
You must be signed in to change notification settings - Fork 290
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implements ShutdownCode option and ShutdownSignal os.Signal wrapper #912
Changes from 8 commits
faa89e5
995e672
c9972dd
638daf4
7ebd130
b4e5c2a
848742a
a69d369
196f699
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -276,6 +276,9 @@ type App struct { | |
clock fxclock.Clock | ||
lifecycle *lifecycleWrapper | ||
|
||
stopch chan struct{} // closed when Stop is called | ||
stopChLock sync.RWMutex // mutex for init and closing of stopch | ||
|
||
container *dig.Container | ||
root *module | ||
modules []*module | ||
|
@@ -286,10 +289,16 @@ type App struct { | |
// Decides how we react to errors when building the graph. | ||
errorHooks []ErrorHandler | ||
validate bool | ||
|
||
// Used to signal shutdowns. | ||
donesMu sync.Mutex // guards dones and shutdownSig | ||
dones []chan os.Signal | ||
shutdownSig os.Signal | ||
shutdownMu sync.Mutex | ||
shutdownSig *ShutdownSignal | ||
sigReceivers []signalReceiver | ||
signalOnce sync.Once | ||
|
||
// Used to make sure Start/Stop is called only once. | ||
runStart sync.Once | ||
runStop sync.Once | ||
|
||
osExit func(code int) // os.Exit override; used for testing only | ||
} | ||
|
@@ -394,6 +403,7 @@ func New(opts ...Option) *App { | |
startTimeout: DefaultTimeout, | ||
stopTimeout: DefaultTimeout, | ||
} | ||
|
||
app.root = &module{ | ||
app: app, | ||
// We start with a logger that writes to stderr. One of the | ||
|
@@ -544,27 +554,32 @@ func (app *App) Run() { | |
// Historically, we do not os.Exit(0) even though most applications | ||
// cede control to Fx with they call app.Run. To avoid a breaking | ||
// change, never os.Exit for success. | ||
if code := app.run(app.Done()); code != 0 { | ||
if code := app.run(app.Wait()); code != 0 { | ||
app.exit(code) | ||
} | ||
} | ||
|
||
func (app *App) run(done <-chan os.Signal) (exitCode int) { | ||
func (app *App) run(done <-chan ShutdownSignal) (exitCode int) { | ||
startCtx, cancel := app.clock.WithTimeout(context.Background(), app.StartTimeout()) | ||
defer cancel() | ||
|
||
if err := app.Start(startCtx); err != nil { | ||
app.closeStopChannel() | ||
return 1 | ||
} | ||
|
||
sig := <-done | ||
app.log().LogEvent(&fxevent.Stopping{Signal: sig}) | ||
app.log().LogEvent(&fxevent.Stopping{Signal: sig.Signal}) | ||
|
||
stopCtx, cancel := app.clock.WithTimeout(context.Background(), app.StopTimeout()) | ||
defer cancel() | ||
|
||
if err := app.Stop(stopCtx); err != nil { | ||
return 1 | ||
// if we encounter a timeout during stop, force exit code 1 | ||
if errors.Is(err, context.DeadlineExceeded) { | ||
return 1 | ||
} | ||
return sig.ExitCode | ||
} | ||
|
||
return 0 | ||
|
@@ -605,14 +620,18 @@ var ( | |
// encountered any errors in application initialization. | ||
func (app *App) Start(ctx context.Context) (err error) { | ||
defer func() { | ||
app.log().LogEvent(&fxevent.Started{Err: err}) | ||
app.runStart.Do(func() { | ||
app.log().LogEvent(&fxevent.Started{Err: err}) | ||
}) | ||
Comment on lines
+623
to
+625
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does the LogEvent need to be guarded in the Once? We're allowing multiple actual Start invocations, so they should be able to log multiple times? |
||
}() | ||
|
||
if app.err != nil { | ||
// Some provides failed, short-circuit immediately. | ||
return app.err | ||
} | ||
|
||
app.initStopChannel() | ||
|
||
return withTimeout(ctx, &withTimeoutParams{ | ||
hook: _onStartHook, | ||
callback: app.start, | ||
|
@@ -638,6 +657,30 @@ func (app *App) start(ctx context.Context) error { | |
return nil | ||
} | ||
|
||
func (app *App) initStopChannel() { | ||
app.stopChLock.Lock() | ||
defer app.stopChLock.Unlock() | ||
if app.stopch == nil { | ||
app.stopch = make(chan struct{}) | ||
} | ||
} | ||
|
||
func (app *App) stopChannel() chan struct{} { | ||
app.stopChLock.RLock() | ||
defer app.stopChLock.RUnlock() | ||
ch := app.stopch | ||
return ch | ||
} | ||
|
||
func (app *App) closeStopChannel() { | ||
app.stopChLock.Lock() | ||
defer app.stopChLock.Unlock() | ||
if app.stopch != nil { | ||
close(app.stopch) | ||
app.stopch = nil | ||
} | ||
} | ||
|
||
// Stop gracefully stops the application. It executes any registered OnStop | ||
// hooks in reverse order, so that each constructor's stop hooks are called | ||
// before its dependencies' stop hooks. | ||
|
@@ -646,16 +689,23 @@ func (app *App) start(ctx context.Context) error { | |
// called are executed. However, all those hooks are executed, even if some | ||
// fail. | ||
func (app *App) Stop(ctx context.Context) (err error) { | ||
|
||
defer func() { | ||
app.log().LogEvent(&fxevent.Stopped{Err: err}) | ||
// Protect the Stop hooks from being called multiple times. | ||
app.runStop.Do(func() { | ||
app.log().LogEvent(&fxevent.Stopped{Err: err}) | ||
app.closeStopChannel() | ||
}) | ||
Comment on lines
+695
to
+698
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same. I'm not sure we need this guard? If there's a stop channel because of Start, then there should be a defer close. |
||
}() | ||
|
||
return withTimeout(ctx, &withTimeoutParams{ | ||
err = withTimeout(ctx, &withTimeoutParams{ | ||
hook: _onStopHook, | ||
callback: app.lifecycle.Stop, | ||
lifecycle: app.lifecycle, | ||
log: app.log(), | ||
}) | ||
|
||
return | ||
} | ||
|
||
// Done returns a channel of signals to block on after starting the | ||
|
@@ -666,21 +716,53 @@ func (app *App) Stop(ctx context.Context) (err error) { | |
// Alternatively, a signal can be broadcast to all done channels manually by | ||
// using the Shutdown functionality (see the Shutdowner documentation for details). | ||
func (app *App) Done() <-chan os.Signal { | ||
c := make(chan os.Signal, 1) | ||
rcv, ch := newOSSignalReceiver() | ||
app.appendSignalReceiver(rcv) | ||
return ch | ||
} | ||
|
||
app.donesMu.Lock() | ||
defer app.donesMu.Unlock() | ||
// If shutdown signal has been received already | ||
// send it and return. If not, wait for user to send a termination | ||
// signal. | ||
if app.shutdownSig != nil { | ||
c <- app.shutdownSig | ||
return c | ||
} | ||
func (app *App) Wait() <-chan ShutdownSignal { | ||
rcv, ch := newShutdownSignalReceiver() | ||
app.appendSignalReceiver(rcv) | ||
return ch | ||
} | ||
|
||
func (app *App) appendSignalReceiver(r signalReceiver) { | ||
app.shutdownMu.Lock() | ||
defer app.shutdownMu.Unlock() | ||
|
||
signal.Notify(c, os.Interrupt, _sigINT, _sigTERM) | ||
app.dones = append(app.dones, c) | ||
return c | ||
// If shutdown signal has been received already | ||
// send it and return. | ||
// If not, wait for user to send a termination signal. | ||
if sig := app.shutdownSig; sig != nil { | ||
// Ignore the error from ReceiveSignal. | ||
// This is a newly created channel and can't possibly be | ||
// blocked. | ||
_ = r.ReceiveShutdownSignal(*sig) | ||
return | ||
} | ||
|
||
app.sigReceivers = append(app.sigReceivers, r) | ||
|
||
// The first time either Wait or Done is called, | ||
// register an OS signal handler | ||
// and make that broadcast the signal to all sigReceivers | ||
// regardless of whether they're Wait or Done based. | ||
app.signalOnce.Do(func() { | ||
sigch := make(chan os.Signal, 1) | ||
signal.Notify(sigch, os.Interrupt, _sigINT, _sigTERM) | ||
go func() { | ||
// if the stop channel is nil; that means that the app was never started | ||
// thus, do not broadcast any signals | ||
if stopch := app.stopChannel(); stopch != nil { | ||
select { | ||
case sig := <-sigch: | ||
app.broadcastSignal(sig, 1) | ||
case <-stopch: | ||
} | ||
} | ||
}() | ||
}) | ||
} | ||
|
||
// StartTimeout returns the configured startup timeout. Apps default to using | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you need this to be included in this PR? I'm asking because this stuff was supposed to ship with the previous version (v1.17.2) and it broke some users: see #945 and #950 for context.