Skip to content

Commit

Permalink
Implement suggestion from CR
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Jul 28, 2023
1 parent 9614b65 commit 332a724
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 80 deletions.
12 changes: 4 additions & 8 deletions lib/membrane/core/bin/pad_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -200,15 +200,11 @@ defmodule Membrane.Core.Bin.PadController do
SpecificationParser.raw_endpoint(),
SpecificationParser.raw_endpoint(),
%{
stream_format_validation_params:
optional(:input_pad_info) => PadModel.pad_info() | nil,
optional(:link_metadata) => map(),
:stream_format_validation_params =>
StreamFormatController.stream_format_validation_params()
}
| %{
other_info: PadModel.pad_info() | nil,
link_metadata: map,
stream_format_validation_params:
StreamFormatController.stream_format_validation_params()
},
},
Core.Bin.State.t()
) :: {Core.Element.PadController.link_call_reply(), Core.Bin.State.t()}
def handle_link(direction, endpoint, other_endpoint, params, state) do
Expand Down
119 changes: 57 additions & 62 deletions lib/membrane/core/element/pad_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,14 @@ defmodule Membrane.Core.Element.PadController do
require Membrane.Logger
require Membrane.Pad, as: Pad

@type link_call_props ::
%{
stream_format_validation_params:
StreamFormatController.stream_format_validation_params()
}
| %{
other_info: PadModel.pad_info() | nil,
link_metadata: %{},
stream_format_validation_params:
StreamFormatController.stream_format_validation_params(),
other_effective_flow_control: EffectiveFlowController.effective_flow_control()
}
@type link_call_props :: %{
optional(:output_pad_info) => PadModel.pad_info() | nil,
optional(:link_metadata) => map(),
optional(:output_effective_flow_control) =>
EffectiveFlowController.effective_flow_control(),
:stream_format_validation_params =>
StreamFormatController.stream_format_validation_params()
}

@type link_call_reply_props ::
{Endpoint.t(), PadModel.pad_info(), %{atomic_demand: AtomicDemand.t()}}
Expand Down Expand Up @@ -78,35 +74,35 @@ defmodule Membrane.Core.Element.PadController do
do_handle_link(direction, endpoint, other_endpoint, pad_info, link_props, state)
end

defp do_handle_link(:output, endpoint, other_endpoint, pad_info, props, state) do
defp do_handle_link(:output, endpoint, input_endpoint, pad_info, link_props, state) do
pad_effective_flow_control =
EffectiveFlowController.get_pad_effective_flow_control(endpoint.pad_ref, state)

