From 909c5286b45ab2e417684948c144b4c176e99f6b Mon Sep 17 00:00:00 2001 From: Sergey Zhukov Date: Wed, 5 Nov 2025 11:58:28 +0300 Subject: [PATCH 1/2] Consolidate udf examples (#18142) --- .../examples/{ => udf}/advanced_udaf.rs | 5 +- .../examples/{ => udf}/advanced_udf.rs | 55 ++++---- .../examples/{ => udf}/advanced_udwf.rs | 5 +- .../examples/{ => udf}/async_udf.rs | 5 +- datafusion-examples/examples/udf/main.rs | 133 ++++++++++++++++++ .../examples/{ => udf}/simple_udaf.rs | 5 +- .../examples/{ => udf}/simple_udf.rs | 3 +- .../examples/{ => udf}/simple_udtf.rs | 3 +- .../examples/{ => udf}/simple_udwf.rs | 3 +- 9 files changed, 175 insertions(+), 42 deletions(-) rename datafusion-examples/examples/{ => udf}/advanced_udaf.rs (98%) rename datafusion-examples/examples/{ => udf}/advanced_udf.rs (99%) rename datafusion-examples/examples/{ => udf}/advanced_udwf.rs (98%) rename datafusion-examples/examples/{ => udf}/async_udf.rs (98%) create mode 100644 datafusion-examples/examples/udf/main.rs rename datafusion-examples/examples/{ => udf}/simple_udaf.rs (97%) rename datafusion-examples/examples/{ => udf}/simple_udf.rs (99%) rename datafusion-examples/examples/{ => udf}/simple_udtf.rs (99%) rename datafusion-examples/examples/{ => udf}/simple_udwf.rs (99%) diff --git a/datafusion-examples/examples/advanced_udaf.rs b/datafusion-examples/examples/udf/advanced_udaf.rs similarity index 98% rename from datafusion-examples/examples/advanced_udaf.rs rename to datafusion-examples/examples/udf/advanced_udaf.rs index 89f0a470e32e..81e227bfacee 100644 --- a/datafusion-examples/examples/advanced_udaf.rs +++ b/datafusion-examples/examples/udf/advanced_udaf.rs @@ -469,8 +469,9 @@ fn create_context() -> Result { Ok(ctx) } -#[tokio::main] -async fn main() -> Result<()> { +/// In this example we register `GeoMeanUdaf` and `SimplifiedGeoMeanUdaf` +/// as user defined aggregate functions and invoke them via the DataFrame API and SQL +pub async fn advanced_udaf() -> Result<()> { let ctx = create_context()?; let geo_mean_udf = AggregateUDF::from(GeoMeanUdaf::new()); diff --git a/datafusion-examples/examples/advanced_udf.rs b/datafusion-examples/examples/udf/advanced_udf.rs similarity index 99% rename from datafusion-examples/examples/advanced_udf.rs rename to datafusion-examples/examples/udf/advanced_udf.rs index 56ae599efa11..bb5a68e90cbb 100644 --- a/datafusion-examples/examples/advanced_udf.rs +++ b/datafusion-examples/examples/udf/advanced_udf.rs @@ -245,10 +245,35 @@ fn maybe_pow_in_place(base: f64, exp_array: ArrayRef) -> Result { } } +/// create local execution context with an in-memory table: +/// +/// ```text +/// +-----+-----+ +/// | a | b | +/// +-----+-----+ +/// | 2.1 | 1.0 | +/// | 3.1 | 2.0 | +/// | 4.1 | 3.0 | +/// | 5.1 | 4.0 | +/// +-----+-----+ +/// ``` +fn create_context() -> Result { + // define data. + let a: ArrayRef = Arc::new(Float32Array::from(vec![2.1, 3.1, 4.1, 5.1])); + let b: ArrayRef = Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0])); + let batch = RecordBatch::try_from_iter(vec![("a", a), ("b", b)])?; + + // declare a new context. In Spark API, this corresponds to a new SparkSession + let ctx = SessionContext::new(); + + // declare a table in memory. In Spark API, this corresponds to createDataFrame(...). + ctx.register_batch("t", batch)?; + Ok(ctx) +} + /// In this example we register `PowUdf` as a user defined function /// and invoke it via the DataFrame API and SQL -#[tokio::main] -async fn main() -> Result<()> { +pub async fn advanced_udf() -> Result<()> { let ctx = create_context()?; // create the UDF @@ -295,29 +320,3 @@ async fn main() -> Result<()> { Ok(()) } - -/// create local execution context with an in-memory table: -/// -/// ```text -/// +-----+-----+ -/// | a | b | -/// +-----+-----+ -/// | 2.1 | 1.0 | -/// | 3.1 | 2.0 | -/// | 4.1 | 3.0 | -/// | 5.1 | 4.0 | -/// +-----+-----+ -/// ``` -fn create_context() -> Result { - // define data. - let a: ArrayRef = Arc::new(Float32Array::from(vec![2.1, 3.1, 4.1, 5.1])); - let b: ArrayRef = Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0])); - let batch = RecordBatch::try_from_iter(vec![("a", a), ("b", b)])?; - - // declare a new context. In Spark API, this corresponds to a new SparkSession - let ctx = SessionContext::new(); - - // declare a table in memory. In Spark API, this corresponds to createDataFrame(...). - ctx.register_batch("t", batch)?; - Ok(ctx) -} diff --git a/datafusion-examples/examples/advanced_udwf.rs b/datafusion-examples/examples/udf/advanced_udwf.rs similarity index 98% rename from datafusion-examples/examples/advanced_udwf.rs rename to datafusion-examples/examples/udf/advanced_udwf.rs index ba4c377fd676..86f215e019c7 100644 --- a/datafusion-examples/examples/advanced_udwf.rs +++ b/datafusion-examples/examples/udf/advanced_udwf.rs @@ -236,8 +236,9 @@ async fn create_context() -> Result { Ok(ctx) } -#[tokio::main] -async fn main() -> Result<()> { +/// In this example we register `SmoothItUdf` as user defined window function +/// and invoke it via the DataFrame API and SQL +pub async fn advanced_udwf() -> Result<()> { let ctx = create_context().await?; let smooth_it = WindowUDF::from(SmoothItUdf::new()); ctx.register_udwf(smooth_it.clone()); diff --git a/datafusion-examples/examples/async_udf.rs b/datafusion-examples/examples/udf/async_udf.rs similarity index 98% rename from datafusion-examples/examples/async_udf.rs rename to datafusion-examples/examples/udf/async_udf.rs index b52ec68ea442..475775a599f6 100644 --- a/datafusion-examples/examples/async_udf.rs +++ b/datafusion-examples/examples/udf/async_udf.rs @@ -38,8 +38,9 @@ use datafusion::prelude::{SessionConfig, SessionContext}; use std::any::Any; use std::sync::Arc; -#[tokio::main] -async fn main() -> Result<()> { +/// In this example we register `AskLLM` as an asynchronous user defined function +/// and invoke it via the DataFrame API and SQL +pub async fn async_udf() -> Result<()> { // Use a hard coded parallelism level of 4 so the explain plan // is consistent across machines. let config = SessionConfig::new().with_target_partitions(4); diff --git a/datafusion-examples/examples/udf/main.rs b/datafusion-examples/examples/udf/main.rs new file mode 100644 index 000000000000..ba36dbb15c58 --- /dev/null +++ b/datafusion-examples/examples/udf/main.rs @@ -0,0 +1,133 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! # User-Defined Functions Examples +//! +//! These examples demonstrate user-defined functions in DataFusion. +//! +//! Each subcommand runs a corresponding example: +//! - `adv_udaf` — user defined aggregate function example +//! - `adv_udf` — user defined scalar function example +//! - `adv_udwf` — user defined window function example +//! - `async_udf` — asynchronous user defined function example +//! - `udaf` — simple user defined aggregate function example +//! - `udf` — simple user defined scalar function example +//! - `udtf` — simple user defined table function example +//! - `udwf` — simple user defined window function example + +mod advanced_udaf; +mod advanced_udf; +mod advanced_udwf; +mod async_udf; +mod simple_udaf; +mod simple_udf; +mod simple_udtf; +mod simple_udwf; + +use std::str::FromStr; + +use datafusion::error::{DataFusionError, Result}; + +enum ExampleKind { + AdvUdaf, + AdvUdf, + AdvUdwf, + AsyncUdf, + Udf, + Udaf, + Udwf, + Udtf, +} + +impl AsRef for ExampleKind { + fn as_ref(&self) -> &str { + match self { + Self::AdvUdaf => "adv_udaf", + Self::AdvUdf => "adv_udf", + Self::AdvUdwf => "adv_udwf", + Self::AsyncUdf => "async_udf", + Self::Udf => "udf", + Self::Udaf => "udaf", + Self::Udwf => "udwt", + Self::Udtf => "udtf", + } + } +} + +impl FromStr for ExampleKind { + type Err = DataFusionError; + + fn from_str(s: &str) -> Result { + match s { + "adv_udaf" => Ok(Self::AdvUdaf), + "adv_udf" => Ok(Self::AdvUdf), + "adv_udwf" => Ok(Self::AdvUdwf), + "async_udf" => Ok(Self::AsyncUdf), + "udaf" => Ok(Self::Udaf), + "udf" => Ok(Self::Udf), + "udtf" => Ok(Self::Udtf), + "udwf" => Ok(Self::Udwf), + _ => Err(DataFusionError::Execution(format!("Unknown example: {s}"))), + } + } +} + +impl ExampleKind { + const ALL: [Self; 8] = [ + Self::AdvUdaf, + Self::AdvUdf, + Self::AdvUdwf, + Self::AsyncUdf, + Self::Udaf, + Self::Udf, + Self::Udtf, + Self::Udwf, + ]; + + const EXAMPLE_NAME: &str = "udf"; + + fn variants() -> Vec<&'static str> { + Self::ALL.iter().map(|x| x.as_ref()).collect() + } +} + +#[tokio::main] +async fn main() -> Result<()> { + let usage = format!( + "Usage: cargo run --example {} -- [{}]", + ExampleKind::EXAMPLE_NAME, + ExampleKind::variants().join("|") + ); + + let arg = std::env::args().nth(1).ok_or_else(|| { + eprintln!("{usage}"); + DataFusionError::Execution("Missing argument".to_string()) + })?; + + match arg.parse::()? { + ExampleKind::AdvUdaf => advanced_udaf::advanced_udaf().await?, + ExampleKind::AdvUdf => advanced_udf::advanced_udf().await?, + ExampleKind::AdvUdwf => advanced_udwf::advanced_udwf().await?, + ExampleKind::AsyncUdf => async_udf::async_udf().await?, + ExampleKind::Udaf => simple_udaf::simple_udaf().await?, + ExampleKind::Udf => simple_udf::simple_udf().await?, + ExampleKind::Udtf => simple_udtf::simple_udtf().await?, + ExampleKind::Udwf => simple_udwf::simple_udwf().await?, + } + + Ok(()) +} diff --git a/datafusion-examples/examples/simple_udaf.rs b/datafusion-examples/examples/udf/simple_udaf.rs similarity index 97% rename from datafusion-examples/examples/simple_udaf.rs rename to datafusion-examples/examples/udf/simple_udaf.rs index 82bde7c034a5..e9f905e72099 100644 --- a/datafusion-examples/examples/simple_udaf.rs +++ b/datafusion-examples/examples/udf/simple_udaf.rs @@ -135,8 +135,9 @@ impl Accumulator for GeometricMean { } } -#[tokio::main] -async fn main() -> Result<()> { +/// In this example we register `GeometricMean` +/// as user defined aggregate function and invoke it via the DataFrame API and SQL +pub async fn simple_udaf() -> Result<()> { let ctx = create_context()?; // here is where we define the UDAF. We also declare its signature: diff --git a/datafusion-examples/examples/simple_udf.rs b/datafusion-examples/examples/udf/simple_udf.rs similarity index 99% rename from datafusion-examples/examples/simple_udf.rs rename to datafusion-examples/examples/udf/simple_udf.rs index 5612e0939f70..7d4f3588e313 100644 --- a/datafusion-examples/examples/simple_udf.rs +++ b/datafusion-examples/examples/udf/simple_udf.rs @@ -57,8 +57,7 @@ fn create_context() -> Result { } /// In this example we will declare a single-type, single return type UDF that exponentiates f64, a^b -#[tokio::main] -async fn main() -> Result<()> { +pub async fn simple_udf() -> Result<()> { let ctx = create_context()?; // First, declare the actual implementation of the calculation diff --git a/datafusion-examples/examples/simple_udtf.rs b/datafusion-examples/examples/udf/simple_udtf.rs similarity index 99% rename from datafusion-examples/examples/simple_udtf.rs rename to datafusion-examples/examples/udf/simple_udtf.rs index b65ffb8d7174..a03b157134ae 100644 --- a/datafusion-examples/examples/simple_udtf.rs +++ b/datafusion-examples/examples/udf/simple_udtf.rs @@ -42,8 +42,7 @@ use std::sync::Arc; // 3. Register the function using [`SessionContext::register_udtf`] /// This example demonstrates how to register a TableFunction -#[tokio::main] -async fn main() -> Result<()> { +pub async fn simple_udtf() -> Result<()> { // create local execution context let ctx = SessionContext::new(); diff --git a/datafusion-examples/examples/simple_udwf.rs b/datafusion-examples/examples/udf/simple_udwf.rs similarity index 99% rename from datafusion-examples/examples/simple_udwf.rs rename to datafusion-examples/examples/udf/simple_udwf.rs index 1736ff00bd70..2cf1df8d8ed8 100644 --- a/datafusion-examples/examples/simple_udwf.rs +++ b/datafusion-examples/examples/udf/simple_udwf.rs @@ -42,8 +42,7 @@ async fn create_context() -> Result { } /// In this example we will declare a user defined window function that computes a moving average and then run it using SQL -#[tokio::main] -async fn main() -> Result<()> { +pub async fn simple_udwf() -> Result<()> { let ctx = create_context().await?; // here is where we define the UDWF. We also declare its signature: From 5cc9f8e266aee745c846ad70c9a9efafb82a49ea Mon Sep 17 00:00:00 2001 From: Sergey Zhukov Date: Wed, 5 Nov 2025 12:30:39 +0300 Subject: [PATCH 2/2] Update README.md --- datafusion-examples/README.md | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md index f6783a643f76..f87f62e170af 100644 --- a/datafusion-examples/README.md +++ b/datafusion-examples/README.md @@ -46,11 +46,11 @@ cargo run --example dataframe ## Single Process -- [`advanced_udaf.rs`](examples/advanced_udaf.rs): Define and invoke a more complicated User Defined Aggregate Function (UDAF) -- [`advanced_udf.rs`](examples/advanced_udf.rs): Define and invoke a more complicated User Defined Scalar Function (UDF) -- [`advanced_udwf.rs`](examples/advanced_udwf.rs): Define and invoke a more complicated User Defined Window Function (UDWF) +- [`examples/udf/advanced_udaf.rs`](examples/udf/advanced_udaf.rs): Define and invoke a more complicated User Defined Aggregate Function (UDAF) +- [`examples/udf/advanced_udf.rs`](examples/udf/advanced_udf.rs): Define and invoke a more complicated User Defined Scalar Function (UDF) +- [`examples/udf/advanced_udwf.rs`](examples/udf/advanced_udwf.rs): Define and invoke a more complicated User Defined Window Function (UDWF) - [`advanced_parquet_index.rs`](examples/advanced_parquet_index.rs): Creates a detailed secondary index that covers the contents of several parquet files -- [`async_udf.rs`](examples/async_udf.rs): Define and invoke an asynchronous User Defined Scalar Function (UDF) +- [`examples/udf/async_udf.rs`](examples/udf/async_udf.rs): Define and invoke an asynchronous User Defined Scalar Function (UDF) - [`analyzer_rule.rs`](examples/analyzer_rule.rs): Use a custom AnalyzerRule to change a query's semantics (row level access control) - [`catalog.rs`](examples/catalog.rs): Register the table into a custom catalog - [`composed_extension_codec`](examples/composed_extension_codec.rs): Example of using multiple extension codecs for serialization / deserialization @@ -83,9 +83,10 @@ cargo run --example dataframe - [`query-http-csv.rs`](examples/query-http-csv.rs): Configure `object_store` and run a query against files vi HTTP - [`regexp.rs`](examples/regexp.rs): Examples of using regular expression functions - [`remote_catalog.rs`](examples/regexp.rs): Examples of interfacing with a remote catalog (e.g. over a network) -- [`simple_udaf.rs`](examples/simple_udaf.rs): Define and invoke a User Defined Aggregate Function (UDAF) -- [`simple_udf.rs`](examples/simple_udf.rs): Define and invoke a User Defined Scalar Function (UDF) -- [`simple_udfw.rs`](examples/simple_udwf.rs): Define and invoke a User Defined Window Function (UDWF) +- [`examples/udf/simple_udaf.rs`](examples/udf/simple_udaf.rs): Define and invoke a User Defined Aggregate Function (UDAF) +- [`examples/udf/simple_udf.rs`](examples/udf/simple_udf.rs): Define and invoke a User Defined Scalar Function (UDF) +- [`examples/udf/simple_udtf.rs`](examples/udf/simple_udtf.rs): Define and invoke a User Defined Table Function (UDTF) +- [`examples/udf/simple_udfw.rs`](examples/udf/simple_udwf.rs): Define and invoke a User Defined Window Function (UDWF) - [`sql_analysis.rs`](examples/sql_analysis.rs): Analyse SQL queries with DataFusion structures - [`sql_frontend.rs`](examples/sql_frontend.rs): Create LogicalPlans (only) from sql strings - [`sql_dialect.rs`](examples/sql_dialect.rs): Example of implementing a custom SQL dialect on top of `DFParser`