From 25c8e53d1803e6d4c6ed9f6e5b6ddb3e600cb891 Mon Sep 17 00:00:00 2001 From: Keith Mattix II Date: Tue, 15 Nov 2022 17:43:54 -0600 Subject: [PATCH] Allow all headless services, not just those backed by Statefulsets with subdomains (#5250) * Allow all headless services, not just those backed by Statefulsets with subdomains Signed-off-by: Keith Mattix II --- .github/workflows/main.yml | 2 +- pkg/cli/verifier/envoy_config.go | 11 +- pkg/k8s/client.go | 10 +- pkg/k8s/client_test.go | 55 +++++ .../e2e_client_server_connectivity_test.go | 19 +- ..._deployment_client_headless_server_test.go | 216 ++++++++++++++++++ tests/e2e/e2e_statefulsets_test.go | 2 +- 7 files changed, 305 insertions(+), 10 deletions(-) create mode 100644 tests/e2e/e2e_deployment_client_headless_server_test.go diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index c3ec4e99b4..3a151259d2 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -160,7 +160,7 @@ jobs: matrix: k8s_version: [""] focus: [""] - bucket: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] + bucket: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12] include: - k8s_version: v1.22.9 focus: "Test traffic flowing from client to server with a Kubernetes Service for the Source: HTTP" diff --git a/pkg/cli/verifier/envoy_config.go b/pkg/cli/verifier/envoy_config.go index 197c9de750..9b7b4cb68a 100644 --- a/pkg/cli/verifier/envoy_config.go +++ b/pkg/cli/verifier/envoy_config.go @@ -18,6 +18,7 @@ import ( configv1alpha2 "github.com/openservicemesh/osm/pkg/apis/config/v1alpha2" "github.com/openservicemesh/osm/pkg/envoy" + "github.com/openservicemesh/osm/pkg/k8s" "github.com/openservicemesh/osm/pkg/trafficpolicy" "github.com/openservicemesh/osm/pkg/constants" @@ -25,7 +26,6 @@ import ( "github.com/openservicemesh/osm/pkg/envoy/rds/route" envoySecrets "github.com/openservicemesh/osm/pkg/envoy/secrets" "github.com/openservicemesh/osm/pkg/identity" - "github.com/openservicemesh/osm/pkg/k8s" "github.com/openservicemesh/osm/pkg/service" ) @@ -480,11 +480,15 @@ func (v *EnvoyConfigVerifier) getDstMeshServicesForSvcPod(svc corev1.Service, po // us to retrieve the TargetPort for the MeshService. meshSvc.TargetPort = k8s.GetTargetPortFromEndpoints(portSpec.Name, *endpoints) + // Even if the service is headless, add it so it can be targeted if !k8s.IsHeadlessService(svc) { meshServices = append(meshServices, meshSvc) continue } + // If there's not at least 1 subdomain-ed MeshService added, + // add the entire headless service + var added bool for _, subset := range endpoints.Subsets { for _, address := range subset.Addresses { if address.Hostname == "" { @@ -498,8 +502,13 @@ func (v *EnvoyConfigVerifier) getDstMeshServicesForSvcPod(svc corev1.Service, po Protocol: meshSvc.Protocol, } meshServices = append(meshServices, mSvc) + added = true } } + + if !added { + meshServices = append(meshServices, meshSvc) + } } return meshServices, nil diff --git a/pkg/k8s/client.go b/pkg/k8s/client.go index 368b29e071..bb98823922 100644 --- a/pkg/k8s/client.go +++ b/pkg/k8s/client.go @@ -322,7 +322,9 @@ func ServiceToMeshServices(c Controller, svc corev1.Service) []service.MeshServi meshServices = append(meshServices, meshSvc) continue } - + // If there's not at least 1 subdomain-ed MeshService added, + // add the entire headless service + var added bool for _, subset := range endpoints.Subsets { for _, address := range subset.Addresses { if address.Hostname == "" { @@ -335,10 +337,14 @@ func ServiceToMeshServices(c Controller, svc corev1.Service) []service.MeshServi TargetPort: meshSvc.TargetPort, Protocol: meshSvc.Protocol, }) + added = true } } - } + if !added { + meshServices = append(meshServices, meshSvc) + } + } return meshServices } diff --git a/pkg/k8s/client_test.go b/pkg/k8s/client_test.go index fc8b48cd7d..fa304909fe 100644 --- a/pkg/k8s/client_test.go +++ b/pkg/k8s/client_test.go @@ -880,6 +880,61 @@ func TestK8sServicesToMeshServices(t *testing.T) { }, }, }, + { + name: "k8s headless service with single port and endpoint (no hostname), no appProtocol set", + // Single port on the service maps to a single MeshService. + // Since no appProtocol is specified, MeshService.Protocol should default + // to http because Port.Protocol=TCP + svc: corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns1", + Name: "s1", + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + { + Name: "p1", + Port: 80, + Protocol: corev1.ProtocolTCP, + }, + }, + ClusterIP: corev1.ClusterIPNone, + }, + }, + svcEndpoints: []runtime.Object{ + &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + // Should match svc.Name and svc.Namespace + Namespace: "ns1", + Name: "s1", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + IP: "10.1.0.1", + }, + }, + Ports: []corev1.EndpointPort{ + { + // Must match the port of 'svc.Spec.Ports[0]' + Port: 8080, // TargetPort + }, + }, + }, + }, + }, + }, + expected: []service.MeshService{ + { + Namespace: "ns1", + Name: "s1", + Port: 80, + TargetPort: 8080, + Protocol: "http", + }, + }, + }, { name: "multiple ports on k8s service with appProtocol specified", svc: corev1.Service{ diff --git a/tests/e2e/e2e_client_server_connectivity_test.go b/tests/e2e/e2e_client_server_connectivity_test.go index f978ecac9f..9c2999048f 100644 --- a/tests/e2e/e2e_client_server_connectivity_test.go +++ b/tests/e2e/e2e_client_server_connectivity_test.go @@ -12,6 +12,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/openservicemesh/osm/pkg/apis/config/v1alpha2" + "github.com/openservicemesh/osm/pkg/tests" "github.com/openservicemesh/osm/pkg/constants" . "github.com/openservicemesh/osm/tests/framework" @@ -26,7 +27,7 @@ var _ = OSMDescribe("Test HTTP traffic from 1 pod client -> 1 pod server", func() { Context("Test traffic flowing from client to server with a Kubernetes Service for the Source: HTTP", func() { withSourceKubernetesService := true - testTraffic(withSourceKubernetesService, PodCommandDefault) + testTraffic(withSourceKubernetesService, PodCommandDefault, false) }) Context("Test traffic flowing from client to server without a Kubernetes Service for the Source: HTTP", func() { @@ -34,18 +35,26 @@ var _ = OSMDescribe("Test HTTP traffic from 1 pod client -> 1 pod server", // for the Envoy proxy to be configured for outbound traffic to some remote server. // This test ensures we test this scenario: client Pod is not associated w/ a service. withSourceKubernetesService := false - testTraffic(withSourceKubernetesService, PodCommandDefault) + testTraffic(withSourceKubernetesService, PodCommandDefault, false) }) Context("Test traffic flowing from client to a server with a podIP bind", func() { // Prior iterations of OSM didn't allow mesh services to bind to the podIP // This test ensures that that behavior is configurable via MeshConfig withSourceKubernetesService := true - testTraffic(withSourceKubernetesService, []string{"gunicorn", "-b", "$(POD_IP):80", "httpbin:app", "-k", "gevent"}, WithLocalProxyMode(v1alpha2.LocalProxyModePodIP)) + testTraffic(withSourceKubernetesService, []string{"gunicorn", "-b", "$(POD_IP):80", "httpbin:app", "-k", "gevent"}, false, WithLocalProxyMode(v1alpha2.LocalProxyModePodIP)) + }) + + Context("Test traffic flowing from client to a headless service without a Kubernetes Service for the Source: HTTP", func() { + // Prior iterations of OSM required that a source pod belong to a Kubernetes service + // for the Envoy proxy to be configured for outbound traffic to some remote server. + // This test ensures we test this scenario: client Pod is not associated w/ a service. + withSourceKubernetesService := true + testTraffic(withSourceKubernetesService, PodCommandDefault, true) }) }) -func testTraffic(withSourceKubernetesService bool, destPodCommand []string, installOpts ...InstallOsmOpt) { +func testTraffic(withSourceKubernetesService bool, destPodCommand []string, destServiceHeadless bool, installOpts ...InstallOsmOpt) { const sourceName = "client" const destName = "server" var ns = []string{sourceName, destName} @@ -68,7 +77,7 @@ func testTraffic(withSourceKubernetesService bool, destPodCommand []string, inst Expect(err).NotTo(HaveOccurred()) _, err = Td.CreatePod(destName, podDef) Expect(err).NotTo(HaveOccurred()) - dstSvc, err := Td.CreateService(destName, svcDef) + dstSvc, err := Td.CreateService(destName, *tests.HeadlessSvc(&svcDef)) Expect(err).NotTo(HaveOccurred()) // Expect it to be up and running in it's receiver namespace diff --git a/tests/e2e/e2e_deployment_client_headless_server_test.go b/tests/e2e/e2e_deployment_client_headless_server_test.go new file mode 100644 index 0000000000..cee4a1d251 --- /dev/null +++ b/tests/e2e/e2e_deployment_client_headless_server_test.go @@ -0,0 +1,216 @@ +package e2e + +import ( + "context" + "fmt" + "sync" + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "github.com/openservicemesh/osm/pkg/tests" + . "github.com/openservicemesh/osm/tests/framework" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +var _ = OSMDescribe("Test HTTP traffic from N deployment client -> 1 deployment server", + OSMDescribeInfo{ + Tier: 1, + Bucket: 11, + }, + func() { + const ( + // to name the header we will use to identify the server that replies + HTTPHeaderName = "podname" + ) + + var maxTestDuration = 150 * time.Second + + Context("DeploymentsClientServer", func() { + var ( + destApp = "server" + sourceAppBaseName = "client" + sourceNamespaces = []string{} + + // Total (numberOfClientApps x replicaSetPerApp) pods + numberOfClientServices = 5 + replicaSetPerService = 5 + ) + + // Used across the test to wait for concurrent steps to finish + var wg sync.WaitGroup + + for i := 0; i < numberOfClientServices; i++ { + // base names for each client service + sourceNamespaces = append(sourceNamespaces, fmt.Sprintf("%s%d", sourceAppBaseName, i)) + } + + It("Tests HTTP traffic from multiple client deployments to a headless service deployment", func() { + // Install OSM + Expect(Td.InstallOSM(Td.GetOSMInstallOpts())).To(Succeed()) + + // Server NS + Expect(Td.CreateNs(destApp, nil)).To(Succeed()) + Expect(Td.AddNsToMesh(true, destApp)).To(Succeed()) + + // Client Applications + for _, srcClient := range sourceNamespaces { + Expect(Td.CreateNs(srcClient, nil)).To(Succeed()) + Expect(Td.AddNsToMesh(true, srcClient)).To(Succeed()) + } + + // Use a deployment with multiple replicaset at serverside + svcAccDef, deploymentDef, svcDef, err := Td.SimpleDeploymentApp( + SimpleDeploymentAppDef{ + DeploymentName: destApp, + Namespace: destApp, + ServiceName: destApp, + ServiceAccountName: destApp, + ReplicaCount: int32(replicaSetPerService), + Image: "simonkowallik/httpbin", + Ports: []int{DefaultUpstreamServicePort}, + Command: HttpbinCmd, + OS: Td.ClusterOS, + }) + Expect(err).NotTo(HaveOccurred()) + + // Expose an env variable such as XHTTPBIN_X_POD_NAME: + // This httpbin fork will pick certain env variable formats and reply the values as headers. + // We will expose pod name as one of these env variables, and will use it + // to identify the pod that replies to the request, and validate the test + deploymentDef.Spec.Template.Spec.Containers[0].Env = []v1.EnvVar{ + { + Name: fmt.Sprintf("XHTTPBIN_%s", HTTPHeaderName), + ValueFrom: &v1.EnvVarSource{ + FieldRef: &v1.ObjectFieldSelector{ + FieldPath: "metadata.name", + }, + }, + }, + } + + _, err = Td.CreateServiceAccount(destApp, &svcAccDef) + Expect(err).NotTo(HaveOccurred()) + _, err = Td.CreateDeployment(destApp, deploymentDef) + Expect(err).NotTo(HaveOccurred()) + _, err = Td.CreateService(destApp, *tests.HeadlessSvc(&svcDef)) + Expect(err).NotTo(HaveOccurred()) + + wg.Add(1) + go func(wg *sync.WaitGroup, srcClient string) { + defer GinkgoRecover() + defer wg.Done() + Expect(Td.WaitForPodsRunningReady(destApp, 200*time.Second, replicaSetPerService, nil)).To(Succeed()) + }(&wg, destApp) + + // Create all client deployments, also with replicaset + for _, srcClient := range sourceNamespaces { + svcAccDef, deploymentDef, svcDef, err = Td.SimpleDeploymentApp( + SimpleDeploymentAppDef{ + DeploymentName: srcClient, + Namespace: srcClient, + ServiceAccountName: srcClient, + ServiceName: srcClient, + ContainerName: srcClient, + ReplicaCount: int32(replicaSetPerService), + Command: []string{"/bin/bash", "-c", "--"}, + Args: []string{"while true; do sleep 30; done;"}, + Image: "songrgg/alpine-debug", + Ports: []int{DefaultUpstreamServicePort}, // Can't deploy services with empty/no ports + OS: Td.ClusterOS, + }) + Expect(err).NotTo(HaveOccurred()) + + _, err = Td.CreateServiceAccount(srcClient, &svcAccDef) + Expect(err).NotTo(HaveOccurred()) + _, err = Td.CreateDeployment(srcClient, deploymentDef) + Expect(err).NotTo(HaveOccurred()) + _, err = Td.CreateService(srcClient, svcDef) + Expect(err).NotTo(HaveOccurred()) + + wg.Add(1) + go func(wg *sync.WaitGroup, srcClient string) { + defer GinkgoRecover() + defer wg.Done() + Expect(Td.WaitForPodsRunningReady(srcClient, 200*time.Second, replicaSetPerService, nil)).To(Succeed()) + }(&wg, srcClient) + } + wg.Wait() + + // Create traffic rules + // Deploy allow rules (10x)clients -> server + for _, srcClient := range sourceNamespaces { + httpRG, trafficTarget := Td.CreateSimpleAllowPolicy( + SimpleAllowPolicy{ + RouteGroupName: srcClient, + TrafficTargetName: srcClient, + + SourceNamespace: srcClient, + SourceSVCAccountName: srcClient, + + DestinationNamespace: destApp, + DestinationSvcAccountName: destApp, + }) + + // Configs have to be put into a monitored NS, and osm-system can't be by cli + _, err = Td.CreateHTTPRouteGroup(destApp, httpRG) + Expect(err).NotTo(HaveOccurred()) + _, err = Td.CreateTrafficTarget(destApp, trafficTarget) + Expect(err).NotTo(HaveOccurred()) + } + + // Create Multiple HTTP request structure and fill it with appropriate details + requests := HTTPMultipleRequest{ + Sources: []HTTPRequestDef{}, + } + for _, ns := range sourceNamespaces { + pods, err := Td.Client.CoreV1().Pods(ns).List(context.Background(), metav1.ListOptions{}) + Expect(err).To(BeNil()) + + for _, pod := range pods.Items { + requests.Sources = append(requests.Sources, HTTPRequestDef{ + SourceNs: ns, + SourcePod: pod.Name, + SourceContainer: ns, // container_name == NS for this test + + Destination: fmt.Sprintf("%s.%s:%d", destApp, destApp, DefaultUpstreamServicePort), + }) + } + } + + var results HTTPMultipleResults + var serversSeen map[string]bool = map[string]bool{} // Just counts unique servers seen + success := Td.WaitForRepeatedSuccess(func() bool { + // Issue all calls, get results + results = Td.MultipleHTTPRequest(&requests) + + // Care, depending on variables there could be a lot of results + Td.PrettyPrintHTTPResults(&results) + + // Verify success conditions + for _, ns := range results { + for _, podResult := range ns { + if podResult.Err != nil || podResult.StatusCode != 200 { + return false + } + // We should see pod header populated + dstPod, ok := podResult.Headers[HTTPHeaderName] + if ok { + // Store and mark that we have seen a response for this server pod + serversSeen[dstPod] = true + } + } + } + Td.T.Logf("Unique servers replied %d/%d", len(serversSeen), replicaSetPerService) + Expect(len(serversSeen)).To(Equal(replicaSetPerService)) + + return true + }, 5, maxTestDuration) + + Expect(success).To(BeTrue()) + }) + }) + }) diff --git a/tests/e2e/e2e_statefulsets_test.go b/tests/e2e/e2e_statefulsets_test.go index 864e7d6cd6..5e0b6c63c9 100644 --- a/tests/e2e/e2e_statefulsets_test.go +++ b/tests/e2e/e2e_statefulsets_test.go @@ -22,7 +22,7 @@ import ( var _ = OSMDescribe("Test traffic among Statefulset members", OSMDescribeInfo{ Tier: 1, - Bucket: 9, + Bucket: 12, OS: OSCrossPlatform, }, func() {