Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: feat: support for Windows #191

Open
wants to merge 10 commits into
base: revert-178-feat/adjust_windows
Choose a base branch
from
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@

# Dependency directories (remove the comment below to include it)
# vendor/
.idea/
.idea/

.vscode
1 change: 1 addition & 0 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,4 @@ type Connection interface {
// to polling check connection status.
AddCloseCallback(callback CloseCallback) error
}

6 changes: 3 additions & 3 deletions connection_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const (

// connection is the implement of Connection
type connection struct {
netFD
netFD
onEvent
locker
operator *FDOperator
Expand Down Expand Up @@ -290,7 +290,7 @@ var barrierPool = sync.Pool{
New: func() interface{} {
return &barrier{
bs: make([][]byte, barriercap),
ivs: make([]syscall.Iovec, barriercap),
ivs: make([]iovec, barriercap),
}
},
}
Expand All @@ -308,7 +308,7 @@ func (c *connection) init(conn Conn, opts *options) (err error) {
c.initFDOperator()
c.initFinalizer()

syscall.SetNonblock(c.fd, true)
sysSetNonblock(c.fd, true)
// enable TCP_NODELAY by default
switch c.network {
case "tcp", "tcp4", "tcp6":
Expand Down
3 changes: 1 addition & 2 deletions connection_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package netpoll

import (
"sync/atomic"
"syscall"
)

// ------------------------------------------ implement FDOperator ------------------------------------------
Expand Down Expand Up @@ -139,7 +138,7 @@ func (c *connection) flush() error {
// TODO: Let the upper layer pass in whether to use ZeroCopy.
var bs = c.outputBuffer.GetBytes(c.outputBarrier.bs)
var n, err = sendmsg(c.fd, bs, c.outputBarrier.ivs, false && c.supportZeroCopy)
if err != nil && err != syscall.EAGAIN {
if err != nil && err != SEND_RECV_AGAIN {
return Exception(err, "when flush")
}
if n > 0 {
Expand Down
35 changes: 23 additions & 12 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func TestConnectionReadAfterClosed(t *testing.T) {
Equal(t, len(buf), size)
}()
time.Sleep(time.Millisecond)
syscall.Write(w, msg)
sysWrite(w, msg)
syscall.Close(w)
wg.Wait()
}
Expand All @@ -150,7 +150,7 @@ func TestConnectionWaitReadHalfPacket(t *testing.T) {
var msg = make([]byte, size)

// write half packet
syscall.Write(w, msg[:size/2])
sysWrite(w, msg[:size/2])
// wait poller reads buffer
for rconn.inputBuffer.Len() <= 0 {
runtime.Gosched()
Expand All @@ -172,7 +172,7 @@ func TestConnectionWaitReadHalfPacket(t *testing.T) {
runtime.Gosched()
}
Equal(t, atomic.LoadInt64(&rconn.waitReadSize), int64(size))
syscall.Write(w, msg[size/2:])
sysWrite(w, msg[size/2:])
wg.Wait()
}

Expand All @@ -192,9 +192,9 @@ func TestReadTrigger(t *testing.T) {
Equal(t, len(trigger), 1)
}

func writeAll(fd int, buf []byte) error {
func writeAll(fd fdtype, buf []byte) error {
for len(buf) > 0 {
n, err := syscall.Write(fd, buf)
n, err := sysWrite(fd, buf)
if n < 0 {
return err
}
Expand All @@ -209,7 +209,7 @@ func TestLargeBufferWrite(t *testing.T) {
ln, err := CreateListener("tcp", ":1234")
MustNil(t, err)

trigger := make(chan int)
trigger := make(chan fdtype)
defer close(trigger)
go func() {
for {
Expand Down Expand Up @@ -247,8 +247,12 @@ func TestLargeBufferWrite(t *testing.T) {

time.Sleep(time.Millisecond * 50)
buf := make([]byte, 1024)
for i := 0; i < 128*bufferSize/1024; i++ {
_, err := syscall.Read(rfd, buf)
for i := 0; i < 128*bufferSize; {
j, err := sysRead(rfd, buf)
if err == syscall.Errno(10035) && runtime.GOOS == "windows" {
continue
}
i += j
MustNil(t, err)
}
// close success
Expand Down Expand Up @@ -280,7 +284,7 @@ func TestConnectionLargeMemory(t *testing.T) {

var msg = make([]byte, rn)
for i := 0; i < wn/rn; i++ {
n, err := syscall.Write(w, msg)
n, err := sysWrite(w, msg)
if err != nil {
panic(err)
}
Expand All @@ -292,7 +296,12 @@ func TestConnectionLargeMemory(t *testing.T) {

runtime.ReadMemStats(&end)
alloc := end.TotalAlloc - start.TotalAlloc
limit := uint64(4 * 1024 * 1024)
var limit uint64
if runtime.GOOS == "windows" {
limit = uint64(100 * 1024 * 1024)
} else {
limit = uint64(4 * 1024 * 1024)
}
if alloc > limit {
panic(fmt.Sprintf("alloc[%d] out of memory %d", alloc, limit))
}
Expand All @@ -301,14 +310,16 @@ func TestConnectionLargeMemory(t *testing.T) {
// TestSetTCPNoDelay is used to verify the connection initialization set the TCP_NODELAY correctly
func TestSetTCPNoDelay(t *testing.T) {
fd, err := sysSocket(syscall.AF_INET, syscall.SOCK_STREAM, 0)
MustNil(t, err)

conn := &connection{}
conn.init(&netFD{network: "tcp", fd: fd}, nil)

n, _ := syscall.GetsockoptInt(fd, syscall.IPPROTO_TCP, syscall.TCP_NODELAY)
n, _ := sysGetsockoptInt(fd, syscall.IPPROTO_TCP, syscall.TCP_NODELAY)
MustTrue(t, n > 0)
err = setTCPNoDelay(fd, false)
MustNil(t, err)
n, _ = syscall.GetsockoptInt(fd, syscall.IPPROTO_TCP, syscall.TCP_NODELAY)
n, _ = sysGetsockoptInt(fd, syscall.IPPROTO_TCP, syscall.TCP_NODELAY)
MustTrue(t, n == 0)
}

Expand Down
2 changes: 1 addition & 1 deletion fd_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
// FDOperator is a collection of operations on file descriptors.
type FDOperator struct {
// FD is file descriptor, poll will bind when register.
FD int
FD fdtype

// The FDOperator provides three operations of reading, writing, and hanging.
// The poll actively fire the FDOperator when fd changes, no check the return value of FDOperator.
Expand Down
4 changes: 2 additions & 2 deletions fd_operator_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestPersistFDOperator(t *testing.T) {
var ops = make([]*FDOperator, size)
for i := 0; i < size; i++ {
op := allocop()
op.FD = i
op.FD = fdtype(i)
ops[i] = op
}
// gc
Expand All @@ -35,7 +35,7 @@ func TestPersistFDOperator(t *testing.T) {
}
// check alloc
for i := range ops {
Equal(t, ops[i].FD, i)
Equal(t, ops[i].FD, fdtype(i))
freeop(ops[i])
}
}
Expand Down
1 change: 1 addition & 0 deletions net_dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func (d *dialer) DialConnection(network, address string, timeout time.Duration)
}
connection, err = DialTCP(ctx, network, nil, raddr)
// case "udp", "udp4", "udp6": // TODO: unsupport now
// TODO: Unix sockets are not supported under the windows platform
case "unix", "unixgram", "unixpacket":
var raddr *UnixAddr
raddr, err = ResolveUnixAddr(network, address)
Expand Down
12 changes: 7 additions & 5 deletions net_dialer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"strconv"
"strings"
"sync"
"syscall"
"testing"
"time"
)
Expand Down Expand Up @@ -61,6 +60,9 @@ func TestDialerTCP(t *testing.T) {
}

func TestDialerUnix(t *testing.T) {
if runtime.GOOS == "windows" {
return
}
dialer := NewDialer()
conn, err := dialer.DialTimeout("unix", "tmp.sock", time.Second)
MustTrue(t, err != nil)
Expand Down Expand Up @@ -126,7 +128,7 @@ func TestDialerFdAlloc(t *testing.T) {
runtime.Gosched()
}
time.Sleep(time.Millisecond)
syscall.SetNonblock(fd, true)
sysSetNonblock(fd, true)
}
}

Expand All @@ -145,18 +147,18 @@ func TestFDClose(t *testing.T) {
defer cancel1()
defer el1.Shutdown(ctx1)

var fd int
var fd fdtype
var conn Connection
conn, err = DialConnection("tcp", ":1234", time.Second)
MustNil(t, err)
fd = conn.(*TCPConnection).fd
syscall.SetNonblock(fd, true)
sysSetNonblock(fd, true)
conn.Close()

conn, err = DialConnection("tcp", ":1234", time.Second)
MustNil(t, err)
fd = conn.(*TCPConnection).fd
syscall.SetNonblock(fd, true)
sysSetNonblock(fd, true)
time.Sleep(time.Second)
conn.Close()
}
Expand Down
14 changes: 7 additions & 7 deletions net_listener.go → net_listener_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type Listener interface {
net.Listener

// Fd return listener's fd, used by poll.
Fd() (fd int)
Fd() (fd fdtype)
}

// CreateListener return a new Listener.
Expand Down Expand Up @@ -57,7 +57,7 @@ func ConvertListener(l net.Listener) (nl Listener, err error) {
if err != nil {
return nil, err
}
return ln, syscall.SetNonblock(ln.fd, true)
return ln, sysSetNonblock(ln.fd, true)
}

// TODO: udpListener does not work now.
Expand All @@ -75,14 +75,14 @@ func udpListener(network, addr string) (l Listener, err error) {
if err != nil {
return nil, err
}
ln.fd = int(ln.file.Fd())
return ln, syscall.SetNonblock(ln.fd, true)
ln.fd = fdtype(ln.file.Fd())
return ln, sysSetNonblock(ln.fd, true)
}

var _ net.Listener = &listener{}

type listener struct {
fd int
fd fdtype
addr net.Addr // listener's local addr
ln net.Listener // tcp|unix listener
pconn net.PacketConn // udp listener
Expand Down Expand Up @@ -139,7 +139,7 @@ func (ln *listener) Addr() net.Addr {
}

// Fd implements Listener.
func (ln *listener) Fd() (fd int) {
func (ln *listener) Fd() (fd fdtype) {
return ln.fd
}

Expand All @@ -155,6 +155,6 @@ func (ln *listener) parseFD() (err error) {
if err != nil {
return err
}
ln.fd = int(ln.file.Fd())
ln.fd = fdtype(ln.file.Fd())
return nil
}
2 changes: 2 additions & 0 deletions net_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func TestListenerDialer(t *testing.T) {
if conn == nil && err == nil {
continue
}
MustNil(t, err)
go func(conn net.Conn) {
<-trigger
buf := make([]byte, 10)
Expand Down Expand Up @@ -80,6 +81,7 @@ func TestListenerDialer(t *testing.T) {
for i := 0; i < 10; i++ {
conn, err := dialer.DialConnection(network, addr, time.Second)
if err != nil {
//fmt.Println(err.Error())
continue
}
conn.AddCloseCallback(callback)
Expand Down
Loading