Skip to content

Commit

Permalink
Merge pull request #9782 from zhang2014/refactor/new_pipe
Browse files Browse the repository at this point in the history
refactor(executor): merge simple pipe and resize pipe
  • Loading branch information
BohuTANG authored Jan 30, 2023
2 parents 6c6f2ea + ee85049 commit 5467558
Show file tree
Hide file tree
Showing 29 changed files with 385 additions and 506 deletions.
1 change: 0 additions & 1 deletion src/query/pipeline/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ pub mod pipeline;
pub mod pipeline_display;
pub mod unsafe_cell_wrap;

pub use pipe::Pipe;
pub use pipe::SinkPipeBuilder;
pub use pipe::SourcePipeBuilder;
pub use pipe::TransformPipeBuilder;
Expand Down
121 changes: 42 additions & 79 deletions src/query/pipeline/core/src/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,131 +19,95 @@ use crate::processors::port::OutputPort;
use crate::processors::processor::ProcessorPtr;

#[derive(Clone)]
pub enum Pipe {
SimplePipe {
processors: Vec<ProcessorPtr>,
inputs_port: Vec<Arc<InputPort>>,
outputs_port: Vec<Arc<OutputPort>>,
},
ResizePipe {
processor: ProcessorPtr,
inputs_port: Vec<Arc<InputPort>>,
outputs_port: Vec<Arc<OutputPort>>,
},
/// One processor bundled with the input and output ports supplied alongside it.
///
/// A [`Pipe`] stores a list of these in its `items` field.
pub struct PipeItem {
/// The processor carried by this item.
pub processor: ProcessorPtr,
/// Input ports supplied together with `processor` (may be empty).
pub inputs_port: Vec<Arc<InputPort>>,
/// Output ports supplied together with `processor` (may be empty).
pub outputs_port: Vec<Arc<OutputPort>>,
}

impl Pipe {
pub fn size(&self) -> usize {
match self {
Pipe::ResizePipe { .. } => 1,
Pipe::SimplePipe { processors, .. } => processors.len(),
}
}

pub fn input_size(&self) -> usize {
match self {
Pipe::SimplePipe { inputs_port, .. } => inputs_port.len(),
Pipe::ResizePipe { inputs_port, .. } => inputs_port.len(),
impl PipeItem {
/// Builds a `PipeItem` from a processor and its port lists.
///
/// The arguments are stored as given: `proc` becomes `processor`,
/// `inputs` becomes `inputs_port` and `outputs` becomes `outputs_port`.
/// No validation is performed here.
pub fn create(
proc: ProcessorPtr,
inputs: Vec<Arc<InputPort>>,
outputs: Vec<Arc<OutputPort>>,
) -> PipeItem {
PipeItem {
processor: proc,
inputs_port: inputs,
outputs_port: outputs,
}
}
}

pub fn output_size(&self) -> usize {
match self {
Pipe::SimplePipe { outputs_port, .. } => outputs_port.len(),
Pipe::ResizePipe { outputs_port, .. } => outputs_port.len(),
}
}
/// A collection of [`PipeItem`]s together with the input and output counts
/// recorded when the pipe was created.
#[derive(Clone)]
pub struct Pipe {
/// The items that make up this pipe.
pub items: Vec<PipeItem>,
/// Input count recorded for this pipe. It is supplied by the caller of
/// `Pipe::create` and is not derived from `items`.
pub input_length: usize,
/// Output count recorded for this pipe. It is supplied by the caller of
/// `Pipe::create` and is not derived from `items`.
pub output_length: usize,
}

pub fn processor_by_index(&self, index: usize) -> ProcessorPtr {
match self {
Pipe::SimplePipe { processors, .. } => processors[index].clone(),
Pipe::ResizePipe { processor, .. } => processor.clone(),
impl Pipe {
/// Builds a `Pipe` from explicit input/output counts and a list of items.
///
/// Note the argument order: counts first (`inputs`, then `outputs`), items
/// last. The values are stored unchanged; this constructor does not check
/// that the counts agree with the ports held by `items`.
pub fn create(inputs: usize, outputs: usize, items: Vec<PipeItem>) -> Pipe {
Pipe {
items,
input_length: inputs,
output_length: outputs,
}
}
}

#[derive(Clone)]
pub struct SourcePipeBuilder {
processors: Vec<ProcessorPtr>,
outputs_port: Vec<Arc<OutputPort>>,
items: Vec<PipeItem>,
}

impl SourcePipeBuilder {
pub fn create() -> SourcePipeBuilder {
SourcePipeBuilder {
processors: vec![],
outputs_port: vec![],
}
SourcePipeBuilder { items: vec![] }
}

pub fn finalize(self) -> Pipe {
assert_eq!(self.processors.len(), self.outputs_port.len());
Pipe::SimplePipe {
processors: self.processors,
inputs_port: vec![],
outputs_port: self.outputs_port,
}
Pipe::create(0, self.items.len(), self.items)
}

pub fn add_source(&mut self, output_port: Arc<OutputPort>, source: ProcessorPtr) {
self.processors.push(source);
self.outputs_port.push(output_port);
self.items
.push(PipeItem::create(source, vec![], vec![output_port]));
}
}

#[allow(dead_code)]
pub struct SinkPipeBuilder {
processors: Vec<ProcessorPtr>,
inputs_port: Vec<Arc<InputPort>>,
items: Vec<PipeItem>,
}

#[allow(dead_code)]
impl SinkPipeBuilder {
pub fn create() -> SinkPipeBuilder {
SinkPipeBuilder {
processors: vec![],
inputs_port: vec![],
}
SinkPipeBuilder { items: vec![] }
}

pub fn finalize(self) -> Pipe {
assert_eq!(self.processors.len(), self.inputs_port.len());
Pipe::SimplePipe {
processors: self.processors,
inputs_port: self.inputs_port,
outputs_port: vec![],
}
Pipe::create(self.items.len(), 0, self.items)
}

pub fn add_sink(&mut self, inputs_port: Arc<InputPort>, sink: ProcessorPtr) {
self.processors.push(sink);
self.inputs_port.push(inputs_port);
self.items
.push(PipeItem::create(sink, vec![inputs_port], vec![]));
}
}

pub struct TransformPipeBuilder {
processors: Vec<ProcessorPtr>,
inputs_port: Vec<Arc<InputPort>>,
outputs_port: Vec<Arc<OutputPort>>,
items: Vec<PipeItem>,
}

impl TransformPipeBuilder {
pub fn create() -> TransformPipeBuilder {
TransformPipeBuilder {
processors: vec![],
inputs_port: vec![],
outputs_port: vec![],
}
TransformPipeBuilder { items: vec![] }
}

pub fn finalize(self) -> Pipe {
assert_eq!(self.processors.len(), self.inputs_port.len());
assert_eq!(self.processors.len(), self.outputs_port.len());
Pipe::SimplePipe {
processors: self.processors,
inputs_port: self.inputs_port,
outputs_port: self.outputs_port,
}
Pipe::create(self.items.len(), self.items.len(), self.items)
}

pub fn add_transform(
Expand All @@ -152,8 +116,7 @@ impl TransformPipeBuilder {
output: Arc<OutputPort>,
proc: ProcessorPtr,
) {
self.processors.push(proc);
self.inputs_port.push(input);
self.outputs_port.push(output);
self.items
.push(PipeItem::create(proc, vec![input], vec![output]));
}
}
38 changes: 22 additions & 16 deletions src/query/pipeline/core/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ use std::sync::Arc;
use common_exception::ErrorCode;
use common_exception::Result;

use crate::pipe::Pipe;
use crate::pipe::PipeItem;
use crate::processors::port::InputPort;
use crate::processors::port::OutputPort;
use crate::processors::processor::ProcessorPtr;
use crate::processors::ResizeProcessor;
use crate::Pipe;
use crate::SinkPipeBuilder;
use crate::SourcePipeBuilder;
use crate::TransformPipeBuilder;
Expand Down Expand Up @@ -66,10 +67,14 @@ impl Pipeline {
}
}

/// Returns `true` when `self.pipes` contains no pipes.
pub fn is_empty(&self) -> bool {
self.pipes.is_empty()
}

// We need to push data to executor
pub fn is_pushing_pipeline(&self) -> Result<bool> {
match self.pipes.first() {
Some(pipe) => Ok(pipe.input_size() != 0),
Some(pipe) => Ok(pipe.input_length != 0),
None => Err(ErrorCode::Internal(
"Logical error, call is_pushing on empty pipeline.",
)),
Expand All @@ -79,7 +84,7 @@ impl Pipeline {
// We need to pull data from executor
pub fn is_pulling_pipeline(&self) -> Result<bool> {
match self.pipes.last() {
Some(pipe) => Ok(pipe.output_size() != 0),
Some(pipe) => Ok(pipe.output_length != 0),
None => Err(ErrorCode::Internal(
"Logical error, call is_pulling on empty pipeline.",
)),
Expand All @@ -102,23 +107,21 @@ impl Pipeline {
pub fn input_len(&self) -> usize {
match self.pipes.first() {
None => 0,
Some(Pipe::SimplePipe { inputs_port, .. }) => inputs_port.len(),
Some(Pipe::ResizePipe { inputs_port, .. }) => inputs_port.len(),
Some(pipe) => pipe.input_length,
}
}

pub fn output_len(&self) -> usize {
match self.pipes.last() {
None => 0,
Some(Pipe::SimplePipe { outputs_port, .. }) => outputs_port.len(),
Some(Pipe::ResizePipe { outputs_port, .. }) => outputs_port.len(),
Some(pipe) => pipe.output_length,
}
}

pub fn set_max_threads(&mut self, max_threads: usize) {
let mut max_pipe_size = 0;
for pipe in &self.pipes {
max_pipe_size = std::cmp::max(max_pipe_size, pipe.size());
max_pipe_size = std::cmp::max(max_pipe_size, pipe.items.len());
}

self.max_threads = std::cmp::min(max_pipe_size, max_threads);
Expand Down Expand Up @@ -178,19 +181,22 @@ impl Pipeline {
pub fn resize(&mut self, new_size: usize) -> Result<()> {
match self.pipes.last() {
None => Err(ErrorCode::Internal("Cannot resize empty pipe.")),
Some(pipe) if pipe.output_size() == 0 => {
Some(pipe) if pipe.output_length == 0 => {
Err(ErrorCode::Internal("Cannot resize empty pipe."))
}
Some(pipe) if pipe.output_size() == new_size => Ok(()),
Some(pipe) if pipe.output_length == new_size => Ok(()),
Some(pipe) => {
let processor = ResizeProcessor::create(pipe.output_size(), new_size);
let processor = ResizeProcessor::create(pipe.output_length, new_size);
let inputs_port = processor.get_inputs().to_vec();
let outputs_port = processor.get_outputs().to_vec();
self.pipes.push(Pipe::ResizePipe {
inputs_port,
outputs_port,
processor: ProcessorPtr::create(Box::new(processor)),
});
self.pipes
.push(Pipe::create(inputs_port.len(), outputs_port.len(), vec![
PipeItem::create(
ProcessorPtr::create(Box::new(processor)),
inputs_port,
outputs_port,
),
]));
Ok(())
}
}
Expand Down
Loading

1 comment on commit 5467558

@vercel
Copy link

@vercel vercel bot commented on 5467558 Jan 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

databend – ./

databend-databend.vercel.app
databend-git-main-databend.vercel.app
databend.rs
databend.vercel.app

Please sign in to comment.