Skip to content

Commit

Permalink
fix: case sensitivity for z-order (#1982)
Browse files Browse the repository at this point in the history
# Description
Enable z-order optimization on columns whose names contain
uppercase characters.

# Related Issue(s)
- closes #1586
  • Loading branch information
Blajda authored Dec 19, 2023
1 parent 763d39e commit df04624
Showing 1 changed file with 44 additions and 2 deletions.
46 changes: 44 additions & 2 deletions crates/deltalake-core/src/operations/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,7 @@ impl MergePlan {
context: Arc<zorder::ZOrderExecContext>,
) -> Result<BoxStream<'static, Result<RecordBatch, ParquetError>>, DeltaTableError> {
use datafusion::prelude::{col, ParquetReadOptions};
use datafusion_common::Column;
use datafusion_expr::expr::ScalarUDF;
use datafusion_expr::Expr;

Expand All @@ -549,12 +550,16 @@ impl MergePlan {
.schema()
.fields()
.iter()
.map(|f| col(f.name()))
.map(|f| Expr::Column(Column::from_qualified_name_ignore_case(f.name())))
.collect_vec();

// Add a temporary z-order column we will sort by, and then drop.
const ZORDER_KEY_COLUMN: &str = "__zorder_key";
let cols = context.columns.iter().map(col).collect_vec();
let cols = context
.columns
.iter()
.map(|col| Expr::Column(Column::from_qualified_name_ignore_case(col)))
.collect_vec();
let expr = Expr::ScalarUDF(ScalarUDF::new(
Arc::new(zorder::datafusion::zorder_key_udf()),
cols,
Expand Down Expand Up @@ -1208,6 +1213,7 @@ pub(super) mod zorder {
use ::datafusion::assert_batches_eq;
use arrow_array::{Int32Array, StringArray};
use arrow_ord::sort::sort_to_indices;
use arrow_schema::Field;
use arrow_select::take::take;
use rand::Rng;
#[test]
Expand Down Expand Up @@ -1300,6 +1306,42 @@ pub(super) mod zorder {
}
array
}

/// Z-order optimization must succeed when the table's column names
/// (and the requested z-order column) contain uppercase characters.
#[tokio::test]
async fn test_zorder_mixed_case() {
    // Every column name deliberately mixes upper- and lower-case letters.
    let fields = vec![
        Field::new("moDified", DataType::Utf8, true),
        Field::new("ID", DataType::Utf8, true),
        Field::new("vaLue", DataType::Int32, true),
    ];
    let schema = Arc::new(ArrowSchema::new(fields));

    // One array per column, four rows each.
    let modified = arrow::array::StringArray::from(vec![
        "2021-02-01",
        "2021-02-01",
        "2021-02-02",
        "2021-02-02",
    ]);
    let ids = arrow::array::StringArray::from(vec!["A", "B", "C", "D"]);
    let values = arrow::array::Int32Array::from(vec![1, 10, 20, 100]);

    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(modified), Arc::new(ids), Arc::new(values)],
    )
    .expect("columns match the schema");

    // Seed an in-memory table with the batch.
    let table = crate::DeltaOps::new_in_memory()
        .write(vec![batch])
        .with_save_mode(crate::protocol::SaveMode::Append)
        .await
        .expect("initial write succeeds");

    // Z-order on the mixed-case column; the operation only has to succeed.
    let zorder_columns = vec!["moDified".into()];
    let outcome = crate::DeltaOps(table)
        .optimize()
        .with_type(OptimizeType::ZOrder(zorder_columns))
        .await;
    assert!(outcome.is_ok());
}
}
}

Expand Down

0 comments on commit df04624

Please sign in to comment.