Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Membrane.Pipeline.terminate/2 behavior #519

Merged
merged 6 commits into from
Feb 14, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 56 additions & 24 deletions lib/membrane/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ defmodule Membrane.Pipeline do
use Bunch

alias __MODULE__.{Action, CallbackContext}
alias Membrane.{Child, Pad}
alias Membrane.{Child, Pad, PipelineError}

require Membrane.Logger
require Membrane.Core.Message, as: Message
Expand Down Expand Up @@ -295,43 +295,75 @@ defmodule Membrane.Pipeline do
end

@doc """
Gracefully terminates the pipeline.

Accepts two options:
* `blocking?` - tells whether to stop the pipeline synchronously
* `timeout` - if `blocking?` is set to true it tells how much
time (ms) to wait for pipeline to get terminated. Defaults to 5000.
Terminates the pipeline.

Accepts three options:
* `asynchronous?` - if set to `true`, pipline termination won't be blocking and
varsill marked this conversation as resolved.
Show resolved Hide resolved
will be executed in the process, which pid is returned as function result. If
set to `false`, pipeline termination will be blocking and will be executed in
the process that called this function. Defaults to `false`.
* `timeout` - tells how much time (ms) to wait for pipeline to get gracefully
terminated. Defaults to 5000.
* `force?` - if set to `true` and pipeline is still alive after `timeout`,
pipeline will be killed using `Process.exit/2` with reason `:kill`, and function
will return `{:error, :timeout}`. If set to `false` and pipeline is still alive
after `timeout`, function will raise an error. Defaults to `false`.

Returns:
* `{:ok, pid}` - if option `asynchronous?: true` was passed.
* `:ok` - if pipeline was gracefully terminated within `timeout`.
* `{:error, :timeout}` - if pipeline was killed after a `timeout`.
"""
@spec terminate(pipeline :: pid, Keyword.t()) ::
:ok | {:error, :timeout}
@spec terminate(pipeline :: pid, timeout: timeout(), force?: boolean(), asynchronous?: boolean()) ::
:ok | {:ok, pid()} | {:error, :timeout}
def terminate(pipeline, opts \\ []) do
blocking? = Keyword.get(opts, :blocking?, false)
timeout = Keyword.get(opts, :timeout, 5000)

ref = if blocking?, do: Process.monitor(pipeline)
Message.send(pipeline, :terminate)
[asynchronous?: asynchronous?] ++ opts =
Keyword.validate!(opts,
asynchronous?: false,
force?: false,
timeout: 5000
)
|> Enum.sort()

if(blocking?,
do: wait_for_down(ref, timeout),
else: :ok
)
if asynchronous? do
Task.start(__MODULE__, :do_terminate, [pipeline, opts])
else
do_terminate(pipeline, opts)
end
end

@spec call(pid, any, integer()) :: :ok
def call(pipeline, message, timeout \\ 5000) do
GenServer.call(pipeline, message, timeout)
end
@doc false
@spec do_terminate(pipeline :: pid, timeout: timeout(), force?: boolean()) ::
:ok | {:error, :timeout}
def do_terminate(pipeline, opts) do
timeout = Keyword.get(opts, :timeout)
force? = Keyword.get(opts, :force?)

ref = Process.monitor(pipeline)
Message.send(pipeline, :terminate)

defp wait_for_down(ref, timeout) do
receive do
{:DOWN, ^ref, _process, _pid, _reason} ->
:ok
after
timeout ->
{:error, :timeout}
if force? do
Process.exit(pipeline, :kill)
{:error, :timeout}
else
raise PipelineError, """
Pipeline #{inspect(pipeline)} hasn't terminated within given timeout (#{inspect(timeout)} ms).
If you want to kill it anyway, use `force?: true` option.
"""
end
end
end

@spec call(pid, any, timeout()) :: :ok
def call(pipeline, message, timeout \\ 5000) do
GenServer.call(pipeline, message, timeout)
end

@doc """
Checks whether module is a pipeline.
"""
Expand Down
2 changes: 1 addition & 1 deletion lib/membrane/rc_pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ defmodule Membrane.RCPipeline do
def handle_terminate_request(_ctx, state) do
pipeline_event = %Terminated{from: self()}
send_event_to_controller_if_subscribed(pipeline_event, state)
{[], state}
{[terminate: :normal], state}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, that's odd that that one wasn't terminating :O

