Skip to content

Commit

Permalink
More multi message tests for WebSockets (#2184)
Browse files Browse the repository at this point in the history
* add multi msg tests for ws

* added more assertions

* fixed typo

* refactoring according to suggestions

* small refactoring
  • Loading branch information
cooliscool authored Oct 19, 2021
1 parent f7a1ba1 commit aab12d5
Showing 1 changed file with 160 additions and 26 deletions.
186 changes: 160 additions & 26 deletions js/modules/k6/ws/ws_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"context"
"crypto/tls"
"fmt"
"io"
"net/http"
"net/http/httptest"
"strconv"
Expand All @@ -42,6 +43,8 @@ import (
"go.k6.io/k6/stats"
)

const statusProtocolSwitch = 101

func assertSessionMetricsEmitted(t *testing.T, sampleContainers []stats.SampleContainer, subprotocol, url string, status int, group string) {
seenSessions := false
seenSessionDuration := false
Expand Down Expand Up @@ -71,21 +74,20 @@ func assertSessionMetricsEmitted(t *testing.T, sampleContainers []stats.SampleCo
assert.True(t, seenSessionDuration, "url %s didn't emit SessionDuration", url)
}

func assertMetricEmitted(t *testing.T, metricName string, sampleContainers []stats.SampleContainer, url string) {
seenMetric := false
func assertMetricEmittedCount(t *testing.T, metricName string, sampleContainers []stats.SampleContainer, url string, count int) {
t.Helper()
actualCount := 0

for _, sampleContainer := range sampleContainers {
for _, sample := range sampleContainer.GetSamples() {
surl, ok := sample.Tags.Get("url")
assert.True(t, ok)
if surl == url {
if sample.Metric.Name == metricName {
seenMetric = true
}
if surl == url && sample.Metric.Name == metricName {
actualCount++
}
}
}
assert.True(t, seenMetric, "url %s didn't emit %s", url, metricName)
assert.Equal(t, count, actualCount, "url %s emitted %s %d times, expected was %d times", url, metricName, actualCount, count)
}

func TestSession(t *testing.T) {
Expand Down Expand Up @@ -131,7 +133,7 @@ func TestSession(t *testing.T) {
`))
assert.NoError(t, err)
})
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), 101, "")
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), statusProtocolSwitch, "")

t.Run("connect_wss", func(t *testing.T) {
_, err := rt.RunString(sr(`
Expand All @@ -142,7 +144,7 @@ func TestSession(t *testing.T) {
`))
assert.NoError(t, err)
})
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSSBIN_URL/ws-echo"), 101, "")
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSSBIN_URL/ws-echo"), statusProtocolSwitch, "")

t.Run("open", func(t *testing.T) {
_, err := rt.RunString(sr(`
Expand All @@ -157,15 +159,15 @@ func TestSession(t *testing.T) {
`))
assert.NoError(t, err)
})
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), 101, "")
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), statusProtocolSwitch, "")

t.Run("send_receive", func(t *testing.T) {
_, err := rt.RunString(sr(`
var res = ws.connect("WSBIN_URL/ws-echo", function(socket){
socket.on("open", function() {
socket.send("test")
})
socket.on("message", function (data){
socket.on("message", function (data) {
if (!data=="test") {
throw new Error ("echo'd data doesn't match our message!");
}
Expand All @@ -177,9 +179,9 @@ func TestSession(t *testing.T) {
})

samplesBuf := stats.GetBufferedSamples(samples)
assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSBIN_URL/ws-echo"), 101, "")
assertMetricEmitted(t, metrics.WSMessagesSentName, samplesBuf, sr("WSBIN_URL/ws-echo"))
assertMetricEmitted(t, metrics.WSMessagesReceivedName, samplesBuf, sr("WSBIN_URL/ws-echo"))
assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSBIN_URL/ws-echo"), statusProtocolSwitch, "")
assertMetricEmittedCount(t, metrics.WSMessagesSentName, samplesBuf, sr("WSBIN_URL/ws-echo"), 1)
assertMetricEmittedCount(t, metrics.WSMessagesReceivedName, samplesBuf, sr("WSBIN_URL/ws-echo"), 1)

t.Run("interval", func(t *testing.T) {
_, err := rt.RunString(sr(`
Expand All @@ -194,7 +196,7 @@ func TestSession(t *testing.T) {
`))
assert.NoError(t, err)
})
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), 101, "")
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), statusProtocolSwitch, "")
t.Run("bad interval", func(t *testing.T) {
_, err := rt.RunString(sr(`
var counter = 0;
Expand Down Expand Up @@ -240,7 +242,7 @@ func TestSession(t *testing.T) {
require.Error(t, err)
require.Contains(t, err.Error(), "setTimeout requires a >0 timeout parameter, received 0.00 ")
})
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), 101, "")
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), statusProtocolSwitch, "")

t.Run("ping", func(t *testing.T) {
_, err := rt.RunString(sr(`
Expand All @@ -263,8 +265,8 @@ func TestSession(t *testing.T) {
})

samplesBuf = stats.GetBufferedSamples(samples)
assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSBIN_URL/ws-echo"), 101, "")
assertMetricEmitted(t, metrics.WSPingName, samplesBuf, sr("WSBIN_URL/ws-echo"))
assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSBIN_URL/ws-echo"), statusProtocolSwitch, "")
assertMetricEmittedCount(t, metrics.WSPingName, samplesBuf, sr("WSBIN_URL/ws-echo"), 1)

t.Run("multiple_handlers", func(t *testing.T) {
_, err := rt.RunString(sr(`
Expand Down Expand Up @@ -297,8 +299,8 @@ func TestSession(t *testing.T) {
})

samplesBuf = stats.GetBufferedSamples(samples)
assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSBIN_URL/ws-echo"), 101, "")
assertMetricEmitted(t, metrics.WSPingName, samplesBuf, sr("WSBIN_URL/ws-echo"))
assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSBIN_URL/ws-echo"), statusProtocolSwitch, "")
assertMetricEmittedCount(t, metrics.WSPingName, samplesBuf, sr("WSBIN_URL/ws-echo"), 1)

t.Run("client_close", func(t *testing.T) {
_, err := rt.RunString(sr(`
Expand All @@ -315,7 +317,7 @@ func TestSession(t *testing.T) {
`))
assert.NoError(t, err)
})
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), 101, "")
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo"), statusProtocolSwitch, "")

serverCloseTests := []struct {
name string
Expand Down Expand Up @@ -346,6 +348,138 @@ func TestSession(t *testing.T) {
assert.NoError(t, err)
})
}

t.Run("multi_message", func(t *testing.T) {
t.Parallel()

tb.Mux.HandleFunc("/ws-echo-multi", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
conn, err := (&websocket.Upgrader{}).Upgrade(w, req, w.Header())
if err != nil {
return
}

for {
messageType, r, e := conn.NextReader()
if e != nil {
return
}
var wc io.WriteCloser
wc, err = conn.NextWriter(messageType)
if err != nil {
return
}
if _, err = io.Copy(wc, r); err != nil {
return
}
if err = wc.Close(); err != nil {
return
}
}
}))

t.Run("send_receive_multiple_ws", func(t *testing.T) {
_, err := rt.RunString(sr(`
var msg1 = "test1"
var msg2 = "test2"
var msg3 = "test3"
var allMsgsRecvd = false
var res = ws.connect("WSBIN_URL/ws-echo-multi", (socket) => {
socket.on("open", () => {
socket.send(msg1)
})
socket.on("message", (data) => {
if (data == msg1){
socket.send(msg2)
}
if (data == msg2){
socket.send(msg3)
}
if (data == msg3){
allMsgsRecvd = true
socket.close()
}
});
});
if (!allMsgsRecvd) {
throw new Error ("messages 1,2,3 in sequence, was not received from server");
}
`))
assert.NoError(t, err)
})

samplesBuf = stats.GetBufferedSamples(samples)
assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSBIN_URL/ws-echo-multi"), statusProtocolSwitch, "")
assertMetricEmittedCount(t, metrics.WSMessagesSentName, samplesBuf, sr("WSBIN_URL/ws-echo-multi"), 3)
assertMetricEmittedCount(t, metrics.WSMessagesReceivedName, samplesBuf, sr("WSBIN_URL/ws-echo-multi"), 3)

t.Run("send_receive_multiple_wss", func(t *testing.T) {
_, err := rt.RunString(sr(`
var msg1 = "test1"
var msg2 = "test2"
var secondMsgReceived = false
var res = ws.connect("WSSBIN_URL/ws-echo-multi", (socket) => {
socket.on("open", () => {
socket.send(msg1)
})
socket.on("message", (data) => {
if (data == msg1){
socket.send(msg2)
}
if (data == msg2){
secondMsgReceived = true
socket.close()
}
});
});
if (!secondMsgReceived) {
throw new Error ("second test message was not received from server!");
}
`))
assert.NoError(t, err)
})

samplesBuf = stats.GetBufferedSamples(samples)
assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSSBIN_URL/ws-echo-multi"), statusProtocolSwitch, "")
assertMetricEmittedCount(t, metrics.WSMessagesSentName, samplesBuf, sr("WSSBIN_URL/ws-echo-multi"), 2)
assertMetricEmittedCount(t, metrics.WSMessagesReceivedName, samplesBuf, sr("WSSBIN_URL/ws-echo-multi"), 2)

t.Run("send_receive_text_binary", func(t *testing.T) {
_, err := rt.RunString(sr(`
var msg1 = "test1"
var msg2 = new Uint8Array([116, 101, 115, 116, 50]); // 'test2'
var secondMsgReceived = false
var res = ws.connect("WSBIN_URL/ws-echo-multi", (socket) => {
socket.on("open", () => {
socket.send(msg1)
})
socket.on("message", (data) => {
if (data == msg1){
socket.sendBinary(msg2.buffer)
}
});
socket.on("binaryMessage", (data) => {
let data2 = new Uint8Array(data)
if(JSON.stringify(msg2) == JSON.stringify(data2)){
secondMsgReceived = true
}
socket.close()
})
});
if (!secondMsgReceived) {
throw new Error ("second test message was not received from server!");
}
`))
assert.NoError(t, err)
})

samplesBuf = stats.GetBufferedSamples(samples)
assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSBIN_URL/ws-echo-multi"), statusProtocolSwitch, "")
assertMetricEmittedCount(t, metrics.WSMessagesSentName, samplesBuf, sr("WSBIN_URL/ws-echo-multi"), 2)
assertMetricEmittedCount(t, metrics.WSMessagesReceivedName, samplesBuf, sr("WSBIN_URL/ws-echo-multi"), 2)
})
}

func TestSocketSendBinary(t *testing.T) { //nolint: tparallel
Expand Down Expand Up @@ -519,7 +653,7 @@ func TestErrors(t *testing.T) {
}
`))
assert.NoError(t, err)
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo-invalid"), 101, "")
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo-invalid"), statusProtocolSwitch, "")
})

t.Run("error on close", func(t *testing.T) {
Expand Down Expand Up @@ -548,7 +682,7 @@ func TestErrors(t *testing.T) {
});
`))
assert.NoError(t, err)
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-close"), 101, "")
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-close"), statusProtocolSwitch, "")
})
}

Expand Down Expand Up @@ -659,7 +793,7 @@ func TestTLSConfig(t *testing.T) {
`))
assert.NoError(t, err)
})
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSSBIN_URL/ws-close"), 101, "")
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSSBIN_URL/ws-close"), statusProtocolSwitch, "")

t.Run("custom certificates", func(t *testing.T) {
state.TLSConfig = tb.TLSClientConfig
Expand All @@ -674,7 +808,7 @@ func TestTLSConfig(t *testing.T) {
`))
assert.NoError(t, err)
})
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSSBIN_URL/ws-close"), 101, "")
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSSBIN_URL/ws-close"), statusProtocolSwitch, "")
}

func TestReadPump(t *testing.T) {
Expand Down Expand Up @@ -800,5 +934,5 @@ func TestUserAgent(t *testing.T) {
`))
assert.NoError(t, err)

assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo-useragent"), 101, "")
assertSessionMetricsEmitted(t, stats.GetBufferedSamples(samples), "", sr("WSBIN_URL/ws-echo-useragent"), statusProtocolSwitch, "")
}

0 comments on commit aab12d5

Please sign in to comment.