Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 15 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ hypr-slack = { path = "crates/slack", package = "slack" }
hypr-stt = { path = "crates/stt", package = "stt", features = ["realtime", "recorded"] }
hypr-template = { path = "crates/template", package = "template" }
hypr-turso = { path = "crates/turso", package = "turso" }
hypr-vad = { path = "crates/vad", package = "vad" }
hypr-whisper = { path = "crates/whisper", package = "whisper" }
hypr-whisper-cloud = { path = "crates/whisper-cloud", package = "whisper-cloud" }
hypr-whisper-local = { path = "crates/whisper-local", package = "whisper-local" }
Expand Down Expand Up @@ -193,6 +192,7 @@ hound = "3.5.1"
realfft = "3.5.0"
ringbuf = "0.4.8"
rodio = { version = "0.20.1", features = ["symphonia"] }
silero-rs = { git = "https://github.com/emotechlab/silero-rs", rev = "26a6460", package = "silero" }

kalosm-common = { git = "https://github.com/floneum/floneum", rev = "52967ae" }
kalosm-llama = { git = "https://github.com/floneum/floneum", rev = "52967ae" }
Expand Down
13 changes: 5 additions & 8 deletions crates/chunker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,12 @@ hound = { workspace = true }
hypr-data = { workspace = true }

[dependencies]
hypr-vad = { workspace = true }
serde = { workspace = true }
thiserror = { workspace = true }

kalosm-sound = { workspace = true, default-features = false }
rodio = { workspace = true }
silero-rs = { workspace = true }

futures-util = { workspace = true }
serde = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }

[features]
default = []
load-dynamic = ["hypr-vad/load-dynamic"]
tracing = { workspace = true }
17 changes: 4 additions & 13 deletions crates/chunker/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,7 @@
use serde::{ser::Serializer, Serialize};

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error(transparent)]
VadError(#[from] hypr_vad::Error),
}

impl Serialize for Error {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(self.to_string().as_ref())
}
#[error("Failed to create VAD session")]
VadSessionCreationFailed,
#[error("Failed to process audio")]
VadProcessingFailed(String),
}
166 changes: 124 additions & 42 deletions crates/chunker/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,61 +1,143 @@
use std::{
pin::Pin,
task::{Context, Poll},
time::Duration,
};

use futures_util::Stream;
use kalosm_sound::AsyncSource;

use silero_rs::{VadConfig, VadSession, VadTransition};

mod error;
mod predictor;
mod stream;
use error::*;

pub use error::*;
pub use predictor::*;
pub use stream::*;
pub struct ChunkStream<S: AsyncSource> {
source: S,
chunk_samples: usize,
buffer: Vec<f32>,
}

use kalosm_sound::AsyncSource;
use std::time::Duration;

pub trait ChunkerExt: AsyncSource + Sized {
fn chunks<P: Predictor + Unpin>(
self,
predictor: P,
chunk_duration: Duration,
) -> ChunkStream<Self, P>
impl<S: AsyncSource> ChunkStream<S> {
fn new(source: S, chunk_duration: Duration) -> Self {
let sample_rate = source.sample_rate();
let chunk_samples = (chunk_duration.as_secs_f64() * sample_rate as f64) as usize;

Self {
source,
chunk_samples,
buffer: Vec::with_capacity(chunk_samples),
}
}
}

impl<S: AsyncSource + Unpin> Stream for ChunkStream<S> {
type Item = Vec<f32>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
let stream = this.source.as_stream();
let mut stream = std::pin::pin!(stream);

while this.buffer.len() < this.chunk_samples {
match stream.as_mut().poll_next(cx) {
Poll::Pending => {
return Poll::Pending;
}
Poll::Ready(Some(sample)) => {
this.buffer.push(sample);
}
Poll::Ready(None) => {
if this.buffer.is_empty() {
return Poll::Ready(None);
} else {
let chunk = std::mem::take(&mut this.buffer);
return Poll::Ready(Some(chunk));
}
}
}
}

let mut chunk = Vec::with_capacity(this.chunk_samples);
chunk.extend(this.buffer.drain(..this.chunk_samples));
Poll::Ready(Some(chunk))
}
}

pub trait VadExt: AsyncSource + Sized {
fn vad_chunks(self) -> VadChunkStream<Self>
where
Self: Unpin,
{
ChunkStream::new(self, predictor, chunk_duration)
let config = VadConfig {
post_speech_pad: Duration::from_millis(50),
..Default::default()
};

VadChunkStream::new(self, config).unwrap()
}
}

impl<T: AsyncSource> ChunkerExt for T {}
impl<T: AsyncSource> VadExt for T {}

#[cfg(test)]
mod tests {
use super::*;
use futures_util::StreamExt;
pub struct VadChunkStream<S: AsyncSource> {
chunk_stream: ChunkStream<S>,
vad_session: VadSession,
pending_chunks: Vec<AudioChunk>,
}

#[tokio::test]
async fn test_chunker() {
let audio_source = rodio::Decoder::new(std::io::BufReader::new(
std::fs::File::open(hypr_data::english_1::AUDIO_PATH).unwrap(),
))
.unwrap();
impl<S: AsyncSource> VadChunkStream<S> {
fn new(source: S, mut config: VadConfig) -> Result<Self, Error> {
config.sample_rate = source.sample_rate() as usize;

let spec = hound::WavSpec {
channels: 1,
sample_rate: 16000,
bits_per_sample: 32,
sample_format: hound::SampleFormat::Float,
};
// https://github.com/emotechlab/silero-rs/blob/26a6460/src/lib.rs#L775
let chunk_duration = Duration::from_millis(30);

let mut stream = audio_source.chunks(RMS::new(), Duration::from_secs(15));
let mut i = 0;
Ok(Self {
chunk_stream: ChunkStream::new(source, chunk_duration),
vad_session: VadSession::new(config).map_err(|_| Error::VadSessionCreationFailed)?,
pending_chunks: Vec::new(),
})
}
}

#[derive(Debug, Clone)]
pub struct AudioChunk {
pub samples: Vec<f32>,
}

impl<S: AsyncSource + Unpin> Stream for VadChunkStream<S> {
type Item = Result<AudioChunk, Error>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();

if let Some(chunk) = this.pending_chunks.pop() {
return Poll::Ready(Some(Ok(chunk)));
}

let _ = std::fs::remove_dir_all("tmp/english_1");
let _ = std::fs::create_dir_all("tmp/english_1");
loop {
match Pin::new(&mut this.chunk_stream).poll_next(cx) {
Poll::Ready(Some(samples)) => match this.vad_session.process(&samples) {
Ok(transitions) => {
for transition in transitions {
if let VadTransition::SpeechEnd { samples, .. } = transition {
this.pending_chunks.push(AudioChunk { samples });
}
}

while let Some(chunk) = stream.next().await {
let file = std::fs::File::create(format!("tmp/english_1/chunk_{}.wav", i)).unwrap();
let mut writer = hound::WavWriter::new(file, spec).unwrap();
for sample in chunk {
writer.write_sample(sample).unwrap();
if let Some(chunk) = this.pending_chunks.pop() {
return Poll::Ready(Some(Ok(chunk)));
}
}
Err(e) => {
let error = Error::VadProcessingFailed(e.to_string());
return Poll::Ready(Some(Err(error)));
}
},
Poll::Ready(None) => return Poll::Ready(None),
Poll::Pending => return Poll::Pending,
}
i += 1;
}
}
}
45 changes: 0 additions & 45 deletions crates/chunker/src/predictor.rs

This file was deleted.

Loading
Loading