diff --git a/connect-inject/consul_sidecar.go b/connect-inject/consul_sidecar.go index 04679eed5d..838d706f5f 100644 --- a/connect-inject/consul_sidecar.go +++ b/connect-inject/consul_sidecar.go @@ -7,7 +7,7 @@ import ( corev1 "k8s.io/api/core/v1" ) -func (h *Handler) consulSidecar(pod *corev1.Pod) (corev1.Container, error) { +func (h *Handler) consulSidecar(pod corev1.Pod) (corev1.Container, error) { command := []string{ "consul-k8s", "consul-sidecar", diff --git a/connect-inject/consul_sidecar_test.go b/connect-inject/consul_sidecar_test.go index 4ce7255869..6aca7942ce 100644 --- a/connect-inject/consul_sidecar_test.go +++ b/connect-inject/consul_sidecar_test.go @@ -34,7 +34,7 @@ func TestConsulSidecar_Default(t *testing.T) { ImageConsulK8S: "hashicorp/consul-k8s:9.9.9", ConsulSidecarResources: consulSidecarResources, } - container, err := handler.consulSidecar(&corev1.Pod{ + container, err := handler.consulSidecar(corev1.Pod{ Spec: corev1.PodSpec{ Containers: []corev1.Container{ { @@ -84,7 +84,7 @@ func TestConsulSidecar_AuthMethod(t *testing.T) { AuthMethod: authMethod, ImageConsulK8S: "hashicorp/consul-k8s:9.9.9", } - container, err := handler.consulSidecar(&corev1.Pod{ + container, err := handler.consulSidecar(corev1.Pod{ Spec: corev1.PodSpec{ Containers: []corev1.Container{ { @@ -113,7 +113,7 @@ func TestConsulSidecar_SyncPeriodAnnotation(t *testing.T) { Log: hclog.Default().Named("handler"), ImageConsulK8S: "hashicorp/consul-k8s:9.9.9", } - container, err := handler.consulSidecar(&corev1.Pod{ + container, err := handler.consulSidecar(corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Annotations: map[string]string{ "consul.hashicorp.com/connect-sync-period": "55s", @@ -141,7 +141,7 @@ func TestConsulSidecar_TLS(t *testing.T) { ConsulCACert: "consul-ca-cert", ConsulSidecarResources: consulSidecarResources, } - container, err := handler.consulSidecar(&corev1.Pod{ + container, err := handler.consulSidecar(corev1.Pod{ Spec: corev1.PodSpec{ Containers: []corev1.Container{ { @@ -194,7 +194,7 @@ func TestConsulSidecar_MetricsFlags(t *testing.T) { DefaultEnableMetrics: true, DefaultEnableMetricsMerging: true, } - container, err := handler.consulSidecar(&corev1.Pod{ + container, err := handler.consulSidecar(corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Annotations: map[string]string{ annotationMergedMetricsPort: "20100", diff --git a/connect-inject/container_env.go b/connect-inject/container_env.go index a97c9196d9..f94e3834ee 100644 --- a/connect-inject/container_env.go +++ b/connect-inject/container_env.go @@ -8,7 +8,7 @@ import ( corev1 "k8s.io/api/core/v1" ) -func (h *Handler) containerEnvVars(pod *corev1.Pod) []corev1.EnvVar { +func (h *Handler) containerEnvVars(pod corev1.Pod) []corev1.EnvVar { raw, ok := pod.Annotations[annotationUpstreams] if !ok || raw == "" { return []corev1.EnvVar{} diff --git a/connect-inject/container_env_test.go b/connect-inject/container_env_test.go index 5cc2a6114f..969b28e166 100644 --- a/connect-inject/container_env_test.go +++ b/connect-inject/container_env_test.go @@ -29,7 +29,7 @@ func TestContainerEnvVars(t *testing.T) { require := require.New(t) var h Handler - envVars := h.containerEnvVars(&corev1.Pod{ + envVars := h.containerEnvVars(corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Annotations: map[string]string{ annotationService: "foo", diff --git a/connect-inject/container_init.go b/connect-inject/container_init.go index ae9a627348..5029385793 100644 --- a/connect-inject/container_init.go +++ b/connect-inject/container_init.go @@ -85,7 +85,7 @@ func (h *Handler) containerInitCopyContainer() corev1.Container { // containerInit returns the init container spec for registering the Consul // service, setting up the Envoy bootstrap, etc. -func (h *Handler) containerInit(pod *corev1.Pod, k8sNamespace string) (corev1.Container, error) { +func (h *Handler) containerInit(pod corev1.Pod, k8sNamespace string) (corev1.Container, error) { data := initContainerCommandData{ ServiceName: pod.Annotations[annotationService], ProxyServiceName: fmt.Sprintf("%s-sidecar-proxy", pod.Annotations[annotationService]), diff --git a/connect-inject/container_init_test.go b/connect-inject/container_init_test.go index e3b99796f6..91aa4d97ff 100644 --- a/connect-inject/container_init_test.go +++ b/connect-inject/container_init_test.go @@ -672,7 +672,7 @@ services { h := Handler{ ConsulClient: consulClient, } - container, err := h.containerInit(tt.Pod(minimal()), k8sNamespace) + container, err := h.containerInit(*tt.Pod(minimal()), k8sNamespace) require.NoError(err) actual := strings.Join(container.Command, " ") require.Contains(actual, tt.Cmd) @@ -1103,7 +1103,7 @@ EOF require.True(written) h.ConsulClient = consulClient - container, err := h.containerInit(tt.Pod(minimal()), k8sNamespace) + container, err := h.containerInit(*tt.Pod(minimal()), k8sNamespace) require.NoError(err) actual := strings.Join(container.Command, " ") require.Contains(actual, tt.Cmd) @@ -1142,7 +1142,7 @@ func TestHandlerContainerInit_authMethod(t *testing.T) { ServiceAccountName: "foo", }, } - container, err := h.containerInit(pod, k8sNamespace) + container, err := h.containerInit(*pod, k8sNamespace) require.NoError(err) actual := strings.Join(container.Command, " ") require.Contains(actual, ` @@ -1183,7 +1183,7 @@ func TestHandlerContainerInit_WithTLS(t *testing.T) { }, }, } - container, err := h.containerInit(pod, k8sNamespace) + container, err := h.containerInit(*pod, k8sNamespace) require.NoError(err) actual := strings.Join(container.Command, " ") require.Contains(actual, ` @@ -1227,7 +1227,7 @@ func TestHandlerContainerInit_Resources(t *testing.T) { }, }, } - container, err := h.containerInit(pod, k8sNamespace) + container, err := h.containerInit(*pod, k8sNamespace) require.NoError(err) require.Equal(corev1.ResourceRequirements{ Limits: corev1.ResourceList{ @@ -1262,7 +1262,7 @@ func TestHandlerContainerInit_MismatchedServiceNameServiceAccountNameWithACLsEna }, } - _, err := h.containerInit(pod, k8sNamespace) + _, err := h.containerInit(*pod, k8sNamespace) require.EqualError(err, `serviceAccountName "notServiceName" does not match service name "foo"`) } @@ -1285,7 +1285,7 @@ func TestHandlerContainerInit_MismatchedServiceNameServiceAccountNameWithACLsDis }, } - _, err := h.containerInit(pod, k8sNamespace) + _, err := h.containerInit(*pod, k8sNamespace) require.NoError(err) } @@ -1410,7 +1410,7 @@ func TestHandlerContainerInit_MeshGatewayModeErrors(t *testing.T) { }, }, } - _, err = h.containerInit(pod, k8sNamespace) + _, err = h.containerInit(*pod, k8sNamespace) if c.ExpError == "" { require.NoError(err) } else { diff --git a/connect-inject/endpoints_controller.go b/connect-inject/endpoints_controller.go index 8f3f1c864f..3e4b28ea20 100644 --- a/connect-inject/endpoints_controller.go +++ b/connect-inject/endpoints_controller.go @@ -146,7 +146,7 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service // and register that port for the host service. var servicePort int if raw, ok := pod.Annotations[annotationPort]; ok && raw != "" { - if port, _ := portValue(&pod, raw); port > 0 { + if port, _ := portValue(pod, raw); port > 0 { servicePort = int(port) } } @@ -208,7 +208,7 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service proxyConfig.LocalServicePort = servicePort } - upstreams, err := r.processUpstreams(&pod) + upstreams, err := r.processUpstreams(pod) if err != nil { return nil, nil, err } @@ -316,7 +316,7 @@ func (r *EndpointsController) deregisterServiceOnAllAgents(k8sSvcName, k8sSvcNam // processUpstreams reads the list of upstreams from the Pod annotation and converts them into a list of api.Upstream // objects. -func (r *EndpointsController) processUpstreams(pod *corev1.Pod) ([]api.Upstream, error) { +func (r *EndpointsController) processUpstreams(pod corev1.Pod) ([]api.Upstream, error) { var upstreams []api.Upstream if raw, ok := pod.Annotations[annotationUpstreams]; ok && raw != "" { for _, raw := range strings.Split(raw, ",") { diff --git a/connect-inject/endpoints_controller_test.go b/connect-inject/endpoints_controller_test.go index 44a8a09177..ca319fbd57 100644 --- a/connect-inject/endpoints_controller_test.go +++ b/connect-inject/endpoints_controller_test.go @@ -226,7 +226,7 @@ func TestProcessUpstreams(t *testing.T) { DenyK8sNamespacesSet: mapset.NewSetWith(), } - upstreams, err := ep.processUpstreams(tt.pod()) + upstreams, err := ep.processUpstreams(*tt.pod()) if tt.expErr != "" { require.EqualError(t, err, tt.expErr) } else { diff --git a/connect-inject/envoy_sidecar.go b/connect-inject/envoy_sidecar.go index f251d9bcef..bac7e7ef1a 100644 --- a/connect-inject/envoy_sidecar.go +++ b/connect-inject/envoy_sidecar.go @@ -17,7 +17,7 @@ type sidecarContainerCommandData struct { ConsulNamespace string } -func (h *Handler) envoySidecar(pod *corev1.Pod, k8sNamespace string) (corev1.Container, error) { +func (h *Handler) envoySidecar(pod corev1.Pod, k8sNamespace string) (corev1.Container, error) { templateData := sidecarContainerCommandData{ AuthMethod: h.AuthMethod, ConsulNamespace: h.consulNamespace(k8sNamespace), @@ -91,7 +91,7 @@ func (h *Handler) envoySidecar(pod *corev1.Pod, k8sNamespace string) (corev1.Con } return container, nil } -func (h *Handler) getContainerSidecarCommand(pod *corev1.Pod) ([]string, error) { +func (h *Handler) getContainerSidecarCommand(pod corev1.Pod) ([]string, error) { cmd := []string{ "envoy", "--config-path", "/consul/connect-inject/envoy-bootstrap.yaml", @@ -124,7 +124,7 @@ func (h *Handler) getContainerSidecarCommand(pod *corev1.Pod) ([]string, error) return cmd, nil } -func (h *Handler) envoySidecarResources(pod *corev1.Pod) (corev1.ResourceRequirements, error) { +func (h *Handler) envoySidecarResources(pod corev1.Pod) (corev1.ResourceRequirements, error) { resources := corev1.ResourceRequirements{ Limits: corev1.ResourceList{}, Requests: corev1.ResourceList{}, diff --git a/connect-inject/envoy_sidecar_test.go b/connect-inject/envoy_sidecar_test.go index e2a681dd2e..dbfdaae43e 100644 --- a/connect-inject/envoy_sidecar_test.go +++ b/connect-inject/envoy_sidecar_test.go @@ -13,7 +13,7 @@ import ( func TestHandlerEnvoySidecar(t *testing.T) { require := require.New(t) h := Handler{} - pod := &corev1.Pod{ + pod := corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Annotations: map[string]string{ annotationService: "foo", @@ -144,7 +144,7 @@ func TestHandlerEnvoySidecar_EnvoyExtraArgs(t *testing.T) { EnvoyExtraArgs: tc.envoyExtraArgs, } - c, err := h.envoySidecar(tc.pod, k8sNamespace) + c, err := h.envoySidecar(*tc.pod, k8sNamespace) require.NoError(t, err) require.Equal(t, tc.expectedContainerCommand, c.Command) }) @@ -158,7 +158,7 @@ func TestHandlerEnvoySidecar_AuthMethod(t *testing.T) { h := Handler{ AuthMethod: "test-auth-method", } - pod := &corev1.Pod{ + pod := corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Annotations: map[string]string{ annotationService: "foo", @@ -192,7 +192,7 @@ func TestHandlerEnvoySidecar_WithTLS(t *testing.T) { h := Handler{ ConsulCACert: "consul-ca-cert", } - pod := &corev1.Pod{ + pod := corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Annotations: map[string]string{ annotationService: "foo", @@ -237,7 +237,7 @@ func TestHandlerEnvoySidecar_Namespaces(t *testing.T) { EnableNamespaces: true, ConsulDestinationNamespace: k8sNamespace, } - pod := &corev1.Pod{ + pod := corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Annotations: map[string]string{ annotationService: "foo", @@ -268,7 +268,7 @@ func TestHandlerEnvoySidecar_NamespacesAndAuthMethod(t *testing.T) { ConsulDestinationNamespace: k8sNamespace, AuthMethod: "test-auth-method", } - pod := &corev1.Pod{ + pod := corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Annotations: map[string]string{ annotationService: "foo", @@ -449,7 +449,7 @@ func TestHandlerEnvoySidecar_Resources(t *testing.T) { for name, c := range cases { t.Run(name, func(tt *testing.T) { require := require.New(tt) - pod := &corev1.Pod{ + pod := corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Annotations: c.annotations, }, diff --git a/connect-inject/handler.go b/connect-inject/handler.go index ca02451059..cb49eccf75 100644 --- a/connect-inject/handler.go +++ b/connect-inject/handler.go @@ -1,10 +1,10 @@ package connectinject import ( + "context" "encoding/json" "errors" "fmt" - "io/ioutil" "net/http" "strconv" @@ -12,13 +12,14 @@ import ( "github.com/hashicorp/consul-k8s/namespaces" "github.com/hashicorp/consul/api" "github.com/hashicorp/go-hclog" - "github.com/mattbaird/jsonpatch" - "k8s.io/api/admission/v1beta1" + "gomodules.xyz/jsonpatch/v2" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer" + _ "k8s.io/client-go/plugin/pkg/client/auth" + "sigs.k8s.io/controller-runtime/pkg/webhook/admission" ) const ( @@ -215,238 +216,128 @@ type Handler struct { // Log Log hclog.Logger + + decoder *admission.Decoder } -// Handle is the http.HandlerFunc implementation that actually handles the +// Handle is the admission.Handler implementation that actually handles the // webhook request for admission control. This should be registered or -// served via an HTTP server. -func (h *Handler) Handle(w http.ResponseWriter, r *http.Request) { - h.Log.Info("Request received", "Method", r.Method, "URL", r.URL) - - if ct := r.Header.Get("Content-Type"); ct != "application/json" { - msg := fmt.Sprintf("Invalid content-type: %q", ct) - http.Error(w, msg, http.StatusBadRequest) - h.Log.Error("Error on request", "err", msg, "Code", http.StatusBadRequest) - return - } - - var body []byte - if r.Body != nil { - var err error - if body, err = ioutil.ReadAll(r.Body); err != nil { - msg := fmt.Sprintf("Error reading request body: %s", err) - http.Error(w, msg, http.StatusBadRequest) - h.Log.Error("Error on request", "err", msg, "Code", http.StatusBadRequest) - return - } - } - if len(body) == 0 { - msg := "Empty request body" - http.Error(w, msg, http.StatusBadRequest) - h.Log.Error("Error on request", "err", msg, "Code", http.StatusBadRequest) - return - } - - var admReq v1beta1.AdmissionReview - var admResp v1beta1.AdmissionReview - if _, _, err := deserializer.Decode(body, nil, &admReq); err != nil { - h.Log.Error("Could not decode admission request", "err", err) - admResp.Response = admissionError(err) - } else { - admResp.Response = h.Mutate(admReq.Request) - } - - resp, err := json.Marshal(&admResp) - if err != nil { - msg := fmt.Sprintf("Error marshalling admission response: %s", err) - http.Error(w, msg, http.StatusInternalServerError) - h.Log.Error("Error on request", "err", msg, "Code", http.StatusInternalServerError) - return - } - - if _, err := w.Write(resp); err != nil { - h.Log.Error("Error writing response", "err", err) - } -} +// served via the controller runtime manager. +func (h *Handler) Handle(_ context.Context, req admission.Request) admission.Response { + var pod corev1.Pod -// Mutate takes an admission request and performs mutation if necessary, -// returning the final API response. -func (h *Handler) Mutate(req *v1beta1.AdmissionRequest) *v1beta1.AdmissionResponse { // Decode the pod from the request - var pod corev1.Pod - if err := json.Unmarshal(req.Object.Raw, &pod); err != nil { + if err := h.decoder.Decode(req, &pod); err != nil { h.Log.Error("Could not unmarshal request to pod", "err", err) - return &v1beta1.AdmissionResponse{ - Result: &metav1.Status{ - Message: fmt.Sprintf("Could not unmarshal request to pod: %s", err), - }, - } + return admission.Errored(http.StatusBadRequest, err) } - // Build the basic response - resp := &v1beta1.AdmissionResponse{ - Allowed: true, - UID: req.UID, + // Marshall the contents of the pod that was received. This is compared with the + // marshalled contents of the pod after it has been updated to create the jsonpatch. + origPodJson, err := json.Marshal(pod) + if err != nil { + return admission.Errored(http.StatusBadRequest, err) } - // Accumulate any patches here - var patches []jsonpatch.JsonPatchOperation - if err := h.validatePod(pod); err != nil { h.Log.Error("Error validating pod", "err", err, "Request Name", req.Name) - return &v1beta1.AdmissionResponse{ - Result: &metav1.Status{ - Message: fmt.Sprintf("Error validating pod: %s", err), - }, - } + return admission.Errored(http.StatusBadRequest, err) } // Setup the default annotation values that are used for the container. // This MUST be done before shouldInject is called since that function // uses these annotations. - if err := h.defaultAnnotations(&pod, &patches); err != nil { + if err := h.defaultAnnotations(&pod); err != nil { h.Log.Error("Error creating default annotations", "err", err, "Request Name", req.Name) - return &v1beta1.AdmissionResponse{ - Result: &metav1.Status{ - Message: fmt.Sprintf("Error creating default annotations: %s", err), - }, - } + return admission.Errored(http.StatusInternalServerError, fmt.Errorf("error creating default annotations: %s", err)) } // Check if we should inject, for example we don't inject in the // system namespaces. - if shouldInject, err := h.shouldInject(&pod, req.Namespace); err != nil { + if shouldInject, err := h.shouldInject(pod, req.Namespace); err != nil { h.Log.Error("Error checking if should inject", "err", err, "Request Name", req.Name) - return &v1beta1.AdmissionResponse{ - Result: &metav1.Status{ - Message: fmt.Sprintf("Error checking if should inject: %s", err), - }, - } + return admission.Errored(http.StatusInternalServerError, fmt.Errorf("error checking if should inject: %s", err)) } else if !shouldInject { - return resp + return admission.Allowed(fmt.Sprintf("%s %s does not require injection", pod.Kind, pod.Name)) } // Add our volume that will be shared by the init container and // the sidecar for passing data in the pod. - patches = append(patches, addVolume( - pod.Spec.Volumes, - []corev1.Volume{h.containerVolume()}, - "/spec/volumes")...) + pod.Spec.Volumes = append(pod.Spec.Volumes, h.containerVolume()) // Add the upstream services as environment variables for easy // service discovery. - for i, container := range pod.Spec.InitContainers { - patches = append(patches, addEnvVar( - container.Env, - h.containerEnvVars(&pod), - fmt.Sprintf("/spec/initContainers/%d/env", i))...) + containerEnvVars := h.containerEnvVars(pod) + for _, container := range pod.Spec.InitContainers { + container.Env = append(container.Env, containerEnvVars...) } - for i, container := range pod.Spec.Containers { - patches = append(patches, addEnvVar( - container.Env, - h.containerEnvVars(&pod), - fmt.Sprintf("/spec/containers/%d/env", i))...) + + for _, container := range pod.Spec.Containers { + container.Env = append(container.Env, containerEnvVars...) } - // Add the init container which copies the consul binary. + // TODO: rename both of these initcontainers appropriately + // Add the consul-init container initCopyContainer := h.containerInitCopyContainer() + pod.Spec.InitContainers = append(pod.Spec.InitContainers, initCopyContainer) // Add the init container that registers the service and sets up // the Envoy configuration. - initContainer, err := h.containerInit(&pod, req.Namespace) + initContainer, err := h.containerInit(pod, req.Namespace) if err != nil { h.Log.Error("Error configuring injection init container", "err", err, "Request Name", req.Name) - return &v1beta1.AdmissionResponse{ - Result: &metav1.Status{ - Message: fmt.Sprintf("Error configuring injection init container: %s", err), - }, - } + return admission.Errored(http.StatusInternalServerError, fmt.Errorf("error configuring injection init container: %s", err)) } - patches = append(patches, addContainer( - pod.Spec.InitContainers, - []corev1.Container{initCopyContainer, initContainer}, - "/spec/initContainers")...) + pod.Spec.InitContainers = append(pod.Spec.InitContainers, initContainer) // Add the Envoy and Consul sidecars. - esContainer, err := h.envoySidecar(&pod, req.Namespace) + esContainer, err := h.envoySidecar(pod, req.Namespace) if err != nil { h.Log.Error("Error configuring injection sidecar container", "err", err, "Request Name", req.Name) - return &v1beta1.AdmissionResponse{ - Result: &metav1.Status{ - Message: fmt.Sprintf("Error configuring injection sidecar container: %s", err), - }, - } + return admission.Errored(http.StatusInternalServerError, fmt.Errorf("error configuring injection sidecar container: %s", err)) } - connectContainer, err := h.consulSidecar(&pod) + pod.Spec.Containers = append(pod.Spec.Containers, esContainer) + + connectContainer, err := h.consulSidecar(pod) if err != nil { h.Log.Error("Error configuring consul sidecar container", "err", err, "Request Name", req.Name) - return &v1beta1.AdmissionResponse{ - Result: &metav1.Status{ - Message: fmt.Sprintf("Error configuring consul sidecar container: %s", err), - }, - } + return admission.Errored(http.StatusInternalServerError, fmt.Errorf("error configuring consul sidecar container: %s", err)) } - patches = append(patches, addContainer( - pod.Spec.Containers, - []corev1.Container{esContainer, connectContainer}, - "/spec/containers")...) + pod.Spec.Containers = append(pod.Spec.Containers, connectContainer) - // Add annotations so that we know we're injected - patches = append(patches, updateAnnotation( - pod.Annotations, - map[string]string{ - annotationStatus: injected, - })...) + // pod.Annotations has already been initialized by h.defaultAnnotations() + // and does not need to be checked for being a nil value. + pod.Annotations[annotationStatus] = injected - // Add annotations for metrics - promAnnotations, err := h.prometheusAnnotations(&pod) + // Add annotations for metrics. + err = h.prometheusAnnotations(&pod) if err != nil { h.Log.Error("Error configuring prometheus annotations", "err", err, "Request Name", req.Name) - return &v1beta1.AdmissionResponse{ - Result: &metav1.Status{ - Message: fmt.Sprintf("Error configuring prometheus annotations: %s", err), - }, - } - } - if promAnnotations != nil { - patches = append(patches, updateAnnotation( - pod.Annotations, - promAnnotations)...) + return admission.Errored(http.StatusInternalServerError, fmt.Errorf("error configuring prometheus annotations: %s", err)) } - // Add Pod label for health checks - patches = append(patches, updateLabels( - pod.Labels, - map[string]string{ - labelInject: injected, - })...) + if pod.Labels == nil { + pod.Labels = make(map[string]string) + } + pod.Labels[labelInject] = injected // Consul-ENT only: Add the Consul destination namespace as an annotation to the pod. if h.EnableNamespaces { - patches = append(patches, updateAnnotation( - pod.Annotations, - map[string]string{ - annotationConsulNamespace: h.consulNamespace(req.Namespace), - })...) - } - - // Generate the patch - var patch []byte - if len(patches) > 0 { - var err error - patch, err = json.Marshal(patches) - if err != nil { - h.Log.Error("Could not marshal patches", "err", err, "Request Name", req.Name) - return &v1beta1.AdmissionResponse{ - Result: &metav1.Status{ - Message: fmt.Sprintf("Could not marshal patches: %s", err), - }, - } - } + pod.Annotations[annotationConsulNamespace] = h.consulNamespace(req.Namespace) + } + + // Marshall the pod into JSON after it has the desired envs, annotations, labels, + // sidecars and initContainers appended to it. + updatedPodJson, err := json.Marshal(pod) + if err != nil { + return admission.Errored(http.StatusBadRequest, err) + } - resp.Patch = patch - patchType := v1beta1.PatchTypeJSONPatch - resp.PatchType = &patchType + // Create a patches based on the Pod that was received by the handler + // and the desired Pod spec. + patches, err := jsonpatch.CreatePatch(origPodJson, updatedPodJson) + if err != nil { + return admission.Errored(http.StatusBadRequest, err) } // Check and potentially create Consul resources. This is done after @@ -456,18 +347,16 @@ func (h *Handler) Mutate(req *v1beta1.AdmissionRequest) *v1beta1.AdmissionRespon if _, err := namespaces.EnsureExists(h.ConsulClient, h.consulNamespace(req.Namespace), h.CrossNamespaceACLPolicy); err != nil { h.Log.Error("Error checking or creating namespace", "err", err, "Namespace", h.consulNamespace(req.Namespace), "Request Name", req.Name) - return &v1beta1.AdmissionResponse{ - Result: &metav1.Status{ - Message: fmt.Sprintf("Error checking or creating namespace: %s", err), - }, - } + return admission.Errored(http.StatusInternalServerError, fmt.Errorf("error checking or creating namespace: %s", err)) } } - return resp + // Return a Patched response along with the patches we intend on applying to the + // Pod received by the handler. + return admission.Patched(fmt.Sprintf("valid %s request", pod.Kind), patches...) } -func (h *Handler) shouldInject(pod *corev1.Pod, namespace string) (bool, error) { +func (h *Handler) shouldInject(pod corev1.Pod, namespace string) (bool, error) { // Don't inject in the Kubernetes system namespaces if kubeSystemNamespaces.Contains(namespace) { return false, nil @@ -506,22 +395,16 @@ func (h *Handler) shouldInject(pod *corev1.Pod, namespace string) (bool, error) return !h.RequireAnnotation, nil } -func (h *Handler) defaultAnnotations(pod *corev1.Pod, patches *[]jsonpatch.JsonPatchOperation) error { - if pod.ObjectMeta.Annotations == nil { - pod.ObjectMeta.Annotations = make(map[string]string) +func (h *Handler) defaultAnnotations(pod *corev1.Pod) error { + if pod.Annotations == nil { + pod.Annotations = make(map[string]string) } // Default service name is the name of the first container. if _, ok := pod.ObjectMeta.Annotations[annotationService]; !ok { if cs := pod.Spec.Containers; len(cs) > 0 { - // Create the patch for this first, so that the Annotation - // object will be created if necessary - *patches = append(*patches, updateAnnotation( - pod.Annotations, - map[string]string{annotationService: cs[0].Name})...) - // Set the annotation for checking in shouldInject - pod.ObjectMeta.Annotations[annotationService] = cs[0].Name + pod.Annotations[annotationService] = cs[0].Name } } @@ -530,21 +413,9 @@ func (h *Handler) defaultAnnotations(pod *corev1.Pod, patches *[]jsonpatch.JsonP if cs := pod.Spec.Containers; len(cs) > 0 { if ps := cs[0].Ports; len(ps) > 0 { if ps[0].Name != "" { - // Create the patch for this first, so that the Annotation - // object will be created if necessary - *patches = append(*patches, updateAnnotation( - pod.Annotations, - map[string]string{annotationPort: ps[0].Name})...) - - pod.ObjectMeta.Annotations[annotationPort] = ps[0].Name + pod.Annotations[annotationPort] = ps[0].Name } else { - // Create the patch for this first, so that the Annotation - // object will be created if necessary - *patches = append(*patches, updateAnnotation( - pod.Annotations, - map[string]string{annotationPort: strconv.Itoa(int(ps[0].ContainerPort))})...) - - pod.ObjectMeta.Annotations[annotationPort] = strconv.Itoa(int(ps[0].ContainerPort)) + pod.Annotations[annotationPort] = strconv.Itoa(int(ps[0].ContainerPort)) } } } @@ -555,7 +426,7 @@ func (h *Handler) defaultAnnotations(pod *corev1.Pod, patches *[]jsonpatch.JsonP // enableMetrics returns the default value in the handler, or overrides that // with the annotation if provided. -func (h *Handler) enableMetrics(pod *corev1.Pod) (bool, error) { +func (h *Handler) enableMetrics(pod corev1.Pod) (bool, error) { enabled := h.DefaultEnableMetrics if raw, ok := pod.Annotations[annotationEnableMetrics]; ok && raw != "" { enableMetrics, err := strconv.ParseBool(raw) @@ -569,7 +440,7 @@ func (h *Handler) enableMetrics(pod *corev1.Pod) (bool, error) { // enableMetricsMerging returns the default value in the handler, or overrides // that with the annotation if provided. -func (h *Handler) enableMetricsMerging(pod *corev1.Pod) (bool, error) { +func (h *Handler) enableMetricsMerging(pod corev1.Pod) (bool, error) { enabled := h.DefaultEnableMetricsMerging if raw, ok := pod.Annotations[annotationEnableMetricsMerging]; ok && raw != "" { enableMetricsMerging, err := strconv.ParseBool(raw) @@ -583,19 +454,19 @@ func (h *Handler) enableMetricsMerging(pod *corev1.Pod) (bool, error) { // mergedMetricsPort returns the default value in the handler, or overrides // that with the annotation if provided. -func (h *Handler) mergedMetricsPort(pod *corev1.Pod) (string, error) { +func (h *Handler) mergedMetricsPort(pod corev1.Pod) (string, error) { return determineAndValidatePort(pod, annotationMergedMetricsPort, h.DefaultMergedMetricsPort, false) } // prometheusScrapePort returns the default value in the handler, or overrides // that with the annotation if provided. -func (h *Handler) prometheusScrapePort(pod *corev1.Pod) (string, error) { +func (h *Handler) prometheusScrapePort(pod corev1.Pod) (string, error) { return determineAndValidatePort(pod, annotationPrometheusScrapePort, h.DefaultPrometheusScrapePort, false) } // prometheusScrapePath returns the default value in the handler, or overrides // that with the annotation if provided. -func (h *Handler) prometheusScrapePath(pod *corev1.Pod) string { +func (h *Handler) prometheusScrapePath(pod corev1.Pod) string { if raw, ok := pod.Annotations[annotationPrometheusScrapePath]; ok && raw != "" { return raw } @@ -606,7 +477,7 @@ func (h *Handler) prometheusScrapePath(pod *corev1.Pod) string { // serviceMetricsPort returns the port the service exposes metrics on. This will // default to the port used to register the service with Consul, and can be // overridden with the annotation if provided. -func (h *Handler) serviceMetricsPort(pod *corev1.Pod) (string, error) { +func (h *Handler) serviceMetricsPort(pod corev1.Pod) (string, error) { // The annotationPort is the port used to register the service with Consul. // If that has been set, it'll be used as the port for getting service // metrics as well, unless overridden by the service-metrics-port annotation. @@ -627,7 +498,7 @@ func (h *Handler) serviceMetricsPort(pod *corev1.Pod) (string, error) { // serviceMetricsPath returns a default of /metrics, or overrides // that with the annotation if provided. -func (h *Handler) serviceMetricsPath(pod *corev1.Pod) string { +func (h *Handler) serviceMetricsPath(pod corev1.Pod) string { if raw, ok := pod.Annotations[annotationServiceMetricsPath]; ok && raw != "" { return raw } @@ -638,32 +509,30 @@ func (h *Handler) serviceMetricsPath(pod *corev1.Pod) string { // prometheusAnnotations returns the Prometheus scraping configuration // annotations. It returns a nil map if metrics are not enabled and annotations // should not be set. -func (h *Handler) prometheusAnnotations(pod *corev1.Pod) (map[string]string, error) { - enableMetrics, err := h.enableMetrics(pod) +func (h *Handler) prometheusAnnotations(pod *corev1.Pod) error { + enableMetrics, err := h.enableMetrics(*pod) if err != nil { - return map[string]string{}, err + return err } - prometheusScrapePort, err := h.prometheusScrapePort(pod) + prometheusScrapePort, err := h.prometheusScrapePort(*pod) if err != nil { - return map[string]string{}, err + return err } - prometheusScrapePath := h.prometheusScrapePath(pod) + prometheusScrapePath := h.prometheusScrapePath(*pod) if enableMetrics { - return map[string]string{ - "prometheus.io/scrape": "true", - "prometheus.io/port": prometheusScrapePort, - "prometheus.io/path": prometheusScrapePath, - }, nil + pod.Annotations["prometheus.io/scrape"] = "true" + pod.Annotations["prometheus.io/port"] = prometheusScrapePort + pod.Annotations["prometheus.io/path"] = prometheusScrapePath } - return nil, nil + return nil } // shouldRunMergedMetricsServer returns whether we need to run a merged metrics // server. This is used to configure the consul sidecar command, and the init // container, so it can pass appropriate arguments to the consul connect envoy // command. -func (h *Handler) shouldRunMergedMetricsServer(pod *corev1.Pod) (bool, error) { +func (h *Handler) shouldRunMergedMetricsServer(pod corev1.Pod) (bool, error) { enableMetrics, err := h.enableMetrics(pod) if err != nil { return false, err @@ -693,7 +562,7 @@ func (h *Handler) shouldRunMergedMetricsServer(pod *corev1.Pod) (bool, error) { // If the privileged flag is true, it will allow the port to be in the // privileged port range of 1-1023. Otherwise, it will only allow ports in the // unprivileged range of 1024-65535. -func determineAndValidatePort(pod *corev1.Pod, annotation string, defaultPort string, privileged bool) (string, error) { +func determineAndValidatePort(pod corev1.Pod, annotation string, defaultPort string, privileged bool) (string, error) { if raw, ok := pod.Annotations[annotation]; ok && raw != "" { port, err := portValue(pod, raw) if err != nil { @@ -736,7 +605,7 @@ func (h *Handler) validatePod(pod corev1.Pod) error { return nil } -func portValue(pod *corev1.Pod, value string) (int32, error) { +func portValue(pod corev1.Pod, value string) (int32, error) { // First search for the named port for _, c := range pod.Spec.Containers { for _, p := range c.Ports { @@ -751,15 +620,7 @@ func portValue(pod *corev1.Pod, value string) (int32, error) { return int32(raw), err } -func admissionError(err error) *v1beta1.AdmissionResponse { - return &v1beta1.AdmissionResponse{ - Result: &metav1.Status{ - Message: err.Error(), - }, - } -} - -func findServiceAccountVolumeMount(pod *corev1.Pod) (corev1.VolumeMount, error) { +func findServiceAccountVolumeMount(pod corev1.Pod) (corev1.VolumeMount, error) { // Find the volume mount that is mounted at the known // service account token location var volumeMount corev1.VolumeMount @@ -779,3 +640,8 @@ func findServiceAccountVolumeMount(pod *corev1.Pod) (corev1.VolumeMount, error) return volumeMount, nil } + +func (h *Handler) InjectDecoder(d *admission.Decoder) error { + h.decoder = d + return nil +} diff --git a/connect-inject/handler_ent_test.go b/connect-inject/handler_ent_test.go index ca079acf47..7c63b7cf8b 100644 --- a/connect-inject/handler_ent_test.go +++ b/connect-inject/handler_ent_test.go @@ -3,7 +3,7 @@ package connectinject import ( - "encoding/json" + "context" "testing" "time" @@ -12,10 +12,13 @@ import ( "github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/go-hclog" - "github.com/mattbaird/jsonpatch" "github.com/stretchr/testify/require" "k8s.io/api/admission/v1beta1" corev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/controller-runtime/pkg/webhook/admission" ) // This tests the checkAndCreate namespace function that is called @@ -32,126 +35,148 @@ func TestHandler_MutateWithNamespaces(t *testing.T) { }, }, } + s := runtime.NewScheme() + s.AddKnownTypes(schema.GroupVersion{Group: "", Version: "v1"}, &corev1.Pod{}) + decoder, err := admission.NewDecoder(s) + require.NoError(t, err) cases := []struct { Name string Handler Handler - Req v1beta1.AdmissionRequest + Req admission.Request ExpectedNamespaces []string }{ { - "single destination namespace 'default' from k8s 'default'", - Handler{ + Name: "single destination namespace 'default' from k8s 'default'", + Handler: Handler{ Log: hclog.Default().Named("handler"), AllowK8sNamespacesSet: mapset.NewSet("*"), DenyK8sNamespacesSet: mapset.NewSet(), EnableNamespaces: true, ConsulDestinationNamespace: "default", + decoder: decoder, }, - v1beta1.AdmissionRequest{ - Object: encodeRaw(t, &corev1.Pod{ - Spec: basicSpec, - }), - Namespace: "default", + Req: admission.Request{ + AdmissionRequest: v1beta1.AdmissionRequest{ + Object: encodeRaw(t, &corev1.Pod{ + Spec: basicSpec, + }), + Namespace: "default", + }, }, - []string{"default"}, + ExpectedNamespaces: []string{"default"}, }, { - "single destination namespace 'default' from k8s 'non-default'", - Handler{ + Name: "single destination namespace 'default' from k8s 'non-default'", + Handler: Handler{ Log: hclog.Default().Named("handler"), AllowK8sNamespacesSet: mapset.NewSet("*"), DenyK8sNamespacesSet: mapset.NewSet(), EnableNamespaces: true, ConsulDestinationNamespace: "default", + decoder: decoder, }, - v1beta1.AdmissionRequest{ - Object: encodeRaw(t, &corev1.Pod{ - Spec: basicSpec, - }), - Namespace: "non-default", + Req: admission.Request{ + AdmissionRequest: v1beta1.AdmissionRequest{ + Object: encodeRaw(t, &corev1.Pod{ + Spec: basicSpec, + }), + Namespace: "non-default", + }, }, - []string{"default"}, + ExpectedNamespaces: []string{"default"}, }, { - "single destination namespace 'dest' from k8s 'default'", - Handler{ + Name: "single destination namespace 'dest' from k8s 'default'", + Handler: Handler{ Log: hclog.Default().Named("handler"), AllowK8sNamespacesSet: mapset.NewSet("*"), DenyK8sNamespacesSet: mapset.NewSet(), EnableNamespaces: true, ConsulDestinationNamespace: "dest", + decoder: decoder, }, - v1beta1.AdmissionRequest{ - Object: encodeRaw(t, &corev1.Pod{ - Spec: basicSpec, - }), - Namespace: "default", + Req: admission.Request{ + AdmissionRequest: v1beta1.AdmissionRequest{ + Object: encodeRaw(t, &corev1.Pod{ + Spec: basicSpec, + }), + Namespace: "default", + }, }, - []string{"default", "dest"}, + ExpectedNamespaces: []string{"default", "dest"}, }, { - "single destination namespace 'dest' from k8s 'non-default'", - Handler{ + Name: "single destination namespace 'dest' from k8s 'non-default'", + Handler: Handler{ Log: hclog.Default().Named("handler"), AllowK8sNamespacesSet: mapset.NewSet("*"), DenyK8sNamespacesSet: mapset.NewSet(), EnableNamespaces: true, ConsulDestinationNamespace: "dest", + decoder: decoder, }, - v1beta1.AdmissionRequest{ - Object: encodeRaw(t, &corev1.Pod{ - Spec: basicSpec, - }), - Namespace: "non-default", + Req: admission.Request{ + AdmissionRequest: v1beta1.AdmissionRequest{ + Object: encodeRaw(t, &corev1.Pod{ + Spec: basicSpec, + }), + Namespace: "non-default", + }, }, - []string{"default", "dest"}, + ExpectedNamespaces: []string{"default", "dest"}, }, { - "mirroring from k8s 'default'", - Handler{ + Name: "mirroring from k8s 'default'", + Handler: Handler{ Log: hclog.Default().Named("handler"), AllowK8sNamespacesSet: mapset.NewSet("*"), DenyK8sNamespacesSet: mapset.NewSet(), EnableNamespaces: true, ConsulDestinationNamespace: "default", // will be overridden EnableK8SNSMirroring: true, + decoder: decoder, }, - v1beta1.AdmissionRequest{ - Object: encodeRaw(t, &corev1.Pod{ - Spec: basicSpec, - }), - Namespace: "default", + Req: admission.Request{ + AdmissionRequest: v1beta1.AdmissionRequest{ + Object: encodeRaw(t, &corev1.Pod{ + Spec: basicSpec, + }), + Namespace: "default", + }, }, - []string{"default"}, + ExpectedNamespaces: []string{"default"}, }, { - "mirroring from k8s 'dest'", - Handler{ + Name: "mirroring from k8s 'dest'", + Handler: Handler{ Log: hclog.Default().Named("handler"), AllowK8sNamespacesSet: mapset.NewSet("*"), DenyK8sNamespacesSet: mapset.NewSet(), EnableNamespaces: true, ConsulDestinationNamespace: "default", // will be overridden EnableK8SNSMirroring: true, + decoder: decoder, }, - v1beta1.AdmissionRequest{ - Object: encodeRaw(t, &corev1.Pod{ - Spec: basicSpec, - }), - Namespace: "dest", + Req: admission.Request{ + AdmissionRequest: v1beta1.AdmissionRequest{ + Object: encodeRaw(t, &corev1.Pod{ + Spec: basicSpec, + }), + Namespace: "dest", + }, }, - []string{"default", "dest"}, + ExpectedNamespaces: []string{"default", "dest"}, }, { - "mirroring with prefix from k8s 'default'", - Handler{ + Name: "mirroring with prefix from k8s 'default'", + Handler: Handler{ Log: hclog.Default().Named("handler"), AllowK8sNamespacesSet: mapset.NewSet("*"), DenyK8sNamespacesSet: mapset.NewSet(), @@ -159,19 +184,22 @@ func TestHandler_MutateWithNamespaces(t *testing.T) { ConsulDestinationNamespace: "default", // will be overridden EnableK8SNSMirroring: true, K8SNSMirroringPrefix: "k8s-", + decoder: decoder, }, - v1beta1.AdmissionRequest{ - Object: encodeRaw(t, &corev1.Pod{ - Spec: basicSpec, - }), - Namespace: "default", + Req: admission.Request{ + AdmissionRequest: v1beta1.AdmissionRequest{ + Object: encodeRaw(t, &corev1.Pod{ + Spec: basicSpec, + }), + Namespace: "default", + }, }, - []string{"default", "k8s-default"}, + ExpectedNamespaces: []string{"default", "k8s-default"}, }, { - "mirroring with prefix from k8s 'dest'", - Handler{ + Name: "mirroring with prefix from k8s 'dest'", + Handler: Handler{ Log: hclog.Default().Named("handler"), AllowK8sNamespacesSet: mapset.NewSet("*"), DenyK8sNamespacesSet: mapset.NewSet(), @@ -179,14 +207,17 @@ func TestHandler_MutateWithNamespaces(t *testing.T) { ConsulDestinationNamespace: "default", // will be overridden EnableK8SNSMirroring: true, K8SNSMirroringPrefix: "k8s-", + decoder: decoder, }, - v1beta1.AdmissionRequest{ - Object: encodeRaw(t, &corev1.Pod{ - Spec: basicSpec, - }), - Namespace: "dest", + Req: admission.Request{ + AdmissionRequest: v1beta1.AdmissionRequest{ + Object: encodeRaw(t, &corev1.Pod{ + Spec: basicSpec, + }), + Namespace: "dest", + }, }, - []string{"default", "k8s-dest"}, + ExpectedNamespaces: []string{"default", "k8s-dest"}, }, } @@ -197,6 +228,7 @@ func TestHandler_MutateWithNamespaces(t *testing.T) { // Set up consul server a, err := testutil.NewTestServerConfigT(t, nil) require.NoError(err) + a.WaitForSerfCheck(t) defer a.Stop() // Set up consul client @@ -209,7 +241,7 @@ func TestHandler_MutateWithNamespaces(t *testing.T) { tt.Handler.ConsulClient = client // Mutate! - resp := tt.Handler.Mutate(&tt.Req) + resp := tt.Handler.Handle(context.Background(), tt.Req) require.Equal(resp.Allowed, true) // Check all the namespace things @@ -251,91 +283,108 @@ func TestHandler_MutateWithNamespaces_ACLs(t *testing.T) { }, } + s := runtime.NewScheme() + s.AddKnownTypes(schema.GroupVersion{Group: "", Version: "v1"}, &corev1.Pod{}) + decoder, err := admission.NewDecoder(s) + require.NoError(t, err) + cases := []struct { Name string Handler Handler - Req v1beta1.AdmissionRequest + Req admission.Request ExpectedNamespaces []string }{ { - "acls + single destination namespace 'default' from k8s 'default'", - Handler{ + Name: "acls + single destination namespace 'default' from k8s 'default'", + Handler: Handler{ Log: hclog.Default().Named("handler"), AllowK8sNamespacesSet: mapset.NewSet("*"), DenyK8sNamespacesSet: mapset.NewSet(), EnableNamespaces: true, ConsulDestinationNamespace: "default", CrossNamespaceACLPolicy: "cross-namespace-policy", + decoder: decoder, }, - v1beta1.AdmissionRequest{ - Object: encodeRaw(t, &corev1.Pod{ - Spec: basicSpec, - }), - Namespace: "default", + Req: admission.Request{ + AdmissionRequest: v1beta1.AdmissionRequest{ + Object: encodeRaw(t, &corev1.Pod{ + Spec: basicSpec, + }), + Namespace: "default", + }, }, - []string{"default"}, + ExpectedNamespaces: []string{"default"}, }, { - "acls + single destination namespace 'default' from k8s 'non-default'", - Handler{ + Name: "acls + single destination namespace 'default' from k8s 'non-default'", + Handler: Handler{ Log: hclog.Default().Named("handler"), AllowK8sNamespacesSet: mapset.NewSet("*"), DenyK8sNamespacesSet: mapset.NewSet(), EnableNamespaces: true, ConsulDestinationNamespace: "default", CrossNamespaceACLPolicy: "cross-namespace-policy", + decoder: decoder, }, - v1beta1.AdmissionRequest{ - Object: encodeRaw(t, &corev1.Pod{ - Spec: basicSpec, - }), - Namespace: "non-default", + Req: admission.Request{ + AdmissionRequest: v1beta1.AdmissionRequest{ + Object: encodeRaw(t, &corev1.Pod{ + Spec: basicSpec, + }), + Namespace: "non-default", + }, }, - []string{"default"}, + ExpectedNamespaces: []string{"default"}, }, { - "acls + single destination namespace 'dest' from k8s 'default'", - Handler{ + Name: "acls + single destination namespace 'dest' from k8s 'default'", + Handler: Handler{ Log: hclog.Default().Named("handler"), AllowK8sNamespacesSet: mapset.NewSet("*"), DenyK8sNamespacesSet: mapset.NewSet(), EnableNamespaces: true, ConsulDestinationNamespace: "dest", CrossNamespaceACLPolicy: "cross-namespace-policy", + decoder: decoder, }, - v1beta1.AdmissionRequest{ - Object: encodeRaw(t, &corev1.Pod{ - Spec: basicSpec, - }), - Namespace: "default", + Req: admission.Request{ + AdmissionRequest: v1beta1.AdmissionRequest{ + Object: encodeRaw(t, &corev1.Pod{ + Spec: basicSpec, + }), + Namespace: "default", + }, }, - []string{"default", "dest"}, + ExpectedNamespaces: []string{"default", "dest"}, }, { - "acls + single destination namespace 'dest' from k8s 'non-default'", - Handler{ + Name: "acls + single destination namespace 'dest' from k8s 'non-default'", + Handler: Handler{ Log: hclog.Default().Named("handler"), AllowK8sNamespacesSet: mapset.NewSet("*"), DenyK8sNamespacesSet: mapset.NewSet(), EnableNamespaces: true, ConsulDestinationNamespace: "dest", CrossNamespaceACLPolicy: "cross-namespace-policy", + decoder: decoder, }, - v1beta1.AdmissionRequest{ - Object: encodeRaw(t, &corev1.Pod{ - Spec: basicSpec, - }), - Namespace: "non-default", + Req: admission.Request{ + AdmissionRequest: v1beta1.AdmissionRequest{ + Object: encodeRaw(t, &corev1.Pod{ + Spec: basicSpec, + }), + Namespace: "non-default", + }, }, - []string{"default", "dest"}, + ExpectedNamespaces: []string{"default", "dest"}, }, { - "acls + mirroring from k8s 'default'", - Handler{ + Name: "acls + mirroring from k8s 'default'", + Handler: Handler{ Log: hclog.Default().Named("handler"), AllowK8sNamespacesSet: mapset.NewSet("*"), DenyK8sNamespacesSet: mapset.NewSet(), @@ -343,19 +392,22 @@ func TestHandler_MutateWithNamespaces_ACLs(t *testing.T) { ConsulDestinationNamespace: "default", // will be overridden EnableK8SNSMirroring: true, CrossNamespaceACLPolicy: "cross-namespace-policy", + decoder: decoder, }, - v1beta1.AdmissionRequest{ - Object: encodeRaw(t, &corev1.Pod{ - Spec: basicSpec, - }), - Namespace: "default", + Req: admission.Request{ + AdmissionRequest: v1beta1.AdmissionRequest{ + Object: encodeRaw(t, &corev1.Pod{ + Spec: basicSpec, + }), + Namespace: "default", + }, }, - []string{"default"}, + ExpectedNamespaces: []string{"default"}, }, { - "acls + mirroring from k8s 'dest'", - Handler{ + Name: "acls + mirroring from k8s 'dest'", + Handler: Handler{ Log: hclog.Default().Named("handler"), AllowK8sNamespacesSet: mapset.NewSet("*"), DenyK8sNamespacesSet: mapset.NewSet(), @@ -363,19 +415,22 @@ func TestHandler_MutateWithNamespaces_ACLs(t *testing.T) { ConsulDestinationNamespace: "default", // will be overridden EnableK8SNSMirroring: true, CrossNamespaceACLPolicy: "cross-namespace-policy", + decoder: decoder, }, - v1beta1.AdmissionRequest{ - Object: encodeRaw(t, &corev1.Pod{ - Spec: basicSpec, - }), - Namespace: "dest", + Req: admission.Request{ + AdmissionRequest: v1beta1.AdmissionRequest{ + Object: encodeRaw(t, &corev1.Pod{ + Spec: basicSpec, + }), + Namespace: "dest", + }, }, - []string{"default", "dest"}, + ExpectedNamespaces: []string{"default", "dest"}, }, { - "acls + mirroring with prefix from k8s 'default'", - Handler{ + Name: "acls + mirroring with prefix from k8s 'default'", + Handler: Handler{ Log: hclog.Default().Named("handler"), AllowK8sNamespacesSet: mapset.NewSet("*"), DenyK8sNamespacesSet: mapset.NewSet(), @@ -384,19 +439,22 @@ func TestHandler_MutateWithNamespaces_ACLs(t *testing.T) { EnableK8SNSMirroring: true, K8SNSMirroringPrefix: "k8s-", CrossNamespaceACLPolicy: "cross-namespace-policy", + decoder: decoder, }, - v1beta1.AdmissionRequest{ - Object: encodeRaw(t, &corev1.Pod{ - Spec: basicSpec, - }), - Namespace: "default", + Req: admission.Request{ + AdmissionRequest: v1beta1.AdmissionRequest{ + Object: encodeRaw(t, &corev1.Pod{ + Spec: basicSpec, + }), + Namespace: "default", + }, }, - []string{"default", "k8s-default"}, + ExpectedNamespaces: []string{"default", "k8s-default"}, }, { - "acls + mirroring with prefix from k8s 'dest'", - Handler{ + Name: "acls + mirroring with prefix from k8s 'dest'", + Handler: Handler{ Log: hclog.Default().Named("handler"), AllowK8sNamespacesSet: mapset.NewSet("*"), DenyK8sNamespacesSet: mapset.NewSet(), @@ -405,14 +463,17 @@ func TestHandler_MutateWithNamespaces_ACLs(t *testing.T) { EnableK8SNSMirroring: true, K8SNSMirroringPrefix: "k8s-", CrossNamespaceACLPolicy: "cross-namespace-policy", + decoder: decoder, }, - v1beta1.AdmissionRequest{ - Object: encodeRaw(t, &corev1.Pod{ - Spec: basicSpec, - }), - Namespace: "dest", + Req: admission.Request{ + AdmissionRequest: v1beta1.AdmissionRequest{ + Object: encodeRaw(t, &corev1.Pod{ + Spec: basicSpec, + }), + Namespace: "dest", + }, }, - []string{"default", "k8s-dest"}, + ExpectedNamespaces: []string{"default", "k8s-dest"}, }, } @@ -422,6 +483,7 @@ func TestHandler_MutateWithNamespaces_ACLs(t *testing.T) { a, err := testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) { c.ACL.Enabled = true }) + a.WaitForSerfCheck(t) defer a.Stop() // Set up a client for bootstrapping @@ -472,7 +534,7 @@ func TestHandler_MutateWithNamespaces_ACLs(t *testing.T) { require.NoError(t, err) // Mutate! - resp := tt.Handler.Mutate(&tt.Req) + resp := tt.Handler.Handle(context.Background(), tt.Req) require.Equal(t, resp.Allowed, true) // Check all the namespace things @@ -548,8 +610,14 @@ func TestHandler_MutateWithNamespaces_Annotation(t *testing.T) { // Set up consul server a, err := testutil.NewTestServerConfigT(t, nil) require.NoError(err) + a.WaitForSerfCheck(t) defer a.Stop() + s := runtime.NewScheme() + s.AddKnownTypes(schema.GroupVersion{Group: "", Version: "v1"}, &corev1.Pod{}) + decoder, err := admission.NewDecoder(s) + require.NoError(err) + // Set up consul client client, err := api.NewClient(&api.Config{ Address: a.HTTPAddr, @@ -565,29 +633,39 @@ func TestHandler_MutateWithNamespaces_Annotation(t *testing.T) { EnableK8SNSMirroring: c.Mirroring, K8SNSMirroringPrefix: c.MirroringPrefix, ConsulClient: client, + decoder: decoder, } - resp := handler.Mutate(&v1beta1.AdmissionRequest{ - Object: encodeRaw(t, &corev1.Pod{ - Spec: corev1.PodSpec{ - Containers: []corev1.Container{ - { - Name: "web", - }, + pod := corev1.Pod{ + ObjectMeta: v1.ObjectMeta{ + Namespace: sourceKubeNS, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "web", }, }, - }), - Namespace: sourceKubeNS, - }) + }, + } + request := admission.Request{ + AdmissionRequest: v1beta1.AdmissionRequest{ + Object: encodeRaw(t, &pod), + Namespace: sourceKubeNS, + }, + } + resp := handler.Handle(context.Background(), request) require.Equal(resp.Allowed, true) // Check that the annotation was added as a patch. var consulNamespaceAnnotationValue string - var patches []jsonpatch.JsonPatchOperation - require.NoError(json.Unmarshal(resp.Patch, &patches)) - for _, patch := range patches { - if patch.Path == "/metadata/annotations/"+escapeJSONPointer(annotationConsulNamespace) { - consulNamespaceAnnotationValue = patch.Value.(string) + for _, patch := range resp.Patches { + if patch.Path == "/metadata/annotations" { + for annotationName, annotationValue := range patch.Value.(map[string]interface{}) { + if annotationName == annotationConsulNamespace { + consulNamespaceAnnotationValue = annotationValue.(string) + } + } } } require.NotEmpty(consulNamespaceAnnotationValue, "no namespace annotation set") diff --git a/connect-inject/handler_test.go b/connect-inject/handler_test.go index ca785ea9bb..68d7877ff7 100644 --- a/connect-inject/handler_test.go +++ b/connect-inject/handler_test.go @@ -1,19 +1,21 @@ package connectinject import ( + "context" "encoding/json" - "net/http" - "net/http/httptest" + "fmt" "testing" "github.com/deckarep/golang-set" "github.com/hashicorp/go-hclog" - "github.com/mattbaird/jsonpatch" "github.com/stretchr/testify/require" + "gomodules.xyz/jsonpatch/v2" "k8s.io/api/admission/v1beta1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/controller-runtime/pkg/webhook/admission" ) func TestHandlerHandle(t *testing.T) { @@ -24,13 +26,20 @@ func TestHandlerHandle(t *testing.T) { }, }, } + s := runtime.NewScheme() + s.AddKnownTypes(schema.GroupVersion{ + Group: "", + Version: "v1", + }, &corev1.Pod{}) + decoder, err := admission.NewDecoder(s) + require.NoError(t, err) cases := []struct { Name string Handler Handler - Req v1beta1.AdmissionRequest + Req admission.Request Err string // expected error string, not exact - Patches []jsonpatch.JsonPatchOperation + Patches []jsonpatch.Operation }{ { "kube-system namespace", @@ -38,12 +47,15 @@ func TestHandlerHandle(t *testing.T) { Log: hclog.Default().Named("handler"), AllowK8sNamespacesSet: mapset.NewSetWith("*"), DenyK8sNamespacesSet: mapset.NewSet(), + decoder: decoder, }, - v1beta1.AdmissionRequest{ - Namespace: metav1.NamespaceSystem, - Object: encodeRaw(t, &corev1.Pod{ - Spec: basicSpec, - }), + admission.Request{ + AdmissionRequest: v1beta1.AdmissionRequest{ + Namespace: metav1.NamespaceSystem, + Object: encodeRaw(t, &corev1.Pod{ + Spec: basicSpec, + }), + }, }, "", nil, @@ -55,17 +67,19 @@ func TestHandlerHandle(t *testing.T) { Log: hclog.Default().Named("handler"), AllowK8sNamespacesSet: mapset.NewSetWith("*"), DenyK8sNamespacesSet: mapset.NewSet(), - }, - v1beta1.AdmissionRequest{ - Object: encodeRaw(t, &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Annotations: map[string]string{ - annotationStatus: injected, + decoder: decoder, + }, + admission.Request{ + AdmissionRequest: v1beta1.AdmissionRequest{ + Object: encodeRaw(t, &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + annotationStatus: injected, + }, }, - }, - - Spec: basicSpec, - }), + Spec: basicSpec, + }), + }, }, "", nil, @@ -77,45 +91,40 @@ func TestHandlerHandle(t *testing.T) { Log: hclog.Default().Named("handler"), AllowK8sNamespacesSet: mapset.NewSetWith("*"), DenyK8sNamespacesSet: mapset.NewSet(), + decoder: decoder, }, - v1beta1.AdmissionRequest{ - Object: encodeRaw(t, &corev1.Pod{ - Spec: basicSpec, - }), + admission.Request{ + AdmissionRequest: v1beta1.AdmissionRequest{ + Object: encodeRaw(t, &corev1.Pod{ + Spec: basicSpec, + }), + }, }, "", - []jsonpatch.JsonPatchOperation{ - { - Operation: "add", - Path: "/metadata/annotations", - }, - { - Operation: "add", - Path: "/spec/volumes", - }, + []jsonpatch.Operation{ { Operation: "add", - Path: "/spec/initContainers", + Path: "/metadata/labels", }, { Operation: "add", - Path: "/spec/initContainers/-", + Path: "/metadata/annotations", }, { Operation: "add", - Path: "/spec/containers/-", + Path: "/spec/volumes", }, { Operation: "add", - Path: "/spec/containers/-", + Path: "/spec/initContainers", }, { Operation: "add", - Path: "/metadata/annotations/" + escapeJSONPointer(annotationStatus), + Path: "/spec/containers/1", }, { Operation: "add", - Path: "/metadata/labels", + Path: "/spec/containers/2", }, }, }, @@ -126,43 +135,37 @@ func TestHandlerHandle(t *testing.T) { Log: hclog.Default().Named("handler"), AllowK8sNamespacesSet: mapset.NewSetWith("*"), DenyK8sNamespacesSet: mapset.NewSet(), - }, - v1beta1.AdmissionRequest{ - Object: encodeRaw(t, &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Annotations: map[string]string{ - annotationUpstreams: "echo:1234,db:1234", + decoder: decoder, + }, + admission.Request{ + AdmissionRequest: v1beta1.AdmissionRequest{ + Object: encodeRaw(t, &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + annotationUpstreams: "echo:1234,db:1234", + }, }, - }, - - Spec: basicSpec, - }), + Spec: basicSpec, + }), + }, }, "", - []jsonpatch.JsonPatchOperation{ + []jsonpatch.Operation{ { Operation: "add", - Path: "/metadata/annotations/" + escapeJSONPointer(annotationService), - }, - { - Operation: "add", - Path: "/spec/volumes", - }, - { - Operation: "add", - Path: "/spec/containers/0/env", + Path: "/metadata/labels", }, { Operation: "add", - Path: "/spec/containers/0/env/-", + Path: "/metadata/annotations/" + escapeJSONPointer(annotationStatus), }, { Operation: "add", - Path: "/spec/containers/0/env/-", + Path: "/metadata/annotations/" + escapeJSONPointer(annotationService), }, { Operation: "add", - Path: "/spec/containers/0/env/-", + Path: "/spec/volumes", }, { Operation: "add", @@ -170,23 +173,11 @@ func TestHandlerHandle(t *testing.T) { }, { Operation: "add", - Path: "/spec/initContainers/-", - }, - { - Operation: "add", - Path: "/spec/containers/-", - }, - { - Operation: "add", - Path: "/spec/containers/-", + Path: "/spec/containers/1", }, { Operation: "add", - Path: "/metadata/annotations/" + escapeJSONPointer(annotationStatus), - }, - { - Operation: "add", - Path: "/metadata/labels", + Path: "/spec/containers/2", }, }, }, @@ -197,17 +188,19 @@ func TestHandlerHandle(t *testing.T) { Log: hclog.Default().Named("handler"), AllowK8sNamespacesSet: mapset.NewSetWith("*"), DenyK8sNamespacesSet: mapset.NewSet(), - }, - v1beta1.AdmissionRequest{ - Object: encodeRaw(t, &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Annotations: map[string]string{ - annotationInject: "false", + decoder: decoder, + }, + admission.Request{ + AdmissionRequest: v1beta1.AdmissionRequest{ + Object: encodeRaw(t, &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + annotationInject: "false", + }, }, - }, - - Spec: basicSpec, - }), + Spec: basicSpec, + }), + }, }, "", nil, @@ -219,20 +212,22 @@ func TestHandlerHandle(t *testing.T) { Log: hclog.Default().Named("handler"), AllowK8sNamespacesSet: mapset.NewSetWith("*"), DenyK8sNamespacesSet: mapset.NewSet(), - }, - v1beta1.AdmissionRequest{ - Object: encodeRaw(t, &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Annotations: map[string]string{ - annotationInject: "t", + decoder: decoder, + }, + admission.Request{ + AdmissionRequest: v1beta1.AdmissionRequest{ + Object: encodeRaw(t, &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + annotationInject: "t", + }, }, - }, - - Spec: basicSpec, - }), + Spec: basicSpec, + }), + }, }, "", - []jsonpatch.JsonPatchOperation{ + []jsonpatch.Operation{ { Operation: "add", Path: "/metadata/annotations/" + escapeJSONPointer(annotationService), @@ -247,15 +242,11 @@ func TestHandlerHandle(t *testing.T) { }, { Operation: "add", - Path: "/spec/initContainers/-", + Path: "/spec/containers/1", }, { Operation: "add", - Path: "/spec/containers/-", - }, - { - Operation: "add", - Path: "/spec/containers/-", + Path: "/spec/containers/2", }, { Operation: "add", @@ -274,19 +265,22 @@ func TestHandlerHandle(t *testing.T) { Log: hclog.Default().Named("handler"), AllowK8sNamespacesSet: mapset.NewSetWith("*"), DenyK8sNamespacesSet: mapset.NewSet(), - }, - v1beta1.AdmissionRequest{ - Object: encodeRaw(t, &corev1.Pod{ - Spec: basicSpec, - ObjectMeta: metav1.ObjectMeta{ - Annotations: map[string]string{ - annotationService: "foo", + decoder: decoder, + }, + admission.Request{ + AdmissionRequest: v1beta1.AdmissionRequest{ + Object: encodeRaw(t, &corev1.Pod{ + Spec: basicSpec, + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + annotationService: "foo", + }, }, - }, - }), + }), + }, }, "", - []jsonpatch.JsonPatchOperation{ + []jsonpatch.Operation{ { Operation: "add", Path: "/spec/volumes", @@ -297,15 +291,11 @@ func TestHandlerHandle(t *testing.T) { }, { Operation: "add", - Path: "/spec/initContainers/-", - }, - { - Operation: "add", - Path: "/spec/containers/-", + Path: "/spec/containers/1", }, { Operation: "add", - Path: "/spec/containers/-", + Path: "/spec/containers/2", }, { Operation: "add", @@ -324,24 +314,22 @@ func TestHandlerHandle(t *testing.T) { Log: hclog.Default().Named("handler"), AllowK8sNamespacesSet: mapset.NewSetWith("*"), DenyK8sNamespacesSet: mapset.NewSet(), - }, - v1beta1.AdmissionRequest{ - Object: encodeRaw(t, &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - "testLabel": "123", + decoder: decoder, + }, + admission.Request{ + AdmissionRequest: v1beta1.AdmissionRequest{ + Object: encodeRaw(t, &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "testLabel": "123", + }, }, - }, - - Spec: basicSpec, - }), + Spec: basicSpec, + }), + }, }, "", - []jsonpatch.JsonPatchOperation{ - { - Operation: "add", - Path: "/metadata/annotations", - }, + []jsonpatch.Operation{ { Operation: "add", Path: "/spec/volumes", @@ -352,19 +340,15 @@ func TestHandlerHandle(t *testing.T) { }, { Operation: "add", - Path: "/spec/initContainers/-", - }, - { - Operation: "add", - Path: "/spec/containers/-", + Path: "/spec/containers/1", }, { Operation: "add", - Path: "/spec/containers/-", + Path: "/spec/containers/2", }, { Operation: "add", - Path: "/metadata/annotations/" + escapeJSONPointer(annotationStatus), + Path: "/metadata/annotations", }, { Operation: "add", @@ -377,7 +361,8 @@ func TestHandlerHandle(t *testing.T) { for _, tt := range cases { t.Run(tt.Name, func(t *testing.T) { require := require.New(t) - resp := tt.Handler.Mutate(&tt.Req) + ctx := context.Background() + resp := tt.Handler.Handle(ctx, tt.Req) if (tt.Err == "") != resp.Allowed { t.Fatalf("allowed: %v, expected err: %v", resp.Allowed, tt.Err) } @@ -386,14 +371,14 @@ func TestHandlerHandle(t *testing.T) { return } - var actual []jsonpatch.JsonPatchOperation - if len(resp.Patch) > 0 { - require.NoError(json.Unmarshal(resp.Patch, &actual)) + actual := resp.Patches + fmt.Println(resp.Patches) + if len(actual) > 0 { for i, _ := range actual { actual[i].Value = nil } } - require.Equal(tt.Patches, actual) + require.ElementsMatch(tt.Patches, actual) }) } } @@ -401,67 +386,44 @@ func TestHandlerHandle(t *testing.T) { // Test that we error out if the protocol annotation is set. func TestHandler_ErrorsOnProtocolAnnotations(t *testing.T) { require := require.New(t) + s := runtime.NewScheme() + s.AddKnownTypes(schema.GroupVersion{ + Group: "", + Version: "v1", + }, &corev1.Pod{}) + decoder, err := admission.NewDecoder(s) + require.NoError(err) + handler := Handler{ Log: hclog.Default().Named("handler"), AllowK8sNamespacesSet: mapset.NewSetWith("*"), DenyK8sNamespacesSet: mapset.NewSet(), + decoder: decoder, } - request := v1beta1.AdmissionRequest{ - Namespace: "default", - Object: encodeRaw(t, &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Annotations: map[string]string{ - annotationProtocol: "http", + request := admission.Request{ + AdmissionRequest: v1beta1.AdmissionRequest{ + Namespace: "default", + Object: encodeRaw(t, &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + annotationProtocol: "http", + }, }, - }, - Spec: corev1.PodSpec{ - Containers: []corev1.Container{ - { - Name: "web", + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "web", + }, }, }, - }, - }), + }), + }, } - response := handler.Mutate(&request) + response := handler.Handle(context.Background(), request) require.False(response.Allowed) - require.Equal(response.Result.Message, "Error validating pod: the \"consul.hashicorp.com/connect-service-protocol\" annotation is no longer supported. Instead, create a ServiceDefaults resource (see www.consul.io/docs/k8s/crds/upgrade-to-crds)") -} - -// Test that an incorrect content type results in an error. -func TestHandlerHandle_badContentType(t *testing.T) { - req, err := http.NewRequest("POST", "/", nil) - require.NoError(t, err) - req.Header.Set("Content-Type", "text/plain") - - h := Handler{ - Log: hclog.Default().Named("handler"), - AllowK8sNamespacesSet: mapset.NewSetWith("*"), - DenyK8sNamespacesSet: mapset.NewSet(), - } - rec := httptest.NewRecorder() - h.Handle(rec, req) - require.Equal(t, http.StatusBadRequest, rec.Code) - require.Contains(t, rec.Body.String(), "content-type") -} - -// Test that no body results in an error -func TestHandlerHandle_noBody(t *testing.T) { - req, err := http.NewRequest("POST", "/", nil) - require.NoError(t, err) - req.Header.Set("Content-Type", "application/json") - - h := Handler{ - Log: hclog.Default().Named("handler"), - AllowK8sNamespacesSet: mapset.NewSetWith("*"), - DenyK8sNamespacesSet: mapset.NewSet(), - } - rec := httptest.NewRecorder() - h.Handle(rec, req) - require.Equal(t, http.StatusBadRequest, rec.Code) - require.Contains(t, rec.Body.String(), "body") + require.Equal("the \"consul.hashicorp.com/connect-service-protocol\" annotation is no longer supported. Instead, create a ServiceDefaults resource (see www.consul.io/docs/k8s/crds/upgrade-to-crds)", response.Result.Message) } func TestHandlerDefaultAnnotations(t *testing.T) { @@ -587,8 +549,7 @@ func TestHandlerDefaultAnnotations(t *testing.T) { require := require.New(t) var h Handler - var patches []jsonpatch.JsonPatchOperation - err := h.defaultAnnotations(tt.Pod, &patches) + err := h.defaultAnnotations(tt.Pod) if (tt.Err != "") != (err != nil) { t.Fatalf("actual: %v, expected err: %v", err, tt.Err) } @@ -678,7 +639,7 @@ func TestHandlerEnableMetrics(t *testing.T) { require := require.New(t) h := tt.Handler - actual, err := h.enableMetrics(tt.Pod(minimal())) + actual, err := h.enableMetrics(*tt.Pod(minimal())) if tt.Err == "" { require.Equal(tt.Expected, actual) @@ -740,7 +701,7 @@ func TestHandlerEnableMetricsMerging(t *testing.T) { require := require.New(t) h := tt.Handler - actual, err := h.enableMetricsMerging(tt.Pod(minimal())) + actual, err := h.enableMetricsMerging(*tt.Pod(minimal())) if tt.Err == "" { require.Equal(tt.Expected, actual) @@ -789,7 +750,7 @@ func TestHandlerServiceMetricsPort(t *testing.T) { require := require.New(t) h := Handler{} - actual, err := h.serviceMetricsPort(tt.Pod(minimal())) + actual, err := h.serviceMetricsPort(*tt.Pod(minimal())) require.Equal(tt.Expected, actual) require.NoError(err) @@ -825,7 +786,7 @@ func TestHandlerServiceMetricsPath(t *testing.T) { require := require.New(t) h := Handler{} - actual := h.serviceMetricsPath(tt.Pod(minimal())) + actual := h.serviceMetricsPath(*tt.Pod(minimal())) require.Equal(tt.Expected, actual) }) @@ -867,7 +828,7 @@ func TestHandlerPrometheusScrapePath(t *testing.T) { require := require.New(t) h := tt.Handler - actual := h.prometheusScrapePath(tt.Pod(minimal())) + actual := h.prometheusScrapePath(*tt.Pod(minimal())) require.Equal(tt.Expected, actual) }) @@ -882,10 +843,7 @@ func TestHandlerPrometheusAnnotations(t *testing.T) { Expected map[string]string }{ { - Name: "Returns the correct prometheus annotations", - Pod: func(pod *corev1.Pod) *corev1.Pod { - return pod - }, + Name: "Sets the correct prometheus annotations on the pod", Handler: Handler{ DefaultEnableMetrics: true, DefaultPrometheusScrapePort: "20200", @@ -898,16 +856,13 @@ func TestHandlerPrometheusAnnotations(t *testing.T) { }, }, { - Name: "Returns nil if metrics are not enabled", - Pod: func(pod *corev1.Pod) *corev1.Pod { - return pod - }, + Name: "Does not set annotations if metrics are not enabled", Handler: Handler{ DefaultEnableMetrics: false, DefaultPrometheusScrapePort: "20200", DefaultPrometheusScrapePath: "/metrics", }, - Expected: nil, + Expected: map[string]string{}, }, } @@ -915,11 +870,12 @@ func TestHandlerPrometheusAnnotations(t *testing.T) { t.Run(tt.Name, func(t *testing.T) { require := require.New(t) h := tt.Handler + pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{}}} - actual, err := h.prometheusAnnotations(tt.Pod(minimal())) - - require.Equal(tt.Expected, actual) + err := h.prometheusAnnotations(pod) require.NoError(err) + + require.Equal(pod.Annotations, tt.Expected) }) } } @@ -964,7 +920,7 @@ func TestHandlerShouldRunMergedMetricsServer(t *testing.T) { require := require.New(t) h := tt.Handler - actual, err := h.shouldRunMergedMetricsServer(tt.Pod(minimal())) + actual, err := h.shouldRunMergedMetricsServer(*tt.Pod(minimal())) require.Equal(tt.Expected, actual) require.NoError(err) @@ -1103,7 +1059,7 @@ func TestHandlerDetermineAndValidatePort(t *testing.T) { t.Run(tt.Name, func(t *testing.T) { require := require.New(t) - actual, err := determineAndValidatePort(tt.Pod(minimal()), tt.Annotation, tt.DefaultPort, tt.Privileged) + actual, err := determineAndValidatePort(*tt.Pod(minimal()), tt.Annotation, tt.DefaultPort, tt.Privileged) if tt.Err == "" { require.NoError(err) @@ -1188,7 +1144,7 @@ func TestHandlerPortValue(t *testing.T) { t.Run(tt.Name, func(t *testing.T) { require := require.New(t) - port, err := portValue(tt.Pod, tt.Value) + port, err := portValue(*tt.Pod, tt.Value) if (tt.Err != "") != (err != nil) { t.Fatalf("actual: %v, expected err: %v", err, tt.Err) } @@ -1597,7 +1553,7 @@ func TestShouldInject(t *testing.T) { DenyK8sNamespacesSet: tt.DenyK8sNamespacesSet, } - injected, err := h.shouldInject(tt.Pod, tt.K8sNamespace) + injected, err := h.shouldInject(*tt.Pod, tt.K8sNamespace) require.Equal(nil, err) require.Equal(tt.Expected, injected) diff --git a/go.mod b/go.mod index 3f1387eb79..b39f943515 100644 --- a/go.mod +++ b/go.mod @@ -29,7 +29,6 @@ require ( github.com/mitchellh/go-homedir v1.1.0 github.com/mitchellh/go-testing-interface v1.14.0 // indirect github.com/mitchellh/mapstructure v1.4.1 // indirect - github.com/radovskyb/watcher v1.0.2 github.com/stretchr/testify v1.5.1 go.opencensus.io v0.22.0 // indirect go.uber.org/zap v1.10.0 diff --git a/go.sum b/go.sum index ce44b79d1e..3d84126ed8 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,4 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= -cloud.google.com/go v0.34.0 h1:eOI3/cP2VTU6uZLDYAoic+eyzzB9YyGmJ7eIjl8rOPg= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.38.0 h1:ROfEUZz+Gh5pa62DJWXSaonyu3StP6EA6lPEXPI6mCo= cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU= @@ -130,7 +129,6 @@ github.com/evanphx/json-patch v4.5.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLi github.com/evanphx/json-patch v4.9.0+incompatible h1:kLcOMZeuLAJvL2BPWLMIj5oaZQobrkAqrL+WFZwQses= github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= -github.com/fatih/color v1.9.0 h1:8xPHl4/q1VyqGIPif1F+1V3Y3lSmrq01EabUW3CoW5s= github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg= github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= @@ -261,13 +259,11 @@ github.com/hashicorp/consul/sdk v0.7.0/go.mod h1:fY08Y9z5SvJqevyZNy6WWPXiG3KwBPA github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= -github.com/hashicorp/go-cleanhttp v0.5.1 h1:dH3aiDG9Jvb5r5+bYHsikaOUIpcM0xvgMXVoDkXMzJM= github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48= github.com/hashicorp/go-discover v0.0.0-20200812215701-c4b85f6ed31f h1:7WFMVeuJQp6BkzuTv9O52pzwtEFVUJubKYN+zez8eTI= github.com/hashicorp/go-discover v0.0.0-20200812215701-c4b85f6ed31f/go.mod h1:D4eo8/CN92vm9/9UDG+ldX1/fMFa4kpl8qzyTolus8o= -github.com/hashicorp/go-hclog v0.12.0 h1:d4QkX8FRTYaKaCZBoXYY8zJX2BXjWxurN/GA2tkrmZM= github.com/hashicorp/go-hclog v0.12.0/go.mod h1:whpDNt7SSdeAju8AWKIWsul05p54N/39EeqMAyrmvFQ= github.com/hashicorp/go-hclog v0.15.0 h1:qMuK0wxsoW4D0ddCCYwPSTm4KQv1X1ke3WmPWZ0Mvsk= github.com/hashicorp/go-hclog v0.15.0/go.mod h1:whpDNt7SSdeAju8AWKIWsul05p54N/39EeqMAyrmvFQ= @@ -310,7 +306,6 @@ github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJ github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/imdario/mergo v0.3.9 h1:UauaLniWCFHWd+Jp9oCEkTBj8VO/9DKg3PV3VCNMDIg= github.com/imdario/mergo v0.3.9/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= -github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/jackc/fake v0.0.0-20150926172116-812a484cc733/go.mod h1:WrMFNQdiFJ80sQsxDoMokWK1W5TQtxBFNpzWTD84ibQ= github.com/jackc/pgx v3.3.0+incompatible/go.mod h1:0ZGrqGqkRlliWnWB4zKnWtjbSWbGkVEFm4TeybAXq+I= @@ -358,7 +353,6 @@ github.com/mattbaird/jsonpatch v0.0.0-20171005235357-81af80346b1a h1:+J2gw7Bw77w github.com/mattbaird/jsonpatch v0.0.0-20171005235357-81af80346b1a/go.mod h1:M1qoD/MqPgTZIk0EWKB38wE28ACRfVcn+cU08jyArI0= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= -github.com/mattn/go-colorable v0.1.6 h1:6Su7aK7lXmJ/U79bYtBjLNaha4Fs1Rg9plHpcH+vvnE= github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-colorable v0.1.8 h1:c1ghPdyEDarC70ftn0y+A/Ee++9zz8ljHG1b13eJ0s8= github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= @@ -455,8 +449,6 @@ github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+Gx github.com/prometheus/procfs v0.0.11 h1:DhHlBtkHWPYi8O2y31JkK0TF+DGM+51OopZjH/Ia5qI= github.com/prometheus/procfs v0.0.11/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= -github.com/radovskyb/watcher v1.0.2 h1:9L5TsZUbo1nKhQEQPtICVc+x9UZQ6VPdBepLHyGw/bQ= -github.com/radovskyb/watcher v1.0.2/go.mod h1:78okwvY5wPdzcb1UYnip1pvrZNIVEIh/Cm+ZuvsUYIg= github.com/renier/xmlrpc v0.0.0-20170708154548-ce4a1a486c03 h1:Wdi9nwnhFNAlseAOekn6B5G/+GMtks9UKbvRU/CMM/o= github.com/renier/xmlrpc v0.0.0-20170708154548-ce4a1a486c03/go.mod h1:gRAiPF5C5Nd0eyyRdqIu9qTiFSoZzpTq727b5B8fkkU= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= diff --git a/helper/cert/source_disk.go b/helper/cert/source_disk.go deleted file mode 100644 index cead5b20fa..0000000000 --- a/helper/cert/source_disk.go +++ /dev/null @@ -1,103 +0,0 @@ -package cert - -import ( - "context" - "fmt" - "io/ioutil" - "time" - - "github.com/radovskyb/watcher" -) - -// DiskSource sources certificates from files on disk. It sets up a -// file watcher that detects when the content changes and sends an update -// on the configured channel. -type DiskSource struct { - CertPath string // CertPath is the path to the PEM-encoded cert - KeyPath string // KeyPath is the path to the PEM-encoded private key - CAPath string // CAPath is the path to the PEM-encoded CA root bundle (optional) - - pollInterval time.Duration -} - -// Certificate implements Source -func (s *DiskSource) Certificate(ctx context.Context, last *Bundle) (Bundle, error) { - // Setup the poll interval - pollInterval := s.pollInterval - if pollInterval == 0 { - pollInterval = 250 * time.Millisecond - } - - // Setup the file watcher. We do this first so taht there isn't a race - // between reading the files below initially and detecting a change. - w := watcher.New() - defer w.Close() - w.SetMaxEvents(1) - if err := w.Add(s.CertPath); err != nil { - return Bundle{}, err - } - if err := w.Add(s.KeyPath); err != nil { - return Bundle{}, err - } - go w.Start(pollInterval) - w.Wait() - - // At this point the file watcher is started and we can start reading - // events. But we want to load the files as-is right now so we can - // detect if there is change. - for { - // Always load the current. If they don't exist yet or something - // just return the error since the higher level system will retry. - bundle, err := s.loadCerts() - if err != nil { - return bundle, err - } - - // If there was no prior certificate bundle or the bundle has - // changed, then return it. - if last == nil || !last.Equal(&bundle) { - return bundle, nil - } - - // No change in the bundle, let's wait for a change from the watcher - select { - case <-w.Event: - // Fall through the loop so that we reload the certs from disk - - case err := <-w.Error: - return bundle, err - - case <-w.Closed: - return bundle, fmt.Errorf("filesystem watcher closed") - - case <-ctx.Done(): - return bundle, ctx.Err() - } - } -} - -func (s *DiskSource) loadCerts() (Bundle, error) { - certPEMBlock, err := ioutil.ReadFile(s.CertPath) - if err != nil { - return Bundle{}, err - } - - keyPEMBlock, err := ioutil.ReadFile(s.KeyPath) - if err != nil { - return Bundle{}, err - } - - var caPEMBlock []byte - if s.CAPath != "" { - caPEMBlock, err = ioutil.ReadFile(s.CAPath) - if err != nil { - return Bundle{}, err - } - } - - return Bundle{ - Cert: certPEMBlock, - Key: keyPEMBlock, - CACert: caPEMBlock, - }, nil -} diff --git a/helper/cert/source_disk_test.go b/helper/cert/source_disk_test.go deleted file mode 100644 index c34e63e49e..0000000000 --- a/helper/cert/source_disk_test.go +++ /dev/null @@ -1,94 +0,0 @@ -package cert - -import ( - "context" - "io/ioutil" - "os" - "path/filepath" - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -// Test that an error is immediately returned with no files -func TestGenDisk_noExist(t *testing.T) { - t.Parallel() - require := require.New(t) - - td, err := ioutil.TempDir("", "consul") - require.NoError(err) - defer os.RemoveAll(td) - - source := &DiskSource{ - CertPath: filepath.Join(td, "nope.pem"), - KeyPath: filepath.Join(td, "nope.pem"), - CAPath: filepath.Join(td, "nope.pem"), - } - _, err = source.Certificate(context.Background(), nil) - require.Error(err) -} - -// Test that the initial bundles are returned -func TestGenDisk_initial(t *testing.T) { - t.Parallel() - require := require.New(t) - - td := testBundleDir(t, testBundle(t), "") - defer os.RemoveAll(td) - - source := &DiskSource{ - CertPath: filepath.Join(td, "leaf.pem"), - KeyPath: filepath.Join(td, "leaf.key.pem"), - CAPath: filepath.Join(td, "ca.pem"), - } - bundle, err := source.Certificate(context.Background(), nil) - require.NoError(err) - testBundleVerify(t, &bundle) -} - -// Test that cert will block until the contents change -func TestGenDisk_blockWrite(t *testing.T) { - t.Parallel() - require := require.New(t) - - td := testBundleDir(t, testBundle(t), "") - defer os.RemoveAll(td) - - source := &DiskSource{ - CertPath: filepath.Join(td, "leaf.pem"), - KeyPath: filepath.Join(td, "leaf.key.pem"), - CAPath: filepath.Join(td, "ca.pem"), - pollInterval: 5 * time.Millisecond, // Fast for tests - } - bundle, err := source.Certificate(context.Background(), nil) - require.NoError(err) - testBundleVerify(t, &bundle) - - // Start waiting for the next bundle - nextCh := make(chan *Bundle, 1) - go func() { - next, err := source.Certificate(context.Background(), &bundle) - require.NoError(err) - nextCh <- &next - }() - - // It should not be received yet since no updates - select { - case <-nextCh: - t.Fatal("should not have received next") - case <-time.After(1000 * time.Millisecond): - } - - // Update the file - testBundleDir(t, testBundle(t), td) - - // It should not be received yet since no updates - select { - case <-time.After(500 * time.Millisecond): - t.Fatal("should receive update") - - case next := <-nextCh: - testBundleVerify(t, next) - } -} diff --git a/subcommand/inject-connect/command.go b/subcommand/inject-connect/command.go index e72ba17bef..bfcc6ae9e0 100644 --- a/subcommand/inject-connect/command.go +++ b/subcommand/inject-connect/command.go @@ -3,7 +3,6 @@ package connectinject import ( "context" "crypto/tls" - "encoding/base64" "flag" "fmt" "io/ioutil" @@ -11,6 +10,7 @@ import ( "net/url" "os" "os/signal" + "strconv" "strings" "sync" "sync/atomic" @@ -19,34 +19,32 @@ import ( connectinject "github.com/hashicorp/consul-k8s/connect-inject" "github.com/hashicorp/consul-k8s/consul" - "github.com/hashicorp/consul-k8s/helper/cert" "github.com/hashicorp/consul-k8s/helper/controller" "github.com/hashicorp/consul-k8s/subcommand/common" "github.com/hashicorp/consul-k8s/subcommand/flags" "github.com/hashicorp/consul/api" "github.com/mitchellh/cli" + "go.uber.org/zap/zapcore" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/kubernetes" clientgoscheme "k8s.io/client-go/kubernetes/scheme" _ "k8s.io/client-go/plugin/pkg/client/auth" "k8s.io/client-go/rest" + "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + "sigs.k8s.io/controller-runtime/pkg/webhook" ) type Command struct { UI cli.Ui flagListen string - flagAutoName string // MutatingWebhookConfiguration for updating - flagAutoHosts string // SANs for the auto-generated TLS cert. - flagCertFile string // TLS cert for listening (PEM) - flagKeyFile string // TLS cert private key (PEM) + flagCertDir string // Directory with TLS certs for listening (PEM) flagDefaultInject bool // True to inject by default flagConsulImage string // Docker image for Consul flagEnvoyImage string // Docker image for Envoy @@ -132,14 +130,8 @@ func (c *Command) init() { c.flagSet = flag.NewFlagSet("", flag.ContinueOnError) c.flagSet.StringVar(&c.flagListen, "listen", ":8080", "Address to bind listener to.") c.flagSet.BoolVar(&c.flagDefaultInject, "default-inject", true, "Inject by default.") - c.flagSet.StringVar(&c.flagAutoName, "tls-auto", "", - "MutatingWebhookConfiguration name. If specified, will auto generate cert bundle.") - c.flagSet.StringVar(&c.flagAutoHosts, "tls-auto-hosts", "", - "Comma-separated hosts for auto-generated TLS cert. If specified, will auto generate cert bundle.") - c.flagSet.StringVar(&c.flagCertFile, "tls-cert-file", "", - "PEM-encoded TLS certificate to serve. If blank, will generate random cert.") - c.flagSet.StringVar(&c.flagKeyFile, "tls-key-file", "", - "PEM-encoded TLS private key to serve. If blank, will generate random cert.") + c.flagSet.StringVar(&c.flagCertDir, "tls-cert-dir", "", + "Directory with PEM-encoded TLS certificate and key to serve.") c.flagSet.StringVar(&c.flagConsulImage, "consul-image", "", "Docker image for Consul.") c.flagSet.StringVar(&c.flagEnvoyImage, "envoy-image", "", @@ -327,12 +319,12 @@ func (c *Command) Run(args []string) int { if c.clientset == nil { config, err := rest.InClusterConfig() if err != nil { - c.UI.Error(fmt.Sprintf("Error loading in-cluster K8S config: %s", err)) + c.UI.Error(fmt.Sprintf("error loading in-cluster K8S config: %s", err)) return 1 } c.clientset, err = kubernetes.NewForConfig(config) if err != nil { - c.UI.Error(fmt.Sprintf("Error creating K8S client: %s", err)) + c.UI.Error(fmt.Sprintf("error creating K8S client: %s", err)) return 1 } } @@ -350,7 +342,7 @@ func (c *Command) Run(args []string) int { } consulURL, err := url.Parse(consulURLRaw) if err != nil { - c.UI.Error(fmt.Sprintf("Error parsing consul address %q: %s", consulURLRaw, err)) + c.UI.Error(fmt.Sprintf("error parsing consul address %q: %s", consulURLRaw, err)) return 1 } @@ -360,7 +352,7 @@ func (c *Command) Run(args []string) int { var err error consulCACert, err = ioutil.ReadFile(cfg.TLSConfig.CAFile) if err != nil { - c.UI.Error(fmt.Sprintf("Error reading Consul's CA cert file %q: %s", cfg.TLSConfig.CAFile, err)) + c.UI.Error(fmt.Sprintf("error reading Consul's CA cert file %q: %s", cfg.TLSConfig.CAFile, err)) return 1 } } @@ -370,137 +362,116 @@ func (c *Command) Run(args []string) int { var err error c.consulClient, err = consul.NewClient(cfg) if err != nil { - c.UI.Error(fmt.Sprintf("Error connecting to Consul agent: %s", err)) + c.UI.Error(fmt.Sprintf("error connecting to Consul agent: %s", err)) return 1 } } - // Determine where to source the certificates from - var certSource cert.Source = &cert.GenSource{ - Name: "Connect Inject", - Hosts: strings.Split(c.flagAutoHosts, ","), - } - if c.flagCertFile != "" { - certSource = &cert.DiskSource{ - CertPath: c.flagCertFile, - KeyPath: c.flagKeyFile, - } - } - - // Create the certificate notifier so we can update for certificates, - // then start all the background routines for updating certificates. - certCh := make(chan cert.MetaBundle) - certNotify := &cert.Notify{Ch: certCh, Source: certSource} - defer certNotify.Stop() - go certNotify.Start(context.Background()) + // Create a context to be used by the processes started in this command. ctx, cancelFunc := context.WithCancel(context.Background()) defer cancelFunc() - go c.certWatcher(ctx, certCh, c.clientset) // Convert allow/deny lists to sets allowK8sNamespaces := flags.ToSet(c.flagAllowK8sNamespacesList) denyK8sNamespaces := flags.ToSet(c.flagDenyK8sNamespacesList) - // Build the HTTP handler and server - injector := connectinject.Handler{ - ConsulClient: c.consulClient, - ImageConsul: c.flagConsulImage, - ImageEnvoy: c.flagEnvoyImage, - EnvoyExtraArgs: c.flagEnvoyExtraArgs, - ImageConsulK8S: c.flagConsulK8sImage, - RequireAnnotation: !c.flagDefaultInject, - AuthMethod: c.flagACLAuthMethod, - ConsulCACert: string(consulCACert), - DefaultProxyCPURequest: sidecarProxyCPURequest, - DefaultProxyCPULimit: sidecarProxyCPULimit, - DefaultProxyMemoryRequest: sidecarProxyMemoryRequest, - DefaultProxyMemoryLimit: sidecarProxyMemoryLimit, - DefaultEnableMetrics: c.flagDefaultEnableMetrics, - DefaultEnableMetricsMerging: c.flagDefaultEnableMetricsMerging, - DefaultMergedMetricsPort: c.flagDefaultMergedMetricsPort, - DefaultPrometheusScrapePort: c.flagDefaultPrometheusScrapePort, - DefaultPrometheusScrapePath: c.flagDefaultPrometheusScrapePath, - InitContainerResources: initResources, - ConsulSidecarResources: consulSidecarResources, - EnableNamespaces: c.flagEnableNamespaces, - AllowK8sNamespacesSet: allowK8sNamespaces, - DenyK8sNamespacesSet: denyK8sNamespaces, - ConsulDestinationNamespace: c.flagConsulDestinationNamespace, - EnableK8SNSMirroring: c.flagEnableK8SNSMirroring, - K8SNSMirroringPrefix: c.flagK8SNSMirroringPrefix, - CrossNamespaceACLPolicy: c.flagCrossNamespaceACLPolicy, - Log: logger.Named("handler"), - } - mux := http.NewServeMux() - mux.HandleFunc("/mutate", injector.Handle) - mux.HandleFunc("/health/ready", c.handleReady) - var handler http.Handler = mux - serverErrors := make(chan error) - server := &http.Server{ - Addr: c.flagListen, - Handler: handler, - TLSConfig: &tls.Config{GetCertificate: c.getCertificate}, - } - - // Start the mutating webhook server. - go func() { - c.UI.Info(fmt.Sprintf("Listening on %q...", c.flagListen)) - if err := server.ListenAndServeTLS("", ""); err != nil { - c.UI.Error(fmt.Sprintf("Error listening: %s", err)) - serverErrors <- err - } - }() - // Create a channel for all controllers' exits. ctrlExitCh := make(chan error) // TODO: future PR to enable this and disable the old service registration path // Create a manager for endpoints controller and the mutating webhook. - //zapLogger := zap.New(zap.UseDevMode(true), zap.Level(zapcore.InfoLevel)) - //ctrl.SetLogger(zapLogger) - //klog.SetLogger(zapLogger) - //mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ - // Scheme: scheme, - // LeaderElection: false, - // Logger: zapLogger, - // MetricsBindAddress: "0.0.0.0:9444", - //}) - //if err != nil { - // setupLog.Error(err, "unable to start manager") - // return 1 - //} - //// Start the endpoints controller. - //if err = (&connectinject.EndpointsController{ - // Client: mgr.GetClient(), - // ConsulClient: c.consulClient, - // ConsulScheme: consulURL.Scheme, - // ConsulPort: consulURL.Port(), - // AllowK8sNamespacesSet: allowK8sNamespaces, - // DenyK8sNamespacesSet: denyK8sNamespaces, - // Log: ctrl.Log.WithName("controller").WithName("endpoints-controller"), - // Scheme: mgr.GetScheme(), - // Ctx: ctx, - // ReleaseName: c.flagReleaseName, - // ReleaseNamespace: c.flagReleaseNamespace, - //}).SetupWithManager(mgr); err != nil { - // setupLog.Error(err, "unable to create controller", "controller", connectinject.EndpointsController{}) - // return 1 - //} - // - //// todo: Add tests in case it's not refactored to not have any signal handling - //// (In the future, we plan to only have the manager and rely on it to do signal handling for us). - //go func() { - // // Pass existing context's done channel so that the controller - // // will stop when this context is canceled. - // // This could be due to an interrupt signal or if any other component did not start - // // successfully. In those cases, we want to make sure that this controller is no longer - // // running. - // if err := mgr.Start(ctx.Done()); err != nil { - // setupLog.Error(err, "problem running manager") - // // Use an existing channel for ctrl exists in case manager failed to start properly. - // ctrlExitCh <- fmt.Errorf("endpoints controller exited unexpectedly") - // } - //}() + // Note: the webhook refactor PR will use this manager for the mutating webhook. + zapLogger := zap.New(zap.UseDevMode(true), zap.Level(zapcore.InfoLevel)) + ctrl.SetLogger(zapLogger) + klog.SetLogger(zapLogger) + listenSplits := strings.SplitN(c.flagListen, ":", 2) + if len(listenSplits) < 2 { + c.UI.Error(fmt.Sprintf("missing port in address: %s", c.flagListen)) + return 1 + } + port, err := strconv.Atoi(listenSplits[1]) + if err != nil { + c.UI.Error(fmt.Sprintf("unable to parse port string: %s", err)) + return 1 + } + + mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ + Scheme: scheme, + LeaderElection: false, + Host: listenSplits[0], + Port: port, + Logger: zapLogger, + MetricsBindAddress: "0.0.0.0:9444", + }) + if err != nil { + setupLog.Error(err, "unable to start manager") + return 1 + } + // Start the endpoints controller + if err = (&connectinject.EndpointsController{ + Client: mgr.GetClient(), + ConsulClient: c.consulClient, + ConsulScheme: consulURL.Scheme, + ConsulPort: consulURL.Port(), + AllowK8sNamespacesSet: allowK8sNamespaces, + DenyK8sNamespacesSet: denyK8sNamespaces, + Log: ctrl.Log.WithName("controller").WithName("endpoints-controller"), + Scheme: mgr.GetScheme(), + Ctx: ctx, + ReleaseName: c.flagReleaseName, + ReleaseNamespace: c.flagReleaseNamespace, + }).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", connectinject.EndpointsController{}) + return 1 + } + + mgr.GetWebhookServer().CertDir = c.flagCertDir + + mgr.GetWebhookServer().Register("/mutate", + &webhook.Admission{Handler: &connectinject.Handler{ + ConsulClient: c.consulClient, + ImageConsul: c.flagConsulImage, + ImageEnvoy: c.flagEnvoyImage, + EnvoyExtraArgs: c.flagEnvoyExtraArgs, + ImageConsulK8S: c.flagConsulK8sImage, + RequireAnnotation: !c.flagDefaultInject, + AuthMethod: c.flagACLAuthMethod, + ConsulCACert: string(consulCACert), + DefaultProxyCPURequest: sidecarProxyCPURequest, + DefaultProxyCPULimit: sidecarProxyCPULimit, + DefaultProxyMemoryRequest: sidecarProxyMemoryRequest, + DefaultProxyMemoryLimit: sidecarProxyMemoryLimit, + DefaultEnableMetrics: c.flagDefaultEnableMetrics, + DefaultEnableMetricsMerging: c.flagDefaultEnableMetricsMerging, + DefaultMergedMetricsPort: c.flagDefaultMergedMetricsPort, + DefaultPrometheusScrapePort: c.flagDefaultPrometheusScrapePort, + DefaultPrometheusScrapePath: c.flagDefaultPrometheusScrapePath, + InitContainerResources: initResources, + ConsulSidecarResources: consulSidecarResources, + EnableNamespaces: c.flagEnableNamespaces, + AllowK8sNamespacesSet: allowK8sNamespaces, + DenyK8sNamespacesSet: denyK8sNamespaces, + ConsulDestinationNamespace: c.flagConsulDestinationNamespace, + EnableK8SNSMirroring: c.flagEnableK8SNSMirroring, + K8SNSMirroringPrefix: c.flagK8SNSMirroringPrefix, + CrossNamespaceACLPolicy: c.flagCrossNamespaceACLPolicy, + Log: logger.Named("handler"), + }}) + + // todo: Add tests in case it's not refactored to not have any signal handling + // (In the future, we plan to only have the manager and rely on it to do signal handling for us). + go func() { + // Pass existing context's done channel so that the controller + // will stop when this context is canceled. + // This could be due to an interrupt signal or if any other component did not start + // successfully. In those cases, we want to make sure that this controller is no longer + // running. + if err := mgr.Start(ctx.Done()); err != nil { + setupLog.Error(err, "problem running manager") + // Use an existing channel for ctrl exists in case manager failed to start properly. + ctrlExitCh <- fmt.Errorf("endpoints controller exited unexpectedly") + } + }() // Start the cleanup controller that cleans up Consul service instances // still registered after the pod has been deleted (usually due to a force delete). @@ -559,15 +530,8 @@ func (c *Command) Run(args []string) int { select { case sig := <-c.sigCh: c.UI.Info(fmt.Sprintf("%s received, shutting down", sig)) - if err := server.Close(); err != nil { - c.UI.Error(fmt.Sprintf("shutting down server: %v", err)) - return 1 - } return 0 - case <-serverErrors: - return 1 - case err := <-ctrlExitCh: c.UI.Error(fmt.Sprintf("controller error: %v", err)) return 1 @@ -598,57 +562,6 @@ func (c *Command) getCertificate(*tls.ClientHelloInfo) (*tls.Certificate, error) return certRaw.(*tls.Certificate), nil } -func (c *Command) certWatcher(ctx context.Context, ch <-chan cert.MetaBundle, clientset kubernetes.Interface) { - var bundle cert.MetaBundle - for { - select { - case bundle = <-ch: - c.UI.Output("Updated certificate bundle received. Updating certs...") - // Bundle is updated, set it up - - case <-time.After(1 * time.Second): - // This forces the mutating webhook config to remain updated - // fairly quickly. This is a jank way to do this and we should - // look to improve it in the future. Since we use Patch requests - // it is pretty cheap to do, though. - - case <-ctx.Done(): - // Quit - return - } - - cert, err := tls.X509KeyPair(bundle.Cert, bundle.Key) - if err != nil { - c.UI.Error(fmt.Sprintf("Error loading TLS keypair: %s", err)) - continue - } - - // If there is a MWC name set, then update the CA bundle. - if c.flagAutoName != "" && len(bundle.CACert) > 0 { - // The CA Bundle value must be base64 encoded - value := base64.StdEncoding.EncodeToString(bundle.CACert) - - _, err := clientset.AdmissionregistrationV1beta1(). - MutatingWebhookConfigurations(). - Patch(context.TODO(), c.flagAutoName, types.JSONPatchType, []byte(fmt.Sprintf( - `[{ - "op": "add", - "path": "/webhooks/0/clientConfig/caBundle", - "value": %q - }]`, value)), metav1.PatchOptions{}) - if err != nil { - c.UI.Error(fmt.Sprintf( - "Error updating MutatingWebhookConfiguration: %s", - err)) - continue - } - } - - // Update the certificate - c.cert.Store(&cert) - } -} - func (c *Command) parseAndValidateResourceFlags() (corev1.ResourceRequirements, corev1.ResourceRequirements, error) { // Init container var initContainerCPULimit, initContainerCPURequest, initContainerMemoryLimit, initContainerMemoryRequest resource.Quantity diff --git a/subcommand/inject-connect/command_test.go b/subcommand/inject-connect/command_test.go index 1e7f6d5cf0..dc9c0a0e3c 100644 --- a/subcommand/inject-connect/command_test.go +++ b/subcommand/inject-connect/command_test.go @@ -49,7 +49,7 @@ func TestRun_FlagValidation(t *testing.T) { { flags: []string{"-consul-k8s-image", "foo", "-consul-image", "foo", "-envoy-image", "envoy:1.16.0", "-ca-file", "bar"}, - expErr: "Error reading Consul's CA cert file \"bar\"", + expErr: "error reading Consul's CA cert file \"bar\"", }, { flags: []string{"-consul-k8s-image", "foo", "-consul-image", "foo", "-envoy-image", "envoy:1.16.0", @@ -153,6 +153,20 @@ func TestRun_FlagValidation(t *testing.T) { }, expErr: "request must be <= limit: -consul-sidecar-cpu-request value of \"50m\" is greater than the -consul-sidecar-cpu-limit value of \"25m\"", }, + { + flags: []string{"-consul-k8s-image", "hashicorp/consul-k8s", "-consul-image", "foo", "-envoy-image", "envoy:1.16.0", + "-enable-health-checks-controller=true", + "-http-addr=http://0.0.0.0:9999", + "-listen", "999999"}, + expErr: "missing port in address: 999999", + }, + { + flags: []string{"-consul-k8s-image", "hashicorp/consul-k8s", "-consul-image", "foo", "-envoy-image", "envoy:1.16.0", + "-enable-health-checks-controller=true", + "-http-addr=http://0.0.0.0:9999", + "-listen", ":foobar"}, + expErr: "unable to parse port string: strconv.Atoi: parsing \"foobar\": invalid syntax", + }, } for _, c := range cases { @@ -188,55 +202,21 @@ func TestRun_ResourceLimitDefaults(t *testing.T) { } func TestRun_ValidationConsulHTTPAddr(t *testing.T) { - cases := []struct { - name string - envVars []string - flags []string - expErr string - }{ - { - envVars: []string{api.HTTPAddrEnvName, "%"}, - flags: []string{"-consul-k8s-image", "hashicorp/consul-k8s", "-consul-image", "foo", "-envoy-image", "envoy:1.16.0", - "-enable-health-checks-controller=true"}, - expErr: "Error parsing consul address \"http://%\": parse \"http://%\": invalid URL escape \"%", - }, - } - for _, c := range cases { - t.Run(c.expErr, func(t *testing.T) { - k8sClient := fake.NewSimpleClientset() - ui := cli.NewMockUi() - cmd := Command{ - UI: ui, - clientset: k8sClient, - } - os.Setenv(c.envVars[0], c.envVars[1]) - code := cmd.Run(c.flags) - os.Unsetenv(c.envVars[0]) - require.Equal(t, 1, code) - require.Contains(t, ui.ErrorWriter.String(), c.expErr) - }) - } -} - -// Test that with health checks enabled, if the listener fails to bind that -// everything shuts down gracefully and the command exits. -func TestRun_CommandFailsWithInvalidListener(t *testing.T) { - // TODO: fix this skip - t.Skip("This test will be fixed in an upcoming webhook refactor PR") k8sClient := fake.NewSimpleClientset() ui := cli.NewMockUi() cmd := Command{ UI: ui, clientset: k8sClient, } - code := cmd.Run([]string{ - "-consul-k8s-image", "hashicorp/consul-k8s", "-consul-image", "foo", "-envoy-image", "envoy:1.16.0", - "-enable-health-checks-controller=true", - "-http-addr=http://0.0.0.0:9999", - "-listen", "999999", - }) + flags := []string{"-consul-k8s-image", "hashicorp/consul-k8s", "-consul-image", "foo", "-envoy-image", "envoy:1.16.0", + "-enable-health-checks-controller=true"} + + os.Setenv(api.HTTPAddrEnvName, "%") + code := cmd.Run(flags) + os.Unsetenv(api.HTTPAddrEnvName) + require.Equal(t, 1, code) - require.Contains(t, ui.ErrorWriter.String(), "Error listening: listen tcp: address 999999: missing port in address") + require.Contains(t, ui.ErrorWriter.String(), "error parsing consul address \"http://%\": parse \"http://%\": invalid URL escape \"%") } // Test that when healthchecks are enabled that SIGINT/SIGTERM exits the @@ -276,7 +256,7 @@ func testSignalHandling(sig os.Signal) func(*testing.T) { select { case exitCode := <-exitChan: require.Equal(t, 0, exitCode, ui.ErrorWriter.String()) - case <-time.After(time.Second * 1): + case <-time.After(time.Second * 2): // Fail if the stopCh was not caught. require.Fail(t, "timeout waiting for command to exit") }