Skip to content

Commit

Permalink
Fix bug in processing jobs on platforms without Docker (nektos#1834)
Browse files Browse the repository at this point in the history
* Log incoming jobs.

Log the full contents of the job protobuf to make debugging jobs easier

* Ensure that the parallel executor always uses at least one thread.

The caller may mis-calculate the number of CPUs as zero, in which case
ensure that at least one thread is spawned.

* Use runtime.NumCPU for CPU counts.

For hosts without docker, GetHostInfo() returns a blank struct which
has zero CPUs and causes downstream trouble.

---------

Co-authored-by: Paul Armstrong <psa@users.noreply.gitea.com>
Co-authored-by: Jason Song <i@wolfogre.com>
  • Loading branch information
3 people authored Jun 6, 2023
1 parent c70a674 commit 3ac2b72
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 8 deletions.
7 changes: 7 additions & 0 deletions pkg/common/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package common
import (
"context"
"fmt"

log "github.com/sirupsen/logrus"
)

// Warning that implements `error` but safe to ignore
Expand Down Expand Up @@ -94,6 +96,11 @@ func NewParallelExecutor(parallel int, executors ...Executor) Executor {
work := make(chan Executor, len(executors))
errs := make(chan error, len(executors))

if 1 > parallel {
log.Infof("Parallel tasks (%d) below minimum, setting to 1", parallel)
parallel = 1
}

for i := 0; i < parallel; i++ {
go func(work <-chan Executor, errs chan<- error) {
for executor := range work {
Expand Down
11 changes: 11 additions & 0 deletions pkg/common/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,17 @@ func TestNewParallelExecutor(t *testing.T) {
assert.Equal(3, count, "should run all 3 executors")
assert.Equal(2, maxCount, "should run at most 2 executors in parallel")
assert.Nil(err)

// Reset to test running the executor with 0 parallelism
count = 0
activeCount = 0
maxCount = 0

errSingle := NewParallelExecutor(0, emptyWorkflow, emptyWorkflow, emptyWorkflow)(ctx)

assert.Equal(3, count, "should run all 3 executors")
assert.Equal(1, maxCount, "should run at most 1 executors in parallel")
assert.Nil(errSingle)
}

func TestNewParallelExecutorFailed(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions pkg/model/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ func (j *Job) GetMatrixes() ([]map[string]interface{}, error) {
}
} else {
matrixes = append(matrixes, make(map[string]interface{}))
log.Debugf("Empty Strategy, matrixes=%v", matrixes)
}
return matrixes, nil
}
Expand Down
45 changes: 37 additions & 8 deletions pkg/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ import (
"encoding/json"
"fmt"
"os"
"runtime"

log "github.com/sirupsen/logrus"

"github.com/nektos/act/pkg/common"
"github.com/nektos/act/pkg/container"
"github.com/nektos/act/pkg/model"
)

Expand Down Expand Up @@ -103,15 +103,45 @@ func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor {
maxJobNameLen := 0

stagePipeline := make([]common.Executor, 0)
log.Debugf("Plan Stages: %v", plan.Stages)

for i := range plan.Stages {
stage := plan.Stages[i]
stagePipeline = append(stagePipeline, func(ctx context.Context) error {
pipeline := make([]common.Executor, 0)
for _, run := range stage.Runs {
log.Debugf("Stages Runs: %v", stage.Runs)
stageExecutor := make([]common.Executor, 0)
job := run.Job()
log.Debugf("Job.Name: %v", job.Name)
log.Debugf("Job.RawNeeds: %v", job.RawNeeds)
log.Debugf("Job.RawRunsOn: %v", job.RawRunsOn)
log.Debugf("Job.Env: %v", job.Env)
log.Debugf("Job.If: %v", job.If)
for step := range job.Steps {
if nil != job.Steps[step] {
log.Debugf("Job.Steps: %v", job.Steps[step].String())
}
}
log.Debugf("Job.TimeoutMinutes: %v", job.TimeoutMinutes)
log.Debugf("Job.Services: %v", job.Services)
log.Debugf("Job.Strategy: %v", job.Strategy)
log.Debugf("Job.RawContainer: %v", job.RawContainer)
log.Debugf("Job.Defaults.Run.Shell: %v", job.Defaults.Run.Shell)
log.Debugf("Job.Defaults.Run.WorkingDirectory: %v", job.Defaults.Run.WorkingDirectory)
log.Debugf("Job.Outputs: %v", job.Outputs)
log.Debugf("Job.Uses: %v", job.Uses)
log.Debugf("Job.With: %v", job.With)
// log.Debugf("Job.RawSecrets: %v", job.RawSecrets)
log.Debugf("Job.Result: %v", job.Result)

if job.Strategy != nil {
log.Debugf("Job.Strategy.FailFast: %v", job.Strategy.FailFast)
log.Debugf("Job.Strategy.MaxParallel: %v", job.Strategy.MaxParallel)
log.Debugf("Job.Strategy.FailFastString: %v", job.Strategy.FailFastString)
log.Debugf("Job.Strategy.MaxParallelString: %v", job.Strategy.MaxParallelString)
log.Debugf("Job.Strategy.RawMatrix: %v", job.Strategy.RawMatrix)

strategyRc := runner.newRunContext(ctx, run, nil)
if err := strategyRc.NewExpressionEvaluator(ctx).EvaluateYamlNode(ctx, &job.Strategy.RawMatrix); err != nil {
log.Errorf("Error while evaluating matrix: %v", err)
Expand All @@ -122,6 +152,8 @@ func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor {
if m, err := job.GetMatrixes(); err != nil {
log.Errorf("Error while get job's matrix: %v", err)
} else {
log.Debugf("Job Matrices: %v", m)
log.Debugf("Runner Matrices: %v", runner.config.Matrix)
matrixes = selectMatrixes(m, runner.config.Matrix)
}
log.Debugf("Final matrix after applying user inclusions '%v'", matrixes)
Expand Down Expand Up @@ -152,14 +184,11 @@ func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor {
}
pipeline = append(pipeline, common.NewParallelExecutor(maxParallel, stageExecutor...))
}
var ncpu int
info, err := container.GetHostInfo(ctx)
if err != nil {
log.Errorf("failed to obtain container engine info: %s", err)
ncpu = 1 // sane default?
} else {
ncpu = info.NCPU
ncpu := runtime.NumCPU()
if 1 > ncpu {
ncpu = 1
}
log.Debugf("Detected CPUs: %d", ncpu)
return common.NewParallelExecutor(ncpu, pipeline...)(ctx)
})
}
Expand Down

0 comments on commit 3ac2b72

Please sign in to comment.