diff --git a/logstash-core/lib/logstash/agent.rb b/logstash-core/lib/logstash/agent.rb
index ddb00ec7668..2d4203545c9 100644
--- a/logstash-core/lib/logstash/agent.rb
+++ b/logstash-core/lib/logstash/agent.rb
@@ -8,6 +8,7 @@
 require "logstash/config/source_loader"
 require "logstash/pipeline_action"
 require "logstash/state_resolver"
+require "logstash/pipelines_registry"
 require "stud/trap"
 require "uri"
 require "socket"
@@ -19,7 +20,7 @@ class LogStash::Agent
   include LogStash::Util::Loggable
   STARTED_AT = Time.now.freeze
 
-  attr_reader :metric, :name, :settings, :webserver, :dispatcher, :ephemeral_id, :pipelines, :pipeline_bus
+  attr_reader :metric, :name, :settings, :webserver, :dispatcher, :ephemeral_id, :pipeline_bus
   attr_accessor :logger
 
   # initialize method for LogStash::Agent
@@ -40,7 +41,7 @@ def initialize(settings = LogStash::SETTINGS, source_loader = nil)
     # Special bus object for inter-pipelines communications. Used by the `pipeline` input/output
     @pipeline_bus = org.logstash.plugins.pipeline.PipelineBus.new
 
-    @pipelines = java.util.concurrent.ConcurrentHashMap.new();
+    @pipelines_registry = LogStash::PipelinesRegistry.new
 
     @name = setting("node.name")
     @http_host = setting("http.host")
@@ -118,14 +119,17 @@ def execute
         converge_state_and_update unless stopped?
       end
     else
-      return 1 if clean_state?
+      # exit with error status if the initial converge_state_and_update did not create any pipeline
+      return 1 if @pipelines_registry.empty?
 
       while !Stud.stop?
-        if clean_state? || running_user_defined_pipelines?
-          sleep(0.5)
-        else
-          break
-        end
+        # exit if all pipelines are terminated and none are reloading
+        break if no_pipeline?
+
+        # exit if there are no user defined pipelines (not system pipeline) and none are reloading
+        break if !running_user_defined_pipelines?
+
+        sleep(0.5)
       end
     end
 
@@ -139,11 +143,11 @@ def auto_reload?
   end
 
   def running?
-    @running.value
+    @running.true?
   end
 
   def stopped?
-    !@running.value
+    @running.false?
   end
 
   def converge_state_and_update
@@ -237,43 +241,48 @@ def id_path
     @id_path ||= ::File.join(settings.get("path.data"), "uuid")
   end
 
+  #
+  # Backward compatibility proxies to the PipelineRegistry
+  #
+
   def get_pipeline(pipeline_id)
-    pipelines.get(pipeline_id)
+    @pipelines_registry.get_pipeline(pipeline_id)
   end
 
   def pipelines_count
-    pipelines.size
+    @pipelines_registry.size
   end
 
   def running_pipelines
-    pipelines.select {|id,pipeline| running_pipeline?(id) }
-  end
+    @pipelines_registry.running_pipelines
+   end
 
   def non_running_pipelines
-    pipelines.select {|id,pipeline| !running_pipeline?(id) }
+    @pipelines_registry.non_running_pipelines
   end
 
   def running_pipelines?
-    running_pipelines_count > 0
+    @pipelines_registry.running_pipelines.any?
   end
 
   def running_pipelines_count
-    running_pipelines.size
+    @pipelines_registry.running_pipelines.size
   end
 
   def running_user_defined_pipelines?
-    !running_user_defined_pipelines.empty?
+    @pipelines_registry.running_user_defined_pipelines.any?
   end
 
   def running_user_defined_pipelines
-    pipelines.select {|id, pipeline| running_pipeline?(id) && !pipeline.system? }
+    @pipelines_registry.running_user_defined_pipelines
   end
 
-  def with_running_user_defined_pipelines
-    yield running_user_defined_pipelines
+  def no_pipeline?
+    @pipelines_registry.running_pipelines.empty?
   end
 
   private
+
   def transition_to_stopped
     @running.make_false
   end
@@ -298,7 +307,7 @@ def converge_state(pipeline_actions)
     converge_result = LogStash::ConvergeResult.new(pipeline_actions.size)
 
     pipeline_actions.map do |action|
-      Thread.new do
+      Thread.new(action, converge_result) do |action, converge_result|
         java.lang.Thread.currentThread().setName("Converge #{action}");
         # We execute every task we need to converge the current state of pipelines
         # for every task we will record the action result, that will help us
@@ -314,34 +323,35 @@ def converge_state(pipeline_actions)
         # that we currently have.
         begin
           logger.debug("Executing action", :action => action)
-          action_result = action.execute(self, pipelines)
+          action_result = action.execute(self, @pipelines_registry)
           converge_result.add(action, action_result)
 
           unless action_result.successful?
-            logger.error("Failed to execute action", :id => action.pipeline_id,
-                         :action_type => action_result.class, :message => action_result.message,
-                         :backtrace => action_result.backtrace)
+            logger.error("Failed to execute action",
+              :id => action.pipeline_id,
+              :action_type => action_result.class,
+              :message => action_result.message,
+              :backtrace => action_result.backtrace
+            )
           end
-        rescue SystemExit => e
-          converge_result.add(action, e)
-        rescue Exception => e
+        rescue SystemExit, Exception => e
           logger.error("Failed to execute action", :action => action, :exception => e.class.name, :message => e.message, :backtrace => e.backtrace)
           converge_result.add(action, e)
         end
       end
     end.each(&:join)
 
-    if logger.trace?
-      logger.trace("Converge results", :success => converge_result.success?,
-                   :failed_actions => converge_result.failed_actions.collect { |a, r| "id: #{a.pipeline_id}, action_type: #{a.class}, message: #{r.message}" },
-                   :successful_actions => converge_result.successful_actions.collect { |a, r| "id: #{a.pipeline_id}, action_type: #{a.class}" })
-    end
+    logger.trace? && logger.trace("Converge results",
+      :success => converge_result.success?,
+      :failed_actions => converge_result.failed_actions.collect { |a, r| "id: #{a.pipeline_id}, action_type: #{a.class}, message: #{r.message}" },
+      :successful_actions => converge_result.successful_actions.collect { |a, r| "id: #{a.pipeline_id}, action_type: #{a.class}" }
+    )
 
     converge_result
   end
 
   def resolve_actions(pipeline_configs)
-    @state_resolver.resolve(@pipelines, pipeline_configs)
+    @state_resolver.resolve(@pipelines_registry, pipeline_configs)
   end
 
   def dispatch_events(converge_results)
@@ -399,7 +409,7 @@ def collect_metrics?
   end
 
   def shutdown_pipelines
-    logger.debug("Shutting down all pipelines", :pipelines_count => pipelines_count)
+    logger.debug("Shutting down all pipelines", :pipelines_count => running_pipelines_count)
 
     # In this context I could just call shutdown, but I've decided to
     # use the stop action implementation for that so we have the same code.
@@ -408,16 +418,6 @@ def shutdown_pipelines
     converge_state(pipeline_actions)
   end
 
-  def running_pipeline?(pipeline_id)
-    pipeline = get_pipeline(pipeline_id)
-    return false unless pipeline
-    thread = pipeline.thread
-    thread.is_a?(Thread) && thread.alive?
-  end
-
-  def clean_state?
-    pipelines.empty?
-  end
 
   def setting(key)
     @settings.get(key)
diff --git a/logstash-core/lib/logstash/instrument/periodic_poller/dlq.rb b/logstash-core/lib/logstash/instrument/periodic_poller/dlq.rb
index f40cb5ed837..c2d92e97bd0 100644
--- a/logstash-core/lib/logstash/instrument/periodic_poller/dlq.rb
+++ b/logstash-core/lib/logstash/instrument/periodic_poller/dlq.rb
@@ -10,13 +10,11 @@ def initialize(metric, agent, options = {})
     end
 
     def collect
-      pipelines = @agent.with_running_user_defined_pipelines {|pipelines| pipelines}
-      unless pipelines.nil?
-        pipelines.each {|_, pipeline|
-          unless pipeline.nil?
-            pipeline.collect_dlq_stats
-          end
-        }
+      pipelines = @agent.running_user_defined_pipelines
+      pipelines.each do |_, pipeline|
+        unless pipeline.nil?
+          pipeline.collect_dlq_stats
+        end
       end
     end
   end
diff --git a/logstash-core/lib/logstash/instrument/periodic_poller/pq.rb b/logstash-core/lib/logstash/instrument/periodic_poller/pq.rb
index 068f72a6fc2..0ef82247cf9 100644
--- a/logstash-core/lib/logstash/instrument/periodic_poller/pq.rb
+++ b/logstash-core/lib/logstash/instrument/periodic_poller/pq.rb
@@ -11,14 +11,12 @@ def initialize(metric, queue_type, agent, options = {})
     end
 
     def collect
-      pipelines = @agent.with_running_user_defined_pipelines {|pipelines| pipelines}
-      unless pipelines.nil?
-        pipelines.each {|_, pipeline|
-          unless pipeline.nil?
-            pipeline.collect_stats
-          end
-        }
+      pipelines = @agent.running_user_defined_pipelines
+      pipelines.each do |_, pipeline|
+        unless pipeline.nil?
+          pipeline.collect_stats
+        end
       end
     end
   end
-end; end; end
+end end end
diff --git a/logstash-core/lib/logstash/instrument/periodic_pollers.rb b/logstash-core/lib/logstash/instrument/periodic_pollers.rb
index 3cc4fea72a3..812075b0335 100644
--- a/logstash-core/lib/logstash/instrument/periodic_pollers.rb
+++ b/logstash-core/lib/logstash/instrument/periodic_pollers.rb
@@ -11,12 +11,12 @@ module LogStash module Instrument
   class PeriodicPollers
     attr_reader :metric
 
