Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

moq-transport: change namespace type to Tuple and add new message types (draft-06) #15

Merged
merged 22 commits into from
Nov 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
bf830f7
Merge pull request #11 from englishm/release-plz-2024-10-31T05-40-28Z
englishm Nov 1, 2024
09868aa
moq-transport: Bump target draft version to 06
englishm Oct 29, 2024
804e3c7
rename groups to subgroups
zafergurel Oct 25, 2024
4b87359
fixes to moq-transport, relay compiles
zafergurel Oct 25, 2024
45903f9
moq-pub uses subgroups
zafergurel Oct 25, 2024
73ba1bb
more fixes
zafergurel Oct 28, 2024
2a264cf
remove comment
zafergurel Oct 28, 2024
cd07d6c
moq-sub: s/group/subgroup/g
englishm Oct 29, 2024
b397458
moq-dir: s/group/subgroup/g
englishm Oct 29, 2024
0ea1f5b
moq-clock-ietf: s/group/subgroup/g
englishm Oct 29, 2024
c3d5de3
moq-dir: cargo fmt
englishm Oct 29, 2024
5a3e4ff
moq-clock-ietf: cargo fmt
englishm Oct 29, 2024
a3b7bdd
moq-dir: Fix one place that still uses groups?
englishm Oct 29, 2024
da21231
moq-transport: Remove object/stream (gone in -06)
englishm Nov 1, 2024
11c8e8b
moq-clock-ietf: Remove object/stream (gone in -06)
englishm Nov 1, 2024
c93f094
moq-transport: Add new error type
zafergurel Nov 1, 2024
7981390
moq-transport: first stab at subscribe namespace messages
zafergurel Nov 2, 2024
f0e00e9
moq-transport: Add Tuple type
zafergurel Nov 2, 2024
f0291c3
moq-transport: Change type of namespace to tuple
zafergurel Nov 2, 2024
a04dd6a
Merge remote-tracking branch 'englishm/me/draft-06' into me/draft-06
zafergurel Nov 3, 2024
9fef17c
moq-transport: Add length field to client and server setup
zafergurel Nov 3, 2024
41c46ad
moq-transport: Add length of params to SubscribeOk
zafergurel Nov 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions moq-clock-ietf/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use clap::Parser;
mod clock;

