From 1d0eed7f1585942d93c2ff335e36e811bf9a4442 Mon Sep 17 00:00:00 2001 From: universalmind303 Date: Tue, 16 Apr 2024 13:34:02 -0500 Subject: [PATCH 1/5] feat: read_blob --- Cargo.lock | 1 + Cargo.toml | 1 + bindings/nodejs/Cargo.toml | 3 +- crates/bytesutil/Cargo.toml | 2 +- crates/datasources/Cargo.toml | 2 +- crates/ioutil/Cargo.toml | 2 +- crates/metastore/Cargo.toml | 2 +- crates/object_store_util/Cargo.toml | 2 +- crates/pgprototest/Cargo.toml | 2 +- crates/pgrepr/Cargo.toml | 2 +- crates/pgsrv/Cargo.toml | 2 +- crates/sqlbuiltins/Cargo.toml | 1 + crates/sqlbuiltins/src/functions/table/mod.rs | 3 + .../src/functions/table/object_store.rs | 10 +- .../src/functions/table/read_blob.rs | 363 ++++++++++++++++++ crates/sqlexec/Cargo.toml | 2 +- testdata/blob/hello.txt | 1 + .../sqllogictests/functions/read_blob.slt | 15 + .../gcs/read_blob.slt | 8 + 19 files changed, 408 insertions(+), 16 deletions(-) create mode 100644 crates/sqlbuiltins/src/functions/table/read_blob.rs create mode 100644 testdata/blob/hello.txt create mode 100644 testdata/sqllogictests/functions/read_blob.slt create mode 100644 testdata/sqllogictests_object_store/gcs/read_blob.slt diff --git a/Cargo.lock b/Cargo.lock index 9386375de..8545a07e1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8077,6 +8077,7 @@ dependencies = [ "arrow-cast", "async-openai", "async-trait", + "bytes", "catalog", "datafusion", "datafusion-functions-array", diff --git a/Cargo.toml b/Cargo.toml index 45aedd2a1..8226a2aeb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,6 +41,7 @@ tempfile = "3.10.1" thiserror = "1.0" tracing = "0.1" url = "2.5.0" +bytes = "1.6.0" [workspace.dependencies.deltalake] git = "https://github.com/GlareDB/delta-rs.git" diff --git a/bindings/nodejs/Cargo.toml b/bindings/nodejs/Cargo.toml index 2f0c0ca9c..851fe11a3 100644 --- a/bindings/nodejs/Cargo.toml +++ b/bindings/nodejs/Cargo.toml @@ -31,7 +31,6 @@ lzma-sys = { version = "*", features = ["static"] } # Prevent dynamic linking of napi = { version = "2.16.2", default-features = false, features = ["full"] } napi-derive = "2.16.2" once_cell = "1.19.0" -bytes = "1.6.0" - +bytes = { workspace = true } [build-dependencies] napi-build = "2.1.2" diff --git a/crates/bytesutil/Cargo.toml b/crates/bytesutil/Cargo.toml index 03d023c11..751c78e24 100644 --- a/crates/bytesutil/Cargo.toml +++ b/crates/bytesutil/Cargo.toml @@ -7,4 +7,4 @@ edition = {workspace = true} workspace = true [dependencies] -bytes = "1.6.0" +bytes = { workspace = true } \ No newline at end of file diff --git a/crates/datasources/Cargo.toml b/crates/datasources/Cargo.toml index 59f2ecd1b..be3ae061c 100644 --- a/crates/datasources/Cargo.toml +++ b/crates/datasources/Cargo.toml @@ -15,7 +15,7 @@ async-stream = "0.3.5" async-trait = { workspace = true } bigquery-storage = { git = "https://github.com/glaredb/bigquery-storage", branch = "deps/2023-10-27-update" } bitvec = "1" -bytes = "1.6.0" +bytes = { workspace = true } chrono = { workspace = true } chrono-tz = "0.8.6" datafusion = { workspace = true } diff --git a/crates/ioutil/Cargo.toml b/crates/ioutil/Cargo.toml index 2bc312c52..5dc2506a8 100644 --- a/crates/ioutil/Cargo.toml +++ b/crates/ioutil/Cargo.toml @@ -7,5 +7,5 @@ edition = {workspace = true} workspace = true [dependencies] -bytes = "1.6.0" +bytes = { workspace = true } home = "0.5.9" diff --git a/crates/metastore/Cargo.toml b/crates/metastore/Cargo.toml index 9ba40d3d0..1344495c4 100644 --- a/crates/metastore/Cargo.toml +++ b/crates/metastore/Cargo.toml @@ -24,7 +24,7 @@ tokio = { workspace = true } tonic = { workspace = true } tracing = { workspace = true } uuid = { version = "1.8.0", features = ["v4", "fast-rng", "macro-diagnostics"] } -bytes = "1.6" +bytes = { workspace = true } once_cell = "1.19.0" tower = "0.4" dashmap = "5.5.0" diff --git a/crates/object_store_util/Cargo.toml b/crates/object_store_util/Cargo.toml index 9f0a0769a..39b0ccbbe 100644 --- a/crates/object_store_util/Cargo.toml +++ b/crates/object_store_util/Cargo.toml @@ -15,6 +15,6 @@ thiserror = { workspace = true } tracing = { workspace = true } tokio = { workspace = true } tempfile = "3" -bytes = "1.6.0" +bytes = { workspace = true } moka = { version = "0.12.5", features = ["future"] } uuid = { version = "1.8.0", features = ["v4", "fast-rng", "macro-diagnostics"] } diff --git a/crates/pgprototest/Cargo.toml b/crates/pgprototest/Cargo.toml index ea007b383..988f920eb 100644 --- a/crates/pgprototest/Cargo.toml +++ b/crates/pgprototest/Cargo.toml @@ -13,5 +13,5 @@ clap = { workspace = true } anyhow = { workspace = true } datadriven = "0.6.0" postgres-protocol = "0.6.5" -bytes = "1.6.0" +bytes = { workspace = true } fallible-iterator = "0.2.0" diff --git a/crates/pgrepr/Cargo.toml b/crates/pgrepr/Cargo.toml index 1b98e9991..e9bc79cd1 100644 --- a/crates/pgrepr/Cargo.toml +++ b/crates/pgrepr/Cargo.toml @@ -17,6 +17,6 @@ decimal = { path = "../decimal" } num-traits = "0.2.18" dtoa = "1.0.9" chrono-tz = "0.8.6" -bytes = "1.6.0" +bytes = { workspace = true } const_format = "0.2.32" once_cell = "1.19.0" diff --git a/crates/pgsrv/Cargo.toml b/crates/pgsrv/Cargo.toml index 3d8cb40d4..690a5cd5b 100644 --- a/crates/pgsrv/Cargo.toml +++ b/crates/pgsrv/Cargo.toml @@ -21,7 +21,7 @@ bytesutil = { path = "../bytesutil" } parser = { path = "../parser" } pgrepr = { path = "../pgrepr" } datafusion_ext = { path = "../datafusion_ext" } -bytes = "1.6.0" +bytes = { workspace = true } rustls = "0.21.10" webpki-roots = "0.26.1" tokio-rustls = "0.24.1" diff --git a/crates/sqlbuiltins/Cargo.toml b/crates/sqlbuiltins/Cargo.toml index eaeea430c..43eb3ae2a 100644 --- a/crates/sqlbuiltins/Cargo.toml +++ b/crates/sqlbuiltins/Cargo.toml @@ -26,6 +26,7 @@ tracing = { workspace = true } tempfile = { workspace = true } tokio = { workspace = true } reqwest = { workspace = true } +bytes = { workspace = true } async-openai = "0.20.0" once_cell = "1.19.0" num-traits = "0.2.18" diff --git a/crates/sqlbuiltins/src/functions/table/mod.rs b/crates/sqlbuiltins/src/functions/table/mod.rs index ac7b3d291..add8e30c0 100644 --- a/crates/sqlbuiltins/src/functions/table/mod.rs +++ b/crates/sqlbuiltins/src/functions/table/mod.rs @@ -14,6 +14,7 @@ mod mysql; mod object_store; mod parquet_metadata; mod postgres; +mod read_blob; mod snowflake; mod sqlite; @@ -52,6 +53,7 @@ use self::mysql::ReadMysql; use self::object_store::{CloudUpload, READ_CSV, READ_JSON, READ_PARQUET}; use self::parquet_metadata::ParquetMetadataFunc; use self::postgres::ReadPostgres; +use self::read_blob::READ_BLOB; use self::snowflake::ReadSnowflake; use self::sqlite::ReadSqlite; use self::sqlserver::ReadSqlServer; @@ -101,6 +103,7 @@ impl BuiltinTableFuncs { Arc::new(ReadSqlServer), Arc::new(ReadCassandra), // Object store + Arc::new(READ_BLOB), Arc::new(READ_PARQUET), Arc::new(READ_CSV), Arc::new(READ_JSON), diff --git a/crates/sqlbuiltins/src/functions/table/object_store.rs b/crates/sqlbuiltins/src/functions/table/object_store.rs index b62decdb5..a61e5c53c 100644 --- a/crates/sqlbuiltins/src/functions/table/object_store.rs +++ b/crates/sqlbuiltins/src/functions/table/object_store.rs @@ -170,15 +170,15 @@ impl WithCompression for ParquetFormat { #[derive(Debug, Clone)] pub struct ObjScanTableFunc { /// Primary name for the function. - name: &'static str, + pub(super) name: &'static str, /// Additional aliases for this function. - aliases: &'static [&'static str], + pub(super) aliases: &'static [&'static str], - description: &'static str, - example: &'static str, + pub(super) description: &'static str, + pub(super) example: &'static str, - phantom: PhantomData, + pub(super) phantom: PhantomData, } impl BuiltinFunction for ObjScanTableFunc { diff --git a/crates/sqlbuiltins/src/functions/table/read_blob.rs b/crates/sqlbuiltins/src/functions/table/read_blob.rs new file mode 100644 index 000000000..b5d93bfbb --- /dev/null +++ b/crates/sqlbuiltins/src/functions/table/read_blob.rs @@ -0,0 +1,363 @@ +use std::any::Any; +use std::collections::HashMap; +use std::io::{Read, Seek, SeekFrom}; +use std::marker::PhantomData; +use std::sync::Arc; +use std::vec; + +use async_trait::async_trait; +use datafusion::arrow::array::{ + ArrayRef, + BinaryArray, + Int64Array, + RecordBatch, + StringArray, + TimestampNanosecondArray, +}; +use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit}; +use datafusion::common::{FileType, Statistics}; +use datafusion::datasource::file_format::file_compression_type::FileCompressionType; +use datafusion::datasource::file_format::FileFormat; +use datafusion::datasource::physical_plan::{FileOpener, FileScanConfig, FileStream}; +use datafusion::error::{DataFusionError, Result as DatafusionResult}; +use datafusion::execution::context::SessionState; +use datafusion::execution::SendableRecordBatchStream; +use datafusion::physical_expr::{EquivalenceProperties, LexOrdering, PhysicalSortExpr}; +use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; +use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PhysicalExpr}; +use datafusion_ext::errors::Result; +use datafusion_ext::functions::FuncParamValue; +use futures::{StreamExt, TryStreamExt}; +use object_store::{collect_bytes, GetOptions, ObjectMeta, ObjectStore}; +use once_cell::sync::Lazy; + +use super::object_store::{ObjScanTableFunc, OptionReader, WithCompression}; + +pub const READ_BLOB: ObjScanTableFunc = ObjScanTableFunc { + name: "read_blob", + aliases: &["read_binary"], + description: "reads from the selected source(s) to a binary blob.", + example: "SELECT size, content, filename FROM read_blob('./README.md')", + phantom: PhantomData, +}; + +#[derive(Debug, Clone, Copy)] +pub struct BlobOptionsReader; + +#[derive(Debug, Clone, Copy)] +pub struct BlobFormat { + file_compression_type: FileCompressionType, +} + +impl Default for BlobFormat { + fn default() -> Self { + Self { + file_compression_type: FileCompressionType::UNCOMPRESSED, + } + } +} + +impl BlobFormat { + /// Set a `FileCompressionType` of JSON + /// - defaults to `FileCompressionType::UNCOMPRESSED` + pub fn with_file_compression_type( + mut self, + file_compression_type: FileCompressionType, + ) -> Self { + self.file_compression_type = file_compression_type; + self + } +} + +static BLOB_SCHEMA: Lazy = Lazy::new(|| { + Arc::new(Schema::new(vec![ + Field::new("filename", DataType::Utf8, true), + Field::new("content", DataType::Binary, true), + Field::new("size", DataType::Int64, true), + Field::new( + "last_modified", + DataType::Timestamp(TimeUnit::Nanosecond, None), + true, + ), + ])) +}); + +#[derive(Debug)] +struct ReadBlobExec { + base_config: FileScanConfig, + file_compression_type: FileCompressionType, + projected_schema: SchemaRef, + projected_output_ordering: Vec, + projected_statistics: Statistics, + metrics: ExecutionPlanMetricsSet, +} + +impl ReadBlobExec { + pub fn new(base_config: FileScanConfig, file_compression_type: FileCompressionType) -> Self { + let (projected_schema, projected_statistics, projected_output_ordering) = + base_config.project(); + + Self { + base_config, + file_compression_type, + projected_schema, + projected_output_ordering, + projected_statistics, + metrics: ExecutionPlanMetricsSet::new(), + } + } +} + +struct BlobOpener { + object_store: Arc, + projected_schema: SchemaRef, + file_compression_type: FileCompressionType, +} + +impl BlobOpener { + pub fn new( + object_store: Arc, + projected_schema: SchemaRef, + file_compression_type: FileCompressionType, + ) -> Self { + Self { + object_store, + projected_schema, + file_compression_type, + } + } +} + +impl FileOpener for BlobOpener { + fn open( + &self, + file_meta: datafusion::datasource::physical_plan::FileMeta, + ) -> DatafusionResult { + let store = self.object_store.clone(); + let schema = self.projected_schema.clone(); + let file_compression_type = self.file_compression_type; + + Ok(Box::pin(async move { + let options = GetOptions::default(); + let result = store.get_opts(file_meta.location(), options).await?; + + // manually add columns based on the projected schema + let mut columns = Vec::new(); + + let mut size = schema + .column_with_name("size") + .map(|_| Arc::new(Int64Array::from(vec![result.meta.size as i64])) as ArrayRef); + + let mut last_modified = schema.column_with_name("last_modified").map(|_| { + Arc::new(TimestampNanosecondArray::from_vec( + vec![result.meta.last_modified.timestamp_nanos()], + None, + )) as ArrayRef + }); + + let mut filename = schema.column_with_name("filename").map(|_| { + Arc::new(StringArray::from(vec![result.meta.location.to_string()])) as ArrayRef + }); + + let mut content = match schema.column_with_name("content") { + Some(_) => { + let len = result.range.end - result.range.start; + match result.payload { + object_store::GetResultPayload::File(mut file, _) => { + let mut bytes = match file_meta.range { + None => file_compression_type.convert_read(file)?, + Some(_) => { + file.seek(SeekFrom::Start(result.range.start as _))?; + let limit = result.range.end - result.range.start; + file_compression_type.convert_read(file.take(limit as u64))? + } + }; + let mut data = Vec::new(); + bytes.read_to_end(&mut data)?; + + Some(Arc::new(BinaryArray::from_vec(vec![&data])) as ArrayRef) + } + object_store::GetResultPayload::Stream(s) => { + let s = s.map_err(DataFusionError::from); + + let s = file_compression_type.convert_stream(s.boxed())?.fuse(); + let bytes = collect_bytes(s, Some(len)).await?; + Some(Arc::new(BinaryArray::from_vec(vec![&bytes])) as ArrayRef) + } + } + } + None => None, + }; + + for field in schema.fields() { + // we need to do this hacky option take because of the borrow checker. + // these matches should only ever match once, but the borrow checker doesn't know that. + // so we just wrap the value in an option to ensure it's only taken once. + match field.name().as_str() { + "size" => { + if let Some(size) = size.take() { + columns.push(size); + } + } + "last_modified" => { + if let Some(last_modified) = last_modified.take() { + columns.push(last_modified); + } + } + "filename" => { + if let Some(filename) = filename.take() { + columns.push(filename); + } + } + "content" => { + if let Some(content) = content.take() { + columns.push(content); + } + } + _ => panic!("unexpected column name: {}", field.name()), + } + } + + let batch = RecordBatch::try_new(schema.clone(), columns)?; + let iterator = vec![batch].into_iter().map(Ok); + let stream = futures::stream::iter(iterator).boxed(); + Ok(stream) + })) + } +} + +impl DisplayAs for ReadBlobExec { + fn fmt_as( + &self, + t: datafusion::physical_plan::DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + write!(f, "ReadBlobExec: ")?; + self.base_config.fmt_as(t, f) + } +} + +impl ExecutionPlan for ReadBlobExec { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.projected_schema.clone() + } + + fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning { + Partitioning::UnknownPartitioning(self.base_config.file_groups.len()) + } + + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + self.projected_output_ordering + .first() + .map(|ordering| ordering.as_slice()) + } + + fn children(&self) -> Vec> { + vec![] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> DatafusionResult> { + if !children.is_empty() { + return Err(datafusion::error::DataFusionError::Plan( + "ReadBlobExec does not accept children".to_string(), + )); + } + Ok(self) + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> DatafusionResult { + let object_store = context + .runtime_env() + .object_store(&self.base_config.object_store_url)?; + + let opener = BlobOpener::new( + object_store, + self.projected_schema.clone(), + self.file_compression_type, + ); + + let stream = FileStream::new(&self.base_config, partition, opener, &self.metrics)?; + + Ok(Box::pin(stream) as SendableRecordBatchStream) + } + + fn equivalence_properties(&self) -> EquivalenceProperties { + EquivalenceProperties::new_with_orderings(self.schema(), &self.projected_output_ordering) + } + + fn statistics(&self) -> DatafusionResult { + Ok(self.projected_statistics.clone()) + } + + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } +} + +#[async_trait] +impl FileFormat for BlobFormat { + fn as_any(&self) -> &dyn Any { + self + } + async fn infer_schema( + &self, + _state: &SessionState, + _store: &Arc, + _objects: &[ObjectMeta], + ) -> DatafusionResult { + Ok(BLOB_SCHEMA.clone()) + } + + async fn infer_stats( + &self, + _state: &SessionState, + _store: &Arc, + _table_schema: SchemaRef, + _object: &ObjectMeta, + ) -> DatafusionResult { + Ok(Statistics::new_unknown(BLOB_SCHEMA.as_ref())) + } + + async fn create_physical_plan( + &self, + _state: &SessionState, + conf: FileScanConfig, + _filters: Option<&Arc>, + ) -> DatafusionResult> { + Ok(Arc::new(ReadBlobExec::new( + conf, + self.file_compression_type, + ))) + } + + fn file_type(&self) -> FileType { + panic!("BlobFormat does not support file_type") + } +} + +impl OptionReader for BlobOptionsReader { + type Format = BlobFormat; + + const OPTIONS: &'static [(&'static str, DataType)] = &[]; + + fn read_options(_opts: &HashMap) -> Result { + Ok(BlobFormat::default()) + } +} + +impl WithCompression for BlobFormat { + fn with_compression(self, compression: FileCompressionType) -> Result { + Ok(self.with_file_compression_type(compression)) + } +} diff --git a/crates/sqlexec/Cargo.toml b/crates/sqlexec/Cargo.toml index 91831f17a..379db29f8 100644 --- a/crates/sqlexec/Cargo.toml +++ b/crates/sqlexec/Cargo.toml @@ -36,8 +36,8 @@ tonic = { workspace = true } serde = { workspace = true } reqwest = { workspace = true } url = { workspace = true } +bytes = { workspace = true } uuid = { version = "1.8.0", features = ["v4", "fast-rng", "macro-diagnostics"] } -bytes = "1.6.0" tokio-postgres = "0.7.8" once_cell = "1.19.0" parking_lot = "0.12.1" diff --git a/testdata/blob/hello.txt b/testdata/blob/hello.txt new file mode 100644 index 000000000..95d09f2b1 --- /dev/null +++ b/testdata/blob/hello.txt @@ -0,0 +1 @@ +hello world \ No newline at end of file diff --git a/testdata/sqllogictests/functions/read_blob.slt b/testdata/sqllogictests/functions/read_blob.slt new file mode 100644 index 000000000..6f461fb16 --- /dev/null +++ b/testdata/sqllogictests/functions/read_blob.slt @@ -0,0 +1,15 @@ +query I +select CAST(content as text) from read_blob('./testdata/blob/hello.txt'); +---- +hello world + + +query T +select string_to_array(filename, '/')[-1] as filename from read_blob('testdata/parquet/*') order by filename asc +---- +userdata0.parquet +userdata1.parquet +userdata1.parquet.bz2 +userdata1.parquet.gz +userdata1.parquet.xz +userdata1.parquet.zst \ No newline at end of file diff --git a/testdata/sqllogictests_object_store/gcs/read_blob.slt b/testdata/sqllogictests_object_store/gcs/read_blob.slt new file mode 100644 index 000000000..6a59fcca7 --- /dev/null +++ b/testdata/sqllogictests_object_store/gcs/read_blob.slt @@ -0,0 +1,8 @@ +query T +select filename from read_blob('gs://${GCS_BUCKET_NAME}/multi_csv/**/*', gcp_creds); +---- +multi_csv/bikeshare_stations.csv +multi_csv/bikeshare_stations_2.csv + +statement ok +select * from read_blob('gs://${GCS_BUCKET_NAME}/multi_csv/**/*', gcp_creds); From 2538deaa9f9f69014f3f47f2c0fa576845e062cd Mon Sep 17 00:00:00 2001 From: universalmind303 Date: Tue, 16 Apr 2024 13:37:26 -0500 Subject: [PATCH 2/5] cleanup --- crates/sqlbuiltins/src/functions/table/read_blob.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/crates/sqlbuiltins/src/functions/table/read_blob.rs b/crates/sqlbuiltins/src/functions/table/read_blob.rs index b5d93bfbb..d298b32e7 100644 --- a/crates/sqlbuiltins/src/functions/table/read_blob.rs +++ b/crates/sqlbuiltins/src/functions/table/read_blob.rs @@ -58,8 +58,6 @@ impl Default for BlobFormat { } impl BlobFormat { - /// Set a `FileCompressionType` of JSON - /// - defaults to `FileCompressionType::UNCOMPRESSED` pub fn with_file_compression_type( mut self, file_compression_type: FileCompressionType, From 33397474a6c8b274f652a15fd63472cc485dd77f Mon Sep 17 00:00:00 2001 From: universalmind303 Date: Tue, 16 Apr 2024 13:55:19 -0500 Subject: [PATCH 3/5] add gcp_creds --- testdata/sqllogictests_object_store/gcs/read_blob.slt | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/testdata/sqllogictests_object_store/gcs/read_blob.slt b/testdata/sqllogictests_object_store/gcs/read_blob.slt index 6a59fcca7..a8567f31f 100644 --- a/testdata/sqllogictests_object_store/gcs/read_blob.slt +++ b/testdata/sqllogictests_object_store/gcs/read_blob.slt @@ -1,3 +1,10 @@ +statement ok +CREATE CREDENTIAL gcp_creds +PROVIDER gcp OPTIONS ( + service_account_key '${GCP_SERVICE_ACCOUNT_KEY}' +); + + query T select filename from read_blob('gs://${GCS_BUCKET_NAME}/multi_csv/**/*', gcp_creds); ---- From fd1d19baa959b4e71960fcdeba06f2b5033fdde8 Mon Sep 17 00:00:00 2001 From: universalmind303 Date: Wed, 17 Apr 2024 09:26:35 -0500 Subject: [PATCH 4/5] remove "option" hack --- .../src/functions/table/read_blob.rs | 135 ++++++++---------- .../sqllogictests/functions/read_blob.slt | 12 +- 2 files changed, 73 insertions(+), 74 deletions(-) diff --git a/crates/sqlbuiltins/src/functions/table/read_blob.rs b/crates/sqlbuiltins/src/functions/table/read_blob.rs index d298b32e7..331791326 100644 --- a/crates/sqlbuiltins/src/functions/table/read_blob.rs +++ b/crates/sqlbuiltins/src/functions/table/read_blob.rs @@ -27,6 +27,7 @@ use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PhysicalExpr}; use datafusion_ext::errors::Result; use datafusion_ext::functions::FuncParamValue; +use futures::stream::once; use futures::{StreamExt, TryStreamExt}; use object_store::{collect_bytes, GetOptions, ObjectMeta, ObjectStore}; use once_cell::sync::Lazy; @@ -139,86 +140,74 @@ impl FileOpener for BlobOpener { let options = GetOptions::default(); let result = store.get_opts(file_meta.location(), options).await?; - // manually add columns based on the projected schema let mut columns = Vec::new(); + if let Some((idx, _)) = schema.column_with_name("size") { + columns.push(( + idx, + Arc::new(Int64Array::from(vec![result.meta.size as i64])) as ArrayRef, + )); + } - let mut size = schema - .column_with_name("size") - .map(|_| Arc::new(Int64Array::from(vec![result.meta.size as i64])) as ArrayRef); - - let mut last_modified = schema.column_with_name("last_modified").map(|_| { - Arc::new(TimestampNanosecondArray::from_vec( - vec![result.meta.last_modified.timestamp_nanos()], - None, - )) as ArrayRef - }); - - let mut filename = schema.column_with_name("filename").map(|_| { - Arc::new(StringArray::from(vec![result.meta.location.to_string()])) as ArrayRef - }); - - let mut content = match schema.column_with_name("content") { - Some(_) => { - let len = result.range.end - result.range.start; - match result.payload { - object_store::GetResultPayload::File(mut file, _) => { - let mut bytes = match file_meta.range { - None => file_compression_type.convert_read(file)?, - Some(_) => { - file.seek(SeekFrom::Start(result.range.start as _))?; - let limit = result.range.end - result.range.start; - file_compression_type.convert_read(file.take(limit as u64))? - } - }; - let mut data = Vec::new(); - bytes.read_to_end(&mut data)?; - - Some(Arc::new(BinaryArray::from_vec(vec![&data])) as ArrayRef) - } - object_store::GetResultPayload::Stream(s) => { - let s = s.map_err(DataFusionError::from); - - let s = file_compression_type.convert_stream(s.boxed())?.fuse(); - let bytes = collect_bytes(s, Some(len)).await?; - Some(Arc::new(BinaryArray::from_vec(vec![&bytes])) as ArrayRef) - } - } - } - None => None, - }; - - for field in schema.fields() { - // we need to do this hacky option take because of the borrow checker. - // these matches should only ever match once, but the borrow checker doesn't know that. - // so we just wrap the value in an option to ensure it's only taken once. - match field.name().as_str() { - "size" => { - if let Some(size) = size.take() { - columns.push(size); - } - } - "last_modified" => { - if let Some(last_modified) = last_modified.take() { - columns.push(last_modified); - } - } - "filename" => { - if let Some(filename) = filename.take() { - columns.push(filename); - } + if let Some((idx, _)) = schema.column_with_name("last_modified") { + columns.push(( + idx, + Arc::new(TimestampNanosecondArray::from_vec( + vec![result.meta.last_modified.timestamp_nanos()], + None, + )), + )); + } + + if let Some((idx, _)) = schema.column_with_name("filename") { + columns.push(( + idx, + Arc::new(StringArray::from(vec![result.meta.location.to_string()])) as ArrayRef, + )); + } + + if let Some((idx, _)) = schema.column_with_name("content") { + let len = result.range.end - result.range.start; + match result.payload { + object_store::GetResultPayload::File(mut file, _) => { + let mut bytes = match file_meta.range { + None => file_compression_type.convert_read(file)?, + Some(_) => { + file.seek(SeekFrom::Start(result.range.start as _))?; + let limit = result.range.end - result.range.start; + file_compression_type.convert_read(file.take(limit as u64))? + } + }; + let mut data = Vec::new(); + bytes.read_to_end(&mut data)?; + + columns.push(( + idx, + Arc::new(BinaryArray::from_vec(vec![&data])) as ArrayRef, + )); } - "content" => { - if let Some(content) = content.take() { - columns.push(content); - } + object_store::GetResultPayload::Stream(s) => { + let s = s.map_err(DataFusionError::from); + + let s = file_compression_type.convert_stream(s.boxed())?.fuse(); + let bytes = collect_bytes(s, Some(len)).await?; + columns.push(( + idx, + Arc::new(BinaryArray::from_vec(vec![&bytes])) as ArrayRef, + )) } - _ => panic!("unexpected column name: {}", field.name()), } } - let batch = RecordBatch::try_new(schema.clone(), columns)?; - let iterator = vec![batch].into_iter().map(Ok); - let stream = futures::stream::iter(iterator).boxed(); + // sort columns by index. + // This retains the order of the columns as they were defined in the schema + columns.sort_by(|a, b| a.0.cmp(&b.0)); + + let batch = RecordBatch::try_new( + schema.clone(), + columns.into_iter().map(|(_, v)| v).collect(), + )?; + + let stream = once(async move { Ok(batch) }).boxed(); Ok(stream) })) } diff --git a/testdata/sqllogictests/functions/read_blob.slt b/testdata/sqllogictests/functions/read_blob.slt index 6f461fb16..9d4b73eae 100644 --- a/testdata/sqllogictests/functions/read_blob.slt +++ b/testdata/sqllogictests/functions/read_blob.slt @@ -12,4 +12,14 @@ userdata1.parquet userdata1.parquet.bz2 userdata1.parquet.gz userdata1.parquet.xz -userdata1.parquet.zst \ No newline at end of file +userdata1.parquet.zst + + +# make sure the projections are working properly +statement ok +select size, last_modified, filename from read_blob('testdata/parquet/*'); + +# if the projections are working properly, then the order of the columns should not matter +statement ok +select filename, last_modified, size from read_blob('testdata/parquet/*'); + From 631511d00ea41bb785b34351c382b6b23db99a0b Mon Sep 17 00:00:00 2001 From: universalmind303 Date: Wed, 17 Apr 2024 09:33:23 -0500 Subject: [PATCH 5/5] cleanup and comments --- .../src/functions/table/read_blob.rs | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/crates/sqlbuiltins/src/functions/table/read_blob.rs b/crates/sqlbuiltins/src/functions/table/read_blob.rs index 331791326..d623adf87 100644 --- a/crates/sqlbuiltins/src/functions/table/read_blob.rs +++ b/crates/sqlbuiltins/src/functions/table/read_blob.rs @@ -140,6 +140,10 @@ impl FileOpener for BlobOpener { let options = GetOptions::default(); let result = store.get_opts(file_meta.location(), options).await?; + // We build up the columns with their index in the schema + // The index is needed to retain the order of the projected schema + // Such as `select filename, size from read_blob(...)` -> [filename, size] + // instead of the default [size, filename], which is what we'd output without reordering. let mut columns = Vec::new(); if let Some((idx, _)) = schema.column_with_name("size") { columns.push(( @@ -161,7 +165,7 @@ impl FileOpener for BlobOpener { if let Some((idx, _)) = schema.column_with_name("filename") { columns.push(( idx, - Arc::new(StringArray::from(vec![result.meta.location.to_string()])) as ArrayRef, + Arc::new(StringArray::from(vec![result.meta.location.to_string()])), )); } @@ -180,26 +184,19 @@ impl FileOpener for BlobOpener { let mut data = Vec::new(); bytes.read_to_end(&mut data)?; - columns.push(( - idx, - Arc::new(BinaryArray::from_vec(vec![&data])) as ArrayRef, - )); + columns.push((idx, Arc::new(BinaryArray::from_vec(vec![&data])))); } object_store::GetResultPayload::Stream(s) => { let s = s.map_err(DataFusionError::from); let s = file_compression_type.convert_stream(s.boxed())?.fuse(); let bytes = collect_bytes(s, Some(len)).await?; - columns.push(( - idx, - Arc::new(BinaryArray::from_vec(vec![&bytes])) as ArrayRef, - )) + columns.push((idx, Arc::new(BinaryArray::from_vec(vec![&bytes])))) } } } - // sort columns by index. - // This retains the order of the columns as they were defined in the schema + // reorder the columns based on their index in the schema columns.sort_by(|a, b| a.0.cmp(&b.0)); let batch = RecordBatch::try_new(