-
Notifications
You must be signed in to change notification settings - Fork 39
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update processed specs, when children or link is down #517
Conversation
…, but it's spec is not ready yet
842186b
to
c5eec27
Compare
bc340e4
to
270fb9d
Compare
326f10a
to
8b53257
Compare
lib/membrane/pipeline.ex
Outdated
@doc """ | ||
Callback invoked when a child removes its pad. | ||
|
||
Removing child's pad due to return `t:Membrane.Bin.Action.remove_link()` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this whole sentence is hard to understand anyway, let's rephrase it
case Map.fetch(state.links, link_id) do | ||
{:ok, %Link{spec_ref: spec_ref}} -> | ||
state = | ||
update_in( | ||
state, | ||
[:pending_specs, spec_ref, :awaiting_responses], | ||
&MapSet.delete(&1, {link_id, direction}) | ||
) | ||
with %{pending_specs: %{^spec_ref => _spec_data}} <- state do | ||
update_in( | ||
state, | ||
[:pending_specs, spec_ref, :awaiting_responses], | ||
&MapSet.delete(&1, {link_id, direction}) | ||
) | ||
end | ||
|
||
proceed_spec_startup(spec_ref, state) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- is there any point of proceeding with a spec startup when it's not pending?
- if so, we can merge the
case
andwith
into a singlewith
|> MapSet.new(& &1.id) | ||
|
||
affected_specs = | ||
state.pending_specs | ||
|> Enum.filter(fn {_ref, spec_data} -> | ||
Enum.any?(spec_data.children_names, &(&1 in children_set)) or | ||
Enum.any?(spec_data.links_ids, &(&1 in children_links_ids_set)) | ||
end) | ||
|
||
updated_specs = | ||
affected_specs | ||
|> Map.new(fn {spec_ref, spec_data} -> | ||
children_names = | ||
spec_data.children_names | ||
|> Enum.reject(&(&1 in children_set)) | ||
|
||
links_ids = Enum.reject(spec_data.links_ids, &(&1 in children_links_ids_set)) | ||
|
||
awaiting_responses = | ||
spec_data.awaiting_responses | ||
|> Enum.reject(fn {link_id, _direction} -> link_id in children_links_ids_set end) | ||
|> MapSet.new() | ||
|
||
dependencies = | ||
spec_data.dependencies | ||
|> update_spec_dependencies(children_set) | ||
|
||
spec_data = %{ | ||
spec_data | ||
| children_names: children_names, | ||
links_ids: links_ids, | ||
awaiting_responses: awaiting_responses, | ||
dependencies: dependencies | ||
} | ||
|
||
{spec_ref, spec_data} | ||
end) | ||
|
||
state = Map.update!(state, :pending_specs, &Map.merge(&1, updated_specs)) | ||
|
||
Enum.reduce(updated_specs, state, fn {spec_ref, _spec_data}, state -> | ||
proceed_spec_startup(spec_ref, state) | ||
end) | ||
end | ||
|
||
@spec remove_link_from_spec(Link.id(), Parent.state()) :: Parent.state() | ||
def remove_link_from_spec(link_id, state) do | ||
{:ok, removed_link} = Map.fetch(state.links, link_id) | ||
spec_ref = removed_link.spec_ref | ||
|
||
with {:ok, spec_data} <- Map.fetch(state.pending_specs, spec_ref) do | ||
links_ids = Enum.reject(spec_data.links_ids, &(&1 == link_id)) | ||
|
||
spec_links_endpoints = | ||
Enum.flat_map(links_ids, fn id -> | ||
link = state.links[id] | ||
[link.from.child, link.to.child] | ||
end) | ||
|
||
dependencies = | ||
[removed_link.from.child, removed_link.to.child] | ||
|> Enum.filter(&(&1 not in spec_links_endpoints)) | ||
|> case do | ||
[] -> | ||
spec_data.dependencies | ||
|
||
endpoints_to_remove -> | ||
spec_data.dependencies | ||
|> update_spec_dependencies(endpoints_to_remove) | ||
end | ||
|
||
awaiting_responses = | ||
spec_data.awaiting_responses | ||
|> MapSet.difference(MapSet.new([{link_id, :input}, {link_id, :output}])) | ||
|
||
spec_data = %{ | ||
spec_data | ||
| dependencies: dependencies, | ||
links_ids: links_ids, | ||
awaiting_responses: awaiting_responses | ||
} | ||
|
||
state = put_in(state, [:pending_specs, spec_ref], spec_data) | ||
proceed_spec_startup(spec_ref, state) | ||
else | ||
:error -> state | ||
end | ||
end | ||
|
||
defp update_spec_dependencies(spec_dependencies, children_removed_from_spec) do | ||
spec_dependencies | ||
|> Enum.map(fn {spec_ref, spec_children} -> | ||
{ | ||
spec_ref, | ||
Enum.reject(spec_children, &(&1 in children_removed_from_spec)) | ||
} | ||
end) | ||
|> Enum.reject(&match?({_ref, []}, &1)) | ||
|> Map.new() | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- don't proceed with a dependent spec even if the dependency is removed
- simplify the code by iterating through children first and making use of spec_ref in child and link data
647747e
to
28f1644
Compare
e8ae474
to
f4de5d3
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TBC
state.pending_specs | ||
|> Enum.filter(fn {_ref, spec_data} -> | ||
not MapSet.disjoint?(spec_data.children_names, removed_children) or | ||
not MapSet.disjoint?(spec_data.links_ids, removed_links_ids) | ||
end) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can get links specs as stated in the comment above and children specs from children data, so that this won't be needed
state = Map.update!(state, :pending_specs, &Map.merge(&1, updated_specs)) | ||
|
||
updated_specs | ||
|> Enum.map(fn {spec_ref, _spec_data} -> spec_ref end) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|> Enum.map(fn {spec_ref, _spec_data} -> spec_ref end) | |
|> Map.keys() |
def remove_link_from_specs(link_id, state) when is_reference(link_id) do | ||
link = Map.fetch!(state.links, link_id) | ||
spec_ref = link.spec_ref | ||
|
||
with %{pending_specs: %{^spec_ref => spec_data}} <- state do | ||
links_ids = MapSet.delete(spec_data.links_ids, link.id) | ||
|
||
awaiting_responses = | ||
spec_data.awaiting_responses | ||
|> Enum.reject(&match?({^link_id, _direction}, &1)) | ||
|> MapSet.new() | ||
|
||
state = | ||
put_in( | ||
state, | ||
[:pending_specs, spec_ref], | ||
%{spec_data | links_ids: links_ids, awaiting_responses: awaiting_responses} | ||
) | ||
|
||
proceed_spec_startup(spec_ref, state) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It still duplicates the code from remove_children_from_specs
dependent_specs = | ||
dependencies = | ||
state.pending_specs | ||
|> Enum.filter(fn {_ref, data} -> spec_ref in data.dependent_specs end) | ||
|> Enum.filter(fn {_ref, data} -> MapSet.member?(data.dependent_specs, spec_ref) end) | ||
|> Map.new(fn {ref, data} -> | ||
{ref, Map.update!(data, :dependent_specs, &MapSet.delete(&1, spec_ref))} | ||
end) | ||
|
||
state = %{state | pending_specs: Map.merge(state.pending_specs, dependent_specs)} | ||
dependent_specs |> Map.keys() |> Enum.reduce(state, &proceed_spec_startup/2) | ||
state = %{state | pending_specs: Map.merge(state.pending_specs, dependencies)} | ||
dependencies |> Map.keys() |> Enum.reduce(state, &proceed_spec_startup/2) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems these are dependent_specs again
lib/membrane/pad.ex
Outdated
@@ -164,6 +164,12 @@ defmodule Membrane.Pad do | |||
(term |> is_tuple and term |> tuple_size == 3 and term |> elem(0) == __MODULE__ and |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the above can now be is_static_pad_ref or is_dynamic_pad_ref
lib/membrane/pipeline.ex
Outdated
@doc """ | ||
Callback invoked when a child removes its pad. | ||
|
||
Removing child's pad due to return `t:Membrane.Bin.Action.remove_link()` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure if this changed, but I still have a hard time understanding this. The doc for this callback in Bin is even more sophisticated since there is a 'parent' and 'child' bin involved and it's hard to determine which is which. Maybe let's focus on when this callback is called and provide some example/s.
lib/membrane/testing/pipeline.ex
Outdated
Child #{inspect(child)} removed it's pad #{inspect(pad)}. If you want to | ||
handle such a scenario, pass `raise_on_child_pad_removed?: false` option to | ||
`Membrane.Testing.Pipeline.start_*/2` or pass there a pipeline module | ||
implementing this callback via `:name` option. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
name
option?
fc820bf
to
515984d
Compare
515984d
to
ad72e26
Compare
This reverts commit 924aec4.
Rekated Jira ticket: https://membraneframework.atlassian.net/browse/MC-148