end

defp send_event_to_controller_if_subscribed(message, state) do
Expand Down
2 changes: 1 addition & 1 deletion test/membrane/core/pipeline_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ defmodule Membrane.Core.PipelineTest do

test "Pipeline can be terminated synchronously" do
pid = Testing.Pipeline.start_link_supervised!(module: TestPipeline)
assert :ok == Testing.Pipeline.terminate(pid, blocking?: true)
assert :ok == Testing.Pipeline.terminate(pid)
end

test "Pipeline should be able to terminate itself with :terminate action" do
Expand Down
2 changes: 1 addition & 1 deletion test/membrane/integration/auto_demands_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ defmodule Membrane.Integration.AutoDemandsTest do
end
end)

Pipeline.terminate(pipeline, blocking?: true)
Pipeline.terminate(pipeline)
refute_sink_buffer(pipeline, :sink, _buffer, 0)
end

Expand Down
2 changes: 1 addition & 1 deletion test/membrane/integration/bin_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ defmodule Membrane.Core.BinTest do
refute is_nil(clock2)

assert proxy_for?(clock1, clock2)
ClockPipeline.terminate(pid, blocking?: true)
ClockPipeline.terminate(pid)
end

test "handle_parent_notification/3 works for Bin" do
Expand Down
12 changes: 9 additions & 3 deletions test/membrane/integration/child_removal_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,9 @@ defmodule Membrane.Integration.ChildRemovalTest do
)

monitor = Process.monitor(pipeline)
Testing.Pipeline.terminate(pipeline)
Testing.Pipeline.terminate(pipeline, asynchronous?: true)
Process.sleep(100)

assert %{module: Membrane.Core.Pipeline.Zombie} = :sys.get_state(pipeline)
send(RemovalDeferSource, :terminate)
send(RemovalDeferSink, :terminate)
Expand All @@ -209,7 +211,9 @@ defmodule Membrane.Integration.ChildRemovalTest do
)

monitor = Process.monitor(pipeline)
Testing.Pipeline.terminate(pipeline)
Testing.Pipeline.terminate(pipeline, asynchronous?: true)
Process.sleep(100)

assert %{module: Membrane.Core.Pipeline.Zombie} = :sys.get_state(pipeline)
assert_receive {RemovalDeferBin, :terminate_request}
assert %{module: Membrane.Core.Bin.Zombie} = :sys.get_state(RemovalDeferBin)
Expand All @@ -227,7 +231,9 @@ defmodule Membrane.Integration.ChildRemovalTest do
)

pipeline_monitor = Process.monitor(pipeline)
Testing.Pipeline.terminate(pipeline)
Testing.Pipeline.terminate(pipeline, asynchronous?: true)
Process.sleep(100)

assert %{module: Membrane.Core.Pipeline.Zombie} = :sys.get_state(pipeline)
assert_receive {RemovalDeferBin, :terminate_request}
assert %{module: RemovalDeferBin} = :sys.get_state(RemovalDeferBin)
Expand Down
10 changes: 5 additions & 5 deletions test/membrane/integration/child_spawn_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ defmodule Membrane.Integration.ChildSpawnTest do
Testing.Pipeline.execute_actions(pipeline_pid, spec: spec2)
assert_receive {:DOWN, ^pipeline_ref, :process, ^pipeline_pid, {reason, _stack_trace}}
assert reason.message =~ ~r/Cannot create children groups with ids: \[:source\]/
Testing.Pipeline.terminate(pipeline_pid, blocking?: true)
Testing.Pipeline.terminate(pipeline_pid)
end

test "if the pipeline raises an exception when a children group with the same name as an exisiting child is added" do
Expand All @@ -169,7 +169,7 @@ defmodule Membrane.Integration.ChildSpawnTest do
Testing.Pipeline.execute_actions(pipeline_pid, spec: spec2)
assert_receive {:DOWN, ^pipeline_ref, :process, ^pipeline_pid, {reason, _stack_trace}}
assert reason.message =~ ~r/Cannot spawn children with names: \[:first_group\]/
Testing.Pipeline.terminate(pipeline_pid, blocking?: true)
Testing.Pipeline.terminate(pipeline_pid)
end

