Skip to content

Commit

Permalink
Add an execution.Controller and a local no-op implementation
Browse files Browse the repository at this point in the history
This doesn't change any existing k6 behavior (see how the tests were not touched), but it adds hooks for distributed execution later on.
  • Loading branch information
na-- committed Jul 19, 2023
1 parent 703970e commit eab37e5
Show file tree
Hide file tree
Showing 10 changed files with 181 additions and 25 deletions.
3 changes: 2 additions & 1 deletion api/v1/group_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/require"

"go.k6.io/k6/execution"
"go.k6.io/k6/execution/local"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/testutils"
"go.k6.io/k6/lib/testutils/minirunner"
Expand Down Expand Up @@ -41,7 +42,7 @@ func getTestRunState(tb testing.TB, options lib.Options, runner lib.Runner) *lib
}

func getControlSurface(tb testing.TB, testState *lib.TestRunState) *ControlSurface {
execScheduler, err := execution.NewScheduler(testState)
execScheduler, err := execution.NewScheduler(testState, local.NewController())
require.NoError(tb, err)

me, err := engine.NewMetricsEngine(testState.Registry, testState.Logger)
Expand Down
3 changes: 2 additions & 1 deletion api/v1/setup_teardown_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"gopkg.in/guregu/null.v3"

"go.k6.io/k6/execution"
"go.k6.io/k6/execution/local"
"go.k6.io/k6/js"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/types"
Expand Down Expand Up @@ -138,7 +139,7 @@ func TestSetupData(t *testing.T) {
TeardownTimeout: types.NullDurationFrom(5 * time.Second),
}, runner)

execScheduler, err := execution.NewScheduler(testState)
execScheduler, err := execution.NewScheduler(testState, local.NewController())
require.NoError(t, err)
metricsEngine, err := engine.NewMetricsEngine(testState.Registry, testState.Logger)
require.NoError(t, err)
Expand Down
3 changes: 2 additions & 1 deletion api/v1/status_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"gopkg.in/guregu/null.v3"

