@@ -496,7 +496,6 @@ pub fn from_pending_file_to_chunks(
496496 }
497497 }
498498
499- // Get current file size - reuse file handle for metadata
500499 let file_size = match file. metadata( ) . await {
501500 Ok ( metadata) => metadata. len( ) ,
502501 Err ( _) => {
@@ -560,7 +559,6 @@ pub fn from_pending_file_to_chunks(
560559 }
561560
562561 if total_read > 0 {
563- // Re-emit first chunk with part_number 1 to fix MP4 header
564562 yield Chunk {
565563 total_size: file_size,
566564 part_number: 1 ,
@@ -570,13 +568,31 @@ pub fn from_pending_file_to_chunks(
570568 }
571569 break ;
572570 } else {
573- // Reduced polling interval for better responsiveness
574571 tokio:: time:: sleep( Duration :: from_millis( 100 ) ) . await ;
575572 }
576573 }
577574 }
578575}
579576
577+ fn retryable_client ( host : & str ) -> reqwest:: ClientBuilder {
578+ reqwest:: Client :: builder ( ) . retry (
579+ reqwest:: retry:: for_host ( host)
580+ . classify_fn ( |req_rep| {
581+ match req_rep. status ( ) {
582+ // Server errors
583+ Some ( s) if s. is_server_error ( ) || s == StatusCode :: TOO_MANY_REQUESTS => {
584+ req_rep. retryable ( )
585+ }
586+ // Network errors
587+ None => req_rep. retryable ( ) ,
588+ _ => req_rep. success ( ) ,
589+ }
590+ } )
591+ . max_retries_per_request ( 5 )
592+ . max_extra_load ( 5.0 ) ,
593+ )
594+ }
595+
580596/// Takes an incoming stream of bytes and individually uploads them to S3.
581597///
582598/// Note: It's on the caller to ensure the chunks are sized correctly within S3 limits.
@@ -603,14 +619,7 @@ fn multipart_uploader(
603619 . await ?;
604620
605621 let url = Uri :: from_str( & presigned_url) . map_err( |err| format!( "uploader/part/{part_number}/invalid_url: {err:?}" ) ) ?;
606- let resp = reqwest:: Client :: builder( )
607- . retry( reqwest:: retry:: for_host( url. host( ) . unwrap_or( "<unknown>" ) . to_string( ) ) . classify_fn( |req_rep| {
608- if req_rep. status( ) . is_some_and( |s| s. is_server_error( ) ) {
609- req_rep. retryable( )
610- } else {
611- req_rep. success( )
612- }
613- } ) )
622+ let resp = retryable_client( & url. host( ) . unwrap_or( "<unknown>" ) . to_string( ) )
614623 . build( )
615624 . map_err( |err| format!( "uploader/part/{part_number}/client: {err:?}" ) ) ?
616625 . put( & presigned_url)
@@ -650,19 +659,7 @@ pub async fn singlepart_uploader(
650659
651660 let url = Uri :: from_str ( & presigned_url)
652661 . map_err ( |err| format ! ( "singlepart_uploader/invalid_url: {err:?}" ) ) ?;
653- let resp = reqwest:: Client :: builder ( )
654- . retry (
655- reqwest:: retry:: for_host ( url. host ( ) . unwrap_or ( "<unknown>" ) . to_string ( ) )
656- . classify_fn ( |req_rep| {
657- if req_rep. status ( ) . is_some_and ( |s| s. is_server_error ( ) ) {
658- req_rep. retryable ( )
659- } else {
660- req_rep. success ( )
661- }
662- } )
663- . max_retries_per_request ( 5 )
664- . max_extra_load ( 5.0 ) ,
665- )
662+ let resp = retryable_client ( & url. host ( ) . unwrap_or ( "<unknown>" ) . to_string ( ) )
666663 . build ( )
667664 . map_err ( |err| format ! ( "singlepart_uploader/client: {err:?}" ) ) ?
668665 . put ( & presigned_url)
0 commit comments