From 4b441a555391a66baf80f4bd0be5b5068ab9c204 Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Tue, 4 Oct 2022 14:11:56 +0200 Subject: [PATCH 1/9] Conditional log level for api key read --- internal/pkg/api/handleAck.go | 33 +++++++++++++++++++++++++++++---- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/internal/pkg/api/handleAck.go b/internal/pkg/api/handleAck.go index 216c4b195..fd6e9a4a6 100644 --- a/internal/pkg/api/handleAck.go +++ b/internal/pkg/api/handleAck.go @@ -20,6 +20,7 @@ import ( "github.com/rs/zerolog" "github.com/rs/zerolog/log" + "github.com/elastic/fleet-server/v7/internal/pkg/apikey" "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/cache" "github.com/elastic/fleet-server/v7/internal/pkg/config" @@ -364,10 +365,18 @@ func (ack *AckT) updateAPIKey(ctx context.Context, if apiKeyID != "" { res, err := ack.bulk.APIKeyRead(ctx, apiKeyID, true) if err != nil { - zlog.Error(). - Err(err). - Str(LogAPIKeyID, apiKeyID). - Msg("Failed to read API Key roles") + if ack.isAPIKeyReadError(ctx, zlog, apiKeyID, err) { + zlog.Error(). + Err(err). + Str(LogAPIKeyID, apiKeyID). + Msg("Failed to read API Key roles") + } else { + // not an error, race when API key was invalidated before acking + zlog.Info(). + Err(err). + Str(LogAPIKeyID, apiKeyID). + Msg("Failed to read invalidated API Key roles") + } } else { clean, removedRolesCount, err := cleanRoles(res.RoleDescriptors) if err != nil { @@ -513,6 +522,22 @@ func (ack *AckT) handleUpgrade(ctx context.Context, zlog zerolog.Logger, agent * return nil } +func (ack *AckT) isAPIKeyReadError(ctx context.Context, zlog zerolog.Logger, apiKeyID string, err error) bool { + if !errors.Is(err, apikey.ErrAPIKeyNotFound) { + return false + } + + agent, err := findAgentByAPIKeyID(ctx, ack.bulk, apiKeyID) + if err != nil { + zlog.Warn(). + Err(err). + Msg("failed to find agent by api key") + return true + } + + return agent.Active // it is a valid error in case agent is active (was not invalidated) +} + // Generate an update script that validates that the policy_id // has not changed underneath us by an upstream process (Kibana or otherwise). // We have a race condition where a user could have assigned a new policy to From b4c976a5c775fee07262482ba6bd89e831ef102b Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Tue, 4 Oct 2022 15:41:47 +0200 Subject: [PATCH 2/9] use agent id to query agent --- internal/pkg/api/handleAck.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/internal/pkg/api/handleAck.go b/internal/pkg/api/handleAck.go index fd6e9a4a6..3cdc94ce6 100644 --- a/internal/pkg/api/handleAck.go +++ b/internal/pkg/api/handleAck.go @@ -365,7 +365,7 @@ func (ack *AckT) updateAPIKey(ctx context.Context, if apiKeyID != "" { res, err := ack.bulk.APIKeyRead(ctx, apiKeyID, true) if err != nil { - if ack.isAPIKeyReadError(ctx, zlog, apiKeyID, err) { + if ack.isAPIKeyReadError(ctx, zlog, agentID, err) { zlog.Error(). Err(err). Str(LogAPIKeyID, apiKeyID). @@ -522,12 +522,11 @@ func (ack *AckT) handleUpgrade(ctx context.Context, zlog zerolog.Logger, agent * return nil } -func (ack *AckT) isAPIKeyReadError(ctx context.Context, zlog zerolog.Logger, apiKeyID string, err error) bool { +func (ack *AckT) isAPIKeyReadError(ctx context.Context, zlog zerolog.Logger, agentID string, err error) bool { if !errors.Is(err, apikey.ErrAPIKeyNotFound) { return false } - - agent, err := findAgentByAPIKeyID(ctx, ack.bulk, apiKeyID) + agent, err := dl.FindAgent(ctx, ack.bulk, dl.QueryAgentByID, dl.FieldID, agentID) if err != nil { zlog.Warn(). Err(err). From a2b6926f41f542bef058bc97ebbe242e9bb0e65f Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Tue, 4 Oct 2022 15:42:46 +0200 Subject: [PATCH 3/9] use agent id to query agent --- internal/pkg/api/handleAck.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/pkg/api/handleAck.go b/internal/pkg/api/handleAck.go index 3cdc94ce6..0d0445523 100644 --- a/internal/pkg/api/handleAck.go +++ b/internal/pkg/api/handleAck.go @@ -530,7 +530,7 @@ func (ack *AckT) isAPIKeyReadError(ctx context.Context, zlog zerolog.Logger, age if err != nil { zlog.Warn(). Err(err). - Msg("failed to find agent by api key") + Msg("failed to find agent by ID") return true } From 0eb699d55d33dc1ac819886b6d6876306f273bbf Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Tue, 4 Oct 2022 16:05:32 +0200 Subject: [PATCH 4/9] stop processing once agent is detected inactive --- internal/pkg/api/handleAck.go | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/internal/pkg/api/handleAck.go b/internal/pkg/api/handleAck.go index 0d0445523..c2e458b8e 100644 --- a/internal/pkg/api/handleAck.go +++ b/internal/pkg/api/handleAck.go @@ -20,7 +20,6 @@ import ( "github.com/rs/zerolog" "github.com/rs/zerolog/log" - "github.com/elastic/fleet-server/v7/internal/pkg/apikey" "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/cache" "github.com/elastic/fleet-server/v7/internal/pkg/config" @@ -336,13 +335,18 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag } for _, output := range agent.Outputs { + if !agent.Active { + // stop processing if agent is inactive + break + } + if output.Type != policy.OutputTypeElasticsearch { continue } err := ack.updateAPIKey(ctx, zlog, - agent.Id, + agent, currRev, currCoord, agent.PolicyID, output.APIKeyID, output.PermissionsHash, output.ToRetireAPIKeyIds) @@ -357,7 +361,7 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag func (ack *AckT) updateAPIKey(ctx context.Context, zlog zerolog.Logger, - agentID string, + agent *model.Agent, currRev, currCoord int64, policyID, apiKeyID, permissionHash string, toRetireAPIKeyIDs []model.ToRetireAPIKeyIdsItems) error { @@ -365,7 +369,7 @@ func (ack *AckT) updateAPIKey(ctx context.Context, if apiKeyID != "" { res, err := ack.bulk.APIKeyRead(ctx, apiKeyID, true) if err != nil { - if ack.isAPIKeyReadError(ctx, zlog, agentID, err) { + if isAgentActive(ctx, zlog, ack.bulk, agent.Id, err) { zlog.Error(). Err(err). Str(LogAPIKeyID, apiKeyID). @@ -376,6 +380,9 @@ func (ack *AckT) updateAPIKey(ctx context.Context, Err(err). Str(LogAPIKeyID, apiKeyID). Msg("Failed to read invalidated API Key roles") + + // update to inactive for future checks + agent.Active = false } } else { clean, removedRolesCount, err := cleanRoles(res.RoleDescriptors) @@ -410,7 +417,7 @@ func (ack *AckT) updateAPIKey(ctx context.Context, err := ack.bulk.Update( ctx, dl.FleetAgents, - agentID, + agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3), @@ -522,11 +529,8 @@ func (ack *AckT) handleUpgrade(ctx context.Context, zlog zerolog.Logger, agent * return nil } -func (ack *AckT) isAPIKeyReadError(ctx context.Context, zlog zerolog.Logger, agentID string, err error) bool { - if !errors.Is(err, apikey.ErrAPIKeyNotFound) { - return false - } - agent, err := dl.FindAgent(ctx, ack.bulk, dl.QueryAgentByID, dl.FieldID, agentID) +func isAgentActive(ctx context.Context, zlog zerolog.Logger, bulk bulk.Bulk, agentID string, err error) bool { + agent, err := dl.FindAgent(ctx, bulk, dl.QueryAgentByID, dl.FieldID, agentID) if err != nil { zlog.Warn(). Err(err). From 284e7e04a9cc8d50c965798e64abc75c673aaa56 Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Tue, 4 Oct 2022 16:11:08 +0200 Subject: [PATCH 5/9] remove unnecessary args --- internal/pkg/api/handleAck.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/internal/pkg/api/handleAck.go b/internal/pkg/api/handleAck.go index c2e458b8e..c3410d88c 100644 --- a/internal/pkg/api/handleAck.go +++ b/internal/pkg/api/handleAck.go @@ -348,7 +348,6 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag zlog, agent, currRev, currCoord, - agent.PolicyID, output.APIKeyID, output.PermissionsHash, output.ToRetireAPIKeyIds) if err != nil { return err @@ -363,7 +362,7 @@ func (ack *AckT) updateAPIKey(ctx context.Context, zlog zerolog.Logger, agent *model.Agent, currRev, currCoord int64, - policyID, apiKeyID, permissionHash string, + apiKeyID, permissionHash string, toRetireAPIKeyIDs []model.ToRetireAPIKeyIdsItems) error { if apiKeyID != "" { @@ -409,7 +408,7 @@ func (ack *AckT) updateAPIKey(ctx context.Context, } body := makeUpdatePolicyBody( - policyID, + agent.PolicyID, currRev, currCoord, ) @@ -424,7 +423,7 @@ func (ack *AckT) updateAPIKey(ctx context.Context, ) zlog.Err(err). - Str(LogPolicyID, policyID). + Str(LogPolicyID, agent.PolicyID). Int64("policyRevision", currRev). Int64("policyCoordinator", currCoord). Msg("ack policy") From 097a24179c30d2e50c574b3eae17604e22d50fa1 Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Tue, 4 Oct 2022 16:12:42 +0200 Subject: [PATCH 6/9] lint --- internal/pkg/api/handleAck.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/pkg/api/handleAck.go b/internal/pkg/api/handleAck.go index c3410d88c..35425273a 100644 --- a/internal/pkg/api/handleAck.go +++ b/internal/pkg/api/handleAck.go @@ -368,7 +368,7 @@ func (ack *AckT) updateAPIKey(ctx context.Context, if apiKeyID != "" { res, err := ack.bulk.APIKeyRead(ctx, apiKeyID, true) if err != nil { - if isAgentActive(ctx, zlog, ack.bulk, agent.Id, err) { + if isAgentActive(ctx, zlog, ack.bulk, agent.Id) { zlog.Error(). Err(err). Str(LogAPIKeyID, apiKeyID). @@ -528,7 +528,7 @@ func (ack *AckT) handleUpgrade(ctx context.Context, zlog zerolog.Logger, agent * return nil } -func isAgentActive(ctx context.Context, zlog zerolog.Logger, bulk bulk.Bulk, agentID string, err error) bool { +func isAgentActive(ctx context.Context, zlog zerolog.Logger, bulk bulk.Bulk, agentID string) bool { agent, err := dl.FindAgent(ctx, bulk, dl.QueryAgentByID, dl.FieldID, agentID) if err != nil { zlog.Warn(). From f2ea7aafa162c7cf7ff1560b7348662a36980adf Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Tue, 4 Oct 2022 18:50:40 +0200 Subject: [PATCH 7/9] check result instead of modifying passed structure --- internal/pkg/api/handleAck.go | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/internal/pkg/api/handleAck.go b/internal/pkg/api/handleAck.go index 35425273a..4bd261a54 100644 --- a/internal/pkg/api/handleAck.go +++ b/internal/pkg/api/handleAck.go @@ -31,6 +31,10 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/smap" ) +var ( + errUpdatingInactiveAgent = errors.New("updating inactive agent") +) + type HTTPError struct { Status int } @@ -335,21 +339,20 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag } for _, output := range agent.Outputs { - if !agent.Active { - // stop processing if agent is inactive - break - } - if output.Type != policy.OutputTypeElasticsearch { continue } err := ack.updateAPIKey(ctx, zlog, - agent, + agent.Id, currRev, currCoord, + agent.PolicyID, output.APIKeyID, output.PermissionsHash, output.ToRetireAPIKeyIds) if err != nil { + if errors.Is(err, errUpdatingInactiveAgent) { + break + } return err } } @@ -360,28 +363,28 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag func (ack *AckT) updateAPIKey(ctx context.Context, zlog zerolog.Logger, - agent *model.Agent, + agentID string, currRev, currCoord int64, - apiKeyID, permissionHash string, + policyID, apiKeyID, permissionHash string, toRetireAPIKeyIDs []model.ToRetireAPIKeyIdsItems) error { if apiKeyID != "" { res, err := ack.bulk.APIKeyRead(ctx, apiKeyID, true) if err != nil { - if isAgentActive(ctx, zlog, ack.bulk, agent.Id) { + if isAgentActive(ctx, zlog, ack.bulk, agentID) { zlog.Error(). Err(err). Str(LogAPIKeyID, apiKeyID). Msg("Failed to read API Key roles") } else { - // not an error, race when API key was invalidated before acking + // race when API key was invalidated before acking zlog.Info(). Err(err). Str(LogAPIKeyID, apiKeyID). Msg("Failed to read invalidated API Key roles") - // update to inactive for future checks - agent.Active = false + // prevents future checks + return errUpdatingInactiveAgent } } else { clean, removedRolesCount, err := cleanRoles(res.RoleDescriptors) @@ -408,7 +411,7 @@ func (ack *AckT) updateAPIKey(ctx context.Context, } body := makeUpdatePolicyBody( - agent.PolicyID, + policyID, currRev, currCoord, ) @@ -416,14 +419,14 @@ func (ack *AckT) updateAPIKey(ctx context.Context, err := ack.bulk.Update( ctx, dl.FleetAgents, - agent.Id, + agentID, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3), ) zlog.Err(err). - Str(LogPolicyID, agent.PolicyID). + Str(LogPolicyID, policyID). Int64("policyRevision", currRev). Int64("policyCoordinator", currCoord). Msg("ack policy") From dddf03928ecc18715fb2453b3a9ebaffb0c684e4 Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Wed, 5 Oct 2022 07:59:54 +0200 Subject: [PATCH 8/9] propagate info to client, decrease log level on api handler --- internal/pkg/api/error.go | 9 +++++++++ internal/pkg/api/handleAck.go | 7 ++----- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/internal/pkg/api/error.go b/internal/pkg/api/error.go index 032fba855..45b90cd3c 100644 --- a/internal/pkg/api/error.go +++ b/internal/pkg/api/error.go @@ -118,6 +118,15 @@ func NewHTTPErrResp(err error) HTTPErrResp { zerolog.InfoLevel, }, }, + { + ErrUpdatingInactiveAgent, + HTTPErrResp{ + http.StatusUnauthorized, + "Unauthorized", + "Agent not active", + zerolog.InfoLevel, + }, + }, } for _, e := range errTable { diff --git a/internal/pkg/api/handleAck.go b/internal/pkg/api/handleAck.go index 4bd261a54..b2123322b 100644 --- a/internal/pkg/api/handleAck.go +++ b/internal/pkg/api/handleAck.go @@ -32,7 +32,7 @@ import ( ) var ( - errUpdatingInactiveAgent = errors.New("updating inactive agent") + ErrUpdatingInactiveAgent = errors.New("updating inactive agent") ) type HTTPError struct { @@ -350,9 +350,6 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag agent.PolicyID, output.APIKeyID, output.PermissionsHash, output.ToRetireAPIKeyIds) if err != nil { - if errors.Is(err, errUpdatingInactiveAgent) { - break - } return err } } @@ -384,7 +381,7 @@ func (ack *AckT) updateAPIKey(ctx context.Context, Msg("Failed to read invalidated API Key roles") // prevents future checks - return errUpdatingInactiveAgent + return ErrUpdatingInactiveAgent } } else { clean, removedRolesCount, err := cleanRoles(res.RoleDescriptors) From 4956bd681d68fab3b1b531dc528ed4f2594c1e7b Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Thu, 6 Oct 2022 11:02:20 +0200 Subject: [PATCH 9/9] error as error --- internal/pkg/api/handleAck.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/pkg/api/handleAck.go b/internal/pkg/api/handleAck.go index b2123322b..b9b4f355c 100644 --- a/internal/pkg/api/handleAck.go +++ b/internal/pkg/api/handleAck.go @@ -531,7 +531,7 @@ func (ack *AckT) handleUpgrade(ctx context.Context, zlog zerolog.Logger, agent * func isAgentActive(ctx context.Context, zlog zerolog.Logger, bulk bulk.Bulk, agentID string) bool { agent, err := dl.FindAgent(ctx, bulk, dl.QueryAgentByID, dl.FieldID, agentID) if err != nil { - zlog.Warn(). + zlog.Error(). Err(err). Msg("failed to find agent by ID") return true