Skip to content

Commit

Permalink
fix!: fold takes initial value by closure rather than by value (#948)
Browse files Browse the repository at this point in the history
BREAKING CHANGE: first argument of fold() is now a closure instead of a value
  • Loading branch information
zzlk authored Oct 18, 2023
1 parent 35b1e9e commit 3136e0f
Show file tree
Hide file tree
Showing 30 changed files with 63 additions and 59 deletions.
2 changes: 1 addition & 1 deletion benches/benches/micro_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ fn ops(c: &mut Criterion) {
#[allow(clippy::unnecessary_fold)]
{
hydroflow_syntax! {
source_iter(black_box(input0)) -> fold::<'tick>(0, |accum: &mut _, elem| { *accum += elem }) -> for_each(|x| { black_box(x); });
source_iter(black_box(input0)) -> fold::<'tick>(|| 0, |accum: &mut _, elem| { *accum += elem }) -> for_each(|x| { black_box(x); });
}
}
},
Expand Down
2 changes: 1 addition & 1 deletion hydro_cli_examples/examples/pn_counter/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ async fn main() {

let df = hydroflow_syntax! {
next_state = union()
-> fold::<'static>((HashMap::<u64, Rc<RefCell<(Vec<u32>, Vec<u32>)>>>::new(), HashSet::new(), 0), |(cur_state, modified_tweets, last_tick): &mut (HashMap<_, _>, HashSet<_>, _), goi| {
-> fold::<'static>(|| (HashMap::<u64, Rc<RefCell<(Vec<u32>, Vec<u32>)>>>::new(), HashSet::new(), 0), |(cur_state, modified_tweets, last_tick): &mut (HashMap<_, _>, HashSet<_>, _), goi| {
if context.current_tick() != *last_tick {
modified_tweets.clear();
}
Expand Down
2 changes: 1 addition & 1 deletion hydro_cli_examples/examples/pn_counter_delta/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ async fn main() {

let df = hydroflow_syntax! {
next_state = union()
-> fold::<'static>((HashMap::<u64, Rc<RefCell<(Vec<u32>, Vec<u32>)>>>::new(), HashMap::new(), 0), |(cur_state, modified_tweets, last_tick): &mut (HashMap<_, _>, HashMap<_, _>, _), goi| {
-> fold::<'static>(|| (HashMap::<u64, Rc<RefCell<(Vec<u32>, Vec<u32>)>>>::new(), HashMap::new(), 0), |(cur_state, modified_tweets, last_tick): &mut (HashMap<_, _>, HashMap<_, _>, _), goi| {
if context.current_tick() != *last_tick {
modified_tweets.clear();
}
Expand Down
16 changes: 8 additions & 8 deletions hydro_cli_examples/examples/topolotree/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ async fn main() {
from_parent = source_stream(from_parent)
-> map(|x| deserialize_from_bytes::<Vec<(u64, TimestampedValue<i32>)>>(x.unwrap()).unwrap())
-> fold::<'static>(
(HashMap::<u64, TimestampedValue<i32>>::new(), HashSet::new(), 0),
|| (HashMap::<u64, TimestampedValue<i32>>::new(), HashSet::new(), 0),
|(prev, modified_tweets, prev_tick): &mut (HashMap<_, TimestampedValue<i32>>, HashSet<_>, _), req: Vec<(u64, TimestampedValue<i32>)>| {
if *prev_tick != context.current_tick() {
modified_tweets.clear();
Expand Down Expand Up @@ -146,7 +146,7 @@ async fn main() {
from_left = source_stream(from_left)
-> map(|x| deserialize_from_bytes::<Vec<(u64, TimestampedValue<i32>)>>(x.unwrap()).unwrap())
-> fold::<'static>(
(HashMap::<u64, TimestampedValue<i32>>::new(), HashSet::new(), 0),
|| (HashMap::<u64, TimestampedValue<i32>>::new(), HashSet::new(), 0),
|(prev, modified_tweets, prev_tick): &mut (HashMap<_, TimestampedValue<i32>>, HashSet<_>, _), req: Vec<(u64, TimestampedValue<i32>)>| {
if *prev_tick != context.current_tick() {
modified_tweets.clear();
Expand Down Expand Up @@ -175,7 +175,7 @@ async fn main() {
from_right = source_stream(from_right)
-> map(|x| deserialize_from_bytes::<Vec<(u64, TimestampedValue<i32>)>>(x.unwrap()).unwrap())
-> fold::<'static>(
(HashMap::<u64, TimestampedValue<i32>>::new(), HashSet::new(), 0),
|| (HashMap::<u64, TimestampedValue<i32>>::new(), HashSet::new(), 0),
|(prev, modified_tweets, prev_tick): &mut (HashMap<_, TimestampedValue<i32>>, HashSet<_>, _), req: Vec<(u64, TimestampedValue<i32>)>| {
if *prev_tick != context.current_tick() {
modified_tweets.clear();
Expand Down Expand Up @@ -205,7 +205,7 @@ async fn main() {
-> map(|x| deserialize_from_bytes::<IncrementRequest>(&x.unwrap()).unwrap())
-> map(|x| (x.tweet_id, x.likes))
-> fold::<'static>(
(HashMap::<u64, TimestampedValue<i32>>::new(), HashSet::new(), 0),
|| (HashMap::<u64, TimestampedValue<i32>>::new(), HashSet::new(), 0),
|(prev, modified_tweets, prev_tick): &mut (HashMap<_, TimestampedValue<i32>>, HashSet<_>, usize), req: UpdateType| {
if *prev_tick != context.current_tick() {
modified_tweets.clear();
Expand All @@ -228,7 +228,7 @@ async fn main() {

to_right
-> fold::<'static>(
(vec![HashMap::<u64, TimestampedValue<i32>>::new(); 3], HashMap::<u64, TimestampedValue<i32>>::new(), HashSet::new(), 0),
|| (vec![HashMap::<u64, TimestampedValue<i32>>::new(); 3], HashMap::<u64, TimestampedValue<i32>>::new(), HashSet::new(), 0),
|(each_source, acc_source, modified_tweets, prev_tick): &mut (Vec<HashMap<u64, TimestampedValue<i32>>>, HashMap<_, TimestampedValue<i32>>, HashSet<_>, usize), (source_i, (key, v)): (usize, _)| {
if *prev_tick != context.current_tick() {
modified_tweets.clear();
Expand Down Expand Up @@ -257,7 +257,7 @@ async fn main() {

to_left
-> fold::<'static>(
(vec![HashMap::<u64, TimestampedValue<i32>>::new(); 3], HashMap::<u64, TimestampedValue<i32>>::new(), HashSet::new(), 0),
|| (vec![HashMap::<u64, TimestampedValue<i32>>::new(); 3], HashMap::<u64, TimestampedValue<i32>>::new(), HashSet::new(), 0),
|(each_source, acc_source, modified_tweets, prev_tick): &mut (Vec<HashMap<u64, TimestampedValue<i32>>>, HashMap<_, TimestampedValue<i32>>, HashSet<_>, usize), (source_i, (key, v)): (usize, _)| {
if *prev_tick != context.current_tick() {
modified_tweets.clear();
Expand Down Expand Up @@ -286,7 +286,7 @@ async fn main() {

to_parent
-> fold::<'static>(
(vec![HashMap::<u64, TimestampedValue<i32>>::new(); 3], HashMap::<u64, TimestampedValue<i32>>::new(), HashSet::new(), 0),
|| (vec![HashMap::<u64, TimestampedValue<i32>>::new(); 3], HashMap::<u64, TimestampedValue<i32>>::new(), HashSet::new(), 0),
|(each_source, acc_source, modified_tweets, prev_tick): &mut (Vec<HashMap<u64, TimestampedValue<i32>>>, HashMap<_, TimestampedValue<i32>>, HashSet<_>, usize), (source_i, (key, v)): (usize, _)| {
if *prev_tick != context.current_tick() {
modified_tweets.clear();
Expand Down Expand Up @@ -316,7 +316,7 @@ async fn main() {

to_query
-> fold::<'static>(
(vec![HashMap::<u64, TimestampedValue<i32>>::new(); 4], HashMap::<u64, TimestampedValue<i32>>::new(), HashSet::new(), 0),
|| (vec![HashMap::<u64, TimestampedValue<i32>>::new(); 4], HashMap::<u64, TimestampedValue<i32>>::new(), HashSet::new(), 0),
|(each_source, acc_source, modified_tweets, prev_tick): &mut (Vec<HashMap<u64, TimestampedValue<i32>>>, HashMap<_, TimestampedValue<i32>>, HashSet<_>, usize), (source_i, (key, v)): (usize, _)| {
if *prev_tick != context.current_tick() {
modified_tweets.clear();
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/examples/deadlock_detector/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ pub(crate) async fn run_detector(opts: Opts, peer_list: Vec<String>) {
new_edges -> [0]edges;

// gossip all edges
edges[1] -> fold::<'static>(Message::new(), |m: &mut Message, edge| {
edges[1] -> fold::<'static>(Message::new, |m: &mut Message, edge| {
m.edges.insert(edge);
}) -> gossip;

Expand Down
2 changes: 1 addition & 1 deletion hydroflow/examples/lamport_clock/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub(crate) async fn run_client(outbound: UdpSink, inbound: UdpStream, opts: Opts
// given the inbound packet, bump the Lamport clock and merge this in
inbound_chan[merge] -> map(|(msg, _sender): (EchoMsg, SocketAddr)| msg.lamport_clock) -> [net]mergevc;
mergevc = union() -> fold::<'static>(
bot,
|| bot,
|old: &mut Max<usize>, lamport_clock: Max<usize>| {
let bump = Max::new(old.into_reveal() + 1);
old.merge(bump);
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/examples/lamport_clock/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub(crate) async fn run_server(outbound: UdpSink, inbound: UdpStream, opts: Opts
// merge in the msg vc to the local vc
inbound_chan[merge] -> map(|(msg, _addr): (EchoMsg, SocketAddr)| msg.lamport_clock) -> mergevc;
mergevc = fold::<'static>(
bot,
|| bot,
|old: &mut Max<usize>, lamport_clock: Max<usize>| {
let bump = Max::new(old.into_reveal() + 1);
old.merge(bump);
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/examples/two_pc/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub(crate) async fn run_coordinator(outbound: UdpSink, inbound: UdpStream, opts:
-> fold_keyed::<'static, u16, u32>(|| 0, |acc: &mut _, val| *acc += val);

// count subordinates
subord_total = subords[0] -> fold::<'tick>(0, |a: &mut _, _b| *a += 1); // -> for_each(|n| println!("There are {} subordinates.", n));
subord_total = subords[0] -> fold::<'tick>(|| 0, |a: &mut _, _b| *a += 1); // -> for_each(|n| println!("There are {} subordinates.", n));

// If commit_votes for this xid is the same as all_votes, send a P2 Commit message
committed = join() -> map(|(_c, (xid, ()))| xid);
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/examples/vector_clock/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub(crate) async fn run_client(

// given the inbound packet, bump the local clock and merge this in
inbound_chan[merge] -> map(|(msg, _sender): (EchoMsg, SocketAddr)| msg.vc) -> [net]mergevc;
mergevc = union() -> fold::<'static> (VecClock::default(), |old: &mut VecClock, vc| {
mergevc = union() -> fold::<'static> (VecClock::default, |old: &mut VecClock, vc| {
let my_addr = format!("{:?}", addr);
let bump = MapUnionSingletonMap::new_from((my_addr.clone(), Max::new(old.as_reveal_mut().entry(my_addr).or_insert(Max::new(0)).into_reveal() + 1)));
old.merge(bump);
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/examples/vector_clock/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub(crate) async fn run_server(outbound: UdpSink, inbound: UdpStream, opts: crat

// merge in the msg vc to the local vc
inbound_chan[merge] -> map(|(msg, _addr): (EchoMsg, SocketAddr)| msg.vc) -> mergevc;
mergevc = fold::<'static> (VecClock::default(), |old: &mut VecClock, vc| {
mergevc = fold::<'static> (VecClock::default, |old: &mut VecClock, vc| {
let my_addr = format!("{:?}", opts.addr.unwrap());
let bump = MapUnionSingletonMap::new_from((my_addr.clone(), Max::new(old.as_reveal_mut().entry(my_addr).or_insert(Max::new(0)).into_reveal() + 1)));
old.merge(bump);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ digraph {
node [fontname="Monaco,Menlo,Consolas,&quot;Droid Sans Mono&quot;,Inconsolata,&quot;Courier New&quot;,monospace", style=filled];
edge [fontname="Monaco,Menlo,Consolas,&quot;Droid Sans Mono&quot;,Inconsolata,&quot;Courier New&quot;,monospace"];
n1v1 [label="(n1v1) source_stream(items_recv)", shape=invhouse, fillcolor="#88aaff"]
n2v1 [label="(n2v1) fold::<'tick>(Vec::new(), Vec::push)", shape=invhouse, fillcolor="#88aaff"]
n2v1 [label="(n2v1) fold::<'tick>(Vec::new, Vec::push)", shape=invhouse, fillcolor="#88aaff"]
n3v1 [label="(n3v1) flat_map(|mut vec| {\l vec.sort();\l vec\l})\l", shape=invhouse, fillcolor="#88aaff"]
n4v1 [label="(n4v1) for_each(|v| print!(\"{:?}, \", v))", shape=house, fillcolor="#ffff88"]
n5v1 [label="(n5v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ classDef pushClass fill:#ff8,stroke:#000,text-align:left,white-space:pre
classDef otherClass fill:#fdc,stroke:#000,text-align:left,white-space:pre
linkStyle default stroke:#aaa
1v1[\"(1v1) <code>source_stream(items_recv)</code>"/]:::pullClass
2v1[\"(2v1) <code>fold::&lt;'tick&gt;(Vec::new(), Vec::push)</code>"/]:::pullClass
2v1[\"(2v1) <code>fold::&lt;'tick&gt;(Vec::new, Vec::push)</code>"/]:::pullClass
3v1[\"<div style=text-align:center>(3v1)</div> <code>flat_map(|mut vec| {<br> vec.sort();<br> vec<br>})</code>"/]:::pullClass
4v1[/"(4v1) <code>for_each(|v| print!(&quot;{:?}, &quot;, v))</code>"\]:::pushClass
5v1["(5v1) <code>handoff</code>"]:::otherClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ digraph {
node [fontname="Monaco,Menlo,Consolas,&quot;Droid Sans Mono&quot;,Inconsolata,&quot;Courier New&quot;,monospace", style=filled];
edge [fontname="Monaco,Menlo,Consolas,&quot;Droid Sans Mono&quot;,Inconsolata,&quot;Courier New&quot;,monospace"];
n1v1 [label="(n1v1) source_stream(items_recv)", shape=invhouse, fillcolor="#88aaff"]
n2v1 [label="(n2v1) fold::<\l 'static,\l>(\l Vec::new(),\l |old: &mut Vec<u32>, mut x: Vec<u32>| {\l old.append(&mut x);\l },\l)\l", shape=invhouse, fillcolor="#88aaff"]
n2v1 [label="(n2v1) fold::<\l 'static,\l>(\l Vec::new,\l |old: &mut Vec<u32>, mut x: Vec<u32>| {\l old.append(&mut x);\l },\l)\l", shape=invhouse, fillcolor="#88aaff"]
n3v1 [label="(n3v1) for_each(|v| result_send.send(v).unwrap())", shape=house, fillcolor="#ffff88"]
n4v1 [label="(n4v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n2v1 -> n3v1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ classDef pushClass fill:#ff8,stroke:#000,text-align:left,white-space:pre
classDef otherClass fill:#fdc,stroke:#000,text-align:left,white-space:pre
linkStyle default stroke:#aaa
1v1[\"(1v1) <code>source_stream(items_recv)</code>"/]:::pullClass
2v1[\"<div style=text-align:center>(2v1)</div> <code>fold::&lt;<br> 'static,<br>&gt;(<br> Vec::new(),<br> |old: &amp;mut Vec&lt;u32&gt;, mut x: Vec&lt;u32&gt;| {<br> old.append(&amp;mut x);<br> },<br>)</code>"/]:::pullClass
2v1[\"<div style=text-align:center>(2v1)</div> <code>fold::&lt;<br> 'static,<br>&gt;(<br> Vec::new,<br> |old: &amp;mut Vec&lt;u32&gt;, mut x: Vec&lt;u32&gt;| {<br> old.append(&amp;mut x);<br> },<br>)</code>"/]:::pullClass
3v1[/"(3v1) <code>for_each(|v| result_send.send(v).unwrap())</code>"\]:::pushClass
4v1["(4v1) <code>handoff</code>"]:::otherClass
2v1-->3v1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ digraph {
node [fontname="Monaco,Menlo,Consolas,&quot;Droid Sans Mono&quot;,Inconsolata,&quot;Courier New&quot;,monospace", style=filled];
edge [fontname="Monaco,Menlo,Consolas,&quot;Droid Sans Mono&quot;,Inconsolata,&quot;Courier New&quot;,monospace"];
n1v1 [label="(n1v1) source_stream(items_recv)", shape=invhouse, fillcolor="#88aaff"]
n2v1 [label="(n2v1) fold::<\l 'tick,\l>(\l Vec::new(),\l |old: &mut Vec<u32>, mut x: Vec<u32>| {\l old.append(&mut x);\l },\l)\l", shape=invhouse, fillcolor="#88aaff"]
n2v1 [label="(n2v1) fold::<\l 'tick,\l>(\l Vec::new,\l |old: &mut Vec<u32>, mut x: Vec<u32>| {\l old.append(&mut x);\l },\l)\l", shape=invhouse, fillcolor="#88aaff"]
n3v1 [label="(n3v1) for_each(|v| result_send.send(v).unwrap())", shape=house, fillcolor="#ffff88"]
n4v1 [label="(n4v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n2v1 -> n3v1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ classDef pushClass fill:#ff8,stroke:#000,text-align:left,white-space:pre
classDef otherClass fill:#fdc,stroke:#000,text-align:left,white-space:pre
linkStyle default stroke:#aaa
1v1[\"(1v1) <code>source_stream(items_recv)</code>"/]:::pullClass
2v1[\"<div style=text-align:center>(2v1)</div> <code>fold::&lt;<br> 'tick,<br>&gt;(<br> Vec::new(),<br> |old: &amp;mut Vec&lt;u32&gt;, mut x: Vec&lt;u32&gt;| {<br> old.append(&amp;mut x);<br> },<br>)</code>"/]:::pullClass
2v1[\"<div style=text-align:center>(2v1)</div> <code>fold::&lt;<br> 'tick,<br>&gt;(<br> Vec::new,<br> |old: &amp;mut Vec&lt;u32&gt;, mut x: Vec&lt;u32&gt;| {<br> old.append(&amp;mut x);<br> },<br>)</code>"/]:::pullClass
3v1[/"(3v1) <code>for_each(|v| result_send.send(v).unwrap())</code>"\]:::pushClass
4v1["(4v1) <code>handoff</code>"]:::otherClass
2v1-->3v1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ digraph {
n1v1 [label="(n1v1) source_iter([1])", shape=invhouse, fillcolor="#88aaff"]
n2v1 [label="(n2v1) persist()", shape=invhouse, fillcolor="#88aaff"]
n3v1 [label="(n3v1) persist()", shape=invhouse, fillcolor="#88aaff"]
n4v1 [label="(n4v1) fold(0, |a: &mut _, b| *a += b)", shape=invhouse, fillcolor="#88aaff"]
n4v1 [label="(n4v1) fold(|| 0, |a: &mut _, b| *a += b)", shape=invhouse, fillcolor="#88aaff"]
n5v1 [label="(n5v1) for_each(|x| result_send.send(x).unwrap())", shape=house, fillcolor="#ffff88"]
n6v1 [label="(n6v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n4v1 -> n5v1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ linkStyle default stroke:#aaa
1v1[\"(1v1) <code>source_iter([1])</code>"/]:::pullClass
2v1[\"(2v1) <code>persist()</code>"/]:::pullClass
3v1[\"(3v1) <code>persist()</code>"/]:::pullClass
4v1[\"(4v1) <code>fold(0, |a: &amp;mut _, b| *a += b)</code>"/]:::pullClass
4v1[\"(4v1) <code>fold(|| 0, |a: &amp;mut _, b| *a += b)</code>"/]:::pullClass
5v1[/"(5v1) <code>for_each(|x| result_send.send(x).unwrap())</code>"\]:::pushClass
6v1["(6v1) <code>handoff</code>"]:::otherClass
4v1-->5v1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ digraph {
n5v1 [label="(n5v1) persist()", shape=invhouse, fillcolor="#88aaff"]
n6v1 [label="(n6v1) null()", shape=invhouse, fillcolor="#88aaff"]
n7v1 [label="(n7v1) union()", shape=invhouse, fillcolor="#88aaff"]
n8v1 [label="(n8v1) fold(0, |a: &mut _, b| *a += b)", shape=invhouse, fillcolor="#88aaff"]
n8v1 [label="(n8v1) fold(|| 0, |a: &mut _, b| *a += b)", shape=invhouse, fillcolor="#88aaff"]
n9v1 [label="(n9v1) for_each(|x| result_send.send(x).unwrap())", shape=house, fillcolor="#ffff88"]
n10v1 [label="(n10v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n2v1 -> n4v1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ linkStyle default stroke:#aaa
5v1[\"(5v1) <code>persist()</code>"/]:::pullClass
6v1[\"(6v1) <code>null()</code>"/]:::pullClass
7v1[\"(7v1) <code>union()</code>"/]:::pullClass
8v1[\"(8v1) <code>fold(0, |a: &amp;mut _, b| *a += b)</code>"/]:::pullClass
8v1[\"(8v1) <code>fold(|| 0, |a: &amp;mut _, b| *a += b)</code>"/]:::pullClass
9v1[/"(9v1) <code>for_each(|x| result_send.send(x).unwrap())</code>"\]:::pushClass
10v1["(10v1) <code>handoff</code>"]:::otherClass
2v1-->4v1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ digraph {
n5v1 [label="(n5v1) persist()", shape=house, fillcolor="#ffff88"]
n6v1 [label="(n6v1) tee()", shape=house, fillcolor="#ffff88"]
n7v1 [label="(n7v1) null()", shape=house, fillcolor="#ffff88"]
n8v1 [label="(n8v1) fold(0, |a: &mut _, b| *a += b)", shape=invhouse, fillcolor="#88aaff"]
n8v1 [label="(n8v1) fold(|| 0, |a: &mut _, b| *a += b)", shape=invhouse, fillcolor="#88aaff"]
n9v1 [label="(n9v1) for_each(|x| result_send.send(x).unwrap())", shape=house, fillcolor="#ffff88"]
n10v1 [label="(n10v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n2v1 -> n3v1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ linkStyle default stroke:#aaa
5v1[/"(5v1) <code>persist()</code>"\]:::pushClass
6v1[/"(6v1) <code>tee()</code>"\]:::pushClass
7v1[/"(7v1) <code>null()</code>"\]:::pushClass
8v1[\"(8v1) <code>fold(0, |a: &amp;mut _, b| *a += b)</code>"/]:::pullClass
8v1[\"(8v1) <code>fold(|| 0, |a: &amp;mut _, b| *a += b)</code>"/]:::pullClass
9v1[/"(9v1) <code>for_each(|x| result_send.send(x).unwrap())</code>"\]:::pushClass
10v1["(10v1) <code>handoff</code>"]:::otherClass
2v1-->3v1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ digraph {
edge [fontname="Monaco,Menlo,Consolas,&quot;Droid Sans Mono&quot;,Inconsolata,&quot;Courier New&quot;,monospace"];
n1v1 [label="(n1v1) source_stream(persist_input)", shape=invhouse, fillcolor="#88aaff"]
n2v1 [label="(n2v1) persist()", shape=invhouse, fillcolor="#88aaff"]
n3v1 [label="(n3v1) fold::<'tick>(0, |a: &mut _, b| *a += b)", shape=invhouse, fillcolor="#88aaff"]
n3v1 [label="(n3v1) fold::<'tick>(|| 0, |a: &mut _, b| *a += b)", shape=invhouse, fillcolor="#88aaff"]
n4v1 [label="(n4v1) next_stratum()", shape=invhouse, fillcolor="#88aaff"]
n5v1 [label="(n5v1) source_stream(other_input)", shape=invhouse, fillcolor="#88aaff"]
n6v1 [label="(n6v1) cross_join::<'tick, 'tick>()", shape=invhouse, fillcolor="#88aaff"]
Expand Down
Loading

0 comments on commit 3136e0f

Please sign in to comment.