diff --git a/x-pack/elastic-agent/CHANGELOG.next.asciidoc b/x-pack/elastic-agent/CHANGELOG.next.asciidoc
index fd1556ca0ce..cdbd914759b 100644
--- a/x-pack/elastic-agent/CHANGELOG.next.asciidoc
+++ b/x-pack/elastic-agent/CHANGELOG.next.asciidoc
@@ -68,7 +68,6 @@
 - Handle case where policy doesn't contain Fleet connection information {pull}25707[25707]
 - Fix fleet-server.yml spec to not overwrite existing keys {pull}25741[25741]
 - Agent sends wrong log level to Endpoint {issue}25583[25583]
-- Fix startup with failing configuration {pull}26057[26057]
 - Change timestamp in elatic-agent-json.log to use UTC {issue}25391[25391]
 
 ==== New features
diff --git a/x-pack/elastic-agent/pkg/agent/operation/operation_config.go b/x-pack/elastic-agent/pkg/agent/operation/operation_config.go
index 00c365a6166..b8c56257aca 100644
--- a/x-pack/elastic-agent/pkg/agent/operation/operation_config.go
+++ b/x-pack/elastic-agent/pkg/agent/operation/operation_config.go
@@ -10,7 +10,6 @@ import (
 	"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration"
 	"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
 	"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
-	"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/process"
 	"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/state"
 )
 
