diff --git a/python/Cargo.toml b/python/Cargo.toml index c5aafda258..8a7d9944bf 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -17,7 +17,7 @@ crate-type = ["cdylib"] name = "deltalake._internal" [dependencies] -arrow-schema = { version = "26", features = ["serde"] } +arrow-schema = { version = "28", features = ["serde"] } chrono = "0" env_logger = "0" futures = "0.3" diff --git a/python/src/filesystem.rs b/python/src/filesystem.rs index 74793b4a4c..259b7a98d1 100644 --- a/python/src/filesystem.rs +++ b/python/src/filesystem.rs @@ -356,7 +356,7 @@ impl ObjectInputFile { } // reference is end of the stream; offset is usually negative 2 => { - self.pos = self.content_length as i64 + offset; + self.pos = self.content_length + offset; } _ => { return Err(PyValueError::new_err( diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 1d0f0bf831..8a27cff234 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -10,7 +10,7 @@ description = "Native Delta Lake implementation in Rust" edition = "2021" [dependencies] -arrow = { version = "26", optional = true } +arrow = { version = "28", optional = true } async-trait = "0.1" bytes = "1" chrono = "0.4.22" @@ -25,7 +25,7 @@ num-traits = "0.2.15" object_store = { version = "0.5.2", features = ["aws_profile"] } once_cell = "1.16.0" parking_lot = "0.12" -parquet = { version = "26", features = ["async"], optional = true } +parquet = { version = "28", features = ["async"], optional = true } parquet2 = { version = "0.17", optional = true } percent-encoding = "2" serde = { version = "1", features = ["derive"] } @@ -46,10 +46,10 @@ rusoto_dynamodb = { version = "0.48", default-features = false, optional = true rusoto_glue = { version = "0.48", default-features = false, optional = true } # Datafusion -datafusion = { version = "14", optional = true } -datafusion-expr = { version = "14", optional = true } -datafusion-common = { version = "14", optional = true } -datafusion-proto = { version = "14", optional = true } +datafusion = { version = "15", optional = true } +datafusion-expr = { version = "15", optional = true } +datafusion-common = { version = "15", optional = true } +datafusion-proto = { version = "15", optional = true } # NOTE dependencies only for integration tests fs_extra = { version = "1.2.0", optional = true } diff --git a/rust/src/delta_arrow.rs b/rust/src/delta_arrow.rs index cdfe1f75c6..0057175df2 100644 --- a/rust/src/delta_arrow.rs +++ b/rust/src/delta_arrow.rs @@ -8,8 +8,6 @@ use arrow::datatypes::{ use arrow::error::ArrowError; use lazy_static::lazy_static; use regex::Regex; -use std::collections::BTreeMap; -use std::collections::HashMap; use std::convert::TryFrom; impl TryFrom<&schema::Schema> for ArrowSchema { @@ -30,24 +28,20 @@ impl TryFrom<&schema::SchemaField> for ArrowField { type Error = ArrowError; fn try_from(f: &schema::SchemaField) -> Result { - let mut field = ArrowField::new( + let metadata = f + .get_metadata() + .iter() + .map(|(key, val)| Ok((key.clone(), serde_json::to_string(val)?))) + .collect::>() + .map_err(|err| ArrowError::JsonError(err.to_string()))?; + + let field = ArrowField::new( f.get_name(), ArrowDataType::try_from(f.get_type())?, f.is_nullable(), - ); + ) + .with_metadata(metadata); - let metadata: Option> = Some(f.get_metadata()) - .filter(|metadata| metadata.is_empty()) - .map(|metadata| { - metadata - .iter() - .map(|(key, val)| Ok((key.clone(), serde_json::to_string(val)?))) - .collect::>() - .map_err(|err| ArrowError::JsonError(err.to_string())) - }) - .transpose()?; - - field.set_metadata(metadata); Ok(field) } } @@ -111,7 +105,7 @@ impl TryFrom<&schema::SchemaDataType> for ArrowDataType { )) })?; let precision = extract.get(1).and_then(|v| v.as_str().parse::().ok()); - let scale = extract.get(2).and_then(|v| v.as_str().parse::().ok()); + let scale = extract.get(2).and_then(|v| v.as_str().parse::().ok()); match (precision, scale) { // TODO how do we decide which variant (128 / 256) to use? (Some(p), Some(s)) => Ok(ArrowDataType::Decimal128(p, s)), @@ -205,12 +199,9 @@ impl TryFrom<&ArrowField> for schema::SchemaField { arrow_field.is_nullable(), arrow_field .metadata() - .as_ref() - .map_or_else(HashMap::new, |m| { - m.iter() - .map(|(k, v)| (k.clone(), serde_json::Value::String(v.clone()))) - .collect() - }), + .iter() + .map(|(k, v)| (k.clone(), serde_json::Value::String(v.clone()))) + .collect(), )) } } diff --git a/rust/src/delta_datafusion.rs b/rust/src/delta_datafusion.rs index 3644b25e61..e756a9b2fc 100644 --- a/rust/src/delta_datafusion.rs +++ b/rust/src/delta_datafusion.rs @@ -29,6 +29,7 @@ use std::sync::Arc; use arrow::array::ArrayRef; use arrow::compute::{cast_with_options, CastOptions}; use arrow::datatypes::{DataType as ArrowDataType, Schema as ArrowSchema, SchemaRef, TimeUnit}; +use arrow::error::ArrowError; use arrow::record_batch::RecordBatch; use async_trait::async_trait; use chrono::{DateTime, NaiveDateTime, Utc}; @@ -42,14 +43,16 @@ use datafusion::execution::FunctionRegistry; use datafusion::optimizer::utils::conjunction; use datafusion::physical_expr::PhysicalSortExpr; use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; -use datafusion::physical_plan::file_format::FileScanConfig; +use datafusion::physical_plan::file_format::{partition_type_wrap, FileScanConfig}; use datafusion::physical_plan::{ ColumnStatistics, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }; use datafusion_common::scalar::ScalarValue; use datafusion_common::{Column, DataFusionError, Result as DataFusionResult}; +use datafusion_expr::logical_plan::CreateExternalTable; use datafusion_expr::{Expr, Extension, LogicalPlan}; -use datafusion_proto::logical_plan::{LogicalExtensionCodec, PhysicalExtensionCodec}; +use datafusion_proto::logical_plan::LogicalExtensionCodec; +use datafusion_proto::physical_plan::PhysicalExtensionCodec; use object_store::{path::Path, ObjectMeta}; use url::Url; @@ -331,7 +334,7 @@ impl TableProvider for DeltaTable { async fn scan( &self, session: &SessionState, - projection: &Option>, + projection: Option<&Vec>, filters: &[Expr], limit: Option, ) -> DataFusionResult> { @@ -382,16 +385,26 @@ impl TableProvider for DeltaTable { .cloned() .collect(), )); - let parquet_scan = ParquetFormat::default() + + let parquet_scan = ParquetFormat::new(session.config_options()) .create_physical_plan( FileScanConfig { object_store_url: self.storage.object_store_url(), file_schema, file_groups: file_groups.into_values().collect(), statistics: self.datafusion_table_statistics(), - projection: projection.clone(), + projection: projection.cloned(), limit, - table_partition_cols, + table_partition_cols: table_partition_cols + .iter() + .map(|c| { + Ok(( + c.to_owned(), + partition_type_wrap(schema.field_with_name(c)?.data_type().clone()), + )) + }) + .collect::, ArrowError>>()?, + output_ordering: None, config_options: Default::default(), }, filters, @@ -829,7 +842,7 @@ impl LogicalExtensionCodec for DeltaLogicalCodec { fn try_encode_table_provider( &self, node: Arc, - mut buf: &mut Vec, + buf: &mut Vec, ) -> Result<(), DataFusionError> { let table = node .as_ref() @@ -838,7 +851,7 @@ impl LogicalExtensionCodec for DeltaLogicalCodec { .ok_or_else(|| { DataFusionError::Internal("Can't encode non-delta tables".to_string()) })?; - serde_json::to_writer(&mut buf, table) + serde_json::to_writer(buf, table) .map_err(|_| DataFusionError::Internal("Error encoding delta table".to_string())) } } @@ -848,8 +861,12 @@ pub struct DeltaTableFactory {} #[async_trait] impl TableProviderFactory for DeltaTableFactory { - async fn create(&self, url: &str) -> datafusion::error::Result> { - let provider = open_table(url).await.unwrap(); + async fn create( + &self, + _ctx: &SessionState, + cmd: &CreateExternalTable, + ) -> datafusion::error::Result> { + let provider = open_table(cmd.to_owned().location).await.unwrap(); Ok(Arc::new(provider)) } } diff --git a/rust/src/operations/load.rs b/rust/src/operations/load.rs index 821b11b0f5..70f825f994 100644 --- a/rust/src/operations/load.rs +++ b/rust/src/operations/load.rs @@ -87,7 +87,7 @@ impl std::future::IntoFuture for LoadBuilder { ctx.state() .runtime_env .register_object_store(scheme, "", store); - let scan_plan = table.scan(&ctx.state(), &None, &[], None).await?; + let scan_plan = table.scan(&ctx.state(), None, &[], None).await?; let plan = CoalescePartitionsExec::new(scan_plan); let task_ctx = Arc::new(TaskContext::from(&ctx.state())); let stream = plan.execute(0, task_ctx)?; diff --git a/rust/src/operations/write.rs b/rust/src/operations/write.rs index 823f1d466d..9ce9bf5b0a 100644 --- a/rust/src/operations/write.rs +++ b/rust/src/operations/write.rs @@ -33,7 +33,7 @@ use crate::storage::DeltaObjectStore; use crate::writer::record_batch::divide_by_partition_values; use crate::writer::utils::PartitionPath; -use arrow::datatypes::SchemaRef as ArrowSchemaRef; +use arrow::datatypes::{DataType, SchemaRef as ArrowSchemaRef}; use arrow::record_batch::RecordBatch; use datafusion::execution::context::{SessionContext, TaskContext}; use datafusion::physical_plan::{memory::MemoryExec, ExecutionPlan}; @@ -196,11 +196,23 @@ impl std::future::IntoFuture for WriteBuilder { fn into_future(self) -> Self::IntoFuture { let this = self; + fn schema_to_vec_name_type(schema: ArrowSchemaRef) -> Vec<(String, DataType)> { + schema + .fields() + .iter() + .map(|f| (f.name().to_owned(), f.data_type().clone())) + .collect::>() + } + + fn schema_eq(l: ArrowSchemaRef, r: ArrowSchemaRef) -> bool { + schema_to_vec_name_type(l) == schema_to_vec_name_type(r) + } + Box::pin(async move { let object_store = if let Some(store) = this.object_store { Ok(store) } else { - DeltaTableBuilder::from_uri(&this.location.unwrap()) + DeltaTableBuilder::from_uri(this.location.unwrap()) .with_storage_options(this.storage_options.unwrap_or_default()) .build_storage() }?; @@ -274,7 +286,8 @@ impl std::future::IntoFuture for WriteBuilder { if let Ok(meta) = table.get_metadata() { let curr_schema: ArrowSchemaRef = Arc::new((&meta.schema).try_into()?); - if schema != curr_schema { + + if !schema_eq(curr_schema, schema.clone()) { return Err(DeltaTableError::Generic( "Updating table schema not yet implemented".to_string(), )); diff --git a/rust/src/writer/json.rs b/rust/src/writer/json.rs index 7fa86d8515..c9fee9db77 100644 --- a/rust/src/writer/json.rs +++ b/rust/src/writer/json.rs @@ -183,7 +183,7 @@ impl JsonWriter { partition_columns: Option>, storage_options: Option>, ) -> Result { - let storage = DeltaTableBuilder::from_uri(&table_uri) + let storage = DeltaTableBuilder::from_uri(table_uri) .with_storage_options(storage_options.unwrap_or_default()) .build_storage()?; diff --git a/rust/src/writer/stats.rs b/rust/src/writer/stats.rs index e14b7dc333..2ec3c5f8d3 100644 --- a/rust/src/writer/stats.rs +++ b/rust/src/writer/stats.rs @@ -38,7 +38,7 @@ pub(crate) fn apply_null_counts( array .columns() - .into_iter() + .iter() .zip(fields) .for_each(|(column, field)| { let key = field.name().to_owned(); diff --git a/rust/tests/data/delta-2.2.0-partitioned-types/_delta_log/.00000000000000000000.json.crc b/rust/tests/data/delta-2.2.0-partitioned-types/_delta_log/.00000000000000000000.json.crc new file mode 100644 index 0000000000..5042c0fbc2 Binary files /dev/null and b/rust/tests/data/delta-2.2.0-partitioned-types/_delta_log/.00000000000000000000.json.crc differ diff --git a/rust/tests/data/delta-2.2.0-partitioned-types/_delta_log/00000000000000000000.json b/rust/tests/data/delta-2.2.0-partitioned-types/_delta_log/00000000000000000000.json new file mode 100644 index 0000000000..2db663806a --- /dev/null +++ b/rust/tests/data/delta-2.2.0-partitioned-types/_delta_log/00000000000000000000.json @@ -0,0 +1,6 @@ +{"commitInfo":{"timestamp":1670892998177,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[\"c1\",\"c2\"]"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"3","numOutputRows":"3","numOutputBytes":"1356"},"engineInfo":"Apache-Spark/3.3.1 Delta-Lake/2.2.0","txnId":"046a258f-45e3-4657-b0bf-abfb0f76681c"}} +{"protocol":{"minReaderVersion":1,"minWriterVersion":2}} +{"metaData":{"id":"aff5cb91-8cd9-4195-aef9-446908507302","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}} +{"add":{"path":"c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet","partitionValues":{"c1":"4","c2":"c"},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":5},\"maxValues\":{\"c3\":5},\"nullCount\":{\"c3\":0}}"}} +{"add":{"path":"c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet","partitionValues":{"c1":"5","c2":"b"},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":6},\"maxValues\":{\"c3\":6},\"nullCount\":{\"c3\":0}}"}} +{"add":{"path":"c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet","partitionValues":{"c1":"6","c2":"a"},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":4},\"maxValues\":{\"c3\":4},\"nullCount\":{\"c3\":0}}"}} diff --git a/rust/tests/data/delta-2.2.0-partitioned-types/c1=4/c2=c/.part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet.crc b/rust/tests/data/delta-2.2.0-partitioned-types/c1=4/c2=c/.part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet.crc new file mode 100644 index 0000000000..4df00298f1 Binary files /dev/null and b/rust/tests/data/delta-2.2.0-partitioned-types/c1=4/c2=c/.part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet.crc differ diff --git a/rust/tests/data/delta-2.2.0-partitioned-types/c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet b/rust/tests/data/delta-2.2.0-partitioned-types/c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet new file mode 100644 index 0000000000..3f09f1d945 Binary files /dev/null and b/rust/tests/data/delta-2.2.0-partitioned-types/c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet differ diff --git a/rust/tests/data/delta-2.2.0-partitioned-types/c1=5/c2=b/.part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet.crc b/rust/tests/data/delta-2.2.0-partitioned-types/c1=5/c2=b/.part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet.crc new file mode 100644 index 0000000000..f6fffe7bcd Binary files /dev/null and b/rust/tests/data/delta-2.2.0-partitioned-types/c1=5/c2=b/.part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet.crc differ diff --git a/rust/tests/data/delta-2.2.0-partitioned-types/c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet b/rust/tests/data/delta-2.2.0-partitioned-types/c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet new file mode 100644 index 0000000000..10ec40964b Binary files /dev/null and b/rust/tests/data/delta-2.2.0-partitioned-types/c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet differ diff --git a/rust/tests/data/delta-2.2.0-partitioned-types/c1=6/c2=a/.part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet.crc b/rust/tests/data/delta-2.2.0-partitioned-types/c1=6/c2=a/.part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet.crc new file mode 100644 index 0000000000..c31be60c20 Binary files /dev/null and b/rust/tests/data/delta-2.2.0-partitioned-types/c1=6/c2=a/.part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet.crc differ diff --git a/rust/tests/data/delta-2.2.0-partitioned-types/c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet b/rust/tests/data/delta-2.2.0-partitioned-types/c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet new file mode 100644 index 0000000000..a0e02daa50 Binary files /dev/null and b/rust/tests/data/delta-2.2.0-partitioned-types/c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet differ diff --git a/rust/tests/datafusion_test.rs b/rust/tests/datafusion_test.rs index 519c5eaacd..27e3642054 100644 --- a/rust/tests/datafusion_test.rs +++ b/rust/tests/datafusion_test.rs @@ -7,6 +7,7 @@ use std::sync::Arc; use arrow::array::*; use arrow::datatypes::{DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema}; use arrow::record_batch::RecordBatch; +use datafusion::assert_batches_sorted_eq; use datafusion::datasource::datasource::TableProviderFactory; use datafusion::datasource::TableProvider; use datafusion::execution::context::{SessionContext, TaskContext}; @@ -86,7 +87,7 @@ async fn prepare_table( #[tokio::test] async fn test_datafusion_sql_registration() -> Result<()> { let mut table_factories: HashMap> = HashMap::new(); - table_factories.insert("deltatable".to_string(), Arc::new(DeltaTableFactory {})); + table_factories.insert("DELTATABLE".to_string(), Arc::new(DeltaTableFactory {})); let cfg = RuntimeConfig::new().with_table_factories(table_factories); let env = RuntimeEnv::new(cfg).unwrap(); let ses = SessionConfig::new(); @@ -255,7 +256,7 @@ async fn test_files_scanned() -> Result<()> { assert_eq!(table.version(), 2); let ctx = SessionContext::new(); - let plan = table.scan(&ctx.state(), &None, &[], None).await?; + let plan = table.scan(&ctx.state(), None, &[], None).await?; let plan = CoalescePartitionsExec::new(plan.clone()); let task_ctx = Arc::new(TaskContext::from(&ctx.state())); @@ -270,7 +271,7 @@ async fn test_files_scanned() -> Result<()> { Expr::Literal(ScalarValue::Int32(Some(5))), ); - let plan = CoalescePartitionsExec::new(table.scan(&ctx.state(), &None, &[filter], None).await?); + let plan = CoalescePartitionsExec::new(table.scan(&ctx.state(), None, &[filter], None).await?); let task_ctx = Arc::new(TaskContext::from(&ctx.state())); let _result = common::collect(plan.execute(0, task_ctx)?).await?; @@ -280,3 +281,50 @@ async fn test_files_scanned() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn test_datafusion_partitioned_types() -> Result<()> { + let ctx = SessionContext::new(); + let table = deltalake::open_table("./tests/data/delta-2.2.0-partitioned-types") + .await + .unwrap(); + ctx.register_table("demo", Arc::new(table))?; + + let batches = ctx.sql("SELECT * FROM demo").await?.collect().await?; + + let expected = vec![ + "+----+----+----+", + "| c3 | c1 | c2 |", + "+----+----+----+", + "| 5 | 4 | c |", + "| 6 | 5 | b |", + "| 4 | 6 | a |", + "+----+----+----+", + ]; + + assert_batches_sorted_eq!(&expected, &batches); + + let expected_schema = ArrowSchema::new(vec![ + ArrowField::new("c3", ArrowDataType::Int32, true), + ArrowField::new( + "c1", + ArrowDataType::Dictionary( + Box::new(ArrowDataType::UInt16), + Box::new(ArrowDataType::Int32), + ), + false, + ), + ArrowField::new( + "c2", + ArrowDataType::Dictionary( + Box::new(ArrowDataType::UInt16), + Box::new(ArrowDataType::Utf8), + ), + false, + ), + ]); + + assert_eq!(Arc::new(expected_schema), batches[0].schema()); + + Ok(()) +}