test "if the pipeline raises an exception when a children group and a child with the same names are added" do
Expand All @@ -186,7 +186,7 @@ defmodule Membrane.Integration.ChildSpawnTest do
assert reason.message =~
~r/Cannot proceed, since the children group ids and children names created in this process are duplicating: \[:first_group\]/

Testing.Pipeline.terminate(pipeline_pid, blocking?: true)
Testing.Pipeline.terminate(pipeline_pid)
end

test "if children can be spawned anonymously" do
Expand All @@ -195,7 +195,7 @@ defmodule Membrane.Integration.ChildSpawnTest do
Testing.Pipeline.execute_actions(pipeline_pid, spec: spec)
assert_pipeline_play(pipeline_pid)

Testing.Pipeline.terminate(pipeline_pid, blocking?: true)
Testing.Pipeline.terminate(pipeline_pid)
end

test "if the pipeline raises an exception when there is an attempt to spawn a child with a name satisfying the Membrane's reserved pattern" do
Expand All @@ -217,6 +217,6 @@ defmodule Membrane.Integration.ChildSpawnTest do
assert reason.message =~
~r/Improper name: {Membrane.Child, :first_group, :source}/

Testing.Pipeline.terminate(pipeline_pid, blocking?: true)
Testing.Pipeline.terminate(pipeline_pid)
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ defmodule Membrane.Integration.ElementsCompatibilityTest do
end

assert Enum.join(Utilities.buffer()) == state_dump |> Enum.reverse() |> Enum.join()
Pipeline.terminate(pid, blocking?: true)
Pipeline.terminate(pid)
end

test "if sink demanding in bytes receives all the data and no more then demanded number of bytes at once" do
Expand Down
2 changes: 1 addition & 1 deletion test/membrane/integration/linking_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ defmodule Membrane.Integration.LinkingTest do
)

assert_end_of_stream(pipeline, :sink)
Membrane.Pipeline.terminate(pipeline, blocking?: true)
Membrane.Pipeline.terminate(pipeline)
end

test "Bin should crash if it doesn't link internally within timeout" do
Expand Down
2 changes: 1 addition & 1 deletion test/membrane/integration/sync_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ defmodule Membrane.Integration.SyncTest do
pipeline = Testing.Pipeline.start_link_supervised!(pipeline_opts)
assert_pipeline_notified(pipeline, :source, :start_timer)
Process.sleep(tick_interval * tries)
Testing.Pipeline.terminate(pipeline, blocking?: true)
Testing.Pipeline.terminate(pipeline)
ticks_amount = Sync.Helper.receive_ticks()
assert_in_delta ticks_amount, tries, @tick_number_error
end
Expand Down
4 changes: 2 additions & 2 deletions test/membrane/integration/timer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ defmodule Membrane.Integration.TimerTest do
assert_pipeline_notified(pipeline, :element, :tick)
assert_pipeline_notified(pipeline, :bin, :tick)
assert_receive :pipeline_tick
Testing.Pipeline.terminate(pipeline, blocking?: true)
Testing.Pipeline.terminate(pipeline)
end

defmodule StopNoInterval do
Expand All @@ -89,6 +89,6 @@ defmodule Membrane.Integration.TimerTest do

assert_pipeline_play(pipeline)
assert_pipeline_notified(pipeline, :element, :ok)
Testing.Pipeline.terminate(pipeline, blocking?: true)
Testing.Pipeline.terminate(pipeline)
end
end
2 changes: 1 addition & 1 deletion test/membrane/remote_controlled/pipeline_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ defmodule Membrane.RCPipelineTest do
assert_receive %RCMessage.EndOfStream{from: ^pipeline, element: :c, pad: :input}

# STOP
RCPipeline.terminate(pipeline, blocking?: true)
RCPipeline.terminate(pipeline)

# TEST
refute_receive %RCMessage.Terminated{from: ^pipeline}
Expand Down
2 changes: 1 addition & 1 deletion test/membrane/resource_guard_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ defmodule Membrane.ResourceGuardTest do

assert :ready = Membrane.Pipeline.call(pipeline, :setup_guard)
monitor_ref = Process.monitor(:membrane_resource_guard_test_pipeline_resource)
Membrane.Pipeline.terminate(pipeline, blocking?: true)
Membrane.Pipeline.terminate(pipeline)
assert_receive {:DOWN, ^monitor_ref, :process, _pid, :shutdown}
end

Expand Down