-
Notifications
You must be signed in to change notification settings - Fork 1
/
mp.py
204 lines (161 loc) · 5.35 KB
/
mp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# -*- coding: utf-8 -*-
from multiprocessing import Manager, Value, cpu_count, Pool
from numpy import array, empty, append, poly1d, polyfit, linalg, zeros, intersect1d
from math import sqrt
from math import tanh
import sys
from time import time
from datetime import timedelta
# import modules from the parent directory, as described here:
# https://stackoverflow.com/questions/714063/importing-modules-from-parent-folder
import os
import sys
import inspect
currentdir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
parentdir = os.path.dirname(currentdir)
sys.path.insert(0, parentdir)
from PyNite import FEModel3D
import pickle
import gc
gc.disable()
def print_data(text):
    """
    Write a debug message to stdout behind the "Phaenotyp |" tag.

    :param text: Message to show; pass one string (do not pass a list).
    """
    tag = "Phaenotyp |"
    print(tag, text)
# command-line arguments handed over by the caller:
#   argv[1]  directory in which the pickle exchange files live
#   argv[2]  scipy flag as text, compared against "True" / "False"
#   argv[3]  name of the calculation type
directory_blend = sys.argv[1]
scipy_available = sys.argv[2]
calculation_type = sys.argv[3]

# pickle file with the models to analyze
path_import = directory_blend + "/Phaenotyp-export_mp.p"

# reference point for timing the whole run
start_time = time()
def import_models():
    """
    Load the models to analyze from the pickle file at ``path_import``.

    :return: The unpickled object. The caller iterates it with ``.items()``,
        i.e. it is used as a dict with the frame as key.
    """
    # NOTE(review): pickle.load can execute arbitrary code contained in the
    # file; this is only safe while the file comes from a trusted exporter.
    # The context manager closes the file even when unpickling fails.
    with open(path_import, 'rb') as file:
        return pickle.load(file)
def run_fea_pn(scipy_available, calculation_type, feas, model, frame):
    """
    Run one single analysis and store the analyzed model in ``feas``.

    Results cannot simply be returned from the worker here, so the analyzed
    model is written into the shared mapping instead; ``feas`` ends up with
    one analysis per frame.

    :param scipy_available: Flag as text. "True" selects the sparse solver,
        "False" the dense one; any other value skips the analysis.
    :param calculation_type: "first_order", "first_order_linear", or any
        other value, which selects the P-Delta analysis.
    :param feas: Mapping that receives ``model`` under the key ``str(frame)``.
    :param model: Object offering ``analyze``, ``analyze_linear`` and
        ``analyze_PDelta``.
    :param frame: Identifier of the frame; used as key and in the log line.
    """
    began = time()

    # the flag arrives as a string, so it is compared rather than evaluated
    if scipy_available in ("True", "False"):
        use_sparse = scipy_available == "True"

        if calculation_type == "first_order":
            model.analyze(check_statics=False, sparse=use_sparse)
        elif calculation_type == "first_order_linear":
            model.analyze_linear(check_statics=False, sparse=use_sparse)
        else:
            model.analyze_PDelta(check_stability=False, sparse=use_sparse)

    # hand the analyzed model back through the shared mapping
    feas[str(frame)] = model

    # report the duration of this frame
    duration = timedelta(seconds=time() - began)
    message = calculation_type + " calculation for frame " + str(frame) + " done"
    message += " | " + str(duration)
    print_data(message)
    sys.stdout.flush()
