Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Windowless Joins #61

Merged
merged 2 commits into from
Apr 20, 2023
Merged

Windowless Joins #61

merged 2 commits into from
Apr 20, 2023

Conversation

jacksonrnewhouse
Copy link
Contributor

This changes how we handle joins whose inputs don't contain a preceding window operator. Now each record is joined against all of the existing records on the other side, emitting pairs. This only supports inner joins. I've also made several changes/improvements, including

  • Standardize how Durations are quoted into a single function duration_to_syn_expr().
  • Fix support for impulses with a specified intervalMicros.
  • Add some out-of-orderness to Nexmark.
  • Set the default allowed-lateness for SQL pipelines to 1 second, rather than 0.

@jacksonrnewhouse jacksonrnewhouse requested a review from mwylde April 19, 2023 16:20
@akennedy4155
Copy link
Contributor

Looks like this addresses #65 as well.

let mut key = record.key.clone().unwrap();
let value = record.value.clone();
let records = {
let right_rows = right_state.get_all_values_with_timestamps(&mut key).await;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get_all_values_with_timestamps materializes the data into a vec, which is somewhat wasteful since all we're going to do is iterate over it. Instead it could return an iterator to avoid that extra allocation.

records.push(Record {
timestamp: record.timestamp.max(timestamp),
key: Some(key.clone()),
value: (value.clone(), right_value.clone()),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to clone right_value?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it's a pointer at this point.

}
};

let mut left_state: KeyTimeMultiMap<K, T1, _> = ctx.state.get_key_time_multi_map('l').await;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any way to share this logic between left/right?

sql: detect when a join without a preceding window is happening, plan to JoinWithExpiration
worker: introduce JoinWithExpiration, add some out of order to Nexmark.
control plane: Have the NodeScheduler tolerate a node restart.
frontend: Fix impulse source creation to properly create timed sources.
compiler: standardize Duration quoting, fix Impulse code generation, add JoinWithExpiration
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants