Skip to content

Commit

Permalink
Add WHIP output (#834)
Browse files Browse the repository at this point in the history
Co-authored-by: bartosz rzepa <bartosz.rzepa@swmansion.com>
  • Loading branch information
wkazmierczak and brzep authored Dec 10, 2024
1 parent a6d9b7d commit 97405df
Show file tree
Hide file tree
Showing 31 changed files with 2,662 additions and 143 deletions.
1,162 changes: 1,136 additions & 26 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions compositor_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ compositor_render = { workspace = true }
serde = { workspace = true }
schemars = { workspace = true }
bytes = { workspace = true }
tracing = { workspace = true }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
compositor_pipeline = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions compositor_api/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub use component::WebView;
pub use register_input::Mp4Input;
pub use register_output::Mp4Output;
pub use register_output::RtpOutput;
pub use register_output::WhipOutput;

pub use register_input::DeckLink;
pub use register_input::RtpInput;
Expand Down
97 changes: 97 additions & 0 deletions compositor_api/src/types/from_register_output.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use axum::http::HeaderValue;
use compositor_pipeline::pipeline::{
self,
encoder::{
Expand All @@ -9,6 +10,7 @@ use compositor_pipeline::pipeline::{
output::{
self,
mp4::{Mp4AudioTrack, Mp4OutputOptions, Mp4VideoTrack},
whip::WhipAudioOptions,
},
};

Expand Down Expand Up @@ -173,6 +175,88 @@ impl TryFrom<Mp4Output> for pipeline::RegisterOutputOptions<output::OutputOption
}
}

impl TryFrom<WhipOutput> for pipeline::RegisterOutputOptions<output::OutputOptions> {
type Error = TypeError;

fn try_from(request: WhipOutput) -> Result<Self, Self::Error> {
let WhipOutput {
endpoint_url,
bearer_token,
video,
audio,
} = request;

if video.is_none() && audio.is_none() {
return Err(TypeError::new(
"At least one of \"video\" and \"audio\" fields have to be specified.",
));
}
let video_codec = video.as_ref().map(|v| match v.encoder {
VideoEncoderOptions::FfmpegH264 { .. } => pipeline::VideoCodec::H264,
});
let audio_options = audio.as_ref().map(|a| match &a.encoder {
WhipAudioEncoderOptions::Opus {
channels,
preset: _,
} => WhipAudioOptions {
codec: pipeline::AudioCodec::Opus,
channels: match channels {
audio::AudioChannels::Mono => {
compositor_pipeline::audio_mixer::AudioChannels::Mono
}
audio::AudioChannels::Stereo => {
compositor_pipeline::audio_mixer::AudioChannels::Stereo
}
},
},
});

if let Some(token) = &bearer_token {
if HeaderValue::from_str(format!("Bearer {token}").as_str()).is_err() {
return Err(TypeError::new("Bearer token string is not valid. It must contain only 32-127 ASCII characters"));
};
}

let (video_encoder_options, output_video_options) = maybe_video_options(video)?;
let (audio_encoder_options, output_audio_options) = match audio {
Some(OutputWhipAudioOptions {
mixing_strategy,
send_eos_when,
encoder,
initial,
}) => {
let audio_encoder_options: AudioEncoderOptions = encoder.into();
let output_audio_options = pipeline::OutputAudioOptions {
initial: initial.try_into()?,
end_condition: send_eos_when.unwrap_or_default().try_into()?,
mixing_strategy: mixing_strategy.unwrap_or(MixingStrategy::SumClip).into(),
channels: audio_encoder_options.channels(),
};

(Some(audio_encoder_options), Some(output_audio_options))
}
None => (None, None),
};

let output_options = output::OutputOptions {
output_protocol: output::OutputProtocolOptions::Whip(output::whip::WhipSenderOptions {
endpoint_url,
bearer_token,
video: video_codec,
audio: audio_options,
}),
video: video_encoder_options,
audio: audio_encoder_options,
};

Ok(Self {
output_options,
video: output_video_options,
audio: output_audio_options,
})
}
}

fn maybe_video_options(
options: Option<OutputVideoOptions>,
) -> Result<
Expand Down Expand Up @@ -230,6 +314,19 @@ impl From<RtpAudioEncoderOptions> for pipeline::encoder::AudioEncoderOptions {
}
}

impl From<WhipAudioEncoderOptions> for pipeline::encoder::AudioEncoderOptions {
fn from(value: WhipAudioEncoderOptions) -> Self {
match value {
WhipAudioEncoderOptions::Opus { channels, preset } => {
AudioEncoderOptions::Opus(encoder::opus::OpusEncoderOptions {
channels: channels.into(),
preset: preset.unwrap_or(OpusEncoderPreset::Voip).into(),
})
}
}
}
}

