Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
ceclinux committed Oct 24, 2021
1 parent 38222ef commit c0167c6
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 66 deletions.
9 changes: 8 additions & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,14 @@ func run(o *Options) error {
}

if features.DefaultFeatureGate.Enabled(features.Multicast) {
mcastController := multicast.NewMulticastController(ofClient, nodeConfig, ifaceStore, o.config.HostGateway, o.config.TransportInterface)
err := multicast.SetOvsMulticast(o.config.OVSBridge)
if err != nil {
return err
}
mcastController, err := multicast.NewMulticastController(ofClient, nodeConfig, ifaceStore, o.config.HostGateway, o.config.TransportInterface)
if err != nil {
return fmt.Errorf("error creating multicast controller: %v", err)
}
go mcastController.Run(stopCh)
}

Expand Down
32 changes: 26 additions & 6 deletions pkg/agent/controller/multicast/mcast_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ package multicast

import (
"context"
"fmt"
"net"
"os/exec"

"sync"
"time"

Expand Down Expand Up @@ -224,15 +227,32 @@ func (m *MulticastController) clearStaleGroups() {
}
}

func NewMulticastController(ofClient openflow.Client, nodeConfig *config.NodeConfig, ifaceStore interfacestore.InterfaceStore, gateway string, transportInterface string) *MulticastController {
func SetOvsMulticast(ovsBridge string) error {
setSnoopingEnable := fmt.Sprintf("ovs-vsctl set Bridge %s %s", ovsBridge, "mcast_snooping_enable=true")
cmd := exec.Command("/bin/sh", "-c", setSnoopingEnable)
err := cmd.Run()
if err != nil {
return fmt.Errorf("error running %s: %v", setSnoopingEnable, err)
}
mcastSnoopingDisableFloodUnregistered := fmt.Sprintf("ovs-vsctl set Bridge %s %s", ovsBridge, "other_config:mcast-snooping-disable-flood-unregistered=true")
cmd = exec.Command("/bin/sh", "-c", mcastSnoopingDisableFloodUnregistered)
err = cmd.Run()
if err != nil {
return fmt.Errorf("error running %s: %v", mcastSnoopingDisableFloodUnregistered, err)
}
return nil
}

func NewMulticastController(ofClient openflow.Client, nodeConfig *config.NodeConfig, ifaceStore interfacestore.InterfaceStore, gateway string, transportInterface string) (*MulticastController, error) {
multicastRouteClient, err := NewRouteClient(nodeConfig, gateway, transportInterface)
if err != nil {
return nil, fmt.Errorf("failed to initialize multicast route client %+v", err)
}
eventCh := make(chan mcastGroupEvent)
groupDetector := newDiscovery(ofClient, ifaceStore, eventCh)
groupCache := cache.NewIndexer(getGroupEventKey, nil)
ctx, cancelFunc := context.WithCancel(context.Background())
multicastRouteClient, err := NewRouteClient(nodeConfig, gateway, transportInterface)
if err != nil {
klog.Errorf("Failed to initialize multicast route client %+v", err)
}

return &MulticastController{
ofClient: ofClient,
nodeConfig: nodeConfig,
Expand All @@ -242,7 +262,7 @@ func NewMulticastController(ofClient openflow.Client, nodeConfig *config.NodeCon
ctx: ctx,
cancelFunc: cancelFunc,
routeClient: multicastRouteClient,
}
}, nil
}

func getGroupEventKey(obj interface{}) (string, error) {
Expand Down
28 changes: 13 additions & 15 deletions pkg/agent/controller/multicast/multicast_route.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ const (
)

func NewRouteClient(transportNode *config.NodeConfig, gateway string, transportInterface string) (*RouteClient, error) {
klog.Infof(transportInterface)
fd, err := createMSocket()
fd, err := createMulticastSocket()
if err != nil {
klog.Infof(err.Error())
return nil, err
Expand All @@ -39,9 +38,6 @@ func NewRouteClient(transportNode *config.NodeConfig, gateway string, transportI
if err != nil {
return nil, fmt.Errorf("cannot create multicast udp socket")
}
vifMap := make(map[string]uint8)
vifMap[gateway] = 0
vifMap[transportInterface] = 1
vifAllocator := newVifAllocator(fd)
c := &RouteClient{
sockFD: fd,
Expand All @@ -52,8 +48,14 @@ func NewRouteClient(transportNode *config.NodeConfig, gateway string, transportI
transportInterface: transportInterface,
routeCache: cache.NewIndexer(getMulticastEntryKey, cache.Indexers{GroupNameIndexName: groupIndexFunc}),
}
vifAllocator.getAndSetVif(gateway)
vifAllocator.getAndSetVif(transportInterface)
_, err = vifAllocator.getAndSetVif(gateway)
if err != nil {
return nil, fmt.Errorf("failed to allocate vif for gateway")
}
_, err = vifAllocator.getAndSetVif(transportInterface)
if err != nil {
return nil, fmt.Errorf("failed to allocate vif for transportInterface")
}
return c, nil
}

Expand All @@ -67,7 +69,7 @@ type RouteClient struct {
routeCache cache.Indexer
}

func createMSocket() (int, error) {
func createMulticastSocket() (int, error) {
fd, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_RAW, syscall.IPPROTO_IGMP)
if err != nil {
return -1, fmt.Errorf("cannot create multicast socket")
Expand Down Expand Up @@ -266,19 +268,15 @@ func (c *RouteClient) tryDeleteVif(name string) (err error) {
return
}
}
vif, err := c.vifAllocator.getAndSetVif(name)
if err != nil {
return err
}
return c.vifAllocator.delVif(uint16(vif))
return c.vifAllocator.release(name)
}

func (c *RouteClient) AddMrouteEntryByGroup(src net.IP, group net.IP) (err error) {
return c.AddMrouteEntry(c.gateway, src, group, []string{c.transportInterface})
}

func (c *RouteClient) AddMrouteEntry(iif string, src net.IP, group net.IP, oifs []string) (err error) {
klog.Infof("Adding mroute iif:%s, src:%s, group: %s, oifs: %v", iif, src, group, oifs)
klog.InfoS("Adding multicast route", "iif", iif, "src", src, "group", group, "oifs", oifs)
obj, ok, _ := c.routeCache.GetByKey(group.String() + "/" + src.String() + "/" + iif)
var mEntry *multicastRouteEntry
if ok {
Expand Down Expand Up @@ -331,7 +329,7 @@ func (c *RouteClient) AddMrouteEntry(iif string, src net.IP, group net.IP, oifs
}

func (c *RouteClient) DeleteGroup(group net.IP) (err error) {
klog.Infof("deleting group %s", group.String())
klog.V(2).Infof("Deleting group %s", group.String())

mEntries, err := c.routeCache.ByIndex(GroupNameIndexName, group.String())
if err != nil {
Expand Down
92 changes: 48 additions & 44 deletions pkg/agent/controller/multicast/vif_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package multicast

import (
"container/list"
"fmt"
"net"
"sync"
Expand All @@ -26,93 +27,96 @@ import (

type vifAllocator struct {
sync.Mutex
vifMap map[string]uint8
vifPool [MAXVIFS]bool
startIndex uint8
sockFD int
counter int
vifMap map[string]uint8
nextID uint8
maxID uint8
sockFD int
availableIDs *list.List
}

func newVifAllocator(sockFD int) *vifAllocator {
return &vifAllocator{
vifMap: make(map[string]uint8),
startIndex: 0,
sockFD: sockFD,
counter: 0,
availableIDs: list.New(),
vifMap: make(map[string]uint8),
nextID: 0,
maxID: 255,
sockFD: sockFD,
}
}

func (v *vifAllocator) allocateVif(name string) (uint8, error) {
if v.counter == MAXVIFS {
return 0, fmt.Errorf("cannot allocate vif because all %d vifs are allocated", MAXVIFS)
}
for v.vifPool[v.startIndex] {
v.startIndex = (v.startIndex + 1) % MAXVIFS
}
v.vifPool[v.startIndex] = true
l, err := netlink.LinkList()
if err != nil {
return 0, err
} else {
for _, link := range l {
if link.Attrs().Flags > net.FlagMulticast {
if link.Attrs().Name == name {
err = v.addVif(uint16(v.startIndex), link.Attrs().Index)
vif, err := v.allocate(link.Attrs().Index)
v.vifMap[name] = vif
vc := &vifctl{}
vc.vifcVifi = uint16(vif)
vc.vifcRateLimit = 0
vc.vifcFlags = 0
vc.vifcFlags |= VIFF_USE_IFINDEX
vc.vifcLclIfindex = link.Attrs().Index
setsockoptVifctl(v.sockFD, syscall.IPPROTO_IP, MRT_ADD_VIF, vc)
if err != nil {
klog.Infof("err add vif: ", err.Error())
return 0, err
} else {
klog.Infof("success add vif: %d", v.startIndex)
return uint8(v.startIndex), nil
klog.Infof("Success add vif: %d", vif)
return vif, nil
}
}
}
}
}
return 0, fmt.Errorf("add vif for %s failed: cannot find this interface", name)
return 0, fmt.Errorf("error adding vif for %s failed: cannot find this interface", name)
}

func (v *vifAllocator) getAndSetVif(name string) (uint8, error) {
klog.V(2).Infof("Get and set vif %s", name)
v.Lock()
defer v.Unlock()
vif, exist := v.vifMap[name]
if !exist {
v.Lock()
defer v.Unlock()
newvif, err := v.allocateVif(name)

if err != nil {
return 0, err
}
v.startIndex = newvif + uint8(1)
v.vifMap[name] = newvif
v.counter += 1
return newvif, nil
} else {
return vif, nil
}
}

func (v *vifAllocator) addVif(vif uint16, ifIdx int) (err error) {
vc := &vifctl{}
vc.vifcVifi = vif
vc.vifcRateLimit = 0
vc.vifcFlags = 0
vc.vifcFlags |= VIFF_USE_IFINDEX
vc.vifcLclIfindex = ifIdx
return setsockoptVifctl(v.sockFD, syscall.IPPROTO_IP, MRT_ADD_VIF, vc)
func (v *vifAllocator) allocate(ifIdx int) (uint8, error) {
klog.V(2).Infof("Allocating vif for ifIdx %d", ifIdx)
front := v.availableIDs.Front()
if front != nil {
return v.availableIDs.Remove(front).(uint8), nil
}
if v.nextID <= v.maxID {
allocated := v.nextID
v.nextID += 1
return allocated, nil
}
return 0, fmt.Errorf("no ID available")

}

func (v *vifAllocator) delVif(vif uint16) (err error) {
klog.Infof("deleting vif %d", vif)
func (v *vifAllocator) release(name string) (err error) {
klog.V(2).Infof("Release vif for name %s", name)
v.Lock()
defer v.Unlock()
v.vifPool[vif] = false
for k, vi := range v.vifMap {
if uint16(vi) == vif {
delete(v.vifMap, k)
}

vif, has := v.vifMap[name]
if !has {
return fmt.Errorf("vif for %s does not exist", name)
}
v.counter -= 1
delete(v.vifMap, name)
v.availableIDs.PushBack(vif)
vc := &vifctl{}
vc.vifcVifi = vif
vc.vifcVifi = uint16(vif)
return setsockoptVifctl(v.sockFD, syscall.IPPROTO_IP, MRT_DEL_VIF, vc)
}

0 comments on commit c0167c6

Please sign in to comment.