From 38ef8432f0c512f400859c28472b93d8e50425ce Mon Sep 17 00:00:00 2001
From: Paul Armstrong
Date: Thu, 1 Jun 2023 08:39:51 +0000
Subject: [PATCH 1/3] Log incoming jobs.

Log the full contents of the job protobuf to make debugging jobs easier
---
 pkg/model/workflow.go |  1 +
 pkg/runner/runner.go  | 32 ++++++++++++++++++++++++++++++++
 2 files changed, 33 insertions(+)

diff --git a/pkg/model/workflow.go b/pkg/model/workflow.go
index e7d43e4b338..19edfb43db7 100644
--- a/pkg/model/workflow.go
+++ b/pkg/model/workflow.go
@@ -417,6 +417,7 @@ func (j *Job) GetMatrixes() ([]map[string]interface{}, error) {
 		}
 	} else {
 		matrixes = append(matrixes, make(map[string]interface{}))
+		log.Debugf("Empty Strategy, matrixes=%v", matrixes)
 	}
 	return matrixes, nil
 }
diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go
index a47cf8b19dc..02f7faa6328 100644
--- a/pkg/runner/runner.go
+++ b/pkg/runner/runner.go
@@ -103,15 +103,45 @@ func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor {
 	maxJobNameLen := 0

 	stagePipeline := make([]common.Executor, 0)
+	log.Debugf("Plan Stages: %v", plan.Stages)
+
 	for i := range plan.Stages {
 		stage := plan.Stages[i]
 		stagePipeline = append(stagePipeline, func(ctx context.Context) error {
 			pipeline := make([]common.Executor, 0)
 			for _, run := range stage.Runs {
+				log.Debugf("Stages Runs: %v", stage.Runs)
 				stageExecutor := make([]common.Executor, 0)
 				job := run.Job()
+				log.Debugf("Job.Name: %v", job.Name)
+				log.Debugf("Job.RawNeeds: %v", job.RawNeeds)
+				log.Debugf("Job.RawRunsOn: %v", job.RawRunsOn)
+				log.Debugf("Job.Env: %v", job.Env)
+				log.Debugf("Job.If: %v", job.If)
+				for step := range job.Steps {
+					if nil != job.Steps[step] {
+						log.Debugf("Job.Steps: %v", job.Steps[step].String())
+					}
+				}
+				log.Debugf("Job.TimeoutMinutes: %v", job.TimeoutMinutes)
+				log.Debugf("Job.Services: %v", job.Services)
+				log.Debugf("Job.Strategy: %v", job.Strategy)
+				log.Debugf("Job.RawContainer: %v", job.RawContainer)
+				log.Debugf("Job.Defaults.Run.Shell: %v", job.Defaults.Run.Shell)
+				log.Debugf("Job.Defaults.Run.WorkingDirectory: %v", job.Defaults.Run.WorkingDirectory)
+				log.Debugf("Job.Outputs: %v", job.Outputs)
+				log.Debugf("Job.Uses: %v", job.Uses)
+				log.Debugf("Job.With: %v", job.With)
+				//log.Debugf("Job.RawSecrets: %v", job.RawSecrets)
+				log.Debugf("Job.Result: %v", job.Result)
 				if job.Strategy != nil {
+					log.Debugf("Job.Strategy.FailFast: %v", job.Strategy.FailFast)
+					log.Debugf("Job.Strategy.MaxParallel: %v", job.Strategy.MaxParallel)
+					log.Debugf("Job.Strategy.FailFastString: %v", job.Strategy.FailFastString)
+					log.Debugf("Job.Strategy.MaxParallelString: %v", job.Strategy.MaxParallelString)
+					log.Debugf("Job.Strategy.RawMatrix: %v", job.Strategy.RawMatrix)
+
 					strategyRc := runner.newRunContext(ctx, run, nil)
 					if err := strategyRc.NewExpressionEvaluator(ctx).EvaluateYamlNode(ctx, &job.Strategy.RawMatrix); err != nil {
 						log.Errorf("Error while evaluating matrix: %v", err)
@@ -122,6 +152,8 @@ func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor {
 				if m, err := job.GetMatrixes(); err != nil {
 					log.Errorf("Error while get job's matrix: %v", err)
 				} else {
+					log.Debugf("Job Matrices: %v", m)
+					log.Debugf("Runner Matrices: %v", runner.config.Matrix)
 					matrixes = selectMatrixes(m, runner.config.Matrix)
 				}
 				log.Debugf("Final matrix after applying user inclusions '%v'", matrixes)

From e59526776cf742a3099b7f1527a3ba2e85419812 Mon Sep 17 00:00:00 2001
From: Paul Armstrong
Date: Fri, 2 Jun 2023 00:23:34 +1000
Subject: [PATCH 2/3] Ensure that the parallel executor always uses at
 least one thread.

The caller may mis-calculate the number of CPUs as zero, in which case
ensure that at least one thread is spawned.
---
 pkg/common/executor.go      |  7 +++++++
 pkg/common/executor_test.go | 11 +++++++++++
 2 files changed, 18 insertions(+)

diff --git a/pkg/common/executor.go b/pkg/common/executor.go
index c5b05f3b8c2..a5eb079b039 100644
--- a/pkg/common/executor.go
+++ b/pkg/common/executor.go
@@ -3,6 +3,8 @@ package common
 import (
 	"context"
 	"fmt"
+
+	log "github.com/sirupsen/logrus"
 )

 // Warning that implements `error` but safe to ignore
@@ -94,6 +96,11 @@ func NewParallelExecutor(parallel int, executors ...Executor) Executor {
 		work := make(chan Executor, len(executors))
 		errs := make(chan error, len(executors))

+		if 1 > parallel {
+			log.Infof("Parallel tasks (%d) below minimum, setting to 1", parallel)
+			parallel = 1
+		}
+
 		for i := 0; i < parallel; i++ {
 			go func(work <-chan Executor, errs chan<- error) {
 				for executor := range work {
diff --git a/pkg/common/executor_test.go b/pkg/common/executor_test.go
index 7f691e42952..e70c638e912 100644
--- a/pkg/common/executor_test.go
+++ b/pkg/common/executor_test.go
@@ -100,6 +100,17 @@ func TestNewParallelExecutor(t *testing.T) {
 	assert.Equal(3, count, "should run all 3 executors")
 	assert.Equal(2, maxCount, "should run at most 2 executors in parallel")
 	assert.Nil(err)
+
+	// Reset to test running the executor with 0 parallelism
+	count = 0
+	activeCount = 0
+	maxCount = 0
+
+	errSingle := NewParallelExecutor(0, emptyWorkflow, emptyWorkflow, emptyWorkflow)(ctx)
+
+	assert.Equal(3, count, "should run all 3 executors")
+	assert.Equal(1, maxCount, "should run at most 1 executors in parallel")
+	assert.Nil(errSingle)
 }

 func TestNewParallelExecutorFailed(t *testing.T) {

From b9f7945f9253d0b3557eafcd72fd88b6ed4c9db9 Mon Sep 17 00:00:00 2001
From: Paul Armstrong
Date: Fri, 2 Jun 2023 00:33:34 +1000
Subject: [PATCH 3/3] Use runtime.NumCPU for CPU counts.

For hosts without docker, GetHostInfo() returns a blank struct which has
zero CPUs and causes downstream trouble.
---
 pkg/runner/runner.go | 15 ++++++---------
 1 file changed, 6 insertions(+), 9 deletions(-)

diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go
index 02f7faa6328..e1d8d8a7b74 100644
--- a/pkg/runner/runner.go
+++ b/pkg/runner/runner.go
@@ -5,11 +5,11 @@ import (
 	"encoding/json"
 	"fmt"
 	"os"
+	"runtime"

 	log "github.com/sirupsen/logrus"

 	"github.com/nektos/act/pkg/common"
-	"github.com/nektos/act/pkg/container"
 	"github.com/nektos/act/pkg/model"
 )
@@ -132,7 +132,7 @@ func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor {
 				log.Debugf("Job.Outputs: %v", job.Outputs)
 				log.Debugf("Job.Uses: %v", job.Uses)
 				log.Debugf("Job.With: %v", job.With)
-				//log.Debugf("Job.RawSecrets: %v", job.RawSecrets)
+				// log.Debugf("Job.RawSecrets: %v", job.RawSecrets)
 				log.Debugf("Job.Result: %v", job.Result)
 				if job.Strategy != nil {
@@ -184,14 +184,11 @@ func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor {
 			}
 			pipeline = append(pipeline, common.NewParallelExecutor(maxParallel, stageExecutor...))
 		}
-		var ncpu int
-		info, err := container.GetHostInfo(ctx)
-		if err != nil {
-			log.Errorf("failed to obtain container engine info: %s", err)
-			ncpu = 1 // sane default?
-		} else {
-			ncpu = info.NCPU
-		}
+		ncpu := runtime.NumCPU()
+		if 1 > ncpu {
+			ncpu = 1
+		}
+		log.Debugf("Detected CPUs: %d", ncpu)
 		return common.NewParallelExecutor(ncpu, pipeline...)(ctx)
 		})
 	}
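
Reviewer note (not part of the patch series): taken together, patches 2 and 3 boil down to "derive the worker count from runtime.NumCPU and never let it fall below one". The standalone Go sketch below shows that clamping pattern in isolation; runWorkers and task are illustrative names invented for this sketch, not act's NewParallelExecutor or Executor types.

package main

import (
	"fmt"
	"runtime"
	"sync"
)

type task func() error

// runWorkers fans tasks out over a buffered channel to at most `parallel`
// goroutines, clamping the worker count to a minimum of one.
func runWorkers(parallel int, tasks ...task) error {
	if parallel < 1 {
		// With zero workers nothing would ever read from the channel,
		// so mirror the patch and fall back to a single worker.
		parallel = 1
	}

	work := make(chan task, len(tasks))
	errs := make(chan error, len(tasks))

	var wg sync.WaitGroup
	for i := 0; i < parallel; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range work {
				errs <- t()
			}
		}()
	}

	for _, t := range tasks {
		work <- t
	}
	close(work)
	wg.Wait()
	close(errs)

	for err := range errs {
		if err != nil {
			return err
		}
	}
	return nil
}

func main() {
	// runtime.NumCPU needs no container engine, unlike the GetHostInfo call
	// that patch 3 removes.
	ncpu := runtime.NumCPU()
	if ncpu < 1 {
		ncpu = 1 // defensive clamp, as in the patched runner
	}
	fmt.Println("workers:", ncpu)

	// Passing 0 on purpose: runWorkers clamps it to one, so both tasks still run.
	err := runWorkers(0,
		func() error { fmt.Println("job a"); return nil },
		func() error { fmt.Println("job b"); return nil },
	)
	fmt.Println("err:", err)
}

Passing 0 exercises the clamp: all tasks still run, on a single worker, which is the same behaviour the new TestNewParallelExecutor case asserts with count == 3 and maxCount == 1.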