Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ specta = "2.0.0-rc.22"
specta-typescript = "0.0.9"
tauri-specta = "2.0.0-rc.21"

cidre = { git = "https://github.com/yury/cidre", rev = "a9587fa" }
cidre = "0.11.4"
cpal = "0.15.3"
dasp = "0.11.0"
flume = "0.11.1"
Expand Down
5 changes: 5 additions & 0 deletions crates/audio-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@ futures-util = { workspace = true }
kalosm-sound = { workspace = true, default-features = false }
thiserror = { workspace = true }

dasp = { workspace = true }
hound = { workspace = true }
rodio = { workspace = true }
rubato = "0.16.2"
vorbis_rs = { workspace = true }

[dev-dependencies]
hypr-data = { workspace = true }
tokio = { workspace = true, features = ["full"] }
2 changes: 0 additions & 2 deletions crates/audio-utils/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ pub enum Error {
Hound(#[from] hound::Error),
#[error(transparent)]
Vorbis(#[from] vorbis_rs::VorbisError),
#[error("vorbis channel count mismatch (expected {expected}, actual {actual})")]
ChannelCountMismatch { expected: u8, actual: u8 },
#[error("vorbis channel data length mismatch for channel {channel}")]
ChannelDataLengthMismatch { channel: usize },
#[error("unsupported channel count {count}")]
Expand Down
5 changes: 4 additions & 1 deletion crates/audio-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ use futures_util::{Stream, StreamExt};
use kalosm_sound::AsyncSource;

mod error;
pub use error::*;
mod resampler;
mod vorbis;

pub use error::*;
pub use resampler::*;
pub use vorbis::*;

pub use rodio::Source;
Expand Down
194 changes: 194 additions & 0 deletions crates/audio-utils/src/resampler/driver.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
use std::collections::VecDeque;

use rubato::Resampler;

/// Wraps a rubato Resampler with queues to enable sample-by-sample input and fixed-size chunk output.
/// Manages buffering between the streaming input and the block-based resampler requirements.
pub(crate) struct RubatoChunkResampler<R: Resampler<f32>, const CHANNELS: usize> {
resampler: R,
output_chunk_size: usize,
input_block_size: usize,
input_queue: VecDeque<f32>,
rubato_input_buffer: Vec<Vec<f32>>,
rubato_output_buffer: Vec<Vec<f32>>,
output_queue: VecDeque<f32>,
}

impl<R: Resampler<f32>, const CHANNELS: usize> RubatoChunkResampler<R, CHANNELS> {
/// Creates a new wrapper with pre-allocated buffers sized for the resampler's requirements.
/// Allocates capacity upfront to avoid reallocations during audio processing.
pub(crate) fn new(resampler: R, output_chunk_size: usize, input_block_size: usize) -> Self {
let rubato_input_buffer = resampler.input_buffer_allocate(false);
let rubato_output_buffer = resampler.output_buffer_allocate(true);
let output_queue_capacity = resampler.output_frames_max().max(output_chunk_size);
let input_queue_capacity = input_block_size.max(1) * CHANNELS;

Self {
resampler,
output_chunk_size,
input_block_size,
input_queue: VecDeque::with_capacity(input_queue_capacity),
rubato_input_buffer,
rubato_output_buffer,
output_queue: VecDeque::with_capacity(output_queue_capacity),
}
}

/// Checks whether any resampled output is available.
pub(crate) fn output_is_empty(&self) -> bool {
self.output_queue.is_empty()
}

/// Checks whether at least one full output chunk is ready to be consumed.
pub(crate) fn has_full_chunk(&self) -> bool {
self.output_queue.len() >= self.output_chunk_size
}

/// Extracts exactly one output chunk if available, leaving the rest in the queue.
/// Returns None if insufficient samples are available.
pub(crate) fn take_full_chunk(&mut self) -> Option<Vec<f32>> {
if self.output_queue.len() >= self.output_chunk_size {
Some(self.output_queue.drain(..self.output_chunk_size).collect())
} else {
None
}
}

/// Drains all available output samples regardless of chunk boundaries.
/// Used when flushing remaining samples at stream end.
pub(crate) fn take_all_output(&mut self) -> Option<Vec<f32>> {
if self.output_queue.is_empty() {
None
} else {
Some(self.output_queue.drain(..).collect())
}
}

/// Checks whether any input samples are waiting to be processed.
pub(crate) fn has_input(&self) -> bool {
!self.input_queue.is_empty()
}

/// Queues a single input sample for resampling.
pub(crate) fn push_sample(&mut self, sample: f32) {
self.input_queue.push_back(sample);
}

/// Processes all complete input blocks currently available in the queue.
/// Stops when insufficient input remains for another block.
/// Returns whether any output was produced.
pub(crate) fn process_all_ready_blocks(&mut self) -> Result<bool, crate::Error> {
let mut produced_output = false;
loop {
let frames_needed = self.resampler.input_frames_next();
if self.input_queue.len() < frames_needed {
break;
}
if self.process_one_block()? {
produced_output = true;
}
}
Ok(produced_output)
}

/// Processes exactly one input block if enough samples are available.
/// Returns whether output was produced.
pub(crate) fn process_one_block(&mut self) -> Result<bool, crate::Error> {
let frames_needed = self.resampler.input_frames_next();
if self.input_queue.len() < frames_needed {
return Ok(false);
}
self.rubato_input_buffer[0].clear();
self.rubato_input_buffer[0].extend(self.input_queue.drain(..frames_needed));
let produced_output = self.process_staged_input()?;
self.rubato_input_buffer[0].clear();
Ok(produced_output)
}

/// Processes an incomplete input block, optionally padding with zeros to meet resampler requirements.
/// Used for handling the final partial block at stream end when zero_pad is true.
pub(crate) fn process_partial_block(&mut self, zero_pad: bool) -> Result<bool, crate::Error> {
if self.input_queue.is_empty() {
return Ok(false);
}

let frames_needed = self.resampler.input_frames_next();
let frames_available = self.input_queue.len();

if !zero_pad && frames_available < frames_needed {
return Ok(false);
}

self.rubato_input_buffer[0].clear();
self.rubato_input_buffer[0].extend(self.input_queue.drain(..frames_available));
if frames_available < frames_needed {
if zero_pad {
self.rubato_input_buffer[0].resize(frames_needed, 0.0);
} else {
return Ok(false);
}
}

let produced_output = self.process_staged_input()?;
self.rubato_input_buffer[0].clear();
Ok(produced_output)
}

/// Discards all pending input samples without processing them.
pub(crate) fn clear_input(&mut self) {
self.input_queue.clear();
}

/// Hot-swaps the underlying resampler instance while preserving queue state.
/// Reallocates buffers and adjusts capacities as needed. Clears input queue to prevent
/// mixing samples from different configurations.
pub(crate) fn rebind_resampler(
&mut self,
resampler: R,
output_chunk_size: usize,
input_block_size: usize,
) {
self.resampler = resampler;
self.output_chunk_size = output_chunk_size;
self.input_block_size = input_block_size;
self.rubato_input_buffer = self.resampler.input_buffer_allocate(false);
self.rubato_output_buffer = self.resampler.output_buffer_allocate(true);

let desired_output_capacity = self
.resampler
.output_frames_max()
.max(self.output_chunk_size);
if self.output_queue.capacity() < desired_output_capacity {
self.output_queue
.reserve(desired_output_capacity - self.output_queue.capacity());
}

let desired_input_capacity = self.input_block_size.max(1) * CHANNELS;
if self.input_queue.capacity() < desired_input_capacity {
self.input_queue
.reserve(desired_input_capacity - self.input_queue.capacity());
}
self.clear_input();
}

/// Runs the resampler on the staged input buffer and queues the output.
/// Returns whether any output frames were produced.
fn process_staged_input(&mut self) -> Result<bool, crate::Error> {
let (_, frames_produced) = self.resampler.process_into_buffer(
&self.rubato_input_buffer[..],
&mut self.rubato_output_buffer[..],
None,
)?;
if frames_produced > 0 {
self.output_queue.extend(
self.rubato_output_buffer[0]
.iter()
.take(frames_produced)
.copied(),
);
Ok(true)
} else {
Ok(false)
}
}
}
Loading
Loading