def run_fea_fd(feas, model, frame):
    """
    Solve the edge forces of one frame and store them in ``feas``.

    Based on:
    Oliver Natt, Physik mit Python - Simulationen, Visualisierungen und
    Animationen von Anfang an, 1. Auflage, Springer Spektrum, 2020
    https://pyph.de/1/1/index.php?name=code&kap=5&pgm=4

    :param feas: Mapping that receives the solved force vector under the
        key ``str(frame)``.
    :param model: Indexable holding, in this order, the points array, the
        support ids, the edges array and the forces array. The forces array
        is modified in place at the rows of the supports.
    :param frame: Identifier of the frame; used as key and in the log line.
    """
    # start time
    start_time = time()

    # amount of dimensions
    dim = 3

    points_array = model[0]
    supports_ids = model[1]
    edges_array = model[2]
    forces_array = model[3]

    # amount of points, supports, free verts and equations
    n_points_array = points_array.shape[0]
    n_supports = len(supports_ids)
    n_verts = n_points_array - n_supports
    n_equation = n_verts * dim

    # ids of all points that are not supports
    verts_id = list(set(range(n_points_array)) - set(supports_ids))
    # position of each free vert inside verts_id; replaces a linear
    # verts_id.index() search for every edge end
    row_of_vert = {vert: row for row, vert in enumerate(verts_id)}

    def vector(vertices, edge):
        # unit vector along the edge, pointing away from the given vertex
        v_0, v_1 = edges_array[edge]
        if vertices == v_0:
            vec = points_array[v_1] - points_array[v_0]
        else:
            vec = points_array[v_0] - points_array[v_1]
        return vec / linalg.norm(vec)

    # create equation: one column per edge, dim rows per free vert
    matrix = zeros((n_equation, n_equation))
    for edge_id, edge in enumerate(edges_array):
        for k in intersect1d(edge, verts_id):
            n = row_of_vert[int(k)]
            matrix[n * dim:(n + 1) * dim, edge_id] = vector(k, edge_id)

    # solve the system matrix @ F = -forces_array for the forces F
    b = -forces_array[verts_id].reshape(-1)
    F = linalg.solve(matrix, b)

    # compute the external forces at the supports
    for edge_id, edge in enumerate(edges_array):
        for k in intersect1d(edge, supports_ids):
            forces_array[k] -= F[edge_id] * vector(k, edge_id)

    feas[str(frame)] = F

    # get duration
    elapsed = time() - start_time
    text = calculation_type + " calculation for frame " + str(frame) + " done"
    text += " | " + str(timedelta(seconds=elapsed))
    print_data(text)
    sys.stdout.flush()
def mp_pool():
    """
    Run one analysis per imported frame in a pool of worker processes.

    Reads the module-level names ``imported_models``, ``scipy_available``
    and ``calculation_type``. One worker process is started per CPU core.

    :return: The manager dict filled by the workers, keyed by ``str(frame)``.
    """
    manager = Manager()  # needed for mp
    feas = manager.dict()  # is saving all calculations by frame

    def report_failure(frame):
        # Build the error callback for one frame. Without it an exception
        # raised inside a worker is dropped silently, because the
        # AsyncResult objects are never inspected.
        def callback(error):
            # stderr on purpose: stdout only carries the per-frame lines
            print(
                "Phaenotyp | calculation for frame", frame,
                "failed:", repr(error),
                file=sys.stderr,
            )
            sys.stderr.flush()
        return callback

    if calculation_type != "force_distribution":
        # for PyNite
        worker = run_fea_pn
        shared_args = (scipy_available, calculation_type, feas)
    else:
        # for force distribution
        worker = run_fea_fd
        shared_args = (feas,)

    # the context manager terminates the pool if submitting jobs fails;
    # close() + join() still wait for all regular jobs to finish first
    with Pool(processes=cpu_count()) as pool:
        for frame, model in imported_models.items():
            pool.apply_async(
                worker,
                args=shared_args + (model, frame),
                error_callback=report_failure(frame),
            )
        pool.close()
        pool.join()

    return feas
def export_models():
    """
    Write the results in the module-level ``feas`` to the return pickle file.

    The file is created as "Phaenotyp-return_mp.p" inside ``directory_blend``.
    """
    # export back to blender
    path_export = directory_blend + "/Phaenotyp-return_mp.p"

    # the context manager closes the file even when dumping fails
    with open(path_export, 'wb') as file:
        pickle.dump(dict(feas), file)  # use dict() to convert mp_dict to dict
if __name__ == "__main__":
    # imported_models and feas have to stay module-level names:
    # mp_pool() and export_models() read them as globals
    imported_models = import_models()
    feas = mp_pool()
    export_models()

    # timestamp taken after the export
    end_time = time()

    # The elapsed-time report is intentionally switched off: only the
    # per-frame lines are printed so that progress.http works correctly.

    # exit
    sys.exit()