-
Notifications
You must be signed in to change notification settings - Fork 321
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[NET-6581] perf: Fetch services once rather than per-node on deregister #3322
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
```release-note:improvement | ||
control-plane: reduce Consul Catalog API requests required for endpoints reconcile in large clusters | ||
``` |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -933,67 +933,65 @@ func getHealthCheckStatusReason(healthCheckStatus, podName, podNamespace string) | |
// has addresses, it will only deregister instances not in the map. | ||
func (r *Controller) deregisterService(apiClient *api.Client, k8sSvcName, k8sSvcNamespace string, endpointsAddressesMap map[string]bool) error { | ||
// Get services matching metadata from Consul | ||
nodesWithSvcs, err := r.serviceInstancesForNodes(apiClient, k8sSvcName, k8sSvcNamespace) | ||
serviceInstances, err := r.serviceInstances(apiClient, k8sSvcName, k8sSvcNamespace) | ||
if err != nil { | ||
r.Log.Error(err, "failed to get service instances", "name", k8sSvcName) | ||
return err | ||
} | ||
|
||
var errs error | ||
for _, nodeSvcs := range nodesWithSvcs { | ||
for _, svc := range nodeSvcs.Services { | ||
// We need to get services matching "k8s-service-name" and "k8s-namespace" metadata. | ||
// If we selectively deregister, only deregister if the address is not in the map. Otherwise, deregister | ||
// every service instance. | ||
var serviceDeregistered bool | ||
if endpointsAddressesMap != nil { | ||
if _, ok := endpointsAddressesMap[svc.Address]; !ok { | ||
// If the service address is not in the Endpoints addresses, deregister it. | ||
r.Log.Info("deregistering service from consul", "svc", svc.ID) | ||
_, err := apiClient.Catalog().Deregister(&api.CatalogDeregistration{ | ||
Node: nodeSvcs.Node.Node, | ||
ServiceID: svc.ID, | ||
Namespace: svc.Namespace, | ||
}, nil) | ||
if err != nil { | ||
// Do not exit right away as there might be other services that need to be deregistered. | ||
r.Log.Error(err, "failed to deregister service instance", "id", svc.ID) | ||
errs = multierror.Append(errs, err) | ||
} else { | ||
serviceDeregistered = true | ||
} | ||
} | ||
} else { | ||
r.Log.Info("deregistering service from consul", "svc", svc.ID) | ||
for _, svc := range serviceInstances { | ||
// We need to get services matching "k8s-service-name" and "k8s-namespace" metadata. | ||
// If we selectively deregister, only deregister if the address is not in the map. Otherwise, deregister | ||
// every service instance. | ||
var serviceDeregistered bool | ||
if endpointsAddressesMap != nil { | ||
if _, ok := endpointsAddressesMap[svc.ServiceAddress]; !ok { | ||
// If the service address is not in the Endpoints addresses, deregister it. | ||
r.Log.Info("deregistering service from consul", "svc", svc.ServiceID) | ||
_, err := apiClient.Catalog().Deregister(&api.CatalogDeregistration{ | ||
Node: nodeSvcs.Node.Node, | ||
ServiceID: svc.ID, | ||
Node: svc.Node, | ||
ServiceID: svc.ServiceID, | ||
Namespace: svc.Namespace, | ||
}, nil) | ||
if err != nil { | ||
// Do not exit right away as there might be other services that need to be deregistered. | ||
r.Log.Error(err, "failed to deregister service instance", "id", svc.ID) | ||
r.Log.Error(err, "failed to deregister service instance", "id", svc.ServiceID) | ||
errs = multierror.Append(errs, err) | ||
} else { | ||
serviceDeregistered = true | ||
} | ||
} | ||
} else { | ||
r.Log.Info("deregistering service from consul", "svc", svc.ServiceID) | ||
_, err := apiClient.Catalog().Deregister(&api.CatalogDeregistration{ | ||
Node: svc.Node, | ||
ServiceID: svc.ServiceID, | ||
Namespace: svc.Namespace, | ||
}, nil) | ||
if err != nil { | ||
// Do not exit right away as there might be other services that need to be deregistered. | ||
r.Log.Error(err, "failed to deregister service instance", "id", svc.ServiceID) | ||
errs = multierror.Append(errs, err) | ||
} else { | ||
serviceDeregistered = true | ||
} | ||
} | ||
|
||
if r.AuthMethod != "" && serviceDeregistered { | ||
r.Log.Info("reconciling ACL tokens for service", "svc", svc.Service) | ||
err := r.deleteACLTokensForServiceInstance(apiClient, svc, k8sSvcNamespace, svc.Meta[constants.MetaKeyPodName], svc.Meta[constants.MetaKeyPodUID]) | ||
if err != nil { | ||
r.Log.Error(err, "failed to reconcile ACL tokens for service", "svc", svc.Service) | ||
errs = multierror.Append(errs, err) | ||
} | ||
if r.AuthMethod != "" && serviceDeregistered { | ||
r.Log.Info("reconciling ACL tokens for service", "svc", svc.ServiceName) | ||
err := r.deleteACLTokensForServiceInstance(apiClient, svc, k8sSvcNamespace, svc.ServiceMeta[constants.MetaKeyPodName], svc.ServiceMeta[constants.MetaKeyPodUID]) | ||
if err != nil { | ||
r.Log.Error(err, "failed to reconcile ACL tokens for service", "svc", svc.ServiceName) | ||
errs = multierror.Append(errs, err) | ||
} | ||
} | ||
|
||
if serviceDeregistered { | ||
err = r.deregisterNode(apiClient, nodeSvcs.Node.Node) | ||
if err != nil { | ||
r.Log.Error(err, "failed to deregister node", "svc", svc.Service) | ||
errs = multierror.Append(errs, err) | ||
} | ||
if serviceDeregistered { | ||
err = r.deregisterNode(apiClient, svc.Node) | ||
if err != nil { | ||
r.Log.Error(err, "failed to deregister node", "svc", svc.ServiceName) | ||
errs = multierror.Append(errs, err) | ||
} | ||
} | ||
} | ||
|
@@ -1036,7 +1034,7 @@ func (r *Controller) deregisterNode(apiClient *api.Client, nodeName string) erro | |
// deleteACLTokensForServiceInstance finds the ACL tokens that belongs to the service instance and deletes it from Consul. | ||
// It will only check for ACL tokens that have been created with the auth method this controller | ||
// has been configured with and will only delete tokens for the provided podName and podUID. | ||
func (r *Controller) deleteACLTokensForServiceInstance(apiClient *api.Client, svc *api.AgentService, k8sNS, podName, podUID string) error { | ||
func (r *Controller) deleteACLTokensForServiceInstance(apiClient *api.Client, svc *api.CatalogService, k8sNS, podName, podUID string) error { | ||
// Skip if podName is empty. | ||
if podName == "" { | ||
return nil | ||
|
@@ -1049,7 +1047,7 @@ func (r *Controller) deleteACLTokensForServiceInstance(apiClient *api.Client, sv | |
// matches as well. | ||
tokens, _, err := apiClient.ACL().TokenListFiltered( | ||
api.ACLTokenFilterOptions{ | ||
ServiceName: svc.Service, | ||
ServiceName: svc.ServiceName, | ||
}, | ||
&api.QueryOptions{ | ||
Namespace: svc.Namespace, | ||
|
@@ -1064,7 +1062,7 @@ func (r *Controller) deleteACLTokensForServiceInstance(apiClient *api.Client, sv | |
// * have a single service identity whose service name is the same as 'svc.ServiceName' | ||
if token.AuthMethod == r.AuthMethod && | ||
len(token.ServiceIdentities) == 1 && | ||
token.ServiceIdentities[0].ServiceName == svc.Service { | ||
token.ServiceIdentities[0].ServiceName == svc.ServiceName { | ||
tokenMeta, err := getTokenMetaFromDescription(token.Description) | ||
if err != nil { | ||
return fmt.Errorf("failed to parse token metadata: %s", err) | ||
|
@@ -1162,49 +1160,100 @@ func getTokenMetaFromDescription(description string) (map[string]string, error) | |
return tokenMeta, nil | ||
} | ||
|
||
func (r *Controller) serviceInstancesForNodes(apiClient *api.Client, k8sServiceName, k8sServiceNamespace string) ([]*api.CatalogNodeServiceList, error) { | ||
var serviceList []*api.CatalogNodeServiceList | ||
func (r *Controller) serviceInstances(apiClient *api.Client, k8sServiceName, k8sServiceNamespace string) ([]*api.CatalogService, error) { | ||
var ( | ||
instances []*api.CatalogService | ||
errs error | ||
) | ||
|
||
// The nodelist may have changed between this point and when the event was raised | ||
// For example, if a pod is evicted because a node has been deleted, there is no guarantee that that node will show up here | ||
// query consul catalog for a list of nodes supporting this service | ||
// quite a lot of results as synthetic nodes are never deregistered. | ||
var nodes []*api.Node | ||
filter := fmt.Sprintf(`Meta[%q] == %q `, "synthetic-node", "true") | ||
nodes, _, err := apiClient.Catalog().Nodes(&api.QueryOptions{Filter: filter, Namespace: namespaces.WildcardNamespace}) | ||
// Get the names of services that have the provided k8sServiceName and k8sServiceNamespace in their metadata. | ||
// This is necessary so that we can then list the service instances for each Consul service name, which may | ||
// not match the K8s service name. | ||
services, err := r.servicesForK8SServiceNameAndNamespace(apiClient, k8sServiceName, k8sServiceNamespace) | ||
if err != nil { | ||
r.Log.Error(err, "failed to get catalog services", "name", k8sServiceName) | ||
return nil, err | ||
} | ||
|
||
var errs error | ||
for _, node := range nodes { | ||
var nodeServices *api.CatalogNodeServiceList | ||
nodeServices, err := r.serviceInstancesForK8SServiceNameAndNamespace(apiClient, k8sServiceName, k8sServiceNamespace, node.Node) | ||
// Query consul catalog for a list of service instances matching the given service names. | ||
filter := fmt.Sprintf(`NodeMeta[%q] == %q and ServiceMeta[%q] == %q and ServiceMeta[%q] == %q and ServiceMeta[%q] == %q`, | ||
zalimeni marked this conversation as resolved.
Show resolved
Hide resolved
|
||
metaKeySyntheticNode, "true", | ||
metaKeyKubeServiceName, k8sServiceName, | ||
constants.MetaKeyKubeNS, k8sServiceNamespace, | ||
metaKeyManagedBy, constants.ManagedByValue) | ||
for _, service := range services { | ||
var is []*api.CatalogService | ||
// Always query the default NS. This ensures that we include mesh gateways, which are always registered to the default NS. | ||
// It also ensures that during migrations that enable namespaces, we deregister old service instances in the default NS. | ||
// An alternative approach to this dual query would be a service instance list using the wildcard NS, which would also | ||
// include instances in Consul namespaces that are no longer in use (e.g. configuration change); this capability does | ||
// not currently exist in Consul's catalog API (as of 1.17) and would need to first be added. | ||
// | ||
// This request uses the service index of the services table (does not perform a full table scan), then decorates each | ||
// result with a single node fetched by ID index from the nodes table. | ||
is, _, err = apiClient.Catalog().Service(service, "", &api.QueryOptions{Filter: filter}) | ||
if err != nil { | ||
errs = multierror.Append(errs, err) | ||
} else { | ||
serviceList = append(serviceList, nodeServices) | ||
instances = append(instances, is...) | ||
} | ||
// If namespaces are enabled and a non-default NS is targeted, also query by target Consul NS. | ||
if r.EnableConsulNamespaces { | ||
nonDefaultNamespace := namespaces.NonDefaultConsulNamespace(r.consulNamespace(k8sServiceNamespace)) | ||
if nonDefaultNamespace != "" { | ||
hashi-derek marked this conversation as resolved.
Show resolved
Hide resolved
|
||
is, _, err = apiClient.Catalog().Service(service, "", &api.QueryOptions{Filter: filter, Namespace: nonDefaultNamespace}) | ||
if err != nil { | ||
errs = multierror.Append(errs, err) | ||
} else { | ||
instances = append(instances, is...) | ||
} | ||
} | ||
} | ||
} | ||
|
||
return serviceList, errs | ||
return instances, errs | ||
} | ||
|
||
// serviceInstancesForK8SServiceNameAndNamespace calls Consul's ServicesWithFilter to get the list | ||
// of services instances that have the provided k8sServiceName and k8sServiceNamespace in their metadata. | ||
func (r *Controller) serviceInstancesForK8SServiceNameAndNamespace(apiClient *api.Client, k8sServiceName, k8sServiceNamespace, nodeName string) (*api.CatalogNodeServiceList, error) { | ||
// servicesForK8SServiceNameAndNamespace calls Consul's Services to get the list | ||
// of services that have the provided k8sServiceName and k8sServiceNamespace in their metadata. | ||
func (r *Controller) servicesForK8SServiceNameAndNamespace(apiClient *api.Client, k8sServiceName, k8sServiceNamespace string) ([]string, error) { | ||
var ( | ||
serviceList *api.CatalogNodeServiceList | ||
err error | ||
services map[string][]string | ||
err error | ||
) | ||
filter := fmt.Sprintf(`Meta[%q] == %q and Meta[%q] == %q and Meta[%q] == %q`, | ||
filter := fmt.Sprintf(`ServiceMeta[%q] == %q and ServiceMeta[%q] == %q and ServiceMeta[%q] == %q`, | ||
Comment on lines
-1200
to
+1224
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same filter, different model field. I checked agent/consul/catalog_endpoint.go and If we find the new queries are not performant enough, we have the option of adding an index for |
||
metaKeyKubeServiceName, k8sServiceName, constants.MetaKeyKubeNS, k8sServiceNamespace, metaKeyManagedBy, constants.ManagedByValue) | ||
// Always query the default NS. This ensures that we cover CE->Ent upgrades where services were previously | ||
// in the default NS, as well as mesh gateways, which are always registered to the default NS. | ||
// | ||
// This request performs a NS-bound scan of the services table. If needed in the future, its performance | ||
// could be improved by adding an index on ServiceMeta to Consul's state store. | ||
services, _, err = apiClient.Catalog().Services(&api.QueryOptions{Filter: filter}) | ||
if err != nil { | ||
return nil, err | ||
} | ||
// If namespaces are enabled and a non-default NS is targeted, also query by target Consul NS. | ||
if r.EnableConsulNamespaces { | ||
serviceList, _, err = apiClient.Catalog().NodeServiceList(nodeName, &api.QueryOptions{Filter: filter, Namespace: namespaces.WildcardNamespace}) | ||
} else { | ||
serviceList, _, err = apiClient.Catalog().NodeServiceList(nodeName, &api.QueryOptions{Filter: filter}) | ||
nonDefaultNamespace := namespaces.NonDefaultConsulNamespace(r.consulNamespace(k8sServiceNamespace)) | ||
if nonDefaultNamespace != "" { | ||
ss, _, err := apiClient.Catalog().Services(&api.QueryOptions{Filter: filter, Namespace: nonDefaultNamespace}) | ||
if err != nil { | ||
return nil, err | ||
} | ||
// Add to existing map to deduplicate. | ||
for s := range ss { | ||
services[s] = nil // We don't use the tags, so just set to nil | ||
} | ||
} | ||
} | ||
|
||
// Return just the service name keys (we don't need the tags) | ||
// https://developer.hashicorp.com/consul/api-docs/catalog#list-services | ||
var serviceNames []string | ||
for s := range services { | ||
serviceNames = append(serviceNames, s) | ||
} | ||
return serviceList, err | ||
return serviceNames, err | ||
} | ||
|
||
// processPreparedQueryUpstream processes an upstream in the format: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1954,7 +1954,7 @@ func TestReconcileDeleteGatewayWithNamespaces(t *testing.T) { | |
{ | ||
ID: "mesh-gateway", | ||
Kind: api.ServiceKindMeshGateway, | ||
Service: "mesh-gateway", | ||
Service: consulSvcName, | ||
Port: 80, | ||
Address: "1.2.3.4", | ||
Meta: map[string]string{ | ||
|
@@ -1985,7 +1985,7 @@ func TestReconcileDeleteGatewayWithNamespaces(t *testing.T) { | |
{ | ||
ID: "mesh-gateway", | ||
Kind: api.ServiceKindMeshGateway, | ||
Service: "mesh-gateway", | ||
Service: consulSvcName, | ||
Port: 80, | ||
Address: "1.2.3.4", | ||
Meta: map[string]string{ | ||
|
@@ -2016,7 +2016,7 @@ func TestReconcileDeleteGatewayWithNamespaces(t *testing.T) { | |
{ | ||
ID: "terminating-gateway", | ||
Kind: api.ServiceKindTerminatingGateway, | ||
Service: "terminating-gateway", | ||
Service: consulSvcName, | ||
Port: 8443, | ||
Address: "1.2.3.4", | ||
Meta: map[string]string{ | ||
|
@@ -2037,7 +2037,7 @@ func TestReconcileDeleteGatewayWithNamespaces(t *testing.T) { | |
{ | ||
ID: "terminating-gateway", | ||
Kind: api.ServiceKindTerminatingGateway, | ||
Service: "terminating-gateway", | ||
Service: consulSvcName, | ||
Port: 8443, | ||
Address: "1.2.3.4", | ||
Meta: map[string]string{ | ||
|
@@ -2089,7 +2089,7 @@ func TestReconcileDeleteGatewayWithNamespaces(t *testing.T) { | |
{ | ||
ID: "ingress-gateway", | ||
Kind: api.ServiceKindIngressGateway, | ||
Service: "ingress-gateway", | ||
Service: consulSvcName, | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Here and above: this test was registering instances under a different service name than the one checked by the assertions that instances are deregistered. This turns the false negatives into true negatives. |
||
Port: 80, | ||
Address: "1.2.3.4", | ||
Meta: map[string]string{ | ||
|
@@ -2175,15 +2175,16 @@ func TestReconcileDeleteGatewayWithNamespaces(t *testing.T) { | |
|
||
// Create the endpoints controller. | ||
ep := &Controller{ | ||
Client: fakeClient, | ||
Log: logrtest.NewTestLogger(t), | ||
ConsulClientConfig: testClient.Cfg, | ||
ConsulServerConnMgr: testClient.Watcher, | ||
AllowK8sNamespacesSet: mapset.NewSetWith("*"), | ||
DenyK8sNamespacesSet: mapset.NewSetWith(), | ||
ReleaseName: "consul", | ||
ReleaseNamespace: "default", | ||
EnableConsulNamespaces: true, | ||
Client: fakeClient, | ||
Log: logrtest.NewTestLogger(t), | ||
ConsulClientConfig: testClient.Cfg, | ||
ConsulServerConnMgr: testClient.Watcher, | ||
AllowK8sNamespacesSet: mapset.NewSetWith("*"), | ||
DenyK8sNamespacesSet: mapset.NewSetWith(), | ||
ReleaseName: "consul", | ||
ReleaseNamespace: "default", | ||
EnableConsulNamespaces: true, | ||
ConsulDestinationNamespace: ts.ConsulNS, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This last line addition of |
||
} | ||
if tt.enableACLs { | ||
ep.AuthMethod = test.AuthMethod | ||
|
@@ -2216,6 +2217,7 @@ func TestReconcileDeleteGatewayWithNamespaces(t *testing.T) { | |
} | ||
|
||
token, _, err = consulClient.ACL().TokenRead(token.AccessorID, queryOpts) | ||
require.Error(t, err) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Fixes a panic that occurred when the test failed because no error was returned. |
||
require.Contains(t, err.Error(), "ACL not found", token) | ||
} | ||
}) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Most of the above code is unchanged. The GH diff is easier to read with ?w=1.
The changes are:
- CatalogNodeServiceList.Node.Node -> ServiceNode.Node
- NodeService.ID -> ServiceNode.ServiceID
- NodeService.Service -> ServiceNode.ServiceName
- NodeService.Meta -> ServiceNode.ServiceMeta
- NodeService.Address -> ServiceNode.ServiceAddress
Here's a visual of the diff of the two models read locally from a multi-node kind cluster (the sidecar proxy is similar and carries identical NodeMeta and ServiceMeta, which is what we filter on):