Skip to content

Commit

Permalink
perf: Fetch services once rather than per-node on deregister
Browse files Browse the repository at this point in the history
Rather than fetching all nodes in a cluster then listing services
per-node, fetch all service instances directly by name.

This should generally reduce the cost of endpoints controller reconciles
(in terms of network calls) from N calls (N=node count) to one call to
list the matching service names plus one call per matching name,
regardless of K8s cluster size.

This does trade the cost of node list and per-node instance fetching for
a bulk fetch of service instances. However, a bulk fetch was the
behavior prior to the introduction of Consul node mirroring in
`consul-k8s`, and in the vast majority of real-world use cases it should
be cheaper than listing all nodes and fetching services individually per
node.

This change is motivated by customers with larger K8s clusters (node
counts) seeing performance issues with reconciles.
  • Loading branch information
zalimeni committed Dec 7, 2023
1 parent d36e464 commit c6c501d
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -921,59 +921,61 @@ func getHealthCheckStatusReason(healthCheckStatus, podName, podNamespace string)
// has addresses, it will only deregister instances not in the map.
func (r *Controller) deregisterService(apiClient *api.Client, k8sSvcName, k8sSvcNamespace string, endpointsAddressesMap map[string]bool) error {
// Get services matching metadata from Consul
nodesWithSvcs, err := r.serviceInstancesForNodes(apiClient, k8sSvcName, k8sSvcNamespace)
serviceInstances, err := r.serviceInstancesForK8SServiceNameAndNamespace(apiClient, k8sSvcName, k8sSvcNamespace)
if err != nil {
r.Log.Error(err, "failed to get service instances", "name", k8sSvcName)
return err
}

var errs error
for _, nodeSvcs := range nodesWithSvcs {
for _, svc := range nodeSvcs.Services {
// We need to get services matching "k8s-service-name" and "k8s-namespace" metadata.
// If we selectively deregister, only deregister if the address is not in the map. Otherwise, deregister
// every service instance.
var serviceDeregistered bool
if endpointsAddressesMap != nil {
if _, ok := endpointsAddressesMap[svc.Address]; !ok {
// If the service address is not in the Endpoints addresses, deregister it.
r.Log.Info("deregistering service from consul", "svc", svc.ID)
_, err := apiClient.Catalog().Deregister(&api.CatalogDeregistration{
Node: nodeSvcs.Node.Node,
ServiceID: svc.ID,
Namespace: svc.Namespace,
}, nil)
if err != nil {
// Do not exit right away as there might be other services that need to be deregistered.
r.Log.Error(err, "failed to deregister service instance", "id", svc.ID)
errs = multierror.Append(errs, err)
} else {
serviceDeregistered = true
}
}
} else {
r.Log.Info("deregistering service from consul", "svc", svc.ID)
for _, svc := range serviceInstances {
// We need to get services matching "k8s-service-name" and "k8s-namespace" metadata.
// If we selectively deregister, only deregister if the address is not in the map. Otherwise, deregister
// every service instance.
var serviceDeregistered bool
if endpointsAddressesMap != nil {
address := svc.ServiceAddress
if address == "" {
address = svc.Address
}
if _, ok := endpointsAddressesMap[address]; !ok {
// If the service address is not in the Endpoints addresses, deregister it.
r.Log.Info("deregistering service from consul", "svc", svc.ServiceID)
_, err := apiClient.Catalog().Deregister(&api.CatalogDeregistration{
Node: nodeSvcs.Node.Node,
ServiceID: svc.ID,
Node: svc.Node,
ServiceID: svc.ServiceID,
Namespace: svc.Namespace,
}, nil)
if err != nil {
// Do not exit right away as there might be other services that need to be deregistered.
r.Log.Error(err, "failed to deregister service instance", "id", svc.ID)
r.Log.Error(err, "failed to deregister service instance", "id", svc.ServiceID)
errs = multierror.Append(errs, err)
} else {
serviceDeregistered = true
}
}
} else {
r.Log.Info("deregistering service from consul", "svc", svc.ID)
_, err := apiClient.Catalog().Deregister(&api.CatalogDeregistration{
Node: svc.Node,
ServiceID: svc.ID,
Namespace: svc.Namespace,
}, nil)
if err != nil {
// Do not exit right away as there might be other services that need to be deregistered.
r.Log.Error(err, "failed to deregister service instance", "id", svc.ID)
errs = multierror.Append(errs, err)
} else {
serviceDeregistered = true
}
}

if r.AuthMethod != "" && serviceDeregistered {
r.Log.Info("reconciling ACL tokens for service", "svc", svc.Service)
err := r.deleteACLTokensForServiceInstance(apiClient, svc, k8sSvcNamespace, svc.Meta[constants.MetaKeyPodName], svc.Meta[constants.MetaKeyPodUID])
if err != nil {
r.Log.Error(err, "failed to reconcile ACL tokens for service", "svc", svc.Service)
errs = multierror.Append(errs, err)
}
if r.AuthMethod != "" && serviceDeregistered {
r.Log.Info("reconciling ACL tokens for service", "svc", svc.ServiceName)
err := r.deleteACLTokensForServiceInstance(apiClient, svc, k8sSvcNamespace, svc.ServiceMeta[constants.MetaKeyPodName], svc.ServiceMeta[constants.MetaKeyPodUID])
if err != nil {
r.Log.Error(err, "failed to reconcile ACL tokens for service", "svc", svc.ServiceName)
errs = multierror.Append(errs, err)
}
}
}
Expand All @@ -985,7 +987,7 @@ func (r *Controller) deregisterService(apiClient *api.Client, k8sSvcName, k8sSvc
// deleteACLTokensForServiceInstance finds the ACL tokens that belongs to the service instance and deletes it from Consul.
// It will only check for ACL tokens that have been created with the auth method this controller
// has been configured with and will only delete tokens for the provided podName and podUID.
func (r *Controller) deleteACLTokensForServiceInstance(apiClient *api.Client, svc *api.AgentService, k8sNS, podName, podUID string) error {
func (r *Controller) deleteACLTokensForServiceInstance(apiClient *api.Client, svc *api.CatalogService, k8sNS, podName, podUID string) error {
// Skip if podName is empty.
if podName == "" {
return nil
Expand All @@ -998,7 +1000,7 @@ func (r *Controller) deleteACLTokensForServiceInstance(apiClient *api.Client, sv
// matches as well.
tokens, _, err := apiClient.ACL().TokenListFiltered(
api.ACLTokenFilterOptions{
ServiceName: svc.Service,
ServiceName: svc.ServiceName,
},
&api.QueryOptions{
Namespace: svc.Namespace,
Expand All @@ -1013,7 +1015,7 @@ func (r *Controller) deleteACLTokensForServiceInstance(apiClient *api.Client, sv
// * have a single service identity whose service name is the same as 'svc.ServiceName'
if token.AuthMethod == r.AuthMethod &&
len(token.ServiceIdentities) == 1 &&
token.ServiceIdentities[0].ServiceName == svc.Service {
token.ServiceIdentities[0].ServiceName == svc.ServiceName {
tokenMeta, err := getTokenMetaFromDescription(token.Description)
if err != nil {
return fmt.Errorf("failed to parse token metadata: %s", err)
Expand Down Expand Up @@ -1111,49 +1113,62 @@ func getTokenMetaFromDescription(description string) (map[string]string, error)
return tokenMeta, nil
}

func (r *Controller) serviceInstancesForNodes(apiClient *api.Client, k8sServiceName, k8sServiceNamespace string) ([]*api.CatalogNodeServiceList, error) {
var serviceList []*api.CatalogNodeServiceList
func (r *Controller) serviceInstancesForK8SServiceNameAndNamespace(apiClient *api.Client, k8sServiceName, k8sServiceNamespace string) ([]*api.CatalogService, error) {
var instances []*api.CatalogService
var errs error

// The nodelist may have changed between this point and when the event was raised
// For example, if a pod is evicted because a node has been deleted, there is no guarantee that that node will show up here
// query consul catalog for a list of nodes supporting this service
// quite a lot of results as synthetic nodes are never deregistered.
var nodes []*api.Node
filter := fmt.Sprintf(`Meta[%q] == %q `, "synthetic-node", "true")
nodes, _, err := apiClient.Catalog().Nodes(&api.QueryOptions{Filter: filter, Namespace: namespaces.WildcardNamespace})
// Get the list of services that have the provided k8sServiceName and k8sServiceNamespace in their metadata.
// This is necessary so that we can then list the service instances for the correct Consul service
// name(s), which may not match the current effective service name for new registrations.
services, err := r.servicesForK8SServiceNameAndNamespace(apiClient, k8sServiceName, k8sServiceNamespace)
if err != nil {
r.Log.Error(err, "failed to get catalog services", "name", k8sServiceName)
return nil, err
}

var errs error
for _, node := range nodes {
var nodeServices *api.CatalogNodeServiceList
nodeServices, err := r.serviceInstancesForK8SServiceNameAndNamespace(apiClient, k8sServiceName, k8sServiceNamespace, node.Node)
// query consul catalog for a list of service instances matching the given service names
filter := fmt.Sprintf(`NodeMeta[%q] == %q `, "synthetic-node", "true")
for _, service := range services {
var is []*api.CatalogService
if r.EnableConsulNamespaces {
is, _, err = apiClient.Catalog().Service(service, "", &api.QueryOptions{Filter: filter, Namespace: namespaces.WildcardNamespace})
} else {
is, _, err = apiClient.Catalog().Service(service, "", &api.QueryOptions{Filter: filter})
}
if err != nil {
errs = multierror.Append(errs, err)
} else {
serviceList = append(serviceList, nodeServices)
for _, i := range is {
instances = append(instances, i)
}
}
}

return serviceList, errs
return instances, errs
}

// serviceInstancesForK8SServiceNameAndNamespace calls Consul's ServicesWithFilter to get the list
// of services instances that have the provided k8sServiceName and k8sServiceNamespace in their metadata.
func (r *Controller) serviceInstancesForK8SServiceNameAndNamespace(apiClient *api.Client, k8sServiceName, k8sServiceNamespace, nodeName string) (*api.CatalogNodeServiceList, error) {
// servicesForK8SServiceNameAndNamespace calls Consul's Services to get the list
// of services that have the provided k8sServiceName and k8sServiceNamespace in their metadata.
func (r *Controller) servicesForK8SServiceNameAndNamespace(apiClient *api.Client, k8sServiceName, k8sServiceNamespace string) ([]string, error) {
var (
serviceList *api.CatalogNodeServiceList
err error
services map[string][]string
err error
)
filter := fmt.Sprintf(`Meta[%q] == %q and Meta[%q] == %q and Meta[%q] == %q`,
filter := fmt.Sprintf(`ServiceMeta[%q] == %q and ServiceMeta[%q] == %q and ServiceMeta[%q] == %q`,
metaKeyKubeServiceName, k8sServiceName, constants.MetaKeyKubeNS, k8sServiceNamespace, metaKeyManagedBy, constants.ManagedByValue)
if r.EnableConsulNamespaces {
serviceList, _, err = apiClient.Catalog().NodeServiceList(nodeName, &api.QueryOptions{Filter: filter, Namespace: namespaces.WildcardNamespace})
services, _, err = apiClient.Catalog().Services(&api.QueryOptions{Filter: filter, Namespace: namespaces.WildcardNamespace})
} else {
serviceList, _, err = apiClient.Catalog().NodeServiceList(nodeName, &api.QueryOptions{Filter: filter})
services, _, err = apiClient.Catalog().Services(&api.QueryOptions{Filter: filter})
}

// Return just the service name keys (we don't need the tags)
// https://developer.hashicorp.com/consul/api-docs/catalog#list-services
var serviceNames []string
for s := range services {
serviceNames = append(serviceNames, s)
}
return serviceList, err
return serviceNames, err
}

// processPreparedQueryUpstream processes an upstream in the format:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4550,7 +4550,7 @@ func TestServiceInstancesForK8SServiceNameAndNamespace(t *testing.T) {
name string
k8sServiceNameMeta string
k8sNamespaceMeta string
expected []*api.AgentService
expected []*api.CatalogService
}{
{
"no k8s service name or namespace meta",
Expand All @@ -4574,22 +4574,21 @@ func TestServiceInstancesForK8SServiceNameAndNamespace(t *testing.T) {
"both k8s service name and namespace set",
k8sSvc,
k8sNS,
[]*api.AgentService{
[]*api.CatalogService{
{
ID: "foo1",
Service: "foo",
Meta: map[string]string{"k8s-service-name": k8sSvc, "k8s-namespace": k8sNS},
ID: "foo1",
ServiceName: "foo",
ServiceMeta: map[string]string{"k8s-service-name": k8sSvc, "k8s-namespace": k8sNS},
},
{
Kind: api.ServiceKindConnectProxy,
ID: "foo1-proxy",
Service: "foo-sidecar-proxy",
Port: 20000,
Proxy: &api.AgentServiceConnectProxyConfig{
ID: "foo1-proxy",
ServiceName: "foo-sidecar-proxy",
ServicePort: 20000,
ServiceProxy: &api.AgentServiceConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo1",
},
Meta: map[string]string{"k8s-service-name": k8sSvc, "k8s-namespace": k8sNS},
ServiceMeta: map[string]string{"k8s-service-name": k8sSvc, "k8s-namespace": k8sNS},
},
},
},
Expand Down Expand Up @@ -4658,14 +4657,14 @@ func TestServiceInstancesForK8SServiceNameAndNamespace(t *testing.T) {
}
ep := Controller{}

svcs, err := ep.serviceInstancesForK8SServiceNameAndNamespace(consulClient, k8sSvc, k8sNS, consulNodeName)
svcs, err := ep.serviceInstancesForK8SServiceNameAndNamespace(consulClient, k8sSvc, k8sNS)
require.NoError(t, err)
if len(svcs.Services) > 0 {
if len(svcs) > 0 {
require.Len(t, svcs, 2)
require.NotNil(t, c.expected[0], svcs.Services[0])
require.Equal(t, c.expected[0].Service, svcs.Services[0].Service)
require.NotNil(t, c.expected[1], svcs.Services[1])
require.Equal(t, c.expected[1].Service, svcs.Services[1].Service)
require.NotNil(t, svcs[0], c.expected[0])
require.Equal(t, c.expected[0].ServiceName, svcs[0].ServiceName)
require.NotNil(t, svcs[1], c.expected[1])
require.Equal(t, c.expected[1].ServiceName, svcs[1].ServiceName)
}
})
}
Expand Down

0 comments on commit c6c501d

Please sign in to comment.