Skip to content

Commit

Permalink
[flexible-ipam] Add e2e for StatefulSet
Browse files Browse the repository at this point in the history
Signed-off-by: gran <gran@vmware.com>
  • Loading branch information
gran-vmv committed Dec 17, 2021
1 parent cc5c1eb commit ab7ab60
Show file tree
Hide file tree
Showing 5 changed files with 355 additions and 19 deletions.
2 changes: 1 addition & 1 deletion ci/jenkins/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ function run_e2e {
# HACK: see https://github.com/antrea-io/antrea/issues/2292
go mod edit -replace github.com/moby/spdystream=github.com/antoninbas/spdystream@v0.2.1 && go mod tidy
if [[ $FLEXIBLE_IPAM == true ]]; then
go test -v antrea.io/antrea/test/e2e --logs-export-dir `pwd`/antrea-test-logs --provider remote -timeout=100m --prometheus --antrea-ipam
go test -v -run=TestAntreaIPAM antrea.io/antrea/test/e2e --logs-export-dir `pwd`/antrea-test-logs --provider remote -timeout=100m --prometheus --antrea-ipam
else
go test -v antrea.io/antrea/test/e2e --logs-export-dir `pwd`/antrea-test-logs --provider remote -timeout=100m --prometheus
fi
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/antreaipam_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestAntreaIPAMService(t *testing.T) {
defer teardownTest(t, data)

// Create AntreaIPAM IPPool and test Namespace
ippool, err := createIPPool(t, data, 0)
ippool, err := createIPPool(t, data, "0")
if err != nil {
t.Fatalf("Creating IPPool failed, err=%+v", err)
}
Expand Down
259 changes: 244 additions & 15 deletions test/e2e/antreaipam_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,29 @@ package e2e
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net"
"reflect"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
utilnet "k8s.io/utils/net"

"antrea.io/antrea/pkg/agent/cniserver/ipam"
crdv1alpha2 "antrea.io/antrea/pkg/apis/crd/v1alpha2"
)

var (
subnetIPv4RangesMap = map[int]crdv1alpha2.IPPool{
0: {
subnetIPv4RangesMap = map[string]crdv1alpha2.IPPool{
"0": {
ObjectMeta: metav1.ObjectMeta{
Name: "test-ippool-ipv4-0",
},
Expand All @@ -47,6 +57,24 @@ var (
}}},
},
},
"1": {
ObjectMeta: metav1.ObjectMeta{
Name: "test-ippool-ipv4-1",
},
Spec: crdv1alpha2.IPPoolSpec{
IPVersion: 4,
IPRanges: []crdv1alpha2.SubnetIPRange{{IPRange: crdv1alpha2.IPRange{
CIDR: "",
Start: "192.168.240.130",
End: "192.168.240.139",
},
SubnetInfo: crdv1alpha2.SubnetInfo{
Gateway: "192.168.240.1",
PrefixLength: 24,
VLAN: "",
}}},
},
},
}
)

Expand All @@ -60,7 +88,7 @@ func TestAntreaIPAM(t *testing.T) {
defer teardownTest(t, data)

// Create AntreaIPAM IPPool and test Namespace
ippool, err := createIPPool(t, data, 0)
ippool, err := createIPPool(t, data, "0")
if err != nil {
t.Fatalf("Creating IPPool failed, err=%+v", err)
}
Expand All @@ -72,6 +100,12 @@ func TestAntreaIPAM(t *testing.T) {
t.Fatalf("Creating AntreaIPAM Namespace failed, err=%+v", err)
}
defer deleteAntreaIPAMNamespace(t, data)
// Create 2nd AntreaIPAM IPPool
ippool, err = createIPPool(t, data, "1")
if err != nil {
t.Fatalf("Creating IPPool failed, err=%+v", err)
}
defer deleteIPPoolWrapper(t, data, ippool.Name)

// basic test
t.Run("testAntreaIPAMPodAssignIP", func(t *testing.T) { testPodAssignIP(t, data, testAntreaIPAMNamespace, "192.168.240.0/24", "") })
Expand Down Expand Up @@ -113,6 +147,15 @@ func TestAntreaIPAM(t *testing.T) {
skipIfHasWindowsNodes(t)
testOVSFlowReplay(t, data, testAntreaIPAMNamespace)
})

// StatefulSet test
dedicatedIPPoolKey := "1"
t.Run("testAntreaIPAMStatefulSetDedicated", func(t *testing.T) {
testAntreaIPAMStatefulSet(t, data, &dedicatedIPPoolKey)
})
t.Run("testAntreaIPAMStatefulSetShared", func(t *testing.T) {
testAntreaIPAMStatefulSet(t, data, nil)
})
}

