Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[6.5 clean backport of #10346] fix agent silent exit upon pipelines reloading #10367

Merged
merged 1 commit into from
Feb 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 46 additions & 46 deletions logstash-core/lib/logstash/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
require "logstash/config/source_loader"
require "logstash/pipeline_action"
require "logstash/state_resolver"
require "logstash/pipelines_registry"
require "stud/trap"
require "uri"
require "socket"
Expand All @@ -19,7 +20,7 @@ class LogStash::Agent
include LogStash::Util::Loggable
STARTED_AT = Time.now.freeze

attr_reader :metric, :name, :settings, :webserver, :dispatcher, :ephemeral_id, :pipelines, :pipeline_bus
attr_reader :metric, :name, :settings, :webserver, :dispatcher, :ephemeral_id, :pipeline_bus
attr_accessor :logger

# initialize method for LogStash::Agent
Expand All @@ -40,7 +41,7 @@ def initialize(settings = LogStash::SETTINGS, source_loader = nil)
# Special bus object for inter-pipelines communications. Used by the `pipeline` input/output
@pipeline_bus = org.logstash.plugins.pipeline.PipelineBus.new

@pipelines = java.util.concurrent.ConcurrentHashMap.new();
@pipelines_registry = LogStash::PipelinesRegistry.new

@name = setting("node.name")
@http_host = setting("http.host")
Expand Down Expand Up @@ -118,14 +119,17 @@ def execute
converge_state_and_update unless stopped?
end
else
return 1 if clean_state?
# exit with error status if the initial converge_state_and_update did not create any pipeline
return 1 if @pipelines_registry.empty?

while !Stud.stop?
if clean_state? || running_user_defined_pipelines?
sleep(0.5)
else
break
end
# exit if all pipelines are terminated and none are reloading
break if no_pipeline?

# exit if there are no user defined pipelines (not system pipeline) and none are reloading
break if !running_user_defined_pipelines?

sleep(0.5)
end
end

Expand All @@ -139,11 +143,11 @@ def auto_reload?
end

def running?
@running.value
@running.true?
end

def stopped?
!@running.value
@running.false?
end

def converge_state_and_update
Expand Down Expand Up @@ -237,43 +241,48 @@ def id_path
@id_path ||= ::File.join(settings.get("path.data"), "uuid")
end

#
# Backward compatibility proxies to the PipelineRegistry
#

def get_pipeline(pipeline_id)
pipelines.get(pipeline_id)
@pipelines_registry.get_pipeline(pipeline_id)
end

def pipelines_count
pipelines.size
@pipelines_registry.size
end

def running_pipelines
pipelines.select {|id,pipeline| running_pipeline?(id) }
end
@pipelines_registry.running_pipelines
end

def non_running_pipelines
pipelines.select {|id,pipeline| !running_pipeline?(id) }
@pipelines_registry.non_running_pipelines
end

def running_pipelines?
running_pipelines_count > 0
@pipelines_registry.running_pipelines.any?
end

def running_pipelines_count
running_pipelines.size
@pipelines_registry.running_pipelines.size
end

def running_user_defined_pipelines?
!running_user_defined_pipelines.empty?
@pipelines_registry.running_user_defined_pipelines.any?
end

def running_user_defined_pipelines
pipelines.select {|id, pipeline| running_pipeline?(id) && !pipeline.system? }
@pipelines_registry.running_user_defined_pipelines
end

def with_running_user_defined_pipelines
yield running_user_defined_pipelines
def no_pipeline?
@pipelines_registry.running_pipelines.empty?
end

private

def transition_to_stopped
@running.make_false
end
Expand All @@ -298,7 +307,7 @@ def converge_state(pipeline_actions)
converge_result = LogStash::ConvergeResult.new(pipeline_actions.size)

pipeline_actions.map do |action|
Thread.new do
Thread.new(action, converge_result) do |action, converge_result|
java.lang.Thread.currentThread().setName("Converge #{action}");
# We execute every task we need to converge the current state of pipelines
# for every task we will record the action result, that will help us
Expand All @@ -314,34 +323,35 @@ def converge_state(pipeline_actions)
# that we currently have.
begin
logger.debug("Executing action", :action => action)
action_result = action.execute(self, pipelines)
action_result = action.execute(self, @pipelines_registry)
converge_result.add(action, action_result)

unless action_result.successful?
logger.error("Failed to execute action", :id => action.pipeline_id,
:action_type => action_result.class, :message => action_result.message,
:backtrace => action_result.backtrace)
logger.error("Failed to execute action",
:id => action.pipeline_id,
:action_type => action_result.class,
:message => action_result.message,
:backtrace => action_result.backtrace
)
end
rescue SystemExit => e
converge_result.add(action, e)
rescue Exception => e
rescue SystemExit, Exception => e
logger.error("Failed to execute action", :action => action, :exception => e.class.name, :message => e.message, :backtrace => e.backtrace)
converge_result.add(action, e)
end
end
end.each(&:join)

if logger.trace?
logger.trace("Converge results", :success => converge_result.success?,
:failed_actions => converge_result.failed_actions.collect { |a, r| "id: #{a.pipeline_id}, action_type: #{a.class}, message: #{r.message}" },
:successful_actions => converge_result.successful_actions.collect { |a, r| "id: #{a.pipeline_id}, action_type: #{a.class}" })
end
logger.trace? && logger.trace("Converge results",
:success => converge_result.success?,
:failed_actions => converge_result.failed_actions.collect { |a, r| "id: #{a.pipeline_id}, action_type: #{a.class}, message: #{r.message}" },
:successful_actions => converge_result.successful_actions.collect { |a, r| "id: #{a.pipeline_id}, action_type: #{a.class}" }
)

converge_result
end

def resolve_actions(pipeline_configs)
@state_resolver.resolve(@pipelines, pipeline_configs)
@state_resolver.resolve(@pipelines_registry, pipeline_configs)
end

def dispatch_events(converge_results)
Expand Down Expand Up @@ -399,7 +409,7 @@ def collect_metrics?
end

def shutdown_pipelines
logger.debug("Shutting down all pipelines", :pipelines_count => pipelines_count)
logger.debug("Shutting down all pipelines", :pipelines_count => running_pipelines_count)

# In this context I could just call shutdown, but I've decided to
# use the stop action implementation for that so we have the same code.
Expand All @@ -408,16 +418,6 @@ def shutdown_pipelines
converge_state(pipeline_actions)
end

def running_pipeline?(pipeline_id)
pipeline = get_pipeline(pipeline_id)
return false unless pipeline
thread = pipeline.thread
thread.is_a?(Thread) && thread.alive?
end

def clean_state?
pipelines.empty?
end

def setting(key)
@settings.get(key)
Expand Down
12 changes: 5 additions & 7 deletions logstash-core/lib/logstash/instrument/periodic_poller/dlq.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,11 @@ def initialize(metric, agent, options = {})
end

def collect
pipelines = @agent.with_running_user_defined_pipelines {|pipelines| pipelines}
unless pipelines.nil?
pipelines.each {|_, pipeline|
unless pipeline.nil?
pipeline.collect_dlq_stats
end
}
pipelines = @agent.running_user_defined_pipelines
pipelines.each do |_, pipeline|
unless pipeline.nil?
pipeline.collect_dlq_stats
end
end
end
end
Expand Down
14 changes: 6 additions & 8 deletions logstash-core/lib/logstash/instrument/periodic_poller/pq.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,12 @@ def initialize(metric, queue_type, agent, options = {})
end

def collect
pipelines = @agent.with_running_user_defined_pipelines {|pipelines| pipelines}
unless pipelines.nil?
pipelines.each {|_, pipeline|
unless pipeline.nil?
pipeline.collect_stats
end
}
pipelines = @agent.running_user_defined_pipelines
pipelines.each do |_, pipeline|
unless pipeline.nil?
pipeline.collect_stats
end
end
end
end
end; end; end
end end end
6 changes: 3 additions & 3 deletions logstash-core/lib/logstash/instrument/periodic_pollers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ module LogStash module Instrument
class PeriodicPollers
attr_reader :metric

def initialize(metric, queue_type, pipelines)
def initialize(metric, queue_type, agent)
@metric = metric
@periodic_pollers = [PeriodicPoller::Os.new(metric),
PeriodicPoller::JVM.new(metric),
PeriodicPoller::PersistentQueue.new(metric, queue_type, pipelines),
PeriodicPoller::DeadLetterQueue.new(metric, pipelines)]
PeriodicPoller::PersistentQueue.new(metric, queue_type, agent),
PeriodicPoller::DeadLetterQueue.new(metric, agent)]
end

