From 29d85b61a6feea062b55c3e59497f852f5e4b1a8 Mon Sep 17 00:00:00 2001 From: Wenying Dong Date: Wed, 20 Mar 2024 11:23:24 +0800 Subject: [PATCH] [Bugfix] Install Multicast related iptables rules only on IPv4 cluster Add a pre-check on the Multicast feature gate status with IPv6-only cluster settings in agent Initializer, and install the iptables rules only in the IPv4 related chains. Signed-off-by: Wenying Dong --- cmd/antrea-agent/agent.go | 4 +- pkg/agent/multicast/mcast_controller.go | 19 +++++- pkg/agent/multicast/mcast_controller_test.go | 72 +++++++++++++++----- pkg/agent/route/route_linux.go | 11 ++- pkg/agent/route/route_linux_test.go | 1 - 5 files changed, 85 insertions(+), 22 deletions(-) diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 7c663f57e89..a912db846a6 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -818,7 +818,9 @@ func run(o *Options) error { validator, networkConfig.TrafficEncapMode.SupportsEncap(), nodeInformer, - enableBridgingMode) + enableBridgingMode, + v4Enabled, + v6Enabled) if err := mcastController.Initialize(); err != nil { return err } diff --git a/pkg/agent/multicast/mcast_controller.go b/pkg/agent/multicast/mcast_controller.go index 9de45eff092..653e21db750 100644 --- a/pkg/agent/multicast/mcast_controller.go +++ b/pkg/agent/multicast/mcast_controller.go @@ -15,6 +15,7 @@ package multicast import ( + "fmt" "net" "sync" "time" @@ -265,6 +266,13 @@ type Controller struct { installedNodes sets.Set[string] encapEnabled bool flexibleIPAMEnabled bool + // ipv4Enabled is the flag that if it is running on IPv4 cluster. An error is returned if IPv4Enabled is false + // in Initialize as Multicast does not support IPv6 for now. + // TODO: remove this flag after IPv6 is supported in Multicast. + ipv4Enabled bool + // ipv6Enabled is the flag that if it is running on IPv6 cluster. + // TODO: remove this flag after IPv6 is supported in Multicast. + ipv6Enabled bool } func NewMulticastController(ofClient openflow.Client, @@ -279,7 +287,9 @@ func NewMulticastController(ofClient openflow.Client, validator types.McastNetworkPolicyController, isEncap bool, nodeInformer coreinformers.NodeInformer, - enableFlexibleIPAM bool) *Controller { + enableFlexibleIPAM bool, + ipv4Enabled bool, + ipv6Enabled bool) *Controller { eventCh := make(chan *mcastGroupEvent, workerCount) groupSnooper := newSnooper(ofClient, ifaceStore, eventCh, igmpQueryInterval, igmpQueryVersions, validator, isEncap) groupCache := cache.NewIndexer(getGroupEventKey, cache.Indexers{ @@ -303,6 +313,8 @@ func NewMulticastController(ofClient openflow.Client, queryGroupId: v4GroupAllocator.Allocate(), encapEnabled: isEncap, flexibleIPAMEnabled: enableFlexibleIPAM, + ipv4Enabled: ipv4Enabled, + ipv6Enabled: ipv6Enabled, } if isEncap { c.nodeGroupID = v4GroupAllocator.Allocate() @@ -331,6 +343,11 @@ func NewMulticastController(ofClient openflow.Client, } func (c *Controller) Initialize() error { + if !c.ipv4Enabled { + return fmt.Errorf("Multicast is not supported on an IPv6-only cluster") + } else if c.ipv6Enabled { + klog.InfoS("Multicast only works with IPv4 traffic on a dual-stack cluster") + } err := c.mRouteClient.Initialize() if err != nil { return err diff --git a/pkg/agent/multicast/mcast_controller_test.go b/pkg/agent/multicast/mcast_controller_test.go index 2956e6d1ea7..e8f61a84767 100644 --- a/pkg/agent/multicast/mcast_controller_test.go +++ b/pkg/agent/multicast/mcast_controller_test.go @@ -92,7 +92,7 @@ func TestAddGroupMemberStatus(t *testing.T) { iface: if1, } mctrl := newMockMulticastController(t, false, false) - err := mctrl.initialize(t) + err := mctrl.initialize() mctrl.mRouteClient.multicastInterfaceConfigs = []multicastInterfaceConfig{ {Name: if1.InterfaceName, IPv4Addr: &net.IPNet{IP: nodeIf1IP, Mask: net.IPv4Mask(255, 255, 255, 0)}}, } @@ -115,7 +115,7 @@ func TestAddGroupMemberStatus(t *testing.T) { func TestUpdateGroupMemberStatus(t *testing.T) { mctrl := newMockMulticastController(t, false, false) - err := mctrl.initialize(t) + err := mctrl.initialize() assert.NoError(t, err) mgroup := net.ParseIP("224.96.1.4") event := &mcastGroupEvent{ @@ -181,7 +181,7 @@ func TestUpdateGroupMemberStatus(t *testing.T) { func TestCheckNodeUpdate(t *testing.T) { mockController := newMockMulticastController(t, false, false) - err := mockController.initialize(t) + err := mockController.initialize() require.NoError(t, err) for _, tc := range []struct { @@ -349,7 +349,7 @@ func TestGetGroupPods(t *testing.T) { now := time.Now() mctrl := newMockMulticastController(t, false, false) - err := mctrl.initialize(t) + err := mctrl.initialize() require.NoError(t, err) groupMemberStatuses := []*GroupMemberStatus{ { @@ -385,7 +385,7 @@ func TestGetGroupPods(t *testing.T) { func TestGetPodStats(t *testing.T) { mctrl := newMockMulticastController(t, false, false) - err := mctrl.initialize(t) + err := mctrl.initialize() require.NoError(t, err) iface := if1 @@ -402,7 +402,7 @@ func TestGetPodStats(t *testing.T) { func TestGetAllPodStats(t *testing.T) { mctrl := newMockMulticastController(t, false, false) - err := mctrl.initialize(t) + err := mctrl.initialize() require.NoError(t, err) for _, tc := range []struct { @@ -447,7 +447,7 @@ func TestGetAllPodStats(t *testing.T) { func TestClearStaleGroupsCreatingLeaveEvent(t *testing.T) { mctrl := newMockMulticastController(t, false, false) workerCount = 1 - err := mctrl.initialize(t) + err := mctrl.initialize() require.NoError(t, err) now := time.Now() staleTime := now.Add(-mctrl.mcastGroupTimeout - time.Second) @@ -483,7 +483,7 @@ func TestClearStaleGroupsCreatingLeaveEvent(t *testing.T) { func TestClearStaleGroups(t *testing.T) { mctrl := newMockMulticastController(t, false, false) workerCount = 1 - err := mctrl.initialize(t) + err := mctrl.initialize() require.NoError(t, err) mctrl.mRouteClient.multicastInterfaceConfigs = []multicastInterfaceConfig{ {Name: if1.InterfaceName, IPv4Addr: &net.IPNet{IP: nodeIf1IP, Mask: net.IPv4Mask(255, 255, 255, 0)}}, @@ -734,13 +734,13 @@ func TestProcessPacketIn(t *testing.T) { func TestEncapModeInitialize(t *testing.T) { mockController := newMockMulticastController(t, true, false) assert.NotZero(t, mockController.nodeGroupID) - err := mockController.initialize(t) + err := mockController.initialize() assert.NoError(t, err) } func TestEncapLocalReportAndNotifyRemote(t *testing.T) { mockController := newMockMulticastController(t, true, false) - _ = mockController.initialize(t) + _ = mockController.initialize() mockController.mRouteClient.multicastInterfaceConfigs = []multicastInterfaceConfig{ {Name: if1.InterfaceName, IPv4Addr: &net.IPNet{IP: nodeIf1IP, Mask: net.IPv4Mask(255, 255, 255, 0)}}, } @@ -942,7 +942,7 @@ func TestNodeUpdate(t *testing.T) { func TestMemberChanged(t *testing.T) { mockController := newMockMulticastController(t, false, false) - _ = mockController.initialize(t) + _ = mockController.initialize() containerA := &interfacestore.ContainerInterfaceConfig{PodNamespace: "nameA", PodName: "podA", ContainerID: "tttt"} containerB := &interfacestore.ContainerInterfaceConfig{PodNamespace: "nameA", PodName: "podB", ContainerID: "mmmm"} @@ -1085,7 +1085,7 @@ func TestConcurrentEventHandlerAndWorkers(t *testing.T) { func TestRemoteMemberJoinLeave(t *testing.T) { mockController := newMockMulticastController(t, true, false) - _ = mockController.initialize(t) + _ = mockController.initialize() stopCh := make(chan struct{}) defer close(stopCh) @@ -1255,17 +1255,58 @@ func newMockMulticastController(t *testing.T, isEncap bool, enableFlexibleIPAM b clientset = fake.NewSimpleClientset() informerFactory = informers.NewSharedInformerFactory(clientset, 12*time.Hour) nodeInformer := informerFactory.Core().V1().Nodes() - mctrl := NewMulticastController(mockOFClient, groupAllocator, nodeConfig, mockIfaceStore, mockMulticastSocket, sets.New[string](), podUpdateSubscriber, time.Second*5, []uint8{1, 2, 3}, mockMulticastValidator, isEncap, nodeInformer, enableFlexibleIPAM) + mctrl := NewMulticastController(mockOFClient, groupAllocator, nodeConfig, mockIfaceStore, mockMulticastSocket, sets.New[string](), podUpdateSubscriber, time.Second*5, []uint8{1, 2, 3}, mockMulticastValidator, isEncap, nodeInformer, enableFlexibleIPAM, true, false) return mctrl } func TestFlexibleIPAMModeInitialize(t *testing.T) { mockController := newMockMulticastController(t, false, true) - err := mockController.initialize(t) + err := mockController.initialize() assert.NoError(t, err) } -func (c *Controller) initialize(t *testing.T) error { +func TestMulticastControllerOnIPv6Cluster(t *testing.T) { + for _, tc := range []struct { + name string + ipv4Enabled bool + ipv6Enabled bool + expErr string + }{ + { + name: "Fails on IPv6-only cluster", + ipv4Enabled: false, + ipv6Enabled: true, + expErr: "Multicast is not supported on an IPv6-only cluster", + }, + { + name: "Succeeds on dual-stack cluster", + ipv4Enabled: true, + ipv6Enabled: true, + }, + } { + t.Run(tc.name, func(t *testing.T) { + mockController := newMockMulticastController(t, true, false) + mockController.ipv4Enabled = tc.ipv4Enabled + mockController.ipv6Enabled = tc.ipv6Enabled + if tc.expErr == "" { + mockController.initMocks() + } + err := mockController.Initialize() + if tc.expErr != "" { + assert.EqualError(t, err, tc.expErr) + } else { + assert.NoError(t, err) + } + }) + } +} + +func (c *Controller) initialize() error { + c.initMocks() + return c.Initialize() +} + +func (c *Controller) initMocks() { mockOFClient.EXPECT().InstallMulticastGroup(c.queryGroupId, gomock.Any(), gomock.Any()).Times(1) mockOFClient.EXPECT().InstallMulticastFlows(gomock.Any(), gomock.Any()) mockIfaceStore.EXPECT().GetInterfacesByType(interfacestore.InterfaceType(0)).Times(1).Return([]*interfacestore.InterfaceConfig{}) @@ -1278,7 +1319,6 @@ func (c *Controller) initialize(t *testing.T) error { if c.flexibleIPAMEnabled { mockOFClient.EXPECT().InstallMulticastFlexibleIPAMFlows().Times(1) } - return c.Initialize() } func createInterface(name string, ofport uint32) *interfacestore.InterfaceConfig { diff --git a/pkg/agent/route/route_linux.go b/pkg/agent/route/route_linux.go index f18d5e4d424..31890415aa5 100644 --- a/pkg/agent/route/route_linux.go +++ b/pkg/agent/route/route_linux.go @@ -128,7 +128,8 @@ type Client struct { nodePortsIPv6 sync.Map // clusterNodeIPs stores the IPv4 of all other Nodes in the cluster clusterNodeIPs sync.Map - // clusterNodeIP6s stores the IPv6 of all other Nodes in the cluster + // clusterNodeIP6s stores the IPv6 address of all other Nodes in the cluster. It is maintained but not consumed + // until Multicast supports IPv6. clusterNodeIP6s sync.Map // egressRoutes caches ip routes about Egresses. egressRoutes sync.Map @@ -709,7 +710,9 @@ func (c *Client) restoreIptablesData(podCIDR *net.IPNet, }...) } - if c.multicastEnabled && c.networkConfig.TrafficEncapMode.SupportsEncap() { + // Note: Multicast can only work with IPv4 for now. Remove condition "!isIPv6" in the future after + // IPv6 is supported. + if c.multicastEnabled && !isIPv6 && c.networkConfig.TrafficEncapMode.SupportsEncap() { // Drop the multicast packets forwarded from other Nodes in the cluster. This is because // the packet sent out from the sender Pod is already received via tunnel port with encap mode, // and the one forwarded via the underlay network is to send to external receivers @@ -832,7 +835,9 @@ func (c *Client) restoreIptablesData(podCIDR *net.IPNet, writeLine(iptablesData, iptables.MakeChainLine(antreaPostRoutingChain)) // The masqueraded multicast traffic will become unicast so we // stop traversing this antreaPostRoutingChain for multicast traffic. - if c.multicastEnabled && c.networkConfig.TrafficEncapMode.SupportsNoEncap() { + // Note: Multicast can only work with IPv4 for now. Remove condition "!isIPv6" in the future after + // IPv6 is supported. + if c.multicastEnabled && !isIPv6 && c.networkConfig.TrafficEncapMode.SupportsNoEncap() { writeLine(iptablesData, []string{ "-A", antreaPostRoutingChain, "-m", "comment", "--comment", `"Antrea: skip masquerade for multicast traffic"`, diff --git a/pkg/agent/route/route_linux_test.go b/pkg/agent/route/route_linux_test.go index 417bc1613a1..ab266186509 100644 --- a/pkg/agent/route/route_linux_test.go +++ b/pkg/agent/route/route_linux_test.go @@ -402,7 +402,6 @@ COMMIT :ANTREA-OUTPUT - [0:0] -A ANTREA-PREROUTING -m comment --comment "Antrea: do not track incoming encapsulation packets" -m udp -p udp --dport 6081 -m addrtype --dst-type LOCAL -j NOTRACK -A ANTREA-OUTPUT -m comment --comment "Antrea: do not track outgoing encapsulation packets" -m udp -p udp --dport 6081 -m addrtype --src-type LOCAL -j NOTRACK --A ANTREA-PREROUTING -m comment --comment "Antrea: drop Pod multicast traffic forwarded via underlay network" -m set --match-set CLUSTER-NODE-IP6 src -d 224.0.0.0/4 -j DROP COMMIT *mangle :ANTREA-MANGLE - [0:0]