Skip to content

Commit

Permalink
save
Browse files Browse the repository at this point in the history
  • Loading branch information
leiysky committed May 17, 2021
1 parent f26fbfc commit 4766d2d
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 25 deletions.
14 changes: 12 additions & 2 deletions fusequery/query/src/pipelines/processors/pipeline_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::pipelines::transforms::FilterTransform;
use crate::pipelines::transforms::GroupByFinalTransform;
use crate::pipelines::transforms::GroupByPartialTransform;
use crate::pipelines::transforms::LimitTransform;
use crate::pipelines::transforms::NestedLoopJoinTransform;
use crate::pipelines::transforms::ProjectionTransform;
use crate::pipelines::transforms::RemoteTransform;
use crate::pipelines::transforms::SortMergeTransform;
Expand Down Expand Up @@ -96,8 +97,8 @@ impl PipelineBuilder {
Ok(pipeline)
}

fn visit_stage_plan(&self, pipeline: &mut Pipeline, plan: &&StagePlan) -> Result<bool> {
let executors = PlanScheduler::reschedule(self.ctx.clone(), &plan.input.as_ref())?;
fn visit_stage_plan(&self, pipeline: &mut Pipeline, plan: &StagePlan) -> Result<bool> {
let executors = PlanScheduler::reschedule(self.ctx.clone(), plan.input.as_ref())?;

// If the executors is not empty.
if !executors.is_empty() {
Expand Down Expand Up @@ -285,6 +286,15 @@ impl PipelineBuilder {
}

fn visit_join_plan(&self, pipeline: &mut Pipeline, plan: &JoinPlan) -> Result<bool> {
let left = PipelineBuilder::create(self.ctx.clone(), *plan.left_child).build()?;
let right = PipelineBuilder::create(self.ctx.clone(), *plan.right_child).build()?;
pipeline.add_simple_transform(|| {
Box::new(NestedLoopJoinTransform::try_create(
plan.schema().clone(),
left,
right
)?)
});
Ok(false)
}
}
1 change: 1 addition & 0 deletions fusequery/query/src/pipelines/transforms/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub use transform_expression::ExpressionTransform;
pub use transform_filter::FilterTransform;
pub use transform_groupby_final::GroupByFinalTransform;
pub use transform_groupby_partial::GroupByPartialTransform;
pub use transform_join::NestedLoopJoinTransform;
pub use transform_limit::LimitTransform;
pub use transform_projection::ProjectionTransform;
pub use transform_remote::RemoteTransform;
Expand Down
71 changes: 48 additions & 23 deletions fusequery/query/src/pipelines/transforms/transform_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
use std::any::Any;
use std::sync::Arc;

use common_arrow::arrow::array::new_empty_array;
use common_datablocks::DataBlock;
use common_datavalues::DataSchemaRef;
use common_datavalues::DataValue;
use common_exception::Result;
use common_streams::DataBlockStream;
use common_streams::SendableDataBlockStream;
Expand Down Expand Up @@ -68,34 +68,59 @@ impl IProcessor for NestedLoopJoinTransform {
let left_buffer = read_left_task.await.unwrap()?;
let right_buffer = read_right_task.await.unwrap()?;

let result_columns = self
.schema
.fields()
.into_iter()
.map(|field| new_empty_array(field.data_type()))
.collect();
let result_block = DataBlock::create(self.schema.clone(), result_columns.clone());
// let result = left_buffer
// .into_iter()
// .map(|inner_block| {
// right_buffer
// .iter()
// .cloned()
// .map(|outer_block| {
// let mut joined_blocks: Vec<DataBlock> = vec![];
// for i in 0..inner_block.num_rows() {
// let mut result_columns = vec![];
// for column in inner_block.columns() {
// let inner_value = DataValue::try_from_array(column, i).unwrap();
// result_columns.push(
// inner_value
// .to_array_with_size(outer_block.num_rows())
// .unwrap()
// );
// }
// for column in outer_block.columns() {
// result_columns.push(column.clone());
// }
// joined_blocks
// .push(DataBlock::create(self.schema.clone(), result_columns));
// }
// joined_blocks
// })
// .flatten()
// })
// .flatten()
// .collect();

let result = right_buffer
.iter()
.flat_map(|outer_block| {
left_buffer.iter().flat_map(|inner_block| {
let mut joined_blocks = vec![];
for i in 0..inner_block.num_rows() {
let block = result_block.clone();
for j in 0..outer_block.num_rows() {
let columns = block.columns();
}
let mut joined_blocks: Vec<DataBlock> = vec![];
for outer_block in &right_buffer {
for inner_block in &left_buffer {
for i in 0..inner_block.num_rows() {
let mut result_columns = vec![];
for column in inner_block.columns() {
let inner_value = DataValue::try_from_array(column, i)?;
result_columns
.push(inner_value.to_array_with_size(outer_block.num_rows())?);
}
joined_blocks
})
})
.collect();
for column in outer_block.columns() {
result_columns.push(column.clone());
}
joined_blocks.push(DataBlock::create(self.schema.clone(), result_columns));
}
}
}

Ok(Box::pin(DataBlockStream::create(
self.schema.clone(),
None,
result
joined_blocks
)))
}
}

0 comments on commit 4766d2d

Please sign in to comment.