Support RecordBatch with zero columns but non zero row count, add field to RecordBatchOptions (#1536) #1552

Merged
merged 5 commits into from
Apr 12, 2022
89 changes: 61 additions & 28 deletions arrow/src/record_batch.rs
@@ -41,6 +41,7 @@ use crate::error::{ArrowError, Result};
pub struct RecordBatch {
schema: SchemaRef,
columns: Vec<Arc<dyn Array>>,
row_count: usize,
alamb marked this conversation as resolved.
}

impl RecordBatch {
@@ -77,8 +78,7 @@ impl RecordBatch {
/// ```
pub fn try_new(schema: SchemaRef, columns: Vec<ArrayRef>) -> Result<Self> {
let options = RecordBatchOptions::default();
Self::validate_new_batch(&schema, columns.as_slice(), &options)?;
Ok(RecordBatch { schema, columns })
Self::try_new_impl(schema, columns, &options)
}

/// Creates a `RecordBatch` from a schema and columns, with additional options,
@@ -90,8 +90,7 @@ impl RecordBatch {
columns: Vec<ArrayRef>,
options: &RecordBatchOptions,
) -> Result<Self> {
Self::validate_new_batch(&schema, columns.as_slice(), options)?;
Ok(RecordBatch { schema, columns })
Self::try_new_impl(schema, columns, options)
}

/// Creates a new empty [`RecordBatch`].
@@ -101,23 +100,21 @@ impl RecordBatch {
.iter()
.map(|field| new_empty_array(field.data_type()))
.collect();
RecordBatch { schema, columns }

RecordBatch {
schema,
columns,
row_count: 0,
}
}

/// Validate the schema and columns using [`RecordBatchOptions`]. Returns an error
/// if any validation check fails.
fn validate_new_batch(
schema: &SchemaRef,
columns: &[ArrayRef],
/// if any validation check fails, otherwise returns the created [`RecordBatch`]
fn try_new_impl(
Contributor Author

I'm not really sure why we need a separate method for this, but I avoided changing it to keep the diff down

schema: SchemaRef,
columns: Vec<ArrayRef>,
options: &RecordBatchOptions,
) -> Result<()> {
// check that there are some columns
if columns.is_empty() {
return Err(ArrowError::InvalidArgumentError(
"at least one column must be defined to create a record batch"
.to_string(),
));
}
) -> Result<RecordBatch> {
// check that number of fields in schema match column length
if schema.fields().len() != columns.len() {
return Err(ArrowError::InvalidArgumentError(format!(
@@ -128,7 +125,13 @@ impl RecordBatch {
}

// check that all columns have the same row count
let row_count = columns[0].data().len();
let row_count = options
.row_count
.or(columns.first().map(|col| col.len()))
.ok_or(ArrowError::InvalidArgumentError(
"must either specify a row count or at least one column".to_string(),
))?;

if columns.iter().any(|c| c.len() != row_count) {
return Err(ArrowError::InvalidArgumentError(
"all columns in a record batch must have the same length".to_string(),
Contributor

This error will now also happen when the specified row count doesn't match the array lengths, so the wording might be worth updating?
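
To make the point in this thread concrete, here is a minimal sketch of the row-count resolution, not the arrow implementation. It assumes a column is a plain `Vec<i64>`; the names `Column` and `resolve_row_count` and the wording of the mismatch message are hypothetical. The "must either specify" message is the one used in the diff.

```rust
// Simplified stand-in: a "column" is a plain vector, not an Arrow array.
type Column = Vec<i64>;

/// Resolves the batch row count. An explicit count wins; otherwise the
/// first column's length is used. Fails if neither is available or if any
/// column disagrees with the resolved count.
fn resolve_row_count(
    explicit: Option<usize>,
    columns: &[Column],
) -> Result<usize, String> {
    let row_count = explicit
        .or_else(|| columns.first().map(|c| c.len()))
        .ok_or_else(|| {
            "must either specify a row count or at least one column".to_string()
        })?;

    // Hypothetical wording: naming the expected count keeps the message
    // meaningful when the mismatch is against an explicit row count.
    if let Some((i, c)) = columns
        .iter()
        .enumerate()
        .find(|(_, c)| c.len() != row_count)
    {
        return Err(format!(
            "column {} has {} rows, expected {}",
            i,
            c.len(),
            row_count
        ));
    }

    Ok(row_count)
}
```

With an explicit count of 2 and a single three-row column, the check fails even though all columns agree with each other, which is the case the comment above is pointing at.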

@@ -163,7 +166,11 @@ impl RecordBatch {
i)));
}

Ok(())
Ok(RecordBatch {
schema,
columns,
row_count,
})
}

/// Returns the [`Schema`](crate::datatypes::Schema) of the record batch.
@@ -218,10 +225,6 @@ impl RecordBatch {

/// Returns the number of rows in each column.
///
/// # Panics
///
/// Panics if the `RecordBatch` contains no columns.
///
/// # Example
///
/// ```
@@ -243,7 +246,7 @@ impl RecordBatch {
/// # }
/// ```
pub fn num_rows(&self) -> usize {
self.columns[0].data().len()
self.row_count
Contributor

Maybe we could avoid adding row_count with something like

self
  .columns
  .get(0)
  .map(|col| col.data().len())
  .unwrap_or(0)

?

Contributor Author

This would always return 0 if there are no columns?

Contributor

I see -- I think I was a little confused -- is the idea that the schema also has 0 fields?

Contributor Author

Yup, it's basically a way to support projecting no columns and just getting the row count. This is needed for #1537

}
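
The exchange above can be illustrated with a small sketch. It is a simplified stand-in, not the arrow type: `Batch` and `num_rows_derived` are hypothetical names, and columns are plain vectors.

```rust
/// Simplified stand-in for a batch; columns are plain vectors.
struct Batch {
    columns: Vec<Vec<i64>>,
    row_count: usize,
}

impl Batch {
    /// Derived count, as suggested in review: with no columns there is
    /// nothing to measure, so this can only ever report 0.
    fn num_rows_derived(&self) -> usize {
        self.columns.first().map(|col| col.len()).unwrap_or(0)
    }

    /// Stored count, as in this PR: independent of the columns.
    fn num_rows(&self) -> usize {
        self.row_count
    }
}
```

For a batch with no columns and `row_count: 10`, `num_rows_derived()` returns 0 while `num_rows()` returns 10. The two agree whenever at least one column is present.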

/// Get a reference to a column's array by index.
@@ -267,10 +270,6 @@ impl RecordBatch {
///
/// Panics if `offset` with `length` is greater than column length.
pub fn slice(&self, offset: usize, length: usize) -> RecordBatch {
if self.schema.fields().is_empty() {
assert!((offset + length) == 0);
return RecordBatch::new_empty(self.schema.clone());
}
assert!((offset + length) <= self.num_rows());

let columns = self
@@ -282,6 +281,7 @@ impl RecordBatch {
Self {
schema: self.schema.clone(),
columns,
row_count: length,
}
}
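
As a rough illustration of why `slice` now records `length` directly, the sketch below slices a batch that may have zero columns. It is a simplified model under stated assumptions: `Batch` is a hypothetical stand-in with plain vectors as columns, and the slice copies data rather than sharing buffers as Arrow arrays do.

```rust
/// Simplified stand-in for a batch; columns are plain vectors.
#[derive(Debug, PartialEq)]
struct Batch {
    columns: Vec<Vec<i64>>,
    row_count: usize,
}

impl Batch {
    /// Returns rows `offset..offset + length`.
    /// Panics if the range exceeds the batch's row count.
    fn slice(&self, offset: usize, length: usize) -> Batch {
        assert!(offset + length <= self.row_count);
        let columns = self
            .columns
            .iter()
            .map(|col| col[offset..offset + length].to_vec())
            .collect();
        // With zero columns the iterator above yields nothing, so the
        // length has to be recorded explicitly.
        Batch {
            columns,
            row_count: length,
        }
    }
}
```

Slicing a zero-column batch of 10 rows with `slice(2, 5)` gives a zero-column batch of 5 rows, so the special case for empty schemas is no longer needed.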

@@ -402,15 +402,20 @@ impl RecordBatch {

/// Options that control the behaviour used when creating a [`RecordBatch`].
#[derive(Debug)]
#[non_exhaustive]
Contributor Author

Adding a new option shouldn't be a breaking change, imo; non_exhaustive should give us that

Contributor

Or maybe it is time to make these fields non-pub and add with_match_field_name(&mut self, val: bool)-style accessors?

Contributor Author

I don't feel particularly strongly either way, although having to write and maintain setters and accessors is a bit of a PIA

Contributor

> although having to write and maintain setters and accessors is a bit of a PIA

The price for backwards compatibility?

Contributor Author

@tustvold, Apr 12, 2022

non_exhaustive should make additive changes non-breaking, which should be good enough hopefully 😀

Happy to change if you feel strongly

Contributor

I don't feel strongly

pub struct RecordBatchOptions {
/// Match field names of structs and lists. If set to `true`, the names must match.
pub match_field_names: bool,

/// Optional row count, useful for specifying a row count for a RecordBatch with no columns
pub row_count: Option<usize>,
}

impl Default for RecordBatchOptions {
fn default() -> Self {
Self {
match_field_names: true,
row_count: None,
}
}
}
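
The two approaches discussed in the thread above can be sketched side by side. This is an illustration under assumptions, not the arrow API: `Options` is a hypothetical stand-in for `RecordBatchOptions`, and `with_row_count` is a hypothetical builder-style setter in the spirit of the reviewer's suggestion (it takes `self` by value so calls can be chained).

```rust
/// Simplified stand-in for the options struct discussed above.
#[derive(Debug)]
#[non_exhaustive]
pub struct Options {
    pub match_field_names: bool,
    pub row_count: Option<usize>,
}

impl Default for Options {
    fn default() -> Self {
        Self {
            match_field_names: true,
            row_count: None,
        }
    }
}

impl Options {
    /// Hypothetical builder-style setter, the alternative raised in review.
    pub fn with_row_count(mut self, row_count: Option<usize>) -> Self {
        self.row_count = row_count;
        self
    }
}
```

With `#[non_exhaustive]`, code in other crates cannot build the struct with a literal, so it has to start from `Options::default()` and assign fields, as the new test in this PR does. Code inside the defining crate can still use struct literals, which is why the literals in the tests and in data_gen.rs continue to compile. Adding a further field later therefore does not break downstream callers under either approach.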
@@ -426,6 +431,7 @@ impl From<&StructArray> for RecordBatch {
let columns = struct_array.boxed_fields.clone();
RecordBatch {
schema: Arc::new(schema),
row_count: struct_array.len(),
columns,
}
} else {
@@ -644,6 +650,7 @@ mod tests {
// creating the batch without field name validation should pass
let options = RecordBatchOptions {
match_field_names: false,
row_count: None,
};
let batch = RecordBatch::try_new_with_options(schema, vec![a], &options);
assert!(batch.is_ok());
@@ -934,4 +941,30 @@ mod tests {

assert_eq!(expected, record_batch.project(&[0, 2]).unwrap());
}

#[test]
fn test_no_column_record_batch() {
let schema = Arc::new(Schema::new(vec![]));

let err = RecordBatch::try_new(schema.clone(), vec![]).unwrap_err();
assert!(err
.to_string()
.contains("must either specify a row count or at least one column"));

let mut options = RecordBatchOptions::default();
options.row_count = Some(10);

let ok =
RecordBatch::try_new_with_options(schema.clone(), vec![], &options).unwrap();
assert_eq!(ok.num_rows(), 10);

let a = ok.slice(2, 5);
assert_eq!(a.num_rows(), 5);

let b = ok.slice(5, 0);
assert_eq!(b.num_rows(), 0);

assert_ne!(a, b);
assert_eq!(b, RecordBatch::new_empty(schema))
}
}
1 change: 1 addition & 0 deletions arrow/src/util/data_gen.rs
@@ -49,6 +49,7 @@ pub fn create_random_batch(
columns,
&RecordBatchOptions {
match_field_names: false,
row_count: None,
},
)
}