Skip to content

Commit

Permalink
Add basic event loop with some API to be used by modules
Browse files Browse the repository at this point in the history
As well as cut down setTimeout implementation

A recent update to goja introduced Promise. The catch here is that
Promise's then will only be called when goja exits executing js code and
it has already been resolved. Also resolving and rejecting Promises
needs to happen while no other js code is being executed.

This more or less necessitates adding an event loop. Additionally
because a call to a k6 modules such as `k6/http` might make a promise to
signal when an http request is made, but if (no changes were made) the
iteration then finishes before the request completes, nothing would've
stopped the start of a *new* iteration (which would probably just again
ask k6/http to make a new request and return Promise).
This might be a desirable behaviour for some cases but arguably will be
very confusing so this commit also adds a way to Reserve(name should be
changed) a place on the queue (doesn't reserve an exact spot) so that
the event loop will not let the iteration finish until it gets
unreserved.
Additional abstraction to make a "handled" Promise is added so that k6
js-modules can use it the same way goja.NewPromise but with the key the
difference that the Promise will be waited to be resolved before the
event loop can end.

Additionally to that, some additional code was needed so there is an
event loop for all special functions calls (setup, teardown,
handleSummary, default) and the init context.

And finally, a basic setTimeout implementation was added.
There is no way to currently cancel the setTimeout and it doesn't take
code as the first argument or additional arguments to be given to the
callback later on.

fixes #882
  • Loading branch information
mstoykov committed Nov 12, 2021
1 parent 5b4fe2d commit d80f421
Show file tree
Hide file tree
Showing 8 changed files with 404 additions and 18 deletions.
125 changes: 125 additions & 0 deletions core/local/eventloop_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
*
* k6 - a next-generation load testing tool
* Copyright (C) 2021 Load Impact
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

package local

import (
"io/ioutil"
"net/url"
"testing"
"time"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"go.k6.io/k6/js"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/metrics"
"go.k6.io/k6/lib/testutils"
"go.k6.io/k6/lib/types"
"go.k6.io/k6/loader"
)

func TestEventLoop(t *testing.T) {
t.Parallel()
script := []byte(`
setTimeout(()=> {console.log("initcontext setTimeout")}, 200)
console.log("initcontext");
export default function() {
setTimeout(()=> {console.log("default setTimeout")}, 200)
console.log("default");
};
export function setup() {
setTimeout(()=> {console.log("setup setTimeout")}, 200)
console.log("setup");
};
export function teardown() {
setTimeout(()=> {console.log("teardown setTimeout")}, 200)
console.log("teardown");
};
export function handleSummary() {
setTimeout(()=> {console.log("handleSummary setTimeout")}, 200)
console.log("handleSummary");
};
`)

logger := logrus.New()
logger.SetOutput(ioutil.Discard)
logHook := testutils.SimpleLogrusHook{HookedLevels: []logrus.Level{logrus.InfoLevel}}
logger.AddHook(&logHook)

registry := metrics.NewRegistry()
builtinMetrics := metrics.RegisterBuiltinMetrics(registry)
runner, err := js.New(
logger,
&loader.SourceData{
URL: &url.URL{Path: "/script.js"},
Data: script,
},
nil,
lib.RuntimeOptions{},
builtinMetrics,
registry,
)
require.NoError(t, err)

ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, logger,
lib.Options{
TeardownTimeout: types.NullDurationFrom(time.Second),
SetupTimeout: types.NullDurationFrom(time.Second),
})
defer cancel()

errCh := make(chan error, 1)
go func() { errCh <- execScheduler.Run(ctx, ctx, samples, builtinMetrics) }()

select {
case err := <-errCh:
require.NoError(t, err)
_, err = runner.HandleSummary(ctx, &lib.Summary{RootGroup: &lib.Group{}})
require.NoError(t, err)
entries := logHook.Drain()
msgs := make([]string, len(entries))
for i, entry := range entries {
msgs[i] = entry.Message
}
require.Equal(t, []string{
"initcontext", // first initialization
"initcontext setTimeout",
"initcontext", // for vu
"initcontext setTimeout",
"initcontext", // for setup
"initcontext setTimeout",
"setup", // setup
"setup setTimeout",
"default", // one iteration
"default setTimeout",
"initcontext", // for teardown
"initcontext setTimeout",
"teardown", // teardown
"teardown setTimeout",
"initcontext", // for handleSummary
"initcontext setTimeout",
"handleSummary", // handleSummary
"handleSummary setTimeout",
}, msgs)
case <-time.After(10 * time.Second):
t.Fatal("timed out")
}
}
18 changes: 17 additions & 1 deletion js/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"fmt"
"net/url"
"runtime"
"time"

"github.com/dop251/goja"
"github.com/dop251/goja/parser"
Expand Down Expand Up @@ -69,6 +70,7 @@ type BundleInstance struct {
env map[string]string

exports map[string]goja.Callable
loop *eventLoop
}

// NewBundle creates a new bundle from a source file and a filesystem.
Expand Down Expand Up @@ -261,6 +263,7 @@ func (b *Bundle) Instantiate(logger logrus.FieldLogger, vuID uint64) (bi *Bundle
Context: ctxPtr,
exports: make(map[string]goja.Callable),
env: b.RuntimeOptions.Env,
loop: init.loop,
}

