fix: add data_type and nullable to StructField hash (#2045) (#2190)
# Description
Correct hashing of StructField and add tests.
- Current: two `StructField`s with the same name but a different `data_type` or `nullable` hash to the same value.
- To be: `name`, `data_type`, and `nullable` must all match for two fields to hash equivalently (see the sketch below).
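
As an illustration of the behaviour being fixed, the sketch below hashes a simplified stand-in struct (not the real kernel `StructField`; the names here are illustrative only) first with the old name-only scheme and then with the corrected scheme:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Simplified stand-in for the kernel StructField, for illustration only.
struct Field {
    name: String,
    data_type: String, // stands in for the real DataType enum
    nullable: bool,
}

// Old behaviour: only the name feeds the hasher.
fn hash_name_only(f: &Field) -> u64 {
    let mut h = DefaultHasher::new();
    f.name.hash(&mut h);
    h.finish()
}

// Corrected behaviour: name, data_type and nullable all feed the hasher.
fn hash_all(f: &Field) -> u64 {
    let mut h = DefaultHasher::new();
    f.name.hash(&mut h);
    f.data_type.hash(&mut h);
    f.nullable.hash(&mut h);
    h.finish()
}

fn main() {
    let a = Field { name: "id".into(), data_type: "integer".into(), nullable: true };
    let b = Field { name: "id".into(), data_type: "string".into(), nullable: false };

    // Same name, different type and nullability: the name-only hash collides...
    assert_eq!(hash_name_only(&a), hash_name_only(&b));
    // ...while the corrected hash tells the two fields apart.
    assert_ne!(hash_all(&a), hash_all(&b));
}
```

The unit tests added to `schema.rs` in this commit exercise the same cases against the real `StructField`.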

# Related Issue(s)
closes #2045

---------

Co-authored-by: R. Tyler Croy <rtyler@brokenco.de>
Co-authored-by: sonhmai <>
sonhmai and rtyler authored Feb 24, 2024
1 parent 77ddd7c commit bcf124c
Showing 2 changed files with 78 additions and 7 deletions.
74 changes: 69 additions & 5 deletions crates/core/src/kernel/models/schema.rs
@@ -134,6 +134,8 @@ pub struct StructField
 impl Hash for StructField {
     fn hash<H: Hasher>(&self, state: &mut H) {
         self.name.hash(state);
+        self.data_type.hash(state);
+        self.nullable.hash(state);
     }
 }

@@ -215,7 +217,7 @@ impl StructField

 /// A struct is used to represent both the top-level schema of the table
 /// as well as struct columns that contain nested columns.
-#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq, Hash)]
 pub struct StructType {
     #[serde(rename = "type")]
     /// The type of this struct
@@ -379,7 +381,7 @@ impl<'a> IntoIterator for &'a StructType
     }
 }

-#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq, Hash)]
 #[serde(rename_all = "camelCase")]
 /// An array stores a variable length collection of items of some type.
 pub struct ArrayType {
@@ -415,7 +417,7 @@ impl ArrayType
     }
 }

-#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq, Hash)]
 #[serde(rename_all = "camelCase")]
 /// A map stores an arbitrary length collection of key-value pairs
 pub struct MapType {
@@ -465,7 +467,7 @@ fn default_true() -> bool
     true
 }

-#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq, Hash)]
 #[serde(rename_all = "camelCase")]
 /// Primitive types supported by Delta
 pub enum PrimitiveType {
@@ -559,7 +561,7 @@ impl Display for PrimitiveType
     }
 }

-#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq, Hash)]
 #[serde(untagged, rename_all = "camelCase")]
 /// Top level delta tdatatypes
 pub enum DataType {
@@ -641,6 +643,7 @@ mod tests {
     use super::*;
     use serde_json;
     use serde_json::json;
+    use std::hash::DefaultHasher;

     #[test]
     fn test_serde_data_types() {
@@ -866,4 +869,65 @@ mod tests {
         let buf = r#"{"type":"struct","fields":[{"name":"ID_D_DATE","type":"long","nullable":true,"metadata":{"delta.identity.start":1,"delta.identity.step":1,"delta.identity.allowExplicitInsert":false}},{"name":"TXT_DateKey","type":"string","nullable":true,"metadata":{}}]}"#;
         let _schema: StructType = serde_json::from_str(buf).expect("Failed to load");
     }
+
+    fn get_hash(field: &StructField) -> u64 {
+        let mut hasher = DefaultHasher::new();
+        field.hash(&mut hasher);
+        hasher.finish()
+    }
+
+    #[test]
+    fn test_hash_struct_field() {
+        // different names should result in different hashes
+        let field_1 = StructField::new(
+            "field_name_1",
+            DataType::Primitive(PrimitiveType::Decimal(4, 4)),
+            true,
+        );
+        let field_2 = StructField::new(
+            "field_name_2",
+            DataType::Primitive(PrimitiveType::Decimal(4, 4)),
+            true,
+        );
+        assert_ne!(get_hash(&field_1), get_hash(&field_2));
+
+        // different types should result in different hashes
+        let field_int = StructField::new(
+            "field_name",
+            DataType::Primitive(PrimitiveType::Integer),
+            true,
+        );
+        let field_string = StructField::new(
+            "field_name",
+            DataType::Primitive(PrimitiveType::String),
+            true,
+        );
+        assert_ne!(get_hash(&field_int), get_hash(&field_string));
+
+        // different nullability should result in different hashes
+        let field_true = StructField::new(
+            "field_name",
+            DataType::Primitive(PrimitiveType::Binary),
+            true,
+        );
+        let field_false = StructField::new(
+            "field_name",
+            DataType::Primitive(PrimitiveType::Binary),
+            false,
+        );
+        assert_ne!(get_hash(&field_true), get_hash(&field_false));
+
+        // case where hashes are the same
+        let field_1 = StructField::new(
+            "field_name",
+            DataType::Primitive(PrimitiveType::Timestamp),
+            true,
+        );
+        let field_2 = StructField::new(
+            "field_name",
+            DataType::Primitive(PrimitiveType::Timestamp),
+            true,
+        );
+        assert_eq!(get_hash(&field_1), get_hash(&field_2));
+    }
 }
11 changes: 9 additions & 2 deletions crates/core/src/operations/convert_to_delta.rs
@@ -270,6 +270,13 @@ impl ConvertToDeltaBuilder {
         // Iterate over the parquet files. Parse partition columns, generate add actions and collect parquet file schemas
         let mut arrow_schemas = Vec::new();
         let mut actions = Vec::new();
+        // partition columns that were defined by caller and are expected to apply on this table
+        let mut expected_partitions: HashMap<String, StructField> = self
+            .partition_schema
+            .clone()
+            .into_iter()
+            .map(|field| (field.name.clone(), field))
+            .collect();
         // A HashSet of all unique partition columns in a Parquet table
         let mut partition_columns = HashSet::new();
         // A vector of StructField of all unique partition columns in a Parquet table
@@ -290,7 +297,7 @@ impl ConvertToDeltaBuilder {
                 .ok_or(Error::MissingPartitionSchema)?;

             if partition_columns.insert(key.to_string()) {
-                if let Some(schema) = self.partition_schema.take(key) {
+                if let Some(schema) = expected_partitions.remove(key) {
                     partition_schema_fields.insert(key.to_string(), schema);
                 } else {
                     // Return an error if the schema of a partition column is not provided by user
@@ -360,7 +367,7 @@ impl ConvertToDeltaBuilder {
             arrow_schemas.push(arrow_schema);
         }

-        if !self.partition_schema.is_empty() {
+        if !expected_partitions.is_empty() {
             // Partition column provided by the user does not exist in the parquet files
             return Err(Error::PartitionColumnNotExist(self.partition_schema));
         }
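
The `convert_to_delta.rs` hunks above replace a name-keyed lookup into `self.partition_schema` with an explicit `HashMap<String, StructField>` that is consumed as partition columns are discovered in the Parquet paths; whatever remains afterwards is reported as a partition column that never appeared. A minimal sketch of that consume-and-report pattern, using hypothetical names rather than the builder's real fields:

```rust
use std::collections::HashMap;

// Hypothetical stand-in for a caller-supplied partition field.
#[derive(Debug, Clone)]
struct PartitionField {
    name: String,
}

// Match expected partition fields against the column names actually discovered;
// return the leftovers, i.e. expected columns that were never seen.
fn unmatched_partitions(expected: &[PartitionField], discovered: &[&str]) -> Vec<PartitionField> {
    // Checklist keyed by name, consumed as columns are matched.
    let mut remaining: HashMap<String, PartitionField> = expected
        .iter()
        .cloned()
        .map(|f| (f.name.clone(), f))
        .collect();

    for col in discovered {
        // Matched columns are crossed off the checklist.
        remaining.remove(*col);
    }

    // Whatever is left was expected but never found.
    remaining.into_values().collect()
}

fn main() {
    let expected = vec![
        PartitionField { name: "year".into() },
        PartitionField { name: "region".into() },
    ];
    let leftovers = unmatched_partitions(&expected, &["year", "month"]);
    assert_eq!(leftovers.len(), 1);
    assert_eq!(leftovers[0].name, "region");
}
```

Keying by plain `String` names keeps these lookups independent of `StructField`'s own `Hash` implementation.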
