feat: copy to support for lance (#2342)
tychoish authored Jan 18, 2024
1 parent 7a5b196 commit 9323041
Showing 16 changed files with 468 additions and 22 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -220,9 +220,9 @@ DROP DATABASE my_pg;
| Newline Delimited JSON | ✅ | ✅\* | ✅ | ✅ | ✅ |
| Apache Parquet | ✅ | ✅\* | ✅ | ✅ | ✅ |
| BSON | ✅ | ✅\* | ✅ | ✅ | ✅ |
+| Lance | ✅ | ✅\* | ✅ | ✅ | ✅ |
| Delta | ✅ | 🚧 | ✅ | ✅ | ✅ |
| Iceberg | ✅ | 🚧 | ✅ | ✅ | ✅ |
-| Lance | ✅ | 🚧 | ✅ | ✅ | ✅ |
| Microsoft Excel | ✅ | 🚧 | ✅ | 🚧 | ✅ |
| JSON | 🚧 | 🚧 | 🚧 | 🚧 | ✅ |
| Apache Avro | 🚧 | 🚧 | 🚧 | 🚧 | ✅ |
126 changes: 126 additions & 0 deletions crates/datasources/src/common/sink/lance.rs
@@ -0,0 +1,126 @@
use std::fmt;
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::arrow::record_batch::RecordBatchIterator;
use datafusion::common::Result as DfResult;
use datafusion::error::DataFusionError;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::insert::DataSink;
use datafusion::physical_plan::DisplayAs;
use datafusion::physical_plan::{DisplayFormatType, SendableRecordBatchStream};
use futures::StreamExt;
use lance::dataset::WriteMode;
use lance::Dataset;
use object_store::{path::Path as ObjectPath, ObjectStore};

pub type LanceWriteParams = lance::dataset::WriteParams;

#[derive(Debug, Clone)]
pub struct LanceSinkOpts {
    pub url: Option<url::Url>,
    pub max_rows_per_file: usize,
    pub max_rows_per_group: usize,
    pub max_bytes_per_file: usize,
    pub input_batch_size: usize,
}

/// Writes lance files to object storage.
#[derive(Debug, Clone)]
pub struct LanceSink {
    store: Arc<dyn ObjectStore>,
    loc: ObjectPath,
    opts: LanceSinkOpts,
}

impl fmt::Display for LanceSink {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "LanceSink({}:{})", self.store, self.loc)
    }
}

impl DisplayAs for LanceSink {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        match t {
            DisplayFormatType::Default => write!(f, "{self}"),
            DisplayFormatType::Verbose => write!(f, "{self}"),
        }
    }
}

impl LanceSink {
    pub fn from_obj_store(
        store: Arc<dyn ObjectStore>,
        loc: impl Into<ObjectPath>,
        opts: LanceSinkOpts,
    ) -> Self {
        LanceSink {
            store,
            loc: loc.into(),
            opts,
        }
    }

    async fn stream_into_inner(
        &self,
        stream: SendableRecordBatchStream,
        mut ds: Option<Dataset>,
    ) -> DfResult<Option<Dataset>> {
        // Resolve the table URL: relative to the sink's base URL when one
        // was provided, otherwise by parsing the location itself.
        let table = match self.opts.url.clone() {
            Some(opts_url) => opts_url.join(self.loc.as_ref()),
            None => url::Url::parse(self.loc.as_ref()),
        }
        .map_err(|e| DataFusionError::External(Box::new(e)))?;

        let schema = stream.schema().clone();
        // Group the stream into chunks of up to 32 batches; each chunk is
        // written (or appended) to the dataset in one call.
        let mut chunks = stream.chunks(32);
        let write_opts = LanceWriteParams {
            mode: WriteMode::Overwrite,
            ..Default::default()
        };

        while let Some(batches) = chunks.next().await {
            // The `?` inside the closure converts the DataFusion error into
            // the arrow error type the iterator expects.
            let batch_iter =
                RecordBatchIterator::new(batches.into_iter().map(|item| Ok(item?)), schema.clone());

            match ds.clone() {
                Some(mut d) => {
                    d.append(batch_iter, Some(write_opts.clone())).await?;
                }
                None => {
                    ds.replace(
                        Dataset::write(batch_iter, table.as_str(), Some(write_opts.clone()))
                            .await?,
                    );
                }
            }
        }

        Ok(ds)
    }
}

#[async_trait]
impl DataSink for LanceSink {
    // The dataset is the handle to the lance database. There is no way to
    // construct an empty dataset except by writing to it, so we thread an
    // `Option<Dataset>` through these calls: if it's `None`, the first
    // write creates the dataset; otherwise we append to the dataset
    // returned by the previous call.
    async fn write_all(
        &self,
        data: Vec<SendableRecordBatchStream>,
        _context: &Arc<TaskContext>,
    ) -> DfResult<u64> {
        let mut ds: Option<Dataset> = None;
        for stream in data {
            ds = self.stream_into_inner(stream, ds).await?;
        }
        match ds {
            Some(ds) => Ok(ds.count_rows().await? as u64),
            None => Ok(0),
        }
    }
}
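For orientation, a minimal sketch (not part of this commit) of driving the sink by hand. It assumes DataFusion's `MemoryStream` helper and the in-memory `object_store` backend; the path, URL, and option values are illustrative. Note that `Dataset::write` resolves its destination from `url` joined with the object path; in this file the `store` handle only feeds the `Display` impl.

    use std::sync::Arc;

    use datafusion::arrow::array::Int32Array;
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::common::Result as DfResult;
    use datafusion::execution::TaskContext;
    use datafusion::physical_plan::insert::DataSink;
    use datafusion::physical_plan::memory::MemoryStream;
    use datafusion::physical_plan::SendableRecordBatchStream;
    use object_store::memory::InMemory;

    async fn write_demo() -> DfResult<u64> {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
        )?;
        // A one-batch input stream standing in for a real execution plan.
        let stream: SendableRecordBatchStream =
            Box::pin(MemoryStream::try_new(vec![batch], schema, None)?);

        let sink = LanceSink::from_obj_store(
            Arc::new(InMemory::new()),
            "demo.lance",
            LanceSinkOpts {
                url: Some(url::Url::parse("file:///tmp/").unwrap()),
                max_rows_per_file: 1_000_000,
                max_rows_per_group: 1024,
                max_bytes_per_file: 1024 * 1024 * 1024,
                input_batch_size: 64,
            },
        );

        // Writes file:///tmp/demo.lance and returns the row count (3 here).
        sink.write_all(vec![stream], &Arc::new(TaskContext::default()))
            .await
    }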
1 change: 1 addition & 0 deletions crates/datasources/src/common/sink/mod.rs
@@ -1,6 +1,7 @@
pub mod bson;
pub mod csv;
pub mod json;
pub mod lance;
pub mod parquet;

use std::io::{self, Write};
5 changes: 2 additions & 3 deletions crates/datasources/src/lance/mod.rs
@@ -3,9 +3,8 @@ use lance::{dataset::builder::DatasetBuilder, Dataset};
use protogen::metastore::types::options::StorageOptions;

pub async fn scan_lance_table(location: &str, options: StorageOptions) -> Result<Dataset> {
-   DatasetBuilder::from_uri(location)
+   Ok(DatasetBuilder::from_uri(location)
        .with_storage_options(options.inner.into_iter().collect())
        .load()
-       .await
-       .map_err(|e| e.into())
+       .await?)
}
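A hedged read-back sketch (not from the commit): the location is illustrative, and `StorageOptions` is assumed to provide an empty `Default`.

    use protogen::metastore::types::options::StorageOptions;

    async fn read_back_demo() {
        let ds = scan_lance_table("/tmp/demo.lance", StorageOptions::default())
            .await
            .expect("lance table should load");
        println!("rows: {}", ds.count_rows().await.expect("count rows"));
    }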
11 changes: 11 additions & 0 deletions crates/protogen/src/metastore/types/options.rs
@@ -1536,6 +1536,7 @@ pub struct CopyToDestinationOptionsAzure {
pub enum CopyToFormatOptions {
    Csv(CopyToFormatOptionsCsv),
    Parquet(CopyToFormatOptionsParquet),
    Lance(CopyToFormatOptionsLance),
    Json(CopyToFormatOptionsJson),
    Bson,
}
@@ -1554,13 +1555,15 @@ impl CopyToFormatOptions {
    pub const PARQUET: &'static str = "parquet";
    pub const JSON: &'static str = "json";
    pub const BSON: &'static str = "bson";
    pub const LANCE: &'static str = "lance";

    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Csv(_) => Self::CSV,
            Self::Parquet(_) => Self::PARQUET,
            Self::Json(_) => Self::JSON,
            Self::Bson => Self::BSON,
            Self::Lance(_) => Self::LANCE,
        }
    }
}
Expand All @@ -1580,3 +1583,11 @@ pub struct CopyToFormatOptionsParquet {
pub struct CopyToFormatOptionsJson {
    pub array: bool,
}

#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct CopyToFormatOptionsLance {
    pub max_rows_per_file: Option<usize>,
    pub max_rows_per_group: Option<usize>,
    pub max_bytes_per_file: Option<usize>,
    pub input_batch_size: Option<usize>,
}
40 changes: 39 additions & 1 deletion crates/protogen/src/sqlexec/copy_to.rs
@@ -74,6 +74,8 @@ pub enum CopyToFormatOptionsEnum {
    Json(CopyToFormatOptionsJson),
    #[prost(message, tag = "3")]
    Parquet(CopyToFormatOptionsParquet),
    #[prost(message, tag = "4")]
    Lance(CopyToFormatOptionsLance),
}

#[derive(Clone, PartialEq, Message)]
Expand All @@ -96,6 +98,21 @@ pub struct CopyToFormatOptionsParquet {
    pub row_group_size: u64,
}

#[derive(Clone, PartialEq, Message)]
pub struct CopyToFormatOptionsLance {
    #[prost(uint64, optional, tag = "1")]
    pub max_rows_per_file: Option<u64>,
    #[prost(uint64, optional, tag = "2")]
    pub max_rows_per_group: Option<u64>,
    #[prost(uint64, optional, tag = "3")]
    pub max_bytes_per_file: Option<u64>,
    #[prost(uint64, optional, tag = "4")]
    pub input_batch_size: Option<u64>,
}

#[derive(Clone, PartialEq, Message)]
pub struct CopyToFormatOptionsBson {}

impl TryFrom<crate::metastore::types::options::CopyToFormatOptions> for CopyToFormatOptions {
    type Error = crate::errors::ProtoConvError;
    fn try_from(
@@ -105,6 +122,18 @@ impl TryFrom<crate::metastore::types::options::CopyToFormatOptions> for CopyToFormatOptions {
            crate::metastore::types::options::CopyToFormatOptions::Bson => {
                Ok(CopyToFormatOptions::default())
            }
            crate::metastore::types::options::CopyToFormatOptions::Lance(opts) => {
                Ok(CopyToFormatOptions {
                    copy_to_format_options_enum: Some(CopyToFormatOptionsEnum::Lance(
                        CopyToFormatOptionsLance {
                            max_rows_per_file: opts.max_rows_per_file.map(|v| v as u64),
                            max_rows_per_group: opts.max_rows_per_group.map(|v| v as u64),
                            max_bytes_per_file: opts.max_bytes_per_file.map(|v| v as u64),
                            input_batch_size: opts.input_batch_size.map(|v| v as u64),
                        },
                    )),
                })
            }
            crate::metastore::types::options::CopyToFormatOptions::Csv(csv) => {
                Ok(CopyToFormatOptions {
                    copy_to_format_options_enum: Some(CopyToFormatOptionsEnum::Csv(
@@ -154,12 +183,21 @@ impl TryFrom<CopyToFormatOptions> for crate::metastore::types::options::CopyToFormatOptions {
                    },
                ))
            }
            CopyToFormatOptionsEnum::Lance(lance) => Ok(
                crate::metastore::types::options::CopyToFormatOptions::Lance(
                    crate::metastore::types::options::CopyToFormatOptionsLance {
                        max_rows_per_file: lance.max_rows_per_file.map(|v| v as usize),
                        max_rows_per_group: lance.max_rows_per_group.map(|v| v as usize),
                        max_bytes_per_file: lance.max_bytes_per_file.map(|v| v as usize),
                        input_batch_size: lance.input_batch_size.map(|v| v as usize),
                    },
                ),
            ),
            CopyToFormatOptionsEnum::Json(json) => {
                Ok(crate::metastore::types::options::CopyToFormatOptions::Json(
                    crate::metastore::types::options::CopyToFormatOptionsJson { array: json.array },
                ))
            }

            CopyToFormatOptionsEnum::Parquet(parquet) => Ok(
                crate::metastore::types::options::CopyToFormatOptions::Parquet(
                    crate::metastore::types::options::CopyToFormatOptionsParquet {
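Because the two `TryFrom` impls mirror each other field by field, a round-trip check is cheap insurance against crossed fields in this kind of mapping. A sketch (not in the commit), assuming the metastore enum derives `Clone`, `Debug`, and `PartialEq` as its Lance variant does:

    #[cfg(test)]
    mod tests {
        use super::*;

        #[test]
        fn lance_options_roundtrip() {
            use crate::metastore::types::options as meta;

            // Distinct values per field so any crossed field fails the assert.
            let original = meta::CopyToFormatOptions::Lance(meta::CopyToFormatOptionsLance {
                max_rows_per_file: Some(1),
                max_rows_per_group: Some(2),
                max_bytes_per_file: Some(3),
                input_batch_size: Some(4),
            });

            // metastore -> proto -> metastore should preserve every field.
            let proto: CopyToFormatOptions = original.clone().try_into().unwrap();
            let back: meta::CopyToFormatOptions = proto.try_into().unwrap();
            assert_eq!(original, back);
        }
    }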
11 changes: 11 additions & 0 deletions crates/sqlexec/src/parser/options.rs
@@ -58,6 +58,17 @@ impl ParseOptionValue<String> for OptionValue {
    }
}

impl ParseOptionValue<Vec<String>> for OptionValue {
    fn parse_opt(self) -> Result<Vec<String>, ParserError> {
        match self {
            Self::QuotedLiteral(s) | Self::UnquotedLiteral(s) => {
                Ok(s.split(',').map(|s| s.to_string()).collect())
            }
            o => Err(unexpected_type_err!("string slice", o)),
        }
    }
}
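A small usage sketch (hypothetical function, not in the commit), relying only on the variants and the `ParserError` alias this module already uses:

    fn split_example() -> Result<(), ParserError> {
        let cols: Vec<String> =
            OptionValue::QuotedLiteral("a,b,c".to_string()).parse_opt()?;
        assert_eq!(cols, vec!["a", "b", "c"]);

        // Unquoted literals split the same way.
        let xy: Vec<String> = OptionValue::UnquotedLiteral("x,y".to_string()).parse_opt()?;
        assert_eq!(xy, vec!["x", "y"]);
        Ok(())
    }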

impl ParseOptionValue<bool> for OptionValue {
    fn parse_opt(self) -> Result<bool, ParserError> {
        let opt = match self {
32 changes: 32 additions & 0 deletions crates/sqlexec/src/planner/physical_plan/copy_to.rs
@@ -13,6 +13,7 @@ use datafusion_ext::metrics::WriteOnlyDataSourceMetricsExecAdapter;
use datasources::common::sink::bson::BsonSink;
use datasources::common::sink::csv::{CsvSink, CsvSinkOpts};
use datasources::common::sink::json::{JsonSink, JsonSinkOpts};
use datasources::common::sink::lance::{LanceSink, LanceSinkOpts, LanceWriteParams};
use datasources::common::sink::parquet::{ParquetSink, ParquetSinkOpts};
use datasources::common::url::DatasourceUrl;
use datasources::object_store::gcs::GcsStoreAccess;
@@ -106,6 +107,13 @@ impl DisplayAs for CopyToExec {
impl CopyToExec {
    async fn copy_to(self, context: Arc<TaskContext>) -> DataFusionResult<RecordBatch> {
        let sink = match (self.dest, self.format) {
            (CopyToDestinationOptions::Local(local_options), CopyToFormatOptions::Lance(opts)) => {
                get_sink_for_obj(
                    CopyToFormatOptions::Lance(opts),
                    &LocalStoreAccess {},
                    &local_options.location,
                )?
            }
            (CopyToDestinationOptions::Local(local_options), format) => {
                {
                    // Create the path if it doesn't exist (for local).
@@ -177,6 +185,7 @@ fn get_sink_for_obj(
    let store = access
        .create_store()
        .map_err(|e| DataFusionError::External(Box::new(e)))?;

    let path = access
        .path(location)
        .map_err(|e| DataFusionError::External(Box::new(e)))?;
@@ -197,6 +206,29 @@
                row_group_size: parquet_opts.row_group_size,
            },
        )),
        CopyToFormatOptions::Lance(opts) => {
            let wp = LanceWriteParams::default();

            Box::new(LanceSink::from_obj_store(
                store,
                path,
                LanceSinkOpts {
                    url: Some(
                        url::Url::parse(
                            access
                                .base_url()
                                .map_err(|e| DataFusionError::External(Box::new(e)))?
                                .as_str(),
                        )
                        .map_err(|e| DataFusionError::External(Box::new(e)))?,
                    ),
                    max_rows_per_file: opts.max_rows_per_file.unwrap_or(wp.max_rows_per_file),
                    max_rows_per_group: opts.max_rows_per_group.unwrap_or(wp.max_rows_per_group),
                    max_bytes_per_file: opts.max_bytes_per_file.unwrap_or(wp.max_bytes_per_file),
                    input_batch_size: opts.input_batch_size.unwrap_or(64),
                },
            ))
        }
        CopyToFormatOptions::Json(json_opts) => Box::new(JsonSink::from_obj_store(
            store,
            path,
30 changes: 19 additions & 11 deletions crates/sqlexec/src/planner/session_planner.rs
@@ -41,17 +41,17 @@ use protogen::metastore::types::catalog::{
use protogen::metastore::types::options::{
    CopyToDestinationOptions, CopyToDestinationOptionsAzure, CopyToDestinationOptionsGcs,
    CopyToDestinationOptionsLocal, CopyToDestinationOptionsS3, CopyToFormatOptions,
-   CopyToFormatOptionsCsv, CopyToFormatOptionsJson, CopyToFormatOptionsParquet,
-   CredentialsOptions, CredentialsOptionsAws, CredentialsOptionsAzure, CredentialsOptionsDebug,
-   CredentialsOptionsGcp, DatabaseOptions, DatabaseOptionsBigQuery, DatabaseOptionsCassandra,
-   DatabaseOptionsClickhouse, DatabaseOptionsDebug, DatabaseOptionsDeltaLake,
-   DatabaseOptionsMongoDb, DatabaseOptionsMysql, DatabaseOptionsPostgres,
-   DatabaseOptionsSnowflake, DatabaseOptionsSqlServer, DeltaLakeCatalog, DeltaLakeUnityCatalog,
-   StorageOptions, TableOptions, TableOptionsBigQuery, TableOptionsCassandra,
-   TableOptionsClickhouse, TableOptionsDebug, TableOptionsGcs, TableOptionsLocal,
-   TableOptionsMongoDb, TableOptionsMysql, TableOptionsObjectStore, TableOptionsPostgres,
-   TableOptionsS3, TableOptionsSnowflake, TableOptionsSqlServer, TunnelOptions,
-   TunnelOptionsDebug, TunnelOptionsInternal, TunnelOptionsSsh,
+   CopyToFormatOptionsCsv, CopyToFormatOptionsJson, CopyToFormatOptionsLance,
+   CopyToFormatOptionsParquet, CredentialsOptions, CredentialsOptionsAws, CredentialsOptionsAzure,
+   CredentialsOptionsDebug, CredentialsOptionsGcp, DatabaseOptions, DatabaseOptionsBigQuery,
+   DatabaseOptionsCassandra, DatabaseOptionsClickhouse, DatabaseOptionsDebug,
+   DatabaseOptionsDeltaLake, DatabaseOptionsMongoDb, DatabaseOptionsMysql,
+   DatabaseOptionsPostgres, DatabaseOptionsSnowflake, DatabaseOptionsSqlServer, DeltaLakeCatalog,
+   DeltaLakeUnityCatalog, StorageOptions, TableOptions, TableOptionsBigQuery,
+   TableOptionsCassandra, TableOptionsClickhouse, TableOptionsDebug, TableOptionsGcs,
+   TableOptionsLocal, TableOptionsMongoDb, TableOptionsMysql, TableOptionsObjectStore,
+   TableOptionsPostgres, TableOptionsS3, TableOptionsSnowflake, TableOptionsSqlServer,
+   TunnelOptions, TunnelOptionsDebug, TunnelOptionsInternal, TunnelOptionsSsh,
};
use protogen::metastore::types::service::{AlterDatabaseOperation, AlterTableOperation};
use sqlbuiltins::builtins::{CURRENT_SESSION_SCHEMA, DEFAULT_CATALOG};
@@ -1790,6 +1790,14 @@ impl<'a> SessionPlanner<'a> {
                CopyToFormatOptions::Json(CopyToFormatOptionsJson { array })
            }
            Some(CopyToFormatOptions::BSON) => CopyToFormatOptions::Bson {},
            Some(CopyToFormatOptions::LANCE) => {
                CopyToFormatOptions::Lance(CopyToFormatOptionsLance {
                    max_rows_per_file: m.remove_optional("max_rows_per_file")?,
                    max_rows_per_group: m.remove_optional("max_rows_per_group")?,
                    max_bytes_per_file: m.remove_optional("max_bytes_per_file")?,
                    input_batch_size: m.remove_optional("input_batch_size")?,
                })
            }
            Some(other) => return Err(internal!("unsupported output format: {other}")),
        };
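Each key is optional. A statement along the lines of `COPY t TO 'table.lance' FORMAT lance (max_rows_per_group '2048')` (the SQL shape is assumed here, not taken from this diff) would produce a value like the sketch below; unset fields stay `None`, and `get_sink_for_obj` later fills them from `LanceWriteParams::default()`, except `input_batch_size`, which falls back to 64.

    use protogen::metastore::types::options::{CopyToFormatOptions, CopyToFormatOptionsLance};

    fn example_lance_format() -> CopyToFormatOptions {
        // Only max_rows_per_group was supplied in the hypothetical statement.
        CopyToFormatOptions::Lance(CopyToFormatOptionsLance {
            max_rows_per_file: None,
            max_rows_per_group: Some(2048),
            max_bytes_per_file: None,
            input_batch_size: None,
        })
    }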
