Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: ignore concurrency if not specified #7182

Merged
merged 5 commits into from
Mar 15, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions integration/testdata/diagnose/multi-config/diagnose.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ build:
dockerfile: Dockerfile
tagPolicy:
gitCommit: {}
local:
concurrency: 1
local: {}
deploy:
kubectl:
manifests:
Expand All @@ -31,8 +30,7 @@ build:
dockerfile: Dockerfile
tagPolicy:
gitCommit: {}
local:
concurrency: 1
local: {}
deploy:
kubectl:
manifests:
Expand All @@ -50,8 +48,7 @@ build:
dockerfile: Dockerfile
tagPolicy:
gitCommit: {}
local:
concurrency: 1
local: {}
deploy:
kubectl:
manifests:
Expand Down
3 changes: 1 addition & 2 deletions integration/testdata/diagnose/temp-config/diagnose.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ build:
dockerfile: Dockerfile
tagPolicy:
gitCommit: {}
local:
concurrency: 1
local: {}
deploy:
kubectl:
manifests:
Expand Down
2 changes: 1 addition & 1 deletion pkg/skaffold/build/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type PipelineBuilder interface {
PostBuild(ctx context.Context, out io.Writer) error

// Concurrency specifies the max number of builds that can run at any one time. If concurrency is 0, then all builds can run in parallel.
Concurrency() int
Concurrency() *int

// Prune removes images built in this pipeline
Prune(context.Context, io.Writer) error
Expand Down
69 changes: 41 additions & 28 deletions pkg/skaffold/build/builder_mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"io"
"reflect"

"github.com/GoogleContainerTools/skaffold/pkg/skaffold/constants"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/graph"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/hooks"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/output/log"
Expand Down Expand Up @@ -51,38 +52,19 @@ type Config interface {
func NewBuilderMux(cfg Config, store ArtifactStore, builder func(p latestV1.Pipeline) (PipelineBuilder, error)) (*BuilderMux, error) {
pipelines := cfg.GetPipelines()
m := make(map[string]PipelineBuilder)
var pb []PipelineBuilder
minConcurrency := -1
for i, p := range pipelines {
var pbs []PipelineBuilder
for _, p := range pipelines {
b, err := builder(p)
if err != nil {
return nil, fmt.Errorf("creating builder: %w", err)
}
pb = append(pb, b)
pbs = append(pbs, b)
for _, a := range p.Build.Artifacts {
m[a.ImageName] = b
}

if cfg.BuildConcurrency() >= 0 {
minConcurrency = cfg.BuildConcurrency()
} else {
concurrency := b.Concurrency()
// set mux concurrency to be the minimum of all builders' concurrency. (concurrency = 0 means unlimited)
switch {
case minConcurrency < 0:
minConcurrency = concurrency
log.Entry(context.TODO()).Infof("build concurrency first set to %d parsed from %s[%d]", minConcurrency, reflect.TypeOf(b).String(), i)
case concurrency > 0 && (minConcurrency == 0 || concurrency < minConcurrency):
minConcurrency = concurrency
log.Entry(context.TODO()).Infof("build concurrency updated to %d parsed from %s[%d]", minConcurrency, reflect.TypeOf(b).String(), i)
default:
log.Entry(context.TODO()).Infof("build concurrency value %d parsed from %s[%d] is ignored since it's not less than previously set value %d", concurrency, reflect.TypeOf(b).String(), i, minConcurrency)
}
}
}
log.Entry(context.TODO()).Infof("final build concurrency value is %d", minConcurrency)

return &BuilderMux{builders: pb, byImageName: m, store: store, concurrency: minConcurrency}, nil
concurrency := getConcurrency(pbs, cfg.BuildConcurrency())
return &BuilderMux{builders: pbs, byImageName: m, store: store, concurrency: concurrency}, nil
}

// Build executes the specific image builder for each artifact in the given artifact slice.
Expand All @@ -98,7 +80,7 @@ func (b *BuilderMux) Build(ctx context.Context, out io.Writer, tags tag.ImageTag
}
}

builder := func(ctx context.Context, out io.Writer, artifact *latestV1.Artifact, tag string, platforms platform.Matcher) (string, error) {
builderF := func(ctx context.Context, out io.Writer, artifact *latestV1.Artifact, tag string, platforms platform.Matcher) (string, error) {
p := b.byImageName[artifact.ImageName]
pl, err := filterBuildEnvSupportedPlatforms(p.SupportedPlatforms(), platforms)
if err != nil {
Expand All @@ -124,14 +106,14 @@ func (b *BuilderMux) Build(ctx context.Context, out io.Writer, tags tag.ImageTag
}
return built, nil
}
ar, err := InOrder(ctx, out, tags, resolver, artifacts, builder, b.concurrency, b.store)
ar, err := InOrder(ctx, out, tags, resolver, artifacts, builderF, b.concurrency, b.store)
if err != nil {
return nil, err
}

for builder := range m {
if err := builder.PostBuild(ctx, out); err != nil {
return nil, err
if errB := builder.PostBuild(ctx, out); errB != nil {
return nil, errB
}
}

Expand Down Expand Up @@ -159,3 +141,34 @@ func filterBuildEnvSupportedPlatforms(supported platform.Matcher, target platfor
}
return pl, nil
}

func getConcurrency(pbs []PipelineBuilder, cliConcurrency int) int {
if cliConcurrency >= 0 {
log.Entry(context.TODO()).Infof("build concurrency set to cli concurrency %d", cliConcurrency)
tejal29 marked this conversation as resolved.
Show resolved Hide resolved
return cliConcurrency
}
minConcurrency := -1
for i, b := range pbs {
if b.Concurrency() == nil {
continue
}
concurrency := *b.Concurrency()
// set mux concurrency to be the minimum of all builders' concurrency. (concurrency = 0 means unlimited)
switch {
case minConcurrency < 0:
minConcurrency = concurrency
log.Entry(context.TODO()).Infof("build concurrency first set to %d parsed from %s[%d]", minConcurrency, reflect.TypeOf(b).String(), i)
case concurrency > 0 && (minConcurrency == 0 || concurrency < minConcurrency):
minConcurrency = concurrency
log.Entry(context.TODO()).Infof("build concurrency updated to %d parsed from %s[%d]", minConcurrency, reflect.TypeOf(b).String(), i)
default:
log.Entry(context.TODO()).Infof("build concurrency value %d parsed from %s[%d] is ignored since it's not less than previously set value %d", concurrency, reflect.TypeOf(b).String(), i, minConcurrency)
}
}
if minConcurrency < 0 {
log.Entry(context.TODO()).Infof("build concurrency set to default value of %d", minConcurrency)
return constants.DefaultLocalConcurrency // set default concurrency to 1.
tejal29 marked this conversation as resolved.
Show resolved Hide resolved
}
log.Entry(context.TODO()).Infof("final build concurrency value is %d", minConcurrency)
return minConcurrency
}
79 changes: 70 additions & 9 deletions pkg/skaffold/build/builder_mux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,71 @@ func TestNewBuilderMux(t *testing.T) {
}
}

func TestGetConcurrency(t *testing.T) {
tests := []struct {
description string
pbs []PipelineBuilder
cliConcurrency int
expectedConcurrency int
}{
{
description: "default concurrency - builder and cli concurrency unset.",
pbs: []PipelineBuilder{
&mockPipelineBuilder{nil, "local"},
&mockPipelineBuilder{nil, "gcb"},
tejal29 marked this conversation as resolved.
Show resolved Hide resolved
},
cliConcurrency: -1,
expectedConcurrency: 1,
},
{
description: "builder concurrency set to less cli concurrency",
tejal29 marked this conversation as resolved.
Show resolved Hide resolved
pbs: []PipelineBuilder{
&mockPipelineBuilder{util.IntPtr(1), "local"},
&mockPipelineBuilder{util.IntPtr(1), "local"},
&mockPipelineBuilder{nil, "gcb"},
},
cliConcurrency: 2,
expectedConcurrency: 2,
},
{
description: "builder concurrency set to 0 and cli concurrency set to 1",
pbs: []PipelineBuilder{
// build all in parallel
&mockPipelineBuilder{util.IntPtr(0), "local"},
&mockPipelineBuilder{nil, "gcb"},
},
cliConcurrency: 1,
expectedConcurrency: 1,
},
{
description: "builder concurrency set to 0 and cli concurrency unset",
pbs: []PipelineBuilder{
// build all in parallel
&mockPipelineBuilder{util.IntPtr(0), "local"},
&mockPipelineBuilder{nil, "gcb"},
},
cliConcurrency: -1,
expectedConcurrency: 0,
},
{
description: "min non-zero concurrency",
pbs: []PipelineBuilder{
&mockPipelineBuilder{util.IntPtr(0), "local"},
&mockPipelineBuilder{nil, "gcb"},
&mockPipelineBuilder{util.IntPtr(2), "gcb"},
},
cliConcurrency: -1,
expectedConcurrency: 2,
},
tejal29 marked this conversation as resolved.
Show resolved Hide resolved
}
for _, test := range tests {
testutil.Run(t, test.description, func(t *testutil.T) {
actual := getConcurrency(test.pbs, test.cliConcurrency)
t.CheckDeepEqual(test.expectedConcurrency, actual)
})
}
}

type mockConfig struct {
pipelines []latestV1.Pipeline
optRepo string
Expand All @@ -109,7 +174,7 @@ func (m *mockConfig) MultiLevelRepo() *bool { return nil }
func (m *mockConfig) BuildConcurrency() int { return -1 }

type mockPipelineBuilder struct {
concurrency int
concurrency *int
builderType string
}

Expand All @@ -121,7 +186,7 @@ func (m *mockPipelineBuilder) Build(ctx context.Context, out io.Writer, artifact

func (m *mockPipelineBuilder) PostBuild(ctx context.Context, out io.Writer) error { return nil }

func (m *mockPipelineBuilder) Concurrency() int { return m.concurrency }
func (m *mockPipelineBuilder) Concurrency() *int { return m.concurrency }

func (m *mockPipelineBuilder) Prune(context.Context, io.Writer) error { return nil }

Expand All @@ -132,15 +197,11 @@ func (m *mockPipelineBuilder) SupportedPlatforms() platform.Matcher { return pla
func newMockPipelineBuilder(p latestV1.Pipeline) (PipelineBuilder, error) {
switch {
case p.Build.BuildType.LocalBuild != nil:
c := 0
if p.Build.LocalBuild.Concurrency != nil {
c = *p.Build.LocalBuild.Concurrency
}
return &mockPipelineBuilder{builderType: "local", concurrency: c}, nil
return &mockPipelineBuilder{builderType: "local", concurrency: p.Build.LocalBuild.Concurrency}, nil
case p.Build.BuildType.Cluster != nil:
return &mockPipelineBuilder{builderType: "cluster", concurrency: p.Build.Cluster.Concurrency}, nil
return &mockPipelineBuilder{builderType: "cluster", concurrency: util.IntPtr(p.Build.Cluster.Concurrency)}, nil
case p.Build.BuildType.GoogleCloudBuild != nil:
return &mockPipelineBuilder{builderType: "gcb", concurrency: p.Build.GoogleCloudBuild.Concurrency}, nil
return &mockPipelineBuilder{builderType: "gcb", concurrency: util.IntPtr(p.Build.GoogleCloudBuild.Concurrency)}, nil
default:
return nil, errors.New("invalid config")
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/skaffold/build/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ func (b *Builder) buildArtifact(ctx context.Context, out io.Writer, artifact *la
return build.TagWithDigest(tag, digest), nil
}

func (b *Builder) Concurrency() int {
return b.ClusterDetails.Concurrency
func (b *Builder) Concurrency() *int {
return util.IntPtr(b.ClusterDetails.Concurrency)
}

func (b *Builder) runBuildForArtifact(ctx context.Context, out io.Writer, a *latestV1.Artifact, tag string, platforms platform.Matcher) (string, error) {
Expand Down
5 changes: 3 additions & 2 deletions pkg/skaffold/build/gcb/cloud_build.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/platform"
latestV1 "github.com/GoogleContainerTools/skaffold/pkg/skaffold/schema/latest/v1"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/sources"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/util"
"github.com/GoogleContainerTools/skaffold/proto/v1"
)

Expand All @@ -65,8 +66,8 @@ func (b *Builder) PostBuild(_ context.Context, _ io.Writer) error {
return nil
}

func (b *Builder) Concurrency() int {
return b.GoogleCloudBuild.Concurrency
func (b *Builder) Concurrency() *int {
return util.IntPtr(b.GoogleCloudBuild.Concurrency)
}

func (b *Builder) buildArtifactWithCloudBuild(ctx context.Context, out io.Writer, artifact *latestV1.Artifact, tag string, platform platform.Matcher) (string, error) {
Expand Down
7 changes: 1 addition & 6 deletions pkg/skaffold/build/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,7 @@ func (b *Builder) PostBuild(ctx context.Context, _ io.Writer) error {
return nil
}

func (b *Builder) Concurrency() int {
if b.local.Concurrency == nil {
return 0
}
return *b.local.Concurrency
}
func (b *Builder) Concurrency() *int { return b.local.Concurrency }

func (b *Builder) PushImages() bool {
return b.pushImages
Expand Down
7 changes: 1 addition & 6 deletions pkg/skaffold/parser/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,11 @@ func createCfg(name string, imageName string, workspace string, requires []lates
Artifacts: []*latestV1.Artifact{{ImageName: imageName, ArtifactType: latestV1.ArtifactType{
DockerArtifact: &latestV1.DockerArtifact{DockerfilePath: "Dockerfile"}}, Workspace: workspace}}, TagPolicy: latestV1.TagPolicy{
GitTagger: &latestV1.GitTagger{}}, BuildType: latestV1.BuildType{
LocalBuild: &latestV1.LocalBuild{Concurrency: concurrency()},
LocalBuild: &latestV1.LocalBuild{},
}}, Deploy: latestV1.DeployConfig{Logs: latestV1.LogsConfig{Prefix: "container"}}},
}
}

func concurrency() *int {
c := 1
return &c
}

type document struct {
path string
configs []mockCfg
Expand Down
21 changes: 0 additions & 21 deletions pkg/skaffold/schema/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,6 @@ func Set(c *latestV1.SkaffoldConfig) error {
}
}

withLocalBuild(c, func(lb *latestV1.LocalBuild) {
// don't set build concurrency if there are no artifacts in the current config
if len(c.Build.Artifacts) > 0 {
setDefaultConcurrency(lb)
}
})

withCloudBuildConfig(c,
setDefaultCloudBuildDockerImage,
setDefaultCloudBuildMavenImage,
Expand Down Expand Up @@ -136,20 +129,6 @@ func defaultToKubectlDeploy(c *latestV1.SkaffoldConfig) {
c.Deploy.DeployType.KubectlDeploy = &latestV1.KubectlDeploy{}
}

func withLocalBuild(c *latestV1.SkaffoldConfig, operations ...func(*latestV1.LocalBuild)) {
if local := c.Build.LocalBuild; local != nil {
for _, operation := range operations {
operation(local)
}
}
}

func setDefaultConcurrency(local *latestV1.LocalBuild) {
if local.Concurrency == nil {
local.Concurrency = &constants.DefaultLocalConcurrency
}
}

func withCloudBuildConfig(c *latestV1.SkaffoldConfig, operations ...func(*latestV1.GoogleCloudBuild)) {
if gcb := c.Build.GoogleCloudBuild; gcb != nil {
for _, operation := range operations {
Expand Down
1 change: 0 additions & 1 deletion pkg/skaffold/schema/defaults/defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,6 @@ func TestSetDefaultsOnLocalBuild(t *testing.T) {
err = Set(cfg2)
testutil.CheckError(t, false, err)
SetDefaultDeployer(cfg2)
testutil.CheckDeepEqual(t, 1, *cfg2.Build.LocalBuild.Concurrency)
}

func TestSetPortForwardLocalPort(t *testing.T) {
Expand Down
3 changes: 0 additions & 3 deletions pkg/skaffold/schema/versions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,9 +493,6 @@ func withLocalBuild(ops ...func(*latestV1.BuildConfig)) func(*latestV1.SkaffoldC
for _, op := range ops {
op(&b)
}
if len(b.Artifacts) > 0 {
b.LocalBuild.Concurrency = &constants.DefaultLocalConcurrency
}
cfg.Build = b
}
}
Expand Down