Skip to content

Commit

Permalink
Add global VU IDs
Browse files Browse the repository at this point in the history
  • Loading branch information
Ivan Mirić committed Jun 21, 2021
1 parent 01a82de commit af0cffe
Show file tree
Hide file tree
Showing 23 changed files with 196 additions and 143 deletions.
10 changes: 5 additions & 5 deletions core/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,15 +157,15 @@ func (e *ExecutionScheduler) GetExecutionPlan() []lib.ExecutionStep ***REMOVED**
func (e *ExecutionScheduler) initVU(
samplesOut chan<- stats.SampleContainer, logger *logrus.Entry,
) (lib.InitializedVU, error) ***REMOVED***
// Get the VU ID here, so that the VUs are (mostly) ordered by their
// Get the VU IDs here, so that the VUs are (mostly) ordered by their
// number in the channel buffer
vuID := e.state.GetUniqueVUIdentifier()
vu, err := e.runner.NewVU(vuID, samplesOut)
vuIDLocal, vuIDGlobal := e.state.GetUniqueVUIdentifiers()
vu, err := e.runner.NewVU(vuIDLocal, vuIDGlobal, samplesOut)
if err != nil ***REMOVED***
return nil, errext.WithHint(err, fmt.Sprintf("error while initializing VU #%d", vuID))
return nil, errext.WithHint(err, fmt.Sprintf("error while initializing VU #%d", vuIDGlobal))
***REMOVED***

logger.Debugf("Initialized VU #%d", vuID)
logger.Debugf("Initialized VU #%d", vuIDGlobal)
return vu, nil
***REMOVED***

Expand Down
4 changes: 2 additions & 2 deletions js/console_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func TestConsole(t *testing.T) ***REMOVED***
assert.NoError(t, err)
samples := make(chan stats.SampleContainer, 100)
initVU, err := r.newVU(1, samples)
initVU, err := r.newVU(1, 1, samples)
assert.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -245,7 +245,7 @@ func TestFileConsole(t *testing.T) ***REMOVED***
assert.NoError(t, err)

samples := make(chan stats.SampleContainer, 100)
initVU, err := r.newVU(1, samples)
initVU, err := r.newVU(1, 1, samples)
assert.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
Expand Down
2 changes: 1 addition & 1 deletion js/empty_iteartions_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func BenchmarkEmptyIteration(b *testing.B) ***REMOVED***
for range ch ***REMOVED***
***REMOVED***
***REMOVED***()
initVU, err := r.NewVU(1, ch)
initVU, err := r.NewVU(1, 1, ch)
if !assert.NoError(b, err) ***REMOVED***
return
***REMOVED***
Expand Down
4 changes: 2 additions & 2 deletions js/http_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func BenchmarkHTTPRequests(b *testing.B) ***REMOVED***
for range ch ***REMOVED***
***REMOVED***
***REMOVED***()
initVU, err := r.NewVU(1, ch)
initVU, err := r.NewVU(1, 1, ch)
if !assert.NoError(b, err) ***REMOVED***
return
***REMOVED***
Expand Down Expand Up @@ -105,7 +105,7 @@ func BenchmarkHTTPRequestsBase(b *testing.B) ***REMOVED***
for range ch ***REMOVED***
***REMOVED***
***REMOVED***()
initVU, err := r.NewVU(1, ch)
initVU, err := r.NewVU(1, 1, ch)
if !assert.NoError(b, err) ***REMOVED***
return
***REMOVED***
Expand Down
7 changes: 4 additions & 3 deletions js/init_and_modules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@ import (
"github.com/spf13/afero"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/guregu/null.v3"

"go.k6.io/k6/js"
"go.k6.io/k6/js/common"
"go.k6.io/k6/js/modules"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/testutils"
"go.k6.io/k6/loader"
"go.k6.io/k6/stats"
"gopkg.in/guregu/null.v3"
)

type CheckModule struct ***REMOVED***
Expand Down Expand Up @@ -94,7 +95,7 @@ func TestNewJSRunnerWithCustomModule(t *testing.T) ***REMOVED***
assert.Equal(t, checkModule.initCtxCalled, 1)
assert.Equal(t, checkModule.vuCtxCalled, 0)

