From 6afc4a36b16c50f0c69eed2e7c090f9bc4a7dfe1 Mon Sep 17 00:00:00 2001 From: Wojciech Kozyra Date: Tue, 3 Dec 2024 16:03:58 +0100 Subject: [PATCH] wip --- compositor_pipeline/src/pipeline/input.rs | 13 +- .../src/pipeline/input/decklink.rs | 2 +- compositor_pipeline/src/pipeline/input/mp4.rs | 378 +++++++++++++---- .../src/pipeline/input/mp4/mp4_file_reader.rs | 387 ------------------ .../src/pipeline/input/mp4/reader.rs | 245 +++++++++++ compositor_pipeline/src/pipeline/input/rtp.rs | 2 +- src/routes/register_request.rs | 19 +- src/state.rs | 8 +- 8 files changed, 587 insertions(+), 467 deletions(-) delete mode 100644 compositor_pipeline/src/pipeline/input/mp4/mp4_file_reader.rs create mode 100644 compositor_pipeline/src/pipeline/input/mp4/reader.rs diff --git a/compositor_pipeline/src/pipeline/input.rs b/compositor_pipeline/src/pipeline/input.rs index baba3159a..7705be939 100644 --- a/compositor_pipeline/src/pipeline/input.rs +++ b/compositor_pipeline/src/pipeline/input.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use crate::{ error::{InputInitError, RegisterInputError}, queue::PipelineEvent, @@ -45,8 +47,15 @@ pub struct RawDataInputOptions { pub audio: bool, } -pub struct InputInitInfo { - pub port: Option, +pub enum InputInitInfo { + Rtp { + port: Option, + }, + Mp4 { + video_duration: Option, + audio_duration: Option, + }, + Other, } struct InputInitResult { diff --git a/compositor_pipeline/src/pipeline/input/decklink.rs b/compositor_pipeline/src/pipeline/input/decklink.rs index 92538b417..f72ed60a4 100644 --- a/compositor_pipeline/src/pipeline/input/decklink.rs +++ b/compositor_pipeline/src/pipeline/input/decklink.rs @@ -97,7 +97,7 @@ impl DeckLink { sample_receiver: rec, sample_rate: AUDIO_SAMPLE_RATE, }), - init_info: InputInitInfo { port: None }, + init_info: InputInitInfo::Other, }) } } diff --git a/compositor_pipeline/src/pipeline/input/mp4.rs b/compositor_pipeline/src/pipeline/input/mp4.rs index 1f1306f9a..834036a69 100644 --- a/compositor_pipeline/src/pipeline/input/mp4.rs +++ b/compositor_pipeline/src/pipeline/input/mp4.rs @@ -1,23 +1,30 @@ -use std::path::{Path, PathBuf}; +use std::{ + fs::File, + path::{Path, PathBuf}, + sync::{ + atomic::{AtomicBool, AtomicU64, Ordering}, + Arc, + }, + thread::JoinHandle, + time::Duration, +}; -use bytes::Bytes; use compositor_render::InputId; -use crossbeam_channel::Receiver; -use tracing::error; +use crossbeam_channel::{bounded, Sender}; +use reader::{DecoderOptions, Mp4FileReader, Track}; +use tracing::{debug, error, span, trace, Level, Span}; use crate::{ pipeline::{ - decoder::{AudioDecoderOptions, VideoDecoderOptions}, - VideoDecoder, + decoder::{AacDecoderOptions, AudioDecoderOptions, VideoDecoderOptions}, + EncodedChunk, VideoDecoder, }, queue::PipelineEvent, }; -use mp4_file_reader::Mp4FileReader; - use super::{AudioInputReceiver, Input, InputInitInfo, InputInitResult, VideoInputReceiver}; -pub mod mp4_file_reader; +pub mod reader; #[derive(Debug, Clone)] pub struct Mp4Options { @@ -26,19 +33,6 @@ pub struct Mp4Options { pub video_decoder: VideoDecoder, } -pub(crate) enum Mp4ReaderOptions { - NonFragmented { - file: PathBuf, - should_loop: bool, - }, - #[allow(dead_code)] - Fragmented { - header: Bytes, - fragment_receiver: Receiver>, - should_loop: bool, - }, -} - #[derive(Debug, Clone)] pub enum Source { Url(String), @@ -58,14 +52,18 @@ pub enum Mp4Error { #[error("No suitable track in the mp4 file")] NoTrack, + + #[error("Unknown error: {0}")] + Unknown(&'static str), } pub struct Mp4 { - pub input_id: InputId, - 
_video_thread: Option>, - _audio_thread: Option>, - source: Source, - path_to_file: PathBuf, + should_close: Arc, +} + +enum TrackType { + Video, + Audio, } impl Mp4 { @@ -74,7 +72,7 @@ impl Mp4 { options: Mp4Options, download_dir: &Path, ) -> Result { - let input_path = match options.source { + let source = match options.source { Source::Url(ref url) => { let file_response = reqwest::blocking::get(url)?; let mut file_response = file_response.error_for_status()?; @@ -89,72 +87,312 @@ impl Mp4 { std::io::copy(&mut file_response, &mut file)?; - path + Arc::new(SourceFile { + path, + remove_on_drop: true, + }) } - Source::File(ref path) => path.clone(), + Source::File(ref path) => Arc::new(SourceFile { + path: path.clone(), + remove_on_drop: false, + }), }; - let video = Mp4FileReader::new_video( - Mp4ReaderOptions::NonFragmented { - file: input_path.clone(), - should_loop: options.should_loop, - }, - input_id.clone(), - options.video_decoder, - )?; + let video = Mp4FileReader::from_path(&source.path)?.find_h264_track(); + let video_duration = video.as_ref().and_then(|track| track.duration()); + let audio = Mp4FileReader::from_path(&source.path)?.find_aac_track(); + let audio_duration = audio.as_ref().and_then(|track| track.duration()); - let (video_reader, video_receiver) = match video { - Some((reader, receiver)) => { - let input_receiver = VideoInputReceiver::Encoded { + if video.is_none() && audio.is_none() { + return Err(Mp4Error::NoTrack); + } + + let (video_sender, video_receiver, video_track) = match video { + Some(track) => { + let (sender, receiver) = crossbeam_channel::bounded(10); + let receiver = VideoInputReceiver::Encoded { chunk_receiver: receiver, - decoder_options: VideoDecoderOptions { - decoder: options.video_decoder, + decoder_options: match track.decoder_options() { + DecoderOptions::H264 => VideoDecoderOptions { + decoder: options.video_decoder, + }, + _ => return Err(Mp4Error::Unknown("Non H264 decoder options returned.")), }, }; - (Some(reader), Some(input_receiver)) + (Some(sender), Some(receiver), Some(track)) } - None => (None, None), + None => (None, None, None), }; - let audio = Mp4FileReader::new_audio( - Mp4ReaderOptions::NonFragmented { - file: input_path.clone(), - should_loop: options.should_loop, - }, - input_id.clone(), - )?; - - let (audio_reader, audio_receiver) = match audio { - Some((reader, receiver)) => { - let input_receiver = AudioInputReceiver::Encoded { - decoder_options: reader.decoder_options(), + let (audio_sender, audio_receiver, audio_track) = match audio { + Some(track) => { + let (sender, receiver) = crossbeam_channel::bounded(10); + let receiver = AudioInputReceiver::Encoded { chunk_receiver: receiver, + decoder_options: match track.decoder_options() { + DecoderOptions::Aac(data) => AudioDecoderOptions::Aac(AacDecoderOptions { + depayloader_mode: None, + asc: Some(data.clone()), + }), + _ => return Err(Mp4Error::Unknown("Non AAC decoder options returned.")), + }, }; - (Some(reader), Some(input_receiver)) + (Some(sender), Some(receiver), Some(track)) } - None => (None, None), + None => (None, None, None), }; + let video_span = span!(Level::INFO, "MP4 video", input_id = input_id.to_string()); + let audio_span = span!(Level::INFO, "MP4 audio", input_id = input_id.to_string()); + let should_close = Arc::new(AtomicBool::new(false)); + if options.should_loop { + start_thread_with_loop( + video_sender, + video_track, + video_span, + audio_sender, + audio_track, + audio_span, + should_close.clone(), + source, + ); + } else { + 
start_thread_single_run( + video_sender, + video_track, + video_span, + audio_sender, + audio_track, + audio_span, + should_close.clone(), + source, + ); + } + Ok(InputInitResult { - input: Input::Mp4(Self { - input_id: input_id.clone(), - _video_thread: video_reader, - _audio_thread: audio_reader, - source: options.source, - path_to_file: input_path, - }), + input: Input::Mp4(Self { should_close }), video: video_receiver, audio: audio_receiver, - init_info: InputInitInfo { port: None }, + init_info: InputInitInfo::Mp4 { + video_duration, + audio_duration, + }, }) } } +#[allow(clippy::too_many_arguments)] +fn start_thread_with_loop( + video_sender: Option>>, + video_track: Option>, + video_span: Span, + audio_sender: Option>>, + audio_track: Option>, + audio_span: Span, + should_close_input: Arc, + source_file: Arc, +) { + std::thread::Builder::new() + .name("mp4 reader".to_string()) + .spawn(move || { + enum TrackProvider { + Value(Track), + Handle(JoinHandle>), + } + let _source_file = source_file; + let mut offset = Duration::ZERO; + let has_audio = audio_track.is_some(); + let last_audio_sample_pts = Arc::new(AtomicU64::new(0)); + let last_video_sample_pts = Arc::new(AtomicU64::new(0)); + let mut video_track = video_track.map(TrackProvider::Value); + let mut audio_track = audio_track.map(TrackProvider::Value); + + loop { + let (finished_track_sender, finished_track_receiver) = bounded(1); + let should_close = Arc::new(AtomicBool::new(false)); + let video_thread = video_sender + .clone() + .and_then(|sender| video_track.take().map(|track| (track, sender))) + .map(|(track, sender)| { + let span = video_span.clone(); + let finished_track_sender = finished_track_sender.clone(); + let last_sample_pts = last_video_sample_pts.clone(); + let should_close = should_close.clone(); + let should_close_input = should_close_input.clone(); + std::thread::Builder::new() + .name("mp4 reader - video".to_string()) + .spawn(move || { + let _span = span.enter(); + let mut track = match track { + TrackProvider::Value(track) => track, + TrackProvider::Handle(handle) => handle.join().unwrap(), + }; + for (mut chunk, duration) in track.chunks() { + chunk.pts += offset; + chunk.dts = chunk.dts.map(|dts| dts + offset); + last_sample_pts.fetch_max( + (chunk.pts + duration).as_nanos() as u64, + Ordering::Relaxed, + ); + trace!(pts=?chunk.pts, "MP4 reader produced a video chunk."); + if sender.send(PipelineEvent::Data(chunk)).is_err() { + debug!("Failed to send a video chunk. 
Channel closed.") + } + if should_close.load(Ordering::Relaxed) + || should_close_input.load(Ordering::Relaxed) + { + break; + } + // TODO: send flush + } + let _ = finished_track_sender.send(TrackType::Video); + track + }) + .unwrap() + }); + + let audio_thread = audio_sender + .clone() + .and_then(|sender| audio_track.take().map(|track| (track, sender))) + .map(|(track, sender)| { + let span = audio_span.clone(); + let finished_track_sender = finished_track_sender.clone(); + let last_sample_pts = last_audio_sample_pts.clone(); + let should_close = should_close.clone(); + let should_close_input = should_close_input.clone(); + std::thread::Builder::new() + .name("mp4 reader - audio".to_string()) + .spawn(move || { + let _span = span.enter(); + let mut track = match track { + TrackProvider::Value(track) => track, + TrackProvider::Handle(handle) => handle.join().unwrap(), + }; + for (mut chunk, duration) in track.chunks() { + chunk.pts += offset; + chunk.dts = chunk.dts.map(|dts| dts + offset); + last_sample_pts.fetch_max( + (chunk.pts + duration).as_nanos() as u64, + Ordering::Relaxed, + ); + trace!(pts=?chunk.pts, "MP4 reader produced an audio chunk."); + if sender.send(PipelineEvent::Data(chunk)).is_err() { + debug!("Failed to send a audio chunk. Channel closed.") + } + if should_close.load(Ordering::Relaxed) + || should_close_input.load(Ordering::Relaxed) + { + break; + } + // TODO: send flush + } + let _ = finished_track_sender.send(TrackType::Audio); + track + }) + .unwrap() + }); + + match finished_track_receiver.recv().unwrap() { + TrackType::Video => { + video_track = + Some(TrackProvider::Value(video_thread.unwrap().join().unwrap())); + should_close.store(true, Ordering::Relaxed); + if let Some(audio_thread) = audio_thread { + audio_track = Some(TrackProvider::Handle(audio_thread)); + } + } + TrackType::Audio => { + audio_track = + Some(TrackProvider::Value(audio_thread.unwrap().join().unwrap())); + should_close.store(true, Ordering::Relaxed); + if let Some(video_thread) = video_thread { + video_track = Some(TrackProvider::Handle(video_thread)); + } + } + } + if has_audio { + offset = Duration::from_nanos(last_audio_sample_pts.load(Ordering::Relaxed)); + } else { + offset = Duration::from_nanos(last_video_sample_pts.load(Ordering::Relaxed)); + } + if should_close_input.load(Ordering::Relaxed) { + return; + } + } + }) + .unwrap(); +} + +#[allow(clippy::too_many_arguments)] +fn start_thread_single_run( + video_sender: Option>>, + video_track: Option>, + video_span: Span, + audio_sender: Option>>, + audio_track: Option>, + audio_span: Span, + should_close: Arc, + _source_file: Arc, +) { + if let (Some(sender), Some(mut track)) = (video_sender, video_track) { + let should_close = should_close.clone(); + std::thread::Builder::new() + .name("mp4 reader - video".to_string()) + .spawn(move || { + let _span = video_span.enter(); + for (chunk, _duration) in track.chunks() { + if sender.send(PipelineEvent::Data(chunk)).is_err() { + debug!("Failed to send a video chunk. Channel closed.") + } + if should_close.load(Ordering::Relaxed) { + break; + } + } + if sender.send(PipelineEvent::EOS).is_err() { + debug!("Failed to send EOS from MP4 video reader. 
Channel closed."); + } + }) + .unwrap(); + } + + if let (Some(sender), Some(mut track)) = (audio_sender, audio_track) { + let should_close = should_close.clone(); + std::thread::Builder::new() + .name("mp4 reader - audio".to_string()) + .spawn(move || { + let _span = audio_span.enter(); + for (chunk, _duration) in track.chunks() { + if sender.send(PipelineEvent::Data(chunk)).is_err() { + debug!("Failed to send a audio chunk. Channel closed.") + } + if should_close.load(Ordering::Relaxed) { + break; + } + } + if sender.send(PipelineEvent::EOS).is_err() { + debug!("Failed to send EOS from MP4 audio reader. Channel closed."); + } + }) + .unwrap(); + }; +} + impl Drop for Mp4 { fn drop(&mut self) { - if let Source::Url(_) = self.source { - if let Err(e) = std::fs::remove_file(&self.path_to_file) { - error!(input_id=?self.input_id.0, "Error while removing the downloaded mp4 file: {e}"); + self.should_close.store(true, Ordering::Relaxed); + } +} + +struct SourceFile { + pub path: PathBuf, + remove_on_drop: bool, +} + +impl Drop for SourceFile { + fn drop(&mut self) { + if self.remove_on_drop { + if let Err(e) = std::fs::remove_file(&self.path) { + error!("Error while removing the downloaded mp4 file: {e}"); } } } diff --git a/compositor_pipeline/src/pipeline/input/mp4/mp4_file_reader.rs b/compositor_pipeline/src/pipeline/input/mp4/mp4_file_reader.rs deleted file mode 100644 index fedd15b2e..000000000 --- a/compositor_pipeline/src/pipeline/input/mp4/mp4_file_reader.rs +++ /dev/null @@ -1,387 +0,0 @@ -use std::{ - io::{Read, Seek}, - os::unix::fs::MetadataExt, - sync::{atomic::AtomicBool, Arc}, - time::Duration, -}; - -use bytes::{Buf, Bytes, BytesMut}; -use compositor_render::InputId; -use crossbeam_channel::{Receiver, Sender}; -use mp4::Mp4Reader; -use tracing::{debug, span, trace, warn, Level, Span}; - -use crate::{ - pipeline::{ - decoder::{AacDecoderOptions, AudioDecoderOptions, VideoDecoderOptions}, - types::{EncodedChunk, EncodedChunkKind}, - AudioCodec, VideoCodec, VideoDecoder, - }, - queue::PipelineEvent, -}; - -use super::{Mp4Error, Mp4ReaderOptions}; - -type ChunkReceiver = Receiver>; - -pub(crate) struct Mp4FileReader { - stop_thread: Arc, - fragment_sender: Option>>, - decoder_options: DecoderOptions, -} - -struct TrackInfo Bytes> { - sample_count: u32, - timescale: u32, - track_id: u32, - decoder_options: DecoderOptions, - sample_unpacker: SampleUnpacker, - chunk_kind: EncodedChunkKind, -} - -impl Mp4FileReader { - pub(crate) fn new_audio( - options: Mp4ReaderOptions, - input_id: InputId, - ) -> Result, Mp4Error> { - let stop_thread = Arc::new(AtomicBool::new(false)); - let span = span!(Level::INFO, "MP4 audio", input_id = input_id.to_string()); - - match options { - Mp4ReaderOptions::NonFragmented { file, should_loop } => { - let input_file = std::fs::File::open(file)?; - let size = input_file.metadata()?.size(); - Self::new( - input_file, - size, - Self::find_aac_info, - None, - stop_thread, - span, - should_loop, - ) - } - Mp4ReaderOptions::Fragmented { - header, - fragment_receiver, - should_loop, - } => { - let size = header.len() as u64; - let reader = std::io::Cursor::new(header); - Self::new( - reader, - size, - Self::find_aac_info, - Some(fragment_receiver), - stop_thread, - span, - should_loop, - ) - } - } - } - - fn find_aac_info( - reader: &mp4::Mp4Reader, - ) -> Option Bytes>> { - let (&track_id, track, aac) = reader.tracks().iter().find_map(|(id, track)| { - let track_type = track.track_type().ok()?; - let media_type = track.media_type().ok()?; - let aac = 
track.trak.mdia.minf.stbl.stsd.mp4a.as_ref(); - - if track_type != mp4::TrackType::Audio - || media_type != mp4::MediaType::AAC - || aac.is_none() - { - return None; - } - - aac.map(|aac| (id, track, aac)) - })?; - - let asc = aac - .esds - .as_ref() - .and_then(|esds| esds.es_desc.dec_config.dec_specific.full_config.clone()) - .map(Bytes::from); - - let decoder_options = AudioDecoderOptions::Aac(AacDecoderOptions { - asc, - depayloader_mode: None, - }); - - Some(TrackInfo { - sample_count: track.sample_count(), - timescale: track.timescale(), - track_id, - decoder_options, - sample_unpacker: |sample| sample.bytes, - chunk_kind: EncodedChunkKind::Audio(AudioCodec::Aac), - }) - } -} - -impl Mp4FileReader { - pub(crate) fn new_video( - options: Mp4ReaderOptions, - input_id: InputId, - video_decoder: VideoDecoder, - ) -> Result, ChunkReceiver)>, Mp4Error> { - let stop_thread = Arc::new(AtomicBool::new(false)); - let span = span!(Level::INFO, "MP4 video", input_id = input_id.to_string()); - - match options { - Mp4ReaderOptions::NonFragmented { file, should_loop } => { - let input_file = std::fs::File::open(file)?; - let size = input_file.metadata()?.size(); - Self::new( - input_file, - size, - |r| Self::find_h264_info(r, video_decoder), - None, - stop_thread, - span, - should_loop, - ) - } - Mp4ReaderOptions::Fragmented { - header, - fragment_receiver, - should_loop, - } => { - let size = header.len() as u64; - let reader = std::io::Cursor::new(header); - Self::new( - reader, - size, - |r| Self::find_h264_info(r, video_decoder), - Some(fragment_receiver), - stop_thread, - span, - should_loop, - ) - } - } - } - - fn find_h264_info( - reader: &mp4::Mp4Reader, - video_decoder: VideoDecoder, - ) -> Option Bytes>> { - let (&track_id, track, avc) = reader.tracks().iter().find_map(|(id, track)| { - let track_type = track.track_type().ok()?; - let media_type = track.media_type().ok()?; - let avc = track.avc1_or_3_inner(); - - if track_type != mp4::TrackType::Video - || media_type != mp4::MediaType::H264 - || avc.is_none() - { - return None; - } - - avc.map(|avc| (id, track, avc)) - })?; - - // sps and pps have to be extracted from the container, interleaved with [0, 0, 0, 1], - // concatenated and prepended to the first frame. - let sps = avc - .avcc - .sequence_parameter_sets - .iter() - .flat_map(|s| [0, 0, 0, 1].iter().chain(s.bytes.iter())); - - let pps = avc - .avcc - .picture_parameter_sets - .iter() - .flat_map(|s| [0, 0, 0, 1].iter().chain(s.bytes.iter())); - - let mut sps_and_pps_payload = Some(sps.chain(pps).copied().collect::()); - - let length_size = avc.avcc.length_size_minus_one + 1; - - let sample_unpacker = move |sample: mp4::Mp4Sample| { - let mut sample_data = sample.bytes.reader(); - let mut data: BytesMut = Default::default(); - - if let Some(first_nal) = sps_and_pps_payload.take() { - data.extend_from_slice(&first_nal); - } - - // the mp4 sample contains one h264 access unit (possibly more than one NAL). - // the NALs are stored as: . - // we need to convert this into Annex B, in which NALs are separated by - // [0, 0, 0, 1]. `length_size` is at most 4 bytes long. 
- loop { - let mut len = [0u8; 4]; - - if sample_data - .read_exact(&mut len[4 - length_size as usize..]) - .is_err() - { - break; - } - - let len = u32::from_be_bytes(len); - - let mut nalu = bytes::BytesMut::zeroed(len as usize); - sample_data.read_exact(&mut nalu).unwrap(); - - data.extend_from_slice(&[0, 0, 0, 1]); - data.extend_from_slice(&nalu); - } - - data.freeze() - }; - - let decoder_options = VideoDecoderOptions { - decoder: video_decoder, - }; - - Some(TrackInfo { - sample_count: track.sample_count(), - timescale: track.timescale(), - decoder_options, - track_id, - sample_unpacker, - chunk_kind: EncodedChunkKind::Video(VideoCodec::H264), - }) - } - - #[allow(dead_code)] - pub(crate) fn fragment_sender(&self) -> Option>> { - self.fragment_sender.clone() - } -} - -impl Mp4FileReader { - fn new< - Reader: Read + Seek + Send + 'static, - SampleUnpacker: FnMut(mp4::Mp4Sample) -> Bytes + Send + 'static, - >( - reader: Reader, - size: u64, - track_info_reader: impl Fn( - &mp4::Mp4Reader, - ) -> Option>, - fragment_receiver: Option>>, - stop_thread: Arc, - span: Span, - should_loop: bool, - ) -> Result, Mp4Error> { - let reader = mp4::Mp4Reader::read_header(reader, size)?; - - let Some(track_info) = track_info_reader(&reader) else { - return Ok(None); - }; - - let (sender, receiver) = crossbeam_channel::bounded(10); - - let stop_thread_clone = stop_thread.clone(); - let decoder_options = track_info.decoder_options.clone(); - std::thread::Builder::new() - .name("mp4 reader".to_string()) - .spawn(move || { - let _guard = span.enter(); - run_reader_thread( - reader, - sender, - stop_thread_clone, - fragment_receiver, - track_info, - should_loop, - ); - debug!("Closing MP4 reader thread"); - }) - .unwrap(); - - Ok(Some(( - Mp4FileReader { - stop_thread, - fragment_sender: None, - decoder_options, - }, - receiver, - ))) - } - - pub(crate) fn decoder_options(&self) -> DecoderOptions { - self.decoder_options.clone() - } -} - -impl Drop for Mp4FileReader { - fn drop(&mut self) { - self.stop_thread - .store(true, std::sync::atomic::Ordering::Relaxed) - } -} - -fn run_reader_thread( - mut reader: Mp4Reader, - sender: Sender>, - stop_thread: Arc, - _fragment_receiver: Option>>, - track_info: TrackInfo Bytes>, - should_loop: bool, -) { - let mut sample_unpacker = track_info.sample_unpacker; - let mut loop_offset = Duration::ZERO; - - loop { - let mut last_end_pts = Duration::ZERO; - for i in 1..track_info.sample_count { - if stop_thread.load(std::sync::atomic::Ordering::Relaxed) { - return; - } - - match reader.read_sample(track_info.track_id, i) { - Ok(Some(sample)) => { - let rendering_offset = sample.rendering_offset; - let start_time = sample.start_time; - let sample_duration = Duration::from_secs_f64( - sample.duration as f64 / track_info.timescale as f64, - ); - - let dts = - Duration::from_secs_f64(start_time as f64 / track_info.timescale as f64) - + loop_offset; - let pts = Duration::from_secs_f64( - (start_time as f64 + rendering_offset as f64) / track_info.timescale as f64, - ) + loop_offset; - last_end_pts = pts + sample_duration; - - let data = sample_unpacker(sample); - - let chunk = EncodedChunk { - data, - pts, - dts: Some(dts), - kind: track_info.chunk_kind, - }; - - trace!(pts=?chunk.pts, "MP4 reader produced a chunk."); - match sender.send(PipelineEvent::Data(chunk)) { - Ok(_) => {} - Err(_) => { - debug!("Failed to send MP4 chunk. 
Channel closed."); - return; - } - } - } - Err(e) => { - warn!("Error while reading MP4 video sample: {:?}", e); - } - _ => {} - } - } - loop_offset = last_end_pts; - if !should_loop { - break; - } - } - if let Err(_err) = sender.send(PipelineEvent::EOS) { - debug!("Failed to send EOS from MP4 video reader. Channel closed."); - } -} diff --git a/compositor_pipeline/src/pipeline/input/mp4/reader.rs b/compositor_pipeline/src/pipeline/input/mp4/reader.rs new file mode 100644 index 000000000..06185d920 --- /dev/null +++ b/compositor_pipeline/src/pipeline/input/mp4/reader.rs @@ -0,0 +1,245 @@ +use std::{ + fs::File, + io::{Read, Seek}, + os::unix::fs::MetadataExt, + path::Path, + time::Duration, +}; + +use bytes::{Buf, Bytes, BytesMut}; +use mp4::Mp4Sample; +use tracing::warn; + +use crate::pipeline::{ + types::{EncodedChunk, EncodedChunkKind}, + AudioCodec, VideoCodec, +}; + +use super::Mp4Error; + +pub(super) struct Mp4FileReader { + reader: mp4::Mp4Reader, +} + +#[derive(Debug, Clone)] +pub(super) enum DecoderOptions { + H264, + Aac(Bytes), +} + +impl Mp4FileReader { + pub fn from_path(path: &Path) -> Result { + let file = std::fs::File::open(path)?; + let size = file.metadata()?.size(); + Self::new(file, size) + } +} + +impl Mp4FileReader { + fn new(reader: Reader, size: u64) -> Result { + let reader = mp4::Mp4Reader::read_header(reader, size)?; + + Ok(Mp4FileReader { reader }) + } + + pub fn find_aac_track(self) -> Option> { + let (&track_id, track, aac) = self.reader.tracks().iter().find_map(|(id, track)| { + let track_type = track.track_type().ok()?; + let media_type = track.media_type().ok()?; + let aac = track.trak.mdia.minf.stbl.stsd.mp4a.as_ref(); + + if track_type != mp4::TrackType::Audio + || media_type != mp4::MediaType::AAC + || aac.is_none() + { + return None; + } + + aac.map(|aac| (id, track, aac)) + })?; + + let asc = aac + .esds + .as_ref() + .and_then(|esds| esds.es_desc.dec_config.dec_specific.full_config.clone()) + .map(Bytes::from); + let Some(asc) = asc else { + warn!("Decoder options for AAC track were not found."); + return None; + }; + + Some(Track { + sample_count: track.sample_count(), + timescale: track.timescale(), + track_id, + sample_unpacker: Box::new(|sample| sample.bytes), + duration: track.duration(), + decoder_options: DecoderOptions::Aac(asc), + reader: self.reader, + }) + } + + pub fn find_h264_track(self) -> Option> { + let (&track_id, track, avc) = self.reader.tracks().iter().find_map(|(id, track)| { + let track_type = track.track_type().ok()?; + let media_type = track.media_type().ok()?; + let avc = track.avc1_or_3_inner(); + + if track_type != mp4::TrackType::Video + || media_type != mp4::MediaType::H264 + || avc.is_none() + { + return None; + } + + avc.map(|avc| (id, track, avc)) + })?; + + // sps and pps have to be extracted from the container, interleaved with [0, 0, 0, 1], + // concatenated and prepended to the first frame. 
+ let sps = avc + .avcc + .sequence_parameter_sets + .iter() + .flat_map(|s| [0, 0, 0, 1].iter().chain(s.bytes.iter())); + + let pps = avc + .avcc + .picture_parameter_sets + .iter() + .flat_map(|s| [0, 0, 0, 1].iter().chain(s.bytes.iter())); + + let mut sps_and_pps_payload = Some(sps.chain(pps).copied().collect::()); + + let length_size = avc.avcc.length_size_minus_one + 1; + + let sample_unpacker = move |sample: mp4::Mp4Sample| { + let mut sample_data = sample.bytes.reader(); + let mut data: BytesMut = Default::default(); + + if let Some(first_nal) = sps_and_pps_payload.take() { + data.extend_from_slice(&first_nal); + } + + // the mp4 sample contains one h264 access unit (possibly more than one NAL). + // the NALs are stored as: . + // we need to convert this into Annex B, in which NALs are separated by + // [0, 0, 0, 1]. `length_size` is at most 4 bytes long. + loop { + let mut len = [0u8; 4]; + + if sample_data + .read_exact(&mut len[4 - length_size as usize..]) + .is_err() + { + break; + } + + let len = u32::from_be_bytes(len); + + let mut nalu = bytes::BytesMut::zeroed(len as usize); + sample_data.read_exact(&mut nalu).unwrap(); + + data.extend_from_slice(&[0, 0, 0, 1]); + data.extend_from_slice(&nalu); + } + + data.freeze() + }; + + Some(Track { + sample_unpacker: Box::new(sample_unpacker), + sample_count: track.sample_count(), + timescale: track.timescale(), + track_id, + duration: track.duration(), + decoder_options: DecoderOptions::H264, + reader: self.reader, + }) + } +} + +pub(crate) struct Track { + reader: mp4::Mp4Reader, + sample_unpacker: Box Bytes + Send>, + sample_count: u32, + timescale: u32, + track_id: u32, + duration: Duration, + decoder_options: DecoderOptions, +} + +impl Track { + pub(crate) fn chunks(&mut self) -> TrackChunks<'_, Reader> { + TrackChunks { + track: self, + last_sample_index: 1, + } + } + + pub(super) fn decoder_options(&self) -> &DecoderOptions { + &self.decoder_options + } + + pub(super) fn duration(&self) -> Option { + if self.duration == Duration::ZERO { + None + } else { + Some(self.duration) + } + } +} + +pub(crate) struct TrackChunks<'a, Reader: Read + Seek + Send + 'static> { + track: &'a mut Track, + last_sample_index: u32, +} + +impl Iterator for TrackChunks<'_, Reader> { + type Item = (EncodedChunk, Duration); + + fn next(&mut self) -> Option { + while self.last_sample_index < self.track.sample_count { + let sample = self + .track + .reader + .read_sample(self.track.track_id, self.last_sample_index); + self.last_sample_index += 1; + match sample { + Ok(Some(sample)) => return Some(self.sample_into_chunk(sample)), + Ok(None) => {} + Err(err) => { + warn!("Error while reading MP4 sample: {:?}", err); + } + }; + } + None + } +} + +impl TrackChunks<'_, Reader> { + fn sample_into_chunk(&mut self, sample: Mp4Sample) -> (EncodedChunk, Duration) { + let rendering_offset = sample.rendering_offset; + let start_time = sample.start_time; + let sample_duration = + Duration::from_secs_f64(sample.duration as f64 / self.track.timescale as f64); + + let dts = Duration::from_secs_f64(start_time as f64 / self.track.timescale as f64); + let pts = Duration::from_secs_f64( + (start_time as f64 + rendering_offset as f64) / self.track.timescale as f64, + ); + + let data = (self.track.sample_unpacker)(sample); + + let chunk = EncodedChunk { + data, + pts, + dts: Some(dts), + kind: match self.track.decoder_options { + DecoderOptions::H264 => EncodedChunkKind::Video(VideoCodec::H264), + DecoderOptions::Aac(_) => EncodedChunkKind::Audio(AudioCodec::Aac), + }, + }; + 
(chunk, sample_duration)
+    }
+}
diff --git a/compositor_pipeline/src/pipeline/input/rtp.rs b/compositor_pipeline/src/pipeline/input/rtp.rs
index 5520e7e29..864ec32c8 100644
--- a/compositor_pipeline/src/pipeline/input/rtp.rs
+++ b/compositor_pipeline/src/pipeline/input/rtp.rs
@@ -126,7 +126,7 @@ impl RtpReceiver {
             }),
             video,
             audio,
-            init_info: InputInitInfo { port: Some(port) },
+            init_info: InputInitInfo::Rtp { port: Some(port) },
         })
     }
 
diff --git a/src/routes/register_request.rs b/src/routes/register_request.rs
index 1d4683f27..eac4aac42 100644
--- a/src/routes/register_request.rs
+++ b/src/routes/register_request.rs
@@ -1,5 +1,5 @@
 use axum::extract::{Path, State};
-use compositor_pipeline::pipeline::Port;
+use compositor_pipeline::pipeline::{input::InputInitInfo, Port};
 use schemars::JsonSchema;
 use serde::{Deserialize, Serialize};
 
@@ -51,9 +51,18 @@ pub(super) async fn handle_input(
                 Pipeline::register_input(&api.pipeline, input_id.into(), decklink.try_into()?)?
             }
         };
-        match response.port {
-            Some(Port(port)) => Ok(Response::RegisteredPort { port }),
-            None => Ok(Response::Ok {}),
+        match response {
+            InputInitInfo::Rtp { port } => Ok(Response::RegisteredPort {
+                port: port.map(|p| p.0),
+            }),
+            InputInitInfo::Mp4 {
+                video_duration,
+                audio_duration,
+            } => Ok(Response::RegisteredMp4 {
+                video_duration_ms: video_duration.map(|v| v.as_millis() as u64),
+                audio_duration_ms: audio_duration.map(|a| a.as_millis() as u64),
+            }),
+            InputInitInfo::Other => Ok(Response::Ok {}),
         }
     })
     .await
@@ -77,7 +86,7 @@ pub(super) async fn handle_output(
             }
         };
         match response {
-            Some(Port(port)) => Ok(Response::RegisteredPort { port }),
+            Some(Port(port)) => Ok(Response::RegisteredPort { port: Some(port) }),
             None => Ok(Response::Ok {}),
         }
     })
diff --git a/src/state.rs b/src/state.rs
index 236f4bebc..eca2de842 100644
--- a/src/state.rs
+++ b/src/state.rs
@@ -17,7 +17,13 @@ pub type Pipeline = compositor_pipeline::Pipeline;
 #[serde(untagged)]
 pub enum Response {
     Ok {},
-    RegisteredPort { port: u16 },
+    RegisteredPort {
+        port: Option<u16>,
+    },
+    RegisteredMp4 {
+        video_duration_ms: Option<u64>,
+        audio_duration_ms: Option<u64>,
+    },
 }
 
 impl IntoResponse for Response {
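
Note (illustration only, not part of the patch): the state.rs hunk above keeps `#[serde(untagged)]`, so the new register responses serialize as bare field maps. The sketch below shows the expected JSON for the new variants under that assumption, using a stand-alone copy of the enum exactly as it appears in the hunk (the real type may carry extra serde attributes); it assumes the serde (with `derive`) and serde_json crates.

    use serde::Serialize;

    // Stand-alone copy of the `Response` enum from the src/state.rs hunk.
    #[allow(dead_code)]
    #[derive(Serialize)]
    #[serde(untagged)]
    enum Response {
        Ok {},
        RegisteredPort {
            port: Option<u16>,
        },
        RegisteredMp4 {
            video_duration_ms: Option<u64>,
            audio_duration_ms: Option<u64>,
        },
    }

    fn main() {
        let mp4 = Response::RegisteredMp4 {
            video_duration_ms: Some(10_000),
            audio_duration_ms: None,
        };
        // Untagged struct variants serialize as their fields only:
        // {"video_duration_ms":10000,"audio_duration_ms":null}
        println!("{}", serde_json::to_string(&mp4).unwrap());

        let rtp = Response::RegisteredPort { port: Some(8004) };
        // {"port":8004}
        println!("{}", serde_json::to_string(&rtp).unwrap());
    }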