Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fully process the endpoints object before returning error #519

Merged
merged 3 commits into from
May 19, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions connect-inject/endpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/hashicorp/consul-k8s/consul"
"github.com/hashicorp/consul-k8s/namespaces"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/go-multierror"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -91,6 +92,7 @@ type EndpointsController struct {
}

func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
var errs error
var serviceEndpoints corev1.Endpoints

if shouldIgnore(req.Namespace, r.DenyK8sNamespacesSet, r.AllowK8sNamespacesSet) {
Expand Down Expand Up @@ -138,7 +140,7 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) (
objectKey := types.NamespacedName{Name: address.TargetRef.Name, Namespace: address.TargetRef.Namespace}
if err = r.Client.Get(ctx, objectKey, &pod); err != nil {
r.Log.Error(err, "failed to get pod", "name", address.TargetRef.Name)
return ctrl.Result{}, err
errs = multierror.Append(errs, err)
}
podHostIP := pod.Status.HostIP

Expand All @@ -149,7 +151,7 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) (
client, err := r.remoteConsulClient(podHostIP, r.consulNamespace(pod.Namespace))
if err != nil {
r.Log.Error(err, "failed to create a new Consul client", "address", podHostIP)
return ctrl.Result{}, err
errs = multierror.Append(errs, err)
}

var managedByEndpointsController bool
Expand All @@ -162,7 +164,7 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) (
serviceRegistration, proxyServiceRegistration, err := r.createServiceRegistrations(pod, serviceEndpoints, healthStatus)
if err != nil {
r.Log.Error(err, "failed to create service registrations for endpoints", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
return ctrl.Result{}, err
errs = multierror.Append(errs, err)
}

// Register the service instance with the local agent.
Expand All @@ -174,15 +176,15 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) (
err = client.Agent().ServiceRegister(serviceRegistration)
if err != nil {
r.Log.Error(err, "failed to register service", "name", serviceRegistration.Name)
return ctrl.Result{}, err
errs = multierror.Append(errs, err)
}

// Register the proxy service instance with the local agent.
r.Log.Info("registering proxy service with Consul", "name", proxyServiceRegistration.Name)
err = client.Agent().ServiceRegister(proxyServiceRegistration)
if err != nil {
r.Log.Error(err, "failed to register proxy service", "name", proxyServiceRegistration.Name)
return ctrl.Result{}, err
errs = multierror.Append(errs, err)
}
}

Expand All @@ -201,7 +203,7 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) (
err = r.upsertHealthCheck(pod, client, serviceID, proxyServiceID, proxyServiceName, healthCheckID, healthStatus)
if err != nil {
r.Log.Error(err, "failed to update health check status for service", "name", serviceName)
return ctrl.Result{}, err
errs = multierror.Append(errs, err)
}
}
}
Expand All @@ -213,10 +215,10 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) (
// the registration codepath.
if err = r.deregisterServiceOnAllAgents(ctx, serviceEndpoints.Name, serviceEndpoints.Namespace, endpointAddressMap); err != nil {
r.Log.Error(err, "failed to deregister endpoints on all agents", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
return ctrl.Result{}, err
errs = multierror.Append(errs, err)
}

return ctrl.Result{}, nil
return ctrl.Result{}, errs
}

func (r *EndpointsController) Logger(name types.NamespacedName) logr.Logger {
Expand Down
133 changes: 132 additions & 1 deletion connect-inject/endpoints_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,7 @@ func TestReconcileCreateEndpoint(t *testing.T) {
expectedConsulSvcInstances []*api.CatalogService
expectedProxySvcInstances []*api.CatalogService
expectedAgentHealthChecks []*api.AgentCheck
expErr string
}{
{
name: "Empty endpoints",
Expand Down Expand Up @@ -689,6 +690,132 @@ func TestReconcileCreateEndpoint(t *testing.T) {
},
},
},
{
// This test has 3 addresses, but only 2 are backed by pod resources. This will cause Reconcile to error
// on the invalid address but continue and process the other addresses. We check for error specific to
// pod3 being non-existant at the end, and validate the other 2 addresses have service instances.
name: "Endpoints with multiple addresses but one is invalid",
consulSvcName: "service-created",
k8sObjects: func() []runtime.Object {
pod1 := createPod("pod1", "1.2.3.4", true, true)
pod2 := createPod("pod2", "2.2.3.4", true, true)
endpointWithTwoAddresses := &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "service-created",
Namespace: "default",
},
Subsets: []corev1.EndpointSubset{
{
Addresses: []corev1.EndpointAddress{
// This is an invalid address because pod3 will not exist in k8s.
{
IP: "9.9.9.9",
NodeName: &nodeName,
TargetRef: &corev1.ObjectReference{
Kind: "Pod",
Name: "pod3",
Namespace: "default",
},
},
// The next two are valid addresses.
{
IP: "1.2.3.4",
NodeName: &nodeName,
TargetRef: &corev1.ObjectReference{
Kind: "Pod",
Name: "pod1",
Namespace: "default",
},
},
{
IP: "2.2.3.4",
NodeName: &nodeName,
TargetRef: &corev1.ObjectReference{
Kind: "Pod",
Name: "pod2",
Namespace: "default",
},
},
},
},
},
}
return []runtime.Object{pod1, pod2, endpointWithTwoAddresses}
},
initialConsulSvcs: []*api.AgentServiceRegistration{},
expectedNumSvcInstances: 2,
expectedConsulSvcInstances: []*api.CatalogService{
{
ServiceID: "pod1-service-created",
ServiceName: "service-created",
ServiceAddress: "1.2.3.4",
ServicePort: 0,
ServiceMeta: map[string]string{MetaKeyPodName: "pod1", MetaKeyKubeServiceName: "service-created", MetaKeyKubeNS: "default", MetaKeyManagedBy: managedByValue},
ServiceTags: []string{},
},
{
ServiceID: "pod2-service-created",
ServiceName: "service-created",
ServiceAddress: "2.2.3.4",
ServicePort: 0,
ServiceMeta: map[string]string{MetaKeyPodName: "pod2", MetaKeyKubeServiceName: "service-created", MetaKeyKubeNS: "default", MetaKeyManagedBy: managedByValue},
ServiceTags: []string{},
},
},
expectedProxySvcInstances: []*api.CatalogService{
{
ServiceID: "pod1-service-created-sidecar-proxy",
ServiceName: "service-created-sidecar-proxy",
ServiceAddress: "1.2.3.4",
ServicePort: 20000,
ServiceProxy: &api.AgentServiceConnectProxyConfig{
DestinationServiceName: "service-created",
DestinationServiceID: "pod1-service-created",
LocalServiceAddress: "",
LocalServicePort: 0,
TransparentProxy: &api.TransparentProxyConfig{},
},
ServiceMeta: map[string]string{MetaKeyPodName: "pod1", MetaKeyKubeServiceName: "service-created", MetaKeyKubeNS: "default", MetaKeyManagedBy: managedByValue},
ServiceTags: []string{},
},
{
ServiceID: "pod2-service-created-sidecar-proxy",
ServiceName: "service-created-sidecar-proxy",
ServiceAddress: "2.2.3.4",
ServicePort: 20000,
ServiceProxy: &api.AgentServiceConnectProxyConfig{
DestinationServiceName: "service-created",
DestinationServiceID: "pod2-service-created",
LocalServiceAddress: "",
LocalServicePort: 0,
TransparentProxy: &api.TransparentProxyConfig{},
},
ServiceMeta: map[string]string{MetaKeyPodName: "pod2", MetaKeyKubeServiceName: "service-created", MetaKeyKubeNS: "default", MetaKeyManagedBy: managedByValue},
ServiceTags: []string{},
},
},
expectedAgentHealthChecks: []*api.AgentCheck{
{
CheckID: "default/pod1-service-created/kubernetes-health-check",
ServiceName: "service-created",
ServiceID: "pod1-service-created",
Name: "Kubernetes Health Check",
Status: api.HealthPassing,
Output: kubernetesSuccessReasonMsg,
Type: ttl,
},
{
CheckID: "default/pod2-service-created/kubernetes-health-check",
ServiceName: "service-created",
ServiceID: "pod2-service-created",
Name: "Kubernetes Health Check",
Status: api.HealthPassing,
Output: kubernetesSuccessReasonMsg,
Type: ttl,
},
},
expErr: "1 error occurred:\n\t* pods \"pod3\" not found\n\n",
},
{
name: "Every configurable field set: port, different Consul service name, meta, tags, upstreams, metrics",
consulSvcName: "different-consul-svc-name",
Expand Down Expand Up @@ -851,7 +978,11 @@ func TestReconcileCreateEndpoint(t *testing.T) {
resp, err := ep.Reconcile(context.Background(), ctrl.Request{
NamespacedName: namespacedName,
})
require.NoError(t, err)
if tt.expErr != "" {
require.EqualError(t, err, tt.expErr)
} else {
require.NoError(t, err)
}
require.False(t, resp.Requeue)

// After reconciliation, Consul should have the service with the correct number of instances
Expand Down