Skip to content

Commit

Permalink
Working session windows
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde committed Aug 11, 2023
1 parent d350b1c commit fe61f80
Show file tree
Hide file tree
Showing 9 changed files with 59 additions and 28 deletions.
2 changes: 1 addition & 1 deletion arroyo-datastream/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1363,7 +1363,7 @@ impl Program {
let gap = duration_to_syn_expr(*gap);
quote! {
Box::new(SessionWindowFunc::<#in_k, #in_t, #out_t>::new(
#gap, #agg
#agg, #gap
))
}
}
Expand Down
5 changes: 2 additions & 3 deletions arroyo-sql-testing/src/full_query_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,7 @@ full_pipeline_codegen! {"cast_bug",
"SELECT CAST(1 as FLOAT)
from nexmark; "}

/* full_pipeline_codegen! {"session_window",
"SELECT count(*), session(INTERVAL '2' SECOND, INTERVAL '10' SECOND) AS window
full_pipeline_codegen! {"session_window",
"SELECT count(*), session(INTERVAL '10' SECOND) AS window
from nexmark
group by window, auction.id; "}
*/
10 changes: 10 additions & 0 deletions arroyo-sql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,16 @@ impl ArroyoSchemaProvider {
Arc::new(create_udf(
"tumble",
vec![DataType::Interval(datatypes::IntervalUnit::MonthDayNano)],
window_return_type.clone(),
Volatility::Volatile,
make_scalar_function(fn_impl),
)),
);
functions.insert(
"session".to_string(),
Arc::new(create_udf(
"session",
vec![DataType::Interval(datatypes::IntervalUnit::MonthDayNano)],
window_return_type,
Volatility::Volatile,
make_scalar_function(fn_impl),
Expand Down
9 changes: 8 additions & 1 deletion arroyo-sql/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,7 @@ impl<'a> SqlPipelineBuilder<'a> {
fn is_window(expression: &Expr) -> bool {
match expression {
Expr::ScalarUDF(ScalarUDF { fun, args: _ }) => {
matches!(fun.name.as_str(), "hop" | "tumble")
matches!(fun.name.as_str(), "hop" | "tumble" | "session")
}
Expr::Alias(datafusion_expr::expr::Alias { expr, name: _ }) => Self::is_window(expr),
_ => false,
Expand All @@ -695,6 +695,13 @@ impl<'a> SqlPipelineBuilder<'a> {
let width = Self::get_duration(&args[0])?;
Ok(Some(WindowType::Tumbling { width }))
}
"session" => {
if args.len() != 1 {
unreachable!("wrong number of arguments for session(), expected one");
}
let gap = Self::get_duration(&args[0])?;
Ok(Some(WindowType::Session { gap }))
}
_ => Ok(None),
},
Expr::Alias(datafusion_expr::expr::Alias { expr, name: _ }) => Self::find_window(expr),
Expand Down
4 changes: 2 additions & 2 deletions arroyo-state/src/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,10 +502,10 @@ impl<'a, K: Key, V: Data, S: BackingStore> KeyedState<'a, K, V, S> {
self.cache.insert(key, wrapped.unwrap());
}

pub async fn remove(&mut self, mut key: K) {
pub async fn remove(&mut self, key: &mut K) {
self.cache.remove(&key);
self.backing_state
.write_key_value::<K, Option<V>>(self.table, &mut key, &mut None)
.write_key_value::<K, Option<V>>(self.table, key, &mut None)
.await;
}

Expand Down
2 changes: 1 addition & 1 deletion arroyo-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl Window {
}

pub fn contains(&self, t: SystemTime) -> bool {
self.start >= t && t < self.end
self.start <= t && t < self.end
}
}

Expand Down
5 changes: 4 additions & 1 deletion arroyo-worker/src/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,10 @@ impl<K: Key, D: Data> PeriodicWatermarkGenerator<K, D> {
async fn handle_tick(&mut self, _: u64, ctx: &mut Context<K, D>) {
if let Some(idle_time) = self.idle_time {
if self.last_event.elapsed().unwrap_or(Duration::ZERO) > idle_time && !self.idle {
info!("Setting partition {} to idle", ctx.task_info.task_index);
info!(
"Setting partition {} to idle after {:?}",
ctx.task_info.task_index, idle_time
);
ctx.broadcast(Message::Watermark(Watermark::Idle)).await;
self.idle = true;
}
Expand Down
2 changes: 1 addition & 1 deletion arroyo-worker/src/operators/updating_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl<K: Key, T: Data, BinA: Data, OutT: Data> UpdatingAggregateOperator<K, T, Bi
.await;
}
StateOp::Delete => {
aggregating_map.remove(mut_key).await;
aggregating_map.remove(&mut mut_key).await;
}
StateOp::Update { new, old: _ } => {
aggregating_map.insert(record.timestamp, mut_key, new).await;
Expand Down
48 changes: 30 additions & 18 deletions arroyo-worker/src/operators/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ impl<K: Key, T: Data, OutT: Data> SessionWindowFunc<K, T, OutT> {
"SessionWindow".to_string()
}

pub async fn new(operation: WindowOperation<K, T, OutT>, gap_size: Duration) -> Self {
pub fn new(operation: WindowOperation<K, T, OutT>, gap_size: Duration) -> Self {
Self {
operation,
gap_size,
Expand Down Expand Up @@ -299,10 +299,10 @@ impl<K: Key, T: Data, OutT: Data> SessionWindowFunc<K, T, OutT> {
};

if let Some(remove) = remove {
let _: Option<SystemTime> = ctx.cancel_timer(&mut key, remove).await;
let _: Option<Window> = ctx.cancel_timer(&mut key, remove).await;
}
if let Some(add) = add {
// we use UNIX_EPOCH as the start of our windows to aovid having to update them when we extend
// we use UNIX_EPOCH as the start of our windows to avoid having to update them when we extend
// the beginning; this works because windows are always handled in order
ctx.schedule_timer(&mut key, add, Window::new(SystemTime::UNIX_EPOCH, add))
.await;
Expand All @@ -326,6 +326,33 @@ impl<K: Key, T: Data, OutT: Data> SessionWindowFunc<K, T, OutT> {
}

async fn handle_timer(&mut self, mut key: K, window: Window, ctx: &mut Context<K, OutT>) {
println!("Handling timer for {:?}, {:?}", key, window);
let window = {
// get the actual window (as the timer one doesn't have the actual start time)
let mut t: KeyedState<'_, K, Vec<Window>, _> = ctx.state.get_key_state('s').await;
let mut windows: Vec<Window> = t
.get(&key)
.map(|t| t.iter().map(|w| *w).collect())
.expect("there must be a window for this key in state");

let window = *windows
.iter()
.find(|w| w.end == window.end)
.expect("this window must be in state");

windows.retain(|w| w.end != window.end);

if windows.is_empty() {
t.remove(&mut key).await;
} else {
let key = key.clone();
t.insert(windows.iter().map(|w| w.end).max().unwrap(), key, windows)
.await;
}

window
};

self.operation.operate(&mut key, window, 'w', ctx).await;

// clear this window and everything before it -- we're guaranteed that windows are executed in order and are
Expand All @@ -335,20 +362,5 @@ impl<K: Key, T: Data, OutT: Data> SessionWindowFunc<K, T, OutT> {
state
.clear_time_range(&mut key, window.start, window.end)
.await;

let mut t: KeyedState<'_, K, Vec<Window>, _> = ctx.state.get_key_state('s').await;
let mut windows: Vec<Window> = t
.get(&key)
.map(|t| t.iter().map(|w| *w).collect())
.expect("there must be a window for this key in state");

windows.retain(|w| w.end != window.end);

if windows.is_empty() {
t.remove(key).await;
} else {
t.insert(windows.iter().map(|w| w.end).max().unwrap(), key, windows)
.await;
}
}
}

0 comments on commit fe61f80

Please sign in to comment.