Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/fleet/handleCheckin.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,8 @@ func (ct *CheckinT) fetchAgentPendingActions(ctx context.Context, seqno sqn.SeqN
now := time.Now().UTC().Format(time.RFC3339)

return dl.FindActions(ctx, ct.bulker, dl.QueryAgentActions, map[string]interface{}{
dl.FieldSeqNo: seqno.Get(0),
dl.FieldMaxSeqNo: ct.gcp.GetCheckpoint(),
dl.FieldSeqNo: seqno.Value(),
dl.FieldMaxSeqNo: ct.gcp.GetCheckpoint().Value(),
dl.FieldExpiration: now,
dl.FieldAgents: []string{agentId},
})
Expand Down
20 changes: 17 additions & 3 deletions cmd/fleet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/config"
"github.com/elastic/fleet-server/v7/internal/pkg/coordinator"
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
"github.com/elastic/fleet-server/v7/internal/pkg/es"
"github.com/elastic/fleet-server/v7/internal/pkg/logger"
"github.com/elastic/fleet-server/v7/internal/pkg/monitor"
"github.com/elastic/fleet-server/v7/internal/pkg/policy"
Expand Down Expand Up @@ -505,7 +506,13 @@ func (f *FleetServer) runServer(ctx context.Context, cfg *config.Config) (err er
// shutdown before the bulker is then cancelled.
bulkCtx, bulkCancel := context.WithCancel(context.Background())
defer bulkCancel()
es, bulker, err := bulk.InitES(bulkCtx, cfg)
esCli, bulker, err := bulk.InitES(bulkCtx, cfg)
if err != nil {
return err
}

// Monitoring es client, longer timeout, no retries
monCli, err := es.NewClient(ctx, cfg, true)
if err != nil {
return err
}
Expand All @@ -514,7 +521,10 @@ func (f *FleetServer) runServer(ctx context.Context, cfg *config.Config) (err er
g, ctx := errgroup.WithContext(ctx)

// Coordinator policy monitor
pim, err := monitor.New(dl.FleetPolicies, es, monitor.WithFetchSize(cfg.Inputs[0].Monitor.FetchSize))
pim, err := monitor.New(dl.FleetPolicies, esCli, monCli,
monitor.WithFetchSize(cfg.Inputs[0].Monitor.FetchSize),
monitor.WithPollTimeout(cfg.Inputs[0].Monitor.PollTimeout),
)
if err != nil {
return err
}
Expand All @@ -536,7 +546,11 @@ func (f *FleetServer) runServer(ctx context.Context, cfg *config.Config) (err er
var ad *action.Dispatcher
var tr *action.TokenResolver

am, err = monitor.NewSimple(dl.FleetActions, es, monitor.WithExpiration(true), monitor.WithFetchSize(cfg.Inputs[0].Monitor.FetchSize))
am, err = monitor.NewSimple(dl.FleetActions, esCli, monCli,
monitor.WithExpiration(true),
monitor.WithFetchSize(cfg.Inputs[0].Monitor.FetchSize),
monitor.WithPollTimeout(cfg.Inputs[0].Monitor.PollTimeout),
)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion dev-tools/integration/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func main() {
checkErr(err)

ctx := context.Background()
es, err := es.NewClient(ctx, cfg)
es, err := es.NewClient(ctx, cfg, false)
checkErr(err)

err = esutil.EnsureESIndices(ctx, es)
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/bulk/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ const (

func InitES(ctx context.Context, cfg *config.Config, opts ...BulkOpt) (*elasticsearch.Client, Bulk, error) {

es, err := es.NewClient(ctx, cfg)
es, err := es.NewClient(ctx, cfg, false)
if err != nil {
return nil, nil, err
}
Expand Down
12 changes: 8 additions & 4 deletions internal/pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ func TestConfig(t *testing.T) {
MaxCost: defaultCacheMaxCost,
},
Monitor: Monitor{
FetchSize: defaultFetchSize,
FetchSize: defaultFetchSize,
PollTimeout: defaultPollTimeout,
},
},
},
Expand Down Expand Up @@ -182,7 +183,8 @@ func TestConfig(t *testing.T) {
MaxCost: defaultCacheMaxCost,
},
Monitor: Monitor{
FetchSize: defaultFetchSize,
FetchSize: defaultFetchSize,
PollTimeout: defaultPollTimeout,
},
},
},
Expand Down Expand Up @@ -269,7 +271,8 @@ func TestConfig(t *testing.T) {
MaxCost: defaultCacheMaxCost,
},
Monitor: Monitor{
FetchSize: defaultFetchSize,
FetchSize: defaultFetchSize,
PollTimeout: defaultPollTimeout,
},
},
},
Expand Down Expand Up @@ -356,7 +359,8 @@ func TestConfig(t *testing.T) {
MaxCost: defaultCacheMaxCost,
},
Monitor: Monitor{
FetchSize: defaultFetchSize,
FetchSize: defaultFetchSize,
PollTimeout: defaultPollTimeout,
},
},
},
Expand Down
9 changes: 7 additions & 2 deletions internal/pkg/config/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,19 @@

package config

import "time"

const (
defaultFetchSize = 1000
defaultFetchSize = 1000
defaultPollTimeout = 5 * time.Minute
)

type Monitor struct {
FetchSize int `config:"fetch_size"`
FetchSize int `config:"fetch_size"`
PollTimeout time.Duration `config:"poll_timeout"`
}

func (m *Monitor) InitDefaults() {
m.FetchSize = defaultFetchSize
m.PollTimeout = defaultPollTimeout
}
18 changes: 17 additions & 1 deletion internal/pkg/config/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ import (
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
)

// The timeout would be driven by the server for long poll.
// Giving it some sane long value.
const httpTransportLongPollTimeout = 10 * time.Minute

var hasScheme = regexp.MustCompile(`^([a-z][a-z0-9+\-.]*)://`)

// Elasticsearch is the configuration for elasticsearch.
Expand Down Expand Up @@ -77,7 +81,7 @@ func (c *Elasticsearch) Validate() error {
}

// ToESConfig converts the configuration object into the config for the elasticsearch client.
func (c *Elasticsearch) ToESConfig() (elasticsearch.Config, error) {
func (c *Elasticsearch) ToESConfig(longPoll bool) (elasticsearch.Config, error) {
// build the addresses
addrs := make([]string, len(c.Hosts))
for i, host := range c.Hosts {
Expand All @@ -104,6 +108,17 @@ func (c *Elasticsearch) ToESConfig() (elasticsearch.Config, error) {
ResponseHeaderTimeout: c.Timeout,
ExpectContinueTimeout: 1 * time.Second,
}

disableRetry := false

if longPoll {
httpTransport.IdleConnTimeout = httpTransportLongPollTimeout
httpTransport.ResponseHeaderTimeout = httpTransportLongPollTimeout

// no retries for long poll monitoring
disableRetry = true
}

if c.TLS != nil && c.TLS.IsEnabled() {
tls, err := tlscommon.LoadTLSConfig(c.TLS)
if err != nil {
Expand Down Expand Up @@ -136,6 +151,7 @@ func (c *Elasticsearch) ToESConfig() (elasticsearch.Config, error) {
Header: h,
Transport: httpTransport,
MaxRetries: c.MaxRetries,
DisableRetry: disableRetry,
}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/config/output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func TestToESConfig(t *testing.T) {
cmpopts.IgnoreUnexported(tls.Config{}),
}
t.Run(name, func(t *testing.T) {
res, err := test.cfg.ToESConfig()
res, err := test.cfg.ToESConfig(false)
require.NoError(t, err)
test.result.Header.Set("X-elastic-product-origin", "fleet")
if !assert.True(t, cmp.Equal(test.result, res, copts...)) {
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/coordinator/monitor_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestMonitorLeadership(t *testing.T) {
serversIndex := ftesting.SetupIndex(bulkCtx, t, bulker, es.MappingServer)
policiesIndex := ftesting.SetupIndex(bulkCtx, t, bulker, es.MappingPolicy)
leadersIndex := ftesting.SetupIndex(bulkCtx, t, bulker, es.MappingPolicyLeader)
pim, err := monitor.New(policiesIndex, bulker.Client())
pim, err := monitor.New(policiesIndex, bulker.Client(), bulker.Client())
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/es/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
"github.com/rs/zerolog/log"
)

func NewClient(ctx context.Context, cfg *config.Config) (*elasticsearch.Client, error) {
escfg, err := cfg.Output.Elasticsearch.ToESConfig()
func NewClient(ctx context.Context, cfg *config.Config, longPoll bool) (*elasticsearch.Client, error) {
escfg, err := cfg.Output.Elasticsearch.ToESConfig(longPoll)
if err != nil {
return nil, err
}
Expand Down
5 changes: 5 additions & 0 deletions internal/pkg/es/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ type ErrElastic struct {
func (e *ErrElastic) Unwrap() error {
if e.Type == "index_not_found_exception" {
return ErrIndexNotFound
} else if e.Type == "timeout_exception" {
return ErrTimeout
}

return nil
}

Expand All @@ -35,6 +38,8 @@ var (
ErrElasticNotFound = errors.New("elastic not found")
ErrInvalidBody = errors.New("invalid body")
ErrIndexNotFound = errors.New("index not found")
ErrTimeout = errors.New("timeout")
ErrNotFound = errors.New("not found")
)

func TranslateError(status int, e ErrorT) error {
Expand Down
164 changes: 164 additions & 0 deletions internal/pkg/es/fleet_global_checkpoints.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package es

import (
"context"
"net/http"
"strconv"
"strings"
"time"

"github.com/elastic/fleet-server/v7/internal/pkg/sqn"
"github.com/elastic/go-elasticsearch/v8/esapi"
)

// The wrapper for the new _fleet global_checkpoints that is not the part of the
// standard client library at the moment.
// The shape mimics the official client API and should be easy drop-in replacement in the future.
// This should be replaced the official client library when/if the new API makes it in.

func NewGlobalCheckpointsRequest(t esapi.Transport) GlobalCheckpoints {
return func(o ...func(*GlobalCheckpointsRequest)) (*esapi.Response, error) {
var r = GlobalCheckpointsRequest{}
for _, f := range o {
f(&r)
}
return r.Do(r.ctx, t)
}
}

// Copied from the official client
func formatDuration(d time.Duration) string {
if d < time.Millisecond {
return strconv.FormatInt(int64(d), 10) + "nanos"
}
return strconv.FormatInt(int64(d)/int64(time.Millisecond), 10) + "ms"
}

type GlobalCheckpoints func(o ...func(*GlobalCheckpointsRequest)) (*esapi.Response, error)

// GlobalCheckpointsRequest configures the _fleet API global_checkpoints request.
//
type GlobalCheckpointsRequest struct {
ctx context.Context

Index string
WaitForAdvance *bool
Checkpoints []int64
Timeout time.Duration

Header http.Header
}

// Do executes the request and returns response or error.
//
func (r GlobalCheckpointsRequest) Do(ctx context.Context, transport esapi.Transport) (*esapi.Response, error) {
var (
method string
path strings.Builder
params map[string]string
)

method = "GET"

path.Grow(1 + len(r.Index) + len("/_fleet/global_checkpoints"))
if len(r.Index) > 0 {
path.WriteString("/")
path.WriteString(r.Index)
}
path.WriteString("/_fleet/global_checkpoints")

params = make(map[string]string)

if r.WaitForAdvance != nil {
params["wait_for_advance"] = strconv.FormatBool(*r.WaitForAdvance)
}

if len(r.Checkpoints) > 0 {
seqNo := sqn.SeqNo(r.Checkpoints)
params["checkpoints"] = seqNo.String()
}

if r.Timeout != 0 {
params["timeout"] = formatDuration(r.Timeout)
}

req, err := http.NewRequest(method, path.String(), nil)
if err != nil {
return nil, err
}

if len(params) > 0 {
q := req.URL.Query()
for k, v := range params {
q.Set(k, v)
}
req.URL.RawQuery = q.Encode()
}

if len(r.Header) > 0 {
if len(req.Header) == 0 {
req.Header = r.Header
} else {
for k, vv := range r.Header {
for _, v := range vv {
req.Header.Add(k, v)
}
}
}
}

if ctx != nil {
req = req.WithContext(ctx)
}

res, err := transport.Perform(req)
if err != nil {
return nil, err
}

response := esapi.Response{
StatusCode: res.StatusCode,
Body: res.Body,
Header: res.Header,
}

return &response, nil
}

// WithContext sets the request context.
//
func (f GlobalCheckpoints) WithContext(v context.Context) func(*GlobalCheckpointsRequest) {
return func(r *GlobalCheckpointsRequest) {
r.ctx = v
}
}

// WithIndex - an index name
//
func (f GlobalCheckpoints) WithIndex(index string) func(*GlobalCheckpointsRequest) {
return func(r *GlobalCheckpointsRequest) {
r.Index = index
}
}

func (f GlobalCheckpoints) WithWaitForAdvance(v bool) func(*GlobalCheckpointsRequest) {
return func(r *GlobalCheckpointsRequest) {
r.WaitForAdvance = &v
}
}

func (f GlobalCheckpoints) WithCheckpoints(checkpoints []int64) func(*GlobalCheckpointsRequest) {
return func(r *GlobalCheckpointsRequest) {
r.Checkpoints = checkpoints
}
}

func (f GlobalCheckpoints) WithTimeout(to time.Duration) func(*GlobalCheckpointsRequest) {
return func(r *GlobalCheckpointsRequest) {
r.Timeout = to
}
}
Loading