-    def initialize(metric, queue_type, pipelines)
+    def initialize(metric, queue_type, agent)
       @metric = metric
       @periodic_pollers = [PeriodicPoller::Os.new(metric),
                            PeriodicPoller::JVM.new(metric),
-                           PeriodicPoller::PersistentQueue.new(metric, queue_type, pipelines),
-                           PeriodicPoller::DeadLetterQueue.new(metric, pipelines)]
+                           PeriodicPoller::PersistentQueue.new(metric, queue_type, agent),
+                           PeriodicPoller::DeadLetterQueue.new(metric, agent)]
     end
 
     def start
diff --git a/logstash-core/lib/logstash/java_pipeline.rb b/logstash-core/lib/logstash/java_pipeline.rb
index 8f98ef8c08e..d4b1c6f78e1 100644
--- a/logstash-core/lib/logstash/java_pipeline.rb
+++ b/logstash-core/lib/logstash/java_pipeline.rb
@@ -39,9 +39,23 @@ def initialize(pipeline_config, namespaced_metric = nil, agent = nil)
     @flushRequested = java.util.concurrent.atomic.AtomicBoolean.new(false)
     @shutdownRequested = java.util.concurrent.atomic.AtomicBoolean.new(false)
     @outputs_registered = Concurrent::AtomicBoolean.new(false)
+
+    # @finished_execution signals that the pipeline thread has finished its execution
+    # regardless of any exceptions; it will always be true when the thread completes
     @finished_execution = Concurrent::AtomicBoolean.new(false)
+
+    # @finished_run signals that the run methods called in the pipeline thread was completed
+    # without errors and it will NOT be set if the run method exits from an exception; this
+    # is by design and necessary for the wait_until_started semantic
+    @finished_run = Concurrent::AtomicBoolean.new(false)
+
+    @thread = nil
   end # def initialize
 
+  def finished_execution?
+    @finished_execution.true?
+  end
+
   def ready?
     @ready.value
   end
@@ -83,15 +97,18 @@ def start
     @logger.debug("Starting pipeline", default_logging_keys)
 
     @finished_execution.make_false
+    @finished_run.make_false
 
     @thread = Thread.new do
       begin
         LogStash::Util.set_thread_name("pipeline.#{pipeline_id}")
         run
-        @finished_execution.make_true
+        @finished_run.make_true
       rescue => e
         close
         logger.error("Pipeline aborted due to error", default_logging_keys(:exception => e, :backtrace => e.backtrace))
+      ensure
+        @finished_execution.make_true
       end
     end
 
@@ -106,15 +123,14 @@ def start
 
   def wait_until_started
     while true do
-      # This should be changed with an appropriate FSM
-      # It's an edge case, if we have a pipeline with
-      # a generator { count => 1 } its possible that `Thread#alive?` doesn't return true
-      # because the execution of the thread was successful and complete
-      if @finished_execution.true?
+      if @finished_run.true?
+        # it completed run without exception
         return true
       elsif thread.nil? || !thread.alive?
+        # some exception occurred and the thread is dead
         return false
       elsif running?
+        # fully initialized and running
         return true
       else
         sleep 0.01
diff --git a/logstash-core/lib/logstash/pipeline.rb b/logstash-core/lib/logstash/pipeline.rb
index b545f41dca9..f70c8063cd7 100644
--- a/logstash-core/lib/logstash/pipeline.rb
+++ b/logstash-core/lib/logstash/pipeline.rb
@@ -107,8 +107,23 @@ def initialize(pipeline_config, namespaced_metric = nil, agent = nil)
     @flushing = Concurrent::AtomicReference.new(false)
     @outputs_registered = Concurrent::AtomicBoolean.new(false)
     @worker_shutdown = java.util.concurrent.atomic.AtomicBoolean.new(false)
+
+    # @finished_execution signals that the pipeline thread has finished its execution
+    # regardless of any exceptions; it will always be true when the thread completes
+    @finished_execution = Concurrent::AtomicBoolean.new(false)
+
+    # @finished_run signals that the run methods called in the pipeline thread was completed
+    # without errors and it will NOT be set if the run method exits from an exception; this
+    # is by design and necessary for the wait_until_started semantic
+    @finished_run = Concurrent::AtomicBoolean.new(false)
+
+    @thread = nil
   end # def initialize
 
+  def finished_execution?
+    @finished_execution.true?
+  end
+
   def ready?
     @ready.value
   end
@@ -152,16 +167,19 @@ def start
       "pipeline.batch.size" => settings.get("pipeline.batch.size"),
       "pipeline.batch.delay" => settings.get("pipeline.batch.delay")))
 
-    @finished_execution = Concurrent::AtomicBoolean.new(false)
+    @finished_execution.make_false
+    @finished_run.make_false
 
     @thread = Thread.new do
       begin
         LogStash::Util.set_thread_name("pipeline.#{pipeline_id}")
         run
-        @finished_execution.make_true
+        @finished_run.make_true
       rescue => e
         close
         @logger.error("Pipeline aborted due to error", default_logging_keys(:exception => e, :backtrace => e.backtrace))
+      ensure
+        @finished_execution.make_true
       end
     end
 
@@ -176,15 +194,14 @@ def start
 
   def wait_until_started
     while true do
-      # This should be changed with an appropriate FSM
-      # It's an edge case, if we have a pipeline with
-      # a generator { count => 1 } its possible that `Thread#alive?` doesn't return true
-      # because the execution of the thread was successful and complete
-      if @finished_execution.true?
+      if @finished_run.true?
+        # it completed run without exception
         return true
-      elsif !thread.alive?
+      elsif thread.nil? || !thread.alive?
+        # some exception occured and the thread is dead
         return false
       elsif running?
+        # fully initialized and running
         return true
       else
         sleep 0.01
diff --git a/logstash-core/lib/logstash/pipeline_action/base.rb b/logstash-core/lib/logstash/pipeline_action/base.rb
index 81931da258d..364f20ddd4f 100644
--- a/logstash-core/lib/logstash/pipeline_action/base.rb
+++ b/logstash-core/lib/logstash/pipeline_action/base.rb
@@ -12,7 +12,7 @@ def inspect
     end
     alias_method :to_s, :inspect
 
-    def execute(agent, pipelines)
+    def execute(agent, pipelines_registry)
       raise "`#execute` Not implemented!"
     end
 
diff --git a/logstash-core/lib/logstash/pipeline_action/create.rb b/logstash-core/lib/logstash/pipeline_action/create.rb
index b31059b5296..708a9dd3c8b 100644
--- a/logstash-core/lib/logstash/pipeline_action/create.rb
+++ b/logstash-core/lib/logstash/pipeline_action/create.rb
@@ -3,6 +3,7 @@
 require "logstash/pipeline"
 require "logstash/java_pipeline"
 
+
 module LogStash module PipelineAction
   class Create < Base
     include LogStash::Util::Loggable
@@ -30,7 +31,7 @@ def execution_priority
 
     # The execute assume that the thread safety access of the pipeline
     # is managed by the caller.
-    def execute(agent, pipelines)
+    def execute(agent, pipelines_registry)
       new_pipeline =
         if @pipeline_config.settings.get_value("pipeline.java_execution")
           LogStash::JavaPipeline.new(@pipeline_config, @metric, agent)
@@ -43,21 +44,13 @@ def execute(agent, pipelines)
           end
         end
 
-      result = nil
-      pipelines.compute(pipeline_id) do |_, current_pipeline|
-        if current_pipeline
-          result = LogStash::ConvergeResult::FailedAction.new("Attempted to create a pipeline that already exists")
-          current_pipeline
-        else
-          result = new_pipeline.start # block until the pipeline is correctly started or crashed
-          result ? new_pipeline : nil
-        end
+      success = pipelines_registry.create_pipeline(pipeline_id, new_pipeline) do
+        new_pipeline.start # block until the pipeline is correctly started or crashed
       end
 
-      LogStash::ConvergeResult::ActionResult.create(self, result)
+      LogStash::ConvergeResult::ActionResult.create(self, success)
     end
 
-
     def to_s
       "PipelineAction::Create<#{pipeline_id}>"
     end
diff --git a/logstash-core/lib/logstash/pipeline_action/reload.rb b/logstash-core/lib/logstash/pipeline_action/reload.rb
index 381ab8afade..a24f6ad5363 100644
--- a/logstash-core/lib/logstash/pipeline_action/reload.rb
+++ b/logstash-core/lib/logstash/pipeline_action/reload.rb
@@ -20,8 +20,12 @@ def to_s
       "PipelineAction::Reload<#{pipeline_id}>"
     end
 
-    def execute(agent, pipelines)
-      old_pipeline = pipelines[pipeline_id]
+    def execute(agent, pipelines_registry)
+      old_pipeline = pipelines_registry.get_pipeline(pipeline_id)
+
+      if old_pipeline.nil?
+        return LogStash::ConvergeResult::FailedAction.new("Cannot reload pipeline, because the pipeline does not exist")
+      end
 
       if !old_pipeline.reloadable?
         return LogStash::ConvergeResult::FailedAction.new("Cannot reload pipeline, because the existing pipeline is not reloadable")
@@ -49,13 +53,35 @@ def execute(agent, pipelines)
 
       logger.info("Reloading pipeline", "pipeline.id" => pipeline_id)
 
-      stop_result = Stop.new(pipeline_id).execute(agent, pipelines)
+      success = pipelines_registry.reload_pipeline(pipeline_id) do
+        # important NOT to explicitly return from block here
+        # the block must emit a success boolean value
 
