diff --git a/arroyo-datastream/src/lib.rs b/arroyo-datastream/src/lib.rs index dc90b76c5..18b827b0a 100644 --- a/arroyo-datastream/src/lib.rs +++ b/arroyo-datastream/src/lib.rs @@ -1363,7 +1363,7 @@ impl Program { let gap = duration_to_syn_expr(*gap); quote! { Box::new(SessionWindowFunc::<#in_k, #in_t, #out_t>::new( - #gap, #agg + #agg, #gap )) } } diff --git a/arroyo-sql-testing/src/full_query_tests.rs b/arroyo-sql-testing/src/full_query_tests.rs index cad747101..85e1a4fa5 100644 --- a/arroyo-sql-testing/src/full_query_tests.rs +++ b/arroyo-sql-testing/src/full_query_tests.rs @@ -135,8 +135,7 @@ full_pipeline_codegen! {"cast_bug", "SELECT CAST(1 as FLOAT) from nexmark; "} -/* full_pipeline_codegen! {"session_window", -"SELECT count(*), session(INTERVAL '2' SECOND, INTERVAL '10' SECOND) AS window +full_pipeline_codegen! {"session_window", +"SELECT count(*), session(INTERVAL '10' SECOND) AS window from nexmark group by window, auction.id; "} - */ diff --git a/arroyo-sql/src/lib.rs b/arroyo-sql/src/lib.rs index 9b9c708df..fbcb4e97a 100644 --- a/arroyo-sql/src/lib.rs +++ b/arroyo-sql/src/lib.rs @@ -90,6 +90,16 @@ impl ArroyoSchemaProvider { Arc::new(create_udf( "tumble", vec![DataType::Interval(datatypes::IntervalUnit::MonthDayNano)], + window_return_type.clone(), + Volatility::Volatile, + make_scalar_function(fn_impl), + )), + ); + functions.insert( + "session".to_string(), + Arc::new(create_udf( + "session", + vec![DataType::Interval(datatypes::IntervalUnit::MonthDayNano)], window_return_type, Volatility::Volatile, make_scalar_function(fn_impl), diff --git a/arroyo-sql/src/pipeline.rs b/arroyo-sql/src/pipeline.rs index 666f8eae6..f88dccf44 100644 --- a/arroyo-sql/src/pipeline.rs +++ b/arroyo-sql/src/pipeline.rs @@ -670,7 +670,7 @@ impl<'a> SqlPipelineBuilder<'a> { fn is_window(expression: &Expr) -> bool { match expression { Expr::ScalarUDF(ScalarUDF { fun, args: _ }) => { - matches!(fun.name.as_str(), "hop" | "tumble") + matches!(fun.name.as_str(), "hop" | "tumble" | "session") } Expr::Alias(datafusion_expr::expr::Alias { expr, name: _ }) => Self::is_window(expr), _ => false, @@ -695,6 +695,13 @@ impl<'a> SqlPipelineBuilder<'a> { let width = Self::get_duration(&args[0])?; Ok(Some(WindowType::Tumbling { width })) } + "session" => { + if args.len() != 1 { + unreachable!("wrong number of arguments for session(), expected one"); + } + let gap = Self::get_duration(&args[0])?; + Ok(Some(WindowType::Session { gap })) + } _ => Ok(None), }, Expr::Alias(datafusion_expr::expr::Alias { expr, name: _ }) => Self::find_window(expr), diff --git a/arroyo-state/src/tables.rs b/arroyo-state/src/tables.rs index 0dd4bc77c..7ae62baa3 100644 --- a/arroyo-state/src/tables.rs +++ b/arroyo-state/src/tables.rs @@ -502,10 +502,10 @@ impl<'a, K: Key, V: Data, S: BackingStore> KeyedState<'a, K, V, S> { self.cache.insert(key, wrapped.unwrap()); } - pub async fn remove(&mut self, mut key: K) { + pub async fn remove(&mut self, key: &mut K) { self.cache.remove(&key); self.backing_state - .write_key_value::>(self.table, &mut key, &mut None) + .write_key_value::>(self.table, key, &mut None) .await; } diff --git a/arroyo-types/src/lib.rs b/arroyo-types/src/lib.rs index 5e34b0f72..2289244ef 100644 --- a/arroyo-types/src/lib.rs +++ b/arroyo-types/src/lib.rs @@ -32,7 +32,7 @@ impl Window { } pub fn contains(&self, t: SystemTime) -> bool { - self.start >= t && t < self.end + self.start <= t && t < self.end } } diff --git a/arroyo-worker/src/operators/mod.rs b/arroyo-worker/src/operators/mod.rs index cf71b6bc6..29a3e9fc7 100644 --- a/arroyo-worker/src/operators/mod.rs +++ b/arroyo-worker/src/operators/mod.rs @@ -221,7 +221,10 @@ impl PeriodicWatermarkGenerator { async fn handle_tick(&mut self, _: u64, ctx: &mut Context) { if let Some(idle_time) = self.idle_time { if self.last_event.elapsed().unwrap_or(Duration::ZERO) > idle_time && !self.idle { - info!("Setting partition {} to idle", ctx.task_info.task_index); + info!( + "Setting partition {} to idle after {:?}", + ctx.task_info.task_index, idle_time + ); ctx.broadcast(Message::Watermark(Watermark::Idle)).await; self.idle = true; } diff --git a/arroyo-worker/src/operators/updating_aggregate.rs b/arroyo-worker/src/operators/updating_aggregate.rs index 51fcf73dc..bff0d24f3 100644 --- a/arroyo-worker/src/operators/updating_aggregate.rs +++ b/arroyo-worker/src/operators/updating_aggregate.rs @@ -130,7 +130,7 @@ impl UpdatingAggregateOperator { - aggregating_map.remove(mut_key).await; + aggregating_map.remove(&mut mut_key).await; } StateOp::Update { new, old: _ } => { aggregating_map.insert(record.timestamp, mut_key, new).await; diff --git a/arroyo-worker/src/operators/windows.rs b/arroyo-worker/src/operators/windows.rs index 82a591744..46196eed8 100644 --- a/arroyo-worker/src/operators/windows.rs +++ b/arroyo-worker/src/operators/windows.rs @@ -205,7 +205,7 @@ impl SessionWindowFunc { "SessionWindow".to_string() } - pub async fn new(operation: WindowOperation, gap_size: Duration) -> Self { + pub fn new(operation: WindowOperation, gap_size: Duration) -> Self { Self { operation, gap_size, @@ -299,10 +299,10 @@ impl SessionWindowFunc { }; if let Some(remove) = remove { - let _: Option = ctx.cancel_timer(&mut key, remove).await; + let _: Option = ctx.cancel_timer(&mut key, remove).await; } if let Some(add) = add { - // we use UNIX_EPOCH as the start of our windows to aovid having to update them when we extend + // we use UNIX_EPOCH as the start of our windows to avoid having to update them when we extend // the beginning; this works because windows are always handled in order ctx.schedule_timer(&mut key, add, Window::new(SystemTime::UNIX_EPOCH, add)) .await; @@ -326,6 +326,33 @@ impl SessionWindowFunc { } async fn handle_timer(&mut self, mut key: K, window: Window, ctx: &mut Context) { + println!("Handling timer for {:?}, {:?}", key, window); + let window = { + // get the actual window (as the timer one doesn't have the actual start time) + let mut t: KeyedState<'_, K, Vec, _> = ctx.state.get_key_state('s').await; + let mut windows: Vec = t + .get(&key) + .map(|t| t.iter().map(|w| *w).collect()) + .expect("there must be a window for this key in state"); + + let window = *windows + .iter() + .find(|w| w.end == window.end) + .expect("this window must be in state"); + + windows.retain(|w| w.end != window.end); + + if windows.is_empty() { + t.remove(&mut key).await; + } else { + let key = key.clone(); + t.insert(windows.iter().map(|w| w.end).max().unwrap(), key, windows) + .await; + } + + window + }; + self.operation.operate(&mut key, window, 'w', ctx).await; // clear this window and everything before it -- we're guaranteed that windows are executed in order and are @@ -335,20 +362,5 @@ impl SessionWindowFunc { state .clear_time_range(&mut key, window.start, window.end) .await; - - let mut t: KeyedState<'_, K, Vec, _> = ctx.state.get_key_state('s').await; - let mut windows: Vec = t - .get(&key) - .map(|t| t.iter().map(|w| *w).collect()) - .expect("there must be a window for this key in state"); - - windows.retain(|w| w.end != window.end); - - if windows.is_empty() { - t.remove(key).await; - } else { - t.insert(windows.iter().map(|w| w.end).max().unwrap(), key, windows) - .await; - } } }