Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

doc: update ballista client front page #1171

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 16 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,25 @@ use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
// create DataFusion SessionContext with ballista standalone cluster started
let ctx = SessionContext::standalone();
// create SessionContext with ballista support
// standalone context will start all required
// ballista infrastructure in the background as well
let ctx = SessionContext::standalone().await?;

// register the table
ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;
// everything else remains the same

// create a plan to run a SQL query
let df = ctx.sql("SELECT a, MIN(b) FROM example WHERE a <= b GROUP BY a LIMIT 100").await?;
// register the table
ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new())
.await?;

// execute and print results
df.show().await?;
Ok(())
// create a plan to run a SQL query
let df = ctx
.sql("SELECT a, MIN(b) FROM example WHERE a <= b GROUP BY a LIMIT 100")
.await?;

// execute and print results
df.show().await?;
Ok(())
}
```

Expand Down
130 changes: 84 additions & 46 deletions ballista/client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,34 +19,69 @@

# Ballista: Distributed Scheduler for Apache Arrow DataFusion

Ballista is a distributed compute platform primarily implemented in Rust, and powered by Apache Arrow and
DataFusion. It is built on an architecture that allows other programming languages (such as Python, C++, and
Java) to be supported as first-class citizens without paying a penalty for serialization costs.
Ballista is a distributed compute platform primarily implemented in Rust, and powered by Apache Arrow and DataFusion. It is built on an architecture that allows other programming languages (such as Python, C++, and Java) to be supported as first-class citizens without paying a penalty for serialization costs.

The foundational technologies in Ballista are:
![logo](https://github.com/apache/datafusion-ballista/blob/main/docs/source/_static/images/ballista-logo.png?raw=true)

- [Apache Arrow](https://arrow.apache.org/) memory model and compute kernels for efficient processing of data.
- [Apache Arrow Flight Protocol](https://arrow.apache.org/blog/2019/10/13/introducing-arrow-flight/) for efficient
data transfer between processes.
- [Google Protocol Buffers](https://developers.google.com/protocol-buffers) for serializing query plans.
- [Docker](https://www.docker.com/) for packaging up executors along with user-defined code.
Ballista is a distributed query execution engine that enhances [Apache DataFusion](https://github.com/apache/datafusion) by enabling the parallelized execution of workloads across multiple nodes in a distributed environment.

Ballista can be deployed as a standalone cluster and also supports [Kubernetes](https://kubernetes.io/). In either
case, the scheduler can be configured to use [etcd](https://etcd.io/) as a backing store to (eventually) provide
redundancy in the case of a scheduler failing.
Existing DataFusion application:

## Rust Version Compatibility
```rust,no_run
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
// datafusion context
let ctx = SessionContext::new();

This crate is tested with the latest stable version of Rust. We do not currrently test against other, older versions of the Rust compiler.
// register the table
ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;

// create a plan to run a SQL query
let df = ctx.sql("SELECT a, MIN(b) FROM example WHERE a <= b GROUP BY a LIMIT 100").await?;

// execute and print results
df.show().await?;
Ok(())
}
```

can be distributed with few lines changed:

```rust,no_run
use ballista::prelude::*;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
// create SessionContext with ballista support
// standalone context will start all required
// ballista infrastructure in the background as well
let ctx = SessionContext::standalone().await?;

// everything else remains the same

// register the table
ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new())
.await?;

// create a plan to run a SQL query
let df = ctx
.sql("SELECT a, MIN(b) FROM example WHERE a <= b GROUP BY a LIMIT 100")
.await?;

// execute and print results
df.show().await?;
Ok(())
}
```

## Starting a cluster

There are numerous ways to start a Ballista cluster, including support for Docker and
Kubernetes. For full documentation, refer to the deployment section of the
[Ballista User Guide](https://datafusion.apache.org/ballista/user-guide/deployment/)
![architecture](https://github.com/apache/datafusion-ballista/blob/main/docs/source/contributors-guide/ballista_architecture.excalidraw.svg?raw=true)

A simple way to start a local cluster for testing purposes is to use cargo to install
the scheduler and executor crates.
A simple way to start a local cluster for testing purposes is to use cargo to install the scheduler and executor crates.

```bash
cargo install --locked ballista-scheduler
Expand All @@ -61,35 +96,27 @@ RUST_LOG=info ballista-scheduler

