ARROW-12411: [Rust] Add Builder interface for adding Arrays to RecordBatches #10063

Closed
rust/arrow/src/datatypes/schema.rs (6 additions, 0 deletions)
@@ -152,6 +152,12 @@ impl Schema {
})
}

/// Appends `field` to this `Schema`'s list of fields.
pub fn push(&mut self, field: Field) {
    self.fields.push(field)
}

/// Returns an immutable reference of the vector of `Field` instances.
#[inline]
pub const fn fields(&self) -> &Vec<Field> {
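A minimal sketch of the new Schema::push in use (assuming only the APIs shown in this diff, plus the existing Schema::empty constructor):

use arrow::datatypes::{DataType, Field, Schema};

let mut schema = Schema::empty();
schema.push(Field::new("id", DataType::Int32, false));
schema.push(Field::new("name", DataType::Utf8, true));
assert_eq!(2, schema.fields().len());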
rust/arrow/src/record_batch.rs (89 additions, 1 deletion)
@@ -93,7 +93,7 @@ impl RecordBatch {
    Ok(RecordBatch { schema, columns })
}

/// Creates a new empty [`RecordBatch`].
/// Creates a new empty [`RecordBatch`] based on `schema`.
pub fn new_empty(schema: SchemaRef) -> Self {
    let columns = schema
        .fields()
@@ -103,6 +103,56 @@ impl RecordBatch {
    RecordBatch { schema, columns }
}

/// Creates a new [`RecordBatch`] with no columns.
///
/// TODO: add a code example using `append`
pub fn new() -> Self {
Contributor (@nevi-me):
I think it's worthwhile to maintain the property that a RecordBatch is immutable, so this feels a bit out of place to me.

How about we create a builder that takes an iterator of (&str, ArrayRef) instead?

The snippet below is what I mean by out of place:

let mut batch = read_batch_from_somewhere()?;
batch = batch.append("field", array_ref)?;

There might be a benefit in having the above, so I'm open to convincing :)
This could partially address your question on the TODO about returning Self on error

Contributor Author:
This makes sense @nevi-me -- I think a separate RecordBatchBuilder sounds like a better idea to me. I'll wait a bit for feedback from others, and then give it a shot.
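For illustration, a rough sketch of the builder alternative described above. batch_from_pairs is a hypothetical name (not part of this diff); RecordBatch::try_new, Schema::new, and Field::new are existing APIs:

use std::sync::Arc;
use arrow::array::ArrayRef;
use arrow::datatypes::{Field, Schema};
use arrow::error::Result;
use arrow::record_batch::RecordBatch;

// Hypothetical constructor taking (name, array) pairs; the nullability
// inference mirrors the `append` implementation below.
fn batch_from_pairs<'a, I>(pairs: I) -> Result<RecordBatch>
where
    I: IntoIterator<Item = (&'a str, ArrayRef)>,
{
    let (fields, columns): (Vec<_>, Vec<_>) = pairs
        .into_iter()
        .map(|(name, array)| {
            let field = Field::new(name, array.data_type().clone(), array.null_count() > 0);
            (field, array)
        })
        .unzip();
    RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
}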

    Self {
        schema: Arc::new(Schema::empty()),
        columns: Vec::new(),
    }
}
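Toward the TODO in the doc comment above, a sketch of what an append-based example might look like (assuming the new/append API exactly as proposed in this diff):

use std::sync::Arc;
use arrow::array::{ArrayRef, Int32Array, StringArray};
use arrow::error::Result;
use arrow::record_batch::RecordBatch;

fn build_batch() -> Result<RecordBatch> {
    let ids: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let names: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c"]));

    // Each `append` consumes the batch and returns a new one with the
    // extra column, so the calls chain naturally.
    RecordBatch::new().append("id", ids)?.append("name", names)
}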

/// Appends the `field_values` array to this `RecordBatch` as a
/// field named `field_name`.
///
/// TODO: code example
///
/// TODO: on error, can we return `Self` in some meaningful way?
pub fn append(self, field_name: &str, field_values: ArrayRef) -> Result<Self> {
Contributor:
Given that ArrayRef is backed by an Arc, how about taking it by reference and then cloning it?

Am I correct that if this errors, the underlying ArrayRef could get dropped if the caller didn't keep a clone of it? Something like:

let array = Int32Array::from(vec![1, 2, 3]);
let array_ref = Arc::new(array);

// append to existing batch
let mut batch = ...;
// assume that this fails, does array_ref get dropped as we no longer have references to it?
batch = batch.append("ints", array_ref)?;

Contributor Author:
I guess I was thinking we would leave the "do I want access to the ArrayRef after the call to append?" decision up to the caller.

So, something like:

// append to existing batch
let mut batch = ...;
// We want to use array_ref later, so pass in a clone
batch = batch.append("ints", Arc::clone(array_ref))?;

I think this is a common pattern in Rust libraries: if a function needs to clone its argument, it takes the argument by value instead, which gives the caller a chance to avoid an extra copy if they don't need the argument again.

But in this case, with an Arc that is cheap to clone, I don't think the difference really matters from a performance perspective.
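To make the two call styles concrete, a sketch under the proposed by-value signature (assuming the append API from this diff):

use std::sync::Arc;
use arrow::array::{ArrayRef, Int32Array};
use arrow::error::Result;
use arrow::record_batch::RecordBatch;

fn call_styles() -> Result<()> {
    let array_ref: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));

    // We want to use array_ref again afterwards, so pass in a cheap Arc clone.
    let batch = RecordBatch::new().append("ints", Arc::clone(&array_ref))?;
    assert_eq!(1, batch.num_columns());

    // We are done with the array, so move it and skip the extra refcount bump.
    let batch2 = RecordBatch::new().append("more_ints", array_ref)?;
    assert_eq!(3, batch2.num_rows());
    Ok(())
}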

    if let Some(col) = self.columns.get(0) {
        if col.len() != field_values.len() {
            return Err(ArrowError::InvalidArgumentError(format!(
                "all columns in a record batch must have the same length. expected {}, field {} had {}",
                col.len(), field_name, field_values.len()
            )));
        }
    }

    let Self {
        schema,
        mut columns,
    } = self;

    // Modify the schema we already own if this is the only reference,
    // otherwise clone it (copy-on-write via Arc::try_unwrap).
    let mut schema = match Arc::try_unwrap(schema) {
        Ok(schema) => schema,
        Err(shared_schema) => shared_schema.as_ref().clone(),
    };

    let nullable = field_values.null_count() > 0;
Contributor:
There's a limitation here. If the purpose is to create a single record batch, and that batch is used alone through its lifetime, then this is fine; otherwise we might need to take a nullable parameter.

In any case, I think that if someone uses this to create individual record batches, it'd be inefficient.

Contributor Author:
I agree this would be an important point to clarify in the comments. If you are creating more than one RecordBatch, you should use the existing API to create the RecordBatch from a Schema and Vec<ArrayRef>.

If you are creating a single one, then this is more convenient.

Contributor:
This isn't an area that I'm super-familiar with, but are there any existing methods or expectations around merging schemas that differ solely in nullability, e.g. merging two schemas together and inferring the nullability of each field by ORing the nullability on each side of the merge? So if either schema declares the field as nullable, we take the resultant schema as nullable.

If not, then I'd agree that we probably should keep this explicitly declared and not infer it here.
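For reference, a hypothetical sketch of the OR-merge idea; merge_nullability is an illustrative name, not an existing arrow API:

use arrow::datatypes::Field;

/// Hypothetical helper: given two fields that agree on name and type,
/// produce a merged field that is nullable if either input is nullable.
fn merge_nullability(left: &Field, right: &Field) -> Field {
    assert_eq!(left.name(), right.name());
    assert_eq!(left.data_type(), right.data_type());
    Field::new(
        left.name(),
        left.data_type().clone(),
        left.is_nullable() || right.is_nullable(),
    )
}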

    schema.push(Field::new(
        field_name,
        field_values.data_type().clone(),
        nullable,
    ));
    let schema = Arc::new(schema);

    columns.push(field_values);

    Ok(Self { schema, columns })
}
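And a sketch of the error path exercised by the length check at the top of append (again assuming the API as proposed here):

use std::sync::Arc;
use arrow::array::{ArrayRef, Int32Array};
use arrow::record_batch::RecordBatch;

fn mismatched_lengths() {
    let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let b: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));

    let batch = RecordBatch::new().append("a", a).unwrap();
    // `b` has 2 rows while the existing column has 3, so this should return
    // the InvalidArgumentError produced by the length check.
    assert!(batch.append("b", b).is_err());
}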

/// Validate the schema and columns using [`RecordBatchOptions`]. Returns an error
/// if any validation check fails.
fn validate_new_batch(
@@ -245,6 +295,12 @@ impl RecordBatch {
}
}

impl Default for RecordBatch {
Contributor:
Not totally sure about this, because it bypasses the invariants of validate_new_batch: it seems like we don't want to allow 0-column record batches to exist. Might be an argument in favour of the impl From approach?

    fn default() -> Self {
        Self::new()
    }
}

/// Options that control the behaviour used when creating a [`RecordBatch`].
#[derive(Debug)]
pub struct RecordBatchOptions {
@@ -337,6 +393,38 @@ mod tests {
    assert_eq!(5, record_batch.column(1).data().len());
}

#[test]
fn create_record_batch_builder() {
    let a = Arc::new(Int32Array::from(vec![
        Some(1),
        Some(2),
        None,
        Some(4),
        Some(5),
    ]));
    let b = Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"]));

    let record_batch = RecordBatch::new()
        .append("a", a)
        .unwrap()
        .append("b", b)
        .unwrap();

    let expected_schema = Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Utf8, false),
    ]);

    assert_eq!(record_batch.schema().as_ref(), &expected_schema);

    assert_eq!(5, record_batch.num_rows());
    assert_eq!(2, record_batch.num_columns());
    assert_eq!(&DataType::Int32, record_batch.schema().field(0).data_type());
    assert_eq!(&DataType::Utf8, record_batch.schema().field(1).data_type());
    assert_eq!(5, record_batch.column(0).data().len());
    assert_eq!(5, record_batch.column(1).data().len());
}

#[test]
fn create_record_batch_schema_mismatch() {
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);