diff --git a/plugins/listener/src/actors/listener.rs b/plugins/listener/src/actors/listener.rs index e12eee07b1..353e5d9ebe 100644 --- a/plugins/listener/src/actors/listener.rs +++ b/plugins/listener/src/actors/listener.rs @@ -126,11 +126,14 @@ impl Actor for ListenerActor { crate::actors::ChannelMode::MicAndSpeaker => {} } - SessionEvent::StreamResponse { + if let Err(error) = (SessionEvent::StreamResponse { session_id: state.args.session_id.clone(), response, + }) + .emit(&state.args.app) + { + tracing::error!(?error, "stream_response_emit_failed"); } - .emit(&state.args.app)?; } ListenerMsg::StreamStartFailed(error) => { @@ -524,7 +527,10 @@ async fn process_stream( response.set_channel_index(channel_idx, total_channels); } - let _ = myself.send_message(ListenerMsg::StreamResponse(response)); + if myself.send_message(ListenerMsg::StreamResponse(response)).is_err() { + tracing::warn!("actor_gone_during_finalize"); + break; + } if received_from_finalize { tracing::info!(from_finalize = true, "break_from_finalize"); @@ -554,7 +560,10 @@ async fn process_stream( response.set_channel_index(channel_idx, total_channels); } - let _ = myself.send_message(ListenerMsg::StreamResponse(response)); + if myself.send_message(ListenerMsg::StreamResponse(response)).is_err() { + tracing::warn!("actor_gone_breaking_stream_loop"); + break; + } } // Something went wrong while sending or receiving a websocket message. Should restart. Ok(Some(Err(e))) => { diff --git a/plugins/listener/src/actors/recorder.rs b/plugins/listener/src/actors/recorder.rs index 06f2eae242..7ed4fae63d 100644 --- a/plugins/listener/src/actors/recorder.rs +++ b/plugins/listener/src/actors/recorder.rs @@ -165,9 +165,7 @@ impl Actor for RecorderActor { &st.wav_path, &temp_ogg_path, VorbisEncodeSettings::default(), - ) - .map_err(into_actor_err) - { + ) { Ok(_) => { std::fs::rename(&temp_ogg_path, &st.ogg_path)?; std::fs::remove_file(&st.wav_path)?; @@ -175,7 +173,7 @@ impl Actor for RecorderActor { Err(e) => { tracing::error!(error = ?e, "wav_to_ogg_failed_keeping_wav"); let _ = std::fs::remove_file(&temp_ogg_path); - return Err(e); + // Keep WAV as a fallback, but don't cause an actor failure } } } diff --git a/plugins/listener/src/ext.rs b/plugins/listener/src/ext.rs index 4cba958098..8818c270a9 100644 --- a/plugins/listener/src/ext.rs +++ b/plugins/listener/src/ext.rs @@ -111,11 +111,13 @@ impl> ListenerPluginExt for T { guard.session_supervisor = Some(supervisor_ref); guard.supervisor_handle = Some(handle); - SessionEvent::RunningActive { + if let Err(error) = (SessionEvent::RunningActive { session_id: params.session_id, - } + }) .emit(&guard.app) - .unwrap(); + { + tracing::error!(?error, "failed_to_emit_running_active"); + } tracing::info!("session_started"); } @@ -141,9 +143,9 @@ impl> ListenerPluginExt for T { }; if let Some(session_id) = session_id.clone() { - SessionEvent::Finalizing { session_id } - .emit(&guard.app) - .unwrap(); + if let Err(error) = (SessionEvent::Finalizing { session_id }).emit(&guard.app) { + tracing::error!(?error, "failed_to_emit_finalizing"); + } } if let Some(supervisor_cell) = guard.session_supervisor.take() { @@ -160,9 +162,9 @@ impl> ListenerPluginExt for T { } if let Some(session_id) = session_id { - SessionEvent::Inactive { session_id } - .emit(&guard.app) - .unwrap(); + if let Err(error) = (SessionEvent::Inactive { session_id }).emit(&guard.app) { + tracing::error!(?error, "failed_to_emit_inactive"); + } } tracing::info!("session_stopped");