forked from blondimi/qcover
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcoverability.py
160 lines (121 loc) · 4.96 KB
/
coverability.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# Copyright 2017 Michael Blondin, Alain Finkel, Christoph Haase, Serge Haddad
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import z3
from petri import load_petrinet, constraint_vector, petrinet_lossy
from cpn import reachable, build_cpn_solver
from solver_utils import set_markings_constrained
from upward_sets import update_upward, in_upward, merge_upward
def sum_norm(v):
    """Return the sum of all entries of the vector ``v``.

    Used in this module as a sort key to rank markings by total size.
    """
    total = sum(v)
    return total
def support_norm(v):
    """Return how many entries of ``v`` are strictly positive."""
    positive_count = 0
    for entry in v:
        if entry > 0:
            positive_count += 1
    return positive_count
# Add omegas to places that can take arbitrary values
def _omega_marking(cmp_marking):
x = [0] * len(cmp_marking)
for i in range(len(x)):
comparison, value = cmp_marking[i]
x[i] = value if comparison == '=' else float('inf')
return tuple(x)
def pre_upward(petrinet, markings, precomputed=None, prune=True):
    """Compute a basis of the one-step predecessors of ``markings``.

    For every marking ``m`` and every transition ``t`` the predecessor is
    computed place by place as ``max(pre, m[p] + pre - post)``, where ``pre``
    and ``post`` are the entries ``[p, t]`` of the two matrices.

    Args:
        petrinet: pair ``(pre_matrix, post_matrix)``.  ``pre_matrix.shape``
            must yield ``(num_places, num_transitions)`` and both matrices
            are indexed as ``matrix[place, transition]``.
        markings: collection of hashable markings indexable by place.
        precomputed: optional dict used as a cache from a marking to the set
            of its predecessors.  It is updated in place.  ``None`` (the
            default) disables caching.
        prune: when true, predecessors for which ``in_upward(pre_m,
            markings)`` holds are left out of the result.

    Returns:
        The set of predecessor markings accumulated through
        ``update_upward`` / ``merge_upward``.
    """
    pre_matrix, post_matrix = petrinet
    num_places, num_transitions = pre_matrix.shape
    use_cache = precomputed is not None
    basis = set()

    for m in markings:
        if use_cache and m in precomputed:
            # Reuse cached predecessors.  Honour ``prune`` here as well so
            # the result does not depend on whether ``m`` was cached.
            to_merge = {pre_m for pre_m in precomputed[m]
                        if not prune or not in_upward(pre_m, markings)}
            merge_upward(basis, to_merge)
            continue

        # Only create a cache entry when a cache was supplied; the previous
        # code indexed ``None`` here when called with the default argument.
        if use_cache:
            precomputed[m] = set()

        for t in range(num_transitions):
            pre_m = []

            for p in range(num_places):
                pre, post = int(pre_matrix[p, t]), int(post_matrix[p, t])
                pre_m.append(max(pre, m[p] + pre - post))

            pre_m = tuple(pre_m)

            if use_cache:
                update_upward(precomputed[m], pre_m)

            if not prune or not in_upward(pre_m, markings):
                update_upward(basis, pre_m)

    return basis
def check_cpn_coverability(petrinet, init, targets):
    """Test every target individually with ``reachable``.

    The net passed to ``reachable`` is ``petrinet_lossy(petrinet, init)`` and
    the markings are the tuples of ``constraint_vector`` applied to ``init``
    and to each target.

    Returns:
        ``True`` as soon as one query answers true; otherwise ``None`` if at
        least one query answered ``None`` (unknown), else ``False``.
    """
    lossy_net = petrinet_lossy(petrinet, init)
    start = tuple(constraint_vector(init))
    unknown_seen = False

    for target in targets:
        goal = tuple(constraint_vector(target))
        # Only the first component of the answer carries the verdict.
        verdict = reachable(lossy_net, start, goal)[0]

        if verdict is None:
            unknown_seen = True
            continue

        if verdict == True:  # noqa: E712 - keep the original comparison
            return True

    if unknown_seen:
        return None

    return False
