Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send end of stream, even if there was no start of stream before #577

Merged
merged 17 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
* Remove `assert_pipeline_play/2` from `Membrane.Testing.Assertions`. [#528](https://github.com/membraneframework/membrane_core/pull/528)
* Make sure enumerable with all elements being `Membrane.Buffer.t()`, passed as `:output` parameter for `Membrane.Testing.Source` won't get rewrapped in `Membrane.Buffer.t()` struct.
* Implement `Membrane.Debug.Filter` and `Membrane.Debug.Sink`. [#552](https://github.com/membraneframework/membrane_core/pull/552)
* Send `:end_of_stream`, even if it is not preceded by `:start_of_stream`. [#557](https://github.com/membraneframework/membrane_core/pull/577)

## 0.11.0
* Separate element_name and pad arguments in handle_element_{start, end}_of_stream signature [#219](https://github.com/membraneframework/membrane_core/issues/219)
Expand All @@ -31,9 +32,9 @@
* New `spec` action syntax - the structure of pipeline is now defined with the use of `Membrane.ChildrenSpec`
* Rename `:caps` to `:stream_format`.
* Use Elixir patterns as `:accepted_format` in pad definition.
* Delete `:ok` from tuples returned from callbacks
* Remove `:type` from specs passed to `def_options/1` macro in bins and elements.
* Add `Membrane.Testing.MockResourceGuard`
* Delete `:ok` from tuples returned from callbacks.
* Remove `:type` from specs passed to `def_options/1` macro in bins and elements.
* Add `Membrane.Testing.MockResourceGuard`.

## 0.10.0
* Remove all deprecated stuff [#399](https://github.com/membraneframework/membrane_core/pull/399)
Expand Down
6 changes: 5 additions & 1 deletion lib/membrane/bin/callback_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ defmodule Membrane.Bin.CallbackContext do
Field `:pad_options` is present only in `c:Membrane.Bin.handle_pad_added/3`
and `c:Membrane.Bin.handle_pad_removed/3`.

Field `:start_of_stream_received?` is present only in
`c:Membrane.Bin.handle_element_end_of_stream/4`.

Fields `:members` and `:crash_initiator` are present only in
`c:Membrane.Pipeline.handle_crash_group_down/3`.
"""
Expand All @@ -23,6 +26,7 @@ defmodule Membrane.Bin.CallbackContext do
:utility_supervisor => Membrane.UtilitySupervisor.t(),
optional(:pad_options) => map(),
optional(:members) => [Membrane.Child.name()],
optional(:crash_initiator) => Membrane.Child.name()
optional(:crash_initiator) => Membrane.Child.name(),
optional(:start_of_stream_received?) => boolean()
}
end
3 changes: 2 additions & 1 deletion lib/membrane/core/bin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -171,14 +171,15 @@ defmodule Membrane.Core.Bin do
end

defp do_handle_info(
Message.new(:stream_management_event, [element_name, pad_ref, event]),
Message.new(:stream_management_event, [element_name, pad_ref, event, event_params]),
state
) do
state =
Parent.LifecycleController.handle_stream_management_event(
event,
element_name,
pad_ref,
event_params,
state
)

Expand Down
5 changes: 4 additions & 1 deletion lib/membrane/core/bin/callback_context.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
defmodule Membrane.Core.Bin.CallbackContext do
@moduledoc false

@type optional_fields :: [pad_options: map()]
@type optional_fields ::
[pad_options: map()]
| [members: [Membrane.Child.name()], crash_initiator: Membrane.Child.name()]
| [start_of_stream_received?: boolean()]

@spec from_state(Membrane.Core.Bin.State.t(), optional_fields()) ::
Membrane.Bin.CallbackContext.t()
Expand Down
8 changes: 4 additions & 4 deletions lib/membrane/core/element/action_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ defmodule Membrane.Core.Element.ActionHandler do

if Event.event?(event) do
%{pid: pid, other_ref: other_ref} = PadModel.get_data!(state, pad_ref)
state = handle_event(pad_ref, event, state)
state = handle_outgoing_event(pad_ref, event, state)
Message.send(pid, :event, event, for_pad: other_ref)
state
else
Expand All @@ -457,8 +457,8 @@ defmodule Membrane.Core.Element.ActionHandler do
end
end

@spec handle_event(Pad.ref(), Event.t(), State.t()) :: State.t()
defp handle_event(pad_ref, %Events.EndOfStream{}, state) do
@spec handle_outgoing_event(Pad.ref(), Event.t(), State.t()) :: State.t()
defp handle_outgoing_event(pad_ref, %Events.EndOfStream{}, state) do
with %{direction: :output, end_of_stream?: false} <- PadModel.get_data!(state, pad_ref) do
state = PadController.remove_pad_associations(pad_ref, state)
PadModel.set_data!(state, pad_ref, :end_of_stream?, true)
Expand All @@ -471,5 +471,5 @@ defmodule Membrane.Core.Element.ActionHandler do
end
end

defp handle_event(_pad_ref, _event, state), do: state
defp handle_outgoing_event(_pad_ref, _event, state), do: state
end
1 change: 1 addition & 0 deletions lib/membrane/core/element/callback_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ defmodule Membrane.Core.Element.CallbackContext do
[incoming_demand: non_neg_integer()]
| [pad_options: map()]
| [old_stream_format: Membrane.StreamFormat.t()]
| [start_of_stream_received?: boolean()]

@spec from_state(Membrane.Core.Element.State.t(), optional_fields()) ::
Membrane.Element.CallbackContext.t()
Expand Down
117 changes: 57 additions & 60 deletions lib/membrane/core/element/event_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -60,58 +60,90 @@ defmodule Membrane.Core.Element.EventController do
end

@spec exec_handle_event(Pad.ref(), Event.t(), params :: map, State.t()) :: State.t()
def exec_handle_event(pad_ref, event, params \\ %{}, state) do
case handle_special_event(pad_ref, event, state) do
{:handle, state} ->
:ok = check_sync(event, state)
do_exec_handle_event(pad_ref, event, params, state)
def exec_handle_event(pad_ref, event, params \\ %{}, state)

{:ignore, state} ->
state
end
end

@spec do_exec_handle_event(Pad.ref(), Event.t(), params :: map, State.t()) :: State.t()
defp do_exec_handle_event(pad_ref, %event_type{} = event, params, state)
when event_type in [Events.StartOfStream, Events.EndOfStream] do
data = PadModel.get_data!(state, pad_ref)
callback = stream_event_to_callback(event)
def exec_handle_event(pad_ref, %Events.StartOfStream{} = event, params, state) do
state = PadModel.set_data!(state, pad_ref, :start_of_stream?, true)
:ok = check_sync(state)

new_params =
Map.merge(params, %{
context: &CallbackContext.from_state/1,
direction: data.direction
direction: PadModel.get_data!(state, pad_ref, :direction)
})

state =
CallbackHandler.exec_and_handle_callback(
callback,
:handle_start_of_stream,
ActionHandler,
new_params,
[pad_ref],
state
)

Message.send(state.parent_pid, :stream_management_event, [
state.name,
pad_ref,
event
])
Message.send(
state.parent_pid,
:stream_management_event,
[state.name, pad_ref, event, []]
)

state
end

defp do_exec_handle_event(pad_ref, event, params, state) do
def exec_handle_event(pad_ref, %Events.EndOfStream{} = event, params, state) do
if PadModel.get_data!(state, pad_ref, :end_of_stream?) do
Membrane.Logger.debug("Ignoring end of stream as it has already arrived before")
state
else
Membrane.Logger.debug("Received end of stream on pad #{inspect(pad_ref)}")

state = PadModel.set_data!(state, pad_ref, :end_of_stream?, true)
state = PadController.remove_pad_associations(pad_ref, state)

%{
start_of_stream?: start_of_stream?,
direction: direction
} = PadModel.get_data!(state, pad_ref)

event_params = [start_of_stream_received?: start_of_stream?]

new_params =
Map.merge(params, %{
context: &CallbackContext.from_state(&1, event_params),
direction: direction
})

state =
CallbackHandler.exec_and_handle_callback(
:handle_end_of_stream,
ActionHandler,
new_params,
[pad_ref],
state
)

Message.send(
state.parent_pid,
:stream_management_event,
[state.name, pad_ref, event, event_params]
)

state
end
end

def exec_handle_event(pad_ref, event, params, state) do
data = PadModel.get_data!(state, pad_ref)

params =
%{context: &CallbackContext.from_state/1, direction: data.direction} |> Map.merge(params)
%{context: &CallbackContext.from_state/1, direction: data.direction}
|> Map.merge(params)

args = [pad_ref, event]
CallbackHandler.exec_and_handle_callback(:handle_event, ActionHandler, params, args, state)
end

defp check_sync(%Events.StartOfStream{}, state) do
defp check_sync(state) do
if state.pads_data
|> Map.values()
|> Enum.filter(&(&1.direction == :input))
Expand All @@ -122,42 +154,7 @@ defmodule Membrane.Core.Element.EventController do
:ok
end

defp check_sync(_event, _state) do
:ok
end

@spec handle_special_event(Pad.ref(), Event.t(), State.t()) ::
{:handle | :ignore, State.t()}
defp handle_special_event(pad_ref, %Events.StartOfStream{}, state) do
Membrane.Logger.debug("received start of stream")
state = PadModel.set_data!(state, pad_ref, :start_of_stream?, true)
{:handle, state}
end

defp handle_special_event(pad_ref, %Events.EndOfStream{}, state) do
pad_data = PadModel.get_data!(state, pad_ref)

with %{start_of_stream?: true, end_of_stream?: false} <- pad_data do
state = PadModel.set_data!(state, pad_ref, :end_of_stream?, true)
state = PadController.remove_pad_associations(pad_ref, state)
{:handle, state}
else
%{end_of_stream?: true} ->
Membrane.Logger.debug("Ignoring end of stream as it has already arrived before")
{:ignore, state}

%{start_of_stream?: false} ->
Membrane.Logger.debug("Ignoring end of stream as start of stream hasn't arrived yet")
{:ignore, state}
end
end

defp handle_special_event(_pad_ref, _event, state), do: {:handle, state}

defp buffers_before_event_present?(pad_data) do
pad_data.input_queue && not InputQueue.empty?(pad_data.input_queue)
end

defp stream_event_to_callback(%Events.StartOfStream{}), do: :handle_start_of_stream
defp stream_event_to_callback(%Events.EndOfStream{}), do: :handle_end_of_stream
end
5 changes: 3 additions & 2 deletions lib/membrane/core/parent/lifecycle_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,10 @@ defmodule Membrane.Core.Parent.LifecycleController do
Membrane.Event.t(),
Child.name(),
Pad.ref(),
[start_of_stream_received?: boolean()],
Parent.state()
) :: Parent.state()
def handle_stream_management_event(%event_type{}, element_name, pad_ref, state)
def handle_stream_management_event(%event_type{}, element_name, pad_ref, event_params, state)
when event_type in [Events.StartOfStream, Events.EndOfStream] do
callback =
case event_type do
Expand All @@ -147,7 +148,7 @@ defmodule Membrane.Core.Parent.LifecycleController do
CallbackHandler.exec_and_handle_callback(
callback,
Component.action_handler(state),
%{context: &Component.context_from_state/1},
%{context: &Component.context_from_state(&1, event_params)},
[element_name, pad_ref],
state
)
Expand Down
13 changes: 11 additions & 2 deletions lib/membrane/core/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,18 @@ defmodule Membrane.Core.Pipeline do
end

@impl GenServer
def handle_info(Message.new(:stream_management_event, [element_name, pad_ref, event]), state) do
def handle_info(
Message.new(:stream_management_event, [element_name, pad_ref, event, event_params]),
state
) do
state =
LifecycleController.handle_stream_management_event(event, element_name, pad_ref, state)
LifecycleController.handle_stream_management_event(
event,
element_name,
pad_ref,
event_params,
state
)

{:noreply, state}
end
Expand Down
1 change: 1 addition & 0 deletions lib/membrane/core/pipeline/callback_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule Membrane.Core.Pipeline.CallbackContext do
@type optional_fields ::
[from: GenServer.from()]
| [members: [Membrane.Child.name()], crash_initiator: Membrane.Child.name()]
| [start_of_stream_received?: boolean()]

@spec from_state(Membrane.Core.Pipeline.State.t(), optional_fields()) ::
Membrane.Pipeline.CallbackContext.t()
Expand Down
6 changes: 5 additions & 1 deletion lib/membrane/element/callback_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ defmodule Membrane.Element.CallbackContext do
Field `:pad_options` is present only in `c:Membrane.Element.Base.handle_pad_added/3`
and `c:Membrane.Element.Base.handle_pad_removed/3`.

Field `:start_of_stream_received?` is present only in
`c:Membrane.Element.WithInputPads.handle_end_of_stream/3`.

Field `:old_stream_format` is present only in
`c:Membrane.Element.WithInputPads.handle_stream_format/4`.
"""
Expand All @@ -25,6 +28,7 @@ defmodule Membrane.Element.CallbackContext do
:utility_supervisor => Membrane.UtilitySupervisor.t(),
optional(:incoming_demand) => non_neg_integer(),
optional(:pad_options) => map(),
optional(:old_stream_format) => Membrane.StreamFormat.t()
optional(:old_stream_format) => Membrane.StreamFormat.t(),
optional(:start_of_stream_received?) => boolean()
}
end
6 changes: 5 additions & 1 deletion lib/membrane/pipeline/callback_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ defmodule Membrane.Pipeline.CallbackContext do

Field `:from` is present only in `c:Membrane.Pipeline.handle_call/3`.

Field `:start_of_stream_received?` is present only in
`c:Membrane.Pipeline.handle_element_end_of_stream/4`.

Fields `:members` and `:crash_initiator` are present only in
`c:Membrane.Pipeline.handle_crash_group_down/3`.
"""
Expand All @@ -19,6 +22,7 @@ defmodule Membrane.Pipeline.CallbackContext do
:utility_supervisor => Membrane.UtilitySupervisor.t(),
optional(:from) => [GenServer.from()],
optional(:members) => [Membrane.Child.name()],
optional(:crash_initiator) => Membrane.Child.name()
optional(:crash_initiator) => Membrane.Child.name(),
optional(:start_of_stream_received?) => boolean()
}
end
6 changes: 0 additions & 6 deletions test/membrane/core/element/event_controller_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,6 @@ defmodule Membrane.Core.Element.EventControllerTest do
assert state.pads_data.input.start_of_stream?
end

test "ignoring end of stream when there was no start of stream prior", %{state: state} do
state = EventController.handle_event(:input, %Events.EndOfStream{}, state)
refute state.pads_data.input.end_of_stream?
refute state.pads_data.input.start_of_stream?
end

test "end of stream successfully", %{state: state} do
state = put_start_of_stream(state, :input)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ defmodule Membrane.Integration.EffectiveFlowControlResolutionTest do
if state.lazy?, do: Process.sleep(100)
{[forward: buffer], state}
end

@impl true
def handle_end_of_stream(_pad, _ctx, state) do
{[], state}
end
end

defmodule DoubleFlowControlSource do
Expand Down
Loading