diff --git a/api/v1/group_routes_test.go b/api/v1/group_routes_test.go index 9893d87bfe4..84cc8dcd341 100644 --- a/api/v1/group_routes_test.go +++ b/api/v1/group_routes_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/require" "go.k6.io/k6/execution" + "go.k6.io/k6/execution/local" "go.k6.io/k6/lib" "go.k6.io/k6/lib/testutils" "go.k6.io/k6/lib/testutils/minirunner" @@ -41,7 +42,7 @@ func getTestRunState(tb testing.TB, options lib.Options, runner lib.Runner) *lib } func getControlSurface(tb testing.TB, testState *lib.TestRunState) *ControlSurface { - execScheduler, err := execution.NewScheduler(testState) + execScheduler, err := execution.NewScheduler(testState, local.NewController()) require.NoError(tb, err) me, err := engine.NewMetricsEngine(testState.Registry, testState.Logger) diff --git a/api/v1/setup_teardown_routes_test.go b/api/v1/setup_teardown_routes_test.go index 65917e0a8ab..5a804eca21d 100644 --- a/api/v1/setup_teardown_routes_test.go +++ b/api/v1/setup_teardown_routes_test.go @@ -16,6 +16,7 @@ import ( "gopkg.in/guregu/null.v3" "go.k6.io/k6/execution" + "go.k6.io/k6/execution/local" "go.k6.io/k6/js" "go.k6.io/k6/lib" "go.k6.io/k6/lib/types" @@ -138,7 +139,7 @@ func TestSetupData(t *testing.T) { TeardownTimeout: types.NullDurationFrom(5 * time.Second), }, runner) - execScheduler, err := execution.NewScheduler(testState) + execScheduler, err := execution.NewScheduler(testState, local.NewController()) require.NoError(t, err) metricsEngine, err := engine.NewMetricsEngine(testState.Registry, testState.Logger) require.NoError(t, err) diff --git a/api/v1/status_routes_test.go b/api/v1/status_routes_test.go index f74f0ed5a20..9328ae1505f 100644 --- a/api/v1/status_routes_test.go +++ b/api/v1/status_routes_test.go @@ -16,6 +16,7 @@ import ( "gopkg.in/guregu/null.v3" "go.k6.io/k6/execution" + "go.k6.io/k6/execution/local" "go.k6.io/k6/lib" "go.k6.io/k6/lib/testutils/minirunner" "go.k6.io/k6/metrics" @@ -112,7 +113,7 @@ func TestPatchStatus(t 
*testing.T) { require.NoError(t, err) testState := getTestRunState(t, lib.Options{Scenarios: scenarios}, &minirunner.MiniRunner{}) - execScheduler, err := execution.NewScheduler(testState) + execScheduler, err := execution.NewScheduler(testState, local.NewController()) require.NoError(t, err) metricsEngine, err := engine.NewMetricsEngine(testState.Registry, testState.Logger) diff --git a/cmd/run.go b/cmd/run.go index 2c640513c17..72e9c1734ba 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -25,6 +25,7 @@ import ( "go.k6.io/k6/errext/exitcodes" "go.k6.io/k6/event" "go.k6.io/k6/execution" + "go.k6.io/k6/execution/local" "go.k6.io/k6/js/common" "go.k6.io/k6/lib" "go.k6.io/k6/lib/consts" @@ -114,7 +115,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { // Create a local execution scheduler wrapping the runner. logger.Debug("Initializing the execution scheduler...") - execScheduler, err := execution.NewScheduler(testRunState) + execScheduler, err := execution.NewScheduler(testRunState, local.NewController()) if err != nil { return err } diff --git a/execution/controller.go b/execution/controller.go new file mode 100644 index 00000000000..7b1e1bb9238 --- /dev/null +++ b/execution/controller.go @@ -0,0 +1,53 @@ +package execution + +// Controller implementations are used to control the k6 execution of a test or +// test suite, either locally or in a distributed environment. +type Controller interface { + // GetOrCreateData requests the data chunk with the given ID, if it already + // exists. If it doesn't (i.e. this was the first time this function was + // called with that ID), the given callback is called and its result and + // error are saved for the ID and returned for all other calls with it. + // + // This is an atomic function, so any calls to it while the callback is + // being executed with the same ID will wait for the first call to finish + // and receive its result.
+ // + // TODO: split apart into `Once()`, `SetData()`, `GetData()` and implement + // the GetOrCreateData() behavior in a helper like the ones below? + GetOrCreateData(ID string, callback func() ([]byte, error)) ([]byte, error) + + // Signal is used to notify that the current instance has reached the given + // event ID, or that it has had an error. + Signal(eventID string, err error) error + + // Subscribe creates a listener for the specified event ID and returns a + // callback that can wait until all other instances have reached it. + Subscribe(eventID string) (wait func() error) +} + +// SignalAndWait implements a rendezvous point / barrier, a way for all +// instances to reach the same execution point and wait for each other, before +// they all ~simultaneously continue with the execution. +// +// It subscribes for the given event ID, signals that the current instance has +// reached it without an error, and then waits until all other instances have +// reached it or until there is an error in one of them. +func SignalAndWait(c Controller, eventID string) error { + wait := c.Subscribe(eventID) + + if err := c.Signal(eventID, nil); err != nil { + return err + } + return wait() +} + +// SignalErrorOrWait is a helper method that either immediately signals the +// given error and returns it, or it signals nominal completion and waits for +// all other instances to do the same (or signal an error themselves). +func SignalErrorOrWait(c Controller, eventID string, err error) error { + if err != nil { + _ = c.Signal(eventID, err) + return err // return the same error we got + } + return SignalAndWait(c, eventID) +} diff --git a/execution/local/controller.go b/execution/local/controller.go new file mode 100644 index 00000000000..33b2b9123e1 --- /dev/null +++ b/execution/local/controller.go @@ -0,0 +1,45 @@ +// Package local implements the execution.Controller interface for local +// (single-machine) k6 execution. +package local + +// Controller "controls" local tests.
It doesn't actually do anything, it just +// implements the execution.Controller interface with no-op operations. The +// methods don't do anything because local tests have only a single instance. +// +// However, for test suites (https://github.com/grafana/k6/issues/1342) in the +// future, we will probably need to actually implement some of these methods and +// introduce simple synchronization primitives even for a single machine... +type Controller struct{} + +// NewController creates a new local execution Controller. +func NewController() *Controller { + return &Controller{} +} + +// GetOrCreateData immediately calls the given callback and returns its results. +func (c *Controller) GetOrCreateData(_ string, callback func() ([]byte, error)) ([]byte, error) { + return callback() +} + +// Subscribe is a no-op, it doesn't actually wait for anything, because there is +// nothing to wait on - we only have one instance in local tests. +// +// TODO: actually use waitgroups, since this may actually matter for test +// suites, even for local test runs. That's because multiple tests might be +// executed even by a single instance, and if we have complicated flows (e.g. +// "test C is executed only after test A and test B finish"), the easiest way +// would be for different tests in the suite to reuse this Controller API for *both* +// local and distributed runs. +func (c *Controller) Subscribe(_ string) func() error { + return func() error { + return nil + } +} + +// Signal is a no-op, it doesn't actually do anything for local test runs. +// +// TODO: similar to Subscribe() above, this might actually be required for +// complex/branching test suites, even during local non-distributed execution.
+func (c *Controller) Signal(_ string, _ error) error { + return nil +} diff --git a/execution/scheduler.go b/execution/scheduler.go index 5469e452d6c..f2c34a5a6f0 100644 --- a/execution/scheduler.go +++ b/execution/scheduler.go @@ -20,6 +20,8 @@ import ( // executors, running setup() and teardown(), and actually starting the // executors for the different scenarios at the appropriate times. type Scheduler struct { + controller Controller + initProgress *pb.ProgressBar executorConfigs []lib.ExecutorConfig // sorted by (startTime, ID) executors []lib.Executor // sorted by (startTime, ID), excludes executors with no work @@ -33,7 +35,7 @@ type Scheduler struct { // initializing it beyond the bare minimum. Specifically, it creates the needed // executor instances and a lot of state placeholders, but it doesn't initialize // the executors and it doesn't initialize or run VUs. -func NewScheduler(trs *lib.TestRunState) (*Scheduler, error) { +func NewScheduler(trs *lib.TestRunState, controller Controller) (*Scheduler, error) { options := trs.Options et, err := lib.NewExecutionTuple(options.ExecutionSegment, options.ExecutionSegmentSequence) if err != nil { @@ -81,6 +83,7 @@ func NewScheduler(trs *lib.TestRunState) (*Scheduler, error) { maxDuration: maxDuration, maxPossibleVUs: maxPossibleVUs, state: executionState, + controller: controller, }, nil } @@ -380,6 +383,13 @@ func (e *Scheduler) Init( ) (stopVUEmission func(), initErr error) { logger := e.state.Test.Logger.WithField("phase", "execution-scheduler-init") + if err := SignalAndWait(e.controller, "scheduler-init-start"); err != nil { + return nil, err + } + defer func() { + initErr = SignalErrorOrWait(e.controller, "scheduler-init-done", initErr) + }() + execSchedRunCtx, execSchedRunCancel := context.WithCancel(runCtx) waitForVUsMetricPush := e.emitVUsAndVUsMax(execSchedRunCtx, samplesOut) stopVUEmission = func() { @@ -405,16 +415,20 @@ func (e *Scheduler) Init( // Run the Scheduler, funneling all generated metric 
samples through the supplied // out channel. // -//nolint:funlen +//nolint:funlen, gocognit func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- metrics.SampleContainer) (runErr error) { logger := e.state.Test.Logger.WithField("phase", "execution-scheduler-run") + if err := SignalAndWait(e.controller, "scheduler-run-start"); err != nil { + return err + } defer func() { if interruptErr := GetCancelReasonIfTestAborted(runCtx); interruptErr != nil { logger.Debugf("The test run was interrupted, returning '%s' instead of '%s'", interruptErr, runErr) e.state.SetExecutionStatus(lib.ExecutionStatusInterrupted) runErr = interruptErr } + runErr = SignalErrorOrWait(e.controller, "scheduler-run-done", runErr) }() e.initProgress.Modify(pb.WithConstLeft("Run")) @@ -430,6 +444,10 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- met } } + if err := SignalAndWait(e.controller, "test-ready-to-run-setup"); err != nil { + return err + } + e.initProgress.Modify(pb.WithConstProgress(1, "Starting test...")) e.state.MarkStarted() defer e.state.MarkEnded() @@ -449,11 +467,27 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- met if !e.state.Test.Options.NoSetup.Bool { e.state.SetExecutionStatus(lib.ExecutionStatusSetup) e.initProgress.Modify(pb.WithConstProgress(1, "setup()")) - if err := e.state.Test.Runner.Setup(withExecStateCtx, samplesOut); err != nil { - logger.WithField("error", err).Debug("setup() aborted by error") + actuallyRanSetup := false + data, err := e.controller.GetOrCreateData("setup", func() ([]byte, error) { + actuallyRanSetup = true + if err := e.state.Test.Runner.Setup(withExecStateCtx, samplesOut); err != nil { + logger.WithField("error", err).Debug("setup() aborted by error") + return nil, err + } + return e.state.Test.Runner.GetSetupData(), nil + }) + if err != nil { return err } + if !actuallyRanSetup { + e.state.Test.Runner.SetSetupData(data) + } + } + + if err := 
SignalAndWait(e.controller, "setup-done"); err != nil { + return err } + e.initProgress.Modify(pb.WithHijack(e.getRunStats)) // Start all executors at their particular startTime in a separate goroutine... @@ -469,6 +503,8 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- met // Wait for all executors to finish var firstErr error for range e.executors { + // TODO: add logic to abort the test early if there was an error from + // the controller (e.g. some other instance for this test died) err := <-runResults if err != nil && firstErr == nil { logger.WithError(err).Debug("Executor returned with an error, cancelling test run...") @@ -477,6 +513,10 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- met } } + if err := SignalAndWait(e.controller, "execution-done"); err != nil { + return err + } + // Run teardown() after all executors are done, if it's not disabled if !e.state.Test.Options.NoTeardown.Bool { e.state.SetExecutionStatus(lib.ExecutionStatusTeardown) @@ -484,12 +524,23 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- met // We run teardown() with the global context, so it isn't interrupted by // thresholds or test.abort() or even Ctrl+C (unless used twice). - if err := e.state.Test.Runner.Teardown(globalCtx, samplesOut); err != nil { - logger.WithField("error", err).Debug("teardown() aborted by error") + // TODO: add a `sync.Once` equivalent? 
+ _, err := e.controller.GetOrCreateData("teardown", func() ([]byte, error) { + if err := e.state.Test.Runner.Teardown(globalCtx, samplesOut); err != nil { + logger.WithField("error", err).Debug("teardown() aborted by error") + return nil, err + } + return nil, nil + }) + if err != nil { return err } } + if err := SignalAndWait(e.controller, "teardown-done"); err != nil { + return err + } + return firstErr } diff --git a/execution/scheduler_ext_test.go b/execution/scheduler_ext_test.go index 46ceb2e206a..73796319403 100644 --- a/execution/scheduler_ext_test.go +++ b/execution/scheduler_ext_test.go @@ -19,6 +19,7 @@ import ( "gopkg.in/guregu/null.v3" "go.k6.io/k6/execution" + "go.k6.io/k6/execution/local" "go.k6.io/k6/js" "go.k6.io/k6/lib" "go.k6.io/k6/lib/executor" @@ -73,7 +74,7 @@ func newTestScheduler( testRunState.Logger = logger } - execScheduler, err = execution.NewScheduler(testRunState) + execScheduler, err = execution.NewScheduler(testRunState, local.NewController()) require.NoError(t, err) samples = make(chan metrics.SampleContainer, newOpts.MetricSamplesBufferSize.Int64) @@ -141,7 +142,7 @@ func TestSchedulerRunNonDefault(t *testing.T) { testRunState := getTestRunState(t, piState, runner.GetOptions(), runner) - execScheduler, err := execution.NewScheduler(testRunState) + execScheduler, err := execution.NewScheduler(testRunState, local.NewController()) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -258,7 +259,7 @@ func TestSchedulerRunEnv(t *testing.T) { require.NoError(t, err) testRunState := getTestRunState(t, piState, runner.GetOptions(), runner) - execScheduler, err := execution.NewScheduler(testRunState) + execScheduler, err := execution.NewScheduler(testRunState, local.NewController()) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -332,7 +333,7 @@ func TestSchedulerSystemTags(t *testing.T) { }))) testRunState := getTestRunState(t, piState, runner.GetOptions(), runner) - 
execScheduler, err := execution.NewScheduler(testRunState) + execScheduler, err := execution.NewScheduler(testRunState, local.NewController()) require.NoError(t, err) ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) @@ -468,7 +469,7 @@ func TestSchedulerRunCustomTags(t *testing.T) { require.NoError(t, err) testRunState := getTestRunState(t, piState, runner.GetOptions(), runner) - execScheduler, err := execution.NewScheduler(testRunState) + execScheduler, err := execution.NewScheduler(testRunState, local.NewController()) require.NoError(t, err) ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) @@ -637,7 +638,7 @@ func TestSchedulerRunCustomConfigNoCrossover(t *testing.T) { require.NoError(t, err) testRunState := getTestRunState(t, piState, runner.GetOptions(), runner) - execScheduler, err := execution.NewScheduler(testRunState) + execScheduler, err := execution.NewScheduler(testRunState, local.NewController()) require.NoError(t, err) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) @@ -978,7 +979,7 @@ func TestSchedulerEndIterations(t *testing.T) { defer cancel() testRunState := getTestRunState(t, getTestPreInitState(t), runner.GetOptions(), runner) - execScheduler, err := execution.NewScheduler(testRunState) + execScheduler, err := execution.NewScheduler(testRunState, local.NewController()) require.NoError(t, err) samples := make(chan metrics.SampleContainer, 300) @@ -1190,7 +1191,7 @@ func TestRealTimeAndSetupTeardownMetrics(t *testing.T) { require.NoError(t, err) testRunState := getTestRunState(t, piState, options, runner) - execScheduler, err := execution.NewScheduler(testRunState) + execScheduler, err := execution.NewScheduler(testRunState, local.NewController()) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -1362,7 +1363,7 @@ func TestNewSchedulerHasWork(t *testing.T) { require.NoError(t, err) testRunState := getTestRunState(t, piState, runner.GetOptions(), 
runner) - execScheduler, err := execution.NewScheduler(testRunState) + execScheduler, err := execution.NewScheduler(testRunState, local.NewController()) require.NoError(t, err) assert.Len(t, execScheduler.GetExecutors(), 2) diff --git a/execution/scheduler_int_test.go b/execution/scheduler_int_test.go index c0451ad7e63..455e4f198e2 100644 --- a/execution/scheduler_int_test.go +++ b/execution/scheduler_int_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/stretchr/testify/require" + "go.k6.io/k6/execution/local" "go.k6.io/k6/lib" "go.k6.io/k6/lib/testutils" "go.k6.io/k6/lib/testutils/minirunner" @@ -43,7 +44,7 @@ func TestSetPaused(t *testing.T) { t.Run("second pause is an error", func(t *testing.T) { t.Parallel() testRunState := getBogusTestRunState(t) - sched, err := NewScheduler(testRunState) + sched, err := NewScheduler(testRunState, local.NewController()) require.NoError(t, err) sched.executors = []lib.Executor{pausableExecutor{err: nil}} @@ -56,7 +57,7 @@ func TestSetPaused(t *testing.T) { t.Run("unpause at the start is an error", func(t *testing.T) { t.Parallel() testRunState := getBogusTestRunState(t) - sched, err := NewScheduler(testRunState) + sched, err := NewScheduler(testRunState, local.NewController()) require.NoError(t, err) sched.executors = []lib.Executor{pausableExecutor{err: nil}} err = sched.SetPaused(false) @@ -67,7 +68,7 @@ func TestSetPaused(t *testing.T) { t.Run("second unpause is an error", func(t *testing.T) { t.Parallel() testRunState := getBogusTestRunState(t) - sched, err := NewScheduler(testRunState) + sched, err := NewScheduler(testRunState, local.NewController()) require.NoError(t, err) sched.executors = []lib.Executor{pausableExecutor{err: nil}} require.NoError(t, sched.SetPaused(true)) @@ -80,7 +81,7 @@ func TestSetPaused(t *testing.T) { t.Run("an error on pausing is propagated", func(t *testing.T) { t.Parallel() testRunState := getBogusTestRunState(t) - sched, err := NewScheduler(testRunState) + sched, err := 
NewScheduler(testRunState, local.NewController()) require.NoError(t, err) expectedErr := errors.New("testing pausable executor error") sched.executors = []lib.Executor{pausableExecutor{err: expectedErr}} diff --git a/js/runner_test.go b/js/runner_test.go index 4b34dcd1641..25d2787b82c 100644 --- a/js/runner_test.go +++ b/js/runner_test.go @@ -34,6 +34,7 @@ import ( "go.k6.io/k6/errext" "go.k6.io/k6/execution" + "go.k6.io/k6/execution/local" "go.k6.io/k6/js/modules/k6" k6http "go.k6.io/k6/js/modules/k6/http" k6metrics "go.k6.io/k6/js/modules/k6/metrics" @@ -383,7 +384,7 @@ func TestDataIsolation(t *testing.T) { RunTags: runner.preInitState.Registry.RootTagSet().WithTagsFromMap(options.RunTags), } - execScheduler, err := execution.NewScheduler(testRunState) + execScheduler, err := execution.NewScheduler(testRunState, local.NewController()) require.NoError(t, err) globalCtx, globalCancel := context.WithCancel(context.Background()) @@ -2704,7 +2705,7 @@ func TestExecutionInfo(t *testing.T) { Runner: r, } - execScheduler, err := execution.NewScheduler(testRunState) + execScheduler, err := execution.NewScheduler(testRunState, local.NewController()) require.NoError(t, err) ctx = lib.WithExecutionState(ctx, execScheduler.GetState())