Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move enableEcmp to subnet #2284

Merged
merged 1 commit into from
Feb 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dist/images/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -1486,6 +1486,8 @@ spec:
type: boolean
enableLb:
type: boolean
enableEcmp:
hongzhen-ma marked this conversation as resolved.
Show resolved Hide resolved
type: boolean
scope: Cluster
names:
plural: subnets
Expand Down
2 changes: 2 additions & 0 deletions kubeovn-helm/templates/kube-ovn-crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1272,6 +1272,8 @@ spec:
type: boolean
enableLb:
type: boolean
enableEcmp:
type: boolean
scope: Cluster
names:
plural: subnets
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/kubeovn/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ type SubnetSpec struct {

U2OInterconnection bool `json:"u2oInterconnection,omitempty"`
EnableLb *bool `json:"enableLb,omitempty"`
EnableEcmp bool `json:"enableEcmp,omitempty"`
}

type Acl struct {
Expand Down
15 changes: 15 additions & 0 deletions pkg/controller/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,21 @@ func (c *Controller) initSyncCrdSubnets() error {
klog.Errorf("failed to calculate subnet %s used ip: %v", subnet.Name, err)
return err
}

// only sync subnet spec enableEcmp when subnet.Spec.EnableEcmp is false and c.config.EnableEcmp is true
if subnet.Spec.GatewayType == kubeovnv1.GWCentralizedType && !subnet.Spec.EnableEcmp && subnet.Spec.EnableEcmp != c.config.EnableEcmp {
subnet, err = c.config.KubeOvnClient.KubeovnV1().Subnets().Get(context.Background(), subnet.Name, metav1.GetOptions{})
if err != nil {
klog.Errorf("failed to get subnet %s: %v", subnet.Name, err)
return err
}

subnet.Spec.EnableEcmp = c.config.EnableEcmp
if _, err := c.config.KubeOvnClient.KubeovnV1().Subnets().Update(context.Background(), subnet, metav1.UpdateOptions{}); err != nil {
klog.Errorf("failed to sync subnet spec enableEcmp with kube-ovn-controller config enableEcmp %s: %v", subnet.Name, err)
return err
}
}
}
return nil
}
Expand Down
11 changes: 4 additions & 7 deletions pkg/controller/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,6 @@ func (c *Controller) handleAddNode(key string) error {
return err
}

