Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ogg reader #43

Merged
merged 7 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions examples/send_from_file/README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
# Send From File

Send video from a file to a browser.
Send video and audio from files to a browser.

1. Start `ex_ice/signalling_server` with `mix run --no-halt`
2. Run `elixir example.exs`
3. Visit `example.html` in your browser e.g. `file:///home/Repos/elixir-webrtc/ex_webrtc/examples/send_from_file/example.html`
3. Visit `example.html` in your browser e.g. `file:///home/Repos/elixir-webrtc/ex_webrtc/examples/send_from_file/example.html`
4. Press the play button.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

video.avf -> video.ivf

You can replace `video.ivf` or `audio.ogg` and use your own files instead.
Binary file added examples/send_from_file/audio.ogg
Binary file not shown.
98 changes: 75 additions & 23 deletions examples/send_from_file/example.exs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ defmodule Peer do
IceCandidate,
MediaStreamTrack,
Media.IVFReader,
Media.OggReader,
PeerConnection,
RTPCodecParameters,
RTP.VP8Payloader,
Expand Down Expand Up @@ -45,6 +46,15 @@ defmodule Peer do
{:ok, pc} =
PeerConnection.start_link(
ice_servers: @ice_servers,
audio_codecs: [
%RTPCodecParameters{
payload_type: 111,
mime_type: "audio/opus",
clock_rate: 48_000,
channels: 2,
sdp_fmtp_line: %ExSDP.Attribute.FMTP{pt: 111, minptime: 10, useinbandfec: true}
}
],
video_codecs: [
%RTPCodecParameters{
payload_type: 96,
Expand All @@ -62,11 +72,13 @@ defmodule Peer do
conn: conn,
stream: stream,
peer_connection: pc,
track_id: nil,
ivf_reader: nil,
payloader: nil,
timer: nil,
last_timestamp: Enum.random(0..@max_rtp_timestamp)
video_track_id: nil,
video_reader: nil,
video_payloader: nil,
last_video_timestamp: Enum.random(0..@max_rtp_timestamp),
audio_track_id: nil,
audio_reader: nil,
last_audio_timestamp: Enum.random(0..@max_rtp_timestamp)
}}

other ->
Expand Down Expand Up @@ -111,32 +123,64 @@ defmodule Peer do
end

@impl true
def handle_info(:send_frame, state) do
Process.send_after(self(), :send_frame, 30)
def handle_info(:send_video_frame, state) do
Process.send_after(self(), :send_video_frame, 30)

case IVFReader.next_frame(state.ivf_reader) do
case IVFReader.next_frame(state.video_reader) do
{:ok, frame} ->
{rtp_packets, payloader} = VP8Payloader.payload(state.payloader, frame.data)
{rtp_packets, payloader} = VP8Payloader.payload(state.video_payloader, frame.data)

# the video has 30 FPS, VP8 clock rate is 90_000, so we have:
# 90_000 / 30 = 3_000
last_timestamp = state.last_timestamp + 3_000 &&& @max_rtp_timestamp
last_timestamp = state.last_video_timestamp + 3_000 &&& @max_rtp_timestamp

rtp_packets =
Enum.map(rtp_packets, fn rtp_packet -> %{rtp_packet | timestamp: last_timestamp} end)

Enum.each(rtp_packets, fn rtp_packet ->
PeerConnection.send_rtp(state.peer_connection, state.track_id, rtp_packet)
PeerConnection.send_rtp(state.peer_connection, state.video_track_id, rtp_packet)
end)

state = %{state | payloader: payloader, last_timestamp: last_timestamp}
{:noreply, state}
{:noreply, %{state | video_payloader: payloader, last_video_timestamp: last_timestamp}}

:eof ->
Logger.info("video.ivf ended. Looping...")
{:ok, ivf_reader} = IVFReader.open("./video.ivf")
{:ok, _header} = IVFReader.read_header(ivf_reader)
state = %{state | ivf_reader: ivf_reader}
{:ok, reader} = IVFReader.open("./video.ivf")
{:ok, _header} = IVFReader.read_header(reader)
{:noreply, %{state | video_reader: reader}}
end
end

@impl true
def handle_info(:send_audio_packet, state) do
case OggReader.next_packet(state.audio_reader) do
{:ok, reader, {packet, duration}} ->
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The convention is to return reader at the last place in the result tuple. Take a look at Map.pop and others

# in a real-life scenario, you will need to compensate for `Process.send_after/3` error
# and time spent on reading and parsing the file
# that's why you might hear short pauses in audio playback, when using this example
Process.send_after(self(), :send_audio_packet, duration)
rtp_packet = ExRTP.Packet.new(packet, 111, 1000, state.last_audio_timestamp, 1000)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would pass 0 for everything that RTP sender is responsible for. Right now, we might confuse user that they are responsible for handling e.g. pt

PeerConnection.send_rtp(state.peer_connection, state.audio_track_id, rtp_packet)

# OggReader.next_packet/1 returns duration in ms
# we have to convert it to RTP timestamp difference
timestamp_delta = trunc(duration * 48_000 / 1000)

{:noreply,
%{
state
| audio_reader: reader,
last_audio_timestamp: state.last_audio_timestamp + timestamp_delta
}}

:eof ->
send(self(), :send_audio_packet)
Logger.info("audio.ogg ended. Looping...")
{:ok, reader} = OggReader.open("./audio.ogg")
{:noreply, %{state | audio_reader: reader}}

{:error, reason} ->
Logger.error("Error when reading Ogg, reason: #{inspect(reason)}")
{:noreply, state}
end
end
Expand All @@ -151,13 +195,18 @@ defmodule Peer do
%{"role" => _role, "type" => "peer_joined"},
%{peer_connection: pc} = state
) do
track = MediaStreamTrack.new(:video)
{:ok, _} = PeerConnection.add_transceiver(pc, track, codec: :vp8)
audio_track = MediaStreamTrack.new(:audio)
video_track = MediaStreamTrack.new(:video)

