-
Notifications
You must be signed in to change notification settings - Fork 24
/
Copy pathmincostflow.py
128 lines (116 loc) · 3.97 KB
/
mincostflow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
from typing import NamedTuple, Optional, List, Tuple, cast
from heapq import heappush, heappop
class MCFGraph:
class Edge(NamedTuple):
src: int
dst: int
cap: int
flow: int
cost: int
class _Edge:
def __init__(self, dst: int, cap: int, cost: int) -> None:
self.dst = dst
self.cap = cap
self.cost = cost
self.rev: Optional[MCFGraph._Edge] = None
def __init__(self, n: int) -> None:
self._n = n
self._g: List[List[MCFGraph._Edge]] = [[] for _ in range(n)]
self._edges: List[MCFGraph._Edge] = []
def add_edge(self, src: int, dst: int, cap: int, cost: int) -> int:
assert 0 <= src < self._n
assert 0 <= dst < self._n
assert 0 <= cap
m = len(self._edges)
e = MCFGraph._Edge(dst, cap, cost)
re = MCFGraph._Edge(src, 0, -cost)
e.rev = re
re.rev = e
self._g[src].append(e)
self._g[dst].append(re)
self._edges.append(e)
return m
def get_edge(self, i: int) -> Edge:
assert 0 <= i < len(self._edges)
e = self._edges[i]
re = cast(MCFGraph._Edge, e.rev)
return MCFGraph.Edge(
re.dst,
e.dst,
e.cap + re.cap,
re.cap,
e.cost
)
def edges(self) -> List[Edge]:
return [self.get_edge(i) for i in range(len(self._edges))]
def flow(self, s: int, t: int,
flow_limit: Optional[int] = None) -> Tuple[int, int]:
return self.slope(s, t, flow_limit)[-1]
def slope(self, s: int, t: int,
flow_limit: Optional[int] = None) -> List[Tuple[int, int]]:
assert 0 <= s < self._n
assert 0 <= t < self._n
assert s != t
if flow_limit is None:
flow_limit = cast(int, sum(e.cap for e in self._g[s]))
dual = [0] * self._n
prev: List[Optional[Tuple[int, MCFGraph._Edge]]] = [None] * self._n
def refine_dual() -> bool:
pq = [(0, s)]
visited = [False] * self._n
dist: List[Optional[int]] = [None] * self._n
dist[s] = 0
while pq:
dist_v, v = heappop(pq)
if visited[v]:
continue
visited[v] = True
if v == t:
break
dual_v = dual[v]
for e in self._g[v]:
w = e.dst
if visited[w] or e.cap == 0:
continue
reduced_cost = e.cost - dual[w] + dual_v
new_dist = dist_v + reduced_cost
dist_w = dist[w]
if dist_w is None or new_dist < dist_w:
dist[w] = new_dist
prev[w] = v, e
heappush(pq, (new_dist, w))
else:
return False
dist_t = dist[t]
for v in range(self._n):
if visited[v]:
dual[v] -= cast(int, dist_t) - cast(int, dist[v])
return True
flow = 0
cost = 0
prev_cost_per_flow: Optional[int] = None
result = [(flow, cost)]
while flow < flow_limit:
if not refine_dual():
break
f = flow_limit - flow
v = t
while prev[v] is not None:
u, e = cast(Tuple[int, MCFGraph._Edge], prev[v])
f = min(f, e.cap)
v = u
v = t
while prev[v] is not None:
u, e = cast(Tuple[int, MCFGraph._Edge], prev[v])
e.cap -= f
assert e.rev is not None
e.rev.cap += f
v = u
c = -dual[s]
flow += f
cost += f * c
if c == prev_cost_per_flow:
result.pop()
result.append((flow, cost))
prev_cost_per_flow = c
return result