Skip to content

Commit

Permalink
[WIP] Reduce thread locking behaviour to dial phase only
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-mo committed Jul 28, 2020
1 parent c5f8ab7 commit c48ae9b
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 318 deletions.
17 changes: 1 addition & 16 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,21 +589,6 @@ type Config struct {
// to 0.
NetNS int

// DisableNSLockThread disables package netlink's default goroutine thread
// locking behavior.
//
// By default, the library will lock the processing goroutine to its
// corresponding OS thread in order to enable communication over netlink to
// a different network namespace.
//
// If the caller already knows that the netlink socket is in the same
// namespace as the calling thread, this can introduce a performance
// impact. This option disables the OS thread locking behavior if
// performance considerations are of interest.
//
// If disabled, it is the responsibility of the caller to make sure that all
// threads are running in the correct namespace.
//
// When DisableNSLockThread is set, the caller cannot set the NetNS value.
// DisableNSLockThread is deprecated.
DisableNSLockThread bool
}
306 changes: 67 additions & 239 deletions conn_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
package netlink

import (
"errors"
"math"
"os"
"runtime"
Expand Down Expand Up @@ -46,7 +45,7 @@ type socket interface {
GetSockoptInt(level, opt int) (int, error)
}

// dial is the entry point for Dial. dial opens a netlink socket using
// dial is the entry point for Dial. dial opens a netlink socket using
// system calls, and returns its PID.
func dial(family int, config *Config) (*conn, uint32, error) {
// Prepare sysSocket's internal loop and create the socket.
Expand All @@ -58,11 +57,36 @@ func dial(family int, config *Config) (*conn, uint32, error) {
config = &Config{}
}

sock, err := newSysSocket(config)
if err != nil {
return nil, 0, err
// The caller has indicated it wants the netlink socket to be created
// inside another network namespace.
if config.NetNS != 0 {

runtime.LockOSThread()
defer runtime.UnlockOSThread()

// Retrieve and store the calling OS thread's network namespace so
// the thread can be reassigned to it after creating a socket in another
// network namespace.
threadNS, err := threadNetNS()
if err != nil {
return nil, 0, err
}
// Always close the netns handle created above.
defer threadNS.Close()

// Assign the current OS thread the goroutine is locked to to the given
// network namespace.
if err := threadNS.Set(config.NetNS); err != nil {
return nil, 0, err
}

// Thread's namespace has been successfully set. Return the thread
// back to its original namespace after attempting to create the
// netlink socket.
defer threadNS.Restore()
}

sock := &sysSocket{}
if err := sock.Socket(family); err != nil {
return nil, 0, os.NewSyscallError("socket", err)
}
Expand Down Expand Up @@ -380,35 +404,6 @@ type sysSocket struct {
mu sync.RWMutex
fd *os.File
closed bool
g *lockedNetNSGoroutine
}

// newSysSocket creates a sysSocket that optionally locks its internal goroutine
// to a single thread.
func newSysSocket(config *Config) (*sysSocket, error) {
// Determine network namespaces using the threadNetNS function.
g, err := newLockedNetNSGoroutine(config.NetNS, threadNetNS, !config.DisableNSLockThread)
if err != nil {
return nil, err
}
return &sysSocket{
g: g,
}, nil
}

// do runs f in a worker goroutine which can be locked to one thread.
func (s *sysSocket) do(f func()) error {
// All operations handled by this function are assumed to only
// read from s.done.
s.mu.RLock()
defer s.mu.RUnlock()

if s.closed {
return syscall.EBADF
}

s.g.run(f)
return nil
}

// read executes f, a read function, against the associated file descriptor.
Expand All @@ -420,11 +415,7 @@ func (s *sysSocket) read(f func(fd int) bool) error {
return syscall.EBADF
}

var err error
s.g.run(func() {
err = fdread(s.fd, f)
})
return err
return fdread(s.fd, f)
}

// write executes f, a write function, against the associated file descriptor.
Expand All @@ -436,11 +427,7 @@ func (s *sysSocket) write(f func(fd int) bool) error {
return syscall.EBADF
}

var err error
s.g.run(func() {
err = fdwrite(s.fd, f)
})
return err
return fdwrite(s.fd, f)
}

// control executes f, a control function, against the associated file descriptor.
Expand All @@ -452,67 +439,51 @@ func (s *sysSocket) control(f func(fd int)) error {
return syscall.EBADF
}

var err error
s.g.run(func() {
err = fdcontrol(s.fd, f)
})
return err
return fdcontrol(s.fd, f)
}

