diff --git a/Cargo.lock b/Cargo.lock index 120dc29db223..d2fb2a31070e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2207,7 +2207,6 @@ dependencies = [ "bytes", "dashmap", "datafusion", - "datafusion-ffi", "datafusion-physical-expr-adapter", "datafusion-proto", "env_logger", diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index bb0525e57753..ac21063bb865 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -32,30 +32,6 @@ rust-version = { workspace = true } [lints] workspace = true -[[example]] -name = "flight_sql_server" -path = "examples/flight/flight_sql_server.rs" - -[[example]] -name = "flight_server" -path = "examples/flight/flight_server.rs" - -[[example]] -name = "flight_client" -path = "examples/flight/flight_client.rs" - -[[example]] -name = "dataframe_to_s3" -path = "examples/external_dependency/dataframe-to-s3.rs" - -[[example]] -name = "query_aws_s3" -path = "examples/external_dependency/query-aws-s3.rs" - -[[example]] -name = "custom_file_casts" -path = "examples/custom_file_casts.rs" - [dev-dependencies] arrow = { workspace = true } # arrow_schema is required for record_batch! macro :sad: @@ -67,7 +43,6 @@ dashmap = { workspace = true } # note only use main datafusion crate for examples base64 = "0.22.1" datafusion = { workspace = true, default-features = true, features = ["parquet_encryption"] } -datafusion-ffi = { workspace = true } datafusion-physical-expr-adapter = { workspace = true } datafusion-proto = { workspace = true } env_logger = { workspace = true } diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md index f1bcbcce8200..ba2c28972ec1 100644 --- a/datafusion-examples/README.md +++ b/datafusion-examples/README.md @@ -40,58 +40,61 @@ git submodule update --init cd datafusion-examples/examples # Run the `dataframe` example: -# ... use the equivalent for other examples -cargo run --example dataframe +# ... 
use the equivalent for other examples with corresponding subcommand +cargo run --example dataframe -- dataframe ``` ## Single Process -- [`advanced_udaf.rs`](examples/advanced_udaf.rs): Define and invoke a more complicated User Defined Aggregate Function (UDAF) -- [`advanced_udf.rs`](examples/advanced_udf.rs): Define and invoke a more complicated User Defined Scalar Function (UDF) -- [`advanced_udwf.rs`](examples/advanced_udwf.rs): Define and invoke a more complicated User Defined Window Function (UDWF) -- [`advanced_parquet_index.rs`](examples/advanced_parquet_index.rs): Creates a detailed secondary index that covers the contents of several parquet files -- [`async_udf.rs`](examples/async_udf.rs): Define and invoke an asynchronous User Defined Scalar Function (UDF) -- [`analyzer_rule.rs`](examples/analyzer_rule.rs): Use a custom AnalyzerRule to change a query's semantics (row level access control) -- [`catalog.rs`](examples/catalog.rs): Register the table into a custom catalog -- [`composed_extension_codec`](examples/composed_extension_codec.rs): Example of using multiple extension codecs for serialization / deserialization -- [`csv_sql_streaming.rs`](examples/csv_sql_streaming.rs): Build and run a streaming query plan from a SQL statement against a local CSV file -- [`csv_json_opener.rs`](examples/csv_json_opener.rs): Use low level `FileOpener` APIs to read CSV/JSON into Arrow `RecordBatch`es -- [`custom_datasource.rs`](examples/custom_datasource.rs): Run queries against a custom datasource (TableProvider) -- [`custom_file_casts.rs`](examples/custom_file_casts.rs): Implement custom casting rules to adapt file schemas -- [`custom_file_format.rs`](examples/custom_file_format.rs): Write data to a custom file format -- [`dataframe-to-s3.rs`](examples/external_dependency/dataframe-to-s3.rs): Run a query using a DataFrame against a parquet file from s3 and writing back to s3 -- [`dataframe.rs`](examples/dataframe.rs): Run a query using a DataFrame API against parquet files, csv files, and in-memory data, including multiple subqueries. Also demonstrates the various methods to write out a DataFrame to a table, parquet file, csv file, and json file. -- [`default_column_values.rs`](examples/default_column_values.rs): Implement custom default value handling for missing columns using field metadata and PhysicalExprAdapter -- [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert query results (Arrow ArrayRefs) into Rust structs -- [`expr_api.rs`](examples/expr_api.rs): Create, execute, simplify, analyze and coerce `Expr`s -- [`file_stream_provider.rs`](examples/file_stream_provider.rs): Run a query on `FileStreamProvider` which implements `StreamProvider` for reading and writing to arbitrary stream sources / sinks. 
-- [`flight_sql_server.rs`](examples/flight/flight_sql_server.rs): Run DataFusion as a standalone process and execute SQL queries from JDBC clients -- [`function_factory.rs`](examples/function_factory.rs): Register `CREATE FUNCTION` handler to implement SQL macros -- [`memory_pool_tracking.rs`](examples/memory_pool_tracking.rs): Demonstrates TrackConsumersPool for memory tracking and debugging with enhanced error messages -- [`memory_pool_execution_plan.rs`](examples/memory_pool_execution_plan.rs): Shows how to implement memory-aware ExecutionPlan with memory reservation and spilling -- [`optimizer_rule.rs`](examples/optimizer_rule.rs): Use a custom OptimizerRule to replace certain predicates -- [`parquet_embedded_index.rs`](examples/parquet_embedded_index.rs): Store a custom index inside a Parquet file and use it to speed up queries -- [`parquet_encrypted.rs`](examples/parquet_encrypted.rs): Read and write encrypted Parquet files using DataFusion -- [`parquet_encrypted_with_kms.rs`](examples/parquet_encrypted_with_kms.rs): Read and write encrypted Parquet files using an encryption factory -- [`parquet_index.rs`](examples/parquet_index.rs): Create an secondary index over several parquet files and use it to speed up queries -- [`parquet_exec_visitor.rs`](examples/parquet_exec_visitor.rs): Extract statistics by visiting an ExecutionPlan after execution -- [`parse_sql_expr.rs`](examples/parse_sql_expr.rs): Parse SQL text into DataFusion `Expr`. -- [`plan_to_sql.rs`](examples/plan_to_sql.rs): Generate SQL from DataFusion `Expr` and `LogicalPlan` -- [`planner_api.rs`](examples/planner_api.rs) APIs to manipulate logical and physical plans -- [`pruning.rs`](examples/pruning.rs): Use pruning to rule out files based on statistics -- [`query-aws-s3.rs`](examples/external_dependency/query-aws-s3.rs): Configure `object_store` and run a query against files stored in AWS S3 -- [`query-http-csv.rs`](examples/query-http-csv.rs): Configure `object_store` and run a query against files vi HTTP -- [`regexp.rs`](examples/regexp.rs): Examples of using regular expression functions -- [`remote_catalog.rs`](examples/regexp.rs): Examples of interfacing with a remote catalog (e.g. over a network) -- [`simple_udaf.rs`](examples/simple_udaf.rs): Define and invoke a User Defined Aggregate Function (UDAF) -- [`simple_udf.rs`](examples/simple_udf.rs): Define and invoke a User Defined Scalar Function (UDF) -- [`simple_udfw.rs`](examples/simple_udwf.rs): Define and invoke a User Defined Window Function (UDWF) -- [`sql_analysis.rs`](examples/sql_analysis.rs): Analyse SQL queries with DataFusion structures -- [`sql_frontend.rs`](examples/sql_frontend.rs): Create LogicalPlans (only) from sql strings -- [`sql_dialect.rs`](examples/sql_dialect.rs): Example of implementing a custom SQL dialect on top of `DFParser` -- [`sql_query.rs`](examples/memtable.rs): Query data using SQL (in memory `RecordBatches`, local Parquet files) -- [`date_time_function.rs`](examples/date_time_function.rs): Examples of date-time related functions and queries. 
+- [`advanced_udaf.rs`](examples/advanced_udf/udaf.rs): Define and invoke a more complicated User Defined Aggregate Function (UDAF) +- [`advanced_udf.rs`](examples/advanced_udf/udf.rs): Define and invoke a more complicated User Defined Scalar Function (UDF) +- [`advanced_udwf.rs`](examples/advanced_udf/udwf.rs): Define and invoke a more complicated User Defined Window Function (UDWF) +- [`advanced_parquet_index.rs`](examples/parquet/advanced_index.rs): Create a detailed secondary index that covers the contents of several parquet files +- [`async_udf.rs`](examples/advanced_udf/async_udf.rs): Define and invoke an asynchronous User Defined Scalar Function (UDF) +- [`analyzer_rule.rs`](examples/query_planning/analyzer_rule.rs): Use a custom AnalyzerRule to change a query's semantics (row level access control) +- [`catalog.rs`](examples/data_io/catalog.rs): Register the table into a custom catalog +- [`composed_extension_codec`](examples/proto/composed_extension_codec.rs): Example of using multiple extension codecs for serialization / deserialization +- [`csv_sql_streaming.rs`](examples/custom_data_source/csv_sql_streaming.rs): Build and run a streaming query plan from a SQL statement against a local CSV file +- [`csv_json_opener.rs`](examples/custom_data_source/csv_json_opener.rs): Use low level `FileOpener` APIs to read CSV/JSON into Arrow `RecordBatch`es +- [`custom_datasource.rs`](examples/custom_data_source/custom_datasource.rs): Run queries against a custom datasource (TableProvider) +- [`custom_file_casts.rs`](examples/custom_data_source/custom_file_casts.rs): Implement custom casting rules to adapt file schemas +- [`custom_file_format.rs`](examples/custom_data_source/custom_file_format.rs): Write data to a custom file format +- [`dataframe_to_s3.rs`](examples/external_dependency/dataframe_to_s3.rs): Run a query using a DataFrame against a parquet file from s3 and write back to s3 +- [`dataframe.rs`](examples/dataframe/dataframe.rs): Run a query using a DataFrame API against parquet files, csv files, and in-memory data, including multiple subqueries. Also demonstrates the various methods to write out a DataFrame to a table, parquet file, csv file, and json file. +- [`date_time_functions.rs`](examples/builtin_functions/date_time.rs): Examples of date-time related functions and queries +- [`default_column_values.rs`](examples/dataframe/default_column_values.rs): Implement custom default value handling for missing columns using field metadata and PhysicalExprAdapter +- [`deserialize_to_struct.rs`](examples/dataframe/deserialize_to_struct.rs): Convert query results (Arrow ArrayRefs) into Rust structs +- [`expr_api.rs`](examples/query_planning/expr_api.rs): Create, execute, simplify, analyze and coerce `Expr`s +- [`file_stream_provider.rs`](examples/custom_data_source/file_stream_provider.rs): Run a query on `FileStreamProvider` which implements `StreamProvider` for reading and writing to arbitrary stream sources / sinks.
+- [`flight_sql_server.rs`](examples/flight/sql_server.rs): Run DataFusion as a standalone process and execute SQL queries from JDBC clients +- [`function_factory.rs`](examples/builtin_functions/function_factory.rs): Register `CREATE FUNCTION` handler to implement SQL macros +- [`json_shredding.rs`](examples/data_io/json_shredding.rs): Shows how to implement custom filter rewriting for JSON shredding +- [`memory_pool_tracking.rs`](examples/execution_monitoring/memory_pool_tracking.rs): Demonstrates TrackConsumersPool for memory tracking and debugging with enhanced error messages +- [`memory_pool_execution_plan.rs`](examples/execution_monitoring/memory_pool_execution_plan.rs): Shows how to implement memory-aware ExecutionPlan with memory reservation and spilling +- [`optimizer_rule.rs`](examples/query_planning/optimizer_rule.rs): Use a custom OptimizerRule to replace certain predicates +- [`parquet_embedded_index.rs`](examples/parquet/embedded_index.rs): Store a custom index inside a Parquet file and use it to speed up queries +- [`parquet_encrypted.rs`](examples/parquet/encrypted.rs): Read and write encrypted Parquet files using DataFusion +- [`parquet_encrypted_with_kms.rs`](examples/parquet/encrypted_with_kms.rs): Read and write encrypted Parquet files using an encryption factory +- [`parquet_index.rs`](examples/parquet/index.rs): Create a secondary index over several Parquet files and use it to speed up queries +- [`parquet_exec_visitor.rs`](examples/parquet/exec_visitor.rs): Extract statistics by visiting an ExecutionPlan after execution +- [`parse_sql_expr.rs`](examples/query_planning/parse_sql_expr.rs): Parse SQL text into DataFusion `Expr` +- [`plan_to_sql.rs`](examples/query_planning/plan_to_sql.rs): Generate SQL from DataFusion `Expr` and `LogicalPlan` +- [`planner_api.rs`](examples/query_planning/planner_api.rs): APIs to manipulate logical and physical plans +- [`pruning.rs`](examples/query_planning/pruning.rs): Use pruning to rule out files based on statistics +- [`query_aws_s3.rs`](examples/external_dependency/query_aws_s3.rs): Configure `object_store` and run a query against files stored in AWS S3 +- [`query_http_csv.rs`](examples/data_io/query_http_csv.rs): Configure `object_store` and run a query against files via HTTP +- [`regexp.rs`](examples/builtin_functions/regexp.rs): Examples of using regular expression functions +- [`remote_catalog.rs`](examples/data_io/remote_catalog.rs): Examples of interfacing with a remote catalog (e.g. over a network) +- [`simple_udaf.rs`](examples/simple_udf/udaf.rs): Define and invoke a User Defined Aggregate Function (UDAF) +- [`simple_udf.rs`](examples/simple_udf/udf.rs): Define and invoke a User Defined Scalar Function (UDF) +- [`simple_udwf.rs`](examples/simple_udf/udwf.rs): Define and invoke a User Defined Window Function (UDWF) +- [`simple_udtf.rs`](examples/simple_udf/udtf.rs): Define and invoke a User Defined Table Function (UDTF) +- [`sql_analysis.rs`](examples/sql_ops/analysis.rs): Analyse SQL queries with DataFusion structures +- [`sql_frontend.rs`](examples/sql_ops/frontend.rs): Create LogicalPlans (only) from sql strings +- [`sql_dialect.rs`](examples/sql_ops/dialect.rs): Example of implementing a custom SQL dialect on top of `DFParser` +- [`sql_query.rs`](examples/sql_ops/query.rs): Query data using SQL (in memory `RecordBatches`, local Parquet files) +- [`tracing.rs`](examples/execution_monitoring/tracing.rs): Demonstrates the tracing injection feature for the DataFusion runtime ## Distributed -- [`flight_client.rs`](examples/flight/flight_client.rs) and [`flight_server.rs`](examples/flight/flight_server.rs): Run DataFusion as a standalone process and execute SQL queries from a client using the Flight protocol. +- [`flight_client.rs`](examples/flight/client.rs) and [`flight_server.rs`](examples/flight/server.rs): Run DataFusion as a standalone process and execute SQL queries from a client using the Flight protocol.
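The README entries above use the new per-directory layout. As the usage docs in the `main.rs` files further down this diff show, each directory now builds as a single example binary and the former standalone examples are selected with a subcommand. A few invocations taken from those docs (a representative sample, not an exhaustive list):

```bash
# run the DataFrame example (the subcommand matches the old example name)
cargo run --example dataframe -- dataframe

# run the asynchronous UDF example from the advanced_udf group
cargo run --example advanced_udf -- async_udf

# run the streaming CSV example from the custom_data_source group
cargo run --example custom_data_source -- csv_sql_streaming
```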
diff --git a/datafusion-examples/examples/async_udf.rs b/datafusion-examples/examples/advanced_udf/async_udf.rs similarity index 98% rename from datafusion-examples/examples/async_udf.rs rename to datafusion-examples/examples/advanced_udf/async_udf.rs index b52ec68ea442..80fe9c7bb2f6 100644 --- a/datafusion-examples/examples/async_udf.rs +++ b/datafusion-examples/examples/advanced_udf/async_udf.rs @@ -38,79 +38,6 @@ use datafusion::prelude::{SessionConfig, SessionContext}; use std::any::Any; use std::sync::Arc; -#[tokio::main] -async fn main() -> Result<()> { - // Use a hard coded parallelism level of 4 so the explain plan - // is consistent across machines. - let config = SessionConfig::new().with_target_partitions(4); - let ctx = - SessionContext::from(SessionStateBuilder::new().with_config(config).build()); - - // Similarly to regular UDFs, you create an AsyncScalarUDF by implementing - // `AsyncScalarUDFImpl` and creating an instance of `AsyncScalarUDF`. - let async_equal = AskLLM::new(); - let udf = AsyncScalarUDF::new(Arc::new(async_equal)); - - // Async UDFs are registered with the SessionContext, using the same - // `register_udf` method as regular UDFs. - ctx.register_udf(udf.into_scalar_udf()); - - // Create a table named 'animal' with some sample data - ctx.register_batch("animal", animal()?)?; - - // You can use the async UDF as normal in SQL queries - // - // Note: Async UDFs can currently be used in the select list and filter conditions. - let results = ctx - .sql("select * from animal a where ask_llm(a.name, 'Is this animal furry?')") - .await? - .collect() - .await?; - - assert_batches_eq!( - [ - "+----+------+", - "| id | name |", - "+----+------+", - "| 1 | cat |", - "| 2 | dog |", - "+----+------+", - ], - &results - ); - - // While the interface is the same for both normal and async UDFs, you can - // use `EXPLAIN` output to see that the async UDF uses a special - // `AsyncFuncExec` node in the physical plan: - let results = ctx - .sql("explain select * from animal a where ask_llm(a.name, 'Is this animal furry?')") - .await?
- .collect() - .await?; - - assert_batches_eq!( - [ - "+---------------+--------------------------------------------------------------------------------------------------------------------------------+", - "| plan_type | plan |", - "+---------------+--------------------------------------------------------------------------------------------------------------------------------+", - "| logical_plan | SubqueryAlias: a |", - "| | Filter: ask_llm(CAST(animal.name AS Utf8View), Utf8View(\"Is this animal furry?\")) |", - "| | TableScan: animal projection=[id, name] |", - "| physical_plan | CoalesceBatchesExec: target_batch_size=8192 |", - "| | FilterExec: __async_fn_0@2, projection=[id@0, name@1] |", - "| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 |", - "| | AsyncFuncExec: async_expr=[async_expr(name=__async_fn_0, expr=ask_llm(CAST(name@1 AS Utf8View), Is this animal furry?))] |", - "| | CoalesceBatchesExec: target_batch_size=8192 |", - "| | DataSourceExec: partitions=1, partition_sizes=[1] |", - "| | |", - "+---------------+--------------------------------------------------------------------------------------------------------------------------------+", - ], - &results - ); - - Ok(()) -} - /// Returns a sample `RecordBatch` representing an "animal" table with two columns: fn animal() -> Result<RecordBatch> { let schema = Arc::new(Schema::new(vec![ @@ -235,3 +162,77 @@ impl AsyncScalarUDFImpl for AskLLM { Ok(ColumnarValue::from(Arc::new(result_array) as ArrayRef)) } } + +/// In this example we register `AskLLM` as an asynchronous user defined function +/// and invoke it via the DataFrame API and SQL +pub async fn async_udf() -> Result<()> { + // Use a hard coded parallelism level of 4 so the explain plan + // is consistent across machines. + let config = SessionConfig::new().with_target_partitions(4); + let ctx = + SessionContext::from(SessionStateBuilder::new().with_config(config).build()); + + // Similarly to regular UDFs, you create an AsyncScalarUDF by implementing + // `AsyncScalarUDFImpl` and creating an instance of `AsyncScalarUDF`. + let async_equal = AskLLM::new(); + let udf = AsyncScalarUDF::new(Arc::new(async_equal)); + + // Async UDFs are registered with the SessionContext, using the same + // `register_udf` method as regular UDFs. + ctx.register_udf(udf.into_scalar_udf()); + + // Create a table named 'animal' with some sample data + ctx.register_batch("animal", animal()?)?; + + // You can use the async UDF as normal in SQL queries + // + // Note: Async UDFs can currently be used in the select list and filter conditions. + let results = ctx + .sql("select * from animal a where ask_llm(a.name, 'Is this animal furry?')") + .await? + .collect() + .await?; + + assert_batches_eq!( + [ + "+----+------+", + "| id | name |", + "+----+------+", + "| 1 | cat |", + "| 2 | dog |", + "+----+------+", + ], + &results + ); + + // While the interface is the same for both normal and async UDFs, you can + // use `EXPLAIN` output to see that the async UDF uses a special + // `AsyncFuncExec` node in the physical plan: + let results = ctx + .sql("explain select * from animal a where ask_llm(a.name, 'Is this animal furry?')") + .await?
+ .collect() + .await?; + + assert_batches_eq!( + [ + "+---------------+--------------------------------------------------------------------------------------------------------------------------------+", + "| plan_type | plan |", + "+---------------+--------------------------------------------------------------------------------------------------------------------------------+", + "| logical_plan | SubqueryAlias: a |", + "| | Filter: ask_llm(CAST(animal.name AS Utf8View), Utf8View(\"Is this animal furry?\")) |", + "| | TableScan: animal projection=[id, name] |", + "| physical_plan | CoalesceBatchesExec: target_batch_size=8192 |", + "| | FilterExec: __async_fn_0@2, projection=[id@0, name@1] |", + "| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 |", + "| | AsyncFuncExec: async_expr=[async_expr(name=__async_fn_0, expr=ask_llm(CAST(name@1 AS Utf8View), Is this animal furry?))] |", + "| | CoalesceBatchesExec: target_batch_size=8192 |", + "| | DataSourceExec: partitions=1, partition_sizes=[1] |", + "| | |", + "+---------------+--------------------------------------------------------------------------------------------------------------------------------+", + ], + &results + ); + + Ok(()) +} diff --git a/datafusion-examples/examples/advanced_udf/main.rs b/datafusion-examples/examples/advanced_udf/main.rs new file mode 100644 index 000000000000..55ce0cee7c8f --- /dev/null +++ b/datafusion-examples/examples/advanced_udf/main.rs @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! # Advanced UDF/UDAF/UDWF/Asynchronous UDF Examples +//! +//! This example demonstrates advanced user-defined functions in DataFusion. +//! +//! ## Usage +//! ```bash +//! cargo run --example advanced_udf -- [udf|udaf|udwf|async_udf] +//! ``` +//! +//! Each subcommand runs a corresponding example: +//! - `udf` — user defined scalar function example +//! - `udaf` — user defined aggregate function example +//! - `udwf` — user defined window function example +//! 
- `async_udf` — asynchronous user defined function example + +mod async_udf; +mod udaf; +mod udf; +mod udwf; + +use std::str::FromStr; + +use datafusion::error::{DataFusionError, Result}; + +enum ExampleKind { + Udf, + Udaf, + Udwf, + AsyncUdf, +} + +impl FromStr for ExampleKind { + type Err = DataFusionError; + + fn from_str(s: &str) -> Result<Self> { + match s { + "udf" => Ok(Self::Udf), + "udaf" => Ok(Self::Udaf), + "udwf" => Ok(Self::Udwf), + "async_udf" => Ok(Self::AsyncUdf), + _ => Err(DataFusionError::Execution(format!("Unknown example: {s}"))), + } + } +} + +#[tokio::main] +async fn main() -> Result<()> { + let arg = std::env::args().nth(1).ok_or_else(|| { + eprintln!("Usage: cargo run --example advanced_udf -- [udf|udaf|udwf|async_udf]"); + DataFusionError::Execution("Missing argument".to_string()) + })?; + + match arg.parse::<ExampleKind>()? { + ExampleKind::Udf => udf::advanced_udf().await?, + ExampleKind::Udaf => udaf::advanced_udaf().await?, + ExampleKind::Udwf => udwf::advanced_udwf().await?, + ExampleKind::AsyncUdf => async_udf::async_udf().await?, + } + + Ok(()) +}
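Each grouped example directory in this diff follows the same dispatch shape as the `main.rs` just above: an `ExampleKind` enum, a `FromStr` impl so `str::parse` can select a variant, and a `match` that awaits the chosen runner. A condensed, dependency-free sketch of that pattern (the synchronous stub `main` and `String` error type here are simplifications; the real files return `datafusion::error::Result` and await async runners):

```rust
use std::str::FromStr;

// Mirrors the ExampleKind enums used by the per-directory main.rs files.
enum ExampleKind {
    Udf,
    AsyncUdf,
}

impl FromStr for ExampleKind {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "udf" => Ok(Self::Udf),
            "async_udf" => Ok(Self::AsyncUdf),
            _ => Err(format!("Unknown example: {s}")),
        }
    }
}

fn main() -> Result<(), String> {
    // parse::<ExampleKind>() routes the first CLI argument through FromStr,
    // so an unknown subcommand becomes an error instead of a panic.
    let arg = std::env::args().nth(1).ok_or("Missing argument".to_string())?;
    match arg.parse::<ExampleKind>()? {
        ExampleKind::Udf => println!("would run udf::advanced_udf().await"),
        ExampleKind::AsyncUdf => println!("would run async_udf::async_udf().await"),
    }
    Ok(())
}
```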
+ ctx.register_batch("t", batch)?; + Ok(ctx) +} + /// In this example we register `PowUdf` as a user defined function /// and invoke it via the DataFrame API and SQL -#[tokio::main] -async fn main() -> Result<()> { +pub async fn advanced_udf() -> Result<()> { let ctx = create_context()?; - // create the UDF let pow = ScalarUDF::from(PowUdf::new()); @@ -292,32 +316,5 @@ async fn main() -> Result<()> { err.to_string(), "Execution error: Could not reuse array for maybe_pow_in_place" ); - Ok(()) } - -/// create local execution context with an in-memory table: -/// -/// ```text -/// +-----+-----+ -/// | a | b | -/// +-----+-----+ -/// | 2.1 | 1.0 | -/// | 3.1 | 2.0 | -/// | 4.1 | 3.0 | -/// | 5.1 | 4.0 | -/// +-----+-----+ -/// ``` -fn create_context() -> Result { - // define data. - let a: ArrayRef = Arc::new(Float32Array::from(vec![2.1, 3.1, 4.1, 5.1])); - let b: ArrayRef = Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0])); - let batch = RecordBatch::try_from_iter(vec![("a", a), ("b", b)])?; - - // declare a new context. In Spark API, this corresponds to a new SparkSession - let ctx = SessionContext::new(); - - // declare a table in memory. In Spark API, this corresponds to createDataFrame(...). - ctx.register_batch("t", batch)?; - Ok(ctx) -} diff --git a/datafusion-examples/examples/advanced_udwf.rs b/datafusion-examples/examples/advanced_udf/udwf.rs similarity index 98% rename from datafusion-examples/examples/advanced_udwf.rs rename to datafusion-examples/examples/advanced_udf/udwf.rs index ba4c377fd676..86f215e019c7 100644 --- a/datafusion-examples/examples/advanced_udwf.rs +++ b/datafusion-examples/examples/advanced_udf/udwf.rs @@ -236,8 +236,9 @@ async fn create_context() -> Result { Ok(ctx) } -#[tokio::main] -async fn main() -> Result<()> { +/// In this example we register `SmoothItUdf` as user defined window function +/// and invoke it via the DataFrame API and SQL +pub async fn advanced_udwf() -> Result<()> { let ctx = create_context().await?; let smooth_it = WindowUDF::from(SmoothItUdf::new()); ctx.register_udwf(smooth_it.clone()); diff --git a/datafusion-examples/examples/date_time_functions.rs b/datafusion-examples/examples/builtin_functions/date_time.rs similarity index 99% rename from datafusion-examples/examples/date_time_functions.rs rename to datafusion-examples/examples/builtin_functions/date_time.rs index 2628319ae31f..14bb9189f3e8 100644 --- a/datafusion-examples/examples/date_time_functions.rs +++ b/datafusion-examples/examples/builtin_functions/date_time.rs @@ -26,8 +26,7 @@ use datafusion::common::assert_contains; use datafusion::error::Result; use datafusion::prelude::*; -#[tokio::main] -async fn main() -> Result<()> { +pub async fn date_time() -> Result<()> { query_make_date().await?; query_to_date().await?; query_to_timestamp().await?; diff --git a/datafusion-examples/examples/function_factory.rs b/datafusion-examples/examples/builtin_functions/function_factory.rs similarity index 99% rename from datafusion-examples/examples/function_factory.rs rename to datafusion-examples/examples/builtin_functions/function_factory.rs index d4312ae59409..b290ade884a4 100644 --- a/datafusion-examples/examples/function_factory.rs +++ b/datafusion-examples/examples/builtin_functions/function_factory.rs @@ -42,8 +42,7 @@ use std::sync::Arc; /// /// This example is rather simple and does not cover all cases required for a /// real implementation. 
-#[tokio::main] -async fn main() -> Result<()> { +pub async fn function_factory() -> Result<()> { // First we must configure the SessionContext with our function factory let ctx = SessionContext::new() // register custom function factory @@ -181,6 +180,7 @@ impl ScalarFunctionWrapper { Ok(result.data) } + + // Finds placeholder identifiers of the form `$X`, where X >= 1 fn parse_placeholder_identifier(placeholder: &str) -> Result<usize> { if let Some(value) = placeholder.strip_prefix('$') { diff --git a/datafusion-examples/examples/builtin_functions/main.rs b/datafusion-examples/examples/builtin_functions/main.rs new file mode 100644 index 000000000000..135277eec84c --- /dev/null +++ b/datafusion-examples/examples/builtin_functions/main.rs @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! # These are miscellaneous function-related examples +//! +//! This example demonstrates miscellaneous function-related features. +//! +//! ## Usage +//! ```bash +//! cargo run --example builtin_functions -- [date_time|function_factory|regexp] +//! ``` +//! +//! Each subcommand runs a corresponding example: +//! - `date_time` — examples of date-time related functions and queries +//! - `function_factory` — register `CREATE FUNCTION` handler to implement SQL macros +//! - `regexp` — examples of using regular expression functions + +mod date_time; +mod function_factory; +mod regexp; + +use std::str::FromStr; + +use datafusion::error::{DataFusionError, Result}; + +enum ExampleKind { + DateTime, + FunctionFactory, + Regexp, +} + +impl FromStr for ExampleKind { + type Err = DataFusionError; + + fn from_str(s: &str) -> Result<Self> { + match s { + "date_time" => Ok(Self::DateTime), + "function_factory" => Ok(Self::FunctionFactory), + "regexp" => Ok(Self::Regexp), + _ => Err(DataFusionError::Execution(format!("Unknown example: {s}"))), + } + } +} + +#[tokio::main] +async fn main() -> Result<()> { + let arg = std::env::args().nth(1).ok_or_else(|| { + eprintln!("Usage: cargo run --example builtin_functions -- [date_time|function_factory|regexp]"); + DataFusionError::Execution("Missing argument".to_string()) + })?; + + match arg.parse::<ExampleKind>()?
{ + ExampleKind::DateTime => date_time::date_time().await?, + ExampleKind::FunctionFactory => function_factory::function_factory().await?, + ExampleKind::Regexp => regexp::regexp().await?, + } + + Ok(()) +} diff --git a/datafusion-examples/examples/regexp.rs b/datafusion-examples/examples/builtin_functions/regexp.rs similarity index 99% rename from datafusion-examples/examples/regexp.rs rename to datafusion-examples/examples/builtin_functions/regexp.rs index 12d115b9b502..5d04418a5566 100644 --- a/datafusion-examples/examples/regexp.rs +++ b/datafusion-examples/examples/builtin_functions/regexp.rs @@ -28,8 +28,7 @@ use datafusion::prelude::*; /// /// Supported flags can be found at /// https://docs.rs/regex/latest/regex/#grouping-and-flags -#[tokio::main] -async fn main() -> Result<()> { +pub async fn regexp() -> Result<()> { let ctx = SessionContext::new(); ctx.register_csv( "examples", diff --git a/datafusion-examples/examples/csv_json_opener.rs b/datafusion-examples/examples/custom_data_source/csv_json_opener.rs similarity index 98% rename from datafusion-examples/examples/csv_json_opener.rs rename to datafusion-examples/examples/custom_data_source/csv_json_opener.rs index 8abed90238d4..efb94d209332 100644 --- a/datafusion-examples/examples/csv_json_opener.rs +++ b/datafusion-examples/examples/custom_data_source/csv_json_opener.rs @@ -38,9 +38,8 @@ use object_store::{local::LocalFileSystem, memory::InMemory, ObjectStore}; /// This example demonstrates using the low level [`FileStream`] / [`FileOpener`] APIs to directly /// read data from (CSV/JSON) into Arrow RecordBatches. /// -/// If you want to query data in CSV or JSON files, see the [`dataframe.rs`] and [`sql_query.rs`] examples -#[tokio::main] -async fn main() -> Result<()> { +/// If you want to query data in CSV or JSON files, see the [`dataframe/dataframe.rs`] and [`sql/query.rs`] examples +pub async fn csv_json_opener() -> Result<()> { csv_opener().await?; json_opener().await?; Ok(()) diff --git a/datafusion-examples/examples/csv_sql_streaming.rs b/datafusion-examples/examples/custom_data_source/csv_sql_streaming.rs similarity index 98% rename from datafusion-examples/examples/csv_sql_streaming.rs rename to datafusion-examples/examples/custom_data_source/csv_sql_streaming.rs index 99264bbcb486..aca63c4f35c2 100644 --- a/datafusion-examples/examples/csv_sql_streaming.rs +++ b/datafusion-examples/examples/custom_data_source/csv_sql_streaming.rs @@ -21,8 +21,7 @@ use datafusion::prelude::*; /// This example demonstrates executing a simple query against an Arrow data source (CSV) and /// fetching results with streaming aggregation and streaming window -#[tokio::main] -async fn main() -> Result<()> { +pub async fn csv_sql_streaming() -> Result<()> { // create local execution context let ctx = SessionContext::new(); diff --git a/datafusion-examples/examples/custom_datasource.rs b/datafusion-examples/examples/custom_data_source/custom_datasource.rs similarity index 99% rename from datafusion-examples/examples/custom_datasource.rs rename to datafusion-examples/examples/custom_data_source/custom_datasource.rs index bc865fac5a33..298ad71edcba 100644 --- a/datafusion-examples/examples/custom_datasource.rs +++ b/datafusion-examples/examples/custom_data_source/custom_datasource.rs @@ -42,8 +42,7 @@ use datafusion::catalog::Session; use tokio::time::timeout; /// This example demonstrates executing a simple query against a custom datasource -#[tokio::main] -async fn main() -> Result<()> { +pub async fn custom_data_source() -> Result<()> { // 
create our custom datasource and add some users let db = CustomDataSource::default(); db.populate_users(); diff --git a/datafusion-examples/examples/custom_file_casts.rs b/datafusion-examples/examples/custom_data_source/custom_file_casts.rs similarity index 99% rename from datafusion-examples/examples/custom_file_casts.rs rename to datafusion-examples/examples/custom_data_source/custom_file_casts.rs index 4d97ecd91dc6..31ec2845c611 100644 --- a/datafusion-examples/examples/custom_file_casts.rs +++ b/datafusion-examples/examples/custom_data_source/custom_file_casts.rs @@ -44,9 +44,7 @@ use object_store::{ObjectStore, PutPayload}; // This example enforces that casts must be strictly widening: if the file type is Int64 and the table type is Int32, it will error // before even reading the data. // Without this custom cast rule DataFusion would happily do the narrowing cast, potentially erroring only if it found a row with data it could not cast. - -#[tokio::main] -async fn main() -> Result<()> { +pub async fn custom_file_casts() -> Result<()> { println!("=== Creating example data ==="); // Create a logical / table schema with an Int32 column diff --git a/datafusion-examples/examples/custom_file_format.rs b/datafusion-examples/examples/custom_data_source/custom_file_format.rs similarity index 98% rename from datafusion-examples/examples/custom_file_format.rs rename to datafusion-examples/examples/custom_data_source/custom_file_format.rs index 67fe642fd46e..7f6ba47f19e4 100644 --- a/datafusion-examples/examples/custom_file_format.rs +++ b/datafusion-examples/examples/custom_data_source/custom_file_format.rs @@ -180,8 +180,27 @@ impl GetExt for TSVFileFactory { } } -#[tokio::main] -async fn main() -> Result<()> { +// create a simple mem table +fn create_mem_table() -> Arc<dyn TableProvider> { + let fields = vec![ + Field::new("id", DataType::UInt8, false), + Field::new("data", DataType::Utf8, false), + ]; + let schema = Arc::new(Schema::new(fields)); + + let partitions = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt8Array::from(vec![1, 2])), + Arc::new(StringArray::from(vec!["foo", "bar"])), + ], + ) + .unwrap(); + + Arc::new(MemTable::try_new(schema, vec![vec![partitions]]).unwrap()) +} + +pub async fn custom_file_format() -> Result<()> { // Create a new context with the default configuration let mut state = SessionStateBuilder::new().with_default_features().build(); @@ -198,14 +217,14 @@ async fn main() -> Result<()> { let temp_dir = tempdir().unwrap(); let table_save_path = temp_dir.path().join("mem_table.tsv"); - let d = ctx + let df = ctx .sql(&format!( "COPY mem_table TO '{}' STORED AS TSV;", table_save_path.display(), )) .await?; - let results = d.collect().await?; + let results = df.collect().await?; println!( "Number of inserted rows: {:?}", (results[0] @@ -217,23 +236,3 @@ async fn main() -> Result<()> { Ok(()) } - -// create a simple mem table -fn create_mem_table() -> Arc<dyn TableProvider> { - let fields = vec![ - Field::new("id", DataType::UInt8, false), - Field::new("data", DataType::Utf8, false), - ]; - let schema = Arc::new(Schema::new(fields)); - - let partitions = RecordBatch::try_new( - schema.clone(), - vec![ - Arc::new(UInt8Array::from(vec![1, 2])), - Arc::new(StringArray::from(vec!["foo", "bar"])), - ], - ) - .unwrap(); - - Arc::new(MemTable::try_new(schema, vec![vec![partitions]]).unwrap()) -} diff --git a/datafusion-examples/examples/file_stream_provider.rs b/datafusion-examples/examples/custom_data_source/file_stream_provider.rs similarity index 99% rename from
datafusion-examples/examples/file_stream_provider.rs rename to datafusion-examples/examples/custom_data_source/file_stream_provider.rs index e6c59d57e98d..72114af0b7fc 100644 --- a/datafusion-examples/examples/file_stream_provider.rs +++ b/datafusion-examples/examples/custom_data_source/file_stream_provider.rs @@ -187,8 +187,7 @@ mod non_windows { } } -#[tokio::main] -async fn main() -> datafusion::error::Result<()> { +pub async fn file_stream_provider() -> datafusion::error::Result<()> { #[cfg(target_os = "windows")] { println!("file_stream_provider example does not work on windows"); diff --git a/datafusion-examples/examples/custom_data_source/main.rs b/datafusion-examples/examples/custom_data_source/main.rs new file mode 100644 index 000000000000..3a9c3fad4007 --- /dev/null +++ b/datafusion-examples/examples/custom_data_source/main.rs @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! # These examples are all related to extending or defining how DataFusion reads data +//! +//! This example demonstrates how DataFusion reads data. +//! +//! ## Usage +//! ```bash +//! cargo run --example custom_data_source -- [csv_json_opener|csv_sql_streaming|custom_datasource|custom_file_casts|custom_file_format|file_stream_provider] +//! ``` +//! +//! Each subcommand runs a corresponding example: +//! - `csv_json_opener` — use low level FileOpener APIs to read CSV/JSON into Arrow RecordBatches +//! - `csv_sql_streaming` — build and run a streaming query plan from a SQL statement against a local CSV file +//! - `custom_datasource` — run queries against a custom datasource (TableProvider) +//! - `custom_file_casts` — implement custom casting rules to adapt file schemas +//! - `custom_file_format` — write data to a custom file format +//! 
- `file_stream_provider` — run a query on FileStreamProvider which implements StreamProvider for reading and writing to arbitrary stream sources / sinks + +mod csv_json_opener; +mod csv_sql_streaming; +mod custom_datasource; +mod custom_file_casts; +mod custom_file_format; +mod file_stream_provider; + +use std::str::FromStr; + +use datafusion::error::{DataFusionError, Result}; + +enum ExampleKind { + CsvJsonOpener, + CsvSqlStreaming, + CustomDatasource, + CustomFileCasts, + CustomFileFormat, + FileStreamProvider, +} + +impl FromStr for ExampleKind { + type Err = DataFusionError; + + fn from_str(s: &str) -> Result<Self> { + match s { + "csv_json_opener" => Ok(Self::CsvJsonOpener), + "csv_sql_streaming" => Ok(Self::CsvSqlStreaming), + "custom_datasource" => Ok(Self::CustomDatasource), + "custom_file_casts" => Ok(Self::CustomFileCasts), + "custom_file_format" => Ok(Self::CustomFileFormat), + "file_stream_provider" => Ok(Self::FileStreamProvider), + _ => Err(DataFusionError::Execution(format!("Unknown example: {s}"))), + } + } +} + +#[tokio::main] +async fn main() -> Result<()> { + let arg = std::env::args().nth(1).ok_or_else(|| { + eprintln!("Usage: cargo run --example custom_data_source -- [csv_json_opener|csv_sql_streaming|custom_datasource|custom_file_casts|custom_file_format|file_stream_provider]"); + DataFusionError::Execution("Missing argument".to_string()) + })?; + + match arg.parse::<ExampleKind>()? { + ExampleKind::CsvJsonOpener => csv_json_opener::csv_json_opener().await?, + ExampleKind::CsvSqlStreaming => csv_sql_streaming::csv_sql_streaming().await?, + ExampleKind::CustomDatasource => custom_datasource::custom_data_source().await?, + ExampleKind::CustomFileCasts => custom_file_casts::custom_file_casts().await?, + ExampleKind::CustomFileFormat => custom_file_format::custom_file_format().await?, + ExampleKind::FileStreamProvider => { + file_stream_provider::file_stream_provider().await? + } + } + + Ok(()) +} diff --git a/datafusion-examples/examples/catalog.rs b/datafusion-examples/examples/data_io/catalog.rs similarity index 99% rename from datafusion-examples/examples/catalog.rs rename to datafusion-examples/examples/data_io/catalog.rs index 229867cdfc5b..f591d7da9e10 100644 --- a/datafusion-examples/examples/catalog.rs +++ b/datafusion-examples/examples/data_io/catalog.rs @@ -34,8 +34,7 @@ use std::{any::Any, collections::HashMap, path::Path, sync::Arc}; use std::{fs::File, io::Write}; use tempfile::TempDir; -#[tokio::main] -async fn main() -> Result<()> { +pub async fn catalog() -> Result<()> { env_logger::builder() .filter_level(log::LevelFilter::Info) .init(); @@ -134,12 +133,14 @@ struct DirSchemaOpts<'a> { dir: &'a Path, format: Arc<dyn FileFormat>, } + /// Schema where every file with extension `ext` in a given `dir` is a table.
#[derive(Debug)] struct DirSchema { ext: String, tables: RwLock<HashMap<String, Arc<dyn TableProvider>>>, } + impl DirSchema { async fn create(state: &SessionState, opts: DirSchemaOpts<'_>) -> Result<Arc<Self>> { let DirSchemaOpts { ext, dir, format } = opts; @@ -172,6 +173,7 @@ impl DirSchema { ext: ext.to_string(), })) } + #[allow(unused)] fn name(&self) -> &str { &self.ext @@ -198,6 +200,7 @@ impl SchemaProvider for DirSchema { let tables = self.tables.read().unwrap(); tables.contains_key(name) } + fn register_table( &self, name: String, @@ -218,11 +221,13 @@ impl SchemaProvider for DirSchema { Ok(tables.remove(name)) } } + /// Catalog holds multiple schemas #[derive(Debug)] struct DirCatalog { schemas: RwLock<HashMap<String, Arc<dyn SchemaProvider>>>, } + impl DirCatalog { fn new() -> Self { Self { @@ -230,10 +235,12 @@ impl DirCatalog { } } } + impl CatalogProvider for DirCatalog { fn as_any(&self) -> &dyn Any { self } + fn register_schema( &self, name: &str, @@ -260,11 +267,13 @@ impl CatalogProvider for DirCatalog { } } } + /// Catalog list holds multiple catalog providers. Each context has a single catalog list. #[derive(Debug)] struct CustomCatalogProviderList { catalogs: RwLock<HashMap<String, Arc<dyn CatalogProvider>>>, } + impl CustomCatalogProviderList { fn new() -> Self { Self { @@ -272,10 +281,12 @@ impl CustomCatalogProviderList { } } } + impl CatalogProviderList for CustomCatalogProviderList { fn as_any(&self) -> &dyn Any { self } + fn register_catalog( &self, name: String, diff --git a/datafusion-examples/examples/json_shredding.rs b/datafusion-examples/examples/data_io/json_shredding.rs similarity index 99% rename from datafusion-examples/examples/json_shredding.rs rename to datafusion-examples/examples/data_io/json_shredding.rs index a2e83bc9510a..62dc971ae40d 100644 --- a/datafusion-examples/examples/json_shredding.rs +++ b/datafusion-examples/examples/data_io/json_shredding.rs @@ -63,8 +63,7 @@ use object_store::{ObjectStore, PutPayload}; // 1. Push down predicates for better filtering // 2. Avoid expensive JSON parsing at query time // 3. Leverage columnar storage benefits for the materialized fields -#[tokio::main] -async fn main() -> Result<()> { +pub async fn json_shredding() -> Result<()> { println!("=== Creating example data with flat columns and underscore prefixes ==="); // Create sample data with flat columns using underscore prefixes
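`DirSchema`, `DirCatalog`, and `CustomCatalogProviderList` in the catalog example above all share one shape: a `RwLock<HashMap<String, Arc<dyn …>>>` registry behind `&self`, so providers can be registered through a shared `Arc` without exclusive ownership. A minimal, self-contained sketch of that interior-mutability pattern (the `TableLike` trait here is a hypothetical stand-in for DataFusion's `TableProvider`/`SchemaProvider` traits):

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

// Hypothetical stand-in for Arc<dyn TableProvider> and friends.
trait TableLike: Send + Sync {}

struct Registry {
    // The same RwLock<HashMap<..>> shape DirSchema / DirCatalog use.
    tables: RwLock<HashMap<String, Arc<dyn TableLike>>>,
}

impl Registry {
    fn new() -> Self {
        Self { tables: RwLock::new(HashMap::new()) }
    }

    // Takes &self: the RwLock supplies interior mutability, which is what
    // lets a provider shared via Arc accept registrations.
    fn register(&self, name: String, table: Arc<dyn TableLike>) -> Option<Arc<dyn TableLike>> {
        self.tables.write().unwrap().insert(name, table)
    }

    fn get(&self, name: &str) -> Option<Arc<dyn TableLike>> {
        self.tables.read().unwrap().get(name).cloned()
    }
}

struct CsvTable;
impl TableLike for CsvTable {}

fn main() {
    let registry = Arc::new(Registry::new());
    registry.register("t".to_string(), Arc::new(CsvTable));
    assert!(registry.get("t").is_some());
}
```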
diff --git a/datafusion-examples/examples/data_io/main.rs b/datafusion-examples/examples/data_io/main.rs new file mode 100644 index 000000000000..e419bc9b9947 --- /dev/null +++ b/datafusion-examples/examples/data_io/main.rs @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! # Examples of data formats and I/O +//! +//! This example demonstrates data formats and I/O. +//! +//! ## Usage +//! ```bash +//! cargo run --example data_io -- [catalog|json_shredding|query_http_csv|remote_catalog] +//! ``` +//! +//! Each subcommand runs a corresponding example: +//! - `catalog` — register the table into a custom catalog +//! - `json_shredding` — shows how to implement custom filter rewriting for JSON shredding +//! - `query_http_csv` — demonstrates executing a simple query against an Arrow data source (CSV) and fetching results +//! - `remote_catalog` — interfacing with a remote catalog (e.g. over a network) + +mod catalog; +mod json_shredding; +mod query_http_csv; +mod remote_catalog; + +use std::str::FromStr; + +use datafusion::error::{DataFusionError, Result}; + +enum ExampleKind { + Catalog, + JsonShredding, + QueryHttpCsv, + RemoteCatalog, +} + +impl FromStr for ExampleKind { + type Err = DataFusionError; + + fn from_str(s: &str) -> Result<Self> { + match s { + "catalog" => Ok(Self::Catalog), + "json_shredding" => Ok(Self::JsonShredding), + "query_http_csv" => Ok(Self::QueryHttpCsv), + "remote_catalog" => Ok(Self::RemoteCatalog), + _ => Err(DataFusionError::Execution(format!("Unknown example: {s}"))), + } + } +} + +#[tokio::main] +async fn main() -> Result<()> { + let arg = std::env::args().nth(1).ok_or_else(|| { + eprintln!("Usage: cargo run --example data_io -- [catalog|json_shredding|query_http_csv|remote_catalog]"); + DataFusionError::Execution("Missing argument".to_string()) + })?; + + match arg.parse::<ExampleKind>()? { + ExampleKind::Catalog => catalog::catalog().await?, + ExampleKind::JsonShredding => json_shredding::json_shredding().await?, + ExampleKind::QueryHttpCsv => query_http_csv::query_http_csv().await?, + ExampleKind::RemoteCatalog => remote_catalog::remote_catalog().await?, + } + + Ok(()) +} diff --git a/datafusion-examples/examples/query-http-csv.rs b/datafusion-examples/examples/data_io/query_http_csv.rs similarity index 97% rename from datafusion-examples/examples/query-http-csv.rs rename to datafusion-examples/examples/data_io/query_http_csv.rs index fa3fd2ac068d..0248051589ca 100644 --- a/datafusion-examples/examples/query-http-csv.rs +++ b/datafusion-examples/examples/data_io/query_http_csv.rs @@ -23,8 +23,7 @@ use url::Url; /// This example demonstrates executing a simple query against an Arrow data source (CSV) and /// fetching results -#[tokio::main] -async fn main() -> Result<()> { +pub async fn query_http_csv() -> Result<()> { // create local execution context let ctx = SessionContext::new(); diff --git a/datafusion-examples/examples/remote_catalog.rs b/datafusion-examples/examples/data_io/remote_catalog.rs similarity index 99% rename from datafusion-examples/examples/remote_catalog.rs rename to datafusion-examples/examples/data_io/remote_catalog.rs index 74575554ec0a..b50e72ccbdbe 100644 --- a/datafusion-examples/examples/remote_catalog.rs +++ b/datafusion-examples/examples/data_io/remote_catalog.rs @@ -46,8 +46,7 @@ use futures::TryStreamExt; use std::any::Any; use std::sync::Arc; -#[tokio::main] -async fn main() -> Result<()> { +pub async fn remote_catalog() -> Result<()> { // As always, we create a session context to interact with DataFusion let ctx = SessionContext::new(); diff --git a/datafusion-examples/examples/dataframe.rs b/datafusion-examples/examples/dataframe/dataframe.rs similarity index 99% rename from datafusion-examples/examples/dataframe.rs rename to datafusion-examples/examples/dataframe/dataframe.rs index a5ee571a1476..2ae3fc295b04 100644 --- a/datafusion-examples/examples/dataframe.rs +++ b/datafusion-examples/examples/dataframe/dataframe.rs @@ -57,8 +57,7 @@ use
tempfile::tempdir; /// # Querying data /// /// * [query_to_date]: execute queries against parquet files -#[tokio::main] -async fn main() -> Result<()> { +pub async fn dataframe() -> Result<()> { env_logger::init(); // The SessionContext is the main high level API for interacting with DataFusion let ctx = SessionContext::new(); diff --git a/datafusion-examples/examples/default_column_values.rs b/datafusion-examples/examples/dataframe/default_column_values.rs similarity index 99% rename from datafusion-examples/examples/default_column_values.rs rename to datafusion-examples/examples/dataframe/default_column_values.rs index d3a7d2ec67f3..3f516eb93fd9 100644 --- a/datafusion-examples/examples/default_column_values.rs +++ b/datafusion-examples/examples/dataframe/default_column_values.rs @@ -69,8 +69,7 @@ const DEFAULT_VALUE_METADATA_KEY: &str = "example.default_value"; // The metadata-based approach provides a flexible way to store default values as strings // and cast them to the appropriate types at query time. -#[tokio::main] -async fn main() -> Result<()> { +pub async fn default_column_values() -> Result<()> { println!("=== Creating example data with missing columns and default values ==="); // Create sample data where the logical schema has more columns than the physical schema diff --git a/datafusion-examples/examples/deserialize_to_struct.rs b/datafusion-examples/examples/dataframe/deserialize_to_struct.rs similarity index 98% rename from datafusion-examples/examples/deserialize_to_struct.rs rename to datafusion-examples/examples/dataframe/deserialize_to_struct.rs index d6655b3b654f..e525e4d8efb1 100644 --- a/datafusion-examples/examples/deserialize_to_struct.rs +++ b/datafusion-examples/examples/dataframe/deserialize_to_struct.rs @@ -29,8 +29,7 @@ use futures::StreamExt; /// as [ArrayRef] /// /// [ArrayRef]: arrow::array::ArrayRef -#[tokio::main] -async fn main() -> Result<()> { +pub async fn deserialize_to_struct() -> Result<()> { // Run a query that returns two columns of data let ctx = SessionContext::new(); let testdata = datafusion::test_util::parquet_test_data(); diff --git a/datafusion-examples/examples/dataframe/main.rs b/datafusion-examples/examples/dataframe/main.rs new file mode 100644 index 000000000000..7f0f052e835c --- /dev/null +++ b/datafusion-examples/examples/dataframe/main.rs @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! # These are core DataFrame API usage +//! +//! This example demonstrates core DataFrame API usage. +//! +//! ## Usage +//! ```bash +//! cargo run --example dataframe -- [dataframe|default_column_values|deserialize_to_struct] +//! ``` +//! +//! Each subcommand runs a corresponding example: +//! 
- `dataframe` — run a query using a DataFrame API against parquet files, csv files, and in-memory data, including multiple subqueries +//! - `default_column_values` — implement custom default value handling for missing columns using field metadata and PhysicalExprAdapter +//! - `deserialize_to_struct` — convert query results (Arrow ArrayRefs) into Rust structs + +mod dataframe; +mod default_column_values; +mod deserialize_to_struct; + +use std::str::FromStr; + +use datafusion::error::{DataFusionError, Result}; + +enum ExampleKind { + Dataframe, + DefaultColumnValues, + DeserializeToStruct, +} + +impl FromStr for ExampleKind { + type Err = DataFusionError; + + fn from_str(s: &str) -> Result<Self> { + match s { + "dataframe" => Ok(Self::Dataframe), + "default_column_values" => Ok(Self::DefaultColumnValues), + "deserialize_to_struct" => Ok(Self::DeserializeToStruct), + _ => Err(DataFusionError::Execution(format!("Unknown example: {s}"))), + } + } +} + +#[tokio::main] +async fn main() -> Result<()> { + let arg = std::env::args().nth(1).ok_or_else(|| { + eprintln!("Usage: cargo run --example dataframe -- [dataframe|default_column_values|deserialize_to_struct]"); + DataFusionError::Execution("Missing argument".to_string()) + })?; + + match arg.parse::<ExampleKind>()? { + ExampleKind::Dataframe => dataframe::dataframe().await?, + ExampleKind::DefaultColumnValues => { + default_column_values::default_column_values().await? + } + ExampleKind::DeserializeToStruct => { + deserialize_to_struct::deserialize_to_struct().await? + } + } + + Ok(()) +} diff --git a/datafusion-examples/examples/execution_monitoring/main.rs b/datafusion-examples/examples/execution_monitoring/main.rs new file mode 100644 index 000000000000..696951409c2c --- /dev/null +++ b/datafusion-examples/examples/execution_monitoring/main.rs @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! # Examples of memory and performance management +//! +//! This example demonstrates memory and performance management. +//! +//! ## Usage +//! ```bash +//! cargo run --example execution_monitoring -- [mem_pool_exec_plan|mem_pool_tracking|tracing] +//! ``` +//! +//! Each subcommand runs a corresponding example: +//! - `mem_pool_exec_plan` — shows how to implement memory-aware ExecutionPlan with memory reservation and spilling +//! - `mem_pool_tracking` — demonstrates TrackConsumersPool for memory tracking and debugging with enhanced error messages +//!
- `tracing` — demonstrates the tracing injection feature for the DataFusion runtime + +mod memory_pool_execution_plan; +mod memory_pool_tracking; +mod tracing; + +use std::str::FromStr; + +use datafusion::error::{DataFusionError, Result}; + +enum ExampleKind { + MemoryPoolExecutionPlan, + MemoryPoolTracking, + Tracing, +} + +impl FromStr for ExampleKind { + type Err = DataFusionError; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s { + "mem_pool_exec_plan" => Ok(Self::MemoryPoolExecutionPlan), + "mem_pool_tracking" => Ok(Self::MemoryPoolTracking), + "tracing" => Ok(Self::Tracing), + _ => Err(DataFusionError::Execution(format!("Unknown example: {s}"))), + } + } +} + +#[tokio::main] +async fn main() -> Result<()> { + let arg = std::env::args().nth(1).ok_or_else(|| { + eprintln!("Usage: cargo run --example execution_monitoring -- [mem_pool_exec_plan|mem_pool_tracking|tracing]"); + DataFusionError::Execution("Missing argument".to_string()) + })?; + + match arg.parse::<ExampleKind>()? { + ExampleKind::MemoryPoolExecutionPlan => { + memory_pool_execution_plan::memory_pool_execution_plan().await? + } + ExampleKind::MemoryPoolTracking => { + memory_pool_tracking::mem_pool_tracking().await? + } + ExampleKind::Tracing => tracing::tracing().await?, + } + + Ok(()) +} diff --git a/datafusion-examples/examples/memory_pool_execution_plan.rs b/datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs similarity index 99% rename from datafusion-examples/examples/memory_pool_execution_plan.rs rename to datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs index 3258cde17625..afa0057ceb13 100644 --- a/datafusion-examples/examples/memory_pool_execution_plan.rs +++ b/datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs @@ -44,8 +44,7 @@ use std::any::Any; use std::fmt; use std::sync::Arc; -#[tokio::main] -async fn main() -> Result<(), Box<dyn std::error::Error>> { +pub async fn memory_pool_execution_plan() -> Result<()> { println!("=== DataFusion ExecutionPlan Memory Tracking Example ===\n"); // Set up a runtime with memory tracking diff --git a/datafusion-examples/examples/memory_pool_tracking.rs b/datafusion-examples/examples/execution_monitoring/memory_pool_tracking.rs similarity index 97% rename from datafusion-examples/examples/memory_pool_tracking.rs rename to datafusion-examples/examples/execution_monitoring/memory_pool_tracking.rs index d5823b1173ab..bd88f441250a 100644 --- a/datafusion-examples/examples/memory_pool_tracking.rs +++ b/datafusion-examples/examples/execution_monitoring/memory_pool_tracking.rs @@ -24,11 +24,11 @@ //! //! * [`automatic_usage_example`]: Shows how to use RuntimeEnvBuilder to automatically enable memory tracking +use datafusion::error::Result; use datafusion::execution::runtime_env::RuntimeEnvBuilder; use datafusion::prelude::*; -#[tokio::main] -async fn main() -> Result<(), Box<dyn std::error::Error>> { +pub async fn mem_pool_tracking() -> Result<()> { println!("=== DataFusion Memory Pool Tracking Example ===\n"); // Example 1: Automatic Usage with RuntimeEnvBuilder @@ -41,7 +41,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { /// /// This shows the recommended way to use TrackConsumersPool through RuntimeEnvBuilder, /// which automatically creates a TrackConsumersPool with sensible defaults.
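(Aside, not part of the diff: a minimal sketch of what the renamed `mem_pool_tracking` example exercises, assuming the `RuntimeEnvBuilder` and `SessionContext` APIs already used in these files; the 64 MiB cap is illustrative.)

```rust
use datafusion::error::Result;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::prelude::*;

async fn run_with_memory_cap() -> Result<()> {
    // Cap the memory pool at 64 MiB; with TrackConsumersPool enabled,
    // queries that exceed the cap fail with the top consumers named.
    let runtime = RuntimeEnvBuilder::new()
        .with_memory_limit(64 * 1024 * 1024, 1.0)
        .build_arc()?;
    let ctx = SessionContext::new_with_config_rt(SessionConfig::new(), runtime);
    ctx.sql("SELECT 1").await?.show().await
}
```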
-async fn automatic_usage_example() -> datafusion::error::Result<()> { +async fn automatic_usage_example() -> Result<()> { println!("Example 1: Automatic Usage with RuntimeEnvBuilder"); println!("------------------------------------------------"); diff --git a/datafusion-examples/examples/tracing.rs b/datafusion-examples/examples/execution_monitoring/tracing.rs similarity index 99% rename from datafusion-examples/examples/tracing.rs rename to datafusion-examples/examples/execution_monitoring/tracing.rs index 334ee0f4e568..29229d909024 100644 --- a/datafusion-examples/examples/tracing.rs +++ b/datafusion-examples/examples/execution_monitoring/tracing.rs @@ -61,8 +61,7 @@ use std::any::Any; use std::sync::Arc; use tracing::{info, instrument, Instrument, Level, Span}; -#[tokio::main] -async fn main() -> Result<()> { +pub async fn tracing() -> Result<()> { // Initialize tracing subscriber with thread info. tracing_subscriber::fmt() .with_thread_ids(true) diff --git a/datafusion-examples/examples/external_dependency/dataframe-to-s3.rs b/datafusion-examples/examples/external_dependency/dataframe_to_s3.rs similarity index 98% rename from datafusion-examples/examples/external_dependency/dataframe-to-s3.rs rename to datafusion-examples/examples/external_dependency/dataframe_to_s3.rs index e75ba5dd5328..83de1d897369 100644 --- a/datafusion-examples/examples/external_dependency/dataframe-to-s3.rs +++ b/datafusion-examples/examples/external_dependency/dataframe_to_s3.rs @@ -30,8 +30,7 @@ use url::Url; /// This example demonstrates querying data from AmazonS3 and writing /// the result of a query back to AmazonS3 -#[tokio::main] -async fn main() -> Result<()> { +pub async fn dataframe_to_s3() -> Result<()> { // create local execution context let ctx = SessionContext::new(); diff --git a/datafusion-examples/examples/external_dependency/main.rs b/datafusion-examples/examples/external_dependency/main.rs new file mode 100644 index 000000000000..3293371ff283 --- /dev/null +++ b/datafusion-examples/examples/external_dependency/main.rs @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! # Examples using data from Amazon S3 +//! +//! This example demonstrates how to work with data from Amazon S3. +//! +//! ## Usage +//! ```bash +//! cargo run --example external_dependency -- [dataframe_to_s3|query_aws_s3] +//! ``` +//! +//! Each subcommand runs a corresponding example: +//! - `dataframe_to_s3` — run a query using a DataFrame against a parquet file from AWS S3 and write the results back to AWS S3 +//!
- `query_aws_s3` — configure `object_store` and run a query against files stored in AWS S3 + +mod dataframe_to_s3; +mod query_aws_s3; + +use std::str::FromStr; + +use datafusion::error::{DataFusionError, Result}; + +enum ExampleKind { + DataframeToS3, + QueryAwsS3, +} + +impl FromStr for ExampleKind { + type Err = DataFusionError; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s { + "dataframe_to_s3" => Ok(Self::DataframeToS3), + "query_aws_s3" => Ok(Self::QueryAwsS3), + _ => Err(DataFusionError::Execution(format!("Unknown example: {s}"))), + } + } +} + +#[tokio::main] +async fn main() -> Result<()> { + let arg = std::env::args().nth(1).ok_or_else(|| { + eprintln!("Usage: cargo run --example external_dependency -- [dataframe_to_s3|query_aws_s3]"); + DataFusionError::Execution("Missing argument".to_string()) + })?; + + match arg.parse::<ExampleKind>()? { + ExampleKind::DataframeToS3 => dataframe_to_s3::dataframe_to_s3().await?, + ExampleKind::QueryAwsS3 => query_aws_s3::query_aws_s3().await?, + } + + Ok(()) +} diff --git a/datafusion-examples/examples/external_dependency/query-aws-s3.rs b/datafusion-examples/examples/external_dependency/query_aws_s3.rs similarity index 98% rename from datafusion-examples/examples/external_dependency/query-aws-s3.rs rename to datafusion-examples/examples/external_dependency/query_aws_s3.rs index da2d7e4879f9..3810be23ee1b 100644 --- a/datafusion-examples/examples/external_dependency/query-aws-s3.rs +++ b/datafusion-examples/examples/external_dependency/query_aws_s3.rs @@ -29,8 +29,7 @@ use url::Url; /// - AWS_ACCESS_KEY_ID /// - AWS_SECRET_ACCESS_KEY /// -#[tokio::main] -async fn main() -> Result<()> { +pub async fn query_aws_s3() -> Result<()> { let ctx = SessionContext::new(); // the region must be set to the region where the bucket exists until the following diff --git a/datafusion-examples/examples/flight/flight_client.rs b/datafusion-examples/examples/flight/client.rs similarity index 97% rename from datafusion-examples/examples/flight/flight_client.rs rename to datafusion-examples/examples/flight/client.rs index ff4b5903ad88..031beea47d57 100644 --- a/datafusion-examples/examples/flight/flight_client.rs +++ b/datafusion-examples/examples/flight/client.rs @@ -30,8 +30,7 @@ use datafusion::arrow::util::pretty; /// This example shows how to wrap DataFusion with `FlightService` to support looking up schema information for /// Parquet files and executing SQL queries against them on a remote server. /// This example is run along-side the example `flight_server`. -#[tokio::main] -async fn main() -> Result<(), Box<dyn std::error::Error>> { +pub async fn client() -> Result<(), Box<dyn std::error::Error>> { let testdata = datafusion::test_util::parquet_test_data(); // Create Flight client diff --git a/datafusion-examples/examples/flight/main.rs b/datafusion-examples/examples/flight/main.rs new file mode 100644 index 000000000000..bfa4526cccf9 --- /dev/null +++ b/datafusion-examples/examples/flight/main.rs @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! # Arrow Flight Examples +//! +//! This example demonstrates Arrow Flight usage. +//! +//! ## Usage +//! ```bash +//! cargo run --example flight -- [client|server|sql_server] +//! ``` +//! +//! Each subcommand runs a corresponding example: +//! - `client` — connect to a running Flight server and execute SQL queries against it over the Flight protocol +//! - `server` — run DataFusion as a standalone process and execute SQL queries from a client using the Flight protocol +//! - `sql_server` — run DataFusion as a standalone process and execute SQL queries from JDBC clients + +mod client; +mod server; +mod sql_server; + +use std::str::FromStr; + +use datafusion::error::{DataFusionError, Result}; + +enum ExampleKind { + Client, + Server, + SqlServer, +} + +impl FromStr for ExampleKind { + type Err = DataFusionError; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s { + "client" => Ok(Self::Client), + "server" => Ok(Self::Server), + "sql_server" => Ok(Self::SqlServer), + _ => Err(DataFusionError::Execution(format!("Unknown example: {s}"))), + } + } +} + +#[tokio::main] +async fn main() -> Result<(), Box<dyn std::error::Error>> { + let arg = std::env::args().nth(1).ok_or_else(|| { + eprintln!("Usage: cargo run --example flight -- [client|server|sql_server]"); + DataFusionError::Execution("Missing argument".to_string()) + })?; + + match arg.parse::<ExampleKind>()? { + ExampleKind::Client => client::client().await?, + ExampleKind::Server => server::server().await?, + ExampleKind::SqlServer => sql_server::sql_server().await?, + } + + Ok(()) +} diff --git a/datafusion-examples/examples/flight/flight_server.rs b/datafusion-examples/examples/flight/server.rs similarity index 99% rename from datafusion-examples/examples/flight/flight_server.rs rename to datafusion-examples/examples/flight/server.rs index 22265e415fbd..dc75287cf2e2 100644 --- a/datafusion-examples/examples/flight/flight_server.rs +++ b/datafusion-examples/examples/flight/server.rs @@ -194,8 +194,7 @@ fn to_tonic_err(e: datafusion::error::DataFusionError) -> Status { /// This example shows how to wrap DataFusion with `FlightService` to support looking up schema information for /// Parquet files and executing SQL queries against them on a remote server. /// This example is run along-side the example `flight_client`. -#[tokio::main] -async fn main() -> Result<(), Box<dyn std::error::Error>> { +pub async fn server() -> Result<(), Box<dyn std::error::Error>> { let addr = "0.0.0.0:50051".parse()?; let service = FlightServiceImpl {}; diff --git a/datafusion-examples/examples/flight/flight_sql_server.rs b/datafusion-examples/examples/flight/sql_server.rs similarity index 99% rename from datafusion-examples/examples/flight/flight_sql_server.rs rename to datafusion-examples/examples/flight/sql_server.rs index c35debec7d71..fc7d0817bd5f 100644 --- a/datafusion-examples/examples/flight/flight_sql_server.rs +++ b/datafusion-examples/examples/flight/sql_server.rs @@ -69,8 +69,7 @@ macro_rules!
status { /// Based heavily on Ballista's implementation: https://github.com/apache/datafusion-ballista/blob/main/ballista/scheduler/src/flight_sql.rs /// and the example in arrow-rs: https://github.com/apache/arrow-rs/blob/master/arrow-flight/examples/flight_sql_server.rs /// -#[tokio::main] -async fn main() -> Result<(), Box<dyn std::error::Error>> { +pub async fn sql_server() -> Result<(), Box<dyn std::error::Error>> { env_logger::init(); let addr = "0.0.0.0:50051".parse()?; let service = FlightSqlServiceImpl { diff --git a/datafusion-examples/examples/advanced_parquet_index.rs b/datafusion-examples/examples/parquet/advanced_index.rs similarity index 99% rename from datafusion-examples/examples/advanced_parquet_index.rs rename to datafusion-examples/examples/parquet/advanced_index.rs index 1c560be6d08a..1237ad930cea 100644 --- a/datafusion-examples/examples/advanced_parquet_index.rs +++ b/datafusion-examples/examples/parquet/advanced_index.rs @@ -156,8 +156,7 @@ use url::Url; /// /// [`ListingTable`]: datafusion::datasource::listing::ListingTable /// [Page Index](https://github.com/apache/parquet-format/blob/master/PageIndex.md) -#[tokio::main] -async fn main() -> Result<()> { +pub async fn advanced_index() -> Result<()> { // the object store is used to read the parquet files (in this case, it is // a local file system, but in a real system it could be S3, GCS, etc) let object_store: Arc<dyn ObjectStore> = diff --git a/datafusion-examples/examples/parquet_embedded_index.rs b/datafusion-examples/examples/parquet/embedded_index.rs similarity index 99% rename from datafusion-examples/examples/parquet_embedded_index.rs rename to datafusion-examples/examples/parquet/embedded_index.rs index 3cbe18914775..ffd345b3d942 100644 --- a/datafusion-examples/examples/parquet_embedded_index.rs +++ b/datafusion-examples/examples/parquet/embedded_index.rs @@ -451,8 +451,7 @@ impl TableProvider for DistinctIndexTable { } } -#[tokio::main] -async fn main() -> Result<()> { +pub async fn embedded_index() -> Result<()> { // 1. Create temp dir and write 3 Parquet files with different category sets let tmp = TempDir::new()?; let dir = tmp.path(); diff --git a/datafusion-examples/examples/parquet_encrypted.rs b/datafusion-examples/examples/parquet/encrypted.rs similarity index 97% rename from datafusion-examples/examples/parquet_encrypted.rs rename to datafusion-examples/examples/parquet/encrypted.rs index 690d9f2a5f14..2fdb14c148e9 100644 --- a/datafusion-examples/examples/parquet_encrypted.rs +++ b/datafusion-examples/examples/parquet/encrypted.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License.
-use datafusion::common::DataFusionError; +use datafusion::common::{DataFusionError, Result}; use datafusion::config::{ConfigFileEncryptionProperties, TableParquetOptions}; use datafusion::dataframe::{DataFrame, DataFrameWriteOptions}; use datafusion::logical_expr::{col, lit}; @@ -25,8 +25,7 @@ use datafusion::prelude::{ParquetReadOptions, SessionContext}; use std::sync::Arc; use tempfile::TempDir; -#[tokio::main] -async fn main() -> datafusion::common::Result<()> { +pub async fn encrypted() -> Result<()> { // The SessionContext is the main high level API for interacting with DataFusion let ctx = SessionContext::new(); diff --git a/datafusion-examples/examples/parquet_encrypted_with_kms.rs b/datafusion-examples/examples/parquet/encrypted_with_kms.rs similarity index 99% rename from datafusion-examples/examples/parquet_encrypted_with_kms.rs rename to datafusion-examples/examples/parquet/encrypted_with_kms.rs index 45bfd183773a..976c86d410bb 100644 --- a/datafusion-examples/examples/parquet_encrypted_with_kms.rs +++ b/datafusion-examples/examples/parquet/encrypted_with_kms.rs @@ -53,8 +53,7 @@ const ENCRYPTION_FACTORY_ID: &str = "example.mock_kms_encryption"; /// which is not a secure way to store encryption keys. /// For production use, it is recommended to use a key-management service (KMS) to encrypt /// data encryption keys. -#[tokio::main] -async fn main() -> Result<()> { +pub async fn encrypted_with_kms() -> Result<()> { let ctx = SessionContext::new(); // Register an `EncryptionFactory` implementation to be used for Parquet encryption diff --git a/datafusion-examples/examples/parquet_exec_visitor.rs b/datafusion-examples/examples/parquet/exec_visitor.rs similarity index 91% rename from datafusion-examples/examples/parquet_exec_visitor.rs rename to datafusion-examples/examples/parquet/exec_visitor.rs index 84f92d4f450e..37ef5059ce1b 100644 --- a/datafusion-examples/examples/parquet_exec_visitor.rs +++ b/datafusion-examples/examples/parquet/exec_visitor.rs @@ -17,11 +17,11 @@ use std::sync::Arc; +use datafusion::common::{DataFusionError, Result}; use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::listing::ListingOptions; use datafusion::datasource::physical_plan::{FileGroup, ParquetSource}; use datafusion::datasource::source::DataSourceExec; -use datafusion::error::DataFusionError; use datafusion::execution::context::SessionContext; use datafusion::physical_plan::metrics::MetricValue; use datafusion::physical_plan::{ @@ -30,8 +30,7 @@ use datafusion::physical_plan::{ use futures::StreamExt; /// Example of collecting metrics after execution by visiting the `ExecutionPlan` -#[tokio::main] -async fn main() { +pub async fn exec_visitor() -> Result<()> { let ctx = SessionContext::new(); let test_data = datafusion::test_util::parquet_test_data(); @@ -51,8 +50,8 @@ async fn main() { ) .await; - let df = ctx.sql("SELECT * FROM my_table").await.unwrap(); - let plan = df.create_physical_plan().await.unwrap(); + let df = ctx.sql("SELECT * FROM my_table").await?; + let plan = df.create_physical_plan().await?; // Create empty visitor let mut visitor = ParquetExecVisitor { @@ -63,12 +62,12 @@ async fn main() { // Make sure you execute the plan to collect actual execution statistics. // For example, in this example the `file_scan_config` is known without executing // but the `bytes_scanned` would be None if we did not execute. 
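(Aside, not part of the diff: the `exec_visitor` hunk continues below by replacing `unwrap()` with `?` and `transpose()`; a standalone sketch of that stream-draining idiom, with an illustrative helper name.)

```rust
use datafusion::error::Result;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;

// Drain a stream of `Result<RecordBatch>`, propagating the first error
// instead of unwrapping every item.
async fn count_rows(mut stream: SendableRecordBatchStream) -> Result<usize> {
    let mut rows = 0;
    while let Some(batch) = stream.next().await.transpose()? {
        rows += batch.num_rows();
    }
    Ok(rows)
}
```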
- let mut batch_stream = execute_stream(plan.clone(), ctx.task_ctx()).unwrap(); - while let Some(batch) = batch_stream.next().await { - println!("Batch rows: {}", batch.unwrap().num_rows()); + let mut batch_stream = execute_stream(plan.clone(), ctx.task_ctx())?; + while let Some(batch) = batch_stream.next().await.transpose()? { + println!("Batch rows: {}", batch.num_rows()); } - visit_execution_plan(plan.as_ref(), &mut visitor).unwrap(); + visit_execution_plan(plan.as_ref(), &mut visitor)?; println!( "ParquetExecVisitor bytes_scanned: {:?}", @@ -78,6 +77,8 @@ async fn main() { "ParquetExecVisitor file_groups: {:?}", visitor.file_groups.unwrap() ); + + Ok(()) } /// Define a struct with fields to hold the execution information you want to diff --git a/datafusion-examples/examples/parquet_index.rs b/datafusion-examples/examples/parquet/index.rs similarity index 99% rename from datafusion-examples/examples/parquet_index.rs rename to datafusion-examples/examples/parquet/index.rs index 127c55da982c..538fba85e7b0 100644 --- a/datafusion-examples/examples/parquet_index.rs +++ b/datafusion-examples/examples/parquet/index.rs @@ -103,8 +103,7 @@ use url::Url; /// ``` /// /// [`ListingTable`]: datafusion::datasource::listing::ListingTable -#[tokio::main] -async fn main() -> Result<()> { +pub async fn index() -> Result<()> { // Demo data has three files, each with schema // * file_name (string) // * value (int32) diff --git a/datafusion-examples/examples/parquet/main.rs b/datafusion-examples/examples/parquet/main.rs new file mode 100644 index 000000000000..f0885a1d7505 --- /dev/null +++ b/datafusion-examples/examples/parquet/main.rs @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! # Parquet Examples +//! +//! This example demonstrates working with Parquet files in DataFusion. +//! +//! ## Usage +//! ```bash +//! cargo run --example parquet -- [advanced_index|embedded_index|encrypted_with_kms|encrypted|exec_visitor|index] +//! ``` +//! +//! Each subcommand runs a corresponding example: +//! - `advanced_index` — create a detailed secondary index that covers the contents of several parquet files +//! - `embedded_index` — store a custom index inside a Parquet file and use it to speed up queries +//! - `encrypted_with_kms` — read and write encrypted Parquet files using an encryption factory +//! - `encrypted` — read and write encrypted Parquet files using DataFusion +//! - `exec_visitor` — extract statistics by visiting an ExecutionPlan after execution +//!
- `index` — create a secondary index over several parquet files and use it to speed up queries + +mod advanced_index; +mod embedded_index; +mod encrypted; +mod encrypted_with_kms; +mod exec_visitor; +mod index; + +use std::str::FromStr; + +use datafusion::error::{DataFusionError, Result}; + +enum ExampleKind { + AdvancedIndex, + EmbeddedIndex, + EncryptedWithKms, + Encrypted, + ExecVisitor, + Index, +} + +impl FromStr for ExampleKind { + type Err = DataFusionError; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s { + "advanced_index" => Ok(Self::AdvancedIndex), + "embedded_index" => Ok(Self::EmbeddedIndex), + "encrypted_with_kms" => Ok(Self::EncryptedWithKms), + "encrypted" => Ok(Self::Encrypted), + "exec_visitor" => Ok(Self::ExecVisitor), + "index" => Ok(Self::Index), + _ => Err(DataFusionError::Execution(format!("Unknown example: {s}"))), + } + } +} + +#[tokio::main] +async fn main() -> Result<()> { + let arg = std::env::args().nth(1).ok_or_else(|| { + eprintln!("Usage: cargo run --example parquet -- [advanced_index|embedded_index|encrypted_with_kms|encrypted|exec_visitor|index]"); + DataFusionError::Execution("Missing argument".to_string()) + })?; + + match arg.parse::<ExampleKind>()? { + ExampleKind::AdvancedIndex => advanced_index::advanced_index().await?, + ExampleKind::EmbeddedIndex => embedded_index::embedded_index().await?, + ExampleKind::EncryptedWithKms => encrypted_with_kms::encrypted_with_kms().await?, + ExampleKind::Encrypted => encrypted::encrypted().await?, + ExampleKind::ExecVisitor => exec_visitor::exec_visitor().await?, + ExampleKind::Index => index::index().await?, + } + + Ok(()) +} diff --git a/datafusion-examples/examples/composed_extension_codec.rs b/datafusion-examples/examples/proto/composed_extension_codec.rs similarity index 96% rename from datafusion-examples/examples/composed_extension_codec.rs rename to datafusion-examples/examples/proto/composed_extension_codec.rs index 57f2c370413a..58862c0045c3 100644 --- a/datafusion-examples/examples/composed_extension_codec.rs +++ b/datafusion-examples/examples/proto/composed_extension_codec.rs @@ -44,8 +44,7 @@ use datafusion_proto::physical_plan::{ }; use datafusion_proto::protobuf; -#[tokio::main] -async fn main() { +pub async fn composed_extension_codec() -> Result<()> { // build execution plan that has both types of nodes // // Note each node requires a different `PhysicalExtensionCodec` to decode @@ -66,16 +65,15 @@ async fn main() { protobuf::PhysicalPlanNode::try_from_physical_plan( exec_plan.clone(), &composed_codec, - ) - .expect("to proto"); + )?; // deserialize proto back to execution plan - let result_exec_plan: Arc<dyn ExecutionPlan> = proto - .try_into_physical_plan(&ctx.task_ctx(), &composed_codec) - .expect("from proto"); + let result_exec_plan: Arc<dyn ExecutionPlan> = + proto.try_into_physical_plan(&ctx.task_ctx(), &composed_codec)?; // assert that the original and deserialized execution plans are equal assert_eq!(format!("{exec_plan:?}"), format!("{result_exec_plan:?}")); + Ok(()) } /// This example has two types of nodes: `ParentExec` and `ChildExec` which can only diff --git a/datafusion-examples/examples/proto/main.rs b/datafusion-examples/examples/proto/main.rs new file mode 100644 index 000000000000..1d1e1b20d826 --- /dev/null +++ b/datafusion-examples/examples/proto/main.rs @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! # Examples demonstrating DataFusion's plan serialization via the `datafusion-proto` crate +//! +//! These examples show how to use multiple extension codecs for serialization / deserialization. +//! +//! ## Usage +//! ```bash +//! cargo run --example proto -- [composed_extension_codec] +//! ``` +//! +//! Each subcommand runs a corresponding example: +//! - `composed_extension_codec` — example of using multiple extension codecs for serialization / deserialization + +mod composed_extension_codec; + +use std::str::FromStr; + +use datafusion::error::{DataFusionError, Result}; + +enum ExampleKind { + ComposedExtensionCodec, +} + +impl FromStr for ExampleKind { + type Err = DataFusionError; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s { + "composed_extension_codec" => Ok(Self::ComposedExtensionCodec), + _ => Err(DataFusionError::Execution(format!("Unknown example: {s}"))), + } + } +} + +#[tokio::main] +async fn main() -> Result<()> { + let arg = std::env::args().nth(1).ok_or_else(|| { + eprintln!("Usage: cargo run --example proto -- [composed_extension_codec]"); + DataFusionError::Execution("Missing argument".to_string()) + })?; + + match arg.parse::<ExampleKind>()? { + ExampleKind::ComposedExtensionCodec => { + composed_extension_codec::composed_extension_codec().await? + } + } + + Ok(()) +} diff --git a/datafusion-examples/examples/analyzer_rule.rs b/datafusion-examples/examples/query_planning/analyzer_rule.rs similarity index 99% rename from datafusion-examples/examples/analyzer_rule.rs rename to datafusion-examples/examples/query_planning/analyzer_rule.rs index cb81cd167a88..b6c97679cb43 100644 --- a/datafusion-examples/examples/analyzer_rule.rs +++ b/datafusion-examples/examples/query_planning/analyzer_rule.rs @@ -35,8 +35,7 @@ use std::sync::{Arc, Mutex}; /// level access control scheme by introducing a filter to the query. /// /// See [optimizer_rule.rs] for an example of a optimizer rule -#[tokio::main] -pub async fn main() -> Result<()> { +pub async fn analyzer_rule() -> Result<()> { // AnalyzerRules run before OptimizerRules. // // DataFusion includes several built in AnalyzerRules for tasks such as type diff --git a/datafusion-examples/examples/expr_api.rs b/datafusion-examples/examples/query_planning/expr_api.rs similarity index 99% rename from datafusion-examples/examples/expr_api.rs rename to datafusion-examples/examples/query_planning/expr_api.rs index 56f960870e58..236ac4319bb6 100644 --- a/datafusion-examples/examples/expr_api.rs +++ b/datafusion-examples/examples/query_planning/expr_api.rs @@ -55,8 +55,7 @@ use datafusion::prelude::*; /// 5. Analyze predicates for boundary ranges: [`range_analysis_demo`] /// 6. Get the types of the expressions: [`expression_type_demo`] /// 7.
Apply type coercion to expressions: [`type_coercion_demo`] -#[tokio::main] -async fn main() -> Result<()> { +pub async fn expr_api() -> Result<()> { // The easiest way to do create expressions is to use the // "fluent"-style API: let expr = col("a") + lit(5); diff --git a/datafusion-examples/examples/query_planning/main.rs b/datafusion-examples/examples/query_planning/main.rs new file mode 100644 index 000000000000..6aa85d20958c --- /dev/null +++ b/datafusion-examples/examples/query_planning/main.rs @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! # Internal mechanics of the query planning and optimization layers +//! +//! This example demonstrates the internal mechanics of the query planning and optimization layers. +//! +//! ## Usage +//! ```bash +//! cargo run --example query_planning -- [analyzer_rule|expr_api|optimizer_rule|parse_sql_expr|plan_to_sql|planner_api|pruning|thread_pools] +//! ``` +//! +//! Each subcommand runs a corresponding example: +//! - `analyzer_rule` — use a custom AnalyzerRule to change a query's semantics (row level access control) +//! - `expr_api` — create, execute, simplify, analyze and coerce `Expr`s +//! - `optimizer_rule` — use a custom OptimizerRule to replace certain predicates +//! - `parse_sql_expr` — parse SQL text into DataFusion `Expr` +//! - `plan_to_sql` — generate SQL from DataFusion `Expr` and `LogicalPlan` +//! - `planner_api` — APIs to manipulate logical and physical plans +//! - `pruning` — use PruningPredicate to prove, from min/max statistics, that predicates can never be true and skip data +//!
- `thread_pools` — use separate tokio Runtimes (thread pools) for CPU-bound and IO-bound work + +mod analyzer_rule; +mod expr_api; +mod optimizer_rule; +mod parse_sql_expr; +mod plan_to_sql; +mod planner_api; +mod pruning; +mod thread_pools; + +use std::str::FromStr; + +use datafusion::error::{DataFusionError, Result}; + +enum ExampleKind { + AnalyzerRule, + ExprApi, + OptimizerRule, + ParseSqlExpr, + PlanToSql, + PlannerApi, + Pruning, + ThreadPools, +} + +impl FromStr for ExampleKind { + type Err = DataFusionError; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s { + "analyzer_rule" => Ok(Self::AnalyzerRule), + "expr_api" => Ok(Self::ExprApi), + "optimizer_rule" => Ok(Self::OptimizerRule), + "parse_sql_expr" => Ok(Self::ParseSqlExpr), + "plan_to_sql" => Ok(Self::PlanToSql), + "planner_api" => Ok(Self::PlannerApi), + "pruning" => Ok(Self::Pruning), + "thread_pools" => Ok(Self::ThreadPools), + _ => Err(DataFusionError::Execution(format!("Unknown example: {s}"))), + } + } +} + +#[tokio::main] +async fn main() -> Result<()> { + let arg = std::env::args().nth(1).ok_or_else(|| { + eprintln!("Usage: cargo run --example query_planning -- [analyzer_rule|expr_api|optimizer_rule|parse_sql_expr|plan_to_sql|planner_api|pruning|thread_pools]"); + DataFusionError::Execution("Missing argument".to_string()) + })?; + + match arg.parse::<ExampleKind>()? { + ExampleKind::AnalyzerRule => analyzer_rule::analyzer_rule().await?, + ExampleKind::ExprApi => expr_api::expr_api().await?, + ExampleKind::OptimizerRule => optimizer_rule::optimizer_rule().await?, + ExampleKind::ParseSqlExpr => parse_sql_expr::parse_sql_expr().await?, + ExampleKind::PlanToSql => plan_to_sql::plan_to_sql_example().await?, + ExampleKind::PlannerApi => planner_api::planner_api().await?, + ExampleKind::Pruning => pruning::pruning().await?, + ExampleKind::ThreadPools => thread_pools::thread_pools().await?, + } + + Ok(()) +} diff --git a/datafusion-examples/examples/optimizer_rule.rs b/datafusion-examples/examples/query_planning/optimizer_rule.rs similarity index 99% rename from datafusion-examples/examples/optimizer_rule.rs rename to datafusion-examples/examples/query_planning/optimizer_rule.rs index 9c137b67432c..4af5ef50b3df 100644 --- a/datafusion-examples/examples/optimizer_rule.rs +++ b/datafusion-examples/examples/query_planning/optimizer_rule.rs @@ -37,8 +37,7 @@ use std::sync::Arc; /// /// See [analyzer_rule.rs] for an example of AnalyzerRules, which are for /// changing plan semantics. -#[tokio::main] -pub async fn main() -> Result<()> { +pub async fn optimizer_rule() -> Result<()> { // DataFusion includes many built in OptimizerRules for tasks such as outer // to inner join conversion and constant folding. // diff --git a/datafusion-examples/examples/parse_sql_expr.rs b/datafusion-examples/examples/query_planning/parse_sql_expr.rs similarity index 97% rename from datafusion-examples/examples/parse_sql_expr.rs rename to datafusion-examples/examples/query_planning/parse_sql_expr.rs index 5387e7c4a05d..44e6b3cf5f67 100644 --- a/datafusion-examples/examples/parse_sql_expr.rs +++ b/datafusion-examples/examples/query_planning/parse_sql_expr.rs @@ -32,17 +32,15 @@ use datafusion::{ /// The code in this example shows how to: /// /// 1. [`simple_session_context_parse_sql_expr_demo`]: Parse a simple SQL text into a logical -/// expression using a schema at [`SessionContext`]. +/// expression using a schema at [`SessionContext`]. /// /// 2.
[`simple_dataframe_parse_sql_expr_demo`]: Parse a simple SQL text into a logical expression -/// using a schema at [`DataFrame`]. +/// using a schema at [`DataFrame`]. /// /// 3. [`query_parquet_demo`]: Query a parquet file using the parsed_sql_expr from a DataFrame. /// /// 4. [`round_trip_parse_sql_expr_demo`]: Parse a SQL text and convert it back to SQL using [`Unparser`]. - -#[tokio::main] -async fn main() -> Result<()> { +pub async fn parse_sql_expr() -> Result<()> { // See how to evaluate expressions simple_session_context_parse_sql_expr_demo()?; simple_dataframe_parse_sql_expr_demo().await?; diff --git a/datafusion-examples/examples/plan_to_sql.rs b/datafusion-examples/examples/query_planning/plan_to_sql.rs similarity index 96% rename from datafusion-examples/examples/plan_to_sql.rs rename to datafusion-examples/examples/query_planning/plan_to_sql.rs index 54483b143a16..31ee74f63ff6 100644 --- a/datafusion-examples/examples/plan_to_sql.rs +++ b/datafusion-examples/examples/query_planning/plan_to_sql.rs @@ -43,28 +43,26 @@ use std::sync::Arc; /// The code in this example shows how to: /// /// 1. [`simple_expr_to_sql_demo`]: Create a simple expression [`Exprs`] with -/// fluent API and convert to sql suitable for passing to another database +/// fluent API and convert to sql suitable for passing to another database /// /// 2. [`simple_expr_to_pretty_sql_demo`] Create a simple expression -/// [`Exprs`] with fluent API and convert to sql without extra parentheses, -/// suitable for displaying to humans +/// [`Exprs`] with fluent API and convert to sql without extra parentheses, +/// suitable for displaying to humans /// /// 3. [`simple_expr_to_sql_demo_escape_mysql_style`]" Create a simple -/// expression [`Exprs`] with fluent API and convert to sql escaping column -/// names in MySQL style. +/// expression [`Exprs`] with fluent API and convert to sql escaping column +/// names in MySQL style. /// /// 4. [`simple_plan_to_sql_demo`]: Create a simple logical plan using the -/// DataFrames API and convert to sql string. +/// DataFrames API and convert to sql string. /// /// 5. [`round_trip_plan_to_sql_demo`]: Create a logical plan from a SQL string, modify it using the -/// DataFrames API and convert it back to a sql string. +/// DataFrames API and convert it back to a sql string. /// /// 6. [`unparse_my_logical_plan_as_statement`]: Create a custom logical plan and unparse it as a statement. /// /// 7. [`unparse_my_logical_plan_as_subquery`]: Create a custom logical plan and unparse it as a subquery. - -#[tokio::main] -async fn main() -> Result<()> { +pub async fn plan_to_sql_example() -> Result<()> { // See how to evaluate expressions simple_expr_to_sql_demo()?; simple_expr_to_pretty_sql_demo()?; diff --git a/datafusion-examples/examples/planner_api.rs b/datafusion-examples/examples/query_planning/planner_api.rs similarity index 99% rename from datafusion-examples/examples/planner_api.rs rename to datafusion-examples/examples/query_planning/planner_api.rs index 55aec7b0108a..dd3643471ead 100644 --- a/datafusion-examples/examples/planner_api.rs +++ b/datafusion-examples/examples/query_planning/planner_api.rs @@ -32,8 +32,7 @@ use datafusion::prelude::*; /// physical plan: /// - Via the combined `create_physical_plan` API. /// - Utilizing the analyzer, optimizer, and query planner APIs separately. 
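(Aside, not part of the diff: a minimal sketch of the unparsing that the `plan_to_sql` example above demonstrates, assuming the `datafusion::sql::unparser::expr_to_sql` API that example imports; the printed SQL is indicative.)

```rust
use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion::sql::unparser::expr_to_sql;

fn expr_to_sql_sketch() -> Result<()> {
    // Build an expression with the fluent API, then unparse it to SQL text.
    let expr = col("a").lt(lit(5)).or(col("a").eq(lit(8)));
    println!("{}", expr_to_sql(&expr)?); // e.g. ((a < 5) OR (a = 8))
    Ok(())
}
```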
-#[tokio::main] -async fn main() -> Result<()> { +pub async fn planner_api() -> Result<()> { // Set up a DataFusion context and load a Parquet file let ctx = SessionContext::new(); let testdata = datafusion::test_util::parquet_test_data(); diff --git a/datafusion-examples/examples/pruning.rs b/datafusion-examples/examples/query_planning/pruning.rs similarity index 98% rename from datafusion-examples/examples/pruning.rs rename to datafusion-examples/examples/query_planning/pruning.rs index 9a61789662cd..8bbad1104581 100644 --- a/datafusion-examples/examples/pruning.rs +++ b/datafusion-examples/examples/query_planning/pruning.rs @@ -22,6 +22,7 @@ use arrow::array::{ArrayRef, BooleanArray, Int32Array}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion::common::pruning::PruningStatistics; use datafusion::common::{DFSchema, ScalarValue}; +use datafusion::error::Result; use datafusion::execution::context::ExecutionProps; use datafusion::physical_expr::create_physical_expr; use datafusion::physical_optimizer::pruning::PruningPredicate; @@ -40,8 +41,7 @@ use datafusion::prelude::*; /// one might do as part of a higher level storage engine. See /// `parquet_index.rs` for an example that uses pruning in the context of an /// individual query. -#[tokio::main] -async fn main() { +pub async fn pruning() -> Result<()> { // In this example, we'll use the PruningPredicate to determine if // the expression `x = 5 AND y = 10` can never be true based on statistics @@ -69,7 +69,7 @@ async fn main() { let predicate = create_pruning_predicate(expr, &my_catalog.schema); // Evaluate the predicate for the three files in the catalog - let prune_results = predicate.prune(&my_catalog).unwrap(); + let prune_results = predicate.prune(&my_catalog)?; println!("Pruning results: {prune_results:?}"); // The result is a `Vec` of bool values, one for each file in the catalog @@ -93,6 +93,8 @@ async fn main() { false ] ); + + Ok(()) } /// A simple model catalog that has information about the three files that store @@ -104,9 +106,10 @@ struct MyCatalog { // (min, max) for y y_values: Vec<(Option<i32>, Option<i32>)>, } + impl MyCatalog { fn new() -> Self { - MyCatalog { + Self { schema: Arc::new(Schema::new(vec![ Field::new("x", DataType::Int32, false), Field::new("y", DataType::Int32, false), diff --git a/datafusion-examples/examples/thread_pools.rs b/datafusion-examples/examples/query_planning/thread_pools.rs similarity index 99% rename from datafusion-examples/examples/thread_pools.rs rename to datafusion-examples/examples/query_planning/thread_pools.rs index bba56b2932ab..70a212405935 100644 --- a/datafusion-examples/examples/thread_pools.rs +++ b/datafusion-examples/examples/query_planning/thread_pools.rs @@ -64,8 +64,7 @@ use url::Url; /// when using Rust libraries such as `tonic`. Using a separate `Runtime` for /// CPU bound tasks will often be simpler in larger applications, even though it /// makes this example slightly more complex. -#[tokio::main] -async fn main() -> Result<()> { +pub async fn thread_pools() -> Result<()> { // The first two examples read local files. Enabling the URL table feature // lets us treat filenames as tables in SQL.
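(Aside, not part of the diff: the line that follows enables the URL table feature; a self-contained sketch of querying a file path directly as a table, with an illustrative file name.)

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

async fn query_file_as_table() -> Result<()> {
    // With URL tables enabled, a quoted file path in FROM acts as a table.
    let ctx = SessionContext::new().enable_url_table();
    ctx.sql("SELECT * FROM 'alltypes_plain.parquet' LIMIT 5")
        .await?
        .show()
        .await
}
```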
let ctx = SessionContext::new().enable_url_table(); diff --git a/datafusion-examples/examples/simple_udf/main.rs b/datafusion-examples/examples/simple_udf/main.rs new file mode 100644 index 000000000000..d8d53abc1eb0 --- /dev/null +++ b/datafusion-examples/examples/simple_udf/main.rs @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! # Simple UDF/UDAF/UDWF/UDTF Examples +//! +//! This example demonstrates simple user-defined functions in DataFusion. +//! +//! ## Usage +//! ```bash +//! cargo run --example simple_udf -- [udf|udaf|udwf|udtf] +//! ``` +//! +//! Each subcommand runs a corresponding example: +//! - `udf` — user defined scalar function example +//! - `udaf` — user defined aggregate function example +//! - `udwf` — user defined window function example +//! - `udtf` — user defined table function example + +mod udaf; +mod udf; +mod udtf; +mod udwf; + +use std::str::FromStr; + +use datafusion::error::{DataFusionError, Result}; + +enum ExampleKind { + Udf, + Udaf, + Udwf, + Udtf, +} + +impl FromStr for ExampleKind { + type Err = DataFusionError; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s { + "udf" => Ok(Self::Udf), + "udaf" => Ok(Self::Udaf), + "udwf" => Ok(Self::Udwf), + "udtf" => Ok(Self::Udtf), + _ => Err(DataFusionError::Execution(format!("Unknown example: {s}"))), + } + } +} + +#[tokio::main] +async fn main() -> Result<()> { + let arg = std::env::args().nth(1).ok_or_else(|| { + eprintln!("Usage: cargo run --example simple_udf -- [udf|udaf|udwf|udtf]"); + DataFusionError::Execution("Missing argument".to_string()) + })?; + + match arg.parse::<ExampleKind>()? { + ExampleKind::Udf => udf::simple_udf().await?, + ExampleKind::Udaf => udaf::simple_udaf().await?, + ExampleKind::Udtf => udtf::simple_udtf().await?, + ExampleKind::Udwf => udwf::simple_udwf().await?, + } + + Ok(()) +} diff --git a/datafusion-examples/examples/simple_udaf.rs b/datafusion-examples/examples/simple_udf/udaf.rs similarity index 97% rename from datafusion-examples/examples/simple_udaf.rs rename to datafusion-examples/examples/simple_udf/udaf.rs index 82bde7c034a5..e9f905e72099 100644 --- a/datafusion-examples/examples/simple_udaf.rs +++ b/datafusion-examples/examples/simple_udf/udaf.rs @@ -135,8 +135,9 @@ impl Accumulator for GeometricMean { } } -#[tokio::main] -async fn main() -> Result<()> { +/// In this example we register `GeometricMean` +/// as a user defined aggregate function and invoke it via the DataFrame API and SQL +pub async fn simple_udaf() -> Result<()> { let ctx = create_context()?; // here is where we define the UDAF.
We also declare its signature: diff --git a/datafusion-examples/examples/simple_udf.rs b/datafusion-examples/examples/simple_udf/udf.rs similarity index 99% rename from datafusion-examples/examples/simple_udf.rs rename to datafusion-examples/examples/simple_udf/udf.rs index 5612e0939f70..7d4f3588e313 100644 --- a/datafusion-examples/examples/simple_udf.rs +++ b/datafusion-examples/examples/simple_udf/udf.rs @@ -57,8 +57,7 @@ fn create_context() -> Result { } /// In this example we will declare a single-type, single return type UDF that exponentiates f64, a^b -#[tokio::main] -async fn main() -> Result<()> { +pub async fn simple_udf() -> Result<()> { let ctx = create_context()?; // First, declare the actual implementation of the calculation diff --git a/datafusion-examples/examples/simple_udtf.rs b/datafusion-examples/examples/simple_udf/udtf.rs similarity index 99% rename from datafusion-examples/examples/simple_udtf.rs rename to datafusion-examples/examples/simple_udf/udtf.rs index b65ffb8d7174..018628d9914e 100644 --- a/datafusion-examples/examples/simple_udtf.rs +++ b/datafusion-examples/examples/simple_udf/udtf.rs @@ -41,33 +41,6 @@ use std::sync::Arc; // 2. Implement your own [`TableFunctionImpl`] and return your [`TableProvider`] // 3. Register the function using [`SessionContext::register_udtf`] -/// This example demonstrates how to register a TableFunction -#[tokio::main] -async fn main() -> Result<()> { - // create local execution context - let ctx = SessionContext::new(); - - // register the table function that will be called in SQL statements by `read_csv` - ctx.register_udtf("read_csv", Arc::new(LocalCsvTableFunc {})); - - let testdata = datafusion::test_util::arrow_test_data(); - let csv_file = format!("{testdata}/csv/aggregate_test_100.csv"); - - // Pass 2 arguments, read csv with at most 2 rows (simplify logic makes 1+1 --> 2) - let df = ctx - .sql(format!("SELECT * FROM read_csv('{csv_file}', 1 + 1);").as_str()) - .await?; - df.show().await?; - - // just run, return all rows - let df = ctx - .sql(format!("SELECT * FROM read_csv('{csv_file}');").as_str()) - .await?; - df.show().await?; - - Ok(()) -} - /// Table Function that mimics the [`read_csv`] function in DuckDB. 
/// /// Usage: `read_csv(filename, [limit])` @@ -180,3 +153,29 @@ fn read_csv_batches(csv_path: impl AsRef<Path>) -> Result<(SchemaRef, Vec<RecordBatch>)> { +/// This example demonstrates how to register a TableFunction +pub async fn simple_udtf() -> Result<()> { + // create local execution context + let ctx = SessionContext::new(); + + // register the table function that will be called in SQL statements by `read_csv` + ctx.register_udtf("read_csv", Arc::new(LocalCsvTableFunc {})); + + let testdata = datafusion::test_util::arrow_test_data(); + let csv_file = format!("{testdata}/csv/aggregate_test_100.csv"); + + // Pass 2 arguments, read csv with at most 2 rows (simplify logic makes 1+1 --> 2) + let df = ctx + .sql(format!("SELECT * FROM read_csv('{csv_file}', 1 + 1);").as_str()) + .await?; + df.show().await?; + + // just run, return all rows + let df = ctx + .sql(format!("SELECT * FROM read_csv('{csv_file}');").as_str()) + .await?; + df.show().await?; + + Ok(()) +} diff --git a/datafusion-examples/examples/simple_udwf.rs b/datafusion-examples/examples/simple_udf/udwf.rs similarity index 99% rename from datafusion-examples/examples/simple_udwf.rs rename to datafusion-examples/examples/simple_udf/udwf.rs index 1736ff00bd70..81b47690e356 100644 --- a/datafusion-examples/examples/simple_udwf.rs +++ b/datafusion-examples/examples/simple_udf/udwf.rs @@ -41,9 +41,69 @@ async fn create_context() -> Result<SessionContext> { Ok(ctx) } +/// Create a `PartitionEvaluator` to evaluate this function on a new +/// partition. +fn make_partition_evaluator() -> Result<Box<dyn PartitionEvaluator>> { + Ok(Box::new(MyPartitionEvaluator::new())) +} + +/// This implements the lowest level evaluation for a window function +/// +/// It handles calculating the value of the window function for each +/// distinct values of `PARTITION BY` (each car type in our example) +#[derive(Clone, Debug)] +struct MyPartitionEvaluator {} + +impl MyPartitionEvaluator { + fn new() -> Self { + Self {} + } +} + +/// Different evaluation methods are called depending on the various +/// settings of WindowUDF. This example uses the simplest and most +/// general, `evaluate`. See `PartitionEvaluator` for the other more +/// advanced uses. +impl PartitionEvaluator for MyPartitionEvaluator { + /// Tell DataFusion the window function varies based on the value + /// of the window frame. + fn uses_window_frame(&self) -> bool { + true + } + + /// This function is called once per input row. + /// + /// `range`specifies which indexes of `values` should be + /// considered for the calculation. + /// + /// Note this is the SLOWEST, but simplest, way to evaluate a + /// window function. It is much faster to implement + /// evaluate_all or evaluate_all_with_rank, if possible + fn evaluate( + &mut self, + values: &[ArrayRef], + range: &std::ops::Range<usize>, + ) -> Result<ScalarValue> { + // Again, the input argument is an array of floating + // point numbers to calculate a moving average + let arr: &Float64Array = values[0].as_ref().as_primitive::<Float64Type>(); + + let range_len = range.end - range.start; + + // our smoothing function will average all the values in the + let output = if range_len > 0 { + let sum: f64 = arr.values().iter().skip(range.start).take(range_len).sum(); + Some(sum / range_len as f64) + } else { + None + }; + + Ok(ScalarValue::Float64(output)) + } +} + /// In this example we will declare a user defined window function that computes a moving average and then run it using SQL -#[tokio::main] -async fn main() -> Result<()> { +pub async fn simple_udwf() -> Result<()> { let ctx = create_context().await?; // here is where we define the UDWF.
We also declare its signature: @@ -130,64 +190,3 @@ async fn main() -> Result<()> { Ok(()) } - -/// Create a `PartitionEvaluator` to evaluate this function on a new -/// partition. -fn make_partition_evaluator() -> Result<Box<dyn PartitionEvaluator>> { - Ok(Box::new(MyPartitionEvaluator::new())) -} - -/// This implements the lowest level evaluation for a window function -/// -/// It handles calculating the value of the window function for each -/// distinct values of `PARTITION BY` (each car type in our example) -#[derive(Clone, Debug)] -struct MyPartitionEvaluator {} - -impl MyPartitionEvaluator { - fn new() -> Self { - Self {} - } -} - -/// Different evaluation methods are called depending on the various -/// settings of WindowUDF. This example uses the simplest and most -/// general, `evaluate`. See `PartitionEvaluator` for the other more -/// advanced uses. -impl PartitionEvaluator for MyPartitionEvaluator { - /// Tell DataFusion the window function varies based on the value - /// of the window frame. - fn uses_window_frame(&self) -> bool { - true - } - - /// This function is called once per input row. - /// - /// `range`specifies which indexes of `values` should be - /// considered for the calculation. - /// - /// Note this is the SLOWEST, but simplest, way to evaluate a - /// window function. It is much faster to implement - /// evaluate_all or evaluate_all_with_rank, if possible - fn evaluate( - &mut self, - values: &[ArrayRef], - range: &std::ops::Range<usize>, - ) -> Result<ScalarValue> { - // Again, the input argument is an array of floating - // point numbers to calculate a moving average - let arr: &Float64Array = values[0].as_ref().as_primitive::<Float64Type>(); - - let range_len = range.end - range.start; - - // our smoothing function will average all the values in the - let output = if range_len > 0 { - let sum: f64 = arr.values().iter().skip(range.start).take(range_len).sum(); - Some(sum / range_len as f64) - } else { - None - }; - - Ok(ScalarValue::Float64(output)) - } -} diff --git a/datafusion-examples/examples/sql_analysis.rs b/datafusion-examples/examples/sql_ops/analysis.rs similarity index 99% rename from datafusion-examples/examples/sql_analysis.rs rename to datafusion-examples/examples/sql_ops/analysis.rs index 4ff669faf1d0..7b9f860d872a 100644 --- a/datafusion-examples/examples/sql_analysis.rs +++ b/datafusion-examples/examples/sql_ops/analysis.rs @@ -165,8 +165,7 @@ fn count_tree(join: &LogicalPlan) -> (usize, Vec<&LogicalPlan>) { (total, inputs) } -#[tokio::main] -async fn main() -> Result<()> { +pub async fn analysis() -> Result<()> { // To show how we can count the joins in a sql query we'll be using query 88 // from the TPC-DS benchmark. // diff --git a/datafusion-examples/examples/sql_dialect.rs b/datafusion-examples/examples/sql_ops/dialect.rs similarity index 98% rename from datafusion-examples/examples/sql_dialect.rs rename to datafusion-examples/examples/sql_ops/dialect.rs index 20b515506f3b..986b3e31407e 100644 --- a/datafusion-examples/examples/sql_dialect.rs +++ b/datafusion-examples/examples/sql_ops/dialect.rs @@ -26,8 +26,7 @@ use datafusion::sql::{ /// This example demonstrates how to use the DFParser to parse a statement in a custom way /// /// This technique can be used to implement a custom SQL dialect, for example.
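(Aside, not part of the diff: the `dialect` hunk continues below; for contrast, a minimal sketch of the stock `DFParser` entry point that `MyParser` customizes, assuming the sqlparser re-export under `datafusion::sql`.)

```rust
use datafusion::sql::parser::DFParser;
use datafusion::sql::sqlparser::parser::ParserError;

fn parse_sketch() -> Result<(), ParserError> {
    // DFParser wraps sqlparser-rs and adds DataFusion-specific statements
    // such as CREATE EXTERNAL TABLE and COPY.
    let statements = DFParser::parse_sql("SELECT 1 + 1")?;
    println!("parsed {} statement(s)", statements.len());
    Ok(())
}
```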
-#[tokio::main] -async fn main() -> Result<()> { +pub async fn dialect() -> Result<()> { let mut my_parser = MyParser::new("COPY source_table TO 'file.fasta' STORED AS FASTA")?; diff --git a/datafusion-examples/examples/sql_frontend.rs b/datafusion-examples/examples/sql_ops/frontend.rs similarity index 99% rename from datafusion-examples/examples/sql_frontend.rs rename to datafusion-examples/examples/sql_ops/frontend.rs index 1fc9ce24ecbb..432af968bcf3 100644 --- a/datafusion-examples/examples/sql_frontend.rs +++ b/datafusion-examples/examples/sql_ops/frontend.rs @@ -44,7 +44,7 @@ use std::sync::Arc; /// /// In this example, we demonstrate how to use the lower level APIs directly, /// which only requires the `datafusion-sql` dependency. -pub fn main() -> Result<()> { +pub fn frontend() -> Result<()> { // First, we parse the SQL string. Note that we use the DataFusion // Parser, which wraps the `sqlparser-rs` SQL parser and adds DataFusion // specific syntax such as `CREATE EXTERNAL TABLE` diff --git a/datafusion-examples/examples/sql_ops/main.rs b/datafusion-examples/examples/sql_ops/main.rs new file mode 100644 index 000000000000..c2e659e7c2e0 --- /dev/null +++ b/datafusion-examples/examples/sql_ops/main.rs @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! # SQL Examples +//! +//! This example demonstrates SQL processing in DataFusion. +//! +//! ## Usage +//! ```bash +//! cargo run --example sql_ops -- [analysis|dialect|frontend|query] +//! ``` +//! +//! Each subcommand runs a corresponding example: +//! - `analysis` — analyse SQL queries with DataFusion structures +//! - `dialect` — implement a custom SQL dialect on top of DFParser +//! - `frontend` — create LogicalPlans (only) from sql strings +//! - `query` — query data using SQL (in memory RecordBatches, local Parquet files) + +mod analysis; +mod dialect; +mod frontend; +mod query; + +use std::str::FromStr; + +use datafusion::error::{DataFusionError, Result}; + +enum ExampleKind { + Analysis, + Dialect, + Frontend, + Query, +} + +impl FromStr for ExampleKind { + type Err = DataFusionError; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s { + "analysis" => Ok(Self::Analysis), + "dialect" => Ok(Self::Dialect), + "frontend" => Ok(Self::Frontend), + "query" => Ok(Self::Query), + _ => Err(DataFusionError::Execution(format!("Unknown example: {s}"))), + } + } +} + +#[tokio::main] +async fn main() -> Result<()> { + let arg = std::env::args().nth(1).ok_or_else(|| { + eprintln!( + "Usage: cargo run --example sql_ops -- [analysis|dialect|frontend|query]" + ); + DataFusionError::Execution("Missing argument".to_string()) + })?; + + match arg.parse::<ExampleKind>()?
{ + ExampleKind::Analysis => analysis::analysis().await?, + ExampleKind::Dialect => dialect::dialect().await?, + ExampleKind::Frontend => frontend::frontend()?, + ExampleKind::Query => query::query().await?, + } + + Ok(()) +} diff --git a/datafusion-examples/examples/sql_query.rs b/datafusion-examples/examples/sql_ops/query.rs similarity index 98% rename from datafusion-examples/examples/sql_query.rs rename to datafusion-examples/examples/sql_ops/query.rs index 0ac203cfb7e7..0db35f834963 100644 --- a/datafusion-examples/examples/sql_query.rs +++ b/datafusion-examples/examples/sql_ops/query.rs @@ -33,15 +33,14 @@ use std::sync::Arc; /// [`query_memtable`]: a simple query against a [`MemTable`] /// [`query_parquet`]: a simple query against a directory with multiple Parquet files /// -#[tokio::main] -async fn main() -> Result<()> { +pub async fn query() -> Result<()> { query_memtable().await?; query_parquet().await?; Ok(()) } /// Run a simple query against a [`MemTable`] -pub async fn query_memtable() -> Result<()> { +async fn query_memtable() -> Result<()> { let mem_table = create_memtable()?; // create local execution context
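(Aside, not part of the diff: a self-contained sketch of the `query_memtable` pattern the renamed file demonstrates; the schema and values are illustrative.)

```rust
use std::sync::Arc;

use datafusion::arrow::array::Int32Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::prelude::*;

async fn query_memtable_sketch() -> Result<()> {
    // Build a single in-memory batch and expose it as the table `t`.
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;
    let table = MemTable::try_new(schema, vec![vec![batch]])?;

    let ctx = SessionContext::new();
    ctx.register_table("t", Arc::new(table))?;
    ctx.sql("SELECT sum(v) FROM t").await?.show().await
}
```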