Skip to content

Commit

Permalink
[Ingest Management] Report agent status during checkin (#23058) (#23141)
Browse files Browse the repository at this point in the history
[Ingest Management] Report agent status during checkin (#23058)
  • Loading branch information
michalpristas authored Dec 15, 2020
1 parent 28e2ab2 commit 3a6b95f
Show file tree
Hide file tree
Showing 18 changed files with 480 additions and 97 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ require (
github.com/google/flatbuffers v1.7.2-0.20170925184458-7a6b2bf521e9
github.com/google/go-cmp v0.4.0
github.com/google/gopacket v1.1.18-0.20191009163724-0ad7f2610e34
github.com/google/uuid v1.1.2-0.20190416172445-c2e93f3ae59f // indirect
github.com/google/uuid v1.1.2-0.20190416172445-c2e93f3ae59f
github.com/gorhill/cronexpr v0.0.0-20161205141322-d520615e531a
github.com/gorilla/mux v1.7.2 // indirect
github.com/gorilla/websocket v1.4.1 // indirect
Expand Down
45 changes: 29 additions & 16 deletions x-pack/elastic-agent/pkg/agent/application/fleet_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common/backoff"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/status"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/scheduler"
)
Expand Down Expand Up @@ -61,19 +62,21 @@ type fleetAcker interface {
// call the API to send the events and will receive actions to be executed locally.
// The only supported action for now is a "ActionPolicyChange".
type fleetGateway struct {
bgContext context.Context
log *logger.Logger
dispatcher dispatcher
client clienter
scheduler scheduler.Scheduler
backoff backoff.Backoff
settings *fleetGatewaySettings
agentInfo agentInfo
reporter fleetReporter
done chan struct{}
wg sync.WaitGroup
acker fleetAcker
unauthCounter int
bgContext context.Context
log *logger.Logger
dispatcher dispatcher
client clienter
scheduler scheduler.Scheduler
backoff backoff.Backoff
settings *fleetGatewaySettings
agentInfo agentInfo
reporter fleetReporter
done chan struct{}
wg sync.WaitGroup
acker fleetAcker
unauthCounter int
statusController status.Controller
statusReporter status.Reporter
}

func newFleetGateway(
Expand All @@ -84,6 +87,7 @@ func newFleetGateway(
d dispatcher,
r fleetReporter,
acker fleetAcker,
statusController status.Controller,
) (*fleetGateway, error) {

scheduler := scheduler.NewPeriodicJitter(defaultGatewaySettings.Duration, defaultGatewaySettings.Jitter)
Expand All @@ -97,6 +101,7 @@ func newFleetGateway(
scheduler,
r,
acker,
statusController,
)
}

Expand All @@ -110,6 +115,7 @@ func newFleetGatewayWithScheduler(
scheduler scheduler.Scheduler,
r fleetReporter,
acker fleetAcker,
statusController status.Controller,
) (*fleetGateway, error) {

// Backoff implementation doesn't support the using context as the shutdown mechanism.
Expand All @@ -129,9 +135,11 @@ func newFleetGatewayWithScheduler(
settings.Backoff.Init,
settings.Backoff.Max,
),
done: done,
reporter: r,
acker: acker,
done: done,
reporter: r,
acker: acker,
statusReporter: statusController.Register("gateway"),
statusController: statusController,
}, nil
}

Expand All @@ -147,6 +155,7 @@ func (f *fleetGateway) worker() {
resp, err := f.doExecute()
if err != nil {
f.log.Error(err)
f.statusReporter.Update(status.Failed)
continue
}

Expand All @@ -157,9 +166,11 @@ func (f *fleetGateway) worker() {

if err := f.dispatcher.Dispatch(f.acker, actions...); err != nil {
f.log.Errorf("failed to dispatch actions, error: %s", err)
f.statusReporter.Update(status.Degraded)
}

f.log.Debugf("FleetGateway is sleeping, next update in %s", f.settings.Duration)
f.statusReporter.Update(status.Healthy)
case <-f.bgContext.Done():
f.stop()
return
Expand Down Expand Up @@ -203,6 +214,7 @@ func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
req := &fleetapi.CheckinRequest{
Events: ee,
Metadata: ecsMeta,
Status: f.statusController.StatusString(),
}

resp, err := cmd.Execute(ctx, req)
Expand Down Expand Up @@ -250,6 +262,7 @@ func (f *fleetGateway) Start() {
func (f *fleetGateway) stop() {
f.log.Info("Fleet gateway is stopping")
defer f.scheduler.Stop()
f.statusReporter.Unregister()
close(f.done)
f.wg.Wait()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func withGateway(agentInfo agentInfo, settings *fleetGatewaySettings, fn withGat
scheduler,
rep,
newNoopAcker(),
&noopController{},
)

go gateway.Start()
Expand Down Expand Up @@ -254,6 +255,7 @@ func TestFleetGateway(t *testing.T) {
scheduler,
getReporter(agentInfo, log, t),
newNoopAcker(),
&noopController{},
)

go gateway.Start()
Expand Down Expand Up @@ -342,6 +344,7 @@ func TestFleetGateway(t *testing.T) {
scheduler,
getReporter(agentInfo, log, t),
newNoopAcker(),
&noopController{},
)

require.NoError(t, err)
Expand Down
3 changes: 2 additions & 1 deletion x-pack/elastic-agent/pkg/agent/application/local_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func newLocal(
uc upgraderControl,
agentInfo *info.AgentInfo,
) (*Local, error) {
statusController := &noopController{}
cfg, err := configuration.NewFromConfig(rawConfig)
if err != nil {
return nil, err
Expand Down Expand Up @@ -97,7 +98,7 @@ func newLocal(
return nil, errors.New(err, "failed to initialize monitoring")
}

router, err := newRouter(log, streamFactory(localApplication.bgContext, agentInfo, cfg.Settings, localApplication.srv, reporter, monitor))
router, err := newRouter(log, streamFactory(localApplication.bgContext, agentInfo, cfg.Settings, localApplication.srv, reporter, monitor, statusController))
if err != nil {
return nil, errors.New(err, "fail to initialize pipeline router")
}
Expand Down
5 changes: 4 additions & 1 deletion x-pack/elastic-agent/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/monitoring"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/status"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi"
reporting "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter"
fleetreporter "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter/fleet"
Expand Down Expand Up @@ -68,6 +69,7 @@ func newManaged(
reexec reexecManager,
agentInfo *info.AgentInfo,
) (*Managed, error) {
statusController := status.NewController(log)
path := info.AgentConfigFile()

store := storage.NewDiskStore(path)
Expand Down Expand Up @@ -154,7 +156,7 @@ func newManaged(
return nil, errors.New(err, "failed to initialize monitoring")
}

router, err := newRouter(log, streamFactory(managedApplication.bgContext, agentInfo, cfg.Settings, managedApplication.srv, combinedReporter, monitor))
router, err := newRouter(log, streamFactory(managedApplication.bgContext, agentInfo, cfg.Settings, managedApplication.srv, combinedReporter, monitor, statusController))
if err != nil {
return nil, errors.New(err, "fail to initialize pipeline router")
}
Expand Down Expand Up @@ -274,6 +276,7 @@ func newManaged(
actionDispatcher,
fleetR,
actionAcker,
statusController,
)
if err != nil {
return nil, err
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package application

import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/status"
)

type noopController struct{}

func (*noopController) Register(_ string) status.Reporter { return &noopReporter{} }
func (*noopController) Status() status.AgentStatus { return status.Healthy }
func (*noopController) UpdateStateID(_ string) {}
func (*noopController) StatusString() string { return "online" }

type noopReporter struct{}

func (*noopReporter) Update(status.AgentStatus) {}
func (*noopReporter) Unregister() {}
8 changes: 5 additions & 3 deletions x-pack/elastic-agent/pkg/agent/application/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/monitoring"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/state"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/status"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release"
)

Expand All @@ -41,10 +42,10 @@ func (b *operatorStream) Shutdown() {
b.configHandler.Shutdown()
}

func streamFactory(ctx context.Context, agentInfo *info.AgentInfo, cfg *configuration.SettingsConfig, srv *server.Server, r state.Reporter, m monitoring.Monitor) func(*logger.Logger, routingKey) (stream, error) {
func streamFactory(ctx context.Context, agentInfo *info.AgentInfo, cfg *configuration.SettingsConfig, srv *server.Server, r state.Reporter, m monitoring.Monitor, statusController status.Controller) func(*logger.Logger, routingKey) (stream, error) {
return func(log *logger.Logger, id routingKey) (stream, error) {
// new operator per stream to isolate processes without using tags
operator, err := newOperator(ctx, log, agentInfo, id, cfg, srv, r, m)
operator, err := newOperator(ctx, log, agentInfo, id, cfg, srv, r, m, statusController)
if err != nil {
return nil, err
}
Expand All @@ -56,7 +57,7 @@ func streamFactory(ctx context.Context, agentInfo *info.AgentInfo, cfg *configur
}
}

func newOperator(ctx context.Context, log *logger.Logger, agentInfo *info.AgentInfo, id routingKey, config *configuration.SettingsConfig, srv *server.Server, r state.Reporter, m monitoring.Monitor) (*operation.Operator, error) {
func newOperator(ctx context.Context, log *logger.Logger, agentInfo *info.AgentInfo, id routingKey, config *configuration.SettingsConfig, srv *server.Server, r state.Reporter, m monitoring.Monitor, statusController status.Controller) (*operation.Operator, error) {
fetcher := downloader.NewDownloader(log, config.DownloadConfig)
allowEmptyPgp, pgp := release.PGP()
verifier, err := downloader.NewVerifier(log, config.DownloadConfig, allowEmptyPgp, pgp)
Expand Down Expand Up @@ -93,5 +94,6 @@ func newOperator(ctx context.Context, log *logger.Logger, agentInfo *info.AgentI
srv,
r,
m,
statusController,
)
}
3 changes: 2 additions & 1 deletion x-pack/elastic-agent/pkg/agent/operation/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/process"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/retry"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/status"
)

var downloadPath = getAbsPath("tests/downloads")
Expand Down Expand Up @@ -69,7 +70,7 @@ func getTestOperator(t *testing.T, downloadPath string, installPath string, p *a
t.Fatal(err)
}

operator, err := NewOperator(context.Background(), l, agentInfo, "p1", operatorCfg, fetcher, verifier, installer, uninstaller, stateResolver, srv, nil, noop.NewMonitor())
operator, err := NewOperator(context.Background(), l, agentInfo, "p1", operatorCfg, fetcher, verifier, installer, uninstaller, stateResolver, srv, nil, noop.NewMonitor(), status.NewController(l))
if err != nil {
t.Fatal(err)
}
Expand Down
3 changes: 2 additions & 1 deletion x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/retry"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/state"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/status"
)

func TestGenerateSteps(t *testing.T) {
Expand Down Expand Up @@ -132,7 +133,7 @@ func getMonitorableTestOperator(t *testing.T, installPath string, m monitoring.M
}

ctx := context.Background()
operator, err := NewOperator(ctx, l, agentInfo, "p1", cfg, fetcher, verifier, installer, uninstaller, stateResolver, srv, nil, m)
operator, err := NewOperator(ctx, l, agentInfo, "p1", cfg, fetcher, verifier, installer, uninstaller, stateResolver, srv, nil, m, status.NewController(l))
if err != nil {
t.Fatal(err)
}
Expand Down
Loading

0 comments on commit 3a6b95f

Please sign in to comment.