Support DataFusion 15 #1021

Merged · 3 commits · Dec 16, 2022
2 changes: 1 addition & 1 deletion python/Cargo.toml
@@ -17,7 +17,7 @@ crate-type = ["cdylib"]
name = "deltalake._internal"

[dependencies]
arrow-schema = { version = "26", features = ["serde"] }
arrow-schema = { version = "28", features = ["serde"] }
chrono = "0"
env_logger = "0"
futures = "0.3"
2 changes: 1 addition & 1 deletion python/src/filesystem.rs
@@ -356,7 +356,7 @@ impl ObjectInputFile {
}
// reference is end of the stream; offset is usually negative
2 => {
self.pos = self.content_length as i64 + offset;
self.pos = self.content_length + offset;
}
_ => {
return Err(PyValueError::new_err(
12 changes: 6 additions & 6 deletions rust/Cargo.toml
@@ -10,7 +10,7 @@ description = "Native Delta Lake implementation in Rust"
edition = "2021"

[dependencies]
arrow = { version = "26", optional = true }
arrow = { version = "28", optional = true }
async-trait = "0.1"
bytes = "1"
chrono = "0.4.22"
@@ -25,7 +25,7 @@ num-traits = "0.2.15"
object_store = { version = "0.5.2", features = ["aws_profile"] }
once_cell = "1.16.0"
parking_lot = "0.12"
parquet = { version = "26", features = ["async"], optional = true }
parquet = { version = "28", features = ["async"], optional = true }
parquet2 = { version = "0.17", optional = true }
percent-encoding = "2"
serde = { version = "1", features = ["derive"] }
@@ -46,10 +46,10 @@ rusoto_dynamodb = { version = "0.48", default-features = false, optional = true
rusoto_glue = { version = "0.48", default-features = false, optional = true }

# Datafusion
datafusion = { version = "14", optional = true }
datafusion-expr = { version = "14", optional = true }
datafusion-common = { version = "14", optional = true }
datafusion-proto = { version = "14", optional = true }
datafusion = { version = "15", optional = true }
datafusion-expr = { version = "15", optional = true }
datafusion-common = { version = "15", optional = true }
datafusion-proto = { version = "15", optional = true }

# NOTE dependencies only for integration tests
fs_extra = { version = "1.2.0", optional = true }
37 changes: 14 additions & 23 deletions rust/src/delta_arrow.rs
@@ -8,8 +8,6 @@ use arrow::datatypes::{
use arrow::error::ArrowError;
use lazy_static::lazy_static;
use regex::Regex;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::convert::TryFrom;

impl TryFrom<&schema::Schema> for ArrowSchema {
@@ -30,24 +28,20 @@ impl TryFrom<&schema::SchemaField> for ArrowField {
type Error = ArrowError;

fn try_from(f: &schema::SchemaField) -> Result<Self, ArrowError> {
let mut field = ArrowField::new(
let metadata = f
.get_metadata()
.iter()
.map(|(key, val)| Ok((key.clone(), serde_json::to_string(val)?)))
.collect::<Result<_, serde_json::Error>>()
.map_err(|err| ArrowError::JsonError(err.to_string()))?;

let field = ArrowField::new(
f.get_name(),
ArrowDataType::try_from(f.get_type())?,
f.is_nullable(),
);
)
.with_metadata(metadata);

let metadata: Option<BTreeMap<String, String>> = Some(f.get_metadata())
.filter(|metadata| metadata.is_empty())
.map(|metadata| {
metadata
.iter()
.map(|(key, val)| Ok((key.clone(), serde_json::to_string(val)?)))
.collect::<Result<_, serde_json::Error>>()
.map_err(|err| ArrowError::JsonError(err.to_string()))
})
.transpose()?;

field.set_metadata(metadata);
Ok(field)
}
}
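Aside (not part of this diff): the rewrite above targets the arrow 28 Field API, where metadata is attached with the builder-style with_metadata() instead of the old mutable set_metadata(Option<BTreeMap<..>>), and metadata() returns the map directly. A minimal sketch with a made-up metadata key:

```rust
use arrow::datatypes::{DataType, Field};

fn main() {
    // Collect into whatever map type with_metadata() expects; the key and
    // value here are hypothetical, only to show the builder-style call.
    let metadata = [("delta.comment".to_string(), "\"id column\"".to_string())]
        .into_iter()
        .collect();

    let field = Field::new("id", DataType::Int64, false).with_metadata(metadata);

    // metadata() now returns the map itself rather than an Option.
    assert_eq!(field.metadata().len(), 1);
}
```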
@@ -111,7 +105,7 @@ impl TryFrom<&schema::SchemaDataType> for ArrowDataType {
))
})?;
let precision = extract.get(1).and_then(|v| v.as_str().parse::<u8>().ok());
let scale = extract.get(2).and_then(|v| v.as_str().parse::<u8>().ok());
let scale = extract.get(2).and_then(|v| v.as_str().parse::<i8>().ok());
match (precision, scale) {
// TODO how do we decide which variant (128 / 256) to use?
(Some(p), Some(s)) => Ok(ArrowDataType::Decimal128(p, s)),
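Aside (not part of this diff): the switch from parse::<u8>() to parse::<i8>() reflects arrow 28 typing the Decimal128 scale as i8, which makes negative scales representable. A small sketch under that assumption:

```rust
use arrow::datatypes::DataType;

fn main() {
    // arrow 28: Decimal128(precision: u8, scale: i8); a negative scale is
    // now a valid value for the type descriptor.
    let dt = DataType::Decimal128(10, -2);
    println!("{dt:?}");
}
```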
@@ -205,12 +199,9 @@ impl TryFrom<&ArrowField> for schema::SchemaField {
arrow_field.is_nullable(),
arrow_field
.metadata()
.as_ref()
.map_or_else(HashMap::new, |m| {
m.iter()
.map(|(k, v)| (k.clone(), serde_json::Value::String(v.clone())))
.collect()
}),
.iter()
.map(|(k, v)| (k.clone(), serde_json::Value::String(v.clone())))
.collect(),
))
}
}
37 changes: 27 additions & 10 deletions rust/src/delta_datafusion.rs
@@ -29,6 +29,7 @@ use std::sync::Arc;
use arrow::array::ArrayRef;
use arrow::compute::{cast_with_options, CastOptions};
use arrow::datatypes::{DataType as ArrowDataType, Schema as ArrowSchema, SchemaRef, TimeUnit};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use chrono::{DateTime, NaiveDateTime, Utc};
@@ -42,14 +43,16 @@ use datafusion::execution::FunctionRegistry;
use datafusion::optimizer::utils::conjunction;
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
use datafusion::physical_plan::file_format::FileScanConfig;
use datafusion::physical_plan::file_format::{partition_type_wrap, FileScanConfig};
use datafusion::physical_plan::{
ColumnStatistics, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
};
use datafusion_common::scalar::ScalarValue;
use datafusion_common::{Column, DataFusionError, Result as DataFusionResult};
use datafusion_expr::logical_plan::CreateExternalTable;
use datafusion_expr::{Expr, Extension, LogicalPlan};
use datafusion_proto::logical_plan::{LogicalExtensionCodec, PhysicalExtensionCodec};
use datafusion_proto::logical_plan::LogicalExtensionCodec;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use object_store::{path::Path, ObjectMeta};
use url::Url;

@@ -331,7 +334,7 @@ impl TableProvider for DeltaTable {
async fn scan(
&self,
session: &SessionState,
projection: &Option<Vec<usize>>,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
@@ -382,16 +385,26 @@ impl TableProvider for DeltaTable {
.cloned()
.collect(),
));
let parquet_scan = ParquetFormat::default()

let parquet_scan = ParquetFormat::new(session.config_options())
.create_physical_plan(
FileScanConfig {
object_store_url: self.storage.object_store_url(),
file_schema,
file_groups: file_groups.into_values().collect(),
statistics: self.datafusion_table_statistics(),
projection: projection.clone(),
projection: projection.cloned(),
limit,
table_partition_cols,
table_partition_cols: table_partition_cols
.iter()
.map(|c| {
Ok((
c.to_owned(),
partition_type_wrap(schema.field_with_name(c)?.data_type().clone()),
))
})
.collect::<Result<Vec<_>, ArrowError>>()?,
output_ordering: None,
config_options: Default::default(),
},
filters,
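Aside (not part of this diff): DataFusion 15 expects FileScanConfig::table_partition_cols as (name, DataType) pairs with the type dictionary-wrapped, which is what partition_type_wrap does and why the new test further down expects Dictionary(UInt16, _) partition columns. A minimal sketch:

```rust
use datafusion::arrow::datatypes::DataType;
use datafusion::physical_plan::file_format::partition_type_wrap;

fn main() {
    // Wrap a partition column's native type the same way the scan above does;
    // the printed form should be a dictionary type over Int32.
    let wrapped = partition_type_wrap(DataType::Int32);
    println!("partition column type: {wrapped:?}");
}
```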
@@ -829,7 +842,7 @@ impl LogicalExtensionCodec for DeltaLogicalCodec {
fn try_encode_table_provider(
&self,
node: Arc<dyn TableProvider>,
mut buf: &mut Vec<u8>,
buf: &mut Vec<u8>,
) -> Result<(), DataFusionError> {
let table = node
.as_ref()
@@ -838,7 +851,7 @@ impl LogicalExtensionCodec for DeltaLogicalCodec {
.ok_or_else(|| {
DataFusionError::Internal("Can't encode non-delta tables".to_string())
})?;
serde_json::to_writer(&mut buf, table)
serde_json::to_writer(buf, table)
.map_err(|_| DataFusionError::Internal("Error encoding delta table".to_string()))
}
}
@@ -848,8 +861,12 @@ pub struct DeltaTableFactory {}

#[async_trait]
impl TableProviderFactory for DeltaTableFactory {
async fn create(&self, url: &str) -> datafusion::error::Result<Arc<dyn TableProvider>> {
let provider = open_table(url).await.unwrap();
async fn create(
&self,
_ctx: &SessionState,
cmd: &CreateExternalTable,
) -> datafusion::error::Result<Arc<dyn TableProvider>> {
let provider = open_table(cmd.to_owned().location).await.unwrap();
Ok(Arc::new(provider))
}
}
2 changes: 1 addition & 1 deletion rust/src/operations/load.rs
@@ -87,7 +87,7 @@ impl std::future::IntoFuture for LoadBuilder {
ctx.state()
.runtime_env
.register_object_store(scheme, "", store);
let scan_plan = table.scan(&ctx.state(), &None, &[], None).await?;
let scan_plan = table.scan(&ctx.state(), None, &[], None).await?;
let plan = CoalescePartitionsExec::new(scan_plan);
let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
let stream = plan.execute(0, task_ctx)?;
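Aside (not part of this diff): in DataFusion 15, TableProvider::scan takes the projection as Option<&Vec<usize>>, so call sites that used to pass &None now pass None. A self-contained sketch of the call shape, using a MemTable so it stands alone; the table contents are invented for illustration:

```rust
use std::sync::Arc;

use datafusion::arrow::array::Int32Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::{MemTable, TableProvider};
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // A one-column in-memory table standing in for a Delta table.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;
    let table = MemTable::try_new(schema, vec![vec![batch]])?;

    let ctx = SessionContext::new();
    // No projection: pass None (previously `&None`).
    let _full_scan = table.scan(&ctx.state(), None, &[], None).await?;
    // With a projection: pass Some(&Vec<usize>).
    let projection = vec![0usize];
    let _projected = table.scan(&ctx.state(), Some(&projection), &[], None).await?;
    Ok(())
}
```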
19 changes: 16 additions & 3 deletions rust/src/operations/write.rs
@@ -33,7 +33,7 @@ use crate::storage::DeltaObjectStore;
use crate::writer::record_batch::divide_by_partition_values;
use crate::writer::utils::PartitionPath;

use arrow::datatypes::SchemaRef as ArrowSchemaRef;
use arrow::datatypes::{DataType, SchemaRef as ArrowSchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion::execution::context::{SessionContext, TaskContext};
use datafusion::physical_plan::{memory::MemoryExec, ExecutionPlan};
@@ -196,11 +196,23 @@ impl std::future::IntoFuture for WriteBuilder {
fn into_future(self) -> Self::IntoFuture {
let this = self;

fn schema_to_vec_name_type(schema: ArrowSchemaRef) -> Vec<(String, DataType)> {
schema
.fields()
.iter()
.map(|f| (f.name().to_owned(), f.data_type().clone()))
.collect::<Vec<_>>()
}

fn schema_eq(l: ArrowSchemaRef, r: ArrowSchemaRef) -> bool {
schema_to_vec_name_type(l) == schema_to_vec_name_type(r)
}

Box::pin(async move {
let object_store = if let Some(store) = this.object_store {
Ok(store)
} else {
DeltaTableBuilder::from_uri(&this.location.unwrap())
DeltaTableBuilder::from_uri(this.location.unwrap())
.with_storage_options(this.storage_options.unwrap_or_default())
.build_storage()
}?;
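Aside (not part of this diff): schema_eq compares only (name, type) pairs, so schemas that differ merely in nullability or field metadata count as the same table schema, whereas strict Schema equality would reject them. A small illustration:

```rust
use arrow::datatypes::{DataType, Field, Schema};

fn main() {
    // Two schemas that differ only in nullability.
    let stored = Schema::new(vec![Field::new("id", DataType::Int64, true)]);
    let incoming = Schema::new(vec![Field::new("id", DataType::Int64, false)]);

    // Strict equality sees a difference...
    assert_ne!(stored, incoming);

    // ...but a name/type comparison, like schema_eq above, does not.
    let names_types = |s: &Schema| -> Vec<(String, DataType)> {
        s.fields()
            .iter()
            .map(|f| (f.name().to_owned(), f.data_type().clone()))
            .collect()
    };
    assert_eq!(names_types(&stored), names_types(&incoming));
}
```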
@@ -274,7 +286,8 @@ impl std::future::IntoFuture for WriteBuilder {

if let Ok(meta) = table.get_metadata() {
let curr_schema: ArrowSchemaRef = Arc::new((&meta.schema).try_into()?);
if schema != curr_schema {

if !schema_eq(curr_schema, schema.clone()) {
return Err(DeltaTableError::Generic(
"Updating table schema not yet implemented".to_string(),
));
2 changes: 1 addition & 1 deletion rust/src/writer/json.rs
@@ -183,7 +183,7 @@ impl JsonWriter {
partition_columns: Option<Vec<String>>,
storage_options: Option<HashMap<String, String>>,
) -> Result<Self, DeltaTableError> {
let storage = DeltaTableBuilder::from_uri(&table_uri)
let storage = DeltaTableBuilder::from_uri(table_uri)
.with_storage_options(storage_options.unwrap_or_default())
.build_storage()?;

2 changes: 1 addition & 1 deletion rust/src/writer/stats.rs
@@ -38,7 +38,7 @@ pub(crate) fn apply_null_counts(

array
.columns()
.into_iter()
.iter()
.zip(fields)
.for_each(|(column, field)| {
let key = field.name().to_owned();
Binary file not shown.
@@ -0,0 +1,6 @@
{"commitInfo":{"timestamp":1670892998177,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[\"c1\",\"c2\"]"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"3","numOutputRows":"3","numOutputBytes":"1356"},"engineInfo":"Apache-Spark/3.3.1 Delta-Lake/2.2.0","txnId":"046a258f-45e3-4657-b0bf-abfb0f76681c"}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"aff5cb91-8cd9-4195-aef9-446908507302","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}
{"add":{"path":"c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet","partitionValues":{"c1":"4","c2":"c"},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":5},\"maxValues\":{\"c3\":5},\"nullCount\":{\"c3\":0}}"}}
{"add":{"path":"c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet","partitionValues":{"c1":"5","c2":"b"},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":6},\"maxValues\":{\"c3\":6},\"nullCount\":{\"c3\":0}}"}}
{"add":{"path":"c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet","partitionValues":{"c1":"6","c2":"a"},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":4},\"maxValues\":{\"c3\":4},\"nullCount\":{\"c3\":0}}"}}
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
54 changes: 51 additions & 3 deletions rust/tests/datafusion_test.rs
@@ -7,6 +7,7 @@ use std::sync::Arc;
use arrow::array::*;
use arrow::datatypes::{DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema};
use arrow::record_batch::RecordBatch;
use datafusion::assert_batches_sorted_eq;
use datafusion::datasource::datasource::TableProviderFactory;
use datafusion::datasource::TableProvider;
use datafusion::execution::context::{SessionContext, TaskContext};
@@ -86,7 +87,7 @@ async fn prepare_table(
#[tokio::test]
async fn test_datafusion_sql_registration() -> Result<()> {
let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> = HashMap::new();
table_factories.insert("deltatable".to_string(), Arc::new(DeltaTableFactory {}));
table_factories.insert("DELTATABLE".to_string(), Arc::new(DeltaTableFactory {}));
let cfg = RuntimeConfig::new().with_table_factories(table_factories);
let env = RuntimeEnv::new(cfg).unwrap();
let ses = SessionConfig::new();
@@ -255,7 +256,7 @@ async fn test_files_scanned() -> Result<()> {
assert_eq!(table.version(), 2);

let ctx = SessionContext::new();
let plan = table.scan(&ctx.state(), &None, &[], None).await?;
let plan = table.scan(&ctx.state(), None, &[], None).await?;
let plan = CoalescePartitionsExec::new(plan.clone());

let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
@@ -270,7 +271,7 @@
Expr::Literal(ScalarValue::Int32(Some(5))),
);

let plan = CoalescePartitionsExec::new(table.scan(&ctx.state(), &None, &[filter], None).await?);
let plan = CoalescePartitionsExec::new(table.scan(&ctx.state(), None, &[filter], None).await?);
let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
let _result = common::collect(plan.execute(0, task_ctx)?).await?;

@@ -280,3 +281,50 @@

Ok(())
}

#[tokio::test]
async fn test_datafusion_partitioned_types() -> Result<()> {
let ctx = SessionContext::new();
let table = deltalake::open_table("./tests/data/delta-2.2.0-partitioned-types")
.await
.unwrap();
ctx.register_table("demo", Arc::new(table))?;

let batches = ctx.sql("SELECT * FROM demo").await?.collect().await?;

let expected = vec![
"+----+----+----+",
"| c3 | c1 | c2 |",
"+----+----+----+",
"| 5 | 4 | c |",
"| 6 | 5 | b |",
"| 4 | 6 | a |",
"+----+----+----+",
];

assert_batches_sorted_eq!(&expected, &batches);

let expected_schema = ArrowSchema::new(vec![
ArrowField::new("c3", ArrowDataType::Int32, true),
ArrowField::new(
"c1",
ArrowDataType::Dictionary(
Box::new(ArrowDataType::UInt16),
Box::new(ArrowDataType::Int32),
),
false,
),
ArrowField::new(
"c2",
ArrowDataType::Dictionary(
Box::new(ArrowDataType::UInt16),
Box::new(ArrowDataType::Utf8),
),
false,
),
]);

assert_eq!(Arc::new(expected_schema), batches[0].schema());

Ok(())
}