Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions apps/desktop/src/hooks/useRunBatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ export const useRunBatch = (sessionId: string) => {
})();

if (!provider) {
console.error("unsupported_batch_provider", conn.provider);
return;
throw new Error(
`Batch transcription is not supported for provider: ${conn.provider}`,
);
}

if (sessionTabRef.current) {
Expand Down
2 changes: 1 addition & 1 deletion apps/desktop/src/store/zustand/listener/general.ts
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ export const createGeneralSlice = <
get().setTranscriptPersist(options.handlePersist);
}

get().clearBatchSession(sessionId);
get().handleBatchStarted(sessionId);

let unlisten: (() => void) | undefined;

Expand Down
80 changes: 37 additions & 43 deletions plugins/listener2/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,18 @@ use tauri_specta::Event;
use tokio_stream::{self as tokio_stream, StreamExt as TokioStreamExt};

use crate::BatchEvent;
const BATCH_STREAM_TIMEOUT_SECS: u64 = 5;
const BATCH_STREAM_TIMEOUT_SECS: u64 = 30;
const DEFAULT_CHUNK_MS: u64 = 500;
const DEFAULT_DELAY_MS: u64 = 20;

pub enum BatchMsg {
StreamResponse(Box<StreamResponse>),
StreamResponse {
response: Box<StreamResponse>,
percentage: f64,
},
StreamError(String),
StreamEnded,
StreamStartFailed(String),
StreamAudioDuration(f64),
}

pub type BatchStartNotifier = Arc<Mutex<Option<tokio::sync::oneshot::Sender<Result<(), String>>>>>;
Expand All @@ -39,36 +41,14 @@ pub struct BatchState {
pub session_id: String,
rx_task: tokio::task::JoinHandle<()>,
shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
audio_duration_secs: Option<f64>,
}

