feat: COPY TO support for delta (#3037)
tychoish authored Jun 26, 2024
1 parent ea93654 commit 6b9ff66
Showing 12 changed files with 259 additions and 57 deletions.
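In practice, this enables writing a Delta table directly from SQL, presumably along the lines of `COPY (SELECT * FROM t) TO 'file:///tmp/my_table' FORMAT delta;` (syntax inferred from GlareDB's existing `COPY TO` formats such as lance; not shown in this diff).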
2 changes: 1 addition & 1 deletion README.md
@@ -227,7 +227,7 @@ DROP DATABASE my_pg;
| Apache ORC        | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | ✅ |
| **Table Formats** | -- | -- | -- | -- | -- | -- |
| Lance             | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
-| Delta             | ✅ | ✅ | 🚧 | ✅ | ✅ | ✅ |
+| Delta             | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| Iceberg           | ✅ | 🚧 | 🚧 | ✅ | ✅ | ✅ |
✅ = Supported
138 changes: 138 additions & 0 deletions crates/datasources/src/common/sink/delta.rs
@@ -0,0 +1,138 @@
use std::any::Any;
use std::fmt;
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::common::Result as DfResult;
use datafusion::error::DataFusionError;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::insert::DataSink;
use datafusion::physical_plan::metrics::MetricsSet;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, SendableRecordBatchStream};
use deltalake::operations::create::CreateBuilder;
use deltalake::operations::write::WriteBuilder;
use deltalake::storage::StorageOptions;
use futures::StreamExt;
use object_store::prefix::PrefixStore;
use url::Url;

use crate::native::access::arrow_to_delta_safe;


/// Writes delta tables to object storage.
#[derive(Debug, Clone)]
pub struct DeltaSink {
    url: Url,
    store: Arc<dyn object_store::ObjectStore>,
}

impl fmt::Display for DeltaSink {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "DeltaSink({})", self.url)
    }
}

impl DisplayAs for DeltaSink {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        match t {
            DisplayFormatType::Default => write!(f, "{self}"),
            DisplayFormatType::Verbose => write!(f, "{self}"),
        }
    }
}

impl DeltaSink {
    pub fn new(store: Arc<dyn object_store::ObjectStore>, url: Url) -> Self {
        DeltaSink { url, store }
    }

    async fn stream_into_inner(&self, stream: SendableRecordBatchStream) -> DfResult<u64> {
        // Using this prefix store mirrors the way we use it in the
        // native module; it looks like delta expects the location to
        // show up both in the store and in the logstore. Feels wrong,
        // probably is, but things work as expected.
        let obj_store = PrefixStore::new(self.store.clone(), self.url.path());

        let store = deltalake::logstore::default_logstore(
            Arc::new(obj_store),
            &self.url.clone(),
            &StorageOptions::default(),
        );

        // Eventually this should share code with "create_table" in
        // the native module.
        let mut builder = CreateBuilder::new()
            .with_save_mode(deltalake::protocol::SaveMode::ErrorIfExists)
            .with_table_name(
                self.url
                    .to_file_path()
                    .ok()
                    .ok_or_else(|| {
                        DataFusionError::Internal("could not resolve table path".to_string())
                    })?
                    .file_name()
                    .ok_or_else(|| DataFusionError::Internal("missing table name".to_string()))?
                    .to_os_string()
                    .into_string()
                    .ok()
                    .ok_or_else(|| {
                        DataFusionError::Internal("could not resolve table path".to_string())
                    })?,
            )
            .with_log_store(store.clone());

        // Resolve the schema; eventually this should share code with
        // "create_table" in the native module.
        for field in stream.schema().fields().into_iter() {
            let col = arrow_to_delta_safe(field.data_type())?;
            builder = builder.with_column(
                field.name().clone(),
                col.data_type,
                field.is_nullable(),
                col.metadata,
            );
        }

        let table = builder.await?;

        let mut chunks = stream.chunks(32);

        let mut records: usize = 0;

        while let Some(batches) = chunks.next().await {
            // Count rows as they pass through, propagating any stream error.
            let batches: Result<Vec<_>, _> = batches
                .into_iter()
                .map(|r| {
                    let _ = r.as_ref().map(|b| records += b.num_rows());
                    r
                })
                .collect();

            WriteBuilder::new(table.log_store(), table.snapshot().ok().cloned())
                .with_input_batches(batches?.into_iter())
                .await?;
        }

        Ok(records as u64)
    }
}

#[async_trait]
impl DataSink for DeltaSink {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn metrics(&self) -> Option<MetricsSet> {
        None
    }

    async fn write_all(
        &self,
        data: SendableRecordBatchStream,
        _: &Arc<TaskContext>,
    ) -> DfResult<u64> {
        self.stream_into_inner(data).await
    }
}
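For orientation, here is a hedged sketch (not part of the commit) of how this sink could be driven end to end: build a record-batch stream, point the sink at an object store and a destination URL, and call `write_all` from the `DataSink` trait. The in-memory store, the `file:///tmp/my_table` URL, and `copy_one_batch` are illustrative assumptions.

use std::sync::Arc;

use datafusion::arrow::array::Int64Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::insert::DataSink;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use object_store::memory::InMemory;
use url::Url;

async fn copy_one_batch() -> datafusion::common::Result<u64> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
    )?;
    // Wrap a ready vector of batches as a SendableRecordBatchStream.
    let stream = Box::pin(RecordBatchStreamAdapter::new(
        schema,
        futures::stream::iter(vec![Ok(batch)]),
    ));

    // A file:// URL because the sink derives the table name from the URL's
    // file path; the writes themselves go to the prefixed object store.
    let sink = DeltaSink::new(
        Arc::new(InMemory::new()),
        Url::parse("file:///tmp/my_table").unwrap(),
    );
    sink.write_all(stream, &Arc::new(TaskContext::default())).await
}

Note the `SaveMode::ErrorIfExists` in the builder above: a second run against the same destination should fail rather than overwrite.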
6 changes: 6 additions & 0 deletions crates/datasources/src/common/sink/json.rs
@@ -34,6 +34,12 @@ impl Default for JsonSinkOpts
    }
}

+impl JsonSinkOpts {
+    pub fn with_array_format(array: bool) -> Self {
+        JsonSinkOpts { array }
+    }
+}
+
#[derive(Debug)]
pub struct JsonSink {
    store: Arc<dyn ObjectStore>,
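A hedged usage sketch for the new constructor (values assumed, not taken from the commit):

// `array: true` presumably selects single-JSON-array output, while `false`
// selects the newline-delimited variant that Default configures.
let as_array = JsonSinkOpts::with_array_format(true);
let as_ndjson = JsonSinkOpts::with_array_format(false);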
12 changes: 3 additions & 9 deletions crates/datasources/src/common/sink/lance.rs
@@ -14,7 +14,6 @@ use futures::StreamExt;
use lance::dataset::WriteMode;
use lance::Dataset;
use object_store::path::Path as ObjectPath;
-use object_store::ObjectStore;

pub type LanceWriteParams = lance::dataset::WriteParams;

@@ -30,14 +29,13 @@ pub struct LanceSinkOpts
/// Writes lance files to object storage.
#[derive(Debug, Clone)]
pub struct LanceSink {
-    store: Arc<dyn ObjectStore>,
    loc: ObjectPath,
    opts: LanceSinkOpts,
}

impl fmt::Display for LanceSink {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        write!(f, "LanceSink({}:{})", self.store, self.loc)
+        write!(f, "LanceSink({})", self.loc)
    }
}

@@ -51,13 +49,8 @@ impl DisplayAs for LanceSink
}

impl LanceSink {
-    pub fn from_obj_store(
-        store: Arc<dyn ObjectStore>,
-        loc: impl Into<ObjectPath>,
-        opts: LanceSinkOpts,
-    ) -> Self {
+    pub fn from_obj_store(loc: impl Into<ObjectPath>, opts: LanceSinkOpts) -> Self {
        LanceSink {
-            store,
            loc: loc.into(),
            opts,
        }
@@ -80,6 +73,7 @@
            ..Default::default()
        };

        let mut ds: Option<Dataset> = None;

        while let Some(batches) = chunks.next().await {
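The call-site change implied by this diff, sketched under assumptions (the path value and the wrapping function are hypothetical):

use object_store::path::Path as ObjectPath;

// After this commit the caller no longer supplies an ObjectStore; the sink
// presumably resolves storage from the options instead.
fn make_sink(opts: LanceSinkOpts) -> LanceSink {
    LanceSink::from_obj_store(ObjectPath::from("tmp/my_table"), opts)
}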
1 change: 1 addition & 0 deletions crates/datasources/src/common/sink/mod.rs
@@ -1,5 +1,6 @@
pub mod bson;
pub mod csv;
+pub mod delta;
pub mod json;
pub mod lance;
pub mod parquet;
8 changes: 4 additions & 4 deletions crates/datasources/src/native/access.rs
@@ -99,14 +99,14 @@ impl LogStoreFactory for FakeStoreFactory
/// metadata for indicating the 'real' (original) type, for cases when
/// downcasting occurs.
#[derive(Debug)]
-struct DeltaField {
-    data_type: DeltaDataType,
-    metadata: Option<HashMap<String, Value>>,
+pub struct DeltaField {
+    pub data_type: DeltaDataType,
+    pub metadata: Option<HashMap<String, Value>>,
}

// Some datatypes get downgraded to a different type when they are stored in delta-lake.
// So we add some metadata to the field to indicate that it needs to be converted back to the original type.
-fn arrow_to_delta_safe(arrow_type: &DataType) -> DeltaResult<DeltaField> {
+pub fn arrow_to_delta_safe(arrow_type: &DataType) -> DeltaResult<DeltaField> {
    match arrow_type {
        dtype @ DataType::Timestamp(_, tz) => {
            let delta_type =
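The visible match arm hints at the contract: delta-lake timestamps are microsecond-precision, so finer-grained arrow timestamps get downgraded and the original type is recorded in the returned metadata. A hedged sketch of that contract (the assertion reflects assumed behavior, not code from the commit):

use datafusion::arrow::datatypes::{DataType, TimeUnit};

let col = arrow_to_delta_safe(&DataType::Timestamp(TimeUnit::Nanosecond, None))?;
// Assumed: a downgrade occurred, so metadata records the original type
// for conversion back on read.
assert!(col.metadata.is_some());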
10 changes: 10 additions & 0 deletions crates/protogen/src/metastore/types/options.rs
@@ -2057,6 +2057,7 @@ pub enum CopyToFormatOptions
    Csv(CopyToFormatOptionsCsv),
    Parquet(CopyToFormatOptionsParquet),
    Lance(CopyToFormatOptionsLance),
+    Delta(CopyToFormatOptionsDelta),
    Json(CopyToFormatOptionsJson),
    Bson(CopyToFormatOptionsBson),
}
@@ -2076,6 +2077,7 @@ impl CopyToFormatOptions
    pub const JSON: &'static str = "json";
    pub const BSON: &'static str = "bson";
    pub const LANCE: &'static str = "lance";
+    pub const DELTA: &'static str = "delta";

    pub fn as_str(&self) -> &'static str {
        match self {
@@ -2084,8 +2086,13 @@
            Self::Json(_) => Self::JSON,
            Self::Bson(_) => Self::BSON,
            Self::Lance(_) => Self::LANCE,
+            Self::Delta(_) => Self::DELTA,
        }
    }
+
+    pub fn is_table(&self) -> bool {
+        matches!(self, Self::Delta(_) | Self::Lance(_))
+    }
}
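The new `is_table` predicate separates table formats from single-file formats; a hypothetical caller (not from this commit) might use it to validate destinations:

// Delta and Lance write a directory tree (data files plus a log/manifest),
// not a single output file, so the destination must be directory-like.
fn destination_is_directory(opts: &CopyToFormatOptions) -> bool {
    opts.is_table()
}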

#[derive(Clone, Debug, Hash, PartialEq, Eq)]
@@ -2107,6 +2114,9 @@ pub struct CopyToFormatOptionsJson
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct CopyToFormatOptionsBson {}

+#[derive(Clone, Debug, Hash, PartialEq, Eq)]
+pub struct CopyToFormatOptionsDelta {}
+
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct CopyToFormatOptionsLance {
    pub max_rows_per_file: Option<usize>,
43 changes: 28 additions & 15 deletions crates/protogen/src/sqlexec/copy_to.rs
@@ -80,6 +80,8 @@ pub enum CopyToFormatOptionsEnum
    Lance(CopyToFormatOptionsLance),
    #[prost(message, tag = "5")]
    Bson(CopyToFormatOptionsBson),
+    #[prost(message, tag = "6")]
+    Delta(CopyToFormatOptionsDelta),
}

#[derive(Clone, PartialEq, Message)]
@@ -117,6 +119,9 @@ pub struct CopyToFormatOptionsLance
#[derive(Clone, PartialEq, Message)]
pub struct CopyToFormatOptionsBson {}

+#[derive(Clone, PartialEq, Message)]
+pub struct CopyToFormatOptionsDelta {}
+
impl TryFrom<crate::metastore::types::options::CopyToFormatOptions> for CopyToFormatOptions {
    type Error = crate::errors::ProtoConvError;
    fn try_from(
@@ -126,6 +131,9 @@ impl TryFrom<crate::metastore::types::options::CopyToFormatOptions> for CopyToFormatOptions
            crate::metastore::types::options::CopyToFormatOptions::Bson(_) => {
                Ok(CopyToFormatOptions::default())
            }
+            crate::metastore::types::options::CopyToFormatOptions::Delta(_) => {
+                Ok(CopyToFormatOptions::default())
+            }
            crate::metastore::types::options::CopyToFormatOptions::Lance(opts) => {
                Ok(CopyToFormatOptions {
                    copy_to_format_options_enum: Some(CopyToFormatOptionsEnum::Lance(
@@ -179,14 +187,6 @@ impl TryFrom<CopyToFormatOptions> for crate::metastore::types::options::CopyToFormatOptions
            ))?;

        match value {
-            CopyToFormatOptionsEnum::Csv(csv) => {
-                Ok(crate::metastore::types::options::CopyToFormatOptions::Csv(
-                    crate::metastore::types::options::CopyToFormatOptionsCsv {
-                        delim: csv.delim as u8,
-                        header: csv.header,
-                    },
-                ))
-            }
            CopyToFormatOptionsEnum::Lance(lance) => Ok(
                crate::metastore::types::options::CopyToFormatOptions::Lance(
                    crate::metastore::types::options::CopyToFormatOptionsLance {
@@ -197,6 +197,26 @@ impl TryFrom<CopyToFormatOptions> for crate::metastore::types::options::CopyToFormatOptions
                    },
                ),
            ),
+            CopyToFormatOptionsEnum::Delta(_) => Ok(
+                crate::metastore::types::options::CopyToFormatOptions::Delta(
+                    crate::metastore::types::options::CopyToFormatOptionsDelta {},
+                ),
+            ),
+            CopyToFormatOptionsEnum::Parquet(parquet) => Ok(
+                crate::metastore::types::options::CopyToFormatOptions::Parquet(
+                    crate::metastore::types::options::CopyToFormatOptionsParquet {
+                        row_group_size: parquet.row_group_size as usize,
+                    },
+                ),
+            ),
+            CopyToFormatOptionsEnum::Csv(csv) => {
+                Ok(crate::metastore::types::options::CopyToFormatOptions::Csv(
+                    crate::metastore::types::options::CopyToFormatOptionsCsv {
+                        delim: csv.delim as u8,
+                        header: csv.header,
+                    },
+                ))
+            }
            CopyToFormatOptionsEnum::Json(json) => {
                Ok(crate::metastore::types::options::CopyToFormatOptions::Json(
                    crate::metastore::types::options::CopyToFormatOptionsJson { array: json.array },
@@ -207,13 +227,6 @@ impl TryFrom<CopyToFormatOptions> for crate::metastore::types::options::CopyToFormatOptions
                    crate::metastore::types::options::CopyToFormatOptionsBson {},
                ))
            }
-            CopyToFormatOptionsEnum::Parquet(parquet) => Ok(
-                crate::metastore::types::options::CopyToFormatOptions::Parquet(
-                    crate::metastore::types::options::CopyToFormatOptionsParquet {
-                        row_group_size: parquet.row_group_size as usize,
-                    },
-                ),
-            ),
        }
    }
}
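A hedged, test-style sketch of the proto-to-metastore direction added here (not from the commit); note that the reverse direction currently collapses `Delta`, like `Bson`, to `CopyToFormatOptions::default()`:

#[test]
fn delta_options_from_proto() -> Result<(), crate::errors::ProtoConvError> {
    let proto = CopyToFormatOptions {
        copy_to_format_options_enum: Some(CopyToFormatOptionsEnum::Delta(
            CopyToFormatOptionsDelta {},
        )),
    };
    // Decoding the new variant should yield the unit options struct,
    // which the metastore side classifies as a table format.
    let opts: crate::metastore::types::options::CopyToFormatOptions = proto.try_into()?;
    assert!(opts.is_table());
    Ok(())
}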