diff --git a/.gitignore b/.gitignore index 241aa67b..aff855b6 100644 --- a/.gitignore +++ b/.gitignore @@ -13,4 +13,6 @@ # Dependency directories (remove the comment below to include it) # vendor/ -.idea/ \ No newline at end of file +.idea/ + +.vscode \ No newline at end of file diff --git a/connection.go b/connection.go index 495bdcb8..87a55f32 100644 --- a/connection.go +++ b/connection.go @@ -64,3 +64,4 @@ type Connection interface { // to polling check connection status. AddCloseCallback(callback CloseCallback) error } + diff --git a/connection_impl.go b/connection_impl.go index 8ba435df..e39841e0 100644 --- a/connection_impl.go +++ b/connection_impl.go @@ -27,7 +27,7 @@ const ( // connection is the implement of Connection type connection struct { - netFD + netFD onEvent locker operator *FDOperator @@ -290,7 +290,7 @@ var barrierPool = sync.Pool{ New: func() interface{} { return &barrier{ bs: make([][]byte, barriercap), - ivs: make([]syscall.Iovec, barriercap), + ivs: make([]iovec, barriercap), } }, } @@ -308,7 +308,7 @@ func (c *connection) init(conn Conn, opts *options) (err error) { c.initFDOperator() c.initFinalizer() - syscall.SetNonblock(c.fd, true) + sysSetNonblock(c.fd, true) // enable TCP_NODELAY by default switch c.network { case "tcp", "tcp4", "tcp6": diff --git a/connection_reactor.go b/connection_reactor.go index d8e3e9e5..7201c5ca 100644 --- a/connection_reactor.go +++ b/connection_reactor.go @@ -16,7 +16,6 @@ package netpoll import ( "sync/atomic" - "syscall" ) // ------------------------------------------ implement FDOperator ------------------------------------------ @@ -139,7 +138,7 @@ func (c *connection) flush() error { // TODO: Let the upper layer pass in whether to use ZeroCopy. 
var bs = c.outputBuffer.GetBytes(c.outputBarrier.bs) var n, err = sendmsg(c.fd, bs, c.outputBarrier.ivs, false && c.supportZeroCopy) - if err != nil && err != syscall.EAGAIN { + if err != nil && err != SEND_RECV_AGAIN { return Exception(err, "when flush") } if n > 0 { diff --git a/connection_test.go b/connection_test.go index 983bf860..73e603f4 100644 --- a/connection_test.go +++ b/connection_test.go @@ -137,7 +137,7 @@ func TestConnectionReadAfterClosed(t *testing.T) { Equal(t, len(buf), size) }() time.Sleep(time.Millisecond) - syscall.Write(w, msg) + sysWrite(w, msg) syscall.Close(w) wg.Wait() } @@ -150,7 +150,7 @@ func TestConnectionWaitReadHalfPacket(t *testing.T) { var msg = make([]byte, size) // write half packet - syscall.Write(w, msg[:size/2]) + sysWrite(w, msg[:size/2]) // wait poller reads buffer for rconn.inputBuffer.Len() <= 0 { runtime.Gosched() @@ -172,7 +172,7 @@ func TestConnectionWaitReadHalfPacket(t *testing.T) { runtime.Gosched() } Equal(t, atomic.LoadInt64(&rconn.waitReadSize), int64(size)) - syscall.Write(w, msg[size/2:]) + sysWrite(w, msg[size/2:]) wg.Wait() } @@ -192,9 +192,9 @@ func TestReadTrigger(t *testing.T) { Equal(t, len(trigger), 1) } -func writeAll(fd int, buf []byte) error { +func writeAll(fd fdtype, buf []byte) error { for len(buf) > 0 { - n, err := syscall.Write(fd, buf) + n, err := sysWrite(fd, buf) if n < 0 { return err } @@ -209,7 +209,7 @@ func TestLargeBufferWrite(t *testing.T) { ln, err := CreateListener("tcp", ":1234") MustNil(t, err) - trigger := make(chan int) + trigger := make(chan fdtype) defer close(trigger) go func() { for { @@ -247,8 +247,12 @@ func TestLargeBufferWrite(t *testing.T) { time.Sleep(time.Millisecond * 50) buf := make([]byte, 1024) - for i := 0; i < 128*bufferSize/1024; i++ { - _, err := syscall.Read(rfd, buf) + for i := 0; i < 128*bufferSize; { + j, err := sysRead(rfd, buf) + if err == syscall.Errno(10035) && runtime.GOOS == "windows" { + continue + } + i += j MustNil(t, err) } // close success @@ 
-280,7 +284,7 @@ func TestConnectionLargeMemory(t *testing.T) { var msg = make([]byte, rn) for i := 0; i < wn/rn; i++ { - n, err := syscall.Write(w, msg) + n, err := sysWrite(w, msg) if err != nil { panic(err) } @@ -292,7 +296,12 @@ func TestConnectionLargeMemory(t *testing.T) { runtime.ReadMemStats(&end) alloc := end.TotalAlloc - start.TotalAlloc - limit := uint64(4 * 1024 * 1024) + var limit uint64 + if runtime.GOOS == "windows" { + limit = uint64(100 * 1024 * 1024) + } else { + limit = uint64(4 * 1024 * 1024) + } if alloc > limit { panic(fmt.Sprintf("alloc[%d] out of memory %d", alloc, limit)) } @@ -301,14 +310,16 @@ func TestConnectionLargeMemory(t *testing.T) { // TestSetTCPNoDelay is used to verify the connection initialization set the TCP_NODELAY correctly func TestSetTCPNoDelay(t *testing.T) { fd, err := sysSocket(syscall.AF_INET, syscall.SOCK_STREAM, 0) + MustNil(t, err) + conn := &connection{} conn.init(&netFD{network: "tcp", fd: fd}, nil) - n, _ := syscall.GetsockoptInt(fd, syscall.IPPROTO_TCP, syscall.TCP_NODELAY) + n, _ := sysGetsockoptInt(fd, syscall.IPPROTO_TCP, syscall.TCP_NODELAY) MustTrue(t, n > 0) err = setTCPNoDelay(fd, false) MustNil(t, err) - n, _ = syscall.GetsockoptInt(fd, syscall.IPPROTO_TCP, syscall.TCP_NODELAY) + n, _ = sysGetsockoptInt(fd, syscall.IPPROTO_TCP, syscall.TCP_NODELAY) MustTrue(t, n == 0) } diff --git a/fd_operator.go b/fd_operator.go index 8e025a52..032d0901 100644 --- a/fd_operator.go +++ b/fd_operator.go @@ -22,7 +22,7 @@ import ( // FDOperator is a collection of operations on file descriptors. type FDOperator struct { // FD is file descriptor, poll will bind when register. - FD int + FD fdtype // The FDOperator provides three operations of reading, writing, and hanging. // The poll actively fire the FDOperator when fd changes, no check the return value of FDOperator. 
diff --git a/fd_operator_cache_test.go b/fd_operator_cache_test.go index aa5f7263..9401111f 100644 --- a/fd_operator_cache_test.go +++ b/fd_operator_cache_test.go @@ -26,7 +26,7 @@ func TestPersistFDOperator(t *testing.T) { var ops = make([]*FDOperator, size) for i := 0; i < size; i++ { op := allocop() - op.FD = i + op.FD = fdtype(i) ops[i] = op } // gc @@ -35,7 +35,7 @@ func TestPersistFDOperator(t *testing.T) { } // check alloc for i := range ops { - Equal(t, ops[i].FD, i) + Equal(t, ops[i].FD, fdtype(i)) freeop(ops[i]) } } diff --git a/net_dialer.go b/net_dialer.go index 4f3dc21a..7154f3f6 100644 --- a/net_dialer.go +++ b/net_dialer.go @@ -67,6 +67,7 @@ func (d *dialer) DialConnection(network, address string, timeout time.Duration) } connection, err = DialTCP(ctx, network, nil, raddr) // case "udp", "udp4", "udp6": // TODO: unsupport now + // TODO: Unix sockets are not supported under the windows platform case "unix", "unixgram", "unixpacket": var raddr *UnixAddr raddr, err = ResolveUnixAddr(network, address) diff --git a/net_dialer_test.go b/net_dialer_test.go index c89f35a7..5456eca1 100644 --- a/net_dialer_test.go +++ b/net_dialer_test.go @@ -21,7 +21,6 @@ import ( "strconv" "strings" "sync" - "syscall" "testing" "time" ) @@ -61,6 +60,9 @@ func TestDialerTCP(t *testing.T) { } func TestDialerUnix(t *testing.T) { + if runtime.GOOS == "windows" { + return + } dialer := NewDialer() conn, err := dialer.DialTimeout("unix", "tmp.sock", time.Second) MustTrue(t, err != nil) @@ -126,7 +128,7 @@ func TestDialerFdAlloc(t *testing.T) { runtime.Gosched() } time.Sleep(time.Millisecond) - syscall.SetNonblock(fd, true) + sysSetNonblock(fd, true) } } @@ -145,18 +147,18 @@ func TestFDClose(t *testing.T) { defer cancel1() defer el1.Shutdown(ctx1) - var fd int + var fd fdtype var conn Connection conn, err = DialConnection("tcp", ":1234", time.Second) MustNil(t, err) fd = conn.(*TCPConnection).fd - syscall.SetNonblock(fd, true) + sysSetNonblock(fd, true) conn.Close() conn, err = 
DialConnection("tcp", ":1234", time.Second) MustNil(t, err) fd = conn.(*TCPConnection).fd - syscall.SetNonblock(fd, true) + sysSetNonblock(fd, true) time.Sleep(time.Second) conn.Close() } diff --git a/net_listener.go b/net_listener_linux.go similarity index 93% rename from net_listener.go rename to net_listener_linux.go index d6a924bb..73ba9b9a 100644 --- a/net_listener.go +++ b/net_listener_linux.go @@ -28,7 +28,7 @@ type Listener interface { net.Listener // Fd return listener's fd, used by poll. - Fd() (fd int) + Fd() (fd fdtype) } // CreateListener return a new Listener. @@ -57,7 +57,7 @@ func ConvertListener(l net.Listener) (nl Listener, err error) { if err != nil { return nil, err } - return ln, syscall.SetNonblock(ln.fd, true) + return ln, sysSetNonblock(ln.fd, true) } // TODO: udpListener does not work now. @@ -75,14 +75,14 @@ func udpListener(network, addr string) (l Listener, err error) { if err != nil { return nil, err } - ln.fd = int(ln.file.Fd()) - return ln, syscall.SetNonblock(ln.fd, true) + ln.fd = fdtype(ln.file.Fd()) + return ln, sysSetNonblock(ln.fd, true) } var _ net.Listener = &listener{} type listener struct { - fd int + fd fdtype addr net.Addr // listener's local addr ln net.Listener // tcp|unix listener pconn net.PacketConn // udp listener @@ -139,7 +139,7 @@ func (ln *listener) Addr() net.Addr { } // Fd implements Listener. 
-func (ln *listener) Fd() (fd int) { +func (ln *listener) Fd() (fd fdtype) { return ln.fd } @@ -155,6 +155,6 @@ func (ln *listener) parseFD() (err error) { if err != nil { return err } - ln.fd = int(ln.file.Fd()) + ln.fd = fdtype(ln.file.Fd()) return nil } diff --git a/net_listener_test.go b/net_listener_test.go index 12869846..f4ca307d 100644 --- a/net_listener_test.go +++ b/net_listener_test.go @@ -49,6 +49,7 @@ func TestListenerDialer(t *testing.T) { if conn == nil && err == nil { continue } + MustNil(t, err) go func(conn net.Conn) { <-trigger buf := make([]byte, 10) @@ -80,6 +81,7 @@ func TestListenerDialer(t *testing.T) { for i := 0; i < 10; i++ { conn, err := dialer.DialConnection(network, addr, time.Second) if err != nil { + //fmt.Println(err.Error()) continue } conn.AddCloseCallback(callback) diff --git a/net_listener_windows.go b/net_listener_windows.go new file mode 100644 index 00000000..b8d9d00f --- /dev/null +++ b/net_listener_windows.go @@ -0,0 +1,159 @@ +// Copyright 2021 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build windows + +package netpoll + +import ( + "errors" + "net" + "syscall" + "unsafe" +) + +// Listener extends net.Listener, but supports getting the listener's fd. +type Listener interface { + net.Listener + + // Fd returns the listener's fd, used by poll. + Fd() (fd fdtype) +} + +// CreateListener returns a new Listener. 
+func CreateListener(network, addr string) (l Listener, err error) { + if network == "udp" { + // TODO: udp listener. + return udpListener(network, addr) + } + // tcp, tcp4, tcp6, unix + ln, err := net.Listen(network, addr) + if err != nil { + return nil, err + } + return ConvertListener(ln) +} + +// ConvertListener converts net.Listener to Listener +func ConvertListener(l net.Listener) (nl Listener, err error) { + if tmp, ok := l.(Listener); ok { + return tmp, nil + } + ln := &listener{} + ln.ln = l + ln.addr = l.Addr() + err = ln.parseFD() + if err != nil { + return nil, err + } + return ln, sysSetNonblock(ln.fd, true) +} + +// TODO: udpListener does not work now. +func udpListener(network, addr string) (l Listener, err error) { + return nil, nil +} + +var _ net.Listener = &listener{} + +type listener struct { + fd fdtype + addr net.Addr // listener's local addr + ln net.Listener // tcp|unix listener + pconn net.PacketConn // udp listener + rawConn syscall.RawConn + syncClose chan int +} + +// Accept implements Listener. +func (ln *listener) Accept() (net.Conn, error) { + // udp + if ln.pconn != nil { + return ln.UDPAccept() + } + // tcp + var sa syscall.RawSockaddrAny + var len = unsafe.Sizeof(sa) + fduintptr, _, err := acceptProc.Call(uintptr(ln.fd), uintptr(unsafe.Pointer(&sa)), uintptr(unsafe.Pointer(&len))) + fd := fdtype(fduintptr) + if fd == syscall.InvalidHandle { + if err == WSAEWOULDBLOCK { + return nil, nil + } + return nil, err + } + var nfd = &netFD{} + nfd.fd = fd + nfd.localAddr = ln.addr + nfd.network = ln.addr.Network() + sa4, err := sa.Sockaddr() + nfd.remoteAddr = sockaddrToAddr(sa4) + return nfd, err +} + +// TODO: UDPAccept Not implemented. +func (ln *listener) UDPAccept() (net.Conn, error) { + return nil, Exception(ErrUnsupported, "UDP") +} + +// Close implements Listener. 
+func (ln *listener) Close() error { + if !isChanClose(ln.syncClose) { + close(ln.syncClose) + } + if ln.fd != 0 { + syscall.Close(ln.fd) + } + if ln.ln != nil { + ln.ln.Close() + } + if ln.pconn != nil { + ln.pconn.Close() + } + return nil +} + +// Addr implements Listener. +func (ln *listener) Addr() net.Addr { + return ln.addr +} + +// Fd implements Listener. +func (ln *listener) Fd() (fd fdtype) { + return ln.fd +} + +func (ln *listener) parseFD() (err error) { + switch netln := ln.ln.(type) { + case *net.TCPListener: + ln.rawConn, err = netln.SyscallConn() + case *net.UnixListener: + ln.rawConn, err = netln.SyscallConn() + default: + return errors.New("listener type can't support") + } + if err != nil { + return err + } + fdCh := make(chan uintptr, 1) + ln.syncClose = make(chan int) + go func() { + ln.rawConn.Control(func(fd uintptr) { + fdCh <- fd + <-ln.syncClose + }) + }() + ln.fd = fdtype(<-fdCh) + return nil +} diff --git a/net_netfd_conn.go b/net_netfd_conn.go index eaad40c7..b8cfa0c7 100644 --- a/net_netfd_conn.go +++ b/net_netfd_conn.go @@ -12,7 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -// +build darwin netbsd freebsd openbsd dragonfly linux +//go:build darwin || netbsd || freebsd || openbsd || dragonfly || linux || windows +// +build darwin netbsd freebsd openbsd dragonfly linux windows package netpoll @@ -30,19 +31,19 @@ type Conn interface { net.Conn // Fd return conn's fd, used by poll - Fd() (fd int) + Fd() (fd fdtype) } var _ Conn = &netFD{} // Fd implements Conn. -func (c *netFD) Fd() (fd int) { +func (c *netFD) Fd() (fd fdtype) { return c.fd } // Read implements Conn. func (c *netFD) Read(b []byte) (n int, err error) { - n, err = syscall.Read(c.fd, b) + n, err = sysRead(c.fd, b) if err != nil { if err == syscall.EAGAIN || err == syscall.EINTR { return 0, nil @@ -53,7 +54,7 @@ func (c *netFD) Read(b []byte) (n int, err error) { // Write implements Conn. 
func (c *netFD) Write(b []byte) (n int, err error) { - n, err = syscall.Write(c.fd, b) + n, err = sysWrite(c.fd, b) if err != nil { if err == syscall.EAGAIN { return 0, nil diff --git a/net_netfd.go b/net_netfd_linux.go similarity index 97% rename from net_netfd.go rename to net_netfd_linux.go index b4b6c1cd..f324ea89 100644 --- a/net_netfd.go +++ b/net_netfd_linux.go @@ -31,7 +31,7 @@ var ( type netFD struct { // file descriptor - fd int + fd fdtype // When calling netFD.dial(), fd will be registered into poll in some scenarios, such as dialing tcp socket, // but not in other scenarios, such as dialing unix socket. // This leads to a different behavior in register poller at after, so use this field to mark it. @@ -52,7 +52,7 @@ type netFD struct { remoteAddr net.Addr } -func newNetFD(fd, family, sotype int, net string) *netFD { +func newNetFD(fd fdtype, family, sotype int, net string) *netFD { var ret = &netFD{} ret.fd = fd ret.network = net @@ -186,7 +186,7 @@ func (c *netFD) connect(ctx context.Context, la, ra syscall.Sockaddr) (rsa sysca if err := c.pd.WaitWrite(ctx); err != nil { return nil, err } - nerr, err := syscall.GetsockoptInt(c.fd, syscall.SOL_SOCKET, syscall.SO_ERROR) + nerr, err := syscall.GetsockoptInt(c.fd, syscall.SOL_SOCKET, SO_ERROR) if err != nil { return nil, os.NewSyscallError("getsockopt", err) } diff --git a/net_netfd_windows.go b/net_netfd_windows.go new file mode 100644 index 00000000..f356efa8 --- /dev/null +++ b/net_netfd_windows.go @@ -0,0 +1,235 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. +// +// This file may have been modified by CloudWeGo authors. (“CloudWeGo Modifications”). +// All CloudWeGo Modifications are Copyright 2021 CloudWeGo authors. 
+ +//go:build windows +// +build windows + +package netpoll + +import ( + "context" + "errors" + "net" + "os" + "runtime" + "syscall" + "time" + "unsafe" +) + +var ( + // aLongTimeAgo is a non-zero time, far in the past, used for + // immediate cancelation of dials. + aLongTimeAgo = time.Unix(1, 0) + // noDeadline is just the zero time value, kept for + // readability with functions taking too many parameters. + noDeadline = time.Time{} +) + +type netFD struct { + // file descriptor + fd fdtype + // When calling netFD.dial(), fd will be registered into poll in some scenarios, such as dialing tcp socket, + // but not in other scenarios, such as dialing unix socket. + // This leads to different behavior when registering with the poller afterwards, so this field is used to mark it. + pd *pollDesc + // closed marks whether fd has expired + closed uint32 + // Whether this is a streaming descriptor, as opposed to a + // packet-based descriptor like a UDP socket. Immutable. + isStream bool + // Whether a zero byte read indicates EOF. This is false for a + // message based socket connection. 
+ zeroReadIsEOF bool + family int // AF_INET, AF_INET6, syscall.AF_UNIX + sotype int // syscall.SOCK_STREAM, syscall.SOCK_DGRAM, syscall.SOCK_RAW + isConnected bool // handshake completed or use of association with peer + network string // tcp tcp4 tcp6, udp, udp4, udp6, ip, ip4, ip6, unix, unixgram, unixpacket + localAddr net.Addr + remoteAddr net.Addr +} + +func newNetFD(fd fdtype, family, sotype int, net string) *netFD { + var ret = &netFD{} + ret.fd = fd + ret.network = net + ret.family = family + ret.sotype = sotype + ret.isStream = sotype == syscall.SOCK_STREAM + ret.zeroReadIsEOF = sotype != syscall.SOCK_DGRAM && sotype != syscall.SOCK_RAW + return ret +} + +// if dial connection error, you need exec netFD.Close actively +func (c *netFD) dial(ctx context.Context, laddr, raddr sockaddr) (err error) { + var lsa syscall.Sockaddr + if laddr != nil { + if lsa, err = laddr.sockaddr(c.family); err != nil { + return err + } else if lsa != nil { + // bind local address + if err = syscall.Bind(c.fd, lsa); err != nil { + return os.NewSyscallError("bind", err) + } + } + } + var rsa syscall.Sockaddr // remote address from the user + var crsa syscall.Sockaddr // remote address we actually connected to + if raddr != nil { + if rsa, err = raddr.sockaddr(c.family); err != nil { + return err + } + } + // remote address we actually connected to + if crsa, err = c.connect(ctx, lsa, rsa); err != nil { + return err + } + c.isConnected = true + + // Record the local and remote addresses from the actual socket. + // Get the local address by calling Getsockname. + // For the remote address, use + // 1) the one returned by the connect method, if any; or + // 2) the one from Getpeername, if it succeeds; or + // 3) the one passed to us as the raddr parameter. 
+ lsa, _ = syscall.Getsockname(c.fd) + c.localAddr = sockaddrToAddr(lsa) + if crsa != nil { + c.remoteAddr = sockaddrToAddr(crsa) + } else if crsa, _ = syscall.Getpeername(c.fd); crsa != nil { + c.remoteAddr = sockaddrToAddr(crsa) + } else { + c.remoteAddr = sockaddrToAddr(rsa) + } + return nil +} + +func (c *netFD) connect(ctx context.Context, la, ra syscall.Sockaddr) (rsa syscall.Sockaddr, ret error) { + // Do not need to call c.writing here, + // because c is not yet accessible to user, + // so no concurrent operations are possible. + switch err := syscall.Connect(c.fd, ra); err { + case WSAEWOULDBLOCK, WSAEINPROGRESS: + case nil, WSAEALREADY: + select { + case <-ctx.Done(): + return nil, mapErr(ctx.Err()) + default: + } + return nil, nil + case syscall.EINVAL: + // On Solaris we can see EINVAL if the socket has + // already been accepted and closed by the server. + // Treat this as a successful connection--writes to + // the socket will see EOF. For details and a test + // case in C see https://golang.org/issue/6828. + if runtime.GOOS == "solaris" { + return nil, nil + } + fallthrough + default: + return nil, os.NewSyscallError("connect", err) + } + + // TODO: can't support interrupter now. + // Start the "interrupter" goroutine, if this context might be canceled. + // (The background context cannot) + // + // The interrupter goroutine waits for the context to be done and + // interrupts the dial (by altering the c's write deadline, which + // wakes up waitWrite). + if ctx != context.Background() { + // Wait for the interrupter goroutine to exit before returning + // from connect. + done := make(chan struct{}) + interruptRes := make(chan error) + defer func() { + close(done) + if ctxErr := <-interruptRes; ctxErr != nil && ret == nil { + // The interrupter goroutine called SetWriteDeadline, + // but the connect code below had returned from + // waitWrite already and did a successful connect (ret + // == nil). 
Because we've now poisoned the connection + // by making it unwritable, don't return a successful + // dial. This was issue 16523. + ret = mapErr(ctxErr) + c.Close() // prevent a leak + } + }() + go func() { + select { + case <-ctx.Done(): + // Force the runtime's poller to immediately give up + // waiting for writability, unblocking waitWrite + // below. + c.SetWriteDeadline(aLongTimeAgo) + interruptRes <- ctx.Err() + case <-done: + interruptRes <- nil + } + }() + } + + c.pd = newPollDesc(c.fd) + for { + // Performing multiple connect system calls on a + // non-blocking socket under Unix variants does not + // necessarily result in earlier errors being + // returned. Instead, once runtime-integrated network + // poller tells us that the socket is ready, get the + // SO_ERROR socket option to see if the connection + // succeeded or failed. See issue 7474 for further + // details. + if err := c.pd.WaitWrite(ctx); err != nil { + return nil, err + } + //nerr, err := syscall.GetsockoptInt(c.fd, syscall.SOL_SOCKET, SO_ERROR) + errout := -1 + len := unsafe.Sizeof(errout) + nerr, _, err := getsockoptProc.Call(uintptr(c.fd), uintptr(syscall.SOL_SOCKET), SO_ERROR, uintptr(unsafe.Pointer(&errout)), uintptr(unsafe.Pointer(&len))) + if nerr != 0 { + return nil, os.NewSyscallError("getsockopt", err) + } + switch err := syscall.Errno(nerr); err { + case WSAEWOULDBLOCK, WSAEINPROGRESS: + case WSAEALREADY: + return nil, nil + case syscall.Errno(0): + // The runtime poller can wake us up spuriously; + // see issues 14548 and 19289. Check that we are + // really connected; if not, wait again. + if rsa, err := syscall.Getpeername(c.fd); err == nil { + return rsa, nil + } + default: + return nil, os.NewSyscallError("connect", err) + } + } +} + +// Various errors contained in OpError. 
+var ( + errMissingAddress = errors.New("missing address") + errCanceled = errors.New("operation was canceled") + errIOTimeout = errors.New("i/o timeout") +) + +// mapErr maps from the context errors to the historical internal net +// error values. +// +// TODO(bradfitz): get rid of this after adjusting tests and making +// context.DeadlineExceeded implement net.Error? +func mapErr(err error) error { + switch err { + case context.Canceled: + return errCanceled + case context.DeadlineExceeded: + return errIOTimeout + default: + return err + } +} diff --git a/net_polldesc.go b/net_polldesc.go index d1636afa..b2f34df7 100644 --- a/net_polldesc.go +++ b/net_polldesc.go @@ -20,7 +20,7 @@ import ( ) // TODO: recycle *pollDesc -func newPollDesc(fd int) *pollDesc { +func newPollDesc(fd fdtype) *pollDesc { pd, op := &pollDesc{}, &FDOperator{} op.FD = fd op.OnWrite = pd.onwrite diff --git a/net_sock.go b/net_sock.go index 8f3efb94..069f3a8f 100644 --- a/net_sock.go +++ b/net_sock.go @@ -103,7 +103,7 @@ func favoriteAddrFamily(network string, laddr, raddr sockaddr) (family int, ipv6 // asynchronous I/O using the network poller. func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr) (netfd *netFD, err error) { // syscall.Socket & set socket options - var fd int + var fd fdtype fd, err = sysSocket(family, sotype, proto) if err != nil { return nil, err diff --git a/netpoll.go b/netpoll.go index a892422f..1f5ef4b6 100644 --- a/netpoll.go +++ b/netpoll.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-//go:build darwin || netbsd || freebsd || openbsd || dragonfly || linux -// +build darwin netbsd freebsd openbsd dragonfly linux +//go:build darwin || netbsd || freebsd || openbsd || dragonfly || linux || windows +// +build darwin netbsd freebsd openbsd dragonfly linux windows package netpoll diff --git a/nocopy_linkbuffer.go b/nocopy_linkbuffer.go index fd861b4c..839f8e87 100644 --- a/nocopy_linkbuffer.go +++ b/nocopy_linkbuffer.go @@ -380,6 +380,7 @@ func (b *LinkBuffer) Flush() (err error) { } b.flush = b.write // re-cal length + //ln("flush",b,n) b.recalLen(n) return nil } diff --git a/poll.go b/poll.go index a9aac2e9..384fcc44 100644 --- a/poll.go +++ b/poll.go @@ -17,6 +17,7 @@ package netpoll // Poll monitors fd(file descriptor), calls the FDOperator to perform specific actions, // and shields underlying differences. On linux systems, poll uses epoll by default, // and kevent by default on bsd systems. +// On Windows systems, poll uses WSAPoll. type Poll interface { // Wait will poll all registered fds, and schedule processing based on the triggered event. 
// The call will block, so the usage can be like: diff --git a/poll_default_bsd.go b/poll_default_bsd.go index ec8f070c..9d310c3f 100644 --- a/poll_default_bsd.go +++ b/poll_default_bsd.go @@ -48,7 +48,7 @@ func openDefaultPoll() *defaultPoll { } type defaultPoll struct { - fd int + fd fdtype trigger uint32 hups []func(p Poll) error } @@ -60,7 +60,7 @@ func (p *defaultPoll) Wait() error { var events, barriers = make([]syscall.Kevent_t, size), make([]barrier, size) for i := range barriers { barriers[i].bs = make([][]byte, caps) - barriers[i].ivs = make([]syscall.Iovec, caps) + barriers[i].ivs = make([]iovec, caps) } // wait for { @@ -95,7 +95,7 @@ func (p *defaultPoll) Wait() error { if len(bs) > 0 { var n, err = readv(operator.FD, bs, barriers[i].ivs) operator.InputAck(n) - if err != nil && err != syscall.EAGAIN && err != syscall.EINTR { + if err != nil && err != SEND_RECV_AGAIN && err != syscall.EINTR { log.Printf("readv(fd=%d) failed: %s", operator.FD, err.Error()) p.appendHup(operator) continue @@ -122,7 +122,7 @@ func (p *defaultPoll) Wait() error { // TODO: Let the upper layer pass in whether to use ZeroCopy. 
var n, err = sendmsg(operator.FD, bs, barriers[i].ivs, false && supportZeroCopy) operator.OutputAck(n) - if err != nil && err != syscall.EAGAIN { + if err != nil && err != SEND_RECV_AGAIN { log.Printf("sendmsg(fd=%d) failed: %s", operator.FD, err.Error()) p.appendHup(operator) continue diff --git a/poll_default_linux.go b/poll_default_linux.go index c31a43a0..dc16502c 100644 --- a/poll_default_linux.go +++ b/poll_default_linux.go @@ -47,14 +47,14 @@ func openDefaultPoll() *defaultPoll { poll.Reset = poll.reset poll.Handler = poll.handler - poll.wop = &FDOperator{FD: int(r0)} + poll.wop = &FDOperator{FD: fdtype(r0)} poll.Control(poll.wop, PollReadable) return &poll } type defaultPoll struct { pollArgs - fd int // epoll fd + fd fdtype // epoll fd wop *FDOperator // eventfd, wake epoll_wait buf []byte // read wfd trigger msg trigger uint32 // trigger flag @@ -76,7 +76,7 @@ func (a *pollArgs) reset(size, caps int) { a.events, a.barriers = make([]epollevent, size), make([]barrier, size) for i := range a.barriers { a.barriers[i].bs = make([][]byte, a.caps) - a.barriers[i].ivs = make([]syscall.Iovec, a.caps) + a.barriers[i].ivs = make([]iovec, a.caps) } } @@ -115,7 +115,7 @@ func (p *defaultPoll) handler(events []epollevent) (closed bool) { // trigger or exit gracefully if operator.FD == p.wop.FD { // must clean trigger first - syscall.Read(p.wop.FD, p.buf) + sysRead(p.wop.FD, p.buf) atomic.StoreUint32(&p.trigger, 0) // if closed & exit if p.buf[0] > 0 { @@ -140,7 +140,7 @@ func (p *defaultPoll) handler(events []epollevent) (closed bool) { if len(bs) > 0 { var n, err = readv(operator.FD, bs, p.barriers[i].ivs) operator.InputAck(n) - if err != nil && err != syscall.EAGAIN && err != syscall.EINTR { + if err != nil && err != SEND_RECV_AGAIN && err != syscall.EINTR { log.Printf("readv(fd=%d) failed: %s", operator.FD, err.Error()) p.appendHup(operator) continue @@ -176,7 +176,7 @@ func (p *defaultPoll) handler(events []epollevent) (closed bool) { // TODO: Let the upper layer 
pass in whether to use ZeroCopy. var n, err = sendmsg(operator.FD, bs, p.barriers[i].ivs, false && supportZeroCopy) operator.OutputAck(n) - if err != nil && err != syscall.EAGAIN { + if err != nil && err != SEND_RECV_AGAIN { log.Printf("sendmsg(fd=%d) failed: %s", operator.FD, err.Error()) p.appendHup(operator) continue @@ -193,7 +193,7 @@ func (p *defaultPoll) handler(events []epollevent) (closed bool) { // Close will write 10000000 func (p *defaultPoll) Close() error { - _, err := syscall.Write(p.wop.FD, []byte{1, 0, 0, 0, 0, 0, 0, 0}) + _, err := sysWrite(p.wop.FD, []byte{1, 0, 0, 0, 0, 0, 0, 0}) return err } @@ -203,7 +203,7 @@ func (p *defaultPoll) Trigger() error { return nil } // MAX(eventfd) = 0xfffffffffffffffe - _, err := syscall.Write(p.wop.FD, []byte{0, 0, 0, 0, 0, 0, 0, 1}) + _, err := sysWrite(p.wop.FD, []byte{0, 0, 0, 0, 0, 0, 0, 1}) return err } diff --git a/poll_default_windows.go b/poll_default_windows.go new file mode 100644 index 00000000..b2f2b9e0 --- /dev/null +++ b/poll_default_windows.go @@ -0,0 +1,287 @@ +// Copyright 2021 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +//go:build !race +// +build !race + +package netpoll + +import ( + "log" + "runtime" + "sync" + "sync/atomic" + "syscall" +) + +var fd2FDOperator sync.Map + +const ( + POLLRDNORM = 0x0100 + POLLRDBAND = 0x0200 + POLLIN = (POLLRDNORM | POLLRDBAND) + POLLPRI = 0x0400 + + POLLWRNORM = 0x0010 + POLLOUT = (POLLWRNORM) + POLLWRBAND = 0x0020 + + POLLERR = 0x0001 + POLLHUP = 0x0002 + POLLNVAL = 0x0004 +) + +func openPoll() Poll { + return openDefaultPoll() +} + +func openDefaultPoll() *defaultPoll { + var poll = defaultPoll{} + poll.buf = make([]byte, 8) + poll.fdarrayMux.Lock() + poll.fdarray = make([]epollevent, 0) + poll.fdmode = make([]int, 0) + poll.fdarrayMux.Unlock() + r, w := GetSysFdPairs() + + poll.Reset = poll.reset + poll.Handler = poll.handler + + poll.wopr = &FDOperator{FD: fdtype(r)} + poll.wopw = &FDOperator{FD: fdtype(w)} + poll.Control(poll.wopr, PollReadable) + return &poll +} + +type defaultPoll struct { + pollArgs + fdarrayMux sync.Mutex + fdarray []epollevent // epoll fds + fdmode []int + wopr *FDOperator // eventfd, wake epoll_wait + wopw *FDOperator + buf []byte // read wfd trigger msg + trigger uint32 // trigger flag + // fns for handle events + Reset func(size, caps int) + Handler func(events []epollevent) (closed bool) +} + +type pollArgs struct { + size int + caps int + events []epollevent + barriers []barrier + hups []func(p Poll) error +} + +func (a *pollArgs) reset(size, caps int) { + a.size, a.caps = size, caps + a.events, a.barriers = make([]epollevent, size), make([]barrier, size) + for i := range a.barriers { + a.barriers[i].bs = make([][]byte, a.caps) + a.barriers[i].ivs = make([]iovec, a.caps) + } +} + +// Wait implements Poll. 
+func (p *defaultPoll) Wait() (err error) { + // init + // can not be blocked + var caps, msec, n = barriercap, 0, 0 + p.Reset(128, caps) + // wait + for { + if n == p.size && p.size < 128*1024 { + p.Reset(p.size<<1, caps) + } + p.fdarrayMux.Lock() + n, _ = EpollWait(p.fdarray, p.events, msec, p.fdmode) + p.fdarrayMux.Unlock() + + if n == 0xffffffff { + p.fdarrayMux.Lock() + for i := 0; i < len(p.fdarray); i++ { + if p.fdarray[i].fd == syscall.InvalidHandle { + continue + } + _, err := syscall.Getsockname(p.fdarray[i].fd) + if err != nil { + p.fdarray[i].fd = syscall.InvalidHandle + p.fdmode[i] = 0 + } + } + p.fdarrayMux.Unlock() + continue + } + if n == 0 { + //msec = -1 + runtime.Gosched() + continue + } + if p.Handler(p.events[:n]) { + return nil + } + } +} + +func (p *defaultPoll) handler(events []epollevent) (closed bool) { + for i := range events { + v, ok := fd2FDOperator.Load(events[i].fd) + if !ok { + return false + } + operator := v.(*FDOperator) + if !operator.do() { + continue + } + // trigger or exit gracefully + if operator.FD == p.wopr.FD { + // must clean trigger first + sysRead(p.wopr.FD, p.buf) + atomic.StoreUint32(&p.trigger, 0) + // if closed & exit + if p.buf[0] > 0 { + syscall.Close(p.wopr.FD) + syscall.Close(p.wopw.FD) + operator.done() + return true + } + operator.done() + continue + } + + evt := events[i].revents + // check poll in + if evt&POLLIN != 0 || evt&POLLHUP != 0 { + if operator.OnRead != nil { + // for non-connection + operator.OnRead(p) + } else { + // for connection + var bs = operator.Inputs(p.barriers[i].bs) + if len(bs) > 0 { + var n, err = readv(operator.FD, bs, p.barriers[i].ivs) + operator.InputAck(n) + if err != nil && err != SEND_RECV_AGAIN && err != syscall.EINTR { + log.Printf("readv(fd=%d) failed: %s", operator.FD, err.Error()) + p.appendHup(operator) + continue + } + } + } + } + + // check hup + if evt&POLLHUP != 0 { + p.appendHup(operator) + continue + } + if evt&POLLERR != 0 { + p.appendHup(operator) + continue + } 
+ + // check poll out + if evt&POLLOUT != 0 { + if operator.OnWrite != nil { + // for non-connection + operator.OnWrite(p) + } else { + // for connection + var bs, supportZeroCopy = operator.Outputs(p.barriers[i].bs) + if len(bs) > 0 { + // TODO: Let the upper layer pass in whether to use ZeroCopy. + var n, err = sendmsg(operator.FD, bs, p.barriers[i].ivs, false && supportZeroCopy) + operator.OutputAck(n) + if err != nil && err != SEND_RECV_AGAIN { + log.Printf("sendmsg(fd=%d) failed: %s", operator.FD, err.Error()) + p.appendHup(operator) + continue + } + } + } + } + operator.done() + } + // hup conns together to avoid blocking the poll. + p.detaches() + return false +} + +// Close will write 10000000 +func (p *defaultPoll) Close() error { + _, err := sysWrite(p.wopw.FD, []byte{1, 0, 0, 0, 0, 0, 0, 0}) + return err +} + +// Trigger implements Poll. +func (p *defaultPoll) Trigger() error { + if atomic.AddUint32(&p.trigger, 1) > 1 { + return nil + } + // MAX(eventfd) = 0xfffffffffffffffe + _, err := sysWrite(p.wopw.FD, []byte{0, 0, 0, 0, 0, 0, 0, 1}) + return err +} + +// Control implements Poll. 
+func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error { + var op int + var evt epollevent + evt.fd = operator.FD + mode := 0 + fd2FDOperator.Store(operator.FD, operator) + switch event { + case PollReadable: + operator.inuse() + op, evt.events = EPOLL_CTL_ADD, POLLIN + case PollModReadable: + operator.inuse() + op, evt.events = EPOLL_CTL_MOD, POLLIN + case PollDetach: + op, evt.events = EPOLL_CTL_DEL, POLLIN|POLLOUT + case PollWritable: + operator.inuse() + op, evt.events, mode = EPOLL_CTL_ADD, POLLOUT, ET_MOD + case PollR2RW: + op, evt.events = EPOLL_CTL_MOD, POLLIN|POLLOUT + case PollRW2R: + op, evt.events = EPOLL_CTL_MOD, POLLIN + } + p.fdarrayMux.Lock() + defer p.fdarrayMux.Unlock() + return EpollCtl(&p.fdarray, op, operator.FD, &evt, mode, &p.fdmode) +} + +func (p *defaultPoll) appendHup(operator *FDOperator) { + p.hups = append(p.hups, operator.OnHup) + operator.Control(PollDetach) + operator.done() +} + +func (p *defaultPoll) detaches() { + if len(p.hups) == 0 { + return + } + hups := p.hups + p.hups = nil + go func(onhups []func(p Poll) error) { + for i := range onhups { + if onhups[i] != nil { + onhups[i](p) + } + } + }(hups) +} diff --git a/poll_race_bsd.go b/poll_race_bsd.go index 39b2d7e6..19c04146 100644 --- a/poll_race_bsd.go +++ b/poll_race_bsd.go @@ -102,7 +102,7 @@ func (p *defaultPoll) Wait() error { if len(bs) > 0 { var n, err = readv(operator.FD, bs, barriers[i].ivs) operator.InputAck(n) - if err != nil && err != syscall.EAGAIN && err != syscall.EINTR { + if err != nil && err != SEND_RECV_AGAIN && err != syscall.EINTR { log.Printf("readv(fd=%d) failed: %s", operator.FD, err.Error()) p.appendHup(operator) continue @@ -129,7 +129,7 @@ func (p *defaultPoll) Wait() error { // TODO: Let the upper layer pass in whether to use ZeroCopy. 
var n, err = sendmsg(operator.FD, bs, barriers[i].ivs, false && supportZeroCopy) operator.OutputAck(n) - if err != nil && err != syscall.EAGAIN { + if err != nil && err != SEND_RECV_AGAIN { log.Printf("sendmsg(fd=%d) failed: %s", operator.FD, err.Error()) p.appendHup(operator) continue diff --git a/poll_race_linux.go b/poll_race_linux.go index da28cd49..f9714093 100644 --- a/poll_race_linux.go +++ b/poll_race_linux.go @@ -106,7 +106,7 @@ func (p *defaultPoll) handler(events []syscall.EpollEvent) (closed bool) { // trigger or exit gracefully if fd == p.wfd { // must clean trigger first - syscall.Read(p.wfd, p.buf) + sysRead(p.wfd, p.buf) atomic.StoreUint32(&p.trigger, 0) // if closed & exit if p.buf[0] > 0 { @@ -137,7 +137,7 @@ func (p *defaultPoll) handler(events []syscall.EpollEvent) (closed bool) { if len(bs) > 0 { var n, err = readv(operator.FD, bs, p.barriers[i].ivs) operator.InputAck(n) - if err != nil && err != syscall.EAGAIN && err != syscall.EINTR { + if err != nil && err != SEND_RECV_AGAIN && err != syscall.EINTR { log.Printf("readv(fd=%d) failed: %s", operator.FD, err.Error()) p.appendHup(operator) continue @@ -174,7 +174,7 @@ func (p *defaultPoll) handler(events []syscall.EpollEvent) (closed bool) { // TODO: Let the upper layer pass in whether to use ZeroCopy. 
var n, err = sendmsg(operator.FD, bs, p.barriers[i].ivs, false && supportZeroCopy) operator.OutputAck(n) - if err != nil && err != syscall.EAGAIN { + if err != nil && err != SEND_RECV_AGAIN { log.Printf("sendmsg(fd=%d) failed: %s", operator.FD, err.Error()) p.appendHup(operator) continue @@ -191,7 +191,7 @@ func (p *defaultPoll) handler(events []syscall.EpollEvent) (closed bool) { // Close will write 10000000 func (p *defaultPoll) Close() error { - _, err := syscall.Write(p.wfd, []byte{1, 0, 0, 0, 0, 0, 0, 0}) + _, err := sysWrite(p.wfd, []byte{1, 0, 0, 0, 0, 0, 0, 0}) // delete all *FDOperator p.m.Range(func(key, value interface{}) bool { var operator, _ = value.(*FDOperator) @@ -209,7 +209,7 @@ func (p *defaultPoll) Trigger() error { return nil } // MAX(eventfd) = 0xfffffffffffffffe - _, err := syscall.Write(p.wfd, []byte{0, 0, 0, 0, 0, 0, 0, 1}) + _, err := sysWrite(p.wfd, []byte{0, 0, 0, 0, 0, 0, 0, 1}) return err } diff --git a/sys_define_linux.go b/sys_define_linux.go new file mode 100644 index 00000000..951a0d64 --- /dev/null +++ b/sys_define_linux.go @@ -0,0 +1,48 @@ +// Copyright 2021 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +// +build !windows + +package netpoll + +import ( + "syscall" +) + +type iovec = syscall.Iovec +type fdtype = int + +const SEND_RECV_AGAIN = syscall.EAGAIN + +const ( + SO_ERROR = syscall.SO_ERROR +) + +func sysRead(fd fdtype, p []byte) (n int, err error) { + n, err = syscall.Read(fd, p) + return n, err +} + +func sysWrite(fd fdtype, p []byte) (n int, err error) { + n, err = syscall.Write(fd, p) + return n, err +} + +func sysSetNonblock(fd fdtype, is bool) error { + return syscall.SetNonblock(fd, is) +} + +func sysGetsockoptInt(fd fdtype, level int, optname int) (int, error) { + return syscall.GetsockoptInt(fd, level, optname) +} diff --git a/sys_define_windows.go b/sys_define_windows.go new file mode 100644 index 00000000..6f7d2080 --- /dev/null +++ b/sys_define_windows.go @@ -0,0 +1,93 @@ +// Copyright 2021 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +// +build windows + +package netpoll + +import ( + "os" + "syscall" + "unsafe" +) + +type iovec = syscall.WSABuf +type fdtype = syscall.Handle + +const SEND_RECV_AGAIN = WSAEWOULDBLOCK + +var ws2_32_mod = syscall.NewLazyDLL("ws2_32.dll") + +var recvProc = ws2_32_mod.NewProc("recv") +var sendProc = ws2_32_mod.NewProc("send") +var acceptProc = ws2_32_mod.NewProc("accept") +var ioctlsocketProc = ws2_32_mod.NewProc("ioctlsocket") +var getsockoptProc = ws2_32_mod.NewProc("getsockopt") + +const ( + SO_ERROR = 0x4 + FIONBIO = 0x8004667e + WSAEWOULDBLOCK syscall.Errno = 10035 + WSAEALREADY syscall.Errno = 10037 + WSAEINPROGRESS syscall.Errno = 10036 +) + +func sysRead(fd fdtype, p []byte) (n int, err error) { + rnu, _, err := recvProc.Call(uintptr(fd), uintptr(unsafe.Pointer(&p[0])), uintptr(len(p)), 0) + rn := int32(rnu) + if rn <= 0 { + return int(rn), err + } + return int(rn), nil +} + +func sysWrite(fd fdtype, p []byte) (n int, err error) { + wnu, _, err := sendProc.Call(uintptr(fd), uintptr(unsafe.Pointer(&p[0])), uintptr(len(p)), 0) + wn := int(wnu) + if wn <= 0 { + return wn, err + } + return wn, nil +} + +func sysSetNonblock(fd fdtype, is bool) error { + imode := 0 + if is { + imode = 1 + } + r, _, err := ioctlsocketProc.Call(uintptr(fd), FIONBIO, uintptr(unsafe.Pointer(&imode))) + if r != 0 { + return err + } + return nil +} + +func sysGetsockoptInt(fd fdtype, level int, optname int) (int, error) { + out := 1 + len := unsafe.Sizeof(out) + nerr, _, err := getsockoptProc.Call(uintptr(fd), uintptr(level), uintptr(optname), uintptr(unsafe.Pointer(&out)), uintptr(unsafe.Pointer(&len))) + if nerr != 0 { + return out, os.NewSyscallError("getsockopt", err) + } + return out, nil +} + +func isChanClose(ch chan int) bool { + select { + case _, received := <-ch: + return !received + default: + } + return false +} diff --git a/sys_epoll_linux.go b/sys_epoll_linux.go index da06cb3c..761bf37c 100644 --- a/sys_epoll_linux.go +++ b/sys_epoll_linux.go @@ -29,7 +29,7 @@ type 
epollevent struct { } // EpollCtl implements epoll_ctl. -func EpollCtl(epfd int, op int, fd int, event *epollevent) (err error) { +func EpollCtl(epfd fdtype, op int, fd fdtype, event *epollevent) (err error) { _, _, err = syscall.RawSyscall6(syscall.SYS_EPOLL_CTL, uintptr(epfd), uintptr(op), uintptr(fd), uintptr(unsafe.Pointer(event)), 0, 0) if err == syscall.Errno(0) { err = nil @@ -38,7 +38,7 @@ func EpollCtl(epfd int, op int, fd int, event *epollevent) (err error) { } // EpollWait implements epoll_wait. -func EpollWait(epfd int, events []epollevent, msec int) (n int, err error) { +func EpollWait(epfd fdtype, events []epollevent, msec int) (n int, err error) { var r0 uintptr var _p0 = unsafe.Pointer(&events[0]) if msec == 0 { diff --git a/sys_epoll_linux_arm64.go b/sys_epoll_linux_arm64.go index 9d33ad16..8a8478a8 100644 --- a/sys_epoll_linux_arm64.go +++ b/sys_epoll_linux_arm64.go @@ -28,7 +28,7 @@ type epollevent struct { } // EpollCtl implements epoll_ctl. -func EpollCtl(epfd int, op int, fd int, event *epollevent) (err error) { +func EpollCtl(epfd fdtype, op int, fd fdtype, event *epollevent) (err error) { _, _, err = syscall.RawSyscall6(syscall.SYS_EPOLL_CTL, uintptr(epfd), uintptr(op), uintptr(fd), uintptr(unsafe.Pointer(event)), 0, 0) if err == syscall.Errno(0) { err = nil @@ -37,7 +37,7 @@ func EpollCtl(epfd int, op int, fd int, event *epollevent) (err error) { } // EpollWait implements epoll_wait. -func EpollWait(epfd int, events []epollevent, msec int) (n int, err error) { +func EpollWait(epfd fdtype, events []epollevent, msec int) (n int, err error) { var r0 uintptr var _p0 = unsafe.Pointer(&events[0]) if msec == 0 { diff --git a/sys_epoll_windows.go b/sys_epoll_windows.go new file mode 100644 index 00000000..0fbe78ea --- /dev/null +++ b/sys_epoll_windows.go @@ -0,0 +1,104 @@ +// Copyright 2021 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package netpoll + +import ( + "syscall" + "unsafe" +) + +var wsapollProc = ws2_32_mod.NewProc("WSAPoll") + +type epollevent struct { + fd fdtype + events int16 + revents int16 +} + +const ( + EPOLL_CTL_ADD = 1 + EPOLL_CTL_DEL = 2 + EPOLL_CTL_MOD = 3 + LT_MOD = 0 + ET_MOD = 1 +) + +// EpollCtl implements epoll_ctl. +func EpollCtl(fdarray *[]epollevent, op int, fd fdtype, event *epollevent, mode int, fdmode *[]int) (err error) { + e := *event + e.fd = fd + + switch op { + case EPOLL_CTL_ADD: + flag := 0 + for i := 0; i < len(*fdarray); i++ { + if (*fdarray)[i].fd == syscall.InvalidHandle { + (*fdarray)[i].fd = e.fd + (*fdarray)[i].events = e.events + (*fdmode)[i] = mode + flag = 1 + break + } + } + if flag == 0 { + fdarray_tmp := append((*fdarray), e) + *fdarray = fdarray_tmp + fdmode_tmp := append((*fdmode), mode) + *fdmode = fdmode_tmp + } + case EPOLL_CTL_DEL: + for i := 0; i < len(*fdarray); i++ { + if (*fdarray)[i].fd == fd { + (*fdarray)[i].fd = syscall.InvalidHandle + (*fdmode)[i] = 0 + break + } + } + case EPOLL_CTL_MOD: + for i := 0; i < len(*fdarray); i++ { + if (*fdarray)[i].fd == fd { + (*fdarray)[i] = e + (*fdmode)[i] = mode + break + } + } + } + + return nil +} + +// EpollWait implements epoll_wait. 
+func EpollWait(fdarray []epollevent, events []epollevent, msec int, fdmode []int) (n int, err error) { + if len(fdarray) == 0 { + return 0, nil + } + r, _, err := wsapollProc.Call(uintptr(unsafe.Pointer(&fdarray[0])), uintptr(len(fdarray)), uintptr(msec)) + vaildNum := int(r) + + if vaildNum != 0xffffffff { + j := 0 + eventsLen := len(events) + for i := 0; j < vaildNum && j < eventsLen; i++ { + if fdarray[i].fd != syscall.InvalidHandle && fdarray[i].revents != 0 { + events[j] = fdarray[i] + if fdmode[i] == ET_MOD { + fdarray[i].events &= ^fdarray[i].revents + } + j++ + } + } + } + return vaildNum, err +} diff --git a/sys_exec.go b/sys_exec_linux.go similarity index 85% rename from sys_exec.go rename to sys_exec_linux.go index 511a26fb..f4f6cee4 100644 --- a/sys_exec.go +++ b/sys_exec_linux.go @@ -23,7 +23,7 @@ import ( ) // GetSysFdPairs creates and returns the fds of a pair of sockets. -func GetSysFdPairs() (r, w int) { +func GetSysFdPairs() (r, w fdtype) { fds, _ := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_STREAM, 0) return fds[0], fds[1] } @@ -35,7 +35,7 @@ func setTCPNoDelay(fd int, b bool) (err error) { // Wrapper around the socket system call that marks the returned file // descriptor as nonblocking and close-on-exec. -func sysSocket(family, sotype, proto int) (int, error) { +func sysSocket(family, sotype, proto int) (fdtype, error) { // See ../syscall/exec_unix.go for description of ForkLock. syscall.ForkLock.RLock() s, err := syscall.Socket(family, sotype, proto) @@ -46,7 +46,7 @@ func sysSocket(family, sotype, proto int) (int, error) { if err != nil { return -1, os.NewSyscallError("socket", err) } - if err = syscall.SetNonblock(s, true); err != nil { + if err = sysSetNonblock(s, true); err != nil { syscall.Close(s) return -1, os.NewSyscallError("setnonblock", err) } @@ -57,11 +57,11 @@ const barriercap = 32 type barrier struct { bs [][]byte - ivs []syscall.Iovec + ivs []iovec } // writev wraps the writev system call. 
-func writev(fd int, bs [][]byte, ivs []syscall.Iovec) (n int, err error) { +func writev(fd fdtype, bs [][]byte, ivs []iovec) (n int, err error) { iovLen := iovecs(bs, ivs) if iovLen == 0 { return 0, nil @@ -77,7 +77,7 @@ func writev(fd int, bs [][]byte, ivs []syscall.Iovec) (n int, err error) { // readv wraps the readv system call. // return 0, nil means EOF. -func readv(fd int, bs [][]byte, ivs []syscall.Iovec) (n int, err error) { +func readv(fd fdtype, bs [][]byte, ivs []iovec) (n int, err error) { iovLen := iovecs(bs, ivs) if iovLen == 0 { return 0, nil @@ -94,12 +94,12 @@ func readv(fd int, bs [][]byte, ivs []syscall.Iovec) (n int, err error) { // TODO: read from sysconf(_SC_IOV_MAX)? The Linux default is // 1024 and this seems conservative enough for now. Darwin's // UIO_MAXIOV also seems to be 1024. -func iovecs(bs [][]byte, ivs []syscall.Iovec) (iovLen int) { +func iovecs(bs [][]byte, ivs []iovec) (iovLen int) { for i := 0; i < len(bs); i++ { chunk := bs[i] if len(chunk) == 0 { continue - } + } ivs[iovLen].Base = &chunk[0] ivs[iovLen].SetLen(len(chunk)) iovLen++ @@ -107,7 +107,7 @@ func iovecs(bs [][]byte, ivs []syscall.Iovec) (iovLen int) { return iovLen } -func resetIovecs(bs [][]byte, ivs []syscall.Iovec) { +func resetIovecs(bs [][]byte, ivs []iovec) { for i := 0; i < len(bs); i++ { bs[i] = nil } diff --git a/sys_exec_test.go b/sys_exec_test.go index 22f9ddb7..05237c0e 100644 --- a/sys_exec_test.go +++ b/sys_exec_test.go @@ -15,7 +15,6 @@ package netpoll import ( - "syscall" "testing" ) @@ -28,12 +27,12 @@ func TestWritev(t *testing.T) { []byte("second line"), // len=11 []byte("third line"), // len=10 } - barrier.ivs = make([]syscall.Iovec, len(barrier.bs)) + barrier.ivs = make([]iovec, len(barrier.bs)) wn, err := writev(w, barrier.bs, barrier.ivs) MustNil(t, err) Equal(t, wn, 31) var p = make([]byte, 50) - rn, err := syscall.Read(r, p) + rn, err := sysRead(r, p) MustNil(t, err) Equal(t, rn, 31) t.Logf("READ %s", p[:rn]) @@ -46,9 +45,9 @@ func 
TestReadv(t *testing.T) { []byte("second line"), // len=11 []byte("third line"), // len=10 } - w1, _ := syscall.Write(w, vs[0]) - w2, _ := syscall.Write(w, vs[1]) - w3, _ := syscall.Write(w, vs[2]) + w1, _ := sysWrite(w, vs[0]) + w2, _ := sysWrite(w, vs[1]) + w3, _ := sysWrite(w, vs[2]) Equal(t, w1+w2+w3, 31) var barrier = barrier{} @@ -58,7 +57,7 @@ func TestReadv(t *testing.T) { make([]byte, 11), make([]byte, 10), } - barrier.ivs = make([]syscall.Iovec, len(barrier.bs)) + barrier.ivs = make([]iovec, len(barrier.bs)) rn, err := readv(r, barrier.bs, barrier.ivs) MustNil(t, err) Equal(t, rn, 31) @@ -76,12 +75,12 @@ func TestSendmsg(t *testing.T) { []byte("second line"), // len=11 []byte("third line"), // len=10 } - barrier.ivs = make([]syscall.Iovec, len(barrier.bs)) + barrier.ivs = make([]iovec, len(barrier.bs)) wn, err := sendmsg(w, barrier.bs, barrier.ivs, false) MustNil(t, err) Equal(t, wn, 31) var p = make([]byte, 50) - rn, err := syscall.Read(r, p) + rn, err := sysRead(r, p) MustNil(t, err) Equal(t, rn, 31) t.Logf("READ %s", p[:rn]) @@ -96,7 +95,7 @@ func BenchmarkWrite(b *testing.B) { go func() { buffer := make([]byte, 13) for { - syscall.Read(r, buffer) + sysRead(r, buffer) } }() @@ -110,7 +109,7 @@ func BenchmarkWrite(b *testing.B) { for j := 0; j < size; j++ { n += copy(wmsg[n:], message) } - syscall.Write(w, wmsg) + sysWrite(w, wmsg) } } @@ -121,7 +120,7 @@ func BenchmarkWritev(b *testing.B) { size := 5 var barrier = barrier{} barrier.bs = make([][]byte, size) - barrier.ivs = make([]syscall.Iovec, len(barrier.bs)) + barrier.ivs = make([]iovec, len(barrier.bs)) for i := range barrier.bs { barrier.bs[i] = make([]byte, len(message)) } @@ -129,7 +128,7 @@ func BenchmarkWritev(b *testing.B) { go func() { buffer := make([]byte, 13) for { - syscall.Read(r, buffer) + sysRead(r, buffer) } }() @@ -149,7 +148,7 @@ func BenchmarkSendmsg(b *testing.B) { size := 5 var barrier = barrier{} barrier.bs = make([][]byte, size) - barrier.ivs = make([]syscall.Iovec, 
len(barrier.bs)) + barrier.ivs = make([]iovec, len(barrier.bs)) for i := range barrier.bs { barrier.bs[i] = make([]byte, len(message)) } @@ -157,7 +156,7 @@ func BenchmarkSendmsg(b *testing.B) { go func() { buffer := make([]byte, 13) for { - syscall.Read(r, buffer) + sysRead(r, buffer) } }() @@ -183,7 +182,7 @@ func BenchmarkRead(b *testing.B) { go func() { for { - syscall.Write(w, wmsg) + sysWrite(w, wmsg) } }() @@ -193,7 +192,7 @@ func BenchmarkRead(b *testing.B) { b.StartTimer() for i := 0; i < b.N; i++ { var buffer = make([]byte, size*len(message)) - syscall.Read(r, buffer) + sysRead(r, buffer) } } @@ -204,7 +203,7 @@ func BenchmarkReadv(b *testing.B) { size := 5 var barrier = barrier{} barrier.bs = make([][]byte, size) - barrier.ivs = make([]syscall.Iovec, len(barrier.bs)) + barrier.ivs = make([]iovec, len(barrier.bs)) for i := range barrier.bs { barrier.bs[i] = make([]byte, len(message)) } diff --git a/sys_exec_windows.go b/sys_exec_windows.go new file mode 100644 index 00000000..f5e02ee3 --- /dev/null +++ b/sys_exec_windows.go @@ -0,0 +1,186 @@ +// Copyright 2021 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package netpoll + +import ( + "errors" + "os" + "strconv" + "strings" + "syscall" +) + +func init() { + var startData syscall.WSAData + var version uint32 = 0x0202 + syscall.WSAStartup(version, &startData) +} + +func inet_addr(ipaddr string) [4]byte { + var ( + ips = strings.Split(ipaddr, ".") + ip [4]uint64 + ret [4]byte + ) + for i := 0; i < 4; i++ { + ip[i], _ = strconv.ParseUint(ips[i], 10, 8) + } + for i := 0; i < 4; i++ { + ret[i] = byte(ip[i]) + } + return ret +} + +func socketPair(family int, typ int, protocol int) (fd [2]fdtype, err error) { + var ( + listen_addr syscall.SockaddrInet4 + ) + if protocol != 0 || family != syscall.AF_INET { + return fd, errors.New("protocol or family error") + } + listener, err := syscall.Socket(syscall.AF_INET, typ, 0) + if err != nil { + return fd, err + } + defer syscall.Closesocket(listener) + + listen_addr.Addr = inet_addr("127.0.0.1") + listen_addr.Port = 0 + if err = syscall.Bind(listener, &listen_addr); err != nil { + return fd, err + } + if err = syscall.Listen(listener, 1); err != nil { + return fd, err + } + connector, err := syscall.Socket(syscall.AF_INET, typ, 0) + if err != nil { + return fd, err + } + connector_addr, err := syscall.Getsockname(listener) + if err != nil { + return fd, err + } + if err = syscall.Connect(connector, connector_addr); err != nil { + return fd, err + } + acceptor, _, err := acceptProc.Call(uintptr(listener), 0, 0) + if acceptor == 0 { + return fd, err + } + fd[0] = connector + fd[1] = fdtype(acceptor) + return fd, nil +} + +// GetSysFdPairs creates and returns the fds of a pair of sockets. 
+func GetSysFdPairs() (r, w fdtype) { + fds, _ := socketPair(syscall.AF_INET, syscall.SOCK_STREAM, 0) + return fds[0], fds[1] +} + +// setTCPNoDelay set the TCP_NODELAY flag on socket +func setTCPNoDelay(fd fdtype, b bool) (err error) { + return syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, syscall.TCP_NODELAY, boolint(b)) +} + +// Wrapper around the socket system call that marks the returned file +// descriptor as nonblocking and close-on-exec. +func sysSocket(family, sotype, proto int) (fdtype, error) { + // See ../syscall/exec_unix.go for description of ForkLock. + syscall.ForkLock.RLock() + s, err := syscall.Socket(family, sotype, proto) + if err == nil { + syscall.CloseOnExec(s) + } + syscall.ForkLock.RUnlock() + if err != nil { + return fdtype(0), os.NewSyscallError("socket", err) + } + if err = sysSetNonblock(s, true); err != nil { + syscall.Close(s) + return fdtype(0), os.NewSyscallError("setnonblock", err) + } + return s, nil +} + +const barriercap = 32 + +type barrier struct { + bs [][]byte + ivs []iovec +} + +// writev wraps the writev system call. +func writev(fd fdtype, bs [][]byte, ivs []iovec) (n int, err error) { + iovLen := iovecs(bs, ivs) + if iovLen == 0 { + return 0, nil + } + var ( + sendBytes uint32 + ) + e := syscall.WSASend(fd, &ivs[0], uint32(iovLen), &sendBytes, 0, nil, nil) + resetIovecs(bs, ivs[:iovLen]) + return int(sendBytes), e +} + +// readv wraps the readv system call. +// return 0, nil means EOF. +func readv(fd fdtype, bs [][]byte, ivs []iovec) (n int, err error) { + iovLen := iovecs(bs, ivs) + if iovLen == 0 { + return 0, nil + } + var ( + recvBytes uint32 + flags uint32 + ) + e := syscall.WSARecv(fd, &ivs[0], uint32(iovLen), &recvBytes, &flags, nil, nil) + resetIovecs(bs, ivs[:iovLen]) + return int(recvBytes), e +} + +// TODO: read from sysconf(_SC_IOV_MAX)? The Linux default is +// 1024 and this seems conservative enough for now. Darwin's +// UIO_MAXIOV also seems to be 1024. 
+func iovecs(bs [][]byte, ivs []iovec) (iovLen int) { + for i := 0; i < len(bs); i++ { + chunk := bs[i] + if len(chunk) == 0 { + continue + } + ivs[iovLen].Buf = &chunk[0] + ivs[iovLen].Len = uint32(len(chunk)) + iovLen++ + } + return iovLen +} + +func resetIovecs(bs [][]byte, ivs []iovec) { + for i := 0; i < len(bs); i++ { + bs[i] = nil + } + for i := 0; i < len(ivs); i++ { + ivs[i].Buf = nil + } +} + +// Boolean to int. +func boolint(b bool) int { + if b { + return 1 + } + return 0 +} diff --git a/sys_keepalive_windows.go b/sys_keepalive_windows.go new file mode 100644 index 00000000..4c2895ae --- /dev/null +++ b/sys_keepalive_windows.go @@ -0,0 +1,32 @@ +// Copyright 2021 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package netpoll
+
+import (
+	"syscall"
+	"unsafe"
+)
+
+// SetKeepAlive enables SO_KEEPALIVE on fd, then issues SIO_KEEPALIVE_VALS with both the keep-alive
+// time and the interval set to secs seconds. The original author's note: just support ipv4.
+func SetKeepAlive(fd fdtype, secs int) error {
+	if err := syscall.SetsockoptInt(fd, syscall.SOL_SOCKET, syscall.SO_KEEPALIVE, 1); err != nil {
+		return err
+	}
+	millis := uint32(secs) * 1000 // seconds -> milliseconds, as stored in TCPKeepalive
+	settings := syscall.TCPKeepalive{OnOff: 1, Time: millis, Interval: millis}
+	var returned uint32 // output byte count required by WSAIoctl; not inspected
+	return syscall.WSAIoctl(fd, syscall.SIO_KEEPALIVE_VALS, (*byte)(unsafe.Pointer(&settings)), uint32(unsafe.Sizeof(settings)), nil, 0, &returned, nil, 0)
+}
diff --git a/sys_sendmsg_bsd.go b/sys_sendmsg_bsd.go
index d605ae4d..62c8bb22 100644
--- a/sys_sendmsg_bsd.go
+++ b/sys_sendmsg_bsd.go
@@ -25,7 +25,7 @@ var supportZeroCopySend bool
 
 // sendmsg wraps the sendmsg system call.
 // Must len(iovs) >= len(vs)
-func sendmsg(fd int, bs [][]byte, ivs []syscall.Iovec, zerocopy bool) (n int, err error) {
+func sendmsg(fd int, bs [][]byte, ivs []iovec, zerocopy bool) (n int, err error) {
 	iovLen := iovecs(bs, ivs)
 	if iovLen == 0 {
 		return 0, nil
diff --git a/sys_sendmsg_linux.go b/sys_sendmsg_linux.go
index 5a849db2..06c83120 100644
--- a/sys_sendmsg_linux.go
+++ b/sys_sendmsg_linux.go
@@ -31,7 +31,7 @@ import (
 
 // sendmsg wraps the sendmsg system call.
 // Must len(iovs) >= len(vs)
-func sendmsg(fd int, bs [][]byte, ivs []syscall.Iovec, zerocopy bool) (n int, err error) {
+func sendmsg(fd int, bs [][]byte, ivs []iovec, zerocopy bool) (n int, err error) {
 	iovLen := iovecs(bs, ivs)
 	if iovLen == 0 {
 		return 0, nil
diff --git a/sys_sendmsg_windows.go b/sys_sendmsg_windows.go
new file mode 100644
index 00000000..be985a9e
--- /dev/null
+++ b/sys_sendmsg_windows.go
@@ -0,0 +1,33 @@
+// Copyright 2021 CloudWeGo Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package netpoll
+
+//func init() {
+//	err := syscall.Setrlimit(8, &syscall.Rlimit{
+//		Cur: 0xffffffff,
+//		Max: 0xffffffff,
+//	})
+//	if err != nil {
+//		panic(err)
+//	}
+//}
+
+// sendmsg forwards bs and ivs to writev and returns its result; the zerocopy flag is ignored.
+// Must len(ivs) >= len(bs)
+// TODO Implementing Zero Copy on Windows Platform
+func sendmsg(fd fdtype, bs [][]byte, ivs []iovec, zerocopy bool) (n int, err error) {
+	// writev already yields the (n, err) pair this function promises.
+	return writev(fd, bs, ivs)
+}
diff --git a/sys_sockopt_windows.go b/sys_sockopt_windows.go
new file mode 100644
index 00000000..7d8dc929
--- /dev/null
+++ b/sys_sockopt_windows.go
@@ -0,0 +1,30 @@
+// Copyright 2009 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+//
+// This file may have been modified by CloudWeGo authors. (“CloudWeGo Modifications”).
+// All CloudWeGo Modifications are Copyright 2021 CloudWeGo authors.
+
+package netpoll
+
+import (
+	"os"
+	"syscall"
+)
+
+// setDefaultSockopts applies default options to socket s: IPV6_V6ONLY (from ipv6only) on
+// non-raw AF_INET6 sockets, and SO_BROADCAST on SOCK_DGRAM sockets.
+func setDefaultSockopts(s fdtype, family, sotype int, ipv6only bool) error {
+	if family == syscall.AF_INET6 && sotype != syscall.SOCK_RAW {
+		// Allow both IP versions even if the OS default is otherwise. Some operating
+		// systems never admit this option, and the result of the call is not checked.
+		_ = syscall.SetsockoptInt(s, syscall.IPPROTO_IPV6, syscall.IPV6_V6ONLY, boolint(ipv6only))
+	}
+	// Allow broadcast on datagram sockets only: on linux setting broadcast on a tcp
+	// socket has no effect, but on windows it returns an error.
+	if sotype != syscall.SOCK_DGRAM {
+		return nil
+	}
+	broadcastErr := syscall.SetsockoptInt(s, syscall.SOL_SOCKET, syscall.SO_BROADCAST, 1)
+	return os.NewSyscallError("setsockopt", broadcastErr)
+}
diff --git a/sys_zerocopy_bsd.go b/sys_zerocopy_bsd.go
index 916b3659..85117c19 100644
--- a/sys_zerocopy_bsd.go
+++ b/sys_zerocopy_bsd.go
@@ -18,10 +18,10 @@ package netpoll
 
 import "syscall"
 
-func setZeroCopy(fd int) error {
+func setZeroCopy(fd fdtype) error {
 	return syscall.EINVAL
 }
 
-func setBlockZeroCopySend(fd int, sec, usec int64) error {
+func setBlockZeroCopySend(fd fdtype, sec, usec int64) error {
 	return syscall.EINVAL
 }
diff --git a/sys_zerocopy_linux.go b/sys_zerocopy_linux.go
index 4d463967..8a4e081f 100644
--- a/sys_zerocopy_linux.go
+++ b/sys_zerocopy_linux.go
@@ -24,11 +24,11 @@ const (
 	MSG_ZEROCOPY = 0x4000000
 )
 
-func setZeroCopy(fd int) error {
+func setZeroCopy(fd fdtype) error {
 	return syscall.SetsockoptInt(fd, syscall.SOL_SOCKET, SO_ZEROCOPY, 1)
 }
 
-func setBlockZeroCopySend(fd int, sec, usec int64) error {
+func setBlockZeroCopySend(fd fdtype, sec, usec int64) error {
 	return syscall.SetsockoptTimeval(fd, syscall.SOL_SOCKET, SO_ZEROBLOCKTIMEO, &syscall.Timeval{
 		Sec:  sec,
 		Usec: usec,
diff --git a/sys_zerocopy_windows.go b/sys_zerocopy_windows.go
new file mode 100644
index 00000000..8a6f9e17
--- /dev/null
+++ b/sys_zerocopy_windows.go
@@ -0,0 +1,25 @@
+// Copyright 2021 CloudWeGo Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package netpoll
+
+import "syscall"
+
+// setZeroCopy always returns syscall.EINVAL: this Windows file provides no
+// zero-copy support, so fd is never touched.
+func setZeroCopy(fd fdtype) error { return syscall.EINVAL }
+
+// setBlockZeroCopySend always returns syscall.EINVAL; fd, sec and usec are
+// accepted only to keep the same signature as the other platforms and are ignored.
+func setBlockZeroCopySend(fd fdtype, sec, usec int64) error { return syscall.EINVAL }