Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
wkazmierczak committed Dec 19, 2024
1 parent 6d42fb3 commit 6ed4452
Show file tree
Hide file tree
Showing 13 changed files with 524 additions and 431 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions compositor_pipeline/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ axum = { version = "0.7.7", features = ["macros"] }
tower-http = { version = "0.6.1", features = ["cors"] }
tracing-subscriber = "0.3.18"
url = "2.5.2"
urlencoding = "2.1.3"

[target.x86_64-unknown-linux-gnu.dependencies]
decklink = { path = "../decklink", optional = true }
20 changes: 20 additions & 0 deletions compositor_pipeline/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use output::OutputOptions;
use output::RawDataOutputOptions;
use pipeline_output::register_pipeline_output;
use tokio::runtime::Runtime;
use tokio::sync::oneshot;
use tracing::{error, info, trace, warn};
use types::RawDataSender;
use whip_whep::run_whip_whep_server;
Expand Down Expand Up @@ -116,6 +117,8 @@ pub struct Pipeline {
renderer: Renderer,
audio_mixer: AudioMixer,
is_started: bool,
shutdown_whip_whep_sender: Option<oneshot::Sender<()>>,
shutdown_whip_whep_receiver: Option<oneshot::Receiver<()>>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -199,6 +202,16 @@ impl Pipeline {
Some(tokio_rt) => tokio_rt,
None => Arc::new(Runtime::new().map_err(InitPipelineError::CreateTokioRuntime)?),
};
let shutdown_whip_whep_sender;
let shutdown_whip_whep_receiver;
if opts.start_whip_whep {
let (tx, rx) = oneshot::channel();
shutdown_whip_whep_sender = Some(tx);
shutdown_whip_whep_receiver = Some(rx);
} else {
shutdown_whip_whep_sender = None;
shutdown_whip_whep_receiver = None;
}
let event_emitter = Arc::new(EventEmitter::new());
let pipeline = Pipeline {
outputs: HashMap::new(),
Expand All @@ -207,6 +220,8 @@ impl Pipeline {
renderer,
audio_mixer: AudioMixer::new(opts.output_sample_rate),
is_started: false,
shutdown_whip_whep_sender,
shutdown_whip_whep_receiver,
ctx: PipelineCtx {
output_sample_rate: opts.output_sample_rate,
output_framerate: opts.queue_options.output_framerate,
Expand Down Expand Up @@ -481,6 +496,11 @@ impl Pipeline {

impl Drop for Pipeline {
fn drop(&mut self) {
if let Some(sender) = self.shutdown_whip_whep_sender.take() {
if let Err(err) = sender.send(()) {
warn!("Cannot sent shutdown signal to whip_whep server: {err:?}")
}
}
self.queue.shutdown()
}
}
Expand Down
10 changes: 9 additions & 1 deletion compositor_pipeline/src/pipeline/input/whip/depayloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use webrtc::rtp_transceiver::rtp_codec::RTPCodecType;

use crate::pipeline::{
decoder,
types::{AudioCodec, EncodedChunk, EncodedChunkKind, VideoCodec},
types::{AudioCodec, EncodedChunk, EncodedChunkKind, IsKeyframe, VideoCodec},
VideoDecoder,
};

Expand Down Expand Up @@ -122,6 +122,7 @@ impl VideoDepayloader {
data: mem::take(buffer).concat().into(),
pts: Duration::from_secs_f64(timestamp as f64 / 90000.0),
dts: None,
is_keyframe: IsKeyframe::Unknown,
kind,
};

Expand All @@ -139,6 +140,12 @@ pub enum AudioDepayloader {
},
}

impl Default for AudioDepayloader {
fn default() -> Self {
Self::new()
}
}

impl AudioDepayloader {
pub fn new() -> Self {
AudioDepayloader::Opus {
Expand Down Expand Up @@ -167,6 +174,7 @@ impl AudioDepayloader {
data: opus_packet,
pts: Duration::from_secs_f64(timestamp as f64 / 48000.0),
dts: None,
is_keyframe: IsKeyframe::NoKeyframes,
kind,
}])
}
Expand Down
67 changes: 56 additions & 11 deletions compositor_pipeline/src/pipeline/whip_whep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@ use axum::{
};
use compositor_render::InputId;
use error::WhipServerError;
use handlers::{handle_whip, status, terminate_whip_session, whip_ice_candidates_handler};
use reqwest::StatusCode;
use serde_json::{json, Value};
use std::{
collections::HashMap,
net::SocketAddr,
sync::{Arc, Mutex, Weak},
time::{Duration, Instant},
};
use tokio::sync::mpsc;
use tokio::{
signal,
sync::{mpsc, oneshot},
};
use tower_http::cors::CorsLayer;
use tracing::{error, warn};
use webrtc::{
Expand All @@ -30,18 +34,29 @@ use webrtc::{
RTCRtpTransceiverInit,
},
};
use whip_handlers::{
create_whip_session::handle_create_whip_session,
new_whip_ice_candidates::handle_new_whip_ice_candidates,
terminate_whip_session::handle_terminate_whip_session,
};

mod error;
mod handlers;
mod helpers;
mod validate_bearer_token;
mod whip_handlers;

use crate::{queue::PipelineEvent, Pipeline};

use super::EncodedChunk;

pub async fn run_whip_whep_server(pipeline: Weak<Mutex<Pipeline>>) {
let pipeline_ctx = match pipeline.upgrade() {
Some(pipeline) => pipeline.lock().unwrap().ctx.clone(),
let (pipeline_ctx, shutdown_signal_receiver) = match pipeline.upgrade() {
Some(pipeline) => {
let mut locked_pipeline = pipeline.lock().unwrap();
(
locked_pipeline.ctx.clone(),
locked_pipeline.shutdown_whip_whep_receiver.take(),
)
}
None => {
warn!("Pipeline stopped.");
return;
Expand All @@ -51,15 +66,16 @@ pub async fn run_whip_whep_server(pipeline: Weak<Mutex<Pipeline>>) {
if !pipeline_ctx.start_whip_whep {
return;
}
let shutdown_signal_receiver = shutdown_signal_receiver.unwrap(); //it is safe because receiver is None only if start_whip_whep is false

let state = pipeline_ctx.whip_whep_state;
let port = pipeline_ctx.whip_whep_server_port;

let app = Router::new()
.route("/status", get(status))
.route("/whip/:id", post(handle_whip))
.route("/session/:id", patch(whip_ice_candidates_handler))
.route("/session/:id", delete(terminate_whip_session))
.route("/whip/:id", post(handle_create_whip_session))
.route("/session/:id", patch(handle_new_whip_ice_candidates))
.route("/session/:id", delete(handle_terminate_whip_session))
.layer(CorsLayer::permissive())
.with_state(state);

Expand All @@ -69,12 +85,41 @@ pub async fn run_whip_whep_server(pipeline: Weak<Mutex<Pipeline>>) {
return;
};

if let Err(err) = axum::serve(listener, app).await {
if let Err(err) = axum::serve(listener, app)
.with_graceful_shutdown(shutdown_signal(shutdown_signal_receiver))
.await
{
error!("Cannot serve WHIP/WHEP server task: {err:?}");
};
}

#[derive(Debug)]
async fn shutdown_signal(receiver: oneshot::Receiver<()>) {
let ctrl_c = async {
signal::ctrl_c()
.await
.expect("failed to install Ctrl+C handler");
};

let terminate = async {
if let Err(err) = receiver.await {
warn!(
"Error while receiving whip_whep server shutdown signal {:?}",
err
);
}
};

tokio::select! {
_ = ctrl_c => {},
_ = terminate => {},
}
}

pub async fn status() -> Result<(StatusCode, axum::Json<Value>), WhipServerError> {
Ok((StatusCode::OK, axum::Json(json!({}))))
}

#[derive(Debug, Clone)]
pub struct WhipInputConnectionOptions {
pub video_sender: Option<mpsc::Sender<PipelineEvent<EncodedChunk>>>,
pub audio_sender: Option<mpsc::Sender<PipelineEvent<EncodedChunk>>>,
Expand Down
Loading

0 comments on commit 6ed4452

Please sign in to comment.