Window aggregates output order broken due to hash repartitioning #16888

@crepererum

Describe the bug

It seems that under certain circumstances, window aggregation output is not sorted. In my test, triggering this requires running set datafusion.execution.batch_size = 1; first, but there may be other triggers.
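One way to picture how hash repartitioning could lead to this kind of output is the toy model below. It is only a sketch, not DataFusion's implementation: the helper names `partition_by_key` and `interleave` are hypothetical, and the batch-by-batch interleaving is an assumption about how single-row batches from several partitions might arrive downstream. Each partition is sorted on its own, yet the combined stream is not globally sorted.

```python
from itertools import zip_longest

# (k, time) pairs taken from the INSERT statement in this issue.
input_rows = [
    ("a", "1970-01-01T00:01:00Z"),
    ("a", "1970-01-01T00:02:00Z"),
    ("a", "1970-01-01T00:03:00Z"),
    ("a", "1970-01-01T00:03:00Z"),
    ("a", "1970-01-01T00:04:00Z"),
    ("b", "1970-01-01T00:01:00Z"),
    ("b", "1970-01-01T00:02:00Z"),
    ("b", "1970-01-01T00:03:00Z"),
    ("b", "1970-01-01T00:03:00Z"),
]


def partition_by_key(rows):
    """Group rows by k (standing in for a hash on k) and sort each group."""
    parts = {}
    for k, t in rows:
        parts.setdefault(k, []).append((k, t))
    return [sorted(p) for p in parts.values()]


def interleave(partitions):
    """Emit one single-row batch per partition in turn (assumed batch size 1)."""
    out = []
    for batch in zip_longest(*partitions):
        out.extend(row for row in batch if row is not None)
    return out


partitions = partition_by_key(input_rows)
merged = interleave(partitions)

for row in merged:
    print(*row)
```

In this model, a globally ordered result needs an order-preserving merge or a final sort over the combined stream. Sorting inside each partition is not enough.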

To Reproduce

Use this sqllogictest:

The output is NOT deterministic!

statement ok
CREATE TABLE t (
    k VARCHAR,
    v Int,
    time TIMESTAMP WITH TIME ZONE
);

statement ok
INSERT INTO t (k, v, time) VALUES
    ('a', 1, '1970-01-01T00:01:00.00Z'),
    ('a', 1, '1970-01-01T00:02:00.00Z'),
    ('a', 1, '1970-01-01T00:03:00.00Z'),
    ('a', 2, '1970-01-01T00:03:00.00Z'),
    ('a', 1, '1970-01-01T00:04:00.00Z'),
    ('b', 3, '1970-01-01T00:01:00.00Z'),
    ('b', 3, '1970-01-01T00:02:00.00Z'),
    ('b', 4, '1970-01-01T00:03:00.00Z'),
    ('b', 4, '1970-01-01T00:03:00.00Z');

query TPI
SELECT
    k,
    time,
    COUNT(v) OVER (
        PARTITION BY k
        ORDER BY time
        RANGE BETWEEN INTERVAL '2 minutes' PRECEDING AND CURRENT ROW
    ) AS normal_count
FROM t
ODER BY k, time;
----
a 1970-01-01T00:01:00Z 1
a 1970-01-01T00:02:00Z 2
a 1970-01-01T00:03:00Z 4
a 1970-01-01T00:03:00Z 4
a 1970-01-01T00:04:00Z 4
b 1970-01-01T00:01:00Z 1
b 1970-01-01T00:02:00Z 2
b 1970-01-01T00:03:00Z 4
b 1970-01-01T00:03:00Z 4

query TT
EXPLAIN SELECT
    k,
    time,
    COUNT(v) OVER (
        PARTITION BY k
        ORDER BY time
        RANGE BETWEEN INTERVAL '2 minutes' PRECEDING AND CURRENT ROW
    ) AS normal_count
FROM t
ODER BY k, time;
----
logical_plan
01)Projection: oder.k, oder.time, count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW AS normal_count
02)--WindowAggr: windowExpr=[[count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW AS count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW]]
03)----SubqueryAlias: oder
04)------TableScan: t projection=[k, v, time]
physical_plan
01)ProjectionExec: expr=[k@0 as k, time@2 as time, count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW@3 as normal_count]
02)--BoundedWindowAggExec: wdw=[count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW: Field { name: "count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW], mode=[Sorted]
03)----SortExec: expr=[k@0 ASC NULLS LAST, time@2 ASC NULLS LAST], preserve_partitioning=[false]
04)------DataSourceExec: partitions=1, partition_sizes=[1]


statement ok
set datafusion.execution.batch_size = 1;

query TPI
SELECT
    k,
    time,
    COUNT(v) OVER (
        PARTITION BY k
        ORDER BY time
        RANGE BETWEEN INTERVAL '2 minutes' PRECEDING AND CURRENT ROW
    ) AS normal_count
FROM t
ODER BY k, time;
----
b 1970-01-01T00:01:00Z 1
b 1970-01-01T00:02:00Z 2
a 1970-01-01T00:01:00Z 1
a 1970-01-01T00:02:00Z 2
a 1970-01-01T00:03:00Z 4
a 1970-01-01T00:03:00Z 4
b 1970-01-01T00:03:00Z 4
b 1970-01-01T00:03:00Z 4
a 1970-01-01T00:04:00Z 4

query TT
EXPLAIN SELECT
    k,
    time,
    COUNT(v) OVER (
        PARTITION BY k
        ORDER BY time
        RANGE BETWEEN INTERVAL '2 minutes' PRECEDING AND CURRENT ROW
    ) AS normal_count
FROM t
ODER BY k, time;
----
logical_plan
01)Projection: oder.k, oder.time, count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW AS normal_count
02)--WindowAggr: windowExpr=[[count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW AS count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW]]
03)----SubqueryAlias: oder
04)------TableScan: t projection=[k, v, time]
physical_plan
01)ProjectionExec: expr=[k@0 as k, time@2 as time, count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW@3 as normal_count]
02)--BoundedWindowAggExec: wdw=[count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW: Field { name: "count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW], mode=[Sorted]
03)----SortExec: expr=[k@0 ASC NULLS LAST, time@2 ASC NULLS LAST], preserve_partitioning=[true]
04)------CoalesceBatchesExec: target_batch_size=1
05)--------RepartitionExec: partitioning=Hash([k@0], 4), input_partitions=4
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
07)------------DataSourceExec: partitions=1, partition_sizes=[1]

Expected behavior

Both queries in the test file above should always result in an ordered output.
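The expectation can be stated as a small check over the result rows. The snippet below is a minimal sketch using the two result sets recorded in the test above. The helper name `is_sorted_by_k_time` is hypothetical, and comparing the timestamps as strings assumes they all share the same ISO-8601 format, as they do here.

```python
def is_sorted_by_k_time(rows):
    """Return True if rows are in non-decreasing (k, time) order."""
    keys = [(k, t) for k, t, _count in rows]
    return all(a <= b for a, b in zip(keys, keys[1:]))


# Result with the default batch size (first query in the test).
default_batch_rows = [
    ("a", "1970-01-01T00:01:00Z", 1),
    ("a", "1970-01-01T00:02:00Z", 2),
    ("a", "1970-01-01T00:03:00Z", 4),
    ("a", "1970-01-01T00:03:00Z", 4),
    ("a", "1970-01-01T00:04:00Z", 4),
    ("b", "1970-01-01T00:01:00Z", 1),
    ("b", "1970-01-01T00:02:00Z", 2),
    ("b", "1970-01-01T00:03:00Z", 4),
    ("b", "1970-01-01T00:03:00Z", 4),
]

# Result after `set datafusion.execution.batch_size = 1;` (second query).
batch_size_one_rows = [
    ("b", "1970-01-01T00:01:00Z", 1),
    ("b", "1970-01-01T00:02:00Z", 2),
    ("a", "1970-01-01T00:01:00Z", 1),
    ("a", "1970-01-01T00:02:00Z", 2),
    ("a", "1970-01-01T00:03:00Z", 4),
    ("a", "1970-01-01T00:03:00Z", 4),
    ("b", "1970-01-01T00:03:00Z", 4),
    ("b", "1970-01-01T00:03:00Z", 4),
    ("a", "1970-01-01T00:04:00Z", 4),
]

print(is_sorted_by_k_time(default_batch_rows))
print(is_sorted_by_k_time(batch_size_one_rows))
```

Because the second result is not deterministic, a check like this is more robust than comparing against one fixed row order.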

Additional context

Reproduced with 3d4fdf2.

This could be related to #15833.

Labels: bug