vu, err := runner.NewVU(1, make(chan stats.SampleContainer, 100))
vu, err := runner.NewVU(1, 1, make(chan stats.SampleContainer, 100))
require.NoError(t, err)
assert.Equal(t, checkModule.initCtxCalled, 2)
assert.Equal(t, checkModule.vuCtxCalled, 0)
Expand All @@ -118,7 +119,7 @@ func TestNewJSRunnerWithCustomModule(t *testing.T) ***REMOVED***
require.NoError(t, err)
assert.Equal(t, checkModule.initCtxCalled, 3) // changes because we need to get the exported functions
assert.Equal(t, checkModule.vuCtxCalled, 2)
vuFromArc, err := runnerFromArc.NewVU(2, make(chan stats.SampleContainer, 100))
vuFromArc, err := runnerFromArc.NewVU(2, 2, make(chan stats.SampleContainer, 100))
require.NoError(t, err)
assert.Equal(t, checkModule.initCtxCalled, 4)
assert.Equal(t, checkModule.vuCtxCalled, 2)
Expand Down
20 changes: 10 additions & 10 deletions js/module_loading_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func TestLoadOnceGlobalVars(t *testing.T) ***REMOVED***
t.Parallel()
ch := newDevNullSampleChannel()
defer close(ch)
initVU, err := r.NewVU(1, ch)
initVU, err := r.NewVU(1, 1, ch)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -167,7 +167,7 @@ func TestLoadExportsIsUsableInModule(t *testing.T) ***REMOVED***
t.Parallel()
ch := newDevNullSampleChannel()
defer close(ch)
initVU, err := r.NewVU(1, ch)
initVU, err := r.NewVU(1, 1, ch)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
vu := initVU.Activate(&lib.VUActivationParams***REMOVED***RunContext: ctx***REMOVED***)
Expand Down Expand Up @@ -215,7 +215,7 @@ func TestLoadDoesntBreakHTTPGet(t *testing.T) ***REMOVED***
t.Parallel()
ch := newDevNullSampleChannel()
defer close(ch)
initVU, err := r.NewVU(1, ch)
initVU, err := r.NewVU(1, 1, ch)
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -261,7 +261,7 @@ func TestLoadGlobalVarsAreNotSharedBetweenVUs(t *testing.T) ***REMOVED***
t.Parallel()
ch := newDevNullSampleChannel()
defer close(ch)
initVU, err := r.NewVU(1, ch)
initVU, err := r.NewVU(1, 1, ch)
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -270,7 +270,7 @@ func TestLoadGlobalVarsAreNotSharedBetweenVUs(t *testing.T) ***REMOVED***
require.NoError(t, err)
// run a second VU
initVU, err = r.NewVU(2, ch)
initVU, err = r.NewVU(2, 2, ch)
require.NoError(t, err)
ctx, cancel = context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -326,7 +326,7 @@ func TestLoadCycle(t *testing.T) ***REMOVED***
t.Parallel()
ch := newDevNullSampleChannel()
defer close(ch)
initVU, err := r.NewVU(1, ch)
initVU, err := r.NewVU(1, 1, ch)
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -389,7 +389,7 @@ func TestLoadCycleBinding(t *testing.T) ***REMOVED***
t.Parallel()
ch := newDevNullSampleChannel()
defer close(ch)
initVU, err := r.NewVU(1, ch)
initVU, err := r.NewVU(1, 1, ch)
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -455,7 +455,7 @@ func TestBrowserified(t *testing.T) ***REMOVED***
t.Parallel()
ch := make(chan stats.SampleContainer, 100)
defer close(ch)
initVU, err := r.NewVU(1, ch)
initVU, err := r.NewVU(1, 1, ch)
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -499,7 +499,7 @@ func TestLoadingUnexistingModuleDoesntPanic(t *testing.T) ***REMOVED***
t.Parallel()
ch := newDevNullSampleChannel()
defer close(ch)
initVU, err := r.NewVU(1, ch)
initVU, err := r.NewVU(1, 1, ch)
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -534,7 +534,7 @@ func TestLoadingSourceMapsDoesntErrorOut(t *testing.T) ***REMOVED***
t.Parallel()
ch := newDevNullSampleChannel()
defer close(ch)
initVU, err := r.NewVU(1, ch)
initVU, err := r.NewVU(1, 1, ch)
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
2 changes: 1 addition & 1 deletion js/modules/k6/marshalling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func TestSetupDataMarshalling(t *testing.T) ***REMOVED***
if !assert.NoError(t, runner.Setup(context.Background(), samples)) ***REMOVED***
return
***REMOVED***
initVU, err := runner.NewVU(1, samples)
initVU, err := runner.NewVU(1, 1, samples)
if assert.NoError(t, err) ***REMOVED***
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
47 changes: 25 additions & 22 deletions js/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,18 +126,18 @@ func (r *Runner) MakeArchive() *lib.Archive ***REMOVED***
***REMOVED***