-      if stop_result.successful?
-        Create.new(@pipeline_config, @metric).execute(agent, pipelines)
-      else
-        stop_result
+        # First shutdown old pipeline
+        old_pipeline.shutdown { LogStash::ShutdownWatcher.start(old_pipeline) }
+        old_pipeline.thread.join
+
+        # Then create a new pipeline
+        new_pipeline =
+          if @pipeline_config.settings.get_value("pipeline.java_execution")
+            LogStash::JavaPipeline.new(@pipeline_config, @metric, agent)
+          else
+            agent.exclusive do
+              # The Ruby pipeline initialization is not thread safe because of the module level
+              # shared state in LogsStash::Config::AST. When using multiple pipelines this gets
+              # executed simultaneously in different threads and we need to synchronize this initialization.
+              LogStash::Pipeline.new(@pipeline_config, @metric, agent)
+            end
+          end
+
+        success = new_pipeline.start # block until the pipeline is correctly started or crashed
+
+        # return success and new_pipeline to registry reload_pipeline
+        [success, new_pipeline]
       end
+
+      LogStash::ConvergeResult::ActionResult.create(self, success)
     end
+
   end
 end end
diff --git a/logstash-core/lib/logstash/pipeline_action/stop.rb b/logstash-core/lib/logstash/pipeline_action/stop.rb
index b2c8a5f1555..0bb405629b0 100644
--- a/logstash-core/lib/logstash/pipeline_action/stop.rb
+++ b/logstash-core/lib/logstash/pipeline_action/stop.rb
@@ -9,11 +9,10 @@ def initialize(pipeline_id)
       @pipeline_id = pipeline_id
     end
 
-    def execute(agent, pipelines)
-      pipelines.compute(pipeline_id) do |_, pipeline|
+    def execute(agent, pipelines_registry)
+      pipelines_registry.terminate_pipeline(pipeline_id) do |pipeline|
         pipeline.shutdown { LogStash::ShutdownWatcher.start(pipeline) }
         pipeline.thread.join
-        nil # remove pipeline from pipelines
       end
 
       LogStash::ConvergeResult::SuccessfulAction.new
diff --git a/logstash-core/lib/logstash/pipelines_registry.rb b/logstash-core/lib/logstash/pipelines_registry.rb
new file mode 100644
index 00000000000..15358db925e
--- /dev/null
+++ b/logstash-core/lib/logstash/pipelines_registry.rb
@@ -0,0 +1,166 @@
+# encoding: utf-8
+
+module LogStash
+  class PipelineState
+    attr_reader :pipeline_id, :pipeline
+
+    def initialize(pipeline_id, pipeline)
+      @pipeline_id = pipeline_id
+      @pipeline = pipeline
+      @reloading = Concurrent::AtomicBoolean.new(false)
+    end
+
+    def terminated?
+      # a reloading pipeline is never considered terminated
+      @reloading.false? && @pipeline.finished_execution?
+    end
+
+    def set_reloading(is_reloading)
+      @reloading.value = is_reloading
+    end
+
+    def set_pipeline(pipeline)
+      raise(ArgumentError, "invalid nil pipeline") if pipeline.nil?
+      @pipeline = pipeline
+    end
+  end
+
+  class PipelinesRegistry
+    attr_reader :states
+    include LogStash::Util::Loggable
+
+    def initialize
+      # we leverage the semantic of the Java ConcurrentHashMap for the
+      # compute() method which is atomic; calling compute() concurrently
+      # will block until the other compute finishes so no mutex is necessary
+      # for synchronizing compute calls
+      @states = java.util.concurrent.ConcurrentHashMap.new
+    end
+
+    # Execute the passed creation logic block and create a new state upon success
+    # @param pipeline_id [String, Symbol] the pipeline id
+    # @param pipeline [Pipeline] the new pipeline to create
+    # @param create_block [Block] the creation execution logic
+    #
+    # @yieldreturn [Boolean] the new pipeline creation success
+    #
+    # @return [Boolean] new pipeline creation success
+    def create_pipeline(pipeline_id, pipeline, &create_block)
+      success = false
+
+      @states.compute(pipeline_id) do |_, state|
+        if state
+          if state.terminated?
+            success = yield
+            state.set_pipeline(pipeline)
+          else
+            logger.error("Attempted to create a pipeline that already exists", :pipeline_id => pipeline_id)
+          end
+          state
+        else
+          success = yield
+          success ? PipelineState.new(pipeline_id, pipeline) : nil
+        end
+      end
+
+      success
+    end
+
+    # Execute the passed termination logic block
+    # @param pipeline_id [String, Symbol] the pipeline id
+    # @param stop_block [Block] the termination execution logic
+    #
+    # @yieldparam [Pipeline] the pipeline to terminate
+    def terminate_pipeline(pipeline_id, &stop_block)
+      @states.compute(pipeline_id) do |_, state|
+        if state.nil?
+          logger.error("Attempted to terminate a pipeline that does not exists", :pipeline_id => pipeline_id)
+          nil
+        else
+          yield(state.pipeline)
+          state
+        end
+      end
+    end
+
+    # Execute the passed reloading logic block in the context of the reloading state and set new pipeline in state
+    # @param pipeline_id [String, Symbol] the pipeline id
+    # @param reload_block [Block] the reloading execution logic
+    #
+    # @yieldreturn [Array<Boolean, Pipeline>] the new pipeline creation success and new pipeline object
+    #
+    # @return [Boolean] new pipeline creation success
+    def reload_pipeline(pipeline_id, &reload_block)
+      success = false
+
+      @states.compute(pipeline_id) do |_, state|
+        if state.nil?
+          logger.error("Attempted to reload a pipeline that does not exists", :pipeline_id => pipeline_id)
+          nil
+        else
+          state.set_reloading(true)
+          begin
+            success, new_pipeline = yield
+            state.set_pipeline(new_pipeline)
+          ensure
+            state.set_reloading(false)
+          end
+          state
+        end
+      end
+
+      success
+    end
+
+    # @param pipeline_id [String, Symbol] the pipeline id
+    # @return [Pipeline] the pipeline object or nil if none for pipeline_id
+    def get_pipeline(pipeline_id)
+      state = @states.get(pipeline_id)
+      state.nil? ? nil : state.pipeline
+    end
+
+    # @return [Fixnum] number of items in the states collection
+    def size
+      @states.size
+    end
+
+    # @return [Boolean] true if the states collection is empty.
+    def empty?
+      @states.isEmpty
+    end
+
+    # @return [Hash{String=>Pipeline}]
+    def running_pipelines
+      select_pipelines { |state| !state.terminated? }
+    end
+
+    # @return [Hash{String=>Pipeline}]
+    def non_running_pipelines
+      select_pipelines { |state| state.terminated? }
+    end
+
+    # @return [Hash{String=>Pipeline}]
+    def running_user_defined_pipelines
+      select_pipelines { |state | !state.terminated? && !state.pipeline.system? }
+    end
+
+    private
+
+    # Returns a mapping of pipelines by their ids.
+    # Pipelines can optionally be filtered by their `PipelineState` by passing
+    # a block that returns truthy when a pipeline should be included in the
+    # result.
+    #
+    # @yieldparam [PipelineState]
+    # @yieldreturn [Boolean]
+    #
+    # @return [Hash{String=>Pipeline}]
+    def select_pipelines(&optional_state_filter)
+      @states.each_with_object({}) do |(id, state), memo|
+        if state && (!block_given? || yield(state))
+          memo[id] = state.pipeline
+        end
+      end
+    end
+  end
+end
diff --git a/logstash-core/lib/logstash/state_resolver.rb b/logstash-core/lib/logstash/state_resolver.rb
index de4c5243620..ec32d23bae3 100644
--- a/logstash-core/lib/logstash/state_resolver.rb
+++ b/logstash-core/lib/logstash/state_resolver.rb
@@ -10,11 +10,11 @@ def initialize(metric)
       @metric = metric
     end
 
-    def resolve(pipelines, pipeline_configs)
+    def resolve(pipelines_registry, pipeline_configs)
       actions = []
 
       pipeline_configs.each do |pipeline_config|
-        pipeline = pipelines[pipeline_config.pipeline_id]
+        pipeline = pipelines_registry.get_pipeline(pipeline_config.pipeline_id)
 
         if pipeline.nil?
           actions << LogStash::PipelineAction::Create.new(pipeline_config, @metric)
@@ -25,12 +25,12 @@ def resolve(pipelines, pipeline_configs)
         end
       end
 
-      running_pipelines = pipeline_configs.collect(&:pipeline_id)
+      configured_pipelines = pipeline_configs.collect(&:pipeline_id)
 
       # If one of the running pipeline is not in the pipeline_configs, we assume that we need to
       # stop it.
-      pipelines.keys
-        .select { |pipeline_id| !running_pipelines.include?(pipeline_id) }
+      pipelines_registry.running_pipelines.keys
+        .select { |pipeline_id| !configured_pipelines.include?(pipeline_id) }
         .each { |pipeline_id| actions << LogStash::PipelineAction::Stop.new(pipeline_id) }
 
       actions.sort # See logstash/pipeline_action.rb
diff --git a/logstash-core/spec/logstash/agent/converge_spec.rb b/logstash-core/spec/logstash/agent/converge_spec.rb
index c1b6b56ffba..0e0f8941f17 100644
--- a/logstash-core/spec/logstash/agent/converge_spec.rb
+++ b/logstash-core/spec/logstash/agent/converge_spec.rb
@@ -49,7 +49,7 @@
 
     context "system pipeline" do
       
-      let(:system_pipeline_config) { mock_pipeline_config(:system_pipeline, "input { generator { } } output { null {} }", { "pipeline.system" => true }) }
+      let(:system_pipeline_config) { mock_pipeline_config(:system_pipeline, "input { dummyblockinginput { } } output { null {} }", { "pipeline.system" => true }) }
 
       context "when we have a finite pipeline and a system pipeline running" do
 
@@ -65,40 +65,40 @@
       end
 
       context "when we have an infinite pipeline and a system pipeline running" do
