diff --git a/cmd/tests/tracing_module_test.go b/cmd/tests/tracing_module_test.go index 1e9ee1b20ad..00c36e18710 100644 --- a/cmd/tests/tracing_module_test.go +++ b/cmd/tests/tracing_module_test.go @@ -37,7 +37,7 @@ func TestTracingModuleClient(t *testing.T) { propagator: "w3c", }) - export default function () { + export default async function () { instrumentedHTTP.del("HTTPBIN_IP_URL/tracing"); instrumentedHTTP.get("HTTPBIN_IP_URL/tracing"); instrumentedHTTP.head("HTTPBIN_IP_URL/tracing"); @@ -46,13 +46,14 @@ func TestTracingModuleClient(t *testing.T) { instrumentedHTTP.post("HTTPBIN_IP_URL/tracing"); instrumentedHTTP.put("HTTPBIN_IP_URL/tracing"); instrumentedHTTP.request("GET", "HTTPBIN_IP_URL/tracing"); + await instrumentedHTTP.asyncRequest("GET", "HTTPBIN_IP_URL/tracing"); }; `) ts := getSingleFileTestState(t, script, []string{"--out", "json=results.json"}, 0) cmd.ExecuteWithGlobalState(ts.GlobalState) - assert.Equal(t, int64(8), atomic.LoadInt64(&gotRequests)) + assert.Equal(t, int64(9), atomic.LoadInt64(&gotRequests)) jsonResults, err := afero.ReadFile(ts.FS, "results.json") require.NoError(t, err) @@ -119,7 +120,7 @@ func TestTracingInstrumentHTTP_W3C(t *testing.T) { propagator: "w3c", }) - export default function () { + export default async function () { http.del("HTTPBIN_IP_URL/tracing"); http.get("HTTPBIN_IP_URL/tracing"); http.head("HTTPBIN_IP_URL/tracing"); @@ -128,13 +129,14 @@ func TestTracingInstrumentHTTP_W3C(t *testing.T) { http.post("HTTPBIN_IP_URL/tracing"); http.put("HTTPBIN_IP_URL/tracing"); http.request("GET", "HTTPBIN_IP_URL/tracing"); + await http.asyncRequest("GET", "HTTPBIN_IP_URL/tracing"); }; `) ts := getSingleFileTestState(t, script, []string{"--out", "json=results.json"}, 0) cmd.ExecuteWithGlobalState(ts.GlobalState) - assert.Equal(t, int64(8), atomic.LoadInt64(&gotRequests)) + assert.Equal(t, int64(9), atomic.LoadInt64(&gotRequests)) jsonResults, err := afero.ReadFile(ts.FS, "results.json") require.NoError(t, err) diff --git a/js/modules/k6/experimental/tracing/client.go b/js/modules/k6/experimental/tracing/client.go index d9ee0624dc1..42f53bd204d 100644 --- a/js/modules/k6/experimental/tracing/client.go +++ b/js/modules/k6/experimental/tracing/client.go @@ -6,7 +6,6 @@ import ( "time" "github.com/dop251/goja" - "go.k6.io/k6/js/common" "go.k6.io/k6/js/modules" httpmodule "go.k6.io/k6/js/modules/k6/http" "go.k6.io/k6/metrics" @@ -30,47 +29,54 @@ type Client struct { // uses it under the hood to emit the requests it // instruments. requestFunc HTTPRequestFunc + + // asyncRequestFunc holds the http module's asyncRequest function + // used to emit HTTP requests in k6 script. The client + // uses it under the hood to emit the requests it + // instruments. + asyncRequestFunc HTTPAsyncRequestFunc } // HTTPRequestFunc is a type alias representing the prototype of // k6's http module's request function -type HTTPRequestFunc func(method string, url goja.Value, args ...goja.Value) (*httpmodule.Response, error) +type ( + HTTPRequestFunc func(method string, url goja.Value, args ...goja.Value) (*httpmodule.Response, error) + HTTPAsyncRequestFunc func(method string, url goja.Value, args ...goja.Value) (*goja.Promise, error) +) // NewClient instantiates a new tracing Client -func NewClient(vu modules.VU, opts options) *Client { +func NewClient(vu modules.VU, opts options) (*Client, error) { rt := vu.Runtime() - // Get the http module's request function - httpModuleRequest, err := rt.RunString("require('k6/http').request") + // Get the http module + httpModule, err := rt.RunString("require('k6/http')") if err != nil { - common.Throw( - rt, - fmt.Errorf( - "failed initializing tracing client, "+ - "unable to require http.request method; reason: %w", - err, - ), - ) + return nil, + fmt.Errorf("failed initializing tracing client, unable to require k6/http module; reason: %w", err) } + httpModuleObject := httpModule.ToObject(rt) // Export the http module's request function goja.Callable as a Go function var requestFunc HTTPRequestFunc - if err := rt.ExportTo(httpModuleRequest, &requestFunc); err != nil { - common.Throw( - rt, - fmt.Errorf("failed initializing tracing client, unable to export http.request method; reason: %w", err), - ) + if err := rt.ExportTo(httpModuleObject.Get("request"), &requestFunc); err != nil { + return nil, + fmt.Errorf("failed initializing tracing client, unable to require http.request method; reason: %w", err) + } + // Export the http module's syncRequest function goja.Callable as a Go function + var asyncRequestFunc HTTPAsyncRequestFunc + if err := rt.ExportTo(httpModuleObject.Get("asyncRequest"), &asyncRequestFunc); err != nil { + return nil, + fmt.Errorf("failed initializing tracing client, unable to require http.asyncRequest method; reason: %w", + err) } - client := &Client{vu: vu, requestFunc: requestFunc} + client := &Client{vu: vu, requestFunc: requestFunc, asyncRequestFunc: asyncRequestFunc} if err := client.Configure(opts); err != nil { - common.Throw( - rt, - fmt.Errorf("failed initializing tracing client, invalid configuration; reason: %w", err), - ) + return nil, + fmt.Errorf("failed initializing tracing client, invalid configuration; reason: %w", err) } - return client + return client, nil } // Configure configures the tracing client with the given options. @@ -93,15 +99,7 @@ func (c *Client) Configure(opts options) error { return nil } -// Request instruments the http module's request function with tracing headers, -// and ensures the trace_id is emitted as part of the output's data points metadata. -func (c *Client) Request(method string, url goja.Value, args ...goja.Value) (*httpmodule.Response, error) { - // The http module's request function expects the first argument to be the - // request body. If no body is provided, we need to pass null to the function. - if len(args) == 0 { - args = []goja.Value{goja.Null()} - } - +func (c *Client) generateTraceContext() (http.Header, string, error) { traceID := TraceID{ Prefix: k6Prefix, Code: k6CloudCode, @@ -110,21 +108,64 @@ func (c *Client) Request(method string, url goja.Value, args ...goja.Value) (*ht encodedTraceID, err := traceID.Encode() if err != nil { - return nil, fmt.Errorf("failed to encode the generated trace ID; reason: %w", err) + return http.Header{}, "", fmt.Errorf("failed to encode the generated trace ID; reason: %w", err) } - // Produce a trace header in the format defined by the - // configured propagator. + // Produce a trace header in the format defined by the configured propagator. traceContextHeader, err := c.propagator.Propagate(encodedTraceID) if err != nil { - return nil, fmt.Errorf("failed to propagate trace ID; reason: %w", err) + return http.Header{}, "", fmt.Errorf("failed to propagate trace ID; reason: %w", err) } + return traceContextHeader, encodedTraceID, nil +} + +// Request instruments the http module's request function with tracing headers, +// and ensures the trace_id is emitted as part of the output's data points metadata. +func (c *Client) Request(method string, url goja.Value, args ...goja.Value) (*httpmodule.Response, error) { + var result *httpmodule.Response + var err error + err = c.instrumentedCall(func(args ...goja.Value) error { + result, err = c.requestFunc(method, url, args...) + return err + }, args...) + + if err != nil { + return nil, err + } + return result, nil +} + +// AsyncRequest instruments the http module's asyncRequest function with tracing headers, +// and ensures the trace_id is emitted as part of the output's data points metadata. +func (c *Client) AsyncRequest(method string, url goja.Value, args ...goja.Value) (*goja.Promise, error) { + var result *goja.Promise + var err error + err = c.instrumentedCall(func(args ...goja.Value) error { + result, err = c.asyncRequestFunc(method, url, args...) + return err + }, args...) + + if err != nil { + return nil, err + } + return result, nil +} + +func (c *Client) instrumentedCall(call func(args ...goja.Value) error, args ...goja.Value) error { + if len(args) == 0 { + args = []goja.Value{goja.Null()} + } + + traceContextHeader, encodedTraceID, err := c.generateTraceContext() + if err != nil { + return err + } // update the `params` argument with the trace context header // so that it can be used by the http module's request function. args, err = c.instrumentArguments(traceContextHeader, args...) if err != nil { - return nil, fmt.Errorf("failed to instrument request arguments; reason: %w", err) + return fmt.Errorf("failed to instrument request arguments; reason: %w", err) } // Add the trace ID to the VU's state, so that it can be @@ -132,19 +173,14 @@ func (c *Client) Request(method string, url goja.Value, args ...goja.Value) (*ht c.vu.State().Tags.Modify(func(t *metrics.TagsAndMeta) { t.SetMetadata(metadataTraceIDKeyName, encodedTraceID) }) - - response, err := c.requestFunc(method, url, args...) - if err != nil { - return nil, err - } - - // Remove the trace ID from the VU's state, so that it doesn't - // leak into other requests. - c.vu.State().Tags.Modify(func(t *metrics.TagsAndMeta) { - t.DeleteMetadata(metadataTraceIDKeyName) - }) - - return response, nil + // Remove the trace ID from the VU's state, so that it doesn't leak into other requests. + defer func() { + c.vu.State().Tags.Modify(func(t *metrics.TagsAndMeta) { + t.DeleteMetadata(metadataTraceIDKeyName) + }) + }() + + return call(args...) } // Del instruments the http module's delete method. diff --git a/js/modules/k6/experimental/tracing/module.go b/js/modules/k6/experimental/tracing/module.go index 4844697eadd..5a5ea90b2d6 100644 --- a/js/modules/k6/experimental/tracing/module.go +++ b/js/modules/k6/experimental/tracing/module.go @@ -71,7 +71,11 @@ func (mi *ModuleInstance) newClient(cc goja.ConstructorCall) *goja.Object { common.Throw(rt, fmt.Errorf("unable to parse options object; reason: %w", err)) } - return rt.ToValue(NewClient(mi.vu, opts)).ToObject(rt) + client, err := NewClient(mi.vu, opts) + if err != nil { + common.Throw(rt, err) + } + return rt.ToValue(client).ToObject(rt) } // InstrumentHTTP instruments the HTTP module with tracing headers. @@ -96,7 +100,11 @@ func (mi *ModuleInstance) instrumentHTTP(options options) { // Initialize the tracing module's instance default client, // and configure it using the user-supplied set of options. - mi.Client = NewClient(mi.vu, options) + var err error + mi.Client, err = NewClient(mi.vu, options) + if err != nil { + common.Throw(rt, err) + } // Explicitly inject the http module in the VU's runtime. // This allows us to later on override the http module's methods @@ -136,4 +144,5 @@ func (mi *ModuleInstance) instrumentHTTP(options options) { mustSetHTTPMethod("post", httpModuleObj, mi.Client.Patch) mustSetHTTPMethod("put", httpModuleObj, mi.Client.Patch) mustSetHTTPMethod("request", httpModuleObj, mi.Client.Request) + mustSetHTTPMethod("asyncRequest", httpModuleObj, mi.Client.AsyncRequest) } diff --git a/js/modules/k6/http/async_request_test.go b/js/modules/k6/http/async_request_test.go new file mode 100644 index 00000000000..79be3d1d121 --- /dev/null +++ b/js/modules/k6/http/async_request_test.go @@ -0,0 +1,290 @@ +package http + +import ( + "testing" + "time" + + "github.com/sirupsen/logrus" + logtest "github.com/sirupsen/logrus/hooks/test" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.k6.io/k6/js/modulestest" +) + +func wrapInAsyncLambda(input string) string { + // This makes it possible to use `await` freely on the "top" level + return "(async () => {\n " + input + "\n })()" +} + +func runOnEventLoop(runtime *modulestest.Runtime, code string) error { + // TODO move this in modulestest.Runtime and extend it + err := runtime.EventLoop.Start(func() error { + _, err := runtime.VU.Runtime().RunString(wrapInAsyncLambda(code)) + return err + }) + runtime.EventLoop.WaitOnRegistered() + return err +} + +func TestAsyncRequest(t *testing.T) { + t.Parallel() + t.Run("EmptyBody", func(t *testing.T) { + t.Parallel() + ts := newTestCase(t) + + sr := ts.tb.Replacer.Replace + err := runOnEventLoop(ts.runtime, sr(` + var reqUrl = "HTTPBIN_URL/cookies" + var res = await http.asyncRequest("GET", reqUrl); + var jar = new http.CookieJar(); + + jar.set("HTTPBIN_URL/cookies", "key", "value"); + res = await http.asyncRequest("GET", reqUrl, null, { cookies: { key2: "value2" }, jar: jar }); + + if (res.json().key != "value") { throw new Error("wrong cookie value: " + res.json().key); } + + if (res.status != 200) { throw new Error("wrong status: " + res.status); } + if (res.request["method"] !== "GET") { throw new Error("http request method was not \"GET\": " + JSON.stringify(res.request)) } + if (res.request["body"].length != 0) { throw new Error("http request body was not null: " + JSON.stringify(res.request["body"])) } + if (res.request["url"] != reqUrl) { + throw new Error("wrong http request url: " + JSON.stringify(res.request)) + } + if (res.request["cookies"]["key2"][0].name != "key2") { throw new Error("wrong http request cookies: " + JSON.stringify(JSON.stringify(res.request["cookies"]["key2"]))) } + if (res.request["headers"]["User-Agent"][0] != "TestUserAgent") { throw new Error("wrong http request headers: " + JSON.stringify(res.request)) } + `)) + assert.NoError(t, err) + }) + t.Run("NonEmptyBody", func(t *testing.T) { + t.Parallel() + ts := newTestCase(t) + + sr := ts.tb.Replacer.Replace + err := runOnEventLoop(ts.runtime, sr(` + var res = await http.asyncRequest("POST", "HTTPBIN_URL/post", {a: "a", b: 2}, {headers: {"Content-Type": "application/x-www-form-urlencoded; charset=utf-8"}}); + if (res.status != 200) { throw new Error("wrong status: " + res.status); } + if (res.request["body"] != "a=a&b=2") { throw new Error("http request body was not set properly: " + JSON.stringify(res.request))} + `)) + assert.NoError(t, err) + }) + t.Run("Concurrent", func(t *testing.T) { + t.Parallel() + ts := newTestCase(t) + sr := ts.tb.Replacer.Replace + err := runOnEventLoop(ts.runtime, sr(` + let start = Date.now() + let p1 = http.asyncRequest("GET", "HTTPBIN_URL/delay/200ms").then(() => { return Date.now() - start}) + let p2 = http.asyncRequest("GET", "HTTPBIN_URL/delay/100ms").then(() => { return Date.now() - start}) + let time1 = await p1; + let time2 = await p2; + if (time1 < time2) { + throw("request that should've taken 200ms took less time then one that should take 100ms " + time1 +">" + time2 ) + } + + `)) + assert.NoError(t, err) + }) +} + +func TestAsyncRequestResponseCallbackRace(t *testing.T) { + // This test is here only to tease out race conditions + t.Parallel() + ts := newTestCase(t) + err := ts.runtime.VU.Runtime().Set("q", func(f func()) { + rg := ts.runtime.EventLoop.RegisterCallback() + time.AfterFunc(time.Millisecond*5, func() { + rg(func() error { + f() + return nil + }) + }) + }) + require.NoError(t, err) + err = ts.runtime.VU.Runtime().Set("log", func(s string) { + // t.Log(s) // uncomment for debugging + }) + require.NoError(t, err) + err = runOnEventLoop(ts.runtime, ts.tb.Replacer.Replace(` + let call = (i) => { + log("s"+i) + if (i > 200) { return null; } + http.setResponseCallback(http.expectedStatuses(i)) + q(() => call(i+1)) // don't use promises as they resolve before eventloop callbacks such as the one from asyncRequest + } + for (let j = 0; j< 50; j++) { + call(0) + await http.asyncRequest("GET", "HTTPBIN_URL/redirect/20").then(() => log("!!!!!!!!!!!!!!!"+j)) + } + `)) + require.NoError(t, err) +} + +func TestAsyncRequestErrors(t *testing.T) { + // This likely should have a way to do the same for http.request and http.asyncRequest with the same tests + t.Parallel() + t.Run("Invalid", func(t *testing.T) { + t.Parallel() + t.Run("unsupported protocol", func(t *testing.T) { + t.Parallel() + ts := newTestCase(t) + state := ts.runtime.VU.State() + + hook := logtest.NewLocal(state.Logger) + defer hook.Reset() + + err := runOnEventLoop(ts.runtime, ` + try { + http.asyncRequest("", "").catch((e) => globalThis.promiseRejected = e ) + } catch (e) { + globalThis.exceptionThrown = e + } + `) + require.NoError(t, err) + promiseRejected := ts.runtime.VU.Runtime().Get("promiseRejected") + exceptionThrown := ts.runtime.VU.Runtime().Get("exceptionThrown") + require.NotNil(t, promiseRejected) + require.True(t, promiseRejected.ToBoolean()) + require.Nil(t, exceptionThrown) + assert.Contains(t, promiseRejected.ToString(), "unsupported protocol scheme") + + logEntry := hook.LastEntry() + assert.Nil(t, logEntry) + }) + + t.Run("throw=false", func(t *testing.T) { + t.Parallel() + ts := newTestCase(t) + state := ts.runtime.VU.State() + hook := logtest.NewLocal(state.Logger) + defer hook.Reset() + + err := runOnEventLoop(ts.runtime, ` + var res = await http.asyncRequest("GET", "some://example.com", null, { throw: false }); + if (res.error.search('unsupported protocol scheme "some"') == -1) { + throw new Error("wrong error:" + res.error); + } + throw new Error("another error"); + `) + require.ErrorContains(t, err, "another error") + + logEntry := hook.LastEntry() + if assert.NotNil(t, logEntry) { + assert.Equal(t, logrus.WarnLevel, logEntry.Level) + err, ok := logEntry.Data["error"].(error) + require.True(t, ok) + assert.ErrorContains(t, err, "unsupported protocol scheme") + assert.Equal(t, "Request Failed", logEntry.Message) + } + }) + }) + t.Run("InvalidURL", func(t *testing.T) { + t.Parallel() + + expErr := `invalid URL: parse "https:// test.k6.io": invalid character " " in host name` + t.Run("throw=true", func(t *testing.T) { + t.Parallel() + ts := newTestCase(t) + + js := ` + try { + http.asyncRequest("GET", "https:// test.k6.io").catch((e) => globalThis.promiseRejected = e ) + } catch (e) { + globalThis.exceptionThrown = e + } + ` + err := runOnEventLoop(ts.runtime, js) + require.NoError(t, err) + promiseRejected := ts.runtime.VU.Runtime().Get("promiseRejected") + exceptionThrown := ts.runtime.VU.Runtime().Get("exceptionThrown") + require.NotNil(t, promiseRejected) + require.True(t, promiseRejected.ToBoolean()) + require.Nil(t, exceptionThrown) + assert.Contains(t, promiseRejected.ToString(), expErr) + }) + + t.Run("throw=false", func(t *testing.T) { + t.Parallel() + ts := newTestCase(t) + rt := ts.runtime.VU.Runtime() + state := ts.runtime.VU.State() + state.Options.Throw.Bool = false + defer func() { state.Options.Throw.Bool = true }() + + hook := logtest.NewLocal(state.Logger) + defer hook.Reset() + + js := ` + var r = await http.asyncRequest("GET", "https:// test.k6.io"); + globalThis.ret = {error: r.error, error_code: r.error_code}; + ` + err := runOnEventLoop(ts.runtime, js) + require.NoError(t, err) + ret := rt.GlobalObject().Get("ret") + var retobj map[string]interface{} + var ok bool + if retobj, ok = ret.Export().(map[string]interface{}); !ok { + require.Fail(t, "got wrong return object: %#+v", retobj) + } + require.Equal(t, int64(1020), retobj["error_code"]) + require.Equal(t, expErr, retobj["error"]) + + logEntry := hook.LastEntry() + require.NotNil(t, logEntry) + assert.Equal(t, logrus.WarnLevel, logEntry.Level) + err, ok = logEntry.Data["error"].(error) + require.True(t, ok) + assert.ErrorContains(t, err, expErr) + assert.Equal(t, "Request Failed", logEntry.Message) + }) + + t.Run("throw=false,nopanic", func(t *testing.T) { + t.Parallel() + ts := newTestCase(t) + rt := ts.runtime.VU.Runtime() + state := ts.runtime.VU.State() + state.Options.Throw.Bool = false + defer func() { state.Options.Throw.Bool = true }() + + hook := logtest.NewLocal(state.Logger) + defer hook.Reset() + + js := ` + var r = await http.asyncRequest("GET", "https:// test.k6.io"); + r.html(); + r.json(); + globalThis.ret = r.error_code; // not reached because of json() + ` + err := runOnEventLoop(ts.runtime, js) + ret := rt.GlobalObject().Get("ret") + require.Error(t, err) + assert.Nil(t, ret) + assert.Contains(t, err.Error(), "unexpected end of JSON input") + + logEntry := hook.LastEntry() + require.NotNil(t, logEntry) + assert.Equal(t, logrus.WarnLevel, logEntry.Level) + err, ok := logEntry.Data["error"].(error) + require.True(t, ok) + assert.ErrorContains(t, err, expErr) + assert.Equal(t, "Request Failed", logEntry.Message) + }) + }) + + t.Run("Unroutable", func(t *testing.T) { + t.Parallel() + ts := newTestCase(t) + err := runOnEventLoop(ts.runtime, ` + try { + http.asyncRequest("GET", "http://sdafsgdhfjg/").catch((e) => globalThis.promiseRejected = e ) + } catch (e) { + globalThis.exceptionThrown = e + }`) + expErr := "lookup sdafsgdhfjg" + require.NoError(t, err) + promiseRejected := ts.runtime.VU.Runtime().Get("promiseRejected") + exceptionThrown := ts.runtime.VU.Runtime().Get("exceptionThrown") + require.NotNil(t, promiseRejected) + require.True(t, promiseRejected.ToBoolean()) + require.Nil(t, exceptionThrown) + assert.Contains(t, promiseRejected.ToString(), expErr) + }) +} diff --git a/js/modules/k6/http/http.go b/js/modules/k6/http/http.go index be95412f597..87f65670328 100644 --- a/js/modules/k6/http/http.go +++ b/js/modules/k6/http/http.go @@ -86,6 +86,7 @@ func (r *RootModule) NewModuleInstance(vu modules.VU) modules.Instance { mustExport("del", mi.defaultClient.getMethodClosure(http.MethodDelete)) mustExport("options", mi.defaultClient.getMethodClosure(http.MethodOptions)) mustExport("request", mi.defaultClient.Request) + mustExport("asyncRequest", mi.defaultClient.asyncRequest) mustExport("batch", mi.defaultClient.Batch) mustExport("setResponseCallback", mi.defaultClient.SetResponseCallback) diff --git a/js/modules/k6/http/request.go b/js/modules/k6/http/request.go index 6abca5d40ec..fc54e820af2 100644 --- a/js/modules/k6/http/request.go +++ b/js/modules/k6/http/request.go @@ -39,38 +39,85 @@ func (c *Client) Request(method string, url goja.Value, args ...goja.Value) (*Re if state == nil { return nil, ErrHTTPForbiddenInInitContext } + body, params := splitRequestArgs(args) - var body interface{} - var params goja.Value + req, err := c.parseRequest(method, url, body, params) + if err != nil { + return c.handleParseRequestError(err) + } + resp, err := httpext.MakeRequest(c.moduleInstance.vu.Context(), state, req) + if err != nil { + return nil, err + } + c.processResponse(resp, req.ResponseType) + return c.responseFromHTTPext(resp), nil +} + +func splitRequestArgs(args []goja.Value) (body interface{}, params goja.Value) { if len(args) > 0 { body = args[0].Export() } if len(args) > 1 { params = args[1] } + return body, params +} + +func (c *Client) handleParseRequestError(err error) (*Response, error) { + state := c.moduleInstance.vu.State() + + if state.Options.Throw.Bool { + return nil, err + } + state.Logger.WithField("error", err).Warn("Request Failed") + r := httpext.NewResponse() + r.Error = err.Error() + var k6e httpext.K6Error + if errors.As(err, &k6e) { + r.ErrorCode = int(k6e.Code) + } + return &Response{Response: r, client: c}, nil +} +// asyncRequest makes an http request of the provided `method` and returns a promise. All the networking is done off +// the event loop and the returned promise will be resolved with the response or rejected with an error +func (c *Client) asyncRequest(method string, url goja.Value, args ...goja.Value) (*goja.Promise, error) { + state := c.moduleInstance.vu.State() + if c.moduleInstance.vu.State() == nil { + return nil, ErrHTTPForbiddenInInitContext + } + + body, params := splitRequestArgs(args) + rt := c.moduleInstance.vu.Runtime() req, err := c.parseRequest(method, url, body, params) + p, resolve, reject := rt.NewPromise() if err != nil { - if state.Options.Throw.Bool { - return nil, err + var resp *Response + if resp, err = c.handleParseRequestError(err); err != nil { + reject(err) + } else { + resolve(resp) } - state.Logger.WithField("error", err).Warn("Request Failed") - r := httpext.NewResponse() - r.Error = err.Error() - var k6e httpext.K6Error - if errors.As(err, &k6e) { - r.ErrorCode = int(k6e.Code) - } - return &Response{Response: r, client: c}, nil + return p, nil } - resp, err := httpext.MakeRequest(c.moduleInstance.vu.Context(), state, req) - if err != nil { - return nil, err - } - c.processResponse(resp, req.ResponseType) - return c.responseFromHTTPext(resp), nil + callback := c.moduleInstance.vu.RegisterCallback() + + go func() { + resp, err := httpext.MakeRequest(c.moduleInstance.vu.Context(), state, req) + callback(func() error { + if err != nil { + reject(err) + return nil //nolint:nilerr // we want to reject the promise in this case + } + c.processResponse(resp, req.ResponseType) + resolve(c.responseFromHTTPext(resp)) + return nil + }) + }() + + return p, nil } // processResponse stores the body as an ArrayBuffer if indicated by @@ -87,6 +134,7 @@ func (c *Client) responseFromHTTPext(resp *httpext.Response) *Response { } // TODO: break this function up +// //nolint:gocyclo, cyclop, funlen, gocognit func (c *Client) parseRequest( method string, reqURL, body interface{}, params goja.Value, diff --git a/js/modules/k6/http/response_callback_test.go b/js/modules/k6/http/response_callback_test.go index 7dfeb1c09f5..c333173262a 100644 --- a/js/modules/k6/http/response_callback_test.go +++ b/js/modules/k6/http/response_callback_test.go @@ -3,6 +3,7 @@ package http import ( "fmt" "sort" + "strings" "testing" "github.com/dop251/goja" @@ -85,7 +86,6 @@ func TestResponseCallbackInAction(t *testing.T) { ts := newTestCase(t) tb := ts.tb samples := ts.samples - rt := ts.runtime.VU.Runtime() sr := tb.Replacer.Replace @@ -138,7 +138,7 @@ func TestResponseCallbackInAction(t *testing.T) { "overwrite per request": { code: ` http.setResponseCallback(http.expectedStatuses(200)); - res = http.request("GET", "HTTPBIN_URL/redirect/1"); + http.request("GET", "HTTPBIN_URL/redirect/1"); `, expectedSamples: []expectedSample{ { @@ -227,7 +227,7 @@ func TestResponseCallbackInAction(t *testing.T) { "global overwrite with null": { code: ` http.setResponseCallback(null); - res = http.request("GET", "HTTPBIN_URL/redirect/1"); + http.request("GET", "HTTPBIN_URL/redirect/1"); `, expectedSamples: []expectedSample{ { @@ -257,10 +257,16 @@ func TestResponseCallbackInAction(t *testing.T) { } for name, testCase := range testCases { testCase := testCase - t.Run(name, func(t *testing.T) { + + runCode := func(code string) { + t.Helper() ts.instance.defaultClient.responseCallback = defaultExpectedStatuses.match - _, err := rt.RunString(sr(testCase.code)) + err := ts.runtime.EventLoop.Start(func() error { + _, err := ts.runtime.VU.Runtime().RunString(sr(code)) + return err + }) + ts.runtime.EventLoop.WaitOnRegistered() assert.NoError(t, err) bufSamples := metrics.GetBufferedSamples(samples) @@ -278,6 +284,12 @@ func TestResponseCallbackInAction(t *testing.T) { for i, expectedSample := range testCase.expectedSamples { assertRequestMetricsEmittedSingle(t, bufSamples[i], expectedSample.tags, expectedSample.metrics, nil) } + } + t.Run(name, func(t *testing.T) { + runCode(testCase.code) + }) + t.Run("async_"+name, func(t *testing.T) { + runCode(strings.ReplaceAll(testCase.code, "http.request", "http.asyncRequest")) }) } }