diff --git a/src/io/flight/mod.rs b/src/io/flight/mod.rs index 17f7c20d884..3c0dc0a919f 100644 --- a/src/io/flight/mod.rs +++ b/src/io/flight/mod.rs @@ -241,11 +241,9 @@ pub fn deserialize_message( )?; Ok(None) } - t => { - return Err(Error::nyi(format!( - "Reading types other than record batches not yet supported, unable to read {:?}", - t - ))); - } + t => Err(Error::nyi(format!( + "Reading types other than record batches not yet supported, unable to read {:?}", + t + ))), } } diff --git a/src/io/ipc/read/deserialize.rs b/src/io/ipc/read/deserialize.rs index 13300033b66..77ced6a5e97 100644 --- a/src/io/ipc/read/deserialize.rs +++ b/src/io/ipc/read/deserialize.rs @@ -58,76 +58,66 @@ pub fn read( ) .map(|x| x.boxed()) }), - Binary => { - let array = read_binary::( - field_nodes, - data_type, - buffers, - reader, - block_offset, - is_little_endian, - compression, - limit, - scratch, - )?; - Ok(Box::new(array)) - } - LargeBinary => { - let array = read_binary::( - field_nodes, - data_type, - buffers, - reader, - block_offset, - is_little_endian, - compression, - limit, - scratch, - )?; - Ok(Box::new(array)) - } - FixedSizeBinary => { - let array = read_fixed_size_binary( - field_nodes, - data_type, - buffers, - reader, - block_offset, - is_little_endian, - compression, - limit, - scratch, - )?; - Ok(Box::new(array)) - } - Utf8 => { - let array = read_utf8::( - field_nodes, - data_type, - buffers, - reader, - block_offset, - is_little_endian, - compression, - limit, - scratch, - )?; - Ok(Box::new(array)) - } - LargeUtf8 => { - let array = read_utf8::( - field_nodes, - data_type, - buffers, - reader, - block_offset, - is_little_endian, - compression, - limit, - scratch, - )?; - Ok(Box::new(array)) - } + Binary => read_binary::( + field_nodes, + data_type, + buffers, + reader, + block_offset, + is_little_endian, + compression, + limit, + scratch, + ) + .map(|x| x.boxed()), + LargeBinary => read_binary::( + field_nodes, + data_type, + buffers, + reader, + block_offset, + is_little_endian, + compression, + limit, + scratch, + ) + .map(|x| x.boxed()), + FixedSizeBinary => read_fixed_size_binary( + field_nodes, + data_type, + buffers, + reader, + block_offset, + is_little_endian, + compression, + limit, + scratch, + ) + .map(|x| x.boxed()), + Utf8 => read_utf8::( + field_nodes, + data_type, + buffers, + reader, + block_offset, + is_little_endian, + compression, + limit, + scratch, + ) + .map(|x| x.boxed()), + LargeUtf8 => read_utf8::( + field_nodes, + data_type, + buffers, + reader, + block_offset, + is_little_endian, + compression, + limit, + scratch, + ) + .map(|x| x.boxed()), List => read_list::( field_nodes, data_type, diff --git a/src/io/ipc/read/schema.rs b/src/io/ipc/read/schema.rs index 7f970786aa1..b1d97f0d770 100644 --- a/src/io/ipc/read/schema.rs +++ b/src/io/ipc/read/schema.rs @@ -1,4 +1,6 @@ -use arrow_format::ipc::planus::ReadAsRoot; +use arrow_format::ipc::{ + planus::ReadAsRoot, FieldRef, FixedSizeListRef, MapRef, TimeRef, TimestampRef, UnionRef, +}; use crate::{ datatypes::{ @@ -84,6 +86,156 @@ fn deserialize_timeunit(time_unit: arrow_format::ipc::TimeUnit) -> Result Result<(DataType, IpcField)> { + let unit = deserialize_timeunit(time.unit()?)?; + + let data_type = match (time.bit_width()?, unit) { + (32, TimeUnit::Second) => DataType::Time32(TimeUnit::Second), + (32, TimeUnit::Millisecond) => DataType::Time32(TimeUnit::Millisecond), + (64, TimeUnit::Microsecond) => DataType::Time64(TimeUnit::Microsecond), + (64, TimeUnit::Nanosecond) => DataType::Time64(TimeUnit::Nanosecond), + (bits, precision) => { + return Err(Error::nyi(format!( + "Time type with bit width of {} and unit of {:?}", + bits, precision + ))) + } + }; + Ok((data_type, IpcField::default())) +} + +fn deserialize_timestamp(timestamp: TimestampRef) -> Result<(DataType, IpcField)> { + let timezone = timestamp.timezone()?.map(|tz| tz.to_string()); + let time_unit = deserialize_timeunit(timestamp.unit()?)?; + Ok(( + DataType::Timestamp(time_unit, timezone), + IpcField::default(), + )) +} + +fn deserialize_union(union_: UnionRef, field: FieldRef) -> Result<(DataType, IpcField)> { + let mode = UnionMode::sparse(union_.mode()? == arrow_format::ipc::UnionMode::Sparse); + let ids = union_.type_ids()?.map(|x| x.iter().collect()); + + let fields = field + .children()? + .ok_or_else(|| Error::oos("IPC: Union must contain children"))?; + if fields.is_empty() { + return Err(Error::oos("IPC: Union must contain at least one child")); + } + + let (fields, ipc_fields) = try_unzip_vec(fields.iter().map(|field| { + let (field, fields) = deserialize_field(field?)?; + Ok((field, fields)) + }))?; + let ipc_field = IpcField { + fields: ipc_fields, + dictionary_id: None, + }; + Ok((DataType::Union(fields, ids, mode), ipc_field)) +} + +fn deserialize_map(map: MapRef, field: FieldRef) -> Result<(DataType, IpcField)> { + let is_sorted = map.keys_sorted()?; + + let children = field + .children()? + .ok_or_else(|| Error::oos("IPC: Map must contain children"))?; + let inner = children + .get(0) + .ok_or_else(|| Error::oos("IPC: Map must contain one child"))??; + let (field, ipc_field) = deserialize_field(inner)?; + + let data_type = DataType::Map(Box::new(field), is_sorted); + Ok(( + data_type, + IpcField { + fields: vec![ipc_field], + dictionary_id: None, + }, + )) +} + +fn deserialize_struct(field: FieldRef) -> Result<(DataType, IpcField)> { + let fields = field + .children()? + .ok_or_else(|| Error::oos("IPC: Struct must contain children"))?; + if fields.is_empty() { + return Err(Error::oos("IPC: Struct must contain at least one child")); + } + let (fields, ipc_fields) = try_unzip_vec(fields.iter().map(|field| { + let (field, fields) = deserialize_field(field?)?; + Ok((field, fields)) + }))?; + let ipc_field = IpcField { + fields: ipc_fields, + dictionary_id: None, + }; + Ok((DataType::Struct(fields), ipc_field)) +} + +fn deserialize_list(field: FieldRef) -> Result<(DataType, IpcField)> { + let children = field + .children()? + .ok_or_else(|| Error::oos("IPC: List must contain children"))?; + let inner = children + .get(0) + .ok_or_else(|| Error::oos("IPC: List must contain one child"))??; + let (field, ipc_field) = deserialize_field(inner)?; + + Ok(( + DataType::List(Box::new(field)), + IpcField { + fields: vec![ipc_field], + dictionary_id: None, + }, + )) +} + +fn deserialize_large_list(field: FieldRef) -> Result<(DataType, IpcField)> { + let children = field + .children()? + .ok_or_else(|| Error::oos("IPC: List must contain children"))?; + let inner = children + .get(0) + .ok_or_else(|| Error::oos("IPC: List must contain one child"))??; + let (field, ipc_field) = deserialize_field(inner)?; + + Ok(( + DataType::LargeList(Box::new(field)), + IpcField { + fields: vec![ipc_field], + dictionary_id: None, + }, + )) +} + +fn deserialize_fixed_size_list( + list: FixedSizeListRef, + field: FieldRef, +) -> Result<(DataType, IpcField)> { + let children = field + .children()? + .ok_or_else(|| Error::oos("IPC: FixedSizeList must contain children"))?; + let inner = children + .get(0) + .ok_or_else(|| Error::oos("IPC: FixedSizeList must contain one child"))??; + let (field, ipc_field) = deserialize_field(inner)?; + + let size = list + .list_size()? + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + Ok(( + DataType::FixedSizeList(Box::new(field), size), + IpcField { + fields: vec![ipc_field], + dictionary_id: None, + }, + )) +} + /// Get the Arrow data type from the flatbuffer Field table fn get_data_type( field: arrow_format::ipc::FieldRef, @@ -154,31 +306,8 @@ fn get_data_type( }; (data_type, IpcField::default()) } - Time(time) => { - let unit = deserialize_timeunit(time.unit()?)?; - - let data_type = match (time.bit_width()?, unit) { - (32, TimeUnit::Second) => DataType::Time32(TimeUnit::Second), - (32, TimeUnit::Millisecond) => DataType::Time32(TimeUnit::Millisecond), - (64, TimeUnit::Microsecond) => DataType::Time64(TimeUnit::Microsecond), - (64, TimeUnit::Nanosecond) => DataType::Time64(TimeUnit::Nanosecond), - (bits, precision) => { - return Err(Error::nyi(format!( - "Time type with bit width of {} and unit of {:?}", - bits, precision - ))) - } - }; - (data_type, IpcField::default()) - } - Timestamp(timestamp) => { - let timezone = timestamp.timezone()?.map(|tz| tz.to_string()); - let time_unit = deserialize_timeunit(timestamp.unit()?)?; - ( - DataType::Timestamp(time_unit, timezone), - IpcField::default(), - ) - } + Time(time) => deserialize_time(time)?, + Timestamp(timestamp) => deserialize_timestamp(timestamp)?, Interval(interval) => { let data_type = match interval.unit()? { arrow_format::ipc::IntervalUnit::YearMonth => { @@ -210,120 +339,12 @@ fn get_data_type( ); (data_type, IpcField::default()) } - List(_) => { - let children = field - .children()? - .ok_or_else(|| Error::oos("IPC: List must contain children"))?; - let inner = children - .get(0) - .ok_or_else(|| Error::oos("IPC: List must contain one child"))??; - let (field, ipc_field) = deserialize_field(inner)?; - - ( - DataType::List(Box::new(field)), - IpcField { - fields: vec![ipc_field], - dictionary_id: None, - }, - ) - } - LargeList(_) => { - let children = field - .children()? - .ok_or_else(|| Error::oos("IPC: List must contain children"))?; - let inner = children - .get(0) - .ok_or_else(|| Error::oos("IPC: List must contain one child"))??; - let (field, ipc_field) = deserialize_field(inner)?; - - ( - DataType::LargeList(Box::new(field)), - IpcField { - fields: vec![ipc_field], - dictionary_id: None, - }, - ) - } - FixedSizeList(list) => { - let children = field - .children()? - .ok_or_else(|| Error::oos("IPC: FixedSizeList must contain children"))?; - let inner = children - .get(0) - .ok_or_else(|| Error::oos("IPC: FixedSizeList must contain one child"))??; - let (field, ipc_field) = deserialize_field(inner)?; - - let size = list - .list_size()? - .try_into() - .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - - ( - DataType::FixedSizeList(Box::new(field), size), - IpcField { - fields: vec![ipc_field], - dictionary_id: None, - }, - ) - } - Struct(_) => { - let fields = field - .children()? - .ok_or_else(|| Error::oos("IPC: Struct must contain children"))?; - if fields.is_empty() { - return Err(Error::oos("IPC: Struct must contain at least one child")); - } - let (fields, ipc_fields) = try_unzip_vec(fields.iter().map(|field| { - let (field, fields) = deserialize_field(field?)?; - Ok((field, fields)) - }))?; - let ipc_field = IpcField { - fields: ipc_fields, - dictionary_id: None, - }; - (DataType::Struct(fields), ipc_field) - } - Union(union_) => { - let mode = UnionMode::sparse(union_.mode()? == arrow_format::ipc::UnionMode::Sparse); - let ids = union_.type_ids()?.map(|x| x.iter().collect()); - - let fields = field - .children()? - .ok_or_else(|| Error::oos("IPC: Union must contain children"))?; - if fields.is_empty() { - return Err(Error::oos("IPC: Union must contain at least one child")); - } - - let (fields, ipc_fields) = try_unzip_vec(fields.iter().map(|field| { - let (field, fields) = deserialize_field(field?)?; - Ok((field, fields)) - }))?; - let ipc_field = IpcField { - fields: ipc_fields, - dictionary_id: None, - }; - (DataType::Union(fields, ids, mode), ipc_field) - } - Map(map) => { - let is_sorted = map.keys_sorted()?; - - let children = field - .children()? - .ok_or_else(|| Error::oos("IPC: Map must contain children"))?; - let inner = children - .get(0) - .ok_or_else(|| Error::oos("IPC: Map must contain one child"))??; - let (field, ipc_field) = deserialize_field(inner)?; - - let data_type = DataType::Map(Box::new(field), is_sorted); - ( - data_type, - IpcField { - fields: vec![ipc_field], - dictionary_id: None, - }, - ) - } + List(_) => deserialize_list(field)?, + LargeList(_) => deserialize_large_list(field)?, + FixedSizeList(list) => deserialize_fixed_size_list(list, field)?, + Struct(_) => deserialize_struct(field)?, + Union(union_) => deserialize_union(union_, field)?, + Map(map) => deserialize_map(map, field)?, }) } diff --git a/src/io/orc/read/mod.rs b/src/io/orc/read/mod.rs index 597ecbc9fbe..ca6ce7354c3 100644 --- a/src/io/orc/read/mod.rs +++ b/src/io/orc/read/mod.rs @@ -354,6 +354,6 @@ pub fn deserialize(data_type: DataType, column: &Column) -> Result deserialize_utf8::(data_type, column).map(|x| x.boxed()), DataType::Binary => deserialize_binary::(data_type, column).map(|x| x.boxed()), DataType::LargeBinary => deserialize_binary::(data_type, column).map(|x| x.boxed()), - dt => return Err(Error::nyi(format!("Deserializing {dt:?} from ORC"))), + dt => Err(Error::nyi(format!("Deserializing {dt:?} from ORC"))), } }