Skip to content

Commit

Permalink
fixes nanomsg#2 Add support for Named Pipes on Windows
Browse files Browse the repository at this point in the history
This adds support for Windows Named Pipes as ipc:// URLs that are
compatible with nanomsg and NNG.

It also includes (untested) support for SecurityDescriptor and
InputBufferSize and OutputBufferSize tunables.   It is built on
the github.com/Microsoft/go-winio library.

Note that legacy libnanomsg (all versions 1.1.3 and earlier) is
very fragile in it's handling of IPC, and assumes that senders
will only send a single atomic write.  This assumption requires
us to make an extra data copy.  (Note that NNG has no such assumptions,
and we could easily dispense with the data copy for NNG.)
  • Loading branch information
gdamore committed Jun 7, 2018
1 parent d29e27b commit 6e946f1
Show file tree
Hide file tree
Showing 9 changed files with 419 additions and 103 deletions.
65 changes: 1 addition & 64 deletions conn.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2017 The Mangos Authors
// Copyright 2018 The Mangos Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use file except in compliance with the License.
Expand Down Expand Up @@ -145,69 +145,6 @@ func NewConnPipe(c net.Conn, sock Socket, props ...interface{}) (Pipe, error) {
return p, nil
}

// NewConnPipeIPC allocates a new Pipe using the IPC exchange protocol.
func NewConnPipeIPC(c net.Conn, sock Socket, props ...interface{}) (Pipe, error) {
p := &connipc{conn: conn{c: c, proto: sock.GetProtocol(), sock: sock}}

if err := p.handshake(props); err != nil {
return nil, err
}

return p, nil
}

func (p *connipc) Send(msg *Message) error {

l := uint64(len(msg.Header) + len(msg.Body))
one := [1]byte{1}
var err error

// send length header
if _, err = p.c.Write(one[:]); err != nil {
return err
}
if err = binary.Write(p.c, binary.BigEndian, l); err != nil {
return err
}
if _, err = p.c.Write(msg.Header); err != nil {
return err
}
// hope this works
if _, err = p.c.Write(msg.Body); err != nil {
return err
}
msg.Free()
return nil
}

func (p *connipc) Recv() (*Message, error) {

var sz int64
var err error
var msg *Message
var one [1]byte

if _, err = p.c.Read(one[:]); err != nil {
return nil, err
}
if err = binary.Read(p.c, binary.BigEndian, &sz); err != nil {
return nil, err
}

// Limit messages to the maximum receive value, if not
// unlimited. This avoids a potential denaial of service.
if sz < 0 || (p.maxrx > 0 && sz > p.maxrx) {
return nil, ErrTooLong
}
msg = NewMessage(int(sz))
msg.Body = msg.Body[0:sz]
if _, err = io.ReadFull(p.c, msg.Body); err != nil {
msg.Free()
return nil, err
}
return msg, nil
}