{:ok, _} = PeerConnection.add_track(pc, audio_track)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Brilliant!

{:ok, _} = PeerConnection.add_track(pc, video_track)

{:ok, offer} = PeerConnection.create_offer(pc)
:ok = PeerConnection.set_local_description(pc, offer)
msg = %{"type" => "offer", "sdp" => offer.sdp}
:gun.ws_send(state.conn, state.stream, {:text, Jason.encode!(msg)})
%{state | track_id: track.id}

%{state | video_track_id: video_track.id, audio_track_id: audio_track.id}
end

defp handle_ws_message(%{"type" => "answer", "sdp" => sdp}, state) do
Expand Down Expand Up @@ -202,13 +251,16 @@ defmodule Peer do

defp handle_webrtc_message({:connection_state_change, :connected} = msg, state) do
Logger.info("#{inspect(msg)}")
Logger.info("Starting sending video.ivf")
Logger.info("Starting sending video.ivf and audio.ogg...")
{:ok, ivf_reader} = IVFReader.open("./video.ivf")
{:ok, _header} = IVFReader.read_header(ivf_reader)
payloader = VP8Payloader.new(800)
vp8_payloader = VP8Payloader.new(800)

{:ok, ogg_reader} = OggReader.open("./audio.ogg")

Process.send_after(self(), :send_frame, 30)
%{state | ivf_reader: ivf_reader, payloader: payloader}
Process.send_after(self(), :send_video_frame, 30)
Process.send_after(self(), :send_audio_packet, 20)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, we could do Process.sendafter 0

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even better, send(self(), ...) 🙂

%{state | video_reader: ivf_reader, video_payloader: vp8_payloader, audio_reader: ogg_reader}
end

defp handle_webrtc_message(msg, state) do
Expand Down
2 changes: 1 addition & 1 deletion examples/send_from_file/example.html
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<main>
<h1>Elixir WebRTC Send From File Example</h1>
</main>
<video id="videoPlayer" autoplay muted> </video>
<video id="videoPlayer" autoplay controls> </video>
<script src="example.js"></script>
</body>
</html>
168 changes: 168 additions & 0 deletions lib/ex_webrtc/media/ogg_reader.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
defmodule ExWebRTC.Media.OggReader do
@moduledoc """
Defines Ogg reader.

For now, works only with single Opus stream in the container.

Based on:
* [Xiph's official Ogg documentation](https://xiph.org/ogg/)
* [RFC 7845: Ogg Encapsulation for the Opus Audio Codec](https://www.rfc-editor.org/rfc/rfc7845.txt)
* [RFC 6716: Definition of the Opus Audio Codec](https://www.rfc-editor.org/rfc/rfc6716.txt)
"""

