Skip to content

Commit

Permalink
Allow multiple ES outputs as long as they are the same ES (elastic#1684)
Browse files Browse the repository at this point in the history
* add 'outputs' field to the ES agent schema to store the API key data and permission hash for each ES output

* add output name to API key metadata

* add v8.5 migration to migration.go

* add migration docs and improve logging

* group migration functions per version
  • Loading branch information
AndersonQ authored Sep 7, 2022
1 parent cb125e0 commit 63fdcbf
Show file tree
Hide file tree
Showing 31 changed files with 1,352 additions and 457 deletions.
14 changes: 9 additions & 5 deletions cmd/fleet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -821,17 +821,21 @@ func (f *FleetServer) runSubsystems(ctx context.Context, cfg *config.Config, g *
remoteVersion, err := ver.CheckCompatibility(ctx, esCli, f.bi.Version)
if err != nil {
if len(remoteVersion) != 0 {
return fmt.Errorf("failed version compatibility check with elasticsearch (Agent: %s, Elasticsearch: %s): %w", f.bi.Version, remoteVersion, err)
return fmt.Errorf("failed version compatibility check with elasticsearch (Agent: %s, Elasticsearch: %s): %w",
f.bi.Version, remoteVersion, err)
}
return fmt.Errorf("failed version compatibility check with elasticsearch: %w", err)
}

// Run migrations; current safe to do in background. That may change in the future.
g.Go(loggedRunFunc(ctx, "Migrations", func(ctx context.Context) error {
// Run migrations
loggedMigration := loggedRunFunc(ctx, "Migrations", func(ctx context.Context) error {
return dl.Migrate(ctx, bulker)
}))
})
if err = loggedMigration(); err != nil {
return fmt.Errorf("failed to run subsystems: %w", err)
}

// Run schduler for periodic GC/cleanup
// Run scheduler for periodic GC/cleanup
gcCfg := cfg.Inputs[0].Server.GC
sched, err := scheduler.New(gc.Schedules(bulker, gcCfg.ScheduleInterval, gcCfg.CleanupAfterExpiredInterval))
if err != nil {
Expand Down
49 changes: 23 additions & 26 deletions internal/pkg/api/handleAck.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"strings"
"time"

"github.com/pkg/errors"

"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
"github.com/elastic/fleet-server/v7/internal/pkg/cache"
"github.com/elastic/fleet-server/v7/internal/pkg/config"
Expand All @@ -24,7 +26,6 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/logger"
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"github.com/elastic/fleet-server/v7/internal/pkg/policy"
"github.com/pkg/errors"

"github.com/julienschmidt/httprouter"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -337,8 +338,9 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag
Int64("rev.coordinatorIdx", rev.CoordinatorIdx).
Msg("ack policy revision")

if ok && rev.PolicyID == agent.PolicyID && (rev.RevisionIdx > currRev ||
(rev.RevisionIdx == currRev && rev.CoordinatorIdx > currCoord)) {
if ok && rev.PolicyID == agent.PolicyID &&
(rev.RevisionIdx > currRev ||
(rev.RevisionIdx == currRev && rev.CoordinatorIdx > currCoord)) {
found = true
currRev = rev.RevisionIdx
currCoord = rev.CoordinatorIdx
Expand All @@ -349,17 +351,7 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag
return nil
}

sz := len(agent.DefaultAPIKeyHistory)
if sz > 0 {
ids := make([]string, sz)
for i := 0; i < sz; i++ {
ids[i] = agent.DefaultAPIKeyHistory[i].ID
}
log.Info().Strs("ids", ids).Msg("Invalidate old API keys")
if err := ack.bulk.APIKeyInvalidate(ctx, ids...); err != nil {
log.Info().Err(err).Strs("ids", ids).Msg("Failed to invalidate API keys")
}
}
ack.invalidateAPIKeys(ctx, agent)

body := makeUpdatePolicyBody(
agent.PolicyID,
Expand All @@ -385,8 +377,24 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag
return errors.Wrap(err, "handlePolicyChange update")
}

// invalidateAPIKeys gathers the IDs of every API key listed for retirement
// across all of the agent's outputs and asks the bulker to invalidate them in
// a single call.
//
// Invalidation is best effort: a failure is only logged, nothing is returned
// to the caller. No request is issued when there is nothing to retire.
func (ack *AckT) invalidateAPIKeys(ctx context.Context, agent *model.Agent) {
	var ids []string
	for _, out := range agent.Outputs {
		// Outputs stores pointers; skip a nil entry rather than panic on it.
		if out == nil {
			continue
		}
		for _, k := range out.ToRetireAPIKeyIds {
			ids = append(ids, k.ID)
		}
	}

	if len(ids) == 0 {
		return
	}

	log.Info().Strs("fleet.policy.apiKeyIDsToRetire", ids).Msg("Invalidate old API keys")
	if err := ack.bulk.APIKeyInvalidate(ctx, ids...); err != nil {
		log.Info().Err(err).Strs("ids", ids).Msg("Failed to invalidate API keys")
	}
}

func (ack *AckT) handleUnenroll(ctx context.Context, zlog zerolog.Logger, agent *model.Agent) error {
apiKeys := _getAPIKeyIDs(agent)
apiKeys := agent.APIKeyIDs()
if len(apiKeys) > 0 {
zlog = zlog.With().Strs(LogAPIKeyID, apiKeys).Logger()

Expand Down Expand Up @@ -440,17 +448,6 @@ func (ack *AckT) handleUpgrade(ctx context.Context, zlog zerolog.Logger, agent *
return nil
}

// _getAPIKeyIDs returns the non-empty API key IDs recorded on the agent: the
// access API key ID first, followed by the default API key ID. The returned
// slice is empty (but non-nil) when neither ID is set.
func _getAPIKeyIDs(agent *model.Agent) []string {
	// At most two IDs are appended, so reserve room for both up front.
	keys := make([]string, 0, 2)
	if agent.AccessAPIKeyID != "" {
		keys = append(keys, agent.AccessAPIKeyID)
	}
	if agent.DefaultAPIKeyID != "" {
		keys = append(keys, agent.DefaultAPIKeyID)
	}
	return keys
}

// Generate an update script that validates that the policy_id
// has not changed underneath us by an upstream process (Kibana or otherwise).
// We have a race condition where a user could have assigned a new policy to
Expand Down
39 changes: 38 additions & 1 deletion internal/pkg/api/handleAck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ import (
"net/http"
"testing"

"github.com/google/go-cmp/cmp"

"github.com/elastic/fleet-server/v7/internal/pkg/cache"
"github.com/elastic/fleet-server/v7/internal/pkg/config"
"github.com/elastic/fleet-server/v7/internal/pkg/es"
"github.com/elastic/fleet-server/v7/internal/pkg/model"
ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing"
testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log"
"github.com/google/go-cmp/cmp"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -439,3 +440,39 @@ func TestHandleAckEvents(t *testing.T) {
})
}
}

// TestInvalidateAPIKeys verifies that invalidateAPIKeys collects the API key
// IDs to retire from every output of the agent and passes all of them, in any
// order, to APIKeyInvalidate.
func TestInvalidateAPIKeys(t *testing.T) {
	ctx := context.Background()
	want := []string{"toRetire1", "toRetire2_0", "toRetire2_1"}

	agent := model.Agent{
		Outputs: map[string]*model.PolicyOutput{
			// A single key to retire.
			"1": {ToRetireAPIKeyIds: []model.ToRetireAPIKeyIdsItems{
				{ID: "toRetire1"},
			}},
			// Several keys to retire on the same output.
			"2": {ToRetireAPIKeyIds: []model.ToRetireAPIKeyIdsItems{
				{ID: "toRetire2_0"},
				{ID: "toRetire2_1"},
			}},
			// An output with a nil list contributes nothing.
			"3": {ToRetireAPIKeyIds: nil},
		},
	}

	// Map iteration order is not fixed, so compare as sets:
	// if A contains B and B contains A => A = B
	sameIDs := func(got []string) bool {
		if !assert.Subset(t, got, want) {
			return false
		}
		return assert.Subset(t, want, got)
	}

	bulker := ftesting.NewMockBulk()
	bulker.
		On("APIKeyInvalidate", ctx, mock.MatchedBy(sameIDs)).
		Return(nil)

	ack := &AckT{bulk: bulker}
	ack.invalidateAPIKeys(ctx, &agent)

	bulker.AssertExpectations(t)
}
15 changes: 7 additions & 8 deletions internal/pkg/api/handleCheckin.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"compress/gzip"
"context"
"encoding/json"
"fmt"
"math/rand"
"net/http"
"reflect"
Expand Down Expand Up @@ -60,7 +61,6 @@ func (rt Router) handleCheckin(w http.ResponseWriter, r *http.Request, ps httpro
Logger()

err := rt.ct.handleCheckin(&zlog, w, r, id)

if err != nil {
cntCheckin.IncError(err)
resp := NewHTTPErrResp(err)
Expand Down Expand Up @@ -430,13 +430,13 @@ func convertActions(agentID string, actions []model.Action) ([]ActionResp, strin
//
func processPolicy(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, agentID string, pp *policy.ParsedPolicy) (*ActionResp, error) {
zlog = zlog.With().
Str("ctx", "processPolicy").
Int64("policyRevision", pp.Policy.RevisionIdx).
Int64("policyCoordinator", pp.Policy.CoordinatorIdx).
Str("fleet.ctx", "processPolicy").
Int64("fleet.policyRevision", pp.Policy.RevisionIdx).
Int64("fleet.policyCoordinator", pp.Policy.CoordinatorIdx).
Str(LogPolicyID, pp.Policy.PolicyID).
Logger()

// Repull and decode the agent object. Do not trust the cache.
// Repull and decode the agent object. Do not trust the cache.
agent, err := dl.FindAgent(ctx, bulker, dl.QueryAgentByID, dl.FieldID, agentID)
if err != nil {
zlog.Error().Err(err).Msg("fail find agent record")
Expand All @@ -446,7 +446,6 @@ func processPolicy(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a
// Parse the outputs maps in order to prepare the outputs
const outputsProperty = "outputs"
outputs, err := smap.Parse(pp.Fields[outputsProperty])

if err != nil {
return nil, err
}
Expand All @@ -458,9 +457,9 @@ func processPolicy(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a
// Iterate through the policy outputs and prepare them
for _, policyOutput := range pp.Outputs {
err = policyOutput.Prepare(ctx, zlog, bulker, &agent, outputs)

if err != nil {
return nil, err
return nil, fmt.Errorf("failed to prepare output %q: %w",
policyOutput.Name, err)
}
}

Expand Down
11 changes: 8 additions & 3 deletions internal/pkg/api/handleEnroll.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ type EnrollerT struct {
}

func NewEnrollerT(verCon version.Constraints, cfg *config.Server, bulker bulk.Bulk, c cache.Cache) (*EnrollerT, error) {

log.Info().
Interface("limits", cfg.Limits.EnrollLimit).
Msg("Setting config enroll_limit")
Expand Down Expand Up @@ -187,7 +186,13 @@ func (et *EnrollerT) processRequest(rb *rollback.Rollback, zlog zerolog.Logger,
return et._enroll(r.Context(), rb, zlog, req, erec.PolicyID, ver)
}

func (et *EnrollerT) _enroll(ctx context.Context, rb *rollback.Rollback, zlog zerolog.Logger, req *EnrollRequest, policyID, ver string) (*EnrollResponse, error) {
func (et *EnrollerT) _enroll(
ctx context.Context,
rb *rollback.Rollback,
zlog zerolog.Logger,
req *EnrollRequest,
policyID,
ver string) (*EnrollResponse, error) {

if req.SharedID != "" {
// TODO: Support pre-existing install
Expand Down Expand Up @@ -427,7 +432,7 @@ func generateAccessAPIKey(ctx context.Context, bulk bulk.Bulk, agentID string) (
agentID,
"",
[]byte(kFleetAccessRolesJSON),
apikey.NewMetadata(agentID, apikey.TypeAccess),
apikey.NewMetadata(agentID, "", apikey.TypeAccess),
)
}

Expand Down
61 changes: 61 additions & 0 deletions internal/pkg/apikey/apikey.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,18 @@
package apikey

import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"unicode/utf8"

"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
)

const (
Expand All @@ -28,6 +34,61 @@ var (

var AuthKey = http.CanonicalHeaderKey("Authorization")

// APIKeyMetadata pairs the ID of an Elasticsearch API key with the Metadata
// stored alongside that key. It is the value returned by Read.
type APIKeyMetadata struct {
// ID is the API key identifier, taken from the "id" field of the response.
ID string
// Metadata is the key's metadata, taken from the "metadata" field of the
// response.
Metadata Metadata
}

// Read looks up the API key with the given id in Elasticsearch through client
// and returns its ID together with its metadata.
//
// An error wrapping ErrAPIKeyNotFound is returned when the response reports an
// error, and ErrAPIKeyNotFound itself when the response lists no API keys.
// Request, body-read and JSON decoding failures are wrapped and returned.
// When several keys are returned only the first one is used.
func Read(ctx context.Context, client *elasticsearch.Client, id string) (*APIKeyMetadata, error) {
	// Shape of the Get API key response; only the fields needed here.
	type apiKeyEntry struct {
		ID       string   `json:"id"`
		Metadata Metadata `json:"metadata"`
	}
	type getAPIKeyBody struct {
		APIKeys []apiKeyEntry `json:"api_keys"`
	}

	res, err := client.Security.GetAPIKey(
		client.Security.GetAPIKey.WithContext(ctx),
		client.Security.GetAPIKey.WithID(id),
	)
	if err != nil {
		return nil, fmt.Errorf("request to elasticsearch failed: %w", err)
	}
	defer res.Body.Close()

	if res.IsError() {
		return nil, fmt.Errorf("%s: %w", res.String(), ErrAPIKeyNotFound)
	}

	// Drain the body first so read failures and decode failures are
	// reported with distinct messages.
	var raw bytes.Buffer
	if _, readErr := raw.ReadFrom(res.Body); readErr != nil {
		return nil, fmt.Errorf("could not read from response body: %w", readErr)
	}

	var body getAPIKeyBody
	if decodeErr := json.Unmarshal(raw.Bytes(), &body); decodeErr != nil {
		return nil, fmt.Errorf(
			"could not Unmarshal elasticsearch GetAPIKeyResponse: %w", decodeErr)
	}

	if len(body.APIKeys) == 0 {
		return nil, ErrAPIKeyNotFound
	}

	key := body.APIKeys[0]
	return &APIKeyMetadata{ID: key.ID, Metadata: key.Metadata}, nil
}

// APIKey is used to represent an Elasticsearch API Key.
type APIKey struct {
ID string
Expand Down
Loading

0 comments on commit 63fdcbf

Please sign in to comment.