handle_link_response =
Message.call(other_endpoint.pid, :handle_link, [
Pad.opposite_direction(pad_info.direction),
other_endpoint,
Message.call(input_endpoint.pid, :handle_link, [
:input,
input_endpoint,
endpoint,
%{
other_info: pad_info,
output_pad_info: pad_info,
link_metadata: %{
observability_data: Stalker.generate_observability_data_for_link(endpoint.pad_ref)
},
stream_format_validation_params: [],
other_effective_flow_control: pad_effective_flow_control
output_effective_flow_control: pad_effective_flow_control
}
])

case handle_link_response do
{:ok, {other_endpoint, other_info, link_metadata}} ->
{:ok, {input_endpoint, input_pad_info, link_metadata}} ->
state =
init_pad_data(
endpoint,
other_endpoint,
input_endpoint,
pad_info,
props.stream_format_validation_params,
link_props.stream_format_validation_params,
:push,
other_info,
input_pad_info,
link_metadata,
state
)
Expand All @@ -116,7 +112,7 @@ defmodule Membrane.Core.Element.PadController do

{:error, {:call_failure, reason}} ->
Membrane.Logger.debug("""
Tried to link pad #{inspect(endpoint.pad_ref)}, but neighbour #{inspect(other_endpoint.child)}
Tried to link pad #{inspect(endpoint.pad_ref)}, but neighbour #{inspect(input_endpoint.child)}
is not alive.
""")

Expand All @@ -130,18 +126,15 @@ defmodule Membrane.Core.Element.PadController do
end
end

defp do_handle_link(:input, endpoint, other_endpoint, pad_info, link_props, state) do
defp do_handle_link(:input, input_endpoint, output_endpoint, pad_info, link_props, state) do
%{
other_info: other_info,
output_pad_info: output_pad_info,
link_metadata: link_metadata,
stream_format_validation_params: stream_format_validation_params,
other_effective_flow_control: other_effective_flow_control
output_effective_flow_control: output_effective_flow_control
} = link_props

if pad_info.direction != :input,
do: raise("pad direction #{inspect(pad_info.direction)} is wrong")

{output_demand_unit, input_demand_unit} = resolve_demand_units(other_info, pad_info)
{output_demand_unit, input_demand_unit} = resolve_demand_units(output_pad_info, pad_info)

link_metadata =
Map.merge(link_metadata, %{
Expand All @@ -150,24 +143,24 @@ defmodule Membrane.Core.Element.PadController do
})

pad_effective_flow_control =
EffectiveFlowController.get_pad_effective_flow_control(endpoint.pad_ref, state)
EffectiveFlowController.get_pad_effective_flow_control(input_endpoint.pad_ref, state)

atomic_demand =
AtomicDemand.new(%{
receiver_effective_flow_control: pad_effective_flow_control,
receiver_process: self(),
receiver_demand_unit: input_demand_unit || :buffers,
sender_process: other_endpoint.pid,
sender_pad_ref: other_endpoint.pad_ref,
sender_process: output_endpoint.pid,
sender_pad_ref: output_endpoint.pad_ref,
supervisor: state.subprocess_supervisor,
toilet_capacity: endpoint.pad_props[:toilet_capacity],
throttling_factor: endpoint.pad_props[:throttling_factor]
toilet_capacity: input_endpoint.pad_props[:toilet_capacity],
throttling_factor: input_endpoint.pad_props[:throttling_factor]
})

Stalker.register_link(
state.stalker,
endpoint.pad_ref,
other_endpoint.pad_ref,
input_endpoint.pad_ref,
output_endpoint.pad_ref,
link_metadata.observability_data
)

Expand All @@ -177,33 +170,33 @@ defmodule Membrane.Core.Element.PadController do
atomic_demand: atomic_demand,
observability_data:
Stalker.generate_observability_data_for_link(
endpoint.pad_ref,
input_endpoint.pad_ref,
link_metadata.observability_data
)
})

:ok =
Child.PadController.validate_pads_flow_control_compability!(
other_endpoint.pad_ref,
other_info.flow_control,
endpoint.pad_ref,
output_endpoint.pad_ref,
output_pad_info.flow_control,
input_endpoint.pad_ref,
pad_info.flow_control
)

state =
init_pad_data(
endpoint,
other_endpoint,
input_endpoint,
output_endpoint,
pad_info,
stream_format_validation_params,
other_effective_flow_control,
other_info,
output_effective_flow_control,
output_pad_info,
link_metadata,
state
)