import Bitwise

# Parameters handed to `CRC.calculate/2` when verifying a page checksum:
# polynomial 0x04C11DB7, zero initial value, zero final XOR, and no
# reflection of input or output.
@crc_params %{
# NOTE(review): presumably selects the library's built-in CRC-32 model as the
# base that the keys below override - confirm against the `crc` package docs
extend: :crc_32,
poly: 0x04C11DB7,
init: 0x0,
xorout: 0x0,
refin: false,
refout: false
}

# capture pattern expected at the very beginning of every page header
@signature "OggS"
# prefixes of the first two packets (ID Header and Comment Header) checked when opening a file
@id_signature "OpusHead"
@comment_signature "OpusTags"
# the only stream structure version accepted in a page header
@version 0

# Reader state:
#   * `file` - IO device the pages are read from
#   * `packets` - complete packets of the most recently read page that were not returned yet
#   * `rest` - trailing data of a page that did not terminate a packet
@opaque t() :: %{
file: File.io_device(),
packets: [binary()],
rest: binary()
}

@doc """
Opens Ogg file.

For now, works only with single Opus stream in the container.
This function reads the ID and Comment Headers (and, for now, ignores them).
"""
@spec open(Path.t()) :: {:ok, t()} | {:error, File.posix() | :invalid_header}
def open(path) do
with {:ok, file} <- File.open(path),
reader <- %{file: file, packets: [], rest: <<>>},
# for now, we ignore ID Header and Comment Header
{:ok, reader, <<@id_signature, _rest::binary>>} <- do_next_packet(reader),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above, keep reader at the end of return tuple

{:ok, reader, <<@comment_signature, _rest::binary>>} <- do_next_packet(reader) do
{:ok, reader}
else
{:error, _res} = err -> err
_other -> {:error, :invalid_header}
end
end

@doc """
Reads next Ogg packet.

One Ogg packet is equivalent to one Opus packet.
This function also returns the duration of the audio in milliseconds, based on Opus packet TOC sequence.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe let's elaborate on TOC

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll just redirect the reader to the RFC.

It assumes that all of the Ogg packets belong to the same stream.
"""
# Besides the errors produced while parsing the page header and the Opus TOC,
# a page whose CRC does not match is reported as `{:error, :invalid_checksum}`,
# hence it is listed in the spec as well.
@spec next_packet(t()) ::
        {:ok, t(), {binary(), non_neg_integer()}}
        | {:error, :invalid_page_header | :invalid_checksum | :not_enough_data}
        | :eof
def next_packet(reader) do
  # the duration is derived from the packet itself, so a packet that is too short
  # to contain the needed TOC data results in an error instead of a packet
  with {:ok, reader, packet} <- do_next_packet(reader),
       {:ok, duration} <- get_packet_duration(packet) do
    {:ok, reader, {packet, duration}}
  end
end

# Returns `{:ok, reader, packet}` with the next complete packet, reading
# further pages from the file when no buffered packets are left.
# Anything other than `{:ok, header, packets, rest}` returned by `read_page/1`
# (`:eof` or an error tuple) is passed through unchanged.

# a packet from the previously read page is still buffered
defp do_next_packet(%{packets: [first | packets]} = reader) do
  {:ok, %{reader | packets: packets}, first}
end

defp do_next_packet(%{packets: []} = reader) do
  with {:ok, _header, packets, rest} <- read_page(reader.file) do
    case packets do
      [] ->
        # no packet ended on this page: the whole payload continues the
        # pending packet, so keep accumulating and read the next page
        do_next_packet(%{reader | rest: reader.rest <> rest})

      [first | packets] ->
        # the first packet of the page completes the data carried over from
        # previous pages (`reader.rest`); the tail of *this* page (`rest`)
        # belongs to a packet that will be finished by a following page
        packet = reader.rest <> first
        {:ok, %{reader | packets: packets, rest: rest}, packet}
    end
  end
end

