@@ -13,7 +13,8 @@ use cap_project::{RecordingMeta, S3UploadMeta, UploadMeta};
1313use cap_utils:: spawn_actor;
1414use ffmpeg:: ffi:: AV_TIME_BASE ;
1515use flume:: Receiver ;
16- use futures:: { Stream , StreamExt , TryStreamExt , future:: join, stream} ;
16+ use futures:: future:: join;
17+ use futures:: { Stream , StreamExt , TryStreamExt , stream} ;
1718use image:: { ImageReader , codecs:: jpeg:: JpegEncoder } ;
1819use reqwest:: StatusCode ;
1920use serde:: { Deserialize , Serialize } ;
@@ -24,6 +25,7 @@ use std::{
2425 path:: { Path , PathBuf } ,
2526 pin:: pin,
2627 str:: FromStr ,
28+ sync:: { Arc , Mutex , PoisonError } ,
2729 time:: Duration ,
2830} ;
2931use tauri:: { AppHandle , ipc:: Channel } ;
@@ -53,8 +55,9 @@ pub struct UploadProgressEvent {
5355 total : String ,
5456}
5557
56- // a typical recommended chunk size is 5MB (AWS min part size).
57- const CHUNK_SIZE : u64 = 5 * 1024 * 1024 ; // 5MB
58+ // Lower and upper bounds on the size of each S3 multipart upload chunk
59+ const MIN_CHUNK_SIZE : u64 = 5 * 1024 * 1024 ; // 5 MB
60+ const MAX_CHUNK_SIZE : u64 = 20 * 1024 * 1024 ; // 20 MB
5861
5962#[ instrument( skip( app, channel, file_path, screenshot_path) ) ]
6063pub async fn upload_video (
@@ -457,7 +460,7 @@ pub fn from_file_to_chunks(path: PathBuf) -> impl Stream<Item = io::Result<Chunk
457460 let total_size = file. metadata( ) . await ?. len( ) ;
458461 let mut file = BufReader :: new( file) ;
459462
460- let mut buf = vec![ 0u8 ; CHUNK_SIZE as usize ] ;
463+ let mut buf = vec![ 0u8 ; MAX_CHUNK_SIZE as usize ] ;
461464 let mut part_number = 0 ;
462465 loop {
463466 part_number += 1 ;
@@ -497,7 +500,7 @@ pub fn from_pending_file_to_chunks(
497500 let mut last_read_position: u64 = 0 ;
498501 let mut realtime_is_done = realtime_upload_done. as_ref( ) . map( |_| false ) ;
499502 let mut first_chunk_size: Option <u64 > = None ;
500- let mut chunk_buffer = vec![ 0u8 ; CHUNK_SIZE as usize ] ;
503+ let mut chunk_buffer = vec![ 0u8 ; MAX_CHUNK_SIZE as usize ] ;
501504
502505 loop {
503506 // Check if realtime recording is done
@@ -527,13 +530,13 @@ pub fn from_pending_file_to_chunks(
527530
528531 // Determine if we should read a chunk
529532 let should_read_chunk = if let Some ( is_done) = realtime_is_done {
530- ( new_data_size >= CHUNK_SIZE ) || ( is_done && new_data_size > 0 )
533+ ( new_data_size >= MIN_CHUNK_SIZE ) || ( is_done && new_data_size > 0 )
531534 } else {
532535 new_data_size > 0
533536 } ;
534537
535538 if should_read_chunk {
536- let chunk_size = std:: cmp:: min( new_data_size, CHUNK_SIZE ) as usize ;
539+ let chunk_size = std:: cmp:: min( new_data_size, MAX_CHUNK_SIZE ) as usize ;
537540
538541 file. seek( std:: io:: SeekFrom :: Start ( last_read_position) ) . await ?;
539542
@@ -620,98 +623,181 @@ fn multipart_uploader(
620623 app : AppHandle ,
621624 video_id : String ,
622625 upload_id : String ,
623- stream : impl Stream < Item = io:: Result < Chunk > > ,
624- ) -> impl Stream < Item = Result < UploadedPart , AuthedApiError > > {
626+ stream : impl Stream < Item = io:: Result < Chunk > > + Send + ' static ,
627+ ) -> impl Stream < Item = Result < UploadedPart , AuthedApiError > > + ' static {
628+ const MAX_CONCURRENT_UPLOADS : usize = 3 ;
629+
625630 debug ! ( "Initializing multipart uploader for video {video_id:?}" ) ;
626631 let start = Instant :: now ( ) ;
632+ let video_id2 = video_id. clone ( ) ;
633+
634+ stream:: once ( async move {
635+ let use_md5_hashes = app. is_server_url_custom ( ) . await ;
636+ let first_chunk_presigned_url = Arc :: new ( Mutex :: new ( None :: < ( String , Instant ) > ) ) ;
637+
638+ stream:: unfold (
639+ ( Box :: pin ( stream) , 1 ) ,
640+ move |( mut stream, expected_part_number) | {
641+ let app = app. clone ( ) ;
642+ let video_id = video_id. clone ( ) ;
643+ let upload_id = upload_id. clone ( ) ;
644+ let first_chunk_presigned_url = first_chunk_presigned_url. clone ( ) ;
645+
646+ async move {
647+ let ( Some ( item) , presigned_url) = join ( stream. next ( ) , async {
648+ // Self-hosted still uses the legacy web API, which requires an MD5 hash of each chunk, so we can't presign the URL before the chunk has been read.
649+ if use_md5_hashes {
650+ return Ok ( None ) ;
651+ }
627652
628- try_stream ! {
629- let use_md5_hashes = app. is_server_url_custom( ) . await ;
630-
631- let mut stream = pin!( stream) ;
632- let mut prev_part_number = None ;
633- let mut expected_part_number = 1u32 ;
634-
635- loop {
636- let ( item, mut presigned_url, md5_sum) = if use_md5_hashes {
637- let Some ( item) = stream. next( ) . await else {
638- break ;
639- } ;
640- let item = item. map_err( |err| format!( "uploader/part/{:?}/fs: {err:?}" , prev_part_number. map( |p| p + 1 ) ) ) ?;
641- let md5_sum = base64:: encode( md5:: compute( & item. chunk) . 0 ) ;
642- let presigned_url = api:: upload_multipart_presign_part( & app, & video_id, & upload_id, expected_part_number, Some (
643- & md5_sum
644- ) ) . await ?;
645-
646- ( item, presigned_url, Some ( md5_sum) )
647- } else {
648- let ( Some ( item) , presigned_url) = join(
649- stream. next( ) ,
650- // We generate the presigned URL ahead of time for the part we expect to come next.
651- // If it's not the chunk that actually comes next we just throw it out.
652- // This means if the filesystem takes a while for the recording to reach previous total + CHUNK_SIZE, which is the common case, we aren't just doing nothing.
653- api:: upload_multipart_presign_part( & app, & video_id, & upload_id, expected_part_number, None )
654- ) . await else {
655- break ;
656- } ;
657-
658- let item = item. map_err( |err| format!( "uploader/part/{:?}/fs: {err:?}" , prev_part_number. map( |p| p + 1 ) ) ) ?;
659-
660- ( item, presigned_url?, None )
661- } ;
662-
663- let Chunk { total_size, part_number, chunk } = item;
664- trace!( "Uploading chunk {part_number} ({} bytes) for video {video_id:?}" , chunk. len( ) ) ;
665- prev_part_number = Some ( part_number) ;
666- let size = chunk. len( ) ;
667-
668- // We prefetched for the wrong chunk. Let's try again.
669- if expected_part_number != part_number {
670- presigned_url = api:: upload_multipart_presign_part( & app, & video_id, & upload_id, part_number, md5_sum. as_deref( ) )
671- . await ?
672- }
673-
674- trace!( "Uploading part {part_number}" ) ;
675-
676- let url = Uri :: from_str( & presigned_url) . map_err( |err| format!( "uploader/part/{part_number}/invalid_url: {err:?}" ) ) ?;
677- let mut req = retryable_client( url. host( ) . unwrap_or( "<unknown>" ) . to_string( ) )
678- . build( )
679- . map_err( |err| format!( "uploader/part/{part_number}/client: {err:?}" ) ) ?
680- . put( & presigned_url)
681- . header( "Content-Length" , chunk. len( ) )
682- . timeout( Duration :: from_secs( 5 * 60 ) ) . body( chunk) ;
683-
684- if let Some ( md5_sum) = & md5_sum {
685- req = req. header( "Content-MD5" , md5_sum) ;
686- }
687-
688- let resp = req
689- . send( )
690- . instrument( info_span!( "send" , size = size) )
691- . await
692- . map_err( |err| format!( "uploader/part/{part_number}/error: {err:?}" ) ) ?;
693-
694- let etag = resp. headers( ) . get( "ETag" ) . as_ref( ) . and_then( |etag| etag. to_str( ) . ok( ) ) . map( |v| v. trim_matches( '"' ) . to_string( ) ) ;
695-
696- match !resp. status( ) . is_success( ) {
697- true => Err ( format!( "uploader/part/{part_number}/error: {}" , resp. text( ) . await . unwrap_or_default( ) ) ) ,
698- false => Ok ( ( ) ) ,
699- } ?;
653+ // We generate the presigned URL ahead of time for the part we expect to come next.
654+ // If it's not the chunk that actually comes next we just throw it out.
655+ // This means that while the filesystem takes a while for the recording to reach previous total + MIN_CHUNK_SIZE, which is the common case, we aren't sitting idle.
656+ api:: upload_multipart_presign_part (
657+ & app,
658+ & video_id,
659+ & upload_id,
660+ expected_part_number,
661+ None ,
662+ )
663+ . await
664+ . map ( Some )
665+ } )
666+ . await
667+ else {
668+ return None ;
669+ } ;
700670
701- trace!( "Completed upload of part {part_number}" ) ;
671+ let part_number = item
672+ . as_ref ( )
673+ . map ( |c| c. part_number . to_string ( ) )
674+ . unwrap_or_else ( |_| "--" . into ( ) ) ;
675+
676+ Some ( (
677+ async move {
678+ let Chunk {
679+ total_size,
680+ part_number,
681+ chunk,
682+ } = item. map_err ( |err| {
683+ format ! ( "uploader/part/{:?}/fs: {err:?}" , expected_part_number)
684+ } ) ?;
685+ trace ! (
686+ "Uploading chunk {part_number} ({} bytes) for video {video_id:?}" ,
687+ chunk. len( )
688+ ) ;
689+
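690+ // Pick the presigned URL: use the prefetched one if it was signed for this part number, otherwise the cached first-chunk URL if it is still fresh, otherwise presign again now that we know the actual part number.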
691+ let md5_sum =
692+ use_md5_hashes. then ( || base64:: encode ( md5:: compute ( & chunk) . 0 ) ) ;
693+ let presigned_url = if let Some ( url) = presigned_url?
694+ && part_number == expected_part_number
695+ {
696+ url
697+ } else if part_number == 1
698+ && !use_md5_hashes
699+ // We have a presigned URL left around from the first chunk
700+ && let Some ( ( url, expiry) ) = first_chunk_presigned_url
701+ . lock ( )
702+ . unwrap_or_else ( PoisonError :: into_inner)
703+ . clone ( )
704+ // The URL hasn't expired
705+ && expiry. elapsed ( ) < Duration :: from_secs ( 60 * 50 )
706+ {
707+ url
708+ } else {
709+ api:: upload_multipart_presign_part (
710+ & app,
711+ & video_id,
712+ & upload_id,
713+ part_number,
714+ md5_sum. as_deref ( ) ,
715+ )
716+ . await ?
717+ } ;
718+
719+ // We cache the presigned URL for the first chunk,
720+ // as for instant mode we upload the first chunk at the end again to include the updated video metadata.
721+ if part_number == 1 {
722+ * first_chunk_presigned_url
723+ . lock ( )
724+ . unwrap_or_else ( PoisonError :: into_inner) =
725+ Some ( ( presigned_url. clone ( ) , Instant :: now ( ) ) ) ;
726+ }
702727
703- yield UploadedPart {
704- etag: etag. ok_or_else( || format!( "uploader/part/{part_number}/error: ETag header not found" ) ) ?,
705- part_number,
706- size,
707- total_size
708- } ;
728+ let size = chunk. len ( ) ;
729+ let url = Uri :: from_str ( & presigned_url) . map_err ( |err| {
730+ format ! ( "uploader/part/{part_number}/invalid_url: {err:?}" )
731+ } ) ?;
732+ let mut req =
733+ retryable_client ( url. host ( ) . unwrap_or ( "<unknown>" ) . to_string ( ) )
734+ . build ( )
735+ . map_err ( |err| {
736+ format ! ( "uploader/part/{part_number}/client: {err:?}" )
737+ } ) ?
738+ . put ( & presigned_url)
739+ . header ( "Content-Length" , chunk. len ( ) )
740+ . timeout ( Duration :: from_secs ( 5 * 60 ) )
741+ . body ( chunk) ;
742+
743+ if let Some ( md5_sum) = & md5_sum {
744+ req = req. header ( "Content-MD5" , md5_sum) ;
745+ }
709746
710- expected_part_number = part_number + 1 ;
711- }
747+ let resp = req
748+ . send ( )
749+ . instrument ( info_span ! ( "s3_put" , size = size) )
750+ . await
751+ . map_err ( |err| {
752+ format ! ( "uploader/part/{part_number}/error: {err:?}" )
753+ } ) ?;
754+
755+ let etag = resp
756+ . headers ( )
757+ . get ( "ETag" )
758+ . as_ref ( )
759+ . and_then ( |etag| etag. to_str ( ) . ok ( ) )
760+ . map ( |v| v. trim_matches ( '"' ) . to_string ( ) ) ;
761+
762+ match !resp. status ( ) . is_success ( ) {
763+ true => Err ( format ! (
764+ "uploader/part/{part_number}/error: {}" ,
765+ resp. text( ) . await . unwrap_or_default( )
766+ ) ) ,
767+ false => Ok ( ( ) ) ,
768+ } ?;
769+
770+ trace ! ( "Completed upload of part {part_number}" ) ;
771+
772+ Ok :: < _ , AuthedApiError > ( UploadedPart {
773+ etag : etag. ok_or_else ( || {
774+ format ! (
775+ "uploader/part/{part_number}/error: ETag header not found"
776+ )
777+ } ) ?,
778+ part_number,
779+ size,
780+ total_size,
781+ } )
782+ }
783+ . instrument ( info_span ! ( "upload_part" , part_number = part_number) ) ,
784+ ( stream, expected_part_number + 1 ) ,
785+ ) )
786+ }
787+ } ,
788+ )
789+ . buffered ( MAX_CONCURRENT_UPLOADS )
790+ . boxed ( )
791+ } )
792+ . chain ( stream:: once ( async move {
793+ debug ! (
794+ "Completed multipart upload for {video_id2:?} in {:?}" ,
795+ start. elapsed( )
796+ ) ;
712797
713- debug!( "Completed multipart upload for {video_id:?} in {:?}" , start. elapsed( ) ) ;
714- }
798+ stream:: empty ( ) . boxed ( )
799+ } ) )
800+ . flatten ( )
715801 . instrument ( Span :: current ( ) )
716802}
717803
0 commit comments