Skip to content

Commit

Permalink
[puppetlabs#1058] Serially deploy modules that share a cachedir
Browse files Browse the repository at this point in the history
When we raised the default threadpool for deploying modules above 1, we
began to see issues when caching a remote locally if that remote/cache
was shared by multiple modules within the Puppetfile.

To resolve this, when we create the module we group it with any other
module that may share its cachedir. We then pull modules by cachedir off
of the queue and serially process those.
  • Loading branch information
justinstoller authored and Magisus committed Jul 27, 2020
1 parent 36f8061 commit e27a5d1
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 9 deletions.
4 changes: 1 addition & 3 deletions lib/r10k/git/cache.rb
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,8 @@ def reset!

alias cached? exist?

private

# Reformat the remote name into something that can be used as a directory
def sanitized_dirname
@remote.gsub(/[^@\w\.-]/, '-')
@sanitized_dirname ||= @remote.gsub(/[^@\w\.-]/, '-')
end
end
4 changes: 4 additions & 0 deletions lib/r10k/git/stateful_repository.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ class R10K::Git::StatefulRepository
# @api private
attr_reader :repo

# @!attribute [r] cache
# @api private
attr_reader :cache

extend Forwardable
def_delegators :@repo, :head, :tracked_paths

Expand Down
33 changes: 31 additions & 2 deletions lib/r10k/puppetfile.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ class Puppetfile
# @return [Boolean] Overwrite any locally made changes
attr_accessor :force

# @!attribute [r] modules_by_vcs_cachedir
# @api private Only exposed for testing purposes
# @return [Hash{:none, String => Array<R10K::Module>}]
attr_reader :modules_by_vcs_cachedir

# @param [String] basedir
# @param [String] moduledir The directory to install the modules, default to #{basedir}/modules
# @param [String] puppetfile_path The path to the Puppetfile, default to #{basedir}/Puppetfile
Expand All @@ -58,6 +63,7 @@ def initialize(basedir, moduledir = nil, puppetfile_path = nil, puppetfile_name

@modules = []
@managed_content = {}
@modules_by_vcs_cachedir = {}
@forge = 'forgeapi.puppetlabs.com'

@loaded = false
Expand Down Expand Up @@ -137,6 +143,9 @@ def add_module(name, args)
mod.origin = 'Puppetfile'

@managed_content[install_path] << mod.name
cachedir = module_vcs_cachedir(mod)
@modules_by_vcs_cachedir[cachedir] ||= []
@modules_by_vcs_cachedir[cachedir] << mod
@modules << mod
end

Expand Down Expand Up @@ -180,6 +189,19 @@ def accept(visitor)

private

def module_vcs_cachedir(mod)
if mod.respond_to? :repo
repo = mod.repo
if repo.respond_to? :cache
cache = repo.cache
if cache.respond_to? :sanitized_dirname
return cache.sanitized_dirname
end
end
end
:none
end

def serial_accept(visitor)
visitor.visit(:puppetfile, self) do
modules.each do |mod|
Expand Down Expand Up @@ -212,15 +234,22 @@ def concurrent_accept(visitor, pool_size)
def modules_queue(visitor)
Queue.new.tap do |queue|
visitor.visit(:puppetfile, self) do
modules.each { |mod| queue << mod }
modules_by_cachedir = modules_by_vcs_cachedir.clone
modules_without_vcs_cachedir = modules_by_cachedir.delete(:none) || []

modules_without_vcs_cachedir.each {|mod| queue << Array(mod) }
modules_by_cachedir.values.each {|mods| queue << mods }
end
end
end
public :modules_queue

def visitor_thread(visitor, mods_queue)
Thread.new do
begin
while mod = mods_queue.pop(true) do mod.accept(visitor) end
while mods = mods_queue.pop(true) do
mods.each {|mod| mod.accept(visitor) }
end
rescue ThreadError => e
logger.debug _("Module thread %{id} exiting: %{message}") % {message: e.message, id: Thread.current.object_id}
Thread.exit
Expand Down
4 changes: 2 additions & 2 deletions spec/unit/action/deploy/module_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@

before do
allow(subject).to receive(:visit_environment).and_wrap_original do |original, environment, &block|
expect(environment.puppetfile).to receive(:modules).and_return(
[R10K::Module::Local.new(environment.name, '/fakedir', [], environment)]
expect(environment.puppetfile).to receive(:modules_by_vcs_cachedir).and_return(
{none: [R10K::Module::Local.new(environment.name, '/fakedir', [], environment)]}
)
original.call(environment, &block)
end
Expand Down
1 change: 1 addition & 0 deletions spec/unit/action/puppetfile/install_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def installer(opts = {}, argv = [], settings = {})
before do
allow(puppetfile).to receive(:purge!)
allow(puppetfile).to receive(:modules).and_return(modules)
allow(puppetfile).to receive(:modules_by_vcs_cachedir).and_return({none: modules})
end

it "syncs each module in the Puppetfile" do
Expand Down
49 changes: 47 additions & 2 deletions spec/unit/puppetfile_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,25 @@

expect { subject.add_module('puppet/test_module', module_opts) }.to raise_error(R10K::Error, /cannot manage content.*is not within/i).and not_change { subject.modules }
end

it "groups modules by vcs cache location" do
module_opts = { install_path: File.join(subject.basedir, 'vendor') }
opts1 = module_opts.merge(git: 'git@example.com:puppet/test_module.git')
opts2 = module_opts.merge(git: 'git@example.com:puppet/test_module_c.git')
sanitized_name1 = "git@example.com-puppet-test_module.git"
sanitized_name2 = "git@example.com-puppet-test_module_c.git"

subject.add_module('puppet/test_module_a', opts1)
subject.add_module('puppet/test_module_b', opts1)
subject.add_module('puppet/test_module_c', opts2)
subject.add_module('puppet/test_module_d', '1.2.3')

mods_by_cachedir = subject.modules_by_vcs_cachedir

expect(mods_by_cachedir[:none].length).to be 1
expect(mods_by_cachedir[sanitized_name1].length).to be 2
expect(mods_by_cachedir[sanitized_name2].length).to be 1
end
end

describe "#purge_exclusions" do
Expand Down Expand Up @@ -268,7 +287,7 @@ def expect_wrapped_error(orig, pf_path, wrapped_error)
mod2 = spy('module')
expect(mod2).to receive(:accept).with(visitor)

expect(subject).to receive(:modules).and_return([mod1, mod2])
expect(subject).to receive(:modules_by_vcs_cachedir).and_return({none: [mod1, mod2]})
subject.accept(visitor)
end

Expand All @@ -289,12 +308,38 @@ def expect_wrapped_error(orig, pf_path, wrapped_error)
mod2 = spy('module')
expect(mod2).to receive(:accept).with(visitor)

expect(subject).to receive(:modules).and_return([mod1, mod2])
expect(subject).to receive(:modules_by_vcs_cachedir).and_return({none: [mod1, mod2]})

expect(Thread).to receive(:new).exactly(pool_size).and_call_original
expect(Queue).to receive(:new).and_call_original

subject.accept(visitor)
end

it "Creates queues of modules grouped by cachedir" do
visitor = spy('visitor')
expect(visitor).to receive(:visit) do |type, other, &block|
expect(type).to eq :puppetfile
expect(other).to eq subject
block.call
end

mod1 = spy('module1')
mod2 = spy('module2')
mod3 = spy('module3')
mod4 = spy('module4')
mod5 = spy('module5')
mod6 = spy('module6')

expect(subject).to receive(:modules_by_vcs_cachedir)
.and_return({:none => [mod1, mod2],
"foo-cachedir" => [mod3, mod4],
"bar-cachedir" => [mod5, mod6]})

queue = subject.modules_queue(visitor)
expect(queue.length).to be 4
queue_array = 4.times.map { queue.pop }
expect(queue_array).to match_array([[mod1], [mod2], [mod3, mod4], [mod5, mod6]])
end
end
end

0 comments on commit e27a5d1

Please sign in to comment.