Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for "name" column mapping #205

Merged
merged 26 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 33 additions & 16 deletions acceptance/src/data.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::{collections::HashMap, path::Path, sync::Arc};
use std::{path::Path, sync::Arc};

use arrow_array::RecordBatch;
use arrow_ord::sort::{lexsort_to_indices, SortColumn};
use arrow_schema::DataType;
use arrow_schema::{DataType, Schema};
use arrow_select::{concat::concat_batches, filter::filter_record_batch, take::take};

use delta_kernel::{
Expand Down Expand Up @@ -62,9 +62,7 @@ pub fn sort_record_batch(batch: RecordBatch) -> DeltaResult<RecordBatch> {
Ok(RecordBatch::try_new(batch.schema(), columns)?)
}

static SKIPPED_TESTS: &[&str; 3] = &[
// Kernel does not support column mapping yet
"column_mapping",
static SKIPPED_TESTS: &[&str; 2] = &[
// iceberg compat requires column mapping
"iceberg_compat_v1",
// For multi_partitioned_2: The golden table stores the timestamp as an INT96 (which is
Expand All @@ -74,6 +72,34 @@ static SKIPPED_TESTS: &[&str; 3] = &[
"multi_partitioned_2",
];

// Ensure that two schemas have the same field names, data types, nullability, and
// dict_id/ordering. We deliberately ignore the schema metadata because it diverges
// between the real data and the golden table data.
fn assert_schema_fields_match(schema: &Schema, golden: &Schema) {
    // `zip` stops at the shorter of the two iterators, so a schema with extra or
    // missing fields would otherwise pass silently — check the lengths explicitly.
    assert!(
        schema.fields.len() == golden.fields.len(),
        "Schemas have different numbers of fields"
    );
    for (schema_field, golden_field) in schema.fields.iter().zip(golden.fields.iter()) {
        assert!(
            schema_field.name() == golden_field.name(),
            "Field names don't match"
        );
        assert!(
            schema_field.data_type() == golden_field.data_type(),
            "Field data types don't match"
        );
        assert!(
            schema_field.is_nullable() == golden_field.is_nullable(),
            "Field nullability doesn't match"
        );
        assert!(
            schema_field.dict_id() == golden_field.dict_id(),
            "Field dict_id doesn't match"
        );
        assert!(
            schema_field.dict_is_ordered() == golden_field.dict_is_ordered(),
            "Field dict_is_ordered doesn't match"
        );
    }
}

pub async fn assert_scan_data(engine: Arc<dyn Engine>, test_case: &TestCaseInfo) -> TestResult<()> {
let root_dir = test_case.root_dir();
for skipped in SKIPPED_TESTS {
Expand All @@ -86,7 +112,7 @@ pub async fn assert_scan_data(engine: Arc<dyn Engine>, test_case: &TestCaseInfo)
let table_root = test_case.table_root()?;
let table = Table::new(table_root);
let snapshot = table.snapshot(engine, None)?;
let scan = ScanBuilder::new(snapshot).build();
let scan = ScanBuilder::new(snapshot).build()?;
let mut schema = None;
roeap marked this conversation as resolved.
Show resolved Hide resolved
let batches: Vec<RecordBatch> = scan
.execute(engine)?
Expand All @@ -110,25 +136,16 @@ pub async fn assert_scan_data(engine: Arc<dyn Engine>, test_case: &TestCaseInfo)
.collect();
let all_data = concat_batches(&schema.unwrap(), batches.iter()).map_err(Error::from)?;
let all_data = sort_record_batch(all_data)?;

let golden = read_golden(test_case.root_dir(), None)
.await?
.expect("Didn't find golden data");
let golden = sort_record_batch(golden)?;
let golden_schema = golden
.schema()
.as_ref()
.clone()
.with_metadata(HashMap::new());

assert!(
all_data.columns() == golden.columns(),
"Read data does not equal golden data"
);
assert!(
all_data.schema() == Arc::new(golden_schema),
"Schemas not equal"
);
assert_schema_fields_match(all_data.schema().as_ref(), golden.schema().as_ref());
assert!(
all_data.num_rows() == golden.num_rows(),
"Didn't have same number of rows"
Expand Down
2 changes: 2 additions & 0 deletions ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ pub enum KernelError {
JoinFailureError,
Utf8Error,
ParseIntError,
InvalidColumnMappingMode,
InvalidTableLocation,
}

Expand Down Expand Up @@ -263,6 +264,7 @@ impl From<Error> for KernelError {
Error::JoinFailure(_) => KernelError::JoinFailureError,
Error::Utf8Error(_) => KernelError::Utf8Error,
Error::ParseIntError(_) => KernelError::ParseIntError,
Error::InvalidColumnMappingMode(_) => KernelError::InvalidColumnMappingMode,
Error::InvalidTableLocation(_) => KernelError::InvalidTableLocation,
Error::Backtraced {
source,
Expand Down
2 changes: 1 addition & 1 deletion ffi/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ unsafe fn scan_impl(
scan_builder = scan_builder.with_predicate(predicate);
}
}
Ok(Arc::new(scan_builder.build()).into())
Ok(Arc::new(scan_builder.build()?).into())
}

#[handle_descriptor(target=GlobalScanState, mutable=false, sized=true)]
Expand Down
2 changes: 1 addition & 1 deletion kernel/examples/dump-table/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ fn try_main() -> DeltaResult<()> {
.transpose()?;
let scan = ScanBuilder::new(snapshot)
.with_schema_opt(read_schema_opt)
.build();
.build()?;

let mut batches = vec![];
for res in scan.execute(engine.as_ref())?.into_iter() {
Expand Down
2 changes: 1 addition & 1 deletion kernel/examples/inspect-table/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ fn main() {
}
Commands::Adds => {
use delta_kernel::Add;
let scan = ScanBuilder::new(snapshot).build();
let scan = ScanBuilder::new(snapshot).build().unwrap();
let files: Vec<Add> = scan
.files(&engine)
.unwrap()
Expand Down
2 changes: 1 addition & 1 deletion kernel/examples/read-table-multi-threaded/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ fn try_main() -> DeltaResult<()> {
// build a scan with the specified schema
let scan = ScanBuilder::new(snapshot)
.with_schema_opt(read_schema_opt)
.build();
.build()?;

// this gives us an iterator of (our engine data, selection vector). our engine data is just
// arrow data. The schema can be obtained by calling
Expand Down
28 changes: 28 additions & 0 deletions kernel/src/column_mapping.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
//! Code to handle column mapping, including modes and schema transforms

use crate::{DeltaResult, Error};
use serde::{Deserialize, Serialize};

/// Modes of column mapping a table can be in
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub enum ColumnMappingMode {
None,
Id,
Name,
}

// Key to look up in `metadata.configuration` to determine the table's column mapping mode.
pub(crate) const COLUMN_MAPPING_MODE_KEY: &str = "delta.columnMapping.mode";

impl TryFrom<&str> for ColumnMappingMode {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
impl TryFrom<&str> for ColumnMappingMode {
impl TryFrom<T: AsRef<str>> for ColumnMappingMode {

and then below

    fn try_from(mode: &AsRef<str>) -> DeltaResult<Self> {
        match mode.as_ref() {

(allows callers to pass a variety of string-like things more easily)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, I wanted to do that, but you run into rust-lang/rust#50133

type Error = Error;

fn try_from(mode: &str) -> DeltaResult<Self> {
match mode {
"none" => Ok(ColumnMappingMode::None),
"id" => Ok(ColumnMappingMode::Id),
"name" => Ok(ColumnMappingMode::Name),
_ => Err(Error::invalid_column_mapping_mode(mode)),
}
}
}
7 changes: 7 additions & 0 deletions kernel/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ pub enum Error {
#[error("Could not parse int: {0}")]
ParseIntError(#[from] ParseIntError),

#[error("Invalid column mapping mode: {0}")]
InvalidColumnMappingMode(String),

/// Asked for a table at an invalid location
#[error("Invalid table location: {0}.")]
InvalidTableLocation(String),
Expand Down Expand Up @@ -173,6 +176,10 @@ impl Error {
Self::InvalidTableLocation(location.to_string())
}

pub fn invalid_column_mapping_mode(mode: impl ToString) -> Self {
Self::InvalidColumnMappingMode(mode.to_string())
}

// Capture a backtrace when the error is constructed.
#[must_use]
pub fn with_backtrace(self) -> Self {
Expand Down
1 change: 1 addition & 0 deletions kernel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ use url::Url;
use self::schema::{DataType, SchemaRef};

pub mod actions;
pub mod column_mapping;
pub mod engine_data;
pub mod error;
pub mod expressions;
Expand Down
Loading
Loading