def start
Expand Down
28 changes: 22 additions & 6 deletions logstash-core/lib/logstash/java_pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,23 @@ def initialize(pipeline_config, namespaced_metric = nil, agent = nil)
@flushRequested = java.util.concurrent.atomic.AtomicBoolean.new(false)
@shutdownRequested = java.util.concurrent.atomic.AtomicBoolean.new(false)
@outputs_registered = Concurrent::AtomicBoolean.new(false)

# @finished_execution signals that the pipeline thread has finished its execution
# regardless of any exceptions; it will always be true when the thread completes
@finished_execution = Concurrent::AtomicBoolean.new(false)

# @finished_run signals that the run methods called in the pipeline thread was completed
# without errors and it will NOT be set if the run method exits from an exception; this
# is by design and necessary for the wait_until_started semantic
@finished_run = Concurrent::AtomicBoolean.new(false)

@thread = nil
end # def initialize

def finished_execution?
@finished_execution.true?
end

def ready?
@ready.value
end
Expand Down Expand Up @@ -83,15 +97,18 @@ def start
@logger.debug("Starting pipeline", default_logging_keys)

@finished_execution.make_false
@finished_run.make_false

@thread = Thread.new do
begin
LogStash::Util.set_thread_name("pipeline.#{pipeline_id}")
run
@finished_execution.make_true
@finished_run.make_true
rescue => e
close
logger.error("Pipeline aborted due to error", default_logging_keys(:exception => e, :backtrace => e.backtrace))
ensure
@finished_execution.make_true
end
end

