Skip to content

Commit

Permalink
fix(ws): correctly handle server-initiated close
Browse files Browse the repository at this point in the history
This avoids a hanging behavior when the server closes the WS connection,
either via a control close frame[1] or by closing the connection
prematurely without sending the control frame (deviating from the WS
protocol).

If the control close frame is not received but the connection is closed,
the error (`close 1006 (abnormal closure): unexpected EOF`) will be
returned to the user's WS error handler, as it indicates an issue with
the server.

The user-specified close handler should be called in either case now.

Closes #581

[1]: https://tools.ietf.org/html/rfc6455#section-5.5.1
  • Loading branch information
Ivan Mirić committed Oct 23, 2019
1 parent 5cda323 commit d2a6999
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 79 deletions.
4 changes: 2 additions & 2 deletions core/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ func TestSentReceivedMetrics(t *testing.T) {
{tr(`import ws from "k6/ws";
let data = "0123456789".repeat(100);
export default function() {
ws.connect("WSBIN_URL/ws-echo", null, function (socket) {
ws.connect("WSBIN_URL/ws-echo-invalid", null, function (socket) {
socket.on('open', function open() {
socket.send(data);
});
Expand Down Expand Up @@ -661,7 +661,7 @@ func TestRunTags(t *testing.T) {
})
group("websockets", function() {
var response = ws.connect("WSBIN_URL/ws-echo", params, function (socket) {
var response = ws.connect("WSBIN_URL/ws-echo-invalid", params, function (socket) {
socket.on('open', function open() {
console.log('ws open and say hello');
socket.send("hello");
Expand Down
27 changes: 12 additions & 15 deletions js/modules/k6/ws/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,9 @@ func (*WS) Connect(ctx context.Context, url string, args ...goja.Value) (*WSHTTP
case readErr := <-readErrChan:
socket.handleEvent("error", rt.ToValue(readErr))

case readClose := <-readCloseChan:
case <-readCloseChan:
// handle server close
socket.handleEvent("close", rt.ToValue(readClose))
socket.Close()

case scheduledFn := <-socket.scheduled:
if _, err := scheduledFn(goja.Undefined()); err != nil {
Expand Down Expand Up @@ -436,7 +436,7 @@ func (s *Socket) closeConnection(code int) error {
websocket.FormatCloseMessage(code, ""),
time.Now().Add(writeWait),
)
if err != nil {
if err != nil && err != websocket.ErrCloseSent {
// Just call the handler, we'll try to close the connection anyway
s.handleEvent("error", rt.ToValue(err))
}
Expand All @@ -445,7 +445,7 @@ func (s *Socket) closeConnection(code int) error {
s.handleEvent("close", rt.ToValue(code))
_ = s.conn.Close()

// Stops the main control loop
// Stop the main control loop
close(s.done)
})

Expand All @@ -454,22 +454,19 @@ func (s *Socket) closeConnection(code int) error {

// Wraps conn.ReadMessage in a channel
func readPump(conn *websocket.Conn, readChan chan []byte, errorChan chan error, closeChan chan int) {
defer func() { _ = conn.Close() }()

for {
_, message, err := conn.ReadMessage()
if err != nil {

if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
closeChan <- err.(*websocket.CloseError).Code
} else if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
// Emit the error if it is not CloseNormalClosure
// and the error is not originated from closing the socket ourselves with `CloseGoingAway`
if websocket.IsUnexpectedCloseError(
err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
// Report an unexpected closure
errorChan <- err
}

//CloseGoingAway errors are ignored
return
code := websocket.CloseGoingAway
if e, ok := err.(*websocket.CloseError); ok {
code = e.Code
}
closeChan <- code
}

readChan <- message
Expand Down
98 changes: 65 additions & 33 deletions js/modules/k6/ws/ws_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package ws
import (
"context"
"crypto/tls"
"fmt"
"strconv"
"testing"

Expand Down Expand Up @@ -116,30 +117,30 @@ func TestSession(t *testing.T) {

t.Run("connect_ws", func(t *testing.T) {
_, err := common.RunString(rt, sr(`
let res = ws.connect("WSBIN_URL/ws-echo", function(socket){
let res = ws.connect("WSBIN_URL/ws-echo-invalid", function(socket){
socket.close()
});
if (res.status != 101) { throw new Error("connection failed with status: " + res.status); }
`))
assert.NoError(t, err)
})
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), 101, "")
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo-invalid"), 101, "")

t.Run("connect_wss", func(t *testing.T) {
_, err := common.RunString(rt, sr(`
let res = ws.connect("WSSBIN_URL/ws-echo", function(socket){
let res = ws.connect("WSSBIN_URL/ws-echo-invalid", function(socket){
socket.close()
});
if (res.status != 101) { throw new Error("TLS connection failed with status: " + res.status); }
`))
assert.NoError(t, err)
})
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSSBIN_URL/ws-echo"), 101, "")
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSSBIN_URL/ws-echo-invalid"), 101, "")

t.Run("open", func(t *testing.T) {
_, err := common.RunString(rt, sr(`
let opened = false;
let res = ws.connect("WSBIN_URL/ws-echo", function(socket){
let res = ws.connect("WSBIN_URL/ws-echo-invalid", function(socket){
socket.on("open", function() {
opened = true;
socket.close()
Expand All @@ -149,11 +150,11 @@ func TestSession(t *testing.T) {
`))
assert.NoError(t, err)
})
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), 101, "")
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo-invalid"), 101, "")

t.Run("send_receive", func(t *testing.T) {
_, err := common.RunString(rt, sr(`
let res = ws.connect("WSBIN_URL/ws-echo", function(socket){
let res = ws.connect("WSBIN_URL/ws-echo-invalid", function(socket){
socket.on("open", function() {
socket.send("test")
})
Expand All @@ -169,14 +170,14 @@ func TestSession(t *testing.T) {
})

samplesBuf := stats.GetBufferedSamples(samples)
assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSBIN_URL/ws-echo"), 101, "")
assertMetricEmitted(t, metrics.WSMessagesSent, samplesBuf, sr("WSBIN_URL/ws-echo"))
assertMetricEmitted(t, metrics.WSMessagesReceived, samplesBuf, sr("WSBIN_URL/ws-echo"))
assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSBIN_URL/ws-echo-invalid"), 101, "")
assertMetricEmitted(t, metrics.WSMessagesSent, samplesBuf, sr("WSBIN_URL/ws-echo-invalid"))
assertMetricEmitted(t, metrics.WSMessagesReceived, samplesBuf, sr("WSBIN_URL/ws-echo-invalid"))

t.Run("interval", func(t *testing.T) {
_, err := common.RunString(rt, sr(`
let counter = 0;
let res = ws.connect("WSBIN_URL/ws-echo", function(socket){
let res = ws.connect("WSBIN_URL/ws-echo-invalid", function(socket){
socket.setInterval(function () {
counter += 1;
if (counter > 2) { socket.close(); }
Expand All @@ -186,13 +187,13 @@ func TestSession(t *testing.T) {
`))
assert.NoError(t, err)
})
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), 101, "")
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo-invalid"), 101, "")

t.Run("timeout", func(t *testing.T) {
_, err := common.RunString(rt, sr(`
let start = new Date().getTime();
let ellapsed = new Date().getTime() - start;
let res = ws.connect("WSBIN_URL/ws-echo", function(socket){
let res = ws.connect("WSBIN_URL/ws-echo-invalid", function(socket){
socket.setTimeout(function () {
ellapsed = new Date().getTime() - start;
socket.close();
Expand All @@ -204,12 +205,12 @@ func TestSession(t *testing.T) {
`))
assert.NoError(t, err)
})
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), 101, "")
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo-invalid"), 101, "")

t.Run("ping", func(t *testing.T) {
_, err := common.RunString(rt, sr(`
let pongReceived = false;
let res = ws.connect("WSBIN_URL/ws-echo", function(socket){
let res = ws.connect("WSBIN_URL/ws-echo-invalid", function(socket){
socket.on("open", function(data) {
socket.ping();
});
Expand All @@ -227,15 +228,15 @@ func TestSession(t *testing.T) {
})

samplesBuf = stats.GetBufferedSamples(samples)
assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSBIN_URL/ws-echo"), 101, "")
assertMetricEmitted(t, metrics.WSPing, samplesBuf, sr("WSBIN_URL/ws-echo"))
assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSBIN_URL/ws-echo-invalid"), 101, "")
assertMetricEmitted(t, metrics.WSPing, samplesBuf, sr("WSBIN_URL/ws-echo-invalid"))

t.Run("multiple_handlers", func(t *testing.T) {
_, err := common.RunString(rt, sr(`
let pongReceived = false;
let otherPongReceived = false;
let res = ws.connect("WSBIN_URL/ws-echo", function(socket){
let res = ws.connect("WSBIN_URL/ws-echo-invalid", function(socket){
socket.on("open", function(data) {
socket.ping();
});
Expand All @@ -261,13 +262,13 @@ func TestSession(t *testing.T) {
})

samplesBuf = stats.GetBufferedSamples(samples)
assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSBIN_URL/ws-echo"), 101, "")
assertMetricEmitted(t, metrics.WSPing, samplesBuf, sr("WSBIN_URL/ws-echo"))
assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSBIN_URL/ws-echo-invalid"), 101, "")
assertMetricEmitted(t, metrics.WSPing, samplesBuf, sr("WSBIN_URL/ws-echo-invalid"))

t.Run("close", func(t *testing.T) {
t.Run("client_close", func(t *testing.T) {
_, err := common.RunString(rt, sr(`
let closed = false;
let res = ws.connect("WSBIN_URL/ws-echo", function(socket){
let res = ws.connect("WSBIN_URL/ws-echo-invalid", function(socket){
socket.on("open", function() {
socket.close()
})
Expand All @@ -279,7 +280,38 @@ func TestSession(t *testing.T) {
`))
assert.NoError(t, err)
})
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), 101, "")

serverCloseTests := []struct {
name string
endpoint string
}{
{"server_close_ok", "/ws-echo"},
// Ensure we correctly handle invalid WS server
// implementations that close the connection prematurely
// without sending a close control frame first.
{"server_close_invalid", "/ws-close-invalid"},
}

for _, tc := range serverCloseTests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
_, err := common.RunString(rt, sr(fmt.Sprintf(`
let closed = false;
let res = ws.connect("WSBIN_URL%s", function(socket){
socket.on("open", function() {
socket.send("test");
})
socket.on("close", function() {
closed = true;
})
});
if (!closed) { throw new Error ("close event not fired"); }
`, tc.endpoint)))
assert.NoError(t, err)
})
}

assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo-invalid"), 101, "")
}

func TestErrors(t *testing.T) {
Expand Down Expand Up @@ -332,7 +364,7 @@ func TestErrors(t *testing.T) {

t.Run("error_in_setup", func(t *testing.T) {
_, err := common.RunString(rt, sr(`
let res = ws.connect("WSBIN_URL/ws-echo", function(socket){
let res = ws.connect("WSBIN_URL/ws-echo-invalid", function(socket){
throw new Error("error in setup");
});
`))
Expand All @@ -342,7 +374,7 @@ func TestErrors(t *testing.T) {
t.Run("send_after_close", func(t *testing.T) {
_, err := common.RunString(rt, sr(`
let hasError = false;
let res = ws.connect("WSBIN_URL/ws-echo", function(socket){
let res = ws.connect("WSBIN_URL/ws-echo-invalid", function(socket){
socket.on("open", function() {
socket.close();
socket.send("test");
Expand All @@ -357,13 +389,13 @@ func TestErrors(t *testing.T) {
}
`))
assert.NoError(t, err)
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), 101, "")
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo-invalid"), 101, "")
})

t.Run("error on close", func(t *testing.T) {
_, err := common.RunString(rt, sr(`
var closed = false;
let res = ws.connect("WSBIN_URL/ws-close", function(socket){
let res = ws.connect("WSBIN_URL/ws-close-invalid", function(socket){
socket.on('open', function open() {
socket.setInterval(function timeout() {
socket.ping();
Expand All @@ -386,7 +418,7 @@ func TestErrors(t *testing.T) {
});
`))
assert.NoError(t, err)
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-close"), 101, "")
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-close-invalid"), 101, "")
})
}

Expand Down Expand Up @@ -426,7 +458,7 @@ func TestSystemTags(t *testing.T) {
t.Run("only "+expectedTag, func(t *testing.T) {
state.Options.SystemTags = stats.ToSystemTagSet([]string{expectedTag})
_, err := common.RunString(rt, sr(`
let res = ws.connect("WSBIN_URL/ws-echo", function(socket){
let res = ws.connect("WSBIN_URL/ws-echo-invalid", function(socket){
socket.on("open", function() {
socket.send("test")
})
Expand Down Expand Up @@ -490,20 +522,20 @@ func TestTLSConfig(t *testing.T) {
}

_, err := common.RunString(rt, sr(`
let res = ws.connect("WSSBIN_URL/ws-close", function(socket){
let res = ws.connect("WSSBIN_URL/ws-close-invalid", function(socket){
socket.close()
});
if (res.status != 101) { throw new Error("TLS connection failed with status: " + res.status); }
`))
assert.NoError(t, err)
})
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSSBIN_URL/ws-close"), 101, "")
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSSBIN_URL/ws-close-invalid"), 101, "")

t.Run("custom certificates", func(t *testing.T) {
state.TLSConfig = tb.TLSClientConfig

_, err := common.RunString(rt, sr(`
let res = ws.connect("WSSBIN_URL/ws-close", function(socket){
let res = ws.connect("WSSBIN_URL/ws-close-invalid", function(socket){
socket.close()
});
if (res.status != 101) {
Expand All @@ -512,5 +544,5 @@ func TestTLSConfig(t *testing.T) {
`))
assert.NoError(t, err)
})
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSSBIN_URL/ws-close"), 101, "")
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSSBIN_URL/ws-close-invalid"), 101, "")
}
2 changes: 1 addition & 1 deletion js/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1403,7 +1403,7 @@ func TestStuffNotPanicking(t *testing.T) {
() => doc.find('p').each("wat"),
() => doc.find('p').map(),
() => doc.find('p').map("wat"),
() => ws.connect("WSBIN_URL/ws-echo"),
() => ws.connect("WSBIN_URL/ws-echo-invalid"),
];
testCases.forEach(function(fn, idx) {
Expand Down
Loading

0 comments on commit d2a6999

Please sign in to comment.