
Commit fe582a7: Update text

alamb committed Oct 15, 2022
2 parents 159980f + 011bcf4
Showing 248 changed files with 15,353 additions and 6,789 deletions.
8 changes: 7 additions & 1 deletion .github/workflows/dev_pr.yml
@@ -28,6 +28,12 @@ jobs:
process:
name: Process
runs-on: ubuntu-latest
# only run for users whose permissions allow them to update PRs
# otherwise labeler is failing:
# https://github.com/apache/arrow-datafusion/issues/3743
permissions:
contents: read
pull-requests: write
steps:
- uses: actions/checkout@v3

@@ -36,7 +42,7 @@ jobs:
github.event_name == 'pull_request_target' &&
(github.event.action == 'opened' ||
github.event.action == 'synchronize')
uses: actions/labeler@v4.0.1
uses: actions/labeler@v4.0.2
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
configuration-path: .github/workflows/dev_pr/labeler.yml
132 changes: 61 additions & 71 deletions .github/workflows/rust.yml
@@ -61,6 +61,9 @@ jobs:
- name: Check workspace with all features
run: |
cargo check --workspace --benches --features avro,jit,scheduler,json
- name: Check Cargo.lock for datafusion-cli
run: |
cargo check --manifest-path datafusion-cli/Cargo.toml --locked
# test the crate
linux-test:
@@ -101,13 +104,26 @@ jobs:
run: |
export PATH=$PATH:$HOME/d/protoc/bin
cargo test --features avro,jit,scheduler,json
- name: Run examples
run: |
export PATH=$PATH:$HOME/d/protoc/bin
# test datafusion-sql examples
cargo run --example sql
# test datafusion examples
cd datafusion-examples
# test datafusion-examples
cargo run --example avro_sql --features=datafusion/avro
cargo run --example csv_sql
cargo run --example custom_datasource
cargo run --example dataframe
cargo run --example dataframe_in_memory
cargo run --example deserialize_to_struct
cargo run --example expr_api
cargo run --example parquet_sql
cargo run --example avro_sql --features=datafusion/avro
cargo run --example parquet_sql_multiple_files
cargo run --example memtable
cargo run --example rewrite_expr
cargo run --example simple_udf
cargo run --example simple_udaf
integration-test:
name: "Compare to postgres"
@@ -119,6 +135,7 @@
env:
POSTGRES_PASSWORD: postgres
POSTGRES_DB: db_test
POSTGRES_INITDB_ARGS: --encoding=UTF-8 --lc-collate=C --lc-ctype=C
ports:
- 5432/tcp
options: >-
@@ -284,44 +301,47 @@ jobs:
echo '' > datafusion/proto/src/generated/datafusion.rs
ci/scripts/rust_fmt.sh
coverage:
name: coverage
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
with:
submodules: true
- name: Install protobuf compiler
shell: bash
run: |
mkdir -p $HOME/d/protoc
cd $HOME/d/protoc
export PROTO_ZIP="protoc-21.4-linux-x86_64.zip"
curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v21.4/$PROTO_ZIP
unzip $PROTO_ZIP
export PATH=$PATH:$HOME/d/protoc/bin
protoc --version
- name: Setup Rust toolchain
run: |
rustup toolchain install stable
rustup default stable
rustup component add rustfmt clippy
- name: Cache Cargo
uses: actions/cache@v3
with:
path: /home/runner/.cargo
# this key is not equal because the user is different than on a container (runner vs github)
key: cargo-coverage-cache3-
- name: Run coverage
run: |
export PATH=$PATH:$HOME/d/protoc/bin
rustup toolchain install stable
rustup default stable
cargo install --version 0.20.1 cargo-tarpaulin
cargo tarpaulin --all --out Xml
- name: Report coverage
continue-on-error: true
run: bash <(curl -s https://codecov.io/bash)
# Coverage job disabled due to
# https://github.com/apache/arrow-datafusion/issues/3678

# coverage:
# name: coverage
# runs-on: ubuntu-latest
# steps:
# - uses: actions/checkout@v3
# with:
# submodules: true
# - name: Install protobuf compiler
# shell: bash
# run: |
# mkdir -p $HOME/d/protoc
# cd $HOME/d/protoc
# export PROTO_ZIP="protoc-21.4-linux-x86_64.zip"
#          curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v21.4/$PROTO_ZIP
# unzip $PROTO_ZIP
# export PATH=$PATH:$HOME/d/protoc/bin
# protoc --version
# - name: Setup Rust toolchain
# run: |
# rustup toolchain install stable
# rustup default stable
# rustup component add rustfmt clippy
# - name: Cache Cargo
# uses: actions/cache@v3
# with:
# path: /home/runner/.cargo
# # this key is not equal because the user is different than on a container (runner vs github)
# key: cargo-coverage-cache3-
# - name: Run coverage
# run: |
# export PATH=$PATH:$HOME/d/protoc/bin
# rustup toolchain install stable
# rustup default stable
# cargo install --version 0.20.1 cargo-tarpaulin
# cargo tarpaulin --all --out Xml
# - name: Report coverage
# continue-on-error: true
# run: bash <(curl -s https://codecov.io/bash)

clippy:
name: clippy
@@ -353,36 +373,6 @@ jobs:
- name: Run clippy
run: ci/scripts/rust_clippy.sh

miri-checks:
name: cargo miri test (amd64)
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
with:
submodules: true
- uses: actions/cache@v3
with:
path: |
~/.cargo/registry
~/.cargo/git
target
key: ${{ runner.os }}-cargo-miri-${{ hashFiles('**/Cargo.lock') }}
- name: Setup Rust toolchain
run: |
rustup toolchain install nightly-2022-01-17
rustup default nightly-2022-01-17
rustup component add rustfmt clippy miri
- name: Run Miri Checks
env:
RUST_BACKTRACE: full
RUST_LOG: "trace"
MIRIFLAGS: "-Zmiri-disable-isolation"
run: |
cargo miri setup
cargo clean
# Ignore MIRI errors until we can get a clean run
cargo miri test || true
# Check answers are correct when hash values collide
hash-collisions:
name: cargo test hash collisions (amd64)
1 change: 0 additions & 1 deletion .gitignore
@@ -85,7 +85,6 @@ cpp/Brewfile.lock.json
target
Cargo.lock
!datafusion-cli/Cargo.lock
!ballista-cli/Cargo.lock

rusty-tags.vi
.history
4 changes: 3 additions & 1 deletion README.md
@@ -84,21 +84,23 @@ Here are some of the projects known to use DataFusion:
- [Cloudfuse Buzz](https://github.com/cloudfuse-io/buzz-rust)
- [CnosDB](https://github.com/cnosdb/cnosdb) Open Source Distributed Time Series Database
- [Cube Store](https://github.com/cube-js/cube.js/tree/master/rust)
- [Dask SQL](https://github.com/dask-contrib/dask-sql) Distributed SQL query engine in Python
- [datafusion-tui](https://github.com/datafusion-contrib/datafusion-tui) Text UI for DataFusion
- [delta-rs](https://github.com/delta-io/delta-rs) Native Rust implementation of Delta Lake
- [Flock](https://github.com/flock-lab/flock)
- [InfluxDB IOx](https://github.com/influxdata/influxdb_iox) Time Series Database
- [Parseable](https://github.com/parseablehq/parseable) Log storage and observability platform
- [qv](https://github.com/timvw/qv) Quickly view your data
- [ROAPI](https://github.com/roapi/roapi)
- [Seafowl](https://github.com/splitgraph/seafowl) CDN-friendly analytical database
- [Tensorbase](https://github.com/tensorbase/tensorbase)
- [VegaFusion](https://vegafusion.io/) Server-side acceleration for the [Vega](https://vega.github.io/) visualization grammar

(if you know of another project, please submit a PR to add a link!)

## Example Usage

Please see [example usage](https://arrow.apache.org/datafusion/user-guide/example-usage.html) to find how to use DataFusion.
Please see the [example usage](https://arrow.apache.org/datafusion/user-guide/example-usage.html) in the user guide and the [datafusion-examples](https://github.com/apache/arrow-datafusion/tree/master/datafusion-examples) crate for more information on how to use DataFusion.
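As a taste of what those examples cover, here is a minimal sketch (API as of this release; the linked guide is authoritative and method names may differ in later versions):

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Register a CSV file as a table, run a SQL query, and print the results.
    let ctx = SessionContext::new();
    ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new())
        .await?;
    let df = ctx
        .sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100")
        .await?;
    df.show().await?;
    Ok(())
}
```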

## Roadmap

5 changes: 4 additions & 1 deletion benchmarks/Cargo.toml
@@ -18,7 +18,7 @@
[package]
name = "datafusion-benchmarks"
description = "DataFusion Benchmarks"
version = "12.0.0"
version = "13.0.0"
edition = "2021"
authors = ["Apache Arrow <dev@arrow.apache.org>"]
homepage = "https://github.com/apache/arrow-datafusion"
@@ -32,11 +32,14 @@ simd = ["datafusion/simd"]
snmalloc = ["snmalloc-rs"]

[dependencies]
arrow = "24.0.0"
datafusion = { path = "../datafusion/core" }
env_logger = "0.9"
futures = "0.3"
mimalloc = { version = "0.1", optional = true, default-features = false }
num_cpus = "1.13.0"
object_store = "0.5.0"
parquet = "24.0.0"
rand = "0.8.4"
serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.78"
34 changes: 34 additions & 0 deletions benchmarks/README.md
@@ -126,3 +126,37 @@ h2o groupby query 1 took 1669 ms
[1]: http://www.tpc.org/tpch/
[2]: https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page
## Parquet filter pushdown benchmarks

This is a set of benchmarks for testing and verifying the performance of Parquet filter pushdown. The queries are executed against a synthetic dataset, generated during the benchmark run, that is designed to simulate web server access logs.

```bash
cargo run --release --bin parquet_filter_pushdown -- --path ./data --scale-factor 1.0
```

This will generate the synthetic dataset at `./data/logs.parquet`. The size of the dataset can be controlled through the `--scale-factor` flag (the default value of `1.0` generates a ~1GB parquet file).
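For example, to generate a larger dataset (assuming roughly linear scaling, ~10GB) with the same flags shown above:

```bash
# Same binary as above; only the scale factor changes.
cargo run --release --bin parquet_filter_pushdown -- --path ./data --scale-factor 10.0
```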
For each filter, the query is run with several different `ParquetScanOptions` settings.
Example run:
```
Running benchmarks with the following options: Opt { debug: false, iterations: 3, partitions: 2, path: "./data", batch_size: 8192, scale_factor: 1.0 }
Generated test dataset with 10699521 rows
Executing with filter 'request_method = Utf8("GET")'
Using scan options ParquetScanOptions { pushdown_filters: false, reorder_predicates: false, enable_page_index: false }
Iteration 0 returned 10699521 rows in 1303 ms
Iteration 1 returned 10699521 rows in 1288 ms
Iteration 2 returned 10699521 rows in 1266 ms
Using scan options ParquetScanOptions { pushdown_filters: true, reorder_predicates: true, enable_page_index: true }
Iteration 0 returned 1781686 rows in 1970 ms
Iteration 1 returned 1781686 rows in 2002 ms
Iteration 2 returned 1781686 rows in 1988 ms
Using scan options ParquetScanOptions { pushdown_filters: true, reorder_predicates: false, enable_page_index: true }
Iteration 0 returned 1781686 rows in 1940 ms
Iteration 1 returned 1781686 rows in 1986 ms
Iteration 2 returned 1781686 rows in 1947 ms
...
```
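The `ParquetScanOptions` values printed above are the knobs being compared. As an illustrative sketch only (the struct below is reconstructed from the output above, not taken from the benchmark source), the three configurations exercised are:

```rust
/// Reconstructed from the benchmark output above; illustrative only.
#[derive(Debug, Clone, Copy)]
struct ParquetScanOptions {
    pushdown_filters: bool,
    reorder_predicates: bool,
    enable_page_index: bool,
}

fn main() {
    // The three configurations from the example run: baseline (no pushdown),
    // full pushdown with predicate reordering, and pushdown without reordering.
    let configs = [
        ParquetScanOptions { pushdown_filters: false, reorder_predicates: false, enable_page_index: false },
        ParquetScanOptions { pushdown_filters: true, reorder_predicates: true, enable_page_index: true },
        ParquetScanOptions { pushdown_filters: true, reorder_predicates: false, enable_page_index: true },
    ];
    for options in configs {
        println!("Using scan options {options:?}");
    }
}
```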
6 changes: 6 additions & 0 deletions benchmarks/expected-plans/q1.txt
@@ -0,0 +1,6 @@
Sort: lineitem.l_returnflag ASC NULLS LAST, lineitem.l_linestatus ASC NULLS LAST
Projection: lineitem.l_returnflag, lineitem.l_linestatus, SUM(lineitem.l_quantity) AS sum_qty, SUM(lineitem.l_extendedprice) AS sum_base_price, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS sum_disc_price, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax) AS sum_charge, AVG(lineitem.l_quantity) AS avg_qty, AVG(lineitem.l_extendedprice) AS avg_price, AVG(lineitem.l_discount) AS avg_disc, COUNT(UInt8(1)) AS count_order
Aggregate: groupBy=[[lineitem.l_returnflag, lineitem.l_linestatus]], aggr=[[SUM(lineitem.l_quantity), SUM(lineitem.l_extendedprice), SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2))CAST(lineitem.l_discount AS Decimal128(23, 2))lineitem.l_discountDecimal128(Some(100),23,2)CAST(lineitem.l_extendedprice AS Decimal128(38, 4))lineitem.l_extendedprice AS lineitem.l_extendedprice * Decimal128(Some(100),23,2) - lineitem.l_discount) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), SUM(CAST(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2))CAST(lineitem.l_discount AS Decimal128(23, 2))lineitem.l_discountDecimal128(Some(100),23,2)CAST(lineitem.l_extendedprice AS Decimal128(38, 4))lineitem.l_extendedprice AS lineitem.l_extendedprice * Decimal128(Some(100),23,2) - lineitem.l_discount AS Decimal128(38, 6)) * CAST(Decimal128(Some(100),23,2) + CAST(lineitem.l_tax AS Decimal128(23, 2)) AS Decimal128(38, 6))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(UInt8(1))]]
Projection: CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4)) AS CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2))CAST(lineitem.l_discount AS Decimal128(23, 2))lineitem.l_discountDecimal128(Some(100),23,2)CAST(lineitem.l_extendedprice AS Decimal128(38, 4))lineitem.l_extendedprice, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_tax, lineitem.l_returnflag, lineitem.l_linestatus
Filter: lineitem.l_shipdate <= Date32("10471")
TableScan: lineitem projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate]
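These expected-plan files are the indented text rendering of DataFusion logical plans. As a rough sketch of how such text can be produced (method names as of roughly this release; `to_logical_plan` was renamed in later versions, and the CSV registration here is an assumption for illustration):

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    // The benchmark harness loads the TPC-H tables its own way;
    // registering `lineitem` from CSV is just for illustration.
    ctx.register_csv("lineitem", "data/lineitem.csv", CsvReadOptions::new())
        .await?;
    let df = ctx
        .sql("SELECT l_returnflag FROM lineitem ORDER BY l_returnflag")
        .await?;
    // `display_indent` prints the plan in the same indented form
    // used by these expected-plans files.
    println!("{}", df.to_logical_plan()?.display_indent());
    Ok(())
}
```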
12 changes: 12 additions & 0 deletions benchmarks/expected-plans/q10.txt
@@ -0,0 +1,12 @@
Sort: revenue DESC NULLS FIRST
Projection: customer.c_custkey, customer.c_name, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS revenue, customer.c_acctbal, nation.n_name, customer.c_address, customer.c_phone, customer.c_comment
Aggregate: groupBy=[[customer.c_custkey, customer.c_name, customer.c_acctbal, customer.c_phone, nation.n_name, customer.c_address, customer.c_comment]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]]
Inner Join: customer.c_nationkey = nation.n_nationkey
Inner Join: orders.o_orderkey = lineitem.l_orderkey
Inner Join: customer.c_custkey = orders.o_custkey
TableScan: customer projection=[c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_comment]
Filter: orders.o_orderdate >= Date32("8674") AND orders.o_orderdate < Date32("8766")
TableScan: orders projection=[o_orderkey, o_custkey, o_orderdate]
Filter: lineitem.l_returnflag = Utf8("R")
TableScan: lineitem projection=[l_orderkey, l_extendedprice, l_discount, l_returnflag]
TableScan: nation projection=[n_nationkey, n_name]
19 changes: 19 additions & 0 deletions benchmarks/expected-plans/q11.txt
@@ -0,0 +1,19 @@
Sort: value DESC NULLS FIRST
Projection: partsupp.ps_partkey, SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS value
Filter: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Decimal128(38, 17)) > __sq_1.__value
CrossJoin:
Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[SUM(CAST(partsupp.ps_supplycost AS Decimal128(26, 2)) * CAST(partsupp.ps_availqty AS Decimal128(26, 2)))]]
Inner Join: supplier.s_nationkey = nation.n_nationkey
Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost]
TableScan: supplier projection=[s_suppkey, s_nationkey]
Filter: nation.n_name = Utf8("GERMANY")
TableScan: nation projection=[n_nationkey, n_name]
Projection: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Decimal128(38, 17)) * Decimal128(Some(10000000000000),38,17) AS __value, alias=__sq_1
Aggregate: groupBy=[[]], aggr=[[SUM(CAST(partsupp.ps_supplycost AS Decimal128(26, 2)) * CAST(partsupp.ps_availqty AS Decimal128(26, 2)))]]
Inner Join: supplier.s_nationkey = nation.n_nationkey
Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost]
TableScan: supplier projection=[s_suppkey, s_nationkey]
Filter: nation.n_name = Utf8("GERMANY")
TableScan: nation projection=[n_nationkey, n_name]
7 changes: 7 additions & 0 deletions benchmarks/expected-plans/q12.txt
@@ -0,0 +1,7 @@
Sort: lineitem.l_shipmode ASC NULLS LAST
Projection: lineitem.l_shipmode, SUM(CASE WHEN orders.o_orderpriority = Utf8("1-URGENT") OR orders.o_orderpriority = Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END) AS high_line_count, SUM(CASE WHEN orders.o_orderpriority != Utf8("1-URGENT") AND orders.o_orderpriority != Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END) AS low_line_count
Aggregate: groupBy=[[lineitem.l_shipmode]], aggr=[[SUM(CASE WHEN orders.o_orderpriority = Utf8("1-URGENT") OR orders.o_orderpriority = Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), SUM(CASE WHEN orders.o_orderpriority != Utf8("1-URGENT") AND orders.o_orderpriority != Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)]]
Inner Join: lineitem.l_orderkey = orders.o_orderkey
Filter: lineitem.l_shipmode IN ([Utf8("MAIL"), Utf8("SHIP")]) AND lineitem.l_commitdate < lineitem.l_receiptdate AND lineitem.l_shipdate < lineitem.l_commitdate AND lineitem.l_receiptdate >= Date32("8766") AND lineitem.l_receiptdate < Date32("9131")
TableScan: lineitem projection=[l_orderkey, l_shipdate, l_commitdate, l_receiptdate, l_shipmode]
TableScan: orders projection=[o_orderkey, o_orderpriority]
11 changes: 11 additions & 0 deletions benchmarks/expected-plans/q13.txt
@@ -0,0 +1,11 @@
Sort: custdist DESC NULLS FIRST, c_orders.c_count DESC NULLS FIRST
Projection: c_orders.c_count, COUNT(UInt8(1)) AS custdist
Aggregate: groupBy=[[c_orders.c_count]], aggr=[[COUNT(UInt8(1))]]
Projection: c_orders.COUNT(orders.o_orderkey) AS c_count, alias=c_orders
Projection: c_orders.COUNT(orders.o_orderkey), alias=c_orders
Projection: COUNT(orders.o_orderkey), alias=c_orders
Aggregate: groupBy=[[customer.c_custkey]], aggr=[[COUNT(orders.o_orderkey)]]
Left Join: customer.c_custkey = orders.o_custkey
TableScan: customer projection=[c_custkey]
Filter: orders.o_comment NOT LIKE Utf8("%special%requests%")
TableScan: orders projection=[o_orderkey, o_custkey, o_comment]
