Skip to content

Commit

Permalink
Merge pull request #4307 from sundy-li/count-optimize
Browse files Browse the repository at this point in the history
Optimizer: improve trivial count
  • Loading branch information
BohuTANG authored Mar 3, 2022
2 parents f65d570 + fce6426 commit 16e06e4
Show file tree
Hide file tree
Showing 9 changed files with 60 additions and 62 deletions.
22 changes: 8 additions & 14 deletions common/functions/src/aggregates/aggregate_count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,14 @@ impl AggregateCountFunction {
}))
}

pub fn try_create_own(
display_name: &str,
_params: Vec<DataValue>,
arguments: Vec<DataField>,
) -> Result<Self> {
assert_variadic_arguments(display_name, arguments.len(), (0, 1))?;
Ok(AggregateCountFunction {
display_name: display_name.to_string(),
arguments,
nullable: false,
})
}

pub fn desc() -> AggregateFunctionDescription {
AggregateFunctionDescription::creator(Box::new(Self::try_create))
let properties = super::aggregate_function_factory::AggregateFunctionProperties {
returns_default_when_only_null: true,
};
AggregateFunctionDescription::creator_with_properties(
Box::new(Self::try_create),
properties,
)
}
}

Expand Down Expand Up @@ -96,6 +89,7 @@ impl AggregateFunction for AggregateCountFunction {
input_rows: usize,
) -> Result<()> {
let state = place.get::<AggregateCountState>();

let nulls = match validity {
Some(b) => b.null_count(),
None => 0,
Expand Down
55 changes: 25 additions & 30 deletions query/src/optimizers/optimizer_statistics_exact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use common_exception::Result;
use common_planners::AggregatorFinalPlan;
use common_planners::AggregatorPartialPlan;
use common_planners::Expression;
use common_planners::ExpressionPlan;
use common_planners::PlanBuilder;
use common_planners::PlanNode;
use common_planners::PlanRewriter;
Expand Down Expand Up @@ -53,35 +52,30 @@ impl PlanRewriter for StatisticsExactImpl<'_> {
ref args,
..
}],
PlanNode::Expression(ExpressionPlan { input, .. }),
) if op == "count" && args.len() == 1 => match (&args[0], input.as_ref()) {
(Expression::Literal { .. }, PlanNode::ReadSource(read_source_plan))
if read_source_plan.statistics.is_exact =>
{
let db_name = "system";
let table_name = "one";

futures::executor::block_on(async move {
let table = self.ctx.get_table(db_name, table_name).await?;
let source_plan = table.read_plan(self.ctx.clone(), None).await?;
let dummy_read_plan = PlanNode::ReadSource(source_plan);

let expr = Expression::create_literal_with_type(
DataValue::UInt64(read_source_plan.statistics.read_rows as u64),
u64::to_data_type(),
);

self.rewritten = true;
let alias_name = plan.aggr_expr[0].column_name();
PlanBuilder::from(&dummy_read_plan)
.expression(&[expr.clone()], "Exact Statistics")?
.project(&[expr.alias(&alias_name)])?
.build()
})?
}
_ => PlanNode::AggregatorPartial(plan.clone()),
},
(_, _, _) => PlanNode::AggregatorPartial(plan.clone()),
PlanNode::ReadSource(read_source_plan),
) if op == "count" && args.is_empty() && read_source_plan.statistics.is_exact => {
let db_name = "system";
let table_name = "one";

futures::executor::block_on(async move {
let table = self.ctx.get_table(db_name, table_name).await?;
let source_plan = table.read_plan(self.ctx.clone(), None).await?;
let dummy_read_plan = PlanNode::ReadSource(source_plan);

let expr = Expression::create_literal_with_type(
DataValue::UInt64(read_source_plan.statistics.read_rows as u64),
u64::to_data_type(),
);

self.rewritten = true;
let alias_name = plan.aggr_expr[0].column_name();
PlanBuilder::from(&dummy_read_plan)
.expression(&[expr.clone()], "Exact Statistics")?
.project(&[expr.alias(&alias_name)])?
.build()
})?
}
_ => PlanNode::AggregatorPartial(plan.clone()),
};
Ok(new_plan)
}
Expand Down Expand Up @@ -118,6 +112,7 @@ impl Optimizer for StatisticsExactOptimizer {
)
)
*/

