Skip to content

Commit

Permalink
refactor(query): update
Browse files Browse the repository at this point in the history
  • Loading branch information
zhang2014 committed Feb 19, 2023
1 parent 00eda3a commit a961217
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub trait AccumulatingTransform: Send {

fn transform(&mut self, data: DataBlock) -> Result<Option<DataBlock>>;

fn on_finish(&mut self) -> Result<Option<DataBlock>> {
fn on_finish(&mut self, _output: bool) -> Result<Option<DataBlock>> {
Ok(None)
}
}
Expand Down Expand Up @@ -58,7 +58,7 @@ impl<T: AccumulatingTransform + 'static> AccumulatingTransformer<T> {
impl<T: AccumulatingTransform + 'static> Drop for AccumulatingTransformer<T> {
fn drop(&mut self) {
if !self.called_on_finish {
self.inner.on_finish().unwrap();
self.inner.on_finish(false).unwrap();
}
}
}
Expand Down Expand Up @@ -124,7 +124,7 @@ impl<T: AccumulatingTransform + 'static> Processor for AccumulatingTransformer<T

if !self.called_on_finish {
self.called_on_finish = true;
self.inner.on_finish()?;
self.output_data = self.inner.on_finish(true)?;
}

Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,18 +105,24 @@ impl AccumulatingTransform for PartialSingleStateAggregator {
Ok(None)
}

fn on_finish(&mut self) -> Result<Option<DataBlock>> {
let mut columns = Vec::with_capacity(self.funcs.len());
fn on_finish(&mut self, generate_data: bool) -> Result<Option<DataBlock>> {
let mut generate_data_block = None;

for (idx, func) in self.funcs.iter().enumerate() {
let place = self.places[idx];
if generate_data {
let mut columns = Vec::with_capacity(self.funcs.len());

for (idx, func) in self.funcs.iter().enumerate() {
let place = self.places[idx];

let mut data = Vec::with_capacity(4);
func.serialize(place, &mut data)?;
columns.push(BlockEntry {
data_type: DataType::String,
value: Value::Scalar(Scalar::String(data)),
});
let mut data = Vec::with_capacity(4);
func.serialize(place, &mut data)?;
columns.push(BlockEntry {
data_type: DataType::String,
value: Value::Scalar(Scalar::String(data)),
});
}

generate_data_block = Some(DataBlock::new(columns, 1));
}

// destroy states
Expand All @@ -126,7 +132,7 @@ impl AccumulatingTransform for PartialSingleStateAggregator {
}
}

Ok(Some(DataBlock::new(columns, 1)))
Ok(generate_data_block)
}
}

Expand Down Expand Up @@ -202,38 +208,44 @@ impl AccumulatingTransform for FinalSingleStateAggregator {
Ok(None)
}

fn on_finish(&mut self) -> Result<Option<DataBlock>> {
let mut aggr_values = {
let mut builders = vec![];
for func in &self.funcs {
let data_type = func.return_type()?;
builders.push(ColumnBuilder::with_capacity(&data_type, 1));
}
builders
};
fn on_finish(&mut self, generate_data: bool) -> Result<Option<DataBlock>> {
let mut generate_data_block = None;

let main_places = self.new_places();
for (index, func) in self.funcs.iter().enumerate() {
let main_place = main_places[index];
if generate_data {
let mut aggr_values = {
let mut builders = vec![];
for func in &self.funcs {
let data_type = func.return_type()?;
builders.push(ColumnBuilder::with_capacity(&data_type, 1));
}
builders
};

for place in self.to_merge_places[index].iter() {
func.merge(main_place, *place)?;
}
let main_places = self.new_places();
for (index, func) in self.funcs.iter().enumerate() {
let main_place = main_places[index];

let array = aggr_values[index].borrow_mut();
func.merge_result(main_place, array)?;
}
for place in self.to_merge_places[index].iter() {
func.merge(main_place, *place)?;
}

let mut columns = Vec::with_capacity(self.funcs.len());
for builder in aggr_values {
columns.push(builder.build());
}
let array = aggr_values[index].borrow_mut();
func.merge_result(main_place, array)?;
}

// destroy states
for (place, func) in main_places.iter().zip(self.funcs.iter()) {
if func.need_manual_drop_state() {
unsafe { func.drop_state(*place) }
let mut columns = Vec::with_capacity(self.funcs.len());
for builder in aggr_values {
columns.push(builder.build());
}

// destroy states
for (place, func) in main_places.iter().zip(self.funcs.iter()) {
if func.need_manual_drop_state() {
unsafe { func.drop_state(*place) }
}
}

generate_data_block = Some(DataBlock::new_from_columns(columns));
}

for (places, func) in self.to_merge_places.iter().zip(self.funcs.iter()) {
Expand All @@ -244,6 +256,6 @@ impl AccumulatingTransform for FinalSingleStateAggregator {
}
}

Ok(Some(DataBlock::new_from_columns(columns)))
Ok(generate_data_block)
}
}

0 comments on commit a961217

Please sign in to comment.