From 270fb9da8664174b59b77e4c170ca7327f864e75 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Thu, 26 Jan 2023 18:31:35 +0100 Subject: [PATCH] Remove link from specs, when child removes it's pad --- lib/membrane/core/element/pad_controller.ex | 1 + .../core/parent/child_life_controller.ex | 123 ++++++++++++++++-- .../child_life_controller/link_utils.ex | 59 +++++---- .../integration/child_pad_removed_test.exs | 2 - 4 files changed, 147 insertions(+), 38 deletions(-) diff --git a/lib/membrane/core/element/pad_controller.ex b/lib/membrane/core/element/pad_controller.ex index 1c84927c3..c6d950d41 100644 --- a/lib/membrane/core/element/pad_controller.ex +++ b/lib/membrane/core/element/pad_controller.ex @@ -207,6 +207,7 @@ defmodule Membrane.Core.Element.PadController do {:error, :unknown_pad} -> with false <- state.terminating?, + :hard <- mode, %{availability: :always} <- state.pads_info[Pad.name_by_ref(pad_ref)] do raise Membrane.PadError, "Tried to unlink a static pad #{inspect(pad_ref)}, before it was linked. Static pads cannot be unlinked unless element is terminating" diff --git a/lib/membrane/core/parent/child_life_controller.ex b/lib/membrane/core/parent/child_life_controller.ex index b3196453d..9dcf6b3e7 100644 --- a/lib/membrane/core/parent/child_life_controller.ex +++ b/lib/membrane/core/parent/child_life_controller.ex @@ -144,16 +144,16 @@ defmodule Membrane.Core.Parent.ChildLifeController do resolved_links = LinkUtils.resolve_links(links, spec_ref, state) state = %{state | links: Map.merge(state.links, Map.new(resolved_links, &{&1.id, &1}))} + # resolved_links + # |> Enum.flat_map(&[&1.from.child_spec_ref, &1.to.child_spec_ref]) + # |> Enum.filter(fn spec_ref -> + # get_in(state, [:pending_specs, spec_ref, :status]) in @spec_dependency_requiring_statuses + # end) + # |> MapSet.new() dependent_specs = - # resolved_links - # |> Enum.flat_map(&[&1.from.child_spec_ref, &1.to.child_spec_ref]) - # |> Enum.filter(fn spec_ref -> - # get_in(state, [:pending_specs, spec_ref, :status]) in @spec_dependency_requiring_statuses - # end) - # |> MapSet.new() resolved_links |> Enum.flat_map(&[&1.from, &1.to]) - |> Enum.map(& {&1.child_spec_ref, &1.child}) + |> Enum.map(&{&1.child_spec_ref, &1.child}) |> Enum.filter(fn {spec_ref, _child} -> get_in(state, [:pending_specs, spec_ref, :status]) in @spec_dependency_requiring_statuses end) @@ -494,10 +494,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do end @spec handle_remove_children( - Child.ref() - | [Child.ref()] - | Child.group() - | [Child.group()], + Child.ref() | [Child.ref()] | Child.group() | [Child.group()], Parent.state() ) :: Parent.state() def handle_remove_children(children_or_children_groups, state) do @@ -539,6 +536,110 @@ defmodule Membrane.Core.Parent.ChildLifeController do LinkUtils.remove_link(child_name, pad_ref, state) end + defp remove_children_from_specs(children, state) do + children = Bunch.listify(children) + children_set = MapSet.new(children) + + children_links_ids_set = + state.links + |> Enum.filter(&(&1.from.child in children_set or &1.to.child in children_set)) + |> MapSet.new(& &1.id) + + affected_specs = + state.pending_specs + |> Enum.filter(fn {_ref, spec_data} -> + Enum.any?(spec_data.children, &(&1 in children_set)) or + Enum.any?(spec_data.links, &(&1 in children_links_ids_set)) + end) + + updated_specs = + affected_specs + |> Map.new(fn {spec_ref, spec_data} -> + children_names = + spec_data.children_names + |> Enum.reject(&(&1 in children_set)) + + links_ids = Enum.reject(spec_data.links_ids, &(&1 in children_links_ids_set)) + + awaiting_responses = + spec_data.awaiting_responses + |> Enum.reject(fn {link_id, _direction} -> link_id in children_links_ids_set end) + |> MapSet.new() + + dependent_specs = + spec_data.dependent_specs + |> Enum.map(fn {ref, spec_children} -> + {ref, Enum.reject(spec_children, &(&1 in children_set))} + end) + |> Enum.reject(&match?({_ref, []}, &1)) + |> Map.new() + + spec_data = %{ + spec_data + | children_names: children_names, + links_ids: links_ids, + awaiting_responses: awaiting_responses, + dependent_specs: dependent_specs + } + + {spec_ref, spec_data} + end) + + state = Map.update!(state, :pending_specs, &Map.merge(&1, updated_specs)) + + Enum.reduce(updated_specs, state, fn {spec_ref, _spec_data}, state -> + proceed_spec_startup(spec_ref, state) + end) + end + + @spec remove_link_from_spec(Link.id(), Parent.state()) :: Parent.state() + def remove_link_from_spec(link_id, state) do + {:ok, removed_link} = Map.fetch(state.links, link_id) + spec_ref = removed_link.spec_ref + + with {:ok, spec_data} <- Map.fetch(state.pending_specs, spec_ref) do + links_ids = Enum.reject(spec_data.links_ids, &(&1 == link_id)) + + spec_links_endpoints = + Enum.flat_map(links_ids, fn id -> + link = state.links[id] + [link.from.child, link.to.child] + end) + + dependent_specs = + [removed_link.from.child, removed_link.to.child] + |> Enum.filter(&(&1 not in spec_links_endpoints)) + |> case do + [] -> + spec_data.dependent_specs + + endpoints_to_remove -> + spec_data.dependent_specs + |> Enum.map(fn {spec_ref, spec_children} -> + {spec_ref, Enum.reject(spec_children, &(&1 in endpoints_to_remove))} + end) + |> Enum.reject(&match?({_ref, []}, &1)) + |> Map.new() + end + + awaiting_responses = + spec_data.awaiting_responses + |> MapSet.difference(MapSet.new([{link_id, :input}, {link_id, :output}])) + + spec_data = %{ + spec_data + | dependent_specs: dependent_specs, + links_ids: links_ids, + awaiting_responses: awaiting_responses + } + + state = put_in(state, [:pending_specs, spec_ref], spec_data) + proceed_spec_startup(spec_ref, state) + else + :error -> state + end + end + @spec handle_child_pad_removed(Child.name(), Pad.ref(), Parent.state()) :: Parent.state() def handle_child_pad_removed(child, pad, state) do # TODO: when spec is not ready yet, delete specific link from it and trigger proceeding diff --git a/lib/membrane/core/parent/child_life_controller/link_utils.ex b/lib/membrane/core/parent/child_life_controller/link_utils.ex index b65e19d52..85b49c1d1 100644 --- a/lib/membrane/core/parent/child_life_controller/link_utils.ex +++ b/lib/membrane/core/parent/child_life_controller/link_utils.ex @@ -38,36 +38,42 @@ defmodule Membrane.Core.Parent.ChildLifeController.LinkUtils do def handle_child_pad_removed(child, pad, state) do {:ok, link} = get_link(state.links, child, pad) - opposite_endpoint(link, child) - |> case do - %Endpoint{child: {Membrane.Bin, :itself}} = bin_endpoint -> - PadController.remove_dynamic_pad!(bin_endpoint.pad_ref, state) + state = + opposite_endpoint(link, child) + |> case do + %Endpoint{child: {Membrane.Bin, :itself}} = bin_endpoint -> + PadController.remove_dynamic_pad!(bin_endpoint.pad_ref, state) - %Endpoint{} = endpoint -> - send_handle_unlink(endpoint, state) - state - end - |> delete_link(link) + %Endpoint{} = endpoint -> + send_handle_unlink(endpoint, state) + state + end + + state = ChildLifeController.remove_link_from_spec(link.id, state) + + delete_link(link, state) end @spec remove_link(Child.name(), Pad.ref(), Parent.state()) :: Parent.state() def remove_link(child_name, pad_ref, state) do with {:ok, link} <- get_link(state.links, child_name, pad_ref) do - if {Membrane.Bin, :itself} in [link.from.child, link.to.child] do - child_endpoint = opposite_endpoint(link, {Membrane.Bin, :itself}) - send_handle_unlink(child_endpoint, state) + state = + if {Membrane.Bin, :itself} in [link.from.child, link.to.child] do + child_endpoint = opposite_endpoint(link, {Membrane.Bin, :itself}) + send_handle_unlink(child_endpoint, state) + + bin_endpoint = opposite_endpoint(link, child_endpoint.child) + PadController.remove_dynamic_pad!(bin_endpoint.pad_ref, state) + else + for endpoint <- [link.from, link.to] do + send_handle_unlink(endpoint, state) + end - bin_endpoint = opposite_endpoint(link, child_endpoint.child) - state = PadController.remove_dynamic_pad!(bin_endpoint.pad_ref, state) - - delete_link(state, link) - else - for endpoint <- [link.from, link.to] do - send_handle_unlink(endpoint, state) + state end - delete_link(state, link) - end + state = ChildLifeController.remove_link_from_spec(link.id, state) + delete_link(link, state) else {:error, :not_found} -> with %{^child_name => _child_entry} <- state.children do @@ -184,9 +190,8 @@ defmodule Membrane.Core.Parent.ChildLifeController.LinkUtils do defp opposite_endpoint(%Link{to: %Endpoint{child: child}, from: from}, child), do: from - defp delete_link(state, link) do - links = Map.delete(state.links, link.id) - state = Map.put(state, :links, links) + defp delete_link(link, state) do + {_link, state} = pop_in(state, [:links, link.id]) spec_ref = link.spec_ref with {:ok, spec_data} <- Map.fetch(state.pending_specs, spec_ref) do @@ -328,7 +333,11 @@ defmodule Membrane.Core.Parent.ChildLifeController.LinkUtils do end defp send_handle_unlink(%Endpoint{child: child} = endpoint, state) do - mode = if state.children[child].terminating?, do: :soft, else: :hard + mode = + if state.children[child].terminating?, + do: :soft, + else: :hard + Message.send(endpoint.pid, :handle_unlink, [endpoint.pad_ref, mode]) end end diff --git a/test/membrane/integration/child_pad_removed_test.exs b/test/membrane/integration/child_pad_removed_test.exs index 83665c0c0..86d8b00bd 100644 --- a/test/membrane/integration/child_pad_removed_test.exs +++ b/test/membrane/integration/child_pad_removed_test.exs @@ -1,8 +1,6 @@ defmodule Membrane.Integration.ChildPadRemovedTest do use ExUnit.Case, async: false - import Membrane.Testing.Assertions - alias Membrane.Testing require Membrane.Pad, as: Pad