state =
case PadModel.get_data!(state, endpoint.pad_ref) do
case PadModel.get_data!(state, input_endpoint.pad_ref) do
%{flow_control: :auto, direction: :input} = pad_data ->
EffectiveFlowController.handle_sender_effective_flow_control(
pad_data.ref,
Expand All @@ -215,9 +208,9 @@ defmodule Membrane.Core.Element.PadController do
state
end

state = maybe_handle_pad_added(endpoint.pad_ref, state)
state = maybe_handle_pad_added(input_endpoint.pad_ref, state)

{{:ok, {endpoint, pad_info, link_metadata}}, state}
{{:ok, {input_endpoint, pad_info, link_metadata}}, state}
end

@doc """
Expand Down Expand Up @@ -279,7 +272,7 @@ defmodule Membrane.Core.Element.PadController do
pad_info,
stream_format_validation_params,
other_effective_flow_control,
other_info,
other_pad_info,
metadata,
state
) do
Expand Down Expand Up @@ -318,7 +311,7 @@ defmodule Membrane.Core.Element.PadController do
}
})
|> merge_pad_direction_data(metadata, state)
|> merge_pad_mode_data(endpoint.pad_props, other_info, state)
|> merge_pad_mode_data(endpoint.pad_props, other_pad_info, state)
|> then(&struct!(Membrane.Element.PadData, &1))

state = put_in(state, [:pads_data, endpoint.pad_ref], pad_data)
Expand Down Expand Up @@ -373,8 +366,8 @@ defmodule Membrane.Core.Element.PadController do

defp merge_pad_mode_data(
%{direction: :input, flow_control: :manual} = pad_data,
props,
other_info,
pad_props,
other_pad_info,
%State{}
) do
%{
Expand All @@ -385,12 +378,12 @@ defmodule Membrane.Core.Element.PadController do

input_queue =
InputQueue.init(%{
inbound_demand_unit: other_info[:demand_unit] || this_demand_unit,
inbound_demand_unit: other_pad_info[:demand_unit] || this_demand_unit,
outbound_demand_unit: this_demand_unit,
atomic_demand: atomic_demand,
pad_ref: ref,
log_tag: inspect(ref),
target_size: props.target_queue_size
target_size: pad_props.target_queue_size
})

pad_data
Expand All @@ -402,16 +395,16 @@ defmodule Membrane.Core.Element.PadController do

defp merge_pad_mode_data(
%{direction: :output, flow_control: :manual} = pad_data,
_props,
_other_info,
_pad_props,
_other_pad_info,
_state
) do
Map.put(pad_data, :demand, 0)
end

defp merge_pad_mode_data(
%{flow_control: :auto, direction: direction} = pad_data,
props,
pad_props,
_other_info,
%State{} = state
) do
Expand All @@ -426,8 +419,8 @@ defmodule Membrane.Core.Element.PadController do
direction == :output ->
nil

props.auto_demand_size != nil ->
props.auto_demand_size
pad_props.auto_demand_size != nil ->
pad_props.auto_demand_size

true ->
demand_unit = pad_data.other_demand_unit || pad_data.demand_unit || :buffers
Expand All @@ -438,11 +431,13 @@ defmodule Membrane.Core.Element.PadController do
demand_metric =
if direction == :input do
:atomics.new(1, [])
|> tap(
&Stalker.register_metric_function(:auto_demand_size, fn -> :atomics.get(&1, 1) end,
|> tap(fn atomic ->
Stalker.register_metric_function(
:auto_demand_size,
fn -> :atomics.get(atomic, 1) end,
pad: pad_data.ref
)
)
end)
end

pad_data
Expand Down
8 changes: 6 additions & 2 deletions test/membrane/core/element/pad_controller_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,14 @@ defmodule Membrane.Core.Element.PadControllerTest do
pad_props: %{options: [], toilet_capacity: nil, throttling_factor: nil}
},
%{
other_info: %{direction: :output, flow_control: :manual, demand_unit: :buffers},
output_pad_info: %{
direction: :output,
flow_control: :manual,
demand_unit: :buffers
},
link_metadata: %{toilet: make_ref(), observability_data: %{path: ""}},
stream_format_validation_params: [],
other_effective_flow_control: :pull
output_effective_flow_control: :pull
},
state
)
Expand Down
16 changes: 8 additions & 8 deletions test/membrane/core/element_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,10 @@ defmodule Membrane.Core.ElementTest do
%Endpoint{pad_spec: :output, pad_ref: :output, pad_props: %{options: []}, child: :this},
output_other_endpoint,
%{
other_info: other_info,
# other_info: other_info,
link_metadata: %{atomic_demand: output_atomic_demand, observability_data: %{path: ""}},
stream_format_validation_params: [],
other_effective_flow_control: :pull
stream_format_validation_params: []
# other_effective_flow_control: :pull
}
]),
nil,
Expand All @@ -160,10 +160,10 @@ defmodule Membrane.Core.ElementTest do
},
%Endpoint{pad_spec: :output, pad_ref: :output, pid: self(), child: :other},
%{
other_info: %{direction: :output, flow_control: :manual},
output_pad_info: %{direction: :output, flow_control: :manual},
link_metadata: %{toilet: nil, observability_data: %{path: ""}},
stream_format_validation_params: [],
other_effective_flow_control: :pull
output_effective_flow_control: :pull
}
]),
nil,
Expand Down Expand Up @@ -270,14 +270,14 @@ defmodule Membrane.Core.ElementTest do
pad_props: %{options: [], toilet_capacity: nil, throttling_factor: nil}
},
%{
other_info: %{
direction: :input,
output_pad_info: %{
direction: :output,
demand_unit: :buffers,
flow_control: :manual
},
link_metadata: %{observability_data: %{path: ""}},
stream_format_validation_params: [],
other_effective_flow_control: :pull
output_effective_flow_control: :pull
}
]),
nil,
Expand Down

0 comments on commit 332a724

Please sign in to comment.