Skip to content

Commit

Permalink
feat: implement gnet on Windows
Browse files Browse the repository at this point in the history
Fixes #339
  • Loading branch information
panjf2000 committed May 10, 2023
1 parent af2aae9 commit e13a05c
Show file tree
Hide file tree
Showing 26 changed files with 1,632 additions and 267 deletions.
10 changes: 7 additions & 3 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ jobs:
os:
- ubuntu-latest
- macos-latest
#- windows-latest
name: Run golangci-lint
runs-on: ${{ matrix.os }}
steps:
Expand All @@ -42,14 +43,17 @@ jobs:
uses: golangci/golangci-lint-action@v3
with:
version: v1.51.2
args: -v -E gofumpt -E gocritic -E misspell -E revive -E godot
args: -v -E gofumpt -E gocritic -E misspell -E revive -E godot --timeout 5m
test:
needs: lint
strategy:
fail-fast: false
matrix:
go: [1.17, 1.18, 1.19]
os: [ubuntu-latest, macos-latest]
os:
- ubuntu-latest
- macos-latest
- windows-latest
name: Go ${{ matrix.go }} @ ${{ matrix.os }}
runs-on: ${{ matrix.os }}
steps:
Expand All @@ -61,7 +65,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v3
with:
go-version: '^1.17'
go-version: ${{ matrix.go }}

- name: Print Go environment
id: go-env
Expand Down
File renamed without changes.
71 changes: 71 additions & 0 deletions acceptor_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright (c) 2023 Andy Pan.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package gnet

import (
"net"
"runtime"
)

func (eng *engine) listen() (err error) {
if eng.opts.LockOSThread {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
}

defer func() { eng.shutdown(err) }()

var buffer [0x10000]byte
for {
if eng.ln.pc != nil {
// Read data from UDP socket.
n, addr, e := eng.ln.pc.ReadFrom(buffer[:])
if e != nil {
err = e
eng.opts.Logger.Errorf("failed to receive data from UDP fd due to error:%v", err)
return
}

el := eng.lb.next(addr)
c := newUDPConn(el, eng.ln.addr, addr)
el.ch <- packUDPConn(c, buffer[:n])
} else {
// Accept TCP socket.
tc, e := eng.ln.ln.Accept()
if e != nil {
err = e
eng.opts.Logger.Errorf("Accept() fails due to error: %v", err)
return
}
el := eng.lb.next(tc.RemoteAddr())
c := newTCPConn(tc, el)
el.ch <- c
go func(c *conn, tc net.Conn, el *eventloop) {
var buffer [0x10000]byte
for {
n, err := tc.Read(buffer[:])
if err != nil {
el.ch <- &netErr{c, err}
return
}
el.ch <- packTCPConn(c, buffer[:n])
}
}(c, tc, el)
}
}
}
8 changes: 4 additions & 4 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build linux || freebsd || dragonfly || darwin
// +build linux freebsd dragonfly darwin
//go:build linux || freebsd || dragonfly || darwin || windows
// +build linux freebsd dragonfly darwin windows

package gnet

Expand Down Expand Up @@ -264,6 +264,7 @@ func (s *testClientServer) OnTraffic(c Conn) (action Action) {
}

