Skip to content

Commit

Permalink
Fix Egress not working with kube-proxy IPVS strictARP mode
Browse files Browse the repository at this point in the history
Check the arp_ignore sysctl value of the transport interface
and start a userspace ARP responder if it has a value other
than 0.
Copy the IPs assigned on the dummy interface to the ARP/NDP
responders on initialization, to fix an issue where the responders
may not work as expected after the agent restarts.

Fixes: antrea-io#3804

Signed-off-by: Xu Liu <xliu2@vmware.com>
  • Loading branch information
xliuxu committed May 31, 2022
1 parent 5efac86 commit d97a262
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 34 deletions.
20 changes: 10 additions & 10 deletions pkg/agent/controller/egress/egress_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,9 @@ func (c *EgressController) Run(stopCh <-chan struct{}) {
return
}

c.removeStaleEgressIPs()
if err := c.replaceEgressIPs(); err != nil {
klog.ErrorS(err, "failed to replace Egress IPs")
}

go wait.NonSlidingUntil(c.watchEgressGroup, 5*time.Second, stopCh)

Expand All @@ -324,23 +326,21 @@ func (c *EgressController) Run(stopCh <-chan struct{}) {
<-stopCh
}

// removeStaleEgressIPs unassigns stale Egress IPs that shouldn't be present on this Node.
// These Egresses were either deleted from the Kubernetes API or migrated to other Nodes when the agent on this Node
// was not running.
func (c *EgressController) removeStaleEgressIPs() {
// replaceEgressIPs unassigns stale Egress IPs that shouldn't be present on this Node and assigns the missing IPs
// on this node. The unassigned IPs are from Egresses that were either deleted from the Kubernetes API or migrated
// to other Nodes when the agent on this Node was not running.
func (c *EgressController) replaceEgressIPs() error {
desiredLocalEgressIPs := sets.NewString()
egresses, _ := c.egressLister.List(labels.Everything())
for _, egress := range egresses {
if egress.Spec.EgressIP != "" && egress.Spec.ExternalIPPool != "" && egress.Status.EgressNode == c.nodeName {
desiredLocalEgressIPs.Insert(egress.Spec.EgressIP)
}
}
actualLocalEgressIPs := c.ipAssigner.AssignedIPs()
for ip := range actualLocalEgressIPs.Difference(desiredLocalEgressIPs) {
if err := c.ipAssigner.UnassignIP(ip); err != nil {
klog.ErrorS(err, "Failed to clean up stale Egress IP", "ip", ip)
}
if err := c.ipAssigner.InitIPs(desiredLocalEgressIPs); err != nil {
return err
}
return nil
}

// worker is a long-running function that will continually call the processNextWorkItem function in
Expand Down
2 changes: 2 additions & 0 deletions pkg/agent/ipassigner/ip_assigner.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ type IPAssigner interface {
UnassignIP(ip string) error
// AssignedIPs returns the IPs that are assigned to the system by this IPAssigner.
AssignedIPs() sets.String
// InitIPs ensures the IPs that are assigned to the system match the given IPs.
InitIPs(sets.String) error
// Run starts the IP assigner.
Run(<-chan struct{})
}
98 changes: 81 additions & 17 deletions pkg/agent/ipassigner/ip_assigner_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"antrea.io/antrea/pkg/agent/ipassigner/responder"
"antrea.io/antrea/pkg/agent/util"
"antrea.io/antrea/pkg/agent/util/sysctl"
)

// ipAssigner creates a dummy device and assigns IPs to it.
Expand Down Expand Up @@ -60,11 +61,22 @@ func NewIPAssigner(nodeTransportInterface string, dummyDeviceName string) (*ipAs
assignedIPs: sets.NewString(),
}
if ipv4 != nil {
arpResonder, err := responder.NewARPResponder(externalInterface)
// For the Egress scenario, the external IPs should always be present on the dummy
// interface as they are used as tunnel endpoints. If arp_ignore is set to a value
// other than 0, the host will not reply to ARP requests received on the transport
// interface when the target IPs are assigned on the dummy interface. So a userspace
// ARP responder is needed to handle ARP requests for the Egress IPs.
arpIgnore, err := getARPIgnoreForInterface(externalInterface.Name)
if err != nil {
return nil, fmt.Errorf("failed to create ARP responder for link %s: %v", externalInterface.Name, err)
return nil, err
}
if dummyDeviceName == "" || arpIgnore > 0 {
arpResonder, err := responder.NewARPResponder(externalInterface)
if err != nil {
return nil, fmt.Errorf("failed to create ARP responder for link %s: %v", externalInterface.Name, err)
}
a.arpResponder = arpResonder
}
a.arpResponder = arpResonder
}
if ipv6 != nil {
ndpResponder, err := responder.NewNDPResponder(externalInterface)
Expand All @@ -79,13 +91,27 @@ func NewIPAssigner(nodeTransportInterface string, dummyDeviceName string) (*ipAs
return nil, fmt.Errorf("error when ensuring dummy device exists: %v", err)
}
a.dummyDevice = dummyDevice
if err := a.loadIPAddresses(); err != nil {
return nil, fmt.Errorf("error when loading IP addresses from the system: %v", err)
}
}
return a, nil
}

