Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't register duplicate services from different k8s namespaces #527

Merged
merged 4 commits into from
Jun 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## UNRELEASED

IMPROVEMENTS:
* Connect: skip service registration when a service with the same name but in a different Kubernetes namespace is found
and Consul namespaces are not enabled. [[GH-527](https://github.com/hashicorp/consul-k8s/pull/527)]

## 0.26.0-beta3 (May 27, 2021)

IMPROVEMENTS:
Expand Down
173 changes: 101 additions & 72 deletions connect-inject/endpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,77 +147,10 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) (

for address, healthStatus := range allAddresses {
if address.TargetRef != nil && address.TargetRef.Kind == "Pod" {
// Get pod associated with this address.
var pod corev1.Pod
objectKey := types.NamespacedName{Name: address.TargetRef.Name, Namespace: address.TargetRef.Namespace}
if err = r.Client.Get(ctx, objectKey, &pod); err != nil {
r.Log.Error(err, "failed to get pod", "name", address.TargetRef.Name)
if err := r.registerServicesAndHealthCheck(ctx, serviceEndpoints, address, healthStatus, endpointAddressMap); err != nil {
r.Log.Error(err, "failed to register services or health check", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
errs = multierror.Append(errs, err)
}
podHostIP := pod.Status.HostIP

if hasBeenInjected(pod) {
// Build the endpointAddressMap up for deregistering service instances later.
endpointAddressMap[pod.Status.PodIP] = true
// Create client for Consul agent local to the pod.
client, err := r.remoteConsulClient(podHostIP, r.consulNamespace(pod.Namespace))
if err != nil {
r.Log.Error(err, "failed to create a new Consul client", "address", podHostIP)
errs = multierror.Append(errs, err)
}

var managedByEndpointsController bool
if raw, ok := pod.Labels[keyManagedBy]; ok && raw == managedByValue {
managedByEndpointsController = true
}
// For pods managed by this controller, create and register the service instance.
if managedByEndpointsController {
// Get information from the pod to create service instance registrations.
serviceRegistration, proxyServiceRegistration, err := r.createServiceRegistrations(pod, serviceEndpoints, healthStatus)
if err != nil {
r.Log.Error(err, "failed to create service registrations for endpoints", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
errs = multierror.Append(errs, err)
}

// Register the service instance with the local agent.
// Note: the order of how we register services is important,
// and the connect-proxy service should come after the "main" service
// because its alias health check depends on the main service existing.
r.Log.Info("registering service with Consul", "name", serviceRegistration.Name,
"id", serviceRegistration.ID, "agentIP", podHostIP)
err = client.Agent().ServiceRegister(serviceRegistration)
if err != nil {
r.Log.Error(err, "failed to register service", "name", serviceRegistration.Name)
errs = multierror.Append(errs, err)
}

// Register the proxy service instance with the local agent.
r.Log.Info("registering proxy service with Consul", "name", proxyServiceRegistration.Name)
err = client.Agent().ServiceRegister(proxyServiceRegistration)
if err != nil {
r.Log.Error(err, "failed to register proxy service", "name", proxyServiceRegistration.Name)
errs = multierror.Append(errs, err)
}
}

// Update the service TTL health check for both legacy services and services managed by endpoints
// controller. The proxy health checks are registered separately by endpoints controller and
// lifecycle sidecar for legacy services. Here, we always update the health check for legacy and
// newer services idempotently since the service health check is not added as part of the service
// registration.
reason := getHealthCheckStatusReason(healthStatus, pod.Name, pod.Namespace)
serviceName := getServiceName(pod, serviceEndpoints)
r.Log.Info("updating health check status for service", "name", serviceName, "reason", reason, "status", healthStatus)
serviceID := getServiceID(pod, serviceEndpoints)
proxyServiceName := getProxyServiceName(pod, serviceEndpoints)
proxyServiceID := getProxyServiceID(pod, serviceEndpoints)
healthCheckID := getConsulHealthCheckID(pod, serviceID)
err = r.upsertHealthCheck(pod, client, serviceID, proxyServiceID, proxyServiceName, healthCheckID, healthStatus)
if err != nil {
r.Log.Error(err, "failed to update health check status for service", "name", serviceName)
errs = multierror.Append(errs, err)
}
}
}
}
}
Expand Down Expand Up @@ -247,6 +180,103 @@ func (r *EndpointsController) SetupWithManager(mgr ctrl.Manager) error {
).Complete(r)
}

// registerServicesAndHealthCheck creates Consul registrations for the service and proxy and register them with Consul.
// It also upserts a Kubernetes health check for the service based on whether the endpoint address is ready.
func (r *EndpointsController) registerServicesAndHealthCheck(ctx context.Context, serviceEndpoints corev1.Endpoints, address corev1.EndpointAddress, healthStatus string, endpointAddressMap map[string]bool) error {
// Get pod associated with this address.
var pod corev1.Pod
objectKey := types.NamespacedName{Name: address.TargetRef.Name, Namespace: address.TargetRef.Namespace}
if err := r.Client.Get(ctx, objectKey, &pod); err != nil {
r.Log.Error(err, "failed to get pod", "name", address.TargetRef.Name)
return err
}
podHostIP := pod.Status.HostIP

if hasBeenInjected(pod) {
// Build the endpointAddressMap up for deregistering service instances later.
endpointAddressMap[pod.Status.PodIP] = true
// Create client for Consul agent local to the pod.
client, err := r.remoteConsulClient(podHostIP, r.consulNamespace(pod.Namespace))
if err != nil {
r.Log.Error(err, "failed to create a new Consul client", "address", podHostIP)
return err
}

var managedByEndpointsController bool
if raw, ok := pod.Labels[keyManagedBy]; ok && raw == managedByValue {
managedByEndpointsController = true
}
// For pods managed by this controller, create and register the service instance.
if managedByEndpointsController {
// Get information from the pod to create service instance registrations.
serviceRegistration, proxyServiceRegistration, err := r.createServiceRegistrations(pod, serviceEndpoints)
if err != nil {
r.Log.Error(err, "failed to create service registrations for endpoints", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
return err
}

// When Consul namespaces are not enabled, we check that the service with the same name but in a different namespace
// is already registered with Consul, and if it is, we skip the registration to avoid service name collisions.
if !r.EnableConsulNamespaces {
services, _, err := client.Catalog().Service(serviceRegistration.Name, "", nil)
if err != nil {
r.Log.Error(err, "failed to get service from the Consul catalog", "name", serviceRegistration.Name)
return err
}
for _, service := range services {
if service.ServiceMeta[MetaKeyKubeNS] != serviceEndpoints.Namespace {
// Log but don't return an error because we don't want to reconcile this endpoints object again.
r.Log.Info("Skipping service registration because a service with the same name "+
"but a different Kubernetes namespace is already registered with Consul",
"name", serviceRegistration.Name,
MetaKeyKubeNS, serviceEndpoints.Namespace,
"existing-k8s-namespace", service.ServiceMeta[MetaKeyKubeNS])
return nil
}
}
}

// Register the service instance with the local agent.
// Note: the order of how we register services is important,
// and the connect-proxy service should come after the "main" service
// because its alias health check depends on the main service existing.
r.Log.Info("registering service with Consul", "name", serviceRegistration.Name,
"id", serviceRegistration.ID, "agentIP", podHostIP)
err = client.Agent().ServiceRegister(serviceRegistration)
if err != nil {
r.Log.Error(err, "failed to register service", "name", serviceRegistration.Name)
return err
}

// Register the proxy service instance with the local agent.
r.Log.Info("registering proxy service with Consul", "name", proxyServiceRegistration.Name)
err = client.Agent().ServiceRegister(proxyServiceRegistration)
if err != nil {
r.Log.Error(err, "failed to register proxy service", "name", proxyServiceRegistration.Name)
return err
}
}

// Update the service TTL health check for both legacy services and services managed by endpoints
// controller. The proxy health checks are registered separately by endpoints controller and
// lifecycle sidecar for legacy services. Here, we always update the health check for legacy and
// newer services idempotently since the service health check is not added as part of the service
// registration.
reason := getHealthCheckStatusReason(healthStatus, pod.Name, pod.Namespace)
serviceName := getServiceName(pod, serviceEndpoints)
r.Log.Info("updating health check status for service", "name", serviceName, "reason", reason, "status", healthStatus)
serviceID := getServiceID(pod, serviceEndpoints)
healthCheckID := getConsulHealthCheckID(pod, serviceID)
err = r.upsertHealthCheck(pod, client, serviceID, healthCheckID, healthStatus)
if err != nil {
r.Log.Error(err, "failed to update health check status for service", "name", serviceName)
return err
}
}

return nil
}

// getServiceCheck will return the health check for this pod and service if it exists.
func getServiceCheck(client *api.Client, healthCheckID string) (*api.AgentCheck, error) {
filter := fmt.Sprintf("CheckID == `%s`", healthCheckID)
Expand Down Expand Up @@ -300,7 +330,7 @@ func (r *EndpointsController) updateConsulHealthCheckStatus(client *api.Client,

// upsertHealthCheck checks if the healthcheck exists for the service, and creates it if it doesn't exist, or updates it
// if it does.
func (r *EndpointsController) upsertHealthCheck(pod corev1.Pod, client *api.Client, serviceID, proxyServiceID, proxyServiceName, healthCheckID, status string) error {
func (r *EndpointsController) upsertHealthCheck(pod corev1.Pod, client *api.Client, serviceID, healthCheckID, status string) error {
reason := getHealthCheckStatusReason(status, pod.Name, pod.Namespace)
// Retrieve the health check that would exist if the service had one registered for this pod.
serviceCheck, err := getServiceCheck(client, healthCheckID)
Expand Down Expand Up @@ -335,7 +365,6 @@ func getServiceName(pod corev1.Pod, serviceEndpoints corev1.Endpoints) string {
serviceName = serviceNameFromAnnotation
}
return serviceName

}

func getServiceID(pod corev1.Pod, serviceEndpoints corev1.Endpoints) string {
Expand All @@ -354,7 +383,7 @@ func getProxyServiceID(pod corev1.Pod, serviceEndpoints corev1.Endpoints) string

// createServiceRegistrations creates the service and proxy service instance registrations with the information from the
// Pod.
func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, serviceEndpoints corev1.Endpoints, healthStatus string) (*api.AgentServiceRegistration, *api.AgentServiceRegistration, error) {
func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, serviceEndpoints corev1.Endpoints) (*api.AgentServiceRegistration, *api.AgentServiceRegistration, error) {
// If a port is specified, then we determine the value of that port
// and register that port for the host service.
// The handler will always set the port annotation if one is not provided on the pod.
Expand Down
Loading