diff --git a/changelog/fragments/1757710842-Add-agent_policy_id-and-policy_revision_idx-to-checkin-requests.yaml b/changelog/fragments/1757710842-Add-agent_policy_id-and-policy_revision_idx-to-checkin-requests.yaml new file mode 100644 index 00000000000..a267150c03b --- /dev/null +++ b/changelog/fragments/1757710842-Add-agent_policy_id-and-policy_revision_idx-to-checkin-requests.yaml @@ -0,0 +1,35 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: feature + +# Change summary; a 80ish characters long description of the change. +summary: Add agent_policy_id and policy_revision_idx to checkin requests + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +description: | + Add agent_policy_id and policy_revision_idx attributes to checkin requests. + These attributes are used to inform fleet-server of the policy id and revision that the agent is currently running. + Add a feature flag to disable sending acks for POLICY_CHANGE actions on a future release. + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: elastic-agent + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +pr: https://github.com/elastic/elastic-agent/pull/9931 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +issue: https://github.com/elastic/elastic-agent/issues/6446 diff --git a/internal/pkg/agent/application/actions/handlers/handler_action_policy_change.go b/internal/pkg/agent/application/actions/handlers/handler_action_policy_change.go index 634f2020e55..9bfb90ad7ea 100644 --- a/internal/pkg/agent/application/actions/handlers/handler_action_policy_change.go +++ b/internal/pkg/agent/application/actions/handlers/handler_action_policy_change.go @@ -29,6 +29,7 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/fleetapi/client" "github.com/elastic/elastic-agent/internal/pkg/remote" "github.com/elastic/elastic-agent/pkg/core/logger" + "github.com/elastic/elastic-agent/pkg/features" ) // PolicyChangeHandler is a handler for POLICY_CHANGE action. @@ -41,6 +42,7 @@ type PolicyChangeHandler struct { setters []actions.ClientSetter policyLogLevelSetter logLevelSetter coordinator *coordinator.Coordinator + disableAckFn func() bool // Disabled for 8.8.0 release in order to limit the surface // https://github.com/elastic/security-team/issues/6501 // // Last known valid signature validation key @@ -67,6 +69,7 @@ func NewPolicyChangeHandler( setters: setters, coordinator: coordinator, policyLogLevelSetter: policyLogLevelSetter, + disableAckFn: features.DisablePolicyChangeAcks, } } @@ -111,7 +114,7 @@ func (h *PolicyChangeHandler) Handle(ctx context.Context, a fleetapi.Action, ack return err } - h.ch <- newPolicyChange(ctx, c, a, acker, false) + h.ch <- newPolicyChange(ctx, c, a, acker, false, h.disableAckFn()) return nil } @@ -473,8 +476,8 @@ type policyChange struct { cfg *config.Config action fleetapi.Action acker acker.Acker - commit bool ackWatcher chan struct{} + disableAck bool } func newPolicyChange( @@ -482,9 +485,10 @@ func newPolicyChange( config *config.Config, action fleetapi.Action, acker acker.Acker, - commit bool) *policyChange { + makeCh bool, + disableAck bool) *policyChange { var ackWatcher chan struct{} - if commit { + if makeCh { // we don't need it otherwise ackWatcher = make(chan struct{}) } @@ -493,8 +497,8 @@ func newPolicyChange( cfg: config, action: action, acker: acker, - commit: true, ackWatcher: ackWatcher, + disableAck: disableAck, } } @@ -502,22 +506,21 @@ func (l *policyChange) Config() *config.Config { return l.cfg } +// Ack sends an ack for the associated action if the results are expected. +// An ack will be sent for UNENROLL actions, or by POLICY_CHANGE actions if it has not been explicitly disabled. func (l *policyChange) Ack() error { - if l.action == nil { + if l.disableAck || l.action == nil { return nil } err := l.acker.Ack(l.ctx, l.action) if err != nil { return err } - if l.commit { - err := l.acker.Commit(l.ctx) - if l.ackWatcher != nil && err == nil { - close(l.ackWatcher) - } - return err + err = l.acker.Commit(l.ctx) + if err == nil && l.ackWatcher != nil { + close(l.ackWatcher) } - return nil + return err } // WaitAck waits for policy change to be acked. @@ -525,7 +528,7 @@ func (l *policyChange) Ack() error { // Caller is responsible to use any reasonable deadline otherwise // function call can be endlessly blocking. func (l *policyChange) WaitAck(ctx context.Context) { - if !l.commit || l.ackWatcher == nil { + if l.ackWatcher == nil { return } diff --git a/internal/pkg/agent/application/actions/handlers/handler_action_policy_change_test.go b/internal/pkg/agent/application/actions/handlers/handler_action_policy_change_test.go index 9fdf7184e89..dd671fb41ac 100644 --- a/internal/pkg/agent/application/actions/handlers/handler_action_policy_change_test.go +++ b/internal/pkg/agent/application/actions/handlers/handler_action_policy_change_test.go @@ -105,7 +105,7 @@ func TestPolicyAcked(t *testing.T) { agentInfo := &info.AgentInfo{} nullStore := &storage.NullStore{} - t.Run("Config change should ACK", func(t *testing.T) { + t.Run("Default: Config changes are ACKed", func(t *testing.T) { ch := make(chan coordinator.ConfigChange, 1) tacker := &testAcker{} @@ -119,6 +119,7 @@ func TestPolicyAcked(t *testing.T) { }, } + // Test default FF value cfg := configuration.DefaultConfiguration() handler := NewPolicyChangeHandler(log, agentInfo, cfg, nullStore, ch, nilLogLevelSet(t), &coordinator.Coordinator{}) @@ -129,9 +130,64 @@ func TestPolicyAcked(t *testing.T) { require.NoError(t, change.Ack()) actions := tacker.Items() - assert.EqualValues(t, 1, len(actions)) + assert.Len(t, actions, 1) assert.Equal(t, actionID, actions[0]) }) + t.Run("Config change acks when forced", func(t *testing.T) { + ch := make(chan coordinator.ConfigChange, 1) + tacker := &testAcker{} + + config := map[string]interface{}{"hello": "world"} + actionID := "abc123" + action := &fleetapi.ActionPolicyChange{ + ActionID: actionID, + ActionType: "POLICY_CHANGE", + Data: fleetapi.ActionPolicyChangeData{ + Policy: config, + }, + } + + cfg := configuration.DefaultConfiguration() + handler := NewPolicyChangeHandler(log, agentInfo, cfg, nullStore, ch, nilLogLevelSet(t), &coordinator.Coordinator{}) + handler.disableAckFn = func() bool { return false } + + err := handler.Handle(context.Background(), action, tacker) + require.NoError(t, err) + + change := <-ch + require.NoError(t, change.Ack()) + + actions := tacker.Items() + assert.Len(t, actions, 1) + assert.Equal(t, actionID, actions[0]) + }) + t.Run("Config change do not ack when disabled", func(t *testing.T) { + ch := make(chan coordinator.ConfigChange, 1) + tacker := &testAcker{} + + config := map[string]interface{}{"hello": "world"} + actionID := "abc123" + action := &fleetapi.ActionPolicyChange{ + ActionID: actionID, + ActionType: "POLICY_CHANGE", + Data: fleetapi.ActionPolicyChangeData{ + Policy: config, + }, + } + + cfg := configuration.DefaultConfiguration() + handler := NewPolicyChangeHandler(log, agentInfo, cfg, nullStore, ch, nilLogLevelSet(t), &coordinator.Coordinator{}) + handler.disableAckFn = func() bool { return true } + + err := handler.Handle(context.Background(), action, tacker) + require.NoError(t, err) + + change := <-ch + require.NoError(t, change.Ack()) + + actions := tacker.Items() + assert.Empty(t, actions) + }) } func TestPolicyChangeHandler_handlePolicyChange_FleetClientSettings(t *testing.T) { diff --git a/internal/pkg/agent/application/actions/handlers/handler_action_unenroll.go b/internal/pkg/agent/application/actions/handlers/handler_action_unenroll.go index 3f5e0cfed99..d05e325b68f 100644 --- a/internal/pkg/agent/application/actions/handlers/handler_action_unenroll.go +++ b/internal/pkg/agent/application/actions/handlers/handler_action_unenroll.go @@ -93,7 +93,7 @@ func (h *Unenroll) handle(ctx context.Context, a fleetapi.Action, acker acker.Ac } // Generate empty policy change, this removing all the running components - unenrollPolicy := newPolicyChange(ctx, config.New(), a, acker, true) + unenrollPolicy := newPolicyChange(ctx, config.New(), a, acker, true, false) h.ch <- unenrollPolicy // backup action for future start to avoid starting fleet gateway loop diff --git a/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go b/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go index 2fd60a79ae6..905ce4186db 100644 --- a/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go +++ b/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go @@ -71,6 +71,7 @@ type stateStore interface { AckToken() string SetAckToken(ackToken string) Save() error + Action() fleetapi.Action } type FleetGateway struct { @@ -356,15 +357,21 @@ func (f *FleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse, // Fix loglevel with the current log level used by coordinator ecsMeta.Elastic.Agent.LogLevel = state.LogLevel.String() + action := f.stateStore.Action() + agentPolicyID := getPolicyID(action) + policyRevisionIDX := getPolicyRevisionIDX(action) + // checkin cmd := fleetapi.NewCheckinCmd(f.agentInfo, f.client) req := &fleetapi.CheckinRequest{ - AckToken: ackToken, - Metadata: ecsMeta, - Status: agentStateToString(state.State), - Message: state.Message, - Components: components, - UpgradeDetails: state.UpgradeDetails, + AckToken: ackToken, + Metadata: ecsMeta, + Status: agentStateToString(state.State), + Message: state.Message, + Components: components, + UpgradeDetails: state.UpgradeDetails, + AgentPolicyID: agentPolicyID, + PolicyRevisionIDX: policyRevisionIDX, } resp, took, err := cmd.Execute(ctx, req) @@ -447,3 +454,43 @@ func RequestBackoff(done <-chan struct{}) backoff.Backoff { defaultFleetBackoffSettings.Max, ) } + +// getPolicyID will check that the passed action is a POLICY_CHANGE action and return the policy_id attribute of the policy as a string. +func getPolicyID(action fleetapi.Action) string { + policyChange, ok := action.(*fleetapi.ActionPolicyChange) + if !ok { + return "" + } + v, ok := policyChange.Data.Policy["policy_id"] + if !ok { + return "" + } + vv, ok := v.(string) + if !ok { + return "" + } + return vv +} + +// getPolicyRevisionIDX will check that the passed action is a POLICY_CHANGE action and return the policy_revision_idx attribute of the policy as an int64. +// The function will attempt to convert the attribute to int64 if int or float64 is used in order to prevent issues from serialization. +func getPolicyRevisionIDX(action fleetapi.Action) int64 { + policyChange, ok := action.(*fleetapi.ActionPolicyChange) + if !ok { + return 0 + } + v, ok := policyChange.Data.Policy["policy_revision_idx"] + if !ok { + return 0 + } + switch vv := v.(type) { + case int64: + return vv + case int: + return int64(vv) + case float64: + return int64(vv) + default: + return 0 + } +} diff --git a/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go b/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go index 217ac1ca457..dfcd40a93b7 100644 --- a/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go +++ b/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go @@ -377,6 +377,74 @@ func TestFleetGateway(t *testing.T) { default: } }) + + t.Run("sends agent_policy_id and policy_revision_idx", func(t *testing.T) { + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + scheduler := scheduler.NewStepper() + client := newTestingClient() + + log, _ := loggertest.New("fleet_gateway") + + stateStore := newStateStore(t, log) + stateStore.SetAction(&fleetapi.ActionPolicyChange{ + ActionID: "test-action-id", + ActionType: fleetapi.ActionTypePolicyChange, + Data: fleetapi.ActionPolicyChangeData{ + Policy: map[string]interface{}{ + "policy_id": "test-policy-id", + "policy_revision_idx": 1, + }, + }, + }) + err := stateStore.Save() + require.NoError(t, err) + + gateway, err := newFleetGatewayWithScheduler( + log, + settings, + agentInfo, + client, + scheduler, + noop.New(), + emptyStateFetcher, + stateStore, + ) + require.NoError(t, err) + + waitFn := ackSeq( + client.Answer(func(headers http.Header, body io.Reader) (*http.Response, error) { + data, err := io.ReadAll(body) + require.NoError(t, err) + + var checkinRequest fleetapi.CheckinRequest + err = json.Unmarshal(data, &checkinRequest) + require.NoError(t, err) + + require.Equal(t, "test-policy-id", checkinRequest.AgentPolicyID) + require.Equal(t, int64(1), checkinRequest.PolicyRevisionIDX) + + resp := wrapStrToResp(http.StatusOK, `{ "actions": [] }`) + return resp, nil + }), + ) + + errCh := runFleetGateway(ctx, gateway) + + // Synchronize scheduler and acking of calls from the worker go routine. + scheduler.Next() + waitFn() + + cancel() + err = <-errCh + require.NoError(t, err) + select { + case actions := <-gateway.Actions(): + t.Errorf("Expected no actions, got %v", actions) + default: + } + }) } func TestRetriesOnFailures(t *testing.T) { diff --git a/internal/pkg/fleetapi/checkin_cmd.go b/internal/pkg/fleetapi/checkin_cmd.go index 16ce9afe671..fb204b6ad3a 100644 --- a/internal/pkg/fleetapi/checkin_cmd.go +++ b/internal/pkg/fleetapi/checkin_cmd.go @@ -41,12 +41,14 @@ type CheckinComponent struct { // CheckinRequest consists of multiple events reported to fleet ui. type CheckinRequest struct { - Status string `json:"status"` - AckToken string `json:"ack_token,omitempty"` - Metadata *info.ECSMeta `json:"local_metadata,omitempty"` - Message string `json:"message"` // V2 Agent message - Components []CheckinComponent `json:"components"` // V2 Agent components - UpgradeDetails *details.Details `json:"upgrade_details,omitempty"` + Status string `json:"status"` + AckToken string `json:"ack_token,omitempty"` + Metadata *info.ECSMeta `json:"local_metadata,omitempty"` + Message string `json:"message"` // V2 Agent message + Components []CheckinComponent `json:"components"` // V2 Agent components + UpgradeDetails *details.Details `json:"upgrade_details,omitempty"` + AgentPolicyID string `json:"agent_policy_id,omitempty"` + PolicyRevisionIDX int64 `json:"policy_revision_idx,omitempty"` } // SerializableEvent is a representation of the event to be send to the Fleet Server API via the checkin diff --git a/pkg/features/features.go b/pkg/features/features.go index 1e547e73a47..3d1ee3a0931 100644 --- a/pkg/features/features.go +++ b/pkg/features/features.go @@ -21,6 +21,10 @@ import ( // 8.11+ - default is enabled const defaultTamperProtection = true +// The default value of the disable policy change acks flag if the flag is missing. +// 9.2 - disabled (acks are sent) +const defaultDisablePolicyChangeAcks = false + var ( current = Flags{ tamperProtection: defaultTamperProtection, @@ -36,7 +40,8 @@ type Flags struct { fqdn bool fqdnCallbacks map[string]BoolValueOnChangeCallback - tamperProtection bool + tamperProtection bool + disablePolicyChangeAcks bool } type cfg struct { @@ -48,6 +53,9 @@ type cfg struct { TamperProtection *struct { Enabled bool `json:"enabled" yaml:"enabled" config:"enabled"` } `json:"tamper_protection,omitempty" yaml:"tamper_protection,omitempty" config:"tamper_protection,omitempty"` + DisablePolicyChangeAcks *struct { + Enabled bool `json:"enabled" yaml:"enabled" config:"enabled"` + } `json:"disable_policy_change_acks" yaml:"disable_policy_change_acks" config:"disable_policy_change_acks"` } `json:"features" yaml:"features" config:"features"` } `json:"agent" yaml:"agent" config:"agent"` } @@ -66,6 +74,13 @@ func (f *Flags) TamperProtection() bool { return f.tamperProtection } +func (f *Flags) DisablePolicyChangeAcks() bool { + f.mu.RLock() + defer f.mu.RUnlock() + + return f.disablePolicyChangeAcks +} + func (f *Flags) AsProto() *proto.Features { return &proto.Features{ Fqdn: &proto.FQDNFeature{ @@ -121,6 +136,13 @@ func (f *Flags) setTamperProtection(newValue bool) { f.tamperProtection = newValue } +func (f *Flags) setDisablePolicyChangeAcks(newValue bool) { + f.mu.Lock() + defer f.mu.Unlock() + + f.disablePolicyChangeAcks = newValue +} + // setSource sets the source from he given cfg. func (f *Flags) setSource(c cfg) error { // Use JSON marshalling-unmarshalling to convert cfg to mapstr @@ -186,6 +208,12 @@ func Parse(policy any) (*Flags, error) { flags.setTamperProtection(defaultTamperProtection) } + if parsedFlags.Agent.Features.DisablePolicyChangeAcks != nil { + flags.setDisablePolicyChangeAcks(parsedFlags.Agent.Features.DisablePolicyChangeAcks.Enabled) + } else { + flags.setDisablePolicyChangeAcks(defaultDisablePolicyChangeAcks) + } + if err := flags.setSource(parsedFlags); err != nil { return nil, fmt.Errorf("error creating feature flags source: %w", err) } @@ -208,6 +236,7 @@ func Apply(c *config.Config) error { current.setFQDN(parsed.FQDN()) current.setTamperProtection(parsed.TamperProtection()) + current.setDisablePolicyChangeAcks(parsed.DisablePolicyChangeAcks()) return err } @@ -220,3 +249,8 @@ func FQDN() bool { func TamperProtection() bool { return current.TamperProtection() } + +// DisablePolicyChangeAcks reports if the agent will stop using ACKs for POLICY_CHANGE actions. +func DisablePolicyChangeAcks() bool { + return current.DisablePolicyChangeAcks() +}