Skip to content

Commit

Permalink
Remove link from specs, when child removes it's pad
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Jan 27, 2023
1 parent f346161 commit 270fb9d
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 38 deletions.
1 change: 1 addition & 0 deletions lib/membrane/core/element/pad_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ defmodule Membrane.Core.Element.PadController do

{:error, :unknown_pad} ->
with false <- state.terminating?,
:hard <- mode,
%{availability: :always} <- state.pads_info[Pad.name_by_ref(pad_ref)] do
raise Membrane.PadError,
"Tried to unlink a static pad #{inspect(pad_ref)}, before it was linked. Static pads cannot be unlinked unless element is terminating"
Expand Down
123 changes: 112 additions & 11 deletions lib/membrane/core/parent/child_life_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -144,16 +144,16 @@ defmodule Membrane.Core.Parent.ChildLifeController do
resolved_links = LinkUtils.resolve_links(links, spec_ref, state)
state = %{state | links: Map.merge(state.links, Map.new(resolved_links, &{&1.id, &1}))}

# resolved_links
# |> Enum.flat_map(&[&1.from.child_spec_ref, &1.to.child_spec_ref])
# |> Enum.filter(fn spec_ref ->
# get_in(state, [:pending_specs, spec_ref, :status]) in @spec_dependency_requiring_statuses
# end)
# |> MapSet.new()
dependent_specs =
# resolved_links
# |> Enum.flat_map(&[&1.from.child_spec_ref, &1.to.child_spec_ref])
# |> Enum.filter(fn spec_ref ->
# get_in(state, [:pending_specs, spec_ref, :status]) in @spec_dependency_requiring_statuses
# end)
# |> MapSet.new()
resolved_links
|> Enum.flat_map(&[&1.from, &1.to])
|> Enum.map(& {&1.child_spec_ref, &1.child})
|> Enum.map(&{&1.child_spec_ref, &1.child})
|> Enum.filter(fn {spec_ref, _child} ->
get_in(state, [:pending_specs, spec_ref, :status]) in @spec_dependency_requiring_statuses
end)
Expand Down Expand Up @@ -494,10 +494,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do
end

@spec handle_remove_children(
Child.ref()
| [Child.ref()]
| Child.group()
| [Child.group()],
Child.ref() | [Child.ref()] | Child.group() | [Child.group()],
Parent.state()
) :: Parent.state()
def handle_remove_children(children_or_children_groups, state) do
Expand Down Expand Up @@ -539,6 +536,110 @@ defmodule Membrane.Core.Parent.ChildLifeController do
LinkUtils.remove_link(child_name, pad_ref, state)
end

defp remove_children_from_specs(children, state) do
children = Bunch.listify(children)
children_set = MapSet.new(children)

children_links_ids_set =
state.links
|> Enum.filter(&(&1.from.child in children_set or &1.to.child in children_set))
|> MapSet.new(& &1.id)

affected_specs =
state.pending_specs
|> Enum.filter(fn {_ref, spec_data} ->
Enum.any?(spec_data.children, &(&1 in children_set)) or
Enum.any?(spec_data.links, &(&1 in children_links_ids_set))
end)

updated_specs =
affected_specs
|> Map.new(fn {spec_ref, spec_data} ->
children_names =
spec_data.children_names
|> Enum.reject(&(&1 in children_set))

links_ids = Enum.reject(spec_data.links_ids, &(&1 in children_links_ids_set))

awaiting_responses =
spec_data.awaiting_responses
|> Enum.reject(fn {link_id, _direction} -> link_id in children_links_ids_set end)
|> MapSet.new()

dependent_specs =
spec_data.dependent_specs
|> Enum.map(fn {ref, spec_children} ->
{ref, Enum.reject(spec_children, &(&1 in children_set))}
end)
|> Enum.reject(&match?({_ref, []}, &1))
|> Map.new()

spec_data = %{
spec_data
| children_names: children_names,
links_ids: links_ids,
awaiting_responses: awaiting_responses,
dependent_specs: dependent_specs
}

{spec_ref, spec_data}
end)

state = Map.update!(state, :pending_specs, &Map.merge(&1, updated_specs))

Enum.reduce(updated_specs, state, fn {spec_ref, _spec_data}, state ->
proceed_spec_startup(spec_ref, state)
end)
end

@spec remove_link_from_spec(Link.id(), Parent.state()) :: Parent.state()
def remove_link_from_spec(link_id, state) do
{:ok, removed_link} = Map.fetch(state.links, link_id)
spec_ref = removed_link.spec_ref

with {:ok, spec_data} <- Map.fetch(state.pending_specs, spec_ref) do
links_ids = Enum.reject(spec_data.links_ids, &(&1 == link_id))

spec_links_endpoints =
Enum.flat_map(links_ids, fn id ->
link = state.links[id]
[link.from.child, link.to.child]
end)

dependent_specs =
[removed_link.from.child, removed_link.to.child]
|> Enum.filter(&(&1 not in spec_links_endpoints))
|> case do
[] ->
spec_data.dependent_specs