let mut visitor = StatisticsExactImpl {
ctx: &self.ctx,
rewritten: false,
Expand Down
14 changes: 9 additions & 5 deletions query/src/sql/statements/analyzer_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,14 +215,18 @@ impl ExpressionAnalyzer {
};
}

if info.name.eq_ignore_ascii_case("count")
&& !args.is_empty()
&& matches!(args[0], Expression::Wildcard)
{
let optimize_remove_count_args = info.name.eq_ignore_ascii_case("count")
&& !info.distinct
&& (args.len() == 1 && matches!(args[0], Expression::Wildcard)
|| args.iter().all(
|expr| matches!(expr, Expression::Literal { value, .. } if !value.is_null()),
));

if optimize_remove_count_args {
Ok(Expression::AggregateFunction {
op: info.name.clone(),
distinct: info.distinct,
args: vec![common_planners::lit(0i64)],
args: vec![],
params: parameters,
})
} else {
Expand Down
12 changes: 4 additions & 8 deletions query/tests/it/optimizers/optimizer_statistics_exact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,25 +60,21 @@ fn test_statistics_exact_optimizer() -> Result<()> {
op: "count".to_string(),
distinct: false,
params: vec![],
args: vec![Expression::create_literal(DataValue::UInt64(0))],
args: vec![],
};

let plan = PlanBuilder::from(&source_plan)
.expression(
&[Expression::create_literal(DataValue::UInt64(0))],
"Before GroupBy",
)?
.aggregate_partial(&[aggr_expr.clone()], &[])?
.aggregate_final(source_plan.schema(), &[aggr_expr], &[])?
.project(&[Expression::Column("count(0)".to_string())])?
.project(&[Expression::Column("count()".to_string())])?
.build()?;

let mut statistics_exact = StatisticsExactOptimizer::create(ctx);
let optimized = statistics_exact.optimize(&plan)?;

let expect = "\
Projection: count(0):UInt64\
\n Projection: 10000 as count(0):UInt64\
Projection: count():UInt64\
\n Projection: 10000 as count():UInt64\
\n Expression: 10000:UInt64 (Exact Statistics)\
\n ReadDataSource: scan schema: [dummy:UInt8], statistics: [read_rows: 1, read_bytes: 1, partitions_scanned: 1, partitions_total: 1]";
let actual = format!("{:?}", optimized);
Expand Down
10 changes: 5 additions & 5 deletions query/tests/it/storages/fuse/operations/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,11 @@ async fn test_fuse_history_optimize_compact() -> Result<()> {
// optimize compact should keep the histories
// there should be 6 history items there, 5 for the above insertions, 1 for that compaction
let expected = vec![
"+----------+",
"| count(0) |",
"+----------+",
"| 6 |",
"+----------+",
"+---------+",
"| count() |",
"+---------+",
"| 6 |",
"+---------+",
];
let qry = format!("select count(*) from fuse_history('{}', '{}')", db, tbl);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
1
1000
3000
0
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ select 1, sum(number) from numbers_mt(1000000);
select count(*) = count(1) from numbers(1000);
select count(1) from numbers(1000);
select sum(3) from numbers(1000);
select count(null) from numbers(1000);
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
Projection: count():UInt64
Projection: 3 as count():UInt64
Expression: 3:UInt64 (Exact Statistics)
ReadDataSource: scan schema: [dummy:UInt8], statistics: [read_rows: 1, read_bytes: 1, partitions_scanned: 1, partitions_total: 1]
1
1
5
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ insert into t values (7);

optimize table t compact;

-- optimize exact
explain select count(1) from t;

-- expects 4 history items, 3 of previous insertion, 1 for last compaction
select count(*)=4 from fuse_history('db_09_0008', 't');

Expand Down

0 comments on commit 16e06e4

Please sign in to comment.