diff --git a/Cargo.lock b/Cargo.lock
index b5346e603..d0625dd3b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8085,6 +8085,7 @@ dependencies = [
  "arrow-cast",
  "async-openai",
  "async-trait",
+ "bytes",
  "catalog",
  "datafusion",
  "datafusion-functions-array",
diff --git a/Cargo.toml b/Cargo.toml
index 45aedd2a1..8226a2aeb 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -41,6 +41,7 @@ tempfile = "3.10.1"
 thiserror = "1.0"
 tracing = "0.1"
 url = "2.5.0"
+bytes = "1.6.0"

 [workspace.dependencies.deltalake]
 git = "https://github.com/GlareDB/delta-rs.git"
diff --git a/bindings/nodejs/Cargo.toml b/bindings/nodejs/Cargo.toml
index 4f2156783..8de7790fb 100644
--- a/bindings/nodejs/Cargo.toml
+++ b/bindings/nodejs/Cargo.toml
@@ -32,7 +32,7 @@ lzma-sys = { version = "*", features = ["static"] } # Prevent dynamic linking of
 napi = { version = "2.16.2", default-features = false, features = ["full"] }
 napi-derive = "2.16.2"
 once_cell = "1.19.0"
-bytes = "1.6.0"
+bytes = { workspace = true }
 async-once-cell = "0.5.3"

 [build-dependencies]
diff --git a/crates/bytesutil/Cargo.toml b/crates/bytesutil/Cargo.toml
index 03d023c11..751c78e24 100644
--- a/crates/bytesutil/Cargo.toml
+++ b/crates/bytesutil/Cargo.toml
@@ -7,4 +7,4 @@ edition = {workspace = true}
 workspace = true

 [dependencies]
-bytes = "1.6.0"
+bytes = { workspace = true }
\ No newline at end of file
diff --git a/crates/datasources/Cargo.toml b/crates/datasources/Cargo.toml
index 59f2ecd1b..be3ae061c 100644
--- a/crates/datasources/Cargo.toml
+++ b/crates/datasources/Cargo.toml
@@ -15,7 +15,7 @@ async-stream = "0.3.5"
 async-trait = { workspace = true }
 bigquery-storage = { git = "https://github.com/glaredb/bigquery-storage", branch = "deps/2023-10-27-update" }
 bitvec = "1"
-bytes = "1.6.0"
+bytes = { workspace = true }
 chrono = { workspace = true }
 chrono-tz = "0.8.6"
 datafusion = { workspace = true }
diff --git a/crates/ioutil/Cargo.toml b/crates/ioutil/Cargo.toml
index 2bc312c52..5dc2506a8 100644
--- a/crates/ioutil/Cargo.toml
+++ b/crates/ioutil/Cargo.toml
@@ -7,5 +7,5 @@ edition = {workspace = true}
 workspace = true

 [dependencies]
-bytes = "1.6.0"
+bytes = { workspace = true }
 home = "0.5.9"
diff --git a/crates/metastore/Cargo.toml b/crates/metastore/Cargo.toml
index 9ba40d3d0..1344495c4 100644
--- a/crates/metastore/Cargo.toml
+++ b/crates/metastore/Cargo.toml
@@ -24,7 +24,7 @@ tokio = { workspace = true }
 tonic = { workspace = true }
 tracing = { workspace = true }
 uuid = { version = "1.8.0", features = ["v4", "fast-rng", "macro-diagnostics"] }
-bytes = "1.6"
+bytes = { workspace = true }
 once_cell = "1.19.0"
 tower = "0.4"
 dashmap = "5.5.0"
diff --git a/crates/object_store_util/Cargo.toml b/crates/object_store_util/Cargo.toml
index 9f0a0769a..39b0ccbbe 100644
--- a/crates/object_store_util/Cargo.toml
+++ b/crates/object_store_util/Cargo.toml
@@ -15,6 +15,6 @@ thiserror = { workspace = true }
 tracing = { workspace = true }
 tokio = { workspace = true }
 tempfile = "3"
-bytes = "1.6.0"
+bytes = { workspace = true }
 moka = { version = "0.12.5", features = ["future"] }
 uuid = { version = "1.8.0", features = ["v4", "fast-rng", "macro-diagnostics"] }
diff --git a/crates/pgprototest/Cargo.toml b/crates/pgprototest/Cargo.toml
index ea007b383..988f920eb 100644
--- a/crates/pgprototest/Cargo.toml
+++ b/crates/pgprototest/Cargo.toml
@@ -13,5 +13,5 @@ clap = { workspace = true }
 anyhow = { workspace = true }
 datadriven = "0.6.0"
 postgres-protocol = "0.6.5"
-bytes = "1.6.0"
+bytes = { workspace = true }
 fallible-iterator = "0.2.0"
diff --git a/crates/pgrepr/Cargo.toml b/crates/pgrepr/Cargo.toml
index 1b98e9991..e9bc79cd1 100644
--- a/crates/pgrepr/Cargo.toml
+++ b/crates/pgrepr/Cargo.toml
@@ -17,6 +17,6 @@ decimal = { path = "../decimal" }
 num-traits = "0.2.18"
 dtoa = "1.0.9"
 chrono-tz = "0.8.6"
-bytes = "1.6.0"
+bytes = { workspace = true }
 const_format = "0.2.32"
 once_cell = "1.19.0"
diff --git a/crates/pgsrv/Cargo.toml b/crates/pgsrv/Cargo.toml
index 3d8cb40d4..690a5cd5b 100644
--- a/crates/pgsrv/Cargo.toml
+++ b/crates/pgsrv/Cargo.toml
@@ -21,7 +21,7 @@ bytesutil = { path = "../bytesutil" }
 parser = { path = "../parser" }
 pgrepr = { path = "../pgrepr" }
 datafusion_ext = { path = "../datafusion_ext" }
-bytes = "1.6.0"
+bytes = { workspace = true }
 rustls = "0.21.10"
 webpki-roots = "0.26.1"
 tokio-rustls = "0.24.1"
diff --git a/crates/sqlbuiltins/Cargo.toml b/crates/sqlbuiltins/Cargo.toml
index eaeea430c..43eb3ae2a 100644
--- a/crates/sqlbuiltins/Cargo.toml
+++ b/crates/sqlbuiltins/Cargo.toml
@@ -26,6 +26,7 @@ tracing = { workspace = true }
 tempfile = { workspace = true }
 tokio = { workspace = true }
 reqwest = { workspace = true }
+bytes = { workspace = true }
 async-openai = "0.20.0"
 once_cell = "1.19.0"
 num-traits = "0.2.18"
diff --git a/crates/sqlbuiltins/src/functions/table/mod.rs b/crates/sqlbuiltins/src/functions/table/mod.rs
index ac7b3d291..add8e30c0 100644
--- a/crates/sqlbuiltins/src/functions/table/mod.rs
+++ b/crates/sqlbuiltins/src/functions/table/mod.rs
@@ -14,6 +14,7 @@ mod mysql;
 mod object_store;
 mod parquet_metadata;
 mod postgres;
+mod read_blob;
 mod snowflake;
 mod sqlite;

@@ -52,6 +53,7 @@ use self::mysql::ReadMysql;
 use self::object_store::{CloudUpload, READ_CSV, READ_JSON, READ_PARQUET};
 use self::parquet_metadata::ParquetMetadataFunc;
 use self::postgres::ReadPostgres;
+use self::read_blob::READ_BLOB;
 use self::snowflake::ReadSnowflake;
 use self::sqlite::ReadSqlite;
 use self::sqlserver::ReadSqlServer;
@@ -101,6 +103,7 @@ impl BuiltinTableFuncs {
             Arc::new(ReadSqlServer),
             Arc::new(ReadCassandra),
             // Object store
+            Arc::new(READ_BLOB),
             Arc::new(READ_PARQUET),
             Arc::new(READ_CSV),
             Arc::new(READ_JSON),
diff --git a/crates/sqlbuiltins/src/functions/table/object_store.rs b/crates/sqlbuiltins/src/functions/table/object_store.rs
index b62decdb5..a61e5c53c 100644
--- a/crates/sqlbuiltins/src/functions/table/object_store.rs
+++ b/crates/sqlbuiltins/src/functions/table/object_store.rs
@@ -170,15 +170,15 @@ impl WithCompression for ParquetFormat {
 #[derive(Debug, Clone)]
 pub struct ObjScanTableFunc<Opts> {
     /// Primary name for the function.
-    name: &'static str,
+    pub(super) name: &'static str,

     /// Additional aliases for this function.
-    aliases: &'static [&'static str],
+    pub(super) aliases: &'static [&'static str],

-    description: &'static str,
-    example: &'static str,
+    pub(super) description: &'static str,
+    pub(super) example: &'static str,

-    phantom: PhantomData<Opts>,
+    pub(super) phantom: PhantomData<Opts>,
 }

 impl<Opts: OptionReader> BuiltinFunction for ObjScanTableFunc<Opts> {
diff --git a/crates/sqlbuiltins/src/functions/table/read_blob.rs b/crates/sqlbuiltins/src/functions/table/read_blob.rs
new file mode 100644
index 000000000..d623adf87
--- /dev/null
+++ b/crates/sqlbuiltins/src/functions/table/read_blob.rs
@@ -0,0 +1,347 @@
+use std::any::Any;
+use std::collections::HashMap;
+use std::io::{Read, Seek, SeekFrom};
+use std::marker::PhantomData;
+use std::sync::Arc;
+use std::vec;
+
+use async_trait::async_trait;
+use datafusion::arrow::array::{
+    ArrayRef,
+    BinaryArray,
+    Int64Array,
+    RecordBatch,
+    StringArray,
+    TimestampNanosecondArray,
+};
+use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
+use datafusion::common::{FileType, Statistics};
+use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
+use datafusion::datasource::file_format::FileFormat;
+use datafusion::datasource::physical_plan::{FileOpener, FileScanConfig, FileStream};
+use datafusion::error::{DataFusionError, Result as DatafusionResult};
+use datafusion::execution::context::SessionState;
+use datafusion::execution::SendableRecordBatchStream;
+use datafusion::physical_expr::{EquivalenceProperties, LexOrdering, PhysicalSortExpr};
+use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
+use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PhysicalExpr};
+use datafusion_ext::errors::Result;
+use datafusion_ext::functions::FuncParamValue;
+use futures::stream::once;
+use futures::{StreamExt, TryStreamExt};
+use object_store::{collect_bytes, GetOptions, ObjectMeta, ObjectStore};
+use once_cell::sync::Lazy;
+
+use super::object_store::{ObjScanTableFunc, OptionReader, WithCompression};
+
+pub const READ_BLOB: ObjScanTableFunc<BlobOptionsReader> = ObjScanTableFunc {
+    name: "read_blob",
+    aliases: &["read_binary"],
+    description: "reads from the selected source(s) to a binary blob.",
+    example: "SELECT size, content, filename FROM read_blob('./README.md')",
+    phantom: PhantomData,
+};
+
+#[derive(Debug, Clone, Copy)]
+pub struct BlobOptionsReader;
+
+#[derive(Debug, Clone, Copy)]
+pub struct BlobFormat {
+    file_compression_type: FileCompressionType,
+}
+
+impl Default for BlobFormat {
+    fn default() -> Self {
+        Self {
+            file_compression_type: FileCompressionType::UNCOMPRESSED,
+        }
+    }
+}
+
+impl BlobFormat {
+    pub fn with_file_compression_type(
+        mut self,
+        file_compression_type: FileCompressionType,
+    ) -> Self {
+        self.file_compression_type = file_compression_type;
+        self
+    }
+}
+
+static BLOB_SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
+    Arc::new(Schema::new(vec![
+        Field::new("filename", DataType::Utf8, true),
+        Field::new("content", DataType::Binary, true),
+        Field::new("size", DataType::Int64, true),
+        Field::new(
+            "last_modified",
+            DataType::Timestamp(TimeUnit::Nanosecond, None),
+            true,
+        ),
+    ]))
+});
+
+#[derive(Debug)]
+struct ReadBlobExec {
+    base_config: FileScanConfig,
+    file_compression_type: FileCompressionType,
+    projected_schema: SchemaRef,
+    projected_output_ordering: Vec<LexOrdering>,
+    projected_statistics: Statistics,
+    metrics: ExecutionPlanMetricsSet,
+}
+
+impl ReadBlobExec {
+    pub fn new(base_config: FileScanConfig, file_compression_type: FileCompressionType) -> Self {
+        let (projected_schema, projected_statistics, projected_output_ordering) =
+            base_config.project();
+
+        Self {
+            base_config,
+            file_compression_type,
+            projected_schema,
+            projected_output_ordering,
+            projected_statistics,
+            metrics: ExecutionPlanMetricsSet::new(),
+        }
+    }
+}
+
+struct BlobOpener {
+    object_store: Arc<dyn ObjectStore>,
+    projected_schema: SchemaRef,
+    file_compression_type: FileCompressionType,
+}
+
+impl BlobOpener {
+    pub fn new(
+        object_store: Arc<dyn ObjectStore>,
+        projected_schema: SchemaRef,
+        file_compression_type: FileCompressionType,
+    ) -> Self {
+        Self {
+            object_store,
+            projected_schema,
+            file_compression_type,
+        }
+    }
+}
+
+impl FileOpener for BlobOpener {
+    fn open(
+        &self,
+        file_meta: datafusion::datasource::physical_plan::FileMeta,
+    ) -> DatafusionResult<datafusion::datasource::physical_plan::FileOpenFuture> {
+        let store = self.object_store.clone();
+        let schema = self.projected_schema.clone();
+        let file_compression_type = self.file_compression_type;
+
+        Ok(Box::pin(async move {
+            let options = GetOptions::default();
+            let result = store.get_opts(file_meta.location(), options).await?;
+
+            // We build up the columns with their index in the schema
+            // The index is needed to retain the order of the projected schema
+            // Such as `select filename, size from read_blob(...)` -> [filename, size]
+            // instead of the default [size, filename], which is what we'd output without reordering.
+            let mut columns = Vec::new();
+            if let Some((idx, _)) = schema.column_with_name("size") {
+                columns.push((
+                    idx,
+                    Arc::new(Int64Array::from(vec![result.meta.size as i64])) as ArrayRef,
+                ));
+            }
+
+            if let Some((idx, _)) = schema.column_with_name("last_modified") {
+                columns.push((
+                    idx,
+                    Arc::new(TimestampNanosecondArray::from_vec(
+                        vec![result.meta.last_modified.timestamp_nanos()],
+                        None,
+                    )),
+                ));
+            }
+
+            if let Some((idx, _)) = schema.column_with_name("filename") {
+                columns.push((
+                    idx,
+                    Arc::new(StringArray::from(vec![result.meta.location.to_string()])),
+                ));
+            }
+
+            if let Some((idx, _)) = schema.column_with_name("content") {
+                let len = result.range.end - result.range.start;
+                match result.payload {
+                    object_store::GetResultPayload::File(mut file, _) => {
+                        let mut bytes = match file_meta.range {
+                            None => file_compression_type.convert_read(file)?,
+                            Some(_) => {
+                                file.seek(SeekFrom::Start(result.range.start as _))?;
+                                let limit = result.range.end - result.range.start;
+                                file_compression_type.convert_read(file.take(limit as u64))?
+ } + }; + let mut data = Vec::new(); + bytes.read_to_end(&mut data)?; + + columns.push((idx, Arc::new(BinaryArray::from_vec(vec![&data])))); + } + object_store::GetResultPayload::Stream(s) => { + let s = s.map_err(DataFusionError::from); + + let s = file_compression_type.convert_stream(s.boxed())?.fuse(); + let bytes = collect_bytes(s, Some(len)).await?; + columns.push((idx, Arc::new(BinaryArray::from_vec(vec![&bytes])))) + } + } + } + + // reorder the columns based on their index in the schema + columns.sort_by(|a, b| a.0.cmp(&b.0)); + + let batch = RecordBatch::try_new( + schema.clone(), + columns.into_iter().map(|(_, v)| v).collect(), + )?; + + let stream = once(async move { Ok(batch) }).boxed(); + Ok(stream) + })) + } +} + +impl DisplayAs for ReadBlobExec { + fn fmt_as( + &self, + t: datafusion::physical_plan::DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + write!(f, "ReadBlobExec: ")?; + self.base_config.fmt_as(t, f) + } +} + +impl ExecutionPlan for ReadBlobExec { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.projected_schema.clone() + } + + fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning { + Partitioning::UnknownPartitioning(self.base_config.file_groups.len()) + } + + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + self.projected_output_ordering + .first() + .map(|ordering| ordering.as_slice()) + } + + fn children(&self) -> Vec> { + vec![] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> DatafusionResult> { + if !children.is_empty() { + return Err(datafusion::error::DataFusionError::Plan( + "ReadBlobExec does not accept children".to_string(), + )); + } + Ok(self) + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> DatafusionResult { + let object_store = context + .runtime_env() + .object_store(&self.base_config.object_store_url)?; + + let opener = BlobOpener::new( + object_store, + self.projected_schema.clone(), + self.file_compression_type, + ); + + let stream = FileStream::new(&self.base_config, partition, opener, &self.metrics)?; + + Ok(Box::pin(stream) as SendableRecordBatchStream) + } + + fn equivalence_properties(&self) -> EquivalenceProperties { + EquivalenceProperties::new_with_orderings(self.schema(), &self.projected_output_ordering) + } + + fn statistics(&self) -> DatafusionResult { + Ok(self.projected_statistics.clone()) + } + + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } +} + +#[async_trait] +impl FileFormat for BlobFormat { + fn as_any(&self) -> &dyn Any { + self + } + async fn infer_schema( + &self, + _state: &SessionState, + _store: &Arc, + _objects: &[ObjectMeta], + ) -> DatafusionResult { + Ok(BLOB_SCHEMA.clone()) + } + + async fn infer_stats( + &self, + _state: &SessionState, + _store: &Arc, + _table_schema: SchemaRef, + _object: &ObjectMeta, + ) -> DatafusionResult { + Ok(Statistics::new_unknown(BLOB_SCHEMA.as_ref())) + } + + async fn create_physical_plan( + &self, + _state: &SessionState, + conf: FileScanConfig, + _filters: Option<&Arc>, + ) -> DatafusionResult> { + Ok(Arc::new(ReadBlobExec::new( + conf, + self.file_compression_type, + ))) + } + + fn file_type(&self) -> FileType { + panic!("BlobFormat does not support file_type") + } +} + +impl OptionReader for BlobOptionsReader { + type Format = BlobFormat; + + const OPTIONS: &'static [(&'static str, DataType)] = &[]; + + fn read_options(_opts: &HashMap) -> Result { + Ok(BlobFormat::default()) + } +} + +impl WithCompression for BlobFormat { + 
+    fn with_compression(self, compression: FileCompressionType) -> Result<Self> {
+        Ok(self.with_file_compression_type(compression))
+    }
+}
diff --git a/crates/sqlexec/Cargo.toml b/crates/sqlexec/Cargo.toml
index 91831f17a..379db29f8 100644
--- a/crates/sqlexec/Cargo.toml
+++ b/crates/sqlexec/Cargo.toml
@@ -36,8 +36,8 @@ tonic = { workspace = true }
 serde = { workspace = true }
 reqwest = { workspace = true }
 url = { workspace = true }
+bytes = { workspace = true }
 uuid = { version = "1.8.0", features = ["v4", "fast-rng", "macro-diagnostics"] }
-bytes = "1.6.0"
 tokio-postgres = "0.7.8"
 once_cell = "1.19.0"
 parking_lot = "0.12.1"
diff --git a/testdata/blob/hello.txt b/testdata/blob/hello.txt
new file mode 100644
index 000000000..95d09f2b1
--- /dev/null
+++ b/testdata/blob/hello.txt
@@ -0,0 +1 @@
+hello world
\ No newline at end of file
diff --git a/testdata/sqllogictests/functions/read_blob.slt b/testdata/sqllogictests/functions/read_blob.slt
new file mode 100644
index 000000000..9d4b73eae
--- /dev/null
+++ b/testdata/sqllogictests/functions/read_blob.slt
@@ -0,0 +1,25 @@
+query I
+select CAST(content as text) from read_blob('./testdata/blob/hello.txt');
+----
+hello world
+
+
+query T
+select string_to_array(filename, '/')[-1] as filename from read_blob('testdata/parquet/*') order by filename asc
+----
+userdata0.parquet
+userdata1.parquet
+userdata1.parquet.bz2
+userdata1.parquet.gz
+userdata1.parquet.xz
+userdata1.parquet.zst
+
+
+# make sure the projections are working properly
+statement ok
+select size, last_modified, filename from read_blob('testdata/parquet/*');
+
+# if the projections are working properly, then the order of the columns should not matter
+statement ok
+select filename, last_modified, size from read_blob('testdata/parquet/*');
+
diff --git a/testdata/sqllogictests_object_store/gcs/read_blob.slt b/testdata/sqllogictests_object_store/gcs/read_blob.slt
new file mode 100644
index 000000000..a8567f31f
--- /dev/null
+++ b/testdata/sqllogictests_object_store/gcs/read_blob.slt
@@ -0,0 +1,15 @@
+statement ok
+CREATE CREDENTIAL gcp_creds
+PROVIDER gcp OPTIONS (
+    service_account_key '${GCP_SERVICE_ACCOUNT_KEY}'
+);
+
+
+query T
+select filename from read_blob('gs://${GCS_BUCKET_NAME}/multi_csv/**/*', gcp_creds);
+----
+multi_csv/bikeshare_stations.csv
+multi_csv/bikeshare_stations_2.csv
+
+statement ok
+select * from read_blob('gs://${GCS_BUCKET_NAME}/multi_csv/**/*', gcp_creds);
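
Usage sketch (not part of the patch, inferred from BLOB_SCHEMA in read_blob.rs and the slt tests above): read_blob, also registered under the read_binary alias, returns one row per matched object with filename, content, size, and last_modified columns, and projections are reordered to match the query.

-- local file; casting the binary content to text mirrors the hello.txt test
SELECT filename, size, CAST(content AS text) FROM read_blob('./testdata/blob/hello.txt');

-- alias plus a glob over an object store path, reusing the gcp_creds credential from the GCS test
SELECT filename, last_modified FROM read_binary('gs://${GCS_BUCKET_NAME}/multi_csv/**/*', gcp_creds);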