Ballista: Implement map-side shuffle #543

Merged 5 commits on Jun 26, 2021
1 change: 1 addition & 0 deletions ballista/rust/core/Cargo.toml
@@ -30,6 +30,7 @@ build = "build.rs"
simd = ["datafusion/simd"]

[dependencies]
ahash = "0.7"
async-trait = "0.1.36"
futures = "0.3"
log = "0.4"
265 changes: 243 additions & 22 deletions ballista/rust/core/src/execution_plans/query_stage.rs
@@ -20,22 +20,32 @@
//! a query stage either forms the input of another query stage or can be the final result of
//! a query.

use std::iter::Iterator;
use std::path::PathBuf;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
use std::time::Instant;
use std::{any::Any, pin::Pin};

use crate::error::BallistaError;
use crate::memory_stream::MemoryStream;
use crate::utils;

use crate::serde::scheduler::PartitionStats;
use async_trait::async_trait;
-use datafusion::arrow::array::{ArrayRef, StringBuilder};
+use datafusion::arrow::array::{
+    Array, ArrayBuilder, ArrayRef, StringBuilder, StructBuilder, UInt32Builder,
+    UInt64Builder,
+};
use datafusion::arrow::compute::take;
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::arrow::ipc::writer::FileWriter;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::{DataFusionError, Result};
use datafusion::physical_plan::hash_join::create_hashes;
use datafusion::physical_plan::{ExecutionPlan, Partitioning, RecordBatchStream};
use futures::StreamExt;
use log::info;
use std::fs::File;
use uuid::Uuid;

/// QueryStageExec represents a section of a query plan that has consistent partitioning and
@@ -133,7 +143,6 @@ impl ExecutionPlan for QueryStageExec {
None => {
path.push(&format!("{}", partition));
std::fs::create_dir_all(&path)?;

path.push("data.arrow");
let path = path.to_str().unwrap();
info!("Writing results to {}", path);
@@ -150,32 +159,150 @@
stats
);

-let schema = Arc::new(Schema::new(vec![
-    Field::new("path", DataType::Utf8, false),
-    stats.arrow_struct_repr(),
-]));
+let schema = result_schema();

// build result set with summary of the partition execution status
-let mut c0 = StringBuilder::new(1);
-c0.append_value(&path).unwrap();
-let path: ArrayRef = Arc::new(c0.finish());
+let mut part_builder = UInt32Builder::new(1);
+part_builder.append_value(partition as u32)?;
+let part: ArrayRef = Arc::new(part_builder.finish());
+
+let mut path_builder = StringBuilder::new(1);
+path_builder.append_value(&path)?;
+let path: ArrayRef = Arc::new(path_builder.finish());

let stats: ArrayRef = stats
.to_arrow_arrayref()
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
-let batch = RecordBatch::try_new(schema.clone(), vec![path, stats])
+let batch = RecordBatch::try_new(schema.clone(), vec![part, path, stats])
.map_err(DataFusionError::ArrowError)?;

Ok(Box::pin(MemoryStream::try_new(vec![batch], schema, None)?))
}

-Some(Partitioning::Hash(_, _)) => {
-    //TODO re-use code from RepartitionExec to split each batch into
-    // partitions and write to one IPC file per partition
-    // See https://github.com/apache/arrow-datafusion/issues/456
-    Err(DataFusionError::NotImplemented(
-        "Shuffle partitioning not implemented yet".to_owned(),
-    ))
+Some(Partitioning::Hash(exprs, n)) => {
Comment from @edrevo (Contributor), Jun 14, 2021:

just thinking out loud without any data to back me up, but maybe it is worth special-casing when n==1, so we don't actually perform the hash of everything, since all of the data is going to end up in the same partition anyway.

Reply from the author (Member):

That makes sense. I filed https://github.com/apache/arrow-datafusion/issues/626 for this. I'd like to get the basic end-to-end shuffle mechanism working before we start optimizing too much.
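
For reference, a minimal sketch of what that fast path might look like, assuming the ShuffleWriter introduced later in this file; this is a hypothetical illustration of issue #626, not code from this PR:

```rust
use std::pin::Pin;
use datafusion::error::Result;
use datafusion::physical_plan::RecordBatchStream;
use futures::StreamExt;

// Hypothetical fast path for Hash(_, 1): every row lands in partition 0,
// so batches can be written straight through without hashing anything.
async fn write_single_partition(
    mut stream: Pin<Box<dyn RecordBatchStream + Send>>,
    path: &str,
) -> Result<()> {
    let mut writer = ShuffleWriter::new(path, stream.schema().as_ref())?;
    while let Some(batch) = stream.next().await {
        writer.write(&batch?)?;
    }
    writer.finish()
}
```

With only one bucket, the modulo in the general path always yields 0, so the hashing is pure overhead.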

let num_output_partitions = *n;

// we won't necessarily produce output for every possible partition, so we
// create writers on demand
let mut writers: Vec<Option<Arc<Mutex<ShuffleWriter>>>> = vec![];
Comment from a reviewer (Contributor):

Looks like Arc + Mutex is unnecessary if you use .iter_mut() when necessary

Reply from the author (Member):

I tried changing this but ran into ownership issues. I'll go ahead and merge and perhaps someone can help me with fixing this as a follow up PR.
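
A sketch of the reviewer's suggestion, with hypothetical variable names (`file_path` stands in for the per-partition path built below); owning the writers directly avoids the lock, at the cost of threading the mutable borrows carefully:

```rust
// Hypothetical single-owner variant: the Vec owns each writer directly.
let mut writers: Vec<Option<ShuffleWriter>> =
    (0..num_output_partitions).map(|_| None).collect();

// Inside the per-batch loop, create the writer on first use, then borrow
// it mutably just long enough to write one batch.
if writers[num_output_partition].is_none() {
    writers[num_output_partition] =
        Some(ShuffleWriter::new(&file_path, stream.schema().as_ref())?);
}
writers[num_output_partition]
    .as_mut()
    .unwrap()
    .write(&output_batch)?;

// When building the metadata batch, iter_mut() gives mutable access
// without any locking.
for (i, w) in writers.iter_mut().enumerate() {
    if let Some(w) = w {
        w.finish()?;
        // append partition i's stats to the builders here
    }
}
```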

for _ in 0..num_output_partitions {
writers.push(None);
}

let hashes_buf = &mut vec![];
let random_state = ahash::RandomState::with_seeds(0, 0, 0, 0);
while let Some(result) = stream.next().await {
let input_batch = result?;
let arrays = exprs
.iter()
.map(|expr| {
Ok(expr
.evaluate(&input_batch)?
.into_array(input_batch.num_rows()))
})
.collect::<Result<Vec<_>>>()?;
hashes_buf.clear();
Comment from a reviewer (Contributor):

Maybe we could reuse the code better at some moment?

hashes_buf.resize(arrays[0].len(), 0);
Comment from a reviewer (Contributor):

noob question: is there a guarantee that all recordbatches have at least one element?

Reply from a reviewer (Contributor):

There needs to be at least one column based on the expressions in hash repartitioning - which I think should be a prerequisite when doing hash repartitioning - I am not sure whether DataFusion checks on that explicitly when constructing it.
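
To make the assumption the reviewers discuss explicit, a hypothetical guard could precede the resize call; the merged code instead relies on the planner supplying at least one hash expression:

```rust
// Hypothetical guard: Partitioning::Hash with an empty expression list
// would make arrays[0] panic, so surface an error instead.
if arrays.is_empty() {
    return Err(DataFusionError::Execution(
        "hash partitioning requires at least one expression".to_owned(),
    ));
}
hashes_buf.resize(arrays[0].len(), 0);
```

Note that a batch with zero rows is harmless here: the buffer is resized to zero and the per-row loop does nothing; the panic risk is an empty expression list, not an empty batch.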

// Hash arrays and compute buckets based on number of partitions
let hashes = create_hashes(&arrays, &random_state, hashes_buf)?;
let mut indices = vec![vec![]; num_output_partitions];
for (index, hash) in hashes.iter().enumerate() {
indices[(*hash % num_output_partitions as u64) as usize]
.push(index as u64)
}
for (num_output_partition, partition_indices) in
indices.into_iter().enumerate()
{
let indices = partition_indices.into();
// Produce batches based on indices
let columns = input_batch
.columns()
.iter()
.map(|c| {
take(c.as_ref(), &indices, None).map_err(|e| {
DataFusionError::Execution(e.to_string())
})
})
.collect::<Result<Vec<Arc<dyn Array>>>>()?;

let output_batch =
RecordBatch::try_new(input_batch.schema(), columns)?;

// write batch out
match &writers[num_output_partition] {
Some(w) => {
let mut w = w.lock().unwrap();
w.write(&output_batch)?;
}
None => {
let mut path = path.clone();
path.push(&format!("{}", partition));
std::fs::create_dir_all(&path)?;

path.push("data.arrow");
let path = path.to_str().unwrap();
info!("Writing results to {}", path);

let mut writer =
ShuffleWriter::new(path, stream.schema().as_ref())?;

writer.write(&output_batch)?;
writers[num_output_partition] =
Some(Arc::new(Mutex::new(writer)));
}
}
}
}

// build metadata result batch
let num_writers = writers.iter().filter(|w| w.is_some()).count();
let mut partition_builder = UInt32Builder::new(num_writers);
let mut path_builder = StringBuilder::new(num_writers);
let mut num_rows_builder = UInt64Builder::new(num_writers);
let mut num_batches_builder = UInt64Builder::new(num_writers);
let mut num_bytes_builder = UInt64Builder::new(num_writers);

for (i, w) in writers.iter().enumerate() {
match w {
Some(w) => {
let mut w = w.lock().unwrap();
w.finish()?;
path_builder.append_value(w.path())?;
partition_builder.append_value(i as u32)?;
num_rows_builder.append_value(w.num_rows)?;
num_batches_builder.append_value(w.num_batches)?;
num_bytes_builder.append_value(w.num_bytes)?;
}
None => {}
}
}

// build arrays
let partition_num: ArrayRef = Arc::new(partition_builder.finish());
let path: ArrayRef = Arc::new(path_builder.finish());
let field_builders: Vec<Box<dyn ArrayBuilder>> = vec![
Box::new(num_rows_builder),
Box::new(num_batches_builder),
Box::new(num_bytes_builder),
];
let mut stats_builder = StructBuilder::new(
PartitionStats::default().arrow_struct_fields(),
field_builders,
);
for _ in 0..num_writers {
stats_builder.append(true)?;
}
let stats = Arc::new(stats_builder.finish());

// build result batch containing metadata
let schema = result_schema();
let batch = RecordBatch::try_new(
schema.clone(),
vec![partition_num, path, stats],
)
.map_err(DataFusionError::ArrowError)?;

Ok(Box::pin(MemoryStream::try_new(vec![batch], schema, None)?))
}

_ => Err(DataFusionError::Execution(
@@ -185,10 +312,69 @@ impl ExecutionPlan for QueryStageExec {
}
}

fn result_schema() -> SchemaRef {
let stats = PartitionStats::default();
Arc::new(Schema::new(vec![
Field::new("partition", DataType::UInt32, false),
Field::new("path", DataType::Utf8, false),
stats.arrow_struct_repr(),
]))
}

struct ShuffleWriter {
path: String,
writer: FileWriter<File>,
num_batches: u64,
num_rows: u64,
num_bytes: u64,
}

impl ShuffleWriter {
fn new(path: &str, schema: &Schema) -> Result<Self> {
let file = File::create(path)
.map_err(|e| {
BallistaError::General(format!(
"Failed to create partition file at {}: {:?}",
path, e
))
})
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
Ok(Self {
num_batches: 0,
num_rows: 0,
num_bytes: 0,
path: path.to_owned(),
writer: FileWriter::try_new(file, schema)?,
})
}

fn write(&mut self, batch: &RecordBatch) -> Result<()> {
self.writer.write(batch)?;
self.num_batches += 1;
self.num_rows += batch.num_rows() as u64;
let num_bytes: usize = batch
.columns()
.iter()
.map(|array| array.get_array_memory_size())
.sum();
self.num_bytes += num_bytes as u64;
Ok(())
}

fn finish(&mut self) -> Result<()> {
self.writer.finish().map_err(DataFusionError::ArrowError)
}

fn path(&self) -> &str {
&self.path
}
}
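
For context, an illustrative use of ShuffleWriter from within this module (the path and data are made up):

```rust
// Illustrative only: stream a few batches into one IPC file and log the
// stats that ShuffleWriter accumulates along the way.
fn write_demo(batches: &[RecordBatch], schema: &Schema) -> Result<()> {
    let mut writer = ShuffleWriter::new("/tmp/job-1/0/data.arrow", schema)?;
    for batch in batches {
        writer.write(batch)?;
    }
    writer.finish()?;
    info!(
        "wrote {} rows in {} batches ({} bytes) to {}",
        writer.num_rows, writer.num_batches, writer.num_bytes, writer.path()
    );
    Ok(())
}
```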

#[cfg(test)]
mod tests {
use super::*;
use datafusion::arrow::array::{StringArray, StructArray, UInt32Array, UInt64Array};
use datafusion::physical_plan::expressions::Column;
use datafusion::physical_plan::memory::MemoryExec;
use tempfile::TempDir;

@@ -207,17 +393,17 @@
let batches = utils::collect_stream(&mut stream)
.await
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
-assert!(batches.len() == 1);
+assert_eq!(1, batches.len());
let batch = &batches[0];
-assert_eq!(2, batch.num_columns());
+assert_eq!(3, batch.num_columns());
assert_eq!(1, batch.num_rows());
-let path = batch.columns()[0]
+let path = batch.columns()[1]
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let file = path.value(0);
assert!(file.ends_with("data.arrow"));
-let stats = batch.columns()[1]
+let stats = batch.columns()[2]
.as_any()
.downcast_ref::<StructArray>()
.unwrap();
@@ -231,6 +417,41 @@
Ok(())
}

#[tokio::test]
async fn test_partitioned() -> Result<()> {
let input_plan = create_input_plan()?;
let work_dir = TempDir::new()?;
let query_stage = QueryStageExec::try_new(
"jobOne".to_owned(),
1,
input_plan,
work_dir.into_path().to_str().unwrap().to_owned(),
Some(Partitioning::Hash(vec![Arc::new(Column::new("a"))], 2)),
)?;
let mut stream = query_stage.execute(0).await?;
let batches = utils::collect_stream(&mut stream)
.await
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
assert_eq!(1, batches.len());
let batch = &batches[0];
assert_eq!(3, batch.num_columns());
assert_eq!(2, batch.num_rows());
let stats = batch.columns()[2]
.as_any()
.downcast_ref::<StructArray>()
.unwrap();
let num_rows = stats
.column_by_name("num_rows")
.unwrap()
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();
assert_eq!(2, num_rows.value(0));
assert_eq!(2, num_rows.value(1));

Ok(())
}

fn create_input_plan() -> Result<Arc<dyn ExecutionPlan>> {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::UInt32, true),
3 changes: 2 additions & 1 deletion ballista/rust/core/src/serde/scheduler/mod.rs
@@ -134,7 +134,8 @@ impl PartitionStats {
false,
)
}
-fn arrow_struct_fields(self) -> Vec<Field> {
+
+pub fn arrow_struct_fields(self) -> Vec<Field> {
vec![
Field::new("num_rows", DataType::UInt64, false),
Field::new("num_batches", DataType::UInt64, false),
3 changes: 2 additions & 1 deletion datafusion/src/physical_plan/mod.rs
@@ -390,7 +390,8 @@ impl ColumnarValue {
}
}

-fn into_array(self, num_rows: usize) -> ArrayRef {
+/// Convert a columnar value into an ArrayRef
+pub fn into_array(self, num_rows: usize) -> ArrayRef {
match self {
ColumnarValue::Array(array) => array,
ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
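
Making into_array public is what lets the Ballista shuffle code above materialize each hash expression into a column. A minimal usage sketch, assuming the datafusion crate of this era:

```rust
use datafusion::physical_plan::ColumnarValue;
use datafusion::scalar::ScalarValue;

fn main() {
    // A scalar is expanded to a column of the requested length; an Array
    // value would be returned unchanged.
    let value = ColumnarValue::Scalar(ScalarValue::UInt32(Some(7)));
    let array = value.into_array(4);
    assert_eq!(4, array.len());
}
```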