Skip to content

Commit

Permalink
feat(arena): optimize data frame deserialization (#453)
Browse files Browse the repository at this point in the history
* feat(arena): optimize data frame deserialization

* fix: no serialization for response
  • Loading branch information
gangliao authored Jan 27, 2022
1 parent 1e355f9 commit ee7df97
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 49 deletions.
30 changes: 12 additions & 18 deletions flock-function/src/aws/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,11 @@ pub async fn handler(
let (input, status) = prepare_data_sources(ctx, arena, event).await?;

if status == HashAggregateStatus::Processed {
let info = format!("[Ok] Function {}: data is already processed.", ctx.name);
info!("{}", info);
return Ok(json!({ "response": info }));
info!("[Ok] Function {}: data is already processed.", ctx.name);
return Ok(Value::Null);
} else if status == HashAggregateStatus::NotReady {
let info = format!("[Ok] Function {}: data aggregation is not ready.", ctx.name);
info!("{}", info);
return Ok(json!({ "response": info }));
info!("[Ok] Function {}: data aggregation is not ready.", ctx.name);
return Ok(Value::Null);
}

let output = collect(ctx, input).await?;
Expand Down Expand Up @@ -181,7 +179,8 @@ async fn prepare_data_sources(
if status == HashAggregateStatus::Ready {
info!("Received all data packets for the window: {:?}", window_id);
arena
.take_batches(&window_id)
.take(&window_id)
.await?
.into_iter()
.for_each(|b| input.push(b));
PROCESSED_WINDOWS.lock().unwrap().insert(window_id);
Expand Down Expand Up @@ -223,7 +222,8 @@ async fn prepare_data_sources(
if arena.is_complete(&window_id) {
info!("Received all data packets for the window: {:?}", window_id);
arena
.take_batches(&window_id)
.take(&window_id)
.await?
.into_iter()
.for_each(|b| input.push(b));
status = HashAggregateStatus::Ready;
Expand Down Expand Up @@ -287,7 +287,7 @@ async fn invoke_next_functions(
.write(sink_type.clone(), DataSinkFormat::SerdeBinary)
.await
} else {
Ok(json!({ "response": "No data to sink." }))
Ok(Value::Null)
}
}
CloudFunction::Lambda(group_name) => {
Expand Down Expand Up @@ -352,9 +352,7 @@ async fn invoke_next_functions(
);
lambda::invoke_function(group_name, &invocation_type, Some(bytes.into())).await?;
}
Ok(json!({
"response": format!("next function: {}", group_name)
}))
Ok(Value::Null)
}
CloudFunction::Group((group_name, _)) => {
if !ctx.is_shuffling().await? {
Expand Down Expand Up @@ -419,9 +417,7 @@ async fn invoke_next_functions(

futures::future::join_all(tasks).await;

Ok(json!({
"response": format!("next function group: {}", group_name)
}))
Ok(Value::Null)
} else {
let output = Arc::new(output);
let mut rng = StdRng::seed_from_u64(0xDEAD); // Predictable RNG clutch
Expand Down Expand Up @@ -533,9 +529,7 @@ async fn invoke_next_functions(
.collect::<Vec<tokio::task::JoinHandle<Result<()>>>>();
futures::future::join_all(tasks).await;

Ok(json!({
"response": format!("next function group: {}", group_name)
}))
Ok(Value::Null)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion flock-function/src/aws/nexmark/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,5 +74,5 @@ pub async fn handler(ctx: &mut ExecutionContext, payload: Payload) -> Result<Val
_ => unimplemented!(),
};

Ok(json!({"name": &ctx.name, "type": "nexmark_bench".to_string()}))
Ok(Value::Null)
}
123 changes: 93 additions & 30 deletions flock/src/runtime/arena/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,18 @@
mod bitmap;
pub use bitmap::Bitmap;

use crate::encoding::Encoding;
use crate::error::{FlockError, Result};
use crate::runtime::payload::Payload;
use crate::runtime::payload::{DataFrame, Payload};
use crate::transmute::*;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow_flight::utils::flight_data_to_arrow_batch;
use datafusion::arrow_flight::FlightData;
use hashbrown::HashMap;
use rayon::prelude::*;
use std::ops::{Deref, DerefMut};
use tokio::task::JoinHandle;

type QueryId = String;
type ShuffleId = usize;
Expand Down Expand Up @@ -61,29 +67,38 @@ pub struct Arena(HashMap<WindowId, WindowSession>);
pub struct WindowSession {
/// The number of data fragments in the window.
/// [`WindowSession::size`] equals to [`Uuid::seq_len`].
pub size: usize,
/// Aggregate record batches for the first relation.
pub r1_records: Vec<Vec<RecordBatch>>,
/// Aggregate record batches for the second relation.
pub r2_records: Vec<Vec<RecordBatch>>,
pub size: usize,
/// Aggregate the encoded data frames for the first relation.
/// https://arrow.apache.org/blog/2019/10/13/introducing-arrow-flight/
pub r1_flight_data: Vec<Vec<DataFrame>>,
/// The schema of the first relation.
pub r1_schema: Vec<u8>,
/// Aggregate the encoded data frames for the second relation.
/// https://arrow.apache.org/blog/2019/10/13/introducing-arrow-flight/
pub r2_flight_data: Vec<Vec<DataFrame>>,
/// The schema of the second relation.
pub r2_schema: Vec<u8>,
/// Bitmap indicating the data existence in the window.
pub bitmap: Bitmap,
pub bitmap: Bitmap,
/// The compression method.
pub encoding: Encoding,
}

impl WindowSession {
/// Return the schema of data fragments in the temporal window.
pub fn schema(&self) -> Result<(SchemaRef, Option<SchemaRef>)> {
if self.r1_records.is_empty() || self.r1_records[0].is_empty() {
if self.r1_schema.is_empty() {
return Err(FlockError::Internal(
"Record batches are empty.".to_string(),
));
}
if !self.r2_records.is_empty() && !self.r2_records[0].is_empty() {
Ok((self.r1_records[0][0].schema(), None))

if self.r2_schema.is_empty() {
Ok((schema_from_bytes(&self.r1_schema)?, None))
} else {
Ok((
self.r1_records[0][0].schema(),
Some(self.r2_records[0][0].schema()),
schema_from_bytes(&self.r1_schema)?,
Some(schema_from_bytes(&self.r2_schema)?),
))
}
}
Expand All @@ -95,12 +110,59 @@ impl Arena {
Arena(HashMap::<WindowId, WindowSession>::new())
}

/// Get the data fragments in the temporal window via the key.
pub fn take_batches(&mut self, window_id: &WindowId) -> Vec<Vec<Vec<RecordBatch>>> {
/// Take a window from the arena.
pub async fn take(&mut self, window_id: &WindowId) -> Result<Vec<Vec<Vec<RecordBatch>>>> {
let to_batches = |df: Vec<DataFrame>, schema: SchemaRef| -> Vec<RecordBatch> {
df.into_par_iter()
.map(|d| {
flight_data_to_arrow_batch(
&FlightData {
data_body: d.body,
data_header: d.header,
app_metadata: vec![],
flight_descriptor: None,
},
schema.clone(),
&[],
)
.unwrap()
})
.collect()
};

if let Some(window) = (*self).remove(window_id) {
vec![window.r1_records, window.r2_records]
let (schema1, schema2) = window.schema()?;

let mut tasks: Vec<JoinHandle<Vec<Vec<RecordBatch>>>> = vec![];

let encoding = window.encoding.clone();
tasks.push(tokio::spawn(async move {
window
.r1_flight_data
.into_par_iter()
.map(|d| to_batches(unmarshal(d, encoding.clone()), schema1.clone()))
.collect()
}));

if schema2.is_some() {
let encoding = window.encoding.clone();
let schema2 = schema2.unwrap();
tasks.push(tokio::spawn(async move {
window
.r2_flight_data
.into_par_iter()
.map(|d| to_batches(unmarshal(d, encoding.clone()), schema2.clone()))
.collect()
}));
}

Ok(futures::future::join_all(tasks)
.await
.into_iter()
.map(|r| r.unwrap())
.collect())
} else {
vec![vec![], vec![]]
Ok(vec![vec![], vec![]])
}
}

Expand All @@ -112,7 +174,7 @@ impl Arena {
/// Return true when every expected data fragment for the temporal window
/// has arrived, i.e. the number of received fragments equals the window's
/// declared size. Returns false for a window that is missing fragments or
/// does not exist in the arena.
pub fn is_complete(&self, window_id: &WindowId) -> bool {
    self.get(window_id)
        .map(|window| window.size == window.r1_flight_data.len())
        .unwrap_or(false)
}

Expand All @@ -132,12 +194,11 @@ impl Arena {
Some(window) => {
assert!(uuid.seq_len == window.size);
if !window.bitmap.is_set(uuid.seq_num) {
let (r1, r2) = payload.to_record_batch();
window.r1_records.push(r1);
window.r2_records.push(r2);
assert!(window.r1_records.len() == window.r2_records.len());
window.r1_flight_data.push(payload.data);
window.r2_flight_data.push(payload.data2);
assert!(window.r1_flight_data.len() == window.r2_flight_data.len());
window.bitmap.set(uuid.seq_num);
if window.size == window.r1_records.len() {
if window.size == window.r1_flight_data.len() {
HashAggregateStatus::Ready
} else {
HashAggregateStatus::NotReady
Expand All @@ -147,12 +208,14 @@ impl Arena {
}
}
None => {
let (r1, r2) = payload.to_record_batch();
let mut window = WindowSession {
size: uuid.seq_len,
r1_records: vec![r1],
r2_records: vec![r2],
bitmap: Bitmap::new(uuid.seq_len + 1), // Starts from 1.
size: uuid.seq_len,
r1_flight_data: vec![payload.data],
r2_flight_data: vec![payload.data2],
r1_schema: payload.schema,
r2_schema: payload.schema2,
bitmap: Bitmap::new(uuid.seq_len + 1), // Starts from 1.
encoding: payload.encoding,
};
// SEQ_NUM is used to indicate the data existence in the window via bitmap.
window.bitmap.set(uuid.seq_num);
Expand Down Expand Up @@ -243,12 +306,12 @@ mod tests {

if let Some(window) = (*arena).get(&window_id) {
assert_eq!(8, window.size);
assert_eq!(8, window.r1_records.len());
assert_eq!(8, window.r1_flight_data.len());
(0..8).for_each(|i| assert!(window.bitmap.is_set(i + 1)));
}

assert_eq!(8, arena.take_batches(&window_id)[0].len());
assert_eq!(0, arena.take_batches(&("no exists".to_owned(), 0))[0].len());
assert_eq!(8, arena.take(&window_id).await?[0].len());
assert_eq!(0, arena.take(&("no exists".to_owned(), 0)).await?[0].len());

Ok(())
}
Expand Down

0 comments on commit ee7df97

Please sign in to comment.