Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed May 26, 2022
1 parent 127640f commit 8523329
Show file tree
Hide file tree
Showing 26 changed files with 55 additions and 110 deletions.
3 changes: 1 addition & 2 deletions src/array/binary/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,8 +426,7 @@ impl<O: Offset, T: AsRef<[u8]>> TryPush<Option<T>> for MutableBinaryArray<O> {
Some(value) => {
let bytes = value.as_ref();

let size =
O::from_usize(self.values.len() + bytes.len()).ok_or(Error::Overflow)?;
let size = O::from_usize(self.values.len() + bytes.len()).ok_or(Error::Overflow)?;

self.values.extend_from_slice(bytes);

Expand Down
4 changes: 1 addition & 3 deletions src/array/struct_/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@ impl StructArray {
) -> Result<Self, Error> {
let fields = Self::try_get_fields(&data_type)?;
if fields.is_empty() {
return Err(Error::oos(
"A StructArray must contain at least one field",
));
return Err(Error::oos("A StructArray must contain at least one field"));
}
if fields.len() != values.len() {
return Err(Error::oos(
Expand Down
3 changes: 1 addition & 2 deletions src/compute/cast/binary_to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ pub fn binary_large_to_binary(
to_data_type: DataType,
) -> Result<BinaryArray<i32>> {
let values = from.values().clone();
let _ =
i32::try_from(*from.offsets().last().unwrap()).map_err(Error::from_external_error)?;
let _ = i32::try_from(*from.offsets().last().unwrap()).map_err(Error::from_external_error)?;

let offsets = from.offsets().iter().map(|x| *x as i32).collect::<Vec<_>>();
// todo: use `new_unchecked` since all invariants are preserved
Expand Down
3 changes: 1 addition & 2 deletions src/compute/cast/utf8_to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,7 @@ pub fn utf8_large_to_utf8(from: &Utf8Array<i64>) -> Result<Utf8Array<i32>> {
let data_type = Utf8Array::<i32>::default_data_type();
let validity = from.validity().cloned();
let values = from.values().clone();
let _ =
i32::try_from(*from.offsets().last().unwrap()).map_err(Error::from_external_error)?;
let _ = i32::try_from(*from.offsets().last().unwrap()).map_err(Error::from_external_error)?;

let offsets = from
.offsets()
Expand Down
3 changes: 1 addition & 2 deletions src/io/avro/read/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ pub(crate) fn deserialize_header(
.get("avro.schema")
.ok_or_else(|| Error::ExternalFormat("Avro schema must be present".to_string()))
.and_then(|bytes| {
serde_json::from_slice(bytes.as_ref())
.map_err(|e| Error::ExternalFormat(e.to_string()))
serde_json::from_slice(bytes.as_ref()).map_err(|e| Error::ExternalFormat(e.to_string()))
})?;

let compression = header.get("avro.codec").and_then(|bytes| {
Expand Down
3 changes: 1 addition & 2 deletions src/io/avro/write/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ pub(crate) fn serialize_header(
schema: &Schema,
compression: Option<Compression>,
) -> Result<HashMap<String, Vec<u8>>> {
let schema =
serde_json::to_string(schema).map_err(|e| Error::ExternalFormat(e.to_string()))?;
let schema = serde_json::to_string(schema).map_err(|e| Error::ExternalFormat(e.to_string()))?;

let mut header = HashMap::<String, Vec<u8>>::default();

Expand Down
6 changes: 3 additions & 3 deletions src/io/csv/read/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ pub fn read_rows<R: Read>(

let mut row_number = 0;
for row in rows.iter_mut() {
let has_more = reader.read_byte_record(row).map_err(|e| {
Error::External(format!(" at line {}", skip + row_number), Box::new(e))
})?;
let has_more = reader
.read_byte_record(row)
.map_err(|e| Error::External(format!(" at line {}", skip + row_number), Box::new(e)))?;
if !has_more {
break;
}
Expand Down
7 changes: 4 additions & 3 deletions src/io/csv/read_async/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ where

let mut row_number = 0;
for row in rows.iter_mut() {
let has_more = reader.read_byte_record(row).await.map_err(|e| {
Error::External(format!(" at line {}", skip + row_number), Box::new(e))
})?;
let has_more = reader
.read_byte_record(row)
.await
.map_err(|e| Error::External(format!(" at line {}", skip + row_number), Box::new(e)))?;
if !has_more {
break;
}
Expand Down
6 changes: 2 additions & 4 deletions src/io/flight/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,8 @@ pub fn deserialize_batch(
dictionaries: &read::Dictionaries,
) -> Result<Chunk<Arc<dyn Array>>> {
// check that the data_header is a record batch message
let message =
arrow_format::ipc::MessageRef::read_as_root(&data.data_header).map_err(|err| {
Error::OutOfSpec(format!("Unable to get root as message: {:?}", err))
})?;
let message = arrow_format::ipc::MessageRef::read_as_root(&data.data_header)
.map_err(|err| Error::OutOfSpec(format!("Unable to get root as message: {:?}", err)))?;

let mut reader = std::io::Cursor::new(&data.data_body);

Expand Down
4 changes: 1 addition & 3 deletions src/io/ipc/read/array/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,7 @@ pub fn skip_binary(
buffers: &mut VecDeque<IpcBuffer>,
) -> Result<()> {
let _ = field_nodes.pop_front().ok_or_else(|| {
Error::oos(
"IPC: unable to fetch the field for binary. The file or stream is corrupted.",
)
Error::oos("IPC: unable to fetch the field for binary. The file or stream is corrupted.")
})?;

let _ = buffers
Expand Down
4 changes: 1 addition & 3 deletions src/io/ipc/read/array/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,7 @@ pub fn skip_boolean(
buffers: &mut VecDeque<IpcBuffer>,
) -> Result<()> {
let _ = field_nodes.pop_front().ok_or_else(|| {
Error::oos(
"IPC: unable to fetch the field for boolean. The file or stream is corrupted.",
)
Error::oos("IPC: unable to fetch the field for boolean. The file or stream is corrupted.")
})?;

let _ = buffers
Expand Down
4 changes: 1 addition & 3 deletions src/io/ipc/read/array/primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,7 @@ pub fn skip_primitive(
buffers: &mut VecDeque<IpcBuffer>,
) -> Result<()> {
let _ = field_nodes.pop_front().ok_or_else(|| {
Error::oos(
"IPC: unable to fetch the field for primitive. The file or stream is corrupted.",
)
Error::oos("IPC: unable to fetch the field for primitive. The file or stream is corrupted.")
})?;

let _ = buffers
Expand Down
4 changes: 1 addition & 3 deletions src/io/ipc/read/array/struct_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,7 @@ pub fn skip_struct(
buffers: &mut VecDeque<IpcBuffer>,
) -> Result<()> {
let _ = field_nodes.pop_front().ok_or_else(|| {
Error::oos(
"IPC: unable to fetch the field for struct. The file or stream is corrupted.",
)
Error::oos("IPC: unable to fetch the field for struct. The file or stream is corrupted.")
})?;

let _ = buffers
Expand Down
4 changes: 1 addition & 3 deletions src/io/ipc/read/array/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,7 @@ pub fn skip_union(
buffers: &mut VecDeque<IpcBuffer>,
) -> Result<()> {
let _ = field_nodes.pop_front().ok_or_else(|| {
Error::oos(
"IPC: unable to fetch the field for struct. The file or stream is corrupted.",
)
Error::oos("IPC: unable to fetch the field for struct. The file or stream is corrupted.")
})?;

let _ = buffers
Expand Down
4 changes: 1 addition & 3 deletions src/io/ipc/read/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,9 +246,7 @@ pub fn read_dictionary<R: Read + Seek>(
}
_ => None,
}
.ok_or_else(|| {
Error::InvalidArgumentError("dictionary id not found in schema".to_string())
})?;
.ok_or_else(|| Error::InvalidArgumentError("dictionary id not found in schema".to_string()))?;

dictionaries.insert(id, dictionary_values);

Expand Down
5 changes: 2 additions & 3 deletions src/io/ipc/read/file_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,8 @@ where
let length = block.body_length as usize;
read_dictionary_message(&mut reader, offset, &mut data).await?;

let message = MessageRef::read_as_root(&data).map_err(|err| {
Error::OutOfSpec(format!("unable to get root as message: {:?}", err))
})?;
let message = MessageRef::read_as_root(&data)
.map_err(|err| Error::OutOfSpec(format!("unable to get root as message: {:?}", err)))?;
let header = message
.header()?
.ok_or_else(|| Error::oos("message must have a header"))?;
Expand Down
6 changes: 3 additions & 3 deletions src/io/ipc/read/read_basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,9 @@ pub fn read_validity<R: Read + Seek>(
compression,
)?)
} else {
let _ = buffers.pop_front().ok_or_else(|| {
Error::oos("IPC: unable to fetch a buffer. The file is corrupted.")
})?;
let _ = buffers
.pop_front()
.ok_or_else(|| Error::oos("IPC: unable to fetch a buffer. The file is corrupted."))?;
None
})
}
11 changes: 5 additions & 6 deletions src/io/ipc/read/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,8 @@ pub(crate) fn read_dictionaries<R: Read + Seek>(
let length = block.meta_data_length as u64;
read_dictionary_message(reader, offset, &mut data)?;

let message = arrow_format::ipc::MessageRef::read_as_root(&data).map_err(|err| {
Error::OutOfSpec(format!("Unable to get root as message: {:?}", err))
})?;
let message = arrow_format::ipc::MessageRef::read_as_root(&data)
.map_err(|err| Error::OutOfSpec(format!("Unable to get root as message: {:?}", err)))?;

let header = message
.header()?
Expand Down Expand Up @@ -133,9 +132,9 @@ pub(super) fn deserialize_footer(footer_data: &[u8]) -> Result<FileMetadata> {
let footer = arrow_format::ipc::FooterRef::read_as_root(footer_data)
.map_err(|err| Error::OutOfSpec(format!("Unable to get root as footer: {:?}", err)))?;

let blocks = footer.record_batches()?.ok_or_else(|| {
Error::OutOfSpec("Unable to get record batches from footer".to_string())
})?;
let blocks = footer
.record_batches()?
.ok_or_else(|| Error::OutOfSpec("Unable to get record batches from footer".to_string()))?;

let blocks = blocks
.iter()
Expand Down
19 changes: 5 additions & 14 deletions src/io/ipc/read/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,7 @@ fn deserialize_integer(int: arrow_format::ipc::IntRef) -> Result<IntegerType> {
(32, false) => IntegerType::UInt32,
(64, true) => IntegerType::Int64,
(64, false) => IntegerType::UInt64,
_ => {
return Err(Error::oos(
"IPC: indexType can only be 8, 16, 32 or 64.",
))
}
_ => return Err(Error::oos("IPC: indexType can only be 8, 16, 32 or 64.")),
})
}

Expand Down Expand Up @@ -259,9 +255,7 @@ fn get_data_type(
.children()?
.ok_or_else(|| Error::oos("IPC: Struct must contain children"))?;
if fields.is_empty() {
return Err(Error::oos(
"IPC: Struct must contain at least one child",
));
return Err(Error::oos("IPC: Struct must contain at least one child"));
}
let (fields, ipc_fields) = try_unzip_vec(fields.iter().map(|field| {
let (field, fields) = deserialize_field(field?)?;
Expand All @@ -281,9 +275,7 @@ fn get_data_type(
.children()?
.ok_or_else(|| Error::oos("IPC: Union must contain children"))?;
if fields.is_empty() {
return Err(Error::oos(
"IPC: Union must contain at least one child",
));
return Err(Error::oos("IPC: Union must contain at least one child"));
}

let (fields, ipc_fields) = try_unzip_vec(fields.iter().map(|field| {
Expand Down Expand Up @@ -375,9 +367,8 @@ pub(super) fn fb_to_schema(schema: arrow_format::ipc::SchemaRef) -> Result<(Sche
}

pub(super) fn deserialize_stream_metadata(meta: &[u8]) -> Result<StreamMetadata> {
let message = arrow_format::ipc::MessageRef::read_as_root(meta).map_err(|err| {
Error::OutOfSpec(format!("Unable to get root as message: {:?}", err))
})?;
let message = arrow_format::ipc::MessageRef::read_as_root(meta)
.map_err(|err| Error::OutOfSpec(format!("Unable to get root as message: {:?}", err)))?;
let version = message.version()?;
// message header is a Schema, so read it
let header = message
Expand Down
5 changes: 2 additions & 3 deletions src/io/ipc/read/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,8 @@ fn read_next<R: Read>(
message_buffer.resize(meta_length, 0);
reader.read_exact(message_buffer)?;

let message = arrow_format::ipc::MessageRef::read_as_root(message_buffer).map_err(|err| {
Error::OutOfSpec(format!("Unable to get root as message: {:?}", err))
})?;
let message = arrow_format::ipc::MessageRef::read_as_root(message_buffer)
.map_err(|err| Error::OutOfSpec(format!("Unable to get root as message: {:?}", err)))?;
let header = message.header()?.ok_or_else(|| {
Error::oos("IPC: unable to fetch the message header. The file or stream is corrupted.")
})?;
Expand Down
6 changes: 2 additions & 4 deletions src/io/ipc/read/stream_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,8 @@ async fn maybe_next<R: AsyncRead + Unpin + Send>(
state.message_buffer.resize(meta_length, 0);
state.reader.read_exact(&mut state.message_buffer).await?;

let message =
arrow_format::ipc::MessageRef::read_as_root(&state.message_buffer).map_err(|err| {
Error::OutOfSpec(format!("Unable to get root as message: {:?}", err))
})?;
let message = arrow_format::ipc::MessageRef::read_as_root(&state.message_buffer)
.map_err(|err| Error::OutOfSpec(format!("Unable to get root as message: {:?}", err)))?;
let header = message.header()?.ok_or_else(|| {
Error::oos("IPC: unable to fetch the message header. The file or stream is corrupted.")
})?;
Expand Down
6 changes: 3 additions & 3 deletions src/io/json_integration/read/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,9 @@ fn to_dictionary<K: DictionaryKey>(
) -> Result<Arc<dyn Array>> {
// find dictionary
let dict_id = field.dictionary_id.unwrap();
let dictionary = dictionaries.get(&dict_id).ok_or_else(|| {
Error::OutOfSpec(format!("Unable to find any dictionary id {}", dict_id))
})?;
let dictionary = dictionaries
.get(&dict_id)
.ok_or_else(|| Error::OutOfSpec(format!("Unable to find any dictionary id {}", dict_id)))?;

let keys = to_primitive(json_col, K::PRIMITIVE.into());

Expand Down
26 changes: 6 additions & 20 deletions src/io/json_integration/read/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ fn to_time_unit(item: Option<&Value>) -> Result<TimeUnit> {
Some(p) if p == "MILLISECOND" => Ok(TimeUnit::Millisecond),
Some(p) if p == "MICROSECOND" => Ok(TimeUnit::Microsecond),
Some(p) if p == "NANOSECOND" => Ok(TimeUnit::Nanosecond),
_ => Err(Error::OutOfSpec(
"time unit missing or invalid".to_string(),
)),
_ => Err(Error::OutOfSpec("time unit missing or invalid".to_string())),
}
}

Expand Down Expand Up @@ -78,9 +76,7 @@ fn deserialize_fields(children: Option<&Value>) -> Result<Vec<Field>> {
.map(deserialize_field)
.collect::<Result<Vec<_>>>()
} else {
Err(Error::OutOfSpec(
"children must be an array".to_string(),
))
Err(Error::OutOfSpec("children must be an array".to_string()))
}
})
.unwrap_or_else(|| Ok(vec![]))
Expand Down Expand Up @@ -204,20 +200,14 @@ fn to_data_type(item: &Value, mut children: Vec<Field>) -> Result<DataType> {
let tz = match item.get("timezone") {
None => Ok(None),
Some(Value::String(tz)) => Ok(Some(tz.clone())),
_ => Err(Error::OutOfSpec(
"timezone must be a string".to_string(),
)),
_ => Err(Error::OutOfSpec("timezone must be a string".to_string())),
}?;
DataType::Timestamp(unit, tz)
}
"date" => match item.get("unit") {
Some(p) if p == "DAY" => DataType::Date32,
Some(p) if p == "MILLISECOND" => DataType::Date64,
_ => {
return Err(Error::OutOfSpec(
"date unit missing or invalid".to_string(),
))
}
_ => return Err(Error::OutOfSpec("date unit missing or invalid".to_string())),
},
"time" => {
let unit = to_time_unit(item.get("unit"))?;
Expand Down Expand Up @@ -309,9 +299,7 @@ fn deserialize_ipc_field(value: &Value) -> Result<IpcField> {
.map(deserialize_ipc_field)
.collect::<Result<Vec<_>>>()
} else {
Err(Error::OutOfSpec(
"children must be an array".to_string(),
))
Err(Error::OutOfSpec("children must be an array".to_string()))
}
})
.unwrap_or_else(|| Ok(vec![]))?;
Expand All @@ -320,9 +308,7 @@ fn deserialize_ipc_field(value: &Value) -> Result<IpcField> {
match dictionary.get("id") {
Some(Value::Number(n)) => Some(n.as_i64().unwrap()),
_ => {
return Err(Error::OutOfSpec(
"Field missing 'id' attribute".to_string(),
));
return Err(Error::OutOfSpec("Field missing 'id' attribute".to_string()));
}
}
} else {
Expand Down
6 changes: 3 additions & 3 deletions src/io/ndjson/read/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ fn read_rows<R: BufRead>(reader: &mut R, rows: &mut [String], limit: usize) -> R
for row in rows.iter_mut() {
loop {
row.clear();
let _ = reader.read_line(row).map_err(|e| {
Error::External(format!(" at line {}", row_number), Box::new(e))
})?;
let _ = reader
.read_line(row)
.map_err(|e| Error::External(format!(" at line {}", row_number), Box::new(e)))?;
if row.is_empty() {
break;
}
Expand Down
Loading

0 comments on commit 8523329

Please sign in to comment.