Fix startup with failing configuration #26126

Merged · 8 commits · Jun 8, 2021
Changes from all commits
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
@@ -68,6 +68,7 @@
 - Handle case where policy doesn't contain Fleet connection information {pull}25707[25707]
 - Fix fleet-server.yml spec to not overwrite existing keys {pull}25741[25741]
 - Agent sends wrong log level to Endpoint {issue}25583[25583]
+- Fix startup with failing configuration {pull}26057[26057]
 - Change timestamp in elastic-agent-json.log to use UTC {issue}25391[25391]
 
 ==== New features
@@ -10,6 +10,7 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/process"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/state"
)

@@ -53,7 +54,13 @@ func (o *operationConfig) Check(_ context.Context, _ Application) (bool, error)
 func (o *operationConfig) Run(ctx context.Context, application Application) (err error) {
 	defer func() {
 		if err != nil {
-			application.SetState(state.Failed, err.Error(), nil)
+			// application failed to apply config but is running.
+			s := state.Degraded
+			if errors.Is(err, process.ErrAppNotRunning) {
+				s = state.Failed
+			}
+
+			application.SetState(s, err.Error(), nil)
 		}
 	}()
 	return application.Configure(ctx, o.cfg)
64 changes: 58 additions & 6 deletions x-pack/elastic-agent/pkg/core/plugin/process/app.go
@@ -30,6 +30,7 @@ import (
 var (
 	// ErrAppNotRunning is returned when configuration is performed on a non-running application.
 	ErrAppNotRunning = errors.New("application is not running", errors.TypeApplication)
+	procExitTimeout = 10 * time.Second
 )
 
 // Application encapsulates a concrete application run by elastic-agent, e.g. a Beat.
@@ -47,6 +48,7 @@ type Application struct {
 	tag      app.Taggable
 	state    state.State
 	reporter state.Reporter
+	watchClosers map[int]context.CancelFunc
 
 	uid int
 	gid int
@@ -105,6 +107,7 @@ func NewApplication(
 		uid:            uid,
 		gid:            gid,
 		statusReporter: statusController.RegisterApp(id, appName),
+		watchClosers:   make(map[int]context.CancelFunc),
 	}, nil
 }
 
@@ -159,6 +162,8 @@ func (a *Application) Stop() {
 
 	a.srvState = nil
 	if a.state.ProcessInfo != nil {
+		// stop and clean watcher
+		a.stopWatcher(a.state.ProcessInfo)
 		if err := a.state.ProcessInfo.Process.Signal(stopSig); err == nil {
 			// no error on signal, so wait for it to stop
 			_, _ = a.state.ProcessInfo.Process.Wait()
@@ -192,33 +197,52 @@ func (a *Application) watch(ctx context.Context, p app.Taggable, proc *process.I
 		case ps := <-a.waitProc(proc.Process):
 			procState = ps
 		case <-a.bgContext.Done():
 			a.Stop()
 			return
Reviewer (Contributor): Just curious: is there any connection between a.bgContext and ctx? Does ctx have to be cancelled when a.bgContext is cancelled?

Author: bgContext is like a cancellation token passed from the top of the app and cancelled on exit or unenroll, so the agent cleans up and backs everything up in an orderly manner. We want to avoid simply shutting the agent down without any cleanup, as that could turn out to be problematic.
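
A minimal sketch of that cancellation-token pattern (hypothetical names, not the agent's actual wiring): an app-wide background context triggers cleanup on shutdown, while a separate per-watcher context lets one watcher be closed without touching the rest:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// watch is a stand-in for Application.watch: it honors both the app-wide
// background context (shutdown/unenroll) and its own closer context.
func watch(bgCtx, closerCtx context.Context) {
	select {
	case <-bgCtx.Done():
		fmt.Println("agent shutting down: stop the process and clean up")
	case <-closerCtx.Done():
		fmt.Println("closer called: exit without restarting anything")
	case <-time.After(time.Second):
		fmt.Println("process exited on its own: restart it")
	}
}

func main() {
	bgCtx, shutdown := context.WithCancel(context.Background())   // app-wide token
	closerCtx, closer := context.WithCancel(context.Background()) // per-watcher closer

	go watch(bgCtx, closerCtx)
	closer() // stop this one watcher; the agent keeps running
	time.Sleep(100 * time.Millisecond)
	shutdown()
}
```

In the PR, watch selects on both contexts, so either an agent-wide shutdown or the per-process closer can end the watcher.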

+		case <-ctx.Done():
+			// closer called
+			return
 		}
 
 		a.appLock.Lock()
+		defer a.appLock.Unlock()
 		if a.state.ProcessInfo != proc {
 			// already another process started, another watcher is watching instead
-			a.appLock.Unlock()
+			gracefulKill(proc)
 			return
 		}
 
+		// stop the watcher
+		a.stopWatcher(a.state.ProcessInfo)
+
+		// was already stopped by Stop, do not restart
+		if a.state.Status == state.Stopped {
+			return
Reviewer: Are we missing an `a.appLock.Unlock()` here?

Author: true

Reviewer: Checking the complete function body, we always use Unlock for each exit path. Better to add the defer unlock right after the lock in order to reduce the chance of introducing new deadlocks in the future.
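
The reviewer's suggestion as a standalone sketch (hypothetical `resource` type, not code from the PR): with one manual Unlock per exit path, every new early return risks leaking the lock, while a single defer covers all paths:

```go
package main

import "sync"

type resource struct {
	mu    sync.Mutex
	ready bool
}

// fragile: each exit path must remember its own Unlock; a newly added
// early return that forgets it will deadlock the next caller.
func (r *resource) fragile() bool {
	r.mu.Lock()
	if !r.ready {
		r.mu.Unlock()
		return false
	}
	r.mu.Unlock()
	return true
}

// robust: one deferred Unlock runs on every exit path, including panics.
func (r *resource) robust() bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.ready
}

func main() {
	r := &resource{ready: true}
	_ = r.fragile()
	_ = r.robust()
}
```

That is what the final diff does: the per-path a.appLock.Unlock() calls are removed in favor of a single defer a.appLock.Unlock() right after the Lock.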

+		}
+
 		a.state.ProcessInfo = nil
 		srvState := a.srvState
 
 		if srvState == nil || srvState.Expected() == proto.StateExpected_STOPPING {
-			a.appLock.Unlock()
 			return
 		}
 
 		msg := fmt.Sprintf("exited with code: %d", procState.ExitCode())
-		a.setState(state.Crashed, msg, nil)
+		a.setState(state.Restarting, msg, nil)
 
 		// it was a crash
-		a.start(ctx, p, cfg)
-		a.appLock.Unlock()
+		a.start(ctx, p, cfg, true)
 	}()
 }

+func (a *Application) stopWatcher(procInfo *process.Info) {
+	if procInfo != nil {
+		if closer, ok := a.watchClosers[procInfo.PID]; ok {
+			closer()
+			delete(a.watchClosers, procInfo.PID)
+		}
+	}
+}

 func (a *Application) waitProc(proc *os.Process) <-chan *os.ProcessState {
 	resChan := make(chan *os.ProcessState)

@@ -250,3 +274,31 @@ func (a *Application) setState(s state.Status, msg string, payload map[string]in
 func (a *Application) cleanUp() {
 	a.monitor.Cleanup(a.desc.Spec(), a.pipelineID)
 }
+
+func gracefulKill(proc *process.Info) {
+	if proc == nil || proc.Process == nil {
+		return
+	}
+
+	// send the stop signal to request a graceful stop
+	proc.Stop()
+
+	var wg sync.WaitGroup
+	doneChan := make(chan struct{})
+	wg.Add(1)
+	go func() {
+		wg.Done()
+		_, _ = proc.Process.Wait()
+		close(doneChan)
+	}()
+
+	// wait for the waiter goroutine to start
+	wg.Wait()
+
+	// kill in case it's still running after the timeout
+	select {
+	case <-doneChan:
+	case <-time.After(procExitTimeout):
+		_ = proc.Process.Kill()
+	}
+}
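
For context, the stop-then-kill pattern gracefulKill implements can be exercised standalone. A sketch using a plain os/exec process (SIGTERM and the two-second grace period are illustrative choices, not values from the PR):

```go
package main

import (
	"os/exec"
	"syscall"
	"time"
)

// stopOrKill asks a process to stop and kills it if it ignores the request.
func stopOrKill(cmd *exec.Cmd, timeout time.Duration) {
	// politely request termination first
	_ = cmd.Process.Signal(syscall.SIGTERM)

	done := make(chan struct{})
	go func() {
		_ = cmd.Wait()
		close(done)
	}()

	select {
	case <-done:
		// exited within the grace period
	case <-time.After(timeout):
		_ = cmd.Process.Kill() // hard kill after the grace period
	}
}

func main() {
	cmd := exec.Command("sleep", "60")
	if err := cmd.Start(); err != nil {
		panic(err)
	}
	stopOrKill(cmd, 2*time.Second)
}
```

gracefulKill above additionally uses a WaitGroup so that the timeout only starts counting once the waiter goroutine has actually been scheduled.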
35 changes: 28 additions & 7 deletions x-pack/elastic-agent/pkg/core/plugin/process/start.go
@@ -26,20 +26,37 @@ func (a *Application) Start(ctx context.Context, t app.Taggable, cfg map[string]
 	a.appLock.Lock()
 	defer a.appLock.Unlock()
 
-	return a.start(ctx, t, cfg)
+	return a.start(ctx, t, cfg, false)
 }

 // Start starts the application without grabbing the lock.
-func (a *Application) start(ctx context.Context, t app.Taggable, cfg map[string]interface{}) (err error) {
+func (a *Application) start(ctx context.Context, t app.Taggable, cfg map[string]interface{}, isRestart bool) (err error) {
 	defer func() {
 		if err != nil {
 			// inject App metadata
 			err = errors.New(err, errors.M(errors.MetaKeyAppName, a.name), errors.M(errors.MetaKeyAppName, a.id))
 		}
 	}()

-	// already started if not stopped or crashed
-	if a.Started() {
+	// start only if the app is not already running, or, if it is,
+	// only when this call originates from a restart
+	if a.Started() && a.state.Status != state.Restarting {
+		if a.state.ProcessInfo == nil {
+			// already started if not stopped or crashed
+			return nil
+		}
+
+		// if the app reported a status, it might still be running with the failure
+		// timer in progress. Stop the timer and stop the failing process
+		a.stopFailedTimer()
+		a.stopWatcher(a.state.ProcessInfo)
+
+		// kill the process
+		_ = a.state.ProcessInfo.Process.Kill()
+		a.state.ProcessInfo = nil
+	}
+
+	if a.state.Status == state.Restarting && !isRestart {
 		return nil
 	}

@@ -69,7 +86,8 @@ func (a *Application) start(ctx context.Context, t app.Taggable, cfg map[string]
 	if a.state.Status != state.Stopped {
 		// restarting as it was previously in a different state
 		a.setState(state.Restarting, "Restarting", nil)
-	} else {
+	} else if a.state.Status != state.Restarting {
+		// keep the restarting state, otherwise it's starting
 		a.setState(state.Starting, "Starting", nil)
 	}

@@ -116,12 +134,15 @@ func (a *Application) start(ctx context.Context, t app.Taggable, cfg map[string]
 	if err != nil {
 		return err
 	}
+
+	// write connect info to stdin
 	go a.writeToStdin(a.srvState, a.state.ProcessInfo.Stdin)
+
+	// create a closer for the watcher, used to terminate the watcher without
+	// the side effect of restarting the process during shutdown
+	cancelCtx, cancel := context.WithCancel(ctx)
+	a.watchClosers[a.state.ProcessInfo.PID] = cancel
 	// setup watcher
-	a.watch(ctx, t, a.state.ProcessInfo, cfg)
+	a.watch(cancelCtx, t, a.state.ProcessInfo, cfg)

 	return nil
 }
30 changes: 22 additions & 8 deletions x-pack/elastic-agent/pkg/core/plugin/process/status.go
@@ -13,6 +13,7 @@ import (

"github.com/elastic/elastic-agent-client/v7/pkg/proto"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/process"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/state"
)
@@ -42,7 +43,8 @@ func (a *Application) OnStatusChange(s *server.ApplicationState, status proto.St
 		_ = yaml.Unmarshal([]byte(s.Config()), &cfg)
 
 		// start the failed timer
-		a.startFailedTimer(cfg)
+		// pass the process info to avoid killing a new process spun up in the meantime
+		a.startFailedTimer(cfg, a.state.ProcessInfo)
 	} else {
 		a.stopFailedTimer()
 	}
@@ -51,7 +53,7 @@
 // startFailedTimer starts a timer that will restart the application if it doesn't leave the failed state after a period of time.
 //
 // This does not grab the appLock, that must be managed by the caller.
-func (a *Application) startFailedTimer(cfg map[string]interface{}) {
+func (a *Application) startFailedTimer(cfg map[string]interface{}, proc *process.Info) {
 	if a.restartCanceller != nil {
 		// already have running failed timer; just update config
 		a.restartConfig = cfg
@@ -74,7 +76,7 @@
 		case <-ctx.Done():
 			return
 		case <-t.C:
-			a.restart()
+			a.restart(proc)
 		}
 	}()
 }
@@ -91,19 +93,31 @@
 }
 
 // restart restarts the application
-func (a *Application) restart() {
+func (a *Application) restart(proc *process.Info) {
 	a.appLock.Lock()
 	defer a.appLock.Unlock()
 
+	// stop the watcher
+	a.stopWatcher(proc)
+
 	// kill the process
-	if a.state.ProcessInfo != nil {
-		_ = a.state.ProcessInfo.Process.Kill()
-		a.state.ProcessInfo = nil
+	if proc != nil && proc.Process != nil {
+		_ = proc.Process.Kill()
 	}
 
+	if proc != a.state.ProcessInfo {
+		// we're restarting a different process than the one currently running;
+		// no need to start another one
+		return
+	}
+
+	a.state.ProcessInfo = nil
+
 	ctx := a.startContext
 	tag := a.tag
 
-	err := a.start(ctx, tag, a.restartConfig)
+	a.setState(state.Restarting, "", nil)
+	err := a.start(ctx, tag, a.restartConfig, true)
 	if err != nil {
 		a.setState(state.Crashed, fmt.Sprintf("failed to restart: %s", err), nil)
 	}