-        let(:infinite_pipeline_config) { mock_pipeline_config(:main, "input { generator { } } output { null {} }") }
+        let(:infinite_pipeline_config) { mock_pipeline_config(:main, "input { dummyblockinginput { } } output { null {} }") }
 
         let(:source_loader) do
           TestSourceLoader.new(infinite_pipeline_config, system_pipeline_config)
         end
 
         before(:each) do
-            @agent_task = start_agent(subject)
+          @agent_task = start_agent(subject)
         end
 
         after(:each) do
-            @agent_task.stop!
+          @agent_task.stop!
+          @agent_task.wait
+          subject.shutdown
         end
 
         describe "#running_user_defined_pipelines" do
           it "returns the user defined pipelines" do
-            wait_for do
-              subject.with_running_user_defined_pipelines {|pipelines| pipelines.keys }
-            end.to eq([:main])
-          end
+            # wait is necessary to accommodate for pipelines startup time
+            wait(60).for {subject.running_user_defined_pipelines.keys}.to eq([:main])
+           end
         end
 
         describe "#running_user_defined_pipelines?" do
           it "returns true" do
-            wait_for do
-              subject.running_user_defined_pipelines?
-            end.to be_truthy
+            # wait is necessary to accommodate for pipelines startup time
+            wait(60).for {subject.running_user_defined_pipelines?}.to be_truthy
           end
         end
       end
     end
 
     context "when `config.reload.automatic`" do
-      let(:pipeline_config) { mock_pipeline_config(:main, "input { generator {} } output { null {} }") }
+      let(:pipeline_config) { mock_pipeline_config(:main, "input { dummyblockinginput {} } output { null {} }") }
 
       let(:source_loader) do
         TestSourceLoader.new(pipeline_config)
@@ -114,14 +114,14 @@
 
           after(:each) do
             @agent_task.stop!
+            @agent_task.wait
+            subject.shutdown
           end
 
           it "converge only once" do
             wait(60).for { source_loader.fetch_count }.to eq(1)
-
+            # no need to wait here because have_running_pipeline? does the wait
             expect(subject).to have_running_pipeline?(pipeline_config)
-
-            subject.shutdown
           end
         end
 
@@ -135,8 +135,6 @@
 
             expect(source_loader.fetch_count).to eq(1)
             expect(subject.pipelines_count).to eq(0)
-
-            subject.shutdown
           end
         end
       end
@@ -149,26 +147,25 @@
             "config.reload.interval" =>  interval
           )
         end
+
         before(:each) do
           @agent_task = start_agent(subject)
         end
 
         after(:each) do
           @agent_task.stop!
+          @agent_task.wait
+          subject.shutdown
         end
 
         context "and successfully load the config" do
           it "converges periodically the pipelines from the configs source" do
-            sleep(2) # let the interval reload a few times
+            # no need to wait here because have_running_pipeline? does the wait
             expect(subject).to have_running_pipeline?(pipeline_config)
 
             # we rely on a periodic thread to call fetch count, we have seen unreliable run on
             # travis, so lets add a few retries
-            try do
-              expect(source_loader.fetch_count).to be > 1
-            end
-
-            subject.shutdown
+            try { expect(source_loader.fetch_count).to be > 1 }
           end
         end
 
@@ -178,12 +175,9 @@
           end
 
           it "it will keep trying to converge" do
-
             sleep(agent_settings.get("config.reload.interval") / 1_000_000_000.0 * 20) # let the interval reload a few times
             expect(subject.pipelines_count).to eq(0)
             expect(source_loader.fetch_count).to be > 1
-
-            subject.shutdown
           end
         end
       end
@@ -191,8 +185,8 @@
   end
 
   context "when shutting down the agent" do
-    let(:pipeline_config) { mock_pipeline_config(:main, "input { generator {} } output { null {} }") }
-    let(:new_pipeline_config) { mock_pipeline_config(:new, "input { generator { id => 'new' } } output { null {} }") }
+    let(:pipeline_config) { mock_pipeline_config(:main, "input { dummyblockinginput {} } output { null {} }") }
+    let(:new_pipeline_config) { mock_pipeline_config(:new, "input { dummyblockinginput { id => 'new' } } output { null {} }") }
 
     let(:source_loader) do
       TestSourceLoader.new([pipeline_config, new_pipeline_config])
@@ -205,8 +199,8 @@
   end
 
   context "Configuration converge scenario" do
-    let(:pipeline_config) { mock_pipeline_config(:main, "input { generator {} } output { null {} }", { "pipeline.reloadable" => true }) }
-    let(:new_pipeline_config) { mock_pipeline_config(:new, "input { generator {} } output { null {} }", { "pipeline.reloadable" => true }) }
+    let(:pipeline_config) { mock_pipeline_config(:main, "input { dummyblockinginput {} } output { null {} }", { "pipeline.reloadable" => true }) }
+    let(:new_pipeline_config) { mock_pipeline_config(:new, "input { dummyblockinginput {} } output { null {} }", { "pipeline.reloadable" => true }) }
 
     before do
       # Set the Agent to an initial state of pipelines
@@ -263,7 +257,7 @@
     end
 
     context "when the source return a modified pipeline" do
