Revert "Fix startup with failing configuration (elastic#26126)"
This reverts commit 5a294a4.
michalpristas committed Jun 21, 2021
1 parent ee3adfe commit b895e44
Showing 5 changed files with 22 additions and 117 deletions.
1 change: 0 additions & 1 deletion x-pack/elastic-agent/CHANGELOG.next.asciidoc
@@ -68,7 +68,6 @@
- Handle case where policy doesn't contain Fleet connection information {pull}25707[25707]
- Fix fleet-server.yml spec to not overwrite existing keys {pull}25741[25741]
- Agent sends wrong log level to Endpoint {issue}25583[25583]
- Fix startup with failing configuration {pull}26057[26057]
- Change timestamp in elatic-agent-json.log to use UTC {issue}25391[25391]

==== New features
9 changes: 1 addition & 8 deletions x-pack/elastic-agent/pkg/agent/operation/operation_config.go
@@ -10,7 +10,6 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/process"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/state"
)

@@ -54,13 +53,7 @@ func (o *operationConfig) Check(_ context.Context, _ Application) (bool, error)
func (o *operationConfig) Run(ctx context.Context, application Application) (err error) {
defer func() {
if err != nil {
// application failed to apply config but is running.
s := state.Degraded
if errors.Is(err, process.ErrAppNotRunning) {
s = state.Failed
}

application.SetState(s, err.Error(), nil)
application.SetState(state.Failed, err.Error(), nil)
}
}()
return application.Configure(ctx, o.cfg)
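For context on the behaviour this file's change reverts: the removed lines kept a running application Degraded when a configuration failed to apply, reserving Failed for the sentinel "application is not running" error; after the revert every Configure error reports Failed. A minimal sketch of that sentinel-error check, using illustrative names rather than the agent's real state and error types:

package main

import (
	"errors"
	"fmt"
)

// errAppNotRunning stands in for process.ErrAppNotRunning in the real code.
var errAppNotRunning = errors.New("application is not running")

// statusForConfigError mirrors the reverted decision: a bad config on a
// still-running app is Degraded; only a missing process escalates to Failed.
func statusForConfigError(err error) string {
	if errors.Is(err, errAppNotRunning) {
		return "Failed"
	}
	return "Degraded"
}

func main() {
	fmt.Println(statusForConfigError(errors.New("invalid setting"))) // Degraded
	fmt.Println(statusForConfigError(errAppNotRunning))              // Failed
}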
64 changes: 6 additions & 58 deletions x-pack/elastic-agent/pkg/core/plugin/process/app.go
@@ -30,7 +30,6 @@ import (
var (
// ErrAppNotRunning is returned when configuration is performed on not running application.
ErrAppNotRunning = errors.New("application is not running", errors.TypeApplication)
procExitTimeout = 10 * time.Second
)

// Application encapsulates a concrete application ran by elastic-agent e.g Beat.
@@ -48,7 +47,6 @@ type Application struct {
tag app.Taggable
state state.State
reporter state.Reporter
watchClosers map[int]context.CancelFunc

uid int
gid int
@@ -107,7 +105,6 @@ func NewApplication(
uid: uid,
gid: gid,
statusReporter: statusController.RegisterApp(id, appName),
watchClosers: make(map[int]context.CancelFunc),
}, nil
}

@@ -162,8 +159,6 @@ func (a *Application) Stop() {

a.srvState = nil
if a.state.ProcessInfo != nil {
// stop and clean watcher
a.stopWatcher(a.state.ProcessInfo)
if err := a.state.ProcessInfo.Process.Signal(stopSig); err == nil {
// no error on signal, so wait for it to stop
_, _ = a.state.ProcessInfo.Process.Wait()
@@ -197,52 +192,33 @@ func (a *Application) watch(ctx context.Context, p app.Taggable, proc *process.I
case ps := <-a.waitProc(proc.Process):
procState = ps
case <-a.bgContext.Done():
return
case <-ctx.Done():
// closer called
a.Stop()
return
}

a.appLock.Lock()
defer a.appLock.Unlock()
if a.state.ProcessInfo != proc {
// already another process started, another watcher is watching instead
gracefulKill(proc)
return
}

// stop the watcher
a.stopWatcher(a.state.ProcessInfo)

// was already stopped by Stop, do not restart
if a.state.Status == state.Stopped {
a.appLock.Unlock()
return
}

a.state.ProcessInfo = nil
srvState := a.srvState

if srvState == nil || srvState.Expected() == proto.StateExpected_STOPPING {
a.appLock.Unlock()
return
}

msg := fmt.Sprintf("exited with code: %d", procState.ExitCode())
a.setState(state.Restarting, msg, nil)
a.setState(state.Crashed, msg, nil)

// it was a crash
a.start(ctx, p, cfg, true)
a.start(ctx, p, cfg)
a.appLock.Unlock()
}()
}

func (a *Application) stopWatcher(procInfo *process.Info) {
if procInfo != nil {
if closer, ok := a.watchClosers[procInfo.PID]; ok {
closer()
delete(a.watchClosers, procInfo.PID)
}
}
}

func (a *Application) waitProc(proc *os.Process) <-chan *os.ProcessState {
resChan := make(chan *os.ProcessState)

@@ -274,31 +250,3 @@ func (a *Application) setState(s state.Status, msg string, payload map[string]in
func (a *Application) cleanUp() {
a.monitor.Cleanup(a.desc.Spec(), a.pipelineID)
}

func gracefulKill(proc *process.Info) {
if proc == nil || proc.Process == nil {
return
}

// send stop signal to request stop
proc.Stop()

var wg sync.WaitGroup
doneChan := make(chan struct{})
wg.Add(1)
go func() {
wg.Done()
_, _ = proc.Process.Wait()
close(doneChan)
}()

// wait for awaiter
wg.Wait()

// kill in case it's still running after timeout
select {
case <-doneChan:
case <-time.After(procExitTimeout):
_ = proc.Process.Kill()
}
}
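The gracefulKill helper deleted above implemented a stop, wait-with-timeout, then hard-kill sequence. A rough, self-contained sketch of that pattern, assuming the process is a child of the caller and using illustrative names (requestStop stands in for the real proc.Stop()):

package procsketch

import (
	"os"
	"time"
)

// stopGracefully asks a child process to stop, waits up to timeout for it to
// exit, and falls back to a hard kill if it is still running afterwards.
func stopGracefully(proc *os.Process, requestStop func(), timeout time.Duration) {
	if proc == nil {
		return
	}

	requestStop() // e.g. send the platform's stop signal

	done := make(chan struct{})
	go func() {
		_, _ = proc.Wait() // reap the child; the exit status is not needed here
		close(done)
	}()

	select {
	case <-done:
		// the process exited within the grace period
	case <-time.After(timeout):
		_ = proc.Kill() // still running: force termination
	}
}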
35 changes: 7 additions & 28 deletions x-pack/elastic-agent/pkg/core/plugin/process/start.go
@@ -26,37 +26,20 @@ func (a *Application) Start(ctx context.Context, t app.Taggable, cfg map[string]
a.appLock.Lock()
defer a.appLock.Unlock()

return a.start(ctx, t, cfg, false)
return a.start(ctx, t, cfg)
}

// Start starts the application without grabbing the lock.
func (a *Application) start(ctx context.Context, t app.Taggable, cfg map[string]interface{}, isRestart bool) (err error) {
func (a *Application) start(ctx context.Context, t app.Taggable, cfg map[string]interface{}) (err error) {
defer func() {
if err != nil {
// inject App metadata
err = errors.New(err, errors.M(errors.MetaKeyAppName, a.name), errors.M(errors.MetaKeyAppName, a.id))
}
}()

// starting only if it's not running
// or if it is, then only in case it's restart and this call initiates from restart call
if a.Started() && a.state.Status != state.Restarting {
if a.state.ProcessInfo == nil {
// already started if not stopped or crashed
return nil
}

// in case app reported status it might still be running and failure timer
// in progress. Stop timer and stop failing process
a.stopFailedTimer()
a.stopWatcher(a.state.ProcessInfo)

// kill the process
_ = a.state.ProcessInfo.Process.Kill()
a.state.ProcessInfo = nil
}

if a.state.Status == state.Restarting && !isRestart {
// already started if not stopped or crashed
if a.Started() {
return nil
}

@@ -86,8 +69,7 @@ func (a *Application) start(ctx context.Context, t app.Taggable, cfg map[string]
if a.state.Status != state.Stopped {
// restarting as it was previously in a different state
a.setState(state.Restarting, "Restarting", nil)
} else if a.state.Status != state.Restarting {
// keep restarting state otherwise it's starting
} else {
a.setState(state.Starting, "Starting", nil)
}

@@ -134,15 +116,12 @@ func (a *Application) start(ctx context.Context, t app.Taggable, cfg map[string]
if err != nil {
return err
}

// write connect info to stdin
go a.writeToStdin(a.srvState, a.state.ProcessInfo.Stdin)

// create closer for watcher, used to terminate watcher without
// side effect of restarting process during shutdown
cancelCtx, cancel := context.WithCancel(ctx)
a.watchClosers[a.state.ProcessInfo.PID] = cancel
// setup watcher
a.watch(cancelCtx, t, a.state.ProcessInfo, cfg)
a.watch(ctx, t, a.state.ProcessInfo, cfg)

return nil
}
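The start.go change also drops the per-process watcher cancellation (a context.WithCancel stored per PID) that let shutdown stop a watcher goroutine without the side effect of restarting the process. A minimal sketch of that registry pattern with illustrative names; it is not the agent's actual Application type:

package watchsketch

import (
	"context"
	"sync"
)

// watcherRegistry sketches the watchClosers map removed by this revert:
// one cancel function per PID, so a watcher goroutine can be shut down
// without restarting the process it watches.
type watcherRegistry struct {
	mu      sync.Mutex
	closers map[int]context.CancelFunc
}

func newWatcherRegistry() *watcherRegistry {
	return &watcherRegistry{closers: make(map[int]context.CancelFunc)}
}

// watch waits for the process to exit (signalled on exited) or for the
// watcher itself to be cancelled; only a real exit triggers onCrash.
func (r *watcherRegistry) watch(ctx context.Context, pid int, exited <-chan struct{}, onCrash func()) {
	ctx, cancel := context.WithCancel(ctx)

	r.mu.Lock()
	r.closers[pid] = cancel
	r.mu.Unlock()

	go func() {
		select {
		case <-exited:
			onCrash() // unexpected exit: report it and restart
		case <-ctx.Done():
			// closer called during shutdown or before a new process starts:
			// exit quietly, no restart
		}
	}()
}

// stopWatcher cancels and forgets the watcher for pid, if one is registered.
func (r *watcherRegistry) stopWatcher(pid int) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if cancel, ok := r.closers[pid]; ok {
		cancel()
		delete(r.closers, pid)
	}
}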
30 changes: 8 additions & 22 deletions x-pack/elastic-agent/pkg/core/plugin/process/status.go
@@ -13,7 +13,6 @@ import (

"github.com/elastic/elastic-agent-client/v7/pkg/proto"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/process"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/state"
)
@@ -43,8 +42,7 @@ func (a *Application) OnStatusChange(s *server.ApplicationState, status proto.St
_ = yaml.Unmarshal([]byte(s.Config()), &cfg)

// start the failed timer
// pass process info to avoid killing new process spun up in a meantime
a.startFailedTimer(cfg, a.state.ProcessInfo)
a.startFailedTimer(cfg)
} else {
a.stopFailedTimer()
}
@@ -53,7 +51,7 @@
// startFailedTimer starts a timer that will restart the application if it doesn't exit failed after a period of time.
//
// This does not grab the appLock, that must be managed by the caller.
func (a *Application) startFailedTimer(cfg map[string]interface{}, proc *process.Info) {
func (a *Application) startFailedTimer(cfg map[string]interface{}) {
if a.restartCanceller != nil {
// already have running failed timer; just update config
a.restartConfig = cfg
@@ -76,7 +74,7 @@
case <-ctx.Done():
return
case <-t.C:
a.restart(proc)
a.restart()
}
}()
}
@@ -93,31 +91,19 @@ func (a *Application) stopFailedTimer() {
}

// restart restarts the application
func (a *Application) restart(proc *process.Info) {
func (a *Application) restart() {
a.appLock.Lock()
defer a.appLock.Unlock()

// stop the watcher
a.stopWatcher(proc)

// kill the process
if proc != nil && proc.Process != nil {
_ = proc.Process.Kill()
}

if proc != a.state.ProcessInfo {
// we're restarting different process than actually running
// no need to start another one
return
if a.state.ProcessInfo != nil {
_ = a.state.ProcessInfo.Process.Kill()
a.state.ProcessInfo = nil
}

a.state.ProcessInfo = nil

ctx := a.startContext
tag := a.tag

a.setState(state.Restarting, "", nil)
err := a.start(ctx, tag, a.restartConfig, true)
err := a.start(ctx, tag, a.restartConfig)
if err != nil {
a.setState(state.Crashed, fmt.Sprintf("failed to restart: %s", err), nil)
}
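status.go returns to the simpler failed-status timer: when the application reports a failed status over the control protocol a timer is armed, and if it fires before a healthy report arrives the process is killed and restarted. A rough sketch of that arrangement, with illustrative names and without the application lock the real code holds:

package statussketch

import (
	"context"
	"time"
)

// failedTimer restarts an application that stays failed for too long.
type failedTimer struct {
	cancel context.CancelFunc
}

// start arms the timer; restart runs if grace elapses before stop is called.
// The real code also keeps the latest config to hand to the restart.
func (f *failedTimer) start(grace time.Duration, restart func()) {
	if f.cancel != nil {
		return // already armed for an earlier failed report
	}

	ctx, cancel := context.WithCancel(context.Background())
	f.cancel = cancel

	t := time.NewTimer(grace)
	go func() {
		defer t.Stop()
		select {
		case <-ctx.Done():
			// the application reported healthy again before the deadline
		case <-t.C:
			restart() // still failed: kill the process and start it again
		}
	}()
}

// stop disarms the timer, e.g. when a healthy status report arrives.
func (f *failedTimer) stop() {
	if f.cancel != nil {
		f.cancel()
		f.cancel = nil
	}
}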
