Skip to content

Commit cda592f

Browse files
committed
Add pipeline.id to log lines
fixes #8290, #10521
1 parent 398e64e commit cda592f

File tree

8 files changed

+84
-6
lines changed

8 files changed

+84
-6
lines changed

config/jvm.options

+3
Original file line numberDiff line numberDiff line change
@@ -76,3 +76,6 @@
7676

7777
# Entropy source for randomness
7878
-Djava.security.egd=file:/dev/urandom
79+
80+
# Copy the logging context from parent threads to children
81+
-Dlog4j2.isThreadContextMapInheritable=true

config/log4j2.properties

+2-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ name = LogstashPropertiesConfig
44
appender.console.type = Console
55
appender.console.name = plain_console
66
appender.console.layout.type = PatternLayout
7-
appender.console.layout.pattern = [%d{ISO8601}][%-5p][%-25c] %m%n
7+
appender.console.layout.pattern = [%d{ISO8601}][%-5p][%-25c]%notEmpty{[%X{pipeline.id}]} %m%n
88

99
appender.json_console.type = Console
1010
appender.json_console.name = json_console
@@ -21,7 +21,7 @@ appender.rolling.policies.time.type = TimeBasedTriggeringPolicy
2121
appender.rolling.policies.time.interval = 1
2222
appender.rolling.policies.time.modulate = true
2323
appender.rolling.layout.type = PatternLayout
24-
appender.rolling.layout.pattern = [%d{ISO8601}][%-5p][%-25c] %-.10000m%n
24+
appender.rolling.layout.pattern = [%d{ISO8601}][%-5p][%-25c]%notEmpty{[%X{pipeline.id}]} %-.10000m%n
2525
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
2626
appender.rolling.policies.size.size = 100MB
2727
appender.rolling.strategy.type = DefaultRolloverStrategy

logstash-core/lib/logstash/agent.rb

+2-1
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ def initialize(settings = LogStash::SETTINGS, source_loader = nil)
8989

9090
def execute
9191
@thread = Thread.current # this var is implicitly used by Stud.stop?
92+
LogStash::Util.set_thread_name("Agent thread")
9293
logger.debug("Starting agent")
9394

9495
transition_to_running
@@ -307,7 +308,7 @@ def converge_state(pipeline_actions)
307308

308309
pipeline_actions.map do |action|
309310
Thread.new(action, converge_result) do |action, converge_result|
310-
java.lang.Thread.currentThread().setName("Converge #{action}");
311+
LogStash::Util.set_thread_name("Converge #{action}")
311312
# We execute every task we need to converge the current state of pipelines
312313
# for every task we will record the action result, that will help us
313314
# the results of all the task will determine if the converge was successful or not

logstash-core/lib/logstash/java_pipeline.rb

+5
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
require "logstash/compiler"
99
require "logstash/config/lir_serializer"
1010

11+
java_import org.apache.logging.log4j.ThreadContext
12+
1113
module LogStash; class JavaPipeline < JavaBasePipeline
1214
include LogStash::Util::Loggable
1315
attr_reader \
@@ -102,6 +104,7 @@ def start
102104
@thread = Thread.new do
103105
begin
104106
LogStash::Util.set_thread_name("pipeline.#{pipeline_id}")
107+
ThreadContext.put("pipeline.id", pipeline_id)
105108
run
106109
@finished_run.make_true
107110
rescue => e
@@ -236,6 +239,7 @@ def start_workers
236239
pipeline_workers.times do |t|
237240
thread = Thread.new do
238241
Util.set_thread_name("[#{pipeline_id}]>worker#{t}")
242+
ThreadContext.put("pipeline.id", pipeline_id)
239243
org.logstash.execution.WorkerLoop.new(
240244
lir_execution, filter_queue_client, @events_filtered, @events_consumed,
241245
@flushRequested, @flushing, @shutdownRequested, @drain_queue).run
@@ -305,6 +309,7 @@ def start_input(plugin)
305309

306310
def inputworker(plugin)
307311
Util::set_thread_name("[#{pipeline_id}]<#{plugin.class.config_name}")
312+
ThreadContext.put("pipeline.id", pipeline_id)
308313
begin
309314
plugin.run(wrapped_write_client(plugin.id.to_sym))
310315
rescue => e

logstash-core/lib/logstash/pipeline.rb

+9-2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
require "logstash/filter_delegator"
1313
require "logstash/compiler"
1414

15+
java_import org.apache.logging.log4j.ThreadContext
16+
1517
module LogStash; class BasePipeline < AbstractPipeline
1618
include LogStash::Util::Loggable
1719

