Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 42 additions & 0 deletions crates/audio-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,48 @@ pub fn bytes_to_f32_samples(data: &[u8]) -> Vec<f32> {
.collect()
}

pub fn mix_sample_f32(mic: f32, speaker: f32) -> f32 {
(mic + speaker).clamp(-1.0, 1.0)
}

pub fn mix_audio_f32(mic: &[f32], speaker: &[f32]) -> Vec<f32> {
let max_len = mic.len().max(speaker.len());
(0..max_len)
.map(|i| {
let m = mic.get(i).copied().unwrap_or(0.0);
let s = speaker.get(i).copied().unwrap_or(0.0);
mix_sample_f32(m, s)
})
.collect()
}

pub fn mix_audio_pcm16le(mic: &[u8], speaker: &[u8]) -> Vec<u8> {
let max_len = mic.len().max(speaker.len());
let mut mixed = Vec::with_capacity(max_len);

let mut index = 0;
while index < max_len {
let mic_sample = if index + 1 < mic.len() {
i16::from_le_bytes([mic[index], mic[index + 1]])
} else {
0
};

let speaker_sample = if index + 1 < speaker.len() {
i16::from_le_bytes([speaker[index], speaker[index + 1]])
} else {
0
};

let mixed_sample = ((mic_sample as i32 + speaker_sample as i32) / 2) as i16;

mixed.extend_from_slice(&mixed_sample.to_le_bytes());
index += 2;
}

mixed
}

pub fn source_from_path(
path: impl AsRef<std::path::Path>,
) -> Result<rodio::Decoder<std::io::BufReader<std::fs::File>>, crate::Error> {
Expand Down
1 change: 1 addition & 0 deletions crates/transcribe-aws/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ edition = "2021"
hypr-data = { workspace = true }

[dependencies]
hypr-audio-utils = { workspace = true }
owhisper-config = { workspace = true }
owhisper-interface = { workspace = true }

Expand Down
32 changes: 2 additions & 30 deletions crates/transcribe-aws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use aws_sdk_transcribestreaming::types::{
AudioEvent, AudioStream, LanguageCode, MediaEncoding, TranscriptResultStream,
};
use aws_sdk_transcribestreaming::{config::Region, Client};
use hypr_audio_utils::mix_audio_pcm16le;

use owhisper_interface::{ListenInputChunk, ListenOutputChunk, ListenParams, Word2};

Expand Down Expand Up @@ -88,7 +89,7 @@ impl TranscribeService {
}
ListenInputChunk::DualAudio { mic, speaker } => {
// For now, mix the dual audio channels
let mixed = mix_audio(mic, speaker);
let mixed = mix_audio_pcm16le(&mic, &speaker);
if !mixed.is_empty() {
if audio_tx.send(Bytes::from(mixed)).await.is_err() {
break;
Expand Down Expand Up @@ -229,32 +230,3 @@ impl Service<Request<Body>> for TranscribeService {
})
}
}

fn mix_audio(mic: Vec<u8>, speaker: Vec<u8>) -> Vec<u8> {
// Mix the two audio channels by averaging them
let len = mic.len().max(speaker.len());
let mut mixed = Vec::with_capacity(len);

for i in (0..len).step_by(2) {
// Process 16-bit samples (2 bytes each)
let mic_sample = if i + 1 < mic.len() {
i16::from_le_bytes([mic[i], mic[i + 1]])
} else {
0
};

let speaker_sample = if i + 1 < speaker.len() {
i16::from_le_bytes([speaker[i], speaker[i + 1]])
} else {
0
};

// Mix by averaging and prevent clipping
let mixed_sample = ((mic_sample as i32 + speaker_sample as i32) / 2) as i16;
let bytes = mixed_sample.to_le_bytes();
mixed.push(bytes[0]);
mixed.push(bytes[1]);
}

mixed
}
1 change: 1 addition & 0 deletions crates/transcribe-deepgram/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ dirs = { workspace = true }
rodio = { workspace = true }

[dependencies]
hypr-audio-utils = { workspace = true }
owhisper-config = { workspace = true }
owhisper-interface = { workspace = true }

Expand Down
29 changes: 2 additions & 27 deletions crates/transcribe-deepgram/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use axum::{
http::{Response, StatusCode},
response::IntoResponse,
};
use hypr_audio_utils::mix_audio_pcm16le;
use std::{
future::Future,
pin::Pin,
Expand Down Expand Up @@ -71,7 +72,7 @@ impl TranscribeService {
}
}
ListenInputChunk::DualAudio { mic, speaker } => {
let mixed = mix_audio(mic, speaker);
let mixed = mix_audio_pcm16le(&mic, &speaker);
if !mixed.is_empty() {
if audio_tx.send(Ok(mixed.into())).await.is_err() {
break;
Expand Down Expand Up @@ -200,29 +201,3 @@ impl Service<Request<Body>> for TranscribeService {
})
}
}

fn mix_audio(mic: Vec<u8>, speaker: Vec<u8>) -> Vec<u8> {
let len = mic.len().max(speaker.len());
let mut mixed = Vec::with_capacity(len);

for i in (0..len).step_by(2) {
let mic_sample = if i + 1 < mic.len() {
i16::from_le_bytes([mic[i], mic[i + 1]])
} else {
0
};

let speaker_sample = if i + 1 < speaker.len() {
i16::from_le_bytes([speaker[i], speaker[i + 1]])
} else {
0
};

let mixed_sample = ((mic_sample as i32 + speaker_sample as i32) / 2) as i16;
let bytes = mixed_sample.to_le_bytes();
mixed.push(bytes[0]);
mixed.push(bytes[1]);
}

mixed
}
15 changes: 2 additions & 13 deletions crates/ws-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use axum::extract::ws::{Message, WebSocket};
use futures_util::{stream::SplitStream, Stream, StreamExt};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};

use hypr_audio_utils::bytes_to_f32_samples;
use hypr_audio_utils::{bytes_to_f32_samples, mix_audio_f32};
use owhisper_interface::ListenInputChunk;

enum AudioProcessResult {
Expand Down Expand Up @@ -70,17 +70,6 @@ fn process_ws_message(message: Message, channels: Option<u32>) -> AudioProcessRe
}
}

fn mix_audio_channels(mic: &[f32], speaker: &[f32]) -> Vec<f32> {
let max_len = mic.len().max(speaker.len());
(0..max_len)
.map(|i| {
let mic_sample = mic.get(i).copied().unwrap_or(0.0);
let speaker_sample = speaker.get(i).copied().unwrap_or(0.0);
(mic_sample + speaker_sample).clamp(-1.0, 1.0)
})
.collect()
}

pub struct WebSocketAudioSource {
receiver: Option<SplitStream<WebSocket>>,
sample_rate: u32,
Expand Down Expand Up @@ -127,7 +116,7 @@ impl Stream for WebSocketAudioSource {
self.buffer_idx = 0;
}
AudioProcessResult::DualSamples { mic, speaker } => {
let mut mixed = mix_audio_channels(&mic, &speaker);
let mut mixed = mix_audio_f32(&mic, &speaker);
if mixed.is_empty() {
continue;
}
Expand Down
108 changes: 96 additions & 12 deletions plugins/listener/src/actors/recorder.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
use std::fs::File;
use std::io::BufWriter;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;

use hypr_audio_utils::{
decode_vorbis_to_wav_file, encode_wav_to_vorbis_file, VorbisEncodeSettings,
decode_vorbis_to_wav_file, encode_wav_to_vorbis_file, mix_audio_f32, VorbisEncodeSettings,
};
use ractor::{Actor, ActorName, ActorProcessingErr, ActorRef};

const FLUSH_INTERVAL: std::time::Duration = std::time::Duration::from_millis(1000);

pub enum RecMsg {
Audio(Vec<f32>),
AudioSingle(Arc<[f32]>),
AudioDual(Arc<[f32]>, Arc<[f32]>),
}

pub struct RecArgs {
Expand All @@ -21,6 +23,8 @@ pub struct RecArgs {

pub struct RecState {
writer: Option<hound::WavWriter<BufWriter<File>>>,
writer_mic: Option<hound::WavWriter<BufWriter<File>>>,
writer_spk: Option<hound::WavWriter<BufWriter<File>>>,
wav_path: PathBuf,
ogg_path: PathBuf,
last_flush: Instant,
Expand Down Expand Up @@ -70,8 +74,31 @@ impl Actor for RecorderActor {
hound::WavWriter::create(&wav_path, spec)?
};

let (writer_mic, writer_spk) = if is_debug_mode() {
let mic_path = dir.join(format!("{}_mic.wav", filename_base));
let spk_path = dir.join(format!("{}_spk.wav", filename_base));

let mic_writer = if mic_path.exists() {
hound::WavWriter::append(&mic_path)?
} else {
hound::WavWriter::create(&mic_path, spec)?
};

let spk_writer = if spk_path.exists() {
hound::WavWriter::append(&spk_path)?
} else {
hound::WavWriter::create(&spk_path, spec)?
};

(Some(mic_writer), Some(spk_writer))
} else {
(None, None)
};

Ok(RecState {
writer: Some(writer),
writer_mic,
writer_spk,
wav_path,
ogg_path,
last_flush: Instant::now(),
Expand All @@ -85,17 +112,37 @@ impl Actor for RecorderActor {
st: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match msg {
RecMsg::Audio(v) => {
RecMsg::AudioSingle(samples) => {
if let Some(ref mut writer) = st.writer {
for s in samples.iter() {
writer.write_sample(*s)?;
}
}
flush_if_due(st)?;
}
RecMsg::AudioDual(mic, spk) => {
if let Some(ref mut writer) = st.writer {
for s in v {
writer.write_sample(s)?;
let mixed = mix_audio_f32(&mic, &spk);
for sample in mixed {
writer.write_sample(sample)?;
}
}

if st.last_flush.elapsed() >= FLUSH_INTERVAL {
writer.flush()?;
st.last_flush = Instant::now();
if st.writer_mic.is_some() {
if let Some(ref mut writer_mic) = st.writer_mic {
for s in mic.iter() {
writer_mic.write_sample(*s)?;
}
}

if let Some(ref mut writer_spk) = st.writer_spk {
for s in spk.iter() {
writer_spk.write_sample(*s)?;
}
}
}

flush_if_due(st)?;
}
}

Expand All @@ -107,10 +154,9 @@ impl Actor for RecorderActor {
_myself: ActorRef<Self::Msg>,
st: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
if let Some(mut writer) = st.writer.take() {
writer.flush()?;
writer.finalize()?;
}
finalize_writer(&mut st.writer)?;
finalize_writer(&mut st.writer_mic)?;
finalize_writer(&mut st.writer_spk)?;

if st.wav_path.exists() {
let temp_ogg_path = st.ogg_path.with_extension("ogg.tmp");
Expand Down Expand Up @@ -141,3 +187,41 @@ impl Actor for RecorderActor {
fn into_actor_err(err: hypr_audio_utils::Error) -> ActorProcessingErr {
Box::new(err)
}

fn is_debug_mode() -> bool {
cfg!(debug_assertions)
|| std::env::var("HYPRNOTE_DEBUG")
.map(|v| !v.is_empty() && v != "0" && v != "false")
.unwrap_or(false)
}

fn flush_if_due(state: &mut RecState) -> Result<(), hound::Error> {
if state.last_flush.elapsed() < FLUSH_INTERVAL {
return Ok(());
}
flush_all(state)
}

fn flush_all(state: &mut RecState) -> Result<(), hound::Error> {
if let Some(writer) = state.writer.as_mut() {
writer.flush()?;
}
if let Some(writer_mic) = state.writer_mic.as_mut() {
writer_mic.flush()?;
}
if let Some(writer_spk) = state.writer_spk.as_mut() {
writer_spk.flush()?;
}
state.last_flush = Instant::now();
Ok(())
}

fn finalize_writer(
writer: &mut Option<hound::WavWriter<BufWriter<File>>>,
) -> Result<(), hound::Error> {
if let Some(mut writer) = writer.take() {
writer.flush()?;
writer.finalize()?;
}
Ok(())
}
15 changes: 4 additions & 11 deletions plugins/listener/src/actors/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,12 +375,12 @@ impl Pipeline {
fn dispatch(&mut self, mic: Arc<[f32]>, spk: Arc<[f32]>, mode: ChannelMode) {
if let Some(cell) = registry::where_is(RecorderActor::name()) {
let actor: ActorRef<RecMsg> = cell.into();
let audio_for_recording = if mode == ChannelMode::Single {
mic.to_vec()
let result = if mode == ChannelMode::Single {
actor.cast(RecMsg::AudioSingle(Arc::clone(&mic)))
} else {
Self::mix(mic.as_ref(), spk.as_ref())
actor.cast(RecMsg::AudioDual(Arc::clone(&mic), Arc::clone(&spk)))
};
if let Err(e) = actor.cast(RecMsg::Audio(audio_for_recording)) {
if let Err(e) = result {
tracing::error!(error = ?e, "failed_to_send_audio_to_recorder");
}
}
Expand Down Expand Up @@ -408,13 +408,6 @@ impl Pipeline {

self.amplitude.observe(mic, spk);
}

fn mix(mic: &[f32], spk: &[f32]) -> Vec<f32> {
mic.iter()
.zip(spk.iter())
.map(|(m, s)| (m + s).clamp(-1.0, 1.0))
.collect()
}
}

struct AmplitudeEmitter {
Expand Down
Loading