Skip to content

Commit

Permalink
Merge pull request #931 from jonjohnsonjr/concurrency
Browse files Browse the repository at this point in the history
Use errgroup over github.com/korovkin/limiter
  • Loading branch information
luhring authored Jan 16, 2024
2 parents cd70df2 + 4735458 commit 15b714f
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 61 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ require (
github.com/kelseyhightower/envconfig v1.4.0
github.com/klauspost/compress v1.17.4
github.com/klauspost/pgzip v1.2.6
github.com/korovkin/limiter v0.0.0-20230307205149-3d4b2b34c99d
github.com/kubescape/go-git-url v0.0.26
github.com/lima-vm/lima v0.19.1
github.com/opencontainers/image-spec v1.1.0-rc5
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -415,8 +415,6 @@ github.com/klauspost/pgzip v1.2.6 h1:8RXeL5crjEUFnR2/Sn6GJNWtSQ3Dk8pq4CL3jvdDyjU
github.com/klauspost/pgzip v1.2.6/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/korovkin/limiter v0.0.0-20230307205149-3d4b2b34c99d h1:7CfsXfFpCG1wrUpuyOzG8+vpL1ZqH2goz23wZ9pboGE=
github.com/korovkin/limiter v0.0.0-20230307205149-3d4b2b34c99d/go.mod h1:3NeYeWwAOTnDChps1fD7YGD/uWzp+tqmShgjhhMIHDM=
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
Expand Down
35 changes: 11 additions & 24 deletions pkg/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,13 @@ import (
"os"
"path/filepath"
"strings"
"sync"

apko_log "chainguard.dev/apko/pkg/log"
apkrepo "github.com/chainguard-dev/go-apk/pkg/apk"
sign "github.com/chainguard-dev/go-apk/pkg/signature"
"github.com/korovkin/limiter"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel"
"golang.org/x/sync/errgroup"
)

type Index struct {
Expand Down Expand Up @@ -156,46 +155,34 @@ func (idx *Index) LoadIndex(sourceFile string) error {

func (idx *Index) UpdateIndex(ctx context.Context) error {
packages := make([]*apkrepo.Package, len(idx.PackageFiles))
var mtx sync.Mutex

g := limiter.NewConcurrencyLimiterForIO(limiter.DefaultConcurrencyLimitIO)

var g errgroup.Group
g.SetLimit(4)
for i, apkFile := range idx.PackageFiles {
i, apkFile := i, apkFile // capture the loop variables
if _, err := g.Execute(func() {
g.Go(func() error {
idx.Logger.Printf("processing package %s", apkFile)
f, err := os.Open(apkFile)
if err != nil {
// nolint:errcheck
g.FirstErrorStore(fmt.Errorf("failed to open package %s: %w", apkFile, err))
return
return fmt.Errorf("failed to open package %s: %w", apkFile, err)
}
defer f.Close()
pkg, err := apkrepo.ParsePackage(ctx, f)
if err != nil {
// nolint:errcheck
g.FirstErrorStore(fmt.Errorf("failed to parse package %s: %w", apkFile, err))
return
return fmt.Errorf("failed to parse package %s: %w", apkFile, err)
}

if idx.ExpectedArch != "" && pkg.Arch != idx.ExpectedArch {
idx.Logger.Printf("WARNING: %s-%s: found unexpected architecture %s, expecting %s",
pkg.Name, pkg.Version, pkg.Arch, idx.ExpectedArch)
return
return nil
}

mtx.Lock()
packages[i] = pkg
mtx.Unlock()
}); err != nil {
return fmt.Errorf("executing processor function: %w", err)
}
}
if err := g.WaitAndClose(); err != nil {
return err
}

if err := g.FirstErrorGet(); err != nil {
return nil
})
}
if err := g.Wait(); err != nil {
return err
}

Expand Down
51 changes: 17 additions & 34 deletions pkg/sbom/implementation.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ import (
"regexp"
"sort"
"strings"
"sync"
"time"
"unicode/utf8"

"github.com/korovkin/limiter"
purl "github.com/package-url/packageurl-go"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
"golang.org/x/text/cases"
"golang.org/x/text/language"
"sigs.k8s.io/release-utils/hash"
Expand Down Expand Up @@ -117,13 +117,14 @@ func ScanFiles(spec *Spec, dirPackage *pkg) error {

dirPackage.FilesAnalyzed = true

g := limiter.NewConcurrencyLimiterForIO(limiter.DefaultConcurrencyLimitIO)
files := sync.Map{}
for _, path := range fileList {
path := path
var g errgroup.Group
g.SetLimit(4)

// nolint:errcheck
g.Execute(func() {
files := make([]file, len(fileList))
for i, path := range fileList {
i, path := i, path

g.Go(func() error {
f := file{
id: stringToIdentifier(path),
Name: strings.TrimPrefix(path, "/"),
Expand All @@ -139,52 +140,34 @@ func ScanFiles(spec *Spec, dirPackage *pkg) error {
} {
csum, err := fn(filepath.Join(dirPath, path))
if err != nil {
// nolint:errcheck
g.FirstErrorStore(fmt.Errorf("hashing %s file %s: %w", algo, path, err))
return fmt.Errorf("hashing %s file %s: %w", algo, path, err)
}
f.Checksums[algo] = csum
}

files.Store(path, f)
files[i] = f
return nil
})
}

if err := g.WaitAndClose(); err != nil {
return fmt.Errorf("waiting for limiter to finish: %w", err)
}

if err := g.FirstErrorGet(); err != nil {
if err := g.Wait(); err != nil {
return err
}

// Sort the resulting dataset to ensure deterministic order
// to ensure builds are reproducible.
pathList := []string{}
files.Range(func(key, _ any) bool {
pathList = append(pathList, key.(string))
return true
slices.SortFunc(files, func(a, b file) int {
return strings.Compare(a.Name, b.Name)
})

sort.Strings(pathList)

// Add files into the package
for _, path := range pathList {
for _, f := range files {
rel := relationship{
Source: dirPackage,
Type: "CONTAINS",
}

f, ok := files.Load(path)
if !ok {
continue
}

switch v := f.(type) {
case file:
rel.Target = &v
case pkg:
rel.Target = &v
}
rel.Target = &f

dirPackage.Relationships = append(dirPackage.Relationships, rel)
}
Expand Down

0 comments on commit 15b714f

Please sign in to comment.