Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Session windows #243

Merged
merged 11 commits into from
Aug 14, 2023
27 changes: 26 additions & 1 deletion arroyo-datastream/src/lib.rs
Original file line number Diff line number Diff line change
@@ -109,6 +109,7 @@ pub enum WindowType {
Tumbling { width: Duration },
Sliding { width: Duration, slide: Duration },
Instant,
Session { gap: Duration },
}

fn format_duration(duration: Duration) -> String {
@@ -146,6 +147,9 @@ impl Debug for WindowType {
Self::Instant => {
write!(f, "InstantWindow")
}
Self::Session { gap } => {
write!(f, "SessionWindow({})", format_duration(*gap))
}
}
}
}
@@ -1324,7 +1328,7 @@ impl Program {
};

quote! {
WindowOperation::#operation(|mut arg: Vec<_>| {
WindowOperation::#operation(|key: &#in_k, window: arroyo_types::Window, mut arg: Vec<_>| {
#expr
})
}
@@ -1355,6 +1359,14 @@ impl Program {
instant_window(#agg))
}
}
WindowType::Session { gap } => {
let gap = duration_to_syn_expr(*gap);
quote! {
Box::new(SessionWindowFunc::<#in_k, #in_t, #out_t>::new(
#agg, #gap
))
}
}
}
}
Operator::Watermark(watermark) => {
@@ -1432,6 +1444,9 @@ impl Program {
instant_window())
}
}
WindowType::Session { .. } => {
unimplemented!("Session windows are not supported in joins")
}
}
}
Operator::Count => {
@@ -2098,6 +2113,11 @@ impl From<WindowType> for GrpcApi::window::Window {
WindowType::Instant => {
GrpcApi::window::Window::InstantWindow(GrpcApi::InstantWindow {})
}
WindowType::Session { gap } => {
GrpcApi::window::Window::SessionWindow(GrpcApi::SessionWindow {
gap_micros: gap.as_micros() as u64,
})
}
}
}
}
@@ -2386,6 +2406,11 @@ impl From<arroyo_rpc::grpc::api::Window> for WindowType {
}
}
Some(arroyo_rpc::grpc::api::window::Window::InstantWindow(_)) => WindowType::Instant,
Some(arroyo_rpc::grpc::api::window::Window::SessionWindow(session)) => {
WindowType::Session {
gap: Duration::from_micros(session.gap_micros),
}
}
None => todo!(),
}
}
5 changes: 5 additions & 0 deletions arroyo-rpc/proto/api.proto
Original file line number Diff line number Diff line change
@@ -194,6 +194,7 @@ message Window {
SlidingWindow sliding_window = 2;
TumblingWindow tumbling_window = 3;
InstantWindow instant_window = 4;
SessionWindow session_window = 5;
}
}

@@ -207,6 +208,10 @@ message TumblingWindow {
}
message InstantWindow {}

message SessionWindow {
uint64 gap_micros = 1;
}

enum Aggregator {
NONE = 0;
COUNT_AGGREGATE = 1;
5 changes: 5 additions & 0 deletions arroyo-sql-testing/src/full_query_tests.rs
Original file line number Diff line number Diff line change
@@ -129,3 +129,8 @@ INSERT INTO Bids select bid.auction, bid.bidder, bid.price , bid.datetime FROM n
full_pipeline_codegen! {"cast_bug",
"SELECT CAST(1 as FLOAT)
from nexmark; "}

full_pipeline_codegen! {"session_window",
"SELECT count(*), session(INTERVAL '10' SECOND) AS window
from nexmark
group by window, auction.id; "}
10 changes: 10 additions & 0 deletions arroyo-sql/src/lib.rs
Original file line number Diff line number Diff line change
@@ -90,6 +90,16 @@ impl ArroyoSchemaProvider {
Arc::new(create_udf(
"tumble",
vec![DataType::Interval(datatypes::IntervalUnit::MonthDayNano)],
window_return_type.clone(),
Volatility::Volatile,
make_scalar_function(fn_impl),
)),
);
functions.insert(
"session".to_string(),
Arc::new(create_udf(
"session",
vec![DataType::Interval(datatypes::IntervalUnit::MonthDayNano)],
window_return_type,
Volatility::Volatile,
make_scalar_function(fn_impl),
Loading