Skip to content

Commit

Permalink
[v2] Add upgrade action retry (#1219)
Browse files Browse the repository at this point in the history
* Add upgrade action retry

Add the ability for the agent to schedule and retry upgrade actions.

The fleetapi actions now define ScheduledAction and RetryableAction interfaces to eliminate the need for stub methods on all the different action types. The action queue has been changed to operate on scheduled actions. Serialization tests now ensure that the retry attribute needed by retryable actions works.

Decouple the dispatcher from the gateway. The dispatcher has an errors channel on which it reports errors for the list of actions it was sent. The gateway has an Actions method that can be used to get the list of actions from the gateway. The managed_mode config manager links these two components.

If a handler returns an error and the action is a RetryableAction, the dispatcher will attempt to schedule a retry. The dispatcher will also ack the action to fleet-server and indicate if it will be retried or has failed (or has been received normally).
For the acker, if a RetryableAction has an error and an attempt count that is greater than 0 it will be acked as retried. If it has an error and an attempt count less than 1 it will be acked as failed.

Co-authored-by: Blake Rouse <blake.rouse@elastic.co>
  • Loading branch information
michel-laterman and blakerouse authored Oct 14, 2022
1 parent 90523cc commit bfc490a
Show file tree
Hide file tree
Showing 20 changed files with 685 additions and 296 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,4 @@
- Add liveness endpoint, allow fleet-gateway component to report degraded state, add update time and messages to status output. {issue}390[390] {pull}569[569]
- Redact sensitive information on diagnostics collect command. {issue}[241] {pull}[566]
- Fix incorrectly creating a filebeat redis input when a policy contains a packetbeat redis input. {issue}[427] {pull}[700]
- Allow upgrade actions to be retried on failure with action queue scheduling. {issue}778[778] {pull}1219[1219]
12 changes: 11 additions & 1 deletion internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"gopkg.in/yaml.v2"

"github.com/elastic/elastic-agent/internal/pkg/diagnostics"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker"

"github.com/elastic/elastic-agent/internal/pkg/fleetapi"

Expand Down Expand Up @@ -51,6 +52,9 @@ type UpgradeManager interface {

// Upgrade upgrades running agent.
Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade) (_ reexec.ShutdownCallbackFn, err error)

// Ack is used on startup to check if the agent has upgraded and needs to send an ack for the action
Ack(ctx context.Context, acker acker.Acker) error
}

// Runner provides interface to run a manager and receive running errors.
Expand Down Expand Up @@ -251,10 +255,16 @@ func (c *Coordinator) Upgrade(ctx context.Context, version string, sourceURI str
c.state.overrideState = nil
return err
}
c.ReExec(cb)
if cb != nil {
c.ReExec(cb)
}
return nil
}

// AckUpgrade forwards ctx and acker to the upgrade manager's Ack method and
// returns whatever error that call produces.
// NOTE(review): the UpgradeManager interface documents Ack as a startup check
// for whether the agent has upgraded and needs to send an ack for the action;
// presumably this wrapper is meant to be called once at startup — confirm
// against the caller.
func (c *Coordinator) AckUpgrade(ctx context.Context, acker acker.Acker) error {
return c.upgradeMgr.Ack(ctx, acker)
}

