Skip to content

Commit

Permalink
Support for maximum size for Output of checks (#5233)
Browse files Browse the repository at this point in the history
* Support for maximum size for Output of checks

This PR allows users to limit the size of output produced by checks at the agent 
and check level.

When set at the agent level, it will limit the output for all checks monitored
by the agent.

When set at the check level, it can override the agent max for a specific check but
only if it is lower than the agent max.

Default value is 4k, and input must be at least 1.
  • Loading branch information
pierresouchay authored and freddygv committed Jun 26, 2019
1 parent 4c37598 commit 0e907f5
Show file tree
Hide file tree
Showing 18 changed files with 260 additions and 94 deletions.
41 changes: 27 additions & 14 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,7 @@ func (a *Agent) consulConfig() (*consul.Config, error) {
base.CoordinateUpdateBatchSize = a.config.ConsulCoordinateUpdateBatchSize
base.CoordinateUpdateMaxBatches = a.config.ConsulCoordinateUpdateMaxBatches
base.CoordinateUpdatePeriod = a.config.ConsulCoordinateUpdatePeriod
base.CheckOutputMaxSize = a.config.CheckOutputMaxSize

base.RaftConfig.HeartbeatTimeout = a.config.ConsulRaftHeartbeatTimeout
base.RaftConfig.LeaderLeaseTimeout = a.config.ConsulRaftLeaderLeaseTimeout
Expand Down Expand Up @@ -971,6 +972,9 @@ func (a *Agent) consulConfig() (*consul.Config, error) {
if a.config.Bootstrap {
base.Bootstrap = true
}
if a.config.CheckOutputMaxSize > 0 {
base.CheckOutputMaxSize = a.config.CheckOutputMaxSize
}
if a.config.RejoinAfterLeave {
base.RejoinAfterLeave = true
}
Expand Down Expand Up @@ -2248,6 +2252,13 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,

// Check if already registered
if chkType != nil {
maxOutputSize := a.config.CheckOutputMaxSize
if maxOutputSize == 0 {
maxOutputSize = checks.DefaultBufSize
}
if chkType.OutputMaxSize > 0 && maxOutputSize > chkType.OutputMaxSize {
maxOutputSize = chkType.OutputMaxSize
}
switch {

case chkType.IsTTL():
Expand All @@ -2257,10 +2268,11 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
}

ttl := &checks.CheckTTL{
Notify: a.State,
CheckID: check.CheckID,
TTL: chkType.TTL,
Logger: a.logger,
Notify: a.State,
CheckID: check.CheckID,
TTL: chkType.TTL,
Logger: a.logger,
OutputMaxSize: maxOutputSize,
}

// Restore persisted state, if any
Expand Down Expand Up @@ -2294,6 +2306,7 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
Interval: chkType.Interval,
Timeout: chkType.Timeout,
Logger: a.logger,
OutputMaxSize: maxOutputSize,
TLSClientConfig: tlsClientConfig,
}
http.Start()
Expand Down Expand Up @@ -2361,7 +2374,7 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
}

if a.dockerClient == nil {
dc, err := checks.NewDockerClient(os.Getenv("DOCKER_HOST"), checks.BufSize)
dc, err := checks.NewDockerClient(os.Getenv("DOCKER_HOST"), int64(maxOutputSize))
if err != nil {
a.logger.Printf("[ERR] agent: error creating docker client: %s", err)
return err
Expand Down Expand Up @@ -2396,14 +2409,14 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
check.CheckID, checks.MinInterval)
chkType.Interval = checks.MinInterval
}

monitor := &checks.CheckMonitor{
Notify: a.State,
CheckID: check.CheckID,
ScriptArgs: chkType.ScriptArgs,
Interval: chkType.Interval,
Timeout: chkType.Timeout,
Logger: a.logger,
Notify: a.State,
CheckID: check.CheckID,
ScriptArgs: chkType.ScriptArgs,
Interval: chkType.Interval,
Timeout: chkType.Timeout,
Logger: a.logger,
OutputMaxSize: maxOutputSize,
}
monitor.Start()
a.checkMonitors[check.CheckID] = monitor
Expand Down Expand Up @@ -2878,7 +2891,7 @@ func (a *Agent) updateTTLCheck(checkID types.CheckID, status, output string) err
}

// Set the status through CheckTTL to reset the TTL.
check.SetStatus(status, output)
outputTruncated := check.SetStatus(status, output)

// We don't write any files in dev mode so bail here.
if a.config.DataDir == "" {
Expand All @@ -2887,7 +2900,7 @@ func (a *Agent) updateTTLCheck(checkID types.CheckID, status, output string) err

// Persist the state so the TTL check can come up in a good state after
// an agent restart, especially with long TTL values.
if err := a.persistCheckState(check, status, output); err != nil {
if err := a.persistCheckState(check, status, outputTruncated); err != nil {
return fmt.Errorf("failed persisting state for check %q: %s", checkID, err)
}

Expand Down
7 changes: 0 additions & 7 deletions agent/agent_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (

"github.com/hashicorp/consul/acl"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/checks"
"github.com/hashicorp/consul/agent/debug"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/structs"
Expand Down Expand Up @@ -715,12 +714,6 @@ func (s *HTTPServer) AgentCheckUpdate(resp http.ResponseWriter, req *http.Reques
return nil, nil
}

total := len(update.Output)
if total > checks.BufSize {
update.Output = fmt.Sprintf("%s ... (captured %d of %d bytes)",
update.Output[:checks.BufSize], checks.BufSize, total)
}

checkID := types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/update/"))

// Get the provided token, if any, and vet against any ACL policies.
Expand Down
10 changes: 5 additions & 5 deletions agent/agent_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"time"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/checks"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/debug"
Expand Down Expand Up @@ -2299,7 +2298,8 @@ func TestAgent_FailCheck_ACLDeny(t *testing.T) {

func TestAgent_UpdateCheck(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
maxChecksSize := 256
a := NewTestAgent(t, t.Name(), fmt.Sprintf("check_output_max_size=%d", maxChecksSize))
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")

Expand Down Expand Up @@ -2340,7 +2340,7 @@ func TestAgent_UpdateCheck(t *testing.T) {
t.Run("log output limit", func(t *testing.T) {
args := checkUpdate{
Status: api.HealthPassing,
Output: strings.Repeat("-= bad -=", 5*checks.BufSize),
Output: strings.Repeat("-= bad -=", 5*maxChecksSize),
}
req, _ := http.NewRequest("PUT", "/v1/agent/check/update/test", jsonReader(args))
resp := httptest.NewRecorder()
Expand All @@ -2359,8 +2359,8 @@ func TestAgent_UpdateCheck(t *testing.T) {
// rough check that the output buffer was cut down so this test
// isn't super brittle.
state := a.State.Checks()["test"]
if state.Status != api.HealthPassing || len(state.Output) > 2*checks.BufSize {
t.Fatalf("bad: %v", state)
if state.Status != api.HealthPassing || len(state.Output) > 2*maxChecksSize {
t.Fatalf("bad: %v, (len:=%d)", state, len(state.Output))
}
})

Expand Down
18 changes: 16 additions & 2 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1586,15 +1586,16 @@ func TestAgent_updateTTLCheck(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()

checkBufSize := 100
health := &structs.HealthCheck{
Node: "foo",
CheckID: "mem",
Name: "memory util",
Status: api.HealthCritical,
}
chk := &structs.CheckType{
TTL: 15 * time.Second,
TTL: 15 * time.Second,
OutputMaxSize: checkBufSize,
}

// Add check and update it.
Expand All @@ -1614,6 +1615,19 @@ func TestAgent_updateTTLCheck(t *testing.T) {
if status.Output != "foo" {
t.Fatalf("bad: %v", status)
}

if err := a.updateTTLCheck("mem", api.HealthCritical, strings.Repeat("--bad-- ", 5*checkBufSize)); err != nil {
t.Fatalf("err: %v", err)
}

// Ensure we have a check mapping.
status = a.State.Checks()["mem"]
if status.Status != api.HealthCritical {
t.Fatalf("bad: %v", status)
}
if len(status.Output) > checkBufSize*2 {
t.Fatalf("bad: %v", len(status.Output))
}
}

func TestAgent_PersistService(t *testing.T) {
Expand Down
44 changes: 30 additions & 14 deletions agent/checks/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ const (
// Otherwise we risk fork bombing a system.
MinInterval = time.Second

// BufSize is the maximum size of the captured
// check output. Prevents an enormous buffer
// DefaultBufSize is the maximum size of the captured
// check output by defaut. Prevents an enormous buffer
// from being captured
BufSize = 4 * 1024 // 4KB
DefaultBufSize = 4 * 1024 // 4KB

// UserAgent is the value of the User-Agent header
// for HTTP health checks.
Expand All @@ -56,13 +56,14 @@ type CheckNotifier interface {
// determine the health of a given check. It is compatible with
// nagios plugins and expects the output in the same format.
type CheckMonitor struct {
Notify CheckNotifier
CheckID types.CheckID
Script string
ScriptArgs []string
Interval time.Duration
Timeout time.Duration
Logger *log.Logger
Notify CheckNotifier
CheckID types.CheckID
Script string
ScriptArgs []string
Interval time.Duration
Timeout time.Duration
Logger *log.Logger
OutputMaxSize int

stop bool
stopCh chan struct{}
Expand Down Expand Up @@ -122,7 +123,7 @@ func (c *CheckMonitor) check() {
}

// Collect the output
output, _ := circbuf.NewBuffer(BufSize)
output, _ := circbuf.NewBuffer(int64(c.OutputMaxSize))
cmd.Stdout = output
cmd.Stderr = output
exec.SetSysProcAttr(cmd)
Expand Down Expand Up @@ -222,12 +223,17 @@ type CheckTTL struct {
stop bool
stopCh chan struct{}
stopLock sync.Mutex

OutputMaxSize int
}

// Start is used to start a check ttl, runs until Stop()
func (c *CheckTTL) Start() {
c.stopLock.Lock()
defer c.stopLock.Unlock()
if c.OutputMaxSize < 1 {
c.OutputMaxSize = DefaultBufSize
}
c.stop = false
c.stopCh = make(chan struct{})
c.timer = time.NewTimer(c.TTL)
Expand Down Expand Up @@ -275,16 +281,22 @@ func (c *CheckTTL) getExpiredOutput() string {

// SetStatus is used to update the status of the check,
// and to renew the TTL. If expired, TTL is restarted.
func (c *CheckTTL) SetStatus(status, output string) {
// output is returned (might be truncated)
func (c *CheckTTL) SetStatus(status, output string) string {
c.Logger.Printf("[DEBUG] agent: Check %q status is now %s", c.CheckID, status)
total := len(output)
if total > c.OutputMaxSize {
output = fmt.Sprintf("%s ... (captured %d of %d bytes)",
output[:c.OutputMaxSize], c.OutputMaxSize, total)
}
c.Notify.UpdateCheck(c.CheckID, status, output)

// Store the last output so we can retain it if the TTL expires.
c.lastOutputLock.Lock()
c.lastOutput = output
c.lastOutputLock.Unlock()

c.timer.Reset(c.TTL)
return output
}

// CheckHTTP is used to periodically make an HTTP request to
Expand All @@ -303,6 +315,7 @@ type CheckHTTP struct {
Timeout time.Duration
Logger *log.Logger
TLSClientConfig *tls.Config
OutputMaxSize int

httpClient *http.Client
stop bool
Expand Down Expand Up @@ -339,6 +352,9 @@ func (c *CheckHTTP) Start() {
} else if c.Interval < 10*time.Second {
c.httpClient.Timeout = c.Interval
}
if c.OutputMaxSize < 1 {
c.OutputMaxSize = DefaultBufSize
}
}

c.stop = false
Expand Down Expand Up @@ -413,7 +429,7 @@ func (c *CheckHTTP) check() {
defer resp.Body.Close()

// Read the response into a circular buffer to limit the size
output, _ := circbuf.NewBuffer(BufSize)
output, _ := circbuf.NewBuffer(int64(c.OutputMaxSize))
if _, err := io.Copy(output, resp.Body); err != nil {
c.Logger.Printf("[WARN] agent: Check %q error while reading body: %s", c.CheckID, err)
}
Expand Down
Loading

0 comments on commit 0e907f5

Please sign in to comment.