diff --git a/charts/consul/test/unit/connect-inject-deployment.bats b/charts/consul/test/unit/connect-inject-deployment.bats index bc3156fca5..f6e19262ed 100755 --- a/charts/consul/test/unit/connect-inject-deployment.bats +++ b/charts/consul/test/unit/connect-inject-deployment.bats @@ -1075,147 +1075,6 @@ load _helpers [ "${actual}" = "false" ] } -#-------------------------------------------------------------------- -# consul sidecar resources - -@test "connectInject/Deployment: default consul sidecar container resources" { - cd `chart_dir` - local cmd=$(helm template \ - -s templates/connect-inject-deployment.yaml \ - --set 'connectInject.enabled=true' \ - . | tee /dev/stderr | - yq '.spec.template.spec.containers[0].command' | tee /dev/stderr) - - local actual=$(echo "$cmd" | - yq 'any(contains("-default-consul-sidecar-memory-request=25Mi"))' | tee /dev/stderr) - [ "${actual}" = "true" ] - - local actual=$(echo "$cmd" | - yq 'any(contains("-default-consul-sidecar-cpu-request=20m"))' | tee /dev/stderr) - [ "${actual}" = "true" ] - - local actual=$(echo "$cmd" | - yq 'any(contains("-default-consul-sidecar-memory-limit=50Mi"))' | tee /dev/stderr) - [ "${actual}" = "true" ] - - local actual=$(echo "$cmd" | - yq 'any(contains("-default-consul-sidecar-cpu-limit=20m"))' | tee /dev/stderr) - [ "${actual}" = "true" ] -} - -@test "connectInject/Deployment: consul sidecar container resources can be set" { - cd `chart_dir` - local cmd=$(helm template \ - -s templates/connect-inject-deployment.yaml \ - --set 'connectInject.enabled=true' \ - --set 'global.consulSidecarContainer.resources.requests.memory=100Mi' \ - --set 'global.consulSidecarContainer.resources.requests.cpu=100m' \ - --set 'global.consulSidecarContainer.resources.limits.memory=200Mi' \ - --set 'global.consulSidecarContainer.resources.limits.cpu=200m' \ - . | tee /dev/stderr | - yq '.spec.template.spec.containers[0].command' | tee /dev/stderr) - - local actual=$(echo "$cmd" | - yq 'any(contains("-default-consul-sidecar-memory-request=100Mi"))' | tee /dev/stderr) - [ "${actual}" = "true" ] - - local actual=$(echo "$cmd" | - yq 'any(contains("-default-consul-sidecar-cpu-request=100m"))' | tee /dev/stderr) - [ "${actual}" = "true" ] - - local actual=$(echo "$cmd" | - yq 'any(contains("-default-consul-sidecar-memory-limit=200Mi"))' | tee /dev/stderr) - [ "${actual}" = "true" ] - - local actual=$(echo "$cmd" | - yq 'any(contains("-default-consul-sidecar-cpu-limit=200m"))' | tee /dev/stderr) - [ "${actual}" = "true" ] -} - -@test "connectInject/Deployment: consul sidecar container resources can be set explicitly to 0" { - cd `chart_dir` - local cmd=$(helm template \ - -s templates/connect-inject-deployment.yaml \ - --set 'connectInject.enabled=true' \ - --set 'global.consulSidecarContainer.resources.requests.memory=0' \ - --set 'global.consulSidecarContainer.resources.requests.cpu=0' \ - --set 'global.consulSidecarContainer.resources.limits.memory=0' \ - --set 'global.consulSidecarContainer.resources.limits.cpu=0' \ - . | tee /dev/stderr | - yq '.spec.template.spec.containers[0].command' | tee /dev/stderr) - - local actual=$(echo "$cmd" | - yq 'any(contains("-default-consul-sidecar-memory-request=0"))' | tee /dev/stderr) - [ "${actual}" = "true" ] - - local actual=$(echo "$cmd" | - yq 'any(contains("-default-consul-sidecar-cpu-request=0"))' | tee /dev/stderr) - [ "${actual}" = "true" ] - - local actual=$(echo "$cmd" | - yq 'any(contains("-default-consul-sidecar-memory-limit=0"))' | tee /dev/stderr) - [ "${actual}" = "true" ] - - local actual=$(echo "$cmd" | - yq 'any(contains("-default-consul-sidecar-cpu-limit=0"))' | tee /dev/stderr) - [ "${actual}" = "true" ] -} - -@test "connectInject/Deployment: consul sidecar container resources can be individually set to null" { - cd `chart_dir` - local cmd=$(helm template \ - -s templates/connect-inject-deployment.yaml \ - --set 'connectInject.enabled=true' \ - --set 'global.consulSidecarContainer.resources.requests.memory=null' \ - --set 'global.consulSidecarContainer.resources.requests.cpu=null' \ - --set 'global.consulSidecarContainer.resources.limits.memory=null' \ - --set 'global.consulSidecarContainer.resources.limits.cpu=null' \ - . | tee /dev/stderr | - yq '.spec.template.spec.containers[0].command' | tee /dev/stderr) - - local actual=$(echo "$cmd" | - yq 'any(contains("-default-consul-sidecar-memory-request"))' | tee /dev/stderr) - [ "${actual}" = "false" ] - - local actual=$(echo "$cmd" | - yq 'any(contains("-default-consul-sidecar-cpu-request"))' | tee /dev/stderr) - [ "${actual}" = "false" ] - - local actual=$(echo "$cmd" | - yq 'any(contains("-default-consul-sidecar-memory-limit"))' | tee /dev/stderr) - [ "${actual}" = "false" ] - - local actual=$(echo "$cmd" | - yq 'any(contains("-default-consul-sidecar-cpu-limit"))' | tee /dev/stderr) - [ "${actual}" = "false" ] -} - -@test "connectInject/Deployment: consul sidecar container resources can be set to null" { - cd `chart_dir` - local cmd=$(helm template \ - -s templates/connect-inject-deployment.yaml \ - --set 'connectInject.enabled=true' \ - --set 'global.consulSidecarContainer.resources=null' \ - . | tee /dev/stderr | - yq '.spec.template.spec.containers[0].command' | tee /dev/stderr) - - local actual=$(echo "$cmd" | - yq 'any(contains("-default-consul-sidecar-memory-request"))' | tee /dev/stderr) - [ "${actual}" = "false" ] - - local actual=$(echo "$cmd" | - yq 'any(contains("-default-consul-sidecar-cpu-request"))' | tee /dev/stderr) - [ "${actual}" = "false" ] - - local actual=$(echo "$cmd" | - yq 'any(contains("-default-consul-sidecar-memory-limit"))' | tee /dev/stderr) - [ "${actual}" = "false" ] - - local actual=$(echo "$cmd" | - yq 'any(contains("-default-consul-sidecar-cpu-limit"))' | tee /dev/stderr) - [ "${actual}" = "false" ] -} - #-------------------------------------------------------------------- # sidecarProxy.resources diff --git a/charts/consul/values.yaml b/charts/consul/values.yaml index 2e9b9d4cf4..899140e36b 100644 --- a/charts/consul/values.yaml +++ b/charts/consul/values.yaml @@ -590,28 +590,6 @@ global: # @type: boolean enableGatewayMetrics: true - # For connect-injected pods, the consul sidecar is responsible for metrics merging. For ingress/mesh/terminating - # gateways, it additionally ensures the Consul services are always registered with their local Consul client. - # @type: map - consulSidecarContainer: - # Set default resources for consul sidecar. If null, that resource won't - # be set. - # These settings can be overridden on a per-pod basis via these annotations: - # - # - `consul.hashicorp.com/consul-sidecar-cpu-limit` - # - `consul.hashicorp.com/consul-sidecar-cpu-request` - # - `consul.hashicorp.com/consul-sidecar-memory-limit` - # - `consul.hashicorp.com/consul-sidecar-memory-request` - # @recurse: false - # @type: map - resources: - requests: - memory: "25Mi" - cpu: "20m" - limits: - memory: "50Mi" - cpu: "20m" - # The name (and tag) of the Envoy Docker image used for the # connect-injected sidecar proxies and mesh, terminating, and ingress gateways. # See https://www.consul.io/docs/connect/proxies/envoy for full compatibility matrix between Consul and Envoy. @@ -2089,18 +2067,18 @@ connectInject: # add a listener on the Envoy sidecar to expose metrics. The exposed # metrics will depend on whether metrics merging is enabled: # - If metrics merging is enabled: - # the Consul sidecar will run a merged metrics server + # the consul-dataplane will run a merged metrics server # combining Envoy sidecar and Connect service metrics, # i.e. if your service exposes its own Prometheus metrics. # - If metrics merging is disabled: # the listener will just expose Envoy sidecar metrics. # This will inherit from `global.metrics.enabled`. defaultEnabled: "-" - # Configures the Consul sidecar to run a merged metrics server + # Configures the consul-dataplane to run a merged metrics server # to combine and serve both Envoy and Connect service metrics. # This feature is available only in Consul v1.10.0 or greater. defaultEnableMerging: false - # Configures the port at which the Consul sidecar will listen on to return + # Configures the port at which the consul-dataplane will listen on to return # combined metrics. This port only needs to be changed if it conflicts with # the application's ports. defaultMergedMetricsPort: 20100 diff --git a/control-plane/commands.go b/control-plane/commands.go index a027f7b6c1..d3347cf29e 100644 --- a/control-plane/commands.go +++ b/control-plane/commands.go @@ -6,7 +6,6 @@ import ( cmdACLInit "github.com/hashicorp/consul-k8s/control-plane/subcommand/acl-init" cmdConnectInit "github.com/hashicorp/consul-k8s/control-plane/subcommand/connect-init" cmdConsulLogout "github.com/hashicorp/consul-k8s/control-plane/subcommand/consul-logout" - cmdConsulSidecar "github.com/hashicorp/consul-k8s/control-plane/subcommand/consul-sidecar" cmdController "github.com/hashicorp/consul-k8s/control-plane/subcommand/controller" cmdCreateFederationSecret "github.com/hashicorp/consul-k8s/control-plane/subcommand/create-federation-secret" cmdDeleteCompletedJob "github.com/hashicorp/consul-k8s/control-plane/subcommand/delete-completed-job" @@ -43,10 +42,6 @@ func init() { return &cmdInjectConnect.Command{UI: ui}, nil }, - "consul-sidecar": func() (cli.Command, error) { - return &cmdConsulSidecar.Command{UI: ui}, nil - }, - "consul-logout": func() (cli.Command, error) { return &cmdConsulLogout.Command{UI: ui}, nil }, diff --git a/control-plane/connect-inject/annotations.go b/control-plane/connect-inject/annotations.go index be97189b40..8beaeb5ae7 100644 --- a/control-plane/connect-inject/annotations.go +++ b/control-plane/connect-inject/annotations.go @@ -115,12 +115,6 @@ const ( annotationSidecarProxyMemoryLimit = "consul.hashicorp.com/sidecar-proxy-memory-limit" annotationSidecarProxyMemoryRequest = "consul.hashicorp.com/sidecar-proxy-memory-request" - // annotations for consul sidecar resource limits. - annotationConsulSidecarCPULimit = "consul.hashicorp.com/consul-sidecar-cpu-limit" - annotationConsulSidecarCPURequest = "consul.hashicorp.com/consul-sidecar-cpu-request" - annotationConsulSidecarMemoryLimit = "consul.hashicorp.com/consul-sidecar-memory-limit" - annotationConsulSidecarMemoryRequest = "consul.hashicorp.com/consul-sidecar-memory-request" - // annotations for sidecar volumes. annotationConsulSidecarUserVolume = "consul.hashicorp.com/consul-sidecar-user-volume" annotationConsulSidecarUserVolumeMount = "consul.hashicorp.com/consul-sidecar-user-volume-mount" diff --git a/control-plane/connect-inject/consul_sidecar.go b/control-plane/connect-inject/consul_sidecar.go deleted file mode 100644 index a19eebb5ef..0000000000 --- a/control-plane/connect-inject/consul_sidecar.go +++ /dev/null @@ -1,115 +0,0 @@ -package connectinject - -import ( - "fmt" - - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" -) - -// consulSidecar starts the consul-sidecar command to only run -// the metrics merging server when metrics merging feature is enabled. -// It always disables service registration because for connect we no longer -// need to keep services registered as this is handled in the endpoints-controller. -func (w *MeshWebhook) consulSidecar(pod corev1.Pod) (corev1.Container, error) { - metricsPorts, err := w.MetricsConfig.mergedMetricsServerConfiguration(pod) - if err != nil { - return corev1.Container{}, err - } - - resources, err := w.consulSidecarResources(pod) - if err != nil { - return corev1.Container{}, err - } - - command := []string{ - "consul-k8s-control-plane", - "consul-sidecar", - "-enable-service-registration=false", - "-enable-metrics-merging=true", - fmt.Sprintf("-merged-metrics-port=%s", metricsPorts.mergedPort), - fmt.Sprintf("-service-metrics-port=%s", metricsPorts.servicePort), - fmt.Sprintf("-service-metrics-path=%s", metricsPorts.servicePath), - fmt.Sprintf("-log-level=%s", w.LogLevel), - fmt.Sprintf("-log-json=%t", w.LogJSON), - } - - return corev1.Container{ - Name: "consul-sidecar", - Image: w.ImageConsulK8S, - VolumeMounts: []corev1.VolumeMount{ - { - Name: volumeName, - MountPath: "/consul/connect-inject", - }, - }, - Command: command, - Resources: resources, - }, nil -} - -func (w *MeshWebhook) consulSidecarResources(pod corev1.Pod) (corev1.ResourceRequirements, error) { - resources := corev1.ResourceRequirements{ - Limits: corev1.ResourceList{}, - Requests: corev1.ResourceList{}, - } - // zeroQuantity is used for comparison to see if a quantity was explicitly - // set. - var zeroQuantity resource.Quantity - - // NOTE: We only want to set the limit/request if the default or annotation - // was explicitly set. If it's not explicitly set, it will be the zero value - // which would show up in the pod spec as being explicitly set to zero if we - // set that key, e.g. "cpu" to zero. - // We want it to not show up in the pod spec at all if if it's not explicitly - // set so that users aren't wondering why it's set to 0 when they didn't specify - // a request/limit. If they have explicitly set it to 0 then it will be set - // to 0 in the pod spec because we're doing a comparison to the zero-valued - // struct. - - // CPU Limit. - if anno, ok := pod.Annotations[annotationConsulSidecarCPULimit]; ok { - cpuLimit, err := resource.ParseQuantity(anno) - if err != nil { - return corev1.ResourceRequirements{}, fmt.Errorf("parsing annotation %s:%q: %s", annotationConsulSidecarCPULimit, anno, err) - } - resources.Limits[corev1.ResourceCPU] = cpuLimit - } else if w.DefaultConsulSidecarResources.Limits[corev1.ResourceCPU] != zeroQuantity { - resources.Limits[corev1.ResourceCPU] = w.DefaultConsulSidecarResources.Limits[corev1.ResourceCPU] - } - - // CPU Request. - if anno, ok := pod.Annotations[annotationConsulSidecarCPURequest]; ok { - cpuRequest, err := resource.ParseQuantity(anno) - if err != nil { - return corev1.ResourceRequirements{}, fmt.Errorf("parsing annotation %s:%q: %s", annotationConsulSidecarCPURequest, anno, err) - } - resources.Requests[corev1.ResourceCPU] = cpuRequest - } else if w.DefaultConsulSidecarResources.Requests[corev1.ResourceCPU] != zeroQuantity { - resources.Requests[corev1.ResourceCPU] = w.DefaultConsulSidecarResources.Requests[corev1.ResourceCPU] - } - - // Memory Limit. - if anno, ok := pod.Annotations[annotationConsulSidecarMemoryLimit]; ok { - memoryLimit, err := resource.ParseQuantity(anno) - if err != nil { - return corev1.ResourceRequirements{}, fmt.Errorf("parsing annotation %s:%q: %s", annotationConsulSidecarMemoryLimit, anno, err) - } - resources.Limits[corev1.ResourceMemory] = memoryLimit - } else if w.DefaultConsulSidecarResources.Limits[corev1.ResourceMemory] != zeroQuantity { - resources.Limits[corev1.ResourceMemory] = w.DefaultConsulSidecarResources.Limits[corev1.ResourceMemory] - } - - // Memory Request. - if anno, ok := pod.Annotations[annotationConsulSidecarMemoryRequest]; ok { - memoryRequest, err := resource.ParseQuantity(anno) - if err != nil { - return corev1.ResourceRequirements{}, fmt.Errorf("parsing annotation %s:%q: %s", annotationConsulSidecarMemoryRequest, anno, err) - } - resources.Requests[corev1.ResourceMemory] = memoryRequest - } else if w.DefaultConsulSidecarResources.Requests[corev1.ResourceMemory] != zeroQuantity { - resources.Requests[corev1.ResourceMemory] = w.DefaultConsulSidecarResources.Requests[corev1.ResourceMemory] - } - - return resources, nil -} diff --git a/control-plane/connect-inject/consul_sidecar_test.go b/control-plane/connect-inject/consul_sidecar_test.go deleted file mode 100644 index bafaad104a..0000000000 --- a/control-plane/connect-inject/consul_sidecar_test.go +++ /dev/null @@ -1,343 +0,0 @@ -package connectinject - -import ( - "testing" - - logrtest "github.com/go-logr/logr/testing" - "github.com/stretchr/testify/require" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" -) - -// Test that if the conditions for running a merged metrics server are true, -// that we pass the metrics flags to consul sidecar. -func TestConsulSidecar_MetricsFlags(t *testing.T) { - meshWebhook := MeshWebhook{ - Log: logrtest.TestLogger{T: t}, - ImageConsulK8S: "hashicorp/consul-k8s:9.9.9", - MetricsConfig: MetricsConfig{ - DefaultEnableMetrics: true, - DefaultEnableMetricsMerging: true, - }, - } - container, err := meshWebhook.consulSidecar(corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Annotations: map[string]string{ - annotationMergedMetricsPort: "20100", - annotationServiceMetricsPort: "8080", - annotationServiceMetricsPath: "/metrics", - }, - }, - Spec: corev1.PodSpec{ - Containers: []corev1.Container{ - { - Name: "web", - }, - }, - }, - }) - - require.NoError(t, err) - require.Contains(t, container.Command, "-enable-metrics-merging=true") - require.Contains(t, container.Command, "-merged-metrics-port=20100") - require.Contains(t, container.Command, "-service-metrics-port=8080") - require.Contains(t, container.Command, "-service-metrics-path=/metrics") -} - -func TestHandlerConsulSidecar_Resources(t *testing.T) { - mem1 := resource.MustParse("100Mi") - mem2 := resource.MustParse("200Mi") - cpu1 := resource.MustParse("100m") - cpu2 := resource.MustParse("200m") - zero := resource.MustParse("0") - - cases := map[string]struct { - meshWebhook MeshWebhook - annotations map[string]string - expResources corev1.ResourceRequirements - expErr string - }{ - "no defaults, no annotations": { - meshWebhook: MeshWebhook{ - Log: logrtest.TestLogger{T: t}, - ImageConsulK8S: "hashicorp/consul-k8s:9.9.9", - MetricsConfig: MetricsConfig{ - DefaultEnableMetrics: true, - DefaultEnableMetricsMerging: true, - }, - }, - annotations: map[string]string{ - annotationMergedMetricsPort: "20100", - annotationServiceMetricsPort: "8080", - annotationServiceMetricsPath: "/metrics", - }, - expResources: corev1.ResourceRequirements{ - Limits: corev1.ResourceList{}, - Requests: corev1.ResourceList{}, - }, - }, - "all defaults, no annotations": { - meshWebhook: MeshWebhook{ - Log: logrtest.TestLogger{T: t}, - ImageConsulK8S: "hashicorp/consul-k8s:9.9.9", - MetricsConfig: MetricsConfig{ - DefaultEnableMetrics: true, - DefaultEnableMetricsMerging: true, - }, - DefaultConsulSidecarResources: corev1.ResourceRequirements{ - Requests: corev1.ResourceList{ - corev1.ResourceCPU: cpu1, - corev1.ResourceMemory: mem1, - }, - Limits: corev1.ResourceList{ - corev1.ResourceCPU: cpu2, - corev1.ResourceMemory: mem2, - }, - }, - }, - annotations: map[string]string{ - annotationMergedMetricsPort: "20100", - annotationServiceMetricsPort: "8080", - annotationServiceMetricsPath: "/metrics", - }, - expResources: corev1.ResourceRequirements{ - Limits: corev1.ResourceList{ - corev1.ResourceCPU: cpu2, - corev1.ResourceMemory: mem2, - }, - Requests: corev1.ResourceList{ - corev1.ResourceCPU: cpu1, - corev1.ResourceMemory: mem1, - }, - }, - }, - "no defaults, all annotations": { - meshWebhook: MeshWebhook{ - Log: logrtest.TestLogger{T: t}, - ImageConsulK8S: "hashicorp/consul-k8s:9.9.9", - MetricsConfig: MetricsConfig{ - DefaultEnableMetrics: true, - DefaultEnableMetricsMerging: true, - }, - }, - annotations: map[string]string{ - annotationMergedMetricsPort: "20100", - annotationServiceMetricsPort: "8080", - annotationServiceMetricsPath: "/metrics", - annotationConsulSidecarCPURequest: "100m", - annotationConsulSidecarMemoryRequest: "100Mi", - annotationConsulSidecarCPULimit: "200m", - annotationConsulSidecarMemoryLimit: "200Mi", - }, - expResources: corev1.ResourceRequirements{ - Limits: corev1.ResourceList{ - corev1.ResourceCPU: cpu2, - corev1.ResourceMemory: mem2, - }, - Requests: corev1.ResourceList{ - corev1.ResourceCPU: cpu1, - corev1.ResourceMemory: mem1, - }, - }, - }, - "annotations override defaults": { - meshWebhook: MeshWebhook{ - Log: logrtest.TestLogger{T: t}, - ImageConsulK8S: "hashicorp/consul-k8s:9.9.9", - MetricsConfig: MetricsConfig{ - DefaultEnableMetrics: true, - DefaultEnableMetricsMerging: true, - }, - DefaultConsulSidecarResources: corev1.ResourceRequirements{ - Requests: corev1.ResourceList{ - corev1.ResourceCPU: zero, - corev1.ResourceMemory: zero, - }, - Limits: corev1.ResourceList{ - corev1.ResourceCPU: zero, - corev1.ResourceMemory: zero, - }, - }, - }, - annotations: map[string]string{ - annotationMergedMetricsPort: "20100", - annotationServiceMetricsPort: "8080", - annotationServiceMetricsPath: "/metrics", - annotationConsulSidecarCPURequest: "100m", - annotationConsulSidecarMemoryRequest: "100Mi", - annotationConsulSidecarCPULimit: "200m", - annotationConsulSidecarMemoryLimit: "200Mi", - }, - expResources: corev1.ResourceRequirements{ - Limits: corev1.ResourceList{ - corev1.ResourceCPU: cpu2, - corev1.ResourceMemory: mem2, - }, - Requests: corev1.ResourceList{ - corev1.ResourceCPU: cpu1, - corev1.ResourceMemory: mem1, - }, - }, - }, - "defaults set to zero, no annotations": { - meshWebhook: MeshWebhook{ - Log: logrtest.TestLogger{T: t}, - ImageConsulK8S: "hashicorp/consul-k8s:9.9.9", - MetricsConfig: MetricsConfig{ - DefaultEnableMetrics: true, - DefaultEnableMetricsMerging: true, - }, - DefaultConsulSidecarResources: corev1.ResourceRequirements{ - Requests: corev1.ResourceList{ - corev1.ResourceCPU: zero, - corev1.ResourceMemory: zero, - }, - Limits: corev1.ResourceList{ - corev1.ResourceCPU: zero, - corev1.ResourceMemory: zero, - }, - }, - }, - annotations: map[string]string{ - annotationMergedMetricsPort: "20100", - annotationServiceMetricsPort: "8080", - annotationServiceMetricsPath: "/metrics", - }, - expResources: corev1.ResourceRequirements{ - Limits: corev1.ResourceList{ - corev1.ResourceCPU: zero, - corev1.ResourceMemory: zero, - }, - Requests: corev1.ResourceList{ - corev1.ResourceCPU: zero, - corev1.ResourceMemory: zero, - }, - }, - }, - "annotations set to 0": { - meshWebhook: MeshWebhook{ - Log: logrtest.TestLogger{T: t}, - ImageConsulK8S: "hashicorp/consul-k8s:9.9.9", - MetricsConfig: MetricsConfig{ - DefaultEnableMetrics: true, - DefaultEnableMetricsMerging: true, - }, - }, - annotations: map[string]string{ - annotationMergedMetricsPort: "20100", - annotationServiceMetricsPort: "8080", - annotationServiceMetricsPath: "/metrics", - annotationConsulSidecarCPURequest: "0", - annotationConsulSidecarMemoryRequest: "0", - annotationConsulSidecarCPULimit: "0", - annotationConsulSidecarMemoryLimit: "0", - }, - expResources: corev1.ResourceRequirements{ - Limits: corev1.ResourceList{ - corev1.ResourceCPU: zero, - corev1.ResourceMemory: zero, - }, - Requests: corev1.ResourceList{ - corev1.ResourceCPU: zero, - corev1.ResourceMemory: zero, - }, - }, - }, - "invalid cpu request": { - meshWebhook: MeshWebhook{ - Log: logrtest.TestLogger{T: t}, - ImageConsulK8S: "hashicorp/consul-k8s:9.9.9", - MetricsConfig: MetricsConfig{ - DefaultEnableMetrics: true, - DefaultEnableMetricsMerging: true, - }, - }, - annotations: map[string]string{ - annotationMergedMetricsPort: "20100", - annotationServiceMetricsPort: "8080", - annotationServiceMetricsPath: "/metrics", - annotationConsulSidecarCPURequest: "invalid", - }, - expErr: "parsing annotation consul.hashicorp.com/consul-sidecar-cpu-request:\"invalid\": quantities must match the regular expression", - }, - "invalid cpu limit": { - meshWebhook: MeshWebhook{ - Log: logrtest.TestLogger{T: t}, - ImageConsulK8S: "hashicorp/consul-k8s:9.9.9", - MetricsConfig: MetricsConfig{ - DefaultEnableMetrics: true, - DefaultEnableMetricsMerging: true, - }, - }, - annotations: map[string]string{ - annotationMergedMetricsPort: "20100", - annotationServiceMetricsPort: "8080", - annotationServiceMetricsPath: "/metrics", - annotationConsulSidecarCPULimit: "invalid", - }, - expErr: "parsing annotation consul.hashicorp.com/consul-sidecar-cpu-limit:\"invalid\": quantities must match the regular expression", - }, - "invalid memory request": { - meshWebhook: MeshWebhook{ - Log: logrtest.TestLogger{T: t}, - ImageConsulK8S: "hashicorp/consul-k8s:9.9.9", - MetricsConfig: MetricsConfig{ - DefaultEnableMetrics: true, - DefaultEnableMetricsMerging: true, - }, - }, - annotations: map[string]string{ - annotationMergedMetricsPort: "20100", - annotationServiceMetricsPort: "8080", - annotationServiceMetricsPath: "/metrics", - annotationConsulSidecarMemoryRequest: "invalid", - }, - expErr: "parsing annotation consul.hashicorp.com/consul-sidecar-memory-request:\"invalid\": quantities must match the regular expression", - }, - "invalid memory limit": { - meshWebhook: MeshWebhook{ - Log: logrtest.TestLogger{T: t}, - ImageConsulK8S: "hashicorp/consul-k8s:9.9.9", - MetricsConfig: MetricsConfig{ - DefaultEnableMetrics: true, - DefaultEnableMetricsMerging: true, - }, - }, - annotations: map[string]string{ - annotationMergedMetricsPort: "20100", - annotationServiceMetricsPort: "8080", - annotationServiceMetricsPath: "/metrics", - annotationConsulSidecarMemoryLimit: "invalid", - }, - expErr: "parsing annotation consul.hashicorp.com/consul-sidecar-memory-limit:\"invalid\": quantities must match the regular expression", - }, - } - - for name, c := range cases { - t.Run(name, func(tt *testing.T) { - require := require.New(tt) - pod := corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Annotations: c.annotations, - }, - - Spec: corev1.PodSpec{ - Containers: []corev1.Container{ - { - Name: "web", - }, - }, - }, - } - container, err := c.meshWebhook.consulSidecar(pod) - if c.expErr != "" { - require.NotNil(err) - require.Contains(err.Error(), c.expErr) - } else { - require.NoError(err) - require.Equal(c.expResources, container.Resources) - } - }) - } -} diff --git a/control-plane/connect-inject/container_init.go b/control-plane/connect-inject/container_init.go index 903bbc9fd9..9e7726067d 100644 --- a/control-plane/connect-inject/container_init.go +++ b/control-plane/connect-inject/container_init.go @@ -186,50 +186,6 @@ func (w *MeshWebhook) containerInit(namespace corev1.Namespace, pod corev1.Pod, volMounts = append(volMounts, saTokenVolumeMount) } - // todo (agentless): this needs to be configured in consul-dataplane once it supports telemetry - //// This determines how to configure the consul connect envoy command: what - //// metrics backend to use and what path to expose on the - //// envoy_prometheus_bind_addr listener for scraping. - //metricsServer, err := w.MetricsConfig.shouldRunMergedMetricsServer(pod) - //if err != nil { - // return corev1.Container{}, err - //} - //if metricsServer { - // prometheusScrapePath := w.MetricsConfig.prometheusScrapePath(pod) - // mergedMetricsPort, err := w.MetricsConfig.mergedMetricsPort(pod) - // if err != nil { - // return corev1.Container{}, err - // } - // data.PrometheusScrapePath = prometheusScrapePath - // data.PrometheusBackendPort = mergedMetricsPort - //} - //// Pull the TLS config from the relevant annotations. - //if raw, ok := pod.Annotations[annotationPrometheusCAFile]; ok && raw != "" { - // data.PrometheusCAFile = raw - //} - //if raw, ok := pod.Annotations[annotationPrometheusCAPath]; ok && raw != "" { - // data.PrometheusCAPath = raw - //} - //if raw, ok := pod.Annotations[annotationPrometheusCertFile]; ok && raw != "" { - // data.PrometheusCertFile = raw - //} - //if raw, ok := pod.Annotations[annotationPrometheusKeyFile]; ok && raw != "" { - // data.PrometheusKeyFile = raw - //} - // - //// Validate required Prometheus TLS config is present if set. - //if data.PrometheusCertFile != "" || data.PrometheusKeyFile != "" || data.PrometheusCAFile != "" || data.PrometheusCAPath != "" { - // if data.PrometheusCAFile == "" && data.PrometheusCAPath == "" { - // return corev1.Container{}, fmt.Errorf("must set one of %q or %q when providing prometheus TLS config", annotationPrometheusCAFile, annotationPrometheusCAPath) - // } - // if data.PrometheusCertFile == "" { - // return corev1.Container{}, fmt.Errorf("must set %q when providing prometheus TLS config", annotationPrometheusCertFile) - // } - // if data.PrometheusKeyFile == "" { - // return corev1.Container{}, fmt.Errorf("must set %q when providing prometheus TLS config", annotationPrometheusKeyFile) - // } - //} - // Render the command var buf bytes.Buffer tpl := template.Must(template.New("root").Parse(strings.TrimSpace( diff --git a/control-plane/connect-inject/mesh_webhook.go b/control-plane/connect-inject/mesh_webhook.go index b91a6866ba..5aef7611e2 100644 --- a/control-plane/connect-inject/mesh_webhook.go +++ b/control-plane/connect-inject/mesh_webhook.go @@ -362,26 +362,6 @@ func (w *MeshWebhook) Handle(ctx context.Context, req admission.Request) admissi } } - // Now that the consul-sidecar no longer needs to re-register services periodically - // (that functionality lives in the endpoints-controller), - // we only need the consul sidecar to run the metrics merging server. - // First, determine if we need to run the metrics merging server. - shouldRunMetricsMerging, err := w.MetricsConfig.shouldRunMergedMetricsServer(pod) - if err != nil { - w.Log.Error(err, "error determining if metrics merging server should be run", "request name", req.Name) - return admission.Errored(http.StatusInternalServerError, fmt.Errorf("error determining if metrics merging server should be run: %s", err)) - } - - // Add the consul-sidecar only if we need to run the metrics merging server. - if shouldRunMetricsMerging { - consulSidecar, err := w.consulSidecar(pod) - if err != nil { - w.Log.Error(err, "error configuring consul sidecar container", "request name", req.Name) - return admission.Errored(http.StatusInternalServerError, fmt.Errorf("error configuring consul sidecar container: %s", err)) - } - pod.Spec.Containers = append(pod.Spec.Containers, consulSidecar) - } - // pod.Annotations has already been initialized by h.defaultAnnotations() // and does not need to be checked for being a nil value. pod.Annotations[keyInjectStatus] = injected diff --git a/control-plane/connect-inject/mesh_webhook_test.go b/control-plane/connect-inject/mesh_webhook_test.go index 1a31dbcb04..7726d3e1db 100644 --- a/control-plane/connect-inject/mesh_webhook_test.go +++ b/control-plane/connect-inject/mesh_webhook_test.go @@ -573,85 +573,6 @@ func TestHandlerHandle(t *testing.T) { }, }, }, - - { - "when metrics merging is enabled, we should inject the consul-sidecar and add prometheus annotations", - MeshWebhook{ - Log: logrtest.TestLogger{T: t}, - AllowK8sNamespacesSet: mapset.NewSetWith("*"), - DenyK8sNamespacesSet: mapset.NewSet(), - MetricsConfig: MetricsConfig{ - DefaultEnableMetrics: true, - DefaultEnableMetricsMerging: true, - }, - decoder: decoder, - Clientset: defaultTestClientWithNamespace(), - }, - admission.Request{ - AdmissionRequest: admissionv1.AdmissionRequest{ - Namespace: namespaces.DefaultNamespace, - Object: encodeRaw(t, &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - "testLabel": "123", - }, - Annotations: map[string]string{ - annotationServiceMetricsPort: "1234", - }, - }, - Spec: basicSpec, - }), - }, - }, - "", - []jsonpatch.Operation{ - { - Operation: "add", - Path: "/spec/volumes", - }, - { - Operation: "add", - Path: "/spec/initContainers", - }, - { - Operation: "add", - Path: "/spec/containers/1", - }, - { - Operation: "add", - Path: "/spec/containers/2", - }, - { - Operation: "add", - Path: "/metadata/annotations/" + escapeJSONPointer(keyInjectStatus), - }, - { - Operation: "add", - Path: "/metadata/annotations/" + escapeJSONPointer(annotationOriginalPod), - }, - { - Operation: "add", - Path: "/metadata/annotations/" + escapeJSONPointer(annotationPrometheusScrape), - }, - { - Operation: "add", - Path: "/metadata/annotations/" + escapeJSONPointer(annotationPrometheusPath), - }, - { - Operation: "add", - Path: "/metadata/annotations/" + escapeJSONPointer(annotationPrometheusPort), - }, - { - Operation: "add", - Path: "/metadata/labels/" + escapeJSONPointer(keyInjectStatus), - }, - { - Operation: "add", - Path: "/metadata/labels/" + escapeJSONPointer(keyManagedBy), - }, - }, - }, - { "tproxy with overwriteProbes is enabled", MeshWebhook{ diff --git a/control-plane/subcommand/consul-sidecar/command.go b/control-plane/subcommand/consul-sidecar/command.go deleted file mode 100644 index 6ea77f635c..0000000000 --- a/control-plane/subcommand/consul-sidecar/command.go +++ /dev/null @@ -1,427 +0,0 @@ -package consulsidecar - -import ( - "context" - "errors" - "flag" - "fmt" - "io" - "net/http" - "os" - "os/exec" - "os/signal" - "strings" - "sync" - "syscall" - "time" - - "github.com/hashicorp/consul-k8s/control-plane/subcommand/common" - "github.com/hashicorp/consul-k8s/control-plane/subcommand/flags" - "github.com/hashicorp/go-hclog" - "github.com/mitchellh/cli" -) - -const ( - metricsServerShutdownTimeout = 5 * time.Second - envoyMetricsAddr = "http://127.0.0.1:19000/stats/prometheus" - // prometheusServiceMetricsSuccessKey is the key of the prometheus metric used to - // indicate if service metrics were scraped successfully. - prometheusServiceMetricsSuccessKey = "consul_merged_service_metrics_success" -) - -type Command struct { - UI cli.Ui - - http *flags.HTTPFlags - flagEnableServiceRegistration bool - flagServiceConfig string - flagConsulBinary string - flagSyncPeriod time.Duration - flagSet *flag.FlagSet - flagLogLevel string - flagLogJSON bool - - // Flags to configure metrics merging - flagEnableMetricsMerging bool - flagMergedMetricsPort string - flagServiceMetricsPort string - flagServiceMetricsPath string - - envoyMetricsGetter metricsGetter - serviceMetricsGetter metricsGetter - - consulCommand []string - - logger hclog.Logger - once sync.Once - help string - sigCh chan os.Signal -} - -// metricsGetter abstracts the function of retrieving metrics. It is used to -// enable easier unit testing. -type metricsGetter interface { - Get(url string) (resp *http.Response, err error) -} - -func (c *Command) init() { - c.flagSet = flag.NewFlagSet("", flag.ContinueOnError) - c.flagSet.BoolVar(&c.flagEnableServiceRegistration, "enable-service-registration", true, "Enables consul sidecar to register the service with consul every sync period. Defaults to true.") - c.flagSet.StringVar(&c.flagServiceConfig, "service-config", "", "Path to the service config file") - c.flagSet.StringVar(&c.flagConsulBinary, "consul-binary", "consul", "Path to a consul binary") - c.flagSet.DurationVar(&c.flagSyncPeriod, "sync-period", 10*time.Second, "Time between syncing the service registration. Defaults to 10s.") - c.flagSet.StringVar(&c.flagLogLevel, "log-level", "info", - "Log verbosity level. Supported values (in order of detail) are \"trace\", "+ - "\"debug\", \"info\", \"warn\", and \"error\". Defaults to info.") - c.flagSet.BoolVar(&c.flagLogJSON, "log-json", false, - "Enable or disable JSON output format for logging.") - - c.flagSet.BoolVar(&c.flagEnableMetricsMerging, "enable-metrics-merging", false, "Enables consul sidecar to run a merged metrics endpoint. Defaults to false.") - // -merged-metrics-port, -service-metrics-port, and -service-metrics-path - // are only used if metrics merging is enabled. -merged-metrics-port and - // -service-metrics-path have defaults, and -service-metrics-port is - // expected to be set by the connect-inject handler to a valid value. The - // connect-inject handler will only enable metrics merging in the consul - // sidecar if it finds a service metrics port greater than 0. - c.flagSet.StringVar(&c.flagMergedMetricsPort, "merged-metrics-port", "20100", "Port to serve merged Envoy and application metrics. Defaults to 20100.") - c.flagSet.StringVar(&c.flagServiceMetricsPort, "service-metrics-port", "0", "Port where application metrics are being served. Defaults to 0.") - c.flagSet.StringVar(&c.flagServiceMetricsPath, "service-metrics-path", "/metrics", "Path where application metrics are being served. Defaults to /metrics.") - c.help = flags.Usage(help, c.flagSet) - c.http = &flags.HTTPFlags{} - flags.Merge(c.flagSet, c.http.Flags()) - c.help = flags.Usage(help, c.flagSet) - - // Wait on an interrupt or terminate to exit. This channel must be initialized before - // Run() is called so that there are no race conditions where the channel - // is not defined. - if c.sigCh == nil { - c.sigCh = make(chan os.Signal, 1) - signal.Notify(c.sigCh, syscall.SIGINT, syscall.SIGTERM) - } -} - -// Run continually re-registers the service with Consul. -// This is needed because if the Consul Client pod is restarted, it loses all -// its service registrations. -// This command expects to be run as a sidecar and to be injected by the -// mutating webhook. -func (c *Command) Run(args []string) int { - c.once.Do(c.init) - if err := c.flagSet.Parse(args); err != nil { - return 1 - } - - err := c.validateFlags() - if err != nil { - c.UI.Error("Error: " + err.Error()) - return 1 - } - - logger, err := common.Logger(c.flagLogLevel, c.flagLogJSON) - if err != nil { - c.UI.Error(err.Error()) - return 1 - } - c.logger = logger - - // Log initial configuration - c.logger.Info("Command configuration", "enable-service-registration", c.flagEnableServiceRegistration, - "service-config", c.flagServiceConfig, - "consul-binary", c.flagConsulBinary, - "sync-period", c.flagSyncPeriod, - "log-level", c.flagLogLevel, - "enable-metrics-merging", c.flagEnableMetricsMerging, - "merged-metrics-port", c.flagMergedMetricsPort, - "service-metrics-port", c.flagServiceMetricsPort, - "service-metrics-path", c.flagServiceMetricsPath, - ) - - // signalCtx that we pass in to the main work loop, signal handling is handled in another thread - // due to the length of time it can take for the cmd to complete causing synchronization issues - // on shutdown. Also passing a context in so that it can interrupt the cmd and exit cleanly. - signalCtx, cancelFunc := context.WithCancel(context.Background()) - go func() { - sig := <-c.sigCh - c.logger.Info(fmt.Sprintf("%s received, shutting down", sig)) - cancelFunc() - }() - - // If metrics merging is enabled, run a merged metrics server in a goroutine - // that serves Envoy sidecar metrics and Connect service metrics. The merged - // metrics server will be shut down when a signal is received by the main - // for loop using shutdownMetricsServer(). - var server *http.Server - srvExitCh := make(chan error) - if c.flagEnableMetricsMerging { - c.logger.Info("Metrics is enabled, creating merged metrics server.") - server = c.createMergedMetricsServer() - - // Run the merged metrics server. - c.logger.Info("Running merged metrics server.") - go func() { - if err = server.ListenAndServe(); err != nil && err != http.ErrServerClosed { - srvExitCh <- err - } - }() - } - - // The work loop for re-registering the service. We continually re-register - // our service every syncPeriod. Consul is smart enough to know when the - // service hasn't changed and so won't update any indices. This means we - // won't be causing a lot of traffic within the cluster. We tolerate Consul - // Clients going down and will simply re-register once it's back up. - if c.flagEnableServiceRegistration { - c.consulCommand = []string{"services", "register"} - c.consulCommand = append(c.consulCommand, c.parseConsulFlags()...) - c.consulCommand = append(c.consulCommand, c.flagServiceConfig) - - go func() { - for { - start := time.Now() - cmd := exec.CommandContext(signalCtx, c.flagConsulBinary, c.consulCommand...) - - // Run the command and record the stdout and stderr output. - output, err := cmd.CombinedOutput() - if err != nil { - c.logger.Error("failed to sync service", "output", strings.TrimSpace(string(output)), "err", err, "duration", time.Since(start)) - } else { - c.logger.Info("successfully synced service", "output", strings.TrimSpace(string(output)), "duration", time.Since(start)) - } - select { - // Re-loop after syncPeriod or exit if we receive interrupt or terminate signals. - case <-time.After(c.flagSyncPeriod): - continue - case <-signalCtx.Done(): - return - } - } - }() - } - - // Block and wait for a signal or for the metrics server to exit. - select { - case <-signalCtx.Done(): - // After the signal is received, wait for the merged metrics server - // to gracefully shutdown as well if it has been enabled. This can - // take up to metricsServerShutdownTimeout seconds. - if c.flagEnableMetricsMerging { - c.logger.Info("Attempting to shut down metrics server.") - c.shutdownMetricsServer(server) - } - return 0 - case err := <-srvExitCh: - c.logger.Error(fmt.Sprintf("Metrics server error: %v", err)) - return 1 - } - -} - -// shutdownMetricsServer handles gracefully shutting down the server. This will -// call server.Shutdown(), which will indefinitely wait for connections to turn -// idle. To avoid potentially waiting forever, we pass a context to -// server.Shutdown() that will timeout in metricsServerShutdownTimeout (5) seconds. -func (c *Command) shutdownMetricsServer(server *http.Server) { - // The shutdownCancelFunc will be unused since it is unnecessary to call it as we - // are already about to call shutdown with a timeout. We'd only need to - // shutdownCancelFunc if we needed to trigger something to happen when the - // shutdownCancelFunc is called, which we do not. The reason for not - // discarding it with _ is for the go vet check. - shutdownCtx, shutdownCancelFunc := context.WithTimeout(context.Background(), metricsServerShutdownTimeout) - defer shutdownCancelFunc() - - c.logger.Info("Merged metrics server exists, attempting to gracefully shut down server") - if err := server.Shutdown(shutdownCtx); err != nil { - c.logger.Error(fmt.Sprintf("Server shutdown failed: %s", err)) - return - } - c.logger.Info("Server has been shut down") -} - -// createMergedMetricsServer sets up the merged metrics server. -func (c *Command) createMergedMetricsServer() *http.Server { - mux := http.NewServeMux() - mux.HandleFunc("/stats/prometheus", c.mergedMetricsHandler) - - mergedMetricsServerAddr := fmt.Sprintf("127.0.0.1:%s", c.flagMergedMetricsPort) - server := &http.Server{Addr: mergedMetricsServerAddr, Handler: mux} - - // http.Client satisfies the metricsGetter interface. - // The default http.Client timeout is indefinite, so adding a timeout makes - // sure that requests don't hang. - client := &http.Client{ - Timeout: time.Second * 10, - } - - // During tests these may already be set to mocks. - if c.envoyMetricsGetter == nil { - c.envoyMetricsGetter = client - } - if c.serviceMetricsGetter == nil { - c.serviceMetricsGetter = client - } - - return server -} - -// mergedMetricsHandler has the logic to append both Envoy and service metrics -// together, logging if it's unsuccessful at either. -// If the Envoy scrape fails, we respond with a 500 code which follows the Prometheus -// exporter guidelines. If the service scrape fails, we respond with a 200 so -// that the Envoy metrics are still scraped. -// We also include a metric line in each response indicating the success or -// failure of the service metric scraping. -func (c *Command) mergedMetricsHandler(rw http.ResponseWriter, _ *http.Request) { - envoyMetrics, err := c.envoyMetricsGetter.Get(envoyMetricsAddr) - if err != nil { - c.logger.Error("Error scraping Envoy proxy metrics", "err", err) - http.Error(rw, fmt.Sprintf("Error scraping Envoy proxy metrics: %s", err), http.StatusInternalServerError) - return - } - - // Write Envoy metrics to the response. - defer func() { - err = envoyMetrics.Body.Close() - if err != nil { - c.logger.Error(fmt.Sprintf("Error closing envoy metrics body: %s", err.Error())) - } - }() - envoyMetricsBody, err := io.ReadAll(envoyMetrics.Body) - if err != nil { - c.logger.Error("Could not read Envoy proxy metrics", "err", err) - http.Error(rw, fmt.Sprintf("Could not read Envoy proxy metrics: %s", err), http.StatusInternalServerError) - return - } - if non2xxCode(envoyMetrics.StatusCode) { - c.logger.Error("Received non-2xx status code scraping Envoy proxy metrics", "code", envoyMetrics.StatusCode, "response", string(envoyMetricsBody)) - http.Error(rw, fmt.Sprintf("Received non-2xx status code scraping Envoy proxy metrics: %d: %s", envoyMetrics.StatusCode, string(envoyMetricsBody)), http.StatusInternalServerError) - return - } - writeResponse(rw, envoyMetricsBody, "envoy metrics", c.logger) - - serviceMetricsAddr := fmt.Sprintf("http://127.0.0.1:%s%s", c.flagServiceMetricsPort, c.flagServiceMetricsPath) - serviceMetrics, err := c.serviceMetricsGetter.Get(serviceMetricsAddr) - if err != nil { - c.logger.Warn("Error scraping service metrics", "err", err) - writeResponse(rw, serviceMetricSuccess(false), "service metrics success", c.logger) - // Since we've already written the Envoy metrics to the response, we can - // return at this point if we were unable to get service metrics. - return - } - - // Since serviceMetrics will be non-nil if there are no errors, write the - // service metrics to the response as well. - defer func() { - err = serviceMetrics.Body.Close() - if err != nil { - c.logger.Error(fmt.Sprintf("Error closing service metrics body: %s", err.Error())) - } - }() - serviceMetricsBody, err := io.ReadAll(serviceMetrics.Body) - if err != nil { - c.logger.Error("Could not read service metrics", "err", err) - writeResponse(rw, serviceMetricSuccess(false), "service metrics success", c.logger) - return - } - if non2xxCode(serviceMetrics.StatusCode) { - c.logger.Error("Received non-2xx status code scraping service metrics", "code", serviceMetrics.StatusCode, "response", string(serviceMetricsBody)) - writeResponse(rw, serviceMetricSuccess(false), "service metrics success", c.logger) - return - } - writeResponse(rw, serviceMetricsBody, "service metrics", c.logger) - writeResponse(rw, serviceMetricSuccess(true), "service metrics success", c.logger) -} - -// writeResponse is a helper method to write resp to rw and log if there is an error writing. -// respName is the name of this response that will be used in the error log. -func writeResponse(rw http.ResponseWriter, resp []byte, respName string, logger hclog.Logger) { - _, err := rw.Write(resp) - if err != nil { - logger.Error(fmt.Sprintf("Error writing %s: %s", respName, err.Error())) - } -} - -// validateFlags validates the flags. -func (c *Command) validateFlags() error { - if !c.flagEnableServiceRegistration && !c.flagEnableMetricsMerging { - return errors.New("at least one of -enable-service-registration or -enable-metrics-merging must be true") - } - if c.flagEnableServiceRegistration { - if c.flagSyncPeriod == 0 { - // if sync period is 0, then the select loop will - // always pick the first case, and it'll be impossible - // to terminate the command gracefully with SIGINT. - return errors.New("-sync-period must be greater than 0") - } - if c.flagServiceConfig == "" { - return errors.New("-service-config must be set") - } - if c.flagConsulBinary == "" { - return errors.New("-consul-binary must be set") - } - if c.http.ConsulAPITimeout() <= 0 { - return errors.New("-consul-api-timeout must be set to a value greater than 0") - } - _, err := os.Stat(c.flagServiceConfig) - if os.IsNotExist(err) { - return fmt.Errorf("-service-config file %q not found", c.flagServiceConfig) - } - _, err = exec.LookPath(c.flagConsulBinary) - if err != nil { - return fmt.Errorf("-consul-binary %q not found: %s", c.flagConsulBinary, err) - } - } - return nil -} - -// non2xxCode returns true if code is not in the range of 200-299 inclusive. -func non2xxCode(code int) bool { - return code < 200 || code >= 300 -} - -// serviceMetricSuccess returns a prometheus metric line indicating -// the success of the metrics merging. -func serviceMetricSuccess(success bool) []byte { - boolAsInt := 0 - if success { - boolAsInt = 1 - } - return []byte(fmt.Sprintf("%s %d\n", prometheusServiceMetricsSuccessKey, boolAsInt)) -} - -// parseConsulFlags creates Consul client command flags -// from command's HTTP flags and returns them as an array of strings. -func (c *Command) parseConsulFlags() []string { - var consulCommandFlags []string - c.http.Flags().VisitAll(func(f *flag.Flag) { - // not adding -consul-api-timeout since consul does not use this flag - if f.Value.String() != "" && f.Name != "consul-api-timeout" { - consulCommandFlags = append(consulCommandFlags, fmt.Sprintf("-%s=%s", f.Name, f.Value.String())) - } - }) - return consulCommandFlags -} - -// interrupt sends os.Interrupt signal to the command -// so it can exit gracefully. This function is needed for tests. -func (c *Command) interrupt() { - c.sendSignal(syscall.SIGINT) -} - -func (c *Command) sendSignal(sig os.Signal) { - c.sigCh <- sig -} - -func (c *Command) Synopsis() string { return synopsis } -func (c *Command) Help() string { - c.once.Do(c.init) - return c.help -} - -const synopsis = "Consul sidecar for Connect." -const help = ` -Usage: consul-k8s-control-plane consul-sidecar [options] - - Run as a sidecar to your Connect service. Ensures that your service - is registered with the local Consul client. - -` diff --git a/control-plane/subcommand/consul-sidecar/command_ent_test.go b/control-plane/subcommand/consul-sidecar/command_ent_test.go deleted file mode 100644 index d3a198d59a..0000000000 --- a/control-plane/subcommand/consul-sidecar/command_ent_test.go +++ /dev/null @@ -1,90 +0,0 @@ -//go:build enterprise - -package consulsidecar - -import ( - "os" - "testing" - "time" - - "github.com/hashicorp/consul/api" - "github.com/hashicorp/consul/sdk/testutil" - "github.com/hashicorp/consul/sdk/testutil/retry" - "github.com/mitchellh/cli" - "github.com/stretchr/testify/require" -) - -// Test that we register the services with namespaces. -func TestRun_ServicesRegistration_Namespaces(t *testing.T) { - t.Parallel() - tmpDir, configFile := createServicesTmpFile(t, servicesRegistrationWithNamespaces) - defer os.RemoveAll(tmpDir) - - a, err := testutil.NewTestServerConfigT(t, nil) - require.NoError(t, err) - defer a.Stop() - - ui := cli.NewMockUi() - cmd := Command{ - UI: ui, - } - - // Run async because we need to kill it when the test is over. - exitChan := runCommandAsynchronously(&cmd, []string{ - "-http-addr", a.HTTPAddr, - "-service-config", configFile, - "-sync-period", "100ms", - "-consul-api-timeout", "5s", - }) - defer stopCommand(t, &cmd, exitChan) - - client, err := api.NewClient(&api.Config{ - Address: a.HTTPAddr, - }) - require.NoError(t, err) - - // create necessary namespaces first - _, _, err = client.Namespaces().Create(&api.Namespace{Name: "namespace"}, nil) - require.NoError(t, err) - - timer := &retry.Timer{Timeout: 1 * time.Second, Wait: 100 * time.Millisecond} - retry.RunWith(timer, t, func(r *retry.R) { - svc, _, err := client.Agent().Service("service-id", &api.QueryOptions{Namespace: "namespace"}) - require.NoError(r, err) - require.Equal(r, 80, svc.Port) - require.Equal(r, "namespace", svc.Namespace) - - svcProxy, _, err := client.Agent().Service("service-id-sidecar-proxy", &api.QueryOptions{Namespace: "namespace"}) - require.NoError(r, err) - require.Equal(r, 2000, svcProxy.Port) - require.Equal(r, svcProxy.Namespace, "namespace") - require.Len(r, svcProxy.Proxy.Upstreams, 1) - require.Equal(r, svcProxy.Proxy.Upstreams[0].DestinationNamespace, "dest-namespace") - }) -} - -const servicesRegistrationWithNamespaces = ` -services { - id = "service-id" - name = "service" - port = 80 - namespace = "namespace" -} -services { - id = "service-id-sidecar-proxy" - name = "service-sidecar-proxy" - namespace = "namespace" - port = 2000 - kind = "connect-proxy" - proxy { - destination_service_name = "service" - destination_service_id = "service-id" - local_service_port = 80 - upstreams { - destination_type = "service" - destination_name = "dest-name" - destination_namespace = "dest-namespace" - local_bind_port = 1234 - } - } -}` diff --git a/control-plane/subcommand/consul-sidecar/command_test.go b/control-plane/subcommand/consul-sidecar/command_test.go deleted file mode 100644 index cd2d024ec5..0000000000 --- a/control-plane/subcommand/consul-sidecar/command_test.go +++ /dev/null @@ -1,643 +0,0 @@ -package consulsidecar - -import ( - "bytes" - "fmt" - "io" - - "net" - "net/http" - "os" - "path/filepath" - "syscall" - "testing" - "time" - - "github.com/hashicorp/consul/api" - "github.com/hashicorp/consul/sdk/freeport" - "github.com/hashicorp/consul/sdk/testutil" - "github.com/hashicorp/consul/sdk/testutil/retry" - "github.com/hashicorp/go-hclog" - "github.com/mitchellh/cli" - "github.com/stretchr/testify/require" -) - -func TestRun_Defaults(t *testing.T) { - t.Parallel() - var cmd Command - cmd.init() - require.Equal(t, 10*time.Second, cmd.flagSyncPeriod) - require.Equal(t, "info", cmd.flagLogLevel) - require.Equal(t, "consul", cmd.flagConsulBinary) -} - -func TestRunSignalHandlingRegistrationOnly(t *testing.T) { - cases := map[string]os.Signal{ - "SIGINT": syscall.SIGINT, - "SIGTERM": syscall.SIGTERM, - } - for name, signal := range cases { - t.Run(name, func(t *testing.T) { - - tmpDir, configFile := createServicesTmpFile(t, servicesRegistration) - defer os.RemoveAll(tmpDir) - - a, err := testutil.NewTestServerConfigT(t, nil) - require.NoError(t, err) - defer a.Stop() - - ui := cli.NewMockUi() - cmd := Command{ - UI: ui, - } - - client, err := api.NewClient(&api.Config{ - Address: a.HTTPAddr, - }) - require.NoError(t, err) - // Run async because we need to kill it when the test is over. - exitChan := runCommandAsynchronously(&cmd, []string{ - "-service-config", configFile, - "-http-addr", a.HTTPAddr, - "-sync-period", "1s", - "-consul-api-timeout", "5s", - }) - cmd.sendSignal(signal) - - // Assert that it exits cleanly or timeout. - select { - case exitCode := <-exitChan: - require.Equal(t, 0, exitCode, ui.ErrorWriter.String()) - case <-time.After(time.Second * 1): - // Fail if the signal was not caught. - require.Fail(t, "timeout waiting for command to exit") - } - // Assert that the services were not created because the cmd has exited. - _, _, err = client.Agent().Service("service-id", nil) - require.Error(t, err) - _, _, err = client.Agent().Service("service-id-sidecar-proxy", nil) - require.Error(t, err) - }) - } -} - -func TestRunSignalHandlingMetricsOnly(t *testing.T) { - cases := map[string]os.Signal{ - "SIGINT": syscall.SIGINT, - "SIGTERM": syscall.SIGTERM, - } - for name, signal := range cases { - t.Run(name, func(t *testing.T) { - ui := cli.NewMockUi() - cmd := Command{ - UI: ui, - } - - randomPorts := freeport.GetN(t, 1) - // Run async because we need to kill it when the test is over. - exitChan := runCommandAsynchronously(&cmd, []string{ - "-enable-service-registration=false", - "-enable-metrics-merging=true", - "-merged-metrics-port", fmt.Sprint(randomPorts[0]), - "-service-metrics-port", "8080", - "-service-metrics-path", "/metrics", - "-consul-api-timeout", "5s", - }) - - // Keep an open connection to the server by continuously sending bytes - // on the connection so it will have to be drained. - var conn net.Conn - var err error - retry.Run(t, func(r *retry.R) { - conn, err = net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", randomPorts[0])) - if err != nil { - require.NoError(r, err) - } - }) - go func() { - for { - _, err := conn.Write([]byte("hello")) - // Once the server has been shut down there will be an error writing to that connection. So, this - // will break out of the for loop and the goroutine will exit (and be cleaned up). - if err != nil { - break - } - } - }() - - // Send a signal to consul-sidecar. The merged metrics server can take - // up to metricsServerShutdownTimeout to finish cleaning up. - cmd.sendSignal(signal) - - // Will need to wait for slightly longer than the shutdown timeout to - // make sure that the command has exited shortly after the timeout. - waitForShutdown := metricsServerShutdownTimeout + 100*time.Millisecond - - // Assert that it exits cleanly or timeout. - select { - case exitCode := <-exitChan: - require.Equal(t, 0, exitCode, ui.ErrorWriter.String()) - case <-time.After(waitForShutdown): - // Fail if the signal was not caught. - require.Fail(t, "timeout waiting for command to exit") - } - }) - } -} - -func TestRunSignalHandlingAllProcessesEnabled(t *testing.T) { - cases := map[string]os.Signal{ - "SIGINT": syscall.SIGINT, - "SIGTERM": syscall.SIGTERM, - } - for name, signal := range cases { - t.Run(name, func(t *testing.T) { - tmpDir, configFile := createServicesTmpFile(t, servicesRegistration) - defer os.RemoveAll(tmpDir) - - a, err := testutil.NewTestServerConfigT(t, nil) - require.NoError(t, err) - defer a.Stop() - - ui := cli.NewMockUi() - cmd := Command{ - UI: ui, - } - - require.NoError(t, err) - - randomPorts := freeport.GetN(t, 1) - // Run async because we need to kill it when the test is over. - exitChan := runCommandAsynchronously(&cmd, []string{ - "-service-config", configFile, - "-http-addr", a.HTTPAddr, - "-enable-metrics-merging=true", - "-merged-metrics-port", fmt.Sprint(randomPorts[0]), - "-service-metrics-port", "8080", - "-service-metrics-path", "/metrics", - "-consul-api-timeout", "5s", - }) - - // Keep an open connection to the server by continuously sending bytes - // on the connection so it will have to be drained. - var conn net.Conn - retry.Run(t, func(r *retry.R) { - conn, err = net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", randomPorts[0])) - if err != nil { - require.NoError(r, err) - } - }) - go func() { - for { - _, err := conn.Write([]byte("hello")) - // Once the server has been shut down there will be an error writing to that connection. So, this - // will break out of the for loop and the goroutine will exit (and be cleaned up). - if err != nil { - break - } - } - }() - - // Send a signal to consul-sidecar. The merged metrics server can take - // up to metricsServerShutdownTimeout to finish cleaning up. - cmd.sendSignal(signal) - - // Will need to wait for slightly longer than the shutdown timeout to - // make sure that the command has exited shortly after the timeout. - waitForShutdown := metricsServerShutdownTimeout + 100*time.Millisecond - - // Assert that it exits cleanly or timeout. - select { - case exitCode := <-exitChan: - require.Equal(t, 0, exitCode, ui.ErrorWriter.String()) - case <-time.After(waitForShutdown): - // Fail if the signal was not caught. - require.Fail(t, "timeout waiting for command to exit") - } - }) - } -} - -type mockEnvoyMetricsGetter struct { - respStatusCode int -} - -func (em *mockEnvoyMetricsGetter) Get(_ string) (resp *http.Response, err error) { - response := &http.Response{} - response.StatusCode = em.respStatusCode - response.Body = io.NopCloser(bytes.NewReader([]byte("envoy metrics\n"))) - return response, nil -} - -// mockServiceMetricsGetter. -type mockServiceMetricsGetter struct { - // reqURL is the last URL that was passed to Get(url) - reqURL string - - // respStatusCode is the status code to use for the response. - respStatusCode int -} - -func (sm *mockServiceMetricsGetter) Get(url string) (resp *http.Response, err error) { - // Record the URL that we were called with. - sm.reqURL = url - - response := &http.Response{} - response.Body = io.NopCloser(bytes.NewReader([]byte("service metrics\n"))) - response.StatusCode = sm.respStatusCode - - return response, nil -} - -func TestMergedMetricsServer(t *testing.T) { - cases := []struct { - name string - envoyMetricsGetter *mockEnvoyMetricsGetter - serviceMetricsGetter *mockServiceMetricsGetter - expectedStatusCode int - expectedOutput string - }{ - { - name: "happy path: envoy and service metrics are merged", - envoyMetricsGetter: &mockEnvoyMetricsGetter{ - respStatusCode: 200, - }, - serviceMetricsGetter: &mockServiceMetricsGetter{ - respStatusCode: 200, - }, - expectedStatusCode: 200, - expectedOutput: "envoy metrics\nservice metrics\nconsul_merged_service_metrics_success 1\n", - }, - { - name: "service metrics non-200", - envoyMetricsGetter: &mockEnvoyMetricsGetter{ - respStatusCode: 200, - }, - serviceMetricsGetter: &mockServiceMetricsGetter{ - respStatusCode: 404, - }, - expectedStatusCode: 200, - expectedOutput: "envoy metrics\nconsul_merged_service_metrics_success 0\n", - }, - { - name: "envoy metrics non-200", - envoyMetricsGetter: &mockEnvoyMetricsGetter{ - respStatusCode: 404, - }, - serviceMetricsGetter: &mockServiceMetricsGetter{ - respStatusCode: 200, - }, - expectedStatusCode: 500, - expectedOutput: "Received non-2xx status code scraping Envoy proxy metrics: 404: envoy metrics\n\n", - }, - { - name: "envoy and service metrics non-200", - envoyMetricsGetter: &mockEnvoyMetricsGetter{ - respStatusCode: 500, - }, - serviceMetricsGetter: &mockServiceMetricsGetter{ - respStatusCode: 500, - }, - expectedStatusCode: 500, - expectedOutput: "Received non-2xx status code scraping Envoy proxy metrics: 500: envoy metrics\n\n", - }, - } - - for _, c := range cases { - t.Run(c.name, func(t *testing.T) { - randomPorts := freeport.GetN(t, 2) - ui := cli.NewMockUi() - cmd := Command{ - UI: ui, - flagEnableMetricsMerging: true, - flagMergedMetricsPort: fmt.Sprint(randomPorts[0]), - flagServiceMetricsPort: fmt.Sprint(randomPorts[1]), - flagServiceMetricsPath: "/metrics", - logger: hclog.Default(), - envoyMetricsGetter: c.envoyMetricsGetter, - serviceMetricsGetter: c.serviceMetricsGetter, - } - - server := cmd.createMergedMetricsServer() - go func() { - _ = server.ListenAndServe() - }() - defer server.Close() - - // Call the merged metrics endpoint and make assertions on the - // output. retry.Run times out in 7 seconds, which should give the - // merged metrics server enough time to come up. - retry.Run(t, func(r *retry.R) { - resp, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d/stats/prometheus", randomPorts[0])) - require.NoError(r, err) - bytes, err := io.ReadAll(resp.Body) - require.NoError(r, err) - require.Equal(r, c.expectedOutput, string(bytes)) - // Verify the correct service metrics url was used. The service - // metrics endpoint is only called if the Envoy metrics endpoint - // call succeeds. - if c.envoyMetricsGetter.respStatusCode == 200 { - require.Equal(r, fmt.Sprintf("http://127.0.0.1:%d%s", randomPorts[1], "/metrics"), c.serviceMetricsGetter.reqURL) - } - }) - }) - } -} - -func TestRun_FlagValidation(t *testing.T) { - t.Parallel() - cases := []struct { - Flags []string - ExpErr string - }{ - { - Flags: []string{""}, - ExpErr: "-service-config must be set", - }, - { - Flags: []string{ - "-service-config=/config.hcl", - "-consul-binary=", - }, - ExpErr: "-consul-binary must be set", - }, - { - Flags: []string{ - "-service-config=/config.hcl", - "-consul-binary=consul", - "-sync-period=0s", - }, - ExpErr: "-sync-period must be greater than 0", - }, - { - Flags: []string{ - "-enable-service-registration=false", - "-enable-metrics-merging=false", - }, - ExpErr: " at least one of -enable-service-registration or -enable-metrics-merging must be true", - }, - { - Flags: []string{ - "-service-config=/config.hcl", - "-consul-binary=consul", - "-sync-period=5s", - "-enable-service-registration=true", - }, - ExpErr: "-consul-api-timeout must be set to a value greater than 0", - }, - } - - for _, c := range cases { - t.Run(c.ExpErr, func(t *testing.T) { - ui := cli.NewMockUi() - cmd := Command{ - UI: ui, - } - responseCode := cmd.Run(c.Flags) - require.Equal(t, 1, responseCode, ui.ErrorWriter.String()) - require.Contains(t, ui.ErrorWriter.String(), c.ExpErr) - }) - } -} - -func TestRun_FlagValidation_ServiceConfigFileMissing(t *testing.T) { - t.Parallel() - ui := cli.NewMockUi() - cmd := Command{ - UI: ui, - } - responseCode := cmd.Run([]string{"-service-config=/does/not/exist", "-consul-binary=/not/a/valid/path", "-consul-api-timeout=5s"}) - require.Equal(t, 1, responseCode, ui.ErrorWriter.String()) - require.Contains(t, ui.ErrorWriter.String(), "-service-config file \"/does/not/exist\" not found") -} - -func TestRun_FlagValidation_ConsulBinaryMissing(t *testing.T) { - t.Parallel() - - ui := cli.NewMockUi() - cmd := Command{ - UI: ui, - } - - tmpDir, configFile := createServicesTmpFile(t, servicesRegistration) - defer os.RemoveAll(tmpDir) - - configFlag := "-service-config=" + configFile - - responseCode := cmd.Run([]string{configFlag, "-consul-binary=/not/a/valid/path", "-consul-api-timeout=5s"}) - require.Equal(t, 1, responseCode, ui.ErrorWriter.String()) - require.Contains(t, ui.ErrorWriter.String(), "-consul-binary \"/not/a/valid/path\" not found") -} - -func TestRun_FlagValidation_InvalidLogLevel(t *testing.T) { - t.Parallel() - - tmpDir, configFile := createServicesTmpFile(t, servicesRegistration) - defer os.RemoveAll(tmpDir) - - ui := cli.NewMockUi() - cmd := Command{ - UI: ui, - } - responseCode := cmd.Run([]string{"-service-config", configFile, "-consul-binary=consul", "-log-level=foo", "-consul-api-timeout=5s"}) - require.Equal(t, 1, responseCode, ui.ErrorWriter.String()) - require.Contains(t, ui.ErrorWriter.String(), "unknown log level: foo") -} - -// Test that we register the services. -func TestRun_ServicesRegistration(t *testing.T) { - t.Parallel() - - tmpDir, configFile := createServicesTmpFile(t, servicesRegistration) - defer os.RemoveAll(tmpDir) - - a, err := testutil.NewTestServerConfigT(t, nil) - require.NoError(t, err) - defer a.Stop() - - ui := cli.NewMockUi() - cmd := Command{ - UI: ui, - } - - // Run async because we need to kill it when the test is over. - exitChan := runCommandAsynchronously(&cmd, []string{ - "-http-addr", a.HTTPAddr, - "-service-config", configFile, - "-sync-period", "100ms", - "-consul-api-timeout", "5s", - }) - defer stopCommand(t, &cmd, exitChan) - - client, err := api.NewClient(&api.Config{ - Address: a.HTTPAddr, - }) - require.NoError(t, err) - - retry.Run(t, func(r *retry.R) { - svc, _, err := client.Agent().Service("service-id", nil) - require.NoError(r, err) - require.Equal(r, 80, svc.Port) - - svcProxy, _, err := client.Agent().Service("service-id-sidecar-proxy", nil) - require.NoError(r, err) - require.Equal(r, 2000, svcProxy.Port) - }) -} - -// Test that we register services when the Consul agent is down at first. -func TestRun_ServicesRegistration_ConsulDown(t *testing.T) { - t.Parallel() - - tmpDir, configFile := createServicesTmpFile(t, servicesRegistration) - defer os.RemoveAll(tmpDir) - - ui := cli.NewMockUi() - cmd := Command{ - UI: ui, - } - - // we need to reserve all 6 ports to avoid potential - // port collisions with other tests - randomPorts := freeport.GetN(t, 6) - - // Run async because we need to kill it when the test is over. - exitChan := runCommandAsynchronously(&cmd, []string{ - "-http-addr", fmt.Sprintf("127.0.0.1:%d", randomPorts[1]), - "-service-config", configFile, - "-sync-period", "100ms", - "-consul-api-timeout", "5s", - }) - defer stopCommand(t, &cmd, exitChan) - - // Start the Consul agent after 500ms. - time.Sleep(500 * time.Millisecond) - a, err := testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) { - c.Ports = &testutil.TestPortConfig{ - DNS: randomPorts[0], - HTTP: randomPorts[1], - HTTPS: randomPorts[2], - SerfLan: randomPorts[3], - SerfWan: randomPorts[4], - Server: randomPorts[5], - } - }) - require.NoError(t, err) - defer a.Stop() - - client, err := api.NewClient(&api.Config{ - Address: a.HTTPAddr, - }) - require.NoError(t, err) - - // The services should be registered when the Consul agent comes up - retry.Run(t, func(r *retry.R) { - svc, _, err := client.Agent().Service("service-id", nil) - require.NoError(r, err) - require.Equal(r, 80, svc.Port) - - svcProxy, _, err := client.Agent().Service("service-id-sidecar-proxy", nil) - require.NoError(r, err) - require.Equal(r, 2000, svcProxy.Port) - }) -} - -// Test that we parse all flags and pass them down to the underlying Consul command. -func TestRun_ConsulCommandFlags(t *testing.T) { - t.Parallel() - tmpDir, configFile := createServicesTmpFile(t, servicesRegistration) - defer os.RemoveAll(tmpDir) - - a, err := testutil.NewTestServerConfigT(t, nil) - require.NoError(t, err) - defer a.Stop() - - ui := cli.NewMockUi() - cmd := Command{ - UI: ui, - } - - // Run async because we need to kill it when the test is over. - exitChan := runCommandAsynchronously(&cmd, []string{ - "-http-addr", a.HTTPAddr, - "-service-config", configFile, - "-sync-period", "1s", - "-consul-binary", "consul", - "-token=abc", - "-token-file=/token/file", - "-ca-file=/ca/file", - "-ca-path=/ca/path", - "-consul-api-timeout", "5s", - }) - defer stopCommand(t, &cmd, exitChan) - - expectedCommand := []string{ - "services", - "register", - "-http-addr=" + a.HTTPAddr, - "-token=abc", - "-token-file=/token/file", - "-ca-file=/ca/file", - "-ca-path=/ca/path", - configFile, - } - retry.Run(t, func(r *retry.R) { - require.ElementsMatch(r, expectedCommand, cmd.consulCommand) - }) -} - -// This function starts the command asynchronously and returns a non-blocking chan. -// When finished, the command will send its exit code to the channel. -// Note that it's the responsibility of the caller to terminate the command by calling stopCommand, -// otherwise it can run forever. -func runCommandAsynchronously(cmd *Command, args []string) chan int { - // We have to run cmd.init() to ensure that the channel the command is - // using to watch for os interrupts is initialized. If we don't do this, - // then if stopCommand is called immediately, it will block forever - // because it calls interrupt() which will attempt to send on a nil channel. - cmd.init() - exitChan := make(chan int, 1) - go func() { - exitChan <- cmd.Run(args) - }() - return exitChan -} - -func stopCommand(t *testing.T, cmd *Command, exitChan chan int) { - if len(exitChan) == 0 { - cmd.interrupt() - } - c := <-exitChan - require.Equal(t, 0, c, string(cmd.UI.(*cli.MockUi).ErrorWriter.Bytes())) -} - -// createServicesTmpFile creates a temp directory -// and writes servicesRegistration as an HCL file there. -func createServicesTmpFile(t *testing.T, serviceHCL string) (string, string) { - tmpDir, err := os.MkdirTemp("", "") - require.NoError(t, err) - - configFile := filepath.Join(tmpDir, "svc.hcl") - err = os.WriteFile(configFile, []byte(serviceHCL), 0600) - require.NoError(t, err) - - return tmpDir, configFile -} - -const servicesRegistration = ` -services { - id = "service-id" - name = "service" - port = 80 -} -services { - id = "service-id-sidecar-proxy" - name = "service-sidecar-proxy" - port = 2000 - kind = "connect-proxy" - proxy { - destination_service_name = "service" - destination_service_id = "service-id" - local_service_port = 80 - } -}` diff --git a/control-plane/subcommand/inject-connect/command.go b/control-plane/subcommand/inject-connect/command.go index f8648729c5..37899c2863 100644 --- a/control-plane/subcommand/inject-connect/command.go +++ b/control-plane/subcommand/inject-connect/command.go @@ -153,7 +153,7 @@ func (c *Command) init() { c.flagSet.StringVar(&c.flagEnvoyExtraArgs, "envoy-extra-args", "", "Extra envoy command line args to be set when starting envoy (e.g \"--log-level debug --disable-hot-restart\").") c.flagSet.StringVar(&c.flagACLAuthMethod, "acl-auth-method", "", - "The name of the Kubernetes Auth Method to use for connectInjection if ACLs are enabled.") //todo: rename this to be more specific? + "The name of the Kubernetes Auth Method to use for connectInjection if ACLs are enabled.") // todo: rename this to be more specific? c.flagSet.Var((*flags.AppendSliceValue)(&c.flagAllowK8sNamespacesList), "allow-k8s-namespace", "K8s namespaces to explicitly allow. May be specified multiple times.") c.flagSet.Var((*flags.AppendSliceValue)(&c.flagDenyK8sNamespacesList), "deny-k8s-namespace", @@ -306,7 +306,7 @@ func (c *Command) Run(args []string) int { } // Validate resource request/limit flags and parse into corev1.ResourceRequirements - initResources, consulSidecarResources, err := c.parseAndValidateResourceFlags() + initResources, err := c.parseAndValidateResourceFlags() if err != nil { c.UI.Error(err.Error()) return 1 @@ -496,44 +496,43 @@ func (c *Command) Run(args []string) int { mgr.GetWebhookServer().Register("/mutate", &webhook.Admission{Handler: &connectinject.MeshWebhook{ - Clientset: c.clientset, - ConsulConfig: consulConfig, - ConsulServerConnMgr: watcher, - ImageConsul: c.flagConsulImage, - ImageConsulDataplane: c.flagConsulDataplaneImage, - EnvoyExtraArgs: c.flagEnvoyExtraArgs, - ImageConsulK8S: c.flagConsulK8sImage, - RequireAnnotation: !c.flagDefaultInject, - AuthMethod: c.flagACLAuthMethod, - ConsulCACert: string(caCertPem), - TLSEnabled: c.consul.UseTLS, - ConsulAddress: c.consul.Addresses, - ConsulTLSServerName: c.consul.TLSServerName, - DefaultProxyCPURequest: sidecarProxyCPURequest, - DefaultProxyCPULimit: sidecarProxyCPULimit, - DefaultProxyMemoryRequest: sidecarProxyMemoryRequest, - DefaultProxyMemoryLimit: sidecarProxyMemoryLimit, - DefaultEnvoyProxyConcurrency: c.flagDefaultEnvoyProxyConcurrency, - MetricsConfig: metricsConfig, - InitContainerResources: initResources, - DefaultConsulSidecarResources: consulSidecarResources, - ConsulPartition: c.consul.Partition, - AllowK8sNamespacesSet: allowK8sNamespaces, - DenyK8sNamespacesSet: denyK8sNamespaces, - EnableNamespaces: c.flagEnableNamespaces, - ConsulDestinationNamespace: c.flagConsulDestinationNamespace, - EnableK8SNSMirroring: c.flagEnableK8SNSMirroring, - K8SNSMirroringPrefix: c.flagK8SNSMirroringPrefix, - CrossNamespaceACLPolicy: c.flagCrossNamespaceACLPolicy, - EnableTransparentProxy: c.flagDefaultEnableTransparentProxy, - EnableCNI: c.flagEnableCNI, - TProxyOverwriteProbes: c.flagTransparentProxyDefaultOverwriteProbes, - EnableConsulDNS: c.flagEnableConsulDNS, - ResourcePrefix: c.flagResourcePrefix, - EnableOpenShift: c.flagEnableOpenShift, - Log: ctrl.Log.WithName("handler").WithName("connect"), - LogLevel: c.flagLogLevel, - LogJSON: c.flagLogJSON, + Clientset: c.clientset, + ConsulConfig: consulConfig, + ConsulServerConnMgr: watcher, + ImageConsul: c.flagConsulImage, + ImageConsulDataplane: c.flagConsulDataplaneImage, + EnvoyExtraArgs: c.flagEnvoyExtraArgs, + ImageConsulK8S: c.flagConsulK8sImage, + RequireAnnotation: !c.flagDefaultInject, + AuthMethod: c.flagACLAuthMethod, + ConsulCACert: string(caCertPem), + TLSEnabled: c.consul.UseTLS, + ConsulAddress: c.consul.Addresses, + ConsulTLSServerName: c.consul.TLSServerName, + DefaultProxyCPURequest: sidecarProxyCPURequest, + DefaultProxyCPULimit: sidecarProxyCPULimit, + DefaultProxyMemoryRequest: sidecarProxyMemoryRequest, + DefaultProxyMemoryLimit: sidecarProxyMemoryLimit, + DefaultEnvoyProxyConcurrency: c.flagDefaultEnvoyProxyConcurrency, + MetricsConfig: metricsConfig, + InitContainerResources: initResources, + ConsulPartition: c.consul.Partition, + AllowK8sNamespacesSet: allowK8sNamespaces, + DenyK8sNamespacesSet: denyK8sNamespaces, + EnableNamespaces: c.flagEnableNamespaces, + ConsulDestinationNamespace: c.flagConsulDestinationNamespace, + EnableK8SNSMirroring: c.flagEnableK8SNSMirroring, + K8SNSMirroringPrefix: c.flagK8SNSMirroringPrefix, + CrossNamespaceACLPolicy: c.flagCrossNamespaceACLPolicy, + EnableTransparentProxy: c.flagDefaultEnableTransparentProxy, + EnableCNI: c.flagEnableCNI, + TProxyOverwriteProbes: c.flagTransparentProxyDefaultOverwriteProbes, + EnableConsulDNS: c.flagEnableConsulDNS, + ResourcePrefix: c.flagResourcePrefix, + EnableOpenShift: c.flagEnableOpenShift, + Log: ctrl.Log.WithName("handler").WithName("connect"), + LogLevel: c.flagLogLevel, + LogJSON: c.flagLogJSON, }}) if c.flagEnableWebhookCAUpdate { @@ -565,10 +564,10 @@ func (c *Command) updateWebhookCABundle(ctx context.Context) error { } return nil } + func (c *Command) validateFlags() error { if c.flagConsulK8sImage == "" { return errors.New("-consul-k8s-image must be set") - } if c.flagConsulImage == "" { return errors.New("-consul-image must be set") @@ -591,39 +590,40 @@ func (c *Command) validateFlags() error { return nil } -func (c *Command) parseAndValidateResourceFlags() (corev1.ResourceRequirements, corev1.ResourceRequirements, error) { + +func (c *Command) parseAndValidateResourceFlags() (corev1.ResourceRequirements, error) { // Init container var initContainerCPULimit, initContainerCPURequest, initContainerMemoryLimit, initContainerMemoryRequest resource.Quantity // Parse and validate the initContainer resources. initContainerCPURequest, err := resource.ParseQuantity(c.flagInitContainerCPURequest) if err != nil { - return corev1.ResourceRequirements{}, corev1.ResourceRequirements{}, + return corev1.ResourceRequirements{}, fmt.Errorf("-init-container-cpu-request '%s' is invalid: %s", c.flagInitContainerCPURequest, err) } initContainerCPULimit, err = resource.ParseQuantity(c.flagInitContainerCPULimit) if err != nil { - return corev1.ResourceRequirements{}, corev1.ResourceRequirements{}, + return corev1.ResourceRequirements{}, fmt.Errorf("-init-container-cpu-limit '%s' is invalid: %s", c.flagInitContainerCPULimit, err) } if initContainerCPULimit.Value() != 0 && initContainerCPURequest.Cmp(initContainerCPULimit) > 0 { - return corev1.ResourceRequirements{}, corev1.ResourceRequirements{}, fmt.Errorf( + return corev1.ResourceRequirements{}, fmt.Errorf( "request must be <= limit: -init-container-cpu-request value of %q is greater than the -init-container-cpu-limit value of %q", c.flagInitContainerCPURequest, c.flagInitContainerCPULimit) } initContainerMemoryRequest, err = resource.ParseQuantity(c.flagInitContainerMemoryRequest) if err != nil { - return corev1.ResourceRequirements{}, corev1.ResourceRequirements{}, + return corev1.ResourceRequirements{}, fmt.Errorf("-init-container-memory-request '%s' is invalid: %s", c.flagInitContainerMemoryRequest, err) } initContainerMemoryLimit, err = resource.ParseQuantity(c.flagInitContainerMemoryLimit) if err != nil { - return corev1.ResourceRequirements{}, corev1.ResourceRequirements{}, + return corev1.ResourceRequirements{}, fmt.Errorf("-init-container-memory-limit '%s' is invalid: %s", c.flagInitContainerMemoryLimit, err) } if initContainerMemoryLimit.Value() != 0 && initContainerMemoryRequest.Cmp(initContainerMemoryLimit) > 0 { - return corev1.ResourceRequirements{}, corev1.ResourceRequirements{}, fmt.Errorf( + return corev1.ResourceRequirements{}, fmt.Errorf( "request must be <= limit: -init-container-memory-request value of %q is greater than the -init-container-memory-limit value of %q", c.flagInitContainerMemoryRequest, c.flagInitContainerMemoryLimit) } @@ -640,55 +640,7 @@ func (c *Command) parseAndValidateResourceFlags() (corev1.ResourceRequirements, }, } - // Consul sidecar - var consulSidecarCPULimit, consulSidecarCPURequest, consulSidecarMemoryLimit, consulSidecarMemoryRequest resource.Quantity - - // Parse and validate the Consul sidecar resources - consulSidecarCPURequest, err = resource.ParseQuantity(c.flagDefaultConsulSidecarCPURequest) - if err != nil { - return corev1.ResourceRequirements{}, corev1.ResourceRequirements{}, - fmt.Errorf("-default-consul-sidecar-cpu-request '%s' is invalid: %s", c.flagDefaultConsulSidecarCPURequest, err) - } - consulSidecarCPULimit, err = resource.ParseQuantity(c.flagDefaultConsulSidecarCPULimit) - if err != nil { - return corev1.ResourceRequirements{}, corev1.ResourceRequirements{}, - fmt.Errorf("-default-consul-sidecar-cpu-limit '%s' is invalid: %s", c.flagDefaultConsulSidecarCPULimit, err) - } - if consulSidecarCPULimit.Value() != 0 && consulSidecarCPURequest.Cmp(consulSidecarCPULimit) > 0 { - return corev1.ResourceRequirements{}, corev1.ResourceRequirements{}, fmt.Errorf( - "request must be <= limit: -default-consul-sidecar-cpu-request value of %q is greater than the -default-consul-sidecar-cpu-limit value of %q", - c.flagDefaultConsulSidecarCPURequest, c.flagDefaultConsulSidecarCPULimit) - } - - consulSidecarMemoryRequest, err = resource.ParseQuantity(c.flagDefaultConsulSidecarMemoryRequest) - if err != nil { - return corev1.ResourceRequirements{}, corev1.ResourceRequirements{}, - fmt.Errorf("-default-consul-sidecar-memory-request '%s' is invalid: %s", c.flagDefaultConsulSidecarMemoryRequest, err) - } - consulSidecarMemoryLimit, err = resource.ParseQuantity(c.flagDefaultConsulSidecarMemoryLimit) - if err != nil { - return corev1.ResourceRequirements{}, corev1.ResourceRequirements{}, - fmt.Errorf("-default-consul-sidecar-memory-limit '%s' is invalid: %s", c.flagDefaultConsulSidecarMemoryLimit, err) - } - if consulSidecarMemoryLimit.Value() != 0 && consulSidecarMemoryRequest.Cmp(consulSidecarMemoryLimit) > 0 { - return corev1.ResourceRequirements{}, corev1.ResourceRequirements{}, fmt.Errorf( - "request must be <= limit: -default-consul-sidecar-memory-request value of %q is greater than the -default-consul-sidecar-memory-limit value of %q", - c.flagDefaultConsulSidecarMemoryRequest, c.flagDefaultConsulSidecarMemoryLimit) - } - - // Put into corev1.ResourceRequirements form - consulSidecarResources := corev1.ResourceRequirements{ - Requests: corev1.ResourceList{ - corev1.ResourceCPU: consulSidecarCPURequest, - corev1.ResourceMemory: consulSidecarMemoryRequest, - }, - Limits: corev1.ResourceList{ - corev1.ResourceCPU: consulSidecarCPULimit, - corev1.ResourceMemory: consulSidecarMemoryLimit, - }, - } - - return initResources, consulSidecarResources, nil + return initResources, nil } func (c *Command) Synopsis() string { return synopsis } @@ -697,10 +649,12 @@ func (c *Command) Help() string { return c.help } -const synopsis = "Inject the proxy sidecar, run endpoints controller and peering controllers." -const help = ` +const ( + synopsis = "Inject the proxy sidecar, run endpoints controller and peering controllers." + help = ` Usage: consul-k8s-control-plane inject-connect [options] Run the admission webhook server for injecting the sidecar proxy, the endpoints controller, and the peering controllers. ` +) diff --git a/control-plane/subcommand/inject-connect/command_test.go b/control-plane/subcommand/inject-connect/command_test.go index a6c6b285be..5f067cf7c2 100644 --- a/control-plane/subcommand/inject-connect/command_test.go +++ b/control-plane/subcommand/inject-connect/command_test.go @@ -113,40 +113,6 @@ func TestRun_FlagValidation(t *testing.T) { }, expErr: "request must be <= limit: -init-container-cpu-request value of \"50m\" is greater than the -init-container-cpu-limit value of \"25m\"", }, - { - flags: []string{"-consul-k8s-image", "foo", "-consul-image", "foo", "-consul-dataplane-image", "envoy:1.16.0", - "-default-consul-sidecar-cpu-limit=unparseable"}, - expErr: "-default-consul-sidecar-cpu-limit 'unparseable' is invalid", - }, - { - flags: []string{"-consul-k8s-image", "foo", "-consul-image", "foo", "-consul-dataplane-image", "consul-dataplane:1.14.0", - "-default-consul-sidecar-cpu-request=unparseable"}, - expErr: "-default-consul-sidecar-cpu-request 'unparseable' is invalid", - }, - { - flags: []string{"-consul-k8s-image", "foo", "-consul-image", "foo", "-consul-dataplane-image", "consul-dataplane:1.14.0", - "-default-consul-sidecar-memory-limit=unparseable"}, - expErr: "-default-consul-sidecar-memory-limit 'unparseable' is invalid", - }, - { - flags: []string{"-consul-k8s-image", "foo", "-consul-image", "foo", "-consul-dataplane-image", "consul-dataplane:1.14.0", - "-default-consul-sidecar-memory-request=unparseable"}, - expErr: "-default-consul-sidecar-memory-request 'unparseable' is invalid", - }, - { - flags: []string{"-consul-k8s-image", "foo", "-consul-image", "foo", "-consul-dataplane-image", "consul-dataplane:1.14.0", - "-default-consul-sidecar-memory-request=50Mi", - "-default-consul-sidecar-memory-limit=25Mi", - }, - expErr: "request must be <= limit: -default-consul-sidecar-memory-request value of \"50Mi\" is greater than the -default-consul-sidecar-memory-limit value of \"25Mi\"", - }, - { - flags: []string{"-consul-k8s-image", "foo", "-consul-image", "foo", "-consul-dataplane-image", "consul-dataplane:1.14.0", - "-default-consul-sidecar-cpu-request=50m", - "-default-consul-sidecar-cpu-limit=25m", - }, - expErr: "request must be <= limit: -default-consul-sidecar-cpu-request value of \"50m\" is greater than the -default-consul-sidecar-cpu-limit value of \"25m\"", - }, { flags: []string{"-consul-k8s-image", "hashicorp/consul-k8s", "-consul-image", "foo", "-consul-dataplane-image", "consul-dataplane:1.14.0", "-listen", "999999"}, @@ -189,10 +155,4 @@ func TestRun_ResourceLimitDefaults(t *testing.T) { require.Equal(t, cmd.flagInitContainerCPULimit, "50m") require.Equal(t, cmd.flagInitContainerMemoryRequest, "25Mi") require.Equal(t, cmd.flagInitContainerMemoryLimit, "150Mi") - - // Consul sidecar container defaults - require.Equal(t, cmd.flagDefaultConsulSidecarCPURequest, "20m") - require.Equal(t, cmd.flagDefaultConsulSidecarCPULimit, "20m") - require.Equal(t, cmd.flagDefaultConsulSidecarMemoryRequest, "25Mi") - require.Equal(t, cmd.flagDefaultConsulSidecarMemoryLimit, "50Mi") }