Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle errors properly when services are de-registered from the catalog #2571

Merged
merged 2 commits into from
Jul 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/2571.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
control-plane: fix bug in endpoints controller when deregistering services from consul when a node is deleted.
```
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
}

err = r.Client.Get(ctx, req.NamespacedName, &serviceEndpoints)

// endpointPods holds a set of all pods this endpoints object is currently pointing to.
// We use this later when we reconcile ACL tokens to decide whether an ACL token in Consul
// is for a pod that no longer exists.
Expand All @@ -183,7 +182,7 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
// It is possible that the endpoints object has never been registered, in which case deregistration is a no-op.
if isLabeledIgnore(serviceEndpoints.Labels) {
// We always deregister the service to handle the case where a user has registered the service, then added the label later.
r.Log.Info("Ignoring endpoint labeled with `consul.hashicorp.com/service-ignore: \"true\"`", "name", req.Name, "namespace", req.Namespace)
r.Log.Info("ignoring endpoint labeled with `consul.hashicorp.com/service-ignore: \"true\"`", "name", req.Name, "namespace", req.Namespace)
err = r.deregisterService(apiClient, req.Name, req.Namespace, nil)
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -915,14 +914,14 @@ func getHealthCheckStatusReason(healthCheckStatus, podName, podNamespace string)
// them only if they are not in endpointsAddressesMap. If the map is nil, it will deregister all instances. If the map
// has addresses, it will only deregister instances not in the map.
func (r *Controller) deregisterService(apiClient *api.Client, k8sSvcName, k8sSvcNamespace string, endpointsAddressesMap map[string]bool) error {
// Get services matching metadata.
nodesWithSvcs, err := r.serviceInstancesForK8sNodes(apiClient, k8sSvcName, k8sSvcNamespace)
// Get services matching metadata from Consul
nodesWithSvcs, err := r.serviceInstancesForNodes(apiClient, k8sSvcName, k8sSvcNamespace)
if err != nil {
r.Log.Error(err, "failed to get service instances", "name", k8sSvcName)
return err
}

// Deregister each service instance that matches the metadata.
var errs error
for _, nodeSvcs := range nodesWithSvcs {
for _, svc := range nodeSvcs.Services {
// We need to get services matching "k8s-service-name" and "k8s-namespace" metadata.
Expand All @@ -933,42 +932,48 @@ func (r *Controller) deregisterService(apiClient *api.Client, k8sSvcName, k8sSvc
if _, ok := endpointsAddressesMap[svc.Address]; !ok {
// If the service address is not in the Endpoints addresses, deregister it.
r.Log.Info("deregistering service from consul", "svc", svc.ID)
_, err = apiClient.Catalog().Deregister(&api.CatalogDeregistration{
_, err := apiClient.Catalog().Deregister(&api.CatalogDeregistration{
Node: nodeSvcs.Node.Node,
ServiceID: svc.ID,
Namespace: svc.Namespace,
}, nil)
if err != nil {
// Do not exit right away as there might be other services that need to be deregistered.
r.Log.Error(err, "failed to deregister service instance", "id", svc.ID)
return err
errs = multierror.Append(errs, err)
} else {
serviceDeregistered = true
}
serviceDeregistered = true
}
} else {
r.Log.Info("deregistering service from consul", "svc", svc.ID)
if _, err = apiClient.Catalog().Deregister(&api.CatalogDeregistration{
_, err := apiClient.Catalog().Deregister(&api.CatalogDeregistration{
Node: nodeSvcs.Node.Node,
ServiceID: svc.ID,
Namespace: svc.Namespace,
}, nil); err != nil {
}, nil)
if err != nil {
// Do not exit right away as there might be other services that need to be deregistered.
r.Log.Error(err, "failed to deregister service instance", "id", svc.ID)
return err
errs = multierror.Append(errs, err)
} else {
serviceDeregistered = true
}
serviceDeregistered = true
}

if r.AuthMethod != "" && serviceDeregistered {
r.Log.Info("reconciling ACL tokens for service", "svc", svc.Service)
err = r.deleteACLTokensForServiceInstance(apiClient, svc, k8sSvcNamespace, svc.Meta[constants.MetaKeyPodName])
err := r.deleteACLTokensForServiceInstance(apiClient, svc, k8sSvcNamespace, svc.Meta[constants.MetaKeyPodName])
if err != nil {
r.Log.Error(err, "failed to reconcile ACL tokens for service", "svc", svc.Service)
return err
errs = multierror.Append(errs, err)
}
}
}
}

return nil
return errs

}

// deleteACLTokensForServiceInstance finds the ACL tokens that belongs to the service instance and deletes it from Consul.
Expand Down Expand Up @@ -1088,21 +1093,32 @@ func getTokenMetaFromDescription(description string) (map[string]string, error)
return tokenMeta, nil
}

func (r *Controller) serviceInstancesForK8sNodes(apiClient *api.Client, k8sServiceName, k8sServiceNamespace string) ([]*api.CatalogNodeServiceList, error) {
func (r *Controller) serviceInstancesForNodes(apiClient *api.Client, k8sServiceName, k8sServiceNamespace string) ([]*api.CatalogNodeServiceList, error) {
var serviceList []*api.CatalogNodeServiceList
// Get a list of k8s nodes.
var nodeList corev1.NodeList
err := r.Client.List(r.Context, &nodeList)

// The nodelist may have changed between this point and when the event was raised
// For example, if a pod is evicted because a node has been deleted, there is no guarantee that that node will show up here
// query consul catalog for a list of nodes supporting this service
// quite a lot of results as synthetic nodes are never deregistered.
var nodes []*api.Node
filter := fmt.Sprintf(`Meta[%q] == %q `, "synthetic-node", "true")
nodes, _, err := apiClient.Catalog().Nodes(&api.QueryOptions{Filter: filter, Namespace: namespaces.WildcardNamespace})
if err != nil {
return nil, err
}
for _, node := range nodeList.Items {

var errs error
for _, node := range nodes {
var nodeServices *api.CatalogNodeServiceList
nodeServices, err = r.serviceInstancesForK8SServiceNameAndNamespace(apiClient, k8sServiceName, k8sServiceNamespace, common.ConsulNodeNameFromK8sNode(node.Name))
serviceList = append(serviceList, nodeServices)
nodeServices, err := r.serviceInstancesForK8SServiceNameAndNamespace(apiClient, k8sServiceName, k8sServiceNamespace, node.Node)
if err != nil {
errs = multierror.Append(errs, err)
} else {
serviceList = append(serviceList, nodeServices)
}
}

return serviceList, err
return serviceList, errs
}

// serviceInstancesForK8SServiceNameAndNamespace calls Consul's ServicesWithFilter to get the list
Expand Down
Loading
Loading