From e9819aebd107ffa4dcc5f1e629c2aef19ecf4820 Mon Sep 17 00:00:00 2001 From: Daniel Sweet Date: Sat, 14 Jul 2018 11:15:49 -0400 Subject: [PATCH 1/4] Eliminate initial connection race condition in win32PipeListener Previously, win32PipeListener would create an initial pipe instance, open a client connection on it, then immediately close that connection. The reasoning behind this was unclear, but it appears to have been a mechanism for claiming the pipe name in the pipe namespace. If the listener-side single client could not be created, the listener would abort. This created a race condition where a legitimate client could attach itself to the named pipe before the listener could create its own client, causing the listener to incorrectly abort due to a legitimate client connection. This commit alters the process for listening for and accepting new connections: when the win32PipeListener is constructed, the first pipe is still created as before, but it is now also used as the first client connection as well. In order to maintain the namespace stake, the listenerRoutine() creates another named pipe instance before returning the connected instance to a pending Accept(), so that there is at least one pending named pipe instance at all times. There are some caveats to this approach: transient errors from CreateNamedPipe will not be returned until the _next_ Accept() call, so there may be instances where calling clients may correct the cause of the error, but still receive the same error across two Accept() calls. Additionally, if transient errors from CreateNamedPipe begin occurring and all other named pipe instances are closed, the listener loses its namespace stake and _may_ accidentally clobber other processes if the transient errors resolve themselves. It may be worth investingating closing the listener "from the inside" if transient errors begin to occur in CreateNamedPipe, so that the calling client is forced to create a new Listener and detect whether its stake in the pipe namespace has been lost. --- pipe.go | 77 +++++++++++++++++++++++++++------------------ pipe_test.go | 24 ++++++++++++++ zsyscall_windows.go | 13 ++++++++ 3 files changed, 84 insertions(+), 30 deletions(-) diff --git a/pipe.go b/pipe.go index 806fd342..ab9e1488 100644 --- a/pipe.go +++ b/pipe.go @@ -13,6 +13,7 @@ import ( ) //sys connectNamedPipe(pipe syscall.Handle, o *syscall.Overlapped) (err error) = ConnectNamedPipe +//sys disconnectNamedPipe(pipe syscall.Handle) (err error) = DisconnectNamedPipe //sys createNamedPipe(name string, flags uint32, pipeMode uint32, maxInstances uint32, outSize uint32, inSize uint32, defaultTimeout uint32, sa *syscall.SecurityAttributes) (handle syscall.Handle, err error) [failretval==syscall.InvalidHandle] = CreateNamedPipeW //sys createFile(name string, access uint32, mode uint32, sa *syscall.SecurityAttributes, createmode uint32, attrs uint32, templatefile syscall.Handle) (handle syscall.Handle, err error) [failretval==syscall.InvalidHandle] = CreateFileW //sys waitNamedPipe(name string, timeout uint32) (err error) = WaitNamedPipeW @@ -202,7 +203,7 @@ type acceptResponse struct { } type win32PipeListener struct { - firstHandle syscall.Handle + nextPipe *win32File path string securityDescriptor []byte config PipeConfig @@ -237,8 +238,8 @@ func makeServerPipeHandle(path string, securityDescriptor []byte, c *PipeConfig, return h, nil } -func (l *win32PipeListener) makeServerPipe() (*win32File, error) { - h, err := makeServerPipeHandle(l.path, l.securityDescriptor, &l.config, false) +func makeServerPipeFirst(path string, securityDescriptor []byte, c *PipeConfig) (*win32File, error) { + h, err := makeServerPipeHandle(path, securityDescriptor, c, true) if err != nil { return nil, err } @@ -250,61 +251,85 @@ func (l *win32PipeListener) makeServerPipe() (*win32File, error) { return f, nil } -func (l *win32PipeListener) makeConnectedServerPipe() (*win32File, error) { - p, err := l.makeServerPipe() +func (l *win32PipeListener) makeServerPipe() (*win32File, error) { + h, err := makeServerPipeHandle(l.path, l.securityDescriptor, &l.config, false) + if err != nil { + return nil, err + } + f, err := makeWin32File(h) if err != nil { + syscall.Close(h) return nil, err } + return f, nil +} +func (l *win32PipeListener) connectServerPipe() error { + var err error + // Wait for the client to connect. ch := make(chan error) go func(p *win32File) { ch <- connectPipe(p) - }(p) - + }(l.nextPipe) + select { case err = <-ch: if err != nil { - p.Close() - p = nil + disconnectNamedPipe(l.nextPipe.handle) } case <-l.closeCh: // Abort the connect request by closing the handle. - p.Close() - p = nil + l.closeNextPipe() err = <-ch if err == nil || err == ErrFileClosed { err = ErrPipeListenerClosed } } - return p, err + return err +} + +func (l *win32PipeListener) closeNextPipe() (err error) { + // This isn't thread-safe, but all the usage of nextPipe are + // confined to one goroutine, so this is fine. + if l.nextPipe != nil { + err = l.nextPipe.Close() + l.nextPipe = nil + } + return } func (l *win32PipeListener) listenerRoutine() { closed := false for !closed { + var nextErr error select { case <-l.closeCh: closed = true case responseCh := <-l.acceptCh: - var ( - p *win32File - err error - ) + var err error + if nextErr != nil { + responseCh <- acceptResponse{nil, nextErr} + l.nextPipe, nextErr = l.makeServerPipe() + continue + } for { - p, err = l.makeConnectedServerPipe() + err = l.connectServerPipe() // If the connection was immediately closed by the client, try // again. if err != cERROR_NO_DATA { break } } - responseCh <- acceptResponse{p, err} closed = err == ErrPipeListenerClosed + p := l.nextPipe + if !closed { + l.nextPipe, nextErr = l.makeServerPipe() + } + responseCh <- acceptResponse{p, err} } } - syscall.Close(l.firstHandle) - l.firstHandle = 0 + l.closeNextPipe() // Notify Close() and Accept() callers that the handle has been closed. close(l.doneCh) } @@ -345,20 +370,12 @@ func ListenPipe(path string, c *PipeConfig) (net.Listener, error) { return nil, err } } - h, err := makeServerPipeHandle(path, sd, c, true) - if err != nil { - return nil, err - } - // Immediately open and then close a client handle so that the named pipe is - // created but not currently accepting connections. - h2, err := createFile(path, 0, 0, nil, syscall.OPEN_EXISTING, cSECURITY_SQOS_PRESENT|cSECURITY_ANONYMOUS, 0) + p, err := makeServerPipeFirst(path, sd, c) if err != nil { - syscall.Close(h) return nil, err } - syscall.Close(h2) l := &win32PipeListener{ - firstHandle: h, + nextPipe: p, path: path, securityDescriptor: sd, config: *c, diff --git a/pipe_test.go b/pipe_test.go index 38692073..f507b70f 100644 --- a/pipe_test.go +++ b/pipe_test.go @@ -30,6 +30,18 @@ func TestDialListenerTimesOut(t *testing.T) { t.Fatal(err) } defer l.Close() + // listener.Listen() always keeps a pipe instance open. Whether + // anyone can connect to it is a matter of whether someone has + // already connected to it, or if so, if the server has Accept()ed + // the connection and has opened a new pipe instance for future + // connections. So, in order to properly test timeouts, we need + // to establish a "blocker" connection that is not Accept()ed, + // blocking future connection attempts in the process. + blocker, err := DialPipe(testPipeName, nil) + if err != nil { + t.Fatal(err) + } + defer blocker.Close() var d = time.Duration(10 * time.Millisecond) _, err = DialPipe(testPipeName, &d) if err != ErrTimeout { @@ -260,6 +272,18 @@ func TestDialTimesOutByDefault(t *testing.T) { t.Fatal(err) } defer l.Close() + // listener.Listen() always keeps a pipe instance open. Whether + // anyone can connect to it is a matter of whether someone has + // already connected to it, or if so, if the server has Accept()ed + // the connection and has opened a new pipe instance for future + // connections. So, in order to properly test timeouts, we need + // to establish a "blocker" connection that is not Accept()ed, + // blocking future connection attempts in the process. + blocker, err := DialPipe(testPipeName, nil) + if err != nil { + t.Fatal(err) + } + defer blocker.Close() _, err = DialPipe(testPipeName, nil) if err != ErrTimeout { t.Fatalf("expected ErrTimeout, got %v", err) diff --git a/zsyscall_windows.go b/zsyscall_windows.go index 3f527639..8fa8da9b 100644 --- a/zsyscall_windows.go +++ b/zsyscall_windows.go @@ -45,6 +45,7 @@ var ( procGetQueuedCompletionStatus = modkernel32.NewProc("GetQueuedCompletionStatus") procSetFileCompletionNotificationModes = modkernel32.NewProc("SetFileCompletionNotificationModes") procConnectNamedPipe = modkernel32.NewProc("ConnectNamedPipe") + procDisconnectNamedPipe = modkernel32.NewProc("DisconnectNamedPipe") procCreateNamedPipeW = modkernel32.NewProc("CreateNamedPipeW") procCreateFileW = modkernel32.NewProc("CreateFileW") procWaitNamedPipeW = modkernel32.NewProc("WaitNamedPipeW") @@ -132,6 +133,18 @@ func connectNamedPipe(pipe syscall.Handle, o *syscall.Overlapped) (err error) { return } +func disconnectNamedPipe(pipe syscall.Handle) (err error) { + r1, _, e1 := syscall.Syscall(procDisconnectNamedPipe.Addr(), 1, uintptr(pipe), 0, 0) + if r1 == 0 { + if e1 != 0 { + err = errnoErr(e1) + } else { + err = syscall.EINVAL + } + } + return +} + func createNamedPipe(name string, flags uint32, pipeMode uint32, maxInstances uint32, outSize uint32, inSize uint32, defaultTimeout uint32, sa *syscall.SecurityAttributes) (handle syscall.Handle, err error) { var _p0 *uint16 _p0, err = syscall.UTF16PtrFromString(name) From 0b7898c5093fc1227609dfc4ba107b9a699ba58f Mon Sep 17 00:00:00 2001 From: Daniel Sweet Date: Sat, 14 Jul 2018 12:42:47 -0400 Subject: [PATCH 2/4] Scope nextErr properly in listenerRoutine --- pipe.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipe.go b/pipe.go index ab9e1488..7160967d 100644 --- a/pipe.go +++ b/pipe.go @@ -301,8 +301,8 @@ func (l *win32PipeListener) closeNextPipe() (err error) { func (l *win32PipeListener) listenerRoutine() { closed := false + var nextErr error for !closed { - var nextErr error select { case <-l.closeCh: closed = true From efc7a80639cf1cb8dfa71ba43078a62212ffa2ea Mon Sep 17 00:00:00 2001 From: Daniel Sweet Date: Mon, 16 Jul 2018 08:33:08 -0400 Subject: [PATCH 3/4] Allow for file handle reuse upon internal close --- file.go | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/file.go b/file.go index 4334ff1c..92fed905 100644 --- a/file.go +++ b/file.go @@ -108,12 +108,24 @@ func makeWin32File(h syscall.Handle) (*win32File, error) { return f, nil } +// If a handle previously associated with a successfully constructed win32File +// needs to be recycled, use _this_ constructor. It doesn't re-add the handle +// to the ioCompletionPort. + +func reuseWin32File(h syscall.Handle) *win32File { + f := &win32File{handle: h} + f.readDeadline.channel = make(timeoutChan) + f.writeDeadline.channel = make(timeoutChan) + return f +} + func MakeOpenFile(h syscall.Handle) (io.ReadWriteCloser, error) { return makeWin32File(h) } // closeHandle closes the resources associated with a Win32 handle -func (f *win32File) closeHandle() { +func (f *win32File) nilHandleReturning() syscall.Handle { + var ret syscall.Handle f.wgLock.Lock() // Atomically set that we are closing, releasing the resources only once. if !f.closing.swap(true) { @@ -122,16 +134,20 @@ func (f *win32File) closeHandle() { cancelIoEx(f.handle, nil) f.wg.Wait() // at this point, no new IO can start - syscall.Close(f.handle) + ret = f.handle f.handle = 0 } else { f.wgLock.Unlock() } + return ret } // Close closes a win32File. func (f *win32File) Close() error { - f.closeHandle() + handle := f.nilHandleReturning() + if handle != 0 { + syscall.Close(handle) + } return nil } From 1c4c364c32423537103d015142d80a61c6edb0cf Mon Sep 17 00:00:00 2001 From: Daniel Sweet Date: Mon, 16 Jul 2018 08:33:16 -0400 Subject: [PATCH 4/4] Reuse pipe instances if listenerRoutine fails to create new instances This commit enforces the invariant that a win32PipeListener guarantees its stake in the pipe namespace by having instances of win32Pipe "donate" their disconnected pipe instances to the listener if the listener failed to create a new instance. Currently, this is done with heavy use of sync.Mutex. The "best case" of the listener successfully creating a successor instance elides one locking call with the use of atomic.LoadPointer; it may be possible to use atomic.CompareAndSwapPointer to achieve the same effect in general. This is worth pursuing further. This commit also fixes a race condition in win32PipeListener.Close, where the close channel send may have been swallowed by connectServerPipe if a client successfully connected before its pipe instance was shut down. --- pipe.go | 122 ++++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 97 insertions(+), 25 deletions(-) diff --git a/pipe.go b/pipe.go index 7160967d..66f295b1 100644 --- a/pipe.go +++ b/pipe.go @@ -7,6 +7,8 @@ import ( "io" "net" "os" + "sync" + "sync/atomic" "syscall" "time" "unsafe" @@ -55,6 +57,12 @@ var ( type win32Pipe struct { *win32File path string + // If this instance of a pipe was created by a listener, the Close() + // method may attempt to return its instance to the listener to be + // re-used iff the listener does not already have another instance + // prepared for connection, usually as the consequence of an error + // returned by createNamedPipe. + listener *win32PipeListener } type win32MessageBytePipe struct { @@ -79,6 +87,44 @@ func (f *win32Pipe) SetDeadline(t time.Time) error { return nil } +// This somewhat overrides the win32File implementation, because we're +// sometimes responsible for keeping at least one pipe instance open so +// this process can retain its claim on the pipe name. + +func (f *win32Pipe) Close() error { + // Not all instances of win32Pipe have a listener. In particular, + // instances created with DialPipe definitely don't have a listener. + if f.listener == nil { + return f.win32File.Close() + } + f.listener.nextLock.Lock() + var ( + listenerOpen bool + nextPipe *win32File + ) + select { + case <- f.listener.doneCh: + // pass, default for bool is false + default: + listenerOpen = true + } + nextPipe = (*win32File)(atomic.LoadPointer(&f.listener.nextPipe)) + // If the nextPipe is not nil, this means the listenerRoutine managed + // to successfully fill it. We don't need to touch it in this case. + if nextPipe != nil || !listenerOpen { + f.listener.nextLock.Unlock() + return f.win32File.Close() + } + handle := f.win32File.nilHandleReturning() + disconnectNamedPipe(handle) + // Simply reconnecting the pipe will keep the instance open, meaning + // we keep our name in the pipe namespace. + nextPipe = reuseWin32File(handle) + atomic.StorePointer(&f.listener.nextPipe, unsafe.Pointer(nextPipe)) + f.listener.nextLock.Unlock() + return nil +} + // CloseWrite closes the write side of a message pipe in byte mode. func (f *win32MessageBytePipe) CloseWrite() error { if f.writeClosed { @@ -203,13 +249,16 @@ type acceptResponse struct { } type win32PipeListener struct { - nextPipe *win32File + // this is actually a *win32File, but because of the use of atomic + // we have to keep this as an unsafe.Pointer + nextPipe unsafe.Pointer path string securityDescriptor []byte config PipeConfig acceptCh chan (chan acceptResponse) closeCh chan int doneCh chan int + nextLock sync.Mutex } func makeServerPipeHandle(path string, securityDescriptor []byte, c *PipeConfig, first bool) (syscall.Handle, error) { @@ -264,39 +313,35 @@ func (l *win32PipeListener) makeServerPipe() (*win32File, error) { return f, nil } -func (l *win32PipeListener) connectServerPipe() error { +func (l *win32PipeListener) connectServerPipe(pipe *win32File) error { var err error - + // Wait for the client to connect. ch := make(chan error) go func(p *win32File) { ch <- connectPipe(p) - }(l.nextPipe) + }(pipe) select { case err = <-ch: if err != nil { - disconnectNamedPipe(l.nextPipe.handle) + disconnectNamedPipe(pipe.handle) } case <-l.closeCh: // Abort the connect request by closing the handle. - l.closeNextPipe() + pipe.Close() + // Note that we aren't nil-ing out l.nextPipe, it's + // harmless to .Close() on the file more than once. err = <-ch - if err == nil || err == ErrFileClosed { + if err == nil || err == ErrFileClosed || pipeWasConnected(err) { err = ErrPipeListenerClosed } } return err } -func (l *win32PipeListener) closeNextPipe() (err error) { - // This isn't thread-safe, but all the usage of nextPipe are - // confined to one goroutine, so this is fine. - if l.nextPipe != nil { - err = l.nextPipe.Close() - l.nextPipe = nil - } - return +func pipeWasConnected(err error) bool { + return err == cERROR_NO_DATA || err == cERROR_PIPE_CONNECTED } func (l *win32PipeListener) listenerRoutine() { @@ -307,14 +352,28 @@ func (l *win32PipeListener) listenerRoutine() { case <-l.closeCh: closed = true case responseCh := <-l.acceptCh: - var err error - if nextErr != nil { + var ( + nextPipe *win32File + err error + ) + + nextPipe = (*win32File)(atomic.LoadPointer(&l.nextPipe)) + + if nextPipe == nil { responseCh <- acceptResponse{nil, nextErr} - l.nextPipe, nextErr = l.makeServerPipe() + + l.nextLock.Lock() + nextPipe = (*win32File)(atomic.LoadPointer(&l.nextPipe)) + if nextPipe == nil { + nextPipe, nextErr = l.makeServerPipe() + atomic.StorePointer(&l.nextPipe, unsafe.Pointer(nextPipe)) + // l.nextPipe, nextErr = l.makeServerPipe() + } + l.nextLock.Unlock() continue } for { - err = l.connectServerPipe() + err = l.connectServerPipe(nextPipe) // If the connection was immediately closed by the client, try // again. if err != cERROR_NO_DATA { @@ -322,16 +381,25 @@ func (l *win32PipeListener) listenerRoutine() { } } closed = err == ErrPipeListenerClosed - p := l.nextPipe + p := nextPipe if !closed { - l.nextPipe, nextErr = l.makeServerPipe() + l.nextLock.Lock() + nextPipe, nextErr = l.makeServerPipe() + atomic.StorePointer(&l.nextPipe, unsafe.Pointer(nextPipe)) + l.nextLock.Unlock() } responseCh <- acceptResponse{p, err} } } - l.closeNextPipe() // Notify Close() and Accept() callers that the handle has been closed. close(l.doneCh) + l.nextLock.Lock() + defer l.nextLock.Unlock() + if l.nextPipe != nil { + nextPipe := (*win32File)(atomic.LoadPointer(&l.nextPipe)) + nextPipe.Close() + atomic.StorePointer(&l.nextPipe, nil) + } } // PipeConfig contain configuration for the pipe listener. @@ -375,7 +443,7 @@ func ListenPipe(path string, c *PipeConfig) (net.Listener, error) { return nil, err } l := &win32PipeListener{ - nextPipe: p, + nextPipe: unsafe.Pointer(p), path: path, securityDescriptor: sd, config: *c, @@ -413,10 +481,14 @@ func (l *win32PipeListener) Accept() (net.Conn, error) { } if l.config.MessageMode { return &win32MessageBytePipe{ - win32Pipe: win32Pipe{win32File: response.f, path: l.path}, + win32Pipe: win32Pipe{ + win32File: response.f, + path: l.path, + listener: l, + }, }, nil } - return &win32Pipe{win32File: response.f, path: l.path}, nil + return &win32Pipe{win32File: response.f, path: l.path, listener: l}, nil case <-l.doneCh: return nil, ErrPipeListenerClosed }