From 480d0eb5d908f97051f6369d9bdbcf22b794af0a Mon Sep 17 00:00:00 2001 From: Anmol Sethi Date: Mon, 23 Sep 2019 15:14:47 -0500 Subject: [PATCH 1/2] Fixes from @albrow's Wasm review --- README.md | 4 ++-- assert_test.go | 13 +++++++++++-- conn_common.go | 2 +- doc.go | 6 +++--- frame.go | 8 ++++---- internal/wsjs/{wsjs.go => wsjs_js.go} | 0 websocket_js_test.go | 4 ++-- 7 files changed, 23 insertions(+), 14 deletions(-) rename internal/wsjs/{wsjs.go => wsjs_js.go} (100%) diff --git a/README.md b/README.md index dc33e194..c5b8c907 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ go get nhooyr.io/websocket - JSON and ProtoBuf helpers in the [wsjson](https://godoc.org/nhooyr.io/websocket/wsjson) and [wspb](https://godoc.org/nhooyr.io/websocket/wspb) subpackages - Highly optimized by default - Concurrent writes out of the box -- [Complete WASM](https://godoc.org/nhooyr.io/websocket#hdr-WASM) support +- [Complete Wasm](https://godoc.org/nhooyr.io/websocket#hdr-Wasm) support ## Roadmap @@ -130,7 +130,7 @@ The ping API is also nicer. gorilla/websocket requires registering a pong handle which results in awkward control flow. With nhooyr/websocket you use the Ping method on the Conn that sends a ping and also waits for the pong. -Additionally, nhooyr.io/websocket can compile to [WASM](https://godoc.org/nhooyr.io/websocket#hdr-WASM) for the browser. +Additionally, nhooyr.io/websocket can compile to [Wasm](https://godoc.org/nhooyr.io/websocket#hdr-Wasm) for the browser. In terms of performance, the differences mostly depend on your application code. nhooyr/websocket reuses message buffers out of the box if you use the wsjson and wspb subpackages. diff --git a/assert_test.go b/assert_test.go index cddae99d..8970c543 100644 --- a/assert_test.go +++ b/assert_test.go @@ -2,10 +2,10 @@ package websocket_test import ( "context" - "encoding/hex" "fmt" "math/rand" "reflect" + "strings" "github.com/google/go-cmp/cmp" @@ -99,7 +99,16 @@ func randBytes(n int) []byte { } func randString(n int) string { - return hex.EncodeToString(randBytes(n))[:n] + s := strings.ToValidUTF8(string(randBytes(n)), "_") + if len(s) > n { + return s[:n] + } + if len(s) < n { + // Pad with = + extra := n - len(s) + return s + strings.Repeat("=", extra) + } + return s } func assertEcho(ctx context.Context, c *websocket.Conn, typ websocket.MessageType, n int) error { diff --git a/conn_common.go b/conn_common.go index 771db26b..1429b47d 100644 --- a/conn_common.go +++ b/conn_common.go @@ -1,5 +1,5 @@ // This file contains *Conn symbols relevant to both -// WASM and non WASM builds. +// Wasm and non Wasm builds. package websocket diff --git a/doc.go b/doc.go index 7753afc7..a17bfb05 100644 --- a/doc.go +++ b/doc.go @@ -18,14 +18,14 @@ // Use the errors.As function new in Go 1.13 to check for websocket.CloseError. // See the CloseError example. // -// WASM +// Wasm // -// The client side fully supports compiling to WASM. +// The client side fully supports compiling to Wasm. // It wraps the WebSocket browser API. // // See https://developer.mozilla.org/en-US/docs/Web/API/WebSocket // -// Thus the unsupported features (not compiled in) for WASM are: +// Thus the unsupported features (not compiled in) for Wasm are: // // - Accept and AcceptOptions // - Conn.Ping diff --git a/frame.go b/frame.go index 84a18e02..95061f5a 100644 --- a/frame.go +++ b/frame.go @@ -213,8 +213,8 @@ const ( StatusNoStatusRcvd - // This StatusCode is only exported for use with WASM. - // In non WASM Go, the returned error will indicate whether the connection was closed or not or what happened. + // This StatusCode is only exported for use with Wasm. + // In non Wasm Go, the returned error will indicate whether the connection was closed or not or what happened. StatusAbnormalClosure StatusInvalidFramePayloadData @@ -226,8 +226,8 @@ const ( StatusTryAgainLater StatusBadGateway - // This StatusCode is only exported for use with WASM. - // In non WASM Go, the returned error will indicate whether there was a TLS handshake failure. + // This StatusCode is only exported for use with Wasm. + // In non Wasm Go, the returned error will indicate whether there was a TLS handshake failure. StatusTLSHandshake ) diff --git a/internal/wsjs/wsjs.go b/internal/wsjs/wsjs_js.go similarity index 100% rename from internal/wsjs/wsjs.go rename to internal/wsjs/wsjs_js.go diff --git a/websocket_js_test.go b/websocket_js_test.go index e68ba6f3..ba9431d4 100644 --- a/websocket_js_test.go +++ b/websocket_js_test.go @@ -36,12 +36,12 @@ func TestConn(t *testing.T) { t.Fatal(err) } - err = assertJSONEcho(ctx, c, 16) + err = assertJSONEcho(ctx, c, 1024) if err != nil { t.Fatal(err) } - err = assertEcho(ctx, c, websocket.MessageBinary, 16) + err = assertEcho(ctx, c, websocket.MessageBinary, 1024) if err != nil { t.Fatal(err) } From 9fc9f7ab6742008fb936186272696c9933d9c51b Mon Sep 17 00:00:00 2001 From: Anmol Sethi Date: Mon, 23 Sep 2019 16:53:37 -0500 Subject: [PATCH 2/2] Ensure message order with a buffer --- conn.go | 6 ----- conn_common.go | 6 +++++ websocket_js.go | 62 ++++++++++++++++++++++++++++++++++++++----------- 3 files changed, 54 insertions(+), 20 deletions(-) diff --git a/conn.go b/conn.go index 20dbece2..3d7d574e 100644 --- a/conn.go +++ b/conn.go @@ -120,12 +120,6 @@ func (c *Conn) Subprotocol() string { return c.subprotocol } -func (c *Conn) setCloseErr(err error) { - c.closeErrOnce.Do(func() { - c.closeErr = fmt.Errorf("websocket closed: %w", err) - }) -} - func (c *Conn) close(err error) { c.closeOnce.Do(func() { runtime.SetFinalizer(c, nil) diff --git a/conn_common.go b/conn_common.go index 1429b47d..ae0fe554 100644 --- a/conn_common.go +++ b/conn_common.go @@ -202,3 +202,9 @@ func (c *Conn) CloseRead(ctx context.Context) context.Context { func (c *Conn) SetReadLimit(n int64) { c.msgReadLimit = n } + +func (c *Conn) setCloseErr(err error) { + c.closeErrOnce.Do(func() { + c.closeErr = fmt.Errorf("websocket closed: %w", err) + }) +} diff --git a/websocket_js.go b/websocket_js.go index 4ed49d97..3822797b 100644 --- a/websocket_js.go +++ b/websocket_js.go @@ -23,29 +23,32 @@ type Conn struct { msgReadLimit int64 - readClosed int64 - closeOnce sync.Once - closed chan struct{} - closeErr error + readClosed int64 + closeOnce sync.Once + closed chan struct{} + closeErrOnce sync.Once + closeErr error releaseOnClose func() releaseOnMessage func() - readch chan wsjs.MessageEvent + readSignal chan struct{} + readBufMu sync.Mutex + readBuf []wsjs.MessageEvent } func (c *Conn) close(err error) { c.closeOnce.Do(func() { runtime.SetFinalizer(c, nil) - c.closeErr = fmt.Errorf("websocket closed: %w", err) + c.setCloseErr(err) close(c.closed) }) } func (c *Conn) init() { c.closed = make(chan struct{}) - c.readch = make(chan wsjs.MessageEvent, 1) + c.readSignal = make(chan struct{}, 1) c.msgReadLimit = 32768 c.releaseOnClose = c.ws.OnClose(func(e wsjs.CloseEvent) { @@ -61,15 +64,28 @@ func (c *Conn) init() { }) c.releaseOnMessage = c.ws.OnMessage(func(e wsjs.MessageEvent) { - c.readch <- e + c.readBufMu.Lock() + defer c.readBufMu.Unlock() + + c.readBuf = append(c.readBuf, e) + + // Lets the read goroutine know there is definitely something in readBuf. + select { + case c.readSignal <- struct{}{}: + default: + } }) runtime.SetFinalizer(c, func(c *Conn) { - c.ws.Close(int(StatusInternalError), "") - c.close(errors.New("connection garbage collected")) + c.setCloseErr(errors.New("connection garbage collected")) + c.closeWithInternal() }) } +func (c *Conn) closeWithInternal() { + c.Close(StatusInternalError, "something went wrong") +} + // Read attempts to read a message from the connection. // The maximum time spent waiting is bounded by the context. func (c *Conn) Read(ctx context.Context) (MessageType, []byte, error) { @@ -89,16 +105,32 @@ func (c *Conn) Read(ctx context.Context) (MessageType, []byte, error) { } func (c *Conn) read(ctx context.Context) (MessageType, []byte, error) { - var me wsjs.MessageEvent select { case <-ctx.Done(): c.Close(StatusPolicyViolation, "read timed out") return 0, nil, ctx.Err() - case me = <-c.readch: + case <-c.readSignal: case <-c.closed: return 0, nil, c.closeErr } + c.readBufMu.Lock() + defer c.readBufMu.Unlock() + + me := c.readBuf[0] + // We copy the messages forward and decrease the size + // of the slice to avoid reallocating. + copy(c.readBuf, c.readBuf[1:]) + c.readBuf = c.readBuf[:len(c.readBuf)-1] + + if len(c.readBuf) > 0 { + // Next time we read, we'll grab the message. + select { + case c.readSignal <- struct{}{}: + default: + } + } + switch p := me.Data.(type) { case string: return MessageText, []byte(p), nil @@ -118,8 +150,10 @@ func (c *Conn) Write(ctx context.Context, typ MessageType, p []byte) error { // to match the Go API. It can only error if the message type // is unexpected or the passed bytes contain invalid UTF-8 for // MessageText. - c.Close(StatusInternalError, "something went wrong") - return fmt.Errorf("failed to write: %w", err) + err := fmt.Errorf("failed to write: %w", err) + c.setCloseErr(err) + c.closeWithInternal() + return err } return nil }