# Reads a single Ogg page from `file`, verifies its checksum and splits the
# payload into packets.
#
# Returns:
#   * `{:ok, header, packets, rest}` - `packets` and `rest` are the result of
#     `split_packets/2`: the packets that end on this page and the trailing
#     data of a packet that does not end on it
#   * `:eof` - when `IO.binread/2` reports the end of the file
#   * `{:error, :invalid_page_header}` - when the 27 header bytes could not be
#     read or do not start with the expected signature and version
#   * any other `{:error, reason}` (e.g. failed checksum) is passed through
defp read_page(file) do
  with <<@signature, @version, type, granule_pos::little-64, serial_no::little-32,
         sequence_no::little-32, _checksum::little-32,
         segment_no>> = header <- IO.binread(file, 27),
       raw_segment_table when is_binary(raw_segment_table) <- IO.binread(file, segment_no),
       # every byte of the segment table is the length of one payload segment
       segment_table <- :binary.bin_to_list(raw_segment_table),
       payload_length <- Enum.sum(segment_table),
       payload when is_binary(payload) <- IO.binread(file, payload_length),
       :ok <- verify_checksum(header <> raw_segment_table <> payload) do
    {packets, rest} = split_packets(segment_table, payload)

    type = %{
      # bit 0x01 marks a page that starts with the continuation of a packet
      # begun on an earlier page, so the page is "fresh" only when it is unset
      fresh?: (type &&& 0x01) == 0,
      # bit 0x02: first page of the logical bitstream
      first?: (type &&& 0x02) != 0,
      # bit 0x04: last page of the logical bitstream
      last?: (type &&& 0x04) != 0
    }

    {:ok,
     %{
       type: type,
       granule_pos: granule_pos,
       serial_no: serial_no,
       sequence_no: sequence_no
     }, packets, rest}
  else
    # only the header read can fall through with a binary (too short or wrong magic)
    data when is_binary(data) -> {:error, :invalid_page_header}
    :eof -> :eof
    {:error, _res} = err -> err
  end
end

# Checks the CRC stored in bytes 22..25 of a raw page (little-endian).
# The checksum is computed over the whole page with the checksum field
# itself replaced by zeros.
defp verify_checksum(<<head::binary-22, expected::little-32, tail::binary>>) do
  zeroed_page = <<head::binary, 0::32, tail::binary>>

  case CRC.calculate(zeroed_page, @crc_params) do
    ^expected -> :ok
    _mismatch -> {:error, :invalid_checksum}
  end
end

# Splits a page payload into packets according to the segment table.
# A segment of exactly 255 bytes means the packet continues in the next
# segment; any shorter segment (including an empty one) terminates it.
# Returns `{complete_packets, unfinished}` where `unfinished` holds the data
# of a packet that was not terminated within this payload.
defp split_packets(segment_table, payload, packets \\ [], packet \\ <<>>)

defp split_packets([], <<>>, finished, pending), do: {Enum.reverse(finished), pending}

defp split_packets([255 | lacing], payload, finished, pending) do
  <<chunk::binary-size(255), remaining::binary>> = payload
  split_packets(lacing, remaining, finished, pending <> chunk)
end

defp split_packets([len | lacing], payload, finished, pending) do
  <<chunk::binary-size(len), remaining::binary>> = payload
  split_packets(lacing, remaining, [pending <> chunk | finished], <<>>)
end

# Determines the amount of audio (in ms) carried by an Opus packet using its
# TOC byte (RFC 6716, sec. 3): the 5-bit `config` gives the duration of one
# frame and the bits that follow give the number of frames.
# The product is truncated to an integer, so e.g. a single 2.5 ms frame is
# reported as 2.
defp get_packet_duration(<<config::5, after_config::bitstring>>) do
  case get_frame_count(after_config) do
    {:ok, frames} ->
      frame_ms = get_frame_duration(config)
      {:ok, trunc(frames * frame_ms)}

    failure ->
      failure
  end
end

# fewer than 5 bits available - there is no TOC to inspect
defp get_packet_duration(_other), do: {:error, :not_enough_data}

# Extracts the number of frames from the remainder of the TOC byte
# (stereo flag `s` and 2-bit frame count code `c`), RFC 6716, sec. 3.1 and 3.2:
#   * code 0    - one frame
#   * code 1, 2 - two frames
#   * code 3    - arbitrary number of frames, stored in the byte following the TOC:
#                 1 bit `v` (VBR), 1 bit `p` (padding) and a 6-bit frame count;
#                 the 6-bit width also keeps the remaining data byte-aligned,
#                 which the `binary` tail of the pattern requires
# Returns `{:error, :not_enough_data}` when the needed bits are missing.
defp get_frame_count(<<_s::1, 0::2, _rest::binary>>), do: {:ok, 1}
defp get_frame_count(<<_s::1, c::2, _rest::binary>>) when c in 1..2, do: {:ok, 2}
defp get_frame_count(<<_s::1, 3::2, _vp::2, frame_no::6, _rest::binary>>), do: {:ok, frame_no}
defp get_frame_count(_other), do: {:error, :not_enough_data}

