Skip to content

Commit

Permalink
http: add asyncRequest
Browse files Browse the repository at this point in the history
This is implementation of http.request, but it returns a promise and
does all the networking off the event loop.

The code is pretty simple and seems fairly safe given that most possible
race conditions would also happen in the case of
`http.batch`.

Even with that there is at least one test that previously couldn't have
happened with http.batch.

closes #2825
  • Loading branch information
mstoykov committed Jan 26, 2023
1 parent 1d99b0b commit 9d72974
Show file tree
Hide file tree
Showing 4 changed files with 326 additions and 5 deletions.
252 changes: 252 additions & 0 deletions js/modules/k6/http/async_request_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
package http

import (
"testing"
"time"

"github.com/sirupsen/logrus"
logtest "github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.k6.io/k6/js/modulestest"
)

func wrapInAsyncLambda(input string) string {
// This makes it possible to use `await` freely on the "top" level
return "(async () => {\n " + input + "\n })()"
}

func runOnEventLoop(runtime *modulestest.Runtime, code string) error {
err := runtime.EventLoop.Start(func() error {
_, err := runtime.VU.Runtime().RunString(wrapInAsyncLambda(code))
return err
})
runtime.EventLoop.WaitOnRegistered()
return err
}

func TestAsyncRequest(t *testing.T) {
t.Parallel()
t.Run("HTTPRequest", func(t *testing.T) {
t.Parallel()
t.Run("EmptyBody", func(t *testing.T) {
t.Parallel()
ts := newTestCase(t)

sr := func(input string) string {
return ts.tb.Replacer.Replace(wrapInAsyncLambda(input))
}
_, err := ts.runtime.VU.Runtime().RunString(sr(`
var reqUrl = "HTTPBIN_URL/cookies"
var res = http.get(reqUrl);
var jar = new http.CookieJar();
jar.set("HTTPBIN_URL/cookies", "key", "value");
res = await http.asyncRequest("GET", "HTTPBIN_URL/cookies", null, { cookies: { key2: "value2" }, jar: jar });
if (res.json().key != "value") { throw new Error("wrong cookie value: " + res.json().key); }
if (res.status != 200) { throw new Error("wrong status: " + res.status); }
if (res.request["method"] !== "GET") { throw new Error("http request method was not \"GET\": " + JSON.stringify(res.request)) }
if (res.request["body"].length != 0) { throw new Error("http request body was not null: " + JSON.stringify(res.request["body"])) }
if (res.request["url"] != reqUrl) {
throw new Error("wrong http request url: " + JSON.stringify(res.request))
}
if (res.request["cookies"]["key2"][0].name != "key2") { throw new Error("wrong http request cookies: " + JSON.stringify(JSON.stringify(res.request["cookies"]["key2"]))) }
if (res.request["headers"]["User-Agent"][0] != "TestUserAgent") { throw new Error("wrong http request headers: " + JSON.stringify(res.request)) }
`))
assert.NoError(t, err)
})
t.Run("NonEmptyBody", func(t *testing.T) {
t.Parallel()
ts := newTestCase(t)

sr := func(input string) string {
return ts.tb.Replacer.Replace(wrapInAsyncLambda(input))
}
_, err := ts.runtime.VU.Runtime().RunString(sr(`
var res = await http.asyncRequest("HTTPBIN_URL/post", {a: "a", b: 2}, {headers: {"Content-Type": "application/x-www-form-urlencoded; charset=utf-8"}});
if (res.status != 200) { throw new Error("wrong status: " + res.status); }
if (res.request["body"] != "a=a&b=2") { throw new Error("http request body was not set properly: " + JSON.stringify(res.request))}
`))
assert.NoError(t, err)
})
})
}

func TestAsyncRequestResponseCallbackRace(t *testing.T) {
// This test is here only to tease out race conditions
t.Parallel()
ts := newTestCase(t)
err := ts.runtime.VU.Runtime().Set("q", func(f func()) {
rg := ts.runtime.EventLoop.RegisterCallback()
time.AfterFunc(time.Millisecond*5, func() {
rg(func() error {
f()
return nil
})
})
})
require.NoError(t, err)
err = ts.runtime.VU.Runtime().Set("log", func(s string) {
// t.Log(s) // uncomment for debugging
})
require.NoError(t, err)
err = runOnEventLoop(ts.runtime, ts.tb.Replacer.Replace(`
let call = (i) => {
log("s"+i)
if (i > 200) { return null; }
http.setResponseCallback(http.expectedStatuses(i))
q(() => call(i+1)) // don't use promises as they resolve before eventloop callbacks such as the one from asyncRequest
}
for (let j = 0; j< 50; j++) {
call(0)
await http.asyncRequest("GET", "HTTPBIN_URL/redirect/20").then(() => log("!!!!!!!!!!!!!!!"+j))
}
`))
require.NoError(t, err)
}

