Skip to content

Commit

Permalink
Add liveness endpoint
Browse files Browse the repository at this point in the history
Add /liveness route to metrics server. This route will report the status
from pkg/core/status. fleet-gateway will now report a degraded state if
a checkin fails. This may not propogate to fleet-server as a failed
checkin means communications between the agent and the server are not
working. It may also lead to the server reporting degraded for up to 30s
(fleet-server polling time) when teh agent is able to successfully
connect.
  • Loading branch information
michel-laterman committed Jun 16, 2022
1 parent fa82d1a commit a5d172e
Show file tree
Hide file tree
Showing 10 changed files with 313 additions and 75 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -183,3 +183,4 @@
- Save the agent configuration and the state encrypted on the disk. {issue}535[535] {pull}398[398]
- Bump node.js version for heartbeat/synthetics to 16.15.0
- Support scheduled actions and cancellation of pending actions. {issue}393[393] {pull}419[419]
- Add liveness endpoint, allow fleet-gateway component to report degraded state, add update time and messages to status output. {issue}390[390] {pull}568[568]
44 changes: 27 additions & 17 deletions internal/pkg/agent/application/gateway/fleet/fleet_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,23 +75,24 @@ type actionQueue interface {
}

type fleetGateway struct {
bgContext context.Context
log *logger.Logger
dispatcher pipeline.Dispatcher
client client.Sender
scheduler scheduler.Scheduler
backoff backoff.Backoff
settings *fleetGatewaySettings
agentInfo agentInfo
reporter fleetReporter
done chan struct{}
wg sync.WaitGroup
acker store.FleetAcker
unauthCounter int
statusController status.Controller
statusReporter status.Reporter
stateStore stateStore
queue actionQueue
bgContext context.Context
log *logger.Logger
dispatcher pipeline.Dispatcher
client client.Sender
scheduler scheduler.Scheduler
backoff backoff.Backoff
settings *fleetGatewaySettings
agentInfo agentInfo
reporter fleetReporter
done chan struct{}
wg sync.WaitGroup
acker store.FleetAcker
unauthCounter int
checkinFailCounter int
statusController status.Controller
statusReporter status.Reporter
stateStore stateStore
queue actionQueue
}

// New creates a new fleet gateway
Expand Down Expand Up @@ -286,6 +287,7 @@ func (f *fleetGateway) doExecute() (*fleetapi.CheckinResponse, error) {
f.log.Debugf("Checking started")
resp, err := f.execute(f.bgContext)
if err != nil {
f.checkinFailCounter++
f.log.Errorf("Could not communicate with fleet-server Checking API will retry, error: %s", err)
if !f.backoff.Wait() {
// Something bad has happened and we log it and we should update our current state.
Expand All @@ -299,8 +301,16 @@ func (f *fleetGateway) doExecute() (*fleetapi.CheckinResponse, error) {
f.statusReporter.Update(state.Failed, err.Error(), nil)
return nil, err
}
if f.checkinFailCounter > 1 {
// Update status reporter for gateway to degraded when there are two consecutive failures.
// Note that this may not propogate to fleet-server as the agent is having issues checking in.
// It may also (falsely) report a degraded session for 30s if it is eventually successful.
// However this component will allow the agent to report fleet gateway degredation locally.
f.statusReporter.Update(state.Degraded, fmt.Sprintf("checkin failed: %v", err), nil)
}
continue
}
f.checkinFailCounter = 0
// Request was successful, return the collected actions.
return resp, nil
}
Expand Down
130 changes: 84 additions & 46 deletions internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/internal/pkg/agent/storage"
"github.com/elastic/elastic-agent/internal/pkg/agent/storage/store"
"github.com/elastic/elastic-agent/internal/pkg/core/state"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
noopacker "github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker/noop"
repo "github.com/elastic/elastic-agent/internal/pkg/reporter"
fleetreporter "github.com/elastic/elastic-agent/internal/pkg/reporter/fleet"
fleetreporterConfig "github.com/elastic/elastic-agent/internal/pkg/reporter/fleet/config"
"github.com/elastic/elastic-agent/internal/pkg/scheduler"
"github.com/elastic/elastic-agent/internal/pkg/testutils"
"github.com/elastic/elastic-agent/pkg/core/logger"
)

Expand Down Expand Up @@ -705,59 +707,95 @@ func TestRetriesOnFailures(t *testing.T) {
Backoff: backoffSettings{Init: 100 * time.Millisecond, Max: 5 * time.Second},
}