// Grab any exported functions that could be executed. These were
Expand Down Expand Up @@ -307,6 +310,16 @@ func (b *Bundle) instantiate(logger logrus.FieldLogger, rt *goja.Runtime, init *
}
rt.Set("__ENV", env)
rt.Set("__VU", vuID)
_ = rt.Set("setTimeout", func(f func(), t float64) {
// TODO checks and fixes
// TODO maybe really return something to use with `clearTimeout
// TODO support arguments ... maybe
runOnLoop := init.loop.Reserve()
go func() {
time.Sleep(time.Duration(t * float64(time.Millisecond)))
runOnLoop(f)
}()
})
rt.Set("console", common.Bind(rt, newConsole(logger), init.ctxPtr))

if init.compatibilityMode == lib.CompatibilityModeExtended {
Expand All @@ -324,7 +337,10 @@ func (b *Bundle) instantiate(logger logrus.FieldLogger, rt *goja.Runtime, init *
ctx := common.WithInitEnv(context.Background(), initenv)
*init.ctxPtr = common.WithRuntime(ctx, rt)
unbindInit := common.BindToGlobal(rt, common.Bind(rt, init, init.ctxPtr))
if _, err := rt.RunProgram(b.Program); err != nil {
var err error
init.loop.RunOnLoop(func() { _, err = rt.RunProgram(b.Program) })
init.loop.Start(*init.ctxPtr)
if err != nil {
var exception *goja.Exception
if errors.As(err, &exception) {
err = &scriptException{inner: exception}
Expand Down
117 changes: 117 additions & 0 deletions js/eventloop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
*
* k6 - a next-generation load testing tool
* Copyright (C) 2021 Load Impact
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

package js

import (
"context"
"sync"
)

// an event loop
// TODO: DO NOT USE AS IT'S NOT DONE
type eventLoop struct {
queueLock sync.Mutex
queue []func()
wakeupCh chan struct{} // maybe use sync.Cond ?
reservedCount int
}

func newEventLoop() *eventLoop {
return &eventLoop{
wakeupCh: make(chan struct{}, 1),
}
}

// RunOnLoop queues the function to be called from/on the loop
// This needs to be called before calling `Start`
// TODO maybe have only Reserve as this is equal to `e.Reserve()(f)`
func (e *eventLoop) RunOnLoop(f func()) {
e.queueLock.Lock()
e.queue = append(e.queue, f)
e.queueLock.Unlock()
select {
case e.wakeupCh <- struct{}{}:
default:
}
}

// Reserve "reserves" a spot on the loop, preventing it from returning/finishing. The returning function will queue it's
// argument and wakeup the loop if needed and also unreserve the spot so that the loop can exit.
// this should be used instead of MakeHandledPromise if a promise will not be returned
// TODO better name
func (e *eventLoop) Reserve() func(func()) {
e.queueLock.Lock()
e.reservedCount++
e.queueLock.Unlock()

return func(f func()) {
e.queueLock.Lock()
e.queue = append(e.queue, f)
e.reservedCount--
e.queueLock.Unlock()
select {
case e.wakeupCh <- struct{}{}:
default:
}
}
}

// Start will run the event loop until it's empty and there are no reserved spots
// or the context is done
//nolint:cyclop
func (e *eventLoop) Start(ctx context.Context) {
done := ctx.Done()
for {
select { // check if done
case <-done:
return
default:
}

// acquire the queue
e.queueLock.Lock()
queue := e.queue
e.queue = make([]func(), 0, len(queue))
reserved := e.reservedCount != 0
e.queueLock.Unlock()

if len(queue) == 0 {
if !reserved { // we have empty queue and nothing that reserved a spot
return
}
select { // wait until the reserved is done
case <-done:
return
case <-e.wakeupCh:
}
}

for _, f := range queue {
// run each function in the queue if not done
select {
case <-done:
return
default:
f()
}
}
}
}
71 changes: 71 additions & 0 deletions js/eventloop_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
*
* k6 - a next-generation load testing tool
* Copyright (C) 2021 Load Impact
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

package js

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestBasicEventLoop(t *testing.T) {
t.Parallel()
loop := newEventLoop()
var ran int
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
loop.RunOnLoop(func() { ran++ })
loop.Start(ctx)
require.Equal(t, ran, 1)
loop.RunOnLoop(func() { ran++ })
loop.RunOnLoop(func() { ran++ })
loop.Start(ctx)
require.Equal(t, ran, 3)
loop.RunOnLoop(func() { ran++; cancel() })
loop.RunOnLoop(func() { ran++ })
loop.Start(ctx)
require.Equal(t, ran, 4)
}

func TestEventLoopReserve(t *testing.T) {
t.Parallel()
loop := newEventLoop()
var ran int
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
loop.RunOnLoop(func() {
ran++
r := loop.Reserve()
go func() {
time.Sleep(time.Second)
r(func() {
ran++
})
}()
})
start := time.Now()
loop.Start(ctx)
took := time.Since(start)
require.Equal(t, ran, 2)
require.Greater(t, took, time.Second)
}
Loading

0 comments on commit d80f421

Please sign in to comment.