feat: Add evaluate_to_arrays function #18446
b984129 to ff2f57a
#[inline]
pub fn evaluate_expressions_to_arrays(
    exprs: &[Arc<dyn PhysicalExpr>],
    batch: &RecordBatch,
) -> Result<Vec<ArrayRef>> {
    let num_rows = batch.num_rows();
    exprs
        .iter()
        .map(|e| e.evaluate(batch).and_then(|col| col.into_array(num_rows)))
        .collect::<Result<Vec<ArrayRef>>>()
}
Can we handle the footgun described in the comment ("This means that this should not be called on expressions that may return arrays of different lengths") by matching on the ColumnarValue: expanding scalars to num_rows, passing through arrays of the expected length, and returning an error if the result is an array of a different length?
Also, since this is going to be used pretty widely, a docstring example would help.
Sorry, I had a few busy days.
I think this should be a function on ColumnarValue itself that we could call here, and it would return an error if the length differed from num_rows.
But I do think that's another feature/function that is missing in my opinion
I believe there is ColumnarValue::to_array already, but that doesn't check the length I assume? https://docs.rs/datafusion-expr-common/50.3.0/src/datafusion_expr_common/columnar_value.rs.html#145
to_array is the same as into_array except that it takes self by reference, which means an extra, unneeded clone in this case.
No relevant method exists that I know of. In my project I just made an extension trait for ColumnarValue and added a function.
To avoid bike shedding I'll just give my code suggestion:
-#[inline]
-pub fn evaluate_expressions_to_arrays(
-    exprs: &[Arc<dyn PhysicalExpr>],
-    batch: &RecordBatch,
-) -> Result<Vec<ArrayRef>> {
-    let num_rows = batch.num_rows();
-    exprs
-        .iter()
-        .map(|e| e.evaluate(batch).and_then(|col| col.into_array(num_rows)))
-        .collect::<Result<Vec<ArrayRef>>>()
-}
+#[inline]
+pub fn evaluate_expressions_to_arrays(
+    exprs: &[Arc<dyn PhysicalExpr>],
+    batch: &RecordBatch,
+) -> Result<Vec<ArrayRef>> {
+    let num_rows = batch.num_rows();
+    exprs
+        .iter()
+        .map(|e| {
+            e.evaluate(batch).and_then(|col| match col {
+                // Expand a scalar to one value per row of the batch.
+                ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
+                // Pass an array through only if it matches the batch length.
+                ColumnarValue::Array(array) => {
+                    if array.len() == num_rows {
+                        Ok(array)
+                    } else {
+                        internal_err!("Expressions evaluated to arrays of mismatched lengths")
+                    }
+                }
+            })
+        })
+        .collect::<Result<Vec<ArrayRef>>>()
+}
Seems reasonable to me. I still think this should live inside ColumnarValue; I will probably open that PR in the near future.
Agreed. I think into_array should check for this and return an error, but that might be too big of a breaking change, so it might need to be a new function, with the footgun documented or into_array deprecated.
I have decided to add the required functions to ColumnarValue and to make this function more generic, so it can be used directly with iterators, as in the group_by and null exprs cases.
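A minimal sketch of what a length-checked conversion on ColumnarValue could look like. It deliberately does not use the DataFusion crates: `Array`, `EvalError`, and this `ColumnarValue` are simplified stand-ins (a plain `Vec<i64>` instead of `ArrayRef`). Only the method name `into_array_of_size` is taken from the tests in this PR; the body is an assumption, not the merged implementation.

```rust
// Stand-in for Arrow's `ArrayRef` (assumption, for illustration only).
type Array = Vec<i64>;

// Hypothetical error type; the real code returns a DataFusion error.
#[derive(Debug, PartialEq)]
enum EvalError {
    LengthMismatch { expected: usize, actual: usize },
}

// Simplified stand-in for DataFusion's `ColumnarValue`.
#[derive(Debug, Clone)]
enum ColumnarValue {
    Array(Array),
    Scalar(i64),
}

impl ColumnarValue {
    /// Converts into an array of exactly `num_rows` rows.
    /// A scalar is repeated `num_rows` times; an array is passed through
    /// only if its length already equals `num_rows`.
    fn into_array_of_size(self, num_rows: usize) -> Result<Array, EvalError> {
        match self {
            ColumnarValue::Scalar(value) => Ok(vec![value; num_rows]),
            ColumnarValue::Array(array) if array.len() == num_rows => Ok(array),
            ColumnarValue::Array(array) => Err(EvalError::LengthMismatch {
                expected: num_rows,
                actual: array.len(),
            }),
        }
    }
}
```

Because the method takes `self` by value, the array case moves the existing buffer through without a clone, which is the property discussed above for into_array versus to_array.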
ff2f57a to 8eb7788
        value.into_array(batch.num_rows())
    })
    .collect::<Result<Vec<_>>>()?;
I believe these could make use of the new helper function too:
let exprs_list: Vec<_> = group_by.expr.iter().map(|(e, _)| Arc::clone(e)).collect();
let null_exprs_list: Vec<_> = group_by.null_expr.iter().map(|(e, _)| Arc::clone(e)).collect();
let exprs: Vec<ArrayRef> = evaluate_expressions_to_arrays(&exprs_list, batch)?;
let null_exprs: Vec<ArrayRef> = evaluate_expressions_to_arrays(&null_exprs_list, batch)?;
I actually considered it. I just didn't want to allocate new Vecs if I could help it, but it may be worth adding anyway.
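One way to avoid the intermediate Vecs is to let the helper accept any iterator of expression references. The sketch below illustrates that idea with stand-in types only: `Batch`, `Expr`, `Column`, `Literal`, and the `String` error are hypothetical simplifications of `RecordBatch`, `PhysicalExpr`, and DataFusion's error type, and the generic signature is an assumption rather than the signature that was merged.

```rust
use std::sync::Arc;

// Stand-ins for `ArrayRef` and a single-column `RecordBatch` (assumptions).
type Array = Vec<i64>;

struct Batch {
    column: Array,
}

impl Batch {
    fn num_rows(&self) -> usize {
        self.column.len()
    }
}

enum ColumnarValue {
    Array(Array),
    Scalar(i64),
}

// Stand-in for `PhysicalExpr`.
trait Expr {
    fn evaluate(&self, batch: &Batch) -> Result<ColumnarValue, String>;
}

// Evaluates to the batch's only column.
struct Column;
// Evaluates to a scalar.
struct Literal(i64);

impl Expr for Column {
    fn evaluate(&self, batch: &Batch) -> Result<ColumnarValue, String> {
        Ok(ColumnarValue::Array(batch.column.clone()))
    }
}

impl Expr for Literal {
    fn evaluate(&self, _batch: &Batch) -> Result<ColumnarValue, String> {
        Ok(ColumnarValue::Scalar(self.0))
    }
}

// Accepts any iterator of expression references, so callers need not
// collect the expressions into a temporary Vec first.
fn evaluate_expressions_to_arrays<'a>(
    exprs: impl IntoIterator<Item = &'a Arc<dyn Expr>>,
    batch: &Batch,
) -> Result<Vec<Array>, String> {
    let num_rows = batch.num_rows();
    exprs
        .into_iter()
        .map(|e| match e.evaluate(batch)? {
            ColumnarValue::Scalar(value) => Ok(vec![value; num_rows]),
            ColumnarValue::Array(array) if array.len() == num_rows => Ok(array),
            ColumnarValue::Array(array) => Err(format!(
                "expected an array of {} rows, got {}",
                num_rows,
                array.len()
            )),
        })
        .collect()
}

// Usage: evaluate only the expression half of (expr, alias) pairs.
fn evaluate_group_by(
    group_by: &[(Arc<dyn Expr>, String)],
    batch: &Batch,
) -> Result<Vec<Array>, String> {
    evaluate_expressions_to_arrays(group_by.iter().map(|(e, _)| e), batch)
}
```

With this shape, a slice of expressions and a mapped iterator over (expr, alias) pairs go through the same function, and the only allocation is the output Vec.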
Co-authored-by: Martin Grigorov <martin-g@users.noreply.github.com>
/// number of rows. [`Self::Scalar`] is converted by repeating the same
/// scalar multiple times which is not as efficient as handling the scalar
/// directly.
/// This validates that [`Self::Array`], if it exists, has the expected length.
-/// This validates that [`Self::Array`], if it exists, has the expected length.
+/// This validates that if this is [`Self::Array`] it has the expected length.
Addressed
let arr = make_array(1, 3);
let arr_columnar_value = ColumnarValue::Array(Arc::clone(&arr));
let result = arr_columnar_value.into_array_of_size(5);
assert!(result.is_err());
Can you validate the exact error? is_err() would also pass if a different error were returned.
Done
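As an illustration of the request above, a test can match on the error variant and message instead of calling is_err(). This is a sketch under stated assumptions: `Error`, `check_len`, and `assert_length_mismatch` are hypothetical stand-ins, and the message text is invented for the example rather than taken from the PR.

```rust
// Hypothetical error type with more than one variant, so that a bare
// `is_err()` check could not tell them apart.
#[allow(dead_code)]
#[derive(Debug)]
enum Error {
    Internal(String),
    Execution(String),
}

// Stand-in for the length check performed on an array value.
fn check_len(array: Vec<i64>, num_rows: usize) -> Result<Vec<i64>, Error> {
    if array.len() == num_rows {
        Ok(array)
    } else {
        Err(Error::Internal(format!(
            "Array length {} does not match expected {}",
            array.len(),
            num_rows
        )))
    }
}

// Asserts that the result is specifically the internal length error.
fn assert_length_mismatch(result: Result<Vec<i64>, Error>) {
    match result {
        Err(Error::Internal(msg)) => assert!(msg.contains("does not match expected")),
        other => panic!("expected an internal length error, got {:?}", other),
    }
}
```

Matching on the variant means the test fails if the function starts returning an unrelated error, which a plain is_err() assertion would silently accept.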
rluvaton
left a comment
Thank you @EmilyMatt, I left some comments.
@adriangb do you have more comments, or can I merge after my comments are addressed?
alamb
left a comment
Looks like a great improvement to me @EmilyMatt
-            .and_then(|v| v.into_array(batch.num_rows()))
-    })
-    .collect()
+evaluate_expressions_to_arrays(&self.expressions(), batch)
😍
Thank you @EmilyMatt, hope to see you contributing again :)
Which issue does this PR close?
Rationale for this change
Reduce code duplication.
What changes are included in this PR?
A utility function that replaces many call sites using the same code.
Are these changes tested?
No logic should change, so the existing tests and benchmarks for each area that now uses this code apply unmodified.
Are there any user-facing changes?
Yes, there is now a new pub function.
No other changes to API.