func (s *testClientServer) OnTick() (delay time.Duration, action Action) {
delay = time.Second / 5
if atomic.CompareAndSwapInt32(&s.started, 0, 1) {
for i := 0; i < s.nclients; i++ {
atomic.AddInt32(&s.clientActive, 1)
Expand All @@ -278,7 +279,6 @@ func (s *testClientServer) OnTick() (delay time.Duration, action Action) {
action = Shutdown
return
}
delay = time.Second / 5
return
}

Expand Down Expand Up @@ -327,7 +327,7 @@ func startGnetClient(t *testing.T, cli *Client, ev *clientEvents, network, addr
)
if netDial {
var netConn net.Conn
netConn, err = net.Dial(network, addr)
netConn, err = NetDial(network, addr)
require.NoError(t, err)
c, err = cli.Enroll(netConn)
} else {
Expand Down
File renamed without changes.
220 changes: 220 additions & 0 deletions client_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
/*
* Copyright (c) 2023 Andy Pan.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package gnet

import (
"context"
"net"
"os"
"path/filepath"
"sync"

"golang.org/x/sync/errgroup"

errorx "github.com/panjf2000/gnet/v2/pkg/errors"
"github.com/panjf2000/gnet/v2/pkg/logging"
)

type Client struct {
opts *Options
el *eventloop
logFlush func() error
}

func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) {
options := loadOptions(opts...)
cli = &Client{opts: options}
var logger logging.Logger
if options.LogPath != "" {
if logger, cli.logFlush, err = logging.CreateLoggerAsLocalFile(options.LogPath, options.LogLevel); err != nil {
return
}
} else {
logger = logging.GetDefaultLogger()
}
if options.Logger == nil {
options.Logger = logger
}

eg := errgroup.Group{}

shutdownCtx, cancel := context.WithCancel(context.Background())
eng := &engine{
ln: &listener{},
opts: options,
workerPool: struct {
*errgroup.Group
shutdownCtx context.Context
shutdown context.CancelFunc
}{&eg, shutdownCtx, cancel},
eventHandler: eh,
}
cli.el = &eventloop{
ch: make(chan interface{}, 1024),
eng: eng,
connections: make(map[*conn]struct{}),
eventHandler: eh,
}
return
}

func (cli *Client) Start() error {
cli.el.eventHandler.OnBoot(Engine{})
cli.el.eng.workerPool.Go(cli.el.run)
if cli.opts.Ticker {
cli.el.eng.ticker.ctx, cli.el.eng.ticker.cancel = context.WithCancel(context.Background())
cli.el.eng.workerPool.Go(func() error {
cli.el.ticker(cli.el.eng.ticker.ctx)
return nil
})
}
return nil
}

func (cli *Client) Stop() (err error) {
cli.el.ch <- errorx.ErrEngineShutdown
if cli.opts.Ticker {
cli.el.eng.ticker.cancel()
}
_ = cli.el.eng.workerPool.Wait()
cli.el.eventHandler.OnShutdown(Engine{})
if cli.logFlush != nil {
err = cli.logFlush()
}
logging.Cleanup()
return
}

var (
mu sync.RWMutex
unixAddrDirs = make(map[string]string)
)

// unixAddr uses os.MkdirTemp to get a name that is unique.
func unixAddr(addr string) string {
// Pass an empty pattern to get a directory name that is as short as possible.
// If we end up with a name longer than the sun_path field in the sockaddr_un
// struct, we won't be able to make the syscall to open the socket.
d, err := os.MkdirTemp("", "")
if err != nil {
panic(err)
}

tmpAddr := filepath.Join(d, addr)
mu.Lock()
unixAddrDirs[tmpAddr] = d
mu.Unlock()

return tmpAddr
}

func (cli *Client) Dial(network, addr string) (Conn, error) {
var (
c net.Conn
err error
)
if network == "unix" {
laddr, _ := net.ResolveUnixAddr(network, unixAddr(addr))
raddr, _ := net.ResolveUnixAddr(network, addr)
c, err = net.DialUnix(network, laddr, raddr)
if err != nil {
return nil, err
}
} else {
c, err = net.Dial(network, addr)
if err != nil {
return nil, err
}
}
return cli.Enroll(c)
}

func (cli *Client) Enroll(nc net.Conn) (gc Conn, err error) {
switch v := nc.(type) {
case *net.TCPConn:
if cli.opts.TCPNoDelay == TCPNoDelay {
if err = v.SetNoDelay(true); err != nil {
return
}
}
if cli.opts.TCPKeepAlive > 0 {
if err = v.SetKeepAlive(true); err != nil {
return
}
if err = v.SetKeepAlivePeriod(cli.opts.TCPKeepAlive); err != nil {
return
}
}

c := newTCPConn(nc, cli.el)
cli.el.ch <- c
go func(c *conn, tc net.Conn, el *eventloop) {
var buffer [0x10000]byte
for {
n, err := tc.Read(buffer[:])
if err != nil {
el.ch <- &netErr{c, err}
return
}
el.ch <- packTCPConn(c, buffer[:n])
}
}(c, nc, cli.el)
gc = c
case *net.UnixConn:
c := newTCPConn(nc, cli.el)
cli.el.ch <- c
go func(c *conn, uc net.Conn, el *eventloop) {
var buffer [0x10000]byte
for {
n, err := uc.Read(buffer[:])
if err != nil {
el.ch <- &netErr{c, err}
mu.RLock()
tmpDir := unixAddrDirs[uc.LocalAddr().String()]
mu.RUnlock()
if err := os.RemoveAll(tmpDir); err != nil {
logging.Errorf("failed to remove temporary directory for unix local address: %v", err)
}
return
}
el.ch <- packTCPConn(c, buffer[:n])
}
}(c, nc, cli.el)
gc = c
case *net.UDPConn:
c := newUDPConn(cli.el, nc.LocalAddr(), nc.RemoteAddr())
c.rawConn = nc
go func(uc net.Conn, el *eventloop) {
var buffer [0x10000]byte
for {
n, err := uc.Read(buffer[:])
if err != nil {
return
}
c := newUDPConn(cli.el, uc.LocalAddr(), uc.RemoteAddr())
c.rawConn = uc
el.ch <- packUDPConn(c, buffer[:n])
}
}(nc, cli.el)
gc = c
default:
return nil, errorx.ErrUnsupportedProtocol
}

return
}
10 changes: 9 additions & 1 deletion connection.go → connection_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,15 @@ func (c *conn) Dup() (fd int, err error) { fd, _, err = netpoll.Dup(c.fd);
func (c *conn) SetReadBuffer(bytes int) error { return socket.SetRecvBuffer(c.fd, bytes) }
func (c *conn) SetWriteBuffer(bytes int) error { return socket.SetSendBuffer(c.fd, bytes) }
func (c *conn) SetLinger(sec int) error { return socket.SetLinger(c.fd, sec) }
func (c *conn) SetNoDelay(noDelay bool) error { return socket.SetNoDelay(c.fd, bool2int(noDelay)) }
func (c *conn) SetNoDelay(noDelay bool) error {
return socket.SetNoDelay(c.fd, func(b bool) int {
if b {
return 1
}
return 0
}(noDelay))
}

func (c *conn) SetKeepAlivePeriod(d time.Duration) error {
return socket.SetKeepAlivePeriod(c.fd, int(d.Seconds()))
}
Expand Down
Loading

0 comments on commit e13a05c

Please sign in to comment.