Add support for "name" column mapping #205
Conversation
kernel/src/scan/mod.rs
Outdated
pub enum ColumnType<'a> {
    Selected(&'a StructField),
    Partition(&'a StructField),
/// Scan uses this to set up what kinds of columns it is scanning. For Selected and Partition, the
note i've included both ways we could do things here. `Selected` stores an actual copy of the field, which we have to `clone()` when we do `ScanBuilder::build()`, but that does simplify things slightly later when we need the field. `Partition` just stores the index, and then when needed we can index into the schema to get the actual field. This introduces the problem that things could get out of sync (although that would likely indicate bigger issues), but should be more efficient than cloning the whole field.
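As a sketch of that trade-off (with a toy `StructField` and a hypothetical `resolve` helper — not the PR's exact definitions), the two approaches look roughly like:

```rust
// Toy stand-in for kernel's StructField (illustrative only).
#[derive(Debug, Clone, PartialEq)]
pub struct StructField {
    pub name: String,
    pub nullable: bool,
}

pub enum ColumnType {
    // Owns a copy of the field: costs a clone() at build time,
    // but the field is directly at hand later.
    Selected(StructField),
    // Stores only an index: cheaper, but can drift out of sync
    // with the schema it indexes into.
    Partition(usize),
}

// Resolving a Partition column requires indexing back into the schema;
// an out-of-range index yields None (the "out of sync" failure mode).
pub fn resolve<'a>(schema: &'a [StructField], col: &'a ColumnType) -> Option<&'a StructField> {
    match col {
        ColumnType::Selected(f) => Some(f),
        ColumnType::Partition(i) => schema.get(*i),
    }
}
```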
looking good. left a few minor questions, but nothing major :)
kernel/src/column_mapping.rs
Outdated
pub(crate) fn get_name_mapped_physical_field(
    logical_field: &StructField,
) -> DeltaResult<(StructField, &str)> {
    match logical_field
        .metadata
        .get(ColumnMetadataKey::ColumnMappingPhysicalName.as_ref())
    {
        Some(MetadataValue::Number(_)) => Err(Error::generic(format!(
            "{} must be a string in name mapping mode",
            ColumnMetadataKey::ColumnMappingPhysicalName.as_ref()
        ))),
        Some(MetadataValue::String(name)) => Ok((
            StructField::new(
                name,
                logical_field.data_type().clone(),
                logical_field.is_nullable(),
            ),
            name,
        )),
        None => Err(Error::generic(format!(
            "fields MUST have a {} key in their metadata in name mapping mode",
            ColumnMetadataKey::ColumnMappingPhysicalName.as_ref()
        ))),
    }
}
In delta-rs we just extended `StructField` with `get_physical_name`. I don't have too strong a feeling about this, since it's only internal, but personally I find it a little more convenient to work with.
"more convenient" as in `field.get_physical_name()` rather than `get_name_mapped_physical_field(field)`? I would tend to agree with that.
Also, based on my experience debugging column mapping issues in delta-spark -- we should strongly consider making logical vs. physical a first-class concept everywhere in kernel. At any point where we have a field name (or schema), it should be immediately clear from the context whether that name/schema is logical or physical. When column mapping is disabled, the logical and physical names are the same, but almost all code shouldn't have to care about that corner case and should instead assume column mapping is enabled.
Otherwise, we risk trying to map a physical name, or failing to map a logical name.
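One hypothetical way to make that distinction first-class is a pair of newtype wrappers, so a logical name can't silently be passed where a physical one is expected (all names below are illustrative, not kernel's actual API):

```rust
// Hypothetical sketch: the type system tracks whether a name is logical or
// physical, so mapping a physical name twice becomes a compile error rather
// than a debugging session.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogicalName(pub String);

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PhysicalName(pub String);

impl LogicalName {
    // With column mapping disabled, the physical name equals the logical name;
    // callers still go through a conversion, so the corner case stays isolated.
    pub fn to_physical_unmapped(&self) -> PhysicalName {
        PhysicalName(self.0.clone())
    }
}
```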
Basically, the column mapping function would take the current column mapping mode as input, and then would use that to require the appropriate metadata is available:
impl StructField {
    /// Returns the physical name of this field, based on the column mapping mode in effect.
    fn physical_name(&self, mode: ColumnMappingMode) -> DeltaResult<&str> {
        let name = self.metadata.get(ColumnMetadataKey::ColumnMappingPhysicalName.as_ref());
        match (mode, name) {
            (ColumnMappingMode::None, None) => Ok(self.name.as_str()),
            (ColumnMappingMode::Name, Some(MetadataValue::String(name))) => Ok(name.as_str()),
            _ => Err(...),
        }
    }
}
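A self-contained, runnable version of that sketch, with toy stand-ins for `StructField`, `MetadataValue`, and the metadata key (all assumptions, not kernel's real definitions):

```rust
use std::collections::HashMap;

// Toy stand-ins for kernel types; names and the metadata key are assumptions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ColumnMappingMode {
    None,
    Name,
}

#[derive(Debug, Clone)]
enum MetadataValue {
    String(String),
    Number(i64),
}

const PHYSICAL_NAME_KEY: &str = "delta.columnMapping.physicalName";

struct StructField {
    name: String,
    metadata: HashMap<String, MetadataValue>,
}

impl StructField {
    // Returns the physical name, requiring metadata consistent with the mode:
    // no mapping metadata when mapping is off, a string entry when it's on.
    fn physical_name(&self, mode: ColumnMappingMode) -> Result<&str, String> {
        let name = self.metadata.get(PHYSICAL_NAME_KEY);
        match (mode, name) {
            (ColumnMappingMode::None, None) => Ok(self.name.as_str()),
            (ColumnMappingMode::Name, Some(MetadataValue::String(name))) => Ok(name.as_str()),
            _ => Err(format!("invalid metadata for column mapping mode {mode:?}")),
        }
    }
}
```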
Yep, I've done mostly that.
Re the logical/physical naming comment, makes a lot of sense, and I've converted a lot of stuff to be explicit here. I'll take another pass to see what else could be renamed though.
// key to look in metadata.configuration to get the column mapping mode
pub(crate) const COLUMN_MAPPING_MODE_KEY: &str = "delta.columnMapping.mode";

impl TryFrom<&str> for ColumnMappingMode {
Suggested change:

-impl TryFrom<&str> for ColumnMappingMode {
+impl<T: AsRef<str>> TryFrom<T> for ColumnMappingMode {

and then below

fn try_from(mode: T) -> DeltaResult<Self> {
    match mode.as_ref() {

(allows callers to pass a variety of string-like things more easily)
Yep, I wanted to do that, but you run into rust-lang/rust#50133
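For context, rust-lang/rust#50133 is the coherence issue where a generic `impl<T: AsRef<str>> TryFrom<T> for ColumnMappingMode` overlaps with core's blanket `impl<T, U: Into<T>> TryFrom<U> for T`. A common workaround is an inherent constructor taking `impl AsRef<str>` instead of a `TryFrom` impl; sketched here with an illustrative `try_new` (not the PR's actual code):

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ColumnMappingMode {
    None,
    Name,
    Id,
}

impl ColumnMappingMode {
    // Inherent method instead of TryFrom: accepts &str, String, Cow<str>, etc.
    // without colliding with core's blanket TryFrom impl.
    fn try_new(mode: impl AsRef<str>) -> Result<Self, String> {
        match mode.as_ref() {
            "none" => Ok(Self::None),
            "name" => Ok(Self::Name),
            "id" => Ok(Self::Id),
            other => Err(format!("unknown column mapping mode: {other}")),
        }
    }
}
```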
Co-authored-by: Ryan Johnson <scovich@users.noreply.github.com>
👍
// A column, selected from the data, as is
Selected(String),
// A partition column that needs to be added back in
Partition(usize),
Now that column mapping is no longer exposed here, can `Partition` be a reference again? Then there's nothing to get out of sync any more, and we can revert the corresponding error checking.
(related to #205 (comment))
/// to materialize the partition column.
pub enum ColumnType {
    // A column, selected from the data, as is
    Selected(String),
If I'm not mistaken, this could be `&'a str`, since it's either a reference to the field's name or to an entry from the field's metadata map? That way, we could avoid creating a `String` unless partitions and/or CMM are actually involved.
I still keep a `Vec<ColumnType>` in `Scan` (the `all_fields` field). This is to avoid having to recompute everything each time.
If we make this a reference with a lifetime, then `Scan` has to have a lifetime, and then everything gets... messy.
I'd actually argue for going the "other way" if we want to remove string allocation, and just have this also store an index into the schema.
Ah, I forgot that you hoisted `all_fields` to be a field computed previously, where it used to be converted internally...
However -- I don't think we can "just" store a schema index for the column name, when it could be name-mapped or not?
kernel/src/schema.rs
Outdated
        }
        MetadataValue::String(name) => Ok(name),
    }
    (ColumnMappingMode::Id, _) => Err(Error::generic("Don't support id column mapping yet")),
We should think about how to eventually support id mapping mode, because it's likely to be more invasive than name mapping, and name mapping is expressible in terms of id mapping. Maybe we can get away with a single implementation?
Actually, we can't get away with a single implementation because (a) field ids are really a problem for the parquet reader to deal with; and (b) existing tables converted to column mapping mode have parquet files whose schema lacks field ids (which is one reason why the parquet reader has to handle field ids).
But by the same token... if the parquet reader is handling field ids, then maybe id mapping mode isn't so invasive after all. Could kernel "implement" id mapping mode by just verifying, at snapshot creation time, that the table schema contains field ids iff the mode is enabled, so the parquet reader doesn't get confused?
Yeah. My feeling was that id mapping will require some changes to either the API, or at least the API contract, that we expose for parquet reading.
Regardless, I've heard name mapping is more common and important to support, so merging this and then looking at adding id mapping seems to make sense to me. Hopefully it won't require big changes to any of this code, and will mostly just mean adding new logic in the places it matters.
    .schema
    .unwrap_or_else(|| self.snapshot.schema().clone().into());
Scan {
let (all_fields, read_fields, have_partition_cols) = get_state_info(
aside: This takes a lot of `self.snapshot` args... should it be `Snapshot::get_scan_state_info` or similar?
maybe? is 2 a lot? :)
`Snapshot::get_scan_state_info` would need to take a logical schema, so I'm not clear it's a win
Yeah, you're probably right. I was just thinking out loud
logical_schema: self.logical_schema.as_ref().clone(),
read_schema: self.physical_schema.as_ref().clone(),
aside: Is there a particular reason `GlobalScanState` takes `Schema` instead of `SchemaRef`? Why force a clone like this?
Yeah, the idea is that `GlobalScanState` should be (de)serializable so it would be easy for multi-node systems to send it between nodes. `Schema` is `Serialize`-able and `SchemaRef` isn't, so this keeps the code smaller.
We could add a custom `De/Serialize` for it and just go into the `Arc` and serialize the inner value, but it wasn't clear to me that the complexity was worth it (yet...)
Serialization is an engine design problem, not a kernel problem. At most kernel might provide a default serializer mechanism -- but even then I don't know what we'd provide, given the wide variety of ways engines handle serialization? And anyway this is useless for FFI because we don't even expose our own schema type to extern engines.
Meanwhile, large schemas (10k+ columns) could be pretty expensive to clone?
(we want to be friendly to distributed execution, but making single-thread clients pay extra overhead for a potential future use case seems cart-before-horse?)
We would need to provide a `de/serialize_global_state` ffi call, yes. If we want to let engines handle serialization entirely by themselves, we'll need more complex APIs to allow reconstruction of state that the kernel understands. Possible, but something we'll need to design.
Ideally we'll sort most of this out with the "single expression" mode of fixup.
I've added #216 to at least get the clones gone, and then we can iterate further.
This only "just works" if you're using `Scan::execute` or `Scan::get_scan_data` in conjunction with `transform_to_logical`. It does also add enough information into the `GlobalScanState` for an engine to remap things on its own.

Additionally, this does some work to optimize how we set up and execute scans. In particular, it calls the expensive `get_state_info` only when building the scan, or when calling `transform_to_logical` (which is assumed to be potentially running on another node/thread from the scan). This required two main changes:

- `ScanBuilder::build()` is now fallible, and does more work
- The `ColumnType` enum can't have a lifetime on it anymore, as we want to store it in the scan and it would then reference data in the scan, which is just messy in Rust

Tested by running the `column_mapping` test from `dat`.