Add conformance tests for service ports
When a service is exported from two clusters, ensure the clusterset
service exposes the union of service ports declared on its constituent
services. Also, if port properties don't match, ensure the conflict
resolution policy is properly applied.

Creation of K8s resources was refactored into functions, rather than
being defined in global vars, to allow the resources to be customized
when they are created on multiple clusters.

Fixes #71

Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
tpantelis committed Sep 25, 2024
1 parent 7bbb599 commit 36a1076
Showing 5 changed files with 226 additions and 117 deletions.
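
The new test file that exercises the service-port behavior is among the five changed files but is not expanded in this view. As a rough sketch of the expectation the commit message describes (an illustration under stated assumptions, not code from this commit), the expected clusterset port set can be modeled as the union of the exported services' ports keyed by name and protocol; the helper name expectedClusterSetPorts and the keep-first conflict handling below are assumptions for illustration only.

package conformance

import (
	corev1 "k8s.io/api/core/v1"

	"sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
)

// expectedClusterSetPorts merges the port lists of the exported services into
// the set a conforming implementation is expected to expose on the clusterset
// service: one entry per name/protocol pair. When the same port is declared
// with conflicting properties, this sketch simply keeps the first occurrence;
// a real implementation applies its conflict resolution policy.
func expectedClusterSetPorts(exported ...[]corev1.ServicePort) []v1alpha1.ServicePort {
	type key struct {
		name     string
		protocol corev1.Protocol
	}

	seen := map[key]bool{}

	var union []v1alpha1.ServicePort

	for _, ports := range exported {
		for _, p := range ports {
			k := key{name: p.Name, protocol: p.Protocol}
			if seen[k] {
				continue
			}

			seen[k] = true
			union = append(union, v1alpha1.ServicePort{
				Name:        p.Name,
				Protocol:    p.Protocol,
				AppProtocol: p.AppProtocol,
				Port:        p.Port,
			})
		}
	}

	return union
}

A test in this style could export the hello service from two clusters with differing port lists, await the ServiceImport on each cluster, and compare sortMCSPorts(serviceImport.Spec.Ports) against sortMCSPorts(expectedClusterSetPorts(...)). The import path sigs.k8s.io/mcs-api/pkg/apis/v1alpha1 is assumed here.
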
79 changes: 64 additions & 15 deletions conformance/conformance_suite.go
@@ -30,8 +30,9 @@ import (

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
v1 "k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
@@ -94,33 +95,37 @@ func setupClients() error {
}

type testDriver struct {
namespace string
namespace string
helloService *corev1.Service
requestPod *corev1.Pod
}

func newTestDriver() *testDriver {
t := &testDriver{}

BeforeEach(func() {
t.namespace = fmt.Sprintf("mcs-conformance-%v", rand.Uint32())
t.helloService = newHelloService()
t.requestPod = newRequestPod()
})

JustBeforeEach(func() {
Expect(clients).ToNot(BeEmpty())

// Set up the shared namespace
t.namespace = fmt.Sprintf("mcs-conformance-%v", rand.Uint32())
for _, client := range clients {
_, err := client.k8s.CoreV1().Namespaces().Create(ctx, &v1.Namespace{
_, err := client.k8s.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{Name: t.namespace},
}, metav1.CreateOptions{})
Expect(err).ToNot(HaveOccurred())
}

// Set up the remote service (the first cluster is considered to be the remote)
_, err := clients[0].k8s.AppsV1().Deployments(t.namespace).Create(ctx, &helloDeployment, metav1.CreateOptions{})
Expect(err).ToNot(HaveOccurred())
_, err = clients[0].k8s.CoreV1().Services(t.namespace).Create(ctx, &helloService, metav1.CreateOptions{})
Expect(err).ToNot(HaveOccurred())
t.deployHelloService(&clients[0], t.helloService)

// Start the request pod on all clusters
for _, client := range clients {
startRequestPod(ctx, client, t.namespace)
t.startRequestPod(ctx, client)
}
})

@@ -135,19 +140,29 @@ func newTestDriver() *testDriver {
return t
}

func (t *testDriver) createServiceExport() {
_, err := clients[0].mcs.MulticlusterV1alpha1().ServiceExports(t.namespace).Create(ctx,
&v1alpha1.ServiceExport{ObjectMeta: metav1.ObjectMeta{Name: helloService.Name}}, metav1.CreateOptions{})
func (t *testDriver) createServiceExport(c *clusterClients) {
_, err := c.mcs.MulticlusterV1alpha1().ServiceExports(t.namespace).Create(ctx,
&v1alpha1.ServiceExport{ObjectMeta: metav1.ObjectMeta{Name: helloServiceName}}, metav1.CreateOptions{})
Expect(err).ToNot(HaveOccurred())
}

func (t *testDriver) deployHelloService(c *clusterClients, service *corev1.Service) {
_, err := c.k8s.AppsV1().Deployments(t.namespace).Create(ctx, newHelloDeployment(), metav1.CreateOptions{})
Expect(err).ToNot(HaveOccurred())
_, err = c.k8s.CoreV1().Services(t.namespace).Create(ctx, service, metav1.CreateOptions{})
Expect(err).ToNot(HaveOccurred())
}

func (t *testDriver) awaitServiceImport(c *clusterClients, name string, verify func(*v1alpha1.ServiceImport) bool) *v1alpha1.ServiceImport {
var serviceImport *v1alpha1.ServiceImport

_ = wait.PollUntilContextTimeout(ctx, 200*time.Millisecond,
_ = wait.PollUntilContextTimeout(ctx, 100*time.Millisecond,
20*time.Second, true, func(ctx context.Context) (bool, error) {
defer GinkgoRecover()

si, err := c.mcs.MulticlusterV1alpha1().ServiceImports(t.namespace).Get(ctx, name, metav1.GetOptions{})
if apierrors.IsNotFound(err) || errors.Is(err, context.DeadlineExceeded) {
if apierrors.IsNotFound(err) || errors.Is(err, context.DeadlineExceeded) ||
(err != nil && strings.Contains(err.Error(), "rate limiter")) {
return false, nil
}

@@ -161,7 +176,35 @@ func (t *testDriver) awaitServiceImport(c *clusterClients, name string, verify f
return serviceImport
}

func toMCSPorts(from []v1.ServicePort) []v1alpha1.ServicePort {
func (t *testDriver) awaitServiceExportCondition(c *clusterClients, condType string) {
Eventually(func() bool {
se, err := c.mcs.MulticlusterV1alpha1().ServiceExports(t.namespace).Get(ctx, helloServiceName, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())

return meta.FindStatusCondition(se.Status.Conditions, condType) != nil
}, 20*time.Second, 100*time.Millisecond).Should(BeTrue(),
reportNonConformant(fmt.Sprintf("The %s condition was not set", condType)))
}

func (t *testDriver) startRequestPod(ctx context.Context, client clusterClients) {
_, err := client.k8s.CoreV1().Pods(t.namespace).Create(ctx, t.requestPod, metav1.CreateOptions{})
Expect(err).ToNot(HaveOccurred())

Eventually(func() error {
pod, err := client.k8s.CoreV1().Pods(t.namespace).Get(ctx, t.requestPod.Name, metav1.GetOptions{})
if err != nil {
return err
}

if pod.Status.Phase != corev1.PodRunning {
return fmt.Errorf("pod is not running yet, current status %v", pod.Status.Phase)
}

return nil
}, 20, 1).Should(Succeed())
}

func toMCSPorts(from []corev1.ServicePort) []v1alpha1.ServicePort {
var mcsPorts []v1alpha1.ServicePort

for _, port := range from {
@@ -183,3 +226,9 @@ func sortMCSPorts(p []v1alpha1.ServicePort) []v1alpha1.ServicePort {

return p
}

func requireTwoClusters() {
if len(clients) < 2 {
Skip("This test requires at least 2 clusters - skipping")
}
}
10 changes: 5 additions & 5 deletions conformance/connectivity_test.go
@@ -36,8 +36,8 @@ var _ = Describe("Connectivity to remote services", func() {
// Repeat multiple times
for i := 0; i < 20; i++ {
command := []string{"sh", "-c", fmt.Sprintf("echo hi | nc %s.%s.svc.clusterset.local 42",
helloService.Name, t.namespace)}
stdout, _, _ := execCmd(client.k8s, client.rest, requestPod.Name, t.namespace, command)
helloServiceName, t.namespace)}
stdout, _, _ := execCmd(client.k8s, client.rest, t.requestPod.Name, t.namespace, command)
Expect(string(stdout)).NotTo(ContainSubstring("pod ip"), reportNonConformant(""))
}
}
@@ -51,15 +51,15 @@
AddReportEntry(SpecRefReportEntry, "https://github.com/kubernetes/enhancements/tree/master/keps/sig-multicluster/1645-multi-cluster-services-api#dns")
By("exporting the service", func() {
// On the "remote" cluster
t.createServiceExport()
t.createServiceExport(&clients[0])
})
By("issuing a request from all clusters", func() {
// Run on all clusters
for _, client := range clients {
command := []string{"sh", "-c", fmt.Sprintf("echo hi | nc %s.%s.svc.clusterset.local 42",
helloService.Name, t.namespace)}
helloServiceName, t.namespace)}
Eventually(func() string {
stdout, _, _ := execCmd(client.k8s, client.rest, requestPod.Name, t.namespace, command)
stdout, _, _ := execCmd(client.k8s, client.rest, t.requestPod.Name, t.namespace, command)
return string(stdout)
}, 20, 1).Should(ContainSubstring("pod ip"), reportNonConformant(""))
}
17 changes: 0 additions & 17 deletions conformance/framework.go
@@ -26,7 +26,6 @@ import (
. "github.com/onsi/gomega"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
@@ -134,19 +133,3 @@ func execCmd(k8s kubernetes.Interface, config *rest.Config, podName string, podN

return stdout.Bytes(), stderr.Bytes(), nil
}

func startRequestPod(ctx context.Context, client clusterClients, namespace string) {
_, err := client.k8s.CoreV1().Pods(namespace).Create(ctx, &requestPod, metav1.CreateOptions{})
Expect(err).ToNot(HaveOccurred())

Eventually(func() error {
pod, err := client.k8s.CoreV1().Pods(namespace).Get(ctx, requestPod.Name, metav1.GetOptions{})
if err != nil {
return err
}
if pod.Status.Phase != v1.PodRunning {
return fmt.Errorf("pod is not running yet, current status %v", pod.Status.Phase)
}
return nil
}, 20, 1).Should(Succeed())
}
150 changes: 79 additions & 71 deletions conformance/k8s_objects.go
@@ -18,80 +18,86 @@ package conformance

import (
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"
)

var helloService = v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "hello",
},
Spec: v1.ServiceSpec{
Selector: map[string]string{
"app": "hello",
const helloServiceName = "hello"

func newHelloService() *corev1.Service {
return &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: helloServiceName,
},
Ports: []v1.ServicePort{
{
Name: "tcp",
Port: 42,
Protocol: v1.ProtocolTCP,
Spec: corev1.ServiceSpec{
Selector: map[string]string{
"app": helloServiceName,
},
Ports: []corev1.ServicePort{
{
Name: "tcp",
Port: 42,
Protocol: corev1.ProtocolTCP,
},
{
Name: "udp",
Port: 42,
Protocol: corev1.ProtocolUDP,
},
},
{
Name: "udp",
Port: 42,
Protocol: v1.ProtocolUDP,
SessionAffinity: corev1.ServiceAffinityClientIP,
SessionAffinityConfig: &corev1.SessionAffinityConfig{
ClientIP: &corev1.ClientIPConfig{TimeoutSeconds: ptr.To(int32(10))},
},
},
SessionAffinity: v1.ServiceAffinityClientIP,
SessionAffinityConfig: &v1.SessionAffinityConfig{
ClientIP: &v1.ClientIPConfig{TimeoutSeconds: ptr.To(int32(10))},
},
},
}
}

var helloDeployment = appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: "hello",
},
Spec: appsv1.DeploymentSpec{
Replicas: ptr.To(int32(1)),
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": "hello",
},
func newHelloDeployment() *appsv1.Deployment {
return &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: helloServiceName,
},
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"app": "hello"},
Spec: appsv1.DeploymentSpec{
Replicas: ptr.To(int32(1)),
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": helloServiceName,
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "hello-tcp",
Image: "alpine/socat:1.7.4.4",
Args: []string{"-v", "-v", "TCP-LISTEN:42,crlf,reuseaddr,fork", "SYSTEM:echo pod ip $(MY_POD_IP)"},
Env: []v1.EnvVar{
{
Name: "MY_POD_IP",
ValueFrom: &v1.EnvVarSource{
FieldRef: &v1.ObjectFieldSelector{
FieldPath: "status.podIP",
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"app": helloServiceName},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "hello-tcp",
Image: "alpine/socat:1.7.4.4",
Args: []string{"-v", "-v", "TCP-LISTEN:42,crlf,reuseaddr,fork", "SYSTEM:echo pod ip $(MY_POD_IP)"},
Env: []corev1.EnvVar{
{
Name: "MY_POD_IP",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "status.podIP",
},
},
},
},
},
},
{
Name: "hello-udp",
Image: "alpine/socat:1.7.4.4",
Args: []string{"-v", "-v", "UDP-LISTEN:42,crlf,reuseaddr,fork", "SYSTEM:echo pod ip $(MY_POD_IP)"},
Env: []v1.EnvVar{
{
Name: "MY_POD_IP",
ValueFrom: &v1.EnvVarSource{
FieldRef: &v1.ObjectFieldSelector{
FieldPath: "status.podIP",
{
Name: "hello-udp",
Image: "alpine/socat:1.7.4.4",
Args: []string{"-v", "-v", "UDP-LISTEN:42,crlf,reuseaddr,fork", "SYSTEM:echo pod ip $(MY_POD_IP)"},
Env: []corev1.EnvVar{
{
Name: "MY_POD_IP",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "status.podIP",
},
},
},
},
@@ -100,21 +106,23 @@ var helloDeployment = appsv1.Deployment{
},
},
},
},
}
}

var requestPod = v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "request",
Labels: map[string]string{"app": "request"},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "request",
Image: "busybox",
Args: []string{"/bin/sh", "-ec", "while :; do echo '.'; sleep 5 ; done"},
func newRequestPod() *corev1.Pod {
return &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "request",
Labels: map[string]string{"app": "request"},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "request",
Image: "busybox",
Args: []string{"/bin/sh", "-ec", "while :; do echo '.'; sleep 5 ; done"},
},
},
},
},
}
}