Add StreamingWindowExec to DataFusion physical plan to support aggregations over unbounded data #11366

Open
ameyc opened this issue Jul 9, 2024 · 4 comments
Labels
enhancement New feature or request

Comments

@ameyc
Contributor

ameyc commented Jul 9, 2024

Is your feature request related to a problem or challenge?

Currently, DataFusion partially supports computations over unbounded data: SymmetricHashJoinExec can join unbounded streams of data. However, the ability to aggregate over unbounded data seems to be missing from the project so far. In stream processing, aggregating over windows of streaming data is a common concept. Note that a streaming window behaves more like an AggregateExec than a WindowExec. We have a proof-of-concept StreamingWindowExec built on a fork of DataFusion 39.0.

We would like to collaborate with the community to upstream this operator as well as improve its design.

Describe the solution you'd like

No response

Describe alternatives you've considered

No response

Additional context

No response

ameyc added the enhancement (New feature or request) label Jul 9, 2024
@mustafasrepo
Contributor

Actually, the existing AggregateExec already supports streaming. It has a member input_order_mode, whose type is the following enum:

pub enum InputOrderMode {
    /// There is no partial permutation of the expressions satisfying the
    /// existing ordering.
    Linear,
    /// There is a partial permutation of the expressions satisfying the
    /// existing ordering. Indices describing the longest partial permutation
    /// are stored in the vector.
    PartiallySorted(Vec<usize>),
    /// There is a (full) permutation of the expressions satisfying the
    /// existing ordering.
    Sorted,
}

When the mode is either Sorted or PartiallySorted (decided by the planner according to the ordering of the input), the operation is streamable. However, to trigger these modes, at least one of the GROUP BY expressions must be ordered.
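A simplified model of the two ideas in this comment may help: how the input ordering could map onto InputOrderMode, and why sorted group keys make the aggregation streamable. This is a sketch under stated assumptions, not DataFusion's planner or AggregateExec code: `classify` and `sorted_group_sum` are hypothetical names, columns are plain strings, and rows are `(key, value)` pairs already sorted by `key`.

```rust
/// Simplified local mirror of the InputOrderMode enum quoted above.
#[derive(Debug, PartialEq)]
enum InputOrderMode {
    Linear,
    PartiallySorted(Vec<usize>),
    Sorted,
}

/// Hypothetical helper: walks the input's sort keys (most significant first)
/// and records the GROUP BY index of each leading key that is also grouped on,
/// stopping at the first sort key that is not a GROUP BY column.
fn classify(group_by: &[&str], input_ordering: &[&str]) -> InputOrderMode {
    let mut indices: Vec<usize> = Vec::new();
    for key in input_ordering {
        match group_by.iter().position(|g| g == key) {
            Some(i) if indices.contains(&i) => continue, // repeated sort key
            Some(i) => indices.push(i),
            None => break,
        }
    }
    if indices.is_empty() {
        InputOrderMode::Linear
    } else if indices.len() == group_by.len() {
        InputOrderMode::Sorted
    } else {
        InputOrderMode::PartiallySorted(indices)
    }
}

/// Hypothetical sketch: SUM(value) GROUP BY key over input sorted by key.
/// Equal keys are contiguous, so a group is final as soon as a different key
/// arrives and can be emitted immediately; only one group is held in memory.
fn sorted_group_sum(rows: &[(i64, i64)]) -> Vec<(i64, i64)> {
    let mut out = Vec::new();
    let mut current: Option<(i64, i64)> = None; // (key, running sum)
    for &(key, value) in rows {
        current = match current {
            Some((k, sum)) if k == key => Some((k, sum + value)),
            Some(done) => {
                out.push(done); // key changed: the previous group is complete
                Some((key, value))
            }
            None => Some((key, value)),
        };
    }
    if let Some(done) = current {
        out.push(done); // flush the last group at end of input
    }
    out
}
```

For example, grouping by `[a, b, c]` over input sorted on `[b, a]` yields `PartiallySorted(vec![1, 0])`, while input sorted on a column outside the GROUP BY list yields `Linear`, which is the case that has to buffer every group until the input ends.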

@ozankabak
Contributor

To add to the comment above -- BoundedWindowAggExec also supports streaming (when possible).

@ameyc
Contributor Author

ameyc commented Jul 11, 2024

@mustafasrepo & @ozankabak, thanks for the feedback. The target use cases we have in mind are Flink-style workloads, with data read from Kafka that is generally not ordered and thus needs to be watermarked. We tried the vanilla aggregates and ran into PipelineBreaking panics.

Below is an example of the kind of workload we're trying to compute; let me know if this can already be expressed with the current operators as-is:

    let windowed_df = df
        .clone()
        .streaming_window(
            vec![],
            vec![
                max(col("imu_measurement").field("gps").field("speed")),
                min(col("imu_measurement").field("gps").field("altitude")),
                count(col("imu_measurement")).alias("count"),
            ],
            Duration::from_millis(5_000), // 5 second window
            Some(Duration::from_millis(1_000)), // 1 second slide
        )
        .unwrap();
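The requested workload (5 second windows sliding every 1 second over unordered Kafka data) comes down to two mechanics: assigning each event to every hopping window that covers its timestamp, and closing a window only once a watermark passes its end. The following is a hypothetical, self-contained illustration of those mechanics, not the fork's StreamingWindowExec and not a DataFusion API. It assumes millisecond event times, a watermark defined as the largest event time seen minus a fixed allowed lateness, and that late events are dropped for windows that have already been emitted.

```rust
use std::collections::BTreeMap;

/// Hypothetical sketch (not DataFusion code): hopping windows over
/// out-of-order events, closed by watermark = max event time - lateness.
struct HoppingWindows {
    size: i64,     // window length in ms, e.g. 5_000
    slide: i64,    // hop between window starts in ms, e.g. 1_000
    lateness: i64, // assumed bound on how late an event may arrive, in ms
    max_ts: i64,   // largest event time observed so far
    open: BTreeMap<i64, (u64, f64)>, // window start -> (count, max speed)
}

impl HoppingWindows {
    fn new(size: i64, slide: i64, lateness: i64) -> Self {
        Self { size, slide, lateness, max_ts: i64::MIN, open: BTreeMap::new() }
    }

    fn watermark(&self) -> i64 {
        self.max_ts.saturating_sub(self.lateness)
    }

    /// Adds one event and returns the windows closed by the (possibly
    /// advanced) watermark as (start, count, max speed), ordered by start.
    fn push(&mut self, ts: i64, speed: f64) -> Vec<(i64, u64, f64)> {
        let wm = self.watermark();
        // Latest window start at or before `ts`; walk back one slide at a
        // time while the window [s, s + size) still contains `ts`.
        let mut s = ts.div_euclid(self.slide) * self.slide;
        while s > ts - self.size {
            // Windows ending at or before the watermark were already emitted,
            // so a late event is dropped for them.
            if s + self.size > wm {
                let acc = self.open.entry(s).or_insert((0, f64::NEG_INFINITY));
                acc.0 += 1;
                acc.1 = acc.1.max(speed);
            }
            s -= self.slide;
        }
        self.max_ts = self.max_ts.max(ts);
        let wm = self.watermark();
        let mut closed = Vec::new();
        // Emit, in start order, every window whose end is at or before the watermark.
        while let Some(entry) = self.open.first_entry() {
            if *entry.key() + self.size > wm {
                break;
            }
            let (start, (count, max)) = entry.remove_entry();
            closed.push((start, count, max));
        }
        closed
    }
}
```

With `HoppingWindows::new(5_000, 1_000, 1_000)`, each event lands in five overlapping windows, and a window such as `[10_000, 15_000)` is emitted only after some event with time at least 16_000 advances the watermark to 15_000. The allowed lateness trades result latency for tolerance of out-of-order arrivals.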

@ozankabak
Contributor

To help as best I can, let me first restate my understanding of your use case: you have a streaming source with columns like speed and altitude, but your ts column may be out of order (i.e., not monotonic), or maybe you don't have such a column at all. Still, you want to do streaming computations based on processing time on such data.

In such a case, you can use a projection to add an order-defining column based on processing time, and then use BoundedWindowAggExec to do streaming/incremental windowing over that column. So basically, two built-in operators compose to give you what you want (or, I should say, what I think you want 🙂). Let me know if this helps.
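To illustrate why an arrival-ordered column is enough for bounded, incremental windowing, here is a minimal sketch of a per-row MAX(speed) over a frame covering the preceding 5 seconds of processing time (RANGE BETWEEN 5000 PRECEDING AND CURRENT ROW semantics). It is not how BoundedWindowAggExec is implemented; it uses a monotonic deque, `RunningMax` is a hypothetical name, and `proc_time` stands in for the column the projection would add, assumed non-decreasing and in milliseconds. Because the order column never decreases, rows that leave the frame can be evicted for good, so state is bounded by the frame rather than growing with the stream.

```rust
use std::collections::VecDeque;

/// Hypothetical sketch: per-row MAX(speed) over the rows whose proc_time lies
/// in [current proc_time - range, current proc_time].
struct RunningMax {
    range: u64,
    // (proc_time, speed) candidates; speeds strictly decrease front to back.
    deque: VecDeque<(u64, f64)>,
}

impl RunningMax {
    fn new(range: u64) -> Self {
        Self { range, deque: VecDeque::new() }
    }

    /// Feeds one row (proc_time must not decrease) and returns the frame max.
    fn push(&mut self, proc_time: u64, speed: f64) -> f64 {
        // Older candidates not larger than `speed` can never be the max again.
        while matches!(self.deque.back(), Some(&(_, v)) if v <= speed) {
            self.deque.pop_back();
        }
        self.deque.push_back((proc_time, speed));
        // Evict rows that fell out of the frame; ordering makes this final.
        while matches!(self.deque.front(), Some(&(t, _)) if t + self.range < proc_time) {
            self.deque.pop_front();
        }
        // The row just pushed is always inside its own frame, so front() is Some.
        self.deque.front().unwrap().1
    }
}
```

If the stamped column could go backwards, eviction would no longer be safe, which is why the projection should assign processing time at arrival rather than reuse the unordered event timestamp.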
