KAFKA-15022: [2/N] introduce graph to compute min cost#13996
KAFKA-15022: [2/N] introduce graph to compute min cost#13996mjsax merged 9 commits intoapache:trunkfrom
Conversation
mjsax
left a comment
There was a problem hiding this comment.
Couple of initial comments.
Task :streams:checkstyleTest
[ant:checkstyle] [ERROR] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-13996/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/GraphTest.java:47:30: ',' is not followed by whitespace. [WhitespaceAfter]
[ant:checkstyle] [ERROR] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-13996/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/GraphTest.java:49:30: ',' is not followed by whitespace. [WhitespaceAfter]
[ant:checkstyle] [ERROR] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-13996/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/GraphTest.java:158:19: Variable 'exception' should be declared final. [FinalLocalVariable]
[ant:checkstyle] [ERROR] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-13996/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/GraphTest.java:169:19: Variable 'exception' should be declared final. [FinalLocalVariable]
[ant:checkstyle] [ERROR] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-13996/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/GraphTest.java:182:19: Variable 'exception' should be declared final. [FinalLocalVariable]
[ant:checkstyle] [ERROR] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-13996/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/GraphTest.java:194:19: Variable 'exception' should be declared final. [FinalLocalVariable]
[ant:checkstyle] [ERROR] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-13996/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/GraphTest.java:206:19: Variable 'exception' should be declared final. [FinalLocalVariable]
[ant:checkstyle] [ERROR] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-13996/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/GraphTest.java:218:19: Variable 'exception' should be declared final. [FinalLocalVariable]
There was a problem hiding this comment.
compareTo is to establish an order, right? Why do we order by (destination,capacity,cost); does is matter, or could we use any order as long as deterministic?
There was a problem hiding this comment.
Any order should be fine. But as you pointed out. We can actually remove this.
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
There was a problem hiding this comment.
I'll remove compareTo
There was a problem hiding this comment.
In Kafka, it's common practice to omit the get on getter, so it should just be nodes() (similar for other methods)
There was a problem hiding this comment.
Should this rather go into the constructor of Edge class?
There was a problem hiding this comment.
Why do we use nested SortedMap? Is seems SortedMap<V, List<Edge> would be sufficient?
There was a problem hiding this comment.
It's for getting the edge between two nodes efficiently. e.g. to get edge between node 0 and node 1, we can do adjList.get(0).get(1).
There was a problem hiding this comment.
Why do we implement Comparable? Could not spot the reason why it's required.
There was a problem hiding this comment.
Good catch! I originally used SortedMap<V, SortedSet<Edge> as adjList which need to sort Edge. Later I changed it to SortedMap<V, SortedMap<V, Edge>> and we can remove this.
There was a problem hiding this comment.
This means their can't be a flow in this edge which essentially means these two nodes are disconnected.
There was a problem hiding this comment.
Looking into https://en.wikipedia.org/wiki/Bellman%E2%80%93Ford_algorithm that you linked from the KIP, the algorithms says "go over all edges" -- we need 2 for-loops to this but what is not totally obvious; might be worth to add a coment?
There was a problem hiding this comment.
From https://en.wikipedia.org/wiki/Bellman%E2%80%93Ford_algorithm it says, so it N-1 times, but we do it N times. Why?
There was a problem hiding this comment.
Good question. N-1 times will find shortest path, we need to do it one more time to see if the path can be even shorter which mean there's a negative cycle. That's why there's a check on line 335.
There was a problem hiding this comment.
Can this be final? (cf comment below)
There was a problem hiding this comment.
Why do we need this update? Seems it would not change? Can we remove this line?
There was a problem hiding this comment.
Why do we need this update? Seems it would not change? Can we remove this line?
(We cannot make parentEdge final thought it seems, as it's set to a different value below)
There was a problem hiding this comment.
Actually parentEdge is changing from line 283
There was a problem hiding this comment.
If I read this correctly, we are finding the minimum edge inside the cycle?
There was a problem hiding this comment.
Yeah. If you want to flow through all the edges in the cycle, the maximum you can flow is the the minimum capacity (residualFlow) in all the edges.
There was a problem hiding this comment.
I was first confused about why we don't start with nodeInCycle and do everything is a single loop -- now I understand that we do it to construct an "exit" criteria -- seems worth to add a comment to explain it upfront.
There was a problem hiding this comment.
Not sure if I get this condition. Can you elaborate?
There was a problem hiding this comment.
If there's some flow going in one direction, there should be the same amount of flow going in opposite direction.
In the case of forward edge, the counterEdge.flow is always larger than possibleFlow because forward edge's flow is backward edge's residual flow and possibleFlow is always smaller than or equal to residual flow.
In the case of backward edge, counterEdge.flow can be smaller than possibleFlow because forwardEdge's residual flow is bounded by capacity. Actually in case's counterEdge.flow < possibleFlow, we also need to reset it to 0. I'll fix that.
There was a problem hiding this comment.
OK. After closer look, I think updating backward edge doesn't make much sense. So I added a forwardEdge bool in Edge to indicate whether it's forward edge and only update the flow in forward edge.
There was a problem hiding this comment.
This is to update the original graph edge to the computed flow. Basically iterating all edges in original graph and change the flow to what's in residualGraph's corresponding edge.
mjsax
left a comment
There was a problem hiding this comment.
Couple is smaller comments/questions. Can be addressed by follow up PR (or if you want inside N/3).
| int totalCost = 0; | ||
| for (final Map.Entry<V, SortedMap<V, Edge>> nodeEdges : adjList.entrySet()) { | ||
| final SortedMap<V, Edge> edges = nodeEdges.getValue(); | ||
| for (final Entry<V, Edge> nodeEdge : edges.entrySet()) { |
There was a problem hiding this comment.
nit: we could just iterate over the valueSet ?
| } | ||
|
|
||
| public Edge(final V destination, final int capacity, final int cost, final int residualFlow, final int flow, | ||
| final boolean forwardEdge) { |
There was a problem hiding this comment.
nit: formatting (if it does not fit in one line, we should move each parameter into it's one line to simplify reading)
public Edge(
final V destination,
final int capacity,
final int cost,
final int residualFlow,
final int flow,
final boolean forwardEdge
) {
|
|
||
| final Graph<?>.Edge otherEdge = (Graph<?>.Edge) other; | ||
|
|
||
| return destination.equals(otherEdge.destination) && capacity == otherEdge.capacity |
There was a problem hiding this comment.
nit formatting:
return destination.equals(otherEdge.destination)
&& capacity == otherEdge.capacity
&& cost == otherEdge.cost
&& residualFlow == otherEdge.residualFlow
&& flow == otherEdge.flow
&& forwardEdge == otherEdge.forwardEdge;
|
|
||
| @Override | ||
| public String toString() { | ||
| return "{destination= " + destination + ", capacity=" + capacity + ", cost=" + cost |
| @Override | ||
| public String toString() { | ||
| return "{destination= " + destination + ", capacity=" + capacity + ", cost=" + cost | ||
| + ", residualFlow=" + residualFlow + ", flow=" + flow + ", forwardEdge=" + forwardEdge; |
There was a problem hiding this comment.
missing }.
Should we also switch to one line per parameter we print to simplify reading?
| graph.addEdge(4, 2, 1, 0, 1); | ||
| graph.addEdge(1, 5, 1, 0, 1); | ||
| graph.addEdge(3, 5, 1, 0, 1); | ||
| graph.setSourceNode(4); |
There was a problem hiding this comment.
Would it be simpler to use 0 as source? (At least for my mind it's easier to follow what going on, if numbers are "ordered")
Or to avoid a lot of re-writing, name the source -1 (and the sink 99) so both a clearly different, and we don't need to update too much code.
There was a problem hiding this comment.
Sounds good. Will use -1 and 99 then
| final V destination; | ||
| final int capacity; | ||
| final int cost; | ||
| int residualFlow; |
There was a problem hiding this comment.
Do we actually need this field?
If I read the code right, for a forward edge, it's always capacity - flow and for a backward edge it's always 0. So it seem redundant (and potentially error prone to store it expliclity)? -- Instead we could have a residualFlow() method that compute it on-the-fly (we could also simplify the update logic when modifying flow as we only need to update the flow itself)? Or do I read the update logic inside cancelNegativeCycle incorrectly and those properties are not an invariant?
There was a problem hiding this comment.
for a backward edge it's always 0
residualFlow for backward edge is the flow in forward edge: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/Graph.java#L178
There was a problem hiding this comment.
Ah. Parameter order... Thought it's , flow, residualFlow, (not sure why you picked the "reverse" order)
| edge = edges.get(1); | ||
| assertEquals(1, edge.flow); | ||
| assertEquals(0, edge.residualFlow); | ||
| } |
There was a problem hiding this comment.
Seems we do not check all conditions? Flow from 0->1 is shifted to 0->3, right?
There was a problem hiding this comment.
0->3 flow is checked on line 269?
| graph1.addEdge(4, 6, 1, 0, 1); | ||
|
|
||
| graph1.setSourceNode(5); | ||
| graph1.setSinkNode(6); |
| edgeList.add(new TestEdge(4, 5, 1, 1, 0)); | ||
|
|
||
| // Test no matter the order of adding edges, min cost flow flows from 0 to 2 and then from 2 to 5 | ||
| for (int i = 0; i < 10; i++) { |
There was a problem hiding this comment.
Why do we need to test this 10 times? Given that the test runs for each PR on nightly builds, it seems sufficient to just run it once?
There was a problem hiding this comment.
Want to test it more if someone trigger this manually. I feel 10 times won't hurt much, but I'm fine switching to 1 if you feel strongly 😄
There was a problem hiding this comment.
Test should run fast. Don't feel strongly about it.
Part of KIP-925. Reviewers: Matthias J. Sax <matthias@confluent.io>
Part of KIP-925. Reviewers: Matthias J. Sax <matthias@confluent.io>
Part of KIP-925. Reviewers: Matthias J. Sax <matthias@confluent.io>
Description
Introduce graph to calculate min-flow if an existing flow already input
Test
Unit test.