Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.0 clean backport of #10431] remove exclusive lock for Ruby pipeline initialization #10462

Merged
merged 1 commit into from
Feb 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions logstash-core/lib/logstash/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ def initialize(settings = LogStash::SETTINGS, source_loader = nil)

# Mutex to synchonize in the exclusive method
# Initial usage for the Ruby pipeline initialization which is not thread safe
@exclusive_lock = Mutex.new
@webserver_control_lock = Mutex.new

# Special bus object for inter-pipelines communications. Used by the `pipeline` input/output
Expand Down Expand Up @@ -86,10 +85,6 @@ def initialize(settings = LogStash::SETTINGS, source_loader = nil)
@running = Concurrent::AtomicBoolean.new(false)
end

def exclusive(&block)
@exclusive_lock.synchronize { block.call }
end

def execute
@thread = Thread.current # this var is implicitly used by Stud.stop?
logger.debug("Starting agent")
Expand Down
15 changes: 2 additions & 13 deletions logstash-core/lib/logstash/pipeline_action/create.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,11 @@ def execution_priority
# The execute assume that the thread safety access of the pipeline
# is managed by the caller.
def execute(agent, pipelines_registry)
new_pipeline =
if @pipeline_config.settings.get_value("pipeline.java_execution")
LogStash::JavaPipeline.new(@pipeline_config, @metric, agent)
else
agent.exclusive do
# The Ruby pipeline initialization is not thread safe because of the module level
# shared state in LogsStash::Config::AST. When using multiple pipelines this gets
# executed simultaneously in different threads and we need to synchronize this initialization.
LogStash::Pipeline.new(@pipeline_config, @metric, agent)
end
end

pipeline_class = @pipeline_config.settings.get_value("pipeline.java_execution") ? LogStash::JavaPipeline : LogStash::Pipeline
new_pipeline = pipeline_class.new(@pipeline_config, @metric, agent)
success = pipelines_registry.create_pipeline(pipeline_id, new_pipeline) do
new_pipeline.start # block until the pipeline is correctly started or crashed
end

LogStash::ConvergeResult::ActionResult.create(self, success)
end

Expand Down
27 changes: 4 additions & 23 deletions logstash-core/lib/logstash/pipeline_action/reload.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,10 @@ def execute(agent, pipelines_registry)
return LogStash::ConvergeResult::FailedAction.new("Cannot reload pipeline, because the existing pipeline is not reloadable")
end

java_exec = @pipeline_config.settings.get_value("pipeline.java_execution")

begin
pipeline_validator =
if @pipeline_config.settings.get_value("pipeline.java_execution")
LogStash::JavaBasePipeline.new(@pipeline_config, nil, logger, nil)
else
agent.exclusive do
# The Ruby pipeline initialization is not thread safe because of the module level
# shared state in LogsStash::Config::AST. When using multiple pipelines this gets
# executed simultaneously in different threads and we need to synchronize this initialization.
LogStash::BasePipeline.new(@pipeline_config)
end
end
pipeline_validator = java_exec ? LogStash::JavaBasePipeline.new(@pipeline_config, nil, logger, nil) : LogStash::BasePipeline.new(@pipeline_config)
rescue => e
return LogStash::ConvergeResult::FailedAction.from_exception(e)
end
Expand All @@ -62,18 +54,7 @@ def execute(agent, pipelines_registry)
old_pipeline.thread.join

# Then create a new pipeline
new_pipeline =
if @pipeline_config.settings.get_value("pipeline.java_execution")
LogStash::JavaPipeline.new(@pipeline_config, @metric, agent)
else
agent.exclusive do
# The Ruby pipeline initialization is not thread safe because of the module level
# shared state in LogsStash::Config::AST. When using multiple pipelines this gets
# executed simultaneously in different threads and we need to synchronize this initialization.
LogStash::Pipeline.new(@pipeline_config, @metric, agent)
end
end

new_pipeline = java_exec ? LogStash::JavaPipeline.new(@pipeline_config, @metric, agent) : LogStash::Pipeline.new(@pipeline_config, @metric, agent)
success = new_pipeline.start # block until the pipeline is correctly started or crashed

# return success and new_pipeline to registry reload_pipeline
Expand Down