diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index 31071a95535..39dfb205b64 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -3981,6 +3981,9 @@ data: # Deployments and StatefulSets via IP Pool annotation. # AntreaIPAM: false + # Enable multicast traffic. This feature is supported only with noEncap mode. + # Multicast: false + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. #ovsBridge: br-int @@ -4261,7 +4264,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-6dtt4hkmmm + name: antrea-config-c5h558tg5g namespace: kube-system --- apiVersion: v1 @@ -4332,7 +4335,7 @@ spec: fieldRef: fieldPath: spec.serviceAccountName - name: ANTREA_CONFIG_MAP_NAME - value: antrea-config-6dtt4hkmmm + value: antrea-config-c5h558tg5g image: projects.registry.vmware.com/antrea/antrea-ubuntu:latest imagePullPolicy: IfNotPresent livenessProbe: @@ -4383,7 +4386,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-6dtt4hkmmm + name: antrea-config-c5h558tg5g name: antrea-config - name: antrea-controller-tls secret: @@ -4664,7 +4667,7 @@ spec: operator: Exists volumes: - configMap: - name: antrea-config-6dtt4hkmmm + name: antrea-config-c5h558tg5g name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index cf186bb8f59..1eb85374d04 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -3981,6 +3981,9 @@ data: # Deployments and StatefulSets via IP Pool annotation. # AntreaIPAM: false + # Enable multicast traffic. This feature is supported only with noEncap mode. + # Multicast: false + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. #ovsBridge: br-int @@ -4261,7 +4264,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-6dtt4hkmmm + name: antrea-config-c5h558tg5g namespace: kube-system --- apiVersion: v1 @@ -4332,7 +4335,7 @@ spec: fieldRef: fieldPath: spec.serviceAccountName - name: ANTREA_CONFIG_MAP_NAME - value: antrea-config-6dtt4hkmmm + value: antrea-config-c5h558tg5g image: projects.registry.vmware.com/antrea/antrea-ubuntu:latest imagePullPolicy: IfNotPresent livenessProbe: @@ -4383,7 +4386,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-6dtt4hkmmm + name: antrea-config-c5h558tg5g name: antrea-config - name: antrea-controller-tls secret: @@ -4666,7 +4669,7 @@ spec: operator: Exists volumes: - configMap: - name: antrea-config-6dtt4hkmmm + name: antrea-config-c5h558tg5g name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index d64995c56ed..76ce7994191 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -3981,6 +3981,9 @@ data: # Deployments and StatefulSets via IP Pool annotation. # AntreaIPAM: false + # Enable multicast traffic. This feature is supported only with noEncap mode. + # Multicast: false + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. 
#ovsBridge: br-int @@ -4261,7 +4264,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-25kc566684 + name: antrea-config-h72246h8f2 namespace: kube-system --- apiVersion: v1 @@ -4332,7 +4335,7 @@ spec: fieldRef: fieldPath: spec.serviceAccountName - name: ANTREA_CONFIG_MAP_NAME - value: antrea-config-25kc566684 + value: antrea-config-h72246h8f2 image: projects.registry.vmware.com/antrea/antrea-ubuntu:latest imagePullPolicy: IfNotPresent livenessProbe: @@ -4383,7 +4386,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-25kc566684 + name: antrea-config-h72246h8f2 name: antrea-config - name: antrea-controller-tls secret: @@ -4667,7 +4670,7 @@ spec: path: /home/kubernetes/bin name: host-cni-bin - configMap: - name: antrea-config-25kc566684 + name: antrea-config-h72246h8f2 name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index 8759ec44cf9..6ba506b0c1f 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -3981,6 +3981,9 @@ data: # Deployments and StatefulSets via IP Pool annotation. # AntreaIPAM: false + # Enable multicast traffic. This feature is supported only with noEncap mode. + # Multicast: false + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. #ovsBridge: br-int @@ -4266,7 +4269,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-tc6kb6tbkh + name: antrea-config-557h844gdh namespace: kube-system --- apiVersion: v1 @@ -4346,7 +4349,7 @@ spec: fieldRef: fieldPath: spec.serviceAccountName - name: ANTREA_CONFIG_MAP_NAME - value: antrea-config-tc6kb6tbkh + value: antrea-config-557h844gdh image: projects.registry.vmware.com/antrea/antrea-ubuntu:latest imagePullPolicy: IfNotPresent livenessProbe: @@ -4397,7 +4400,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-tc6kb6tbkh + name: antrea-config-557h844gdh name: antrea-config - name: antrea-controller-tls secret: @@ -4713,7 +4716,7 @@ spec: operator: Exists volumes: - configMap: - name: antrea-config-tc6kb6tbkh + name: antrea-config-557h844gdh name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/antrea-kind.yml b/build/yamls/antrea-kind.yml index 728ce57bd55..58f4484fb7f 100644 --- a/build/yamls/antrea-kind.yml +++ b/build/yamls/antrea-kind.yml @@ -3981,6 +3981,9 @@ data: # Deployments and StatefulSets via IP Pool annotation. # AntreaIPAM: false + # Enable multicast traffic. This feature is supported only with noEncap mode. + # Multicast: false + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. 
#ovsBridge: br-int @@ -4266,7 +4269,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-d4hfbm5d8h + name: antrea-config-f7dbh45mc5 namespace: kube-system --- apiVersion: v1 @@ -4337,7 +4340,7 @@ spec: fieldRef: fieldPath: spec.serviceAccountName - name: ANTREA_CONFIG_MAP_NAME - value: antrea-config-d4hfbm5d8h + value: antrea-config-f7dbh45mc5 image: projects.registry.vmware.com/antrea/antrea-ubuntu:latest imagePullPolicy: IfNotPresent livenessProbe: @@ -4388,7 +4391,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-d4hfbm5d8h + name: antrea-config-f7dbh45mc5 name: antrea-config - name: antrea-controller-tls secret: @@ -4665,7 +4668,7 @@ spec: type: CharDevice name: dev-tun - configMap: - name: antrea-config-d4hfbm5d8h + name: antrea-config-f7dbh45mc5 name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index f6b6aa44209..803703c3dc2 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -3981,6 +3981,9 @@ data: # Deployments and StatefulSets via IP Pool annotation. # AntreaIPAM: false + # Enable multicast traffic. This feature is supported only with noEncap mode. + # Multicast: false + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. #ovsBridge: br-int @@ -4266,7 +4269,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-786ttm24c7 + name: antrea-config-gg2666bbdt namespace: kube-system --- apiVersion: v1 @@ -4337,7 +4340,7 @@ spec: fieldRef: fieldPath: spec.serviceAccountName - name: ANTREA_CONFIG_MAP_NAME - value: antrea-config-786ttm24c7 + value: antrea-config-gg2666bbdt image: projects.registry.vmware.com/antrea/antrea-ubuntu:latest imagePullPolicy: IfNotPresent livenessProbe: @@ -4388,7 +4391,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-786ttm24c7 + name: antrea-config-gg2666bbdt name: antrea-config - name: antrea-controller-tls secret: @@ -4669,7 +4672,7 @@ spec: operator: Exists volumes: - configMap: - name: antrea-config-786ttm24c7 + name: antrea-config-gg2666bbdt name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/base/conf/antrea-agent.conf b/build/yamls/base/conf/antrea-agent.conf index 658a91472c6..2f37a19dd23 100644 --- a/build/yamls/base/conf/antrea-agent.conf +++ b/build/yamls/base/conf/antrea-agent.conf @@ -35,6 +35,9 @@ featureGates: # Deployments and StatefulSets via IP Pool annotation. # AntreaIPAM: false +# Enable multicast traffic. This feature is supported only with noEncap mode. +# Multicast: false + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. 
#ovsBridge: br-int diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index d775d05a4c0..e20b2813303 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -36,6 +36,7 @@ import ( "antrea.io/antrea/pkg/agent/flowexporter/exporter" "antrea.io/antrea/pkg/agent/interfacestore" "antrea.io/antrea/pkg/agent/metrics" + "antrea.io/antrea/pkg/agent/multicast" npl "antrea.io/antrea/pkg/agent/nodeportlocal" "antrea.io/antrea/pkg/agent/openflow" "antrea.io/antrea/pkg/agent/proxy" @@ -115,7 +116,8 @@ func run(o *Options) error { features.DefaultFeatureGate.Enabled(features.Egress), features.DefaultFeatureGate.Enabled(features.FlowExporter), o.config.AntreaProxy.ProxyAll, - connectUplinkToBridge) + connectUplinkToBridge, + features.DefaultFeatureGate.Enabled(features.Multicast)) _, serviceCIDRNet, _ := net.ParseCIDR(o.config.ServiceCIDR) var serviceCIDRNetv6 *net.IPNet @@ -455,6 +457,14 @@ func run(o *Options) error { } } + if features.DefaultFeatureGate.Enabled(features.Multicast) { + mcastController := multicast.NewMulticastController(ofClient, nodeConfig, ifaceStore) + if err := mcastController.Initialize(); err != nil { + return err + } + go mcastController.Run(stopCh) + } + agentQuerier := querier.NewAgentQuerier( nodeConfig, networkConfig, diff --git a/pkg/agent/interfacestore/interface_cache.go b/pkg/agent/interfacestore/interface_cache.go index 51deb3d9238..0f91949fce3 100644 --- a/pkg/agent/interfacestore/interface_cache.go +++ b/pkg/agent/interfacestore/interface_cache.go @@ -15,6 +15,7 @@ package interfacestore import ( + "fmt" "sync" "k8s.io/client-go/tools/cache" @@ -40,6 +41,8 @@ const ( // interfaceIPIndex is the index built with InterfaceConfig.IP // Only the interfaces with IP get indexed. interfaceIPIndex = "ip" + // ofPortIndex is the index built with InterfaceConfig.OFPort + ofPortIndex = "ofPort" ) // Local cache for interfaces created on node, including container, host gateway, and tunnel @@ -221,6 +224,18 @@ func (c *interfaceCache) GetNodeTunnelInterface(nodeName string) (*InterfaceConf return obj.(*InterfaceConfig), true } +// GetInterfaceByOFPort retrieves InterfaceConfig by the given ofPort number. +func (c *interfaceCache) GetInterfaceByOFPort(ofPort uint32) (*InterfaceConfig, bool) { + c.RLock() + defer c.RUnlock() + ofportStr := fmt.Sprintf("%d", ofPort) + interfaceConfigs, _ := c.cache.ByIndex(ofPortIndex, ofportStr) + if len(interfaceConfigs) == 0 { + return nil, false + } + return interfaceConfigs[0].(*InterfaceConfig), true +} + func interfaceNameIndexFunc(obj interface{}) ([]string, error) { interfaceConfig := obj.(*InterfaceConfig) return []string{interfaceConfig.InterfaceName}, nil @@ -260,6 +275,15 @@ func interfaceIPIndexFunc(obj interface{}) ([]string, error) { return intfIPs, nil } +func interfaceOFPortIndexFunc(obj interface{}) ([]string, error) { + interfaceConfig := obj.(*InterfaceConfig) + if interfaceConfig.OFPort < 0 { + // If interfaceConfig OFport is not valid, we return empty key. 
+ return []string{}, nil + } + return []string{fmt.Sprintf("%d", interfaceConfig.OFPort)}, nil +} + func NewInterfaceStore() InterfaceStore { return &interfaceCache{ cache: cache.NewIndexer(getInterfaceKey, cache.Indexers{ @@ -268,6 +292,7 @@ func NewInterfaceStore() InterfaceStore { containerIDIndex: containerIDIndexFunc, podIndex: podIndexFunc, interfaceIPIndex: interfaceIPIndexFunc, + ofPortIndex: interfaceOFPortIndexFunc, }), } } diff --git a/pkg/agent/interfacestore/testing/mock_interfacestore.go b/pkg/agent/interfacestore/testing/mock_interfacestore.go index 0d480cc3443..3e8aa50fbc5 100644 --- a/pkg/agent/interfacestore/testing/mock_interfacestore.go +++ b/pkg/agent/interfacestore/testing/mock_interfacestore.go @@ -160,6 +160,21 @@ func (mr *MockInterfaceStoreMockRecorder) GetInterfaceByName(arg0 interface{}) * return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetInterfaceByName", reflect.TypeOf((*MockInterfaceStore)(nil).GetInterfaceByName), arg0) } +// GetInterfaceByOFPort mocks base method +func (m *MockInterfaceStore) GetInterfaceByOFPort(arg0 uint32) (*interfacestore.InterfaceConfig, bool) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetInterfaceByOFPort", arg0) + ret0, _ := ret[0].(*interfacestore.InterfaceConfig) + ret1, _ := ret[1].(bool) + return ret0, ret1 +} + +// GetInterfaceByOFPort indicates an expected call of GetInterfaceByOFPort +func (mr *MockInterfaceStoreMockRecorder) GetInterfaceByOFPort(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetInterfaceByOFPort", reflect.TypeOf((*MockInterfaceStore)(nil).GetInterfaceByOFPort), arg0) +} + // GetInterfaceKeysByType mocks base method func (m *MockInterfaceStore) GetInterfaceKeysByType(arg0 interfacestore.InterfaceType) []string { m.ctrl.T.Helper() diff --git a/pkg/agent/interfacestore/types.go b/pkg/agent/interfacestore/types.go index 5bfdd8455d4..66b023dcabf 100644 --- a/pkg/agent/interfacestore/types.go +++ b/pkg/agent/interfacestore/types.go @@ -88,6 +88,7 @@ type InterfaceStore interface { GetContainerInterfacesByPod(podName string, podNamespace string) []*InterfaceConfig GetInterfaceByIP(interfaceIP string) (*InterfaceConfig, bool) GetNodeTunnelInterface(nodeName string) (*InterfaceConfig, bool) + GetInterfaceByOFPort(ofPort uint32) (*InterfaceConfig, bool) GetContainerInterfaceNum() int GetInterfacesByType(interfaceType InterfaceType) []*InterfaceConfig Len() int diff --git a/pkg/agent/multicast/mcast_controller.go b/pkg/agent/multicast/mcast_controller.go new file mode 100644 index 00000000000..441c878b4a6 --- /dev/null +++ b/pkg/agent/multicast/mcast_controller.go @@ -0,0 +1,366 @@ +// Copyright 2021 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
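The interfacestore changes above add an ofPort index backed by a client-go Indexer. A minimal, self-contained sketch of the same pattern, using a hypothetical toy type instead of Antrea's InterfaceConfig, to show what GetInterfaceByOFPort builds on:

package main

import (
	"fmt"
	"strconv"

	"k8s.io/client-go/tools/cache"
)

// toyIface stands in for interfacestore.InterfaceConfig (hypothetical, for illustration only).
type toyIface struct {
	Name   string
	OFPort int32
}

func main() {
	store := cache.NewIndexer(
		// Key function, analogous to getInterfaceKey.
		func(obj interface{}) (string, error) { return obj.(*toyIface).Name, nil },
		cache.Indexers{
			// Analogous to interfaceOFPortIndexFunc: interfaces with an invalid (negative)
			// OFPort are simply not indexed.
			"ofPort": func(obj interface{}) ([]string, error) {
				iface := obj.(*toyIface)
				if iface.OFPort < 0 {
					return []string{}, nil
				}
				return []string{strconv.Itoa(int(iface.OFPort))}, nil
			},
		})
	store.Add(&toyIface{Name: "pod-a-eth0", OFPort: 12})
	store.Add(&toyIface{Name: "antrea-gw0", OFPort: -1}) // skipped by the index function

	// ByIndex returns every object whose index function emitted the queried value, which is
	// how GetInterfaceByOFPort resolves a packet-in's in_port to a Pod interface.
	objs, _ := store.ByIndex("ofPort", "12")
	fmt.Println(len(objs), objs[0].(*toyIface).Name)
}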
+ +package multicast + +import ( + "net" + "sync" + "time" + + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + + "antrea.io/antrea/pkg/agent/config" + "antrea.io/antrea/pkg/agent/interfacestore" + "antrea.io/antrea/pkg/agent/openflow" +) + +type eventType uint8 + +const ( + groupJoin eventType = iota + groupLeave + + // Use IGMPv3 for the IGMP query message because it is the current major IGMP version. + // TODO: support sending IGMP query with both v2 and v3 messages + queryVersion = 3 + podInterfaceIndex = "podInterface" + + // How long to wait before retrying the processing of a multicast group change. + minRetryDelay = 5 * time.Second + maxRetryDelay = 300 * time.Second +) + +var ( + workerCount uint8 = 2 +) + +type mcastGroupEvent struct { + group net.IP + eType eventType + time time.Time + iface *interfacestore.InterfaceConfig +} + +type GroupMemberStatus struct { + group net.IP + // localMembers is the set of names of the local Pod interfaces that have joined the multicast group. + localMembers sets.String + lastProbe time.Time + mutex sync.RWMutex +} + +// eventHandler processes multicast group membership report or leave messages. +func (c *Controller) eventHandler(stopCh <-chan struct{}) { + for { + select { + case e := <-c.groupEventCh: + c.addOrUpdateGroupEvent(e) + case <-stopCh: + return + } + } +} + +// addGroupMemberStatus adds the new group into groupCache. +func (c *Controller) addGroupMemberStatus(e *mcastGroupEvent) { + status := &GroupMemberStatus{ + group: e.group, + lastProbe: e.time, + localMembers: sets.NewString(e.iface.InterfaceName), + } + c.groupCache.Add(status) + c.queue.Add(e.group.String()) + klog.InfoS("Added new multicast group to cache", "group", e.group, "interface", e.iface.InterfaceName) + return +} + +// updateGroupMemberStatus updates the group status in groupCache. If a "join" message is sent from an existing member, +// only the lastProbe time is updated. If a "join" message is sent from an "unknown" member, the lastProbe time is updated and +// the new member is added to the group's local member set. If a "leave" message is sent from an existing member, it is removed from the +// group's local member set, and if it was the last member in the local cache, a query message is sent out on the group +// to check whether any local members remain in the group.
+func (c *Controller) updateGroupMemberStatus(obj interface{}, e *mcastGroupEvent) { + status := obj.(*GroupMemberStatus) + newStatus := &GroupMemberStatus{ + group: status.group, + localMembers: status.localMembers, + lastProbe: status.lastProbe, + } + status.mutex.Lock() + defer status.mutex.Unlock() + exist := status.localMembers.Has(e.iface.InterfaceName) + switch e.eType { + case groupJoin: + newStatus.lastProbe = e.time + if !exist { + newStatus.localMembers.Insert(e.iface.InterfaceName) + klog.InfoS("Added interface to multicast group", "group", e.group.String(), "member", e.iface.InterfaceName) + } + c.groupCache.Update(newStatus) + c.queue.Add(status.group.String()) + case groupLeave: + if exist { + newStatus.localMembers.Delete(e.iface.InterfaceName) + c.groupCache.Update(newStatus) + klog.InfoS("Member left the Multicast group", "group", e.group.String(), "member", e.iface.InterfaceName) + if len(status.localMembers) == 0 { + klog.InfoS("Check last member in multicast group", "group", e.group.String(), "member", e.iface.InterfaceName) + c.checkLastMember(e.group) + } + } + } + return +} + +// checkLastMember sends out a query message on the group to check if there are still members in the group. If no new +// membership report is received in the max response time, the group is removed from groupCache. +func (c *Controller) checkLastMember(group net.IP) { + err := c.igmpSnooper.queryIGMP(group, queryVersion) + if err != nil { + klog.ErrorS(err, "Failed to send IGMP query message", "group", group.String()) + return + } + c.queue.AddAfter(group.String(), igmpMaxResponseTime) +} + +// clearStaleGroups removes the stale groups from groupCache. A stale group is the one that has not been updated for +// mcastGroupTimeout. +func (c *Controller) clearStaleGroups() { + now := time.Now() + for _, obj := range c.groupCache.List() { + status := obj.(*GroupMemberStatus) + status.mutex.RLock() + diff := now.Sub(status.lastProbe).Seconds() + status.mutex.RUnlock() + if diff > mcastGroupTimeout.Seconds() { + c.queue.Add(status.group.String()) + } + } +} + +type Controller struct { + ofClient openflow.Client + nodeConfig *config.NodeConfig + igmpSnooper *IGMPSnooper + groupEventCh chan *mcastGroupEvent + groupCache cache.Indexer + queue workqueue.RateLimitingInterface + // installedGroups saves the groups which are configured on both OVS and the host. 
+ installedGroups sets.String + installedGroupsMutex sync.RWMutex +} + +func NewMulticastController(ofClient openflow.Client, nodeConfig *config.NodeConfig, ifaceStore interfacestore.InterfaceStore) *Controller { + eventCh := make(chan *mcastGroupEvent, workerCount) + groupSnooper := newSnooper(ofClient, ifaceStore, eventCh) + groupCache := cache.NewIndexer(getGroupEventKey, cache.Indexers{ + podInterfaceIndex: podInterfaceIndexFunc, + }) + return &Controller{ + ofClient: ofClient, + nodeConfig: nodeConfig, + igmpSnooper: groupSnooper, + groupEventCh: eventCh, + groupCache: groupCache, + installedGroups: sets.NewString(), + queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "multicastgroup"), + } +} + +func (c *Controller) Initialize() error { + // Install flows on OVS for IGMP packets and multicast traffic forwarding: + // 1) send the IGMP report messages to Antrea Agent, + // 2) forward the IGMP query messages to all local Pods, + // 3) forward the multicast traffic to antrea-gw0 if no local Pods have joined the group, so that + // local Pods can still reach external multicast receivers. + err := c.ofClient.InstallMulticastInitialFlows(uint8(openflow.PacketInReasonMC)) + if err != nil { + klog.ErrorS(err, "Failed to install multicast initial flows") + return err + } + return nil +} + +func (c *Controller) Run(stopCh <-chan struct{}) { + // Periodically query Multicast Groups on OVS. + go wait.NonSlidingUntil(func() { + if err := c.igmpSnooper.queryIGMP(net.IPv4zero, queryVersion); err != nil { + klog.ErrorS(err, "Failed to send IGMP query") + } + }, queryInterval, stopCh) + + // Periodically check the group member status, and remove the groups in which no members exist + go wait.NonSlidingUntil(c.clearStaleGroups, queryInterval, stopCh) + + go c.eventHandler(stopCh) + + for i := 0; i < int(workerCount); i++ { + // Process multicast Group membership report or leave messages. + go wait.Until(c.worker, time.Second, stopCh) + } +} + +func (c *Controller) worker() { + for c.processNextWorkItem() { + } +} + +// getGroupMemberStatusByGroup returns the GroupMemberStatus according to the given group. +func (c *Controller) getGroupMemberStatusByGroup(group net.IP) *GroupMemberStatus { + status, ok, _ := c.groupCache.GetByKey(group.String()) + if ok { + return status.(*GroupMemberStatus) + } + return nil +} + +// getGroupMemberStatusesByPod returns all GroupMemberStatus entries whose localMembers include the given podInterface. +func (c *Controller) getGroupMemberStatusesByPod(podInterface string) []*GroupMemberStatus { + groupMembers := make([]*GroupMemberStatus, 0) + statuses, _ := c.groupCache.ByIndex(podInterfaceIndex, podInterface) + for _, s := range statuses { + groupMembers = append(groupMembers, s.(*GroupMemberStatus)) + } + return groupMembers +} + +func (c *Controller) processNextWorkItem() bool { + obj, quit := c.queue.Get() + if quit { + return false + } + defer c.queue.Done(obj) + + // We expect string (multicast group) to come off the workqueue. + if key, ok := obj.(string); !ok { + // As the item in the workqueue is actually invalid, we call Forget here else we'd + // go into a loop of attempting to process a work item that is invalid. + // This should not happen.
+ c.queue.Forget(obj) + klog.Errorf("Expected string in work queue but got %#v", obj) + return true + } else if err := c.syncGroup(key); err == nil { + // If no error occurs we Forget this item so it does not get queued again until + // another change happens. + c.queue.Forget(key) + } else { + // Put the item back on the workqueue to handle any transient errors. + c.queue.AddRateLimited(key) + klog.Errorf("Error syncing multicast group %s, requeuing. Error: %v", key, err) + } + return true + +} + +func (c *Controller) syncGroup(groupKey string) error { + startTime := time.Now() + defer func() { + klog.V(4).Infof("Finished syncing GroupMemberStatus for %s. (%v)", groupKey, time.Since(startTime)) + }() + obj, exists, err := c.groupCache.GetByKey(groupKey) + if err != nil { + klog.ErrorS(err, "Failed to get GroupMemberStatus", "group", groupKey) + return err + } + if !exists { + klog.InfoS("multicast group not found in the cache", "group", groupKey) + return nil + } + status := obj.(*GroupMemberStatus) + if c.groupHasInstalled(groupKey) { + status.mutex.Lock() + defer status.mutex.Unlock() + if c.groupIsStale(status) { + // Remove the multicast flow entry if no local Pod has joined the group. + if err := c.ofClient.UninstallMulticastFlow(status.group); err != nil { + klog.ErrorS(err, "Failed to uninstall multicast flows", "group", groupKey) + return err + } + c.delInstalledGroup(groupKey) + c.groupCache.Delete(status) + klog.InfoS("Removed multicast group from cache after all members left", "group", groupKey) + } + return nil + } + if err := c.ofClient.InstallMulticastFlow(status.group); err != nil { + klog.ErrorS(err, "Failed to install multicast flows", "group", status.group) + return err + } + c.addInstalledGroup(groupKey) + return nil +} + +// groupIsStale returns true if no local members in the group, or the time diff to the last probe time is over +// mcastGroupTimeout. 
+func (c *Controller) groupIsStale(status *GroupMemberStatus) bool { + membersCount := len(status.localMembers) + diff := time.Now().Sub(status.lastProbe).Seconds() + if membersCount == 0 || diff > mcastGroupTimeout.Seconds() { + return true + } + return false +} + +func (c *Controller) groupHasInstalled(groupKey string) bool { + c.installedGroupsMutex.RLock() + defer c.installedGroupsMutex.RUnlock() + return c.installedGroups.Has(groupKey) +} + +func (c *Controller) addInstalledGroup(groupKey string) { + c.installedGroupsMutex.Lock() + c.installedGroups.Insert(groupKey) + c.installedGroupsMutex.Unlock() +} + +func (c *Controller) delInstalledGroup(groupKey string) { + c.installedGroupsMutex.Lock() + c.installedGroups.Delete(groupKey) + c.installedGroupsMutex.Unlock() +} + +func (c *Controller) addOrUpdateGroupEvent(e *mcastGroupEvent) { + obj, ok, _ := c.groupCache.GetByKey(e.group.String()) + switch e.eType { + case groupJoin: + if !ok { + c.addGroupMemberStatus(e) + } else { + c.updateGroupMemberStatus(obj, e) + } + case groupLeave: + if ok { + c.updateGroupMemberStatus(obj, e) + } + } +} + +func podInterfaceIndexFunc(obj interface{}) ([]string, error) { + groupState := obj.(*GroupMemberStatus) + podInterfaces := make([]string, 0, len(groupState.localMembers)) + for podInterface := range groupState.localMembers { + podInterfaces = append(podInterfaces, podInterface) + } + return podInterfaces, nil +} + +func getGroupEventKey(obj interface{}) (string, error) { + groupState := obj.(*GroupMemberStatus) + return groupState.group.String(), nil +} diff --git a/pkg/agent/multicast/mcast_controller_test.go b/pkg/agent/multicast/mcast_controller_test.go new file mode 100644 index 00000000000..492cae4005c --- /dev/null +++ b/pkg/agent/multicast/mcast_controller_test.go @@ -0,0 +1,245 @@ +// Copyright 2021 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
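For context before the unit tests, the agent-side wiring shown in the cmd/antrea-agent/agent.go hunk earlier boils down to the following sketch (the surrounding run() plumbing is omitted, and the pkg/features import path is assumed from the feature-gate usage in the diff):

package sketch

import (
	"antrea.io/antrea/pkg/agent/config"
	"antrea.io/antrea/pkg/agent/interfacestore"
	"antrea.io/antrea/pkg/agent/multicast"
	"antrea.io/antrea/pkg/agent/openflow"
	"antrea.io/antrea/pkg/features"
)

// startMulticastIfEnabled mirrors the feature-gated block added to run() in agent.go.
// ofClient must have been constructed with the new enableMulticast argument set to true,
// otherwise the MulticastTable and the multicast flow cache do not exist.
func startMulticastIfEnabled(
	ofClient openflow.Client,
	nodeConfig *config.NodeConfig,
	ifaceStore interfacestore.InterfaceStore,
	stopCh <-chan struct{},
) error {
	if !features.DefaultFeatureGate.Enabled(features.Multicast) {
		return nil
	}
	mcastController := multicast.NewMulticastController(ofClient, nodeConfig, ifaceStore)
	// Initialize installs the IGMP packet-in flows and the external multicast receiver flow.
	if err := mcastController.Initialize(); err != nil {
		return err
	}
	// Run starts the periodic IGMP general query, the stale-group cleanup and the event workers.
	go mcastController.Run(stopCh)
	return nil
}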
+ +package multicast + +import ( + "net" + "sync" + "testing" + "time" + + "antrea.io/libOpenflow/openflow13" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/tools/cache" + + "antrea.io/antrea/pkg/agent/config" + "antrea.io/antrea/pkg/agent/interfacestore" + ifaceStoretest "antrea.io/antrea/pkg/agent/interfacestore/testing" + openflowtest "antrea.io/antrea/pkg/agent/openflow/testing" +) + +var ( + mockOFClient *openflowtest.MockClient + mockIfaceStore *ifaceStoretest.MockInterfaceStore + mgroup = net.ParseIP("224.96.1.3") + if1 = &interfacestore.InterfaceConfig{ + Type: interfacestore.ContainerInterface, + InterfaceName: "if1", + IPs: []net.IP{net.ParseIP("192.168.1.1")}, + } + if2 = &interfacestore.InterfaceConfig{ + Type: interfacestore.ContainerInterface, + InterfaceName: "if2", + IPs: []net.IP{net.ParseIP("192.168.1.2")}, + } +) + +func TestAddGroupMemberStatus(t *testing.T) { + event := &mcastGroupEvent{ + group: mgroup, + eType: groupJoin, + time: time.Now(), + iface: if1, + } + mctrl := newMockMulticastController(t) + mctrl.addGroupMemberStatus(event) + groupCache := mctrl.groupCache + compareGroupStatus(t, groupCache, event) + obj, _ := mctrl.queue.Get() + key, ok := obj.(string) + assert.True(t, ok) + assert.Equal(t, mgroup.String(), key) + mockOFClient.EXPECT().InstallMulticastFlow(mgroup).Times(1) + err := mctrl.syncGroup(key) + assert.Nil(t, err) + mctrl.queue.Forget(obj) +} + +func TestUpdateGroupMemberStatus(t *testing.T) { + mctrl := newMockMulticastController(t) + igmpMaxResponseTime = time.Second * 1 + event := &mcastGroupEvent{ + group: mgroup, + eType: groupJoin, + time: time.Now(), + iface: if1, + } + mctrl.addGroupMemberStatus(event) + obj, _, _ := mctrl.groupCache.GetByKey(event.group.String()) + mockOFClient.EXPECT().SendIGMPQueryPacketOut(igmpQueryDstMac, mcastAllHosts, uint32(openflow13.P_NORMAL), gomock.Any()).Times(1) + for _, e := range []*mcastGroupEvent{ + {group: mgroup, eType: groupJoin, time: event.time.Add(time.Second * 20), iface: if1}, + {group: mgroup, eType: groupJoin, time: event.time.Add(time.Second * 40), iface: if1}, + {group: mgroup, eType: groupJoin, time: event.time.Add(time.Second * 60), iface: if2}, + {group: mgroup, eType: groupLeave, time: event.time.Add(time.Second * 61), iface: if1}, + {group: mgroup, eType: groupLeave, time: event.time.Add(time.Second * 62), iface: if2}, + } { + mctrl.updateGroupMemberStatus(obj, e) + groupCache := mctrl.groupCache + compareGroupStatus(t, groupCache, e) + } +} + +func TestCheckLastMember(t *testing.T) { + mctrl := newMockMulticastController(t) + workerCount = 1 + igmpMaxResponseTime = time.Second * 1 + lastProbe := time.Now() + testCheckLastMember := func(ev *mcastGroupEvent, expExist bool) { + status := &GroupMemberStatus{ + localMembers: sets.NewString(), + lastProbe: lastProbe, + } + if ev != nil { + status.group = ev.group + } else { + status.group = mgroup + } + _ = mctrl.groupCache.Add(status) + mctrl.addInstalledGroup(status.group.String()) + mockOFClient.EXPECT().SendIGMPQueryPacketOut(igmpQueryDstMac, mcastAllHosts, uint32(openflow13.P_NORMAL), gomock.Any()).AnyTimes() + var wg sync.WaitGroup + wg.Add(1) + go func() { + mctrl.checkLastMember(status.group) + // Wait igmpMaxResponseTime to ensure the group is added into mctrl.queue. 
+ time.Sleep(igmpMaxResponseTime) + wg.Done() + }() + if ev != nil { + mctrl.addOrUpdateGroupEvent(ev) + } + wg.Wait() + obj, _ := mctrl.queue.Get() + key, ok := obj.(string) + assert.True(t, ok) + assert.Equal(t, status.group.String(), key) + err := mctrl.syncGroup(key) + assert.Nil(t, err) + _, exists, err := mctrl.groupCache.GetByKey(key) + assert.Nil(t, err) + assert.Equal(t, expExist, exists) + // Clear cache to avoid affecting the next test. + if _, ok, _ := mctrl.groupCache.GetByKey(key); ok { + _ = mctrl.groupCache.Delete(status) + } + mctrl.queue.Forget(obj) + } + mockOFClient.EXPECT().UninstallMulticastFlow(gomock.Any()).Times(2) + for _, tc := range []struct { + ev *mcastGroupEvent + exists bool + }{ + {ev: &mcastGroupEvent{group: net.ParseIP("224.96.1.5"), eType: groupJoin, time: lastProbe.Add(time.Second * 1), iface: if1}, exists: true}, + {ev: &mcastGroupEvent{group: net.ParseIP("224.96.1.6"), eType: groupLeave, time: lastProbe.Add(time.Second * 1), iface: if1}, exists: false}, + {ev: nil, exists: false}, + } { + testCheckLastMember(tc.ev, tc.exists) + } +} + +func TestClearStaleGroups(t *testing.T) { + mctrl := newMockMulticastController(t) + workerCount = 1 + + mockOFClient.EXPECT().InstallMulticastInitialFlows(gomock.Any()).Times(1) + err := mctrl.Initialize() + assert.Nil(t, err) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + mctrl.worker() + wg.Done() + }() + + now := time.Now() + validGroups := []*GroupMemberStatus{ + { + group: net.ParseIP("224.96.1.2"), + localMembers: sets.NewString("p1", "p2"), + lastProbe: now.Add(-queryInterval), + }, + { + group: net.ParseIP("224.96.1.3"), + localMembers: sets.NewString(), + lastProbe: now.Add(-queryInterval), + }, + } + staleGroups := []*GroupMemberStatus{ + { + group: net.ParseIP("224.96.1.4"), + localMembers: sets.NewString("p1", "p3"), + lastProbe: now.Add(-mcastGroupTimeout - time.Second), + }, + { + group: net.ParseIP("224.96.1.5"), + localMembers: sets.NewString(), + lastProbe: now.Add(-mcastGroupTimeout - time.Second), + }, + } + for _, g := range validGroups { + err := mctrl.groupCache.Add(g) + assert.Nil(t, err) + mctrl.addInstalledGroup(g.group.String()) + } + for _, g := range staleGroups { + err := mctrl.groupCache.Add(g) + assert.Nil(t, err) + mctrl.addInstalledGroup(g.group.String()) + } + mockOFClient.EXPECT().UninstallMulticastFlow(gomock.Any()).Times(len(staleGroups)) + mctrl.clearStaleGroups() + mctrl.queue.ShutDown() + wg.Wait() + assert.Equal(t, len(validGroups), len(mctrl.groupCache.List())) + for _, g := range validGroups { + _, exists, _ := mctrl.groupCache.GetByKey(g.group.String()) + assert.True(t, exists) + } + for _, g := range staleGroups { + _, exists, _ := mctrl.groupCache.GetByKey(g.group.String()) + assert.False(t, exists) + } +} + +func compareGroupStatus(t *testing.T, cache cache.Indexer, event *mcastGroupEvent) { + obj, exits, err := cache.GetByKey(event.group.String()) + assert.Nil(t, err) + assert.Truef(t, exits, "failed to add group to cache") + status, ok := obj.(*GroupMemberStatus) + assert.Equal(t, true, ok) + assert.Equal(t, event.group, status.group) + if event.eType == groupJoin { + assert.True(t, status.lastProbe.Equal(event.time) || status.lastProbe.After(event.time)) + exists := status.localMembers.Has(event.iface.InterfaceName) + assert.Truef(t, exists, "member is not added into cache") + } else { + assert.True(t, status.lastProbe.Before(event.time)) + exists := status.localMembers.Has(event.iface.InterfaceName) + assert.Falsef(t, exists, "member is not removed from cache") 
+ } +} + +func newMockMulticastController(t *testing.T) *Controller { + controller := gomock.NewController(t) + mockOFClient = openflowtest.NewMockClient(controller) + mockIfaceStore = ifaceStoretest.NewMockInterfaceStore(controller) + nodeConfig := &config.NodeConfig{} + mockOFClient.EXPECT().RegisterPacketInHandler(gomock.Any(), gomock.Any(), gomock.Any()).Times(1) + return NewMulticastController(mockOFClient, nodeConfig, mockIfaceStore) +} diff --git a/pkg/agent/multicast/mcast_discovery.go b/pkg/agent/multicast/mcast_discovery.go new file mode 100644 index 00000000000..1e224f6ca39 --- /dev/null +++ b/pkg/agent/multicast/mcast_discovery.go @@ -0,0 +1,239 @@ +// Copyright 2021 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package multicast + +import ( + "errors" + "fmt" + "net" + "time" + + "antrea.io/libOpenflow/openflow13" + "antrea.io/libOpenflow/protocol" + "antrea.io/libOpenflow/util" + "antrea.io/ofnet/ofctrl" + "k8s.io/klog/v2" + + "antrea.io/antrea/pkg/agent/interfacestore" + "antrea.io/antrea/pkg/agent/openflow" +) + +const ( + IGMPProtocolNumber = 2 +) + +const ( + // queryInterval is the interval to send IGMP query messages. + queryInterval = time.Second * 125 + // mcastGroupTimeout is used to identify the multicast group in which no local members have joined. + mcastGroupTimeout = queryInterval * 3 +) + +var ( + // igmpMaxResponseTime is the maximum time allowed before sending a responding report which is used for the + // "Max Resp Code" field in the IGMP query message. It is also the maximum time to wait for the IGMP report message + // when checking the last group member. + igmpMaxResponseTime = time.Second * 10 + // igmpQueryDstMac is the MAC address used in the dst MAC field in the IGMP query message + igmpQueryDstMac, _ = net.ParseMAC("01:00:5e:00:00:01") + mcastAllHosts = net.ParseIP("224.0.0.1").To4() +) + +type IGMPSnooper struct { + ofClient openflow.Client + ifaceStore interfacestore.InterfaceStore + eventCh chan *mcastGroupEvent +} + +func (s *IGMPSnooper) HandlePacketIn(pktIn *ofctrl.PacketIn) error { + matches := pktIn.GetMatches() + // Get custom reasons in this packet-in. 
+ match := matches.GetMatchByName(openflow.CustomReasonField.GetNXFieldName()) + customReasons, err := getInfoInReg(match, openflow.CustomReasonField.GetRange().ToNXRange()) + if err != nil { + klog.ErrorS(err, "Received error while unloading customReason from OVS reg", "regField", openflow.CustomReasonField.GetName()) + return err + } + if customReasons&openflow.CustomReasonIGMP == openflow.CustomReasonIGMP { + return s.processPacketIn(pktIn) + } + return nil +} + +func getInfoInReg(regMatch *ofctrl.MatchField, rng *openflow13.NXRange) (uint32, error) { + regValue, ok := regMatch.GetValue().(*ofctrl.NXRegister) + if !ok { + return 0, errors.New("register value cannot be retrieved") + } + if rng != nil { + return ofctrl.GetUint32ValueWithRange(regValue.Data, rng), nil + } + return regValue.Data, nil +} + +func (s *IGMPSnooper) parseSrcInterface(pktIn *ofctrl.PacketIn) (*interfacestore.InterfaceConfig, error) { + matches := pktIn.GetMatches() + ofPortField := matches.GetMatchByName("OXM_OF_IN_PORT") + if ofPortField == nil { + return nil, errors.New("in_port field not found") + } + ofPort := ofPortField.GetValue().(uint32) + ifaceConfig, found := s.ifaceStore.GetInterfaceByOFPort(ofPort) + if !found { + return nil, errors.New("target Pod not found") + } + return ifaceConfig, nil +} + +func (s *IGMPSnooper) queryIGMP(group net.IP, version uint8) error { + igmp, err := generateIGMPQueryPacket(group, version) + if err != nil { + return err + } + if err := s.ofClient.SendIGMPQueryPacketOut(igmpQueryDstMac, mcastAllHosts, openflow13.P_NORMAL, igmp); err != nil { + return err + } + klog.V(2).InfoS("Sent packetOut for IGMP query", "group", group.String(), "version", version) + return nil +} + +func (s *IGMPSnooper) processPacketIn(pktIn *ofctrl.PacketIn) error { + now := time.Now() + iface, err := s.parseSrcInterface(pktIn) + if err != nil { + return err + } + klog.V(2).InfoS("Received PacketIn for IGMP packet", "in_port", iface.OFPort) + igmp, err := parseIGMPPacket(pktIn.Data) + if err != nil { + return err + } + switch igmp.GetMessageType() { + case protocol.IGMPv1Report: + fallthrough + case protocol.IGMPv2Report: + mgroup := igmp.(*protocol.IGMPv1or2).GroupAddress + klog.InfoS("Received IGMPv1or2 Report message", "group", mgroup.String(), "interface", iface.PodName) + event := &mcastGroupEvent{ + group: mgroup, + eType: groupJoin, + time: now, + iface: iface, + } + s.eventCh <- event + case protocol.IGMPv3Report: + msg := igmp.(*protocol.IGMPv3MembershipReport) + for _, gr := range msg.GroupRecords { + mgroup := gr.MulticastAddress + klog.InfoS("Received IGMPv3 Report message", "group", mgroup.String(), "interface", iface.PodName) + event := &mcastGroupEvent{ + group: mgroup, + eType: groupJoin, + time: now, + iface: iface, + } + s.eventCh <- event + } + + case protocol.IGMPv2LeaveGroup: + mgroup := igmp.(*protocol.IGMPv1or2).GroupAddress + klog.InfoS("Received IGMPv2 Leave message", "group", mgroup.String(), "interface", iface.PodName) + event := &mcastGroupEvent{ + group: mgroup, + eType: groupLeave, + time: now, + iface: iface, + } + s.eventCh <- event + } + return nil +} + +func generateIGMPQueryPacket(group net.IP, version uint8) (util.Message, error) { + // The max response time field in IGMP protocol uses a value in units of 1/10 second.
+ // See https://datatracker.ietf.org/doc/html/rfc2236 and https://datatracker.ietf.org/doc/html/rfc3376 + respTime := uint8(igmpMaxResponseTime.Seconds() * 10) + switch version { + case 1: + return &protocol.IGMPv1or2{ + Type: protocol.IGMPQuery, + MaxResponseTime: 0, + Checksum: 0, + GroupAddress: group, + }, nil + case 2: + return &protocol.IGMPv1or2{ + Type: protocol.IGMPQuery, + MaxResponseTime: respTime, + Checksum: 0, + GroupAddress: group, + }, nil + case 3: + return &protocol.IGMPv3Query{ + Type: protocol.IGMPQuery, + MaxResponseTime: respTime, + GroupAddress: group, + SuppressRouterProcessing: false, + RobustnessValue: 0, + IntervalTime: uint8(queryInterval.Seconds()), + NumberOfSources: 0, + }, nil + } + return nil, fmt.Errorf("unsupported IGMP version %d", version) +} + +func parseIGMPPacket(pkt protocol.Ethernet) (protocol.IGMPMessage, error) { + if pkt.Ethertype != protocol.IPv4_MSG { + return nil, errors.New("not IPv4 packet") + } + ipPacket, ok := pkt.Data.(*protocol.IPv4) + if !ok { + return nil, errors.New("failed to parse IPv4 packet") + } + if ipPacket.Protocol != IGMPProtocolNumber { + return nil, errors.New("not IGMP packet") + } + data, _ := ipPacket.Data.MarshalBinary() + igmpLength := ipPacket.Length - uint16(4*ipPacket.IHL) + if igmpLength == 8 { + igmp := new(protocol.IGMPv1or2) + if err := igmp.UnmarshalBinary(data); err != nil { + return nil, err + } + return igmp, nil + } + switch data[0] { + case protocol.IGMPQuery: + igmp := new(protocol.IGMPv3Query) + if err := igmp.UnmarshalBinary(data); err != nil { + return nil, err + } + return igmp, nil + case protocol.IGMPv3Report: + igmp := new(protocol.IGMPv3MembershipReport) + if err := igmp.UnmarshalBinary(data); err != nil { + return nil, err + } + return igmp, nil + default: + return nil, errors.New("unknown igmp packet") + } +} + +func newSnooper(ofClient openflow.Client, ifaceStore interfacestore.InterfaceStore, eventCh chan *mcastGroupEvent) *IGMPSnooper { + d := &IGMPSnooper{ofClient: ofClient, ifaceStore: ifaceStore, eventCh: eventCh} + ofClient.RegisterPacketInHandler(uint8(openflow.PacketInReasonMC), "MulticastGroupDiscovery", d) + return d +} diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index bf9407f0eb7..0b807d67e5d 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -20,6 +20,7 @@ import ( "net" "antrea.io/libOpenflow/protocol" + ofutil "antrea.io/libOpenflow/util" v1 "k8s.io/api/core/v1" "k8s.io/klog/v2" @@ -299,6 +300,20 @@ type Client interface { AddAddressToDNSConjunction(id uint32, addrs []types.Address) error // DeleteAddressFromDNSConjunction removes addresses from the toAddresses of the dns packetIn conjunction. DeleteAddressFromDNSConjunction(id uint32, addrs []types.Address) error + // InstallMulticastInitialFlows installs OpenFlow to packetIn the IGMP messages and output the Multicast traffic to + // antrea-gw0 so that local Pods could access external Multicast servers. + InstallMulticastInitialFlows(pktInReason uint8) error + // InstallMulticastFlow installs the flow to forward Multicast traffic normally, and output it to antrea-gw0 + // to ensure it can be forwarded to the external addresses. + InstallMulticastFlow(multicastIP net.IP) error + // UninstallMulticastFlow removes the flow matching the given multicastIP. + UninstallMulticastFlow(multicastIP net.IP) error + // SendIGMPQueryPacketOut sends the IGMPQuery packet as a packet-out to OVS from the gateway port. 
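A small worked example of the timing encodings used by generateIGMPQueryPacket above; the values mirror the igmpMaxResponseTime and queryInterval constants from this file, and the snippet is only illustrative:

package main

import (
	"fmt"
	"time"
)

func main() {
	igmpMaxResponseTime := 10 * time.Second // matches the package-level default above
	queryInterval := 125 * time.Second      // matches the query interval constant above

	// IGMP carries the max response time in units of 1/10 second, so 10s is encoded as 100.
	maxRespCode := uint8(igmpMaxResponseTime.Seconds() * 10)
	// For values below 128 the IGMPv3 query interval field is plain seconds, so 125 stays 125.
	intervalTime := uint8(queryInterval.Seconds())

	fmt.Println(maxRespCode, intervalTime) // prints: 100 125
}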
+ SendIGMPQueryPacketOut( + dstMAC net.HardwareAddr, + dstIP net.IP, + outPort uint32, + igmp ofutil.Message) error } // GetFlowTableStatus returns an array of flow table status. @@ -1196,3 +1211,47 @@ func (c *client) SendUDPPacketOut( packetOutObj := packetOutBuilder.Done() return c.bridge.SendPacketOut(packetOutObj) } + +func (c *client) InstallMulticastInitialFlows(pktInReason uint8) error { + flows := c.igmpPktInFlows(pktInReason) + flows = append(flows, c.externalMulticastReceiverFlow()) + cacheKey := fmt.Sprintf("multicast") + c.replayMutex.RLock() + defer c.replayMutex.RUnlock() + return c.addFlows(c.mcastFlowCache, cacheKey, flows) +} + +func (c *client) InstallMulticastFlow(multicastIP net.IP) error { + flows := c.localMulticastForwardFlow(multicastIP) + cacheKey := fmt.Sprintf("multicast_%s", multicastIP.String()) + c.replayMutex.RLock() + defer c.replayMutex.RUnlock() + return c.addFlows(c.mcastFlowCache, cacheKey, flows) +} + +func (c *client) UninstallMulticastFlow(multicastIP net.IP) error { + c.replayMutex.RLock() + defer c.replayMutex.RUnlock() + cacheKey := fmt.Sprintf("multicast_%s", multicastIP.String()) + return c.deleteFlows(c.mcastFlowCache, cacheKey) +} + +func (c *client) SendIGMPQueryPacketOut( + dstMAC net.HardwareAddr, + dstIP net.IP, + outPort uint32, + igmp ofutil.Message) error { + // Generate a base IP PacketOutBuilder. + srcMAC := c.nodeConfig.GatewayConfig.MAC.String() + srcIP := c.nodeConfig.GatewayConfig.IPv4.String() + dstMACStr := dstMAC.String() + dstIPStr := dstIP.String() + packetOutBuilder, err := setBasePacketOutBuilder(c.bridge.BuildPacketOut(), srcMAC, dstMACStr, srcIP, dstIPStr, config.HostGatewayOFPort, outPort) + if err != nil { + return err + } + // Set protocol and L4 message. + packetOutBuilder = packetOutBuilder.SetIPProtocol(binding.ProtocolIGMP).SetL4Packet(igmp) + packetOutObj := packetOutBuilder.Done() + return c.bridge.SendPacketOut(packetOutObj) +} diff --git a/pkg/agent/openflow/client_test.go b/pkg/agent/openflow/client_test.go index bd7b5f91176..18207b1db79 100644 --- a/pkg/agent/openflow/client_test.go +++ b/pkg/agent/openflow/client_test.go @@ -103,7 +103,7 @@ func TestIdempotentFlowInstallation(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() m := oftest.NewMockOFEntryOperations(ctrl) - ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, false, false, false, false, false) + ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, false, false, false, false, false, false) client := ofClient.(*client) client.cookieAllocator = cookie.NewAllocator(0) client.ofEntryOperations = m @@ -132,7 +132,7 @@ func TestIdempotentFlowInstallation(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() m := oftest.NewMockOFEntryOperations(ctrl) - ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, false, false, false, false, false) + ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, false, false, false, false, false, false) client := ofClient.(*client) client.cookieAllocator = cookie.NewAllocator(0) client.ofEntryOperations = m @@ -174,7 +174,7 @@ func TestFlowInstallationFailed(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() m := oftest.NewMockOFEntryOperations(ctrl) - ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, false, false, false, false, false) + ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, 
true, false, false, false, false, false, false) client := ofClient.(*client) client.cookieAllocator = cookie.NewAllocator(0) client.ofEntryOperations = m @@ -209,7 +209,7 @@ func TestConcurrentFlowInstallation(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() m := oftest.NewMockOFEntryOperations(ctrl) - ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, false, false, false, false, false) + ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, false, false, false, false, false, false) client := ofClient.(*client) client.cookieAllocator = cookie.NewAllocator(0) client.ofEntryOperations = m @@ -400,7 +400,7 @@ func Test_client_SendTraceflowPacket(t *testing.T) { } func prepareTraceflowFlow(ctrl *gomock.Controller) *client { - ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, true, false, false, false, false) + ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, true, false, false, false, false, false) c := ofClient.(*client) c.cookieAllocator = cookie.NewAllocator(0) c.nodeConfig = nodeConfig @@ -418,7 +418,7 @@ func prepareTraceflowFlow(ctrl *gomock.Controller) *client { } func prepareSendTraceflowPacket(ctrl *gomock.Controller, success bool) *client { - ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, true, false, false, false, false) + ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, true, false, false, false, false, false) c := ofClient.(*client) c.nodeConfig = nodeConfig m := ovsoftest.NewMockBridge(ctrl) @@ -506,7 +506,7 @@ func Test_client_setBasePacketOutBuilder(t *testing.T) { } func prepareSetBasePacketOutBuilder(ctrl *gomock.Controller, success bool) *client { - ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, true, false, false, false, false) + ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, true, false, false, false, false, false) c := ofClient.(*client) m := ovsoftest.NewMockBridge(ctrl) c.bridge = m diff --git a/pkg/agent/openflow/fields.go b/pkg/agent/openflow/fields.go index f02e49d74f1..b36a2130b9a 100644 --- a/pkg/agent/openflow/fields.go +++ b/pkg/agent/openflow/fields.go @@ -55,15 +55,17 @@ var ( DispositionRejRegMark = binding.NewRegMark(APDispositionField, DispositionRej) // reg0[24..27]: Field to indicate the reasons of sending packet to the controller. // Marks in this field include, - // - 0b0001: logging - // - 0b0010: reject - // - 0b0100: deny (used by Flow Exporter) - // - 0b1000: DNS packet (used by FQDN) - CustomReasonField = binding.NewRegField(0, 24, 27, "PacketInReason") + // - 0b00001: logging + // - 0b00010: reject + // - 0b00100: deny (used by Flow Exporter) + // - 0b01000: DNS packet (used by FQDN) + // - 0b10000: IGMP packet (used by Multicast) + CustomReasonField = binding.NewRegField(0, 24, 28, "PacketInReason") CustomReasonLoggingRegMark = binding.NewRegMark(CustomReasonField, CustomReasonLogging) CustomReasonRejectRegMark = binding.NewRegMark(CustomReasonField, CustomReasonReject) CustomReasonDenyRegMark = binding.NewRegMark(CustomReasonField, CustomReasonDeny) CustomReasonDNSRegMark = binding.NewRegMark(CustomReasonField, CustomReasonDNS) + CustomReasonIGMPRegMark = binding.NewRegMark(CustomReasonField, CustomReasonIGMP) // reg1(NXM_NX_REG1) // Field to cache the ofPort of the OVS interface where to output packet. 
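The fields.go change above widens the packet-in reason field from reg0[24..27] to reg0[24..28] so the new IGMP bit fits. A self-contained sketch of the resulting bit layout and of the mask check done by the IGMP snooper (constant values mirror this diff; the variable names are illustrative):

package main

import "fmt"

// Values mirror the CustomReason* constants referenced in this diff.
const (
	customReasonLogging = 0b00001
	customReasonReject  = 0b00010
	customReasonDeny    = 0b00100
	customReasonDNS     = 0b01000
	customReasonIGMP    = 0b10000 // the new bit, which no longer fits in a 4-bit field
)

func main() {
	// A packet-in can carry several reasons at once; IGMPSnooper.HandlePacketIn only reacts
	// when the IGMP bit is set, regardless of the other bits.
	reasons := uint32(customReasonIGMP | customReasonLogging)
	if reasons&customReasonIGMP == customReasonIGMP {
		fmt.Println("IGMP packet-in: hand off to processPacketIn")
	}
}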
diff --git a/pkg/agent/openflow/network_policy_test.go b/pkg/agent/openflow/network_policy_test.go index b46dd9d0615..d1c983d4492 100644 --- a/pkg/agent/openflow/network_policy_test.go +++ b/pkg/agent/openflow/network_policy_test.go @@ -507,7 +507,7 @@ func TestBatchInstallPolicyRuleFlows(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() mockOperations := oftest.NewMockOFEntryOperations(ctrl) - ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, false, true, false, false, false, false) + ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, false, true, false, false, false, false, false) c = ofClient.(*client) c.cookieAllocator = cookie.NewAllocator(0) c.ofEntryOperations = mockOperations @@ -575,7 +575,7 @@ func BenchmarkBatchInstallPolicyRuleFlows(b *testing.B) { ctrl := gomock.NewController(b) defer ctrl.Finish() mockOperations := oftest.NewMockOFEntryOperations(ctrl) - ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, false, true, false, false, false, false) + ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, false, true, false, false, false, false, false) c = ofClient.(*client) c.cookieAllocator = cookie.NewAllocator(0) c.ofEntryOperations = mockOperations diff --git a/pkg/agent/openflow/packetin.go b/pkg/agent/openflow/packetin.go index 7874f120c73..2f5692742f5 100644 --- a/pkg/agent/openflow/packetin.go +++ b/pkg/agent/openflow/packetin.go @@ -42,7 +42,12 @@ const ( // PacketIn reasons PacketInReasonTF ofpPacketInReason = 1 + // PacketInReasonNP is used for the custom packetIn reasons for Network Policy, including: Logging, Reject, Deny. + // It is also used to mark the DNS Response packet. PacketInReasonNP ofpPacketInReason = 0 + // PacketInReasonMC shares PacketInReasonNP for IGMP packet_in message. This is because OVS "controller" action + // only correctly supports reason 0 or 1. Change to another value after the OVS action is corrected. + PacketInReasonMC = PacketInReasonNP // PacketInQueueSize defines the size of PacketInQueue. // When PacketInQueue reaches PacketInQueueSize, new packet-in will be dropped. PacketInQueueSize = 200 @@ -111,6 +116,7 @@ func (c *client) parsePacketIn(featurePacketIn *featureStartPacketIn) { } // Use corresponding handlers subscribed to the reason to handle PacketIn for name, handler := range c.packetInHandlers[featurePacketIn.reason] { + klog.V(2).InfoS("Receive packetIn", "reason", featurePacketIn.reason, "handler", name) err := handler.HandlePacketIn(pktIn) if err != nil { klog.Errorf("PacketIn handler %s failed to process packet: %+v", name, err) diff --git a/pkg/agent/openflow/pipeline.go b/pkg/agent/openflow/pipeline.go index d634d3464d6..8fc863de452 100644 --- a/pkg/agent/openflow/pipeline.go +++ b/pkg/agent/openflow/pipeline.go @@ -46,6 +46,7 @@ var ( SpoofGuardTable = binding.NewOFTable(10, "SpoofGuard") arpResponderTable = binding.NewOFTable(20, "ARPResponder") IPv6Table = binding.NewOFTable(21, "IPv6") + MulticastTable = binding.NewOFTable(22, "Multicast") ServiceHairpinTable = binding.NewOFTable(23, "ServiceHairpin") ServiceConntrackTable = binding.NewOFTable(24, "ServiceConntrack") // serviceConntrackTable use a new ct_zone to transform SNAT'd connections. ConntrackTable = binding.NewOFTable(30, "ConntrackZone") @@ -233,6 +234,7 @@ const ( // policy rules. 
CustomReasonDeny = 0b100 CustomReasonDNS = 0b1000 + CustomReasonIGMP = 0b10000 ) var DispositionToString = map[uint32]string{ @@ -253,6 +255,10 @@ var ( GlobalVirtualMAC, _ = net.ParseMAC("aa:bb:cc:dd:ee:ff") hairpinIP = net.ParseIP("169.254.169.252").To4() hairpinIPv6 = net.ParseIP("fc00::aabb:ccdd:eeff").To16() + + mcastAllRouters = net.ParseIP("224.0.0.2") + igmpV3ReportDst = net.ParseIP("224.0.0.22") + _, mcastCIDR, _ = net.ParseCIDR("224.0.0.0/8") ) type OFEntryOperations interface { @@ -288,6 +294,7 @@ type client struct { enableDenyTracking bool enableEgress bool enableWireGuard bool + enableMulticast bool connectUplinkToBridge bool roundInfo types.RoundInfo cookieAllocator cookie.Allocator @@ -295,7 +302,7 @@ type client struct { egressEntryTable uint8 ingressEntryTable uint8 // Flow caches for corresponding deletions. - nodeFlowCache, podFlowCache, serviceFlowCache, snatFlowCache, tfFlowCache *flowCategoryCache + nodeFlowCache, podFlowCache, serviceFlowCache, snatFlowCache, tfFlowCache, mcastFlowCache *flowCategoryCache // "fixed" flows installed by the agent after initialization and which do not change during // the lifetime of the client. gatewayFlows, defaultServiceFlows, defaultTunnelFlows, hostNetworkingFlows []binding.Flow @@ -2518,8 +2525,11 @@ func (c *client) genPacketInMeter(meterID binding.MeterIDType, rate uint32) bind func (c *client) generatePipeline() { c.createOFTable(ClassifierTable, SpoofGuardTable.GetID(), binding.TableMissActionDrop) + c.createOFTable(SpoofGuardTable, ConntrackTable.GetID(), binding.TableMissActionDrop) + c.createOFTable(IPv6Table, ConntrackTable.GetID(), binding.TableMissActionNext) c.createOFTable(arpResponderTable, binding.LastTableID, binding.TableMissActionDrop) c.createOFTable(ConntrackTable, ConntrackStateTable.GetID(), binding.TableMissActionNone) + c.createOFTable(ConntrackStateTable, DNATTable.GetID(), binding.TableMissActionNext) c.createOFTable(EgressRuleTable, EgressDefaultTable.GetID(), binding.TableMissActionNext) c.createOFTable(EgressDefaultTable, EgressMetricTable.GetID(), binding.TableMissActionNext) c.createOFTable(EgressMetricTable, L3ForwardingTable.GetID(), binding.TableMissActionNext) @@ -2530,10 +2540,12 @@ func (c *client) generatePipeline() { c.createOFTable(IngressRuleTable, IngressDefaultTable.GetID(), binding.TableMissActionNext) c.createOFTable(IngressDefaultTable, IngressMetricTable.GetID(), binding.TableMissActionNext) c.createOFTable(IngressMetricTable, ConntrackCommitTable.GetID(), binding.TableMissActionNext) + c.createOFTable(ConntrackCommitTable, L2ForwardingOutTable.GetID(), binding.TableMissActionNext) c.createOFTable(L2ForwardingOutTable, binding.LastTableID, binding.TableMissActionDrop) if c.enableProxy { - SpoofGuardTable = c.createOFTable(SpoofGuardTable, ServiceHairpinTable.GetID(), binding.TableMissActionDrop) - IPv6Table = c.createOFTable(IPv6Table, ServiceHairpinTable.GetID(), binding.TableMissActionNext) + SpoofGuardTable.SetNext(ServiceHairpinTable.GetID()) + IPv6Table.SetNext(ServiceHairpinTable.GetID()) + ConntrackStateTable.SetNext(EndpointDNATTable.GetID()) if c.proxyAll { ServiceHairpinTable = c.createOFTable(ServiceHairpinTable, ServiceConntrackTable.GetID(), binding.TableMissActionNext) ServiceConntrackTable = c.createOFTable(ServiceConntrackTable, ConntrackTable.GetID(), binding.TableMissActionNext) @@ -2542,18 +2554,13 @@ func (c *client) generatePipeline() { } else { ServiceHairpinTable = c.createOFTable(ServiceHairpinTable, ConntrackTable.GetID(), binding.TableMissActionNext) } - 
ConntrackStateTable = c.createOFTable(ConntrackStateTable, EndpointDNATTable.GetID(), binding.TableMissActionNext) SessionAffinityTable = c.createOFTable(SessionAffinityTable, binding.LastTableID, binding.TableMissActionNone) ServiceLBTable = c.createOFTable(ServiceLBTable, EndpointDNATTable.GetID(), binding.TableMissActionNext) EndpointDNATTable = c.createOFTable(EndpointDNATTable, c.egressEntryTable, binding.TableMissActionNext) - ConntrackCommitTable = c.createOFTable(ConntrackCommitTable, HairpinSNATTable.GetID(), binding.TableMissActionNext) HairpinSNATTable = c.createOFTable(HairpinSNATTable, L2ForwardingOutTable.GetID(), binding.TableMissActionNext) + ConntrackCommitTable.SetNext(HairpinSNATTable.GetID()) } else { - c.createOFTable(SpoofGuardTable, ConntrackTable.GetID(), binding.TableMissActionDrop) - c.createOFTable(IPv6Table, ConntrackTable.GetID(), binding.TableMissActionNext) - c.createOFTable(ConntrackStateTable, DNATTable.GetID(), binding.TableMissActionNext) c.createOFTable(DNATTable, c.egressEntryTable, binding.TableMissActionNext) - c.createOFTable(ConntrackCommitTable, L2ForwardingOutTable.GetID(), binding.TableMissActionNext) } // The default SNAT is implemented with OVS on Windows. if c.enableEgress || runtime.IsWindowsPlatform() { @@ -2566,6 +2573,13 @@ func (c *client) generatePipeline() { c.createOFTable(AntreaPolicyEgressRuleTable, EgressRuleTable.GetID(), binding.TableMissActionNext) c.createOFTable(AntreaPolicyIngressRuleTable, IngressRuleTable.GetID(), binding.TableMissActionNext) } + if c.enableMulticast { + SpoofGuardTable.SetNext(MulticastTable.GetID()) + c.createOFTable(MulticastTable, ConntrackTable.GetID(), binding.TableMissActionNext) + if c.enableProxy { + MulticastTable.SetNext(ServiceHairpinTable.GetID()) + } + } } // createOFTable sets the missAction and the next table ID of the given table according to the pipeline. Then it creates the table on the bridge. At last, it adds the table into the ofTableCache. @@ -2575,6 +2589,52 @@ func (c *client) createOFTable(table binding.Table, nextID uint8, missAction bin return table } +// igmpPktInFlows sets reg0[28] to mark the IGMP packet in MulticastTable and sends it to antrea-agent on MulticastTable. +func (c *client) igmpPktInFlows(reason uint8) []binding.Flow { + var flows []binding.Flow + for _, dstIP := range []net.IP{mcastAllRouters, igmpV3ReportDst} { + // Set a custom reason for the IGMP packets, and then send it to antrea-agent and forward it normally in the + // OVS bridge, so that the OVS multicast db cache can be updated, and antrea-agent can identify the local multicast + // group and its members in the meanwhile. + flows = append(flows, MulticastTable.BuildFlow(priorityHigh). + MatchProtocol(binding.ProtocolIGMP). + MatchRegMark(FromLocalRegMark). + MatchDstIP(dstIP). + Action().LoadRegMark(CustomReasonIGMPRegMark). + Action().SendToController(reason). + Action().Normal(). + Done()) + } + return flows +} + +// localMulticastForwardFlow forwards the multicast traffic with OVS action "normal", and outputs it to antrea-gw0 in +// the meanwhile. So that the packet could be forwarded to local Pods which have joined the Multicast group and to the +// external receivers. For the external multicast traffic accessing to the given multicastIP also hits this flow, and +// the packet is not sent back to antrea-gw0 because OVS datapath will drop it when it finds the output port is the same +// as the input port. 
+func (c *client) localMulticastForwardFlow(multicastIP net.IP) []binding.Flow { + return []binding.Flow{ + MulticastTable.BuildFlow(priorityNormal). + MatchProtocol(binding.ProtocolIP). + MatchDstIP(multicastIP). + Action().Output(config.HostGatewayOFPort). + Action().Normal(). + Done(), + } +} + +// externalMulticastReceiverFlow outputs the multicast traffic to antrea-gw0, so that local Pods can send multicast traffic +// to access the external receivers. For the case that one or more local Pods have joined the target multicast group, +// it is handled by the flows created by function "localMulticastForwardFlow" after local Pods report the IGMP membership. +func (c *client) externalMulticastReceiverFlow() binding.Flow { + return MulticastTable.BuildFlow(priorityLow). + MatchProtocol(binding.ProtocolIP). + MatchDstIPNet(*mcastCIDR). + Action().Output(config.HostGatewayOFPort). + Done() +} + // NewClient is the constructor of the Client interface. func NewClient(bridgeName string, mgmtAddr string, @@ -2584,7 +2644,8 @@ func NewClient(bridgeName string, enableEgress bool, enableDenyTracking bool, proxyAll bool, - connectUplinkToBridge bool) Client { + connectUplinkToBridge bool, + enableMulticast bool) Client { bridge := binding.NewOFBridge(bridgeName, mgmtAddr) policyCache := cache.NewIndexer( policyConjKeyFunc, @@ -2597,6 +2658,7 @@ func NewClient(bridgeName string, enableAntreaPolicy: enableAntreaPolicy, enableDenyTracking: enableDenyTracking, enableEgress: enableEgress, + enableMulticast: enableMulticast, connectUplinkToBridge: connectUplinkToBridge, nodeFlowCache: newFlowCategoryCache(), podFlowCache: newFlowCategoryCache(), @@ -2619,6 +2681,9 @@ func NewClient(bridgeName string, if enableEgress { c.snatFlowCache = newFlowCategoryCache() } + if enableMulticast { + c.mcastFlowCache = newFlowCategoryCache() + } c.generatePipeline() return c } diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go index b69cbdefa56..7647ebe10d7 100644 --- a/pkg/agent/openflow/testing/mock_openflow.go +++ b/pkg/agent/openflow/testing/mock_openflow.go @@ -25,6 +25,7 @@ import ( openflow "antrea.io/antrea/pkg/ovs/openflow" ip "antrea.io/antrea/pkg/util/ip" proxy "antrea.io/antrea/third_party/proxy" + util "antrea.io/libOpenflow/util" gomock "github.com/golang/mock/gomock" v1 "k8s.io/api/core/v1" net "net" @@ -378,6 +379,34 @@ func (mr *MockClientMockRecorder) InstallLoadBalancerServiceFromOutsideFlows(arg return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallLoadBalancerServiceFromOutsideFlows", reflect.TypeOf((*MockClient)(nil).InstallLoadBalancerServiceFromOutsideFlows), arg0, arg1, arg2) } +// InstallMulticastFlow mocks base method +func (m *MockClient) InstallMulticastFlow(arg0 net.IP) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "InstallMulticastFlow", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// InstallMulticastFlow indicates an expected call of InstallMulticastFlow +func (mr *MockClientMockRecorder) InstallMulticastFlow(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallMulticastFlow", reflect.TypeOf((*MockClient)(nil).InstallMulticastFlow), arg0) +} + +// InstallMulticastInitialFlows mocks base method +func (m *MockClient) InstallMulticastInitialFlows(arg0 byte) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "InstallMulticastInitialFlows", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// InstallMulticastInitialFlows indicates an expected call 
of InstallMulticastInitialFlows +func (mr *MockClientMockRecorder) InstallMulticastInitialFlows(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallMulticastInitialFlows", reflect.TypeOf((*MockClient)(nil).InstallMulticastInitialFlows), arg0) +} + // InstallNodeFlows mocks base method func (m *MockClient) InstallNodeFlows(arg0 string, arg1 map[*net.IPNet]net.IP, arg2 *ip.DualStackIPs, arg3 uint32, arg4 net.HardwareAddr) error { m.ctrl.T.Helper() @@ -612,6 +641,20 @@ func (mr *MockClientMockRecorder) SendICMPPacketOut(arg0, arg1, arg2, arg3, arg4 return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendICMPPacketOut", reflect.TypeOf((*MockClient)(nil).SendICMPPacketOut), arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10) } +// SendIGMPQueryPacketOut mocks base method +func (m *MockClient) SendIGMPQueryPacketOut(arg0 net.HardwareAddr, arg1 net.IP, arg2 uint32, arg3 util.Message) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendIGMPQueryPacketOut", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendIGMPQueryPacketOut indicates an expected call of SendIGMPQueryPacketOut +func (mr *MockClientMockRecorder) SendIGMPQueryPacketOut(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendIGMPQueryPacketOut", reflect.TypeOf((*MockClient)(nil).SendIGMPQueryPacketOut), arg0, arg1, arg2, arg3) +} + // SendTCPPacketOut mocks base method func (m *MockClient) SendTCPPacketOut(arg0, arg1, arg2, arg3 string, arg4, arg5 uint32, arg6 bool, arg7, arg8 uint16, arg9 uint32, arg10 byte, arg11 func(openflow.PacketOutBuilder) openflow.PacketOutBuilder) error { m.ctrl.T.Helper() @@ -708,6 +751,20 @@ func (mr *MockClientMockRecorder) UninstallLoadBalancerServiceFromOutsideFlows(a return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UninstallLoadBalancerServiceFromOutsideFlows", reflect.TypeOf((*MockClient)(nil).UninstallLoadBalancerServiceFromOutsideFlows), arg0, arg1, arg2) } +// UninstallMulticastFlow mocks base method +func (m *MockClient) UninstallMulticastFlow(arg0 net.IP) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UninstallMulticastFlow", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// UninstallMulticastFlow indicates an expected call of UninstallMulticastFlow +func (mr *MockClientMockRecorder) UninstallMulticastFlow(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UninstallMulticastFlow", reflect.TypeOf((*MockClient)(nil).UninstallMulticastFlow), arg0) +} + // UninstallNodeFlows mocks base method func (m *MockClient) UninstallNodeFlows(arg0 string) error { m.ctrl.T.Helper() diff --git a/pkg/apiserver/handlers/featuregates/handler.go b/pkg/apiserver/handlers/featuregates/handler.go index 2587d8e2dc9..92444048434 100644 --- a/pkg/apiserver/handlers/featuregates/handler.go +++ b/pkg/apiserver/handlers/featuregates/handler.go @@ -30,7 +30,7 @@ import ( ) var controllerGates = sets.NewString("Traceflow", "AntreaPolicy", "Egress", "NetworkPolicyStats", "NodeIPAM") -var agentGates = sets.NewString("AntreaPolicy", "AntreaProxy", "Egress", "EndpointSlice", "Traceflow", "FlowExporter", "NetworkPolicyStats", "NodePortLocal", "AntreaIPAM") +var agentGates = sets.NewString("AntreaPolicy", "AntreaProxy", "Egress", "EndpointSlice", "Traceflow", "FlowExporter", "NetworkPolicyStats", "NodePortLocal", "AntreaIPAM", "Multicast") type ( 
Config struct { diff --git a/pkg/apiserver/handlers/featuregates/handler_test.go b/pkg/apiserver/handlers/featuregates/handler_test.go index 062f014f12c..3256dd8f46d 100644 --- a/pkg/apiserver/handlers/featuregates/handler_test.go +++ b/pkg/apiserver/handlers/featuregates/handler_test.go @@ -54,6 +54,7 @@ func Test_getGatesResponse(t *testing.T) { {Component: "agent", Name: "FlowExporter", Status: "Disabled", Version: "ALPHA"}, {Component: "agent", Name: "NetworkPolicyStats", Status: "Enabled", Version: "BETA"}, {Component: "agent", Name: "NodePortLocal", Status: "Enabled", Version: "BETA"}, + {Component: "agent", Name: "Multicast", Status: "Disabled", Version: "ALPHA"}, }, }, } diff --git a/pkg/features/antrea_features.go b/pkg/features/antrea_features.go index 11321495224..d81eae4fec0 100644 --- a/pkg/features/antrea_features.go +++ b/pkg/features/antrea_features.go @@ -76,6 +76,10 @@ const ( // alpha: v1.4 // Enable flexible IPAM for Pods. AntreaIPAM featuregate.Feature = "AntreaIPAM" + + // alpha: v1.5 + // Enable Multicast. + Multicast featuregate.Feature = "Multicast" ) var ( @@ -100,6 +104,7 @@ var ( NetworkPolicyStats: {Default: true, PreRelease: featuregate.Beta}, NodePortLocal: {Default: true, PreRelease: featuregate.Beta}, NodeIPAM: {Default: false, PreRelease: featuregate.Alpha}, + Multicast: {Default: false, PreRelease: featuregate.Alpha}, } // UnsupportedFeaturesOnWindows records the features not supported on @@ -116,6 +121,7 @@ var ( NodePortLocal: {}, Egress: {}, AntreaIPAM: {}, + Multicast: {}, } ) diff --git a/pkg/ovs/openflow/interfaces.go b/pkg/ovs/openflow/interfaces.go index c40e6d98b6f..38ed6447282 100644 --- a/pkg/ovs/openflow/interfaces.go +++ b/pkg/ovs/openflow/interfaces.go @@ -18,6 +18,7 @@ import ( "net" "time" + "antrea.io/libOpenflow/util" "antrea.io/ofnet/ofctrl" ) @@ -46,6 +47,7 @@ const ( ProtocolSCTPv6 Protocol = "sctpv6" ProtocolICMP Protocol = "icmp" ProtocolICMPv6 Protocol = "icmpv6" + ProtocolIGMP Protocol = "igmp" ) const ( @@ -382,6 +384,7 @@ type PacketOutBuilder interface { AddLoadAction(name string, data uint64, rng *Range) PacketOutBuilder AddLoadRegMark(mark *RegMark) PacketOutBuilder AddResubmitAction(inPort *uint16, table *uint8) PacketOutBuilder + SetL4Packet(packet util.Message) PacketOutBuilder Done() *ofctrl.PacketOut } diff --git a/pkg/ovs/openflow/ofctrl_builder.go b/pkg/ovs/openflow/ofctrl_builder.go index eb7cc48b153..fd87cf50755 100644 --- a/pkg/ovs/openflow/ofctrl_builder.go +++ b/pkg/ovs/openflow/ofctrl_builder.go @@ -486,6 +486,9 @@ func (b *ofFlowBuilder) MatchProtocol(protocol Protocol) FlowBuilder { case ProtocolICMPv6: b.Match.Ethertype = 0x86dd b.Match.IpProto = 58 + case ProtocolIGMP: + b.Match.Ethertype = 0x0800 + b.Match.IpProto = 2 } b.protocol = protocol return b diff --git a/pkg/ovs/openflow/ofctrl_packetout.go b/pkg/ovs/openflow/ofctrl_packetout.go index e0ecb510644..8f40f2debe8 100644 --- a/pkg/ovs/openflow/ofctrl_packetout.go +++ b/pkg/ovs/openflow/ofctrl_packetout.go @@ -20,6 +20,7 @@ import ( "net" "antrea.io/libOpenflow/protocol" + "antrea.io/libOpenflow/util" "antrea.io/ofnet/ofctrl" "k8s.io/klog/v2" ) @@ -103,6 +104,8 @@ func (b *ofPacketOutBuilder) SetIPProtocol(proto Protocol) PacketOutBuilder { b.pktOut.IPHeader.Protocol = 0x84 case ProtocolICMP: b.pktOut.IPHeader.Protocol = protocol.Type_ICMP + case ProtocolIGMP: + b.pktOut.IPHeader.Protocol = protocol.Type_IGMP default: b.pktOut.IPHeader.Protocol = 0xff } @@ -286,6 +289,13 @@ func (b *ofPacketOutBuilder) SetOutport(outport uint32) PacketOutBuilder { return 
b } +// SetL4Packet sets the L4 packet of the packetOut message. It provides a generic function to create a packet +// of other protocols other than TCP/UDP/ICMP. +func (b *ofPacketOutBuilder) SetL4Packet(packet util.Message) PacketOutBuilder { + b.pktOut.IPHeader.Data = packet + return b +} + // AddLoadAction loads the data to the target field at specified range when the packet is received by OVS Switch. func (b *ofPacketOutBuilder) AddLoadAction(name string, data uint64, rng *Range) PacketOutBuilder { act, _ := ofctrl.NewNXLoadAction(name, data, rng.ToNXRange()) @@ -330,6 +340,18 @@ func (b *ofPacketOutBuilder) Done() *ofctrl.PacketOut { b.pktOut.UDPHeader.Length = b.pktOut.UDPHeader.Len() b.pktOut.UDPHeader.Checksum = b.udpHeaderChecksum() b.pktOut.IPHeader.Length = 20 + b.pktOut.UDPHeader.Len() + } else if b.pktOut.IPHeader.Protocol == protocol.Type_IGMP { + if igmpv1or2, ok := b.pktOut.IPHeader.Data.(*protocol.IGMPv1or2); ok { + igmpv1or2.Checksum = 0 + igmpv1or2.Checksum = b.igmpHeaderChecksum() + } else if igmpv3Query, ok := b.pktOut.IPHeader.Data.(*protocol.IGMPv3Query); ok { + igmpv3Query.Checksum = 0 + igmpv3Query.Checksum = b.igmpHeaderChecksum() + } else if igmpv3Report, ok := b.pktOut.IPHeader.Data.(*protocol.IGMPv3MembershipReport); ok { + igmpv3Report.Checksum = 0 + igmpv3Report.Checksum = b.igmpHeaderChecksum() + } + b.pktOut.IPHeader.Length = 20 + b.pktOut.IPHeader.Data.Len() } if b.pktOut.IPHeader.Id == 0 { // #nosec G404: random number generator not used for security purposes @@ -421,6 +443,12 @@ func (b *ofPacketOutBuilder) udpHeaderChecksum() uint16 { return checksum } +func (b *ofPacketOutBuilder) igmpHeaderChecksum() uint16 { + data, _ := b.pktOut.IPHeader.Data.MarshalBinary() + checksum := checksum(data) + return checksum +} + func (b *ofPacketOutBuilder) generatePseudoHeader(length uint16) []byte { var pseudoHeader []byte if b.pktOut.IPv6Header == nil { diff --git a/test/integration/agent/openflow_test.go b/test/integration/agent/openflow_test.go index 8d44ca85bdd..0b36cefea4c 100644 --- a/test/integration/agent/openflow_test.go +++ b/test/integration/agent/openflow_test.go @@ -116,7 +116,7 @@ func TestConnectivityFlows(t *testing.T) { antrearuntime.WindowsOS = runtime.GOOS } - c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, true, false, true, false, false, false) + c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, true, false, true, false, false, false, false) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge: %v", err)) defer func() { @@ -160,7 +160,7 @@ func TestAntreaFlexibleIPAMConnectivityFlows(t *testing.T) { // Initialize ovs metrics (Prometheus) to test them metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, true, false, true, false, false, true) + c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, true, false, true, false, false, true, false) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge: %v", err)) defer func() { @@ -202,7 +202,7 @@ func TestAntreaFlexibleIPAMConnectivityFlows(t *testing.T) { } func TestReplayFlowsConnectivityFlows(t *testing.T) { - c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, true, false, false, false, false, false) + c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, true, false, false, false, false, false, false) err := ofTestUtils.PrepareOVSBridge(br) 
require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge: %v", err)) @@ -238,7 +238,7 @@ func TestReplayFlowsConnectivityFlows(t *testing.T) { } func TestReplayFlowsNetworkPolicyFlows(t *testing.T) { - c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, true, false, false, false, false, false) + c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, true, false, false, false, false, false, false) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge: %v", err)) @@ -424,7 +424,7 @@ func TestNetworkPolicyFlows(t *testing.T) { // Initialize ovs metrics (Prometheus) to test them metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, true, false, false, false, false, false) + c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, true, false, false, false, false, false, false) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge %s", br)) @@ -534,7 +534,7 @@ func TestIPv6ConnectivityFlows(t *testing.T) { // Initialize ovs metrics (Prometheus) to test them metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, true, false, true, false, false, false) + c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, true, false, true, false, false, false, false) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge: %v", err)) @@ -576,7 +576,7 @@ type svcConfig struct { } func TestProxyServiceFlows(t *testing.T) { - c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, true, false, false, false, false, false) + c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, true, false, false, false, false, false, false) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge %s", br)) @@ -1515,7 +1515,7 @@ func prepareSNATFlows(snatIP net.IP, mark, podOFPort, podOFPortRemote uint32, vM } func TestSNATFlows(t *testing.T) { - c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, false, false, true, false, false, false) + c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, false, false, true, false, false, false, false) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge %s", br))
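Editor's note on the packet-in changes in pkg/agent/openflow/packetin.go and the new CustomReasonIGMP bit in pipeline.go: because the OVS "controller" action only handles reasons 0 and 1 correctly, IGMP packet-ins reuse PacketInReasonNP, so the agent must tell multicast packet-ins apart from NetworkPolicy ones via the custom-reason bits loaded into the register (CustomReasonIGMPRegMark is loaded before SendToController). The sketch below is not Antrea's handler code; it only illustrates the bitmask demultiplexing, assuming the custom-reason field has been read out of the packet-in as a plain uint32.

```go
package main

import "fmt"

const (
	// Bit values taken from the diff above; the enclosing register field layout is
	// not shown here and is treated as an opaque bitmask.
	CustomReasonDeny = 0b100
	CustomReasonDNS  = 0b1000
	CustomReasonIGMP = 0b10000
)

// dispatch returns the (hypothetical) handler names that should process a packet-in
// carrying the given custom-reason bits. Several bits may be set on one packet.
func dispatch(customReason uint32) []string {
	var handlers []string
	if customReason&CustomReasonDeny != 0 {
		handlers = append(handlers, "networkpolicy")
	}
	if customReason&CustomReasonDNS != 0 {
		handlers = append(handlers, "dns")
	}
	if customReason&CustomReasonIGMP != 0 {
		handlers = append(handlers, "multicast")
	}
	return handlers
}

func main() {
	fmt.Println(dispatch(CustomReasonDeny | CustomReasonDNS)) // [networkpolicy dns]
	fmt.Println(dispatch(CustomReasonIGMP))                   // [multicast]
}
```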
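On the generatePipeline changes in pkg/agent/openflow/pipeline.go: the Multicast table (ID 22) is spliced in right after SpoofGuard by rewriting next-table pointers with SetNext instead of re-creating tables, and its own miss action falls through to ConntrackZone, or to ServiceHairpin when AntreaProxy is enabled. A minimal standalone model of that wiring, using toy types rather than the binding package:

```go
package main

import "fmt"

// table is a toy stand-in for binding.Table: it only tracks its ID, name and the
// table the pipeline jumps to next.
type table struct {
	id   uint8
	name string
	next uint8
}

func (t *table) SetNext(id uint8) { t.next = id }

func main() {
	spoofGuard := &table{id: 10, name: "SpoofGuard"}
	multicast := &table{id: 22, name: "Multicast"}
	serviceHairpin := &table{id: 23, name: "ServiceHairpin"}
	conntrack := &table{id: 30, name: "ConntrackZone"}

	enableProxy, enableMulticast := true, true

	// Default wiring: SpoofGuard -> ConntrackZone.
	spoofGuard.SetNext(conntrack.id)
	if enableProxy {
		// With AntreaProxy, SpoofGuard goes through ServiceHairpin first.
		spoofGuard.SetNext(serviceHairpin.id)
	}
	if enableMulticast {
		// Multicast is inserted directly after SpoofGuard; on a miss it falls
		// through to ConntrackZone, or to ServiceHairpin when the proxy is enabled.
		spoofGuard.SetNext(multicast.id)
		multicast.SetNext(conntrack.id)
		if enableProxy {
			multicast.SetNext(serviceHairpin.id)
		}
	}

	for _, t := range []*table{spoofGuard, multicast} {
		fmt.Printf("%s(%d) -> next table %d\n", t.name, t.id, t.next)
	}
}
```

With both features enabled this prints SpoofGuard(10) -> next table 22 and Multicast(22) -> next table 23, matching the table order set up by the diff.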
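The new multicast flows key on two well-known IGMP destinations, 224.0.0.2 (all routers) and 224.0.0.22 (IGMPv3 reports), plus a destination CIDR for general multicast traffic. For reference, Go's standard library can classify these addresses directly; the full IPv4 multicast block is 224.0.0.0/4, with 224.0.0.0/24 reserved for link-local control traffic such as IGMP:

```go
package main

import (
	"fmt"
	"net"
)

func main() {
	_, mcastBlock, _ := net.ParseCIDR("224.0.0.0/4") // full IPv4 multicast range
	for _, s := range []string{"224.0.0.2", "224.0.0.22", "239.255.255.250", "10.0.0.1"} {
		ip := net.ParseIP(s)
		fmt.Printf("%-16s IsMulticast=%v inBlock=%v linkLocalMulticast=%v\n",
			s, ip.IsMulticast(), mcastBlock.Contains(ip), ip.IsLinkLocalMulticast())
	}
}
```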
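igmpHeaderChecksum in pkg/ovs/openflow/ofctrl_packetout.go zeroes the Checksum field and then reuses the builder's existing checksum helper over the marshalled IGMP message; IGMP uses the standard RFC 1071 Internet checksum, the same one's-complement sum as ICMP. A self-contained version of that computation (the function name and the sample query bytes below are illustrative, not taken from the repository):

```go
package main

import "fmt"

// internetChecksum implements the RFC 1071 one's-complement checksum used for
// IGMP (and ICMP) headers: sum 16-bit big-endian words, fold the carries, invert.
func internetChecksum(data []byte) uint16 {
	var sum uint32
	for i := 0; i+1 < len(data); i += 2 {
		sum += uint32(data[i])<<8 | uint32(data[i+1])
	}
	if len(data)%2 == 1 {
		sum += uint32(data[len(data)-1]) << 8
	}
	for sum>>16 != 0 {
		sum = (sum & 0xffff) + (sum >> 16)
	}
	return ^uint16(sum)
}

func main() {
	// IGMPv2 Membership Query (type 0x11, max resp time 100) for 224.0.0.1,
	// with the checksum field (bytes 2-3) zeroed before computing it.
	query := []byte{0x11, 0x64, 0x00, 0x00, 224, 0, 0, 1}
	fmt.Printf("checksum: 0x%04x\n", internetChecksum(query))
}
```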
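The regenerated mock in pkg/agent/openflow/testing adds InstallMulticastFlow, InstallMulticastInitialFlows, UninstallMulticastFlow and SendIGMPQueryPacketOut. A unit test driving a multicast controller could set expectations on them roughly as below; the test name, the NewMockClient constructor and the direct calls standing in for the code under test are assumptions based on standard mockgen/gomock output, while the mocked method names and signatures come from the diff:

```go
package multicast_test

import (
	"net"
	"testing"

	"github.com/golang/mock/gomock"

	oftest "antrea.io/antrea/pkg/agent/openflow/testing"
)

func TestMulticastFlowExpectations(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	mockOFClient := oftest.NewMockClient(ctrl) // usual mockgen constructor, assumed here
	groupIP := net.ParseIP("224.1.2.3")

	// Expect the IGMP packet-in flows to be installed once with the shared packet-in
	// reason, and a per-group forwarding flow to be installed for the joined group.
	mockOFClient.EXPECT().InstallMulticastInitialFlows(gomock.Any()).Return(nil).Times(1)
	mockOFClient.EXPECT().InstallMulticastFlow(groupIP).Return(nil).Times(1)

	// Stand-ins for the controller logic that would normally make these calls.
	if err := mockOFClient.InstallMulticastInitialFlows(0); err != nil {
		t.Fatal(err)
	}
	if err := mockOFClient.InstallMulticastFlow(groupIP); err != nil {
		t.Fatal(err)
	}
}
```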