// PerformAction executes an action on a unit.
func (c *Coordinator) PerformAction(ctx context.Context, unit component.Unit, name string, params map[string]interface{}) (map[string]interface{}, error) {
return c.runtimeMgr.PerformAction(ctx, unit, name, params)
Expand Down
107 changes: 91 additions & 16 deletions internal/pkg/agent/application/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,16 @@ import (
type actionHandlers map[string]actions.Handler

type priorityQueue interface {
Add(fleetapi.Action, int64)
DequeueActions() []fleetapi.Action
Add(fleetapi.ScheduledAction, int64)
DequeueActions() []fleetapi.ScheduledAction
CancelType(string) int
Save() error
}

// Dispatcher processes actions coming from fleet api.
type Dispatcher interface {
Dispatch(context.Context, acker.Acker, ...fleetapi.Action) error
Dispatch(context.Context, acker.Acker, ...fleetapi.Action)
Errors() <-chan error
}

// ActionDispatcher processes actions coming from fleet using registered set of handlers.
Expand All @@ -39,6 +41,8 @@ type ActionDispatcher struct {
handlers actionHandlers
def actions.Handler
queue priorityQueue
rt *retryConfig
errCh chan error
}

// New creates a new action dispatcher.
Expand All @@ -60,9 +64,15 @@ func New(log *logger.Logger, def actions.Handler, queue priorityQueue) (*ActionD
handlers: make(actionHandlers),
def: def,
queue: queue,
rt: defaultRetryConfig(),
errCh: make(chan error),
}, nil
}

// Errors returns the receive-only side of the dispatcher's error channel.
// Dispatch sends on this channel instead of returning an error. The channel is
// created unbuffered in New, so each send in Dispatch blocks until a receiver
// reads from the channel returned here.
func (ad *ActionDispatcher) Errors() <-chan error {
return ad.errCh
}

// Register registers a new handler for action.
func (ad *ActionDispatcher) Register(a fleetapi.Action, handler actions.Handler) error {
k := ad.key(a)
Expand All @@ -88,13 +98,18 @@ func (ad *ActionDispatcher) key(a fleetapi.Action) string {
}

// Dispatch dispatches an action using pre-registered set of handlers.
func (ad *ActionDispatcher) Dispatch(ctx context.Context, acker acker.Acker, actions ...fleetapi.Action) (err error) {
// Dispatch will handle action queue operations, and retries.
// Any action that implements the ScheduledAction interface may be added/removed from the queue based on StartTime.
// Any action that implements the RetryableAction interface will be rescheduled if the handler returns an error.
func (ad *ActionDispatcher) Dispatch(ctx context.Context, acker acker.Acker, actions ...fleetapi.Action) {
var err error
span, ctx := apm.StartSpan(ctx, "dispatch", "app.internal")
defer func() {
apm.CaptureError(ctx, err).Send()
span.End()
}()

ad.removeQueuedUpgrades(actions)
actions = ad.queueScheduledActions(actions)
actions = ad.dispatchCancelActions(ctx, actions, acker)
queued, expired := ad.gatherQueuedActions(time.Now().UTC())
Expand All @@ -108,7 +123,7 @@ func (ad *ActionDispatcher) Dispatch(ctx context.Context, acker acker.Acker, act

if len(actions) == 0 {
ad.log.Debug("No action to dispatch")
return nil
return
}

ad.log.Debugf(
Expand All @@ -118,18 +133,28 @@ func (ad *ActionDispatcher) Dispatch(ctx context.Context, acker acker.Acker, act
)

for _, action := range actions {
if err := ctx.Err(); err != nil {
return err
if err = ctx.Err(); err != nil {
ad.errCh <- err
return
}

if err := ad.dispatchAction(ctx, action, acker); err != nil {
rAction, ok := action.(fleetapi.RetryableAction)
if ok {
rAction.SetError(err) // set the retryable action error to what the dispatcher returned
ad.scheduleRetry(ctx, rAction, acker)
continue
}
ad.log.Debugf("Failed to dispatch action '%+v', error: %+v", action, err)
return err
ad.errCh <- err
continue
}
ad.log.Debugf("Successfully dispatched action: '%+v'", action)
}

return acker.Commit(ctx)
if err = acker.Commit(ctx); err != nil {
ad.errCh <- err
}
}

func (ad *ActionDispatcher) dispatchAction(ctx context.Context, a fleetapi.Action, acker acker.Acker) error {
Expand All @@ -154,15 +179,18 @@ func detectTypes(actions []fleetapi.Action) []string {
func (ad *ActionDispatcher) queueScheduledActions(input []fleetapi.Action) []fleetapi.Action {
actions := make([]fleetapi.Action, 0, len(input))
for _, action := range input {
start, err := action.StartTime()
if err == nil {
ad.log.Debugf("Adding action id: %s to queue.", action.ID())
ad.queue.Add(action, start.Unix())
sAction, ok := action.(fleetapi.ScheduledAction)
if ok {
start, err := sAction.StartTime()
if err != nil {
ad.log.Warnf("Skipping addition to action-queue, issue gathering start time from action id %s: %v", sAction.ID(), err)
actions = append(actions, action)
continue
}
ad.log.Debugf("Adding action id: %s to queue.", sAction.ID())
ad.queue.Add(sAction, start.Unix())
continue
}
if !errors.Is(err, fleetapi.ErrNoStartTime) {
ad.log.Warnf("Issue gathering start time from action id %s: %v", action.ID(), err)
}
actions = append(actions, action)
}
return actions
Expand Down Expand Up @@ -197,3 +225,50 @@ func (ad *ActionDispatcher) gatherQueuedActions(ts time.Time) (queued, expired [
}
return queued, expired
}

// removeQueuedUpgrades cancels every upgrade action held in the queue when the
// passed actions contain at least one upgrade action. The passed list itself
// is never altered.
// The intent is that only the most recent upgrade action gets executed. Note
// that duplicate upgrade actions retrieved directly from the gateway are not
// eliminated by this method.
func (ad *ActionDispatcher) removeQueuedUpgrades(actions []fleetapi.Action) {
	// Look for the first upgrade action; one is enough to trigger the cleanup.
	hasUpgrade := false
	for i := range actions {
		if actions[i].Type() == fleetapi.ActionTypeUpgrade {
			hasUpgrade = true
			break
		}
	}
	if !hasUpgrade {
		return
	}

	removed := ad.queue.CancelType(fleetapi.ActionTypeUpgrade)
	if removed <= 0 {
		// Nothing was queued, so there is nothing worth logging.
		return
	}
	ad.log.Debugw("New upgrade action retrieved from gateway, removing queued upgrade actions", "actions_found", removed)
}

// scheduleRetry re-queues a retryable action and acks the outcome through
// acker. Dispatch calls it after the action's handler returned an error.
//
// The wait before the next attempt comes from the dispatcher's retry
// configuration, looked up with the action's current attempt count.
//   - If the lookup returns an error, no retry is scheduled: the attempt count
//     is set to -1 and the action is acked and committed.
//   - Otherwise the attempt count is incremented, the start time is set to
//     now (UTC) plus the wait, the action is added to the queue, the queue is
//     saved, and the action is acked and committed.
//
// Nothing is returned: failures to save the queue, ack, or commit are only
// logged. A failed save does not prevent the ack.
func (ad *ActionDispatcher) scheduleRetry(ctx context.Context, action fleetapi.RetryableAction, acker acker.Acker) {
	attempt := action.RetryAttempt()
	wait, err := ad.rt.GetWait(attempt)
	if err != nil {
		ad.log.Errorf("No more retries for action id %s: %v", action.ID(), err)
		// NOTE(review): -1 presumably makes the acker report the action as
		// failed rather than retried (attempt count below 1) — confirm in the acker.
		action.SetRetryAttempt(-1)
		ad.ackRetryOutcome(ctx, action, acker, "failure")
		return
	}

	attempt++
	startTime := time.Now().UTC().Add(wait)
	action.SetRetryAttempt(attempt)
	action.SetStartTime(startTime)
	ad.log.Debugf("Adding action id: %s to queue.", action.ID())
	ad.queue.Add(action, startTime.Unix())
	if err := ad.queue.Save(); err != nil {
		ad.log.Errorf("retry action id %s attempt %d failed to persist action_queue: %v", action.ID(), attempt, err)
	}
	ad.ackRetryOutcome(ctx, action, acker, "retry")
}

// ackRetryOutcome acks action and then commits the acker, logging (not
// returning) any error. outcome is the word used in the log message
// ("failure" or "retry"). Commit is skipped when Ack fails.
func (ad *ActionDispatcher) ackRetryOutcome(ctx context.Context, action fleetapi.RetryableAction, ack acker.Acker, outcome string) {
	if err := ack.Ack(ctx, action); err != nil {
		ad.log.Errorf("Unable to ack action %s (id %s) to fleet-server: %v", outcome, action.ID(), err)
		return
	}
	if err := ack.Commit(ctx); err != nil {
		ad.log.Errorf("Unable to commit action %s (id %s) to fleet-server: %v", outcome, action.ID(), err)
	}
}
Loading

0 comments on commit bfc490a

Please sign in to comment.