def check_cpn_coverability_z3(solver, target_vars, markings):
    """Query ``solver`` about ``markings`` inside a temporary scope.

    A scope is pushed, ``set_markings_constrained`` is called with a
    ``('>=', value)`` constraint for every entry of every marking, the solver
    is checked, and the scope is popped again so the added constraints do not
    persist.  (Presumably the constraints require ``target_vars`` to cover at
    least one of the markings; this depends on ``set_markings_constrained``.)

    Args:
        solver: object offering ``push()``, ``pop()`` and ``check()``.
        target_vars: solver variables handed to ``set_markings_constrained``.
        markings: iterable of markings (iterables of per-place values).

    Returns:
        ``True`` if the check is ``z3.sat``, ``False`` if it is ``z3.unsat``,
        and ``None`` for any other answer (unknown).
    """
    solver.push()

    try:
        constraints = [[('>=', p) for p in m] for m in markings]
        set_markings_constrained(solver, target_vars, constraints)
        result = solver.check()
    finally:
        # Always restore the solver, even if building the constraints or
        # the check itself raises; the solver is reused by the caller.
        solver.pop()

    if result == z3.sat:
        return True

    if result == z3.unsat:
        return False

    return None
def non_coverable(petrinet, init, targets):
    """Return ``True`` only when the targets are shown to be non coverable.

    With at most ``MAX_TARGETS_SINGLE_TEST`` targets the answer comes from
    ``check_cpn_coverability``; with more targets a single solver built by
    ``build_cpn_solver`` over all targets is checked instead.  Any answer
    other than a definite negative yields ``False``.
    """
    MAX_TARGETS_SINGLE_TEST = 10

    if len(targets) > MAX_TARGETS_SINGLE_TEST:
        # Many targets: one combined query.
        solver, _ = build_cpn_solver(petrinet, init, targets, domain='N')
        refuted = solver.check() == z3.unsat
    else:
        # Few targets: test them one at a time.
        outcome = check_cpn_coverability(petrinet, init, targets)
        refuted = outcome == False  # noqa: E712 - None must not count

    return True if refuted else False
def coverability(petrinet, init, targets, prune=False, max_iter=None):
    """Backward coverability search from ``targets`` towards ``init``.

    Starting from the target markings, the basis is repeatedly extended with
    predecessors computed by ``pre_upward`` until ``init`` (with omegas, see
    ``_omega_marking``) lies in the upward closure of the basis, no new
    predecessor remains, or ``max_iter`` iterations have been performed.

    Args:
        petrinet: net description passed on to ``non_coverable``,
            ``build_cpn_solver`` and ``pre_upward``.
        init: constrained initial marking, a sequence of
            ``(comparison, value)`` pairs.
        targets: iterable of constrained target markings.
        prune: when true, first try to refute coverability with
            ``non_coverable`` and, during the search, drop predecessors that
            the solver proves not coverable.
        max_iter: optional bound on the number of iterations.

    Returns:
        ``False`` if the initial refutation succeeds, ``None`` if
        ``max_iter`` is reached (unknown), otherwise the last result of
        ``in_upward(init_marking, basis)``.
    """
    # Verify if non coverable in CPN first
    if prune and non_coverable(petrinet, init, targets):
        return False

    # Otherwise, proceed with backward coverability
    def smallest_elems(x):
        # Keep the 10 + 20% smallest markings w.r.t. sum_norm.
        return set(sorted(x, key=sum_norm)[:int(10 + 0.2 * len(x))])

    solver, variables = build_cpn_solver(petrinet, init, targets=None,
                                         domain='N')
    _, _, target_vars = variables

    def coverable(markings):
        return check_cpn_coverability_z3(solver, target_vars, markings)

    init_marking = _omega_marking(init)
    basis = {tuple(constraint_vector(m)) for m in targets}
    precomputed = {}
    covered = in_upward(init_marking, basis)
    num_iter = 0

    while not covered:
        if max_iter is not None and num_iter >= max_iter:
            return None  # Unknown result

        num_iter += 1

        # Compute prebasis
        prebasis = pre_upward(petrinet, basis, precomputed)

        # Coverability pruning
        if prune:
            # Prune only on a definite negative answer.  An unknown answer
            # (None) must not be treated as "not coverable": the marking
            # would be dropped and its negation asserted in the solver.
            pruned = {x for x in prebasis if coverable([x]) is False}
            prebasis.difference_update(pruned)

            for x in pruned:
                # Record that the target cannot cover x in later queries.
                solver.add(z3.Or([target_vars[p] < x[p]
                                  for p in range(len(x))]))

        # Continue?
        if not prebasis:
            break

        prebasis = smallest_elems(prebasis)
        merge_upward(basis, prebasis)
        covered = in_upward(init_marking, basis)

    return covered