From ef9ca2bd78641e48e8ac2356989dfbfeebc4efa6 Mon Sep 17 00:00:00 2001 From: Anderson Queiroz Date: Mon, 19 Sep 2022 17:16:49 +0200 Subject: [PATCH] Revert "Fix v8.5.0 migration painless script" (#1878) * Revert "Fix v8.5.0 migration painless script (#1839)" This reverts commit de5d74bbb244baf93761eb375af8e7f48e531013. * Revert "Allow multiple ES outputs as long as they are the same ES (#1684)" This reverts commit 63fdcbf6fd730b2f9dc5c84443c07c7bd9a859da. --- cmd/fleet/main.go | 14 +- internal/pkg/api/handleAck.go | 49 +-- internal/pkg/api/handleAck_test.go | 39 +-- internal/pkg/api/handleCheckin.go | 15 +- internal/pkg/api/handleEnroll.go | 11 +- internal/pkg/apikey/apikey.go | 61 ---- .../pkg/apikey/apikey_integration_test.go | 103 ++---- internal/pkg/apikey/create.go | 1 + internal/pkg/apikey/get.go | 68 ++++ internal/pkg/apikey/invalidate.go | 1 + internal/pkg/apikey/metadata.go | 20 +- internal/pkg/bulk/opBulk.go | 16 +- internal/pkg/coordinator/monitor.go | 17 +- .../coordinator/monitor_integration_test.go | 23 +- internal/pkg/dl/agent.go | 22 +- internal/pkg/dl/agent_integration_test.go | 45 --- internal/pkg/dl/constants.go | 32 +- internal/pkg/dl/migration.go | 290 +++++------------ internal/pkg/dl/migration_integration_test.go | 295 ------------------ internal/pkg/es/error.go | 18 +- internal/pkg/model/ext.go | 28 +- internal/pkg/model/ext_test.go | 55 +--- internal/pkg/model/schema.go | 53 +--- internal/pkg/policy/parsed_policy.go | 10 +- internal/pkg/policy/parsed_policy_test.go | 1 + internal/pkg/policy/policy_output.go | 244 ++++++--------- .../policy/policy_output_integration_test.go | 127 -------- internal/pkg/policy/policy_output_test.go | 137 ++------ internal/pkg/testing/esutil/bootstrap.go | 2 +- internal/pkg/testing/setup.go | 16 +- model/schema.json | 89 ++---- 31 files changed, 457 insertions(+), 1445 deletions(-) create mode 100644 internal/pkg/apikey/get.go delete mode 100644 internal/pkg/dl/migration_integration_test.go delete mode 100644 internal/pkg/policy/policy_output_integration_test.go diff --git a/cmd/fleet/main.go b/cmd/fleet/main.go index 976564cb8..a28ec50c3 100644 --- a/cmd/fleet/main.go +++ b/cmd/fleet/main.go @@ -821,21 +821,17 @@ func (f *FleetServer) runSubsystems(ctx context.Context, cfg *config.Config, g * remoteVersion, err := ver.CheckCompatibility(ctx, esCli, f.bi.Version) if err != nil { if len(remoteVersion) != 0 { - return fmt.Errorf("failed version compatibility check with elasticsearch (Agent: %s, Elasticsearch: %s): %w", - f.bi.Version, remoteVersion, err) + return fmt.Errorf("failed version compatibility check with elasticsearch (Agent: %s, Elasticsearch: %s): %w", f.bi.Version, remoteVersion, err) } return fmt.Errorf("failed version compatibility check with elasticsearch: %w", err) } - // Run migrations - loggedMigration := loggedRunFunc(ctx, "Migrations", func(ctx context.Context) error { + // Run migrations; current safe to do in background. That may change in the future. + g.Go(loggedRunFunc(ctx, "Migrations", func(ctx context.Context) error { return dl.Migrate(ctx, bulker) - }) - if err = loggedMigration(); err != nil { - return fmt.Errorf("failed to run subsystems: %w", err) - } + })) - // Run scheduler for periodic GC/cleanup + // Run schduler for periodic GC/cleanup gcCfg := cfg.Inputs[0].Server.GC sched, err := scheduler.New(gc.Schedules(bulker, gcCfg.ScheduleInterval, gcCfg.CleanupAfterExpiredInterval)) if err != nil { diff --git a/internal/pkg/api/handleAck.go b/internal/pkg/api/handleAck.go index 1261d8f6c..b3febcfd8 100644 --- a/internal/pkg/api/handleAck.go +++ b/internal/pkg/api/handleAck.go @@ -15,8 +15,6 @@ import ( "strings" "time" - "github.com/pkg/errors" - "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/cache" "github.com/elastic/fleet-server/v7/internal/pkg/config" @@ -26,6 +24,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/logger" "github.com/elastic/fleet-server/v7/internal/pkg/model" "github.com/elastic/fleet-server/v7/internal/pkg/policy" + "github.com/pkg/errors" "github.com/julienschmidt/httprouter" "github.com/rs/zerolog" @@ -338,9 +337,8 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag Int64("rev.coordinatorIdx", rev.CoordinatorIdx). Msg("ack policy revision") - if ok && rev.PolicyID == agent.PolicyID && - (rev.RevisionIdx > currRev || - (rev.RevisionIdx == currRev && rev.CoordinatorIdx > currCoord)) { + if ok && rev.PolicyID == agent.PolicyID && (rev.RevisionIdx > currRev || + (rev.RevisionIdx == currRev && rev.CoordinatorIdx > currCoord)) { found = true currRev = rev.RevisionIdx currCoord = rev.CoordinatorIdx @@ -351,7 +349,17 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag return nil } - ack.invalidateAPIKeys(ctx, agent) + sz := len(agent.DefaultAPIKeyHistory) + if sz > 0 { + ids := make([]string, sz) + for i := 0; i < sz; i++ { + ids[i] = agent.DefaultAPIKeyHistory[i].ID + } + log.Info().Strs("ids", ids).Msg("Invalidate old API keys") + if err := ack.bulk.APIKeyInvalidate(ctx, ids...); err != nil { + log.Info().Err(err).Strs("ids", ids).Msg("Failed to invalidate API keys") + } + } body := makeUpdatePolicyBody( agent.PolicyID, @@ -377,24 +385,8 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag return errors.Wrap(err, "handlePolicyChange update") } -func (ack *AckT) invalidateAPIKeys(ctx context.Context, agent *model.Agent) { - var ids []string - for _, out := range agent.Outputs { - for _, k := range out.ToRetireAPIKeyIds { - ids = append(ids, k.ID) - } - } - - if len(ids) > 0 { - log.Info().Strs("fleet.policy.apiKeyIDsToRetire", ids).Msg("Invalidate old API keys") - if err := ack.bulk.APIKeyInvalidate(ctx, ids...); err != nil { - log.Info().Err(err).Strs("ids", ids).Msg("Failed to invalidate API keys") - } - } -} - func (ack *AckT) handleUnenroll(ctx context.Context, zlog zerolog.Logger, agent *model.Agent) error { - apiKeys := agent.APIKeyIDs() + apiKeys := _getAPIKeyIDs(agent) if len(apiKeys) > 0 { zlog = zlog.With().Strs(LogAPIKeyID, apiKeys).Logger() @@ -449,6 +441,17 @@ func (ack *AckT) handleUpgrade(ctx context.Context, zlog zerolog.Logger, agent * return nil } +func _getAPIKeyIDs(agent *model.Agent) []string { + keys := make([]string, 0, 1) + if agent.AccessAPIKeyID != "" { + keys = append(keys, agent.AccessAPIKeyID) + } + if agent.DefaultAPIKeyID != "" { + keys = append(keys, agent.DefaultAPIKeyID) + } + return keys +} + // Generate an update script that validates that the policy_id // has not changed underneath us by an upstream process (Kibana or otherwise). // We have a race condition where a user could have assigned a new policy to diff --git a/internal/pkg/api/handleAck_test.go b/internal/pkg/api/handleAck_test.go index 60a265bd4..90c961456 100644 --- a/internal/pkg/api/handleAck_test.go +++ b/internal/pkg/api/handleAck_test.go @@ -15,14 +15,13 @@ import ( "net/http" "testing" - "github.com/google/go-cmp/cmp" - "github.com/elastic/fleet-server/v7/internal/pkg/cache" "github.com/elastic/fleet-server/v7/internal/pkg/config" "github.com/elastic/fleet-server/v7/internal/pkg/es" "github.com/elastic/fleet-server/v7/internal/pkg/model" ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" + "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -440,39 +439,3 @@ func TestHandleAckEvents(t *testing.T) { }) } } - -func TestInvalidateAPIKeys(t *testing.T) { - toRetire1 := []model.ToRetireAPIKeyIdsItems{{ - ID: "toRetire1", - }} - toRetire2 := []model.ToRetireAPIKeyIdsItems{{ - ID: "toRetire2_0", - }, { - ID: "toRetire2_1", - }} - var toRetire3 []model.ToRetireAPIKeyIdsItems - - want := []string{"toRetire1", "toRetire2_0", "toRetire2_1"} - - agent := model.Agent{ - Outputs: map[string]*model.PolicyOutput{ - "1": {ToRetireAPIKeyIds: toRetire1}, - "2": {ToRetireAPIKeyIds: toRetire2}, - "3": {ToRetireAPIKeyIds: toRetire3}, - }, - } - - bulker := ftesting.NewMockBulk() - bulker.On("APIKeyInvalidate", - context.Background(), mock.MatchedBy(func(ids []string) bool { - // if A contains B and B contains A => A = B - return assert.Subset(t, ids, want) && - assert.Subset(t, want, ids) - })). - Return(nil) - - ack := &AckT{bulk: bulker} - ack.invalidateAPIKeys(context.Background(), &agent) - - bulker.AssertExpectations(t) -} diff --git a/internal/pkg/api/handleCheckin.go b/internal/pkg/api/handleCheckin.go index 2752dd147..721ee4538 100644 --- a/internal/pkg/api/handleCheckin.go +++ b/internal/pkg/api/handleCheckin.go @@ -10,7 +10,6 @@ import ( "compress/gzip" "context" "encoding/json" - "fmt" "math/rand" "net/http" "reflect" @@ -61,6 +60,7 @@ func (rt Router) handleCheckin(w http.ResponseWriter, r *http.Request, ps httpro Logger() err := rt.ct.handleCheckin(&zlog, w, r, id) + if err != nil { cntCheckin.IncError(err) resp := NewHTTPErrResp(err) @@ -430,13 +430,13 @@ func convertActions(agentID string, actions []model.Action) ([]ActionResp, strin // func processPolicy(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, agentID string, pp *policy.ParsedPolicy) (*ActionResp, error) { zlog = zlog.With(). - Str("fleet.ctx", "processPolicy"). - Int64("fleet.policyRevision", pp.Policy.RevisionIdx). - Int64("fleet.policyCoordinator", pp.Policy.CoordinatorIdx). + Str("ctx", "processPolicy"). + Int64("policyRevision", pp.Policy.RevisionIdx). + Int64("policyCoordinator", pp.Policy.CoordinatorIdx). Str(LogPolicyID, pp.Policy.PolicyID). Logger() - // Repull and decode the agent object. Do not trust the cache. + // Repull and decode the agent object. Do not trust the cache. agent, err := dl.FindAgent(ctx, bulker, dl.QueryAgentByID, dl.FieldID, agentID) if err != nil { zlog.Error().Err(err).Msg("fail find agent record") @@ -446,6 +446,7 @@ func processPolicy(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a // Parse the outputs maps in order to prepare the outputs const outputsProperty = "outputs" outputs, err := smap.Parse(pp.Fields[outputsProperty]) + if err != nil { return nil, err } @@ -457,9 +458,9 @@ func processPolicy(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a // Iterate through the policy outputs and prepare them for _, policyOutput := range pp.Outputs { err = policyOutput.Prepare(ctx, zlog, bulker, &agent, outputs) + if err != nil { - return nil, fmt.Errorf("failed to prepare output %q: %w", - policyOutput.Name, err) + return nil, err } } diff --git a/internal/pkg/api/handleEnroll.go b/internal/pkg/api/handleEnroll.go index 7c5b1dd5a..a3c2f9833 100644 --- a/internal/pkg/api/handleEnroll.go +++ b/internal/pkg/api/handleEnroll.go @@ -53,6 +53,7 @@ type EnrollerT struct { } func NewEnrollerT(verCon version.Constraints, cfg *config.Server, bulker bulk.Bulk, c cache.Cache) (*EnrollerT, error) { + log.Info(). Interface("limits", cfg.Limits.EnrollLimit). Msg("Setting config enroll_limit") @@ -186,13 +187,7 @@ func (et *EnrollerT) processRequest(rb *rollback.Rollback, zlog zerolog.Logger, return et._enroll(r.Context(), rb, zlog, req, erec.PolicyID, ver) } -func (et *EnrollerT) _enroll( - ctx context.Context, - rb *rollback.Rollback, - zlog zerolog.Logger, - req *EnrollRequest, - policyID, - ver string) (*EnrollResponse, error) { +func (et *EnrollerT) _enroll(ctx context.Context, rb *rollback.Rollback, zlog zerolog.Logger, req *EnrollRequest, policyID, ver string) (*EnrollResponse, error) { if req.SharedID != "" { // TODO: Support pre-existing install @@ -432,7 +427,7 @@ func generateAccessAPIKey(ctx context.Context, bulk bulk.Bulk, agentID string) ( agentID, "", []byte(kFleetAccessRolesJSON), - apikey.NewMetadata(agentID, "", apikey.TypeAccess), + apikey.NewMetadata(agentID, apikey.TypeAccess), ) } diff --git a/internal/pkg/apikey/apikey.go b/internal/pkg/apikey/apikey.go index 4134f2b0d..4924a647b 100644 --- a/internal/pkg/apikey/apikey.go +++ b/internal/pkg/apikey/apikey.go @@ -6,18 +6,12 @@ package apikey import ( - "bytes" - "context" "encoding/base64" - "encoding/json" "errors" "fmt" "net/http" "strings" "unicode/utf8" - - "github.com/elastic/go-elasticsearch/v7" - "github.com/elastic/go-elasticsearch/v7/esapi" ) const ( @@ -34,61 +28,6 @@ var ( var AuthKey = http.CanonicalHeaderKey("Authorization") -// APIKeyMetadata tracks Metadata associated with an APIKey. -type APIKeyMetadata struct { - ID string - Metadata Metadata -} - -// Read gathers APIKeyMetadata from Elasticsearch using the given client. -func Read(ctx context.Context, client *elasticsearch.Client, id string) (*APIKeyMetadata, error) { - opts := []func(*esapi.SecurityGetAPIKeyRequest){ - client.Security.GetAPIKey.WithContext(ctx), - client.Security.GetAPIKey.WithID(id), - } - - res, err := client.Security.GetAPIKey( - opts..., - ) - if err != nil { - return nil, fmt.Errorf("request to elasticsearch failed: %w", err) - } - defer res.Body.Close() - - if res.IsError() { - return nil, fmt.Errorf("%s: %w", res.String(), ErrAPIKeyNotFound) - } - - type APIKeyResponse struct { - ID string `json:"id"` - Metadata Metadata `json:"metadata"` - } - type GetAPIKeyResponse struct { - APIKeys []APIKeyResponse `json:"api_keys"` - } - - var buff bytes.Buffer - if _, err := buff.ReadFrom(res.Body); err != nil { - return nil, fmt.Errorf("could not read from response body: %w", err) - } - - var resp GetAPIKeyResponse - if err = json.Unmarshal(buff.Bytes(), &resp); err != nil { - return nil, fmt.Errorf( - "could not Unmarshal elasticsearch GetAPIKeyResponse: %w", err) - } - - if len(resp.APIKeys) == 0 { - return nil, ErrAPIKeyNotFound - } - - first := resp.APIKeys[0] - return &APIKeyMetadata{ - ID: first.ID, - Metadata: first.Metadata, - }, nil -} - // APIKey is used to represent an Elasticsearch API Key. type APIKey struct { ID string diff --git a/internal/pkg/apikey/apikey_integration_test.go b/internal/pkg/apikey/apikey_integration_test.go index 72f410d99..5c4e3b97c 100644 --- a/internal/pkg/apikey/apikey_integration_test.go +++ b/internal/pkg/apikey/apikey_integration_test.go @@ -30,7 +30,7 @@ const testFleetRoles = ` } ` -func TestRead(t *testing.T) { +func TestCreateAPIKeyWithMetadata(t *testing.T) { ctx, cn := context.WithCancel(context.Background()) defer cn() @@ -44,83 +44,44 @@ func TestRead(t *testing.T) { t.Fatal(err) } - // Try to get the key that doesn't exist, expect ErrApiKeyNotFound - _, err = Read(ctx, es, "0000000000000") - if !errors.Is(err, ErrAPIKeyNotFound) { - t.Errorf("Unexpected error type: %v", err) + // Create the key + agentID := uuid.Must(uuid.NewV4()).String() + name := uuid.Must(uuid.NewV4()).String() + akey, err := Create(ctx, es, name, "", "true", []byte(testFleetRoles), + NewMetadata(agentID, TypeAccess)) + if err != nil { + t.Fatal(err) } -} -func TestCreateAPIKeyWithMetadata(t *testing.T) { - tts := []struct { - name string - outputName string - }{ - {name: "with metadata.output_name", outputName: "a_output_name"}, - {name: "without metadata.output_name"}, + // Get the key and verify that metadata was saved correctly + aKeyMeta, err := Read(ctx, es, akey.ID) + if err != nil { + t.Fatal(err) } - for _, tt := range tts { - t.Run(tt.name, func(t *testing.T) { - ctx, cn := context.WithCancel(context.Background()) - defer cn() - - cfg := elasticsearch.Config{ - Username: "elastic", - Password: "changeme", - } - - es, err := elasticsearch.NewClient(cfg) - if err != nil { - t.Fatal(err) - } - - // Create the API key - agentID := uuid.Must(uuid.NewV4()).String() - name := uuid.Must(uuid.NewV4()).String() - outputName := tt.outputName - apiKey, err := Create( - ctx, - es, - name, - "", - "true", - []byte(testFleetRoles), - NewMetadata(agentID, outputName, TypeAccess)) - if err != nil { - t.Fatal(err) - } - - // Get the API key and verify that the metadata was saved correctly - aKeyMeta, err := Read(ctx, es, apiKey.ID) - if err != nil { - t.Fatal(err) - } - - diff := cmp.Diff(ManagedByFleetServer, aKeyMeta.Metadata.ManagedBy) - if diff != "" { - t.Error(diff) - } + diff := cmp.Diff(ManagedByFleetServer, aKeyMeta.Metadata.ManagedBy) + if diff != "" { + t.Error(diff) + } - diff = cmp.Diff(true, aKeyMeta.Metadata.Managed) - if diff != "" { - t.Error(diff) - } + diff = cmp.Diff(true, aKeyMeta.Metadata.Managed) + if diff != "" { + t.Error(diff) + } - diff = cmp.Diff(agentID, aKeyMeta.Metadata.AgentID) - if diff != "" { - t.Error(diff) - } + diff = cmp.Diff(agentID, aKeyMeta.Metadata.AgentID) + if diff != "" { + t.Error(diff) + } - diff = cmp.Diff(outputName, aKeyMeta.Metadata.OutputName) - if diff != "" { - t.Error(diff) - } + diff = cmp.Diff(TypeAccess.String(), aKeyMeta.Metadata.Type) + if diff != "" { + t.Error(diff) + } - diff = cmp.Diff(TypeAccess.String(), aKeyMeta.Metadata.Type) - if diff != "" { - t.Error(diff) - } - }) + // Try to get the key that doesn't exists, expect ErrApiKeyNotFound + _, err = Read(ctx, es, "0000000000000") + if !errors.Is(err, ErrAPIKeyNotFound) { + t.Errorf("Unexpected error type: %v", err) } } diff --git a/internal/pkg/apikey/create.go b/internal/pkg/apikey/create.go index de61390c3..f3cee99f8 100644 --- a/internal/pkg/apikey/create.go +++ b/internal/pkg/apikey/create.go @@ -42,6 +42,7 @@ func Create(ctx context.Context, client *elasticsearch.Client, name, ttl, refres bytes.NewReader(body), opts..., ) + if err != nil { return nil, err } diff --git a/internal/pkg/apikey/get.go b/internal/pkg/apikey/get.go new file mode 100644 index 000000000..5d931c670 --- /dev/null +++ b/internal/pkg/apikey/get.go @@ -0,0 +1,68 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package apikey + +import ( + "context" + "encoding/json" + + "github.com/elastic/go-elasticsearch/v7" + "github.com/elastic/go-elasticsearch/v7/esapi" + "github.com/pkg/errors" +) + +// APIKetMetadata tracks Metadata associated with an APIKey. +type APIKeyMetadata struct { + ID string + Metadata Metadata +} + +// Read gathers APIKeyMetadata from Elasticsearch using the given client. +func Read(ctx context.Context, client *elasticsearch.Client, id string) (*APIKeyMetadata, error) { + + opts := []func(*esapi.SecurityGetAPIKeyRequest){ + client.Security.GetAPIKey.WithContext(ctx), + client.Security.GetAPIKey.WithID(id), + } + + res, err := client.Security.GetAPIKey( + opts..., + ) + + if err != nil { + return nil, err + } + defer res.Body.Close() + + if res.IsError() { + err = errors.Wrap(ErrAPIKeyNotFound, res.String()) + return nil, err + } + + type APIKeyResponse struct { + ID string `json:"id"` + Metadata Metadata `json:"metadata"` + } + type GetAPIKeyResponse struct { + APIKeys []APIKeyResponse `json:"api_keys"` + } + + var resp GetAPIKeyResponse + d := json.NewDecoder(res.Body) + if err = d.Decode(&resp); err != nil { + return nil, err + } + + if len(resp.APIKeys) == 0 { + return nil, ErrAPIKeyNotFound + } + + first := resp.APIKeys[0] + + return &APIKeyMetadata{ + ID: first.ID, + Metadata: first.Metadata, + }, nil +} diff --git a/internal/pkg/apikey/invalidate.go b/internal/pkg/apikey/invalidate.go index 6c5d5d304..421662388 100644 --- a/internal/pkg/apikey/invalidate.go +++ b/internal/pkg/apikey/invalidate.go @@ -38,6 +38,7 @@ func Invalidate(ctx context.Context, client *elasticsearch.Client, ids ...string bytes.NewReader(body), opts..., ) + if err != nil { return fmt.Errorf("InvalidateAPIKey: %w", err) } diff --git a/internal/pkg/apikey/metadata.go b/internal/pkg/apikey/metadata.go index d00380c01..c80997c7b 100644 --- a/internal/pkg/apikey/metadata.go +++ b/internal/pkg/apikey/metadata.go @@ -19,20 +19,18 @@ func (t Type) String() string { // Metadata is additional information associated with an APIKey. type Metadata struct { - AgentID string `json:"agent_id,omitempty"` - Managed bool `json:"managed,omitempty"` - ManagedBy string `json:"managed_by,omitempty"` - OutputName string `json:"output_name,omitempty"` - Type string `json:"type,omitempty"` + AgentID string `json:"agent_id,omitempty"` + Managed bool `json:"managed,omitempty"` + ManagedBy string `json:"managed_by,omitempty"` + Type string `json:"type,omitempty"` } // NewMetadata returns Metadata for the given agentID. -func NewMetadata(agentID string, outputName string, typ Type) Metadata { +func NewMetadata(agentID string, typ Type) Metadata { return Metadata{ - AgentID: agentID, - Managed: true, - ManagedBy: ManagedByFleetServer, - OutputName: outputName, - Type: typ.String(), + AgentID: agentID, + Managed: true, + ManagedBy: ManagedByFleetServer, + Type: typ.String(), } } diff --git a/internal/pkg/bulk/opBulk.go b/internal/pkg/bulk/opBulk.go index d47ba9592..50b2c47e0 100644 --- a/internal/pkg/bulk/opBulk.go +++ b/internal/pkg/bulk/opBulk.go @@ -7,7 +7,6 @@ package bulk import ( "bytes" "context" - "errors" "fmt" "time" @@ -188,6 +187,7 @@ func (b *Bulker) flushBulk(ctx context.Context, queue queueT) error { } res, err := req.Do(ctx, b.es) + if err != nil { log.Error().Err(err).Str("mod", kModBulk).Msg("Fail BulkRequest req.Do") return err @@ -217,18 +217,12 @@ func (b *Bulker) flushBulk(ctx context.Context, queue queueT) error { var blk bulkIndexerResponse blk.Items = make([]bulkStubItem, 0, queueCnt) - // TODO: We're loosing information abut the errors, we should check a way - // to return the full error ES returns if err = easyjson.Unmarshal(buf.Bytes(), &blk); err != nil { - log.Err(err). + log.Error(). + Err(err). Str("mod", kModBulk). - Msg("flushBulk failed, could not unmarshal ES response") - return fmt.Errorf("flushBulk failed, could not unmarshal ES response: %w", err) - } - if blk.HasErrors { - // We lack information to properly correlate this error with what has failed. - // Thus, for now it'd be more noise than information outside an investigation. - log.Debug().Err(errors.New(buf.String())).Msg("Bulk call: Es returned an error") + Msg("Unmarshal error") + return err } log.Trace(). diff --git a/internal/pkg/coordinator/monitor.go b/internal/pkg/coordinator/monitor.go index 2242305f8..53870e58e 100644 --- a/internal/pkg/coordinator/monitor.go +++ b/internal/pkg/coordinator/monitor.go @@ -508,7 +508,7 @@ func runUnenroller(ctx context.Context, bulker bulk.Bulk, policyID string, unenr func runUnenrollerWork(ctx context.Context, bulker bulk.Bulk, policyID string, unenrollTimeout time.Duration, zlog zerolog.Logger, agentsIndex string) error { agents, err := dl.FindOfflineAgents(ctx, bulker, policyID, unenrollTimeout, dl.WithIndexName(agentsIndex)) - if err != nil { + if err != nil || len(agents) == 0 { return err } @@ -540,13 +540,11 @@ func unenrollAgent(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a dl.FieldUnenrolledReason: unenrolledReasonTimeout, dl.FieldUpdatedAt: now, } - body, err := fields.Marshal() if err != nil { return err } - - apiKeys := agent.APIKeyIDs() + apiKeys := getAPIKeyIDs(agent) zlog = zlog.With(). Str(logger.AgentID, agent.Id). @@ -569,6 +567,17 @@ func unenrollAgent(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a return err } +func getAPIKeyIDs(agent *model.Agent) []string { + keys := make([]string, 0, 1) + if agent.AccessAPIKeyID != "" { + keys = append(keys, agent.AccessAPIKeyID) + } + if agent.DefaultAPIKeyID != "" { + keys = append(keys, agent.DefaultAPIKeyID) + } + return keys +} + func waitWithContext(ctx context.Context, to time.Duration) error { t := time.NewTimer(to) defer t.Stop() diff --git a/internal/pkg/coordinator/monitor_integration_test.go b/internal/pkg/coordinator/monitor_integration_test.go index defc4a9c7..ffef699d1 100644 --- a/internal/pkg/coordinator/monitor_integration_test.go +++ b/internal/pkg/coordinator/monitor_integration_test.go @@ -159,7 +159,7 @@ func TestMonitorUnenroller(t *testing.T) { agentID, "", []byte(""), - apikey.NewMetadata(agentID, "", apikey.TypeAccess), + apikey.NewMetadata(agentID, apikey.TypeAccess), ) require.NoError(t, err) outputKey, err := bulker.APIKeyCreate( @@ -167,21 +167,20 @@ func TestMonitorUnenroller(t *testing.T) { agentID, "", []byte(""), - apikey.NewMetadata(agentID, "default", apikey.TypeAccess), + apikey.NewMetadata(agentID, apikey.TypeAccess), ) require.NoError(t, err) // add agent that should be unenrolled sixAgo := time.Now().UTC().Add(-6 * time.Minute) agentBody, err := json.Marshal(model.Agent{ - AccessAPIKeyID: accessKey.ID, - Outputs: map[string]*model.PolicyOutput{ - "default": {APIKeyID: outputKey.ID}}, - Active: true, - EnrolledAt: sixAgo.Format(time.RFC3339), - LastCheckin: sixAgo.Format(time.RFC3339), - PolicyID: policy1Id, - UpdatedAt: sixAgo.Format(time.RFC3339), + AccessAPIKeyID: accessKey.ID, + DefaultAPIKeyID: outputKey.ID, + Active: true, + EnrolledAt: sixAgo.Format(time.RFC3339), + LastCheckin: sixAgo.Format(time.RFC3339), + PolicyID: policy1Id, + UpdatedAt: sixAgo.Format(time.RFC3339), }) require.NoError(t, err) _, err = bulker.Create(ctx, agentsIndex, agentID, agentBody) @@ -307,7 +306,7 @@ func TestMonitorUnenrollerSetAndClear(t *testing.T) { agentID, "", []byte(""), - apikey.NewMetadata(agentID, "", apikey.TypeAccess), + apikey.NewMetadata(agentID, apikey.TypeAccess), ) require.NoError(t, err) outputKey, err := bulker.APIKeyCreate( @@ -315,7 +314,7 @@ func TestMonitorUnenrollerSetAndClear(t *testing.T) { agentID, "", []byte(""), - apikey.NewMetadata(agentID, "default", apikey.TypeAccess), + apikey.NewMetadata(agentID, apikey.TypeAccess), ) require.NoError(t, err) diff --git a/internal/pkg/dl/agent.go b/internal/pkg/dl/agent.go index a4871fa73..1d52082f7 100644 --- a/internal/pkg/dl/agent.go +++ b/internal/pkg/dl/agent.go @@ -6,7 +6,6 @@ package dl import ( "context" - "fmt" "time" "github.com/elastic/fleet-server/v7/internal/pkg/bulk" @@ -49,23 +48,19 @@ func prepareOfflineAgentsByPolicyID() *dsl.Tmpl { return tmpl } -func FindAgent(ctx context.Context, bulker bulk.Bulk, tmpl *dsl.Tmpl, name string, v interface{}, opt ...Option) (model.Agent, error) { +func FindAgent(ctx context.Context, bulker bulk.Bulk, tmpl *dsl.Tmpl, name string, v interface{}, opt ...Option) (agent model.Agent, err error) { o := newOption(FleetAgents, opt...) res, err := SearchWithOneParam(ctx, bulker, tmpl, o.indexName, name, v) if err != nil { - return model.Agent{}, fmt.Errorf("failed searching for agent: %w", err) + return } if len(res.Hits) == 0 { - return model.Agent{}, ErrNotFound + return agent, ErrNotFound } - var agent model.Agent - if err = res.Hits[0].Unmarshal(&agent); err != nil { - return model.Agent{}, fmt.Errorf("could not unmarshal ES document into model.Agent: %w", err) - } - - return agent, nil + err = res.Hits[0].Unmarshal(&agent) + return agent, err } func FindOfflineAgents(ctx context.Context, bulker bulk.Bulk, policyID string, unenrollTimeout time.Duration, opt ...Option) ([]model.Agent, error) { @@ -76,19 +71,18 @@ func FindOfflineAgents(ctx context.Context, bulker bulk.Bulk, policyID string, u FieldLastCheckin: past, }) if err != nil { - return nil, fmt.Errorf("failed searching for agent: %w", err) + return nil, err } if len(res.Hits) == 0 { - return nil, ErrNotFound + return nil, nil } agents := make([]model.Agent, len(res.Hits)) for i, hit := range res.Hits { if err := hit.Unmarshal(&agents[i]); err != nil { - return nil, fmt.Errorf("could not unmarshal ES document into model.Agent: %w", err) + return nil, err } } - return agents, nil } diff --git a/internal/pkg/dl/agent_integration_test.go b/internal/pkg/dl/agent_integration_test.go index 3baab6c7e..4e65ddb94 100644 --- a/internal/pkg/dl/agent_integration_test.go +++ b/internal/pkg/dl/agent_integration_test.go @@ -108,48 +108,3 @@ func TestFindOfflineAgents(t *testing.T) { require.Len(t, agents, 2) assert.EqualValues(t, []string{twoDayOldID, threeDayOldID}, []string{agents[0].Id, agents[1].Id}) } - -func TestFindAgent_NewModel(t *testing.T) { - index, bulker := ftesting.SetupCleanIndex(context.Background(), t, FleetAgents) - - now := time.Now().UTC() - nowStr := now.Format(time.RFC3339) - - policyID := uuid.Must(uuid.NewV4()).String() - agentID := uuid.Must(uuid.NewV4()).String() - - wantOutputs := map[string]*model.PolicyOutput{ - "default": { - Type: "elasticsearch", - APIKey: "TestFindNewModelAgent_APIKey", - ToRetireAPIKeyIds: []model.ToRetireAPIKeyIdsItems{ - { - ID: "TestFindNewModelAgent_APIKeyID_invalidated", - RetiredAt: "TestFindNewModelAgent_APIKeyID_invalidated_at"}, - }, - APIKeyID: "TestFindNewModelAgent_APIKeyID", - PermissionsHash: "TestFindNewModelAgent_PermisPolicysionsHash", - }, - } - body, err := json.Marshal(model.Agent{ - PolicyID: policyID, - Active: true, - LastCheckin: nowStr, - LastCheckinStatus: "", - UpdatedAt: nowStr, - EnrolledAt: nowStr, - Outputs: wantOutputs, - }) - require.NoError(t, err) - - _, err = bulker.Create( - context.Background(), index, agentID, body, bulk.WithRefresh()) - require.NoError(t, err) - - agent, err := FindAgent( - context.Background(), bulker, QueryAgentByID, FieldID, agentID, WithIndexName(index)) - require.NoError(t, err) - - assert.Equal(t, agentID, agent.Id) - assert.Equal(t, wantOutputs, agent.Outputs) -} diff --git a/internal/pkg/dl/constants.go b/internal/pkg/dl/constants.go index 034c23541..9fb0614da 100644 --- a/internal/pkg/dl/constants.go +++ b/internal/pkg/dl/constants.go @@ -27,22 +27,22 @@ const ( FieldMaxSeqNo = "max_seq_no" FieldActionSeqNo = "action_seq_no" - FieldActionID = "action_id" - FieldAgent = "agent" - FieldAgentVersion = "version" - FieldCoordinatorIdx = "coordinator_idx" - FieldLastCheckin = "last_checkin" - FieldLastCheckinStatus = "last_checkin_status" - FieldLocalMetadata = "local_metadata" - FieldPolicyCoordinatorIdx = "policy_coordinator_idx" - FieldPolicyID = "policy_id" - FieldPolicyOutputAPIKey = "api_key" - FieldPolicyOutputAPIKeyID = "api_key_id" - FieldPolicyOutputPermissionsHash = "permissions_hash" - FieldPolicyOutputToRetireAPIKeyIDs = "to_retire_api_key_ids" //nolint:gosec // false positive - FieldPolicyRevisionIdx = "policy_revision_idx" - FieldRevisionIdx = "revision_idx" - FieldUnenrolledReason = "unenrolled_reason" + FieldActionID = "action_id" + FieldPolicyID = "policy_id" + FieldRevisionIdx = "revision_idx" + FieldCoordinatorIdx = "coordinator_idx" + FieldLastCheckin = "last_checkin" + FieldLastCheckinStatus = "last_checkin_status" + FieldLocalMetadata = "local_metadata" + FieldPolicyRevisionIdx = "policy_revision_idx" + FieldPolicyCoordinatorIdx = "policy_coordinator_idx" + FieldDefaultAPIKey = "default_api_key" + FieldDefaultAPIKeyID = "default_api_key_id" //nolint:gosec // field name + FieldDefaultAPIKeyHistory = "default_api_key_history" //nolint:gosec // field name + FieldPolicyOutputPermissionsHash = "policy_output_permissions_hash" + FieldUnenrolledReason = "unenrolled_reason" + FieldAgentVersion = "version" + FieldAgent = "agent" FieldActive = "active" FieldUpdatedAt = "updated_at" diff --git a/internal/pkg/dl/migration.go b/internal/pkg/dl/migration.go index 4107dfd38..4beb26741 100644 --- a/internal/pkg/dl/migration.go +++ b/internal/pkg/dl/migration.go @@ -12,73 +12,59 @@ import ( "net/http" "time" + "github.com/elastic/fleet-server/v7/internal/pkg/bulk" + "github.com/elastic/fleet-server/v7/internal/pkg/dsl" + "github.com/elastic/go-elasticsearch/v7/esapi" "github.com/pkg/errors" "github.com/rs/zerolog/log" - - "github.com/elastic/fleet-server/v7/internal/pkg/bulk" - "github.com/elastic/fleet-server/v7/internal/pkg/dsl" ) -type ( - migrationFn func(context.Context, bulk.Bulk) error - migrationBodyFn func() (string, string, []byte, error) - migrationResponse struct { - Took int `json:"took"` - TimedOut bool `json:"timed_out"` - Total int `json:"total"` - Updated int `json:"updated"` - Deleted int `json:"deleted"` - Batches int `json:"batches"` - VersionConflicts int `json:"version_conflicts"` - Noops int `json:"noops"` - Retries struct { - Bulk int `json:"bulk"` - Search int `json:"search"` - } `json:"retries"` - Failures []json.RawMessage `json:"failures"` - } -) +func Migrate(ctx context.Context, bulker bulk.Bulk) error { + return migrateAgentMetadata(ctx, bulker) +} -// timeNow is used to get the current time. It should be replaced for testing. -var timeNow = time.Now +// FleetServer 7.15 added a new *AgentMetadata field to the Agent record. +// This field was populated in new enrollments in 7.15 and later; however, the +// change was not backported to support 7.14. The security team is reliant on the +// existence of this field in 7.16, so the following migration was added to +// support upgrade from 7.14. +// +// It is currently safe to run this in the background; albeit with some +// concern on conflicts. The conflict risk exists regardless as N Fleet Servers +// can be run in parallel at the same time. +// +// As the update only occurs once, the 99.9% case is a noop. +func migrateAgentMetadata(ctx context.Context, bulker bulk.Bulk) error { -// Migrate applies, in sequence, the migration functions. Currently, each migration -// function is responsible to ensure it only applies the migration if needed, -// being a no-op otherwise. -func Migrate(ctx context.Context, bulker bulk.Bulk) error { - for _, fn := range []migrationFn{migrateTov7_15, migrateToV8_5} { - if err := fn(ctx, bulker); err != nil { - return err - } - } + root := dsl.NewRoot() + root.Query().Bool().MustNot().Exists("agent.id") - return nil -} + painless := "ctx._source.agent = [:]; ctx._source.agent.id = ctx._id;" + root.Param("script", painless) -func migrate(ctx context.Context, bulker bulk.Bulk, fn migrationBodyFn) (int, error) { - var updatedDocs int - for { - name, index, body, err := fn() - if err != nil { - return updatedDocs, fmt.Errorf(": %w", err) - } + body, err := root.MarshalJSON() + if err != nil { + return err + } - resp, err := applyMigration(ctx, name, index, bulker, body) +LOOP: + for { + nConflicts, err := updateAgentMetadata(ctx, bulker, body) if err != nil { - return updatedDocs, fmt.Errorf("failed to apply migration %q: %w", - name, err) + return err } - updatedDocs += resp.Updated - if resp.VersionConflicts == 0 { - break + if nConflicts == 0 { + break LOOP } + + time.Sleep(time.Second) } - return updatedDocs, nil + return nil } -func applyMigration(ctx context.Context, name string, index string, bulker bulk.Bulk, body []byte) (migrationResponse, error) { +func updateAgentMetadata(ctx context.Context, bulker bulk.Bulk, body []byte) (int, error) { start := time.Now() client := bulker.Client() @@ -92,193 +78,59 @@ func applyMigration(ctx context.Context, name string, index string, bulker bulk. client.UpdateByQuery.WithConflicts("proceed"), } - res, err := client.UpdateByQuery([]string{index}, opts...) + res, err := client.UpdateByQuery([]string{FleetAgents}, opts...) + if err != nil { - return migrationResponse{}, err + return 0, err } if res.IsError() { if res.StatusCode == http.StatusNotFound { // Ignore index not created yet; nothing to upgrade - return migrationResponse{}, nil + return 0, nil } - return migrationResponse{}, fmt.Errorf("migrate %s UpdateByQuery failed: %s", - name, res.String()) + return 0, fmt.Errorf("Migrate UpdateByQuery %s", res.String()) } - resp := migrationResponse{} + resp := struct { + Took int `json:"took"` + TimedOut bool `json:"timed_out"` + Total int `json:"total"` + Updated int `json:"updated"` + Deleted int `json:"deleted"` + Batches int `json:"batches"` + VersionConflicts int `json:"version_conflicts"` + Noops int `json:"noops"` + Retries struct { + Bulk int `json:"bulk"` + Search int `json:"search"` + } `json:"retries"` + Failures []json.RawMessage `json:"failures"` + }{} decoder := json.NewDecoder(res.Body) if err := decoder.Decode(&resp); err != nil { - return migrationResponse{}, errors.Wrap(err, "decode UpdateByQuery response") + return 0, errors.Wrap(err, "decode UpdateByQuery response") } log.Info(). - Str("fleet.migration.name", name). - Int("fleet.migration.es.took", resp.Took). - Bool("fleet.migration.es.timed_out", resp.TimedOut). - Int("fleet.migration.total", resp.Total). - Int("fleet.migration.updated", resp.Updated). - Int("fleet.migration.deleted", resp.Deleted). - Int("fleet.migration.batches", resp.Batches). - Int("fleet.migration.version_conflicts", resp.VersionConflicts). - Int("fleet.migration.noops", resp.Noops). - Int("fleet.migration.retries.bulk", resp.Retries.Bulk). - Int("fleet.migration.retries.search", resp.Retries.Search). - Dur("fleet.migration.total.duration", time.Since(start)). - Msgf("migration %s done", name) + Int("took", resp.Took). + Bool("timed_out", resp.TimedOut). + Int("total", resp.Total). + Int("updated", resp.Updated). + Int("deleted", resp.Deleted). + Int("batches", resp.Batches). + Int("version_conflicts", resp.VersionConflicts). + Int("noops", resp.Noops). + Int("retries.bulk", resp.Retries.Bulk). + Int("retries.search", resp.Retries.Search). + Dur("rtt", time.Since(start)). + Msg("migrate agent records response") for _, fail := range resp.Failures { - log.Error().RawJSON("failure", fail).Msgf("failed applying %s migration", name) - } - - return resp, err -} - -// ============================== V7.15 migration ============================== -func migrateTov7_15(ctx context.Context, bulker bulk.Bulk) error { - log.Debug().Msg("applying migration to v7.15") - _, err := migrate(ctx, bulker, migrateAgentMetadata) - if err != nil { - return fmt.Errorf("v7.15.0 data migration failed: %w", err) - } - - return nil -} - -// FleetServer 7.15 added a new *AgentMetadata field to the Agent record. -// This field was populated in new enrollments in 7.15 and later; however, the -// change was not backported to support 7.14. The security team is reliant on the -// existence of this field in 7.16, so the following migration was added to -// support upgrade from 7.14. -// -// It is currently safe to run this in the background; albeit with some -// concern on conflicts. The conflict risk exists regardless as N Fleet Servers -// can be run in parallel at the same time. -// -// As the update only occurs once, the 99.9% case is a noop. -func migrateAgentMetadata() (string, string, []byte, error) { - const migrationName = "AgentMetadata" - query := dsl.NewRoot() - query.Query().Bool().MustNot().Exists("agent.id") - - painless := "ctx._source.agent = [:]; ctx._source.agent.id = ctx._id;" - query.Param("script", painless) - - body, err := query.MarshalJSON() - if err != nil { - return migrationName, FleetAgents, nil, fmt.Errorf("could not marshal ES query: %w", err) - } - - return migrationName, FleetAgents, body, nil -} - -// ============================== V8.5.0 migration ============================= -// https://github.com/elastic/fleet-server/issues/1672 - -func migrateToV8_5(ctx context.Context, bulker bulk.Bulk) error { - log.Debug().Msg("applying migration to v8.5.0") - migrated, err := migrate(ctx, bulker, migrateAgentOutputs) - if err != nil { - return fmt.Errorf("v8.5.0 data migration failed: %w", err) - } - - // The migration was necessary and indeed run, thus we need to regenerate - // the API keys for all agents. In order to do so, we increase the policy - // coordinator index to force a policy update. - if migrated > 0 { - _, err := migrate(ctx, bulker, migratePolicyCoordinatorIdx) - if err != nil { - return fmt.Errorf("v8.5.0 data migration failed: %w", err) - } - } - - return nil -} - -// migrateAgentOutputs performs the necessary changes on the Agent documents -// to introduce the `Outputs` field. -// -// FleetServer 8.5.0 introduces a new field to the Agent document, Outputs, to -// store the outputs credentials and data. The DefaultAPIKey, DefaultAPIKeyID, -// DefaultAPIKeyHistory and PolicyOutputPermissionsHash are now deprecated in -// favour of the new `Outputs` fields, which maps the output name to its data. -// This change fixes https://github.com/elastic/fleet-server/issues/1672. -// -// The change is backward compatible as the deprecated fields are just set to -// their zero value and an older version of FleetServer can repopulate them. -// However, reverting FleetServer to an older version might cause very issue -// this change fixes. -func migrateAgentOutputs() (string, string, []byte, error) { - const ( - migrationName = "AgentOutputs" - fieldOutputs = "outputs" - fieldRetiredAt = "retiredAt" - ) - - query := dsl.NewRoot() - query.Query().Bool().MustNot().Exists(fieldOutputs) - - fields := map[string]interface{}{fieldRetiredAt: timeNow().UTC().Format(time.RFC3339)} - painless := ` -// set up the new fields -ctx._source['` + fieldOutputs + `']=new HashMap(); -ctx._source['` + fieldOutputs + `']['default']=new HashMap(); -ctx._source['` + fieldOutputs + `']['default'].to_retire_api_key_ids=new ArrayList(); - -// copy 'default_api_key_history' to new 'outputs' field -ctx._source['` + fieldOutputs + `']['default'].type="elasticsearch"; -if (ctx._source.default_api_key_history != null && ctx._source.default_api_key_history.length > 0) { - ctx._source['` + fieldOutputs + `']['default'].to_retire_api_key_ids=ctx._source.default_api_key_history; -} - -Map map = new HashMap(); -map.put("retired_at", params.` + fieldRetiredAt + `); -map.put("id", ctx._source.default_api_key_id); - -// Make current API key empty, so fleet-server will generate a new one -// Add current API jey to be retired -ctx._source['` + fieldOutputs + `']['default'].to_retire_api_key_ids.add(map); -ctx._source['` + fieldOutputs + `']['default'].api_key=""; -ctx._source['` + fieldOutputs + `']['default'].api_key_id=""; -ctx._source['` + fieldOutputs + `']['default'].permissions_hash=ctx._source.policy_output_permissions_hash; - -// Erase deprecated fields -ctx._source.default_api_key_history=null; -ctx._source.default_api_key=""; -ctx._source.default_api_key_id=""; -ctx._source.policy_output_permissions_hash=""; -` - query.Param("script", map[string]interface{}{ - "lang": "painless", - "source": painless, - "params": fields, - }) - - body, err := query.MarshalJSON() - if err != nil { - return migrationName, FleetAgents, nil, fmt.Errorf("could not marshal ES query: %w", err) - } - - return migrationName, FleetAgents, body, nil -} - -// migratePolicyCoordinatorIdx increases the policy's CoordinatorIdx to force -// a policy update ensuring the output data will be migrated to the new -// Agent.Outputs field. See migrateAgentOutputs and https://github.com/elastic/fleet-server/issues/1672 -// for details. -func migratePolicyCoordinatorIdx() (string, string, []byte, error) { - const migrationName = "PolicyCoordinatorIdx" - - query := dsl.NewRoot() - query.Query().MatchAll() - query.Param("script", `ctx._source.coordinator_idx++;`) - - body, err := query.MarshalJSON() - if err != nil { - return migrationName, FleetPolicies, nil, fmt.Errorf("could not marshal ES query: %w", err) + log.Error().RawJSON("failure", fail).Msg("migration failure") } - return migrationName, FleetPolicies, body, nil + return resp.VersionConflicts, err } diff --git a/internal/pkg/dl/migration_integration_test.go b/internal/pkg/dl/migration_integration_test.go deleted file mode 100644 index 183502b94..000000000 --- a/internal/pkg/dl/migration_integration_test.go +++ /dev/null @@ -1,295 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -//go:build integration - -package dl - -import ( - "context" - "encoding/json" - "fmt" - "testing" - "time" - - "github.com/gofrs/uuid" - "github.com/google/go-cmp/cmp" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/elastic/fleet-server/v7/internal/pkg/bulk" - "github.com/elastic/fleet-server/v7/internal/pkg/model" - ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" -) - -const nowStr = "2022-08-12T16:50:05Z" - -func createSomeAgents(t *testing.T, n int, apiKey bulk.APIKey, index string, bulker bulk.Bulk) []string { - t.Helper() - - var createdAgents []string - - for i := 0; i < n; i++ { - outputAPIKey := bulk.APIKey{ - ID: fmt.Sprint(apiKey.ID, i), - Key: fmt.Sprint(apiKey.Key, i), - } - - agentID := uuid.Must(uuid.NewV4()).String() - policyID := uuid.Must(uuid.NewV4()).String() - - agentModel := model.Agent{ - PolicyID: policyID, - Active: true, - LastCheckin: nowStr, - LastCheckinStatus: "", - UpdatedAt: nowStr, - EnrolledAt: nowStr, - DefaultAPIKeyID: outputAPIKey.ID, - DefaultAPIKey: outputAPIKey.Agent(), - PolicyOutputPermissionsHash: fmt.Sprint("a_output_permission_SHA_", i), - DefaultAPIKeyHistory: []model.ToRetireAPIKeyIdsItems{ - { - ID: "old_" + outputAPIKey.ID, - RetiredAt: nowStr, - }, - }, - } - - body, err := json.Marshal(agentModel) - require.NoError(t, err) - - _, err = bulker.Create( - context.Background(), index, agentID, body, bulk.WithRefresh()) - require.NoError(t, err) - - createdAgents = append(createdAgents, agentID) - } - - return createdAgents -} - -func createSomePolicies(t *testing.T, n int, index string, bulker bulk.Bulk) []string { - t.Helper() - - var created []string - - for i := 0; i < n; i++ { - now := time.Now().UTC() - nowStr := now.Format(time.RFC3339) - - policyModel := model.Policy{ - ESDocument: model.ESDocument{}, - CoordinatorIdx: int64(i), - Data: nil, - DefaultFleetServer: false, - PolicyID: fmt.Sprint(i), - RevisionIdx: 1, - Timestamp: nowStr, - UnenrollTimeout: 0, - } - - body, err := json.Marshal(policyModel) - require.NoError(t, err) - - policyDocID, err := bulker.Create( - context.Background(), index, "", body, bulk.WithRefresh()) - require.NoError(t, err) - - created = append(created, policyDocID) - } - - return created -} - -func TestPolicyCoordinatorIdx(t *testing.T) { - index, bulker := ftesting.SetupCleanIndex(context.Background(), t, FleetPolicies) - - docIDs := createSomePolicies(t, 25, index, bulker) - - migrated, err := migrate(context.Background(), bulker, migratePolicyCoordinatorIdx) - require.NoError(t, err) - - require.Equal(t, len(docIDs), migrated) - - for i := range docIDs { - policies, err := QueryLatestPolicies( - context.Background(), bulker, WithIndexName(index)) - if err != nil { - assert.NoError(t, err, "failed to query latest policies") // we want to continue even if a single agent fails - continue - } - - var got model.Policy - for _, p := range policies { - if p.PolicyID == fmt.Sprint(i) { - got = p - } - } - - assert.Equal(t, int64(i+1), got.CoordinatorIdx) - } -} - -func TestMigrateOutputs_withDefaultAPIKeyHistory(t *testing.T) { - now, err := time.Parse(time.RFC3339, nowStr) - require.NoError(t, err, "could not parse time "+nowStr) - timeNow = func() time.Time { - return now - } - - index, bulker := ftesting.SetupCleanIndex(context.Background(), t, FleetAgents) - apiKey := bulk.APIKey{ - ID: "testAgent_", - Key: "testAgent_key_", - } - - agentIDs := createSomeAgents(t, 25, apiKey, index, bulker) - - migratedAgents, err := migrate(context.Background(), bulker, migrateAgentOutputs) - require.NoError(t, err) - - assert.Equal(t, len(agentIDs), migratedAgents) - - for i, id := range agentIDs { - wantOutputType := "elasticsearch" //nolint:goconst // test cases have some duplication - - got, err := FindAgent( - context.Background(), bulker, QueryAgentByID, FieldID, id, WithIndexName(index)) - if err != nil { - assert.NoError(t, err, "failed to find agent ID %q", id) // we want to continue even if a single agent fails - continue - } - - wantToRetireAPIKeyIds := []model.ToRetireAPIKeyIdsItems{ - { - // Current API should be marked to retire after the migration - ID: fmt.Sprintf("%s%d", apiKey.ID, i), - RetiredAt: timeNow().UTC().Format(time.RFC3339)}, - { - ID: fmt.Sprintf("old_%s%d", apiKey.ID, i), - RetiredAt: nowStr}, - } - - // Assert new fields - require.Len(t, got.Outputs, 1) - // Default API key is empty to force fleet-server to regenerate them. - assert.Empty(t, got.Outputs["default"].APIKey) - assert.Empty(t, got.Outputs["default"].APIKeyID) - - assert.Equal(t, wantOutputType, got.Outputs["default"].Type) - assert.Equal(t, - fmt.Sprint("a_output_permission_SHA_", i), - got.Outputs["default"].PermissionsHash) - - // Assert ToRetireAPIKeyIds contains the expected values, regardless of the order. - for _, want := range wantToRetireAPIKeyIds { - var found bool - for _, got := range got.Outputs["default"].ToRetireAPIKeyIds { - found = found || cmp.Equal(want, got) - } - if !found { - t.Errorf("could not find %#v, in %#v", - want, got.Outputs["default"].ToRetireAPIKeyIds) - } - } - - // Assert deprecated fields - assert.Empty(t, got.DefaultAPIKey) - assert.Empty(t, got.DefaultAPIKey) - assert.Empty(t, got.PolicyOutputPermissionsHash) - assert.Nil(t, got.DefaultAPIKeyHistory) - } -} - -func TestMigrateOutputs_nil_DefaultAPIKeyHistory(t *testing.T) { - wantOutputType := "elasticsearch" //nolint:goconst // test cases have some duplication - - now, err := time.Parse(time.RFC3339, nowStr) - require.NoError(t, err, "could not parse time "+nowStr) - timeNow = func() time.Time { - return now - } - - index, bulker := ftesting.SetupCleanIndex(context.Background(), t, FleetAgents) - apiKey := bulk.APIKey{ - ID: "testAgent_", - Key: "testAgent_key_", - } - - i := 0 - outputAPIKey := bulk.APIKey{ - ID: fmt.Sprint(apiKey.ID, i), - Key: fmt.Sprint(apiKey.Key, i), - } - - agentID := uuid.Must(uuid.NewV4()).String() - policyID := uuid.Must(uuid.NewV4()).String() - - agentModel := model.Agent{ - PolicyID: policyID, - Active: true, - LastCheckin: nowStr, - LastCheckinStatus: "", - UpdatedAt: nowStr, - EnrolledAt: nowStr, - DefaultAPIKeyID: outputAPIKey.ID, - DefaultAPIKey: outputAPIKey.Agent(), - PolicyOutputPermissionsHash: fmt.Sprint("a_output_permission_SHA_", i), - } - - body, err := json.Marshal(agentModel) - require.NoError(t, err) - - _, err = bulker.Create( - context.Background(), index, agentID, body, bulk.WithRefresh()) - require.NoError(t, err) - - migratedAgents, err := migrate(context.Background(), bulker, migrateAgentOutputs) - require.NoError(t, err) - - got, err := FindAgent( - context.Background(), bulker, QueryAgentByID, FieldID, agentID, WithIndexName(index)) - require.NoError(t, err, "failed to find agent ID %q", agentID) // we want to continue even if a single agent fails - - assert.Equal(t, 1, migratedAgents) - - // Assert new fields - require.Len(t, got.Outputs, 1) - // Default API key is empty to force fleet-server to regenerate them. - assert.Empty(t, got.Outputs["default"].APIKey) - assert.Empty(t, got.Outputs["default"].APIKeyID) - assert.Equal(t, wantOutputType, got.Outputs["default"].Type) - assert.Equal(t, - fmt.Sprint("a_output_permission_SHA_", i), - got.Outputs["default"].PermissionsHash) - - // Assert ToRetireAPIKeyIds contains the expected values, regardless of the order. - if assert.Len(t, got.Outputs["default"].ToRetireAPIKeyIds, 1) { - assert.Equal(t, - model.ToRetireAPIKeyIdsItems{ID: outputAPIKey.ID, RetiredAt: nowStr}, - got.Outputs["default"].ToRetireAPIKeyIds[0]) - } - - // Assert deprecated fields - assert.Empty(t, got.DefaultAPIKey) - assert.Empty(t, got.DefaultAPIKey) - assert.Empty(t, got.PolicyOutputPermissionsHash) - assert.Nil(t, got.DefaultAPIKeyHistory) -} - -func TestMigrateOutputs_no_agent_document(t *testing.T) { - now, err := time.Parse(time.RFC3339, nowStr) - require.NoError(t, err, "could not parse time "+nowStr) - timeNow = func() time.Time { - return now - } - - _, bulker := ftesting.SetupCleanIndex(context.Background(), t, FleetAgents) - - migratedAgents, err := migrate(context.Background(), bulker, migrateAgentOutputs) - require.NoError(t, err) - - assert.Equal(t, 0, migratedAgents) -} diff --git a/internal/pkg/es/error.go b/internal/pkg/es/error.go index a5e575df5..79b07499c 100644 --- a/internal/pkg/es/error.go +++ b/internal/pkg/es/error.go @@ -37,25 +37,17 @@ func (e ErrElastic) Error() string { // Otherwise were getting: "elastic fail 404::" msg := "elastic fail " var b strings.Builder - b.Grow(len(msg) + 11 + len(e.Type) + len(e.Reason) + len(e.Cause.Type) + len(e.Cause.Reason)) + b.Grow(len(msg) + 5 + len(e.Type) + len(e.Reason)) b.WriteString(msg) b.WriteString(strconv.Itoa(e.Status)) if e.Type != "" { - b.WriteString(": ") + b.WriteString(":") b.WriteString(e.Type) } if e.Reason != "" { - b.WriteString(": ") + b.WriteString(":") b.WriteString(e.Reason) } - if e.Cause.Type != "" { - b.WriteString(": ") - b.WriteString(e.Cause.Type) - } - if e.Cause.Reason != "" { - b.WriteString(": ") - b.WriteString(e.Cause.Reason) - } return b.String() } @@ -91,8 +83,8 @@ func TranslateError(status int, e *ErrorT) error { Type string Reason string }{ - Type: e.Cause.Type, - Reason: e.Cause.Reason, + e.Cause.Type, + e.Cause.Reason, }, } } diff --git a/internal/pkg/model/ext.go b/internal/pkg/model/ext.go index 4a11bbe08..d89787855 100644 --- a/internal/pkg/model/ext.go +++ b/internal/pkg/model/ext.go @@ -27,36 +27,14 @@ func (m *Server) SetTime(t time.Time) { } // CheckDifferentVersion returns Agent version if it is different from ver, otherwise return empty string -func (a *Agent) CheckDifferentVersion(ver string) string { - if a == nil { +func (m *Agent) CheckDifferentVersion(ver string) string { + if m == nil { return "" } - if a.Agent == nil || ver != a.Agent.Version { + if m.Agent == nil || ver != m.Agent.Version { return ver } return "" } - -// APIKeyIDs returns all the API keys, the valid, in-use as well as the one -// marked to be retired. -func (a *Agent) APIKeyIDs() []string { - if a == nil { - return nil - } - keys := make([]string, 0, len(a.Outputs)+1) - if a.AccessAPIKeyID != "" { - keys = append(keys, a.AccessAPIKeyID) - } - - for _, output := range a.Outputs { - keys = append(keys, output.APIKeyID) - for _, key := range output.ToRetireAPIKeyIds { - keys = append(keys, key.ID) - } - } - - return keys - -} diff --git a/internal/pkg/model/ext_test.go b/internal/pkg/model/ext_test.go index 527570270..e48194b30 100644 --- a/internal/pkg/model/ext_test.go +++ b/internal/pkg/model/ext_test.go @@ -2,13 +2,15 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. +//go:build !integration +// +build !integration + package model import ( "testing" "github.com/google/go-cmp/cmp" - "github.com/stretchr/testify/assert" ) func TestAgentGetNewVersion(t *testing.T) { @@ -83,54 +85,3 @@ func TestAgentGetNewVersion(t *testing.T) { }) } } - -func TestAgentAPIKeyIDs(t *testing.T) { - tcs := []struct { - name string - agent Agent - want []string - }{ - { - name: "no API key marked to be retired", - agent: Agent{ - AccessAPIKeyID: "access_api_key_id", - Outputs: map[string]*PolicyOutput{ - "p1": {APIKeyID: "p1_api_key_id"}, - "p2": {APIKeyID: "p2_api_key_id"}, - }, - }, - want: []string{"access_api_key_id", "p1_api_key_id", "p2_api_key_id"}, - }, - { - name: "with API key marked to be retired", - agent: Agent{ - AccessAPIKeyID: "access_api_key_id", - Outputs: map[string]*PolicyOutput{ - "p1": { - APIKeyID: "p1_api_key_id", - ToRetireAPIKeyIds: []ToRetireAPIKeyIdsItems{{ - ID: "p1_to_retire_key", - }}}, - "p2": { - APIKeyID: "p2_api_key_id", - ToRetireAPIKeyIds: []ToRetireAPIKeyIdsItems{{ - ID: "p2_to_retire_key", - }}}, - }, - }, - want: []string{ - "access_api_key_id", "p1_api_key_id", "p2_api_key_id", - "p1_to_retire_key", "p2_to_retire_key"}, - }, - } - - for _, tc := range tcs { - t.Run(tc.name, func(t *testing.T) { - got := tc.agent.APIKeyIDs() - - // if A contains B and B contains A => A = B - assert.Subset(t, tc.want, got) - assert.Subset(t, got, tc.want) - }) - } -} diff --git a/internal/pkg/model/schema.go b/internal/pkg/model/schema.go index 5d3c844f1..9fb0d07fc 100644 --- a/internal/pkg/model/schema.go +++ b/internal/pkg/model/schema.go @@ -124,13 +124,13 @@ type Agent struct { Active bool `json:"active"` Agent *AgentMetadata `json:"agent,omitempty"` - // Deprecated. Use Outputs instead. API key the Elastic Agent uses to authenticate with elasticsearch + // API key the Elastic Agent uses to authenticate with elasticsearch DefaultAPIKey string `json:"default_api_key,omitempty"` - // Deprecated. Use Outputs instead. Default API Key History - DefaultAPIKeyHistory []ToRetireAPIKeyIdsItems `json:"default_api_key_history,omitempty"` + // Default API Key History + DefaultAPIKeyHistory []DefaultAPIKeyHistoryItems `json:"default_api_key_history,omitempty"` - // Deprecated. Use Outputs instead. ID of the API key the Elastic Agent uses to authenticate with elasticsearch + // ID of the API key the Elastic Agent uses to authenticate with elasticsearch DefaultAPIKeyID string `json:"default_api_key_id,omitempty"` // Date/time the Elastic Agent enrolled @@ -148,9 +148,6 @@ type Agent struct { // Local metadata information for the Elastic Agent LocalMetadata json.RawMessage `json:"local_metadata,omitempty"` - // Outputs is the policy output data, mapping the output name to its data - Outputs map[string]*PolicyOutput `json:"outputs,omitempty"` - // Packages array Packages []string `json:"packages,omitempty"` @@ -160,7 +157,7 @@ type Agent struct { // The policy ID for the Elastic Agent PolicyID string `json:"policy_id,omitempty"` - // Deprecated. Use Outputs instead. The policy output permissions hash + // The policy output permissions hash PolicyOutputPermissionsHash string `json:"policy_output_permissions_hash,omitempty"` // The current policy revision_idx for the Elastic Agent @@ -253,6 +250,16 @@ type Body struct { type Data struct { } +// DefaultAPIKeyHistoryItems +type DefaultAPIKeyHistoryItems struct { + + // API Key identifier + ID string `json:"id,omitempty"` + + // Date/time the API key was retired + RetiredAt string `json:"retired_at,omitempty"` +} + // EnrollmentAPIKey An Elastic Agent enrollment API key type EnrollmentAPIKey struct { ESDocument @@ -329,26 +336,6 @@ type PolicyLeader struct { Timestamp string `json:"@timestamp,omitempty"` } -// PolicyOutput holds the needed data to manage the output API keys -type PolicyOutput struct { - ESDocument - - // API key the Elastic Agent uses to authenticate with elasticsearch - APIKey string `json:"api_key"` - - // ID of the API key the Elastic Agent uses to authenticate with elasticsearch - APIKeyID string `json:"api_key_id"` - - // The policy output permissions hash - PermissionsHash string `json:"permissions_hash"` - - // API keys to be invalidated on next agent ack - ToRetireAPIKeyIds []ToRetireAPIKeyIdsItems `json:"to_retire_api_key_ids,omitempty"` - - // Type is the output type. Currently only Elasticsearch is supported. - Type string `json:"type"` -} - // Server A Fleet Server type Server struct { ESDocument @@ -370,16 +357,6 @@ type ServerMetadata struct { Version string `json:"version"` } -// ToRetireAPIKeyIdsItems the Output API Keys that were replaced and should be retired -type ToRetireAPIKeyIdsItems struct { - - // API Key identifier - ID string `json:"id,omitempty"` - - // Date/time the API key was retired - RetiredAt string `json:"retired_at,omitempty"` -} - // UserProvidedMetadata User provided metadata information for the Elastic Agent type UserProvidedMetadata struct { } diff --git a/internal/pkg/policy/parsed_policy.go b/internal/pkg/policy/parsed_policy.go index 029298ef5..dbf5d3801 100644 --- a/internal/pkg/policy/parsed_policy.go +++ b/internal/pkg/policy/parsed_policy.go @@ -42,7 +42,7 @@ type ParsedPolicy struct { Policy model.Policy Fields map[string]json.RawMessage Roles RoleMapT - Outputs map[string]Output + Outputs map[string]PolicyOutput Default ParsedPolicyDefaults } @@ -91,8 +91,8 @@ func NewParsedPolicy(p model.Policy) (*ParsedPolicy, error) { return pp, nil } -func constructPolicyOutputs(outputsRaw json.RawMessage, roles map[string]RoleT) (map[string]Output, error) { - result := make(map[string]Output) +func constructPolicyOutputs(outputsRaw json.RawMessage, roles map[string]RoleT) (map[string]PolicyOutput, error) { + result := make(map[string]PolicyOutput) outputsMap, err := smap.Parse(outputsRaw) if err != nil { @@ -102,7 +102,7 @@ func constructPolicyOutputs(outputsRaw json.RawMessage, roles map[string]RoleT) for k := range outputsMap { v := outputsMap.GetMap(k) - p := Output{ + p := PolicyOutput{ Name: k, Type: v.GetString(FieldOutputType), } @@ -126,13 +126,13 @@ func parsePerms(permsRaw json.RawMessage) (RoleMapT, error) { // iterate across the keys m := make(RoleMapT, len(permMap)) for k := range permMap { + v := permMap.GetMap(k) if v != nil { var r RoleT // Stable hash on permissions payload - // permission hash created here if r.Sha2, err = v.Hash(); err != nil { return nil, err } diff --git a/internal/pkg/policy/parsed_policy_test.go b/internal/pkg/policy/parsed_policy_test.go index 32ef271a7..547cfcf7a 100644 --- a/internal/pkg/policy/parsed_policy_test.go +++ b/internal/pkg/policy/parsed_policy_test.go @@ -13,6 +13,7 @@ import ( ) func TestNewParsedPolicy(t *testing.T) { + // Run two formatting of the same payload to validate that the sha2 remains the same payloads := []string{ testPolicy, diff --git a/internal/pkg/policy/policy_output.go b/internal/pkg/policy/policy_output.go index c2728aa1e..8115d22ec 100644 --- a/internal/pkg/policy/policy_output.go +++ b/internal/pkg/policy/policy_output.go @@ -32,173 +32,118 @@ var ( ErrFailInjectAPIKey = errors.New("fail inject api key") ) -type Output struct { +type PolicyOutput struct { Name string Type string Role *RoleT } -// Prepare prepares the output p to be sent to the elastic-agent -// The agent might be mutated for an elasticsearch output -func (p *Output) Prepare(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, agent *model.Agent, outputMap smap.Map) error { - zlog = zlog.With(). - Str("fleet.agent.id", agent.Id). - Str("fleet.policy.output.name", p.Name).Logger() - +func (p *PolicyOutput) Prepare(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, agent *model.Agent, outputMap smap.Map) error { switch p.Type { case OutputTypeElasticsearch: zlog.Debug().Msg("preparing elasticsearch output") - if err := p.prepareElasticsearch(ctx, zlog, bulker, agent, outputMap); err != nil { - return fmt.Errorf("failed to prepare elasticsearch output %q: %w", p.Name, err) - } - case OutputTypeLogstash: - zlog.Debug().Msg("preparing logstash output") - zlog.Info().Msg("no actions required for logstash output preparation") - default: - zlog.Error().Msgf("unknown output type: %s; skipping preparation", p.Type) - return fmt.Errorf("encountered unexpected output type while preparing outputs: %s", p.Type) - } - return nil -} - -func (p *Output) prepareElasticsearch( - ctx context.Context, - zlog zerolog.Logger, - bulker bulk.Bulk, - agent *model.Agent, - outputMap smap.Map) error { - // The role is required to do api key management - if p.Role == nil { - zlog.Error(). - Msg("policy does not contain required output permission section") - return ErrNoOutputPerms - } - output, ok := agent.Outputs[p.Name] - if !ok { - if agent.Outputs == nil { - agent.Outputs = map[string]*model.PolicyOutput{} + // The role is required to do api key management + if p.Role == nil { + zlog.Error().Str("name", p.Name).Msg("policy does not contain required output permission section") + return ErrNoOutputPerms } - zlog.Debug().Msgf("creating agent.Outputs[%s]", p.Name) - output = &model.PolicyOutput{} - agent.Outputs[p.Name] = output - } + // Determine whether we need to generate an output ApiKey. + // This is accomplished by comparing the sha2 hash stored in the agent + // record with the precalculated sha2 hash of the role. + + // Note: This will need to be updated when doing multi-cluster elasticsearch support + // Currently, we only have access to the token for the elasticsearch instance fleet-server + // is monitors. When updating for multiple ES instances we need to tie the token to the output. + needNewKey := true + switch { + case agent.DefaultAPIKey == "": + zlog.Debug().Msg("must generate api key as default API key is not present") + case p.Role.Sha2 != agent.PolicyOutputPermissionsHash: + zlog.Debug().Msg("must generate api key as policy output permissions changed") + default: + needNewKey = false + zlog.Debug().Msg("policy output permissions are the same") + } - // Determine whether we need to generate an output ApiKey. - // This is accomplished by comparing the sha2 hash stored in the corresponding - // output in the agent record with the precalculated sha2 hash of the role. + if needNewKey { + zlog.Debug(). + RawJSON("roles", p.Role.Raw). + Str("oldHash", agent.PolicyOutputPermissionsHash). + Str("newHash", p.Role.Sha2). + Msg("Generating a new API key") + + outputAPIKey, err := generateOutputAPIKey(ctx, bulker, agent.Id, p.Name, p.Role.Raw) + if err != nil { + zlog.Error().Err(err).Msg("fail generate output key") + return err + } - // Note: This will need to be updated when doing multi-cluster elasticsearch support - // Currently, we assume all ES outputs are the same ES fleet-server is connected to. - needNewKey := true - switch { - case output.APIKey == "": - zlog.Debug().Msg("must generate api key as default API key is not present") - case p.Role.Sha2 != output.PermissionsHash: - // the is actually the OutputPermissionsHash for the default hash. The Agent - // document on ES does not have OutputPermissionsHash for any other output - // besides the default one. It seems to me error-prone to rely on the default - // output permissions hash to generate new API keys for other outputs. - zlog.Debug().Msg("must generate api key as policy output permissions changed") - default: - needNewKey = false - zlog.Debug().Msg("policy output permissions are the same") - } + agent.DefaultAPIKey = outputAPIKey.Agent() - if needNewKey { - zlog.Debug(). - RawJSON("fleet.policy.roles", p.Role.Raw). - Str("fleet.policy.default.oldHash", output.PermissionsHash). - Str("fleet.policy.default.newHash", p.Role.Sha2). - Msg("Generating a new API key") + // When a new keys is generated we need to update the Agent record, + // this will need to be updated when multiples Elasticsearch output + // are used. + zlog.Info(). + Str("hash.sha256", p.Role.Sha2). + Str(logger.DefaultOutputAPIKeyID, outputAPIKey.ID). + Msg("Updating agent record to pick up default output key.") - ctx := zlog.WithContext(ctx) - outputAPIKey, err := - generateOutputAPIKey(ctx, bulker, agent.Id, p.Name, p.Role.Raw) - if err != nil { - return fmt.Errorf("failed generate output API key: %w", err) - } - - output.Type = OutputTypeElasticsearch - output.APIKey = outputAPIKey.Agent() - output.APIKeyID = outputAPIKey.ID - output.PermissionsHash = p.Role.Sha2 // for the sake of consistency + fields := map[string]interface{}{ + dl.FieldDefaultAPIKey: outputAPIKey.Agent(), + dl.FieldDefaultAPIKeyID: outputAPIKey.ID, + dl.FieldPolicyOutputPermissionsHash: p.Role.Sha2, + } + if agent.DefaultAPIKeyID != "" { + fields[dl.FieldDefaultAPIKeyHistory] = model.DefaultAPIKeyHistoryItems{ + ID: agent.DefaultAPIKeyID, + RetiredAt: time.Now().UTC().Format(time.RFC3339), + } + } - // When a new keys is generated we need to update the Agent record, - // this will need to be updated when multiples remote Elasticsearch output - // are supported. - zlog.Info(). - Str("fleet.policy.role.hash.sha256", p.Role.Sha2). - Str(logger.DefaultOutputAPIKeyID, outputAPIKey.ID). - Msg("Updating agent record to pick up default output key.") + // Using painless script to append the old keys to the history + body, err := renderUpdatePainlessScript(fields) - fields := map[string]interface{}{ - dl.FieldPolicyOutputAPIKey: outputAPIKey.Agent(), - dl.FieldPolicyOutputAPIKeyID: outputAPIKey.ID, - dl.FieldPolicyOutputPermissionsHash: p.Role.Sha2, - } - if output.APIKeyID != "" { - fields[dl.FieldPolicyOutputToRetireAPIKeyIDs] = model.ToRetireAPIKeyIdsItems{ - ID: output.APIKeyID, - RetiredAt: time.Now().UTC().Format(time.RFC3339), + if err != nil { + return err } - } - // Using painless script to append the old keys to the history - body, err := renderUpdatePainlessScript(p.Name, fields) - if err != nil { - return fmt.Errorf("could no tupdate painless script: %w", err) + if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body); err != nil { + zlog.Error().Err(err).Msg("fail update agent record") + return err + } } - if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body); err != nil { - zlog.Error().Err(err).Msg("fail update agent record") - return fmt.Errorf("fail update agent record: %w", err) + // Always insert the `api_key` as part of the output block, this is required + // because only fleet server knows the api key for the specific agent, if we don't + // add it the agent will not receive the `api_key` and will not be able to connect + // to Elasticsearch. + // + // We need to investigate allocation with the new LS output, we had optimization + // in place to reduce number of agent policy allocation when sending the updated + // agent policy to multiple agents. + // See: https://github.com/elastic/fleet-server/issues/1301 + if ok := setMapObj(outputMap, agent.DefaultAPIKey, p.Name, "api_key"); !ok { + return ErrFailInjectAPIKey } + case OutputTypeLogstash: + zlog.Debug().Msg("preparing logstash output") + zlog.Info().Msg("no actions required for logstash output preparation") + default: + zlog.Error().Msgf("unknown output type: %s; skipping preparation", p.Type) + return fmt.Errorf("encountered unexpected output type while preparing outputs: %s", p.Type) } - - // Always insert the `api_key` as part of the output block, this is required - // because only fleet server knows the api key for the specific agent, if we don't - // add it the agent will not receive the `api_key` and will not be able to connect - // to Elasticsearch. - // - // We need to investigate allocation with the new LS output, we had optimization - // in place to reduce number of agent policy allocation when sending the updated - // agent policy to multiple agents. - // See: https://github.com/elastic/fleet-server/issues/1301 - if err := setMapObj(outputMap, output.APIKey, p.Name, "api_key"); err != nil { - return err - } - return nil } -func renderUpdatePainlessScript(outputName string, fields map[string]interface{}) ([]byte, error) { +func renderUpdatePainlessScript(fields map[string]interface{}) ([]byte, error) { var source strings.Builder - - // prepare agent.elasticsearch_outputs[OUTPUT_NAME] - source.WriteString(fmt.Sprintf(` -if (ctx._source['outputs']==null) - {ctx._source['outputs']=new HashMap();} -if (ctx._source['outputs']['%s']==null) - {ctx._source['outputs']['%s']=new HashMap();} -`, outputName, outputName)) - for field := range fields { - if field == dl.FieldPolicyOutputToRetireAPIKeyIDs { - // dl.FieldPolicyOutputToRetireAPIKeyIDs is a special case. - // It's an array that gets deleted when the keys are invalidated. - // Thus, append the old API key ID, create the field if necessary. - source.WriteString(fmt.Sprintf(` -if (ctx._source['outputs']['%s'].%s==null) - {ctx._source['outputs']['%s'].%s=new ArrayList();} -ctx._source['outputs']['%s'].%s.add(params.%s); -`, outputName, field, outputName, field, outputName, field, field)) + if field == dl.FieldDefaultAPIKeyHistory { + source.WriteString(fmt.Sprint("if (ctx._source.", field, "==null) {ctx._source.", field, "=new ArrayList();} ctx._source.", field, ".add(params.", field, ");")) } else { - // Update the other fields - source.WriteString(fmt.Sprintf(` -ctx._source['outputs']['%s'].%s=params.%s;`, - outputName, field, field)) + source.WriteString(fmt.Sprint("ctx._source.", field, "=", "params.", field, ";")) } } @@ -213,45 +158,36 @@ ctx._source['outputs']['%s'].%s=params.%s;`, return body, err } -func generateOutputAPIKey( - ctx context.Context, - bulk bulk.Bulk, - agentID, - outputName string, - roles []byte) (*apikey.APIKey, error) { +func generateOutputAPIKey(ctx context.Context, bulk bulk.Bulk, agentID, outputName string, roles []byte) (*apikey.APIKey, error) { name := fmt.Sprintf("%s:%s", agentID, outputName) - zerolog.Ctx(ctx).Info().Msgf("generating output API key %s for agent ID %s", - name, agentID) return bulk.APIKeyCreate( ctx, name, "", roles, - apikey.NewMetadata(agentID, outputName, apikey.TypeOutput), + apikey.NewMetadata(agentID, apikey.TypeOutput), ) } -func setMapObj(obj map[string]interface{}, val interface{}, keys ...string) error { +func setMapObj(obj map[string]interface{}, val interface{}, keys ...string) bool { if len(keys) == 0 { - return fmt.Errorf("no key to be updated: %w", ErrFailInjectAPIKey) + return false } for _, k := range keys[:len(keys)-1] { v, ok := obj[k] if !ok { - return fmt.Errorf("no key %q not present on MapObj: %w", - k, ErrFailInjectAPIKey) + return false } obj, ok = v.(map[string]interface{}) if !ok { - return fmt.Errorf("cannot cast %T to map[string]interface{}: %w", - obj, ErrFailInjectAPIKey) + return false } } k := keys[len(keys)-1] obj[k] = val - return nil + return true } diff --git a/internal/pkg/policy/policy_output_integration_test.go b/internal/pkg/policy/policy_output_integration_test.go deleted file mode 100644 index 6acd0d9fa..000000000 --- a/internal/pkg/policy/policy_output_integration_test.go +++ /dev/null @@ -1,127 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -//go:build integration - -package policy - -import ( - "context" - "encoding/json" - "testing" - "time" - - "github.com/gofrs/uuid" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/elastic/fleet-server/v7/internal/pkg/bulk" - "github.com/elastic/fleet-server/v7/internal/pkg/dl" - "github.com/elastic/fleet-server/v7/internal/pkg/model" - ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" -) - -func TestRenderUpdatePainlessScript(t *testing.T) { - tts := []struct { - name string - - existingToRetireAPIKeyIds []model.ToRetireAPIKeyIdsItems - }{ - { - name: "to_retire_api_key_ids is empty", - }, - { - name: "to_retire_api_key_ids is not empty", - existingToRetireAPIKeyIds: []model.ToRetireAPIKeyIdsItems{{ - ID: "pre_existing_ID", RetiredAt: "pre_existing__RetiredAt"}}, - }, - } - - for _, tt := range tts { - t.Run(tt.name, func(t *testing.T) { - outputPermissionSha := "new_permissionSHA_" + tt.name - outputName := "output_" + tt.name - outputAPIKey := bulk.APIKey{ID: "new_ID", Key: "new-key"} - - index, bulker := ftesting.SetupCleanIndex(context.Background(), t, dl.FleetAgents) - - now := time.Now().UTC() - nowStr := now.Format(time.RFC3339) - - agentID := uuid.Must(uuid.NewV4()).String() - policyID := uuid.Must(uuid.NewV4()).String() - - previousAPIKey := bulk.APIKey{ - ID: "old_" + outputAPIKey.ID, - Key: "old_" + outputAPIKey.Key, - } - - wantOutputs := map[string]*model.PolicyOutput{ - outputName: { - APIKey: outputAPIKey.Agent(), - APIKeyID: outputAPIKey.ID, - PermissionsHash: outputPermissionSha, - Type: OutputTypeElasticsearch, - ToRetireAPIKeyIds: append(tt.existingToRetireAPIKeyIds, - model.ToRetireAPIKeyIdsItems{ - ID: previousAPIKey.ID, RetiredAt: nowStr}), - }, - } - - agentModel := model.Agent{ - PolicyID: policyID, - Active: true, - LastCheckin: nowStr, - LastCheckinStatus: "", - UpdatedAt: nowStr, - EnrolledAt: nowStr, - Outputs: map[string]*model.PolicyOutput{ - outputName: { - Type: OutputTypeElasticsearch, - APIKey: previousAPIKey.Agent(), - APIKeyID: previousAPIKey.ID, - PermissionsHash: "old_" + outputPermissionSha, - }, - }, - } - if tt.existingToRetireAPIKeyIds != nil { - agentModel.Outputs[outputName].ToRetireAPIKeyIds = - tt.existingToRetireAPIKeyIds - } - - body, err := json.Marshal(agentModel) - require.NoError(t, err) - - _, err = bulker.Create( - context.Background(), index, agentID, body, bulk.WithRefresh()) - require.NoError(t, err) - - fields := map[string]interface{}{ - dl.FieldPolicyOutputAPIKey: outputAPIKey.Agent(), - dl.FieldPolicyOutputAPIKeyID: outputAPIKey.ID, - dl.FieldPolicyOutputPermissionsHash: outputPermissionSha, - dl.FieldPolicyOutputToRetireAPIKeyIDs: model.ToRetireAPIKeyIdsItems{ - ID: previousAPIKey.ID, RetiredAt: nowStr}, - } - - got, err := renderUpdatePainlessScript(outputName, fields) - require.NoError(t, err, "renderUpdatePainlessScript returned an unexpected error") - - err = bulker.Update(context.Background(), dl.FleetAgents, agentID, got) - require.NoError(t, err, "bulker.Update failed") - - // there is some refresh thing that needs time, I didn't manage to find - // how ot fix it at the requests to ES level, thus this timeout here. - time.Sleep(time.Second) - - gotAgent, err := dl.FindAgent( - context.Background(), bulker, dl.QueryAgentByID, dl.FieldID, agentID, dl.WithIndexName(index)) - require.NoError(t, err) - - assert.Equal(t, agentID, gotAgent.Id) - assert.Len(t, gotAgent.Outputs, len(wantOutputs)) - assert.Equal(t, wantOutputs, gotAgent.Outputs) - }) - } -} diff --git a/internal/pkg/policy/policy_output_test.go b/internal/pkg/policy/policy_output_test.go index d66275d04..1e90cee57 100644 --- a/internal/pkg/policy/policy_output_test.go +++ b/internal/pkg/policy/policy_output_test.go @@ -8,7 +8,6 @@ import ( "context" "testing" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -24,7 +23,7 @@ var TestPayload []byte func TestPolicyLogstashOutputPrepare(t *testing.T) { logger := testlog.SetLogger(t) bulker := ftesting.NewMockBulk() - po := Output{ + po := PolicyOutput{ Type: OutputTypeLogstash, Name: "test output", Role: &RoleT{ @@ -40,7 +39,7 @@ func TestPolicyLogstashOutputPrepare(t *testing.T) { func TestPolicyLogstashOutputPrepareNoRole(t *testing.T) { logger := testlog.SetLogger(t) bulker := ftesting.NewMockBulk() - po := Output{ + po := PolicyOutput{ Type: OutputTypeLogstash, Name: "test output", Role: nil, @@ -55,7 +54,7 @@ func TestPolicyLogstashOutputPrepareNoRole(t *testing.T) { func TestPolicyDefaultLogstashOutputPrepare(t *testing.T) { logger := testlog.SetLogger(t) bulker := ftesting.NewMockBulk() - po := Output{ + po := PolicyOutput{ Type: OutputTypeLogstash, Name: "test output", Role: &RoleT{ @@ -72,7 +71,7 @@ func TestPolicyDefaultLogstashOutputPrepare(t *testing.T) { func TestPolicyESOutputPrepareNoRole(t *testing.T) { logger := testlog.SetLogger(t) bulker := ftesting.NewMockBulk() - po := Output{ + po := PolicyOutput{ Type: OutputTypeElasticsearch, Name: "test output", Role: nil, @@ -87,11 +86,8 @@ func TestPolicyOutputESPrepare(t *testing.T) { t.Run("Permission hash == Agent Permission Hash no need to regenerate the key", func(t *testing.T) { logger := testlog.SetLogger(t) bulker := ftesting.NewMockBulk() - - apiKey := bulk.APIKey{ID: "test_id_existing", Key: "existing-key"} - hashPerm := "abc123" - output := Output{ + po := PolicyOutput{ Type: OutputTypeElasticsearch, Name: "test output", Role: &RoleT{ @@ -105,62 +101,29 @@ func TestPolicyOutputESPrepare(t *testing.T) { } testAgent := &model.Agent{ - Outputs: map[string]*model.PolicyOutput{ - output.Name: { - ESDocument: model.ESDocument{}, - APIKey: apiKey.Agent(), - ToRetireAPIKeyIds: nil, - APIKeyID: apiKey.ID, - PermissionsHash: hashPerm, - Type: OutputTypeElasticsearch, - }, - }, + DefaultAPIKey: "test_id:EXISTING-KEY", + PolicyOutputPermissionsHash: hashPerm, } - err := output.Prepare(context.Background(), logger, bulker, testAgent, policyMap) + err := po.Prepare(context.Background(), logger, bulker, testAgent, policyMap) require.NoError(t, err, "expected prepare to pass") - key, ok := policyMap.GetMap(output.Name)["api_key"].(string) - gotOutput := testAgent.Outputs[output.Name] - - require.True(t, ok, "api key not present on policy map") - assert.Equal(t, apiKey.Agent(), key) - - assert.Equal(t, apiKey.Agent(), gotOutput.APIKey) - assert.Equal(t, apiKey.ID, gotOutput.APIKeyID) - assert.Equal(t, output.Role.Sha2, gotOutput.PermissionsHash) - assert.Equal(t, output.Type, gotOutput.Type) - assert.Empty(t, gotOutput.ToRetireAPIKeyIds) + key, ok := policyMap.GetMap("test output")["api_key"].(string) - // Old model must always remain empty - assert.Empty(t, testAgent.DefaultAPIKey) - assert.Empty(t, testAgent.DefaultAPIKeyID) - assert.Empty(t, testAgent.DefaultAPIKeyHistory) - assert.Empty(t, testAgent.PolicyOutputPermissionsHash) - - bulker.AssertNotCalled(t, "Update", - mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) - bulker.AssertNotCalled(t, "APIKeyCreate", - mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) + require.True(t, ok, "unable to case api key") + require.Equal(t, testAgent.DefaultAPIKey, key) + bulker.AssertNotCalled(t, "Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) + bulker.AssertNotCalled(t, "APIKeyCreate", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) bulker.AssertExpectations(t) }) t.Run("Permission hash != Agent Permission Hash need to regenerate the key", func(t *testing.T) { logger := testlog.SetLogger(t) bulker := ftesting.NewMockBulk() + bulker.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() + bulker.On("APIKeyCreate", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&bulk.APIKey{"abc", "new-key"}, nil).Once() //nolint:govet // test case - oldAPIKey := bulk.APIKey{ID: "test_id", Key: "EXISTING-KEY"} - wantAPIKey := bulk.APIKey{ID: "abc", Key: "new-key"} - hashPerm := "old-HASH" - - bulker.On("Update", - mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). - Return(nil).Once() - bulker.On("APIKeyCreate", - mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). - Return(&wantAPIKey, nil).Once() - - output := Output{ + po := PolicyOutput{ Type: OutputTypeElasticsearch, Name: "test output", Role: &RoleT{ @@ -174,55 +137,27 @@ func TestPolicyOutputESPrepare(t *testing.T) { } testAgent := &model.Agent{ - Outputs: map[string]*model.PolicyOutput{ - output.Name: { - ESDocument: model.ESDocument{}, - APIKey: oldAPIKey.Agent(), - ToRetireAPIKeyIds: nil, - APIKeyID: oldAPIKey.ID, - PermissionsHash: hashPerm, - Type: OutputTypeElasticsearch, - }, - }, + DefaultAPIKey: "test_id:EXISTING-KEY", + PolicyOutputPermissionsHash: "old-HASH", } - err := output.Prepare(context.Background(), logger, bulker, testAgent, policyMap) + err := po.Prepare(context.Background(), logger, bulker, testAgent, policyMap) require.NoError(t, err, "expected prepare to pass") - key, ok := policyMap.GetMap(output.Name)["api_key"].(string) - gotOutput := testAgent.Outputs[output.Name] + key, ok := policyMap.GetMap("test output")["api_key"].(string) require.True(t, ok, "unable to case api key") - require.Equal(t, wantAPIKey.Agent(), key) - - assert.Equal(t, wantAPIKey.Agent(), gotOutput.APIKey) - assert.Equal(t, wantAPIKey.ID, gotOutput.APIKeyID) - assert.Equal(t, output.Role.Sha2, gotOutput.PermissionsHash) - assert.Equal(t, output.Type, gotOutput.Type) - - // assert.Contains(t, gotOutput.ToRetireAPIKeyIds, oldAPIKey.ID) // TODO: assert on bulker.Update - - // Old model must always remain empty - assert.Empty(t, testAgent.DefaultAPIKey) - assert.Empty(t, testAgent.DefaultAPIKeyID) - assert.Empty(t, testAgent.DefaultAPIKeyHistory) - assert.Empty(t, testAgent.PolicyOutputPermissionsHash) - + require.Equal(t, "abc:new-key", key) bulker.AssertExpectations(t) }) t.Run("Generate API Key on new Agent", func(t *testing.T) { logger := testlog.SetLogger(t) bulker := ftesting.NewMockBulk() - bulker.On("Update", - mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). - Return(nil).Once() - apiKey := bulk.APIKey{ID: "abc", Key: "new-key"} - bulker.On("APIKeyCreate", - mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). - Return(&apiKey, nil).Once() - - output := Output{ + bulker.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() + bulker.On("APIKeyCreate", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&bulk.APIKey{"abc", "new-key"}, nil).Once() //nolint:govet // test case + + po := PolicyOutput{ Type: OutputTypeElasticsearch, Name: "test output", Role: &RoleT{ @@ -235,29 +170,15 @@ func TestPolicyOutputESPrepare(t *testing.T) { "test output": map[string]interface{}{}, } - testAgent := &model.Agent{Outputs: map[string]*model.PolicyOutput{}} + testAgent := &model.Agent{} - err := output.Prepare(context.Background(), logger, bulker, testAgent, policyMap) + err := po.Prepare(context.Background(), logger, bulker, testAgent, policyMap) require.NoError(t, err, "expected prepare to pass") - key, ok := policyMap.GetMap(output.Name)["api_key"].(string) - gotOutput := testAgent.Outputs[output.Name] + key, ok := policyMap.GetMap("test output")["api_key"].(string) require.True(t, ok, "unable to case api key") - assert.Equal(t, apiKey.Agent(), key) - - assert.Equal(t, apiKey.Agent(), gotOutput.APIKey) - assert.Equal(t, apiKey.ID, gotOutput.APIKeyID) - assert.Equal(t, output.Role.Sha2, gotOutput.PermissionsHash) - assert.Equal(t, output.Type, gotOutput.Type) - assert.Empty(t, gotOutput.ToRetireAPIKeyIds) - - // Old model must always remain empty - assert.Empty(t, testAgent.DefaultAPIKey) - assert.Empty(t, testAgent.DefaultAPIKeyID) - assert.Empty(t, testAgent.DefaultAPIKeyHistory) - assert.Empty(t, testAgent.PolicyOutputPermissionsHash) - + require.Equal(t, "abc:new-key", key) bulker.AssertExpectations(t) }) } diff --git a/internal/pkg/testing/esutil/bootstrap.go b/internal/pkg/testing/esutil/bootstrap.go index 978f95a75..e2aafce76 100644 --- a/internal/pkg/testing/esutil/bootstrap.go +++ b/internal/pkg/testing/esutil/bootstrap.go @@ -10,7 +10,7 @@ import ( "github.com/elastic/go-elasticsearch/v7" ) -// EnsureIndex sets up the index if it doesn't exist. It's utilized for integration tests at the moment. +// EnsureIndex sets up the index if it doesn't exists, utilized for integration tests at the moment func EnsureIndex(ctx context.Context, cli *elasticsearch.Client, name, mapping string) error { err := EnsureTemplate(ctx, cli, name, mapping, false) if err != nil { diff --git a/internal/pkg/testing/setup.go b/internal/pkg/testing/setup.go index 8f38ba7e6..8dac38cdc 100644 --- a/internal/pkg/testing/setup.go +++ b/internal/pkg/testing/setup.go @@ -98,7 +98,7 @@ func SetupCleanIndex(ctx context.Context, t *testing.T, index string, opts ...bu func CleanIndex(ctx context.Context, t *testing.T, bulker bulk.Bulk, index string) string { t.Helper() - + t.Helper() tmpl := dsl.NewTmpl() root := dsl.NewRoot() root.Query().MatchAll() @@ -106,25 +106,25 @@ func CleanIndex(ctx context.Context, t *testing.T, bulker bulk.Bulk, index strin query, err := q.Render(make(map[string]interface{})) if err != nil { - t.Fatalf("could not clean index: failed to render query template: %v", err) + t.Fatal(err) } cli := bulker.Client() - res, err := cli.API.DeleteByQuery([]string{index}, bytes.NewReader(query), cli.API.DeleteByQuery.WithContext(ctx), cli.API.DeleteByQuery.WithRefresh(true), ) + if err != nil { - t.Fatalf("could not clean index %s, DeleteByQuery failed: %v", - index, err) + t.Fatal(err) } defer res.Body.Close() var esres es.DeleteByQueryResponse + err = json.NewDecoder(res.Body).Decode(&esres) if err != nil { - t.Fatalf("could not decode ES response: %v", err) + t.Fatal(err) } if res.IsError() { @@ -135,9 +135,9 @@ func CleanIndex(ctx context.Context, t *testing.T, bulker bulk.Bulk, index strin } } } + if err != nil { - t.Fatalf("ES returned an error: %v. body: %q", err, res) + t.Fatal(err) } - return index } diff --git a/model/schema.json b/model/schema.json index 423cfe3fc..91bda492f 100644 --- a/model/schema.json +++ b/model/schema.json @@ -244,7 +244,6 @@ "name" ] }, - "server-metadata": { "title": "Server Metadata", "description": "A Fleet Server metadata", @@ -265,7 +264,6 @@ "version" ] }, - "server": { "title": "Server", "description": "A Fleet Server", @@ -286,7 +284,6 @@ "server" ] }, - "policy": { "title": "Policy", "description": "A policy that an Elastic Agent is attached to", @@ -332,7 +329,6 @@ "default_fleet_server" ] }, - "policy-leader": { "title": "Policy Leader", "description": "The current leader Fleet Server for a policy", @@ -349,60 +345,6 @@ "server" ] }, - - "to_retire_api_key_ids": { - "type": "array", - "items": { - "description": "the Output API Keys that were replaced and should be retired", - "type": "object", - "properties": { - "id": { - "description": "API Key identifier", - "type": "string" - }, - "retired_at": { - "description": "Date/time the API key was retired", - "type": "string", - "format": "date-time" - } - } - } - }, - - "policy_output" : { - "type": "object", - "description": "holds the needed data to manage the output API keys", - "properties": { - "api_key": { - "description": "API key the Elastic Agent uses to authenticate with elasticsearch", - "type": "string" - }, - "to_retire_api_key_ids": { - "description": "API keys to be invalidated on next agent ack", - "$ref": "#/definitions/to_retire_api_key_ids" - }, - "api_key_id": { - "description": "ID of the API key the Elastic Agent uses to authenticate with elasticsearch", - "type": "string" - }, - "permissions_hash": { - "description": "The policy output permissions hash", - "type": "string" - }, - "type": { - "description": "Type is the output type. Currently only Elasticsearch is supported.", - "type": "string" - } - }, - "required": [ - "api_key", - "api_key_history", - "api_key_id", - "permissions_hash", - "type" - ] - }, - "agent": { "title": "Agent", "description": "An Elastic Agent that has enrolled into Fleet", @@ -499,7 +441,7 @@ "type": "integer" }, "policy_output_permissions_hash": { - "description": "Deprecated. Use Outputs instead. The policy output permissions hash", + "description": "The policy output permissions hash", "type": "string" }, "last_updated": { @@ -517,21 +459,30 @@ "type": "string" }, "default_api_key_id": { - "description": "Deprecated. Use Outputs instead. ID of the API key the Elastic Agent uses to authenticate with elasticsearch", + "description": "ID of the API key the Elastic Agent uses to authenticate with elasticsearch", "type": "string" }, "default_api_key": { - "description": "Deprecated. Use Outputs instead. API key the Elastic Agent uses to authenticate with elasticsearch", + "description": "API key the Elastic Agent uses to authenticate with elasticsearch", "type": "string" }, "default_api_key_history": { - "description": "Deprecated. Use Outputs instead. Default API Key History", - "$ref": "#/definitions/to_retire_api_key_ids" - }, - "outputs": { - "description": "Outputs is the policy output data, mapping the output name to its data", - "type": "object", - "additionalProperties": { "$ref": "#/definitions/policy_output"} + "description": "Default API Key History", + "type": "array", + "items": { + "type": "object", + "properties": { + "id": { + "description": "API Key identifier", + "type": "string" + }, + "retired_at": { + "description": "Date/time the API key was retired", + "type": "string", + "format": "date-time" + } + } + } }, "updated_at": { "description": "Date/time the Elastic Agent was last updated", @@ -561,7 +512,6 @@ "status" ] }, - "enrollment_api_key": { "title": "Enrollment API key", "description": "An Elastic Agent enrollment API key", @@ -605,7 +555,6 @@ ] } }, - "checkin": { "title": "Checkin", "description": "An Elastic Agent checkin to Fleet",