Skip to content

Commit

Permalink
Add Multicast route
Browse files Browse the repository at this point in the history
  • Loading branch information
ceclinux committed Nov 30, 2021
1 parent d23945f commit 6987f32
Show file tree
Hide file tree
Showing 10 changed files with 825 additions and 6 deletions.
5 changes: 4 additions & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,10 @@ func run(o *Options) error {
}

if features.DefaultFeatureGate.Enabled(features.Multicast) {
mcastController := multicast.NewMulticastController(ofClient, nodeConfig, ifaceStore)
mcastController, err := multicast.NewMulticastController(ofClient, nodeConfig, ifaceStore, o.config.TransportInterface)
if err != nil {
return fmt.Errorf("error creating multicast controller: %s", err.Error())
}
go mcastController.Run(stopCh)
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"antrea.io/antrea/pkg/agent/config"
"antrea.io/antrea/pkg/agent/controller/noderoute"
"antrea.io/antrea/pkg/agent/interfacestore"
"antrea.io/antrea/pkg/agent/multicast"
"antrea.io/antrea/pkg/agent/openflow"
"antrea.io/antrea/pkg/agent/openflow/cookie"
"antrea.io/antrea/pkg/agent/route"
Expand Down Expand Up @@ -185,6 +186,13 @@ func (i *Initializer) setupOVSBridge() error {
return err
}

if features.DefaultFeatureGate.Enabled(features.Multicast) {
err := multicast.SetOVSMulticast(i.ovsBridge)
if err != nil {
return err
}
}

return nil
}

Expand Down
11 changes: 11 additions & 0 deletions pkg/agent/interfacestore/interface_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,17 @@ func (c *interfaceCache) GetInterfacesByType(interfaceType InterfaceType) []*Int
return interfaces
}

func (c *interfaceCache) GetAllInterfaces() []*InterfaceConfig {
c.RLock()
defer c.RUnlock()
objs := c.cache.List()
interfaces := make([]*InterfaceConfig, len(objs))
for i := range objs {
interfaces[i] = objs[i].(*InterfaceConfig)
}
return interfaces
}

