diff --git a/Cargo.toml b/Cargo.toml index 70eadf76d..d2a773e36 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,7 +36,7 @@ rust-version = "1.77.1" [workspace.dependencies] anyhow = "1.0.72" -apache-avro = "0.16" +apache-avro = "0.17" array-init = "2" arrow-arith = { version = "52" } arrow-array = { version = "52" } diff --git a/crates/iceberg/src/avro/schema.rs b/crates/iceberg/src/avro/schema.rs index 653f52aec..7f8142745 100644 --- a/crates/iceberg/src/avro/schema.rs +++ b/crates/iceberg/src/avro/schema.rs @@ -19,22 +19,26 @@ use std::collections::BTreeMap; use apache_avro::schema::{ - DecimalSchema, FixedSchema, Name, RecordField as AvroRecordField, RecordFieldOrder, - RecordSchema, UnionSchema, + ArraySchema, DecimalSchema, FixedSchema, MapSchema, Name, RecordField as AvroRecordField, + RecordFieldOrder, RecordSchema, UnionSchema, }; use apache_avro::Schema as AvroSchema; use itertools::{Either, Itertools}; use serde_json::{Number, Value}; use crate::spec::{ - visit_schema, ListType, MapType, NestedFieldRef, PrimitiveType, Schema, SchemaVisitor, - StructType, + visit_schema, ListType, MapType, NestedField, NestedFieldRef, PrimitiveType, Schema, + SchemaVisitor, StructType, Type, }; -use crate::{Error, ErrorKind, Result}; +use crate::{ensure_data_valid, Error, ErrorKind, Result}; +const ELEMENT_ID: &str = "element-id"; const FILED_ID_PROP: &str = "field-id"; +const KEY_ID: &str = "key-id"; +const VALUE_ID: &str = "value-id"; const UUID_BYTES: usize = 16; const UUID_LOGICAL_TYPE: &str = "uuid"; +const MAP_LOGICAL_TYPE: &str = "map"; // # TODO: https://github.com/apache/iceberg-rust/issues/86 // This const may better to maintain in avro-rs. const LOGICAL_TYPE: &str = "logicalType"; @@ -124,8 +128,13 @@ impl SchemaVisitor for SchemaToAvroSchema { field_schema = avro_optional(field_schema)?; } - // TODO: We need to add element id prop here, but rust's avro schema doesn't support property except record schema. - Ok(Either::Left(AvroSchema::Array(Box::new(field_schema)))) + Ok(Either::Left(AvroSchema::Array(ArraySchema { + items: Box::new(field_schema), + attributes: BTreeMap::from([( + ELEMENT_ID.to_string(), + Value::Number(Number::from(list.element_field.id)), + )]), + }))) } fn map( @@ -141,7 +150,19 @@ impl SchemaVisitor for SchemaToAvroSchema { } if matches!(key_field_schema, AvroSchema::String) { - Ok(Either::Left(AvroSchema::Map(Box::new(value_field_schema)))) + Ok(Either::Left(AvroSchema::Map(MapSchema { + types: Box::new(value_field_schema), + attributes: BTreeMap::from([ + ( + KEY_ID.to_string(), + Value::Number(Number::from(map.key_field.id)), + ), + ( + VALUE_ID.to_string(), + Value::Number(Number::from(map.value_field.id)), + ), + ]), + }))) } else { // Avro map requires that key must be string type. Here we convert it to array if key is // not string type. @@ -187,7 +208,13 @@ impl SchemaVisitor for SchemaToAvroSchema { fields, )?; - Ok(Either::Left(AvroSchema::Array(item_avro_schema.into()))) + Ok(Either::Left(AvroSchema::Array(ArraySchema { + items: Box::new(item_avro_schema), + attributes: BTreeMap::from([( + LOGICAL_TYPE.to_string(), + Value::String(MAP_LOGICAL_TYPE.to_string()), + )]), + }))) } } @@ -255,6 +282,7 @@ pub(crate) fn avro_fixed_schema(len: usize, logical_type: Option<&str>) -> Resul doc: None, size: len, attributes, + default: None, })) } @@ -274,6 +302,7 @@ pub(crate) fn avro_decimal_schema(precision: usize, scale: usize) -> Result Result { ])?)) } -#[cfg(test)] -mod tests { - use std::fs::read_to_string; - - use apache_avro::schema::{Namespace, UnionSchema}; - use apache_avro::Schema as AvroSchema; - - use super::*; - use crate::ensure_data_valid; - use crate::spec::{ListType, MapType, NestedField, PrimitiveType, Schema, StructType, Type}; - - fn is_avro_optional(avro_schema: &AvroSchema) -> bool { - match avro_schema { - AvroSchema::Union(union) => union.is_nullable(), - _ => false, - } +fn is_avro_optional(avro_schema: &AvroSchema) -> bool { + match avro_schema { + AvroSchema::Union(union) => union.is_nullable(), + _ => false, } +} - /// Post order avro schema visitor. - pub(crate) trait AvroSchemaVisitor { - type T; +/// Post order avro schema visitor. +pub(crate) trait AvroSchemaVisitor { + type T; - fn record(&mut self, record: &RecordSchema, fields: Vec) -> Result; + fn record(&mut self, record: &RecordSchema, fields: Vec) -> Result; - fn union(&mut self, union: &UnionSchema, options: Vec) -> Result; + fn union(&mut self, union: &UnionSchema, options: Vec) -> Result; - fn array(&mut self, array: &AvroSchema, item: Self::T) -> Result; - fn map(&mut self, map: &AvroSchema, value: Self::T) -> Result; + fn array(&mut self, array: &ArraySchema, item: Self::T) -> Result; + fn map(&mut self, map: &MapSchema, value: Self::T) -> Result; + // There are two representation for iceberg map in avro: array of key-value records, or map when keys are strings (optional), + // ref: https://iceberg.apache.org/spec/#avro + fn map_array(&mut self, array: &RecordSchema, key: Self::T, value: Self::T) -> Result; - fn primitive(&mut self, schema: &AvroSchema) -> Result; - } + fn primitive(&mut self, schema: &AvroSchema) -> Result; +} - struct AvroSchemaToSchema { - next_id: i32, +/// Visit avro schema in post order visitor. +pub(crate) fn visit(schema: &AvroSchema, visitor: &mut V) -> Result { + match schema { + AvroSchema::Record(record) => { + let field_results = record + .fields + .iter() + .map(|f| visit(&f.schema, visitor)) + .collect::>>()?; + + visitor.record(record, field_results) + } + AvroSchema::Union(union) => { + let option_results = union + .variants() + .iter() + .map(|f| visit(f, visitor)) + .collect::>>()?; + + visitor.union(union, option_results) + } + AvroSchema::Array(item) => { + if let Some(logical_type) = item + .attributes + .get(LOGICAL_TYPE) + .and_then(|v| Value::as_str(v)) + { + if logical_type == MAP_LOGICAL_TYPE { + if let AvroSchema::Record(record_schema) = &*item.items { + let key = visit(&record_schema.fields[0].schema, visitor)?; + let value = visit(&record_schema.fields[1].schema, visitor)?; + return visitor.map_array(record_schema, key, value); + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "Can't convert avro map schema, item is not a record.", + )); + } + } else { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + format!( + "Logical type {logical_type} is not support in iceberg array type.", + ), + )); + } + } + let item_result = visit(&item.items, visitor)?; + visitor.array(item, item_result) + } + AvroSchema::Map(inner) => { + let item_result = visit(&inner.types, visitor)?; + visitor.map(inner, item_result) + } + schema => visitor.primitive(schema), } +} - impl AvroSchemaToSchema { - fn next_field_id(&mut self) -> i32 { - self.next_id += 1; - self.next_id - } +struct AvroSchemaToSchema; + +impl AvroSchemaToSchema { + /// A convenient way to get element id(i32) from attributes. + #[inline] + fn get_element_id_from_attributes( + attributes: &BTreeMap, + name: &str, + ) -> Result { + attributes + .get(name) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "Can't convert avro array schema, missing element id.", + ) + })? + .as_i64() + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "Can't convert avro array schema, element id is not a valid i64 number.", + ) + })? + .try_into() + .map_err(|_| { + Error::new( + ErrorKind::DataInvalid, + "Can't convert avro array schema, element id is not a valid i32.", + ) + }) } +} - impl AvroSchemaVisitor for AvroSchemaToSchema { - // Only `AvroSchema::Null` will return `None` - type T = Option; - - fn record( - &mut self, - record: &RecordSchema, - field_types: Vec>, - ) -> Result> { - let mut fields = Vec::with_capacity(field_types.len()); - for (avro_field, typ) in record.fields.iter().zip_eq(field_types) { - let field_id = avro_field - .custom_attributes - .get(FILED_ID_PROP) - .and_then(Value::as_i64) - .ok_or_else(|| { - Error::new( - ErrorKind::DataInvalid, - format!("Can't convert field, missing field id: {avro_field:?}"), - ) - })?; +impl AvroSchemaVisitor for AvroSchemaToSchema { + // Only `AvroSchema::Null` will return `None` + type T = Option; - let optional = is_avro_optional(&avro_field.schema); + fn record( + &mut self, + record: &RecordSchema, + field_types: Vec>, + ) -> Result> { + let mut fields = Vec::with_capacity(field_types.len()); + for (avro_field, typ) in record.fields.iter().zip_eq(field_types) { + let field_id = + Self::get_element_id_from_attributes(&avro_field.custom_attributes, FILED_ID_PROP)?; - let mut field = if optional { - NestedField::optional(field_id as i32, &avro_field.name, typ.unwrap()) - } else { - NestedField::required(field_id as i32, &avro_field.name, typ.unwrap()) - }; + let optional = is_avro_optional(&avro_field.schema); - if let Some(doc) = &avro_field.doc { - field = field.with_doc(doc); - } + let mut field = NestedField::new(field_id, &avro_field.name, typ.unwrap(), !optional); - fields.push(field.into()); + if let Some(doc) = &avro_field.doc { + field = field.with_doc(doc); } - Ok(Some(Type::Struct(StructType::new(fields)))) + fields.push(field.into()); } - fn union( - &mut self, - union: &UnionSchema, - mut options: Vec>, - ) -> Result> { + Ok(Some(Type::Struct(StructType::new(fields)))) + } + + fn union( + &mut self, + union: &UnionSchema, + mut options: Vec>, + ) -> Result> { + ensure_data_valid!( + options.len() <= 2 && !options.is_empty(), + "Can't convert avro union type {:?} to iceberg.", + union + ); + + if options.len() > 1 { ensure_data_valid!( - options.len() <= 2 && !options.is_empty(), + options[0].is_none(), "Can't convert avro union type {:?} to iceberg.", union ); - - if options.len() > 1 { - ensure_data_valid!( - options[0].is_none(), - "Can't convert avro union type {:?} to iceberg.", - union - ); - } - - if options.len() == 1 { - Ok(Some(options.remove(0).unwrap())) - } else { - Ok(Some(options.remove(1).unwrap())) - } } - fn array(&mut self, array: &AvroSchema, item: Option) -> Result { - if let AvroSchema::Array(item_schema) = array { - let element_field = NestedField::list_element( - self.next_field_id(), - item.unwrap(), - !is_avro_optional(item_schema), - ) - .into(); - Ok(Some(Type::List(ListType { element_field }))) - } else { - Err(Error::new( - ErrorKind::Unexpected, - "Expected avro array schema, but {array}", - )) - } + if options.len() == 1 { + Ok(Some(options.remove(0).unwrap())) + } else { + Ok(Some(options.remove(1).unwrap())) } + } - fn map(&mut self, map: &AvroSchema, value: Option) -> Result> { - if let AvroSchema::Map(value_schema) = map { - // Due to avro rust implementation's limitation, we can't store attributes in map schema, - // we will fix it later when it has been resolved. - let key_field = NestedField::map_key_element( - self.next_field_id(), - Type::Primitive(PrimitiveType::String), - ); - let value_field = NestedField::map_value_element( - self.next_field_id(), - value.unwrap(), - !is_avro_optional(value_schema), - ); - Ok(Some(Type::Map(MapType { - key_field: key_field.into(), - value_field: value_field.into(), - }))) - } else { - Err(Error::new( - ErrorKind::Unexpected, - "Expected avro map schema, but {map}", - )) - } - } + fn array(&mut self, array: &ArraySchema, item: Option) -> Result { + let element_field_id = Self::get_element_id_from_attributes(&array.attributes, ELEMENT_ID)?; + let element_field = NestedField::list_element( + element_field_id, + item.unwrap(), + !is_avro_optional(&array.items), + ) + .into(); + Ok(Some(Type::List(ListType { element_field }))) + } - fn primitive(&mut self, schema: &AvroSchema) -> Result> { - let typ = match schema { - AvroSchema::Decimal(decimal) => { - Type::decimal(decimal.precision as u32, decimal.scale as u32)? - } - AvroSchema::Date => Type::Primitive(PrimitiveType::Date), - AvroSchema::TimeMicros => Type::Primitive(PrimitiveType::Time), - AvroSchema::TimestampMicros => Type::Primitive(PrimitiveType::Timestamp), - AvroSchema::Boolean => Type::Primitive(PrimitiveType::Boolean), - AvroSchema::Int => Type::Primitive(PrimitiveType::Int), - AvroSchema::Long => Type::Primitive(PrimitiveType::Long), - AvroSchema::Float => Type::Primitive(PrimitiveType::Float), - AvroSchema::Double => Type::Primitive(PrimitiveType::Double), - AvroSchema::String | AvroSchema::Enum(_) => Type::Primitive(PrimitiveType::String), - AvroSchema::Fixed(fixed) => { - if let Some(logical_type) = fixed.attributes.get(LOGICAL_TYPE) { - let logical_type = logical_type.as_str().ok_or_else(|| { - Error::new( - ErrorKind::DataInvalid, - "logicalType in attributes of avro schema is not a string type", - ) - })?; - match logical_type { - UUID_LOGICAL_TYPE => Type::Primitive(PrimitiveType::Uuid), - ty => { - return Err(Error::new( - ErrorKind::FeatureUnsupported, - format!( + fn map(&mut self, map: &MapSchema, value: Option) -> Result> { + let key_field_id = Self::get_element_id_from_attributes(&map.attributes, KEY_ID)?; + let key_field = + NestedField::map_key_element(key_field_id, Type::Primitive(PrimitiveType::String)); + let value_field_id = Self::get_element_id_from_attributes(&map.attributes, VALUE_ID)?; + let value_field = NestedField::map_value_element( + value_field_id, + value.unwrap(), + !is_avro_optional(&map.types), + ); + Ok(Some(Type::Map(MapType { + key_field: key_field.into(), + value_field: value_field.into(), + }))) + } + + fn primitive(&mut self, schema: &AvroSchema) -> Result> { + let typ = match schema { + AvroSchema::Decimal(decimal) => { + Type::decimal(decimal.precision as u32, decimal.scale as u32)? + } + AvroSchema::Date => Type::Primitive(PrimitiveType::Date), + AvroSchema::TimeMicros => Type::Primitive(PrimitiveType::Time), + AvroSchema::TimestampMicros => Type::Primitive(PrimitiveType::Timestamp), + AvroSchema::Boolean => Type::Primitive(PrimitiveType::Boolean), + AvroSchema::Int => Type::Primitive(PrimitiveType::Int), + AvroSchema::Long => Type::Primitive(PrimitiveType::Long), + AvroSchema::Float => Type::Primitive(PrimitiveType::Float), + AvroSchema::Double => Type::Primitive(PrimitiveType::Double), + AvroSchema::String | AvroSchema::Enum(_) => Type::Primitive(PrimitiveType::String), + AvroSchema::Fixed(fixed) => { + if let Some(logical_type) = fixed.attributes.get(LOGICAL_TYPE) { + let logical_type = logical_type.as_str().ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "logicalType in attributes of avro schema is not a string type", + ) + })?; + match logical_type { + UUID_LOGICAL_TYPE => Type::Primitive(PrimitiveType::Uuid), + ty => { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + format!( "Logical type {ty} is not support in iceberg primitive type.", ), - )) - } + )) } - } else { - Type::Primitive(PrimitiveType::Fixed(fixed.size as u64)) } + } else { + Type::Primitive(PrimitiveType::Fixed(fixed.size as u64)) } - AvroSchema::Bytes => Type::Primitive(PrimitiveType::Binary), - AvroSchema::Null => return Ok(None), - _ => { - return Err(Error::new( - ErrorKind::Unexpected, - "Unable to convert avro {schema} to iceberg primitive type.", - )) - } - }; - - Ok(Some(typ)) - } - } - - /// Visit avro schema in post order visitor. - pub(crate) fn visit( - schema: &AvroSchema, - visitor: &mut V, - ) -> Result { - match schema { - AvroSchema::Record(record) => { - let field_results = record - .fields - .iter() - .map(|f| visit(&f.schema, visitor)) - .collect::>>()?; - - visitor.record(record, field_results) - } - AvroSchema::Union(union) => { - let option_results = union - .variants() - .iter() - .map(|f| visit(f, visitor)) - .collect::>>()?; - - visitor.union(union, option_results) } - AvroSchema::Array(item) => { - let item_result = visit(item, visitor)?; - visitor.array(schema, item_result) - } - AvroSchema::Map(inner) => { - let item_result = visit(inner, visitor)?; - visitor.map(schema, item_result) - } - schema => visitor.primitive(schema), - } - } - /// Converts avro schema to iceberg schema. - pub(crate) fn avro_schema_to_schema(avro_schema: &AvroSchema) -> Result { - if let AvroSchema::Record(_) = avro_schema { - let mut converter = AvroSchemaToSchema { next_id: 0 }; - let typ = - visit(avro_schema, &mut converter)?.expect("Iceberg schema should not be none."); - if let Type::Struct(s) = typ { - Schema::builder() - .with_fields(s.fields().iter().cloned()) - .build() - } else { - Err(Error::new( + AvroSchema::Bytes => Type::Primitive(PrimitiveType::Binary), + AvroSchema::Null => return Ok(None), + _ => { + return Err(Error::new( ErrorKind::Unexpected, - format!("Expected to convert avro record schema to struct type, but {typ}"), + "Unable to convert avro {schema} to iceberg primitive type.", )) } + }; + + Ok(Some(typ)) + } + + fn map_array( + &mut self, + array: &RecordSchema, + key: Option, + value: Option, + ) -> Result { + let key = key.ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "Can't convert avro map schema, missing key schema.", + ) + })?; + let value = value.ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "Can't convert avro map schema, missing value schema.", + ) + })?; + let key_id = Self::get_element_id_from_attributes( + &array.fields[0].custom_attributes, + FILED_ID_PROP, + )?; + let value_id = Self::get_element_id_from_attributes( + &array.fields[1].custom_attributes, + FILED_ID_PROP, + )?; + let key_field = NestedField::map_key_element(key_id, key); + let value_field = NestedField::map_value_element( + value_id, + value, + !is_avro_optional(&array.fields[1].schema), + ); + Ok(Some(Type::Map(MapType { + key_field: key_field.into(), + value_field: value_field.into(), + }))) + } +} + +// # TODO +// Fix this when we have used `avro_schema_to_schema` inner. +#[allow(unused)] +/// Converts avro schema to iceberg schema. +pub(crate) fn avro_schema_to_schema(avro_schema: &AvroSchema) -> Result { + if let AvroSchema::Record(_) = avro_schema { + let mut converter = AvroSchemaToSchema; + let typ = visit(avro_schema, &mut converter)?.expect("Iceberg schema should not be none."); + if let Type::Struct(s) = typ { + Schema::builder() + .with_fields(s.fields().iter().cloned()) + .build() } else { Err(Error::new( - ErrorKind::DataInvalid, - "Can't convert non record avro schema to iceberg schema: {avro_schema}", + ErrorKind::Unexpected, + format!("Expected to convert avro record schema to struct type, but {typ}"), )) } + } else { + Err(Error::new( + ErrorKind::DataInvalid, + "Can't convert non record avro schema to iceberg schema: {avro_schema}", + )) } +} + +#[cfg(test)] +mod tests { + use std::fs::read_to_string; + use std::sync::Arc; + + use apache_avro::schema::{Namespace, UnionSchema}; + use apache_avro::Schema as AvroSchema; + + use super::*; + use crate::avro::schema::AvroSchemaToSchema; + use crate::spec::{ListType, MapType, NestedField, PrimitiveType, Schema, StructType, Type}; fn read_test_data_file_to_avro_schema(filename: &str) -> AvroSchema { let input = read_to_string(format!( @@ -557,22 +649,27 @@ mod tests { AvroSchema::parse_str(input.as_str()).unwrap() } - fn check_schema_conversion( - avro_schema: AvroSchema, - expected_iceberg_schema: Schema, - check_avro_to_iceberg: bool, - ) { - if check_avro_to_iceberg { - let converted_iceberg_schema = avro_schema_to_schema(&avro_schema).unwrap(); - assert_eq!(expected_iceberg_schema, converted_iceberg_schema); - } + /// Help function to check schema conversion between avro and iceberg: + /// 1. avro to iceberg + /// 2. iceberg to avro + /// 3. iceberg to avro to iceberg back + fn check_schema_conversion(avro_schema: AvroSchema, iceberg_schema: Schema) { + // 1. avro to iceberg + let converted_iceberg_schema = avro_schema_to_schema(&avro_schema).unwrap(); + assert_eq!(iceberg_schema, converted_iceberg_schema); + // 2. iceberg to avro let converted_avro_schema = schema_to_avro_schema( avro_schema.name().unwrap().fullname(Namespace::None), - &expected_iceberg_schema, + &iceberg_schema, ) .unwrap(); assert_eq!(avro_schema, converted_avro_schema); + + // 3.iceberg to avro to iceberg back + let converted_avro_converted_iceberg_schema = + avro_schema_to_schema(&converted_avro_schema).unwrap(); + assert_eq!(iceberg_schema, converted_avro_converted_iceberg_schema); } #[test] @@ -651,7 +748,6 @@ mod tests { check_schema_conversion( read_test_data_file_to_avro_schema("avro_schema_manifest_file_v1.json"), iceberg_schema, - false, ); } @@ -700,7 +796,7 @@ mod tests { .unwrap() }; - check_schema_conversion(avro_schema, iceberg_schema, false); + check_schema_conversion(avro_schema, iceberg_schema); } #[test] @@ -749,7 +845,7 @@ mod tests { .unwrap() }; - check_schema_conversion(avro_schema, iceberg_schema, false); + check_schema_conversion(avro_schema, iceberg_schema); } #[test] @@ -826,7 +922,144 @@ mod tests { .unwrap() }; - check_schema_conversion(avro_schema, iceberg_schema, false); + check_schema_conversion(avro_schema, iceberg_schema); + } + + #[test] + fn test_schema_with_array_map() { + let avro_schema = { + AvroSchema::parse_str( + r#" +{ + "type": "record", + "name": "avro_schema", + "fields": [ + { + "name": "optional", + "type": { + "type": "array", + "items": { + "type": "record", + "name": "k102_v103", + "fields": [ + { + "name": "key", + "type": "boolean", + "field-id": 102 + }, + { + "name": "value", + "type": ["null", "boolean"], + "field-id": 103 + } + ] + }, + "default": [], + "element-id": 101, + "logicalType": "map" + }, + "field-id": 100 + },{ + "name": "required", + "type": { + "type": "array", + "items": { + "type": "record", + "name": "k105_v106", + "fields": [ + { + "name": "key", + "type": "boolean", + "field-id": 105 + }, + { + "name": "value", + "type": "boolean", + "field-id": 106 + } + ] + }, + "default": [], + "logicalType": "map" + }, + "field-id": 104 + }, { + "name": "string_map", + "type": { + "type": "map", + "values": ["null", "long"], + "key-id": 108, + "value-id": 109 + }, + "field-id": 107 + } + ] +} +"#, + ) + .unwrap() + }; + + let iceberg_schema = { + Schema::builder() + .with_fields(vec![ + Arc::new(NestedField::required( + 100, + "optional", + Type::Map(MapType { + key_field: NestedField::map_key_element( + 102, + PrimitiveType::Boolean.into(), + ) + .into(), + value_field: NestedField::map_value_element( + 103, + PrimitiveType::Boolean.into(), + false, + ) + .into(), + }), + )), + Arc::new(NestedField::required( + 104, + "required", + Type::Map(MapType { + key_field: NestedField::map_key_element( + 105, + PrimitiveType::Boolean.into(), + ) + .into(), + value_field: NestedField::map_value_element( + 106, + PrimitiveType::Boolean.into(), + true, + ) + .into(), + }), + )), + Arc::new(NestedField::required( + 107, + "string_map", + Type::Map(MapType { + key_field: NestedField::map_key_element( + 108, + PrimitiveType::String.into(), + ) + .into(), + value_field: NestedField::map_value_element( + 109, + PrimitiveType::Long.into(), + false, + ) + .into(), + }), + )), + ]) + .build() + .unwrap() + }; + + check_schema_conversion(avro_schema, iceberg_schema); } #[test] @@ -838,7 +1071,7 @@ mod tests { ]) .unwrap(); - let mut converter = AvroSchemaToSchema { next_id: 0 }; + let mut converter = AvroSchemaToSchema; let options = avro_schema .variants() @@ -850,7 +1083,7 @@ mod tests { #[test] fn test_string_type() { - let mut converter = AvroSchemaToSchema { next_id: 0 }; + let mut converter = AvroSchemaToSchema; let avro_schema = AvroSchema::String; assert_eq!( @@ -875,10 +1108,14 @@ mod tests { .unwrap() }; - let mut converter = AvroSchemaToSchema { next_id: 0 }; + let AvroSchema::Map(avro_schema) = avro_schema else { + unreachable!() + }; + + let mut converter = AvroSchemaToSchema; let iceberg_type = Type::Map(MapType { - key_field: NestedField::map_key_element(1, PrimitiveType::String.into()).into(), - value_field: NestedField::map_value_element(2, PrimitiveType::Long.into(), false) + key_field: NestedField::map_key_element(101, PrimitiveType::String.into()).into(), + value_field: NestedField::map_value_element(102, PrimitiveType::Long.into(), false) .into(), }); @@ -902,7 +1139,7 @@ mod tests { .unwrap() }; - let mut converter = AvroSchemaToSchema { next_id: 0 }; + let mut converter = AvroSchemaToSchema; let iceberg_type = Type::from(PrimitiveType::Fixed(22)); @@ -914,7 +1151,7 @@ mod tests { #[test] fn test_unknown_primitive() { - let mut converter = AvroSchemaToSchema { next_id: 0 }; + let mut converter = AvroSchemaToSchema; assert!(converter.primitive(&AvroSchema::Duration).is_err()); } @@ -953,7 +1190,7 @@ mod tests { .unwrap() }; - let mut converter = AvroSchemaToSchema { next_id: 0 }; + let mut converter = AvroSchemaToSchema; assert_eq!( Type::decimal(25, 19).unwrap(), @@ -963,7 +1200,7 @@ mod tests { #[test] fn test_date_type() { - let mut converter = AvroSchemaToSchema { next_id: 0 }; + let mut converter = AvroSchemaToSchema; assert_eq!( Type::from(PrimitiveType::Date), diff --git a/crates/iceberg/src/spec/datatypes.rs b/crates/iceberg/src/spec/datatypes.rs index 06dc95570..d8883878e 100644 --- a/crates/iceberg/src/spec/datatypes.rs +++ b/crates/iceberg/src/spec/datatypes.rs @@ -581,6 +581,19 @@ impl From for SerdeNestedField { pub type NestedFieldRef = Arc; impl NestedField { + /// Construct a new field. + pub fn new(id: i32, name: impl ToString, field_type: Type, required: bool) -> Self { + Self { + id, + name: name.to_string(), + required, + field_type: Box::new(field_type), + doc: None, + initial_default: None, + write_default: None, + } + } + /// Construct a required field. pub fn required(id: i32, name: impl ToString, field_type: Type) -> Self { Self {