@@ -106,6 +106,12 @@ async fn upload_single_parquet_file(
106106 . to_str ( )
107107 . expect ( "filename is valid string" ) ;
108108
109+ // Get the local file size for validation
110+ let local_file_size = path
111+ . metadata ( )
112+ . map_err ( |e| ObjectStorageError :: Custom ( format ! ( "Failed to get local file metadata: {e}" ) ) ) ?
113+ . len ( ) ;
114+
109115 // Upload the file
110116 store
111117 . upload_multipart ( & RelativePathBuf :: from ( & stream_relative_path) , & path)
@@ -115,6 +121,27 @@ async fn upload_single_parquet_file(
115121 ObjectStorageError :: Custom ( format ! ( "Failed to upload {filename}: {e}" ) )
116122 } ) ?;
117123
124+ // Validate the uploaded file size matches local file
125+ let upload_is_valid = validate_uploaded_parquet_file (
126+ & store,
127+ & stream_relative_path,
128+ local_file_size,
129+ & stream_name,
130+ )
131+ . await ?;
132+
133+ if !upload_is_valid {
134+ // Upload validation failed, clean up the uploaded file and return error
135+ let _ = store
136+ . delete_object ( & RelativePathBuf :: from ( & stream_relative_path) )
137+ . await ;
138+ error ! ( "Upload size validation failed for file {filename:?}, deleted from object storage" ) ;
139+ return Ok ( UploadResult {
140+ file_path : path,
141+ manifest_file : None , // No manifest entry: the caller keeps the staged file for a later retry
142+ } ) ;
143+ }
144+
118145 // Update storage metrics
119146 update_storage_metrics ( & path, & stream_name, filename) ?;
120147
@@ -177,6 +204,44 @@ async fn calculate_stats_if_enabled(
177204 }
178205}
179206
207+ /// Validates that a parquet file uploaded to object storage matches the staging file size
208+ async fn validate_uploaded_parquet_file (
209+ store : & Arc < dyn ObjectStorage > ,
210+ stream_relative_path : & str ,
211+ expected_size : u64 ,
212+ stream_name : & str ,
213+ ) -> Result < bool , ObjectStorageError > {
214+ // Verify the file exists and has the expected size
215+ match store
216+ . head ( & RelativePathBuf :: from ( stream_relative_path) )
217+ . await
218+ {
219+ Ok ( metadata) => {
220+ if metadata. size as u64 != expected_size {
221+ warn ! (
222+ "Uploaded file size mismatch for stream {}: expected {}, got {}" ,
223+ stream_name, expected_size, metadata. size
224+ ) ;
225+ Ok ( false )
226+ } else {
227+ tracing:: trace!(
228+ "Uploaded parquet file size validated successfully for stream {}, size: {}" ,
229+ stream_name,
230+ expected_size
231+ ) ;
232+ Ok ( true )
233+ }
234+ }
235+ Err ( e) => {
236+ error ! (
237+ "Failed to get metadata for uploaded file in stream {}: {e}" ,
238+ stream_name
239+ ) ;
240+ Ok ( false )
241+ }
242+ }
243+ }
244+
180245pub trait ObjectStorageProvider : StorageMetrics + std:: fmt:: Debug + Send + Sync {
181246 fn get_datafusion_runtime ( & self ) -> RuntimeEnvBuilder ;
182247 fn construct_client ( & self ) -> Arc < dyn ObjectStorage > ;
@@ -880,14 +945,15 @@ async fn collect_upload_results(
880945 if let Some ( manifest_file) = upload_result. manifest_file {
881946 uploaded_files. push ( ( upload_result. file_path , manifest_file) ) ;
882947 } else {
883- // File failed to upload, clean up
884- if let Err ( e) = remove_file ( upload_result. file_path ) {
885- warn ! ( "Failed to remove staged file: {e}" ) ;
886- }
948+ // File failed in upload size validation, preserve staging file for retry
949+ error ! (
950+ "Parquet file upload size validation failed for {:?}, preserving in staging for retry" ,
951+ upload_result. file_path
952+ ) ;
887953 }
888954 }
889955 Ok ( Err ( e) ) => {
890- error ! ( "Error processing parquet file: {e}" ) ;
956+ error ! ( "Error uploading parquet file: {e}" ) ;
891957 return Err ( e) ;
892958 }
893959 Err ( e) => {
0 commit comments