Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kube-ovn-speaker support IPv6/Dual #2455

Merged
merged 3 commits into from
Mar 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 56 additions & 43 deletions pkg/speaker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type Configuration struct {
ClusterAs uint32
RouterId string
NeighborAddress string
NeighborIPv6Address string
NeighborAs uint32
AuthPassword string
HoldTime float64
Expand Down Expand Up @@ -69,6 +70,7 @@ func ParseFlags() (*Configuration, error) {
argClusterAs = pflag.Uint32("cluster-as", DefaultBGPClusterAs, "The as number of container network, default 65000")
argRouterId = pflag.String("router-id", "", "The address for the speaker to use as router id, default the node ip")
argNeighborAddress = pflag.String("neighbor-address", "", "The router address the speaker connects to.")
argNeighborIPv6Address = pflag.String("neighbor-ipv6-address", "", "The router address the speaker connects to.")
argNeighborAs = pflag.Uint32("neighbor-as", DefaultBGPNeighborAs, "The router as number, default 65001")
argAuthPassword = pflag.String("auth-password", "", "bgp peer auth password")
argHoldTime = pflag.Duration("holdtime", DefaultBGPHoldtime, "ovn-speaker goes down abnormally, the local saving time of BGP route will be affected.Holdtime must be in the range 3s to 65536s. (default 90s)")
Expand Down Expand Up @@ -102,7 +104,7 @@ func ParseFlags() (*Configuration, error) {
if *argRouterId != "" && net.ParseIP(*argRouterId) == nil {
return nil, fmt.Errorf("invalid router-id format: %s", *argRouterId)
}
if net.ParseIP(*argNeighborAddress) == nil {
if *argNeighborAddress != "" && net.ParseIP(*argNeighborAddress) == nil {
return nil, fmt.Errorf("invalid neighbor-address format: %s", *argNeighborAddress)
}

Expand All @@ -113,6 +115,7 @@ func ParseFlags() (*Configuration, error) {
ClusterAs: *argClusterAs,
RouterId: *argRouterId,
NeighborAddress: *argNeighborAddress,
NeighborIPv6Address: *argNeighborIPv6Address,
NeighborAs: *argNeighborAs,
AuthPassword: *argAuthPassword,
HoldTime: ht,
Expand Down Expand Up @@ -194,13 +197,21 @@ func (config *Configuration) checkGracefulRestartOptions() error {

func (config *Configuration) initBgpServer() error {
maxSize := 256 << 20
peersMap := make(map[api.Family_Afi]string)
var listenPort int32 = -1
grpcOpts := []grpc.ServerOption{grpc.MaxRecvMsgSize(maxSize), grpc.MaxSendMsgSize(maxSize)}
s := gobgp.NewBgpServer(
gobgp.GrpcListenAddress(fmt.Sprintf("%s:%d", config.GrpcHost, config.GrpcPort)),
gobgp.GrpcOption(grpcOpts))
go s.Serve()

if config.NeighborAddress != "" {
peersMap[api.Family_AFI_IP] = config.NeighborAddress
}
if config.NeighborIPv6Address != "" {
peersMap[api.Family_AFI_IP6] = config.NeighborIPv6Address
}

if config.PassiveMode {
listenPort = bgp.BGP_PORT
}
Expand All @@ -214,57 +225,59 @@ func (config *Configuration) initBgpServer() error {
}); err != nil {
return err
}

peer := &api.Peer{
Timers: &api.Timers{Config: &api.TimersConfig{HoldTime: uint64(config.HoldTime)}},
Conf: &api.PeerConf{
NeighborAddress: config.NeighborAddress,
PeerAsn: config.NeighborAs,
},
Transport: &api.Transport{
PassiveMode: config.PassiveMode,
},
}
if config.EbgpMultihopTtl != DefaultEbgpMultiHop {
peer.EbgpMultihop = &api.EbgpMultihop{
Enabled: true,
MultihopTtl: uint32(config.EbgpMultihopTtl),
for ipFamily, address := range peersMap {
peer := &api.Peer{
Timers: &api.Timers{Config: &api.TimersConfig{HoldTime: uint64(config.HoldTime)}},
Conf: &api.PeerConf{
NeighborAddress: address,
PeerAsn: config.NeighborAs,
},
Transport: &api.Transport{
PassiveMode: config.PassiveMode,
},
}
}
if config.AuthPassword != "" {
peer.Conf.AuthPassword = config.AuthPassword
}
if config.GracefulRestart {

if err := config.checkGracefulRestartOptions(); err != nil {
return err
if config.EbgpMultihopTtl != DefaultEbgpMultiHop {
peer.EbgpMultihop = &api.EbgpMultihop{
Enabled: true,
MultihopTtl: uint32(config.EbgpMultihopTtl),
}
}
peer.GracefulRestart = &api.GracefulRestart{
Enabled: true,
RestartTime: uint32(config.GracefulRestartTime.Seconds()),
DeferralTime: uint32(config.GracefulRestartDeferralTime.Seconds()),
LocalRestarting: true,
if config.AuthPassword != "" {
peer.Conf.AuthPassword = config.AuthPassword
}
peer.AfiSafis = []*api.AfiSafi{
{
Config: &api.AfiSafiConfig{
Family: &api.Family{Afi: api.Family_AFI_IP, Safi: api.Family_SAFI_UNICAST},
Enabled: true,
},
MpGracefulRestart: &api.MpGracefulRestart{
Config: &api.MpGracefulRestartConfig{
if config.GracefulRestart {

if err := config.checkGracefulRestartOptions(); err != nil {
return err
}
peer.GracefulRestart = &api.GracefulRestart{
Enabled: true,
RestartTime: uint32(config.GracefulRestartTime.Seconds()),
DeferralTime: uint32(config.GracefulRestartDeferralTime.Seconds()),
LocalRestarting: true,
}
peer.AfiSafis = []*api.AfiSafi{
{
Config: &api.AfiSafiConfig{
Family: &api.Family{Afi: ipFamily, Safi: api.Family_SAFI_UNICAST},
Enabled: true,
},
MpGracefulRestart: &api.MpGracefulRestart{
Config: &api.MpGracefulRestartConfig{
Enabled: true,
},
},
},
},
}
}
}

if err := s.AddPeer(context.Background(), &api.AddPeerRequest{
Peer: peer,
}); err != nil {
return err
if err := s.AddPeer(context.Background(), &api.AddPeerRequest{
Peer: peer,
}); err != nil {
return err
}
}

config.BgpServer = s
return nil
}
113 changes: 90 additions & 23 deletions pkg/speaker/subnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ package speaker
import (
"context"
"fmt"

kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"

"golang.org/x/sys/unix"
"net"
"strconv"
Expand Down Expand Up @@ -44,7 +47,9 @@ func isClusterIPService(svc *v1.Service) bool {

// TODO: ipv4 only, need ipv6/dual-stack support later
func (c *Controller) syncSubnetRoutes() {
bgpExpected, bgpExists := []string{}, []string{}
maskMap := map[string]int{kubeovnv1.ProtocolIPv4: 32, kubeovnv1.ProtocolIPv6: 128}
bgpExpected, bgpExists := make(map[string][]string), make(map[string][]string)

subnets, err := c.subnetsLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to list subnets, %v", err)
Expand All @@ -64,24 +69,36 @@ func (c *Controller) syncSubnetRoutes() {
}
for _, svc := range services {
if svc.Annotations != nil && svc.Annotations[util.BgpAnnotation] == "true" && isClusterIPService(svc) {
bgpExpected = append(bgpExpected, fmt.Sprintf("%s/32", svc.Spec.ClusterIP))
for _, clusterIp := range svc.Spec.ClusterIPs {
ipFamily := util.CheckProtocol(clusterIp)
bgpExpected[ipFamily] = append(bgpExpected[ipFamily], fmt.Sprintf("%s/%d", clusterIp, maskMap[ipFamily]))
}
//bgpExpected = append(bgpExpected, fmt.Sprintf("%s/32", svc.Spec.ClusterIP))
}
}
}

for _, subnet := range subnets {
if subnet.Status.IsReady() && subnet.Annotations != nil && subnet.Annotations[util.BgpAnnotation] == "true" {
bgpExpected = append(bgpExpected, subnet.Spec.CIDRBlock)
ips := strings.Split(subnet.Spec.CIDRBlock, ",")
for _, cidr := range ips {
ipFamily := util.CheckProtocol(cidr)
bgpExpected[ipFamily] = append(bgpExpected[ipFamily], cidr)
}
}
}

for _, pod := range pods {
if isPodAlive(pod) && !pod.Spec.HostNetwork && pod.Annotations[util.BgpAnnotation] == "true" && pod.Status.PodIP != "" {
bgpExpected = append(bgpExpected, fmt.Sprintf("%s/32", pod.Status.PodIP))
podIps := pod.Status.PodIPs
for _, podIp := range podIps {
ipFamily := util.CheckProtocol(podIp.IP)
bgpExpected[ipFamily] = append(bgpExpected[ipFamily], fmt.Sprintf("%s/%d", podIp.IP, maskMap[ipFamily]))
}
}
}

klog.V(5).Infof("expected routes %v", bgpExpected)
klog.V(5).Infof("expected ipv4 routes %v,ipv6 route %v", bgpExpected[kubeovnv1.ProtocolIPv4], bgpExpected[kubeovnv1.ProtocolIPv6])
listPathRequest := &bgpapi.ListPathRequest{
TableType: bgpapi.TableType_GLOBAL,
Family: &bgpapi.Family{Afi: bgpapi.Family_AFI_IP, Safi: bgpapi.Family_SAFI_UNICAST},
Expand All @@ -91,31 +108,66 @@ func (c *Controller) syncSubnetRoutes() {
attrInterfaces, _ := bgpapiutil.UnmarshalPathAttributes(path.Pattrs)
nextHop := getNextHopFromPathAttributes(attrInterfaces)
klog.V(5).Infof("nexthop is %s, routerID is %s", nextHop.String(), c.config.RouterId)
ipFamily := util.CheckProtocol(nextHop.String())
route, _ := netlink.RouteGet(nextHop)
if len(route) == 1 && route[0].Type == unix.RTN_LOCAL || nextHop.String() == c.config.RouterId {
bgpExists = append(bgpExists, d.Prefix)
bgpExists[ipFamily] = append(bgpExists[ipFamily], d.Prefix)
return
}
}
}
if err := c.config.BgpServer.ListPath(context.Background(), listPathRequest, fn); err != nil {
klog.Errorf("failed to list exist route, %v", err)
return
}

klog.V(5).Infof("exists routes %v", bgpExists)
toAdd, toDel := routeDiff(bgpExpected, bgpExists)
klog.V(5).Infof("toAdd routes %v", toAdd)
for _, route := range toAdd {
if err := c.addRoute(route); err != nil {
klog.Error(err)
if c.config.NeighborAddress != "" {
if err := c.config.BgpServer.ListPath(context.Background(), listPathRequest, fn); err != nil {
klog.Errorf("failed to list exist route, %v", err)
return
}

klog.V(5).Infof("exists ipv4 routes %v", bgpExists[kubeovnv1.ProtocolIPv4])
toAdd, toDel := routeDiff(bgpExpected[kubeovnv1.ProtocolIPv4], bgpExists[kubeovnv1.ProtocolIPv4])
klog.V(5).Infof("toAdd ipv4 routes %v", toAdd)
for _, route := range toAdd {
if err := c.addRoute(route); err != nil {
klog.Error(err)
}
}

klog.V(5).Infof("toDel ipv4 routes %v", toDel)
for _, route := range toDel {
if err := c.delRoute(route); err != nil {
klog.Error(err)
}
}
}
klog.V(5).Infof("toDel routes %v", toDel)
for _, route := range toDel {
if err := c.delRoute(route); err != nil {
klog.Error(err)

if c.config.NeighborIPv6Address != "" {

listIPv6PathRequest := &bgpapi.ListPathRequest{
TableType: bgpapi.TableType_GLOBAL,
Family: &bgpapi.Family{Afi: bgpapi.Family_AFI_IP6, Safi: bgpapi.Family_SAFI_UNICAST},
}

if err := c.config.BgpServer.ListPath(context.Background(), listIPv6PathRequest, fn); err != nil {
klog.Errorf("failed to list exist route, %v", err)
return
}

klog.V(5).Infof("exists ipv6 routes %v", bgpExists[kubeovnv1.ProtocolIPv6])
toAdd, toDel := routeDiff(bgpExpected[kubeovnv1.ProtocolIPv6], bgpExists[kubeovnv1.ProtocolIPv6])
klog.V(5).Infof("toAdd ipv6 routes %v", toAdd)

for _, route := range toAdd {
if err := c.addRoute(route); err != nil {
klog.Error(err)
}
}
klog.V(5).Infof("toDel ipv6 routes %v", toDel)
for _, route := range toDel {
if err := c.delRoute(route); err != nil {
klog.Error(err)
}
}

}
}

Expand Down Expand Up @@ -158,13 +210,18 @@ func parseRoute(route string) (string, uint32, error) {
}

func (c *Controller) addRoute(route string) error {
routeAfi := bgpapi.Family_AFI_IP
if util.CheckProtocol(route) == kubeovnv1.ProtocolIPv6 {
routeAfi = bgpapi.Family_AFI_IP6
}

nlri, attrs, err := c.getNlriAndAttrs(route)
if err != nil {
return err
}
_, err = c.config.BgpServer.AddPath(context.Background(), &bgpapi.AddPathRequest{
Path: &bgpapi.Path{
Family: &bgpapi.Family{Afi: bgpapi.Family_AFI_IP, Safi: bgpapi.Family_SAFI_UNICAST},
Family: &bgpapi.Family{Afi: routeAfi, Safi: bgpapi.Family_SAFI_UNICAST},
Nlri: nlri,
Pattrs: attrs,
},
Expand All @@ -177,6 +234,11 @@ func (c *Controller) addRoute(route string) error {
}

func (c *Controller) getNlriAndAttrs(route string) (*anypb.Any, []*anypb.Any, error) {
neighborAddr := c.config.NeighborAddress
if util.CheckProtocol(route) == kubeovnv1.ProtocolIPv6 {
neighborAddr = c.config.NeighborIPv6Address
}

prefix, prefixLen, err := parseRoute(route)
if err != nil {
return nil, nil, err
Expand All @@ -189,20 +251,25 @@ func (c *Controller) getNlriAndAttrs(route string) (*anypb.Any, []*anypb.Any, er
Origin: 0,
})
a2, _ := anypb.New(&bgpapi.NextHopAttribute{
NextHop: getNextHopAttribute(c.config.NeighborAddress, c.config.RouterId),
NextHop: getNextHopAttribute(neighborAddr, c.config.RouterId),
})
attrs := []*anypb.Any{a1, a2}
return nlri, attrs, err
}

func (c *Controller) delRoute(route string) error {
routeAfi := bgpapi.Family_AFI_IP
if util.CheckProtocol(route) == kubeovnv1.ProtocolIPv6 {
routeAfi = bgpapi.Family_AFI_IP6
}

nlri, attrs, err := c.getNlriAndAttrs(route)
if err != nil {
return err
}
err = c.config.BgpServer.DeletePath(context.Background(), &bgpapi.DeletePathRequest{
Path: &bgpapi.Path{
Family: &bgpapi.Family{Afi: bgpapi.Family_AFI_IP, Safi: bgpapi.Family_SAFI_UNICAST},
Family: &bgpapi.Family{Afi: routeAfi, Safi: bgpapi.Family_SAFI_UNICAST},
Nlri: nlri,
Pattrs: attrs,
},
Expand Down