diff --git a/x-pack/elastic-agent/CHANGELOG.next.asciidoc b/x-pack/elastic-agent/CHANGELOG.next.asciidoc index 9b7a1f5d29d..584b2a8985a 100644 --- a/x-pack/elastic-agent/CHANGELOG.next.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.next.asciidoc @@ -68,6 +68,7 @@ - Handle case where policy doesn't contain Fleet connection information {pull}25707[25707] - Fix fleet-server.yml spec to not overwrite existing keys {pull}25741[25741] - Agent sends wrong log level to Endpoint {issue}25583[25583] +- Fix startup with failing configuration {pull}26057[26057] - Change timestamp in elatic-agent-json.log to use UTC {issue}25391[25391] ==== New features diff --git a/x-pack/elastic-agent/pkg/agent/operation/operation_config.go b/x-pack/elastic-agent/pkg/agent/operation/operation_config.go index b8c56257aca..00c365a6166 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operation_config.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operation_config.go @@ -10,6 +10,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/process" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/state" ) @@ -53,7 +54,13 @@ func (o *operationConfig) Check(_ context.Context, _ Application) (bool, error) func (o *operationConfig) Run(ctx context.Context, application Application) (err error) { defer func() { if err != nil { - application.SetState(state.Failed, err.Error(), nil) + // application failed to apply config but is running. + s := state.Degraded + if errors.Is(err, process.ErrAppNotRunning) { + s = state.Failed + } + + application.SetState(s, err.Error(), nil) } }() return application.Configure(ctx, o.cfg) diff --git a/x-pack/elastic-agent/pkg/core/plugin/process/app.go b/x-pack/elastic-agent/pkg/core/plugin/process/app.go index c0c6341cbd3..635d4e370d5 100644 --- a/x-pack/elastic-agent/pkg/core/plugin/process/app.go +++ b/x-pack/elastic-agent/pkg/core/plugin/process/app.go @@ -30,6 +30,7 @@ import ( var ( // ErrAppNotRunning is returned when configuration is performed on not running application. ErrAppNotRunning = errors.New("application is not running", errors.TypeApplication) + procExitTimeout = 10 * time.Second ) // Application encapsulates a concrete application ran by elastic-agent e.g Beat. @@ -47,6 +48,7 @@ type Application struct { tag app.Taggable state state.State reporter state.Reporter + watchClosers map[int]context.CancelFunc uid int gid int @@ -105,6 +107,7 @@ func NewApplication( uid: uid, gid: gid, statusReporter: statusController.RegisterApp(id, appName), + watchClosers: make(map[int]context.CancelFunc), }, nil } @@ -159,6 +162,8 @@ func (a *Application) Stop() { a.srvState = nil if a.state.ProcessInfo != nil { + // stop and clean watcher + a.stopWatcher(a.state.ProcessInfo) if err := a.state.ProcessInfo.Process.Signal(stopSig); err == nil { // no error on signal, so wait for it to stop _, _ = a.state.ProcessInfo.Process.Wait() @@ -192,33 +197,52 @@ func (a *Application) watch(ctx context.Context, p app.Taggable, proc *process.I case ps := <-a.waitProc(proc.Process): procState = ps case <-a.bgContext.Done(): - a.Stop() + return + case <-ctx.Done(): + // closer called return } a.appLock.Lock() + defer a.appLock.Unlock() if a.state.ProcessInfo != proc { // already another process started, another watcher is watching instead - a.appLock.Unlock() + gracefulKill(proc) + return + } + + // stop the watcher + a.stopWatcher(a.state.ProcessInfo) + + // was already stopped by Stop, do not restart + if a.state.Status == state.Stopped { return } + a.state.ProcessInfo = nil srvState := a.srvState if srvState == nil || srvState.Expected() == proto.StateExpected_STOPPING { - a.appLock.Unlock() return } msg := fmt.Sprintf("exited with code: %d", procState.ExitCode()) - a.setState(state.Crashed, msg, nil) + a.setState(state.Restarting, msg, nil) // it was a crash - a.start(ctx, p, cfg) - a.appLock.Unlock() + a.start(ctx, p, cfg, true) }() } +func (a *Application) stopWatcher(procInfo *process.Info) { + if procInfo != nil { + if closer, ok := a.watchClosers[procInfo.PID]; ok { + closer() + delete(a.watchClosers, procInfo.PID) + } + } +} + func (a *Application) waitProc(proc *os.Process) <-chan *os.ProcessState { resChan := make(chan *os.ProcessState) @@ -250,3 +274,31 @@ func (a *Application) setState(s state.Status, msg string, payload map[string]in func (a *Application) cleanUp() { a.monitor.Cleanup(a.desc.Spec(), a.pipelineID) } + +func gracefulKill(proc *process.Info) { + if proc == nil || proc.Process == nil { + return + } + + // send stop signal to request stop + proc.Stop() + + var wg sync.WaitGroup + doneChan := make(chan struct{}) + wg.Add(1) + go func() { + wg.Done() + _, _ = proc.Process.Wait() + close(doneChan) + }() + + // wait for awaiter + wg.Wait() + + // kill in case it's still running after timeout + select { + case <-doneChan: + case <-time.After(procExitTimeout): + _ = proc.Process.Kill() + } +} diff --git a/x-pack/elastic-agent/pkg/core/plugin/process/start.go b/x-pack/elastic-agent/pkg/core/plugin/process/start.go index f87c439c011..60792649da0 100644 --- a/x-pack/elastic-agent/pkg/core/plugin/process/start.go +++ b/x-pack/elastic-agent/pkg/core/plugin/process/start.go @@ -26,11 +26,11 @@ func (a *Application) Start(ctx context.Context, t app.Taggable, cfg map[string] a.appLock.Lock() defer a.appLock.Unlock() - return a.start(ctx, t, cfg) + return a.start(ctx, t, cfg, false) } // Start starts the application without grabbing the lock. -func (a *Application) start(ctx context.Context, t app.Taggable, cfg map[string]interface{}) (err error) { +func (a *Application) start(ctx context.Context, t app.Taggable, cfg map[string]interface{}, isRestart bool) (err error) { defer func() { if err != nil { // inject App metadata @@ -38,8 +38,25 @@ func (a *Application) start(ctx context.Context, t app.Taggable, cfg map[string] } }() - // already started if not stopped or crashed - if a.Started() { + // starting only if it's not running + // or if it is, then only in case it's restart and this call initiates from restart call + if a.Started() && a.state.Status != state.Restarting { + if a.state.ProcessInfo == nil { + // already started if not stopped or crashed + return nil + } + + // in case app reported status it might still be running and failure timer + // in progress. Stop timer and stop failing process + a.stopFailedTimer() + a.stopWatcher(a.state.ProcessInfo) + + // kill the process + _ = a.state.ProcessInfo.Process.Kill() + a.state.ProcessInfo = nil + } + + if a.state.Status == state.Restarting && !isRestart { return nil } @@ -69,7 +86,8 @@ func (a *Application) start(ctx context.Context, t app.Taggable, cfg map[string] if a.state.Status != state.Stopped { // restarting as it was previously in a different state a.setState(state.Restarting, "Restarting", nil) - } else { + } else if a.state.Status != state.Restarting { + // keep restarting state otherwise it's starting a.setState(state.Starting, "Starting", nil) } @@ -116,12 +134,15 @@ func (a *Application) start(ctx context.Context, t app.Taggable, cfg map[string] if err != nil { return err } - // write connect info to stdin go a.writeToStdin(a.srvState, a.state.ProcessInfo.Stdin) + // create closer for watcher, used to terminate watcher without + // side effect of restarting process during shutdown + cancelCtx, cancel := context.WithCancel(ctx) + a.watchClosers[a.state.ProcessInfo.PID] = cancel // setup watcher - a.watch(ctx, t, a.state.ProcessInfo, cfg) + a.watch(cancelCtx, t, a.state.ProcessInfo, cfg) return nil } diff --git a/x-pack/elastic-agent/pkg/core/plugin/process/status.go b/x-pack/elastic-agent/pkg/core/plugin/process/status.go index 02c38d6b82d..c335b3b6446 100644 --- a/x-pack/elastic-agent/pkg/core/plugin/process/status.go +++ b/x-pack/elastic-agent/pkg/core/plugin/process/status.go @@ -13,6 +13,7 @@ import ( "github.com/elastic/elastic-agent-client/v7/pkg/proto" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/process" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/state" ) @@ -42,7 +43,8 @@ func (a *Application) OnStatusChange(s *server.ApplicationState, status proto.St _ = yaml.Unmarshal([]byte(s.Config()), &cfg) // start the failed timer - a.startFailedTimer(cfg) + // pass process info to avoid killing new process spun up in a meantime + a.startFailedTimer(cfg, a.state.ProcessInfo) } else { a.stopFailedTimer() } @@ -51,7 +53,7 @@ func (a *Application) OnStatusChange(s *server.ApplicationState, status proto.St // startFailedTimer starts a timer that will restart the application if it doesn't exit failed after a period of time. // // This does not grab the appLock, that must be managed by the caller. -func (a *Application) startFailedTimer(cfg map[string]interface{}) { +func (a *Application) startFailedTimer(cfg map[string]interface{}, proc *process.Info) { if a.restartCanceller != nil { // already have running failed timer; just update config a.restartConfig = cfg @@ -74,7 +76,7 @@ func (a *Application) startFailedTimer(cfg map[string]interface{}) { case <-ctx.Done(): return case <-t.C: - a.restart() + a.restart(proc) } }() } @@ -91,19 +93,31 @@ func (a *Application) stopFailedTimer() { } // restart restarts the application -func (a *Application) restart() { +func (a *Application) restart(proc *process.Info) { a.appLock.Lock() defer a.appLock.Unlock() + // stop the watcher + a.stopWatcher(proc) + // kill the process - if a.state.ProcessInfo != nil { - _ = a.state.ProcessInfo.Process.Kill() - a.state.ProcessInfo = nil + if proc != nil && proc.Process != nil { + _ = proc.Process.Kill() + } + + if proc != a.state.ProcessInfo { + // we're restarting different process than actually running + // no need to start another one + return } + + a.state.ProcessInfo = nil + ctx := a.startContext tag := a.tag - err := a.start(ctx, tag, a.restartConfig) + a.setState(state.Restarting, "", nil) + err := a.start(ctx, tag, a.restartConfig, true) if err != nil { a.setState(state.Crashed, fmt.Sprintf("failed to restart: %s", err), nil) }