From 97405df03be601f4aa2fd873f80a64d96e65169f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Ka=C5=BAmierczak?= Date: Tue, 10 Dec 2024 16:16:21 +0100 Subject: [PATCH] Add WHIP output (#834) Co-authored-by: bartosz rzepa --- Cargo.lock | 1162 ++++++++++++++++- compositor_api/Cargo.toml | 1 + compositor_api/src/types.rs | 1 + .../src/types/from_register_output.rs | 97 ++ compositor_api/src/types/register_output.rs | 38 + compositor_pipeline/Cargo.toml | 5 + compositor_pipeline/src/error.rs | 14 +- compositor_pipeline/src/pipeline.rs | 27 +- compositor_pipeline/src/pipeline/encoder.rs | 14 +- .../src/pipeline/encoder/ffmpeg_h264.rs | 6 +- compositor_pipeline/src/pipeline/output.rs | 47 +- .../src/pipeline/output/mp4.rs | 4 +- .../src/pipeline/output/rtp.rs | 2 +- .../src/pipeline/output/whip.rs | 288 ++++ .../output/whip/establish_peer_connection.rs | 241 ++++ .../output/whip/init_peer_connection.rs | 159 +++ .../src/pipeline/output/whip/packet_stream.rs | 76 ++ .../src/pipeline/output/whip/payloader.rs | 322 +++++ .../src/pipeline/pipeline_output.rs | 114 +- generate/Cargo.toml | 1 + generate/src/compositor_instance.rs | 8 +- .../examples/encoded_channel_output.rs | 15 +- .../manual_graphics_initialization.rs | 4 + .../examples/raw_channel_input.rs | 9 +- .../examples/raw_channel_output.rs | 10 +- integration_tests/examples/whip_client.rs | 75 ++ integration_tests/src/compositor_instance.rs | 8 +- src/config.rs | 17 + src/routes/register_request.rs | 10 +- src/server.rs | 16 +- src/state.rs | 14 +- 31 files changed, 2662 insertions(+), 143 deletions(-) create mode 100644 compositor_pipeline/src/pipeline/output/whip.rs create mode 100644 compositor_pipeline/src/pipeline/output/whip/establish_peer_connection.rs create mode 100644 compositor_pipeline/src/pipeline/output/whip/init_peer_connection.rs create mode 100644 compositor_pipeline/src/pipeline/output/whip/packet_stream.rs create mode 100644 compositor_pipeline/src/pipeline/output/whip/payloader.rs create mode 100644 integration_tests/examples/whip_client.rs diff --git a/Cargo.lock b/Cargo.lock index 7e46e2f1f..b78821882 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -33,6 +33,41 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "aead" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d122413f284cf2d62fb1b7db97e02edb8cda96d769b16e443a4f6195e35662b0" +dependencies = [ + "crypto-common", + "generic-array", +] + +[[package]] +name = "aes" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0" +dependencies = [ + "cfg-if", + "cipher", + "cpufeatures", +] + +[[package]] +name = "aes-gcm" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "831010a0f742e1209b3bcea8fab6a8e149051ba6099432c8cb2cc117dec3ead1" +dependencies = [ + "aead", + "aes", + "cipher", + "ctr", + "ghash", + "subtle", +] + [[package]] name = "ahash" version = "0.8.11" @@ -176,6 +211,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "arc-swap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" + [[package]] name = "arrayref" version = "0.3.7" @@ -209,6 +250,83 @@ dependencies = [ "libloading 0.8.0", ] +[[package]] +name = "asn1-rs" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f6fd5ddaf0351dff5b8da21b2fb4ff8e08ddd02857f0bf69c47639106c0fff0" +dependencies = [ + "asn1-rs-derive 0.4.0", + "asn1-rs-impl 0.1.0", + "displaydoc", + "nom", + "num-traits", + "rusticata-macros", + "thiserror", +] + +[[package]] +name = "asn1-rs" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5493c3bedbacf7fd7382c6346bbd66687d12bbaad3a89a2d2c303ee6cf20b048" +dependencies = [ + "asn1-rs-derive 0.5.1", + "asn1-rs-impl 0.2.0", + "displaydoc", + "nom", + "num-traits", + "rusticata-macros", + "thiserror", + "time", +] + +[[package]] +name = "asn1-rs-derive" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "726535892e8eae7e70657b4c8ea93d26b8553afb1ce617caee529ef96d7dee6c" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", + "synstructure 0.12.6", +] + +[[package]] +name = "asn1-rs-derive" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "965c2d33e53cb6b267e148a4cb0760bc01f4904c1cd4bb4002a085bb016d1490" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.75", + "synstructure 0.13.1", +] + +[[package]] +name = "asn1-rs-impl" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2777730b2039ac0f95f093556e61b6d26cebed5393ca6f152717777cec3a42ed" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "asn1-rs-impl" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b18050c2cd6fe86c3a76584ef5e0baf286d038cda203eb6223df2cc413565f7" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.75", +] + [[package]] name = "async-trait" version = "0.1.81" @@ -316,6 +434,12 @@ dependencies = [ "rustc-demangle", ] +[[package]] +name = "base16ct" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c7f02d4ea65f2c1853089ffd8d2787bdbc63de2f0d29dedbcf8ccdfa0ccd4cf" + [[package]] name = "base64" version = "0.21.2" @@ -328,6 +452,21 @@ version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" +[[package]] +name = "base64ct" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" + +[[package]] +name = "bincode" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" +dependencies = [ + "serde", +] + [[package]] name = "bindgen" version = "0.66.1" @@ -423,6 +562,15 @@ dependencies = [ "generic-array", ] +[[package]] +name = "block-padding" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8894febbff9f758034a5b8e12d87918f56dfc64a8e1fe757d65e29041538d93" +dependencies = [ + "generic-array", +] + [[package]] name = "block-sys" version = "0.2.1" @@ -506,6 +654,15 @@ dependencies = [ "wayland-client", ] +[[package]] +name = "cbc" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26b52a9543ae338f279b96b0b9fed9c8093744685043739079ce85cd58f289a6" +dependencies = [ + "cipher", +] + [[package]] name = "cc" version = "1.1.10" @@ -516,6 +673,18 @@ dependencies = [ "libc", ] +[[package]] +name = "ccm" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ae3c82e4355234767756212c570e29833699ab63e6ffd161887314cc5b43847" +dependencies = [ + "aead", + "cipher", + "ctr", + "subtle", +] + [[package]] name = "cesu8" version = "1.1.0" @@ -579,6 +748,16 @@ version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cca491388666e04d7248af3f60f0c40cfb0991c72205595d7c396e3510207d1a" +[[package]] +name = "cipher" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad" +dependencies = [ + "crypto-common", + "inout", +] + [[package]] name = "clang-sys" version = "1.6.1" @@ -712,6 +891,7 @@ dependencies = [ "compositor_render", "schemars", "serde", + "tracing", ] [[package]] @@ -743,14 +923,19 @@ dependencies = [ "opus", "rand", "reqwest", - "rtcp", - "rtp", + "rtcp 0.10.1", + "rtp 0.9.0", "rubato", + "serde", + "serde_json", "socket2", "thiserror", + "tokio", "tracing", + "url", "vk-video", - "webrtc-util", + "webrtc", + "webrtc-util 0.8.1", "wgpu", ] @@ -823,6 +1008,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "const-oid" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" + [[package]] name = "core-foundation" version = "0.9.4" @@ -895,6 +1086,21 @@ dependencies = [ "libc", ] +[[package]] +name = "crc" +version = "3.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69e6e4d7b33a94f0991c26729976b10ebde1d34c3ee82408fb536164fa10d636" +dependencies = [ + "crc-catalog", +] + +[[package]] +name = "crc-catalog" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" + [[package]] name = "crc32fast" version = "1.3.2" @@ -953,6 +1159,18 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7" +[[package]] +name = "crypto-bigint" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0dc92fb57ca44df6db8059111ab3af99a63d5d0f8375d9972e319a379c6bab76" +dependencies = [ + "generic-array", + "rand_core", + "subtle", + "zeroize", +] + [[package]] name = "crypto-common" version = "0.1.6" @@ -960,15 +1178,51 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" dependencies = [ "generic-array", + "rand_core", "typenum", ] +[[package]] +name = "ctr" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0369ee1ad671834580515889b80f2ea915f23b8be8d0daa4bbaf2ac5c7590835" +dependencies = [ + "cipher", +] + [[package]] name = "cursor-icon" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96a6ac251f4a2aca6b3f91340350eab87ae57c3f127ffeb585e92bd336717991" +[[package]] +name = "curve25519-dalek" +version = "4.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97fb8b7c4503de7d6ae7b42ab72a5a59857b4c937ec27a3d4539dba95b5ab2be" +dependencies = [ + "cfg-if", + "cpufeatures", + "curve25519-dalek-derive", + "fiat-crypto", + "rustc_version", + "subtle", + "zeroize", +] + +[[package]] +name = "curve25519-dalek-derive" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.75", +] + [[package]] name = "cxx" version = "1.0.126" @@ -1047,6 +1301,53 @@ dependencies = [ "thiserror", ] +[[package]] +name = "der" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f55bf8e7b65898637379c1b74eb1551107c8294ed26d855ceb9fd1a09cfc9bc0" +dependencies = [ + "const-oid", + "pem-rfc7468", + "zeroize", +] + +[[package]] +name = "der-parser" +version = "8.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbd676fbbab537128ef0278adb5576cf363cff6aa22a7b24effe97347cfab61e" +dependencies = [ + "asn1-rs 0.5.2", + "displaydoc", + "nom", + "num-traits", + "rusticata-macros", +] + +[[package]] +name = "der-parser" +version = "9.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5cd0a5c643689626bec213c4d8bd4d96acc8ffdb4ad4bb6bc16abf27d5f4b553" +dependencies = [ + "asn1-rs 0.6.2", + "displaydoc", + "nom", + "num-bigint", + "num-traits", + "rusticata-macros", +] + +[[package]] +name = "deranged" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" +dependencies = [ + "powerfmt", +] + [[package]] name = "derivative" version = "2.2.0" @@ -1065,7 +1366,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ "block-buffer", + "const-oid", "crypto-common", + "subtle", ] [[package]] @@ -1095,6 +1398,17 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bd0c93bb4b0c6d9b77f4435b0ae98c24d17f1c45b2ff844c6151a07256ca923b" +[[package]] +name = "displaydoc" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.75", +] + [[package]] name = "dlib" version = "0.5.2" @@ -1125,12 +1439,47 @@ version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "23d2f3407d9a573d666de4b5bdf10569d73ca9478087346697dcbae6244bfbcd" +[[package]] +name = "ecdsa" +version = "0.16.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee27f32b5c5292967d2d4a9d7f1e0b0aed2c15daded5a60300e4abb9d8020bca" +dependencies = [ + "der", + "digest", + "elliptic-curve", + "rfc6979", + "signature", + "spki", +] + [[package]] name = "either" version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" +[[package]] +name = "elliptic-curve" +version = "0.13.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5e6043086bf7973472e0c7dff2142ea0b680d30e18d9cc40f267efbf222bd47" +dependencies = [ + "base16ct", + "crypto-bigint", + "digest", + "ff", + "generic-array", + "group", + "hkdf", + "pem-rfc7468", + "pkcs8", + "rand_core", + "sec1", + "subtle", + "zeroize", +] + [[package]] name = "encoding_rs" version = "0.8.32" @@ -1215,6 +1564,16 @@ dependencies = [ "cc", ] +[[package]] +name = "ff" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ded41244b729663b1e574f1b4fb731469f69f79c17667b5d776b16cda0479449" +dependencies = [ + "rand_core", + "subtle", +] + [[package]] name = "ffmpeg-next" version = "7.1.0" @@ -1240,6 +1599,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "fiat-crypto" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d" + [[package]] name = "flate2" version = "1.0.26" @@ -1365,9 +1730,9 @@ checksum = "aa9a19cbb55df58761df49b23516a86d432839add4af60fc256da840f66ed35b" [[package]] name = "form_urlencoded" -version = "1.2.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a62bc1cf6f830c2ec14a513a9fb124d0a213a629668a4186f329db21fe045652" +checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456" dependencies = [ "percent-encoding", ] @@ -1384,11 +1749,26 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" -version = "0.3.28" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" +checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", "futures-sink", @@ -1400,6 +1780,17 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +[[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.30" @@ -1435,6 +1826,7 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ + "futures-channel", "futures-core", "futures-io", "futures-macro", @@ -1461,15 +1853,16 @@ dependencies = [ "pitch-detection", "rand", "reqwest", - "rtcp", - "rtp", + "rtcp 0.10.1", + "rtp 0.9.0", "schemars", "serde", "serde_json", "socket2", + "tokio", "tracing", "tracing-subscriber 0.3.18", - "webrtc-util", + "webrtc-util 0.8.1", ] [[package]] @@ -1480,6 +1873,7 @@ checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" dependencies = [ "typenum", "version_check", + "zeroize", ] [[package]] @@ -1505,6 +1899,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "ghash" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0d8a4362ccb29cb0b265253fb0a2728f592895ee6854fd9bc13f2ffda266ff1" +dependencies = [ + "opaque-debug", + "polyval", +] + [[package]] name = "gif" version = "0.12.0" @@ -1624,6 +2028,17 @@ dependencies = [ "bitflags 2.6.0", ] +[[package]] +name = "group" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0f9ef7462f7c099f518d754361858f86d8a07af53ba9af0fe635bbccb151a63" +dependencies = [ + "ff", + "rand_core", + "subtle", +] + [[package]] name = "h2" version = "0.4.6" @@ -1682,9 +2097,9 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.15.1" +version = "0.15.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a9bfc1af68b1726ea47d3d5109de126281def866b33970e10fbab11b5dafab3" +checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289" [[package]] name = "hassle-rs" @@ -1719,6 +2134,12 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fbf6a919d6cf397374f7dfeeea91d974c7c0a7221d0d0f4f20d859d329e53fcc" +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + [[package]] name = "hex-slice" version = "0.1.4" @@ -1731,6 +2152,24 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfa686283ad6dd069f105e5ab091b04c62850d3e4cf5d67debad1933f55023df" +[[package]] +name = "hkdf" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b5f8eb2ad728638ea2c7d47a21db23b7b58a72ed6a38256b8a1849f15fbbdf7" +dependencies = [ + "hmac", +] + +[[package]] +name = "hmac" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" +dependencies = [ + "digest", +] + [[package]] name = "http" version = "1.1.0" @@ -1887,9 +2326,9 @@ dependencies = [ [[package]] name = "idna" -version = "0.4.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d20d6b07bfbc108882d88ed8e37d39636dcc260e15e30c45e6ba089610b917c" +checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6" dependencies = [ "unicode-bidi", "unicode-normalization", @@ -1938,7 +2377,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "707907fe3c25f5424cce2cb7e1cbcafee6bdbe735ca90ef77c29e84591e5b9da" dependencies = [ "equivalent", - "hashbrown 0.15.1", + "hashbrown 0.15.2", +] + +[[package]] +name = "inout" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5" +dependencies = [ + "block-padding", + "generic-array", ] [[package]] @@ -1962,8 +2411,8 @@ dependencies = [ "rand", "regex", "reqwest", - "rtcp", - "rtp", + "rtcp 0.10.1", + "rtp 0.9.0", "serde", "serde_json", "signal-hook", @@ -1971,10 +2420,30 @@ dependencies = [ "tokio", "tokio-tungstenite", "tracing", - "webrtc-util", + "webrtc-util 0.8.1", "wgpu", ] +[[package]] +name = "interceptor" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4705c00485029e738bea8c9505b5ddb1486a8f3627a953e1e77e6abdf5eef90c" +dependencies = [ + "async-trait", + "bytes", + "log", + "portable-atomic", + "rand", + "rtcp 0.11.0", + "rtp 0.11.0", + "thiserror", + "tokio", + "waitgroup", + "webrtc-srtp", + "webrtc-util 0.9.0", +] + [[package]] name = "ipnet" version = "2.8.0" @@ -2180,7 +2649,7 @@ dependencies = [ "log", "rand", "reqwest", - "rtp", + "rtp 0.9.0", "rubato", "schemars", "serde", @@ -2192,7 +2661,7 @@ dependencies = [ "tokio", "tracing", "tracing-subscriber 0.3.18", - "webrtc-util", + "webrtc-util 0.8.1", "wgpu", ] @@ -2261,6 +2730,16 @@ dependencies = [ "rawpointer", ] +[[package]] +name = "md-5" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" +dependencies = [ + "cfg-if", + "digest", +] + [[package]] name = "memchr" version = "2.7.4" @@ -2560,6 +3039,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num-integer" version = "0.1.45" @@ -2663,12 +3148,27 @@ dependencies = [ "memchr", ] +[[package]] +name = "oid-registry" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8d8034d9489cdaf79228eb9f6a3b8d7bb32ba00d6645ebd48eef4077ceb5bd9" +dependencies = [ + "asn1-rs 0.6.2", +] + [[package]] name = "once_cell" version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +[[package]] +name = "opaque-debug" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" + [[package]] name = "openssl" version = "0.10.55" @@ -2753,6 +3253,30 @@ dependencies = [ "ttf-parser 0.25.0", ] +[[package]] +name = "p256" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9863ad85fa8f4460f9c48cb909d38a0d689dba1f6f6988a5e3e0d31071bcd4b" +dependencies = [ + "ecdsa", + "elliptic-curve", + "primeorder", + "sha2", +] + +[[package]] +name = "p384" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70786f51bcc69f6a4c0360e063a4cac5419ef7c5cd5b3c99ad70f3be5ba79209" +dependencies = [ + "ecdsa", + "elliptic-curve", + "primeorder", + "sha2", +] + [[package]] name = "parking_lot" version = "0.12.1" @@ -2788,11 +3312,30 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" +[[package]] +name = "pem" +version = "3.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e459365e590736a54c3fa561947c84837534b8e9af6fc5bf781307e82658fae" +dependencies = [ + "base64 0.22.1", + "serde", +] + +[[package]] +name = "pem-rfc7468" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88b39c9bfcfc231068454382784bb460aae594343fb030d46e9f50a645418412" +dependencies = [ + "base64ct", +] + [[package]] name = "percent-encoding" -version = "2.3.0" +version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94" +checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" [[package]] name = "pico-args" @@ -2841,6 +3384,16 @@ dependencies = [ "rustfft", ] +[[package]] +name = "pkcs8" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7" +dependencies = [ + "der", + "spki", +] + [[package]] name = "pkg-config" version = "0.3.27" @@ -2881,6 +3434,30 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22686f4785f02a4fcc856d3b3bb19bf6c8160d103f7a99cc258bddd0251dc7f2" +[[package]] +name = "polyval" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d1fe60d06143b2430aa532c94cfe9e29783047f06c0d7fd359a9a51b729fa25" +dependencies = [ + "cfg-if", + "cpufeatures", + "opaque-debug", + "universal-hash", +] + +[[package]] +name = "portable-atomic" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc9c68a3f6da06753e9335d63e27f6b9754dd1920d941135b7ea8224f141adb2" + +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -2912,6 +3489,15 @@ dependencies = [ "num-integer", ] +[[package]] +name = "primeorder" +version = "0.13.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "353e1ca18966c16d9deb1c69278edbc5f194139612772bd9537af60ac231e1e6" +dependencies = [ + "elliptic-curve", +] + [[package]] name = "proc-macro-crate" version = "3.2.0" @@ -3039,6 +3625,20 @@ dependencies = [ "num_cpus", ] +[[package]] +name = "rcgen" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54077e1872c46788540de1ea3d7f4ccb1983d12f9aa909b234468676c1a36779" +dependencies = [ + "pem", + "ring", + "rustls-pki-types", + "time", + "x509-parser", + "yasna", +] + [[package]] name = "rctree" version = "0.5.0" @@ -3214,6 +3814,16 @@ dependencies = [ "mpeg4-audio-const", ] +[[package]] +name = "rfc6979" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8dd2a808d456c4a54e300a23e9f5a67e122c3024119acbfd73e3bf664491cb2" +dependencies = [ + "hmac", + "subtle", +] + [[package]] name = "rgb" version = "0.8.36" @@ -3255,7 +3865,18 @@ checksum = "33648a781874466a62d89e265fee9f17e32bc7d05a256e6cca41bf97eadcd8aa" dependencies = [ "bytes", "thiserror", - "webrtc-util", + "webrtc-util 0.8.1", +] + +[[package]] +name = "rtcp" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc9f775ff89c5fe7f0cc0abafb7c57688ae25ce688f1a52dd88e277616c76ab2" +dependencies = [ + "bytes", + "thiserror", + "webrtc-util 0.9.0", ] [[package]] @@ -3268,7 +3889,21 @@ dependencies = [ "rand", "serde", "thiserror", - "webrtc-util", + "webrtc-util 0.8.1", +] + +[[package]] +name = "rtp" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6870f09b5db96f8b9e7290324673259fd15519ebb7d55acf8e7eb044a9ead6af" +dependencies = [ + "bytes", + "portable-atomic", + "rand", + "serde", + "thiserror", + "webrtc-util 0.9.0", ] [[package]] @@ -3301,6 +3936,15 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "583034fd73374156e66797ed8e5b0d5690409c9226b22d87cb7f19821c05d152" +[[package]] +name = "rustc_version" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92" +dependencies = [ + "semver", +] + [[package]] name = "rustfft" version = "6.2.0" @@ -3316,6 +3960,15 @@ dependencies = [ "version_check", ] +[[package]] +name = "rusticata-macros" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "faf0c4a6ece9950b9abdb62b1cfcf2a68b3b67a10ba445b3bb85be2a293d0632" +dependencies = [ + "nom", +] + [[package]] name = "rustix" version = "0.38.34" @@ -3336,6 +3989,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c58f8c84392efc0a126acce10fa59ff7b3d2ac06ab451a33f2741989b806b044" dependencies = [ "once_cell", + "ring", "rustls-pki-types", "rustls-webpki", "subtle", @@ -3495,6 +4149,32 @@ dependencies = [ "tiny-skia 0.11.4", ] +[[package]] +name = "sdp" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13254db766b17451aced321e7397ebf0a446ef0c8d2942b6e67a95815421093f" +dependencies = [ + "rand", + "substring", + "thiserror", + "url", +] + +[[package]] +name = "sec1" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3e97a565f76233a6003f9f5c54be1d9c5bdfa3eccfb189469f11ec4901c47dc" +dependencies = [ + "base16ct", + "der", + "generic-array", + "pkcs8", + "subtle", + "zeroize", +] + [[package]] name = "security-framework" version = "2.9.2" @@ -3524,6 +4204,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d369a96f978623eb3dc28807c4852d6cc617fed53da5d3c400feff1ef34a714a" +[[package]] +name = "semver" +version = "1.0.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" + [[package]] name = "serde" version = "1.0.208" @@ -3611,6 +4297,17 @@ dependencies = [ "digest", ] +[[package]] +name = "sha2" +version = "0.10.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sharded-slab" version = "0.1.7" @@ -3658,6 +4355,16 @@ dependencies = [ "libc", ] +[[package]] +name = "signature" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de" +dependencies = [ + "digest", + "rand_core", +] + [[package]] name = "simba" version = "0.8.1" @@ -3788,6 +4495,16 @@ dependencies = [ "bitflags 2.6.0", ] +[[package]] +name = "spki" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d91ed6c858b01f942cd56b37a94b3e0a1798290327d1236e4d9cf4eaca44d29d" +dependencies = [ + "base64ct", + "der", +] + [[package]] name = "static_assertions" version = "1.1.0" @@ -3815,6 +4532,34 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" +[[package]] +name = "stun" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28fad383a1cc63ae141e84e48eaef44a1063e9d9e55bcb8f51a99b886486e01b" +dependencies = [ + "base64 0.21.2", + "crc", + "lazy_static", + "md-5", + "rand", + "ring", + "subtle", + "thiserror", + "tokio", + "url", + "webrtc-util 0.9.0", +] + +[[package]] +name = "substring" +version = "1.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42ee6433ecef213b2e72f587ef64a2f5943e7cd16fbd82dbe8bc07486c534c86" +dependencies = [ + "autocfg", +] + [[package]] name = "subtle" version = "2.6.1" @@ -3885,6 +4630,29 @@ dependencies = [ "futures-core", ] +[[package]] +name = "synstructure" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f36bdaa60a83aca3921b5259d5400cbf5e90fc51931376a9bd4a0eb79aa7210f" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", + "unicode-xid", +] + +[[package]] +name = "synstructure" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.75", +] + [[package]] name = "sys-locale" version = "0.3.1" @@ -3978,6 +4746,37 @@ dependencies = [ "weezl", ] +[[package]] +name = "time" +version = "0.3.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5dfd88e563464686c916c7e46e623e520ddc6d79fa6641390f2e3fa86e83e885" +dependencies = [ + "deranged", + "itoa", + "num-conv", + "powerfmt", + "serde", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" + +[[package]] +name = "time-macros" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f252a68540fde3a3877aeea552b832b40ab9a69e318efd078774a01ddee1ccf" +dependencies = [ + "num-conv", + "time-core", +] + [[package]] name = "tiny-skia" version = "0.10.0" @@ -4362,6 +5161,27 @@ dependencies = [ "utf-8", ] +[[package]] +name = "turn" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b000cebd930420ac1ed842c8128e3b3412512dfd5b82657eab035a3f5126acc" +dependencies = [ + "async-trait", + "base64 0.21.2", + "futures", + "log", + "md-5", + "portable-atomic", + "rand", + "ring", + "stun", + "thiserror", + "tokio", + "tokio-util", + "webrtc-util 0.9.0", +] + [[package]] name = "typenum" version = "1.16.0" @@ -4461,6 +5281,16 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c" +[[package]] +name = "universal-hash" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc1de2c688dc15305988b563c3854064043356019f97a4b46276fe734c4f07ea" +dependencies = [ + "crypto-common", + "subtle", +] + [[package]] name = "untrusted" version = "0.9.0" @@ -4469,9 +5299,9 @@ checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" [[package]] name = "url" -version = "2.4.1" +version = "2.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "143b538f18257fac9cad154828a57c6bf5157e1aa604d4816b5995bf6de87ae5" +checksum = "22784dbdf76fdde8af1aeda5622b546b422b6fc585325248a2bf9f5e41e94d6c" dependencies = [ "form_urlencoded", "idna", @@ -4551,6 +5381,15 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "uuid" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314" +dependencies = [ + "getrandom", +] + [[package]] name = "valuable" version = "0.1.0" @@ -4599,6 +5438,15 @@ dependencies = [ "winit", ] +[[package]] +name = "waitgroup" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1f50000a783467e6c0200f9d10642f4bc424e39efc1b770203e88b488f79292" +dependencies = [ + "atomic-waker", +] + [[package]] name = "walkdir" version = "2.5.0" @@ -4831,6 +5679,194 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webrtc" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8b3a840e31c969844714f93b5a87e73ee49f3bc2a4094ab9132c69497eb31db" +dependencies = [ + "arc-swap", + "async-trait", + "bytes", + "cfg-if", + "hex", + "interceptor", + "lazy_static", + "log", + "portable-atomic", + "rand", + "rcgen", + "regex", + "ring", + "rtcp 0.11.0", + "rtp 0.11.0", + "rustls", + "sdp", + "serde", + "serde_json", + "sha2", + "smol_str", + "stun", + "thiserror", + "time", + "tokio", + "turn", + "url", + "waitgroup", + "webrtc-data", + "webrtc-dtls", + "webrtc-ice", + "webrtc-mdns", + "webrtc-media", + "webrtc-sctp", + "webrtc-srtp", + "webrtc-util 0.9.0", +] + +[[package]] +name = "webrtc-data" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8b7c550f8d35867b72d511640adf5159729b9692899826fe00ba7fa74f0bf70" +dependencies = [ + "bytes", + "log", + "portable-atomic", + "thiserror", + "tokio", + "webrtc-sctp", + "webrtc-util 0.9.0", +] + +[[package]] +name = "webrtc-dtls" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86e5eedbb0375aa04da93fc3a189b49ed3ed9ee844b6997d5aade14fc3e2c26e" +dependencies = [ + "aes", + "aes-gcm", + "async-trait", + "bincode", + "byteorder", + "cbc", + "ccm", + "der-parser 8.2.0", + "hkdf", + "hmac", + "log", + "p256", + "p384", + "portable-atomic", + "rand", + "rand_core", + "rcgen", + "ring", + "rustls", + "sec1", + "serde", + "sha1", + "sha2", + "subtle", + "thiserror", + "tokio", + "webrtc-util 0.9.0", + "x25519-dalek", + "x509-parser", +] + +[[package]] +name = "webrtc-ice" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d4f0ca6d4df8d1bdd34eece61b51b62540840b7a000397bcfb53a7bfcf347c8" +dependencies = [ + "arc-swap", + "async-trait", + "crc", + "log", + "portable-atomic", + "rand", + "serde", + "serde_json", + "stun", + "thiserror", + "tokio", + "turn", + "url", + "uuid", + "waitgroup", + "webrtc-mdns", + "webrtc-util 0.9.0", +] + +[[package]] +name = "webrtc-mdns" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0804694f3b2acfdff48f6df217979b13cb0a00377c63b5effd111daaee7e8c4" +dependencies = [ + "log", + "socket2", + "thiserror", + "tokio", + "webrtc-util 0.9.0", +] + +[[package]] +name = "webrtc-media" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c15b20e98167b22949abc1c20eca7c6d814307d187068fe7a48f0b87a4f6d46" +dependencies = [ + "byteorder", + "bytes", + "rand", + "rtp 0.11.0", + "thiserror", +] + +[[package]] +name = "webrtc-sctp" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d850daa68639b9d7bb16400676e97525d1e52b15b4928240ae2ba0e849817a5" +dependencies = [ + "arc-swap", + "async-trait", + "bytes", + "crc", + "log", + "portable-atomic", + "rand", + "thiserror", + "tokio", + "webrtc-util 0.9.0", +] + +[[package]] +name = "webrtc-srtp" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbec5da43a62c228d321d93fb12cc9b4d9c03c9b736b0c215be89d8bd0774cfe" +dependencies = [ + "aead", + "aes", + "aes-gcm", + "byteorder", + "bytes", + "ctr", + "hmac", + "log", + "rtcp 0.11.0", + "rtp 0.11.0", + "sha1", + "subtle", + "thiserror", + "tokio", + "webrtc-util 0.9.0", +] + [[package]] name = "webrtc-util" version = "0.8.1" @@ -4851,6 +5887,27 @@ dependencies = [ "winapi", ] +[[package]] +name = "webrtc-util" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc8d9bc631768958ed97b8d68b5d301e63054ae90b09083d43e2fefb939fd77e" +dependencies = [ + "async-trait", + "bitflags 1.3.2", + "bytes", + "ipnet", + "lazy_static", + "libc", + "log", + "nix 0.26.4", + "portable-atomic", + "rand", + "thiserror", + "tokio", + "winapi", +] + [[package]] name = "weezl" version = "0.1.7" @@ -5425,6 +6482,36 @@ version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec107c4503ea0b4a98ef47356329af139c0a4f7750e621cf2973cd3385ebcb3d" +[[package]] +name = "x25519-dalek" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7e468321c81fb07fa7f4c636c3972b9100f0346e5b6a9f2bd0603a52f7ed277" +dependencies = [ + "curve25519-dalek", + "rand_core", + "serde", + "zeroize", +] + +[[package]] +name = "x509-parser" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcbc162f30700d6f3f82a24bf7cc62ffe7caea42c0b2cba8bf7f3ae50cf51f69" +dependencies = [ + "asn1-rs 0.6.2", + "data-encoding", + "der-parser 9.0.0", + "lazy_static", + "nom", + "oid-registry", + "ring", + "rusticata-macros", + "thiserror", + "time", +] + [[package]] name = "xcursor" version = "0.3.8" @@ -5468,6 +6555,15 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec7a2a501ed189703dba8b08142f057e887dfc4b2cc4db2d343ac6376ba3e0b9" +[[package]] +name = "yasna" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e17bb3549cc1321ae1296b9cdc2698e2b6cb1992adfa19a8c72e5b7a738f44cd" +dependencies = [ + "time", +] + [[package]] name = "yazi" version = "0.1.6" @@ -5505,6 +6601,20 @@ name = "zeroize" version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" +dependencies = [ + "zeroize_derive", +] + +[[package]] +name = "zeroize_derive" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.75", +] [[package]] name = "zune-inflate" diff --git a/compositor_api/Cargo.toml b/compositor_api/Cargo.toml index 69af23089..282e0a697 100644 --- a/compositor_api/Cargo.toml +++ b/compositor_api/Cargo.toml @@ -14,6 +14,7 @@ compositor_render = { workspace = true } serde = { workspace = true } schemars = { workspace = true } bytes = { workspace = true } +tracing = { workspace = true } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] compositor_pipeline = { workspace = true } diff --git a/compositor_api/src/types.rs b/compositor_api/src/types.rs index b49cb496d..84a263d1e 100644 --- a/compositor_api/src/types.rs +++ b/compositor_api/src/types.rs @@ -40,6 +40,7 @@ pub use component::WebView; pub use register_input::Mp4Input; pub use register_output::Mp4Output; pub use register_output::RtpOutput; +pub use register_output::WhipOutput; pub use register_input::DeckLink; pub use register_input::RtpInput; diff --git a/compositor_api/src/types/from_register_output.rs b/compositor_api/src/types/from_register_output.rs index c4dae365b..8aba4815b 100644 --- a/compositor_api/src/types/from_register_output.rs +++ b/compositor_api/src/types/from_register_output.rs @@ -1,3 +1,4 @@ +use axum::http::HeaderValue; use compositor_pipeline::pipeline::{ self, encoder::{ @@ -9,6 +10,7 @@ use compositor_pipeline::pipeline::{ output::{ self, mp4::{Mp4AudioTrack, Mp4OutputOptions, Mp4VideoTrack}, + whip::WhipAudioOptions, }, }; @@ -173,6 +175,88 @@ impl TryFrom for pipeline::RegisterOutputOptions for pipeline::RegisterOutputOptions { + type Error = TypeError; + + fn try_from(request: WhipOutput) -> Result { + let WhipOutput { + endpoint_url, + bearer_token, + video, + audio, + } = request; + + if video.is_none() && audio.is_none() { + return Err(TypeError::new( + "At least one of \"video\" and \"audio\" fields have to be specified.", + )); + } + let video_codec = video.as_ref().map(|v| match v.encoder { + VideoEncoderOptions::FfmpegH264 { .. } => pipeline::VideoCodec::H264, + }); + let audio_options = audio.as_ref().map(|a| match &a.encoder { + WhipAudioEncoderOptions::Opus { + channels, + preset: _, + } => WhipAudioOptions { + codec: pipeline::AudioCodec::Opus, + channels: match channels { + audio::AudioChannels::Mono => { + compositor_pipeline::audio_mixer::AudioChannels::Mono + } + audio::AudioChannels::Stereo => { + compositor_pipeline::audio_mixer::AudioChannels::Stereo + } + }, + }, + }); + + if let Some(token) = &bearer_token { + if HeaderValue::from_str(format!("Bearer {token}").as_str()).is_err() { + return Err(TypeError::new("Bearer token string is not valid. It must contain only 32-127 ASCII characters")); + }; + } + + let (video_encoder_options, output_video_options) = maybe_video_options(video)?; + let (audio_encoder_options, output_audio_options) = match audio { + Some(OutputWhipAudioOptions { + mixing_strategy, + send_eos_when, + encoder, + initial, + }) => { + let audio_encoder_options: AudioEncoderOptions = encoder.into(); + let output_audio_options = pipeline::OutputAudioOptions { + initial: initial.try_into()?, + end_condition: send_eos_when.unwrap_or_default().try_into()?, + mixing_strategy: mixing_strategy.unwrap_or(MixingStrategy::SumClip).into(), + channels: audio_encoder_options.channels(), + }; + + (Some(audio_encoder_options), Some(output_audio_options)) + } + None => (None, None), + }; + + let output_options = output::OutputOptions { + output_protocol: output::OutputProtocolOptions::Whip(output::whip::WhipSenderOptions { + endpoint_url, + bearer_token, + video: video_codec, + audio: audio_options, + }), + video: video_encoder_options, + audio: audio_encoder_options, + }; + + Ok(Self { + output_options, + video: output_video_options, + audio: output_audio_options, + }) + } +} + fn maybe_video_options( options: Option, ) -> Result< @@ -230,6 +314,19 @@ impl From for pipeline::encoder::AudioEncoderOptions { } } +impl From for pipeline::encoder::AudioEncoderOptions { + fn from(value: WhipAudioEncoderOptions) -> Self { + match value { + WhipAudioEncoderOptions::Opus { channels, preset } => { + AudioEncoderOptions::Opus(encoder::opus::OpusEncoderOptions { + channels: channels.into(), + preset: preset.unwrap_or(OpusEncoderPreset::Voip).into(), + }) + } + } + } +} + impl TryFrom for pipeline::PipelineOutputEndCondition { type Error = TypeError; diff --git a/compositor_api/src/types/register_output.rs b/compositor_api/src/types/register_output.rs index 617d60377..cd8cc26f8 100644 --- a/compositor_api/src/types/register_output.rs +++ b/compositor_api/src/types/register_output.rs @@ -38,6 +38,19 @@ pub struct Mp4Output { pub audio: Option, } +#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)] +#[serde(deny_unknown_fields)] +pub struct WhipOutput { + /// WHIP server endpoint + pub endpoint_url: String, + // Bearer token + pub bearer_token: Option>, + /// Video track configuration. + pub video: Option, + /// Audio track configuration. + pub audio: Option, +} + #[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)] #[serde(deny_unknown_fields)] pub struct OutputVideoOptions { @@ -77,6 +90,19 @@ pub struct OutputMp4AudioOptions { pub initial: Audio, } +#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)] +#[serde(deny_unknown_fields)] +pub struct OutputWhipAudioOptions { + /// (**default="sum_clip"**) Specifies how audio should be mixed. + pub mixing_strategy: Option, + /// Condition for termination of output stream based on the input streams states. + pub send_eos_when: Option, + /// Audio encoder options. + pub encoder: WhipAudioEncoderOptions, + /// Initial audio mixer configuration for output. + pub initial: Audio, +} + #[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)] #[serde(tag = "type", rename_all = "snake_case", deny_unknown_fields)] pub enum VideoEncoderOptions { @@ -108,6 +134,18 @@ pub enum Mp4AudioEncoderOptions { Aac { channels: AudioChannels }, } +#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)] +#[serde(tag = "type", rename_all = "snake_case", deny_unknown_fields)] +pub enum WhipAudioEncoderOptions { + Opus { + /// Specifies channels configuration. + channels: AudioChannels, + + /// (**default="voip"**) Specifies preset for audio output encoder. + preset: Option, + }, +} + /// This type defines when end of an input stream should trigger end of the output stream. Only one of those fields can be set at the time. /// Unless specified otherwise the input stream is considered finished/ended when: /// - TCP connection was dropped/closed. diff --git a/compositor_pipeline/Cargo.toml b/compositor_pipeline/Cargo.toml index 488d769cd..37c905f98 100644 --- a/compositor_pipeline/Cargo.toml +++ b/compositor_pipeline/Cargo.toml @@ -31,6 +31,11 @@ rubato = "0.15.0" wgpu = { workspace = true } vk-video = { path = "../vk-video/", optional = true } glyphon = { workspace = true } +webrtc = "0.11.0" +tokio = {workspace = true } +serde_json = { workspace = true } +serde = { workspace = true } +url = "2.5.2" [target.x86_64-unknown-linux-gnu.dependencies] decklink = { path = "../decklink", optional = true } diff --git a/compositor_pipeline/src/error.rs b/compositor_pipeline/src/error.rs index 03dfcace6..3f8d95c7a 100644 --- a/compositor_pipeline/src/error.rs +++ b/compositor_pipeline/src/error.rs @@ -6,7 +6,7 @@ use compositor_render::{ InputId, OutputId, }; -use crate::pipeline::{decoder::AacDecoderError, VideoCodec}; +use crate::pipeline::{decoder::AacDecoderError, output::whip, VideoCodec}; use fdk_aac_sys as fdk; #[derive(Debug, thiserror::Error)] @@ -20,6 +20,9 @@ pub enum InitPipelineError { #[cfg(feature = "vk-video")] #[error(transparent)] VulkanCtxError(#[from] vk_video::VulkanCtxError), + + #[error("Failed to create tokio::Runtime.")] + CreateTokioRuntime(#[source] std::io::Error), } #[derive(Debug, thiserror::Error)] @@ -90,6 +93,15 @@ pub enum OutputInitError { #[error("Failed to register output. FFmpeg error: {0}.")] FfmpegMp4Error(ffmpeg_next::Error), + + #[error("Unkown Whip output error.")] + UnknownWhipError, + + #[error("Whip init timeout exceeded")] + WhipInitTimeout, + + #[error("Failed to init whip output")] + WhipInitError(#[source] Box), } #[derive(Debug, thiserror::Error)] diff --git a/compositor_pipeline/src/pipeline.rs b/compositor_pipeline/src/pipeline.rs index cbadbd247..fe0bf3211 100644 --- a/compositor_pipeline/src/pipeline.rs +++ b/compositor_pipeline/src/pipeline.rs @@ -25,6 +25,8 @@ use input::RawDataInputOptions; use output::EncodedDataOutputOptions; use output::OutputOptions; use output::RawDataOutputOptions; +use pipeline_output::register_pipeline_output; +use tokio::runtime::Runtime; use tracing::{error, info, trace, warn}; use types::RawDataSender; @@ -121,17 +123,21 @@ pub struct Options { pub force_gpu: bool, pub download_root: PathBuf, pub output_sample_rate: u32, + pub stun_servers: Arc>, pub wgpu_features: WgpuFeatures, pub load_system_fonts: Option, pub wgpu_ctx: Option, + pub tokio_rt: Option>, } #[derive(Clone)] pub struct PipelineCtx { pub output_sample_rate: u32, pub output_framerate: Framerate, + pub stun_servers: Arc>, pub download_dir: Arc, pub event_emitter: Arc, + pub tokio_rt: Arc, #[cfg(feature = "vk-video")] pub vulkan_ctx: Option, } @@ -181,6 +187,10 @@ impl Pipeline { .join(format!("live-compositor-{}", rand::random::())); std::fs::create_dir_all(&download_dir).map_err(InitPipelineError::CreateDownloadDir)?; + let tokio_rt = match opts.tokio_rt { + Some(tokio_rt) => tokio_rt, + None => Arc::new(Runtime::new().map_err(InitPipelineError::CreateTokioRuntime)?), + }; let event_emitter = Arc::new(EventEmitter::new()); let pipeline = Pipeline { outputs: HashMap::new(), @@ -192,8 +202,10 @@ impl Pipeline { ctx: PipelineCtx { output_sample_rate: opts.output_sample_rate, output_framerate: opts.queue_options.output_framerate, + stun_servers: opts.stun_servers, download_dir: download_dir.into(), event_emitter, + tokio_rt, #[cfg(feature = "vk-video")] vulkan_ctx: preinitialized_ctx.and_then(|ctx| ctx.vulkan_ctx), }, @@ -252,11 +264,12 @@ impl Pipeline { } pub fn register_output( - &mut self, + pipeline: &Arc>, output_id: OutputId, register_options: RegisterOutputOptions, ) -> Result, RegisterOutputError> { - self.register_pipeline_output( + register_pipeline_output( + pipeline, output_id, ®ister_options.output_options, register_options.video, @@ -265,11 +278,12 @@ impl Pipeline { } pub fn register_encoded_data_output( - &mut self, + pipeline: &Arc>, output_id: OutputId, register_options: RegisterOutputOptions, ) -> Result, RegisterOutputError> { - self.register_pipeline_output( + register_pipeline_output( + pipeline, output_id, ®ister_options.output_options, register_options.video, @@ -278,11 +292,12 @@ impl Pipeline { } pub fn register_raw_data_output( - &mut self, + pipeline: &Arc>, output_id: OutputId, register_options: RegisterOutputOptions, ) -> Result { - self.register_pipeline_output( + register_pipeline_output( + pipeline, output_id, ®ister_options.output_options, register_options.video, diff --git a/compositor_pipeline/src/pipeline/encoder.rs b/compositor_pipeline/src/pipeline/encoder.rs index a770feb36..136542d23 100644 --- a/compositor_pipeline/src/pipeline/encoder.rs +++ b/compositor_pipeline/src/pipeline/encoder.rs @@ -100,6 +100,16 @@ impl Encoder { } } + pub fn keyframe_request_sender(&self) -> Option> { + match self.video.as_ref() { + Some(VideoEncoder::H264(encoder)) => Some(encoder.keyframe_request_sender().clone()), + None => { + error!("Non video encoder received keyframe request."); + None + } + } + } + pub fn samples_batch_sender(&self) -> Option<&Sender>> { match &self.audio { Some(encoder) => Some(encoder.samples_batch_sender()), @@ -138,9 +148,9 @@ impl VideoEncoder { } } - pub fn request_keyframe(&self) { + pub fn keyframe_request_sender(&self) -> Sender<()> { match self { - Self::H264(encoder) => encoder.request_keyframe(), + Self::H264(encoder) => encoder.keyframe_request_sender(), } } } diff --git a/compositor_pipeline/src/pipeline/encoder/ffmpeg_h264.rs b/compositor_pipeline/src/pipeline/encoder/ffmpeg_h264.rs index 9eeb2339c..ad3eb22d1 100644 --- a/compositor_pipeline/src/pipeline/encoder/ffmpeg_h264.rs +++ b/compositor_pipeline/src/pipeline/encoder/ffmpeg_h264.rs @@ -149,10 +149,8 @@ impl LibavH264Encoder { self.resolution } - pub fn request_keyframe(&self) { - if let Err(err) = self.keyframe_req_sender.send(()) { - debug!(%err, "Failed to send keyframe request to the encoder."); - } + pub fn keyframe_request_sender(&self) -> Sender<()> { + self.keyframe_req_sender.clone() } } diff --git a/compositor_pipeline/src/pipeline/output.rs b/compositor_pipeline/src/pipeline/output.rs index e9e988ad0..e9a1105ae 100644 --- a/compositor_pipeline/src/pipeline/output.rs +++ b/compositor_pipeline/src/pipeline/output.rs @@ -1,8 +1,11 @@ +use std::sync::Arc; + use compositor_render::{ error::RequestKeyframeError, Frame, OutputFrameFormat, OutputId, Resolution, }; use crossbeam_channel::{bounded, Receiver, Sender}; use mp4::{Mp4FileWriter, Mp4OutputOptions}; +use tracing::debug; use crate::{audio_mixer::OutputSamples, error::RegisterOutputError, queue::PipelineEvent}; @@ -13,9 +16,11 @@ use super::{ types::EncoderOutputEvent, PipelineCtx, Port, RawDataReceiver, }; +use whip::{WhipSender, WhipSenderOptions}; pub mod mp4; pub mod rtp; +pub mod whip; /// Options to configure public outputs that can be constructed via REST API #[derive(Debug, Clone)] @@ -29,6 +34,7 @@ pub struct OutputOptions { pub enum OutputProtocolOptions { Rtp(RtpSenderOptions), Mp4(Mp4OutputOptions), + Whip(WhipSenderOptions), } /// Options to configure output that sends h264 and opus audio via channel @@ -68,6 +74,10 @@ pub enum Output { writer: Mp4FileWriter, encoder: Encoder, }, + Whip { + sender: WhipSender, + encoder: Encoder, + }, EncodedData { encoder: Encoder, }, @@ -82,7 +92,7 @@ pub(super) trait OutputOptionsExt { fn new_output( &self, output_id: &OutputId, - ctx: &PipelineCtx, + ctx: Arc, ) -> Result<(Output, NewOutputResult), RegisterOutputError>; } @@ -90,7 +100,7 @@ impl OutputOptionsExt> for OutputOptions { fn new_output( &self, output_id: &OutputId, - ctx: &PipelineCtx, + ctx: Arc, ) -> Result<(Output, Option), RegisterOutputError> { let encoder_opts = EncoderOptions { video: self.video.clone(), @@ -114,6 +124,18 @@ impl OutputOptionsExt> for OutputOptions { Ok((Output::Mp4 { writer, encoder }, None)) } + OutputProtocolOptions::Whip(whip_options) => { + let sender = whip::WhipSender::new( + output_id, + whip_options.clone(), + packets, + encoder.keyframe_request_sender(), + ctx, + ) + .map_err(|e| RegisterOutputError::OutputError(output_id.clone(), e))?; + + Ok((Output::Whip { sender, encoder }, None)) + } } } } @@ -122,7 +144,7 @@ impl OutputOptionsExt> for EncodedDataOutputOptions fn new_output( &self, output_id: &OutputId, - ctx: &PipelineCtx, + ctx: Arc, ) -> Result<(Output, Receiver), RegisterOutputError> { let encoder_opts = EncoderOptions { video: self.video.clone(), @@ -140,7 +162,7 @@ impl OutputOptionsExt for RawDataOutputOptions { fn new_output( &self, _output_id: &OutputId, - _ctx: &PipelineCtx, + _ctx: Arc, ) -> Result<(Output, RawDataReceiver), RegisterOutputError> { let (video_sender, video_receiver, resolution) = match &self.video { Some(opts) => { @@ -175,6 +197,7 @@ impl Output { match &self { Output::Rtp { encoder, .. } => encoder.frame_sender(), Output::Mp4 { encoder, .. } => encoder.frame_sender(), + Output::Whip { encoder, .. } => encoder.frame_sender(), Output::EncodedData { encoder } => encoder.frame_sender(), Output::RawData { video, .. } => video.as_ref(), } @@ -184,6 +207,7 @@ impl Output { match &self { Output::Rtp { encoder, .. } => encoder.samples_batch_sender(), Output::Mp4 { encoder, .. } => encoder.samples_batch_sender(), + Output::Whip { encoder, .. } => encoder.samples_batch_sender(), Output::EncodedData { encoder } => encoder.samples_batch_sender(), Output::RawData { audio, .. } => audio.as_ref(), } @@ -193,6 +217,7 @@ impl Output { match &self { Output::Rtp { encoder, .. } => encoder.video.as_ref().map(|v| v.resolution()), Output::Mp4 { encoder, .. } => encoder.video.as_ref().map(|v| v.resolution()), + Output::Whip { encoder, .. } => encoder.video.as_ref().map(|v| v.resolution()), Output::EncodedData { encoder } => encoder.video.as_ref().map(|v| v.resolution()), Output::RawData { resolution, .. } => *resolution, } @@ -202,15 +227,21 @@ impl Output { let encoder = match &self { Output::Rtp { encoder, .. } => encoder, Output::Mp4 { encoder, .. } => encoder, + Output::Whip { encoder, .. } => encoder, Output::EncodedData { encoder } => encoder, Output::RawData { .. } => return Err(RequestKeyframeError::RawOutput(output_id)), }; - encoder + if encoder .video .as_ref() .ok_or(RequestKeyframeError::NoVideoOutput(output_id))? - .request_keyframe(); + .keyframe_request_sender() + .send(()) + .is_err() + { + debug!("Failed to send keyframe request to the encoder. Channel closed."); + }; Ok(()) } @@ -232,6 +263,10 @@ impl Output { .video .as_ref() .map(|_| OutputFrameFormat::PlanarYuv420Bytes), + Output::Whip { encoder, .. } => encoder + .video + .as_ref() + .map(|_| OutputFrameFormat::PlanarYuv420Bytes), } } } diff --git a/compositor_pipeline/src/pipeline/output/mp4.rs b/compositor_pipeline/src/pipeline/output/mp4.rs index bba8c80c1..147eae348 100644 --- a/compositor_pipeline/src/pipeline/output/mp4.rs +++ b/compositor_pipeline/src/pipeline/output/mp4.rs @@ -1,4 +1,4 @@ -use std::{fs, path::PathBuf, ptr, time::Duration}; +use std::{fs, path::PathBuf, ptr, sync::Arc, time::Duration}; use compositor_render::OutputId; use crossbeam_channel::Receiver; @@ -48,7 +48,7 @@ impl Mp4FileWriter { output_id: OutputId, options: Mp4OutputOptions, packets_receiver: Receiver, - pipeline_ctx: &PipelineCtx, + pipeline_ctx: Arc, ) -> Result { if options.output_path.exists() { let mut old_index = 0; diff --git a/compositor_pipeline/src/pipeline/output/rtp.rs b/compositor_pipeline/src/pipeline/output/rtp.rs index b898e3f45..540a7e136 100644 --- a/compositor_pipeline/src/pipeline/output/rtp.rs +++ b/compositor_pipeline/src/pipeline/output/rtp.rs @@ -50,7 +50,7 @@ impl RtpSender { output_id: &OutputId, options: RtpSenderOptions, packets_receiver: Receiver, - pipeline_ctx: &PipelineCtx, + pipeline_ctx: Arc, ) -> Result<(Self, Port), OutputInitError> { let payloader = Payloader::new(options.video, options.audio); let mtu = match options.connection_options { diff --git a/compositor_pipeline/src/pipeline/output/whip.rs b/compositor_pipeline/src/pipeline/output/whip.rs new file mode 100644 index 000000000..884e35bb4 --- /dev/null +++ b/compositor_pipeline/src/pipeline/output/whip.rs @@ -0,0 +1,288 @@ +use compositor_render::OutputId; +use crossbeam_channel::{Receiver, Sender}; +use establish_peer_connection::connect; +use init_peer_connection::init_peer_connection; +use packet_stream::PacketStream; +use payloader::{Payload, Payloader, PayloadingError}; +use reqwest::{Method, StatusCode}; +use std::{ + sync::{atomic::AtomicBool, Arc}, + thread, + time::{Duration, Instant}, +}; +use tokio::sync::oneshot; +use tracing::{debug, error, span, Instrument, Level}; +use url::{ParseError, Url}; +use webrtc::track::track_local::TrackLocalWriter; + +use crate::{ + audio_mixer::AudioChannels, + error::OutputInitError, + event::Event, + pipeline::{AudioCodec, EncoderOutputEvent, PipelineCtx, VideoCodec}, +}; + +mod establish_peer_connection; +mod init_peer_connection; +mod packet_stream; +mod payloader; + +#[derive(Debug)] +pub struct WhipSender { + pub connection_options: WhipSenderOptions, + should_close: Arc, +} + +#[derive(Debug, Clone)] +pub struct WhipSenderOptions { + pub endpoint_url: String, + pub bearer_token: Option>, + pub video: Option, + pub audio: Option, +} + +#[derive(Debug, Clone, Copy)] +pub struct WhipAudioOptions { + pub codec: AudioCodec, + pub channels: AudioChannels, +} + +#[derive(Debug, Clone)] +pub struct WhipCtx { + output_id: OutputId, + options: WhipSenderOptions, + request_keyframe_sender: Option>, + should_close: Arc, + pipeline_ctx: Arc, +} + +#[derive(Debug, thiserror::Error)] +pub enum WhipError { + #[error("Bad status in WHIP response\nStatus: {0}\nBody: {1}")] + BadStatus(StatusCode, String), + + #[error("WHIP request failed!\nMethod: {0}\nURL: {1}")] + RequestFailed(Method, Url), + + #[error( + "Unable to get location endpoint, check correctness of WHIP endpoint and your Bearer token" + )] + MissingLocationHeader, + + #[error("Invalid endpoint URL: {1}")] + InvalidEndpointUrl(#[source] ParseError, String), + + #[error("Missing Host in endpoint URL")] + MissingHost, + + #[error("Missing port in endpoint URL")] + MissingPort, + + #[error("Failed to create RTC session description: {0}")] + RTCSessionDescriptionError(webrtc::Error), + + #[error("Failed to set local description: {0}")] + LocalDescriptionError(webrtc::Error), + + #[error("Failed to set remote description: {0}")] + RemoteDescriptionError(webrtc::Error), + + #[error("Failed to parse {0} response body: {1}")] + BodyParsingError(&'static str, reqwest::Error), + + #[error("Failed to create offer: {0}")] + OfferCreationError(webrtc::Error), + + #[error(transparent)] + PeerConnectionInitError(#[from] webrtc::Error), + + #[error("Failed to convert ICE candidate to JSON: {0}")] + IceCandidateToJsonError(webrtc::Error), + + #[error(transparent)] + SerdeJsonError(#[from] serde_json::Error), + + #[error(transparent)] + PayloadingError(#[from] PayloadingError), + + #[error("Trickle ICE not supported")] + TrickleIceNotSupported, + + #[error("Entity Tag missing")] + EntityTagMissing, + + #[error("Entity Tag non-matching")] + EntityTagNonMatching, + + #[error("Codec not supported: {0}")] + UnsupportedCodec(&'static str), +} + +const WHIP_INIT_TIMEOUT: Duration = Duration::from_secs(60); + +impl WhipSender { + pub fn new( + output_id: &OutputId, + options: WhipSenderOptions, + packets_receiver: Receiver, + request_keyframe_sender: Option>, + pipeline_ctx: Arc, + ) -> Result { + let payloader = Payloader::new(options.video, options.audio); + let packet_stream = PacketStream::new(packets_receiver, payloader, 1400); + let should_close = Arc::new(AtomicBool::new(false)); + let (init_confirmation_sender, mut init_confirmation_receiver) = + oneshot::channel::>(); + + let whip_ctx = WhipCtx { + output_id: output_id.clone(), + options: options.clone(), + request_keyframe_sender, + should_close: should_close.clone(), + pipeline_ctx: pipeline_ctx.clone(), + }; + + pipeline_ctx.tokio_rt.spawn( + run_whip_sender_task(whip_ctx, packet_stream, init_confirmation_sender).instrument( + span!( + Level::INFO, + "WHIP sender", + output_id = output_id.to_string() + ), + ), + ); + + let start_time = Instant::now(); + loop { + thread::sleep(Duration::from_millis(500)); + let elapsed_time = Instant::now().duration_since(start_time); + if elapsed_time > WHIP_INIT_TIMEOUT { + init_confirmation_receiver.close(); + return Err(OutputInitError::WhipInitTimeout); + } + match init_confirmation_receiver.try_recv() { + Ok(result) => match result { + Ok(_) => break, + Err(err) => return Err(OutputInitError::WhipInitError(err.into())), + }, + Err(err) => match err { + oneshot::error::TryRecvError::Closed => { + return Err(OutputInitError::UnknownWhipError) + } + oneshot::error::TryRecvError::Empty => {} + }, + }; + } + + Ok(Self { + connection_options: options, + should_close, + }) + } +} + +impl Drop for WhipSender { + fn drop(&mut self) { + self.should_close + .store(true, std::sync::atomic::Ordering::Relaxed); + } +} + +async fn run_whip_sender_task( + whip_ctx: WhipCtx, + packet_stream: PacketStream, + init_confirmation_sender: oneshot::Sender>, +) { + let client = Arc::new(reqwest::Client::new()); + let (peer_connection, video_track, audio_track) = match init_peer_connection(&whip_ctx).await { + Ok(pc) => pc, + Err(err) => { + if let Err(Err(err)) = init_confirmation_sender.send(Err(err)) { + error!( + "Error while initializing whip sender thread, couldn't send message, error: {err:?}" + ); + } + return; + } + }; + let whip_session_url = match connect(peer_connection, client.clone(), &whip_ctx).await { + Ok(val) => val, + Err(err) => { + if let Err(Err(err)) = init_confirmation_sender.send(Err(err)) { + error!( + "Error while initializing whip sender thread, couldn't send message, error: {err:?}" + ); + } + return; + } + }; + if let Err(Ok(_)) = init_confirmation_sender.send(Ok(())) { + error!("Whip sender thread initialized successfully, coulnd't send confirmation message."); + return; + } + + for chunk in packet_stream { + if whip_ctx + .should_close + .load(std::sync::atomic::Ordering::Relaxed) + { + break; + } + let chunk = match chunk { + Ok(chunk) => chunk, + Err(err) => { + error!("Failed to payload a packet: {}", err); + continue; + } + }; + + match chunk { + Payload::Video(video_payload) => { + match video_track.clone() { + Some(video_track) => match video_payload { + Ok(video_bytes) => { + if video_track.write(&video_bytes).await.is_err() { + error!("Error occurred while writing to video track, closing connection."); + break; + } + } + Err(err) => { + error!("Error while reading video bytes: {err}"); + } + }, + None => { + error!("Video payload detected on output with no video, shutting down"); + break; + } + } + } + Payload::Audio(audio_payload) => { + match audio_track.clone() { + Some(audio_track) => match audio_payload { + Ok(audio_bytes) => { + if audio_track.write(&audio_bytes).await.is_err() { + error!("Error occurred while writing to audio track, closing connection."); + break; + } + } + Err(err) => { + error!("Error while audio video bytes: {err}"); + } + }, + None => { + error!("Audio payload detected on output with no audio, shutting down"); + break; + } + } + } + } + } + if let Err(err) = client.delete(whip_session_url).send().await { + error!("Error while sending delete whip session request: {}", err); + } + whip_ctx + .pipeline_ctx + .event_emitter + .emit(Event::OutputDone(whip_ctx.output_id)); + debug!("Closing WHIP sender thread.") +} diff --git a/compositor_pipeline/src/pipeline/output/whip/establish_peer_connection.rs b/compositor_pipeline/src/pipeline/output/whip/establish_peer_connection.rs new file mode 100644 index 000000000..900fd3e57 --- /dev/null +++ b/compositor_pipeline/src/pipeline/output/whip/establish_peer_connection.rs @@ -0,0 +1,241 @@ +use super::{WhipCtx, WhipError}; +use compositor_render::error::ErrorStack; +use reqwest::{ + header::{HeaderMap, HeaderValue}, + Client, Method, StatusCode, +}; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, +}; +use tracing::{debug, error, info}; +use url::{ParseError, Url}; +use webrtc::{ + ice_transport::{ice_candidate::RTCIceCandidate, ice_connection_state::RTCIceConnectionState}, + peer_connection::{sdp::session_description::RTCSessionDescription, RTCPeerConnection}, + rtcp::payload_feedbacks::picture_loss_indication::PictureLossIndication, +}; + +pub async fn connect( + peer_connection: Arc, + client: Arc, + whip_ctx: &WhipCtx, +) -> Result { + let whip_ctx = whip_ctx.clone(); + peer_connection.on_ice_connection_state_change(Box::new( + move |connection_state: RTCIceConnectionState| { + debug!("Connection State has changed {connection_state}."); + if connection_state == RTCIceConnectionState::Connected { + debug!("Ice connected."); + } else if connection_state == RTCIceConnectionState::Failed { + debug!("Ice connection failed."); + whip_ctx + .should_close + .store(true, std::sync::atomic::Ordering::Relaxed); + } + Box::pin(async {}) + }, + )); + + if let Some(keyframe_sender) = whip_ctx.request_keyframe_sender { + let senders = peer_connection.get_senders().await; + for sender in senders { + let keyframe_sender_clone = keyframe_sender.clone(); + whip_ctx.pipeline_ctx.tokio_rt.spawn(async move { + loop { + if let Ok((packets, _)) = &sender.read_rtcp().await { + for packet in packets { + if packet + .as_any() + .downcast_ref::() + .is_some() + { + if let Err(err) = keyframe_sender_clone.send(()) { + debug!(%err, "Failed to send keyframe request to the encoder."); + }; + } + } + } else { + debug!("Failed to read RTCP packets from the sender."); + } + } + }); + } + } + + let offer = peer_connection + .create_offer(None) + .await + .map_err(WhipError::OfferCreationError)?; + + let endpoint_url = Url::parse(&whip_ctx.options.endpoint_url) + .map_err(|e| WhipError::InvalidEndpointUrl(e, whip_ctx.options.endpoint_url.clone()))?; + + info!("Endpoint url: {}", endpoint_url); + + let mut header_map = HeaderMap::new(); + header_map.append("Content-Type", HeaderValue::from_static("application/sdp")); + + if let Some(token) = &whip_ctx.options.bearer_token { + let header_value_str: HeaderValue = match format!("Bearer {token}").parse() { + Ok(val) => val, + Err(err) => { + error!("Ivalid header token, couldn't parse: {}", err); + HeaderValue::from_static("Bearer") + } + }; + header_map.append("Authorization", header_value_str); + } + + let response = client + .post(endpoint_url.clone()) + .headers(header_map) + .body(offer.sdp.clone()) + .send() + .await + .map_err(|_| WhipError::RequestFailed(Method::POST, endpoint_url.clone()))?; + + let status = response.status(); + if status.is_client_error() || status.is_server_error() { + let answer = &response + .text() + .await + .map_err(|e| WhipError::BodyParsingError("sdp offer", e))?; + return Err(WhipError::BadStatus(status, answer.to_string())); + }; + + let location_url_str = response + .headers() + .get("location") + .and_then(|url| url.to_str().ok()) + .ok_or_else(|| WhipError::MissingLocationHeader)?; + + let port = endpoint_url.port().ok_or_else(|| WhipError::MissingPort)?; + let location_url = match Url::parse(location_url_str) { + Ok(url) => Ok(url), + Err(err) => match err { + ParseError::RelativeUrlWithoutBase => { + let scheme = endpoint_url.scheme(); + let host = endpoint_url + .host_str() + .ok_or_else(|| WhipError::MissingHost)?; + let formatted_url = format!("{}://{}:{}{}", scheme, host, port, location_url_str); + let location_url = Url::try_from(formatted_url.as_str()) + .map_err(|e| WhipError::InvalidEndpointUrl(e, formatted_url))?; + Ok(location_url) + } + _ => Err(WhipError::InvalidEndpointUrl( + err, + location_url_str.to_string(), + )), + }, + }?; + + peer_connection + .set_local_description(offer) + .await + .map_err(WhipError::LocalDescriptionError)?; + + let answer = response + .text() + .await + .map_err(|e| WhipError::BodyParsingError("sdp offer", e))?; + + let rtc_answer = + RTCSessionDescription::answer(answer).map_err(WhipError::RTCSessionDescriptionError)?; + + peer_connection + .set_remote_description(rtc_answer) + .await + .map_err(WhipError::RemoteDescriptionError)?; + + let should_stop = Arc::new(AtomicBool::new(false)); + let location_url_clone = location_url.clone(); + peer_connection.on_ice_candidate(Box::new(move |candidate| { + let bearer_token = whip_ctx.options.bearer_token.clone(); + let client = client.clone(); + let location_url = location_url_clone.clone(); + let should_stop_clone = should_stop.clone(); + Box::pin(async move { + if should_stop_clone.load(Ordering::Relaxed) { + return; + } + if let Some(candidate) = candidate { + if let Err(err) = + handle_candidate(candidate, bearer_token, client, location_url.clone()).await + { + match err { + WhipError::TrickleIceNotSupported => { + info!("Trickle ICE not supported by WHIP server"); + should_stop_clone.store(true, Ordering::Relaxed); + } + WhipError::EntityTagMissing | WhipError::EntityTagNonMatching => { + info!("Entity tags not supported by WHIP output"); + should_stop_clone.store(true, Ordering::Relaxed); + } + _ => error!("{}", ErrorStack::new(&err).into_string()), + } + } + } + }) + })); + + Ok(location_url) +} + +async fn handle_candidate( + candidate: RTCIceCandidate, + bearer_token: Option>, + client: Arc, + location: Url, +) -> Result<(), WhipError> { + let ice_candidate = candidate + .to_json() + .map_err(WhipError::IceCandidateToJsonError)?; + + let mut header_map = HeaderMap::new(); + header_map.append( + "Content-Type", + HeaderValue::from_static("application/trickle-ice-sdpfrag"), + ); + + if let Some(token) = bearer_token { + let header_value_str: HeaderValue = match format!("Bearer {token}").parse() { + Ok(val) => val, + Err(err) => { + error!("Ivalid header token, couldn't parse: {}", err); + HeaderValue::from_static("Bearer") + } + }; + header_map.append("Authorization", header_value_str); + } + + let response = client + .patch(location.clone()) + .headers(header_map) + .body(serde_json::to_string(&ice_candidate)?) + .send() + .await + .map_err(|_| WhipError::RequestFailed(Method::PATCH, location.clone()))?; + + let status = response.status(); + if status.is_server_error() || status.is_client_error() { + let trickle_ice_error = match status { + StatusCode::UNPROCESSABLE_ENTITY | StatusCode::METHOD_NOT_ALLOWED => { + WhipError::TrickleIceNotSupported + } + StatusCode::PRECONDITION_REQUIRED => WhipError::EntityTagMissing, + StatusCode::PRECONDITION_FAILED => WhipError::EntityTagNonMatching, + _ => { + let answer = &response + .text() + .await + .map_err(|e| WhipError::BodyParsingError("ICE Candidate", e))?; + WhipError::BadStatus(status, answer.to_string()) + } + }; + return Err(trickle_ice_error); + }; + + Ok(()) +} diff --git a/compositor_pipeline/src/pipeline/output/whip/init_peer_connection.rs b/compositor_pipeline/src/pipeline/output/whip/init_peer_connection.rs new file mode 100644 index 000000000..87ce18e5e --- /dev/null +++ b/compositor_pipeline/src/pipeline/output/whip/init_peer_connection.rs @@ -0,0 +1,159 @@ +use crate::{ + audio_mixer::AudioChannels, + pipeline::{AudioCodec, VideoCodec}, +}; + +use super::{WhipAudioOptions, WhipCtx, WhipError}; +use std::sync::Arc; +use webrtc::{ + api::{ + interceptor_registry::register_default_interceptors, + media_engine::{MediaEngine, MIME_TYPE_H264, MIME_TYPE_OPUS}, + APIBuilder, + }, + ice_transport::ice_server::RTCIceServer, + interceptor::registry::Registry, + peer_connection::{configuration::RTCConfiguration, RTCPeerConnection}, + rtp_transceiver::{ + rtp_codec::{RTCRtpCodecCapability, RTCRtpCodecParameters, RTPCodecType}, + rtp_transceiver_direction::RTCRtpTransceiverDirection, + }, + track::track_local::track_local_static_rtp::TrackLocalStaticRTP, +}; + +pub async fn init_peer_connection( + whip_ctx: &WhipCtx, +) -> Result< + ( + Arc, + Option>, + Option>, + ), + WhipError, +> { + let mut media_engine = MediaEngine::default(); + media_engine.register_default_codecs()?; + + if let Some(video) = whip_ctx.options.video { + media_engine.register_codec(video_codec_parameters(video), RTPCodecType::Video)?; + } + + if let Some(audio) = whip_ctx.options.audio { + media_engine.register_codec( + audio_codec_parameters(audio, whip_ctx.pipeline_ctx.output_sample_rate)?, + RTPCodecType::Audio, + )?; + } + let mut registry = Registry::new(); + registry = register_default_interceptors(registry, &mut media_engine)?; + let api = APIBuilder::new() + .with_media_engine(media_engine) + .with_interceptor_registry(registry) + .build(); + + let config = RTCConfiguration { + ice_servers: vec![RTCIceServer { + urls: whip_ctx.pipeline_ctx.stun_servers.to_vec(), + ..Default::default() + }], + ..Default::default() + }; + let peer_connection = Arc::new(api.new_peer_connection(config).await?); + + let video_track = match whip_ctx.options.video { + Some(video) => { + let video_track = Arc::new(TrackLocalStaticRTP::new( + video_codec_capability(video), + "video".to_owned(), + format!("live-compositor-{}-video", whip_ctx.output_id).to_owned(), + )); + peer_connection + .add_track(video_track.clone()) + .await + .map_err(WhipError::PeerConnectionInitError)?; + Some(video_track) + } + None => None, + }; + let audio_track = match whip_ctx.options.audio { + Some(audio_options) => { + let audio_track = Arc::new(TrackLocalStaticRTP::new( + audio_codec_capability(audio_options, whip_ctx.pipeline_ctx.output_sample_rate)?, + "audio".to_owned(), + format!("live-compositor-{}-audio", whip_ctx.output_id).to_owned(), + )); + peer_connection + .add_track(audio_track.clone()) + .await + .map_err(WhipError::PeerConnectionInitError)?; + Some(audio_track) + } + None => None, + }; + let transceivers = peer_connection.get_transceivers().await; + for transceiver in transceivers { + transceiver + .set_direction(RTCRtpTransceiverDirection::Sendonly) + .await; + } + Ok((peer_connection, video_track, audio_track)) +} + +fn video_codec_capability(video: VideoCodec) -> RTCRtpCodecCapability { + match video { + VideoCodec::H264 => RTCRtpCodecCapability { + mime_type: MIME_TYPE_H264.to_owned(), + clock_rate: 90000, + channels: 0, + sdp_fmtp_line: "".to_owned(), + rtcp_feedback: vec![], + }, + } +} + +fn audio_codec_capability( + audio_options: WhipAudioOptions, + sample_rate: u32, +) -> Result { + match audio_options.codec { + AudioCodec::Opus => Ok(RTCRtpCodecCapability { + mime_type: MIME_TYPE_OPUS.to_owned(), + clock_rate: sample_rate, + channels: match audio_options.channels { + AudioChannels::Mono => 1, + AudioChannels::Stereo => 2, + }, + sdp_fmtp_line: "".to_owned(), + rtcp_feedback: vec![], + }), + AudioCodec::Aac => Err(WhipError::UnsupportedCodec("AAC")), + } +} + +fn video_codec_parameters(video: VideoCodec) -> RTCRtpCodecParameters { + let capability = video_codec_capability(video); + let payload_type = match video { + VideoCodec::H264 => 96, + }; + RTCRtpCodecParameters { + capability, + payload_type, + ..Default::default() + } +} + +fn audio_codec_parameters( + audio_options: WhipAudioOptions, + sample_rate: u32, +) -> Result { + let capability = audio_codec_capability(audio_options, sample_rate)?; + let payload_type = match audio_options.codec { + AudioCodec::Aac => return Err(WhipError::UnsupportedCodec("AAC")), + AudioCodec::Opus => 111, + }; + Ok(RTCRtpCodecParameters { + capability, + payload_type, + ..Default::default() + }) +} diff --git a/compositor_pipeline/src/pipeline/output/whip/packet_stream.rs b/compositor_pipeline/src/pipeline/output/whip/packet_stream.rs new file mode 100644 index 000000000..ce594c435 --- /dev/null +++ b/compositor_pipeline/src/pipeline/output/whip/packet_stream.rs @@ -0,0 +1,76 @@ +use std::collections::VecDeque; + +use crossbeam_channel::Receiver; + +use crate::pipeline::types::EncoderOutputEvent; + +use super::payloader::{Payload, Payloader, PayloadingError}; + +pub(super) struct PacketStream { + packets_receiver: Receiver, + state: VecDeque, + payloader: Payloader, + mtu: usize, +} + +impl PacketStream { + pub(super) fn new( + packets_receiver: Receiver, + payloader: Payloader, + mtu: usize, + ) -> Self { + Self { + packets_receiver, + payloader, + mtu, + state: VecDeque::new(), + } + } + + fn next_new_packet(&mut self) -> Option> { + let Ok(packet) = self.packets_receiver.recv() else { + // Send audio and video EOS if payloaders are supported and EOS was not sent before. + match self.payloader.audio_eos() { + Err(PayloadingError::NoAudioPayloader) => (), + Err(PayloadingError::AudioEOSAlreadySent) => (), + packet => return Some(Ok(Payload::Audio(packet))), + } + match self.payloader.video_eos() { + Err(PayloadingError::NoVideoPayloader) => (), + Err(PayloadingError::VideoEOSAlreadySent) => (), + packet => return Some(Ok(Payload::Video(packet))), + } + return None; + }; + + let encoded_chunk = match packet { + EncoderOutputEvent::Data(packet) => packet, + EncoderOutputEvent::AudioEOS => { + return Some(Ok(Payload::Audio(self.payloader.audio_eos()))); + } + EncoderOutputEvent::VideoEOS => { + return Some(Ok(Payload::Video(self.payloader.video_eos()))); + } + }; + + let rtp_packets = match self.payloader.payload(self.mtu, encoded_chunk) { + Ok(packets) => packets, + Err(err) => return Some(Err(err)), + }; + + // I'm assuming here that payload will never return empty list + self.state = rtp_packets; + self.state.pop_front().map(Ok) + } +} + +impl Iterator for PacketStream { + type Item = Result; + + fn next(&mut self) -> Option { + match &mut self.state { + chunks if chunks.is_empty() => self.next_new_packet(), + chunks => chunks.pop_front().map(Ok), + } + } +} diff --git a/compositor_pipeline/src/pipeline/output/whip/payloader.rs b/compositor_pipeline/src/pipeline/output/whip/payloader.rs new file mode 100644 index 000000000..f64a51772 --- /dev/null +++ b/compositor_pipeline/src/pipeline/output/whip/payloader.rs @@ -0,0 +1,322 @@ +use bytes::Bytes; +use std::{collections::VecDeque, fmt::Debug}; +use tracing::error; +use webrtc_util::Marshal; + +use rand::Rng; +use rtp::codecs::{h264::H264Payloader, opus::OpusPayloader}; + +use crate::pipeline::{ + rtp::{AUDIO_PAYLOAD_TYPE, VIDEO_PAYLOAD_TYPE}, + types::{EncodedChunk, EncodedChunkKind}, + AudioCodec, VideoCodec, +}; + +use super::WhipAudioOptions; + +const H264_CLOCK_RATE: u32 = 90000; +const OPUS_CLOCK_RATE: u32 = 48000; + +struct RtpStreamContext { + ssrc: u32, + next_sequence_number: u16, + received_eos: bool, +} + +impl RtpStreamContext { + pub fn new() -> Self { + let mut rng = rand::thread_rng(); + let ssrc = rng.gen::(); + let next_sequence_number = rng.gen::(); + + RtpStreamContext { + ssrc, + next_sequence_number, + received_eos: false, + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum PayloadingError { + #[error("Tried to payload video with non video payloader.")] + NoVideoPayloader, + + #[error("Tried to payload audio with non audio payloader.")] + NoAudioPayloader, + + #[error( + "Tried to payload video with codec {:#?} with payloader for codec {:#?}", + chunk_codec, + payloader_codec + )] + NonMatchingVideoCodecs { + chunk_codec: VideoCodec, + payloader_codec: VideoCodec, + }, + + #[error( + "Tried to payload audio with codec {:#?} with payloader for codec {:#?}", + chunk_codec, + payloader_codec + )] + NonMatchingAudioCodecs { + chunk_codec: AudioCodec, + payloader_codec: AudioCodec, + }, + + #[error(transparent)] + RtpLibError(#[from] rtp::Error), + + #[error(transparent)] + MarshalError(#[from] webrtc_util::Error), + + #[error("Audio EOS already sent.")] + AudioEOSAlreadySent, + + #[error("Video EOS already sent.")] + VideoEOSAlreadySent, + + #[error("Unsupported payload type.")] + UnsupportedPayloadType, +} + +pub struct Payloader { + video: Option, + audio: Option, +} + +enum VideoPayloader { + H264 { + payloader: H264Payloader, + context: RtpStreamContext, + }, +} + +enum AudioPayloader { + Opus { + payloader: OpusPayloader, + context: RtpStreamContext, + }, +} + +pub enum Payload { + Video(Result), + Audio(Result), +} + +impl Payloader { + pub fn new(video: Option, audio: Option) -> Self { + Self { + video: video.map(VideoPayloader::new), + audio: audio.map(|audio| AudioPayloader::new(audio.codec)), + } + } + + pub(super) fn payload( + &mut self, + mtu: usize, + data: EncodedChunk, + ) -> Result, PayloadingError> { + match data.kind { + EncodedChunkKind::Video(chunk_codec) => { + let Some(ref mut video_payloader) = self.video else { + return Err(PayloadingError::NoVideoPayloader); + }; + + if video_payloader.codec() != chunk_codec { + return Err(PayloadingError::NonMatchingVideoCodecs { + chunk_codec, + payloader_codec: video_payloader.codec(), + }); + } + + video_payloader.payload(mtu, data) + } + EncodedChunkKind::Audio(chunk_codec) => { + let Some(ref mut audio_payloader) = self.audio else { + return Err(PayloadingError::NoAudioPayloader); + }; + + if audio_payloader.codec() != chunk_codec { + return Err(PayloadingError::NonMatchingAudioCodecs { + chunk_codec, + payloader_codec: audio_payloader.codec(), + }); + } + + audio_payloader.payload(mtu, data) + } + } + } + + pub(super) fn audio_eos(&mut self) -> Result { + self.audio + .as_mut() + .map(|audio| { + let ctx = audio.context_mut(); + if ctx.received_eos { + return Err(PayloadingError::AudioEOSAlreadySent); + } + ctx.received_eos = true; + + let packet = rtcp::goodbye::Goodbye { + sources: vec![ctx.ssrc], + reason: Bytes::from("Unregister output stream"), + }; + packet.marshal().map_err(PayloadingError::MarshalError) + }) + .unwrap_or(Err(PayloadingError::NoAudioPayloader)) + } + + pub(super) fn video_eos(&mut self) -> Result { + self.video + .as_mut() + .map(|video| { + let ctx = video.context_mut(); + if ctx.received_eos { + return Err(PayloadingError::VideoEOSAlreadySent); + } + ctx.received_eos = true; + + let packet = rtcp::goodbye::Goodbye { + sources: vec![ctx.ssrc], + reason: Bytes::from("Unregister output stream"), + }; + packet.marshal().map_err(PayloadingError::MarshalError) + }) + .unwrap_or(Err(PayloadingError::NoVideoPayloader)) + } +} + +impl VideoPayloader { + fn new(codec: VideoCodec) -> Self { + match codec { + VideoCodec::H264 => Self::H264 { + payloader: H264Payloader::default(), + context: RtpStreamContext::new(), + }, + } + } + + fn codec(&self) -> VideoCodec { + match self { + VideoPayloader::H264 { .. } => VideoCodec::H264, + } + } + + fn payload( + &mut self, + mtu: usize, + chunk: EncodedChunk, + ) -> Result, PayloadingError> { + match self { + VideoPayloader::H264 { + ref mut payloader, + ref mut context, + } => payload( + payloader, + context, + chunk, + mtu, + VIDEO_PAYLOAD_TYPE, + H264_CLOCK_RATE, + ), + } + } + + fn context_mut(&mut self) -> &mut RtpStreamContext { + match self { + VideoPayloader::H264 { context, .. } => context, + } + } +} + +impl AudioPayloader { + fn new(codec: AudioCodec) -> Self { + match codec { + AudioCodec::Opus => Self::Opus { + payloader: OpusPayloader, + context: RtpStreamContext::new(), + }, + AudioCodec::Aac => panic!("Aac audio output is not supported yet"), + } + } + + fn codec(&self) -> AudioCodec { + match self { + AudioPayloader::Opus { .. } => AudioCodec::Opus, + } + } + + fn payload( + &mut self, + mtu: usize, + chunk: EncodedChunk, + ) -> Result, PayloadingError> { + match self { + AudioPayloader::Opus { + ref mut payloader, + ref mut context, + } => payload( + payloader, + context, + chunk, + mtu, + AUDIO_PAYLOAD_TYPE, + OPUS_CLOCK_RATE, + ), + } + } + + fn context_mut(&mut self) -> &mut RtpStreamContext { + match self { + AudioPayloader::Opus { context, .. } => context, + } + } +} + +fn payload( + payloader: &mut T, + context: &mut RtpStreamContext, + chunk: EncodedChunk, + mtu: usize, + payload_type: u8, + clock_rate: u32, +) -> Result, PayloadingError> { + let payloads = payloader.payload(mtu, &chunk.data)?; + let packets_amount = payloads.len(); + + payloads + .into_iter() + .enumerate() + .map(|(i, payload)| { + let header = rtp::header::Header { + version: 2, + padding: false, + extension: false, + marker: i == packets_amount - 1, // marker needs to be set on the last packet of each frame + payload_type, + sequence_number: context.next_sequence_number, + timestamp: (chunk.pts.as_secs_f64() * clock_rate as f64) as u32, + ssrc: context.ssrc, + ..Default::default() + }; + context.next_sequence_number = context.next_sequence_number.wrapping_add(1); + + match payload_type { + VIDEO_PAYLOAD_TYPE => { + Ok(Payload::Video(Ok( + rtp::packet::Packet { header, payload }.marshal()? + ))) + } + AUDIO_PAYLOAD_TYPE => { + Ok(Payload::Audio(Ok( + rtp::packet::Packet { header, payload }.marshal()? + ))) + } + _ => Err(PayloadingError::UnsupportedPayloadType), + } + }) + .collect() +} diff --git a/compositor_pipeline/src/pipeline/pipeline_output.rs b/compositor_pipeline/src/pipeline/pipeline_output.rs index cfb695886..2f31f9090 100644 --- a/compositor_pipeline/src/pipeline/pipeline_output.rs +++ b/compositor_pipeline/src/pipeline/pipeline_output.rs @@ -34,73 +34,73 @@ pub(super) enum OutputSender { FinishedSender, } -impl Pipeline { - pub(super) fn register_pipeline_output( - &mut self, - output_id: OutputId, - output_options: &dyn OutputOptionsExt, - video: Option, - audio: Option, - ) -> Result { - let (has_video, has_audio) = (video.is_some(), audio.is_some()); - if !has_video && !has_audio { - return Err(RegisterOutputError::NoVideoAndAudio(output_id)); - } +pub(super) fn register_pipeline_output( + pipeline: &Arc>, + output_id: OutputId, + output_options: &dyn OutputOptionsExt, + video: Option, + audio: Option, +) -> Result { + let (has_video, has_audio) = (video.is_some(), audio.is_some()); + if !has_video && !has_audio { + return Err(RegisterOutputError::NoVideoAndAudio(output_id)); + } - if self.outputs.contains_key(&output_id) { - return Err(RegisterOutputError::AlreadyRegistered(output_id)); - } + if pipeline.lock().unwrap().outputs.contains_key(&output_id) { + return Err(RegisterOutputError::AlreadyRegistered(output_id)); + } - let (output, output_result) = output_options.new_output(&output_id, &self.ctx)?; + let pipeline_ctx = Arc::new(pipeline.lock().unwrap().ctx.clone()); - let output = PipelineOutput { - output, - audio_end_condition: audio.as_ref().map(|audio| { - PipelineOutputEndConditionState::new_audio( - audio.end_condition.clone(), - &self.inputs, - ) - }), - video_end_condition: video.as_ref().map(|video| { - PipelineOutputEndConditionState::new_video( - video.end_condition.clone(), - &self.inputs, - ) - }), - }; + let (output, output_result) = output_options.new_output(&output_id, pipeline_ctx)?; - if let (Some(video_opts), Some(resolution), Some(format)) = ( - video.clone(), - output.output.resolution(), - output.output.output_frame_format(), - ) { - let result = self.renderer.update_scene( - output_id.clone(), - resolution, - format, - video_opts.initial, - ); + let mut guard = pipeline.lock().unwrap(); - if let Err(err) = result { - self.renderer.unregister_output(&output_id); - return Err(RegisterOutputError::SceneError(output_id.clone(), err)); - } - }; + if guard.outputs.contains_key(&output_id) { + return Err(RegisterOutputError::AlreadyRegistered(output_id)); + } - if let Some(audio_opts) = audio.clone() { - self.audio_mixer.register_output( - output_id.clone(), - audio_opts.initial, - audio_opts.mixing_strategy, - audio_opts.channels, - ); - } + let output = PipelineOutput { + output, + audio_end_condition: audio.as_ref().map(|audio| { + PipelineOutputEndConditionState::new_audio(audio.end_condition.clone(), &guard.inputs) + }), + video_end_condition: video.as_ref().map(|video| { + PipelineOutputEndConditionState::new_video(video.end_condition.clone(), &guard.inputs) + }), + }; - self.outputs.insert(output_id.clone(), output); + if let (Some(video_opts), Some(resolution), Some(format)) = ( + video.clone(), + output.output.resolution(), + output.output.output_frame_format(), + ) { + let result = + guard + .renderer + .update_scene(output_id.clone(), resolution, format, video_opts.initial); + + if let Err(err) = result { + guard.renderer.unregister_output(&output_id); + return Err(RegisterOutputError::SceneError(output_id.clone(), err)); + } + }; - Ok(output_result) + if let Some(audio_opts) = audio.clone() { + guard.audio_mixer.register_output( + output_id.clone(), + audio_opts.initial, + audio_opts.mixing_strategy, + audio_opts.channels, + ); } + guard.outputs.insert(output_id.clone(), output); + + Ok(output_result) +} + +impl Pipeline { pub(super) fn all_output_video_senders_iter( pipeline: &Arc>, ) -> impl Iterator>>)> { diff --git a/generate/Cargo.toml b/generate/Cargo.toml index 313c5caee..fc1e3f922 100644 --- a/generate/Cargo.toml +++ b/generate/Cargo.toml @@ -32,3 +32,4 @@ pitch-detection = "0.3.0" rand = { workspace = true } schemars = { workspace = true} tracing-subscriber = { version = "0.3.18", features = ["json", "env-filter"] } +tokio = { workspace = true } diff --git a/generate/src/compositor_instance.rs b/generate/src/compositor_instance.rs index aa36656ce..8e5c961e5 100644 --- a/generate/src/compositor_instance.rs +++ b/generate/src/compositor_instance.rs @@ -6,11 +6,12 @@ use std::{ env, sync::{ atomic::{AtomicU16, Ordering}, - OnceLock, + Arc, OnceLock, }, thread, time::{Duration, Instant}, }; +use tokio::runtime::Runtime; use tracing::info; pub struct CompositorInstance { @@ -37,12 +38,13 @@ impl CompositorInstance { info!("Starting LiveCompositor Integration Test with config:\n{config:#?}",); let (should_close_sender, should_close_receiver) = crossbeam_channel::bounded(1); - let (state, _event_loop) = ApiState::new(config).unwrap(); + let runtime = Arc::new(Runtime::new().unwrap()); + let (state, _event_loop) = ApiState::new(config, runtime.clone()).unwrap(); thread::Builder::new() .name("HTTP server startup thread".to_string()) .spawn(move || { - run_api(state, should_close_receiver).unwrap(); + run_api(state, runtime.clone(), should_close_receiver).unwrap(); }) .unwrap(); diff --git a/integration_tests/examples/encoded_channel_output.rs b/integration_tests/examples/encoded_channel_output.rs index 715235591..9795ac107 100644 --- a/integration_tests/examples/encoded_channel_output.rs +++ b/integration_tests/examples/encoded_channel_output.rs @@ -1,5 +1,5 @@ use core::panic; -use std::{fs::File, io::Write, path::PathBuf, time::Duration}; +use std::{fs::File, io::Write, path::PathBuf, sync::Arc, time::Duration}; use compositor_pipeline::{ audio_mixer::{AudioChannels, AudioMixingParams, InputParams, MixingStrategy}, @@ -24,6 +24,7 @@ use compositor_render::{ }; use integration_tests::examples::download_file; use live_compositor::{config::read_config, logger, state::ApiState}; +use tokio::runtime::Runtime; const BUNNY_FILE_URL: &str = "https://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4"; @@ -39,7 +40,8 @@ fn main() { let root_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")); config.queue_options.ahead_of_time_processing = true; // no chromium support, so we can ignore _event_loop - let (state, _event_loop) = ApiState::new(config).unwrap_or_else(|err| { + let runtime = Arc::new(Runtime::new().unwrap()); + let (state, _event_loop) = ApiState::new(config, runtime).unwrap_or_else(|err| { panic!( "Failed to start compositor.\n{}", ErrorStack::new(&err).into_string() @@ -102,12 +104,9 @@ fn main() { Pipeline::register_input(&state.pipeline, input_id.clone(), input_options).unwrap(); - let output_receiver = state - .pipeline - .lock() - .unwrap() - .register_encoded_data_output(output_id.clone(), output_options) - .unwrap(); + let output_receiver = + Pipeline::register_encoded_data_output(&state.pipeline, output_id.clone(), output_options) + .unwrap(); Pipeline::start(&state.pipeline); diff --git a/integration_tests/examples/manual_graphics_initialization.rs b/integration_tests/examples/manual_graphics_initialization.rs index 2ae7e096f..ab958a415 100644 --- a/integration_tests/examples/manual_graphics_initialization.rs +++ b/integration_tests/examples/manual_graphics_initialization.rs @@ -8,6 +8,8 @@ fn main() { Pipeline, }; use live_compositor::config::read_config; + use std::sync::Arc; + use tokio::runtime::Runtime; let graphics_context = GraphicsContext::new( false, @@ -32,8 +34,10 @@ fn main() { force_gpu: config.force_gpu, download_root: config.download_root, output_sample_rate: config.output_sample_rate, + stun_servers: config.stun_servers, wgpu_features: config.required_wgpu_features, load_system_fonts: Some(true), + tokio_rt: Some(Arc::new(Runtime::new().unwrap())), }) .unwrap(); } diff --git a/integration_tests/examples/raw_channel_input.rs b/integration_tests/examples/raw_channel_input.rs index 90b0ec577..e2cf3d716 100644 --- a/integration_tests/examples/raw_channel_input.rs +++ b/integration_tests/examples/raw_channel_input.rs @@ -32,6 +32,7 @@ use live_compositor::{ config::read_config, logger::{self}, }; +use tokio::runtime::Runtime; const VIDEO_OUTPUT_PORT: u16 = 8002; @@ -50,9 +51,11 @@ fn main() { force_gpu: config.force_gpu, download_root: config.download_root, output_sample_rate: config.output_sample_rate, + stun_servers: config.stun_servers, wgpu_features: config.required_wgpu_features, load_system_fonts: Some(true), wgpu_ctx: Some(ctx), + tokio_rt: Some(Arc::new(Runtime::new().unwrap())), }) .unwrap_or_else(|err| { panic!( @@ -108,11 +111,7 @@ fn main() { ) .unwrap(); - pipeline - .lock() - .unwrap() - .register_output(output_id.clone(), output_options) - .unwrap(); + Pipeline::register_output(&pipeline, output_id.clone(), output_options).unwrap(); let frames = generate_frames(&wgpu_device, &wgpu_queue); diff --git a/integration_tests/examples/raw_channel_output.rs b/integration_tests/examples/raw_channel_output.rs index ec8e11b74..fdac94e48 100644 --- a/integration_tests/examples/raw_channel_output.rs +++ b/integration_tests/examples/raw_channel_output.rs @@ -34,6 +34,7 @@ use live_compositor::{ config::read_config, logger::{self}, }; +use tokio::runtime::Runtime; const BUNNY_FILE_URL: &str = "https://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4"; @@ -63,9 +64,11 @@ fn main() { force_gpu: config.force_gpu, download_root: config.download_root, output_sample_rate: config.output_sample_rate, + stun_servers: config.stun_servers, wgpu_features: config.required_wgpu_features, load_system_fonts: Some(true), wgpu_ctx: Some(ctx), + tokio_rt: Some(Arc::new(Runtime::new().unwrap())), }) .unwrap_or_else(|err| { panic!( @@ -124,11 +127,8 @@ fn main() { Pipeline::register_input(&pipeline, input_id.clone(), input_options).unwrap(); - let RawDataReceiver { video, audio } = pipeline - .lock() - .unwrap() - .register_raw_data_output(output_id.clone(), output_options) - .unwrap(); + let RawDataReceiver { video, audio } = + Pipeline::register_raw_data_output(&pipeline, output_id.clone(), output_options).unwrap(); Pipeline::start(&pipeline); diff --git a/integration_tests/examples/whip_client.rs b/integration_tests/examples/whip_client.rs new file mode 100644 index 000000000..e528285f4 --- /dev/null +++ b/integration_tests/examples/whip_client.rs @@ -0,0 +1,75 @@ +use anyhow::{anyhow, Result}; +use compositor_api::types::Resolution; +use serde_json::json; +use std::{env, time::Duration}; + +use integration_tests::examples::{self, run_example}; + +const BUNNY_URL: &str = + "https://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4"; + +const VIDEO_RESOLUTION: Resolution = Resolution { + width: 1280, + height: 720, +}; + +fn main() { + run_example(client_code); +} + +fn client_code() -> Result<()> { + examples::post( + "input/input_1/register", + &json!({ + "type": "mp4", + "url": BUNNY_URL, + "required": true, + "offset_ms": 0, + }), + )?; + + let token = env::var("LIVE_COMPOSITOR_WHIP_CLIENT_EXAMPLE_TOKEN").map_err(|err| anyhow!("Couldn't read LIVE_COMPOSITOR_WHIP_CLIENT_EXAMPLE_TOKEN environmental variable. You must provide it in order to run `whip_client` example. Read env error: {}", err))?; + + examples::post( + "output/output_1/register", + &json!({ + "type": "whip", + "endpoint_url": "https://g.webrtc.live-video.net:4443/v2/offer", // Twitch WHIP endpoint URL + "bearer_token": token, + "video": { + "resolution": { + "width": VIDEO_RESOLUTION.width, + "height": VIDEO_RESOLUTION.height, + }, + "encoder": { + "type": "ffmpeg_h264", + "preset": "medium" + }, + "initial": { + "root": { + "id": "input_1", + "type": "input_stream", + "input_id": "input_1", + } + } + }, + "audio": { + "encoder": { + "type": "opus", + "channels": "stereo", + }, + "initial": { + "inputs": [ + {"input_id": "input_1"} + ] + } + } + }), + )?; + + std::thread::sleep(Duration::from_millis(500)); + + examples::post("start", &json!({}))?; + + Ok(()) +} diff --git a/integration_tests/src/compositor_instance.rs b/integration_tests/src/compositor_instance.rs index 9f90e4244..bb3967ff6 100644 --- a/integration_tests/src/compositor_instance.rs +++ b/integration_tests/src/compositor_instance.rs @@ -11,11 +11,12 @@ use std::{ env, sync::{ atomic::{AtomicU16, Ordering}, - OnceLock, + Arc, OnceLock, }, thread, time::{Duration, Instant}, }; +use tokio::runtime::Runtime; use tracing::info; pub struct CompositorInstance { @@ -41,12 +42,13 @@ impl CompositorInstance { info!("Starting LiveCompositor Integration Test with config:\n{config:#?}",); let (should_close_sender, should_close_receiver) = crossbeam_channel::bounded(1); - let (state, _event_loop) = ApiState::new(config).unwrap(); + let runtime = Arc::new(Runtime::new().unwrap()); + let (state, _event_loop) = ApiState::new(config, runtime.clone()).unwrap(); thread::Builder::new() .name("HTTP server startup thread".to_string()) .spawn(move || { - run_api(state, should_close_receiver).unwrap(); + run_api(state, runtime.clone(), should_close_receiver).unwrap(); }) .unwrap(); diff --git a/src/config.rs b/src/config.rs index 1869cb65e..c6a11c91c 100644 --- a/src/config.rs +++ b/src/config.rs @@ -9,6 +9,7 @@ use std::{ use compositor_pipeline::queue::{self, QueueOptions}; use compositor_render::{web_renderer::WebRendererInitOptions, Framerate, WgpuFeatures}; use rand::Rng; +use tracing::error; use crate::logger::FfmpegLogLevel; @@ -23,6 +24,7 @@ pub struct Config { pub download_root: PathBuf, pub queue_options: QueueOptions, pub output_sample_rate: u32, + pub stun_servers: Arc>, pub required_wgpu_features: WgpuFeatures, } @@ -194,6 +196,20 @@ fn try_read_config() -> Result { Err(_) => None, }; + let default_stun_servers = Arc::new(vec!["stun:stun.l.google.com:19302".to_string()]); + + let stun_servers = match env::var("LIVE_COMPOSITOR_STUN_SERVERS") { + Ok(var) => { + if var.is_empty() { + error!("empty stun servers env"); + Arc::new(Vec::new()) + } else { + Arc::new(var.split(',').map(String::from).collect()) + } + } + Err(_) => default_stun_servers, + }; + let config = Config { instance_id, api_port, @@ -218,6 +234,7 @@ fn try_read_config() -> Result { }, download_root, output_sample_rate, + stun_servers, required_wgpu_features, }; Ok(config) diff --git a/src/routes/register_request.rs b/src/routes/register_request.rs index eac4aac42..9fcff502d 100644 --- a/src/routes/register_request.rs +++ b/src/routes/register_request.rs @@ -11,7 +11,7 @@ use compositor_api::{ error::ApiError, types::{ DeckLink, ImageSpec, InputId, Mp4Input, Mp4Output, OutputId, RendererId, RtpInput, - RtpOutput, ShaderSpec, WebRendererSpec, + RtpOutput, ShaderSpec, WebRendererSpec, WhipOutput, }, }; @@ -31,6 +31,7 @@ pub enum RegisterInput { pub enum RegisterOutput { RtpStream(RtpOutput), Mp4(Mp4Output), + Whip(WhipOutput), } pub(super) async fn handle_input( @@ -79,10 +80,13 @@ pub(super) async fn handle_output( tokio::task::spawn_blocking(move || { let response = match request { RegisterOutput::RtpStream(rtp) => { - Pipeline::register_output(&mut api.pipeline(), output_id.into(), rtp.try_into()?)? + Pipeline::register_output(&api.pipeline, output_id.into(), rtp.try_into()?)? } RegisterOutput::Mp4(mp4) => { - Pipeline::register_output(&mut api.pipeline(), output_id.into(), mp4.try_into()?)? + Pipeline::register_output(&api.pipeline, output_id.into(), mp4.try_into()?)? + } + RegisterOutput::Whip(whip) => { + Pipeline::register_output(&api.pipeline, output_id.into(), whip.try_into()?)? } }; match response { diff --git a/src/server.rs b/src/server.rs index f99ce60d0..9a63281c2 100644 --- a/src/server.rs +++ b/src/server.rs @@ -4,7 +4,7 @@ use log::info; use signal_hook::{consts, iterator::Signals}; use tracing::error; -use std::{net::SocketAddr, process, thread}; +use std::{net::SocketAddr, process, sync::Arc, thread}; use tokio::runtime::Runtime; use crate::{config::read_config, logger::init_logger, routes::routes, state::ApiState}; @@ -15,7 +15,8 @@ pub fn run() { init_logger(config.logger.clone()); info!("Starting LiveCompositor with config:\n{:#?}", config); - let (state, event_loop) = ApiState::new(config).unwrap_or_else(|err| { + let runtime = Arc::new(Runtime::new().unwrap()); + let (state, event_loop) = ApiState::new(config, runtime.clone()).unwrap_or_else(|err| { panic!( "Failed to start event loop.\n{}", ErrorStack::new(&err).into_string() @@ -26,7 +27,7 @@ pub fn run() { .name("HTTP server startup thread".to_string()) .spawn(move || { let (_should_close_sender, should_close_receiver) = crossbeam_channel::bounded(1); - if let Err(err) = run_api(state, should_close_receiver) { + if let Err(err) = run_api(state, runtime, should_close_receiver) { error!(%err); process::exit(1); } @@ -44,9 +45,12 @@ pub fn run() { } } -pub fn run_api(state: ApiState, should_close: Receiver<()>) -> tokio::io::Result<()> { - let rt = Runtime::new().unwrap(); - rt.block_on(async { +pub fn run_api( + state: ApiState, + runtime: Arc, + should_close: Receiver<()>, +) -> tokio::io::Result<()> { + runtime.block_on(async { let port = state.config.api_port; let app = routes(state); let listener = diff --git a/src/state.rs b/src/state.rs index eca2de842..7b2464f31 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,13 +1,11 @@ use std::sync::{Arc, Mutex, MutexGuard}; use axum::response::IntoResponse; -use compositor_pipeline::{ - error::InitPipelineError, - pipeline::{self}, -}; +use compositor_pipeline::{error::InitPipelineError, pipeline}; use compositor_render::EventLoop; use serde::Serialize; +use tokio::runtime::Runtime; use crate::config::Config; @@ -39,7 +37,10 @@ pub struct ApiState { } impl ApiState { - pub fn new(config: Config) -> Result<(ApiState, Arc), InitPipelineError> { + pub fn new( + config: Config, + runtime: Arc, + ) -> Result<(ApiState, Arc), InitPipelineError> { let Config { queue_options, stream_fallback_timeout, @@ -47,6 +48,7 @@ impl ApiState { force_gpu, download_root, output_sample_rate, + stun_servers, required_wgpu_features, .. } = config.clone(); @@ -57,9 +59,11 @@ impl ApiState { force_gpu, download_root, output_sample_rate, + stun_servers, wgpu_features: required_wgpu_features, wgpu_ctx: None, load_system_fonts: Some(true), + tokio_rt: Some(runtime), })?; Ok(( ApiState {