Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

multi monitor + multi audio device reconnection #697

Merged
merged 4 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 31 additions & 8 deletions screenpipe-audio/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,6 @@ pub async fn get_device_and_config(
) -> Result<(cpal::Device, cpal::SupportedStreamConfig)> {
let host = cpal::default_host();

info!("device: {:?}", audio_device.to_string());

let is_output_device = audio_device.device_type == DeviceType::Output;
let is_display = audio_device.to_string().contains("Display");

Expand Down Expand Up @@ -183,6 +181,7 @@ pub async fn record_and_transcribe(
// Normal shutdown
break;
}

error!("record_and_transcribe error, restarting: {}", e);
// Add a small delay before restarting to prevent rapid restart loops
tokio::time::sleep(Duration::from_secs(1)).await;
Expand Down Expand Up @@ -211,7 +210,9 @@ async fn run_record_and_transcribe(
let sample_rate = audio_stream.device_config.sample_rate().0 as usize;
let overlap_samples = OVERLAP_SECONDS * sample_rate;

while is_running.load(Ordering::Relaxed) {
while is_running.load(Ordering::Relaxed)
&& !audio_stream.is_disconnected.load(Ordering::Relaxed)
{
let start_time = tokio::time::Instant::now();

while start_time.elapsed() < duration && is_running.load(Ordering::Relaxed) {
Expand Down Expand Up @@ -387,6 +388,7 @@ pub struct AudioStream {
transmitter: Arc<tokio::sync::broadcast::Sender<Vec<f32>>>,
stream_control: mpsc::Sender<StreamControl>,
stream_thread: Option<Arc<tokio::sync::Mutex<Option<thread::JoinHandle<()>>>>>,
is_disconnected: Arc<AtomicBool>,
}

enum StreamControl {
Expand All @@ -404,19 +406,38 @@ impl AudioStream {
let channels = config.channels();

let is_running_weak_2 = Arc::downgrade(&is_running);
let is_disconnected = Arc::new(AtomicBool::new(false));
let device_clone = device.clone();
let config_clone = config.clone();
let (stream_control_tx, stream_control_rx) = mpsc::channel();

let is_disconnected_clone = is_disconnected.clone();
let stream_control_tx_clone = stream_control_tx.clone();
let stream_thread = Arc::new(tokio::sync::Mutex::new(Some(thread::spawn(move || {
let device = device_clone;
let device_name = device.to_string();
let config = config_clone;
let error_callback = move |err: StreamError| {
error!("an error occurred on the audio stream: {}", err);
if err.to_string().contains("device is no longer valid") {
warn!("audio device disconnected. stopping recording.");
if let Some(arc) = is_running_weak_2.upgrade() {
arc.store(false, Ordering::Relaxed);
if err
.to_string()
.contains("The requested device is no longer available")
{
warn!(
"audio device {} disconnected. stopping recording.",
device_name
);
stream_control_tx_clone
.send(StreamControl::Stop(oneshot::channel().0))
.unwrap();

is_disconnected_clone.store(true, Ordering::Relaxed);
} else {
error!("an error occurred on the audio stream: {}", err);
if err.to_string().contains("device is no longer valid") {
warn!("audio device disconnected. stopping recording.");
if let Some(arc) = is_running_weak_2.upgrade() {
arc.store(false, Ordering::Relaxed);
}
}
}
};
Expand Down Expand Up @@ -490,6 +511,7 @@ impl AudioStream {
transmitter: Arc::new(tx_clone),
stream_control: stream_control_tx,
stream_thread: Some(stream_thread),
is_disconnected,
})
}

Expand All @@ -498,6 +520,7 @@ impl AudioStream {
}

pub async fn stop(mut self) -> Result<()> {
self.is_disconnected.store(true, Ordering::Relaxed);
let (tx, rx) = oneshot::channel();
self.stream_control.send(StreamControl::Stop(tx))?;
rx.await?;
Expand Down
15 changes: 8 additions & 7 deletions screenpipe-audio/src/stt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,14 +287,7 @@
embedding_manager: EmbeddingManager,
embedding_extractor: Arc<StdMutex<EmbeddingExtractor>>,
) -> Result<tokio::sync::mpsc::Receiver<SpeechSegment>> {
info!("Preparing segments");
let audio_data = if audio_input.sample_rate != m::SAMPLE_RATE as u32 {
info!(
"device: {}, resampling from {} Hz to {} Hz",
audio_input.device,
audio_input.sample_rate,
m::SAMPLE_RATE
);
resample(
audio_input.data.as_ref(),
audio_input.sample_rate,
Expand Down Expand Up @@ -337,6 +330,14 @@
let speech_ratio = speech_frame_count as f32 / total_frames as f32;
let min_speech_ratio = vad_engine.lock().await.get_min_speech_ratio();

info!(
"device: {}, speech ratio: {}, min_speech_ratio: {}, audio_frames: {}, speech_frames: {}",
audio_input.device,
speech_ratio,
min_speech_ratio,
audio_frames.len(),
speech_frame_count
);
let (tx, rx) = tokio::sync::mpsc::channel(100);
if !audio_frames.is_empty() && speech_ratio >= min_speech_ratio {
let segments = get_segments(
Expand Down Expand Up @@ -581,7 +582,7 @@

while let Some(segment) = segments.recv().await {
let transcription_result = if cfg!(target_os = "macos") {
let timestamp = timestamp + segment.start.round() as u64;

Check warning on line 585 in screenpipe-audio/src/stt.rs

View workflow job for this annotation

GitHub Actions / test-linux

unused variable: `timestamp`

Check warning on line 585 in screenpipe-audio/src/stt.rs

View workflow job for this annotation

GitHub Actions / test-ubuntu

unused variable: `timestamp`

Check warning on line 585 in screenpipe-audio/src/stt.rs

View workflow job for this annotation

GitHub Actions / test-windows

unused variable: `timestamp`
#[cfg(target_os = "macos")]
{
autoreleasepool(|| {
Expand Down
76 changes: 46 additions & 30 deletions screenpipe-server/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,38 +310,54 @@ async fn record_audio(
&audio_device
);

let audio_stream = match AudioStream::from_device(
audio_device_clone.clone(),
Arc::new(AtomicBool::new(device_control.clone().is_running)),
)
.await
{
Ok(stream) => stream,
Err(e) => {
error!("Failed to create audio stream: {}", e);
return;
}
};

let audio_stream = Arc::new(audio_stream);
let device_control_clone = Arc::clone(&device_control);
let whisper_sender_clone = whisper_sender_clone.clone();
let audio_device = Arc::clone(&audio_device_clone);
let record_handle = tokio::spawn(async move {
let _ = record_and_transcribe(
audio_stream,
chunk_duration,
whisper_sender_clone.clone(),
Arc::new(AtomicBool::new(device_control_clone.is_running)),
let mut did_warn = false;
let is_running = Arc::new(AtomicBool::new(device_control.is_running));

while is_running.load(Ordering::Relaxed) {
let is_running_loop = Arc::clone(&is_running); // Create separate reference for the loop
let audio_stream = match AudioStream::from_device(
audio_device_clone.clone(),
Arc::clone(&is_running_loop), // Clone from original Arc
)
.await;
});

// let live_transcription_handle = tokio::spawn(async move {
// let _ = live_transcription(audio_stream, whisper_sender_clone.clone()).await;
// });
.await
{
Ok(stream) => stream,
Err(e) => {
if e.to_string().contains("Audio device not found") {
if !did_warn {
warn!("Audio device not found: {}", audio_device.name);
did_warn = true;
}
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
} else {
error!("Failed to create audio stream: {}", e);
return;
}
}
};

let audio_stream = Arc::new(audio_stream);
let whisper_sender_clone = whisper_sender_clone.clone();
let record_handle = Some(tokio::spawn(async move {
let _ = record_and_transcribe(
audio_stream,
chunk_duration,
whisper_sender_clone.clone(),
is_running_loop.clone(),
)
.await;
}));

// let live_transcription_handle = tokio::spawn(async move {
// let _ = live_transcription(audio_stream, whisper_sender_clone.clone()).await;
// });

if let Some(handle) = record_handle {
handle.await.unwrap();
}
}

record_handle.await.unwrap();
info!("exiting audio capture thread for device: {}", &audio_device);
});

Expand Down
Binary file modified screenpipe-vision/bin/ui_monitor-aarch64-apple-darwin
Binary file not shown.
28 changes: 13 additions & 15 deletions screenpipe-vision/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::{
time::{Duration, Instant},
};
use tokio::sync::mpsc::Sender;
use tokio::time::sleep;
use xcap::Monitor;

pub struct CaptureResult {
Expand Down Expand Up @@ -56,28 +57,25 @@ pub async fn continuous_capture(
include_list: &[String],
languages: Vec<Language>,
) {
debug!(
"continuous_capture: Starting using monitor: {:?}",
monitor_id
);
let mut frame_counter: u64 = 0;
let mut previous_image: Option<DynamicImage> = None;
let mut max_average: Option<MaxAverageFrame> = None;
let mut max_avg_value = 0.0;

let monitor = match get_monitor_by_id(monitor_id).await {
Some(m) => m,
None => {
error!(
"Failed to get monitor with id: {}. Exiting continuous_capture.",
monitor_id
);
return;
}
};
debug!(
"continuous_capture: Starting using monitor: {:?}",
monitor_id
);

loop {
let capture_result = match capture_screenshot(&monitor, &ignore_list, &include_list).await {
let monitor = match get_monitor_by_id(monitor_id).await {
Some(m) => m,
None => {
sleep(Duration::from_secs(1)).await;
continue;
}
};
let capture_result = match capture_screenshot(&monitor, ignore_list, include_list).await {
Ok((image, window_images, image_hash, _capture_duration)) => {
debug!(
"Captured screenshot on monitor {} with hash: {}",
Expand Down
Loading