Skip to content

Commit

Permalink
frontend: expose workers as a configurable option
Browse files Browse the repository at this point in the history
Signed-off-by: mikcl <mikesmikes400@gmail.com>
  • Loading branch information
Mikcl committed Dec 7, 2022
1 parent 7f0c257 commit 1c7490f
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 25 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,9 @@ file-metadata:
# SYFT_FILE_METADATA_DIGESTS env var
digests: ["sha256"]

# maximum number of workers used to process the list of package catalogers in parallel
parallelism: 1

# cataloging secrets is exposed through the power-user subcommand
secrets:
cataloger:
Expand Down
13 changes: 13 additions & 0 deletions cmd/syft/cli/options/packages.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type PackagesOptions struct {
Exclude []string
Catalogers []string
Name string
Parallelism int
}

var _ Interface = (*PackagesOptions)(nil)
Expand Down Expand Up @@ -51,6 +52,14 @@ func (o *PackagesOptions) AddFlags(cmd *cobra.Command, v *viper.Viper) error {
cmd.Flags().StringVarP(&o.Name, "name", "", "",
"set the name of the target being analyzed")

cmd.Flags().IntVarP(&o.Parallelism, "parallelism", "", 1,
"number of workers to use to process the catalogers in parallel")

// Let's not expose this as a visible CLI option; register it hidden so it is config-only for now
if err := cmd.Flags().MarkHidden("parallelism"); err != nil {
return err
}

return bindPackageConfigOptions(cmd.Flags(), v)
}

Expand Down Expand Up @@ -89,5 +98,9 @@ func bindPackageConfigOptions(flags *pflag.FlagSet, v *viper.Viper) error {
return err
}

if err := v.BindPFlag("parallelism", flags.Lookup("parallelism")); err != nil {
return err
}

return nil
}
4 changes: 3 additions & 1 deletion internal/config/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type Application struct {
Attest attest `yaml:"attest" json:"attest" mapstructure:"attest"`
Platform string `yaml:"platform" json:"platform" mapstructure:"platform"`
Name string `yaml:"name" json:"name" mapstructure:"name"`
Parallelism int `yaml:"parallelism" json:"parallelism" mapstructure:"parallelism"` // --parallelism the number of catalog workers to run in parallel
}

func (cfg Application) ToCatalogerConfig() cataloger.Config {
Expand All @@ -65,7 +66,8 @@ func (cfg Application) ToCatalogerConfig() cataloger.Config {
IncludeUnindexedArchives: cfg.Package.SearchUnindexedArchives,
Scope: cfg.Package.Cataloger.ScopeOpt,
},
Catalogers: cfg.Catalogers,
Catalogers: cfg.Catalogers,
Parallelism: cfg.Parallelism,
}
}

Expand Down
2 changes: 1 addition & 1 deletion syft/lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func CatalogPackages(src *source.Source, cfg cataloger.Config) (*pkg.Catalog, []
}
}

catalog, relationships, err := cataloger.Catalog(resolver, release, catalogers...)
catalog, relationships, err := cataloger.Catalog(resolver, release, cfg.Parallelism, catalogers...)
if err != nil {
return nil, nil, nil, err
}
Expand Down
59 changes: 40 additions & 19 deletions syft/pkg/cataloger/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cataloger