// connHeader is exchanged during the initial handshake.
type connHeader struct {
Zero byte // must be zero
Expand Down
88 changes: 88 additions & 0 deletions connipc_posix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// +build !windows

// Copyright 2018 The Mangos Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use file except in compliance with the License.
// You may obtain a copy of the license at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mangos

import (
"encoding/binary"
"io"
"net"
)

// NewConnPipeIPC allocates a new Pipe using the IPC exchange protocol.
func NewConnPipeIPC(c net.Conn, sock Socket, props ...interface{}) (Pipe, error) {
p := &connipc{conn: conn{c: c, proto: sock.GetProtocol(), sock: sock}}

if err := p.handshake(props); err != nil {
return nil, err
}

return p, nil
}

func (p *connipc) Send(msg *Message) error {

l := uint64(len(msg.Header) + len(msg.Body))
// one := [1]byte{1}
var err error

// send length header
header := make([]byte, 9)
header[0] = 1
binary.BigEndian.PutUint64(header[1:], l)

if _, err = p.c.Write(header[:]); err != nil {
return err
}

if _, err = p.c.Write(msg.Header); err != nil {
return err
}
// hope this works
if _, err = p.c.Write(msg.Body); err != nil {
return err
}
msg.Free()
return nil
}

func (p *connipc) Recv() (*Message, error) {

var sz int64
var err error
var msg *Message
var one [1]byte

if _, err = p.c.Read(one[:]); err != nil {
return nil, err
}
if err = binary.Read(p.c, binary.BigEndian, &sz); err != nil {
return nil, err
}

// Limit messages to the maximum receive value, if not
// unlimited. This avoids a potential denaial of service.
if sz < 0 || (p.maxrx > 0 && sz > p.maxrx) {
return nil, ErrTooLong
}
msg = NewMessage(int(sz))
msg.Body = msg.Body[0:sz]
if _, err = io.ReadFull(p.c, msg.Body); err != nil {
msg.Free()
return nil, err
}
return msg, nil
}
87 changes: 87 additions & 0 deletions connipc_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// +build windows

// Copyright 2018 The Mangos Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use file except in compliance with the License.
// You may obtain a copy of the license at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mangos

import (
"encoding/binary"
"io"
"net"
)

// NewConnPipeIPC allocates a new Pipe using the IPC exchange protocol.
func NewConnPipeIPC(c net.Conn, sock Socket, props ...interface{}) (Pipe, error) {
p := &connipc{conn: conn{c: c, proto: sock.GetProtocol(), sock: sock}}

if err := p.handshake(props); err != nil {
return nil, err
}

return p, nil
}

func (p *connipc) Send(msg *Message) error {

l := uint64(len(msg.Header) + len(msg.Body))
var err error

// On Windows, we have to put everything into a contiguous buffer.
// This is to workaround bugs in legacy libnanomsg. Eventually we
// might do away with this logic, but only when legacy libnanomsg
// has been fixed. This puts some pressure on the gc too, which
// makes me pretty sad.

// send length header
buf := make([]byte, 9, 9+l)
buf[0] = 1
binary.BigEndian.PutUint64(buf[1:], l)
buf = append(buf, msg.Header...)
buf = append(buf, msg.Body...)

if _, err = p.c.Write(buf[:]); err != nil {
return err
}
msg.Free()
return nil
}

func (p *connipc) Recv() (*Message, error) {

var sz int64
var err error
var msg *Message
var one [1]byte

if _, err = p.c.Read(one[:]); err != nil {
return nil, err
}
if err = binary.Read(p.c, binary.BigEndian, &sz); err != nil {
return nil, err
}

// Limit messages to the maximum receive value, if not
// unlimited. This avoids a potential denaial of service.
if sz < 0 || (p.maxrx > 0 && sz > p.maxrx) {
return nil, ErrTooLong
}
msg = NewMessage(int(sz))
msg.Body = msg.Body[0:sz]
if _, err = io.ReadFull(p.c, msg.Body); err != nil {
msg.Free()
return nil, err
}
return msg, nil
}
15 changes: 2 additions & 13 deletions test/benchmark_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2015 The Mangos Authors
// Copyright 2018 The Mangos Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use file except in compliance with the License.
Expand All @@ -15,7 +15,6 @@
package test

import (
"runtime"
"strings"
"testing"
"time"
Expand All @@ -29,11 +28,6 @@ import (

func benchmarkReq(t *testing.B, url string, size int) {

if strings.HasPrefix(url, "ipc://") && runtime.GOOS == "windows" {
t.Skip("IPC not supported on Windows")
return
}

srvopts := make(map[string]interface{})
cliopts := make(map[string]interface{})

Expand Down Expand Up @@ -111,11 +105,6 @@ func benchmarkReq(t *testing.B, url string, size int) {

func benchmarkPair(t *testing.B, url string, size int) {

if strings.HasPrefix(url, "ipc://") && runtime.GOOS == "windows" {
t.Skip("IPC not supported on Windows")
return
}

srvopts := make(map[string]interface{})
cliopts := make(map[string]interface{})

Expand Down Expand Up @@ -187,7 +176,7 @@ func benchmarkPair(t *testing.B, url string, size int) {

var benchInpAddr = "inproc://benchmark_test"
var benchTCPAddr = "tcp://127.0.0.1:33833"
var benchIPCAddr = "ipc:///tmp/benchmark_test"
var benchIPCAddr = "ipc://benchmark_test_sock"
var benchTLSAddr = "tls+tcp://127.0.0.1:44844"
var benchWSAddr = "ws://127.0.0.1:55855/BENCHMARK"
var benchWSSAddr = "wss://127.0.0.1:55856/BENCHMARK"
Expand Down
10 changes: 2 additions & 8 deletions test/common_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2017 The Mangos Authors
// Copyright 2018 The Mangos Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use file except in compliance with the License.
Expand All @@ -19,7 +19,6 @@ import (
"bytes"
"encoding/binary"
"fmt"
"runtime"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -583,11 +582,6 @@ func slowStart(t *testing.T, cases []TestCase) bool {
// RunTests runs tests.
func RunTests(t *testing.T, addr string, cases []TestCase) {

if strings.HasPrefix(addr, "ipc://") && runtime.GOOS == "windows" {
t.Skip("IPC not supported on Windows yet")
return
}

// We need to inject a slight bit of sleep to allow any sessions to
// drain before we close connections.
defer time.Sleep(50 * time.Millisecond)
Expand Down Expand Up @@ -629,7 +623,7 @@ func RunTests(t *testing.T, addr string, cases []TestCase) {
var AddrTestTCP = "tcp://127.0.0.1:59093"

// AddrTestIPC is a suitable IPC address for testing.
var AddrTestIPC = "ipc:///tmp/MYTEST_IPC"
var AddrTestIPC = "ipc://MYTEST_IPC_SOCK"

// AddrTestInp is a suitable Inproc address for testing.
var AddrTestInp = "inproc://MYTEST_INPROC"
Expand Down
7 changes: 1 addition & 6 deletions test/device_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package test

import (
"runtime"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -294,11 +293,7 @@ func TestDeviceLoopInp(t *testing.T) {
}

func TestDeviceLoopIPC(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("IPC not supported on Windows")
} else {
testDevLoop(t, AddrTestIPC)
}
testDevLoop(t, AddrTestIPC)
}

func TestDeviceLoopTLS(t *testing.T) {
Expand Down
Loading

0 comments on commit 6e946f1

Please sign in to comment.