@@ -172,7 +174,8 @@ def start
172174

173175
@thread = Thread.new do
174176
begin
175-
LogStash::Util.set_thread_name("pipeline.#{pipeline_id}")
177+
LogStash::Util.set_thread_name("[#{pipeline_id}]-manager")
178+
ThreadContext.put("pipeline.id", pipeline_id)
176179
run
177180
@finished_run.make_true
178181
rescue => e
@@ -300,7 +303,8 @@ def start_workers
300303

301304
pipeline_workers.times do |t|
302305
thread = Thread.new(batch_size, batch_delay, self) do |_b_size, _b_delay, _pipeline|
303-
Util.set_thread_name("[#{pipeline_id}]>worker#{t}")
306+
LogStash::Util::set_thread_name("[#{pipeline_id}]>worker#{t}")
307+
ThreadContext.put("pipeline.id", pipeline_id)
304308
_pipeline.worker_loop(_b_size, _b_delay)
305309
end
306310
@worker_threads << thread
@@ -430,6 +434,7 @@ def start_input(plugin)
430434

431435
def inputworker(plugin)
432436
Util::set_thread_name("[#{pipeline_id}]<#{plugin.class.config_name}")
437+
ThreadContext.put("pipeline.id", pipeline_id)
433438
begin
434439
plugin.run(wrapped_write_client(plugin.id.to_sym))
435440
rescue => e
@@ -535,6 +540,8 @@ def start_flusher
535540
raise "Attempted to start flusher on a stopped pipeline!" if stopped?
536541

537542
@flusher_thread = Thread.new do
543+
LogStash::Util.set_thread_name("[#{pipeline_id}]-flusher-thread")
544+
ThreadContext.put("pipeline.id", pipeline_id)
538545
while Stud.stoppable_sleep(5, 0.1) { stopped? }
539546
flush
540547
break if stopped?

logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaInputDelegatorExt.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,10 @@ public IRubyObject start(final ThreadContext context) {
6060
} else {
6161
queueWriter = qw;
6262
}
63-
Thread t = new Thread(() -> input.start(queueWriter::push));
63+
Thread t = new Thread(() -> {
64+
org.apache.logging.log4j.ThreadContext.put("pipeline.id", pipeline.pipelineId().toString());
65+
input.start(queueWriter::push);
66+
});
6467
t.setName(pipeline.pipelineId().asJavaString() + "_" + input.getName() + "_" + input.getId());
6568
t.start();
6669
return JavaObject.wrap(context.getRuntime(), t);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
---
2+
services:
3+
- logstash
4+
config: |-
5+
input {
6+
generator {
7+
count => 4
8+
}
9+
}
10+
output {
11+
null {}
12+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
require_relative '../framework/fixture'
2+
require_relative '../framework/settings'
3+
require_relative '../services/logstash_service'
4+
require_relative '../framework/helpers'
5+
require "logstash/devutils/rspec/spec_helper"
6+
require "yaml"
7+
8+
describe "Test Logstash Pipeline id" do
9+
before(:all) {
10+
@fixture = Fixture.new(__FILE__)
11+
# used in multiple LS tests
12+
@ls = @fixture.get_service("logstash")
13+
}
14+
15+
after(:all) {
16+
@fixture.teardown
17+
}
18+
19+
before(:each) {
20+
# backup the application settings file -- logstash.yml
21+
FileUtils.cp(@ls.application_settings_file, "#{@ls.application_settings_file}.original")
22+
}
23+
24+
after(:each) {
25+
@ls.teardown
26+
# restore the application settings file -- logstash.yml
27+
FileUtils.mv("#{@ls.application_settings_file}.original", @ls.application_settings_file)
28+
}
29+
30+
let(:temp_dir) { Stud::Temporary.directory("logstash-pipelinelog-test") }
31+
let(:config) { @fixture.config("root") }
32+
33+
it "should write logs with pipeline.id" do
34+
pipeline_name = "custom_pipeline"
35+
settings = {
36+
"path.logs" => temp_dir,
37+
"pipeline.id" => pipeline_name
38+
}
39+
IO.write(@ls.application_settings_file, settings.to_yaml)
40+
@ls.spawn_logstash("-w", "1" , "-e", config)
41+
@ls.wait_for_logstash
42+
sleep 2 until @ls.exited?
43+
plainlog_file = "#{temp_dir}/logstash-plain.log"
44+
expect(File.exists?(plainlog_file)).to be true
45+
expect(IO.read(plainlog_file) =~ /\[logstash.javapipeline\s*\]\[#{pipeline_name}\]/).to be > 0
46+
end
47+
end

0 commit comments

Comments
 (0)