Skip to content

Commit

Permalink
chore(evio): update
Browse files Browse the repository at this point in the history
Change-Id: I8bf144955406810b25141e277996011da154e4af
  • Loading branch information
andeya committed Apr 15, 2020
1 parent 34068e0 commit fd60331
Show file tree
Hide file tree
Showing 33 changed files with 44 additions and 3,275 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,5 @@ examples/ab/frame_client_ab
_*
.DS_Store
go.sum
.idea
.vscode
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/lucas-clemente/quic-go v0.13.1
github.com/montanaflynn/stats v0.5.0
github.com/stretchr/testify v1.4.0
github.com/tidwall/evio v1.0.7
github.com/tidwall/gjson v1.2.2
golang.org/x/sys v0.0.0-20190904154756-749cb33beabd
)
Expand Down
File renamed without changes.
File renamed without changes.
72 changes: 38 additions & 34 deletions mixer/evio/evio.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package evio

import (
"bytes"
"crypto/tls"
"errors"
"fmt"
Expand All @@ -10,9 +9,9 @@ import (
"sync"
"time"

"github.com/tidwall/evio"

"github.com/henrylee2cn/erpc/v6"
"github.com/henrylee2cn/erpc/v6/mixer/evio/evio"
"github.com/henrylee2cn/erpc/v6/utils"
)

// NewClient creates a evio client, equivalent to erpc.NewPeer.
Expand All @@ -28,6 +27,7 @@ type Server struct {
addr string
readBufferSize int
writeBufferSize int
protoFuncs []erpc.ProtoFunc
}

// NewServer creates a evio server.
Expand All @@ -41,10 +41,6 @@ func NewServer(loops int, cfg erpc.PeerConfig, globalLeftPlugin ...erpc.Plugin)
}

srv.events.NumLoops = loops
// const delayDuration = time.Millisecond * 50
// srv.events.Tick = func() (delay time.Duration, action evio.Action) {
// return delayDuration, action
// }

srv.events.Serving = func(s evio.Server) (action evio.Action) {
erpc.Printf("listen and serve (%s)", srv.addr)
Expand Down Expand Up @@ -73,16 +69,16 @@ func NewServer(loops int, cfg erpc.PeerConfig, globalLeftPlugin ...erpc.Plugin)
}

srv.events.Data = func(c evio.Conn, in []byte) (out []byte, action evio.Action) {
// defer func() {
// if p := recover(); p != nil {
// erpc.Errorf("[evio] Events.Data: %v", p)
// }
// }()
con := c.Context().(*conn)
if in != nil {
buf := utils.AcquireByteBuffer()
buf.Write(in)
con.in <- buf
data := <-con.in
n := copy(data.b, in)
in = in[n:]
dst := make([]byte, len(in))
copy(dst, in)
con.remainingIn = dst
data.n <- n
close(data.n)
}
select {
case out = <-con.out:
Expand All @@ -94,7 +90,6 @@ func NewServer(loops int, cfg erpc.PeerConfig, globalLeftPlugin ...erpc.Plugin)
select {
case <-con.closeSignal:
action = evio.Close
close(con.in)
default:
}
return
Expand All @@ -106,7 +101,7 @@ func NewServer(loops int, cfg erpc.PeerConfig, globalLeftPlugin ...erpc.Plugin)
func (srv *Server) ListenAndServe(protoFunc ...erpc.ProtoFunc) error {
switch srv.cfg.Network {
default:
return errors.New("Unsupport evio network, refer to the following: tcp, tcp4, tcp6, unix")
return errors.New("unsupport evio network, refer to the following: tcp, tcp4, tcp6, unix")
case "tcp", "tcp4", "tcp6", "unix":
}
var isDefault bool
Expand All @@ -118,6 +113,7 @@ func (srv *Server) ListenAndServe(protoFunc ...erpc.ProtoFunc) error {
if isDefault {
srv.writeBufferSize = 4096
}
srv.protoFuncs = protoFunc
return evio.Serve(srv.events, srv.addr)
}

Expand All @@ -126,52 +122,60 @@ func (srv *Server) serveConn(evioConn evio.Conn) (stat *erpc.Status) {
conn: evioConn,
events: srv.events,
closeSignal: make(chan struct{}),
inBuf: bytes.NewBuffer(make([]byte, 0, srv.readBufferSize)),
in: make(chan *utils.ByteBuffer, srv.readBufferSize/128),
remainingIn: []byte{},
in: make(chan readData, 1),
out: make(chan []byte, srv.writeBufferSize/128),
}
if srv.TLSConfig() != nil {
c.sess, stat = srv.Peer.ServeConn(tls.Server(c, srv.TLSConfig()))
c.sess, stat = srv.Peer.ServeConn(tls.Server(c, srv.TLSConfig()), srv.protoFuncs...)
} else {
c.sess, stat = srv.Peer.ServeConn(c)
c.sess, stat = srv.Peer.ServeConn(c, srv.protoFuncs...)
}
// c.sess.Swap().Store(wakeWriteKey, c)
evioConn.SetContext(c)
return stat
}

// conn is a evio(evio) network connection.
// conn is a evio network connection.
//
// Multiple goroutines may invoke methods on a Conn simultaneously.
type conn struct {
conn evio.Conn
events evio.Events
sess erpc.Session
inBuf *bytes.Buffer
in chan *utils.ByteBuffer
remainingIn []byte
in chan readData
inLock sync.Mutex
out chan []byte
closeSignal chan struct{}
}

var _ net.Conn = new(conn)

type readData struct {
b []byte
n chan int
}

// Read reads data from the connection.
// Read can be made to time out and return an Error with Timeout() == true
// after a fixed time limit; see SetDeadline and SetReadDeadline.
func (c *conn) Read(b []byte) (n int, err error) {
n, err = c.inBuf.Read(b)
if err == nil {
return n, nil
}
buf, ok := <-c.in
if !ok {
select {
case <-c.closeSignal:
defer func() { recover() }()
close(c.in)
return n, io.EOF
default:
}
n = copy(b, c.remainingIn)
c.remainingIn = c.remainingIn[n:]
if n > 0 || len(b) == 0 {
return n, nil
}
n2 := copy(b[n:], buf.B)
c.inBuf.Write(buf.B[n2:])
utils.ReleaseByteBuffer(buf)
n += n2
ch := make(chan int)
c.in <- readData{b, ch}
n = <-ch
return n, nil
}

Expand Down
20 changes: 0 additions & 20 deletions mixer/evio/evio/LICENSE

This file was deleted.

168 changes: 0 additions & 168 deletions mixer/evio/evio/README.md

This file was deleted.

Loading

0 comments on commit fd60331

Please sign in to comment.