Skip to content

Commit

Permalink
Don't register duplicate services from different k8s namespaces
Browse files Browse the repository at this point in the history
When Consul namespaces are not enabled and we are processing a service
that already exists in Consul but with a different k8s namespace meta,
we skip service registration to avoid service name collisions.
  • Loading branch information
ishustava committed Jun 1, 2021
1 parent 0bc1f55 commit a8a751a
Show file tree
Hide file tree
Showing 2 changed files with 202 additions and 77 deletions.
173 changes: 101 additions & 72 deletions connect-inject/endpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,77 +147,10 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) (

for address, healthStatus := range allAddresses {
if address.TargetRef != nil && address.TargetRef.Kind == "Pod" {
// Get pod associated with this address.
var pod corev1.Pod
objectKey := types.NamespacedName{Name: address.TargetRef.Name, Namespace: address.TargetRef.Namespace}
if err = r.Client.Get(ctx, objectKey, &pod); err != nil {
r.Log.Error(err, "failed to get pod", "name", address.TargetRef.Name)
if err := r.registerServicesAndHealthCheck(ctx, serviceEndpoints, address, healthStatus, endpointAddressMap); err != nil {
r.Log.Error(err, "failed to register services or health check", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
errs = multierror.Append(errs, err)
}
podHostIP := pod.Status.HostIP

if hasBeenInjected(pod) {
// Build the endpointAddressMap up for deregistering service instances later.
endpointAddressMap[pod.Status.PodIP] = true
// Create client for Consul agent local to the pod.
client, err := r.remoteConsulClient(podHostIP, r.consulNamespace(pod.Namespace))
if err != nil {
r.Log.Error(err, "failed to create a new Consul client", "address", podHostIP)
errs = multierror.Append(errs, err)
}

var managedByEndpointsController bool
if raw, ok := pod.Labels[keyManagedBy]; ok && raw == managedByValue {
managedByEndpointsController = true
}
// For pods managed by this controller, create and register the service instance.
if managedByEndpointsController {
// Get information from the pod to create service instance registrations.
serviceRegistration, proxyServiceRegistration, err := r.createServiceRegistrations(pod, serviceEndpoints, healthStatus)
if err != nil {
r.Log.Error(err, "failed to create service registrations for endpoints", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
errs = multierror.Append(errs, err)
}

// Register the service instance with the local agent.
// Note: the order of how we register services is important,
// and the connect-proxy service should come after the "main" service
// because its alias health check depends on the main service existing.
r.Log.Info("registering service with Consul", "name", serviceRegistration.Name,
"id", serviceRegistration.ID, "agentIP", podHostIP)
err = client.Agent().ServiceRegister(serviceRegistration)
if err != nil {
r.Log.Error(err, "failed to register service", "name", serviceRegistration.Name)
errs = multierror.Append(errs, err)
}

// Register the proxy service instance with the local agent.
r.Log.Info("registering proxy service with Consul", "name", proxyServiceRegistration.Name)
err = client.Agent().ServiceRegister(proxyServiceRegistration)
if err != nil {
r.Log.Error(err, "failed to register proxy service", "name", proxyServiceRegistration.Name)
errs = multierror.Append(errs, err)
}
}

// Update the service TTL health check for both legacy services and services managed by endpoints
// controller. The proxy health checks are registered separately by endpoints controller and
// lifecycle sidecar for legacy services. Here, we always update the health check for legacy and
// newer services idempotently since the service health check is not added as part of the service
// registration.
reason := getHealthCheckStatusReason(healthStatus, pod.Name, pod.Namespace)
serviceName := getServiceName(pod, serviceEndpoints)
r.Log.Info("updating health check status for service", "name", serviceName, "reason", reason, "status", healthStatus)
serviceID := getServiceID(pod, serviceEndpoints)
proxyServiceName := getProxyServiceName(pod, serviceEndpoints)
proxyServiceID := getProxyServiceID(pod, serviceEndpoints)
healthCheckID := getConsulHealthCheckID(pod, serviceID)
err = r.upsertHealthCheck(pod, client, serviceID, proxyServiceID, proxyServiceName, healthCheckID, healthStatus)
if err != nil {
r.Log.Error(err, "failed to update health check status for service", "name", serviceName)
errs = multierror.Append(errs, err)
}
}
}
}
}
Expand Down Expand Up @@ -247,6 +180,103 @@ func (r *EndpointsController) SetupWithManager(mgr ctrl.Manager) error {
).Complete(r)
}

// registerServicesAndHealthCheck create Consul registrations for the service and proxy and register them with Consul.
// It also upserts a Kubernetes health check for the service based on whether the endpoint address is ready.
func (r *EndpointsController) registerServicesAndHealthCheck(ctx context.Context, serviceEndpoints corev1.Endpoints, address corev1.EndpointAddress, healthStatus string, endpointAddressMap map[string]bool) error {
// Get pod associated with this address.
var pod corev1.Pod
objectKey := types.NamespacedName{Name: address.TargetRef.Name, Namespace: address.TargetRef.Namespace}
if err := r.Client.Get(ctx, objectKey, &pod); err != nil {
r.Log.Error(err, "failed to get pod", "name", address.TargetRef.Name)
return err
}
podHostIP := pod.Status.HostIP

if hasBeenInjected(pod) {
// Build the endpointAddressMap up for deregistering service instances later.
endpointAddressMap[pod.Status.PodIP] = true
// Create client for Consul agent local to the pod.
client, err := r.remoteConsulClient(podHostIP, r.consulNamespace(pod.Namespace))
if err != nil {
r.Log.Error(err, "failed to create a new Consul client", "address", podHostIP)
return err
}

var managedByEndpointsController bool
if raw, ok := pod.Labels[keyManagedBy]; ok && raw == managedByValue {
managedByEndpointsController = true
}
// For pods managed by this controller, create and register the service instance.
if managedByEndpointsController {
// Get information from the pod to create service instance registrations.
serviceRegistration, proxyServiceRegistration, err := r.createServiceRegistrations(pod, serviceEndpoints)
if err != nil {
r.Log.Error(err, "failed to create service registrations for endpoints", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
return err
}

// When Consul namespaces are not enabled, we check that the service with the same name but in a different namespace
// is already registered with Consul, and if it is, we skip the registration to avoid service name collisions.
if !r.EnableConsulNamespaces {
services, _, err := client.Catalog().Service(serviceRegistration.Name, "", nil)
if err != nil {
r.Log.Error(err, "failed to get service from the Consul catalog", "name", serviceRegistration.Name)
return err
}
for _, service := range services {
if service.ServiceMeta[MetaKeyKubeNS] != serviceEndpoints.Namespace {
// Log but don't return an error because we don't want to reconcile this endpoints object again.
r.Log.Info("Skipping service registration because a service with the same name "+
"but a different Kubernetes namespace is already registered with Consul",
"name", serviceRegistration.Name,
MetaKeyKubeNS, serviceEndpoints.Namespace,
"existing-k8s-namespace", service.ServiceMeta[MetaKeyKubeNS])
return nil
}
}
}

// Register the service instance with the local agent.
// Note: the order of how we register services is important,
// and the connect-proxy service should come after the "main" service
// because its alias health check depends on the main service existing.
r.Log.Info("registering service with Consul", "name", serviceRegistration.Name,
"id", serviceRegistration.ID, "agentIP", podHostIP)
err = client.Agent().ServiceRegister(serviceRegistration)
if err != nil {
r.Log.Error(err, "failed to register service", "name", serviceRegistration.Name)
return err
}

// Register the proxy service instance with the local agent.
r.Log.Info("registering proxy service with Consul", "name", proxyServiceRegistration.Name)
err = client.Agent().ServiceRegister(proxyServiceRegistration)
if err != nil {
r.Log.Error(err, "failed to register proxy service", "name", proxyServiceRegistration.Name)
return err
}
}

// Update the service TTL health check for both legacy services and services managed by endpoints
// controller. The proxy health checks are registered separately by endpoints controller and
// lifecycle sidecar for legacy services. Here, we always update the health check for legacy and
// newer services idempotently since the service health check is not added as part of the service
// registration.
reason := getHealthCheckStatusReason(healthStatus, pod.Name, pod.Namespace)
serviceName := getServiceName(pod, serviceEndpoints)
r.Log.Info("updating health check status for service", "name", serviceName, "reason", reason, "status", healthStatus)
serviceID := getServiceID(pod, serviceEndpoints)
healthCheckID := getConsulHealthCheckID(pod, serviceID)
err = r.upsertHealthCheck(pod, client, serviceID, healthCheckID, healthStatus)
if err != nil {
r.Log.Error(err, "failed to update health check status for service", "name", serviceName)
return err
}
}

return nil
}

// getServiceCheck will return the health check for this pod and service if it exists.
func getServiceCheck(client *api.Client, healthCheckID string) (*api.AgentCheck, error) {
filter := fmt.Sprintf("CheckID == `%s`", healthCheckID)
Expand Down Expand Up @@ -300,7 +330,7 @@ func (r *EndpointsController) updateConsulHealthCheckStatus(client *api.Client,

// upsertHealthCheck checks if the healthcheck exists for the service, and creates it if it doesn't exist, or updates it
// if it does.
func (r *EndpointsController) upsertHealthCheck(pod corev1.Pod, client *api.Client, serviceID, proxyServiceID, proxyServiceName, healthCheckID, status string) error {
func (r *EndpointsController) upsertHealthCheck(pod corev1.Pod, client *api.Client, serviceID, healthCheckID, status string) error {
reason := getHealthCheckStatusReason(status, pod.Name, pod.Namespace)
// Retrieve the health check that would exist if the service had one registered for this pod.
serviceCheck, err := getServiceCheck(client, healthCheckID)
Expand Down Expand Up @@ -335,7 +365,6 @@ func getServiceName(pod corev1.Pod, serviceEndpoints corev1.Endpoints) string {
serviceName = serviceNameFromAnnotation
}
return serviceName

}

func getServiceID(pod corev1.Pod, serviceEndpoints corev1.Endpoints) string {
Expand All @@ -354,7 +383,7 @@ func getProxyServiceID(pod corev1.Pod, serviceEndpoints corev1.Endpoints) string

// createServiceRegistrations creates the service and proxy service instance registrations with the information from the
// Pod.
func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, serviceEndpoints corev1.Endpoints, healthStatus string) (*api.AgentServiceRegistration, *api.AgentServiceRegistration, error) {
func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, serviceEndpoints corev1.Endpoints) (*api.AgentServiceRegistration, *api.AgentServiceRegistration, error) {
// If a port is specified, then we determine the value of that port
// and register that port for the host service.
// The handler will always set the port annotation if one is not provided on the pod.
Expand Down
106 changes: 101 additions & 5 deletions connect-inject/endpoints_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ func TestProcessUpstreams(t *testing.T) {
}
for _, tt := range cases {
t.Run(tt.name, func(t *testing.T) {
// Create test consul server
// Create test consul server.
consul, err := testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) {
c.NodeName = nodeName
})
Expand Down Expand Up @@ -950,7 +950,7 @@ func TestReconcileCreateEndpoint(t *testing.T) {
addr := strings.Split(consul.HTTPAddr, ":")
consulPort := addr[1]

// Register service and proxy in consul
// Register service and proxy in consul.
for _, svc := range tt.initialConsulSvcs {
err = consulClient.Agent().ServiceRegister(svc)
require.NoError(t, err)
Expand Down Expand Up @@ -1405,6 +1405,7 @@ func TestReconcileUpdateEndpoint(t *testing.T) {
Name: "service-updated",
Port: 80,
Address: "1.2.3.4",
Meta: map[string]string{MetaKeyKubeNS: "default"},
Check: &api.AgentServiceCheck{
CheckID: "default/pod1-service-updated/kubernetes-health-check",
Name: "Kubernetes Health Check",
Expand All @@ -1420,6 +1421,7 @@ func TestReconcileUpdateEndpoint(t *testing.T) {
Name: "service-updated-sidecar-proxy",
Port: 20000,
Address: "1.2.3.4",
Meta: map[string]string{MetaKeyKubeNS: "default"},
Proxy: &api.AgentServiceConnectProxyConfig{
DestinationServiceName: "service-updated",
DestinationServiceID: "pod1-service-updated",
Expand Down Expand Up @@ -1486,6 +1488,7 @@ func TestReconcileUpdateEndpoint(t *testing.T) {
Name: "service-updated",
Port: 80,
Address: "1.2.3.4",
Meta: map[string]string{MetaKeyKubeNS: "default"},
Check: &api.AgentServiceCheck{
CheckID: "default/pod1-service-updated/kubernetes-health-check",
Name: "Kubernetes Health Check",
Expand All @@ -1501,6 +1504,7 @@ func TestReconcileUpdateEndpoint(t *testing.T) {
Name: "service-updated-sidecar-proxy",
Port: 20000,
Address: "1.2.3.4",
Meta: map[string]string{MetaKeyKubeNS: "default"},
Proxy: &api.AgentServiceConnectProxyConfig{
DestinationServiceName: "service-updated",
DestinationServiceID: "pod1-service-updated",
Expand Down Expand Up @@ -1567,13 +1571,15 @@ func TestReconcileUpdateEndpoint(t *testing.T) {
Name: "service-updated",
Port: 80,
Address: "1.2.3.4",
Meta: map[string]string{MetaKeyKubeNS: "default"},
},
{
Kind: api.ServiceKindConnectProxy,
ID: "pod1-service-updated-sidecar-proxy",
Name: "service-updated-sidecar-proxy",
Port: 20000,
Address: "1.2.3.4",
Meta: map[string]string{MetaKeyKubeNS: "default"},
Proxy: &api.AgentServiceConnectProxyConfig{
DestinationServiceName: "service-updated",
DestinationServiceID: "pod1-service-updated",
Expand Down Expand Up @@ -1630,7 +1636,7 @@ func TestReconcileUpdateEndpoint(t *testing.T) {
Name: "different-consul-svc-name",
Port: 80,
Address: "1.2.3.4",
Meta: map[string]string{MetaKeyManagedBy: managedByValue},
Meta: map[string]string{MetaKeyManagedBy: managedByValue, MetaKeyKubeNS: "default"},
},
{
Kind: api.ServiceKindConnectProxy,
Expand Down Expand Up @@ -3051,7 +3057,7 @@ func TestServiceInstancesForK8SServiceNameAndNamespace(t *testing.T) {
}
}

func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *testing.T) {
func TestCreateServiceRegistrations_withTransparentProxy(t *testing.T) {
t.Parallel()

const serviceName = "test-service"
Expand Down Expand Up @@ -3890,7 +3896,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *
Log: logrtest.TestLogger{T: t},
}

serviceRegistration, proxyServiceRegistration, err := epCtrl.createServiceRegistrations(*pod, *endpoints, api.HealthPassing)
serviceRegistration, proxyServiceRegistration, err := epCtrl.createServiceRegistrations(*pod, *endpoints)
if c.expErr != "" {
require.EqualError(t, err, c.expErr)
} else {
Expand All @@ -3905,6 +3911,96 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *
}
}

func TestRegisterServicesAndHealthCheck_skipsWhenDuplicateServiceFound(t *testing.T) {
t.Parallel()

cases := map[string]struct {
consulServiceMeta map[string]string
}{
"different k8s namespace meta": {
consulServiceMeta: map[string]string{MetaKeyKubeNS: "some-other-ns"},
},
"no k8s namespace meta": {
consulServiceMeta: nil,
},
}

for name, c := range cases {
t.Run(name, func(t *testing.T) {
nodeName := "test-node"
consul, err := testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) {
c.NodeName = nodeName
})
require.NoError(t, err)
defer consul.Stop()

consul.WaitForServiceIntentions(t)
httpAddr := consul.HTTPAddr
clientConfig := &api.Config{
Address: httpAddr,
}
consulClient, err := api.NewClient(clientConfig)
require.NoError(t, err)
addr := strings.Split(httpAddr, ":")
consulPort := addr[1]

existingService := &api.AgentServiceRegistration{
ID: "test-service",
Name: "test-service",
Port: 1234,
Address: "1.2.3.4",
Meta: c.consulServiceMeta,
}
err = consulClient.Agent().ServiceRegister(existingService)
require.NoError(t, err)
pod := createPod("test-pod", "1.1.1.1", true, true)

endpointsAddress := corev1.EndpointAddress{
IP: "1.2.3.4",
NodeName: &nodeName,
TargetRef: &corev1.ObjectReference{
Kind: "Pod",
Name: pod.Name,
Namespace: pod.Namespace,
},
}
endpoints := &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "test-service",
Namespace: "default",
},
Subsets: []corev1.EndpointSubset{
{
Addresses: []corev1.EndpointAddress{endpointsAddress},
},
},
}
ns := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "default"}}
fakeClient := fake.NewClientBuilder().WithRuntimeObjects(ns, pod, endpoints).Build()

ep := &EndpointsController{
Log: logrtest.TestLogger{T: t},
ConsulClient: consulClient,
ConsulPort: consulPort,
ConsulScheme: "http",
ConsulClientCfg: clientConfig,
AllowK8sNamespacesSet: mapset.NewSetWith("*"),
DenyK8sNamespacesSet: mapset.NewSetWith(),
Client: fakeClient,
}

err = ep.registerServicesAndHealthCheck(context.Background(), *endpoints, endpointsAddress, api.HealthPassing, make(map[string]bool))

// Check that the service is not registered with Consul.
_, _, err = consulClient.Agent().Service("test-pod-test-service", nil)
require.EqualError(t, err, "Unexpected response code: 404 (unknown service ID: test-pod-test-service)")

_, _, err = consulClient.Agent().Service("test-pod-test-service-sidecar-proxy", nil)
require.EqualError(t, err, "Unexpected response code: 404 (unknown service ID: test-pod-test-service-sidecar-proxy)")
})
}
}

func createPod(name, ip string, inject bool, managedByEndpointsController bool) *corev1.Pod {
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Expand Down

0 comments on commit a8a751a

Please sign in to comment.