Skip to content

Commit

Permalink
fix: ignore concurrency if not specified (#7182)
Browse files Browse the repository at this point in the history
* fix: ignore concurrency if not specified

* fix: separte getConcurrency and add tests for special case

* fix default when concurrency is not specified

* docs: add more code docs and better test cases

* address code review comments:

Co-authored-by: tejal29 <tejal29@gmail.com>
  • Loading branch information
gsquared94 and tejal29 authored Mar 15, 2022
1 parent 10e2d34 commit 59f488e
Show file tree
Hide file tree
Showing 12 changed files with 147 additions and 87 deletions.
9 changes: 3 additions & 6 deletions integration/testdata/diagnose/multi-config/diagnose.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ build:
dockerfile: Dockerfile
tagPolicy:
gitCommit: {}
local:
concurrency: 1
local: {}
deploy:
kubectl:
manifests:
Expand All @@ -31,8 +30,7 @@ build:
dockerfile: Dockerfile
tagPolicy:
gitCommit: {}
local:
concurrency: 1
local: {}
deploy:
kubectl:
manifests:
Expand All @@ -50,8 +48,7 @@ build:
dockerfile: Dockerfile
tagPolicy:
gitCommit: {}
local:
concurrency: 1
local: {}
deploy:
kubectl:
manifests:
Expand Down
3 changes: 1 addition & 2 deletions integration/testdata/diagnose/temp-config/diagnose.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ build:
dockerfile: Dockerfile
tagPolicy:
gitCommit: {}
local:
concurrency: 1
local: {}
deploy:
kubectl:
manifests:
Expand Down
2 changes: 1 addition & 1 deletion pkg/skaffold/build/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type PipelineBuilder interface {
PostBuild(ctx context.Context, out io.Writer) error

// Concurrency specifies the max number of builds that can run at any one time. If concurrency is 0, then all builds can run in parallel.
Concurrency() int
Concurrency() *int

// Prune removes images built in this pipeline
Prune(context.Context, io.Writer) error
Expand Down
71 changes: 43 additions & 28 deletions pkg/skaffold/build/builder_mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"io"
"reflect"

"github.com/GoogleContainerTools/skaffold/pkg/skaffold/constants"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/graph"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/hooks"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/output/log"
Expand Down Expand Up @@ -51,38 +52,19 @@ type Config interface {
func NewBuilderMux(cfg Config, store ArtifactStore, builder func(p latestV1.Pipeline) (PipelineBuilder, error)) (*BuilderMux, error) {
pipelines := cfg.GetPipelines()
m := make(map[string]PipelineBuilder)
var pb []PipelineBuilder
minConcurrency := -1
for i, p := range pipelines {
var pbs []PipelineBuilder
for _, p := range pipelines {
b, err := builder(p)
if err != nil {
return nil, fmt.Errorf("creating builder: %w", err)
}
pb = append(pb, b)
pbs = append(pbs, b)
for _, a := range p.Build.Artifacts {
m[a.ImageName] = b
}

if cfg.BuildConcurrency() >= 0 {
minConcurrency = cfg.BuildConcurrency()
} else {
concurrency := b.Concurrency()
// set mux concurrency to be the minimum of all builders' concurrency. (concurrency = 0 means unlimited)
switch {
case minConcurrency < 0:
minConcurrency = concurrency
log.Entry(context.TODO()).Infof("build concurrency first set to %d parsed from %s[%d]", minConcurrency, reflect.TypeOf(b).String(), i)
case concurrency > 0 && (minConcurrency == 0 || concurrency < minConcurrency):
minConcurrency = concurrency
log.Entry(context.TODO()).Infof("build concurrency updated to %d parsed from %s[%d]", minConcurrency, reflect.TypeOf(b).String(), i)
default:
log.Entry(context.TODO()).Infof("build concurrency value %d parsed from %s[%d] is ignored since it's not less than previously set value %d", concurrency, reflect.TypeOf(b).String(), i, minConcurrency)
}
}
}
log.Entry(context.TODO()).Infof("final build concurrency value is %d", minConcurrency)

return &BuilderMux{builders: pb, byImageName: m, store: store, concurrency: minConcurrency}, nil
concurrency := getConcurrency(pbs, cfg.BuildConcurrency())
return &BuilderMux{builders: pbs, byImageName: m, store: store, concurrency: concurrency}, nil
}

// Build executes the specific image builder for each artifact in the given artifact slice.
Expand All @@ -98,7 +80,7 @@ func (b *BuilderMux) Build(ctx context.Context, out io.Writer, tags tag.ImageTag
}
}

builder := func(ctx context.Context, out io.Writer, artifact *latestV1.Artifact, tag string, platforms platform.Matcher) (string, error) {
builderF := func(ctx context.Context, out io.Writer, artifact *latestV1.Artifact, tag string, platforms platform.Matcher) (string, error) {
p := b.byImageName[artifact.ImageName]
pl, err := filterBuildEnvSupportedPlatforms(p.SupportedPlatforms(), platforms)
if err != nil {
Expand All @@ -124,14 +106,14 @@ func (b *BuilderMux) Build(ctx context.Context, out io.Writer, tags tag.ImageTag
}
return built, nil
}
ar, err := InOrder(ctx, out, tags, resolver, artifacts, builder, b.concurrency, b.store)
ar, err := InOrder(ctx, out, tags, resolver, artifacts, builderF, b.concurrency, b.store)
if err != nil {
return nil, err
}

for builder := range m {
if err := builder.PostBuild(ctx, out); err != nil {
return nil, err
if errB := builder.PostBuild(ctx, out); errB != nil {
return nil, errB
}
}

Expand Down Expand Up @@ -159,3 +141,36 @@ func filterBuildEnvSupportedPlatforms(supported platform.Matcher, target platfor
}
return pl, nil
}

func getConcurrency(pbs []PipelineBuilder, cliConcurrency int) int {
ctx := context.TODO()
if cliConcurrency >= 0 {
log.Entry(ctx).Infof("build concurrency set to cli concurrency %d", cliConcurrency)
return cliConcurrency
}
minConcurrency := -1
for i, b := range pbs {
concurrency := 1
if b.Concurrency() != nil {
concurrency = *b.Concurrency()
}
// set mux concurrency to be the minimum of all builders' concurrency. (concurrency = 0 means unlimited)
switch {
case minConcurrency < 0:
minConcurrency = concurrency
log.Entry(ctx).Infof("build concurrency first set to %d parsed from %s[%d]", minConcurrency, reflect.TypeOf(b).String(), i)
case concurrency > 0 && (minConcurrency == 0 || concurrency < minConcurrency):
minConcurrency = concurrency
log.Entry(ctx).Infof("build concurrency updated to %d parsed from %s[%d]", minConcurrency, reflect.TypeOf(b).String(), i)
default:
log.Entry(ctx).Infof("build concurrency value %d parsed from %s[%d] is ignored since it's not less than previously set value %d", concurrency, reflect.TypeOf(b).String(), i, minConcurrency)
}
}
if minConcurrency < 0 {
log.Entry(ctx).Infof("build concurrency set to default value of %d", minConcurrency)
// set default concurrency to 1 for local builder. For GCB and Cluster build the default value is 0
return constants.DefaultLocalConcurrency
}
log.Entry(ctx).Infof("final build concurrency value is %d", minConcurrency)
return minConcurrency
}
101 changes: 92 additions & 9 deletions pkg/skaffold/build/builder_mux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,93 @@ func TestNewBuilderMux(t *testing.T) {
}
}

func TestGetConcurrency(t *testing.T) {
tests := []struct {
description string
pbs []PipelineBuilder
cliConcurrency int
expectedConcurrency int
}{
{
description: "default concurrency - builder and cli concurrency unset.",
pbs: []PipelineBuilder{
&mockPipelineBuilder{concurrency: nil, builderType: "local"},
&mockPipelineBuilder{concurrency: nil, builderType: "gcb"},
},
cliConcurrency: -1,
expectedConcurrency: 1,
},
{
description: "builder concurrency set to less than cli concurrency",
pbs: []PipelineBuilder{
&mockPipelineBuilder{concurrency: util.IntPtr(1), builderType: "local"},
&mockPipelineBuilder{concurrency: util.IntPtr(1), builderType: "local"},
&mockPipelineBuilder{concurrency: nil, builderType: "gcb"},
},
cliConcurrency: 2,
expectedConcurrency: 2,
},
{
description: "builder concurrency set",
pbs: []PipelineBuilder{
&mockPipelineBuilder{concurrency: util.IntPtr(2), builderType: "local"},
&mockPipelineBuilder{concurrency: util.IntPtr(2), builderType: "local"},
// As per docs https://github.com/GoogleContainerTools/skaffold/blob/dbd18994955f5805e80c6354ed0fd424ec4d987b/pkg/skaffold/schema/v2beta26/config.go#L287
// nil concurrency defaults to 1
&mockPipelineBuilder{concurrency: nil, builderType: "local"},
},
cliConcurrency: -1,
expectedConcurrency: 1,
},
{
description: "builder concurrency set to 0 and cli concurrency set to 1",
pbs: []PipelineBuilder{
// build all in parallel
&mockPipelineBuilder{concurrency: util.IntPtr(0), builderType: "local"},
&mockPipelineBuilder{concurrency: util.IntPtr(0), builderType: "gcb"},
},
cliConcurrency: 1,
expectedConcurrency: 1,
},
{
description: "builder concurrency set to 0 and cli concurrency unset",
pbs: []PipelineBuilder{
// build all in parallel
&mockPipelineBuilder{concurrency: util.IntPtr(0), builderType: "local"},
&mockPipelineBuilder{concurrency: util.IntPtr(0), builderType: "gcb"},
},
cliConcurrency: -1,
expectedConcurrency: 0,
},
{
description: "builder concurrency set to default 0 for gcb",
pbs: []PipelineBuilder{
// build all in parallel
&mockPipelineBuilder{concurrency: util.IntPtr(0), builderType: "gcb"},
&mockPipelineBuilder{concurrency: util.IntPtr(0), builderType: "gcb"},
},
cliConcurrency: -1,
expectedConcurrency: 0,
},
{
description: "min non-zero concurrency",
pbs: []PipelineBuilder{
&mockPipelineBuilder{concurrency: util.IntPtr(0), builderType: "local"},
&mockPipelineBuilder{concurrency: util.IntPtr(3), builderType: "gcb"},
&mockPipelineBuilder{concurrency: util.IntPtr(2), builderType: "gcb"},
},
cliConcurrency: -1,
expectedConcurrency: 2,
},
}
for _, test := range tests {
testutil.Run(t, test.description, func(t *testutil.T) {
actual := getConcurrency(test.pbs, test.cliConcurrency)
t.CheckDeepEqual(test.expectedConcurrency, actual)
})
}
}

type mockConfig struct {
pipelines []latestV1.Pipeline
optRepo string
Expand All @@ -109,7 +196,7 @@ func (m *mockConfig) MultiLevelRepo() *bool { return nil }
func (m *mockConfig) BuildConcurrency() int { return -1 }

type mockPipelineBuilder struct {
concurrency int
concurrency *int
builderType string
}

Expand All @@ -121,7 +208,7 @@ func (m *mockPipelineBuilder) Build(ctx context.Context, out io.Writer, artifact

func (m *mockPipelineBuilder) PostBuild(ctx context.Context, out io.Writer) error { return nil }

func (m *mockPipelineBuilder) Concurrency() int { return m.concurrency }
func (m *mockPipelineBuilder) Concurrency() *int { return m.concurrency }

func (m *mockPipelineBuilder) Prune(context.Context, io.Writer) error { return nil }

Expand All @@ -132,15 +219,11 @@ func (m *mockPipelineBuilder) SupportedPlatforms() platform.Matcher { return pla
func newMockPipelineBuilder(p latestV1.Pipeline) (PipelineBuilder, error) {
switch {
case p.Build.BuildType.LocalBuild != nil:
c := 0
if p.Build.LocalBuild.Concurrency != nil {
c = *p.Build.LocalBuild.Concurrency
}
return &mockPipelineBuilder{builderType: "local", concurrency: c}, nil
return &mockPipelineBuilder{builderType: "local", concurrency: p.Build.LocalBuild.Concurrency}, nil
case p.Build.BuildType.Cluster != nil:
return &mockPipelineBuilder{builderType: "cluster", concurrency: p.Build.Cluster.Concurrency}, nil
return &mockPipelineBuilder{builderType: "cluster", concurrency: util.IntPtr(p.Build.Cluster.Concurrency)}, nil
case p.Build.BuildType.GoogleCloudBuild != nil:
return &mockPipelineBuilder{builderType: "gcb", concurrency: p.Build.GoogleCloudBuild.Concurrency}, nil
return &mockPipelineBuilder{builderType: "gcb", concurrency: util.IntPtr(p.Build.GoogleCloudBuild.Concurrency)}, nil
default:
return nil, errors.New("invalid config")
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/skaffold/build/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ func (b *Builder) buildArtifact(ctx context.Context, out io.Writer, artifact *la
return build.TagWithDigest(tag, digest), nil
}

func (b *Builder) Concurrency() int {
return b.ClusterDetails.Concurrency
func (b *Builder) Concurrency() *int {
return util.IntPtr(b.ClusterDetails.Concurrency)
}

func (b *Builder) runBuildForArtifact(ctx context.Context, out io.Writer, a *latestV1.Artifact, tag string, platforms platform.Matcher) (string, error) {
Expand Down
5 changes: 3 additions & 2 deletions pkg/skaffold/build/gcb/cloud_build.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/platform"
latestV1 "github.com/GoogleContainerTools/skaffold/pkg/skaffold/schema/latest/v1"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/sources"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/util"
"github.com/GoogleContainerTools/skaffold/proto/v1"
)

Expand All @@ -65,8 +66,8 @@ func (b *Builder) PostBuild(_ context.Context, _ io.Writer) error {
return nil
}

func (b *Builder) Concurrency() int {
return b.GoogleCloudBuild.Concurrency
func (b *Builder) Concurrency() *int {
return util.IntPtr(b.GoogleCloudBuild.Concurrency)
}

func (b *Builder) buildArtifactWithCloudBuild(ctx context.Context, out io.Writer, artifact *latestV1.Artifact, tag string, platform platform.Matcher) (string, error) {
Expand Down
7 changes: 1 addition & 6 deletions pkg/skaffold/build/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,7 @@ func (b *Builder) PostBuild(ctx context.Context, _ io.Writer) error {
return nil
}

func (b *Builder) Concurrency() int {
if b.local.Concurrency == nil {
return 0
}
return *b.local.Concurrency
}
func (b *Builder) Concurrency() *int { return b.local.Concurrency }

func (b *Builder) PushImages() bool {
return b.pushImages
Expand Down
7 changes: 1 addition & 6 deletions pkg/skaffold/parser/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,11 @@ func createCfg(name string, imageName string, workspace string, requires []lates
Artifacts: []*latestV1.Artifact{{ImageName: imageName, ArtifactType: latestV1.ArtifactType{
DockerArtifact: &latestV1.DockerArtifact{DockerfilePath: "Dockerfile"}}, Workspace: workspace}}, TagPolicy: latestV1.TagPolicy{
GitTagger: &latestV1.GitTagger{}}, BuildType: latestV1.BuildType{
LocalBuild: &latestV1.LocalBuild{Concurrency: concurrency()},
LocalBuild: &latestV1.LocalBuild{},
}}, Deploy: latestV1.DeployConfig{Logs: latestV1.LogsConfig{Prefix: "container"}}},
}
}

func concurrency() *int {
c := 1
return &c
}

type document struct {
path string
configs []mockCfg
Expand Down
21 changes: 0 additions & 21 deletions pkg/skaffold/schema/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,6 @@ func Set(c *latestV1.SkaffoldConfig) error {
}
}

withLocalBuild(c, func(lb *latestV1.LocalBuild) {
// don't set build concurrency if there are no artifacts in the current config
if len(c.Build.Artifacts) > 0 {
setDefaultConcurrency(lb)
}
})

withCloudBuildConfig(c,
setDefaultCloudBuildDockerImage,
setDefaultCloudBuildMavenImage,
Expand Down Expand Up @@ -136,20 +129,6 @@ func defaultToKubectlDeploy(c *latestV1.SkaffoldConfig) {
c.Deploy.DeployType.KubectlDeploy = &latestV1.KubectlDeploy{}
}

func withLocalBuild(c *latestV1.SkaffoldConfig, operations ...func(*latestV1.LocalBuild)) {
if local := c.Build.LocalBuild; local != nil {
for _, operation := range operations {
operation(local)
}
}
}

func setDefaultConcurrency(local *latestV1.LocalBuild) {
if local.Concurrency == nil {
local.Concurrency = &constants.DefaultLocalConcurrency
}
}

func withCloudBuildConfig(c *latestV1.SkaffoldConfig, operations ...func(*latestV1.GoogleCloudBuild)) {
if gcb := c.Build.GoogleCloudBuild; gcb != nil {
for _, operation := range operations {
Expand Down
Loading

0 comments on commit 59f488e

Please sign in to comment.