diff --git a/apps/desktop/src/hooks/useRunBatch.ts b/apps/desktop/src/hooks/useRunBatch.ts index 8d38c15649..c0dfbbedb3 100644 --- a/apps/desktop/src/hooks/useRunBatch.ts +++ b/apps/desktop/src/hooks/useRunBatch.ts @@ -61,8 +61,9 @@ export const useRunBatch = (sessionId: string) => { })(); if (!provider) { - console.error("unsupported_batch_provider", conn.provider); - return; + throw new Error( + `Batch transcription is not supported for provider: ${conn.provider}`, + ); } if (sessionTabRef.current) { diff --git a/apps/desktop/src/store/zustand/listener/general.ts b/apps/desktop/src/store/zustand/listener/general.ts index f5494be9cd..bde310ea32 100644 --- a/apps/desktop/src/store/zustand/listener/general.ts +++ b/apps/desktop/src/store/zustand/listener/general.ts @@ -333,7 +333,7 @@ export const createGeneralSlice = < get().setTranscriptPersist(options.handlePersist); } - get().clearBatchSession(sessionId); + get().handleBatchStarted(sessionId); let unlisten: (() => void) | undefined; diff --git a/plugins/listener2/src/batch.rs b/plugins/listener2/src/batch.rs index e675904e15..559bd2c094 100644 --- a/plugins/listener2/src/batch.rs +++ b/plugins/listener2/src/batch.rs @@ -9,16 +9,18 @@ use tauri_specta::Event; use tokio_stream::{self as tokio_stream, StreamExt as TokioStreamExt}; use crate::BatchEvent; -const BATCH_STREAM_TIMEOUT_SECS: u64 = 5; +const BATCH_STREAM_TIMEOUT_SECS: u64 = 30; const DEFAULT_CHUNK_MS: u64 = 500; const DEFAULT_DELAY_MS: u64 = 20; pub enum BatchMsg { - StreamResponse(Box), + StreamResponse { + response: Box, + percentage: f64, + }, StreamError(String), StreamEnded, StreamStartFailed(String), - StreamAudioDuration(f64), } pub type BatchStartNotifier = Arc>>>>; @@ -39,36 +41,14 @@ pub struct BatchState { pub session_id: String, rx_task: tokio::task::JoinHandle<()>, shutdown_tx: Option>, - audio_duration_secs: Option, } impl BatchState { - fn on_audio_duration(&mut self, duration: f64) -> Result<(), ActorProcessingErr> { - let clamped = if duration.is_finite() && duration >= 0.0 { - duration - } else { - 0.0 - }; - - self.audio_duration_secs = Some(clamped); - Ok(()) - } - fn emit_streamed_response( &self, response: StreamResponse, - transcript_end: f64, + percentage: f64, ) -> Result<(), ActorProcessingErr> { - let percentage = if let Some(audio_duration) = self.audio_duration_secs { - if audio_duration > 0.0 { - (transcript_end / audio_duration).clamp(0.0, 1.0) - } else { - 0.0 - } - } else { - 0.0 - }; - BatchEvent::BatchResponseStreamed { session_id: self.session_id.clone(), response, @@ -119,7 +99,6 @@ impl Actor for BatchActor { session_id: args.session_id, rx_task, shutdown_tx: Some(shutdown_tx), - audio_duration_secs: None, }; Ok(state) @@ -144,7 +123,10 @@ impl Actor for BatchActor { state: &mut Self::State, ) -> Result<(), ActorProcessingErr> { match message { - BatchMsg::StreamResponse(response) => { + BatchMsg::StreamResponse { + response, + percentage, + } => { tracing::info!("batch stream response received"); let is_final = matches!( @@ -153,18 +135,10 @@ impl Actor for BatchActor { ); if is_final { - let transcript_end = transcript_end_from_response(&response); - if let Some(end) = transcript_end { - state.emit_streamed_response(*response, end)?; - } + state.emit_streamed_response(*response, percentage)?; } } - BatchMsg::StreamAudioDuration(duration) => { - tracing::info!("batch stream audio duration seconds: {duration}"); - state.on_audio_duration(duration)?; - } - BatchMsg::StreamStartFailed(error) => { tracing::info!("batch_stream_start_failed: {}", error); state.emit_failure(error.clone())?; @@ -268,7 +242,6 @@ async fn spawn_batch_task( } else { frame_count as f64 / metadata.sample_rate as f64 }; - let _ = myself.send_message(BatchMsg::StreamAudioDuration(audio_duration_secs)); let channel_count = metadata.channels.clamp(1, 2); let listen_params = owhisper_interface::ListenParams { @@ -311,7 +284,7 @@ async fn spawn_batch_task( notify_start_result(&start_notifier, Ok(())); futures_util::pin_mut!(listen_stream); - process_batch_stream(listen_stream, myself, shutdown_rx).await; + process_batch_stream(listen_stream, myself, shutdown_rx, audio_duration_secs).await; }); Ok((rx_task, shutdown_tx)) @@ -321,6 +294,7 @@ async fn process_batch_stream( mut listen_stream: std::pin::Pin<&mut S>, myself: ActorRef, mut shutdown_rx: tokio::sync::oneshot::Receiver<()>, + audio_duration_secs: f64, ) where S: futures_util::Stream>, E: std::fmt::Debug, @@ -359,7 +333,13 @@ async fn process_batch_stream( if is_from_finalize { " (from_finalize)" } else { "" } ); - let _ = myself.send_message(BatchMsg::StreamResponse(Box::new(response))); + let percentage = compute_percentage(&response, audio_duration_secs); + if let Err(e) = myself.send_message(BatchMsg::StreamResponse { + response: Box::new(response), + percentage, + }) { + tracing::error!("failed to send stream response message: {:?}", e); + } if is_from_finalize { break; @@ -367,7 +347,9 @@ async fn process_batch_stream( } Ok(Some(Err(e))) => { tracing::error!("batch stream error: {:?}", e); - let _ = myself.send_message(BatchMsg::StreamError(format!("{:?}", e))); + if let Err(send_err) = myself.send_message(BatchMsg::StreamError(format!("{:?}", e))) { + tracing::error!("failed to send stream error message: {:?}", send_err); + } break; } Ok(None) => { @@ -376,7 +358,9 @@ async fn process_batch_stream( } Err(elapsed) => { tracing::warn!(timeout = ?elapsed, responses = response_count, "batch stream response timeout"); - let _ = myself.send_message(BatchMsg::StreamError("timeout waiting for batch stream response".into())); + if let Err(send_err) = myself.send_message(BatchMsg::StreamError("timeout waiting for batch stream response".into())) { + tracing::error!("failed to send timeout error message: {:?}", send_err); + } break; } } @@ -384,10 +368,20 @@ async fn process_batch_stream( } } - let _ = myself.send_message(BatchMsg::StreamEnded); + if let Err(e) = myself.send_message(BatchMsg::StreamEnded) { + tracing::error!("failed to send stream ended message: {:?}", e); + } tracing::info!("batch stream processing loop exited"); } +fn compute_percentage(response: &StreamResponse, audio_duration_secs: f64) -> f64 { + let transcript_end = transcript_end_from_response(response); + match transcript_end { + Some(end) if audio_duration_secs > 0.0 => (end / audio_duration_secs).clamp(0.0, 1.0), + _ => 0.0, + } +} + fn transcript_end_from_response(response: &StreamResponse) -> Option { let StreamResponse::TranscriptResponse { start, diff --git a/plugins/listener2/src/ext.rs b/plugins/listener2/src/ext.rs index c896afa54f..d597804118 100644 --- a/plugins/listener2/src/ext.rs +++ b/plugins/listener2/src/ext.rs @@ -122,7 +122,11 @@ impl> Listener2PluginExt for T { session_id: params.session_id.clone(), } .emit(&app) - .map_err(|_| crate::Error::BatchStartFailed("failed to emit event".to_string()))?; + .map_err(|e| { + crate::Error::BatchStartFailed(format!( + "failed to emit BatchStarted event: {e}" + )) + })?; let client = owhisper_client::ListenClient::builder() .api_base(params.base_url.clone()) @@ -140,7 +144,11 @@ impl> Listener2PluginExt for T { response, } .emit(&app) - .map_err(|_| crate::Error::BatchStartFailed("failed to emit event".to_string()))?; + .map_err(|e| { + crate::Error::BatchStartFailed(format!( + "failed to emit BatchResponse event: {e}" + )) + })?; Ok(()) } @@ -151,7 +159,11 @@ impl> Listener2PluginExt for T { session_id: params.session_id.clone(), } .emit(&app) - .map_err(|_| crate::Error::BatchStartFailed("failed to emit event".to_string()))?; + .map_err(|e| { + crate::Error::BatchStartFailed(format!( + "failed to emit BatchStarted event: {e}" + )) + })?; let client = owhisper_client::ListenClient::builder() .adapter::() @@ -170,7 +182,11 @@ impl> Listener2PluginExt for T { response, } .emit(&app) - .map_err(|_| crate::Error::BatchStartFailed("failed to emit event".to_string()))?; + .map_err(|e| { + crate::Error::BatchStartFailed(format!( + "failed to emit BatchResponse event: {e}" + )) + })?; Ok(()) }