From 09868aaf5ad3626cea00efde479b6f5bab501adc Mon Sep 17 00:00:00 2001 From: Mike English Date: Tue, 29 Oct 2024 16:36:56 -0400 Subject: [PATCH 01/20] moq-transport: Bump target draft version to 06 --- moq-transport/src/session/mod.rs | 8 ++++---- moq-transport/src/setup/version.rs | 3 +++ 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/moq-transport/src/session/mod.rs b/moq-transport/src/session/mod.rs index 9063099..0e6dbed 100644 --- a/moq-transport/src/session/mod.rs +++ b/moq-transport/src/session/mod.rs @@ -79,7 +79,7 @@ impl Session { let mut sender = Writer::new(control.0); let mut recver = Reader::new(control.1); - let versions: setup::Versions = [setup::Version::DRAFT_05].into(); + let versions: setup::Versions = [setup::Version::DRAFT_06].into(); let client = setup::Client { role, @@ -128,10 +128,10 @@ impl Session { let client: setup::Client = recver.decode().await?; log::debug!("received client SETUP: {:?}", client); - if !client.versions.contains(&setup::Version::DRAFT_05) { + if !client.versions.contains(&setup::Version::DRAFT_06) { return Err(SessionError::Version( client.versions, - [setup::Version::DRAFT_05].into(), + [setup::Version::DRAFT_06].into(), )); } @@ -152,7 +152,7 @@ impl Session { let server = setup::Server { role, - version: setup::Version::DRAFT_05, + version: setup::Version::DRAFT_06, params: Default::default(), }; diff --git a/moq-transport/src/setup/version.rs b/moq-transport/src/setup/version.rs index 5d40b2e..6a68387 100644 --- a/moq-transport/src/setup/version.rs +++ b/moq-transport/src/setup/version.rs @@ -24,6 +24,9 @@ impl Version { /// https://www.ietf.org/archive/id/draft-ietf-moq-transport-05.html pub const DRAFT_05: Version = Version(0xff000005); + + /// https://www.ietf.org/archive/id/draft-ietf-moq-transport-06.html + pub const DRAFT_06: Version = Version(0xff000006); } impl From for Version { From 804e3c7d37df5a56cac36c5412202bf5c7c2e12e Mon Sep 17 00:00:00 2001 From: Zafer Gurel Date: Fri, 25 Oct 2024 17:03:34 +0300 Subject: [PATCH 02/20] rename groups to subgroups --- moq-transport/src/data/mod.rs | 4 +- moq-transport/src/data/object.rs | 6 +- .../src/data/{group.rs => subgroup.rs} | 17 +- moq-transport/src/serve/mod.rs | 4 +- .../src/serve/{group.rs => subgroup.rs} | 175 +++++++++--------- 5 files changed, 110 insertions(+), 96 deletions(-) rename moq-transport/src/data/{group.rs => subgroup.rs} (84%) rename moq-transport/src/serve/{group.rs => subgroup.rs} (71%) diff --git a/moq-transport/src/data/mod.rs b/moq-transport/src/data/mod.rs index 2090093..911a8a5 100644 --- a/moq-transport/src/data/mod.rs +++ b/moq-transport/src/data/mod.rs @@ -1,11 +1,11 @@ mod datagram; -mod group; mod header; mod object; +mod subgroup; mod track; pub use datagram::*; -pub use group::*; pub use header::*; pub use object::*; +pub use subgroup::*; pub use track::*; diff --git a/moq-transport/src/data/object.rs b/moq-transport/src/data/object.rs index 6d72918..5bb4bae 100644 --- a/moq-transport/src/data/object.rs +++ b/moq-transport/src/data/object.rs @@ -4,9 +4,9 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError}; pub enum ObjectStatus { Object = 0x0, ObjectDoesNotExist = 0x1, - GroupDoesNotExist = 0x2, EndOfGroup = 0x3, EndOfTrack = 0x4, + EndOfSubgroup = 0x5, } impl Decode for ObjectStatus { @@ -14,9 +14,9 @@ impl Decode for ObjectStatus { match u64::decode(r)? { 0x0 => Ok(Self::Object), 0x1 => Ok(Self::ObjectDoesNotExist), - 0x2 => Ok(Self::GroupDoesNotExist), 0x3 => Ok(Self::EndOfGroup), 0x4 => Ok(Self::EndOfTrack), + 0x5 => Ok(Self::EndOfSubgroup), _ => Err(DecodeError::InvalidObjectStatus), } } @@ -27,9 +27,9 @@ impl Encode for ObjectStatus { match self { Self::Object => (0x0_u64).encode(w), Self::ObjectDoesNotExist => (0x1_u64).encode(w), - Self::GroupDoesNotExist => (0x2_u64).encode(w), Self::EndOfGroup => (0x3_u64).encode(w), Self::EndOfTrack => (0x4_u64).encode(w), + Self::EndOfSubgroup => (0x5_u64).encode(w), } } } diff --git a/moq-transport/src/data/group.rs b/moq-transport/src/data/subgroup.rs similarity index 84% rename from moq-transport/src/data/group.rs rename to moq-transport/src/data/subgroup.rs index 7cefbea..27ff46e 100644 --- a/moq-transport/src/data/group.rs +++ b/moq-transport/src/data/subgroup.rs @@ -2,7 +2,7 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError}; use crate::data::ObjectStatus; #[derive(Clone, Debug)] -pub struct GroupHeader { +pub struct SubgroupHeader { // The subscribe ID. pub subscribe_id: u64, @@ -12,26 +12,31 @@ pub struct GroupHeader { // The group sequence number pub group_id: u64, + // The subgroup sequence number + pub subgroup_id: u64, + // Publisher priority, where **smaller** values are sent first. pub publisher_priority: u8, } -impl Decode for GroupHeader { +impl Decode for SubgroupHeader { fn decode(r: &mut R) -> Result { Ok(Self { subscribe_id: u64::decode(r)?, track_alias: u64::decode(r)?, group_id: u64::decode(r)?, + subgroup_id: u64::decode(r)?, publisher_priority: u8::decode(r)?, }) } } -impl Encode for GroupHeader { +impl Encode for SubgroupHeader { fn encode(&self, w: &mut W) -> Result<(), EncodeError> { self.subscribe_id.encode(w)?; self.track_alias.encode(w)?; self.group_id.encode(w)?; + self.subgroup_id.encode(w)?; self.publisher_priority.encode(w)?; Ok(()) @@ -39,13 +44,13 @@ impl Encode for GroupHeader { } #[derive(Clone, Debug)] -pub struct GroupObject { +pub struct SubgroupObject { pub object_id: u64, pub size: usize, pub status: ObjectStatus, } -impl Decode for GroupObject { +impl Decode for SubgroupObject { fn decode(r: &mut R) -> Result { let object_id = u64::decode(r)?; let size = usize::decode(r)?; @@ -66,7 +71,7 @@ impl Decode for GroupObject { } } -impl Encode for GroupObject { +impl Encode for SubgroupObject { fn encode(&self, w: &mut W) -> Result<(), EncodeError> { self.object_id.encode(w)?; self.size.encode(w)?; diff --git a/moq-transport/src/serve/mod.rs b/moq-transport/src/serve/mod.rs index 0e7a0ec..be41c0c 100644 --- a/moq-transport/src/serve/mod.rs +++ b/moq-transport/src/serve/mod.rs @@ -1,15 +1,15 @@ mod datagram; mod error; -mod group; mod object; mod stream; +mod subgroup; mod track; mod tracks; pub use datagram::*; pub use error::*; -pub use group::*; pub use object::*; pub use stream::*; +pub use subgroup::*; pub use track::*; pub use tracks::*; diff --git a/moq-transport/src/serve/group.rs b/moq-transport/src/serve/subgroup.rs similarity index 71% rename from moq-transport/src/serve/group.rs rename to moq-transport/src/serve/subgroup.rs index 0d18572..06613a5 100644 --- a/moq-transport/src/serve/group.rs +++ b/moq-transport/src/serve/subgroup.rs @@ -15,22 +15,22 @@ use crate::watch::State; use super::{ServeError, Track}; -pub struct Groups { +pub struct Subgroups { pub track: Arc, } -impl Groups { - pub fn produce(self) -> (GroupsWriter, GroupsReader) { +impl Subgroups { + pub fn produce(self) -> (SubgroupsWriter, SubgroupsReader) { let (writer, reader) = State::default().split(); - let writer = GroupsWriter::new(writer, self.track.clone()); - let reader = GroupsReader::new(reader, self.track); + let writer = SubgroupsWriter::new(writer, self.track.clone()); + let reader = SubgroupsReader::new(reader, self.track); (writer, reader) } } -impl Deref for Groups { +impl Deref for Subgroups { type Target = Track; fn deref(&self) -> &Self::Target { @@ -39,13 +39,13 @@ impl Deref for Groups { } // State shared between the writer and reader. -struct GroupsState { - latest: Option, +struct SubgroupsState { + latest: Option, epoch: u64, // Updated each time latest changes closed: Result<(), ServeError>, } -impl Default for GroupsState { +impl Default for SubgroupsState { fn default() -> Self { Self { latest: None, @@ -55,14 +55,14 @@ impl Default for GroupsState { } } -pub struct GroupsWriter { +pub struct SubgroupsWriter { pub info: Arc, - state: State, + state: State, next: u64, // Not in the state to avoid a lock } -impl GroupsWriter { - fn new(state: State, track: Arc) -> Self { +impl SubgroupsWriter { + fn new(state: State, track: Arc) -> Self { Self { info: track, state, @@ -71,20 +71,21 @@ impl GroupsWriter { } // Helper to increment the group by one. - pub fn append(&mut self, priority: u8) -> Result { - self.create(Group { - group_id: self.next, + pub fn append(&mut self, priority: u8) -> Result { + self.create(Subgroup { + subgroup_id: self.next, priority, }) } - pub fn create(&mut self, group: Group) -> Result { - let group = GroupInfo { + pub fn create(&mut self, subgroup: Subgroup) -> Result { + let subgroup = SubgroupInfo { track: self.info.clone(), - group_id: group.group_id, - priority: group.priority, + group_id: subgroup.group_id, + subgroup_id: subgroup.subgroup_id, + priority: subgroup.priority, }; - let (writer, reader) = group.produce(); + let (writer, reader) = subgroup.produce(); let mut state = self.state.lock_mut().ok_or(ServeError::Cancel)?; @@ -97,8 +98,8 @@ impl GroupsWriter { } else { state.latest = Some(reader); } - - self.next = state.latest.as_ref().unwrap().group_id + 1; + // TODO: group_id should be incremented somewhere + self.next = state.latest.as_ref().unwrap().subgroup_id + 1; state.epoch += 1; Ok(writer) @@ -116,7 +117,7 @@ impl GroupsWriter { } } -impl Deref for GroupsWriter { +impl Deref for SubgroupsWriter { type Target = Track; fn deref(&self) -> &Self::Target { @@ -125,14 +126,14 @@ impl Deref for GroupsWriter { } #[derive(Clone)] -pub struct GroupsReader { +pub struct SubgroupsReader { pub info: Arc, - state: State, + state: State, epoch: u64, } -impl GroupsReader { - fn new(state: State, track: Arc) -> Self { +impl SubgroupsReader { + fn new(state: State, track: Arc) -> Self { Self { info: track, state, @@ -140,7 +141,7 @@ impl GroupsReader { } } - pub async fn next(&mut self) -> Result, ServeError> { + pub async fn next(&mut self) -> Result, ServeError> { loop { { let state = self.state.lock(); @@ -167,7 +168,7 @@ impl GroupsReader { } } -impl Deref for GroupsReader { +impl Deref for SubgroupsReader { type Target = Track; fn deref(&self) -> &Self::Target { @@ -177,41 +178,49 @@ impl Deref for GroupsReader { /// Parameters that can be specified by the user #[derive(Debug, Clone, PartialEq)] -pub struct Group { +pub struct Subgroup { // The sequence number of the group within the track. // NOTE: These may be received out of order or with gaps. pub group_id: u64, + // The sequence number of the subgroup within the group. + // NOTE: These may be received out of order or with gaps. + pub subgroup_id: u64, + // The priority of the group within the track. pub priority: u8, } /// Static information about the group #[derive(Debug, Clone, PartialEq)] -pub struct GroupInfo { +pub struct SubgroupInfo { pub track: Arc, // The sequence number of the group within the track. // NOTE: These may be received out of order or with gaps. pub group_id: u64, + // The sequence number of the subgroup within the group. + // NOTE: These may be received out of order or with gaps. + pub subgroup_id: u64, + // The priority of the group within the track. pub priority: u8, } -impl GroupInfo { - pub fn produce(self) -> (GroupWriter, GroupReader) { +impl SubgroupInfo { + pub fn produce(self) -> (SubgroupWriter, SubgroupReader) { let (writer, reader) = State::default().split(); let info = Arc::new(self); - let writer = GroupWriter::new(writer, info.clone()); - let reader = GroupReader::new(reader, info); + let writer = SubgroupWriter::new(writer, info.clone()); + let reader = SubgroupReader::new(reader, info); (writer, reader) } } -impl Deref for GroupInfo { +impl Deref for SubgroupInfo { type Target = Track; fn deref(&self) -> &Self::Target { @@ -219,15 +228,15 @@ impl Deref for GroupInfo { } } -struct GroupState { +struct SubgroupState { // The data that has been received thus far. - objects: Vec, + objects: Vec, // Set when the writer or all readers are dropped. closed: Result<(), ServeError>, } -impl Default for GroupState { +impl Default for SubgroupState { fn default() -> Self { Self { objects: Vec::new(), @@ -237,19 +246,19 @@ impl Default for GroupState { } /// Used to write data to a stream and notify readers. -pub struct GroupWriter { +pub struct SubgroupWriter { // Mutable stream state. - state: State, + state: State, // Immutable stream state. - pub info: Arc, + pub info: Arc, // The next object sequence number to use. next: u64, } -impl GroupWriter { - fn new(state: State, group: Arc) -> Self { +impl SubgroupWriter { + fn new(state: State, group: Arc) -> Self { Self { state, info: group, @@ -267,8 +276,8 @@ impl GroupWriter { /// Write an object over multiple writes. /// /// BAD STUFF will happen if the size is wrong; this is an advanced feature. - pub fn create(&mut self, size: usize) -> Result { - let (writer, reader) = GroupObject { + pub fn create(&mut self, size: usize) -> Result { + let (writer, reader) = SubgroupObject { group: self.info.clone(), object_id: self.next, status: ObjectStatus::Object, @@ -303,8 +312,8 @@ impl GroupWriter { } } -impl Deref for GroupWriter { - type Target = GroupInfo; +impl Deref for SubgroupWriter { + type Target = SubgroupInfo; fn deref(&self) -> &Self::Target { &self.info @@ -313,23 +322,23 @@ impl Deref for GroupWriter { /// Notified when a stream has new data available. #[derive(Clone)] -pub struct GroupReader { +pub struct SubgroupReader { // Modify the stream state. - state: State, + state: State, // Immutable stream state. - pub info: Arc, + pub info: Arc, // The number of chunks that we've read. // NOTE: Cloned readers inherit this index, but then run in parallel. index: usize, } -impl GroupReader { - fn new(state: State, group: Arc) -> Self { +impl SubgroupReader { + fn new(state: State, subgroup: Arc) -> Self { Self { state, - info: group, + info: subgroup, index: 0, } } @@ -347,7 +356,7 @@ impl GroupReader { } } - pub async fn next(&mut self) -> Result, ServeError> { + pub async fn next(&mut self) -> Result, ServeError> { loop { { let state = self.state.lock(); @@ -381,8 +390,8 @@ impl GroupReader { } } -impl Deref for GroupReader { - type Target = GroupInfo; +impl Deref for SubgroupReader { + type Target = SubgroupInfo; fn deref(&self) -> &Self::Target { &self.info @@ -391,8 +400,8 @@ impl Deref for GroupReader { /// A subset of Object, since we use the group's info. #[derive(Clone, PartialEq, Debug)] -pub struct GroupObject { - pub group: Arc, +pub struct SubgroupObject { + pub group: Arc, pub object_id: u64, @@ -403,27 +412,27 @@ pub struct GroupObject { pub status: ObjectStatus, } -impl GroupObject { - pub fn produce(self) -> (GroupObjectWriter, GroupObjectReader) { +impl SubgroupObject { + pub fn produce(self) -> (SubgroupObjectWriter, SubgroupObjectReader) { let (writer, reader) = State::default().split(); let info = Arc::new(self); - let writer = GroupObjectWriter::new(writer, info.clone()); - let reader = GroupObjectReader::new(reader, info); + let writer = SubgroupObjectWriter::new(writer, info.clone()); + let reader = SubgroupObjectReader::new(reader, info); (writer, reader) } } -impl Deref for GroupObject { - type Target = GroupInfo; +impl Deref for SubgroupObject { + type Target = SubgroupInfo; fn deref(&self) -> &Self::Target { &self.group } } -struct GroupObjectState { +struct SubgroupObjectState { // The data that has been received thus far. chunks: Vec, @@ -431,7 +440,7 @@ struct GroupObjectState { closed: Result<(), ServeError>, } -impl Default for GroupObjectState { +impl Default for SubgroupObjectState { fn default() -> Self { Self { chunks: Vec::new(), @@ -441,20 +450,20 @@ impl Default for GroupObjectState { } /// Used to write data to a segment and notify readers. -pub struct GroupObjectWriter { +pub struct SubgroupObjectWriter { // Mutable segment state. - state: State, + state: State, // Immutable segment state. - pub info: Arc, + pub info: Arc, // The amount of promised data that has yet to be written. remain: usize, } -impl GroupObjectWriter { +impl SubgroupObjectWriter { /// Create a new segment with the given info. - fn new(state: State, object: Arc) -> Self { + fn new(state: State, object: Arc) -> Self { Self { state, remain: object.size, @@ -491,7 +500,7 @@ impl GroupObjectWriter { } } -impl Drop for GroupObjectWriter { +impl Drop for SubgroupObjectWriter { fn drop(&mut self) { if self.remain == 0 { return; @@ -503,8 +512,8 @@ impl Drop for GroupObjectWriter { } } -impl Deref for GroupObjectWriter { - type Target = GroupObject; +impl Deref for SubgroupObjectWriter { + type Target = SubgroupObject; fn deref(&self) -> &Self::Target { &self.info @@ -513,20 +522,20 @@ impl Deref for GroupObjectWriter { /// Notified when a segment has new data available. #[derive(Clone)] -pub struct GroupObjectReader { +pub struct SubgroupObjectReader { // Modify the segment state. - state: State, + state: State, // Immutable segment state. - pub info: Arc, + pub info: Arc, // The number of chunks that we've read. // NOTE: Cloned readers inherit this index, but then run in parallel. index: usize, } -impl GroupObjectReader { - fn new(state: State, object: Arc) -> Self { +impl SubgroupObjectReader { + fn new(state: State, object: Arc) -> Self { Self { state, info: object, @@ -566,8 +575,8 @@ impl GroupObjectReader { } } -impl Deref for GroupObjectReader { - type Target = GroupObject; +impl Deref for SubgroupObjectReader { + type Target = SubgroupObject; fn deref(&self) -> &Self::Target { &self.info From 4b873596e5ca7b2aca6b8379d976a5819441faa0 Mon Sep 17 00:00:00 2001 From: Zafer Gurel Date: Fri, 25 Oct 2024 18:00:33 +0300 Subject: [PATCH 03/20] fixes to moq-transport, relay compiles --- moq-transport/src/data/header.rs | 5 ++- moq-transport/src/serve/subgroup.rs | 8 +++- moq-transport/src/serve/track.rs | 12 +++--- moq-transport/src/session/subscribe.rs | 11 ++--- moq-transport/src/session/subscribed.rs | 55 +++++++++++++------------ moq-transport/src/session/subscriber.rs | 10 ++--- 6 files changed, 54 insertions(+), 47 deletions(-) diff --git a/moq-transport/src/data/header.rs b/moq-transport/src/data/header.rs index b416489..67e14d4 100644 --- a/moq-transport/src/data/header.rs +++ b/moq-transport/src/data/header.rs @@ -2,7 +2,7 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError}; use paste::paste; use std::fmt; -use super::{GroupHeader, ObjectHeader, TrackHeader}; +use super::{ObjectHeader, SubgroupHeader, TrackHeader}; // Use a macro to generate the message types rather than copy-paste. // This implements a decode/encode method that uses the specified type. @@ -84,10 +84,11 @@ macro_rules! header_types { } } +// TODO: These types are removed in draft-06. Remove them if they are not needed. // Each object type is prefixed with the given VarInt type. header_types! { Object = 0x0, //Datagram = 0x1, - Group = 0x51, + Subgroup = 0x51, Track = 0x50, } diff --git a/moq-transport/src/serve/subgroup.rs b/moq-transport/src/serve/subgroup.rs index 06613a5..a3fca28 100644 --- a/moq-transport/src/serve/subgroup.rs +++ b/moq-transport/src/serve/subgroup.rs @@ -58,7 +58,8 @@ impl Default for SubgroupsState { pub struct SubgroupsWriter { pub info: Arc, state: State, - next: u64, // Not in the state to avoid a lock + next: u64, // Not in the state to avoid a lock + last_group_id: u64, // Not in the state to avoid a lock } impl SubgroupsWriter { @@ -67,12 +68,14 @@ impl SubgroupsWriter { info: track, state, next: 0, + last_group_id: 0, } } // Helper to increment the group by one. pub fn append(&mut self, priority: u8) -> Result { self.create(Subgroup { + group_id: self.last_group_id, subgroup_id: self.next, priority, }) @@ -98,8 +101,9 @@ impl SubgroupsWriter { } else { state.latest = Some(reader); } - // TODO: group_id should be incremented somewhere + self.next = state.latest.as_ref().unwrap().subgroup_id + 1; + self.last_group_id = state.latest.as_ref().unwrap().group_id; state.epoch += 1; Ok(writer) diff --git a/moq-transport/src/serve/track.rs b/moq-transport/src/serve/track.rs index 4d52982..82e798e 100644 --- a/moq-transport/src/serve/track.rs +++ b/moq-transport/src/serve/track.rs @@ -15,8 +15,8 @@ use crate::watch::State; use super::{ - Datagrams, DatagramsReader, DatagramsWriter, Groups, GroupsReader, GroupsWriter, Objects, ObjectsReader, - ObjectsWriter, ServeError, Stream, StreamReader, StreamWriter, + Datagrams, DatagramsReader, DatagramsWriter, Objects, ObjectsReader, ObjectsWriter, ServeError, Stream, + StreamReader, StreamWriter, Subgroups, SubgroupsReader, SubgroupsWriter, }; use paste::paste; use std::{ops::Deref, sync::Arc}; @@ -82,8 +82,8 @@ impl TrackWriter { Ok(writer) } - pub fn groups(self) -> Result { - let (writer, reader) = Groups { + pub fn groups(self) -> Result { + let (writer, reader) = Subgroups { track: self.info.clone(), } .produce(); @@ -220,7 +220,7 @@ macro_rules! track_readers { } } -track_readers!(Stream, Groups, Objects, Datagrams,); +track_readers!(Stream, Subgroups, Objects, Datagrams,); macro_rules! track_writers { {$($name:ident,)*} => { @@ -246,4 +246,4 @@ macro_rules! track_writers { } } -track_writers!(Track, Stream, Groups, Objects, Datagrams,); +track_writers!(Track, Stream, Subgroups, Objects, Datagrams,); diff --git a/moq-transport/src/session/subscribe.rs b/moq-transport/src/session/subscribe.rs index 22aaa46..94f05ae 100644 --- a/moq-transport/src/session/subscribe.rs +++ b/moq-transport/src/session/subscribe.rs @@ -161,21 +161,22 @@ impl SubscribeRecv { Ok(stream) } - pub fn group(&mut self, header: data::GroupHeader) -> Result { + pub fn subgroup(&mut self, header: data::SubgroupHeader) -> Result { let writer = self.writer.take().ok_or(ServeError::Done)?; - let mut groups = match writer { + let mut subgroups = match writer { TrackWriterMode::Track(init) => init.groups()?, - TrackWriterMode::Groups(groups) => groups, + TrackWriterMode::Subgroups(subgroups) => subgroups, _ => return Err(ServeError::Mode), }; - let writer = groups.create(serve::Group { + let writer = subgroups.create(serve::Subgroup { group_id: header.group_id, + subgroup_id: header.subgroup_id, priority: header.publisher_priority, })?; - self.writer = Some(groups.into()); + self.writer = Some(subgroups.into()); Ok(writer) } diff --git a/moq-transport/src/session/subscribed.rs b/moq-transport/src/session/subscribed.rs index b346b16..2ebf029 100644 --- a/moq-transport/src/session/subscribed.rs +++ b/moq-transport/src/session/subscribed.rs @@ -12,15 +12,15 @@ use super::{Publisher, SessionError, SubscribeInfo, Writer}; #[derive(Debug)] struct SubscribedState { - max: Option<(u64, u64)>, + max_group_id: Option<(u64, u64)>, closed: Result<(), ServeError>, } impl SubscribedState { - fn update_max(&mut self, group_id: u64, object_id: u64) -> Result<(), ServeError> { - if let Some((max_group, max_object)) = self.max { + fn update_max_group_id(&mut self, group_id: u64, object_id: u64) -> Result<(), ServeError> { + if let Some((max_group, max_object)) = self.max_group_id { if group_id >= max_group && object_id >= max_object { - self.max = Some((group_id, object_id)); + self.max_group_id = Some((group_id, object_id)); } } @@ -31,7 +31,7 @@ impl SubscribedState { impl Default for SubscribedState { fn default() -> Self { Self { - max: None, + max_group_id: None, closed: Ok(()), } } @@ -79,7 +79,7 @@ impl Subscribed { async fn serve_inner(&mut self, track: serve::TrackReader) -> Result<(), SessionError> { let latest = track.latest(); - self.state.lock_mut().ok_or(ServeError::Cancel)?.max = latest; + self.state.lock_mut().ok_or(ServeError::Cancel)?.max_group_id = latest; self.publisher.send_message(message::SubscribeOk { id: self.msg.id, @@ -93,7 +93,7 @@ impl Subscribed { match track.mode().await? { // TODO cancel track/datagrams on closed TrackReaderMode::Stream(stream) => self.serve_track(stream).await, - TrackReaderMode::Groups(groups) => self.serve_groups(groups).await, + TrackReaderMode::Subgroups(subgroups) => self.serve_subgroups(subgroups).await, TrackReaderMode::Objects(objects) => self.serve_objects(objects).await, TrackReaderMode::Datagrams(datagrams) => self.serve_datagrams(datagrams).await, } @@ -137,13 +137,13 @@ impl Drop for Subscribed { fn drop(&mut self) { let state = self.state.lock(); let err = state.closed.as_ref().err().cloned().unwrap_or(ServeError::Done); - let max = state.max; + let max_group_id = state.max_group_id; drop(state); // Important to avoid a deadlock if self.ok { self.publisher.send_message(message::SubscribeDone { id: self.msg.id, - last: max, + last: max_group_id, code: err.code(), reason: err.to_string(), }); @@ -190,7 +190,7 @@ impl Subscribed { self.state .lock_mut() .ok_or(ServeError::Done)? - .update_max(object.group_id, object.object_id)?; + .update_max_group_id(object.group_id, object.object_id)?; writer.encode(&header).await?; @@ -208,27 +208,28 @@ impl Subscribed { Ok(()) } - async fn serve_groups(&mut self, mut groups: serve::GroupsReader) -> Result<(), SessionError> { + async fn serve_subgroups(&mut self, mut subgroups: serve::SubgroupsReader) -> Result<(), SessionError> { let mut tasks = FuturesUnordered::new(); let mut done: Option> = None; loop { tokio::select! { - res = groups.next(), if done.is_none() => match res { - Ok(Some(group)) => { - let header = data::GroupHeader { + res = subgroups.next(), if done.is_none() => match res { + Ok(Some(subgroup)) => { + let header = data::SubgroupHeader { subscribe_id: self.msg.id, track_alias: self.msg.track_alias, - group_id: group.group_id, - publisher_priority: group.priority, + group_id: subgroup.group_id, + subgroup_id: subgroup.subgroup_id, + publisher_priority: subgroup.priority, }; let publisher = self.publisher.clone(); let state = self.state.clone(); - let info = group.info.clone(); + let info = subgroup.info.clone(); tasks.push(async move { - if let Err(err) = Self::serve_group(header, group, publisher, state).await { + if let Err(err) = Self::serve_subgroup(header, subgroup, publisher, state).await { log::warn!("failed to serve group: {:?}, error: {}", info, err); } }); @@ -243,16 +244,16 @@ impl Subscribed { } } - async fn serve_group( - header: data::GroupHeader, - mut group: serve::GroupReader, + async fn serve_subgroup( + header: data::SubgroupHeader, + mut subgroup: serve::SubgroupReader, mut publisher: Publisher, state: State, ) -> Result<(), SessionError> { let mut stream = publisher.open_uni().await?; // TODO figure out u32 vs u64 priority - stream.set_priority(group.priority as i32); + stream.set_priority(subgroup.priority as i32); let mut writer = Writer::new(stream); @@ -261,8 +262,8 @@ impl Subscribed { log::trace!("sent group: {:?}", header); - while let Some(mut object) = group.next().await? { - let header = data::GroupObject { + while let Some(mut object) = subgroup.next().await? { + let header = data::SubgroupObject { object_id: object.object_id, size: object.size, status: object.status, @@ -273,7 +274,7 @@ impl Subscribed { state .lock_mut() .ok_or(ServeError::Done)? - .update_max(group.group_id, object.object_id)?; + .update_max_group_id(subgroup.group_id, object.object_id)?; log::trace!("sent group object: {:?}", header); @@ -335,7 +336,7 @@ impl Subscribed { state .lock_mut() .ok_or(ServeError::Done)? - .update_max(object.group_id, object.object_id)?; + .update_max_group_id(object.group_id, object.object_id)?; let mut stream = publisher.open_uni().await?; @@ -380,7 +381,7 @@ impl Subscribed { self.state .lock_mut() .ok_or(ServeError::Done)? - .update_max(datagram.group_id, datagram.object_id)?; + .update_max_group_id(datagram.group_id, datagram.object_id)?; } Ok(()) diff --git a/moq-transport/src/session/subscriber.rs b/moq-transport/src/session/subscriber.rs index 2833e8a..60f9378 100644 --- a/moq-transport/src/session/subscriber.rs +++ b/moq-transport/src/session/subscriber.rs @@ -180,7 +180,7 @@ impl Subscriber { // This is super silly, but I couldn't figure out a way to avoid the mutex guard across awaits. enum Writer { Track(serve::StreamWriter), - Group(serve::GroupWriter), + Subgroup(serve::SubgroupWriter), Object(serve::ObjectWriter), } @@ -190,14 +190,14 @@ impl Subscriber { match header { data::Header::Track(track) => Writer::Track(subscribe.track(track)?), - data::Header::Group(group) => Writer::Group(subscribe.group(group)?), + data::Header::Subgroup(subgroup) => Writer::Subgroup(subscribe.subgroup(subgroup)?), data::Header::Object(object) => Writer::Object(subscribe.object(object)?), } }; match writer { Writer::Track(track) => Self::recv_track(track, reader).await?, - Writer::Group(group) => Self::recv_group(group, reader).await?, + Writer::Subgroup(group) => Self::recv_subgroup(group, reader).await?, Writer::Object(object) => Self::recv_object(object, reader).await?, }; @@ -234,11 +234,11 @@ impl Subscriber { Ok(()) } - async fn recv_group(mut group: serve::GroupWriter, mut reader: Reader) -> Result<(), SessionError> { + async fn recv_subgroup(mut group: serve::SubgroupWriter, mut reader: Reader) -> Result<(), SessionError> { log::trace!("received group: {:?}", group.info); while !reader.done().await? { - let object: data::GroupObject = reader.decode().await?; + let object: data::SubgroupObject = reader.decode().await?; log::trace!("received group object: {:?}", object); let mut remain = object.size; From 45903f99bcce22d95caab6dc7581e0147e261064 Mon Sep 17 00:00:00 2001 From: Zafer Gurel Date: Fri, 25 Oct 2024 18:00:59 +0300 Subject: [PATCH 04/20] moq-pub uses subgroups --- moq-pub/src/media.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/moq-pub/src/media.rs b/moq-pub/src/media.rs index 0c401fd..7310b4c 100644 --- a/moq-pub/src/media.rs +++ b/moq-pub/src/media.rs @@ -1,6 +1,6 @@ use anyhow::{self, Context}; use bytes::{Buf, Bytes}; -use moq_transport::serve::{GroupWriter, GroupsWriter, TrackWriter, TracksWriter}; +use moq_transport::serve::{SubgroupWriter, SubgroupsWriter, TrackWriter, TracksWriter}; use mp4::{self, ReadBox, TrackType}; use std::cmp::max; use std::collections::HashMap; @@ -15,8 +15,8 @@ pub struct Media { broadcast: TracksWriter, // The init and catalog tracks - init: GroupsWriter, - catalog: GroupsWriter, + init: SubgroupsWriter, + catalog: SubgroupsWriter, // The ftyp and moov atoms at the start of the file. ftyp: Option, @@ -297,10 +297,10 @@ fn next_atom(buf: &mut B) -> anyhow::Result> { struct Track { // The track we're producing - track: GroupsWriter, + track: SubgroupsWriter, // The current segment - current: Option, + current: Option, // The number of units per second. timescale: u64, From 73ba1bbe0dbb7748e0a1966087adbd31d90e5e8c Mon Sep 17 00:00:00 2001 From: Zafer Gurel Date: Mon, 28 Oct 2024 19:16:15 +0300 Subject: [PATCH 05/20] more fixes --- moq-pub/src/main.rs | 5 ++-- moq-pub/src/media.rs | 5 ++++ moq-transport/src/serve/subgroup.rs | 41 ++++++++++++++++++++++++----- 3 files changed, 43 insertions(+), 8 deletions(-) diff --git a/moq-pub/src/main.rs b/moq-pub/src/main.rs index b3e6613..0f910cd 100644 --- a/moq-pub/src/main.rs +++ b/moq-pub/src/main.rs @@ -70,7 +70,9 @@ async fn main() -> anyhow::Result<()> { tokio::select! { res = session.run() => res.context("session error")?, - res = run_media(media) => res.context("media error")?, + res = run_media(media) => { + res.context("media error")? + }, res = publisher.announce(reader) => res.context("publisher error")?, } @@ -80,7 +82,6 @@ async fn main() -> anyhow::Result<()> { async fn run_media(mut media: Media) -> anyhow::Result<()> { let mut input = tokio::io::stdin(); let mut buf = BytesMut::new(); - loop { input.read_buf(&mut buf).await.context("failed to read from stdin")?; media.parse(&mut buf).context("failed to parse media")?; diff --git a/moq-pub/src/media.rs b/moq-pub/src/media.rs index 7310b4c..5992e60 100644 --- a/moq-pub/src/media.rs +++ b/moq-pub/src/media.rs @@ -341,6 +341,11 @@ impl Track { // Create a new segment. let mut segment = self.track.append(priority)?; + println!( + "timestamp: {:?} segment: {:?}:{:?} priority: {:?}", + fragment.timestamp, segment.info.group_id, segment.info.subgroup_id, priority + ); + // Write the fragment in it's own object. segment.write(raw)?; diff --git a/moq-transport/src/serve/subgroup.rs b/moq-transport/src/serve/subgroup.rs index a3fca28..a51a9af 100644 --- a/moq-transport/src/serve/subgroup.rs +++ b/moq-transport/src/serve/subgroup.rs @@ -59,6 +59,7 @@ pub struct SubgroupsWriter { pub info: Arc, state: State, next: u64, // Not in the state to avoid a lock + next_group_id: u64, // Not in the state to avoid a lock last_group_id: u64, // Not in the state to avoid a lock } @@ -68,15 +69,35 @@ impl SubgroupsWriter { info: track, state, next: 0, + next_group_id: 0, last_group_id: 0, } } // Helper to increment the group by one. pub fn append(&mut self, priority: u8) -> Result { + let group_id; + let subgroup_id; + + // TODO: refactor here... For now, every subgroup is mapped to a new group... + let start_new_group = true; + + if start_new_group { + group_id = self.next_group_id; + subgroup_id = 0; + } else { + group_id = self.last_group_id; + subgroup_id = self.next; + } + + println!( + "SubgroupsWriter::append group_id: {}, subgroup_id: {}", + group_id, subgroup_id + ); + self.create(Subgroup { - group_id: self.last_group_id, - subgroup_id: self.next, + group_id, + subgroup_id, priority, }) } @@ -93,16 +114,24 @@ impl SubgroupsWriter { let mut state = self.state.lock_mut().ok_or(ServeError::Cancel)?; if let Some(latest) = &state.latest { - match writer.group_id.cmp(&latest.group_id) { - cmp::Ordering::Less => return Ok(writer), // dropped immediately, lul - cmp::Ordering::Equal => return Err(ServeError::Duplicate), - cmp::Ordering::Greater => state.latest = Some(reader), + // TODO: Check this logic again + if writer.group_id.cmp(&latest.group_id) == cmp::Ordering::Equal { + match writer.subgroup_id.cmp(&latest.subgroup_id) { + cmp::Ordering::Less => return Ok(writer), // dropped immediately, lul + cmp::Ordering::Equal => return Err(ServeError::Duplicate), + cmp::Ordering::Greater => state.latest = Some(reader), + } + } else if writer.group_id.cmp(&latest.group_id) == cmp::Ordering::Greater { + state.latest = Some(reader); + } else { + return Ok(writer); // drop here as well } } else { state.latest = Some(reader); } self.next = state.latest.as_ref().unwrap().subgroup_id + 1; + self.next_group_id = state.latest.as_ref().unwrap().group_id + 1; self.last_group_id = state.latest.as_ref().unwrap().group_id; state.epoch += 1; From 2a264cfbac918f7caba4ff414bc694cf33c209d6 Mon Sep 17 00:00:00 2001 From: Zafer Gurel Date: Mon, 28 Oct 2024 19:23:24 +0300 Subject: [PATCH 06/20] remove comment --- moq-transport/src/serve/subgroup.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/moq-transport/src/serve/subgroup.rs b/moq-transport/src/serve/subgroup.rs index a51a9af..9b0b3e9 100644 --- a/moq-transport/src/serve/subgroup.rs +++ b/moq-transport/src/serve/subgroup.rs @@ -90,11 +90,6 @@ impl SubgroupsWriter { subgroup_id = self.next; } - println!( - "SubgroupsWriter::append group_id: {}, subgroup_id: {}", - group_id, subgroup_id - ); - self.create(Subgroup { group_id, subgroup_id, From cd07d6cf6bc1943bea55280fabb1de565312a9be Mon Sep 17 00:00:00 2001 From: Mike English Date: Tue, 29 Oct 2024 17:06:38 -0400 Subject: [PATCH 07/20] moq-sub: s/group/subgroup/g This may not be what we really want yet, but it seems to match what was done on the moq-pub side and it builds, so that's a start... --- moq-sub/src/media.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/moq-sub/src/media.rs b/moq-sub/src/media.rs index 2bcf802..c47364b 100644 --- a/moq-sub/src/media.rs +++ b/moq-sub/src/media.rs @@ -3,7 +3,7 @@ use std::{io::Cursor, sync::Arc}; use anyhow::Context; use log::{debug, info, trace, warn}; use moq_transport::serve::{ - GroupObjectReader, GroupReader, TrackReader, TrackReaderMode, Tracks, TracksReader, TracksWriter, + SubgroupObjectReader, SubgroupReader, TrackReader, TrackReaderMode, Tracks, TracksReader, TracksWriter, }; use moq_transport::session::Subscriber; use mp4::ReadBox; @@ -49,7 +49,7 @@ impl Media { let track = self.broadcast.subscribe(init_track_name).context("no init track")?; let mut group = match track.mode().await? { - TrackReaderMode::Groups(mut groups) => groups.next().await?.context("no init group")?, + TrackReaderMode::Subgroups(mut groups) => groups.next().await?.context("no init group")?, _ => anyhow::bail!("expected init segment"), }; @@ -119,7 +119,7 @@ impl Media { async fn recv_track(track: TrackReader, out: Arc>) -> anyhow::Result<()> { let name = track.name.clone(); debug!("track {name}: start"); - if let TrackReaderMode::Groups(mut groups) = track.mode().await? { + if let TrackReaderMode::Subgroups(mut groups) = track.mode().await? { while let Some(group) = groups.next().await? { let out = out.clone(); if let Err(err) = Self::recv_group(group, out).await { @@ -131,7 +131,7 @@ impl Media { Ok(()) } - async fn recv_group(mut group: GroupReader, out: Arc>) -> anyhow::Result<()> { + async fn recv_group(mut group: SubgroupReader, out: Arc>) -> anyhow::Result<()> { trace!("group={} start", group.group_id); while let Some(object) = group.next().await? { trace!("group={} fragment={} start", group.group_id, object.object_id); @@ -144,7 +144,7 @@ impl Media { Ok(()) } - async fn recv_object(mut object: GroupObjectReader) -> anyhow::Result> { + async fn recv_object(mut object: SubgroupObjectReader) -> anyhow::Result> { let mut buf = Vec::with_capacity(object.size); while let Some(chunk) = object.read().await? { buf.extend_from_slice(&chunk); From b397458949d7f82a5003124196fa47d5cde4dae7 Mon Sep 17 00:00:00 2001 From: Mike English Date: Tue, 29 Oct 2024 17:06:38 -0400 Subject: [PATCH 08/20] moq-dir: s/group/subgroup/g This may not be what we really want yet, but it seems to match what was done on the moq-pub side and it builds, so that's a start... --- moq-dir/src/listing.rs | 86 +++++++++++++++++++++--------------------- 1 file changed, 43 insertions(+), 43 deletions(-) diff --git a/moq-dir/src/listing.rs b/moq-dir/src/listing.rs index 6c5e031..56c9931 100644 --- a/moq-dir/src/listing.rs +++ b/moq-dir/src/listing.rs @@ -3,13 +3,13 @@ use bytes::BytesMut; use std::collections::{HashSet, VecDeque}; use moq_transport::serve::{ - GroupReader, GroupWriter, GroupsReader, GroupsWriter, ServeError, TrackReader, TrackReaderMode, TrackWriter, + SubgroupReader, SubgroupWriter, SubgroupsReader, SubgroupsWriter, ServeError, TrackReader, TrackReaderMode, TrackWriter, }; pub struct ListingWriter { track: Option, - groups: Option, - group: Option, + subgroups: Option, + subgroup: Option, current: HashSet, } @@ -18,8 +18,8 @@ impl ListingWriter { pub fn new(track: TrackWriter) -> Self { Self { track: Some(track), - groups: None, - group: None, + subgroups: None, + subgroup: None, current: HashSet::new(), } } @@ -29,14 +29,14 @@ impl ListingWriter { return Err(ServeError::Duplicate); } - match self.group { - // Create a delta if the current group is small enough. - Some(ref mut group) if self.current.len() < 2 * group.len() => { + match self.subgroup { + // Create a delta if the current subgroup is small enough. + Some(ref mut subgroup) if self.current.len() < 2 * subgroup.len() => { let msg = format!("+{}", name); - group.write(msg.into())?; + subgroup.write(msg.into())?; } // Otherwise create a snapshot with every element. - _ => self.group = Some(self.snapshot()?), + _ => self.subgroup = Some(self.snapshot()?), } Ok(()) @@ -47,27 +47,27 @@ impl ListingWriter { return Err(ServeError::NotFound); } - match self.group { - // Create a delta if the current group is small enough. - Some(ref mut group) if self.current.len() < 2 * group.len() => { + match self.subgroup { + // Create a delta if the current subgroup is small enough. + Some(ref mut subgroup) if self.current.len() < 2 * subgroup.len() => { let msg = format!("-{}", name); - group.write(msg.into())?; + subgroup.write(msg.into())?; } // Otherwise create a snapshot with every element. - _ => self.group = Some(self.snapshot()?), + _ => self.subgroup = Some(self.snapshot()?), } Ok(()) } - fn snapshot(&mut self) -> Result { - let mut groups = match self.groups.take() { - Some(groups) => groups, - None => self.track.take().unwrap().groups()?, + fn snapshot(&mut self) -> Result { + let mut subgroups = match self.subgroups.take() { + Some(subgroups) => subgroups, + None => self.track.take().unwrap().subgroups()?, }; let priority = 127; - let mut group = groups.append(priority)?; + let mut subgroup = subgroups.append(priority)?; let mut msg = BytesMut::new(); for name in &self.current { @@ -75,10 +75,10 @@ impl ListingWriter { msg.extend_from_slice(b"\n"); } - group.write(msg.freeze())?; - self.groups = Some(groups); + subgroup.write(msg.freeze())?; + self.subgroups = Some(subgroups); - Ok(group) + Ok(subgroup) } pub fn len(&self) -> usize { @@ -100,9 +100,9 @@ pub enum ListingDelta { pub struct ListingReader { track: TrackReader, - // Keep track of the current group. - groups: Option, - group: Option, + // Keep track of the current subgroup. + subgroups: Option, + subgroup: Option, // The current state of the listing. current: HashSet, @@ -115,8 +115,8 @@ impl ListingReader { pub fn new(track: TrackReader) -> Self { Self { track, - groups: None, - group: None, + subgroups: None, + subgroup: None, current: HashSet::new(), deltas: VecDeque::new(), @@ -128,42 +128,42 @@ impl ListingReader { return Ok(Some(delta)); } - if self.groups.is_none() { - self.groups = match self.track.mode().await? { - TrackReaderMode::Groups(groups) => Some(groups), - _ => anyhow::bail!("expected groups mode"), + if self.subgroups.is_none() { + self.subgroups = match self.track.mode().await? { + TrackReaderMode::Subgroups(subgroups) => Some(subgroups), + _ => anyhow::bail!("expected subgroups mode"), }; }; - if self.group.is_none() { - self.group = Some(self.groups.as_mut().unwrap().next().await?.context("empty track")?); + if self.subgroup.is_none() { + self.subgroup = Some(self.subgroups.as_mut().unwrap().next().await?.context("empty track")?); } - let mut group_done = false; - let mut groups_done = false; + let mut subgroup_done = false; + let mut subgroups_done = false; loop { tokio::select! { - next = self.groups.as_mut().unwrap().next(), if !groups_done => { + next = self.subgroups.as_mut().unwrap().next(), if !subgroups_done => { if let Some(next) = next? { - self.group = Some(next); - group_done = false; + self.subgroup = Some(next); + subgroup_done = false; } else { - groups_done = true; + subgroups_done = true; } }, - object = self.group.as_mut().unwrap().read_next(), if !group_done => { + object = self.subgroup.as_mut().unwrap().read_next(), if !subgroup_done => { let payload = match object? { Some(object) => object, None => { - group_done = true; + subgroup_done = true; continue; } }; if payload.is_empty() { anyhow::bail!("empty payload"); - } else if self.group.as_mut().unwrap().pos() == 1 { + } else if self.subgroup.as_mut().unwrap().pos() == 1 { // This is a full snapshot, not a delta let set = HashSet::from_iter(payload.split(|&b| b == b'\n').map(|s| String::from_utf8_lossy(s).to_string())); From 0ea1f5bf8c760cf3ba9b88f972b81215b06ab472 Mon Sep 17 00:00:00 2001 From: Mike English Date: Tue, 29 Oct 2024 17:06:38 -0400 Subject: [PATCH 09/20] moq-clock-ietf: s/group/subgroup/g This may not be what we really want yet, but it seems to match what was done on the moq-pub side and it builds, so that's a start... --- moq-clock-ietf/src/clock.rs | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/moq-clock-ietf/src/clock.rs b/moq-clock-ietf/src/clock.rs index fec26c1..26a46fe 100644 --- a/moq-clock-ietf/src/clock.rs +++ b/moq-clock-ietf/src/clock.rs @@ -1,17 +1,17 @@ use anyhow::Context; use moq_transport::serve::{ - DatagramsReader, Group, GroupWriter, GroupsReader, GroupsWriter, ObjectsReader, StreamReader, TrackReader, + DatagramsReader, Subgroup, SubgroupWriter, SubgroupsReader, SubgroupsWriter, ObjectsReader, StreamReader, TrackReader, TrackReaderMode, }; use chrono::prelude::*; pub struct Publisher { - track: GroupsWriter, + track: SubgroupsWriter, } impl Publisher { - pub fn new(track: GroupsWriter) -> Self { + pub fn new(track: SubgroupsWriter) -> Self { Self { track } } @@ -25,8 +25,9 @@ impl Publisher { loop { let segment = self .track - .create(Group { + .create(Subgroup { group_id: sequence as u64, + subgroup_id: 0, priority: 0, }) .context("failed to create minute segment")?; @@ -49,7 +50,7 @@ impl Publisher { } } - async fn send_segment(mut segment: GroupWriter, mut now: DateTime) -> anyhow::Result<()> { + async fn send_segment(mut segment: SubgroupWriter, mut now: DateTime) -> anyhow::Result<()> { // Everything but the second. let base = now.format("%Y-%m-%d %H:%M:").to_string(); @@ -89,15 +90,15 @@ impl Subscriber { pub async fn run(self) -> anyhow::Result<()> { match self.track.mode().await.context("failed to get mode")? { TrackReaderMode::Stream(stream) => Self::recv_stream(stream).await, - TrackReaderMode::Groups(groups) => Self::recv_groups(groups).await, + TrackReaderMode::Subgroups(subgroups) => Self::recv_subgroups(subgroups).await, TrackReaderMode::Objects(objects) => Self::recv_objects(objects).await, TrackReaderMode::Datagrams(datagrams) => Self::recv_datagrams(datagrams).await, } } async fn recv_stream(mut track: StreamReader) -> anyhow::Result<()> { - while let Some(mut group) = track.next().await? { - while let Some(object) = group.read_next().await? { + while let Some(mut subgroup) = track.next().await? { + while let Some(object) = subgroup.read_next().await? { let str = String::from_utf8_lossy(&object); println!("{}", str); } @@ -106,17 +107,17 @@ impl Subscriber { Ok(()) } - async fn recv_groups(mut groups: GroupsReader) -> anyhow::Result<()> { - while let Some(mut group) = groups.next().await? { - let base = group + async fn recv_subgroups(mut subgroups: SubgroupsReader) -> anyhow::Result<()> { + while let Some(mut subgroup) = subgroups.next().await? { + let base = subgroup .read_next() .await .context("failed to get first object")? - .context("empty group")?; + .context("empty subgroup")?; let base = String::from_utf8_lossy(&base); - while let Some(object) = group.read_next().await? { + while let Some(object) = subgroup.read_next().await? { let str = String::from_utf8_lossy(&object); println!("{}{}", base, str); } From c3d5de3866d0ccf631c31680c081e00cf23de80b Mon Sep 17 00:00:00 2001 From: Mike English Date: Tue, 29 Oct 2024 17:37:03 -0400 Subject: [PATCH 10/20] moq-dir: cargo fmt --- moq-dir/src/listing.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/moq-dir/src/listing.rs b/moq-dir/src/listing.rs index 56c9931..2a298cb 100644 --- a/moq-dir/src/listing.rs +++ b/moq-dir/src/listing.rs @@ -3,7 +3,8 @@ use bytes::BytesMut; use std::collections::{HashSet, VecDeque}; use moq_transport::serve::{ - SubgroupReader, SubgroupWriter, SubgroupsReader, SubgroupsWriter, ServeError, TrackReader, TrackReaderMode, TrackWriter, + ServeError, SubgroupReader, SubgroupWriter, SubgroupsReader, SubgroupsWriter, TrackReader, TrackReaderMode, + TrackWriter, }; pub struct ListingWriter { From 5a3e4ff167fa62cb2f157db9c589b726d2d6d9c6 Mon Sep 17 00:00:00 2001 From: Mike English Date: Tue, 29 Oct 2024 17:37:18 -0400 Subject: [PATCH 11/20] moq-clock-ietf: cargo fmt --- moq-clock-ietf/src/clock.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/moq-clock-ietf/src/clock.rs b/moq-clock-ietf/src/clock.rs index 26a46fe..dcb0520 100644 --- a/moq-clock-ietf/src/clock.rs +++ b/moq-clock-ietf/src/clock.rs @@ -1,7 +1,7 @@ use anyhow::Context; use moq_transport::serve::{ - DatagramsReader, Subgroup, SubgroupWriter, SubgroupsReader, SubgroupsWriter, ObjectsReader, StreamReader, TrackReader, - TrackReaderMode, + DatagramsReader, ObjectsReader, StreamReader, Subgroup, SubgroupWriter, SubgroupsReader, SubgroupsWriter, + TrackReader, TrackReaderMode, }; use chrono::prelude::*; From a3b7bdd297fb80adf1902dcc29ad944cdff2cff8 Mon Sep 17 00:00:00 2001 From: Mike English Date: Tue, 29 Oct 2024 17:39:53 -0400 Subject: [PATCH 12/20] moq-dir: Fix one place that still uses groups? --- moq-dir/src/listing.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/moq-dir/src/listing.rs b/moq-dir/src/listing.rs index 2a298cb..411462a 100644 --- a/moq-dir/src/listing.rs +++ b/moq-dir/src/listing.rs @@ -64,7 +64,7 @@ impl ListingWriter { fn snapshot(&mut self) -> Result { let mut subgroups = match self.subgroups.take() { Some(subgroups) => subgroups, - None => self.track.take().unwrap().subgroups()?, + None => self.track.take().unwrap().groups()?, }; let priority = 127; From da212318e8663f49a1772d1ef0ae00c5d0984eed Mon Sep 17 00:00:00 2001 From: Mike English Date: Fri, 1 Nov 2024 15:18:04 -0400 Subject: [PATCH 13/20] moq-transport: Remove object/stream (gone in -06) In draft-06 and going forward we don't have separate Group per Stream and Object per Stream modes, but can control the number of objects which are put on each stream using SubGroups We may want to come back around and provide convenience APIs for using SubGroups in each of these ways in the future. --- moq-transport/src/data/header.rs | 7 ++- moq-transport/src/serve/track.rs | 18 ++----- moq-transport/src/session/subscribe.rs | 20 ------- moq-transport/src/session/subscribed.rs | 72 ------------------------- moq-transport/src/session/subscriber.rs | 14 ----- 5 files changed, 7 insertions(+), 124 deletions(-) diff --git a/moq-transport/src/data/header.rs b/moq-transport/src/data/header.rs index 67e14d4..079b2e5 100644 --- a/moq-transport/src/data/header.rs +++ b/moq-transport/src/data/header.rs @@ -2,7 +2,7 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError}; use paste::paste; use std::fmt; -use super::{ObjectHeader, SubgroupHeader, TrackHeader}; +use super::{SubgroupHeader, TrackHeader}; // Use a macro to generate the message types rather than copy-paste. // This implements a decode/encode method that uses the specified type. @@ -84,10 +84,9 @@ macro_rules! header_types { } } -// TODO: These types are removed in draft-06. Remove them if they are not needed. -// Each object type is prefixed with the given VarInt type. +// Each stream type is prefixed with the given VarInt type. +// https://www.ietf.org/archive/id/draft-ietf-moq-transport-06.html#section-7 header_types! { - Object = 0x0, //Datagram = 0x1, Subgroup = 0x51, Track = 0x50, diff --git a/moq-transport/src/serve/track.rs b/moq-transport/src/serve/track.rs index 82e798e..28afc55 100644 --- a/moq-transport/src/serve/track.rs +++ b/moq-transport/src/serve/track.rs @@ -15,8 +15,8 @@ use crate::watch::State; use super::{ - Datagrams, DatagramsReader, DatagramsWriter, Objects, ObjectsReader, ObjectsWriter, ServeError, Stream, - StreamReader, StreamWriter, Subgroups, SubgroupsReader, SubgroupsWriter, + Datagrams, DatagramsReader, DatagramsWriter, ObjectsWriter, ServeError, Stream, StreamReader, StreamWriter, + Subgroups, SubgroupsReader, SubgroupsWriter, }; use paste::paste; use std::{ops::Deref, sync::Arc}; @@ -82,6 +82,7 @@ impl TrackWriter { Ok(writer) } + // TODO: rework this whole interface for clarity? pub fn groups(self) -> Result { let (writer, reader) = Subgroups { track: self.info.clone(), @@ -93,17 +94,6 @@ impl TrackWriter { Ok(writer) } - pub fn objects(self) -> Result { - let (writer, reader) = Objects { - track: self.info.clone(), - } - .produce(); - - let mut state = self.state.lock_mut().ok_or(ServeError::Cancel)?; - state.mode = Some(reader.into()); - Ok(writer) - } - pub fn datagrams(self) -> Result { let (writer, reader) = Datagrams { track: self.info.clone(), @@ -220,7 +210,7 @@ macro_rules! track_readers { } } -track_readers!(Stream, Subgroups, Objects, Datagrams,); +track_readers!(Stream, Subgroups, Datagrams,); macro_rules! track_writers { {$($name:ident,)*} => { diff --git a/moq-transport/src/session/subscribe.rs b/moq-transport/src/session/subscribe.rs index 94f05ae..1097a39 100644 --- a/moq-transport/src/session/subscribe.rs +++ b/moq-transport/src/session/subscribe.rs @@ -181,26 +181,6 @@ impl SubscribeRecv { Ok(writer) } - pub fn object(&mut self, header: data::ObjectHeader) -> Result { - let writer = self.writer.take().ok_or(ServeError::Done)?; - - let mut objects = match writer { - TrackWriterMode::Track(init) => init.objects()?, - TrackWriterMode::Objects(objects) => objects, - _ => return Err(ServeError::Mode), - }; - - let writer = objects.create(serve::Object { - group_id: header.group_id, - object_id: header.object_id, - priority: header.publisher_priority, - })?; - - self.writer = Some(objects.into()); - - Ok(writer) - } - pub fn datagram(&mut self, datagram: data::Datagram) -> Result<(), ServeError> { let writer = self.writer.take().ok_or(ServeError::Done)?; diff --git a/moq-transport/src/session/subscribed.rs b/moq-transport/src/session/subscribed.rs index 2ebf029..fcdeb59 100644 --- a/moq-transport/src/session/subscribed.rs +++ b/moq-transport/src/session/subscribed.rs @@ -94,7 +94,6 @@ impl Subscribed { // TODO cancel track/datagrams on closed TrackReaderMode::Stream(stream) => self.serve_track(stream).await, TrackReaderMode::Subgroups(subgroups) => self.serve_subgroups(subgroups).await, - TrackReaderMode::Objects(objects) => self.serve_objects(objects).await, TrackReaderMode::Datagrams(datagrams) => self.serve_datagrams(datagrams).await, } } @@ -289,77 +288,6 @@ impl Subscribed { Ok(()) } - pub async fn serve_objects(&mut self, mut objects: serve::ObjectsReader) -> Result<(), SessionError> { - let mut tasks = FuturesUnordered::new(); - let mut done = None; - - loop { - tokio::select! { - res = objects.next(), if done.is_none() => match res { - Ok(Some(object)) => { - let header = data::ObjectHeader { - subscribe_id: self.msg.id, - track_alias: self.msg.track_alias, - group_id: object.group_id, - object_id: object.object_id, - publisher_priority: object.priority, - object_status: object.status, - - }; - - let publisher = self.publisher.clone(); - let state = self.state.clone(); - let info = object.info.clone(); - - tasks.push(async move { - if let Err(err) = Self::serve_object(header, object, publisher, state).await { - log::warn!("failed to serve object: {:?}, error: {}", info, err); - }; - }); - }, - Ok(None) => done = Some(Ok(())), - Err(err) => done = Some(Err(err)), - }, - _ = tasks.next(), if !tasks.is_empty() => {}, - res = self.closed(), if done.is_none() => done = Some(res), - else => return Ok(done.unwrap()?), - } - } - } - - async fn serve_object( - header: data::ObjectHeader, - mut object: serve::ObjectReader, - mut publisher: Publisher, - state: State, - ) -> Result<(), SessionError> { - state - .lock_mut() - .ok_or(ServeError::Done)? - .update_max_group_id(object.group_id, object.object_id)?; - - let mut stream = publisher.open_uni().await?; - - // TODO figure out u32 vs u64 priority - stream.set_priority(object.priority as i32); - - let mut writer = Writer::new(stream); - - let header: data::Header = header.into(); - writer.encode(&header).await?; - - log::trace!("sent object: {:?}", header); - - while let Some(chunk) = object.read().await? { - writer.write(&chunk).await?; - log::trace!("sent object payload: {:?}", chunk.len()); - } - - log::trace!("sent object done"); - - Ok(()) - } - async fn serve_datagrams(&mut self, mut datagrams: serve::DatagramsReader) -> Result<(), SessionError> { while let Some(datagram) = datagrams.read().await? { let datagram = data::Datagram { diff --git a/moq-transport/src/session/subscriber.rs b/moq-transport/src/session/subscriber.rs index 60f9378..76f93ab 100644 --- a/moq-transport/src/session/subscriber.rs +++ b/moq-transport/src/session/subscriber.rs @@ -181,7 +181,6 @@ impl Subscriber { enum Writer { Track(serve::StreamWriter), Subgroup(serve::SubgroupWriter), - Object(serve::ObjectWriter), } let writer = { @@ -191,14 +190,12 @@ impl Subscriber { match header { data::Header::Track(track) => Writer::Track(subscribe.track(track)?), data::Header::Subgroup(subgroup) => Writer::Subgroup(subscribe.subgroup(subgroup)?), - data::Header::Object(object) => Writer::Object(subscribe.object(object)?), } }; match writer { Writer::Track(track) => Self::recv_track(track, reader).await?, Writer::Subgroup(group) => Self::recv_subgroup(group, reader).await?, - Writer::Object(object) => Self::recv_object(object, reader).await?, }; Ok(()) @@ -255,17 +252,6 @@ impl Subscriber { Ok(()) } - async fn recv_object(mut object: serve::ObjectWriter, mut reader: Reader) -> Result<(), SessionError> { - log::trace!("received object: {:?}", object.info); - - while let Some(data) = reader.read_chunk(usize::MAX).await? { - log::trace!("received object payload: {:?}", data.len()); - object.write(data)?; - } - - Ok(()) - } - pub fn recv_datagram(&mut self, datagram: bytes::Bytes) -> Result<(), SessionError> { let mut cursor = io::Cursor::new(datagram); let datagram = data::Datagram::decode(&mut cursor)?; From 11c8e8bd3b38427af08437829943ee286ebf30ad Mon Sep 17 00:00:00 2001 From: Mike English Date: Fri, 1 Nov 2024 15:18:04 -0400 Subject: [PATCH 14/20] moq-clock-ietf: Remove object/stream (gone in -06) In draft-06 and going forward we don't have separate Group per Stream and Object per Stream modes, but can control the number of objects which are put on each stream using SubGroups Removed from moq-transport, so skipping over it here, too. --- moq-clock-ietf/src/clock.rs | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/moq-clock-ietf/src/clock.rs b/moq-clock-ietf/src/clock.rs index dcb0520..a4dadd6 100644 --- a/moq-clock-ietf/src/clock.rs +++ b/moq-clock-ietf/src/clock.rs @@ -1,7 +1,7 @@ use anyhow::Context; use moq_transport::serve::{ - DatagramsReader, ObjectsReader, StreamReader, Subgroup, SubgroupWriter, SubgroupsReader, SubgroupsWriter, - TrackReader, TrackReaderMode, + DatagramsReader, StreamReader, Subgroup, SubgroupWriter, SubgroupsReader, SubgroupsWriter, TrackReader, + TrackReaderMode, }; use chrono::prelude::*; @@ -91,7 +91,6 @@ impl Subscriber { match self.track.mode().await.context("failed to get mode")? { TrackReaderMode::Stream(stream) => Self::recv_stream(stream).await, TrackReaderMode::Subgroups(subgroups) => Self::recv_subgroups(subgroups).await, - TrackReaderMode::Objects(objects) => Self::recv_objects(objects).await, TrackReaderMode::Datagrams(datagrams) => Self::recv_datagrams(datagrams).await, } } @@ -126,16 +125,6 @@ impl Subscriber { Ok(()) } - async fn recv_objects(mut objects: ObjectsReader) -> anyhow::Result<()> { - while let Some(mut object) = objects.next().await? { - let payload = object.read_all().await?; - let str = String::from_utf8_lossy(&payload); - println!("{}", str); - } - - Ok(()) - } - async fn recv_datagrams(mut datagrams: DatagramsReader) -> anyhow::Result<()> { while let Some(datagram) = datagrams.read().await? { let str = String::from_utf8_lossy(&datagram.payload); From c93f0942d0ac65b0ab7bcd0a8b09237061e2e7e4 Mon Sep 17 00:00:00 2001 From: Zafer Gurel Date: Fri, 1 Nov 2024 23:57:30 +0300 Subject: [PATCH 15/20] moq-transport: Add new error type This commit adds the new error type "Too Many Subscribes" to moq-transport/src/error.rs. However, this file is commented out and the current change doesn't have an effect yet. --- moq-transport/src/error.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/moq-transport/src/error.rs b/moq-transport/src/error.rs index 279eaf3..a3dc47d 100644 --- a/moq-transport/src/error.rs +++ b/moq-transport/src/error.rs @@ -21,6 +21,9 @@ pub enum SessionError { #[error("parameter length mismatch")] ParameterLengthMismatch, + #[error("too many subscribes")] + TooManySubscribes, + #[error("goaway timeout")] GoawayTimeout, @@ -40,6 +43,7 @@ impl MoqError for SessionError { Self::ProtocolViolation => 0x3, Self::DuplicateTrackAlias => 0x4, Self::ParameterLengthMismatch => 0x5, + Self::TooManySubscribes => 0x6, Self::GoawayTimeout => 0x10, Self::Unknown(code) => *code, // Unofficial error codes From 7981390b394abbc412e6982de9ac401a670d9852 Mon Sep 17 00:00:00 2001 From: Zafer Gurel Date: Sat, 2 Nov 2024 20:11:02 +0300 Subject: [PATCH 16/20] moq-transport: first stab at subscribe namespace messages This commit includes the changes for adding subcribe namespace messages. The track namespace tuple format is not yet supported. Also, the methods for handling these messages are not implemented yet. --- moq-transport/src/error.rs | 3 ++ moq-transport/src/message/mod.rs | 14 +++++++ .../src/message/subscribe_namespace.rs | 34 ++++++++++++++++ .../src/message/subscribe_namespace_error.rs | 40 +++++++++++++++++++ .../src/message/subscribe_namespace_ok.rs | 23 +++++++++++ moq-transport/src/message/subscriber.rs | 4 ++ .../src/message/unsubscribe_namespace.rs | 24 +++++++++++ moq-transport/src/session/publisher.rs | 5 +++ 8 files changed, 147 insertions(+) create mode 100644 moq-transport/src/message/subscribe_namespace.rs create mode 100644 moq-transport/src/message/subscribe_namespace_error.rs create mode 100644 moq-transport/src/message/subscribe_namespace_ok.rs create mode 100644 moq-transport/src/message/unsubscribe_namespace.rs diff --git a/moq-transport/src/error.rs b/moq-transport/src/error.rs index a3dc47d..067a17b 100644 --- a/moq-transport/src/error.rs +++ b/moq-transport/src/error.rs @@ -27,6 +27,9 @@ pub enum SessionError { #[error("goaway timeout")] GoawayTimeout, + + + #[error("unknown error: code={0}")] Unknown(u64), // Unofficial error codes diff --git a/moq-transport/src/message/mod.rs b/moq-transport/src/message/mod.rs index 405ba21..45d9516 100644 --- a/moq-transport/src/message/mod.rs +++ b/moq-transport/src/message/mod.rs @@ -43,6 +43,9 @@ mod publisher; mod subscribe; mod subscribe_done; mod subscribe_error; +mod subscribe_namespace; +mod subscribe_namespace_error; +mod subscribe_namespace_ok; mod subscribe_ok; mod subscribe_update; mod subscriber; @@ -50,6 +53,7 @@ mod track_status; mod track_status_request; mod unannounce; mod unsubscribe; +mod unsubscribe_namespace; pub use announce::*; pub use announce_cancel::*; @@ -62,6 +66,9 @@ pub use publisher::*; pub use subscribe::*; pub use subscribe_done::*; pub use subscribe_error::*; +pub use subscribe_namespace::*; +pub use subscribe_namespace_error::*; +pub use subscribe_namespace_ok::*; pub use subscribe_ok::*; pub use subscribe_update::*; pub use subscriber::*; @@ -69,6 +76,7 @@ pub use track_status::*; pub use track_status_request::*; pub use unannounce::*; pub use unsubscribe::*; +pub use unsubscribe_namespace::*; use crate::coding::{Decode, DecodeError, Encode, EncodeError}; use std::fmt; @@ -178,6 +186,12 @@ message_types! { // Misc GoAway = 0x10, + + // NAMESPACE family, sent by subscriber + SubscribeNamespace = 0x11, + SubscribeNamespaceOk = 0x12, + SubscribeNamespaceError = 0x13, + UnsubscribeNamespace = 0x14, } /// Track Status Codes diff --git a/moq-transport/src/message/subscribe_namespace.rs b/moq-transport/src/message/subscribe_namespace.rs new file mode 100644 index 0000000..1203068 --- /dev/null +++ b/moq-transport/src/message/subscribe_namespace.rs @@ -0,0 +1,34 @@ +use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params}; + +/// Subscribe Namespace +/// https://www.ietf.org/archive/id/draft-ietf-moq-transport-06.html#section-6.11 +#[derive(Clone, Debug)] +pub struct SubscribeNamespace { + /// The track namespace + // TODO: convert this to tuple + pub namespace_prefix: String, + + /// Optional parameters + pub params: Params, +} + +impl Decode for SubscribeNamespace { + fn decode(r: &mut R) -> Result { + let namespace_prefix = String::decode(r)?; + let params = Params::decode(r)?; + + Ok(Self { + namespace_prefix, + params, + }) + } +} + +impl Encode for SubscribeNamespace { + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.namespace_prefix.encode(w)?; + self.params.encode(w)?; + + Ok(()) + } +} diff --git a/moq-transport/src/message/subscribe_namespace_error.rs b/moq-transport/src/message/subscribe_namespace_error.rs new file mode 100644 index 0000000..2f6ce98 --- /dev/null +++ b/moq-transport/src/message/subscribe_namespace_error.rs @@ -0,0 +1,40 @@ +use crate::coding::{Decode, DecodeError, Encode, EncodeError}; + +/// Subscribe Namespace Error +/// https://www.ietf.org/archive/id/draft-ietf-moq-transport-06.html#name-subscribe_namespace_error +#[derive(Clone, Debug)] +pub struct SubscribeNamespaceError { + // Echo back the namespace that was reset + // TODO: convert this to tuple + pub namespace_prefix: String, + + // An error code. + pub code: u64, + + // An optional, human-readable reason. + pub reason: String, +} + +impl Decode for SubscribeNamespaceError { + fn decode(r: &mut R) -> Result { + let namespace_prefix = String::decode(r)?; + let code = u64::decode(r)?; + let reason = String::decode(r)?; + + Ok(Self { + namespace_prefix, + code, + reason, + }) + } +} + +impl Encode for SubscribeNamespaceError { + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.namespace_prefix.encode(w)?; + self.code.encode(w)?; + self.reason.encode(w)?; + + Ok(()) + } +} diff --git a/moq-transport/src/message/subscribe_namespace_ok.rs b/moq-transport/src/message/subscribe_namespace_ok.rs new file mode 100644 index 0000000..8e946c8 --- /dev/null +++ b/moq-transport/src/message/subscribe_namespace_ok.rs @@ -0,0 +1,23 @@ +use crate::coding::{Decode, DecodeError, Encode, EncodeError}; + +/// Subscribe Namespace Ok +/// https://www.ietf.org/archive/id/draft-ietf-moq-transport-06.html#name-subscribe_namespace_ok +#[derive(Clone, Debug)] +pub struct SubscribeNamespaceOk { + // Echo back the namespace that was announced. + // TODO: convert this to tuple + pub namespace_prefix: String, +} + +impl Decode for SubscribeNamespaceOk { + fn decode(r: &mut R) -> Result { + let namespace_prefix = String::decode(r)?; + Ok(Self { namespace_prefix }) + } +} + +impl Encode for SubscribeNamespaceOk { + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.namespace_prefix.encode(w) + } +} diff --git a/moq-transport/src/message/subscriber.rs b/moq-transport/src/message/subscriber.rs index 5b699aa..8047312 100644 --- a/moq-transport/src/message/subscriber.rs +++ b/moq-transport/src/message/subscriber.rs @@ -52,4 +52,8 @@ subscriber_msgs! { Unsubscribe, SubscribeUpdate, TrackStatusRequest, + SubscribeNamespace, + SubscribeNamespaceOk, + SubscribeNamespaceError, + UnsubscribeNamespace, } diff --git a/moq-transport/src/message/unsubscribe_namespace.rs b/moq-transport/src/message/unsubscribe_namespace.rs new file mode 100644 index 0000000..8ba15fa --- /dev/null +++ b/moq-transport/src/message/unsubscribe_namespace.rs @@ -0,0 +1,24 @@ +use crate::coding::{Decode, DecodeError, Encode, EncodeError}; + +/// Unsubscribe Namespace +/// https://www.ietf.org/archive/id/draft-ietf-moq-transport-06.html#name-unsubscribe_namespace +#[derive(Clone, Debug)] +pub struct UnsubscribeNamespace { + // Echo back the namespace that was reset + // TODO: convert this to tuple + pub namespace_prefix: String, +} + +impl Decode for UnsubscribeNamespace { + fn decode(r: &mut R) -> Result { + let namespace_prefix = String::decode(r)?; + Ok(Self { namespace_prefix }) + } +} + +impl Encode for UnsubscribeNamespace { + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.namespace_prefix.encode(w)?; + Ok(()) + } +} diff --git a/moq-transport/src/session/publisher.rs b/moq-transport/src/session/publisher.rs index 20da553..f35ba9f 100644 --- a/moq-transport/src/session/publisher.rs +++ b/moq-transport/src/session/publisher.rs @@ -162,6 +162,11 @@ impl Publisher { message::Subscriber::Unsubscribe(msg) => self.recv_unsubscribe(msg), message::Subscriber::SubscribeUpdate(msg) => self.recv_subscribe_update(msg), message::Subscriber::TrackStatusRequest(msg) => self.recv_track_status_request(msg), + // TODO: Implement namespace messages. + message::Subscriber::SubscribeNamespace(_msg) => unimplemented!(), + message::Subscriber::SubscribeNamespaceOk(_msg) => unimplemented!(), + message::Subscriber::SubscribeNamespaceError(_msg) => unimplemented!(), + message::Subscriber::UnsubscribeNamespace(_msg) => unimplemented!(), }; if let Err(err) = res { From f0e00e932f9974f98e9870010e768d7d2377faf3 Mon Sep 17 00:00:00 2001 From: Zafer Gurel Date: Sat, 2 Nov 2024 22:55:01 +0300 Subject: [PATCH 17/20] moq-transport: Add Tuple type This commit adds the new Tuple type x(tuple). --- moq-transport/src/coding/mod.rs | 2 + moq-transport/src/coding/tuple.rs | 93 +++++++++++++++++++++++++++++++ 2 files changed, 95 insertions(+) create mode 100644 moq-transport/src/coding/tuple.rs diff --git a/moq-transport/src/coding/mod.rs b/moq-transport/src/coding/mod.rs index a3ff6f7..e471224 100644 --- a/moq-transport/src/coding/mod.rs +++ b/moq-transport/src/coding/mod.rs @@ -2,9 +2,11 @@ mod decode; mod encode; mod params; mod string; +mod tuple; mod varint; pub use decode::*; pub use encode::*; pub use params::*; +pub use tuple::*; pub use varint::*; diff --git a/moq-transport/src/coding/tuple.rs b/moq-transport/src/coding/tuple.rs new file mode 100644 index 0000000..d67b7a0 --- /dev/null +++ b/moq-transport/src/coding/tuple.rs @@ -0,0 +1,93 @@ +// +use super::{Decode, DecodeError, Encode, EncodeError}; + +/// Tuple Field +#[derive(Clone, Debug, Default)] +pub struct TupleField { + pub value: Vec, +} + +impl Decode for TupleField { + fn decode(r: &mut R) -> Result { + let size = usize::decode(r)?; + Self::decode_remaining(r, size)?; + let mut buf = vec![0; size]; + r.copy_to_slice(&mut buf); + Ok(Self { value: buf }) + } +} + +impl Encode for TupleField { + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.value.len().encode(w)?; + Self::encode_remaining(w, self.value.len())?; + w.put_slice(&self.value); + Ok(()) + } +} + +impl TupleField { + pub fn new() -> Self { + Self::default() + } + + pub fn set(&mut self, p: P) -> Result<(), EncodeError> { + let mut value = Vec::new(); + p.encode(&mut value)?; + self.value = value; + Ok(()) + } + + pub fn get(&self) -> Result { + P::decode(&mut bytes::Bytes::from(self.value.clone())) + } +} + +/// Tuple +#[derive(Clone, Debug, Default)] +pub struct Tuple { + pub fields: Vec, +} + +impl Decode for Tuple { + fn decode(r: &mut R) -> Result { + let count = u64::decode(r)? as usize; + let mut fields = Vec::new(); + for _ in 0..count { + fields.push(TupleField::decode(r)?); + } + Ok(Self { fields }) + } +} + +impl Encode for Tuple { + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.fields.len().encode(w)?; + for field in &self.fields { + field.encode(w)?; + } + Ok(()) + } +} + +impl Tuple { + pub fn new() -> Self { + Self::default() + } + + pub fn add(&mut self, field: TupleField) { + self.fields.push(field); + } + + pub fn get(&self, index: usize) -> Result { + self.fields[index].get() + } + + pub fn set(&mut self, index: usize, f: TupleField) -> Result<(), EncodeError> { + self.fields[index].set(f) + } + + pub fn clear(&mut self) { + self.fields.clear(); + } +} From f0291c3b943cdb2a54f5fe1942822c0ae9d1a3d9 Mon Sep 17 00:00:00 2001 From: Zafer Gurel Date: Sun, 3 Nov 2024 00:04:09 +0300 Subject: [PATCH 18/20] moq-transport: Change type of namespace to tuple The commit changes the type of track namespace from string to tuple. So many changes in one commit. --- moq-clock-ietf/src/main.rs | 5 +- moq-dir/src/listings.rs | 9 ++-- moq-dir/src/session.rs | 2 +- moq-pub/src/main.rs | 4 +- moq-pub/src/media.rs | 2 +- moq-relay-ietf/src/consumer.rs | 4 +- moq-relay-ietf/src/local.rs | 11 ++-- moq-relay-ietf/src/remote.rs | 11 ++-- moq-sub/src/main.rs | 4 +- moq-transport/src/coding/tuple.rs | 53 ++++++++++++++++++- moq-transport/src/message/announce.rs | 6 +-- moq-transport/src/message/announce_cancel.rs | 6 +-- moq-transport/src/message/announce_error.rs | 6 +-- moq-transport/src/message/announce_ok.rs | 6 +-- moq-transport/src/message/subscribe.rs | 6 +-- .../src/message/subscribe_namespace.rs | 7 ++- .../src/message/subscribe_namespace_error.rs | 7 ++- .../src/message/subscribe_namespace_ok.rs | 7 ++- moq-transport/src/message/subscribe_update.rs | 6 +-- moq-transport/src/message/track_status.rs | 6 +-- .../src/message/track_status_request.rs | 6 +-- moq-transport/src/message/unannounce.rs | 6 +-- .../src/message/unsubscribe_namespace.rs | 7 ++- moq-transport/src/serve/track.rs | 5 +- moq-transport/src/serve/tracks.rs | 5 +- moq-transport/src/session/announce.rs | 7 +-- moq-transport/src/session/announced.rs | 3 +- moq-transport/src/session/publisher.rs | 7 +-- moq-transport/src/session/subscribe.rs | 3 +- moq-transport/src/session/subscriber.rs | 8 +-- .../src/session/track_status_requested.rs | 3 +- 31 files changed, 145 insertions(+), 83 deletions(-) diff --git a/moq-clock-ietf/src/main.rs b/moq-clock-ietf/src/main.rs index 24bf276..449d6c6 100644 --- a/moq-clock-ietf/src/main.rs +++ b/moq-clock-ietf/src/main.rs @@ -8,6 +8,7 @@ use clap::Parser; mod clock; use moq_transport::{ + coding::Tuple, serve, session::{Publisher, Subscriber}, }; @@ -64,7 +65,7 @@ async fn main() -> anyhow::Result<()> { .context("failed to create MoQ Transport session")?; let (mut writer, _, reader) = serve::Tracks { - namespace: config.namespace.clone(), + namespace: Tuple::from_utf8_path(&config.namespace), } .produce(); @@ -81,7 +82,7 @@ async fn main() -> anyhow::Result<()> { .await .context("failed to create MoQ Transport session")?; - let (prod, sub) = serve::Track::new(config.namespace, config.track).produce(); + let (prod, sub) = serve::Track::new(Tuple::from_utf8_path(&config.namespace), config.track).produce(); let clock = clock::Subscriber::new(sub); diff --git a/moq-dir/src/listings.rs b/moq-dir/src/listings.rs index ae5d26b..dd0adbc 100644 --- a/moq-dir/src/listings.rs +++ b/moq-dir/src/listings.rs @@ -3,6 +3,7 @@ use std::{ sync::{Arc, Mutex}, }; +use moq_transport::coding::Tuple; use moq_transport::serve::{ServeError, Tracks, TracksReader, TracksWriter}; use crate::{ListingReader, ListingWriter}; @@ -20,7 +21,7 @@ pub struct Listings { impl Listings { pub fn new(namespace: String) -> Self { - let (writer, _, reader) = Tracks::new(namespace).produce(); + let (writer, _, reader) = Tracks::new(Tuple::from_utf8_path(&namespace)).produce(); let state = State { writer, @@ -37,13 +38,15 @@ impl Listings { pub fn register(&mut self, path: &str) -> Result, ServeError> { let (prefix, base) = Self::prefix(path); - if !prefix.starts_with(&self.reader.namespace) { + let namespace = self.reader.namespace.to_utf8_path(); + + if !prefix.starts_with(&namespace) { // Ignore anything that isn't in our namespace. return Ok(None); } // Remove the namespace prefix from the path. - let prefix = &prefix[self.reader.namespace.len()..]; + let prefix = &prefix[namespace.len()..]; let mut state = self.state.lock().unwrap(); if let Some(listing) = state.active.get_mut(prefix) { diff --git a/moq-dir/src/session.rs b/moq-dir/src/session.rs index d302af5..03602c5 100644 --- a/moq-dir/src/session.rs +++ b/moq-dir/src/session.rs @@ -68,7 +68,7 @@ impl Session { async fn serve_announce(mut self, mut announce: Announced) -> anyhow::Result<()> { announce.ok()?; - match self.listings.register(&announce.namespace) { + match self.listings.register(&announce.namespace.to_utf8_path()) { Ok(_) => announce.closed().await?, Err(err) => { announce.close(err.clone())?; diff --git a/moq-pub/src/main.rs b/moq-pub/src/main.rs index 0f910cd..7898dac 100644 --- a/moq-pub/src/main.rs +++ b/moq-pub/src/main.rs @@ -8,7 +8,7 @@ use tokio::io::AsyncReadExt; use moq_native_ietf::quic; use moq_pub::Media; -use moq_transport::{serve, session::Publisher}; +use moq_transport::{coding::Tuple, serve, session::Publisher}; #[derive(Parser, Clone)] pub struct Cli { @@ -51,7 +51,7 @@ async fn main() -> anyhow::Result<()> { let cli = Cli::parse(); - let (writer, _, reader) = serve::Tracks::new(cli.name).produce(); + let (writer, _, reader) = serve::Tracks::new(Tuple::from_utf8_path(&cli.name)).produce(); let media = Media::new(writer)?; let tls = cli.tls.load()?; diff --git a/moq-pub/src/media.rs b/moq-pub/src/media.rs index 5992e60..8198688 100644 --- a/moq-pub/src/media.rs +++ b/moq-pub/src/media.rs @@ -157,7 +157,7 @@ impl Media { let mut track = moq_catalog::Track { init_track: Some(self.init.name.clone()), name: name.clone(), - namespace: Some(self.broadcast.namespace.clone()), + namespace: Some(self.broadcast.namespace.to_utf8_path()), packaging: Some(moq_catalog::TrackPackaging::Cmaf), render_group: Some(1), ..Default::default() diff --git a/moq-relay-ietf/src/consumer.rs b/moq-relay-ietf/src/consumer.rs index 0d008a3..9a8c0e1 100644 --- a/moq-relay-ietf/src/consumer.rs +++ b/moq-relay-ietf/src/consumer.rs @@ -51,10 +51,10 @@ impl Consumer { async fn serve(mut self, mut announce: Announced) -> Result<(), anyhow::Error> { let mut tasks = FuturesUnordered::new(); - let (_, mut request, reader) = Tracks::new(announce.namespace.to_string()).produce(); + let (_, mut request, reader) = Tracks::new(announce.namespace.clone()).produce(); if let Some(api) = self.api.as_ref() { - let mut refresh = api.set_origin(reader.namespace.clone()).await?; + let mut refresh = api.set_origin(reader.namespace.to_utf8_path()).await?; tasks.push(async move { refresh.run().await.context("failed refreshing origin") }.boxed()); } diff --git a/moq-relay-ietf/src/local.rs b/moq-relay-ietf/src/local.rs index 8e4217a..78194c4 100644 --- a/moq-relay-ietf/src/local.rs +++ b/moq-relay-ietf/src/local.rs @@ -3,11 +3,14 @@ use std::collections::HashMap; use std::sync::{Arc, Mutex}; -use moq_transport::serve::{ServeError, TracksReader}; +use moq_transport::{ + coding::Tuple, + serve::{ServeError, TracksReader}, +}; #[derive(Clone)] pub struct Locals { - lookup: Arc>>, + lookup: Arc>>, } impl Default for Locals { @@ -38,14 +41,14 @@ impl Locals { Ok(registration) } - pub fn route(&self, namespace: &str) -> Option { + pub fn route(&self, namespace: &Tuple) -> Option { self.lookup.lock().unwrap().get(namespace).cloned() } } pub struct Registration { locals: Locals, - namespace: String, + namespace: Tuple, } impl Drop for Registration { diff --git a/moq-relay-ietf/src/remote.rs b/moq-relay-ietf/src/remote.rs index af96265..6f3e641 100644 --- a/moq-relay-ietf/src/remote.rs +++ b/moq-relay-ietf/src/remote.rs @@ -10,6 +10,7 @@ use futures::stream::FuturesUnordered; use futures::FutureExt; use futures::StreamExt; use moq_native_ietf::quic; +use moq_transport::coding::Tuple; use moq_transport::serve::{Track, TrackReader, TrackWriter}; use moq_transport::watch::State; use url::Url; @@ -119,9 +120,9 @@ impl RemotesConsumer { Self { info, state } } - pub async fn route(&self, namespace: &str) -> anyhow::Result> { + pub async fn route(&self, namespace: &Tuple) -> anyhow::Result> { // Always fetch the origin instead of using the (potentially invalid) cache. - let origin = match self.api.get_origin(namespace).await? { + let origin = match self.api.get_origin(&namespace.to_utf8_path()).await? { None => return Ok(None), Some(origin) => origin, }; @@ -192,7 +193,7 @@ impl Remote { #[derive(Default)] struct RemoteState { - tracks: HashMap<(String, String), RemoteTrackWeak>, + tracks: HashMap<(Tuple, String), RemoteTrackWeak>, requested: VecDeque, } @@ -285,7 +286,7 @@ impl RemoteConsumer { } /// Request a track from the broadcast. - pub fn subscribe(&self, namespace: String, name: String) -> anyhow::Result> { + pub fn subscribe(&self, namespace: Tuple, name: String) -> anyhow::Result> { let key = (namespace.clone(), name.clone()); let state = self.state.lock(); if let Some(track) = state.tracks.get(&key) { @@ -372,7 +373,7 @@ impl RemoteTrackWeak { struct RemoteTrackDrop { parent: State, - key: (String, String), + key: (Tuple, String), } impl Drop for RemoteTrackDrop { diff --git a/moq-sub/src/main.rs b/moq-sub/src/main.rs index f15382d..fc514d5 100644 --- a/moq-sub/src/main.rs +++ b/moq-sub/src/main.rs @@ -6,7 +6,7 @@ use url::Url; use moq_native_ietf::quic; use moq_sub::media::Media; -use moq_transport::serve::Tracks; +use moq_transport::{coding::Tuple, serve::Tracks}; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -31,7 +31,7 @@ async fn main() -> anyhow::Result<()> { .context("failed to create MoQ Transport session")?; // Associate empty set of Tracks with provided namespace - let tracks = Tracks::new(config.name); + let tracks = Tracks::new(Tuple::from_utf8_path(&config.name)); let mut media = Media::new(subscriber, tracks, out).await?; diff --git a/moq-transport/src/coding/tuple.rs b/moq-transport/src/coding/tuple.rs index d67b7a0..8589095 100644 --- a/moq-transport/src/coding/tuple.rs +++ b/moq-transport/src/coding/tuple.rs @@ -1,12 +1,26 @@ // use super::{Decode, DecodeError, Encode, EncodeError}; - +use core::hash::{Hash, Hasher}; /// Tuple Field #[derive(Clone, Debug, Default)] pub struct TupleField { pub value: Vec, } +impl Eq for TupleField {} + +impl PartialEq for TupleField { + fn eq(&self, other: &Self) -> bool { + self.value.eq(&other.value) + } +} + +impl Hash for TupleField { + fn hash(&self, state: &mut H) { + self.value.hash(state); + } +} + impl Decode for TupleField { fn decode(r: &mut R) -> Result { let size = usize::decode(r)?; @@ -31,6 +45,12 @@ impl TupleField { Self::default() } + pub fn from_utf8(path: &str) -> Self { + let mut field = TupleField::new(); + field.value = path.as_bytes().to_vec(); + field + } + pub fn set(&mut self, p: P) -> Result<(), EncodeError> { let mut value = Vec::new(); p.encode(&mut value)?; @@ -49,6 +69,20 @@ pub struct Tuple { pub fields: Vec, } +impl Hash for Tuple { + fn hash(&self, state: &mut H) { + self.fields.hash(state); + } +} + +impl Eq for Tuple {} + +impl PartialEq for Tuple { + fn eq(&self, other: &Self) -> bool { + self.fields.eq(&other.fields) + } +} + impl Decode for Tuple { fn decode(r: &mut R) -> Result { let count = u64::decode(r)? as usize; @@ -90,4 +124,21 @@ impl Tuple { pub fn clear(&mut self) { self.fields.clear(); } + + pub fn from_utf8_path(path: &str) -> Self { + let mut tuple = Tuple::new(); + for part in path.split('/') { + tuple.add(TupleField::from_utf8(part)); + } + tuple + } + + pub fn to_utf8_path(&self) -> String { + let mut path = String::new(); + for field in &self.fields { + path.push('/'); + path.push_str(&String::from_utf8_lossy(&field.value)); + } + path + } } diff --git a/moq-transport/src/message/announce.rs b/moq-transport/src/message/announce.rs index 114fb5c..00d1100 100644 --- a/moq-transport/src/message/announce.rs +++ b/moq-transport/src/message/announce.rs @@ -1,10 +1,10 @@ -use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params, Tuple}; /// Sent by the publisher to announce the availability of a group of tracks. #[derive(Clone, Debug)] pub struct Announce { /// The track namespace - pub namespace: String, + pub namespace: Tuple, /// Optional parameters pub params: Params, @@ -12,7 +12,7 @@ pub struct Announce { impl Decode for Announce { fn decode(r: &mut R) -> Result { - let namespace = String::decode(r)?; + let namespace = Tuple::decode(r)?; let params = Params::decode(r)?; Ok(Self { namespace, params }) diff --git a/moq-transport/src/message/announce_cancel.rs b/moq-transport/src/message/announce_cancel.rs index 2a3379f..086d2fe 100644 --- a/moq-transport/src/message/announce_cancel.rs +++ b/moq-transport/src/message/announce_cancel.rs @@ -1,10 +1,10 @@ -use crate::coding::{Decode, DecodeError, Encode, EncodeError}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, Tuple}; /// Sent by the subscriber to reject an Announce after ANNOUNCE_OK #[derive(Clone, Debug)] pub struct AnnounceCancel { // Echo back the namespace that was reset - pub namespace: String, + pub namespace: Tuple, // An error code. //pub code: u64, @@ -14,7 +14,7 @@ pub struct AnnounceCancel { impl Decode for AnnounceCancel { fn decode(r: &mut R) -> Result { - let namespace = String::decode(r)?; + let namespace = Tuple::decode(r)?; //let code = u64::decode(r)?; //let reason = String::decode(r)?; diff --git a/moq-transport/src/message/announce_error.rs b/moq-transport/src/message/announce_error.rs index 4c468a3..6201ded 100644 --- a/moq-transport/src/message/announce_error.rs +++ b/moq-transport/src/message/announce_error.rs @@ -1,10 +1,10 @@ -use crate::coding::{Decode, DecodeError, Encode, EncodeError}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, Tuple}; /// Sent by the subscriber to reject an Announce. #[derive(Clone, Debug)] pub struct AnnounceError { // Echo back the namespace that was reset - pub namespace: String, + pub namespace: Tuple, // An error code. pub code: u64, @@ -15,7 +15,7 @@ pub struct AnnounceError { impl Decode for AnnounceError { fn decode(r: &mut R) -> Result { - let namespace = String::decode(r)?; + let namespace = Tuple::decode(r)?; let code = u64::decode(r)?; let reason = String::decode(r)?; diff --git a/moq-transport/src/message/announce_ok.rs b/moq-transport/src/message/announce_ok.rs index 0178eb1..b00a557 100644 --- a/moq-transport/src/message/announce_ok.rs +++ b/moq-transport/src/message/announce_ok.rs @@ -1,16 +1,16 @@ -use crate::coding::{Decode, DecodeError, Encode, EncodeError}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, Tuple}; /// Sent by the subscriber to accept an Announce. #[derive(Clone, Debug)] pub struct AnnounceOk { // Echo back the namespace that was announced. // TODO Propose using an ID to save bytes. - pub namespace: String, + pub namespace: Tuple, } impl Decode for AnnounceOk { fn decode(r: &mut R) -> Result { - let namespace = String::decode(r)?; + let namespace = Tuple::decode(r)?; Ok(Self { namespace }) } } diff --git a/moq-transport/src/message/subscribe.rs b/moq-transport/src/message/subscribe.rs index fa6a186..3f142ce 100644 --- a/moq-transport/src/message/subscribe.rs +++ b/moq-transport/src/message/subscribe.rs @@ -1,4 +1,4 @@ -use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params, Tuple}; use crate::message::FilterType; use crate::message::GroupOrder; @@ -12,7 +12,7 @@ pub struct Subscribe { /// Track properties pub track_alias: u64, // This alias is useless but part of the spec - pub track_namespace: String, + pub track_namespace: Tuple, pub track_name: String, // Subscriber Priority @@ -34,7 +34,7 @@ impl Decode for Subscribe { fn decode(r: &mut R) -> Result { let id = u64::decode(r)?; let track_alias = u64::decode(r)?; - let track_namespace = String::decode(r)?; + let track_namespace = Tuple::decode(r)?; let track_name = String::decode(r)?; let subscriber_priority = u8::decode(r)?; diff --git a/moq-transport/src/message/subscribe_namespace.rs b/moq-transport/src/message/subscribe_namespace.rs index 1203068..68522ce 100644 --- a/moq-transport/src/message/subscribe_namespace.rs +++ b/moq-transport/src/message/subscribe_namespace.rs @@ -1,12 +1,11 @@ -use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params, Tuple}; /// Subscribe Namespace /// https://www.ietf.org/archive/id/draft-ietf-moq-transport-06.html#section-6.11 #[derive(Clone, Debug)] pub struct SubscribeNamespace { /// The track namespace - // TODO: convert this to tuple - pub namespace_prefix: String, + pub namespace_prefix: Tuple, /// Optional parameters pub params: Params, @@ -14,7 +13,7 @@ pub struct SubscribeNamespace { impl Decode for SubscribeNamespace { fn decode(r: &mut R) -> Result { - let namespace_prefix = String::decode(r)?; + let namespace_prefix = Tuple::decode(r)?; let params = Params::decode(r)?; Ok(Self { diff --git a/moq-transport/src/message/subscribe_namespace_error.rs b/moq-transport/src/message/subscribe_namespace_error.rs index 2f6ce98..5a6ffb4 100644 --- a/moq-transport/src/message/subscribe_namespace_error.rs +++ b/moq-transport/src/message/subscribe_namespace_error.rs @@ -1,12 +1,11 @@ -use crate::coding::{Decode, DecodeError, Encode, EncodeError}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, Tuple}; /// Subscribe Namespace Error /// https://www.ietf.org/archive/id/draft-ietf-moq-transport-06.html#name-subscribe_namespace_error #[derive(Clone, Debug)] pub struct SubscribeNamespaceError { // Echo back the namespace that was reset - // TODO: convert this to tuple - pub namespace_prefix: String, + pub namespace_prefix: Tuple, // An error code. pub code: u64, @@ -17,7 +16,7 @@ pub struct SubscribeNamespaceError { impl Decode for SubscribeNamespaceError { fn decode(r: &mut R) -> Result { - let namespace_prefix = String::decode(r)?; + let namespace_prefix = Tuple::decode(r)?; let code = u64::decode(r)?; let reason = String::decode(r)?; diff --git a/moq-transport/src/message/subscribe_namespace_ok.rs b/moq-transport/src/message/subscribe_namespace_ok.rs index 8e946c8..df5b9ae 100644 --- a/moq-transport/src/message/subscribe_namespace_ok.rs +++ b/moq-transport/src/message/subscribe_namespace_ok.rs @@ -1,17 +1,16 @@ -use crate::coding::{Decode, DecodeError, Encode, EncodeError}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, Tuple}; /// Subscribe Namespace Ok /// https://www.ietf.org/archive/id/draft-ietf-moq-transport-06.html#name-subscribe_namespace_ok #[derive(Clone, Debug)] pub struct SubscribeNamespaceOk { // Echo back the namespace that was announced. - // TODO: convert this to tuple - pub namespace_prefix: String, + pub namespace_prefix: Tuple, } impl Decode for SubscribeNamespaceOk { fn decode(r: &mut R) -> Result { - let namespace_prefix = String::decode(r)?; + let namespace_prefix = Tuple::decode(r)?; Ok(Self { namespace_prefix }) } } diff --git a/moq-transport/src/message/subscribe_update.rs b/moq-transport/src/message/subscribe_update.rs index 2547d82..4f67f9e 100644 --- a/moq-transport/src/message/subscribe_update.rs +++ b/moq-transport/src/message/subscribe_update.rs @@ -1,4 +1,4 @@ -use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params, Tuple}; use crate::message::subscribe::{SubscribeLocation, SubscribePair}; use crate::message::FilterType; use crate::message::GroupOrder; @@ -13,7 +13,7 @@ pub struct SubscribeUpdate { /// Track properties pub track_alias: u64, // This alias is useless but part of the spec - pub track_namespace: String, + pub track_namespace: Tuple, pub track_name: String, // Subscriber Priority @@ -35,7 +35,7 @@ impl Decode for SubscribeUpdate { fn decode(r: &mut R) -> Result { let id = u64::decode(r)?; let track_alias = u64::decode(r)?; - let track_namespace = String::decode(r)?; + let track_namespace = Tuple::decode(r)?; let track_name = String::decode(r)?; let subscriber_priority = u8::decode(r)?; diff --git a/moq-transport/src/message/track_status.rs b/moq-transport/src/message/track_status.rs index 16e0dc0..df2458b 100644 --- a/moq-transport/src/message/track_status.rs +++ b/moq-transport/src/message/track_status.rs @@ -1,10 +1,10 @@ use super::TrackStatusCode; -use crate::coding::{Decode, DecodeError, Encode, EncodeError}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, Tuple}; #[derive(Clone, Debug)] pub struct TrackStatus { /// Track Namespace - pub track_namespace: String, + pub track_namespace: Tuple, /// Track Name pub track_name: String, /// Status Code @@ -18,7 +18,7 @@ pub struct TrackStatus { impl Decode for TrackStatus { fn decode(r: &mut R) -> Result { Ok(Self { - track_namespace: String::decode(r)?, + track_namespace: Tuple::decode(r)?, track_name: String::decode(r)?, status_code: TrackStatusCode::decode(r)?, last_group_id: u64::decode(r)?, diff --git a/moq-transport/src/message/track_status_request.rs b/moq-transport/src/message/track_status_request.rs index 8900066..e2c3707 100644 --- a/moq-transport/src/message/track_status_request.rs +++ b/moq-transport/src/message/track_status_request.rs @@ -1,16 +1,16 @@ -use crate::coding::{Decode, DecodeError, Encode, EncodeError}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, Tuple}; #[derive(Clone, Debug)] pub struct TrackStatusRequest { /// Track Namespace - pub track_namespace: String, + pub track_namespace: Tuple, /// Track Name pub track_name: String, } impl Decode for TrackStatusRequest { fn decode(r: &mut R) -> Result { - let track_namespace = String::decode(r)?; + let track_namespace = Tuple::decode(r)?; let track_name = String::decode(r)?; Ok(Self { diff --git a/moq-transport/src/message/unannounce.rs b/moq-transport/src/message/unannounce.rs index e856bf0..4c510e8 100644 --- a/moq-transport/src/message/unannounce.rs +++ b/moq-transport/src/message/unannounce.rs @@ -1,15 +1,15 @@ -use crate::coding::{Decode, DecodeError, Encode, EncodeError}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, Tuple}; /// Sent by the publisher to terminate an Announce. #[derive(Clone, Debug)] pub struct Unannounce { // Echo back the namespace that was reset - pub namespace: String, + pub namespace: Tuple, } impl Decode for Unannounce { fn decode(r: &mut R) -> Result { - let namespace = String::decode(r)?; + let namespace = Tuple::decode(r)?; Ok(Self { namespace }) } diff --git a/moq-transport/src/message/unsubscribe_namespace.rs b/moq-transport/src/message/unsubscribe_namespace.rs index 8ba15fa..2fe9d09 100644 --- a/moq-transport/src/message/unsubscribe_namespace.rs +++ b/moq-transport/src/message/unsubscribe_namespace.rs @@ -1,17 +1,16 @@ -use crate::coding::{Decode, DecodeError, Encode, EncodeError}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, Tuple}; /// Unsubscribe Namespace /// https://www.ietf.org/archive/id/draft-ietf-moq-transport-06.html#name-unsubscribe_namespace #[derive(Clone, Debug)] pub struct UnsubscribeNamespace { // Echo back the namespace that was reset - // TODO: convert this to tuple - pub namespace_prefix: String, + pub namespace_prefix: Tuple, } impl Decode for UnsubscribeNamespace { fn decode(r: &mut R) -> Result { - let namespace_prefix = String::decode(r)?; + let namespace_prefix = Tuple::decode(r)?; Ok(Self { namespace_prefix }) } } diff --git a/moq-transport/src/serve/track.rs b/moq-transport/src/serve/track.rs index 28afc55..fd3ed09 100644 --- a/moq-transport/src/serve/track.rs +++ b/moq-transport/src/serve/track.rs @@ -18,18 +18,19 @@ use super::{ Datagrams, DatagramsReader, DatagramsWriter, ObjectsWriter, ServeError, Stream, StreamReader, StreamWriter, Subgroups, SubgroupsReader, SubgroupsWriter, }; +use crate::coding::Tuple; use paste::paste; use std::{ops::Deref, sync::Arc}; /// Static information about a track. #[derive(Debug, Clone, PartialEq)] pub struct Track { - pub namespace: String, + pub namespace: Tuple, pub name: String, } impl Track { - pub fn new(namespace: String, name: String) -> Self { + pub fn new(namespace: Tuple, name: String) -> Self { Self { namespace, name } } diff --git a/moq-transport/src/serve/tracks.rs b/moq-transport/src/serve/tracks.rs index b2de227..f6120e7 100644 --- a/moq-transport/src/serve/tracks.rs +++ b/moq-transport/src/serve/tracks.rs @@ -13,16 +13,17 @@ use std::{collections::HashMap, ops::Deref, sync::Arc}; use super::{ServeError, Track, TrackReader, TrackWriter}; +use crate::coding::Tuple; use crate::watch::{Queue, State}; /// Static information about a broadcast. #[derive(Debug)] pub struct Tracks { - pub namespace: String, + pub namespace: Tuple, } impl Tracks { - pub fn new(namespace: String) -> Self { + pub fn new(namespace: Tuple) -> Self { Self { namespace } } diff --git a/moq-transport/src/session/announce.rs b/moq-transport/src/session/announce.rs index 706be52..478a266 100644 --- a/moq-transport/src/session/announce.rs +++ b/moq-transport/src/session/announce.rs @@ -1,5 +1,6 @@ use std::{collections::VecDeque, ops}; +use crate::coding::Tuple; use crate::watch::State; use crate::{message, serve::ServeError}; @@ -7,7 +8,7 @@ use super::{Publisher, Subscribed, TrackStatusRequested}; #[derive(Debug, Clone)] pub struct AnnounceInfo { - pub namespace: String, + pub namespace: Tuple, } struct AnnounceState { @@ -45,7 +46,7 @@ pub struct Announce { } impl Announce { - pub(super) fn new(mut publisher: Publisher, namespace: String) -> (Announce, AnnounceRecv) { + pub(super) fn new(mut publisher: Publisher, namespace: Tuple) -> (Announce, AnnounceRecv) { let info = AnnounceInfo { namespace: namespace.clone(), }; @@ -148,7 +149,7 @@ impl Drop for Announce { } self.publisher.send_message(message::Unannounce { - namespace: self.namespace.to_string(), + namespace: self.namespace.clone(), }); } } diff --git a/moq-transport/src/session/announced.rs b/moq-transport/src/session/announced.rs index 78b00a2..17f3b5d 100644 --- a/moq-transport/src/session/announced.rs +++ b/moq-transport/src/session/announced.rs @@ -1,5 +1,6 @@ use std::ops; +use crate::coding::Tuple; use crate::watch::State; use crate::{message, serve::ServeError}; @@ -21,7 +22,7 @@ pub struct Announced { } impl Announced { - pub(super) fn new(session: Subscriber, namespace: String) -> (Announced, AnnouncedRecv) { + pub(super) fn new(session: Subscriber, namespace: Tuple) -> (Announced, AnnouncedRecv) { let info = AnnounceInfo { namespace }; let (send, recv) = State::default().split(); diff --git a/moq-transport/src/session/publisher.rs b/moq-transport/src/session/publisher.rs index f35ba9f..ed58246 100644 --- a/moq-transport/src/session/publisher.rs +++ b/moq-transport/src/session/publisher.rs @@ -6,6 +6,7 @@ use std::{ use futures::{stream::FuturesUnordered, StreamExt}; use crate::{ + coding::Tuple, message::{self, Message}, serve::{ServeError, TracksReader}, setup, @@ -20,7 +21,7 @@ use super::{Announce, AnnounceRecv, Session, SessionError, Subscribed, Subscribe pub struct Publisher { webtransport: web_transport::Session, - announces: Arc>>, + announces: Arc>>, subscribed: Arc>>, unknown: Queue, @@ -264,7 +265,7 @@ impl Publisher { match &msg { message::Publisher::SubscribeDone(msg) => self.drop_subscribe(msg.id), message::Publisher::SubscribeError(msg) => self.drop_subscribe(msg.id), - message::Publisher::Unannounce(msg) => self.drop_announce(msg.namespace.as_str()), + message::Publisher::Unannounce(msg) => self.drop_announce(&msg.namespace), _ => (), }; @@ -275,7 +276,7 @@ impl Publisher { self.subscribed.lock().unwrap().remove(&id); } - fn drop_announce(&mut self, namespace: &str) { + fn drop_announce(&mut self, namespace: &Tuple) { self.announces.lock().unwrap().remove(namespace); } diff --git a/moq-transport/src/session/subscribe.rs b/moq-transport/src/session/subscribe.rs index 1097a39..a5ac476 100644 --- a/moq-transport/src/session/subscribe.rs +++ b/moq-transport/src/session/subscribe.rs @@ -1,6 +1,7 @@ use std::ops; use crate::{ + coding::Tuple, data, message::{self, FilterType, GroupOrder, SubscribeLocation, SubscribePair}, serve::{self, ServeError, TrackWriter, TrackWriterMode}, @@ -12,7 +13,7 @@ use super::Subscriber; #[derive(Debug, Clone)] pub struct SubscribeInfo { - pub namespace: String, + pub namespace: Tuple, pub name: String, } diff --git a/moq-transport/src/session/subscriber.rs b/moq-transport/src/session/subscriber.rs index 76f93ab..8428187 100644 --- a/moq-transport/src/session/subscriber.rs +++ b/moq-transport/src/session/subscriber.rs @@ -5,7 +5,7 @@ use std::{ }; use crate::{ - coding::Decode, + coding::{Decode, Tuple}, data, message::{self, Message}, serve::{self, ServeError}, @@ -19,7 +19,7 @@ use super::{Announced, AnnouncedRecv, Reader, Session, SessionError, Subscribe, // TODO remove Clone. #[derive(Clone)] pub struct Subscriber { - announced: Arc>>, + announced: Arc>>, announced_queue: Queue, subscribes: Arc>>, @@ -102,7 +102,7 @@ impl Subscriber { hash_map::Entry::Vacant(entry) => entry, }; - let (announced, recv) = Announced::new(self.clone(), msg.namespace.to_string()); + let (announced, recv) = Announced::new(self.clone(), msg.namespace.clone()); if let Err(announced) = self.announced_queue.push(announced) { announced.close(ServeError::Cancel)?; return Ok(()); @@ -152,7 +152,7 @@ impl Subscriber { Ok(()) } - fn drop_announce(&mut self, namespace: &str) { + fn drop_announce(&mut self, namespace: &Tuple) { self.announced.lock().unwrap().remove(namespace); } diff --git a/moq-transport/src/session/track_status_requested.rs b/moq-transport/src/session/track_status_requested.rs index 75fb1ba..16decfe 100644 --- a/moq-transport/src/session/track_status_requested.rs +++ b/moq-transport/src/session/track_status_requested.rs @@ -1,9 +1,10 @@ use super::{Publisher, SessionError}; +use crate::coding::Tuple; use crate::message; #[derive(Debug, Clone)] pub struct TrackStatusRequestedInfo { - pub namespace: String, + pub namespace: Tuple, pub track: String, } From 9fef17c9b4c588493f8f89e1cbf8a6ca1fca51dc Mon Sep 17 00:00:00 2001 From: Zafer Gurel Date: Sun, 3 Nov 2024 20:45:11 +0300 Subject: [PATCH 19/20] moq-transport: Add length field to client and server setup This commit adds the length field to control messages, client setup and server setup. --- moq-transport/src/setup/client.rs | 19 +++++++++++++++++-- moq-transport/src/setup/server.rs | 21 +++++++++++++++++++-- 2 files changed, 36 insertions(+), 4 deletions(-) diff --git a/moq-transport/src/setup/client.rs b/moq-transport/src/setup/client.rs index 9b414d0..58c1d5e 100644 --- a/moq-transport/src/setup/client.rs +++ b/moq-transport/src/setup/client.rs @@ -24,6 +24,10 @@ impl Decode for Client { return Err(DecodeError::InvalidMessage(typ)); } + let _len = u64::decode(r)?; + + // TODO: Check the length of the message. + let versions = Versions::decode(r)?; let mut params = Params::decode(r)?; @@ -43,12 +47,23 @@ impl Encode for Client { /// Encode a server setup message. fn encode(&self, w: &mut W) -> Result<(), EncodeError> { 0x40_u64.encode(w)?; - self.versions.encode(w)?; + + // Find out the length of the message + // by encoding it into a buffer and then encoding the length. + // This is a bit wasteful, but it's the only way to know the length. + let mut buf = Vec::new(); + + self.versions.encode(&mut buf).unwrap(); let mut params = self.params.clone(); params.set(0, self.role)?; + params.encode(&mut buf).unwrap(); + + (buf.len() as u64).encode(w)?; - params.encode(w)?; + // At least don't encode the message twice. + // Instead, write the buffer directly to the writer. + w.put_slice(&buf); Ok(()) } diff --git a/moq-transport/src/setup/server.rs b/moq-transport/src/setup/server.rs index 113bcdf..aaeb55d 100644 --- a/moq-transport/src/setup/server.rs +++ b/moq-transport/src/setup/server.rs @@ -25,6 +25,10 @@ impl Decode for Server { return Err(DecodeError::InvalidMessage(typ)); } + let _len = u64::decode(r)?; + + // TODO: Check the length of the message. + let version = Version::decode(r)?; let mut params = Params::decode(r)?; @@ -42,11 +46,23 @@ impl Decode for Server { impl Encode for Server { fn encode(&self, w: &mut W) -> Result<(), EncodeError> { 0x41_u64.encode(w)?; - self.version.encode(w)?; + + // Find out the length of the message + // by encoding it into a buffer and then encoding the length. + // This is a bit wasteful, but it's the only way to know the length. + let mut buf = Vec::new(); + + self.version.encode(&mut buf).unwrap(); let mut params = self.params.clone(); params.set(0, self.role)?; - params.encode(w)?; + params.encode(&mut buf).unwrap(); + + (buf.len() as u64).encode(w)?; + + // At least don't encode the message twice. + // Instead, write the buffer directly to the writer. + w.put_slice(&buf); Ok(()) } @@ -58,6 +74,7 @@ mod tests { use crate::setup::Role; use bytes::BytesMut; + // TODO: Update this test to use the draft 06 version #[test] fn encode_decode() { let mut buf = BytesMut::new(); From 41c46ada938e4cb4fd311cef2667bbf133bcf28e Mon Sep 17 00:00:00 2001 From: Zafer Gurel Date: Sun, 3 Nov 2024 21:01:26 +0300 Subject: [PATCH 20/20] moq-transport: Add length of params to SubscribeOk --- moq-transport/src/message/subscribe_ok.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/moq-transport/src/message/subscribe_ok.rs b/moq-transport/src/message/subscribe_ok.rs index 15e07f0..1c4ef17 100644 --- a/moq-transport/src/message/subscribe_ok.rs +++ b/moq-transport/src/message/subscribe_ok.rs @@ -35,6 +35,10 @@ impl Decode for SubscribeOk { _ => return Err(DecodeError::InvalidValue), }; + // Skip the parameters. + // TODO: Implement parameters for SubscribeOk + let _ = u8::decode(r)?; + Ok(Self { id, expires, @@ -64,6 +68,9 @@ impl Encode for SubscribeOk { } } + // Add 0 for the length of the parameters + w.put_u8(0); + Ok(()) } }