diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml
index 1f23a2a42e2a..bedc0973e6ad 100644
--- a/ballista/rust/core/Cargo.toml
+++ b/ballista/rust/core/Cargo.toml
@@ -30,6 +30,7 @@ build = "build.rs"
 simd = ["datafusion/simd"]
 
 [dependencies]
+ahash = "0.7"
 async-trait = "0.1.36"
 futures = "0.3"
 log = "0.4"
diff --git a/ballista/rust/core/src/execution_plans/query_stage.rs b/ballista/rust/core/src/execution_plans/query_stage.rs
index 233dee5b9b52..ef415eeb4fcf 100644
--- a/ballista/rust/core/src/execution_plans/query_stage.rs
+++ b/ballista/rust/core/src/execution_plans/query_stage.rs
@@ -20,8 +20,9 @@
 //! a query stage either forms the input of another query stage or can be the final result of
 //! a query.
 
+use std::iter::Iterator;
 use std::path::PathBuf;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 use std::time::Instant;
 use std::{any::Any, pin::Pin};
 
@@ -29,13 +30,22 @@ use crate::error::BallistaError;
 use crate::memory_stream::MemoryStream;
 use crate::utils;
+use crate::serde::scheduler::PartitionStats;
 
 use async_trait::async_trait;
-use datafusion::arrow::array::{ArrayRef, StringBuilder};
+use datafusion::arrow::array::{
+    Array, ArrayBuilder, ArrayRef, StringBuilder, StructBuilder, UInt32Builder,
+    UInt64Builder,
+};
+use datafusion::arrow::compute::take;
 use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion::arrow::ipc::writer::FileWriter;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::error::{DataFusionError, Result};
+use datafusion::physical_plan::hash_join::create_hashes;
 use datafusion::physical_plan::{ExecutionPlan, Partitioning, RecordBatchStream};
+use futures::StreamExt;
 use log::info;
+use std::fs::File;
 use uuid::Uuid;
 
 /// QueryStageExec represents a section of a query plan that has consistent partitioning and
@@ -133,7 +143,6 @@ impl ExecutionPlan for QueryStageExec {
             None => {
                 path.push(&format!("{}", partition));
                 std::fs::create_dir_all(&path)?;
 
-                path.push("data.arrow");
                 let path = path.to_str().unwrap();
                 info!("Writing results to {}", path);
@@ -150,32 +159,150 @@ impl ExecutionPlan for QueryStageExec {
                     stats
                 );
 
-                let schema = Arc::new(Schema::new(vec![
-                    Field::new("path", DataType::Utf8, false),
-                    stats.arrow_struct_repr(),
-                ]));
+                let schema = result_schema();
 
                 // build result set with summary of the partition execution status
-                let mut c0 = StringBuilder::new(1);
-                c0.append_value(&path).unwrap();
-                let path: ArrayRef = Arc::new(c0.finish());
+                let mut part_builder = UInt32Builder::new(1);
+                part_builder.append_value(partition as u32)?;
+                let part: ArrayRef = Arc::new(part_builder.finish());
+
+                let mut path_builder = StringBuilder::new(1);
+                path_builder.append_value(&path)?;
+                let path: ArrayRef = Arc::new(path_builder.finish());
 
                 let stats: ArrayRef = stats
                     .to_arrow_arrayref()
                     .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
-                let batch = RecordBatch::try_new(schema.clone(), vec![path, stats])
+                let batch = RecordBatch::try_new(schema.clone(), vec![part, path, stats])
                     .map_err(DataFusionError::ArrowError)?;
 
                 Ok(Box::pin(MemoryStream::try_new(vec![batch], schema, None)?))
             }
-            Some(Partitioning::Hash(_, _)) => {
-                //TODO re-use code from RepartitionExec to split each batch into
-                // partitions and write to one IPC file per partition
-                // See https://github.com/apache/arrow-datafusion/issues/456
-                Err(DataFusionError::NotImplemented(
-                    "Shuffle partitioning not implemented yet".to_owned(),
-                ))
+            Some(Partitioning::Hash(exprs, n)) => {
+                let num_output_partitions = *n;
+
+                // we won't necessarily produce output for every possible partition, so we
+                // create writers on demand
+                let mut writers: Vec<Option<Arc<Mutex<ShuffleWriter>>>> = vec![];
+                for _ in 0..num_output_partitions {
+                    writers.push(None);
+                }
+
+                let hashes_buf = &mut vec![];
+                let random_state = ahash::RandomState::with_seeds(0, 0, 0, 0);
+                while let Some(result) = stream.next().await {
+                    let input_batch = result?;
+                    let arrays = exprs
+                        .iter()
+                        .map(|expr| {
+                            Ok(expr
+                                .evaluate(&input_batch)?
+                                .into_array(input_batch.num_rows()))
+                        })
+                        .collect::<Result<Vec<_>>>()?;
+                    hashes_buf.clear();
+                    hashes_buf.resize(arrays[0].len(), 0);
+                    // Hash arrays and compute buckets based on number of partitions
+                    let hashes = create_hashes(&arrays, &random_state, hashes_buf)?;
+                    let mut indices = vec![vec![]; num_output_partitions];
+                    for (index, hash) in hashes.iter().enumerate() {
+                        indices[(*hash % num_output_partitions as u64) as usize]
+                            .push(index as u64)
+                    }
+                    for (num_output_partition, partition_indices) in
+                        indices.into_iter().enumerate()
+                    {
+                        let indices = partition_indices.into();
+                        // Produce batches based on indices
+                        let columns = input_batch
+                            .columns()
+                            .iter()
+                            .map(|c| {
+                                take(c.as_ref(), &indices, None).map_err(|e| {
+                                    DataFusionError::Execution(e.to_string())
+                                })
+                            })
+                            .collect::<Result<Vec<ArrayRef>>>()?;
+
+                        let output_batch =
+                            RecordBatch::try_new(input_batch.schema(), columns)?;
+
+                        // write batch out
+                        match &writers[num_output_partition] {
+                            Some(w) => {
+                                let mut w = w.lock().unwrap();
+                                w.write(&output_batch)?;
+                            }
+                            None => {
+                                let mut path = path.clone();
+                                path.push(&format!("{}", partition));
+                                std::fs::create_dir_all(&path)?;
+
+                                path.push("data.arrow");
+                                let path = path.to_str().unwrap();
+                                info!("Writing results to {}", path);
+
+                                let mut writer =
+                                    ShuffleWriter::new(path, stream.schema().as_ref())?;
+
+                                writer.write(&output_batch)?;
+                                writers[num_output_partition] =
+                                    Some(Arc::new(Mutex::new(writer)));
+                            }
+                        }
+                    }
+                }
+
+                // build metadata result batch
+                let num_writers = writers.iter().filter(|w| w.is_some()).count();
+                let mut partition_builder = UInt32Builder::new(num_writers);
+                let mut path_builder = StringBuilder::new(num_writers);
+                let mut num_rows_builder = UInt64Builder::new(num_writers);
+                let mut num_batches_builder = UInt64Builder::new(num_writers);
+                let mut num_bytes_builder = UInt64Builder::new(num_writers);
+
+                for (i, w) in writers.iter().enumerate() {
+                    match w {
+                        Some(w) => {
+                            let mut w = w.lock().unwrap();
+                            w.finish()?;
+                            path_builder.append_value(w.path())?;
+                            partition_builder.append_value(i as u32)?;
+                            num_rows_builder.append_value(w.num_rows)?;
+                            num_batches_builder.append_value(w.num_batches)?;
+                            num_bytes_builder.append_value(w.num_bytes)?;
+                        }
+                        None => {}
+                    }
+                }
+
+                // build arrays
+                let partition_num: ArrayRef = Arc::new(partition_builder.finish());
+                let path: ArrayRef = Arc::new(path_builder.finish());
+                let field_builders: Vec<Box<dyn ArrayBuilder>> = vec![
+                    Box::new(num_rows_builder),
+                    Box::new(num_batches_builder),
+                    Box::new(num_bytes_builder),
+                ];
+                let mut stats_builder = StructBuilder::new(
+                    PartitionStats::default().arrow_struct_fields(),
+                    field_builders,
+                );
+                for _ in 0..num_writers {
+                    stats_builder.append(true)?;
+                }
+                let stats = Arc::new(stats_builder.finish());
+
+                // build result batch containing metadata
+                let schema = result_schema();
+                let batch = RecordBatch::try_new(
+                    schema.clone(),
+                    vec![partition_num, path, stats],
+                )
+                .map_err(DataFusionError::ArrowError)?;
+
+                Ok(Box::pin(MemoryStream::try_new(vec![batch], schema, None)?))
             }
             _ => Err(DataFusionError::Execution(
@@ -185,10 +312,69 @@ impl ExecutionPlan for QueryStageExec {
     }
 }
 
+fn result_schema() -> SchemaRef {
+    let stats = PartitionStats::default();
+    Arc::new(Schema::new(vec![
+        Field::new("partition", DataType::UInt32, false),
+        Field::new("path", DataType::Utf8, false),
+        stats.arrow_struct_repr(),
+    ]))
+}
+
+struct ShuffleWriter {
+    path: String,
+    writer: FileWriter<File>,
+    num_batches: u64,
+    num_rows: u64,
+    num_bytes: u64,
+}
+
+impl ShuffleWriter {
+    fn new(path: &str, schema: &Schema) -> Result<Self> {
+        let file = File::create(path)
+            .map_err(|e| {
+                BallistaError::General(format!(
+                    "Failed to create partition file at {}: {:?}",
+                    path, e
+                ))
+            })
+            .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
+        Ok(Self {
+            num_batches: 0,
+            num_rows: 0,
+            num_bytes: 0,
+            path: path.to_owned(),
+            writer: FileWriter::try_new(file, schema)?,
+        })
+    }
+
+    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
+        self.writer.write(batch)?;
+        self.num_batches += 1;
+        self.num_rows += batch.num_rows() as u64;
+        let num_bytes: usize = batch
+            .columns()
+            .iter()
+            .map(|array| array.get_array_memory_size())
+            .sum();
+        self.num_bytes += num_bytes as u64;
+        Ok(())
+    }
+
+    fn finish(&mut self) -> Result<()> {
+        self.writer.finish().map_err(DataFusionError::ArrowError)
+    }
+
+    fn path(&self) -> &str {
+        &self.path
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
     use datafusion::arrow::array::{StringArray, StructArray, UInt32Array, UInt64Array};
+    use datafusion::physical_plan::expressions::Column;
     use datafusion::physical_plan::memory::MemoryExec;
     use tempfile::TempDir;
@@ -207,17 +393,17 @@ mod tests {
         let batches = utils::collect_stream(&mut stream)
             .await
             .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
-        assert!(batches.len() == 1);
+        assert_eq!(1, batches.len());
         let batch = &batches[0];
-        assert_eq!(2, batch.num_columns());
+        assert_eq!(3, batch.num_columns());
         assert_eq!(1, batch.num_rows());
-        let path = batch.columns()[0]
+        let path = batch.columns()[1]
             .as_any()
             .downcast_ref::<StringArray>()
             .unwrap();
         let file = path.value(0);
         assert!(file.ends_with("data.arrow"));
-        let stats = batch.columns()[1]
+        let stats = batch.columns()[2]
             .as_any()
             .downcast_ref::<StructArray>()
             .unwrap();
@@ -231,6 +417,41 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_partitioned() -> Result<()> {
+        let input_plan = create_input_plan()?;
+        let work_dir = TempDir::new()?;
+        let query_stage = QueryStageExec::try_new(
+            "jobOne".to_owned(),
+            1,
+            input_plan,
+            work_dir.into_path().to_str().unwrap().to_owned(),
+            Some(Partitioning::Hash(vec![Arc::new(Column::new("a"))], 2)),
+        )?;
+        let mut stream = query_stage.execute(0).await?;
+        let batches = utils::collect_stream(&mut stream)
+            .await
+            .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
+        assert_eq!(1, batches.len());
+        let batch = &batches[0];
+        assert_eq!(3, batch.num_columns());
+        assert_eq!(2, batch.num_rows());
+        let stats = batch.columns()[2]
+            .as_any()
+            .downcast_ref::<StructArray>()
+            .unwrap();
+        let num_rows = stats
+            .column_by_name("num_rows")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<UInt64Array>()
+            .unwrap();
+        assert_eq!(2, num_rows.value(0));
+        assert_eq!(2, num_rows.value(1));
+
+        Ok(())
+    }
+
     fn create_input_plan() -> Result<Arc<dyn ExecutionPlan>> {
         let schema = Arc::new(Schema::new(vec![
             Field::new("a", DataType::UInt32, true),
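Reviewer note on the `Partitioning::Hash` branch above: each input batch is hashed on the partitioning expressions, every row is assigned to bucket `hash % num_output_partitions`, the rows for each bucket are gathered with the `take` kernel, and the resulting batch is appended to that bucket's IPC file. The fixed seeds passed to `ahash::RandomState::with_seeds(0, 0, 0, 0)` keep the row-to-partition assignment deterministic across executors. The snippet below is a minimal, std-only sketch of just the bucket assignment; it is not the Ballista code, and the `DefaultHasher` stand-in will not produce the same buckets as DataFusion's `create_hashes`.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Group row indices by `hash(key) % num_output_partitions`, mirroring the
/// bucketing loop in QueryStageExec::execute (illustrative stand-in only).
fn partition_indices(keys: &[u32], num_output_partitions: usize) -> Vec<Vec<u64>> {
    let mut indices = vec![vec![]; num_output_partitions];
    for (row, key) in keys.iter().enumerate() {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        let hash = hasher.finish();
        // Same modulo rule as the real code: bucket = hash % num_output_partitions
        indices[(hash % num_output_partitions as u64) as usize].push(row as u64);
    }
    indices
}

fn main() {
    // Two output partitions, as in the new test_partitioned test.
    let keys = [1u32, 2, 3, 4, 5, 6, 7, 8];
    for (partition, rows) in partition_indices(&keys, 2).iter().enumerate() {
        println!("output partition {} gets row indices {:?}", partition, rows);
    }
}
```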
diff --git a/ballista/rust/core/src/serde/scheduler/mod.rs b/ballista/rust/core/src/serde/scheduler/mod.rs
index b502c325595f..faa38e3e8741 100644
--- a/ballista/rust/core/src/serde/scheduler/mod.rs
+++ b/ballista/rust/core/src/serde/scheduler/mod.rs
@@ -134,7 +134,8 @@ impl PartitionStats {
             false,
         )
     }
-    fn arrow_struct_fields(self) -> Vec<Field> {
+
+    pub fn arrow_struct_fields(self) -> Vec<Field> {
         vec![
             Field::new("num_rows", DataType::UInt64, false),
             Field::new("num_batches", DataType::UInt64, false),
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index af6969c43cbd..b4336866ef8a 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -390,7 +390,8 @@ impl ColumnarValue {
         }
     }
 
-    fn into_array(self, num_rows: usize) -> ArrayRef {
+    /// Convert a columnar value into an ArrayRef
+    pub fn into_array(self, num_rows: usize) -> ArrayRef {
         match self {
             ColumnarValue::Array(array) => array,
             ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
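A closing note on the writer bookkeeping in the Hash branch: the `Vec<Option<Arc<Mutex<ShuffleWriter>>>>` slots are filled on demand, so output partitions that never receive a row get no IPC file on disk and no row in the metadata batch. The sketch below illustrates that lazy-slot pattern with a hypothetical `CountingWriter` standing in for `ShuffleWriter`; the type name and the sample counts are illustrative, not Ballista's.

```rust
/// Hypothetical stand-in for ShuffleWriter: counts rows instead of writing
/// Arrow IPC files to disk.
struct CountingWriter {
    num_rows: u64,
}

fn main() {
    let num_output_partitions = 4usize;
    // One optional slot per possible output partition; all start empty.
    let mut writers: Vec<Option<CountingWriter>> =
        (0..num_output_partitions).map(|_| None).collect();

    // Pretend these are the hash buckets assigned to a stream of rows.
    let buckets = [0usize, 2, 2, 0, 2];
    for &bucket in buckets.iter() {
        // Create the writer the first time a row lands in this bucket.
        let writer = writers[bucket].get_or_insert(CountingWriter { num_rows: 0 });
        writer.num_rows += 1;
    }

    // Only partitions that actually received rows appear in the summary,
    // mirroring how the metadata RecordBatch is built from the `Some` slots.
    for (partition, writer) in writers.iter().enumerate() {
        if let Some(w) = writer {
            println!("partition {} -> {} rows", partition, w.num_rows);
        }
    }
}
```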