From 852332975a2242a5120b1508071e900dafdd574c Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Wed, 18 May 2022 06:03:11 +0000 Subject: [PATCH] Fmt --- src/array/binary/mutable.rs | 3 +-- src/array/struct_/mod.rs | 4 +--- src/compute/cast/binary_to.rs | 3 +-- src/compute/cast/utf8_to.rs | 3 +-- src/io/avro/read/header.rs | 3 +-- src/io/avro/write/header.rs | 3 +-- src/io/csv/read/reader.rs | 6 +++--- src/io/csv/read_async/reader.rs | 7 +++--- src/io/flight/mod.rs | 6 ++---- src/io/ipc/read/array/binary.rs | 4 +--- src/io/ipc/read/array/boolean.rs | 4 +--- src/io/ipc/read/array/primitive.rs | 4 +--- src/io/ipc/read/array/struct_.rs | 4 +--- src/io/ipc/read/array/union.rs | 4 +--- src/io/ipc/read/common.rs | 4 +--- src/io/ipc/read/file_async.rs | 5 ++--- src/io/ipc/read/read_basic.rs | 6 +++--- src/io/ipc/read/reader.rs | 11 +++++----- src/io/ipc/read/schema.rs | 19 +++++------------ src/io/ipc/read/stream.rs | 5 ++--- src/io/ipc/read/stream_async.rs | 6 ++---- src/io/json_integration/read/array.rs | 6 +++--- src/io/json_integration/read/schema.rs | 26 ++++++----------------- src/io/ndjson/read/file.rs | 6 +++--- src/io/parquet/read/deserialize/simple.rs | 8 ++----- tests/it/io/ndjson/read.rs | 5 +---- 26 files changed, 55 insertions(+), 110 deletions(-) diff --git a/src/array/binary/mutable.rs b/src/array/binary/mutable.rs index a12dbaff21d..df12e6d9d6c 100644 --- a/src/array/binary/mutable.rs +++ b/src/array/binary/mutable.rs @@ -426,8 +426,7 @@ impl> TryPush> for MutableBinaryArray { Some(value) => { let bytes = value.as_ref(); - let size = - O::from_usize(self.values.len() + bytes.len()).ok_or(Error::Overflow)?; + let size = O::from_usize(self.values.len() + bytes.len()).ok_or(Error::Overflow)?; self.values.extend_from_slice(bytes); diff --git a/src/array/struct_/mod.rs b/src/array/struct_/mod.rs index b0155ddc5ed..43d5d834524 100644 --- a/src/array/struct_/mod.rs +++ b/src/array/struct_/mod.rs @@ -53,9 +53,7 @@ impl StructArray { ) -> Result { let fields = Self::try_get_fields(&data_type)?; if fields.is_empty() { - return Err(Error::oos( - "A StructArray must contain at least one field", - )); + return Err(Error::oos("A StructArray must contain at least one field")); } if fields.len() != values.len() { return Err(Error::oos( diff --git a/src/compute/cast/binary_to.rs b/src/compute/cast/binary_to.rs index b7200237b98..aee7c8fcabb 100644 --- a/src/compute/cast/binary_to.rs +++ b/src/compute/cast/binary_to.rs @@ -24,8 +24,7 @@ pub fn binary_large_to_binary( to_data_type: DataType, ) -> Result> { let values = from.values().clone(); - let _ = - i32::try_from(*from.offsets().last().unwrap()).map_err(Error::from_external_error)?; + let _ = i32::try_from(*from.offsets().last().unwrap()).map_err(Error::from_external_error)?; let offsets = from.offsets().iter().map(|x| *x as i32).collect::>(); // todo: use `new_unchecked` since all invariants are preserved diff --git a/src/compute/cast/utf8_to.rs b/src/compute/cast/utf8_to.rs index 4551e664bcb..d6a235500e0 100644 --- a/src/compute/cast/utf8_to.rs +++ b/src/compute/cast/utf8_to.rs @@ -164,8 +164,7 @@ pub fn utf8_large_to_utf8(from: &Utf8Array) -> Result> { let data_type = Utf8Array::::default_data_type(); let validity = from.validity().cloned(); let values = from.values().clone(); - let _ = - i32::try_from(*from.offsets().last().unwrap()).map_err(Error::from_external_error)?; + let _ = i32::try_from(*from.offsets().last().unwrap()).map_err(Error::from_external_error)?; let offsets = from .offsets() diff --git a/src/io/avro/read/header.rs b/src/io/avro/read/header.rs index ff31e652e91..66bec96cc05 100644 --- a/src/io/avro/read/header.rs +++ b/src/io/avro/read/header.rs @@ -15,8 +15,7 @@ pub(crate) fn deserialize_header( .get("avro.schema") .ok_or_else(|| Error::ExternalFormat("Avro schema must be present".to_string())) .and_then(|bytes| { - serde_json::from_slice(bytes.as_ref()) - .map_err(|e| Error::ExternalFormat(e.to_string())) + serde_json::from_slice(bytes.as_ref()).map_err(|e| Error::ExternalFormat(e.to_string())) })?; let compression = header.get("avro.codec").and_then(|bytes| { diff --git a/src/io/avro/write/header.rs b/src/io/avro/write/header.rs index 758dd9d1931..1d25f2f596a 100644 --- a/src/io/avro/write/header.rs +++ b/src/io/avro/write/header.rs @@ -12,8 +12,7 @@ pub(crate) fn serialize_header( schema: &Schema, compression: Option, ) -> Result>> { - let schema = - serde_json::to_string(schema).map_err(|e| Error::ExternalFormat(e.to_string()))?; + let schema = serde_json::to_string(schema).map_err(|e| Error::ExternalFormat(e.to_string()))?; let mut header = HashMap::>::default(); diff --git a/src/io/csv/read/reader.rs b/src/io/csv/read/reader.rs index fa2976df5e5..9fef6301cb0 100644 --- a/src/io/csv/read/reader.rs +++ b/src/io/csv/read/reader.rs @@ -23,9 +23,9 @@ pub fn read_rows( let mut row_number = 0; for row in rows.iter_mut() { - let has_more = reader.read_byte_record(row).map_err(|e| { - Error::External(format!(" at line {}", skip + row_number), Box::new(e)) - })?; + let has_more = reader + .read_byte_record(row) + .map_err(|e| Error::External(format!(" at line {}", skip + row_number), Box::new(e)))?; if !has_more { break; } diff --git a/src/io/csv/read_async/reader.rs b/src/io/csv/read_async/reader.rs index 5e5a4f6f75f..661f5be1d3f 100644 --- a/src/io/csv/read_async/reader.rs +++ b/src/io/csv/read_async/reader.rs @@ -26,9 +26,10 @@ where let mut row_number = 0; for row in rows.iter_mut() { - let has_more = reader.read_byte_record(row).await.map_err(|e| { - Error::External(format!(" at line {}", skip + row_number), Box::new(e)) - })?; + let has_more = reader + .read_byte_record(row) + .await + .map_err(|e| Error::External(format!(" at line {}", skip + row_number), Box::new(e)))?; if !has_more { break; } diff --git a/src/io/flight/mod.rs b/src/io/flight/mod.rs index 5f21276b26c..4cb19390100 100644 --- a/src/io/flight/mod.rs +++ b/src/io/flight/mod.rs @@ -116,10 +116,8 @@ pub fn deserialize_batch( dictionaries: &read::Dictionaries, ) -> Result>> { // check that the data_header is a record batch message - let message = - arrow_format::ipc::MessageRef::read_as_root(&data.data_header).map_err(|err| { - Error::OutOfSpec(format!("Unable to get root as message: {:?}", err)) - })?; + let message = arrow_format::ipc::MessageRef::read_as_root(&data.data_header) + .map_err(|err| Error::OutOfSpec(format!("Unable to get root as message: {:?}", err)))?; let mut reader = std::io::Cursor::new(&data.data_body); diff --git a/src/io/ipc/read/array/binary.rs b/src/io/ipc/read/array/binary.rs index 8c60c0772a9..f963f3b697a 100644 --- a/src/io/ipc/read/array/binary.rs +++ b/src/io/ipc/read/array/binary.rs @@ -63,9 +63,7 @@ pub fn skip_binary( buffers: &mut VecDeque, ) -> Result<()> { let _ = field_nodes.pop_front().ok_or_else(|| { - Error::oos( - "IPC: unable to fetch the field for binary. The file or stream is corrupted.", - ) + Error::oos("IPC: unable to fetch the field for binary. The file or stream is corrupted.") })?; let _ = buffers diff --git a/src/io/ipc/read/array/boolean.rs b/src/io/ipc/read/array/boolean.rs index fe44cdd2a10..ecdf240751c 100644 --- a/src/io/ipc/read/array/boolean.rs +++ b/src/io/ipc/read/array/boolean.rs @@ -50,9 +50,7 @@ pub fn skip_boolean( buffers: &mut VecDeque, ) -> Result<()> { let _ = field_nodes.pop_front().ok_or_else(|| { - Error::oos( - "IPC: unable to fetch the field for boolean. The file or stream is corrupted.", - ) + Error::oos("IPC: unable to fetch the field for boolean. The file or stream is corrupted.") })?; let _ = buffers diff --git a/src/io/ipc/read/array/primitive.rs b/src/io/ipc/read/array/primitive.rs index 31e0ab9e77a..5477801d610 100644 --- a/src/io/ipc/read/array/primitive.rs +++ b/src/io/ipc/read/array/primitive.rs @@ -52,9 +52,7 @@ pub fn skip_primitive( buffers: &mut VecDeque, ) -> Result<()> { let _ = field_nodes.pop_front().ok_or_else(|| { - Error::oos( - "IPC: unable to fetch the field for primitive. The file or stream is corrupted.", - ) + Error::oos("IPC: unable to fetch the field for primitive. The file or stream is corrupted.") })?; let _ = buffers diff --git a/src/io/ipc/read/array/struct_.rs b/src/io/ipc/read/array/struct_.rs index 4f7393ccd67..c87440a9782 100644 --- a/src/io/ipc/read/array/struct_.rs +++ b/src/io/ipc/read/array/struct_.rs @@ -70,9 +70,7 @@ pub fn skip_struct( buffers: &mut VecDeque, ) -> Result<()> { let _ = field_nodes.pop_front().ok_or_else(|| { - Error::oos( - "IPC: unable to fetch the field for struct. The file or stream is corrupted.", - ) + Error::oos("IPC: unable to fetch the field for struct. The file or stream is corrupted.") })?; let _ = buffers diff --git a/src/io/ipc/read/array/union.rs b/src/io/ipc/read/array/union.rs index 35ddf630f23..49dde87c44c 100644 --- a/src/io/ipc/read/array/union.rs +++ b/src/io/ipc/read/array/union.rs @@ -94,9 +94,7 @@ pub fn skip_union( buffers: &mut VecDeque, ) -> Result<()> { let _ = field_nodes.pop_front().ok_or_else(|| { - Error::oos( - "IPC: unable to fetch the field for struct. The file or stream is corrupted.", - ) + Error::oos("IPC: unable to fetch the field for struct. The file or stream is corrupted.") })?; let _ = buffers diff --git a/src/io/ipc/read/common.rs b/src/io/ipc/read/common.rs index a9ccc1052cb..64517ff813b 100644 --- a/src/io/ipc/read/common.rs +++ b/src/io/ipc/read/common.rs @@ -246,9 +246,7 @@ pub fn read_dictionary( } _ => None, } - .ok_or_else(|| { - Error::InvalidArgumentError("dictionary id not found in schema".to_string()) - })?; + .ok_or_else(|| Error::InvalidArgumentError("dictionary id not found in schema".to_string()))?; dictionaries.insert(id, dictionary_values); diff --git a/src/io/ipc/read/file_async.rs b/src/io/ipc/read/file_async.rs index d4f14e3fa64..01e7d93ffa5 100644 --- a/src/io/ipc/read/file_async.rs +++ b/src/io/ipc/read/file_async.rs @@ -210,9 +210,8 @@ where let length = block.body_length as usize; read_dictionary_message(&mut reader, offset, &mut data).await?; - let message = MessageRef::read_as_root(&data).map_err(|err| { - Error::OutOfSpec(format!("unable to get root as message: {:?}", err)) - })?; + let message = MessageRef::read_as_root(&data) + .map_err(|err| Error::OutOfSpec(format!("unable to get root as message: {:?}", err)))?; let header = message .header()? .ok_or_else(|| Error::oos("message must have a header"))?; diff --git a/src/io/ipc/read/read_basic.rs b/src/io/ipc/read/read_basic.rs index 27124be5699..5f449917add 100644 --- a/src/io/ipc/read/read_basic.rs +++ b/src/io/ipc/read/read_basic.rs @@ -230,9 +230,9 @@ pub fn read_validity( compression, )?) } else { - let _ = buffers.pop_front().ok_or_else(|| { - Error::oos("IPC: unable to fetch a buffer. The file is corrupted.") - })?; + let _ = buffers + .pop_front() + .ok_or_else(|| Error::oos("IPC: unable to fetch a buffer. The file is corrupted."))?; None }) } diff --git a/src/io/ipc/read/reader.rs b/src/io/ipc/read/reader.rs index 5c78611313f..e0be7880efd 100644 --- a/src/io/ipc/read/reader.rs +++ b/src/io/ipc/read/reader.rs @@ -79,9 +79,8 @@ pub(crate) fn read_dictionaries( let length = block.meta_data_length as u64; read_dictionary_message(reader, offset, &mut data)?; - let message = arrow_format::ipc::MessageRef::read_as_root(&data).map_err(|err| { - Error::OutOfSpec(format!("Unable to get root as message: {:?}", err)) - })?; + let message = arrow_format::ipc::MessageRef::read_as_root(&data) + .map_err(|err| Error::OutOfSpec(format!("Unable to get root as message: {:?}", err)))?; let header = message .header()? @@ -133,9 +132,9 @@ pub(super) fn deserialize_footer(footer_data: &[u8]) -> Result { let footer = arrow_format::ipc::FooterRef::read_as_root(footer_data) .map_err(|err| Error::OutOfSpec(format!("Unable to get root as footer: {:?}", err)))?; - let blocks = footer.record_batches()?.ok_or_else(|| { - Error::OutOfSpec("Unable to get record batches from footer".to_string()) - })?; + let blocks = footer + .record_batches()? + .ok_or_else(|| Error::OutOfSpec("Unable to get record batches from footer".to_string()))?; let blocks = blocks .iter() diff --git a/src/io/ipc/read/schema.rs b/src/io/ipc/read/schema.rs index 470b9c37ab3..b5925d00d2e 100644 --- a/src/io/ipc/read/schema.rs +++ b/src/io/ipc/read/schema.rs @@ -70,11 +70,7 @@ fn deserialize_integer(int: arrow_format::ipc::IntRef) -> Result { (32, false) => IntegerType::UInt32, (64, true) => IntegerType::Int64, (64, false) => IntegerType::UInt64, - _ => { - return Err(Error::oos( - "IPC: indexType can only be 8, 16, 32 or 64.", - )) - } + _ => return Err(Error::oos("IPC: indexType can only be 8, 16, 32 or 64.")), }) } @@ -259,9 +255,7 @@ fn get_data_type( .children()? .ok_or_else(|| Error::oos("IPC: Struct must contain children"))?; if fields.is_empty() { - return Err(Error::oos( - "IPC: Struct must contain at least one child", - )); + return Err(Error::oos("IPC: Struct must contain at least one child")); } let (fields, ipc_fields) = try_unzip_vec(fields.iter().map(|field| { let (field, fields) = deserialize_field(field?)?; @@ -281,9 +275,7 @@ fn get_data_type( .children()? .ok_or_else(|| Error::oos("IPC: Union must contain children"))?; if fields.is_empty() { - return Err(Error::oos( - "IPC: Union must contain at least one child", - )); + return Err(Error::oos("IPC: Union must contain at least one child")); } let (fields, ipc_fields) = try_unzip_vec(fields.iter().map(|field| { @@ -375,9 +367,8 @@ pub(super) fn fb_to_schema(schema: arrow_format::ipc::SchemaRef) -> Result<(Sche } pub(super) fn deserialize_stream_metadata(meta: &[u8]) -> Result { - let message = arrow_format::ipc::MessageRef::read_as_root(meta).map_err(|err| { - Error::OutOfSpec(format!("Unable to get root as message: {:?}", err)) - })?; + let message = arrow_format::ipc::MessageRef::read_as_root(meta) + .map_err(|err| Error::OutOfSpec(format!("Unable to get root as message: {:?}", err)))?; let version = message.version()?; // message header is a Schema, so read it let header = message diff --git a/src/io/ipc/read/stream.rs b/src/io/ipc/read/stream.rs index b8e735a39c0..baa701cb8ff 100644 --- a/src/io/ipc/read/stream.rs +++ b/src/io/ipc/read/stream.rs @@ -123,9 +123,8 @@ fn read_next( message_buffer.resize(meta_length, 0); reader.read_exact(message_buffer)?; - let message = arrow_format::ipc::MessageRef::read_as_root(message_buffer).map_err(|err| { - Error::OutOfSpec(format!("Unable to get root as message: {:?}", err)) - })?; + let message = arrow_format::ipc::MessageRef::read_as_root(message_buffer) + .map_err(|err| Error::OutOfSpec(format!("Unable to get root as message: {:?}", err)))?; let header = message.header()?.ok_or_else(|| { Error::oos("IPC: unable to fetch the message header. The file or stream is corrupted.") })?; diff --git a/src/io/ipc/read/stream_async.rs b/src/io/ipc/read/stream_async.rs index 942fdf7c4af..5f67ed6338c 100644 --- a/src/io/ipc/read/stream_async.rs +++ b/src/io/ipc/read/stream_async.rs @@ -98,10 +98,8 @@ async fn maybe_next( state.message_buffer.resize(meta_length, 0); state.reader.read_exact(&mut state.message_buffer).await?; - let message = - arrow_format::ipc::MessageRef::read_as_root(&state.message_buffer).map_err(|err| { - Error::OutOfSpec(format!("Unable to get root as message: {:?}", err)) - })?; + let message = arrow_format::ipc::MessageRef::read_as_root(&state.message_buffer) + .map_err(|err| Error::OutOfSpec(format!("Unable to get root as message: {:?}", err)))?; let header = message.header()?.ok_or_else(|| { Error::oos("IPC: unable to fetch the message header. The file or stream is corrupted.") })?; diff --git a/src/io/json_integration/read/array.rs b/src/io/json_integration/read/array.rs index 9a4f93df285..6210a02ca91 100644 --- a/src/io/json_integration/read/array.rs +++ b/src/io/json_integration/read/array.rs @@ -238,9 +238,9 @@ fn to_dictionary( ) -> Result> { // find dictionary let dict_id = field.dictionary_id.unwrap(); - let dictionary = dictionaries.get(&dict_id).ok_or_else(|| { - Error::OutOfSpec(format!("Unable to find any dictionary id {}", dict_id)) - })?; + let dictionary = dictionaries + .get(&dict_id) + .ok_or_else(|| Error::OutOfSpec(format!("Unable to find any dictionary id {}", dict_id)))?; let keys = to_primitive(json_col, K::PRIMITIVE.into()); diff --git a/src/io/json_integration/read/schema.rs b/src/io/json_integration/read/schema.rs index 6449d6351c2..aaf6912af05 100644 --- a/src/io/json_integration/read/schema.rs +++ b/src/io/json_integration/read/schema.rs @@ -17,9 +17,7 @@ fn to_time_unit(item: Option<&Value>) -> Result { Some(p) if p == "MILLISECOND" => Ok(TimeUnit::Millisecond), Some(p) if p == "MICROSECOND" => Ok(TimeUnit::Microsecond), Some(p) if p == "NANOSECOND" => Ok(TimeUnit::Nanosecond), - _ => Err(Error::OutOfSpec( - "time unit missing or invalid".to_string(), - )), + _ => Err(Error::OutOfSpec("time unit missing or invalid".to_string())), } } @@ -78,9 +76,7 @@ fn deserialize_fields(children: Option<&Value>) -> Result> { .map(deserialize_field) .collect::>>() } else { - Err(Error::OutOfSpec( - "children must be an array".to_string(), - )) + Err(Error::OutOfSpec("children must be an array".to_string())) } }) .unwrap_or_else(|| Ok(vec![])) @@ -204,20 +200,14 @@ fn to_data_type(item: &Value, mut children: Vec) -> Result { let tz = match item.get("timezone") { None => Ok(None), Some(Value::String(tz)) => Ok(Some(tz.clone())), - _ => Err(Error::OutOfSpec( - "timezone must be a string".to_string(), - )), + _ => Err(Error::OutOfSpec("timezone must be a string".to_string())), }?; DataType::Timestamp(unit, tz) } "date" => match item.get("unit") { Some(p) if p == "DAY" => DataType::Date32, Some(p) if p == "MILLISECOND" => DataType::Date64, - _ => { - return Err(Error::OutOfSpec( - "date unit missing or invalid".to_string(), - )) - } + _ => return Err(Error::OutOfSpec("date unit missing or invalid".to_string())), }, "time" => { let unit = to_time_unit(item.get("unit"))?; @@ -309,9 +299,7 @@ fn deserialize_ipc_field(value: &Value) -> Result { .map(deserialize_ipc_field) .collect::>>() } else { - Err(Error::OutOfSpec( - "children must be an array".to_string(), - )) + Err(Error::OutOfSpec("children must be an array".to_string())) } }) .unwrap_or_else(|| Ok(vec![]))?; @@ -320,9 +308,7 @@ fn deserialize_ipc_field(value: &Value) -> Result { match dictionary.get("id") { Some(Value::Number(n)) => Some(n.as_i64().unwrap()), _ => { - return Err(Error::OutOfSpec( - "Field missing 'id' attribute".to_string(), - )); + return Err(Error::OutOfSpec("Field missing 'id' attribute".to_string())); } } } else { diff --git a/src/io/ndjson/read/file.rs b/src/io/ndjson/read/file.rs index bc8c67d5060..2b0174a02bf 100644 --- a/src/io/ndjson/read/file.rs +++ b/src/io/ndjson/read/file.rs @@ -21,9 +21,9 @@ fn read_rows(reader: &mut R, rows: &mut [String], limit: usize) -> R for row in rows.iter_mut() { loop { row.clear(); - let _ = reader.read_line(row).map_err(|e| { - Error::External(format!(" at line {}", row_number), Box::new(e)) - })?; + let _ = reader + .read_line(row) + .map_err(|e| Error::External(format!(" at line {}", row_number), Box::new(e)))?; if row.is_empty() { break; } diff --git a/src/io/parquet/read/deserialize/simple.rs b/src/io/parquet/read/deserialize/simple.rs index 2d5afcb9cd7..da83b3b861f 100644 --- a/src/io/parquet/read/deserialize/simple.rs +++ b/src/io/parquet/read/deserialize/simple.rs @@ -244,9 +244,7 @@ fn timestamp<'a, I: 'a + DataPages>( int96_to_i64_ns, )))); } else { - return Err(Error::nyi( - "Can't decode int96 to timestamp other than ns", - )); + return Err(Error::nyi("Can't decode int96 to timestamp other than ns")); } }; if physical_type != &PhysicalType::Int64 { @@ -305,9 +303,7 @@ fn timestamp_dict<'a, K: DictionaryKey, I: 'a + DataPages>( int96_to_i64_ns, ))); } else { - return Err(Error::nyi( - "Can't decode int96 to timestamp other than ns", - )); + return Err(Error::nyi("Can't decode int96 to timestamp other than ns")); } }; diff --git a/tests/it/io/ndjson/read.rs b/tests/it/io/ndjson/read.rs index 4b7f7049815..6a6900fc140 100644 --- a/tests/it/io/ndjson/read.rs +++ b/tests/it/io/ndjson/read.rs @@ -79,10 +79,7 @@ fn read_empty_reader() -> Result<()> { assert!(matches!(infer_error, Err(Error::ExternalFormat(_)))); let deserialize_error = ndjson_read::deserialize(&[], DataType::Null); - assert!(matches!( - deserialize_error, - Err(Error::ExternalFormat(_)) - )); + assert!(matches!(deserialize_error, Err(Error::ExternalFormat(_)))); Ok(()) }