
feat: Add datafusion-spark crate #14392

Open. Wants to merge 69 commits into base branch main.

Commits (69):
33f3e9c
feat: Create new `datafusion-comet-spark-expr` crate containing Spark…
andygrove Jul 10, 2024
96a2f41
feat: Move `IfExpr` to `spark-expr` crate (#653)
andygrove Jul 11, 2024
2f22a4d
chore: Refactoring of CometError/SparkError (#655)
andygrove Jul 12, 2024
11138bb
chore: Move `cast` to `spark-expr` crate (#654)
andygrove Jul 12, 2024
fb7b198
remove utils crate and move utils into spark-expr crate (#658)
andygrove Jul 12, 2024
d510649
chore: Move temporal kernels and expressions to spark-expr crate (#660)
andygrove Jul 15, 2024
46e8bf2
fix: Optimize some functions to rewrite dictionary-encoded strings (#…
vaibhawvipul Jul 15, 2024
2179331
Change suffix on some expressions from Exec to Expr (#673)
andygrove Jul 16, 2024
01e21a9
chore: Disable abs and signum because they return incorrect results (…
andygrove Jul 20, 2024
01362b5
chore: Make rust clippy happy (#701)
Xuanwo Jul 21, 2024
e2d838e
perf: Optimize IfExpr by delegating to CaseExpr (#681)
andygrove Jul 24, 2024
5dcf713
chore: make Cast's logic reusable for other projects (#716)
Blizzara Jul 27, 2024
2a4dc7b
chore: move scalar_funcs into spark-expr (#712)
Blizzara Jul 28, 2024
0a00325
chore: Add criterion benchmark for decimal_div (#743)
andygrove Jul 31, 2024
abfce13
Add support for time-zone, 3 & 5 digit years: Cast from string to tim…
akhilss99 Aug 1, 2024
1a02d58
fix: Remove castting on decimals with a small precision to decimal256…
kazuyukitanimura Aug 1, 2024
33f1ce9
feat: Implement basic version of RLIKE (#734)
andygrove Aug 2, 2024
607ee7d
feat: Add GetStructField expression (#731)
Kimahriman Aug 3, 2024
e3709ea
fix: Optimize read_side_padding (#772)
kazuyukitanimura Aug 8, 2024
d2853ef
feat: Optimze CreateNamedStruct preserve dictionaries (#789)
eejbyfeldt Aug 9, 2024
5a37afc
feat: Enable `clippy::clone_on_ref_ptr` on `proto` and `spark_exprs` …
comphead Aug 21, 2024
3cf7c4f
feat: Implement basic version of string to float/double/decimal (#870)
andygrove Aug 28, 2024
9d86fa9
feat: Implement to_json for subset of types (#805)
andygrove Aug 28, 2024
d9eecde
feat: Support sort merge join with a join condition (#553)
viirya Aug 30, 2024
74097da
feat: Array element extraction (#899)
Kimahriman Sep 5, 2024
b5763f6
chore: Upgrade to latest DataFusion revision (#909)
andygrove Sep 5, 2024
c7ed2eb
build: fix build (#917)
andygrove Sep 5, 2024
c7ec300
feat: date_add and date_sub functions (#910)
mbutrovich Sep 16, 2024
ff41f1b
feat: Support `GetArrayStructFields` expression (#993)
Kimahriman Oct 7, 2024
e1e5483
chore: Bump arrow-rs to 53.1.0 and datafusion (#1001)
kazuyukitanimura Oct 14, 2024
2fea49f
chore: Use twox-hash 2.0 xxhash64 oneshot api instead of custom imple…
NoeB Oct 30, 2024
118701a
chore: Upgrade to DataFusion 43.0.0-rc1 (#1057)
andygrove Nov 5, 2024
850cde4
feat: Implement CAST from struct to string (#1066)
andygrove Nov 8, 2024
b534608
feat: Implement CAST between struct types (#1074)
andygrove Nov 11, 2024
e07a64b
fix: Unsigned type related bugs (#1095)
kazuyukitanimura Nov 19, 2024
97b750e
feat: support array_insert (#1073)
SemyonSinchenko Nov 22, 2024
7fff547
chore: Refactor cast to use SparkCastOptions param (#1146)
andygrove Dec 6, 2024
df2350f
chore: Move more expressions from core crate to spark-expr crate (#1152)
andygrove Dec 9, 2024
3859724
chore: Move string kernels and expressions to spark-expr crate (#1164)
andygrove Dec 12, 2024
f69148d
chore: Move remaining expressions to spark-expr crate + some minor re…
andygrove Dec 12, 2024
7bd99f6
feat: Add Spark-compatible implementation of SchemaAdapterFactory (#1…
andygrove Dec 17, 2024
67038db
fix: stddev_pop should not directly return 0.0 when count is 1.0 (#1184)
viirya Dec 19, 2024
cf44902
chore: Upgrade to DataFusion 44.0.0-rc2 (#1154)
andygrove Dec 28, 2024
4af3a2f
extract struct expressions to folders based on spark grouping (#1216)
rluvaton Jan 6, 2025
11421f7
chore: extract static invoke expressions to folders based on spark gr…
rluvaton Jan 6, 2025
37b7acb
chore: extract agg_funcs expressions to folders based on spark groupi…
rluvaton Jan 7, 2025
ac22924
extract datetime_funcs expressions to folders based on spark grouping…
rluvaton Jan 7, 2025
7a23f62
chore: extract strings file to `strings_func` like in spark grouping …
rluvaton Jan 8, 2025
5f773e6
chore: extract predicate_functions expressions to folders based on sp…
rluvaton Jan 8, 2025
8e6e444
extract json_funcs expressions to folders based on spark grouping (#1…
rluvaton Jan 8, 2025
31a725f
chore: extract hash_funcs expressions to folders based on spark group…
rluvaton Jan 9, 2025
2cd57b5
chore: extract conversion_funcs, conditional_funcs, bitwise_funcs and…
rluvaton Jan 18, 2025
ba08511
fix: partially fix consistency issue of hash functions with decimal i…
wForget Jan 19, 2025
219a133
chore: extract math_funcs expressions to folders based on spark group…
rluvaton Jan 20, 2025
bbb8548
chore: merge comet-parquet-exec branch into main (#1318)
andygrove Jan 21, 2025
1ba1e2b
fix merge conflicts (#1320)
andygrove Jan 22, 2025
d9fe1b3
chore: Fix merge conflicts from merging comet-parquet-exec into main …
mbutrovich Jan 23, 2025
35821f1
chore: Prepare for DataFusion 45 (bump to DataFusion rev 5592834 + Ar…
andygrove Jan 30, 2025
fc41754
Add spark-expr crate
andygrove Jan 31, 2025
53aec1a
move benches
andygrove Jan 31, 2025
1c0901a
fix Cargo.toml
andygrove Jan 31, 2025
c404172
fix header format
andygrove Jan 31, 2025
024912a
add to workspace
andygrove Jan 31, 2025
071268d
prettier
andygrove Jan 31, 2025
f563cb7
fmt
andygrove Jan 31, 2025
af2ec26
fix merge conflict in README
andygrove Jan 31, 2025
d7eaf68
fix cargo doc failures
andygrove Jan 31, 2025
96f2136
taplo fmt
andygrove Jan 31, 2025
77e7831
rename functions-spark to spark
andygrove Jan 31, 2025
2 changes: 2 additions & 0 deletions Cargo.toml
@@ -43,6 +43,7 @@ members = [
    "datafusion/proto/gen",
    "datafusion/proto-common",
    "datafusion/proto-common/gen",
    "datafusion/spark",
    "datafusion/sql",
    "datafusion/sqllogictest",
    "datafusion/substrait",
@@ -84,6 +85,7 @@ arrow-array = { version = "54.0.0", default-features = false, features = [
    "chrono-tz",
] }
arrow-buffer = { version = "54.0.0", default-features = false }
arrow-data = { version = "54.0.0", default-features = false }
arrow-flight = { version = "54.0.0", features = [
    "flight-sql-experimental",
] }
78 changes: 78 additions & 0 deletions datafusion/spark/Cargo.toml
@@ -0,0 +1,78 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[package]
name = "datafusion-spark"
description = "DataFusion expressions that emulate Apache Spark's behavior"
version = { workspace = true }
homepage = { workspace = true }
repository = { workspace = true }
authors = { workspace = true }
readme = { workspace = true }
license = { workspace = true }
edition = { workspace = true }

[dependencies]
arrow = { workspace = true }
arrow-array = { workspace = true }
arrow-buffer = { workspace = true }
arrow-data = { workspace = true }
arrow-schema = { workspace = true }
chrono = { workspace = true }
chrono-tz = "0.10.1"
datafusion = { workspace = true, features = ["parquet"] }
datafusion-common = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-expr-common = { workspace = true }
datafusion-physical-expr = { workspace = true }
futures = { workspace = true }
num = "0.4.3"
rand = { workspace = true }
regex = { workspace = true }
thiserror = "2.0.11"
twox-hash = "2.0.0"

[dev-dependencies]
arrow-data = { workspace = true }
criterion = "0.5.1"
parquet = { workspace = true, features = ["arrow"] }
rand = { workspace = true }
tokio = { version = "1", features = ["rt-multi-thread"] }

[lib]
name = "datafusion_comet_spark_expr"
path = "src/lib.rs"

[[bench]]
name = "cast_from_string"
harness = false

[[bench]]
name = "cast_numeric"
harness = false

[[bench]]
name = "conditional"
harness = false

[[bench]]
name = "decimal_div"
harness = false

[[bench]]
name = "aggregate"
harness = false
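Once published, downstream crates would pull this in like any other DataFusion subcrate. A hypothetical `Cargo.toml` fragment (the version is illustrative, since the crate is not yet released; note that the `[lib]` name above means imports use `datafusion_comet_spark_expr` rather than `datafusion_spark`):

```toml
[dependencies]
datafusion = "45"
datafusion-spark = "45"
```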
22 changes: 22 additions & 0 deletions datafusion/spark/README.md
@@ -0,0 +1,22 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# datafusion-spark: Spark-compatible Expressions

This crate provides Apache Spark-compatible expressions for use with DataFusion.
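Spark's cast rules differ from Arrow's in small but user-visible ways, which is why a dedicated crate is useful. One such rule: string-to-integer casts trim surrounding whitespace, and in Spark's non-ANSI (legacy) mode an unparseable string yields NULL instead of an error. A minimal std-only sketch of just that behavior (hypothetical helper, not the crate's actual `Cast` implementation, which covers far more cases):

```rust
// Sketch of one Spark legacy-mode cast rule: trim whitespace before
// parsing, and map parse failures to None (i.e. SQL NULL) rather than
// returning an error.
fn spark_legacy_cast_str_to_i32(s: &str) -> Option<i32> {
    // Spark trims leading/trailing whitespace before numeric parsing.
    let trimmed = s.trim();
    // On failure, legacy mode produces NULL instead of raising.
    trimmed.parse::<i32>().ok()
}

fn main() {
    assert_eq!(spark_legacy_cast_str_to_i32("  42 "), Some(42));
    assert_eq!(spark_legacy_cast_str_to_i32("+7"), Some(7));
    assert_eq!(spark_legacy_cast_str_to_i32("abc"), None);
    println!("ok");
}
```

In `EvalMode::Ansi`, by contrast, Spark raises an error for invalid input, which is why the crate's `SparkCastOptions` (seen in the benchmarks below) carries an `EvalMode` parameter.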
201 changes: 201 additions & 0 deletions datafusion/spark/benches/aggregate.rs
@@ -0,0 +1,201 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::datatypes::{DataType, Field, Schema};
use arrow_array::builder::{Decimal128Builder, StringBuilder};
use arrow_array::{ArrayRef, RecordBatch};
use arrow_schema::SchemaRef;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use datafusion::execution::TaskContext;
use datafusion::functions_aggregate::average::avg_udaf;
use datafusion::functions_aggregate::sum::sum_udaf;
use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_plan::aggregates::{
    AggregateExec, AggregateMode, PhysicalGroupBy,
};
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_comet_spark_expr::AvgDecimal;
use datafusion_comet_spark_expr::SumDecimal;
use datafusion_expr::AggregateUDF;
use datafusion_physical_expr::aggregate::AggregateExprBuilder;
use datafusion_physical_expr::expressions::Column;
use futures::StreamExt;
use std::sync::Arc;
use std::time::Duration;
use tokio::runtime::Runtime;

fn criterion_benchmark(c: &mut Criterion) {
    let mut group = c.benchmark_group("aggregate");
    let num_rows = 8192;
    let batch = create_record_batch(num_rows);
    let mut batches = Vec::new();
    for _ in 0..10 {
        batches.push(batch.clone());
    }
    let partitions = &[batches];
    let c0: Arc<dyn PhysicalExpr> = Arc::new(Column::new("c0", 0));
    let c1: Arc<dyn PhysicalExpr> = Arc::new(Column::new("c1", 1));

    let rt = Runtime::new().unwrap();

    group.bench_function("avg_decimal_datafusion", |b| {
        let datafusion_sum_decimal = avg_udaf();
        b.to_async(&rt).iter(|| {
            black_box(agg_test(
                partitions,
                c0.clone(),
                c1.clone(),
                datafusion_sum_decimal.clone(),
                "avg",
            ))
        })
    });

    group.bench_function("avg_decimal_comet", |b| {
        let comet_avg_decimal = Arc::new(AggregateUDF::new_from_impl(AvgDecimal::new(
            DataType::Decimal128(38, 10),
            DataType::Decimal128(38, 10),
        )));
        b.to_async(&rt).iter(|| {
            black_box(agg_test(
                partitions,
                c0.clone(),
                c1.clone(),
                comet_avg_decimal.clone(),
                "avg",
            ))
        })
    });

    group.bench_function("sum_decimal_datafusion", |b| {
        let datafusion_sum_decimal = sum_udaf();
        b.to_async(&rt).iter(|| {
            black_box(agg_test(
                partitions,
                c0.clone(),
                c1.clone(),
                datafusion_sum_decimal.clone(),
                "sum",
            ))
        })
    });

    group.bench_function("sum_decimal_comet", |b| {
        let comet_sum_decimal = Arc::new(AggregateUDF::new_from_impl(
            SumDecimal::try_new(DataType::Decimal128(38, 10)).unwrap(),
        ));
        b.to_async(&rt).iter(|| {
            black_box(agg_test(
                partitions,
                c0.clone(),
                c1.clone(),
                comet_sum_decimal.clone(),
                "sum",
            ))
        })
    });

    group.finish();
}

async fn agg_test(
    partitions: &[Vec<RecordBatch>],
    c0: Arc<dyn PhysicalExpr>,
    c1: Arc<dyn PhysicalExpr>,
    aggregate_udf: Arc<AggregateUDF>,
    alias: &str,
) {
    let schema = &partitions[0][0].schema();
    let scan: Arc<dyn ExecutionPlan> =
        Arc::new(MemoryExec::try_new(partitions, Arc::clone(schema), None).unwrap());
    let aggregate =
        create_aggregate(scan, c0.clone(), c1.clone(), schema, aggregate_udf, alias);
    let mut stream = aggregate
        .execute(0, Arc::new(TaskContext::default()))
        .unwrap();
    while let Some(batch) = stream.next().await {
        let _batch = batch.unwrap();
    }
}

fn create_aggregate(
    scan: Arc<dyn ExecutionPlan>,
    c0: Arc<dyn PhysicalExpr>,
    c1: Arc<dyn PhysicalExpr>,
    schema: &SchemaRef,
    aggregate_udf: Arc<AggregateUDF>,
    alias: &str,
) -> Arc<AggregateExec> {
    let aggr_expr = AggregateExprBuilder::new(aggregate_udf, vec![c1])
        .schema(schema.clone())
        .alias(alias)
        .with_ignore_nulls(false)
        .with_distinct(false)
        .build()
        .unwrap();

    Arc::new(
        AggregateExec::try_new(
            AggregateMode::Partial,
            PhysicalGroupBy::new_single(vec![(c0, "c0".to_string())]),
            vec![aggr_expr.into()],
            vec![None], // no filter expressions
            scan,
            Arc::clone(schema),
        )
        .unwrap(),
    )
}

fn create_record_batch(num_rows: usize) -> RecordBatch {
    let mut decimal_builder = Decimal128Builder::with_capacity(num_rows);
    let mut string_builder = StringBuilder::with_capacity(num_rows, num_rows * 32);
    for i in 0..num_rows {
        decimal_builder.append_value(i as i128);
        string_builder.append_value(format!("this is string #{}", i % 1024));
    }
    let decimal_array = Arc::new(decimal_builder.finish());
    let string_array = Arc::new(string_builder.finish());

    let mut fields = vec![];
    let mut columns: Vec<ArrayRef> = vec![];

    // string column
    fields.push(Field::new("c0", DataType::Utf8, false));
    columns.push(string_array);

    // decimal column
    fields.push(Field::new("c1", DataType::Decimal128(38, 10), false));
    columns.push(decimal_array);

    let schema = Schema::new(fields);
    RecordBatch::try_new(Arc::new(schema), columns).unwrap()
}

fn config() -> Criterion {
    Criterion::default()
        .measurement_time(Duration::from_millis(500))
        .warm_up_time(Duration::from_millis(500))
}

criterion_group! {
    name = benches;
    config = config();
    targets = criterion_benchmark
}
criterion_main!(benches);
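The benchmark above builds its `AggregateExec` with `AggregateMode::Partial`, meaning each partition computes per-group accumulator states that a later Final stage would merge. A minimal std-only sketch of the partial grouped-sum idea (illustrative only, not DataFusion's actual implementation, which operates on Arrow arrays with vectorized accumulators):

```rust
use std::collections::HashMap;

// Partial aggregation: each partition independently builds its own
// group-key -> accumulator-state map. A final stage would merge the
// maps produced by all partitions to get the global result.
fn partial_sum(rows: &[(&str, i128)]) -> HashMap<String, i128> {
    let mut states: HashMap<String, i128> = HashMap::new();
    for (key, value) in rows {
        // Fold each row's value into its group's running sum.
        *states.entry((*key).to_string()).or_insert(0) += value;
    }
    states
}

fn main() {
    let part = partial_sum(&[("a", 1), ("b", 2), ("a", 3)]);
    assert_eq!(part["a"], 4);
    assert_eq!(part["b"], 2);
    println!("ok");
}
```

Splitting aggregation into Partial and Final stages is what lets the plan run each input partition in parallel before a single merge step.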
79 changes: 79 additions & 0 deletions datafusion/spark/benches/cast_from_string.rs
@@ -0,0 +1,79 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow_array::{builder::StringBuilder, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use criterion::{criterion_group, criterion_main, Criterion};
use datafusion_comet_spark_expr::{Cast, EvalMode, SparkCastOptions};
use datafusion_physical_expr::{expressions::Column, PhysicalExpr};
use std::sync::Arc;

fn criterion_benchmark(c: &mut Criterion) {
    let batch = create_utf8_batch();
    let expr = Arc::new(Column::new("a", 0));
    let spark_cast_options = SparkCastOptions::new(EvalMode::Legacy, "", false);
    let cast_string_to_i8 =
        Cast::new(expr.clone(), DataType::Int8, spark_cast_options.clone());
    let cast_string_to_i16 =
        Cast::new(expr.clone(), DataType::Int16, spark_cast_options.clone());
    let cast_string_to_i32 =
        Cast::new(expr.clone(), DataType::Int32, spark_cast_options.clone());
    let cast_string_to_i64 = Cast::new(expr, DataType::Int64, spark_cast_options);

    let mut group = c.benchmark_group("cast_string_to_int");
    group.bench_function("cast_string_to_i8", |b| {
        b.iter(|| cast_string_to_i8.evaluate(&batch).unwrap());
    });
    group.bench_function("cast_string_to_i16", |b| {
        b.iter(|| cast_string_to_i16.evaluate(&batch).unwrap());
    });
    group.bench_function("cast_string_to_i32", |b| {
        b.iter(|| cast_string_to_i32.evaluate(&batch).unwrap());
    });
    group.bench_function("cast_string_to_i64", |b| {
        b.iter(|| cast_string_to_i64.evaluate(&batch).unwrap());
    });
}

// Create UTF8 batch with strings representing ints, floats, nulls
fn create_utf8_batch() -> RecordBatch {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
    let mut b = StringBuilder::new();
    for i in 0..1000 {
        if i % 10 == 0 {
            b.append_null();
        } else if i % 2 == 0 {
            b.append_value(format!("{}", rand::random::<f64>()));
        } else {
            b.append_value(format!("{}", rand::random::<i64>()));
        }
    }
    let array = b.finish();

    RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap()
}

fn config() -> Criterion {
    Criterion::default()
}

criterion_group! {
    name = benches;
    config = config();
    targets = criterion_benchmark
}
criterion_main!(benches);