endpoints_to_remove ->
spec_data.dependent_specs
|> Enum.map(fn {spec_ref, spec_children} ->
{spec_ref, Enum.reject(spec_children, &(&1 in endpoints_to_remove))}
end)
|> Enum.reject(&match?({_ref, []}, &1))
|> Map.new()
end

awaiting_responses =
spec_data.awaiting_responses
|> MapSet.difference(MapSet.new([{link_id, :input}, {link_id, :output}]))

spec_data = %{
spec_data
| dependent_specs: dependent_specs,
links_ids: links_ids,
awaiting_responses: awaiting_responses
}

state = put_in(state, [:pending_specs, spec_ref], spec_data)
proceed_spec_startup(spec_ref, state)
else
:error -> state
end
end

@spec handle_child_pad_removed(Child.name(), Pad.ref(), Parent.state()) :: Parent.state()
def handle_child_pad_removed(child, pad, state) do
# TODO: when spec is not ready yet, delete specific link from it and trigger proceeding
Expand Down
59 changes: 34 additions & 25 deletions lib/membrane/core/parent/child_life_controller/link_utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,36 +38,42 @@ defmodule Membrane.Core.Parent.ChildLifeController.LinkUtils do
def handle_child_pad_removed(child, pad, state) do
{:ok, link} = get_link(state.links, child, pad)

opposite_endpoint(link, child)
|> case do
%Endpoint{child: {Membrane.Bin, :itself}} = bin_endpoint ->
PadController.remove_dynamic_pad!(bin_endpoint.pad_ref, state)
state =
opposite_endpoint(link, child)
|> case do
%Endpoint{child: {Membrane.Bin, :itself}} = bin_endpoint ->
PadController.remove_dynamic_pad!(bin_endpoint.pad_ref, state)

%Endpoint{} = endpoint ->
send_handle_unlink(endpoint, state)
state
end
|> delete_link(link)
%Endpoint{} = endpoint ->
send_handle_unlink(endpoint, state)
state
end

state = ChildLifeController.remove_link_from_spec(link.id, state)

delete_link(link, state)
end

@spec remove_link(Child.name(), Pad.ref(), Parent.state()) :: Parent.state()
def remove_link(child_name, pad_ref, state) do
with {:ok, link} <- get_link(state.links, child_name, pad_ref) do
if {Membrane.Bin, :itself} in [link.from.child, link.to.child] do
child_endpoint = opposite_endpoint(link, {Membrane.Bin, :itself})
send_handle_unlink(child_endpoint, state)
state =
if {Membrane.Bin, :itself} in [link.from.child, link.to.child] do
child_endpoint = opposite_endpoint(link, {Membrane.Bin, :itself})
send_handle_unlink(child_endpoint, state)

bin_endpoint = opposite_endpoint(link, child_endpoint.child)
PadController.remove_dynamic_pad!(bin_endpoint.pad_ref, state)
else
for endpoint <- [link.from, link.to] do
send_handle_unlink(endpoint, state)
end

bin_endpoint = opposite_endpoint(link, child_endpoint.child)
state = PadController.remove_dynamic_pad!(bin_endpoint.pad_ref, state)

delete_link(state, link)
else
for endpoint <- [link.from, link.to] do
send_handle_unlink(endpoint, state)
state
end

delete_link(state, link)
end
state = ChildLifeController.remove_link_from_spec(link.id, state)
delete_link(link, state)
else
{:error, :not_found} ->
with %{^child_name => _child_entry} <- state.children do
Expand Down Expand Up @@ -184,9 +190,8 @@ defmodule Membrane.Core.Parent.ChildLifeController.LinkUtils do

defp opposite_endpoint(%Link{to: %Endpoint{child: child}, from: from}, child), do: from

defp delete_link(state, link) do
links = Map.delete(state.links, link.id)
state = Map.put(state, :links, links)
defp delete_link(link, state) do
{_link, state} = pop_in(state, [:links, link.id])
spec_ref = link.spec_ref

with {:ok, spec_data} <- Map.fetch(state.pending_specs, spec_ref) do
Expand Down Expand Up @@ -328,7 +333,11 @@ defmodule Membrane.Core.Parent.ChildLifeController.LinkUtils do
end

defp send_handle_unlink(%Endpoint{child: child} = endpoint, state) do
mode = if state.children[child].terminating?, do: :soft, else: :hard
mode =
if state.children[child].terminating?,
do: :soft,
else: :hard

Message.send(endpoint.pid, :handle_unlink, [endpoint.pad_ref, mode])
end
end
2 changes: 0 additions & 2 deletions test/membrane/integration/child_pad_removed_test.exs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
defmodule Membrane.Integration.ChildPadRemovedTest do
use ExUnit.Case, async: false

import Membrane.Testing.Assertions

alias Membrane.Testing

require Membrane.Pad, as: Pad
Expand Down

0 comments on commit 270fb9d

Please sign in to comment.