18 changes: 17 additions & 1 deletion datafusion/expr/src/udf.rs
@@ -233,7 +233,23 @@ impl ScalarUDF {
    ///
    /// See [`ScalarUDFImpl::invoke_with_args`] for details.
    pub fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
-        self.inner.invoke_with_args(args)
+        #[cfg(debug_assertions)]
+        let return_field = Arc::clone(&args.return_field);
+        let result = self.inner.invoke_with_args(args)?;
+        // Maybe this could be enabled always?
+        // This doesn't use debug_assert!, but it is likewise meant to run everywhere except in production builds; same in spirit, hence the conditioning on debug_assertions.
+        #[cfg(debug_assertions)]
+        {
+            if &result.data_type() != return_field.data_type() {
+                return datafusion_common::internal_err!("Function '{}' returned a value of type '{:?}' while type '{:?}' was promised at planning time and expected",
+                    self.name(),
+                    result.data_type(),
+                    return_field.data_type()
+                );
+            }
+            // TODO verify return data is non-null when it was promised to be?
+        }
+        Ok(result)
    }

    /// Get the short-circuiting behavior of the inner implementation
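To make the new debug-only check concrete, here is a minimal sketch (not part of this PR) of a hypothetical UDF that promises Int32 at planning time but produces Utf8 at execution time. LyingUdf and its behavior are illustrative, assuming the datafusion_expr API used in the hunk above.

use std::any::Any;
use std::sync::Arc;

use arrow::array::StringArray;
use arrow::datatypes::DataType;
use datafusion_common::Result;
use datafusion_expr::{
    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
};

/// Hypothetical UDF whose declared and actual return types disagree.
#[derive(Debug)]
struct LyingUdf {
    signature: Signature,
}

impl LyingUdf {
    fn new() -> Self {
        Self {
            signature: Signature::any(1, Volatility::Immutable),
        }
    }
}

impl ScalarUDFImpl for LyingUdf {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn name(&self) -> &str {
        "lying_udf"
    }
    fn signature(&self) -> &Signature {
        &self.signature
    }
    // The promise made at planning time...
    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        Ok(DataType::Int32)
    }
    // ...and the mismatched value produced at execution time.
    fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
        Ok(ColumnarValue::Array(Arc::new(StringArray::from(vec![
            "oops",
        ]))))
    }
}

With this change, a debug build fails inside ScalarUDF::invoke_with_args with an internal error naming the function and both types, instead of handing the mislabeled array to downstream operators; release builds are unchanged.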
2 changes: 1 addition & 1 deletion datafusion/ffi/tests/ffi_integration.rs
@@ -16,7 +16,7 @@
// under the License.

/// Add an additional module here for convenience to scope this to only
-/// when the feature integtation-tests is built
+/// when the feature integration-tests is built
#[cfg(feature = "integration-tests")]
mod tests {
    use datafusion::error::{DataFusionError, Result};
2 changes: 1 addition & 1 deletion datafusion/ffi/tests/ffi_udaf.rs
@@ -16,7 +16,7 @@
// under the License.

/// Add an additional module here for convenience to scope this to only
-/// when the feature integtation-tests is built
+/// when the feature integration-tests is built
#[cfg(feature = "integration-tests")]
mod tests {
    use arrow::array::Float64Array;
2 changes: 1 addition & 1 deletion datafusion/ffi/tests/ffi_udf.rs
@@ -16,7 +16,7 @@
// under the License.

/// Add an additional module here for convenience to scope this to only
-/// when the feature integtation-tests is built
+/// when the feature integration-tests is built
#[cfg(feature = "integration-tests")]
mod tests {

2 changes: 1 addition & 1 deletion datafusion/ffi/tests/ffi_udtf.rs
@@ -16,7 +16,7 @@
// under the License.

/// Add an additional module here for convenience to scope this to only
-/// when the feature integtation-tests is built
+/// when the feature integration-tests is built
#[cfg(feature = "integration-tests")]
mod tests {

2 changes: 1 addition & 1 deletion datafusion/ffi/tests/ffi_udwf.rs
@@ -16,7 +16,7 @@
// under the License.

/// Add an additional module here for convenience to scope this to only
-/// when the feature integtation-tests is built
+/// when the feature integration-tests is built
#[cfg(feature = "integration-tests")]
mod tests {
    use arrow::array::{create_array, ArrayRef};
45 changes: 17 additions & 28 deletions datafusion/spark/src/function/array/spark_array.rs
@@ -73,26 +73,23 @@ impl ScalarUDFImpl for SparkArray {
    }

    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
-        match arg_types.len() {
-            0 => Ok(empty_array_type()),
-            _ => {
-                let mut expr_type = DataType::Null;
-                for arg_type in arg_types {
-                    if !arg_type.equals_datatype(&DataType::Null) {
-                        expr_type = arg_type.clone();
-                        break;
-                    }
-                }
+        let mut expr_type = DataType::Null;
+        for arg_type in arg_types {
+            if !arg_type.equals_datatype(&DataType::Null) {
+                expr_type = arg_type.clone();
+                break;
+            }
+        }

-                if expr_type.is_null() {
-                    expr_type = DataType::Int32;
-                }
+        if expr_type.is_null() {
+            expr_type = DataType::Int32;
+        }

-                Ok(DataType::List(Arc::new(Field::new_list_field(
-                    expr_type, true,
-                ))))
-            }
-        }
+        Ok(DataType::List(Arc::new(Field::new(
+            ARRAY_FIELD_DEFAULT_NAME,
+            expr_type,
+            true,
+        ))))
    }

    fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
@@ -104,7 +101,7 @@ impl ScalarUDFImpl for SparkArray {
            .collect::<Vec<_>>();
        let return_type = self.return_type(&data_types)?;
        Ok(Arc::new(Field::new(
-            ARRAY_FIELD_DEFAULT_NAME,
+            "this_field_name_is_irrelevant",
            return_type,
            false,
        )))
@@ -143,15 +140,6 @@ impl ScalarUDFImpl for SparkArray {
    }
}

-// Empty array is a special case that is useful for many other array functions
-pub(super) fn empty_array_type() -> DataType {
-    DataType::List(Arc::new(Field::new(
-        ARRAY_FIELD_DEFAULT_NAME,
-        DataType::Int32,
-        true,
-    )))
-}
-
/// `make_array_inner` is the implementation of the `make_array` function.
/// Constructs an array using the input `data` as `ArrayRef`.
/// Returns a reference-counted `Array` instance result.
@@ -174,6 +162,7 @@ pub fn make_array_inner(arrays: &[ArrayRef]) -> Result<ArrayRef> {
    Ok(Arc::new(
        SingleRowListArrayBuilder::new(array)
            .with_nullable(true)
+            .with_field_name(Some(ARRAY_FIELD_DEFAULT_NAME.to_string()))
            .build_list_array(),
    ))
}
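Why the field-name changes on both sides of this file work (a sketch, not from the PR): in Arrow, the name of a list's inner field is part of the List DataType itself, so it participates in the type-equality check that invoke_with_args now performs in debug builds, while the name of the outer field does not. The names "item", "element", "a", and "b" below are illustrative; the actual value of ARRAY_FIELD_DEFAULT_NAME is not shown in this diff.

use std::sync::Arc;

use arrow::datatypes::{DataType, Field};

fn main() {
    // Two List types differing only in the inner field name compare unequal.
    let promised = DataType::List(Arc::new(Field::new("item", DataType::Int32, true)));
    let produced = DataType::List(Arc::new(Field::new("element", DataType::Int32, true)));
    assert_ne!(promised, produced);

    // The outer field's name never reaches the DataType comparison, which is
    // why return_field_from_args can return a field with any name at all.
    let outer_a = Field::new("a", promised.clone(), false);
    let outer_b = Field::new("b", promised, false);
    assert_eq!(outer_a.data_type(), outer_b.data_type());
}

This is why make_array_inner now pins the inner field name it produces to ARRAY_FIELD_DEFAULT_NAME, while return_field_from_args can advertise that its own field name is irrelevant.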
7 changes: 4 additions & 3 deletions datafusion/spark/src/function/url/parse_url.rs
@@ -19,7 +19,8 @@ use std::any::Any;
use std::sync::Arc;

use arrow::array::{
-    Array, ArrayRef, GenericStringBuilder, LargeStringArray, StringArray, StringArrayType,
+    Array, ArrayRef, GenericStringBuilder, LargeStringArray, StringArray,
+    StringArrayType, StringViewArray,
};
use arrow::datatypes::DataType;
use datafusion_common::cast::{
@@ -222,7 +223,7 @@ fn spark_parse_url(args: &[ArrayRef]) -> Result<ArrayRef> {
            )
        }
        (DataType::Utf8View, DataType::Utf8View, DataType::Utf8View) => {
-            process_parse_url::<_, _, _, StringArray>(
+            process_parse_url::<_, _, _, StringViewArray>(
                as_string_view_array(url)?,
                as_string_view_array(part)?,
                as_string_view_array(key)?,
@@ -255,7 +256,7 @@ fn spark_parse_url(args: &[ArrayRef]) -> Result<ArrayRef> {
            )
        }
        (DataType::Utf8View, DataType::Utf8View) => {
-            process_parse_url::<_, _, _, StringArray>(
+            process_parse_url::<_, _, _, StringViewArray>(
                as_string_view_array(url)?,
                as_string_view_array(part)?,
                &key,
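The StringArray to StringViewArray fixes above align the produced arrays with the Utf8View return type those match arms promise; under the new debug check in udf.rs, materializing the result in the wrong string representation would now surface as an internal error. A minimal sketch (not from the PR) of the type-level difference:

use arrow::array::{Array, StringArray, StringViewArray};
use arrow::datatypes::DataType;

fn main() {
    // Same logical strings, different Arrow representations: a StringArray
    // reports Utf8, not the Utf8View promised for view inputs.
    let utf8 = StringArray::from_iter_values(["https://example.com/a?b=c"]);
    let utf8_view = StringViewArray::from_iter_values(["https://example.com/a?b=c"]);
    assert_eq!(utf8.data_type(), &DataType::Utf8);
    assert_eq!(utf8_view.data_type(), &DataType::Utf8View);
    assert_ne!(utf8.data_type(), utf8_view.data_type());
}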