Skip to content

Commit

Permalink
Remove the core.Engine, finally 🎉
Browse files Browse the repository at this point in the history
  • Loading branch information
na-- committed Jan 25, 2023
1 parent 943e595 commit 6f6b105
Show file tree
Hide file tree
Showing 11 changed files with 308 additions and 972 deletions.
26 changes: 14 additions & 12 deletions api/v1/setup_teardown_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ import (
"github.com/stretchr/testify/require"
"gopkg.in/guregu/null.v3"

"go.k6.io/k6/core"
"go.k6.io/k6/execution"
"go.k6.io/k6/js"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/types"
"go.k6.io/k6/loader"
"go.k6.io/k6/metrics"
"go.k6.io/k6/metrics/engine"
"go.k6.io/k6/output"
)

func TestSetupData(t *testing.T) {
Expand Down Expand Up @@ -138,31 +140,30 @@ func TestSetupData(t *testing.T) {

execScheduler, err := execution.NewScheduler(testState)
require.NoError(t, err)
engine, err := core.NewEngine(testState, execScheduler, nil)
metricsEngine, err := engine.NewMetricsEngine(execScheduler.GetState())
require.NoError(t, err)

globalCtx, globalCancel := context.WithCancel(context.Background())
t.Cleanup(globalCancel)
defer globalCancel()
runCtx, runAbort := execution.NewTestRunContext(globalCtx, testState.Logger)
defer runAbort(fmt.Errorf("unexpected abort"))

require.NoError(t, engine.OutputManager.StartOutputs())
defer engine.OutputManager.StopOutputs(nil)
outputManager := output.NewManager([]output.Output{metricsEngine.CreateIngester()}, testState.Logger, runAbort)
samples := make(chan metrics.SampleContainer, 1000)
_, stopOutputs, err := outputManager.Start(samples)
require.NoError(t, err)
defer stopOutputs(nil)

cs := &ControlSurface{
RunCtx: runCtx,
Samples: engine.Samples,
MetricsEngine: engine.MetricsEngine,
Samples: samples,
MetricsEngine: metricsEngine,
Scheduler: execScheduler,
RunState: testState,
}
run, wait, err := engine.Init(globalCtx, runCtx)
require.NoError(t, err)

defer wait()

errC := make(chan error)
go func() { errC <- run() }()
go func() { errC <- execScheduler.Run(globalCtx, runCtx, samples) }()

handler := NewHandler(cs)

Expand Down Expand Up @@ -194,6 +195,7 @@ func TestSetupData(t *testing.T) {
case <-time.After(10 * time.Second):
t.Fatal("Test timed out")
case err := <-errC:
close(samples)
require.NoError(t, err)
}
})
Expand Down
31 changes: 17 additions & 14 deletions api/v1/status_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ import (
"github.com/stretchr/testify/require"
"gopkg.in/guregu/null.v3"

"go.k6.io/k6/core"
"go.k6.io/k6/execution"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/testutils/minirunner"
"go.k6.io/k6/metrics"
"go.k6.io/k6/metrics/engine"
"go.k6.io/k6/output"
)

func TestGetStatus(t *testing.T) {
Expand Down Expand Up @@ -111,39 +113,40 @@ func TestPatchStatus(t *testing.T) {
testState := getTestRunState(t, lib.Options{Scenarios: scenarios}, &minirunner.MiniRunner{})
execScheduler, err := execution.NewScheduler(testState)
require.NoError(t, err)
engine, err := core.NewEngine(testState, execScheduler, nil)
require.NoError(t, err)

require.NoError(t, engine.OutputManager.StartOutputs())
defer engine.OutputManager.StopOutputs(nil)
metricsEngine, err := engine.NewMetricsEngine(execScheduler.GetState())
require.NoError(t, err)

globalCtx, globalCancel := context.WithCancel(context.Background())
t.Cleanup(globalCancel)
defer globalCancel()
runCtx, runAbort := execution.NewTestRunContext(globalCtx, testState.Logger)
defer runAbort(fmt.Errorf("unexpected abort"))
engine.AbortFn = runAbort

outputManager := output.NewManager([]output.Output{metricsEngine.CreateIngester()}, testState.Logger, runAbort)
samples := make(chan metrics.SampleContainer, 1000)
waitMetricsFlushed, stopOutputs, err := outputManager.Start(samples)
require.NoError(t, err)
defer stopOutputs(nil)

cs := &ControlSurface{
RunCtx: runCtx,
Samples: engine.Samples,
MetricsEngine: engine.MetricsEngine,
Samples: samples,
MetricsEngine: metricsEngine,
Scheduler: execScheduler,
RunState: testState,
}

run, wait, err := engine.Init(globalCtx, runCtx)
require.NoError(t, err)

wg := &sync.WaitGroup{}
wg.Add(1)
defer func() {
runAbort(fmt.Errorf("custom cancel signal"))
wait()
waitMetricsFlushed()
wg.Wait()
}()

go func() {
assert.ErrorContains(t, run(), "custom cancel signal")
assert.ErrorContains(t, execScheduler.Run(globalCtx, runCtx, samples), "custom cancel signal")
close(samples)
wg.Done()
}()
// wait for the executor to initialize to avoid a potential data race below
Expand Down
Loading

0 comments on commit 6f6b105

Please sign in to comment.