Skip to content

Commit

Permalink
Merge pull request #10125 from zhang2014/refactor/aggregator
Browse files Browse the repository at this point in the history
refactor(query): use accumulating to impl single state aggregator
  • Loading branch information
zhang2014 authored Feb 19, 2023
2 parents 73f6585 + a961217 commit 72bde50
Show file tree
Hide file tree
Showing 8 changed files with 317 additions and 214 deletions.
18 changes: 0 additions & 18 deletions src/query/functions/src/aggregates/aggregate_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ use std::fmt;
use std::sync::Arc;

use common_arrow::arrow::bitmap::Bitmap;
use common_base::runtime::ThreadPool;
use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::types::DataType;
use common_expression::Column;
Expand Down Expand Up @@ -81,22 +79,6 @@ pub trait AggregateFunction: fmt::Display + Sync + Send {
// TODO append the value into the column builder
fn merge_result(&self, _place: StateAddr, _builder: &mut ColumnBuilder) -> Result<()>;

fn support_merge_parallel(&self) -> bool {
false
}

fn merge_parallel(
&self,
_pool: &mut ThreadPool,
_place: StateAddr,
_rhs: StateAddr,
) -> Result<()> {
Err(ErrorCode::Unimplemented(format!(
"merge_parallel is not implemented for {}",
self.name()
)))
}

// std::mem::needs_drop::<State>
// if true will call drop_state
fn need_manual_drop_state(&self) -> bool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

mod sort;
pub mod transform;
pub mod transform_accumulating;
pub mod transform_block_compact;
pub mod transform_block_compact_no_split;
pub mod transform_compact;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright 2023 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::sync::Arc;

use common_exception::Result;
use common_expression::DataBlock;
use common_pipeline_core::processors::port::InputPort;
use common_pipeline_core::processors::port::OutputPort;
use common_pipeline_core::processors::processor::Event;
use common_pipeline_core::processors::Processor;

/// A simplified processor abstraction for transforms that accumulate internal
/// state across input blocks (e.g. single-state aggregation) and may emit an
/// output block per input and/or a final block when the stream ends.
pub trait AccumulatingTransform: Send {
    /// Processor name reported by [`Processor::name`].
    const NAME: &'static str;

    /// Consume one input block, updating internal state.
    /// Returns `Some` to emit an output block immediately, `None` otherwise.
    fn transform(&mut self, data: DataBlock) -> Result<Option<DataBlock>>;

    /// Called exactly once when processing ends. `_output` is `true` when the
    /// stream finished normally and a final block may be emitted; it is `false`
    /// when invoked from the transformer's `Drop` (no consumer for output).
    fn on_finish(&mut self, _output: bool) -> Result<Option<DataBlock>> {
        Ok(None)
    }
}

/// Adapter that wraps an [`AccumulatingTransform`] implementation into a
/// pipeline [`Processor`], driving it from the input/output port state machine.
pub struct AccumulatingTransformer<T: AccumulatingTransform + 'static> {
    inner: T,
    input: Arc<InputPort>,
    output: Arc<OutputPort>,

    // True once `inner.on_finish` has been invoked (normally in `process`,
    // or as a fallback in `Drop`); guarantees it runs at most once.
    called_on_finish: bool,
    // Block pulled from `input`, waiting to be fed to `inner.transform`.
    input_data: Option<DataBlock>,
    // Block produced by `inner`, waiting to be pushed to `output`.
    output_data: Option<DataBlock>,
}

impl<T: AccumulatingTransform + 'static> AccumulatingTransformer<T> {
    /// Build a boxed processor wrapping `inner`, wired to the given ports.
    /// No data is pulled and `on_finish` is not called until the pipeline
    /// drives the returned processor.
    pub fn create(input: Arc<InputPort>, output: Arc<OutputPort>, inner: T) -> Box<dyn Processor> {
        let transformer = Self {
            input,
            output,
            inner,
            called_on_finish: false,
            input_data: None,
            output_data: None,
        };

        Box::new(transformer)
    }
}

