
Migrate to actor for audio processing#1457

Merged
yujonglee merged 12 commits into main from migrate-to-actor
Sep 10, 2025

Conversation

@yujonglee

No description provided.

@coderabbitai

coderabbitai bot commented Sep 9, 2025

📝 Walkthrough


Replaces the listener FSM with an actor-based architecture (ractor): adds SessionSupervisor, SourceActor, AudioProcessor, Recorder, and ListenBridge; routes plugin APIs through the supervisor; updates macOS speaker stream concurrency and buffering; changes AEC feature default and listener dependencies.

Changes

| Cohort / File(s) | Summary |
|---|---|
| AEC config: crates/aec/Cargo.toml | Default feature changed from "512" to "128". |
| macOS speaker stream: crates/audio/src/speaker/macos.rs | Increased ring buffer (1024×128), tightened atomic memory ordering (Acquire/Release/AcqRel), safer PCM32 handling, removed several fullness logs/assertions, simplified wake signaling, added Drop to ensure termination. |
| Listener deps: plugins/listener/Cargo.toml | Added tokio-util = { workspace = true }, ractor = "0.15", ractor_actors = "0.4"; removed flume and statig (async). |
| Actors & exports: plugins/listener/src/actors/mod.rs, .../listen.rs, .../processor.rs, .../recorder.rs, .../session.rs, .../source.rs | New actor modules and public types: ListenBridge, ListenMsg/Args/State; AudioProcessor (ProcMsg/Args/State); Recorder (RecMsg/Args); SessionSupervisor (SessionMsg/Args/State); SourceActor (SrcCtrl/SrcArgs). Adds AudioChunk type and wires inter-actor messaging. |
| Listener integration: plugins/listener/src/lib.rs, .../ext.rs, .../commands.rs, .../fsm.rs | Migrated plugin state to optional ActorRef<SessionMsg> supervisor. State::get_state becomes async and proxies to supervisor. ext.rs routes get/set/start/stop via SessionMsg with timeouts/fallbacks. fsm::State now unit variants and implements specta::Type. commands.rs removed post-call state validation. |
| Notification: plugins/notification/src/quit.rs | Fetches listener state via blocking await (block_in_place + block_on) and matches new unit-like State::RunningActive. |

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor User
  participant App as Tauri App
  participant Sup as SessionSupervisor
  participant Mic as SourceActor (Mic)
  participant Spk as SourceActor (Speaker)
  participant Proc as AudioProcessor
  participant Rec as Recorder
  participant Lis as ListenBridge
  participant RT as Realtime Service
  participant DB as DB

  User->>App: start_session(session_id)
  App->>Sup: SessionMsg::Start{session_id}
  Sup->>Sup: load config, spawn pipeline
  Sup->>Proc: spawn/link processor
  Sup->>Mic: spawn mic source
  Sup->>Spk: spawn speaker source
  Sup->>Rec: (opt) spawn recorder & attach
  Sup->>Lis: spawn listen bridge & attach
  Sup-->>App: State = RunningActive

  par audio
    Mic-->>Proc: ProcMsg::Mic(AudioChunk)
    Spk-->>Proc: ProcMsg::Spk(AudioChunk)
  end
  Proc->>Proc: AGC + Join + AEC
  alt recorder attached
    Proc-->>Rec: RecMsg::Audio(mixed Vec<f32>)
  end
  alt listen attached
    Proc-->>Lis: ListenMsg::Audio(Bytes mic, Bytes spk)
    Lis->>RT: stream mic/spk
    RT-->>Lis: partial/final results
    Lis->>App: SessionEvent::PartialWords/FinalWords
    Lis->>DB: update_session(words)
  end

  User->>App: stop_session()
  App->>Sup: SessionMsg::Stop
  Sup->>Mic: stop
  Sup->>Spk: stop
  Sup->>Proc: stop
  Sup->>Rec: stop
  Sup->>Lis: stop
  Sup-->>App: State = Inactive
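
The message flow in the diagram can be approximated with plain threads and channels. The sketch below is not ractor code: std::sync::mpsc stands in for actor mailboxes, the ProcMsg enum and the run_processor/run_session functions are illustrative names that only mirror the PR's types, and the sample-wise sum is an assumed placeholder for the real AGC + Join + AEC step.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

// Illustrative message type mirroring the PR's ProcMsg; not the real definition.
enum ProcMsg {
    Mic(Vec<f32>),
    Spk(Vec<f32>),
    Stop,
}

// Stand-in for the processor actor: pairs one mic chunk with one speaker chunk
// and forwards their sample-wise sum (assumed; the real actor runs AGC + Join + AEC).
fn run_processor(rx: Receiver<ProcMsg>, mixed_tx: Sender<Vec<f32>>) {
    let mut pending_mic: Option<Vec<f32>> = None;
    let mut pending_spk: Option<Vec<f32>> = None;
    for msg in rx {
        match msg {
            ProcMsg::Mic(chunk) => pending_mic = Some(chunk),
            ProcMsg::Spk(chunk) => pending_spk = Some(chunk),
            ProcMsg::Stop => break,
        }
        if pending_mic.is_some() && pending_spk.is_some() {
            let mic = pending_mic.take().unwrap();
            let spk = pending_spk.take().unwrap();
            let mixed = mic.iter().zip(spk.iter()).map(|(m, s)| m + s).collect();
            let _ = mixed_tx.send(mixed);
        }
    }
}

// Stand-in for the supervisor: spawns the processor, feeds it one chunk per
// source, stops it, and returns everything the processor emitted.
fn run_session(mic: Vec<f32>, spk: Vec<f32>) -> Vec<Vec<f32>> {
    let (proc_tx, proc_rx) = channel();
    let (mixed_tx, mixed_rx) = channel();
    let worker = thread::spawn(move || run_processor(proc_rx, mixed_tx));
    proc_tx.send(ProcMsg::Mic(mic)).unwrap();
    proc_tx.send(ProcMsg::Spk(spk)).unwrap();
    proc_tx.send(ProcMsg::Stop).unwrap();
    worker.join().unwrap();
    // The worker has exited and dropped its sender, so this iterator terminates.
    mixed_rx.iter().collect()
}
```

Calling run_session with one mic chunk and one speaker chunk yields a single mixed chunk. In the PR the supervisor additionally links children and attaches the recorder and listen bridge, which this sketch leaves out.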

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

Possibly related PRs

Pre-merge checks (1 passed, 1 warning, 1 inconclusive)

❌ Failed checks (1 warning, 1 inconclusive)
| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. | You can run @coderabbitai generate docstrings to improve docstring coverage. |
| Description Check | ❓ Inconclusive | There is no pull request description provided, so it is impossible to assess whether the description relates appropriately to the changeset. | Please add a brief description that outlines the pull request’s objectives and highlights its main changes so that reviewers can understand its purpose. |

✅ Passed checks (1 passed)

| Check name | Status | Explanation |
|---|---|---|
| Title Check | ✅ Passed | The title “Migrate to actor for audio processing” concisely captures the primary change of shifting the audio plugin architecture to an actor-based model and aligns with the extensive actor-centric modifications in the pull request without unnecessary detail or noise. |




@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 6

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
crates/audio/src/speaker/macos.rs (1)

123-141: Audio frames can be dropped when view exists but data_f32_at(0) is None.

Currently, the raw-pointer fallback runs only when with_buf_list_no_copy returns None. If it returns Some(view) but data_f32_at(0) returns None while the format is still PcmF32, frames are silently skipped. Add the fallback in that branch too.

-            if let Some(view) =
-                av::AudioPcmBuf::with_buf_list_no_copy(&ctx.format, input_data, None)
-            {
-                if let Some(data) = view.data_f32_at(0) {
-                    process_audio_data(ctx, data);
-                }
-            } else if ctx.format.common_format() == av::audio::CommonFormat::PcmF32 {
+            if let Some(view) =
+                av::AudioPcmBuf::with_buf_list_no_copy(&ctx.format, input_data, None)
+            {
+                if let Some(data) = view.data_f32_at(0) {
+                    process_audio_data(ctx, data);
+                } else if ctx.format.common_format() == av::audio::CommonFormat::PcmF32 {
+                    let first_buffer = &input_data.buffers[0];
+                    let byte_count = first_buffer.data_bytes_size as usize;
+                    let float_count = byte_count / std::mem::size_of::<f32>();
+                    if float_count > 0 && first_buffer.data != std::ptr::null_mut() {
+                        let data = unsafe {
+                            std::slice::from_raw_parts(first_buffer.data as *const f32, float_count)
+                        };
+                        process_audio_data(ctx, data);
+                    }
+                }
+            } else if ctx.format.common_format() == av::audio::CommonFormat::PcmF32 {
                 let first_buffer = &input_data.buffers[0];
                 let byte_count = first_buffer.data_bytes_size as usize;
                 let float_count = byte_count / std::mem::size_of::<f32>();
 
                 if float_count > 0 && first_buffer.data != std::ptr::null_mut() {
                     let data = unsafe {
                         std::slice::from_raw_parts(first_buffer.data as *const f32, float_count)
                     };
                     process_audio_data(ctx, data);
                 }
             }
🧹 Nitpick comments (18)
plugins/listener/src/actors/recorder.rs (2)

31-43: File append semantics and sample spec: confirm intent.

  • Appending to an existing audio.wav will merge recordings across restarts of the same session_id. If you intend a fresh file per session start, prefer always creating the file (or include a timestamp in its name).
  • The WAV spec is hardcoded (1 channel, 16 kHz, 32-bit float). Ensure the processor’s mixed output matches this to avoid a header/data mismatch.

Option (if you want fresh files):

-            let writer = if path.exists() {
-                hound::WavWriter::append(path)?
-            } else {
-                hound::WavWriter::create(path, spec)?
-            };
+            let writer = hound::WavWriter::create(path, spec)?;

50-66: Be explicit about ignored variants (Mic/Spk) for clarity.

You currently no-op non-Mixed messages. A match makes that explicit and silences future confusion.

-        async move {
-            if let RecMsg::Mixed(v) = msg {
+        async move {
+            match msg {
+                RecMsg::Mixed(v) => {
                     if let Some(ref mut writer) = st.writer {
                         for s in v {
                             writer.write_sample(s)?;
                         }
                     }
-                }
+                }
+                RecMsg::Mic(_) | RecMsg::Spk(_) => { /* intentionally ignored */ }
+            }
             Ok(())
         }
plugins/listener/src/actors/mod.rs (1)

7-11: Prefer curated re-exports over glob.

Re-exporting with * can unintentionally grow your public API. Consider explicitly listing the public types you intend to surface to keep the API stable.
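
A minimal sketch of what curated re-exports could look like. The module layout is hypothetical and the bodies are placeholders: ListenBridge and AudioProcessor are names from the PR summary, while ReconnectPolicy is invented here purely to show an item that stays out of the public surface.

```rust
mod actors {
    mod listen {
        pub struct ListenBridge;
        // Hypothetical helper: `pub` within the module, but not meant for consumers.
        #[allow(dead_code)]
        pub struct ReconnectPolicy;
    }

    mod processor {
        pub struct AudioProcessor;
    }

    // Explicit list: adding another `pub` item to a submodule does not
    // silently widen what `actors` exposes.
    pub use self::listen::ListenBridge;
    pub use self::processor::AudioProcessor;
}
```

With a glob (`pub use listen::*;`), ReconnectPolicy would become reachable as actors::ReconnectPolicy; with the explicit list it is not.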

plugins/listener/src/fsm.rs (1)

1-5: Derive serde with snake_case instead of manual Serialize impl.

Removes boilerplate and keeps variant-to-string mapping declarative.

-#[derive(Debug, Clone)]
-pub enum State {
+#[derive(Debug, Clone, serde::Serialize)]
+#[serde(rename_all = "snake_case")]
+pub enum State {
     RunningActive,
     Inactive,
 }
-
-impl serde::Serialize for State {
-    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
-    where
-        S: serde::Serializer,
-    {
-        match self {
-            State::Inactive => serializer.serialize_str("inactive"),
-            State::RunningActive => serializer.serialize_str("running_active"),
-        }
-    }
-}

Also applies to: 7-17

plugins/listener/src/ext.rs (8)

48-60: Don’t hold SharedState lock across actor calls.

Copy the handle, drop the lock, then call the actor to reduce contention and avoid potential lock inversions.

     async fn get_current_microphone_device(&self) -> Result<Option<String>, crate::Error> {
         let state = self.state::<crate::SharedState>();
-        let guard = state.lock().await;
-
-        if let Some(supervisor) = &guard.supervisor {
-            match call_t!(supervisor, SessionMsg::GetMicDeviceName, 100) {
+        let guard = state.lock().await;
+        let supervisor = guard.supervisor.clone();
+        drop(guard);
+
+        if let Some(supervisor) = supervisor {
+            match call_t!(supervisor, SessionMsg::GetMicDeviceName, 100) {
                 Ok(device_name) => Ok(device_name),
                 Err(_) => Ok(None),
             }
         } else {
             Ok(None)
         }
     }
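
An alternative to the explicit drop(guard) is to clone the handle in a single statement so the guard never gets a name. The sketch below only illustrates the scoping rule: it assumes a std Mutex and an mpsc Sender as stand-ins for the plugin's async mutex and ActorRef<SessionMsg>, and the State struct is hypothetical.

```rust
use std::sync::mpsc::Sender;
use std::sync::Mutex;

// Hypothetical stand-ins: Sender<SessionMsg> plays the role of ActorRef<SessionMsg>.
#[derive(Debug, PartialEq)]
enum SessionMsg {
    Stop,
}

struct State {
    supervisor: Option<Sender<SessionMsg>>,
}

fn stop_session(state: &Mutex<State>) -> bool {
    // The guard is a temporary in this statement, so the lock is released
    // as soon as the clone has been taken.
    let supervisor = state.lock().unwrap().supervisor.clone();

    // The message is sent without the lock being held.
    match supervisor {
        Some(handle) => handle.send(SessionMsg::Stop).is_ok(),
        None => false,
    }
}
```

The same shape applies to each of the ext.rs methods listed here; whether it reads better than drop(guard) is a matter of taste.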

67-74: Same lock-scope issue when changing mic device.

         let state = self.state::<crate::SharedState>();
-        let guard = state.lock().await;
-
-        if let Some(supervisor) = &guard.supervisor {
+        let guard = state.lock().await;
+        let supervisor = guard.supervisor.clone();
+        drop(guard);
+
+        if let Some(supervisor) = supervisor {
             let _ = supervisor.cast(SessionMsg::ChangeMicDevice(Some(device_name.into())));
         }

196-207: Avoid holding the lock while calling the actor (mic mute).

         let state = self.state::<crate::SharedState>();
-        let guard = state.lock().await;
-
-        if let Some(supervisor) = &guard.supervisor {
-            match call_t!(supervisor, SessionMsg::GetMicMute, 100) {
+        let guard = state.lock().await;
+        let supervisor = guard.supervisor.clone();
+        drop(guard);
+
+        if let Some(supervisor) = supervisor {
+            match call_t!(supervisor, SessionMsg::GetMicMute, 100) {
                 Ok(muted) => muted,
                 Err(_) => false,
             }
         } else {
             false
         }

210-222: Same for speaker mute.

         let state = self.state::<crate::SharedState>();
-        let guard = state.lock().await;
-
-        if let Some(supervisor) = &guard.supervisor {
-            match call_t!(supervisor, SessionMsg::GetSpeakerMute, 100) {
+        let guard = state.lock().await;
+        let supervisor = guard.supervisor.clone();
+        drop(guard);
+
+        if let Some(supervisor) = supervisor {
+            match call_t!(supervisor, SessionMsg::GetSpeakerMute, 100) {
                 Ok(muted) => muted,
                 Err(_) => false,
             }
         } else {
             false
         }

226-232: Same for set_mic_muted.

         let state = self.state::<crate::SharedState>();
-        let guard = state.lock().await;
-
-        if let Some(supervisor) = &guard.supervisor {
+        let guard = state.lock().await;
+        let supervisor = guard.supervisor.clone();
+        drop(guard);
+
+        if let Some(supervisor) = supervisor {
             let _ = supervisor.cast(SessionMsg::SetMicMute(muted));
         }

235-242: Same for set_speaker_muted.

         let state = self.state::<crate::SharedState>();
-        let guard = state.lock().await;
-
-        if let Some(supervisor) = &guard.supervisor {
+        let guard = state.lock().await;
+        let supervisor = guard.supervisor.clone();
+        drop(guard);
+
+        if let Some(supervisor) = supervisor {
             let _ = supervisor.cast(SessionMsg::SetSpeakerMute(muted));
         }

245-254: Same for start_session.

         let state = self.state::<crate::SharedState>();
-        let guard = state.lock().await;
-
-        if let Some(supervisor) = &guard.supervisor {
+        let guard = state.lock().await;
+        let supervisor = guard.supervisor.clone();
+        drop(guard);
+
+        if let Some(supervisor) = supervisor {
             let _ = supervisor.cast(SessionMsg::Start {
                 session_id: session_id.into(),
             });
         }

257-264: Same for stop_session.

         let state = self.state::<crate::SharedState>();
-        let guard = state.lock().await;
-
-        if let Some(supervisor) = &guard.supervisor {
+        let guard = state.lock().await;
+        let supervisor = guard.supervisor.clone();
+        drop(guard);
+
+        if let Some(supervisor) = supervisor {
             let _ = supervisor.cast(SessionMsg::Stop);
         }
plugins/listener/src/actors/session.rs (1)

4-4: Remove unused import

use tauri::Manager; isn’t referenced in this file.

Apply:

- use tauri::Manager;
plugins/listener/src/actors/listen.rs (2)

92-105: Trim clones when building word maps.

Minor: avoid extra clones when mapping words.

Apply:

@@
-                            let partial_words_by_channel: HashMap<usize, Vec<Word2>> = diff
+                            let partial_words_by_channel: HashMap<usize, Vec<Word2>> = diff
                                 .partial_words
                                 .iter()
                                 .map(|(channel_idx, words)| {
                                     (
                                         *channel_idx,
-                                        words
-                                            .iter()
-                                            .map(|w| Word2::from(w.clone()))
-                                            .collect::<Vec<_>>(),
+                                        words.iter().cloned().map(Word2::from).collect::<Vec<_>>(),
                                     )
                                 })
                                 .collect();
@@
-                            let final_words_by_channel: HashMap<usize, Vec<Word2>> = diff
+                            let final_words_by_channel: HashMap<usize, Vec<Word2>> = diff
                                 .final_words
                                 .iter()
                                 .map(|(channel_idx, words)| {
                                     (
                                         *channel_idx,
-                                        words
-                                            .iter()
-                                            .map(|w| Word2::from(w.clone()))
-                                            .collect::<Vec<_>>(),
+                                        words.iter().cloned().map(Word2::from).collect::<Vec<_>>(),
                                     )
                                 })
                                 .collect();

Also applies to: 112-124


189-205: Avoid cloning the entire session during upsert.

Clone just words to return; move session into upsert.

Apply:

@@
-    let mut session = app
+    let mut session = app
         .db_get_session(session_id)
         .await?
         .ok_or(crate::Error::NoneSession)?;
 
-    session.words.extend(words);
-    app.db_upsert_session(session.clone()).await.unwrap();
-
-    Ok(session.words)
+    session.words.extend(words);
+    let out = session.words.clone();
+    app.db_upsert_session(session).await.unwrap();
+    Ok(out)
plugins/listener/src/actors/processor.rs (3)

24-28: Converge on a single recorder handle field.

ProcArgs has both mixed_to and rec_to but state uses only one via or(). Simplify API by keeping one.

Apply:

@@
-pub struct ProcArgs {
+pub struct ProcArgs {
     pub app: tauri::AppHandle,
-    pub mixed_to: Option<ActorRef<RecMsg>>,
-    pub rec_to: Option<ActorRef<RecMsg>>,
+    pub rec_to: Option<ActorRef<RecMsg>>,
@@
-                recorder: args.mixed_to.or(args.rec_to),
+                recorder: args.rec_to,
@@
-                ProcMsg::AttachRecorder(r) => st.recorder = Some(r),
+                ProcMsg::AttachRecorder(r) => st.recorder = Some(r),

Note: Update call sites accordingly.

Also applies to: 62-63, 78-79


146-160: Replace magic number 10 with a named constant.

Improves readability and makes tuning easier.

Apply:

@@
-    fn push_mic(&mut self, data: Vec<f32>) {
+    const JOINER_MAX: usize = 10;
+    fn push_mic(&mut self, data: Vec<f32>) {
         self.mic.push_back(data);
-        // Keep only recent chunks to avoid memory growth
-        if self.mic.len() > 10 {
+        if self.mic.len() > Self::JOINER_MAX {
             self.mic.pop_front();
         }
     }
@@
-    fn push_spk(&mut self, data: Vec<f32>) {
+    fn push_spk(&mut self, data: Vec<f32>) {
         self.spk.push_back(data);
-        // Keep only recent chunks to avoid memory growth
-        if self.spk.len() > 10 {
+        if self.spk.len() > Self::JOINER_MAX {
             self.spk.pop_front();
         }
     }

100-110: Optional: avoid alloc in mix by reusing buffer.

If this is a hot path, reuse a pre-allocated Vec sized to chunk len.
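
One way the buffer reuse could look, as a sketch only. The Mixer type is hypothetical, and the mixing rule (sample-wise sum clamped to [-1.0, 1.0]) is an assumption, since the PR's actual mix function is not shown in this review.

```rust
// Hypothetical mixer that owns a scratch buffer reused across calls.
struct Mixer {
    scratch: Vec<f32>,
}

impl Mixer {
    fn with_capacity(chunk_len: usize) -> Self {
        Self { scratch: Vec::with_capacity(chunk_len) }
    }

    // Assumed mixing rule: sample-wise sum, clamped to [-1.0, 1.0].
    fn mix(&mut self, mic: &[f32], spk: &[f32]) -> &[f32] {
        // clear() drops the old samples but keeps the allocation.
        self.scratch.clear();
        self.scratch
            .extend(mic.iter().zip(spk).map(|(m, s)| (m + s).clamp(-1.0, 1.0)));
        &self.scratch
    }
}
```

This only pays off if the consumer can work from a borrowed slice. Sending an owned Vec<f32> to the recorder, as RecMsg does in the diagram, still needs its own allocation per message.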

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 86a7cda and 68777c7.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (14)
  • crates/aec/Cargo.toml (1 hunks)
  • crates/audio/src/speaker/macos.rs (6 hunks)
  • plugins/listener/Cargo.toml (1 hunks)
  • plugins/listener/src/actors/listen.rs (1 hunks)
  • plugins/listener/src/actors/mod.rs (1 hunks)
  • plugins/listener/src/actors/processor.rs (1 hunks)
  • plugins/listener/src/actors/recorder.rs (1 hunks)
  • plugins/listener/src/actors/session.rs (1 hunks)
  • plugins/listener/src/actors/source.rs (1 hunks)
  • plugins/listener/src/commands.rs (1 hunks)
  • plugins/listener/src/ext.rs (4 hunks)
  • plugins/listener/src/fsm.rs (2 hunks)
  • plugins/listener/src/lib.rs (3 hunks)
  • plugins/notification/src/quit.rs (1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.{js,ts,tsx,rs}

⚙️ CodeRabbit configuration file

**/*.{js,ts,tsx,rs}: 1. Do not add any error handling. Keep the existing one.
2. No unused imports, variables, or functions.
3. For comments, keep it minimal. It should be about "Why", not "What".

Files:

  • plugins/listener/src/commands.rs
  • plugins/listener/src/actors/mod.rs
  • plugins/listener/src/actors/listen.rs
  • plugins/listener/src/actors/processor.rs
  • plugins/notification/src/quit.rs
  • plugins/listener/src/actors/recorder.rs
  • plugins/listener/src/lib.rs
  • plugins/listener/src/actors/source.rs
  • crates/audio/src/speaker/macos.rs
  • plugins/listener/src/ext.rs
  • plugins/listener/src/fsm.rs
  • plugins/listener/src/actors/session.rs
🧬 Code graph analysis (11)
plugins/listener/src/commands.rs (2)
plugins/listener/src/actors/session.rs (1)
  • stop_session (343-396)
plugins/listener/src/ext.rs (2)
  • stop_session (37-37)
  • stop_session (257-264)
plugins/listener/src/actors/mod.rs (1)
crates/audio/src/resampler.rs (1)
  • source (90-90)
plugins/listener/src/actors/listen.rs (3)
plugins/listener/src/actors/processor.rs (3)
  • pre_start (49-68)
  • new (139-144)
  • handle (70-95)
plugins/listener/src/actors/session.rs (2)
  • pre_start (61-84)
  • handle (86-171)
plugins/listener/src/manager.rs (1)
  • with_unix_timestamp (21-27)
plugins/listener/src/actors/processor.rs (5)
plugins/listener/src/actors/session.rs (2)
  • pre_start (61-84)
  • handle (86-171)
plugins/listener/src/actors/source.rs (2)
  • pre_start (51-95)
  • handle (97-133)
plugins/listener/src/actors/recorder.rs (2)
  • pre_start (24-48)
  • handle (50-66)
plugins/listener/src/actors/listen.rs (2)
  • pre_start (38-161)
  • handle (163-177)
crates/audio-utils/src/lib.rs (1)
  • f32_to_i16_bytes (51-61)
plugins/notification/src/quit.rs (2)
plugins/listener/src/ext.rs (9)
  • state (49-49)
  • state (67-67)
  • state (189-189)
  • state (196-196)
  • state (211-211)
  • state (226-226)
  • state (236-236)
  • state (246-246)
  • state (258-258)
plugins/notification/src/ext.rs (5)
  • state (149-149)
  • state (159-159)
  • state (182-182)
  • state (194-194)
  • state (202-202)
plugins/listener/src/actors/recorder.rs (2)
plugins/listener/src/actors/processor.rs (2)
  • pre_start (49-68)
  • handle (70-95)
plugins/listener/src/actors/session.rs (2)
  • pre_start (61-84)
  • handle (86-171)
plugins/listener/src/lib.rs (3)
plugins/listener/src/commands.rs (1)
  • get_state (147-151)
plugins/listener/src/ext.rs (11)
  • get_state (36-36)
  • get_state (188-192)
  • state (49-49)
  • state (67-67)
  • state (189-189)
  • state (196-196)
  • state (211-211)
  • state (226-226)
  • state (236-236)
  • state (246-246)
  • state (258-258)
plugins/notification/src/quit.rs (1)
  • app_handle (12-12)
plugins/listener/src/actors/source.rs (4)
plugins/listener/src/actors/processor.rs (3)
  • pre_start (49-68)
  • new (139-144)
  • handle (70-95)
plugins/listener/src/actors/session.rs (2)
  • pre_start (61-84)
  • handle (86-171)
crates/audio/src/device_monitor.rs (1)
  • spawn (40-60)
crates/audio/src/lib.rs (4)
  • get_default_mic_device_name (76-80)
  • silence (44-59)
  • from_mic (97-106)
  • from_speaker (108-115)
crates/audio/src/speaker/macos.rs (2)
crates/audio/src/speaker/windows.rs (2)
  • new (14-16)
  • drop (167-178)
crates/audio/src/speaker/mod.rs (2)
  • new (33-36)
  • new (39-43)
plugins/listener/src/ext.rs (2)
plugins/listener/src/commands.rs (6)
  • get_mic_muted (96-98)
  • get_speaker_muted (102-106)
  • set_mic_muted (110-116)
  • set_speaker_muted (120-126)
  • start_session (130-136)
  • stop_session (140-143)
plugins/listener/src/actors/session.rs (2)
  • start_session (218-341)
  • stop_session (343-396)
plugins/listener/src/actors/session.rs (6)
plugins/listener/src/actors/processor.rs (3)
  • pre_start (49-68)
  • new (139-144)
  • handle (70-95)
plugins/listener/src/actors/source.rs (3)
  • pre_start (51-95)
  • handle (97-133)
  • post_stop (135-149)
plugins/listener/src/actors/recorder.rs (3)
  • pre_start (24-48)
  • handle (50-66)
  • post_stop (68-79)
plugins/listener/src/actors/listen.rs (3)
  • pre_start (38-161)
  • handle (163-177)
  • post_stop (179-186)
crates/audio/src/lib.rs (1)
  • get_default_mic_device_name (76-80)
plugins/listener/src/ext.rs (13)
  • state (49-49)
  • state (67-67)
  • state (189-189)
  • state (196-196)
  • state (211-211)
  • state (226-226)
  • state (236-236)
  • state (246-246)
  • state (258-258)
  • start_session (38-38)
  • start_session (245-254)
  • stop_session (37-37)
  • stop_session (257-264)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: ci (macos, macos-14)
  • GitHub Check: ci (windows, windows-latest)
🔇 Additional comments (27)
crates/aec/Cargo.toml (1)

7-7: Default AEC feature switched to "128": verify downstream assumptions and perf.

Confirm all AEC consumers expect the new default size and that latency/quality regressions are acceptable. Re-run benches with this default to validate.

plugins/listener/src/commands.rs (1)

135-136: start_session/stop_session now always return Ok(()): confirm caller expectations.

If UI or tests relied on StartSessionFailed/StopSessionFailed errors, they’ll silently succeed now. Ensure the frontend listens for state events (e.g., Inactive/RunningActive) instead of command results.

Also applies to: 141-143

plugins/notification/src/quit.rs (1)

14-18: Blocking await requires a Tokio runtime context.

tokio::task::block_in_place + Handle::current().block_on() will panic if this handler runs off-runtime. Verify this closure is always invoked on a Tokio-enabled thread in production (Tauri’s thread model can vary). Also ensure guard.get_state() doesn’t attempt to re-acquire the same lock internally to avoid deadlocks.

plugins/listener/Cargo.toml (1)

63-64: New actor deps: check compatibility and usage.

  • ractor 0.15 / ractor_actors 0.4: confirm compatibility with workspace tokio and MSRV.
  • tokio-util added: verify it’s actually used to avoid unused-dep bloat.

Also applies to: 68-69

plugins/listener/src/actors/recorder.rs (1)

68-79: Clean shutdown finalizes WAV writer.

Finalizing in post_stop is correct to seal the file header.

crates/audio/src/speaker/macos.rs (3)

157-159: Ring buffer size increased (131,072 samples): validate latency budget.

Bigger buffers reduce drops but can increase end-to-end latency, which may affect AEC quality. Confirm this size keeps the latency within your canceler’s tolerance.
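
To put a number on the latency budget, the worst case is a completely full buffer. The helper below is a back-of-the-envelope sketch: it assumes mono samples and takes the sample rate as a parameter, because the actual device rate is not stated in this review.

```rust
// 1024 × 128, per the PR summary.
const RING_CAPACITY_SAMPLES: usize = 1024 * 128;

// Upper bound on how long a sample can wait in a full buffer, in milliseconds.
// Assumes mono samples; interleaved stereo would halve the figure.
fn max_buffered_ms(capacity_samples: usize, sample_rate_hz: u32) -> f64 {
    capacity_samples as f64 * 1000.0 / sample_rate_hz as f64
}
```

At an assumed 48 kHz this gives roughly 2.7 seconds, and at 16 kHz about 8.2 seconds. These are ceilings that only apply if the consumer stalls; with a reader that keeps up, the steady-state delay is far smaller.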


36-38: Memory ordering on sample rate update/load looks correct.

Release in the callback and Acquire in readers is appropriate for cross-thread visibility.

Also applies to: 116-121
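
For readers less familiar with the pairing, here is a small sketch of Release/Acquire publication. The Shared struct and its field names are illustrative and are not the ones in macos.rs; the sketch also uses a separate flag to make the happens-before edge visible, which the real code may not need.

```rust
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use std::thread;

// Hypothetical shared context between an audio callback and a reader.
struct Shared {
    sample_rate: AtomicU32,
    updated: AtomicBool,
}

// The writer stores the rate and then sets the flag with Release. A reader
// that observes the flag with Acquire is guaranteed to see the stored rate.
fn publish_and_read(rate: u32) -> u32 {
    let shared = Arc::new(Shared {
        sample_rate: AtomicU32::new(0),
        updated: AtomicBool::new(false),
    });

    let writer = {
        let shared = Arc::clone(&shared);
        thread::spawn(move || {
            shared.sample_rate.store(rate, Ordering::Relaxed);
            shared.updated.store(true, Ordering::Release);
        })
    };

    while !shared.updated.load(Ordering::Acquire) {
        std::hint::spin_loop();
    }
    let seen = shared.sample_rate.load(Ordering::Relaxed);
    writer.join().unwrap();
    seen
}
```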


233-238: Termination path and Drop are sound.

Acquire check in poll_next with a last-drain and Release set in producer/drop ensures clean shutdown without holding locks.

Also applies to: 250-253

plugins/listener/src/actors/mod.rs (1)

1-5: Module surfacing looks good.

The module layout and visibility are clear.

plugins/listener/src/fsm.rs (1)

19-26: Specta type mapping is appropriate.

Representing State as a string for codegen is consistent with the serde mapping.

plugins/listener/src/ext.rs (4)

4-4: Import of call_t is appropriate.

Matches usage below.


12-13: SessionMsg import is correct.

Keeps actor messages local to the extension surface.


188-192: State retrieval path looks good.

Delegating to guard.get_state().await aligns with the new supervisor flow.


48-60: Ignore type mismatch concern. SessionMsg::GetMicDeviceName carries an RpcReplyPort<Option<String>>, so call_t!(…, GetMicDeviceName, …) returns Option<String> and Ok(device_name) correctly matches Result<Option<String>, _>. No changes needed.

Likely an incorrect or invalid review comment.

plugins/listener/src/actors/source.rs (4)

76-81: Silent output stream lifetime handled

Holding the silence() sender and dropping it in post_stop should unblock rx.recv() and stop the sink thread. LGTM.

Also applies to: 145-149


122-128: Hot mic-device change correctly restarts capture

Updating device, aborting the prior task, and restarting the source loop is the right approach for seamless hot-switching. LGTM.


181-195: Cancellation path and muted mixing look good

Stopping the actor on token cancellation and zero-filling when muted are straightforward and race-free (Relaxed is fine for this flag). LGTM.


163-176: Handle potential panic on mic initialization

Using AudioInput::from_mic(device.clone()).unwrap() can panic if device setup fails, terminating the spawned task silently. Replace the unwrap() with proper error handling (e.g. match, retry loop, or logging and backoff), or confirm that from_mic is infallible in your environment.
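
If a retry loop is the chosen route, it could be shaped like the generic helper below. This is a sketch under assumptions: the open closure stands in for a fallible device constructor, and the error type of AudioInput::from_mic is not shown in this review. Note that the repository's path-based instructions ask not to add error handling, so this may not be wanted here at all.

```rust
use std::thread;
use std::time::Duration;

// Generic retry helper with exponential backoff.
// `open` is a hypothetical stand-in for a fallible device constructor.
fn retry_with_backoff<T, E>(
    max_attempts: u32,
    initial_delay: Duration,
    mut open: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut delay = initial_delay;
    let mut attempt = 1;
    loop {
        match open() {
            Ok(value) => return Ok(value),
            // Out of attempts: surface the last error to the caller.
            Err(err) if attempt >= max_attempts => return Err(err),
            Err(_) => {
                thread::sleep(delay);
                delay *= 2;
                attempt += 1;
            }
        }
    }
}
```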

plugins/listener/src/lib.rs (3)

17-18: Actor-based wiring is clean

Importing and using SessionSupervisor with typed SessionMsg keeps the surface tight. LGTM.


28-36: State query timeout: validate 100ms SLA

call_t!(..., 100) may be tight under load and could devolve to frequent “Inactive” fallbacks. Confirm this meets UX/telemetry expectations.

You can test latencies in situ and adjust a single constant if needed.


73-79: Supervisor bootstrap and join-handle monitoring look solid

Managing shared state under a mutex and logging child termination outcomes is appropriate. LGTM.

Also applies to: 95-101

plugins/listener/src/actors/session.rs (5)

61-84: pre_start defaults are sane

Grabbing default mic and initializing a fresh token is appropriate for an idle supervisor. LGTM.


112-124: Mute control emits are properly decoupled

Forwarding to sources and emitting app events keeps UI state consistent. LGTM.


132-154: Synchronous getters with 100ms call_t timeout: validate SLA

The 100ms timeout mirrors lib.rs. Ensure it’s adequate, or centralize the timeout in a shared const to avoid drift.
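
Centralizing the deadline could be as small as one constant plus one helper. The sketch below uses std's recv_timeout as a stand-in for call_t!; the State enum mirrors the variants of fsm::State, while STATE_QUERY_TIMEOUT and state_or_inactive are hypothetical names.

```rust
use std::sync::mpsc::Receiver;
use std::time::Duration;

// Single place to tune the deadline; 100 ms matches the value used in the PR.
const STATE_QUERY_TIMEOUT: Duration = Duration::from_millis(100);

#[derive(Debug, Clone, PartialEq)]
enum State {
    RunningActive,
    Inactive,
}

// Stand-in for a call_t!-style query: waits for a reply and falls back to
// Inactive on timeout or when the replying side has gone away.
fn state_or_inactive(reply: &Receiver<State>) -> State {
    reply
        .recv_timeout(STATE_QUERY_TIMEOUT)
        .unwrap_or(State::Inactive)
}
```

Both lib.rs and session.rs could then refer to the same constant, so the two call sites cannot drift apart.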


256-297: Linked actor topology and attachments look correct

Processor, mic/spk sources, optional recorder, and listen bridge are linked to the supervisor, with attachments sequenced properly. LGTM.

Also applies to: 314-329, 335-341


343-367: Orderly shutdown and UI restoration

Cancel-first, then stop children, persist session end, and restore UI state is clean and idempotent. LGTM.

Also applies to: 387-395

plugins/listener/src/actors/listen.rs (1)

43-45: Confirm buffer/backpressure choice (try_send + capacity 32).

With try_send and a small 32-slot channel, audio frames will be dropped under load. If that’s intended, ignore; if not, revisit capacity/flow.
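
The drop behaviour is easy to reproduce in isolation. The sketch below uses std's sync_channel as a stand-in for whatever bounded channel listen.rs uses, and deliberately never drains the receiver to simulate a stalled consumer; the 128-sample chunk size is arbitrary.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

// Pushes `frames` chunks into a bounded channel that nobody is draining
// and counts how many are rejected because the channel is full.
fn count_dropped(capacity: usize, frames: usize) -> usize {
    // `_rx` is kept alive so failures are Full rather than Disconnected.
    let (tx, _rx) = sync_channel::<Vec<f32>>(capacity);
    let mut dropped = 0;
    for _ in 0..frames {
        if let Err(TrySendError::Full(_)) = tx.try_send(vec![0.0; 128]) {
            dropped += 1;
        }
    }
    dropped
}
```

With a capacity of 32, the 33rd and later frames are discarded until the consumer catches up. If dropping is the intended policy for real-time audio, counting the rejected frames as done here is one cheap way to make it observable.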


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (4)
plugins/listener/src/actors/source.rs (1)

56-73: Device monitor thread lifecycle: rely on Drop joining the spawned monitor thread

This setup assumes DeviceMonitorHandle joins its internal thread on drop; otherwise a detached thread leaks per mic source lifecycle. There’s a prior review asking to join in Drop for DeviceMonitorHandle. Please confirm that change landed.
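
For reference, a join-on-drop handle can look like the sketch below. `MonitorHandle` is a hypothetical stand-in, not the real `DeviceMonitorHandle`; the `finished` flag exists only so the behavior can be observed from outside.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical stand-in for a monitor handle that owns a worker thread.
pub struct MonitorHandle {
    stop: Arc<AtomicBool>,
    finished: Arc<AtomicBool>,
    thread: Option<JoinHandle<()>>,
}

impl MonitorHandle {
    pub fn spawn() -> Self {
        let stop = Arc::new(AtomicBool::new(false));
        let finished = Arc::new(AtomicBool::new(false));
        let (stop_t, finished_t) = (stop.clone(), finished.clone());
        let thread = thread::spawn(move || {
            // Poll the stop flag; a real monitor would wait on device events.
            while !stop_t.load(Ordering::Acquire) {
                thread::sleep(Duration::from_millis(5));
            }
            finished_t.store(true, Ordering::Release);
        });
        Self { stop, finished, thread: Some(thread) }
    }

    /// Lets a caller check, after drop, that the thread really exited.
    pub fn finished_flag(&self) -> Arc<AtomicBool> {
        self.finished.clone()
    }
}

impl Drop for MonitorHandle {
    fn drop(&mut self) {
        self.stop.store(true, Ordering::Release);
        if let Some(t) = self.thread.take() {
            // Joining here prevents one detached thread per source lifecycle.
            let _ = t.join();
        }
    }
}
```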

plugins/listener/src/actors/session.rs (2)

223-234: Potential panic on missing user/config paths (prior note)

state.app.db_user_id().await?.unwrap() will panic if no user is present. Prior review already flagged this class of unwrap. Please confirm invariants ensure Some(user_id) here.


291-305: Potential panic on app_data_dir().unwrap() (prior note)

If the app data dir can be absent, this will panic; otherwise fine. Please confirm the invariant before shipping.

plugins/listener/src/actors/processor.rs (1)

113-118: Drop unnecessary into() on Bytes.

They’re already Bytes; shave a tiny conversion.

-            list.cast(ListenMsg::Audio(mic_bytes.into(), spk_bytes.into()))
+            list.cast(ListenMsg::Audio(mic_bytes, spk_bytes))
                 .ok();
🧹 Nitpick comments (10)
plugins/listener/src/actors/recorder.rs (2)

24-46: Append vs. create: ensure spec stays consistent across runs

If an older audio.wav was created with a different spec, WavWriter::append may yield a malformed file. Please confirm all sessions always use 1ch/16kHz/32f. If not guaranteed, consider encoding the spec in the filename or guarding at the creator.
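
A guard at the creator could be as simple as comparing specs before choosing append. The sketch below uses a hypothetical `Spec` struct and `open_mode` function rather than the WAV library's own types, and assumes the session spec is 1 channel, 16 kHz, 32-bit float as stated above.

```rust
/// Hypothetical mirror of the fields a WAV spec carries.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Spec {
    pub channels: u16,
    pub sample_rate: u32,
    pub bits_per_sample: u16,
    pub float: bool,
}

/// Assumed session format: 1ch / 16 kHz / 32-bit float.
pub const SESSION_SPEC: Spec = Spec {
    channels: 1,
    sample_rate: 16_000,
    bits_per_sample: 32,
    float: true,
};

#[derive(Debug, PartialEq, Eq)]
pub enum OpenMode {
    Create,
    Append,
}

/// Appends only when the existing file's spec matches exactly;
/// a mismatch is returned as an error carrying the offending spec.
pub fn open_mode(existing: Option<Spec>) -> Result<OpenMode, Spec> {
    match existing {
        None => Ok(OpenMode::Create),
        Some(s) if s == SESSION_SPEC => Ok(OpenMode::Append),
        Some(s) => Err(s),
    }
}
```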


55-61: Per-sample write loop is OK; zero-copy message passing is preserved

Vec<f32> is moved into the actor, so no extra copies; the per-sample write_sample loop is acceptable at 16 kHz. If chunks grow, we can micro-opt by iterating by reference (for &s in &v).

plugins/listener/src/actors/source.rs (3)

119-125: Avoid device-open races when switching mic device

Aborting the run task and immediately starting a new one can contend for the audio device. Await the aborted task to finish before restarting.

             (SrcCtrl::SetDevice(dev), SrcWhich::Mic { device }) => {
                 *device = dev;
                 if let Some(t) = st.run_task.take() {
-                    t.abort();
+                    t.abort();
+                    // Wait for prior stream teardown to finish before reopening device
+                    let _ = t.await;
                 }
                 start_source_loop(&myself, st).await?;
             }

176-184: Mute path: avoid per-chunk allocation

Zeroing in-place avoids allocating a new Vec each time.

-                        if let Some(data) = next {
-                            let output_data = if muted.load(Ordering::Relaxed) {
-                                vec![0.0; data.len()]
-                            } else {
-                                data
-                            };
+                        if let Some(mut data) = next {
+                            if muted.load(Ordering::Relaxed) {
+                                data.fill(0.0);
+                            }
+                            let output_data = data;
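
Pulled out of the loop, the in-place variant looks like this. `apply_mute` is a hypothetical helper name used only for illustration; the point is that the muted path reuses the chunk's existing allocation.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Zeroes the chunk in place when muted, so no new Vec is allocated.
pub fn apply_mute(mut data: Vec<f32>, muted: &AtomicBool) -> Vec<f32> {
    if muted.load(Ordering::Relaxed) {
        data.fill(0.0);
    }
    data
}
```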

196-197: Magic constant: name the backoff

Give the restart backoff a name for clarity and easy tuning.

-            tokio::time::sleep(Duration::from_millis(200)).await;
+            const RESTART_BACKOFF_MS: u64 = 200;
+            tokio::time::sleep(Duration::from_millis(RESTART_BACKOFF_MS)).await;
plugins/listener/src/actors/session.rs (2)

122-132: 100 ms RPC timeout: verify it’s sufficient under load

call_t!(mic, SrcCtrl::GetDevice, 100) with 100 ms might be tight on busy systems. Please confirm this SLO; otherwise consider a slightly higher bound to reduce spurious timeouts.


334-355: Teardown ordering is sensible

Cancel token first, then stop children is the right order. If you observe device contention on fast restarts, consider awaiting child terminations, but current non-blocking stops are fine for UX.

plugins/listener/src/actors/listen.rs (3)

92-105: Simplify the Word2 mapping.

Use cloned() + map(Word2::from) in place of the closure. Each element is still cloned once, so this is a readability change rather than a saving.

-                                        words
-                                            .iter()
-                                            .map(|w| Word2::from(w.clone()))
-                                            .collect::<Vec<_>>(),
+                                        words
+                                            .iter()
+                                            .cloned()
+                                            .map(Word2::from)
+                                            .collect::<Vec<_>>(),

Also applies to: 112-125
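
A quick side-by-side of the two forms, using hypothetical stand-in types (`Word` and a local `Word2` with a single `text` field; the real structs have more). Both versions clone every element exactly once and produce the same output.

```rust
/// Stand-in source type; the real word struct is richer.
#[derive(Debug, Clone, PartialEq)]
pub struct Word {
    pub text: String,
}

/// Stand-in target type, named after the one in the diff.
#[derive(Debug, PartialEq)]
pub struct Word2 {
    pub text: String,
}

impl From<Word> for Word2 {
    fn from(w: Word) -> Self {
        Word2 { text: w.text }
    }
}

/// Closure form: clone inside the map.
pub fn with_closure(words: &[Word]) -> Vec<Word2> {
    words.iter().map(|w| Word2::from(w.clone())).collect()
}

/// Adapter form: clone via `cloned()`, then convert.
pub fn with_cloned(words: &[Word]) -> Vec<Word2> {
    words.iter().cloned().map(Word2::from).collect()
}
```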


126-137: Avoid cloning the entire map to collect finals.

Flatten values in-place; no need to clone the HashMap.

-                            update_session(
-                                &app,
-                                &session_id,
-                                final_words_by_channel
-                                    .clone()
-                                    .values()
-                                    .flatten()
-                                    .cloned()
-                                    .collect(),
-                            )
+                            update_session(
+                                &app,
+                                &session_id,
+                                final_words_by_channel
+                                    .values()
+                                    .flat_map(|v| v.iter().cloned())
+                                    .collect(),
+                            )
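
In isolation the pattern looks like the sketch below, with `String` standing in for `Word2` and a hypothetical `collect_finals` helper. Only the elements are cloned; the map itself is borrowed. Note that `HashMap` iteration order is unspecified, so the order across channels is not guaranteed in either version.

```rust
use std::collections::HashMap;

/// Collects every value from every channel by borrowing the map and
/// cloning only the individual elements.
pub fn collect_finals(by_channel: &HashMap<usize, Vec<String>>) -> Vec<String> {
    by_channel
        .values()
        .flat_map(|v| v.iter().cloned())
        .collect()
}
```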

187-203: Tighten update_session signature to make String conversions explicit.

Naming the generic and calling into() at the point of use makes the String conversion explicit; call sites behave the same as with impl Into<String>.

-async fn update_session<R: tauri::Runtime>(
-    app: &tauri::AppHandle<R>,
-    session_id: impl Into<String>,
-    words: Vec<Word2>,
-) -> Result<Vec<Word2>, crate::Error> {
+async fn update_session<R: tauri::Runtime, S: Into<String>>(
+    app: &tauri::AppHandle<R>,
+    session_id: S,
+    words: Vec<Word2>,
+) -> Result<Vec<Word2>, crate::Error> {
@@
-    let mut session = app
-        .db_get_session(session_id)
+    let mut session = app
+        .db_get_session(session_id.into())
         .await?
         .ok_or(crate::Error::NoneSession)?;
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 68777c7 and c72ea6b.

📒 Files selected for processing (5)
  • plugins/listener/src/actors/listen.rs (1 hunks)
  • plugins/listener/src/actors/processor.rs (1 hunks)
  • plugins/listener/src/actors/recorder.rs (1 hunks)
  • plugins/listener/src/actors/session.rs (1 hunks)
  • plugins/listener/src/actors/source.rs (1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.{js,ts,tsx,rs}

⚙️ CodeRabbit configuration file

**/*.{js,ts,tsx,rs}: 1. Do not add any error handling. Keep the existing one.
2. No unused imports, variables, or functions.
3. For comments, keep it minimal. It should be about "Why", not "What".

Files:

  • plugins/listener/src/actors/listen.rs
  • plugins/listener/src/actors/recorder.rs
  • plugins/listener/src/actors/source.rs
  • plugins/listener/src/actors/session.rs
  • plugins/listener/src/actors/processor.rs
🧬 Code graph analysis (5)
plugins/listener/src/actors/listen.rs (3)
plugins/listener/src/actors/processor.rs (3)
  • pre_start (50-67)
  • new (139-144)
  • handle (69-94)
plugins/listener/src/actors/session.rs (2)
  • pre_start (60-80)
  • handle (82-172)
plugins/listener/src/manager.rs (1)
  • with_unix_timestamp (21-27)
plugins/listener/src/actors/recorder.rs (2)
plugins/listener/src/actors/processor.rs (2)
  • pre_start (50-67)
  • handle (69-94)
plugins/listener/src/actors/session.rs (2)
  • pre_start (60-80)
  • handle (82-172)
plugins/listener/src/actors/source.rs (4)
plugins/listener/src/actors/processor.rs (3)
  • pre_start (50-67)
  • new (139-144)
  • handle (69-94)
plugins/listener/src/actors/session.rs (2)
  • pre_start (60-80)
  • handle (82-172)
crates/audio/src/device_monitor.rs (1)
  • spawn (40-60)
crates/audio/src/lib.rs (4)
  • get_default_mic_device_name (76-80)
  • silence (44-59)
  • from_mic (97-106)
  • from_speaker (108-115)
plugins/listener/src/actors/session.rs (6)
plugins/listener/src/actors/processor.rs (3)
  • pre_start (50-67)
  • new (139-144)
  • handle (69-94)
plugins/listener/src/actors/recorder.rs (3)
  • pre_start (24-46)
  • handle (48-65)
  • post_stop (67-77)
plugins/listener/src/actors/source.rs (3)
  • pre_start (51-93)
  • handle (95-130)
  • post_stop (132-144)
plugins/listener/src/actors/listen.rs (3)
  • pre_start (38-161)
  • handle (163-175)
  • post_stop (177-184)
plugins/listener/src/ext.rs (13)
  • state (49-49)
  • state (67-67)
  • state (189-189)
  • state (196-196)
  • state (211-211)
  • state (226-226)
  • state (236-236)
  • state (246-246)
  • state (258-258)
  • start_session (38-38)
  • start_session (245-254)
  • stop_session (37-37)
  • stop_session (257-264)
plugins/listener/src/commands.rs (2)
  • start_session (130-136)
  • stop_session (140-143)
plugins/listener/src/actors/processor.rs (5)
plugins/listener/src/actors/recorder.rs (2)
  • pre_start (24-46)
  • handle (48-65)
plugins/listener/src/actors/source.rs (2)
  • pre_start (51-93)
  • handle (95-130)
plugins/listener/src/actors/session.rs (2)
  • pre_start (60-80)
  • handle (82-172)
plugins/listener/src/actors/listen.rs (2)
  • pre_start (38-161)
  • handle (163-175)
crates/audio-utils/src/lib.rs (1)
  • f32_to_i16_bytes (51-61)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: ci (macos, macos-14)
  • GitHub Check: ci (windows, windows-latest)
🔇 Additional comments (10)
plugins/listener/src/actors/recorder.rs (1)

72-74: Graceful finalization on stop looks good

Taking the writer and calling finalize() in post_stop correctly patches the WAV header.
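
The take-then-finalize shape, sketched with hypothetical stand-ins (`Writer`, `RecorderState`, `stop`) rather than the real WAV writer. Because the writer is moved out of the `Option`, a second stop is a no-op.

```rust
/// Stand-in for a writer whose `finalize` consumes it.
pub struct Writer {
    pub samples: Vec<f32>,
}

impl Writer {
    /// Pretend finalization: returns the number of samples written.
    pub fn finalize(self) -> usize {
        self.samples.len()
    }
}

/// Stand-in for the recorder actor's state.
pub struct RecorderState {
    pub writer: Option<Writer>,
}

impl RecorderState {
    /// Moves the writer out and finalizes it; returns None if it was
    /// already taken, so calling this twice is harmless.
    pub fn stop(&mut self) -> Option<usize> {
        self.writer.take().map(Writer::finalize)
    }
}
```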

plugins/listener/src/actors/source.rs (2)

161-166: Unwrap on mic open can panic

AudioInput::from_mic(...).unwrap() will panic if the device is unavailable. If this is an invariant guaranteed by session wiring, all good; otherwise expect crash on device loss. Please confirm the invariant.


81-93: Overall actor init flow looks clean

State wiring, optional device monitor, and speaker “silence” keepalive are well-factored; starting the loop in pre_start is appropriate.

plugins/listener/src/actors/session.rs (3)

89-102: Start flow: correct handling of duplicate session_id

Short-circuiting when the same session is already active is good; stopping when IDs differ avoids overlapping pipelines.
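
The decision reduces to a three-way match. The sketch below uses hypothetical names (`StartAction`, `on_start`) and assumes that a differing ID means stopping the old session and then starting the requested one.

```rust
/// Hypothetical outcome of a start request.
#[derive(Debug, PartialEq, Eq)]
pub enum StartAction {
    /// Same session already running: nothing to do.
    Ignore,
    /// A different session is running: tear it down first.
    StopThenStart,
    /// Nothing running: start directly.
    Start,
}

/// Decides what a start request should do given the active session, if any.
pub fn on_start(active: Option<&str>, requested: &str) -> StartAction {
    match active {
        Some(id) if id == requested => StartAction::Ignore,
        Some(_) => StartAction::StopThenStart,
        None => StartAction::Start,
    }
}
```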


134-156: Symmetric mute queries look good

Consistent use of call_t! for mic/speaker and reply guards keeps the API predictable.


328-333: State transitions and UI signals look consistent

Setting RunningActive after wiring and emitting events, and restoring to Inactive on stop, aligns with the new actor model.

plugins/listener/src/actors/listen.rs (1)

15-17: Good fix: Bytes end-to-end for audio payloads.

This resolves the prior Vec/Bytes mismatch and avoids copies on the hot path.

plugins/listener/src/actors/processor.rs (3)

104-111: Mixer math looks good.

Zip + clamp prevents overflow; safe for 16kHz float PCM.
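
For readers without the diff open, the pattern is roughly the following. This is a sketch that assumes the mix is a per-sample sum clamped to [-1.0, 1.0]; `mix` is a hypothetical name and the actual processor code may differ.

```rust
/// Sums two float PCM chunks sample by sample and clamps each result
/// to [-1.0, 1.0]. `zip` stops at the shorter input.
pub fn mix(mic: &[f32], spk: &[f32]) -> Vec<f32> {
    mic.iter()
        .zip(spk.iter())
        .map(|(m, s)| (m + s).clamp(-1.0, 1.0))
        .collect()
}
```

One consequence of `zip` is that trailing samples of the longer chunk are not mixed, so both inputs are expected to be the same length.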


121-129: Sane amplitude throttling.

100 ms cadence is a good tradeoff for UI responsiveness vs. spam.
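
A throttle of this kind can be sketched with `Instant` alone. `Throttle` and `ready` are hypothetical names; passing `now` in explicitly keeps the logic easy to reason about.

```rust
use std::time::{Duration, Instant};

/// Allows at most one emit per `interval`.
pub struct Throttle {
    interval: Duration,
    last: Option<Instant>,
}

impl Throttle {
    pub fn new(interval: Duration) -> Self {
        Self { interval, last: None }
    }

    /// Returns true (and records `now`) when enough time has passed
    /// since the last allowed emit; the first call always passes.
    pub fn ready(&mut self, now: Instant) -> bool {
        match self.last {
            Some(prev) if now.duration_since(prev) < self.interval => false,
            _ => {
                self.last = Some(now);
                true
            }
        }
    }
}
```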


146-158: Bounded queues prevent unbounded growth.

The 10-chunk cap keeps latency under control when one side lags.
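
One way such a cap can work, assuming a drop-oldest policy (the actual eviction policy in the processor is not shown in this comment). `MAX_CHUNKS` and `push_bounded` are hypothetical names.

```rust
use std::collections::VecDeque;

/// Assumed cap, matching the 10-chunk limit mentioned above.
pub const MAX_CHUNKS: usize = 10;

/// Appends a chunk, evicting the oldest one first when the queue is full,
/// so the queue never holds more than `MAX_CHUNKS` entries.
pub fn push_bounded(queue: &mut VecDeque<Vec<f32>>, chunk: Vec<f32>) {
    if queue.len() >= MAX_CHUNKS {
        queue.pop_front();
    }
    queue.push_back(chunk);
}
```

Dropping the oldest chunk favors low latency over completeness, which fits a live audio pipeline where stale samples are of little use.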

@yujonglee yujonglee merged commit cee9c59 into main Sep 10, 2025
6 of 8 checks passed
@yujonglee yujonglee deleted the migrate-to-actor branch September 10, 2025 20:19