func (c *interfaceCache) Len() int {
c.RLock()
defer c.RUnlock()
Expand Down
14 changes: 14 additions & 0 deletions pkg/agent/interfacestore/testing/mock_interfacestore.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/agent/interfacestore/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ type InterfaceStore interface {
GetInterfaceByOFPort(ofport uint32) (*InterfaceConfig, bool)
GetContainerInterfaceNum() int
GetInterfacesByType(interfaceType InterfaceType) []*InterfaceConfig
GetAllInterfaces() []*InterfaceConfig
Len() int
GetInterfaceKeysByType(interfaceType InterfaceType) []string
}
Expand Down
96 changes: 96 additions & 0 deletions pkg/agent/multicast/kernel_source.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright 2021 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package multicast

import (
"fmt"
"strconv"
"strings"
"syscall"
)

// The following constants and structs come from the linux kernel file
// linux/include/uapi/linux/mroute.h
const (
IGMPMSG_NOCACHE = 1
VIFF_USE_IFINDEX = 8
MRT_ADD_VIF = 202
MRT_DEL_VIF = 203
MRT_ADD_MFC = 204
MRT_DEL_MFC = 205
MRT_BASE = 200
MRT_TABLE = 209
MRT_FLUSH = 212
MAXVIFS = 32
)

type vifctl struct {
vifcVifi uint16
vifcFlags uint8
vifcThreshold uint8
vifcRateLimit uint32
vifcLclIfindex int
vifcRmtAddr [4]byte
}

type mfcctl struct {
mfccOrigin [4]byte
mfccMcastgrp [4]byte
mfccParent uint16
mfccTtls [32]byte
mfccPktCnt uint32
mfccByteCnt uint32
mfccWrongIf uint32
mfccExpire int
}

// We should consider the changes in vifctl struct after kernel versiopn 5.9
func parseKernelVersion() (*kernelVersion, error) {
var uname syscall.Utsname
if err := syscall.Uname(&uname); err != nil {
return nil, fmt.Errorf("Running uname error: %v", err)
}
kernel_version_full := arrayToString(uname.Release)
kernel_version_numbers := strings.Split(kernel_version_full, "-")
kernel_version_array := strings.Split(kernel_version_numbers[0], ".")
major, err := strconv.Atoi(kernel_version_array[0])
minor, err := strconv.Atoi(kernel_version_array[1])
patch, err := strconv.Atoi(kernel_version_array[2])
if err != nil {
return nil, fmt.Errorf("failed to parse kernel version %s", kernel_version_full)
}
return &kernelVersion{
major: major,
minor: minor,
patch: patch,
}, nil
}

func arrayToString(x [65]int8) string {
var buf [65]byte
for i, b := range x {
buf[i] = byte(b)
}
str := string(buf[:])
if i := strings.Index(str, "\x00"); i != -1 {
str = str[:i]
}
return str
}

type kernelVersion struct {
major int
minor int
patch int
}
57 changes: 52 additions & 5 deletions pkg/agent/multicast/mcast_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package multicast

import (
"context"
"fmt"
"net"
"os/exec"
"sync"
"time"

Expand Down Expand Up @@ -66,6 +68,7 @@ type Controller struct {
groupCache cache.Indexer
ctx context.Context
cancelFunc context.CancelFunc
mRouteClient *MRouteClient
}

func (m *Controller) Run(stopCh <-chan struct{}) {
Expand All @@ -87,6 +90,7 @@ func (m *Controller) Run(stopCh <-chan struct{}) {
go m.processGroupEvent(stopCh)
}

go m.mRouteClient.runRouteClient(stopCh)
}

func (m *Controller) processGroupEvent(stopCh <-chan struct{}) {
Expand Down Expand Up @@ -122,12 +126,15 @@ func (m *Controller) addGroupMemberStatus(e mcastGroupEvent) error {
if err := m.ofClient.InstallMulticastFlow(e.group); err != nil {
return err
}
err := m.mRouteClient.externalInterfacesJoinMgroup(e.group)
if err != nil {
return err
}
status := &GroupMemberStatus{
group: e.group,
lastProbe: e.time,
localMembers: map[string]bool{e.iface.InterfaceName: true},
}
// TODO: add multicast routing entries for the new added Multicast group before add status into groupCache.
m.groupCache.Add(status)
klog.InfoS("Add new Multicast group to cache", "group", e.group, "interface", e.iface.InterfaceName)
return nil
Expand Down Expand Up @@ -188,8 +195,18 @@ func (m *Controller) checkLastMember(group net.IP) {
// Remove the Multicast flow entry if no local Pod has joined the group.
m.ofClient.UninstallMulticastFlow(group)
// TODO: Remove group from Multicast routing entries.
err := m.mRouteClient.deleteInboundMrouteEntryByGroup(group)
if err != nil {
klog.Errorf("Cannot delete multicat group %s: %s", group.String(), err.Error())
return
}
err = m.mRouteClient.externalInterfacesLeaveMgroup(group)
if err != nil {
klog.Errorf("Cannot drop multicat group for all the external and tranport interfaces %s: %s", group.String(), err.Error())
return
}
m.groupCache.Delete(status)
klog.InfoS("Remove Multicast group from cache after all members left", "group", group.String())
klog.InfoS("Remove Multicast group from cache after the last member left", "group", group.String())
}
case <-m.ctx.Done():
// Complete the goroutine to avoid unnecessary leak when Agent is stopped.
Expand All @@ -210,14 +227,27 @@ func (m *Controller) clearStaleGroups() {
if diff > mcastGroupTimeout {
// Remove the Multicast flow entry if no membership report is received in the group.
m.ofClient.UninstallMulticastFlow(status.group)
// TODO: Remove group from Multicast routing entries.
err := m.mRouteClient.deleteInboundMrouteEntryByGroup(status.group)
if err != nil {
klog.Errorf("Cannot delete multicat group %s: %s", status.group.String(), err.Error())
return
}
err = m.mRouteClient.externalInterfacesLeaveMgroup(status.group)
if err != nil {
klog.Errorf("Cannot drop multicat group for all the external and tranport interfaces %s: %s", status.group.String(), err.Error())
return
}
m.groupCache.Delete(status)
klog.InfoS("Remove the stale Multicast group from the cache because no new membership report is received", "group", status.group.String())
}
}
}

func NewMulticastController(ofClient openflow.Client, nodeConfig *config.NodeConfig, ifaceStore interfacestore.InterfaceStore) *Controller {
func NewMulticastController(ofClient openflow.Client, nodeConfig *config.NodeConfig, ifaceStore interfacestore.InterfaceStore,transportInterface string) (*Controller, error) {
multicastRouteClient, err := NewRouteClient(nodeConfig, transportInterface, ifaceStore)
if err != nil {
return nil, fmt.Errorf("failed to initialize multicast route client %+v", err)
}
eventCh := make(chan mcastGroupEvent)
groupSnooper := newSnooper(ofClient, ifaceStore, eventCh)
groupCache := cache.NewIndexer(getGroupEventKey, cache.Indexers{
Expand All @@ -232,7 +262,8 @@ func NewMulticastController(ofClient openflow.Client, nodeConfig *config.NodeCon
groupCache: groupCache,
ctx: ctx,
cancelFunc: cancelFunc,
}
mRouteClient: multicastRouteClient,
}, nil
}

func podInterfaceIndexFunc(obj interface{}) ([]string, error) {
Expand All @@ -244,6 +275,22 @@ func podInterfaceIndexFunc(obj interface{}) ([]string, error) {
return podIFs, nil
}

func SetOVSMulticast(ovsBridge string) error {
setSnoopingEnable := fmt.Sprintf("ovs-vsctl set Bridge %s %s", ovsBridge, "mcast_snooping_enable=true")
cmd := exec.Command("/bin/sh", "-c", setSnoopingEnable)
err := cmd.Run()
if err != nil {
return fmt.Errorf("error running %s: %v", setSnoopingEnable, err)
}
mcastSnoopingDisableFloodUnregistered := fmt.Sprintf("ovs-vsctl set Bridge %s %s", ovsBridge, "other_config:mcast-snooping-disable-flood-unregistered=true")
cmd = exec.Command("/bin/sh", "-c", mcastSnoopingDisableFloodUnregistered)
err = cmd.Run()
if err != nil {
return fmt.Errorf("error running %s: %v", mcastSnoopingDisableFloodUnregistered, err)
}
return nil
}

func getGroupEventKey(obj interface{}) (string, error) {
groupState := obj.(*GroupMemberStatus)
return groupState.group.String(), nil
Expand Down
Loading

0 comments on commit 6987f32

Please sign in to comment.