t.Run("When the gateway fails to communicate with the checkin API we will retry",
withGateway(agentInfo, settings, func(
t *testing.T,
gateway gateway.FleetGateway,
client *testingClient,
dispatcher *testingDispatcher,
scheduler *scheduler.Stepper,
rep repo.Backend,
) {
fail := func(_ http.Header, _ io.Reader) (*http.Response, error) {
return wrapStrToResp(http.StatusInternalServerError, "something is bad"), nil
}
clientWaitFn := client.Answer(fail)
err := gateway.Start()
require.NoError(t, err)
t.Run("When the gateway fails to communicate with the checkin API we will retry", func(t *testing.T) {
scheduler := scheduler.NewStepper()
client := newTestingClient()
dispatcher := newTestingDispatcher()
log, _ := logger.New("fleet_gateway", false)
rep := getReporter(agentInfo, log, t)

_ = rep.Report(context.Background(), &testStateEvent{})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Initial tick is done out of bound so we can block on channels.
scheduler.Next()
diskStore := storage.NewDiskStore(paths.AgentStateStoreFile())
stateStore, err := store.NewStateStore(log, diskStore)
require.NoError(t, err)

// Simulate a 500 errors for the next 3 calls.
<-clientWaitFn
<-clientWaitFn
<-clientWaitFn

// API recover
waitFn := ackSeq(
client.Answer(func(_ http.Header, body io.Reader) (*http.Response, error) {
cr := &request{}
content, err := ioutil.ReadAll(body)
if err != nil {
t.Fatal(err)
}
err = json.Unmarshal(content, &cr)
if err != nil {
t.Fatal(err)
}
queue := &mockQueue{}
queue.On("DequeueActions").Return([]fleetapi.Action{})
queue.On("Actions").Return([]fleetapi.Action{})

fleetReporter := &testutils.MockReporter{}
fleetReporter.On("Update", state.Degraded, mock.Anything, mock.Anything).Times(2)
fleetReporter.On("Update", mock.Anything, mock.Anything, mock.Anything).Maybe()
fleetReporter.On("Unregister").Maybe()

statusController := &testutils.MockController{}
statusController.On("RegisterComponent", "gateway").Return(fleetReporter).Once()
statusController.On("StatusString").Return("string")

gateway, err := newFleetGatewayWithScheduler(
ctx,
log,
settings,
agentInfo,
client,
dispatcher,
scheduler,
rep,
noopacker.NewAcker(),
statusController,
stateStore,
queue,
)
require.NoError(t, err)

fail := func(_ http.Header, _ io.Reader) (*http.Response, error) {
return wrapStrToResp(http.StatusInternalServerError, "something is bad"), nil
}
clientWaitFn := client.Answer(fail)
err = gateway.Start()
require.NoError(t, err)

require.Equal(t, 1, len(cr.Events))
_ = rep.Report(context.Background(), &testStateEvent{})

resp := wrapStrToResp(http.StatusOK, `{ "actions": [] }`)
return resp, nil
}),
// Initial tick is done out of bound so we can block on channels.
scheduler.Next()

dispatcher.Answer(func(actions ...fleetapi.Action) error {
require.Equal(t, 0, len(actions))
return nil
}),
)
// Simulate a 500 errors for the next 3 calls.
<-clientWaitFn
<-clientWaitFn
<-clientWaitFn

waitFn()
}))
// API recover
waitFn := ackSeq(
client.Answer(func(_ http.Header, body io.Reader) (*http.Response, error) {
cr := &request{}
content, err := ioutil.ReadAll(body)
if err != nil {
t.Fatal(err)
}
err = json.Unmarshal(content, &cr)
if err != nil {
t.Fatal(err)
}

require.Equal(t, 1, len(cr.Events))

resp := wrapStrToResp(http.StatusOK, `{ "actions": [] }`)
return resp, nil
}),

dispatcher.Answer(func(actions ...fleetapi.Action) error {
require.Equal(t, 0, len(actions))
return nil
}),
)

waitFn()
statusController.AssertExpectations(t)
fleetReporter.AssertExpectations(t)
})

