Adds ability to deregister a service based on critical check state longer than a timeout.
slackpad committed Aug 16, 2016
1 parent 315b9d4 commit 4a3d7db
Showing 12 changed files with 441 additions and 56 deletions.
11 changes: 9 additions & 2 deletions api/agent.go
@@ -62,8 +62,7 @@ type AgentCheckRegistration struct {
AgentServiceCheck
}

// AgentServiceCheck is used to create an associated
// check for a service
// AgentServiceCheck is used to define a node or service level check
type AgentServiceCheck struct {
Script string `json:",omitempty"`
DockerContainerID string `json:",omitempty"`
@@ -74,6 +73,14 @@ type AgentServiceCheck struct {
HTTP string `json:",omitempty"`
TCP string `json:",omitempty"`
Status string `json:",omitempty"`

// Checks that are associated with a service may also contain this
// optional DeregisterCriticalServiceAfter field, which is a timeout in
// the same Go time format as Interval and TTL. If a check is in the
// critical state for more than this configured value, then its
// associated service (and all of its associated checks) will
// automatically be deregistered.
DeregisterCriticalServiceAfter string `json:",omitempty"`
}
type AgentServiceChecks []*AgentServiceCheck

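The doc comment above describes the new field; as a rough illustration, a client-side registration through the Go API package might look like the sketch below. This is a minimal sketch, not part of the commit: the check ID and name are made up, and the "redis" service is assumed to be registered separately.

package main

import (
	"log"

	"github.com/hashicorp/consul/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}
	agent := client.Agent()

	// A TTL check bound to an existing service. If it stays critical for
	// longer than 90 minutes, the agent deregisters the "redis" service
	// (and all of its checks) automatically.
	reg := &api.AgentCheckRegistration{
		ID:        "redis-ttl",
		Name:      "Redis liveness",
		ServiceID: "redis",
	}
	reg.TTL = "15s"
	reg.DeregisterCriticalServiceAfter = "90m"

	if err := agent.CheckRegister(reg); err != nil {
		log.Fatal(err)
	}
}
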
7 changes: 7 additions & 0 deletions api/agent_test.go
@@ -455,6 +455,13 @@ func TestAgent_Checks_serviceBound(t *testing.T) {
ServiceID: "redis",
}
reg.TTL = "15s"
reg.DeregisterCriticalServiceAfter = "nope"
err := agent.CheckRegister(reg)
if err == nil || !strings.Contains(err.Error(), "invalid duration") {
t.Fatalf("err: %v", err)
}

reg.DeregisterCriticalServiceAfter = "90m"
if err := agent.CheckRegister(reg); err != nil {
t.Fatalf("err: %v", err)
}
118 changes: 94 additions & 24 deletions command/agent/agent.go
@@ -74,6 +74,10 @@ type Agent struct {
// services and checks. Used for anti-entropy.
state localState

// checkReapAfter maps the check ID to a timeout after which we should
// reap its associated service
checkReapAfter map[types.CheckID]time.Duration

// checkMonitors maps the check ID to an associated monitor
checkMonitors map[types.CheckID]*CheckMonitor

@@ -174,24 +178,25 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
}

agent := &Agent{
config: config,
logger: log.New(logOutput, "", log.LstdFlags),
logOutput: logOutput,
checkMonitors: make(map[types.CheckID]*CheckMonitor),
checkTTLs: make(map[types.CheckID]*CheckTTL),
checkHTTPs: make(map[types.CheckID]*CheckHTTP),
checkTCPs: make(map[types.CheckID]*CheckTCP),
checkDockers: make(map[types.CheckID]*CheckDocker),
eventCh: make(chan serf.UserEvent, 1024),
eventBuf: make([]*UserEvent, 256),
shutdownCh: make(chan struct{}),
endpoints: make(map[string]string),
}

// Initialize the local state
config: config,
logger: log.New(logOutput, "", log.LstdFlags),
logOutput: logOutput,
checkReapAfter: make(map[types.CheckID]time.Duration),
checkMonitors: make(map[types.CheckID]*CheckMonitor),
checkTTLs: make(map[types.CheckID]*CheckTTL),
checkHTTPs: make(map[types.CheckID]*CheckHTTP),
checkTCPs: make(map[types.CheckID]*CheckTCP),
checkDockers: make(map[types.CheckID]*CheckDocker),
eventCh: make(chan serf.UserEvent, 1024),
eventBuf: make([]*UserEvent, 256),
shutdownCh: make(chan struct{}),
endpoints: make(map[string]string),
}

// Initialize the local state.
agent.state.Init(config, agent.logger)

// Setup either the client or the server
// Setup either the client or the server.
var err error
if config.Server {
err = agent.setupServer()
@@ -213,23 +218,27 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
return nil, err
}

// Load checks/services
// Load checks/services.
if err := agent.loadServices(config); err != nil {
return nil, err
}
if err := agent.loadChecks(config); err != nil {
return nil, err
}

// Start handling events
// Start watching for critical services to deregister, based on their
// checks.
go agent.reapServices()

// Start handling events.
go agent.handleEvents()

// Start sending network coordinate to the server.
if !config.DisableCoordinates {
go agent.sendCoordinate()
}

// Write out the PID file if necessary
// Write out the PID file if necessary.
err = agent.storePid()
if err != nil {
return nil, err
@@ -664,6 +673,52 @@ func (a *Agent) sendCoordinate() {
}
}

// reapServices is a long running goroutine that looks for checks that have been
// critical too long and deregisters their associated services.
func (a *Agent) reapServices() {
reap := func() {
reaped := make(map[string]struct{})
for checkID, check := range a.state.CriticalChecks() {
// There's nothing to do if there's no service.
if check.Check.ServiceID == "" {
continue
}

// There might be multiple checks for one service, so
// we don't need to reap multiple times.
serviceID := check.Check.ServiceID
if _, ok := reaped[serviceID]; ok {
continue
}

// See if there's a timeout.
a.checkLock.Lock()
timeout, ok := a.checkReapAfter[checkID]
a.checkLock.Unlock()

// Reap, if necessary. We keep track of which service
// this is so that we won't try to remove it again.
if ok && check.CriticalFor > timeout {
reaped[serviceID] = struct{}{}
a.RemoveService(serviceID, true)
a.logger.Printf("[INFO] agent: Check %q for service %q has been critical for too long; deregistered service",
checkID, serviceID)
}
}
}

for {
select {
case <-time.After(a.config.CheckReapInterval):
reap()

case <-a.shutdownCh:
return
}
}

}
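
reapServices leans on a.state.CriticalChecks(), which comes from the local-state changes that are not among the file diffs loaded above. Judging from the usage in the loop, each entry pairs the health check with how long it has been critical; a sketch of the presumed shape, inferred from this diff only:

// Presumed shape of the values returned by a.state.CriticalChecks(),
// inferred from how reapServices uses them; the real definition lives in
// the agent's local-state code, which is not shown here.
type CriticalCheck struct {
	CriticalFor time.Duration        // how long the check has been in the critical state
	Check       *structs.HealthCheck // the check itself, including its ServiceID
}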

// persistService saves a service definition to a JSON file in the data dir
func (a *Agent) persistService(service *structs.NodeService) error {
svcPath := filepath.Join(a.config.DataDir, servicesDir, stringHash(service.ID))
@@ -987,6 +1042,18 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType, persist
} else {
return fmt.Errorf("Check type is not valid")
}

if chkType.DeregisterCriticalServiceAfter > 0 {
timeout := chkType.DeregisterCriticalServiceAfter
if timeout < a.config.CheckDeregisterIntervalMin {
timeout = a.config.CheckDeregisterIntervalMin
a.logger.Println(fmt.Sprintf("[WARN] agent: check '%s' has deregister interval below minimum of %v",
check.CheckID, a.config.CheckDeregisterIntervalMin))
}
a.checkReapAfter[check.CheckID] = timeout
} else {
delete(a.checkReapAfter, check.CheckID)
}
}

// Add to the local state for anti-entropy
@@ -1015,6 +1082,7 @@ func (a *Agent) RemoveCheck(checkID types.CheckID, persist bool) error {
defer a.checkLock.Unlock()

// Stop any monitors
delete(a.checkReapAfter, checkID)
if check, ok := a.checkMonitors[checkID]; ok {
check.Stop()
delete(a.checkMonitors, checkID)
@@ -1043,25 +1111,27 @@ func (a *Agent) RemoveCheck(checkID types.CheckID, persist bool) error {
return nil
}

// UpdateCheck is used to update the status of a check.
// This can only be used with checks of the TTL type.
func (a *Agent) UpdateCheck(checkID types.CheckID, status, output string) error {
// updateTTLCheck is used to update the status of a TTL check via the Agent API.
func (a *Agent) updateTTLCheck(checkID types.CheckID, status, output string) error {
a.checkLock.Lock()
defer a.checkLock.Unlock()

// Grab the TTL check.
check, ok := a.checkTTLs[checkID]
if !ok {
return fmt.Errorf("CheckID %q does not have associated TTL", checkID)
}

// Set the status through CheckTTL to reset the TTL
// Set the status through CheckTTL to reset the TTL.
check.SetStatus(status, output)

// We don't write any files in dev mode so bail here.
if a.config.DevMode {
return nil
}

// Always persist the state for TTL checks
// Persist the state so the TTL check can come up in a good state after
// an agent restart, especially with long TTL values.
if err := a.persistCheckState(check, status, output); err != nil {
return fmt.Errorf("failed persisting state for check %q: %s", checkID, err)
}
8 changes: 4 additions & 4 deletions command/agent/agent_endpoint.go
@@ -144,7 +144,7 @@ func (s *HTTPServer) AgentDeregisterCheck(resp http.ResponseWriter, req *http.Re
func (s *HTTPServer) AgentCheckPass(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
checkID := types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/pass/"))
note := req.URL.Query().Get("note")
if err := s.agent.UpdateCheck(checkID, structs.HealthPassing, note); err != nil {
if err := s.agent.updateTTLCheck(checkID, structs.HealthPassing, note); err != nil {
return nil, err
}
s.syncChanges()
@@ -154,7 +154,7 @@ func (s *HTTPServer) AgentCheckPass(resp http.ResponseWriter, req *http.Request)
func (s *HTTPServer) AgentCheckWarn(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
checkID := types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/warn/"))
note := req.URL.Query().Get("note")
if err := s.agent.UpdateCheck(checkID, structs.HealthWarning, note); err != nil {
if err := s.agent.updateTTLCheck(checkID, structs.HealthWarning, note); err != nil {
return nil, err
}
s.syncChanges()
@@ -164,7 +164,7 @@ func (s *HTTPServer) AgentCheckWarn(resp http.ResponseWriter, req *http.Request)
func (s *HTTPServer) AgentCheckFail(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
checkID := types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/fail/"))
note := req.URL.Query().Get("note")
if err := s.agent.UpdateCheck(checkID, structs.HealthCritical, note); err != nil {
if err := s.agent.updateTTLCheck(checkID, structs.HealthCritical, note); err != nil {
return nil, err
}
s.syncChanges()
@@ -216,7 +216,7 @@ func (s *HTTPServer) AgentCheckUpdate(resp http.ResponseWriter, req *http.Reques
}

checkID := types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/update/"))
if err := s.agent.UpdateCheck(checkID, update.Status, update.Output); err != nil {
if err := s.agent.updateTTLCheck(checkID, update.Status, update.Output); err != nil {
return nil, err
}
s.syncChanges()
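
The handlers above simply forward to the renamed updateTTLCheck. From the client side, driving a TTL check into the critical state, and eventually triggering the reap once it has been critical past its DeregisterCriticalServiceAfter window, could look roughly like the sketch below; it is illustrative only and reuses the hypothetical "redis-ttl" check from the earlier example.

package main

import (
	"log"

	"github.com/hashicorp/consul/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// Mark the hypothetical TTL check from the earlier sketch as critical.
	// This goes through the /v1/agent/check/fail endpoint shown above,
	// which now routes to updateTTLCheck. Left critical longer than its
	// DeregisterCriticalServiceAfter value, the reap loop removes the
	// associated "redis" service.
	if err := client.Agent().FailTTL("redis-ttl", "simulated outage"); err != nil {
		log.Fatal(err)
	}
}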