func (s *sysSocket) Socket(family int) error {
var (
fd int
err error
)

doErr := s.do(func() {
// Mirror what the standard library does when creating file
// descriptors: avoid racing a fork/exec with the creation
// of new file descriptors, so that child processes do not
// inherit netlink socket file descriptors unexpectedly.
//
// On Linux, SOCK_CLOEXEC was introduced in 2.6.27. OTOH,
// Go supports Linux 2.6.23 and above. If we get EINVAL on
// the first try, it may be that we are running on a kernel
// older than 2.6.27. In that case, take syscall.ForkLock
// and try again without SOCK_CLOEXEC.
//
// SOCK_NONBLOCK was also added in 2.6.27, but we don't
// use SOCK_NONBLOCK here for now, not until we remove support
// for Go 1.11, since we still support the old blocking file
// descriptor behavior.
//
// For a more thorough explanation, see similar work in the
// Go tree: func sysSocket in net/sock_cloexec.go, as well
// as the detailed comment in syscall/exec_unix.go.
//
// TODO(acln): update this to mirror net.sysSocket completely:
// use SOCK_NONBLOCK as well, and remove the separate
// setBlockingMode step once Go 1.11 support is removed and
// we switch to using entirely non-blocking file descriptors.
// Mirror what the standard library does when creating file
// descriptors: avoid racing a fork/exec with the creation
// of new file descriptors, so that child processes do not
// inherit netlink socket file descriptors unexpectedly.
//
// On Linux, SOCK_CLOEXEC was introduced in 2.6.27. OTOH,
// Go supports Linux 2.6.23 and above. If we get EINVAL on
// the first try, it may be that we are running on a kernel
// older than 2.6.27. In that case, take syscall.ForkLock
// and try again without SOCK_CLOEXEC.
//
// SOCK_NONBLOCK was also added in 2.6.27, but we don't
// use SOCK_NONBLOCK here for now, not until we remove support
// for Go 1.11, since we still support the old blocking file
// descriptor behavior.
//
// For a more thorough explanation, see similar work in the
// Go tree: func sysSocket in net/sock_cloexec.go, as well
// as the detailed comment in syscall/exec_unix.go.
//
// TODO(acln): update this to mirror net.sysSocket completely:
// use SOCK_NONBLOCK as well, and remove the separate
// setBlockingMode step once Go 1.11 support is removed and
// we switch to using entirely non-blocking file descriptors.
fd, err := unix.Socket(
unix.AF_NETLINK,
unix.SOCK_RAW|unix.SOCK_CLOEXEC,
family,
)
if err == unix.EINVAL {
syscall.ForkLock.RLock()
fd, err = unix.Socket(
unix.AF_NETLINK,
unix.SOCK_RAW|unix.SOCK_CLOEXEC,
unix.SOCK_RAW,
family,
)
if err == unix.EINVAL {
syscall.ForkLock.RLock()
fd, err = unix.Socket(
unix.AF_NETLINK,
unix.SOCK_RAW,
family,
)
if err == nil {
unix.CloseOnExec(fd)
}
syscall.ForkLock.RUnlock()
if err == nil {
unix.CloseOnExec(fd)
}
})
if doErr != nil {
return doErr
}
if err != nil {
return err
syscall.ForkLock.RUnlock()
}

if err := setBlockingMode(fd); err != nil {
Expand Down Expand Up @@ -554,9 +525,6 @@ func (s *sysSocket) Close() error {
err := s.fd.Close()
s.closed = true

// Stop the associated goroutine and wait for it to return.
s.g.stop()

return err
}

Expand Down Expand Up @@ -692,143 +660,3 @@ func ready(err error) bool {
return true
}
}

// lockedNetNSGoroutine is a worker goroutine locked to an operating system
// thread, optionally configured to run in a non-default network namespace.
type lockedNetNSGoroutine struct {
wg sync.WaitGroup
doneC chan struct{}
funcC chan func()
}

// newLockedNetNSGoroutine creates a lockedNetNSGoroutine that will enter the
// specified network namespace netNS (by file descriptor), and will use the
// getNS function to produce netNS handles.
func newLockedNetNSGoroutine(netNS int, getNS func() (*netNS, error), lockThread bool) (*lockedNetNSGoroutine, error) {
// Any bare syscall errors (e.g. setns) should be wrapped with
// os.NewSyscallError for the remainder of this function.

// If the caller has instructed us to not lock OS thread but also attempts
// to set a namespace, return an error.
if !lockThread && netNS != 0 {
return nil, errors.New("netlink Conn attempted to set a namespace with OS thread locking disabled")
}

callerNS, err := getNS()
if err != nil {
return nil, err
}
defer callerNS.Close()

g := &lockedNetNSGoroutine{
doneC: make(chan struct{}),
funcC: make(chan func()),
}

errC := make(chan error)
g.wg.Add(1)

go func() {
// It is important to lock this goroutine to its OS thread for the duration
// of the netlink socket being used, or else the kernel may end up routing
// messages to the wrong places.
// See: http://lists.infradead.org/pipermail/libnl/2017-February/002293.html.
//
//
// In addition, the OS thread must also remain locked because we attempt
// to manipulate the network namespace of the thread within this goroutine.
//
// The intent is to never unlock the OS thread, so that the thread
// will terminate when the goroutine exits starting in Go 1.10:
// https://go-review.googlesource.com/c/go/+/46038.
//
// However, due to recent instability and a potential bad interaction
// with the Go runtime for threads which are not unlocked, we have
// elected to temporarily unlock the thread when the goroutine terminates:
// https://github.com/golang/go/issues/25128#issuecomment-410764489.
//
// Locking the thread is not implemented if the caller explicitly asks
// for an unlocked thread.

// Only lock the tread, if the lockThread is set.
if lockThread {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
}

defer g.wg.Done()

// Get the current namespace of the thread the goroutine is locked to.
threadNS, err := getNS()
if err != nil {
errC <- err
return
}
defer threadNS.Close()

// Attempt to set the network namespace of the current thread to either:
// - the namespace referred to by the provided file descriptor from config
// - the calling thread's namespace
//
// See the rules specified in the Config.NetNS documentation.
explicitNS := true
if netNS == 0 {
explicitNS = false
netNS = int(callerNS.FD())
}

// Only return an error if the network namespace was explicitly
// configured; implicit configuration by zero value should be ignored.
err = threadNS.Set(netNS)
switch {
case err != nil && explicitNS:
errC <- err
return
case err == nil:
// If the thread's namespace has been successfully manipulated,
// make sure we change it back when the goroutine returns.
defer threadNS.Restore()
default:
// We couldn't successfully set the namespace, but the caller didn't
// explicitly ask for it to be set either. Continue.
}

// Signal to caller that initialization was successful.
errC <- nil

for {
select {
case <-g.doneC:
return
case f := <-g.funcC:
f()
}
}
}()

// Wait for the goroutine to return err or nil.
if err := <-errC; err != nil {
return nil, err
}

return g, nil
}

// stop signals the goroutine to stop and blocks until it does.
//
// It is invalid to call run concurrently with stop. It is also invalid to
// call run after stop has returned.
func (g *lockedNetNSGoroutine) stop() {
close(g.doneC)
g.wg.Wait()
}

// run runs f on the worker goroutine.
func (g *lockedNetNSGoroutine) run(f func()) {
done := make(chan struct{})
g.funcC <- func() {
defer close(done)
f()
}
<-done
}
Loading

0 comments on commit c48ae9b

Please sign in to comment.