Skip to content

Commit

Permalink
Refactor removing children and links, while specs are still proceeded
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Feb 7, 2023
1 parent 16a6832 commit 28f1644
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 100 deletions.
137 changes: 51 additions & 86 deletions lib/membrane/core/parent/child_life_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ defmodule Membrane.Core.Parent.ChildLifeController do
| :linked_internally
| :linking_externally
| :ready,
children_names: [Child.name()],
links_ids: [Link.id()],
children_names: MapSet.t(Child.name()),
links_ids: MapSet.t(Link.id()),
awaiting_responses: MapSet.t({Link.id(), Membrane.Pad.direction()}),
dependencies: %{spec_ref() => [Child.name()]}
dependent_specs: MapSet.t(spec_ref())
}

@type pending_specs :: %{spec_ref() => pending_spec()}
Expand Down Expand Up @@ -144,21 +144,21 @@ defmodule Membrane.Core.Parent.ChildLifeController do
resolved_links = LinkUtils.resolve_links(links, spec_ref, state)
state = %{state | links: Map.merge(state.links, Map.new(resolved_links, &{&1.id, &1}))}

dependencies =
dependent_specs =
resolved_links
|> Enum.flat_map(&[&1.from, &1.to])
|> Enum.map(&{&1.child_spec_ref, &1.child})
|> Enum.filter(fn {spec_ref, _child} ->
|> Enum.flat_map(&[&1.from.child_spec_ref, &1.to.child_spec_ref])
|> Enum.uniq()
|> Enum.filter(fn spec_ref ->
get_in(state, [:pending_specs, spec_ref, :status]) in @spec_dependency_requiring_statuses
end)
|> Enum.group_by(fn {spec_ref, _child} -> spec_ref end)
|> MapSet.new()

state =
put_in(state, [:pending_specs, spec_ref], %{
status: :initializing,
children_names: all_children_names,
links_ids: Enum.map(links, & &1.id),
dependencies: dependencies,
children_names: MapSet.new(all_children_names),
links_ids: MapSet.new(links, & &1.id),
dependent_specs: dependent_specs,
awaiting_responses: MapSet.new()
})

Expand Down Expand Up @@ -347,13 +347,13 @@ defmodule Membrane.Core.Parent.ChildLifeController do

defp do_proceed_spec_startup(spec_ref, %{status: :initializing} = spec_data, state) do
Membrane.Logger.debug(
"Proceeding spec #{inspect(spec_ref)} startup: initializing, dependent specs: #{inspect(Map.keys(spec_data.dependencies))}"
"Proceeding spec #{inspect(spec_ref)} startup: initializing, dependent specs: #{inspect(MapSet.to_list(spec_data.dependent_specs))}"
)

%{children: children} = state

if Enum.all?(spec_data.children_names, &Map.fetch!(children, &1).initialized?) and
Enum.empty?(spec_data.dependencies) do
Enum.empty?(spec_data.dependent_specs) do
Membrane.Logger.debug("Spec #{inspect(spec_ref)} status changed to initialized")
do_proceed_spec_startup(spec_ref, %{spec_data | status: :initialized}, state)
else
Expand Down Expand Up @@ -536,114 +536,79 @@ defmodule Membrane.Core.Parent.ChildLifeController do
LinkUtils.remove_link(child_name, pad_ref, state)
end

defp remove_children_from_specs(children, state) do
children = Bunch.listify(children)
children_set = MapSet.new(children)
defp remove_children_from_specs(removed_children, state) do
removed_children = Bunch.listify(removed_children) |> MapSet.new()

children_links_ids_set =
Map.values(state.links)
|> Enum.filter(&(&1.from.child in children_set or &1.to.child in children_set))
removed_links_ids =
state.links
|> Map.values()
|> Enum.filter(
&(MapSet.member?(removed_children, &1.from.child) or
MapSet.member?(removed_children, &1.to.child))
)
|> MapSet.new(& &1.id)

affected_specs =
updated_specs =
state.pending_specs
|> Enum.filter(fn {_ref, spec_data} ->
Enum.any?(spec_data.children_names, &(&1 in children_set)) or
Enum.any?(spec_data.links_ids, &(&1 in children_links_ids_set))
not MapSet.disjoint?(spec_data.children_names, removed_children) or
not MapSet.disjoint?(spec_data.links_ids, removed_links_ids)
end)

updated_specs =
affected_specs
|> Map.new(fn {spec_ref, spec_data} ->
children_names =
spec_data.children_names
|> Enum.reject(&(&1 in children_set))
|> MapSet.difference(removed_children)

links_ids = Enum.reject(spec_data.links_ids, &(&1 in children_links_ids_set))
links_ids =
spec_data.links_ids
|> MapSet.difference(removed_links_ids)

awaiting_responses =
spec_data.awaiting_responses
|> Enum.reject(fn {link_id, _direction} -> link_id in children_links_ids_set end)
|> MapSet.new()

dependencies =
spec_data.dependencies
|> update_spec_dependencies(children_set)
|> MapSet.reject(fn {link_id, _direction} ->
MapSet.member?(removed_links_ids, link_id)
end)

spec_data = %{
spec_data
| children_names: children_names,
links_ids: links_ids,
awaiting_responses: awaiting_responses,
dependencies: dependencies
awaiting_responses: awaiting_responses
}

{spec_ref, spec_data}
end)

state = Map.update!(state, :pending_specs, &Map.merge(&1, updated_specs))

Enum.reduce(updated_specs, state, fn {spec_ref, _spec_data}, state ->
proceed_spec_startup(spec_ref, state)
end)
updated_specs
|> Enum.map(fn {spec_ref, _spec_data} -> spec_ref end)
|> Enum.reduce(state, &proceed_spec_startup/2)
end

@spec remove_link_from_spec(Link.id(), Parent.state()) :: Parent.state()
def remove_link_from_spec(link_id, state) do
{:ok, removed_link} = Map.fetch(state.links, link_id)
spec_ref = removed_link.spec_ref

with {:ok, spec_data} <- Map.fetch(state.pending_specs, spec_ref) do
links_ids = Enum.reject(spec_data.links_ids, &(&1 == link_id))
@spec remove_link_from_specs(Link.id(), Parent.state()) :: Parent.state()
def remove_link_from_specs(link_id, state) when is_reference(link_id) do
link = Map.fetch!(state.links, link_id)
spec_ref = link.spec_ref

spec_links_endpoints =
Enum.flat_map(links_ids, fn id ->
link = state.links[id]
[link.from.child, link.to.child]
end)

dependencies =
[removed_link.from.child, removed_link.to.child]
|> Enum.filter(&(&1 not in spec_links_endpoints))
|> case do
[] ->
spec_data.dependencies

endpoints_to_remove ->
spec_data.dependencies
|> update_spec_dependencies(endpoints_to_remove)
end
with %{pending_specs: %{^spec_ref => spec_data}} <- state do
links_ids = MapSet.delete(spec_data.links_ids, link.id)

awaiting_responses =
spec_data.awaiting_responses
|> MapSet.difference(MapSet.new([{link_id, :input}, {link_id, :output}]))
|> MapSet.reject(&match?({^link_id, _direction}, &1))

spec_data = %{
spec_data
| dependencies: dependencies,
links_ids: links_ids,
awaiting_responses: awaiting_responses
}
state =
put_in(
state,
[:pending_specs, spec_ref],
%{spec_data | links_ids: links_ids, awaiting_responses: awaiting_responses}
)

state = put_in(state, [:pending_specs, spec_ref], spec_data)
proceed_spec_startup(spec_ref, state)
else
:error -> state
end
end

defp update_spec_dependencies(spec_dependencies, children_removed_from_spec) do
spec_dependencies
|> Enum.map(fn {spec_ref, spec_children} ->
{
spec_ref,
Enum.reject(spec_children, &(&1 in children_removed_from_spec))
}
end)
|> Enum.reject(&match?({_ref, []}, &1))
|> Map.new()
end

@spec handle_child_pad_removed(Child.name(), Pad.ref(), Parent.state()) :: Parent.state()
def handle_child_pad_removed(child, pad, state) do
Membrane.Logger.debug_verbose("Child #{inspect(child)} removed pad #{inspect(pad)}")
Expand Down Expand Up @@ -749,9 +714,9 @@ defmodule Membrane.Core.Parent.ChildLifeController do
defp remove_spec_from_dependencies(spec_ref, state) do
dependencies =
state.pending_specs
|> Enum.filter(fn {_ref, data} -> Map.has_key?(data.dependencies, spec_ref) end)
|> Enum.filter(fn {_ref, data} -> MapSet.member?(data.dependent_specs, spec_ref) end)
|> Map.new(fn {ref, data} ->
{ref, Map.update!(data, :dependencies, &Map.delete(&1, spec_ref))}
{ref, Map.update!(data, :dependent_specs, &MapSet.delete(&1, spec_ref))}
end)

state = %{state | pending_specs: Map.merge(state.pending_specs, dependencies)}
Expand Down
28 changes: 14 additions & 14 deletions lib/membrane/core/parent/child_life_controller/link_utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,8 @@ defmodule Membrane.Core.Parent.ChildLifeController.LinkUtils do
state
end

state = ChildLifeController.remove_link_from_spec(link.id, state)

delete_link(link, state)
ChildLifeController.remove_link_from_specs(link.id, state)
|> Map.update!(:links, &Map.delete(&1, link.id))
end

@spec remove_link(Child.name(), Pad.ref(), Parent.state()) :: Parent.state()
Expand All @@ -63,8 +62,8 @@ defmodule Membrane.Core.Parent.ChildLifeController.LinkUtils do
[link.to, link.from]
|> Enum.reduce(state, &unlink_endpoint/2)

state = ChildLifeController.remove_link_from_spec(link.id, state)
delete_link(link, state)
ChildLifeController.remove_link_from_specs(link.id, state)
|> Map.update!(:links, &Map.delete(&1, link.id))
else
{:error, :not_found} ->
with %{^child_name => _child_entry} <- state.children do
Expand Down Expand Up @@ -186,15 +185,16 @@ defmodule Membrane.Core.Parent.ChildLifeController.LinkUtils do

defp delete_link(link, state) do
{_link, state} = pop_in(state, [:links, link.id])
spec_ref = link.spec_ref

with {:ok, spec_data} <- Map.fetch(state.pending_specs, spec_ref) do
new_links_ids = Enum.reject(spec_data.links_ids, &(&1 == link.id))
state = put_in(state, [:pending_specs, spec_ref, :links_ids], new_links_ids)
ChildLifeController.proceed_spec_startup(spec_ref, state)
else
:error -> state
end
state
# spec_ref = link.spec_ref

# with {:ok, spec_data} <- Map.fetch(state.pending_specs, spec_ref) do
# new_links_ids = Enum.reject(spec_data.links_ids, &(&1 == link.id))
# state = put_in(state, [:pending_specs, spec_ref, :links_ids], new_links_ids)
# ChildLifeController.proceed_spec_startup(spec_ref, state)
# else
# :error -> state
# end
end

defp validate_links(links, state) do
Expand Down

0 comments on commit 28f1644

Please sign in to comment.