Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify 5s linking timeout constraint #810

Merged
merged 12 commits into from
Jun 3, 2024
2 changes: 1 addition & 1 deletion lib/membrane/core/bin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ defmodule Membrane.Core.Bin do
end

defp do_handle_info(Message.new(:linking_timeout, pad_ref), state) do
PadController.handle_linking_timeout(pad_ref, state)
state = PadController.handle_linking_timeout(pad_ref, state)
{:noreply, state}
end

Expand Down
34 changes: 16 additions & 18 deletions lib/membrane/core/bin/pad_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,10 @@ defmodule Membrane.Core.Bin.PadController do
end

state = PadModel.update_data!(state, pad_ref, &%{&1 | link_id: link_id, options: pad_options})
state = maybe_handle_pad_added(pad_ref, state)

unless PadModel.get_data!(state, pad_ref, :endpoint) do
# If there's no endpoint associated to the pad, no internal link to the pad
# has been requested in the bin yet
_ref = Process.send_after(self(), Message.new(:linking_timeout, pad_ref), 5000)
:ok
end
_ref = Process.send_after(self(), Message.new(:linking_timeout, pad_ref), 5000)

state
maybe_handle_pad_added(pad_ref, state)
end

@spec remove_pad(Pad.ref(), State.t()) :: State.t()
Expand All @@ -102,15 +96,19 @@ defmodule Membrane.Core.Bin.PadController do
end
end

@spec handle_linking_timeout(Pad.ref(), State.t()) :: :ok | no_return()
@spec handle_linking_timeout(Pad.ref(), State.t()) :: State.t() | no_return()
def handle_linking_timeout(pad_ref, state) do
case PadModel.get_data(state, pad_ref) do
{:ok, %{endpoint: nil} = pad_data} ->
raise Membrane.LinkError,
"Bin pad #{inspect(pad_ref)} wasn't linked internally within timeout. Pad data: #{inspect(pad_data, pretty: true)}"
case Map.fetch(state.linking_timeout_counters, pad_ref) do
{:ok, 1} ->
Map.update!(state, :linking_timeout_counters, &Map.delete(&1, pad_ref))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remember we talked about something similar, but couldn't we store the reference to the timer in pad_data and compare it when the timeout arrives?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not a good idea, this approach generates some weird race conditions and it's implementation would be far more complex. I don't se any pros of it, only cons

Copy link
Member Author

@FelonEkonom FelonEkonom May 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Conclusion from our last talk was that we don't want to do this


_other ->
:ok
{:ok, counter} when counter > 1 ->
put_in(state.linking_timeout_counters[pad_ref], counter - 1)

_else ->
raise Membrane.LinkError, """
Bin pad #{inspect(pad_ref)} wasn't linked internally within timeout. Pad data: #{PadModel.get_data(state, pad_ref) |> inspect(pretty: true)}
"""
end
end

Expand Down Expand Up @@ -316,8 +314,8 @@ defmodule Membrane.Core.Bin.PadController do
end

@spec maybe_handle_pad_added(Pad.ref(), Core.Bin.State.t()) :: Core.Bin.State.t()
defp maybe_handle_pad_added(ref, state) do
%{options: pad_opts, availability: availability} = PadModel.get_data!(state, ref)
defp maybe_handle_pad_added(pad_ref, state) do
%{options: pad_opts, availability: availability} = PadModel.get_data!(state, pad_ref)

if Pad.availability_mode(availability) == :dynamic do
context = &CallbackContext.from_state(&1, pad_options: pad_opts)
Expand All @@ -326,7 +324,7 @@ defmodule Membrane.Core.Bin.PadController do
:handle_pad_added,
ActionHandler,
%{context: context},
[ref],
[pad_ref],
state
)
else
Expand Down
2 changes: 2 additions & 0 deletions lib/membrane/core/bin/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ defmodule Membrane.Core.Bin.State do
terminating?: boolean(),
resource_guard: Membrane.ResourceGuard.t(),
setup_incomplete?: boolean(),
linking_timeout_counters: %{optional(Pad.ref()) => integer()},
stalker: Membrane.Core.Stalker.t()
}

Expand Down Expand Up @@ -76,5 +77,6 @@ defmodule Membrane.Core.Bin.State do
resource_guard: nil,
subprocess_supervisor: nil,
children_log_metadata: [],
linking_timeout_counters: %{},
pads_data: nil
end
27 changes: 22 additions & 5 deletions lib/membrane/core/parent/child_life_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do

state =
put_in(state, [:pending_specs, spec_ref], %{
status: :initializing,
status: :created,
children_names: MapSet.new(all_children_names),
links_ids: Enum.map(links, & &1.id),
dependent_specs: dependent_specs,
Expand Down Expand Up @@ -309,14 +309,31 @@ defmodule Membrane.Core.Parent.ChildLifeController do
end
end

defp do_proceed_spec_startup(spec_ref, %{status: :created} = spec_data, state) do
state =
with %Bin.State{} <- state do
linking_timeout_counters =
spec_data.links_ids
|> Enum.map(&Map.fetch!(state.links, &1))
|> Enum.flat_map(&[&1.from, &1.to])
|> Enum.filter(&(&1.child == {Membrane.Bin, :itself}))
|> Enum.reduce(
state.linking_timeout_counters,
&Map.update(&2, &1.pad_ref, 1, fn i -> i + 1 end)
)

%{state | linking_timeout_counters: linking_timeout_counters}
end

do_proceed_spec_startup(spec_ref, %{spec_data | status: :initializing}, state)
end

defp do_proceed_spec_startup(spec_ref, %{status: :initializing} = spec_data, state) do
Membrane.Logger.debug(
"Proceeding spec #{inspect(spec_ref)} startup: initializing, dependent specs: #{inspect(MapSet.to_list(spec_data.dependent_specs))}"
)

%{children: children} = state

if Enum.all?(spec_data.children_names, &Map.fetch!(children, &1).initialized?) and
if Enum.all?(spec_data.children_names, &Map.fetch!(state.children, &1).initialized?) and
Enum.empty?(spec_data.dependent_specs) do
Membrane.Logger.debug("Spec #{inspect(spec_ref)} status changed to initialized")
do_proceed_spec_startup(spec_ref, %{spec_data | status: :initialized}, state)
Expand Down Expand Up @@ -351,7 +368,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do
end

defp do_proceed_spec_startup(spec_ref, %{status: :linking_internally} = spec_data, state) do
if Enum.empty?(spec_data.awaiting_responses) do
if MapSet.size(spec_data.awaiting_responses) == 0 do
state =
spec_data.links_ids
|> Enum.map(&Map.fetch!(state.links, &1))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ defmodule Membrane.Core.Parent.ChildLifeController.CrashGroupUtils do
# and we will not want to have it in :crash_group_members in the callback context in handle_crash_group_down/3,
# so this child is removed from :members in crash group struct
members = List.delete(group.members, child_name)
state = put_in(state, [:crash_groups, group.name, :members], members)
state = put_in(state.crash_groups[group.name].members, members)

if group.detonating? and Enum.all?(members, &(not Map.has_key?(state.children, &1))) do
cleanup_crash_group(group.name, state)
Expand Down
1 change: 1 addition & 0 deletions test/membrane/integration/linking_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ defmodule Membrane.Integration.LinkingTest do
Membrane.Pipeline.terminate(pipeline)
end

@tag :xd
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

xd

test "Bin should crash if it doesn't link internally within timeout" do
defmodule NoInternalLinkBin do
use Membrane.Bin
Expand Down