The scheduler will bind to port `50050` by default.

Next, start an executor processes in a new terminal session with the specified concurrency
level.
Next, start an executor processes in a new terminal session with the specified concurrency level.

```bash
RUST_LOG=info ballista-executor -c 4
```

The executor will bind to port `50051` by default. Additional executors can be started by
manually specifying a bind port. For example:
The executor will bind to port `50051` by default. Additional executors can be started by manually specifying a bind port.

```bash
RUST_LOG=info ballista-executor --bind-port 50052 -c 4
```
For full documentation, refer to the deployment section of the
[Ballista User Guide](https://datafusion.apache.org/ballista/user-guide/deployment/)

## Executing a query
## Executing a Query

Ballista provides a `BallistaContext` as a starting point for creating queries. DataFrames can be created
by invoking the `read_csv`, `read_parquet`, and `sql` methods.
Ballista provides a custom `SessionContext` as a starting point for creating queries. DataFrames can be created by invoking the `read_csv`, `read_parquet`, and `sql` methods.

To build a simple ballista example, run the following command to add the dependencies to your `Cargo.toml` file:

```bash
cargo add ballista datafusion tokio
```

The following example runs a simple aggregate SQL query against a Parquet file (`yellow_tripdata_2022-01.parquet`) from the
[New York Taxi and Limousine Commission](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page)
data set. Download the file and add it to the `testdata` folder before running the example.

```rust,no_run
use ballista::prelude::*;
use datafusion::common::Result;
Expand All @@ -99,7 +126,6 @@ use datafusion::functions_aggregate::{min_max::min, min_max::max, sum::sum, aver
#[tokio::main]
async fn main() -> Result<()> {


// connect to Ballista scheduler
let ctx = SessionContext::remote("df://localhost:50050").await?;

Expand All @@ -121,13 +147,6 @@ async fn main() -> Result<()> {
)?
.sort(vec![col("passenger_count").sort(true, true)])?;

// this is equivalent to the following SQL
// SELECT passenger_count, MIN(fare_amount), MAX(fare_amount), AVG(fare_amount), SUM(fare_amount)
// FROM tripdata
// GROUP BY passenger_count
// ORDER BY passenger_count

// print the results
df.show().await?;

Ok(())
Expand All @@ -146,12 +165,31 @@ The output should look similar to the following table.
| 2 | -250 | 640.5 | 13.79501011585127 | 4732047.139999998 |
| 3 | -130 | 480 | 13.473184817311106 | 1139427.2400000002 |
| 4 | -250 | 464 | 14.232650547832726 | 502711.4499999997 |
| 5 | -52 | 668 | 12.160378472086954 | 624289.51 |
| 6 | -52 | 252.5 | 12.576583325529857 | 402916 |
| 7 | 7 | 79 | 61.77777777777778 | 556 |
| 8 | 8.3 | 115 | 79.9125 | 639.3 |
| 9 | 9.3 | 96.5 | 65.26666666666667 | 195.8 |
+-----------------+--------------------------+--------------------------+--------------------------+--------------------------+
```

More [examples](../../examples/examples/) can be found in the arrow-ballista repository.

## Performance

We run some simple benchmarks comparing Ballista with Apache Spark to track progress with performance optimizations.

These are benchmarks derived from TPC-H and not official TPC-H benchmarks. These results are from running individual queries at scale factor 100 (100 GB) on a single node with a single executor and 8 concurrent tasks.

### Overall Speedup

The overall speedup is 2.9x

![benchmarks](https://github.com/apache/datafusion-ballista/blob/main/docs/source/_static/images/tpch_allqueries.png?raw=true)

### Per Query Comparison

![benchmarks](https://github.com/apache/datafusion-ballista/blob/main/docs/source/_static/images/tpch_queries_compare.png?raw=true)

### Relative Speedup

![benchmarks](https://github.com/apache/datafusion-ballista/blob/main/docs/source/_static/images/tpch_queries_speedup_rel.png?raw=true)

### Absolute Speedup

![benchmarks](https://github.com/apache/datafusion-ballista/blob/main/docs/source/_static/images/tpch_queries_speedup_abs.png?raw=true)
Loading