-      let(:modified_pipeline_config) { mock_pipeline_config(:main, "input { generator { id => 'new-and-modified' } } output { null {} }", { "pipeline.reloadable" => true }) }
+      let(:modified_pipeline_config) { mock_pipeline_config(:main, "input { dummyblockinginput { id => 'new-and-modified' } } output { null {} }", { "pipeline.reloadable" => true }) }
 
       let(:source_loader) do
         TestSequenceSourceLoader.new(
diff --git a/logstash-core/spec/logstash/agent_spec.rb b/logstash-core/spec/logstash/agent_spec.rb
index 907851240e7..6d1889f6d19 100644
--- a/logstash-core/spec/logstash/agent_spec.rb
+++ b/logstash-core/spec/logstash/agent_spec.rb
@@ -118,7 +118,7 @@
         context "if state is clean" do
           before :each do
             allow(subject).to receive(:running_user_defined_pipelines?).and_return(true)
-            allow(subject).to receive(:clean_state?).and_return(false)
+            allow(subject).to receive(:no_pipeline?).and_return(false)
           end
 
           it "should not converge state more than once" do
@@ -141,7 +141,7 @@
             it "does not upgrade the new config" do
               t = Thread.new { subject.execute }
               wait(timeout)
-                  .for { subject.running_pipelines? && subject.pipelines.values.first.ready? }
+                  .for { subject.running_pipelines? && subject.running_pipelines.values.first.ready? }
                   .to eq(true)
               expect(subject.converge_state_and_update).not_to be_a_successful_converge
               expect(subject).to have_running_pipeline?(mock_config_pipeline)
@@ -161,7 +161,7 @@
             it "does upgrade the new config" do
               t = Thread.new { subject.execute }
               Timeout.timeout(timeout) do
-                sleep(0.1) until subject.pipelines_count > 0 && subject.pipelines.values.first.ready?
+                sleep(0.1) until subject.running_pipelines_count > 0 && subject.running_pipelines.values.first.ready?
               end
 
               expect(subject.converge_state_and_update).to be_a_successful_converge
@@ -185,7 +185,7 @@
             it "does not try to reload the pipeline" do
               t = Thread.new { subject.execute }
               Timeout.timeout(timeout) do
-                sleep(0.1) until subject.running_pipelines? && subject.pipelines.values.first.running?
+                sleep(0.1) until subject.running_pipelines? && subject.running_pipelines.values.first.running?
               end
               expect(subject.converge_state_and_update).not_to be_a_successful_converge
               expect(subject).to have_running_pipeline?(mock_config_pipeline)
@@ -205,7 +205,7 @@
             it "tries to reload the pipeline" do
               t = Thread.new { subject.execute }
               Timeout.timeout(timeout) do
-                sleep(0.1) until subject.running_pipelines? && subject.pipelines.values.first.running?
+                sleep(0.1) until subject.running_pipelines? && subject.running_pipelines.values.first.running?
               end
 
               expect(subject.converge_state_and_update).to be_a_successful_converge
diff --git a/logstash-core/spec/logstash/pipeline_action/create_spec.rb b/logstash-core/spec/logstash/pipeline_action/create_spec.rb
index 33eba97a702..147900fcba3 100644
--- a/logstash-core/spec/logstash/pipeline_action/create_spec.rb
+++ b/logstash-core/spec/logstash/pipeline_action/create_spec.rb
@@ -2,13 +2,14 @@
 require "spec_helper"
 require_relative "../../support/helpers"
 require_relative "../../support/matchers"
+require "logstash/pipelines_registry"
 require "logstash/pipeline_action/create"
 require "logstash/inputs/generator"
 
 describe LogStash::PipelineAction::Create do
   let(:metric) { LogStash::Instrument::NullMetric.new(LogStash::Instrument::Collector.new) }
-  let(:pipeline_config) { mock_pipeline_config(:main, "input { generator { id => '123' } } output { null {} }") }
-  let(:pipelines) { java.util.concurrent.ConcurrentHashMap.new }
+  let(:pipeline_config) { mock_pipeline_config(:main, "input { dummyblockinginput { id => '123' } } output { null {} }") }
+  let(:pipelines) { LogStash::PipelinesRegistry.new }
   let(:agent) { double("agent") }
 
   before do
@@ -18,7 +19,7 @@
   subject { described_class.new(pipeline_config, metric) }
 
   after do
-    pipelines.each do |_, pipeline|
+    pipelines.running_pipelines do |_, pipeline|
       pipeline.shutdown
       pipeline.thread.join
     end
@@ -47,7 +48,7 @@
     it "starts the pipeline" do
       allow(agent).to receive(:exclusive) { |&arg| arg.call }
       subject.execute(agent, pipelines)
-      expect(pipelines[:main].running?).to be_truthy
+      expect(pipelines.get_pipeline(:main).running?).to be_truthy
     end
 
     it "returns a successful execution status" do
@@ -58,7 +59,7 @@
 
   context  "when the pipeline doesn't start" do
     context "with a syntax error" do
-      let(:pipeline_config) { mock_pipeline_config(:main, "input { generator { id => '123' } } output { stdout ") } # bad syntax
+      let(:pipeline_config) { mock_pipeline_config(:main, "input { dummyblockinginput { id => '123' } } output { stdout ") } # bad syntax
 
       it "raises the exception upstream" do
         expect { subject.execute(agent, pipelines) }.to raise_error
@@ -66,7 +67,7 @@
     end
 
     context "with an error raised during `#register`" do
-      let(:pipeline_config) { mock_pipeline_config(:main, "input { generator { id => '123' } } filter { ruby { init => '1/0' code => '1+2' } } output { null {} }") }
+      let(:pipeline_config) { mock_pipeline_config(:main, "input { dummyblockinginput { id => '123' } } filter { ruby { init => '1/0' code => '1+2' } } output { null {} }") }
 
       it "returns false" do
         allow(agent).to receive(:exclusive) { |&arg| arg.call }
@@ -76,8 +77,8 @@
   end
 
   context "when sorting create action" do
-    let(:pipeline_config) { mock_pipeline_config(:main, "input { generator { id => '123' } } output { null {} }") }
-    let(:system_pipeline_config) { mock_pipeline_config(:main_2, "input { generator { id => '123' } } output { null {} }", { "pipeline.system" => true }) }
+    let(:pipeline_config) { mock_pipeline_config(:main, "input { dummyblockinginput { id => '123' } } output { null {} }") }
+    let(:system_pipeline_config) { mock_pipeline_config(:main_2, "input { dummyblockinginput { id => '123' } } output { null {} }", { "pipeline.system" => true }) }
 
     it "should give higher priority to system pipeline" do
       action_user_pipeline = described_class.new(pipeline_config, metric)
diff --git a/logstash-core/spec/logstash/pipeline_action/reload_spec.rb b/logstash-core/spec/logstash/pipeline_action/reload_spec.rb
index 23b55796756..041f3951856 100644
--- a/logstash-core/spec/logstash/pipeline_action/reload_spec.rb
+++ b/logstash-core/spec/logstash/pipeline_action/reload_spec.rb
@@ -2,15 +2,16 @@
 require "spec_helper"
 require_relative "../../support/helpers"
 require_relative "../../support/matchers"
+require "logstash/pipelines_registry"
 require "logstash/pipeline_action/reload"
 
 describe LogStash::PipelineAction::Reload do
   let(:metric) { LogStash::Instrument::NullMetric.new(LogStash::Instrument::Collector.new) }
   let(:pipeline_id) { :main }
-  let(:new_pipeline_config) { mock_pipeline_config(pipeline_id, "input { generator { id => 'new' } } output { null {} }", { "pipeline.reloadable" => true}) }
-  let(:pipeline_config) { "input { generator {} } output { null {} }" }
+  let(:new_pipeline_config) { mock_pipeline_config(pipeline_id, "input { dummyblockinginput { id => 'new' } } output { null {} }", { "pipeline.reloadable" => true}) }
+  let(:pipeline_config) { "input { dummyblockinginput {} } output { null {} }" }
   let(:pipeline) { mock_pipeline_from_string(pipeline_config, mock_settings("pipeline.reloadable" => true)) }
-  let(:pipelines) { chm = java.util.concurrent.ConcurrentHashMap.new; chm[pipeline_id] = pipeline; chm }
+  let(:pipelines) { r = LogStash::PipelinesRegistry.new; r.create_pipeline(pipeline_id, pipeline) { true }; r }
   let(:agent) { double("agent") }
 
   subject { described_class.new(new_pipeline_config, metric) }
@@ -21,7 +22,7 @@
   end
 
   after do
-    pipelines.each do |_, pipeline|
+    pipelines.running_pipelines do |_, pipeline|
       pipeline.shutdown
       pipeline.thread.join
     end
@@ -40,13 +41,13 @@
     it "start the new pipeline" do
       allow(agent).to receive(:exclusive) { |&arg| arg.call }
       subject.execute(agent, pipelines)
-      expect(pipelines[pipeline_id].running?).to be_truthy
+      expect(pipelines.get_pipeline(pipeline_id).running?).to be_truthy
     end
 
     it "run the new pipeline code" do
       allow(agent).to receive(:exclusive) { |&arg| arg.call }
       subject.execute(agent, pipelines)
-      expect(pipelines[pipeline_id].config_hash).to eq(new_pipeline_config.config_hash)
+      expect(pipelines.get_pipeline(pipeline_id).config_hash).to eq(new_pipeline_config.config_hash)
     end
   end
 
@@ -61,7 +62,7 @@
   end
 
   context "when the new pipeline is not reloadable" do
-    let(:new_pipeline_config) { mock_pipeline_config(pipeline_id, "input { generator { id => 'new' } } output { null {} }", { "pipeline.reloadable" => false}) }
+    let(:new_pipeline_config) { mock_pipeline_config(pipeline_id, "input { dummyblockinginput { id => 'new' } } output { null {} }", { "pipeline.reloadable" => false}) }
 
     it "cannot successfully execute the action" do
       allow(agent).to receive(:exclusive) { |&arg| arg.call }
@@ -70,7 +71,7 @@
   end
 
   context "when the new pipeline has syntax errors" do
-    let(:new_pipeline_config) { mock_pipeline_config(pipeline_id, "input generator { id => 'new' } } output { null {} }", { "pipeline.reloadable" => false}) }
+    let(:new_pipeline_config) { mock_pipeline_config(pipeline_id, "input dummyblockinginput { id => 'new' } } output { null {} }", { "pipeline.reloadable" => false}) }
 
     it "cannot successfully execute the action" do
       allow(agent).to receive(:exclusive) { |&arg| arg.call }
@@ -80,7 +81,7 @@
 
   context "when there is an error in the register" do
     before do
-      allow_any_instance_of(LogStash::Inputs::Generator).to receive(:register).and_raise("Bad value")
+      allow_any_instance_of(LogStash::Inputs::DummyBlockingInput).to receive(:register).and_raise("Bad value")
     end
 
     it "cannot successfully execute the action" do
diff --git a/logstash-core/spec/logstash/pipeline_action/stop_spec.rb b/logstash-core/spec/logstash/pipeline_action/stop_spec.rb
index 57963286671..75648c50bf1 100644
--- a/logstash-core/spec/logstash/pipeline_action/stop_spec.rb
+++ b/logstash-core/spec/logstash/pipeline_action/stop_spec.rb
@@ -1,14 +1,15 @@
 # encoding: utf-8
 require "spec_helper"
 require_relative "../../support/helpers"
+require "logstash/pipelines_registry"
 require "logstash/pipeline_action/stop"
 require "logstash/pipeline"
 
 describe LogStash::PipelineAction::Stop do
-  let(:pipeline_config) { "input { generator {} } output { null {} }" }
+  let(:pipeline_config) { "input { dummyblockinginput {} } output { null {} }" }
   let(:pipeline_id) { :main }
   let(:pipeline) { mock_pipeline_from_string(pipeline_config) }
-  let(:pipelines) { chm = java.util.concurrent.ConcurrentHashMap.new; chm[:main] = pipeline; chm }
+  let(:pipelines) { chm = LogStash::PipelinesRegistry.new; chm.create_pipeline(pipeline_id, pipeline) { true }; chm }
   let(:agent) { double("agent") }
 
   subject { described_class.new(pipeline_id) }
@@ -31,6 +32,6 @@
   end
 
   it "removes the pipeline from the running pipelines" do
-    expect { subject.execute(agent, pipelines) }.to change { pipelines.include?(pipeline_id) }.from(true).to(false)
+    expect { subject.execute(agent, pipelines) }.to change { pipelines.running_pipelines.keys }.from([:main]).to([])
   end
 end
diff --git a/logstash-core/spec/logstash/pipelines_registry_spec.rb b/logstash-core/spec/logstash/pipelines_registry_spec.rb
new file mode 100644
index 00000000000..eca53483b9e
--- /dev/null
+++ b/logstash-core/spec/logstash/pipelines_registry_spec.rb
@@ -0,0 +1,220 @@
+# encoding: utf-8
+require "spec_helper"
+require "logstash/pipelines_registry"
+
+describe LogStash::PipelinesRegistry do
+
+  let(:pipeline_id) { "test" }
+  let(:pipeline) { double("Pipeline") }
+  let (:logger) { double("Logger") }
+
+  context "at object creation" do
+    it "should be empty" do
+      expect(subject.size).to eq(0)
+      expect(subject.empty?).to be_truthy
+      expect(subject.running_pipelines).to be_empty
+      expect(subject.non_running_pipelines).to be_empty
+      expect(subject.running_user_defined_pipelines).to be_empty
+    end
+  end
+
+  context "creating a pipeline" do
+    context "without existing same pipeline id" do
+      it "registry should not have a state for pipeline_id" do
+        expect(subject.get_pipeline(pipeline_id)).to be_nil
+      end
+
+      it "should return block return value" do
+        expect(subject.create_pipeline(pipeline_id, pipeline) { "dummy" }).to eq("dummy")
+      end
+
+      it "should register the new pipeline upon successful create block" do
+        subject.create_pipeline(pipeline_id, pipeline) { true }
+        expect(subject.get_pipeline(pipeline_id)).to eq(pipeline)
+      end
+
+      it "should not register the new pipeline upon unsuccessful create block" do
+        subject.create_pipeline(pipeline_id, pipeline) { false }
+        expect(subject.get_pipeline(pipeline_id)).to be_nil
+      end
+    end
+
+    context "with existing pipeline id" do
+      before :each do
+        subject.create_pipeline(pipeline_id, pipeline) { true }
+      end
+
+      it "registry should have a state for pipeline_id" do
+        expect(subject.get_pipeline(pipeline_id)).to eq(pipeline)
+      end
+
+      context "when existing pipeline is not terminated" do
+        before :each do
+          expect(pipeline).to receive(:finished_execution?).and_return(false)
+        end
+
+        it "should return false" do
+          expect(subject.create_pipeline(pipeline_id, pipeline) { "dummy" }).to be_falsey
+        end
+
+        it "should not call block and log error if pipeline is not terminated" do
+          expect(LogStash::PipelinesRegistry).to receive(:logger).and_return(logger)
+          expect(logger).to receive(:error)
+          expect { |b| subject.create_pipeline(pipeline_id, pipeline, &b) }.not_to yield_control
+        end
+      end
+
+      context "when existing pipeline is terminated" do
+        let (:new_pipeline) { double("New Pipeline") }
+
+        before :each do
+          expect(pipeline).to receive(:finished_execution?).and_return(true)
+        end
+
+        it "should return block value" do
+          expect(subject.create_pipeline(pipeline_id, new_pipeline) { "dummy" }).to eq("dummy")
+        end
+
+        it "should return block value" do
+          expect(subject.create_pipeline(pipeline_id, new_pipeline) { "dummy" }).to eq("dummy")
+        end
+
+        it "should register new pipeline" do
+          subject.create_pipeline(pipeline_id, new_pipeline) { true }
+          expect(subject.get_pipeline(pipeline_id)).to eq(new_pipeline)
+        end
+      end
+    end
+  end
+
+  context "terminating a pipeline" do
+    context "without existing pipeline id" do
+      it "should log error" do
+        expect(LogStash::PipelinesRegistry).to receive(:logger).and_return(logger)
+        expect(logger).to receive(:error)
+        subject.terminate_pipeline(pipeline_id) { "dummy" }
+      end
+
+      it "should not yield to block" do
+        expect { |b| subject.terminate_pipeline(pipeline_id, &b) }.not_to yield_control
+      end
+    end
+
+    context "with existing pipeline id" do
+      before :each do
+        subject.create_pipeline(pipeline_id, pipeline) { true }
+      end
+
+      it "should yield to block" do
+        expect { |b| subject.terminate_pipeline(pipeline_id, &b) }.to yield_control
+      end
+
+      it "should keep pipeline id" do
+        subject.terminate_pipeline(pipeline_id) { "dummy" }
+        expect(subject.get_pipeline(pipeline_id)).to eq(pipeline)
+      end
+    end
+  end
+
+  context "reloading a pipeline" do
+    it "should log error with inexistent pipeline id" do
+      expect(LogStash::PipelinesRegistry).to receive(:logger).and_return(logger)
+      expect(logger).to receive(:error)
+      subject.reload_pipeline(pipeline_id) { }
+    end
+
+    context "with existing pipeline id" do
+      before :each do
+        subject.create_pipeline(pipeline_id, pipeline) { true }
+      end
+
+      it "should return block value" do
+        expect(subject.reload_pipeline(pipeline_id) { ["dummy", pipeline] }).to eq("dummy")
+      end
+
+      it "should not be terminated while reloading" do
+        expect(pipeline).to receive(:finished_execution?).and_return(false, true, true)
+
+        # 1st call: finished_execution? is false
+        expect(subject.running_pipelines).not_to be_empty
+
+        # 2nd call: finished_execution? is true
+        expect(subject.running_pipelines).to be_empty
+
+
+        queue = Queue.new # threadsafe queue
+        in_block = Concurrent::AtomicBoolean.new(false)
+
+        thread = Thread.new(subject, pipeline_id, pipeline, queue, in_block) do |subject, pipeline_id, pipeline, queue, in_block|
+          subject.reload_pipeline(pipeline_id) do
+            in_block.make_true
+            queue.pop
+            [true, pipeline]
+          end
+        end
+
+        # make sure we entered the block executioin
+        wait(10).for {in_block.true?}.to be_truthy
+
+        # at this point the thread is suspended waiting on queue
+
+        # since in reloading state, running_pipelines is not empty
+        expect(subject.running_pipelines).not_to be_empty
+
+        # unblock thread
+        queue.push(:dummy)
+        thread.join
+
+        # 3rd call: finished_execution? is true
+        expect(subject.running_pipelines).to be_empty
+      end
+    end
+  end
+
+  context "pipelines collections" do
+    context "with a non terminated pipelines" do
+      before :each do
+        subject.create_pipeline(pipeline_id, pipeline) { true }
+        expect(pipeline).to receive(:finished_execution?).and_return(false)
+      end
+
+      it "should find running pipelines" do
+        expect(subject.running_pipelines).not_to be_empty
+      end
+
+      it "should not find non_running pipelines" do
+        expect(subject.non_running_pipelines).to be_empty
+      end
+
+      it "should find running_user_defined_pipelines" do
+        expect(pipeline).to receive(:system?).and_return(false)
+        expect(subject.running_user_defined_pipelines).not_to be_empty
+      end
+
+      it "should not find running_user_defined_pipelines" do
+        expect(pipeline).to receive(:system?).and_return(true)
+        expect(subject.running_user_defined_pipelines).to be_empty
+      end
+    end
+
+    context "with a terminated pipelines" do
+      before :each do
+        subject.create_pipeline(pipeline_id, pipeline) { true }
+        expect(pipeline).to receive(:finished_execution?).and_return(true)
+      end
+
+      it "should not find running pipelines" do
+        expect(subject.running_pipelines).to be_empty
+      end
+
+      it "should find non_running pipelines" do
+        expect(subject.non_running_pipelines).not_to be_empty
+      end
+
+      it "should not find running_user_defined_pipelines" do
+        expect(subject.running_user_defined_pipelines).to be_empty
+      end
+    end
+
+  end
+end
diff --git a/logstash-core/spec/logstash/state_resolver_spec.rb b/logstash-core/spec/logstash/state_resolver_spec.rb
index 430b7b40b1a..28abd899952 100644
--- a/logstash-core/spec/logstash/state_resolver_spec.rb
+++ b/logstash-core/spec/logstash/state_resolver_spec.rb
@@ -18,17 +18,17 @@
 
   after do
     # ensure that the the created pipeline are closed
-    running_pipelines.each { |_, pipeline| pipeline.close }
+    pipelines.running_pipelines.each { |_, pipeline| pipeline.close }
   end
 
   context "when no pipeline is running" do
-    let(:running_pipelines) { {} }
+    let(:pipelines) {  LogStash::PipelinesRegistry.new }
 
     context "no pipeline configs is received" do
       let(:pipeline_configs) { [] }
 
       it "returns no action" do
-        expect(subject.resolve(running_pipelines, pipeline_configs).size).to eq(0)
+        expect(subject.resolve(pipelines, pipeline_configs).size).to eq(0)
       end
     end
 
@@ -36,7 +36,7 @@
       let(:pipeline_configs) { [mock_pipeline_config(:hello_world)] }
 
       it "returns some actions" do
-        expect(subject.resolve(running_pipelines, pipeline_configs)).to have_actions(
+        expect(subject.resolve(pipelines, pipeline_configs)).to have_actions(
           [:create, :hello_world],
         )
       end
@@ -47,13 +47,17 @@
     context "when a pipeline is running" do
       let(:main_pipeline) { mock_pipeline(:main) }
       let(:main_pipeline_config) { main_pipeline.pipeline_config }
-      let(:running_pipelines) { { :main => main_pipeline } }
+      let(:pipelines) do
+        r =  LogStash::PipelinesRegistry.new
+        r.create_pipeline(:main, main_pipeline) { true }
+        r
+      end
 
       context "when the pipeline config contains a new one and the existing" do
         let(:pipeline_configs) { [mock_pipeline_config(:hello_world), main_pipeline_config ] }
 
         it "creates the new one and keep the other one" do
-          expect(subject.resolve(running_pipelines, pipeline_configs)).to have_actions(
+          expect(subject.resolve(pipelines, pipeline_configs)).to have_actions(
             [:create, :hello_world],
           )
         end
@@ -62,7 +66,7 @@
           let(:pipeline_configs) { [mock_pipeline_config(:hello_world)] }
 
           it "creates the new one and stop the old one one" do
-            expect(subject.resolve(running_pipelines, pipeline_configs)).to have_actions(
+            expect(subject.resolve(pipelines, pipeline_configs)).to have_actions(
               [:create, :hello_world],
               [:stop, :main]
             )
@@ -73,7 +77,7 @@
           let(:pipeline_configs) { [] }
 
           it "stops the old one one" do
-            expect(subject.resolve(running_pipelines, pipeline_configs)).to have_actions(
+            expect(subject.resolve(pipelines, pipeline_configs)).to have_actions(
               [:stop, :main]
             )
           end
@@ -83,7 +87,7 @@
           let(:pipeline_configs) { [mock_pipeline_config(:main, "input { generator {}}")] }
 
           it "reloads the old one one" do
-            expect(subject.resolve(running_pipelines, pipeline_configs)).to have_actions(
+            expect(subject.resolve(pipelines, pipeline_configs)).to have_actions(
               [:reload, :main]
             )
           end
@@ -92,21 +96,21 @@
     end
 
     context "when we have a lot of pipeline running" do
-      let(:running_pipelines) do
-        {
-          :main1 => mock_pipeline(:main1),
-          :main2 => mock_pipeline(:main2),
-          :main3 => mock_pipeline(:main3),
-          :main4 => mock_pipeline(:main4),
-          :main5 => mock_pipeline(:main5),
-          :main6 => mock_pipeline(:main6),
-        }
+      let(:pipelines) do
+        r =  LogStash::PipelinesRegistry.new
+        r.create_pipeline(:main1, mock_pipeline(:main1)) { true }
+        r.create_pipeline(:main2, mock_pipeline(:main2)) { true }
+        r.create_pipeline(:main3, mock_pipeline(:main3)) { true }
+        r.create_pipeline(:main4, mock_pipeline(:main4)) { true }
+        r.create_pipeline(:main5, mock_pipeline(:main5)) { true }
+        r.create_pipeline(:main6, mock_pipeline(:main6)) { true }
+        r
       end
 
       context "without system pipeline" do
         let(:pipeline_configs) do
           [
-            running_pipelines[:main1].pipeline_config,
+            pipelines.get_pipeline(:main1).pipeline_config,
             mock_pipeline_config(:main9),
             mock_pipeline_config(:main5, "input { generator {}}"),
             mock_pipeline_config(:main3, "input { generator {}}"),
@@ -115,7 +119,7 @@
         end
 
         it "generates actions required to converge" do
-          expect(subject.resolve(running_pipelines, pipeline_configs)).to have_actions(
+          expect(subject.resolve(pipelines, pipeline_configs)).to have_actions(
             [:create, :main7],
             [:create, :main9],
             [:reload, :main3],
@@ -130,7 +134,7 @@
       context "with system pipeline" do
         let(:pipeline_configs) do
           [
-            running_pipelines[:main1].pipeline_config,
+            pipelines.get_pipeline(:main1).pipeline_config,
             mock_pipeline_config(:main9),
             mock_pipeline_config(:main5, "input { generator {}}"),
             mock_pipeline_config(:main3, "input { generator {}}"),
@@ -140,7 +144,7 @@
         end
 
         it "creates the system pipeline before user defined pipelines" do
-          expect(subject.resolve(running_pipelines, pipeline_configs)).to have_actions(
+          expect(subject.resolve(pipelines, pipeline_configs)).to have_actions(
             [:create, :monitoring],
             [:create, :main7],
             [:create, :main9],
diff --git a/logstash-core/spec/support/matchers.rb b/logstash-core/spec/support/matchers.rb
index 3782f69742f..da0948ddd54 100644
--- a/logstash-core/spec/support/matchers.rb
+++ b/logstash-core/spec/support/matchers.rb
@@ -51,44 +51,50 @@ def all_instance_methods_implemented?
 
 RSpec::Matchers.define :have_pipeline? do |pipeline_config|
   match do |agent|
-    pipeline = agent.get_pipeline(pipeline_config.pipeline_id)
-    expect(pipeline).to_not be_nil
+    pipeline = nil
+    try(30) do
+      pipeline = agent.get_pipeline(pipeline_config.pipeline_id)
+      expect(pipeline).to_not be_nil
+    end
     expect(pipeline.config_str).to eq(pipeline_config.config_string)
+    expect(agent.running_pipelines.keys.map(&:to_s)).to include(pipeline_config.pipeline_id.to_s)
   end
 
   match_when_negated do |agent|
-    pipeline = agent.get_pipeline(pipeline_config.pipeline_id)
-    pipeline.nil? || pipeline.config_str != pipeline_config.config_string
+    pipeline = nil
+    try(30) do
+      pipeline = agent.get_pipeline(pipeline_config.pipeline_id)
+      expect(pipeline).to_not be_nil
+    end
+    # either the pipeline_id is not in the running pipelines OR it is but have different configurations
+    expect(!agent.running_pipelines.keys.map(&:to_s).include?(pipeline_config.pipeline_id.to_s) ||  pipeline.config_str != pipeline_config.config_string).to be_truthy
   end
 end
 
 RSpec::Matchers.define :have_running_pipeline? do |pipeline_config|
   match do |agent|
-    Stud.try(10.times, [StandardError, RSpec::Expectations::ExpectationNotMetError]) do
+    pipeline = nil
+    try(30) do
       pipeline = agent.get_pipeline(pipeline_config.pipeline_id)
       expect(pipeline).to_not be_nil
-      expect(pipeline.config_str).to eq(pipeline_config.config_string)
-      expect(pipeline.running?).to be_truthy
     end
+    expect(pipeline.config_str).to eq(pipeline_config.config_string)
+    expect(pipeline.running?).to be_truthy
+    expect(agent.running_pipelines.keys.map(&:to_s)).to include(pipeline_config.pipeline_id.to_s)
   end
 
   failure_message do |agent|
     pipeline = agent.get_pipeline(pipeline_config.pipeline_id)
 
     if pipeline.nil?
-      "Expected pipeline to exist and running, be we cannot find `#{pipeline_config.pipeline_id}` in the running pipelines `#{agent.pipelines.keys.join(",")}`"
-      else
-        if pipeline.running? == false
-          "Found `#{pipeline_config.pipeline_id}` in the list of pipelines but its not running"
-        elsif pipeline.config_str != pipeline_config.config_string
-          "Found `#{pipeline_config.pipeline_id}` in the list of pipelines and running, but the config_string doesn't match,
-Expected:
-#{pipeline_config.config_string}
-
-got:
-#{pipeline.config_str}"
-        end
+      "Expected pipeline to exist and running, be we cannot find '#{pipeline_config.pipeline_id.to_s}' in the running pipelines '#{agent.running_pipelines.keys.join(",")}'"
+    else
+      if !pipeline.running?
+        "Found '#{pipeline_config.pipeline_id.to_s}' in the list of pipelines but its not running"
+      elsif pipeline.config_str != pipeline_config.config_string
+        "Found '#{pipeline_config.pipeline_id.to_s}' in the list of pipelines and running, but the config_string doesn't match,\nExpected:\n#{pipeline_config.config_string}\n\ngot:\n#{pipeline.config_str}"
       end
+    end
   end
 
   match_when_negated do
diff --git a/logstash-core/spec/support/shared_contexts.rb b/logstash-core/spec/support/shared_contexts.rb
index b30e820c574..55ddfd7e9f4 100644
--- a/logstash-core/spec/support/shared_contexts.rb
+++ b/logstash-core/spec/support/shared_contexts.rb
@@ -26,12 +26,12 @@
     @agent.execute
     pipeline_config = mock_pipeline_config(:main, "input { generator { id => '123' } } output { null {} }")
     pipeline_creator =  LogStash::PipelineAction::Create.new(pipeline_config, @agent.metric)
-    @pipelines = java.util.concurrent.ConcurrentHashMap.new
-    expect(pipeline_creator.execute(@agent, @pipelines)).to be_truthy
+    @pipelines_registry = LogStash::PipelinesRegistry.new
+    expect(pipeline_creator.execute(@agent, @pipelines_registry)).to be_truthy
   end
 
   after :all do
-    @pipelines.each do |_, pipeline|
+    @pipelines_registry.running_pipelines.each do |_, pipeline|
       pipeline.shutdown
       pipeline.thread.join
     end
diff --git a/logstash-core/src/main/java/org/logstash/plugins/HooksRegistryExt.java b/logstash-core/src/main/java/org/logstash/plugins/HooksRegistryExt.java
index b90b48d71d0..b3a66ed150e 100644
--- a/logstash-core/src/main/java/org/logstash/plugins/HooksRegistryExt.java
+++ b/logstash-core/src/main/java/org/logstash/plugins/HooksRegistryExt.java
@@ -9,6 +9,7 @@
 import org.jruby.runtime.ThreadContext;
 import org.jruby.runtime.builtin.IRubyObject;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -48,6 +49,15 @@ public IRubyObject registerHooks(final ThreadContext context, final IRubyObject
         return syncHooks(context);
     }
 
+    @JRubyMethod(name = "remove_hooks")
+    public IRubyObject remove_hooks(final ThreadContext context, final IRubyObject emitterScope, final IRubyObject callback) {
+        final List<IRubyObject> callbacks = registeredHooks.get(emitterScope);
+        if (callbacks == null) {
+            return context.fals;
+        }
+        return callbacks.removeAll(Collections.singleton(callback)) ? context.tru : context.fals;
+     }
+
     @JRubyMethod(name = "emitters_count")
     public IRubyObject emittersCount(final ThreadContext context) {
         return RubyFixnum.newFixnum(context.runtime, registeredEmitters.size());
diff --git a/x-pack/lib/monitoring/inputs/metrics.rb b/x-pack/lib/monitoring/inputs/metrics.rb
index c05733e433a..0cd4861e1a6 100644
--- a/x-pack/lib/monitoring/inputs/metrics.rb
+++ b/x-pack/lib/monitoring/inputs/metrics.rb
@@ -65,18 +65,28 @@ def configure_snapshot_poller
       @timer_task.add_observer(TimerTaskLogger.new)
     end
 
-      def run(arg_queue)
-        @logger.debug("Metric: input started")
-        @queue = arg_queue
-
-        configure_snapshot_poller
-
-        # This must be invoked here because we need a queue to store the data
-        LogStash::PLUGIN_REGISTRY.hooks.register_hooks(LogStash::Agent, self)
-
-        exec_timer_task
-        sleep_till_stop
-      end
+    def run(arg_queue)
+      @logger.debug("Metric: input started")
+      @queue = arg_queue
+
+      configure_snapshot_poller
+
+      # This hook registration was originally set here to act on pipeline_started dispatcher event
+      # from the Agent using the pipeline_started method here which sends events to the pipeline queue
+      # which is only available here in the run method.
+      #
+      # There are 2 things to know with this strategy:
+      # - The initial pipeline creation preceding this plugin invocation will not be catched by our
+      #   hook here because it is added after the initial pipeline creations.
+      #
+      # - The below remove_hooks was added because not removing it was causing problems in tests where
+      #   multiple instances of this plugin would be created and added in the global static PLUGIN_REGISTRY
+      #   leading to calling the pipeline_started method multiple times leading to weird problems.
+      LogStash::PLUGIN_REGISTRY.hooks.register_hooks(LogStash::Agent, self)
+
+      exec_timer_task
+      sleep_till_stop
+    end
 
     def exec_timer_task
       @timer_task.execute
@@ -90,6 +100,7 @@ def sleep_till_stop
 
     def stop
       @logger.debug("Metrics input: stopped")
+      LogStash::PLUGIN_REGISTRY.hooks.remove_hooks(LogStash::Agent, self)
       @timer_task.shutdown if @timer_task
     end
 
@@ -128,7 +139,7 @@ def update_states
       time_for_update = @last_states_update.nil? || @last_states_update < (Time.now - 60*10)
 
       pipeline_hashes = []
-      agent.pipelines.each do |pipeline_id, pipeline|
+      agent.running_pipelines.each do |pipeline_id, pipeline|
         if time_for_update || !@last_updated_pipeline_hashes.include?(pipeline.hash)
           update_pipeline_state(pipeline)
         end
diff --git a/x-pack/lib/monitoring/inputs/metrics/stats_event/pipelines_info.rb b/x-pack/lib/monitoring/inputs/metrics/stats_event/pipelines_info.rb
index 6b9f787d89c..9abd227b4c5 100644
--- a/x-pack/lib/monitoring/inputs/metrics/stats_event/pipelines_info.rb
+++ b/x-pack/lib/monitoring/inputs/metrics/stats_event/pipelines_info.rb
@@ -9,7 +9,7 @@ def self.format_pipelines_info(agent, metric_store, extended_performance_collect
       # metrics pipelines. This prevents race conditions as pipeline stats may be
       # populated before the agent has it in its own pipelines state
       stats = metric_store.get_with_path("/stats/pipelines")[:stats][:pipelines]
-      agent.pipelines.map do |pipeline_id, pipeline|
+      agent.running_pipelines.map do |pipeline_id, pipeline|
         p_stats = stats[pipeline_id]
         # Don't record stats for system pipelines
         next nil if pipeline.system?
diff --git a/x-pack/spec/monitoring/inputs/metrics_spec.rb b/x-pack/spec/monitoring/inputs/metrics_spec.rb
index 0cda3ea423f..093d0813b00 100644
--- a/x-pack/spec/monitoring/inputs/metrics_spec.rb
+++ b/x-pack/spec/monitoring/inputs/metrics_spec.rb
@@ -38,36 +38,34 @@
     }
   end
 
-
   context "integration" do
 
     shared_examples_for 'events are added to the queue' do
       it 'should add a stats events to the queue' do
-        expect(stats_events.size).to eq(1)
+        wait(60).for { stats_events.size }.to eq(1)
       end
 
       it 'should add two state events to the queue' do
         # Triggered event plus the one from `update`
-        expect(state_events.size).to eq(2)
+        wait(60).for { state_events.size }.to eq(2)
       end
     end
 
     shared_examples_for 'events are not added to the queue' do
       it 'should not add a stats events to the queue' do
-        expect(stats_events.size).to eq(0)
+        wait(60).for { stats_events.size }.to eq(0)
       end
 
       it 'should not add a state events to the queue' do
         # Triggered event plus the one from `update`
-        expect(state_events.size).to eq(0)
+        wait(60).for { state_events.size }.to eq(0)
       end
     end
 
-    let(:schemas_path) { File.join("spec", "monitoring", "schemas") }
-    let(:queue) { [] }
+    let(:schemas_path) { File.join(File.dirname(__FILE__), "..", "..", "..", "spec", "monitoring", "schemas") }
+    let(:queue) { Concurrent::Array.new }
 
-    let(:number_of_events) { 20 }
-    let(:config) { "input { generator { count => #{number_of_events} } } output { null { } }" }
+    let(:config) { "input { dummyblockinginput { } } output { null { } }" }
 
     let(:pipeline_settings) { LogStash::Runner::SYSTEM_SETTINGS.clone.merge({
       "pipeline.id" => "main",
@@ -77,6 +75,7 @@
     let(:agent) { LogStash::Agent.new(pipeline_settings) }
     let(:metric) { agent.metric }
     let(:collector) { metric.collector }
+    let(:agent_task) { start_agent(agent) }
 
     # Can't use let because this value can change over time
     def stats_events
@@ -92,74 +91,68 @@ def state_events
       end
     end
 
-    before :each do
-      allow(subject).to receive(:fetch_global_stats).and_return({"uuid" => "00001" })
-    end
-
-    def setup_pipeline
-      agent.execute
-
-      100.times do
-        sleep 0.1
-        break if main_pipeline
-      end
-      raise "No main pipeline registered!" unless main_pipeline
+    context "with pipeline execution" do
 
-      subject.metric = metric
+      before :each do
+        allow(subject).to receive(:fetch_global_stats).and_return({"uuid" => "00001" })
+        allow(subject).to receive(:exec_timer_task)
+        allow(subject).to receive(:sleep_till_stop)
 
-      subject.register
-      subject.run(queue)
-      subject.pipeline_started(agent, main_pipeline)
-    end
+        agent
+        agent_task
 
-    def main_pipeline
-      agent.get_pipeline(:main)
-    end
+        wait(60).for { agent.get_pipeline(:main) }.to_not be_nil
 
-    after :each do
-      agent.shutdown
-    end
+        subject.metric = metric
 
-    context 'after the pipeline is setup' do
-      before do
-        allow(subject).to receive(:exec_timer_task)
-        allow(subject).to receive(:sleep_till_stop)
-        setup_pipeline
+        subject.register
+        subject.run(queue)
+        subject.pipeline_started(agent, agent.get_pipeline(:main))
       end
-      it "should store the agent" do
-        expect(subject.agent).to eq(agent)
+
+      after :each do
+        subject.stop
+        agent.shutdown
+        agent_task.wait
       end
-    end
 
-    describe "#update" do
-      before :each do
-        allow(subject).to receive(:fetch_global_stats).and_return({"uuid" => "00001" })
-        allow(subject).to receive(:exec_timer_task)
-        allow(subject).to receive(:sleep_till_stop)
-        setup_pipeline
-        subject.update(collector.snapshot_metric)
+      context 'after the pipeline is setup' do
+        it "should store the agent" do
+           expect(subject.agent).to eq(agent)
+        end
       end
 
-      it_behaves_like 'events are added to the queue'
+      describe "#update" do
+        before :each do
+          # collector.snapshot_metric is timing dependant and if fired too fast will miss some metrics.
+          # after some tests a correct metric_store.size is 72 but when it is incomplete it is lower.
+          # I guess this 72 is dependant on the metrics we collect and there is probably a better
+          # way to make sure no metrics are missing without forcing a hard sleep but this is what is
+          # easily observable, feel free to refactor with a better "timing" test here.
+          wait(60).for { collector.snapshot_metric.metric_store.size >= 72 }.to be_truthy
 
-      describe "state event" do
-        let(:schema_file) { File.join(schemas_path, "states_document_schema.json") }
-        let(:event) { state_events.first }
+          subject.update(collector.snapshot_metric)
+        end
 
-        it "should validate against the schema" do
-          expect(event).to be_a(LogStash::Event)
-          expect(JSON::Validator.fully_validate(schema_file, event.to_json)).to be_empty
+        it_behaves_like 'events are added to the queue'
+
+        describe "state event" do
+          let(:schema_file) { File.join(schemas_path, "states_document_schema.json") }
+
+          it "should validate against the schema" do
+            wait(60).for { state_events.empty? }.to be_falsey
+            expect(JSON::Validator.fully_validate(schema_file, state_events.first.to_json)).to be_empty
+          end
         end
-      end
 
-      describe "#build_event" do
-        let(:schema_file) { File.join(schemas_path, "monitoring_document_schema.json") }
+        describe "#build_event" do
+          let(:schema_file) { File.join(schemas_path, "monitoring_document_schema.json") }
 
-        describe "data event" do
-          let(:event) { stats_events.first }
-          it "has the correct schema" do
-            expect(event).to be_a(LogStash::Event) # Check that we actually have an event...
-            expect(JSON::Validator.fully_validate(schema_file, event.to_json)).to be_empty
+          describe "data event" do
+            it "has the correct schema" do
+              wait(60).for { stats_events.empty? }.to be_falsey
+              expect(JSON::Validator.fully_validate(schema_file, stats_events.first.to_json)).to be_empty
+            end
           end
         end
       end
@@ -173,6 +166,10 @@ def main_pipeline
       allow(subject).to receive(:queue).and_return(queue)
     end
 
+    after :each do
+      subject.stop
+    end
+
     describe "#update_pipeline_state" do
       let(:pipeline) { double("pipeline") }
       let(:state_event) { double("state event") }
diff --git a/x-pack/spec/support/helpers.rb b/x-pack/spec/support/helpers.rb
index cba1b5a9974..eb26a1262e0 100644
--- a/x-pack/spec/support/helpers.rb
+++ b/x-pack/spec/support/helpers.rb
@@ -2,6 +2,8 @@
 # or more contributor license agreements. Licensed under the Elastic License;
 # you may not use this file except in compliance with the Elastic License.
 
+require "stud/task"
+
 # Settings' TimeValue is using nanos seconds as the default unit
 def time_value(time)
   LogStash::Util::TimeValue.from_value(time).to_nanos
@@ -21,7 +23,6 @@ def define_settings(settings_options)
   end
 end
 
-
 def apply_settings(settings_values, settings = nil)
   settings = settings.nil? ? LogStash::SETTINGS.clone : settings
 
@@ -31,3 +32,35 @@ def apply_settings(settings_values, settings = nil)
 
   settings
 end
+
+def start_agent(agent)
+  agent_task = Stud::Task.new do
+    begin
+      agent.execute
+    rescue => e
+      raise "Start Agent exception: #{e}"
+    end
+  end
+
+  wait(30).for { agent.running? }.to be(true)
+  agent_task
+end
+
+module LogStash
+  module Inputs
+    class DummyBlockingInput < LogStash::Inputs::Base
+      config_name "dummyblockinginput"
+      milestone 2
+
+      def register
+      end
+
+      def run(_)
+        sleep(1) while !stop?
+      end
+
+      def stop
+      end
+    end
+  end
+end