From 9259bf1464e6bbdb0b603155cfa59b0b1baeee04 Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Mon, 18 Nov 2024 08:01:54 +0100 Subject: [PATCH] Circumvent ListingTable regression in the upload endpoint --- src/frontend/http.rs | 41 ++++++++++++++++++++++++----------------- src/sync/planner.rs | 4 ++-- tests/http/upload.rs | 5 ++--- 3 files changed, 28 insertions(+), 22 deletions(-) diff --git a/src/frontend/http.rs b/src/frontend/http.rs index e9231264..b55e6193 100644 --- a/src/frontend/http.rs +++ b/src/frontend/http.rs @@ -28,7 +28,7 @@ use percent_encoding::{percent_decode_str, utf8_percent_encode, NON_ALPHANUMERIC use serde::Deserialize; use serde_json::json; use sha2::{Digest, Sha256}; - +use tempfile::NamedTempFile; use tracing::{debug, info, warn}; use warp::http::HeaderValue; use warp::log::Info; @@ -397,9 +397,9 @@ pub async fn upload( let mut has_header = true; let mut schema: Option = None; let mut filename = String::new(); - let ref_temp_file = context.inner.runtime_env().disk_manager.create_tmp_file( - format!("Creating a target file to append to {database_name}.{schema_name}.{table_name}").as_str(), - )?; + let mut file_type = String::new(); + let mut temp_path = String::new(); + let mut temp_file: NamedTempFile; while let Some(maybe_part) = form.next().await { let mut part = maybe_part.map_err(ApiError::UploadBodyLoadError)?; @@ -426,34 +426,41 @@ pub async fn upload( .ok_or(ApiError::UploadMissingFile)? .to_string(); + if filename.is_empty() { + return Err(ApiError::UploadMissingFile); + } + + file_type = filename + .split('.') + .last() + .ok_or_else(|| { + ApiError::UploadMissingFilenameExtension(filename.clone()) + })? + .to_string(); + + temp_file = tempfile::Builder::new() + .suffix(&format!(".{file_type}")) + .tempfile()?; + temp_path = temp_file.path().display().to_string(); + // Write out the incoming bytes into the temporary file while let Some(maybe_bytes) = part.data().await { - ref_temp_file.inner().write_all( + temp_file.write_all( maybe_bytes.map_err(ApiError::UploadBodyLoadError)?.chunk(), )?; } } } - if filename.is_empty() { - return Err(ApiError::UploadMissingFile); - } - - let file_type = filename - .split('.') - .last() - .ok_or_else(|| ApiError::UploadMissingFilenameExtension(filename.clone()))?; - if file_type != "csv" && file_type != "parquet" { return Err(ApiError::UploadUnsupportedFileFormat(filename)); }; // Execute the plan and persist objects as well as table/partition metadata - let temp_path = ref_temp_file.path(); let table = context .file_to_table( - temp_path.display().to_string(), - file_type, + temp_path, + &file_type, schema, has_header, schema_name.clone(), diff --git a/src/sync/planner.rs b/src/sync/planner.rs index 49171497..5f0b823c 100644 --- a/src/sync/planner.rs +++ b/src/sync/planner.rs @@ -975,9 +975,9 @@ mod tests { } }); let expected_plan = vec![ - "ProjectionExec: expr=[CASE WHEN __upsert_col@5 IS NULL THEN c1@0 ELSE new_pk_c1@3 END as c1, CASE WHEN __upsert_col@5 IS NULL THEN c2@1 ELSE value_c2@4 END as c2]", + "ProjectionExec: expr=[CASE WHEN __upsert_col@4 IS NULL THEN c1@0 ELSE new_pk_c1@2 END as c1, CASE WHEN __upsert_col@4 IS NULL THEN c2@1 ELSE value_c2@3 END as c2]", " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: __upsert_col@5 IS DISTINCT FROM false projection=[c1@0, c2@1, new_pk_c1@3, value_c2@4, __upsert_col@5]", + " FilterExec: __upsert_col@5 IS DISTINCT FROM false, projection=[c1@0, c2@1, new_pk_c1@3, value_c2@4, __upsert_col@5]", " CoalesceBatchesExec: target_batch_size=8192", " HashJoinExec: mode=Partitioned, join_type=Full, on=[(c1@0, new_pk_c1@0)]", " CoalesceBatchesExec: target_batch_size=8192", diff --git a/tests/http/upload.rs b/tests/http/upload.rs index 23f2dfb1..172d2c61 100644 --- a/tests/http/upload.rs +++ b/tests/http/upload.rs @@ -337,9 +337,8 @@ async fn test_upload_to_existing_table() { dbg!("Upload status is {}", status); - assert_eq!( - "Error during planning: Cannot cast file schema field col_4 of type Boolean to table schema field of type Timestamp(Microsecond, None)".to_string(), - String::from_utf8(output.stdout).unwrap() + assert!( + String::from_utf8(output.stdout).unwrap().contains("Error during planning: Cannot cast file schema field col_4 of type Boolean to table schema field of type Timestamp(Microsecond, None)") ); terminate.send(()).unwrap();