@@ -17,12 +17,12 @@ use std::{
1717 } ,
1818 time:: Duration ,
1919} ;
20- use tokio:: sync:: broadcast;
20+ use tokio:: { select , sync:: broadcast} ;
2121use tokio_util:: {
2222 future:: FutureExt as _,
2323 sync:: { CancellationToken , DropGuard } ,
2424} ;
25- use tracing:: debug;
25+ use tracing:: { debug, warn } ;
2626
2727#[ derive( Debug ) ]
2828pub struct CMSampleBufferCapture ;
@@ -269,7 +269,6 @@ pub struct CaptureError(pub arc::R<ns::Error>);
269269struct Capturer {
270270 started : Arc < AtomicBool > ,
271271 capturer : Arc < scap_screencapturekit:: Capturer > ,
272- // error_rx: broadcast::Receiver<arc::R<ns::Error>>,
273272}
274273
275274impl Clone for Capturer {
@@ -283,32 +282,53 @@ impl Clone for Capturer {
283282}
284283
285284impl Capturer {
286- fn new (
287- capturer : Arc < scap_screencapturekit:: Capturer > ,
288- // error_rx: broadcast::Receiver<arc::R<ns::Error>>,
289- ) -> Self {
285+ fn new ( capturer : Arc < scap_screencapturekit:: Capturer > ) -> Self {
290286 Self {
291287 started : Arc :: new ( AtomicBool :: new ( false ) ) ,
292288 capturer,
293- // error_rx,
294289 }
295290 }
296291
297- async fn start ( & mut self ) -> anyhow:: Result < ( ) > {
298- if !self . started . fetch_xor ( true , atomic:: Ordering :: Relaxed ) {
299- self . capturer . start ( ) . await ?;
292+ async fn start ( & self ) -> anyhow:: Result < ( ) > {
293+ if self
294+ . started
295+ . compare_exchange (
296+ false ,
297+ true ,
298+ atomic:: Ordering :: Relaxed ,
299+ atomic:: Ordering :: Relaxed ,
300+ )
301+ . is_ok ( )
302+ {
303+ self . capturer
304+ . start ( )
305+ . await
306+ . map_err ( |err| anyhow ! ( format!( "{err}" ) ) ) ?;
300307 }
301308
302309 Ok ( ( ) )
303310 }
304311
305- async fn stop ( & mut self ) -> anyhow:: Result < ( ) > {
306- if self . started . fetch_xor ( true , atomic:: Ordering :: Relaxed ) {
312+ async fn stop ( & self ) -> anyhow:: Result < ( ) > {
313+ if self
314+ . started
315+ . compare_exchange (
316+ true ,
317+ false ,
318+ atomic:: Ordering :: Relaxed ,
319+ atomic:: Ordering :: Relaxed ,
320+ )
321+ . is_ok ( )
322+ {
307323 self . capturer . stop ( ) . await . context ( "capturer_stop" ) ?;
308324 }
309325
310326 Ok ( ( ) )
311327 }
328+
329+ fn mark_stopped ( & self ) {
330+ self . started . store ( false , atomic:: Ordering :: Relaxed ) ;
331+ }
312332}
313333
314334pub struct VideoSourceConfig {
@@ -332,29 +352,63 @@ impl output_pipeline::VideoSource for VideoSource {
332352 type Frame = VideoFrame ;
333353
334354 async fn setup (
335- mut config : Self :: Config ,
355+ config : Self :: Config ,
336356 video_tx : mpsc:: Sender < Self :: Frame > ,
337357 ctx : & mut SetupCtx ,
338358 ) -> anyhow:: Result < Self >
339359 where
340360 Self : Sized ,
341361 {
342- ctx. tasks ( ) . spawn ( "screen-capture" , async move {
343- if let Ok ( err) = config. error_rx . recv ( ) . await {
344- return Err ( anyhow ! ( "{err}" ) ) ;
345- }
362+ let VideoSourceConfig {
363+ inner,
364+ capturer,
365+ mut error_rx,
366+ cancel_token,
367+ drop_guard,
368+ video_frame_counter,
369+ } = config;
370+
371+ let monitor_capturer = capturer. clone ( ) ;
372+ let monitor_cancel = cancel_token. clone ( ) ;
373+ ctx. tasks ( ) . spawn ( "screen-capture-monitor" , async move {
374+ loop {
375+ select ! {
376+ _ = monitor_cancel. cancelled( ) => break Ok ( ( ) ) ,
377+ recv = error_rx. recv( ) => {
378+ let err = match recv {
379+ Ok ( err) => err,
380+ Err ( broadcast:: error:: RecvError :: Closed ) => break Ok ( ( ) ) ,
381+ Err ( broadcast:: error:: RecvError :: Lagged ( _) ) => {
382+ warn!( "Screen capture error channel lagged; continuing" ) ;
383+ continue ;
384+ }
385+ } ;
386+
387+ if is_system_stop_error( err. as_ref( ) ) {
388+ warn!( "Screen capture stream stopped by the system; attempting restart" ) ;
389+ monitor_capturer. mark_stopped( ) ;
390+ if let Err ( restart_err) = monitor_capturer. start( ) . await {
391+ return Err ( anyhow!( format!(
392+ "Failed to restart ScreenCaptureKit stream: {restart_err:#}"
393+ ) ) ) ;
394+ }
395+ continue ;
396+ }
346397
347- Ok ( ( ) )
398+ return Err ( anyhow!( format!( "{err}" ) ) ) ;
399+ }
400+ }
401+ }
348402 } ) ;
349403
350- ChannelVideoSource :: setup ( config . inner , video_tx, ctx)
404+ ChannelVideoSource :: setup ( inner, video_tx, ctx)
351405 . await
352406 . map ( |source| Self {
353407 inner : source,
354- capturer : config . capturer ,
355- cancel_token : config . cancel_token ,
356- _drop_guard : config . drop_guard ,
357- video_frame_counter : config . video_frame_counter ,
408+ capturer,
409+ cancel_token,
410+ _drop_guard : drop_guard,
411+ video_frame_counter,
358412 } )
359413 }
360414
@@ -402,6 +456,16 @@ impl output_pipeline::VideoSource for VideoSource {
402456 }
403457}
404458
459+ fn is_system_stop_error ( err : & ns:: Error ) -> bool {
460+ const SCK_ERROR_DOMAIN : & str = "com.apple.ScreenCaptureKit.error" ;
461+
462+ if err. domain ( ) . to_string ( ) != SCK_ERROR_DOMAIN {
463+ return false ;
464+ }
465+
466+ err. localized_description ( ) . to_string ( ) == "Stream was stopped by the system"
467+ }
468+
405469pub struct SystemAudioSourceConfig (
406470 ChannelAudioSourceConfig ,
407471 Capturer ,
@@ -414,22 +478,36 @@ impl output_pipeline::AudioSource for SystemAudioSource {
414478 type Config = SystemAudioSourceConfig ;
415479
416480 fn setup (
417- mut config : Self :: Config ,
481+ config : Self :: Config ,
418482 tx : mpsc:: Sender < AudioFrame > ,
419483 ctx : & mut SetupCtx ,
420484 ) -> impl Future < Output = anyhow:: Result < Self > > + ' static
421485 where
422486 Self : Sized ,
423487 {
488+ let SystemAudioSourceConfig ( channel_config, capturer, mut error_rx) = config;
489+
424490 ctx. tasks ( ) . spawn ( "system-audio" , async move {
425- if let Ok ( err) = config. 2 . recv ( ) . await {
426- return Err ( anyhow ! ( "{err}" ) ) ;
491+ loop {
492+ match error_rx. recv ( ) . await {
493+ Ok ( err) => {
494+ if is_system_stop_error ( err. as_ref ( ) ) {
495+ warn ! ( "Screen capture audio stream stopped by the system; awaiting restart" ) ;
496+ continue ;
497+ }
498+
499+ return Err ( anyhow ! ( "{err}" ) ) ;
500+ }
501+ Err ( broadcast:: error:: RecvError :: Closed ) => break ,
502+ Err ( broadcast:: error:: RecvError :: Lagged ( _) ) => continue ,
503+ }
427504 }
428505
429506 Ok ( ( ) )
430507 } ) ;
431508
432- ChannelAudioSource :: setup ( config. 0 , tx, ctx) . map ( |v| v. map ( |source| Self ( source, config. 1 ) ) )
509+ ChannelAudioSource :: setup ( channel_config, tx, ctx)
510+ . map ( |v| v. map ( |source| Self ( source, capturer) ) )
433511 }
434512
435513 async fn start ( & mut self ) -> anyhow:: Result < ( ) > {
0 commit comments