Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify columnar array call and add From<T> trait for columnar array values #310

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ unicode-segmentation = { version = "^1.7.1", optional = true }
regex = { version = "^1.4.3", optional = true }
lazy_static = { version = "^1.4.0", optional = true }
smallvec = { version = "1.6", features = ["union"] }
rand = "0.8"

[dev-dependencies]
rand = "0.8"
criterion = "0.3"
tempfile = "3"
doc-comment = "0.3"
Expand Down
33 changes: 10 additions & 23 deletions datafusion/src/physical_plan/crypto_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
// under the License.

//! Crypto expressions
use std::sync::Arc;

use md5::Md5;
use sha2::{
Expand Down Expand Up @@ -97,23 +96,15 @@ where
{
match &args[0] {
ColumnarValue::Array(a) => match a.data_type() {
DataType::Utf8 => {
Ok(ColumnarValue::Array(Arc::new(unary_binary_function::<
i32,
_,
_,
>(
&[a.as_ref()], op, name
)?)))
}
DataType::Utf8 => Ok(ColumnarValue::from(
unary_binary_function::<i32, _, _>(&[a.as_ref()], op, name)?,
)),
DataType::LargeUtf8 => {
Ok(ColumnarValue::Array(Arc::new(unary_binary_function::<
i64,
_,
_,
>(
&[a.as_ref()], op, name
)?)))
Ok(ColumnarValue::from(unary_binary_function::<i64, _, _>(
&[a.as_ref()],
op,
name,
)?))
}
other => Err(DataFusionError::Internal(format!(
"Unsupported data type {:?} for function {}",
Expand Down Expand Up @@ -147,13 +138,9 @@ fn md5_array<T: StringOffsetSizeTrait>(
pub fn md5(args: &[ColumnarValue]) -> Result<ColumnarValue> {
match &args[0] {
ColumnarValue::Array(a) => match a.data_type() {
DataType::Utf8 => Ok(ColumnarValue::Array(Arc::new(md5_array::<i32>(&[
a.as_ref()
])?))),
DataType::Utf8 => Ok(ColumnarValue::from(md5_array::<i32>(&[a.as_ref()])?)),
DataType::LargeUtf8 => {
Ok(ColumnarValue::Array(Arc::new(md5_array::<i64>(&[
a.as_ref()
])?)))
Ok(ColumnarValue::from(md5_array::<i64>(&[a.as_ref()])?))
}
other => Err(DataFusionError::Internal(format!(
"Unsupported data type {:?} for function md5",
Expand Down
33 changes: 22 additions & 11 deletions datafusion/src/physical_plan/datetime_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,12 +231,24 @@ where
{
match &args[0] {
ColumnarValue::Array(a) => match a.data_type() {
DataType::Utf8 => Ok(ColumnarValue::Array(Arc::new(
unary_string_to_primitive_function::<i32, O, _>(&[a.as_ref()], op, name)?,
))),
DataType::LargeUtf8 => Ok(ColumnarValue::Array(Arc::new(
unary_string_to_primitive_function::<i64, O, _>(&[a.as_ref()], op, name)?,
))),
DataType::Utf8 => {
Ok(ColumnarValue::from(unary_string_to_primitive_function::<
i32,
O,
_,
>(
&[a.as_ref()], op, name
)?))
}
DataType::LargeUtf8 => {
Ok(ColumnarValue::from(unary_string_to_primitive_function::<
i64,
O,
_,
>(
&[a.as_ref()], op, name
)?))
}
other => Err(DataFusionError::Internal(format!(
"Unsupported data type {:?} for function {}",
other, name,
Expand Down Expand Up @@ -342,7 +354,7 @@ pub fn date_trunc(args: &[ColumnarValue]) -> Result<ColumnarValue> {
.map(f)
.collect::<Result<TimestampNanosecondArray>>()?;

ColumnarValue::Array(Arc::new(array))
ColumnarValue::from(array)
}
})
}
Expand Down Expand Up @@ -435,7 +447,7 @@ pub fn date_part(args: &[ColumnarValue]) -> Result<ColumnarValue> {
0,
)?)
} else {
ColumnarValue::Array(Arc::new(arr))
ColumnarValue::from(arr)
})
}

Expand All @@ -461,8 +473,7 @@ mod tests {
ts_builder.append_null()?;
let expected_timestamps = &ts_builder.finish() as &dyn Array;

let string_array =
ColumnarValue::Array(Arc::new(string_builder.finish()) as ArrayRef);
let string_array = ColumnarValue::from(string_builder.finish() as ArrayRef);
let parsed_timestamps = to_timestamp(&[string_array])
.expect("that to_timestamp parsed values without error");
if let ColumnarValue::Array(parsed_array) = parsed_timestamps {
Expand Down Expand Up @@ -539,7 +550,7 @@ mod tests {

let mut builder = Int64Array::builder(1);
builder.append_value(1)?;
let int64array = ColumnarValue::Array(Arc::new(builder.finish()));
let int64array = ColumnarValue::from(builder.finish());

let expected_err =
"Internal error: Unsupported data type Int64 for function to_timestamp";
Expand Down
8 changes: 4 additions & 4 deletions datafusion/src/physical_plan/expressions/in_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ macro_rules! make_contains {
})
.collect::<Vec<_>>();

Ok(ColumnarValue::Array(Arc::new(
Ok(ColumnarValue::from(
array
.iter()
.map(|x| {
Expand All @@ -95,7 +95,7 @@ macro_rules! make_contains {
}
})
.collect::<BooleanArray>(),
)))
))
}};
}

Expand Down Expand Up @@ -164,7 +164,7 @@ impl InListExpr {
})
.collect::<Vec<&str>>();

Ok(ColumnarValue::Array(Arc::new(
Ok(ColumnarValue::from(
array
.iter()
.map(|x| {
Expand All @@ -190,7 +190,7 @@ impl InListExpr {
}
})
.collect::<BooleanArray>(),
)))
))
}
}

Expand Down
6 changes: 3 additions & 3 deletions datafusion/src/physical_plan/expressions/is_not_null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ impl PhysicalExpr for IsNotNullExpr {
fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
let arg = self.arg.evaluate(batch)?;
match arg {
ColumnarValue::Array(array) => Ok(ColumnarValue::Array(Arc::new(
compute::is_not_null(array.as_ref())?,
))),
ColumnarValue::Array(array) => {
Ok(ColumnarValue::from(compute::is_not_null(array.as_ref())?))
}
ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar(
ScalarValue::Boolean(Some(!scalar.is_null())),
)),
Expand Down
6 changes: 3 additions & 3 deletions datafusion/src/physical_plan/expressions/is_null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ impl PhysicalExpr for IsNullExpr {
fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
let arg = self.arg.evaluate(batch)?;
match arg {
ColumnarValue::Array(array) => Ok(ColumnarValue::Array(Arc::new(
compute::is_null(array.as_ref())?,
))),
ColumnarValue::Array(array) => {
Ok(ColumnarValue::from(compute::is_null(array.as_ref())?))
}
ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar(
ScalarValue::Boolean(Some(scalar.is_null())),
)),
Expand Down
6 changes: 3 additions & 3 deletions datafusion/src/physical_plan/expressions/not.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ impl PhysicalExpr for NotExpr {
"boolean_op failed to downcast array".to_owned(),
)
})?;
Ok(ColumnarValue::Array(Arc::new(
arrow::compute::kernels::boolean::not(array)?,
)))
Ok(ColumnarValue::from(arrow::compute::kernels::boolean::not(
array,
)?))
}
ColumnarValue::Scalar(scalar) => {
use std::convert::TryInto;
Expand Down
4 changes: 2 additions & 2 deletions datafusion/src/physical_plan/expressions/nullif.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ mod tests {
Some(4),
Some(5),
]);
let a = ColumnarValue::Array(Arc::new(a));
let a = ColumnarValue::from(a);

let lit_array = ColumnarValue::Scalar(ScalarValue::Int32(Some(2i32)));

Expand All @@ -164,7 +164,7 @@ mod tests {
// Ensure that arrays with no nulls can also invoke NULLIF() correctly
fn nullif_int32_nonulls() -> Result<()> {
let a = Int32Array::from(vec![1, 3, 10, 7, 8, 1, 2, 4, 5]);
let a = ColumnarValue::Array(Arc::new(a));
let a = ColumnarValue::from(a);

let lit_array = ColumnarValue::Scalar(ScalarValue::Int32(Some(1i32)));

Expand Down
34 changes: 17 additions & 17 deletions datafusion/src/physical_plan/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use crate::{
scalar::ScalarValue,
};
use arrow::{
array::ArrayRef,
array::{ArrayRef, NullArray},
compute::kernels::length::{bit_length, length},
datatypes::TimeUnit,
datatypes::{DataType, Field, Int32Type, Int64Type, Schema},
Expand Down Expand Up @@ -160,6 +160,8 @@ pub enum BuiltinScalarFunction {
NullIf,
/// octet_length
OctetLength,
/// random
Random,
/// regexp_replace
RegexpReplace,
/// repeat
Expand Down Expand Up @@ -256,6 +258,7 @@ impl FromStr for BuiltinScalarFunction {
"md5" => BuiltinScalarFunction::MD5,
"nullif" => BuiltinScalarFunction::NullIf,
"octet_length" => BuiltinScalarFunction::OctetLength,
"random" => BuiltinScalarFunction::Random,
"regexp_replace" => BuiltinScalarFunction::RegexpReplace,
"repeat" => BuiltinScalarFunction::Repeat,
"replace" => BuiltinScalarFunction::Replace,
Expand Down Expand Up @@ -298,15 +301,6 @@ pub fn return_type(
// verify that this is a valid set of data types for this function
data_types(&arg_types, &signature(fun))?;

if arg_types.is_empty() {
// functions currently cannot be evaluated without arguments, as they can't
// know the number of rows to return.
return Err(DataFusionError::Plan(format!(
"Function '{}' requires at least one argument",
fun
)));
}

// the return type of the built in function.
// Some built-in functions' return type depends on the incoming type.
match fun {
Expand Down Expand Up @@ -427,6 +421,7 @@ pub fn return_type(
));
}
}),
BuiltinScalarFunction::Random => Ok(DataType::Float64),
BuiltinScalarFunction::RegexpReplace => Ok(match arg_types[0] {
DataType::LargeUtf8 => DataType::LargeUtf8,
DataType::Utf8 => DataType::Utf8,
Expand Down Expand Up @@ -729,6 +724,7 @@ pub fn create_physical_expr(
BuiltinScalarFunction::Ln => math_expressions::ln,
BuiltinScalarFunction::Log10 => math_expressions::log10,
BuiltinScalarFunction::Log2 => math_expressions::log2,
BuiltinScalarFunction::Random => math_expressions::random,
BuiltinScalarFunction::Round => math_expressions::round,
BuiltinScalarFunction::Signum => math_expressions::signum,
BuiltinScalarFunction::Sin => math_expressions::sin,
Expand Down Expand Up @@ -1373,20 +1369,24 @@ impl PhysicalExpr for ScalarFunctionExpr {
}

/// Evaluates this scalar function against `batch`.
///
/// Each argument expression is evaluated exactly once, in order, and the results are
/// passed to the function implementation. When the function has no arguments, a single
/// `NullArray` with `batch.num_rows()` entries is passed instead (as a convention), so
/// the implementation can still tell how many rows it has to produce.
///
/// Returns the first error raised while evaluating an argument, otherwise whatever the
/// function implementation returns.
fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
    // Build the inputs a single time; evaluating the argument expressions more than
    // once would repeat their work for no benefit.
    let inputs = if self.args.is_empty() {
        vec![ColumnarValue::from(NullArray::new(batch.num_rows()))]
    } else {
        self.args
            .iter()
            .map(|e| e.evaluate(batch))
            .collect::<Result<Vec<_>>>()?
    };

    // evaluate the function
    let fun = self.fun.as_ref();
    (fun)(&inputs)
}
}

/// decorates a function to handle [`ScalarValue`]s by coverting them to arrays before calling the function
/// decorates a function to handle [`ScalarValue`]s by converting them to arrays before calling the function
/// and vice-versa after evaluation.
pub fn make_scalar_function<F>(inner: F) -> ScalarFunctionImplementation
where
Expand Down
20 changes: 19 additions & 1 deletion datafusion/src/physical_plan/math_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
// under the License.

//! Math expressions

use super::{ColumnarValue, ScalarValue};
use crate::error::{DataFusionError, Result};
use arrow::array::{Float32Array, Float64Array};
use arrow::datatypes::DataType;
use rand::{thread_rng, Rng};
use std::sync::Arc;

macro_rules! downcast_compute_op {
Expand Down Expand Up @@ -100,3 +100,21 @@ math_unary_function!("exp", exp);
math_unary_function!("ln", ln);
math_unary_function!("log2", log2);
math_unary_function!("log10", log10);

/// Implements the SQL `random` function.
///
/// `random` takes no SQL-level parameters, so the only thing read from `args` is the
/// length of the first entry, which must be an array; that length decides how many
/// values are generated. The contents of that array are never inspected.
///
/// Returns a `Float64Array` holding one non-null value per row, each drawn from the
/// half-open range `0.0..1.0` using the thread-local random number generator.
///
/// # Errors
///
/// Returns `DataFusionError::Internal` when `args` is empty or when the first argument
/// is not an array.
pub fn random(args: &[ColumnarValue]) -> Result<ColumnarValue> {
    // `first()` instead of `args[0]`: an empty slice becomes an error, not a panic.
    let len = match args.first() {
        Some(ColumnarValue::Array(array)) => array.len(),
        Some(_) => {
            return Err(DataFusionError::Internal(
                "Expect random function to take no param".to_string(),
            ))
        }
        None => {
            return Err(DataFusionError::Internal(
                "Expect random function to be called with a placeholder array".to_string(),
            ))
        }
    };

    let mut rng = thread_rng();
    let values: Vec<Option<f64>> = (0..len)
        .map(|_| Some(rng.gen_range(0.0..1.0)))
        .collect();
    Ok(ColumnarValue::from(Float64Array::from(values)))
}
Loading