"go.k6.io/k6/execution"
"go.k6.io/k6/execution/local"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/testutils/minirunner"
"go.k6.io/k6/metrics"
Expand Down Expand Up @@ -112,7 +113,7 @@ func TestPatchStatus(t *testing.T) {
require.NoError(t, err)

testState := getTestRunState(t, lib.Options{Scenarios: scenarios}, &minirunner.MiniRunner{})
execScheduler, err := execution.NewScheduler(testState)
execScheduler, err := execution.NewScheduler(testState, local.NewController())
require.NoError(t, err)

metricsEngine, err := engine.NewMetricsEngine(testState.Registry, testState.Logger)
Expand Down
3 changes: 2 additions & 1 deletion cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"go.k6.io/k6/errext/exitcodes"
"go.k6.io/k6/event"
"go.k6.io/k6/execution"
"go.k6.io/k6/execution/local"
"go.k6.io/k6/js/common"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/consts"
Expand Down Expand Up @@ -114,7 +115,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {

// Create a local execution scheduler wrapping the runner.
logger.Debug("Initializing the execution scheduler...")
execScheduler, err := execution.NewScheduler(testRunState)
execScheduler, err := execution.NewScheduler(testRunState, local.NewController())
if err != nil {
return err
}
Expand Down
53 changes: 53 additions & 0 deletions execution/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package execution

// Controller implementations are used to control the k6 execution of a test or
// test suite, either locally or in a distributed environment.
type Controller interface {
// GetOrCreateData requests the data chunk with the given ID, if it already
// exists. If it doesn't (i.e. this was the first time this function was
// called with that ID), the given callback is called and its result and
// error are saved for the ID and returned for all other calls with it.
//
// This is an atomic function, so any calls to it while the callback is
// being executed the the same ID will wait for the first call to to finish
// and receive its result.
//
// TODO: split apart into `Once()`, `SetData(), `GetData()` and implement
// the GetOrCreateData() behavior in a helper like the ones below?
GetOrCreateData(ID string, callback func() ([]byte, error)) ([]byte, error)

// Signal is used to notify that the current instance has reached the given
// event ID, or that it has had an error.
Signal(eventID string, err error) error

// Subscribe creates a listener for the specified event ID and returns a
// callback that can wait until all other instances have reached it.
Subscribe(eventID string) (wait func() error)
}

// SignalAndWait implements a rendezvous point / barrier, a way for all
// instances to reach the same execution point and wait for each other, before
// they all ~simultaneously continue with the execution.
//
// It subscribes for the given event ID, signals that the current instance has
// reached it without an error, and then waits until all other instances have
// reached it or until there is an error in one of them.
func SignalAndWait(c Controller, eventID string) error {
wait := c.Subscribe(eventID)

if err := c.Signal(eventID, nil); err != nil {
return err
}
return wait()
}

// SignalErrorOrWait is a helper method that either immediately signals the
// given error and returns it, or it signals nominal completion and waits for
// all other instances to do the same (or signal an error themselves).
func SignalErrorOrWait(c Controller, eventID string, err error) error {
if err != nil {
_ = c.Signal(eventID, err)
return err // return the same error we got
}
return SignalAndWait(c, eventID)
}
45 changes: 45 additions & 0 deletions execution/local/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Package local implements the execution.Controller interface for local
// (single-machine) k6 execution.
package local

// Controller "controls" local tests. It doesn't actually do anything, it just
// implements the execution.Controller interface with no-op operations. The
// methods don't do anything because local tests have only a single instance.
//
// However, for test suites (https://github.com/grafana/k6/issues/1342) in the
// future, we will probably need to actually implement some of these methods and
// introduce simple synchronization primitives even for a single machine...
type Controller struct{}

// NewController creates a new local execution Controller.
func NewController() *Controller {
return &Controller{}
}

// GetOrCreateData immediately calls the given callback and returns its results.
func (c *Controller) GetOrCreateData(_ string, callback func() ([]byte, error)) ([]byte, error) {
return callback()
}

// Subscribe is a no-op, it doesn't actually wait for anything, because there is
// nothing to wait on - we only have one instance in local tests.
//
// TODO: actually use waitgroups, since this may actually matter for test
// suites, even for local test runs. That's because multiple tests might be
// executed even by a single instance, and if we have complicated flows (e.g.
// "test C is executed only after test A and test B finish"), the easiest way
// would be for different tests in the suite to reuse this Controller API *both*
// local and distributed runs.
func (c *Controller) Subscribe(_ string) func() error {
return func() error {
return nil
}
}

// Signal is a no-op, it doesn't actually do anything for local test runs.
//
// TODO: similar to Wait() above, this might actually be required for
// complex/branching test suites, even during local non-distributed execution.
func (c *Controller) Signal(_ string, _ error) error {
return nil
}
63 changes: 57 additions & 6 deletions execution/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
// executors, running setup() and teardown(), and actually starting the
// executors for the different scenarios at the appropriate times.
type Scheduler struct {
controller Controller

initProgress *pb.ProgressBar
executorConfigs []lib.ExecutorConfig // sorted by (startTime, ID)
executors []lib.Executor // sorted by (startTime, ID), excludes executors with no work
Expand All @@ -33,7 +35,7 @@ type Scheduler struct {
// initializing it beyond the bare minimum. Specifically, it creates the needed
// executor instances and a lot of state placeholders, but it doesn't initialize
// the executors and it doesn't initialize or run VUs.
func NewScheduler(trs *lib.TestRunState) (*Scheduler, error) {
func NewScheduler(trs *lib.TestRunState, controller Controller) (*Scheduler, error) {
options := trs.Options
et, err := lib.NewExecutionTuple(options.ExecutionSegment, options.ExecutionSegmentSequence)
if err != nil {
Expand Down Expand Up @@ -81,6 +83,7 @@ func NewScheduler(trs *lib.TestRunState) (*Scheduler, error) {
maxDuration: maxDuration,
maxPossibleVUs: maxPossibleVUs,
state: executionState,
controller: controller,
}, nil
}

Expand Down Expand Up @@ -380,6 +383,13 @@ func (e *Scheduler) Init(
) (stopVUEmission func(), initErr error) {
logger := e.state.Test.Logger.WithField("phase", "execution-scheduler-init")

if err := SignalAndWait(e.controller, "scheduler-init-start"); err != nil {
return nil, err
}
defer func() {
initErr = SignalErrorOrWait(e.controller, "scheduler-init-done", initErr)
}()

execSchedRunCtx, execSchedRunCancel := context.WithCancel(runCtx)
waitForVUsMetricPush := e.emitVUsAndVUsMax(execSchedRunCtx, samplesOut)
stopVUEmission = func() {
Expand All @@ -405,16 +415,20 @@ func (e *Scheduler) Init(
// Run the Scheduler, funneling all generated metric samples through the supplied
// out channel.
//
//nolint:funlen
//nolint:funlen, gocognit
func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- metrics.SampleContainer) (runErr error) {
logger := e.state.Test.Logger.WithField("phase", "execution-scheduler-run")

if err := SignalAndWait(e.controller, "scheduler-run-start"); err != nil {
return err
}
defer func() {
if interruptErr := GetCancelReasonIfTestAborted(runCtx); interruptErr != nil {
logger.Debugf("The test run was interrupted, returning '%s' instead of '%s'", interruptErr, runErr)
e.state.SetExecutionStatus(lib.ExecutionStatusInterrupted)
runErr = interruptErr
}
runErr = SignalErrorOrWait(e.controller, "scheduler-run-done", runErr)
}()

e.initProgress.Modify(pb.WithConstLeft("Run"))
Expand All @@ -430,6 +444,10 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- met
}
}

if err := SignalAndWait(e.controller, "test-ready-to-run-setup"); err != nil {
return err
}

e.initProgress.Modify(pb.WithConstProgress(1, "Starting test..."))
e.state.MarkStarted()
defer e.state.MarkEnded()
Expand All @@ -449,11 +467,27 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- met
if !e.state.Test.Options.NoSetup.Bool {
e.state.SetExecutionStatus(lib.ExecutionStatusSetup)
e.initProgress.Modify(pb.WithConstProgress(1, "setup()"))
if err := e.state.Test.Runner.Setup(withExecStateCtx, samplesOut); err != nil {
logger.WithField("error", err).Debug("setup() aborted by error")
actuallyRanSetup := false
data, err := e.controller.GetOrCreateData("setup", func() ([]byte, error) {
actuallyRanSetup = true
if err := e.state.Test.Runner.Setup(withExecStateCtx, samplesOut); err != nil {
logger.WithField("error", err).Debug("setup() aborted by error")
return nil, err
}
return e.state.Test.Runner.GetSetupData(), nil
})
if err != nil {
return err
}
if !actuallyRanSetup {
e.state.Test.Runner.SetSetupData(data)
}
}

if err := SignalAndWait(e.controller, "setup-done"); err != nil {
return err
}

e.initProgress.Modify(pb.WithHijack(e.getRunStats))

// Start all executors at their particular startTime in a separate goroutine...
Expand All @@ -469,6 +503,8 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- met
// Wait for all executors to finish
var firstErr error
for range e.executors {
// TODO: add logic to abort the test early if there was an error from
// the controller (e.g. some other instance for this test died)
err := <-runResults
if err != nil && firstErr == nil {
logger.WithError(err).Debug("Executor returned with an error, cancelling test run...")
Expand All @@ -477,19 +513,34 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- met
}
}

if err := SignalAndWait(e.controller, "execution-done"); err != nil {
return err
}

// Run teardown() after all executors are done, if it's not disabled
if !e.state.Test.Options.NoTeardown.Bool {
e.state.SetExecutionStatus(lib.ExecutionStatusTeardown)
e.initProgress.Modify(pb.WithConstProgress(1, "teardown()"))

// We run teardown() with the global context, so it isn't interrupted by
// thresholds or test.abort() or even Ctrl+C (unless used twice).
if err := e.state.Test.Runner.Teardown(globalCtx, samplesOut); err != nil {
logger.WithField("error", err).Debug("teardown() aborted by error")
// TODO: add a `sync.Once` equivalent?
_, err := e.controller.GetOrCreateData("teardown", func() ([]byte, error) {
if err := e.state.Test.Runner.Teardown(globalCtx, samplesOut); err != nil {
logger.WithField("error", err).Debug("teardown() aborted by error")
return nil, err
}
return nil, nil
})
if err != nil {
return err
}
}

if err := SignalAndWait(e.controller, "teardown-done"); err != nil {
return err
}

return firstErr
}

Expand Down
Loading

0 comments on commit eab37e5

Please sign in to comment.