diff --git a/src/untimed_monitoring_combinators.rs b/src/untimed_monitoring_combinators.rs index facf851..06ffc80 100644 --- a/src/untimed_monitoring_combinators.rs +++ b/src/untimed_monitoring_combinators.rs @@ -1,9 +1,10 @@ -use crate::core::Value; use crate::core::StreamData; +use crate::core::Value; use crate::{ lola_expression, MonitoringSemantics, OutputStream, StreamContext, UntimedLolaSemantics, VarName, }; +use async_stream::stream; use core::panic; use futures::{ stream::{self, BoxStream}, @@ -14,12 +15,13 @@ use tokio::join; use winnow::Parser; pub trait CloneFn1: -Fn(T) -> S + Clone + Sync + Send + 'static -{} -impl CloneFn1 for T -where - T: Fn(S) -> R + Sync + Send + Clone + 'static, -{} + Fn(T) -> S + Clone + Sync + Send + 'static +{ +} +impl CloneFn1 for T where + T: Fn(S) -> R + Sync + Send + Clone + 'static +{ +} pub fn lift1( f: impl CloneFn1, @@ -31,12 +33,13 @@ pub fn lift1( } pub trait CloneFn2: -Fn(S, R) -> U + Clone + Sync + Send + 'static -{} -impl CloneFn2 for T -where - T: Fn(S, R) -> U + Clone + Sync + Send + 'static, -{} + Fn(S, R) -> U + Clone + Sync + Send + 'static +{ +} +impl CloneFn2 for T where + T: Fn(S, R) -> U + Clone + Sync + Send + 'static +{ +} pub fn lift2( f: impl CloneFn2, @@ -48,12 +51,13 @@ pub fn lift2( } pub trait CloneFn3: -Fn(S, R, U) -> V + Clone + Sync + Send + 'static -{} -impl CloneFn3 for T -where - T: Fn(S, R, U) -> V + Clone + Sync + Send + 'static, -{} + Fn(S, R, U) -> V + Clone + Sync + Send + 'static +{ +} +impl CloneFn3 for T where + T: Fn(S, R, U) -> V + Clone + Sync + Send + 'static +{ +} pub fn lift3( f: impl CloneFn3, @@ -71,62 +75,35 @@ pub fn lift3( ) as BoxStream<'static, U> } -pub fn and( - x: OutputStream, - y: OutputStream, -) -> OutputStream { +pub fn and(x: OutputStream, y: OutputStream) -> OutputStream { lift2( - |x, y| { - Value::Bool( - x == Value::Bool(true) && y == Value::Bool(true), - ) - }, + |x, y| Value::Bool(x == Value::Bool(true) && y == Value::Bool(true)), x, y, ) } -pub fn or( - x: OutputStream, - y: OutputStream, -) -> OutputStream { +pub fn or(x: OutputStream, y: OutputStream) -> OutputStream { lift2( - |x, y| { - Value::Bool( - x == Value::Bool(true) || y == Value::Bool(true), - ) - }, + |x, y| Value::Bool(x == Value::Bool(true) || y == Value::Bool(true)), x, y, ) } pub fn not(x: OutputStream) -> OutputStream { - lift1( - |x| Value::Bool(x == Value::Bool(true)), - x, - ) + lift1(|x| Value::Bool(x == Value::Bool(true)), x) } -pub fn eq( - x: OutputStream, - y: OutputStream, -) -> OutputStream { +pub fn eq(x: OutputStream, y: OutputStream) -> OutputStream { lift2(|x, y| Value::Bool(x == y), x, y) } -pub fn le( - x: OutputStream, - y: OutputStream, -) -> OutputStream { +pub fn le(x: OutputStream, y: OutputStream) -> OutputStream { lift2( |x, y| match (x, y) { - (Value::Int(x), Value::Int(y)) => { - Value::Bool(x <= y) - } - (Value::Bool(a), Value::Bool(b)) => { - Value::Bool(a <= b) - } + (Value::Int(x), Value::Int(y)) => Value::Bool(x <= y), + (Value::Bool(a), Value::Bool(b)) => Value::Bool(a <= b), _ => panic!("Invalid comparison"), }, x, @@ -156,11 +133,7 @@ pub fn if_stm( ) } -pub fn index( - x: OutputStream, - i: isize, - c: Value, -) -> OutputStream { +pub fn index(x: OutputStream, i: isize, c: Value) -> OutputStream { let c = c.clone(); if i < 0 { let n = i.abs() as usize; @@ -172,15 +145,10 @@ pub fn index( } } -pub fn plus( - x: OutputStream, - y: OutputStream, -) -> OutputStream { +pub fn plus(x: OutputStream, y: OutputStream) -> OutputStream { lift2( |x, y| match (x, y) { - (Value::Int(x), Value::Int(y)) => { - Value::Int(x + y) - } + (Value::Int(x), Value::Int(y)) => Value::Int(x + y), (x, y) => panic!("Invalid addition with types: {:?}, {:?}", x, y), }, x, @@ -188,15 +156,10 @@ pub fn plus( ) } -pub fn minus( - x: OutputStream, - y: OutputStream, -) -> OutputStream { +pub fn minus(x: OutputStream, y: OutputStream) -> OutputStream { lift2( |x, y| match (x, y) { - (Value::Int(x), Value::Int(y)) => { - Value::Int(x - y) - } + (Value::Int(x), Value::Int(y)) => Value::Int(x - y), _ => panic!("Invalid subtraction"), }, x, @@ -204,15 +167,10 @@ pub fn minus( ) } -pub fn mult( - x: OutputStream, - y: OutputStream, -) -> OutputStream { +pub fn mult(x: OutputStream, y: OutputStream) -> OutputStream { lift2( |x, y| match (x, y) { - (Value::Int(x), Value::Int(y)) => { - Value::Int(x * y) - } + (Value::Int(x), Value::Int(y)) => Value::Int(x * y), _ => panic!("Invalid multiplication"), }, x, @@ -220,15 +178,10 @@ pub fn mult( ) } -pub fn div( - x: OutputStream, - y: OutputStream, -) -> OutputStream { +pub fn div(x: OutputStream, y: OutputStream) -> OutputStream { lift2( |x, y| match (x, y) { - (Value::Int(x), Value::Int(y)) => { - Value::Int(x / y) - } + (Value::Int(x), Value::Int(y)) => Value::Int(x / y), _ => panic!("Invalid multiplication"), }, x, @@ -236,10 +189,7 @@ pub fn div( ) } -pub fn concat( - x: OutputStream, - y: OutputStream, -) -> OutputStream { +pub fn concat(x: OutputStream, y: OutputStream) -> OutputStream { lift2( |x, y| match (x, y) { (Value::Str(x), Value::Str(y)) => { @@ -262,11 +212,7 @@ pub fn eval( let subcontext = ctx.subcontext(history_length); /*unfold() creates a Stream from a seed value.*/ Box::pin(stream::unfold( - ( - subcontext, - x, - None::<(Value, OutputStream)>, - ), + (subcontext, x, None::<(Value, OutputStream)>), |(subcontext, mut x, last)| async move { /* x.next() returns None if we are done unfolding. Return in that case.*/ let current = x.next().await?; @@ -304,10 +250,7 @@ pub fn eval( subcontext.advance(); let eval_res = es.next().await?; // println!("eval producing {:?}", eval_res); - return Some(( - eval_res, - (subcontext, x, Some((Value::Str(s), es))), - )); + return Some((eval_res, (subcontext, x, Some((Value::Str(s), es))))); } x => { unimplemented!("Invalid eval type {:?}", x) @@ -317,10 +260,7 @@ pub fn eval( )) as OutputStream } -pub fn var( - ctx: &dyn StreamContext, - x: VarName, -) -> OutputStream { +pub fn var(ctx: &dyn StreamContext, x: VarName) -> OutputStream { match ctx.var(&x) { Some(x) => x, None => { @@ -357,10 +297,7 @@ pub fn defer( let mut es = UntimedLolaSemantics::to_async_stream(expr, subcontext.deref()); let eval_res = es.next().await?; subcontext.advance(); - return Some(( - eval_res, - (subcontext, x, Some(Value::Str(defer_s))), - )); + return Some((eval_res, (subcontext, x, Some(Value::Str(defer_s))))); } Value::Unknown => { // Consume a sample from the subcontext but return Unknown (aka. Waiting) @@ -375,92 +312,68 @@ pub fn defer( // Update for a synchronized language - in this case UntimedLolaSemantics. // We use Unknown for simulating no data on the stream -pub fn update( - x: OutputStream, - y: OutputStream, -) -> OutputStream { - // Pre is x isn't ready yet - enum Phase { - Pre, - Sync, - Post, - } - use Phase::*; - // Pre phase means that x is not ready yet - // Note: Returns the three values uses by unfold below - async fn handle_pre_phase( - mut x: OutputStream, - mut y: OutputStream, - ) -> Option<( - Value, - ( - OutputStream, - OutputStream, - Phase, - ), - )> { - match x.next().await { - Some(x_val) if x_val != Value::Unknown => { - let y_val = y.next().await?; +pub fn update(mut x: OutputStream, mut y: OutputStream) -> OutputStream { + return Box::pin(stream! { + let mut skip_sync = false; + + // Pre phase + println!("Pre phase"); + loop { + let x_val = x.next().await; + match x_val { + Some(x_val) => { + if x_val == Value::Unknown { + // Keep yielding from x until we get a value + yield x_val; + } else { + // We are done with the pre phase as x is ready + match y.next().await { + Some(Value::Unknown) => { + // Move to sync phase + yield x_val; + break; + } + Some(y_val) => { + // Move to post phase + yield y_val; + skip_sync = true; + break; + } + None => return + } + } + } + None => return + } + } + + // Sync phase + if !skip_sync { + // If y_val is unknown start the syncing phase where we return x + // until y is ready + while let (Some(x_val), Some(y_val)) = join!(x.next(), y.next()) { match y_val { // If y_val is unknown go into syncing phase - Value::Unknown => Some((x_val, (x, y, Sync))), + Value::Unknown => { + yield x_val; + } // Otherwise go directly to post - y_val => Some((y_val, (x, y, Post))), + y_val => { + yield y_val; + break; + } } } - Some(Value::Unknown) => Some((Value::Unknown, (x, y, Pre))), - _ => None, } - } - // Sync phase is x is ready but y isn't - async fn handle_sync_phase( - mut x: OutputStream, - mut y: OutputStream, - ) -> Option<( - Value, - ( - OutputStream, - OutputStream, - Phase, - ), - )> { - let (x_next, y_next) = join!(x.next(), y.next()); - match (x_next, y_next) { - // y is still unknown - yield x: - (Some(x_val), Some(Value::Unknown)) => Some((x_val, (x, y, Sync))), - // first time y is known - yield y: - (_, Some(y_val)) => Some((y_val, (x, y, Post))), - // End of stream - _ => None, + + // Post phase + println!("Post phase"); + while let Some(y_val) = y.next().await { + yield y_val; } - } - // Post phase is y was ready - so care about x - async fn handle_post_phase( - x: OutputStream, - mut y: OutputStream, - ) -> Option<( - Value, - ( - OutputStream, - OutputStream, - Phase, - ), - )> { - let y_val = y.next().await?; - Some((y_val, (x, y, Post))) - } - // Unfold while keeping track of phase - Box::pin(futures::stream::unfold( - (x, y, Pre), - move |(x, y, phase)| async move { - match phase { - Pre => handle_pre_phase(x, y).await, - Sync => handle_sync_phase(x, y).await, - Post => handle_post_phase(x, y).await, - } - }, - )) + + return; + }); } mod tests { @@ -493,7 +406,7 @@ mod tests { } impl FromIterator<(VarName, Vec)> for VarMap { - fn from_iter)>>(iter: I) -> Self { + fn from_iter)>>(iter: I) -> Self { let mut map = VarMap(BTreeMap::new()); for (key, vec) in iter { map.insert(key, Mutex::new(vec)); @@ -549,11 +462,9 @@ mod tests { #[tokio::test] async fn test_plus() { - let x: OutputStream = Box::pin(stream::iter( - vec![Value::Int(1), 3.into()].into_iter(), - )); - let y: OutputStream = - Box::pin(stream::iter(vec![2.into(), 4.into()].into_iter())); + let x: OutputStream = + Box::pin(stream::iter(vec![Value::Int(1), 3.into()].into_iter())); + let y: OutputStream = Box::pin(stream::iter(vec![2.into(), 4.into()].into_iter())); let z: Vec = vec![3.into(), 7.into()]; let res: Vec = plus(x, y).collect().await; assert_eq!(res, z); @@ -561,10 +472,8 @@ mod tests { #[tokio::test] async fn test_str_concat() { - let x: OutputStream = - Box::pin(stream::iter(vec!["hello ".into(), "olleh ".into()])); - let y: OutputStream = - Box::pin(stream::iter(vec!["world".into(), "dlrow".into()])); + let x: OutputStream = Box::pin(stream::iter(vec!["hello ".into(), "olleh ".into()])); + let y: OutputStream = Box::pin(stream::iter(vec!["world".into(), "dlrow".into()])); let exp = vec!["hello world".into(), "olleh dlrow".into()]; let res: Vec = concat(x, y).collect().await; assert_eq!(res, exp) @@ -572,8 +481,7 @@ mod tests { #[tokio::test] async fn test_eval() { - let e: OutputStream = - Box::pin(stream::iter(vec!["x + 1".into(), "x + 2".into()])); + let e: OutputStream = Box::pin(stream::iter(vec!["x + 1".into(), "x + 2".into()])); let map: VarMap = vec![(VarName("x".into()), vec![1.into(), 2.into()]).into()] .into_iter() .collect(); @@ -586,8 +494,7 @@ mod tests { #[tokio::test] async fn test_eval_x_squared() { // This test is interesting since we use x twice in the eval strings - let e: OutputStream = - Box::pin(stream::iter(vec!["x * x".into(), "x * x".into()])); + let e: OutputStream = Box::pin(stream::iter(vec!["x * x".into(), "x * x".into()])); let map: VarMap = vec![(VarName("x".into()), vec![2.into(), 3.into()]).into()] .into_iter() .collect(); @@ -600,8 +507,7 @@ mod tests { #[tokio::test] async fn test_defer() { // Notice that even though we first say "x + 1", "x + 2", it continues evaluating "x + 1" - let e: OutputStream = - Box::pin(stream::iter(vec!["x + 1".into(), "x + 2".into()])); + let e: OutputStream = Box::pin(stream::iter(vec!["x + 1".into(), "x + 2".into()])); let map: VarMap = vec![(VarName("x".into()), vec![1.into(), 2.into()]).into()] .into_iter() .collect(); @@ -627,17 +533,12 @@ mod tests { #[tokio::test] async fn test_defer_unknown() { // Using unknown to represent no data on the stream - let e: OutputStream = Box::pin(stream::iter(vec![ - Value::Unknown, - "x + 1".into(), - ])); + let e: OutputStream = Box::pin(stream::iter(vec![Value::Unknown, "x + 1".into()])); let map: VarMap = vec![(VarName("x".into()), vec![2.into(), 3.into()]).into()] .into_iter() .collect(); let ctx = MockContext { xs: map }; - let res = defer(&ctx, e, 10) - .collect::>() - .await; + let res = defer(&ctx, e, 10).collect::>().await; let exp: Vec = vec![Value::Unknown, 4.into()]; assert_eq!(res, exp) } @@ -654,19 +555,15 @@ mod tests { .into_iter() .collect(); let ctx = MockContext { xs: map }; - let res = defer(&ctx, e, 10) - .collect::>() - .await; + let res = defer(&ctx, e, 10).collect::>().await; let exp: Vec = vec![Value::Unknown, 4.into(), 5.into()]; assert_eq!(res, exp) } #[tokio::test] async fn test_update_both_init() { - let x: OutputStream = - Box::pin(stream::iter(vec!["x0".into(), "x1".into()])); - let y: OutputStream = - Box::pin(stream::iter(vec!["y0".into(), "y1".into()])); + let x: OutputStream = Box::pin(stream::iter(vec!["x0".into(), "x1".into()])); + let y: OutputStream = Box::pin(stream::iter(vec!["y0".into(), "y1".into()])); let res: Vec = update(x, y).collect().await; let exp: Vec = vec!["y0".into(), "y1".into()]; assert_eq!(res, exp) @@ -687,12 +584,7 @@ mod tests { "y3".into(), ])); let res: Vec = update(x, y).collect().await; - let exp: Vec = vec![ - "x0".into(), - "y1".into(), - Value::Unknown, - "y3".into(), - ]; + let exp: Vec = vec!["x0".into(), "y1".into(), Value::Unknown, "y3".into()]; assert_eq!(res, exp) }