Skip to content

Commit 528112c

Browse files
authoredJan 16, 2019
correctly handle pipeline actions (#10331)
1 parent ce80262 commit 528112c

File tree

3 files changed

+22
-25
lines changed

3 files changed

+22
-25
lines changed
 

‎logstash-core/lib/logstash/pipeline_action/create.rb

+11-10
Original file line numberDiff line numberDiff line change
@@ -31,29 +31,30 @@ def execution_priority
3131
# The execute assume that the thread safety access of the pipeline
3232
# is managed by the caller.
3333
def execute(agent, pipelines)
34-
pipeline =
34+
new_pipeline =
3535
if @pipeline_config.settings.get_value("pipeline.java_execution")
3636
LogStash::JavaPipeline.new(@pipeline_config, @metric, agent)
3737
else
3838
agent.exclusive do
3939
# The Ruby pipeline initialization is not thread safe because of the module level
4040
# shared state in LogsStash::Config::AST. When using multiple pipelines this gets
41-
# executed simultaneously in different threads and we need to synchonize this initialization.
41+
# executed simultaneously in different threads and we need to synchronize this initialization.
4242
LogStash::Pipeline.new(@pipeline_config, @metric, agent)
4343
end
4444
end
4545

46-
status = nil
47-
pipelines.compute(pipeline_id) do |id,value|
48-
if value
49-
LogStash::ConvergeResult::ActionResult.create(self, true)
46+
result = nil
47+
pipelines.compute(pipeline_id) do |_, current_pipeline|
48+
if current_pipeline
49+
result = LogStash::ConvergeResult::FailedAction.new("Attempted to create a pipeline that already exists")
50+
current_pipeline
51+
else
52+
result = new_pipeline.start # block until the pipeline is correctly started or crashed
53+
result ? new_pipeline : nil
5054
end
51-
status = pipeline.start # block until the pipeline is correctly started or crashed
52-
pipeline # The pipeline is successfully started we can add it to the map
5355
end
5456

55-
56-
LogStash::ConvergeResult::ActionResult.create(self, status)
57+
LogStash::ConvergeResult::ActionResult.create(self, result)
5758
end
5859

5960

‎logstash-core/lib/logstash/pipeline_action/reload.rb

+7-10
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ def execute(agent, pipelines)
3434
else
3535
agent.exclusive do
3636
# The Ruby pipeline initialization is not thread safe because of the module level
37-
# shared state in LogsStash::Config::AST. When using multiple pipelines this can gets
38-
# executed simultaneously in different threads and we need to synchonize this initialization.
37+
# shared state in LogsStash::Config::AST. When using multiple pipelines this gets
38+
# executed simultaneously in different threads and we need to synchronize this initialization.
3939
LogStash::BasePipeline.new(@pipeline_config)
4040
end
4141
end
@@ -49,15 +49,12 @@ def execute(agent, pipelines)
4949

5050
logger.info("Reloading pipeline", "pipeline.id" => pipeline_id)
5151

52-
pipelines.compute(pipeline_id) do |_,pipeline|
53-
status = Stop.new(pipeline_id).execute(agent, pipelines)
52+
stop_result = Stop.new(pipeline_id).execute(agent, pipelines)
5453

55-
if status
56-
return Create.new(@pipeline_config, @metric).execute(agent, pipelines)
57-
else
58-
return status
59-
end
60-
pipeline
54+
if stop_result.successful?
55+
Create.new(@pipeline_config, @metric).execute(agent, pipelines)
56+
else
57+
stop_result
6158
end
6259
end
6360
end

‎logstash-core/lib/logstash/pipeline_action/stop.rb

+4-5
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,13 @@ def initialize(pipeline_id)
1010
end
1111

1212
def execute(agent, pipelines)
13-
pipelines.compute(pipeline_id) do |_,pipeline|
13+
pipelines.compute(pipeline_id) do |_, pipeline|
1414
pipeline.shutdown { LogStash::ShutdownWatcher.start(pipeline) }
1515
pipeline.thread.join
16-
nil # delete the pipeline
16+
nil # remove pipeline from pipelines
1717
end
18-
# If we reach this part of the code we have succeeded because
19-
# the shutdown call will block.
20-
return LogStash::ConvergeResult::SuccessfulAction.new
18+
19+
LogStash::ConvergeResult::SuccessfulAction.new
2120
end
2221

2322
def to_s

0 commit comments

Comments
 (0)