@@ -54,13 +53,7 @@ func (o *operationConfig) Check(_ context.Context, _ Application) (bool, error)
 func (o *operationConfig) Run(ctx context.Context, application Application) (err error) {
 	defer func() {
 		if err != nil {
-			// application failed to apply config but is running.
-			s := state.Degraded
-			if errors.Is(err, process.ErrAppNotRunning) {
-				s = state.Failed
-			}
-
-			application.SetState(s, err.Error(), nil)
+			application.SetState(state.Failed, err.Error(), nil)
 		}
 	}()
 	return application.Configure(ctx, o.cfg)
diff --git a/x-pack/elastic-agent/pkg/core/plugin/process/app.go b/x-pack/elastic-agent/pkg/core/plugin/process/app.go
index 635d4e370d5..c0c6341cbd3 100644
--- a/x-pack/elastic-agent/pkg/core/plugin/process/app.go
+++ b/x-pack/elastic-agent/pkg/core/plugin/process/app.go
@@ -30,7 +30,6 @@ import (
 var (
 	// ErrAppNotRunning is returned when configuration is performed on not running application.
 	ErrAppNotRunning = errors.New("application is not running", errors.TypeApplication)
-	procExitTimeout  = 10 * time.Second
 )
 
 // Application encapsulates a concrete application ran by elastic-agent e.g Beat.
@@ -48,7 +47,6 @@ type Application struct {
 	tag          app.Taggable
 	state        state.State
 	reporter     state.Reporter
-	watchClosers map[int]context.CancelFunc
 
 	uid int
 	gid int
@@ -107,7 +105,6 @@ func NewApplication(
 		uid:            uid,
 		gid:            gid,
 		statusReporter: statusController.RegisterApp(id, appName),
-		watchClosers:   make(map[int]context.CancelFunc),
 	}, nil
 }
 
@@ -162,8 +159,6 @@ func (a *Application) Stop() {
 	a.srvState = nil
 
 	if a.state.ProcessInfo != nil {
-		// stop and clean watcher
-		a.stopWatcher(a.state.ProcessInfo)
 		if err := a.state.ProcessInfo.Process.Signal(stopSig); err == nil {
 			// no error on signal, so wait for it to stop
 			_, _ = a.state.ProcessInfo.Process.Wait()
@@ -197,52 +192,33 @@ func (a *Application) watch(ctx context.Context, p app.Taggable, proc *process.I
 		case ps := <-a.waitProc(proc.Process):
 			procState = ps
 		case <-a.bgContext.Done():
-			return
-		case <-ctx.Done():
-			// closer called
+			a.Stop()
 			return
 		}
 
 		a.appLock.Lock()
-		defer a.appLock.Unlock()
 		if a.state.ProcessInfo != proc {
 			// already another process started, another watcher is watching instead
-			gracefulKill(proc)
-			return
-		}
-
-		// stop the watcher
-		a.stopWatcher(a.state.ProcessInfo)
-
-		// was already stopped by Stop, do not restart
-		if a.state.Status == state.Stopped {
+			a.appLock.Unlock()
 			return
 		}
-
 		a.state.ProcessInfo = nil
 		srvState := a.srvState
 
 		if srvState == nil || srvState.Expected() == proto.StateExpected_STOPPING {
+			a.appLock.Unlock()
 			return
 		}
 
 		msg := fmt.Sprintf("exited with code: %d", procState.ExitCode())
-		a.setState(state.Restarting, msg, nil)
+		a.setState(state.Crashed, msg, nil)
 
 		// it was a crash
-		a.start(ctx, p, cfg, true)
+		a.start(ctx, p, cfg)
+		a.appLock.Unlock()
 	}()
 }
 
-func (a *Application) stopWatcher(procInfo *process.Info) {
-	if procInfo != nil {
-		if closer, ok := a.watchClosers[procInfo.PID]; ok {
-			closer()
-			delete(a.watchClosers, procInfo.PID)
-		}
-	}
-}
-
 func (a *Application) waitProc(proc *os.Process) <-chan *os.ProcessState {
 	resChan := make(chan *os.ProcessState)
 
@@ -274,31 +250,3 @@ func (a *Application) setState(s state.Status, msg string, payload map[string]in
 func (a *Application) cleanUp() {
 	a.monitor.Cleanup(a.desc.Spec(), a.pipelineID)
 }
-
-func gracefulKill(proc *process.Info) {
-	if proc == nil || proc.Process == nil {
-		return
-	}
-
-	// send stop signal to request stop
-	proc.Stop()
-
-	var wg sync.WaitGroup
-	doneChan := make(chan struct{})
-	wg.Add(1)
-	go func() {
-		wg.Done()
-		_, _ = proc.Process.Wait()
-		close(doneChan)
-	}()
-
-	// wait for awaiter
-	wg.Wait()
-
-	// kill in case it's still running after timeout
-	select {
-	case <-doneChan:
-	case <-time.After(procExitTimeout):
-		_ = proc.Process.Kill()
-	}
-}
diff --git a/x-pack/elastic-agent/pkg/core/plugin/process/start.go b/x-pack/elastic-agent/pkg/core/plugin/process/start.go
index 60792649da0..f87c439c011 100644
--- a/x-pack/elastic-agent/pkg/core/plugin/process/start.go
+++ b/x-pack/elastic-agent/pkg/core/plugin/process/start.go
@@ -26,11 +26,11 @@ func (a *Application) Start(ctx context.Context, t app.Taggable, cfg map[string]
 	a.appLock.Lock()
 	defer a.appLock.Unlock()
 
-	return a.start(ctx, t, cfg, false)
+	return a.start(ctx, t, cfg)
 }
 
 // Start starts the application without grabbing the lock.
-func (a *Application) start(ctx context.Context, t app.Taggable, cfg map[string]interface{}, isRestart bool) (err error) {
+func (a *Application) start(ctx context.Context, t app.Taggable, cfg map[string]interface{}) (err error) {
 	defer func() {
 		if err != nil {
 			// inject App metadata
@@ -38,25 +38,8 @@ func (a *Application) start(ctx context.Context, t app.Taggable, cfg map[string]
 		}
 	}()
 
-	// starting only if it's not running
-	// or if it is, then only in case it's restart and this call initiates from restart call
-	if a.Started() && a.state.Status != state.Restarting {
-		if a.state.ProcessInfo == nil {
-			// already started if not stopped or crashed
-			return nil
-		}
-
-		// in case app reported status it might still be running and failure timer
-		// in progress. Stop timer and stop failing process
-		a.stopFailedTimer()
-		a.stopWatcher(a.state.ProcessInfo)
-
-		// kill the process
-		_ = a.state.ProcessInfo.Process.Kill()
-		a.state.ProcessInfo = nil
-	}
-
-	if a.state.Status == state.Restarting && !isRestart {
+	// already started if not stopped or crashed
+	if a.Started() {
 		return nil
 	}
 
@@ -86,8 +69,7 @@ func (a *Application) start(ctx context.Context, t app.Taggable, cfg map[string]
 	if a.state.Status != state.Stopped {
 		// restarting as it was previously in a different state
 		a.setState(state.Restarting, "Restarting", nil)
-	} else if a.state.Status != state.Restarting {
-		// keep restarting state otherwise it's starting
+	} else {
 		a.setState(state.Starting, "Starting", nil)
 	}
 
@@ -134,15 +116,12 @@ func (a *Application) start(ctx context.Context, t app.Taggable, cfg map[string]
 	if err != nil {
 		return err
 	}
 
+	// write connect info to stdin
 	go a.writeToStdin(a.srvState, a.state.ProcessInfo.Stdin)
 
-	// create closer for watcher, used to terminate watcher without
-	// side effect of restarting process during shutdown
-	cancelCtx, cancel := context.WithCancel(ctx)
-	a.watchClosers[a.state.ProcessInfo.PID] = cancel
 	// setup watcher
-	a.watch(cancelCtx, t, a.state.ProcessInfo, cfg)
+	a.watch(ctx, t, a.state.ProcessInfo, cfg)
 
 	return nil
 }
diff --git a/x-pack/elastic-agent/pkg/core/plugin/process/status.go b/x-pack/elastic-agent/pkg/core/plugin/process/status.go
index c335b3b6446..02c38d6b82d 100644
--- a/x-pack/elastic-agent/pkg/core/plugin/process/status.go
+++ b/x-pack/elastic-agent/pkg/core/plugin/process/status.go
@@ -13,7 +13,6 @@ import (
 
 	"github.com/elastic/elastic-agent-client/v7/pkg/proto"
 
-	"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/process"
 	"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server"
 	"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/state"
 )
@@ -43,8 +42,7 @@ func (a *Application) OnStatusChange(s *server.ApplicationState, status proto.St
 		_ = yaml.Unmarshal([]byte(s.Config()), &cfg)
 
 		// start the failed timer
-		// pass process info to avoid killing new process spun up in a meantime
-		a.startFailedTimer(cfg, a.state.ProcessInfo)
+		a.startFailedTimer(cfg)
 	} else {
 		a.stopFailedTimer()
 	}
@@ -53,7 +51,7 @@ func (a *Application) OnStatusChange(s *server.ApplicationState, status proto.St
 // startFailedTimer starts a timer that will restart the application if it doesn't exit failed after a period of time.
 //
 // This does not grab the appLock, that must be managed by the caller.
-func (a *Application) startFailedTimer(cfg map[string]interface{}, proc *process.Info) {
+func (a *Application) startFailedTimer(cfg map[string]interface{}) {
 	if a.restartCanceller != nil {
 		// already have running failed timer; just update config
 		a.restartConfig = cfg
@@ -76,7 +74,7 @@ func (a *Application) startFailedTimer(cfg map[string]interface{}, proc *process
 		case <-ctx.Done():
 			return
 		case <-t.C:
-			a.restart(proc)
+			a.restart()
 		}
 	}()
 }
@@ -93,31 +91,19 @@ func (a *Application) stopFailedTimer() {
 }
 
 // restart restarts the application
-func (a *Application) restart(proc *process.Info) {
+func (a *Application) restart() {
 	a.appLock.Lock()
 	defer a.appLock.Unlock()
 
-	// stop the watcher
-	a.stopWatcher(proc)
-	// kill the process
-	if proc != nil && proc.Process != nil {
-		_ = proc.Process.Kill()
-	}
-
-	if proc != a.state.ProcessInfo {
-		// we're restarting different process than actually running
-		// no need to start another one
-		return
+	if a.state.ProcessInfo != nil {
+		_ = a.state.ProcessInfo.Process.Kill()
+		a.state.ProcessInfo = nil
 	}
-
-	a.state.ProcessInfo = nil
-
 	ctx := a.startContext
 	tag := a.tag
 
-	a.setState(state.Restarting, "", nil)
 
-	err := a.start(ctx, tag, a.restartConfig, true)
+	err := a.start(ctx, tag, a.restartConfig)
 	if err != nil {
 		a.setState(state.Crashed, fmt.Sprintf("failed to restart: %s", err), nil)
 	}