// NewVU returns a new initialized VU.
func (r *Runner) NewVU(id uint64, samplesOut chan<- stats.SampleContainer) (lib.InitializedVU, error) ***REMOVED***
vu, err := r.newVU(id, samplesOut)
func (r *Runner) NewVU(idLocal, idGlobal uint64, samplesOut chan<- stats.SampleContainer) (lib.InitializedVU, error) ***REMOVED***
vu, err := r.newVU(idLocal, idGlobal, samplesOut)
if err != nil ***REMOVED***
return nil, err
***REMOVED***
return lib.InitializedVU(vu), nil
***REMOVED***

// nolint:funlen
func (r *Runner) newVU(id uint64, samplesOut chan<- stats.SampleContainer) (*VU, error) ***REMOVED***
func (r *Runner) newVU(idLocal, idGlobal uint64, samplesOut chan<- stats.SampleContainer) (*VU, error) ***REMOVED***
// Instantiate a new bundle, make a VU out of it.
bi, err := r.Bundle.Instantiate(r.Logger, id)
bi, err := r.Bundle.Instantiate(r.Logger, idLocal)
if err != nil ***REMOVED***
return nil, err
***REMOVED***
Expand Down Expand Up @@ -175,8 +175,8 @@ func (r *Runner) newVU(id uint64, samplesOut chan<- stats.SampleContainer) (*VU,
***REMOVED***
if r.Bundle.Options.LocalIPs.Valid ***REMOVED***
var ipIndex uint64
if id > 0 ***REMOVED***
ipIndex = uint64(id - 1)
if idLocal > 0 ***REMOVED***
ipIndex = idLocal - 1
***REMOVED***
dialer.Dialer.LocalAddr = &net.TCPAddr***REMOVED***IP: r.Bundle.Options.LocalIPs.Pool.GetIP(ipIndex)***REMOVED***
***REMOVED***
Expand Down Expand Up @@ -207,7 +207,8 @@ func (r *Runner) newVU(id uint64, samplesOut chan<- stats.SampleContainer) (*VU,
***REMOVED***

vu := &VU***REMOVED***
ID: id,
ID: idLocal,
IDGlobal: idGlobal,
iteration: int64(-1),
BundleInstance: *bi,
Runner: r,
Expand All @@ -223,18 +224,19 @@ func (r *Runner) newVU(id uint64, samplesOut chan<- stats.SampleContainer) (*VU,
***REMOVED***

vu.state = &lib.State***REMOVED***
Logger: vu.Runner.Logger,
Options: vu.Runner.Bundle.Options,
Transport: vu.Transport,
Dialer: vu.Dialer,
TLSConfig: vu.TLSConfig,
CookieJar: cookieJar,
RPSLimit: vu.Runner.RPSLimit,
BPool: vu.BPool,
Vu: vu.ID,
Samples: vu.Samples,
Tags: vu.Runner.Bundle.Options.RunTags.CloneTags(),
Group: r.defaultGroup,
Logger: vu.Runner.Logger,
Options: vu.Runner.Bundle.Options,
Transport: vu.Transport,
Dialer: vu.Dialer,
TLSConfig: vu.TLSConfig,
CookieJar: cookieJar,
RPSLimit: vu.Runner.RPSLimit,
BPool: vu.BPool,
VUID: vu.ID,
VUIDGlobal: vu.IDGlobal,
Samples: vu.Samples,
Tags: vu.Runner.Bundle.Options.RunTags.CloneTags(),
Group: r.defaultGroup,
***REMOVED***
vu.Runtime.Set("console", common.Bind(vu.Runtime, vu.Console, vu.Context))

Expand Down Expand Up @@ -325,7 +327,7 @@ func (r *Runner) HandleSummary(ctx context.Context, summary *lib.Summary) (map[s
***REMOVED***
***REMOVED***()

vu, err := r.newVU(0, out)
vu, err := r.newVU(0, 0, out)
if err != nil ***REMOVED***
return nil, err
***REMOVED***
Expand Down Expand Up @@ -463,7 +465,7 @@ func parseTTL(ttlS string) (time.Duration, error) ***REMOVED***
// Runs an exported function in its own temporary VU, optionally with an argument. Execution is
// interrupted if the context expires. No error is returned if the part does not exist.
func (r *Runner) runPart(ctx context.Context, out chan<- stats.SampleContainer, name string, arg interface***REMOVED******REMOVED***) (goja.Value, error) ***REMOVED***
vu, err := r.newVU(0, out)
vu, err := r.newVU(0, 0, out)
if err != nil ***REMOVED***
return goja.Undefined(), err
***REMOVED***
Expand Down Expand Up @@ -533,7 +535,8 @@ type VU struct ***REMOVED***
Dialer *netext.Dialer
CookieJar *cookiejar.Jar
TLSConfig *tls.Config
ID uint64
ID uint64 // local to the current instance
IDGlobal uint64 // global across all instances
iteration int64

Console *console
Expand Down
Loading

0 comments on commit af0cffe

Please sign in to comment.