Skip to content

Commit

Permalink
Merge pull request #15 from zafergurel/me/draft-06
Browse files Browse the repository at this point in the history
moq-transport: change namespace type to Tuple and add new message types (draft-06)
  • Loading branch information
englishm authored Nov 3, 2024
2 parents 5011ff1 + 41c46ad commit 2151545
Show file tree
Hide file tree
Showing 38 changed files with 417 additions and 70 deletions.
5 changes: 3 additions & 2 deletions moq-clock-ietf/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use clap::Parser;
mod clock;

use moq_transport::{
coding::Tuple,
serve,
session::{Publisher, Subscriber},
};
Expand Down Expand Up @@ -64,7 +65,7 @@ async fn main() -> anyhow::Result<()> {
.context("failed to create MoQ Transport session")?;

let (mut writer, _, reader) = serve::Tracks {
namespace: config.namespace.clone(),
namespace: Tuple::from_utf8_path(&config.namespace),
}
.produce();

Expand All @@ -81,7 +82,7 @@ async fn main() -> anyhow::Result<()> {
.await
.context("failed to create MoQ Transport session")?;

let (prod, sub) = serve::Track::new(config.namespace, config.track).produce();
let (prod, sub) = serve::Track::new(Tuple::from_utf8_path(&config.namespace), config.track).produce();

let clock = clock::Subscriber::new(sub);

Expand Down
9 changes: 6 additions & 3 deletions moq-dir/src/listings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{
sync::{Arc, Mutex},
};

use moq_transport::coding::Tuple;
use moq_transport::serve::{ServeError, Tracks, TracksReader, TracksWriter};

use crate::{ListingReader, ListingWriter};
Expand All @@ -20,7 +21,7 @@ pub struct Listings {

impl Listings {
pub fn new(namespace: String) -> Self {
let (writer, _, reader) = Tracks::new(namespace).produce();
let (writer, _, reader) = Tracks::new(Tuple::from_utf8_path(&namespace)).produce();

let state = State {
writer,
Expand All @@ -37,13 +38,15 @@ impl Listings {
pub fn register(&mut self, path: &str) -> Result<Option<Registration>, ServeError> {
let (prefix, base) = Self::prefix(path);

if !prefix.starts_with(&self.reader.namespace) {
let namespace = self.reader.namespace.to_utf8_path();

if !prefix.starts_with(&namespace) {
// Ignore anything that isn't in our namespace.
return Ok(None);
}

// Remove the namespace prefix from the path.
let prefix = &prefix[self.reader.namespace.len()..];
let prefix = &prefix[namespace.len()..];

let mut state = self.state.lock().unwrap();
if let Some(listing) = state.active.get_mut(prefix) {
Expand Down
2 changes: 1 addition & 1 deletion moq-dir/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl Session {
async fn serve_announce(mut self, mut announce: Announced) -> anyhow::Result<()> {
announce.ok()?;

match self.listings.register(&announce.namespace) {
match self.listings.register(&announce.namespace.to_utf8_path()) {
Ok(_) => announce.closed().await?,
Err(err) => {
announce.close(err.clone())?;
Expand Down
4 changes: 2 additions & 2 deletions moq-pub/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use tokio::io::AsyncReadExt;

use moq_native_ietf::quic;
use moq_pub::Media;
use moq_transport::{serve, session::Publisher};
use moq_transport::{coding::Tuple, serve, session::Publisher};

#[derive(Parser, Clone)]
pub struct Cli {
Expand Down Expand Up @@ -51,7 +51,7 @@ async fn main() -> anyhow::Result<()> {

let cli = Cli::parse();

let (writer, _, reader) = serve::Tracks::new(cli.name).produce();
let (writer, _, reader) = serve::Tracks::new(Tuple::from_utf8_path(&cli.name)).produce();
let media = Media::new(writer)?;

let tls = cli.tls.load()?;
Expand Down
2 changes: 1 addition & 1 deletion moq-pub/src/media.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ impl Media {
let mut track = moq_catalog::Track {
init_track: Some(self.init.name.clone()),
name: name.clone(),
namespace: Some(self.broadcast.namespace.clone()),
namespace: Some(self.broadcast.namespace.to_utf8_path()),
packaging: Some(moq_catalog::TrackPackaging::Cmaf),
render_group: Some(1),
..Default::default()
Expand Down
4 changes: 2 additions & 2 deletions moq-relay-ietf/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ impl Consumer {
async fn serve(mut self, mut announce: Announced) -> Result<(), anyhow::Error> {
let mut tasks = FuturesUnordered::new();

let (_, mut request, reader) = Tracks::new(announce.namespace.to_string()).produce();
let (_, mut request, reader) = Tracks::new(announce.namespace.clone()).produce();

if let Some(api) = self.api.as_ref() {
let mut refresh = api.set_origin(reader.namespace.clone()).await?;
let mut refresh = api.set_origin(reader.namespace.to_utf8_path()).await?;
tasks.push(async move { refresh.run().await.context("failed refreshing origin") }.boxed());
}

Expand Down
11 changes: 7 additions & 4 deletions moq-relay-ietf/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ use std::collections::HashMap;

use std::sync::{Arc, Mutex};

use moq_transport::serve::{ServeError, TracksReader};
use moq_transport::{
coding::Tuple,
serve::{ServeError, TracksReader},
};

#[derive(Clone)]
pub struct Locals {
lookup: Arc<Mutex<HashMap<String, TracksReader>>>,
lookup: Arc<Mutex<HashMap<Tuple, TracksReader>>>,
}

impl Default for Locals {
Expand Down Expand Up @@ -38,14 +41,14 @@ impl Locals {
Ok(registration)
}

pub fn route(&self, namespace: &str) -> Option<TracksReader> {
pub fn route(&self, namespace: &Tuple) -> Option<TracksReader> {
self.lookup.lock().unwrap().get(namespace).cloned()
}
}

pub struct Registration {
locals: Locals,
namespace: String,
namespace: Tuple,
}

impl Drop for Registration {
Expand Down
11 changes: 6 additions & 5 deletions moq-relay-ietf/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use futures::stream::FuturesUnordered;
use futures::FutureExt;
use futures::StreamExt;
use moq_native_ietf::quic;
use moq_transport::coding::Tuple;
use moq_transport::serve::{Track, TrackReader, TrackWriter};
use moq_transport::watch::State;
use url::Url;
Expand Down Expand Up @@ -119,9 +120,9 @@ impl RemotesConsumer {
Self { info, state }
}

pub async fn route(&self, namespace: &str) -> anyhow::Result<Option<RemoteConsumer>> {
pub async fn route(&self, namespace: &Tuple) -> anyhow::Result<Option<RemoteConsumer>> {
// Always fetch the origin instead of using the (potentially invalid) cache.
let origin = match self.api.get_origin(namespace).await? {
let origin = match self.api.get_origin(&namespace.to_utf8_path()).await? {
None => return Ok(None),
Some(origin) => origin,
};
Expand Down Expand Up @@ -192,7 +193,7 @@ impl Remote {

#[derive(Default)]
struct RemoteState {
tracks: HashMap<(String, String), RemoteTrackWeak>,
tracks: HashMap<(Tuple, String), RemoteTrackWeak>,
requested: VecDeque<TrackWriter>,
}

Expand Down Expand Up @@ -285,7 +286,7 @@ impl RemoteConsumer {
}

/// Request a track from the broadcast.
pub fn subscribe(&self, namespace: String, name: String) -> anyhow::Result<Option<RemoteTrackReader>> {
pub fn subscribe(&self, namespace: Tuple, name: String) -> anyhow::Result<Option<RemoteTrackReader>> {
let key = (namespace.clone(), name.clone());
let state = self.state.lock();
if let Some(track) = state.tracks.get(&key) {
Expand Down Expand Up @@ -372,7 +373,7 @@ impl RemoteTrackWeak {

struct RemoteTrackDrop {
parent: State<RemoteState>,
key: (String, String),
key: (Tuple, String),
}

impl Drop for RemoteTrackDrop {
Expand Down
4 changes: 2 additions & 2 deletions moq-sub/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use url::Url;

use moq_native_ietf::quic;
use moq_sub::media::Media;
use moq_transport::serve::Tracks;
use moq_transport::{coding::Tuple, serve::Tracks};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
Expand All @@ -31,7 +31,7 @@ async fn main() -> anyhow::Result<()> {
.context("failed to create MoQ Transport session")?;

// Associate empty set of Tracks with provided namespace
let tracks = Tracks::new(config.name);
let tracks = Tracks::new(Tuple::from_utf8_path(&config.name));

let mut media = Media::new(subscriber, tracks, out).await?;

Expand Down
2 changes: 2 additions & 0 deletions moq-transport/src/coding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ mod decode;
mod encode;
mod params;
mod string;
mod tuple;
mod varint;

pub use decode::*;
pub use encode::*;
pub use params::*;
pub use tuple::*;
pub use varint::*;
144 changes: 144 additions & 0 deletions moq-transport/src/coding/tuple.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
//
use super::{Decode, DecodeError, Encode, EncodeError};
use core::hash::{Hash, Hasher};
/// Tuple Field
#[derive(Clone, Debug, Default)]
pub struct TupleField {
pub value: Vec<u8>,
}

impl Eq for TupleField {}

impl PartialEq for TupleField {
fn eq(&self, other: &Self) -> bool {
self.value.eq(&other.value)
}
}

impl Hash for TupleField {
fn hash<H: Hasher>(&self, state: &mut H) {
self.value.hash(state);
}
}

impl Decode for TupleField {
fn decode<R: bytes::Buf>(r: &mut R) -> Result<Self, DecodeError> {
let size = usize::decode(r)?;
Self::decode_remaining(r, size)?;
let mut buf = vec![0; size];
r.copy_to_slice(&mut buf);
Ok(Self { value: buf })
}
}

impl Encode for TupleField {
fn encode<W: bytes::BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
self.value.len().encode(w)?;
Self::encode_remaining(w, self.value.len())?;
w.put_slice(&self.value);
Ok(())
}
}

impl TupleField {
pub fn new() -> Self {
Self::default()
}

pub fn from_utf8(path: &str) -> Self {
let mut field = TupleField::new();
field.value = path.as_bytes().to_vec();
field
}

pub fn set<P: Encode>(&mut self, p: P) -> Result<(), EncodeError> {
let mut value = Vec::new();
p.encode(&mut value)?;
self.value = value;
Ok(())
}

pub fn get<P: Decode>(&self) -> Result<P, DecodeError> {
P::decode(&mut bytes::Bytes::from(self.value.clone()))
}
}

/// Tuple
#[derive(Clone, Debug, Default)]
pub struct Tuple {
pub fields: Vec<TupleField>,
}

impl Hash for Tuple {
fn hash<H: Hasher>(&self, state: &mut H) {
self.fields.hash(state);
}
}

impl Eq for Tuple {}

impl PartialEq for Tuple {
fn eq(&self, other: &Self) -> bool {
self.fields.eq(&other.fields)
}
}

impl Decode for Tuple {
fn decode<R: bytes::Buf>(r: &mut R) -> Result<Self, DecodeError> {
let count = u64::decode(r)? as usize;
let mut fields = Vec::new();
for _ in 0..count {
fields.push(TupleField::decode(r)?);
}
Ok(Self { fields })
}
}

impl Encode for Tuple {
fn encode<W: bytes::BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
self.fields.len().encode(w)?;
for field in &self.fields {
field.encode(w)?;
}
Ok(())
}
}

impl Tuple {
pub fn new() -> Self {
Self::default()
}

pub fn add(&mut self, field: TupleField) {
self.fields.push(field);
}

pub fn get(&self, index: usize) -> Result<TupleField, DecodeError> {
self.fields[index].get()
}

pub fn set(&mut self, index: usize, f: TupleField) -> Result<(), EncodeError> {
self.fields[index].set(f)
}

pub fn clear(&mut self) {
self.fields.clear();
}

pub fn from_utf8_path(path: &str) -> Self {
let mut tuple = Tuple::new();
for part in path.split('/') {
tuple.add(TupleField::from_utf8(part));
}
tuple
}

pub fn to_utf8_path(&self) -> String {
let mut path = String::new();
for field in &self.fields {
path.push('/');
path.push_str(&String::from_utf8_lossy(&field.value));
}
path
}
}
7 changes: 7 additions & 0 deletions moq-transport/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,15 @@ pub enum SessionError {
#[error("parameter length mismatch")]
ParameterLengthMismatch,
#[error("too many subscribes")]
TooManySubscribes,
#[error("goaway timeout")]
GoawayTimeout,
#[error("unknown error: code={0}")]
Unknown(u64),
// Unofficial error codes
Expand All @@ -40,6 +46,7 @@ impl MoqError for SessionError {
Self::ProtocolViolation => 0x3,
Self::DuplicateTrackAlias => 0x4,
Self::ParameterLengthMismatch => 0x5,
Self::TooManySubscribes => 0x6,
Self::GoawayTimeout => 0x10,
Self::Unknown(code) => *code,
// Unofficial error codes
Expand Down
Loading

0 comments on commit 2151545

Please sign in to comment.