From fd60331b649478b169f1d0f876f4c011f5c0c385 Mon Sep 17 00:00:00 2001 From: henrylee2cn Date: Wed, 15 Apr 2020 19:27:49 +0800 Subject: [PATCH] chore(evio): update Change-Id: I8bf144955406810b25141e277996011da154e4af --- .gitignore | 2 + go.mod | 1 + mixer/evio/bench/{tp_client.go => client.go} | 0 .../bench/{tp_evio_server.go => server.go} | 0 mixer/evio/evio.go | 72 +-- mixer/evio/evio/LICENSE | 20 - mixer/evio/evio/README.md | 168 ------ mixer/evio/evio/evio.go | 268 --------- mixer/evio/evio/evio_other.go | 41 -- mixer/evio/evio/evio_std.go | 459 --------------- mixer/evio/evio/evio_test.go | 478 --------------- mixer/evio/evio/evio_unix.go | 544 ------------------ mixer/evio/evio/internal/internal_bsd.go | 125 ---- mixer/evio/evio/internal/internal_darwin.go | 20 - mixer/evio/evio/internal/internal_linux.go | 129 ----- mixer/evio/evio/internal/internal_openbsd.go | 11 - mixer/evio/evio/internal/internal_unix.go | 19 - mixer/evio/evio/internal/notequeue.go | 53 -- mixer/evio/evio/internal/socktoaddr.go | 39 -- mixer/evio/evio/vendor/.stub | 1 - .../github.com/kavu/go_reuseport/LICENSE | 21 - .../github.com/kavu/go_reuseport/Makefile | 9 - .../github.com/kavu/go_reuseport/README.md | 48 -- .../github.com/kavu/go_reuseport/reuseport.go | 50 -- .../kavu/go_reuseport/reuseport_bsd.go | 44 -- .../kavu/go_reuseport/reuseport_linux.go | 52 -- .../kavu/go_reuseport/reuseport_windows.go | 19 - .../github.com/kavu/go_reuseport/tcp.go | 143 ----- .../github.com/kavu/go_reuseport/tcp_test.go | 218 ------- .../github.com/kavu/go_reuseport/test.bash | 22 - .../github.com/kavu/go_reuseport/udp.go | 139 ----- .../github.com/kavu/go_reuseport/udp_test.go | 99 ---- mixer/evio/evio_test.go | 5 +- 33 files changed, 44 insertions(+), 3275 deletions(-) rename mixer/evio/bench/{tp_client.go => client.go} (100%) rename mixer/evio/bench/{tp_evio_server.go => server.go} (100%) delete mode 100644 mixer/evio/evio/LICENSE delete mode 100644 mixer/evio/evio/README.md delete mode 100644 mixer/evio/evio/evio.go delete mode 100644 mixer/evio/evio/evio_other.go delete mode 100644 mixer/evio/evio/evio_std.go delete mode 100644 mixer/evio/evio/evio_test.go delete mode 100644 mixer/evio/evio/evio_unix.go delete mode 100644 mixer/evio/evio/internal/internal_bsd.go delete mode 100644 mixer/evio/evio/internal/internal_darwin.go delete mode 100644 mixer/evio/evio/internal/internal_linux.go delete mode 100644 mixer/evio/evio/internal/internal_openbsd.go delete mode 100644 mixer/evio/evio/internal/internal_unix.go delete mode 100644 mixer/evio/evio/internal/notequeue.go delete mode 100644 mixer/evio/evio/internal/socktoaddr.go delete mode 100644 mixer/evio/evio/vendor/.stub delete mode 100644 mixer/evio/evio/vendor/github.com/kavu/go_reuseport/LICENSE delete mode 100644 mixer/evio/evio/vendor/github.com/kavu/go_reuseport/Makefile delete mode 100644 mixer/evio/evio/vendor/github.com/kavu/go_reuseport/README.md delete mode 100644 mixer/evio/evio/vendor/github.com/kavu/go_reuseport/reuseport.go delete mode 100644 mixer/evio/evio/vendor/github.com/kavu/go_reuseport/reuseport_bsd.go delete mode 100644 mixer/evio/evio/vendor/github.com/kavu/go_reuseport/reuseport_linux.go delete mode 100644 mixer/evio/evio/vendor/github.com/kavu/go_reuseport/reuseport_windows.go delete mode 100644 mixer/evio/evio/vendor/github.com/kavu/go_reuseport/tcp.go delete mode 100644 mixer/evio/evio/vendor/github.com/kavu/go_reuseport/tcp_test.go delete mode 100755 mixer/evio/evio/vendor/github.com/kavu/go_reuseport/test.bash delete mode 100644 mixer/evio/evio/vendor/github.com/kavu/go_reuseport/udp.go delete mode 100644 mixer/evio/evio/vendor/github.com/kavu/go_reuseport/udp_test.go diff --git a/.gitignore b/.gitignore index f893de20..e86950d7 100644 --- a/.gitignore +++ b/.gitignore @@ -37,3 +37,5 @@ examples/ab/frame_client_ab _* .DS_Store go.sum +.idea +.vscode diff --git a/go.mod b/go.mod index c3aea1e9..80f72a66 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/lucas-clemente/quic-go v0.13.1 github.com/montanaflynn/stats v0.5.0 github.com/stretchr/testify v1.4.0 + github.com/tidwall/evio v1.0.7 github.com/tidwall/gjson v1.2.2 golang.org/x/sys v0.0.0-20190904154756-749cb33beabd ) diff --git a/mixer/evio/bench/tp_client.go b/mixer/evio/bench/client.go similarity index 100% rename from mixer/evio/bench/tp_client.go rename to mixer/evio/bench/client.go diff --git a/mixer/evio/bench/tp_evio_server.go b/mixer/evio/bench/server.go similarity index 100% rename from mixer/evio/bench/tp_evio_server.go rename to mixer/evio/bench/server.go diff --git a/mixer/evio/evio.go b/mixer/evio/evio.go index 9c387660..c4e8d295 100644 --- a/mixer/evio/evio.go +++ b/mixer/evio/evio.go @@ -1,7 +1,6 @@ package evio import ( - "bytes" "crypto/tls" "errors" "fmt" @@ -10,9 +9,9 @@ import ( "sync" "time" + "github.com/tidwall/evio" + "github.com/henrylee2cn/erpc/v6" - "github.com/henrylee2cn/erpc/v6/mixer/evio/evio" - "github.com/henrylee2cn/erpc/v6/utils" ) // NewClient creates a evio client, equivalent to erpc.NewPeer. @@ -28,6 +27,7 @@ type Server struct { addr string readBufferSize int writeBufferSize int + protoFuncs []erpc.ProtoFunc } // NewServer creates a evio server. @@ -41,10 +41,6 @@ func NewServer(loops int, cfg erpc.PeerConfig, globalLeftPlugin ...erpc.Plugin) } srv.events.NumLoops = loops - // const delayDuration = time.Millisecond * 50 - // srv.events.Tick = func() (delay time.Duration, action evio.Action) { - // return delayDuration, action - // } srv.events.Serving = func(s evio.Server) (action evio.Action) { erpc.Printf("listen and serve (%s)", srv.addr) @@ -73,16 +69,16 @@ func NewServer(loops int, cfg erpc.PeerConfig, globalLeftPlugin ...erpc.Plugin) } srv.events.Data = func(c evio.Conn, in []byte) (out []byte, action evio.Action) { - // defer func() { - // if p := recover(); p != nil { - // erpc.Errorf("[evio] Events.Data: %v", p) - // } - // }() con := c.Context().(*conn) if in != nil { - buf := utils.AcquireByteBuffer() - buf.Write(in) - con.in <- buf + data := <-con.in + n := copy(data.b, in) + in = in[n:] + dst := make([]byte, len(in)) + copy(dst, in) + con.remainingIn = dst + data.n <- n + close(data.n) } select { case out = <-con.out: @@ -94,7 +90,6 @@ func NewServer(loops int, cfg erpc.PeerConfig, globalLeftPlugin ...erpc.Plugin) select { case <-con.closeSignal: action = evio.Close - close(con.in) default: } return @@ -106,7 +101,7 @@ func NewServer(loops int, cfg erpc.PeerConfig, globalLeftPlugin ...erpc.Plugin) func (srv *Server) ListenAndServe(protoFunc ...erpc.ProtoFunc) error { switch srv.cfg.Network { default: - return errors.New("Unsupport evio network, refer to the following: tcp, tcp4, tcp6, unix") + return errors.New("unsupport evio network, refer to the following: tcp, tcp4, tcp6, unix") case "tcp", "tcp4", "tcp6", "unix": } var isDefault bool @@ -118,6 +113,7 @@ func (srv *Server) ListenAndServe(protoFunc ...erpc.ProtoFunc) error { if isDefault { srv.writeBufferSize = 4096 } + srv.protoFuncs = protoFunc return evio.Serve(srv.events, srv.addr) } @@ -126,29 +122,29 @@ func (srv *Server) serveConn(evioConn evio.Conn) (stat *erpc.Status) { conn: evioConn, events: srv.events, closeSignal: make(chan struct{}), - inBuf: bytes.NewBuffer(make([]byte, 0, srv.readBufferSize)), - in: make(chan *utils.ByteBuffer, srv.readBufferSize/128), + remainingIn: []byte{}, + in: make(chan readData, 1), out: make(chan []byte, srv.writeBufferSize/128), } if srv.TLSConfig() != nil { - c.sess, stat = srv.Peer.ServeConn(tls.Server(c, srv.TLSConfig())) + c.sess, stat = srv.Peer.ServeConn(tls.Server(c, srv.TLSConfig()), srv.protoFuncs...) } else { - c.sess, stat = srv.Peer.ServeConn(c) + c.sess, stat = srv.Peer.ServeConn(c, srv.protoFuncs...) } // c.sess.Swap().Store(wakeWriteKey, c) evioConn.SetContext(c) return stat } -// conn is a evio(evio) network connection. +// conn is a evio network connection. // // Multiple goroutines may invoke methods on a Conn simultaneously. type conn struct { conn evio.Conn events evio.Events sess erpc.Session - inBuf *bytes.Buffer - in chan *utils.ByteBuffer + remainingIn []byte + in chan readData inLock sync.Mutex out chan []byte closeSignal chan struct{} @@ -156,22 +152,30 @@ type conn struct { var _ net.Conn = new(conn) +type readData struct { + b []byte + n chan int +} + // Read reads data from the connection. // Read can be made to time out and return an Error with Timeout() == true // after a fixed time limit; see SetDeadline and SetReadDeadline. func (c *conn) Read(b []byte) (n int, err error) { - n, err = c.inBuf.Read(b) - if err == nil { - return n, nil - } - buf, ok := <-c.in - if !ok { + select { + case <-c.closeSignal: + defer func() { recover() }() + close(c.in) return n, io.EOF + default: + } + n = copy(b, c.remainingIn) + c.remainingIn = c.remainingIn[n:] + if n > 0 || len(b) == 0 { + return n, nil } - n2 := copy(b[n:], buf.B) - c.inBuf.Write(buf.B[n2:]) - utils.ReleaseByteBuffer(buf) - n += n2 + ch := make(chan int) + c.in <- readData{b, ch} + n = <-ch return n, nil } diff --git a/mixer/evio/evio/LICENSE b/mixer/evio/evio/LICENSE deleted file mode 100644 index 92a9728f..00000000 --- a/mixer/evio/evio/LICENSE +++ /dev/null @@ -1,20 +0,0 @@ -The MIT License (MIT) - -Copyright (c) 2017 Joshua J Baker - -Permission is hereby granted, free of charge, to any person obtaining a copy of -this software and associated documentation files (the "Software"), to deal in -the Software without restriction, including without limitation the rights to -use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of -the Software, and to permit persons to whom the Software is furnished to do so, -subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS -FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR -COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER -IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN -CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/mixer/evio/evio/README.md b/mixer/evio/evio/README.md deleted file mode 100644 index bc5f9678..00000000 --- a/mixer/evio/evio/README.md +++ /dev/null @@ -1,168 +0,0 @@ -

-evio -
-Build Status -GoDoc -

- -`evio` is an event loop networking framework that is fast and small. It makes direct [epoll](https://en.wikipedia.org/wiki/Epoll) and [kqueue](https://en.wikipedia.org/wiki/Kqueue) syscalls rather than using the standard Go [net](https://golang.org/pkg/net/) package, and works in a similar manner as [libuv](https://github.com/libuv/libuv) and [libevent](https://github.com/libevent/libevent). - -The goal of this project is to create a server framework for Go that performs on par with [Redis](http://redis.io) and [Haproxy](http://www.haproxy.org) for packet handling. It was built to be the foundation for [Tile38](https://github.com/tidwall/tile38) and a future L7 proxy for Go. - -*Please note: Evio should not be considered as a drop-in replacement for the standard Go net or net/http packages.* - -## Features - -- [Fast](#performance) single-threaded or [multithreaded](#multithreaded) event loop -- Built-in [load balancing](#load-balancing) options -- Simple API -- Low memory usage -- Supports tcp, [udp](#udp), and unix sockets -- Allows [multiple network binding](#multiple-addresses) on the same event loop -- Flexible [ticker](#ticker) event -- Fallback for non-epoll/kqueue operating systems by simulating events with the [net](https://golang.org/pkg/net/) package -- [SO_REUSEPORT](#so_reuseport) socket option - -## Getting Started - -### Installing - -To start using evio, install Go and run `go get`: - -```sh -$ go get -u github.com/tidwall/evio -``` - -This will retrieve the library. - -### Usage - -Starting a server is easy with `evio`. Just set up your events and pass them to the `Serve` function along with the binding address(es). Each connections is represented as an `evio.Conn` object that is passed to various events to differentiate the clients. At any point you can close a client or shutdown the server by return a `Close` or `Shutdown` action from an event. - -Example echo server that binds to port 5000: - -```go -package main - -import "github.com/tidwall/evio" - -func main() { - var events evio.Events - events.Data = func(c evio.Conn, in []byte) (out []byte, action evio.Action) { - out = in - return - } - if err := evio.Serve(events, "tcp://localhost:5000"); err != nil { - panic(err.Error()) - } -} -``` - -Here the only event being used is `Data`, which fires when the server receives input data from a client. -The exact same input data is then passed through the output return value, which is then sent back to the client. - -Connect to the echo server: - -```sh -$ telnet localhost 5000 -``` - -### Events - -The event type has a bunch of handy events: - -- `Serving` fires when the server is ready to accept new connections. -- `Opened` fires when a connection has opened. -- `Closed` fires when a connection has closed. -- `Detach` fires when a connection has been detached using the `Detach` return action. -- `Data` fires when the server receives new data from a connection. -- `Tick` fires immediately after the server starts and will fire again after a specified interval. - -### Multiple addresses - -A server can bind to multiple addresses and share the same event loop. - -```go -evio.Serve(events, "tcp://192.168.0.10:5000", "unix://socket") -``` - -### Ticker - -The `Tick` event fires ticks at a specified interval. -The first tick fires immediately after the `Serving` events. - -```go -events.Tick = func() (delay time.Duration, action Action){ - log.Printf("tick") - delay = time.Second - return -} -``` - -## UDP - -The `Serve` function can bind to UDP addresses. - -- All incoming and outgoing packets are not buffered and sent individually. -- The `Opened` and `Closed` events are not availble for UDP sockets, only the `Data` event. - -## Multithreaded - -The `events.NumLoops` options sets the number of loops to use for the server. -A value greater than 1 will effectively make the server multithreaded for multi-core machines. -Which means you must take care when synchonizing memory between event callbacks. -Setting to 0 or 1 will run the server as single-threaded. -Setting to -1 will automatically assign this value equal to `runtime.NumProcs()`. - -## Load balancing - -The `events.LoadBalance` options sets the load balancing method. -Load balancing is always a best effort to attempt to distribute the incoming connections between multiple loops. -This option is only available when `events.NumLoops` is set. - -- `Random` requests that connections are randomly distributed. -- `RoundRobin` requests that connections are distributed to a loop in a round-robin fashion. -- `LeastConnections` assigns the next accepted connection to the loop with the least number of active connections. - -## SO_REUSEPORT - -Servers can utilize the [SO_REUSEPORT](https://lwn.net/Articles/542629/) option which allows multiple sockets on the same host to bind to the same port. - -Just provide `reuseport=true` to an address: - -```go -evio.Serve(events, "tcp://0.0.0.0:1234?reuseport=true")) -``` - -## More examples - -Please check out the [examples](examples) subdirectory for a simplified [redis](examples/redis-server/main.go) clone, an [echo](examples/echo-server/main.go) server, and a very basic [http](examples/http-server/main.go) server. - -To run an example: - -```sh -$ go run examples/http-server/main.go -$ go run examples/redis-server/main.go -$ go run examples/echo-server/main.go -``` - -## Performance - -### Benchmarks - -These benchmarks were run on an ec2 c4.xlarge instance in single-threaded mode (GOMAXPROC=1) over Ipv4 localhost. -Check out [benchmarks](benchmarks) for more info. - -echo benchmarkhttp benchmarkredis 1 benchmarkredis 8 benchmark - - -## Contact - -Josh Baker [@tidwall](http://twitter.com/tidwall) - -## License - -`evio` source code is available under the MIT [License](/LICENSE). - diff --git a/mixer/evio/evio/evio.go b/mixer/evio/evio/evio.go deleted file mode 100644 index 8e31287d..00000000 --- a/mixer/evio/evio/evio.go +++ /dev/null @@ -1,268 +0,0 @@ -// Copyright 2018 Joshua J Baker. All rights reserved. -// Use of this source code is governed by an MIT-style -// license that can be found in the LICENSE file. - -package evio - -import ( - "io" - "net" - "os" - "strings" - "time" -) - -// Action is an action that occurs after the completion of an event. -type Action int - -const ( - // None indicates that no action should occur following an event. - None Action = iota - // Detach detaches a connection. Not available for UDP connections. - Detach - // Close closes the connection. - Close - // Shutdown shutdowns the server. - Shutdown -) - -// Options are set when the client opens. -type Options struct { - // TCPKeepAlive (SO_KEEPALIVE) socket option. - TCPKeepAlive time.Duration - // ReuseInputBuffer will forces the connection to share and reuse the - // same input packet buffer with all other connections that also use - // this option. - // Default value is false, which means that all input data which is - // passed to the Data event will be a uniquely copied []byte slice. - ReuseInputBuffer bool -} - -// Server represents a server context which provides information about the -// running server and has control functions for managing state. -type Server struct { - // The addrs parameter is an array of listening addresses that align - // with the addr strings passed to the Serve function. - Addrs []net.Addr - // NumLoops is the number of loops that the server is using. - NumLoops int -} - -// Conn is an evio connection. -type Conn interface { - // Context returns a user-defined context. - Context() interface{} - // SetContext sets a user-defined context. - SetContext(interface{}) - // AddrIndex is the index of server address that was passed to the Serve call. - AddrIndex() int - // LocalAddr is the connection's local socket address. - LocalAddr() net.Addr - // RemoteAddr is the connection's remote peer address. - RemoteAddr() net.Addr - // Wake triggers a Data event for this connection. - Wake() -} - -// LoadBalance sets the load balancing method. -type LoadBalance int - -const ( - // Random requests that connections are randomly distributed. - Random LoadBalance = iota - // RoundRobin requests that connections are distributed to a loop in a - // round-robin fashion. - RoundRobin - // LeastConnections assigns the next accepted connection to the loop with - // the least number of active connections. - LeastConnections -) - -// Events represents the server events for the Serve call. -// Each event has an Action return value that is used manage the state -// of the connection and server. -type Events struct { - // NumLoops sets the number of loops to use for the server. Setting this - // to a value greater than 1 will effectively make the server - // multithreaded for multi-core machines. Which means you must take care - // with synchonizing memory between all event callbacks. Setting to 0 or 1 - // will run the server single-threaded. Setting to -1 will automatically - // assign this value equal to runtime.NumProcs(). - NumLoops int - // LoadBalance sets the load balancing method. Load balancing is always a - // best effort to attempt to distribute the incoming connections between - // multiple loops. This option is only works when NumLoops is set. - LoadBalance LoadBalance - // Serving fires when the server can accept connections. The server - // parameter has information and various utilities. - Serving func(server Server) (action Action) - // Opened fires when a new connection has opened. - // The info parameter has information about the connection such as - // it's local and remote address. - // Use the out return value to write data to the connection. - // The opts return value is used to set connection options. - Opened func(c Conn) (out []byte, opts Options, action Action) - // Closed fires when a connection has closed. - // The err parameter is the last known connection error. - Closed func(c Conn, err error) (action Action) - // Detached fires when a connection has been previously detached. - // Once detached it's up to the receiver of this event to manage the - // state of the connection. The Closed event will not be called for - // this connection. - // The conn parameter is a ReadWriteCloser that represents the - // underlying socket connection. It can be freely used in goroutines - // and should be closed when it's no longer needed. - Detached func(c Conn, rwc io.ReadWriteCloser) (action Action) - // PreWrite fires just before any data is written to any client socket. - PreWrite func() - // Data fires when a connection sends the server data. - // The in parameter is the incoming data. - // Use the out return value to write data to the connection. - Data func(c Conn, in []byte) (out []byte, action Action) - // Tick fires immediately after the server starts and will fire again - // following the duration specified by the delay return value. - Tick func() (delay time.Duration, action Action) -} - -// Serve starts handling events for the specified addresses. -// -// Addresses should use a scheme prefix and be formatted -// like `tcp://192.168.0.10:9851` or `unix://socket`. -// Valid network schemes: -// tcp - bind to both IPv4 and IPv6 -// tcp4 - IPv4 -// tcp6 - IPv6 -// udp - bind to both IPv4 and IPv6 -// udp4 - IPv4 -// udp6 - IPv6 -// unix - Unix Domain Socket -// -// The "tcp" network scheme is assumed when one is not specified. -func Serve(events Events, addr ...string) error { - var lns []*listener - defer func() { - for _, ln := range lns { - ln.close() - } - }() - var stdlib bool - for _, addr := range addr { - var ln listener - var stdlibt bool - ln.network, ln.addr, ln.opts, stdlibt = parseAddr(addr) - if stdlibt { - stdlib = true - } - if ln.network == "unix" { - os.RemoveAll(ln.addr) - } - var err error - if ln.network == "udp" { - if ln.opts.reusePort { - ln.pconn, err = reuseportListenPacket(ln.network, ln.addr) - } else { - ln.pconn, err = net.ListenPacket(ln.network, ln.addr) - } - } else { - if ln.opts.reusePort { - ln.ln, err = reuseportListen(ln.network, ln.addr) - } else { - ln.ln, err = net.Listen(ln.network, ln.addr) - } - } - if err != nil { - return err - } - if ln.pconn != nil { - ln.lnaddr = ln.pconn.LocalAddr() - } else { - ln.lnaddr = ln.ln.Addr() - } - if !stdlib { - if err := ln.system(); err != nil { - return err - } - } - lns = append(lns, &ln) - } - if stdlib { - return stdserve(events, lns) - } - return serve(events, lns) -} - -// InputStream is a helper type for managing input streams from inside -// the Data event. -type InputStream struct{ b []byte } - -// Begin accepts a new packet and returns a working sequence of -// unprocessed bytes. -func (is *InputStream) Begin(packet []byte) (data []byte) { - data = packet - if len(is.b) > 0 { - is.b = append(is.b, data...) - data = is.b - } - return data -} - -// End shifts the stream to match the unprocessed data. -func (is *InputStream) End(data []byte) { - if len(data) > 0 { - if len(data) != len(is.b) { - is.b = append(is.b[:0], data...) - } - } else if len(is.b) > 0 { - is.b = is.b[:0] - } -} - -type listener struct { - ln net.Listener - lnaddr net.Addr - pconn net.PacketConn - opts addrOpts - f *os.File - fd int - network string - addr string -} - -type addrOpts struct { - reusePort bool -} - -func parseAddr(addr string) (network, address string, opts addrOpts, stdlib bool) { - network = "tcp" - address = addr - opts.reusePort = false - if strings.Contains(address, "://") { - network = strings.Split(address, "://")[0] - address = strings.Split(address, "://")[1] - } - if strings.HasSuffix(network, "-net") { - stdlib = true - network = network[:len(network)-4] - } - q := strings.Index(address, "?") - if q != -1 { - for _, part := range strings.Split(address[q+1:], "&") { - kv := strings.Split(part, "=") - if len(kv) == 2 { - switch kv[0] { - case "reuseport": - if len(kv[1]) != 0 { - switch kv[1][0] { - default: - opts.reusePort = kv[1][0] >= '1' && kv[1][0] <= '9' - case 'T', 't', 'Y', 'y': - opts.reusePort = true - } - } - } - } - } - address = address[:q] - } - return -} diff --git a/mixer/evio/evio/evio_other.go b/mixer/evio/evio/evio_other.go deleted file mode 100644 index 31b92313..00000000 --- a/mixer/evio/evio/evio_other.go +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright 2018 Joshua J Baker. All rights reserved. -// Use of this source code is governed by an MIT-style -// license that can be found in the LICENSE file. - -// +build !darwin,!netbsd,!freebsd,!openbsd,!dragonfly,!linux - -package evio - -import ( - "errors" - "net" - "os" -) - -func (ln *listener) close() { - if ln.ln != nil { - ln.ln.Close() - } - if ln.pconn != nil { - ln.pconn.Close() - } - if ln.network == "unix" { - os.RemoveAll(ln.addr) - } -} - -func (ln *listener) system() error { - return nil -} - -func serve(events Events, listeners []*listener) error { - return stdserve(events, listeners) -} - -func reuseportListenPacket(proto, addr string) (l net.PacketConn, err error) { - return nil, errors.New("reuseport is not available") -} - -func reuseportListen(proto, addr string) (l net.Listener, err error) { - return nil, errors.New("reuseport is not available") -} diff --git a/mixer/evio/evio/evio_std.go b/mixer/evio/evio/evio_std.go deleted file mode 100644 index 4eb3e27d..00000000 --- a/mixer/evio/evio/evio_std.go +++ /dev/null @@ -1,459 +0,0 @@ -// Copyright 2018 Joshua J Baker. All rights reserved. -// Use of this source code is governed by an MIT-style -// license that can be found in the LICENSE file. - -package evio - -import ( - "errors" - "io" - "net" - "runtime" - "sync" - "sync/atomic" - "time" -) - -var errClosing = errors.New("closing") -var errCloseConns = errors.New("close conns") - -type stdserver struct { - events Events // user events - loops []*stdloop // all the loops - lns []*listener // all the listeners - loopwg sync.WaitGroup // loop close waitgroup - lnwg sync.WaitGroup // listener close waitgroup - cond *sync.Cond // shutdown signaler - serr error // signal error - accepted uintptr // accept counter -} - -type stdudpconn struct { - addrIndex int - localAddr net.Addr - remoteAddr net.Addr - in []byte -} - -func (c *stdudpconn) Context() interface{} { return nil } -func (c *stdudpconn) SetContext(ctx interface{}) {} -func (c *stdudpconn) AddrIndex() int { return c.addrIndex } -func (c *stdudpconn) LocalAddr() net.Addr { return c.localAddr } -func (c *stdudpconn) RemoteAddr() net.Addr { return c.remoteAddr } -func (c *stdudpconn) Wake() {} - -type stdloop struct { - idx int // loop index - ch chan interface{} // command channel - conns map[*stdconn]bool // track all the conns bound to this loop -} - -type stdconn struct { - addrIndex int - localAddr net.Addr - remoteAddr net.Addr - conn net.Conn // original connection - ctx interface{} // user-defined context - loop *stdloop // owner loop - lnidx int // index of listener - donein []byte // extra data for done connection - done int32 // 0: attached, 1: closed, 2: detached -} - -type wakeReq struct { - c *stdconn -} - -func (c *stdconn) Context() interface{} { return c.ctx } -func (c *stdconn) SetContext(ctx interface{}) { c.ctx = ctx } -func (c *stdconn) AddrIndex() int { return c.addrIndex } -func (c *stdconn) LocalAddr() net.Addr { return c.localAddr } -func (c *stdconn) RemoteAddr() net.Addr { return c.remoteAddr } -func (c *stdconn) Wake() { c.loop.ch <- wakeReq{c} } - -type stdin struct { - c *stdconn - in []byte -} - -type stderr struct { - c *stdconn - err error -} - -// waitForShutdown waits for a signal to shutdown -func (s *stdserver) waitForShutdown() error { - s.cond.L.Lock() - s.cond.Wait() - err := s.serr - s.cond.L.Unlock() - return err -} - -// signalShutdown signals a shutdown an begins server closing -func (s *stdserver) signalShutdown(err error) { - s.cond.L.Lock() - s.serr = err - s.cond.Signal() - s.cond.L.Unlock() -} - -func stdserve(events Events, listeners []*listener) error { - numLoops := events.NumLoops - if numLoops <= 0 { - if numLoops == 0 { - numLoops = 1 - } else { - numLoops = runtime.NumCPU() - } - } - - s := &stdserver{} - s.events = events - s.lns = listeners - s.cond = sync.NewCond(&sync.Mutex{}) - - //println("-- server starting") - if events.Serving != nil { - var svr Server - svr.NumLoops = numLoops - svr.Addrs = make([]net.Addr, len(listeners)) - for i, ln := range listeners { - svr.Addrs[i] = ln.lnaddr - } - action := events.Serving(svr) - switch action { - case Shutdown: - return nil - } - } - for i := 0; i < numLoops; i++ { - s.loops = append(s.loops, &stdloop{ - idx: i, - ch: make(chan interface{}), - conns: make(map[*stdconn]bool), - }) - } - var ferr error - defer func() { - // wait on a signal for shutdown - ferr = s.waitForShutdown() - - // notify all loops to close by closing all listeners - for _, l := range s.loops { - l.ch <- errClosing - } - - // wait on all loops to main loop channel events - s.loopwg.Wait() - - // shutdown all listeners - for i := 0; i < len(s.lns); i++ { - s.lns[i].close() - } - - // wait on all listeners to complete - s.lnwg.Wait() - - // close all connections - s.loopwg.Add(len(s.loops)) - for _, l := range s.loops { - l.ch <- errCloseConns - } - s.loopwg.Wait() - - }() - s.loopwg.Add(numLoops) - for i := 0; i < numLoops; i++ { - go stdloopRun(s, s.loops[i]) - } - s.lnwg.Add(len(listeners)) - for i := 0; i < len(listeners); i++ { - go stdlistenerRun(s, listeners[i], i) - } - return ferr -} - -func stdlistenerRun(s *stdserver, ln *listener, lnidx int) { - var ferr error - defer func() { - s.signalShutdown(ferr) - s.lnwg.Done() - }() - var packet [0xFFFF]byte - for { - if ln.pconn != nil { - // udp - n, addr, err := ln.pconn.ReadFrom(packet[:]) - if err != nil { - ferr = err - return - } - l := s.loops[int(atomic.AddUintptr(&s.accepted, 1))%len(s.loops)] - l.ch <- &stdudpconn{ - addrIndex: lnidx, - localAddr: ln.lnaddr, - remoteAddr: addr, - in: append([]byte{}, packet[:n]...), - } - } else { - // tcp - conn, err := ln.ln.Accept() - if err != nil { - ferr = err - return - } - l := s.loops[int(atomic.AddUintptr(&s.accepted, 1))%len(s.loops)] - c := &stdconn{conn: conn, loop: l, lnidx: lnidx} - l.ch <- c - go func(c *stdconn) { - var packet [0xFFFF]byte - for { - n, err := c.conn.Read(packet[:]) - if err != nil { - c.conn.SetReadDeadline(time.Time{}) - l.ch <- &stderr{c, err} - return - } - l.ch <- &stdin{c, append([]byte{}, packet[:n]...)} - } - }(c) - } - } -} - -func stdloopRun(s *stdserver, l *stdloop) { - var err error - tick := make(chan bool) - tock := make(chan time.Duration) - defer func() { - //fmt.Println("-- loop stopped --", l.idx) - if l.idx == 0 && s.events.Tick != nil { - close(tock) - go func() { - for range tick { - } - }() - } - s.signalShutdown(err) - s.loopwg.Done() - stdloopEgress(s, l) - s.loopwg.Done() - }() - if l.idx == 0 && s.events.Tick != nil { - go func() { - for { - tick <- true - delay, ok := <-tock - if !ok { - break - } - time.Sleep(delay) - } - }() - } - //fmt.Println("-- loop started --", l.idx) - for { - select { - case <-tick: - delay, action := s.events.Tick() - switch action { - case Shutdown: - err = errClosing - } - tock <- delay - case v := <-l.ch: - switch v := v.(type) { - case error: - err = v - case *stdconn: - err = stdloopAccept(s, l, v) - case *stdin: - err = stdloopRead(s, l, v.c, v.in) - case *stdudpconn: - err = stdloopReadUDP(s, l, v) - case *stderr: - err = stdloopError(s, l, v.c, v.err) - case wakeReq: - err = stdloopRead(s, l, v.c, nil) - } - } - if err != nil { - return - } - } -} - -func stdloopEgress(s *stdserver, l *stdloop) { - var closed bool -loop: - for v := range l.ch { - switch v := v.(type) { - case error: - if v == errCloseConns { - closed = true - for c := range l.conns { - stdloopClose(s, l, c) - } - } - case *stderr: - stdloopError(s, l, v.c, v.err) - } - if len(l.conns) == 0 && closed { - break loop - } - } -} - -func stdloopError(s *stdserver, l *stdloop, c *stdconn, err error) error { - delete(l.conns, c) - closeEvent := true - switch atomic.LoadInt32(&c.done) { - case 0: // read error - c.conn.Close() - if err == io.EOF { - err = nil - } - case 1: // closed - c.conn.Close() - err = nil - case 2: // detached - err = nil - if s.events.Detached == nil { - c.conn.Close() - } else { - closeEvent = false - switch s.events.Detached(c, &stddetachedConn{c.conn, c.donein}) { - case Shutdown: - return errClosing - } - } - } - if closeEvent { - if s.events.Closed != nil { - switch s.events.Closed(c, err) { - case Shutdown: - return errClosing - } - } - } - return nil -} - -type stddetachedConn struct { - conn net.Conn // original conn - in []byte // extra input data -} - -func (c *stddetachedConn) Read(p []byte) (n int, err error) { - if len(c.in) > 0 { - if len(c.in) <= len(p) { - copy(p, c.in) - n = len(c.in) - c.in = nil - return - } - copy(p, c.in[:len(p)]) - n = len(p) - c.in = c.in[n:] - return - } - return c.conn.Read(p) -} - -func (c *stddetachedConn) Write(p []byte) (n int, err error) { - return c.conn.Write(p) -} - -func (c *stddetachedConn) Close() error { - return c.conn.Close() -} - -func (c *stddetachedConn) Wake() {} - -func stdloopRead(s *stdserver, l *stdloop, c *stdconn, in []byte) error { - if atomic.LoadInt32(&c.done) == 2 { - // should not ignore reads for detached connections - c.donein = append(c.donein, in...) - return nil - } - if s.events.Data != nil { - out, action := s.events.Data(c, in) - if len(out) > 0 { - if s.events.PreWrite != nil { - s.events.PreWrite() - } - c.conn.Write(out) - } - switch action { - case Shutdown: - return errClosing - case Detach: - return stdloopDetach(s, l, c) - case Close: - return stdloopClose(s, l, c) - } - } - return nil -} - -func stdloopReadUDP(s *stdserver, l *stdloop, c *stdudpconn) error { - if s.events.Data != nil { - out, action := s.events.Data(c, c.in) - if len(out) > 0 { - if s.events.PreWrite != nil { - s.events.PreWrite() - } - s.lns[c.addrIndex].pconn.WriteTo(out, c.remoteAddr) - } - switch action { - case Shutdown: - return errClosing - } - } - return nil -} - -func stdloopDetach(s *stdserver, l *stdloop, c *stdconn) error { - atomic.StoreInt32(&c.done, 2) - c.conn.SetReadDeadline(time.Now()) - return nil -} - -func stdloopClose(s *stdserver, l *stdloop, c *stdconn) error { - atomic.StoreInt32(&c.done, 1) - c.conn.SetReadDeadline(time.Now()) - return nil -} - -func stdloopAccept(s *stdserver, l *stdloop, c *stdconn) error { - l.conns[c] = true - c.addrIndex = c.lnidx - c.localAddr = s.lns[c.lnidx].lnaddr - c.remoteAddr = c.conn.RemoteAddr() - - if s.events.Opened != nil { - out, opts, action := s.events.Opened(c) - if len(out) > 0 { - if s.events.PreWrite != nil { - s.events.PreWrite() - } - c.conn.Write(out) - } - if opts.TCPKeepAlive > 0 { - if c, ok := c.conn.(*net.TCPConn); ok { - c.SetKeepAlive(true) - c.SetKeepAlivePeriod(opts.TCPKeepAlive) - } - } - switch action { - case Shutdown: - return errClosing - case Detach: - return stdloopDetach(s, l, c) - case Close: - return stdloopClose(s, l, c) - } - } - return nil -} diff --git a/mixer/evio/evio/evio_test.go b/mixer/evio/evio/evio_test.go deleted file mode 100644 index ec37b968..00000000 --- a/mixer/evio/evio/evio_test.go +++ /dev/null @@ -1,478 +0,0 @@ -// Copyright 2017 Joshua J Baker. All rights reserved. -// Use of this source code is governed by an MIT-style -// license that can be found in the LICENSE file. - -package evio - -import ( - "bufio" - "fmt" - "io" - "math/rand" - "net" - "os" - "strings" - "sync" - "sync/atomic" - "testing" - "time" -) - -func TestServe(t *testing.T) { - // start a server - // connect 10 clients - // each client will pipe random data for 1-3 seconds. - // the writes to the server will be random sizes. 0KB - 1MB. - // the server will echo back the data. - // waits for graceful connection closing. - t.Run("stdlib", func(t *testing.T) { - t.Run("tcp", func(t *testing.T) { - t.Run("1-loop", func(t *testing.T) { - testServe("tcp-net", ":9997", false, 10, 1, Random) - }) - t.Run("5-loop", func(t *testing.T) { - testServe("tcp-net", ":9998", false, 10, 5, LeastConnections) - }) - t.Run("N-loop", func(t *testing.T) { - testServe("tcp-net", ":9999", false, 10, -1, RoundRobin) - }) - }) - t.Run("unix", func(t *testing.T) { - t.Run("1-loop", func(t *testing.T) { - testServe("tcp-net", ":9989", true, 10, 1, Random) - }) - t.Run("5-loop", func(t *testing.T) { - testServe("tcp-net", ":9988", true, 10, 5, LeastConnections) - }) - t.Run("N-loop", func(t *testing.T) { - testServe("tcp-net", ":9987", true, 10, -1, RoundRobin) - }) - }) - }) - t.Run("poll", func(t *testing.T) { - t.Run("tcp", func(t *testing.T) { - t.Run("1-loop", func(t *testing.T) { - testServe("tcp", ":9991", false, 10, 1, Random) - }) - t.Run("5-loop", func(t *testing.T) { - testServe("tcp", ":9992", false, 10, 5, LeastConnections) - }) - t.Run("N-loop", func(t *testing.T) { - testServe("tcp", ":9993", false, 10, -1, RoundRobin) - }) - }) - t.Run("unix", func(t *testing.T) { - t.Run("1-loop", func(t *testing.T) { - testServe("tcp", ":9994", true, 10, 1, Random) - }) - t.Run("5-loop", func(t *testing.T) { - testServe("tcp", ":9995", true, 10, 5, LeastConnections) - }) - t.Run("N-loop", func(t *testing.T) { - testServe("tcp", ":9996", true, 10, -1, RoundRobin) - }) - }) - }) - -} - -func testServe(network, addr string, unix bool, nclients, nloops int, balance LoadBalance) { - var started int32 - var connected int32 - var disconnected int32 - - var events Events - events.LoadBalance = balance - events.NumLoops = nloops - events.Serving = func(srv Server) (action Action) { - return - } - events.Opened = func(c Conn) (out []byte, opts Options, action Action) { - c.SetContext(c) - atomic.AddInt32(&connected, 1) - out = []byte("sweetness\r\n") - opts.TCPKeepAlive = time.Minute * 5 - if c.LocalAddr() == nil { - panic("nil local addr") - } - if c.RemoteAddr() == nil { - panic("nil local addr") - } - return - } - events.Closed = func(c Conn, err error) (action Action) { - if c.Context() != c { - panic("invalid context") - } - atomic.AddInt32(&disconnected, 1) - if atomic.LoadInt32(&connected) == atomic.LoadInt32(&disconnected) && - atomic.LoadInt32(&disconnected) == int32(nclients) { - action = Shutdown - } - return - } - events.Data = func(c Conn, in []byte) (out []byte, action Action) { - out = in - return - } - events.Tick = func() (delay time.Duration, action Action) { - if atomic.LoadInt32(&started) == 0 { - for i := 0; i < nclients; i++ { - go startClient(network, addr, nloops) - } - atomic.StoreInt32(&started, 1) - } - delay = time.Second / 5 - return - } - var err error - if unix { - socket := strings.Replace(addr, ":", "socket", 1) - os.RemoveAll(socket) - defer os.RemoveAll(socket) - err = Serve(events, network+"://"+addr, "unix://"+socket) - } else { - err = Serve(events, network+"://"+addr) - } - if err != nil { - panic(err) - } -} - -func startClient(network, addr string, nloops int) { - onetwork := network - network = strings.Replace(network, "-net", "", -1) - rand.Seed(time.Now().UnixNano()) - c, err := net.Dial(network, addr) - if err != nil { - panic(err) - } - defer c.Close() - rd := bufio.NewReader(c) - msg, err := rd.ReadBytes('\n') - if err != nil { - panic(err) - } - if string(msg) != "sweetness\r\n" { - panic("bad header") - } - duration := time.Duration((rand.Float64()*2+1)*float64(time.Second)) / 8 - start := time.Now() - for time.Since(start) < duration { - sz := rand.Int() % (1024 * 1024) - data := make([]byte, sz) - if _, err := rand.Read(data); err != nil { - panic(err) - } - if _, err := c.Write(data); err != nil { - panic(err) - } - data2 := make([]byte, len(data)) - if _, err := io.ReadFull(rd, data2); err != nil { - panic(err) - } - if string(data) != string(data2) { - fmt.Printf("mismatch %s/%d: %d vs %d bytes\n", onetwork, nloops, len(data), len(data2)) - //panic("mismatch") - } - } -} - -func must(err error) { - if err != nil { - panic(err) - } -} -func TestTick(t *testing.T) { - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - testTick("tcp", ":9991", false) - }() - wg.Add(1) - go func() { - defer wg.Done() - testTick("tcp", ":9992", true) - }() - wg.Add(1) - go func() { - defer wg.Done() - testTick("unix", "socket1", false) - }() - wg.Add(1) - go func() { - defer wg.Done() - testTick("unix", "socket2", true) - }() - wg.Wait() -} -func testTick(network, addr string, stdlib bool) { - var events Events - var count int - start := time.Now() - events.Tick = func() (delay time.Duration, action Action) { - if count == 25 { - action = Shutdown - return - } - count++ - delay = time.Millisecond * 10 - return - } - if stdlib { - must(Serve(events, network+"-net://"+addr)) - } else { - must(Serve(events, network+"://"+addr)) - } - dur := time.Since(start) - if dur < 250&time.Millisecond || dur > time.Second { - panic("bad ticker timing") - } -} - -func TestShutdown(t *testing.T) { - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - testShutdown("tcp", ":9991", false) - }() - wg.Add(1) - go func() { - defer wg.Done() - testShutdown("tcp", ":9992", true) - }() - wg.Add(1) - go func() { - defer wg.Done() - testShutdown("unix", "socket1", false) - }() - wg.Add(1) - go func() { - defer wg.Done() - testShutdown("unix", "socket2", true) - }() - wg.Wait() -} -func testShutdown(network, addr string, stdlib bool) { - var events Events - var count int - var clients int64 - var N = 10 - events.Opened = func(c Conn) (out []byte, opts Options, action Action) { - atomic.AddInt64(&clients, 1) - return - } - events.Closed = func(c Conn, err error) (action Action) { - atomic.AddInt64(&clients, -1) - return - } - events.Tick = func() (delay time.Duration, action Action) { - if count == 0 { - // start clients - for i := 0; i < N; i++ { - go func() { - conn, err := net.Dial(network, addr) - must(err) - defer conn.Close() - _, err = conn.Read([]byte{0}) - if err == nil { - panic("expected error") - } - }() - } - } else { - if int(atomic.LoadInt64(&clients)) == N { - action = Shutdown - } - } - count++ - delay = time.Second / 20 - return - } - if stdlib { - must(Serve(events, network+"-net://"+addr)) - } else { - must(Serve(events, network+"://"+addr)) - } - if clients != 0 { - panic("did not call close on all clients") - } -} - -func TestDetach(t *testing.T) { - t.Run("poll", func(t *testing.T) { - t.Run("tcp", func(t *testing.T) { - testDetach("tcp", ":9991", false) - }) - t.Run("unix", func(t *testing.T) { - testDetach("unix", "socket1", false) - }) - }) - t.Run("stdlib", func(t *testing.T) { - t.Run("tcp", func(t *testing.T) { - testDetach("tcp", ":9992", true) - }) - t.Run("unix", func(t *testing.T) { - testDetach("unix", "socket2", true) - }) - }) -} - -func testDetach(network, addr string, stdlib bool) { - // we will write a bunch of data with the text "--detached--" in the - // middle followed by a bunch of data. - rand.Seed(time.Now().UnixNano()) - rdat := make([]byte, 10*1024) - if _, err := rand.Read(rdat); err != nil { - panic("random error: " + err.Error()) - } - expected := []byte(string(rdat) + "--detached--" + string(rdat)) - var cin []byte - var events Events - events.Data = func(c Conn, in []byte) (out []byte, action Action) { - cin = append(cin, in...) - if len(cin) >= len(expected) { - if string(cin) != string(expected) { - panic("mismatch client -> server") - } - return cin, Detach - } - return - } - - var done int64 - events.Detached = func(c Conn, conn io.ReadWriteCloser) (action Action) { - go func() { - p := make([]byte, len(expected)) - defer conn.Close() - _, err := io.ReadFull(conn, p) - must(err) - conn.Write(expected) - }() - return - } - - events.Serving = func(srv Server) (action Action) { - go func() { - p := make([]byte, len(expected)) - _ = expected - conn, err := net.Dial(network, addr) - must(err) - defer conn.Close() - conn.Write(expected) - _, err = io.ReadFull(conn, p) - must(err) - conn.Write(expected) - _, err = io.ReadFull(conn, p) - must(err) - atomic.StoreInt64(&done, 1) - }() - return - } - events.Tick = func() (delay time.Duration, action Action) { - delay = time.Second / 5 - if atomic.LoadInt64(&done) == 1 { - action = Shutdown - } - return - } - if stdlib { - must(Serve(events, network+"-net://"+addr)) - } else { - must(Serve(events, network+"://"+addr)) - } -} - -func TestBadAddresses(t *testing.T) { - var events Events - events.Serving = func(srv Server) (action Action) { - return Shutdown - } - if err := Serve(events, "tulip://howdy"); err == nil { - t.Fatalf("expected error") - } - if err := Serve(events, "howdy"); err == nil { - t.Fatalf("expected error") - } - if err := Serve(events, "tcp://"); err != nil { - t.Fatalf("expected nil, got '%v'", err) - } -} - -func TestInputStream(t *testing.T) { - var s InputStream - in := []byte("HELLO") - data := s.Begin(in) - if string(data) != string(in) { - t.Fatalf("expected '%v', got '%v'", in, data) - } - s.End(in[3:]) - data = s.Begin([]byte("WLY")) - if string(data) != "LOWLY" { - t.Fatalf("expected '%v', got '%v'", "LOWLY", data) - } - s.End(nil) - data = s.Begin([]byte("PLAYER")) - if string(data) != "PLAYER" { - t.Fatalf("expected '%v', got '%v'", "PLAYER", data) - } -} - -func TestReuseInputBuffer(t *testing.T) { - reuses := []bool{true, false} - for _, reuse := range reuses { - var events Events - events.Opened = func(c Conn) (out []byte, opts Options, action Action) { - opts.ReuseInputBuffer = reuse - return - } - var prev []byte - events.Data = func(c Conn, in []byte) (out []byte, action Action) { - if prev == nil { - prev = in - } else { - reused := string(in) == string(prev) - if reused != reuse { - t.Fatalf("expected %v, got %v", reuse, reused) - } - action = Shutdown - } - return - } - events.Serving = func(_ Server) (action Action) { - go func() { - c, err := net.Dial("tcp", ":9991") - must(err) - defer c.Close() - c.Write([]byte("packet1")) - time.Sleep(time.Second / 5) - c.Write([]byte("packet2")) - }() - return - } - must(Serve(events, "tcp://:9991")) - } - -} - -func TestReuseport(t *testing.T) { - var events Events - events.Serving = func(s Server) (action Action) { - return Shutdown - } - var wg sync.WaitGroup - wg.Add(5) - for i := 0; i < 5; i++ { - var t = "1" - if i%2 == 0 { - t = "true" - } - go func(t string) { - defer wg.Done() - must(Serve(events, "tcp://:9991?reuseport="+t)) - }(t) - } - wg.Wait() -} diff --git a/mixer/evio/evio/evio_unix.go b/mixer/evio/evio/evio_unix.go deleted file mode 100644 index ffbf1866..00000000 --- a/mixer/evio/evio/evio_unix.go +++ /dev/null @@ -1,544 +0,0 @@ -// Copyright 2018 Joshua J Baker. All rights reserved. -// Use of this source code is governed by an MIT-style -// license that can be found in the LICENSE file. - -// +build darwin netbsd freebsd openbsd dragonfly linux - -package evio - -import ( - "io" - "net" - "os" - "runtime" - "sync" - "sync/atomic" - "syscall" - "time" - - "github.com/henrylee2cn/erpc/v6/mixer/evio/evio/internal" - reuseport "github.com/kavu/go_reuseport" -) - -type conn struct { - fd int // file descriptor - lnidx int // listener index in the server lns list - out []byte // write buffer - sa syscall.Sockaddr // remote socket address - reuse bool // should reuse input buffer - opened bool // connection opened event fired - action Action // next user action - ctx interface{} // user-defined context - addrIndex int // index of listening address - localAddr net.Addr // local addre - remoteAddr net.Addr // remote addr - loop *loop // connected loop -} - -func (c *conn) Context() interface{} { return c.ctx } -func (c *conn) SetContext(ctx interface{}) { c.ctx = ctx } -func (c *conn) AddrIndex() int { return c.addrIndex } -func (c *conn) LocalAddr() net.Addr { return c.localAddr } -func (c *conn) RemoteAddr() net.Addr { return c.remoteAddr } -func (c *conn) Wake() { - if c.loop != nil { - c.loop.poll.Trigger(c) - } -} - -type server struct { - events Events // user events - loops []*loop // all the loops - lns []*listener // all the listeners - wg sync.WaitGroup // loop close waitgroup - cond *sync.Cond // shutdown signaler - balance LoadBalance // load balancing method - accepted uintptr // accept counter - tch chan time.Duration // ticker channel - - //ticktm time.Time // next tick time -} - -type loop struct { - idx int // loop index in the server loops list - poll *internal.Poll // epoll or kqueue - packet []byte // read packet buffer - fdconns map[int]*conn // loop connections fd -> conn - count int32 // connection count -} - -// waitForShutdown waits for a signal to shutdown -func (s *server) waitForShutdown() { - s.cond.L.Lock() - s.cond.Wait() - s.cond.L.Unlock() -} - -// signalShutdown signals a shutdown an begins server closing -func (s *server) signalShutdown() { - s.cond.L.Lock() - s.cond.Signal() - s.cond.L.Unlock() -} - -func serve(events Events, listeners []*listener) error { - // figure out the correct number of loops/goroutines to use. - numLoops := events.NumLoops - if numLoops <= 0 { - if numLoops == 0 { - numLoops = 1 - } else { - numLoops = runtime.NumCPU() - } - } - - s := &server{} - s.events = events - s.lns = listeners - s.cond = sync.NewCond(&sync.Mutex{}) - s.balance = events.LoadBalance - s.tch = make(chan time.Duration) - - //println("-- server starting") - if s.events.Serving != nil { - var svr Server - svr.NumLoops = numLoops - svr.Addrs = make([]net.Addr, len(listeners)) - for i, ln := range listeners { - svr.Addrs[i] = ln.lnaddr - } - action := s.events.Serving(svr) - switch action { - case None: - case Shutdown: - return nil - } - } - - defer func() { - // wait on a signal for shutdown - s.waitForShutdown() - - // notify all loops to close by closing all listeners - for _, l := range s.loops { - l.poll.Trigger(errClosing) - } - - // wait on all loops to complete reading events - s.wg.Wait() - - // close loops and all outstanding connections - for _, l := range s.loops { - for _, c := range l.fdconns { - loopCloseConn(s, l, c, nil) - } - l.poll.Close() - } - //println("-- server stopped") - }() - - // create loops locally and bind the listeners. - for i := 0; i < numLoops; i++ { - l := &loop{ - idx: i, - poll: internal.OpenPoll(), - packet: make([]byte, 0xFFFF), - fdconns: make(map[int]*conn), - } - for _, ln := range listeners { - l.poll.AddRead(ln.fd) - } - s.loops = append(s.loops, l) - } - // start loops in background - s.wg.Add(len(s.loops)) - for _, l := range s.loops { - go loopRun(s, l) - } - return nil -} - -func loopCloseConn(s *server, l *loop, c *conn, err error) error { - atomic.AddInt32(&l.count, -1) - delete(l.fdconns, c.fd) - syscall.Close(c.fd) - if s.events.Closed != nil { - switch s.events.Closed(c, err) { - case None: - case Shutdown: - return errClosing - } - } - return nil -} - -func loopDetachConn(s *server, l *loop, c *conn, err error) error { - if s.events.Detached == nil { - return loopCloseConn(s, l, c, err) - } - l.poll.ModDetach(c.fd) - - atomic.AddInt32(&l.count, -1) - delete(l.fdconns, c.fd) - if err := syscall.SetNonblock(c.fd, false); err != nil { - return err - } - switch s.events.Detached(c, &detachedConn{fd: c.fd}) { - case None: - case Shutdown: - return errClosing - } - return nil -} - -func loopNote(s *server, l *loop, note interface{}) error { - var err error - switch v := note.(type) { - case time.Duration: - delay, action := s.events.Tick() - switch action { - case None: - case Shutdown: - err = errClosing - } - s.tch <- delay - case error: // shutdown - err = v - case *conn: - // Wake called for connection - if l.fdconns[v.fd] != v { - return nil // ignore stale wakes - } - return loopWake(s, l, v) - } - return err -} - -func loopRun(s *server, l *loop) { - defer func() { - //fmt.Println("-- loop stopped --", l.idx) - s.signalShutdown() - s.wg.Done() - }() - - if l.idx == 0 && s.events.Tick != nil { - go loopTicker(s, l) - } - - //fmt.Println("-- loop started --", l.idx) - l.poll.Wait(func(fd int, note interface{}) error { - if fd == 0 { - return loopNote(s, l, note) - } - c := l.fdconns[fd] - switch { - case c == nil: - return loopAccept(s, l, fd) - case !c.opened: - return loopOpened(s, l, c) - case len(c.out) > 0: - return loopWrite(s, l, c) - case c.action != None: - return loopAction(s, l, c) - default: - return loopRead(s, l, c) - } - }) -} - -func loopTicker(s *server, l *loop) { - for { - if err := l.poll.Trigger(time.Duration(0)); err != nil { - break - } - time.Sleep(<-s.tch) - } -} - -func loopAccept(s *server, l *loop, fd int) error { - for i, ln := range s.lns { - if ln.fd == fd { - if len(s.loops) > 1 { - switch s.balance { - case LeastConnections: - n := atomic.LoadInt32(&l.count) - for _, lp := range s.loops { - if lp.idx != l.idx { - if atomic.LoadInt32(&lp.count) < n { - return nil // do not accept - } - } - } - case RoundRobin: - idx := int(atomic.LoadUintptr(&s.accepted)) % len(s.loops) - if idx != l.idx { - return nil // do not accept - } - atomic.AddUintptr(&s.accepted, 1) - } - } - if ln.pconn != nil { - return loopUDPRead(s, l, i, fd) - } - nfd, sa, err := syscall.Accept(fd) - if err != nil { - if err == syscall.EAGAIN { - return nil - } - return err - } - if err := syscall.SetNonblock(nfd, true); err != nil { - return err - } - c := &conn{fd: nfd, sa: sa, lnidx: i, loop: l} - l.fdconns[c.fd] = c - l.poll.AddReadWrite(c.fd) - atomic.AddInt32(&l.count, 1) - break - } - } - return nil -} - -func loopUDPRead(s *server, l *loop, lnidx, fd int) error { - n, sa, err := syscall.Recvfrom(fd, l.packet, 0) - if err != nil || n == 0 { - return nil - } - if s.events.Data != nil { - var sa6 syscall.SockaddrInet6 - switch sa := sa.(type) { - case *syscall.SockaddrInet4: - sa6.ZoneId = 0 - sa6.Port = sa.Port - for i := 0; i < 12; i++ { - sa6.Addr[i] = 0 - } - sa6.Addr[12] = sa.Addr[0] - sa6.Addr[13] = sa.Addr[1] - sa6.Addr[14] = sa.Addr[2] - sa6.Addr[15] = sa.Addr[3] - case *syscall.SockaddrInet6: - sa6 = *sa - } - c := &conn{} - c.addrIndex = lnidx - c.localAddr = s.lns[lnidx].lnaddr - c.remoteAddr = internal.SockaddrToAddr(&sa6) - var in []byte - if c.reuse { - in = l.packet[:n] - } else { - b := l.packet[:n] - in = make([]byte, len(b)) - copy(in, b) - } - out, action := s.events.Data(c, in) - if len(out) > 0 { - if s.events.PreWrite != nil { - s.events.PreWrite() - } - syscall.Sendto(fd, out, 0, sa) - } - switch action { - case Shutdown: - return errClosing - } - } - return nil -} - -func loopOpened(s *server, l *loop, c *conn) error { - c.opened = true - c.addrIndex = c.lnidx - c.localAddr = s.lns[c.lnidx].lnaddr - c.remoteAddr = internal.SockaddrToAddr(c.sa) - if s.events.Opened != nil { - out, opts, action := s.events.Opened(c) - if len(out) > 0 { - c.out = append([]byte{}, out...) - } - c.action = action - c.reuse = opts.ReuseInputBuffer - if opts.TCPKeepAlive > 0 { - if _, ok := s.lns[c.lnidx].ln.(*net.TCPListener); ok { - internal.SetKeepAlive(c.fd, int(opts.TCPKeepAlive/time.Second)) - } - } - } - if len(c.out) == 0 && c.action == None { - l.poll.ModRead(c.fd) - } - return nil -} - -func loopWrite(s *server, l *loop, c *conn) error { - if s.events.PreWrite != nil { - s.events.PreWrite() - } - n, err := syscall.Write(c.fd, c.out) - if err != nil { - if err == syscall.EAGAIN { - return nil - } - return loopCloseConn(s, l, c, err) - } - if n == len(c.out) { - c.out = nil - } else { - c.out = c.out[n:] - } - if len(c.out) == 0 && c.action == None { - l.poll.ModRead(c.fd) - } - return nil -} - -func loopAction(s *server, l *loop, c *conn) error { - switch c.action { - default: - c.action = None - case Close: - return loopCloseConn(s, l, c, nil) - case Shutdown: - return errClosing - case Detach: - return loopDetachConn(s, l, c, nil) - } - if len(c.out) == 0 && c.action == None { - l.poll.ModRead(c.fd) - } - return nil -} - -func loopWake(s *server, l *loop, c *conn) error { - if s.events.Data == nil { - return nil - } - out, action := s.events.Data(c, nil) - c.action = action - if len(out) > 0 { - c.out = append([]byte{}, out...) - } - if len(c.out) != 0 || c.action != None { - l.poll.ModReadWrite(c.fd) - } - return nil -} - -func loopRead(s *server, l *loop, c *conn) error { - var in []byte - n, err := syscall.Read(c.fd, l.packet) - if n == 0 || err != nil { - if err == syscall.EAGAIN { - return nil - } - return loopCloseConn(s, l, c, err) - } - if c.reuse { - in = l.packet[:n] - } else { - b := l.packet[:n] - in = make([]byte, len(b)) - copy(in, b) - } - if s.events.Data != nil { - out, action := s.events.Data(c, in) - c.action = action - if len(out) > 0 { - c.out = append([]byte{}, out...) - } - } - if len(c.out) != 0 || c.action != None { - l.poll.ModReadWrite(c.fd) - } - return nil -} - -type detachedConn struct { - fd int -} - -func (c *detachedConn) Close() error { - err := syscall.Close(c.fd) - if err != nil { - return err - } - c.fd = -1 - return nil -} - -func (c *detachedConn) Read(p []byte) (n int, err error) { - n, err = syscall.Read(c.fd, p) - if err != nil { - return n, err - } - if n == 0 { - if len(p) == 0 { - return 0, nil - } - return 0, io.EOF - } - return n, nil -} - -func (c *detachedConn) Write(p []byte) (n int, err error) { - n = len(p) - for len(p) > 0 { - nn, err := syscall.Write(c.fd, p) - if err != nil { - return n, err - } - p = p[nn:] - } - return n, nil -} - -func (ln *listener) close() { - if ln.fd != 0 { - syscall.Close(ln.fd) - } - if ln.f != nil { - ln.f.Close() - } - if ln.ln != nil { - ln.ln.Close() - } - if ln.pconn != nil { - ln.pconn.Close() - } - if ln.network == "unix" { - os.RemoveAll(ln.addr) - } -} - -// system takes the net listener and detaches it from it's parent -// event loop, grabs the file descriptor, and makes it non-blocking. -func (ln *listener) system() error { - var err error - switch netln := ln.ln.(type) { - case nil: - switch pconn := ln.pconn.(type) { - case *net.UDPConn: - ln.f, err = pconn.File() - } - case *net.TCPListener: - ln.f, err = netln.File() - case *net.UnixListener: - ln.f, err = netln.File() - } - if err != nil { - ln.close() - return err - } - ln.fd = int(ln.f.Fd()) - return syscall.SetNonblock(ln.fd, true) -} - -func reuseportListenPacket(proto, addr string) (l net.PacketConn, err error) { - return reuseport.ListenPacket(proto, addr) -} - -func reuseportListen(proto, addr string) (l net.Listener, err error) { - return reuseport.Listen(proto, addr) -} diff --git a/mixer/evio/evio/internal/internal_bsd.go b/mixer/evio/evio/internal/internal_bsd.go deleted file mode 100644 index 5b433cd5..00000000 --- a/mixer/evio/evio/internal/internal_bsd.go +++ /dev/null @@ -1,125 +0,0 @@ -// Copyright 2017 Joshua J Baker. All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. - -// +build darwin netbsd freebsd openbsd dragonfly - -package internal - -import ( - "syscall" -) - -// Poll ... -type Poll struct { - fd int - changes []syscall.Kevent_t - notes noteQueue -} - -// OpenPoll ... -func OpenPoll() *Poll { - l := new(Poll) - p, err := syscall.Kqueue() - if err != nil { - panic(err) - } - l.fd = p - _, err = syscall.Kevent(l.fd, []syscall.Kevent_t{{ - Ident: 0, - Filter: syscall.EVFILT_USER, - Flags: syscall.EV_ADD | syscall.EV_CLEAR, - }}, nil, nil) - if err != nil { - panic(err) - } - - return l -} - -// Close ... -func (p *Poll) Close() error { - return syscall.Close(p.fd) -} - -// Trigger ... -func (p *Poll) Trigger(note interface{}) error { - p.notes.Add(note) - _, err := syscall.Kevent(p.fd, []syscall.Kevent_t{{ - Ident: 0, - Filter: syscall.EVFILT_USER, - Fflags: syscall.NOTE_TRIGGER, - }}, nil, nil) - return err -} - -// Wait ... -func (p *Poll) Wait(iter func(fd int, note interface{}) error) error { - events := make([]syscall.Kevent_t, 128) - for { - n, err := syscall.Kevent(p.fd, p.changes, events, nil) - if err != nil && err != syscall.EINTR { - return err - } - p.changes = p.changes[:0] - if err := p.notes.ForEach(func(note interface{}) error { - return iter(0, note) - }); err != nil { - return err - } - for i := 0; i < n; i++ { - if fd := int(events[i].Ident); fd != 0 { - if err := iter(fd, nil); err != nil { - return err - } - } - } - } -} - -// AddRead ... -func (p *Poll) AddRead(fd int) { - p.changes = append(p.changes, - syscall.Kevent_t{ - Ident: uint64(fd), Flags: syscall.EV_ADD, Filter: syscall.EVFILT_READ, - }, - ) -} - -// AddReadWrite ... -func (p *Poll) AddReadWrite(fd int) { - p.changes = append(p.changes, - syscall.Kevent_t{ - Ident: uint64(fd), Flags: syscall.EV_ADD, Filter: syscall.EVFILT_READ, - }, - syscall.Kevent_t{ - Ident: uint64(fd), Flags: syscall.EV_ADD, Filter: syscall.EVFILT_WRITE, - }, - ) -} - -// ModRead ... -func (p *Poll) ModRead(fd int) { - p.changes = append(p.changes, syscall.Kevent_t{ - Ident: uint64(fd), Flags: syscall.EV_DELETE, Filter: syscall.EVFILT_WRITE, - }) -} - -// ModReadWrite ... -func (p *Poll) ModReadWrite(fd int) { - p.changes = append(p.changes, syscall.Kevent_t{ - Ident: uint64(fd), Flags: syscall.EV_ADD, Filter: syscall.EVFILT_WRITE, - }) -} - -// ModDetach ... -func (p *Poll) ModDetach(fd int) { - p.changes = append(p.changes, - syscall.Kevent_t{ - Ident: uint64(fd), Flags: syscall.EV_DELETE, Filter: syscall.EVFILT_READ, - }, - syscall.Kevent_t{ - Ident: uint64(fd), Flags: syscall.EV_DELETE, Filter: syscall.EVFILT_WRITE, - }, - ) -} diff --git a/mixer/evio/evio/internal/internal_darwin.go b/mixer/evio/evio/internal/internal_darwin.go deleted file mode 100644 index 3e8fde8b..00000000 --- a/mixer/evio/evio/internal/internal_darwin.go +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright 2017 Joshua J Baker. All rights reserved. -// Use of this source code is governed by an MIT-style -// license that can be found in the LICENSE file. - -package internal - -import "syscall" - -// SetKeepAlive sets the keepalive for the connection -func SetKeepAlive(fd, secs int) error { - if err := syscall.SetsockoptInt(fd, syscall.SOL_SOCKET, 0x8, 1); err != nil { - return err - } - switch err := syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, 0x101, secs); err { - case nil, syscall.ENOPROTOOPT: // OS X 10.7 and earlier don't support this option - default: - return err - } - return syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, syscall.TCP_KEEPALIVE, secs) -} diff --git a/mixer/evio/evio/internal/internal_linux.go b/mixer/evio/evio/internal/internal_linux.go deleted file mode 100644 index 6bbaffa4..00000000 --- a/mixer/evio/evio/internal/internal_linux.go +++ /dev/null @@ -1,129 +0,0 @@ -// Copyright 2017 Joshua J Baker. All rights reserved. -// Use of this source code is governed by an MIT-style -// license that can be found in the LICENSE file. - -package internal - -import ( - "syscall" -) - -// Poll ... -type Poll struct { - fd int // epoll fd - wfd int // wake fd - notes noteQueue -} - -// OpenPoll ... -func OpenPoll() *Poll { - l := new(Poll) - p, err := syscall.EpollCreate1(0) - if err != nil { - panic(err) - } - l.fd = p - r0, _, e0 := syscall.Syscall(syscall.SYS_EVENTFD2, 0, 0, 0) - if e0 != 0 { - syscall.Close(p) - panic(err) - } - l.wfd = int(r0) - l.AddRead(l.wfd) - return l -} - -// Close ... -func (p *Poll) Close() error { - if err := syscall.Close(p.wfd); err != nil { - return err - } - return syscall.Close(p.fd) -} - -// Trigger ... -func (p *Poll) Trigger(note interface{}) error { - p.notes.Add(note) - _, err := syscall.Write(p.wfd, []byte{0, 0, 0, 0, 0, 0, 0, 1}) - return err -} - -// Wait ... -func (p *Poll) Wait(iter func(fd int, note interface{}) error) error { - events := make([]syscall.EpollEvent, 64) - for { - n, err := syscall.EpollWait(p.fd, events, -1) - if err != nil && err != syscall.EINTR { - return err - } - if err := p.notes.ForEach(func(note interface{}) error { - return iter(0, note) - }); err != nil { - return err - } - for i := 0; i < n; i++ { - if fd := int(events[i].Fd); fd != p.wfd { - if err := iter(fd, nil); err != nil { - return err - } - } else { - - } - } - } -} - -// AddReadWrite ... -func (p *Poll) AddReadWrite(fd int) { - if err := syscall.EpollCtl(p.fd, syscall.EPOLL_CTL_ADD, fd, - &syscall.EpollEvent{Fd: int32(fd), - Events: syscall.EPOLLIN | syscall.EPOLLOUT, - }, - ); err != nil { - panic(err) - } -} - -// AddRead ... -func (p *Poll) AddRead(fd int) { - if err := syscall.EpollCtl(p.fd, syscall.EPOLL_CTL_ADD, fd, - &syscall.EpollEvent{Fd: int32(fd), - Events: syscall.EPOLLIN, - }, - ); err != nil { - panic(err) - } -} - -// ModRead ... -func (p *Poll) ModRead(fd int) { - if err := syscall.EpollCtl(p.fd, syscall.EPOLL_CTL_MOD, fd, - &syscall.EpollEvent{Fd: int32(fd), - Events: syscall.EPOLLIN, - }, - ); err != nil { - panic(err) - } -} - -// ModReadWrite ... -func (p *Poll) ModReadWrite(fd int) { - if err := syscall.EpollCtl(p.fd, syscall.EPOLL_CTL_MOD, fd, - &syscall.EpollEvent{Fd: int32(fd), - Events: syscall.EPOLLIN | syscall.EPOLLOUT, - }, - ); err != nil { - panic(err) - } -} - -// ModDetach ... -func (p *Poll) ModDetach(fd int) { - if err := syscall.EpollCtl(p.fd, syscall.EPOLL_CTL_DEL, fd, - &syscall.EpollEvent{Fd: int32(fd), - Events: syscall.EPOLLIN | syscall.EPOLLOUT, - }, - ); err != nil { - panic(err) - } -} diff --git a/mixer/evio/evio/internal/internal_openbsd.go b/mixer/evio/evio/internal/internal_openbsd.go deleted file mode 100644 index 8ea7edbd..00000000 --- a/mixer/evio/evio/internal/internal_openbsd.go +++ /dev/null @@ -1,11 +0,0 @@ -// Copyright 2017 Joshua J Baker. All rights reserved. -// Use of this source code is governed by an MIT-style -// license that can be found in the LICENSE file. - -package internal - -// SetKeepAlive sets the keepalive for the connection -func SetKeepAlive(fd, secs int) error { - // OpenBSD has no user-settable per-socket TCP keepalive options. - return nil -} diff --git a/mixer/evio/evio/internal/internal_unix.go b/mixer/evio/evio/internal/internal_unix.go deleted file mode 100644 index ddd386a6..00000000 --- a/mixer/evio/evio/internal/internal_unix.go +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright 2017 Joshua J Baker. All rights reserved. -// Use of this source code is governed by an MIT-style -// license that can be found in the LICENSE file. - -// +build netbsd freebsd dragonfly linux - -package internal - -import "syscall" - -func SetKeepAlive(fd, secs int) error { - if err := syscall.SetsockoptInt(fd, syscall.SOL_SOCKET, syscall.SO_KEEPALIVE, 1); err != nil { - return err - } - if err := syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, syscall.TCP_KEEPINTVL, secs); err != nil { - return err - } - return syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, syscall.TCP_KEEPIDLE, secs) -} diff --git a/mixer/evio/evio/internal/notequeue.go b/mixer/evio/evio/internal/notequeue.go deleted file mode 100644 index 29cf2bc7..00000000 --- a/mixer/evio/evio/internal/notequeue.go +++ /dev/null @@ -1,53 +0,0 @@ -// Copyright 2018 Joshua J Baker. All rights reserved. -// Use of this source code is governed by an MIT-style -// license that can be found in the LICENSE file. - -package internal - -import ( - "runtime" - "sync/atomic" -) - -// this is a good candiate for a lock-free structure. - -type spinlock struct{ lock uintptr } - -func (l *spinlock) Lock() { - for !atomic.CompareAndSwapUintptr(&l.lock, 0, 1) { - runtime.Gosched() - } -} -func (l *spinlock) Unlock() { - atomic.StoreUintptr(&l.lock, 0) -} - -type noteQueue struct { - mu spinlock - notes []interface{} -} - -func (q *noteQueue) Add(note interface{}) (one bool) { - q.mu.Lock() - q.notes = append(q.notes, note) - n := len(q.notes) - q.mu.Unlock() - return n == 1 -} - -func (q *noteQueue) ForEach(iter func(note interface{}) error) error { - q.mu.Lock() - if len(q.notes) == 0 { - q.mu.Unlock() - return nil - } - notes := q.notes - q.notes = nil - q.mu.Unlock() - for _, note := range notes { - if err := iter(note); err != nil { - return err - } - } - return nil -} diff --git a/mixer/evio/evio/internal/socktoaddr.go b/mixer/evio/evio/internal/socktoaddr.go deleted file mode 100644 index 9e9425c6..00000000 --- a/mixer/evio/evio/internal/socktoaddr.go +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright 2018 Joshua J Baker. All rights reserved. -// Use of this source code is governed by an MIT-style -// license that can be found in the LICENSE file. - -package internal - -import ( - "net" - "syscall" -) - -// SockaddrToAddr returns a go/net friendly address -func SockaddrToAddr(sa syscall.Sockaddr) net.Addr { - var a net.Addr - switch sa := sa.(type) { - case *syscall.SockaddrInet4: - a = &net.TCPAddr{ - IP: append([]byte{}, sa.Addr[:]...), - Port: sa.Port, - } - case *syscall.SockaddrInet6: - var zone string - if sa.ZoneId != 0 { - if ifi, err := net.InterfaceByIndex(int(sa.ZoneId)); err == nil { - zone = ifi.Name - } - } - if zone == "" && sa.ZoneId != 0 { - } - a = &net.TCPAddr{ - IP: append([]byte{}, sa.Addr[:]...), - Port: sa.Port, - Zone: zone, - } - case *syscall.SockaddrUnix: - a = &net.UnixAddr{Net: "unix", Name: sa.Name} - } - return a -} diff --git a/mixer/evio/evio/vendor/.stub b/mixer/evio/evio/vendor/.stub deleted file mode 100644 index a25c7f48..00000000 --- a/mixer/evio/evio/vendor/.stub +++ /dev/null @@ -1 +0,0 @@ -// DO NOT REMOVE \ No newline at end of file diff --git a/mixer/evio/evio/vendor/github.com/kavu/go_reuseport/LICENSE b/mixer/evio/evio/vendor/github.com/kavu/go_reuseport/LICENSE deleted file mode 100644 index 5f25159a..00000000 --- a/mixer/evio/evio/vendor/github.com/kavu/go_reuseport/LICENSE +++ /dev/null @@ -1,21 +0,0 @@ -The MIT License (MIT) - -Copyright (c) 2014 Max Riveiro - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. \ No newline at end of file diff --git a/mixer/evio/evio/vendor/github.com/kavu/go_reuseport/Makefile b/mixer/evio/evio/vendor/github.com/kavu/go_reuseport/Makefile deleted file mode 100644 index 4aa3d2b6..00000000 --- a/mixer/evio/evio/vendor/github.com/kavu/go_reuseport/Makefile +++ /dev/null @@ -1,9 +0,0 @@ -lint: - @gometalinter \ - --disable=errcheck \ - --disable=dupl \ - --min-const-length=5 \ - --min-confidence=0.25 \ - --cyclo-over=20 \ - --enable=unused \ - --deadline=100s diff --git a/mixer/evio/evio/vendor/github.com/kavu/go_reuseport/README.md b/mixer/evio/evio/vendor/github.com/kavu/go_reuseport/README.md deleted file mode 100644 index 9e9726b6..00000000 --- a/mixer/evio/evio/vendor/github.com/kavu/go_reuseport/README.md +++ /dev/null @@ -1,48 +0,0 @@ -# GO_REUSEPORT - -[![Build Status](https://travis-ci.org/kavu/go_reuseport.png?branch=master)](https://travis-ci.org/kavu/go_reuseport) -[![codecov](https://codecov.io/gh/kavu/go_reuseport/branch/master/graph/badge.svg)](https://codecov.io/gh/kavu/go_reuseport) -[![GoDoc](https://godoc.org/github.com/kavu/go_reuseport?status.png)](https://godoc.org/github.com/kavu/go_reuseport) - -**GO_REUSEPORT** is a little expirement to create a `net.Listener` that supports [SO_REUSEPORT](http://lwn.net/Articles/542629/) socket option. - -For now, Darwin and Linux (from 3.9) systems are supported. I'll be pleased if you'll test other systems and tell me the results. - documentation on [godoc.org](http://godoc.org/github.com/kavu/go_reuseport "go_reuseport documentation"). - -## Example ## - -```go -package main - -import ( - "fmt" - "html" - "net/http" - "os" - "runtime" - "github.com/kavu/go_reuseport" -) - -func main() { - listener, err := reuseport.Listen("tcp", "localhost:8881") - if err != nil { - panic(err) - } - defer listener.Close() - - server := &http.Server{} - http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - fmt.Println(os.Getgid()) - fmt.Fprintf(w, "Hello, %q\n", html.EscapeString(r.URL.Path)) - }) - - panic(server.Serve(listener)) -} -``` - -Now you can run several instances of this tiny server without `Address already in use` errors. - -## Thanks - -Inspired by [Artur Siekielski](https://github.com/aartur) [post](http://freeprogrammersblog.vhex.net/post/linux-39-introdued-new-way-of-writing-socket-servers/2) about `SO_REUSEPORT`. - diff --git a/mixer/evio/evio/vendor/github.com/kavu/go_reuseport/reuseport.go b/mixer/evio/evio/vendor/github.com/kavu/go_reuseport/reuseport.go deleted file mode 100644 index ea4c7c44..00000000 --- a/mixer/evio/evio/vendor/github.com/kavu/go_reuseport/reuseport.go +++ /dev/null @@ -1,50 +0,0 @@ -// +build linux darwin dragonfly freebsd netbsd openbsd - -// Copyright (C) 2017 Max Riveiro -// -// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: -// The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - -// Package reuseport provides a function that returns a net.Listener powered -// by a net.FileListener with a SO_REUSEPORT option set to the socket. -package reuseport - -import ( - "errors" - "fmt" - "net" - "os" - "syscall" -) - -const fileNameTemplate = "reuseport.%d.%s.%s" - -var errUnsupportedProtocol = errors.New("only tcp, tcp4, tcp6, udp, udp4, udp6 are supported") - -// getSockaddr parses protocol and address and returns implementor -// of syscall.Sockaddr: syscall.SockaddrInet4 or syscall.SockaddrInet6. -func getSockaddr(proto, addr string) (sa syscall.Sockaddr, soType int, err error) { - switch proto { - case "tcp", "tcp4", "tcp6": - return getTCPSockaddr(proto, addr) - case "udp", "udp4", "udp6": - return getUDPSockaddr(proto, addr) - default: - return nil, -1, errUnsupportedProtocol - } -} - -func getSocketFileName(proto, addr string) string { - return fmt.Sprintf(fileNameTemplate, os.Getpid(), proto, addr) -} - -// Listen function is an alias for NewReusablePortListener. -func Listen(proto, addr string) (l net.Listener, err error) { - return NewReusablePortListener(proto, addr) -} - -// ListenPacket is an alias for NewReusablePortPacketConn. -func ListenPacket(proto, addr string) (l net.PacketConn, err error) { - return NewReusablePortPacketConn(proto, addr) -} diff --git a/mixer/evio/evio/vendor/github.com/kavu/go_reuseport/reuseport_bsd.go b/mixer/evio/evio/vendor/github.com/kavu/go_reuseport/reuseport_bsd.go deleted file mode 100644 index 19000e8d..00000000 --- a/mixer/evio/evio/vendor/github.com/kavu/go_reuseport/reuseport_bsd.go +++ /dev/null @@ -1,44 +0,0 @@ -// +build darwin dragonfly freebsd netbsd openbsd - -// Copyright (C) 2017 Ma Weiwei, Max Riveiro -// -// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: -// The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - -package reuseport - -import ( - "runtime" - "syscall" -) - -var reusePort = syscall.SO_REUSEPORT - -func maxListenerBacklog() int { - var ( - n uint32 - err error - ) - - switch runtime.GOOS { - case "darwin", "freebsd": - n, err = syscall.SysctlUint32("kern.ipc.somaxconn") - case "netbsd": - // NOTE: NetBSD has no somaxconn-like kernel state so far - case "openbsd": - n, err = syscall.SysctlUint32("kern.somaxconn") - } - - if n == 0 || err != nil { - return syscall.SOMAXCONN - } - - // FreeBSD stores the backlog in a uint16, as does Linux. - // Assume the other BSDs do too. Truncate number to avoid wrapping. - // See issue 5030. - if n > 1<<16-1 { - n = 1<<16 - 1 - } - return int(n) -} diff --git a/mixer/evio/evio/vendor/github.com/kavu/go_reuseport/reuseport_linux.go b/mixer/evio/evio/vendor/github.com/kavu/go_reuseport/reuseport_linux.go deleted file mode 100644 index f6f85a49..00000000 --- a/mixer/evio/evio/vendor/github.com/kavu/go_reuseport/reuseport_linux.go +++ /dev/null @@ -1,52 +0,0 @@ -// +build linux - -// Copyright (C) 2017 Ma Weiwei, Max Riveiro -// -// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: -// The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - -package reuseport - -import ( - "bufio" - "os" - "strconv" - "strings" - "syscall" -) - -var reusePort = 0x0F - -func maxListenerBacklog() int { - fd, err := os.Open("/proc/sys/net/core/somaxconn") - if err != nil { - return syscall.SOMAXCONN - } - defer fd.Close() - - rd := bufio.NewReader(fd) - line, err := rd.ReadString('\n') - if err != nil { - return syscall.SOMAXCONN - } - - f := strings.Fields(line) - if len(f) < 1 { - return syscall.SOMAXCONN - } - - n, err := strconv.Atoi(f[0]) - if err != nil || n == 0 { - return syscall.SOMAXCONN - } - - // Linux stores the backlog in a uint16. - // Truncate number to avoid wrapping. - // See issue 5030. - if n > 1<<16-1 { - n = 1<<16 - 1 - } - - return n -} diff --git a/mixer/evio/evio/vendor/github.com/kavu/go_reuseport/reuseport_windows.go b/mixer/evio/evio/vendor/github.com/kavu/go_reuseport/reuseport_windows.go deleted file mode 100644 index e1e90df6..00000000 --- a/mixer/evio/evio/vendor/github.com/kavu/go_reuseport/reuseport_windows.go +++ /dev/null @@ -1,19 +0,0 @@ -// +build windows - -// Copyright (C) 2017 Ma Weiwei, Max Riveiro -// -// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: -// The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - -package reuseport - -import "net" - -func NewReusablePortListener(proto, addr string) (net.Listener, error) { - return net.Listen(proto, addr) -} - -func NewReusablePortPacketConn(proto, addr string) (net.PacketConn, error) { - return net.ListenPacket(proto, addr) -} diff --git a/mixer/evio/evio/vendor/github.com/kavu/go_reuseport/tcp.go b/mixer/evio/evio/vendor/github.com/kavu/go_reuseport/tcp.go deleted file mode 100644 index 76540a15..00000000 --- a/mixer/evio/evio/vendor/github.com/kavu/go_reuseport/tcp.go +++ /dev/null @@ -1,143 +0,0 @@ -// +build linux darwin dragonfly freebsd netbsd openbsd - -// Copyright (C) 2017 Max Riveiro -// -// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: -// The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - -package reuseport - -import ( - "errors" - "net" - "os" - "syscall" -) - -var ( - listenerBacklogMaxSize = maxListenerBacklog() - errUnsupportedTCPProtocol = errors.New("only tcp, tcp4, tcp6 are supported") -) - -func getTCPSockaddr(proto, addr string) (sa syscall.Sockaddr, soType int, err error) { - var tcp *net.TCPAddr - - tcp, err = net.ResolveTCPAddr(proto, addr) - if err != nil && tcp.IP != nil { - return nil, -1, err - } - - tcpVersion, err := determineTCPProto(proto, tcp) - if err != nil { - return nil, -1, err - } - - switch tcpVersion { - case "tcp": - return &syscall.SockaddrInet4{Port: tcp.Port}, syscall.AF_INET, nil - case "tcp4": - sa := &syscall.SockaddrInet4{Port: tcp.Port} - - if tcp.IP != nil { - copy(sa.Addr[:], tcp.IP[12:16]) // copy last 4 bytes of slice to array - } - - return sa, syscall.AF_INET, nil - case "tcp6": - sa := &syscall.SockaddrInet6{Port: tcp.Port} - - if tcp.IP != nil { - copy(sa.Addr[:], tcp.IP) // copy all bytes of slice to array - } - - if tcp.Zone != "" { - iface, err := net.InterfaceByName(tcp.Zone) - if err != nil { - return nil, -1, err - } - - sa.ZoneId = uint32(iface.Index) - } - - return sa, syscall.AF_INET6, nil - } - - return nil, -1, errUnsupportedProtocol -} - -func determineTCPProto(proto string, ip *net.TCPAddr) (string, error) { - // If the protocol is set to "tcp", we try to determine the actual protocol - // version from the size of the resolved IP address. Otherwise, we simple use - // the protcol given to us by the caller. - - if ip.IP.To4() != nil { - return "tcp4", nil - } - - if ip.IP.To16() != nil { - return "tcp6", nil - } - - switch proto { - case "tcp", "tcp4", "tcp6": - return proto, nil - } - - return "", errUnsupportedTCPProtocol -} - -// NewReusablePortListener returns net.FileListener that created from -// a file discriptor for a socket with SO_REUSEPORT option. -func NewReusablePortListener(proto, addr string) (l net.Listener, err error) { - var ( - soType, fd int - file *os.File - sockaddr syscall.Sockaddr - ) - - if sockaddr, soType, err = getSockaddr(proto, addr); err != nil { - return nil, err - } - - syscall.ForkLock.RLock() - if fd, err = syscall.Socket(soType, syscall.SOCK_STREAM, syscall.IPPROTO_TCP); err != nil { - syscall.ForkLock.RUnlock() - - return nil, err - } - syscall.ForkLock.RUnlock() - - if err = syscall.SetsockoptInt(fd, syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1); err != nil { - syscall.Close(fd) - return nil, err - } - - if err = syscall.SetsockoptInt(fd, syscall.SOL_SOCKET, reusePort, 1); err != nil { - syscall.Close(fd) - return nil, err - } - - if err = syscall.Bind(fd, sockaddr); err != nil { - syscall.Close(fd) - return nil, err - } - - // Set backlog size to the maximum - if err = syscall.Listen(fd, listenerBacklogMaxSize); err != nil { - syscall.Close(fd) - return nil, err - } - - file = os.NewFile(uintptr(fd), getSocketFileName(proto, addr)) - if l, err = net.FileListener(file); err != nil { - file.Close() - return nil, err - } - - if err = file.Close(); err != nil { - return nil, err - } - - return l, err -} diff --git a/mixer/evio/evio/vendor/github.com/kavu/go_reuseport/tcp_test.go b/mixer/evio/evio/vendor/github.com/kavu/go_reuseport/tcp_test.go deleted file mode 100644 index 1620f9d3..00000000 --- a/mixer/evio/evio/vendor/github.com/kavu/go_reuseport/tcp_test.go +++ /dev/null @@ -1,218 +0,0 @@ -// +build linux darwin dragonfly freebsd netbsd openbsd - -// Copyright (C) 2017 Max Riveiro -// -// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: -// The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - -package reuseport - -import ( - "fmt" - "html" - "io/ioutil" - "net/http" - "net/http/httptest" - "os" - "testing" -) - -const ( - httpServerOneResponse = "1" - httpServerTwoResponse = "2" -) - -var ( - httpServerOne = NewHTTPServer(httpServerOneResponse) - httpServerTwo = NewHTTPServer(httpServerTwoResponse) -) - -func NewHTTPServer(resp string) *httptest.Server { - return httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - fmt.Fprint(w, resp) - })) -} -func TestNewReusablePortListener(t *testing.T) { - listenerOne, err := NewReusablePortListener("tcp4", "localhost:10081") - if err != nil { - t.Error(err) - } - defer listenerOne.Close() - - listenerTwo, err := NewReusablePortListener("tcp", "127.0.0.1:10081") - if err != nil { - t.Error(err) - } - defer listenerTwo.Close() - - listenerThree, err := NewReusablePortListener("tcp6", "[::1]:10081") - if err != nil { - t.Error(err) - } - defer listenerThree.Close() - - listenerFour, err := NewReusablePortListener("tcp6", ":10081") - if err != nil { - t.Error(err) - } - defer listenerFour.Close() - - listenerFive, err := NewReusablePortListener("tcp4", ":10081") - if err != nil { - t.Error(err) - } - defer listenerFive.Close() - - listenerSix, err := NewReusablePortListener("tcp", ":10081") - if err != nil { - t.Error(err) - } - defer listenerSix.Close() -} - -func TestListen(t *testing.T) { - listenerOne, err := Listen("tcp4", "localhost:10081") - if err != nil { - t.Error(err) - } - defer listenerOne.Close() - - listenerTwo, err := Listen("tcp", "127.0.0.1:10081") - if err != nil { - t.Error(err) - } - defer listenerTwo.Close() - - listenerThree, err := Listen("tcp6", "[::1]:10081") - if err != nil { - t.Error(err) - } - defer listenerThree.Close() - - listenerFour, err := Listen("tcp6", ":10081") - if err != nil { - t.Error(err) - } - defer listenerFour.Close() - - listenerFive, err := Listen("tcp4", ":10081") - if err != nil { - t.Error(err) - } - defer listenerFive.Close() - - listenerSix, err := Listen("tcp", ":10081") - if err != nil { - t.Error(err) - } - defer listenerSix.Close() -} - -func TestNewReusablePortServers(t *testing.T) { - listenerOne, err := NewReusablePortListener("tcp4", "localhost:10081") - if err != nil { - t.Error(err) - } - defer listenerOne.Close() - - listenerTwo, err := NewReusablePortListener("tcp6", ":10081") - if err != nil { - t.Error(err) - } - defer listenerTwo.Close() - - httpServerOne.Listener = listenerOne - httpServerTwo.Listener = listenerTwo - - httpServerOne.Start() - httpServerTwo.Start() - - // Server One — First Response - resp1, err := http.Get(httpServerOne.URL) - if err != nil { - t.Error(err) - } - body1, err := ioutil.ReadAll(resp1.Body) - resp1.Body.Close() - if err != nil { - t.Error(err) - } - if string(body1) != httpServerOneResponse && string(body1) != httpServerTwoResponse { - t.Errorf("Expected %#v or %#v, got %#v.", httpServerOneResponse, httpServerTwoResponse, string(body1)) - } - - // Server Two — First Response - resp2, err := http.Get(httpServerTwo.URL) - if err != nil { - t.Error(err) - } - body2, err := ioutil.ReadAll(resp2.Body) - resp1.Body.Close() - if err != nil { - t.Error(err) - } - if string(body2) != httpServerOneResponse && string(body2) != httpServerTwoResponse { - t.Errorf("Expected %#v or %#v, got %#v.", httpServerOneResponse, httpServerTwoResponse, string(body2)) - } - - httpServerTwo.Close() - - // Server One — Second Response - resp3, err := http.Get(httpServerOne.URL) - if err != nil { - t.Error(err) - } - body3, err := ioutil.ReadAll(resp3.Body) - resp1.Body.Close() - if err != nil { - t.Error(err) - } - if string(body3) != httpServerOneResponse { - t.Errorf("Expected %#v, got %#v.", httpServerOneResponse, string(body3)) - } - - // Server One — Third Response - resp5, err := http.Get(httpServerOne.URL) - if err != nil { - t.Error(err) - } - body5, err := ioutil.ReadAll(resp5.Body) - resp1.Body.Close() - if err != nil { - t.Error(err) - } - if string(body5) != httpServerOneResponse { - t.Errorf("Expected %#v, got %#v.", httpServerOneResponse, string(body5)) - } - - httpServerOne.Close() -} - -func BenchmarkNewReusablePortListener(b *testing.B) { - for i := 0; i < b.N; i++ { - listener, err := NewReusablePortListener("tcp", ":10081") - - if err != nil { - b.Error(err) - } else { - listener.Close() - } - } -} - -func ExampleNewReusablePortListener() { - listener, err := NewReusablePortListener("tcp", ":8881") - if err != nil { - panic(err) - } - defer listener.Close() - - server := &http.Server{} - http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - fmt.Println(os.Getgid()) - fmt.Fprintf(w, "Hello, %q\n", html.EscapeString(r.URL.Path)) - }) - - panic(server.Serve(listener)) -} diff --git a/mixer/evio/evio/vendor/github.com/kavu/go_reuseport/test.bash b/mixer/evio/evio/vendor/github.com/kavu/go_reuseport/test.bash deleted file mode 100755 index a57c012a..00000000 --- a/mixer/evio/evio/vendor/github.com/kavu/go_reuseport/test.bash +++ /dev/null @@ -1,22 +0,0 @@ -#!/bin/bash - -set -e - -# Thanks to IPFS team -if [[ "$TRAVIS_OS_NAME" == "linux" ]]; then - if [[ "$TRAVIS_SUDO" == true ]]; then - # Ensure that IPv6 is enabled. - # While this is unsupported by TravisCI, it still works for localhost. - sudo sysctl -w net.ipv6.conf.lo.disable_ipv6=0 - sudo sysctl -w net.ipv6.conf.default.disable_ipv6=0 - sudo sysctl -w net.ipv6.conf.all.disable_ipv6=0 - fi -else - # OSX has a default file limit of 256, for some tests we need a - # maximum of 8192. - ulimit -Sn 8192 -fi - -go test -v -cover ./... -go test -v -cover -race ./... -coverprofile=coverage.txt -covermode=atomic -go test -v -cover -race -benchmem -benchtime=5s -bench=. \ No newline at end of file diff --git a/mixer/evio/evio/vendor/github.com/kavu/go_reuseport/udp.go b/mixer/evio/evio/vendor/github.com/kavu/go_reuseport/udp.go deleted file mode 100644 index 9b30c1b2..00000000 --- a/mixer/evio/evio/vendor/github.com/kavu/go_reuseport/udp.go +++ /dev/null @@ -1,139 +0,0 @@ -// +build linux darwin dragonfly freebsd netbsd openbsd - -// Copyright (C) 2017 Max Riveiro -// -// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: -// The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - -package reuseport - -import ( - "errors" - "net" - "os" - "syscall" -) - -var errUnsupportedUDPProtocol = errors.New("only udp, udp4, udp6 are supported") - -func getUDPSockaddr(proto, addr string) (sa syscall.Sockaddr, soType int, err error) { - var udp *net.UDPAddr - - udp, err = net.ResolveUDPAddr(proto, addr) - if err != nil && udp.IP != nil { - return nil, -1, err - } - - udpVersion, err := determineUDPProto(proto, udp) - if err != nil { - return nil, -1, err - } - - switch udpVersion { - case "udp": - return &syscall.SockaddrInet4{Port: udp.Port}, syscall.AF_INET, nil - case "udp4": - sa := &syscall.SockaddrInet4{Port: udp.Port} - - if udp.IP != nil { - copy(sa.Addr[:], udp.IP[12:16]) // copy last 4 bytes of slice to array - } - - return sa, syscall.AF_INET, nil - case "udp6": - sa := &syscall.SockaddrInet6{Port: udp.Port} - - if udp.IP != nil { - copy(sa.Addr[:], udp.IP) // copy all bytes of slice to array - } - - if udp.Zone != "" { - iface, err := net.InterfaceByName(udp.Zone) - if err != nil { - return nil, -1, err - } - - sa.ZoneId = uint32(iface.Index) - } - - return sa, syscall.AF_INET6, nil - } - - return nil, -1, errUnsupportedProtocol -} - -func determineUDPProto(proto string, ip *net.UDPAddr) (string, error) { - // If the protocol is set to "udp", we try to determine the actual protocol - // version from the size of the resolved IP address. Otherwise, we simple use - // the protcol given to us by the caller. - - if ip.IP.To4() != nil { - return "udp4", nil - } - - if ip.IP.To16() != nil { - return "udp6", nil - } - - switch proto { - case "udp", "udp4", "udp6": - return proto, nil - } - - return "", errUnsupportedUDPProtocol -} - -// NewReusablePortPacketConn returns net.FilePacketConn that created from -// a file discriptor for a socket with SO_REUSEPORT option. -func NewReusablePortPacketConn(proto, addr string) (l net.PacketConn, err error) { - var ( - soType, fd int - file *os.File - sockaddr syscall.Sockaddr - ) - - if sockaddr, soType, err = getSockaddr(proto, addr); err != nil { - return nil, err - } - - syscall.ForkLock.RLock() - fd, err = syscall.Socket(soType, syscall.SOCK_DGRAM, syscall.IPPROTO_UDP) - if err == nil { - syscall.CloseOnExec(fd) - } - syscall.ForkLock.RUnlock() - if err != nil { - syscall.Close(fd) - return nil, err - } - - defer func() { - if err != nil { - syscall.Close(fd) - } - }() - - if err = syscall.SetsockoptInt(fd, syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1); err != nil { - return nil, err - } - - if err = syscall.SetsockoptInt(fd, syscall.SOL_SOCKET, reusePort, 1); err != nil { - return nil, err - } - - if err = syscall.Bind(fd, sockaddr); err != nil { - return nil, err - } - - file = os.NewFile(uintptr(fd), getSocketFileName(proto, addr)) - if l, err = net.FilePacketConn(file); err != nil { - return nil, err - } - - if err = file.Close(); err != nil { - return nil, err - } - - return l, err -} diff --git a/mixer/evio/evio/vendor/github.com/kavu/go_reuseport/udp_test.go b/mixer/evio/evio/vendor/github.com/kavu/go_reuseport/udp_test.go deleted file mode 100644 index d6550e36..00000000 --- a/mixer/evio/evio/vendor/github.com/kavu/go_reuseport/udp_test.go +++ /dev/null @@ -1,99 +0,0 @@ -// +build linux darwin dragonfly freebsd netbsd openbsd - -// Copyright (C) 2017 Max Riveiro -// -// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: -// The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - -package reuseport - -import "testing" - -func TestNewReusablePortPacketConn(t *testing.T) { - listenerOne, err := NewReusablePortPacketConn("udp4", "localhost:10082") - if err != nil { - t.Error(err) - } - defer listenerOne.Close() - - listenerTwo, err := NewReusablePortPacketConn("udp", "127.0.0.1:10082") - if err != nil { - t.Error(err) - } - defer listenerTwo.Close() - - listenerThree, err := NewReusablePortPacketConn("udp6", "[::1]:10082") - if err != nil { - t.Error(err) - } - defer listenerThree.Close() - - listenerFour, err := NewReusablePortListener("udp6", ":10081") - if err != nil { - t.Error(err) - } - defer listenerFour.Close() - - listenerFive, err := NewReusablePortListener("udp4", ":10081") - if err != nil { - t.Error(err) - } - defer listenerFive.Close() - - listenerSix, err := NewReusablePortListener("udp", ":10081") - if err != nil { - t.Error(err) - } - defer listenerSix.Close() -} - -func TestListenPacket(t *testing.T) { - listenerOne, err := ListenPacket("udp4", "localhost:10082") - if err != nil { - t.Error(err) - } - defer listenerOne.Close() - - listenerTwo, err := ListenPacket("udp", "127.0.0.1:10082") - if err != nil { - t.Error(err) - } - defer listenerTwo.Close() - - listenerThree, err := ListenPacket("udp6", "[::1]:10082") - if err != nil { - t.Error(err) - } - defer listenerThree.Close() - - listenerFour, err := ListenPacket("udp6", ":10081") - if err != nil { - t.Error(err) - } - defer listenerFour.Close() - - listenerFive, err := ListenPacket("udp4", ":10081") - if err != nil { - t.Error(err) - } - defer listenerFive.Close() - - listenerSix, err := ListenPacket("udp", ":10081") - if err != nil { - t.Error(err) - } - defer listenerSix.Close() -} - -func BenchmarkNewReusableUDPPortListener(b *testing.B) { - for i := 0; i < b.N; i++ { - listener, err := NewReusablePortPacketConn("udp4", "localhost:10082") - - if err != nil { - b.Error(err) - } else { - listener.Close() - } - } -} diff --git a/mixer/evio/evio_test.go b/mixer/evio/evio_test.go index 21c8a618..e2e95cb6 100644 --- a/mixer/evio/evio_test.go +++ b/mixer/evio/evio_test.go @@ -6,6 +6,7 @@ import ( "github.com/henrylee2cn/erpc/v6" "github.com/henrylee2cn/erpc/v6/mixer/evio" + "github.com/henrylee2cn/erpc/v6/proto/jsonproto" ) func Test(t *testing.T) { @@ -14,7 +15,7 @@ func Test(t *testing.T) { // use TLS srv.SetTLSConfig(erpc.GenerateTLSConfigForServer()) srv.RouteCall(new(Home)) - go srv.ListenAndServe() + go srv.ListenAndServe(jsonproto.NewJSONProtoFunc()) time.Sleep(1e9) // client @@ -22,7 +23,7 @@ func Test(t *testing.T) { // use TLS cli.SetTLSConfig(erpc.GenerateTLSConfigForClient()) cli.RoutePush(new(Push)) - sess, stat := cli.Dial(":9090") + sess, stat := cli.Dial(":9090", jsonproto.NewJSONProtoFunc()) if !stat.OK() { t.Fatal(stat) }