From 19b953df3a2cb474a0f768d9215489d75a0a8b03 Mon Sep 17 00:00:00 2001 From: Andreas Paul Date: Tue, 1 Aug 2017 13:24:19 +0200 Subject: [PATCH] add -maxworker parameter to limit parallel Goroutines (#64) --- README.md | 32 +++++++++++++++++++----------- config.go | 7 +++++++ forge.go | 46 +++++++++++++++++++++++++++++++++++++----- g10k.go | 7 +++++-- g10k_test.go | 14 +++++++------ git.go | 56 +++++++++++++++++++++++++++++++++++++++++++++------- 6 files changed, 131 insertions(+), 31 deletions(-) diff --git a/README.md b/README.md index 9bb901f..0518ab3 100644 --- a/README.md +++ b/README.md @@ -58,27 +58,37 @@ https://github.com/xorpaul/g10k/releases ``` Usage of ./g10k: -branch string - which git branch of the Puppet environment to update, e.g. core_foobar + which git branch of the Puppet environment to update, e.g. core_foobar + -cachedir string + allows overriding of the g10k config file cachedir setting, the folder in which g10k will download git repositories and Forge modules -check4update - only check if the is newer version of the Puppet module avaialable. Does implicitly set parameter dryrun to true + only check if there is a newer version of the Puppet module available. Does implicitly set dryrun to true + -checksum + get the md5 check sum for each Puppetlabs Forge module and verify the integrity of the downloaded archive. Increases g10k run time! 
-config string - which config file to use + which config file to use -debug - log debug output, defaults to false + log debug output, defaults to false -dryrun - do not modify anything, just print what would be changed + do not modify anything, just print what would be changed -force - purge the Puppet environment directory and do a full sync + purge the Puppet environment directory and do a full sync -info - log info output, defaults to false + log info output, defaults to false + -maxworker int + how many Goroutines are allowed to run in parallel for Git and Forge module resolving (default 50) + -moduledir string + allows overriding of Puppetfile specific moduledir setting, the folder in which Puppet modules will be extracted -puppetfile - install all modules from Puppetfile in cwd + install all modules from Puppetfile in cwd + -quiet + no output, defaults to false -usemove - do not use hardlinks to populate your Puppet environments with Puppetlabs Forge modules. Uses simple move instead of hard links and purge the Forge cache directory after each run! + do not use hardlinks to populate your Puppet environments with Puppetlabs Forge modules. Instead uses simple move commands and purges the Forge cache directory after each run! (Useful for g10k runs inside a Docker container) -verbose - log verbose output, defaults to false + log verbose output, defaults to false -version - show build time and version number + show build time and version number ``` Regarding anything usage/workflow you really can just use the great [puppetlabs/r10k](https://github.com/puppetlabs/r10k/blob/master/doc/dynamic-environments.mkd) docs as the [Puppetfile](https://github.com/puppetlabs/r10k/blob/master/doc/puppetfile.mkd) etc. are all intentionally kept unchanged. 
diff --git a/config.go b/config.go index 0a025dd..9bb8e03 100644 --- a/config.go +++ b/config.go @@ -56,6 +56,13 @@ func readConfigfile(configFile string) ConfigSettings { config.Timeout = 5 } + // set default max Go routines for Forge and Git module resolution if none is given + if maxworker == 0 { + config.Maxworker = 50 + } else { + config.Maxworker = maxworker + } + return config } diff --git a/forge.go b/forge.go index bd436fa..7a9bd2d 100644 --- a/forge.go +++ b/forge.go @@ -302,7 +302,6 @@ func unTar(r io.Reader, targetBaseDir string) { // get the individual filename and extract to the current directory filename := header.Name targetFilename := targetBaseDir + filename - //Debugf("Trying to extract file" + filename) switch header.Typeflag { case tar.TypeDir: @@ -479,21 +478,58 @@ func resolveForgeModules(modules map[string]ForgeModule) { Debugf("empty ForgeModule[] found, skipping...") return } - var wgForge sync.WaitGroup bar := uiprogress.AddBar(len(modules)).AppendCompleted().PrependElapsed() bar.PrependFunc(func(b *uiprogress.Bar) string { return fmt.Sprintf("Resolving Forge modules (%d/%d)", b.Current(), len(modules)) }) + // Dummy channel to coordinate the number of concurrent goroutines. + // This channel should be buffered otherwise we will be immediately blocked + // when trying to fill it. + + Debugf("Resolving " + strconv.Itoa(len(modules)) + " Forge modules with " + strconv.Itoa(config.Maxworker) + " workers") + concurrentGoroutines := make(chan struct{}, config.Maxworker) + // Fill the dummy channel with config.Maxworker empty struct. + for i := 0; i < config.Maxworker; i++ { + concurrentGoroutines <- struct{}{} + } + + // The done channel indicates when a single goroutine has + // finished its job. + done := make(chan bool) + // The waitForAllJobs channel allows the main program + // to wait until we have indeed done all the jobs. 
+ waitForAllJobs := make(chan bool) + // Collect all the jobs, and since the job is finished, we can + // release another spot for a goroutine. + go func() { + for i := 1; i <= len(modules); i++ { + <-done + // Say that another goroutine can now start. + concurrentGoroutines <- struct{}{} + } + // We have collected all the jobs, the program can now terminate + waitForAllJobs <- true + }() + wg := sync.WaitGroup{} + wg.Add(len(modules)) + for m, fm := range modules { - wgForge.Add(1) go func(m string, fm ForgeModule, bar *uiprogress.Bar) { - defer wgForge.Done() + // Try to receive from the concurrentGoroutines channel. When we have something, + // it means we can start a new goroutine because another one finished. + // Otherwise, it will block the execution until an execution + // spot is available. + <-concurrentGoroutines defer bar.Incr() + defer wg.Done() Debugf("resolveForgeModules(): Trying to get forge module " + m + " with Forge base url " + fm.baseUrl + " and CacheTtl set to " + fm.cacheTtl.String()) doModuleInstallOrNothing(fm) + done <- true }(m, fm, bar) } - wgForge.Wait() + // Wait for all jobs to finish + <-waitForAllJobs + wg.Wait() } func check4ForgeUpdate(moduleName string, currentVersion string, latestVersion string) { diff --git a/g10k.go b/g10k.go index a7bd24b..b1e00b0 100644 --- a/g10k.go +++ b/g10k.go @@ -42,6 +42,7 @@ var ( buildtime string uniqueForgeModules map[string]ForgeModule latestForgeModules LatestForgeModules + maxworker int ) type LatestForgeModules struct { @@ -60,6 +61,7 @@ type ConfigSettings struct { Sources map[string]Source Timeout int `yaml:"timeout"` IgnoreUnreachableModules bool `yaml:"ignore_unreachable_modules"` + Maxworker int } type Forge struct { @@ -141,6 +143,7 @@ func main() { flag.StringVar(&branchParam, "branch", "", "which git branch of the Puppet environment to update, e.g. 
core_foobar") flag.StringVar(&moduleDirParam, "moduledir", "", "allows overriding of Puppetfile specific moduledir setting, the folder in which Puppet modules will be extracted") flag.StringVar(&cacheDirParam, "cachedir", "", "allows overriding of the g10k config file cachedir setting, the folder in which g10k will download git repositories and Forge modules") + flag.IntVar(&maxworker, "maxworker", 50, "how many Goroutines are allowed to run in parallel for Git and Forge module resolving") flag.BoolVar(&pfMode, "puppetfile", false, "install all modules from Puppetfile in cwd") flag.BoolVar(&force, "force", false, "purge the Puppet environment directory and do a full sync") flag.BoolVar(&dryRun, "dryrun", false, "do not modify anything, just print what would be changed") @@ -206,7 +209,7 @@ func main() { } //config = ConfigSettings{CacheDir: cachedir, ForgeCacheDir: cachedir, ModulesCacheDir: cachedir, EnvCacheDir: cachedir, Forge:{Baseurl: "https://forgeapi.puppetlabs.com"}, Sources: sm} forgeDefaultSettings := Forge{Baseurl: "https://forgeapi.puppetlabs.com"} - config = ConfigSettings{CacheDir: cachedir, ForgeCacheDir: cachedir, ModulesCacheDir: cachedir, EnvCacheDir: cachedir, Sources: sm, Forge: forgeDefaultSettings} + config = ConfigSettings{CacheDir: cachedir, ForgeCacheDir: cachedir, ModulesCacheDir: cachedir, EnvCacheDir: cachedir, Sources: sm, Forge: forgeDefaultSettings, Maxworker: maxworker} target = "./Puppetfile" puppetfile := readPuppetfile("./Puppetfile", "", "cmdlineparam", false) puppetfile.workDir = "." 
@@ -238,7 +241,7 @@ func main() { Debugf("Forge modules metadata.json parsing took " + strconv.FormatFloat(metadataJsonParseTime, 'f', 4, 64) + " seconds") if !check4update && !quiet { - fmt.Println("Synced", target, "with", syncGitCount, "git repositories and", syncForgeCount, "Forge modules in "+strconv.FormatFloat(time.Since(before).Seconds(), 'f', 1, 64)+"s with git ("+strconv.FormatFloat(syncGitTime, 'f', 1, 64)+"s sync, I/O", strconv.FormatFloat(ioGitTime, 'f', 1, 64)+"s) and Forge ("+strconv.FormatFloat(syncForgeTime, 'f', 1, 64)+"s query+download, I/O", strconv.FormatFloat(ioForgeTime, 'f', 1, 64)+"s)") + fmt.Println("Synced", target, "with", syncGitCount, "git repositories and", syncForgeCount, "Forge modules in "+strconv.FormatFloat(time.Since(before).Seconds(), 'f', 1, 64)+"s with git ("+strconv.FormatFloat(syncGitTime, 'f', 1, 64)+"s sync, I/O", strconv.FormatFloat(ioGitTime, 'f', 1, 64)+"s) and Forge ("+strconv.FormatFloat(syncForgeTime, 'f', 1, 64)+"s query+download, I/O", strconv.FormatFloat(ioForgeTime, 'f', 1, 64)+"s) using", strconv.Itoa(config.Maxworker), "workers") } if dryRun && (needSyncForgeCount > 0 || needSyncGitCount > 0) { os.Exit(1) diff --git a/g10k_test.go b/g10k_test.go index 760c5d7..c91111c 100644 --- a/g10k_test.go +++ b/g10k_test.go @@ -40,7 +40,7 @@ func TestConfigPrefix(t *testing.T) { ModulesCacheDir: "/tmp/g10k/modules/", EnvCacheDir: "/tmp/g10k/environments/", Git: Git{privateKey: "", username: ""}, Forge: Forge{Baseurl: "https://forgeapi.puppetlabs.com"}, - Sources: s, Timeout: 5} + Sources: s, Timeout: 5, Maxworker: 50} if !reflect.DeepEqual(got, expected) { t.Errorf("Expected ConfigSettings: %+v, but got ConfigSettings: %+v", expected, got) @@ -60,7 +60,7 @@ func TestConfigForceForgeVersions(t *testing.T) { ModulesCacheDir: "/tmp/g10k/modules/", EnvCacheDir: "/tmp/g10k/environments/", Git: Git{privateKey: "", username: ""}, Forge: Forge{Baseurl: "https://forgeapi.puppetlabs.com"}, - Sources: s, Timeout: 5} + Sources: s, 
Timeout: 5, Maxworker: 50} if !reflect.DeepEqual(got, expected) { t.Errorf("Expected ConfigSettings: %+v, but got ConfigSettings: %+v", expected, got) @@ -80,7 +80,7 @@ func TestConfigAddWarning(t *testing.T) { ModulesCacheDir: "/tmp/g10k/modules/", EnvCacheDir: "/tmp/g10k/environments/", Git: Git{privateKey: "", username: ""}, Forge: Forge{Baseurl: "https://forgeapi.puppetlabs.com"}, - Sources: s, Timeout: 5} + Sources: s, Timeout: 5, Maxworker: 50} if !reflect.DeepEqual(got, expected) { t.Errorf("Expected ConfigSettings: %+v, but got ConfigSettings: %+v", expected, got) @@ -123,6 +123,8 @@ func TestResolvStatic(t *testing.T) { purgeDir("./cache/", "TestResolvStatic()") purgeDir("./example/", "TestResolvStatic()") config = readConfigfile("tests/TestConfigStatic.yaml") + // increase maxworker to finish the test quicker + config.Maxworker = 500 resolvePuppetEnvironment("static") cmd := exec.Command(path, "-vvv", "-l", "-r", "./example", "-a", "-k", "tests/hashdeep_example_static.hashdeep") @@ -214,7 +216,7 @@ func TestInvalidFilesizeForgemodule(t *testing.T) { pfm := make(map[string]Puppetfile) pfm["test"] = pf - config = ConfigSettings{ForgeCacheDir: "/tmp/forge_cache"} + config = ConfigSettings{ForgeCacheDir: "/tmp/forge_cache", Maxworker: 500} defer purgeDir(pf.workDir, "TestInvalidMetadataForgemodule") defer purgeDir(config.ForgeCacheDir, "TestInvalidMetadataForgemodule") @@ -267,7 +269,7 @@ func TestInvalidMd5sumForgemodule(t *testing.T) { pfm := make(map[string]Puppetfile) pfm["test"] = pf - config = ConfigSettings{ForgeCacheDir: "/tmp/forge_cache"} + config = ConfigSettings{ForgeCacheDir: "/tmp/forge_cache", Maxworker: 500} defer purgeDir(pf.workDir, "TestInvalidMetadataForgemodule") defer purgeDir(config.ForgeCacheDir, "TestInvalidMetadataForgemodule") @@ -313,7 +315,7 @@ func TestInvalidSha256sumForgemodule(t *testing.T) { pfm := make(map[string]Puppetfile) pfm["test"] = pf - config = ConfigSettings{ForgeCacheDir: "/tmp/forge_cache"} + config = 
ConfigSettings{ForgeCacheDir: "/tmp/forge_cache", Maxworker: 500} defer purgeDir(pf.workDir, "TestInvalidMetadataForgemodule") defer purgeDir(config.ForgeCacheDir, "TestInvalidMetadataForgemodule") diff --git a/git.go b/git.go index 9d2730b..8e3fb4d 100644 --- a/git.go +++ b/git.go @@ -19,17 +19,56 @@ func resolveGitRepositories(uniqueGitModules map[string]GitModule) { Debugf("uniqueGitModules[] is empty, skipping...") return } - var wgGit sync.WaitGroup bar := uiprogress.AddBar(len(uniqueGitModules)).AppendCompleted().PrependElapsed() bar.PrependFunc(func(b *uiprogress.Bar) string { return fmt.Sprintf("Resolving Git modules (%d/%d)", b.Current(), len(uniqueGitModules)) }) + // Dummy channel to coordinate the number of concurrent goroutines. + // This channel should be buffered otherwise we will be immediately blocked + // when trying to fill it. + + Debugf("Resolving " + strconv.Itoa(len(uniqueGitModules)) + " Git modules with " + strconv.Itoa(config.Maxworker) + " workers") + concurrentGoroutines := make(chan struct{}, config.Maxworker) + // Fill the dummy channel with config.Maxworker empty struct. + for i := 0; i < config.Maxworker; i++ { + concurrentGoroutines <- struct{}{} + } + + // The done channel indicates when a single goroutine has + // finished its job. + done := make(chan bool) + // The waitForAllJobs channel allows the main program + // to wait until we have indeed done all the jobs. + waitForAllJobs := make(chan bool) + // Collect all the jobs, and since the job is finished, we can + // release another spot for a goroutine. + go func() { + for _, gm := range uniqueGitModules { + go func(gm GitModule) { + <-done + // Say that another goroutine can now start. 
+ concurrentGoroutines <- struct{}{} + }(gm) + } + // We have collected all the jobs, the program + // can now terminate + waitForAllJobs <- true + }() + wg := sync.WaitGroup{} + wg.Add(len(uniqueGitModules)) + for url, gm := range uniqueGitModules { - wgGit.Add(1) + Debugf("git repo url " + url) privateKey := gm.privateKey - go func(url string, privateKey string, gm GitModule) { - defer wgGit.Done() + go func(url string, privateKey string, gm GitModule, bar *uiprogress.Bar) { + // Try to receive from the concurrentGoroutines channel. When we have something, + // it means we can start a new goroutine because another one finished. + // Otherwise, it will block the execution until an execution + // spot is available. + <-concurrentGoroutines defer bar.Incr() + defer wg.Done() + if len(gm.privateKey) > 0 { Debugf("git repo url " + url + " with ssh key " + privateKey) } else { @@ -43,10 +82,13 @@ func resolveGitRepositories(uniqueGitModules map[string]GitModule) { doMirrorOrUpdate(url, workDir, privateKey, gm.ignoreUnreachable) // doCloneOrPull(source, workDir, targetDir, sa.Remote, branch, sa.PrivateKey) - - }(url, privateKey, gm) + }(url, privateKey, gm, bar) + done <- true } - wgGit.Wait() + + // Wait for all jobs to finish + <-waitForAllJobs + wg.Wait() } func doMirrorOrUpdate(url string, workDir string, sshPrivateKey string, allowFail bool) bool {