Skip to content

Commit

Permalink
perf: Fetch services once rather than per-node on deregister
Browse files Browse the repository at this point in the history
Rather than fetching all nodes in a cluster then listing services
per-node, fetch all service instances directly by name.

This should generally reduce the cost of deregistration in endpoints
controller reconciles (in terms of network calls) from N calls (N=node
count) to 3-6 calls regardless of K8s cluster size.

This does trade the cost of node list and per-node instance fetching for
a bulk fetch of service instances. However, a bulk fetch was the
previous behavior prior to the introduction of Consul node mirroring in
`consul-k8s`, and in the majority of real-world use cases, should be
cheaper than listing all nodes and fetching services individually per
node in the vast majority of cases.

This change is motivated by customers with larger K8s clusters (node
counts) seeing performance issues with reconciles.
  • Loading branch information
zalimeni committed Dec 21, 2023
1 parent 6f293d5 commit 1171d16
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 101 deletions.
3 changes: 3 additions & 0 deletions .changelog/3322.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
control-plane: reduce Consul Catalog API requests required for endpoints reconcile in large clusters
```
Original file line number Diff line number Diff line change
Expand Up @@ -933,67 +933,65 @@ func getHealthCheckStatusReason(healthCheckStatus, podName, podNamespace string)
// has addresses, it will only deregister instances not in the map.
func (r *Controller) deregisterService(apiClient *api.Client, k8sSvcName, k8sSvcNamespace string, endpointsAddressesMap map[string]bool) error {
// Get services matching metadata from Consul
nodesWithSvcs, err := r.serviceInstancesForNodes(apiClient, k8sSvcName, k8sSvcNamespace)
serviceInstances, err := r.serviceInstances(apiClient, k8sSvcName, k8sSvcNamespace)
if err != nil {
r.Log.Error(err, "failed to get service instances", "name", k8sSvcName)
return err
}

var errs error
for _, nodeSvcs := range nodesWithSvcs {
for _, svc := range nodeSvcs.Services {
// We need to get services matching "k8s-service-name" and "k8s-namespace" metadata.
// If we selectively deregister, only deregister if the address is not in the map. Otherwise, deregister
// every service instance.
var serviceDeregistered bool
if endpointsAddressesMap != nil {
if _, ok := endpointsAddressesMap[svc.Address]; !ok {
// If the service address is not in the Endpoints addresses, deregister it.
r.Log.Info("deregistering service from consul", "svc", svc.ID)
_, err := apiClient.Catalog().Deregister(&api.CatalogDeregistration{
Node: nodeSvcs.Node.Node,
ServiceID: svc.ID,
Namespace: svc.Namespace,
}, nil)
if err != nil {
// Do not exit right away as there might be other services that need to be deregistered.
r.Log.Error(err, "failed to deregister service instance", "id", svc.ID)
errs = multierror.Append(errs, err)
} else {
serviceDeregistered = true
}
}
} else {
r.Log.Info("deregistering service from consul", "svc", svc.ID)
for _, svc := range serviceInstances {
// We need to get services matching "k8s-service-name" and "k8s-namespace" metadata.
// If we selectively deregister, only deregister if the address is not in the map. Otherwise, deregister
// every service instance.
var serviceDeregistered bool
if endpointsAddressesMap != nil {
if _, ok := endpointsAddressesMap[svc.ServiceAddress]; !ok {
// If the service address is not in the Endpoints addresses, deregister it.
r.Log.Info("deregistering service from consul", "svc", svc.ServiceID)
_, err := apiClient.Catalog().Deregister(&api.CatalogDeregistration{
Node: nodeSvcs.Node.Node,
ServiceID: svc.ID,
Node: svc.Node,
ServiceID: svc.ServiceID,
Namespace: svc.Namespace,
}, nil)
if err != nil {
// Do not exit right away as there might be other services that need to be deregistered.
r.Log.Error(err, "failed to deregister service instance", "id", svc.ID)
r.Log.Error(err, "failed to deregister service instance", "id", svc.ServiceID)
errs = multierror.Append(errs, err)
} else {
serviceDeregistered = true
}
}
} else {
r.Log.Info("deregistering service from consul", "svc", svc.ServiceID)
_, err := apiClient.Catalog().Deregister(&api.CatalogDeregistration{
Node: svc.Node,
ServiceID: svc.ServiceID,
Namespace: svc.Namespace,
}, nil)
if err != nil {
// Do not exit right away as there might be other services that need to be deregistered.
r.Log.Error(err, "failed to deregister service instance", "id", svc.ServiceID)
errs = multierror.Append(errs, err)
} else {
serviceDeregistered = true
}
}

if r.AuthMethod != "" && serviceDeregistered {
r.Log.Info("reconciling ACL tokens for service", "svc", svc.Service)
err := r.deleteACLTokensForServiceInstance(apiClient, svc, k8sSvcNamespace, svc.Meta[constants.MetaKeyPodName], svc.Meta[constants.MetaKeyPodUID])
if err != nil {
r.Log.Error(err, "failed to reconcile ACL tokens for service", "svc", svc.Service)
errs = multierror.Append(errs, err)
}
if r.AuthMethod != "" && serviceDeregistered {
r.Log.Info("reconciling ACL tokens for service", "svc", svc.ServiceName)
err := r.deleteACLTokensForServiceInstance(apiClient, svc, k8sSvcNamespace, svc.ServiceMeta[constants.MetaKeyPodName], svc.ServiceMeta[constants.MetaKeyPodUID])
if err != nil {
r.Log.Error(err, "failed to reconcile ACL tokens for service", "svc", svc.ServiceName)
errs = multierror.Append(errs, err)
}
}

if serviceDeregistered {
err = r.deregisterNode(apiClient, nodeSvcs.Node.Node)
if err != nil {
r.Log.Error(err, "failed to deregister node", "svc", svc.Service)
errs = multierror.Append(errs, err)
}
if serviceDeregistered {
err = r.deregisterNode(apiClient, svc.Node)
if err != nil {
r.Log.Error(err, "failed to deregister node", "svc", svc.ServiceName)
errs = multierror.Append(errs, err)
}
}
}
Expand Down Expand Up @@ -1036,7 +1034,7 @@ func (r *Controller) deregisterNode(apiClient *api.Client, nodeName string) erro
// deleteACLTokensForServiceInstance finds the ACL tokens that belongs to the service instance and deletes it from Consul.
// It will only check for ACL tokens that have been created with the auth method this controller
// has been configured with and will only delete tokens for the provided podName and podUID.
func (r *Controller) deleteACLTokensForServiceInstance(apiClient *api.Client, svc *api.AgentService, k8sNS, podName, podUID string) error {
func (r *Controller) deleteACLTokensForServiceInstance(apiClient *api.Client, svc *api.CatalogService, k8sNS, podName, podUID string) error {
// Skip if podName is empty.
if podName == "" {
return nil
Expand All @@ -1049,7 +1047,7 @@ func (r *Controller) deleteACLTokensForServiceInstance(apiClient *api.Client, sv
// matches as well.
tokens, _, err := apiClient.ACL().TokenListFiltered(
api.ACLTokenFilterOptions{
ServiceName: svc.Service,
ServiceName: svc.ServiceName,
},
&api.QueryOptions{
Namespace: svc.Namespace,
Expand All @@ -1064,7 +1062,7 @@ func (r *Controller) deleteACLTokensForServiceInstance(apiClient *api.Client, sv
// * have a single service identity whose service name is the same as 'svc.Service'
if token.AuthMethod == r.AuthMethod &&
len(token.ServiceIdentities) == 1 &&
token.ServiceIdentities[0].ServiceName == svc.Service {
token.ServiceIdentities[0].ServiceName == svc.ServiceName {
tokenMeta, err := getTokenMetaFromDescription(token.Description)
if err != nil {
return fmt.Errorf("failed to parse token metadata: %s", err)
Expand Down Expand Up @@ -1162,49 +1160,100 @@ func getTokenMetaFromDescription(description string) (map[string]string, error)
return tokenMeta, nil
}

func (r *Controller) serviceInstancesForNodes(apiClient *api.Client, k8sServiceName, k8sServiceNamespace string) ([]*api.CatalogNodeServiceList, error) {
var serviceList []*api.CatalogNodeServiceList
func (r *Controller) serviceInstances(apiClient *api.Client, k8sServiceName, k8sServiceNamespace string) ([]*api.CatalogService, error) {
var (
instances []*api.CatalogService
errs error
)

// The nodelist may have changed between this point and when the event was raised
// For example, if a pod is evicted because a node has been deleted, there is no guarantee that that node will show up here
// query consul catalog for a list of nodes supporting this service
// quite a lot of results as synthetic nodes are never deregistered.
var nodes []*api.Node
filter := fmt.Sprintf(`Meta[%q] == %q `, "synthetic-node", "true")
nodes, _, err := apiClient.Catalog().Nodes(&api.QueryOptions{Filter: filter, Namespace: namespaces.WildcardNamespace})
// Get the names of services that have the provided k8sServiceName and k8sServiceNamespace in their metadata.
// This is necessary so that we can then list the service instances for each Consul service name, which may
// not match the K8s service name.
services, err := r.servicesForK8SServiceNameAndNamespace(apiClient, k8sServiceName, k8sServiceNamespace)
if err != nil {
r.Log.Error(err, "failed to get catalog services", "name", k8sServiceName)
return nil, err
}

var errs error
for _, node := range nodes {
var nodeServices *api.CatalogNodeServiceList
nodeServices, err := r.serviceInstancesForK8SServiceNameAndNamespace(apiClient, k8sServiceName, k8sServiceNamespace, node.Node)
// Query consul catalog for a list of service instances matching the given service names.
filter := fmt.Sprintf(`NodeMeta[%q] == %q and ServiceMeta[%q] == %q and ServiceMeta[%q] == %q and ServiceMeta[%q] == %q`,
metaKeySyntheticNode, "true",
metaKeyKubeServiceName, k8sServiceName,
constants.MetaKeyKubeNS, k8sServiceNamespace,
metaKeyManagedBy, constants.ManagedByValue)
for _, service := range services {
var is []*api.CatalogService
// Always query the default NS. This ensures that we include mesh gateways, which are always registered to the default NS.
// It also ensures that during migrations that enable namespaces, we deregister old service instances in the default NS.
// An alternative approach to this dual query would be a service instance list using the wildcard NS, which would also
// include instances in Consul namespaces that are no longer in use (e.g. configuration change); this capability does
// not currently exist in Consul's catalog API (as of 1.17) and would need to first be added.
//
// This request uses the service index of the services table (does not perform a full table scan), then decorates each
// result with a single node fetched by ID index from the nodes table.
is, _, err = apiClient.Catalog().Service(service, "", &api.QueryOptions{Filter: filter})
if err != nil {
errs = multierror.Append(errs, err)
} else {
serviceList = append(serviceList, nodeServices)
instances = append(instances, is...)
}
// If namespaces are enabled a non-default NS is targeted, also query by target Consul NS.
if r.EnableConsulNamespaces {
nonDefaultNamespace := namespaces.NonDefaultConsulNamespace(r.consulNamespace(k8sServiceNamespace))
if nonDefaultNamespace != "" {
is, _, err = apiClient.Catalog().Service(service, "", &api.QueryOptions{Filter: filter, Namespace: nonDefaultNamespace})
if err != nil {
errs = multierror.Append(errs, err)
} else {
instances = append(instances, is...)
}
}
}
}

return serviceList, errs
return instances, errs
}

// serviceInstancesForK8SServiceNameAndNamespace calls Consul's ServicesWithFilter to get the list
// of services instances that have the provided k8sServiceName and k8sServiceNamespace in their metadata.
func (r *Controller) serviceInstancesForK8SServiceNameAndNamespace(apiClient *api.Client, k8sServiceName, k8sServiceNamespace, nodeName string) (*api.CatalogNodeServiceList, error) {
// servicesForK8SServiceNameAndNamespace calls Consul's Services to get the list
// of services that have the provided k8sServiceName and k8sServiceNamespace in their metadata.
func (r *Controller) servicesForK8SServiceNameAndNamespace(apiClient *api.Client, k8sServiceName, k8sServiceNamespace string) ([]string, error) {
var (
serviceList *api.CatalogNodeServiceList
err error
services map[string][]string
err error
)
filter := fmt.Sprintf(`Meta[%q] == %q and Meta[%q] == %q and Meta[%q] == %q`,
filter := fmt.Sprintf(`ServiceMeta[%q] == %q and ServiceMeta[%q] == %q and ServiceMeta[%q] == %q`,
metaKeyKubeServiceName, k8sServiceName, constants.MetaKeyKubeNS, k8sServiceNamespace, metaKeyManagedBy, constants.ManagedByValue)
// Always query the default NS. This ensures that we cover CE->Ent upgrades where services were previously
// in the default NS, as well as mesh gateways, which are always registered to the default NS.
//
// This request performs a NS-bound scan of the services table. If needed in the future, its performance
// could be improved by adding an index on ServiceMeta to Consul's state store.
services, _, err = apiClient.Catalog().Services(&api.QueryOptions{Filter: filter})
if err != nil {
return nil, err
}
// If namespaces are enabled a non-default NS is targeted, also query by target Consul NS.
if r.EnableConsulNamespaces {
serviceList, _, err = apiClient.Catalog().NodeServiceList(nodeName, &api.QueryOptions{Filter: filter, Namespace: namespaces.WildcardNamespace})
} else {
serviceList, _, err = apiClient.Catalog().NodeServiceList(nodeName, &api.QueryOptions{Filter: filter})
nonDefaultNamespace := namespaces.NonDefaultConsulNamespace(r.consulNamespace(k8sServiceNamespace))
if nonDefaultNamespace != "" {
ss, _, err := apiClient.Catalog().Services(&api.QueryOptions{Filter: filter, Namespace: nonDefaultNamespace})
if err != nil {
return nil, err
}
// Add to existing map to deduplicate.
for s := range ss {
services[s] = nil // We don't use the tags, so just set to nil
}
}
}

// Return just the service name keys (we don't need the tags)
// https://developer.hashicorp.com/consul/api-docs/catalog#list-services
var serviceNames []string
for s := range services {
serviceNames = append(serviceNames, s)
}
return serviceList, err
return serviceNames, err
}

// processPreparedQueryUpstream processes an upstream in the format:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1954,7 +1954,7 @@ func TestReconcileDeleteGatewayWithNamespaces(t *testing.T) {
{
ID: "mesh-gateway",
Kind: api.ServiceKindMeshGateway,
Service: "mesh-gateway",
Service: consulSvcName,
Port: 80,
Address: "1.2.3.4",
Meta: map[string]string{
Expand Down Expand Up @@ -1985,7 +1985,7 @@ func TestReconcileDeleteGatewayWithNamespaces(t *testing.T) {
{
ID: "mesh-gateway",
Kind: api.ServiceKindMeshGateway,
Service: "mesh-gateway",
Service: consulSvcName,
Port: 80,
Address: "1.2.3.4",
Meta: map[string]string{
Expand Down Expand Up @@ -2016,7 +2016,7 @@ func TestReconcileDeleteGatewayWithNamespaces(t *testing.T) {
{
ID: "terminating-gateway",
Kind: api.ServiceKindTerminatingGateway,
Service: "terminating-gateway",
Service: consulSvcName,
Port: 8443,
Address: "1.2.3.4",
Meta: map[string]string{
Expand All @@ -2037,7 +2037,7 @@ func TestReconcileDeleteGatewayWithNamespaces(t *testing.T) {
{
ID: "terminating-gateway",
Kind: api.ServiceKindTerminatingGateway,
Service: "terminating-gateway",
Service: consulSvcName,
Port: 8443,
Address: "1.2.3.4",
Meta: map[string]string{
Expand Down Expand Up @@ -2089,7 +2089,7 @@ func TestReconcileDeleteGatewayWithNamespaces(t *testing.T) {
{
ID: "ingress-gateway",
Kind: api.ServiceKindIngressGateway,
Service: "ingress-gateway",
Service: consulSvcName,
Port: 80,
Address: "1.2.3.4",
Meta: map[string]string{
Expand Down Expand Up @@ -2175,15 +2175,16 @@ func TestReconcileDeleteGatewayWithNamespaces(t *testing.T) {

// Create the endpoints controller.
ep := &Controller{
Client: fakeClient,
Log: logrtest.NewTestLogger(t),
ConsulClientConfig: testClient.Cfg,
ConsulServerConnMgr: testClient.Watcher,
AllowK8sNamespacesSet: mapset.NewSetWith("*"),
DenyK8sNamespacesSet: mapset.NewSetWith(),
ReleaseName: "consul",
ReleaseNamespace: "default",
EnableConsulNamespaces: true,
Client: fakeClient,
Log: logrtest.NewTestLogger(t),
ConsulClientConfig: testClient.Cfg,
ConsulServerConnMgr: testClient.Watcher,
AllowK8sNamespacesSet: mapset.NewSetWith("*"),
DenyK8sNamespacesSet: mapset.NewSetWith(),
ReleaseName: "consul",
ReleaseNamespace: "default",
EnableConsulNamespaces: true,
ConsulDestinationNamespace: ts.ConsulNS,
}
if tt.enableACLs {
ep.AuthMethod = test.AuthMethod
Expand Down Expand Up @@ -2216,6 +2217,7 @@ func TestReconcileDeleteGatewayWithNamespaces(t *testing.T) {
}

token, _, err = consulClient.ACL().TokenRead(token.AccessorID, queryOpts)
require.Error(t, err)
require.Contains(t, err.Error(), "ACL not found", token)
}
})
Expand Down
Loading

0 comments on commit 1171d16

Please sign in to comment.