Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce thread locking behaviour to dial phase only #171

Merged
merged 2 commits into from
Oct 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 1 addition & 16 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,21 +589,6 @@ type Config struct {
// to 0.
NetNS int

// DisableNSLockThread disables package netlink's default goroutine thread
// locking behavior.
//
// By default, the library will lock the processing goroutine to its
// corresponding OS thread in order to enable communication over netlink to
// a different network namespace.
//
// If the caller already knows that the netlink socket is in the same
// namespace as the calling thread, this can introduce a performance
// impact. This option disables the OS thread locking behavior if
// performance considerations are of interest.
//
// If disabled, it is the responsibility of the caller to make sure that all
// threads are running in the correct namespace.
//
// When DisableNSLockThread is set, the caller cannot set the NetNS value.
// DisableNSLockThread is deprecated and has no effect.
DisableNSLockThread bool
}
306 changes: 67 additions & 239 deletions conn_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
package netlink

import (
"errors"
"math"
"os"
"runtime"
Expand Down Expand Up @@ -46,7 +45,7 @@ type socket interface {
GetSockoptInt(level, opt int) (int, error)
}

// dial is the entry point for Dial. dial opens a netlink socket using
// dial is the entry point for Dial. dial opens a netlink socket using
// system calls, and returns its PID.
func dial(family int, config *Config) (*conn, uint32, error) {
// Prepare sysSocket's internal loop and create the socket.
Expand All @@ -58,11 +57,36 @@ func dial(family int, config *Config) (*conn, uint32, error) {
config = &Config{}
}

sock, err := newSysSocket(config)
if err != nil {
return nil, 0, err
// The caller has indicated it wants the netlink socket to be created
// inside another network namespace.
if config.NetNS != 0 {

runtime.LockOSThread()
defer runtime.UnlockOSThread()

// Retrieve and store the calling OS thread's network namespace so
// the thread can be reassigned to it after creating a socket in another
// network namespace.
threadNS, err := threadNetNS()
if err != nil {
return nil, 0, err
}
// Always close the netns handle created above.
defer threadNS.Close()

// Assign the current OS thread the goroutine is locked to to the given
// network namespace.
if err := threadNS.Set(config.NetNS); err != nil {
return nil, 0, err
}

// Thread's namespace has been successfully set. Return the thread
// back to its original namespace after attempting to create the
// netlink socket.
defer threadNS.Restore()
}

sock := &sysSocket{}
if err := sock.Socket(family); err != nil {
return nil, 0, os.NewSyscallError("socket", err)
}
Expand Down Expand Up @@ -380,35 +404,6 @@ type sysSocket struct {
mu sync.RWMutex
fd *os.File
closed bool
g *lockedNetNSGoroutine
}

// newSysSocket creates a sysSocket that optionally locks its internal goroutine
// to a single thread.
func newSysSocket(config *Config) (*sysSocket, error) {
// Determine network namespaces using the threadNetNS function.
g, err := newLockedNetNSGoroutine(config.NetNS, threadNetNS, !config.DisableNSLockThread)
if err != nil {
return nil, err
}
return &sysSocket{
g: g,
}, nil
}

// do runs f in a worker goroutine which can be locked to one thread.
func (s *sysSocket) do(f func()) error {
// All operations handled by this function are assumed to only
// read from s.done.
s.mu.RLock()
defer s.mu.RUnlock()

if s.closed {
return syscall.EBADF
}

s.g.run(f)
return nil
}

// read executes f, a read function, against the associated file descriptor.
Expand All @@ -420,11 +415,7 @@ func (s *sysSocket) read(f func(fd int) bool) error {
return syscall.EBADF
}

var err error
s.g.run(func() {
err = fdread(s.fd, f)
})
return err
return fdread(s.fd, f)
}

// write executes f, a write function, against the associated file descriptor.
Expand All @@ -436,11 +427,7 @@ func (s *sysSocket) write(f func(fd int) bool) error {
return syscall.EBADF
}

var err error
s.g.run(func() {
err = fdwrite(s.fd, f)
})
return err
return fdwrite(s.fd, f)
}

// control executes f, a control function, against the associated file descriptor.
Expand All @@ -452,67 +439,51 @@ func (s *sysSocket) control(f func(fd int)) error {
return syscall.EBADF
}

var err error
s.g.run(func() {
err = fdcontrol(s.fd, f)
})
return err
return fdcontrol(s.fd, f)
}

