Skip to content

Commit

Permalink
Fix FD leak and mismatch when batch processing CNI ADD (#933)
Browse files Browse the repository at this point in the history
GratuitousArpOverIface in github.com/j-keck/arping is not thread-safe as
it uses global variables to keep socket and Sockaddr. When batch
processing CNI ADD requests, race condition could happen and lead to FD
leak and mismatch as the goroutine that sends gratuitous ARPs may close
and release others' FDs by accident.

This patch implements a thread-safe GratuitousARPOverIface and merges
creating veth pair and configuring IP address into one method to avoid
creating extra threads.
  • Loading branch information
tnqn authored Jul 9, 2020
1 parent ccee9f8 commit 1a1c6b3
Show file tree
Hide file tree
Showing 8 changed files with 222 additions and 56 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ require (
github.com/golang/mock v1.2.0
github.com/golang/protobuf v1.3.2
github.com/google/uuid v1.1.1
github.com/j-keck/arping v1.0.0
github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd
github.com/pkg/errors v0.9.1
github.com/prometheus/common v0.4.1
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,6 @@ github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJ
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/j-keck/arping v0.0.0-20160618110441-2cf9dc699c56/go.mod h1:ymszkNOg6tORTn+6F6j+Jc8TOr5osrynvN6ivFWZ2GA=
github.com/j-keck/arping v1.0.0 h1:DN6Wy73IeadEEo5xVCgEp+ZGn2xmAypggxj8mtxXBD0=
github.com/j-keck/arping v1.0.0/go.mod h1:ymszkNOg6tORTn+6F6j+Jc8TOr5osrynvN6ivFWZ2GA=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
Expand Down
81 changes: 30 additions & 51 deletions pkg/agent/cniserver/interface_configuration_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ import (
"github.com/containernetworking/plugins/pkg/ip"
"github.com/containernetworking/plugins/pkg/ipam"
"github.com/containernetworking/plugins/pkg/ns"
"github.com/j-keck/arping"
"github.com/vishvananda/netlink"
"k8s.io/klog"

"github.com/vmware-tanzu/antrea/pkg/agent/util"
"github.com/vmware-tanzu/antrea/pkg/agent/util/arping"
"github.com/vmware-tanzu/antrea/pkg/agent/util/ethtool"
cnipb "github.com/vmware-tanzu/antrea/pkg/apis/cni/v1beta1"
"github.com/vmware-tanzu/antrea/pkg/ovs/ovsconfig"
Expand All @@ -44,41 +44,6 @@ func newInterfaceConfigurator(ovsDatapathType string) (*ifConfigurator, error) {
return &ifConfigurator{ovsDatapathType: ovsDatapathType}, nil
}

// setupInterfaces creates a veth pair: containerIface is in the container
// network namespace and hostIface is in the host network namespace.
func (ic *ifConfigurator) setupInterfaces(
hostIfaceName, containerIfaceName string,
netns ns.NetNS,
mtu int) (hostIface *current.Interface, containerIface *current.Interface, err error) {
hostIface = &current.Interface{}
containerIface = &current.Interface{}

if err := netns.Do(func(hostNS ns.NetNS) error {
hostVeth, containerVeth, err := ip.SetupVethWithName(containerIfaceName, hostIfaceName, mtu, hostNS)
if err != nil {
return err
}
klog.V(2).Infof("Setup interfaces host: %s, container %s", hostVeth.Name, containerVeth.Name)
containerIface.Name = containerVeth.Name
containerIface.Mac = containerVeth.HardwareAddr.String()
containerIface.Sandbox = netns.Path()
hostIface.Name = hostVeth.Name
hostIface.Mac = hostVeth.HardwareAddr.String()
// OVS netdev datapath doesn't support TX checksum offloading, i.e. if packet
// arrives with bad/no checksum it will be sent to the output port with same bad/no checksum.
if ic.ovsDatapathType == ovsconfig.OVSDatapathNetdev {
if err := ethtool.EthtoolTXHWCsumOff(containerVeth.Name); err != nil {
return fmt.Errorf("error when disabling TX checksum offload on container veth: %v", err)
}
}
return nil
}); err != nil {
return nil, nil, err
}

return hostIface, containerIface, nil
}

// advertiseContainerAddr sends 3 GARP packets in another goroutine with 50ms interval. It's because Openflow entries are
// installed async, and the gratuitous ARP could be sent out after the Openflow entries are installed. Using another
// goroutine to ensure the processing of CNI ADD request is not blocked.
Expand Down Expand Up @@ -109,7 +74,9 @@ func (ic *ifConfigurator) advertiseContainerAddr(containerNetNS string, containe
for {
// Send gratuitous ARP to network in case of stale mappings for this IP address
// (e.g. if a previous - deleted - Pod was using the same IP).
arping.GratuitousArpOverIface(targetIP, *iface)
if err := arping.GratuitousARPOverIface(targetIP, iface); err != nil {
klog.Warningf("Failed to send gratuitous ARP #%d: %v", count, err)
}
count++
if count == 3 {
break
Expand All @@ -121,6 +88,8 @@ func (ic *ifConfigurator) advertiseContainerAddr(containerNetNS string, containe
return nil
}

// configureContainerLink creates a veth pair: one in the container netns and one in the host netns, and configures IP
// address and routes to the container veth.
func (ic *ifConfigurator) configureContainerLink(
podName string,
podNamespace string,
Expand All @@ -130,25 +99,35 @@ func (ic *ifConfigurator) configureContainerLink(
mtu int,
result *current.Result,
) error {
netns, err := ns.GetNS(containerNetNS)
if err != nil {
return fmt.Errorf("failed to open netns %s: %v", containerNetNS, err)
}
defer netns.Close()
// Create veth pair and link up
hostIfaceName := util.GenerateContainerInterfaceName(podName, podNamespace, containerID)
hostIface, containerIface, err := ic.setupInterfaces(hostIfaceName, containerIfaceName, netns, mtu)
if err != nil {
return fmt.Errorf("failed to create veth devices for container %s: %v", containerID, err)
}

hostIface := &current.Interface{Name: hostIfaceName}
containerIface := &current.Interface{Name: containerIfaceName, Sandbox: containerNetNS}
result.Interfaces = []*current.Interface{hostIface, containerIface}
if err := ns.WithNetNSPath(containerNetNS, func(hostNS ns.NetNS) error {
klog.V(2).Infof("Creating veth devices (%s, %s) for container %s", containerIfaceName, hostIfaceName, containerID)
hostVeth, containerVeth, err := ip.SetupVethWithName(containerIfaceName, hostIfaceName, mtu, hostNS)
if err != nil {
return fmt.Errorf("failed to create veth devices for container %s: %v", containerID, err)
}
containerIface.Mac = containerVeth.HardwareAddr.String()
hostIface.Mac = hostVeth.HardwareAddr.String()
// OVS netdev datapath doesn't support TX checksum offloading, i.e. if packet
// arrives with bad/no checksum it will be sent to the output port with same bad/no checksum.
if ic.ovsDatapathType == ovsconfig.OVSDatapathNetdev {
if err := ethtool.EthtoolTXHWCsumOff(containerVeth.Name); err != nil {
return fmt.Errorf("error when disabling TX checksum offload on container veth: %v", err)
}
}

klog.V(2).Infof("Configuring IP address for container %s", containerID)
if err := netns.Do(func(_ ns.NetNS) error {
return ipam.ConfigureIface(containerIface.Name, result)
klog.V(2).Infof("Configuring IP address for container %s", containerID)
// result.Interfaces must be set before this.
if err := ipam.ConfigureIface(containerIface.Name, result); err != nil {
return fmt.Errorf("failed to configure IP address for container %s: %v", containerID, err)
}
return nil
}); err != nil {
return fmt.Errorf("failed to configure IP address for container %s: %v", containerID, err)
return err
}
return nil
}
Expand Down
70 changes: 70 additions & 0 deletions pkg/agent/util/arping/arping_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2020 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package arping

import (
"bytes"
"encoding/binary"
"fmt"
"net"
"syscall"
)

const (
// 1544 = htons(ETH_P_ARP)
protoARP = 1544
)

// GratuitousARPOverIface sends an gratuitous arp over interface 'iface' from 'srcIP'.
// It refers to "github.com/j-keck/arping" and is simplified and made thread-safe.
func GratuitousARPOverIface(srcIP net.IP, iface *net.Interface) error {
ipv4 := srcIP.To4()
if ipv4 == nil {
return fmt.Errorf("IPv6 is not supported yet")
}

srcMac := iface.HardwareAddr
broadcastMac := []byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff}
request := newARPRequest(srcMac, ipv4, broadcastMac, ipv4)

toSockaddr := &syscall.SockaddrLinklayer{Ifindex: iface.Index}

sock, err := syscall.Socket(syscall.AF_PACKET, syscall.SOCK_RAW, protoARP)
if err != nil {
return err
}
defer syscall.Close(sock)

return syscall.Sendto(sock, request, 0, toSockaddr)
}

func newARPRequest(sha, spa, tha, tpa []byte) []byte {
frame := bytes.NewBuffer(nil)
// Ethernet header.
frame.Write(tha) // Destination MAC address.
frame.Write(sha) // Source MAC address.
frame.Write([]byte{0x08, 0x06}) // Ethernet protocol type, 0x0806 for ARP.
// ARP message.
binary.Write(frame, binary.BigEndian, uint16(1)) // Hardware Type, Ethernet is 1.
binary.Write(frame, binary.BigEndian, uint16(0x0800)) // Protocol type, IPv4 is 0x0800.
binary.Write(frame, binary.BigEndian, uint8(6)) // Hardware length, Ethernet address length is 6.
binary.Write(frame, binary.BigEndian, uint8(4)) // Protocol length, IPv4 address length is 4.
binary.Write(frame, binary.BigEndian, uint16(1)) // Operation, request is 1.
frame.Write(sha) // Sender hardware address.
frame.Write(spa) // Sender protocol address.
frame.Write(tha) // Target hardware address.
frame.Write(tpa) // Target protocol address.
return frame.Bytes()
}
55 changes: 55 additions & 0 deletions pkg/agent/util/arping/arping_linux_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2020 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package arping

import (
"net"
"reflect"
"testing"
)

func TestNewARPRequest(t *testing.T) {
tests := []struct {
name string
sha []byte
spa []byte
tha []byte
tpa []byte
want []byte
}{
{
name: "Gratuitous ARP",
sha: []byte{0x42, 0xaf, 0xb8, 0x14, 0xcb, 0x4e},
spa: net.ParseIP("192.168.10.1").To4(),
tha: []byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff},
tpa: net.ParseIP("192.168.10.1").To4(),
want: []byte{
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x42, 0xaf,
0xb8, 0x14, 0xcb, 0x4e, 0x08, 0x06, 0x00, 0x01,
0x08, 0x00, 0x06, 0x04, 0x00, 0x01, 0x42, 0xaf,
0xb8, 0x14, 0xcb, 0x4e, 0xc0, 0xa8, 0x0a, 0x01,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xc0, 0xa8,
0x0a, 0x01,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := newARPRequest(tt.sha, tt.spa, tt.tha, tt.tpa); !reflect.DeepEqual(got, tt.want) {
t.Errorf("newARPRequest() = %v, want %v", got, tt.want)
}
})
}
}
1 change: 0 additions & 1 deletion plugins/octant/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,6 @@ github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28=
github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/j-keck/arping v0.0.0-20160618110441-2cf9dc699c56/go.mod h1:ymszkNOg6tORTn+6F6j+Jc8TOr5osrynvN6ivFWZ2GA=
github.com/j-keck/arping v1.0.0/go.mod h1:ymszkNOg6tORTn+6F6j+Jc8TOr5osrynvN6ivFWZ2GA=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/jsimonetti/rtnetlink v0.0.0-20190606172950-9527aa82566a/go.mod h1:Oz+70psSo5OFh8DBl0Zv2ACw7Esh6pPUphlvZG9x7uw=
Expand Down
59 changes: 59 additions & 0 deletions test/e2e/batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright 2020 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package e2e

import (
"fmt"
"strings"
"testing"

"github.com/stretchr/testify/assert"
)

// TestBatchCreatePods verifies there is no FD leak after batched Pod creation.
func TestBatchCreatePods(t *testing.T) {
data, err := setupTest(t)
if err != nil {
t.Fatalf("Error when setting up test: %v", err)
}
defer teardownTest(t, data)

batchNum := 20

node1 := workerNodeName(1)
podName, err := data.getAntreaPodOnNode(node1)
assert.NoError(t, err)

getFDs := func() string {
// In case that antrea-agent is not running as Pid 1 in future.
cmds := []string{"pidof", "antrea-agent"}
pid, _, err := data.runCommandFromPod(antreaNamespace, podName, "antrea-agent", cmds)
assert.NoError(t, err)

// Ignore the difference of modification time by specifying "--time-style +".
cmds = []string{"ls", "-l", "--time-style", "+", fmt.Sprintf("/proc/%s/fd/", strings.TrimSpace(pid))}
stdout, _, err := data.runCommandFromPod(antreaNamespace, podName, "antrea-agent", cmds)
assert.NoError(t, err)
return stdout
}

oldFDs := getFDs()

_, _, cleanupFn := createTestBusyboxPods(t, data, batchNum, node1)
defer cleanupFn()

newFDs := getFDs()
assert.Equal(t, oldFDs, newFDs, "FDs were changed after batched Pod creation")
}
9 changes: 8 additions & 1 deletion test/e2e/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"os"
"path/filepath"
"sync"
"testing"
"time"
)
Expand Down Expand Up @@ -223,9 +224,15 @@ func createTestBusyboxPods(tb testing.TB, data *TestData, num int, nodeName stri
podNames []string, podIPs []string, cleanupFn func(),
) {
cleanupFn = func() {
var wg sync.WaitGroup
for _, podName := range podNames {
deletePodWrapper(tb, data, podName)
wg.Add(1)
go func(name string) {
deletePodWrapper(tb, data, name)
wg.Done()
}(podName)
}
wg.Wait()
}

type podData struct {
Expand Down

0 comments on commit 1a1c6b3

Please sign in to comment.