Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUGFIX] When a node level check is removed, ensure all services of node are notified #3970

Merged
85 changes: 43 additions & 42 deletions agent/consul/state/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -1086,6 +1086,43 @@ func (s *Store) EnsureCheck(idx uint64, hc *structs.HealthCheck) error {
return nil
}

// This ensure all connected services to a check are updated properly
func (s *Store) alterServicesFromCheck(tx *memdb.Txn, idx uint64, hc *structs.HealthCheck, checkServiceExists bool) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Go style is to start the comment with the name of the method and write full sentence:

// alterServicesFromCheck ensures ... are updated properly.
func (s *Store) alterServicesFromCheck(...)

Also the name confuses me: it's names "alter services" yet it actually updates the service index right not the actual service record?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DONE in next patch

// If the check is associated with a service, check that we have
// a registration for the service.
if hc.ServiceID != "" {
service, err := tx.First("services", "id", hc.Node, hc.ServiceID)
// When deleting a service, service might have been removed already
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused, I guess this is refering to the checkServiceExists flag passed which seems reasonable, but then in the case that we don't check existence and so skip the first error handler, the second handler will be triggered and return an error anyway?

The caller in deleteCheckTxn doesn't distinguish between those error types so will fail if this returns any error anyway. Can we just remove this case as it seems to do nothing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I did it another way in next patch: I renamed alterServicesFromCheck and it only updates all Service Indexes for a given node

if err != nil && checkServiceExists {
return fmt.Errorf("failed service lookup: %s", err)
}
if service == nil {
return ErrMissingService
}

// Copy in the service name and tags
svc := service.(*structs.ServiceNode)
hc.ServiceName = svc.ServiceName
hc.ServiceTags = svc.ServiceTags
if err = tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
} else {
// Update the status for all the services associated with this node
services, err := tx.Get("services", "node", hc.Node)
if err != nil {
return fmt.Errorf("failed updating services for node %s: %s", hc.Node, err)
}
for service := services.Next(); service != nil; service = services.Next() {
svc := service.(*structs.ServiceNode).ToNodeService()
if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.Service), idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
}
}
return nil
}

// ensureCheckTransaction is used as the inner method to handle inserting
// a health check into the state store. It ensures safety against inserting
// checks with no matching node or service.
Expand Down Expand Up @@ -1119,36 +1156,9 @@ func (s *Store) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthChec
return ErrMissingNode
}

// If the check is associated with a service, check that we have
// a registration for the service.
if hc.ServiceID != "" {
service, err := tx.First("services", "id", hc.Node, hc.ServiceID)
if err != nil {
return fmt.Errorf("failed service lookup: %s", err)
}
if service == nil {
return ErrMissingService
}

// Copy in the service name and tags
svc := service.(*structs.ServiceNode)
hc.ServiceName = svc.ServiceName
hc.ServiceTags = svc.ServiceTags
if err = tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
} else {
// Update the status for all the services associated with this node
services, err := tx.Get("services", "node", hc.Node)
if err != nil {
return fmt.Errorf("failed updating services for node %s: %s", hc.Node, err)
}
for service := services.Next(); service != nil; service = services.Next() {
svc := service.(*structs.ServiceNode).ToNodeService()
if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.Service), idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
}
err = s.alterServicesFromCheck(tx, idx, hc, true)
if err != nil {
return err
}

// Delete any sessions for this check if the health is critical.
Expand Down Expand Up @@ -1390,19 +1400,10 @@ func (s *Store) deleteCheckTxn(tx *memdb.Txn, idx uint64, node string, checkID t
return nil
}
existing := hc.(*structs.HealthCheck)
if existing != nil && existing.ServiceID != "" {
service, err := tx.First("services", "id", node, existing.ServiceID)
if existing != nil {
err = s.alterServicesFromCheck(tx, idx, existing, false)
if err != nil {
return fmt.Errorf("failed service lookup: %s", err)
}
if service == nil {
return ErrMissingService
}

// Updated index of service
svc := service.(*structs.ServiceNode)
if err = tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
return fmt.Errorf("Failed to update services linked to deleted healthcheck: %s", err)
}
}

Expand Down
5 changes: 5 additions & 0 deletions agent/consul/state/catalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2119,6 +2119,7 @@ func TestStateStore_DeleteCheck(t *testing.T) {
// Register a node and a node-level health check.
testRegisterNode(t, s, 1, "node1")
testRegisterCheck(t, s, 2, "node1", "", "check1", api.HealthPassing)
testRegisterService(t, s, 2, "node1", "service1")

// Make sure the check is there.
ws := memdb.NewWatchSet()
Expand All @@ -2130,13 +2131,17 @@ func TestStateStore_DeleteCheck(t *testing.T) {
t.Fatalf("bad: %#v", checks)
}

ensureServiceVersion(t, s, ws, "service1", 2, 1)

// Delete the check.
if err := s.DeleteCheck(3, "node1", "check1"); err != nil {
t.Fatalf("err: %s", err)
}
if !watchFired(ws) {
t.Fatalf("bad")
}
// All services linked to this node should have their index updated
ensureServiceVersion(t, s, ws, "service1", 3, 1)

// Check is gone
ws = memdb.NewWatchSet()
Expand Down