Skip to content

Commit

Permalink
Add RowConverter::append (#4479) (#4541)
Browse files Browse the repository at this point in the history
* Add RowConverter::append (#4479)

* Add overwrite test
  • Loading branch information
tustvold authored Jul 18, 2023
1 parent b71c0d9 commit 276d8c5
Showing 1 changed file with 107 additions and 40 deletions.
147 changes: 107 additions & 40 deletions arrow-row/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,52 @@ impl RowConverter {
///
/// Panics if the schema of `columns` does not match that provided to [`RowConverter::new`]
pub fn convert_columns(&mut self, columns: &[ArrayRef]) -> Result<Rows, ArrowError> {
    // The row count is taken from the first column; with no columns it is zero
    let row_count = columns.first().map_or(0, |column| column.len());

    // Start from a fresh `Rows` and let `append` do the validation and encoding
    let mut converted = self.empty_rows(row_count, 0);
    self.append(&mut converted, columns).map(|()| converted)
}

/// Convert [`ArrayRef`] columns appending to an existing [`Rows`]
///
/// See [`Row`] for information on when [`Row`] can be compared
///
/// # Panics
///
/// Panics if
/// * The schema of `columns` does not match that provided to [`RowConverter::new`]
/// * The provided [`Rows`] were not created by this [`RowConverter`]
///
/// ```
/// # use std::sync::Arc;
/// # use std::collections::HashSet;
/// # use arrow_array::cast::AsArray;
/// # use arrow_array::StringArray;
/// # use arrow_row::{Row, RowConverter, SortField};
/// # use arrow_schema::DataType;
/// #
/// let mut converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
/// let a1 = StringArray::from(vec!["hello", "world"]);
/// let a2 = StringArray::from(vec!["a", "a", "hello"]);
///
/// let mut rows = converter.empty_rows(5, 128);
/// converter.append(&mut rows, &[Arc::new(a1)]).unwrap();
/// converter.append(&mut rows, &[Arc::new(a2)]).unwrap();
///
/// let back = converter.convert_rows(&rows).unwrap();
/// let values: Vec<_> = back[0].as_string::<i32>().iter().map(Option::unwrap).collect();
/// assert_eq!(&values, &["hello", "world", "a", "a", "hello"]);
/// ```
pub fn append(
&mut self,
rows: &mut Rows,
columns: &[ArrayRef],
) -> Result<(), ArrowError> {
assert!(
Arc::ptr_eq(&rows.config.fields, &self.fields),
"rows were not produced by this RowConverter"
);

if columns.len() != self.fields.len() {
return Err(ArrowError::InvalidArgumentError(format!(
"Incorrect number of arrays provided to RowConverter, expected {} got {}",
Expand All @@ -704,20 +750,43 @@ impl RowConverter {
})
.collect::<Result<Vec<_>, _>>()?;

let config = RowConfig {
fields: Arc::clone(&self.fields),
// Don't need to validate UTF-8 as came from arrow array
validate_utf8: false,
};
let mut rows = new_empty_rows(columns, &encoders, config);
let write_offset = rows.num_rows();
let lengths = row_lengths(columns, &encoders);

// We initialize the offsets shifted down by one row index.
//
// As each row is written, its offset is incremented to match
//
// For example, consider the case of 3 rows of length 3, 4, and 6 respectively.
// The offsets would be initialized to `0, 0, 3, 7`
//
// Writing the first row entirely would yield `0, 3, 3, 7`
// The second, `0, 3, 7, 7`
// The third, `0, 3, 7, 13`
//
// This would be the final offsets for reading
//
// In this way the offsets track the write position during encoding, and
// end up as the final offsets of the written rows
rows.offsets.reserve(lengths.len());
let mut cur_offset = rows.offsets[write_offset];
for l in lengths {
rows.offsets.push(cur_offset);
cur_offset = cur_offset.checked_add(l).expect("overflow");
}

// Note this will not zero out any trailing data in `rows.buffer`,
// e.g. resulting from a call to `Rows::clear`, relying instead on the
// encoders not assuming a zero-initialized buffer
rows.buffer.resize(cur_offset, 0);

for ((column, field), encoder) in
columns.iter().zip(self.fields.iter()).zip(encoders)
{
// We encode a column at a time to minimise dispatch overheads
encode_column(
&mut rows.buffer,
&mut rows.offsets,
&mut rows.offsets[write_offset..],
column.as_ref(),
field.options,
&encoder,
Expand All @@ -731,7 +800,7 @@ impl RowConverter {
.for_each(|w| assert!(w[0] <= w[1], "offsets should be monotonic"));
}

Ok(rows)
Ok(())
}

/// Convert [`Rows`] columns into [`ArrayRef`]
Expand Down Expand Up @@ -899,6 +968,7 @@ impl Rows {
self.offsets.push(self.buffer.len())
}

/// Returns the row at index `row`
pub fn row(&self, row: usize) -> Row<'_> {
let end = self.offsets[row + 1];
let start = self.offsets[row];
Expand All @@ -908,10 +978,17 @@ impl Rows {
}
}

/// Sets the length of this [`Rows`] to 0
///
/// Both the row offsets and the encoded row bytes are discarded; the
/// allocations backing them are kept so they can be reused.
pub fn clear(&mut self) {
    // Keep only the leading offset, which leaves zero rows
    self.offsets.truncate(1);
    // Discard the stale row bytes as well: `push` records `buffer.len()` as the
    // end of the pushed row, so leftover bytes would otherwise become part of
    // the first row pushed after a clear.
    // NOTE(review): assumes the leading offset is always 0 — confirm against `empty_rows`
    self.buffer.clear();
}

/// Returns how many [`Row`] are stored in this [`Rows`]
pub fn num_rows(&self) -> usize {
    // `offsets` holds one more entry than there are rows
    let boundaries = self.offsets.len();
    boundaries - 1
}

/// Returns an iterator yielding each [`Row`] of this [`Rows`]
pub fn iter(&self) -> RowsIter<'_> {
    // Delegates to the `IntoIterator` implementation for `&Rows`
    IntoIterator::into_iter(self)
}
Expand Down Expand Up @@ -1116,7 +1193,7 @@ fn null_sentinel(options: SortOptions) -> u8 {
}

/// Computes the encoded length of each row for the provided columns
fn new_empty_rows(cols: &[ArrayRef], encoders: &[Encoder], config: RowConfig) -> Rows {
fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> Vec<usize> {
use fixed::FixedLengthEncoding;

let num_rows = cols.first().map(|x| x.len()).unwrap_or(0);
Expand Down Expand Up @@ -1203,37 +1280,7 @@ fn new_empty_rows(cols: &[ArrayRef], encoders: &[Encoder], config: RowConfig) ->
}
}

let mut offsets = Vec::with_capacity(num_rows + 1);
offsets.push(0);

// We initialize the offsets shifted down by one row index.
//
// As the rows are appended to the offsets will be incremented to match
//
// For example, consider the case of 3 rows of length 3, 4, and 6 respectively.
// The offsets would be initialized to `0, 0, 3, 7`
//
// Writing the first row entirely would yield `0, 3, 3, 7`
// The second, `0, 3, 7, 7`
// The third, `0, 3, 7, 13`
//
// This would be the final offsets for reading
//
// In this way offsets tracks the position during writing whilst eventually serving
// as identifying the offsets of the written rows
let mut cur_offset = 0_usize;
for l in lengths {
offsets.push(cur_offset);
cur_offset = cur_offset.checked_add(l).expect("overflow");
}

let buffer = vec![0_u8; cur_offset];

Rows {
buffer,
offsets,
config,
}
lengths
}

/// Encodes a column to the provided [`Rows`] incrementing the offsets as it progresses
Expand Down Expand Up @@ -2375,4 +2422,24 @@ mod tests {
}
}
}

/// Checks that a [`Rows`] can be cleared and reused: after each `clear`, appending
/// an array and converting back must round-trip to that same array.
#[test]
fn test_clear() {
    let fields = vec![SortField::new(DataType::Int32)];
    let mut converter = RowConverter::new(fields).unwrap();
    let mut rows = converter.empty_rows(3, 128);

    let inputs = [
        Int32Array::from(vec![None, Some(2), Some(4)]),
        Int32Array::from(vec![Some(2), None, Some(4)]),
    ];

    for input in inputs {
        let expected: ArrayRef = Arc::new(input);

        // Reuse the same `Rows` for every input, resetting it first
        rows.clear();
        converter
            .append(&mut rows, &[Arc::clone(&expected)])
            .unwrap();

        let decoded = converter.convert_rows(&rows).unwrap();
        assert_eq!(&decoded[0], &expected);
    }
}
}

0 comments on commit 276d8c5

Please sign in to comment.