Skip to content

Commit

Permalink
agent: replace docker check
Browse files Browse the repository at this point in the history
This patch replaces the Docker client which is used
for health checks with a simplified version tailored
for that purpose.

See #3254
See #3257
Fixes #3270
  • Loading branch information
magiconair committed Jul 18, 2017
1 parent dd85ea2 commit b514187
Show file tree
Hide file tree
Showing 7 changed files with 540 additions and 369 deletions.
37 changes: 28 additions & 9 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ type Agent struct {
// checkLock protects updates to the check* maps
checkLock sync.Mutex

// dockerClient is the client for performing docker health checks.
dockerClient *DockerClient

// eventCh is used to receive user events
eventCh chan serf.UserEvent

Expand Down Expand Up @@ -1637,9 +1640,12 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,

// Check if already registered
if chkType != nil {
if chkType.IsTTL() {
switch {

case chkType.IsTTL():
if existing, ok := a.checkTTLs[check.CheckID]; ok {
existing.Stop()
delete(a.checkTTLs, check.CheckID)
}

ttl := &CheckTTL{
Expand All @@ -1658,9 +1664,10 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
ttl.Start()
a.checkTTLs[check.CheckID] = ttl

} else if chkType.IsHTTP() {
case chkType.IsHTTP():
if existing, ok := a.checkHTTPs[check.CheckID]; ok {
existing.Stop()
delete(a.checkHTTPs, check.CheckID)
}
if chkType.Interval < MinInterval {
a.logger.Println(fmt.Sprintf("[WARN] agent: check '%s' has interval below minimum of %v",
Expand All @@ -1682,9 +1689,10 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
http.Start()
a.checkHTTPs[check.CheckID] = http

} else if chkType.IsTCP() {
case chkType.IsTCP():
if existing, ok := a.checkTCPs[check.CheckID]; ok {
existing.Stop()
delete(a.checkTCPs, check.CheckID)
}
if chkType.Interval < MinInterval {
a.logger.Println(fmt.Sprintf("[WARN] agent: check '%s' has interval below minimum of %v",
Expand All @@ -1703,16 +1711,26 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
tcp.Start()
a.checkTCPs[check.CheckID] = tcp

} else if chkType.IsDocker() {
case chkType.IsDocker():
if existing, ok := a.checkDockers[check.CheckID]; ok {
existing.Stop()
delete(a.checkDockers, check.CheckID)
}
if chkType.Interval < MinInterval {
a.logger.Println(fmt.Sprintf("[WARN] agent: check '%s' has interval below minimum of %v",
check.CheckID, MinInterval))
chkType.Interval = MinInterval
}

if a.dockerClient == nil {
dc, err := NewDockerClient(os.Getenv("DOCKER_HOST"), CheckBufSize)
if err != nil {
a.logger.Printf("[ERR] agent: error creating docker client: %s", err)
return err
}
a.dockerClient = dc
}

dockerCheck := &CheckDocker{
Notify: a.state,
CheckID: check.CheckID,
Expand All @@ -1721,15 +1739,15 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
Script: chkType.Script,
Interval: chkType.Interval,
Logger: a.logger,
}
if err := dockerCheck.Init(); err != nil {
return err
client: a.dockerClient,
}
dockerCheck.Start()
a.checkDockers[check.CheckID] = dockerCheck
} else if chkType.IsMonitor() {

case chkType.IsMonitor():
if existing, ok := a.checkMonitors[check.CheckID]; ok {
existing.Stop()
delete(a.checkMonitors, check.CheckID)
}
if chkType.Interval < MinInterval {
a.logger.Println(fmt.Sprintf("[WARN] agent: check '%s' has interval below minimum of %v",
Expand All @@ -1747,7 +1765,8 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
}
monitor.Start()
a.checkMonitors[check.CheckID] = monitor
} else {

default:
return fmt.Errorf("Check type is not valid")
}

Expand Down
159 changes: 57 additions & 102 deletions agent/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"crypto/tls"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"net/http"
Expand All @@ -14,7 +15,6 @@ import (
"time"

"github.com/armon/circbuf"
docker "github.com/fsouza/go-dockerclient"
"github.com/hashicorp/consul/agent/consul/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
Expand Down Expand Up @@ -516,14 +516,6 @@ func (c *CheckTCP) check() {
c.Notify.UpdateCheck(c.CheckID, api.HealthPassing, fmt.Sprintf("TCP connect %s: Success", c.TCP))
}

// DockerClient defines an interface for a docker client
// which is used for injecting a fake client during tests.
type DockerClient interface {
CreateExec(docker.CreateExecOptions) (*docker.Exec, error)
StartExec(string, docker.StartExecOptions) error
InspectExec(string) (*docker.ExecInspect, error)
}

// CheckDocker is used to periodically invoke a script to
// determine the health of an application running inside a
// Docker Container. We assume that the script is compatible
Expand All @@ -537,137 +529,100 @@ type CheckDocker struct {
Interval time.Duration
Logger *log.Logger

dockerClient DockerClient
cmd []string
stop bool
stopCh chan struct{}
stopLock sync.Mutex
client *DockerClient
stop chan struct{}
}

// Init initializes the Docker Client
func (c *CheckDocker) Init() error {
var err error
c.dockerClient, err = docker.NewClientFromEnv()
if err != nil {
c.Logger.Printf("[DEBUG] Error creating the Docker client: %s", err.Error())
return err
func (c *CheckDocker) Start() {
if c.stop != nil {
panic("Docker check already started")
}
return nil
}

// Start is used to start checks.
// Docker Checks runs until stop is called
func (c *CheckDocker) Start() {
c.stopLock.Lock()
defer c.stopLock.Unlock()
if c.Logger == nil {
c.Logger = log.New(ioutil.Discard, "", 0)
}

//figure out the shell
if c.Shell == "" {
c.Shell = shell()
c.Shell = os.Getenv("SHELL")
if c.Shell == "" {
c.Shell = "/bin/sh"
}
}

c.cmd = []string{c.Shell, "-c", c.Script}

c.stop = false
c.stopCh = make(chan struct{})
c.stop = make(chan struct{})
go c.run()
}

// Stop is used to stop a docker check.
func (c *CheckDocker) Stop() {
c.stopLock.Lock()
defer c.stopLock.Unlock()
if !c.stop {
c.stop = true
close(c.stopCh)
if c.stop == nil {
panic("Stop called before start")
}
close(c.stop)
}

// run is invoked by a goroutine to run until Stop() is called
func (c *CheckDocker) run() {
// Get the randomized initial pause time
initialPauseTime := lib.RandomStagger(c.Interval)
c.Logger.Printf("[DEBUG] agent: pausing %v before first invocation of %s -c %s in container %s", initialPauseTime, c.Shell, c.Script, c.DockerContainerID)
next := time.After(initialPauseTime)
firstWait := lib.RandomStagger(c.Interval)
c.Logger.Printf("[DEBUG] agent: pausing %v before first invocation of %s -c %s in container %s", firstWait, c.Shell, c.Script, c.DockerContainerID)
next := time.After(firstWait)
for {
select {
case <-next:
c.check()
next = time.After(c.Interval)
case <-c.stopCh:
case <-c.stop:
return
}
}
}

func (c *CheckDocker) check() {
//Set up the Exec since
execOpts := docker.CreateExecOptions{
AttachStdin: false,
AttachStdout: true,
AttachStderr: true,
Tty: false,
Cmd: c.cmd,
Container: c.DockerContainerID,
}
var (
exec *docker.Exec
err error
)
if exec, err = c.dockerClient.CreateExec(execOpts); err != nil {
c.Logger.Printf("[DEBUG] agent: Error while creating Exec: %s", err.Error())
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, fmt.Sprintf("Unable to create Exec, error: %s", err.Error()))
return
}

// Collect the output
output, _ := circbuf.NewBuffer(CheckBufSize)

err = c.dockerClient.StartExec(exec.ID, docker.StartExecOptions{Detach: false, Tty: false, OutputStream: output, ErrorStream: output})
var out string
status, b, err := c.doCheck()
if err != nil {
c.Logger.Printf("[DEBUG] Error in executing health checks: %s", err.Error())
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, fmt.Sprintf("Unable to start Exec: %s", err.Error()))
return
c.Logger.Printf("[DEBUG] agent: Check '%s': %s", c.CheckID, err)
out = err.Error()
} else {
// out is already limited to CheckBufSize since we're getting a
// limited buffer. So we don't need to truncate it just report
// that it was truncated.
out = string(b.Bytes())
if int(b.TotalWritten()) > len(out) {
out = fmt.Sprintf("Captured %d of %d bytes\n...\n%s", len(out), b.TotalWritten(), out)
}
c.Logger.Printf("[DEBUG] agent: Check '%s' script '%s' output: %s", c.CheckID, c.Script, out)
}

// Get the output, add a message about truncation
outputStr := string(output.Bytes())
if output.TotalWritten() > output.Size() {
outputStr = fmt.Sprintf("Captured %d of %d bytes\n...\n%s",
output.Size(), output.TotalWritten(), outputStr)
if status == api.HealthCritical {
c.Logger.Printf("[WARN] agent: Check '%v' is now critical", c.CheckID)
}

c.Logger.Printf("[DEBUG] agent: Check '%s' script '%s' output: %s",
c.CheckID, c.Script, outputStr)
c.Notify.UpdateCheck(c.CheckID, status, out)
}

execInfo, err := c.dockerClient.InspectExec(exec.ID)
func (c *CheckDocker) doCheck() (string, *circbuf.Buffer, error) {
cmd := []string{c.Shell, "-c", c.Script}
execID, err := c.client.CreateExec(c.DockerContainerID, cmd)
if err != nil {
c.Logger.Printf("[DEBUG] Error in inspecting check result : %s", err.Error())
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, fmt.Sprintf("Unable to inspect Exec: %s", err.Error()))
return
return api.HealthCritical, nil, err
}

// Sets the status of the check to healthy if exit code is 0
if execInfo.ExitCode == 0 {
c.Notify.UpdateCheck(c.CheckID, api.HealthPassing, outputStr)
return
buf, err := c.client.StartExec(c.DockerContainerID, execID)
if err != nil {
return api.HealthCritical, nil, err
}

// Set the status of the check to Warning if exit code is 1
if execInfo.ExitCode == 1 {
c.Logger.Printf("[DEBUG] Check failed with exit code: %d", execInfo.ExitCode)
c.Notify.UpdateCheck(c.CheckID, api.HealthWarning, outputStr)
return
exitCode, err := c.client.InspectExec(c.DockerContainerID, execID)
if err != nil {
return api.HealthCritical, nil, err
}

// Set the health as critical
c.Logger.Printf("[WARN] agent: Check '%v' is now critical", c.CheckID)
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, outputStr)
}

func shell() string {
if sh := os.Getenv("SHELL"); sh != "" {
return sh
switch exitCode {
case 0:
return api.HealthPassing, buf, nil
case 1:
c.Logger.Printf("[DEBUG] Check failed with exit code: %d", exitCode)
return api.HealthWarning, buf, nil
default:
c.Logger.Printf("[DEBUG] Check failed with exit code: %d", exitCode)
return api.HealthCritical, buf, nil
}
return "/bin/sh"
}
Loading

0 comments on commit b514187

Please sign in to comment.