
Commit 8226ebf

cj-zhukov and Sergey Zhukov authored
Consolidate data io examples (#18142) (#18591)
## Which issue does this PR close?

- Part of #18142.

## Rationale for this change

This PR consolidates all the `data_io` examples (parquet, catalog, remote_catalog, json_shredding, query_http_csv) into a single example binary. We agreed on this pattern and can apply it to the remaining examples.

## What changes are included in this PR?

## Are these changes tested?

## Are there any user-facing changes?

---------

Co-authored-by: Sergey Zhukov <szhukov@aligntech.com>
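In short, the change replaces a set of standalone example binaries with one `data_io` example selected by a subcommand. A rough usage sketch — the subcommand names come from the new `data_io` module docs in this diff, and the "before" command is only illustrative of how standalone examples are run:

```bash
# After this PR: one consolidated example, selected by subcommand
cargo run --example data_io -- catalog
cargo run --example data_io -- parquet_idx

# Before this PR (illustrative): each example was its own binary, e.g.
# cargo run --example catalog
```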
1 parent 30eaff3 commit 8226ebf

12 files changed: +227 −58 lines changed

datafusion-examples/README.md

Lines changed: 10 additions & 9 deletions
@@ -49,10 +49,11 @@ cargo run --example dataframe
 - [`examples/udf/advanced_udaf.rs`](examples/udf/advanced_udaf.rs): Define and invoke a more complicated User Defined Aggregate Function (UDAF)
 - [`examples/udf/advanced_udf.rs`](examples/udf/advanced_udf.rs): Define and invoke a more complicated User Defined Scalar Function (UDF)
 - [`examples/udf/advanced_udwf.rs`](examples/udf/advanced_udwf.rs): Define and invoke a more complicated User Defined Window Function (UDWF)
-- [`advanced_parquet_index.rs`](examples/advanced_parquet_index.rs): Creates a detailed secondary index that covers the contents of several parquet files
+- [`examples/data_io/parquet_advanced_index.rs`](examples/data_io/parquet_advanced_index.rs): Creates a detailed secondary index that covers the contents of several parquet files
 - [`examples/udf/async_udf.rs`](examples/udf/async_udf.rs): Define and invoke an asynchronous User Defined Scalar Function (UDF)
 - [`analyzer_rule.rs`](examples/analyzer_rule.rs): Use a custom AnalyzerRule to change a query's semantics (row level access control)
-- [`catalog.rs`](examples/catalog.rs): Register the table into a custom catalog
+- [`examples/data_io/catalog.rs`](examples/data_io/catalog.rs): Register the table into a custom catalog
+- [`examples/data_io/json_shredding.rs`](examples/data_io/json_shredding.rs): Shows how to implement custom filter rewriting for JSON shredding
 - [`composed_extension_codec`](examples/composed_extension_codec.rs): Example of using multiple extension codecs for serialization / deserialization
 - [`examples/custom_data_source/csv_sql_streaming.rs`](examples/custom_data_source/csv_sql_streaming.rs): Build and run a streaming query plan from a SQL statement against a local CSV file
 - [`examples/custom_data_source/csv_json_opener.rs`](examples/custom_data_source/csv_json_opener.rs): Use low level `FileOpener` APIs to read CSV/JSON into Arrow `RecordBatch`es
@@ -71,19 +72,19 @@ cargo run --example dataframe
 - [`memory_pool_tracking.rs`](examples/memory_pool_tracking.rs): Demonstrates TrackConsumersPool for memory tracking and debugging with enhanced error messages
 - [`memory_pool_execution_plan.rs`](examples/memory_pool_execution_plan.rs): Shows how to implement memory-aware ExecutionPlan with memory reservation and spilling
 - [`optimizer_rule.rs`](examples/optimizer_rule.rs): Use a custom OptimizerRule to replace certain predicates
-- [`parquet_embedded_index.rs`](examples/parquet_embedded_index.rs): Store a custom index inside a Parquet file and use it to speed up queries
-- [`parquet_encrypted.rs`](examples/parquet_encrypted.rs): Read and write encrypted Parquet files using DataFusion
-- [`parquet_encrypted_with_kms.rs`](examples/parquet_encrypted_with_kms.rs): Read and write encrypted Parquet files using an encryption factory
-- [`parquet_index.rs`](examples/parquet_index.rs): Create an secondary index over several parquet files and use it to speed up queries
-- [`parquet_exec_visitor.rs`](examples/parquet_exec_visitor.rs): Extract statistics by visiting an ExecutionPlan after execution
+- [`examples/data_io/parquet_embedded_index.rs`](examples/data_io/parquet_embedded_index.rs): Store a custom index inside a Parquet file and use it to speed up queries
+- [`examples/data_io/parquet_encrypted.rs`](examples/data_io/parquet_encrypted.rs): Read and write encrypted Parquet files using DataFusion
+- [`examples/data_io/parquet_encrypted_with_kms.rs`](examples/data_io/parquet_encrypted_with_kms.rs): Read and write encrypted Parquet files using an encryption factory
+- [`examples/data_io/parquet_index.rs`](examples/data_io/parquet_index.rs): Create an secondary index over several parquet files and use it to speed up queries
+- [`examples/data_io/parquet_exec_visitor.rs`](examples/data_io/parquet_exec_visitor.rs): Extract statistics by visiting an ExecutionPlan after execution
 - [`parse_sql_expr.rs`](examples/parse_sql_expr.rs): Parse SQL text into DataFusion `Expr`.
 - [`plan_to_sql.rs`](examples/plan_to_sql.rs): Generate SQL from DataFusion `Expr` and `LogicalPlan`
 - [`planner_api.rs`](examples/planner_api.rs) APIs to manipulate logical and physical plans
 - [`pruning.rs`](examples/pruning.rs): Use pruning to rule out files based on statistics
 - [`query-aws-s3.rs`](examples/external_dependency/query-aws-s3.rs): Configure `object_store` and run a query against files stored in AWS S3
-- [`query-http-csv.rs`](examples/query-http-csv.rs): Configure `object_store` and run a query against files vi HTTP
+- [`examples/data_io/query_http_csv.rs`](examples/data_io/query_http_csv.rs): Configure `object_store` and run a query against files via HTTP
 - [`examples/builtin_functions/regexp.rs`](examples/builtin_functions/regexp.rs): Examples of using regular expression functions
-- [`remote_catalog.rs`](examples/regexp.rs): Examples of interfacing with a remote catalog (e.g. over a network)
+- [`examples/data_io/remote_catalog.rs`](examples/data_io/remote_catalog.rs): Examples of interfacing with a remote catalog (e.g. over a network)
 - [`examples/udf/simple_udaf.rs`](examples/udf/simple_udaf.rs): Define and invoke a User Defined Aggregate Function (UDAF)
 - [`examples/udf/simple_udf.rs`](examples/udf/simple_udf.rs): Define and invoke a User Defined Scalar Function (UDF)
 - [`examples/udf/simple_udtf.rs`](examples/udf/simple_udtf.rs): Define and invoke a User Defined Table Function (UDTF)

datafusion-examples/examples/catalog.rs renamed to datafusion-examples/examples/data_io/catalog.rs

Lines changed: 13 additions & 2 deletions
@@ -34,8 +34,8 @@ use std::{any::Any, collections::HashMap, path::Path, sync::Arc};
 use std::{fs::File, io::Write};
 use tempfile::TempDir;
 
-#[tokio::main]
-async fn main() -> Result<()> {
+/// Register the table into a custom catalog
+pub async fn catalog() -> Result<()> {
     env_logger::builder()
         .filter_level(log::LevelFilter::Info)
         .init();
@@ -134,12 +134,14 @@ struct DirSchemaOpts<'a> {
     dir: &'a Path,
     format: Arc<dyn FileFormat>,
 }
+
 /// Schema where every file with extension `ext` in a given `dir` is a table.
 #[derive(Debug)]
 struct DirSchema {
     ext: String,
     tables: RwLock<HashMap<String, Arc<dyn TableProvider>>>,
 }
+
 impl DirSchema {
     async fn create(state: &SessionState, opts: DirSchemaOpts<'_>) -> Result<Arc<Self>> {
         let DirSchemaOpts { ext, dir, format } = opts;
@@ -172,6 +174,7 @@ impl DirSchema {
             ext: ext.to_string(),
         }))
     }
+
     #[allow(unused)]
     fn name(&self) -> &str {
         &self.ext
@@ -198,6 +201,7 @@ impl SchemaProvider for DirSchema {
         let tables = self.tables.read().unwrap();
         tables.contains_key(name)
     }
+
     fn register_table(
         &self,
         name: String,
@@ -223,17 +227,20 @@ impl SchemaProvider for DirSchema {
 struct DirCatalog {
     schemas: RwLock<HashMap<String, Arc<dyn SchemaProvider>>>,
 }
+
 impl DirCatalog {
     fn new() -> Self {
         Self {
             schemas: RwLock::new(HashMap::new()),
         }
     }
 }
+
 impl CatalogProvider for DirCatalog {
     fn as_any(&self) -> &dyn Any {
         self
     }
+
     fn register_schema(
         &self,
         name: &str,
@@ -260,22 +267,26 @@ impl CatalogProvider for DirCatalog {
         }
     }
 }
+
 /// Catalog lists holds multiple catalog providers. Each context has a single catalog list.
 #[derive(Debug)]
 struct CustomCatalogProviderList {
     catalogs: RwLock<HashMap<String, Arc<dyn CatalogProvider>>>,
 }
+
 impl CustomCatalogProviderList {
     fn new() -> Self {
         Self {
             catalogs: RwLock::new(HashMap::new()),
         }
     }
 }
+
 impl CatalogProviderList for CustomCatalogProviderList {
     fn as_any(&self) -> &dyn Any {
         self
     }
+
     fn register_catalog(
         &self,
         name: String,
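The rename above also shows the conversion every consolidated example goes through: the per-example `#[tokio::main] async fn main()` becomes a `pub async fn` that the new `data_io` binary awaits. A minimal, self-contained sketch of that shape — the elided body and the trivial `main` here are placeholders, not the actual example code:

```rust
use datafusion::error::Result;

/// Register the table into a custom catalog
/// (formerly the body of `catalog.rs`'s `main`; unchanged by the move)
pub async fn catalog() -> Result<()> {
    // ... example body elided ...
    Ok(())
}

// The consolidated `data_io` binary owns the single tokio entry point
// and simply awaits the example function it dispatches to.
#[tokio::main]
async fn main() -> Result<()> {
    catalog().await
}
```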

datafusion-examples/examples/json_shredding.rs renamed to datafusion-examples/examples/data_io/json_shredding.rs

Lines changed: 1 addition & 2 deletions
@@ -63,8 +63,7 @@ use object_store::{ObjectStore, PutPayload};
 // 1. Push down predicates for better filtering
 // 2. Avoid expensive JSON parsing at query time
 // 3. Leverage columnar storage benefits for the materialized fields
-#[tokio::main]
-async fn main() -> Result<()> {
+pub async fn json_shredding() -> Result<()> {
     println!("=== Creating example data with flat columns and underscore prefixes ===");
 
     // Create sample data with flat columns using underscore prefixes
datafusion-examples/examples/data_io/main.rs (new file)

Lines changed: 160 additions & 0 deletions
@@ -0,0 +1,160 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! # These examples of data formats and I/O
+//!
+//! These examples demonstrate data formats and I/O.
+//!
+//! ## Usage
+//! ```bash
+//! cargo run --example data_io -- [catalog|json_shredding|parquet_adv_idx|parquet_emb_idx|parquet_enc_with_kms|parquet_enc|parquet_exec_visitor|parquet_idx|query_http_csv|remote_catalog]
+//! ```
+//!
+//! Each subcommand runs a corresponding example:
+//! - `catalog` — register the table into a custom catalog
+//! - `json_shredding` — shows how to implement custom filter rewriting for JSON shredding
+//! - `parquet_adv_idx` — create a detailed secondary index that covers the contents of several parquet files
+//! - `parquet_emb_idx` — store a custom index inside a Parquet file and use it to speed up queries
+//! - `parquet_enc_with_kms` — read and write encrypted Parquet files using an encryption factory
+//! - `parquet_enc` — read and write encrypted Parquet files using DataFusion
+//! - `parquet_exec_visitor` — extract statistics by visiting an ExecutionPlan after execution
+//! - `parquet_idx` — create an secondary index over several parquet files and use it to speed up queries
+//! - `query_http_csv` — configure `object_store` and run a query against files via HTTP
+//! - `remote_catalog` — interfacing with a remote catalog (e.g. over a network)
+
+mod catalog;
+mod json_shredding;
+mod parquet_advanced_index;
+mod parquet_embedded_index;
+mod parquet_encrypted;
+mod parquet_encrypted_with_kms;
+mod parquet_exec_visitor;
+mod parquet_index;
+mod query_http_csv;
+mod remote_catalog;
+
+use std::str::FromStr;
+
+use datafusion::error::{DataFusionError, Result};
+
+enum ExampleKind {
+    Catalog,
+    JsonShredding,
+    ParquetAdvIdx,
+    ParquetEmbIdx,
+    ParquetEnc,
+    ParquetEncWithKms,
+    ParquetExecVisitor,
+    ParquetIdx,
+    QueryHttpCsv,
+    RemoteCatalog,
+}
+
+impl AsRef<str> for ExampleKind {
+    fn as_ref(&self) -> &str {
+        match self {
+            Self::Catalog => "catalog",
+            Self::JsonShredding => "json_shredding",
+            Self::ParquetAdvIdx => "parquet_adv_idx",
+            Self::ParquetEmbIdx => "parquet_emb_idx",
+            Self::ParquetEnc => "parquet_enc",
+            Self::ParquetEncWithKms => "parquet_enc_with_kms",
+            Self::ParquetExecVisitor => "parquet_exec_visitor",
+            Self::ParquetIdx => "parquet_idx",
+            Self::QueryHttpCsv => "query_http_csv",
+            Self::RemoteCatalog => "remote_catalog",
+        }
+    }
+}
+
+impl FromStr for ExampleKind {
+    type Err = DataFusionError;
+
+    fn from_str(s: &str) -> Result<Self> {
+        match s {
+            "catalog" => Ok(Self::Catalog),
+            "json_shredding" => Ok(Self::JsonShredding),
+            "parquet_adv_idx" => Ok(Self::ParquetAdvIdx),
+            "parquet_emb_idx" => Ok(Self::ParquetEmbIdx),
+            "parquet_enc" => Ok(Self::ParquetEnc),
+            "parquet_enc_with_kms" => Ok(Self::ParquetEncWithKms),
+            "parquet_exec_visitor" => Ok(Self::ParquetExecVisitor),
+            "parquet_idx" => Ok(Self::ParquetIdx),
+            "query_http_csv" => Ok(Self::QueryHttpCsv),
+            "remote_catalog" => Ok(Self::RemoteCatalog),
+            _ => Err(DataFusionError::Execution(format!("Unknown example: {s}"))),
+        }
+    }
+}
+
+impl ExampleKind {
+    const ALL: [Self; 10] = [
+        Self::Catalog,
+        Self::JsonShredding,
+        Self::ParquetAdvIdx,
+        Self::ParquetEmbIdx,
+        Self::ParquetEnc,
+        Self::ParquetEncWithKms,
+        Self::ParquetExecVisitor,
+        Self::ParquetIdx,
+        Self::QueryHttpCsv,
+        Self::RemoteCatalog,
+    ];
+
+    const EXAMPLE_NAME: &str = "data_io";
+
+    fn variants() -> Vec<&'static str> {
+        Self::ALL.iter().map(|x| x.as_ref()).collect()
+    }
+}
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let usage = format!(
+        "Usage: cargo run --example {} -- [{}]",
+        ExampleKind::EXAMPLE_NAME,
+        ExampleKind::variants().join("|")
+    );
+
+    let arg = std::env::args().nth(1).ok_or_else(|| {
+        eprintln!("{usage}");
+        DataFusionError::Execution("Missing argument".to_string())
+    })?;
+
+    match arg.parse::<ExampleKind>()? {
+        ExampleKind::Catalog => catalog::catalog().await?,
+        ExampleKind::JsonShredding => json_shredding::json_shredding().await?,
+        ExampleKind::ParquetAdvIdx => {
+            parquet_advanced_index::parquet_advanced_index().await?
+        }
+        ExampleKind::ParquetEmbIdx => {
+            parquet_embedded_index::parquet_embedded_index().await?
+        }
+        ExampleKind::ParquetEncWithKms => {
+            parquet_encrypted_with_kms::parquet_encrypted_with_kms().await?
+        }
+        ExampleKind::ParquetEnc => parquet_encrypted::parquet_encrypted().await?,
+        ExampleKind::ParquetExecVisitor => {
+            parquet_exec_visitor::parquet_exec_visitor().await?
+        }
+        ExampleKind::ParquetIdx => parquet_index::parquet_index().await?,
+        ExampleKind::QueryHttpCsv => query_http_csv::query_http_csv().await?,
+        ExampleKind::RemoteCatalog => remote_catalog::remote_catalog().await?,
+    }
+
+    Ok(())
+}
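The PR description notes the same pattern can be applied to the remaining examples. A reduced, self-contained sketch of what such an extension could look like — only two variants are kept, and `foo_example` is a hypothetical placeholder rather than anything in this PR:

```rust
use std::str::FromStr;

use datafusion::error::{DataFusionError, Result};

// Reduced form of the dispatcher in data_io/main.rs above.
enum ExampleKind {
    Catalog,
    FooExample, // hypothetical new example added later
}

impl FromStr for ExampleKind {
    type Err = DataFusionError;

    fn from_str(s: &str) -> Result<Self> {
        match s {
            "catalog" => Ok(Self::Catalog),
            "foo_example" => Ok(Self::FooExample),
            _ => Err(DataFusionError::Execution(format!("Unknown example: {s}"))),
        }
    }
}

// Stand-ins for the per-example `pub async fn` entry points.
async fn catalog() -> Result<()> { Ok(()) }
async fn foo_example() -> Result<()> { Ok(()) }

#[tokio::main]
async fn main() -> Result<()> {
    // Default to `catalog` when no subcommand is given, just for this sketch.
    let arg = std::env::args().nth(1).unwrap_or_else(|| "catalog".to_string());
    match arg.parse::<ExampleKind>()? {
        ExampleKind::Catalog => catalog().await?,
        ExampleKind::FooExample => foo_example().await?,
    }
    Ok(())
}
```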

datafusion-examples/examples/advanced_parquet_index.rs renamed to datafusion-examples/examples/data_io/parquet_advanced_index.rs

Lines changed: 3 additions & 2 deletions
@@ -155,8 +155,7 @@ use url::Url;
 ///
 /// [`ListingTable`]: datafusion::datasource::listing::ListingTable
 /// [Page Index](https://github.com/apache/parquet-format/blob/master/PageIndex.md)
-#[tokio::main]
-async fn main() -> Result<()> {
+pub async fn parquet_advanced_index() -> Result<()> {
     // the object store is used to read the parquet files (in this case, it is
     // a local file system, but in a real system it could be S3, GCS, etc)
     let object_store: Arc<dyn ObjectStore> =
@@ -239,6 +238,7 @@ pub struct IndexTableProvider {
     /// if true, use row selections in addition to row group selections
     use_row_selections: AtomicBool,
 }
+
 impl IndexTableProvider {
     /// Create a new IndexTableProvider
     /// * `object_store` - the object store implementation to use for reading files
@@ -539,6 +539,7 @@ impl CachedParquetFileReaderFactory {
             metadata: HashMap::new(),
         }
     }
+
     /// Add the pre-parsed information about the file to the factor
     fn with_file(mut self, indexed_file: &IndexedFile) -> Self {
         self.metadata.insert(

datafusion-examples/examples/parquet_embedded_index.rs renamed to datafusion-examples/examples/data_io/parquet_embedded_index.rs

Lines changed: 25 additions & 25 deletions
@@ -136,6 +136,31 @@ use std::path::{Path, PathBuf};
 use std::sync::Arc;
 use tempfile::TempDir;
 
+/// Store a custom index inside a Parquet file and use it to speed up queries
+pub async fn parquet_embedded_index() -> Result<()> {
+    // 1. Create temp dir and write 3 Parquet files with different category sets
+    let tmp = TempDir::new()?;
+    let dir = tmp.path();
+    write_file_with_index(&dir.join("a.parquet"), &["foo", "bar", "foo"])?;
+    write_file_with_index(&dir.join("b.parquet"), &["baz", "qux"])?;
+    write_file_with_index(&dir.join("c.parquet"), &["foo", "quux", "quux"])?;
+
+    // 2. Register our custom TableProvider
+    let field = Field::new("category", DataType::Utf8, false);
+    let schema_ref = Arc::new(Schema::new(vec![field]));
+    let provider = Arc::new(DistinctIndexTable::try_new(dir, schema_ref.clone())?);
+
+    let ctx = SessionContext::new();
+    ctx.register_table("t", provider)?;
+
+    // 3. Run a query: only files containing 'foo' get scanned. The rest are pruned.
+    //    based on the distinct index.
+    let df = ctx.sql("SELECT * FROM t WHERE category = 'foo'").await?;
+    df.show().await?;
+
+    Ok(())
+}
+
 /// An index of distinct values for a single column
 ///
 /// In this example the index is a simple set of strings, but in a real
@@ -452,28 +477,3 @@ impl TableProvider for DistinctIndexTable {
         Ok(vec![TableProviderFilterPushDown::Inexact; fs.len()])
     }
 }
-
-#[tokio::main]
-async fn main() -> Result<()> {
-    // 1. Create temp dir and write 3 Parquet files with different category sets
-    let tmp = TempDir::new()?;
-    let dir = tmp.path();
-    write_file_with_index(&dir.join("a.parquet"), &["foo", "bar", "foo"])?;
-    write_file_with_index(&dir.join("b.parquet"), &["baz", "qux"])?;
-    write_file_with_index(&dir.join("c.parquet"), &["foo", "quux", "quux"])?;
-
-    // 2. Register our custom TableProvider
-    let field = Field::new("category", DataType::Utf8, false);
-    let schema_ref = Arc::new(Schema::new(vec![field]));
-    let provider = Arc::new(DistinctIndexTable::try_new(dir, schema_ref.clone())?);
-
-    let ctx = SessionContext::new();
-    ctx.register_table("t", provider)?;
-
-    // 3. Run a query: only files containing 'foo' get scanned. The rest are pruned.
-    //    based on the distinct index.
-    let df = ctx.sql("SELECT * FROM t WHERE category = 'foo'").await?;
-    df.show().await?;
-
-    Ok(())
-}

datafusion-examples/examples/parquet_encrypted.rs renamed to datafusion-examples/examples/data_io/parquet_encrypted.rs

Lines changed: 2 additions & 2 deletions
@@ -25,8 +25,8 @@ use datafusion::prelude::{ParquetReadOptions, SessionContext};
 use std::sync::Arc;
 use tempfile::TempDir;
 
-#[tokio::main]
-async fn main() -> datafusion::common::Result<()> {
+/// Read and write encrypted Parquet files using DataFusion
+pub async fn parquet_encrypted() -> datafusion::common::Result<()> {
     // The SessionContext is the main high level API for interacting with DataFusion
     let ctx = SessionContext::new();
 