Skip to content

Commit

Permalink
Fix Egress not working with kube-proxy IPVS strictARP mode
Browse files Browse the repository at this point in the history
Check the arp_ignore sysctl value of the transport interface
and start a userspace ARP responder if it has a value other
than 0.

Fixes: #3804

Signed-off-by: Xu Liu <xliu2@vmware.com>
  • Loading branch information
xliuxu committed May 28, 2022
1 parent 5efac86 commit 2ded772
Show file tree
Hide file tree
Showing 6 changed files with 235 additions and 60 deletions.
62 changes: 54 additions & 8 deletions pkg/agent/ipassigner/ip_assigner_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"antrea.io/antrea/pkg/agent/ipassigner/responder"
"antrea.io/antrea/pkg/agent/util"
"antrea.io/antrea/pkg/agent/util/sysctl"
)

// ipAssigner creates a dummy device and assigns IPs to it.
Expand Down Expand Up @@ -60,11 +61,22 @@ func NewIPAssigner(nodeTransportInterface string, dummyDeviceName string) (*ipAs
assignedIPs: sets.NewString(),
}
if ipv4 != nil {
arpResonder, err := responder.NewARPResponder(externalInterface)
// For Egress scenario, the external IPs should always be present on the dummy
// interface as they are used as tunnel endpoints. If arp_ignore it set to values
// other than 0, the host will not reply to ARP requests received on the transport
// interface and the target IPs are assigned on the dummy interface. So a userspace
// ARP responder is needed to handle ARP requests for the Egress IPs.
arpIgnore, err := getARPIgnoreForInterface(externalInterface.Name)
if err != nil {
return nil, fmt.Errorf("failed to create ARP responder for link %s: %v", externalInterface.Name, err)
return nil, err
}
if dummyDeviceName == "" || arpIgnore > 0 {
arpResonder, err := responder.NewARPResponder(externalInterface)
if err != nil {
return nil, fmt.Errorf("failed to create ARP responder for link %s: %v", externalInterface.Name, err)
}
a.arpResponder = arpResonder
}
a.arpResponder = arpResonder
}
if ipv6 != nil {
ndpResponder, err := responder.NewNDPResponder(externalInterface)
Expand All @@ -83,9 +95,47 @@ func NewIPAssigner(nodeTransportInterface string, dummyDeviceName string) (*ipAs
return nil, fmt.Errorf("error when loading IP addresses from the system: %v", err)
}
}
// Copy the assigned IPs on the dummy interface to ARP responder or NDP responder.
var assignedIPsV4, assignedIPsV6 []net.IP
for assigned := range a.assignedIPs {
ip := net.ParseIP(assigned)
if utilnet.IsIPv4(ip) {
assignedIPsV4 = append(assignedIPsV4, ip)
}
if utilnet.IsIPv6(ip) {
assignedIPsV6 = append(assignedIPsV6, ip)
}
}
if a.arpResponder != nil {
if err = a.arpResponder.ReplaceIPs(assignedIPsV4); err != nil {
return nil, fmt.Errorf("failed to replace IPs for ARP responder: %w", err)
}
}
if a.ndpResponder != nil {
if err = a.ndpResponder.ReplaceIPs(assignedIPsV6); err != nil {
return nil, fmt.Errorf("failed to replace IPs for NDP responder: %w", err)
}
}
return a, nil
}

// getARPIgnoreForInterface gets the max value of conf/{all,interface}/arp_ignore form sysctl.
func getARPIgnoreForInterface(iface string) (int, error) {
arpIgnoreAll, err := sysctl.GetSysctlNet("ipv4/conf/all/arp_ignore")
if err != nil {
return 0, fmt.Errorf("failed to get arp_ignore for all interfaces: %w", err)
}
arpIgnore, err := sysctl.GetSysctlNet(fmt.Sprintf("ipv4/conf/%s/arp_ignore", iface))
if err != nil {
return 0, fmt.Errorf("failed to get arp_ignore for %s: %w", iface, err)
}
if arpIgnore > arpIgnoreAll {
return arpIgnore, nil
}
return arpIgnoreAll, nil

}

// ensureDummyDevice creates the dummy device if it doesn't exist.
func ensureDummyDevice(deviceName string) (netlink.Link, error) {
link, err := netlink.LinkByName(deviceName)
Expand Down Expand Up @@ -210,11 +260,7 @@ func (a *ipAssigner) AssignedIPs() sets.String {

// Run starts the ARP responder and NDP responder.
func (a *ipAssigner) Run(ch <-chan struct{}) {
// Start the ARP responder only when the dummy device is not created. The kernel will handle ARP requests
// for IPs assigned to the dummy devices by default.
// TODO: Check the arp_ignore sysctl parameter of the transport interface to determine whether to start
// the ARP responder or not.
if a.dummyDevice == nil && a.arpResponder != nil {
if a.arpResponder != nil {
go a.arpResponder.Run(ch)
}
if a.ndpResponder != nil {
Expand Down
13 changes: 13 additions & 0 deletions pkg/agent/ipassigner/responder/arp_responder.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,19 @@ func (r *arpResponder) RemoveIP(ip net.IP) error {
return nil
}

func (r *arpResponder) ReplaceIPs(ips []net.IP) error {
newIPs := make([]string, 0, len(ips))
for _, ip := range ips {
if utilnet.IsIPv4(ip) {
newIPs = append(newIPs, ip.String())
}
}
r.mutex.Lock()
defer r.mutex.Unlock()
r.assignedIPs = sets.NewString(newIPs...)
return nil
}

func (r *arpResponder) handleARPRequest() error {
pkt, _, err := r.conn.Read()
if err != nil {
Expand Down
27 changes: 27 additions & 0 deletions pkg/agent/ipassigner/responder/arp_responder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,3 +318,30 @@ func Test_arpResponder_removeIP(t *testing.T) {
})
}
}

func Test_arpResponder_ReplaceIPs(t *testing.T) {
tests := []struct {
name string
ipToReplace []net.IP
expectedAssignedIPs sets.String
}{
{
name: "Add two IPs",
ipToReplace: []net.IP{net.ParseIP("1.2.3.4"), net.ParseIP("1.2.3.5")},
expectedAssignedIPs: sets.NewString("1.2.3.4", "1.2.3.5"),
},
{
name: "Add IPs with duplicates",
ipToReplace: []net.IP{net.ParseIP("1.2.3.4"), net.ParseIP("1.2.3.5"), net.ParseIP("1.2.3.5")},
expectedAssignedIPs: sets.NewString("1.2.3.4", "1.2.3.5"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := &arpResponder{}
err := r.ReplaceIPs(tt.ipToReplace)
assert.NoError(t, err)
assert.Equal(t, tt.expectedAssignedIPs, r.assignedIPs)
})
}
}
2 changes: 2 additions & 0 deletions pkg/agent/ipassigner/responder/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ type Responder interface {
AddIP(net.IP) error
// RemoveIP removes the IP from the responder.
RemoveIP(net.IP) error
// ReplaceIPs replaces the assigned IPs with the given IPs.
ReplaceIPs([]net.IP) error
// Run starts the responder.
Run(<-chan struct{})
}
50 changes: 38 additions & 12 deletions pkg/agent/ipassigner/responder/ndp_responder.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,17 @@ type ndpResponder struct {
iface *net.Interface
conn ndpConn
assignedIPs sets.String
multicastGroups map[int]int
multicastGroups map[string]int
mutex sync.Mutex
}

var _ Responder = (*ndpResponder)(nil)

func parseIPv6SolicitedNodeMulticastAddress(ip net.IP) (net.IP, int) {
func parseIPv6SolicitedNodeMulticastAddress(ip net.IP) net.IP {
group := net.ParseIP(solicitedNodeMulticastAddressPrefix)
// copy lower 24 bits
copy(group[13:], ip[13:])
key := int(group[13])<<16 | int(group[14])<<8 | int(group[15])
return group, key
return group
}

func NewNDPResponder(iface *net.Interface) (*ndpResponder, error) {
Expand All @@ -64,7 +63,7 @@ func NewNDPResponder(iface *net.Interface) (*ndpResponder, error) {
return &ndpResponder{
iface: iface,
conn: conn,
multicastGroups: make(map[int]int),
multicastGroups: make(map[string]int),
assignedIPs: sets.NewString(),
}, nil
}
Expand Down Expand Up @@ -156,18 +155,18 @@ func (r *ndpResponder) AddIP(ip net.IP) error {
if r.isIPAssigned(ip) {
return nil
}
group, key := parseIPv6SolicitedNodeMulticastAddress(ip)
group := parseIPv6SolicitedNodeMulticastAddress(ip)
if err := func() error {
r.mutex.Lock()
defer r.mutex.Unlock()
if r.multicastGroups[key] == 0 {
if r.multicastGroups[group.String()] == 0 {
if err := r.conn.JoinGroup(group); err != nil {
return fmt.Errorf("joining solicited-node multicast group %s for %q failed: %v", group, ip, err)
}
klog.InfoS("Joined solicited-node multicast group", "group", group, "interface", r.iface.Name)
}
klog.InfoS("Assigned IP to NDP responder", "ip", ip, "interface", r.iface.Name)
r.multicastGroups[key]++
r.multicastGroups[group.String()]++
r.assignedIPs.Insert(ip.String())
return nil
}(); err != nil {
Expand All @@ -188,21 +187,48 @@ func (r *ndpResponder) RemoveIP(ip net.IP) error {
if !r.assignedIPs.Has(ip.String()) {
return nil
}
group, key := parseIPv6SolicitedNodeMulticastAddress(ip)
if r.multicastGroups[key] == 1 {
group := parseIPv6SolicitedNodeMulticastAddress(ip)
if r.multicastGroups[group.String()] == 1 {
if err := r.conn.LeaveGroup(group); err != nil {
return fmt.Errorf("leaving solicited-node multicast group %s for %q failed: %v", group, ip, err)
}
klog.InfoS("Left solicited-node multicast group", "group", group, "interface", r.iface.Name)
delete(r.multicastGroups, key)
delete(r.multicastGroups, group.String())
} else {
r.multicastGroups[key]--
r.multicastGroups[group.String()]--
}
r.assignedIPs.Delete(ip.String())
klog.InfoS("Removed IP from NDP responder", "ip", ip, "interface", r.iface.Name)
return nil
}

func (r *ndpResponder) ReplaceIPs(ips []net.IP) error {
r.mutex.Lock()
defer r.mutex.Unlock()
for group := range r.multicastGroups {
if err := r.conn.LeaveGroup(net.ParseIP(group)); err != nil {
return err
}
}
newGroups := make(map[string]int)
newIPs := make([]string, 0, len(ips))
for _, ip := range ips {
if utilnet.IsIPv6(ip) {
newIPs = append(newIPs, ip.String())
group := parseIPv6SolicitedNodeMulticastAddress(ip)
newGroups[group.String()]++
}
}
for group := range newGroups {
if err := r.conn.JoinGroup(net.ParseIP(group)); err != nil {
return err
}
}
r.assignedIPs = sets.NewString(newIPs...)
r.multicastGroups = newGroups
return nil
}

func (r *ndpResponder) isIPAssigned(ip net.IP) bool {
r.mutex.Lock()
defer r.mutex.Unlock()
Expand Down
Loading

0 comments on commit 2ded772

Please sign in to comment.