Skip to content

Commit

Permalink
Allow upgrade action to signal retry (#1887)
Browse files Browse the repository at this point in the history
* Allow upgrade action to signal retry

Allow the ack of an upgrade action to set the upgrade status to
retrying.

* fix tests set failed state

* Fix broken test

* nil upgrade status by default

* Set agent to healthy in case of upgrade failure

* fix upgrade fields

* Fix tests
  • Loading branch information
michel-laterman authored Oct 24, 2022
1 parent f708740 commit 17d3681
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@
- Fleet Server now allows setting transaction sample rate on APM instrumentation {pull}1681[1681]
- Log redacted config when config updates. {issue}1626[1626] {pull}1668[1668]
- Storing checkin message in last_checkin_message {pull}1932[1932]
- Allow upgrade actions to signal that they will be retried. {pull}1887[1887]
53 changes: 39 additions & 14 deletions internal/pkg/api/handleAck.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,17 +265,17 @@ func (ack *AckT) handleAckEvents(ctx context.Context, zlog zerolog.Logger, agent
// The unenroll and upgrade acks might overwrite it later
setResult(n, http.StatusOK)

if ev.Error == "" {
if action.Type == TypeUnenroll {
unenrollIdxs = append(unenrollIdxs, n)
} else if action.Type == TypeUpgrade {
if err := ack.handleUpgrade(ctx, zlog, agent); err != nil {
setError(n, err)
log.Error().Err(err).Msg("handle upgrade event")
continue
}
if action.Type == TypeUpgrade {
if err := ack.handleUpgrade(ctx, zlog, agent, ev); err != nil {
setError(n, err)
log.Error().Err(err).Msg("handle upgrade event")
continue
}
}

if ev.Error == "" && action.Type == TypeUnenroll {
unenrollIdxs = append(unenrollIdxs, n)
}
}

// Process policy acks
Expand Down Expand Up @@ -503,12 +503,37 @@ func (ack *AckT) handleUnenroll(ctx context.Context, zlog zerolog.Logger, agent
return nil
}

func (ack *AckT) handleUpgrade(ctx context.Context, zlog zerolog.Logger, agent *model.Agent) error {

func (ack *AckT) handleUpgrade(ctx context.Context, zlog zerolog.Logger, agent *model.Agent, event Event) error {
now := time.Now().UTC().Format(time.RFC3339)
doc := bulk.UpdateFields{
dl.FieldUpgradeStartedAt: nil,
dl.FieldUpgradedAt: now,
doc := bulk.UpdateFields{}
if event.Error != "" {
// unmarshal event payload
var pl struct {
Retry bool `json:"retry"`
Attempt int `json:"retry_attempt"`
}
err := json.Unmarshal(event.Payload, &pl)
if err != nil {
zlog.Error().Err(err).Msg("unable to unmarshal upgrade event payload")
}

// if the payload indicates a retry, mark change the upgrade status to retrying.
if pl.Retry {
zlog.Info().Int("retry_attempt", pl.Attempt).Msg("marking agent upgrade as retrying")
doc[dl.FieldUpgradeStatus] = "retrying" // Keep FieldUpgradeStatedAt abd FieldUpgradeded at to original values
} else {
zlog.Info().Int("retry_attempt", pl.Attempt).Msg("marking agent upgrade as failed, agent logs contain failure message")
doc = bulk.UpdateFields{
dl.FieldUpgradeStartedAt: nil,
dl.FieldUpgradeStatus: "failed",
}
}
} else {
doc = bulk.UpdateFields{
dl.FieldUpgradeStartedAt: nil,
dl.FieldUpgradeStatus: nil,
dl.FieldUpgradedAt: now,
}
}

body, err := doc.Marshal()
Expand Down
136 changes: 136 additions & 0 deletions internal/pkg/api/handleAck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,61 @@ func TestHandleAckEvents(t *testing.T) {
},
err: &HTTPError{Status: http.StatusNotFound},
},
{
name: "upgrade action failed",
events: []Event{
{
ActionID: "ab12dcd8-bde0-4045-92dc-c4b27668d73a",
Type: "UPGRADE",
Error: "Error with no payload",
},
},
res: newAckResponse(false, []AckResponseItem{
{
Status: http.StatusOK,
Message: http.StatusText(http.StatusOK),
},
}),
bulker: func(t *testing.T) *ftesting.MockBulk {
m := ftesting.NewMockBulk()
m.On("Search", mock.Anything, mock.Anything, mock.MatchedBy(matchAction(t, "ab12dcd8-bde0-4045-92dc-c4b27668d73a")), mock.Anything).Return(&es.ResultT{HitsT: es.HitsT{
Hits: []es.HitT{{
Source: []byte(`{"action_id":"ab12dcd8-bde0-4045-92dc-c4b27668d73a","type":"UPGRADE"}`),
}},
}}, nil).Once()
m.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("", nil).Once()
m.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
return m
},
},
{
name: "upgrade action retrying",
events: []Event{
{
ActionID: "ab12dcd8-bde0-4045-92dc-c4b27668d73a",
Type: "UPGRADE",
Error: "Error with payload",
Payload: json.RawMessage(`{"retry":true,"retry_attempt":1}`),
},
},
res: newAckResponse(false, []AckResponseItem{
{
Status: http.StatusOK,
Message: http.StatusText(http.StatusOK),
},
}),
bulker: func(t *testing.T) *ftesting.MockBulk {
m := ftesting.NewMockBulk()
m.On("Search", mock.Anything, mock.Anything, mock.MatchedBy(matchAction(t, "ab12dcd8-bde0-4045-92dc-c4b27668d73a")), mock.Anything).Return(&es.ResultT{HitsT: es.HitsT{
Hits: []es.HitT{{
Source: []byte(`{"action_id":"ab12dcd8-bde0-4045-92dc-c4b27668d73a","type":"UPGRADE"}`),
}},
}}, nil).Once()
m.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("", nil).Once()
m.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
return m
},
},
}

for _, tc := range tests {
Expand Down Expand Up @@ -492,3 +547,84 @@ func TestInvalidateAPIKeys(t *testing.T) {
bulker.AssertExpectations(t)
}
}

func TestAckHandleUpgrade(t *testing.T) {
tests := []struct {
name string
event Event
bulker func(t *testing.T) *ftesting.MockBulk
}{{
name: "ok",
event: Event{},
bulker: func(t *testing.T) *ftesting.MockBulk {
m := ftesting.NewMockBulk()
m.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
return m
},
}, {
name: "retry signaled",
event: Event{
Error: "upgrade error",
Payload: json.RawMessage(`{"retry":true,"retry_attempt":1}`),
},
bulker: func(t *testing.T) *ftesting.MockBulk {
m := ftesting.NewMockBulk()
m.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.MatchedBy(func(p []byte) bool {
var body struct {
Doc struct {
Status string `json:"upgrade_status"`
} `json:"doc"`
}
if err := json.Unmarshal(p, &body); err != nil {
t.Fatal(err)
}
return body.Doc.Status == "retrying"
}), mock.Anything).Return(nil).Once()
return m
},
}, {
name: "no more retries",
event: Event{
Error: "upgrade error",
Payload: json.RawMessage(`{"retry":false}`),
},
bulker: func(t *testing.T) *ftesting.MockBulk {
m := ftesting.NewMockBulk()
m.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.MatchedBy(func(p []byte) bool {
var body struct {
Doc struct {
Status string `json:"upgrade_status"`
} `json:"doc"`
}
if err := json.Unmarshal(p, &body); err != nil {
t.Fatal(err)
}
return body.Doc.Status == "failed"
}), mock.Anything).Return(nil).Once()
return m
},
}}
cfg := &config.Server{
Limits: config.ServerLimits{},
}
agent := &model.Agent{
ESDocument: model.ESDocument{Id: "ab12dcd8-bde0-4045-92dc-c4b27668d735"},
Agent: &model.AgentMetadata{Version: "8.0.0"},
}
ctx := context.Background()
cache, err := cache.New(config.Cache{NumCounters: 100, MaxCost: 100000})
if err != nil {
t.Fatal(err)
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
logger := testlog.SetLogger(t)
bulker := tc.bulker(t)
ack := NewAckT(cfg, bulker, cache)

err := ack.handleUpgrade(ctx, logger, agent, tc.event)
assert.NoError(t, err)
bulker.AssertExpectations(t)
})
}
}

0 comments on commit 17d3681

Please sign in to comment.