From 518aa8315d40256d13f8527ee6e9c8a930a74f19 Mon Sep 17 00:00:00 2001
From: Yujong Lee
Date: Tue, 4 Nov 2025 17:34:09 +0900
Subject: [PATCH 1/2] remap channel_index from [0, 1] to [0, 2]

---
 owhisper/owhisper-client/src/lib.rs       |   2 +
 owhisper/owhisper-interface/src/stream.rs |  35 +++
 plugins/listener/src/actors/listener.rs   | 284 +++++++++++++++-------
 plugins/listener/src/actors/mod.rs        |   6 +
 plugins/listener/src/actors/processor.rs  | 102 ++++++--
 plugins/listener/src/actors/session.rs    |  27 ++
 plugins/listener/src/actors/source.rs     |  60 +++--
 7 files changed, 387 insertions(+), 129 deletions(-)

diff --git a/owhisper/owhisper-client/src/lib.rs b/owhisper/owhisper-client/src/lib.rs
index a7287b92ef..4ea5ce86aa 100644
--- a/owhisper/owhisper-client/src/lib.rs
+++ b/owhisper/owhisper-client/src/lib.rs
@@ -3,6 +3,8 @@ use futures_util::Stream;
 use hypr_ws::client::{ClientRequestBuilder, Message, WebSocketClient, WebSocketIO};
 use owhisper_interface::{ControlMessage, MixedMessage, StreamResponse};
 
+pub use hypr_ws;
+
 fn interleave_audio(mic: &[u8], speaker: &[u8]) -> Vec<u8> {
     let mic_samples: Vec<i16> = mic
         .chunks_exact(2)
diff --git a/owhisper/owhisper-interface/src/stream.rs b/owhisper/owhisper-interface/src/stream.rs
index d7038ef1e0..0aad0bcade 100644
--- a/owhisper/owhisper-interface/src/stream.rs
+++ b/owhisper/owhisper-interface/src/stream.rs
@@ -137,6 +137,41 @@ impl StreamResponse {
             _ => None,
         }
     }
+
+    pub fn apply_offset(&mut self, offset_secs: f64) {
+        match self {
+            StreamResponse::TranscriptResponse { start, channel, .. } => {
+                *start += offset_secs;
+                for alt in &mut channel.alternatives {
+                    for word in &mut alt.words {
+                        word.start += offset_secs;
+                        word.end += offset_secs;
+                    }
+                }
+            }
+            StreamResponse::SpeechStartedResponse { timestamp, .. } => {
+                *timestamp += offset_secs;
+            }
+            StreamResponse::UtteranceEndResponse { last_word_end, .. } => {
+                *last_word_end += offset_secs;
+            }
+            _ => {}
+        }
+    }
+
+    pub fn set_extra(&mut self, extra: &Extra) {
+        if let StreamResponse::TranscriptResponse { metadata, .. } = self {
+            metadata.extra = Some(extra.clone().into());
+        }
+    }
+
+    pub fn remap_channel_index(&mut self, from: i32, to: i32) {
+        if let StreamResponse::TranscriptResponse { channel_index, ..
} = self {
+            if !channel_index.is_empty() && channel_index[0] == from {
+                channel_index[0] = to;
+            }
+        }
+    }
 }
 
 #[cfg(test)]
diff --git a/plugins/listener/src/actors/listener.rs b/plugins/listener/src/actors/listener.rs
index 0ad09ecc09..fe9c819768 100644
--- a/plugins/listener/src/actors/listener.rs
+++ b/plugins/listener/src/actors/listener.rs
@@ -1,9 +1,10 @@
 use bytes::Bytes;
-use std::time::Duration;
+use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
 
 use futures_util::StreamExt;
 use tokio::time::error::Elapsed;
 
+use owhisper_client::hypr_ws;
 use owhisper_interface::{ControlMessage, Extra, MixedMessage, StreamResponse};
 use ractor::{Actor, ActorName, ActorProcessingErr, ActorRef, SupervisionEvent};
 use tauri_specta::Event;
@@ -20,6 +21,7 @@ pub enum ListenerMsg {
     StreamEnded,
     StreamTimeout(Elapsed),
     StreamStartFailed(String),
+    ChangeMode(crate::actors::ChannelMode),
 }
 
 #[derive(Clone)]
@@ -31,6 +33,9 @@ pub struct ListenerArgs {
     pub base_url: String,
     pub api_key: String,
     pub keywords: Vec<String>,
+    pub mode: crate::actors::ChannelMode,
+    pub session_started_at: Instant,
+    pub session_started_at_unix: SystemTime,
 }
 
 pub struct ListenerState {
@@ -94,7 +99,11 @@ impl Actor for ListenerActor {
                 let _ = state.tx.try_send(MixedMessage::Audio((mic, spk)));
             }
 
-            ListenerMsg::StreamResponse(response) => {
+            ListenerMsg::StreamResponse(mut response) => {
+                if state.args.mode == crate::actors::ChannelMode::Single {
+                    response.remap_channel_index(0, 2);
+                }
+
                 SessionEvent::StreamResponse { response }.emit(&state.args.app)?;
             }
 
@@ -117,6 +126,23 @@ impl Actor for ListenerActor {
                 tracing::info!("listen_stream_timeout: {}", elapsed);
                 myself.stop(None);
             }
+
+            ListenerMsg::ChangeMode(new_mode) => {
+                tracing::info!(?new_mode, "listener_mode_change");
+
+                if let Some(shutdown_tx) = state.shutdown_tx.take() {
+                    let _ = shutdown_tx.send(());
+                    let _ = (&mut state.rx_task).await;
+                }
+
+                state.args.mode = new_mode;
+
+                let (tx, rx_task, shutdown_tx) =
+                    spawn_rx_task(state.args.clone(), myself.clone()).await?;
+                state.tx = tx;
+                state.rx_task = rx_task;
+                state.shutdown_tx = Some(shutdown_tx);
+            }
         }
         Ok(())
     }
@@ -152,113 +178,187 @@ async fn spawn_rx_task(
     ActorProcessingErr,
 > {
     let (tx, rx) = tokio::sync::mpsc::channel::<MixedMessage<(Bytes, Bytes), ControlMessage>>(32);
-    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel::<()>();
-
-    let client = owhisper_client::ListenClient::builder()
-        .api_base(args.base_url)
-        .api_key(args.api_key)
-        .params(owhisper_interface::ListenParams {
-            model: Some(args.model),
-            languages: args.languages,
-            redemption_time_ms: Some(if args.onboarding { 60 } else { 400 }),
-            keywords: args.keywords,
-            ..Default::default()
-        })
-        .build_dual();
+    let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
+
+    let session_offset_secs = args.session_started_at.elapsed().as_secs_f64();
+    let started_unix_secs = args
+        .session_started_at_unix
+        .duration_since(UNIX_EPOCH)
+        .unwrap_or(Duration::from_secs(0))
+        .as_secs();
+
+    let extra = Extra { started_unix_secs };
 
     let rx_task = tokio::spawn(async move {
-        let outbound = tokio_stream::wrappers::ReceiverStream::new(rx);
-        let (listen_stream, handle) = match client.from_realtime_audio(outbound).await {
-            Ok(res) => res,
-            Err(e) => {
-                let _ = myself.send_message(ListenerMsg::StreamStartFailed(format!("{:?}", e)));
-                return;
-            }
-        };
-        futures_util::pin_mut!(listen_stream);
+        use crate::actors::ChannelMode;
+
+        if args.mode == ChannelMode::Single {
+            let client = owhisper_client::ListenClient::builder()
+                .api_base(args.base_url.clone())
+                .api_key(args.api_key.clone())
+                .params(owhisper_interface::ListenParams {
+                    model: Some(args.model.clone()),
+                    languages: args.languages.clone(),
+                    redemption_time_ms: Some(if args.onboarding { 60 } else { 400 }),
+                    keywords: args.keywords.clone(),
+                    ..Default::default()
+                })
+                .build_single();
+
+            let outbound = tokio_stream::StreamExt::map(
+                tokio_stream::wrappers::ReceiverStream::new(rx),
+                |msg| match msg {
+                    MixedMessage::Audio((_mic, spk)) => MixedMessage::Audio(spk),
+                    MixedMessage::Control(c) => MixedMessage::Control(c),
+                },
+            );
+
+            let (listen_stream, handle) = match client.from_realtime_audio(outbound).await {
+                Ok(res) => res,
+                Err(e) => {
+                    let _ = myself.send_message(ListenerMsg::StreamStartFailed(format!("{:?}", e)));
+                    return;
+                }
+            };
+            futures_util::pin_mut!(listen_stream);
+
+            process_stream(
+                listen_stream,
+                handle,
+                myself,
+                shutdown_rx,
+                session_offset_secs,
+                extra.clone(),
+            )
+            .await;
+        } else {
+            let client = owhisper_client::ListenClient::builder()
+                .api_base(args.base_url)
+                .api_key(args.api_key)
+                .params(owhisper_interface::ListenParams {
+                    model: Some(args.model),
+                    languages: args.languages,
+                    redemption_time_ms: Some(if args.onboarding { 60 } else { 400 }),
+                    keywords: args.keywords,
+                    ..Default::default()
+                })
+                .build_dual();
+
+            let outbound = tokio_stream::wrappers::ReceiverStream::new(rx);
+
+            let (listen_stream, handle) = match client.from_realtime_audio(outbound).await {
+                Ok(res) => res,
+                Err(e) => {
+                    let _ = myself.send_message(ListenerMsg::StreamStartFailed(format!("{:?}", e)));
+                    return;
+                }
+            };
+            futures_util::pin_mut!(listen_stream);
+
+            process_stream(
+                listen_stream,
+                handle,
+                myself,
+                shutdown_rx,
+                session_offset_secs,
+                extra.clone(),
+            )
+            .await;
+        }
+    });
 
-        let extra = Extra::default();
+    Ok((tx, rx_task, shutdown_tx))
+}
 
-        loop {
-            tokio::select! {
-                _ = &mut shutdown_rx => {
-                    handle.finalize_with_text(serde_json::json!({"type": "Finalize"}).to_string().into()).await;
+async fn process_stream<S, E>(
+    mut listen_stream: std::pin::Pin<&mut S>,
+    handle: hypr_ws::client::WebSocketHandle,
+    myself: ActorRef<ListenerMsg>,
+    mut shutdown_rx: tokio::sync::oneshot::Receiver<()>,
+    offset_secs: f64,
+    extra: Extra,
+) where
+    S: futures_util::Stream<Item = Result<StreamResponse, E>>,
+    E: std::fmt::Debug,
+{
+    loop {
+        tokio::select! {
+            _ = &mut shutdown_rx => {
+                handle.finalize_with_text(serde_json::json!({"type": "Finalize"}).to_string().into()).await;
+
+                let finalize_timeout = tokio::time::sleep(Duration::from_secs(5));
+                tokio::pin!(finalize_timeout);
+
+                let mut received_from_finalize = false;
+
+                loop {
+                    tokio::select! {
+                        _ = &mut finalize_timeout => {
+                            tracing::warn!(timeout = true, "break_timeout");
+                            break;
+                        }
+                        result = listen_stream.next() => {
+                            match result {
+                                Some(Ok(mut response)) => {
+                                    let is_from_finalize = if let StreamResponse::TranscriptResponse { from_finalize, ..
} = &response { - *from_finalize - } else { - false - }; - - - if is_from_finalize { - received_from_finalize = true; - } - - let _ = myself.send_message(ListenerMsg::StreamResponse(response)); - - if received_from_finalize { - tracing::info!(from_finalize = true, "break_from_finalize"); - break; - } - } - Some(Err(e)) => { - tracing::warn!(error = ?e, "break_from_finalize"); - break; - } - None => { - tracing::info!(ended = true, "break_from_finalize"); + if received_from_finalize { + tracing::info!(from_finalize = true, "break_from_finalize"); break; } } + Some(Err(e)) => { + tracing::warn!(error = ?e, "break_from_finalize"); + break; + } + None => { + tracing::info!(ended = true, "break_from_finalize"); + break; + } } } } - break; } - result = tokio::time::timeout(LISTEN_STREAM_TIMEOUT, listen_stream.next()) => { - match result { - Ok(Some(Ok(mut response))) => { - if let StreamResponse::TranscriptResponse { ref mut metadata, .. } = response { - metadata.extra = Some(extra.clone().into()); - } + break; + } + result = tokio::time::timeout(LISTEN_STREAM_TIMEOUT, listen_stream.next()) => { + match result { + Ok(Some(Ok(mut response))) => { + response.apply_offset(offset_secs); + response.set_extra(&extra); - let _ = myself.send_message(ListenerMsg::StreamResponse(response)); - } - // Something went wrong while sending or receiving a websocket message. Should restart. - Ok(Some(Err(e))) => { - let _ = myself.send_message(ListenerMsg::StreamError(format!("{:?}", e))); - break; - } - // Stream ended gracefully. Safe to stop the whole session. - Ok(None) => { - let _ = myself.send_message(ListenerMsg::StreamEnded); - break; - } - // We're not hearing back any transcript. Better to stop the whole session. - Err(elapsed) => { - let _ = myself.send_message(ListenerMsg::StreamTimeout(elapsed)); - break; - } + let _ = myself.send_message(ListenerMsg::StreamResponse(response)); + } + // Something went wrong while sending or receiving a websocket message. Should restart. + Ok(Some(Err(e))) => { + let _ = myself.send_message(ListenerMsg::StreamError(format!("{:?}", e))); + break; + } + // Stream ended gracefully. Safe to stop the whole session. + Ok(None) => { + let _ = myself.send_message(ListenerMsg::StreamEnded); + break; + } + // We're not hearing back any transcript. Better to stop the whole session. 
+                    Err(elapsed) => {
+                        let _ = myself.send_message(ListenerMsg::StreamTimeout(elapsed));
+                        break;
+                    }
                 }
             }
         }
-    });
-
-    Ok((tx, rx_task, shutdown_tx))
+    }
 }
diff --git a/plugins/listener/src/actors/mod.rs b/plugins/listener/src/actors/mod.rs
index 9d731a2321..a5d6253f61 100644
--- a/plugins/listener/src/actors/mod.rs
+++ b/plugins/listener/src/actors/mod.rs
@@ -10,6 +10,12 @@ pub use recorder::*;
 pub use session::*;
 pub use source::*;
 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum ChannelMode {
+    Single,
+    Dual,
+}
+
 #[derive(Clone)]
 pub struct AudioChunk {
     data: Vec<f32>,
diff --git a/plugins/listener/src/actors/processor.rs b/plugins/listener/src/actors/processor.rs
index 731d5e6cc4..fec275c9ef 100644
--- a/plugins/listener/src/actors/processor.rs
+++ b/plugins/listener/src/actors/processor.rs
@@ -8,7 +8,7 @@ use ractor::{registry, Actor, ActorName, ActorProcessingErr, ActorRef};
 use tauri_specta::Event;
 
 use crate::{
-    actors::{AudioChunk, ListenerActor, ListenerMsg, RecMsg, RecorderActor},
+    actors::{AudioChunk, ChannelMode, ListenerActor, ListenerMsg, RecMsg, RecorderActor},
     SessionEvent,
 };
 
@@ -17,7 +17,8 @@ const AUDIO_AMPLITUDE_THROTTLE: Duration = Duration::from_millis(100);
 pub enum ProcMsg {
     Mic(AudioChunk),
     Speaker(AudioChunk),
-    Mixed(AudioChunk),
+    SetMode(ChannelMode),
+    Reset,
 }
 
 pub struct ProcArgs {
@@ -32,6 +33,18 @@ pub struct ProcState {
     last_sent_mic: Option<Arc<[f32]>>,
     last_sent_spk: Option<Arc<[f32]>>,
     last_amp_emit: Instant,
+    mode: ChannelMode,
+}
+
+impl ProcState {
+    fn reset_pipeline(&mut self) {
+        self.joiner.reset();
+        self.last_sent_mic = None;
+        self.last_sent_spk = None;
+        self.agc_m = hypr_agc::Agc::default();
+        self.agc_s = hypr_agc::Agc::default();
+        self.last_amp_emit = Instant::now();
+    }
 }
 
 pub struct ProcessorActor {}
@@ -60,6 +73,7 @@ impl Actor for ProcessorActor {
             last_sent_mic: None,
             last_sent_spk: None,
             last_amp_emit: Instant::now(),
+            mode: ChannelMode::Dual,
         })
     }
 
@@ -82,15 +96,14 @@ impl Actor for ProcessorActor {
                 st.joiner.push_spk(arc);
                 process_ready(st).await;
             }
-            ProcMsg::Mixed(mut c) => {
-                st.agc_m.process(&mut c.data);
-
-                let empty_arc = Arc::<[f32]>::from(vec![0.0; c.data.len()]);
-                let arc = Arc::<[f32]>::from(c.data);
-
-                st.joiner.push_mic(empty_arc);
-                st.joiner.push_spk(arc);
-                process_ready(st).await;
+            ProcMsg::SetMode(mode) => {
+                if st.mode != mode {
+                    st.mode = mode;
+                    st.reset_pipeline();
+                }
+            }
+            ProcMsg::Reset => {
+                st.reset_pipeline();
             }
         }
         Ok(())
@@ -98,7 +111,7 @@ impl Actor for ProcessorActor {
 }
 
 async fn process_ready(st: &mut ProcState) {
-    while let Some((mic, spk)) = st.joiner.pop_pair() {
+    while let Some((mic, spk)) = st.joiner.pop_pair(st.mode) {
         let mut audio_sent_successfully = false;
 
         if let Some(cell) = registry::where_is(RecorderActor::name()) {
@@ -115,8 +128,23 @@ async fn process_ready(st: &mut ProcState) {
         }
 
         if let Some(cell) = registry::where_is(ListenerActor::name()) {
-            let mic_bytes = hypr_audio_utils::f32_to_i16_bytes(mic.iter().copied());
-            let spk_bytes = hypr_audio_utils::f32_to_i16_bytes(spk.iter().copied());
+            let (mic_bytes, spk_bytes) = if st.mode == ChannelMode::Single {
+                let mixed: Vec<f32> = mic
+                    .iter()
+                    .zip(spk.iter())
+                    .map(|(m, s)| (m + s).clamp(-1.0, 1.0))
+                    .collect();
+                let mixed_bytes = hypr_audio_utils::f32_to_i16_bytes(mixed.iter().copied());
+                (
+                    hypr_audio_utils::f32_to_i16_bytes(mic.iter().copied()),
+                    mixed_bytes,
+                )
+            } else {
+                (
+                    hypr_audio_utils::f32_to_i16_bytes(mic.iter().copied()),
+                    hypr_audio_utils::f32_to_i16_bytes(spk.iter().copied()),
+                )
+            };
 
             let actor: ActorRef<ListenerMsg> = cell.into();
             if actor
@@ -149,19 +177,36 @@ async fn process_ready(st: &mut ProcState) {
 struct Joiner {
     mic: VecDeque<Arc<[f32]>>,
     spk: VecDeque<Arc<[f32]>>,
+    silence_cache: std::collections::HashMap<usize, Arc<[f32]>>,
 }
 
 impl Joiner {
+    const MAX_LAG: usize = 4;
+    const MAX_QUEUE_SIZE: usize = 30;
+
     fn new() -> Self {
         Self {
             mic: VecDeque::new(),
             spk: VecDeque::new(),
+            silence_cache: std::collections::HashMap::new(),
         }
     }
 
+    fn reset(&mut self) {
+        self.mic.clear();
+        self.spk.clear();
+    }
+
+    fn get_silence(&mut self, len: usize) -> Arc<[f32]> {
+        self.silence_cache
+            .entry(len)
+            .or_insert_with(|| Arc::from(vec![0.0; len]))
+            .clone()
+    }
+
     fn push_mic(&mut self, data: Arc<[f32]>) {
         self.mic.push_back(data);
-        if self.mic.len() > 30 {
+        if self.mic.len() > Self::MAX_QUEUE_SIZE {
             tracing::warn!("mic_queue_overflow");
             self.mic.pop_front();
         }
@@ -169,19 +214,30 @@ impl Joiner {
 
     fn push_spk(&mut self, data: Arc<[f32]>) {
         self.spk.push_back(data);
-        if self.spk.len() > 30 {
+        if self.spk.len() > Self::MAX_QUEUE_SIZE {
             tracing::warn!("spk_queue_overflow");
             self.spk.pop_front();
         }
     }
 
-    fn pop_pair(&mut self) -> Option<(Arc<[f32]>, Arc<[f32]>)> {
-        if !self.mic.is_empty() && !self.spk.is_empty() {
-            let mic = self.mic.pop_front()?;
-            let spk = self.spk.pop_front()?;
-            Some((mic, spk))
-        } else {
-            None
+    fn pop_pair(&mut self, mode: ChannelMode) -> Option<(Arc<[f32]>, Arc<[f32]>)> {
+        match (self.mic.front(), self.spk.front()) {
+            (Some(_), Some(_)) => {
+                let mic = self.mic.pop_front()?;
+                let spk = self.spk.pop_front()?;
+                Some((mic, spk))
+            }
+            (Some(_), None) if mode == ChannelMode::Single || self.mic.len() > Self::MAX_LAG => {
+                let mic = self.mic.pop_front()?;
+                let spk = self.get_silence(mic.len());
+                Some((mic, spk))
+            }
+            (None, Some(_)) if self.spk.len() > Self::MAX_LAG => {
+                let spk = self.spk.pop_front()?;
+                let mic = self.get_silence(spk.len());
+                Some((mic, spk))
+            }
+            _ => None,
         }
     }
 }
diff --git a/plugins/listener/src/actors/session.rs b/plugins/listener/src/actors/session.rs
index 96fbbd7abf..5d9d3c5f20 100644
--- a/plugins/listener/src/actors/session.rs
+++ b/plugins/listener/src/actors/session.rs
@@ -1,3 +1,5 @@
+use std::time::{Instant, SystemTime};
+
 use tauri::Manager;
 use tauri_specta::Event;
 
@@ -44,6 +46,8 @@ pub struct SessionState {
     app: tauri::AppHandle,
     token: CancellationToken,
     params: SessionParams,
+    started_at_instant: Instant,
+    started_at_system: SystemTime,
 }
 
 pub struct SessionActor;
@@ -65,6 +69,8 @@ impl Actor for SessionActor {
         args: Self::Arguments,
     ) -> Result<Self::State, ActorProcessingErr> {
         let cancellation_token = CancellationToken::new();
+        let started_at_instant = Instant::now();
+        let started_at_system = SystemTime::now();
 
         {
             use tauri_plugin_tray::TrayPluginExt;
@@ -75,6 +81,8 @@ impl Actor for SessionActor {
             app: args.app,
             token: cancellation_token,
             params: args.params,
+            started_at_instant,
+            started_at_system,
         };
 
         {
@@ -306,6 +314,22 @@ impl SessionActor {
         session_state: &SessionState,
         listener_args: Option<ListenerArgs>,
     ) -> Result<ActorRef<ListenerMsg>, ActorProcessingErr> {
+        use crate::actors::ChannelMode;
+
+        let mode = if listener_args.is_none() {
+            if let Some(cell) = registry::where_is(SourceActor::name()) {
+                let actor: ActorRef<SourceMsg> = cell.into();
+                match call_t!(actor, SourceMsg::GetMode, 500) {
+                    Ok(m) => m,
+                    Err(_) => ChannelMode::Dual,
+                }
+            } else {
+                ChannelMode::Dual
+            }
+        } else {
+            ChannelMode::Dual
+        };
+
         let (listen_ref, _) = Actor::spawn_linked(
             Some(ListenerActor::name()),
             ListenerActor,
@@ -317,6 +341,9 @@ impl SessionActor {
                 base_url: session_state.params.base_url.clone(),
                 api_key: session_state.params.api_key.clone(),
                 keywords: session_state.params.keywords.clone(),
+                mode,
+                session_started_at: session_state.started_at_instant,
+                session_started_at_unix: session_state.started_at_system,
             }),
             supervisor,
         )
diff --git a/plugins/listener/src/actors/source.rs b/plugins/listener/src/actors/source.rs
index 36bbdba997..1f8378d5c6 100644
--- a/plugins/listener/src/actors/source.rs
+++ b/plugins/listener/src/actors/source.rs
@@ -5,7 +5,7 @@ use futures_util::StreamExt;
 use ractor::{registry, Actor, ActorName, ActorProcessingErr, ActorRef, RpcReplyPort};
 use tokio_util::sync::CancellationToken;
 
-use crate::actors::{AudioChunk, ProcMsg, ProcessorActor};
+use crate::actors::{AudioChunk, ChannelMode, ListenerActor, ListenerMsg, ProcMsg, ProcessorActor};
 use hypr_audio::{
     is_using_headphone, AudioInput, DeviceEvent, DeviceMonitor, DeviceMonitorHandle,
     ResampledAsyncSource,
@@ -20,6 +20,7 @@ pub enum SourceMsg {
     GetMicMute(RpcReplyPort<bool>),
     SetMicDevice(Option<String>),
     GetMicDevice(RpcReplyPort<Option<String>>),
+    GetMode(RpcReplyPort<ChannelMode>),
 }
 
 pub struct SourceArgs {
@@ -38,6 +39,7 @@ pub struct SourceState {
    _device_monitor_handle: Option<DeviceMonitorHandle>,
    _silence_stream_tx: Option>,
    _device_event_thread: Option>,
+    current_mode: ChannelMode,
 }
 
 pub struct SourceActor;
@@ -117,6 +119,7 @@ impl Actor for SourceActor {
             _device_monitor_handle: Some(device_monitor_handle),
             _silence_stream_tx: silence_stream_tx,
             _device_event_thread: Some(device_event_thread),
+            current_mode: ChannelMode::Dual,
         };
 
         start_source_loop(&myself, &mut st).await?;
@@ -155,6 +158,11 @@ impl Actor for SourceActor {
                 }
                 start_source_loop(&myself, st).await?;
             }
+            SourceMsg::GetMode(reply) => {
+                if !reply.is_closed() {
+                    let _ = reply.send(st.current_mode);
+                }
+            }
         }
 
         Ok(())
@@ -189,12 +197,38 @@ async fn start_source_loop(
     st.stream_cancel_token = Some(stream_cancel_token.clone());
 
     #[cfg(target_os = "macos")]
-    let use_mixed = !st.onboarding && !is_using_headphone();
+    let new_mode = if !st.onboarding && !is_using_headphone() {
+        ChannelMode::Single
+    } else {
+        ChannelMode::Dual
+    };
 
     #[cfg(not(target_os = "macos"))]
-    let use_mixed = false;
+    let new_mode = ChannelMode::Dual;
+
+    let mode_changed = st.current_mode != new_mode;
+    st.current_mode = new_mode;
+
+    tracing::info!(?new_mode, mode_changed, "start_source_loop");
+
+    if let Some(cell) = registry::where_is(ProcessorActor::name()) {
+        let actor: ActorRef<ProcMsg> = cell.into();
+        let _ = actor.cast(ProcMsg::Reset);
+    }
+
+    if mode_changed {
+        if let Some(cell) = registry::where_is(ProcessorActor::name()) {
+            let actor: ActorRef<ProcMsg> = cell.into();
+            let _ = actor.cast(ProcMsg::SetMode(new_mode));
+        }
+
+        if let Some(cell) = registry::where_is(ListenerActor::name()) {
+            let actor: ActorRef<ListenerMsg> = cell.into();
+            let _ = actor.cast(ListenerMsg::ChangeMode(new_mode));
+        }
+    }
 
-    tracing::info!(use_mixed = use_mixed);
+    let use_mixed = new_mode == ChannelMode::Single;
 
     let handle = if use_mixed {
         #[cfg(target_os = "macos")]
@@ -236,23 +270,21 @@ async fn start_source_loop(
             }
             mic_next = mic_stream.next() => {
                 if let Some(data) = mic_next {
-                    if mic_muted.load(Ordering::Relaxed) {
-                        let msg = ProcMsg::Mic(AudioChunk{ data: vec![0.0; data.len()] });
-                        let _ = proc.cast(msg);
+                    let output_data = if mic_muted.load(Ordering::Relaxed) {
+                        vec![0.0; data.len()]
                     } else {
-                        let msg = ProcMsg::Mixed(AudioChunk{ data });
-                        let _ = proc.cast(msg);
-                    }
+                        data
+                    };
+                    let msg = ProcMsg::Mic(AudioChunk { data: output_data });
+                    let _ = proc.cast(msg);
                 } else {
                     break;
                 }
             }
             spk_next = spk_stream.next() => {
                 if let Some(data) = spk_next {
-                    if mic_muted.load(Ordering::Relaxed) {
-                        let msg =
ProcMsg::Speaker(AudioChunk{ data }); - let _ = proc.cast(msg); - } + let msg = ProcMsg::Speaker(AudioChunk{ data }); + let _ = proc.cast(msg); } else { break; } From 1c0d980fb22612d5b769f64f8fddf9d55fab1a79 Mon Sep 17 00:00:00 2001 From: Yujong Lee Date: Tue, 4 Nov 2025 17:58:16 +0900 Subject: [PATCH 2/2] bunch of fixes including the new segmenting logic --- .../sessions/note-input/transcript/editor.tsx | 30 +- .../sessions/note-input/transcript/index.tsx | 4 +- .../note-input/transcript/shared/index.tsx | 17 +- .../transcript/shared/operations.tsx | 4 + .../transcript/shared/segment-header.tsx | 96 +++++- .../src/devtool/seed/data/curated.json | 110 +++++++ apps/desktop/src/utils/segment.test.ts | 273 ++++++++++++++++ apps/desktop/src/utils/segment.ts | 305 +++++++++++++----- apps/desktop/src/utils/speaker-hints.ts | 56 ++-- 9 files changed, 781 insertions(+), 114 deletions(-) create mode 100644 apps/desktop/src/components/main/body/sessions/note-input/transcript/shared/operations.tsx diff --git a/apps/desktop/src/components/main/body/sessions/note-input/transcript/editor.tsx b/apps/desktop/src/components/main/body/sessions/note-input/transcript/editor.tsx index 10d0df055d..7c0633e839 100644 --- a/apps/desktop/src/components/main/body/sessions/note-input/transcript/editor.tsx +++ b/apps/desktop/src/components/main/body/sessions/note-input/transcript/editor.tsx @@ -1,6 +1,7 @@ import { useCallback } from "react"; import * as main from "../../../../../../store/tinybase/main"; +import { id } from "../../../../../../utils"; import { TranscriptContainer } from "./shared"; export function TranscriptEditor({ sessionId }: { sessionId: string }) { @@ -27,10 +28,37 @@ export function TranscriptEditor({ sessionId }: { sessionId: string }) { checkpoints.addCheckpoint("delete_word"); }, [store, indexes, checkpoints]); + const handleAssignSpeaker = useCallback((wordIds: string[], humanId: string) => { + if (!store || !checkpoints) { + return; + } + + wordIds.forEach((wordId) => { + const word = store.getRow("words", wordId); + if (!word || typeof word.transcript_id !== "string") { + return; + } + + const hintId = id(); + store.setRow("speaker_hints", hintId, { + transcript_id: word.transcript_id, + word_id: wordId, + type: "user_speaker_assignment", + value: JSON.stringify({ human_id: humanId }), + created_at: new Date().toISOString(), + }); + }); + + checkpoints.addCheckpoint("assign_speaker"); + }, [store, checkpoints]); + return ( ); } diff --git a/apps/desktop/src/components/main/body/sessions/note-input/transcript/index.tsx b/apps/desktop/src/components/main/body/sessions/note-input/transcript/index.tsx index 73e4dec31b..e30c398626 100644 --- a/apps/desktop/src/components/main/body/sessions/note-input/transcript/index.tsx +++ b/apps/desktop/src/components/main/body/sessions/note-input/transcript/index.tsx @@ -1,16 +1,18 @@ import { useCallback, useMemo, useRef, useState } from "react"; import { cn } from "@hypr/utils"; +import { useListener } from "../../../../../../contexts/listener"; import * as main from "../../../../../../store/tinybase/main"; import { TranscriptEditor } from "./editor"; import { TranscriptViewer } from "./viewer"; export function Transcript({ sessionId }: { sessionId: string }) { + const inactive = useListener((state) => state.status === "inactive"); const [isEditing, setIsEditing] = useState(false); return (
- + {inactive && }
{isEditing diff --git a/apps/desktop/src/components/main/body/sessions/note-input/transcript/shared/index.tsx b/apps/desktop/src/components/main/body/sessions/note-input/transcript/shared/index.tsx index 6f8455fdcf..7b2cf60b9c 100644 --- a/apps/desktop/src/components/main/body/sessions/note-input/transcript/shared/index.tsx +++ b/apps/desktop/src/components/main/body/sessions/note-input/transcript/shared/index.tsx @@ -18,18 +18,15 @@ import { SegmentWord, } from "../../../../../../../utils/segment"; import { convertStorageHintsToRuntime } from "../../../../../../../utils/speaker-hints"; +import { Operations } from "./operations"; import { SegmentHeader } from "./segment-header"; -type WordOperations = { - onDeleteWord?: (wordId: string) => void; -}; - export function TranscriptContainer({ sessionId, operations, }: { sessionId: string; - operations?: WordOperations; + operations?: Operations; }) { const transcriptIds = main.UI.useSliceRowIds( main.INDEXES.transcriptBySession, @@ -100,7 +97,7 @@ function TranscriptSeparator() { ])} >
- Restarted + ~ ~ ~ ~ ~ ~ ~ ~ ~
); @@ -116,7 +113,7 @@ function RenderTranscript( transcriptId: string; partialWords: PartialWord[]; partialHints: RuntimeSpeakerHint[]; - operations?: WordOperations; + operations?: Operations; }, ) { const finalWords = useFinalWords(transcriptId); @@ -165,7 +162,7 @@ export function SegmentRenderer( segment: Segment; offsetMs: number; transcriptId: string; - operations?: WordOperations; + operations?: Operations; }, ) { const { time, seek, start, audioExists } = useAudioPlayer(); @@ -183,7 +180,7 @@ export function SegmentRenderer( return (
- +
{segment.words.map((word, idx) => { @@ -224,7 +221,7 @@ function WordSpan({ word: SegmentWord; highlightState: "current" | "buffer" | "none"; audioExists: boolean; - operations?: WordOperations; + operations?: Operations; onSeekAndPlay: () => void; }) { const mode = operations && Object.keys(operations).length > 0 ? "editor" : "viewer"; diff --git a/apps/desktop/src/components/main/body/sessions/note-input/transcript/shared/operations.tsx b/apps/desktop/src/components/main/body/sessions/note-input/transcript/shared/operations.tsx new file mode 100644 index 0000000000..c9f61ec1d4 --- /dev/null +++ b/apps/desktop/src/components/main/body/sessions/note-input/transcript/shared/operations.tsx @@ -0,0 +1,4 @@ +export type Operations = { + onDeleteWord?: (wordId: string) => void; + onAssignSpeaker?: (wordIds: string[], humanId: string) => void; +}; diff --git a/apps/desktop/src/components/main/body/sessions/note-input/transcript/shared/segment-header.tsx b/apps/desktop/src/components/main/body/sessions/note-input/transcript/shared/segment-header.tsx index d8375b0457..58c396443e 100644 --- a/apps/desktop/src/components/main/body/sessions/note-input/transcript/shared/segment-header.tsx +++ b/apps/desktop/src/components/main/body/sessions/note-input/transcript/shared/segment-header.tsx @@ -1,10 +1,21 @@ import chroma from "chroma-js"; import { useCallback, useMemo } from "react"; +import { + ContextMenu, + ContextMenuContent, + ContextMenuItem, + ContextMenuSub, + ContextMenuSubContent, + ContextMenuSubTrigger, + ContextMenuTrigger, +} from "@hypr/ui/components/ui/context-menu"; import { cn } from "@hypr/utils"; -import type { Segment } from "../../../../../../../utils/segment"; +import * as main from "../../../../../../../store/tinybase/main"; +import { ChannelProfile, type Segment } from "../../../../../../../utils/segment"; +import { Operations } from "./operations"; -export function SegmentHeader({ segment }: { segment: Segment }) { +export function SegmentHeader({ segment, operations }: { segment: Segment; operations?: Operations }) { const formatTimestamp = useCallback((ms: number): string => { const totalSeconds = Math.floor(ms / 1000); const hours = Math.floor(totalSeconds / 3600); @@ -30,9 +41,24 @@ export function SegmentHeader({ segment }: { segment: Segment }) { return `${from} - ${to}`; }, [segment.words.length, formatTimestamp]); - const colors = useSegmentColors(segment.key); + const color = useSegmentColor(segment.key); + const label = useSpeakerLabel(segment.key); + const humans = main.UI.useRowIds("humans", main.STORE_ID) ?? []; + const store = main.UI.useStore(main.STORE_ID); - return ( + const mode = operations && Object.keys(operations).length > 0 ? "editor" : "viewer"; + const wordIds = segment.words.filter((w) => w.id).map((w) => w.id!); + + const handleAssignSpeaker = useCallback( + (humanId: string) => { + if (wordIds.length > 0 && operations?.onAssignSpeaker) { + operations.onAssignSpeaker(wordIds, humanId); + } + }, + [wordIds, operations], + ); + + const headerContent = (

- {colors.label} + {label} {timestamp}

); + + if (mode === "editor" && wordIds.length > 0) { + return ( + + + {headerContent} + + + + Assign Speaker + + {humans.map((humanId) => { + const human = store?.getRow("humans", humanId); + const name = human?.name || humanId; + return ( + handleAssignSpeaker(humanId)}> + {name} + + ); + })} + {humans.length === 0 && No speakers available} + + + + + ); + } + + return headerContent; } -function useSegmentColors(key: Segment["key"]) { +function useSegmentColor(key: Segment["key"]) { return useMemo(() => { const speakerIndex = key.speaker_index ?? 0; @@ -64,9 +120,29 @@ function useSegmentColors(key: Segment["key"]) { const light = 0.55; const chromaVal = 0.15; - return { - color: chroma.oklch(light, chromaVal, hue).hex(), - label: key.speaker_index !== undefined ? `Speaker ${key.speaker_index + 1}` : `Speaker ${key.channel}`, - }; + return chroma.oklch(light, chromaVal, hue).hex(); }, [key]); } + +function useSpeakerLabel(key: Segment["key"]) { + const store = main.UI.useStore(main.STORE_ID); + + return useMemo(() => { + if (key.speaker_human_id && store) { + const human = store.getRow("humans", key.speaker_human_id); + if (human?.name) { + return human.name as string; + } + } + + const channelLabel = key.channel === ChannelProfile.DirectMic + ? "A" + : key.channel === ChannelProfile.RemoteParty + ? "B" + : "C"; + + return key.speaker_index !== undefined + ? `Speaker ${key.speaker_index + 1}` + : `Speaker ${channelLabel}`; + }, [key, store]); +} diff --git a/apps/desktop/src/devtool/seed/data/curated.json b/apps/desktop/src/devtool/seed/data/curated.json index 9826037aeb..ff253b02ab 100644 --- a/apps/desktop/src/devtool/seed/data/curated.json +++ b/apps/desktop/src/devtool/seed/data/curated.json @@ -2671,6 +2671,116 @@ } ] } + }, + { + "title": "Quick Check-in", + "raw_md": "Quick conversation between colleagues.", + "enhanced_md": "Quick conversation between colleagues.", + "folder": null, + "event": null, + "participants": [ + "Sarah Chen", + "Michael Rodriguez" + ], + "tags": [], + "transcript": { + "segments": [ + { + "channel": 0, + "start_ms": 0, + "end_ms": 2000, + "text": " How is the project going?", + "words": [ + { + "text": " How", + "start_ms": 0, + "end_ms": 400 + }, + { + "text": " is", + "start_ms": 400, + "end_ms": 600 + }, + { + "text": " the", + "start_ms": 600, + "end_ms": 800 + }, + { + "text": " project", + "start_ms": 800, + "end_ms": 1400 + }, + { + "text": " going", + "start_ms": 1400, + "end_ms": 2000 + } + ] + }, + { + "channel": 1, + "start_ms": 4100, + "end_ms": 6100, + "text": " It's going really well, thanks!", + "words": [ + { + "text": " It's", + "start_ms": 4100, + "end_ms": 4500 + }, + { + "text": " going", + "start_ms": 4500, + "end_ms": 4900 + }, + { + "text": " really", + "start_ms": 4900, + "end_ms": 5300 + }, + { + "text": " well", + "start_ms": 5300, + "end_ms": 5700 + }, + { + "text": " thanks", + "start_ms": 5700, + "end_ms": 6100 + } + ] + }, + { + "channel": 0, + "start_ms": 8200, + "end_ms": 9800, + "text": " That's great to hear!", + "words": [ + { + "text": " That's", + "start_ms": 8200, + "end_ms": 8600 + }, + { + "text": " great", + "start_ms": 8600, + "end_ms": 9000 + }, + { + "text": " to", + "start_ms": 9000, + "end_ms": 9200 + }, + { + "text": " hear", + "start_ms": 9200, + "end_ms": 9800 + } + ] + } + ] + } } ], "chat_groups": [ diff --git a/apps/desktop/src/utils/segment.test.ts b/apps/desktop/src/utils/segment.test.ts index 5852bd3078..ac9e474c02 100644 --- a/apps/desktop/src/utils/segment.test.ts +++ 
b/apps/desktop/src/utils/segment.test.ts @@ -213,6 +213,279 @@ describe("buildSegments", () => { }), ], }, + { + name: "propagates human id across shared speaker index", + finalWords: [ + { text: "hi", start_ms: 0, end_ms: 100, channel: 0 }, + { text: "there", start_ms: 200, end_ms: 300, channel: 0 }, + ], + partialWords: [], + speakerHints: [ + { wordIndex: 0, data: { type: "provider_speaker_index" as const, speaker_index: 1 } }, + { wordIndex: 1, data: { type: "provider_speaker_index" as const, speaker_index: 1 } }, + { wordIndex: 1, data: { type: "user_speaker_assignment" as const, human_id: "alice" } }, + ], + expected: [ + expect.objectContaining({ + key: SegmentKey.make({ channel: 0, speaker_index: 1, speaker_human_id: "alice" }), + words: [ + expect.objectContaining({ text: "hi" }), + expect.objectContaining({ text: "there" }), + ], + }), + ], + }, + { + name: "infers human id for partial words via last known speaker", + finalWords: [ + { text: "final", start_ms: 0, end_ms: 100, channel: 0 }, + ], + partialWords: [ + { text: "partial", start_ms: 150, end_ms: 200, channel: 0 }, + ], + speakerHints: [ + { wordIndex: 0, data: { type: "provider_speaker_index" as const, speaker_index: 2 } }, + { wordIndex: 0, data: { type: "user_speaker_assignment" as const, human_id: "bob" } }, + ], + expected: [ + expect.objectContaining({ + key: SegmentKey.make({ channel: 0, speaker_index: 2, speaker_human_id: "bob" }), + words: [ + expect.objectContaining({ text: "final" }), + expect.objectContaining({ text: "partial" }), + ], + }), + ], + }, + { + name: "splits segments when human id changes for same speaker index", + finalWords: [ + { text: "first", start_ms: 0, end_ms: 100, channel: 0 }, + { text: "second", start_ms: 150, end_ms: 250, channel: 0 }, + ], + partialWords: [], + speakerHints: [ + { wordIndex: 0, data: { type: "provider_speaker_index" as const, speaker_index: 0 } }, + { wordIndex: 0, data: { type: "user_speaker_assignment" as const, human_id: "alice" } }, + { wordIndex: 1, data: { type: "provider_speaker_index" as const, speaker_index: 0 } }, + { wordIndex: 1, data: { type: "user_speaker_assignment" as const, human_id: "bob" } }, + ], + expected: [ + expect.objectContaining({ + key: SegmentKey.make({ channel: 0, speaker_index: 0, speaker_human_id: "alice" }), + words: [expect.objectContaining({ text: "first" })], + }), + expect.objectContaining({ + key: SegmentKey.make({ channel: 0, speaker_index: 0, speaker_human_id: "bob" }), + words: [expect.objectContaining({ text: "second" })], + }), + ], + }, + { + name: "auto-assign based on provider speaker index", + finalWords: [ + { text: "1", start_ms: 0, end_ms: 100, channel: 0 }, + { text: "2", start_ms: 100, end_ms: 200, channel: 1 }, + { text: "3", start_ms: 200, end_ms: 300, channel: 0 }, + ], + partialWords: [], + speakerHints: [ + { wordIndex: 0, data: { type: "provider_speaker_index" as const, speaker_index: 0 } }, + { wordIndex: 1, data: { type: "provider_speaker_index" as const, speaker_index: 1 } }, + { wordIndex: 2, data: { type: "provider_speaker_index" as const, speaker_index: 0 } }, + { wordIndex: 0, data: { type: "user_speaker_assignment" as const, human_id: "bob" } }, + ], + expected: [ + expect.objectContaining({ + key: SegmentKey.make({ channel: 0, speaker_index: 0, speaker_human_id: "bob" }), + words: [expect.objectContaining({ text: "1" })], + }), + expect.objectContaining({ + key: SegmentKey.make({ channel: 1, speaker_index: 1 }), + words: [expect.objectContaining({ text: "2" })], + }), + expect.objectContaining({ + 
key: SegmentKey.make({ channel: 0, speaker_index: 0, speaker_human_id: "bob" }), + words: [expect.objectContaining({ text: "3" })], + }), + ], + }, + { + name: "handles partial-only stream with speaker hints", + finalWords: [], + partialWords: [ + { text: "hello", start_ms: 0, end_ms: 80, channel: 0 }, + { text: "world", start_ms: 120, end_ms: 200, channel: 0 }, + ], + speakerHints: [ + { wordIndex: 0, data: { type: "provider_speaker_index" as const, speaker_index: 3 } }, + { wordIndex: 0, data: { type: "user_speaker_assignment" as const, human_id: "alice" } }, + ], + expected: [ + expect.objectContaining({ + key: SegmentKey.make({ channel: 0, speaker_index: 3, speaker_human_id: "alice" }), + words: [ + expect.objectContaining({ text: "hello", isFinal: false }), + expect.objectContaining({ text: "world", isFinal: false }), + ], + }), + ], + }, + { + name: "applies speaker hints targeting partial word indexes", + finalWords: [ + { text: "final", start_ms: 0, end_ms: 90, channel: 0 }, + ], + partialWords: [ + { text: "partial", start_ms: 140, end_ms: 220, channel: 0 }, + ], + speakerHints: [ + { wordIndex: 1, data: { type: "provider_speaker_index" as const, speaker_index: 4 } }, + { wordIndex: 1, data: { type: "user_speaker_assignment" as const, human_id: "alice" } }, + ], + expected: [ + expect.objectContaining({ + key: SegmentKey.make({ channel: 0 }), + words: [expect.objectContaining({ text: "final", isFinal: true })], + }), + expect.objectContaining({ + key: SegmentKey.make({ channel: 0, speaker_index: 4, speaker_human_id: "alice" }), + words: [expect.objectContaining({ text: "partial", isFinal: false })], + }), + ], + }, + { + name: "merges using human assignment without provider index", + finalWords: [ + { text: "alpha", start_ms: 0, end_ms: 100, channel: 0 }, + { text: "beta", start_ms: 140, end_ms: 240, channel: 0 }, + ], + partialWords: [], + speakerHints: [ + { wordIndex: 0, data: { type: "user_speaker_assignment" as const, human_id: "alice" } }, + { wordIndex: 1, data: { type: "user_speaker_assignment" as const, human_id: "alice" } }, + ], + expected: [ + expect.objectContaining({ + key: SegmentKey.make({ channel: 0, speaker_human_id: "alice" }), + words: [ + expect.objectContaining({ text: "alpha", isFinal: true }), + expect.objectContaining({ text: "beta", isFinal: true }), + ], + }), + ], + }, + { + name: "propagates human assignment to partial words without speaker index", + finalWords: [{ text: "final", start_ms: 0, end_ms: 50, channel: 0 }], + partialWords: [{ text: "partial", start_ms: 100, end_ms: 150, channel: 0 }], + speakerHints: [{ wordIndex: 0, data: { type: "user_speaker_assignment" as const, human_id: "alice" } }], + expected: [ + expect.objectContaining({ + key: SegmentKey.make({ channel: 0, speaker_human_id: "alice" }), + words: [ + expect.objectContaining({ text: "final", isFinal: true }), + expect.objectContaining({ text: "partial", isFinal: false }), + ], + }), + ], + }, + { + name: "splits segments when channel-only human assignment changes", + finalWords: [ + { text: "alice", start_ms: 0, end_ms: 50, channel: 0 }, + { text: "bob", start_ms: 120, end_ms: 170, channel: 0 }, + ], + partialWords: [], + speakerHints: [ + { wordIndex: 0, data: { type: "user_speaker_assignment" as const, human_id: "alice" } }, + { wordIndex: 1, data: { type: "user_speaker_assignment" as const, human_id: "bob" } }, + ], + expected: [ + expect.objectContaining({ + key: SegmentKey.make({ channel: 0, speaker_human_id: "alice" }), + words: [expect.objectContaining({ text: "alice" })], + 
}), + expect.objectContaining({ + key: SegmentKey.make({ channel: 0, speaker_human_id: "bob" }), + words: [expect.objectContaining({ text: "bob" })], + }), + ], + }, + { + name: "retains human assignment across partial-only stream without speaker index", + finalWords: [], + partialWords: [ + { text: "hello", start_ms: 0, end_ms: 80, channel: 1 }, + { text: "again", start_ms: 120, end_ms: 200, channel: 1 }, + ], + speakerHints: [ + { wordIndex: 0, data: { type: "user_speaker_assignment" as const, human_id: "carol" } }, + ], + expected: [ + expect.objectContaining({ + key: SegmentKey.make({ channel: 1, speaker_human_id: "carol" }), + words: [ + expect.objectContaining({ text: "hello", isFinal: false }), + expect.objectContaining({ text: "again", isFinal: false }), + ], + }), + ], + }, + { + name: "propagates DirectMic channel identity to all channel 0 words", + finalWords: [ + { text: " How", start_ms: 0, end_ms: 400, channel: 0 }, + { text: " is", start_ms: 400, end_ms: 600, channel: 0 }, + { text: " the", start_ms: 600, end_ms: 800, channel: 0 }, + { text: " project", start_ms: 800, end_ms: 1400, channel: 0 }, + { text: " going", start_ms: 1400, end_ms: 2000, channel: 0 }, + { text: " It's", start_ms: 4100, end_ms: 4500, channel: 1 }, + { text: " going", start_ms: 4500, end_ms: 4900, channel: 1 }, + { text: " really", start_ms: 4900, end_ms: 5300, channel: 1 }, + { text: " well", start_ms: 5300, end_ms: 5700, channel: 1 }, + { text: " thanks", start_ms: 5700, end_ms: 6100, channel: 1 }, + { text: " That's", start_ms: 8200, end_ms: 8600, channel: 0 }, + { text: " great", start_ms: 8600, end_ms: 9000, channel: 0 }, + { text: " to", start_ms: 9000, end_ms: 9200, channel: 0 }, + { text: " hear", start_ms: 9200, end_ms: 9800, channel: 0 }, + ], + partialWords: [], + speakerHints: [ + { wordIndex: 0, data: { type: "user_speaker_assignment" as const, human_id: "carol" } }, + ], + expected: [ + expect.objectContaining({ + key: SegmentKey.make({ channel: 0, speaker_human_id: "carol" }), + words: [ + expect.objectContaining({ text: " How", isFinal: true }), + expect.objectContaining({ text: " is", isFinal: true }), + expect.objectContaining({ text: " the", isFinal: true }), + expect.objectContaining({ text: " project", isFinal: true }), + expect.objectContaining({ text: " going", isFinal: true }), + ], + }), + expect.objectContaining({ + key: SegmentKey.make({ channel: 1 }), + words: [ + expect.objectContaining({ text: " It's", isFinal: true }), + expect.objectContaining({ text: " going", isFinal: true }), + expect.objectContaining({ text: " really", isFinal: true }), + expect.objectContaining({ text: " well", isFinal: true }), + expect.objectContaining({ text: " thanks", isFinal: true }), + ], + }), + expect.objectContaining({ + key: SegmentKey.make({ channel: 0, speaker_human_id: "carol" }), + words: [ + expect.objectContaining({ text: " That's", isFinal: true }), + expect.objectContaining({ text: " great", isFinal: true }), + expect.objectContaining({ text: " to", isFinal: true }), + expect.objectContaining({ text: " hear", isFinal: true }), + ], + }), + ], + }, ]; test.each(testCases)("$name", ({ finalWords, partialWords, speakerHints, expected }) => { diff --git a/apps/desktop/src/utils/segment.ts b/apps/desktop/src/utils/segment.ts index 01feb1ee4e..8437eeec43 100644 --- a/apps/desktop/src/utils/segment.ts +++ b/apps/desktop/src/utils/segment.ts @@ -1,10 +1,18 @@ -import { Data, Equal, HashMap, Option } from "effect"; +import { Data, Schema } from "effect"; + +export enum ChannelProfile { + 
+  DirectMic = 0,
+  RemoteParty = 1,
+  MixedCapture = 2,
+}
+
+export const ChannelProfileSchema = Schema.Enums(ChannelProfile);
 
 export type WordLike = {
   text: string;
   start_ms: number;
   end_ms: number;
-  channel: number;
+  channel: ChannelProfile;
 };
 
 export type PartialWord = WordLike;
@@ -20,25 +28,35 @@ export type RuntimeSpeakerHint = {
   data: SpeakerHintData;
 };
 
-export function getSpeakerIndex(hint: RuntimeSpeakerHint): number | undefined {
-  if (hint.data.type === "provider_speaker_index") {
-    return hint.data.speaker_index;
-  }
-  return undefined;
-}
-
 export type Segment<TWord> = {
   key: SegmentKey;
   words: TWord[];
 };
 
 export type SegmentKey = {
-  readonly channel: number;
+  readonly channel: ChannelProfile;
   readonly speaker_index?: number;
+  readonly speaker_human_id?: string;
 };
 
 export const SegmentKey = {
-  make: (params: { channel: number; speaker_index?: number }): SegmentKey => Data.struct(params),
+  make: (
+    params: { channel: ChannelProfile } & Partial<{ speaker_index: number; speaker_human_id: string }>,
+  ): SegmentKey => Data.struct(params),
+};
+
+const MAX_GAP_MS = 2000;
+
+type SpeakerIdentity = {
+  speaker_index?: number;
+  human_id?: string;
+};
+
+type SpeakerState = {
+  assignmentByWordIndex: Map<number, SpeakerIdentity>;
+  humanIdBySpeakerIndex: Map<number, string>;
+  humanIdByChannel: Map<ChannelProfile, string>;
+  lastSpeakerByChannel: Map<ChannelProfile, SpeakerIdentity>;
 };
 
 export function buildSegments<
@@ -49,92 +67,231 @@ export function buildSegments<
   finalWords: readonly TFinal[],
   partialWords: readonly TPartial[],
   speakerHints: readonly RuntimeSpeakerHint[] = [],
 ): Segment<SegmentWord>[] {
-  const allWords: SegmentWord[] = [
-    ...finalWords.map((word) => ({
-      text: word.text,
-      start_ms: word.start_ms,
-      end_ms: word.end_ms,
-      channel: word.channel,
-      isFinal: true,
-      ...("id" in word && word.id ? { id: word.id as string } : {}),
-    })),
-    ...partialWords.map((word) => ({
-      text: word.text,
-      start_ms: word.start_ms,
-      end_ms: word.end_ms,
-      channel: word.channel,
-      isFinal: false,
-      ...("id" in word && word.id ? { id: word.id as string } : {}),
-    })),
-  ].sort((a, b) => a.start_ms - b.start_ms);
-
-  return createSpeakerTurns(allWords, speakerHints);
+  const words = normalizeWords(finalWords, partialWords);
+  return segmentWords(words, speakerHints);
 }
 
-function createSpeakerTurns<TWord extends SegmentWord>(
-  words: TWord[],
+function segmentWords<TWord extends SegmentWord>(
+  words: readonly TWord[],
   speakerHints: readonly RuntimeSpeakerHint[],
 ): Segment<TWord>[] {
-  const MAX_GAP_MS = 2000;
-
   if (words.length === 0) {
     return [];
   }
 
-  const speakerByIndex = new Map<number, number>();
-  speakerHints.forEach((hint) => {
-    const speakerIndex = getSpeakerIndex(hint);
-    if (speakerIndex !== undefined) {
-      speakerByIndex.set(hint.wordIndex, speakerIndex);
-    }
+  const state = createSpeakerState(speakerHints);
+  const segments: Segment<TWord>[] = [];
+  const activeSegments = new Map<string, Segment<TWord>>();
+
+  words.forEach((word, index) => {
+    const key = resolveSegmentKey(index, word, state);
+    placeWordInSegment(word, key, segments, activeSegments);
   });
 
-  const segments: Segment<TWord>[] = [];
-  let currentActiveSegment = HashMap.empty<SegmentKey, Segment<TWord>>();
-  const lastSpeakerByChannel = new Map<number, number>();
+  return segments;
+}
+
+function createSpeakerState(speakerHints: readonly RuntimeSpeakerHint[]): SpeakerState {
+  const assignmentByWordIndex = new Map<number, SpeakerIdentity>();
+  const humanIdBySpeakerIndex = new Map<number, string>();
+  const humanIdByChannel = new Map<ChannelProfile, string>();
+  const lastSpeakerByChannel = new Map<ChannelProfile, SpeakerIdentity>();
 
-  for (let i = 0; i < words.length; i++) {
-    const word = words[i];
-    const explicitSpeaker = speakerByIndex.get(i);
+  for (const hint of speakerHints) {
+    const current = assignmentByWordIndex.get(hint.wordIndex) ?? {};
+    if (hint.data.type === "provider_speaker_index") {
+      current.speaker_index = hint.data.speaker_index;
+    } else {
+      current.human_id = hint.data.human_id;
+    }
+    assignmentByWordIndex.set(hint.wordIndex, { ...current });
 
-    const speakerIndex = explicitSpeaker ?? (!word.isFinal ? lastSpeakerByChannel.get(word.channel) : undefined);
+    if (current.speaker_index !== undefined && current.human_id !== undefined) {
+      humanIdBySpeakerIndex.set(current.speaker_index, current.human_id);
+    }
+  }
+
+  return {
+    assignmentByWordIndex,
+    humanIdBySpeakerIndex,
+    humanIdByChannel,
+    lastSpeakerByChannel,
+  };
+}
+
+function resolveSegmentKey<TWord extends SegmentWord>(
+  wordIndex: number,
+  word: TWord,
+  state: SpeakerState,
+): SegmentKey {
+  const assignment = state.assignmentByWordIndex.get(wordIndex);
+  const identity = resolveSpeakerIdentity(word, assignment, state);
+  rememberIdentity(word, assignment, identity, state);
+
+  const params: {
+    channel: ChannelProfile;
+    speaker_index?: number;
+    speaker_human_id?: string;
+  } = { channel: word.channel };
+
+  if (identity.speaker_index !== undefined) {
+    params.speaker_index = identity.speaker_index;
+  }
+
+  if (identity.human_id !== undefined) {
+    params.speaker_human_id = identity.human_id;
+  }
+
+  return SegmentKey.make(params);
+}
+
+function resolveSpeakerIdentity<TWord extends SegmentWord>(
+  word: TWord,
+  assignment: SpeakerIdentity | undefined,
+  state: SpeakerState,
+): SpeakerIdentity {
+  const identity: SpeakerIdentity = {
+    speaker_index: assignment?.speaker_index,
+    human_id: assignment?.human_id,
+  };
+
+  if (identity.speaker_index !== undefined && identity.human_id === undefined) {
+    identity.human_id = state.humanIdBySpeakerIndex.get(identity.speaker_index);
+  }
 
-    if (typeof explicitSpeaker === "number") {
-      lastSpeakerByChannel.set(word.channel, explicitSpeaker);
-    }
+  if (identity.human_id === undefined && word.channel === ChannelProfile.DirectMic) {
+    identity.human_id = state.humanIdByChannel.get(ChannelProfile.DirectMic);
+  }
 
-    const key = SegmentKey.make({ channel: word.channel, speaker_index: speakerIndex });
-    const currentOption = HashMap.get(currentActiveSegment, key);
+  if (!word.isFinal && (identity.speaker_index === undefined || identity.human_id === undefined)) {
+    const last = state.lastSpeakerByChannel.get(word.channel);
+    if (last) {
+      if (identity.speaker_index === undefined) {
+        identity.speaker_index = last.speaker_index;
+      }
+      if (identity.human_id === undefined) {
+        identity.human_id = last.human_id;
+      }
+    }
+  }
+
+  return identity;
+}
 
-    if (Option.isSome(currentOption) && key.speaker_index !== undefined) {
-      const lastSegment = segments[segments.length - 1];
-      if (!lastSegment || !Equal.equals(lastSegment.key, key)) {
-        const newSegment = { key, words: [word] };
-        currentActiveSegment = HashMap.set(currentActiveSegment, key, newSegment);
-        segments.push(newSegment);
-        continue;
-      }
-    }
+function rememberIdentity<TWord extends SegmentWord>(
+  word: TWord,
+  assignment: SpeakerIdentity | undefined,
+  identity: SpeakerIdentity,
+  state: SpeakerState,
+): void {
+  const hasExplicitAssignment = assignment !== undefined
+    && (assignment.speaker_index !== undefined || assignment.human_id !== undefined);
+
+  if (identity.speaker_index !== undefined && identity.human_id !== undefined) {
+    state.humanIdBySpeakerIndex.set(identity.speaker_index, identity.human_id);
+  }
+
+  if (word.channel === ChannelProfile.DirectMic && identity.human_id !== undefined) {
+    state.humanIdByChannel.set(ChannelProfile.DirectMic, identity.human_id);
+  }
 
-    if (Option.isNone(currentOption)) {
-      const newSegment = { key, words: [word] };
-      currentActiveSegment = HashMap.set(currentActiveSegment, key, newSegment);
-      segments.push(newSegment);
-      continue;
-    }
+  if (
+    !word.isFinal
+    || identity.speaker_index !== undefined
+    || hasExplicitAssignment
+  ) {
+    if (identity.speaker_index !== undefined || identity.human_id !== undefined) {
+      state.lastSpeakerByChannel.set(word.channel, { ...identity });
+    }
+  }
+}
 
-    const current = currentOption.value;
-    const lastWord = current.words[current.words.length - 1];
-    const gap = word.start_ms - lastWord.end_ms;
+function placeWordInSegment<TWord extends SegmentWord>(
+  word: TWord,
+  key: SegmentKey,
+  segments: Segment<TWord>[],
+  activeSegments: Map<string, Segment<TWord>>,
+): void {
+  const segmentId = segmentKeyId(key);
+  const existing = activeSegments.get(segmentId);
 
-    if (gap <= MAX_GAP_MS) {
-      current.words.push(word);
-    } else {
-      const newSegment = { key, words: [word] };
-      currentActiveSegment = HashMap.set(currentActiveSegment, key, newSegment);
-      segments.push(newSegment);
+  if (existing && canExtend(existing, key, word, segments)) {
+    existing.words.push(word);
+    return;
+  }
+
+  if (word.isFinal && !hasSpeakerIdentity(key)) {
+    for (const [id, segment] of activeSegments) {
+      if (!hasSpeakerIdentity(segment.key) && segment.key.channel === key.channel) {
+        if (canExtend(segment, segment.key, word, segments)) {
+          segment.words.push(word);
+          activeSegments.set(segmentId, segment);
+          activeSegments.set(id, segment);
+          return;
+        }
+      }
     }
   }
 
-  return segments;
+  const newSegment: Segment<TWord> = { key, words: [word] };
+  segments.push(newSegment);
+  activeSegments.set(segmentId, newSegment);
+}
+
+function canExtend<TWord extends SegmentWord>(
+  existingSegment: Segment<TWord>,
+  candidateKey: SegmentKey,
+  word: TWord,
+  segments: Segment<TWord>[],
+): boolean {
+  if (hasSpeakerIdentity(candidateKey)) {
+    const lastSegment = segments[segments.length - 1];
+    if (!lastSegment || !sameKey(lastSegment.key, candidateKey)) {
+      return false;
+    }
+  }
+
+  const lastWord = existingSegment.words[existingSegment.words.length - 1];
+  return word.start_ms - lastWord.end_ms <= MAX_GAP_MS;
+}
+
+function hasSpeakerIdentity(key: SegmentKey): boolean {
+  return key.speaker_index !== undefined || key.speaker_human_id !== undefined;
+}
+
+function sameKey(a: SegmentKey, b: SegmentKey): boolean {
+  return (
+    a.channel === b.channel
+    && a.speaker_index === b.speaker_index
+    && a.speaker_human_id === b.speaker_human_id
+  );
+}
+
+function segmentKeyId(key: SegmentKey): string {
+  return JSON.stringify([key.channel, key.speaker_index ?? null, key.speaker_human_id ?? null]);
+}
+
+function normalizeWords<TFinal extends WordLike, TPartial extends WordLike>(
+  finalWords: readonly TFinal[],
+  partialWords: readonly TPartial[],
+): SegmentWord[] {
+  const finalNormalized = finalWords.map((word) => ({
+    text: word.text,
+    start_ms: word.start_ms,
+    end_ms: word.end_ms,
+    channel: word.channel,
+    isFinal: true,
+    ...("id" in word && word.id ? { id: word.id as string } : {}),
+  }));
+
+  const partialNormalized = partialWords.map((word) => ({
+    text: word.text,
+    start_ms: word.start_ms,
+    end_ms: word.end_ms,
+    channel: word.channel,
+    isFinal: false,
+    ...("id" in word && word.id ? { id: word.id as string } : {}),
+  }));
+
+  return [...finalNormalized, ...partialNormalized].sort((a, b) => a.start_ms - b.start_ms);
 }
diff --git a/apps/desktop/src/utils/speaker-hints.ts b/apps/desktop/src/utils/speaker-hints.ts
index 76096fcd34..cb5846ae1b 100644
--- a/apps/desktop/src/utils/speaker-hints.ts
+++ b/apps/desktop/src/utils/speaker-hints.ts
@@ -5,24 +5,6 @@ import type { RuntimeSpeakerHint } from "./segment";
 
 export type { ProviderSpeakerIndexHint };
 
-export const parseProviderSpeakerIndex = (raw: unknown): ProviderSpeakerIndexHint | undefined => {
-  if (raw == null) {
-    return undefined;
-  }
-
-  const data = typeof raw === "string"
-    ? (() => {
-      try {
-        return JSON.parse(raw);
-      } catch {
-        return undefined;
-      }
-    })()
-    : raw;
-
-  return providerSpeakerIndexSchema.safeParse(data).data;
-};
-
 export function convertStorageHintsToRuntime(
   storageHints: SpeakerHintStorage[],
   wordIdToIndex: Map<string, number>,
@@ -52,8 +34,46 @@ export function convertStorageHintsToRuntime(
         },
       });
     }
+    } else if (hint.type === "user_speaker_assignment") {
+      const data = typeof hint.value === "string"
+        ? (() => {
+          try {
+            return JSON.parse(hint.value);
+          } catch {
+            return undefined;
+          }
+        })()
+        : hint.value;
+
+      if (data && typeof data === "object" && "human_id" in data && typeof data.human_id === "string") {
+        hints.push({
+          wordIndex,
+          data: {
+            type: "user_speaker_assignment",
+            human_id: data.human_id,
+          },
+        });
+      }
+    }
   });
 
   return hints;
 }
+
+const parseProviderSpeakerIndex = (raw: unknown): ProviderSpeakerIndexHint | undefined => {
+  if (raw == null) {
+    return undefined;
+  }
+
+  const data = typeof raw === "string"
+    ? (() => {
+      try {
+        return JSON.parse(raw);
+      } catch {
+        return undefined;
+      }
+    })()
+    : raw;
+
+  return providerSpeakerIndexSchema.safeParse(data).data;
+};
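
A minimal usage sketch of the new segmenting logic follows (illustrative only, not part of the patch). It assumes the buildSegments, ChannelProfile, and hint shapes exported from apps/desktop/src/utils/segment.ts above; the expected grouping mirrors the "propagates DirectMic channel identity" test case in segment.test.ts.

// example.ts (hypothetical file, TypeScript)
// In Single mode the listener remaps the provider's channel 0 to 2
// (ChannelProfile.MixedCapture), so mixed-capture words never collide
// with DirectMic words when segments are grouped by SegmentKey.
import { buildSegments, ChannelProfile } from "./segment";

const finalWords = [
  { text: "hello", start_ms: 0, end_ms: 300, channel: ChannelProfile.DirectMic },
  { text: "world", start_ms: 400, end_ms: 700, channel: ChannelProfile.DirectMic },
  { text: "hi", start_ms: 900, end_ms: 1100, channel: ChannelProfile.MixedCapture },
];

// One user assignment on the first word. rememberIdentity() records the
// human id for the DirectMic channel, so later DirectMic words inherit it.
const hints = [
  { wordIndex: 0, data: { type: "user_speaker_assignment" as const, human_id: "alice" } },
];

const segments = buildSegments(finalWords, [], hints);
// Expected:
//   [{ key: { channel: 0, speaker_human_id: "alice" }, words: [hello, world] },
//    { key: { channel: 2 }, words: [hi] }]
console.log(segments.map((s) => [s.key, s.words.map((w) => w.text)]));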