Skip to content

Commit

Permalink
Add moq-gst
Browse files Browse the repository at this point in the history
  • Loading branch information
levaitamas committed Nov 2, 2024
1 parent bf830f7 commit 6d1a6b7
Show file tree
Hide file tree
Showing 9 changed files with 744 additions and 27 deletions.
411 changes: 384 additions & 27 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ members = [
"moq-dir",
"moq-native-ietf",
"moq-catalog",
"moq-gst",
]
resolver = "2"

Expand Down
36 changes: 36 additions & 0 deletions moq-gst/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
[package]
name = "moq-gst"
description = "Media over QUIC - gstreamer plugin"
authors = ["Luke Curley"]
repository = "https://github.com/kixelated/moq-gst"
license = "MIT OR Apache-2.0"

version = "0.1.0"
edition = "2021"

keywords = ["quic", "http3", "webtransport", "media", "live"]
categories = ["multimedia", "network-programming", "web-programming"]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
moq-transport = "0.8"
moq-native-ietf = "0.5"
moq-pub = "0.8"

gst = { package = "gstreamer", version = "0.23" }
gst-base = { package = "gstreamer-base", version = "0.23" }
once_cell = "1"
bytes = "1"
url = "2"
tokio = { version = "1", features = ["full"] }
env_logger = "0.11"
anyhow = { version = "1", features = ["backtrace"] }

[build-dependencies]
gst-plugin-version-helper = "0.8"

[lib]
name = "gstmoq"
crate-type = ["cdylib","rlib"]
path = "src/lib.rs"
22 changes: 22 additions & 0 deletions moq-gst/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
A gstreamer plugin utilizing [moq-rs](https://github.com/englishm/moq-rs).

# Usage
Check out the `run` script for an example pipeline.

```bash
./run
```

By default this uses a localhost relay.
You can change the ENV args if you want to make it watchable on production instead:

```bash
ADDR=relay.quic.video NAME=something ./run
```

# License

Licensed under either:

- Apache License, Version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0)
- MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT)
3 changes: 3 additions & 0 deletions moq-gst/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
fn main() {
gst_plugin_version_helper::info()
}
43 changes: 43 additions & 0 deletions moq-gst/run
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#!/bin/bash
set -euo pipefail

# Change directory to the root of the project
cd "$(dirname "$0")"

# Use info logging by default
export RUST_LOG="${RUST_LOG:-info}"

# Connect to localhost by default.
HOST="${HOST:-localhost}"
PORT="${PORT:-4443}"
ADDR="${ADDR:-$HOST:$PORT}"
SCHEME="${SCHEME:-https}"

# Use the name "bbb" for the broadcast.
NAME="${NAME:-bbb}"

# Combine the host into a URL.
URL="${URL:-"$SCHEME://$ADDR"}"

# Default to a source video
INPUT="${INPUT:-bbb.mp4}"

# Print out the watch URL
echo "Watch URL: https://quic.video/watch/$NAME?server=$ADDR"

# Make sure we build the gstreamer plugin
cargo build

export GST_PLUGIN_PATH="${PWD}/../target/debug${GST_PLUGIN_PATH:+:$GST_PLUGIN_PATH}"
#export GST_DEBUG=*:5

# Download the Big Buck Bunny video if it doesn't exist
if [[ $INPUT == "bbb.mp4" && ! -f $INPUT ]]; then
echo "Downloading ya boye Big Buck Bunny..."
wget http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4 -O bbb.mp4
fi

# Run gstreamer and pipe the output to moq-pub
gst-launch-1.0 -v -e multifilesrc location="$INPUT" loop=true ! qtdemux name=demux \
demux.video_0 ! h264parse ! queue ! identity sync=true ! isofmp4mux name=mux chunk-duration=1 fragment-duration=1 ! moqsink url="$URL" namespace="$NAME" \
# demux.audio_0 ! aacparse ! queue ! mux.
20 changes: 20 additions & 0 deletions moq-gst/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use gst::glib;

mod sink;

pub fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
sink::register(plugin)?;
Ok(())
}

gst::plugin_define!(
moq,
env!("CARGO_PKG_DESCRIPTION"),
plugin_init,
concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")),
"MIT/Apache-2.0",
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_REPOSITORY"),
env!("BUILD_REL_DATE")
);
217 changes: 217 additions & 0 deletions moq-gst/src/sink/imp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
use anyhow::Context;
use gst::glib;
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst_base::subclass::prelude::*;

use moq_native_ietf::quic;
use moq_native_ietf::tls;
use moq_transport::serve::Tracks;
use moq_transport::serve::TracksReader;
use once_cell::sync::Lazy;
use std::sync::Mutex;
use url::Url;

pub static RUNTIME: Lazy<tokio::runtime::Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(1)
.build()
.unwrap()
});

#[derive(Default)]
struct Settings {
pub url: Option<String>,
pub namespace: Option<String>,
pub tls_disable_verify: bool,
}

#[derive(Default)]
struct State {
pub media: Option<moq_pub::Media>,
pub buffer: bytes::BytesMut,
}

