Adds ability to deregister a service based on critical check state longer than a timeout. #2276

Merged: 4 commits, Aug 16, 2016
Changes from 2 commits
11 changes: 9 additions & 2 deletions api/agent.go
@@ -62,8 +62,7 @@ type AgentCheckRegistration struct {
 	AgentServiceCheck
 }

-// AgentServiceCheck is used to create an associated
-// check for a service
+// AgentServiceCheck is used to define a node or service level check
 type AgentServiceCheck struct {
 	Script            string `json:",omitempty"`
 	DockerContainerID string `json:",omitempty"`
@@ -74,6 +73,14 @@ type AgentServiceCheck struct {
 	HTTP   string `json:",omitempty"`
 	TCP    string `json:",omitempty"`
 	Status string `json:",omitempty"`
+
+	// In Consul 0.7 and later, checks that are associated with a service
+	// may also contain this optional DeregisterCriticalServiceAfter field,
+	// which is a timeout in the same Go time format as Interval and TTL. If
+	// a check is in the critical state for more than this configured value,
+	// then its associated service (and all of its associated checks) will
+	// automatically be deregistered.
+	DeregisterCriticalServiceAfter string `json:",omitempty"`
 }
 type AgentServiceChecks []*AgentServiceCheck

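For a sense of how this field is consumed, here is a hedged usage sketch against this Go API package. The service name, port, and check details are illustrative only; DeregisterCriticalServiceAfter is the new piece.

```go
package main

import (
	"log"

	"github.com/hashicorp/consul/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// Register a service with a TCP check. If the check stays critical
	// for longer than 30 minutes, the agent deregisters the service
	// (and all of its checks) automatically.
	reg := &api.AgentServiceRegistration{
		Name: "redis", // illustrative
		Port: 6379,
		Check: &api.AgentServiceCheck{
			TCP:      "localhost:6379",
			Interval: "10s",
			// Same Go time format as Interval and TTL.
			DeregisterCriticalServiceAfter: "30m",
		},
	}
	if err := client.Agent().ServiceRegister(reg); err != nil {
		log.Fatal(err)
	}
}
```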
7 changes: 7 additions & 0 deletions api/agent_test.go
@@ -455,6 +455,13 @@ func TestAgent_Checks_serviceBound(t *testing.T) {
ServiceID: "redis",
}
reg.TTL = "15s"
reg.DeregisterCriticalServiceAfter = "nope"
err := agent.CheckRegister(reg)
if err == nil || !strings.Contains(err.Error(), "invalid duration") {
t.Fatalf("err: %v", err)
}

reg.DeregisterCriticalServiceAfter = "90m"
if err := agent.CheckRegister(reg); err != nil {
t.Fatalf("err: %v", err)
}
118 changes: 94 additions & 24 deletions command/agent/agent.go
@@ -74,6 +74,10 @@ type Agent struct {
 	// services and checks. Used for anti-entropy.
 	state localState

+	// checkReapAfter maps the check ID to a timeout after which we should
+	// reap its associated service
+	checkReapAfter map[types.CheckID]time.Duration
+
 	// checkMonitors maps the check ID to an associated monitor
 	checkMonitors map[types.CheckID]*CheckMonitor

@@ -174,24 +178,25 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
 	}

 	agent := &Agent{
-		config:        config,
-		logger:        log.New(logOutput, "", log.LstdFlags),
-		logOutput:     logOutput,
-		checkMonitors: make(map[types.CheckID]*CheckMonitor),
-		checkTTLs:     make(map[types.CheckID]*CheckTTL),
-		checkHTTPs:    make(map[types.CheckID]*CheckHTTP),
-		checkTCPs:     make(map[types.CheckID]*CheckTCP),
-		checkDockers:  make(map[types.CheckID]*CheckDocker),
-		eventCh:       make(chan serf.UserEvent, 1024),
-		eventBuf:      make([]*UserEvent, 256),
-		shutdownCh:    make(chan struct{}),
-		endpoints:     make(map[string]string),
-	}
-
-	// Initialize the local state
+		config:         config,
+		logger:         log.New(logOutput, "", log.LstdFlags),
+		logOutput:      logOutput,
+		checkReapAfter: make(map[types.CheckID]time.Duration),
+		checkMonitors:  make(map[types.CheckID]*CheckMonitor),
+		checkTTLs:      make(map[types.CheckID]*CheckTTL),
+		checkHTTPs:     make(map[types.CheckID]*CheckHTTP),
+		checkTCPs:      make(map[types.CheckID]*CheckTCP),
+		checkDockers:   make(map[types.CheckID]*CheckDocker),
+		eventCh:        make(chan serf.UserEvent, 1024),
+		eventBuf:       make([]*UserEvent, 256),
+		shutdownCh:     make(chan struct{}),
+		endpoints:      make(map[string]string),
+	}
+
+	// Initialize the local state.
 	agent.state.Init(config, agent.logger)

-	// Setup either the client or the server
+	// Setup either the client or the server.
 	var err error
 	if config.Server {
 		err = agent.setupServer()
@@ -213,23 +218,27 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
 		return nil, err
 	}

-	// Load checks/services
+	// Load checks/services.
 	if err := agent.loadServices(config); err != nil {
 		return nil, err
 	}
 	if err := agent.loadChecks(config); err != nil {
 		return nil, err
 	}

-	// Start handling events
+	// Start watching for critical services to deregister, based on their
+	// checks.
+	go agent.reapServices()
+
+	// Start handling events.
 	go agent.handleEvents()

 	// Start sending network coordinate to the server.
 	if !config.DisableCoordinates {
 		go agent.sendCoordinate()
 	}

-	// Write out the PID file if necessary
+	// Write out the PID file if necessary.
 	err = agent.storePid()
 	if err != nil {
 		return nil, err
@@ -664,6 +673,52 @@ func (a *Agent) sendCoordinate() {
 	}
 }

+// reapServices is a long running goroutine that looks for checks that have
+// been critical too long and deregisters their associated services.
+func (a *Agent) reapServices() {
+	reap := func() {

Member: I would just move this out to a named function
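A sketch of that suggestion, for illustration: hoist the closure into a named method so reapServices carries only the timing loop. The name reapServicesInternal is hypothetical, not part of this diff; the pass itself is the same code as the closure below, unchanged.

```go
// reapServicesInternal does one pass: any check that has been critical
// longer than its configured timeout gets its service deregistered.
func (a *Agent) reapServicesInternal() {
	reaped := make(map[string]struct{})
	for checkID, check := range a.state.CriticalChecks() {
		// There's nothing to do if there's no service.
		if check.Check.ServiceID == "" {
			continue
		}

		// There might be multiple checks for one service, so we only
		// reap it once.
		serviceID := check.Check.ServiceID
		if _, ok := reaped[serviceID]; ok {
			continue
		}

		// Reap if a timeout is configured and has been exceeded.
		a.checkLock.Lock()
		timeout, ok := a.checkReapAfter[checkID]
		a.checkLock.Unlock()
		if ok && check.CriticalFor > timeout {
			reaped[serviceID] = struct{}{}
			a.RemoveService(serviceID, true)
			a.logger.Printf("[INFO] agent: Check %q for service %q has been critical for too long; deregistered service",
				checkID, serviceID)
		}
	}
}

// reapServices is the long-running goroutine; it just drives the timing.
func (a *Agent) reapServices() {
	for {
		select {
		case <-time.After(a.config.CheckReapInterval):
			a.reapServicesInternal()
		case <-a.shutdownCh:
			return
		}
	}
}
```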

+		reaped := make(map[string]struct{})
+		for checkID, check := range a.state.CriticalChecks() {
+			// There's nothing to do if there's no service.
+			if check.Check.ServiceID == "" {
+				continue
+			}
+
+			// There might be multiple checks for one service, so
+			// we don't need to reap multiple times.
+			serviceID := check.Check.ServiceID
+			if _, ok := reaped[serviceID]; ok {
+				continue
+			}
+
+			// See if there's a timeout.
+			a.checkLock.Lock()
+			timeout, ok := a.checkReapAfter[checkID]
+			a.checkLock.Unlock()
+
+			// Reap, if necessary. We keep track of which service
+			// this is so that we won't try to remove it again.
+			if ok && check.CriticalFor > timeout {
+				reaped[serviceID] = struct{}{}
+				a.RemoveService(serviceID, true)
+				a.logger.Printf("[INFO] agent: Check %q for service %q has been critical for too long; deregistered service",
+					checkID, serviceID)
+			}
+		}
+	}
+
+	for {
+		select {
+		case <-time.After(a.config.CheckReapInterval):
+			reap()
+
+		case <-a.shutdownCh:
+			return
+		}
+	}
+}

 // persistService saves a service definition to a JSON file in the data dir
 func (a *Agent) persistService(service *structs.NodeService) error {
 	svcPath := filepath.Join(a.config.DataDir, servicesDir, stringHash(service.ID))
@@ -987,6 +1042,18 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType, persist
 	} else {
 		return fmt.Errorf("Check type is not valid")
 	}
+
+	if chkType.DeregisterCriticalServiceAfter > 0 {
+		timeout := chkType.DeregisterCriticalServiceAfter
+		if timeout < a.config.CheckDeregisterIntervalMin {
+			timeout = a.config.CheckDeregisterIntervalMin
+			a.logger.Println(fmt.Sprintf("[WARN] agent: check '%s' has deregister interval below minimum of %v",
+				check.CheckID, a.config.CheckDeregisterIntervalMin))
+		}
+		a.checkReapAfter[check.CheckID] = timeout
+	} else {
+		delete(a.checkReapAfter, check.CheckID)
+	}
 }

 // Add to the local state for anti-entropy
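To make the clamping above concrete, a short sketch mirroring the new test: a service-bound TTL check registered with a reap timeout below the agent's minimum is not rejected; the agent logs the warning and raises the timeout to CheckDeregisterIntervalMin. Only a malformed duration (like "nope" in the test) fails the registration. The client and names here are illustrative.

```go
// Assumes an *api.Client named client and a registered service "redis".
reg := &api.AgentCheckRegistration{
	Name:      "redis-ttl", // hypothetical check name
	ServiceID: "redis",
}
reg.TTL = "15s"
// Below the agent's minimum: accepted, but clamped up with a WARN log.
reg.DeregisterCriticalServiceAfter = "5s"

if err := client.Agent().CheckRegister(reg); err != nil {
	log.Fatalf("err: %v", err)
}
```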
@@ -1015,6 +1082,7 @@ func (a *Agent) RemoveCheck(checkID types.CheckID, persist bool) error {
 	defer a.checkLock.Unlock()

 	// Stop any monitors
+	delete(a.checkReapAfter, checkID)
 	if check, ok := a.checkMonitors[checkID]; ok {
 		check.Stop()
 		delete(a.checkMonitors, checkID)
@@ -1043,25 +1111,27 @@ func (a *Agent) RemoveCheck(checkID types.CheckID, persist bool) error {
 	return nil
 }

-// UpdateCheck is used to update the status of a check.
-// This can only be used with checks of the TTL type.
-func (a *Agent) UpdateCheck(checkID types.CheckID, status, output string) error {
+// updateTTLCheck is used to update the status of a TTL check via the Agent API.
+func (a *Agent) updateTTLCheck(checkID types.CheckID, status, output string) error {
 	a.checkLock.Lock()
 	defer a.checkLock.Unlock()

+	// Grab the TTL check.
 	check, ok := a.checkTTLs[checkID]
 	if !ok {
 		return fmt.Errorf("CheckID %q does not have associated TTL", checkID)
 	}

-	// Set the status through CheckTTL to reset the TTL
+	// Set the status through CheckTTL to reset the TTL.
 	check.SetStatus(status, output)

 	// We don't write any files in dev mode so bail here.
 	if a.config.DevMode {
 		return nil
 	}

-	// Always persist the state for TTL checks
+	// Persist the state so the TTL check can come up in a good state after
+	// an agent restart, especially with long TTL values.
 	if err := a.persistCheckState(check, status, output); err != nil {
Member: Seems like persistCheckState should save the critical time, so that an agent restart does not reset all the timers.

Contributor Author: I thought about this but the actual time is managed down in the local state, and we want to be pretty conservative with deregisters, so this seemed like a good way to go.

return fmt.Errorf("failed persisting state for check %q: %s", checkID, err)
}
8 changes: 4 additions & 4 deletions command/agent/agent_endpoint.go
@@ -144,7 +144,7 @@ func (s *HTTPServer) AgentDeregisterCheck(resp http.ResponseWriter, req *http.Request)
 func (s *HTTPServer) AgentCheckPass(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
 	checkID := types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/pass/"))
 	note := req.URL.Query().Get("note")
-	if err := s.agent.UpdateCheck(checkID, structs.HealthPassing, note); err != nil {
+	if err := s.agent.updateTTLCheck(checkID, structs.HealthPassing, note); err != nil {
 		return nil, err
 	}
 	s.syncChanges()
@@ -154,7 +154,7 @@ func (s *HTTPServer) AgentCheckPass(resp http.ResponseWriter, req *http.Request)
 func (s *HTTPServer) AgentCheckWarn(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
 	checkID := types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/warn/"))
 	note := req.URL.Query().Get("note")
-	if err := s.agent.UpdateCheck(checkID, structs.HealthWarning, note); err != nil {
+	if err := s.agent.updateTTLCheck(checkID, structs.HealthWarning, note); err != nil {
 		return nil, err
 	}
 	s.syncChanges()
@@ -164,7 +164,7 @@ func (s *HTTPServer) AgentCheckWarn(resp http.ResponseWriter, req *http.Request)
 func (s *HTTPServer) AgentCheckFail(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
 	checkID := types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/fail/"))
 	note := req.URL.Query().Get("note")
-	if err := s.agent.UpdateCheck(checkID, structs.HealthCritical, note); err != nil {
+	if err := s.agent.updateTTLCheck(checkID, structs.HealthCritical, note); err != nil {
 		return nil, err
 	}
 	s.syncChanges()
@@ -216,7 +216,7 @@ func (s *HTTPServer) AgentCheckUpdate(resp http.ResponseWriter, req *http.Request)
 	}

 	checkID := types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/update/"))
-	if err := s.agent.UpdateCheck(checkID, update.Status, update.Output); err != nil {
+	if err := s.agent.updateTTLCheck(checkID, update.Status, update.Output); err != nil {
 		return nil, err
 	}
 	s.syncChanges()
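The rename is purely internal; the HTTP endpoints themselves are unchanged. As a closing sketch, here is one way to drive the TTL endpoints through the Go API package, assuming the api package's PassTTL and UpdateTTL helpers of this era and an illustrative check ID:

```go
agent := client.Agent() // client is an *api.Client, as above

// PUT /v1/agent/check/pass/service:redis?note=...
if err := agent.PassTTL("service:redis", "all good"); err != nil {
	log.Fatal(err)
}

// The JSON body form: PUT /v1/agent/check/update/service:redis
if err := agent.UpdateTTL("service:redis", "latency rising", api.HealthWarning); err != nil {
	log.Fatal(err)
}
```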