Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add data_type and nullable to StructField hash (#2045) #2190

Merged
merged 6 commits into from
Feb 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 69 additions & 5 deletions crates/core/src/kernel/models/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ pub struct StructField {
impl Hash for StructField {
fn hash<H: Hasher>(&self, state: &mut H) {
self.name.hash(state);
self.data_type.hash(state);
self.nullable.hash(state);
}
}

Expand Down Expand Up @@ -215,7 +217,7 @@ impl StructField {

/// A struct is used to represent both the top-level schema of the table
/// as well as struct columns that contain nested columns.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq, Hash)]
pub struct StructType {
#[serde(rename = "type")]
/// The type of this struct
Expand Down Expand Up @@ -379,7 +381,7 @@ impl<'a> IntoIterator for &'a StructType {
}
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq, Hash)]
#[serde(rename_all = "camelCase")]
/// An array stores a variable length collection of items of some type.
pub struct ArrayType {
Expand Down Expand Up @@ -415,7 +417,7 @@ impl ArrayType {
}
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq, Hash)]
#[serde(rename_all = "camelCase")]
/// A map stores an arbitrary length collection of key-value pairs
pub struct MapType {
Expand Down Expand Up @@ -465,7 +467,7 @@ fn default_true() -> bool {
true
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq, Hash)]
#[serde(rename_all = "camelCase")]
/// Primitive types supported by Delta
pub enum PrimitiveType {
Expand Down Expand Up @@ -559,7 +561,7 @@ impl Display for PrimitiveType {
}
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq, Hash)]
#[serde(untagged, rename_all = "camelCase")]
/// Top level delta tdatatypes
pub enum DataType {
Expand Down Expand Up @@ -641,6 +643,7 @@ mod tests {
use super::*;
use serde_json;
use serde_json::json;
use std::hash::DefaultHasher;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

incase anyone else runs into this - this raises the MSRV of running the tests to 1.76.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for pointing this out!


#[test]
fn test_serde_data_types() {
Expand Down Expand Up @@ -866,4 +869,65 @@ mod tests {
let buf = r#"{"type":"struct","fields":[{"name":"ID_D_DATE","type":"long","nullable":true,"metadata":{"delta.identity.start":1,"delta.identity.step":1,"delta.identity.allowExplicitInsert":false}},{"name":"TXT_DateKey","type":"string","nullable":true,"metadata":{}}]}"#;
let _schema: StructType = serde_json::from_str(buf).expect("Failed to load");
}

fn get_hash(field: &StructField) -> u64 {
let mut hasher = DefaultHasher::new();
field.hash(&mut hasher);
hasher.finish()
}

#[test]
fn test_hash_struct_field() {
// different names should result in different hashes
let field_1 = StructField::new(
"field_name_1",
DataType::Primitive(PrimitiveType::Decimal(4, 4)),
true,
);
let field_2 = StructField::new(
"field_name_2",
DataType::Primitive(PrimitiveType::Decimal(4, 4)),
true,
);
assert_ne!(get_hash(&field_1), get_hash(&field_2));

// different types should result in different hashes
let field_int = StructField::new(
"field_name",
DataType::Primitive(PrimitiveType::Integer),
true,
);
let field_string = StructField::new(
"field_name",
DataType::Primitive(PrimitiveType::String),
true,
);
assert_ne!(get_hash(&field_int), get_hash(&field_string));

// different nullability should result in different hashes
let field_true = StructField::new(
"field_name",
DataType::Primitive(PrimitiveType::Binary),
true,
);
let field_false = StructField::new(
"field_name",
DataType::Primitive(PrimitiveType::Binary),
false,
);
assert_ne!(get_hash(&field_true), get_hash(&field_false));

// case where hashes are the same
let field_1 = StructField::new(
"field_name",
DataType::Primitive(PrimitiveType::Timestamp),
true,
);
let field_2 = StructField::new(
"field_name",
DataType::Primitive(PrimitiveType::Timestamp),
true,
);
assert_eq!(get_hash(&field_1), get_hash(&field_2));
}
}
11 changes: 9 additions & 2 deletions crates/core/src/operations/convert_to_delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,13 @@ impl ConvertToDeltaBuilder {
// Iterate over the parquet files. Parse partition columns, generate add actions and collect parquet file schemas
let mut arrow_schemas = Vec::new();
let mut actions = Vec::new();
// partition columns that were defined by caller and are expected to apply on this table
let mut expected_partitions: HashMap<String, StructField> = self
.partition_schema
.clone()
.into_iter()
.map(|field| (field.name.clone(), field))
.collect();
// A HashSet of all unique partition columns in a Parquet table
let mut partition_columns = HashSet::new();
// A vector of StructField of all unique partition columns in a Parquet table
Expand All @@ -290,7 +297,7 @@ impl ConvertToDeltaBuilder {
.ok_or(Error::MissingPartitionSchema)?;

if partition_columns.insert(key.to_string()) {
if let Some(schema) = self.partition_schema.take(key) {
if let Some(schema) = expected_partitions.remove(key) {
partition_schema_fields.insert(key.to_string(), schema);
} else {
// Return an error if the schema of a partition column is not provided by user
Expand Down Expand Up @@ -360,7 +367,7 @@ impl ConvertToDeltaBuilder {
arrow_schemas.push(arrow_schema);
}

if !self.partition_schema.is_empty() {
if !expected_partitions.is_empty() {
// Partition column provided by the user does not exist in the parquet files
return Err(Error::PartitionColumnNotExist(self.partition_schema));
}
Expand Down
Loading