Skip to content

Commit

Permalink
Fix bug upgrading from non-mesh to mesh
Browse files Browse the repository at this point in the history
When modifying a deployment to add it to the mesh, there will be a
time when the endpoints list has both non-injected and injected
pods. Previously we were short-circuiting in this case which
meant the injected pods never got registered.
  • Loading branch information
lkysow committed Apr 4, 2022
1 parent a713f85 commit b303dbe
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 83 deletions.
5 changes: 0 additions & 5 deletions control-plane/connect-inject/endpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,6 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) (
r.Log.Error(err, "failed to register services or health check", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
errs = multierror.Append(errs, err)
}
} else {
// If this endpoints object points to a pod that has injection disabled,
// then we want to ignore it for any further processing and exit early.
r.Log.Info("ignoring because endpoints pods have not been injected", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
return ctrl.Result{}, nil
}
}
}
Expand Down
161 changes: 83 additions & 78 deletions control-plane/connect-inject/endpoints_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ package connectinject
import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"

Expand Down Expand Up @@ -1368,6 +1365,89 @@ func TestReconcileCreateEndpoint(t *testing.T) {
},
},
},
// Test that if a user is updating their deployment from non-mesh to mesh that we
// register the mesh pods.
{
name: "Some endpoints injected, some not.",
consulSvcName: "service-created",
k8sObjects: func() []runtime.Object {
pod1 := createPod("pod1", "1.2.3.4", true, true)
pod2 := createPod("pod2", "2.3.4.5", false, false)

// NOTE: the order of the addresses is important. The non-mesh pod must be first to correctly
// reproduce the bug where we were exiting the loop early if any pod was non-mesh.
endpointWithTwoAddresses := &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "service-created",
Namespace: "default",
},
Subsets: []corev1.EndpointSubset{
{
Addresses: []corev1.EndpointAddress{
{
IP: "2.3.4.5",
NodeName: &nodeName,
TargetRef: &corev1.ObjectReference{
Kind: "Pod",
Name: "pod2",
Namespace: "default",
},
},
{
IP: "1.2.3.4",
NodeName: &nodeName,
TargetRef: &corev1.ObjectReference{
Kind: "Pod",
Name: "pod1",
Namespace: "default",
},
},
},
},
},
}
return []runtime.Object{pod1, pod2, endpointWithTwoAddresses}
},
initialConsulSvcs: []*api.AgentServiceRegistration{},
expectedNumSvcInstances: 1,
expectedConsulSvcInstances: []*api.CatalogService{
{
ServiceID: "pod1-service-created",
ServiceName: "service-created",
ServiceAddress: "1.2.3.4",
ServicePort: 0,
ServiceMeta: map[string]string{MetaKeyPodName: "pod1", MetaKeyKubeServiceName: "service-created", MetaKeyKubeNS: "default", MetaKeyManagedBy: managedByValue},
ServiceTags: []string{},
},
},
expectedProxySvcInstances: []*api.CatalogService{
{
ServiceID: "pod1-service-created-sidecar-proxy",
ServiceName: "service-created-sidecar-proxy",
ServiceAddress: "1.2.3.4",
ServicePort: 20000,
ServiceProxy: &api.AgentServiceConnectProxyConfig{
DestinationServiceName: "service-created",
DestinationServiceID: "pod1-service-created",
LocalServiceAddress: "",
LocalServicePort: 0,
},
ServiceMeta: map[string]string{MetaKeyPodName: "pod1", MetaKeyKubeServiceName: "service-created", MetaKeyKubeNS: "default", MetaKeyManagedBy: managedByValue},
ServiceTags: []string{},
},
},
expectedAgentHealthChecks: []*api.AgentCheck{
{
CheckID: "default/pod1-service-created/kubernetes-health-check",
ServiceName: "service-created",
ServiceID: "pod1-service-created",
Name: "Kubernetes Health Check",
Status: api.HealthPassing,
Output: kubernetesSuccessReasonMsg,
Type: ttl,
},
},
},
}
for _, tt := range cases {
t.Run(tt.name, func(t *testing.T) {
Expand Down Expand Up @@ -3298,81 +3378,6 @@ func TestReconcileIgnoresServiceIgnoreLabel(t *testing.T) {
}
}

// Test that when endpoints pods have not been connect-injected (i.e. not in the service mesh)
// we don't make any API calls to Consul.
// This is because we want to avoid any unnecessary calls. Especially if the client agent is unreachable
// and any calls to it will result in an i/o timeout errors, it will
// slow down processing of the events by the endpoints controller making unnecessary calls and waiting for ~30sec.
func TestReconcile_endpointsIgnoredWhenNotInjected(t *testing.T) {
nodeName := "test-node"
namespace := "default"

// Set up the fake Kubernetes client with an endpoint, pod, consul client, and the default namespace.
endpoint := &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "not-in-mesh",
Namespace: namespace,
},
Subsets: []corev1.EndpointSubset{
{
Addresses: []corev1.EndpointAddress{
{
IP: "1.2.3.4",
NodeName: &nodeName,
TargetRef: &corev1.ObjectReference{
Kind: "Pod",
Name: "pod1",
Namespace: namespace,
},
},
},
},
},
}
pod1 := createPod("pod1", "1.2.3.4", false, true)
fakeClientPod := createPod("fake-consul-client", "127.0.0.1", false, true)
fakeClientPod.Labels = map[string]string{"component": "client", "app": "consul", "release": "consul"}
ns := corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}
k8sObjects := []runtime.Object{endpoint, pod1, fakeClientPod, &ns}
fakeClient := fake.NewClientBuilder().WithRuntimeObjects(k8sObjects...).Build()

// Create test Consul server.
consul := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("content-type", "application/json")
if r != nil {
t.Fatalf("should not receive any calls to Consul client")
}
}))
t.Cleanup(consul.Close)

cfg := &api.Config{Address: consul.URL}
consulClient, err := api.NewClient(cfg)
require.NoError(t, err)
parsedURL, err := url.Parse(consul.URL)
require.NoError(t, err)
consulPort := parsedURL.Port()

// Create the endpoints controller.
ep := &EndpointsController{
Client: fakeClient,
Log: logrtest.TestLogger{T: t},
ConsulClient: consulClient,
ConsulPort: consulPort,
ConsulScheme: "http",
AllowK8sNamespacesSet: mapset.NewSetWith("*"),
DenyK8sNamespacesSet: mapset.NewSetWith(),
ReleaseName: "consul",
ReleaseNamespace: namespace,
ConsulClientCfg: cfg,
}

// Run the reconcile process to deregister the service if it was registered before.
namespacedName := types.NamespacedName{Namespace: namespace, Name: "not-in-mesh"}
resp, err := ep.Reconcile(context.Background(), ctrl.Request{NamespacedName: namespacedName})
require.NoError(t, err)
require.False(t, resp.Requeue)
}

func TestFilterAgentPods(t *testing.T) {
t.Parallel()
cases := map[string]struct {
Expand Down

0 comments on commit b303dbe

Please sign in to comment.