From 1c7490f5011d1443dc88a9541613a96f24e548d4 Mon Sep 17 00:00:00 2001
From: mikcl
Date: Sat, 19 Nov 2022 22:04:27 +0000
Subject: [PATCH] frontend: expose workers as a configurable option

Signed-off-by: mikcl
---
 README.md                                 |  3 ++
 cmd/syft/cli/options/packages.go          | 13 +++++
 internal/config/application.go            |  4 +-
 syft/lib.go                               |  2 +-
 syft/pkg/cataloger/catalog.go             | 59 +++++++++++++++--------
 syft/pkg/cataloger/config.go              |  8 +--
 test/cli/packages_cmd_test.go             | 22 +++++++++
 test/integration/catalog_packages_test.go |  2 +-
 8 files changed, 88 insertions(+), 25 deletions(-)

diff --git a/README.md b/README.md
index 9c01f6d626e..01d47a99054 100644
--- a/README.md
+++ b/README.md
@@ -534,6 +534,9 @@ file-metadata:
   # SYFT_FILE_METADATA_DIGESTS env var
   digests: ["sha256"]
 
+# maximum number of workers used to process the list of package catalogers in parallel
+parallelism: 1
+
 # cataloging secrets is exposed through the power-user subcommand
 secrets:
   cataloger:
diff --git a/cmd/syft/cli/options/packages.go b/cmd/syft/cli/options/packages.go
index 0acdcdc2dbb..00e5d85c97d 100644
--- a/cmd/syft/cli/options/packages.go
+++ b/cmd/syft/cli/options/packages.go
@@ -22,6 +22,7 @@ type PackagesOptions struct {
 	Exclude            []string
 	Catalogers         []string
 	Name               string
+	Parallelism        int
 }
 
 var _ Interface = (*PackagesOptions)(nil)
@@ -51,6 +52,14 @@ func (o *PackagesOptions) AddFlags(cmd *cobra.Command, v *viper.Viper) error {
 	cmd.Flags().StringVarP(&o.Name, "name", "", "",
 		"set the name of the target being analyzed")
 
+	cmd.Flags().IntVarP(&o.Parallelism, "parallelism", "", 1,
+		"number of workers to use to process the catalogers in parallel")
+
+	// Let's not expose this as a CLI option
+	if err := cmd.Flags().MarkHidden("parallelism"); err != nil {
+		return err
+	}
+
 	return bindPackageConfigOptions(cmd.Flags(), v)
 }
 
@@ -89,5 +98,9 @@ func bindPackageConfigOptions(flags *pflag.FlagSet, v *viper.Viper) error {
 		return err
 	}
 
+	if err := v.BindPFlag("parallelism", flags.Lookup("parallelism")); err != nil {
+		return err
+	}
+
 	return nil
 }
diff --git a/internal/config/application.go b/internal/config/application.go
index 2c38f1e98dc..553848e691e 100644
--- a/internal/config/application.go
+++ b/internal/config/application.go
@@ -56,6 +56,7 @@ type Application struct {
 	Attest             attest             `yaml:"attest" json:"attest" mapstructure:"attest"`
 	Platform           string             `yaml:"platform" json:"platform" mapstructure:"platform"`
 	Name               string             `yaml:"name" json:"name" mapstructure:"name"`
+	Parallelism        int                `yaml:"parallelism" json:"parallelism" mapstructure:"parallelism"` // --parallelism the number of catalog workers to run in parallel
 }
 
 func (cfg Application) ToCatalogerConfig() cataloger.Config {
@@ -65,7 +66,8 @@ func (cfg Application) ToCatalogerConfig() cataloger.Config {
 			IncludeUnindexedArchives: cfg.Package.SearchUnindexedArchives,
 			Scope:                    cfg.Package.Cataloger.ScopeOpt,
 		},
-		Catalogers: cfg.Catalogers,
+		Catalogers:  cfg.Catalogers,
+		Parallelism: cfg.Parallelism,
 	}
 }
 
diff --git a/syft/lib.go b/syft/lib.go
index 44261c38421..bfed4bb5b5f 100644
--- a/syft/lib.go
+++ b/syft/lib.go
@@ -69,7 +69,7 @@ func CatalogPackages(src *source.Source, cfg cataloger.Config) (*pkg.Catalog, []
 		}
 	}
 