impl TryFrom<OutputEndCondition> for pipeline::PipelineOutputEndCondition {
type Error = TypeError;

Expand Down
38 changes: 38 additions & 0 deletions compositor_api/src/types/register_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,19 @@ pub struct Mp4Output {
pub audio: Option<OutputMp4AudioOptions>,
}

#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct WhipOutput {
/// WHIP server endpoint
pub endpoint_url: String,
// Bearer token
pub bearer_token: Option<Arc<str>>,
/// Video track configuration.
pub video: Option<OutputVideoOptions>,
/// Audio track configuration.
pub audio: Option<OutputWhipAudioOptions>,
}

#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct OutputVideoOptions {
Expand Down Expand Up @@ -77,6 +90,19 @@ pub struct OutputMp4AudioOptions {
pub initial: Audio,
}

#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct OutputWhipAudioOptions {
/// (**default="sum_clip"**) Specifies how audio should be mixed.
pub mixing_strategy: Option<MixingStrategy>,
/// Condition for termination of output stream based on the input streams states.
pub send_eos_when: Option<OutputEndCondition>,
/// Audio encoder options.
pub encoder: WhipAudioEncoderOptions,
/// Initial audio mixer configuration for output.
pub initial: Audio,
}

#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[serde(tag = "type", rename_all = "snake_case", deny_unknown_fields)]
pub enum VideoEncoderOptions {
Expand Down Expand Up @@ -108,6 +134,18 @@ pub enum Mp4AudioEncoderOptions {
Aac { channels: AudioChannels },
}

#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[serde(tag = "type", rename_all = "snake_case", deny_unknown_fields)]
pub enum WhipAudioEncoderOptions {
Opus {
/// Specifies channels configuration.
channels: AudioChannels,

/// (**default="voip"**) Specifies preset for audio output encoder.
preset: Option<OpusEncoderPreset>,
},
}

/// This type defines when end of an input stream should trigger end of the output stream. Only one of those fields can be set at the time.
/// Unless specified otherwise the input stream is considered finished/ended when:
/// - TCP connection was dropped/closed.
Expand Down
5 changes: 5 additions & 0 deletions compositor_pipeline/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ rubato = "0.15.0"
wgpu = { workspace = true }
vk-video = { path = "../vk-video/", optional = true }
glyphon = { workspace = true }
webrtc = "0.11.0"
tokio = {workspace = true }
serde_json = { workspace = true }
serde = { workspace = true }
url = "2.5.2"

[target.x86_64-unknown-linux-gnu.dependencies]
decklink = { path = "../decklink", optional = true }
14 changes: 13 additions & 1 deletion compositor_pipeline/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use compositor_render::{
InputId, OutputId,
};

use crate::pipeline::{decoder::AacDecoderError, VideoCodec};
use crate::pipeline::{decoder::AacDecoderError, output::whip, VideoCodec};
use fdk_aac_sys as fdk;

#[derive(Debug, thiserror::Error)]
Expand All @@ -20,6 +20,9 @@ pub enum InitPipelineError {
#[cfg(feature = "vk-video")]
#[error(transparent)]
VulkanCtxError(#[from] vk_video::VulkanCtxError),

#[error("Failed to create tokio::Runtime.")]
CreateTokioRuntime(#[source] std::io::Error),
}

#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -90,6 +93,15 @@ pub enum OutputInitError {

#[error("Failed to register output. FFmpeg error: {0}.")]
FfmpegMp4Error(ffmpeg_next::Error),

#[error("Unkown Whip output error.")]
UnknownWhipError,

#[error("Whip init timeout exceeded")]
WhipInitTimeout,

#[error("Failed to init whip output")]
WhipInitError(#[source] Box<whip::WhipError>),
}

#[derive(Debug, thiserror::Error)]
Expand Down
27 changes: 21 additions & 6 deletions compositor_pipeline/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ use input::RawDataInputOptions;
use output::EncodedDataOutputOptions;
use output::OutputOptions;
use output::RawDataOutputOptions;
use pipeline_output::register_pipeline_output;
use tokio::runtime::Runtime;
use tracing::{error, info, trace, warn};
use types::RawDataSender;

Expand Down Expand Up @@ -121,17 +123,21 @@ pub struct Options {
pub force_gpu: bool,
pub download_root: PathBuf,
pub output_sample_rate: u32,
pub stun_servers: Arc<Vec<String>>,
pub wgpu_features: WgpuFeatures,
pub load_system_fonts: Option<bool>,
pub wgpu_ctx: Option<GraphicsContext>,
pub tokio_rt: Option<Arc<Runtime>>,
}

#[derive(Clone)]
pub struct PipelineCtx {
pub output_sample_rate: u32,
pub output_framerate: Framerate,
pub stun_servers: Arc<Vec<String>>,
pub download_dir: Arc<PathBuf>,
pub event_emitter: Arc<EventEmitter>,
pub tokio_rt: Arc<Runtime>,
#[cfg(feature = "vk-video")]
pub vulkan_ctx: Option<graphics_context::VulkanCtx>,
}
Expand Down Expand Up @@ -181,6 +187,10 @@ impl Pipeline {
.join(format!("live-compositor-{}", rand::random::<u64>()));
std::fs::create_dir_all(&download_dir).map_err(InitPipelineError::CreateDownloadDir)?;

let tokio_rt = match opts.tokio_rt {
Some(tokio_rt) => tokio_rt,
None => Arc::new(Runtime::new().map_err(InitPipelineError::CreateTokioRuntime)?),
};
let event_emitter = Arc::new(EventEmitter::new());
let pipeline = Pipeline {
outputs: HashMap::new(),
Expand All @@ -192,8 +202,10 @@ impl Pipeline {
ctx: PipelineCtx {
output_sample_rate: opts.output_sample_rate,
output_framerate: opts.queue_options.output_framerate,
stun_servers: opts.stun_servers,
download_dir: download_dir.into(),
event_emitter,
tokio_rt,
#[cfg(feature = "vk-video")]
vulkan_ctx: preinitialized_ctx.and_then(|ctx| ctx.vulkan_ctx),
},
Expand Down Expand Up @@ -252,11 +264,12 @@ impl Pipeline {
}

pub fn register_output(
&mut self,
pipeline: &Arc<Mutex<Self>>,
output_id: OutputId,
register_options: RegisterOutputOptions<OutputOptions>,
) -> Result<Option<Port>, RegisterOutputError> {
self.register_pipeline_output(
register_pipeline_output(
pipeline,
output_id,
&register_options.output_options,
register_options.video,
Expand All @@ -265,11 +278,12 @@ impl Pipeline {
}

pub fn register_encoded_data_output(
&mut self,
pipeline: &Arc<Mutex<Self>>,
output_id: OutputId,
register_options: RegisterOutputOptions<EncodedDataOutputOptions>,
) -> Result<Receiver<EncoderOutputEvent>, RegisterOutputError> {
self.register_pipeline_output(
register_pipeline_output(
pipeline,
output_id,
&register_options.output_options,
register_options.video,
Expand All @@ -278,11 +292,12 @@ impl Pipeline {
}

pub fn register_raw_data_output(
&mut self,
pipeline: &Arc<Mutex<Self>>,
output_id: OutputId,
register_options: RegisterOutputOptions<RawDataOutputOptions>,
) -> Result<RawDataReceiver, RegisterOutputError> {
self.register_pipeline_output(
register_pipeline_output(
pipeline,
output_id,
&register_options.output_options,
register_options.video,
Expand Down
14 changes: 12 additions & 2 deletions compositor_pipeline/src/pipeline/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,16 @@ impl Encoder {
}
}

pub fn keyframe_request_sender(&self) -> Option<Sender<()>> {
match self.video.as_ref() {
Some(VideoEncoder::H264(encoder)) => Some(encoder.keyframe_request_sender().clone()),
None => {
error!("Non video encoder received keyframe request.");
None
}
}
}

pub fn samples_batch_sender(&self) -> Option<&Sender<PipelineEvent<OutputSamples>>> {
match &self.audio {
Some(encoder) => Some(encoder.samples_batch_sender()),
Expand Down Expand Up @@ -138,9 +148,9 @@ impl VideoEncoder {
}
}

pub fn request_keyframe(&self) {
pub fn keyframe_request_sender(&self) -> Sender<()> {
match self {
Self::H264(encoder) => encoder.request_keyframe(),
Self::H264(encoder) => encoder.keyframe_request_sender(),
}
}
}
Expand Down
6 changes: 2 additions & 4 deletions compositor_pipeline/src/pipeline/encoder/ffmpeg_h264.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,8 @@ impl LibavH264Encoder {
self.resolution
}

pub fn request_keyframe(&self) {
if let Err(err) = self.keyframe_req_sender.send(()) {
debug!(%err, "Failed to send keyframe request to the encoder.");
}
pub fn keyframe_request_sender(&self) -> Sender<()> {
self.keyframe_req_sender.clone()
}
}

Expand Down
Loading

0 comments on commit 97405df

Please sign in to comment.