klog.Infof("add policy route for centrailized subnet %s, on node %s, ip %s", subnet.Name, node.Name, ipStr)
if err := c.addPolicyRouteForCentralizedSubnetOnNode(node.Name, ipStr); err != nil {
klog.Errorf("failed to add policy route for node %s, %v", key, err)
return err
Expand Down Expand Up @@ -713,9 +712,6 @@ func (c *Controller) CheckGatewayReady() {
}

func (c *Controller) checkGatewayReady() error {
if !c.config.EnableEcmp {
return nil
}
klog.V(3).Infoln("start to check gateway status")
subnetList, err := c.subnetsLister.List(labels.Everything())
if err != nil {
Expand All @@ -731,7 +727,8 @@ func (c *Controller) checkGatewayReady() error {
for _, subnet := range subnetList {
if (subnet.Spec.Vlan != "" && !subnet.Spec.LogicalGateway) ||
subnet.Spec.GatewayNode == "" ||
subnet.Spec.GatewayType != kubeovnv1.GWCentralizedType {
subnet.Spec.GatewayType != kubeovnv1.GWCentralizedType ||
!subnet.Spec.EnableEcmp {
continue
}

Expand Down Expand Up @@ -1134,7 +1131,7 @@ func (c *Controller) deletePolicyRouteForNode(nodeName string) error {
}

if subnet.Spec.GatewayType == kubeovnv1.GWCentralizedType {
if c.config.EnableEcmp {
if subnet.Spec.EnableEcmp {
for _, cidrBlock := range strings.Split(subnet.Spec.CIDRBlock, ",") {
nextHops, nameIpMap, err := c.getPolicyRouteParas(cidrBlock, util.GatewayRouterPolicyPriority)
if err != nil {
Expand Down Expand Up @@ -1190,7 +1187,7 @@ func (c *Controller) addPolicyRouteForCentralizedSubnetOnNode(nodeName, nodeIP s
continue
}

if c.config.EnableEcmp {
if subnet.Spec.EnableEcmp {
if !util.GatewayContains(subnet.Spec.GatewayNode, nodeName) {
continue
}
Expand Down
57 changes: 32 additions & 25 deletions pkg/controller/subnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func (c *Controller) enqueueUpdateSubnet(old, new interface{}) {
oldSubnet.Spec.Protocol != newSubnet.Spec.Protocol ||
(oldSubnet.Spec.EnableLb != nil && newSubnet.Spec.EnableLb == nil) ||
(oldSubnet.Spec.EnableLb != nil && newSubnet.Spec.EnableLb != nil && *oldSubnet.Spec.EnableLb != *newSubnet.Spec.EnableLb) ||
oldSubnet.Spec.EnableEcmp != newSubnet.Spec.EnableEcmp ||
!reflect.DeepEqual(oldSubnet.Spec.Acls, newSubnet.Spec.Acls) ||
oldSubnet.Spec.U2OInterconnection != newSubnet.Spec.U2OInterconnection {
klog.V(3).Infof("enqueue update subnet %s", key)
Expand Down Expand Up @@ -1274,13 +1275,29 @@ func (c *Controller) reconcileOvnRoute(subnet *kubeovnv1.Subnet) error {
return fmt.Errorf("failed to add ecmp policy route, no gateway node exists")
}

if c.config.EnableEcmp {
if subnet.Spec.EnableEcmp {
// 1. Default value of subnet.Spec.EnableEcmp is false, so the field subnet.Status.ActivateGateway has value when centralized subnet is created
// 2. Change subnet.Spec.EnableEcmp from false to true, ecmp route is added based on gatewayNode, not ActivateGateway
// 3. Change subnet.Spec.EnableEcmp from true to false, the ActivateGateway still works and ecmp route does not update, which is incorrect
// 4. So delete ActivateGateway field when ecmp is enabled, and when value changed, the policy route will be updated correctly
if subnet.Status.ActivateGateway != "" {
subnet.Status.ActivateGateway = ""
bytes, err := subnet.Status.Bytes()
if err != nil {
return err
}
if _, err = c.config.KubeOvnClient.KubeovnV1().Subnets().Patch(context.Background(), subnet.Name, types.MergePatchType, bytes, metav1.PatchOptions{}, ""); err != nil {
klog.Errorf("failed to patch for removing subnet activeGateway of subnet %s", subnet.Name)
return err
}
}

// centralized subnet, enable ecmp, add ecmp policy route
gatewayNodes := strings.Split(subnet.Spec.GatewayNode, ",")
nodeV4Ips := make([]string, 0, len(gatewayNodes))
nodeV6Ips := make([]string, 0, len(gatewayNodes))
nameV4IpMap := make(map[string]string, len(gatewayNodes)*2)
nameV6IpMap := make(map[string]string, len(gatewayNodes)*2)
nameV4IpMap := make(map[string]string, len(gatewayNodes))
nameV6IpMap := make(map[string]string, len(gatewayNodes))
for _, gw := range gatewayNodes {
// the format of gatewayNodeStr can be like 'kube-ovn-worker:172.18.0.2, kube-ovn-control-plane:172.18.0.3', which consists of node name and designative egress ip
if strings.Contains(gw, ":") {
Expand Down Expand Up @@ -1328,7 +1345,7 @@ func (c *Controller) reconcileOvnRoute(subnet *kubeovnv1.Subnet) error {
klog.Errorf("failed to delete policy route for overlay subnet %s, %v", subnet.Name, err)
return err
}
klog.Infof("subnet %s configure gateway node, nodeIPs %v", subnet.Name, nodeV4Ips)
klog.Infof("subnet %s configure ecmp policy route, nexthops %v", subnet.Name, nodeV4Ips)
if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, v4Cidr, nodeV4Ips, nameV4IpMap); err != nil {
klog.Errorf("failed to add v4 ecmp policy route for centralized subnet %s: %v", subnet.Name, err)
return err
Expand All @@ -1348,8 +1365,8 @@ func (c *Controller) reconcileOvnRoute(subnet *kubeovnv1.Subnet) error {
klog.Errorf("failed to delete policy route for overlay subnet %s, %v", subnet.Name, err)
return err
}
klog.Infof("subnet %s configure gateway node, nodeIPs %v", subnet.Name, nodeV6Ips)
if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, v6Cidr, nodeV6Ips, nameV4IpMap); err != nil {
klog.Infof("subnet %s configure ecmp policy route, nexthops %v", subnet.Name, nodeV6Ips)
if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, v6Cidr, nodeV6Ips, nameV6IpMap); err != nil {
klog.Errorf("failed to add v6 ecmp policy route for centralized subnet %s: %v", subnet.Name, err)
return err
}
Expand Down Expand Up @@ -1404,7 +1421,7 @@ func (c *Controller) reconcileOvnRoute(subnet *kubeovnv1.Subnet) error {
}

nextHop := getNextHopByTunnelIP(nodeTunlIPAddr)
klog.Infof("subnet %s configure new gateway node, nodeIPs %s", subnet.Name, nextHop)
klog.Infof("subnet %s configure new gateway node, nextHop %s", subnet.Name, nextHop)
if err = c.addPolicyRouteForCentralizedSubnet(subnet, newActivateNode, nil, strings.Split(nextHop, ",")); err != nil {
klog.Errorf("failed to add active-backup policy route for centralized subnet %s: %v", subnet.Name, err)
return err
Expand Down Expand Up @@ -1473,8 +1490,6 @@ func (c *Controller) reconcileVlan(subnet *kubeovnv1.Subnet) error {
func (c *Controller) reconcileU2OInterconnectionIP(subnet *kubeovnv1.Subnet) error {

needCalcIP := false
klog.Infof("reconcile underlay subnet %s to overlay interconnection with U2OInterconnection %v U2OInterconnectionIP %s ",
subnet.Name, subnet.Spec.U2OInterconnection, subnet.Status.U2OInterconnectionIP)
if subnet.Spec.U2OInterconnection {
if subnet.Status.U2OInterconnectionIP == "" {
u2oInterconnName := fmt.Sprintf(util.U2OInterconnName, subnet.Spec.Vpc, subnet.Name)
Expand Down Expand Up @@ -1517,6 +1532,8 @@ func (c *Controller) reconcileU2OInterconnectionIP(subnet *kubeovnv1.Subnet) err
}

if needCalcIP {
klog.Infof("reconcile underlay subnet %s to overlay interconnection with U2OInterconnection %v U2OInterconnectionIP %s ",
subnet.Name, subnet.Spec.U2OInterconnection, subnet.Status.U2OInterconnectionIP)
if subnet.Spec.Protocol == kubeovnv1.ProtocolDual {
if err := calcDualSubnetStatusIP(subnet, c); err != nil {
return err
Expand Down Expand Up @@ -1893,12 +1910,8 @@ func (c *Controller) updatePolicyRouteForCentralizedSubnet(subnetName, cidr stri
}
match := fmt.Sprintf("%s.src == %s", ipSuffix, cidr)

// there's no way to update policy route when activeGateway changed for subnet, so delete and readd policy route
klog.Infof("delete policy route for router: %s, priority: %d, match %s", c.config.ClusterRouter, util.GatewayRouterPolicyPriority, match)
if err := c.ovnLegacyClient.DeletePolicyRoute(c.config.ClusterRouter, util.GatewayRouterPolicyPriority, match); err != nil {
klog.Errorf("failed to delete policy route for centralized subnet %s: %v", subnetName, err)
return err
}
// there's no way to update policy route when gatewayNode changed for subnet, so delete and readd policy route
// The delete operation is processed in AddPolicyRoute if the policy route is inconsistent, so no need delete here

nextHopIp := strings.Join(nextHops, ",")
externalIDs := map[string]string{
Expand All @@ -1910,7 +1923,7 @@ func (c *Controller) updatePolicyRouteForCentralizedSubnet(subnetName, cidr stri
for node, ip := range nameIpMap {
externalIDs[node] = ip
}
klog.Infof("add ecmp policy route for router: %s, match %s, action %s, nexthop %s, extrenalID %s", c.config.ClusterRouter, match, "allow", "", externalIDs)
klog.Infof("add ecmp policy route for router: %s, match %s, action %s, nexthop %s, extrenalID %s", c.config.ClusterRouter, match, "allow", nextHopIp, externalIDs)
if err := c.ovnLegacyClient.AddPolicyRoute(c.config.ClusterRouter, util.GatewayRouterPolicyPriority, match, "reroute", nextHopIp, externalIDs); err != nil {
klog.Errorf("failed to add policy route for centralized subnet %s: %v", subnetName, err)
return err
Expand All @@ -1925,14 +1938,8 @@ func (c *Controller) addPolicyRouteForCentralizedSubnet(subnet *kubeovnv1.Subnet
if util.CheckProtocol(cidrBlock) != util.CheckProtocol(nodeIP) {
continue
}
exist, err := c.checkPolicyRouteExistForNode(nodeName, cidrBlock, nodeIP, util.GatewayRouterPolicyPriority)
if err != nil {
klog.Errorf("check ecmp policy route exist for subnet %v, error %v", subnet.Name, err)
continue
}
if exist {
continue
}
// Check for repeat policy route is processed in AddPolicyRoute

var nextHops []string
nameIpMap := map[string]string{}
nextHops = append(nextHops, nodeIP)
Expand Down Expand Up @@ -2062,7 +2069,7 @@ func (c *Controller) deletePolicyRouteByGatewayType(subnet *kubeovnv1.Subnet, ga
klog.Errorf("failed to delete port group for subnet %s and node %s, %v", subnet.Name, node.Name, err)
return err
}
klog.Infof("delete policy route for distributed subnet %s, node %s", subnet.Name, node.Name)

if err = c.deletePolicyRouteForDistributedSubnet(subnet, node.Name); err != nil {
klog.Errorf("failed to delete policy route for subnet %s and node %s, %v", subnet.Name, node.Name, err)
return err
Expand Down
14 changes: 9 additions & 5 deletions pkg/ovs/ovn-nbctl-legacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"os/exec"
"regexp"
"sort"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -2522,6 +2523,7 @@ func (c *LegacyClient) VpcHasPolicyRoute(vpc string, nextHops []string, priority
routePriority := result[0]
strNodeIPs := result[1]
nodeIPs := strings.Fields(strNodeIPs)
sort.Strings(nodeIPs)
if routePriority == strPriority && slices.Equal(nextHops, nodeIPs) {
// make sure priority, nexthops is just the same
return true, nil
Expand Down Expand Up @@ -2616,15 +2618,17 @@ func (c LegacyClient) CheckPolicyRouteNexthopConsistent(router, match, nexthop s
return false, nil
}

nextHops, _, err := c.GetPolicyRouteParas(priority, match)
dbNextHops, _, err := c.GetPolicyRouteParas(priority, match)
if err != nil {
klog.Errorf("failed to get policy route paras, %v", err)
return false, err
}
for _, next := range nextHops {
if next == nexthop {
return true, nil
}
cfgNextHops := strings.Split(nexthop, ",")

sort.Strings(dbNextHops)
sort.Strings(cfgNextHops)
if slices.Equal(dbNextHops, cfgNextHops) {
return true, nil
}
return false, nil
}
Expand Down
63 changes: 63 additions & 0 deletions test/e2e/kube-ovn/subnet/subnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,69 @@ var _ = framework.Describe("[group:subnet]", func() {
}
})

framework.ConformanceIt("create centralized subnet without enableEcmp", func() {
ginkgo.By("Getting nodes")
nodes, err := e2enode.GetReadySchedulableNodes(cs)
framework.ExpectNoError(err)
framework.ExpectNotEmpty(nodes.Items)

ginkgo.By("Creating subnet " + subnetName)
gatewayNodes := make([]string, 0, len(nodes.Items))
nodeIPs := make([]string, 0, len(nodes.Items))
for i := 0; i < 3 && i < len(nodes.Items); i++ {
gatewayNodes = append(gatewayNodes, nodes.Items[i].Name)
nodeIPs = append(nodeIPs, nodes.Items[i].Annotations[util.IpAddressAnnotation])
}
subnet = framework.MakeSubnet(subnetName, "", cidr, "", nil, gatewayNodes, nil)
subnet = subnetClient.CreateSync(subnet)

ginkgo.By("Validating subnet finalizers")
framework.ExpectContainElement(subnet.Finalizers, util.ControllerName)

ginkgo.By("Validating centralized subnet with active-standby mode")
framework.ExpectFalse(subnet.Spec.EnableEcmp)
framework.ExpectEqual(subnet.Status.ActivateGateway, gatewayNodes[0])
framework.ExpectConsistOf(strings.Split(subnet.Spec.GatewayNode, ","), gatewayNodes)

ginkgo.By("change subnet spec field enableEcmp to true")
modifiedSubnet := subnet.DeepCopy()
modifiedSubnet.Spec.EnableEcmp = true
subnet = subnetClient.PatchSync(subnet, modifiedSubnet)
framework.ExpectEmpty(subnet.Status.ActivateGateway)
time.Sleep(2 * time.Second)

execCmd := "kubectl ko nbctl --format=csv --data=bare --no-heading --columns=nexthops find logical-router-policy " + fmt.Sprintf("external_ids:subnet=%s", subnetName)
output, err := exec.Command("bash", "-c", execCmd).CombinedOutput()
framework.ExpectNoError(err)

lines := strings.Split(string(output), "\n")
nextHops := make([]string, 0, len(lines))
for _, l := range lines {
if len(strings.TrimSpace(l)) == 0 {
continue
}
nextHops = strings.Fields(l)
}
framework.Logf("subnet policy route nextHops %v, gatewayNode IPs %v", nextHops, nodeIPs)

check := true
if len(nextHops) < len(nodeIPs) {
framework.Logf("some gateway nodes maybe not ready for subnet %s", subnetName)
check = false
}

if check {
for _, nodeIP := range nodeIPs {
for _, strIP := range strings.Split(nodeIP, ",") {
if util.CheckProtocol(strIP) != util.CheckProtocol(nextHops[0]) {
continue
}
framework.ExpectContainElement(nextHops, strIP)
}
}
}
})

framework.ConformanceIt("should support distributed external egress gateway", func() {
ginkgo.By("Getting nodes")
nodes, err := e2enode.GetReadySchedulableNodes(cs)
Expand Down
2 changes: 2 additions & 0 deletions yamls/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -863,6 +863,8 @@ spec:
type: boolean
enableLb:
type: boolean
enableEcmp:
type: boolean
scope: Cluster
names:
plural: subnets
Expand Down