diff --git a/core/engine_test.go b/core/engine_test.go index 327c69d7fe0..e227bdc240d 100644 --- a/core/engine_test.go +++ b/core/engine_test.go @@ -535,7 +535,7 @@ func TestSentReceivedMetrics(t *testing.T) { {tr(`import ws from "k6/ws"; let data = "0123456789".repeat(100); export default function() { - ws.connect("WSBIN_URL/ws-echo", null, function (socket) { + ws.connect("WSBIN_URL/ws-echo-invalid", null, function (socket) { socket.on('open', function open() { socket.send(data); }); @@ -661,7 +661,7 @@ func TestRunTags(t *testing.T) { }) group("websockets", function() { - var response = ws.connect("WSBIN_URL/ws-echo", params, function (socket) { + var response = ws.connect("WSBIN_URL/ws-echo-invalid", params, function (socket) { socket.on('open', function open() { console.log('ws open and say hello'); socket.send("hello"); diff --git a/js/modules/k6/ws/ws.go b/js/modules/k6/ws/ws.go index 96146ca0461..74e99218cba 100644 --- a/js/modules/k6/ws/ws.go +++ b/js/modules/k6/ws/ws.go @@ -262,9 +262,9 @@ func (*WS) Connect(ctx context.Context, url string, args ...goja.Value) (*WSHTTP case readErr := <-readErrChan: socket.handleEvent("error", rt.ToValue(readErr)) - case readClose := <-readCloseChan: + case <-readCloseChan: // handle server close - socket.handleEvent("close", rt.ToValue(readClose)) + socket.Close() case scheduledFn := <-socket.scheduled: if _, err := scheduledFn(goja.Undefined()); err != nil { @@ -436,7 +436,7 @@ func (s *Socket) closeConnection(code int) error { websocket.FormatCloseMessage(code, ""), time.Now().Add(writeWait), ) - if err != nil { + if err != nil && err != websocket.ErrCloseSent { // Just call the handler, we'll try to close the connection anyway s.handleEvent("error", rt.ToValue(err)) } @@ -445,7 +445,7 @@ func (s *Socket) closeConnection(code int) error { s.handleEvent("close", rt.ToValue(code)) _ = s.conn.Close() - // Stops the main control loop + // Stop the main control loop close(s.done) }) @@ -454,22 +454,19 @@ func (s *Socket) closeConnection(code int) error { // Wraps conn.ReadMessage in a channel func readPump(conn *websocket.Conn, readChan chan []byte, errorChan chan error, closeChan chan int) { - defer func() { _ = conn.Close() }() - for { _, message, err := conn.ReadMessage() if err != nil { - - if websocket.IsCloseError(err, websocket.CloseNormalClosure) { - closeChan <- err.(*websocket.CloseError).Code - } else if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) { - // Emit the error if it is not CloseNormalClosure - // and the error is not originated from closing the socket ourselves with `CloseGoingAway` + if websocket.IsUnexpectedCloseError( + err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { + // Report an unexpected closure errorChan <- err } - - //CloseGoingAway errors are ignored - return + code := websocket.CloseGoingAway + if e, ok := err.(*websocket.CloseError); ok { + code = e.Code + } + closeChan <- code } readChan <- message diff --git a/js/modules/k6/ws/ws_test.go b/js/modules/k6/ws/ws_test.go index 731fdf1d8af..bddb2b16dcf 100644 --- a/js/modules/k6/ws/ws_test.go +++ b/js/modules/k6/ws/ws_test.go @@ -22,6 +22,7 @@ package ws import ( "context" "crypto/tls" + "fmt" "strconv" "testing" @@ -116,30 +117,30 @@ func TestSession(t *testing.T) { t.Run("connect_ws", func(t *testing.T) { _, err := common.RunString(rt, sr(` - let res = ws.connect("WSBIN_URL/ws-echo", function(socket){ + let res = ws.connect("WSBIN_URL/ws-echo-invalid", function(socket){ socket.close() }); if (res.status != 101) { throw new Error("connection failed with status: " + res.status); } `)) assert.NoError(t, err) }) - assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), 101, "") + assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo-invalid"), 101, "") t.Run("connect_wss", func(t *testing.T) { _, err := common.RunString(rt, sr(` - let res = ws.connect("WSSBIN_URL/ws-echo", function(socket){ + let res = ws.connect("WSSBIN_URL/ws-echo-invalid", function(socket){ socket.close() }); if (res.status != 101) { throw new Error("TLS connection failed with status: " + res.status); } `)) assert.NoError(t, err) }) - assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSSBIN_URL/ws-echo"), 101, "") + assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSSBIN_URL/ws-echo-invalid"), 101, "") t.Run("open", func(t *testing.T) { _, err := common.RunString(rt, sr(` let opened = false; - let res = ws.connect("WSBIN_URL/ws-echo", function(socket){ + let res = ws.connect("WSBIN_URL/ws-echo-invalid", function(socket){ socket.on("open", function() { opened = true; socket.close() @@ -149,11 +150,11 @@ func TestSession(t *testing.T) { `)) assert.NoError(t, err) }) - assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), 101, "") + assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo-invalid"), 101, "") t.Run("send_receive", func(t *testing.T) { _, err := common.RunString(rt, sr(` - let res = ws.connect("WSBIN_URL/ws-echo", function(socket){ + let res = ws.connect("WSBIN_URL/ws-echo-invalid", function(socket){ socket.on("open", function() { socket.send("test") }) @@ -169,14 +170,14 @@ func TestSession(t *testing.T) { }) samplesBuf := stats.GetBufferedSamples(samples) - assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSBIN_URL/ws-echo"), 101, "") - assertMetricEmitted(t, metrics.WSMessagesSent, samplesBuf, sr("WSBIN_URL/ws-echo")) - assertMetricEmitted(t, metrics.WSMessagesReceived, samplesBuf, sr("WSBIN_URL/ws-echo")) + assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSBIN_URL/ws-echo-invalid"), 101, "") + assertMetricEmitted(t, metrics.WSMessagesSent, samplesBuf, sr("WSBIN_URL/ws-echo-invalid")) + assertMetricEmitted(t, metrics.WSMessagesReceived, samplesBuf, sr("WSBIN_URL/ws-echo-invalid")) t.Run("interval", func(t *testing.T) { _, err := common.RunString(rt, sr(` let counter = 0; - let res = ws.connect("WSBIN_URL/ws-echo", function(socket){ + let res = ws.connect("WSBIN_URL/ws-echo-invalid", function(socket){ socket.setInterval(function () { counter += 1; if (counter > 2) { socket.close(); } @@ -186,13 +187,13 @@ func TestSession(t *testing.T) { `)) assert.NoError(t, err) }) - assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), 101, "") + assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo-invalid"), 101, "") t.Run("timeout", func(t *testing.T) { _, err := common.RunString(rt, sr(` let start = new Date().getTime(); let ellapsed = new Date().getTime() - start; - let res = ws.connect("WSBIN_URL/ws-echo", function(socket){ + let res = ws.connect("WSBIN_URL/ws-echo-invalid", function(socket){ socket.setTimeout(function () { ellapsed = new Date().getTime() - start; socket.close(); @@ -204,12 +205,12 @@ func TestSession(t *testing.T) { `)) assert.NoError(t, err) }) - assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), 101, "") + assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo-invalid"), 101, "") t.Run("ping", func(t *testing.T) { _, err := common.RunString(rt, sr(` let pongReceived = false; - let res = ws.connect("WSBIN_URL/ws-echo", function(socket){ + let res = ws.connect("WSBIN_URL/ws-echo-invalid", function(socket){ socket.on("open", function(data) { socket.ping(); }); @@ -227,15 +228,15 @@ func TestSession(t *testing.T) { }) samplesBuf = stats.GetBufferedSamples(samples) - assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSBIN_URL/ws-echo"), 101, "") - assertMetricEmitted(t, metrics.WSPing, samplesBuf, sr("WSBIN_URL/ws-echo")) + assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSBIN_URL/ws-echo-invalid"), 101, "") + assertMetricEmitted(t, metrics.WSPing, samplesBuf, sr("WSBIN_URL/ws-echo-invalid")) t.Run("multiple_handlers", func(t *testing.T) { _, err := common.RunString(rt, sr(` let pongReceived = false; let otherPongReceived = false; - let res = ws.connect("WSBIN_URL/ws-echo", function(socket){ + let res = ws.connect("WSBIN_URL/ws-echo-invalid", function(socket){ socket.on("open", function(data) { socket.ping(); }); @@ -261,13 +262,13 @@ func TestSession(t *testing.T) { }) samplesBuf = stats.GetBufferedSamples(samples) - assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSBIN_URL/ws-echo"), 101, "") - assertMetricEmitted(t, metrics.WSPing, samplesBuf, sr("WSBIN_URL/ws-echo")) + assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSBIN_URL/ws-echo-invalid"), 101, "") + assertMetricEmitted(t, metrics.WSPing, samplesBuf, sr("WSBIN_URL/ws-echo-invalid")) - t.Run("close", func(t *testing.T) { + t.Run("client_close", func(t *testing.T) { _, err := common.RunString(rt, sr(` let closed = false; - let res = ws.connect("WSBIN_URL/ws-echo", function(socket){ + let res = ws.connect("WSBIN_URL/ws-echo-invalid", function(socket){ socket.on("open", function() { socket.close() }) @@ -279,7 +280,38 @@ func TestSession(t *testing.T) { `)) assert.NoError(t, err) }) - assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), 101, "") + + serverCloseTests := []struct { + name string + endpoint string + }{ + {"server_close_ok", "/ws-echo"}, + // Ensure we correctly handle invalid WS server + // implementations that close the connection prematurely + // without sending a close control frame first. + {"server_close_invalid", "/ws-close-invalid"}, + } + + for _, tc := range serverCloseTests { + tc := tc + t.Run(tc.name, func(t *testing.T) { + _, err := common.RunString(rt, sr(fmt.Sprintf(` + let closed = false; + let res = ws.connect("WSBIN_URL%s", function(socket){ + socket.on("open", function() { + socket.send("test"); + }) + socket.on("close", function() { + closed = true; + }) + }); + if (!closed) { throw new Error ("close event not fired"); } + `, tc.endpoint))) + assert.NoError(t, err) + }) + } + + assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo-invalid"), 101, "") } func TestErrors(t *testing.T) { @@ -332,7 +364,7 @@ func TestErrors(t *testing.T) { t.Run("error_in_setup", func(t *testing.T) { _, err := common.RunString(rt, sr(` - let res = ws.connect("WSBIN_URL/ws-echo", function(socket){ + let res = ws.connect("WSBIN_URL/ws-echo-invalid", function(socket){ throw new Error("error in setup"); }); `)) @@ -342,7 +374,7 @@ func TestErrors(t *testing.T) { t.Run("send_after_close", func(t *testing.T) { _, err := common.RunString(rt, sr(` let hasError = false; - let res = ws.connect("WSBIN_URL/ws-echo", function(socket){ + let res = ws.connect("WSBIN_URL/ws-echo-invalid", function(socket){ socket.on("open", function() { socket.close(); socket.send("test"); @@ -357,13 +389,13 @@ func TestErrors(t *testing.T) { } `)) assert.NoError(t, err) - assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), 101, "") + assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo-invalid"), 101, "") }) t.Run("error on close", func(t *testing.T) { _, err := common.RunString(rt, sr(` var closed = false; - let res = ws.connect("WSBIN_URL/ws-close", function(socket){ + let res = ws.connect("WSBIN_URL/ws-close-invalid", function(socket){ socket.on('open', function open() { socket.setInterval(function timeout() { socket.ping(); @@ -386,7 +418,7 @@ func TestErrors(t *testing.T) { }); `)) assert.NoError(t, err) - assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-close"), 101, "") + assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-close-invalid"), 101, "") }) } @@ -426,7 +458,7 @@ func TestSystemTags(t *testing.T) { t.Run("only "+expectedTag, func(t *testing.T) { state.Options.SystemTags = stats.ToSystemTagSet([]string{expectedTag}) _, err := common.RunString(rt, sr(` - let res = ws.connect("WSBIN_URL/ws-echo", function(socket){ + let res = ws.connect("WSBIN_URL/ws-echo-invalid", function(socket){ socket.on("open", function() { socket.send("test") }) @@ -490,20 +522,20 @@ func TestTLSConfig(t *testing.T) { } _, err := common.RunString(rt, sr(` - let res = ws.connect("WSSBIN_URL/ws-close", function(socket){ + let res = ws.connect("WSSBIN_URL/ws-close-invalid", function(socket){ socket.close() }); if (res.status != 101) { throw new Error("TLS connection failed with status: " + res.status); } `)) assert.NoError(t, err) }) - assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSSBIN_URL/ws-close"), 101, "") + assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSSBIN_URL/ws-close-invalid"), 101, "") t.Run("custom certificates", func(t *testing.T) { state.TLSConfig = tb.TLSClientConfig _, err := common.RunString(rt, sr(` - let res = ws.connect("WSSBIN_URL/ws-close", function(socket){ + let res = ws.connect("WSSBIN_URL/ws-close-invalid", function(socket){ socket.close() }); if (res.status != 101) { @@ -512,5 +544,5 @@ func TestTLSConfig(t *testing.T) { `)) assert.NoError(t, err) }) - assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSSBIN_URL/ws-close"), 101, "") + assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSSBIN_URL/ws-close-invalid"), 101, "") } diff --git a/js/runner_test.go b/js/runner_test.go index 25e2c91bd21..e4a33b0c7b6 100644 --- a/js/runner_test.go +++ b/js/runner_test.go @@ -1403,7 +1403,7 @@ func TestStuffNotPanicking(t *testing.T) { () => doc.find('p').each("wat"), () => doc.find('p').map(), () => doc.find('p').map("wat"), - () => ws.connect("WSBIN_URL/ws-echo"), + () => ws.connect("WSBIN_URL/ws-echo-invalid"), ]; testCases.forEach(function(fn, idx) { diff --git a/lib/testutils/httpmultibin/httpmultibin.go b/lib/testutils/httpmultibin/httpmultibin.go index 3228eac9513..c1aa1218c24 100644 --- a/lib/testutils/httpmultibin/httpmultibin.go +++ b/lib/testutils/httpmultibin/httpmultibin.go @@ -100,32 +100,42 @@ type jsonBody struct { Compression string `json:"compression"` } -func websocketEchoHandler(w http.ResponseWriter, req *http.Request) { - conn, err := (&websocket.Upgrader{}).Upgrade(w, req, w.Header()) - if err != nil { - return - } - - mt, message, err := conn.ReadMessage() - if err != nil { - return - } - err = conn.WriteMessage(mt, message) - if err != nil { - return - } - err = conn.Close() - if err != nil { - return - } -} - -func websocketCloserHandler(w http.ResponseWriter, req *http.Request) { - conn, err := (&websocket.Upgrader{}).Upgrade(w, req, w.Header()) - if err != nil { - return - } - _ = conn.Close() +func getWebsocketHandler(echo bool, closePrematurely bool) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + conn, err := (&websocket.Upgrader{}).Upgrade(w, req, w.Header()) + if err != nil { + return + } + if echo { + messageType, r, e := conn.NextReader() + if e != nil { + return + } + var wc io.WriteCloser + wc, err = conn.NextWriter(messageType) + if err != nil { + return + } + if _, err = io.Copy(wc, r); err != nil { + return + } + if err = wc.Close(); err != nil { + return + } + } + // closePrematurely=true mimics an invalid WS server that doesn't + // send a close control frame before closing the connection. + if !closePrematurely { + closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "") + _ = conn.WriteControl(websocket.CloseMessage, closeMsg, time.Now().Add(time.Second)) + // Wait for response control frame + <-time.After(time.Second) + } + err = conn.Close() + if err != nil { + return + } + }) } func writeJSON(w io.Writer, v interface{}) error { @@ -193,8 +203,10 @@ func NewHTTPMultiBin(t testing.TB) *HTTPMultiBin { // Create a http.ServeMux and set the httpbin handler as the default mux := http.NewServeMux() mux.Handle("/brotli", getEncodedHandler(t, httpext.CompressionTypeBr)) - mux.HandleFunc("/ws-echo", websocketEchoHandler) - mux.HandleFunc("/ws-close", websocketCloserHandler) + mux.Handle("/ws-echo", getWebsocketHandler(true, false)) + mux.Handle("/ws-echo-invalid", getWebsocketHandler(true, true)) + mux.Handle("/ws-close", getWebsocketHandler(false, false)) + mux.Handle("/ws-close-invalid", getWebsocketHandler(false, true)) mux.Handle("/zstd", getEncodedHandler(t, httpext.CompressionTypeZstd)) mux.Handle("/zstd-br", getZstdBrHandler(t)) mux.Handle("/", httpbin.New().Handler())