Skip to content

Commit 557ffd0

Browse files
Handle when agent checks in with revision idx that is too high
Handle the scenario when the agent checks in with a revision_idx value that is greater than the latest available policy in ES. Add E2E tests when using policy_id and revision_idx values in checkin.
1 parent 99f1c89 commit 557ffd0

File tree

7 files changed

+509
-22
lines changed

7 files changed

+509
-22
lines changed

internal/pkg/api/handleCheckin.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1144,7 +1144,7 @@ func calcPollDuration(zlog zerolog.Logger, pollDuration, setupDuration, jitterDu
11441144
}
11451145

11461146
// processPolicyDetails handles the agent_policy_id and revision_idx included in the checkin request.
1147-
// The API keys will be managed if the agent reports a new policy id from its last checkin, or if the revision is greater than what the last checkin reported.
1147+
// The API keys will be managed if the agent reports a new policy id from its last checkin, or if the revision is different than what the last checkin reported.
11481148
// It returns the revision idx that should be used when subscribing for new POLICY_CHANGE actions and optional args to use when doing the non-tick checkin.
11491149
func (ct *CheckinT) processPolicyDetails(ctx context.Context, zlog zerolog.Logger, agent *model.Agent, req *CheckinRequest) (int64, []checkin.Option, error) {
11501150
// no details specified
@@ -1162,7 +1162,7 @@ func (ct *CheckinT) processPolicyDetails(ctx context.Context, zlog zerolog.Logge
11621162

11631163
// update agent doc if policy id or revision idx does not match
11641164
var opts []checkin.Option
1165-
if policyID != agent.PolicyID || revisionIDX != agent.PolicyRevisionIdx {
1165+
if policyID != agent.AgentPolicyID || revisionIDX != agent.PolicyRevisionIdx {
11661166
opts = []checkin.Option{
11671167
checkin.WithAgentPolicyID(policyID),
11681168
checkin.WithPolicyRevisionIDX(revisionIDX),
@@ -1174,8 +1174,14 @@ func (ct *CheckinT) processPolicyDetails(ctx context.Context, zlog zerolog.Logge
11741174
return 0, opts, nil
11751175
}
11761176

1177-
// Update API keys if the policy has changed, or if the revision increments.
1178-
if policyID != agent.AgentPolicyID || revisionIDX > agent.PolicyRevisionIdx {
1177+
// Check if the checkin revision_idx is greater than the latest available revision of the policy.
1178+
latestRev := ct.pm.LatestRev(ctx, agent.PolicyID)
1179+
if latestRev != 0 && revisionIDX > latestRev {
1180+
return 0, opts, nil
1181+
}
1182+
1183+
// Update API keys if the policy has changed, or if the revision differs.
1184+
if policyID != agent.AgentPolicyID || revisionIDX != agent.PolicyRevisionIdx {
11791185
for outputName, output := range agent.Outputs {
11801186
if output.Type != policy.OutputTypeElasticsearch {
11811187
continue

internal/pkg/api/handleCheckin_test.go

Lines changed: 87 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import (
2525
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
2626
"github.com/elastic/fleet-server/v7/internal/pkg/es"
2727
"github.com/elastic/fleet-server/v7/internal/pkg/model"
28-
mockmonitor "github.com/elastic/fleet-server/v7/internal/pkg/monitor/mock"
2928
"github.com/elastic/fleet-server/v7/internal/pkg/policy"
3029
"github.com/elastic/fleet-server/v7/internal/pkg/sqn"
3130
ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing"
@@ -39,6 +38,30 @@ import (
3938
"github.com/stretchr/testify/require"
4039
)
4140

41+
type mockPolicyMonitor struct {
42+
mock.Mock
43+
}
44+
45+
func (m *mockPolicyMonitor) Run(ctx context.Context) error {
46+
args := m.Called(ctx)
47+
return args.Error(0)
48+
}
49+
50+
func (m *mockPolicyMonitor) Subscribe(agentID, policyID string, revIDX int64) (policy.Subscription, error) {
51+
args := m.Called(agentID, policyID, revIDX)
52+
return args.Get(0).(policy.Subscription), args.Error(1)
53+
}
54+
55+
func (m *mockPolicyMonitor) Unsubscribe(sub policy.Subscription) error {
56+
args := m.Called(sub)
57+
return args.Error(0)
58+
}
59+
60+
func (m *mockPolicyMonitor) LatestRev(ctx context.Context, id string) int64 {
61+
args := m.Called(ctx, id)
62+
return args.Get(0).(int64)
63+
}
64+
4265
func TestConvertActionData(t *testing.T) {
4366
tests := []struct {
4467
name string
@@ -339,14 +362,13 @@ func TestResolveSeqNo(t *testing.T) {
339362
cfg := &config.Server{}
340363
c, _ := cache.New(config.Cache{NumCounters: 100, MaxCost: 100000})
341364
bc := checkin.NewBulk(nil)
342-
bulker := ftesting.NewMockBulk()
343-
pim := mockmonitor.NewMockMonitor()
344-
pm := policy.NewMonitor(bulker, pim, config.ServerLimits{PolicyLimit: config.Limit{Interval: 5 * time.Millisecond, Burst: 1}})
365+
pm := &mockPolicyMonitor{}
345366
ct, err := NewCheckinT(verCon, cfg, c, bc, pm, nil, nil, nil)
346367
assert.NoError(t, err)
347368

348369
resp, _ := ct.resolveSeqNo(ctx, logger, tc.req, tc.agent)
349370
assert.Equal(t, tc.resp, resp)
371+
pm.AssertExpectations(t)
350372
})
351373
}
352374

@@ -1123,18 +1145,22 @@ func TestProcessPolicyDetails(t *testing.T) {
11231145
policyID := "policy-id"
11241146
revIDX2 := int64(2)
11251147
tests := []struct {
1126-
name string
1127-
agent *model.Agent
1128-
req *CheckinRequest
1129-
revIDX int64
1130-
returnsOps bool
1131-
err error
1148+
name string
1149+
agent *model.Agent
1150+
req *CheckinRequest
1151+
getPolicyMonitor func() *mockPolicyMonitor
1152+
revIDX int64
1153+
returnsOps bool
1154+
err error
11321155
}{{
11331156
name: "request has no policy details",
11341157
agent: &model.Agent{
11351158
PolicyRevisionIdx: 1,
11361159
},
1137-
req: &CheckinRequest{},
1160+
req: &CheckinRequest{},
1161+
getPolicyMonitor: func() *mockPolicyMonitor {
1162+
return &mockPolicyMonitor{}
1163+
},
11381164
revIDX: 1,
11391165
returnsOps: false,
11401166
err: nil,
@@ -1152,8 +1178,11 @@ func TestProcessPolicyDetails(t *testing.T) {
11521178
AgentPolicyId: &policyID,
11531179
PolicyRevisionIdx: &revIDX2,
11541180
},
1181+
getPolicyMonitor: func() *mockPolicyMonitor {
1182+
return &mockPolicyMonitor{}
1183+
},
11551184
revIDX: 0,
1156-
returnsOps: true,
1185+
returnsOps: false,
11571186
err: nil,
11581187
}, {
11591188
name: "revision updated",
@@ -1169,9 +1198,36 @@ func TestProcessPolicyDetails(t *testing.T) {
11691198
AgentPolicyId: &policyID,
11701199
PolicyRevisionIdx: &revIDX2,
11711200
},
1201+
getPolicyMonitor: func() *mockPolicyMonitor {
1202+
pm := &mockPolicyMonitor{}
1203+
pm.On("LatestRev", mock.Anything, policyID).Return(int64(2)).Once()
1204+
return pm
1205+
},
11721206
revIDX: 2,
11731207
returnsOps: true,
11741208
err: nil,
1209+
}, {
1210+
name: "checkin revision is greater than the policy's latest revision",
1211+
agent: &model.Agent{
1212+
Agent: &model.AgentMetadata{
1213+
ID: "agent-id",
1214+
},
1215+
PolicyID: policyID,
1216+
AgentPolicyID: policyID,
1217+
PolicyRevisionIdx: 1,
1218+
},
1219+
req: &CheckinRequest{
1220+
AgentPolicyId: &policyID,
1221+
PolicyRevisionIdx: &revIDX2,
1222+
},
1223+
getPolicyMonitor: func() *mockPolicyMonitor {
1224+
pm := &mockPolicyMonitor{}
1225+
pm.On("LatestRev", mock.Anything, policyID).Return(int64(1)).Once()
1226+
return pm
1227+
},
1228+
revIDX: 0,
1229+
returnsOps: true,
1230+
err: nil,
11751231
}, {
11761232
name: "agent_policy_id has changed",
11771233
agent: &model.Agent{
@@ -1186,6 +1242,11 @@ func TestProcessPolicyDetails(t *testing.T) {
11861242
AgentPolicyId: &policyID,
11871243
PolicyRevisionIdx: &revIDX2,
11881244
},
1245+
getPolicyMonitor: func() *mockPolicyMonitor {
1246+
pm := &mockPolicyMonitor{}
1247+
pm.On("LatestRev", mock.Anything, policyID).Return(int64(2)).Once()
1248+
return pm
1249+
},
11891250
revIDX: 2,
11901251
returnsOps: true,
11911252
err: nil,
@@ -1202,8 +1263,13 @@ func TestProcessPolicyDetails(t *testing.T) {
12021263
AgentPolicyId: &policyID,
12031264
PolicyRevisionIdx: &revIDX2,
12041265
},
1266+
getPolicyMonitor: func() *mockPolicyMonitor {
1267+
pm := &mockPolicyMonitor{}
1268+
pm.On("LatestRev", mock.Anything, policyID).Return(int64(2)).Once()
1269+
return pm
1270+
},
12051271
revIDX: 2,
1206-
returnsOps: false,
1272+
returnsOps: true,
12071273
err: nil,
12081274
}, {
12091275
name: "details present with no changes from agent doc",
@@ -1219,6 +1285,11 @@ func TestProcessPolicyDetails(t *testing.T) {
12191285
AgentPolicyId: &policyID,
12201286
PolicyRevisionIdx: &revIDX2,
12211287
},
1288+
getPolicyMonitor: func() *mockPolicyMonitor {
1289+
pm := &mockPolicyMonitor{}
1290+
pm.On("LatestRev", mock.Anything, policyID).Return(int64(2)).Once()
1291+
return pm
1292+
},
12221293
revIDX: 2,
12231294
returnsOps: false,
12241295
err: nil,
@@ -1227,8 +1298,10 @@ func TestProcessPolicyDetails(t *testing.T) {
12271298
for _, tc := range tests {
12281299
t.Run(tc.name, func(t *testing.T) {
12291300
logger := testlog.SetLogger(t)
1301+
pm := tc.getPolicyMonitor()
12301302
checkin := &CheckinT{
12311303
bulker: ftesting.NewMockBulk(),
1304+
pm: pm,
12321305
}
12331306

12341307
revIDX, opts, err := checkin.processPolicyDetails(t.Context(), logger, tc.agent, tc.req)
@@ -1243,6 +1316,7 @@ func TestProcessPolicyDetails(t *testing.T) {
12431316
} else {
12441317
assert.NoError(t, err)
12451318
}
1319+
pm.AssertExpectations(t)
12461320
})
12471321
}
12481322
}

internal/pkg/api/handleStatus_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,15 @@ func withAuthFunc(authfn AuthFunc) OptFunc {
3434
}
3535
}
3636

37-
type mockPolicyMonitor struct {
37+
type mockStatusPolicyMonitor struct {
3838
state client.UnitState
3939
}
4040

41-
func (pm *mockPolicyMonitor) Run(ctx context.Context) error {
41+
func (pm *mockStatusPolicyMonitor) Run(ctx context.Context) error {
4242
return nil
4343
}
4444

45-
func (pm *mockPolicyMonitor) State() client.UnitState {
45+
func (pm *mockStatusPolicyMonitor) State() client.UnitState {
4646
return pm.state
4747
}
4848

@@ -86,7 +86,7 @@ func TestHandleStatus(t *testing.T) {
8686
ctx = logger.WithContext(ctx)
8787
state := client.UnitState(k)
8888
r := apiServer{
89-
st: NewStatusT(cfg, nil, c, withAuthFunc(tc.AuthFn), WithSelfMonitor(&mockPolicyMonitor{state}), WithBuildInfo(fbuild.Info{
89+
st: NewStatusT(cfg, nil, c, withAuthFunc(tc.AuthFn), WithSelfMonitor(&mockStatusPolicyMonitor{state}), WithBuildInfo(fbuild.Info{
9090
Version: "8.1.0",
9191
Commit: "4eff928",
9292
BuildTime: time.Now(),

internal/pkg/policy/monitor.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ type Monitor interface {
6262

6363
// Unsubscribe removes the current subscription.
6464
Unsubscribe(sub Subscription) error
65+
66+
// LatestRev returns the latest revision idx for the specified policy.
67+
LatestRev(ctx context.Context, policyID string) int64
6568
}
6669

6770
type policyFetcher func(ctx context.Context, bulker bulk.Bulk, opt ...dl.Option) ([]model.Policy, error)
@@ -557,3 +560,34 @@ func (m *monitorT) Unsubscribe(sub Subscription) error {
557560

558561
return nil
559562
}
563+
564+
// LatestRev returns the revision_idx for the passed policy ID.
565+
// If the policy does not exist in the map, then all policies are forcibly reloaded.
566+
// On an error with the reload, or if the policy does not exist, 0 is returned.
567+
func (m *monitorT) LatestRev(ctx context.Context, id string) int64 {
568+
if id == "" {
569+
return 0
570+
}
571+
572+
m.mut.Lock()
573+
p, ok := m.policies[id]
574+
m.mut.Unlock()
575+
576+
if !ok {
577+
// We've not seen this policy before, force load.
578+
err := m.loadPolicies(ctx)
579+
if err != nil {
580+
m.log.Error().Err(err).Str(ecs.PolicyID, id).Msg("Unable to load policies.")
581+
return 0
582+
}
583+
584+
m.mut.Lock()
585+
p, ok = m.policies[id]
586+
m.mut.Unlock()
587+
if !ok {
588+
m.log.Warn().Str(ecs.PolicyID, id).Msg("Unable to find policy after load.")
589+
return 0
590+
}
591+
}
592+
return p.pp.Policy.RevisionIdx
593+
}

internal/pkg/policy/monitor_test.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ package policy
99
import (
1010
"context"
1111
"encoding/json"
12+
"fmt"
1213
"sync"
1314
"testing"
1415
"time"
@@ -549,3 +550,76 @@ LOOP:
549550
ms.AssertExpectations(t)
550551
mm.AssertExpectations(t)
551552
}
553+
554+
func TestMonitor_LatestRev(t *testing.T) {
555+
t.Run("empty policy id", func(t *testing.T) {
556+
pm := &monitorT{}
557+
idx := pm.LatestRev(t.Context(), "")
558+
assert.Equal(t, int64(0), idx)
559+
})
560+
561+
t.Run("policy load error", func(t *testing.T) {
562+
bulker := ftesting.NewMockBulk()
563+
mm := mmock.NewMockMonitor()
564+
monitor := NewMonitor(bulker, mm, config.ServerLimits{})
565+
pm := monitor.(*monitorT)
566+
pm.policyF = func(ctx context.Context, bulker bulk.Bulk, opt ...dl.Option) ([]model.Policy, error) {
567+
return nil, fmt.Errorf("policy fetch error")
568+
}
569+
570+
idx := pm.LatestRev(t.Context(), "test-id")
571+
assert.Equal(t, int64(0), idx)
572+
})
573+
574+
t.Run("policy not found", func(t *testing.T) {
575+
bulker := ftesting.NewMockBulk()
576+
mm := mmock.NewMockMonitor()
577+
monitor := NewMonitor(bulker, mm, config.ServerLimits{})
578+
pm := monitor.(*monitorT)
579+
pm.policyF = func(ctx context.Context, bulker bulk.Bulk, opt ...dl.Option) ([]model.Policy, error) {
580+
return []model.Policy{}, nil
581+
}
582+
idx := pm.LatestRev(t.Context(), "test-id")
583+
assert.Equal(t, int64(0), idx)
584+
})
585+
586+
t.Run("policy found after load", func(t *testing.T) {
587+
bulker := ftesting.NewMockBulk()
588+
mm := mmock.NewMockMonitor()
589+
monitor := NewMonitor(bulker, mm, config.ServerLimits{})
590+
pm := monitor.(*monitorT)
591+
policyId := uuid.Must(uuid.NewV4()).String()
592+
rId := xid.New().String()
593+
policy := model.Policy{
594+
ESDocument: model.ESDocument{
595+
Id: rId,
596+
Version: 1,
597+
SeqNo: 1,
598+
},
599+
PolicyID: policyId,
600+
Data: policyDataDefault,
601+
RevisionIdx: 2,
602+
}
603+
pm.policyF = func(ctx context.Context, bulker bulk.Bulk, opt ...dl.Option) ([]model.Policy, error) {
604+
return []model.Policy{policy}, nil
605+
}
606+
idx := pm.LatestRev(t.Context(), policyId)
607+
assert.Equal(t, int64(2), idx)
608+
})
609+
610+
t.Run("policy found", func(t *testing.T) {
611+
pm := &monitorT{
612+
policies: map[string]policyT{
613+
"test-id": policyT{
614+
pp: ParsedPolicy{
615+
Policy: model.Policy{
616+
RevisionIdx: 1,
617+
},
618+
},
619+
},
620+
},
621+
}
622+
idx := pm.LatestRev(t.Context(), "test-id")
623+
assert.Equal(t, int64(1), idx)
624+
})
625+
}

0 commit comments

Comments
 (0)