Skip to content
This repository has been archived by the owner on Aug 27, 2020. It is now read-only.

Commit

Permalink
fix mux websocket receive message
Browse files Browse the repository at this point in the history
  • Loading branch information
lzjluzijie committed Dec 3, 2018
1 parent c68236d commit de17a1d
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 13 deletions.
2 changes: 1 addition & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (client *WebSocksClient) HandleConn(conn *net.TCPConn) {
}

//debug log
log.Printf("created new mux conn: %x", muxConn.ID)
log.Printf("created new mux conn: %x %s", muxConn.ID, host)

muxConn.Run(conn)
return
Expand Down
7 changes: 4 additions & 3 deletions core/mux/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (group *Group) Handle(m *Message) {
//debug log
err := errors.New(fmt.Sprintf("conn does not exist: %x", m.ConnID))
log.Println(err.Error())
log.Println(m)
log.Printf("%X %X %X %d", m.Method, m.ConnID, m.MessageID, m.Length)
return
}

Expand All @@ -76,7 +76,9 @@ func (group *Group) AddConn(conn *Conn) {
}

func (group *Group) DeleteConn(id uint32) {
group.connMapMutex.Lock()
delete(group.connMap, id)
group.connMapMutex.Unlock()
return
}

Expand Down Expand Up @@ -119,12 +121,11 @@ func (group *Group) Listen(muxWS *MuxWebSocket) {
for {
m, err := muxWS.Receive()
if err != nil {
log.Printf(err.Error())
log.Println(err.Error())
return
}

go group.Handle(m)
}
return
}()
}
35 changes: 29 additions & 6 deletions core/mux/websocket.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package mux

import (
"bytes"
"io"
"log"
"sync"

"github.com/lzjluzijie/websocks/core"
Expand All @@ -27,7 +29,10 @@ func (muxWS *MuxWebSocket) Send(m *Message) (err error) {
muxWS.sMutex.Lock()
_, err = io.Copy(muxWS, m)
if err != nil {
//muxWS.Close()
e := muxWS.Close()
if e != nil {
log.Println(e.Error())
}
return
}
muxWS.sMutex.Unlock()
Expand All @@ -39,26 +44,44 @@ func (muxWS *MuxWebSocket) Send(m *Message) (err error) {

func (muxWS *MuxWebSocket) Receive() (m *Message, err error) {
muxWS.rMutex.Lock()

h := make([]byte, 13)

_, err = muxWS.Read(h)
if err != nil {
//muxWS.Close()
e := muxWS.Close()
if e != nil {
log.Println(e.Error())
}
return
}

//debug log
//log.Printf("%d %x",n, h)

m = LoadMessage(h)
data := make([]byte, m.Length)
buf := &bytes.Buffer{}
r := io.LimitReader(muxWS, int64(m.Length))

_, err = muxWS.Read(data)
_, err = io.Copy(buf, r)
if err != nil {
//muxWS.Close()
e := muxWS.Close()
if e != nil {
log.Println(e.Error())
}
return
}
muxWS.rMutex.Unlock()

m.Data = data
m.Data = buf.Bytes()

////debug log
//log.Printf("received %#v", m)
return
}

func (muxWS *MuxWebSocket) Close() (err error) {
muxWS.group.MuxWSs = nil
err = muxWS.WebSocket.Close()
return
}
8 changes: 5 additions & 3 deletions core/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"github.com/gorilla/websocket"
)

var ErrWebSocketClosed = errors.New("websocket closed")

type WebSocket struct {
conn *websocket.Conn
buf []byte
Expand All @@ -28,7 +30,7 @@ func NewWebSocket(conn *websocket.Conn, stats *Stats) (ws *WebSocket) {

func (ws *WebSocket) Read(p []byte) (n int, err error) {
if ws.closed == true {
return 0, errors.New("websocket closed")
return 0, ErrWebSocketClosed
}

if len(ws.buf) == 0 {
Expand All @@ -51,7 +53,7 @@ func (ws *WebSocket) Read(p []byte) (n int, err error) {

func (ws *WebSocket) Write(p []byte) (n int, err error) {
if ws.closed == true {
return 0, errors.New("websocket closed")
return 0, ErrWebSocketClosed
}

err = ws.conn.WriteMessage(websocket.BinaryMessage, p)
Expand All @@ -68,7 +70,7 @@ func (ws *WebSocket) Write(p []byte) (n int, err error) {
}

func (ws *WebSocket) Close() (err error) {
ws.conn.Close()
ws.closed = true
err = ws.conn.Close()
return
}

0 comments on commit de17a1d

Please sign in to comment.