Skip to content

Commit

Permalink
Optimize the keep-alive mechanism
Browse files Browse the repository at this point in the history
Signed-off-by: Jianhui Zhao <jianhuizhao329@gmail.com>
  • Loading branch information
Jianhui Zhao committed Sep 9, 2018
1 parent 2123c9d commit ca015ac
Showing 1 changed file with 38 additions and 17 deletions.
55 changes: 38 additions & 17 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ import (
)

const (
// Time allowed to read the next pong message from the peer.
pongWait = 3 * time.Second

// pings to peer with this period.
// pings period of client.
pingPeriod = 5 * time.Second

// Max lose ping times
aliveTimes = 3
)

var upgrader = websocket.Upgrader{
Expand Down Expand Up @@ -69,6 +69,8 @@ type Client struct {
mutex sync.Mutex
isClosed bool
closeChan chan byte

alive uint32
}

func (c *Client) wsClose() {
Expand Down Expand Up @@ -101,12 +103,6 @@ func (c *Client) readPump() {
c.leave()
}()

c.conn.SetPongHandler(func(string) error {
/* Set not time out */
c.conn.SetReadDeadline(time.Time{});
return nil
})

for {
msgType, data, err := c.conn.ReadMessage()
if err != nil {
Expand All @@ -132,9 +128,7 @@ func (c *Client) readPump() {
}

func (c *Client) writePump() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
c.leave()
}()

Expand All @@ -144,17 +138,43 @@ func (c *Client) writePump() {
if err := c.conn.WriteMessage(msg.msgType, msg.data); err != nil {
return
}
case <- ticker.C:
c.conn.SetReadDeadline(time.Now().Add(pongWait));
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
case <- c.closeChan:
return
}
}
}

func (c *Client) keepAlive() {
ticker := time.NewTicker(pingPeriod)
alive := aliveTimes

defer func() {
c.leave()
}()

// Get the current ping handler
pingHandler := c.conn.PingHandler()

c.conn.SetPingHandler(func(appData string) error {
rlog.Printf("Recv ping\n")
alive = aliveTimes
return pingHandler(appData)
})

for {
select {
case <- c.closeChan:
return
case <- ticker.C:
alive--
rlog.Printf("alive: %d\n", alive)
if alive == 0 {
return
}
}
}
}

/* serveWs handles websocket requests from the peer. */
func serveWs(br *Broker, w http.ResponseWriter, r *http.Request) {
devid := r.URL.Query().Get("devid")
Expand Down Expand Up @@ -190,4 +210,5 @@ func serveWs(br *Broker, w http.ResponseWriter, r *http.Request) {

go client.readPump()
go client.writePump()
go client.keepAlive()
}

0 comments on commit ca015ac

Please sign in to comment.