func (s *sysSocket) Socket(family int) error {
var (
fd int
err error
)

doErr := s.do(func() {
// Mirror what the standard library does when creating file
// descriptors: avoid racing a fork/exec with the creation
// of new file descriptors, so that child processes do not
// inherit netlink socket file descriptors unexpectedly.
//
// On Linux, SOCK_CLOEXEC was introduced in 2.6.27. OTOH,
// Go supports Linux 2.6.23 and above. If we get EINVAL on
// the first try, it may be that we are running on a kernel
// older than 2.6.27. In that case, take syscall.ForkLock
// and try again without SOCK_CLOEXEC.
//
// SOCK_NONBLOCK was also added in 2.6.27, but we don't
// use SOCK_NONBLOCK here for now, not until we remove support
// for Go 1.11, since we still support the old blocking file
// descriptor behavior.
//
// For a more thorough explanation, see similar work in the
// Go tree: func sysSocket in net/sock_cloexec.go, as well
// as the detailed comment in syscall/exec_unix.go.
//
// TODO(acln): update this to mirror net.sysSocket completely:
// use SOCK_NONBLOCK as well, and remove the separate
// setBlockingMode step once Go 1.11 support is removed and
// we switch to using entirely non-blocking file descriptors.
// Mirror what the standard library does when creating file
// descriptors: avoid racing a fork/exec with the creation
// of new file descriptors, so that child processes do not
// inherit netlink socket file descriptors unexpectedly.
//
// On Linux, SOCK_CLOEXEC was introduced in 2.6.27. OTOH,
// Go supports Linux 2.6.23 and above. If we get EINVAL on
// the first try, it may be that we are running on a kernel
// older than 2.6.27. In that case, take syscall.ForkLock
// and try again without SOCK_CLOEXEC.
//
// SOCK_NONBLOCK was also added in 2.6.27, but we don't
// use SOCK_NONBLOCK here for now, not until we remove support
// for Go 1.11, since we still support the old blocking file
// descriptor behavior.
//
// For a more thorough explanation, see similar work in the
// Go tree: func sysSocket in net/sock_cloexec.go, as well
// as the detailed comment in syscall/exec_unix.go.
//
// TODO(acln): update this to mirror net.sysSocket completely:
// use SOCK_NONBLOCK as well, and remove the separate
// setBlockingMode step once Go 1.11 support is removed and
// we switch to using entirely non-blocking file descriptors.
fd, err := unix.Socket(
unix.AF_NETLINK,
unix.SOCK_RAW|unix.SOCK_CLOEXEC,
family,
)
if err == unix.EINVAL {
syscall.ForkLock.RLock()
fd, err = unix.Socket(
unix.AF_NETLINK,
unix.SOCK_RAW|unix.SOCK_CLOEXEC,
unix.SOCK_RAW,
family,
)
if err == unix.EINVAL {
syscall.ForkLock.RLock()
fd, err = unix.Socket(
unix.AF_NETLINK,
unix.SOCK_RAW,
family,
)
if err == nil {
unix.CloseOnExec(fd)
}
syscall.ForkLock.RUnlock()
if err == nil {
unix.CloseOnExec(fd)
}
})
if doErr != nil {
return doErr
}
if err != nil {
return err
syscall.ForkLock.RUnlock()
}

if err := setBlockingMode(fd); err != nil {
Expand Down Expand Up @@ -554,9 +525,6 @@ func (s *sysSocket) Close() error {
err := s.fd.Close()
s.closed = true

// Stop the associated goroutine and wait for it to return.
s.g.stop()

return err
}

Expand Down Expand Up @@ -692,143 +660,3 @@ func ready(err error) bool {
return true
}
}

// lockedNetNSGoroutine is a worker goroutine locked to an operating system
// thread, optionally configured to run in a non-default network namespace.
type lockedNetNSGoroutine struct {
wg sync.WaitGroup
doneC chan struct{}
funcC chan func()
}

// newLockedNetNSGoroutine creates a lockedNetNSGoroutine that will enter the
// specified network namespace netNS (by file descriptor), and will use the
// getNS function to produce netNS handles.
func newLockedNetNSGoroutine(netNS int, getNS func() (*netNS, error), lockThread bool) (*lockedNetNSGoroutine, error) {
// Any bare syscall errors (e.g. setns) should be wrapped with
// os.NewSyscallError for the remainder of this function.

// If the caller has instructed us to not lock OS thread but also attempts
// to set a namespace, return an error.
if !lockThread && netNS != 0 {
return nil, errors.New("netlink Conn attempted to set a namespace with OS thread locking disabled")
}

callerNS, err := getNS()
if err != nil {
return nil, err
}
defer callerNS.Close()

g := &lockedNetNSGoroutine{
doneC: make(chan struct{}),
funcC: make(chan func()),
}

errC := make(chan error)
g.wg.Add(1)

go func() {
// It is important to lock this goroutine to its OS thread for the duration
// of the netlink socket being used, or else the kernel may end up routing
// messages to the wrong places.
// See: http://lists.infradead.org/pipermail/libnl/2017-February/002293.html.
//
//
// In addition, the OS thread must also remain locked because we attempt
// to manipulate the network namespace of the thread within this goroutine.
//
// The intent is to never unlock the OS thread, so that the thread
// will terminate when the goroutine exits starting in Go 1.10:
// https://go-review.googlesource.com/c/go/+/46038.
//
// However, due to recent instability and a potential bad interaction
// with the Go runtime for threads which are not unlocked, we have
// elected to temporarily unlock the thread when the goroutine terminates:
// https://github.com/golang/go/issues/25128#issuecomment-410764489.
//
// Locking the thread is not implemented if the caller explicitly asks
// for an unlocked thread.

// Only lock the tread, if the lockThread is set.
if lockThread {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
}

defer g.wg.Done()

// Get the current namespace of the thread the goroutine is locked to.
threadNS, err := getNS()
if err != nil {
errC <- err
return
}
defer threadNS.Close()

// Attempt to set the network namespace of the current thread to either:
// - the namespace referred to by the provided file descriptor from config
// - the calling thread's namespace
//
// See the rules specified in the Config.NetNS documentation.
explicitNS := true
if netNS == 0 {
explicitNS = false
netNS = int(callerNS.FD())
}

// Only return an error if the network namespace was explicitly
// configured; implicit configuration by zero value should be ignored.
err = threadNS.Set(netNS)
switch {
case err != nil && explicitNS:
errC <- err
return
case err == nil:
// If the thread's namespace has been successfully manipulated,
// make sure we change it back when the goroutine returns.
defer threadNS.Restore()
default:
// We couldn't successfully set the namespace, but the caller didn't
// explicitly ask for it to be set either. Continue.
}

// Signal to caller that initialization was successful.
errC <- nil

for {
select {
case <-g.doneC:
return
case f := <-g.funcC:
f()
}
}
}()

// Wait for the goroutine to return err or nil.
if err := <-errC; err != nil {
return nil, err
}

return g, nil
}

// stop signals the goroutine to stop and blocks until it does.
//
// It is invalid to call run concurrently with stop. It is also invalid to
// call run after stop has returned.
func (g *lockedNetNSGoroutine) stop() {
close(g.doneC)
g.wg.Wait()
}

// run runs f on the worker goroutine.
func (g *lockedNetNSGoroutine) run(f func()) {
done := make(chan struct{})
g.funcC <- func() {
defer close(done)
f()
}
<-done
}
Loading