Skip to content

Commit 3c342b8

Browse files
Add agent_policy_id and policy_revision_idx to checkin attributes
Allow agents to include their currently running agent_policy_id and policy_revision_idx attributes in checkin request bodies. If these attributes are included and differ from the agent doc, they will be used when updating the agent doc in the pre-poll checkin. If the agent's policy id does not match the expected policy id from the server, a reassign is detected and a new policy change action will be sent. If the reported revision idx is greater than what was previously recorded, or the policy id changes from what was previously recorded, then the API keys will be managed.
1 parent 5b5468a commit 3c342b8

File tree

10 files changed

+348
-150
lines changed

10 files changed

+348
-150
lines changed

internal/pkg/api/handleAck.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -442,19 +442,28 @@ func (ack *AckT) updateAPIKey(ctx context.Context,
442442
agentID string,
443443
apiKeyID, permissionHash string,
444444
toRetireAPIKeyIDs []model.ToRetireAPIKeyIdsItems, outputName string) error {
445-
bulk := ack.bulk
445+
return updateAPIKey(ctx, zlog, ack.bulk, agentID, apiKeyID, permissionHash, toRetireAPIKeyIDs, outputName)
446+
}
447+
448+
func updateAPIKey(ctx context.Context,
449+
zlog zerolog.Logger,
450+
bulk bulk.Bulk,
451+
agentID string,
452+
apiKeyID, permissionHash string,
453+
toRetireAPIKeyIDs []model.ToRetireAPIKeyIdsItems, outputName string) error {
446454
// use output bulker if exists
455+
outBulk := bulk
447456
if outputName != "" {
448-
outputBulk := ack.bulk.GetBulker(outputName)
457+
outputBulk := bulk.GetBulker(outputName)
449458
if outputBulk != nil {
450459
zlog.Debug().Str(ecs.PolicyOutputName, outputName).Msg("Using output bulker in updateAPIKey")
451-
bulk = outputBulk
460+
outBulk = outputBulk
452461
}
453462
}
454463
if apiKeyID != "" {
455-
res, err := bulk.APIKeyRead(ctx, apiKeyID, true)
464+
res, err := outBulk.APIKeyRead(ctx, apiKeyID, true)
456465
if err != nil {
457-
if isAgentActive(ctx, zlog, ack.bulk, agentID) {
466+
if isAgentActive(ctx, zlog, outBulk, agentID) {
458467
zlog.Warn().
459468
Err(err).
460469
Str(LogAPIKeyID, apiKeyID).
@@ -480,7 +489,7 @@ func (ack *AckT) updateAPIKey(ctx context.Context,
480489
Str(LogAPIKeyID, apiKeyID).
481490
Msg("Failed to cleanup roles")
482491
} else if removedRolesCount > 0 {
483-
if err := bulk.APIKeyUpdate(ctx, apiKeyID, permissionHash, clean); err != nil {
492+
if err := outBulk.APIKeyUpdate(ctx, apiKeyID, permissionHash, clean); err != nil {
484493
zlog.Error().Err(err).RawJSON("roles", clean).Str(LogAPIKeyID, apiKeyID).Str(ecs.PolicyOutputName, outputName).Msg("Failed to update API Key")
485494
} else {
486495
zlog.Debug().
@@ -493,7 +502,7 @@ func (ack *AckT) updateAPIKey(ctx context.Context,
493502
}
494503
}
495504
}
496-
ack.invalidateAPIKeys(ctx, zlog, toRetireAPIKeyIDs, apiKeyID)
505+
invalidateAPIKeys(ctx, zlog, bulk, toRetireAPIKeyIDs, apiKeyID)
497506
}
498507

499508
return nil

internal/pkg/api/handleCheckin.go

Lines changed: 68 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -278,16 +278,34 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r
278278
return fmt.Errorf("failed to update upgrade_details: %w", err)
279279
}
280280

281+
initialOpts := []checkin.Option{
282+
checkin.WithStatus(string(req.Status)),
283+
checkin.WithMessage(req.Message),
284+
checkin.WithMeta(rawMeta),
285+
checkin.WithComponents(rawComponents),
286+
checkin.WithSeqNo(seqno),
287+
checkin.WithVer(ver),
288+
checkin.WithUnhealthyReason(unhealthyReason),
289+
checkin.WithDeleteAudit(agent.AuditUnenrolledReason != "" || agent.UnenrolledAt != ""),
290+
}
291+
292+
revID, opts, err := ct.processPolicyDetails(r.Context(), zlog, agent, req)
293+
if err != nil {
294+
return fmt.Errorf("failed to update policy details: %w", err)
295+
}
296+
if len(opts) > 0 {
297+
initialOpts = append(initialOpts, opts...)
298+
}
299+
281300
// Subscribe to actions dispatcher
282301
aSub := ct.ad.Subscribe(zlog, agent.Id, seqno)
283302
defer ct.ad.Unsubscribe(zlog, aSub)
284303
actCh := aSub.Ch()
285304

286-
// use revision_idx=0 if the agent has a single output where no API key is defined
287-
// This will force the policy monitor to emit a new policy to regerate API keys
288-
revID := agent.PolicyRevisionIdx
289305
for _, output := range agent.Outputs {
290306
if output.APIKey == "" {
307+
// use revision_idx=0 if the agent has a single output where no API key is defined
308+
// This will force the policy monitor to emit a new policy to regenerate API keys
291309
revID = 0
292310
break
293311
}
@@ -327,7 +345,7 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r
327345
// Initial update on checkin, and any user fields that might have changed
328346
// Run a script to remove audit_unenrolled_* and unenrolled_at attributes if one is set on checkin.
329347
// 8.16.x releases would incorrectly set unenrolled_at
330-
err = ct.bc.CheckIn(agent.Id, checkin.WithStatus(string(req.Status)), checkin.WithMessage(req.Message), checkin.WithMeta(rawMeta), checkin.WithComponents(rawComponents), checkin.WithSeqNo(seqno), checkin.WithVer(ver), checkin.WithUnhealthyReason(unhealthyReason), checkin.WithDeleteAudit(agent.AuditUnenrolledReason != "" || agent.UnenrolledAt != ""))
348+
err = ct.bc.CheckIn(agent.Id, initialOpts...)
331349
if err != nil {
332350
zlog.Error().Err(err).Str(ecs.AgentID, agent.Id).Msg("checkin failed")
333351
}
@@ -1123,3 +1141,49 @@ func calcPollDuration(zlog zerolog.Logger, pollDuration, setupDuration, jitterDu
11231141

11241142
return pollDuration, jitter
11251143
}
1144+
1145+
// processPolicyDetails handles the agent_policy_id and revision_idx included in the checkin request.
1146+
// The API keys will be managed if the agent reports a new policy id from its last checkin, or if the revision is greater than what the last checkin reported.
1147+
// It returns the revision idx that should be used when subscribing for new POLICY_CHANGE actons and optional args to use when doing the non-tick checkin.
1148+
func (ct *CheckinT) processPolicyDetails(ctx context.Context, zlog zerolog.Logger, agent *model.Agent, req *CheckinRequest) (int64, []checkin.Option, error) {
1149+
// no details specified
1150+
if req == nil || req.PolicyRevisionIdx == nil || req.AgentPolicyId == nil {
1151+
return agent.PolicyRevisionIdx, nil, nil
1152+
}
1153+
policyID := *req.AgentPolicyId
1154+
revisionIDX := *req.PolicyRevisionIdx
1155+
1156+
span, ctx := apm.StartSpan(ctx, "Process policy details", "process")
1157+
span.Context.SetLabel("agent_id", agent.Agent.ID)
1158+
span.Context.SetLabel(dl.FieldAgentPolicyID, policyID)
1159+
span.Context.SetLabel(dl.FieldPolicyRevisionIdx, revisionIDX)
1160+
defer span.End()
1161+
1162+
// update agent doc if policy id or revision idx does not match
1163+
var opts []checkin.Option
1164+
if policyID != agent.PolicyID || revisionIDX != agent.PolicyRevisionIdx {
1165+
opts = []checkin.Option{
1166+
checkin.WithAgentPolicyID(policyID),
1167+
checkin.WithPolicyRevisionIDX(revisionIDX),
1168+
}
1169+
}
1170+
// Policy reassign, subscribe to policy with revision 0
1171+
if policyID != agent.PolicyID {
1172+
zlog.Debug().Str(dl.FieldAgentPolicyID, policyID).Str("new_policy_id", agent.PolicyID).Msg("Policy ID mismatch detected, reassigning agent.")
1173+
return 0, opts, nil
1174+
}
1175+
1176+
// Update API keys if the policy has changed, or if the revision increments.
1177+
if policyID != agent.AgentPolicyID || revisionIDX > agent.PolicyRevisionIdx {
1178+
for outputName, output := range agent.Outputs {
1179+
if output.Type != policy.OutputTypeElasticsearch {
1180+
continue
1181+
}
1182+
if err := updateAPIKey(ctx, zlog, ct.bulker, agent.Id, output.APIKeyID, output.PermissionsHash, output.ToRetireAPIKeyIds, outputName); err != nil {
1183+
// Only returns ErrUpdatingInactiveAgent
1184+
return 0, nil, err
1185+
}
1186+
}
1187+
}
1188+
return revisionIDX, opts, nil
1189+
}

internal/pkg/api/handleCheckin_test.go

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1118,3 +1118,131 @@ func TestValidateCheckinRequest(t *testing.T) {
11181118
})
11191119
}
11201120
}
1121+
1122+
func TestProcessPolicyDetails(t *testing.T) {
1123+
policyID := "policy-id"
1124+
revIDX2 := int64(2)
1125+
tests := []struct {
1126+
name string
1127+
agent *model.Agent
1128+
req *CheckinRequest
1129+
revIDX int64
1130+
returnsOps bool
1131+
err error
1132+
}{{
1133+
name: "request has no policy details",
1134+
agent: &model.Agent{
1135+
PolicyRevisionIdx: 1,
1136+
},
1137+
req: &CheckinRequest{},
1138+
revIDX: 1,
1139+
returnsOps: false,
1140+
err: nil,
1141+
}, {
1142+
name: "policy reassign detected",
1143+
agent: &model.Agent{
1144+
Agent: &model.AgentMetadata{
1145+
ID: "agent-id",
1146+
},
1147+
PolicyID: "new-policy-id",
1148+
AgentPolicyID: policyID,
1149+
PolicyRevisionIdx: 2,
1150+
},
1151+
req: &CheckinRequest{
1152+
AgentPolicyId: &policyID,
1153+
PolicyRevisionIdx: &revIDX2,
1154+
},
1155+
revIDX: 0,
1156+
returnsOps: true,
1157+
err: nil,
1158+
}, {
1159+
name: "revision updated",
1160+
agent: &model.Agent{
1161+
Agent: &model.AgentMetadata{
1162+
ID: "agent-id",
1163+
},
1164+
PolicyID: policyID,
1165+
AgentPolicyID: policyID,
1166+
PolicyRevisionIdx: 1,
1167+
},
1168+
req: &CheckinRequest{
1169+
AgentPolicyId: &policyID,
1170+
PolicyRevisionIdx: &revIDX2,
1171+
},
1172+
revIDX: 2,
1173+
returnsOps: true,
1174+
err: nil,
1175+
}, {
1176+
name: "agent_policy_id has changed",
1177+
agent: &model.Agent{
1178+
Agent: &model.AgentMetadata{
1179+
ID: "agent-id",
1180+
},
1181+
PolicyID: policyID,
1182+
AgentPolicyID: "old-policy-id",
1183+
PolicyRevisionIdx: 1,
1184+
},
1185+
req: &CheckinRequest{
1186+
AgentPolicyId: &policyID,
1187+
PolicyRevisionIdx: &revIDX2,
1188+
},
1189+
revIDX: 2,
1190+
returnsOps: true,
1191+
err: nil,
1192+
}, {
1193+
name: "agent does not have agent_policy_id present",
1194+
agent: &model.Agent{
1195+
Agent: &model.AgentMetadata{
1196+
ID: "agent-id",
1197+
},
1198+
PolicyID: policyID,
1199+
PolicyRevisionIdx: 2,
1200+
},
1201+
req: &CheckinRequest{
1202+
AgentPolicyId: &policyID,
1203+
PolicyRevisionIdx: &revIDX2,
1204+
},
1205+
revIDX: 2,
1206+
returnsOps: false,
1207+
err: nil,
1208+
}, {
1209+
name: "details present with no changes from agent doc",
1210+
agent: &model.Agent{
1211+
Agent: &model.AgentMetadata{
1212+
ID: "agent-id",
1213+
},
1214+
AgentPolicyID: policyID,
1215+
PolicyID: policyID,
1216+
PolicyRevisionIdx: revIDX2,
1217+
},
1218+
req: &CheckinRequest{
1219+
AgentPolicyId: &policyID,
1220+
PolicyRevisionIdx: &revIDX2,
1221+
},
1222+
revIDX: 2,
1223+
returnsOps: false,
1224+
err: nil,
1225+
}}
1226+
1227+
for _, tc := range tests {
1228+
t.Run(tc.name, func(t *testing.T) {
1229+
logger := testlog.SetLogger(t)
1230+
checkin := &CheckinT{
1231+
bulker: ftesting.NewMockBulk(),
1232+
}
1233+
1234+
revIDX, opts, err := checkin.processPolicyDetails(t.Context(), logger, tc.agent, tc.req)
1235+
assert.Equal(t, tc.revIDX, revIDX)
1236+
if tc.returnsOps {
1237+
assert.NotEmpty(t, opts)
1238+
} else {
1239+
assert.Empty(t, opts)
1240+
}
1241+
if tc.err != nil {
1242+
assert.ErrorIs(t, tc.err, err)
1243+
} else {
1244+
assert.NoError(t, err)
1245+
}
1246+
})
1247+
}
1248+
}

internal/pkg/api/openapi.gen.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/pkg/checkin/bulk.go

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,18 @@ func WithDeleteAudit(del bool) Option {
113113
}
114114
}
115115

116+
func WithAgentPolicyID(id string) Option {
117+
return func(pending *pendingT) {
118+
pending.agentPolicyID = id
119+
}
120+
}
121+
122+
func WithPolicyRevisionIDX(idx int64) Option {
123+
return func(pending *pendingT) {
124+
pending.revisionIDX = idx
125+
}
126+
}
127+
116128
type extraT struct {
117129
meta []byte
118130
seqNo sqn.SeqNo
@@ -128,6 +140,8 @@ type pendingT struct {
128140
ts string
129141
status string
130142
message string
143+
agentPolicyID string // may be empty
144+
revisionIDX int64
131145
extra *extraT
132146
unhealthyReason *[]string
133147
}
@@ -314,6 +328,10 @@ func toUpdateBody(now string, pending pendingT) ([]byte, error) {
314328
dl.FieldLastCheckinMessage: pending.message, // Set the status message
315329
dl.FieldUnhealthyReason: pending.unhealthyReason,
316330
}
331+
if pending.agentPolicyID != "" {
332+
fields[dl.FieldAgentPolicyID] = pending.agentPolicyID
333+
fields[dl.FieldPolicyRevisionIdx] = pending.revisionIDX
334+
}
317335
if pending.extra != nil {
318336
// If the agent version is not empty it needs to be updated
319337
// Assuming the agent can by upgraded keeping the same id, but incrementing the version
@@ -353,11 +371,13 @@ func encodeParams(now string, data pendingT) (map[string]json.RawMessage, error)
353371
reason json.RawMessage
354372

355373
// optional attributes below
356-
ver json.RawMessage
357-
meta json.RawMessage
358-
components json.RawMessage
359-
isSet json.RawMessage
360-
seqNo json.RawMessage
374+
policyID json.RawMessage
375+
revisionIDX json.RawMessage
376+
ver json.RawMessage
377+
meta json.RawMessage
378+
components json.RawMessage
379+
isSet json.RawMessage
380+
seqNo json.RawMessage
361381

362382
err error
363383
)
@@ -371,6 +391,10 @@ func encodeParams(now string, data pendingT) (map[string]json.RawMessage, error)
371391
Err = errors.Join(Err, err)
372392
reason, err = json.Marshal(data.unhealthyReason)
373393
Err = errors.Join(Err, err)
394+
policyID, err = json.Marshal(data.agentPolicyID)
395+
Err = errors.Join(Err, err)
396+
revisionIDX, err = json.Marshal(data.revisionIDX)
397+
Err = errors.Join(Err, err)
374398
ver, err = json.Marshal(data.extra.ver)
375399
Err = errors.Join(Err, err)
376400
isSet, err = json.Marshal(data.extra.seqNo.IsSet())
@@ -394,6 +418,8 @@ func encodeParams(now string, data pendingT) (map[string]json.RawMessage, error)
394418
"Status": status,
395419
"Message": message,
396420
"UnhealthyReason": reason,
421+
"PolicyID": policyID,
422+
"RevisionIDX": revisionIDX,
397423
"Ver": ver,
398424
"Meta": meta,
399425
"Components": components,

0 commit comments

Comments
 (0)