t.Run("The retry loop is interruptible",
withGateway(agentInfo, &fleetGatewaySettings{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,25 @@
package fleet

import (
"net/http"

"github.com/elastic/elastic-agent/internal/pkg/core/state"
"github.com/elastic/elastic-agent/internal/pkg/core/status"
)

type noopController struct{}

func (*noopController) SetAgentID(_ string) {}
func (*noopController) RegisterComponent(_ string) status.Reporter { return &noopReporter{} }
func (*noopController) RegisterComponentWithPersistance(_ string, _ bool) status.Reporter {
return &noopReporter{}
}
func (*noopController) RegisterApp(_ string, _ string) status.Reporter { return &noopReporter{} }
func (*noopController) Status() status.AgentStatus { return status.AgentStatus{Status: status.Healthy} }
func (*noopController) StatusCode() status.AgentStatusCode { return status.Healthy }
func (*noopController) UpdateStateID(_ string) {}
func (*noopController) StatusString() string { return "online" }
func (*noopController) RegisterApp(_ string, _ string) status.Reporter { return &noopReporter{} }
func (*noopController) Status() status.AgentStatus { return status.AgentStatus{Status: status.Healthy} }
func (*noopController) StatusCode() status.AgentStatusCode { return status.Healthy }
func (*noopController) UpdateStateID(_ string) {}
func (*noopController) StatusString() string { return "online" }
func (*noopController) ServeHTTP(_ http.ResponseWriter, _ *http.Request) {}

type noopReporter struct{}

Expand Down
6 changes: 4 additions & 2 deletions internal/pkg/agent/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func run(override cfgOverrider) error {
rex := reexec.NewManager(rexLogger, execPath)

statusCtrl := status.NewController(logger)
statusCtrl.SetAgentID(agentInfo.AgentID())

tracer, err := initTracer(agentName, release.Version(), cfg.Settings.MonitoringConfig)
if err != nil {
Expand Down Expand Up @@ -182,7 +183,7 @@ func run(override cfgOverrider) error {
control.SetRouteFn(app.Routes)
control.SetMonitoringCfg(cfg.Settings.MonitoringConfig)

serverStopFn, err := setupMetrics(agentInfo, logger, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, app, tracer)
serverStopFn, err := setupMetrics(agentInfo, logger, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, app, tracer, statusCtrl)
if err != nil {
return err
}
Expand Down Expand Up @@ -337,6 +338,7 @@ func setupMetrics(
cfg *monitoringCfg.MonitoringConfig,
app application.Application,
tracer *apm.Tracer,
statusCtrl status.Controller,
) (func() error, error) {
if err := report.SetupMetrics(logger, agentName, version.GetDefaultVersion()); err != nil {
return nil, err
Expand All @@ -349,7 +351,7 @@ func setupMetrics(
}

bufferEnabled := cfg.HTTP.Buffer != nil && cfg.HTTP.Buffer.Enabled
s, err := monitoringServer.New(logger, endpointConfig, monitoring.GetNamespace, app.Routes, isProcessStatsEnabled(cfg.HTTP), bufferEnabled, tracer)
s, err := monitoringServer.New(logger, endpointConfig, monitoring.GetNamespace, app.Routes, isProcessStatsEnabled(cfg.HTTP), bufferEnabled, tracer, statusCtrl)
if err != nil {
return nil, errors.New(err, "could not start the HTTP server for the API")
}
Expand Down
6 changes: 5 additions & 1 deletion internal/pkg/core/monitoring/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func New(
enableProcessStats bool,
enableBuffer bool,
tracer *apm.Tracer,
statusController http.Handler,
) (*api.Server, error) {
if err := createAgentMonitoringDrop(endpointConfig.Host); err != nil {
// log but ignore
Expand All @@ -44,7 +45,7 @@ func New(
return nil, err
}

return exposeMetricsEndpoint(log, cfg, ns, routesFetchFn, enableProcessStats, enableBuffer, tracer)
return exposeMetricsEndpoint(log, cfg, ns, routesFetchFn, enableProcessStats, enableBuffer, tracer, statusController)
}

func exposeMetricsEndpoint(
Expand All @@ -55,6 +56,7 @@ func exposeMetricsEndpoint(
enableProcessStats bool,
enableBuffer bool,
tracer *apm.Tracer,
statusController http.Handler,
) (*api.Server, error) {
r := mux.NewRouter()
if tracer != nil {
Expand All @@ -63,6 +65,8 @@ func exposeMetricsEndpoint(
statsHandler := statsHandler(ns("stats"))
r.Handle("/stats", createHandler(statsHandler))

r.Handle("/liveness", statusController)

if enableProcessStats {
r.HandleFunc("/processes", processesHandler(routesFetchFn))
r.Handle("/processes/{processID}", createHandler(processHandler(statsHandler)))
Expand Down
38 changes: 38 additions & 0 deletions internal/pkg/core/status/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package status

import (
"encoding/json"
"net/http"
"time"
)

type LivenessResponse struct {
ID string `json:"id"`
Status string `json:"status"`
Message string `json:"message"`
UpdateTime time.Time `json:"update_timestamp"`
}

// ServeHTTP is an HTTP Handler for the status controller.
// Respose code is 200 for a healthy agent, and 503 otherwise.
// Response body is a JSON object that contains the agent ID, status, message, and the last status update time.
func (r *controller) ServeHTTP(wr http.ResponseWriter, req *http.Request) {
s := r.Status()
lr := LivenessResponse{
ID: r.agentID,
Status: s.Status.String(),
Message: s.Message,
UpdateTime: s.UpdateTime,
}
status := 200
if s.Status != Healthy {
status = 503
}

wr.Header().Set("Content-Type", "application/json")
wr.WriteHeader(status)
enc := json.NewEncoder(wr)
if err := enc.Encode(lr); err != nil {
r.log.Errorf("Unable to encode liveness response: %v", err)
}
}
Loading

0 comments on commit a5d172e

Please sign in to comment.