From a25f2bd990f027213ec99bb30232ccfb052e4a20 Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Thu, 3 Jun 2021 07:51:32 +0200 Subject: [PATCH 1/5] all processes must die --- x-pack/elastic-agent/CHANGELOG.next.asciidoc | 1 + .../pkg/agent/operation/operation_config.go | 9 ++++- .../pkg/core/plugin/process/app.go | 35 +++++++++++++++++-- .../pkg/core/plugin/process/start.go | 35 +++++++++++++++---- .../pkg/core/plugin/process/status.go | 30 +++++++++++----- 5 files changed, 91 insertions(+), 19 deletions(-) diff --git a/x-pack/elastic-agent/CHANGELOG.next.asciidoc b/x-pack/elastic-agent/CHANGELOG.next.asciidoc index 914d45258bc..e5f79b1b2a5 100644 --- a/x-pack/elastic-agent/CHANGELOG.next.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.next.asciidoc @@ -68,6 +68,7 @@ - Handle case where policy doesn't contain Fleet connection information {pull}25707[25707] - Fix fleet-server.yml spec to not overwrite existing keys {pull}25741[25741] - Agent sends wrong log level to Endpoint {issue}25583[25583] +- Fix startup with failing configuration {pull}26057[26057] ==== New features diff --git a/x-pack/elastic-agent/pkg/agent/operation/operation_config.go b/x-pack/elastic-agent/pkg/agent/operation/operation_config.go index b8c56257aca..00c365a6166 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operation_config.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operation_config.go @@ -10,6 +10,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/process" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/state" ) @@ -53,7 +54,13 @@ func (o *operationConfig) Check(_ context.Context, _ Application) (bool, error) func (o *operationConfig) Run(ctx context.Context, application Application) (err error) { defer func() { if err != nil { - application.SetState(state.Failed, err.Error(), nil) + // application failed to apply config but is running. + s := state.Degraded + if errors.Is(err, process.ErrAppNotRunning) { + s = state.Failed + } + + application.SetState(s, err.Error(), nil) } }() return application.Configure(ctx, o.cfg) diff --git a/x-pack/elastic-agent/pkg/core/plugin/process/app.go b/x-pack/elastic-agent/pkg/core/plugin/process/app.go index c0c6341cbd3..6834fbbffc6 100644 --- a/x-pack/elastic-agent/pkg/core/plugin/process/app.go +++ b/x-pack/elastic-agent/pkg/core/plugin/process/app.go @@ -47,6 +47,7 @@ type Application struct { tag app.Taggable state state.State reporter state.Reporter + watchClosers map[int]context.CancelFunc uid int gid int @@ -105,6 +106,7 @@ func NewApplication( uid: uid, gid: gid, statusReporter: statusController.RegisterApp(id, appName), + watchClosers: make(map[int]context.CancelFunc), }, nil } @@ -159,6 +161,8 @@ func (a *Application) Stop() { a.srvState = nil if a.state.ProcessInfo != nil { + // stop and clean watcher + a.stopWatcher(a.state.ProcessInfo) if err := a.state.ProcessInfo.Process.Signal(stopSig); err == nil { // no error on signal, so wait for it to stop _, _ = a.state.ProcessInfo.Process.Wait() @@ -192,16 +196,32 @@ func (a *Application) watch(ctx context.Context, p app.Taggable, proc *process.I case ps := <-a.waitProc(proc.Process): procState = ps case <-a.bgContext.Done(): - a.Stop() + return + case <-ctx.Done(): + // closer called return } a.appLock.Lock() if a.state.ProcessInfo != proc { + // kill original process if possible + if proc != nil && proc.Process != nil { + _ = proc.Process.Kill() + } + // already another process started, another watcher is watching instead a.appLock.Unlock() return } + + // stop the watcher + a.stopWatcher(a.state.ProcessInfo) + + // was already stopped by Stop, do not restart + if a.state.Status == state.Stopped { + return + } + a.state.ProcessInfo = nil srvState := a.srvState @@ -211,14 +231,23 @@ func (a *Application) watch(ctx context.Context, p app.Taggable, proc *process.I } msg := fmt.Sprintf("exited with code: %d", procState.ExitCode()) - a.setState(state.Crashed, msg, nil) + a.setState(state.Restarting, msg, nil) // it was a crash - a.start(ctx, p, cfg) + a.start(ctx, p, cfg, true) a.appLock.Unlock() }() } +func (a *Application) stopWatcher(procInfo *process.Info) { + if procInfo != nil { + if closer, ok := a.watchClosers[procInfo.PID]; ok { + closer() + delete(a.watchClosers, procInfo.PID) + } + } +} + func (a *Application) waitProc(proc *os.Process) <-chan *os.ProcessState { resChan := make(chan *os.ProcessState) diff --git a/x-pack/elastic-agent/pkg/core/plugin/process/start.go b/x-pack/elastic-agent/pkg/core/plugin/process/start.go index f87c439c011..60792649da0 100644 --- a/x-pack/elastic-agent/pkg/core/plugin/process/start.go +++ b/x-pack/elastic-agent/pkg/core/plugin/process/start.go @@ -26,11 +26,11 @@ func (a *Application) Start(ctx context.Context, t app.Taggable, cfg map[string] a.appLock.Lock() defer a.appLock.Unlock() - return a.start(ctx, t, cfg) + return a.start(ctx, t, cfg, false) } // Start starts the application without grabbing the lock. -func (a *Application) start(ctx context.Context, t app.Taggable, cfg map[string]interface{}) (err error) { +func (a *Application) start(ctx context.Context, t app.Taggable, cfg map[string]interface{}, isRestart bool) (err error) { defer func() { if err != nil { // inject App metadata @@ -38,8 +38,25 @@ func (a *Application) start(ctx context.Context, t app.Taggable, cfg map[string] } }() - // already started if not stopped or crashed - if a.Started() { + // starting only if it's not running + // or if it is, then only in case it's restart and this call initiates from restart call + if a.Started() && a.state.Status != state.Restarting { + if a.state.ProcessInfo == nil { + // already started if not stopped or crashed + return nil + } + + // in case app reported status it might still be running and failure timer + // in progress. Stop timer and stop failing process + a.stopFailedTimer() + a.stopWatcher(a.state.ProcessInfo) + + // kill the process + _ = a.state.ProcessInfo.Process.Kill() + a.state.ProcessInfo = nil + } + + if a.state.Status == state.Restarting && !isRestart { return nil } @@ -69,7 +86,8 @@ func (a *Application) start(ctx context.Context, t app.Taggable, cfg map[string] if a.state.Status != state.Stopped { // restarting as it was previously in a different state a.setState(state.Restarting, "Restarting", nil) - } else { + } else if a.state.Status != state.Restarting { + // keep restarting state otherwise it's starting a.setState(state.Starting, "Starting", nil) } @@ -116,12 +134,15 @@ func (a *Application) start(ctx context.Context, t app.Taggable, cfg map[string] if err != nil { return err } - // write connect info to stdin go a.writeToStdin(a.srvState, a.state.ProcessInfo.Stdin) + // create closer for watcher, used to terminate watcher without + // side effect of restarting process during shutdown + cancelCtx, cancel := context.WithCancel(ctx) + a.watchClosers[a.state.ProcessInfo.PID] = cancel // setup watcher - a.watch(ctx, t, a.state.ProcessInfo, cfg) + a.watch(cancelCtx, t, a.state.ProcessInfo, cfg) return nil } diff --git a/x-pack/elastic-agent/pkg/core/plugin/process/status.go b/x-pack/elastic-agent/pkg/core/plugin/process/status.go index 02c38d6b82d..c335b3b6446 100644 --- a/x-pack/elastic-agent/pkg/core/plugin/process/status.go +++ b/x-pack/elastic-agent/pkg/core/plugin/process/status.go @@ -13,6 +13,7 @@ import ( "github.com/elastic/elastic-agent-client/v7/pkg/proto" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/process" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/state" ) @@ -42,7 +43,8 @@ func (a *Application) OnStatusChange(s *server.ApplicationState, status proto.St _ = yaml.Unmarshal([]byte(s.Config()), &cfg) // start the failed timer - a.startFailedTimer(cfg) + // pass process info to avoid killing new process spun up in a meantime + a.startFailedTimer(cfg, a.state.ProcessInfo) } else { a.stopFailedTimer() } @@ -51,7 +53,7 @@ func (a *Application) OnStatusChange(s *server.ApplicationState, status proto.St // startFailedTimer starts a timer that will restart the application if it doesn't exit failed after a period of time. // // This does not grab the appLock, that must be managed by the caller. -func (a *Application) startFailedTimer(cfg map[string]interface{}) { +func (a *Application) startFailedTimer(cfg map[string]interface{}, proc *process.Info) { if a.restartCanceller != nil { // already have running failed timer; just update config a.restartConfig = cfg @@ -74,7 +76,7 @@ func (a *Application) startFailedTimer(cfg map[string]interface{}) { case <-ctx.Done(): return case <-t.C: - a.restart() + a.restart(proc) } }() } @@ -91,19 +93,31 @@ func (a *Application) stopFailedTimer() { } // restart restarts the application -func (a *Application) restart() { +func (a *Application) restart(proc *process.Info) { a.appLock.Lock() defer a.appLock.Unlock() + // stop the watcher + a.stopWatcher(proc) + // kill the process - if a.state.ProcessInfo != nil { - _ = a.state.ProcessInfo.Process.Kill() - a.state.ProcessInfo = nil + if proc != nil && proc.Process != nil { + _ = proc.Process.Kill() + } + + if proc != a.state.ProcessInfo { + // we're restarting different process than actually running + // no need to start another one + return } + + a.state.ProcessInfo = nil + ctx := a.startContext tag := a.tag - err := a.start(ctx, tag, a.restartConfig) + a.setState(state.Restarting, "", nil) + err := a.start(ctx, tag, a.restartConfig, true) if err != nil { a.setState(state.Crashed, fmt.Sprintf("failed to restart: %s", err), nil) } From 1fcadedafc9ede0407ed681f6a79eefe50e6b304 Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Fri, 4 Jun 2021 16:55:33 +0200 Subject: [PATCH 2/5] missing unlock --- x-pack/elastic-agent/pkg/core/plugin/process/app.go | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/elastic-agent/pkg/core/plugin/process/app.go b/x-pack/elastic-agent/pkg/core/plugin/process/app.go index 6834fbbffc6..7e5a228e505 100644 --- a/x-pack/elastic-agent/pkg/core/plugin/process/app.go +++ b/x-pack/elastic-agent/pkg/core/plugin/process/app.go @@ -219,6 +219,7 @@ func (a *Application) watch(ctx context.Context, p app.Taggable, proc *process.I // was already stopped by Stop, do not restart if a.state.Status == state.Stopped { + a.appLock.Unlock() return } From 3f8aeb5b8e496cdfe739ec20b70f6057496e55e2 Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Fri, 4 Jun 2021 17:24:31 +0200 Subject: [PATCH 3/5] graceful kill --- .../pkg/core/plugin/process/app.go | 34 ++++++++++++++++--- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/x-pack/elastic-agent/pkg/core/plugin/process/app.go b/x-pack/elastic-agent/pkg/core/plugin/process/app.go index 7e5a228e505..a44f568ec05 100644 --- a/x-pack/elastic-agent/pkg/core/plugin/process/app.go +++ b/x-pack/elastic-agent/pkg/core/plugin/process/app.go @@ -30,6 +30,7 @@ import ( var ( // ErrAppNotRunning is returned when configuration is performed on not running application. ErrAppNotRunning = errors.New("application is not running", errors.TypeApplication) + procExitTimeout = 10 * time.Second ) // Application encapsulates a concrete application ran by elastic-agent e.g Beat. @@ -204,10 +205,7 @@ func (a *Application) watch(ctx context.Context, p app.Taggable, proc *process.I a.appLock.Lock() if a.state.ProcessInfo != proc { - // kill original process if possible - if proc != nil && proc.Process != nil { - _ = proc.Process.Kill() - } + gracefulKill(proc) // already another process started, another watcher is watching instead a.appLock.Unlock() @@ -280,3 +278,31 @@ func (a *Application) setState(s state.Status, msg string, payload map[string]in func (a *Application) cleanUp() { a.monitor.Cleanup(a.desc.Spec(), a.pipelineID) } + +func gracefulKill(proc *process.Info) { + if proc == nil || proc.Process == nil { + return + } + + // send stop signal to request stop + proc.Process.Signal(os.Interrupt) + + var wg sync.WaitGroup + doneChan := make(chan struct{}) + wg.Add(1) + go func() { + wg.Done() + _, _ = proc.Process.Wait() + close(doneChan) + }() + + // wait for awaiter + wg.Wait() + + // kill in case it's still running after timeout + select { + case <-doneChan: + case <-time.After(procExitTimeout): + _ = proc.Process.Kill() + } +} From 6b748382e83d8f63260a2137181a6e00ae8c63bc Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Fri, 4 Jun 2021 17:30:31 +0200 Subject: [PATCH 4/5] unlock in defer --- x-pack/elastic-agent/pkg/core/plugin/process/app.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/x-pack/elastic-agent/pkg/core/plugin/process/app.go b/x-pack/elastic-agent/pkg/core/plugin/process/app.go index a44f568ec05..6b62c48f0d9 100644 --- a/x-pack/elastic-agent/pkg/core/plugin/process/app.go +++ b/x-pack/elastic-agent/pkg/core/plugin/process/app.go @@ -204,11 +204,10 @@ func (a *Application) watch(ctx context.Context, p app.Taggable, proc *process.I } a.appLock.Lock() + defer a.appLock.Unlock() if a.state.ProcessInfo != proc { - gracefulKill(proc) - // already another process started, another watcher is watching instead - a.appLock.Unlock() + gracefulKill(proc) return } @@ -217,7 +216,6 @@ func (a *Application) watch(ctx context.Context, p app.Taggable, proc *process.I // was already stopped by Stop, do not restart if a.state.Status == state.Stopped { - a.appLock.Unlock() return } @@ -225,7 +223,6 @@ func (a *Application) watch(ctx context.Context, p app.Taggable, proc *process.I srvState := a.srvState if srvState == nil || srvState.Expected() == proto.StateExpected_STOPPING { - a.appLock.Unlock() return } @@ -234,7 +231,6 @@ func (a *Application) watch(ctx context.Context, p app.Taggable, proc *process.I // it was a crash a.start(ctx, p, cfg, true) - a.appLock.Unlock() }() } From 47db569e71bf0214938ce94cb579c94f357522b1 Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Tue, 8 Jun 2021 14:50:56 +0200 Subject: [PATCH 5/5] use stop, windows does not implement sending interupt --- x-pack/elastic-agent/pkg/core/plugin/process/app.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/elastic-agent/pkg/core/plugin/process/app.go b/x-pack/elastic-agent/pkg/core/plugin/process/app.go index 6b62c48f0d9..635d4e370d5 100644 --- a/x-pack/elastic-agent/pkg/core/plugin/process/app.go +++ b/x-pack/elastic-agent/pkg/core/plugin/process/app.go @@ -281,7 +281,7 @@ func gracefulKill(proc *process.Info) { } // send stop signal to request stop - proc.Process.Signal(os.Interrupt) + proc.Stop() var wg sync.WaitGroup doneChan := make(chan struct{})