# Maps the 5-bit TOC `config` value to the duration of a single frame in ms.
# The values are laid out in three groups, each cycling through its own set
# of durations:
#   * 0..11  - cycles of 10, 20, 40, 60
#   * 12..15 - cycles of 10, 20
#   * 16..31 - cycles of 2.5, 5, 10, 20
defp get_frame_duration(config) when config in 0..11,
  do: elem({10, 20, 40, 60}, rem(config, 4))

defp get_frame_duration(config) when config in 12..15,
  do: elem({10, 20}, rem(config, 2))

defp get_frame_duration(config) when config in 16..31,
  do: elem({2.5, 5, 10, 20}, rem(config, 4))
end
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ defmodule ExWebRTC.MixProject do
{:ex_libsrtp, "~> 0.7.1"},
{:ex_rtp, "~> 0.2.0"},
{:ex_rtcp, "~> 0.1.0"},
{:crc, "~> 0.10"},

# dev/test
{:excoveralls, "~> 0.17.0", only: [:dev, :test], runtime: false},
Expand Down
2 changes: 2 additions & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
"bundlex": {:hex, :bundlex, "1.4.1", "60702b7f8e036a00c88bec69993329cc4aae32fe402804fe2e8db0c1e1396cd6", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:qex, "~> 0.5", [hex: :qex, repo: "hexpm", optional: false]}, {:req, "~> 0.4.0", [hex: :req, repo: "hexpm", optional: false]}, {:zarex, "~> 1.0", [hex: :zarex, repo: "hexpm", optional: false]}], "hexpm", "7511718b4b8063e457f3fa5166df177beff65c532db44631f41b496cfa2f48a3"},
"bunt": {:hex, :bunt, "0.2.1", "e2d4792f7bc0ced7583ab54922808919518d0e57ee162901a16a1b6664ef3b14", [:mix], [], "hexpm", "a330bfb4245239787b15005e66ae6845c9cd524a288f0d141c148b02603777a5"},
"castore": {:hex, :castore, "1.0.5", "9eeebb394cc9a0f3ae56b813459f990abb0a3dedee1be6b27fdb50301930502f", [:mix], [], "hexpm", "8d7c597c3e4a64c395980882d4bca3cebb8d74197c590dc272cfd3b6a6310578"},
"crc": {:hex, :crc, "0.10.5", "ee12a7c056ac498ef2ea985ecdc9fa53c1bfb4e53a484d9f17ff94803707dfd8", [:mix, :rebar3], [{:elixir_make, "~> 0.6", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm", "3e673b6495a9525c5c641585af1accba59a1eb33de697bedf341e247012c2c7f"},
"credo": {:hex, :credo, "1.7.1", "6e26bbcc9e22eefbff7e43188e69924e78818e2fe6282487d0703652bc20fd62", [:mix], [{:bunt, "~> 0.2.1", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "e9871c6095a4c0381c89b6aa98bc6260a8ba6addccf7f6a53da8849c748a58a2"},
"dialyxir": {:hex, :dialyxir, "1.4.2", "764a6e8e7a354f0ba95d58418178d486065ead1f69ad89782817c296d0d746a5", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "516603d8067b2fd585319e4b13d3674ad4f314a5902ba8130cd97dc902ce6bbd"},
"earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"},
"elixir_make": {:hex, :elixir_make, "0.7.7", "7128c60c2476019ed978210c245badf08b03dbec4f24d05790ef791da11aa17c", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}], "hexpm", "5bc19fff950fad52bbe5f211b12db9ec82c6b34a9647da0c2224b8b8464c7e6c"},
"elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"},
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"ex_doc": {:hex, :ex_doc, "0.30.9", "d691453495c47434c0f2052b08dd91cc32bc4e1a218f86884563448ee2502dd2", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "d7aaaf21e95dc5cddabf89063327e96867d00013963eadf2c6ad135506a8bc10"},
Expand Down
Loading
Loading