Skip to content

Commit

Permalink
Merge branch 'main' into left
Browse files Browse the repository at this point in the history
  • Loading branch information
xudong963 authored Jun 26, 2023
2 parents ffcb83e + f2ffd0f commit 297d173
Show file tree
Hide file tree
Showing 24 changed files with 102 additions and 102 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions src/query/pipeline/core/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,13 +188,17 @@ impl Pipeline {
}

/// Add a ResizePipe to pipes
pub fn resize(&mut self, new_size: usize) -> Result<()> {
pub fn try_resize(&mut self, new_size: usize) -> Result<()> {
self.resize(new_size, false)
}

pub fn resize(&mut self, new_size: usize, force: bool) -> Result<()> {
match self.pipes.last() {
None => Err(ErrorCode::Internal("Cannot resize empty pipe.")),
Some(pipe) if pipe.output_length == 0 => {
Err(ErrorCode::Internal("Cannot resize empty pipe."))
}
Some(pipe) if pipe.output_length == new_size => Ok(()),
Some(pipe) if !force && pipe.output_length == new_size => Ok(()),
Some(pipe) => {
let processor = ResizeProcessor::create(pipe.output_length, new_size);
let inputs_port = processor.get_inputs().to_vec();
Expand Down
1 change: 0 additions & 1 deletion src/query/pipeline/sources/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ common-storage = { path = "../../../common/storage" }

async-trait = { version = "0.1.57", package = "async-trait-fn" }
bstr = "1.0.1"
crossbeam-channel = "0.5.6"
csv-core = "0.1.10"
dashmap = "5.4.0"
futures = "0.3.24"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use common_arrow::parquet::metadata::RowGroupMetaData;
use common_arrow::parquet::read::read_metadata;
use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::BlockMetaInfo;
use common_expression::DataBlock;
use common_expression::DataField;
use common_expression::DataSchema;
Expand Down Expand Up @@ -236,6 +237,7 @@ impl DynData for SplitMeta {
}
}

#[derive(serde::Serialize, serde::Deserialize)]
pub struct RowGroupInMemory {
pub split_info: String,
pub meta: RowGroupMetaData,
Expand All @@ -257,6 +259,21 @@ impl RowBatchTrait for RowGroupInMemory {
}
}

#[typetag::serde(name = "row_batch_parquet")]
impl BlockMetaInfo for RowGroupInMemory {
fn as_any(&self) -> &dyn Any {
self
}

fn equals(&self, _info: &Box<dyn BlockMetaInfo>) -> bool {
unreachable!("RowGroupInMemory as BlockMetaInfo is not expected to be compared.")
}

fn clone_self(&self) -> Box<dyn BlockMetaInfo> {
unreachable!("RowGroupInMemory as BlockMetaInfo is not expected to be cloned.")
}
}

impl RowGroupInMemory {
fn read<R: Read + Seek>(
split_info: String,
Expand Down
18 changes: 18 additions & 0 deletions src/query/pipeline/sources/src/input_formats/input_format_text.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::mem;
Expand All @@ -22,6 +23,7 @@ use common_compress::DecompressState;
use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::types::string::StringColumnBuilder;
use common_expression::BlockMetaInfo;
use common_expression::Column;
use common_expression::ColumnBuilder;
use common_expression::DataBlock;
Expand Down Expand Up @@ -352,6 +354,7 @@ impl<T: InputFormatTextBase> InputFormat for T {
}
}

#[derive(serde::Serialize, serde::Deserialize, Debug)]
pub struct RowBatch {
pub data: Vec<u8>,
pub row_ends: Vec<usize>,
Expand Down Expand Up @@ -388,6 +391,21 @@ impl RowBatchTrait for RowBatch {
}
}

#[typetag::serde(name = "row_batch")]
impl BlockMetaInfo for RowBatch {
fn as_any(&self) -> &dyn Any {
self
}

fn equals(&self, _info: &Box<dyn BlockMetaInfo>) -> bool {
unreachable!("RowBatch as BlockMetaInfo is not expected to be compared.")
}

fn clone_self(&self) -> Box<dyn BlockMetaInfo> {
unreachable!("RowBatch as BlockMetaInfo is not expected to be cloned.")
}
}

pub struct AligningStateMaybeCompressed<T: InputFormatTextBase> {
#[allow(unused)]
ctx: Arc<InputContext>,
Expand Down
23 changes: 9 additions & 14 deletions src/query/pipeline/sources/src/input_formats/input_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use common_base::runtime::TrySpawn;
use common_compress::CompressAlgorithm;
use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::BlockMetaInfo;
use common_expression::DataBlock;
use common_pipeline_core::Pipeline;
use futures::AsyncRead;
Expand Down Expand Up @@ -81,7 +82,7 @@ impl ReadBatchTrait for Vec<u8> {
}
}

pub trait RowBatchTrait: Send {
pub trait RowBatchTrait: Send + BlockMetaInfo {
fn size(&self) -> usize;
fn rows(&self) -> usize;
}
Expand Down Expand Up @@ -267,31 +268,25 @@ pub trait InputFormatPipe: Sized + Send + 'static {
pipeline: &mut Pipeline,
) -> Result<()> {
let n_threads = ctx.settings.get_max_threads()? as usize;
let max_aligner = match ctx.plan {
let max_aligner = match &ctx.plan {
InputPlan::CopyInto(_) => ctx.splits.len(),
InputPlan::StreamingLoad(StreamPlan { is_multi_part, .. }) => {
if is_multi_part {
if *is_multi_part {
3
} else {
1
}
}
};
let (row_batch_tx, row_batch_rx) = crossbeam_channel::bounded(n_threads);
pipeline.add_source(
|output| {
Aligner::<Self>::try_create(
output,
ctx.clone(),
split_rx.clone(),
row_batch_tx.clone(),
)
},
|output| Aligner::<Self>::try_create(output, ctx.clone(), split_rx.clone()),
std::cmp::min(max_aligner, n_threads),
)?;
pipeline.resize(n_threads)?;
// aligners may own files of different sizes, so we need to balance the load
let force_balance = matches!(&ctx.plan, InputPlan::CopyInto(_));
pipeline.resize(n_threads, force_balance)?;
pipeline.add_transform(|input, output| {
DeserializeTransformer::<Self>::create(ctx.clone(), input, output, row_batch_rx.clone())
DeserializeTransformer::<Self>::create(ctx.clone(), input, output)
})?;
Ok(())
}
Expand Down
31 changes: 9 additions & 22 deletions src/query/pipeline/sources/src/input_formats/source_aligner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ use common_base::base::tokio::sync::mpsc::Receiver;
use common_base::base::ProgressValues;
use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::DataBlock;
use common_pipeline_core::processors::port::OutputPort;
use common_pipeline_core::processors::processor::Event;
use common_pipeline_core::processors::processor::ProcessorPtr;
use common_pipeline_core::processors::Processor;
use crossbeam_channel::TrySendError;

use crate::input_formats::input_pipeline::AligningStateTrait;
use crate::input_formats::input_pipeline::InputFormatPipe;
Expand All @@ -50,21 +50,18 @@ pub struct Aligner<I: InputFormatPipe> {

// output
row_batches: VecDeque<I::RowBatch>,
row_batch_tx: crossbeam_channel::Sender<I::RowBatch>,
}

impl<I: InputFormatPipe> Aligner<I> {
pub(crate) fn try_create(
output: Arc<OutputPort>,
ctx: Arc<InputContext>,
split_rx: async_channel::Receiver<Result<Split<I>>>,
batch_tx: crossbeam_channel::Sender<I::RowBatch>,
) -> Result<ProcessorPtr> {
Ok(ProcessorPtr::create(Box::new(Self {
ctx,
output,
split_rx,
row_batch_tx: batch_tx,
state: None,
read_batch: None,
batch_rx: None,
Expand All @@ -86,27 +83,17 @@ impl<I: InputFormatPipe> Processor for Aligner<I> {
}

fn event(&mut self) -> Result<Event> {
if self.no_more_split && self.row_batches.is_empty() && self.read_batch.is_none() {
if self.output.is_finished() {
Ok(Event::Finished)
} else if self.no_more_split && self.row_batches.is_empty() && self.read_batch.is_none() {
self.output.finish();
Ok(Event::Finished)
} else if !self.output.can_push() {
Ok(Event::NeedConsume)
} else if let Some(rb) = self.row_batches.pop_front() {
match self.row_batch_tx.try_send(rb) {
Ok(()) => {
tracing::debug!("aligner send row batch ok");
self.output.push_data(Err(ErrorCode::Ok("")));
Ok(Event::NeedConsume)
}
Err(TrySendError::Full(b)) => {
tracing::debug!("aligner send row batch full");
self.row_batches.push_front(b);
Ok(Event::NeedConsume)
}
Err(TrySendError::Disconnected(_)) => {
tracing::debug!("aligner send row batch disconnected");
self.output.finish();
Ok(Event::Finished)
}
}
let block = DataBlock::empty_with_meta(Box::new(rb));
self.output.push_data(Ok(block));
Ok(Event::NeedConsume)
} else if self.read_batch.is_some() || self.is_flushing_split {
Ok(Event::Sync)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ use std::collections::VecDeque;
use std::sync::Arc;

use common_exception::Result;
use common_expression::BlockMetaInfoDowncast;
use common_expression::DataBlock;
use common_pipeline_core::processors::port::InputPort;
use common_pipeline_core::processors::port::OutputPort;
use common_pipeline_core::processors::processor::Event;
use common_pipeline_core::processors::processor::ProcessorPtr;
use common_pipeline_core::processors::Processor;
use crossbeam_channel::TryRecvError;

use crate::input_formats::input_pipeline::BlockBuilderTrait;
use crate::input_formats::input_pipeline::InputFormatPipe;
Expand Down Expand Up @@ -59,7 +59,6 @@ pub struct DeserializeTransformer<I: InputFormatPipe> {
processor: DeserializeProcessor<I>,
input: Arc<InputPort>,
output: Arc<OutputPort>,
rx: crossbeam_channel::Receiver<I::RowBatch>,
flushing: bool,
}

Expand All @@ -68,14 +67,12 @@ impl<I: InputFormatPipe> DeserializeTransformer<I> {
ctx: Arc<InputContext>,
input: Arc<InputPort>,
output: Arc<OutputPort>,
rx: crossbeam_channel::Receiver<I::RowBatch>,
) -> Result<ProcessorPtr> {
let processor = DeserializeProcessor::create(ctx)?;
Ok(ProcessorPtr::create(Box::new(Self {
processor,
input,
output,
rx,
flushing: false,
})))
}
Expand Down Expand Up @@ -108,38 +105,22 @@ impl<I: InputFormatPipe> Processor for DeserializeTransformer<I> {
None => {
if self.processor.input_buffer.is_some() {
Ok(Event::Sync)
} else {
if self.input.has_data() {
self.input.pull_data();
match self.rx.try_recv() {
Ok(read_batch) => {
self.processor.input_buffer = Some(read_batch);
return Ok(Event::Sync);
}
Err(TryRecvError::Disconnected) => {
tracing::warn!("DeserializeTransformer rx disconnected");
self.input.finish();
self.flushing = true;
return Ok(Event::Finished);
}
Err(TryRecvError::Empty) => {
// do nothing
}
}
}
// !has_data() or try_recv return Empty
if self.input.is_finished() {
if self.flushing {
self.output.finish();
Ok(Event::Finished)
} else {
self.flushing = true;
Ok(Event::Sync)
}
} else if self.input.has_data() {
let block = self.input.pull_data().unwrap()?;
let block_meta = block.get_owned_meta().unwrap();
self.processor.input_buffer = I::RowBatch::downcast_from(block_meta);
Ok(Event::Sync)
} else if self.input.is_finished() {
if self.flushing {
self.output.finish();
Ok(Event::Finished)
} else {
self.input.set_need_data();
Ok(Event::NeedData)
self.flushing = true;
Ok(Event::Sync)
}
} else {
self.input.set_need_data();
Ok(Event::NeedData)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/src/api/rpc/exchange/exchange_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl ExchangeSink {
)]));
}

pipeline.resize(1)?;
pipeline.try_resize(1)?;
assert_eq!(flight_senders.len(), 1);
let item = create_writer_item(flight_senders.remove(0));
pipeline.add_pipe(Pipe::create(1, 0, vec![item]));
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/src/api/rpc/exchange/exchange_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,6 @@ pub fn via_exchange_source(
let last_output_len = pipeline.output_len();
exchange_source_reader::via_reader(last_output_len, pipeline, flight_receivers);

pipeline.resize(last_output_len)?;
pipeline.try_resize(last_output_len)?;
injector.apply_merge_deserializer(params, pipeline)
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl ExchangeTransform {
pipeline.add_pipe(Pipe::create(len, new_outputs, items));

if params.exchange_injector.exchange_sorting().is_none() {
pipeline.resize(max_threads)?;
pipeline.try_resize(max_threads)?;
}

injector.apply_shuffle_deserializer(params, pipeline)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl Interpreter for RefreshIndexInterpreter {

let data_accessor = self.ctx.get_data_operator()?;

build_res.main_pipeline.resize(1)?;
build_res.main_pipeline.try_resize(1)?;
build_res.main_pipeline.add_sink(|input| {
AggIndexSink::try_create(
input,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl PipelinePushingExecutor {
source_pipe_builder.add_source(output, pushing_source);

new_pipeline.add_pipe(source_pipe_builder.finalize());
new_pipeline.resize(pipeline.input_len())?;
new_pipeline.try_resize(pipeline.input_len())?;
for pipe in &pipeline.pipes {
new_pipeline.add_pipe(pipe.clone())
}
Expand Down
Loading

0 comments on commit 297d173

Please sign in to comment.