Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace docker check #3270

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
37 changes: 28 additions & 9 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ type Agent struct {
// checkLock protects updates to the check* maps
checkLock sync.Mutex

// dockerClient is the client for performing docker health checks.
dockerClient *DockerClient

// eventCh is used to receive user events
eventCh chan serf.UserEvent

Expand Down Expand Up @@ -1637,9 +1640,12 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,

// Check if already registered
if chkType != nil {
if chkType.IsTTL() {
switch {

case chkType.IsTTL():
if existing, ok := a.checkTTLs[check.CheckID]; ok {
existing.Stop()
delete(a.checkTTLs, check.CheckID)
}

ttl := &CheckTTL{
Expand All @@ -1658,9 +1664,10 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
ttl.Start()
a.checkTTLs[check.CheckID] = ttl

} else if chkType.IsHTTP() {
case chkType.IsHTTP():
if existing, ok := a.checkHTTPs[check.CheckID]; ok {
existing.Stop()
delete(a.checkHTTPs, check.CheckID)
}
if chkType.Interval < MinInterval {
a.logger.Println(fmt.Sprintf("[WARN] agent: check '%s' has interval below minimum of %v",
Expand All @@ -1682,9 +1689,10 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
http.Start()
a.checkHTTPs[check.CheckID] = http

} else if chkType.IsTCP() {
case chkType.IsTCP():
if existing, ok := a.checkTCPs[check.CheckID]; ok {
existing.Stop()
delete(a.checkTCPs, check.CheckID)
}
if chkType.Interval < MinInterval {
a.logger.Println(fmt.Sprintf("[WARN] agent: check '%s' has interval below minimum of %v",
Expand All @@ -1703,16 +1711,26 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
tcp.Start()
a.checkTCPs[check.CheckID] = tcp

} else if chkType.IsDocker() {
case chkType.IsDocker():
if existing, ok := a.checkDockers[check.CheckID]; ok {
existing.Stop()
delete(a.checkDockers, check.CheckID)
}
if chkType.Interval < MinInterval {
a.logger.Println(fmt.Sprintf("[WARN] agent: check '%s' has interval below minimum of %v",
check.CheckID, MinInterval))
chkType.Interval = MinInterval
}

if a.dockerClient == nil {
dc, err := NewDockerClient(os.Getenv("DOCKER_HOST"), CheckBufSize)
if err != nil {
a.logger.Printf("[ERR] agent: error creating docker client: %s", err)
return err
}
a.dockerClient = dc
}

dockerCheck := &CheckDocker{
Notify: a.state,
CheckID: check.CheckID,
Expand All @@ -1721,15 +1739,15 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
Script: chkType.Script,
Interval: chkType.Interval,
Logger: a.logger,
}
if err := dockerCheck.Init(); err != nil {
return err
client: a.dockerClient,
}
dockerCheck.Start()
a.checkDockers[check.CheckID] = dockerCheck
} else if chkType.IsMonitor() {

case chkType.IsMonitor():
if existing, ok := a.checkMonitors[check.CheckID]; ok {
existing.Stop()
delete(a.checkMonitors, check.CheckID)
}
if chkType.Interval < MinInterval {
a.logger.Println(fmt.Sprintf("[WARN] agent: check '%s' has interval below minimum of %v",
Expand All @@ -1747,7 +1765,8 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
}
monitor.Start()
a.checkMonitors[check.CheckID] = monitor
} else {

default:
return fmt.Errorf("Check type is not valid")
}

Expand Down
159 changes: 57 additions & 102 deletions agent/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"crypto/tls"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"net/http"
Expand All @@ -14,7 +15,6 @@ import (
"time"

"github.com/armon/circbuf"
docker "github.com/fsouza/go-dockerclient"
"github.com/hashicorp/consul/agent/consul/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
Expand Down Expand Up @@ -516,14 +516,6 @@ func (c *CheckTCP) check() {
c.Notify.UpdateCheck(c.CheckID, api.HealthPassing, fmt.Sprintf("TCP connect %s: Success", c.TCP))
}

// DockerClient defines an interface for a docker client
// which is used for injecting a fake client during tests.
type DockerClient interface {
CreateExec(docker.CreateExecOptions) (*docker.Exec, error)
StartExec(string, docker.StartExecOptions) error
InspectExec(string) (*docker.ExecInspect, error)
}

// CheckDocker is used to periodically invoke a script to
// determine the health of an application running inside a
// Docker Container. We assume that the script is compatible
Expand All @@ -537,137 +529,100 @@ type CheckDocker struct {
Interval time.Duration
Logger *log.Logger

dockerClient DockerClient
cmd []string
stop bool
stopCh chan struct{}
stopLock sync.Mutex
client *DockerClient
stop chan struct{}
}

// Init initializes the Docker Client
func (c *CheckDocker) Init() error {
var err error
c.dockerClient, err = docker.NewClientFromEnv()
if err != nil {
c.Logger.Printf("[DEBUG] Error creating the Docker client: %s", err.Error())
return err
func (c *CheckDocker) Start() {
if c.stop != nil {
panic("Docker check already started")
}
return nil
}

// Start is used to start checks.
// Docker Checks runs until stop is called
func (c *CheckDocker) Start() {
c.stopLock.Lock()
defer c.stopLock.Unlock()
if c.Logger == nil {
c.Logger = log.New(ioutil.Discard, "", 0)
}

//figure out the shell
if c.Shell == "" {
c.Shell = shell()
c.Shell = os.Getenv("SHELL")
if c.Shell == "" {
c.Shell = "/bin/sh"
}
}

c.cmd = []string{c.Shell, "-c", c.Script}

c.stop = false
c.stopCh = make(chan struct{})
c.stop = make(chan struct{})
go c.run()
}

// Stop is used to stop a docker check.
func (c *CheckDocker) Stop() {
c.stopLock.Lock()
defer c.stopLock.Unlock()
if !c.stop {
c.stop = true
close(c.stopCh)
if c.stop == nil {
panic("Stop called before start")
}
close(c.stop)
}

// run is invoked by a goroutine to run until Stop() is called
func (c *CheckDocker) run() {
// Get the randomized initial pause time
initialPauseTime := lib.RandomStagger(c.Interval)
c.Logger.Printf("[DEBUG] agent: pausing %v before first invocation of %s -c %s in container %s", initialPauseTime, c.Shell, c.Script, c.DockerContainerID)
next := time.After(initialPauseTime)
firstWait := lib.RandomStagger(c.Interval)
c.Logger.Printf("[DEBUG] agent: pausing %v before first invocation of %s -c %s in container %s", firstWait, c.Shell, c.Script, c.DockerContainerID)
next := time.After(firstWait)
for {
select {
case <-next:
c.check()
next = time.After(c.Interval)
case <-c.stopCh:
case <-c.stop:
return
}
}
}

func (c *CheckDocker) check() {
//Set up the Exec since
execOpts := docker.CreateExecOptions{
AttachStdin: false,
AttachStdout: true,
AttachStderr: true,
Tty: false,
Cmd: c.cmd,
Container: c.DockerContainerID,
}
var (
exec *docker.Exec
err error
)
if exec, err = c.dockerClient.CreateExec(execOpts); err != nil {
c.Logger.Printf("[DEBUG] agent: Error while creating Exec: %s", err.Error())
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, fmt.Sprintf("Unable to create Exec, error: %s", err.Error()))
return
}

// Collect the output
output, _ := circbuf.NewBuffer(CheckBufSize)

err = c.dockerClient.StartExec(exec.ID, docker.StartExecOptions{Detach: false, Tty: false, OutputStream: output, ErrorStream: output})
var out string
status, b, err := c.doCheck()
if err != nil {
c.Logger.Printf("[DEBUG] Error in executing health checks: %s", err.Error())
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, fmt.Sprintf("Unable to start Exec: %s", err.Error()))
return
c.Logger.Printf("[DEBUG] agent: Check '%s': %s", c.CheckID, err)
out = err.Error()
} else {
// out is already limited to CheckBufSize since we're getting a
// limited buffer. So we don't need to truncate it just report
// that it was truncated.
out = string(b.Bytes())
if int(b.TotalWritten()) > len(out) {
out = fmt.Sprintf("Captured %d of %d bytes\n...\n%s", len(out), b.TotalWritten(), out)
}
c.Logger.Printf("[DEBUG] agent: Check '%s' script '%s' output: %s", c.CheckID, c.Script, out)
}

// Get the output, add a message about truncation
outputStr := string(output.Bytes())
if output.TotalWritten() > output.Size() {
outputStr = fmt.Sprintf("Captured %d of %d bytes\n...\n%s",
output.Size(), output.TotalWritten(), outputStr)
if status == api.HealthCritical {
c.Logger.Printf("[WARN] agent: Check '%v' is now critical", c.CheckID)
}

c.Logger.Printf("[DEBUG] agent: Check '%s' script '%s' output: %s",
c.CheckID, c.Script, outputStr)
c.Notify.UpdateCheck(c.CheckID, status, out)
}

execInfo, err := c.dockerClient.InspectExec(exec.ID)
func (c *CheckDocker) doCheck() (string, *circbuf.Buffer, error) {
cmd := []string{c.Shell, "-c", c.Script}
execID, err := c.client.CreateExec(c.DockerContainerID, cmd)
if err != nil {
c.Logger.Printf("[DEBUG] Error in inspecting check result : %s", err.Error())
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, fmt.Sprintf("Unable to inspect Exec: %s", err.Error()))
return
return api.HealthCritical, nil, err
}

// Sets the status of the check to healthy if exit code is 0
if execInfo.ExitCode == 0 {
c.Notify.UpdateCheck(c.CheckID, api.HealthPassing, outputStr)
return
buf, err := c.client.StartExec(c.DockerContainerID, execID)
if err != nil {
return api.HealthCritical, nil, err
}

// Set the status of the check to Warning if exit code is 1
if execInfo.ExitCode == 1 {
c.Logger.Printf("[DEBUG] Check failed with exit code: %d", execInfo.ExitCode)
c.Notify.UpdateCheck(c.CheckID, api.HealthWarning, outputStr)
return
exitCode, err := c.client.InspectExec(c.DockerContainerID, execID)
if err != nil {
return api.HealthCritical, nil, err
}

// Set the health as critical
c.Logger.Printf("[WARN] agent: Check '%v' is now critical", c.CheckID)
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, outputStr)
}

func shell() string {
if sh := os.Getenv("SHELL"); sh != "" {
return sh
switch exitCode {
case 0:
return api.HealthPassing, buf, nil
case 1:
c.Logger.Printf("[DEBUG] Check failed with exit code: %d", exitCode)
return api.HealthWarning, buf, nil
default:
c.Logger.Printf("[DEBUG] Check failed with exit code: %d", exitCode)
return api.HealthCritical, buf, nil
}
return "/bin/sh"
}
Loading