Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the plugin #19

Merged
merged 11 commits into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions .github/workflows/enforce-changelog-update.yml

This file was deleted.

2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ The package can be installed by adding `membrane_ivf_plugin` to your list of dep
```elixir
def deps do
[
{:membrane_ivf_plugin, "~> 0.7.0"}
{:membrane_ivf_plugin, "~> 0.8.0"}
]
end
```
Expand Down
53 changes: 28 additions & 25 deletions lib/deserializer.ex → lib/membrane_ivf/deserializer.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule Membrane.Element.IVF.Deserializer do
defmodule Membrane.IVF.Deserializer do
@moduledoc """
Deserializer is capable of converting stream representing video in IVF format
into stream of Membrane.Buffer's with video frames with correct timestamps in
Expand All @@ -7,25 +7,28 @@ defmodule Membrane.Element.IVF.Deserializer do
use Membrane.Filter
use Numbers, overload_operators: true

alias Membrane.Element.IVF.Headers
alias Membrane.Element.IVF.Headers.FrameHeader
alias Membrane.{Buffer, RemoteStream, Time}
alias Membrane.IVF.Headers
alias Membrane.IVF.Headers.FrameHeader
alias Membrane.{Buffer, IVF, Time}
alias Membrane.{VP8, VP9}

def_input_pad :input, accepted_format: _any
def_input_pad :input,
accepted_format: %Membrane.RemoteStream{content_format: fmt} when fmt in [IVF, nil]

def_output_pad :output,
accepted_format:
%RemoteStream{content_format: format, type: :packetized} when format in [VP9, VP8]
accepted_format: any_of(VP8, VP9)

defmodule State do
@moduledoc false

@doc """
frame_acc is tuple of {bytes_left_to_accumulate, accumulated_binary}
When bytes_left_to_accumulate is equal to 0 it means that whole frame has been accumulated
"""
defstruct [:timebase, frame_acc: <<>>, start_of_stream?: true]
@type t :: %__MODULE__{
timebase: Ratio.t(),
frame_acc: binary(),
beginning_of_stream: boolean()
}

defstruct timebase: nil,
frame_acc: <<>>,
beginning_of_stream: true
end

@impl true
Expand All @@ -35,29 +38,29 @@ defmodule Membrane.Element.IVF.Deserializer do

@impl true
def handle_stream_format(_pad, _stream_format, _ctx, state) do
# ignore incoming stream_format, we will send our own
# in handle_buffer
{[], state}
end

@impl true
def handle_buffer(:input, buffer, _ctx, %State{start_of_stream?: true} = state) do
def handle_buffer(:input, buffer, _ctx, %State{beginning_of_stream: true} = state) do
state = %State{state | frame_acc: state.frame_acc <> buffer.payload}

with {:ok, file_header, rest} <- Headers.parse_ivf_header(state.frame_acc),
{:ok, buffer, rest} <- get_buffer(rest, Ratio.new(file_header.scale, file_header.rate)) do
{:ok, buffer, rest} <- get_buffer(rest, file_header.timebase) do
stream_format =
case file_header.four_cc do
"VP90" -> %Membrane.RemoteStream{content_format: VP9, type: :packetized}
"VP80" -> %Membrane.RemoteStream{content_format: VP8, type: :packetized}
"VP90" -> %VP9{width: file_header.width, height: file_header.height}
"VP80" -> %VP8{width: file_header.width, height: file_header.height}
end

{[stream_format: {:output, stream_format}, buffer: {:output, buffer}],
%State{
frame_acc: rest,
start_of_stream?: false,
timebase: Ratio.new(file_header.scale, file_header.rate)
}}
{
[stream_format: {:output, stream_format}, buffer: {:output, buffer}],
%State{
frame_acc: rest,
beginning_of_stream: false,
timebase: file_header.timebase
}
}
else
{:error, :too_short} ->
{[], state}
Expand Down
76 changes: 24 additions & 52 deletions lib/membrane_element_ivf/headers.ex → lib/membrane_ivf/headers.ex
Original file line number Diff line number Diff line change
@@ -1,35 +1,32 @@
defmodule Membrane.Element.IVF.Headers do
defmodule Membrane.IVF.Headers do
@moduledoc false

alias Membrane.{VP8, VP9}

@signature "DKIF"
@version 0
@header_length 32

defmodule FileHeader do
@moduledoc """
A struct representing IVF file header
"""
@type t :: %__MODULE__{
signature: String.t(),
version: non_neg_integer(),
length_of_header: non_neg_integer(),
four_cc: String.t(),
width: non_neg_integer(),
height: non_neg_integer(),
rate: non_neg_integer(),
scale: non_neg_integer(),
width: pos_integer(),
height: pos_integer(),
timebase: Ratio.t(),
frame_count: non_neg_integer()
}

defstruct [
:signature,
:version,
:length_of_header,
@enforce_keys [
:four_cc,
:width,
:height,
:rate,
:scale,
:timebase,
:frame_count
]
defstruct @enforce_keys
end

defmodule FrameHeader do
Expand All @@ -49,17 +46,12 @@ defmodule Membrane.Element.IVF.Headers do
# bytes 4-11 64-bit presentation timestamp
# bytes 12.. frame data

# Function firstly calculate
# calculating ivf timestamp from membrane timestamp(timebase for membrane timestamp is nanosecond, and timebase for ivf is passed in options)

@spec create_ivf_frame_header(integer, number | Ratio.t(), number | Ratio.t()) :: binary
def create_ivf_frame_header(size, timestamp, timebase) do
ivf_timestamp = Membrane.Time.divide_by_timebase(timestamp, Membrane.Time.seconds(timebase))
# conversion to little-endian binary strings
size_le = <<size::32-little>>
timestamp_le = <<ivf_timestamp::64-little>>

size_le <> timestamp_le
<<size::32-little, ivf_timestamp::64-little>>
end

# IVF Header:
Expand All @@ -73,39 +65,23 @@ defmodule Membrane.Element.IVF.Headers do
# bytes 20-23 time base numerator (scale)
# bytes 24-27 number of frames in file
# bytes 28-31 unused
@spec create_ivf_header(integer, integer, Ratio.t(), integer, any) :: binary
@spec create_ivf_header(pos_integer(), pos_integer(), Ratio.t(), non_neg_integer(), any()) ::
binary()
def create_ivf_header(width, height, timebase, frame_count, stream_format) do
codec_four_cc =
case stream_format do
%Membrane.RemoteStream{content_format: VP9} -> "VP90"
%VP9{} -> "VP90"
%Membrane.RemoteStream{content_format: VP8} -> "VP80"
%VP8{} -> "VP80"
_unknown -> "\0\0\0\0"
end

%Ratio{denominator: rate, numerator: scale} = timebase

signature = "DKIF"
version = <<0, 0>>
length_of_header = <<32, 0>>
# conversion to little-endian binary strings
width_le = <<width::16-little>>
height_le = <<height::16-little>>
rate_le = <<rate::32-little>>
scale_le = <<scale::32-little>>
frame_count = <<frame_count::32>>
# field is not used so it is set to 0
unused = <<0::32>>

signature <>
version <>
length_of_header <>
codec_four_cc <>
width_le <>
height_le <>
rate_le <>
scale_le <>
frame_count <>
unused
<<@signature, @version::16-little, @header_length::16-little, codec_four_cc::binary-4,
width::16-little, height::16-little, rate::32-little, scale::32-little,
frame_count::32-little, 0::32>>
end

@spec parse_ivf_frame_header(binary()) ::
Expand All @@ -122,21 +98,17 @@ defmodule Membrane.Element.IVF.Headers do
def parse_ivf_header(payload) when byte_size(payload) < 32, do: {:error, :too_short}

def parse_ivf_header(
<<signature::binary-size(4), version::16-little, length_of_header::16-little,
four_cc::binary-size(4), width::16-little, height::16-little, rate::32-little,
scale::32-little, frame_count::32-little, _unused::32, rest::binary>>
<<@signature, @version::16-little, @header_length::16-little, four_cc::binary-size(4),
width::16-little, height::16-little, rate::32-little, scale::32-little,
frame_count::32-little, _unused::32, rest::binary>>
) do
if String.valid?(signature) and String.valid?(four_cc) do
if String.valid?(four_cc) do
{:ok,
%FileHeader{
signature: signature,
version: version,
length_of_header: length_of_header,
four_cc: four_cc,
width: width,
height: height,
rate: rate,
scale: scale,
timebase: Ratio.new(scale, rate),
frame_count: frame_count
}, rest}
else
Expand Down
147 changes: 147 additions & 0 deletions lib/membrane_ivf/serializer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
defmodule Membrane.IVF.Serializer do
@moduledoc """
Serializes video stream into IVF format.
"""

use Membrane.Filter
use Numbers, overload_operators: true

alias Membrane.IVF.Headers
alias Membrane.{Buffer, File, IVF, RemoteStream}
alias Membrane.{VP8, VP9}

@frame_count_file_position 24

def_options width: [
spec: non_neg_integer() | nil,
default: nil,
description: """
Width of a frame, needed if not provided with stream format. If it's not specified either in this option or the stream format, the element will crash.
"""
],
height: [
spec: non_neg_integer() | nil,
default: nil,
description: """
Height of a frame, needed if not provided with stream format. If it's not specified either in this option or the stream format, the element will crash.
"""
],
timebase: [
spec: {pos_integer(), pos_integer()},
default: {1, 1_000_000_000},
description: """
Timebase for the timestamps added to the frames
"""
],
frame_count: [
spec: non_neg_integer() | :dynamic,
default: :dynamic,
description: """
Number of frames in the stream. If set to `:dynamic` the frames will be counted and
a `Membrane.File.SeekSinkEvent` will be sent at the end of the stream to insert this value in
the file header. In that case the element MUST be used along with `Membrane.File.Sink`
or any other sink that can handle `Membrane.File.SeekSinkEvent`.
"""
]

def_input_pad :input,
accepted_format:
any_of(
%RemoteStream{content_format: format, type: :packetized} when format in [VP9, VP8],
VP8,
VP9
)

def_output_pad :output, accepted_format: _any

defmodule State do
@moduledoc false

@type t :: %__MODULE__{
width: non_neg_integer() | nil,
height: non_neg_integer() | nil,
timebase: Ratio.t(),
frame_count: non_neg_integer() | :dynamic
}

@enforce_keys [:width, :height, :timebase, :frame_count]
defstruct @enforce_keys ++
[
frames_processed: 0
]
end

@impl true
def handle_init(_ctx, options) do
{timebase_num, timebase_den} = options.timebase

{[],
%State{
width: options.width,
height: options.height,
timebase: Ratio.new(timebase_num, timebase_den),
frame_count: options.frame_count
}}
end

@impl true
def handle_playing(_ctx, state) do
{[stream_format: {:output, %Membrane.RemoteStream{type: :packetized, content_format: IVF}}],
state}
end

@impl true
def handle_stream_format(:input, stream_format, _ctx, state) do
{width, height} = get_frame_dimensions(stream_format, state)

frame_count = if state.frame_count == :dynamic, do: 0, else: state.frame_count

ivf_header =
Headers.create_ivf_header(width, height, state.timebase, frame_count, stream_format)

{[buffer: {:output, %Buffer{payload: ivf_header}}], state}
end

@impl true
def handle_buffer(:input, buffer, _ctx, state) do
%Buffer{payload: frame, pts: timestamp} = buffer

ivf_frame =
Headers.create_ivf_frame_header(byte_size(frame), timestamp, state.timebase) <>
frame

{[buffer: {:output, %Buffer{buffer | payload: ivf_frame}}],
%{state | frames_processed: state.frames_processed + 1}}
end

@impl true
def handle_end_of_stream(:input, _ctx, state) do
actions =
if state.frame_count == :dynamic do
[
event: {:output, %File.SeekSinkEvent{position: @frame_count_file_position}},
buffer: {:output, %Buffer{payload: <<state.frames_processed::32-little>>}},
end_of_stream: :output
]
else
[end_of_stream: :output]
end

{actions, state}
end

@spec get_frame_dimensions(RemoteStream.t() | VP8.t() | VP9.t(), State.t()) ::
{width :: non_neg_integer(), height :: non_neg_integer()}
defp get_frame_dimensions(input_stream_format, state) do
case input_stream_format do
%RemoteStream{} ->
{
state.width || raise("Width not provided"),
state.height || raise("Height not provided")
}

%{width: width, height: height} ->
{width, height}
end
end
end
Loading