Skip to content

Commit

Permalink
move enableEcmp to subnet
Browse files Browse the repository at this point in the history
  • Loading branch information
hongzhen-ma committed Feb 2, 2023
1 parent 2b38340 commit 0eb08e0
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 37 deletions.
2 changes: 2 additions & 0 deletions dist/images/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -1486,6 +1486,8 @@ spec:
type: boolean
enableLb:
type: boolean
enableEcmp:
type: boolean
scope: Cluster
names:
plural: subnets
Expand Down
2 changes: 2 additions & 0 deletions kubeovn-helm/templates/kube-ovn-crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1272,6 +1272,8 @@ spec:
type: boolean
enableLb:
type: boolean
enableEcmp:
type: boolean
scope: Cluster
names:
plural: subnets
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/kubeovn/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ type SubnetSpec struct {

U2OInterconnection bool `json:"u2oInterconnection,omitempty"`
EnableLb *bool `json:"enableLb,omitempty"`
EnableEcmp bool `json:"enableEcmp,omitempty"`
}

type Acl struct {
Expand Down
11 changes: 4 additions & 7 deletions pkg/controller/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,6 @@ func (c *Controller) handleAddNode(key string) error {
return err
}

klog.Infof("add policy route for centrailized subnet %s, on node %s, ip %s", subnet.Name, node.Name, ipStr)
if err := c.addPolicyRouteForCentralizedSubnetOnNode(node.Name, ipStr); err != nil {
klog.Errorf("failed to add policy route for node %s, %v", key, err)
return err
Expand Down Expand Up @@ -713,9 +712,6 @@ func (c *Controller) CheckGatewayReady() {
}

func (c *Controller) checkGatewayReady() error {
if !c.config.EnableEcmp {
return nil
}
klog.V(3).Infoln("start to check gateway status")
subnetList, err := c.subnetsLister.List(labels.Everything())
if err != nil {
Expand All @@ -731,7 +727,8 @@ func (c *Controller) checkGatewayReady() error {
for _, subnet := range subnetList {
if (subnet.Spec.Vlan != "" && !subnet.Spec.LogicalGateway) ||
subnet.Spec.GatewayNode == "" ||
subnet.Spec.GatewayType != kubeovnv1.GWCentralizedType {
subnet.Spec.GatewayType != kubeovnv1.GWCentralizedType ||
!subnet.Spec.EnableEcmp {
continue
}

Expand Down Expand Up @@ -1134,7 +1131,7 @@ func (c *Controller) deletePolicyRouteForNode(nodeName string) error {
}

if subnet.Spec.GatewayType == kubeovnv1.GWCentralizedType {
if c.config.EnableEcmp {
if subnet.Spec.EnableEcmp {
for _, cidrBlock := range strings.Split(subnet.Spec.CIDRBlock, ",") {
nextHops, nameIpMap, err := c.getPolicyRouteParas(cidrBlock, util.GatewayRouterPolicyPriority)
if err != nil {
Expand Down Expand Up @@ -1190,7 +1187,7 @@ func (c *Controller) addPolicyRouteForCentralizedSubnetOnNode(nodeName, nodeIP s
continue
}

if c.config.EnableEcmp {
if subnet.Spec.EnableEcmp {
if !util.GatewayContains(subnet.Spec.GatewayNode, nodeName) {
continue
}
Expand Down
57 changes: 32 additions & 25 deletions pkg/controller/subnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func (c *Controller) enqueueUpdateSubnet(old, new interface{}) {
oldSubnet.Spec.Protocol != newSubnet.Spec.Protocol ||
(oldSubnet.Spec.EnableLb != nil && newSubnet.Spec.EnableLb == nil) ||
(oldSubnet.Spec.EnableLb != nil && newSubnet.Spec.EnableLb != nil && *oldSubnet.Spec.EnableLb != *newSubnet.Spec.EnableLb) ||
oldSubnet.Spec.EnableEcmp != newSubnet.Spec.EnableEcmp ||
!reflect.DeepEqual(oldSubnet.Spec.Acls, newSubnet.Spec.Acls) ||
oldSubnet.Spec.U2OInterconnection != newSubnet.Spec.U2OInterconnection {
klog.V(3).Infof("enqueue update subnet %s", key)
Expand Down Expand Up @@ -1274,13 +1275,29 @@ func (c *Controller) reconcileOvnRoute(subnet *kubeovnv1.Subnet) error {
return fmt.Errorf("failed to add ecmp policy route, no gateway node exists")
}

if c.config.EnableEcmp {
if subnet.Spec.EnableEcmp {
// 1. Default value of subnet.Spec.EnableEcmp is false, so the field subnet.Status.ActivateGateway has value when centralized subnet is created
// 2. Change subnet.Spec.EnableEcmp from false to true, ecmp route is added based on gatewayNode, not ActivateGateway
// 3. Change subnet.Spec.EnableEcmp from true to false, the ActivateGateway still works and ecmp route does not update, which is incorrect
// 4. So delete ActivateGateway field when ecmp is enabled, and when value changed, the policy route will be updated correctly
if subnet.Status.ActivateGateway != "" {
subnet.Status.ActivateGateway = ""
bytes, err := subnet.Status.Bytes()
if err != nil {
return err
}
if _, err = c.config.KubeOvnClient.KubeovnV1().Subnets().Patch(context.Background(), subnet.Name, types.MergePatchType, bytes, metav1.PatchOptions{}, ""); err != nil {
klog.Errorf("failed to patch for removing subnet activeGateway of subnet %s", subnet.Name)
return err
}
}

// centralized subnet, enable ecmp, add ecmp policy route
gatewayNodes := strings.Split(subnet.Spec.GatewayNode, ",")
nodeV4Ips := make([]string, 0, len(gatewayNodes))
nodeV6Ips := make([]string, 0, len(gatewayNodes))
nameV4IpMap := make(map[string]string, len(gatewayNodes)*2)
nameV6IpMap := make(map[string]string, len(gatewayNodes)*2)
nameV4IpMap := make(map[string]string, len(gatewayNodes))
nameV6IpMap := make(map[string]string, len(gatewayNodes))
for _, gw := range gatewayNodes {
// the format of gatewayNodeStr can be like 'kube-ovn-worker:172.18.0.2, kube-ovn-control-plane:172.18.0.3', which consists of node name and designative egress ip
if strings.Contains(gw, ":") {
Expand Down Expand Up @@ -1328,7 +1345,7 @@ func (c *Controller) reconcileOvnRoute(subnet *kubeovnv1.Subnet) error {
klog.Errorf("failed to delete policy route for overlay subnet %s, %v", subnet.Name, err)
return err
}
klog.Infof("subnet %s configure gateway node, nodeIPs %v", subnet.Name, nodeV4Ips)
klog.Infof("subnet %s configure ecmp policy route, nexthops %v", subnet.Name, nodeV4Ips)
if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, v4Cidr, nodeV4Ips, nameV4IpMap); err != nil {
klog.Errorf("failed to add v4 ecmp policy route for centralized subnet %s: %v", subnet.Name, err)
return err
Expand All @@ -1348,8 +1365,8 @@ func (c *Controller) reconcileOvnRoute(subnet *kubeovnv1.Subnet) error {
klog.Errorf("failed to delete policy route for overlay subnet %s, %v", subnet.Name, err)
return err
}
klog.Infof("subnet %s configure gateway node, nodeIPs %v", subnet.Name, nodeV6Ips)
if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, v6Cidr, nodeV6Ips, nameV4IpMap); err != nil {
klog.Infof("subnet %s configure ecmp policy route, nexthops %v", subnet.Name, nodeV6Ips)
if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, v6Cidr, nodeV6Ips, nameV6IpMap); err != nil {
klog.Errorf("failed to add v6 ecmp policy route for centralized subnet %s: %v", subnet.Name, err)
return err
}
Expand Down Expand Up @@ -1404,7 +1421,7 @@ func (c *Controller) reconcileOvnRoute(subnet *kubeovnv1.Subnet) error {
}

nextHop := getNextHopByTunnelIP(nodeTunlIPAddr)
klog.Infof("subnet %s configure new gateway node, nodeIPs %s", subnet.Name, nextHop)
klog.Infof("subnet %s configure new gateway node, nextHop %s", subnet.Name, nextHop)
if err = c.addPolicyRouteForCentralizedSubnet(subnet, newActivateNode, nil, strings.Split(nextHop, ",")); err != nil {
klog.Errorf("failed to add active-backup policy route for centralized subnet %s: %v", subnet.Name, err)
return err
Expand Down Expand Up @@ -1473,8 +1490,6 @@ func (c *Controller) reconcileVlan(subnet *kubeovnv1.Subnet) error {
func (c *Controller) reconcileU2OInterconnectionIP(subnet *kubeovnv1.Subnet) error {

needCalcIP := false
klog.Infof("reconcile underlay subnet %s to overlay interconnection with U2OInterconnection %v U2OInterconnectionIP %s ",
subnet.Name, subnet.Spec.U2OInterconnection, subnet.Status.U2OInterconnectionIP)
if subnet.Spec.U2OInterconnection {
if subnet.Status.U2OInterconnectionIP == "" {
u2oInterconnName := fmt.Sprintf(util.U2OInterconnName, subnet.Spec.Vpc, subnet.Name)
Expand Down Expand Up @@ -1517,6 +1532,8 @@ func (c *Controller) reconcileU2OInterconnectionIP(subnet *kubeovnv1.Subnet) err
}

if needCalcIP {
klog.Infof("reconcile underlay subnet %s to overlay interconnection with U2OInterconnection %v U2OInterconnectionIP %s ",
subnet.Name, subnet.Spec.U2OInterconnection, subnet.Status.U2OInterconnectionIP)
if subnet.Spec.Protocol == kubeovnv1.ProtocolDual {
if err := calcDualSubnetStatusIP(subnet, c); err != nil {
return err
Expand Down Expand Up @@ -1893,12 +1910,8 @@ func (c *Controller) updatePolicyRouteForCentralizedSubnet(subnetName, cidr stri
}
match := fmt.Sprintf("%s.src == %s", ipSuffix, cidr)

// there's no way to update policy route when activeGateway changed for subnet, so delete and readd policy route
klog.Infof("delete policy route for router: %s, priority: %d, match %s", c.config.ClusterRouter, util.GatewayRouterPolicyPriority, match)
if err := c.ovnLegacyClient.DeletePolicyRoute(c.config.ClusterRouter, util.GatewayRouterPolicyPriority, match); err != nil {
klog.Errorf("failed to delete policy route for centralized subnet %s: %v", subnetName, err)
return err
}
// there's no way to update policy route when gatewayNode changed for subnet, so delete and readd policy route
// The delete operation is processed in AddPolicyRoute if the policy route is inconsistent, so no need delete here

nextHopIp := strings.Join(nextHops, ",")
externalIDs := map[string]string{
Expand All @@ -1910,7 +1923,7 @@ func (c *Controller) updatePolicyRouteForCentralizedSubnet(subnetName, cidr stri
for node, ip := range nameIpMap {
externalIDs[node] = ip
}
klog.Infof("add ecmp policy route for router: %s, match %s, action %s, nexthop %s, extrenalID %s", c.config.ClusterRouter, match, "allow", "", externalIDs)
klog.Infof("add ecmp policy route for router: %s, match %s, action %s, nexthop %s, extrenalID %s", c.config.ClusterRouter, match, "allow", nextHopIp, externalIDs)
if err := c.ovnLegacyClient.AddPolicyRoute(c.config.ClusterRouter, util.GatewayRouterPolicyPriority, match, "reroute", nextHopIp, externalIDs); err != nil {
klog.Errorf("failed to add policy route for centralized subnet %s: %v", subnetName, err)
return err
Expand All @@ -1925,14 +1938,8 @@ func (c *Controller) addPolicyRouteForCentralizedSubnet(subnet *kubeovnv1.Subnet
if util.CheckProtocol(cidrBlock) != util.CheckProtocol(nodeIP) {
continue
}
exist, err := c.checkPolicyRouteExistForNode(nodeName, cidrBlock, nodeIP, util.GatewayRouterPolicyPriority)
if err != nil {
klog.Errorf("check ecmp policy route exist for subnet %v, error %v", subnet.Name, err)
continue
}
if exist {
continue
}
// Check for repeat policy route is processed in AddPolicyRoute

var nextHops []string
nameIpMap := map[string]string{}
nextHops = append(nextHops, nodeIP)
Expand Down Expand Up @@ -2062,7 +2069,7 @@ func (c *Controller) deletePolicyRouteByGatewayType(subnet *kubeovnv1.Subnet, ga
klog.Errorf("failed to delete port group for subnet %s and node %s, %v", subnet.Name, node.Name, err)
return err
}
klog.Infof("delete policy route for distributed subnet %s, node %s", subnet.Name, node.Name)

if err = c.deletePolicyRouteForDistributedSubnet(subnet, node.Name); err != nil {
klog.Errorf("failed to delete policy route for subnet %s and node %s, %v", subnet.Name, node.Name, err)
return err
Expand Down
14 changes: 9 additions & 5 deletions pkg/ovs/ovn-nbctl-legacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"os/exec"
"regexp"
"sort"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -2522,6 +2523,7 @@ func (c *LegacyClient) VpcHasPolicyRoute(vpc string, nextHops []string, priority
routePriority := result[0]
strNodeIPs := result[1]
nodeIPs := strings.Fields(strNodeIPs)
sort.Strings(nodeIPs)
if routePriority == strPriority && slices.Equal(nextHops, nodeIPs) {
// make sure priority, nexthops is just the same
return true, nil
Expand Down Expand Up @@ -2616,15 +2618,17 @@ func (c LegacyClient) CheckPolicyRouteNexthopConsistent(router, match, nexthop s
return false, nil
}

nextHops, _, err := c.GetPolicyRouteParas(priority, match)
dbNextHops, _, err := c.GetPolicyRouteParas(priority, match)
if err != nil {
klog.Errorf("failed to get policy route paras, %v", err)
return false, err
}
for _, next := range nextHops {
if next == nexthop {
return true, nil
}
cfgNextHops := strings.Split(nexthop, ",")

sort.Strings(dbNextHops)
sort.Strings(cfgNextHops)
if slices.Equal(dbNextHops, cfgNextHops) {
return true, nil
}
return false, nil
}
Expand Down
63 changes: 63 additions & 0 deletions test/e2e/kube-ovn/subnet/subnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,69 @@ var _ = framework.Describe("[group:subnet]", func() {
}
})

framework.ConformanceIt("create centralized subnet without enableEcmp", func() {
ginkgo.By("Getting nodes")
nodes, err := e2enode.GetReadySchedulableNodes(cs)
framework.ExpectNoError(err)
framework.ExpectNotEmpty(nodes.Items)

ginkgo.By("Creating subnet " + subnetName)
gatewayNodes := make([]string, 0, len(nodes.Items))
nodeIPs := make([]string, 0, len(nodes.Items))
for i := 0; i < 3 && i < len(nodes.Items); i++ {
gatewayNodes = append(gatewayNodes, nodes.Items[i].Name)
nodeIPs = append(nodeIPs, nodes.Items[i].Annotations[util.IpAddressAnnotation])
}
subnet = framework.MakeSubnet(subnetName, "", cidr, "", nil, gatewayNodes, nil)
subnet = subnetClient.CreateSync(subnet)

ginkgo.By("Validating subnet finalizers")
framework.ExpectContainElement(subnet.Finalizers, util.ControllerName)

ginkgo.By("Validating centralized subnet with active-standby mode")
framework.ExpectFalse(subnet.Spec.EnableEcmp)
framework.ExpectEqual(subnet.Status.ActivateGateway, gatewayNodes[0])
framework.ExpectConsistOf(strings.Split(subnet.Spec.GatewayNode, ","), gatewayNodes)

ginkgo.By("change subnet spec field enableEcmp to true")
modifiedSubnet := subnet.DeepCopy()
modifiedSubnet.Spec.EnableEcmp = true
subnet = subnetClient.PatchSync(subnet, modifiedSubnet)
framework.ExpectEmpty(subnet.Status.ActivateGateway)
time.Sleep(2 * time.Second)

execCmd := "kubectl ko nbctl --format=csv --data=bare --no-heading --columns=nexthops find logical-router-policy " + fmt.Sprintf("external_ids:subnet=%s", subnetName)
output, err := exec.Command("bash", "-c", execCmd).CombinedOutput()
framework.ExpectNoError(err)

lines := strings.Split(string(output), "\n")
nextHops := make([]string, 0, len(lines))
for _, l := range lines {
if len(strings.TrimSpace(l)) == 0 {
continue
}
nextHops = strings.Fields(l)
}
framework.Logf("subnet policy route nextHops %v, gatewayNode IPs %v", nextHops, nodeIPs)

check := true
if len(nextHops) < len(nodeIPs) {
framework.Logf("some gateway nodes maybe not ready for subnet %s", subnetName)
check = false
}

if check {
for _, nodeIP := range nodeIPs {
for _, strIP := range strings.Split(nodeIP, ",") {
if util.CheckProtocol(strIP) != util.CheckProtocol(nextHops[0]) {
continue
}
framework.ExpectContainElement(nextHops, strIP)
}
}
}
})

framework.ConformanceIt("should support distributed external egress gateway", func() {
ginkgo.By("Getting nodes")
nodes, err := e2enode.GetReadySchedulableNodes(cs)
Expand Down
2 changes: 2 additions & 0 deletions yamls/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -863,6 +863,8 @@ spec:
type: boolean
enableLb:
type: boolean
enableEcmp:
type: boolean
scope: Cluster
names:
plural: subnets
Expand Down

0 comments on commit 0eb08e0

Please sign in to comment.