Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit cb38ec0

Browse files
committedFeb 15, 2019
remove exclusive lock for Ruby pipeline initialization
1 parent d8c7a98 commit cb38ec0

File tree

3 files changed

+6
-41
lines changed

3 files changed

+6
-41
lines changed
 

‎logstash-core/lib/logstash/agent.rb

-5
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ def initialize(settings = LogStash::SETTINGS, source_loader = nil)
3636

3737
# Mutex to synchonize in the exclusive method
3838
# Initial usage for the Ruby pipeline initialization which is not thread safe
39-
@exclusive_lock = Mutex.new
4039
@webserver_control_lock = Mutex.new
4140

4241
# Special bus object for inter-pipelines communications. Used by the `pipeline` input/output
@@ -86,10 +85,6 @@ def initialize(settings = LogStash::SETTINGS, source_loader = nil)
8685
@running = Concurrent::AtomicBoolean.new(false)
8786
end
8887

89-
def exclusive(&block)
90-
@exclusive_lock.synchronize { block.call }
91-
end
92-
9388
def execute
9489
@thread = Thread.current # this var is implicitly used by Stud.stop?
9590
logger.debug("Starting agent")

‎logstash-core/lib/logstash/pipeline_action/create.rb

+2-13
Original file line numberDiff line numberDiff line change
@@ -32,22 +32,11 @@ def execution_priority
3232
# The execute assume that the thread safety access of the pipeline
3333
# is managed by the caller.
3434
def execute(agent, pipelines_registry)
35-
new_pipeline =
36-
if @pipeline_config.settings.get_value("pipeline.java_execution")
37-
LogStash::JavaPipeline.new(@pipeline_config, @metric, agent)
38-
else
39-
agent.exclusive do
40-
# The Ruby pipeline initialization is not thread safe because of the module level
41-
# shared state in LogsStash::Config::AST. When using multiple pipelines this gets
42-
# executed simultaneously in different threads and we need to synchronize this initialization.
43-
LogStash::Pipeline.new(@pipeline_config, @metric, agent)
44-
end
45-
end
46-
35+
pipeline_class = @pipeline_config.settings.get_value("pipeline.java_execution") ? LogStash::JavaPipeline : LogStash::Pipeline
36+
new_pipeline = pipeline_class.new(@pipeline_config, @metric, agent)
4737
success = pipelines_registry.create_pipeline(pipeline_id, new_pipeline) do
4838
new_pipeline.start # block until the pipeline is correctly started or crashed
4939
end
50-
5140
LogStash::ConvergeResult::ActionResult.create(self, success)
5241
end
5342

‎logstash-core/lib/logstash/pipeline_action/reload.rb

+4-23
Original file line numberDiff line numberDiff line change
@@ -31,18 +31,10 @@ def execute(agent, pipelines_registry)
3131
return LogStash::ConvergeResult::FailedAction.new("Cannot reload pipeline, because the existing pipeline is not reloadable")
3232
end
3333

34+
java_exec = @pipeline_config.settings.get_value("pipeline.java_execution")
35+
3436
begin
35-
pipeline_validator =
36-
if @pipeline_config.settings.get_value("pipeline.java_execution")
37-
LogStash::JavaBasePipeline.new(@pipeline_config, nil, logger, nil)
38-
else
39-
agent.exclusive do
40-
# The Ruby pipeline initialization is not thread safe because of the module level
41-
# shared state in LogsStash::Config::AST. When using multiple pipelines this gets
42-
# executed simultaneously in different threads and we need to synchronize this initialization.
43-
LogStash::BasePipeline.new(@pipeline_config)
44-
end
45-
end
37+
pipeline_validator = java_exec ? LogStash::JavaBasePipeline.new(@pipeline_config, nil, logger, nil) : LogStash::BasePipeline.new(@pipeline_config)
4638
rescue => e
4739
return LogStash::ConvergeResult::FailedAction.from_exception(e)
4840
end
@@ -62,18 +54,7 @@ def execute(agent, pipelines_registry)
6254
old_pipeline.thread.join
6355

6456
# Then create a new pipeline
65-
new_pipeline =
66-
if @pipeline_config.settings.get_value("pipeline.java_execution")
67-
LogStash::JavaPipeline.new(@pipeline_config, @metric, agent)
68-
else
69-
agent.exclusive do
70-
# The Ruby pipeline initialization is not thread safe because of the module level
71-
# shared state in LogsStash::Config::AST. When using multiple pipelines this gets
72-
# executed simultaneously in different threads and we need to synchronize this initialization.
73-
LogStash::Pipeline.new(@pipeline_config, @metric, agent)
74-
end
75-
end
76-
57+
new_pipeline = java_exec ? LogStash::JavaPipeline.new(@pipeline_config, @metric, agent) : LogStash::Pipeline.new(@pipeline_config, @metric, agent)
7758
success = new_pipeline.start # block until the pipeline is correctly started or crashed
7859

7960
# return success and new_pipeline to registry reload_pipeline

0 commit comments

Comments
 (0)
Please sign in to comment.