Expand All @@ -106,15 +123,14 @@ def start

def wait_until_started
while true do
# This should be changed with an appropriate FSM
# It's an edge case, if we have a pipeline with
# a generator { count => 1 } its possible that `Thread#alive?` doesn't return true
# because the execution of the thread was successful and complete
if @finished_execution.true?
if @finished_run.true?
# it completed run without exception
return true
elsif thread.nil? || !thread.alive?
# some exception occurred and the thread is dead
return false
elsif running?
# fully initialized and running
return true
else
sleep 0.01
Expand Down
33 changes: 25 additions & 8 deletions logstash-core/lib/logstash/pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,23 @@ def initialize(pipeline_config, namespaced_metric = nil, agent = nil)
@flushing = Concurrent::AtomicReference.new(false)
@outputs_registered = Concurrent::AtomicBoolean.new(false)
@worker_shutdown = java.util.concurrent.atomic.AtomicBoolean.new(false)

# @finished_execution signals that the pipeline thread has finished its execution
# regardless of any exceptions; it will always be true when the thread completes
@finished_execution = Concurrent::AtomicBoolean.new(false)

# @finished_run signals that the run methods called in the pipeline thread was completed
# without errors and it will NOT be set if the run method exits from an exception; this
# is by design and necessary for the wait_until_started semantic
@finished_run = Concurrent::AtomicBoolean.new(false)

@thread = nil
end # def initialize

def finished_execution?
@finished_execution.true?
end

def ready?
@ready.value
end
Expand Down Expand Up @@ -152,16 +167,19 @@ def start
"pipeline.batch.size" => settings.get("pipeline.batch.size"),
"pipeline.batch.delay" => settings.get("pipeline.batch.delay")))

@finished_execution = Concurrent::AtomicBoolean.new(false)
@finished_execution.make_false
@finished_run.make_false

@thread = Thread.new do
begin
LogStash::Util.set_thread_name("pipeline.#{pipeline_id}")
run
@finished_execution.make_true
@finished_run.make_true
rescue => e
close
@logger.error("Pipeline aborted due to error", default_logging_keys(:exception => e, :backtrace => e.backtrace))
ensure
@finished_execution.make_true
end
end

Expand All @@ -176,15 +194,14 @@ def start

def wait_until_started
while true do
# This should be changed with an appropriate FSM
# It's an edge case, if we have a pipeline with
# a generator { count => 1 } its possible that `Thread#alive?` doesn't return true
# because the execution of the thread was successful and complete
if @finished_execution.true?
if @finished_run.true?
# it completed run without exception
return true
elsif !thread.alive?
elsif thread.nil? || !thread.alive?
# some exception occured and the thread is dead
return false
elsif running?
# fully initialized and running
return true
else
sleep 0.01
Expand Down
Loading