POC: Splitting scalar functions outside Datafusion core #7752
@@ -0,0 +1,43 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion_extension_test_scalar_func::TestFunctionPackage;

/// This example demonstrates executing a simple query against an Arrow data source (CSV) and
/// fetching results
#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    let testdata = datafusion::test_util::arrow_test_data();
    ctx.register_csv(
        "aggregate_test_100",
        &format!("{testdata}/csv/aggregate_test_100.csv"),
        CsvReadOptions::new(),
    )
    .await?;

    // Register the add_one(x) and multiply_two(x) functions from `TestFunctionPackage`
    ctx.register_scalar_function_package(Box::new(TestFunctionPackage));

    let df = ctx
        .sql("select add_one(1), multiply_two(c3), add_one(multiply_two(c4)) from aggregate_test_100 limit 5").await?;
    df.show().await?;

    Ok(())
}
@@ -36,7 +36,8 @@ use datafusion_common::{
use datafusion_execution::registry::SerializerRegistry;
use datafusion_expr::{
    logical_plan::{DdlStatement, Statement},
    ScalarFunctionDef, ScalarFunctionPackage, StringifiedPlan, UserDefinedLogicalNode,
    WindowUDF,
};
pub use datafusion_physical_expr::execution_props::ExecutionProps;
use datafusion_physical_expr::var_provider::is_system_variables;

@@ -79,6 +80,7 @@ use sqlparser::dialect::dialect_from_str;
use crate::config::ConfigOptions;
use crate::datasource::physical_plan::{plan_to_csv, plan_to_json, plan_to_parquet};
use crate::execution::{runtime_env::RuntimeEnv, FunctionRegistry};
use crate::physical_plan::functions::make_scalar_function;
use crate::physical_plan::udaf::AggregateUDF;
use crate::physical_plan::udf::ScalarUDF;
use crate::physical_plan::ExecutionPlan;
@@ -792,6 +794,30 @@ impl SessionContext {
            .add_var_provider(variable_type, provider);
    }

    /// Register a function package into this context
    pub fn register_scalar_function_package(
        &self,
        func_pkg: Box<dyn ScalarFunctionPackage>,
    ) {
        // Make a `dyn ScalarFunctionDef` into an internal struct for scalar functions,
        // so it can then be registered into the context
        pub fn to_scalar_function(func: Box<dyn ScalarFunctionDef>) -> ScalarUDF {
            let name = func.name().to_string();
            let signature = func.signature();
            let return_type = func.return_type();
            let func_impl = make_scalar_function(move |args| func.execute(args));

            ScalarUDF::new(&name, &signature, &return_type, &func_impl)
        }

        for func in func_pkg.functions() {
            self.state
                .write()
                .scalar_functions
                .insert(func.name().to_string(), Arc::new(to_scalar_function(func)));
        }
    }

    /// Registers a scalar UDF within this context.
    ///
    /// Note in SQL queries, function names are looked up using

Review comment on `register_scalar_function_package`: If we are going to implement a new API, maybe we can have it cover all types of functions (window, aggregate, scalar) so there is a place to add table functions eventually too: something like … Also I wonder if it should take …
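The reviewer's own example is elided above; purely as an illustration of the idea, a package trait covering several function types might look roughly like the sketch below. The trait name `FunctionPackage`, the method names, and the use of `Arc` are all assumptions, not from this PR or the review thread.

```rust
// Hypothetical sketch only -- not part of this PR.
use std::sync::Arc;

use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};

/// A package that can expose every kind of user-defined function,
/// leaving room to add table functions later.
pub trait FunctionPackage {
    /// Scalar functions provided by this package.
    fn scalar_functions(&self) -> Vec<Arc<ScalarUDF>> {
        vec![]
    }

    /// Aggregate functions provided by this package.
    fn aggregate_functions(&self) -> Vec<Arc<AggregateUDF>> {
        vec![]
    }

    /// Window functions provided by this package.
    fn window_functions(&self) -> Vec<Arc<WindowUDF>> {
        vec![]
    }
}
```

With default empty methods, a package like `TestFunctionPackage` would only need to override the function kinds it actually provides.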
@@ -18,11 +18,30 @@
//! Udf module contains foundational types that are used to represent UDFs in DataFusion.

use crate::{Expr, ReturnTypeFunction, ScalarFunctionImplementation, Signature};
use arrow::array::ArrayRef;
use datafusion_common::Result;
use std::fmt;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;
pub trait ScalarFunctionDef: Sync + Send + std::fmt::Debug {

Review thread on the new `ScalarFunctionDef` trait:

Comment: I really like using a trait to define functions. To be honest, I am not sure why DataFusion didn't do that originally. My only concern is that by adding this we would now have three ways to define scalar functions (`ScalarUDF`, `BuiltinScalarFunction`, and this trait). I wonder if this trait is needed, or can we extend `ScalarUDF`? Or perhaps should we be aiming to consolidate all functions to use …

Reply: I think using a trait to define functions is more clear and easy to use; if extending `ScalarUDF` …

Reply: Yes, I agree. Maybe we could do something like

```rust
impl ScalarFunctionDef for ScalarUDF {
    ...
}
```

so all of DataFusion's code is written in terms of `ScalarFunctionDef`, and then (eventually) deprecate `ScalarUDF` as part of helping people migrate to using it.
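Purely as an illustration of that reply (not code from this PR), such an impl might be fleshed out as below. It assumes `ScalarUDF`'s public `name`/`signature`/`return_type`/`fun` fields and a round-trip between `ArrayRef` and `ColumnarValue`; it would have to live inside `datafusion_expr` to satisfy the orphan rule.

```rust
// Illustrative sketch only -- not part of this PR.
use arrow::array::ArrayRef;
use datafusion_common::Result;
use datafusion_expr::{
    ColumnarValue, ReturnTypeFunction, ScalarFunctionDef, ScalarUDF, Signature,
};

impl ScalarFunctionDef for ScalarUDF {
    fn name(&self) -> &str {
        &self.name
    }

    fn signature(&self) -> Signature {
        self.signature.clone()
    }

    fn return_type(&self) -> ReturnTypeFunction {
        self.return_type.clone()
    }

    fn execute(&self, args: &[ArrayRef]) -> Result<ArrayRef> {
        // ScalarUDF stores a ColumnarValue-based closure, so wrap the input
        // arrays, invoke it, and unwrap the result back into an array.
        let args: Vec<ColumnarValue> =
            args.iter().cloned().map(ColumnarValue::Array).collect();
        match (self.fun)(&args)? {
            ColumnarValue::Array(array) => Ok(array),
            // A full implementation would need the input row count to expand
            // a scalar result correctly.
            ColumnarValue::Scalar(scalar) => Ok(scalar.to_array()),
        }
    }
}
```

The scalar branch shows where an `ArrayRef`-only `execute` gets awkward, which ties into the experimental-API discussion below.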
    // TODO: support alias
    fn name(&self) -> &str;

    fn signature(&self) -> Signature;

    // TODO: turn ReturnTypeFunction into an enum;
    // most functions' return type is either the same as the 1st arg or a fixed type
    fn return_type(&self) -> ReturnTypeFunction;

Review thread on `return_type`:

Comment: I think the current API requires `return_type` to be passed the specific argument types. Also, #7657 suggests there is additional room for improvement.

Reply: I think we might mark this API as experimental for now; it can see frequent changes, and other unexpected cases like #7657 might show up when porting some trickier functions.

Reply: I like that idea -- maybe the implementation plan sequence could be: …

Reply: Sounds great 👍🏼 My plan is …
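For illustration only, the enum that the `ReturnTypeFunction` TODO above gestures at might take a shape like the following; the name `ReturnType` and both variants are assumptions, not part of this PR.

```rust
// Hypothetical sketch of the enum hinted at by the TODO above.
use arrow::datatypes::DataType;

pub enum ReturnType {
    /// The function always returns one fixed type, e.g. Float64.
    Fixed(DataType),
    /// The function returns the same type as its first argument.
    SameAsFirstArg,
}
```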
    fn execute(&self, args: &[ArrayRef]) -> Result<ArrayRef>;
}

pub trait ScalarFunctionPackage {
    fn functions(&self) -> Vec<Box<dyn ScalarFunctionDef>>;
}

Review comment on `functions`: I suggest using …
/// Logical representation of a UDF.
#[derive(Clone)]
pub struct ScalarUDF {
@@ -0,0 +1,20 @@
[package]
name = "datafusion-extension-test-scalar-func"
authors.workspace = true
edition.workspace = true
homepage.workspace = true
license.workspace = true
readme.workspace = true
repository.workspace = true
rust-version.workspace = true
version.workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
datafusion = { path = "../../../datafusion/core" }
datafusion-common = { path = "../../../datafusion/common" }
datafusion-expr = { path = "../../../datafusion/expr" }
arrow = { workspace = true }
#arrow-flight = { workspace = true }
#arrow-schema = { workspace = true }
@@ -0,0 +1,79 @@
use arrow::array::{ArrayRef, Float64Array};
use arrow::datatypes::DataType;
use datafusion::error::Result;
use datafusion::logical_expr::Volatility;
use datafusion_common::cast::as_float64_array;
use datafusion_expr::{ReturnTypeFunction, Signature};
use datafusion_expr::{ScalarFunctionDef, ScalarFunctionPackage};
use std::sync::Arc;

#[derive(Debug)]
pub struct AddOneFunction;

impl ScalarFunctionDef for AddOneFunction {
    fn name(&self) -> &str {
        "add_one"
    }

    fn signature(&self) -> Signature {
        Signature::exact(vec![DataType::Float64], Volatility::Immutable)
    }

    fn return_type(&self) -> ReturnTypeFunction {
        let return_type = Arc::new(DataType::Float64);
        Arc::new(move |_| Ok(return_type.clone()))
    }

    fn execute(&self, args: &[ArrayRef]) -> Result<ArrayRef> {
        assert_eq!(args.len(), 1);
        let input = as_float64_array(&args[0]).expect("cast failed");
        let array = input
            .iter()
            .map(|value| match value {
                Some(value) => Some(value + 1.0),
                _ => None,
            })
            .collect::<Float64Array>();
        Ok(Arc::new(array) as ArrayRef)
    }
}

#[derive(Debug)]
pub struct MultiplyTwoFunction;

impl ScalarFunctionDef for MultiplyTwoFunction {
    fn name(&self) -> &str {
        "multiply_two"
    }

    fn signature(&self) -> Signature {
        Signature::exact(vec![DataType::Float64], Volatility::Immutable)
    }

    fn return_type(&self) -> ReturnTypeFunction {
        let return_type = Arc::new(DataType::Float64);
        Arc::new(move |_| Ok(return_type.clone()))
    }

    fn execute(&self, args: &[ArrayRef]) -> Result<ArrayRef> {
        assert_eq!(args.len(), 1);
        let input = as_float64_array(&args[0]).expect("cast failed");
        let array = input
            .iter()
            .map(|value| match value {
                Some(value) => Some(value * 2.0),
                _ => None,
            })
            .collect::<Float64Array>();
        Ok(Arc::new(array) as ArrayRef)
    }
}

// Function package declaration
pub struct TestFunctionPackage;

impl ScalarFunctionPackage for TestFunctionPackage {
    fn functions(&self) -> Vec<Box<dyn ScalarFunctionDef>> {
        vec![Box::new(AddOneFunction), Box::new(MultiplyTwoFunction)]
    }
}
Review comment: Since most of DataFusion is extensions, another potential might be a path structure like the following? …