-	catalog, relationships, err := cataloger.Catalog(resolver, release, catalogers...)
+	catalog, relationships, err := cataloger.Catalog(resolver, release, cfg.Parallelism, catalogers...)
 	if err != nil {
 		return nil, nil, nil, err
 	}
diff --git a/syft/pkg/cataloger/catalog.go b/syft/pkg/cataloger/catalog.go
index 07bfb18fb8e..81880d73aac 100644
--- a/syft/pkg/cataloger/catalog.go
+++ b/syft/pkg/cataloger/catalog.go
@@ -2,6 +2,7 @@ package cataloger
 
 import (
 	"fmt"
+	"math"
 	"sync"
 
 	"github.com/hashicorp/go-multierror"
@@ -24,11 +25,13 @@ type Monitor struct {
 	PackagesDiscovered progress.Monitorable // the number of packages discovered from all registered catalogers
 }
 
-// CatalogResult provides the result of running a single cataloger against source
-type CatalogResult struct {
+// catalogResult provides the result of running a single cataloger against source
+type catalogResult struct {
 	Packages      []pkg.Package
 	Relationships []artifact.Relationship
-	Error         error
+	// Discovered may sometimes be more than len(packages)
+	Discovered int64
+	Error      error
 }
 
 // newMonitor creates a new Monitor object and publishes the object on the bus as a PackageCatalogerStarted event.
@@ -46,23 +49,21 @@ func newMonitor() (*progress.Manual, *progress.Manual) {
 	return &filesProcessed, &packagesDiscovered
 }
 
-func runCataloger(cataloger pkg.Cataloger, resolver source.FileResolver, results chan CatalogResult, progress *progress.Manual) {
-	catalogerResult := new(CatalogResult)
+func runCataloger(cataloger pkg.Cataloger, resolver source.FileResolver) (*catalogResult, error) {
+	catalogerResult := new(catalogResult)
 
 	// find packages from the underlying raw data
 	log.Debugf("cataloging with %q", cataloger.Name())
 	packages, relationships, err := cataloger.Catalog(resolver)
 	if err != nil {
-		catalogerResult.Error = err
-		results <- *catalogerResult
 		log.Debugf("cataloger=%q error in handling", cataloger.Name())
-		return
+		return catalogerResult, err
 	}
 
 	catalogedPackages := len(packages)
 
 	log.Debugf("cataloger=%q discovered %d packages", cataloger.Name(), catalogedPackages)
-	progress.N += int64(catalogedPackages)
+	catalogerResult.Discovered = int64(catalogedPackages)
 
 	for _, p := range packages {
 		// generate CPEs (note: this is excluded from package ID, so is safe to mutate)
@@ -86,52 +87,72 @@ func runCataloger(cataloger pkg.Cataloger, resolver source.FileResolver, results
 		catalogerResult.Packages = append(catalogerResult.Packages, p)
 	}
 	catalogerResult.Relationships = append(catalogerResult.Relationships, relationships...)
-	results <- *catalogerResult
 	log.Debugf("cataloger=%q done handling", cataloger.Name())
+	return catalogerResult, nil
 }
 
 // Catalog a given source (container image or filesystem) with the given catalogers, returning all discovered packages.
 // In order to efficiently retrieve contents from a underlying container image the content fetch requests are
 // done in bulk. Specifically, all files of interest are collected from each catalogers and accumulated into a single
 // request.
-func Catalog(resolver source.FileResolver, release *linux.Release, catalogers ...pkg.Cataloger) (*pkg.Catalog, []artifact.Relationship, error) {
+//
+//nolint:funlen
+func Catalog(resolver source.FileResolver, release *linux.Release, parallelism int, catalogers ...pkg.Cataloger) (*pkg.Catalog, []artifact.Relationship, error) {
 	catalog := pkg.NewCatalog()
 	var allRelationships []artifact.Relationship
 
 	filesProcessed, packagesDiscovered := newMonitor()
 
 	// perform analysis, accumulating errors for each failed analysis
 	var errs error
 
-	// TODO - expose workers as a flag to the cli
-	workers := 1
+	nCatalogers := len(catalogers)
 
-	jobs := make(chan pkg.Cataloger, len(catalogers))
-	results := make(chan CatalogResult, len(catalogers)+1)
+	// we do not need more parallelism than there are `catalogers`.
+	parallelism = int(math.Min(float64(nCatalogers), math.Max(1.0, float64(parallelism))))
+	log.Debugf("Using parallelism=%d for catalogs=%d", parallelism, nCatalogers)
+
+	jobs := make(chan pkg.Cataloger, nCatalogers)
+	results := make(chan *catalogResult, nCatalogers)
+	discoveredPackages := make(chan int64, nCatalogers)
 
 	waitGroup := sync.WaitGroup{}
 
-	for catalogWorkerIdx := 0; catalogWorkerIdx < workers; catalogWorkerIdx++ {
+	for i := 0; i < parallelism; i++ {
 		waitGroup.Add(1)
 		go func() {
 			defer waitGroup.Done()
 
-			// run each job
+			// wait for / get the next cataloger job available.
 			for cataloger := range jobs {
-				runCataloger(cataloger, resolver, results, packagesDiscovered)
+				catalogResult, err := runCataloger(cataloger, resolver)
+
+				// ensure we set the error to be aggregated
+				catalogResult.Error = err
+
+				discoveredPackages <- catalogResult.Discovered
+
+				results <- catalogResult
 			}
 		}()
 	}
 
+	// dynamically show updated discovered package status
+	go func() {
+		for discovered := range discoveredPackages {
+			packagesDiscovered.N += discovered
+		}
+	}()
+
 	// Enqueue the jobs
 	for _, cataloger := range catalogers {
 		jobs <- cataloger
 	}
 	close(jobs)
 
-	// Wait for the jobs to finish
 	waitGroup.Wait()
 	close(results)
+	close(discoveredPackages)
 
 	// collect the results
 	for catalogResult := range results {
diff --git a/syft/pkg/cataloger/config.go b/syft/pkg/cataloger/config.go
index 478fc292d11..c75e34681c2 100644
--- a/syft/pkg/cataloger/config.go
+++ b/syft/pkg/cataloger/config.go
@@ -5,13 +5,15 @@ import (
 )
 
 type Config struct {
-	Search     SearchConfig
-	Catalogers []string
+	Search      SearchConfig
+	Catalogers  []string
+	Parallelism int
 }
 
 func DefaultConfig() Config {
 	return Config{
-		Search: DefaultSearchConfig(),
+		Search:      DefaultSearchConfig(),
+		Parallelism: 1,
 	}
 }
 
diff --git a/test/cli/packages_cmd_test.go b/test/cli/packages_cmd_test.go
index fe57819e7ea..82396e86b75 100644
--- a/test/cli/packages_cmd_test.go
+++ b/test/cli/packages_cmd_test.go
@@ -204,6 +204,28 @@ func TestPackagesCmdFlags(t *testing.T) {
 				assertSuccessfulReturnCode,
 			},
 		},
+		{
+			name: "override-default-parallelism",
+			args: []string{"packages", "-vvv", "-o", "json", "--parallelism", "2", coverageImage},
+			assertions: []traitAssertion{
+				// the application config in the log matches what we expect to have been configured.
+				assertInOutput("parallelism: 2"),
+				assertInOutput("Using parallelism=2 for catalogs"),
+				assertPackageCount(34),
+				assertSuccessfulReturnCode,
+			},
+		},
+		{
+			name: "default-parallelism",
+			args: []string{"packages", "-vvv", "-o", "json", coverageImage},
+			assertions: []traitAssertion{
+				// the application config in the log matches what we expect to have been configured.
+				assertInOutput("parallelism: 1"),
+				assertInOutput("Using parallelism=1 for catalogs"),
+				assertPackageCount(34),
+				assertSuccessfulReturnCode,
+			},
+		},
 	}
 
 	for _, test := range tests {
diff --git a/test/integration/catalog_packages_test.go b/test/integration/catalog_packages_test.go
index 8a54b1b0388..84cb86ad2ee 100644
--- a/test/integration/catalog_packages_test.go
+++ b/test/integration/catalog_packages_test.go
@@ -41,7 +41,7 @@ func BenchmarkImagePackageCatalogers(b *testing.B) {
 		b.Run(c.Name(), func(b *testing.B) {
 			for i := 0; i < b.N; i++ {
-				pc, _, err = cataloger.Catalog(resolver, theDistro, c)
+				pc, _, err = cataloger.Catalog(resolver, theDistro, 1, c)
 				if err != nil {
 					b.Fatalf("failure during benchmark: %+v", err)
 				}