Delay the restart of application when a status report of failure is given #25339

Merged (5 commits, Apr 28, 2021)
Changes from 2 commits
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
@@ -56,6 +56,7 @@
- Fix issue with status and inspect inside of container {pull}25204[25204]
- Remove FLEET_SERVER_POLICY_NAME env variable as it was not used {pull}25149[25149]
- Reduce log level for listener cleanup to debug {pull}25274[25274]
+- Delay the restart of application when a status report of failure is given {pull}25339[25339]

==== New features

7 changes: 0 additions & 7 deletions x-pack/elastic-agent/pkg/agent/cmd/enroll_cmd.go
@@ -613,13 +613,6 @@ func waitForFleetServer(ctx context.Context, agentSubproc <-chan *os.ProcessStat
        }
        resChan <- waitResult{enrollmentToken: token}
        break
-    } else if app.Status == proto.Status_FAILED {
-        // app completely failed; exit now
-        if app.Message != "" {
-            log.Infof("Fleet Server - %s", app.Message)
-        }
-        resChan <- waitResult{err: errors.New(app.Message)}
-        break
    }
    if app.Message != "" {
        appMsg := fmt.Sprintf("Fleet Server - %s", app.Message)
4 changes: 3 additions & 1 deletion x-pack/elastic-agent/pkg/core/plugin/process/app.go
@@ -58,7 +58,9 @@ type Application struct {

    logger *logger.Logger

-    appLock sync.Mutex
+    appLock          sync.Mutex
+    restartCanceller context.CancelFunc
+    restartConfig    map[string]interface{}
}

// ArgsDecorator decorates arguments before calling an application
82 changes: 71 additions & 11 deletions x-pack/elastic-agent/pkg/core/plugin/process/status.go
@@ -5,7 +5,9 @@
package process

import (
+    "context"
    "fmt"
+    "time"

    "gopkg.in/yaml.v2"

@@ -15,6 +17,11 @@ import (
    "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/state"
)

+const (
+    // FailedRestartTimeout is the amount of time an Application can sit in Failed status before it is restarted.
+    FailedRestartTimeout = 10 * time.Second
+)

// OnStatusChange is the handler called by the GRPC server code.
//
// It updates the status of the application and handles restarting the application if needed.
@@ -35,21 +42,74 @@ func (a *Application) OnStatusChange(s *server.ApplicationState, status proto.St
            return
        }

-        // kill the process
-        if a.state.ProcessInfo != nil {
-            _ = a.state.ProcessInfo.Process.Kill()
-            a.state.ProcessInfo = nil
-        }
-        ctx := a.startContext
-        tag := a.tag

        // it was marshalled to pass into the state, so unmarshall will always succeed
        var cfg map[string]interface{}
        _ = yaml.Unmarshal([]byte(s.Config()), &cfg)

Review comment (Contributor): Unrelated to this PR but I don't think we should swallow the errors here.

-        err := a.start(ctx, tag, cfg)
-        if err != nil {
-            a.setState(state.Crashed, fmt.Sprintf("failed to restart: %s", err), nil)
+        // start the failed timer
+        a.startFailedTimer(cfg)
+    } else {
+        a.stopFailedTimer()
    }
}

+// startFailedTimer starts a timer that will restart the application if it doesn't exit failed after a period of time.
+//
+// This does not grab the appLock, that must be managed by the caller.
+func (a *Application) startFailedTimer(cfg map[string]interface{}) {
+    if a.restartCanceller != nil {
+        // already have running failed timer; just update config
+        a.restartConfig = cfg
+        return
+    }
+
+    ctx, cancel := context.WithCancel(a.startContext)
+    a.restartCanceller = cancel
+    a.restartConfig = cfg
+    t := time.NewTimer(FailedRestartTimeout)
+    go func() {
+        defer func() {
+            a.appLock.Lock()
+            a.restartCanceller = nil
+            a.restartConfig = nil
+            a.appLock.Unlock()
+        }()
+
+        select {
+        case <-ctx.Done():
+            return
+        case <-t.C:
+            a.restart(a.restartConfig)
+        }
+    }()
+}

+// stopFailedTimer stops the timer that would restart the application from reporting failure.
+//
+// This does not grab the appLock, that must be managed by the caller.
+func (a *Application) stopFailedTimer() {
+    if a.restartCanceller == nil {
+        return
+    }
+    a.restartCanceller()
+    a.restartCanceller = nil
+}

+// restart restarts the application
+func (a *Application) restart(cfg map[string]interface{}) {
+    a.appLock.Lock()
+    defer a.appLock.Unlock()
+
+    // kill the process
+    if a.state.ProcessInfo != nil {
+        _ = a.state.ProcessInfo.Process.Kill()
+        a.state.ProcessInfo = nil
+    }
+    ctx := a.startContext
+    tag := a.tag
+
+    err := a.start(ctx, tag, cfg)
+    if err != nil {
+        a.setState(state.Crashed, fmt.Sprintf("failed to restart: %s", err), nil)

Review comment (Contributor): Can we add here a bit more info which process (name?) failed to restart?

Reply (Contributor Author): That is there, that is managed inside of the setState.
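
If the process name ever needed to appear in the failure message itself, one purely illustrative variant (assuming the Application type exposes a Name() accessor, which this diff does not show) would be:

    a.setState(state.Crashed, fmt.Sprintf("failed to restart %s: %s", a.Name(), err), nil)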

+    }
+}
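
For readers who want to see the delayed-restart mechanism in isolation, below is a minimal, self-contained sketch of the same cancellable-timer pattern. All names are hypothetical stand-ins rather than the actual elastic-agent types, and the timeout is shortened so the example runs quickly:

    package main

    import (
        "context"
        "fmt"
        "sync"
        "time"
    )

    // failedRestartTimeout mirrors FailedRestartTimeout from the PR: how long a
    // component may stay failed before a restart is attempted.
    const failedRestartTimeout = 2 * time.Second

    // app is a stand-in for the Application type; it only tracks the cancel
    // function of a pending delayed restart.
    type app struct {
        mu               sync.Mutex
        restartCanceller context.CancelFunc
    }

    // onFailure schedules a restart after failedRestartTimeout unless the failure
    // clears first (onHealthy cancels the pending timer).
    func (a *app) onFailure() {
        a.mu.Lock()
        defer a.mu.Unlock()
        if a.restartCanceller != nil {
            return // a delayed restart is already pending
        }
        ctx, cancel := context.WithCancel(context.Background())
        a.restartCanceller = cancel
        t := time.NewTimer(failedRestartTimeout)
        go func() {
            defer func() {
                a.mu.Lock()
                a.restartCanceller = nil
                a.mu.Unlock()
            }()
            select {
            case <-ctx.Done():
                t.Stop()
                fmt.Println("failure cleared before the timeout; restart cancelled")
            case <-t.C:
                fmt.Println("still failed after the timeout; restarting now")
            }
        }()
    }

    // onHealthy cancels any pending delayed restart.
    func (a *app) onHealthy() {
        a.mu.Lock()
        defer a.mu.Unlock()
        if a.restartCanceller != nil {
            a.restartCanceller()
            a.restartCanceller = nil
        }
    }

    func main() {
        a := &app{}
        a.onFailure()               // a failure is reported...
        time.Sleep(time.Second)     // ...but it recovers before the timeout
        a.onHealthy()
        time.Sleep(2 * time.Second) // give the goroutine time to print
    }

The key design point, matching the PR, is that a repeated failure report while a timer is already pending does not re-arm the timer, and a recovery cancels it before the restart fires.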