From ebc4485245f7306a95868f55904bc7a3b7ed0988 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Tue, 2 Jul 2024 19:03:48 +0200 Subject: [PATCH 1/2] Move compressed CSV example to own file Split `csv_sql` into two: - `csv_sql`: simple SQL-based example of reading a CSV file - `csv_compressed` (new): example of reading a compressed CSV file --- .../examples/csv_compressed.rs | 56 +++++++++++++++++++ datafusion-examples/examples/csv_sql.rs | 18 ------ 2 files changed, 56 insertions(+), 18 deletions(-) create mode 100644 datafusion-examples/examples/csv_compressed.rs diff --git a/datafusion-examples/examples/csv_compressed.rs b/datafusion-examples/examples/csv_compressed.rs new file mode 100644 index 000000000000..f1138f461417 --- /dev/null +++ b/datafusion-examples/examples/csv_compressed.rs @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datafusion::datasource::file_format::file_compression_type::FileCompressionType; +use datafusion::error::Result; +use datafusion::prelude::*; + +/// This example demonstrates executing a simple query against a compressed CSV file +#[tokio::main] +async fn main() -> Result<()> { + // create local execution context + let ctx = SessionContext::new(); + + let testdata = datafusion::test_util::arrow_test_data(); + + // register csv file with the execution context + ctx.register_csv( + "aggregate_test_100", + &format!("{testdata}/csv/aggregate_test_100.csv"), + CsvReadOptions::new(), + ) + .await?; + + // query compressed CSV with specific options + let csv_options = CsvReadOptions::default() + .has_header(true) + .file_compression_type(FileCompressionType::GZIP) + .file_extension("csv.gz"); + let df = ctx + .read_csv( + &format!("{testdata}/csv/aggregate_test_100.csv.gz"), + csv_options, + ) + .await?; + let df = df + .filter(col("c1").eq(lit("a")))? + .select_columns(&["c2", "c3"])?; + + df.show().await?; + + Ok(()) +} diff --git a/datafusion-examples/examples/csv_sql.rs b/datafusion-examples/examples/csv_sql.rs index 851fdcb626d2..ce602e0e4816 100644 --- a/datafusion-examples/examples/csv_sql.rs +++ b/datafusion-examples/examples/csv_sql.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use datafusion::datasource::file_format::file_compression_type::FileCompressionType; use datafusion::error::Result; use datafusion::prelude::*; @@ -49,22 +48,5 @@ async fn main() -> Result<()> { // print the results df.show().await?; - // query compressed CSV with specific options - let csv_options = CsvReadOptions::default() - .has_header(true) - .file_compression_type(FileCompressionType::GZIP) - .file_extension("csv.gz"); - let df = ctx - .read_csv( - &format!("{testdata}/csv/aggregate_test_100.csv.gz"), - csv_options, - ) - .await?; - let df = df - .filter(col("c1").eq(lit("a")))? - .select_columns(&["c2", "c3"])?; - - df.show().await?; - Ok(()) } From 4095cb896bc42a79f3ef4901078e8233b1543519 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Tue, 2 Jul 2024 19:04:57 +0200 Subject: [PATCH 2/2] Add example of dataframe API aggregations Provide example for min, max, count. This is useful for users to update their code following the conversion from builtin to UDAFs. --- datafusion-examples/examples/csv_dataframe.rs | 48 +++++++++++++++++++ datafusion-examples/examples/csv_sql.rs | 4 +- 2 files changed, 50 insertions(+), 2 deletions(-) create mode 100644 datafusion-examples/examples/csv_dataframe.rs diff --git a/datafusion-examples/examples/csv_dataframe.rs b/datafusion-examples/examples/csv_dataframe.rs new file mode 100644 index 000000000000..8f79bd7394e8 --- /dev/null +++ b/datafusion-examples/examples/csv_dataframe.rs @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datafusion::error::Result; +use datafusion::functions_aggregate::count::count; +use datafusion::prelude::*; + +/// This example demonstrates executing a DataFrame operation against an Arrow data source (CSV) and +/// fetching results. See `csv_sql.rs` for a SQL version of this example. +#[tokio::main] +async fn main() -> Result<()> { + // create local execution context + let ctx = SessionContext::new(); + + let testdata = datafusion::test_util::arrow_test_data(); + + // execute the query + let df = ctx + .read_csv( + &format!("{testdata}/csv/aggregate_test_100.csv"), + CsvReadOptions::new(), + ) + .await? + .filter(col("c11").gt(lit(0.1)).and(col("c11").lt(lit(0.9))))? + .aggregate( + vec![col("c1")], + vec![min(col("c12")), max(col("c12")), count(wildcard())], + )?; + + // print the results + df.show().await?; + + Ok(()) +} diff --git a/datafusion-examples/examples/csv_sql.rs b/datafusion-examples/examples/csv_sql.rs index ce602e0e4816..304eadc3690e 100644 --- a/datafusion-examples/examples/csv_sql.rs +++ b/datafusion-examples/examples/csv_sql.rs @@ -19,7 +19,7 @@ use datafusion::error::Result; use datafusion::prelude::*; /// This example demonstrates executing a simple query against an Arrow data source (CSV) and -/// fetching results +/// fetching results. See `csv_dataframe.rs` for a DataFrame version of this example. #[tokio::main] async fn main() -> Result<()> { // create local execution context @@ -38,7 +38,7 @@ async fn main() -> Result<()> { // execute the query let df = ctx .sql( - "SELECT c1, MIN(c12), MAX(c12) \ + "SELECT c1, MIN(c12), MAX(c12), COUNT(*) \ FROM aggregate_test_100 \ WHERE c11 > 0.1 AND c11 < 0.9 \ GROUP BY c1",