From b3eb7bbf37206cd19799df953926825f7d82691e Mon Sep 17 00:00:00 2001 From: Johannes Haass Date: Tue, 14 Jan 2025 12:51:08 +0100 Subject: [PATCH] Log worker name in steno logger Logging the worker name will make it easier to analyse the logs of particular worker process/thread. The worker name will be added as an additional key in the data field where e.g. the request_id is already logged. Example: ``` {"timestamp":"2025-01-14T09:51:48.249447000Z","message":"(0.001651s) (app/actions/organization_delete.rb:33:in `block (2 levels) in delete') (query_length=43) DELETE FROM \"organizations\" WHERE \"id\" = 32","log_level":"debug","source":"cc.background","data":{"worker_name":"cc_global_worker_generic thread:7","request_guid":"0728f0f0-7f96-420c-ab2d-1a3768bb7047"},"thread_id":39340,"fiber_id":39360,"process_id":68504,"file":"/Users/bommel/.rbenv/versions/3.2.6/lib/ruby/gems/3.2.0/gems/sequel-5.88.0/lib/sequel/database/logging.rb","lineno":88,"method":"public_send"} ``` --- lib/delayed_job/delayed_worker.rb | 1 + lib/delayed_job/threaded_worker.rb | 1 + spec/unit/lib/delayed_job/delayed_worker_spec.rb | 12 ++++++++++-- spec/unit/lib/delayed_job/threaded_worker_spec.rb | 10 ++++++++++ 4 files changed, 22 insertions(+), 2 deletions(-) diff --git a/lib/delayed_job/delayed_worker.rb b/lib/delayed_job/delayed_worker.rb index 9b7519d86a6..0b4ef95d2a0 100644 --- a/lib/delayed_job/delayed_worker.rb +++ b/lib/delayed_job/delayed_worker.rb @@ -61,6 +61,7 @@ def get_initialized_delayed_worker(config, logger) worker = Delayed::Worker.new(@queue_options) worker.name = @queue_options[:worker_name] + Steno.config.context.data[:worker_name] = worker.name worker end diff --git a/lib/delayed_job/threaded_worker.rb b/lib/delayed_job/threaded_worker.rb index 1a34a1fe04b..3f514daaf7b 100644 --- a/lib/delayed_job/threaded_worker.rb +++ b/lib/delayed_job/threaded_worker.rb @@ -33,6 +33,7 @@ def start @num_threads.times do |thread_index| thread = Thread.new do Thread.current[:thread_index] = thread_index + Steno.config.context.data[:worker_name] = name # override logged worker name with thread specific name threaded_start rescue Exception => e # rubocop:disable Lint/RescueException say "Unexpected error: #{e.message}\n#{e.backtrace.join("\n")}", 'error' diff --git a/spec/unit/lib/delayed_job/delayed_worker_spec.rb b/spec/unit/lib/delayed_job/delayed_worker_spec.rb index 091e32f97d8..c21cd511b48 100644 --- a/spec/unit/lib/delayed_job/delayed_worker_spec.rb +++ b/spec/unit/lib/delayed_job/delayed_worker_spec.rb @@ -69,6 +69,8 @@ describe '#start_working' do let(:cc_delayed_worker) { CloudController::DelayedWorker.new(options) } + before { allow(delayed_worker).to receive(:name).and_return(options[:name]) } + it 'sets up the environment and starts the worker' do expect(environment).to receive(:setup_environment).with(nil) expect(Delayed::Worker).to receive(:new).with(anything).and_return(delayed_worker) @@ -86,10 +88,16 @@ expect(Delayed::Worker.sleep_delay).to eq(5) end + it 'sets the worker name in the Steno context' do + cc_delayed_worker.start_working + expect(Steno.config.context.data[:worker_name]).to eq(options[:name]) + end + context 'when the number of threads is specified' do before do allow(Delayed).to receive(:remove_const).with(:Worker) allow(Delayed).to receive(:const_set).with(:Worker, Delayed::ThreadedWorker) + allow(threaded_worker).to receive(:name) options[:num_threads] = 7 end @@ -122,7 +130,7 @@ expect(environment).to receive(:setup_environment).with(nil) expect(Delayed::Worker).to receive(:new).with(anything).and_return(delayed_worker).once expect(delayed_worker).to receive(:name=).with(options[:name]).once - expect(delayed_worker).to receive(:name).and_return(options[:name]).once + expect(delayed_worker).to receive(:name).and_return(options[:name]).twice expect(Delayed::Job).to receive(:clear_locks!).with(options[:name]).once cc_delayed_worker.clear_locks! @@ -134,7 +142,7 @@ expect(environment).to receive(:setup_environment).with(nil) expect(Delayed::Worker).to receive(:new).with(anything).and_return(threaded_worker).once expect(threaded_worker).to receive(:name=).with(options[:name]).once - expect(threaded_worker).to receive(:name).and_return(options[:name]).once + expect(threaded_worker).to receive(:name).and_return(options[:name]).twice expect(threaded_worker).to receive(:names_with_threads).and_return(["#{options[:name]} thread:1", "#{options[:name]} thread:2"]).once expect(Delayed::Job).to receive(:clear_locks!).with(options[:name]).once expect(Delayed::Job).to receive(:clear_locks!).with("#{options[:name]} thread:1").once diff --git a/spec/unit/lib/delayed_job/threaded_worker_spec.rb b/spec/unit/lib/delayed_job/threaded_worker_spec.rb index bad035d0af4..29136888832 100644 --- a/spec/unit/lib/delayed_job/threaded_worker_spec.rb +++ b/spec/unit/lib/delayed_job/threaded_worker_spec.rb @@ -62,6 +62,16 @@ expect { worker.start }.to raise_error('Unexpected error occurred in one of the worker threads') expect(worker.instance_variable_get(:@unexpected_error)).to be true end + + it 'sets the worker name in the Steno context' do + steno_data_spy = spy('data') + allow(Steno.config.context).to receive(:data).and_return(steno_data_spy) + + worker.start + worker.instance_variable_get(:@threads).each_with_index do |_, index| + expect(steno_data_spy).to have_received(:[]=).with(:worker_name, "#{worker_name} thread:#{index + 1}") + end + end end describe '#names_with_threads' do