Skip to content

Commit

Permalink
refactor: rename channel fields and fix example
Browse files Browse the repository at this point in the history
  • Loading branch information
DarthPestilane committed Jan 30, 2024
1 parent 7a2db09 commit 08ad37b
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 75 deletions.
4 changes: 2 additions & 2 deletions benchmarks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func Benchmark_NoHandler(b *testing.B) {
go s.Run("127.0.0.1:0") // nolint
defer s.Stop() // nolint

<-s.accepting
<-s.acceptingC

// client
client, err := net.Dial("tcp", s.Listener.Addr().String())
Expand All @@ -46,7 +46,7 @@ func Benchmark_OneHandler(b *testing.B) {
go s.Run("127.0.0.1:0") // nolint
defer s.Stop() // nolint

<-s.accepting
<-s.acceptingC

// client
client, err := net.Dial("tcp", s.Listener.Addr().String())
Expand Down
2 changes: 1 addition & 1 deletion internal/examples/tcp/custom_packet/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func init() {
}

func main() {
easytcp._log = log
easytcp.SetLogger(log)

s := easytcp.NewServer(&easytcp.ServerOption{
// specify codec and packer
Expand Down
4 changes: 2 additions & 2 deletions router_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func Test_routeContext_Send(t *testing.T) {
ctx := newTestContext(sess, nil)
ctx.SetResponseMessage(NewMessage(1, []byte("test")))
go ctx.Send()
ctx2 := <-sess.respQueue
ctx2 := <-sess.respStream
assert.Equal(t, ctx, ctx2)
})
}
Expand All @@ -161,7 +161,7 @@ func Test_routeContext_SendTo(t *testing.T) {
ctx := newTestContext(sess1, nil)
ctx.SetResponseMessage(NewMessage(1, []byte("test")))
go ctx.SendTo(sess2)
ctx2 := <-sess2.respQueue
ctx2 := <-sess2.respStream
assert.Equal(t, ctx, ctx2)
})
}
Expand Down
22 changes: 11 additions & 11 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ type Server struct {
respQueueSize int
router *Router
printRoutes bool
accepting chan struct{}
stopped chan struct{}
acceptingC chan struct{}
stoppedC chan struct{}
asyncRouter bool
}

Expand Down Expand Up @@ -79,8 +79,8 @@ func NewServer(opt *ServerOption) *Server {
Codec: opt.Codec,
printRoutes: !opt.DoNotPrintRoutes,
router: newRouter(),
accepting: make(chan struct{}),
stopped: make(chan struct{}),
acceptingC: make(chan struct{}),
stoppedC: make(chan struct{}),
asyncRouter: opt.AsyncRouter,
}
}
Expand Down Expand Up @@ -116,7 +116,7 @@ func (s *Server) RunTLS(addr string, config *tls.Config) error {
// acceptLoop accepts TCP connections in a loop, and handle connections in goroutines.
// Returns error when error occurred.
func (s *Server) acceptLoop() error {
close(s.accepting)
close(s.acceptingC)
for {
if s.isStopped() {
_log.Tracef("server accept loop stopped")
Expand Down Expand Up @@ -171,25 +171,25 @@ func (s *Server) handleConn(conn net.Conn) {
if s.OnSessionCreate != nil {
s.OnSessionCreate(sess)
}
close(sess.afterCreateHook)
close(sess.afterCreateHookC)

go sess.readInbound(s.router, s.readTimeout) // start reading message packet from connection.
go sess.writeOutbound(s.writeTimeout) // start writing message packet to connection.

select {
case <-sess.closed: // wait for session finished.
case <-s.stopped: // or the server is stopped.
case <-sess.closedC: // wait for session finished.
case <-s.stoppedC: // or the server is stopped.
}

if s.OnSessionClose != nil {
s.OnSessionClose(sess)
}
close(sess.afterCloseHook)
close(sess.afterCloseHookC)
}

// Stop stops server. Closing Listener and all connections.
func (s *Server) Stop() error {
close(s.stopped)
close(s.stoppedC)
return s.Listener.Close()
}

Expand All @@ -210,7 +210,7 @@ func (s *Server) NotFoundHandler(handler HandlerFunc) {

func (s *Server) isStopped() bool {
select {
case <-s.stopped:
case <-s.stoppedC:
return true
default:
return false
Expand Down
22 changes: 11 additions & 11 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ func TestNewServer(t *testing.T) {
Codec: &JsonCodec{},
RespQueueSize: -1,
})
assert.NotNil(t, s.accepting)
assert.NotNil(t, s.acceptingC)
assert.IsType(t, s.Packer, NewDefaultPacker())
assert.Equal(t, s.Codec, &JsonCodec{})
assert.Equal(t, s.respQueueSize, DefaultRespQueueSize)
assert.NotNil(t, s.accepting)
assert.NotNil(t, s.stopped)
assert.NotNil(t, s.acceptingC)
assert.NotNil(t, s.stoppedC)
}

func TestServer_Serve(t *testing.T) {
Expand All @@ -34,7 +34,7 @@ func TestServer_Serve(t *testing.T) {
assert.ErrorIs(t, server.Serve(lis), ErrServerStopped)
close(done)
}()
<-server.accepting
<-server.acceptingC
time.Sleep(time.Millisecond * 5)
err = server.Stop()
assert.NoError(t, err)
Expand All @@ -48,7 +48,7 @@ func TestServer_Run(t *testing.T) {
assert.ErrorIs(t, server.Run("localhost:0"), ErrServerStopped)
close(done)
}()
<-server.accepting
<-server.acceptingC
time.Sleep(time.Millisecond * 5)
err := server.Stop()
assert.NoError(t, err)
Expand All @@ -69,7 +69,7 @@ func TestServer_RunTLS(t *testing.T) {
assert.ErrorIs(t, server.RunTLS("localhost:0", cfg), ErrServerStopped)
close(done)
}()
<-server.accepting
<-server.acceptingC
time.Sleep(time.Millisecond * 5)
err = server.Stop()
assert.NoError(t, err)
Expand All @@ -92,7 +92,7 @@ func TestServer_acceptLoop(t *testing.T) {
assert.Error(t, err)
}()

<-server.accepting
<-server.acceptingC

// client
cli, err := net.Dial("tcp", lis.Addr().String())
Expand Down Expand Up @@ -127,7 +127,7 @@ func TestServer_Stop(t *testing.T) {
assert.Equal(t, err, ErrServerStopped)
}()

<-server.accepting
<-server.acceptingC

// client
cli, err := net.Dial("tcp", server.Listener.Addr().String())
Expand Down Expand Up @@ -197,7 +197,7 @@ func TestServer_handleConn(t *testing.T) {
}()
defer func() { assert.NoError(t, server.Stop()) }()

<-server.accepting
<-server.acceptingC

// client
cli, err := net.Dial("tcp", server.Listener.Addr().String())
Expand Down Expand Up @@ -235,7 +235,7 @@ func TestServer_NotFoundHandler(t *testing.T) {
assert.Equal(t, err, ErrServerStopped)
}()

<-server.accepting
<-server.acceptingC

// client
cli, err := net.Dial("tcp", server.Listener.Addr().String())
Expand Down Expand Up @@ -276,7 +276,7 @@ func TestServer_SessionHooks(t *testing.T) {
}()
defer func() { assert.NoError(t, server.Stop()) }()

<-server.accepting
<-server.acceptingC

// client
cli, err := net.Dial("tcp", server.Listener.Addr().String())
Expand Down
64 changes: 32 additions & 32 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type Session interface {
// SetID sets current session's id.
SetID(id interface{})

// Send sends the ctx to the respQueue.
// Send sends the ctx to the respStream.
Send(ctx Context) bool

// Codec returns the codec, can be nil.
Expand All @@ -40,17 +40,17 @@ type Session interface {
}

type session struct {
id interface{} // session's ID.
conn net.Conn // tcp connection
closed chan struct{} // to close()
afterCreateHook chan struct{} // to close after session's on-create hook triggered
afterCloseHook chan struct{} // to close after session's on-close hook triggered
closeOnce sync.Once // ensure one session only close once
respQueue chan Context // response queue channel, pushed in Send() and popped in writeOutbound()
packer Packer // to pack and unpack message
codec Codec // encode/decode message data
ctxPool sync.Pool // router context pool
asyncRouter bool // calls router HandlerFunc in a goroutine if false
id interface{} // session's ID.
conn net.Conn // tcp connection
closedC chan struct{} // to close when read/write loop stopped
closeOnce sync.Once // ensure one session only close once
afterCreateHookC chan struct{} // to close after session's on-create hook triggered
afterCloseHookC chan struct{} // to close after session's on-close hook triggered
respStream chan Context // response queue channel, pushed in Send() and popped in writeOutbound()
packer Packer // to pack and unpack message
codec Codec // encode/decode message data
ctxPool sync.Pool // router context pool
asyncRouter bool // calls router HandlerFunc in a goroutine if false
}

// sessionOption is the extra options for session.
Expand All @@ -67,16 +67,16 @@ type sessionOption struct {
// Returns a session pointer.
func newSession(conn net.Conn, opt *sessionOption) *session {
return &session{
id: uuid.NewString(), // use uuid as default
conn: conn,
closed: make(chan struct{}),
afterCreateHook: make(chan struct{}),
afterCloseHook: make(chan struct{}),
respQueue: make(chan Context, opt.respQueueSize),
packer: opt.Packer,
codec: opt.Codec,
ctxPool: sync.Pool{New: func() interface{} { return newContext() }},
asyncRouter: opt.asyncRouter,
id: uuid.NewString(), // use uuid as default
conn: conn,
closedC: make(chan struct{}),
afterCreateHookC: make(chan struct{}),
afterCloseHookC: make(chan struct{}),
respStream: make(chan Context, opt.respQueueSize),
packer: opt.Packer,
codec: opt.Codec,
ctxPool: sync.Pool{New: func() interface{} { return newContext() }},
asyncRouter: opt.asyncRouter,
}
}

Expand All @@ -91,15 +91,15 @@ func (s *session) SetID(id interface{}) {
s.id = id
}

// Send pushes response message to respQueue.
// Send pushes response message to respStream.
// Returns false if session is closed or ctx is done.
func (s *session) Send(ctx Context) (ok bool) {
select {
case <-ctx.Done():
return false
case <-s.closed:
case <-s.closedC:
return false
case s.respQueue <- ctx:
case s.respStream <- ctx:
return true
}
}
Expand All @@ -112,17 +112,17 @@ func (s *session) Codec() Codec {
// Close closes the session, but doesn't close the connection.
// The connection will be closed in the server once the session's closed.
func (s *session) Close() {
s.closeOnce.Do(func() { close(s.closed) })
s.closeOnce.Do(func() { close(s.closedC) })
}

// AfterCreateHook blocks until session's on-create hook triggered.
func (s *session) AfterCreateHook() <-chan struct{} {
return s.afterCreateHook
return s.afterCreateHookC
}

// AfterCloseHook blocks until session's on-close hook triggered.
func (s *session) AfterCloseHook() <-chan struct{} {
return s.afterCloseHook
return s.afterCloseHookC
}

// AllocateContext gets a Context from pool and reset all but session.
Expand All @@ -144,7 +144,7 @@ func (s *session) Conn() net.Conn {
func (s *session) readInbound(router *Router, timeout time.Duration) {
for {
select {
case <-s.closed:
case <-s.closedC:
return
default:
}
Expand Down Expand Up @@ -184,16 +184,16 @@ func (s *session) handleReq(router *Router, reqMsg *Message) {
s.Send(ctx)
}

// writeOutbound fetches message from respQueue channel and writes to TCP connection in a loop.
// writeOutbound fetches message from respStream channel and writes to TCP connection in a loop.
// Parameter writeTimeout specified the connection writing timeout.
// The loop breaks if errors occurred, or the session is closed.
func (s *session) writeOutbound(writeTimeout time.Duration) {
for {
var ctx Context
select {
case <-s.closed:
case <-s.closedC:
return
case ctx = <-s.respQueue:
case ctx = <-s.respStream:
}

outboundBytes, err := s.packResponse(ctx)
Expand Down
Loading

0 comments on commit 08ad37b

Please sign in to comment.