2828module LogStash ; class JavaPipeline < JavaBasePipeline
2929 include LogStash ::Util ::Loggable
3030
31+ class WorkerException < RuntimeError ; end
32+
3133 java_import org . apache . logging . log4j . ThreadContext
3234
3335 attr_reader \
@@ -122,7 +124,8 @@ def start
122124 @thread = Thread . new do
123125 error_log_params = -> ( e ) {
124126 default_logging_keys (
125- :exception => e ,
127+ :exception => e . class ,
128+ :message => e . message ,
126129 :backtrace => e . backtrace ,
127130 "pipeline.sources" => pipeline_source_details
128131 )
@@ -133,10 +136,12 @@ def start
133136 ThreadContext . put ( "pipeline.id" , pipeline_id )
134137 run
135138 @finished_run . make_true
136- rescue => e
139+ rescue WorkerException => e
137140 # no need to log at ERROR level since this log will be redundant to the log in
138141 # the worker loop thread global rescue clause
139- logger . debug ( "Pipeline terminated by worker error" , error_log_params . call ( e ) )
142+ logger . debug ( "Worker terminated by error" , error_log_params . call ( e ) )
143+ rescue => e
144+ logger . error ( "Pipeline terminated by error" , error_log_params . call ( e ) )
140145 ensure
141146 # we must trap any exception here to make sure the following @finished_execution
142147 # is always set to true regardless of any exception before in the close method call
@@ -337,7 +342,7 @@ def monitor_inputs_and_workers
337342 # this is a worker thread termination
338343 # delete it from @worker_threads so that wait_for_workers does not wait for it
339344 @worker_threads . delete ( terminated_thread )
340- raise ( "Worker thread terminated in pipeline.id: #{ pipeline_id } " )
345+ raise WorkerException . new ( "Worker thread terminated in pipeline.id: #{ pipeline_id } " )
341346 end
342347 end
343348
0 commit comments