impl BatchState {
fn on_audio_duration(&mut self, duration: f64) -> Result<(), ActorProcessingErr> {
let clamped = if duration.is_finite() && duration >= 0.0 {
duration
} else {
0.0
};

self.audio_duration_secs = Some(clamped);
Ok(())
}

fn emit_streamed_response(
&self,
response: StreamResponse,
transcript_end: f64,
percentage: f64,
) -> Result<(), ActorProcessingErr> {
let percentage = if let Some(audio_duration) = self.audio_duration_secs {
if audio_duration > 0.0 {
(transcript_end / audio_duration).clamp(0.0, 1.0)
} else {
0.0
}
} else {
0.0
};

BatchEvent::BatchResponseStreamed {
session_id: self.session_id.clone(),
response,
Expand Down Expand Up @@ -119,7 +99,6 @@ impl Actor for BatchActor {
session_id: args.session_id,
rx_task,
shutdown_tx: Some(shutdown_tx),
audio_duration_secs: None,
};

Ok(state)
Expand All @@ -144,7 +123,10 @@ impl Actor for BatchActor {
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
BatchMsg::StreamResponse(response) => {
BatchMsg::StreamResponse {
response,
percentage,
} => {
tracing::info!("batch stream response received");

let is_final = matches!(
Expand All @@ -153,18 +135,10 @@ impl Actor for BatchActor {
);

if is_final {
let transcript_end = transcript_end_from_response(&response);
if let Some(end) = transcript_end {
state.emit_streamed_response(*response, end)?;
}
state.emit_streamed_response(*response, percentage)?;
}
}

BatchMsg::StreamAudioDuration(duration) => {
tracing::info!("batch stream audio duration seconds: {duration}");
state.on_audio_duration(duration)?;
}

BatchMsg::StreamStartFailed(error) => {
tracing::info!("batch_stream_start_failed: {}", error);
state.emit_failure(error.clone())?;
Expand Down Expand Up @@ -268,7 +242,6 @@ async fn spawn_batch_task(
} else {
frame_count as f64 / metadata.sample_rate as f64
};
let _ = myself.send_message(BatchMsg::StreamAudioDuration(audio_duration_secs));

let channel_count = metadata.channels.clamp(1, 2);
let listen_params = owhisper_interface::ListenParams {
Expand Down Expand Up @@ -311,7 +284,7 @@ async fn spawn_batch_task(
notify_start_result(&start_notifier, Ok(()));
futures_util::pin_mut!(listen_stream);

process_batch_stream(listen_stream, myself, shutdown_rx).await;
process_batch_stream(listen_stream, myself, shutdown_rx, audio_duration_secs).await;
});

Ok((rx_task, shutdown_tx))
Expand All @@ -321,6 +294,7 @@ async fn process_batch_stream<S, E>(
mut listen_stream: std::pin::Pin<&mut S>,
myself: ActorRef<BatchMsg>,
mut shutdown_rx: tokio::sync::oneshot::Receiver<()>,
audio_duration_secs: f64,
) where
S: futures_util::Stream<Item = Result<StreamResponse, E>>,
E: std::fmt::Debug,
Expand Down Expand Up @@ -359,15 +333,23 @@ async fn process_batch_stream<S, E>(
if is_from_finalize { " (from_finalize)" } else { "" }
);

let _ = myself.send_message(BatchMsg::StreamResponse(Box::new(response)));
let percentage = compute_percentage(&response, audio_duration_secs);
if let Err(e) = myself.send_message(BatchMsg::StreamResponse {
response: Box::new(response),
percentage,
}) {
tracing::error!("failed to send stream response message: {:?}", e);
}

if is_from_finalize {
break;
}
}
Ok(Some(Err(e))) => {
tracing::error!("batch stream error: {:?}", e);
let _ = myself.send_message(BatchMsg::StreamError(format!("{:?}", e)));
if let Err(send_err) = myself.send_message(BatchMsg::StreamError(format!("{:?}", e))) {
tracing::error!("failed to send stream error message: {:?}", send_err);
}
break;
}
Ok(None) => {
Expand All @@ -376,18 +358,30 @@ async fn process_batch_stream<S, E>(
}
Err(elapsed) => {
tracing::warn!(timeout = ?elapsed, responses = response_count, "batch stream response timeout");
let _ = myself.send_message(BatchMsg::StreamError("timeout waiting for batch stream response".into()));
if let Err(send_err) = myself.send_message(BatchMsg::StreamError("timeout waiting for batch stream response".into())) {
tracing::error!("failed to send timeout error message: {:?}", send_err);
}
break;
}
}
}
}
}

let _ = myself.send_message(BatchMsg::StreamEnded);
if let Err(e) = myself.send_message(BatchMsg::StreamEnded) {
tracing::error!("failed to send stream ended message: {:?}", e);
}
tracing::info!("batch stream processing loop exited");
}

fn compute_percentage(response: &StreamResponse, audio_duration_secs: f64) -> f64 {
let transcript_end = transcript_end_from_response(response);
match transcript_end {
Some(end) if audio_duration_secs > 0.0 => (end / audio_duration_secs).clamp(0.0, 1.0),
_ => 0.0,
}
}

fn transcript_end_from_response(response: &StreamResponse) -> Option<f64> {
let StreamResponse::TranscriptResponse {
start,
Expand Down
24 changes: 20 additions & 4 deletions plugins/listener2/src/ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,11 @@ impl<R: tauri::Runtime, T: tauri::Manager<R>> Listener2PluginExt<R> for T {
session_id: params.session_id.clone(),
}
.emit(&app)
.map_err(|_| crate::Error::BatchStartFailed("failed to emit event".to_string()))?;
.map_err(|e| {
crate::Error::BatchStartFailed(format!(
"failed to emit BatchStarted event: {e}"
))
})?;

let client = owhisper_client::ListenClient::builder()
.api_base(params.base_url.clone())
Expand All @@ -140,7 +144,11 @@ impl<R: tauri::Runtime, T: tauri::Manager<R>> Listener2PluginExt<R> for T {
response,
}
.emit(&app)
.map_err(|_| crate::Error::BatchStartFailed("failed to emit event".to_string()))?;
.map_err(|e| {
crate::Error::BatchStartFailed(format!(
"failed to emit BatchResponse event: {e}"
))
})?;

Ok(())
}
Expand All @@ -151,7 +159,11 @@ impl<R: tauri::Runtime, T: tauri::Manager<R>> Listener2PluginExt<R> for T {
session_id: params.session_id.clone(),
}
.emit(&app)
.map_err(|_| crate::Error::BatchStartFailed("failed to emit event".to_string()))?;
.map_err(|e| {
crate::Error::BatchStartFailed(format!(
"failed to emit BatchStarted event: {e}"
))
})?;

let client = owhisper_client::ListenClient::builder()
.adapter::<owhisper_client::SonioxAdapter>()
Expand All @@ -170,7 +182,11 @@ impl<R: tauri::Runtime, T: tauri::Manager<R>> Listener2PluginExt<R> for T {
response,
}
.emit(&app)
.map_err(|_| crate::Error::BatchStartFailed("failed to emit event".to_string()))?;
.map_err(|e| {
crate::Error::BatchStartFailed(format!(
"failed to emit BatchResponse event: {e}"
))
})?;

Ok(())
}
Expand Down