Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: double url encode of partition key #1324

Merged
merged 1 commit into from
May 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions rust/src/delta_datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ use datafusion_physical_expr::execution_props::ExecutionProps;
use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
use datafusion_proto::logical_plan::LogicalExtensionCodec;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use object_store::{path::Path, ObjectMeta};
use object_store::ObjectMeta;
use url::Url;

use crate::builder::ensure_table_uri;
Expand Down Expand Up @@ -594,9 +594,8 @@ fn partitioned_file_from_action(action: &action::Add, schema: &ArrowSchema) -> P
);
PartitionedFile {
object_meta: ObjectMeta {
location: Path::from(action.path.clone()),
last_modified,
size: action.size as usize,
..action.try_into().unwrap()
},
partition_values,
range: None,
Expand Down Expand Up @@ -952,6 +951,7 @@ mod tests {
use datafusion::physical_plan::empty::EmptyExec;
use datafusion_proto::physical_plan::AsExecutionPlan;
use datafusion_proto::protobuf;
use object_store::path::Path;
use serde_json::json;
use std::ops::Deref;

Expand Down
2 changes: 1 addition & 1 deletion rust/src/operations/transaction/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ impl DeltaTableState {
Ok(sql_to_rel.sql_to_expr(sql, &df_schema, &mut Default::default())?)
}

/// Get the pysical table schema.
/// Get the physical table schema.
///
/// This will construct a schema derived from the parquet schema of the latest data file,
/// and fields for partition columns from the schema defined in table meta data.
Expand Down
31 changes: 29 additions & 2 deletions rust/src/storage/utils.rs
mrjoe7 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,37 @@ impl TryFrom<&Add> for ObjectMeta {
Utc,
);
Ok(Self {
// TODO this won't work for absoute paths, since Paths are always relative to store.
location: Path::from(value.path.as_str()),
// TODO this won't work for absolute paths, since Paths are always relative to store.
location: Path::parse(value.path.as_str())?,
last_modified,
size: value.size as usize,
})
}
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Converting an `Add` action into an `ObjectMeta` must keep the
    /// percent-encoded partition value in the file path verbatim — i.e.
    /// `Path::parse` (no re-encoding) rather than `Path::from` (which
    /// would URL-encode the already-encoded `%25` a second time).
    #[test]
    fn test_object_meta_from_add_action() {
        let encoded_path =
            "x=A%252FA/part-00007-b350e235-2832-45df-9918-6cab4f7578f7.c000.snappy.parquet";
        let action = Add {
            path: encoded_path.to_string(),
            size: 123,
            modification_time: 123456789,
            ..Default::default()
        };

        let object_meta = ObjectMeta::try_from(&action).unwrap();

        // The location must match a parsed (not re-encoded) path.
        assert_eq!(object_meta.location, Path::parse(encoded_path).unwrap());
        assert_eq!(object_meta.size, 123);
        assert_eq!(object_meta.last_modified.timestamp_millis(), 123456789);
    }
}
24 changes: 24 additions & 0 deletions rust/tests/read_delta_partitions_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,27 @@ async fn read_null_partitions_from_checkpoint() {
let table = deltalake::open_table(&table.table_uri()).await.unwrap();
assert_eq!(table.version(), 2);
}

#[cfg(feature = "datafusion")]
#[tokio::test]
async fn load_from_delta_8_0_table_with_special_partition() {
    use datafusion::physical_plan::SendableRecordBatchStream;
    use deltalake::{DeltaOps, DeltaTable};
    use futures::StreamExt;

    // Open a fixture table whose partition value contains characters that
    // must remain percent-encoded in object-store paths.
    let table = deltalake::open_table("./tests/data/delta-0.8.0-special-partition")
        .await
        .unwrap();

    // Scanning should resolve every data file despite the encoded
    // partition directory name.
    let (_table, stream): (DeltaTable, SendableRecordBatchStream) = DeltaOps(table)
        .load()
        .with_columns(vec!["x", "y"])
        .await
        .unwrap();

    // Every record batch produced by the scan must be readable.
    let batches = stream.collect::<Vec<_>>().await;
    for batch in batches {
        assert!(batch.is_ok());
    }
}