import (
"fmt"
"math"
"sync"

"github.com/hashicorp/go-multierror"
Expand All @@ -24,11 +25,13 @@ type Monitor struct {
PackagesDiscovered progress.Monitorable // the number of packages discovered from all registered catalogers
}

// CatalogResult provides the result of running a single cataloger against source
type CatalogResult struct {
// catalogResult provides the result of running a single cataloger against source
type catalogResult struct {
Packages []pkg.Package
Relationships []artifact.Relationship
Error error
// Discovered is the number of packages the cataloger reported; it may exceed len(Packages)
Discovered int64
Error error
}

// newMonitor creates a new Monitor object and publishes the object on the bus as a PackageCatalogerStarted event.
Expand All @@ -46,23 +49,21 @@ func newMonitor() (*progress.Manual, *progress.Manual) {
return &filesProcessed, &packagesDiscovered
}

func runCataloger(cataloger pkg.Cataloger, resolver source.FileResolver, results chan CatalogResult, progress *progress.Manual) {
catalogerResult := new(CatalogResult)
func runCataloger(cataloger pkg.Cataloger, resolver source.FileResolver) (*catalogResult, error) {
catalogerResult := new(catalogResult)

// find packages from the underlying raw data
log.Debugf("cataloging with %q", cataloger.Name())
packages, relationships, err := cataloger.Catalog(resolver)
if err != nil {
catalogerResult.Error = err
results <- *catalogerResult
log.Debugf("cataloger=%q error in handling", cataloger.Name())
return
return catalogerResult, err
}

catalogedPackages := len(packages)

log.Debugf("cataloger=%q discovered %d packages", cataloger.Name(), catalogedPackages)
progress.N += int64(catalogedPackages)
catalogerResult.Discovered = int64(catalogedPackages)

for _, p := range packages {
// generate CPEs (note: this is excluded from package ID, so is safe to mutate)
Expand All @@ -86,52 +87,72 @@ func runCataloger(cataloger pkg.Cataloger, resolver source.FileResolver, results
catalogerResult.Packages = append(catalogerResult.Packages, p)
}
catalogerResult.Relationships = append(catalogerResult.Relationships, relationships...)
results <- *catalogerResult
log.Debugf("cataloger=%q done handling", cataloger.Name())
return catalogerResult, nil
}

// Catalog a given source (container image or filesystem) with the given catalogers, returning all discovered packages.
// In order to efficiently retrieve contents from a underlying container image the content fetch requests are
// done in bulk. Specifically, all files of interest are collected from each catalogers and accumulated into a single
// request.
func Catalog(resolver source.FileResolver, release *linux.Release, catalogers ...pkg.Cataloger) (*pkg.Catalog, []artifact.Relationship, error) {
//
//nolint:funlen
func Catalog(resolver source.FileResolver, release *linux.Release, parallelism int, catalogers ...pkg.Cataloger) (*pkg.Catalog, []artifact.Relationship, error) {
catalog := pkg.NewCatalog()
var allRelationships []artifact.Relationship
filesProcessed, packagesDiscovered := newMonitor()
// perform analysis, accumulating errors for each failed analysis
var errs error

// TODO - expose workers as a flag to the cli
workers := 1
nCatalogers := len(catalogers)

jobs := make(chan pkg.Cataloger, len(catalogers))
results := make(chan CatalogResult, len(catalogers)+1)
// we do not need more parallelism than there are `catalogers`.
parallelism = int(math.Min(float64(nCatalogers), math.Max(1.0, float64(parallelism))))
log.Debugf("Using parallelism=%d for catalogs=%d", parallelism, nCatalogers)

jobs := make(chan pkg.Cataloger, nCatalogers)
results := make(chan *catalogResult, nCatalogers)
discoveredPackages := make(chan int64, nCatalogers)

waitGroup := sync.WaitGroup{}

for catalogWorkerIdx := 0; catalogWorkerIdx < workers; catalogWorkerIdx++ {
for i := 0; i < parallelism; i++ {
waitGroup.Add(1)

go func() {
defer waitGroup.Done()

// run each job
// block until the next cataloger job becomes available, then run it
for cataloger := range jobs {
runCataloger(cataloger, resolver, results, packagesDiscovered)
catalogResult, err := runCataloger(cataloger, resolver)

// ensure we set the error to be aggregated
catalogResult.Error = err

discoveredPackages <- catalogResult.Discovered

results <- catalogResult
}
}()
}

// dynamically show updated discovered package status
go func() {
for discovered := range discoveredPackages {
packagesDiscovered.N += discovered
}
}()

// Enqueue the jobs
for _, cataloger := range catalogers {
jobs <- cataloger
}
close(jobs)


// Wait for the jobs to finish
waitGroup.Wait()
close(results)
close(discoveredPackages)

// collect the results
for catalogResult := range results {
Expand Down
8 changes: 5 additions & 3 deletions syft/pkg/cataloger/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import (
)

type Config struct {
Search SearchConfig
Catalogers []string
Search SearchConfig
Catalogers []string
Parallelism int
}

func DefaultConfig() Config {
return Config{
Search: DefaultSearchConfig(),
Search: DefaultSearchConfig(),
Parallelism: 1,
}
}

Expand Down
22 changes: 22 additions & 0 deletions test/cli/packages_cmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,28 @@ func TestPackagesCmdFlags(t *testing.T) {
assertSuccessfulReturnCode,
},
},
{
name: "override-default-parallelism",
args: []string{"packages", "-vvv", "-o", "json", "--parallelism", "2", coverageImage},
assertions: []traitAssertion{
// assert that the application config echoed in the log matches the configuration we expect
assertInOutput("parallelism: 2"),
assertInOutput("Using parallelism=2 for catalogs"),
assertPackageCount(34),
assertSuccessfulReturnCode,
},
},
{
name: "default-parallelism",
args: []string{"packages", "-vvv", "-o", "json", coverageImage},
assertions: []traitAssertion{
// assert that the application config echoed in the log matches the configuration we expect
assertInOutput("parallelism: 1"),
assertInOutput("Using parallelism=1 for catalogs"),
assertPackageCount(34),
assertSuccessfulReturnCode,
},
},
}

for _, test := range tests {
Expand Down
2 changes: 1 addition & 1 deletion test/integration/catalog_packages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func BenchmarkImagePackageCatalogers(b *testing.B) {

b.Run(c.Name(), func(b *testing.B) {
for i := 0; i < b.N; i++ {
pc, _, err = cataloger.Catalog(resolver, theDistro, c)
pc, _, err = cataloger.Catalog(resolver, theDistro, 1, c)
if err != nil {
b.Fatalf("failure during benchmark: %+v", err)
}
Expand Down

0 comments on commit 1c7490f

Please sign in to comment.