impl<T: AccumulatingTransform + 'static> Drop for AccumulatingTransformer<T> {
    /// Safety net: ensure `on_finish` runs even if the processor is dropped
    /// before the input stream completed (e.g. pipeline aborted early).
    /// `on_finish(false)` signals that no output can be consumed.
    fn drop(&mut self) {
        if !self.called_on_finish {
            // NOTE(review): `unwrap` panics inside `drop`; if this drop runs
            // while already unwinding from another panic, the process aborts.
            // Confirm a hard failure is the intended policy here.
            self.inner.on_finish(false).unwrap();
        }
    }
}

#[async_trait::async_trait]
impl<T: AccumulatingTransform + 'static> Processor for AccumulatingTransformer<T> {
    fn name(&self) -> String {
        String::from(T::NAME)
    }

    fn as_any(&mut self) -> &mut dyn Any {
        self
    }

    /// Port state machine. Checks are ordered: downstream finished, downstream
    /// backpressure, pending output, pending input, upstream data, upstream
    /// finished, then request more data.
    fn event(&mut self) -> Result<Event> {
        if self.output.is_finished() {
            // Downstream is gone; still schedule one `process` call so that
            // `on_finish` runs before we report Finished.
            if !self.called_on_finish {
                return Ok(Event::Sync);
            }

            self.input.finish();
            return Ok(Event::Finished);
        }

        if !self.output.can_push() {
            // Backpressure: stop pulling until downstream consumes.
            self.input.set_not_need_data();
            return Ok(Event::NeedConsume);
        }

        // Flush any block produced by a previous `process` call first.
        if let Some(data_block) = self.output_data.take() {
            self.output.push_data(Ok(data_block));
            return Ok(Event::NeedConsume);
        }

        // A pulled block is waiting to be transformed.
        if self.input_data.is_some() {
            return Ok(Event::Sync);
        }

        if self.input.has_data() {
            self.input_data = Some(self.input.pull_data().unwrap()?);
            return Ok(Event::Sync);
        }

        if self.input.is_finished() {
            // Upstream done: schedule `process` once for `on_finish(true)`,
            // then finish the output on the next pass.
            return match !self.called_on_finish {
                true => Ok(Event::Sync),
                false => {
                    self.output.finish();
                    Ok(Event::Finished)
                }
            };
        }

        self.input.set_need_data();
        Ok(Event::NeedData)
    }

    /// CPU work scheduled by `event`: either transform one pending input
    /// block, or (when input is exhausted) run `on_finish` exactly once.
    fn process(&mut self) -> Result<()> {
        if let Some(data_block) = self.input_data.take() {
            self.output_data = self.inner.transform(data_block)?;
            return Ok(());
        }

        if !self.called_on_finish {
            self.called_on_finish = true;
            self.output_data = self.inner.on_finish(true)?;
        }

        Ok(())
    }
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ use crate::pipelines::processors::transforms::aggregator::aggregate_info::Aggreg
use crate::pipelines::processors::transforms::aggregator::aggregator_final_parallel::ParallelFinalAggregator;
use crate::pipelines::processors::transforms::aggregator::AggregateHashStateInfo;
use crate::pipelines::processors::transforms::aggregator::PartialAggregator;
use crate::pipelines::processors::transforms::aggregator::SingleStateAggregator;
use crate::pipelines::processors::transforms::group_by::KeysColumnBuilder;
use crate::pipelines::processors::transforms::group_by::PartitionedHashMethod;
use crate::pipelines::processors::transforms::group_by::PolymorphicKeysHelper;
Expand Down Expand Up @@ -240,16 +239,6 @@ where
}
}

impl PartitionedAggregatorLike for SingleStateAggregator<true> {
const SUPPORT_TWO_LEVEL: bool = false;
type PartitionedAggregator = SingleStateAggregator<true>;
}

impl PartitionedAggregatorLike for SingleStateAggregator<false> {
const SUPPORT_TWO_LEVEL: bool = false;
type PartitionedAggregator = SingleStateAggregator<false>;
}

impl<Method, const HAS_AGG: bool> PartitionedAggregatorLike
for ParallelFinalAggregator<HAS_AGG, Method>
where
Expand Down
Loading

1 comment on commit 72bde50

@vercel
Copy link

@vercel vercel bot commented on 72bde50 Feb 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

databend – ./

databend.vercel.app
databend-git-main-databend.vercel.app
databend-databend.vercel.app
databend.rs

Please sign in to comment.