Closed
Labels
enhancement: New feature or request
Description
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
DataFusion does not yet support SUM(DISTINCT). Although the SingleDistinctToGroupBy optimizer rule (#1315) handles queries with a single distinct aggregate, other SQL scenarios are still not covered.
For example, run the following unit test:
#[tokio::test]
async fn query_sum_distinct() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("c1", DataType::Int64, true),
        Field::new("c2", DataType::Int64, true),
    ]));
    let data = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(Int64Array::from(vec![
                Some(0),
                Some(1),
                None,
                Some(3),
                Some(3),
            ])),
            Arc::new(Int64Array::from(vec![
                None,
                Some(1),
                Some(1),
                Some(2),
                Some(2),
            ])),
        ],
    )?;
    let table = MemTable::try_new(schema, vec![vec![data]])?;
    let ctx = SessionContext::new();
    ctx.register_table("test", Arc::new(table))?;
    let sql = "SELECT AVG(c1), SUM(DISTINCT c2) FROM test";
    let actual = execute_to_batches(&ctx, sql).await;
    Ok(())
}
It fails with the following error:
NotImplemented(\"SUM(DISTINCT) aggregations are not available\") at Creating physical plan for 'SELECT AVG(c1), SUM(DISTINCT c2) FROM test': Projection: #AVG(test.c1), #SUM(DISTINCT test.c2)\n Aggregate: groupBy=[[]], aggr=[[AVG(#test.c1), SUM(DISTINCT #test.c2)]]\n TableScan: test projection=Some([0, 1])
Describe the solution you'd like
Introduce expressions::DistinctSum into DataFusion:
- Maintain a HashSet that records the unique numeric values, and update the HashSet whenever a new input item arrives.
- Store the aggregate state in a ScalarValue::List, which is built from the HashSet.
- During the final aggregation, evaluate DistinctSum by summing the numeric values stored in the HashSet.
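The three steps above can be sketched with the standard library alone. This is only an illustration of the idea, not DataFusion code: the type DistinctSumSketch and its methods update, state, merge, and evaluate are hypothetical names, a plain Vec<i64> stands in for ScalarValue::List, and the sketch assumes Int64 input, skips NULLs, and ignores overflow.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for the proposed DistinctSum accumulator.
/// It is not part of DataFusion's API.
#[derive(Debug, Default)]
struct DistinctSumSketch {
    /// Unique non-null values seen so far.
    seen: HashSet<i64>,
}

impl DistinctSumSketch {
    /// Step 1: record each new non-null input value in the HashSet.
    fn update(&mut self, values: &[Option<i64>]) {
        for v in values.iter().flatten() {
            self.seen.insert(*v);
        }
    }

    /// Step 2: export the partial state as a list built from the HashSet.
    /// (A Vec<i64> is used here in place of ScalarValue::List.)
    fn state(&self) -> Vec<i64> {
        self.seen.iter().copied().collect()
    }

    /// Merge the exported state of another partial aggregation.
    /// Duplicates across partitions collapse because the target is a set.
    fn merge(&mut self, other_state: &[i64]) {
        self.seen.extend(other_state.iter().copied());
    }

    /// Step 3: in the final phase, sum the unique values.
    /// Returns None when no non-null value was seen, mirroring SQL SUM.
    fn evaluate(&self) -> Option<i64> {
        if self.seen.is_empty() {
            None
        } else {
            Some(self.seen.iter().sum())
        }
    }
}
```

Feeding the c2 column from the test above (NULL, 1, 1, 2, 2) through update and then calling evaluate would produce 3, since only the unique values 1 and 2 are summed. Merging sets rather than partial sums is what keeps the result correct when the same value appears in more than one partition.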
Describe alternatives you've considered
No.
Additional context
No.