diff --git a/logstash-core/lib/logstash/java_pipeline.rb b/logstash-core/lib/logstash/java_pipeline.rb index a41c18eec30..68ce5a1f504 100644 --- a/logstash-core/lib/logstash/java_pipeline.rb +++ b/logstash-core/lib/logstash/java_pipeline.rb @@ -137,9 +137,7 @@ def start @thread = Thread.new do error_log_params = ->(e) { - default_logging_keys( - :exception => e, - :backtrace => e.backtrace, + exception_logging_keys(e, "pipeline.sources" => pipeline_source_details ) } @@ -312,7 +310,7 @@ def start_workers @crash_detected.make_true @logger.error( "Pipeline worker error, the pipeline will be stopped", - default_logging_keys(:error => e.cause.message, :exception => e.cause.class, :backtrace => e.cause.backtrace) + exception_logging_keys(e.cause) ) end end @@ -422,21 +420,14 @@ def inputworker(plugin) if plugin.stop? @logger.debug( "Input plugin raised exception during shutdown, ignoring it.", - default_logging_keys( - :plugin => plugin.class.config_name, - :exception => e.message, - :backtrace => e.backtrace)) + exception_logging_keys(e, :plugin => plugin.class.config_name)) return end # otherwise, report error and restart @logger.error(I18n.t( "logstash.pipeline.worker-error-debug", - **default_logging_keys( - :plugin => plugin.inspect, - :error => e.message, - :exception => e.class, - :stacktrace => e.backtrace.join("\n")))) + **exception_logging_keys(e, :plugin => plugin.inspect))) # Assuming the failure that caused this exception is transient, # let's sleep for a bit and execute #run again @@ -580,10 +571,7 @@ def close_plugin_and_ignore(plugin) rescue => e @logger.warn( "plugin raised exception while closing, ignoring", - default_logging_keys( - :plugin => plugin.class.config_name, - :exception => e.message, - :backtrace => e.backtrace)) + exception_logging_keys(e, :plugin => plugin.class.config_name)) end end @@ -607,12 +595,12 @@ def init_worker_loop rescue => e @logger.error( "Worker loop initialization error", - default_logging_keys(:error => e.message, :exception => e.class, :stacktrace => e.backtrace.join("\n"))) + exception_logging_keys(e)) nil rescue Java::java.lang.StackOverflowError => se @logger.error( "Stack overflow error while compiling Pipeline. Please increase thread stack size using -Xss", - default_logging_keys()) + exception_logging_keys(se)) nil end end @@ -630,6 +618,42 @@ def default_logging_keys(other_keys = {}) keys end + def exception_logging_keys(active_exception = $!, other_keys = {}) + base = active_exception ? unwind_cause_chain(active_exception) : {} + default_logging_keys(base.merge(other_keys)) + end + + ## + # Yields once per exception in the provided exception's cause chain + # @param exception [Exception] + # @yield_param [Exception] + def cause_chain(exception, &block) + return enum_for(:cause_chain, exception) unless block_given? + + current = exception + while !current.nil? + yield current + cause = current.cause + current = (cause == current ? nil : cause) + end + end + + ## + # unwinds the provided exception to create a deeply-nested structure + # representing the exception and its cause chain + # @param [Exception] exception + # @return [Hash{Symbol=>[String|Hash]}] + def unwind_cause_chain(exception) + cause_chain(exception).reverse_each.reduce(nil) do |cause, exception| + { + :exception => exception.class.name, + :error => exception.message, + :stacktrace => exception.backtrace.join("\n"), + :cause => cause, + }.compact + end + end + def preserve_event_order?(pipeline_workers) case settings.get("pipeline.ordered") when "auto"