Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bugfix] Install Multicast related iptables rules only on IPv4 chains #6123

Merged
merged 1 commit into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -818,7 +818,9 @@ func run(o *Options) error {
validator,
networkConfig.TrafficEncapMode.SupportsEncap(),
nodeInformer,
enableBridgingMode)
enableBridgingMode,
v4Enabled,
v6Enabled)
if err := mcastController.Initialize(); err != nil {
return err
}
Expand Down
19 changes: 18 additions & 1 deletion pkg/agent/multicast/mcast_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package multicast

import (
"fmt"
"net"
"sync"
"time"
Expand Down Expand Up @@ -265,6 +266,13 @@ type Controller struct {
installedNodes sets.Set[string]
encapEnabled bool
flexibleIPAMEnabled bool
// ipv4Enabled is the flag that if it is running on IPv4 cluster. An error is returned if IPv4Enabled is false
// in Initialize as Multicast does not support IPv6 for now.
// TODO: remove this flag after IPv6 is supported in Multicast.
ipv4Enabled bool
antoninbas marked this conversation as resolved.
Show resolved Hide resolved
// ipv6Enabled is the flag that if it is running on IPv6 cluster.
// TODO: remove this flag after IPv6 is supported in Multicast.
ipv6Enabled bool
}

func NewMulticastController(ofClient openflow.Client,
Expand All @@ -279,7 +287,9 @@ func NewMulticastController(ofClient openflow.Client,
validator types.McastNetworkPolicyController,
isEncap bool,
nodeInformer coreinformers.NodeInformer,
enableFlexibleIPAM bool) *Controller {
enableFlexibleIPAM bool,
ipv4Enabled bool,
ipv6Enabled bool) *Controller {
eventCh := make(chan *mcastGroupEvent, workerCount)
groupSnooper := newSnooper(ofClient, ifaceStore, eventCh, igmpQueryInterval, igmpQueryVersions, validator, isEncap)
groupCache := cache.NewIndexer(getGroupEventKey, cache.Indexers{
Expand All @@ -303,6 +313,8 @@ func NewMulticastController(ofClient openflow.Client,
queryGroupId: v4GroupAllocator.Allocate(),
encapEnabled: isEncap,
flexibleIPAMEnabled: enableFlexibleIPAM,
ipv4Enabled: ipv4Enabled,
ipv6Enabled: ipv6Enabled,
}
if isEncap {
c.nodeGroupID = v4GroupAllocator.Allocate()
Expand Down Expand Up @@ -331,6 +343,11 @@ func NewMulticastController(ofClient openflow.Client,
}

func (c *Controller) Initialize() error {
if !c.ipv4Enabled {
return fmt.Errorf("Multicast is not supported on an IPv6-only cluster")
} else if c.ipv6Enabled {
klog.InfoS("Multicast only works with IPv4 traffic on a dual-stack cluster")
}
err := c.mRouteClient.Initialize()
if err != nil {
return err
Expand Down
72 changes: 56 additions & 16 deletions pkg/agent/multicast/mcast_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestAddGroupMemberStatus(t *testing.T) {
iface: if1,
}
mctrl := newMockMulticastController(t, false, false)
err := mctrl.initialize(t)
err := mctrl.initialize()
mctrl.mRouteClient.multicastInterfaceConfigs = []multicastInterfaceConfig{
{Name: if1.InterfaceName, IPv4Addr: &net.IPNet{IP: nodeIf1IP, Mask: net.IPv4Mask(255, 255, 255, 0)}},
}
Expand All @@ -115,7 +115,7 @@ func TestAddGroupMemberStatus(t *testing.T) {

func TestUpdateGroupMemberStatus(t *testing.T) {
mctrl := newMockMulticastController(t, false, false)
err := mctrl.initialize(t)
err := mctrl.initialize()
assert.NoError(t, err)
mgroup := net.ParseIP("224.96.1.4")
event := &mcastGroupEvent{
Expand Down Expand Up @@ -181,7 +181,7 @@ func TestUpdateGroupMemberStatus(t *testing.T) {

func TestCheckNodeUpdate(t *testing.T) {
mockController := newMockMulticastController(t, false, false)
err := mockController.initialize(t)
err := mockController.initialize()
require.NoError(t, err)

for _, tc := range []struct {
Expand Down Expand Up @@ -349,7 +349,7 @@ func TestGetGroupPods(t *testing.T) {
now := time.Now()

mctrl := newMockMulticastController(t, false, false)
err := mctrl.initialize(t)
err := mctrl.initialize()
require.NoError(t, err)
groupMemberStatuses := []*GroupMemberStatus{
{
Expand Down Expand Up @@ -385,7 +385,7 @@ func TestGetGroupPods(t *testing.T) {

func TestGetPodStats(t *testing.T) {
mctrl := newMockMulticastController(t, false, false)
err := mctrl.initialize(t)
err := mctrl.initialize()
require.NoError(t, err)

iface := if1
Expand All @@ -402,7 +402,7 @@ func TestGetPodStats(t *testing.T) {

func TestGetAllPodStats(t *testing.T) {
mctrl := newMockMulticastController(t, false, false)
err := mctrl.initialize(t)
err := mctrl.initialize()
require.NoError(t, err)

for _, tc := range []struct {
Expand Down Expand Up @@ -447,7 +447,7 @@ func TestGetAllPodStats(t *testing.T) {
func TestClearStaleGroupsCreatingLeaveEvent(t *testing.T) {
mctrl := newMockMulticastController(t, false, false)
workerCount = 1
err := mctrl.initialize(t)
err := mctrl.initialize()
require.NoError(t, err)
now := time.Now()
staleTime := now.Add(-mctrl.mcastGroupTimeout - time.Second)
Expand Down Expand Up @@ -483,7 +483,7 @@ func TestClearStaleGroupsCreatingLeaveEvent(t *testing.T) {
func TestClearStaleGroups(t *testing.T) {
mctrl := newMockMulticastController(t, false, false)
workerCount = 1
err := mctrl.initialize(t)
err := mctrl.initialize()
require.NoError(t, err)
mctrl.mRouteClient.multicastInterfaceConfigs = []multicastInterfaceConfig{
{Name: if1.InterfaceName, IPv4Addr: &net.IPNet{IP: nodeIf1IP, Mask: net.IPv4Mask(255, 255, 255, 0)}},
Expand Down Expand Up @@ -734,13 +734,13 @@ func TestProcessPacketIn(t *testing.T) {
func TestEncapModeInitialize(t *testing.T) {
mockController := newMockMulticastController(t, true, false)
assert.NotZero(t, mockController.nodeGroupID)
err := mockController.initialize(t)
err := mockController.initialize()
assert.NoError(t, err)
}

func TestEncapLocalReportAndNotifyRemote(t *testing.T) {
mockController := newMockMulticastController(t, true, false)
_ = mockController.initialize(t)
_ = mockController.initialize()
mockController.mRouteClient.multicastInterfaceConfigs = []multicastInterfaceConfig{
{Name: if1.InterfaceName, IPv4Addr: &net.IPNet{IP: nodeIf1IP, Mask: net.IPv4Mask(255, 255, 255, 0)}},
}
Expand Down Expand Up @@ -942,7 +942,7 @@ func TestNodeUpdate(t *testing.T) {

func TestMemberChanged(t *testing.T) {
mockController := newMockMulticastController(t, false, false)
_ = mockController.initialize(t)
_ = mockController.initialize()

containerA := &interfacestore.ContainerInterfaceConfig{PodNamespace: "nameA", PodName: "podA", ContainerID: "tttt"}
containerB := &interfacestore.ContainerInterfaceConfig{PodNamespace: "nameA", PodName: "podB", ContainerID: "mmmm"}
Expand Down Expand Up @@ -1085,7 +1085,7 @@ func TestConcurrentEventHandlerAndWorkers(t *testing.T) {

func TestRemoteMemberJoinLeave(t *testing.T) {
mockController := newMockMulticastController(t, true, false)
_ = mockController.initialize(t)
_ = mockController.initialize()
stopCh := make(chan struct{})
defer close(stopCh)

Expand Down Expand Up @@ -1255,17 +1255,58 @@ func newMockMulticastController(t *testing.T, isEncap bool, enableFlexibleIPAM b
clientset = fake.NewSimpleClientset()
informerFactory = informers.NewSharedInformerFactory(clientset, 12*time.Hour)
nodeInformer := informerFactory.Core().V1().Nodes()
mctrl := NewMulticastController(mockOFClient, groupAllocator, nodeConfig, mockIfaceStore, mockMulticastSocket, sets.New[string](), podUpdateSubscriber, time.Second*5, []uint8{1, 2, 3}, mockMulticastValidator, isEncap, nodeInformer, enableFlexibleIPAM)
mctrl := NewMulticastController(mockOFClient, groupAllocator, nodeConfig, mockIfaceStore, mockMulticastSocket, sets.New[string](), podUpdateSubscriber, time.Second*5, []uint8{1, 2, 3}, mockMulticastValidator, isEncap, nodeInformer, enableFlexibleIPAM, true, false)
return mctrl
}

func TestFlexibleIPAMModeInitialize(t *testing.T) {
mockController := newMockMulticastController(t, false, true)
err := mockController.initialize(t)
err := mockController.initialize()
assert.NoError(t, err)
}

func (c *Controller) initialize(t *testing.T) error {
func TestMulticastControllerOnIPv6Cluster(t *testing.T) {
for _, tc := range []struct {
name string
ipv4Enabled bool
ipv6Enabled bool
expErr string
}{
{
name: "Fails on IPv6-only cluster",
ipv4Enabled: false,
ipv6Enabled: true,
expErr: "Multicast is not supported on an IPv6-only cluster",
},
{
name: "Succeeds on dual-stack cluster",
ipv4Enabled: true,
ipv6Enabled: true,
},
} {
t.Run(tc.name, func(t *testing.T) {
mockController := newMockMulticastController(t, true, false)
mockController.ipv4Enabled = tc.ipv4Enabled
mockController.ipv6Enabled = tc.ipv6Enabled
if tc.expErr == "" {
mockController.initMocks()
}
err := mockController.Initialize()
Comment on lines +1291 to +1294
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we just call err := mockController.initialize() unconditionally here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, there are some mock functions inside mockController.initialize(), which are called only when mockController.Initialize() returns nil. If we call it (initialize not Initialize) unconditionally, the IPv6-only case would fail as those mock expects are missing.

if tc.expErr != "" {
assert.EqualError(t, err, tc.expErr)
} else {
assert.NoError(t, err)
}
})
}
}

func (c *Controller) initialize() error {
c.initMocks()
return c.Initialize()
}

func (c *Controller) initMocks() {
mockOFClient.EXPECT().InstallMulticastGroup(c.queryGroupId, gomock.Any(), gomock.Any()).Times(1)
mockOFClient.EXPECT().InstallMulticastFlows(gomock.Any(), gomock.Any())
mockIfaceStore.EXPECT().GetInterfacesByType(interfacestore.InterfaceType(0)).Times(1).Return([]*interfacestore.InterfaceConfig{})
Expand All @@ -1278,7 +1319,6 @@ func (c *Controller) initialize(t *testing.T) error {
if c.flexibleIPAMEnabled {
mockOFClient.EXPECT().InstallMulticastFlexibleIPAMFlows().Times(1)
}
return c.Initialize()
}

func createInterface(name string, ofport uint32) *interfacestore.InterfaceConfig {
Expand Down
11 changes: 8 additions & 3 deletions pkg/agent/route/route_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ type Client struct {
nodePortsIPv6 sync.Map
// clusterNodeIPs stores the IPv4 of all other Nodes in the cluster
clusterNodeIPs sync.Map
// clusterNodeIP6s stores the IPv6 of all other Nodes in the cluster
// clusterNodeIP6s stores the IPv6 address of all other Nodes in the cluster. It is maintained but not consumed
// until Multicast supports IPv6.
clusterNodeIP6s sync.Map
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need clusterNodeIP6s when implementing Multicast for IPv6 in the future?
If yes, perhaps don't remove it, just remove its consumer code and add a comment that it's retained for future Multicast support to avoid back and forth change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shall use it in the future after Multicast is supported with IPv6. Do you mean we can keep the publisher to add Node IPv6 address into this map, but not consume the items?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, just add the comment why it's maintained but not used currently.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated.

// egressRoutes caches ip routes about Egresses.
egressRoutes sync.Map
Expand Down Expand Up @@ -709,7 +710,9 @@ func (c *Client) restoreIptablesData(podCIDR *net.IPNet,
}...)
}

if c.multicastEnabled && c.networkConfig.TrafficEncapMode.SupportsEncap() {
// Note: Multicast can only work with IPv4 for now. Remove condition "!isIPv6" in the future after
// IPv6 is supported.
if c.multicastEnabled && !isIPv6 && c.networkConfig.TrafficEncapMode.SupportsEncap() {
// Drop the multicast packets forwarded from other Nodes in the cluster. This is because
// the packet sent out from the sender Pod is already received via tunnel port with encap mode,
// and the one forwarded via the underlay network is to send to external receivers
Expand Down Expand Up @@ -832,7 +835,9 @@ func (c *Client) restoreIptablesData(podCIDR *net.IPNet,
writeLine(iptablesData, iptables.MakeChainLine(antreaPostRoutingChain))
// The masqueraded multicast traffic will become unicast so we
// stop traversing this antreaPostRoutingChain for multicast traffic.
if c.multicastEnabled && c.networkConfig.TrafficEncapMode.SupportsNoEncap() {
// Note: Multicast can only work with IPv4 for now. Remove condition "!isIPv6" in the future after
// IPv6 is supported.
if c.multicastEnabled && !isIPv6 && c.networkConfig.TrafficEncapMode.SupportsNoEncap() {
writeLine(iptablesData, []string{
"-A", antreaPostRoutingChain,
"-m", "comment", "--comment", `"Antrea: skip masquerade for multicast traffic"`,
Expand Down
1 change: 0 additions & 1 deletion pkg/agent/route/route_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,6 @@ COMMIT
:ANTREA-OUTPUT - [0:0]
-A ANTREA-PREROUTING -m comment --comment "Antrea: do not track incoming encapsulation packets" -m udp -p udp --dport 6081 -m addrtype --dst-type LOCAL -j NOTRACK
-A ANTREA-OUTPUT -m comment --comment "Antrea: do not track outgoing encapsulation packets" -m udp -p udp --dport 6081 -m addrtype --src-type LOCAL -j NOTRACK
-A ANTREA-PREROUTING -m comment --comment "Antrea: drop Pod multicast traffic forwarded via underlay network" -m set --match-set CLUSTER-NODE-IP6 src -d 224.0.0.0/4 -j DROP
COMMIT
*mangle
:ANTREA-MANGLE - [0:0]
Expand Down
Loading