Skip to content

Commit

Permalink
add -maxworker parameter to limit parallel Goroutines (#64)
Browse files Browse the repository at this point in the history
  • Loading branch information
xorpaul authored Aug 1, 2017
1 parent 22779e9 commit 19b953d
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 31 deletions.
32 changes: 21 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,27 +58,37 @@ https://github.com/xorpaul/g10k/releases
```
Usage of ./g10k:
-branch string
which git branch of the Puppet environment to update, e.g. core_foobar
which git branch of the Puppet environment to update, e.g. core_foobar
-cachedir string
allows overriding of the g10k config file cachedir setting, the folder in which g10k will download git repositories and Forge modules
-check4update
only check if there is a newer version of the Puppet module available. Does implicitly set parameter dryrun to true
only check if there is a newer version of the Puppet module available. Does implicitly set dryrun to true
-checksum
get the md5 check sum for each Puppetlabs Forge module and verify the integrity of the downloaded archive. Increases g10k run time!
-config string
which config file to use
which config file to use
-debug
log debug output, defaults to false
log debug output, defaults to false
-dryrun
do not modify anything, just print what would be changed
do not modify anything, just print what would be changed
-force
purge the Puppet environment directory and do a full sync
purge the Puppet environment directory and do a full sync
-info
log info output, defaults to false
log info output, defaults to false
-maxworker int
how many Goroutines are allowed to run in parallel for Git and Forge module resolving (default 50)
-moduledir string
allows overriding of Puppetfile specific moduledir setting, the folder in which Puppet modules will be extracted
-puppetfile
install all modules from Puppetfile in cwd
install all modules from Puppetfile in cwd
-quiet
no output, defaults to false
-usemove
do not use hardlinks to populate your Puppet environments with Puppetlabs Forge modules. Uses simple move instead of hard links and purge the Forge cache directory after each run!
do not use hardlinks to populate your Puppet environments with Puppetlabs Forge modules. Instead uses simple move commands and purges the Forge cache directory after each run! (Useful for g10k runs inside a Docker container)
-verbose
log verbose output, defaults to false
log verbose output, defaults to false
-version
show build time and version number
show build time and version number
```

Regarding anything usage/workflow you really can just use the great [puppetlabs/r10k](https://github.com/puppetlabs/r10k/blob/master/doc/dynamic-environments.mkd) docs as the [Puppetfile](https://github.com/puppetlabs/r10k/blob/master/doc/puppetfile.mkd) etc. are all intentionally kept unchanged.
Expand Down
7 changes: 7 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ func readConfigfile(configFile string) ConfigSettings {
config.Timeout = 5
}

// set default max Go routines for Forge and Git module resolution if none is given
if maxworker == 0 {
config.Maxworker = 50
} else {
config.Maxworker = maxworker
}

return config
}

Expand Down
46 changes: 41 additions & 5 deletions forge.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,6 @@ func unTar(r io.Reader, targetBaseDir string) {
// get the individual filename and extract to the current directory
filename := header.Name
targetFilename := targetBaseDir + filename
//Debugf("Trying to extract file" + filename)

switch header.Typeflag {
case tar.TypeDir:
Expand Down Expand Up @@ -479,21 +478,58 @@ func resolveForgeModules(modules map[string]ForgeModule) {
Debugf("empty ForgeModule[] found, skipping...")
return
}
var wgForge sync.WaitGroup
bar := uiprogress.AddBar(len(modules)).AppendCompleted().PrependElapsed()
bar.PrependFunc(func(b *uiprogress.Bar) string {
return fmt.Sprintf("Resolving Forge modules (%d/%d)", b.Current(), len(modules))
})
// Dummy channel to coordinate the number of concurrent goroutines.
// This channel should be buffered otherwise we will be immediately blocked
// when trying to fill it.

Debugf("Resolving " + strconv.Itoa(len(modules)) + " Forge modules with " + strconv.Itoa(config.Maxworker) + " workers")
concurrentGoroutines := make(chan struct{}, config.Maxworker)
// Fill the dummy channel with config.Maxworker empty struct.
for i := 0; i < config.Maxworker; i++ {
concurrentGoroutines <- struct{}{}
}

// The done channel indicates when a single goroutine has
// finished its job.
done := make(chan bool)
// The waitForAllJobs channel allows the main program
// to wait until we have indeed done all the jobs.
waitForAllJobs := make(chan bool)
// Collect all the jobs, and since the job is finished, we can
// release another spot for a goroutine.
go func() {
for i := 1; i <= len(modules); i++ {
<-done
// Say that another goroutine can now start.
concurrentGoroutines <- struct{}{}
}
// We have collected all the jobs, the program can now terminate
waitForAllJobs <- true
}()
wg := sync.WaitGroup{}
wg.Add(len(modules))

for m, fm := range modules {
wgForge.Add(1)
go func(m string, fm ForgeModule, bar *uiprogress.Bar) {
defer wgForge.Done()
// Try to receive from the concurrentGoroutines channel. When we have something,
// it means we can start a new goroutine because another one finished.
// Otherwise, it will block the execution until an execution
// spot is available.
<-concurrentGoroutines
defer bar.Incr()
defer wg.Done()
Debugf("resolveForgeModules(): Trying to get forge module " + m + " with Forge base url " + fm.baseUrl + " and CacheTtl set to " + fm.cacheTtl.String())
doModuleInstallOrNothing(fm)
done <- true
}(m, fm, bar)
}
wgForge.Wait()
// Wait for all jobs to finish
<-waitForAllJobs
wg.Wait()
}

func check4ForgeUpdate(moduleName string, currentVersion string, latestVersion string) {
Expand Down
7 changes: 5 additions & 2 deletions g10k.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ var (
buildtime string
uniqueForgeModules map[string]ForgeModule
latestForgeModules LatestForgeModules
maxworker int
)

type LatestForgeModules struct {
Expand All @@ -60,6 +61,7 @@ type ConfigSettings struct {
Sources map[string]Source
Timeout int `yaml:"timeout"`
IgnoreUnreachableModules bool `yaml:"ignore_unreachable_modules"`
Maxworker int
}

type Forge struct {
Expand Down Expand Up @@ -141,6 +143,7 @@ func main() {
flag.StringVar(&branchParam, "branch", "", "which git branch of the Puppet environment to update, e.g. core_foobar")
flag.StringVar(&moduleDirParam, "moduledir", "", "allows overriding of Puppetfile specific moduledir setting, the folder in which Puppet modules will be extracted")
flag.StringVar(&cacheDirParam, "cachedir", "", "allows overriding of the g10k config file cachedir setting, the folder in which g10k will download git repositories and Forge modules")
flag.IntVar(&maxworker, "maxworker", 50, "how many Goroutines are allowed to run in parallel for Git and Forge module resolving")
flag.BoolVar(&pfMode, "puppetfile", false, "install all modules from Puppetfile in cwd")
flag.BoolVar(&force, "force", false, "purge the Puppet environment directory and do a full sync")
flag.BoolVar(&dryRun, "dryrun", false, "do not modify anything, just print what would be changed")
Expand Down Expand Up @@ -206,7 +209,7 @@ func main() {
}
//config = ConfigSettings{CacheDir: cachedir, ForgeCacheDir: cachedir, ModulesCacheDir: cachedir, EnvCacheDir: cachedir, Forge:{Baseurl: "https://forgeapi.puppetlabs.com"}, Sources: sm}
forgeDefaultSettings := Forge{Baseurl: "https://forgeapi.puppetlabs.com"}
config = ConfigSettings{CacheDir: cachedir, ForgeCacheDir: cachedir, ModulesCacheDir: cachedir, EnvCacheDir: cachedir, Sources: sm, Forge: forgeDefaultSettings}
config = ConfigSettings{CacheDir: cachedir, ForgeCacheDir: cachedir, ModulesCacheDir: cachedir, EnvCacheDir: cachedir, Sources: sm, Forge: forgeDefaultSettings, Maxworker: maxworker}
target = "./Puppetfile"
puppetfile := readPuppetfile("./Puppetfile", "", "cmdlineparam", false)
puppetfile.workDir = "."
Expand Down Expand Up @@ -238,7 +241,7 @@ func main() {
Debugf("Forge modules metadata.json parsing took " + strconv.FormatFloat(metadataJsonParseTime, 'f', 4, 64) + " seconds")

if !check4update && !quiet {
fmt.Println("Synced", target, "with", syncGitCount, "git repositories and", syncForgeCount, "Forge modules in "+strconv.FormatFloat(time.Since(before).Seconds(), 'f', 1, 64)+"s with git ("+strconv.FormatFloat(syncGitTime, 'f', 1, 64)+"s sync, I/O", strconv.FormatFloat(ioGitTime, 'f', 1, 64)+"s) and Forge ("+strconv.FormatFloat(syncForgeTime, 'f', 1, 64)+"s query+download, I/O", strconv.FormatFloat(ioForgeTime, 'f', 1, 64)+"s)")
fmt.Println("Synced", target, "with", syncGitCount, "git repositories and", syncForgeCount, "Forge modules in "+strconv.FormatFloat(time.Since(before).Seconds(), 'f', 1, 64)+"s with git ("+strconv.FormatFloat(syncGitTime, 'f', 1, 64)+"s sync, I/O", strconv.FormatFloat(ioGitTime, 'f', 1, 64)+"s) and Forge ("+strconv.FormatFloat(syncForgeTime, 'f', 1, 64)+"s query+download, I/O", strconv.FormatFloat(ioForgeTime, 'f', 1, 64)+"s) using", strconv.Itoa(config.Maxworker), "workers")
}
if dryRun && (needSyncForgeCount > 0 || needSyncGitCount > 0) {
os.Exit(1)
Expand Down
14 changes: 8 additions & 6 deletions g10k_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestConfigPrefix(t *testing.T) {
ModulesCacheDir: "/tmp/g10k/modules/", EnvCacheDir: "/tmp/g10k/environments/",
Git: Git{privateKey: "", username: ""},
Forge: Forge{Baseurl: "https://forgeapi.puppetlabs.com"},
Sources: s, Timeout: 5}
Sources: s, Timeout: 5, Maxworker: 50}

if !reflect.DeepEqual(got, expected) {
t.Errorf("Expected ConfigSettings: %+v, but got ConfigSettings: %+v", expected, got)
Expand All @@ -60,7 +60,7 @@ func TestConfigForceForgeVersions(t *testing.T) {
ModulesCacheDir: "/tmp/g10k/modules/", EnvCacheDir: "/tmp/g10k/environments/",
Git: Git{privateKey: "", username: ""},
Forge: Forge{Baseurl: "https://forgeapi.puppetlabs.com"},
Sources: s, Timeout: 5}
Sources: s, Timeout: 5, Maxworker: 50}

if !reflect.DeepEqual(got, expected) {
t.Errorf("Expected ConfigSettings: %+v, but got ConfigSettings: %+v", expected, got)
Expand All @@ -80,7 +80,7 @@ func TestConfigAddWarning(t *testing.T) {
ModulesCacheDir: "/tmp/g10k/modules/", EnvCacheDir: "/tmp/g10k/environments/",
Git: Git{privateKey: "", username: ""},
Forge: Forge{Baseurl: "https://forgeapi.puppetlabs.com"},
Sources: s, Timeout: 5}
Sources: s, Timeout: 5, Maxworker: 50}

if !reflect.DeepEqual(got, expected) {
t.Errorf("Expected ConfigSettings: %+v, but got ConfigSettings: %+v", expected, got)
Expand Down Expand Up @@ -123,6 +123,8 @@ func TestResolvStatic(t *testing.T) {
purgeDir("./cache/", "TestResolvStatic()")
purgeDir("./example/", "TestResolvStatic()")
config = readConfigfile("tests/TestConfigStatic.yaml")
// increase maxworker to finish the test quicker
config.Maxworker = 500
resolvePuppetEnvironment("static")

cmd := exec.Command(path, "-vvv", "-l", "-r", "./example", "-a", "-k", "tests/hashdeep_example_static.hashdeep")
Expand Down Expand Up @@ -214,7 +216,7 @@ func TestInvalidFilesizeForgemodule(t *testing.T) {
pfm := make(map[string]Puppetfile)
pfm["test"] = pf

config = ConfigSettings{ForgeCacheDir: "/tmp/forge_cache"}
config = ConfigSettings{ForgeCacheDir: "/tmp/forge_cache", Maxworker: 500}
defer purgeDir(pf.workDir, "TestInvalidMetadataForgemodule")
defer purgeDir(config.ForgeCacheDir, "TestInvalidMetadataForgemodule")

Expand Down Expand Up @@ -267,7 +269,7 @@ func TestInvalidMd5sumForgemodule(t *testing.T) {
pfm := make(map[string]Puppetfile)
pfm["test"] = pf

config = ConfigSettings{ForgeCacheDir: "/tmp/forge_cache"}
config = ConfigSettings{ForgeCacheDir: "/tmp/forge_cache", Maxworker: 500}
defer purgeDir(pf.workDir, "TestInvalidMetadataForgemodule")
defer purgeDir(config.ForgeCacheDir, "TestInvalidMetadataForgemodule")

Expand Down Expand Up @@ -313,7 +315,7 @@ func TestInvalidSha256sumForgemodule(t *testing.T) {
pfm := make(map[string]Puppetfile)
pfm["test"] = pf

config = ConfigSettings{ForgeCacheDir: "/tmp/forge_cache"}
config = ConfigSettings{ForgeCacheDir: "/tmp/forge_cache", Maxworker: 500}
defer purgeDir(pf.workDir, "TestInvalidMetadataForgemodule")
defer purgeDir(config.ForgeCacheDir, "TestInvalidMetadataForgemodule")

Expand Down
56 changes: 49 additions & 7 deletions git.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,56 @@ func resolveGitRepositories(uniqueGitModules map[string]GitModule) {
Debugf("uniqueGitModules[] is empty, skipping...")
return
}
var wgGit sync.WaitGroup
bar := uiprogress.AddBar(len(uniqueGitModules)).AppendCompleted().PrependElapsed()
bar.PrependFunc(func(b *uiprogress.Bar) string {
return fmt.Sprintf("Resolving Git modules (%d/%d)", b.Current(), len(uniqueGitModules))
})
// Dummy channel to coordinate the number of concurrent goroutines.
// This channel should be buffered otherwise we will be immediately blocked
// when trying to fill it.

Debugf("Resolving " + strconv.Itoa(len(uniqueGitModules)) + " Git modules with " + strconv.Itoa(config.Maxworker) + " workers")
concurrentGoroutines := make(chan struct{}, config.Maxworker)
// Fill the dummy channel with config.Maxworker empty struct.
for i := 0; i < config.Maxworker; i++ {
concurrentGoroutines <- struct{}{}
}

// The done channel indicates when a single goroutine has
// finished its job.
done := make(chan bool)
// The waitForAllJobs channel allows the main program
// to wait until we have indeed done all the jobs.
waitForAllJobs := make(chan bool)
// Collect all the jobs, and since the job is finished, we can
// release another spot for a goroutine.
go func() {
for _, gm := range uniqueGitModules {
go func(gm GitModule) {
<-done
// Say that another goroutine can now start.
concurrentGoroutines <- struct{}{}
}(gm)
}
// We have collected all the jobs, the program
// can now terminate
waitForAllJobs <- true
}()
wg := sync.WaitGroup{}
wg.Add(len(uniqueGitModules))

for url, gm := range uniqueGitModules {
wgGit.Add(1)
Debugf("git repo url " + url)
privateKey := gm.privateKey
go func(url string, privateKey string, gm GitModule) {
defer wgGit.Done()
go func(url string, privateKey string, gm GitModule, bar *uiprogress.Bar) {
// Try to receive from the concurrentGoroutines channel. When we have something,
// it means we can start a new goroutine because another one finished.
// Otherwise, it will block the execution until an execution
// spot is available.
<-concurrentGoroutines
defer bar.Incr()
defer wg.Done()

if len(gm.privateKey) > 0 {
Debugf("git repo url " + url + " with ssh key " + privateKey)
} else {
Expand All @@ -43,10 +82,13 @@ func resolveGitRepositories(uniqueGitModules map[string]GitModule) {

doMirrorOrUpdate(url, workDir, privateKey, gm.ignoreUnreachable)
// doCloneOrPull(source, workDir, targetDir, sa.Remote, branch, sa.PrivateKey)

}(url, privateKey, gm)
}(url, privateKey, gm, bar)
done <- true
}
wgGit.Wait()

// Wait for all jobs to finish
<-waitForAllJobs
wg.Wait()
}

func doMirrorOrUpdate(url string, workDir string, sshPrivateKey string, allowFail bool) bool {
Expand Down

0 comments on commit 19b953d

Please sign in to comment.