func testAntreaIPAMPodConnectivitySameNode(t *testing.T, data *TestData) {
Expand Down Expand Up @@ -156,26 +199,217 @@ func testAntreaIPAMPodConnectivityDifferentNodes(t *testing.T, data *TestData) {
data.runPingMesh(t, podInfos, agnhostContainerName)
}

func testAntreaIPAMStatefulSet(t *testing.T, data *TestData, dedicatedIPPoolKey *string) {
// TODO(gran): correct reservedIPOffsets according to antrea-controller
stsName := randName("sts-test-")
ipPoolName := subnetIPv4RangesMap["0"].Name
if dedicatedIPPoolKey != nil {
ipPoolName = subnetIPv4RangesMap[*dedicatedIPPoolKey].Name
}
ipOffsets := []int32{0, 1}
size := len(ipOffsets)
reservedIPOffsets := []int32{}
reservedIPOffsets = ipOffsets
mutateFunc := func(sts *appsv1.StatefulSet) {
if sts.Spec.Template.Annotations == nil {
sts.Spec.Template.Annotations = map[string]string{}
}
if dedicatedIPPoolKey != nil {
sts.Spec.Template.Annotations[ipam.AntreaIPAMAnnotationKey] = ipPoolName
}
}
_, cleanup, err := data.createStatefulSet(stsName, testAntreaIPAMNamespace, int32(size), agnhostContainerName, agnhostImage, []string{"sleep", "3600"}, nil, mutateFunc)
if err != nil {
t.Fatalf("Error when creating StatefulSet '%s': %v", stsName, err)
}
defer cleanup()
if err := data.waitForStatefulSetPods(defaultTimeout, stsName, testAntreaIPAMNamespace); err != nil {
t.Fatalf("Error when waiting for StatefulSet Pods to get IPs: %v", err)
}
checkStatefulSetIPPoolAllocation(t, data, stsName, testAntreaIPAMNamespace, ipPoolName, ipOffsets, reservedIPOffsets)

ipOffsets = []int32{0}
size = len(ipOffsets)
_, err = data.updateStatefulSetSize(stsName, testAntreaIPAMNamespace, int32(size))
if err != nil {
t.Fatalf("Error when updating StatefulSet '%s': %v", stsName, err)
}
if err := data.waitForStatefulSetPods(defaultTimeout, stsName, testAntreaIPAMNamespace); err != nil {
t.Fatalf("Error when waiting for StatefulSet Pods to get IPs: %v", err)
}
checkStatefulSetIPPoolAllocation(t, data, stsName, testAntreaIPAMNamespace, ipPoolName, ipOffsets, reservedIPOffsets)

podMutateFunc := func(pod *corev1.Pod) {
if pod.Annotations == nil {
pod.Annotations = map[string]string{}
}
if dedicatedIPPoolKey != nil {
pod.Annotations[ipam.AntreaIPAMAnnotationKey] = ipPoolName
}
}
podName := randName("test-standalone-pod-")
err = data.createPodOnNode(podName, testAntreaIPAMNamespace, controlPlaneNodeName(), agnhostImage, []string{"sleep", "3600"}, nil, nil, nil, false, podMutateFunc)
if err != nil {
t.Fatalf("Error when creating Pod '%s': %v", podName, err)
}
defer data.deletePodAndWait(defaultTimeout, testAntreaIPAMNamespace, podName)
podIPs, err := data.podWaitForIPs(defaultTimeout, podName, testAntreaIPAMNamespace)
if err != nil {
t.Fatalf("Error when waiting Pod IPs: %v", err)
}
isBelongTo, ipAddressState, err := checkIPPoolAllocation(t, data, ipPoolName, podIPs.ipv4.String())
if err != nil {
t.Fatalf("Error when checking IPPoolAllocation: %v", err)
}
startIPString := subnetIPv4RangesMap["0"].Spec.IPRanges[0].Start
offset := 2
if dedicatedIPPoolKey != nil {
startIPString = subnetIPv4RangesMap[*dedicatedIPPoolKey].Spec.IPRanges[0].Start
}
expectedPodIP := utilnet.AddIPOffset(utilnet.BigForIP(net.ParseIP(startIPString)), offset)
assert.True(t, isBelongTo)
assert.True(t, reflect.DeepEqual(ipAddressState, &crdv1alpha2.IPAddressState{
IPAddress: expectedPodIP.String(),
Phase: crdv1alpha2.IPAddressPhaseAllocated,
Owner: crdv1alpha2.IPAddressOwner{
Pod: &crdv1alpha2.PodOwner{
Name: podName,
Namespace: testAntreaIPAMNamespace,
ContainerID: ipAddressState.Owner.Pod.ContainerID,
},
},
}))

ipOffsets = []int32{0, 1, 3}
size = len(ipOffsets)
reservedIPOffsets = ipOffsets
_, err = data.updateStatefulSetSize(stsName, testAntreaIPAMNamespace, int32(size))
if err != nil {
t.Fatalf("Error when updating StatefulSet '%s': %v", stsName, err)
}
if err := data.waitForStatefulSetPods(defaultTimeout, stsName, testAntreaIPAMNamespace); err != nil {
t.Fatalf("Error when waiting for StatefulSet Pods to get IPs: %v", err)
}
checkStatefulSetIPPoolAllocation(t, data, stsName, testAntreaIPAMNamespace, ipPoolName, ipOffsets, reservedIPOffsets)

data.deletePodAndWait(defaultTimeout, testAntreaIPAMNamespace, podName)
_, err = data.restartStatefulSet(stsName, testAntreaIPAMNamespace)
if err != nil {
t.Fatalf("Error when restarting StatefulSet '%s': %v", stsName, err)
}
time.Sleep(time.Second)
if err := data.waitForStatefulSetPods(defaultTimeout, stsName, testAntreaIPAMNamespace); err != nil {
t.Fatalf("Error when waiting for StatefulSet Pods to get IPs: %v", err)
}
checkStatefulSetIPPoolAllocation(t, data, stsName, testAntreaIPAMNamespace, ipPoolName, ipOffsets, reservedIPOffsets)

cleanup()
checkStatefulSetIPPoolAllocation(t, data, stsName, testAntreaIPAMNamespace, ipPoolName, nil, nil)
}

func checkStatefulSetIPPoolAllocation(tb testing.TB, data *TestData, name string, namespace string, ipPoolName string, ipOffsets, reservedIPOffsets []int32) {
ipPool, err := data.crdClient.CrdV1alpha2().IPPools().Get(context.TODO(), ipPoolName, metav1.GetOptions{})
if err != nil {
tb.Fatalf("Failed to get IPPool %s, err: %+v", ipPoolName, err)
}
startIP := net.ParseIP(ipPool.Spec.IPRanges[0].Start)
expectedIPAddressMap := map[string]*crdv1alpha2.IPAddressState{}
for i, offset := range ipOffsets {
ipString := utilnet.AddIPOffset(utilnet.BigForIP(startIP), int(offset)).String()
podName := fmt.Sprintf("%s-%d", name, i)
expectedIPAddressMap[ipString] = &crdv1alpha2.IPAddressState{
IPAddress: ipString,
Phase: crdv1alpha2.IPAddressPhaseAllocated,
Owner: crdv1alpha2.IPAddressOwner{
Pod: &crdv1alpha2.PodOwner{
Name: podName,
Namespace: namespace,
ContainerID: "",
},
},
}
}
for i, offset := range reservedIPOffsets {
ipString := utilnet.AddIPOffset(utilnet.BigForIP(startIP), int(offset)).String()
stsOwner := &crdv1alpha2.StatefulSetOwner{
Name: name,
Namespace: namespace,
Index: i,
}
if _, ok := expectedIPAddressMap[ipString]; ok {
expectedIPAddressMap[ipString].Owner.StatefulSet = stsOwner
} else {
expectedIPAddressMap[ipString] = &crdv1alpha2.IPAddressState{
IPAddress: ipString,
Phase: crdv1alpha2.IPAddressPhaseReserved,
Owner: crdv1alpha2.IPAddressOwner{
StatefulSet: stsOwner,
},
}
}
}
expectedIPAddressJson, _ := json.Marshal(expectedIPAddressMap)
tb.Logf("expectedIPAddressMap: %s", expectedIPAddressJson)

err = wait.Poll(time.Second*5, defaultTimeout, func() (bool, error) {
ipPool, err := data.crdClient.CrdV1alpha2().IPPools().Get(context.TODO(), ipPoolName, metav1.GetOptions{})
if err != nil {
tb.Fatalf("Failed to get IPPool %s, err: %+v", ipPoolName, err)
}
actualIPAddressMap := map[string]*crdv1alpha2.IPAddressState{}
actualIPAddressLoop:
for _, IPAddress := range ipPool.Status.IPAddresses {
for expectedIP := range expectedIPAddressMap {
if IPAddress.IPAddress == expectedIP {
actualIPAddressMap[expectedIP] = IPAddress.DeepCopy()
if actualIPAddressMap[expectedIP].Owner.Pod != nil {
actualIPAddressMap[expectedIP].Owner.Pod.ContainerID = ""
}
continue actualIPAddressLoop
}
}
if IPAddress.Owner.Pod != nil && IPAddress.Owner.Pod.Namespace == namespace && strings.HasPrefix(IPAddress.Owner.Pod.Name, name) {
actualIPAddressMap[IPAddress.IPAddress] = IPAddress.DeepCopy()
if actualIPAddressMap[IPAddress.IPAddress].Owner.Pod != nil {
actualIPAddressMap[IPAddress.IPAddress].Owner.Pod.ContainerID = ""
}
continue
}
if IPAddress.Owner.StatefulSet != nil && IPAddress.Owner.StatefulSet.Namespace == namespace && IPAddress.Owner.StatefulSet.Name == name {
actualIPAddressMap[IPAddress.IPAddress] = IPAddress.DeepCopy()
continue
}
}
done := reflect.DeepEqual(expectedIPAddressMap, actualIPAddressMap)
if !done {
actualIPAddressJson, _ := json.Marshal(ipPool.Status.IPAddresses)
tb.Logf("IPPool status isn't correct %s", actualIPAddressJson)
}
return done, nil
})
require.Nil(tb, err)
}

func deleteAntreaIPAMNamespace(tb testing.TB, data *TestData) {
tb.Logf("Deleting '%s' K8s Namespace", testAntreaIPAMNamespace)
if err := data.deleteNamespace(testAntreaIPAMNamespace, defaultTimeout); err != nil {
tb.Logf("Error when tearing down test: %v", err)
}
}

func createIPPool(tb testing.TB, data *TestData, vlan int) (*crdv1alpha2.IPPool, error) {
ipv4IPPool := subnetIPv4RangesMap[vlan]
func createIPPool(tb testing.TB, data *TestData, key string) (*crdv1alpha2.IPPool, error) {
ipv4IPPool := subnetIPv4RangesMap[key]
tb.Logf("Creating IPPool '%s'", ipv4IPPool.Name)
return data.crdClient.CrdV1alpha2().IPPools().Create(context.TODO(), &ipv4IPPool, metav1.CreateOptions{})
}

func checkIPPoolAllocation(tb testing.TB, data *TestData, IPPoolName, podIPString string) (isBelongTo, isAllocated, isPreallocated bool, err error) {
IPPool, err := data.crdClient.CrdV1alpha2().IPPools().Get(context.TODO(), IPPoolName, metav1.GetOptions{})
func checkIPPoolAllocation(tb testing.TB, data *TestData, ipPoolName, podIPString string) (isBelongTo bool, ipAddressState *crdv1alpha2.IPAddressState, err error) {
ipPool, err := data.crdClient.CrdV1alpha2().IPPools().Get(context.TODO(), ipPoolName, metav1.GetOptions{})
if err != nil {
return
}
podIP := net.ParseIP(podIPString)
for _, subnetIPRange := range IPPool.Spec.IPRanges {
for _, subnetIPRange := range ipPool.Spec.IPRanges {
if subnetIPRange.CIDR != "" {
_, IPNet, _ := net.ParseCIDR(subnetIPRange.CIDR)
if IPNet.Contains(podIP) {
Expand All @@ -192,14 +426,9 @@ func checkIPPoolAllocation(tb testing.TB, data *TestData, IPPoolName, podIPStrin
if !isBelongTo {
return
}
for _, IPAddress := range IPPool.Status.IPAddresses {
for _, IPAddress := range ipPool.Status.IPAddresses {
if podIP.Equal(net.ParseIP(IPAddress.IPAddress)) {
switch IPAddress.Phase {
case crdv1alpha2.IPAddressPhaseAllocated:
isAllocated = true
case crdv1alpha2.IPAddressPhasePreallocated:
isPreallocated = true
}
ipAddressState = IPAddress.DeepCopy()
return
}
}
Expand Down
5 changes: 3 additions & 2 deletions test/e2e/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"antrea.io/antrea/pkg/agent/apiserver/handlers/podinterface"
"antrea.io/antrea/pkg/agent/config"
"antrea.io/antrea/pkg/agent/openflow/cookie"
crdv1alpha2 "antrea.io/antrea/pkg/apis/crd/v1alpha2"
"antrea.io/antrea/pkg/clusteridentity"
)

Expand Down Expand Up @@ -186,11 +187,11 @@ func (data *TestData) testDeletePod(t *testing.T, podName string, nodeName strin
}
if namespace == testAntreaIPAMNamespace {
doesIPAllocationExist = func(podIP string) bool {
_, isAllocated, _, err := checkIPPoolAllocation(t, data, "test-ippool-ipv4-0", podIP)
_, ipAddressState, err := checkIPPoolAllocation(t, data, "test-ippool-ipv4-0", podIP)
if err != nil {
t.Fatalf("Cannot check IPPool allocation: %v", err)
}
return err == nil && isAllocated
return err == nil && ipAddressState != nil && ipAddressState.Phase == crdv1alpha2.IPAddressPhaseAllocated
}
}

Expand Down
Loading

0 comments on commit ab7ab60

Please sign in to comment.