Skip to content

Commit

Permalink
Merge pull request #9954 from zhang2014/chore/transform_on_finish
Browse files Browse the repository at this point in the history
chore(processor): add on_start and on_finish for transform
  • Loading branch information
zhang2014 authored Feb 10, 2023
2 parents 147f146 + 0d8a1cb commit 7b9495f
Showing 1 changed file with 42 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,23 @@ pub trait Transform: Send {
fn name(&self) -> String {
Self::NAME.to_string()
}

fn on_start(&mut self) -> Result<()> {
Ok(())
}

fn on_finish(&mut self) -> Result<()> {
Ok(())
}
}

pub struct Transformer<T: Transform + 'static> {
transform: T,
input: Arc<InputPort>,
output: Arc<OutputPort>,

called_on_start: bool,
called_on_finish: bool,
input_data: Option<DataBlock>,
output_data: Option<DataBlock>,
}
Expand All @@ -52,6 +62,8 @@ impl<T: Transform + 'static> Transformer<T> {
transform: inner,
input_data: None,
output_data: None,
called_on_start: false,
called_on_finish: false,
}))
}
}
Expand All @@ -67,6 +79,10 @@ impl<T: Transform + 'static> Processor for Transformer<T> {
}

fn event(&mut self) -> Result<Event> {
if !self.called_on_start {
return Ok(Event::Sync);
}

match self.output.is_finished() {
true => self.finish_input(),
false if !self.output.can_push() => self.not_need_data(),
Expand All @@ -82,9 +98,21 @@ impl<T: Transform + 'static> Processor for Transformer<T> {
}

fn process(&mut self) -> Result<()> {
if !self.called_on_start {
self.called_on_start = true;
self.transform.on_start()?;
return Ok(());
}

if let Some(data_block) = self.input_data.take() {
let data_block = self.transform.transform(data_block)?;
self.output_data = Some(data_block);
return Ok(());
}

if !self.called_on_finish {
self.called_on_finish = true;
self.transform.on_finish()?;
}

Ok(())
Expand All @@ -99,8 +127,13 @@ impl<T: Transform> Transformer<T> {
}

if self.input.is_finished() {
self.output.finish();
return Ok(Event::Finished);
return match !self.called_on_finish {
true => Ok(Event::Sync),
false => {
self.output.finish();
Ok(Event::Finished)
}
};
}

self.input.set_need_data();
Expand All @@ -113,7 +146,12 @@ impl<T: Transform> Transformer<T> {
}

fn finish_input(&mut self) -> Result<Event> {
self.input.finish();
Ok(Event::Finished)
match !self.called_on_finish {
true => Ok(Event::Sync),
false => {
self.input.finish();
Ok(Event::Finished)
}
}
}
}

1 comment on commit 7b9495f

@vercel
Copy link

@vercel vercel bot commented on 7b9495f Feb 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

databend – ./

databend.vercel.app
databend-databend.vercel.app
databend-git-main-databend.vercel.app
databend.rs

Please sign in to comment.