use moq_transport::{
coding::Tuple,
serve,
session::{Publisher, Subscriber},
};
Expand Down Expand Up @@ -64,7 +65,7 @@ async fn main() -> anyhow::Result<()> {
.context("failed to create MoQ Transport session")?;

let (mut writer, _, reader) = serve::Tracks {
namespace: config.namespace.clone(),
namespace: Tuple::from_utf8_path(&config.namespace),
}
.produce();

Expand All @@ -81,7 +82,7 @@ async fn main() -> anyhow::Result<()> {
.await
.context("failed to create MoQ Transport session")?;

let (prod, sub) = serve::Track::new(config.namespace, config.track).produce();
let (prod, sub) = serve::Track::new(Tuple::from_utf8_path(&config.namespace), config.track).produce();

let clock = clock::Subscriber::new(sub);

Expand Down
9 changes: 6 additions & 3 deletions moq-dir/src/listings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{
sync::{Arc, Mutex},
};

use moq_transport::coding::Tuple;
use moq_transport::serve::{ServeError, Tracks, TracksReader, TracksWriter};

use crate::{ListingReader, ListingWriter};
Expand All @@ -20,7 +21,7 @@ pub struct Listings {

impl Listings {
pub fn new(namespace: String) -> Self {
let (writer, _, reader) = Tracks::new(namespace).produce();
let (writer, _, reader) = Tracks::new(Tuple::from_utf8_path(&namespace)).produce();

let state = State {
writer,
Expand All @@ -37,13 +38,15 @@ impl Listings {
pub fn register(&mut self, path: &str) -> Result<Option<Registration>, ServeError> {
let (prefix, base) = Self::prefix(path);

if !prefix.starts_with(&self.reader.namespace) {
let namespace = self.reader.namespace.to_utf8_path();

if !prefix.starts_with(&namespace) {
// Ignore anything that isn't in our namespace.
return Ok(None);
}

// Remove the namespace prefix from the path.
let prefix = &prefix[self.reader.namespace.len()..];
let prefix = &prefix[namespace.len()..];

let mut state = self.state.lock().unwrap();
if let Some(listing) = state.active.get_mut(prefix) {
Expand Down
2 changes: 1 addition & 1 deletion moq-dir/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl Session {
async fn serve_announce(mut self, mut announce: Announced) -> anyhow::Result<()> {
announce.ok()?;

match self.listings.register(&announce.namespace) {
match self.listings.register(&announce.namespace.to_utf8_path()) {
Ok(_) => announce.closed().await?,
Err(err) => {
announce.close(err.clone())?;
Expand Down
4 changes: 2 additions & 2 deletions moq-pub/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use tokio::io::AsyncReadExt;

use moq_native_ietf::quic;
use moq_pub::Media;
use moq_transport::{serve, session::Publisher};
use moq_transport::{coding::Tuple, serve, session::Publisher};

#[derive(Parser, Clone)]
pub struct Cli {
Expand Down Expand Up @@ -51,7 +51,7 @@ async fn main() -> anyhow::Result<()> {

let cli = Cli::parse();

let (writer, _, reader) = serve::Tracks::new(cli.name).produce();
let (writer, _, reader) = serve::Tracks::new(Tuple::from_utf8_path(&cli.name)).produce();
let media = Media::new(writer)?;

let tls = cli.tls.load()?;
Expand Down
2 changes: 1 addition & 1 deletion moq-pub/src/media.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ impl Media {
let mut track = moq_catalog::Track {
init_track: Some(self.init.name.clone()),
name: name.clone(),
namespace: Some(self.broadcast.namespace.clone()),
namespace: Some(self.broadcast.namespace.to_utf8_path()),
packaging: Some(moq_catalog::TrackPackaging::Cmaf),
render_group: Some(1),
..Default::default()
Expand Down
4 changes: 2 additions & 2 deletions moq-relay-ietf/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ impl Consumer {
async fn serve(mut self, mut announce: Announced) -> Result<(), anyhow::Error> {
let mut tasks = FuturesUnordered::new();

let (_, mut request, reader) = Tracks::new(announce.namespace.to_string()).produce();
let (_, mut request, reader) = Tracks::new(announce.namespace.clone()).produce();

if let Some(api) = self.api.as_ref() {
let mut refresh = api.set_origin(reader.namespace.clone()).await?;
let mut refresh = api.set_origin(reader.namespace.to_utf8_path()).await?;
tasks.push(async move { refresh.run().await.context("failed refreshing origin") }.boxed());
}

Expand Down
11 changes: 7 additions & 4 deletions moq-relay-ietf/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ use std::collections::HashMap;

use std::sync::{Arc, Mutex};

use moq_transport::serve::{ServeError, TracksReader};
use moq_transport::{
coding::Tuple,
serve::{ServeError, TracksReader},
};

#[derive(Clone)]
pub struct Locals {
lookup: Arc<Mutex<HashMap<String, TracksReader>>>,
lookup: Arc<Mutex<HashMap<Tuple, TracksReader>>>,
}

impl Default for Locals {
Expand Down Expand Up @@ -38,14 +41,14 @@ impl Locals {
Ok(registration)
}

pub fn route(&self, namespace: &str) -> Option<TracksReader> {
pub fn route(&self, namespace: &Tuple) -> Option<TracksReader> {
self.lookup.lock().unwrap().get(namespace).cloned()
}
}

pub struct Registration {
locals: Locals,
namespace: String,
namespace: Tuple,
}

impl Drop for Registration {
Expand Down
11 changes: 6 additions & 5 deletions moq-relay-ietf/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use futures::stream::FuturesUnordered;
use futures::FutureExt;
use futures::StreamExt;
use moq_native_ietf::quic;
use moq_transport::coding::Tuple;
use moq_transport::serve::{Track, TrackReader, TrackWriter};
use moq_transport::watch::State;
use url::Url;
Expand Down Expand Up @@ -119,9 +120,9 @@ impl RemotesConsumer {
Self { info, state }
}

pub async fn route(&self, namespace: &str) -> anyhow::Result<Option<RemoteConsumer>> {
pub async fn route(&self, namespace: &Tuple) -> anyhow::Result<Option<RemoteConsumer>> {
// Always fetch the origin instead of using the (potentially invalid) cache.
let origin = match self.api.get_origin(namespace).await? {
let origin = match self.api.get_origin(&namespace.to_utf8_path()).await? {
None => return Ok(None),
Some(origin) => origin,
};
Expand Down Expand Up @@ -192,7 +193,7 @@ impl Remote {

#[derive(Default)]
struct RemoteState {
tracks: HashMap<(String, String), RemoteTrackWeak>,
tracks: HashMap<(Tuple, String), RemoteTrackWeak>,
requested: VecDeque<TrackWriter>,
}

Expand Down Expand Up @@ -285,7 +286,7 @@ impl RemoteConsumer {
}

/// Request a track from the broadcast.
pub fn subscribe(&self, namespace: String, name: String) -> anyhow::Result<Option<RemoteTrackReader>> {
pub fn subscribe(&self, namespace: Tuple, name: String) -> anyhow::Result<Option<RemoteTrackReader>> {
let key = (namespace.clone(), name.clone());
let state = self.state.lock();
if let Some(track) = state.tracks.get(&key) {
Expand Down Expand Up @@ -372,7 +373,7 @@ impl RemoteTrackWeak {

struct RemoteTrackDrop {
parent: State<RemoteState>,
key: (String, String),
key: (Tuple, String),
}

impl Drop for RemoteTrackDrop {
Expand Down
4 changes: 2 additions & 2 deletions moq-sub/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use url::Url;

use moq_native_ietf::quic;
use moq_sub::media::Media;
use moq_transport::serve::Tracks;
use moq_transport::{coding::Tuple, serve::Tracks};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
Expand All @@ -31,7 +31,7 @@ async fn main() -> anyhow::Result<()> {
.context("failed to create MoQ Transport session")?;

// Associate empty set of Tracks with provided namespace
let tracks = Tracks::new(config.name);
let tracks = Tracks::new(Tuple::from_utf8_path(&config.name));

let mut media = Media::new(subscriber, tracks, out).await?;

Expand Down
2 changes: 2 additions & 0 deletions moq-transport/src/coding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ mod decode;
mod encode;
mod params;
mod string;
mod tuple;
mod varint;

pub use decode::*;
pub use encode::*;
pub use params::*;
pub use tuple::*;
pub use varint::*;
144 changes: 144 additions & 0 deletions moq-transport/src/coding/tuple.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
//
use super::{Decode, DecodeError, Encode, EncodeError};
use core::hash::{Hash, Hasher};
/// Tuple Field
#[derive(Clone, Debug, Default)]
pub struct TupleField {
pub value: Vec<u8>,
}

impl Eq for TupleField {}

impl PartialEq for TupleField {
fn eq(&self, other: &Self) -> bool {
self.value.eq(&other.value)
}
}

impl Hash for TupleField {
fn hash<H: Hasher>(&self, state: &mut H) {
self.value.hash(state);
}
}

impl Decode for TupleField {
fn decode<R: bytes::Buf>(r: &mut R) -> Result<Self, DecodeError> {
let size = usize::decode(r)?;
Self::decode_remaining(r, size)?;
let mut buf = vec![0; size];
r.copy_to_slice(&mut buf);
Ok(Self { value: buf })
}
}

impl Encode for TupleField {
fn encode<W: bytes::BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
self.value.len().encode(w)?;
Self::encode_remaining(w, self.value.len())?;
w.put_slice(&self.value);
Ok(())
}
}

impl TupleField {
pub fn new() -> Self {
Self::default()
}

pub fn from_utf8(path: &str) -> Self {
let mut field = TupleField::new();
field.value = path.as_bytes().to_vec();
field
}

pub fn set<P: Encode>(&mut self, p: P) -> Result<(), EncodeError> {
let mut value = Vec::new();
p.encode(&mut value)?;
self.value = value;
Ok(())
}

pub fn get<P: Decode>(&self) -> Result<P, DecodeError> {
P::decode(&mut bytes::Bytes::from(self.value.clone()))
}
}

/// Tuple
#[derive(Clone, Debug, Default)]
pub struct Tuple {
pub fields: Vec<TupleField>,
}

impl Hash for Tuple {
fn hash<H: Hasher>(&self, state: &mut H) {
self.fields.hash(state);
}
}

impl Eq for Tuple {}

impl PartialEq for Tuple {
fn eq(&self, other: &Self) -> bool {
self.fields.eq(&other.fields)
}
}

impl Decode for Tuple {
fn decode<R: bytes::Buf>(r: &mut R) -> Result<Self, DecodeError> {
let count = u64::decode(r)? as usize;
let mut fields = Vec::new();
for _ in 0..count {
fields.push(TupleField::decode(r)?);
}
Ok(Self { fields })
}
}

impl Encode for Tuple {
fn encode<W: bytes::BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
self.fields.len().encode(w)?;
for field in &self.fields {
field.encode(w)?;
}
Ok(())
}
}

impl Tuple {
pub fn new() -> Self {
Self::default()
}

pub fn add(&mut self, field: TupleField) {
self.fields.push(field);
}

pub fn get(&self, index: usize) -> Result<TupleField, DecodeError> {
self.fields[index].get()
}

pub fn set(&mut self, index: usize, f: TupleField) -> Result<(), EncodeError> {
self.fields[index].set(f)
}

pub fn clear(&mut self) {
self.fields.clear();
}

pub fn from_utf8_path(path: &str) -> Self {
let mut tuple = Tuple::new();
for part in path.split('/') {
tuple.add(TupleField::from_utf8(part));
}
tuple
}

pub fn to_utf8_path(&self) -> String {
let mut path = String::new();
for field in &self.fields {
path.push('/');
path.push_str(&String::from_utf8_lossy(&field.value));
}
path
}
}
7 changes: 7 additions & 0 deletions moq-transport/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,15 @@ pub enum SessionError {
#[error("parameter length mismatch")]
ParameterLengthMismatch,
#[error("too many subscribes")]
TooManySubscribes,
#[error("goaway timeout")]
GoawayTimeout,
#[error("unknown error: code={0}")]
Unknown(u64),
// Unofficial error codes
Expand All @@ -40,6 +46,7 @@ impl MoqError for SessionError {
Self::ProtocolViolation => 0x3,
Self::DuplicateTrackAlias => 0x4,
Self::ParameterLengthMismatch => 0x5,
Self::TooManySubscribes => 0x6,
Self::GoawayTimeout => 0x10,
Self::Unknown(code) => *code,
// Unofficial error codes
Expand Down
Loading