@@ -474,115 +474,104 @@ pub fn from_pending_file_to_chunks(
474474 realtime_upload_done : Option < Receiver < ( ) > > ,
475475) -> impl Stream < Item = io:: Result < Chunk > > {
476476 try_stream ! {
477+ let mut file = tokio:: fs:: File :: open( & path) . await ?;
477478 let mut part_number = 1 ;
478479 let mut last_read_position: u64 = 0 ;
479480 let mut realtime_is_done = realtime_upload_done. as_ref( ) . map( |_| false ) ;
480481 let mut first_chunk_size: Option <u64 > = None ;
482+ let mut chunk_buffer = vec![ 0u8 ; CHUNK_SIZE as usize ] ;
481483
482484 loop {
483485 // Check if realtime recording is done
484- if !realtime_is_done. unwrap_or( true ) && let Some ( ref realtime_receiver) = realtime_upload_done {
485- match realtime_receiver. try_recv( ) {
486- Ok ( _) => realtime_is_done = Some ( true ) ,
487- Err ( flume:: TryRecvError :: Empty ) => { } ,
488- Err ( _) => yield Err ( std:: io:: Error :: new(
489- std:: io:: ErrorKind :: Interrupted ,
490- "Realtime generation failed"
491- ) ) ?,
492- } ;
493- }
494-
495- // Check file existence and size
496- if !path. exists( ) {
497- yield Err ( std:: io:: Error :: new(
498- std:: io:: ErrorKind :: NotFound ,
499- "File no longer exists"
500- ) ) ?;
486+ if !realtime_is_done. unwrap_or( true ) {
487+ if let Some ( ref realtime_receiver) = realtime_upload_done {
488+ match realtime_receiver. try_recv( ) {
489+ Ok ( _) => realtime_is_done = Some ( true ) ,
490+ Err ( flume:: TryRecvError :: Empty ) => { } ,
491+ Err ( _) => yield Err ( std:: io:: Error :: new(
492+ std:: io:: ErrorKind :: Interrupted ,
493+ "Realtime generation failed"
494+ ) ) ?,
495+ }
496+ }
501497 }
502498
503- let file_size = match tokio:: fs:: metadata( & path) . await {
499+ // Get current file size - reuse file handle for metadata
500+ let file_size = match file. metadata( ) . await {
504501 Ok ( metadata) => metadata. len( ) ,
505502 Err ( _) => {
506- // Retry on metadata errors (file might be temporarily locked)
507- tokio:: time:: sleep( Duration :: from_millis( 500 ) ) . await ;
503+ // File might be temporarily locked, retry with shorter delay
504+ tokio:: time:: sleep( Duration :: from_millis( 100 ) ) . await ;
508505 continue ;
509506 }
510507 } ;
511508
512509 let new_data_size = file_size. saturating_sub( last_read_position) ;
513510
514- // Read chunk if we have enough data OR if recording is done with any data
511+ // Determine if we should read a chunk
515512 let should_read_chunk = if let Some ( is_done) = realtime_is_done {
516- // We have a realtime receiver - check if recording is done or we have enough data
517513 ( new_data_size >= CHUNK_SIZE ) || ( is_done && new_data_size > 0 )
518514 } else {
519- // No realtime receiver - read any available data
520515 new_data_size > 0
521516 } ;
522517
523518 if should_read_chunk {
524- let chunk_size = std:: cmp:: min( new_data_size, CHUNK_SIZE ) ;
519+ let chunk_size = std:: cmp:: min( new_data_size, CHUNK_SIZE ) as usize ;
525520
526- let mut file = tokio:: fs:: File :: open( & path) . await ?;
527521 file. seek( std:: io:: SeekFrom :: Start ( last_read_position) ) . await ?;
528522
529- let mut chunk = vec![ 0u8 ; chunk_size as usize ] ;
530523 let mut total_read = 0 ;
531-
532- while total_read < chunk_size as usize {
533- match file. read( & mut chunk[ total_read..] ) . await {
524+ while total_read < chunk_size {
525+ match file. read( & mut chunk_buffer[ total_read..chunk_size] ) . await {
534526 Ok ( 0 ) => break , // EOF
535527 Ok ( n) => total_read += n,
536528 Err ( e) => yield Err ( e) ?,
537529 }
538530 }
539531
540532 if total_read > 0 {
541- chunk. truncate( total_read) ;
542-
533+ // Remember first chunk size for later re-emission with updated header
543534 if last_read_position == 0 {
544- // This is the first chunk - remember its size so we can reemit it.
545535 first_chunk_size = Some ( total_read as u64 ) ;
546536 }
547537
548538 yield Chunk {
549539 total_size: file_size,
550540 part_number,
551- chunk: Bytes :: from ( chunk ) ,
541+ chunk: Bytes :: copy_from_slice ( & chunk_buffer [ ..total_read ] ) ,
552542 } ;
553543 part_number += 1 ;
554-
555544 last_read_position += total_read as u64 ;
556545 }
557546 } else if new_data_size == 0 && realtime_is_done. unwrap_or( true ) {
558- // Recording is done and no new data - now yield the first chunk
547+ // Recording is done and no new data - re-emit first chunk with corrected MP4 header
559548 if let Some ( first_size) = first_chunk_size {
560- let mut file = tokio:: fs:: File :: open( & path) . await ?;
561549 file. seek( std:: io:: SeekFrom :: Start ( 0 ) ) . await ?;
562550
563- let mut first_chunk = vec! [ 0u8 ; first_size as usize ] ;
551+ let chunk_size = first_size as usize ;
564552 let mut total_read = 0 ;
565553
566- while total_read < first_size as usize {
567- match file. read( & mut first_chunk [ total_read..] ) . await {
554+ while total_read < chunk_size {
555+ match file. read( & mut chunk_buffer [ total_read..chunk_size ] ) . await {
568556 Ok ( 0 ) => break ,
569557 Ok ( n) => total_read += n,
570558 Err ( e) => yield Err ( e) ?,
571559 }
572560 }
573561
574562 if total_read > 0 {
575- first_chunk . truncate ( total_read ) ;
563+ // Re-emit first chunk with part_number 1 to fix MP4 header
576564 yield Chunk {
577565 total_size: file_size,
578566 part_number: 1 ,
579- chunk: Bytes :: from ( first_chunk ) ,
567+ chunk: Bytes :: copy_from_slice ( & chunk_buffer [ ..total_read ] ) ,
580568 } ;
581569 }
582570 }
583571 break ;
584572 } else {
585- tokio:: time:: sleep( Duration :: from_millis( 500 ) ) . await ;
573+ // Reduced polling interval for better responsiveness
574+ tokio:: time:: sleep( Duration :: from_millis( 100 ) ) . await ;
586575 }
587576 }
588577 }
0 commit comments