Skip to content

Commit

Permalink
⚡ discover assets in parallel
Browse files Browse the repository at this point in the history
Signed-off-by: Salim Afiune Maya <afiune@mondoo.com>
  • Loading branch information
afiune committed Dec 12, 2024
1 parent 4765ace commit 8c98904
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 27 deletions.
12 changes: 12 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,18 @@
"shell", "ssh", "user@18.215.249.49",
],
},
{
"name": "scan github org",
"type": "go",
"request": "launch",
"program": "${workspaceRoot}/apps/cnquery/cnquery.go",
"args": [
"scan",
"github",
"org", "hit-training",
"--log-level", "trace"
]
},
{
"name": "Configure Built-in Providers",
"type": "go",
Expand Down
61 changes: 40 additions & 21 deletions explorer/scan/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ package scan
import (
"context"
"errors"
"sync"
"time"

"github.com/rs/zerolog/log"
"go.mondoo.com/cnquery/v11/cli/config"
"go.mondoo.com/cnquery/v11/cli/execruntime"
"go.mondoo.com/cnquery/v11/internal/workerpool"
"go.mondoo.com/cnquery/v11/llx"
"go.mondoo.com/cnquery/v11/logger"
"go.mondoo.com/cnquery/v11/providers"
Expand All @@ -20,6 +22,9 @@ import (
"go.mondoo.com/cnquery/v11/providers-sdk/v1/upstream"
)

// number of parallel goroutines discovering assets
const workers = 10

type AssetWithRuntime struct {
Asset *inventory.Asset
Runtime *providers.Runtime
Expand All @@ -34,11 +39,15 @@ type DiscoveredAssets struct {
platformIds map[string]struct{}
Assets []*AssetWithRuntime
Errors []*AssetWithError
assetsLock sync.Mutex
}

// Add adds an asset and its runtime to the discovered assets list. It returns true if the
// asset has been added, false if it is a duplicate
func (d *DiscoveredAssets) Add(asset *inventory.Asset, runtime *providers.Runtime) bool {
d.assetsLock.Lock()
defer d.assetsLock.Unlock()

isDuplicate := false
for _, platformId := range asset.PlatformIds {
if _, ok := d.platformIds[platformId]; ok {
Expand Down Expand Up @@ -161,35 +170,45 @@ func discoverAssets(rootAssetWithRuntime *AssetWithRuntime, resolvedRootAsset *i
return
}

pool := workerpool.New[bool](workers)
pool.Start()
defer pool.Close()

// for all discovered assets, we apply mondoo-specific labels and annotations that come from the root asset
for _, a := range rootAssetWithRuntime.Runtime.Provider.Connection.Inventory.Spec.Assets {
// create runtime for root asset
assetWithRuntime, err := createRuntimeForAsset(a, upstream, recording)
if err != nil {
log.Error().Err(err).Str("asset", a.Name).Msg("unable to create runtime for asset")
discoveredAssets.AddError(a, err)
continue
}
pool.Submit(func() (bool, error) {
// create runtime for root asset
assetWithRuntime, err := createRuntimeForAsset(a, upstream, recording)
if err != nil {
log.Error().Err(err).Str("asset", a.Name).Msg("unable to create runtime for asset")
discoveredAssets.AddError(a, err)
return false, err
}

// If no asset was returned and no error, then we observed a duplicate asset with a
// runtime that already exists.
if assetWithRuntime == nil {
continue
}
// If no asset was returned and no error, then we observed a duplicate asset with a
// runtime that already exists.
if assetWithRuntime == nil {
return false, nil
}

resolvedAsset := assetWithRuntime.Runtime.Provider.Connection.Asset
if len(resolvedAsset.PlatformIds) > 0 {
prepareAsset(resolvedAsset, resolvedRootAsset, runtimeLabels)
resolvedAsset := assetWithRuntime.Runtime.Provider.Connection.Asset
if len(resolvedAsset.PlatformIds) > 0 {
prepareAsset(resolvedAsset, resolvedRootAsset, runtimeLabels)

// If the asset has been already added, we should close its runtime
if !discoveredAssets.Add(resolvedAsset, assetWithRuntime.Runtime) {
// If the asset has been already added, we should close its runtime
if !discoveredAssets.Add(resolvedAsset, assetWithRuntime.Runtime) {
assetWithRuntime.Runtime.Close()
}
} else {
discoverAssets(assetWithRuntime, resolvedRootAsset, discoveredAssets, runtimeLabels, upstream, recording)
assetWithRuntime.Runtime.Close()
}
} else {
discoverAssets(assetWithRuntime, resolvedRootAsset, discoveredAssets, runtimeLabels, upstream, recording)
assetWithRuntime.Runtime.Close()
}
return true, nil
})
}

// Wait for the workers to finish processing
pool.Wait()
}

func createRuntimeForAsset(asset *inventory.Asset, upstream *upstream.UpstreamConfig, recording llx.Recording) (*AssetWithRuntime, error) {
Expand Down
18 changes: 12 additions & 6 deletions providers-sdk/v1/plugin/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,8 @@ func (s *Service) AddRuntime(conf *inventory.Config, createRuntime func(connId u
}
// ^^

s.runtimesLock.Lock()
defer s.runtimesLock.Unlock()

// If a runtime with this ID already exists, then return that
if runtime, ok := s.runtimes[conf.Id]; ok {
if runtime, err := s.GetRuntime(conf.Id); err == nil {
return runtime, nil
}

Expand All @@ -66,18 +63,27 @@ func (s *Service) AddRuntime(conf *inventory.Config, createRuntime func(connId u

if runtime.Connection != nil {
if parentId := runtime.Connection.ParentID(); parentId > 0 {
parentRuntime, err := s.doGetRuntime(parentId)
parentRuntime, err := s.GetRuntime(parentId)
if err != nil {
return nil, errors.New("parent connection " + strconv.FormatUint(uint64(parentId), 10) + " not found")
}
runtime.Resources = parentRuntime.Resources

}
}
s.runtimes[conf.Id] = runtime

// store the new runtime
s.addRuntime(conf.Id, runtime)

return runtime, nil
}

func (s *Service) addRuntime(id uint32, runtime *Runtime) {
s.runtimesLock.Lock()
defer s.runtimesLock.Unlock()
s.runtimes[id] = runtime
}

// FIXME: DEPRECATED, remove in v12.0 vv
func (s *Service) deprecatedAddRuntime(createRuntime func(connId uint32) (*Runtime, error)) (*Runtime, error) {
s.runtimesLock.Lock()
Expand Down
1 change: 1 addition & 0 deletions providers/github/connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func NewGithubConnection(id uint32, asset *inventory.Asset) (*GithubConnection,
ctx := context.WithValue(context.Background(), github.SleepUntilPrimaryRateLimitResetWhenRateLimited, true)

// perform a quick call to verify the token's validity.
// @afiune do we need to validate the token for every connection? can this be a "once" operation?
_, resp, err := client.Meta.Zen(ctx)
if err != nil {
if resp != nil && resp.StatusCode == 401 {
Expand Down

1 comment on commit 8c98904

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 1.50.

Benchmark suite Current: 8c98904 Previous: ac14153 Ratio
BenchmarkScan_MultipleAssets 120509233 ns/op 10996023 B/op 72642 allocs/op 23087851 ns/op 10947462 B/op 72614 allocs/op 5.22
BenchmarkScan_MultipleAssets - ns/op 120509233 ns/op 23087851 ns/op 5.22

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.