Skip to content

Commit

Permalink
Fix startup with failing configuration (#26126)
Browse files Browse the repository at this point in the history
Fix startup with failing configuration (#26126)
  • Loading branch information
michalpristas committed Jun 8, 2021
1 parent f71a0d2 commit 5a294a4
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 22 deletions.
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
- Handle case where policy doesn't contain Fleet connection information {pull}25707[25707]
- Fix fleet-server.yml spec to not overwrite existing keys {pull}25741[25741]
- Agent sends wrong log level to Endpoint {issue}25583[25583]
- Fix startup with failing configuration {pull}26057[26057]
- Change timestamp in elatic-agent-json.log to use UTC {issue}25391[25391]

==== New features
Expand Down
9 changes: 8 additions & 1 deletion x-pack/elastic-agent/pkg/agent/operation/operation_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/process"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/state"
)

Expand Down Expand Up @@ -53,7 +54,13 @@ func (o *operationConfig) Check(_ context.Context, _ Application) (bool, error)
func (o *operationConfig) Run(ctx context.Context, application Application) (err error) {
defer func() {
if err != nil {
application.SetState(state.Failed, err.Error(), nil)
// application failed to apply config but is running.
s := state.Degraded
if errors.Is(err, process.ErrAppNotRunning) {
s = state.Failed
}

application.SetState(s, err.Error(), nil)
}
}()
return application.Configure(ctx, o.cfg)
Expand Down
64 changes: 58 additions & 6 deletions x-pack/elastic-agent/pkg/core/plugin/process/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
var (
// ErrAppNotRunning is returned when configuration is performed on not running application.
ErrAppNotRunning = errors.New("application is not running", errors.TypeApplication)
procExitTimeout = 10 * time.Second
)

// Application encapsulates a concrete application ran by elastic-agent e.g Beat.
Expand All @@ -47,6 +48,7 @@ type Application struct {
tag app.Taggable
state state.State
reporter state.Reporter
watchClosers map[int]context.CancelFunc

uid int
gid int
Expand Down Expand Up @@ -105,6 +107,7 @@ func NewApplication(
uid: uid,
gid: gid,
statusReporter: statusController.RegisterApp(id, appName),
watchClosers: make(map[int]context.CancelFunc),
}, nil
}

Expand Down Expand Up @@ -159,6 +162,8 @@ func (a *Application) Stop() {

a.srvState = nil
if a.state.ProcessInfo != nil {
// stop and clean watcher
a.stopWatcher(a.state.ProcessInfo)
if err := a.state.ProcessInfo.Process.Signal(stopSig); err == nil {
// no error on signal, so wait for it to stop
_, _ = a.state.ProcessInfo.Process.Wait()
Expand Down Expand Up @@ -192,33 +197,52 @@ func (a *Application) watch(ctx context.Context, p app.Taggable, proc *process.I
case ps := <-a.waitProc(proc.Process):
procState = ps
case <-a.bgContext.Done():
a.Stop()
return
case <-ctx.Done():
// closer called
return
}

a.appLock.Lock()
defer a.appLock.Unlock()
if a.state.ProcessInfo != proc {
// already another process started, another watcher is watching instead
a.appLock.Unlock()
gracefulKill(proc)
return
}

// stop the watcher
a.stopWatcher(a.state.ProcessInfo)

// was already stopped by Stop, do not restart
if a.state.Status == state.Stopped {
return
}

a.state.ProcessInfo = nil
srvState := a.srvState

if srvState == nil || srvState.Expected() == proto.StateExpected_STOPPING {
a.appLock.Unlock()
return
}

msg := fmt.Sprintf("exited with code: %d", procState.ExitCode())
a.setState(state.Crashed, msg, nil)
a.setState(state.Restarting, msg, nil)

// it was a crash
a.start(ctx, p, cfg)
a.appLock.Unlock()
a.start(ctx, p, cfg, true)
}()
}

func (a *Application) stopWatcher(procInfo *process.Info) {
if procInfo != nil {
if closer, ok := a.watchClosers[procInfo.PID]; ok {
closer()
delete(a.watchClosers, procInfo.PID)
}
}
}

func (a *Application) waitProc(proc *os.Process) <-chan *os.ProcessState {
resChan := make(chan *os.ProcessState)

Expand Down Expand Up @@ -250,3 +274,31 @@ func (a *Application) setState(s state.Status, msg string, payload map[string]in
func (a *Application) cleanUp() {
a.monitor.Cleanup(a.desc.Spec(), a.pipelineID)
}

func gracefulKill(proc *process.Info) {
if proc == nil || proc.Process == nil {
return
}

// send stop signal to request stop
proc.Stop()

var wg sync.WaitGroup
doneChan := make(chan struct{})
wg.Add(1)
go func() {
wg.Done()
_, _ = proc.Process.Wait()
close(doneChan)
}()

// wait for awaiter
wg.Wait()

// kill in case it's still running after timeout
select {
case <-doneChan:
case <-time.After(procExitTimeout):
_ = proc.Process.Kill()
}
}
35 changes: 28 additions & 7 deletions x-pack/elastic-agent/pkg/core/plugin/process/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,37 @@ func (a *Application) Start(ctx context.Context, t app.Taggable, cfg map[string]
a.appLock.Lock()
defer a.appLock.Unlock()

return a.start(ctx, t, cfg)
return a.start(ctx, t, cfg, false)
}

// Start starts the application without grabbing the lock.
func (a *Application) start(ctx context.Context, t app.Taggable, cfg map[string]interface{}) (err error) {
func (a *Application) start(ctx context.Context, t app.Taggable, cfg map[string]interface{}, isRestart bool) (err error) {
defer func() {
if err != nil {
// inject App metadata
err = errors.New(err, errors.M(errors.MetaKeyAppName, a.name), errors.M(errors.MetaKeyAppName, a.id))
}
}()

// already started if not stopped or crashed
if a.Started() {
// starting only if it's not running
// or if it is, then only in case it's restart and this call initiates from restart call
if a.Started() && a.state.Status != state.Restarting {
if a.state.ProcessInfo == nil {
// already started if not stopped or crashed
return nil
}

// in case app reported status it might still be running and failure timer
// in progress. Stop timer and stop failing process
a.stopFailedTimer()
a.stopWatcher(a.state.ProcessInfo)

// kill the process
_ = a.state.ProcessInfo.Process.Kill()
a.state.ProcessInfo = nil
}

if a.state.Status == state.Restarting && !isRestart {
return nil
}

Expand Down Expand Up @@ -69,7 +86,8 @@ func (a *Application) start(ctx context.Context, t app.Taggable, cfg map[string]
if a.state.Status != state.Stopped {
// restarting as it was previously in a different state
a.setState(state.Restarting, "Restarting", nil)
} else {
} else if a.state.Status != state.Restarting {
// keep restarting state otherwise it's starting
a.setState(state.Starting, "Starting", nil)
}

Expand Down Expand Up @@ -116,12 +134,15 @@ func (a *Application) start(ctx context.Context, t app.Taggable, cfg map[string]
if err != nil {
return err
}

// write connect info to stdin
go a.writeToStdin(a.srvState, a.state.ProcessInfo.Stdin)

// create closer for watcher, used to terminate watcher without
// side effect of restarting process during shutdown
cancelCtx, cancel := context.WithCancel(ctx)
a.watchClosers[a.state.ProcessInfo.PID] = cancel
// setup watcher
a.watch(ctx, t, a.state.ProcessInfo, cfg)
a.watch(cancelCtx, t, a.state.ProcessInfo, cfg)

return nil
}
Expand Down
30 changes: 22 additions & 8 deletions x-pack/elastic-agent/pkg/core/plugin/process/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/elastic/elastic-agent-client/v7/pkg/proto"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/process"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/state"
)
Expand Down Expand Up @@ -42,7 +43,8 @@ func (a *Application) OnStatusChange(s *server.ApplicationState, status proto.St
_ = yaml.Unmarshal([]byte(s.Config()), &cfg)

// start the failed timer
a.startFailedTimer(cfg)
// pass process info to avoid killing new process spun up in a meantime
a.startFailedTimer(cfg, a.state.ProcessInfo)
} else {
a.stopFailedTimer()
}
Expand All @@ -51,7 +53,7 @@ func (a *Application) OnStatusChange(s *server.ApplicationState, status proto.St
// startFailedTimer starts a timer that will restart the application if it doesn't exit failed after a period of time.
//
// This does not grab the appLock, that must be managed by the caller.
func (a *Application) startFailedTimer(cfg map[string]interface{}) {
func (a *Application) startFailedTimer(cfg map[string]interface{}, proc *process.Info) {
if a.restartCanceller != nil {
// already have running failed timer; just update config
a.restartConfig = cfg
Expand All @@ -74,7 +76,7 @@ func (a *Application) startFailedTimer(cfg map[string]interface{}) {
case <-ctx.Done():
return
case <-t.C:
a.restart()
a.restart(proc)
}
}()
}
Expand All @@ -91,19 +93,31 @@ func (a *Application) stopFailedTimer() {
}

// restart restarts the application
func (a *Application) restart() {
func (a *Application) restart(proc *process.Info) {
a.appLock.Lock()
defer a.appLock.Unlock()

// stop the watcher
a.stopWatcher(proc)

// kill the process
if a.state.ProcessInfo != nil {
_ = a.state.ProcessInfo.Process.Kill()
a.state.ProcessInfo = nil
if proc != nil && proc.Process != nil {
_ = proc.Process.Kill()
}

if proc != a.state.ProcessInfo {
// we're restarting different process than actually running
// no need to start another one
return
}

a.state.ProcessInfo = nil

ctx := a.startContext
tag := a.tag

err := a.start(ctx, tag, a.restartConfig)
a.setState(state.Restarting, "", nil)
err := a.start(ctx, tag, a.restartConfig, true)
if err != nil {
a.setState(state.Crashed, fmt.Sprintf("failed to restart: %s", err), nil)
}
Expand Down

0 comments on commit 5a294a4

Please sign in to comment.