diff --git a/hydroflow/tests/surface_multiset_delta.rs b/hydroflow/tests/surface_multiset_delta.rs index 8049d1a914d0..12813a5017d4 100644 --- a/hydroflow/tests/surface_multiset_delta.rs +++ b/hydroflow/tests/surface_multiset_delta.rs @@ -4,8 +4,8 @@ use multiplatform_test::multiplatform_test; #[multiplatform_test] pub fn test_multiset_delta() { - let (input_send, input_recv) = hydroflow::util::unbounded_channel::(); - let (result_send, mut result_recv) = hydroflow::util::unbounded_channel::(); + let (input_send, input_recv) = hydroflow::util::unbounded_channel::(); + let (result_send, mut result_recv) = hydroflow::util::unbounded_channel::(); let mut flow = hydroflow::hydroflow_syntax! { source_stream(input_recv) @@ -14,19 +14,31 @@ pub fn test_multiset_delta() { }; assert_graphvis_snapshots!(flow); - input_send.send(3).unwrap(); - input_send.send(4).unwrap(); - input_send.send(3).unwrap(); + input_send.send('a').unwrap(); + input_send.send('b').unwrap(); + input_send.send('a').unwrap(); flow.run_tick(); - assert_eq!(&[3, 4, 3], &*collect_ready::, _>(&mut result_recv)); + // 'a', 'b', 'a' + assert_eq!(&['a', 'b', 'a'], &*collect_ready::, _>(&mut result_recv)); - input_send.send(3).unwrap(); - input_send.send(5).unwrap(); - input_send.send(3).unwrap(); - input_send.send(3).unwrap(); + input_send.send('a').unwrap(); + input_send.send('c').unwrap(); + input_send.send('a').unwrap(); + input_send.send('a').unwrap(); flow.run_tick(); - // First two "3"s are removed due to previous tick. - assert_eq!(&[5, 3], &*collect_ready::, _>(&mut result_recv)); + // 'c', 'a' + // First two 'a's are removed due to previous tick. + assert_eq!(&['c', 'a'], &*collect_ready::, _>(&mut result_recv)); + + input_send.send('b').unwrap(); + input_send.send('c').unwrap(); + input_send.send('a').unwrap(); + input_send.send('a').unwrap(); + input_send.send('a').unwrap(); + input_send.send('a').unwrap(); + flow.run_tick(); + // 3 'a's and the 'c' are removed due to previous tick. + assert_eq!(&['b', 'a'], &*collect_ready::, _>(&mut result_recv)); } #[multiplatform_test] diff --git a/hydroflow_lang/src/graph/ops/multiset_delta.rs b/hydroflow_lang/src/graph/ops/multiset_delta.rs index 2f38926c5b06..4b076ef15791 100644 --- a/hydroflow_lang/src/graph/ops/multiset_delta.rs +++ b/hydroflow_lang/src/graph/ops/multiset_delta.rs @@ -15,7 +15,8 @@ use super::{ /// /// This operator does a similar inversion but for multiset semantics, with some caveats. When it /// receives duplicate items, instead of ignoring them, it "subtracts" them from the items received -/// in the previous tick, and only releases new items if there are more than in the previous tick. +/// in the previous tick: i.e. if we received `k` copies of an item in the previous tick, and we +/// receive `l > k` copies in the current tick, we output `l - k` copies of the item. /// However unlike `unique`, this count is only maintained for the previous tick, not over all time. /// /// In the example below, in the second tick two 'a's are removed because two 'a's were received in @@ -42,6 +43,16 @@ use super::{ /// flow.run_tick(); /// // 'c', 'a' /// // First two 'a's are removed due to previous tick. +/// +/// input_send.send('b').unwrap(); +/// input_send.send('c').unwrap(); +/// input_send.send('a').unwrap(); +/// input_send.send('a').unwrap(); +/// input_send.send('a').unwrap(); +/// input_send.send('a').unwrap(); +/// flow.run_tick(); +/// // 'b', 'a' +/// // 3 'a's and the 'c' are removed due to previous tick. /// ``` pub const MULTISET_DELTA: OperatorConstraints = OperatorConstraints { name: "multiset_delta",