Skip to content

Commit

Permalink
Circumvent ListingTable regression in the upload endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
gruuya committed Nov 18, 2024
1 parent 6aa4e76 commit 9259bf1
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 22 deletions.
41 changes: 24 additions & 17 deletions src/frontend/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use percent_encoding::{percent_decode_str, utf8_percent_encode, NON_ALPHANUMERIC
use serde::Deserialize;
use serde_json::json;
use sha2::{Digest, Sha256};

use tempfile::NamedTempFile;
use tracing::{debug, info, warn};
use warp::http::HeaderValue;
use warp::log::Info;
Expand Down Expand Up @@ -397,9 +397,9 @@ pub async fn upload(
let mut has_header = true;
let mut schema: Option<SchemaRef> = None;
let mut filename = String::new();
let ref_temp_file = context.inner.runtime_env().disk_manager.create_tmp_file(
format!("Creating a target file to append to {database_name}.{schema_name}.{table_name}").as_str(),
)?;
let mut file_type = String::new();
let mut temp_path = String::new();
let mut temp_file: NamedTempFile;
while let Some(maybe_part) = form.next().await {
let mut part = maybe_part.map_err(ApiError::UploadBodyLoadError)?;

Expand All @@ -426,34 +426,41 @@ pub async fn upload(
.ok_or(ApiError::UploadMissingFile)?
.to_string();

if filename.is_empty() {
return Err(ApiError::UploadMissingFile);
}

file_type = filename
.split('.')
.last()
.ok_or_else(|| {
ApiError::UploadMissingFilenameExtension(filename.clone())
})?
.to_string();

temp_file = tempfile::Builder::new()
.suffix(&format!(".{file_type}"))
.tempfile()?;
temp_path = temp_file.path().display().to_string();

// Write out the incoming bytes into the temporary file
while let Some(maybe_bytes) = part.data().await {
ref_temp_file.inner().write_all(
temp_file.write_all(
maybe_bytes.map_err(ApiError::UploadBodyLoadError)?.chunk(),
)?;
}
}
}

if filename.is_empty() {
return Err(ApiError::UploadMissingFile);
}

let file_type = filename
.split('.')
.last()
.ok_or_else(|| ApiError::UploadMissingFilenameExtension(filename.clone()))?;

if file_type != "csv" && file_type != "parquet" {
return Err(ApiError::UploadUnsupportedFileFormat(filename));
};

// Execute the plan and persist objects as well as table/partition metadata
let temp_path = ref_temp_file.path();
let table = context
.file_to_table(
temp_path.display().to_string(),
file_type,
temp_path,
&file_type,
schema,
has_header,
schema_name.clone(),
Expand Down
4 changes: 2 additions & 2 deletions src/sync/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -975,9 +975,9 @@ mod tests {
}
});
let expected_plan = vec![
"ProjectionExec: expr=[CASE WHEN __upsert_col@5 IS NULL THEN c1@0 ELSE new_pk_c1@3 END as c1, CASE WHEN __upsert_col@5 IS NULL THEN c2@1 ELSE value_c2@4 END as c2]",
"ProjectionExec: expr=[CASE WHEN __upsert_col@4 IS NULL THEN c1@0 ELSE new_pk_c1@2 END as c1, CASE WHEN __upsert_col@4 IS NULL THEN c2@1 ELSE value_c2@3 END as c2]",
" CoalesceBatchesExec: target_batch_size=8192",
" FilterExec: __upsert_col@5 IS DISTINCT FROM false projection=[c1@0, c2@1, new_pk_c1@3, value_c2@4, __upsert_col@5]",
" FilterExec: __upsert_col@5 IS DISTINCT FROM false, projection=[c1@0, c2@1, new_pk_c1@3, value_c2@4, __upsert_col@5]",
" CoalesceBatchesExec: target_batch_size=8192",
" HashJoinExec: mode=Partitioned, join_type=Full, on=[(c1@0, new_pk_c1@0)]",
" CoalesceBatchesExec: target_batch_size=8192",
Expand Down
5 changes: 2 additions & 3 deletions tests/http/upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,9 +337,8 @@ async fn test_upload_to_existing_table() {

dbg!("Upload status is {}", status);

assert_eq!(
"Error during planning: Cannot cast file schema field col_4 of type Boolean to table schema field of type Timestamp(Microsecond, None)".to_string(),
String::from_utf8(output.stdout).unwrap()
assert!(
String::from_utf8(output.stdout).unwrap().contains("Error during planning: Cannot cast file schema field col_4 of type Boolean to table schema field of type Timestamp(Microsecond, None)")
);

terminate.send(()).unwrap();
Expand Down

0 comments on commit 9259bf1

Please sign in to comment.