Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: double-encode paths during zorder optimize when they contain special characters #2897

Merged
merged 1 commit into from
Sep 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,7 @@ use url::Url;
use crate::delta_datafusion::expr::parse_predicate_expression;
use crate::delta_datafusion::schema_adapter::DeltaSchemaAdapterFactory;
use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{
Add, DataCheck, EagerSnapshot, Invariant, LogicalFile, Snapshot, StructTypeExt,
};
use crate::kernel::{Add, DataCheck, EagerSnapshot, Invariant, Snapshot, StructTypeExt};
use crate::logstore::LogStoreRef;
use crate::table::builder::ensure_table_uri;
use crate::table::state::DeltaTableState;
Expand Down
2 changes: 0 additions & 2 deletions crates/core/src/operations/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3099,7 +3099,6 @@ mod tests {
use crate::kernel::Protocol;
use crate::operations::merge::Action;

let _ = pretty_env_logger::try_init();
let schema = get_delta_schema();

let actions = vec![Action::Protocol(Protocol::new(1, 4))];
Expand Down Expand Up @@ -3194,7 +3193,6 @@ mod tests {
use crate::kernel::Protocol;
use crate::operations::merge::Action;

let _ = pretty_env_logger::try_init();
let schema = get_delta_schema();

let actions = vec![Action::Protocol(Protocol::new(1, 4))];
Expand Down
120 changes: 113 additions & 7 deletions crates/core/src/operations/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ use parquet::basic::{Compression, ZstdLevel};
use parquet::errors::ParquetError;
use parquet::file::properties::WriterProperties;
use serde::{de::Error as DeError, Deserialize, Deserializer, Serialize, Serializer};
use tracing::debug;
use tracing::*;
use url::Url;

use super::transaction::PROTOCOL;
use super::writer::{PartitionWriter, PartitionWriterConfig};
Expand Down Expand Up @@ -137,6 +138,7 @@ impl fmt::Display for MetricDetails {
}
}

#[derive(Debug)]
/// Metrics for a single partition
pub struct PartialMetrics {
/// Number of optimized files added
Expand Down Expand Up @@ -345,6 +347,7 @@ impl From<OptimizeInput> for DeltaOperation {
}
}

/// Generate an appropriate remove action for the optimization task
fn create_remove(
path: &str,
partitions: &IndexMap<String, Scalar>,
Expand Down Expand Up @@ -606,12 +609,26 @@ impl MergePlan {
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::{Expr, ScalarUDF};

let locations = files
// This code is ... not ideal. Essentially `read_parquet` expects Strings that it will then
// parse as URLs and then pass back to the object store (x_x). This can cause problems when
// paths in object storage have special characters like spaces, etc.
//
// This [str::replace] i kind of a hack to address
// <https://github.com/delta-io/delta-rs/issues/2834 >
let locations: Vec<String> = files
.iter()
.map(|file| format!("delta-rs:///{}", file.location))
.collect_vec();
.map(|om| {
format!(
"delta-rs:///{}",
str::replace(om.location.as_ref(), "%", "%25")
)
})
.collect();
debug!("Reading z-order with locations are: {locations:?}");

let df = context
.ctx
// TODO: should read options have the partition columns
.read_parquet(locations, ParquetReadOptions::default())
.await?;

Expand Down Expand Up @@ -712,13 +729,15 @@ impl MergePlan {
bins.len() <= num_cpus::get(),
));

debug!("Starting zorder with the columns: {zorder_columns:?} {bins:?}");
#[cfg(feature = "datafusion")]
let exec_context = Arc::new(zorder::ZOrderExecContext::new(
zorder_columns,
log_store.object_store(),
max_spill_size,
)?);
let task_parameters = self.task_parameters.clone();

let log_store = log_store.clone();
futures::stream::iter(bins)
.map(move |(_, (partition, files))| {
Expand Down Expand Up @@ -891,9 +910,7 @@ impl MergeBin {
self.size_bytes += meta.size as i64;
self.files.push(meta);
}
}

impl MergeBin {
fn iter(&self) -> impl Iterator<Item = &ObjectMeta> {
self.files.iter()
}
Expand Down Expand Up @@ -1036,6 +1053,7 @@ fn build_zorder_plan(
.or_insert_with(|| (partition_values, MergeBin::new()))
.1
.add(object_meta);
error!("partition_files inside the zorder plan: {partition_files:?}");
}

let operation = OptimizeOperations::ZOrder(zorder_columns, partition_files);
Expand Down Expand Up @@ -1229,7 +1247,6 @@ pub(super) mod zorder {
let runtime = Arc::new(RuntimeEnv::new(config)?);
runtime.register_object_store(&Url::parse("delta-rs://").unwrap(), object_store);

use url::Url;
let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), runtime);
ctx.register_udf(ScalarUDF::from(datafusion::ZOrderUDF));
Ok(Self { columns, ctx })
Expand Down Expand Up @@ -1269,6 +1286,7 @@ pub(super) mod zorder {
fn zorder_key_datafusion(
columns: &[ColumnarValue],
) -> Result<ColumnarValue, DataFusionError> {
debug!("zorder_key_datafusion: {columns:#?}");
let length = columns
.iter()
.map(|col| match col {
Expand Down Expand Up @@ -1423,6 +1441,94 @@ pub(super) mod zorder {
.await;
assert!(res.is_ok());
}

/// Issue <https://github.com/delta-io/delta-rs/issues/2834>
#[tokio::test]
async fn test_zorder_space_in_partition_value() {
use arrow_schema::Schema as ArrowSchema;
let _ = pretty_env_logger::try_init();
let schema = Arc::new(ArrowSchema::new(vec![
Field::new("modified", DataType::Utf8, true),
Field::new("country", DataType::Utf8, true),
Field::new("value", DataType::Int32, true),
]));

let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(arrow::array::StringArray::from(vec![
"2021-02-01",
"2021-02-01",
"2021-02-02",
"2021-02-02",
])),
Arc::new(arrow::array::StringArray::from(vec![
"Germany",
"China",
"Canada",
"Dominican Republic",
])),
Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
//Arc::new(arrow::array::StringArray::from(vec!["Dominican Republic"])),
//Arc::new(arrow::array::Int32Array::from(vec![100])),
],
)
.unwrap();
// write some data
let table = crate::DeltaOps::new_in_memory()
.write(vec![batch.clone()])
.with_partition_columns(vec!["country"])
.with_save_mode(crate::protocol::SaveMode::Overwrite)
.await
.unwrap();

let res = crate::DeltaOps(table)
.optimize()
.with_type(OptimizeType::ZOrder(vec!["modified".into()]))
.await;
assert!(res.is_ok(), "Failed to optimize: {res:#?}");
}

#[tokio::test]
async fn test_zorder_space_in_partition_value_garbage() {
use arrow_schema::Schema as ArrowSchema;
let _ = pretty_env_logger::try_init();
let schema = Arc::new(ArrowSchema::new(vec![
Field::new("modified", DataType::Utf8, true),
Field::new("country", DataType::Utf8, true),
Field::new("value", DataType::Int32, true),
]));

let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(arrow::array::StringArray::from(vec![
"2021-02-01",
"2021-02-01",
"2021-02-02",
"2021-02-02",
])),
Arc::new(arrow::array::StringArray::from(vec![
"Germany", "China", "Canada", "USA$$!",
])),
Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
],
)
.unwrap();
// write some data
let table = crate::DeltaOps::new_in_memory()
.write(vec![batch.clone()])
.with_partition_columns(vec!["country"])
.with_save_mode(crate::protocol::SaveMode::Overwrite)
.await
.unwrap();

let res = crate::DeltaOps(table)
.optimize()
.with_type(OptimizeType::ZOrder(vec!["modified".into()]))
.await;
assert!(res.is_ok(), "Failed to optimize: {res:#?}");
}
}
}

Expand Down
35 changes: 35 additions & 0 deletions python/tests/test_optimize.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@
import pyarrow as pa
import pytest

try:
import pandas as pd
except ModuleNotFoundError:
_has_pandas = False
else:
_has_pandas = True

from deltalake import DeltaTable, write_deltalake
from deltalake.table import CommitProperties

Expand Down Expand Up @@ -132,3 +139,31 @@ def test_optimize_schema_evolved_table(
assert dt.to_pyarrow_table().sort_by([("foo", "ascending")]) == data.sort_by(
[("foo", "ascending")]
)


@pytest.mark.pandas
def test_zorder_with_space_partition(tmp_path: pathlib.Path):
df = pd.DataFrame(
{
"user": ["James", "Anna", "Sara", "Martin"],
"country": ["United States", "Canada", "Costa Rica", "South Africa"],
"age": [34, 23, 45, 26],
}
)

write_deltalake(
table_or_uri=tmp_path,
data=df,
mode="overwrite",
partition_by=["country"],
)

test_table = DeltaTable(tmp_path)

# retrieve by partition works fine
partitioned_df = test_table.to_pandas(
partitions=[("country", "=", "United States")],
)
print(partitioned_df)

test_table.optimize.z_order(columns=["user"])
Loading