Skip to content

Commit

Permalink
fix(hydroflow): fix scheduler spinning on replay, fix hydro-project#961
Browse files Browse the repository at this point in the history
… (hydro-project#1171)

fixes the added tests failing in the previous commit
  • Loading branch information
MingweiSamuel committed Apr 24, 2024
1 parent 0f0af5b commit d44ffbd
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,17 @@ digraph {
n6v1 [label="(n6v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n7v1 [label="(n7v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n8v1 [label="(n8v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n9v1 [label="(n9v1) identity()", shape=invhouse, fillcolor="#88aaff"]
n10v1 [label="(n10v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n4v1 -> n5v1
n3v1 -> n6v1
n2v1 -> n7v1
n1v1 -> n8v1
n6v1 -> n4v1 [color=red]
n6v1 -> n9v1
n7v1 -> n3v1
n8v1 -> n2v1 [color=red]
n9v1 -> n10v1
n10v1 -> n4v1 [color=red]
subgraph "cluster n1v1" {
fillcolor="#dddddd"
style=filled
Expand All @@ -45,5 +49,11 @@ digraph {
n4v1
n5v1
}
subgraph "cluster n5v1" {
fillcolor="#dddddd"
style=filled
label = "sg_5v1\nstratum 2"
n9v1
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@ linkStyle default stroke:#aaa
6v1["(6v1) <code>handoff</code>"]:::otherClass
7v1["(7v1) <code>handoff</code>"]:::otherClass
8v1["(8v1) <code>handoff</code>"]:::otherClass
9v1[\"(9v1) <code>identity()</code>"/]:::pullClass
10v1["(10v1) <code>handoff</code>"]:::otherClass
4v1-->5v1
3v1-->6v1
2v1-->7v1
1v1-->8v1
6v1--o4v1; linkStyle 4 stroke:red
6v1-->9v1
7v1-->3v1
8v1--x2v1; linkStyle 6 stroke:red
9v1-->10v1
10v1--o4v1; linkStyle 8 stroke:red
subgraph sg_1v1 ["sg_1v1 stratum 0"]
1v1
end
Expand All @@ -36,4 +40,7 @@ subgraph sg_4v1 ["sg_4v1 stratum 0"]
4v1
5v1
end
subgraph sg_5v1 ["sg_5v1 stratum 2"]
9v1
end

Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ digraph {
n9v1 [label="(n9v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n10v1 [label="(n10v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n11v1 [label="(n11v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n12v1 [label="(n12v1) identity()", shape=invhouse, fillcolor="#88aaff"]
n13v1 [label="(n13v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n1v1 -> n2v1
n6v1 -> n7v1
n5v1 -> n6v1
Expand All @@ -24,9 +26,11 @@ digraph {
n2v1 -> n3v1
n7v1 -> n8v1
n7v1 -> n11v1
n9v1 -> n5v1 [color=red]
n9v1 -> n12v1
n10v1 -> n4v1 [color=red]
n11v1 -> n2v1
n12v1 -> n13v1
n13v1 -> n5v1 [color=red]
subgraph "cluster n1v1" {
fillcolor="#dddddd"
style=filled
Expand Down Expand Up @@ -68,5 +72,11 @@ digraph {
n7v1
}
}
subgraph "cluster n4v1" {
fillcolor="#dddddd"
style=filled
label = "sg_4v1\nstratum 2"
n12v1
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ linkStyle default stroke:#aaa
9v1["(9v1) <code>handoff</code>"]:::otherClass
10v1["(10v1) <code>handoff</code>"]:::otherClass
11v1["(11v1) <code>handoff</code>"]:::otherClass
12v1[\"(12v1) <code>identity()</code>"/]:::pullClass
13v1["(13v1) <code>handoff</code>"]:::otherClass
1v1-->2v1
6v1-->7v1
5v1-->6v1
Expand All @@ -27,9 +29,11 @@ linkStyle default stroke:#aaa
2v1-->3v1
7v1-->8v1
7v1-->11v1
9v1--o5v1; linkStyle 8 stroke:red
9v1-->12v1
10v1--x4v1; linkStyle 9 stroke:red
11v1-->2v1
12v1-->13v1
13v1--o5v1; linkStyle 12 stroke:red
subgraph sg_1v1 ["sg_1v1 stratum 0"]
1v1
2v1
Expand Down Expand Up @@ -58,4 +62,7 @@ subgraph sg_3v1 ["sg_3v1 stratum 0"]
7v1
end
end
subgraph sg_4v1 ["sg_4v1 stratum 2"]
12v1
end

11 changes: 5 additions & 6 deletions hydroflow_lang/src/graph/flat_to_partitioned.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,9 +413,11 @@ fn find_subgraph_strata(
let dst_stratum = partitioned_graph.subgraph_stratum(dst_sg);
match delay_type {
DelayType::Tick | DelayType::TickLazy => {
// If tick edge goes forward in stratum, need to buffer.
let is_lazy = matches!(delay_type, DelayType::TickLazy);
// If tick edge goes foreward in stratum, need to buffer.
// (TODO(mingwei): could use a different kind of handoff.)
if src_stratum <= dst_stratum {
// Or if lazy, need to create extra subgraph to mark as lazy.
if src_stratum <= dst_stratum || is_lazy {
// We inject a new subgraph between the src/dst which runs as the last stratum
// of the tick and therefore delays the data until the next tick.

Expand Down Expand Up @@ -445,10 +447,7 @@ fn find_subgraph_strata(
partitioned_graph.set_subgraph_stratum(new_subgraph_id, extra_stratum);

// Assign laziness.
partitioned_graph.set_subgraph_laziness(
new_subgraph_id,
matches!(delay_type, DelayType::TickLazy),
);
partitioned_graph.set_subgraph_laziness(new_subgraph_id, is_lazy);
}
}
DelayType::Stratum => {
Expand Down

0 comments on commit d44ffbd

Please sign in to comment.