diff --git a/README.md b/README.md
index 21d4c3a9c..17402be11 100644
--- a/README.md
+++ b/README.md
@@ -227,7 +227,7 @@ DROP DATABASE my_pg;
 | Apache ORC | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | ➖ |
 | **Table Formats** | -- | -- | -- | -- | -- | -- |
 | Lance | ✅ | ✅ | ✅ | ✅ | ✅ | ➖ |
-| Delta | ✅ | ✅ | 🚧 | ✅ | ✅ | ➖ |
+| Delta | ✅ | ✅ | ✅ | ✅ | ✅ | ➖ |
 | Iceberg | ✅ | 🚧 | 🚧 | ✅ | ✅ | ➖ |
 
 ✅ = Supported
diff --git a/crates/datasources/src/common/sink/delta.rs b/crates/datasources/src/common/sink/delta.rs
new file mode 100644
index 000000000..a1b3bc2a3
--- /dev/null
+++ b/crates/datasources/src/common/sink/delta.rs
@@ -0,0 +1,138 @@
+use std::any::Any;
+use std::fmt;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use datafusion::common::Result as DfResult;
+use datafusion::error::DataFusionError;
+use datafusion::execution::TaskContext;
+use datafusion::physical_plan::insert::DataSink;
+use datafusion::physical_plan::metrics::MetricsSet;
+use datafusion::physical_plan::{DisplayAs, DisplayFormatType, SendableRecordBatchStream};
+use deltalake::operations::create::CreateBuilder;
+use deltalake::operations::write::WriteBuilder;
+use deltalake::storage::StorageOptions;
+use futures::StreamExt;
+use object_store::prefix::PrefixStore;
+use object_store::ObjectStore;
+use url::Url;
+
+use crate::native::access::arrow_to_delta_safe;
+
+/// Writes delta tables to object storage.
+#[derive(Debug, Clone)]
+pub struct DeltaSink {
+    url: Url,
+    store: Arc<dyn ObjectStore>,
+}
+
+impl fmt::Display for DeltaSink {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "DeltaSink({})", self.url)
+    }
+}
+
+impl DisplayAs for DeltaSink {
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
+        match t {
+            DisplayFormatType::Default => write!(f, "{self}"),
+            DisplayFormatType::Verbose => write!(f, "{self}"),
+        }
+    }
+}
+
+impl DeltaSink {
+    pub fn new(store: Arc<dyn ObjectStore>, url: Url) -> Self {
+        DeltaSink { url, store }
+    }
+
+    async fn stream_into_inner(&self, stream: SendableRecordBatchStream) -> DfResult<u64> {
+        // Wrap the store in a prefix store, mirroring the way the native
+        // module uses it; delta seems to expect the location to show up
+        // both in the store and in the logstore. Feels wrong, probably is,
+        // but things work as expected.
+        let obj_store = PrefixStore::new(self.store.clone(), self.url.path());
+
+        let store = deltalake::logstore::default_logstore(
+            Arc::new(obj_store),
+            &self.url,
+            &StorageOptions::default(),
+        );
+
+        // Eventually this should share code with "create_table" in the
+        // native module.
+        let mut builder = CreateBuilder::new()
+            .with_save_mode(deltalake::protocol::SaveMode::ErrorIfExists)
+            .with_table_name(
+                self.url
+                    .to_file_path()
+                    .ok()
+                    .ok_or_else(|| {
+                        DataFusionError::Internal("could not resolve table path".to_string())
+                    })?
+                    .file_name()
+                    .ok_or_else(|| DataFusionError::Internal("missing table name".to_string()))?
+                    .to_os_string()
+                    .into_string()
+                    .ok()
+                    .ok_or_else(|| {
+                        DataFusionError::Internal("could not resolve table name".to_string())
+                    })?,
+            )
+            .with_log_store(store.clone());
+
+        // Resolve the schema; eventually this should also share code with
+        // "create_table" in the native module.
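+        // Some arrow types get downgraded to a different type when stored
+        // in delta-lake; arrow_to_delta_safe returns the safe delta type
+        // along with field metadata recording the original type so the
+        // column can be converted back on read.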
+        for field in stream.schema().fields().into_iter() {
+            let col = arrow_to_delta_safe(field.data_type())?;
+            builder = builder.with_column(
+                field.name().clone(),
+                col.data_type,
+                field.is_nullable(),
+                col.metadata,
+            );
+        }
+
+        let table = builder.await?;
+
+        let mut chunks = stream.chunks(32);
+        let mut records: usize = 0;
+
+        while let Some(batches) = chunks.next().await {
+            // Count rows as they pass through, while preserving any
+            // stream error so it propagates below.
+            let batches: Result<Vec<_>, _> = batches
+                .into_iter()
+                .map(|r| {
+                    let _ = r.as_ref().map(|b| records += b.num_rows());
+                    r
+                })
+                .collect();
+
+            WriteBuilder::new(table.log_store(), table.snapshot().ok().cloned())
+                .with_input_batches(batches?.into_iter())
+                .await?;
+        }
+
+        Ok(records as u64)
+    }
+}
+
+#[async_trait]
+impl DataSink for DeltaSink {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        None
+    }
+
+    async fn write_all(
+        &self,
+        data: SendableRecordBatchStream,
+        _: &Arc<TaskContext>,
+    ) -> DfResult<u64> {
+        self.stream_into_inner(data).await
+    }
+}
diff --git a/crates/datasources/src/common/sink/json.rs b/crates/datasources/src/common/sink/json.rs
index dc1c384fb..e91c02d13 100644
--- a/crates/datasources/src/common/sink/json.rs
+++ b/crates/datasources/src/common/sink/json.rs
@@ -34,6 +34,12 @@ impl Default for JsonSinkOpts {
     }
 }
 
+impl JsonSinkOpts {
+    pub fn with_array_format(array: bool) -> Self {
+        JsonSinkOpts { array }
+    }
+}
+
 #[derive(Debug)]
 pub struct JsonSink {
     store: Arc<dyn ObjectStore>,
diff --git a/crates/datasources/src/common/sink/lance.rs b/crates/datasources/src/common/sink/lance.rs
index c7ee8c101..a9f8c1bb5 100644
--- a/crates/datasources/src/common/sink/lance.rs
+++ b/crates/datasources/src/common/sink/lance.rs
@@ -14,7 +14,6 @@ use futures::StreamExt;
 use lance::dataset::WriteMode;
 use lance::Dataset;
 use object_store::path::Path as ObjectPath;
-use object_store::ObjectStore;
 
 pub type LanceWriteParams = lance::dataset::WriteParams;
 
@@ -30,14 +29,13 @@ pub struct LanceSinkOpts {
 /// Writes lance files to object storage.
 #[derive(Debug, Clone)]
 pub struct LanceSink {
-    store: Arc<dyn ObjectStore>,
     loc: ObjectPath,
     opts: LanceSinkOpts,
 }
 
 impl fmt::Display for LanceSink {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        write!(f, "LanceSink({}:{})", self.store, self.loc)
+        write!(f, "LanceSink({})", self.loc)
     }
 }
 
@@ -51,13 +49,8 @@ impl DisplayAs for LanceSink {
 }
 
 impl LanceSink {
-    pub fn from_obj_store(
-        store: Arc<dyn ObjectStore>,
-        loc: impl Into<ObjectPath>,
-        opts: LanceSinkOpts,
-    ) -> Self {
+    pub fn from_obj_store(loc: impl Into<ObjectPath>, opts: LanceSinkOpts) -> Self {
         LanceSink {
-            store,
             loc: loc.into(),
             opts,
         }
@@ -80,6 +73,7 @@ impl LanceSink {
             ..Default::default()
         };
 
+
         let mut ds: Option<Dataset> = None;
 
         while let Some(batches) = chunks.next().await {
diff --git a/crates/datasources/src/common/sink/mod.rs b/crates/datasources/src/common/sink/mod.rs
index 9ee08909d..882c87336 100644
--- a/crates/datasources/src/common/sink/mod.rs
+++ b/crates/datasources/src/common/sink/mod.rs
@@ -1,5 +1,6 @@
 pub mod bson;
 pub mod csv;
+pub mod delta;
 pub mod json;
 pub mod lance;
 pub mod parquet;
diff --git a/crates/datasources/src/native/access.rs b/crates/datasources/src/native/access.rs
index c78267c94..459578ed7 100644
--- a/crates/datasources/src/native/access.rs
+++ b/crates/datasources/src/native/access.rs
@@ -99,14 +99,14 @@ impl LogStoreFactory for FakeStoreFactory {
 /// metadata for indicating the 'real' (original) type, for cases when
 /// downcasting occurs.
 #[derive(Debug)]
-struct DeltaField {
-    data_type: DeltaDataType,
-    metadata: Option<HashMap<String, Value>>,
+pub struct DeltaField {
+    pub data_type: DeltaDataType,
+    pub metadata: Option<HashMap<String, Value>>,
 }
 
 // Some datatypes get downgraded to a different type when they are stored in delta-lake.
 // So we add some metadata to the field to indicate that it needs to be converted back to the original type.
-fn arrow_to_delta_safe(arrow_type: &DataType) -> DeltaResult<DeltaField> {
+pub fn arrow_to_delta_safe(arrow_type: &DataType) -> DeltaResult<DeltaField> {
     match arrow_type {
         dtype @ DataType::Timestamp(_, tz) => {
             let delta_type =
diff --git a/crates/protogen/src/metastore/types/options.rs b/crates/protogen/src/metastore/types/options.rs
index cb98c3fa5..74bd882d0 100644
--- a/crates/protogen/src/metastore/types/options.rs
+++ b/crates/protogen/src/metastore/types/options.rs
@@ -2057,6 +2057,7 @@ pub enum CopyToFormatOptions {
     Csv(CopyToFormatOptionsCsv),
     Parquet(CopyToFormatOptionsParquet),
     Lance(CopyToFormatOptionsLance),
+    Delta(CopyToFormatOptionsDelta),
     Json(CopyToFormatOptionsJson),
     Bson(CopyToFormatOptionsBson),
 }
@@ -2076,6 +2077,7 @@ impl CopyToFormatOptions {
     pub const JSON: &'static str = "json";
     pub const BSON: &'static str = "bson";
     pub const LANCE: &'static str = "lance";
+    pub const DELTA: &'static str = "delta";
 
     pub fn as_str(&self) -> &'static str {
         match self {
@@ -2084,8 +2086,13 @@ impl CopyToFormatOptions {
             Self::Json(_) => Self::JSON,
             Self::Bson(_) => Self::BSON,
             Self::Lance(_) => Self::LANCE,
+            Self::Delta(_) => Self::DELTA,
         }
     }
+
+    pub fn is_table(&self) -> bool {
+        matches!(self, Self::Delta(_) | Self::Lance(_))
+    }
 }
 
 #[derive(Clone, Debug, Hash, PartialEq, Eq)]
@@ -2107,6 +2114,9 @@ pub struct CopyToFormatOptionsJson {
 #[derive(Clone, Debug, Hash, PartialEq, Eq)]
 pub struct CopyToFormatOptionsBson {}
 
+#[derive(Clone, Debug, Hash, PartialEq, Eq)]
+pub struct CopyToFormatOptionsDelta {}
+
 #[derive(Clone, Debug, Hash, PartialEq, Eq)]
 pub struct CopyToFormatOptionsLance {
     pub max_rows_per_file: Option<usize>,
diff --git a/crates/protogen/src/sqlexec/copy_to.rs b/crates/protogen/src/sqlexec/copy_to.rs
index a8765219b..8c4466064 100644
--- a/crates/protogen/src/sqlexec/copy_to.rs
+++ b/crates/protogen/src/sqlexec/copy_to.rs
@@ -80,6 +80,8 @@ pub enum CopyToFormatOptionsEnum {
     Lance(CopyToFormatOptionsLance),
     #[prost(message, tag = "5")]
     Bson(CopyToFormatOptionsBson),
+    #[prost(message, tag = "6")]
+    Delta(CopyToFormatOptionsDelta),
 }
 
 #[derive(Clone, PartialEq, Message)]
@@ -117,6 +119,9 @@ pub struct CopyToFormatOptionsLance {
 #[derive(Clone, PartialEq, Message)]
 pub struct CopyToFormatOptionsBson {}
 
+#[derive(Clone, PartialEq, Message)]
+pub struct CopyToFormatOptionsDelta {}
+
 impl TryFrom<crate::metastore::types::options::CopyToFormatOptions> for CopyToFormatOptions {
     type Error = crate::errors::ProtoConvError;
     fn try_from(
@@ -126,6 +131,9 @@
             crate::metastore::types::options::CopyToFormatOptions::Bson(_) => {
                 Ok(CopyToFormatOptions::default())
             }
+            crate::metastore::types::options::CopyToFormatOptions::Delta(_) => {
+                Ok(CopyToFormatOptions::default())
+            }
             crate::metastore::types::options::CopyToFormatOptions::Lance(opts) => {
                 Ok(CopyToFormatOptions {
                     copy_to_format_options_enum: Some(CopyToFormatOptionsEnum::Lance(
@@ -179,14 +187,6 @@ impl TryFrom<CopyToFormatOptions> for crate::metastore::types::options::CopyToFo
         ))?;
 
         match value {
-            CopyToFormatOptionsEnum::Csv(csv) => {
-                Ok(crate::metastore::types::options::CopyToFormatOptions::Csv(
-                    crate::metastore::types::options::CopyToFormatOptionsCsv {
-                        delim: csv.delim as u8,
-                        header: csv.header,
-                    },
-                ))
-            }
             CopyToFormatOptionsEnum::Lance(lance) => Ok(
                 crate::metastore::types::options::CopyToFormatOptions::Lance(
                     crate::metastore::types::options::CopyToFormatOptionsLance {
@@ -197,6 +197,26 @@ impl TryFrom<CopyToFormatOptions> for crate::metastore::types::options::CopyToFo
                     },
                 ),
             ),
+            CopyToFormatOptionsEnum::Delta(_) => Ok(
+                crate::metastore::types::options::CopyToFormatOptions::Delta(
+                    crate::metastore::types::options::CopyToFormatOptionsDelta {},
+                ),
+            ),
+            CopyToFormatOptionsEnum::Parquet(parquet) => Ok(
+                crate::metastore::types::options::CopyToFormatOptions::Parquet(
+                    crate::metastore::types::options::CopyToFormatOptionsParquet {
+                        row_group_size: parquet.row_group_size as usize,
+                    },
+                ),
+            ),
+            CopyToFormatOptionsEnum::Csv(csv) => {
+                Ok(crate::metastore::types::options::CopyToFormatOptions::Csv(
+                    crate::metastore::types::options::CopyToFormatOptionsCsv {
+                        delim: csv.delim as u8,
+                        header: csv.header,
+                    },
+                ))
+            }
             CopyToFormatOptionsEnum::Json(json) => {
                 Ok(crate::metastore::types::options::CopyToFormatOptions::Json(
                     crate::metastore::types::options::CopyToFormatOptionsJson { array: json.array },
                 ))
             }
@@ -207,13 +227,6 @@ impl TryFrom<CopyToFormatOptions> for crate::metastore::types::options::CopyToFo
             CopyToFormatOptionsEnum::Bson(_) => {
                 Ok(crate::metastore::types::options::CopyToFormatOptions::Bson(
                     crate::metastore::types::options::CopyToFormatOptionsBson {},
                 ))
             }
-            CopyToFormatOptionsEnum::Parquet(parquet) => Ok(
-                crate::metastore::types::options::CopyToFormatOptions::Parquet(
-                    crate::metastore::types::options::CopyToFormatOptionsParquet {
-                        row_group_size: parquet.row_group_size as usize,
-                    },
-                ),
-            ),
         }
     }
 }
diff --git a/crates/sqlexec/src/planner/physical_plan/copy_to.rs b/crates/sqlexec/src/planner/physical_plan/copy_to.rs
index a7f3c3e5f..d8b333d35 100644
--- a/crates/sqlexec/src/planner/physical_plan/copy_to.rs
+++ b/crates/sqlexec/src/planner/physical_plan/copy_to.rs
@@ -23,6 +23,7 @@ use datafusion::physical_plan::{
 };
 use datasources::common::sink::bson::BsonSink;
 use datasources::common::sink::csv::{CsvSink, CsvSinkOpts};
+use datasources::common::sink::delta::DeltaSink;
 use datasources::common::sink::json::{JsonSink, JsonSinkOpts};
 use datasources::common::sink::lance::{LanceSink, LanceSinkOpts, LanceWriteParams};
 use datasources::common::sink::parquet::{ParquetSink, ParquetSinkOpts};
@@ -93,7 +94,7 @@ impl ExecutionPlan for CopyToExec {
         }
 
         let this = self.clone();
-        let stream = stream::once(this.copy_to(context));
+        let stream = stream::once(async move { this.copy_to(context).await });
 
         Ok(Box::pin(RecordBatchStreamAdapter::new(
             self.schema(),
@@ -140,32 +141,24 @@ impl DisplayAs for CopyToExec {
 }
 
 impl CopyToExec {
-    async fn copy_to(self, context: Arc<TaskContext>) -> DataFusionResult<RecordBatch> {
-        let sink = match (self.dest, self.format) {
-            (CopyToDestinationOptions::Local(local_options), CopyToFormatOptions::Lance(opts)) => {
-                get_sink_for_obj(
-                    CopyToFormatOptions::Lance(opts),
-                    &LocalStoreAccess {},
-                    &local_options.location,
-                )?
-            }
-            (CopyToDestinationOptions::Local(local_options), format) => {
-                {
+    async fn get_destination(&self) -> DataFusionResult<(Arc<dyn ObjStoreAccess>, String)> {
+        Ok(match self.dest.clone() {
+            CopyToDestinationOptions::Local(local_options) => {
+                if !self.format.is_table() {
                     // Create the path if it doesn't exist (for local).
                     let _ = tokio::fs::File::create(&local_options.location).await?;
                 }
-                let access = LocalStoreAccess;
-                get_sink_for_obj(format, &access, &local_options.location)?
+                (Arc::new(LocalStoreAccess), local_options.location)
             }
-            (CopyToDestinationOptions::Gcs(gcs_options), format) => {
+            CopyToDestinationOptions::Gcs(gcs_options) => {
                 let access = GcsStoreAccess {
                     bucket: gcs_options.bucket,
                     service_account_key: gcs_options.service_account_key,
                     opts: HashMap::new(),
                 };
-                get_sink_for_obj(format, &access, &gcs_options.location)?
+                (Arc::new(access), gcs_options.location)
             }
-            (CopyToDestinationOptions::S3(s3_options), format) => {
+            CopyToDestinationOptions::S3(s3_options) => {
                 let access = S3StoreAccess {
                     bucket: s3_options.bucket,
                     region: Some(s3_options.region),
@@ -173,20 +166,42 @@ impl CopyToExec {
                     secret_access_key: s3_options.secret_access_key,
                     opts: HashMap::new(),
                 };
-                get_sink_for_obj(format, &access, &s3_options.location)?
+                (Arc::new(access), s3_options.location)
             }
-            (CopyToDestinationOptions::Azure(azure_options), format) => {
+            CopyToDestinationOptions::Azure(azure_options) => {
                 let access = AzureStoreAccess {
                     container: azure_options.container,
                     account_name: Some(azure_options.account),
                     access_key: Some(azure_options.access_key),
                     opts: HashMap::new(),
                 };
-                get_sink_for_obj(format, &access, &azure_options.location)?
+                (Arc::new(access), azure_options.location)
+            }
+        })
+    }
+
+    async fn copy_to(&self, context: Arc<TaskContext>) -> DataFusionResult<RecordBatch> {
+        let sink = match (self.dest.clone(), self.format.clone()) {
+            (CopyToDestinationOptions::Local(local_options), CopyToFormatOptions::Lance(opts)) => {
+                get_sink_for_obj(
+                    CopyToFormatOptions::Lance(opts),
+                    Arc::new(LocalStoreAccess {}),
+                    &local_options.location,
+                )?
+            }
+            (_, CopyToFormatOptions::Lance(_)) => {
+                return Err(DataFusionError::Execution(
+                    "COPY TO for lance format is only supported locally".to_string(),
+                ))
+            }
+            (_, format) => {
+                let (access, loc) = self.get_destination().await?;
+
+                get_sink_for_obj(format, access.clone(), &loc)?
             }
         };
 
-        let stream = execute_stream(self.source, context.clone())?;
+        let stream = execute_stream(self.source.clone(), context.clone())?;
 
         let count = sink.write_all(stream, &context).await?;
         Ok(new_operation_with_count_batch("copy", count))
@@ -196,7 +211,7 @@
 /// Get a sink for writing a file to.
 fn get_sink_for_obj(
     format: CopyToFormatOptions,
-    access: &dyn ObjStoreAccess,
+    access: Arc<dyn ObjStoreAccess>,
     location: &str,
 ) -> DataFusionResult<Box<dyn DataSink>> {
     let store = access
@@ -227,7 +242,6 @@ fn get_sink_for_obj(
             let wp = LanceWriteParams::default();
 
             Box::new(LanceSink::from_obj_store(
-                store,
                 path,
                 LanceSinkOpts {
                     url: Some(
@@ -246,14 +260,24 @@ fn get_sink_for_obj(
                 },
             ))
         }
+        CopyToFormatOptions::Delta(_) => Box::new(DeltaSink::new(
+            store,
+            url::Url::parse(
+                access
+                    .base_url()
+                    .map_err(|e| DataFusionError::External(Box::new(e)))?
+                    .as_str(),
+            )
+            .map_err(|e| DataFusionError::External(Box::new(e)))?
+            .join(location)
+            .map_err(|e| DataFusionError::External(Box::new(e)))?,
+        )),
+        CopyToFormatOptions::Bson(_) => Box::new(BsonSink::from_obj_store(store, path)),
         CopyToFormatOptions::Json(json_opts) => Box::new(JsonSink::from_obj_store(
             store,
             path,
-            JsonSinkOpts {
-                array: json_opts.array,
-            },
+            JsonSinkOpts::with_array_format(json_opts.array),
         )),
-        CopyToFormatOptions::Bson(_) => Box::new(BsonSink::from_obj_store(store, path)),
     };
 
     Ok(sink)
 }
diff --git a/crates/sqlexec/src/planner/session_planner.rs b/crates/sqlexec/src/planner/session_planner.rs
index daf774644..8a7c0e804 100644
--- a/crates/sqlexec/src/planner/session_planner.rs
+++ b/crates/sqlexec/src/planner/session_planner.rs
@@ -89,6 +89,7 @@ use protogen::metastore::types::options::{
     CopyToFormatOptions,
     CopyToFormatOptionsBson,
     CopyToFormatOptionsCsv,
+    CopyToFormatOptionsDelta,
     CopyToFormatOptionsJson,
     CopyToFormatOptionsLance,
     CopyToFormatOptionsParquet,
@@ -2040,6 +2041,9 @@ impl<'a> SessionPlanner<'a> {
                 input_batch_size: m.remove_optional("input_batch_size")?,
             })
         }
+        Some(CopyToFormatOptions::DELTA) => {
+            CopyToFormatOptions::Delta(CopyToFormatOptionsDelta {})
+        }
         Some(other) => return Err(internal!("unsupported output format: {other}")),
     };
diff --git a/testdata/sqllogictests_object_store/local/delta.slt b/testdata/sqllogictests_object_store/local/delta.slt
index 51fba23a3..021f7dc81 100644
--- a/testdata/sqllogictests_object_store/local/delta.slt
+++ b/testdata/sqllogictests_object_store/local/delta.slt
@@ -12,6 +12,18 @@ select * from delta_local order by a;
 1 hello
 2 world
 
+# For all cases, there MUST NOT already be a delta table at this location, or it's an error.
+# For local cases, the directory MUST exist and MUST NOT contain an existing delta table.
+
+statement ok
+copy (select * from delta_local) to 'file://${TMP}' format delta;
+
+query IT
+select * from read_delta('${TMP}') order by a;
+----
+1 hello
+2 world
+
 statement ok
 ALTER TABLE delta_local SET ACCESS_MODE TO READ_WRITE;
diff --git a/testdata/sqllogictests_object_store/local/lance.slt b/testdata/sqllogictests_object_store/local/lance.slt
index 8c2e280c6..3417e479d 100644
--- a/testdata/sqllogictests_object_store/local/lance.slt
+++ b/testdata/sqllogictests_object_store/local/lance.slt
@@ -28,7 +28,7 @@ select * from lance_scan('file://${TMP}') order by point.lat;
 [0.2, 1.8]  {lat:42.1,long:-74.1}
 [1.1, 1.2]  {lat:45.5,long:-122.7}
 
-# alias 
+# alias
 query IT
 select * from read_lance('file://${TMP}') order by point.lat;
 ----
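
A minimal usage sketch of the new path (the local directory below is hypothetical; per
the comment in delta.slt, the target directory must exist and must not already contain
a delta table):

    copy (select * from delta_local) to 'file:///tmp/delta_copy' format delta;
    select * from read_delta('file:///tmp/delta_copy') order by a;

Unlike lance, which copy_to still restricts to local destinations, delta goes through
get_destination, so GCS, S3, and Azure targets should work with the same statement.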