Skip to content

Commit

Permalink
fix: rework UWheelOptimizer to use OptimizerRule, closes #11
Browse files Browse the repository at this point in the history
  • Loading branch information
Max-Meldrum committed Aug 6, 2024
1 parent cfaabbb commit f88ea80
Show file tree
Hide file tree
Showing 10 changed files with 368 additions and 560 deletions.
6 changes: 2 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ resolver = "2"
members = ["benchmarks/nyc_taxi_bench", "datafusion-uwheel"]

[workspace.package]
version = "38.0.0"
version = "40.0.0"
edition = "2021"
authors = ["Max Meldrum <max@meldrum.se>"]
license = "Apache-2.0"
Expand All @@ -19,9 +19,7 @@ uwheel = { version = "0.2.0", default-features = false, features = [
"max",
"all",
] }
datafusion = "38.0.0"
async-trait = "0.1.81"
datafusion = "40.0.0"
chrono = "0.4.38"
bitpacking = "0.9.2"
tokio = "1.38.1"
futures = "0.3.30"
2 changes: 1 addition & 1 deletion benchmarks/nyc_taxi_bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ debug = []

[dependencies]
datafusion-uwheel = { path = "../../datafusion-uwheel" }
datafusion = "38.0.0"
datafusion = "40.0.0"
mimalloc = { version = "*", default-features = false, optional = true }
tokio = { version = "1", features = ["full"] }
chrono = "0.4.38"
Expand Down
78 changes: 40 additions & 38 deletions benchmarks/nyc_taxi_bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion_uwheel::UWheelOptimizer;
use datafusion_uwheel::{IndexBuilder, UWheelOptimizer};

