Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix unaligned 64 bit atomic loads on 32 bit platforms #154

Merged
merged 2 commits into from
Sep 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ci/wasm.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,7 @@ GOOS=js GOARCH=wasm go test -exec=wasmbrowsertest ./... -args "$WS_ECHO_SERVER_U

if ! wait "$wsjstestPID"; then
echo "wsjstest exited unsuccessfully"
echo "output:"
cat "$wsjstestOut"
exit 1
fi
15 changes: 8 additions & 7 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ import (
)

// Conn represents a WebSocket connection.
// All methods may be called concurrently except for Reader, Read
// and SetReadLimit.
// All methods may be called concurrently except for Reader and Read.
//
// You must always read from the connection. Otherwise control
// frames will not be handled. See the docs on Reader and CloseRead.
Expand Down Expand Up @@ -56,7 +55,7 @@ type Conn struct {
writeHeaderBuf []byte
writeHeader *header
// read limit for a message in bytes.
msgReadLimit int64
msgReadLimit *atomicInt64

// Used to ensure a previous writer is not used after being closed.
activeWriter atomic.Value
Expand All @@ -70,7 +69,7 @@ type Conn struct {
activeReader *messageReader
// readFrameLock is acquired to read from bw.
readFrameLock chan struct{}
readClosed int64
readClosed *atomicInt64
readHeaderBuf []byte
controlPayloadBuf []byte

Expand All @@ -90,7 +89,8 @@ type Conn struct {
func (c *Conn) init() {
c.closed = make(chan struct{})

c.msgReadLimit = 32768
c.msgReadLimit = &atomicInt64{}
c.msgReadLimit.Store(32768)

c.writeMsgLock = make(chan struct{}, 1)
c.writeFrameLock = make(chan struct{}, 1)
Expand All @@ -105,6 +105,7 @@ func (c *Conn) init() {
c.writeHeaderBuf = makeWriteHeaderBuf()
c.writeHeader = &header{}
c.readHeaderBuf = makeReadHeaderBuf()
c.readClosed = &atomicInt64{}
c.controlPayloadBuf = make([]byte, maxControlFramePayload)

runtime.SetFinalizer(c, func(c *Conn) {
Expand Down Expand Up @@ -341,7 +342,7 @@ func (c *Conn) handleControl(ctx context.Context, h header) error {
// See https://github.com/nhooyr/websocket/issues/87#issue-451703332
// Most users should not need this.
func (c *Conn) Reader(ctx context.Context) (MessageType, io.Reader, error) {
if atomic.LoadInt64(&c.readClosed) == 1 {
if c.readClosed.Load() == 1 {
return 0, nil, fmt.Errorf("websocket connection read closed")
}

Expand Down Expand Up @@ -391,7 +392,7 @@ func (c *Conn) reader(ctx context.Context) (MessageType, io.Reader, error) {
c.readerMsgHeader = h
c.readerFrameEOF = false
c.readerMaskPos = 0
c.readMsgLeft = c.msgReadLimit
c.readMsgLeft = c.msgReadLimit.Load()

r := &messageReader{
c: c,
Expand Down
25 changes: 23 additions & 2 deletions conn_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (c *netConn) SetReadDeadline(t time.Time) error {
// Use this when you do not want to read data messages from the connection anymore but will
// want to write messages to it.
func (c *Conn) CloseRead(ctx context.Context) context.Context {
atomic.StoreInt64(&c.readClosed, 1)
c.readClosed.Store(1)

ctx, cancel := context.WithCancel(ctx)
go func() {
Expand All @@ -200,11 +200,32 @@ func (c *Conn) CloseRead(ctx context.Context) context.Context {
//
// When the limit is hit, the connection will be closed with StatusMessageTooBig.
func (c *Conn) SetReadLimit(n int64) {
c.msgReadLimit = n
c.msgReadLimit.Store(n)
}

func (c *Conn) setCloseErr(err error) {
c.closeErrOnce.Do(func() {
c.closeErr = fmt.Errorf("websocket closed: %w", err)
})
}

// See https://github.com/nhooyr/websocket/issues/153
type atomicInt64 struct {
v atomic.Value
}

func (v *atomicInt64) Load() int64 {
i, ok := v.v.Load().(int64)
if !ok {
return 0
}
return i
}

func (v *atomicInt64) Store(i int64) {
v.v.Store(i)
}

func (v *atomicInt64) String() string {
return fmt.Sprint(v.v.Load())
}
16 changes: 10 additions & 6 deletions websocket_js.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"reflect"
"runtime"
"sync"
"sync/atomic"
"syscall/js"

"nhooyr.io/websocket/internal/bpool"
Expand All @@ -21,9 +20,10 @@ import (
type Conn struct {
ws wsjs.WebSocket

msgReadLimit int64
// read limit for a message in bytes.
msgReadLimit *atomicInt64

readClosed int64
readClosed *atomicInt64
closeOnce sync.Once
closed chan struct{}
closeErrOnce sync.Once
Expand All @@ -49,7 +49,11 @@ func (c *Conn) close(err error) {
func (c *Conn) init() {
c.closed = make(chan struct{})
c.readSignal = make(chan struct{}, 1)
c.msgReadLimit = 32768

c.msgReadLimit = &atomicInt64{}
c.msgReadLimit.Store(32768)

c.readClosed = &atomicInt64{}

c.releaseOnClose = c.ws.OnClose(func(e wsjs.CloseEvent) {
cerr := CloseError{
Expand Down Expand Up @@ -89,15 +93,15 @@ func (c *Conn) closeWithInternal() {
// Read attempts to read a message from the connection.
// The maximum time spent waiting is bounded by the context.
func (c *Conn) Read(ctx context.Context) (MessageType, []byte, error) {
if atomic.LoadInt64(&c.readClosed) == 1 {
if c.readClosed.Load() == 1 {
return 0, nil, fmt.Errorf("websocket connection read closed")
}

typ, p, err := c.read(ctx)
if err != nil {
return 0, nil, fmt.Errorf("failed to read: %w", err)
}
if int64(len(p)) > c.msgReadLimit {
if int64(len(p)) > c.msgReadLimit.Load() {
c.Close(StatusMessageTooBig, fmt.Sprintf("read limited at %v bytes", c.msgReadLimit))
return 0, nil, c.closeErr
}
Expand Down