From aab12d55f5e49e6ca4866fb598e13c6f391d05ca Mon Sep 17 00:00:00 2001 From: Ajmal Moochingal Date: Tue, 19 Oct 2021 18:01:52 +0530 Subject: [PATCH] More multi message tests for WebSockets (#2184) * add multi msg tests for ws * added more assertions * fixed typo * refactoring according to suggestions * small refactoring --- js/modules/k6/ws/ws_test.go | 186 +++++++++++++++++++++++++++++++----- 1 file changed, 160 insertions(+), 26 deletions(-) diff --git a/js/modules/k6/ws/ws_test.go b/js/modules/k6/ws/ws_test.go index 27ec5499e5a..fc147a1e071 100644 --- a/js/modules/k6/ws/ws_test.go +++ b/js/modules/k6/ws/ws_test.go @@ -23,6 +23,7 @@ import ( "context" "crypto/tls" "fmt" + "io" "net/http" "net/http/httptest" "strconv" @@ -42,6 +43,8 @@ import ( "go.k6.io/k6/stats" ) +const statusProtocolSwitch = 101 + func assertSessionMetricsEmitted(t *testing.T, sampleContainers []stats.SampleContainer, subprotocol, url string, status int, group string) { seenSessions := false seenSessionDuration := false @@ -71,21 +74,20 @@ func assertSessionMetricsEmitted(t *testing.T, sampleContainers []stats.SampleCo assert.True(t, seenSessionDuration, "url %s didn't emit SessionDuration", url) } -func assertMetricEmitted(t *testing.T, metricName string, sampleContainers []stats.SampleContainer, url string) { - seenMetric := false +func assertMetricEmittedCount(t *testing.T, metricName string, sampleContainers []stats.SampleContainer, url string, count int) { + t.Helper() + actualCount := 0 for _, sampleContainer := range sampleContainers { for _, sample := range sampleContainer.GetSamples() { surl, ok := sample.Tags.Get("url") assert.True(t, ok) - if surl == url { - if sample.Metric.Name == metricName { - seenMetric = true - } + if surl == url && sample.Metric.Name == metricName { + actualCount++ } } } - assert.True(t, seenMetric, "url %s didn't emit %s", url, metricName) + assert.Equal(t, count, actualCount, "url %s emitted %s %d times, expected was %d times", url, metricName, actualCount, count) } func TestSession(t *testing.T) { @@ -131,7 +133,7 @@ func TestSession(t *testing.T) { `)) assert.NoError(t, err) }) - assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), 101, "") + assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), statusProtocolSwitch, "") t.Run("connect_wss", func(t *testing.T) { _, err := rt.RunString(sr(` @@ -142,7 +144,7 @@ func TestSession(t *testing.T) { `)) assert.NoError(t, err) }) - assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSSBIN_URL/ws-echo"), 101, "") + assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSSBIN_URL/ws-echo"), statusProtocolSwitch, "") t.Run("open", func(t *testing.T) { _, err := rt.RunString(sr(` @@ -157,7 +159,7 @@ func TestSession(t *testing.T) { `)) assert.NoError(t, err) }) - assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), 101, "") + assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), statusProtocolSwitch, "") t.Run("send_receive", func(t *testing.T) { _, err := rt.RunString(sr(` @@ -165,7 +167,7 @@ func TestSession(t *testing.T) { socket.on("open", function() { socket.send("test") }) - socket.on("message", function (data){ + socket.on("message", function (data) { if (!data=="test") { throw new Error ("echo'd data doesn't match our message!"); } @@ -177,9 +179,9 @@ func TestSession(t *testing.T) { }) samplesBuf := stats.GetBufferedSamples(samples) - assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSBIN_URL/ws-echo"), 101, "") - assertMetricEmitted(t, metrics.WSMessagesSentName, samplesBuf, sr("WSBIN_URL/ws-echo")) - assertMetricEmitted(t, metrics.WSMessagesReceivedName, samplesBuf, sr("WSBIN_URL/ws-echo")) + assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSBIN_URL/ws-echo"), statusProtocolSwitch, "") + assertMetricEmittedCount(t, metrics.WSMessagesSentName, samplesBuf, sr("WSBIN_URL/ws-echo"), 1) + assertMetricEmittedCount(t, metrics.WSMessagesReceivedName, samplesBuf, sr("WSBIN_URL/ws-echo"), 1) t.Run("interval", func(t *testing.T) { _, err := rt.RunString(sr(` @@ -194,7 +196,7 @@ func TestSession(t *testing.T) { `)) assert.NoError(t, err) }) - assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), 101, "") + assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), statusProtocolSwitch, "") t.Run("bad interval", func(t *testing.T) { _, err := rt.RunString(sr(` var counter = 0; @@ -240,7 +242,7 @@ func TestSession(t *testing.T) { require.Error(t, err) require.Contains(t, err.Error(), "setTimeout requires a >0 timeout parameter, received 0.00 ") }) - assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), 101, "") + assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), statusProtocolSwitch, "") t.Run("ping", func(t *testing.T) { _, err := rt.RunString(sr(` @@ -263,8 +265,8 @@ func TestSession(t *testing.T) { }) samplesBuf = stats.GetBufferedSamples(samples) - assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSBIN_URL/ws-echo"), 101, "") - assertMetricEmitted(t, metrics.WSPingName, samplesBuf, sr("WSBIN_URL/ws-echo")) + assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSBIN_URL/ws-echo"), statusProtocolSwitch, "") + assertMetricEmittedCount(t, metrics.WSPingName, samplesBuf, sr("WSBIN_URL/ws-echo"), 1) t.Run("multiple_handlers", func(t *testing.T) { _, err := rt.RunString(sr(` @@ -297,8 +299,8 @@ func TestSession(t *testing.T) { }) samplesBuf = stats.GetBufferedSamples(samples) - assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSBIN_URL/ws-echo"), 101, "") - assertMetricEmitted(t, metrics.WSPingName, samplesBuf, sr("WSBIN_URL/ws-echo")) + assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSBIN_URL/ws-echo"), statusProtocolSwitch, "") + assertMetricEmittedCount(t, metrics.WSPingName, samplesBuf, sr("WSBIN_URL/ws-echo"), 1) t.Run("client_close", func(t *testing.T) { _, err := rt.RunString(sr(` @@ -315,7 +317,7 @@ func TestSession(t *testing.T) { `)) assert.NoError(t, err) }) - assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), 101, "") + assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), statusProtocolSwitch, "") serverCloseTests := []struct { name string @@ -346,6 +348,138 @@ func TestSession(t *testing.T) { assert.NoError(t, err) }) } + + t.Run("multi_message", func(t *testing.T) { + t.Parallel() + + tb.Mux.HandleFunc("/ws-echo-multi", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + conn, err := (&websocket.Upgrader{}).Upgrade(w, req, w.Header()) + if err != nil { + return + } + + for { + messageType, r, e := conn.NextReader() + if e != nil { + return + } + var wc io.WriteCloser + wc, err = conn.NextWriter(messageType) + if err != nil { + return + } + if _, err = io.Copy(wc, r); err != nil { + return + } + if err = wc.Close(); err != nil { + return + } + } + })) + + t.Run("send_receive_multiple_ws", func(t *testing.T) { + _, err := rt.RunString(sr(` + var msg1 = "test1" + var msg2 = "test2" + var msg3 = "test3" + var allMsgsRecvd = false + var res = ws.connect("WSBIN_URL/ws-echo-multi", (socket) => { + socket.on("open", () => { + socket.send(msg1) + }) + socket.on("message", (data) => { + if (data == msg1){ + socket.send(msg2) + } + if (data == msg2){ + socket.send(msg3) + } + if (data == msg3){ + allMsgsRecvd = true + socket.close() + } + }); + }); + + if (!allMsgsRecvd) { + throw new Error ("messages 1,2,3 in sequence, was not received from server"); + } + `)) + assert.NoError(t, err) + }) + + samplesBuf = stats.GetBufferedSamples(samples) + assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSBIN_URL/ws-echo-multi"), statusProtocolSwitch, "") + assertMetricEmittedCount(t, metrics.WSMessagesSentName, samplesBuf, sr("WSBIN_URL/ws-echo-multi"), 3) + assertMetricEmittedCount(t, metrics.WSMessagesReceivedName, samplesBuf, sr("WSBIN_URL/ws-echo-multi"), 3) + + t.Run("send_receive_multiple_wss", func(t *testing.T) { + _, err := rt.RunString(sr(` + var msg1 = "test1" + var msg2 = "test2" + var secondMsgReceived = false + var res = ws.connect("WSSBIN_URL/ws-echo-multi", (socket) => { + socket.on("open", () => { + socket.send(msg1) + }) + socket.on("message", (data) => { + if (data == msg1){ + socket.send(msg2) + } + if (data == msg2){ + secondMsgReceived = true + socket.close() + } + }); + }); + + if (!secondMsgReceived) { + throw new Error ("second test message was not received from server!"); + } + `)) + assert.NoError(t, err) + }) + + samplesBuf = stats.GetBufferedSamples(samples) + assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSSBIN_URL/ws-echo-multi"), statusProtocolSwitch, "") + assertMetricEmittedCount(t, metrics.WSMessagesSentName, samplesBuf, sr("WSSBIN_URL/ws-echo-multi"), 2) + assertMetricEmittedCount(t, metrics.WSMessagesReceivedName, samplesBuf, sr("WSSBIN_URL/ws-echo-multi"), 2) + + t.Run("send_receive_text_binary", func(t *testing.T) { + _, err := rt.RunString(sr(` + var msg1 = "test1" + var msg2 = new Uint8Array([116, 101, 115, 116, 50]); // 'test2' + var secondMsgReceived = false + var res = ws.connect("WSBIN_URL/ws-echo-multi", (socket) => { + socket.on("open", () => { + socket.send(msg1) + }) + socket.on("message", (data) => { + if (data == msg1){ + socket.sendBinary(msg2.buffer) + } + }); + socket.on("binaryMessage", (data) => { + let data2 = new Uint8Array(data) + if(JSON.stringify(msg2) == JSON.stringify(data2)){ + secondMsgReceived = true + } + socket.close() + }) + }); + + if (!secondMsgReceived) { + throw new Error ("second test message was not received from server!"); + } + `)) + assert.NoError(t, err) + }) + + samplesBuf = stats.GetBufferedSamples(samples) + assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSBIN_URL/ws-echo-multi"), statusProtocolSwitch, "") + assertMetricEmittedCount(t, metrics.WSMessagesSentName, samplesBuf, sr("WSBIN_URL/ws-echo-multi"), 2) + assertMetricEmittedCount(t, metrics.WSMessagesReceivedName, samplesBuf, sr("WSBIN_URL/ws-echo-multi"), 2) + }) } func TestSocketSendBinary(t *testing.T) { //nolint: tparallel @@ -519,7 +653,7 @@ func TestErrors(t *testing.T) { } `)) assert.NoError(t, err) - assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo-invalid"), 101, "") + assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo-invalid"), statusProtocolSwitch, "") }) t.Run("error on close", func(t *testing.T) { @@ -548,7 +682,7 @@ func TestErrors(t *testing.T) { }); `)) assert.NoError(t, err) - assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-close"), 101, "") + assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-close"), statusProtocolSwitch, "") }) } @@ -659,7 +793,7 @@ func TestTLSConfig(t *testing.T) { `)) assert.NoError(t, err) }) - assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSSBIN_URL/ws-close"), 101, "") + assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSSBIN_URL/ws-close"), statusProtocolSwitch, "") t.Run("custom certificates", func(t *testing.T) { state.TLSConfig = tb.TLSClientConfig @@ -674,7 +808,7 @@ func TestTLSConfig(t *testing.T) { `)) assert.NoError(t, err) }) - assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSSBIN_URL/ws-close"), 101, "") + assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSSBIN_URL/ws-close"), statusProtocolSwitch, "") } func TestReadPump(t *testing.T) { @@ -800,5 +934,5 @@ func TestUserAgent(t *testing.T) { `)) assert.NoError(t, err) - assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo-useragent"), 101, "") + assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo-useragent"), statusProtocolSwitch, "") }