use chrono::{DateTime, NaiveDate, Utc};
use clap::Parser;
Expand Down Expand Up @@ -45,7 +45,7 @@ async fn main() -> Result<()> {
let filename = "../../data/yellow_tripdata_2022-01.parquet";

// register parquet file with the execution context
ctx.register_parquet("yellow_tripdata", &filename, ParquetReadOptions::default())
ctx.register_parquet("yellow_tripdata", filename, ParquetReadOptions::default())
.await?;

// Create ctx with UWheelOptimizer
Expand Down Expand Up @@ -78,14 +78,24 @@ async fn main() -> Result<()> {
Builder::new("tpep_dropoff_datetime")
.with_name("yellow_tripdata")
.with_min_max_wheels(vec!["fare_amount", "trip_distance"]) // Create Min/Max wheels for the columns "fare_amount" and "trip_distance"
.with_sum_wheels(vec!["fare_amount"])
.build_with_provider(provider)
.await
.unwrap(),
);

// Set UWheelOptimizer as the query planner
let session_state = uwheel_ctx.state().with_query_planner(optimizer.clone());
// Build index on fare_amount using SUM as aggregate
optimizer
.build_index(IndexBuilder::with_col_and_aggregate(
"fare_amount",
datafusion_uwheel::AggregateType::Sum,
))
.await
.unwrap();

// Set UWheelOptimizer as optimizer rule
let session_state = uwheel_ctx
.state()
.with_optimizer_rules(vec![optimizer.clone()]);
let uwheel_ctx = SessionContext::new_with_state(session_state);

// Register the table using the underlying provider
Expand Down Expand Up @@ -131,33 +141,33 @@ pub async fn bench(
ranges: &[(u64, u64)],
fares: &[f64],
) {
bench_datafusion_count("datafusion-count(*)", &ctx, &ranges).await;
bench_datafusion_count("datafusion-uwheel-count(*)", &uwheel_ctx, &ranges).await;
bench_datafusion_count("datafusion-count(*)", ctx, ranges).await;
bench_datafusion_count("datafusion-uwheel-count(*)", uwheel_ctx, ranges).await;

bench_datafusion_sum_fare_amount("datafusion-sum(fare_amount)", &ctx, &ranges).await;
bench_datafusion_sum_fare_amount("datafusion-uwheel-sum(fare_amount)", &uwheel_ctx, &ranges)
bench_datafusion_sum_fare_amount("datafusion-sum(fare_amount)", ctx, ranges).await;
bench_datafusion_sum_fare_amount("datafusion-uwheel-sum(fare_amount)", uwheel_ctx, ranges)
.await;

bench_min_max_projection(
"datafusion-select(*)-fare-amount-filter",
&ctx,
&ranges,
&fares,
ctx,
ranges,
fares,
)
.await;
bench_min_max_projection(
"datafusion-uwheel-select(*)-fare-amount-filter",
&uwheel_ctx,
&ranges,
&fares,
uwheel_ctx,
ranges,
fares,
)
.await;

bench_datafusion_temporal_projection("datafusion-select(*)-count-filter", &ctx, &ranges).await;
bench_datafusion_temporal_projection("datafusion-select(*)-count-filter", ctx, ranges).await;
bench_datafusion_temporal_projection(
"datafusion-uwheel-select(*)-count-filter",
&uwheel_ctx,
&ranges,
uwheel_ctx,
ranges,
)
.await;
}
Expand Down Expand Up @@ -244,13 +254,11 @@ async fn bench_datafusion_count(id: &str, ctx: &SessionContext, ranges: &[(u64,
.map(|(start, end)| {
let start = DateTime::from_timestamp_millis(start as i64)
.unwrap()
.to_utc()
.naive_utc()
.to_rfc3339()
.to_string();
let end = DateTime::from_timestamp_millis(end as i64)
.unwrap()
.to_utc()
.naive_utc()
.to_rfc3339()
.to_string();
format!(
"SELECT COUNT(*) FROM yellow_tripdata \
Expand Down Expand Up @@ -298,18 +306,16 @@ async fn bench_datafusion_sum_fare_amount(id: &str, ctx: &SessionContext, ranges
.map(|(start, end)| {
let start = DateTime::from_timestamp_millis(start as i64)
.unwrap()
.to_utc()
.naive_utc()
.to_rfc3339()
.to_string();
let end = DateTime::from_timestamp_millis(end as i64)
.unwrap()
.to_utc()
.naive_utc()
.to_rfc3339()
.to_string();
format!(
"SELECT SUM(fare_amount) FROM yellow_tripdata \
WHERE tpep_dropoff_datetime >= '{}' \
AND tpep_dropoff_datetime < '{}'",
WHERE tpep_dropoff_datetime >= '{}' \
AND tpep_dropoff_datetime < '{}'",
start, end
)
})
Expand Down Expand Up @@ -358,13 +364,11 @@ async fn bench_min_max_projection(
.map(|((start, end), fare)| {
let start = DateTime::from_timestamp_millis(start as i64)
.unwrap()
.to_utc()
.naive_utc()
.to_rfc3339()
.to_string();
let end = DateTime::from_timestamp_millis(end as i64)
.unwrap()
.to_utc()
.naive_utc()
.to_rfc3339()
.to_string();
format!(
"SELECT * FROM yellow_tripdata \
Expand All @@ -382,7 +386,7 @@ async fn bench_min_max_projection(
// dbg!(&query);
let now = Instant::now();
let df = ctx.sql(&query).await.unwrap();
let res = df.collect().await.unwrap();
let _res = df.collect().await.unwrap();
hist.record(now.elapsed().as_micros() as u64).unwrap();
}
let runtime = full.elapsed();
Expand All @@ -409,13 +413,11 @@ async fn bench_datafusion_temporal_projection(
.map(|(start, end)| {
let start = DateTime::from_timestamp_millis(start as i64)
.unwrap()
.to_utc()
.naive_utc()
.to_rfc3339()
.to_string();
let end = DateTime::from_timestamp_millis(end as i64)
.unwrap()
.to_utc()
.naive_utc()
.to_rfc3339()
.to_string();
format!(
"SELECT * FROM yellow_tripdata \
Expand All @@ -432,7 +434,7 @@ async fn bench_datafusion_temporal_projection(
// dbg!(&query);
let now = Instant::now();
let df = ctx.sql(&query).await.unwrap();
let res = df.collect().await.unwrap();
let _res = df.collect().await.unwrap();
hist.record(now.elapsed().as_micros() as u64).unwrap();
}
let runtime = full.elapsed();
Expand Down
3 changes: 0 additions & 3 deletions datafusion-uwheel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,8 @@ edition.workspace = true
[dependencies]
datafusion.workspace = true
uwheel.workspace = true
async-trait.workspace = true
chrono.workspace = true
bitpacking.workspace = true
futures.workspace = true


[dev-dependencies]
tokio = "1.38.1"
74 changes: 16 additions & 58 deletions datafusion-uwheel/examples/nyc_taxi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,11 @@ use datafusion::{
listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
},
error::Result,
physical_plan::{coalesce_batches::CoalesceBatchesExec, collect, empty::EmptyExec},
prelude::{col, count, lit, SessionContext},
physical_plan::collect,
prelude::{col, lit, SessionContext},
scalar::ScalarValue,
};
use datafusion_uwheel::{
builder::Builder,
exec::{UWheelCountExec, UWheelSumExec},
AggregateType, IndexBuilder, UWheelOptimizer,
};
use datafusion_uwheel::{builder::Builder, AggregateType, IndexBuilder, UWheelOptimizer};

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
Expand Down Expand Up @@ -47,41 +43,39 @@ async fn main() -> Result<()> {
Builder::new("tpep_dropoff_datetime")
.with_name("yellow_tripdata")
.with_min_max_wheels(vec!["fare_amount", "trip_distance"])
.with_sum_wheels(vec!["fare_amount"])
.build_with_provider(provider)
.await
.unwrap(),
);

// Build a wheel SUM on fare_amount
let builder = IndexBuilder::with_col_and_aggregate("fare_amount", AggregateType::Sum);
optimizer.build_index(builder).await?;

// Build a wheel for a custom expression
let builder = IndexBuilder::with_col_and_aggregate("fare_amount", AggregateType::Sum)
.with_filter(col("passenger_count").eq(lit(ScalarValue::Float64(Some(4.0)))));

optimizer.build_index(builder).await?;

// Set UWheelOptimizer as an optimizer rule
let session_state = ctx.state().with_query_planner(optimizer.clone());
let session_state = ctx.state().with_optimizer_rules(vec![optimizer.clone()]);
let ctx = SessionContext::new_with_state(session_state);

// Register the table using the underlying provider
ctx.register_table("yellow_tripdata", optimizer.provider())
.unwrap();

// This query will then use the UWheelOptimizer to execute
let plan = ctx
let df = ctx
.sql(
"SELECT COUNT(*) FROM yellow_tripdata
WHERE tpep_dropoff_datetime >= '2022-01-01T00:00:00Z'
AND tpep_dropoff_datetime < '2022-02-01T00:00:00Z'",
)
.await?
.create_physical_plan()
.await?;

// The plan should be a UWheelCountExec
let uwheel_exec = plan.as_any().downcast_ref::<UWheelCountExec>().unwrap();
dbg!(uwheel_exec);

let plan = df.create_physical_plan().await?;
// Execute the plan
let results: Vec<RecordBatch> = collect(plan, ctx.task_ctx()).await?;
arrow::util::pretty::print_batches(&results).unwrap();
Expand All @@ -99,7 +93,7 @@ async fn main() -> Result<()> {
let results: Vec<RecordBatch> = collect(sum_plan, ctx.task_ctx()).await?;
arrow::util::pretty::print_batches(&results).unwrap();

let physical_plan = ctx
let filter_plan = ctx
.sql(
"SELECT * FROM yellow_tripdata
WHERE tpep_dropoff_datetime >= '2022-01-01T00:00:00Z'
Expand All @@ -109,23 +103,8 @@ async fn main() -> Result<()> {
.create_physical_plan()
.await?;

// Verify that the plan is optimized to EmptyExec
assert!(physical_plan.as_any().downcast_ref::<EmptyExec>().is_some());

let physical_plan = ctx
.sql(
"SELECT * FROM yellow_tripdata
WHERE tpep_dropoff_datetime >= '2022-01-01T00:00:00Z'
AND tpep_dropoff_datetime < '2022-01-02T00:00:00Z'",
)
.await?
.create_physical_plan()
.await?;

assert!(physical_plan
.as_any()
.downcast_ref::<CoalesceBatchesExec>()
.is_some());
let results: Vec<RecordBatch> = collect(filter_plan, ctx.task_ctx()).await?;
arrow::util::pretty::print_batches(&results).unwrap();

let min_max = ctx
.sql(
Expand All @@ -137,25 +116,9 @@ async fn main() -> Result<()> {
.await?
.create_physical_plan()
.await?;
// verify that it returned an EmptyExec
assert!(min_max.as_any().downcast_ref::<EmptyExec>().is_some());

let between_plan = ctx
.sql(
"SELECT COUNT(*) FROM yellow_tripdata
WHERE tpep_dropoff_datetime BETWEEN '2022-01-01T00:00:00Z'
AND '2022-02-01T00:00:00Z'",
)
.await?
.create_physical_plan()
.await?;

// The plan should be a UWheelCountExec
let uwheel_exec = between_plan
.as_any()
.downcast_ref::<UWheelCountExec>()
.unwrap();
dbg!(uwheel_exec);
let results: Vec<RecordBatch> = collect(min_max, ctx.task_ctx()).await?;
assert!(results.is_empty());
arrow::util::pretty::print_batches(&results).unwrap();

// We created an index for this SQL query earlier, so it executes as a UWheelSumExec
let sum_keyed_plan = ctx
Expand All @@ -169,11 +132,6 @@ async fn main() -> Result<()> {
.create_physical_plan()
.await?;

assert!(sum_keyed_plan
.as_any()
.downcast_ref::<UWheelSumExec>()
.is_some());

let results: Vec<RecordBatch> = collect(sum_keyed_plan, ctx.task_ctx()).await?;
arrow::util::pretty::print_batches(&results).unwrap();

Expand Down
Loading

0 comments on commit f88ea80

Please sign in to comment.