Skip to content

Commit

Permalink
feat(batch): parallelize exchange source creation (#7125)
Browse files Browse the repository at this point in the history
- Parallelize exchange source creation to speed up sql like `select * from v limit 1` if `v` has so many partitions.

Approved-By: lmatz
Approved-By: liurenjie1024
Approved-By: BugenZhao
Approved-By: xxchan

Co-Authored-By: Dylan Chen <zilin@singularity-data.com>
Co-Authored-By: Dylan <chenzl25@mail2.sysu.edu.cn>
  • Loading branch information
chenzl25 and chenzl25 authored Dec 29, 2022
1 parent 85aab8c commit b8bdfdf
Showing 1 changed file with 89 additions and 78 deletions.
167 changes: 89 additions & 78 deletions src/batch/src/executor/generic_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,14 @@ use crate::execution::local_exchange::LocalExchangeSource;
use crate::executor::ExecutorBuilder;
use crate::task::{BatchTaskContext, TaskId};

pub type ExchangeExecutor<C> = GenericExchangeExecutor<C>;
pub type ExchangeExecutor<C> = GenericExchangeExecutor<DefaultCreateSource, C>;
use super::BatchTaskMetricsWithTaskLabels;
use crate::executor::{BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor};
pub struct GenericExchangeExecutor<C> {
sources: Vec<ExchangeSourceImpl>,

pub struct GenericExchangeExecutor<CS, C> {
proto_sources: Vec<ProstExchangeSource>,
/// Mock-able CreateSource.
source_creators: Vec<CS>,
context: C,

schema: Schema,
Expand Down Expand Up @@ -123,22 +126,15 @@ impl BoxedExecutorBuilder for GenericExchangeExecutorBuilder {
)?;

ensure!(!node.get_sources().is_empty());
let prost_sources: Vec<ProstExchangeSource> = node.get_sources().to_vec();
let proto_sources: Vec<ProstExchangeSource> = node.get_sources().to_vec();
let source_creators =
vec![DefaultCreateSource::new(source.context().client_pool()); prost_sources.len()];
let mut sources: Vec<ExchangeSourceImpl> = vec![];

for (prost_source, source_creator) in prost_sources.iter().zip_eq(source_creators) {
let source = source_creator
.create_source(source.context.clone(), prost_source)
.await?;
sources.push(source);
}
vec![DefaultCreateSource::new(source.context().client_pool()); proto_sources.len()];

let input_schema: Vec<NodeField> = node.get_input_schema().to_vec();
let fields = input_schema.iter().map(Field::from).collect::<Vec<Field>>();
Ok(Box::new(GenericExchangeExecutor::<C> {
sources,
Ok(Box::new(ExchangeExecutor::<C> {
proto_sources,
source_creators,
context: source.context().clone(),
schema: Schema { fields },
task_id: source.task_id.clone(),
Expand All @@ -148,7 +144,9 @@ impl BoxedExecutorBuilder for GenericExchangeExecutorBuilder {
}
}

impl<C: BatchTaskContext> Executor for GenericExchangeExecutor<C> {
impl<CS: 'static + Send + CreateSource, C: BatchTaskContext> Executor
for GenericExchangeExecutor<CS, C>
{
fn schema(&self) -> &Schema {
&self.schema
}
Expand All @@ -162,14 +160,21 @@ impl<C: BatchTaskContext> Executor for GenericExchangeExecutor<C> {
}
}

impl<C: BatchTaskContext> GenericExchangeExecutor<C> {
impl<CS: 'static + Send + CreateSource, C: BatchTaskContext> GenericExchangeExecutor<CS, C> {
#[try_stream(boxed, ok = DataChunk, error = RwError)]
async fn do_execute(self: Box<Self>) {
let mut stream = select_all(
self.sources
self.proto_sources
.into_iter()
.map(|source| {
data_chunk_stream(source, self.metrics.clone(), self.identity.clone())
.zip_eq(self.source_creators)
.map(|(prost_source, source_creator)| {
Self::data_chunk_stream(
prost_source,
source_creator,
self.context.clone(),
self.metrics.clone(),
self.identity.clone(),
)
})
.collect_vec(),
)
Expand All @@ -180,52 +185,57 @@ impl<C: BatchTaskContext> GenericExchangeExecutor<C> {
yield data_chunk
}
}
}

#[try_stream(boxed, ok = DataChunk, error = RwError)]
async fn data_chunk_stream(
mut source: ExchangeSourceImpl,
metrics: Option<BatchTaskMetricsWithTaskLabels>,
identity: String,
) {
// create the collector
let source_id = source.get_task_id();
let counter = if let Some(ref metrics) = metrics {
let mut labels = metrics.task_labels();
let source_stage_id = source_id.stage_id.to_string();
let source_task_id = source_id.stage_id.to_string();
labels.extend_from_slice(&[
identity.as_str(),
source_id.query_id.as_str(),
source_stage_id.as_str(),
source_task_id.as_str(),
]);

Some(
metrics
.metrics
.task_exchange_recv_row_number
.with_label_values(&labels[..]),
)
} else {
// no metrics to collect, no counter
None
};

loop {
if let Some(res) = source.take_data().await? {
if res.cardinality() == 0 {
debug!("Exchange source {:?} output empty chunk.", source);
}

if let Some(ref counter) = counter {
counter.inc_by(res.cardinality().try_into().unwrap());
#[try_stream(boxed, ok = DataChunk, error = RwError)]
async fn data_chunk_stream(
prost_source: ProstExchangeSource,
source_creator: CS,
context: C,
metrics: Option<BatchTaskMetricsWithTaskLabels>,
identity: String,
) {
let mut source = source_creator
.create_source(context.clone(), &prost_source)
.await?;
// create the collector
let source_id = source.get_task_id();
let counter = if let Some(ref metrics) = metrics {
let mut labels = metrics.task_labels();
let source_stage_id = source_id.stage_id.to_string();
let source_task_id = source_id.stage_id.to_string();
labels.extend_from_slice(&[
identity.as_str(),
source_id.query_id.as_str(),
source_stage_id.as_str(),
source_task_id.as_str(),
]);

Some(
metrics
.metrics
.task_exchange_recv_row_number
.with_label_values(&labels[..]),
)
} else {
// no metrics to collect, no counter
None
};

loop {
if let Some(res) = source.take_data().await? {
if res.cardinality() == 0 {
debug!("Exchange source {:?} output empty chunk.", source);
}

if let Some(ref counter) = counter {
counter.inc_by(res.cardinality().try_into().unwrap());
}

yield res;
continue;
}

yield res;
continue;
break;
}
break;
}
}

Expand All @@ -244,31 +254,32 @@ mod tests {
#[tokio::test]
async fn test_exchange_multiple_sources() {
let context = ComputeNodeContext::for_test();
let mut sources = vec![];
let mut proto_sources = vec![];
let mut source_creators = vec![];
for _ in 0..2 {
let mut rng = rand::thread_rng();
let i = rng.gen_range(1..=100000);
let chunk = DataChunk::new(vec![array_nonnull! { I32Array, [i] }.into()], 1);
let chunks = vec![Some(chunk); 100];
let fake_exchange_source = FakeExchangeSource::new(chunks);
let fake_create_source = FakeCreateSource::new(fake_exchange_source);
let source = fake_create_source
.create_source(context.clone(), &ProstExchangeSource::default())
.await
.unwrap();
sources.push(source);
proto_sources.push(ProstExchangeSource::default());
source_creators.push(fake_create_source);
}

let executor = Box::new(GenericExchangeExecutor::<ComputeNodeContext> {
metrics: None,
sources,
context,
schema: Schema {
fields: vec![Field::unnamed(DataType::Int32)],
let executor = Box::new(
GenericExchangeExecutor::<FakeCreateSource, ComputeNodeContext> {
metrics: None,
proto_sources,
source_creators,
context,
schema: Schema {
fields: vec![Field::unnamed(DataType::Int32)],
},
task_id: TaskId::default(),
identity: "GenericExchangeExecutor2".to_string(),
},
task_id: TaskId::default(),
identity: "GenericExchangeExecutor2".to_string(),
});
);

let mut stream = executor.execute();
let mut chunks: Vec<DataChunk> = vec![];
Expand Down

0 comments on commit b8bdfdf

Please sign in to comment.