Add support for ordered aggregation #6189

Closed
mustafasrepo opened this issue May 2, 2023 · 0 comments · Fixed by #6332
Labels
enhancement New feature or request

Comments

Contributor

mustafasrepo commented May 2, 2023

Is your feature request related to a problem or challenge?

For some aggregation functions, the order of the input rows changes the result. In these cases, it is helpful to be able to require a specific ordering of the input. Consider the query below:

SELECT (ARRAY_AGG(s.amount ORDER BY s.sn DESC)) AS amounts
            FROM sales_global AS s
            GROUP BY s.ts

This query runs successfully in PostgreSQL.
However, DataFusion returns the following error:
NotImplemented("ORDER BY not supported in ARRAY_AGG: s.sn DESC")
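To make the requested semantics concrete, here is a minimal in-memory sketch of what `ARRAY_AGG(amount ORDER BY sn DESC) ... GROUP BY ts` is expected to produce. It is plain Rust and does not use DataFusion at all; the `Sale` struct and the `array_agg_ordered` function are hypothetical names introduced only for this illustration.

```rust
use std::collections::BTreeMap;

/// Hypothetical row type mirroring the columns used in the query above.
#[derive(Debug, Clone)]
struct Sale {
    sn: i32,
    ts: &'static str,
    amount: i32,
}

/// Emulates `ARRAY_AGG(amount ORDER BY sn DESC) ... GROUP BY ts` in memory.
/// Illustration only: this is not how DataFusion implements aggregation.
fn array_agg_ordered(rows: &[Sale]) -> BTreeMap<&'static str, Vec<i32>> {
    // Step 1: split the rows into groups keyed by `ts`.
    let mut groups: BTreeMap<&'static str, Vec<&Sale>> = BTreeMap::new();
    for row in rows {
        groups.entry(row.ts).or_default().push(row);
    }

    // Step 2: inside each group, sort by `sn` descending, then collect `amount`.
    groups
        .into_iter()
        .map(|(ts, mut members)| {
            members.sort_by(|a, b| b.sn.cmp(&a.sn));
            let amounts: Vec<i32> = members.iter().map(|s| s.amount).collect();
            (ts, amounts)
        })
        .collect()
}
```

Calling `array_agg_ordered` on rows that share a `ts` value returns their amounts in descending `sn` order, regardless of the order in which the rows arrive. That independence from arrival order is the guarantee the `ORDER BY` clause inside the aggregate is meant to provide.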

Describe the solution you'd like

I would like DataFusion to support this. With this feature in place, we can add new aggregate functions that only make sense with an ordering requirement, such as FIRST / FIRST_VALUE, LAST / LAST_VALUE, etc.
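As a rough illustration of why FIRST_VALUE and LAST_VALUE depend on an ordering requirement, the sketch below picks the first and last value of a group under an ascending sort key. It is plain Rust, not DataFusion code; `first_last_by_key` is a hypothetical helper, and it assumes the sort keys within a group are unique.

```rust
/// Returns `(FIRST_VALUE, LAST_VALUE)` of the value column when the rows are
/// ordered by the key column ascending. Each row is a `(key, value)` pair.
/// Hypothetical helper for illustration; assumes keys are unique.
fn first_last_by_key(rows: &[(i32, i32)]) -> Option<(i32, i32)> {
    // The row with the smallest key comes first under the ordering.
    let first = rows.iter().min_by_key(|(key, _)| *key)?;
    // The row with the largest key comes last under the ordering.
    let last = rows.iter().max_by_key(|(key, _)| *key)?;
    Some((first.1, last.1))
}
```

Without an ordering, "first" and "last" would simply reflect whatever order the rows happen to arrive in, so the same query could return different answers across runs or partitionings.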

Describe alternatives you've considered

No response

Additional context

To reproduce the problem, you can use the test below.

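// Imports needed by the test (paths from the `datafusion` crate).
use datafusion::arrow::util::pretty::print_batches;
use datafusion::error::Result;
use datafusion::physical_plan::collect;
use datafusion::prelude::*;

#[tokio::test]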
async fn test_ordered_aggregation() -> Result<()> {
    let config = SessionConfig::new()
        .with_target_partitions(1);
    let ctx = SessionContext::with_config(config);
    ctx.sql("CREATE TABLE sales_global (
      sn INT PRIMARY KEY,
      ts TIMESTAMP,
      currency VARCHAR(3),
      amount INT
    ) as VALUES
      (1, '2022-01-01 08:00:00'::timestamp, 'EUR', 50.00),
      (2, '2022-01-01 11:30:00'::timestamp, 'EUR', 75.00),
      (3, '2022-01-02 12:00:00'::timestamp, 'EUR', 200.00),
      (4, '2022-01-03 10:00:00'::timestamp, 'EUR', 100.00)").await?;
    let sql = "SELECT (ARRAY_AGG(s.amount ORDER BY s.sn DESC)) AS amounts
        FROM sales_global AS s
        GROUP BY s.sn";

    let msg = format!("Creating logical plan for '{sql}'");
    let dataframe: DataFrame = ctx.sql(sql).await.expect(&msg);
    let physical_plan = dataframe.create_physical_plan().await?;
    let batches = collect(physical_plan, ctx.task_ctx()).await?;
    print_batches(&batches)?;
    Ok(())
}