
ARROW-12411: [Rust] Add Builder interface for adding Arrays to RecordBatches #10063

Closed

Conversation

@alamb (Contributor) commented Apr 15, 2021

Purpose

This PR is a draft for comment / review. If people generally like this idea, I will polish up this PR with doc examples / comments and more tests for real review.

Rationale / Use case:

While writing tests (both in IOx and in DataFusion) where I need a single RecordBatch, I often find myself doing something like this (copied directly from IOx source code):

let schema = Arc::new(Schema::new(vec![
    ArrowField::new("float_field", ArrowDataType::Float64, true),
    ArrowField::new("time", ArrowDataType::Int64, true),
]));

let float_array: ArrayRef = Arc::new(Float64Array::from(vec![10.1, 20.1, 30.1, 40.1]));
let timestamp_array: ArrayRef = Arc::new(Int64Array::from(vec![1000, 2000, 3000, 4000]));

let batch = RecordBatch::try_new(schema, vec![float_array, timestamp_array])
    .expect("created new record batch");

This is annoying because I have to redundantly (and verbosely) encode the information that float_field is a Float64 both in the Schema and in the Float64Array.

I would much rather be able to construct RecordBatches in a more Rust-like style that avoids the redundancy and reduces the amount of typing:

Proposal:

Add RecordBatch::append so the following syntax can be supported:

let float_array: ArrayRef = Arc::new(Float64Array::from(vec![10.1, 20.1, 30.1, 40.1]));
let timestamp_array: ArrayRef = Arc::new(Int64Array::from(vec![1000, 2000, 3000, 4000]));

let batch = RecordBatch::empty()
  .append("float_field", float_array).unwrap()
  .append("time", timestamp_array).unwrap();

Existing APIs

The existing APIs to create a RecordBatch from a Schema and Vec<ArrayRef> would not be changed, as there are plenty of use cases where the Schema is known up front and should not be checked each time.

@alamb (Contributor, Author) commented Apr 15, 2021

Err(shared_schema) => shared_schema.as_ref().clone(),
};

let nullable = field_values.null_count() > 0;
Contributor

There's a limitation here. If the purpose is to create a single record batch, and that batch is used alone through its lifetime, then this is fine; otherwise we might need to take a nullable parameter.

In any case, I think that if someone uses this to create individual record batches, it'd be inefficient.

Contributor Author

I agree this would be an important point to clarify in the comments. If you are creating more than one RecordBatch, you should use the existing API to create the RecordBatch from a Schema and Vec<ArrayRef>.

If you are creating a single one, then this is more convenient.

Contributor

This isn't an area that I'm super-familiar with, but are there any existing methods or expectations around merging schemas that differ solely on nullability, e.g. merging two schemas together and inferring the nullability of each field by ORing the nullability on each side of the merge? So if either schema declares the field as nullable, we take the resultant schema as nullable.

If not, then I'd agree that we probably should keep this explicitly declared and not infer it here.
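
For illustration, a rough sketch of what ORing nullability during a field merge could look like (a hypothetical helper, not an existing arrow API):

use arrow::datatypes::Field;

// Hypothetical sketch: merge two fields that agree on name and data type,
// taking the result as nullable if either side is nullable (an OR of the flags).
fn merge_nullability(a: &Field, b: &Field) -> Field {
    assert_eq!(a.name(), b.name());
    assert_eq!(a.data_type(), b.data_type());
    Field::new(a.name(), a.data_type().clone(), a.is_nullable() || b.is_nullable())
}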

@nevi-me (Contributor) left a comment

Some comments :) I like the idea, at the least it'd make constructing batches from arrays in tests more convenient.

/// Creates a new [`RecordBatch`] with no columns
///
/// TODO add a code example using `append`
pub fn new() -> Self {
Contributor

I think it's worthwhile to maintain the property that a RecordBatch is immutable. So, this feels a bit out of place for me.

How about we create a builder that takes an iterator of (&str, ArrayRef) instead? (Rough sketch below.)

The below is what I mean by out of place:

let mut batch = read_batch_from_somewhere()?;
batch = batch.append("field", array_ref)?;

There might be a benefit in having the above, so I'm open to convincing :)
This could partially address your question on the TODO about returning Self on error.
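
A rough sketch of the kind of builder I mean (untested; the RecordBatchBuilder name and its methods are hypothetical):

use std::sync::Arc;
use arrow::array::ArrayRef;
use arrow::datatypes::{Field, Schema};
use arrow::error::Result;
use arrow::record_batch::RecordBatch;

// Hypothetical builder: collect (name, array) pairs, then construct the
// immutable RecordBatch once at the end, reusing try_new's validation.
struct RecordBatchBuilder {
    fields: Vec<Field>,
    columns: Vec<ArrayRef>,
}

impl RecordBatchBuilder {
    fn new() -> Self {
        Self { fields: vec![], columns: vec![] }
    }

    fn append(mut self, name: &str, array: ArrayRef) -> Self {
        // Infer the data type from the array and nullability from the null count.
        let nullable = array.null_count() > 0;
        self.fields.push(Field::new(name, array.data_type().clone(), nullable));
        self.columns.push(array);
        self
    }

    fn build(self) -> Result<RecordBatch> {
        RecordBatch::try_new(Arc::new(Schema::new(self.fields)), self.columns)
    }
}

This keeps RecordBatch itself immutable while still allowing a chained style like RecordBatchBuilder::new().append("a", array).build()?.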

Contributor Author

This makes sense @nevi-me -- I think a separate RecordBatchBuilder sounds like a better idea to me. I'll wait a bit for feedback from others, and then give it a shot.

/// TODO: code example
///
/// TODO: on error, can we return `Self` in some meaningful way?
pub fn append(self, field_name: &str, field_values: ArrayRef) -> Result<Self> {
Contributor

Given that ArrayRef is backed by an Arc, how about taking it by reference then cloning it?

Am I correct that if this errors, the underlying ArrayRef could get dropped if it's not a clone in itself?
Something like:

let array = Int32Array::from(vec![1, 2, 3]);
let array_ref = Arc::new(array);

// append to existing batch
let mut batch = ...;
// assume that this fails, does array_ref get dropped as we no longer have references to it?
batch = batch.append("ints", array_ref)?;

Contributor Author

I guess I was thinking we would leave the "do I want to have access to the ArrayRef after the call to append?" decision up to the caller.

So like

// append to existing batch
let mut batch = ...;
// We want to use array_ref later, so pass in a clone
batch = batch.append("ints", Arc::clone(&array_ref))?;

I think this is a common pattern in Rust libraries: if a function needs an owned copy of its argument, it takes it by value, which gives the caller a chance to avoid an extra copy if they don't need the argument again.

But in this case, with an Arc that is cheap to clone, I don't think the difference really matters from a performance perspective.
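
As a toy illustration of that pattern (a hypothetical function, not part of this PR):

use std::sync::Arc;

// A function that needs an owned Arc takes it by value: the caller decides
// whether to move it in (no extra clone) or pass a clone (keeping a handle).
fn consume(data: Arc<Vec<i32>>) -> usize {
    data.len()
}

fn main() {
    let data = Arc::new(vec![1, 2, 3]);
    let n = consume(Arc::clone(&data)); // caller still needs `data`: pass a clone
    let m = consume(data);              // caller is done with it: move, no clone
    assert_eq!(n, m);
}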

@alamb (Contributor, Author) commented Apr 16, 2021

Thanks for the comments @nevi-me!

@jorgecarleitao (Member) commented Apr 16, 2021

Wouldn't the following work? (untested)

let batch: RecordBatch = (&StructArray::try_from(vec![
    ("a", float_array),
    ("b", timestamp_array),
])?).into();

If we do not want to use a StructArray, I would prefer that we have a constructor like this and thus keep the RecordBatch immutable.

@alamb (Contributor, Author) commented Apr 16, 2021

If we do not want to use a StructArray, I would prefer that we have a constructor like this and thus keep the RecordBatch immutable.

@jorgecarleitao that is a really good idea. I learn something new every day! I think using a StructArray for creating RecordBatches is confusing, but the From implementation is a great idea. How would you feel about something like:

let batch: RecordBatch = vec![
    ("a", float_array),
    ("b", timestamp_array),
].into();
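
For discussion, a rough sketch of what the impl behind that syntax might look like (untested; TryFrom is probably more appropriate than From here, since construction can fail, and the impl would have to live in the arrow crate itself because of the orphan rule):

use std::convert::TryFrom;
use std::sync::Arc;
use arrow::array::ArrayRef;
use arrow::datatypes::{Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

// Hypothetical sketch: build a RecordBatch from (name, array) pairs, inferring
// each field's data type from its array and nullability from its null count.
impl<'a> TryFrom<Vec<(&'a str, ArrayRef)>> for RecordBatch {
    type Error = ArrowError;

    fn try_from(pairs: Vec<(&'a str, ArrayRef)>) -> Result<Self, Self::Error> {
        let fields: Vec<Field> = pairs
            .iter()
            .map(|(name, array)| {
                Field::new(name, array.data_type().clone(), array.null_count() > 0)
            })
            .collect();
        let columns: Vec<ArrayRef> = pairs.into_iter().map(|(_, array)| array).collect();
        RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
    }
}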

@returnString (Contributor) left a comment

I'm a big fan of this idea, especially for making testing cleaner and creating simple MemTable instances within DataFusion. I've done some pretty horrendous things to simplify creation of small batches.

Agreed with the existing comments that it might be more robust to do this via From, or maybe TryFrom to support erroring out, but I do like the simplicity of taking self by value for a quick chained setup too.

@@ -245,6 +295,12 @@ impl RecordBatch {
}
}

impl Default for RecordBatch {
Contributor

Not totally sure about this because it bypasses the invariants of validate_new_batch - it seems like we don't want to allow for 0-column record batches to exist. Might be an argument in favour of the impl From approach?

@alamb (Contributor, Author) commented Apr 17, 2021

This isn't an area that I'm super-familiar with, but are there any existing methods or expectations around merging schemas that differ solely on nullability, e.g. merging two schemas together and inferring the nullability of each field by ORing the nullability on each side of the merge? So if either schema declares the field as nullable, we take the resultant schema as nullable.

The only place I know about merging schema Fields is here:

if from.nullable {

Which by my reading effectively does an OR of nullability.

If not, then I'd agree that we probably should keep this explicitly declared and not infer it here.

Given my use case of "make a single record batch from a known set of arrays", I really would prefer to avoid having to include redundant information (nullability).

However, given we are going to go with a From / TryFrom implementation, I think that means we can also add something like:

let batch: RecordBatch = vec![
    ("a", float_array, false),
    ("b", timestamp_array, true),
].into();

Or similar, to specify the nullability explicitly.

@alamb (Contributor, Author) commented Apr 19, 2021

The Apache Arrow Rust community is moving the Rust implementation into its own dedicated GitHub repositories, arrow-rs and arrow-datafusion. It is likely we will not merge this PR into this repository.

Please see the mailing-list thread for more details.

We expect the process to take a few days and will follow up with a migration plan for the in-flight PRs.

@alamb (Contributor, Author) commented Apr 19, 2021

Thanks everyone for the feedback -- I have created PR apache/arrow-rs#7 with an implementation based on iterators.

@alamb closed this Apr 19, 2021