diff --git a/moq-clock-ietf/src/main.rs b/moq-clock-ietf/src/main.rs index 24bf276..449d6c6 100644 --- a/moq-clock-ietf/src/main.rs +++ b/moq-clock-ietf/src/main.rs @@ -8,6 +8,7 @@ use clap::Parser; mod clock; use moq_transport::{ + coding::Tuple, serve, session::{Publisher, Subscriber}, }; @@ -64,7 +65,7 @@ async fn main() -> anyhow::Result<()> { .context("failed to create MoQ Transport session")?; let (mut writer, _, reader) = serve::Tracks { - namespace: config.namespace.clone(), + namespace: Tuple::from_utf8_path(&config.namespace), } .produce(); @@ -81,7 +82,7 @@ async fn main() -> anyhow::Result<()> { .await .context("failed to create MoQ Transport session")?; - let (prod, sub) = serve::Track::new(config.namespace, config.track).produce(); + let (prod, sub) = serve::Track::new(Tuple::from_utf8_path(&config.namespace), config.track).produce(); let clock = clock::Subscriber::new(sub); diff --git a/moq-dir/src/listings.rs b/moq-dir/src/listings.rs index ae5d26b..dd0adbc 100644 --- a/moq-dir/src/listings.rs +++ b/moq-dir/src/listings.rs @@ -3,6 +3,7 @@ use std::{ sync::{Arc, Mutex}, }; +use moq_transport::coding::Tuple; use moq_transport::serve::{ServeError, Tracks, TracksReader, TracksWriter}; use crate::{ListingReader, ListingWriter}; @@ -20,7 +21,7 @@ pub struct Listings { impl Listings { pub fn new(namespace: String) -> Self { - let (writer, _, reader) = Tracks::new(namespace).produce(); + let (writer, _, reader) = Tracks::new(Tuple::from_utf8_path(&namespace)).produce(); let state = State { writer, @@ -37,13 +38,15 @@ impl Listings { pub fn register(&mut self, path: &str) -> Result, ServeError> { let (prefix, base) = Self::prefix(path); - if !prefix.starts_with(&self.reader.namespace) { + let namespace = self.reader.namespace.to_utf8_path(); + + if !prefix.starts_with(&namespace) { // Ignore anything that isn't in our namespace. return Ok(None); } // Remove the namespace prefix from the path. - let prefix = &prefix[self.reader.namespace.len()..]; + let prefix = &prefix[namespace.len()..]; let mut state = self.state.lock().unwrap(); if let Some(listing) = state.active.get_mut(prefix) { diff --git a/moq-dir/src/session.rs b/moq-dir/src/session.rs index d302af5..03602c5 100644 --- a/moq-dir/src/session.rs +++ b/moq-dir/src/session.rs @@ -68,7 +68,7 @@ impl Session { async fn serve_announce(mut self, mut announce: Announced) -> anyhow::Result<()> { announce.ok()?; - match self.listings.register(&announce.namespace) { + match self.listings.register(&announce.namespace.to_utf8_path()) { Ok(_) => announce.closed().await?, Err(err) => { announce.close(err.clone())?; diff --git a/moq-pub/src/main.rs b/moq-pub/src/main.rs index 0f910cd..7898dac 100644 --- a/moq-pub/src/main.rs +++ b/moq-pub/src/main.rs @@ -8,7 +8,7 @@ use tokio::io::AsyncReadExt; use moq_native_ietf::quic; use moq_pub::Media; -use moq_transport::{serve, session::Publisher}; +use moq_transport::{coding::Tuple, serve, session::Publisher}; #[derive(Parser, Clone)] pub struct Cli { @@ -51,7 +51,7 @@ async fn main() -> anyhow::Result<()> { let cli = Cli::parse(); - let (writer, _, reader) = serve::Tracks::new(cli.name).produce(); + let (writer, _, reader) = serve::Tracks::new(Tuple::from_utf8_path(&cli.name)).produce(); let media = Media::new(writer)?; let tls = cli.tls.load()?; diff --git a/moq-pub/src/media.rs b/moq-pub/src/media.rs index 5992e60..8198688 100644 --- a/moq-pub/src/media.rs +++ b/moq-pub/src/media.rs @@ -157,7 +157,7 @@ impl Media { let mut track = moq_catalog::Track { init_track: Some(self.init.name.clone()), name: name.clone(), - namespace: Some(self.broadcast.namespace.clone()), + namespace: Some(self.broadcast.namespace.to_utf8_path()), packaging: Some(moq_catalog::TrackPackaging::Cmaf), render_group: Some(1), ..Default::default() diff --git a/moq-relay-ietf/src/consumer.rs b/moq-relay-ietf/src/consumer.rs index 0d008a3..9a8c0e1 100644 --- a/moq-relay-ietf/src/consumer.rs +++ b/moq-relay-ietf/src/consumer.rs @@ -51,10 +51,10 @@ impl Consumer { async fn serve(mut self, mut announce: Announced) -> Result<(), anyhow::Error> { let mut tasks = FuturesUnordered::new(); - let (_, mut request, reader) = Tracks::new(announce.namespace.to_string()).produce(); + let (_, mut request, reader) = Tracks::new(announce.namespace.clone()).produce(); if let Some(api) = self.api.as_ref() { - let mut refresh = api.set_origin(reader.namespace.clone()).await?; + let mut refresh = api.set_origin(reader.namespace.to_utf8_path()).await?; tasks.push(async move { refresh.run().await.context("failed refreshing origin") }.boxed()); } diff --git a/moq-relay-ietf/src/local.rs b/moq-relay-ietf/src/local.rs index 8e4217a..78194c4 100644 --- a/moq-relay-ietf/src/local.rs +++ b/moq-relay-ietf/src/local.rs @@ -3,11 +3,14 @@ use std::collections::HashMap; use std::sync::{Arc, Mutex}; -use moq_transport::serve::{ServeError, TracksReader}; +use moq_transport::{ + coding::Tuple, + serve::{ServeError, TracksReader}, +}; #[derive(Clone)] pub struct Locals { - lookup: Arc>>, + lookup: Arc>>, } impl Default for Locals { @@ -38,14 +41,14 @@ impl Locals { Ok(registration) } - pub fn route(&self, namespace: &str) -> Option { + pub fn route(&self, namespace: &Tuple) -> Option { self.lookup.lock().unwrap().get(namespace).cloned() } } pub struct Registration { locals: Locals, - namespace: String, + namespace: Tuple, } impl Drop for Registration { diff --git a/moq-relay-ietf/src/remote.rs b/moq-relay-ietf/src/remote.rs index af96265..6f3e641 100644 --- a/moq-relay-ietf/src/remote.rs +++ b/moq-relay-ietf/src/remote.rs @@ -10,6 +10,7 @@ use futures::stream::FuturesUnordered; use futures::FutureExt; use futures::StreamExt; use moq_native_ietf::quic; +use moq_transport::coding::Tuple; use moq_transport::serve::{Track, TrackReader, TrackWriter}; use moq_transport::watch::State; use url::Url; @@ -119,9 +120,9 @@ impl RemotesConsumer { Self { info, state } } - pub async fn route(&self, namespace: &str) -> anyhow::Result> { + pub async fn route(&self, namespace: &Tuple) -> anyhow::Result> { // Always fetch the origin instead of using the (potentially invalid) cache. - let origin = match self.api.get_origin(namespace).await? { + let origin = match self.api.get_origin(&namespace.to_utf8_path()).await? { None => return Ok(None), Some(origin) => origin, }; @@ -192,7 +193,7 @@ impl Remote { #[derive(Default)] struct RemoteState { - tracks: HashMap<(String, String), RemoteTrackWeak>, + tracks: HashMap<(Tuple, String), RemoteTrackWeak>, requested: VecDeque, } @@ -285,7 +286,7 @@ impl RemoteConsumer { } /// Request a track from the broadcast. - pub fn subscribe(&self, namespace: String, name: String) -> anyhow::Result> { + pub fn subscribe(&self, namespace: Tuple, name: String) -> anyhow::Result> { let key = (namespace.clone(), name.clone()); let state = self.state.lock(); if let Some(track) = state.tracks.get(&key) { @@ -372,7 +373,7 @@ impl RemoteTrackWeak { struct RemoteTrackDrop { parent: State, - key: (String, String), + key: (Tuple, String), } impl Drop for RemoteTrackDrop { diff --git a/moq-sub/src/main.rs b/moq-sub/src/main.rs index f15382d..fc514d5 100644 --- a/moq-sub/src/main.rs +++ b/moq-sub/src/main.rs @@ -6,7 +6,7 @@ use url::Url; use moq_native_ietf::quic; use moq_sub::media::Media; -use moq_transport::serve::Tracks; +use moq_transport::{coding::Tuple, serve::Tracks}; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -31,7 +31,7 @@ async fn main() -> anyhow::Result<()> { .context("failed to create MoQ Transport session")?; // Associate empty set of Tracks with provided namespace - let tracks = Tracks::new(config.name); + let tracks = Tracks::new(Tuple::from_utf8_path(&config.name)); let mut media = Media::new(subscriber, tracks, out).await?; diff --git a/moq-transport/src/coding/mod.rs b/moq-transport/src/coding/mod.rs index a3ff6f7..e471224 100644 --- a/moq-transport/src/coding/mod.rs +++ b/moq-transport/src/coding/mod.rs @@ -2,9 +2,11 @@ mod decode; mod encode; mod params; mod string; +mod tuple; mod varint; pub use decode::*; pub use encode::*; pub use params::*; +pub use tuple::*; pub use varint::*; diff --git a/moq-transport/src/coding/tuple.rs b/moq-transport/src/coding/tuple.rs new file mode 100644 index 0000000..8589095 --- /dev/null +++ b/moq-transport/src/coding/tuple.rs @@ -0,0 +1,144 @@ +// +use super::{Decode, DecodeError, Encode, EncodeError}; +use core::hash::{Hash, Hasher}; +/// Tuple Field +#[derive(Clone, Debug, Default)] +pub struct TupleField { + pub value: Vec, +} + +impl Eq for TupleField {} + +impl PartialEq for TupleField { + fn eq(&self, other: &Self) -> bool { + self.value.eq(&other.value) + } +} + +impl Hash for TupleField { + fn hash(&self, state: &mut H) { + self.value.hash(state); + } +} + +impl Decode for TupleField { + fn decode(r: &mut R) -> Result { + let size = usize::decode(r)?; + Self::decode_remaining(r, size)?; + let mut buf = vec![0; size]; + r.copy_to_slice(&mut buf); + Ok(Self { value: buf }) + } +} + +impl Encode for TupleField { + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.value.len().encode(w)?; + Self::encode_remaining(w, self.value.len())?; + w.put_slice(&self.value); + Ok(()) + } +} + +impl TupleField { + pub fn new() -> Self { + Self::default() + } + + pub fn from_utf8(path: &str) -> Self { + let mut field = TupleField::new(); + field.value = path.as_bytes().to_vec(); + field + } + + pub fn set(&mut self, p: P) -> Result<(), EncodeError> { + let mut value = Vec::new(); + p.encode(&mut value)?; + self.value = value; + Ok(()) + } + + pub fn get(&self) -> Result { + P::decode(&mut bytes::Bytes::from(self.value.clone())) + } +} + +/// Tuple +#[derive(Clone, Debug, Default)] +pub struct Tuple { + pub fields: Vec, +} + +impl Hash for Tuple { + fn hash(&self, state: &mut H) { + self.fields.hash(state); + } +} + +impl Eq for Tuple {} + +impl PartialEq for Tuple { + fn eq(&self, other: &Self) -> bool { + self.fields.eq(&other.fields) + } +} + +impl Decode for Tuple { + fn decode(r: &mut R) -> Result { + let count = u64::decode(r)? as usize; + let mut fields = Vec::new(); + for _ in 0..count { + fields.push(TupleField::decode(r)?); + } + Ok(Self { fields }) + } +} + +impl Encode for Tuple { + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.fields.len().encode(w)?; + for field in &self.fields { + field.encode(w)?; + } + Ok(()) + } +} + +impl Tuple { + pub fn new() -> Self { + Self::default() + } + + pub fn add(&mut self, field: TupleField) { + self.fields.push(field); + } + + pub fn get(&self, index: usize) -> Result { + self.fields[index].get() + } + + pub fn set(&mut self, index: usize, f: TupleField) -> Result<(), EncodeError> { + self.fields[index].set(f) + } + + pub fn clear(&mut self) { + self.fields.clear(); + } + + pub fn from_utf8_path(path: &str) -> Self { + let mut tuple = Tuple::new(); + for part in path.split('/') { + tuple.add(TupleField::from_utf8(part)); + } + tuple + } + + pub fn to_utf8_path(&self) -> String { + let mut path = String::new(); + for field in &self.fields { + path.push('/'); + path.push_str(&String::from_utf8_lossy(&field.value)); + } + path + } +} diff --git a/moq-transport/src/error.rs b/moq-transport/src/error.rs index 279eaf3..067a17b 100644 --- a/moq-transport/src/error.rs +++ b/moq-transport/src/error.rs @@ -21,9 +21,15 @@ pub enum SessionError { #[error("parameter length mismatch")] ParameterLengthMismatch, + #[error("too many subscribes")] + TooManySubscribes, + #[error("goaway timeout")] GoawayTimeout, + + + #[error("unknown error: code={0}")] Unknown(u64), // Unofficial error codes @@ -40,6 +46,7 @@ impl MoqError for SessionError { Self::ProtocolViolation => 0x3, Self::DuplicateTrackAlias => 0x4, Self::ParameterLengthMismatch => 0x5, + Self::TooManySubscribes => 0x6, Self::GoawayTimeout => 0x10, Self::Unknown(code) => *code, // Unofficial error codes diff --git a/moq-transport/src/message/announce.rs b/moq-transport/src/message/announce.rs index 114fb5c..00d1100 100644 --- a/moq-transport/src/message/announce.rs +++ b/moq-transport/src/message/announce.rs @@ -1,10 +1,10 @@ -use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params, Tuple}; /// Sent by the publisher to announce the availability of a group of tracks. #[derive(Clone, Debug)] pub struct Announce { /// The track namespace - pub namespace: String, + pub namespace: Tuple, /// Optional parameters pub params: Params, @@ -12,7 +12,7 @@ pub struct Announce { impl Decode for Announce { fn decode(r: &mut R) -> Result { - let namespace = String::decode(r)?; + let namespace = Tuple::decode(r)?; let params = Params::decode(r)?; Ok(Self { namespace, params }) diff --git a/moq-transport/src/message/announce_cancel.rs b/moq-transport/src/message/announce_cancel.rs index 2a3379f..086d2fe 100644 --- a/moq-transport/src/message/announce_cancel.rs +++ b/moq-transport/src/message/announce_cancel.rs @@ -1,10 +1,10 @@ -use crate::coding::{Decode, DecodeError, Encode, EncodeError}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, Tuple}; /// Sent by the subscriber to reject an Announce after ANNOUNCE_OK #[derive(Clone, Debug)] pub struct AnnounceCancel { // Echo back the namespace that was reset - pub namespace: String, + pub namespace: Tuple, // An error code. //pub code: u64, @@ -14,7 +14,7 @@ pub struct AnnounceCancel { impl Decode for AnnounceCancel { fn decode(r: &mut R) -> Result { - let namespace = String::decode(r)?; + let namespace = Tuple::decode(r)?; //let code = u64::decode(r)?; //let reason = String::decode(r)?; diff --git a/moq-transport/src/message/announce_error.rs b/moq-transport/src/message/announce_error.rs index 4c468a3..6201ded 100644 --- a/moq-transport/src/message/announce_error.rs +++ b/moq-transport/src/message/announce_error.rs @@ -1,10 +1,10 @@ -use crate::coding::{Decode, DecodeError, Encode, EncodeError}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, Tuple}; /// Sent by the subscriber to reject an Announce. #[derive(Clone, Debug)] pub struct AnnounceError { // Echo back the namespace that was reset - pub namespace: String, + pub namespace: Tuple, // An error code. pub code: u64, @@ -15,7 +15,7 @@ pub struct AnnounceError { impl Decode for AnnounceError { fn decode(r: &mut R) -> Result { - let namespace = String::decode(r)?; + let namespace = Tuple::decode(r)?; let code = u64::decode(r)?; let reason = String::decode(r)?; diff --git a/moq-transport/src/message/announce_ok.rs b/moq-transport/src/message/announce_ok.rs index 0178eb1..b00a557 100644 --- a/moq-transport/src/message/announce_ok.rs +++ b/moq-transport/src/message/announce_ok.rs @@ -1,16 +1,16 @@ -use crate::coding::{Decode, DecodeError, Encode, EncodeError}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, Tuple}; /// Sent by the subscriber to accept an Announce. #[derive(Clone, Debug)] pub struct AnnounceOk { // Echo back the namespace that was announced. // TODO Propose using an ID to save bytes. - pub namespace: String, + pub namespace: Tuple, } impl Decode for AnnounceOk { fn decode(r: &mut R) -> Result { - let namespace = String::decode(r)?; + let namespace = Tuple::decode(r)?; Ok(Self { namespace }) } } diff --git a/moq-transport/src/message/mod.rs b/moq-transport/src/message/mod.rs index 13524a5..52bd22f 100644 --- a/moq-transport/src/message/mod.rs +++ b/moq-transport/src/message/mod.rs @@ -43,6 +43,9 @@ mod publisher; mod subscribe; mod subscribe_done; mod subscribe_error; +mod subscribe_namespace; +mod subscribe_namespace_error; +mod subscribe_namespace_ok; mod subscribe_ok; mod subscribe_update; mod subscriber; @@ -50,6 +53,7 @@ mod track_status; mod track_status_request; mod unannounce; mod unsubscribe; +mod unsubscribe_namespace; pub use announce::*; pub use announce_cancel::*; @@ -62,6 +66,9 @@ pub use publisher::*; pub use subscribe::*; pub use subscribe_done::*; pub use subscribe_error::*; +pub use subscribe_namespace::*; +pub use subscribe_namespace_error::*; +pub use subscribe_namespace_ok::*; pub use subscribe_ok::*; pub use subscribe_update::*; pub use subscriber::*; @@ -69,6 +76,7 @@ pub use track_status::*; pub use track_status_request::*; pub use unannounce::*; pub use unsubscribe::*; +pub use unsubscribe_namespace::*; use crate::coding::{Decode, DecodeError, Encode, EncodeError}; use std::fmt; @@ -192,6 +200,12 @@ message_types! { // Misc GoAway = 0x10, + + // NAMESPACE family, sent by subscriber + SubscribeNamespace = 0x11, + SubscribeNamespaceOk = 0x12, + SubscribeNamespaceError = 0x13, + UnsubscribeNamespace = 0x14, } /// Track Status Codes diff --git a/moq-transport/src/message/subscribe.rs b/moq-transport/src/message/subscribe.rs index fa6a186..3f142ce 100644 --- a/moq-transport/src/message/subscribe.rs +++ b/moq-transport/src/message/subscribe.rs @@ -1,4 +1,4 @@ -use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params, Tuple}; use crate::message::FilterType; use crate::message::GroupOrder; @@ -12,7 +12,7 @@ pub struct Subscribe { /// Track properties pub track_alias: u64, // This alias is useless but part of the spec - pub track_namespace: String, + pub track_namespace: Tuple, pub track_name: String, // Subscriber Priority @@ -34,7 +34,7 @@ impl Decode for Subscribe { fn decode(r: &mut R) -> Result { let id = u64::decode(r)?; let track_alias = u64::decode(r)?; - let track_namespace = String::decode(r)?; + let track_namespace = Tuple::decode(r)?; let track_name = String::decode(r)?; let subscriber_priority = u8::decode(r)?; diff --git a/moq-transport/src/message/subscribe_namespace.rs b/moq-transport/src/message/subscribe_namespace.rs new file mode 100644 index 0000000..68522ce --- /dev/null +++ b/moq-transport/src/message/subscribe_namespace.rs @@ -0,0 +1,33 @@ +use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params, Tuple}; + +/// Subscribe Namespace +/// https://www.ietf.org/archive/id/draft-ietf-moq-transport-06.html#section-6.11 +#[derive(Clone, Debug)] +pub struct SubscribeNamespace { + /// The track namespace + pub namespace_prefix: Tuple, + + /// Optional parameters + pub params: Params, +} + +impl Decode for SubscribeNamespace { + fn decode(r: &mut R) -> Result { + let namespace_prefix = Tuple::decode(r)?; + let params = Params::decode(r)?; + + Ok(Self { + namespace_prefix, + params, + }) + } +} + +impl Encode for SubscribeNamespace { + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.namespace_prefix.encode(w)?; + self.params.encode(w)?; + + Ok(()) + } +} diff --git a/moq-transport/src/message/subscribe_namespace_error.rs b/moq-transport/src/message/subscribe_namespace_error.rs new file mode 100644 index 0000000..5a6ffb4 --- /dev/null +++ b/moq-transport/src/message/subscribe_namespace_error.rs @@ -0,0 +1,39 @@ +use crate::coding::{Decode, DecodeError, Encode, EncodeError, Tuple}; + +/// Subscribe Namespace Error +/// https://www.ietf.org/archive/id/draft-ietf-moq-transport-06.html#name-subscribe_namespace_error +#[derive(Clone, Debug)] +pub struct SubscribeNamespaceError { + // Echo back the namespace that was reset + pub namespace_prefix: Tuple, + + // An error code. + pub code: u64, + + // An optional, human-readable reason. + pub reason: String, +} + +impl Decode for SubscribeNamespaceError { + fn decode(r: &mut R) -> Result { + let namespace_prefix = Tuple::decode(r)?; + let code = u64::decode(r)?; + let reason = String::decode(r)?; + + Ok(Self { + namespace_prefix, + code, + reason, + }) + } +} + +impl Encode for SubscribeNamespaceError { + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.namespace_prefix.encode(w)?; + self.code.encode(w)?; + self.reason.encode(w)?; + + Ok(()) + } +} diff --git a/moq-transport/src/message/subscribe_namespace_ok.rs b/moq-transport/src/message/subscribe_namespace_ok.rs new file mode 100644 index 0000000..df5b9ae --- /dev/null +++ b/moq-transport/src/message/subscribe_namespace_ok.rs @@ -0,0 +1,22 @@ +use crate::coding::{Decode, DecodeError, Encode, EncodeError, Tuple}; + +/// Subscribe Namespace Ok +/// https://www.ietf.org/archive/id/draft-ietf-moq-transport-06.html#name-subscribe_namespace_ok +#[derive(Clone, Debug)] +pub struct SubscribeNamespaceOk { + // Echo back the namespace that was announced. + pub namespace_prefix: Tuple, +} + +impl Decode for SubscribeNamespaceOk { + fn decode(r: &mut R) -> Result { + let namespace_prefix = Tuple::decode(r)?; + Ok(Self { namespace_prefix }) + } +} + +impl Encode for SubscribeNamespaceOk { + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.namespace_prefix.encode(w) + } +} diff --git a/moq-transport/src/message/subscribe_ok.rs b/moq-transport/src/message/subscribe_ok.rs index 15e07f0..1c4ef17 100644 --- a/moq-transport/src/message/subscribe_ok.rs +++ b/moq-transport/src/message/subscribe_ok.rs @@ -35,6 +35,10 @@ impl Decode for SubscribeOk { _ => return Err(DecodeError::InvalidValue), }; + // Skip the parameters. + // TODO: Implement parameters for SubscribeOk + let _ = u8::decode(r)?; + Ok(Self { id, expires, @@ -64,6 +68,9 @@ impl Encode for SubscribeOk { } } + // Add 0 for the length of the parameters + w.put_u8(0); + Ok(()) } } diff --git a/moq-transport/src/message/subscribe_update.rs b/moq-transport/src/message/subscribe_update.rs index 2547d82..4f67f9e 100644 --- a/moq-transport/src/message/subscribe_update.rs +++ b/moq-transport/src/message/subscribe_update.rs @@ -1,4 +1,4 @@ -use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params, Tuple}; use crate::message::subscribe::{SubscribeLocation, SubscribePair}; use crate::message::FilterType; use crate::message::GroupOrder; @@ -13,7 +13,7 @@ pub struct SubscribeUpdate { /// Track properties pub track_alias: u64, // This alias is useless but part of the spec - pub track_namespace: String, + pub track_namespace: Tuple, pub track_name: String, // Subscriber Priority @@ -35,7 +35,7 @@ impl Decode for SubscribeUpdate { fn decode(r: &mut R) -> Result { let id = u64::decode(r)?; let track_alias = u64::decode(r)?; - let track_namespace = String::decode(r)?; + let track_namespace = Tuple::decode(r)?; let track_name = String::decode(r)?; let subscriber_priority = u8::decode(r)?; diff --git a/moq-transport/src/message/subscriber.rs b/moq-transport/src/message/subscriber.rs index 5b699aa..8047312 100644 --- a/moq-transport/src/message/subscriber.rs +++ b/moq-transport/src/message/subscriber.rs @@ -52,4 +52,8 @@ subscriber_msgs! { Unsubscribe, SubscribeUpdate, TrackStatusRequest, + SubscribeNamespace, + SubscribeNamespaceOk, + SubscribeNamespaceError, + UnsubscribeNamespace, } diff --git a/moq-transport/src/message/track_status.rs b/moq-transport/src/message/track_status.rs index 16e0dc0..df2458b 100644 --- a/moq-transport/src/message/track_status.rs +++ b/moq-transport/src/message/track_status.rs @@ -1,10 +1,10 @@ use super::TrackStatusCode; -use crate::coding::{Decode, DecodeError, Encode, EncodeError}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, Tuple}; #[derive(Clone, Debug)] pub struct TrackStatus { /// Track Namespace - pub track_namespace: String, + pub track_namespace: Tuple, /// Track Name pub track_name: String, /// Status Code @@ -18,7 +18,7 @@ pub struct TrackStatus { impl Decode for TrackStatus { fn decode(r: &mut R) -> Result { Ok(Self { - track_namespace: String::decode(r)?, + track_namespace: Tuple::decode(r)?, track_name: String::decode(r)?, status_code: TrackStatusCode::decode(r)?, last_group_id: u64::decode(r)?, diff --git a/moq-transport/src/message/track_status_request.rs b/moq-transport/src/message/track_status_request.rs index 8900066..e2c3707 100644 --- a/moq-transport/src/message/track_status_request.rs +++ b/moq-transport/src/message/track_status_request.rs @@ -1,16 +1,16 @@ -use crate::coding::{Decode, DecodeError, Encode, EncodeError}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, Tuple}; #[derive(Clone, Debug)] pub struct TrackStatusRequest { /// Track Namespace - pub track_namespace: String, + pub track_namespace: Tuple, /// Track Name pub track_name: String, } impl Decode for TrackStatusRequest { fn decode(r: &mut R) -> Result { - let track_namespace = String::decode(r)?; + let track_namespace = Tuple::decode(r)?; let track_name = String::decode(r)?; Ok(Self { diff --git a/moq-transport/src/message/unannounce.rs b/moq-transport/src/message/unannounce.rs index e856bf0..4c510e8 100644 --- a/moq-transport/src/message/unannounce.rs +++ b/moq-transport/src/message/unannounce.rs @@ -1,15 +1,15 @@ -use crate::coding::{Decode, DecodeError, Encode, EncodeError}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, Tuple}; /// Sent by the publisher to terminate an Announce. #[derive(Clone, Debug)] pub struct Unannounce { // Echo back the namespace that was reset - pub namespace: String, + pub namespace: Tuple, } impl Decode for Unannounce { fn decode(r: &mut R) -> Result { - let namespace = String::decode(r)?; + let namespace = Tuple::decode(r)?; Ok(Self { namespace }) } diff --git a/moq-transport/src/message/unsubscribe_namespace.rs b/moq-transport/src/message/unsubscribe_namespace.rs new file mode 100644 index 0000000..2fe9d09 --- /dev/null +++ b/moq-transport/src/message/unsubscribe_namespace.rs @@ -0,0 +1,23 @@ +use crate::coding::{Decode, DecodeError, Encode, EncodeError, Tuple}; + +/// Unsubscribe Namespace +/// https://www.ietf.org/archive/id/draft-ietf-moq-transport-06.html#name-unsubscribe_namespace +#[derive(Clone, Debug)] +pub struct UnsubscribeNamespace { + // Echo back the namespace that was reset + pub namespace_prefix: Tuple, +} + +impl Decode for UnsubscribeNamespace { + fn decode(r: &mut R) -> Result { + let namespace_prefix = Tuple::decode(r)?; + Ok(Self { namespace_prefix }) + } +} + +impl Encode for UnsubscribeNamespace { + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.namespace_prefix.encode(w)?; + Ok(()) + } +} diff --git a/moq-transport/src/serve/track.rs b/moq-transport/src/serve/track.rs index 28afc55..fd3ed09 100644 --- a/moq-transport/src/serve/track.rs +++ b/moq-transport/src/serve/track.rs @@ -18,18 +18,19 @@ use super::{ Datagrams, DatagramsReader, DatagramsWriter, ObjectsWriter, ServeError, Stream, StreamReader, StreamWriter, Subgroups, SubgroupsReader, SubgroupsWriter, }; +use crate::coding::Tuple; use paste::paste; use std::{ops::Deref, sync::Arc}; /// Static information about a track. #[derive(Debug, Clone, PartialEq)] pub struct Track { - pub namespace: String, + pub namespace: Tuple, pub name: String, } impl Track { - pub fn new(namespace: String, name: String) -> Self { + pub fn new(namespace: Tuple, name: String) -> Self { Self { namespace, name } } diff --git a/moq-transport/src/serve/tracks.rs b/moq-transport/src/serve/tracks.rs index b2de227..f6120e7 100644 --- a/moq-transport/src/serve/tracks.rs +++ b/moq-transport/src/serve/tracks.rs @@ -13,16 +13,17 @@ use std::{collections::HashMap, ops::Deref, sync::Arc}; use super::{ServeError, Track, TrackReader, TrackWriter}; +use crate::coding::Tuple; use crate::watch::{Queue, State}; /// Static information about a broadcast. #[derive(Debug)] pub struct Tracks { - pub namespace: String, + pub namespace: Tuple, } impl Tracks { - pub fn new(namespace: String) -> Self { + pub fn new(namespace: Tuple) -> Self { Self { namespace } } diff --git a/moq-transport/src/session/announce.rs b/moq-transport/src/session/announce.rs index 706be52..478a266 100644 --- a/moq-transport/src/session/announce.rs +++ b/moq-transport/src/session/announce.rs @@ -1,5 +1,6 @@ use std::{collections::VecDeque, ops}; +use crate::coding::Tuple; use crate::watch::State; use crate::{message, serve::ServeError}; @@ -7,7 +8,7 @@ use super::{Publisher, Subscribed, TrackStatusRequested}; #[derive(Debug, Clone)] pub struct AnnounceInfo { - pub namespace: String, + pub namespace: Tuple, } struct AnnounceState { @@ -45,7 +46,7 @@ pub struct Announce { } impl Announce { - pub(super) fn new(mut publisher: Publisher, namespace: String) -> (Announce, AnnounceRecv) { + pub(super) fn new(mut publisher: Publisher, namespace: Tuple) -> (Announce, AnnounceRecv) { let info = AnnounceInfo { namespace: namespace.clone(), }; @@ -148,7 +149,7 @@ impl Drop for Announce { } self.publisher.send_message(message::Unannounce { - namespace: self.namespace.to_string(), + namespace: self.namespace.clone(), }); } } diff --git a/moq-transport/src/session/announced.rs b/moq-transport/src/session/announced.rs index 78b00a2..17f3b5d 100644 --- a/moq-transport/src/session/announced.rs +++ b/moq-transport/src/session/announced.rs @@ -1,5 +1,6 @@ use std::ops; +use crate::coding::Tuple; use crate::watch::State; use crate::{message, serve::ServeError}; @@ -21,7 +22,7 @@ pub struct Announced { } impl Announced { - pub(super) fn new(session: Subscriber, namespace: String) -> (Announced, AnnouncedRecv) { + pub(super) fn new(session: Subscriber, namespace: Tuple) -> (Announced, AnnouncedRecv) { let info = AnnounceInfo { namespace }; let (send, recv) = State::default().split(); diff --git a/moq-transport/src/session/publisher.rs b/moq-transport/src/session/publisher.rs index 20da553..ed58246 100644 --- a/moq-transport/src/session/publisher.rs +++ b/moq-transport/src/session/publisher.rs @@ -6,6 +6,7 @@ use std::{ use futures::{stream::FuturesUnordered, StreamExt}; use crate::{ + coding::Tuple, message::{self, Message}, serve::{ServeError, TracksReader}, setup, @@ -20,7 +21,7 @@ use super::{Announce, AnnounceRecv, Session, SessionError, Subscribed, Subscribe pub struct Publisher { webtransport: web_transport::Session, - announces: Arc>>, + announces: Arc>>, subscribed: Arc>>, unknown: Queue, @@ -162,6 +163,11 @@ impl Publisher { message::Subscriber::Unsubscribe(msg) => self.recv_unsubscribe(msg), message::Subscriber::SubscribeUpdate(msg) => self.recv_subscribe_update(msg), message::Subscriber::TrackStatusRequest(msg) => self.recv_track_status_request(msg), + // TODO: Implement namespace messages. + message::Subscriber::SubscribeNamespace(_msg) => unimplemented!(), + message::Subscriber::SubscribeNamespaceOk(_msg) => unimplemented!(), + message::Subscriber::SubscribeNamespaceError(_msg) => unimplemented!(), + message::Subscriber::UnsubscribeNamespace(_msg) => unimplemented!(), }; if let Err(err) = res { @@ -259,7 +265,7 @@ impl Publisher { match &msg { message::Publisher::SubscribeDone(msg) => self.drop_subscribe(msg.id), message::Publisher::SubscribeError(msg) => self.drop_subscribe(msg.id), - message::Publisher::Unannounce(msg) => self.drop_announce(msg.namespace.as_str()), + message::Publisher::Unannounce(msg) => self.drop_announce(&msg.namespace), _ => (), }; @@ -270,7 +276,7 @@ impl Publisher { self.subscribed.lock().unwrap().remove(&id); } - fn drop_announce(&mut self, namespace: &str) { + fn drop_announce(&mut self, namespace: &Tuple) { self.announces.lock().unwrap().remove(namespace); } diff --git a/moq-transport/src/session/subscribe.rs b/moq-transport/src/session/subscribe.rs index 1097a39..a5ac476 100644 --- a/moq-transport/src/session/subscribe.rs +++ b/moq-transport/src/session/subscribe.rs @@ -1,6 +1,7 @@ use std::ops; use crate::{ + coding::Tuple, data, message::{self, FilterType, GroupOrder, SubscribeLocation, SubscribePair}, serve::{self, ServeError, TrackWriter, TrackWriterMode}, @@ -12,7 +13,7 @@ use super::Subscriber; #[derive(Debug, Clone)] pub struct SubscribeInfo { - pub namespace: String, + pub namespace: Tuple, pub name: String, } diff --git a/moq-transport/src/session/subscriber.rs b/moq-transport/src/session/subscriber.rs index 76f93ab..8428187 100644 --- a/moq-transport/src/session/subscriber.rs +++ b/moq-transport/src/session/subscriber.rs @@ -5,7 +5,7 @@ use std::{ }; use crate::{ - coding::Decode, + coding::{Decode, Tuple}, data, message::{self, Message}, serve::{self, ServeError}, @@ -19,7 +19,7 @@ use super::{Announced, AnnouncedRecv, Reader, Session, SessionError, Subscribe, // TODO remove Clone. #[derive(Clone)] pub struct Subscriber { - announced: Arc>>, + announced: Arc>>, announced_queue: Queue, subscribes: Arc>>, @@ -102,7 +102,7 @@ impl Subscriber { hash_map::Entry::Vacant(entry) => entry, }; - let (announced, recv) = Announced::new(self.clone(), msg.namespace.to_string()); + let (announced, recv) = Announced::new(self.clone(), msg.namespace.clone()); if let Err(announced) = self.announced_queue.push(announced) { announced.close(ServeError::Cancel)?; return Ok(()); @@ -152,7 +152,7 @@ impl Subscriber { Ok(()) } - fn drop_announce(&mut self, namespace: &str) { + fn drop_announce(&mut self, namespace: &Tuple) { self.announced.lock().unwrap().remove(namespace); } diff --git a/moq-transport/src/session/track_status_requested.rs b/moq-transport/src/session/track_status_requested.rs index 75fb1ba..16decfe 100644 --- a/moq-transport/src/session/track_status_requested.rs +++ b/moq-transport/src/session/track_status_requested.rs @@ -1,9 +1,10 @@ use super::{Publisher, SessionError}; +use crate::coding::Tuple; use crate::message; #[derive(Debug, Clone)] pub struct TrackStatusRequestedInfo { - pub namespace: String, + pub namespace: Tuple, pub track: String, } diff --git a/moq-transport/src/setup/client.rs b/moq-transport/src/setup/client.rs index 9b414d0..58c1d5e 100644 --- a/moq-transport/src/setup/client.rs +++ b/moq-transport/src/setup/client.rs @@ -24,6 +24,10 @@ impl Decode for Client { return Err(DecodeError::InvalidMessage(typ)); } + let _len = u64::decode(r)?; + + // TODO: Check the length of the message. + let versions = Versions::decode(r)?; let mut params = Params::decode(r)?; @@ -43,12 +47,23 @@ impl Encode for Client { /// Encode a server setup message. fn encode(&self, w: &mut W) -> Result<(), EncodeError> { 0x40_u64.encode(w)?; - self.versions.encode(w)?; + + // Find out the length of the message + // by encoding it into a buffer and then encoding the length. + // This is a bit wasteful, but it's the only way to know the length. + let mut buf = Vec::new(); + + self.versions.encode(&mut buf).unwrap(); let mut params = self.params.clone(); params.set(0, self.role)?; + params.encode(&mut buf).unwrap(); + + (buf.len() as u64).encode(w)?; - params.encode(w)?; + // At least don't encode the message twice. + // Instead, write the buffer directly to the writer. + w.put_slice(&buf); Ok(()) } diff --git a/moq-transport/src/setup/server.rs b/moq-transport/src/setup/server.rs index 113bcdf..aaeb55d 100644 --- a/moq-transport/src/setup/server.rs +++ b/moq-transport/src/setup/server.rs @@ -25,6 +25,10 @@ impl Decode for Server { return Err(DecodeError::InvalidMessage(typ)); } + let _len = u64::decode(r)?; + + // TODO: Check the length of the message. + let version = Version::decode(r)?; let mut params = Params::decode(r)?; @@ -42,11 +46,23 @@ impl Decode for Server { impl Encode for Server { fn encode(&self, w: &mut W) -> Result<(), EncodeError> { 0x41_u64.encode(w)?; - self.version.encode(w)?; + + // Find out the length of the message + // by encoding it into a buffer and then encoding the length. + // This is a bit wasteful, but it's the only way to know the length. + let mut buf = Vec::new(); + + self.version.encode(&mut buf).unwrap(); let mut params = self.params.clone(); params.set(0, self.role)?; - params.encode(w)?; + params.encode(&mut buf).unwrap(); + + (buf.len() as u64).encode(w)?; + + // At least don't encode the message twice. + // Instead, write the buffer directly to the writer. + w.put_slice(&buf); Ok(()) } @@ -58,6 +74,7 @@ mod tests { use crate::setup::Role; use bytes::BytesMut; + // TODO: Update this test to use the draft 06 version #[test] fn encode_decode() { let mut buf = BytesMut::new();