diff --git a/.github/workflows/platforms.yml b/.github/workflows/platforms.yml index e4e923077..eeed8a4e1 100644 --- a/.github/workflows/platforms.yml +++ b/.github/workflows/platforms.yml @@ -24,15 +24,15 @@ on: env: # MSRV varies by backend due to platform-specific dependencies - MSRV_AAUDIO: "1.82" - MSRV_ALSA: "1.82" + MSRV_AAUDIO: "1.85" + MSRV_ALSA: "1.85" MSRV_COREAUDIO: "1.80" MSRV_JACK: "1.82" MSRV_WASIP1: "1.78" MSRV_WASM: "1.82" MSRV_WINDOWS: "1.82" - PACKAGES_LINUX: libasound2-dev libjack-jackd2-dev libjack-jackd2-0 libdbus-1-dev + PACKAGES_LINUX: libasound2-dev libjack-jackd2-dev libjack-jackd2-0 libdbus-1-dev libpipewire-0.3-dev ANDROID_COMPILE_SDK: "30" ANDROID_BUILD_TOOLS: "30.0.3" @@ -94,10 +94,10 @@ jobs: run: cargo +${{ env.MSRV_ALSA }} check --examples --no-default-features --workspace --verbose - name: Run tests (all features) - run: cargo +${{ steps.msrv.outputs.all-features }} test --all-features --workspace --verbose + run: cargo +${{ steps.msrv.outputs.all-features }} test --features=jack --workspace --verbose - name: Check examples (all features) - run: cargo +${{ steps.msrv.outputs.all-features }} check --examples --all-features --workspace --verbose + run: cargo +${{ steps.msrv.outputs.all-features }} check --examples --features=jack --workspace --verbose # Linux ARMv7 (cross-compilation) linux-armv7: @@ -150,10 +150,10 @@ jobs: run: cross +${{ env.MSRV_ALSA }} test --no-default-features --workspace --verbose --target ${{ env.TARGET }} - name: Run tests (all features) - run: cross +${{ steps.msrv.outputs.all-features }} test --all-features --workspace --verbose --target ${{ env.TARGET }} + run: cross +${{ steps.msrv.outputs.all-features }} test --features=jack --workspace --verbose --target ${{ env.TARGET }} - name: Check examples (all features) - run: cross +${{ steps.msrv.outputs.all-features }} test --all-features --workspace --verbose --target ${{ env.TARGET }} + run: cross +${{ steps.msrv.outputs.all-features }} test --features=jack --workspace --verbose --target ${{ env.TARGET }} # Windows (x86_64 and i686) windows: diff --git a/.github/workflows/quality.yml b/.github/workflows/quality.yml index cd470bfd0..ba55ac99c 100644 --- a/.github/workflows/quality.yml +++ b/.github/workflows/quality.yml @@ -92,7 +92,7 @@ jobs: if: runner.os == 'Linux' uses: awalsh128/cache-apt-pkgs-action@latest with: - packages: libasound2-dev libjack-jackd2-dev libjack-jackd2-0 libdbus-1-dev + packages: libasound2-dev libjack-jackd2-dev libjack-jackd2-0 libdbus-1-dev libpipewire-0.3-dev - name: Setup ASIO SDK if: runner.os == 'Windows' @@ -128,7 +128,7 @@ jobs: - name: Cache Linux audio packages uses: awalsh128/cache-apt-pkgs-action@latest with: - packages: libasound2-dev libjack-jackd2-dev libjack-jackd2-0 libdbus-1-dev + packages: libasound2-dev libjack-jackd2-dev libjack-jackd2-0 libdbus-1-dev libpipewire-0.3-dev - name: Install Rust toolchain uses: dtolnay/rust-toolchain@nightly diff --git a/Cargo.toml b/Cargo.toml index fb47eb19a..df3e19eda 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ asio = [ # Platform: Linux, DragonFly BSD, FreeBSD, NetBSD, macOS, Windows # Note: JACK must be installed separately on all platforms jack = ["dep:jack"] +pipewire = ["dep:pipewire"] # WebAssembly backend using wasm-bindgen # Enables the Web Audio API backend for browser-based audio @@ -53,6 +54,8 @@ audioworklet = [ # Platform: All platforms custom = [] +default = [] + [dependencies] dasp_sample = "0.11" @@ -89,6 +92,7 @@ alsa = "0.11" libc = "0.2" audio_thread_priority = { version = "0.34", optional = true } jack = { version = "0.13", optional = true } +pipewire = { version = "0.9.2", optional = true, features = ["v0_3_44"]} [target.'cfg(target_vendor = "apple")'.dependencies] mach2 = "0.5" diff --git a/Cross.toml b/Cross.toml index 6d0ad81d1..09b92e9ea 100644 --- a/Cross.toml +++ b/Cross.toml @@ -2,6 +2,4 @@ dockerfile = "Dockerfile" [target.armv7-unknown-linux-gnueabihf.env] -passthrough = [ - "RUSTFLAGS", -] +passthrough = ["RUSTFLAGS"] diff --git a/Dockerfile b/Dockerfile index 8e56a2efd..5405f1417 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,4 +7,5 @@ ENV PKG_CONFIG_PATH=/usr/lib/arm-linux-gnueabihf/pkgconfig/ RUN dpkg --add-architecture armhf && \ apt-get update && \ apt-get install libasound2-dev:armhf -y && \ - apt-get install libjack-jackd2-dev:armhf libjack-jackd2-0:armhf -y \ + apt-get install libjack-jackd2-dev:armhf libjack-jackd2-0:armhf -y +# TODO: now the cross-rs is based on ubuntu:20.04, so it does not contain pipewire-0.3-dev diff --git a/_typos.toml b/_typos.toml new file mode 100644 index 000000000..02d3ec106 --- /dev/null +++ b/_typos.toml @@ -0,0 +1,2 @@ +[default.extend-words] +datas = "datas" diff --git a/examples/beep.rs b/examples/beep.rs index f5e32c1f5..3b9b88a83 100644 --- a/examples/beep.rs +++ b/examples/beep.rs @@ -37,6 +37,19 @@ struct Opt { #[arg(short, long)] #[allow(dead_code)] jack: bool, + /// Use the pipewire host + #[cfg(all( + any( + target_os = "linux", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "netbsd" + ), + feature = "pipewire" + ))] + #[arg(short, long)] + #[allow(dead_code)] + pipewire: bool, } fn main() -> anyhow::Result<()> { @@ -64,6 +77,28 @@ fn main() -> anyhow::Result<()> { } else { cpal::default_host() }; + // Conditionally compile with jack if the feature is specified. + #[cfg(all( + any( + target_os = "linux", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "netbsd" + ), + feature = "pipewire" + ))] + // Manually check for flags. Can be passed through cargo with -- e.g. + // cargo run --release --example beep --features jack -- --jack + let host = if opt.pipewire { + cpal::host_from_id(cpal::available_hosts() + .into_iter() + .find(|id| *id == cpal::HostId::PipeWire) + .expect( + "make sure --features pipewire is specified. only works on OSes where jack is available", + )).expect("jack host unavailable") + } else { + cpal::default_host() + }; #[cfg(any( not(any( @@ -72,7 +107,7 @@ fn main() -> anyhow::Result<()> { target_os = "freebsd", target_os = "netbsd" )), - not(feature = "jack") + not(any(feature = "jack", feature = "pipewire")) ))] let host = cpal::default_host(); diff --git a/examples/record_wav.rs b/examples/record_wav.rs index 1fea0ee89..9a1ef2374 100644 --- a/examples/record_wav.rs +++ b/examples/record_wav.rs @@ -33,6 +33,19 @@ struct Opt { #[arg(short, long)] #[allow(dead_code)] jack: bool, + /// Use the pipewire host + #[cfg(all( + any( + target_os = "linux", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "netbsd" + ), + feature = "pipewire" + ))] + #[arg(short, long)] + #[allow(dead_code)] + pipewire: bool, } fn main() -> Result<(), anyhow::Error> { @@ -49,7 +62,7 @@ fn main() -> Result<(), anyhow::Error> { feature = "jack" ))] // Manually check for flags. Can be passed through cargo with -- e.g. - // cargo run --release --example beep --features jack -- --jack + // cargo run --release --example record_wav --features jack -- --jack let host = if opt.jack { cpal::host_from_id(cpal::available_hosts() .into_iter() @@ -61,6 +74,28 @@ fn main() -> Result<(), anyhow::Error> { cpal::default_host() }; + // Conditionally compile with jack if the feature is specified. + #[cfg(all( + any( + target_os = "linux", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "netbsd" + ), + feature = "pipewire" + ))] + // Manually check for flags. Can be passed through cargo with -- e.g. + // cargo run --release --example record_wav --features pipewire -- -- pipewire + let host = if opt.pipewire { + cpal::host_from_id(cpal::available_hosts() + .into_iter() + .find(|id| *id == cpal::HostId::PipeWire) + .expect( + "make sure --features pipewire is specified. only works on OSes where jack is available", + )).expect("jack host unavailable") + } else { + cpal::default_host() + }; #[cfg(any( not(any( target_os = "linux", @@ -68,7 +103,7 @@ fn main() -> Result<(), anyhow::Error> { target_os = "freebsd", target_os = "netbsd" )), - not(feature = "jack") + not(any(feature = "jack", feature = "pipewire")) ))] let host = cpal::default_host(); diff --git a/src/host/mod.rs b/src/host/mod.rs index 58b79bb75..e5ef68f20 100644 --- a/src/host/mod.rs +++ b/src/host/mod.rs @@ -31,6 +31,11 @@ pub(crate) mod emscripten; ) ))] pub(crate) mod jack; +#[cfg(all( + any(target_os = "linux", target_os = "dragonfly", target_os = "freebsd"), + feature = "pipewire" +))] +pub(crate) mod pipewire; #[cfg(windows)] pub(crate) mod wasapi; #[cfg(all(target_arch = "wasm32", feature = "wasm-bindgen"))] diff --git a/src/host/pipewire/device.rs b/src/host/pipewire/device.rs new file mode 100644 index 000000000..6e5fe229c --- /dev/null +++ b/src/host/pipewire/device.rs @@ -0,0 +1,683 @@ +use std::time::Duration; +use std::{cell::RefCell, rc::Rc}; + +use crate::host::pipewire::stream::{StreamData, SUPPORTED_FORMATS}; +use crate::{traits::DeviceTrait, DeviceDirection, SupportedStreamConfigRange}; + +use crate::iter::{SupportedInputConfigs, SupportedOutputConfigs}; +use pipewire::{ + self as pw, + metadata::{Metadata, MetadataListener}, + node::{Node, NodeListener}, + proxy::ProxyT, + spa::utils::result::AsyncSeq, +}; + +use std::thread; + +use super::stream::Stream; + +pub type Devices = std::vec::IntoIter; + +/// This enum record whether it is created by human or just default device +#[derive(Clone, Debug, Default, Copy)] +pub(crate) enum ClassType { + #[default] + Node, + DefaultSink, + DefaultInput, + DefaultOutput, +} + +#[derive(Clone, Debug, Default, Copy)] +pub enum Role { + Sink, + #[default] + Source, +} + +#[allow(unused)] +#[derive(Clone, Debug, Default)] +pub struct Device { + id: u32, + node_name: String, + nick_name: String, + description: String, + direction: DeviceDirection, + channels: u16, + limit_quantum: u32, + rate: u32, + allow_rates: Vec, + quantum: u32, + min_quantum: u32, + max_quantum: u32, + class_type: ClassType, + object_id: String, + device_id: String, + role: Role, + icon_name: String, + object_serial: u32, +} + +impl Device { + pub(crate) fn class_type(&self) -> ClassType { + self.class_type + } + fn sink_default() -> Self { + Self { + id: 0, + node_name: "sink_default".to_owned(), + nick_name: "sink_default".to_owned(), + description: "default_sink".to_owned(), + direction: DeviceDirection::Duplex, + channels: 2, + class_type: ClassType::DefaultSink, + role: Role::Sink, + ..Default::default() + } + } + fn input_default() -> Self { + Self { + id: 0, + node_name: "input_default".to_owned(), + nick_name: "input_default".to_owned(), + description: "default_input".to_owned(), + direction: DeviceDirection::Input, + channels: 2, + class_type: ClassType::DefaultInput, + role: Role::Source, + ..Default::default() + } + } + fn output_default() -> Self { + Self { + id: 0, + node_name: "output_default".to_owned(), + nick_name: "output_default".to_owned(), + description: "default_output".to_owned(), + direction: DeviceDirection::Output, + channels: 2, + class_type: ClassType::DefaultOutput, + role: Role::Source, + ..Default::default() + } + } + + fn device_type(&self) -> crate::DeviceType { + match self.icon_name.as_str() { + "audio-headphones" => crate::DeviceType::Headphones, + "audio-input-microphone" => crate::DeviceType::Microphone, + _ => crate::DeviceType::Unknown, + } + } + + pub(crate) fn pw_properties( + &self, + direction: DeviceDirection, + ) -> pw::properties::PropertiesBox { + let mut properties = match direction { + DeviceDirection::Output => pw::properties::properties! { + *pw::keys::MEDIA_TYPE => "Audio", + *pw::keys::MEDIA_CATEGORY => "Playback", + *pw::keys::MEDIA_ROLE => "Music", + }, + DeviceDirection::Input => pw::properties::properties! { + *pw::keys::MEDIA_TYPE => "Audio", + *pw::keys::MEDIA_CATEGORY => "Capture", + *pw::keys::MEDIA_ROLE => "Music", + }, + _ => unreachable!(), + }; + if matches!(self.role, Role::Sink) { + properties.insert(*pw::keys::STREAM_CAPTURE_SINK, "true"); + } + if matches!(self.class_type, ClassType::Node) { + properties.insert(*pw::keys::TARGET_OBJECT, self.object_serial.to_string()); + } + properties + } +} +impl DeviceTrait for Device { + type Stream = Stream; + type SupportedInputConfigs = SupportedInputConfigs; + type SupportedOutputConfigs = SupportedOutputConfigs; + + fn id(&self) -> Result { + Ok(crate::DeviceId( + crate::HostId::PipeWire, + self.nick_name.clone(), + )) + } + + fn description(&self) -> Result { + Ok(crate::DeviceDescriptionBuilder::new(&self.nick_name) + .direction(self.direction()) + .device_type(self.device_type()) + .build()) + } + + fn supports_input(&self) -> bool { + matches!( + self.direction, + DeviceDirection::Input | DeviceDirection::Duplex + ) + } + + fn supports_output(&self) -> bool { + matches!( + self.direction, + DeviceDirection::Output | DeviceDirection::Duplex + ) + } + + fn supported_input_configs( + &self, + ) -> Result { + if !self.supports_input() { + return Ok(vec![].into_iter()); + } + let rates = if self.allow_rates.is_empty() { + vec![self.rate] + } else { + self.allow_rates.clone() + }; + Ok(rates + .iter() + .flat_map(|&rate| { + SUPPORTED_FORMATS + .iter() + .map(move |sample_format| SupportedStreamConfigRange { + channels: self.channels, + min_sample_rate: rate, + max_sample_rate: rate, + buffer_size: crate::SupportedBufferSize::Range { + min: self.min_quantum, + max: self.max_quantum, + }, + sample_format: *sample_format, + }) + }) + .collect::>() + .into_iter()) + } + fn supported_output_configs( + &self, + ) -> Result { + if !self.supports_output() { + return Ok(vec![].into_iter()); + } + let rates = if self.allow_rates.is_empty() { + vec![self.rate] + } else { + self.allow_rates.clone() + }; + Ok(rates + .iter() + .flat_map(|&rate| { + SUPPORTED_FORMATS + .iter() + .map(move |sample_format| SupportedStreamConfigRange { + channels: self.channels, + min_sample_rate: rate, + max_sample_rate: rate, + buffer_size: crate::SupportedBufferSize::Range { + min: self.min_quantum, + max: self.max_quantum, + }, + sample_format: *sample_format, + }) + }) + .collect::>() + .into_iter()) + } + fn default_input_config( + &self, + ) -> Result { + if !self.supports_input() { + return Err(crate::DefaultStreamConfigError::StreamTypeNotSupported); + } + Ok(crate::SupportedStreamConfig { + channels: self.channels, + sample_format: crate::SampleFormat::F32, + sample_rate: self.rate, + buffer_size: crate::SupportedBufferSize::Range { + min: self.min_quantum, + max: self.max_quantum, + }, + }) + } + + fn default_output_config( + &self, + ) -> Result { + if !self.supports_output() { + return Err(crate::DefaultStreamConfigError::StreamTypeNotSupported); + } + Ok(crate::SupportedStreamConfig { + channels: self.channels, + sample_format: crate::SampleFormat::F32, + sample_rate: self.rate, + buffer_size: crate::SupportedBufferSize::Range { + min: self.min_quantum, + max: self.max_quantum, + }, + }) + } + + fn build_input_stream_raw( + &self, + config: &crate::StreamConfig, + sample_format: crate::SampleFormat, + data_callback: D, + error_callback: E, + timeout: Option, + ) -> Result + where + D: FnMut(&crate::Data, &crate::InputCallbackInfo) + Send + 'static, + E: FnMut(crate::StreamError) + Send + 'static, + { + let (pw_play_tx, pw_play_rv) = pw::channel::channel::(); + + let (pw_init_tx, pw_init_rv) = std::sync::mpsc::channel::(); + let device = self.clone(); + let config = config.clone(); + let wait_timeout = timeout.unwrap_or(Duration::from_secs(2)); + let handle = thread::Builder::new() + .name("pw_capture_music_in".to_owned()) + .spawn(move || { + let properties = device.pw_properties(DeviceDirection::Input); + let Ok(StreamData { + mainloop, + listener, + stream, + context, + }) = super::stream::connect_input( + &config, + properties, + sample_format, + data_callback, + error_callback, + ) + else { + let _ = pw_init_tx.send(false); + return; + }; + let _ = pw_init_tx.send(true); + let stream = stream.clone(); + let _receiver = pw_play_rv.attach(mainloop.loop_(), move |play| { + let _ = stream.set_active(play); + }); + mainloop.run(); + drop(listener); + drop(context); + }) + .unwrap(); + match pw_init_rv.recv_timeout(wait_timeout) { + Ok(true) => Ok(Stream { + handle, + controller: pw_play_tx, + }), + Ok(false) | Err(_) => Err(crate::BuildStreamError::StreamConfigNotSupported), + } + } + + fn build_output_stream_raw( + &self, + config: &crate::StreamConfig, + sample_format: crate::SampleFormat, + data_callback: D, + error_callback: E, + timeout: Option, + ) -> Result + where + D: FnMut(&mut crate::Data, &crate::OutputCallbackInfo) + Send + 'static, + E: FnMut(crate::StreamError) + Send + 'static, + { + let (pw_play_tx, pw_play_rv) = pw::channel::channel::(); + + let (pw_init_tx, pw_init_rv) = std::sync::mpsc::channel::(); + let device = self.clone(); + let config = config.clone(); + let wait_timeout = timeout.unwrap_or(Duration::from_secs(2)); + let handle = thread::Builder::new() + .name("pw_capture_music_out".to_owned()) + .spawn(move || { + let properties = device.pw_properties(DeviceDirection::Output); + + let Ok(StreamData { + mainloop, + listener, + stream, + context, + }) = super::stream::connect_output( + &config, + properties, + sample_format, + data_callback, + error_callback, + ) + else { + let _ = pw_init_tx.send(false); + return; + }; + + let _ = pw_init_tx.send(true); + let stream = stream.clone(); + let _receiver = pw_play_rv.attach(mainloop.loop_(), move |play| { + let _ = stream.set_active(play); + }); + mainloop.run(); + drop(listener); + drop(context); + }) + .unwrap(); + match pw_init_rv.recv_timeout(wait_timeout) { + Ok(true) => Ok(Stream { + handle, + controller: pw_play_tx, + }), + Ok(false) | Err(_) => Err(crate::BuildStreamError::StreamConfigNotSupported), + } + } +} + +impl Device { + pub fn channels(&self) -> u16 { + self.channels + } + pub fn direction(&self) -> DeviceDirection { + self.direction + } + pub fn node_name(&self) -> &str { + &self.node_name + } + + pub fn limit_quantam(&self) -> u32 { + self.limit_quantum + } + pub fn min_quantum(&self) -> u32 { + self.min_quantum + } + pub fn max_quantum(&self) -> u32 { + self.max_quantum + } + pub fn quantum(&self) -> u32 { + self.quantum + } + pub fn rate(&self) -> u32 { + self.rate + } + pub fn allow_rates(&self) -> &[u32] { + &self.allow_rates + } +} + +#[derive(Debug, Clone, Default)] +struct Settings { + rate: u32, + allow_rates: Vec, + quantum: u32, + min_quantum: u32, + max_quantum: u32, +} + +#[allow(dead_code)] +enum Request { + Node(NodeListener), + Meta(MetadataListener), +} + +impl From for Request { + fn from(value: NodeListener) -> Self { + Self::Node(value) + } +} + +impl From for Request { + fn from(value: MetadataListener) -> Self { + Self::Meta(value) + } +} + +fn init_roundtrip() -> Option> { + let mainloop = pw::main_loop::MainLoopRc::new(None).ok()?; + let context = pw::context::ContextRc::new(&mainloop, None).ok()?; + let core = context.connect_rc(None).ok()?; + let registry = core.get_registry_rc().ok()?; + + // To comply with Rust's safety rules, we wrap this variable in an `Rc` and a `Cell`. + let devices: Rc>> = Rc::new(RefCell::new(vec![ + Device::sink_default(), + Device::input_default(), + Device::output_default(), + ])); + let requests = Rc::new(RefCell::new(vec![])); + let settings = Rc::new(RefCell::new(Settings::default())); + let loop_clone = mainloop.clone(); + + // Trigger the sync event. The server's answer won't be processed until we start the main loop, + // so we can safely do this before setting up a callback. This lets us avoid using a Cell. + let peddings: Rc>> = Rc::new(RefCell::new(vec![])); + let pending = core.sync(0).expect("sync failed"); + + peddings.borrow_mut().push(pending); + + let _listener_core = core + .add_listener_local() + .done({ + let peddings = peddings.clone(); + move |id, seq| { + if id != pw::core::PW_ID_CORE { + return; + } + let mut peddinglist = peddings.borrow_mut(); + let Some(index) = peddinglist.iter().position(|o_seq| *o_seq == seq) else { + return; + }; + peddinglist.remove(index); + if !peddinglist.is_empty() { + return; + } + loop_clone.quit(); + } + }) + .register(); + let _listener_reg = registry + .add_listener_local() + .global({ + let devices = devices.clone(); + let registry = registry.clone(); + let requests = requests.clone(); + let settings = settings.clone(); + move |global| match global.type_ { + pipewire::types::ObjectType::Metadata => { + if !global.props.is_some_and(|props| { + props + .get("metadata.name") + .is_some_and(|name| name == "settings") + }) { + return; + } + let meta_settings: Metadata = registry.bind(global).unwrap(); + let settings = settings.clone(); + let listener = meta_settings + .add_listener_local() + .property(move |_, key, _, value| { + match (key, value) { + (Some("clock.rate"), Some(rate)) => { + let Ok(rate) = rate.parse() else { + return 0; + }; + settings.borrow_mut().rate = rate; + } + (Some("clock.allowed-rates"), Some(list)) => { + let Some(list) = list.strip_prefix("[") else { + return 0; + }; + let Some(list) = list.strip_suffix("]") else { + return 0; + }; + let list = list.trim(); + let list_normalized = list.replace(',', " "); + let list: Vec<&str> = list_normalized + .split(' ') + .filter(|s| !s.is_empty()) + .collect(); + let mut allow_rates = vec![]; + for rate in list { + let Ok(rate) = rate.parse() else { + return 0; + }; + allow_rates.push(rate); + } + settings.borrow_mut().allow_rates = allow_rates; + } + (Some("clock.quantum"), Some(quantum)) => { + let Ok(quantum) = quantum.parse() else { + return 0; + }; + settings.borrow_mut().quantum = quantum; + } + (Some("clock.min-quantum"), Some(min_quantum)) => { + let Ok(min_quantum) = min_quantum.parse() else { + return 0; + }; + settings.borrow_mut().min_quantum = min_quantum; + } + (Some("clock.max-quantum"), Some(max_quantum)) => { + let Ok(max_quantum) = max_quantum.parse() else { + return 0; + }; + settings.borrow_mut().max_quantum = max_quantum; + } + _ => {} + } + 0 + }) + .register(); + let pending = core.sync(0).expect("sync failed"); + peddings.borrow_mut().push(pending); + requests + .borrow_mut() + .push((meta_settings.upcast(), Request::Meta(listener))); + } + pipewire::types::ObjectType::Node => { + let Some(props) = global.props else { + return; + }; + let Some(media_class) = props.get("media.class") else { + return; + }; + if !matches!(media_class, "Audio/Sink" | "Audio/Source") { + return; + } + + let node: Node = registry.bind(global).expect("should ok"); + + let devices = devices.clone(); + let listener = node + .add_listener_local() + .info(move |info| { + let Some(props) = info.props() else { + return; + }; + let Some(media_class) = props.get("media.class") else { + return; + }; + let role = match media_class { + "Audio/Sink" => Role::Sink, + "Audio/Source" => Role::Source, + _ => { + return; + } + }; + let Some(group) = props.get("port.group") else { + return; + }; + let direction = match (group, role) { + ("playback", Role::Sink) => DeviceDirection::Duplex, + ("playback", Role::Source) => DeviceDirection::Input, + ("capture", _) => DeviceDirection::Output, + _ => { + return; + } + }; + let Some(object_id) = props.get("object.id") else { + return; + }; + let Some(device_id) = props.get("device.id") else { + return; + }; + let Some(object_serial) = props + .get("object.serial") + .and_then(|serial| serial.parse().ok()) + else { + return; + }; + let id = info.id(); + let node_name = props.get("node.name").unwrap_or("unknown").to_owned(); + let nick_name = props.get("node.nick").unwrap_or("unknown").to_owned(); + let description = props + .get("node.description") + .unwrap_or("unknown") + .to_owned(); + let channels = props + .get("audio.channels") + .and_then(|channels| channels.parse().ok()) + .unwrap_or(2); + let limit_quantum: u32 = props + .get("clock.quantum-limit") + .and_then(|channels| channels.parse().ok()) + .unwrap_or(0); + let icon_name = props + .get("device.icon_name") + .unwrap_or("default") + .to_owned(); + + let device = Device { + id, + node_name, + nick_name, + description, + direction, + role, + channels, + limit_quantum, + icon_name, + object_id: object_id.to_owned(), + device_id: device_id.to_owned(), + object_serial, + ..Default::default() + }; + devices.borrow_mut().push(device); + }) + .register(); + let pending = core.sync(0).expect("sync failed"); + peddings.borrow_mut().push(pending); + requests + .borrow_mut() + .push((node.upcast(), Request::Node(listener))); + } + _ => {} + } + }) + .register(); + + mainloop.run(); + + let mut devices = devices.take(); + let settings = settings.take(); + for device in devices.iter_mut() { + device.rate = settings.rate; + device.allow_rates = settings.allow_rates.clone(); + device.quantum = settings.quantum; + device.min_quantum = settings.min_quantum; + device.max_quantum = settings.max_quantum; + } + Some(devices) +} + +pub fn init_devices() -> Option> { + let devices = init_roundtrip()?; + Some(devices) +} diff --git a/src/host/pipewire/mod.rs b/src/host/pipewire/mod.rs new file mode 100644 index 000000000..af7c402a0 --- /dev/null +++ b/src/host/pipewire/mod.rs @@ -0,0 +1,38 @@ +use device::{init_devices, ClassType, Device, Devices}; + +use crate::traits::HostTrait; +mod device; +mod stream; +#[derive(Debug)] +pub struct Host(Vec); + +impl Host { + pub fn new() -> Result { + let devices = init_devices().ok_or(crate::HostUnavailable)?; + Ok(Host(devices)) + } +} + +impl HostTrait for Host { + type Devices = Devices; + type Device = Device; + fn is_available() -> bool { + true + } + fn devices(&self) -> Result { + Ok(self.0.clone().into_iter()) + } + + fn default_input_device(&self) -> Option { + self.0 + .iter() + .find(|device| matches!(device.class_type(), ClassType::DefaultSink)) + .cloned() + } + fn default_output_device(&self) -> Option { + self.0 + .iter() + .find(|device| matches!(device.class_type(), ClassType::DefaultOutput)) + .cloned() + } +} diff --git a/src/host/pipewire/stream.rs b/src/host/pipewire/stream.rs new file mode 100644 index 000000000..b99aeede5 --- /dev/null +++ b/src/host/pipewire/stream.rs @@ -0,0 +1,414 @@ +use std::{thread::JoinHandle, time::Instant}; + +use crate::{ + traits::StreamTrait, BackendSpecificError, InputCallbackInfo, OutputCallbackInfo, SampleFormat, + StreamConfig, StreamError, +}; +use pipewire::{ + self as pw, + context::ContextRc, + main_loop::MainLoopRc, + spa::{ + param::{ + format::{MediaSubtype, MediaType}, + format_utils, + }, + pod::Pod, + }, + stream::{StreamListener, StreamRc}, +}; + +use crate::Data; + +#[allow(unused)] +pub struct Stream { + pub(crate) handle: JoinHandle<()>, + pub(crate) controller: pw::channel::Sender, +} + +impl StreamTrait for Stream { + fn play(&self) -> Result<(), crate::PlayStreamError> { + let _ = self.controller.send(true); + Ok(()) + } + fn pause(&self) -> Result<(), crate::PauseStreamError> { + let _ = self.controller.send(false); + Ok(()) + } +} + +pub(crate) const SUPPORTED_FORMATS: &[SampleFormat] = &[ + SampleFormat::I8, + SampleFormat::U8, + SampleFormat::I16, + SampleFormat::U16, + SampleFormat::I24, + SampleFormat::U24, + SampleFormat::I32, + SampleFormat::U32, + SampleFormat::I64, + SampleFormat::U64, + SampleFormat::F32, + SampleFormat::F64, +]; + +impl From for pw::spa::param::audio::AudioFormat { + fn from(value: SampleFormat) -> Self { + match value { + SampleFormat::I8 => Self::S8, + SampleFormat::U8 => Self::U8, + + #[cfg(target_endian = "little")] + SampleFormat::I16 => Self::S16LE, + #[cfg(target_endian = "big")] + SampleFormat::I16 => Self::S16BE, + #[cfg(target_endian = "little")] + SampleFormat::U16 => Self::U16LE, + #[cfg(target_endian = "big")] + SampleFormat::U16 => Self::U16BE, + + #[cfg(target_endian = "little")] + SampleFormat::I24 => Self::S24LE, + #[cfg(target_endian = "big")] + SampleFormat::I24 => Self::S24BE, + #[cfg(target_endian = "little")] + SampleFormat::U24 => Self::U24LE, + #[cfg(target_endian = "big")] + SampleFormat::U24 => Self::U24BE, + #[cfg(target_endian = "little")] + SampleFormat::I32 => Self::S32LE, + #[cfg(target_endian = "big")] + SampleFormat::I32 => Self::S32BE, + #[cfg(target_endian = "little")] + SampleFormat::U32 => Self::U32LE, + #[cfg(target_endian = "big")] + SampleFormat::U32 => Self::U32BE, + #[cfg(target_endian = "little")] + SampleFormat::F32 => Self::F32LE, + #[cfg(target_endian = "big")] + SampleFormat::F32 => Self::F32BE, + #[cfg(target_endian = "little")] + SampleFormat::F64 => Self::F64LE, + #[cfg(target_endian = "big")] + SampleFormat::F64 => Self::F64BE, + SampleFormat::I64 => Self::Unknown, + SampleFormat::U64 => Self::Unknown, + } + } +} + +pub struct UserData { + data_callback: D, + error_callback: E, + sample_format: SampleFormat, + format: pw::spa::param::audio::AudioInfoRaw, + created_instance: Instant, +} + +impl UserData +where + D: FnMut(&Data, &InputCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, +{ + fn publish_data_in(&mut self, frames: usize, data: &Data) -> Result<(), BackendSpecificError> { + let callback = stream_timestamp_fallback(self.created_instance)?; + let delay_duration = frames_to_duration(frames, self.format.rate()); + let capture = callback + .add(delay_duration) + .ok_or_else(|| BackendSpecificError { + description: "`playback` occurs beyond representation supported by `StreamInstant`" + .to_string(), + })?; + let timestamp = crate::InputStreamTimestamp { callback, capture }; + let info = crate::InputCallbackInfo { timestamp }; + (self.data_callback)(data, &info); + Ok(()) + } +} +impl UserData +where + D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, +{ + fn publish_data_out( + &mut self, + frames: usize, + data: &mut Data, + ) -> Result<(), BackendSpecificError> { + let callback = stream_timestamp_fallback(self.created_instance)?; + let delay_duration = frames_to_duration(frames, self.format.rate()); + let playback = callback + .add(delay_duration) + .ok_or_else(|| BackendSpecificError { + description: "`playback` occurs beyond representation supported by `StreamInstant`" + .to_string(), + })?; + let timestamp = crate::OutputStreamTimestamp { callback, playback }; + let info = crate::OutputCallbackInfo { timestamp }; + (self.data_callback)(data, &info); + Ok(()) + } +} +pub struct StreamData { + pub mainloop: MainLoopRc, + pub listener: StreamListener>, + pub stream: StreamRc, + pub context: ContextRc, +} + +// Use elapsed duration since stream creation as fallback when hardware timestamps are unavailable. +// +// This ensures positive values that are compatible with our `StreamInstant` representation. +#[inline] +fn stream_timestamp_fallback( + creation: std::time::Instant, +) -> Result { + let now = std::time::Instant::now(); + let duration = now.duration_since(creation); + crate::StreamInstant::from_nanos_i128(duration.as_nanos() as i128).ok_or(BackendSpecificError { + description: "stream duration has exceeded `StreamInstant` representation".to_string(), + }) +} + +// Convert the given duration in frames at the given sample rate to a `std::time::Duration`. +#[inline] +fn frames_to_duration(frames: usize, rate: crate::SampleRate) -> std::time::Duration { + let secsf = frames as f64 / rate as f64; + let secs = secsf as u64; + let nanos = ((secsf - secs as f64) * 1_000_000_000.0) as u32; + std::time::Duration::new(secs, nanos) +} + +pub fn connect_output( + config: &StreamConfig, + properties: pw::properties::PropertiesBox, + sample_format: SampleFormat, + data_callback: D, + error_callback: E, +) -> Result, pw::Error> +where + D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, +{ + pw::init(); + let mainloop = pw::main_loop::MainLoopRc::new(None)?; + let context = pw::context::ContextRc::new(&mainloop, None)?; + let core = context.connect_rc(None)?; + + let data = UserData { + data_callback, + error_callback, + sample_format, + format: Default::default(), + created_instance: Instant::now(), + }; + + let stream = pw::stream::StreamRc::new(core, "cpal-playback", properties)?; + let listener = stream + .add_local_listener_with_user_data(data) + .param_changed(|_, user_data, id, param| { + let Some(param) = param else { + return; + }; + if id != pw::spa::param::ParamType::Format.as_raw() { + return; + } + + let (media_type, media_subtype) = match format_utils::parse_format(param) { + Ok(v) => v, + Err(_) => return, + }; + + // only accept raw audio + if media_type != MediaType::Audio || media_subtype != MediaSubtype::Raw { + return; + } + + // call a helper function to parse the format for us. + user_data + .format + .parse(param) + .expect("Failed to parse param changed to AudioInfoRaw"); + }) + .process(|stream, user_data| match stream.dequeue_buffer() { + None => (user_data.error_callback)(StreamError::BufferUnderrun), + Some(mut buffer) => { + let datas = buffer.datas_mut(); + if datas.is_empty() { + return; + } + let buf_data = &mut datas[0]; + let n_channels = user_data.format.channels(); + + let Some(samples) = buf_data.data() else { + return; + }; + let stride = user_data.sample_format.sample_size() * n_channels as usize; + let frames = samples.len() / stride; + + let n_samples = samples.len() / user_data.sample_format.sample_size(); + + let data = samples.as_ptr() as *mut (); + let mut data = + unsafe { Data::from_parts(data, n_samples, user_data.sample_format) }; + if let Err(err) = user_data.publish_data_out(frames, &mut data) { + (user_data.error_callback)(StreamError::BackendSpecific { err }); + } + let chunk = buf_data.chunk_mut(); + *chunk.offset_mut() = 0; + *chunk.stride_mut() = stride as i32; + *chunk.size_mut() = (frames * stride) as u32; + } + }) + .register()?; + let mut audio_info = pw::spa::param::audio::AudioInfoRaw::new(); + audio_info.set_format(sample_format.into()); + audio_info.set_rate(config.sample_rate); + audio_info.set_channels(config.channels as u32); + + let obj = pw::spa::pod::Object { + type_: pw::spa::utils::SpaTypes::ObjectParamFormat.as_raw(), + id: pw::spa::param::ParamType::EnumFormat.as_raw(), + properties: audio_info.into(), + }; + let values: Vec = pw::spa::pod::serialize::PodSerializer::serialize( + std::io::Cursor::new(Vec::new()), + &pw::spa::pod::Value::Object(obj), + ) + .unwrap() + .0 + .into_inner(); + + let mut params = [Pod::from_bytes(&values).unwrap()]; + + /* Now connect this stream. We ask that our process function is + * called in a realtime thread. */ + stream.connect( + pw::spa::utils::Direction::Output, + None, + pw::stream::StreamFlags::AUTOCONNECT + | pw::stream::StreamFlags::MAP_BUFFERS + | pw::stream::StreamFlags::RT_PROCESS, + &mut params, + )?; + + Ok(StreamData { + mainloop, + listener, + stream, + context, + }) +} +pub fn connect_input( + config: &StreamConfig, + properties: pw::properties::PropertiesBox, + sample_format: SampleFormat, + data_callback: D, + error_callback: E, +) -> Result, pw::Error> +where + D: FnMut(&Data, &InputCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, +{ + pw::init(); + let mainloop = pw::main_loop::MainLoopRc::new(None)?; + let context = pw::context::ContextRc::new(&mainloop, None)?; + let core = context.connect_rc(None)?; + + let data = UserData { + data_callback, + error_callback, + sample_format, + format: Default::default(), + created_instance: Instant::now(), + }; + + let stream = pw::stream::StreamRc::new(core, "cpal-capture", properties)?; + let listener = stream + .add_local_listener_with_user_data(data) + .param_changed(|_, user_data, id, param| { + let Some(param) = param else { + return; + }; + if id != pw::spa::param::ParamType::Format.as_raw() { + return; + } + + let (media_type, media_subtype) = match format_utils::parse_format(param) { + Ok(v) => v, + Err(_) => return, + }; + + // only accept raw audio + if media_type != MediaType::Audio || media_subtype != MediaSubtype::Raw { + return; + } + + // call a helper function to parse the format for us. + user_data + .format + .parse(param) + .expect("Failed to parse param changed to AudioInfoRaw"); + }) + .process(|stream, user_data| match stream.dequeue_buffer() { + None => (user_data.error_callback)(StreamError::BufferUnderrun), + Some(mut buffer) => { + let datas = buffer.datas_mut(); + if datas.is_empty() { + return; + } + let data = &mut datas[0]; + let n_channels = user_data.format.channels(); + let n_samples = data.chunk().size() / user_data.sample_format.sample_size() as u32; + let frames = n_samples / n_channels; + + let Some(samples) = data.data() else { + return; + }; + let data = samples.as_ptr() as *mut (); + let data = + unsafe { Data::from_parts(data, n_samples as usize, user_data.sample_format) }; + if let Err(err) = user_data.publish_data_in(frames as usize, &data) { + (user_data.error_callback)(StreamError::BackendSpecific { err }); + } + } + }) + .register()?; + let mut audio_info = pw::spa::param::audio::AudioInfoRaw::new(); + audio_info.set_format(sample_format.into()); + audio_info.set_rate(config.sample_rate); + audio_info.set_channels(config.channels as u32); + + let obj = pw::spa::pod::Object { + type_: pw::spa::utils::SpaTypes::ObjectParamFormat.as_raw(), + id: pw::spa::param::ParamType::EnumFormat.as_raw(), + properties: audio_info.into(), + }; + let values: Vec = pw::spa::pod::serialize::PodSerializer::serialize( + std::io::Cursor::new(Vec::new()), + &pw::spa::pod::Value::Object(obj), + ) + .unwrap() + .0 + .into_inner(); + + let mut params = [Pod::from_bytes(&values).unwrap()]; + + /* Now connect this stream. We ask that our process function is + * called in a realtime thread. */ + stream.connect( + pw::spa::utils::Direction::Input, + None, + pw::stream::StreamFlags::AUTOCONNECT + | pw::stream::StreamFlags::MAP_BUFFERS + | pw::stream::StreamFlags::RT_PROCESS, + &mut params, + )?; + + Ok(StreamData { + mainloop, + listener, + stream, + context, + }) +} diff --git a/src/platform/mod.rs b/src/platform/mod.rs index 0f62026d7..0c46eea93 100644 --- a/src/platform/mod.rs +++ b/src/platform/mod.rs @@ -713,11 +713,25 @@ mod platform_impl { ))) )] pub use crate::host::jack::Host as JackHost; - + #[cfg(feature = "pipewire")] + #[cfg_attr( + docsrs, + doc(cfg(all( + any( + target_os = "linux", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "netbsd" + ), + feature = "pipewire" + ))) + )] + pub use crate::host::pipewire::Host as PipeWireHost; impl_platform_host!( #[cfg(feature = "jack")] Jack => JackHost, Alsa => AlsaHost, - #[cfg(feature = "custom")] Custom => super::CustomHost + #[cfg(feature = "custom")] Custom => super::CustomHost, + #[cfg(feature = "pipewire")] PipeWire => super::PipeWireHost, ); /// The default host for the current compilation target platform.