#[derive(Default)]
pub struct MoqSink {
settings: Mutex<Settings>,
state: Mutex<State>,
}

#[glib::object_subclass]
impl ObjectSubclass for MoqSink {
const NAME: &'static str = "MoqSink";
type Type = super::MoqSink;
type ParentType = gst_base::BaseSink;

fn new() -> Self {
Self::default()
}
}

impl ObjectImpl for MoqSink {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![
glib::ParamSpecString::builder("url")
.nick("URL")
.blurb("Connect to the subscriber at the given URL")
.build(),
glib::ParamSpecString::builder("namespace")
.nick("Namespace")
.blurb("Publish the broadcast under the given namespace")
.build(),
glib::ParamSpecBoolean::builder("tls-disable-verify")
.nick("TLS disable verify")
.blurb("Disable TLS verification")
.default_value(false)
.build(),
]
});
PROPERTIES.as_ref()
}

fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
let mut settings = self.settings.lock().unwrap();

match pspec.name() {
"url" => settings.url = Some(value.get().unwrap()),
"namespace" => settings.namespace = Some(value.get().unwrap()),
"tls-disable-verify" => settings.tls_disable_verify = value.get().unwrap(),
_ => unimplemented!(),
}
}

fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
let settings = self.settings.lock().unwrap();

match pspec.name() {
"url" => settings.url.to_value(),
"namespace" => settings.namespace.to_value(),
"tls-disable-verify" => settings.tls_disable_verify.to_value(),
_ => unimplemented!(),
}
}
}

impl GstObjectImpl for MoqSink {}

impl ElementImpl for MoqSink {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
gst::subclass::ElementMetadata::new(
"MoQ Sink",
"Sink",
"Transmits media over the network via MoQ",
"Luke Curley <kixelated@gmail.com>",
)
});

Some(&*ELEMENT_METADATA)
}

fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
let caps = gst::Caps::builder("video/quicktime")
.field("variant", "iso-fragmented")
.build();

let pad_template = gst::PadTemplate::new(
"sink",
gst::PadDirection::Sink,
gst::PadPresence::Always,
&caps,
)
.unwrap();

vec![pad_template]
});
PAD_TEMPLATES.as_ref()
}
}

impl BaseSinkImpl for MoqSink {
fn start(&self) -> Result<(), gst::ErrorMessage> {
let _guard = RUNTIME.enter();
self.setup()
.map_err(|e| gst::error_msg!(gst::ResourceError::Failed, ["Failed to connect: {}", e]))
}

fn stop(&self) -> Result<(), gst::ErrorMessage> {
Ok(())
}

fn render(&self, buffer: &gst::Buffer) -> Result<gst::FlowSuccess, gst::FlowError> {
let data = buffer.map_readable().map_err(|_| gst::FlowError::Error)?;

let mut state = self.state.lock().unwrap();

let mut buffer = state.buffer.split_off(0);
buffer.extend_from_slice(&data);

let media = state.media.as_mut().expect("not initialized");

// TODO avoid full media parsing? gst should be able to provide the necessary info
media.parse(&mut buffer).expect("failed to parse");

state.buffer = buffer;

Ok(gst::FlowSuccess::Ok)
}
}

impl MoqSink {
fn setup(&self) -> anyhow::Result<()> {
let settings = self.settings.lock().unwrap();
let namespace = settings.namespace.clone().context("missing namespace")?;
let (writer, _, reader) = Tracks::new(namespace).produce();

let mut state = self.state.lock().unwrap();
state.media = Some(moq_pub::Media::new(writer)?);

let url = settings.url.clone().context("missing url")?;
let url = url.parse().context("invalid URL")?;

// TODO support TLS certs and other options
let config = quic::Args {
bind: "[::]:0".parse().unwrap(),
tls: tls::Args {
disable_verify: settings.tls_disable_verify,
..Default::default()
},
}
.load()?;
let client = quic::Endpoint::new(config)?.client;

let session = Session {
client,
url,
tracks: reader,
};

tokio::spawn(async move { session.run().await.expect("failed to run session") });

Ok(())
}
}

struct Session {
pub client: quic::Client,
pub url: Url,
pub tracks: TracksReader,
}

impl Session {
async fn run(self) -> anyhow::Result<()> {
let session = self.client.connect(&self.url).await?;
let (session, mut publisher) = moq_transport::session::Publisher::connect(session).await?;

tokio::select! {
res = publisher.announce(self.tracks) => res?,
res = session.run() => res?,
};

Ok(())
}
}
18 changes: 18 additions & 0 deletions moq-gst/src/sink/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use gst::glib;
use gst::prelude::*;

mod imp;

glib::wrapper! {
pub struct MoqSink(ObjectSubclass<imp::MoqSink>) @extends gst_base::BaseSink, gst::Element, gst::Object;
}

pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
env_logger::init();
gst::Element::register(
Some(plugin),
"moqsink",
gst::Rank::NONE,
MoqSink::static_type(),
)
}

0 comments on commit 6d1a6b7

Please sign in to comment.