Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into feature/613
Browse files Browse the repository at this point in the history
  • Loading branch information
openmohan committed Jul 3, 2018
2 parents ce37b01 + 38bed9a commit d2954ee
Show file tree
Hide file tree
Showing 41 changed files with 1,005 additions and 554 deletions.
8 changes: 5 additions & 3 deletions api/v1/setup_teardown_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,10 @@ func HandleSetSetupData(rw http.ResponseWriter, r *http.Request, p httprouter.Pa

// HandleRunSetup executes the runner's Setup() method and returns the result
func HandleRunSetup(rw http.ResponseWriter, r *http.Request, p httprouter.Params) {
runner := common.GetEngine(r.Context()).Executor.GetRunner()
engine := common.GetEngine(r.Context())
runner := engine.Executor.GetRunner()

if err := runner.Setup(r.Context()); err != nil {
if err := runner.Setup(r.Context(), engine.Samples); err != nil {
apiError(rw, "Error executing setup", err.Error(), http.StatusInternalServerError)
return
}
Expand All @@ -96,9 +97,10 @@ func HandleRunSetup(rw http.ResponseWriter, r *http.Request, p httprouter.Params

// HandleRunTeardown executes the runner's Teardown() method
func HandleRunTeardown(rw http.ResponseWriter, r *http.Request, p httprouter.Params) {
engine := common.GetEngine(r.Context())
runner := common.GetEngine(r.Context()).Executor.GetRunner()

if err := runner.Teardown(r.Context()); err != nil {
if err := runner.Teardown(r.Context(), engine.Samples); err != nil {
apiError(rw, "Error executing teardown", err.Error(), http.StatusInternalServerError)
}
}
8 changes: 4 additions & 4 deletions api/v1/setup_teardown_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ func TestSetupData(t *testing.T) {
engine, err := core.NewEngine(executor, runner.GetOptions())
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
errC := make(chan error)
go func() { errC <- engine.Run(ctx) }()

handler := NewHandler()

checkSetup := func(method, body, expResult string) {
Expand All @@ -102,10 +106,6 @@ func TestSetupData(t *testing.T) {
checkSetup("PUT", `{"v":2, "test":"mest"}`, `{"data": {"v":2, "test":"mest"}}`)
checkSetup("GET", "", `{"data": {"v":2, "test":"mest"}}`)

ctx, cancel := context.WithCancel(context.Background())
errC := make(chan error)
go func() { errC <- engine.Run(ctx) }()

engine.Executor.SetPaused(false)

select {
Expand Down
14 changes: 10 additions & 4 deletions cmd/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"time"

"github.com/kelseyhightower/envconfig"
"github.com/loadimpact/k6/lib"
"github.com/loadimpact/k6/stats/cloud"
"github.com/loadimpact/k6/ui"
"github.com/pkg/errors"
Expand Down Expand Up @@ -168,7 +169,7 @@ This will execute the test on the Load Impact cloud service. Use "k6 login cloud
case <-ticker.C:
testProgress, progressErr = client.GetTestProgress(refID)
if progressErr == nil {
if (testProgress.RunStatus > 2) || (exitOnRunning && testProgress.RunStatus == 2) {
if (testProgress.RunStatus > lib.RunStatusRunning) || (exitOnRunning && testProgress.RunStatus == lib.RunStatusRunning) {
shouldExitLoop = true
}
progress.Progress = testProgress.Progress
Expand All @@ -180,17 +181,22 @@ This will execute the test on the Load Impact cloud service. Use "k6 login cloud
break runningLoop
}
case sig := <-sigC:
log.WithField("sig", sig).Debug("Exiting in response to signal")
log.WithField("sig", sig).Print("Exiting in response to signal...")
err := client.StopCloudTestRun(refID)
if err != nil {
log.WithError(err).Error("Stop cloud test error")
}
shouldExitLoop = true
shouldExitLoop = true // Exit after the next GetTestProgress call
}
}

if testProgress == nil {
return ExitCode{errors.New("Test progress error"), 98}
}

fmt.Fprintf(stdout, " test status: %s\n", ui.ValueColor.Sprint(testProgress.RunStatusText))

if testProgress.ResultStatus == 1 {
if testProgress.ResultStatus == cloud.ResultStatusFailed {
return ExitCode{errors.New("The test has failed"), 99}
}

Expand Down
14 changes: 4 additions & 10 deletions cmd/login_influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,34 +54,28 @@ This will set the default server used when just "-o influxdb" is passed.`,
}
conf = conf.Apply(urlConf)
}
if conf.Addr == "" {
conf.Addr = "http://localhost:8086"
}
if conf.DB == "" {
conf.DB = "k6"
}

form := ui.Form{
Fields: []ui.Field{
ui.StringField{
Key: "Addr",
Label: "Address",
Default: conf.Addr,
Default: conf.Addr.String,
},
ui.StringField{
Key: "DB",
Label: "Database",
Default: conf.DB,
Default: conf.DB.String,
},
ui.StringField{
Key: "Username",
Label: "Username",
Default: conf.Username,
Default: conf.Username.String,
},
ui.StringField{
Key: "Password",
Label: "Password",
Default: conf.Password,
Default: conf.Password.String,
},
},
}
Expand Down
2 changes: 2 additions & 0 deletions cmd/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ func getOptions(flags *pflag.FlagSet) (lib.Options, error) {
// TODO: find a saner and more dev-friendly and error-proof way to handle options
SetupTimeout: types.NullDuration{Duration: types.Duration(10 * time.Second), Valid: false},
TeardownTimeout: types.NullDuration{Duration: types.Duration(10 * time.Second), Valid: false},

MetricSamplesBufferSize: null.NewInt(1000, false),
}

stageStrings, err := flags.GetStringSlice("stage")
Expand Down
37 changes: 25 additions & 12 deletions core/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
const (
TickRate = 1 * time.Millisecond
MetricsRate = 1 * time.Second
CollectRate = 10 * time.Millisecond
CollectRate = 50 * time.Millisecond
ThresholdsRate = 2 * time.Second
ShutdownTimeout = 10 * time.Second

Expand All @@ -59,6 +59,8 @@ type Engine struct {
Metrics map[string]*stats.Metric
MetricsLock sync.Mutex

Samples chan stats.SampleContainer

// Assigned to metrics upon first received sample.
thresholds map[string]stats.Thresholds
submetrics map[string][]*stats.Submetric
Expand All @@ -76,6 +78,7 @@ func NewEngine(ex lib.Executor, o lib.Options) (*Engine, error) {
Executor: ex,
Options: o,
Metrics: make(map[string]*stats.Metric),
Samples: make(chan stats.SampleContainer, o.MetricSamplesBufferSize.Int64),
}
e.SetLogger(log.StandardLogger())

Expand Down Expand Up @@ -104,7 +107,7 @@ func NewEngine(ex lib.Executor, o lib.Options) (*Engine, error) {
return e, nil
}

func (e *Engine) setRunStatus(status int) {
func (e *Engine) setRunStatus(status lib.RunStatus) {
if len(e.Collectors) == 0 {
return
}
Expand Down Expand Up @@ -173,15 +176,15 @@ func (e *Engine) Run(ctx context.Context) error {
}

// Run the executor.
out := make(chan []stats.SampleContainer)
errC := make(chan error)
subwg.Add(1)
go func() {
errC <- e.Executor.Run(subctx, out)
errC <- e.Executor.Run(subctx, e.Samples)
e.logger.Debug("Engine: Executor terminated")
subwg.Done()
}()

sampleContainers := []stats.SampleContainer{}
defer func() {
// Shut down subsystems.
subcancel()
Expand All @@ -194,10 +197,14 @@ func (e *Engine) Run(ctx context.Context) error {
errC = nil
}
subwg.Wait()
close(out)
close(e.Samples)
}()
for sampleContainers := range out {
e.processSamples(sampleContainers...)

for sc := range e.Samples {
sampleContainers = append(sampleContainers, sc)
}
if len(sampleContainers) > 0 {
e.processSamples(sampleContainers)
}

// Emit final metrics.
Expand All @@ -213,10 +220,16 @@ func (e *Engine) Run(ctx context.Context) error {
collectorwg.Wait()
}()

ticker := time.NewTicker(CollectRate)
for {
select {
case sampleContainers := <-out:
e.processSamples(sampleContainers...)
case <-ticker.C:
if len(sampleContainers) > 0 {
e.processSamples(sampleContainers)
sampleContainers = []stats.SampleContainer{}
}
case sc := <-e.Samples:
sampleContainers = append(sampleContainers, sc)
case err := <-errC:
errC = nil
if err != nil {
Expand Down Expand Up @@ -262,7 +275,7 @@ func (e *Engine) runMetricsEmission(ctx context.Context) {
func (e *Engine) emitMetrics() {
t := time.Now()

e.processSamples(stats.ConnectedSamples{
e.processSamples([]stats.SampleContainer{stats.ConnectedSamples{
Samples: []stats.Sample{
{
Time: t,
Expand All @@ -278,7 +291,7 @@ func (e *Engine) emitMetrics() {
},
Tags: e.Options.RunTags,
Time: t,
})
}})
}

func (e *Engine) runThresholds(ctx context.Context, abort func()) {
Expand Down Expand Up @@ -330,7 +343,7 @@ func (e *Engine) processThresholds(abort func()) {
}
}

func (e *Engine) processSamples(sampleCointainers ...stats.SampleContainer) {
func (e *Engine) processSamples(sampleCointainers []stats.SampleContainer) {
if len(sampleCointainers) == 0 {
return
}
Expand Down
Loading

0 comments on commit d2954ee

Please sign in to comment.