// getARPIgnoreForInterface reads the arp_ignore sysctl for both "all" and the
// given interface and returns the larger of the two values.
//
// The "all" value is read first; if either read fails, a wrapped error is
// returned together with 0.
func getARPIgnoreForInterface(iface string) (int, error) {
	globalValue, err := sysctl.GetSysctlNet("ipv4/conf/all/arp_ignore")
	if err != nil {
		return 0, fmt.Errorf("failed to get arp_ignore for all interfaces: %w", err)
	}

	ifaceKey := fmt.Sprintf("ipv4/conf/%s/arp_ignore", iface)
	ifaceValue, err := sysctl.GetSysctlNet(ifaceKey)
	if err != nil {
		return 0, fmt.Errorf("failed to get arp_ignore for %s: %w", iface, err)
	}

	// Pick the maximum of the global and the per-interface setting.
	effective := globalValue
	if ifaceValue > effective {
		effective = ifaceValue
	}
	return effective, nil
}

// ensureDummyDevice creates the dummy device if it doesn't exist.
func ensureDummyDevice(deviceName string) (netlink.Link, error) {
link, err := netlink.LinkByName(deviceName)
Expand All @@ -102,19 +128,16 @@ func ensureDummyDevice(deviceName string) (netlink.Link, error) {
}

// loadIPAddresses gets the IP addresses on the dummy device and returns them as a set.
func (a *ipAssigner) loadIPAddresses() error {
func (a *ipAssigner) loadIPAddresses() (sets.String, error) {
addresses, err := netlink.AddrList(a.dummyDevice, netlink.FAMILY_ALL)
if err != nil {
return err
return nil, err
}
newAssignIPs := sets.NewString()
for _, address := range addresses {
newAssignIPs.Insert(address.IP.String())
}
a.mutex.Lock()
defer a.mutex.Unlock()
a.assignedIPs = newAssignIPs
return nil
return newAssignIPs, nil
}

// AssignIP ensures the provided IP is assigned to the dummy device and the ARP/NDP responders.
Expand Down Expand Up @@ -208,13 +231,54 @@ func (a *ipAssigner) AssignedIPs() sets.String {
return a.assignedIPs.Union(nil)
}

// InitIPs reconciles the IP assigner with the given set of IPs. It can be used
// to recover the IP assigner to the desired state after the Agent restarts.
//
// While holding the assigner's mutex it does the following:
//  1. If a dummy device is configured, it lists the addresses currently on the
//     device, adds every IP in ips that is missing, and deletes every address
//     that is not in ips. EEXIST on add and EADDRNOTAVAIL on delete are
//     tolerated, so the operation can be retried safely.
//  2. It adds every IPv4 address in ips to the ARP responder and every IPv6
//     address to the NDP responder, when the respective responder exists.
//  3. It records a copy of ips as the assigned IPs.
//
// The first failure aborts the operation and is returned wrapped (so callers
// can inspect the cause with errors.Is / errors.As); in that case the cached
// set of assigned IPs is left untouched.
//
// NOTE(review): IPs are only added to the responders here, never removed; this
// presumably relies on the responders being empty when InitIPs is called —
// confirm against callers.
func (a *ipAssigner) InitIPs(ips sets.String) error {
	a.mutex.Lock()
	defer a.mutex.Unlock()

	if a.dummyDevice != nil {
		assigned, err := a.loadIPAddresses()
		if err != nil {
			return fmt.Errorf("error when loading IP addresses from the system: %w", err)
		}
		// Add the desired IPs that are not yet present on the dummy device.
		for ip := range ips.Difference(assigned) {
			addr := util.NewIPNet(net.ParseIP(ip))
			if err := netlink.AddrAdd(a.dummyDevice, &netlink.Addr{IPNet: addr}); err != nil {
				// The address already being there is the desired outcome.
				if !errors.Is(err, unix.EEXIST) {
					return fmt.Errorf("failed to add IP %v to interface %s: %w", ip, a.dummyDevice.Attrs().Name, err)
				}
			}
		}
		// Remove the stale IPs that are present on the dummy device but not desired.
		for ip := range assigned.Difference(ips) {
			addr := util.NewIPNet(net.ParseIP(ip))
			if err := netlink.AddrDel(a.dummyDevice, &netlink.Addr{IPNet: addr}); err != nil {
				// The address already being gone is the desired outcome.
				if !errors.Is(err, unix.EADDRNOTAVAIL) {
					return fmt.Errorf("failed to delete IP %v from interface %s: %w", ip, a.dummyDevice.Attrs().Name, err)
				}
			}
		}
	}

	// Register the desired IPs with the userspace responders, if any.
	for ipStr := range ips {
		ip := net.ParseIP(ipStr)
		switch {
		case utilnet.IsIPv4(ip) && a.arpResponder != nil:
			if err := a.arpResponder.AddIP(ip); err != nil {
				return fmt.Errorf("failed to add IP %v to ARP responder: %w", ip, err)
			}
		case utilnet.IsIPv6(ip) && a.ndpResponder != nil:
			if err := a.ndpResponder.AddIP(ip); err != nil {
				return fmt.Errorf("failed to add IP %v to NDP responder: %w", ip, err)
			}
		}
	}

	// Store a copy so later mutations of the caller's set do not affect the cache.
	a.assignedIPs = ips.Union(nil)
	return nil
}

// Run starts the ARP responder and NDP responder.
func (a *ipAssigner) Run(ch <-chan struct{}) {
// Start the ARP responder only when the dummy device is not created. The kernel will handle ARP requests
// for IPs assigned to the dummy devices by default.
// TODO: Check the arp_ignore sysctl parameter of the transport interface to determine whether to start
// the ARP responder or not.
if a.dummyDevice == nil && a.arpResponder != nil {
if a.arpResponder != nil {
go a.arpResponder.Run(ch)
}
if a.ndpResponder != nil {
Expand Down
4 changes: 4 additions & 0 deletions pkg/agent/ipassigner/ip_assigner_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,9 @@ func (a *ipAssigner) AssignedIPs() sets.String {
return nil
}

// InitIPs is a no-op in this (Windows) implementation: the given IPs are
// ignored and nil is always returned.
func (a *ipAssigner) InitIPs(ips sets.String) error {
return nil
}

func (a *ipAssigner) Run(ch <-chan struct{}) {
}
16 changes: 15 additions & 1 deletion pkg/agent/ipassigner/testing/mock_ipassigner.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 14 additions & 6 deletions test/integration/agent/ip_assigner_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,25 @@ func TestIPAssigner(t *testing.T) {
require.NoError(t, err, "Failed to list IP addresses")
assert.Equal(t, desiredIPs, actualIPs, "Actual IPs don't match")

// NewIPAssigner should load existing IPs correctly.
newIPAssigner, err := ipassigner.NewIPAssigner(nodeLinkName, dummyDeviceName)
require.NoError(t, err, "Initializing new IP assigner failed")
assert.Equal(t, desiredIPs, newIPAssigner.AssignedIPs(), "Assigned IPs don't match")
assert.Equal(t, sets.NewString(), newIPAssigner.AssignedIPs(), "Assigned IPs don't match")

for ip := range desiredIPs {
err = ipAssigner.UnassignIP(ip)
ip4 := "2021:124:6020:1006:250:56ff:fea7:36c4"
newDesiredIPs := sets.NewString(ip1, ip2, ip4)
err = newIPAssigner.InitIPs(newDesiredIPs)
require.NoError(t, err, "InitIPs failed")
assert.Equal(t, newDesiredIPs, newIPAssigner.AssignedIPs(), "Assigned IPs don't match")

actualIPs, err = listIPAddresses(dummyDevice)
require.NoError(t, err, "Failed to list IP addresses")
assert.Equal(t, newDesiredIPs, actualIPs, "Actual IPs don't match")

for ip := range newDesiredIPs {
err = newIPAssigner.UnassignIP(ip)
assert.NoError(t, err, "Failed to unassign a valid IP")
}

assert.Equal(t, sets.NewString(), ipAssigner.AssignedIPs(), "Assigned IPs don't match")
assert.Equal(t, sets.NewString(), newIPAssigner.AssignedIPs(), "Assigned IPs don't match")

actualIPs, err = listIPAddresses(dummyDevice)
require.NoError(t, err, "Failed to list IP addresses")
Expand Down

0 comments on commit d97a262

Please sign in to comment.