diff --git a/Cargo.lock b/Cargo.lock index 8eb8194f96..9227aae6c0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -16500,6 +16500,7 @@ name = "transcribe-aws" version = "0.1.0" dependencies = [ "async-stream", + "audio-utils", "aws-config", "aws-sdk-transcribe", "aws-sdk-transcribestreaming", diff --git a/crates/audio-utils/src/lib.rs b/crates/audio-utils/src/lib.rs index a0e7c5afb6..fe5d0682dd 100644 --- a/crates/audio-utils/src/lib.rs +++ b/crates/audio-utils/src/lib.rs @@ -84,6 +84,48 @@ pub fn bytes_to_f32_samples(data: &[u8]) -> Vec { .collect() } +pub fn mix_sample_f32(mic: f32, speaker: f32) -> f32 { + (mic + speaker).clamp(-1.0, 1.0) +} + +pub fn mix_audio_f32(mic: &[f32], speaker: &[f32]) -> Vec { + let max_len = mic.len().max(speaker.len()); + (0..max_len) + .map(|i| { + let m = mic.get(i).copied().unwrap_or(0.0); + let s = speaker.get(i).copied().unwrap_or(0.0); + mix_sample_f32(m, s) + }) + .collect() +} + +pub fn mix_audio_pcm16le(mic: &[u8], speaker: &[u8]) -> Vec { + let max_len = mic.len().max(speaker.len()); + let mut mixed = Vec::with_capacity(max_len); + + let mut index = 0; + while index < max_len { + let mic_sample = if index + 1 < mic.len() { + i16::from_le_bytes([mic[index], mic[index + 1]]) + } else { + 0 + }; + + let speaker_sample = if index + 1 < speaker.len() { + i16::from_le_bytes([speaker[index], speaker[index + 1]]) + } else { + 0 + }; + + let mixed_sample = ((mic_sample as i32 + speaker_sample as i32) / 2) as i16; + + mixed.extend_from_slice(&mixed_sample.to_le_bytes()); + index += 2; + } + + mixed +} + pub fn source_from_path( path: impl AsRef, ) -> Result>, crate::Error> { diff --git a/crates/transcribe-aws/Cargo.toml b/crates/transcribe-aws/Cargo.toml index e7787364f3..555f6da04f 100644 --- a/crates/transcribe-aws/Cargo.toml +++ b/crates/transcribe-aws/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" hypr-data = { workspace = true } [dependencies] +hypr-audio-utils = { workspace = true } owhisper-config = { workspace = true } owhisper-interface = { workspace = true } diff --git a/crates/transcribe-aws/src/lib.rs b/crates/transcribe-aws/src/lib.rs index 2e3cb3f666..c760791928 100644 --- a/crates/transcribe-aws/src/lib.rs +++ b/crates/transcribe-aws/src/lib.rs @@ -28,6 +28,7 @@ use aws_sdk_transcribestreaming::types::{ AudioEvent, AudioStream, LanguageCode, MediaEncoding, TranscriptResultStream, }; use aws_sdk_transcribestreaming::{config::Region, Client}; +use hypr_audio_utils::mix_audio_pcm16le; use owhisper_interface::{ListenInputChunk, ListenOutputChunk, ListenParams, Word2}; @@ -88,7 +89,7 @@ impl TranscribeService { } ListenInputChunk::DualAudio { mic, speaker } => { // For now, mix the dual audio channels - let mixed = mix_audio(mic, speaker); + let mixed = mix_audio_pcm16le(&mic, &speaker); if !mixed.is_empty() { if audio_tx.send(Bytes::from(mixed)).await.is_err() { break; @@ -229,32 +230,3 @@ impl Service> for TranscribeService { }) } } - -fn mix_audio(mic: Vec, speaker: Vec) -> Vec { - // Mix the two audio channels by averaging them - let len = mic.len().max(speaker.len()); - let mut mixed = Vec::with_capacity(len); - - for i in (0..len).step_by(2) { - // Process 16-bit samples (2 bytes each) - let mic_sample = if i + 1 < mic.len() { - i16::from_le_bytes([mic[i], mic[i + 1]]) - } else { - 0 - }; - - let speaker_sample = if i + 1 < speaker.len() { - i16::from_le_bytes([speaker[i], speaker[i + 1]]) - } else { - 0 - }; - - // Mix by averaging and prevent clipping - let mixed_sample = ((mic_sample as i32 + speaker_sample as i32) / 2) as i16; - let bytes = mixed_sample.to_le_bytes(); - mixed.push(bytes[0]); - mixed.push(bytes[1]); - } - - mixed -} diff --git a/crates/transcribe-deepgram/Cargo.toml b/crates/transcribe-deepgram/Cargo.toml index 9ece7a6699..22a2fe4c40 100644 --- a/crates/transcribe-deepgram/Cargo.toml +++ b/crates/transcribe-deepgram/Cargo.toml @@ -12,6 +12,7 @@ dirs = { workspace = true } rodio = { workspace = true } [dependencies] +hypr-audio-utils = { workspace = true } owhisper-config = { workspace = true } owhisper-interface = { workspace = true } diff --git a/crates/transcribe-deepgram/src/service.rs b/crates/transcribe-deepgram/src/service.rs index 114bca9977..7c2508cf51 100644 --- a/crates/transcribe-deepgram/src/service.rs +++ b/crates/transcribe-deepgram/src/service.rs @@ -8,6 +8,7 @@ use axum::{ http::{Response, StatusCode}, response::IntoResponse, }; +use hypr_audio_utils::mix_audio_pcm16le; use std::{ future::Future, pin::Pin, @@ -71,7 +72,7 @@ impl TranscribeService { } } ListenInputChunk::DualAudio { mic, speaker } => { - let mixed = mix_audio(mic, speaker); + let mixed = mix_audio_pcm16le(&mic, &speaker); if !mixed.is_empty() { if audio_tx.send(Ok(mixed.into())).await.is_err() { break; @@ -200,29 +201,3 @@ impl Service> for TranscribeService { }) } } - -fn mix_audio(mic: Vec, speaker: Vec) -> Vec { - let len = mic.len().max(speaker.len()); - let mut mixed = Vec::with_capacity(len); - - for i in (0..len).step_by(2) { - let mic_sample = if i + 1 < mic.len() { - i16::from_le_bytes([mic[i], mic[i + 1]]) - } else { - 0 - }; - - let speaker_sample = if i + 1 < speaker.len() { - i16::from_le_bytes([speaker[i], speaker[i + 1]]) - } else { - 0 - }; - - let mixed_sample = ((mic_sample as i32 + speaker_sample as i32) / 2) as i16; - let bytes = mixed_sample.to_le_bytes(); - mixed.push(bytes[0]); - mixed.push(bytes[1]); - } - - mixed -} diff --git a/crates/ws-utils/src/lib.rs b/crates/ws-utils/src/lib.rs index 781a4c1281..362cf3f2d9 100644 --- a/crates/ws-utils/src/lib.rs +++ b/crates/ws-utils/src/lib.rs @@ -8,7 +8,7 @@ use axum::extract::ws::{Message, WebSocket}; use futures_util::{stream::SplitStream, Stream, StreamExt}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; -use hypr_audio_utils::bytes_to_f32_samples; +use hypr_audio_utils::{bytes_to_f32_samples, mix_audio_f32}; use owhisper_interface::ListenInputChunk; enum AudioProcessResult { @@ -70,17 +70,6 @@ fn process_ws_message(message: Message, channels: Option) -> AudioProcessRe } } -fn mix_audio_channels(mic: &[f32], speaker: &[f32]) -> Vec { - let max_len = mic.len().max(speaker.len()); - (0..max_len) - .map(|i| { - let mic_sample = mic.get(i).copied().unwrap_or(0.0); - let speaker_sample = speaker.get(i).copied().unwrap_or(0.0); - (mic_sample + speaker_sample).clamp(-1.0, 1.0) - }) - .collect() -} - pub struct WebSocketAudioSource { receiver: Option>, sample_rate: u32, @@ -127,7 +116,7 @@ impl Stream for WebSocketAudioSource { self.buffer_idx = 0; } AudioProcessResult::DualSamples { mic, speaker } => { - let mut mixed = mix_audio_channels(&mic, &speaker); + let mut mixed = mix_audio_f32(&mic, &speaker); if mixed.is_empty() { continue; } diff --git a/plugins/listener/src/actors/recorder.rs b/plugins/listener/src/actors/recorder.rs index 2592529c0f..06f2eae242 100644 --- a/plugins/listener/src/actors/recorder.rs +++ b/plugins/listener/src/actors/recorder.rs @@ -1,17 +1,19 @@ use std::fs::File; use std::io::BufWriter; use std::path::PathBuf; +use std::sync::Arc; use std::time::Instant; use hypr_audio_utils::{ - decode_vorbis_to_wav_file, encode_wav_to_vorbis_file, VorbisEncodeSettings, + decode_vorbis_to_wav_file, encode_wav_to_vorbis_file, mix_audio_f32, VorbisEncodeSettings, }; use ractor::{Actor, ActorName, ActorProcessingErr, ActorRef}; const FLUSH_INTERVAL: std::time::Duration = std::time::Duration::from_millis(1000); pub enum RecMsg { - Audio(Vec), + AudioSingle(Arc<[f32]>), + AudioDual(Arc<[f32]>, Arc<[f32]>), } pub struct RecArgs { @@ -21,6 +23,8 @@ pub struct RecArgs { pub struct RecState { writer: Option>>, + writer_mic: Option>>, + writer_spk: Option>>, wav_path: PathBuf, ogg_path: PathBuf, last_flush: Instant, @@ -70,8 +74,31 @@ impl Actor for RecorderActor { hound::WavWriter::create(&wav_path, spec)? }; + let (writer_mic, writer_spk) = if is_debug_mode() { + let mic_path = dir.join(format!("{}_mic.wav", filename_base)); + let spk_path = dir.join(format!("{}_spk.wav", filename_base)); + + let mic_writer = if mic_path.exists() { + hound::WavWriter::append(&mic_path)? + } else { + hound::WavWriter::create(&mic_path, spec)? + }; + + let spk_writer = if spk_path.exists() { + hound::WavWriter::append(&spk_path)? + } else { + hound::WavWriter::create(&spk_path, spec)? + }; + + (Some(mic_writer), Some(spk_writer)) + } else { + (None, None) + }; + Ok(RecState { writer: Some(writer), + writer_mic, + writer_spk, wav_path, ogg_path, last_flush: Instant::now(), @@ -85,17 +112,37 @@ impl Actor for RecorderActor { st: &mut Self::State, ) -> Result<(), ActorProcessingErr> { match msg { - RecMsg::Audio(v) => { + RecMsg::AudioSingle(samples) => { + if let Some(ref mut writer) = st.writer { + for s in samples.iter() { + writer.write_sample(*s)?; + } + } + flush_if_due(st)?; + } + RecMsg::AudioDual(mic, spk) => { if let Some(ref mut writer) = st.writer { - for s in v { - writer.write_sample(s)?; + let mixed = mix_audio_f32(&mic, &spk); + for sample in mixed { + writer.write_sample(sample)?; } + } - if st.last_flush.elapsed() >= FLUSH_INTERVAL { - writer.flush()?; - st.last_flush = Instant::now(); + if st.writer_mic.is_some() { + if let Some(ref mut writer_mic) = st.writer_mic { + for s in mic.iter() { + writer_mic.write_sample(*s)?; + } + } + + if let Some(ref mut writer_spk) = st.writer_spk { + for s in spk.iter() { + writer_spk.write_sample(*s)?; + } } } + + flush_if_due(st)?; } } @@ -107,10 +154,9 @@ impl Actor for RecorderActor { _myself: ActorRef, st: &mut Self::State, ) -> Result<(), ActorProcessingErr> { - if let Some(mut writer) = st.writer.take() { - writer.flush()?; - writer.finalize()?; - } + finalize_writer(&mut st.writer)?; + finalize_writer(&mut st.writer_mic)?; + finalize_writer(&mut st.writer_spk)?; if st.wav_path.exists() { let temp_ogg_path = st.ogg_path.with_extension("ogg.tmp"); @@ -141,3 +187,41 @@ impl Actor for RecorderActor { fn into_actor_err(err: hypr_audio_utils::Error) -> ActorProcessingErr { Box::new(err) } + +fn is_debug_mode() -> bool { + cfg!(debug_assertions) + || std::env::var("HYPRNOTE_DEBUG") + .map(|v| !v.is_empty() && v != "0" && v != "false") + .unwrap_or(false) +} + +fn flush_if_due(state: &mut RecState) -> Result<(), hound::Error> { + if state.last_flush.elapsed() < FLUSH_INTERVAL { + return Ok(()); + } + flush_all(state) +} + +fn flush_all(state: &mut RecState) -> Result<(), hound::Error> { + if let Some(writer) = state.writer.as_mut() { + writer.flush()?; + } + if let Some(writer_mic) = state.writer_mic.as_mut() { + writer_mic.flush()?; + } + if let Some(writer_spk) = state.writer_spk.as_mut() { + writer_spk.flush()?; + } + state.last_flush = Instant::now(); + Ok(()) +} + +fn finalize_writer( + writer: &mut Option>>, +) -> Result<(), hound::Error> { + if let Some(mut writer) = writer.take() { + writer.flush()?; + writer.finalize()?; + } + Ok(()) +} diff --git a/plugins/listener/src/actors/source.rs b/plugins/listener/src/actors/source.rs index e49edf5750..3ccbfda298 100644 --- a/plugins/listener/src/actors/source.rs +++ b/plugins/listener/src/actors/source.rs @@ -375,12 +375,12 @@ impl Pipeline { fn dispatch(&mut self, mic: Arc<[f32]>, spk: Arc<[f32]>, mode: ChannelMode) { if let Some(cell) = registry::where_is(RecorderActor::name()) { let actor: ActorRef = cell.into(); - let audio_for_recording = if mode == ChannelMode::Single { - mic.to_vec() + let result = if mode == ChannelMode::Single { + actor.cast(RecMsg::AudioSingle(Arc::clone(&mic))) } else { - Self::mix(mic.as_ref(), spk.as_ref()) + actor.cast(RecMsg::AudioDual(Arc::clone(&mic), Arc::clone(&spk))) }; - if let Err(e) = actor.cast(RecMsg::Audio(audio_for_recording)) { + if let Err(e) = result { tracing::error!(error = ?e, "failed_to_send_audio_to_recorder"); } } @@ -408,13 +408,6 @@ impl Pipeline { self.amplitude.observe(mic, spk); } - - fn mix(mic: &[f32], spk: &[f32]) -> Vec { - mic.iter() - .zip(spk.iter()) - .map(|(m, s)| (m + s).clamp(-1.0, 1.0)) - .collect() - } } struct AmplitudeEmitter {