From e27a5d14c05324a89820ef8c382f63cb407410fb Mon Sep 17 00:00:00 2001 From: Justin Stoller Date: Fri, 1 May 2020 13:40:51 -0700 Subject: [PATCH] [#1058] Serially deploy modules that share a cachedir When we raised the default threadpool for deploying modules above 1, we began to see issues when caching a remote locally if that remote/cache was shared by multiple modules within the Puppetfile. To resolve this, when we create the module we group it with any other module that may share its cachedir. We then pull modules by cachedir off of the queue and serially process those. --- lib/r10k/git/cache.rb | 4 +- lib/r10k/git/stateful_repository.rb | 4 ++ lib/r10k/puppetfile.rb | 33 +++++++++++++- spec/unit/action/deploy/module_spec.rb | 4 +- spec/unit/action/puppetfile/install_spec.rb | 1 + spec/unit/puppetfile_spec.rb | 49 ++++++++++++++++++++- 6 files changed, 86 insertions(+), 9 deletions(-) diff --git a/lib/r10k/git/cache.rb b/lib/r10k/git/cache.rb index 5c03f9fce..61269d835 100644 --- a/lib/r10k/git/cache.rb +++ b/lib/r10k/git/cache.rb @@ -99,10 +99,8 @@ def reset! alias cached? exist? - private - # Reformat the remote name into something that can be used as a directory def sanitized_dirname - @remote.gsub(/[^@\w\.-]/, '-') + @sanitized_dirname ||= @remote.gsub(/[^@\w\.-]/, '-') end end diff --git a/lib/r10k/git/stateful_repository.rb b/lib/r10k/git/stateful_repository.rb index 981ac0ff6..ad2eed6c4 100644 --- a/lib/r10k/git/stateful_repository.rb +++ b/lib/r10k/git/stateful_repository.rb @@ -12,6 +12,10 @@ class R10K::Git::StatefulRepository # @api private attr_reader :repo + # @!attribute [r] cache + # @api private + attr_reader :cache + extend Forwardable def_delegators :@repo, :head, :tracked_paths diff --git a/lib/r10k/puppetfile.rb b/lib/r10k/puppetfile.rb index 070d93913..6192eb489 100644 --- a/lib/r10k/puppetfile.rb +++ b/lib/r10k/puppetfile.rb @@ -42,6 +42,11 @@ class Puppetfile # @return [Boolean] Overwrite any locally made changes attr_accessor :force + # @!attribute [r] modules_by_vcs_cachedir + # @api private Only exposed for testing purposes + # @return [Hash{:none, String => Array}] + attr_reader :modules_by_vcs_cachedir + # @param [String] basedir # @param [String] moduledir The directory to install the modules, default to #{basedir}/modules # @param [String] puppetfile_path The path to the Puppetfile, default to #{basedir}/Puppetfile @@ -58,6 +63,7 @@ def initialize(basedir, moduledir = nil, puppetfile_path = nil, puppetfile_name @modules = [] @managed_content = {} + @modules_by_vcs_cachedir = {} @forge = 'forgeapi.puppetlabs.com' @loaded = false @@ -137,6 +143,9 @@ def add_module(name, args) mod.origin = 'Puppetfile' @managed_content[install_path] << mod.name + cachedir = module_vcs_cachedir(mod) + @modules_by_vcs_cachedir[cachedir] ||= [] + @modules_by_vcs_cachedir[cachedir] << mod @modules << mod end @@ -180,6 +189,19 @@ def accept(visitor) private + def module_vcs_cachedir(mod) + if mod.respond_to? :repo + repo = mod.repo + if repo.respond_to? :cache + cache = repo.cache + if cache.respond_to? :sanitized_dirname + return cache.sanitized_dirname + end + end + end + :none + end + def serial_accept(visitor) visitor.visit(:puppetfile, self) do modules.each do |mod| @@ -212,15 +234,22 @@ def concurrent_accept(visitor, pool_size) def modules_queue(visitor) Queue.new.tap do |queue| visitor.visit(:puppetfile, self) do - modules.each { |mod| queue << mod } + modules_by_cachedir = modules_by_vcs_cachedir.clone + modules_without_vcs_cachedir = modules_by_cachedir.delete(:none) || [] + + modules_without_vcs_cachedir.each {|mod| queue << Array(mod) } + modules_by_cachedir.values.each {|mods| queue << mods } end end end + public :modules_queue def visitor_thread(visitor, mods_queue) Thread.new do begin - while mod = mods_queue.pop(true) do mod.accept(visitor) end + while mods = mods_queue.pop(true) do + mods.each {|mod| mod.accept(visitor) } + end rescue ThreadError => e logger.debug _("Module thread %{id} exiting: %{message}") % {message: e.message, id: Thread.current.object_id} Thread.exit diff --git a/spec/unit/action/deploy/module_spec.rb b/spec/unit/action/deploy/module_spec.rb index 82e1115b2..a213255bd 100644 --- a/spec/unit/action/deploy/module_spec.rb +++ b/spec/unit/action/deploy/module_spec.rb @@ -70,8 +70,8 @@ before do allow(subject).to receive(:visit_environment).and_wrap_original do |original, environment, &block| - expect(environment.puppetfile).to receive(:modules).and_return( - [R10K::Module::Local.new(environment.name, '/fakedir', [], environment)] + expect(environment.puppetfile).to receive(:modules_by_vcs_cachedir).and_return( + {none: [R10K::Module::Local.new(environment.name, '/fakedir', [], environment)]} ) original.call(environment, &block) end diff --git a/spec/unit/action/puppetfile/install_spec.rb b/spec/unit/action/puppetfile/install_spec.rb index f77641dba..0b677e8ca 100644 --- a/spec/unit/action/puppetfile/install_spec.rb +++ b/spec/unit/action/puppetfile/install_spec.rb @@ -27,6 +27,7 @@ def installer(opts = {}, argv = [], settings = {}) before do allow(puppetfile).to receive(:purge!) allow(puppetfile).to receive(:modules).and_return(modules) + allow(puppetfile).to receive(:modules_by_vcs_cachedir).and_return({none: modules}) end it "syncs each module in the Puppetfile" do diff --git a/spec/unit/puppetfile_spec.rb b/spec/unit/puppetfile_spec.rb index dc5ac388a..8a42104b5 100644 --- a/spec/unit/puppetfile_spec.rb +++ b/spec/unit/puppetfile_spec.rb @@ -127,6 +127,25 @@ expect { subject.add_module('puppet/test_module', module_opts) }.to raise_error(R10K::Error, /cannot manage content.*is not within/i).and not_change { subject.modules } end + + it "groups modules by vcs cache location" do + module_opts = { install_path: File.join(subject.basedir, 'vendor') } + opts1 = module_opts.merge(git: 'git@example.com:puppet/test_module.git') + opts2 = module_opts.merge(git: 'git@example.com:puppet/test_module_c.git') + sanitized_name1 = "git@example.com-puppet-test_module.git" + sanitized_name2 = "git@example.com-puppet-test_module_c.git" + + subject.add_module('puppet/test_module_a', opts1) + subject.add_module('puppet/test_module_b', opts1) + subject.add_module('puppet/test_module_c', opts2) + subject.add_module('puppet/test_module_d', '1.2.3') + + mods_by_cachedir = subject.modules_by_vcs_cachedir + + expect(mods_by_cachedir[:none].length).to be 1 + expect(mods_by_cachedir[sanitized_name1].length).to be 2 + expect(mods_by_cachedir[sanitized_name2].length).to be 1 + end end describe "#purge_exclusions" do @@ -268,7 +287,7 @@ def expect_wrapped_error(orig, pf_path, wrapped_error) mod2 = spy('module') expect(mod2).to receive(:accept).with(visitor) - expect(subject).to receive(:modules).and_return([mod1, mod2]) + expect(subject).to receive(:modules_by_vcs_cachedir).and_return({none: [mod1, mod2]}) subject.accept(visitor) end @@ -289,12 +308,38 @@ def expect_wrapped_error(orig, pf_path, wrapped_error) mod2 = spy('module') expect(mod2).to receive(:accept).with(visitor) - expect(subject).to receive(:modules).and_return([mod1, mod2]) + expect(subject).to receive(:modules_by_vcs_cachedir).and_return({none: [mod1, mod2]}) expect(Thread).to receive(:new).exactly(pool_size).and_call_original expect(Queue).to receive(:new).and_call_original subject.accept(visitor) end + + it "Creates queues of modules grouped by cachedir" do + visitor = spy('visitor') + expect(visitor).to receive(:visit) do |type, other, &block| + expect(type).to eq :puppetfile + expect(other).to eq subject + block.call + end + + mod1 = spy('module1') + mod2 = spy('module2') + mod3 = spy('module3') + mod4 = spy('module4') + mod5 = spy('module5') + mod6 = spy('module6') + + expect(subject).to receive(:modules_by_vcs_cachedir) + .and_return({:none => [mod1, mod2], + "foo-cachedir" => [mod3, mod4], + "bar-cachedir" => [mod5, mod6]}) + + queue = subject.modules_queue(visitor) + expect(queue.length).to be 4 + queue_array = 4.times.map { queue.pop } + expect(queue_array).to match_array([[mod1], [mod2], [mod3, mod4], [mod5, mod6]]) + end end end