func TestAsyncRequestErrors(t *testing.T) {
// This likely should have a way to do the same for http.request and http.asyncRequest with the same tests
t.Parallel()
t.Run("Invalid", func(t *testing.T) {
t.Parallel()
t.Run("unsupported protocol", func(t *testing.T) {
t.Parallel()
ts := newTestCase(t)
state := ts.runtime.VU.State()

hook := logtest.NewLocal(state.Logger)
defer hook.Reset()

err := runOnEventLoop(ts.runtime, `await http.asyncRequest("", "");`)
require.Error(t, err)
assert.Contains(t, err.Error(), "unsupported protocol scheme")

logEntry := hook.LastEntry()
assert.Nil(t, logEntry)
})

t.Run("throw=false", func(t *testing.T) {
t.Parallel()
ts := newTestCase(t)
state := ts.runtime.VU.State()
hook := logtest.NewLocal(state.Logger)
defer hook.Reset()

err := runOnEventLoop(ts.runtime, `
var res = await http.asyncRequest("GET", "some://example.com", null, { throw: false });
if (res.error.search('unsupported protocol scheme "some"') == -1) {
throw new Error("wrong error:" + res.error);
}
throw new Error("another error");
`)
require.ErrorContains(t, err, "another error")

logEntry := hook.LastEntry()
if assert.NotNil(t, logEntry) {
assert.Equal(t, logrus.WarnLevel, logEntry.Level)
err, ok := logEntry.Data["error"].(error)
require.True(t, ok)
assert.ErrorContains(t, err, "unsupported protocol scheme")
assert.Equal(t, "Request Failed", logEntry.Message)
}
})
})
t.Run("InvalidURL", func(t *testing.T) {
t.Parallel()

expErr := `invalid URL: parse "https:// test.k6.io": invalid character " " in host name`
t.Run("throw=true", func(t *testing.T) {
t.Parallel()
ts := newTestCase(t)

js := `
await http.asyncRequest("GET", "https:// test.k6.io");
throw new Error("whoops!"); // shouldn't be reached
`
err := runOnEventLoop(ts.runtime, js)
require.Error(t, err)
assert.Contains(t, err.Error(), expErr)
})

t.Run("throw=false", func(t *testing.T) {
t.Parallel()
ts := newTestCase(t)
rt := ts.runtime.VU.Runtime()
state := ts.runtime.VU.State()
state.Options.Throw.Bool = false
defer func() { state.Options.Throw.Bool = true }()

hook := logtest.NewLocal(state.Logger)
defer hook.Reset()

js := `
(async function(){
var r = await http.asyncRequest("GET", "https:// test.k6.io");
globalThis.ret = {error: r.error, error_code: r.error_code};
})()
`
err := runOnEventLoop(ts.runtime, js)
require.NoError(t, err)
ret := rt.GlobalObject().Get("ret")
var retobj map[string]interface{}
var ok bool
if retobj, ok = ret.Export().(map[string]interface{}); !ok {
require.Fail(t, "got wrong return object: %#+v", retobj)
}
require.Equal(t, int64(1020), retobj["error_code"])
require.Equal(t, expErr, retobj["error"])

logEntry := hook.LastEntry()
require.NotNil(t, logEntry)
assert.Equal(t, logrus.WarnLevel, logEntry.Level)
err, ok = logEntry.Data["error"].(error)
require.True(t, ok)
assert.ErrorContains(t, err, expErr)
assert.Equal(t, "Request Failed", logEntry.Message)
})

t.Run("throw=false,nopanic", func(t *testing.T) {
t.Parallel()
ts := newTestCase(t)
rt := ts.runtime.VU.Runtime()
state := ts.runtime.VU.State()
state.Options.Throw.Bool = false
defer func() { state.Options.Throw.Bool = true }()

hook := logtest.NewLocal(state.Logger)
defer hook.Reset()

js := `
(async function(){
var r = await http.asyncRequest("GET", "https:// test.k6.io");
r.html();
r.json();
globalThis.ret = r.error_code; // not reached because of json()
})()
`
err := runOnEventLoop(ts.runtime, js)
ret := rt.GlobalObject().Get("ret")
require.Error(t, err)
assert.Nil(t, ret)
assert.Contains(t, err.Error(), "unexpected end of JSON input")

logEntry := hook.LastEntry()
require.NotNil(t, logEntry)
assert.Equal(t, logrus.WarnLevel, logEntry.Level)
err, ok := logEntry.Data["error"].(error)
require.True(t, ok)
assert.ErrorContains(t, err, expErr)
assert.Equal(t, "Request Failed", logEntry.Message)
})
})

t.Run("Unroutable", func(t *testing.T) {
t.Parallel()
ts := newTestCase(t)
err := runOnEventLoop(ts.runtime, `await http.asyncRequest("GET", "http://sdafsgdhfjg/");`)
assert.Error(t, err)
})
}
1 change: 1 addition & 0 deletions js/modules/k6/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func (r *RootModule) NewModuleInstance(vu modules.VU) modules.Instance {
mustExport("del", mi.defaultClient.getMethodClosure(http.MethodDelete))
mustExport("options", mi.defaultClient.getMethodClosure(http.MethodOptions))
mustExport("request", mi.defaultClient.Request)
mustExport("asyncRequest", mi.defaultClient.asyncRequest)
mustExport("batch", mi.defaultClient.Batch)
mustExport("setResponseCallback", mi.defaultClient.SetResponseCallback)

Expand Down
56 changes: 56 additions & 0 deletions js/modules/k6/http/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,61 @@ func (c *Client) Request(method string, url goja.Value, args ...goja.Value) (*Re
return c.responseFromHTTPext(resp), nil
}

// Request makes an http request of the provided `method` and returns a corresponding response by
// taking goja.Values as arguments
func (c *Client) asyncRequest(method string, url goja.Value, args ...goja.Value) (*goja.Promise, error) {
state := c.moduleInstance.vu.State()
if state == nil {
return nil, ErrHTTPForbiddenInInitContext
}

var body interface{}
var params goja.Value

if len(args) > 0 {
body = args[0].Export()
}
if len(args) > 1 {
params = args[1]
}

rt := c.moduleInstance.vu.Runtime()
req, err := c.parseRequest(method, url, body, params)
if err != nil {
if state.Options.Throw.Bool {
return nil, err
}
state.Logger.WithField("error", err).Warn("Request Failed")
r := httpext.NewResponse()
r.Error = err.Error()
var k6e httpext.K6Error
if errors.As(err, &k6e) {
r.ErrorCode = int(k6e.Code)
}
p, resolve, _ := rt.NewPromise()
resolve(&Response{Response: r, client: c})
return p, nil
}

p, resolve, reject := rt.NewPromise()
callback := c.moduleInstance.vu.RegisterCallback()

go func() {
resp, err := httpext.MakeRequest(c.moduleInstance.vu.Context(), state, req)
callback(func() error {
if err != nil {
_ = reject
return err // maybe reject
}
c.processResponse(resp, req.ResponseType)
resolve(c.responseFromHTTPext(resp))
return nil
})
}()

return p, nil
}

// processResponse stores the body as an ArrayBuffer if indicated by
// respType. This is done here instead of in httpext.readResponseBody to avoid
// a reverse dependency on js/common or goja.
Expand All @@ -87,6 +142,7 @@ func (c *Client) responseFromHTTPext(resp *httpext.Response) *Response {
}

// TODO: break this function up
//
//nolint:gocyclo, cyclop, funlen, gocognit
func (c *Client) parseRequest(
method string, reqURL, body interface{}, params goja.Value,
Expand Down
22 changes: 17 additions & 5 deletions js/modules/k6/http/response_callback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package http
import (
"fmt"
"sort"
"strings"
"testing"

"github.com/dop251/goja"
Expand Down Expand Up @@ -85,7 +86,6 @@ func TestResponseCallbackInAction(t *testing.T) {
ts := newTestCase(t)
tb := ts.tb
samples := ts.samples
rt := ts.runtime.VU.Runtime()

sr := tb.Replacer.Replace

Expand Down Expand Up @@ -138,7 +138,7 @@ func TestResponseCallbackInAction(t *testing.T) {
"overwrite per request": {
code: `
http.setResponseCallback(http.expectedStatuses(200));
res = http.request("GET", "HTTPBIN_URL/redirect/1");
http.request("GET", "HTTPBIN_URL/redirect/1");
`,
expectedSamples: []expectedSample{
{
Expand Down Expand Up @@ -227,7 +227,7 @@ func TestResponseCallbackInAction(t *testing.T) {
"global overwrite with null": {
code: `
http.setResponseCallback(null);
res = http.request("GET", "HTTPBIN_URL/redirect/1");
http.request("GET", "HTTPBIN_URL/redirect/1");
`,
expectedSamples: []expectedSample{
{
Expand Down Expand Up @@ -257,10 +257,16 @@ func TestResponseCallbackInAction(t *testing.T) {
}
for name, testCase := range testCases {
testCase := testCase
t.Run(name, func(t *testing.T) {

runCode := func(code string) {
t.Helper()
ts.instance.defaultClient.responseCallback = defaultExpectedStatuses.match

_, err := rt.RunString(sr(testCase.code))
err := ts.runtime.EventLoop.Start(func() error {
_, err := ts.runtime.VU.Runtime().RunString(sr(code))
return err
})
ts.runtime.EventLoop.WaitOnRegistered()
assert.NoError(t, err)
bufSamples := metrics.GetBufferedSamples(samples)

Expand All @@ -278,6 +284,12 @@ func TestResponseCallbackInAction(t *testing.T) {
for i, expectedSample := range testCase.expectedSamples {
assertRequestMetricsEmittedSingle(t, bufSamples[i], expectedSample.tags, expectedSample.metrics, nil)
}
}
t.Run(name, func(t *testing.T) {
runCode(testCase.code)
})
t.Run("async_"+name, func(t *testing.T) {
runCode(strings.ReplaceAll(testCase.code, "http.request", "http.asyncRequest"))
})
}
}
Expand Down

0 comments on commit 9d72974

Please sign in to comment.