Skip to content

Commit

Permalink
Merge pull request #2815 from grafana/remove-the-engine
Browse files Browse the repository at this point in the history
  • Loading branch information
na-- authored Feb 2, 2023
2 parents 1da5da3 + e73e804 commit 4e88cc1
Show file tree
Hide file tree
Showing 16 changed files with 1,068 additions and 1,774 deletions.
2 changes: 1 addition & 1 deletion api/v1/group_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func getControlSurface(tb testing.TB, testState *lib.TestRunState) *ControlSurfa
execScheduler, err := execution.NewScheduler(testState)
require.NoError(tb, err)

me, err := engine.NewMetricsEngine(execScheduler.GetState())
me, err := engine.NewMetricsEngine(testState)
require.NoError(tb, err)

ctx, cancel := context.WithCancel(context.Background())
Expand Down
26 changes: 14 additions & 12 deletions api/v1/setup_teardown_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ import (
"github.com/stretchr/testify/require"
"gopkg.in/guregu/null.v3"

"go.k6.io/k6/core"
"go.k6.io/k6/execution"
"go.k6.io/k6/js"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/types"
"go.k6.io/k6/loader"
"go.k6.io/k6/metrics"
"go.k6.io/k6/metrics/engine"
"go.k6.io/k6/output"
)

func TestSetupData(t *testing.T) {
Expand Down Expand Up @@ -138,31 +140,30 @@ func TestSetupData(t *testing.T) {

execScheduler, err := execution.NewScheduler(testState)
require.NoError(t, err)
engine, err := core.NewEngine(testState, execScheduler, nil)
metricsEngine, err := engine.NewMetricsEngine(testState)
require.NoError(t, err)

globalCtx, globalCancel := context.WithCancel(context.Background())
t.Cleanup(globalCancel)
defer globalCancel()
runCtx, runAbort := execution.NewTestRunContext(globalCtx, testState.Logger)
defer runAbort(fmt.Errorf("unexpected abort"))

require.NoError(t, engine.OutputManager.StartOutputs())
defer engine.OutputManager.StopOutputs(nil)
outputManager := output.NewManager([]output.Output{metricsEngine.CreateIngester()}, testState.Logger, runAbort)
samples := make(chan metrics.SampleContainer, 1000)
_, stopOutputs, err := outputManager.Start(samples)
require.NoError(t, err)
defer stopOutputs(nil)

cs := &ControlSurface{
RunCtx: runCtx,
Samples: engine.Samples,
MetricsEngine: engine.MetricsEngine,
Samples: samples,
MetricsEngine: metricsEngine,
Scheduler: execScheduler,
RunState: testState,
}
run, wait, err := engine.Init(globalCtx, runCtx)
require.NoError(t, err)

defer wait()

errC := make(chan error)
go func() { errC <- run() }()
go func() { errC <- execScheduler.Run(globalCtx, runCtx, samples) }()

handler := NewHandler(cs)

Expand Down Expand Up @@ -194,6 +195,7 @@ func TestSetupData(t *testing.T) {
case <-time.After(10 * time.Second):
t.Fatal("Test timed out")
case err := <-errC:
close(samples)
require.NoError(t, err)
}
})
Expand Down
32 changes: 20 additions & 12 deletions api/v1/status_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ import (
"github.com/stretchr/testify/require"
"gopkg.in/guregu/null.v3"

"go.k6.io/k6/core"
"go.k6.io/k6/execution"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/testutils/minirunner"
"go.k6.io/k6/metrics"
"go.k6.io/k6/metrics/engine"
"go.k6.io/k6/output"
)

func TestGetStatus(t *testing.T) {
Expand Down Expand Up @@ -99,6 +101,7 @@ func TestPatchStatus(t *testing.T) {
}

for name, testCase := range testData {
name, testCase := name, testCase
t.Run(name, func(t *testing.T) {
t.Parallel()

Expand All @@ -111,39 +114,44 @@ func TestPatchStatus(t *testing.T) {
testState := getTestRunState(t, lib.Options{Scenarios: scenarios}, &minirunner.MiniRunner{})
execScheduler, err := execution.NewScheduler(testState)
require.NoError(t, err)
engine, err := core.NewEngine(testState, execScheduler, nil)
require.NoError(t, err)

require.NoError(t, engine.OutputManager.StartOutputs())
defer engine.OutputManager.StopOutputs(nil)
metricsEngine, err := engine.NewMetricsEngine(testState)
require.NoError(t, err)

globalCtx, globalCancel := context.WithCancel(context.Background())
t.Cleanup(globalCancel)
defer globalCancel()
runCtx, runAbort := execution.NewTestRunContext(globalCtx, testState.Logger)
defer runAbort(fmt.Errorf("unexpected abort"))
engine.AbortFn = runAbort

outputManager := output.NewManager([]output.Output{metricsEngine.CreateIngester()}, testState.Logger, runAbort)
samples := make(chan metrics.SampleContainer, 1000)
waitMetricsFlushed, stopOutputs, err := outputManager.Start(samples)
require.NoError(t, err)
defer stopOutputs(nil)

cs := &ControlSurface{
RunCtx: runCtx,
Samples: engine.Samples,
MetricsEngine: engine.MetricsEngine,
Samples: samples,
MetricsEngine: metricsEngine,
Scheduler: execScheduler,
RunState: testState,
}

run, wait, err := engine.Init(globalCtx, runCtx)
stopEmission, err := execScheduler.Init(runCtx, samples)
require.NoError(t, err)

wg := &sync.WaitGroup{}
wg.Add(1)
defer func() {
runAbort(fmt.Errorf("custom cancel signal"))
wait()
waitMetricsFlushed()
wg.Wait()
}()

go func() {
assert.ErrorContains(t, run(), "custom cancel signal")
assert.ErrorContains(t, execScheduler.Run(globalCtx, runCtx, samples), "custom cancel signal")
stopEmission()
close(samples)
wg.Done()
}()
// wait for the executor to initialize to avoid a potential data race below
Expand Down
Loading

0 comments on commit 4e88cc1

Please sign in to comment.